Frequently Asked Questions
General Questions
What is NPipeline?
NPipeline is a high-performance, graph-based streaming data pipeline library for .NET. It allows you to build complex data processing workflows by defining interconnected nodes (sources, transforms, and sinks) that process data efficiently without loading entire datasets into memory.
When should I use NPipeline?
NPipeline is ideal for:
- ETL (Extract, Transform, Load) pipelines
- Real-time data processing
- Batch file processing (CSV, JSON, XML)
- Data validation and cleansing
- API data integration
- Any scenario requiring efficient data flow processing
What makes NPipeline different from other pipeline libraries?
NPipeline offers:
- Graph-based architecture for clear data flow visualization
- High performance with minimal memory allocations
- Type safety with compile-time validation
- Flexibility to mix sequential and parallel execution
- Production-ready error handling and resilience
- Testability with built-in testing utilities
Does NPipeline support streaming data?
Yes! NPipeline is designed around streaming data using IAsyncEnumerable<T>. Data flows through the pipeline without loading everything into memory at once.
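Because the streaming model rests on IAsyncEnumerable<T>, the idea can be shown in plain .NET without any NPipeline types. In this sketch, StreamingDemo and its methods are illustrative names, not library API. The log shows that each item passes through the transform before the source produces the next one, so no stage ever holds the whole dataset.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class StreamingDemo
{
    // Records the order in which stages touch items, to make the interleaving visible.
    public static readonly List<string> Log = new();

    // Source: produces items lazily, one at a time.
    public static async IAsyncEnumerable<int> ReadNumbersAsync(int count)
    {
        for (var i = 1; i <= count; i++)
        {
            await Task.Yield(); // stand-in for real I/O
            Log.Add($"produce {i}");
            yield return i;
        }
    }

    // Transform: consumes one item and emits one item; nothing is buffered.
    public static async IAsyncEnumerable<int> DoubleAsync(IAsyncEnumerable<int> input)
    {
        await foreach (var item in input)
        {
            Log.Add($"transform {item}");
            yield return item * 2;
        }
    }

    // Sink: pulls items through the chain until the source is exhausted.
    public static async Task<List<int>> CollectAsync(IAsyncEnumerable<int> input)
    {
        var results = new List<int>();
        await foreach (var item in input)
            results.Add(item);
        return results;
    }
}
```

Calling `StreamingDemo.CollectAsync(StreamingDemo.DoubleAsync(StreamingDemo.ReadNumbersAsync(3)))` yields 2, 4 and 6, and the log alternates "produce" and "transform" entries.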
Installation & Setup
Which version of .NET is supported?
NPipeline requires .NET 8.0, 9.0, or 10.0.
Do I need to install all the extensions?
No. The core NPipeline package is all you need. Extensions are optional:
- DependencyInjection - For managing dependencies
- Parallelism - For parallel execution
- Testing - For unit testing pipelines
- Connectors - For pre-built source/sink nodes
Can I use NPipeline with ASP.NET Core?
Yes! NPipeline integrates well with ASP.NET Core via dependency injection. See Dependency Injection for details.
How do I set up logging?
Integrate with Microsoft.Extensions.Logging:
services.AddLogging(builder => builder.AddConsole());
services.AddNPipeline(Assembly.GetExecutingAssembly());
Then inject ILogger<T> into your nodes.
Architecture & Concepts
What's the difference between IPipeline and IPipelineDefinition?
- IPipelineDefinition - A blueprint that defines your pipeline structure
- IPipeline - The compiled, runnable instance created from a definition
- You implement IPipelineDefinition, then PipelineRunner executes it
Can a pipeline have multiple sources?
Yes. Multiple source nodes can connect to the same transform or separate transforms. See Common Patterns for examples.
What's the purpose of PipelineContext?
PipelineContext carries runtime information through the pipeline:
- Cancellation tokens
- Shared state between nodes
- Logging and observability services
- Custom application data
How does data flow between nodes?
Data flows through IDataPipe<T> objects:
- Source nodes produce an IDataPipe<T>
- Transforms consume it and produce a new IDataPipe<T>
- Sinks consume the final IDataPipe<T>
Core Concepts
What's the difference between batching and aggregation?
Batching groups items for operational efficiency: it optimizes interactions with external systems such as databases or APIs. It works on arrival order and the wall clock, as in "every N items or every X seconds, send what we have."
Aggregation groups items for data correctness: it handles out-of-order or late-arriving events in event-time windows. It uses event timestamps, not arrival times, and can wait for latecomers within a configured grace period.
Use batching when:
- External systems work more efficiently with bulk operations
- You need to reduce API call overhead
- Order/timing of items doesn't affect correctness
Use aggregation when:
- Events can arrive out of order or late
- You need time-windowed summaries or counts
- Results must be correct despite late-arriving data
For a detailed comparison and decision framework, see Grouping Strategies: Batching vs Aggregation.
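The difference is easiest to see on the same input. The sketch below is plain C# with hypothetical names (TimedEvent, GroupingDemo), not NPipeline's batching or aggregation nodes. Batch groups by arrival count only (the time trigger is left out for brevity). SumByWindow assigns each event to a tumbling window by its own timestamp and drops it only when its window end plus the grace period is older than the newest event time seen so far; that simple watermark rule is an assumption of this sketch.

```csharp
using System;
using System.Collections.Generic;

public record TimedEvent(DateTime Timestamp, int Value);

public static class GroupingDemo
{
    // Batching: groups purely by arrival order (count trigger only).
    public static List<List<TimedEvent>> Batch(IEnumerable<TimedEvent> arrivals, int batchSize)
    {
        var batches = new List<List<TimedEvent>>();
        var current = new List<TimedEvent>();
        foreach (var e in arrivals)
        {
            current.Add(e);
            if (current.Count == batchSize)
            {
                batches.Add(current);
                current = new List<TimedEvent>();
            }
        }
        if (current.Count > 0) batches.Add(current); // flush the partial batch
        return batches;
    }

    // Aggregation: sums values per event-time tumbling window, keyed by window start.
    public static SortedDictionary<DateTime, int> SumByWindow(
        IEnumerable<TimedEvent> arrivals, TimeSpan windowSize, TimeSpan grace)
    {
        var sums = new SortedDictionary<DateTime, int>();
        var watermark = DateTime.MinValue; // newest event time seen so far
        foreach (var e in arrivals)
        {
            if (e.Timestamp > watermark) watermark = e.Timestamp;
            var windowStart = new DateTime(e.Timestamp.Ticks - e.Timestamp.Ticks % windowSize.Ticks);
            if (windowStart + windowSize + grace < watermark) continue; // too late: window closed
            sums[windowStart] = sums.TryGetValue(windowStart, out var s) ? s + e.Value : e.Value;
        }
        return sums;
    }
}
```

For example, if events stamped 10s, 70s, 50s, 200s and 20s arrive in that order, batches of two put the 50s and 200s events together even though they belong to different windows. With 60-second windows and a 30-second grace period, the late 50s event is still counted in the first window, while the 20s event is dropped because the watermark has already moved to 200s.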
When should I use ValueTask vs Task in transforms?
Use ValueTask<T> when your transform can complete synchronously in common cases (cache hits, simple calculations). Use Task<T> when your transform is almost always asynchronous.
ValueTask benefits:
- Zero heap allocations for synchronous completions
- Can substantially reduce GC pressure when most calls complete synchronously (for example, with high cache-hit rates)
- Seamlessly transitions to async when needed
Example pattern:
public override ValueTask<UserData> ExecuteAsync(string userId, PipelineContext context, CancellationToken cancellationToken)
{
    // Fast path: cache hit - no Task allocation
    if (_cache.TryGetValue(userId, out var cached))
        return new ValueTask<UserData>(cached);

    // Slow path: async database call
    return new ValueTask<UserData>(FetchAndCacheAsync(userId, cancellationToken));
}
For complete implementation guidelines and performance impact analysis, see Synchronous Fast Paths and ValueTask Optimization.
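Here is a self-contained version of the same fast-path pattern that runs outside a pipeline. CachedLookup is a hypothetical class, a string stands in for UserData, and Task.Delay stands in for the database call. SlowPathCalls lets you confirm that a repeated lookup never reaches the async path.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public sealed class CachedLookup
{
    private readonly ConcurrentDictionary<string, string> _cache = new();

    // Counts how often the slow path runs, so the effect of the cache is observable.
    public int SlowPathCalls;

    public ValueTask<string> GetAsync(string userId, CancellationToken cancellationToken = default)
    {
        // Fast path: completes synchronously, no Task is allocated.
        if (_cache.TryGetValue(userId, out var cached))
            return new ValueTask<string>(cached);

        // Slow path: wrap the Task returned by the genuinely asynchronous call.
        return new ValueTask<string>(FetchAndCacheAsync(userId, cancellationToken));
    }

    private async Task<string> FetchAndCacheAsync(string userId, CancellationToken cancellationToken)
    {
        Interlocked.Increment(ref SlowPathCalls);
        await Task.Delay(10, cancellationToken); // stand-in for a database round trip
        var value = $"user:{userId}";
        _cache[userId] = value;
        return value;
    }
}
```

A ValueTask should be awaited only once; if you need to await the same result more than once, convert it with AsTask() first.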
Do I need ResilientExecutionStrategy for retries?
No, ResilientExecutionStrategy is specifically for node-level restarts, not basic item retries. There are two different retry mechanisms:
Item-level retries (no ResilientExecutionStrategy needed):
- Retry individual failed items
- Configured via PipelineRetryOptions.MaxItemRetries
- Handled in node error handlers with NodeErrorDecision.Retry
Node-level restarts (requires ResilientExecutionStrategy):
- Restart entire node streams on failure
- Configured via PipelineRetryOptions.MaxNodeRestartAttempts
- Requires three mandatory components:
  - Node wrapped with ResilientExecutionStrategy
  - MaxNodeRestartAttempts > 0 in retry options
  - MaxMaterializedItems set to a positive number (not null)
For detailed configuration requirements, see Node Restart - Quick Start Checklist.
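Conceptually, an item-level retry re-runs the work for one item and leaves the rest of the stream alone. The sketch below illustrates that idea in plain C#. ItemRetry is hypothetical, and it assumes maxItemRetries counts retries after the first attempt, which may not match exactly how NPipeline counts MaxItemRetries.

```csharp
using System;
using System.Threading.Tasks;

public static class ItemRetry
{
    // Retries a single item up to maxItemRetries times after the first attempt.
    // Only exceptions accepted by isTransient are retried; anything else propagates
    // immediately, leaving the skip-or-fail decision to the caller.
    public static async Task<TOut> ExecuteAsync<TIn, TOut>(
        TIn item,
        Func<TIn, Task<TOut>> process,
        int maxItemRetries,
        Func<Exception, bool> isTransient)
    {
        for (var attempt = 0; ; attempt++)
        {
            try
            {
                return await process(item);
            }
            catch (Exception ex) when (attempt < maxItemRetries && isTransient(ex))
            {
                // Loop again: only this item is re-processed; the stream is not restarted.
            }
        }
    }
}
```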
Performance
How do I know if my pipeline is optimized?
Use these approaches to verify pipeline optimization:
1. Enable build-time analyzers to catch performance anti-patterns:
<!-- In your .csproj -->
<PackageReference Include="NPipeline.Analyzers" Version="*" />
2. Check for common issues:
  - Blocking operations in async methods (NP9102)
  - Missing ValueTask optimizations (NP9209)
  - Non-streaming patterns in source nodes (NP9211)
  - Swallowed cancellation exceptions (NP9103)
3. Benchmark critical paths with BenchmarkDotNet:
[MemoryDiagnoser]
public class MyTransformBenchmarks
{
    // _transform and _data are assumed to be initialized in a [GlobalSetup] method
    [Benchmark]
    public async Task Transform() => await _transform.ProcessAsync(_data);
}
4. Monitor runtime metrics:
  - GC pressure and allocation rates
  - Throughput vs. latency trade-offs
  - Memory usage patterns
For comprehensive performance best practices, see Performance Hygiene and Performance Analyzers.
What's the memory overhead of materialization?
Materialization buffers items in memory to enable replay functionality during node restarts. The memory overhead depends on:
- Item size: Larger items consume more memory per buffered item
- Buffer limit: MaxMaterializedItems determines the maximum number of items buffered
- Buffer duration: How many seconds of data you need to replay
Example calculations:
- Small objects (100 bytes): 10,000 items ≈ 1MB
- Medium objects (1KB): 1,000 items ≈ 1MB
- Large objects (10KB): 100 items ≈ 1MB
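The example calculations above are item count times item size, and the same arithmetic run in reverse gives a starting value for MaxMaterializedItems. MaterializationBudget is a hypothetical helper; its headroom factor is an assumption meant to leave room for .NET object headers and buffer overhead, which the simple product ignores.

```csharp
using System;

public static class MaterializationBudget
{
    // Approximate buffer size: items x average item size (ignores per-object overhead).
    public static long EstimateBytes(int maxMaterializedItems, int averageItemBytes)
        => (long)maxMaterializedItems * averageItemBytes;

    // Inverse: how many items fit in a memory budget, using only a fraction (headroom)
    // of that budget to allow for runtime overhead.
    public static int ItemsForBudget(long budgetBytes, int averageItemBytes, double headroom = 0.5)
    {
        if (averageItemBytes <= 0) throw new ArgumentOutOfRangeException(nameof(averageItemBytes));
        var usableBytes = (long)(budgetBytes * headroom);
        return (int)Math.Max(1, usableBytes / averageItemBytes);
    }
}
```

For example, a 10 MB budget with 1 KB items and the default headroom of 0.5 suggests about 4,882 items.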
Configuration guidance:
var options = new PipelineRetryOptions(
MaxItemRetries: 3,
MaxNodeRestartAttempts: 2,
MaxMaterializedItems: 1000 // Adjust based on item size and memory budget
);
For detailed memory calculations and configuration examples, see Materialization and Buffering.
Troubleshooting
My node restarts aren't working
Node restarts require three mandatory components. If any are missing, restarts silently fail:
1. ResilientExecutionStrategy not applied:
// REQUIRED
var nodeHandle = builder
    .AddTransform<MyTransform, Input, Output>("myNode")
    .WithExecutionStrategy(builder,
        new ResilientExecutionStrategy(new SequentialExecutionStrategy()));
2. MaxNodeRestartAttempts not configured:
// REQUIRED - must be > 0
var options = new PipelineRetryOptions(
    MaxItemRetries: 3,
    MaxNodeRestartAttempts: 2, // ← Must be set
    MaxMaterializedItems: 1000
);
3. MaxMaterializedItems is null (most common issue):
// REQUIRED - must be positive number, not null
var options = new PipelineRetryOptions(
    MaxItemRetries: 3,
    MaxNodeRestartAttempts: 2,
    MaxMaterializedItems: 1000 // ← CRITICAL: null disables restarts
);
Verification checklist:
- Node wrapped with ResilientExecutionStrategy
- MaxNodeRestartAttempts > 0
- MaxMaterializedItems is set to positive number
- Error handler returns PipelineErrorDecision.RestartNode
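If you want the checklist enforced rather than remembered, you can encode it as a startup self-check. RestartConfigCheck is a hypothetical helper that works on plain values read from your own configuration; it does not inspect NPipeline objects.

```csharp
using System.Collections.Generic;

public static class RestartConfigCheck
{
    // Returns one message per unmet item of the verification checklist.
    public static List<string> FindProblems(
        bool wrappedInResilientStrategy,
        int maxNodeRestartAttempts,
        int? maxMaterializedItems,
        bool handlerReturnsRestartNode)
    {
        var problems = new List<string>();
        if (!wrappedInResilientStrategy)
            problems.Add("Node is not wrapped with ResilientExecutionStrategy");
        if (maxNodeRestartAttempts <= 0)
            problems.Add("MaxNodeRestartAttempts must be > 0");
        if (maxMaterializedItems is null or <= 0)
            problems.Add("MaxMaterializedItems must be a positive number (null disables restarts)");
        if (!handlerReturnsRestartNode)
            problems.Add("Error handler never returns PipelineErrorDecision.RestartNode");
        return problems;
    }
}
```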
For the complete checklist and troubleshooting guide, see Node Restart - Quick Start Checklist.
My parallel pipeline is slower than sequential
Common parallelism anti-patterns that can make parallel pipelines slower:
1. Resource contention:
  - Multiple threads competing for the same database connection
  - Shared resources without proper synchronization
  - A degree of parallelism so high that context-switching overhead dominates
2. I/O-bound work with excessive parallelism:
// WRONG: Too much parallelism for I/O work
new ParallelOptions { MaxDegreeOfParallelism = 100 }
// BETTER: Match to I/O capacity
new ParallelOptions { MaxDegreeOfParallelism = 4 }
3. Unnecessary ordering preservation:
// SLOWER: Preserves order (default)
new ParallelOptions { PreserveOrdering = true }
// FASTER: When order doesn't matter
new ParallelOptions { PreserveOrdering = false }
4. Improper queue configuration:
  - Unbounded queues causing memory pressure
  - Queues that are too small, causing blocking
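The queue trade-off above is the classic bounded-buffer problem. This sketch uses System.Threading.Channels from the .NET base library, not NPipeline's MaxQueueLength setting: a bounded channel in Wait mode makes a fast producer pause when the consumer falls behind, so the queue never grows past its capacity. BoundedQueueDemo and its parameters are illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BoundedQueueDemo
{
    // A fast producer feeds a slow consumer through a queue of fixed capacity.
    public static async Task<(List<int> Consumed, int PeakDepth)> RunAsync(int items, int capacity)
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity)
        {
            // When full, WriteAsync waits (back-pressure) instead of growing or dropping items.
            FullMode = BoundedChannelFullMode.Wait
        });
        var peakDepth = 0;

        var producer = Task.Run(async () =>
        {
            try
            {
                for (var i = 0; i < items; i++)
                {
                    await channel.Writer.WriteAsync(i);
                    peakDepth = Math.Max(peakDepth, channel.Reader.Count);
                }
            }
            finally
            {
                channel.Writer.Complete(); // lets the consumer's loop finish
            }
        });

        var consumed = new List<int>();
        await foreach (var item in channel.Reader.ReadAllAsync())
        {
            await Task.Delay(1); // deliberately slower than the producer
            consumed.Add(item);
        }

        await producer;
        return (consumed, peakDepth);
    }
}
```

A very small capacity keeps memory low but stalls the producer more often; a larger one smooths bursts at the cost of memory.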
Optimization steps:
- Start with MaxDegreeOfParallelism = Environment.ProcessorCount
- Set PreserveOrdering = false if order isn't required
- Configure appropriate queue limits with MaxQueueLength
- Profile to identify actual bottlenecks
For detailed parallelism configuration and best practices, see Parallelism.
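To check that a degree-of-parallelism cap behaves as expected, you can measure how many items are actually in flight. This sketch uses the BCL's Parallel.ForEachAsync with System.Threading.Tasks.ParallelOptions, which is a different type from the ParallelOptions shown above (the BCL type has no PreserveOrdering property). ParallelDemo is a hypothetical name, and Task.Delay stands in for I/O-bound work.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelDemo
{
    // Squares each item with at most maxDegreeOfParallelism items in flight
    // and reports the highest concurrency observed.
    public static async Task<(int[] Results, int PeakConcurrency)> RunAsync(int[] items, int maxDegreeOfParallelism)
    {
        var results = new ConcurrentBag<int>(); // unordered: completion order, not input order
        var gate = new object();
        var inFlight = 0;
        var peak = 0;

        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };
        await Parallel.ForEachAsync(items, options, async (item, ct) =>
        {
            lock (gate)
            {
                inFlight++;
                peak = Math.Max(peak, inFlight);
            }

            await Task.Delay(5, ct); // stand-in for I/O-bound work
            results.Add(item * item);

            lock (gate) { inFlight--; }
        });

        // Sorted only for easy inspection; skip this when order doesn't matter.
        return (results.OrderBy(x => x).ToArray(), peak);
    }
}
```

Running it with MaxDegreeOfParallelism set to Environment.ProcessorCount and then to smaller or larger values is a quick way to see where throughput stops improving for your workload.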
Error Handling
What happens if a transform throws an exception?
By default, the exception propagates up and stops the pipeline. Handle errors explicitly:
try
{
    return await ProcessAsync(item);
}
catch (Exception ex)
{
    logger.LogError(ex, "Processing failed");
    throw; // or handle gracefully
}
How do I implement retries?
NPipeline provides several built-in retry mechanisms:
1. Item-level retries - Retry individual items that fail:
// Configure max retries per item
builder.WithRetryOptions(opt => opt.With(maxItemRetries: 3));
// In your error handler, return Retry for transient failures
public Task<NodeErrorDecision> HandleAsync(...)
{
    if (exception is TransientException)
        return Task.FromResult(NodeErrorDecision.Retry);
    return Task.FromResult(NodeErrorDecision.Skip);
}
2. Node-level restarts - Restart entire node streams on failure:
// Enable resilience and configure restart limits
var myNode = builder
    .AddTransform<MyTransform, Input, Output>("myTransform")
    .WithResilience(builder)
    .WithRetryOptions(builder, opt => opt.With(
        maxNodeRestartAttempts: 3,
        maxMaterializedItems: 5000
    ));
// In your pipeline error handler, return RestartNode
public Task<PipelineErrorDecision> HandleNodeFailureAsync(...)
{
    if (exception is TimeoutException)
        return Task.FromResult(PipelineErrorDecision.RestartNode);
    return Task.FromResult(PipelineErrorDecision.FailPipeline);
}
3. Circuit breaker - Prevent cascading failures:
// Configure circuit breaker through PipelineRetryOptions
var retryOptions = new PipelineRetryOptions(
    MaxItemRetries: 3,
    MaxNodeRestartAttempts: 2,
    MaxSequentialNodeAttempts: 5,
    CircuitBreakerOptions: new PipelineCircuitBreakerOptions(
        failureThreshold: 5,
        openDuration: TimeSpan.FromMinutes(1),
        samplingWindow: TimeSpan.FromMinutes(5),
        thresholdType: CircuitBreakerThresholdType.ConsecutiveFailures
    )
);
var context = PipelineContext.WithRetry(retryOptions);
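To make the ConsecutiveFailures threshold type concrete, here is a minimal consecutive-failure circuit breaker in plain C#. It illustrates the general pattern, not NPipeline's implementation. ConsecutiveFailureBreaker is hypothetical, the sampling window is not modeled, and the clock is injected so the open duration can be tested without waiting.

```csharp
using System;

public sealed class ConsecutiveFailureBreaker
{
    private readonly int _failureThreshold;
    private readonly TimeSpan _openDuration;
    private readonly Func<DateTime> _now;
    private int _consecutiveFailures;
    private DateTime? _openedAt; // null while the circuit is closed

    public ConsecutiveFailureBreaker(int failureThreshold, TimeSpan openDuration, Func<DateTime> now)
    {
        _failureThreshold = failureThreshold;
        _openDuration = openDuration;
        _now = now;
    }

    // Closed: always allow. Open: reject until openDuration has elapsed, then let
    // calls through again (half-open); the next recorded result decides what happens.
    public bool AllowCall()
    {
        if (_openedAt is null) return true;
        return _now() - _openedAt.Value >= _openDuration;
    }

    public void RecordSuccess()
    {
        _consecutiveFailures = 0;
        _openedAt = null; // close the circuit
    }

    public void RecordFailure()
    {
        _consecutiveFailures++;
        if (_consecutiveFailures >= _failureThreshold)
            _openedAt = _now(); // open, or re-open after a failed half-open call
    }
}
```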
For advanced patterns beyond simple retry counts, integrate external libraries like Polly:
// Exponential backoff with Polly
var policy = Policy
.Handle<HttpRequestException>()
.WaitAndRetryAsync(3, attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)));
await policy.ExecuteAsync(async () => await CallServiceAsync());
See Configuring Retries for detailed examples and best practices.
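If you only need the backoff schedule and not the rest of Polly, a small loop reproduces the policy above: Polly passes the retry number starting at 1, so the waits are 2, 4 and 8 seconds. The Backoff class is a hypothetical sketch; the delay function is injected so tests can record delays instead of sleeping.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Backoff
{
    // Wait 2^attempt seconds before retry number `attempt` (2s, 4s, 8s, ...).
    public static TimeSpan DelayFor(int attempt) => TimeSpan.FromSeconds(Math.Pow(2, attempt));

    // Retries on TException up to maxRetries times; other exceptions propagate immediately.
    public static async Task<T> RetryAsync<T, TException>(
        Func<Task<T>> action,
        int maxRetries,
        Func<TimeSpan, CancellationToken, Task> delay,
        CancellationToken cancellationToken = default)
        where TException : Exception
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return await action();
            }
            catch (TException) when (attempt <= maxRetries)
            {
                await delay(DelayFor(attempt), cancellationToken);
            }
        }
    }
}
```

In production code, pass Task.Delay as the delay argument.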
Error Codes Reference
What do the error codes like [NP0101] mean?
NPipeline uses standardized error codes (NP prefix) to help you quickly identify and resolve issues:
- NP01xx - Graph Validation Errors
- NP02xx - Type Mismatch and Conversion Errors
- NP03xx - Execution Errors
- NP04xx - Configuration Errors
- NP05xx - Resource and Capacity Errors
See the Error Codes Reference for a complete list of all error codes, their causes, and solutions.
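Because the category is encoded in the two digits after the NP prefix, tooling that scans logs can classify codes mechanically. ErrorCodes.Category is a hypothetical helper built only from the ranges listed above; codes outside those ranges (such as the NP9xxx analyzer diagnostics mentioned under Performance) are reported as other.

```csharp
using System;

public static class ErrorCodes
{
    // Maps a code such as "NP0101" or "[NP0101]" to its category from the list above.
    public static string Category(string code)
    {
        var trimmed = code.Trim('[', ']');
        if (trimmed.Length < 4 || !trimmed.StartsWith("NP", StringComparison.Ordinal))
            return "Unknown";

        return trimmed.Substring(2, 2) switch
        {
            "01" => "Graph Validation Errors",
            "02" => "Type Mismatch and Conversion Errors",
            "03" => "Execution Errors",
            "04" => "Configuration Errors",
            "05" => "Resource and Capacity Errors",
            _ => "Other (see the Error Codes Reference)"
        };
    }
}
```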
How do I handle invalid data?
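Either route invalid items to a separate error stream, or log them and throw so that the node's error handler decides what happens next (for example, NodeErrorDecision.Skip):
public override Task<Order> ExecuteAsync(Order item, PipelineContext context, CancellationToken cancellationToken)
{
    if (item.Price < 0)
    {
        // Structured logging keeps Price as a queryable field
        logger.LogWarning("Invalid price: {Price}", item.Price);
        throw new ValidationException("Price cannot be negative");
    }

    return Task.FromResult(item);
}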
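For the first option, one way to route invalid items is to filter them out of the main stream and hand each one, with a reason, to a separate error sink. This is a plain C# sketch with hypothetical Order, Rejected and OrderValidation types, not an NPipeline node; because it uses yield return, items are still processed one at a time.

```csharp
using System;
using System.Collections.Generic;

public record Order(int Id, decimal Price);
public record Rejected(Order Order, string Reason);

public static class OrderValidation
{
    // Yields valid orders downstream and reports each invalid one, with a reason, to onInvalid.
    public static IEnumerable<Order> ValidOnly(IEnumerable<Order> orders, Action<Rejected> onInvalid)
    {
        foreach (var order in orders)
        {
            if (order.Price < 0)
                onInvalid(new Rejected(order, "Price cannot be negative"));
            else
                yield return order;
        }
    }
}
```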
Testing
How do I test a pipeline?
Use InMemorySourceNode and InMemorySinkNode:
var context = new PipelineContext();
context.Items[typeof(InMemorySourceNode<int>).FullName!] = new[] { 1, 2, 3 };
var runner = new PipelineRunner();
await runner.RunAsync<TestPipeline>(context);
// Retrieve results
var sink = context.Items[typeof(InMemorySinkNode<int>).FullName!] as InMemorySinkNode<int>;
var results = await sink!.Completion;
See Testing Pipelines for details.
Can I test individual nodes?
Yes. Call ExecuteAsync directly with test data:
var transform = new MyTransform();
var context = new PipelineContext();
var result = await transform.ExecuteAsync(testData, context, CancellationToken.None);
Assert.Equal(expected, result);
How do I mock external dependencies?
Use dependency injection with mock services:
var mockService = new Mock<IMyService>();
var node = new MyNode(mockService.Object);
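Hand-written fakes are an alternative to a mocking library when you also want to inspect how a node used its dependency. IMyService, FakeMyService and PriceConverter below are hypothetical stand-ins for your own interface and node logic.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public interface IMyService
{
    Task<decimal> GetExchangeRateAsync(string currency);
}

// Fake: returns canned rates and records every request for later assertions.
public sealed class FakeMyService : IMyService
{
    public List<string> Requests { get; } = new();

    public Task<decimal> GetExchangeRateAsync(string currency)
    {
        Requests.Add(currency);
        return Task.FromResult(currency == "EUR" ? 1.1m : 1m);
    }
}

// Code under test receives the dependency through its constructor.
public sealed class PriceConverter
{
    private readonly IMyService _service;

    public PriceConverter(IMyService service) => _service = service;

    public async Task<decimal> ConvertAsync(decimal amount, string currency)
        => amount * await _service.GetExchangeRateAsync(currency);
}
```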
Integration
How do I use NPipeline with a database?
Implement custom source and sink nodes:
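using Microsoft.Data.SqlClient;

public class DatabaseSource : SourceNode<Customer>
{
    private readonly string _connectionString;

    public DatabaseSource(string connectionString) => _connectionString = connectionString;

    public override IDataPipe<Customer> ExecuteAsync(PipelineContext context, CancellationToken cancellationToken)
    {
        static IAsyncEnumerable<Customer> ReadAsync(string connStr, CancellationToken ct)
        {
            return Read();

            async IAsyncEnumerable<Customer> Read()
            {
                await using var connection = new SqlConnection(connStr);
                await connection.OpenAsync(ct);

                // Example query and Customer(Id, Name) shape; adjust to your schema
                await using var command = new SqlCommand("SELECT Id, Name FROM Customers", connection);
                await using var reader = await command.ExecuteReaderAsync(ct);

                // Yield one row at a time so the full result set is never held in memory
                while (await reader.ReadAsync(ct))
                {
                    yield return new Customer(reader.GetInt32(0), reader.GetString(1));
                }
            }
        }

        return new StreamingDataPipe<Customer>(ReadAsync(_connectionString, cancellationToken));
    }
}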
How do I use NPipeline with message queues (such as RabbitMQ)?
Implement a source node that reads from the queue:
public class QueueSource : SourceNode<Message>
{
    // IQueueClient stands in for your broker's client wrapper
    private readonly IQueueClient _client;

    public QueueSource(IQueueClient client) => _client = client;

    public override IDataPipe<Message> ExecuteAsync(PipelineContext context, CancellationToken cancellationToken)
    {
        static IAsyncEnumerable<Message> ReadAsync(IQueueClient client, CancellationToken ct)
        {
            return Read();

            async IAsyncEnumerable<Message> Read()
            {
                // Keep receiving until the pipeline is cancelled
                while (!ct.IsCancellationRequested)
                {
                    var message = await client.ReceiveAsync(ct);
                    if (message != null)
                    {
                        yield return message;
                    }
                }
            }
        }

        return new StreamingDataPipe<Message>(ReadAsync(_client, cancellationToken));
    }
}
Can I use NPipeline with dependency injection containers?
Yes, with the NPipeline.Extensions.DependencyInjection package:
services.AddNPipeline(Assembly.GetExecutingAssembly());
// Your nodes are automatically registered and resolved
Troubleshooting
My pipeline is running out of memory
Cause: Likely loading all data at once instead of streaming.
Solution: Use async enumerable:
// BAD - materializes every record before the first one flows downstream
var allData = database.GetAllRecords().ToList(); // ❌
return new StreamingDataPipe<T>(allData.ToAsyncEnumerable());

// GOOD - streams records one at a time
async IAsyncEnumerable<T> GetDataAsync()
{
    // ✅ Lazy, as long as GetAllRecords yields records on demand rather than returning a list
    foreach (var record in database.GetAllRecords())
        yield return record;
}
return new StreamingDataPipe<T>(GetDataAsync());
Pipeline is slow
Solution: Profile to find the bottleneck:
- Check which transform is slow
- Consider parallelism if CPU-bound
- Consider batching if I/O-bound
- Minimize allocations
Nodes aren't being registered with DI
Cause: Assembly not scanned.
Solution: Pass assembly to AddNPipeline:
services.AddNPipeline(Assembly.GetExecutingAssembly()); // ✅
// NOT just: services.AddNPipeline(); ❌
CancellationToken not working
Cause: Not checking the token in nodes.
Solution: Check and respect the token:
public override async Task ExecuteAsync(IDataPipe<T> input, PipelineContext context, CancellationToken cancellationToken)
{
    await foreach (var item in input.WithCancellation(cancellationToken)) // ✅
    {
        cancellationToken.ThrowIfCancellationRequested(); // ✅
        // Process item
    }
}
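The following standalone sketch shows why passing the token matters. The source observes it through [EnumeratorCancellation], so an endless stream stops on the next MoveNextAsync after cancellation instead of running forever. CancellationDemo is a hypothetical class, and Task.Delay stands in for waiting on real input.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // An endless source that honours the token supplied via WithCancellation.
    public static async IAsyncEnumerable<int> CountForeverAsync(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (var i = 0; ; i++)
        {
            await Task.Delay(1, cancellationToken); // throws once the token is cancelled
            yield return i;
        }
    }

    // Consumes items until cancellation; returns how many were processed.
    public static async Task<int> ConsumeAsync(IAsyncEnumerable<int> input, CancellationTokenSource cts, int cancelAfter)
    {
        var processed = 0;
        try
        {
            await foreach (var item in input.WithCancellation(cts.Token))
            {
                cts.Token.ThrowIfCancellationRequested();
                processed++;
                if (processed == cancelAfter) cts.Cancel(); // simulate an external cancel request
            }
        }
        catch (OperationCanceledException)
        {
            // Expected: cancellation ends the otherwise endless loop.
        }
        return processed;
    }
}
```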
Next Steps
- Getting Started - Installation and quick start
- Common Patterns - See practical examples
- Best Practices - Design guidelines
- Troubleshooting - Common issues and solutions