Node-Level Error Handling
Node-level error handling allows you to define what happens to problematic items without affecting the entire pipeline. When an error occurs during the processing of an individual item in a node, NPipeline invokes your error handler to determine how to proceed.
INodeErrorHandler Interface
To handle errors within a specific node, implement the INodeErrorHandler<in TNode, in TData> interface.
public interface INodeErrorHandler
{
}
/// <summary>
/// Defines the contract for handling errors that occur within a specific node.
/// </summary>
/// <typeparam name="TNode">The type of node where the error occurred.</typeparam>
/// <typeparam name="TData">The type of the data item that failed.</typeparam>
public interface INodeErrorHandler<in TNode, in TData> : INodeErrorHandler where TNode : INode
{
/// <summary>
/// Handles an error that occurred during node execution.
/// </summary>
/// <param name="node">The instance of node that failed.</param>
/// <param name="failedItem">The data item that caused the error.</param>
/// <param name="error">The exception that was thrown.</param>
/// <param name="context">The current pipeline context.</param>
/// <param name="cancellationToken">A token to observe for cancellation requests.</param>
/// <returns>A <see cref="NodeErrorDecision" /> indicating how to proceed.</returns>
Task<NodeErrorDecision> HandleAsync(
TNode node,
TData failedItem,
Exception error,
PipelineContext context,
CancellationToken cancellationToken);
}
Interface Components
INodeErrorHandler: Marker interface for dependency injection registration.INodeErrorHandler<in TNode, in TData>: Generic interface defining the actual error handling logic.TNode: The type of node where the error occurred.TData: The type of the data item that caused the error.HandleAsync: Called when an error occurs. Receives the failing node, item, exception, and pipeline context. Must return aNodeErrorDecision.
NodeErrorDecision Enum
This enum dictates how the pipeline should proceed after a node-level error:
Skip: The failed item is discarded, and the pipeline continues processing subsequent items.Retry: The pipeline attempts to re-process the failed item. The number of retries is configured viaPipelineRetryOptions.DeadLetter: The failed item is sent to a configured dead-letter sink, and the pipeline continues.Fail: The pipeline immediately terminates with an exception.
Implementing a Custom Node Error Handler
Here's a basic example:
using NPipeline;
using NPipeline.ErrorHandling;
using NPipeline.Nodes;
using NPipeline.Pipeline;
/// <summary>
/// Custom node error handler for transform nodes processing string data.
/// Demonstrates error classification and appropriate response strategies.
/// </summary>
public sealed class MyNodeErrorHandler : INodeErrorHandler<ITransformNode<string, string>, string>
{
private readonly ILogger _logger;
public MyNodeErrorHandler(ILogger logger)
{
_logger = logger;
}
/// <summary>
/// Handles errors that occur during string transformation.
/// Implements different strategies based on error type for optimal recovery.
/// </summary>
public Task<NodeErrorDecision> HandleAsync(
ITransformNode<string, string> node,
string failedItem,
Exception error,
PipelineContext context,
CancellationToken cancellationToken)
{
// Log error with full context for troubleshooting
_logger.LogError(error, "Error in node '{NodeName}' processing '{FailedItem}': {ErrorMessage}",
node.Name, failedItem, error.Message);
// Choose error handling strategy based on exception type
return error switch
{
// Data format errors are permanent - send to dead letter queue
FormatException => Task.FromResult(NodeErrorDecision.DeadLetter),
// Items marked for retry get another chance
_ when failedItem.Contains("retry") => Task.FromResult(NodeErrorDecision.Retry),
// All other errors are skipped to continue processing
_ => Task.FromResult(NodeErrorDecision.Skip)
};
}
}
Registering a Node Error Handler
Register your error handler with the PipelineBuilder:
using NPipeline;
using NPipeline.ErrorHandling;
using NPipeline.Pipeline;
public sealed class ErrorHandlingPipelineDefinition : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var sourceHandle = builder.AddSource<DataSource, string>();
var transformHandle = builder.AddTransform<DataTransform, string, string>();
var sinkHandle = builder.AddSink<DataSink, string>();
builder.Connect(sourceHandle, transformHandle);
builder.Connect(transformHandle, sinkHandle);
// Configure retry options
builder.WithRetryOptions(new PipelineRetryOptions(
MaxItemRetries: 3,
MaxNodeRestartAttempts: 2,
MaxSequentialNodeAttempts: 5
));
}
}
Also register with your DI container:
services.AddSingleton<INodeErrorHandler<ITransformNode<string, string>, string>, MyNodeErrorHandler>();
The marker interface INodeErrorHandler (non-generic) is used for DI registration, allowing the container to discover all node error handler implementations.
Common Scenarios
Scenario 1: Handling Transient Network Errors
public class NetworkErrorHandler : INodeErrorHandler<IApiTransformNode, string>
{
private readonly ILogger _logger;
private int _retryCount = 0;
public NetworkErrorHandler(ILogger logger)
{
_logger = logger;
}
/// <summary>
/// Handles network errors with exponential backoff retry strategy.
/// Transient errors are retried, persistent failures are redirected.
/// </summary>
public Task<NodeErrorDecision> HandleAsync(
IApiTransformNode node,
string failedItem,
Exception error,
PipelineContext context,
CancellationToken cancellationToken)
{
// Handle network-related errors specifically
if (error is HttpRequestException httpEx)
{
_retryCount++;
_logger.LogWarning("Network error (attempt {RetryCount}): {ErrorMessage}",
_retryCount, httpEx.Message);
// Retry up to 3 times for transient network errors
if (_retryCount <= 3)
{
return Task.FromResult(NodeErrorDecision.Retry);
}
else
{
// After max retries, reset counter and redirect to dead letter
_retryCount = 0;
return Task.FromResult(NodeErrorDecision.DeadLetter);
}
}
// Non-network errors are skipped to continue processing
return Task.FromResult(NodeErrorDecision.Skip);
}
}
Scenario 2: Data Validation Errors
public class ValidationErrorHandler : INodeErrorHandler<IValidatorNode, string>
{
private readonly ILogger _logger;
public ValidationErrorHandler(ILogger logger)
{
_logger = logger;
}
/// <summary>
/// Handles validation errors by redirecting to dead letter queue.
/// Data quality issues are logged separately from system errors.
/// </summary>
public Task<NodeErrorDecision> HandleAsync(
IValidatorNode node,
string failedItem,
Exception error,
PipelineContext context,
CancellationToken cancellationToken)
{
// Handle validation errors specifically
if (error is ValidationException validationEx)
{
_logger.LogWarning("Validation failed for item: {Item}. Error: {Error}",
failedItem, validationEx.Message);
// Validation failures indicate data quality issues - redirect for manual review
return Task.FromResult(NodeErrorDecision.DeadLetter);
}
// Other types of errors are skipped to continue processing
return Task.FromResult(NodeErrorDecision.Skip);
}
}
Scenario 3: Production-Ready Error Handler with Metrics Integration
public class ProductionNodeErrorHandler : INodeErrorHandler<ITransformNode<string, string>, string>
{
private readonly ILogger _logger;
private readonly IMetrics _metrics;
public ProductionNodeErrorHandler(ILogger logger, IMetrics metrics)
{
_logger = logger;
_metrics = metrics;
}
/// <summary>
/// Handles errors with comprehensive logging and metrics collection.
/// Enables monitoring and alerting for production environments.
/// </summary>
public Task<NodeErrorDecision> HandleAsync(
ITransformNode<string, string> node,
string failedItem,
Exception error,
PipelineContext context,
CancellationToken cancellationToken)
{
// Record error metrics for monitoring
_metrics.Increment("node_errors", new[] {
new KeyValuePair<string, object>("node_type", node.GetType().Name),
new KeyValuePair<string, object>("error_type", error.GetType().Name)
});
// Log error with full context
_logger.LogError(error, "Error processing item in node {NodeName}", node.Name);
// Implement error handling strategy based on exception type
return error switch
{
// Data validation errors - redirect to dead letter
ValidationException => Task.FromResult(NodeErrorDecision.DeadLetter),
// Transient errors - retry
TimeoutException => Task.FromResult(NodeErrorDecision.Retry),
HttpRequestException => Task.FromResult(NodeErrorDecision.Retry),
// All other errors - skip
_ => Task.FromResult(NodeErrorDecision.Skip)
};
}
}
Fluent Error Handler Builder
For simpler scenarios, use the fluent builder API to construct error handlers inline:
using NPipeline.ErrorHandling;
// Create a handler that retries on timeout, skips on validation errors
var handler = ErrorHandler.ForNode<MyTransform, string>()
.On<TimeoutException>().Retry(3)
.On<ValidationException>().Skip()
.OnAny().DeadLetter()
.Build();
Pre-built Handler Factories
// Retry all errors up to N times, then dead-letter
var retryHandler = ErrorHandler.RetryAlways<MyTransform, string>(maxRetries: 3);
// Skip all errors and continue processing
var skipHandler = ErrorHandler.SkipAlways<MyTransform, string>();
// Send all errors to dead-letter sink
var deadLetterHandler = ErrorHandler.DeadLetterAlways<MyTransform, string>();
Exception Type Matching
Rules are evaluated in order, so place more specific rules first:
var handler = ErrorHandler.ForNode<MyTransform, string>()
.On<TimeoutException>().Retry(3)
.On<IOException>().Retry(5)
.On<ArgumentException>().Skip()
.On<InvalidOperationException>().Fail()
.OnAny().DeadLetter() // Catch-all (must be last)
.Build();
⚠️ Important: OnAny() must be the last rule because it matches all exceptions.
Custom Predicate Matching
For complex scenarios, use custom predicates:
var handler = ErrorHandler.ForNode<MyTransform, string>()
.When(ex => ex.Message.Contains("timeout", StringComparison.OrdinalIgnoreCase))
.Retry(3)
.When(ex => ex.Message.Contains("invalid", StringComparison.OrdinalIgnoreCase))
.Skip()
.OnAny().Fail()
.Build();
Default Behavior
Use Otherwise() for a default behavior when no rules match:
var handler = ErrorHandler.ForNode<MyTransform, string>()
.On<TimeoutException>().Retry(2)
.Otherwise(NodeErrorDecision.Skip) // Default for unmatched exceptions
.Build();
Fluent Builder Best Practices
- Rule Evaluation Order: Rules are evaluated in order added. First match wins.
- Retry Counting: Automatically tracks retry attempts and transitions to dead-letter when exhausted.
- Type Hierarchy: Exception matching respects inheritance -
On<ArgumentException>()matches derived types. - Catch-All Pattern: Use
OnAny()only at the end as a catch-all.
When to Use Each Approach
Use Fluent Builder When:
- ✅ Simple error handling logic with clear exception-to-action mapping
- ✅ Prototyping or quick implementations
- ✅ Straightforward retry/skip/dead-letter strategies
Implement INodeErrorHandler When:
- ✅ Complex state management across multiple items
- ✅ Advanced logging, metrics, or custom recovery logic
- ✅ Error handling that requires dependency injection
- ✅ Need access to custom services or configuration
Best Practices
-
Be specific about error types: Different error types should be handled differently. Transient errors might be worth retrying, while data validation errors should probably be redirected.
-
Implement retry limits: Always limit the number of retries to prevent infinite loops and resource exhaustion.
-
Log detailed error information: Include sufficient context in your error logs to help with troubleshooting.
-
Use dead-letter queues for problematic items: Items that consistently fail should be redirected to a dead-letter queue for later analysis.
-
Consider performance implications: Error handling logic adds overhead to normal processing, so keep it efficient.
Related Documentation
- Error Handling Overview - Understand both levels of error handling
- Pipeline-Level Error Handling - Handle stream/node failures
- Retries - Configure retry policies and strategies
- Dead Letter Queues - Route problematic items for analysis