
High-Performance, Type-Safe, Streaming Data Pipelines in .NET

NPipeline is a powerful, flexible library designed for constructing robust, graph-based streaming data workflows.

By combining the type safety of C# with a directed acyclic graph (DAG) architecture, NPipeline empowers developers to build complex ETL processes, real-time data streams, and event-driven architectures that are easy to test, debug, and maintain.

Data processing in .NET shouldn't feel like this

Memory nightmares

Your pipeline loads everything into memory, then crashes at 2 AM when someone uploads a file that's slightly larger than usual. You've added more RAM twice this year.

Spaghetti transforms

Your data processing logic started simple. Now it's 2,000 lines of nested loops and conditional statements that nobody wants to touch, let alone test.

Silent failures

One malformed record takes down your entire batch. You've wrapped everything in try-catch blocks, but errors still slip through to production.

A better way to process data

NPipeline gives you a graph-based architecture where data flows through discrete, testable nodes. Each piece does one thing well. Complexity emerges from composition, not accumulation.

Sources

Where data enters your pipeline. Read from files, databases, APIs, or message queues. Sources produce streams of typed items that flow downstream.

Transforms

Where data gets processed. Validate, enrich, filter, aggregate, or reshape your data. Each transform is a focused, single-responsibility component.

Sinks

Where data lands. Write to databases, send to APIs, or stream to files. Sinks consume the processed data and handle final delivery.

The Graph

Connect nodes to form a directed acyclic graph. See exactly how data flows through your system. Debug by tracing the path, not by hunting through nested loops.
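
To make the roles concrete, here is a rough sketch of what the three kinds of node can look like as plain classes. The interface names in the comments (ISource&lt;T&gt;, ITransform&lt;TIn, TOut&gt;, ISink&lt;T&gt;) and the method signatures are illustrative assumptions rather than NPipeline's exact contracts; Order, OrderSource, and ValidateOrder anticipate the pipeline example further down the page, and OrderSink is a placeholder name.

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public record Order(int Id, decimal Total);

// Source: produces a typed stream of items that flows downstream.
// The interface in the comment is assumed, not NPipeline's actual API.
public class OrderSource // : ISource<Order> (assumed)
{
    public async IAsyncEnumerable<Order> ReadAsync(
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        // A real source would read from a file, database, API, or queue.
        await Task.CompletedTask;
        yield return new Order(1, 42.50m);
    }
}

// Transform: one focused, single-responsibility processing step.
public class ValidateOrder // : ITransform<Order, Order> (assumed)
{
    public Order Process(Order order) =>
        order.Total >= 0
            ? order
            : throw new InvalidOperationException($"Order {order.Id} has a negative total.");
}

// Sink: consumes processed items and handles final delivery.
public class OrderSink // : ISink<Order> (assumed)
{
    public Task WriteAsync(Order order, CancellationToken ct = default)
    {
        // A real sink would persist to a database, call an API, or write a file.
        return Task.CompletedTask;
    }
}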

Process more data than fits in memory

NPipeline is streaming-first. Data flows through your pipeline item by item, so memory usage stays constant regardless of dataset size. Process a million records or a billion; your memory footprint stays the same.
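
To illustrate, the sketch below streams orders out of a CSV file one line at a time; peak memory is roughly a single line plus whatever is in flight, no matter how large the file grows. The node shape is the same assumed one as in the earlier sketch (and it reuses the Order record from there); the streaming pattern itself is plain IAsyncEnumerable&lt;T&gt;.

using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Streams a file of any size with a constant memory footprint:
// only the current line is ever held in memory.
public class CsvOrderSource // : ISource<Order> (assumed)
{
    private readonly string _path;

    public CsvOrderSource(string path) => _path = path;

    public async IAsyncEnumerable<Order> ReadAsync(
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        using var reader = new StreamReader(_path);
        string? line;
        while ((line = await reader.ReadLineAsync()) is not null)
        {
            ct.ThrowIfCancellationRequested();
            var fields = line.Split(',');
            yield return new Order(int.Parse(fields[0]), decimal.Parse(fields[1]));
        }
    }
}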

Real numbers, real impact

1M records, 500 bytes each

Eager loading: ~500 MB peak memory
NPipeline streaming: ~1-2 MB peak memory

Sub-millisecond first item

Start processing immediately. Don't wait for your entire dataset to load before seeing results.

Predictable GC behavior

No surprise pauses. Memory usage scales with your pipeline's complexity, not your data volume.

Zero-allocation fast paths for high-throughput scenarios

NPipeline uses ValueTask<T> to eliminate heap allocations for synchronous operations. Cache hits, validation checks, simple calculations—they all run without touching the heap.

100,000 items/second, 90% cache hits

That's 90,000 Task allocations eliminated per second. Your GC pressure drops by up to 90%. Your P99 latency becomes predictable.
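
Here is a sketch of how such a transform can be written, assuming a node is free to expose ValueTask&lt;T&gt;-returning methods: a cache hit wraps the result synchronously with no Task allocation, and only a cache miss falls through to a real async call. The EnrichWithCustomer name echoes the pipeline example below; the method, cache, and lookup are hypothetical.

using System.Collections.Concurrent;
using System.Threading.Tasks;

public class EnrichWithCustomer // : ITransform<Order, EnrichedOrder> (assumed)
{
    private readonly ConcurrentDictionary<int, string> _cache = new();

    public ValueTask<string> GetCustomerNameAsync(int customerId)
    {
        // Fast path: a cache hit completes synchronously with no Task allocation.
        if (_cache.TryGetValue(customerId, out var name))
            return new ValueTask<string>(name);

        // Slow path: only a cache miss pays for a real async state machine.
        return new ValueTask<string>(LoadAndCacheAsync(customerId));
    }

    private async Task<string> LoadAndCacheAsync(int customerId)
    {
        await Task.Delay(10); // stand-in for a database or API call
        var name = $"customer-{customerId}";
        _cache[customerId] = name;
        return name;
    }
}

Callers simply await the returned ValueTask&lt;string&gt;; which path was taken is invisible to them, which is the "same code, both paths" idea described in the next section.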

Plan-based execution

NPipeline compiles your pipeline structure once. During execution, there's no reflection, no per-item routing decisions—just direct method dispatch.

Fast path

Synchronous result available? Stack allocation, zero GC pressure.

Slow path

I/O required? Seamlessly transitions to true async.

Same code, both paths

Write it once. NPipeline handles the optimization.

Built for the real world, where things fail

Production pipelines encounter bad data, network blips, and overwhelmed dependencies. NPipeline gives you the tools to handle failure gracefully—without bringing down your entire system.

Retry policies

Transient failures get automatic retries with configurable backoff. Persistent failures trigger node restarts or route items to dead-letter queues.

Circuit breakers

Protect downstream systems from cascading failures. When a dependency is struggling, stop hammering it and give it time to recover.

Granular error handling

Handle errors at the item level or the stream level. One bad record doesn't have to poison your entire batch.

Code that reads like a diagram

NPipeline's fluent API makes your pipeline structure visible in your code. The compiler enforces type safety between nodes—if it compiles, it connects.

public void Define(PipelineBuilder builder, PipelineContext context) 
{
    // Define your nodes
    var source = builder.AddSource<OrderSource, Order>();
    var validate = builder.AddTransform<ValidateOrder, Order, Order>();
    var enrich = builder.AddTransform<EnrichWithCustomer, Order, EnrichedOrder>();
    var sink = builder.AddSink<DatabaseSink, EnrichedOrder>();

    // Connect the graph — types must match
    builder.Connect(source, validate);
    builder.Connect(validate, enrich);
    builder.Connect(enrich, sink);
    
    // Add resilience
    builder.WithRetryOptions(new PipelineRetryOptions(
        MaxItemRetries: 3, 
        MaxNodeRestartAttempts: 2
    ));
}

Each node is a single class with a single responsibility. Test them in isolation. Compose them into complex workflows.

Designed for testing from day one

Every node is a standalone class. Test your transforms with simple unit tests — no mocking of pipeline infrastructure required.

Isolated nodes

Test each node independently. Pass in test data, assert on outputs. No pipeline ceremony required.
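
For example, a test for the ValidateOrder transform sketched earlier needs nothing beyond the class itself and an assertion library (xUnit and FluentAssertions here); the node signature is still the assumed one from that sketch.

using System;
using FluentAssertions;
using Xunit;

public class ValidateOrderTests
{
    [Fact]
    public void Valid_order_passes_through_unchanged()
    {
        var transform = new ValidateOrder();
        var order = new Order(1, 42.50m);

        var result = transform.Process(order);

        result.Should().Be(order);
    }

    [Fact]
    public void Negative_total_is_rejected()
    {
        var transform = new ValidateOrder();

        Action act = () => transform.Process(new Order(2, -5m));

        act.Should().Throw<InvalidOperationException>();
    }
}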

In-memory testing

Use the testing extensions to run entire pipelines in memory. Verify end-to-end behavior without external dependencies.

Assertion libraries

First-class support for FluentAssertions and AwesomeAssertions. Write expressive tests that read like specifications.

Built for these problems

ETL workflows

Extract from databases, APIs, and files. Transform with validation and enrichment. Load to your destination. All with clear, testable code.

Real-time streaming

Process data as it arrives from message queues, webhooks, or IoT devices. Sub-millisecond latency to first item processed.

Data validation

Implement complex validation rules as discrete, testable transforms. Route invalid items to review queues without stopping the pipeline.

Batch processing

Process millions of historical records without running out of memory. Streaming architecture means predictable resource usage.

Event-driven systems

React to events with complex processing logic. Fan out to multiple sinks. Handle backpressure gracefully.

Microservice integration

Transform data between services with different schemas. Enrich with data from multiple sources. Maintain type safety across boundaries.

Modular by design

Start with the core library. Add extensions as you need them.

NPipeline.DependencyInjection

Full integration with Microsoft.Extensions.DependencyInjection. Constructor injection in nodes. Proper service lifetimes.

NPipeline.Parallelism

Parallel execution strategies for CPU-bound transforms. Configurable concurrency limits. Linear throughput scaling.

NPipeline.Connectors

Pre-built sources and sinks for common targets. CSV files, storage providers, and more. Unified abstraction layer.

NPipeline.Testing

In-memory test nodes. Assertion helpers for FluentAssertions and AwesomeAssertions. Test your pipelines without external dependencies.

Ready to build better pipelines?

Get started in minutes. Build your first pipeline in 15.

dotnet add package NPipeline