Troubleshooting Guide

Related Documentation: If you're experiencing issues with resilience configuration (retries, node restarts, materialization), see Resilience Troubleshooting instead.

Pipeline Execution Issues

Pipeline doesn't execute

Symptoms: The pipeline runs to completion, but no output is produced.

Common Causes:

  1. Sinks not configured
// BAD - Source and transform but no sink
class MyPipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var source = builder.AddSource<MySource, MyData>();
var transform = builder.AddTransform<MyTransform, MyData, ProcessedData>();
builder.Connect(source, transform);
// Missing sink connection!
}
}

// GOOD - Complete pipeline with sink
class MyPipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var source = builder.AddSource<MySource, MyData>();
var transform = builder.AddTransform<MyTransform, MyData, ProcessedData>();
var sink = builder.AddSink<MySink, ProcessedData>();
builder.Connect(source, transform);
builder.Connect(transform, sink); // Sink connected
}
}
  2. Source not yielding data

Verify your source node returns data:

public override IDataPipe<T> Initialize(PipelineContext context, CancellationToken cancellationToken)
{
    return new StreamingDataPipe<T>(GetDataAsync(cancellationToken));

    // Non-static local iterator, so it can call GetSourceData even if that is an instance method
    async IAsyncEnumerable<T> GetDataAsync(CancellationToken ct)
    {
        await foreach (var item in GetSourceData(ct))
        {
            Console.WriteLine($"Yielding: {item}"); // Temporary output to verify data is being yielded
            yield return item;
        }
    }
}
  3. Transform returning empty

Ensure the transform returns a value for every input:

// BAD - Only special items produce a usable result
public override Task<Item> ExecuteAsync(Item item, PipelineContext context, CancellationToken cancellationToken)
{
    if (item.IsSpecial)
        return Task.FromResult(item);

    return Task.FromResult<Item>(null); // Every other item becomes null
}

// GOOD - Explicit result on every path
public override Task<Item> ExecuteAsync(Item item, PipelineContext context, CancellationToken cancellationToken)
{
    if (item.IsSpecial)
        return Task.FromResult(Transform(item));

    return Task.FromResult(item); // Non-special items pass through unchanged
}
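
When it is unclear which of the causes above applies, counting items as they pass between stages can show where the data stops. The following is a minimal, library-independent sketch; `ItemCounter`, `StreamDiagnostics`, `CountItems`, and `Numbers` are hypothetical names used for illustration and are not part of NPipeline.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Demo: wrap a stream, consume it, then read the count.
var counter = new ItemCounter();
await foreach (var n in StreamDiagnostics.CountItems(StreamDiagnostics.Numbers(3), counter))
{
    // Downstream processing would go here.
}
Console.WriteLine($"Items seen: {counter.Count}");

// Hypothetical helper types, not part of NPipeline.
public sealed class ItemCounter
{
    public int Count { get; set; }
}

public static class StreamDiagnostics
{
    // Passes every item through unchanged while counting it.
    public static async IAsyncEnumerable<T> CountItems<T>(IAsyncEnumerable<T> source, ItemCounter counter)
    {
        await foreach (var item in source)
        {
            counter.Count++;
            yield return item;
        }
    }

    // Stand-in source that yields 1..count.
    public static async IAsyncEnumerable<int> Numbers(int count)
    {
        for (var i = 1; i <= count; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }
}
```

A count of zero right after the source points at the source itself; a non-zero count that drops to zero after a transform points at that transform.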

Pipeline throws "Node not registered" exception

Symptoms: InvalidOperationException: Node type not found in registry

Cause: Nodes not registered with dependency injection.

Solution - Assembly Scanning (Automatic Discovery):

// WRONG - No assembly specified
services.AddNPipeline();

// CORRECT - Include your assembly
services.AddNPipeline(Assembly.GetExecutingAssembly());

// CORRECT - Include multiple assemblies if nodes are in different projects
services.AddNPipeline(
Assembly.GetExecutingAssembly(),
typeof(ExternalNode).Assembly
);

Solution - Fluent Configuration (Explicit Registration):

If you prefer not to use reflection or have a specific set of nodes, use the fluent API:

// CORRECT - Explicit registration without reflection
services.AddNPipeline(builder => builder
.AddNode<MySourceNode>()
.AddNode<MyTransformNode>()
.AddNode<MySinkNode>()
.AddPipeline<MyPipelineDefinition>()
);

Pipeline connection fails silently

Symptoms: Nodes connect but data doesn't flow.

Cause: Incompatible types or incorrect graph structure.

Solution: Verify type compatibility:

// BAD - Type mismatch
var source = builder.AddSource<SourceNode<int>, int>();
var transform = builder.AddTransform<TransformNode<string, int>, string, int>(); // Expects string
builder.Connect(source, transform); // Type mismatch!

// GOOD - Matching types
var source = builder.AddSource<SourceNode<int>, int>();
var transform = builder.AddTransform<TransformNode<int, int>, int, int>(); // Expects int
builder.Connect(source, transform);

Performance Issues

Pipeline is very slow

Diagnosis Steps:

  1. Measure each stage:
public override async Task<Item> ExecuteAsync(Item item, PipelineContext context, CancellationToken cancellationToken)
{
var sw = Stopwatch.StartNew();
try
{
var result = await SlowOperationAsync(item, cancellationToken);
sw.Stop();
if (sw.ElapsedMilliseconds > 1000)
logger.LogWarning($"Slow operation: {sw.ElapsedMilliseconds}ms");
return result;
}
catch (Exception ex)
{
logger.LogError(ex, $"Operation failed after {sw.ElapsedMilliseconds}ms");
throw;
}
}
  2. Check for blocking operations:
// BAD - Blocking I/O
public override Task<Item> ExecuteAsync(Item item, PipelineContext context, CancellationToken cancellationToken)
{
    var data = database.GetData(item.Id); // Synchronous call blocks the thread
    return Task.FromResult(Transform(data));
}

// GOOD - Async I/O (the method must be marked async to use await)
public override async Task<Item> ExecuteAsync(Item item, PipelineContext context, CancellationToken cancellationToken)
{
    var data = await database.GetDataAsync(item.Id, cancellationToken); // Non-blocking
    return Transform(data);
}
  3. Enable parallelism if CPU-bound:
services.AddNPipelineParallelism();

// In pipeline definition:
builder.WithParallelOptions(new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount });
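
To compare stages rather than inspect one node at a time, the per-call measurement from step 1 can be collected into per-stage totals. This is a rough sketch under stated assumptions: `StageTimings` and `MeasureAsync` are hypothetical helpers, not NPipeline APIs, and the delays stand in for real work.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

// Demo: time two simulated stages and print the totals.
var timings = new StageTimings();
var parsed = await timings.MeasureAsync("parse", async () => { await Task.Delay(20); return 42; });
var enriched = await timings.MeasureAsync("enrich", async () => { await Task.Delay(60); return parsed + 1; });

foreach (var (stage, elapsed) in timings.Totals)
    Console.WriteLine($"{stage}: {elapsed.TotalMilliseconds:F0} ms");

// Hypothetical helper, not part of NPipeline.
public sealed class StageTimings
{
    private readonly ConcurrentDictionary<string, TimeSpan> _totals = new();

    public IReadOnlyDictionary<string, TimeSpan> Totals => _totals;

    // Runs the operation and adds its elapsed time to the stage total,
    // even when the operation throws.
    public async Task<T> MeasureAsync<T>(string stage, Func<Task<T>> operation)
    {
        var sw = Stopwatch.StartNew();
        try
        {
            return await operation();
        }
        finally
        {
            sw.Stop();
            _totals.AddOrUpdate(stage, sw.Elapsed, (_, total) => total + sw.Elapsed);
        }
    }
}
```

The stage with the largest total is usually the first candidate for async I/O fixes or parallelism.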

Memory usage constantly grows

Symptoms: Application memory increases over time.

Common Causes:

  1. Loading all data into memory:
// BAD - Loads all records at once
async IAsyncEnumerable<Item> ReadAsync()
{
var allRecords = database.GetAllRecords().ToList(); // Everything in memory!
foreach (var record in allRecords)
yield return record;
}

// GOOD - Streams from database
async IAsyncEnumerable<Item> ReadAsync()
{
var reader = database.GetRecords(); // Lazy enumerable
await foreach (var record in reader)
yield return record;
}
  2. Not disposing resources:
// BAD - Connections not disposed
public override async Task ExecuteAsync(IDataPipe<Item> input, PipelineContext context, CancellationToken cancellationToken)
{
var connection = new SqlConnection(_connString);
await connection.OpenAsync(cancellationToken);
// ... use connection
// Missing dispose!
}

// GOOD - Properly disposed
public override async Task ExecuteAsync(IDataPipe<Item> input, PipelineContext context, CancellationToken cancellationToken)
{
using var connection = new SqlConnection(_connString);
await connection.OpenAsync(cancellationToken);
// ... use connection
// Disposed when out of scope
}
  3. Accumulating state in context:
// BAD - Context grows unbounded
public override Task<Item> ExecuteAsync(Item item, PipelineContext context, CancellationToken cancellationToken)
{
    if (!context.Items.ContainsKey("cache"))
        context.Items["cache"] = new Dictionary<int, Item>();

    var cache = context.Items["cache"] as Dictionary<int, Item>;
    cache[item.Id] = item; // Cache grows forever!
    return Task.FromResult(item);
}

// GOOD - Limited cache or external state
// Assumes _cache is an IMemoryCache (Microsoft.Extensions.Caching.Memory) injected into the node
public override Task<Item> ExecuteAsync(Item item, PipelineContext context, CancellationToken cancellationToken)
{
    _cache.Set(item.Id, item, TimeSpan.FromMinutes(5)); // Bounded by time: entry expires after 5 minutes
    return Task.FromResult(item);
}
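
A time limit alone does not cap memory if items arrive faster than they expire. Where a hard upper bound is needed, a size-bounded cache is one option. The sketch below is a simple least-recently-used cache written for illustration; `BoundedCache` is a hypothetical type, not part of NPipeline, and it is not thread-safe, so it would need locking if nodes run in parallel.

```csharp
using System;
using System.Collections.Generic;

// Demo: capacity 2, so the third insert evicts the least recently used key.
var cache = new BoundedCache<int, string>(capacity: 2);
cache.Set(1, "a");
cache.Set(2, "b");
cache.Set(3, "c");
Console.WriteLine(cache.TryGet(1, out _)); // False
Console.WriteLine(cache.Count);            // 2

// Hypothetical helper, not part of NPipeline. Not thread-safe.
public sealed class BoundedCache<TKey, TValue> where TKey : notnull
{
    private readonly int _capacity;
    private readonly Dictionary<TKey, LinkedListNode<(TKey Key, TValue Value)>> _map = new();
    private readonly LinkedList<(TKey Key, TValue Value)> _order = new(); // Most recently used first

    public BoundedCache(int capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity));
        _capacity = capacity;
    }

    public int Count => _map.Count;

    public void Set(TKey key, TValue value)
    {
        if (_map.TryGetValue(key, out var existing))
        {
            // Replacing an entry: drop the old node first.
            _order.Remove(existing);
            _map.Remove(key);
        }
        else if (_map.Count >= _capacity)
        {
            // Full: evict the entry at the tail (least recently used).
            var oldest = _order.Last!;
            _order.RemoveLast();
            _map.Remove(oldest.Value.Key);
        }
        _map[key] = _order.AddFirst((key, value));
    }

    public bool TryGet(TKey key, out TValue value)
    {
        if (_map.TryGetValue(key, out var node))
        {
            // Move the entry to the front to mark it as recently used.
            _order.Remove(node);
            _order.AddFirst(node);
            value = node.Value.Value;
            return true;
        }
        value = default!;
        return false;
    }
}
```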

Error Handling Issues

Exceptions are silently swallowed

Symptoms: Pipeline runs without errors but data isn't processed.

Cause: An exception is caught but neither logged nor re-thrown.

Solution:

// BAD - Silent failures
try
{
return await ProcessAsync(item);
}
catch (Exception ex)
{
// Swallowed silently
}

// GOOD - Logged and re-thrown
try
{
return await ProcessAsync(item);
}
catch (Exception ex)
{
logger.LogError(ex, "Processing failed for item");
throw; // Re-throw or handle explicitly
}

Tip: If you encounter error codes in your exceptions (e.g., [NP0301]), see the Error Codes Reference for detailed explanations and solutions.

Cancellation not working

Symptoms: Pipeline ignores cancellation requests.

Cause: The node neither checks the cancellation token nor passes it to the async calls it makes.

Solution:

// BAD - Ignores cancellation
public override async Task<Item> ExecuteAsync(Item item, PipelineContext context, CancellationToken cancellationToken)
{
for (int i = 0; i < 1000000; i++)
{
await Task.Delay(10); // No cancellation check
}
return item;
}

// GOOD - Checks cancellation
public override async Task<Item> ExecuteAsync(Item item, PipelineContext context, CancellationToken cancellationToken)
{
for (int i = 0; i < 1000000; i++)
{
cancellationToken.ThrowIfCancellationRequested(); // Check token
await Task.Delay(10, cancellationToken);
}
return item;
}
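
To confirm that a loop like the one above really stops, it can be run against a token that cancels after a short timeout. This is a standalone sketch; `Worker` is a hypothetical stand-in for a node's processing loop and uses only standard .NET types.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Demo: cancel after 100 ms; without cancellation the loop would run for about 10 seconds.
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));
try
{
    await Worker.RunAsync(cts.Token);
    Console.WriteLine("Completed without cancellation");
}
catch (OperationCanceledException)
{
    Console.WriteLine("Cancelled as expected");
}

// Hypothetical stand-in for a node's processing loop.
public static class Worker
{
    public static async Task<int> RunAsync(CancellationToken cancellationToken)
    {
        var iterations = 0;
        for (var i = 0; i < 1_000; i++)
        {
            cancellationToken.ThrowIfCancellationRequested(); // Explicit check between steps
            await Task.Delay(10, cancellationToken);           // Token also interrupts the wait
            iterations++;
        }
        return iterations;
    }
}
```

`Task.Delay` throws `TaskCanceledException`, which derives from `OperationCanceledException`, so a single catch covers both the explicit check and the interrupted wait.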

Retry mechanism not triggering

Symptoms: Transient errors cause the pipeline to fail instead of retrying.

Solution: Implement explicit retry logic:

private async Task<T> ExecuteWithRetryAsync<T>(
Func<CancellationToken, Task<T>> operation,
int maxRetries = 3,
CancellationToken cancellationToken = default)
{
int retryCount = 0;

while (true)
{
try
{
return await operation(cancellationToken);
}
catch (IOException) when (retryCount < maxRetries) // Retry on transient
{
retryCount++;
var delay = TimeSpan.FromSeconds(Math.Pow(2, retryCount - 1));
logger.LogWarning($"Transient error, retrying in {delay.TotalSeconds}s");
await Task.Delay(delay, cancellationToken);
}
}
}
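
The retry helper above can be exercised against an operation that fails a fixed number of times. The sketch below is a standalone variant written for illustration: `Retry.ExecuteAsync` and its `baseDelay` parameter are assumptions added so the demo runs quickly, and the failing operation is simulated.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Demo: the operation fails twice with IOException, then succeeds.
var attempts = 0;
var result = await Retry.ExecuteAsync(
    _ =>
    {
        attempts++;
        if (attempts < 3) throw new IOException("Simulated transient failure");
        return Task.FromResult("ok");
    },
    maxRetries: 3,
    baseDelay: TimeSpan.FromMilliseconds(10));
Console.WriteLine($"{result} after {attempts} attempts"); // ok after 3 attempts

// Hypothetical helper, not part of NPipeline.
public static class Retry
{
    public static async Task<T> ExecuteAsync<T>(
        Func<CancellationToken, Task<T>> operation,
        int maxRetries,
        TimeSpan baseDelay,
        CancellationToken cancellationToken = default)
    {
        for (var attempt = 0; ; attempt++)
        {
            try
            {
                return await operation(cancellationToken);
            }
            catch (IOException) when (attempt < maxRetries)
            {
                // Exponential backoff: baseDelay, 2x, 4x, ...
                var delay = TimeSpan.FromMilliseconds(baseDelay.TotalMilliseconds * Math.Pow(2, attempt));
                await Task.Delay(delay, cancellationToken);
            }
        }
    }
}
```

Once the retry budget is used up, the exception filter no longer matches and the last `IOException` propagates to the caller. Exceptions of other types are never retried.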

Data Issues

Data transformed incorrectly

Symptoms: Output data is malformed or incomplete.

Diagnosis:

  1. Add detailed logging:
public override Task<Item> ExecuteAsync(Item item, PipelineContext context, CancellationToken cancellationToken)
{
logger.LogDebug($"Input: {JsonSerializer.Serialize(item)}");
var result = Transform(item);
logger.LogDebug($"Output: {JsonSerializer.Serialize(result)}");
return Task.FromResult(result);
}
  2. Verify with unit tests:
[Fact]
public async Task TransformHandlesNullValues()
{
var transform = new MyTransform();
var context = new PipelineContext();

var input = new Item { Value = null };
var output = await transform.ExecuteAsync(input, context, CancellationToken.None);

Assert.NotNull(output);
Assert.Null(output.Value);
}

Missing data in output

Symptoms: Some input records don't appear in output.

Common Causes:

  1. Transform filtering unintentionally:
// BAD - Implicit filtering
public override Task<Item> ExecuteAsync(Item item, PipelineContext context, CancellationToken cancellationToken)
{
    if (item.IsValid)
        return Task.FromResult(Transform(item));

    return Task.FromResult<Item>(null); // Invalid items silently become null
}

// GOOD - Explicit handling
public override Task<Item> ExecuteAsync(Item item, PipelineContext context, CancellationToken cancellationToken)
{
    if (!item.IsValid)
    {
        logger.LogWarning("Rejecting invalid item: {ItemId}", item.Id);
        throw new ValidationException($"Item {item.Id} is invalid"); // Fail fast instead of dropping silently
    }
    return Task.FromResult(Transform(item));
}
  2. Async enumerable not fully consumed:
// BAD - Only reads first item
var result = source.Initialize(context, CancellationToken.None);
await using var enumerator = result.GetAsyncEnumerator();
await enumerator.MoveNextAsync();
var first = enumerator.Current; // Remaining items are never read

// GOOD - Consumes all items
var result = source.Initialize(context, CancellationToken.None);
await foreach (var item in result)
{
    // Process all items
}
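
The reason partial consumption loses data is that async iterators are lazy: the producer only runs as far as the consumer pulls. The standalone sketch below illustrates this with a counter; `Produce` and `produced` are hypothetical names, and no NPipeline types are involved.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var produced = 0;

// Stand-in producer that records how many items it has generated.
async IAsyncEnumerable<int> Produce()
{
    for (var i = 1; i <= 3; i++)
    {
        await Task.Yield();
        produced++;
        yield return i;
    }
}

// Reading one item runs the producer only up to its first yield.
await using (var enumerator = Produce().GetAsyncEnumerator())
{
    await enumerator.MoveNextAsync();
}
Console.WriteLine($"After one read: {produced}");

produced = 0;
await foreach (var item in Produce())
{
    // Each iteration pulls the next item from the producer.
}
Console.WriteLine($"After full enumeration: {produced}");
```

The first line reports 1 and the second reports 3, which is why a consumer that stops early makes upstream records appear to be missing.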

Configuration Issues

Dependency Injection not finding nodes

Symptoms: An exception such as "ServiceCollection doesn't contain..." or a similar service resolution error.

Cause: Assembly scanning does not cover the assemblies in which the nodes are defined.

Solution:

// Include all assemblies containing nodes
services.AddNPipeline(
typeof(MyNode).Assembly,
typeof(AnotherNode).Assembly
);

// Or use the entry assembly
services.AddNPipeline(Assembly.GetEntryAssembly()!);

Configuration values not applying

Symptoms: Settings are defined but not used by nodes.

Solution: Inject IOptions<T>:

public class MyTransform : TransformNode<Item, Item>
{
private readonly IOptions<MySettings> _options;

public MyTransform(IOptions<MySettings> options)
{
_options = options;
}

public override Task<Item> ExecuteAsync(Item item, PipelineContext context, CancellationToken cancellationToken)
{
var timeout = _options.Value.Timeout; // Use injected settings
return Task.FromResult(item);
}
}
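
Injecting `IOptions<T>` only helps if the settings type is also registered with the container; otherwise `Value` is a default-constructed instance. The sketch below shows one way to register and resolve settings with the Microsoft.Extensions.Options and Microsoft.Extensions.DependencyInjection packages. The shape of `MySettings` (a `TimeSpan Timeout` property with a 5-second default) is an assumption made for illustration.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;

var services = new ServiceCollection();

// Without this call (or a bound configuration section), Timeout would stay at its default.
services.Configure<MySettings>(settings => settings.Timeout = TimeSpan.FromSeconds(30));

using var provider = services.BuildServiceProvider();
var options = provider.GetRequiredService<IOptions<MySettings>>();
Console.WriteLine(options.Value.Timeout); // 00:00:30

// In a unit test, Options.Create supplies settings without a container.
IOptions<MySettings> testOptions = Options.Create(new MySettings { Timeout = TimeSpan.FromSeconds(1) });
Console.WriteLine(testOptions.Value.Timeout); // 00:00:01

// Assumed settings shape, for illustration only.
public class MySettings
{
    public TimeSpan Timeout { get; set; } = TimeSpan.FromSeconds(5);
}
```

If a node sees the 5-second default instead of the configured value, the `Configure` call is missing or targets a different settings type.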

Testing Issues

Test pipeline doesn't execute expected nodes

Symptoms: Test passes but you're unsure nodes actually ran.

Solution: Assert on what the sink actually received:

[Fact]
public async Task PipelineProcessesData()
{
var source = new InMemorySourceNode<int> { Data = new[] { 1, 2, 3 } };
var sink = new InMemorySinkNode<int>();

var context = new PipelineContext();
var runner = PipelineRunner.Create();

// Register nodes
context.Items["source"] = source;
context.Items["sink"] = sink;

await runner.RunAsync<MyPipeline>(context);

// Verify sink received data
var results = await sink.Completion;
Assert.Equal(3, results.Count); // Verify processing occurred
}

Mocks not being used in pipeline

Symptoms: Test uses mock but real service is called.

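Cause: Registration order matters in Dependency Injection (DI). With Microsoft.Extensions.DependencyInjection, the last plain registration of a service type is the one resolved, while TryAdd* registrations leave an existing registration in place. Registering the mock first therefore works only when the real service is registered afterwards with TryAdd* or not at all; otherwise, register the mock after the call that adds the real service.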

Solution - Assembly Scanning Approach:

var mockService = new Mock<IMyService>();
var services = new ServiceCollection();

// Register mock BEFORE AddNPipeline so it's used
services.AddSingleton(mockService.Object);
services.AddNPipeline(Assembly.GetExecutingAssembly());

var provider = services.BuildServiceProvider();
var runner = provider.GetRequiredService<IPipelineRunner>();

Solution - Fluent Configuration Approach:

With fluent configuration, you can explicitly control which nodes/services are registered:

var mockService = new Mock<IMyService>();
var services = new ServiceCollection();

// Register mock BEFORE AddNPipeline fluent builder
services.AddSingleton(mockService.Object);

// Explicitly register only the nodes you want, skipping real implementations
services.AddNPipeline(builder => builder
.AddNode<MySourceNode>()
.AddNode<MyTransformNode>()
.AddNode<MySinkNode>()
.AddPipeline<MyPipelineDefinition>()
// No assembly scanning means no other nodes auto-registered
);

var provider = services.BuildServiceProvider();
var runner = provider.GetRequiredService<IPipelineRunner>();
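
If the real service is still resolved, it helps to check how registration order behaves in the container itself. The sketch below uses only Microsoft.Extensions.DependencyInjection; `IMyService` is taken from the examples above, while `FakeService`, `RealService`, and the `Name` property are hypothetical stand-ins (a hand-written fake is used instead of a mocking library to keep the example dependency-free). How `AddNPipeline` registers services internally is not shown here and is not assumed.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;

// Case 1: two plain registrations. The later one is resolved.
var plain = new ServiceCollection();
plain.AddSingleton<IMyService, FakeService>();
plain.AddSingleton<IMyService, RealService>();
using (var provider = plain.BuildServiceProvider())
{
    Console.WriteLine(provider.GetRequiredService<IMyService>().Name); // real
}

// Case 2: the later registration uses TryAdd, so the earlier fake is kept.
var guarded = new ServiceCollection();
guarded.AddSingleton<IMyService, FakeService>();
guarded.TryAddSingleton<IMyService, RealService>();
using (var provider = guarded.BuildServiceProvider())
{
    Console.WriteLine(provider.GetRequiredService<IMyService>().Name); // fake
}

// Hypothetical stand-ins for illustration.
public interface IMyService
{
    string Name { get; }
}

public sealed class FakeService : IMyService
{
    public string Name => "fake";
}

public sealed class RealService : IMyService
{
    public string Name => "real";
}
```

In a test, resolving the service from the built provider and asserting that it is the mock instance is a quick way to confirm which registration won.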

Getting Help

If you can't resolve the issue:

  1. Check the logs - Enable debug logging:
services.AddLogging(builder => builder.AddConsole().SetMinimumLevel(LogLevel.Debug));
  2. Review similar examples - See Common Patterns

  3. Check FAQ - Common questions answered

  4. Review Error Handling - Error-specific guidance

  5. Look up error codes - See Error Codes Reference for NP error codes (e.g., [NP0101])

  6. Check source code - Inspect node implementations in /src/NPipeline

Next Steps