API Quick Reference
Core Interfaces
IPipelineDefinition
Purpose: Define pipeline structure and connections.
public interface IPipelineDefinition
{
void Define(PipelineBuilder builder, PipelineContext context);
}
Implement to:
- Create custom pipelines
- Define node connections
- Configure pipeline topology
Example:
using NPipeline.Pipeline;
public class MyPipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var source = builder.AddSource<MySource, MyData>();
var transform = builder.AddTransform<MyTransform, MyData, ProcessedData>();
var sink = builder.AddSink<MySink, ProcessedData>();
builder.Connect(source, transform);
builder.Connect(transform, sink);
}
}
PipelineBuilder
Purpose: Build and connect nodes.
| Method | Returns | Purpose |
|---|---|---|
AddSource<TNode, TOut>() | SourceNodeHandle<TOut> | Create and register a source node |
AddTransform<TNode, TIn, TOut>() | TransformNodeHandle<TIn, TOut> | Create and register a transform node |
AddSink<TNode, TIn>() | SinkNodeHandle<TIn> | Create and register a sink node |
Connect<TData>(SourceNodeHandle<TData>, TransformNodeHandle<TData, TOut>) | PipelineBuilder | Connect two nodes |
Example:
var source = builder.AddSource<OrderSource, Order>();
var sink = builder.AddSink<OrderSink, Order>();
builder.Connect(source, sink);
INode
Purpose: Base interface for all pipeline nodes.
All nodes implement INode. Use specific base classes instead:
SourceNode<T>- Data sourceTransformNode<TIn, TOut>- Data transformSinkNode<T>- Data consumer
Base Node Classes
SourceNode<T>
Purpose: Produce data to pipeline.
public abstract class SourceNode<T> : INode
{
public abstract IDataPipe<T> ExecuteAsync(
PipelineContext context,
CancellationToken cancellationToken);
}
Implement by: Override ExecuteAsync() to return data synchronously.
Example:
public class CsvSource : SourceNode<Customer>
{
private readonly string _filePath;
public override IDataPipe<Customer> ExecuteAsync(
PipelineContext context,
CancellationToken cancellationToken)
{
static IAsyncEnumerable<Customer> ReadAsync(string path, CancellationToken ct)
{
return Read();
async IAsyncEnumerable<Customer> Read()
{
using var reader = new StreamReader(path);
string? line;
while ((line = await reader.ReadLineAsync(ct)) != null)
{
ct.ThrowIfCancellationRequested();
yield return ParseCsv(line);
}
}
}
return new StreamingDataPipe<Customer>(ReadAsync(_filePath, cancellationToken));
}
}
TransformNode<TIn, TOut>
Purpose: Transform data item-by-item.
public abstract class TransformNode<TIn, TOut> : INode
{
public abstract Task<TOut> ExecuteAsync(
TIn item,
PipelineContext context,
CancellationToken cancellationToken);
}
Implement by: Override ExecuteAsync() for each item.
Example:
public class OrderValidator : TransformNode<Order, ValidatedOrder>
{
public override Task<ValidatedOrder> ExecuteAsync(
Order item,
PipelineContext context,
CancellationToken cancellationToken)
{
if (item.Amount <= 0)
throw new ValidationException("Amount must be positive");
var validated = new ValidatedOrder(item.Id, item.Amount);
return Task.FromResult(validated);
}
}
SinkNode<T>
Purpose: Consume final data.
using NPipeline.Nodes;
using NPipeline.DataFlow;
using NPipeline.Observability.Tracing;
using NPipeline.Pipeline;
public abstract class SinkNode<T> : ISinkNode<T>
{
public abstract Task ExecuteAsync(
IDataPipe<T> input,
PipelineContext context,
CancellationToken cancellationToken);
}
Implement by: Override ExecuteAsync() to consume the data pipe.
Example:
public class OrderSink : SinkNode<ValidatedOrder>
{
private readonly IOrderRepository _repository;
public override async Task ExecuteAsync(
IDataPipe<ValidatedOrder> input,
PipelineContext context,
CancellationToken cancellationToken)
{
await foreach (var order in input.WithCancellation(cancellationToken))
{
await _repository.SaveAsync(order, cancellationToken);
}
}
}
Core Types
PipelineContext
Purpose: Shared runtime context for pipeline.
using NPipeline.Pipeline;
using NPipeline.Observability.Logging;
using NPipeline.Observability.Tracing;
public class PipelineContext
{
public Dictionary<string, object> Items { get; }
public Dictionary<string, object> Parameters { get; }
public CancellationToken CancellationToken { get; }
public IPipelineLoggerFactory LoggerFactory { get; }
public IPipelineTracer Tracer { get; }
public IDeadLetterSink? DeadLetterSink { get; }
public IPipelineErrorHandler? PipelineErrorHandler { get; }
public Properties Properties { get; }
public IErrorHandlerFactory ErrorHandlerFactory { get; }
public ILineageFactory LineageFactory { get; }
public IObservabilityFactory ObservabilityFactory { get; }
public RetryOptions RetryOptions { get; }
public StateManager StateManager { get; }
public IExecutionObserver ExecutionObserver { get; }
}
Key Members:
Items- Store and retrieve shared state between nodesParameters- Input parameters for the pipelineCancellationToken- For graceful cancellationLoggerFactory- Factory to create loggers for nodesTracer- For distributed tracingDeadLetterSink- Sink for failed itemsPipelineErrorHandler- Error handler for pipeline-level errorsProperties- Pipeline propertiesErrorHandlerFactory- Factory for creating error handlers and dead-letter sinksLineageFactory- Factory for creating lineage sinks and resolving lineage collectorsObservabilityFactory- Factory for resolving observability collectorsRetryOptions- Configuration for retry behaviorStateManager- Manages pipeline stateExecutionObserver- Observer for execution events
Example:
var context = new PipelineContextBuilder()
.WithCancellation(cts.Token)
.Build();
// In a node:
var logger = context.LoggerFactory.CreateLogger("MyNode");
logger.Log(LogLevel.Information, "Processing item");
// Access shared state
if (context.Items.TryGetValue("cache", out var cacheObj))
{
var sharedState = cacheObj as MyCache;
}
IDataPipe
Purpose: Represents streaming data flow.
using NPipeline.DataFlow;
public interface IDataPipe<out T> : IAsyncEnumerable<T>
{
string StreamName { get; }
// IDataPipe<T> implements IAsyncEnumerable<T> directly
// Iterate using: await foreach (var item in dataPipe)
}
Usage:
var dataPipe = sourceNode.ExecuteAsync(context, cancellationToken);
await foreach (var item in dataPipe.WithCancellation(cancellationToken))
{
// Process item
}
StreamingDataPipe
Purpose: Default implementation of IDataPipe.
using NPipeline.DataFlow.DataPipes;
public class StreamingDataPipe<T> : IDataPipe<T>
{
public StreamingDataPipe(IAsyncEnumerable<T> data, string streamName = "");
// Implements IAsyncEnumerable<T> directly
}
Create in source nodes:
static IAsyncEnumerable<Item> ReadAsync()
{
return Read();
async IAsyncEnumerable<Item> Read()
{
// Yield items
yield return new Item();
}
}
return new StreamingDataPipe<Item>(ReadAsync());
Execution
PipelineRunner
Purpose: Execute pipelines.
using NPipeline.Execution;
public class PipelineRunner
{
public Task RunAsync<TDefinition>(
PipelineContext context,
CancellationToken cancellationToken = default)
where TDefinition : IPipelineDefinition, new();
public Task RunAsync<TDefinition>(
CancellationToken cancellationToken = default)
where TDefinition : IPipelineDefinition, new();
}
Usage:
var runner = new PipelineRunner();
var context = new PipelineContextBuilder()
.WithCancellation(cancellationToken)
.Build();
await runner.RunAsync<MyPipeline>(context, cancellationToken);
Dependency Injection
AddNPipeline Extension (Assembly Scanning)
Purpose: Register nodes and services with automatic discovery.
public static IServiceCollection AddNPipeline(
this IServiceCollection services,
params Assembly[] assembliesToScan);
Usage:
services.AddNPipeline(
Assembly.GetExecutingAssembly(),
typeof(ConnectorAssembly).Assembly
);
Registers:
- All
INodeimplementations as Transient - All
IPipelineDefinitionimplementations - All
IPipelineErrorHandlerandINodeErrorHandlerimplementations - All
IDeadLetterSink,ILineageSink, andIPipelineLineageSinkimplementations - All
IPipelineLineageSinkProviderimplementations - Pipeline infrastructure services (IPipelineFactory, IPipelineRunner, etc.)
AddNPipeline Extension (Fluent Configuration)
Purpose: Register nodes and services with fine-grained control using a fluent API.
public static IServiceCollection AddNPipeline(
this IServiceCollection services,
Action<NPipelineServiceBuilder> configure);
Usage:
services.AddNPipeline(builder => builder
.AddNode<MySourceNode>()
.AddNode<MyTransformNode>()
.AddNode<MySinkNode>(ServiceLifetime.Singleton)
.AddPipeline<MyPipelineDefinition>()
.AddErrorHandler<MyErrorHandler>()
.AddLineageSink<MyLineageSink>()
);
Benefits:
- No reflection overhead - explicit component registration
- Clear, discoverable API via IntelliSense
- Type-safe registration with compile-time verification
- Flexible
ServiceLifetimecontrol per component (Transient, Scoped, Singleton) - Can mix manual registration with assembly scanning via
ScanAssemblies()
NPipelineServiceBuilder Methods:
// Register nodes
AddNode<TNode>()
AddNode<TNode>(ServiceLifetime lifetime)
// Register pipelines
AddPipeline<TPipeline>()
AddPipeline<TPipeline>(ServiceLifetime lifetime)
// Register error handlers
AddErrorHandler<THandler>()
AddErrorHandler<THandler>(ServiceLifetime lifetime)
AddPipelineErrorHandler<THandler>()
AddPipelineErrorHandler<THandler>(ServiceLifetime lifetime)
// Register sinks
AddDeadLetterSink<TSink>()
AddDeadLetterSink<TSink>(ServiceLifetime lifetime)
AddLineageSink<TSink>()
AddLineageSink<TSink>(ServiceLifetime lifetime)
AddPipelineLineageSink<TSink>()
AddPipelineLineageSink<TSink>(ServiceLifetime lifetime)
// Register providers
AddLineageSinkProvider<TProvider>()
AddLineageSinkProvider<TProvider>(ServiceLifetime lifetime)
// Scan assemblies for implementations
ScanAssemblies(params Assembly[] assemblies)
Example - Mixed Approach:
services.AddNPipeline(builder => builder
// Register high-performance or singleton components manually
.AddNode<SpecialCachedNode>(ServiceLifetime.Singleton)
.AddPipeline<MyPipeline>()
// Then scan remaining components from assemblies
.ScanAssemblies(
Assembly.GetExecutingAssembly(),
typeof(PluginProvider).Assembly
)
);
## Extensions
### Parallelism Extension
**Package:** `NPipeline.Extensions.Parallelism`
**Configure Parallel Execution:**
```csharp
using NPipeline.Extensions.Parallelism;
using NPipeline.Pipeline;
public class MyPipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var source = builder.AddSource<MySource, MyData>();
var transform = builder.AddTransform<MyTransform, MyData, ProcessedData>();
var sink = builder.AddSink<MySink, ProcessedData>();
builder.Connect(source, transform);
builder.Connect(transform, sink);
// Configure parallel execution for the transform
builder.WithParallelOptions(
new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount }
);
}
}
Method:
public static PipelineBuilder WithParallelOptions(this PipelineBuilder builder, ParallelOptions options)
Testing Extension
Package: NPipeline.Extensions.Testing
InMemorySourceNode
using NPipeline.Extensions.Testing;
public class InMemorySourceNode<T> : SourceNode<T>
{
public T[] Data { get; set; }
public override IDataPipe<T> ExecuteAsync(
PipelineContext context,
CancellationToken cancellationToken);
}
Usage:
var source = new InMemorySourceNode<int> { Data = new[] { 1, 2, 3 } };
InMemorySinkNode<T>
public class InMemorySinkNode<T> : SinkNode<T>
{
public Task<List<T>> Completion { get; }
public override Task ExecuteAsync(
IDataPipe<T> input,
PipelineContext context,
CancellationToken cancellationToken);
}
Usage:
var sink = new InMemorySinkNode<int>();
// ... run pipeline ...
var results = await sink.Completion;
Connectors
CSV Connector: NPipeline.Connectors.Csv (Available in initial release)
CsvSourceNode<T>- Read CSV filesCsvSinkNode<T>- Write CSV files
Example:
var source = new CsvSourceNode<Customer>("customers.csv");
var sink = new CsvSinkNode<ProcessedCustomer>("output.csv");
Excel Connector: NPipeline.Connectors.Excel (Planned for future release)
ExcelSourceNode<T>- Read Excel filesExcelSinkNode<T>- Write Excel files
AWS Connector: NPipeline.Connectors.Aws (Planned for future release)
S3SourceNode<T>- Read from S3SqsSourceNode<T>- Read from SQSS3SinkNode<T>- Write to S3
Common Patterns
Create Pipeline and Run
using NPipeline.Execution;
using NPipeline.Pipeline;
public class MyPipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var source = builder.AddSource<MySource, MyData>();
var transform = builder.AddTransform<MyTransform, MyData, ProcessedData>();
var sink = builder.AddSink<MySink, ProcessedData>();
builder.Connect(source, transform);
builder.Connect(transform, sink);
}
}
// Execute
var runner = new PipelineRunner();
var context = new PipelineContext();
await runner.RunAsync<MyPipeline>(context);
Use with Dependency Injection
using Microsoft.Extensions.DependencyInjection;
using NPipeline.Extensions.DependencyInjection;
// Approach 1: Assembly Scanning (automatic discovery)
var services = new ServiceCollection();
services.AddLogging();
services.AddNPipeline(Assembly.GetExecutingAssembly());
var provider = services.BuildServiceProvider();
var runner = provider.GetRequiredService<IPipelineRunner>();
await runner.RunAsync<MyPipeline>();
// Approach 2: Fluent Configuration (explicit registration)
var services = new ServiceCollection();
services.AddLogging();
services.AddNPipeline(builder => builder
.AddNode<MySourceNode>()
.AddNode<MyTransformNode>()
.AddNode<MySinkNode>()
.AddPipeline<MyPipeline>()
);
var provider = services.BuildServiceProvider();
var runner = provider.GetRequiredService<IPipelineRunner>();
await runner.RunAsync<MyPipeline>();
Handle Cancellation
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5));
var context = new PipelineContext
{
CancellationToken = cts.Token
};
try
{
await runner.RunAsync<MyPipeline>(context, cts.Token);
}
catch (OperationCanceledException)
{
Console.WriteLine("Pipeline was cancelled");
}
Share State Between Nodes
var context = new PipelineContext();
context.Items["cache"] = new Dictionary<int, Customer>();
context.Items["stats"] = new ProcessingStats();
// In nodes:
if (context.Items.TryGetValue("cache", out var cacheObj))
{
var cache = cacheObj as Dictionary<int, Customer>;
}
if (context.Items.TryGetValue("stats", out var statsObj))
{
var stats = statsObj as ProcessingStats;
}
Custom Error Handling
using NPipeline.Observability.Logging;
public override async Task<Item> ExecuteAsync(
Item item,
PipelineContext context,
CancellationToken cancellationToken)
{
var logger = context.LoggerFactory.CreateLogger("MyTransform");
try
{
return await ProcessAsync(item, cancellationToken);
}
catch (ValidationException ex)
{
logger.Log(LogLevel.Warning, $"Validation failed: {ex.Message}");
throw;
}
catch (Exception ex)
{
logger.Log(LogLevel.Error, ex, "Unexpected error");
throw;
}
}
Quick Lookup Table
| Need | Class/Method | Package | Availability |
|---|---|---|---|
| Define pipeline | IPipelineDefinition | Core | Initial Release |
| Create source | SourceNode<T> | Core | Initial Release |
| Create transform | TransformNode<TIn, TOut> | Core | Initial Release |
| Create sink | SinkNode<T> | Core | Initial Release |
| Run pipeline | PipelineRunner.RunAsync<T>() | Core | Initial Release |
| Share state | PipelineContext.Items | Core | Initial Release |
| Register nodes | AddNPipeline() | Core | Initial Release |
| Parallel execution | WithParallelOptions() | Extensions.Parallelism | Initial Release |
| Unit testing | InMemorySourceNode<T>, InMemorySinkNode<T> | Extensions.Testing | Initial Release |
| Read CSV | CsvSourceNode<T> | Connectors.Csv | Initial Release |
| Write CSV | CsvSinkNode<T> | Connectors.Csv | Initial Release |
Next Steps
- Common Patterns - Practical examples
- Architecture - Deep dive into how NPipeline works
- Best Practices - Design guidelines
- Getting Started - Installation and first pipeline