
Component Architecture

NPipeline consists of several key components that work together to define, build, and execute pipelines. Understanding their roles and interactions is essential to working effectively with the framework.

Major Components Overview

1. Pipeline Definition (IPipelineDefinition)

Purpose: Declarative blueprint of your pipeline structure

public interface IPipelineDefinition
{
    void Define(PipelineBuilder builder, PipelineContext context);
}

Responsibilities:

  • Register nodes and connect them via the builder
  • Configure the execution topology
  • Define error handling strategy

Example:

public class OrderProcessingPipeline : IPipelineDefinition
{
    public void Define(PipelineBuilder builder, PipelineContext context)
    {
        var source = builder.AddSource<OrderSource, Order>();
        var validator = builder.AddTransform<OrderValidator, Order, ValidatedOrder>();
        var enricher = builder.AddTransform<OrderEnricher, ValidatedOrder, EnrichedOrder>();
        var sink = builder.AddSink<OrderSink, EnrichedOrder>();

        builder.Connect(source, validator);
        builder.Connect(validator, enricher);
        builder.Connect(enricher, sink);
    }
}

2. Pipeline Builder

Purpose: Orchestrates node creation and graph construction

public class PipelineBuilder
{
    // Signatures only; method bodies omitted.
    public SourceNodeHandle<TOut> AddSource<TNode, TOut>()
        where TNode : ISourceNode<TOut>;
    public TransformNodeHandle<TIn, TOut> AddTransform<TNode, TIn, TOut>()
        where TNode : ITransformNode<TIn, TOut>;
    public SinkNodeHandle<TIn> AddSink<TNode, TIn>()
        where TNode : ISinkNode<TIn>;

    // TOut must be declared as a type parameter of the method
    public PipelineBuilder Connect<TData, TOut>(
        SourceNodeHandle<TData> source,
        TransformNodeHandle<TData, TOut> target);
    // ... other Connect overloads
}
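The generic parameters on the handle types are what let the compiler reject mismatched connections. Below is a self-contained sketch of that idea. ToyBuilder, SourceHandle, TransformHandle, and SinkHandle are hypothetical stand-ins, not NPipeline types, and the sketch assumes (as the signatures above suggest) that each Connect overload shares one type parameter between the upstream output and the downstream input.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for NPipeline's handle types and builder.
var builder = new ToyBuilder();
var source = builder.AddSource<int>("numbers");
var toText = builder.AddTransform<int, string>("to-text");
var sink = builder.AddSink<string>("printer");

builder.Connect(source, toText);   // int output -> int input: compiles
builder.Connect(toText, sink);     // string output -> string input: compiles
// builder.Connect(source, sink);  // int output -> string input: compile-time error

foreach (var (from, to) in builder.Edges)
    Console.WriteLine($"{from} -> {to}");

// Each handle records a node id; its generic parameters exist only for type checking.
public sealed record SourceHandle<TOut>(string Id);
public sealed record TransformHandle<TIn, TOut>(string Id);
public sealed record SinkHandle<TIn>(string Id);

public sealed class ToyBuilder
{
    private readonly List<(string From, string To)> _edges = new();
    public IReadOnlyList<(string From, string To)> Edges => _edges;

    public SourceHandle<TOut> AddSource<TOut>(string id) => new(id);
    public TransformHandle<TIn, TOut> AddTransform<TIn, TOut>(string id) => new(id);
    public SinkHandle<TIn> AddSink<TIn>(string id) => new(id);

    // In every overload, TData appears on both sides, so the upstream
    // output type must equal the downstream input type.
    public ToyBuilder Connect<TData, TNext>(SourceHandle<TData> from, TransformHandle<TData, TNext> to)
        => AddEdge(from.Id, to.Id);
    public ToyBuilder Connect<TIn, TData>(TransformHandle<TIn, TData> from, SinkHandle<TData> to)
        => AddEdge(from.Id, to.Id);
    public ToyBuilder Connect<TData>(SourceHandle<TData> from, SinkHandle<TData> to)
        => AddEdge(from.Id, to.Id);

    private ToyBuilder AddEdge(string from, string to)
    {
        _edges.Add((from, to));
        return this;
    }
}
```

Uncommenting the last Connect call fails type inference (TData cannot be both int and string), which is the point: the mistake surfaces at build time rather than at run time.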

Builder Process:

  1. Resolves nodes from dependency injection
  2. Builds execution graph
  3. Validates connectivity
  4. Compiles to executable pipeline
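The section does not say how connectivity is validated. One common approach, shown here as a hypothetical sketch rather than NPipeline's actual code, is Kahn's topological sort: it rejects edges that reference unknown nodes, detects cycles, and yields an order in which nodes can be started. Node ids are plain strings here for simplicity, and GraphCheck is an illustrative name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical graph mirroring the order-processing example.
var nodes = new[] { "source", "validator", "enricher", "sink" };
var edges = new[] { ("source", "validator"), ("validator", "enricher"), ("enricher", "sink") };

Console.WriteLine(string.Join(" -> ", GraphCheck.TopologicalOrder(nodes, edges)));

public static class GraphCheck
{
    // Kahn's algorithm: returns nodes in dependency order, or throws if an edge
    // names an unknown node or the graph contains a cycle.
    public static List<string> TopologicalOrder(
        IReadOnlyCollection<string> nodes,
        IEnumerable<(string From, string To)> edges)
    {
        var inDegree = nodes.ToDictionary(n => n, _ => 0);
        var outgoing = nodes.ToDictionary(n => n, _ => new List<string>());

        foreach (var (from, to) in edges)
        {
            if (!inDegree.ContainsKey(from) || !inDegree.ContainsKey(to))
                throw new InvalidOperationException($"Edge {from} -> {to} references an unknown node.");
            outgoing[from].Add(to);
            inDegree[to]++;
        }

        // Start with nodes that have no inputs (sources).
        var ready = new Queue<string>(nodes.Where(n => inDegree[n] == 0));
        var order = new List<string>();
        while (ready.Count > 0)
        {
            var node = ready.Dequeue();
            order.Add(node);
            foreach (var next in outgoing[node])
                if (--inDegree[next] == 0)
                    ready.Enqueue(next);
        }

        // Nodes on a cycle never reach in-degree zero, so they are never emitted.
        if (order.Count != nodes.Count)
            throw new InvalidOperationException("Pipeline graph contains a cycle.");
        return order;
    }
}
```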

3. Pipeline Context

Purpose: Shared runtime context for all nodes in the pipeline

public class PipelineContext
{
    public Dictionary<string, object> Items { get; }
    public Dictionary<string, object> Parameters { get; }
    public Dictionary<string, object> Properties { get; }
    public CancellationToken CancellationToken { get; }
    public string CurrentNodeId { get; }
    public IPipelineLoggerFactory LoggerFactory { get; }
    public IPipelineTracer Tracer { get; }
    public IErrorHandlerFactory ErrorHandlerFactory { get; }
    public ILineageFactory LineageFactory { get; }
    public IObservabilityFactory ObservabilityFactory { get; }
    public IDeadLetterSink? DeadLetterSink { get; }
    public IPipelineErrorHandler? PipelineErrorHandler { get; }
    public PipelineRetryOptions RetryOptions { get; }
}

Contains:

  • Items - Shared state between nodes
  • Parameters - Input parameters for the pipeline
  • Properties - Dictionary for extensions and plugins to store custom data
  • CancellationToken - For cancellation propagation
  • CurrentNodeId - ID of the node currently being executed
  • LoggerFactory - Factory to create loggers for nodes
  • Tracer - For distributed tracing
  • ErrorHandlerFactory - Factory for creating error handlers and dead-letter sinks
  • LineageFactory - Factory for creating lineage sinks and resolving lineage collectors
  • ObservabilityFactory - Factory for resolving observability collectors
  • DeadLetterSink - Sink for handling failed items
  • PipelineErrorHandler - Error handler for pipeline-level errors
  • RetryOptions - Configuration for retry behavior
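Because Items is a Dictionary<string, object>, every read needs a cast. A small pair of extension methods can centralize the type check. SetItem, TryGetItem, StateKeys, and RunCounters below are hypothetical helpers, not part of NPipeline, and a plain Dictionary stands in for context.Items.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for PipelineContext.Items, described above as Dictionary<string, object>.
var items = new Dictionary<string, object>();

// An upstream node stores shared state under a well-known key...
items.SetItem(StateKeys.RunCounters, new RunCounters { Processed = 3 });

// ...and a downstream node reads it back with a type check instead of a raw cast.
if (items.TryGetItem<RunCounters>(StateKeys.RunCounters, out var counters))
    Console.WriteLine($"Processed so far: {counters.Processed}");

public static class StateKeys
{
    public const string RunCounters = "run-counters";
}

public sealed class RunCounters
{
    public int Processed { get; set; }
}

public static class ItemsExtensions
{
    public static void SetItem<T>(this IDictionary<string, object> items, string key, T value)
        where T : notnull
        => items[key] = value;

    // Succeeds only if the key exists and the stored value is a T.
    public static bool TryGetItem<T>(this IDictionary<string, object> items, string key, out T value)
    {
        if (items.TryGetValue(key, out var raw) && raw is T typed)
        {
            value = typed;
            return true;
        }
        value = default!;
        return false;
    }
}
```

Keeping keys in one class avoids mismatched string literals between producer and consumer nodes. Note that Dictionary<TKey, TValue> is not thread-safe, so concurrent writes from nodes running in parallel would need synchronization.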

Usage Example:

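public override Task<TOut> ExecuteAsync(
    TIn item,
    PipelineContext context,
    CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    var logger = context.LoggerFactory.CreateLogger("MyTransform");
    logger.Log(LogLevel.Information, "Processing item");

    // Access shared state; the pattern match guards against a missing key or a wrong type
    if (context.Items.TryGetValue("state", out var stateObj) && stateObj is MyState sharedState)
    {
        // ... use sharedState
    }

    // Transform is a placeholder for your own mapping logic
    TOut transformedItem = Transform(item);
    return Task.FromResult(transformedItem);
}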

4. Pipeline Runner

Purpose: Executes compiled pipelines

public class PipelineRunner
{
    // Overload 1: With PipelineContext only
    public Task RunAsync<TDefinition>(
        PipelineContext context)
        where TDefinition : IPipelineDefinition, new();

    // Overload 2: With CancellationToken only
    public Task RunAsync<TDefinition>(
        CancellationToken cancellationToken = default)
        where TDefinition : IPipelineDefinition, new();

    // Overload 3: With both PipelineContext and CancellationToken
    public Task RunAsync<TDefinition>(
        PipelineContext context,
        CancellationToken cancellationToken)
        where TDefinition : IPipelineDefinition, new();
}

Execution Workflow:

  1. Instantiates pipeline definition
  2. Creates pipeline builder
  3. Calls Define() to build graph
  4. Traverses graph and connects nodes
  5. Starts execution from source nodes
  6. Waits for completion
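The new() constraint on TDefinition is what allows the runner to perform step 1 itself. The toy runner below walks through the workflow with hypothetical, greatly simplified types (IToyDefinition, ToyBuilder, ToyRunner, GreetingPipeline), not NPipeline's classes. Its "graph" is just an ordered list of steps, so steps 4 to 6 collapse into a sequential loop.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

await ToyRunner.RunAsync<GreetingPipeline>();

public interface IToyDefinition
{
    void Define(ToyBuilder builder);
}

public sealed class ToyBuilder
{
    public List<Func<CancellationToken, Task>> Steps { get; } = new();
    public ToyBuilder Add(Func<CancellationToken, Task> step) { Steps.Add(step); return this; }
}

public sealed class GreetingPipeline : IToyDefinition
{
    public void Define(ToyBuilder builder) =>
        builder.Add(_ => { Console.WriteLine("hello"); return Task.CompletedTask; })
               .Add(_ => { Console.WriteLine("world"); return Task.CompletedTask; });
}

public static class ToyRunner
{
    // new() lets the runner construct the definition without any factory.
    public static async Task RunAsync<TDefinition>(CancellationToken cancellationToken = default)
        where TDefinition : IToyDefinition, new()
    {
        var definition = new TDefinition();   // 1. instantiate the definition
        var builder = new ToyBuilder();       // 2. create a builder
        definition.Define(builder);           // 3. let the definition describe the graph
        foreach (var step in builder.Steps)   // 4-6. (simplified) run steps in order, awaiting each
        {
            cancellationToken.ThrowIfCancellationRequested();
            await step(cancellationToken);
        }
    }
}
```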

Usage Examples:

var runner = new PipelineRunner();

// Using only PipelineContext
var context = PipelineContext.Default;
await runner.RunAsync<MyPipeline>(context);

// Using only CancellationToken
using var cts = new CancellationTokenSource();
await runner.RunAsync<MyPipeline>(cts.Token);

// Using both PipelineContext and CancellationToken
var contextWithToken = PipelineContext.Default;
await runner.RunAsync<MyPipeline>(contextWithToken, cts.Token);
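The token passed to RunAsync is the caller's lever for stopping a run. The sketch below does not use NPipeline: SimulatedPipeline is a hypothetical stand-in that lets the caller-side pattern run on its own, namely a CancellationTokenSource with a timeout set through CancelAfter, and a catch for OperationCanceledException. It assumes, as is conventional in .NET, that cancellation surfaces as OperationCanceledException; how NPipeline reports a cancelled run is not covered in this section.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Request cancellation automatically after 200 ms.
using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromMilliseconds(200));

try
{
    // Stand-in for runner.RunAsync<MyPipeline>(cts.Token): a long-running job
    // that observes the token between items, as a well-behaved node should.
    var processed = await SimulatedPipeline.RunAsync(itemCount: 1_000, cts.Token);
    Console.WriteLine($"Completed: {processed} items");
}
catch (OperationCanceledException)
{
    Console.WriteLine("Pipeline was cancelled.");
}

public static class SimulatedPipeline
{
    public static async Task<int> RunAsync(int itemCount, CancellationToken cancellationToken)
    {
        var processed = 0;
        for (var i = 0; i < itemCount; i++)
        {
            // Task.Delay throws TaskCanceledException (an OperationCanceledException)
            // once the token is cancelled.
            await Task.Delay(10, cancellationToken);
            processed++;
        }
        return processed;
    }
}
```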

5. Node Execution Model

NPipeline uses a plan-based execution model to minimize per-item overhead. During pipeline initialization, it generates a pre-compiled execution plan for each node; each plan contains strongly typed delegates, which eliminates reflection overhead during steady-state execution.
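The general technique can be sketched in a few lines of plain .NET. This is not NPipeline's implementation; DoubleTransform and ExecutionPlan are hypothetical names. The sketch resolves a node's ExecuteAsync method through reflection once, binds it with MethodInfo.CreateDelegate to a strongly typed Func, and then invokes only the delegate for each item.

```csharp
using System;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;

var node = new DoubleTransform();

// Plan creation: reflection runs once, here.
var plan = ExecutionPlan.Create<int, int>(node);

// Steady state: each item is a direct delegate call, with no MethodInfo.Invoke,
// no object[] argument array, and no boxing of the int argument.
for (var i = 1; i <= 3; i++)
    Console.WriteLine(await plan(i, CancellationToken.None));

public sealed class DoubleTransform
{
    public Task<int> ExecuteAsync(int item, CancellationToken cancellationToken)
        => Task.FromResult(item * 2);
}

public static class ExecutionPlan
{
    public static Func<TIn, CancellationToken, Task<TOut>> Create<TIn, TOut>(object node)
    {
        MethodInfo method = node.GetType().GetMethod(
                "ExecuteAsync", new[] { typeof(TIn), typeof(CancellationToken) })
            ?? throw new InvalidOperationException($"{node.GetType().Name} has no matching ExecuteAsync.");

        // Binding to a typed delegate checks the return type now, at plan creation
        // time: an incompatible signature throws ArgumentException here, not on the first item.
        return (Func<TIn, CancellationToken, Task<TOut>>)method.CreateDelegate(
            typeof(Func<TIn, CancellationToken, Task<TOut>>), node);
    }
}
```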

Benefits:

  • Zero reflection overhead during execution
  • Improved performance through direct delegate calls
  • Type safety enforced at plan creation time
  • Reduced allocation and GC pressure

Node Execution Patterns:

SourceNode Execution:

public abstract IDataPipe<T> ExecuteAsync(
    PipelineContext context,
    CancellationToken cancellationToken);

Returns the initial IDataPipe<T> containing the source data. The pipe is returned directly, not wrapped in a Task.

TransformNode Execution:

public abstract Task<TOut> ExecuteAsync(
    TIn item,
    PipelineContext context,
    CancellationToken cancellationToken);

Called once per item; returns the transformed item, or throws an exception to signal that the item failed.
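Because a transform signals failure by throwing, whatever drives it decides what a failure means for the rest of the stream. The sketch below shows one possible policy, recording the failed item and continuing, in the spirit of the DeadLetterSink listed under PipelineContext. ParseAsync and the deadLetters list are hypothetical; NPipeline's real behavior depends on its configured error handling, which this sketch does not model.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical transform: int.Parse throws FormatException on bad input,
// which is how this transform signals that an item failed.
static Task<int> ParseAsync(string item, CancellationToken ct) =>
    Task.FromResult(int.Parse(item));

var inputs = new[] { "1", "two", "3" };
var results = new List<int>();
var deadLetters = new List<(string Item, Exception Error)>();

foreach (var item in inputs)
{
    try
    {
        // Called once per item.
        results.Add(await ParseAsync(item, CancellationToken.None));
    }
    catch (FormatException ex)
    {
        // One possible policy: record the failure and move on to the next item.
        deadLetters.Add((item, ex));
    }
}

Console.WriteLine($"ok: {string.Join(", ", results)}; failed: {deadLetters.Count}");
```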

SinkNode Execution:

public abstract Task ExecuteAsync(
    IDataPipe<T> input,
    PipelineContext context,
    CancellationToken cancellationToken);

Consumes the entire data pipe for final processing (saving, sending, etc.).
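The three execution patterns compose into a streaming chain: the source hands back a pipe right away, the transform maps items one at a time, and the sink drains the pipe. The sketch below uses IAsyncEnumerable<T> as a stand-in for IDataPipe<T>; that substitution, and the Source, Transform, and Sink local functions, are assumptions for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// IAsyncEnumerable<T> stands in for IDataPipe<T> (an assumption, not NPipeline's type).
using var cts = new CancellationTokenSource();
var pipe = Source(cts.Token);                               // source: returns the pipe immediately
var transformed = Transform(pipe, x => x * 10, cts.Token);  // transform: one item at a time
var total = await Sink(transformed, cts.Token);             // sink: consumes the whole pipe
Console.WriteLine(total);

static async IAsyncEnumerable<int> Source([EnumeratorCancellation] CancellationToken ct)
{
    for (var i = 1; i <= 3; i++)
    {
        await Task.Yield();                 // simulate asynchronous I/O
        ct.ThrowIfCancellationRequested();
        yield return i;
    }
}

static async IAsyncEnumerable<TOut> Transform<TIn, TOut>(
    IAsyncEnumerable<TIn> input, Func<TIn, TOut> map,
    [EnumeratorCancellation] CancellationToken ct)
{
    await foreach (var item in input.WithCancellation(ct))
        yield return map(item);
}

static async Task<int> Sink(IAsyncEnumerable<int> input, CancellationToken ct)
{
    var sum = 0;
    await foreach (var item in input.WithCancellation(ct))
        sum += item;                        // "final processing": here, just summing
    return sum;
}
```

Nothing flows until the sink starts enumerating, so the source can return its pipe synchronously while the data itself is produced asynchronously.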

Component Interaction Flow

Next Steps