Lineage Performance
This guide covers performance characteristics, benchmarks, and optimization strategies for the NPipeline Lineage extension.
Performance Characteristics
Overhead Breakdown
Lineage tracking introduces overhead at several points in pipeline execution:
| Component | Frequency | Notes |
|---|---|---|
| LineagePacket creation | Per item | One-time allocation |
| Hop recording | Per hop | Metadata updates |
| Sampling check | Per hop | Hash or random operation |
| Dictionary lookup | Per hop | ConcurrentDictionary access |
| Sink export | Per item | Depends on sink implementation |
Note: Total overhead scales with the number of items processed and the number of hops in the pipeline.
Estimated Performance Impact
Performance impact varies based on configuration and pipeline characteristics:
| Configuration | Relative Impact | Notes |
|---|---|---|
| Without lineage (baseline) | None | No tracking overhead |
| 100% tracking, no redaction | Highest | Complete visibility |
| 10% sampling, no redaction | Low | Good balance |
| 1% sampling, no redaction | Minimal | Minimal overhead |
| 10% sampling, with redaction | Low | Reduces memory usage |
Important: Actual performance impact depends on data sizes, pipeline complexity, and hardware. Measure performance in your specific environment to determine the actual impact.
Memory Usage
Per-Item Memory
Memory usage scales with data size and number of hops:
```csharp
// Approximate memory per sampled item (rough estimate; the named
// placeholders stand in for runtime-dependent sizes)
long perItemMemory =
    16                                  // CorrelationId: a Guid is 16 bytes
    + traversalPathListOverhead         // TraversalPath: List<string> overhead
    + lineageHopsListOverhead           // LineageHops: List<LineageHop> overhead
    + hopCount * perHopOverhead         // Per hop: metadata
    + (dataRedacted ? 0 : dataSize);    // Data: 0 when redacted, otherwise its actual size
```
Factors affecting memory:
- Data size (unless redacted)
- Number of hops in the pipeline
- Number of items sampled
- Metadata stored per hop
Per-Pipeline Memory
Overhead per pipeline execution:
| Component | Memory | Notes |
|---|---|---|
| LineageCollector | Dictionary overhead | Scales with items |
| LineageOptions | Configuration | Negligible |
| Total | Varies | Depends on items sampled |
Memory Scaling
Memory usage scales linearly with:
- Number of items sampled: Each sampled item adds memory
- Number of hops: Each hop adds metadata
- Data size: Proportional to actual data (unless redacted)
Formula:
TotalMemory = PipelineOverhead + (SampledItems × PerItemMemory)
Note: Use sampling and redaction to control memory usage. The materialization cap provides additional protection against excessive memory consumption.
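The following sketch makes the formula concrete. It estimates collector memory from the total item count, the `SampleEvery` setting, and a per-item byte estimate. It assumes that `SampleEvery = N` retains roughly one item in N. For the per-item size, it reuses the rough 750-byte figure from the memory-monitoring example later in this guide. Both are approximations, and `LineageMemoryEstimate` is a hypothetical helper, not part of NPipeline.

```csharp
using System;

public static class LineageMemoryEstimate
{
    // TotalMemory = PipelineOverhead + (SampledItems × PerItemMemory)
    public static long EstimateTotalBytes(
        long totalItems, int sampleEvery, long perItemBytes, long pipelineOverheadBytes)
    {
        if (sampleEvery < 1) throw new ArgumentOutOfRangeException(nameof(sampleEvery));
        if (totalItems < 0) throw new ArgumentOutOfRangeException(nameof(totalItems));

        // Ceiling division: assumes roughly one item in every `sampleEvery` is retained
        long sampledItems = (totalItems + sampleEvery - 1) / sampleEvery;
        return pipelineOverheadBytes + sampledItems * perItemBytes;
    }
}
```

Take 1,000,000 items at `SampleEvery = 100` and about 750 bytes per item. That gives 10,000 sampled items and about 7,500,000 bytes (roughly 7.2 MiB) before pipeline overhead. At `SampleEvery = 1`, the same run would need about 750,000,000 bytes.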
CPU Impact
Aggregate/Join Mapping Performance
Aggregate and join nodes are special because output cardinality is often not 1:1 with input cardinality.
Current behavior balances continuity and throughput:
- Preserves upstream lineage context for aggregate/join outputs.
- Uses mapper-driven ancestry when a `LineageMapperAttribute` mapper is present.
- Uses deterministic fallback mapping when mapper data is unavailable.
Per-Item Outcome Registry Overhead
When lineage is enabled, execution strategies record per-item outcomes (retry count, error/skip/dead-letter flags) into a concurrent in-memory registry. This registry is scoped per (PipelineId, NodeId) and is cleared after each node's output stream is fully consumed.
| Operation | Cost | Notes |
|---|---|---|
| Registry write | ConcurrentDictionary add/update per item | Only when lineage is active for the node |
| Registry read | Dictionary lookup per item during hop construction | Inline with existing lineage mapping |
| Registry clear | Single TryRemove per node completion | Frees all per-item entries at once |
The overhead is proportional to the number of items processed per node and is bounded by node lifetime. For pipelines without lineage enabled, no registry operations occur.
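The registry's lifecycle can be modeled as a two-level concurrent map. The outer level has one entry per `(PipelineId, NodeId)` scope, and each entry holds that node's per-item outcomes. When the node's output is fully consumed, a single `TryRemove` drops the whole entry. This is a simplified sketch, not NPipeline's implementation. `OutcomeRegistry` and `ItemOutcome` are hypothetical, and so are the key types: a `Guid` pipeline ID, a `string` node ID, and the correlation ID as the item key.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical per-item outcome recorded by an execution strategy.
public readonly record struct ItemOutcome(int RetryCount, bool Errored, bool Skipped, bool DeadLettered);

// Simplified sketch of a registry scoped per (PipelineId, NodeId).
public sealed class OutcomeRegistry
{
    private readonly ConcurrentDictionary<(Guid PipelineId, string NodeId),
        ConcurrentDictionary<Guid, ItemOutcome>> _scopes = new();

    // Registry write: add or overwrite one item's outcome (only when lineage is active).
    public void Record(Guid pipelineId, string nodeId, Guid correlationId, ItemOutcome outcome) =>
        _scopes.GetOrAdd((pipelineId, nodeId), _ => new ConcurrentDictionary<Guid, ItemOutcome>())
            [correlationId] = outcome;

    // Registry read: one lookup per item while the lineage hop is constructed.
    public bool TryGet(Guid pipelineId, string nodeId, Guid correlationId, out ItemOutcome outcome)
    {
        outcome = default;
        return _scopes.TryGetValue((pipelineId, nodeId), out var items)
            && items.TryGetValue(correlationId, out outcome);
    }

    // Registry clear: a single TryRemove frees every per-item entry for the node at once.
    public bool Clear(Guid pipelineId, string nodeId) => _scopes.TryRemove((pipelineId, nodeId), out _);

    public int ScopeCount => _scopes.Count;
}
```

Nesting the per-item map under the node scope is what makes the clear step cheap. Releasing the scope does not require enumerating individual items.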
For aggregate and join nodes, core avoids forcing full-stream materialization on hot paths. It materializes only when doing so is needed for accurate mapping, for example with mapper-driven mapping or small or capped input sets. For larger streams, it degrades to representative-chain lineage with contributor metadata instead of buffering unbounded output.
This keeps lineage continuous without shifting away from streaming execution.
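The materialize-or-degrade decision for an aggregate output can be modeled as follows. Contributor IDs are buffered only up to a cap. If the group fits, the full ancestry is kept. Otherwise, only a representative ID and a contributor count are recorded, so memory stays bounded by the cap. `AggregateLineage`, `AggregateAncestry`, and the choice of the first contributor as the representative are assumptions for illustration. The real core logic, including when mapper-driven mapping applies, may differ.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical ancestry record for one aggregate/join output.
public sealed record AggregateAncestry(
    IReadOnlyList<Guid> Contributors, Guid Representative, long ContributorCount, bool Degraded);

public static class AggregateLineage
{
    // Buffer at most `cap` contributor IDs; past that, keep streaming only to count them.
    public static AggregateAncestry MapContributors(IEnumerable<Guid> contributorIds, int cap)
    {
        if (cap < 1) throw new ArgumentOutOfRangeException(nameof(cap));

        var buffered = new List<Guid>();
        long count = 0;
        Guid representative = Guid.Empty;

        foreach (var id in contributorIds)
        {
            if (count == 0) representative = id; // first contributor represents the group
            if (count < cap) buffered.Add(id);
            count++;
        }

        // Over the cap: drop the partial buffer and keep representative-chain metadata only.
        return count > cap
            ? new AggregateAncestry(Array.Empty<Guid>(), representative, count, Degraded: true)
            : new AggregateAncestry(buffered, representative, count, Degraded: false);
    }
}
```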
Practical Guidance
- For high-throughput pipelines: keep sampling enabled (`SampleEvery > 1`) and avoid expensive custom mappers unless necessary.
- For precise aggregate/join ancestry: provide a dedicated mapper and set a practical `MaterializationCap`.
- For memory-sensitive workloads: prefer `OverflowPolicy = Degrade` and keep `CaptureHopSnapshots = false`.
Per-Operation Costs
| Operation | Frequency | Cost | Notes |
|---|---|---|---|
| LineagePacket creation | Per item | Single allocation | Created once per item |
| Sampling check | Per hop | Hash or random number | Fast operation |
| Hop recording | Per hop | List operations | Metadata updates |
| Dictionary lookup | Per hop | ConcurrentDictionary access | Efficient access |
| Sink export | Per item | Depends on sink | I/O bound |
Hash vs Random Sampling
| Sampling Type | Characteristics | Use Case |
|---|---|---|
| Deterministic (hash) | Consistent items across runs | Debugging, compliance |
| Random | Different items each run | Monitoring, analytics |
Note: Both sampling methods have similar CPU characteristics. Hash-based sampling provides consistency across runs.
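The two modes differ as follows. Deterministic sampling hashes a stable item key, here the correlation ID, so the same items are selected on every run. Random sampling draws independently each time. In .NET, `string.GetHashCode()` and `System.HashCode` are randomized per process, so a sampler that must agree across runs needs a stable hash. This sketch uses 32-bit FNV-1a as one example. `LineageSampler` is hypothetical, and the hashing scheme NPipeline actually uses is not described in this guide.

```csharp
using System;

public static class LineageSampler
{
    // Deterministic: the same correlation ID yields the same decision on every run.
    public static bool IsSampledDeterministic(Guid correlationId, int sampleEvery)
    {
        if (sampleEvery < 1) throw new ArgumentOutOfRangeException(nameof(sampleEvery));
        return StableHash(correlationId) % (uint)sampleEvery == 0;
    }

    // Random: roughly one item in `sampleEvery`, but a different subset each run.
    public static bool IsSampledRandom(int sampleEvery)
    {
        if (sampleEvery < 1) throw new ArgumentOutOfRangeException(nameof(sampleEvery));
        return Random.Shared.Next(sampleEvery) == 0;
    }

    // 32-bit FNV-1a over the GUID's 16 bytes: cheap and stable across processes.
    private static uint StableHash(Guid id)
    {
        Span<byte> bytes = stackalloc byte[16];
        id.TryWriteBytes(bytes);

        uint hash = 2166136261u; // FNV offset basis
        foreach (byte b in bytes)
        {
            hash ^= b;
            hash = unchecked(hash * 16777619u); // FNV prime
        }
        return hash;
    }
}
```

Both paths cost a few arithmetic operations per check. This matches the note that their CPU characteristics are similar.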
Optimization Strategies
1. Use Sampling
The most effective optimization is sampling:
```csharp
// Production: 1% sampling
builder.EnableItemLevelLineage(options =>
{
    options.SampleEvery = 100;
    options.DeterministicSampling = true;
});

// High-volume: 0.1% sampling
builder.EnableItemLevelLineage(options =>
{
    options.SampleEvery = 1000;
    options.DeterministicSampling = false;
});
```
Impact:
- Reduces memory usage proportionally
- Reduces hop-recording and export overhead roughly proportionally (the sampling check itself still runs)
- Maintains representative visibility
2. Enable Data Redaction
Redact data when possible:
```csharp
builder.EnableItemLevelLineage(options =>
{
    options.RedactData = true; // Reduces memory usage
});
```
Impact:
- Reduces memory usage by not storing actual data
- Negligible CPU impact
- Maintains all metadata
3. Use Materialization Cap
Limit in-memory storage:
```csharp
builder.EnableItemLevelLineage(options =>
{
    options.MaterializationCap = 10000; // Default
    options.OverflowPolicy = LineageOverflowPolicy.Degrade;
});
```
Impact:
- Predictable memory usage
- Prevents out-of-memory errors
- Graceful degradation
4. Use Async Sinks
Implement async sink operations:
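```csharp
public sealed class DatabaseLineageSink : ILineageSink
{
    // Hypothetical async data-access dependency; substitute your own repository
    private readonly ILineageRepository _repository;

    public DatabaseLineageSink(ILineageRepository repository) => _repository = repository;

    public async Task RecordAsync(LineageInfo lineageInfo, CancellationToken cancellationToken)
    {
        // Non-blocking async I/O: no thread is held while the write completes
        await _repository.SaveAsync(lineageInfo, cancellationToken);
    }
}
```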
Impact:
- Threads are not blocked while sink I/O completes
- Better throughput when sink I/O is slow
- Fewer stalls caused by blocking calls
5. Batch Sink Operations
Batch multiple lineage records:
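```csharp
public sealed class BatchedLineageSink : ILineageSink
{
    private readonly object _gate = new();
    private readonly List<LineageInfo> _batch = new();
    private readonly ILineageRepository _repository; // hypothetical store exposing BulkInsertAsync
    private readonly int _batchSize;

    public BatchedLineageSink(ILineageRepository repository, int batchSize)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));
        _repository = repository;
        _batchSize = batchSize;
    }

    public Task RecordAsync(LineageInfo lineageInfo, CancellationToken cancellationToken)
    {
        List<LineageInfo>? itemsToFlush = null;
        lock (_gate)
        {
            _batch.Add(lineageInfo);
            if (_batch.Count >= _batchSize)
            {
                // Take the full batch under the lock, but do the I/O outside it
                itemsToFlush = new List<LineageInfo>(_batch);
                _batch.Clear();
            }
        }

        return itemsToFlush is null
            ? Task.CompletedTask
            : _repository.BulkInsertAsync(itemsToFlush, cancellationToken);
    }

    // Call on shutdown so a final partial batch is not lost
    public Task FlushAsync(CancellationToken cancellationToken)
    {
        List<LineageInfo> itemsToFlush;
        lock (_gate)
        {
            if (_batch.Count == 0) return Task.CompletedTask;
            itemsToFlush = new List<LineageInfo>(_batch);
            _batch.Clear();
        }

        return _repository.BulkInsertAsync(itemsToFlush, cancellationToken);
    }
}
```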
Impact:
- Reduces database round trips
- Better throughput
- Lower CPU overhead
6. Use Degrade Overflow Policy
The default `Degrade` policy provides the best balance:
```csharp
options.OverflowPolicy = LineageOverflowPolicy.Degrade;
```
Impact:
- Predictable memory usage
- Maintains visibility
- Automatic cleanup
7. Minimize Hop Count
Reduce pipeline complexity:
```text
// Complex: many hops
[Source] → [Transform1] → [Transform2] → [Transform3] → [Sink]

// Simplified: fewer hops
[Source] → [CombinedTransform] → [Sink]
```
Impact:
- Reduces lineage overhead
- Faster pipeline execution
- Lower memory usage
Benchmarking
Measuring Lineage Overhead
```csharp
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;

// Run from a Release build, e.g. BenchmarkRunner.Run<LineageBenchmarks>() (BenchmarkDotNet.Running).
[MemoryDiagnoser]
public class LineageBenchmarks
{
    private PipelineRunner _runner = null!;
    private PipelineContext _context = null!;

    [GlobalSetup]
    public void Setup()
    {
        _runner = PipelineRunner.Create();
        _context = new PipelineContext();
    }

    // Every variant must add the same source, transform, and sink nodes so that
    // the lineage configuration is the only thing that differs between benchmarks.
    private static PipelineBuilder CreateBuilder()
    {
        var builder = new PipelineBuilder("TestPipeline");
        // Add identical nodes here (omitted)
        return builder;
    }

    [Benchmark(Baseline = true)]
    public async Task NoLineage()
    {
        var builder = CreateBuilder();
        // No lineage enabled
        await _runner.RunAsync(builder.Build(), _context);
    }

    [Benchmark]
    public async Task WithLineage_100Percent()
    {
        var builder = CreateBuilder();
        builder.EnableItemLevelLineage(options =>
        {
            options.SampleEvery = 1; // 100%
        });
        await _runner.RunAsync(builder.Build(), _context);
    }

    [Benchmark]
    public async Task WithLineage_10Percent()
    {
        var builder = CreateBuilder();
        builder.EnableItemLevelLineage(options =>
        {
            options.SampleEvery = 10; // 10%
        });
        await _runner.RunAsync(builder.Build(), _context);
    }

    [Benchmark]
    public async Task WithLineage_Redacted()
    {
        var builder = CreateBuilder();
        builder.EnableItemLevelLineage(options =>
        {
            options.SampleEvery = 1;
            options.RedactData = true;
        });
        await _runner.RunAsync(builder.Build(), _context);
    }
}
```
Note: Run benchmarks in your specific environment with representative data to measure actual performance impact.
Real-World Scenarios
Scenario 1: Production ETL Pipeline
Requirements:
- Process items at production volume
- Multiple nodes in pipeline
- Need compliance tracking
Configuration:
```csharp
builder.EnableItemLevelLineage(options =>
{
    options.SampleEvery = 1;                  // 100% for compliance
    options.RedactData = true;                // Sensitive data
    options.MaterializationCap = 100000;      // Set above the expected item count per run
    options.OverflowPolicy = LineageOverflowPolicy.Degrade;
});
```
Considerations:
- CPU overhead is present but acceptable for compliance
- Memory usage scales with number of items processed
- Throughput impact depends on pipeline complexity and data sizes
Scenario 2: High-Volume Analytics Pipeline
Requirements:
- High throughput processing
- Multiple nodes in pipeline
- Need monitoring, not compliance
Configuration:
```csharp
builder.EnableItemLevelLineage(options =>
{
    options.SampleEvery = 1000;                              // 0.1% sampling
    options.RedactData = true;                               // Default
    options.MaterializationCap = 1000;                       // Small cap
    options.OverflowPolicy = LineageOverflowPolicy.Degrade;  // Graceful degradation
});
```
Considerations:
- Minimal CPU overhead with aggressive sampling
- Low memory footprint with small cap
- Throughput impact is typically negligible
Scenario 3: Development/Debugging Pipeline
Requirements:
- Process items for testing
- Multiple nodes in pipeline
- Need complete visibility
Configuration:
```csharp
builder.EnableItemLevelLineage(options =>
{
    options.SampleEvery = 1;                     // 100% for debugging
    options.RedactData = false;                  // Keep data for inspection
    options.MaterializationCap = int.MaxValue;   // No cap
    options.OverflowPolicy = LineageOverflowPolicy.Materialize;
});
```
Considerations:
- CPU overhead is acceptable for development
- Memory usage scales with test data size
- Throughput is not critical for development
Performance Monitoring
Track Lineage Performance
Monitor lineage-specific metrics:
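```csharp
using System.Diagnostics;
using Microsoft.Extensions.Logging;

public sealed class LineagePerformanceSink : ILineageSink
{
    private const long SlowExportThresholdMs = 100;

    private readonly ILineageRepository _repository; // hypothetical storage dependency
    private readonly ILogger<LineagePerformanceSink> _logger;

    public LineagePerformanceSink(ILineageRepository repository, ILogger<LineagePerformanceSink> logger)
    {
        _repository = repository;
        _logger = logger;
    }

    public async Task RecordAsync(LineageInfo lineageInfo, CancellationToken cancellationToken)
    {
        // Local stopwatch: a shared instance field would race if RecordAsync runs concurrently
        var sw = Stopwatch.StartNew();

        // Actual sink operation
        await _repository.SaveAsync(lineageInfo, cancellationToken);

        var elapsed = sw.ElapsedMilliseconds;
        if (elapsed > SlowExportThresholdMs)
        {
            _logger.LogWarning(
                "Slow lineage export: {ElapsedMs}ms for {CorrelationId}",
                elapsed,
                lineageInfo.CorrelationId);
        }
    }
}
```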
Metrics to Track
- Lineage export time: Time to write to sinks
- Memory usage: Collector memory over time
- Sampling rate: Actual vs configured
- Overflow events: How often cap is reached
- Sink errors: Failed exports
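Several of these metrics can be captured with thread-safe counters updated from the pipeline and from your sink. The sketch below uses `Interlocked` to track three things. It counts items seen versus items sampled, so the actual sampling rate can be compared with the configured `1 / SampleEvery`. It also counts overflow events and sink errors. `LineageMetrics` is hypothetical, and so are the points where its methods would be called. In production you would typically publish these values through `System.Diagnostics.Metrics` or your existing telemetry stack.

```csharp
using System;
using System.Threading;

public sealed class LineageMetrics
{
    private long _itemsSeen;
    private long _itemsSampled;
    private long _overflowEvents;
    private long _sinkErrors;

    // Call once per item after the sampling decision.
    public void ItemSeen(bool sampled)
    {
        Interlocked.Increment(ref _itemsSeen);
        if (sampled) Interlocked.Increment(ref _itemsSampled);
    }

    public void OverflowOccurred() => Interlocked.Increment(ref _overflowEvents);
    public void SinkFailed() => Interlocked.Increment(ref _sinkErrors);

    public long OverflowEvents => Interlocked.Read(ref _overflowEvents);
    public long SinkErrors => Interlocked.Read(ref _sinkErrors);

    // Actual sampling rate; compare with the configured 1 / SampleEvery.
    public double ActualSamplingRate
    {
        get
        {
            long seen = Interlocked.Read(ref _itemsSeen);
            return seen == 0 ? 0.0 : (double)Interlocked.Read(ref _itemsSampled) / seen;
        }
    }
}
```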
Best Practices
1. Profile Before Optimizing
Measure before making changes:
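```csharp
// Use BenchmarkDotNet or similar tools.
// RunPipeline is a hypothetical helper that builds and runs the same
// pipeline with or without lineage enabled.
[Benchmark(Baseline = true)]
public async Task Baseline()
{
    await RunPipeline(withLineage: false);
}

[Benchmark]
public async Task WithLineage()
{
    await RunPipeline(withLineage: true);
}
```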
2. Start Conservative, Adjust Later
Begin with low sampling rate:
```csharp
options.SampleEvery = 100; // Conservative start
```
Monitor and adjust based on requirements.
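One way to choose a conservative starting value is to work backward from how many lineage records per run you are willing to keep. The helper below is a hypothetical sketch. It assumes `SampleEvery = N` retains about one item in N, and it rounds up so the record budget is not exceeded.

```csharp
using System;

public static class SamplingPlanner
{
    // Smallest SampleEvery that keeps roughly at most `recordBudget` of `expectedItems` items.
    public static int ChooseSampleEvery(long expectedItems, long recordBudget)
    {
        if (recordBudget < 1) throw new ArgumentOutOfRangeException(nameof(recordBudget));
        if (expectedItems <= recordBudget) return 1; // budget covers every item: 100% tracking

        long n = (expectedItems + recordBudget - 1) / recordBudget; // ceiling division
        return (int)Math.Min(n, int.MaxValue);
    }
}
```

For example, 1,000,000 expected items with a budget of 10,000 records gives `SampleEvery = 100`, the conservative start shown above.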
3. Use Appropriate Overflow Policy
Choose policy based on scenario:
| Scenario | Policy |
|---|---|
| Production | Degrade |
| Development | WarnContinue |
| Memory-constrained | Strict |
| Compliance | Degrade |
4. Implement Async Sinks
Always use async operations in sinks:
```csharp
public async Task RecordAsync(LineageInfo lineageInfo, CancellationToken cancellationToken)
{
    await _repository.SaveAsync(lineageInfo, cancellationToken);
}
```
5. Monitor Memory Usage
Track collector memory:
```csharp
var collector = serviceProvider.GetRequiredService<ILineageCollector>();
var lineageCount = collector.GetAllLineageInfo().Count;

// ~750 bytes per item is a rough placeholder; calibrate it against your own measurements
var estimatedBytes = lineageCount * 750L;

_logger.LogInformation(
    "Lineage memory: {Count} items, ~{MemoryMB:F1} MB",
    lineageCount,
    estimatedBytes / (1024.0 * 1024.0)); // floating-point division so small totals don't round to 0
```
Note: Actual memory usage depends on data sizes and pipeline complexity. Monitor in production to understand real memory consumption.
Related Topics
- Getting Started - Installation and basic setup
- Configuration - Configuration options and settings
- Architecture - Internal architecture and design decisions
- Use Cases - Common use cases and examples