Troubleshooting
ℹ️ Related Documentation: This guide covers resilience-specific issues (retries, node restarts, materialization). For general pipeline execution issues, see General Troubleshooting.
This guide helps you diagnose and resolve common issues with resilience configuration in NPipeline. It provides symptom-based troubleshooting, debugging techniques, and solutions for common anti-patterns.
Symptom-Based Troubleshooting
Symptom: Node Doesn't Restart Despite Failures
Possible Causes:
- Missing `ResilientExecutionStrategy`
- No materialization configured for streaming inputs
- Error handler not returning `RestartNode`
- Retry limits exhausted
Diagnostic Steps:
// 1. Check if ResilientExecutionStrategy is applied
var nodeDefinition = pipeline.GetNodeDefinition("problematicNode");
var hasResilientStrategy = nodeDefinition.ExecutionStrategy is ResilientExecutionStrategy;
Console.WriteLine($"Has ResilientExecutionStrategy: {hasResilientStrategy}");
// 2. Check materialization configuration
var retryOptions = context.RetryOptions;
Console.WriteLine($"MaxMaterializedItems: {retryOptions.MaxMaterializedItems}");
if (retryOptions.MaxMaterializedItems == null)
{
Console.WriteLine("ERROR: No materialization configured for streaming inputs");
}
// 3. Add logging to error handler
public class DebuggingErrorHandler : IPipelineErrorHandler
{
public async Task<PipelineErrorDecision> HandleNodeFailureAsync(
string nodeId, Exception error, PipelineContext context, CancellationToken ct)
{
Console.WriteLine($"Error handler called for node: {nodeId}");
Console.WriteLine($"Exception type: {error.GetType().Name}");
Console.WriteLine($"Exception message: {error.Message}");
        // Delegate to your existing decision logic (not shown here)
        var decision = await HandleError(nodeId, error, context, ct);
Console.WriteLine($"Decision: {decision}");
return decision;
}
}
// 4. Check retry counts
Console.WriteLine($"MaxItemRetries: {retryOptions.MaxItemRetries}");
Console.WriteLine($"MaxNodeRestartAttempts: {retryOptions.MaxNodeRestartAttempts}");
Console.WriteLine($"MaxSequentialNodeAttempts: {retryOptions.MaxSequentialNodeAttempts}");
Solutions:
// Solution 1: Apply ResilientExecutionStrategy
var problematicHandle = builder
.AddTransform<ProblematicTransform, Input, Output>("problematicNode")
.WithExecutionStrategy(builder, new ResilientExecutionStrategy(
new SequentialExecutionStrategy()
));
// Solution 2: Configure materialization
var options = new PipelineRetryOptions(
MaxItemRetries: 3,
MaxNodeRestartAttempts: 2,
MaxSequentialNodeAttempts: 5
);
// Solution 3: Fix error handler
public class FixedErrorHandler : IPipelineErrorHandler
{
public Task<PipelineErrorDecision> HandleNodeFailureAsync(
string nodeId, Exception error, PipelineContext context, CancellationToken ct)
{
return error switch
{
TimeoutException => Task.FromResult(PipelineErrorDecision.RestartNode),
NetworkException => Task.FromResult(PipelineErrorDecision.RestartNode),
_ => Task.FromResult(PipelineErrorDecision.FailPipeline)
};
}
}
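If transient faults usually clear after a short pause, the handler can also back off before requesting a restart. The sketch below is illustrative rather than part of NPipeline: it tracks attempts in a local dictionary (not thread-safe) and delays with Task.Delay before returning RestartNode.

public class BackoffErrorHandler : IPipelineErrorHandler
{
    // Illustrative per-node attempt tracking; not supplied by NPipeline.
    private readonly Dictionary<string, int> _attempts = new();

    public async Task<PipelineErrorDecision> HandleNodeFailureAsync(
        string nodeId, Exception error, PipelineContext context, CancellationToken ct)
    {
        if (error is not (TimeoutException or NetworkException))
            return PipelineErrorDecision.FailPipeline;

        var attempt = _attempts.TryGetValue(nodeId, out var n) ? n + 1 : 1;
        _attempts[nodeId] = attempt;

        // Exponential backoff: 1s, 2s, 4s, ... capped at 30s.
        var delay = TimeSpan.FromSeconds(Math.Min(Math.Pow(2, attempt - 1), 30));
        await Task.Delay(delay, ct);

        return PipelineErrorDecision.RestartNode;
    }
}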
Symptom: OutOfMemoryException with Resilient Nodes
Possible Causes:
- Unbounded materialization (`MaxMaterializedItems = null`)
- Buffer size too large for available memory
- Large items being buffered
- Memory leaks in custom components
Diagnostic Steps:
// 1. Monitor memory usage
public class MemoryMonitor : IExecutionObserver
{
public void OnBufferUsage(string nodeId, int currentItems, int maxItems)
{
var memoryMB = GC.GetTotalMemory(false) / (1024 * 1024);
Console.WriteLine($"Node {nodeId}: {currentItems}/{maxItems} items, Memory: {memoryMB}MB");
if (memoryMB > 1000) // 1GB threshold
{
Console.WriteLine($"WARNING: High memory usage: {memoryMB}MB");
}
}
}
// 2. Check buffer configuration
var retryOptions = context.RetryOptions;
if (retryOptions.MaxMaterializedItems == null)
{
Console.WriteLine("WARNING: Unbounded materialization may cause memory issues");
}
// 3. Estimate memory requirements
public static long EstimateMemoryUsage(int itemCount, long itemSizeBytes)
{
return itemCount * itemSizeBytes * 2; // Factor in overhead
}
var estimatedMemory = EstimateMemoryUsage(
    retryOptions.MaxMaterializedItems ?? 0,
    estimatedItemSizeBytes // your per-item size estimate
);
Console.WriteLine($"Estimated memory usage: {estimatedMemory / (1024 * 1024)}MB");
Solutions:
// Solution 1: Set appropriate buffer limits
var options = new PipelineRetryOptions(
MaxItemRetries: 3,
MaxNodeRestartAttempts: 2,
MaxSequentialNodeAttempts: 5,
MaxMaterializedItems: CalculateOptimalBufferSize()
);
private static int CalculateOptimalBufferSize()
{
    // GetAvailableMemoryMB and EstimateItemSize are app-specific helpers you supply.
    var availableMemoryMB = GetAvailableMemoryMB();
    var estimatedItemSizeKB = EstimateItemSize();
    var memoryBudgetMB = availableMemoryMB / 4; // Use 25% of available memory
    return (memoryBudgetMB * 1024) / estimatedItemSizeKB;
}
// Solution 2: Implement memory-aware error handling
public class MemoryAwareErrorHandler : IPipelineErrorHandler
{
public Task<PipelineErrorDecision> HandleNodeFailureAsync(
string nodeId, Exception error, PipelineContext context, CancellationToken ct)
{
// Check memory pressure
var memoryMB = GC.GetTotalMemory(false) / (1024 * 1024);
if (memoryMB > 2000) // 2GB threshold
{
Console.WriteLine("High memory pressure - avoiding restart");
return Task.FromResult(PipelineErrorDecision.ContinueWithoutNode);
}
return Task.FromResult(PipelineErrorDecision.RestartNode);
}
}
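On .NET Core 3.0 and later, GC.GetGCMemoryInfo exposes a better pressure signal than raw allocated bytes. A variant of the same handler using memory load (the 90% cutoff is illustrative):

public class GcLoadAwareErrorHandler : IPipelineErrorHandler
{
    public Task<PipelineErrorDecision> HandleNodeFailureAsync(
        string nodeId, Exception error, PipelineContext context, CancellationToken ct)
    {
        var info = GC.GetGCMemoryInfo();
        // Ratio of current memory load to the point where the GC
        // considers the machine under high memory pressure.
        var load = (double)info.MemoryLoadBytes / info.HighMemoryLoadThresholdBytes;

        if (load > 0.9)
        {
            Console.WriteLine($"Memory load at {load:P0} - avoiding restart");
            return Task.FromResult(PipelineErrorDecision.ContinueWithoutNode);
        }
        return Task.FromResult(PipelineErrorDecision.RestartNode);
    }
}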
Symptom: Buffer Overflow Exceptions
Possible Causes:
- `MaxMaterializedItems` set too low
- High throughput with small buffer
- Processing bottlenecks downstream
- Infinite loops in processing logic
Diagnostic Steps:
// 1. Monitor buffer utilization
public class BufferMonitor : IExecutionObserver
{
public void OnBufferUsage(string nodeId, int currentItems, int maxItems)
{
var usagePercent = (currentItems * 100) / maxItems;
if (usagePercent > 80)
{
Console.WriteLine($"WARNING: Node {nodeId} buffer at {usagePercent}% capacity");
}
}
}
// 2. Check throughput vs. processing rate
public class ThroughputMonitor
{
    // Plain Dictionary is not thread-safe; use ConcurrentDictionary if
    // nodes report from multiple threads.
    private readonly Dictionary<string, (int Input, int Output)> _counters = new();
public void RecordInput(string nodeId)
{
_counters.TryGetValue(nodeId, out var counter);
_counters[nodeId] = (counter.Input + 1, counter.Output);
}
public void RecordOutput(string nodeId)
{
_counters.TryGetValue(nodeId, out var counter);
_counters[nodeId] = (counter.Input, counter.Output + 1);
}
public void AnalyzeBackpressure()
{
foreach (var (nodeId, (input, output)) in _counters)
{
var backlog = input - output;
if (backlog > 1000)
{
Console.WriteLine($"WARNING: Node {nodeId} has backlog of {backlog} items");
}
}
}
}
Solutions:
// Solution 1: Increase buffer size
var options = new PipelineRetryOptions(
MaxItemRetries: 3,
MaxNodeRestartAttempts: 2,
MaxSequentialNodeAttempts: 5,
MaxMaterializedItems: 5000 // Increased buffer size
);
// Solution 2: Implement adaptive buffering
public class AdaptiveRetryOptions : PipelineRetryOptions
{
    // The adjusted size is tracked here; make sure your buffering logic reads
    // _currentMaxItems, since the base options value is fixed at construction.
    private int _currentMaxItems;
public AdaptiveRetryOptions() : base(3, 2, 5, 1000)
{
_currentMaxItems = MaxMaterializedItems ?? 1000;
}
public void AdjustBufferSize(int currentUsage, int maxCapacity)
{
var usagePercent = (currentUsage * 100) / maxCapacity;
if (usagePercent > 90)
{
_currentMaxItems = (int)(_currentMaxItems * 1.5); // Increase by 50%
Console.WriteLine($"Increased buffer size to {_currentMaxItems}");
}
}
}
// Solution 3: Add circuit breaker for buffer overflow
public class BufferOverflowAwareErrorHandler : IPipelineErrorHandler
{
public Task<PipelineErrorDecision> HandleNodeFailureAsync(
string nodeId, Exception error, PipelineContext context, CancellationToken ct)
{
if (error.Message.Contains("Resilience materialization exceeded MaxMaterializedItems"))
{
Console.WriteLine($"Buffer overflow for node {nodeId} - skipping restart");
return Task.FromResult(PipelineErrorDecision.ContinueWithoutNode);
}
return Task.FromResult(PipelineErrorDecision.RestartNode);
}
}
Symptom: Circuit Breaker Tripping Too Frequently
Possible Causes:
- Failure threshold set too low
- Persistent infrastructure issues
- Incorrect error classification
- Resource contention
- Wrong threshold type for your scenario (e.g., using ConsecutiveFailures when rate-based would be better)
Diagnostic Steps:
// 1. Monitor circuit breaker state
public class CircuitBreakerMonitor : IExecutionObserver
{
public void OnRetry(NodeRetryEvent retryEvent)
{
if (retryEvent.RetryKind == RetryKind.NodeRestart)
{
Console.WriteLine($"Node restart: {retryEvent.NodeId}, Attempt: {retryEvent.Attempt}");
}
}
}
// 2. Check circuit breaker statistics
if (context.Items.TryGetValue(PipelineContextKeys.CircuitBreakerManager, out var managerObj) &&
managerObj is ICircuitBreakerManager manager)
{
    // nodeId and circuitBreakerOptions come from your pipeline setup
    var circuitBreaker = manager.GetCircuitBreaker(nodeId, circuitBreakerOptions);
var stats = circuitBreaker.GetStatistics();
Console.WriteLine($"Circuit breaker stats: {stats.TotalOperations} total, " +
$"{stats.FailureCount} failures, {stats.FailureRate:P2} failure rate");
}
// 3. Analyze failure patterns
public class FailureAnalyzer
{
private readonly Dictionary<string, List<Exception>> _failures = new();
public void RecordFailure(string nodeId, Exception error)
{
if (!_failures.ContainsKey(nodeId))
_failures[nodeId] = new List<Exception>();
_failures[nodeId].Add(error);
// Analyze pattern after 10 failures
if (_failures[nodeId].Count >= 10)
{
AnalyzeFailurePattern(nodeId);
}
}
private void AnalyzeFailurePattern(string nodeId)
{
var failures = _failures[nodeId];
var errorTypes = failures.GroupBy(e => e.GetType().Name)
.ToDictionary(g => g.Key, g => g.Count());
Console.WriteLine($"Failure pattern for {nodeId}:");
foreach (var (errorType, count) in errorTypes)
{
Console.WriteLine($" {errorType}: {count} occurrences");
}
}
}
Solutions:
// Solution 1: Adjust circuit breaker options
var circuitBreakerOptions = new PipelineCircuitBreakerOptions
{
    Enabled = true,
    FailureThreshold = 10, // Increased from default
    OpenDuration = TimeSpan.FromMinutes(2),
    SamplingWindow = TimeSpan.FromMinutes(5),
    ThresholdType = CircuitBreakerThresholdType.RollingWindowRate, // Try rate-based
    FailureRateThreshold = 0.2 // 20% failure rate
};
// Solution 2: Implement smart error classification
public class SmartErrorHandler : IPipelineErrorHandler
{
public Task<PipelineErrorDecision> HandleNodeFailureAsync(
string nodeId, Exception error, PipelineContext context, CancellationToken ct)
{
// Don't restart for permanent failures
if (IsPermanentFailure(error))
{
return Task.FromResult(PipelineErrorDecision.ContinueWithoutNode);
}
// Only restart for transient failures
if (IsTransientFailure(error))
{
return Task.FromResult(PipelineErrorDecision.RestartNode);
}
return Task.FromResult(PipelineErrorDecision.FailPipeline);
}
private bool IsPermanentFailure(Exception ex)
{
return ex is AuthenticationException or
AuthorizationException or
NotFoundException;
}
private bool IsTransientFailure(Exception ex)
{
return ex is TimeoutException or
NetworkException or
TemporaryServiceException;
}
}
// Solution 3: Try different threshold types based on your scenario
// For high-volume, rate-sensitive scenarios:
var rateBasedOptions = new PipelineCircuitBreakerOptions
{
    FailureThreshold = 100,
    OpenDuration = TimeSpan.FromMinutes(2),
    SamplingWindow = TimeSpan.FromMinutes(5),
    ThresholdType = CircuitBreakerThresholdType.RollingWindowRate,
    FailureRateThreshold = 0.05 // 5% failure rate
};
// For scenarios where both count and rate matter:
var hybridOptions = new PipelineCircuitBreakerOptions
{
    FailureThreshold = 5,
    OpenDuration = TimeSpan.FromMinutes(1),
    SamplingWindow = TimeSpan.FromMinutes(5),
    ThresholdType = CircuitBreakerThresholdType.Hybrid,
    FailureRateThreshold = 0.3 // 30% failure rate
};
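Whichever variant fits, the options are supplied when the breaker is resolved, as in the diagnostics above (assuming the same ICircuitBreakerManager instance):

var breaker = manager.GetCircuitBreaker("flakyNode", rateBasedOptions);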
Debugging Techniques
1. Enable Detailed Logging
public class ResilienceLogger : IPipelineLogger
{
private readonly ILogger _logger;
public ResilienceLogger(ILogger logger)
{
_logger = logger;
}
public void LogDebug(string message, params object[] args)
{
_logger.LogDebug(message, args);
}
public void LogInformation(string message, params object[] args)
{
_logger.LogInformation(message, args);
}
public void LogWarning(string message, params object[] args)
{
_logger.LogWarning(message, args);
}
public void LogError(Exception exception, string message, params object[] args)
{
_logger.LogError(exception, message, args);
}
}
// Configure in context (ResilienceLoggerFactory is your factory that produces
// the ResilienceLogger above)
var context = PipelineContext.WithObservability(
    loggerFactory: new ResilienceLoggerFactory()
);
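If you use Microsoft.Extensions.Logging, the ILogger that ResilienceLogger wraps can come from a standard factory. A minimal sketch, assuming the Microsoft.Extensions.Logging.Console package is referenced:

using Microsoft.Extensions.Logging;

var msFactory = LoggerFactory.Create(b => b
    .AddConsole()
    .SetMinimumLevel(LogLevel.Debug)); // surface resilience debug output

var pipelineLogger = new ResilienceLogger(msFactory.CreateLogger("NPipeline.Resilience"));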
2. Add Custom Observability
public class ResilienceObserver : IExecutionObserver
{
public void OnRetry(NodeRetryEvent retryEvent)
{
Console.WriteLine($"[{DateTime.UtcNow:O}] Retry: {retryEvent.NodeId}, " +
$"Kind: {retryEvent.RetryKind}, " +
$"Attempt: {retryEvent.Attempt}, " +
$"Error: {retryEvent.Error?.Message}");
}
public void OnBufferUsage(string nodeId, int currentItems, int maxItems)
{
var usagePercent = (currentItems * 100) / maxItems;
Console.WriteLine($"[{DateTime.UtcNow:O}] Buffer: {nodeId}, " +
$"{currentItems}/{maxItems} ({usagePercent}%)");
}
}
// Register observer
var context = PipelineContext.Default;
context.ExecutionObserver = new ResilienceObserver();
3. Create Test Scenarios
public class ResilienceTestHarness
{
public async Task TestRestartScenario()
{
var flakySource = new FlakyDataSource(failureRate: 0.3);
var pipeline = CreateResilientPipeline(flakySource);
var context = PipelineContext.WithRetry(new PipelineRetryOptions(
MaxItemRetries: 3,
MaxNodeRestartAttempts: 2,
MaxSequentialNodeAttempts: 5,
MaxMaterializedItems: 1000
));
var result = await pipeline.RunAsync(context);
Assert.IsTrue(result.IsSuccess);
Assert.Greater(flakySource.ProcessedItems, 0);
}
public async Task TestBufferOverflowScenario()
{
var highVolumeSource = new HighVolumeSource(itemsPerSecond: 1000);
var pipeline = CreateResilientPipeline(highVolumeSource);
        var context = PipelineContext.WithRetry(new PipelineRetryOptions(
            MaxItemRetries: 3,
            MaxNodeRestartAttempts: 2,
            MaxSequentialNodeAttempts: 5,
            MaxMaterializedItems: 100 // Small buffer to trigger overflow
        ));
try
{
await pipeline.RunAsync(context);
Assert.Fail("Expected buffer overflow exception");
}
        catch (InvalidOperationException ex) when (ex.Message.Contains("MaxMaterializedItems"))
{
// Expected exception
}
}
}
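FlakyDataSource and HighVolumeSource are test doubles you provide. A minimal FlakyDataSource sketch; how it plugs into a source node depends on NPipeline's source interface:

public class FlakyDataSource
{
    private readonly double _failureRate;
    private readonly Random _random = new();

    public FlakyDataSource(double failureRate) => _failureRate = failureRate;

    public int ProcessedItems { get; private set; }

    // Yields 100 items, throwing a transient-style failure at the given rate.
    public async IAsyncEnumerable<int> ReadAsync()
    {
        for (var i = 0; i < 100; i++)
        {
            if (_random.NextDouble() < _failureRate)
                throw new TimeoutException($"Simulated transient failure at item {i}");

            ProcessedItems++;
            yield return i;
            await Task.Yield();
        }
    }
}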
Common Anti-Patterns and Solutions
Anti-Pattern 1: Blind Retry Everything
// WRONG: Retry everything, including permanent failures
public class BlindRetryHandler : IPipelineErrorHandler
{
public Task<PipelineErrorDecision> HandleNodeFailureAsync(
string nodeId, Exception error, PipelineContext context, CancellationToken ct)
{
return Task.FromResult(PipelineErrorDecision.RestartNode); // Always restarts!
}
}
// CORRECT: Smart error classification
public class SmartRetryHandler : IPipelineErrorHandler
{
public Task<PipelineErrorDecision> HandleNodeFailureAsync(
string nodeId, Exception error, PipelineContext context, CancellationToken ct)
{
return error switch
{
// Transient failures - retry
TimeoutException => Task.FromResult(PipelineErrorDecision.RestartNode),
NetworkException => Task.FromResult(PipelineErrorDecision.RestartNode),
HttpRequestException http when IsTransientHttpError(http) =>
Task.FromResult(PipelineErrorDecision.RestartNode),
// Permanent failures - don't retry
AuthenticationException => Task.FromResult(PipelineErrorDecision.FailPipeline),
ValidationException => Task.FromResult(PipelineErrorDecision.Skip),
NotFoundException => Task.FromResult(PipelineErrorDecision.ContinueWithoutNode),
// Unknown failures - fail safe
_ => Task.FromResult(PipelineErrorDecision.FailPipeline)
};
}
private bool IsTransientHttpError(HttpRequestException ex)
{
return ex.StatusCode is HttpStatusCode.ServiceUnavailable or
HttpStatusCode.RequestTimeout or
HttpStatusCode.TooManyRequests;
}
}
Anti-Pattern 2: Unbounded Buffering
// WRONG: Unbounded materialization
var options = new PipelineRetryOptions(
MaxItemRetries: 3,
MaxNodeRestartAttempts: 2,
MaxSequentialNodeAttempts: 5,
MaxMaterializedItems: null // No limit - potential OOM
);
// CORRECT: Calculated buffer limits
var options = new PipelineRetryOptions(
MaxItemRetries: 3,
MaxNodeRestartAttempts: 2,
MaxSequentialNodeAttempts: 5,
MaxMaterializedItems: CalculateSafeBufferLimit()
);
private static int CalculateSafeBufferLimit()
{
    // GC.GetTotalMemory reports allocated managed memory, not true available
    // memory; treat this calculation as a rough heuristic.
    var availableMemoryMB = GC.GetTotalMemory(false) / (1024 * 1024);
    var estimatedItemSizeKB = 10; // Estimate based on your data
    var memoryBudgetMB = Math.Min(availableMemoryMB / 4, 1000); // Max 1GB
    return (int)((memoryBudgetMB * 1024) / estimatedItemSizeKB);
}
Anti-Pattern 3: Ignoring Memory Pressure
// WRONG: No memory awareness
public class MemoryObliviousHandler : IPipelineErrorHandler
{
public Task<PipelineErrorDecision> HandleNodeFailureAsync(
string nodeId, Exception error, PipelineContext context, CancellationToken ct)
{
return Task.FromResult(PipelineErrorDecision.RestartNode); // Always restarts
}
}
// CORRECT: Memory-aware error handling
public class MemoryAwareHandler : IPipelineErrorHandler
{
public Task<PipelineErrorDecision> HandleNodeFailureAsync(
string nodeId, Exception error, PipelineContext context, CancellationToken ct)
{
// Check memory pressure
var memoryMB = GC.GetTotalMemory(false) / (1024 * 1024);
if (memoryMB > 2000) // 2GB threshold
{
Console.WriteLine($"High memory usage ({memoryMB}MB) - avoiding restart");
return Task.FromResult(PipelineErrorDecision.ContinueWithoutNode);
}
return error switch
{
TimeoutException => Task.FromResult(PipelineErrorDecision.RestartNode),
_ => Task.FromResult(PipelineErrorDecision.FailPipeline)
};
}
}
Anti-Pattern 4: One-Size-Fits-All Configuration
// WRONG: Same configuration for all nodes
var defaultOptions = new PipelineRetryOptions(
MaxItemRetries: 3,
MaxNodeRestartAttempts: 2,
MaxSequentialNodeAttempts: 5,
MaxMaterializedItems: 1000
);
var context = PipelineContext.WithRetry(defaultOptions);
// CORRECT: Node-specific configuration using per-node retry options
var betterDefaultOptions = new PipelineRetryOptions(
MaxItemRetries: 2,
MaxNodeRestartAttempts: 1,
MaxSequentialNodeAttempts: 3,
MaxMaterializedItems: 500
);
var betterContext = PipelineContext.WithRetry(betterDefaultOptions);
var criticalNodeHandle = builder
.AddTransform<CriticalTransform, Input, Output>("criticalNode");
var highVolumeHandle = builder
.AddTransform<HighVolumeTransform, Input, Output>("highVolumeNode");
builder
.WithRetryOptions(criticalNodeHandle, new PipelineRetryOptions(
MaxItemRetries: 5,
MaxNodeRestartAttempts: 5,
MaxSequentialNodeAttempts: 10,
MaxMaterializedItems: 2000
))
.WithRetryOptions(highVolumeHandle, new PipelineRetryOptions(
MaxItemRetries: 1,
MaxNodeRestartAttempts: 2,
MaxSequentialNodeAttempts: 4,
MaxMaterializedItems: 10000
));
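The asymmetry is deliberate: the critical node buys durability with generous retry and restart budgets, while the high-volume node keeps per-item retries minimal to protect throughput and relies on a larger buffer to survive restarts.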
Monitoring and Alerting
1. Key Metrics to Monitor
public class ResilienceMetrics
{
private readonly IMetrics _metrics;
public ResilienceMetrics(IMetrics metrics)
{
_metrics = metrics;
}
public void RecordNodeRestart(string nodeId)
{
_metrics.Counter("node_restarts", new[] { ("node_id", nodeId) }).Increment();
}
public void RecordBufferUsage(string nodeId, int current, int max)
{
var usagePercent = (current * 100) / max;
_metrics.Gauge("buffer_usage_percent", usagePercent, new[] { ("node_id", nodeId) });
}
public void RecordMemoryUsage(long bytes)
{
_metrics.Gauge("memory_usage_bytes", bytes);
}
public void RecordCircuitBreakerTrip(string nodeId)
{
_metrics.Counter("circuit_breaker_trips", new[] { ("node_id", nodeId) }).Increment();
}
}
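To populate these metrics, bridge them from the observer callbacks shown earlier. A sketch using the same IExecutionObserver hooks:

public class MetricsObserver : IExecutionObserver
{
    private readonly ResilienceMetrics _metrics;

    public MetricsObserver(ResilienceMetrics metrics) => _metrics = metrics;

    public void OnRetry(NodeRetryEvent retryEvent)
    {
        if (retryEvent.RetryKind == RetryKind.NodeRestart)
            _metrics.RecordNodeRestart(retryEvent.NodeId);
    }

    public void OnBufferUsage(string nodeId, int currentItems, int maxItems)
    {
        _metrics.RecordBufferUsage(nodeId, currentItems, maxItems);
        _metrics.RecordMemoryUsage(GC.GetTotalMemory(false));
    }
}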
2. Alert Thresholds
| Metric | Warning Threshold | Critical Threshold | Action |
|---|---|---|---|
| Node restarts per minute | >5 | >10 | Investigate infrastructure |
| Buffer usage percent | >80% | >95% | Increase buffer size |
| Memory usage | >1GB | >2GB | Scale horizontally |
| Circuit breaker trips per hour | >2 | >5 | Review error classification |
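How alerts are delivered depends on your monitoring stack; a minimal in-process sketch that mirrors the thresholds in the table:

public static class AlertRules
{
    public static IEnumerable<string> Evaluate(
        int restartsPerMinute, int bufferUsagePercent, long memoryMB, int tripsPerHour)
    {
        if (restartsPerMinute > 10) yield return "CRITICAL: node restarts >10/min";
        else if (restartsPerMinute > 5) yield return "WARNING: node restarts >5/min";

        if (bufferUsagePercent > 95) yield return "CRITICAL: buffer usage >95%";
        else if (bufferUsagePercent > 80) yield return "WARNING: buffer usage >80%";

        if (memoryMB > 2000) yield return "CRITICAL: memory >2GB";
        else if (memoryMB > 1000) yield return "WARNING: memory >1GB";

        if (tripsPerHour > 5) yield return "CRITICAL: circuit breaker trips >5/h";
        else if (tripsPerHour > 2) yield return "WARNING: circuit breaker trips >2/h";
    }
}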
3. Health Checks
public class ResilienceHealthCheck : IHealthCheck
{
    public Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context, CancellationToken cancellationToken = default)
    {
        var memoryMB = GC.GetTotalMemory(false) / (1024 * 1024);
        var issues = new List<string>();
        if (memoryMB > 2000)
            issues.Add($"High memory usage: {memoryMB}MB");
        var failedNodes = GetFailedNodeCount(); // app-specific helper
        if (failedNodes > 5)
            issues.Add($"High failure rate: {failedNodes} failed nodes");
        return Task.FromResult(issues.Count == 0
            ? HealthCheckResult.Healthy()
            : HealthCheckResult.Degraded(string.Join(", ", issues)));
    }
}
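In an ASP.NET Core host, the check registers through the standard health-check builder (Microsoft.Extensions.Diagnostics.HealthChecks):

services.AddHealthChecks()
    .AddCheck<ResilienceHealthCheck>("npipeline-resilience");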
Next Steps
- Configuration Guide: Review proper configuration patterns
- Dependency Chains: Understand critical prerequisite relationships
- Error Codes Reference: Look up specific NPipeline error codes (NP01xx-NP05xx)