Error Handling Architecture

This page provides a deep dive into how errors propagate and are handled throughout NPipeline. Understanding error handling architecture helps you reason about error propagation, containment strategies, and integration patterns.

NPipeline provides multiple strategies for handling errors that occur during pipeline execution. The key insight is that where the error happens determines what you can do about it:

  • Item-level errors (one order failed validation) → skip it, retry it, or store it for review
  • Stream-level errors (database connection died) → restart the node, bypass it, or fail the pipeline

For practical implementation guidance, see Error Handling.

Error Propagation

By default, errors propagate up the pipeline and stop execution. This is often the right behavior for critical failures, but may be too harsh for transient errors or data validation issues.

var sourcePipe = source.Initialize(context, ct);              // Yields 100 items
var transformPipe = new TransformPipe(sourcePipe, transform); // Processing...

// Error occurs on item #50
try
{
    await foreach (var item in transformPipe.WithCancellation(ct))
    {
        await sink.ProcessAsync(item);
    }
}
catch (InvalidOperationException ex)
{
    // Error caught here - items 51-100 are never processed.
    // This is appropriate for critical errors (e.g., a corrupted data stream),
    // but maybe not for validation failures on individual items.
}

When fail-fast is right: Database connection errors, invalid stream format, system resource exhaustion

When you want containment instead: Validation failures, temporary network issues, item-level data problems
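A small helper can make this split explicit. The sketch below is illustrative and not part of NPipeline's API; which exception types you treat as transient depends on your workload:

```csharp
using System;
using System.Net.Http;

// Decide whether a failure is worth containing or retrying (transient)
// versus one that should fail the pipeline fast (everything else).
static class FailureClassifier
{
    public static bool IsTransient(Exception ex) =>
        ex is TimeoutException        // temporary network or I/O stall
            or HttpRequestException;  // external service hiccup
}
```

A containment wrapper or retry policy can then consult FailureClassifier.IsTransient before deciding whether to rethrow.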

Error Containment

Contain errors within a node to prevent pipeline failure. This is how you convert a single item failure into a recoverable problem.

public class SafeTransform : ITransformNode<Input, Output>
{
    private readonly ITransformNode<Input, Output> _inner;
    private readonly ILogger<SafeTransform> _logger;

    public SafeTransform(
        ITransformNode<Input, Output> inner,
        ILogger<SafeTransform> logger)
    {
        _inner = inner;
        _logger = logger;
    }

    public async Task<Output> ExecuteAsync(
        Input item,
        PipelineContext context,
        CancellationToken cancellationToken)
    {
        try
        {
            return await _inner.ExecuteAsync(item, context, cancellationToken);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing item: {@Input}", item);
            // Rethrow or return a default value, depending on strategy
            throw;
        }
    }
}

Use this pattern when: A validation check fails, a cache lookup times out, or an external service responds with an error for one item (but should work for others)
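Wiring the wrapper is a one-line change at composition time. This sketch assumes SafeTransform is written generically (SafeTransform<TIn, TOut>) and uses a hypothetical ParseOrderTransform as the inner node:

```csharp
// Decorate the real transform; the pipeline only ever sees the wrapper.
// ParseOrderTransform is a hypothetical ITransformNode<Order, ProcessedOrder>.
var inner = new ParseOrderTransform();
var safe = new SafeTransform<Order, ProcessedOrder>(
    inner,
    loggerFactory.CreateLogger<SafeTransform<Order, ProcessedOrder>>());

// Register `safe` in place of `inner` when building the pipeline.
```

Because the wrapper implements the same ITransformNode interface, it can be substituted anywhere the inner transform was used.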

Dead-Letter Handling

Route failed items to a dead-letter sink configured in the pipeline context:

// Configure a dead-letter sink when building the pipeline context
var deadLetterSink = new FileDeadLetterSink("dead-letters.json");
var context = new PipelineContext(
    new PipelineContextConfiguration(DeadLetterSink: deadLetterSink));

// In a transform node, use INodeErrorHandler to route failed items
public class OrderTransform : ITransformNode<Order, ProcessedOrder>
{
    public INodeErrorHandler? ErrorHandler { get; set; }

    public async Task<ProcessedOrder> ExecuteAsync(
        Order item,
        PipelineContext context,
        CancellationToken cancellationToken)
    {
        try
        {
            // Validate and process the order
            if (item.Amount <= 0)
                throw new InvalidOperationException("Invalid order amount");

            return new ProcessedOrder { /* ... */ };
        }
        catch (Exception ex)
        {
            // The error handler can route the item to the dead-letter sink
            ErrorHandler?.Handle(item, ex, context);
            throw; // Or return a default value, depending on strategy
        }
    }
}
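The error handler wired above might simply forward failed items to the configured sink. The Handle and WriteAsync signatures below are assumptions for illustration, not NPipeline's documented API:

```csharp
// A minimal handler that routes failed items to the context's
// dead-letter sink (signatures assumed for this sketch).
public class DeadLetterErrorHandler : INodeErrorHandler
{
    public void Handle(object item, Exception ex, PipelineContext context)
    {
        var sink = context.Configuration.DeadLetterSink;
        // Fire-and-forget for brevity; production code may want to await
        // or queue the write so dead-letter records cannot be lost silently.
        sink?.WriteAsync(item, ex);
    }
}
```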

Retry Patterns

Configure retry behavior using PipelineRetryOptions:

// Global retry configuration
var builder = new PipelineBuilder();
builder.ConfigureRetry(new PipelineRetryOptions
{
    MaxRetries = 3,
    InitialDelayMs = 100,
    MaxDelayMs = 5000,
    BackoffMultiplier = 2.0
});

// Or per-node retry configuration
var nodeRetryOptions = new PipelineRetryOptions
{
    MaxRetries = 3,
    RetryDelay = TimeSpan.FromSeconds(1),
    ShouldRetry = exception =>
        exception is TimeoutException or HttpRequestException
};
builder.ConfigureNodeRetry<FetchInventoryTransform>(nodeRetryOptions);

var pipeline = builder
    .AddSourceNode<OrderSourceNode>()
    .AddTransformNode<FetchInventoryTransform>() // May fail transiently
    .AddTransformNode<ProcessOrderTransform>()
    .AddSinkNode<OrderSinkNode>()
    .BuildPipeline();
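With the global options above, the delay before each attempt grows geometrically and is capped by MaxDelayMs. The arithmetic, sketched independently of NPipeline internals:

```csharp
using System;

// Delay before retry attempt n (1-based) under exponential backoff.
static int DelayForAttempt(int attempt, int initialMs = 100,
                           double multiplier = 2.0, int maxMs = 5000)
{
    var delay = initialMs * Math.Pow(multiplier, attempt - 1);
    return (int)Math.Min(delay, maxMs);
}

// With MaxRetries = 3, the attempts wait 100 ms, 200 ms, and 400 ms.
```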

Error Context and Lineage

Attribute errors to a specific node using the current node ID, and enable lineage tracking to see an item's full path through the pipeline:

// Access current node information during error handling
public override async Task<ProcessedOrder> ExecuteAsync(
    Order item,
    PipelineContext context,
    CancellationToken cancellationToken)
{
    try
    {
        // Process the order
        return await ProcessAsync(item, cancellationToken);
    }
    catch (Exception ex)
    {
        var currentNodeId = context.CurrentNodeId;

        // Log the complete error with context
        logger.LogError(
            ex,
            "Error at node {NodeId}: {Error}",
            currentNodeId,
            ex.Message);

        throw;
    }
}

// Enable item-level lineage tracking to see every node that has processed an item
var builder = new PipelineBuilder();
builder.EnableItemLevelLineage();

Supporting Components

Materialization Node

Buffer entire streams to catch downstream errors early:

// Materialize (collect all items) to detect errors before processing
var materialized = new MaterializationNode<Order>();
var pipeline = new PipelineBuilder()
    .AddSourceNode<OrderSourceNode>()
    .AddNode(materialized) // Buffers all orders
    .AddTransformNode<ValidateOrderTransform>()
    .AddSinkNode<OrderSinkNode>()
    .BuildPipeline();

Stateful Registry

Maintain error state across pipeline executions:

var registry = new StatefulRegistry();

for (int i = 0; i < 5; i++)
{
    try
    {
        await runner.ExecuteAsync(pipeline, context);
    }
    catch (Exception ex)
    {
        registry.RecordError(ex);
    }
}

var stats = registry.GetErrorStatistics();
logger.LogInformation("Total errors: {Count}", stats.ErrorCount);

Error Handling Strategies

Strategy          | Best For                 | Trade-offs
------------------|--------------------------|---------------------------------
Fail Fast         | Data quality critical    | May lose unprocessed items
Skip Errors       | Best-effort processing   | Silent failures may hide bugs
Dead-Letter       | Audit trail required     | Added storage overhead
Retry             | Transient failures       | Delayed processing, retry storms
Materialize First | Need all data or nothing | Memory overhead

Next Steps