Error Handling Architecture
This page provides a deep dive into how errors propagate and are handled throughout NPipeline. Understanding the error handling architecture helps you reason about error propagation, containment strategies, and integration patterns.
NPipeline provides multiple strategies for handling errors that occur during pipeline execution. The key insight is that where an error happens determines what you can do about it:
- Item-level errors (one order failed validation) → skip it, retry it, or store it for review
- Stream-level errors (database connection died) → restart the node, bypass it, or fail the pipeline
For practical implementation guidance, see Error Handling.
Error Propagation
By default, errors propagate up the pipeline and stop execution. This is often the right behavior for critical failures, but may be too harsh for transient errors or data validation issues.
    var sourcePipe = source.Initialize(context, ct); // Returns 100 items (synchronous)
    var transformPipe = new TransformPipe(sourcePipe, transform); // Processing...
    // Error occurs on item #50
    try
    {
        await foreach (var item in transformPipe.WithCancellation(ct))
        {
            await sink.ProcessAsync(item);
        }
    }
    catch (InvalidOperationException ex)
    {
        // Error caught here - items 51-100 never processed
        // This is appropriate for critical errors (e.g., corrupted data stream)
        // But maybe not for validation failures on individual items
    }
When fail-fast is right: Database connection errors, invalid stream format, system resource exhaustion
When you want containment instead: Validation failures, temporary network issues, item-level data problems
Error Containment
Contain errors within a node to prevent pipeline failure. This is how you convert a single item failure into a recoverable problem.
    public class SafeTransform : ITransformNode<Input, Output>
    {
        private readonly ITransformNode<Input, Output> _inner;
        private readonly ILogger<SafeTransform> _logger;

        public SafeTransform(
            ITransformNode<Input, Output> inner,
            ILogger<SafeTransform> logger)
        {
            _inner = inner;
            _logger = logger;
        }

        public async Task<Output> ExecuteAsync(
            Input item,
            PipelineContext context,
            CancellationToken cancellationToken)
        {
            try
            {
                return await _inner.ExecuteAsync(item, context, cancellationToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing item: {@input}", item);
                // Rethrowing still fails the pipeline; to contain the error,
                // return a fallback value here instead, depending on strategy
                throw;
            }
        }
    }
Use this pattern when: a validation check fails, a cache lookup times out, or an external service returns an error for one item but is expected to work for others.
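To make the "return something instead of rethrowing" option concrete, here is a minimal, framework-independent sketch of per-item containment. `Containment` and `TransformAll` are hypothetical names invented for this illustration and are not part of NPipeline; the sketch only assumes that a failure can be recorded next to the item that caused it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration only; not an NPipeline API.
public static class Containment
{
    // Applies `transform` to every item. A failure is recorded together with
    // the offending item instead of propagating and ending the loop.
    public static (List<TOut> Outputs, List<(TIn Item, Exception Error)> Failures)
        TransformAll<TIn, TOut>(IEnumerable<TIn> items, Func<TIn, TOut> transform)
    {
        var outputs = new List<TOut>();
        var failures = new List<(TIn Item, Exception Error)>();

        foreach (var item in items)
        {
            try
            {
                outputs.Add(transform(item));
            }
            catch (Exception ex)
            {
                // Contained: the remaining items are still processed.
                failures.Add((item, ex));
            }
        }

        return (outputs, failures);
    }
}
```

For example, `Containment.TransformAll(new[] { 1, 0, 2 }, x => 10 / x)` would yield two outputs and one recorded failure for the item `0`, rather than stopping at the division by zero.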
Dead-Letter Handling
Route failed items to a dead-letter sink configured in the pipeline context:
    // Configure dead-letter sink when building pipeline context
    var deadLetterSink = new FileDeadLetterSink("dead-letters.json");
    var context = new PipelineContext(
        new PipelineContextConfiguration(DeadLetterSink: deadLetterSink));

    // In a transform node, use INodeErrorHandler to route failed items
    public class OrderTransform : ITransformNode<Order, ProcessedOrder>
    {
        public INodeErrorHandler? ErrorHandler { get; set; }

        public async Task<ProcessedOrder> ExecuteAsync(
            Order item,
            PipelineContext context,
            CancellationToken cancellationToken)
        {
            try
            {
                // Validate and process order
                if (item.Amount <= 0)
                    throw new InvalidOperationException("Invalid order amount");
                return new ProcessedOrder { /* ... */ };
            }
            catch (Exception ex)
            {
                // Error handler can route to dead-letter sink
                ErrorHandler?.Handle(item, ex, context);
                throw; // Or return default value depending on strategy
            }
        }
    }
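The idea behind a dead-letter sink can be sketched without any framework: failed items are set aside with enough context to review them later, while valid items continue. The types below (`DeadLetterEntry`, `InMemoryDeadLetterSink`, `DeadLetterDemo`) and the node ID `"validate-order"` are hypothetical and exist only for this illustration; they do not describe how NPipeline's own sinks are implemented.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical record of one failed item; not an NPipeline type.
public sealed class DeadLetterEntry
{
    public DeadLetterEntry(string nodeId, object item, Exception error)
    {
        NodeId = nodeId;
        Item = item;
        Error = error;
    }

    public string NodeId { get; }
    public object Item { get; }
    public Exception Error { get; }
}

// Hypothetical in-memory sink; a real sink would persist entries somewhere durable.
public sealed class InMemoryDeadLetterSink
{
    private readonly ConcurrentQueue<DeadLetterEntry> _entries =
        new ConcurrentQueue<DeadLetterEntry>();

    public void Record(string nodeId, object item, Exception error) =>
        _entries.Enqueue(new DeadLetterEntry(nodeId, item, error));

    // Snapshot of everything recorded so far.
    public IReadOnlyCollection<DeadLetterEntry> Entries => _entries.ToArray();
}

public static class DeadLetterDemo
{
    // Accepts positive amounts; routes invalid ones to the sink and keeps going.
    public static List<decimal> ProcessAmounts(
        IEnumerable<decimal> amounts, InMemoryDeadLetterSink sink)
    {
        var accepted = new List<decimal>();
        foreach (var amount in amounts)
        {
            try
            {
                if (amount <= 0)
                    throw new InvalidOperationException("Invalid order amount");
                accepted.Add(amount);
            }
            catch (InvalidOperationException ex)
            {
                sink.Record("validate-order", amount, ex);
            }
        }
        return accepted;
    }
}
```

The trade-off is the storage overhead: every failed item is retained along with its exception until someone reviews or replays it.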
Retry Patterns
Configure retry behavior using PipelineRetryOptions:
    // Global retry configuration
    var builder = new PipelineBuilder();
    builder.ConfigureRetry(new PipelineRetryOptions
    {
        MaxRetries = 3,
        InitialDelayMs = 100,
        MaxDelayMs = 5000,
        BackoffMultiplier = 2.0
    });

    // Or per-node retry configuration
    var nodeRetryOptions = new PipelineRetryOptions
    {
        MaxRetries = 3,
        RetryDelay = TimeSpan.FromSeconds(1),
        ShouldRetry = (exception) =>
            exception is TimeoutException or HttpRequestException
    };
    builder.ConfigureNodeRetry<FetchInventoryTransform>(nodeRetryOptions);

    var pipeline = builder
        .AddSourceNode<OrderSourceNode>()
        .AddTransformNode<FetchInventoryTransform>() // May fail temporarily
        .AddTransformNode<ProcessOrderTransform>()
        .AddSinkNode<OrderSinkNode>()
        .BuildPipeline();
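To show how settings such as an initial delay, a maximum delay, and a backoff multiplier usually combine, here is a sketch of capped exponential backoff. It assumes the common formula `initial * multiplier^(attempt - 1)` capped at the maximum; whether NPipeline computes its delays exactly this way is an assumption, and `RetrySketch`, `DelayFor`, and `ExecuteAsync` are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration only; not an NPipeline API.
public static class RetrySketch
{
    // Delay before retry number `attempt` (1-based), capped at maxDelayMs.
    public static TimeSpan DelayFor(
        int attempt,
        int initialDelayMs = 100,
        int maxDelayMs = 5000,
        double multiplier = 2.0)
    {
        var ms = initialDelayMs * Math.Pow(multiplier, attempt - 1);
        return TimeSpan.FromMilliseconds(Math.Min(ms, maxDelayMs));
    }

    // Runs `operation`, retrying up to `maxRetries` times when `shouldRetry`
    // accepts the exception. Any other exception, or one thrown after the
    // last retry, propagates to the caller.
    public static async Task<T> ExecuteAsync<T>(
        Func<CancellationToken, Task<T>> operation,
        int maxRetries,
        Func<Exception, bool> shouldRetry,
        CancellationToken cancellationToken = default)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return await operation(cancellationToken);
            }
            catch (Exception ex) when (attempt <= maxRetries && shouldRetry(ex))
            {
                await Task.Delay(DelayFor(attempt), cancellationToken);
            }
        }
    }
}
```

With the defaults above, three retries wait 100 ms, 200 ms, and 400 ms; the 5000 ms cap only takes effect from the seventh retry onward (100 × 2^6 = 6400 ms). The exception filter is what keeps non-transient errors from being retried at all.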
Error Context and Lineage
Track errors using the current node ID and lineage tracking:
    // Access current node information during error handling
    public override async Task<ProcessedOrder> ExecuteAsync(
        Order item,
        PipelineContext context,
        CancellationToken cancellationToken)
    {
        try
        {
            // Process order
            return await ProcessAsync(item, cancellationToken);
        }
        catch (Exception ex)
        {
            var currentNodeId = context.CurrentNodeId;
            // Log complete error with context
            logger.LogError(
                ex,
                "Error at node {nodeId}: {error}",
                currentNodeId,
                ex.Message);
            throw;
        }
    }

    // Enable item-level lineage tracking to see all nodes that have processed an item
    var builder = new PipelineBuilder();
    builder.EnableItemLevelLineage();
Supporting Components
Materialization Node
Buffer an entire stream before passing it on, so that upstream errors surface before any downstream processing starts:
    // Materialize (collect all items) to detect errors before processing
    var materialized = new MaterializationNode<Order>();
    var pipeline = new PipelineBuilder()
        .AddSourceNode<OrderSourceNode>()
        .AddNode(materialized) // Buffers all orders
        .AddTransformNode<ValidateOrderTransform>()
        .AddSinkNode<OrderSinkNode>()
        .BuildPipeline();
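The effect of materializing first can be illustrated with plain `IAsyncEnumerable<T>`: the whole stream is drained into a buffer, so a failure anywhere in the stream is raised before any item is handed downstream. This is a conceptual sketch only; `MaterializeSketch`, `MaterializeAsync`, and `Numbers` are hypothetical names and do not describe how `MaterializationNode` is implemented.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper for illustration only; not an NPipeline API.
public static class MaterializeSketch
{
    // Drains the whole stream into memory. If the source throws part-way
    // through, the exception surfaces here and no buffer is returned.
    public static async Task<List<T>> MaterializeAsync<T>(IAsyncEnumerable<T> source)
    {
        var buffer = new List<T>();
        await foreach (var item in source)
        {
            buffer.Add(item);
        }
        return buffer;
    }

    // Sample source: yields 1..count, throwing when it reaches `failAt`
    // (pass 0 for a source that never fails).
    public static async IAsyncEnumerable<int> Numbers(int count, int failAt)
    {
        for (var i = 1; i <= count; i++)
        {
            if (i == failAt)
                throw new InvalidOperationException($"Item {i} failed");
            await Task.Yield();
            yield return i;
        }
    }
}
```

Awaiting `MaterializeAsync(Numbers(100, 50))` would throw before a single item reaches downstream code, which gives the all-or-nothing behavior at the cost of holding every item in memory.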
Stateful Registry
Maintain error state across pipeline executions:
    var registry = new StatefulRegistry();
    for (int i = 0; i < 5; i++)
    {
        try
        {
            await runner.ExecuteAsync(pipeline, context);
        }
        catch (Exception ex)
        {
            registry.RecordError(ex);
        }
    }
    var stats = registry.GetErrorStatistics();
    logger.LogInformation("Total errors: {count}", stats.ErrorCount);
Error Handling Strategies
| Strategy | Best For | Trade-offs |
|---|---|---|
| Fail Fast | Data quality critical | May lose unprocessed items |
| Skip Errors | Best-effort processing | Silent failures may hide bugs |
| Dead-Letter | Audit trail required | Added storage overhead |
| Retry | Transient failures | Delayed processing, retry storms |
| Materialize First | Need all data or nothing | Memory overhead |
Next Steps
- Cancellation Model - Learn how cancellation interacts with error handling
- Performance Characteristics - Understand error handling performance impact