Extension Points
NPipeline provides multiple ways to extend and customize its behavior without modifying the core framework.
Custom Nodes
Build your own source, transform, or sink nodes:
// Custom Source
public class DatabaseSourceNode : ISourceNode<Order>
{
private readonly string _connectionString;
public DatabaseSourceNode(string connectionString)
{
_connectionString = connectionString;
}
public async IAsyncEnumerable<Order> ExecuteAsync(
PipelineContext context,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
using var connection = new SqlConnection(_connectionString);
await connection.OpenAsync(cancellationToken);
using var command = new SqlCommand("SELECT * FROM Orders", connection);
using var reader = await command.ExecuteReaderAsync(cancellationToken);
while (await reader.ReadAsync(cancellationToken))
{
yield return new Order
{
Id = reader.GetInt32(0),
Amount = reader.GetDecimal(1),
// ...
};
}
}
}
// Custom Transform
public class EnrichmentTransform : ITransformNode<Order, EnrichedOrder>
{
private readonly IEnrichmentService _enrichmentService;
public EnrichmentTransform(IEnrichmentService enrichmentService)
{
_enrichmentService = enrichmentService;
}
public async Task<EnrichedOrder> ExecuteAsync(
Order input,
PipelineContext context,
CancellationToken cancellationToken)
{
var enrichedData = await _enrichmentService.EnrichAsync(input, cancellationToken);
return new EnrichedOrder
{
Order = input,
EnrichedData = enrichedData
};
}
}
// Custom Sink
public class MetricsCollectorSink : ISinkNode<Result>
{
private readonly IMetricsCollector _metrics;
public MetricsCollectorSink(IMetricsCollector metrics)
{
_metrics = metrics;
}
public async Task ExecuteAsync(
IDataPipe<Result> input,
PipelineContext context,
CancellationToken cancellationToken)
{
var activity = context.Tracer.CurrentActivity;
await foreach (var result in input.WithCancellation(cancellationToken))
{
_metrics.RecordSuccess(result.ProcessingTimeMs);
await Task.Delay(100); // Simulate processing
}
}
}
Build-Time Roslyn Analyzers
NPipeline includes built-in Roslyn analyzers that provide compile-time validation of your pipeline configurations. These analyzers detect common mistakes before they reach production.
Key Analyzer:
- NP9002 - Detects incomplete resilience configurations where
RestartNodeis used without required prerequisites
The analyzer framework is designed to be extensible, allowing community contributions of additional analyzers for NPipeline-specific patterns.
Learn more: Build-Time Resilience Analyzer Guide
Custom Execution Strategies
Create custom node execution strategies:
public class ThrottledExecutionStrategy : INodeExecutionStrategy
{
private readonly int _maxConcurrent;
private readonly SemaphoreSlim _semaphore;
public ThrottledExecutionStrategy(int maxConcurrent)
{
_maxConcurrent = maxConcurrent;
_semaphore = new SemaphoreSlim(maxConcurrent);
}
public async Task ExecuteAsync(
Func<CancellationToken, Task> work,
CancellationToken cancellationToken)
{
await _semaphore.WaitAsync(cancellationToken);
try
{
await work(cancellationToken);
}
finally
{
_semaphore.Release();
}
}
}
Context Data
Store and retrieve arbitrary data in pipeline context:
// Store data
var context = PipelineContext.Default;
context.Set("startTime", DateTime.UtcNow);
context.Set("userId", 12345);
context.Set("requestId", Guid.NewGuid());
// Access in transform
public async IAsyncEnumerable<Output> ProcessAsync(
Input input,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var startTime = context.Get<DateTime>("startTime");
var userId = context.Get<int>("userId");
var requestId = context.Get<Guid>("requestId");
yield return new Output
{
UserId = userId,
RequestId = requestId,
ProcessingTime = DateTime.UtcNow - startTime
};
}
Diagnostic Observers
Observe pipeline execution for diagnostics:
public class DiagnosticObserver : IPipelineDiagnosticObserver
{
private readonly ILogger<DiagnosticObserver> _logger;
public DiagnosticObserver(ILogger<DiagnosticObserver> logger)
{
_logger = logger;
}
public void OnNodeStarted(string nodeName)
{
_logger.LogInformation("Node started: {node}", nodeName);
}
public void OnNodeCompleted(string nodeName, int itemsProcessed)
{
_logger.LogInformation("Node completed: {node}, items: {count}", nodeName, itemsProcessed);
}
public void OnNodeError(string nodeName, Exception ex)
{
_logger.LogError(ex, "Node error: {node}", nodeName);
}
}
Composite Patterns
Combine extensions for complex behaviors:
public class ResilientTransform : ITransformNode<T, T>
{
private readonly ITransformNode<T, T> _inner;
private readonly IRetryPolicy _retryPolicy;
private readonly IFallbackProvider<T> _fallback;
private readonly IDiagnostics _diagnostics;
public ResilientTransform(
ITransformNode<T, T> inner,
IRetryPolicy retryPolicy,
IFallbackProvider<T> fallback,
IDiagnostics diagnostics)
{
_inner = inner;
_retryPolicy = retryPolicy;
_fallback = fallback;
_diagnostics = diagnostics;
}
public async IAsyncEnumerable<T> ProcessAsync(
T input,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
try
{
_diagnostics.LogAttempt(_inner.GetType().Name);
await foreach (var result in _retryPolicy.ExecuteAsync(
() => _inner.ProcessAsync(input, cancellationToken),
cancellationToken))
{
_diagnostics.LogSuccess(_inner.GetType().Name);
yield return result;
}
}
catch (Exception ex)
{
_diagnostics.LogFailure(_inner.GetType().Name, ex);
var fallback = await _fallback.GetFallbackAsync(input, ex);
yield return fallback;
}
}
}
Next Steps
- Design Principles - Understand guiding principles for extensions
- Advanced Topics - Testing Pipelines - Test your custom nodes