Component Architecture
NPipeline consists of several key components that work together to define, build, and execute pipelines. Understanding their roles and interactions is essential to working effectively with the framework.
Major Components Overview
1. Pipeline Definition (IPipelineDefinition)
Purpose: Declarative blueprint of your pipeline structure
public interface IPipelineDefinition
{
void Define(PipelineBuilder builder, PipelineContext context);
}
Responsibilities:
- Connect nodes via the builder
- Configure the execution topology
- Define error handling strategy
Example:
public class OrderProcessingPipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var source = builder.AddSource<OrderSource, Order>();
var validator = builder.AddTransform<OrderValidator, Order, ValidatedOrder>();
var enricher = builder.AddTransform<OrderEnricher, ValidatedOrder, EnrichedOrder>();
var sink = builder.AddSink<OrderSink, EnrichedOrder>();
builder.Connect(source, validator);
builder.Connect(validator, enricher);
builder.Connect(enricher, sink);
}
}
2. Pipeline Builder
Purpose: Orchestrates node creation and graph construction
public class PipelineBuilder
{
public SourceNodeHandle<TOut> AddSource<TNode, TOut>()
where TNode : ISourceNode<TOut>;
public TransformNodeHandle<TIn, TOut> AddTransform<TNode, TIn, TOut>()
where TNode : ITransformNode<TIn, TOut>;
public SinkNodeHandle<TIn> AddSink<TNode, TIn>()
where TNode : ISinkNode<TIn>;
public PipelineBuilder Connect<TData>(
SourceNodeHandle<TData> source,
TransformNodeHandle<TData, TOut> target);
// ... other Connect overloads
}
Builder Process:
- Resolves nodes from dependency injection
- Builds execution graph
- Validates connectivity
- Compiles to executable pipeline
3. Pipeline Context
Purpose: Shared runtime context for all nodes in the pipeline
public class PipelineContext
{
public Dictionary<string, object> Items { get; }
public Dictionary<string, object> Parameters { get; }
public Dictionary<string, object> Properties { get; }
public CancellationToken CancellationToken { get; }
public string CurrentNodeId { get; }
public IPipelineLoggerFactory LoggerFactory { get; }
public IPipelineTracer Tracer { get; }
public IErrorHandlerFactory ErrorHandlerFactory { get; }
public ILineageFactory LineageFactory { get; }
public IObservabilityFactory ObservabilityFactory { get; }
public IDeadLetterSink? DeadLetterSink { get; }
public IPipelineErrorHandler? PipelineErrorHandler { get; }
public PipelineRetryOptions RetryOptions { get; }
}
Contains:
- Items - Shared state between nodes
- Parameters - Input parameters for the pipeline
- Properties - Dictionary for extensions and plugins to store custom data
- CancellationToken - For cancellation propagation
- CurrentNodeId - ID of the node currently being executed
- LoggerFactory - Factory to create loggers for nodes
- Tracer - For distributed tracing
- ErrorHandlerFactory - Factory for creating error handlers and dead-letter sinks
- LineageFactory - Factory for creating lineage sinks and resolving lineage collectors
- ObservabilityFactory - Factory for resolving observability collectors
- DeadLetterSink - Sink for handling failed items
- PipelineErrorHandler - Error handler for pipeline-level errors
- RetryOptions - Configuration for retry behavior
Usage Example:
public override Task<TOut> ExecuteAsync(
TIn item,
PipelineContext context,
CancellationToken cancellationToken)
{
var logger = context.LoggerFactory.CreateLogger("MyTransform");
logger.Log(LogLevel.Information, "Processing item");
// Access shared state
if (context.Items.TryGetValue("state", out var stateObj))
{
var sharedState = stateObj as MyState;
}
return Task.FromResult(transformedItem);
}
4. Pipeline Runner
Purpose: Executes compiled pipelines
public class PipelineRunner
{
// Overload 1: With PipelineContext only
public Task RunAsync<TDefinition>(
PipelineContext context)
where TDefinition : IPipelineDefinition, new();
// Overload 2: With CancellationToken only
public Task RunAsync<TDefinition>(
CancellationToken cancellationToken = default)
where TDefinition : IPipelineDefinition, new();
// Overload 3: With both PipelineContext and CancellationToken
public Task RunAsync<TDefinition>(
PipelineContext context,
CancellationToken cancellationToken)
where TDefinition : IPipelineDefinition, new();
}
Execution Workflow:
- Instantiates pipeline definition
- Creates pipeline builder
- Calls
Define()to build graph - Traverses graph and connects nodes
- Starts execution from source nodes
- Waits for completion
Usage Examples:
// Using only PipelineContext
var context = PipelineContext.Default;
var runner = new PipelineRunner();
await runner.RunAsync<MyPipeline>(context);
// Using only CancellationToken
var cts = new CancellationTokenSource();
await runner.RunAsync<MyPipeline>(cts.Token);
// Using both PipelineContext and CancellationToken
var context = PipelineContext.Default;
var cts = new CancellationTokenSource();
await runner.RunAsync<MyPipeline>(context, cts.Token);
5. Node Execution Model
NPipeline uses a plan-based execution model for optimal performance. During pipeline initialization, the system generates pre-compiled execution plans for each node containing strongly-typed delegates that eliminate reflection overhead during steady-state execution.
Benefits:
- Zero reflection overhead during execution
- Improved performance through direct delegate calls
- Type safety enforced at plan creation time
- Reduced allocation and GC pressure
Node Execution Patterns:
SourceNode Execution:
public abstract IDataPipe<T> ExecuteAsync(
PipelineContext context,
CancellationToken cancellationToken);
Produces initial IDataPipe<T> containing all source data synchronously.
TransformNode Execution:
public abstract Task<T> ExecuteAsync(
T item,
PipelineContext context,
CancellationToken cancellationToken);
Called for each item; returns transformed item or throws to fail.
SinkNode Execution:
public abstract Task ExecuteAsync(
IDataPipe<T> input,
PipelineContext context,
CancellationToken cancellationToken);
Consumes entire data pipe for final processing (save, send, etc).
Component Interaction Flow
Next Steps
- Execution Flow - Understand sequential and parallel execution
- Dependency Injection Integration - Learn about DI integration