Performance Characteristics

This page explains WHAT the performance characteristics are for different patterns. For HOW TO optimize your pipeline, see Advanced Topics.

NPipeline is designed from the ground up for performance. Understanding its characteristics helps you build efficient pipelines.

Memory Usage

Streaming Model

Memory usage is proportional to the number of items in flight, not total dataset size:

Lazy Evaluation (NPipeline):
Item 1: [Read] → [Transform] → [Write] → [GC] → Item 2
Memory: ~1 item's worth of data at any time

Eager Evaluation (.ToList()):
[All items in memory] → Process all → [GC]
Memory: ~N × item_size for N items

Real-world Example:

// Processing 1 million CSV records (500 bytes each)
// Streaming: ~1-2 MB peak memory (1-2 items buffered)
// Eager (.ToList()): ~500 MB+ required
var pipeline = PipelineBuilder
    .AddSourceNode<CsvSourceNode>()
    .AddTransformNode<TransformNode>()
    .AddSinkNode<SinkNode>()
    .BuildPipeline();
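
To make the contrast concrete, here is a minimal, NPipeline-agnostic sketch of the two models. ReadCsvAsync, Transform, WriteAsync, and Record are illustrative placeholders, not library APIs:

// Streaming: one record in scope at a time, so memory stays flat.
await foreach (var record in ReadCsvAsync(path, cancellationToken))
{
    await WriteAsync(Transform(record), cancellationToken);
}

// Eager: every record is held in memory before processing starts.
var all = new List<Record>();
await foreach (var record in ReadCsvAsync(path, cancellationToken))
{
    all.Add(record);
}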

Memory Per Item

The memory used per item in the pipeline is minimal:

// Memory footprint:
// - Source item: varies (100 bytes - 10 KB typical)
// - Transform: composes items, minimal overhead
// - Sink: determines lifetime of item reference

Throughput Characteristics

Sequential Processing

Time:    0ms         10ms        20ms        30ms
         ↓           ↓           ↓           ↓
Item 1: [Read]→[Trans]→[Write]
Item 2:              [Read]→[Trans]→[Write]
Item 3:                          [Read]→[Trans]→[Write]

Throughput: 1 item / 10ms = 100 items/second

Performance Optimizations:

NPipeline automatically applies several optimizations to maximize sequential throughput:

  • Context Caching: Execution state (retry options, tracer, logger) is cached at node scope to eliminate per-item dictionary lookups
    • Impact: ~150-250μs reduction per 1K items in typical pipelines
    • Transparent: No configuration needed; automatically applied by execution strategies
  • ValueTask Fast Path: Synchronous transforms avoid Task allocation overhead (see the sketch after this list)
    • Impact: Up to 90% reduction in GC pressure for cache-hit scenarios
  • Lazy Evaluation: Items are processed on-demand as consumed, minimizing memory footprint
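
The fast path relies on standard .NET ValueTask semantics. A minimal sketch, assuming a hypothetical transform with an in-memory cache (the cache field and method names are assumptions, not NPipeline's actual node interface):

public ValueTask<Output> TransformAsync(Input input, CancellationToken ct)
{
    // Cache hit: complete synchronously, no Task is allocated on the heap.
    if (_cache.TryGetValue(input.Key, out var cached))
        return new ValueTask<Output>(cached);

    // Cache miss: fall back to the asynchronous path.
    return new ValueTask<Output>(TransformSlowAsync(input, ct));
}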

Parallel Processing

Using ParallelismExtension:

Time:    0ms         10ms        20ms        30ms
         ↓           ↓           ↓           ↓
Item 1: [Read]→[Trans]→[Write]
Item 2: [Read]→[Trans]→[Write]
Item 3: [Read]→[Trans]→[Write]

Throughput: 3 items / 10ms = 300 items/second (3x speedup)

Implementation:

var pipeline = PipelineBuilder
    .AddSourceNode<SourceNode>()
    .AddTransformNode<SlowTransform>(parallelism: 4)
    .AddSinkNode<SinkNode>()
    .BuildPipeline();

Scalability

Vertical Scaling

Scale within a single machine:

// Use parallelism for CPU-bound transforms
.AddTransformNode<CpuIntensiveTransform>(parallelism: Environment.ProcessorCount)

// Use batching for IO-bound transforms
.AddNode(new BatchNode<Item>(batchSize: 100))
.AddTransformNode<DatabaseInsertTransform>()

Horizontal Scaling

Scale across multiple machines:

// Partition source data
var machineId = GetMachineId();
var totalMachines = GetTotalMachines();

var pipeline = PipelineBuilder
    .AddSourceNode(new PartitionedSourceNode(machineId, totalMachines))
    .AddTransformNode<TransformNode>()
    .AddSinkNode(new CentralizedSinkNode()) // Write to shared storage
    .BuildPipeline();
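
One common way to implement the partitioning itself is to assign records to machines by index modulo the machine count. A sketch of that idea (ReadAllWithIndexAsync and the field names are illustrative placeholders; PartitionedSourceNode's real contract may differ):

private async IAsyncEnumerable<Record> ReadShardAsync(
    [EnumeratorCancellation] CancellationToken ct = default)
{
    await foreach (var (record, index) in ReadAllWithIndexAsync(ct))
    {
        // Keep only the records whose index maps to this machine.
        if (index % _totalMachines == _machineId)
            yield return record;
    }
}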

Comparative Performance

NPipeline vs Alternatives

| Aspect            | NPipeline          | LINQ Streaming   | Message Queues | Direct Iteration |
|-------------------|--------------------|------------------|----------------|------------------|
| Memory            | O(k) active items* | O(1) per item    | O(batch)       | O(N) all items   |
| Latency           | < 1ms first item   | < 1ms first item | 10-100ms       | N/A (batch)      |
| Setup             | Low                | Low              | High           | Very Low         |
| Typed Composition | Yes                | Yes              | Weak           | No               |
| Error Handling    | Flexible           | Basic            | Rich           | None             |
| Observability     | Built-in           | Limited          | Rich           | None             |

*k = number of items actively in the pipeline's processing stages at any given time (typically 1-2 for sequential execution, k = parallelism factor for parallel execution). This is independent of total dataset size N.

Optimization Tips

1. Use Async/Await Properly

// Good - respects async model
public async IAsyncEnumerable<Output> ProcessAsync(
    Input input,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var result = await _service.ProcessAsync(input, cancellationToken);
    yield return result;
}

// Bad - blocks thread
public async IAsyncEnumerable<Output> ProcessAsync(
    Input input,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var result = _service.Process(input); // Blocks! Use await instead
    yield return result;
}

2. Batch Expensive Operations

// Process 10,000 items one-by-one: 10,000 DB roundtrips (slow)
.AddTransformNode<SingleInsertTransform>()

// vs batch them: 100 DB roundtrips (100x faster)
.AddNode(new BatchNode<Order>(batchSize: 100))
.AddTransformNode<BatchInsertTransform>()
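
Under the hood, batching is just buffer-and-flush. A general sketch of the idea, independent of NPipeline's BatchNode (BulkInsertAsync and orders are illustrative placeholders):

// Buffer up to 100 orders, then write them in a single database roundtrip.
var buffer = new List<Order>(capacity: 100);
await foreach (var order in orders)
{
    buffer.Add(order);
    if (buffer.Count == 100)
    {
        await BulkInsertAsync(buffer, ct); // one roundtrip for 100 rows
        buffer.Clear();
    }
}
if (buffer.Count > 0)
    await BulkInsertAsync(buffer, ct);     // flush the final partial batch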

3. Avoid Materialization

// Bad - materializes everything
.AddNode(new MaterializationNode<Item>())
.AddTransformNode<Transform>()

// Good - processes streaming
.AddTransformNode<Transform>()

4. Use Parallelism for CPU Work

// CPU-bound: parallelize
.AddTransformNode<JsonParsingTransform>(parallelism: 8)

// IO-bound: lower parallelism needed
.AddTransformNode<DatabaseQueryTransform>(parallelism: 2)

Benchmarking

Run your own benchmarks with realistic data:

var stopwatch = Stopwatch.StartNew();

var pipeline = BuildPipeline();          // BuildPipeline(): the pipeline construction shown earlier
var context = PipelineContext.Default;
var result = await runner.ExecuteAsync(pipeline, context); // runner: the pipeline runner created during setup (not shown)

stopwatch.Stop();
Console.WriteLine($"Processed {result.ItemsProcessed} items in {stopwatch.ElapsedMilliseconds}ms");
Console.WriteLine($"Throughput: {result.ItemsProcessed / stopwatch.Elapsed.TotalSeconds:F0} items/sec");

Next Steps