Data processing in .NET shouldn't feel like this
Memory nightmares
Your pipeline loads everything into memory, then crashes at 2 AM when someone uploads a file that's slightly larger than usual. You've added more RAM twice this year.
Spaghetti transforms
Your data processing logic started simple. Now it's 2,000 lines of nested loops and conditional statements that nobody wants to touch - or test.
Silent failures
One malformed record takes down your entire batch. You've wrapped everything in try-catch blocks, but errors still slip through to production.
A better way to process data
NPipeline gives you a graph-based architecture where data flows through discrete, testable nodes. Each piece does one thing well. Complexity emerges from composition, not accumulation.
Sources
Where data enters your pipeline. Read from files, databases, APIs, or message queues. Sources produce streams of typed items that flow downstream.
Transforms
Where data gets processed. Validate, enrich, filter, aggregate, or reshape your data. Each transform is a focused, single-responsibility component; a minimal one is sketched below.
Sinks
Where data lands. Write to databases, send to APIs, or stream to files. Sinks consume the processed data and handle final delivery.
The Graph
Connect nodes to form a directed acyclic graph. See exactly how data flows through your system. Debug by tracing the path, not by hunting through nested loops.
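Concretely, a transform node is just a small class. The sketch below shows the general shape; the ITransform<TIn, TOut> interface and TransformAsync signature are placeholders invented for illustration, not NPipeline's published node contract.

// Minimal sketch of a single-responsibility transform node.
// ITransform<TIn, TOut> and TransformAsync are hypothetical placeholders for
// illustration; NPipeline's actual node contract may differ.
public interface ITransform<TIn, TOut>
{
    ValueTask<TOut> TransformAsync(TIn item, CancellationToken cancellationToken);
}

public sealed record Order(int Id, decimal Total, string CustomerId);

public sealed class ValidateOrder : ITransform<Order, Order>
{
    public ValueTask<Order> TransformAsync(Order item, CancellationToken cancellationToken)
    {
        // Reject obviously bad records; the pipeline decides how failures are routed.
        if (item.Total < 0)
            throw new InvalidOperationException($"Order {item.Id} has a negative total.");

        return ValueTask.FromResult(item); // synchronous result, no Task allocation
    }
}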
Process more data than fits in memory
NPipeline is streaming-first. Data flows through your pipeline item by item, so memory usage stays constant regardless of dataset size. Process a million records or a billion - your memory footprint stays the same.
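The mechanism is ordinary .NET streaming: an IAsyncEnumerable<T> yields one item at a time, so only the current item is ever materialized. A minimal sketch of the idea in plain C#, independent of NPipeline's own source API:

// Plain .NET illustration of streaming: one line is materialized at a time,
// so peak memory does not grow with file size. The file path is just an example.
public static async IAsyncEnumerable<string> ReadLinesAsync(string path)
{
    using var reader = new StreamReader(path);
    while (await reader.ReadLineAsync() is { } line)
    {
        yield return line; // downstream consumers see this item immediately
    }
}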
Real numbers, real impact
1M records, 500 bytes each
Eager loading: ~500 MB peak memory
NPipeline streaming: ~1-2 MB peak memory
Sub-millisecond first item
Start processing immediately. Don't wait for your entire dataset to load before seeing results.
Predictable GC behavior
No surprise pauses. Memory usage scales with your pipeline's complexity, not your data volume.
Zero-allocation fast paths for high-throughput scenarios
NPipeline uses ValueTask<T> to eliminate heap allocations for synchronous operations. Cache hits, validation checks, simple calculations—they all run without touching the heap.
100,000 items/second, 90% cache hits
That's 90,000 Task allocations eliminated per second. Your GC pressure drops by up to 90%. Your P99 latency becomes predictable.
Plan-based execution
NPipeline compiles your pipeline structure once. During execution, there's no reflection, no per-item routing decisions—just direct method dispatch.
Fast path
Synchronous result available? Stack allocation, zero GC pressure.
Slow path
I/O required? Seamlessly transitions to true async.
Same code, both paths
Write it once. NPipeline handles the optimization.
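Under the hood this is the standard ValueTask<T> idiom available to any .NET code: wrap the value directly when it is already in hand, and fall back to a real awaited Task only when I/O is needed. A minimal sketch with a hypothetical customer cache (CustomerLookup and FetchFromStoreAsync are stand-ins, not NPipeline types):

using System.Collections.Concurrent;

public sealed record Customer(string Id);

// Standard ValueTask<T> fast/slow path, not NPipeline internals: on a cache hit
// the value is wrapped with no Task allocation; on a miss we fall back to true async.
public sealed class CustomerLookup
{
    private readonly ConcurrentDictionary<string, Customer> _cache = new();

    public ValueTask<Customer> GetAsync(string id)
    {
        if (_cache.TryGetValue(id, out var customer))
            return new ValueTask<Customer>(customer);     // fast path: no Task allocation

        return new ValueTask<Customer>(LoadAsync(id));    // slow path: true async

        async Task<Customer> LoadAsync(string key)
        {
            var loaded = await FetchFromStoreAsync(key);  // hypothetical I/O stand-in
            _cache[key] = loaded;
            return loaded;
        }
    }

    private static Task<Customer> FetchFromStoreAsync(string id) =>
        Task.FromResult(new Customer(id));                // placeholder for real I/O
}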
Built for the real world, where things fail
Production pipelines encounter bad data, network blips, and overwhelmed dependencies. NPipeline gives you the tools to handle failure gracefully—without bringing down your entire system.
Retry policies
Transient failures get automatic retries with configurable backoff. Persistent failures trigger node restarts or route items to dead-letter queues.
Circuit breakers
Protect downstream systems from cascading failures. When a dependency is struggling, stop hammering it and give it time to recover; the underlying pattern is sketched below.
Granular error handling
Handle errors at the item level or the stream level. One bad record doesn't have to poison your entire batch.
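The circuit-breaker pattern itself is straightforward to sketch in plain C#. This is the general idea, not NPipeline's configuration API: count consecutive failures, open the circuit once a threshold is hit, and short-circuit calls until a cooldown elapses.

// Circuit-breaker pattern in plain C# (single-threaded sketch, not NPipeline's API):
// after a run of consecutive failures the breaker opens and calls are short-circuited
// until a cooldown elapses, giving the struggling dependency time to recover.
public sealed class CircuitBreaker
{
    private readonly int _failureThreshold;
    private readonly TimeSpan _cooldown;
    private int _consecutiveFailures;
    private DateTimeOffset _openedAt;

    public CircuitBreaker(int failureThreshold, TimeSpan cooldown) =>
        (_failureThreshold, _cooldown) = (failureThreshold, cooldown);

    public async Task<T> ExecuteAsync<T>(Func<Task<T>> action)
    {
        if (_consecutiveFailures >= _failureThreshold &&
            DateTimeOffset.UtcNow - _openedAt < _cooldown)
        {
            throw new InvalidOperationException("Circuit open: call skipped.");
        }

        try
        {
            var result = await action();
            _consecutiveFailures = 0;                   // success closes the circuit
            return result;
        }
        catch
        {
            if (++_consecutiveFailures >= _failureThreshold)
                _openedAt = DateTimeOffset.UtcNow;      // trip (or re-trip) the breaker
            throw;
        }
    }
}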
Code that reads like a diagram
NPipeline's fluent API makes your pipeline structure visible in your code. The compiler enforces type safety between nodes—if it compiles, it connects.
public void Define(PipelineBuilder builder, PipelineContext context)
{
    // Define your nodes
    var source = builder.AddSource<OrderSource, Order>();
    var validate = builder.AddTransform<ValidateOrder, Order, Order>();
    var enrich = builder.AddTransform<EnrichWithCustomer, Order, EnrichedOrder>();
    var sink = builder.AddSink<DatabaseSink, EnrichedOrder>();

    // Connect the graph — types must match
    builder.Connect(source, validate);
    builder.Connect(validate, enrich);
    builder.Connect(enrich, sink);

    // Add resilience
    builder.WithRetryOptions(new PipelineRetryOptions(
        MaxItemRetries: 3,
        MaxNodeRestartAttempts: 2
    ));
}

Each node is a single class with a single responsibility. Test them in isolation. Compose them into complex workflows.
Designed for testing from day one
Every node is a standalone class. Test your transforms with simple unit tests — no mocking of pipeline infrastructure required.
Isolated nodes
Test each node independently. Pass in test data, assert on outputs. No pipeline ceremony required.
In-memory testing
Use the testing extensions to run entire pipelines in memory. Verify end-to-end behavior without external dependencies.
Assertion libraries
First-class support for FluentAssertions and AwesomeAssertions. Write expressive tests that read like specifications.
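As an example, the ValidateOrder node sketched earlier can be exercised with nothing but xUnit and FluentAssertions. The test below assumes that sketch (and its hypothetical TransformAsync signature) and touches no pipeline infrastructure.

using FluentAssertions;
using Xunit;

// Unit tests for the ValidateOrder sketch shown earlier: construct the node,
// pass in test data, assert on the output. No pipeline runtime involved.
public class ValidateOrderTests
{
    [Fact]
    public async Task Valid_order_passes_through_unchanged()
    {
        var node = new ValidateOrder();
        var order = new Order(Id: 42, Total: 99.95m, CustomerId: "C-7");

        var result = await node.TransformAsync(order, CancellationToken.None);

        result.Should().Be(order);
    }

    [Fact]
    public async Task Negative_total_is_rejected()
    {
        var node = new ValidateOrder();
        var order = new Order(Id: 43, Total: -1m, CustomerId: "C-7");

        var act = () => node.TransformAsync(order, CancellationToken.None).AsTask();

        await act.Should().ThrowAsync<InvalidOperationException>();
    }
}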
Built for these problems
ETL workflows
Extract from databases, APIs, and files. Transform with validation and enrichment. Load to your destination. All with clear, testable code.
Real-time streaming
Process data as it arrives from message queues, webhooks, or IoT devices. Sub-millisecond latency to first item processed.
Data validation
Implement complex validation rules as discrete, testable transforms. Route invalid items to review queues without stopping the pipeline.
Batch processing
Process millions of historical records without running out of memory. Streaming architecture means predictable resource usage.
Event-driven systems
React to events with complex processing logic. Fan out to multiple sinks. Handle backpressure gracefully.
Microservice integration
Transform data between services with different schemas. Enrich with data from multiple sources. Maintain type safety across boundaries.
Modular by design
Start with the core library. Add extensions as you need them.
NPipeline.DependencyInjection
Full integration with Microsoft.Extensions.DependencyInjection. Constructor injection in nodes. Proper service lifetimes.
NPipeline.Parallelism
Parallel execution strategies for CPU-bound transforms. Configurable concurrency limits. Linear throughput scaling.
NPipeline.Connectors
Pre-built sources and sinks for common targets. CSV files, storage providers, and more. Unified abstraction layer.
NPipeline.Testing
In-memory test nodes. Assertion helpers for FluentAssertions and AwesomeAssertions. Test your pipelines without external dependencies.
Ready to build better pipelines?
Get started in minutes. Build your first pipeline in 15.
dotnet add package NPipeline