Data Flow Details
Understanding how data flows through NPipeline and the lazy evaluation principles that make it efficient is key to building high-performance pipelines.
How Data Pipes Work
Data pipes are the channels through which data flows from one node to the next in the pipeline.
Data Pipe Interface:
public interface IDataPipe<T> : IAsyncEnumerable<T>
{
    // IDataPipe<T> extends IAsyncEnumerable<T>, so every pipe is itself an async stream
// Iterate using: await foreach (var item in dataPipe)
}
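To make the contract concrete, here is a minimal sketch of a source-backed pipe. SourceDataPipe and its constructor are illustrative assumptions, not NPipeline types; the point is that a pipe can simply defer to an underlying IAsyncEnumerable<T> without buffering anything:
// Hypothetical example: a pipe that defers entirely to an underlying stream.
public sealed class SourceDataPipe<T> : IDataPipe<T>
{
    private readonly IAsyncEnumerable<T> _items;

    public SourceDataPipe(IAsyncEnumerable<T> items) => _items = items;

    // No work happens here; enumeration is deferred until the caller iterates.
    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
        => _items.GetAsyncEnumerator(cancellationToken);
}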
Basic Data Flow:
// 1. Source produces a pipe
var sourcePipe = await sourceNode.ExecuteAsync(context, cancellationToken);
// 2. Transform consumes and wraps it
var transformedPipe = new TransformPipe(sourcePipe, transformNode);
// 3. Sink consumes the pipe
await foreach (var item in transformedPipe.WithCancellation(cancellationToken))
{
// Each item flows through here
}
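The TransformPipe used above can be pictured as a thin wrapper that applies a transform to each item as it is pulled through. The sketch below is an assumption for illustration; it takes a plain delegate where the real type presumably takes a transform node, but the lazy shape is the same:
// Hypothetical sketch: a transform pipe built on an async iterator.
public sealed class TransformPipe<TIn, TOut> : IDataPipe<TOut>
{
    private readonly IDataPipe<TIn> _upstream;
    private readonly Func<TIn, ValueTask<TOut>> _transform;

    public TransformPipe(IDataPipe<TIn> upstream, Func<TIn, ValueTask<TOut>> transform)
    {
        _upstream = upstream;
        _transform = transform;
    }

    public async IAsyncEnumerator<TOut> GetAsyncEnumerator(
        CancellationToken cancellationToken = default)
    {
        // Nothing runs until the consumer calls MoveNextAsync: each item is
        // pulled from upstream, transformed, and yielded one at a time.
        await foreach (var item in _upstream.WithCancellation(cancellationToken))
        {
            yield return await _transform(item);
        }
    }
}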
Lazy Evaluation
The key to NPipeline's efficiency is lazy evaluation: data is only processed when explicitly consumed.
How Lazy Evaluation Works
Step 1: Source creates pipe
↓
Pipe exists, but no data is read yet
Step 2: Transform wraps pipe
↓
Transform is ready, but no processing happens yet
Step 3: Sink iterates through pipe
↓
NOW data flows:
- Source reads item
- Transform processes item
- Sink consumes item
- REPEAT for next item
Code Example:
// Step 1: Source creates pipe (but doesn't read data yet)
var pipe = await source.ExecuteAsync(context, cancellationToken);
// Step 2: Transform wraps pipe (but doesn't process yet)
var wrappedPipe = new TransformPipe(pipe, transform);
// Step 3: Sink actually triggers execution
await foreach (var item in wrappedPipe.WithCancellation(cancellationToken))
{
// NOW data is read, transformed, consumed
await sink.ProcessAsync(item);
}
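You can watch this ordering happen with nothing but plain async iterators. The standalone sketch below (a top-level program, no NPipeline types involved) prints Read, Transform, and Consume interleaved per item, rather than each stage completing before the next begins:
// Standalone demonstration of lazy, per-item flow.
static async IAsyncEnumerable<int> ReadItemsAsync()
{
    for (var i = 1; i <= 3; i++)
    {
        await Task.Yield(); // Simulate asynchronous I/O
        Console.WriteLine($"Read {i}");
        yield return i;
    }
}

static async IAsyncEnumerable<int> TransformAsync(IAsyncEnumerable<int> upstream)
{
    await foreach (var item in upstream)
    {
        Console.WriteLine($"Transform {item}");
        yield return item * 10;
    }
}

await foreach (var item in TransformAsync(ReadItemsAsync()))
{
    Console.WriteLine($"Consume {item}");
}
// Prints: Read 1, Transform 1, Consume 10, Read 2, Transform 2, Consume 20, ...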
Benefits of Lazy Evaluation
Early Termination:
// If the consumer stops early or cancels, the source never reads the remaining data
using var cts = new CancellationTokenSource();
await foreach (var item in pipe.WithCancellation(cts.Token))
{
    if (shouldStop)
    {
        cts.Cancel(); // Cancellation is signalled through the CancellationTokenSource
        break;        // Disposing the enumerator stops the source from reading further
    }
}
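On the producer side, an async-iterator source observes that token through the [EnumeratorCancellation] attribute, which is what lets it stop mid-stream. A sketch of such a source (the file-reading logic is illustrative, not an NPipeline API):
using System.Runtime.CompilerServices;

// Illustrative source: checks the token before producing each item.
static async IAsyncEnumerable<string> ReadLinesAsync(
    string path,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    using var reader = new StreamReader(path);
    string? line;
    while ((line = await reader.ReadLineAsync()) is not null)
    {
        cancellationToken.ThrowIfCancellationRequested(); // Stop reading once cancelled
        yield return line;
    }
}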
Memory Efficiency:
// Reading 1 million items from a file:
// - Lazy: only the current item is in memory at any time
// - Eager (.ToListAsync()): the entire dataset is held at once, easily 100 MB or more
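As a concrete illustration, streaming lines from a large file keeps only the current line alive. This assumes File.ReadLinesAsync (available since .NET 7) and the ToListAsync extension from the System.Linq.Async package; ProcessLineAsync is a placeholder:
// Lazy: one line in memory at a time, regardless of file size.
await foreach (var line in File.ReadLinesAsync("orders.csv", cancellationToken))
{
    await ProcessLineAsync(line);
}

// Eager: the whole file is materialized before processing begins.
var allLines = await File.ReadLinesAsync("orders.csv", cancellationToken)
    .ToListAsync(cancellationToken); // ❌ Avoid for large files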
Streaming Responsiveness:
// Results are available as soon as they are produced;
// there is no waiting for the entire dataset to load
await foreach (var result in pipeline.WithCancellation(cancellationToken))
{
// Process each result as it's available
await WriteToUIAsync(result);
}
Composability of Data Pipes
Each transform creates a new data pipe, allowing for clean composition:
var source = await sourceNode.ExecuteAsync(context, ct); // IDataPipe<Order>
var validated = new TransformPipe(source, validator); // IDataPipe<ValidatedOrder>
var enriched = new TransformPipe(validated, enricher); // IDataPipe<EnrichedOrder>
var processed = new TransformPipe(enriched, processor); // IDataPipe<ProcessedOrder>
// Only when iterated does the entire chain execute
await foreach (var result in processed.WithCancellation(ct))
{
// All transforms happen here for each item
}
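When a chain grows long, a small extension method can make the composition read left to right. Through below is a hypothetical convenience helper building on the TransformPipe<TIn, TOut> sketch shown earlier, not an NPipeline API:
// Hypothetical extension for fluent chaining.
public static class DataPipeExtensions
{
    public static TransformPipe<TIn, TOut> Through<TIn, TOut>(
        this IDataPipe<TIn> pipe,
        Func<TIn, ValueTask<TOut>> transform)
        => new(pipe, transform);
}

// Usage: validate, enrich, and process stand in for transform delegates.
// The chain remains fully lazy until iterated.
var processed = source
    .Through(validate)
    .Through(enrich)
    .Through(process);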
Memory Patterns
Good: Streaming Processing
// Only one item in memory at a time
await foreach (var item in pipe.WithCancellation(ct))
{
var result = await ProcessAsync(item);
await WriteAsync(result);
// Item is eligible for GC after this iteration
}
Bad: Materializing Entire Stream
// Loads everything into memory!
var allItems = await pipe.ToListAsync(ct); // ❌ Bad for large datasets
foreach (var item in allItems)
{
// Process...
}
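If you genuinely need to operate on groups of items, for example bulk database writes, buffer a bounded batch instead of the whole stream. In this sketch WriteBatchAsync is a placeholder, and memory stays proportional to the batch size rather than the dataset:
// Compromise: bounded batches cap memory at batch-size items.
var batch = new List<Order>(capacity: 100);
await foreach (var item in pipe.WithCancellation(ct))
{
    batch.Add(item);
    if (batch.Count == 100)
    {
        await WriteBatchAsync(batch);
        batch.Clear();
    }
}
if (batch.Count > 0)
{
    await WriteBatchAsync(batch); // Flush the final partial batch
}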
Next Steps
- Dependency Injection Integration - Learn how DI works with NPipeline
- Performance Characteristics - Understand memory and throughput implications