Parallel Execution Configuration

NPipeline provides multiple ways to configure parallel execution, each suited to different needs:

  • Preset API: Best for common workload patterns with automatically optimized defaults
  • Builder API: Best for flexible customization while starting from sensible defaults
  • Manual Configuration API: Best for advanced performance tuning and complex scenarios

Preset API: Using Workload Type Presets

For common workload patterns, use the RunParallel extension method with a workload type to automatically select optimal parallelism configuration:

using NPipeline.Extensions.Parallelism;
using NPipeline.Pipeline;

public sealed class SimplifiedParallelPipeline : IPipelineDefinition
{
    public void DefinePipeline(PipelineBuilder builder)
    {
        builder
            .AddTransform<MyTransform, Input, Output>()
            .RunParallel(builder, ParallelWorkloadType.IoBound)
            .AddSink<MySink>();
    }
}

That's it! The RunParallel method with ParallelWorkloadType.IoBound automatically configures:

  • Degree of parallelism: ProcessorCount * 4 (hide I/O latency)
  • Queue length: ProcessorCount * 8
  • Output buffer: ProcessorCount * 16
  • Queue policy: Block (apply backpressure)
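For reference, the IoBound preset is roughly equivalent to spelling out the same values with the manual configuration API. This is a sketch of the expansion, using the values from the bullet list above; the actual preset implementation may differ in details:

```csharp
// Approximate manual equivalent of ParallelWorkloadType.IoBound.
// The blocking queue policy is implied by the WithBlockingParallelism method.
.WithBlockingParallelism(
    builder,
    maxDegreeOfParallelism: Environment.ProcessorCount * 4,   // hide I/O latency
    maxQueueLength: Environment.ProcessorCount * 8,           // bounded input queue
    outputBufferCapacity: Environment.ProcessorCount * 16)    // output buffer
```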

Workload Type Presets

NPipeline provides four built-in presets optimized for common workload patterns:

ParallelWorkloadType.General (Default)

Best for: Mixed CPU and I/O workloads, when you're unsure

builder
    .AddTransform<MyTransform, Input, Output>()
    .RunParallel(builder, ParallelWorkloadType.General)

Configuration:

  • DOP: ProcessorCount * 2
  • Queue: ProcessorCount * 4
  • Buffer: ProcessorCount * 8

ParallelWorkloadType.CpuBound

Best for: CPU-intensive operations, mathematical computations, DSP

builder
    .AddTransform<IntensiveMathTransform, double, double>()
    .RunParallel(builder, ParallelWorkloadType.CpuBound)

Configuration (avoids oversubscription):

  • DOP: ProcessorCount (1:1 with CPU cores)
  • Queue: ProcessorCount * 2
  • Buffer: ProcessorCount * 4

ParallelWorkloadType.IoBound

Best for: File I/O, database operations, local service calls

builder
    .AddTransform<DatabaseTransform, int, Record>()
    .RunParallel(builder, ParallelWorkloadType.IoBound)

Configuration (high parallelism hides I/O latency):

  • DOP: ProcessorCount * 4
  • Queue: ProcessorCount * 8
  • Buffer: ProcessorCount * 16

ParallelWorkloadType.NetworkBound

Best for: HTTP calls, remote service calls, high-latency network operations

builder
    .AddTransform<WebServiceTransform, Request, Response>()
    .RunParallel(builder, ParallelWorkloadType.NetworkBound)

Configuration (maximum throughput under high latency):

  • DOP: Min(ProcessorCount * 8, 100) (capped at 100)
  • Queue: 200 (large buffer)
  • Buffer: 400
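To make the cap concrete, here is the degree-of-parallelism arithmetic for a few machine sizes. This is a sketch of the formula above, not library code:

```csharp
// DOP = Min(ProcessorCount * 8, 100)
//  8 cores:  Math.Min(8  * 8, 100) = 64   (under the cap)
// 16 cores:  Math.Min(16 * 8, 100) = 100  (capped)
// 32 cores:  Math.Min(32 * 8, 100) = 100  (capped)
int dop = Math.Min(Environment.ProcessorCount * 8, 100);
```

The cap keeps large machines from opening an unbounded number of concurrent network operations.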

Builder API: Fine-Grained Control with ParallelOptionsBuilder

When you need to customize beyond the presets, use the fluent builder API for flexible configuration:

builder
    .AddTransform<MyTransform, Input, Output>()
    .RunParallel(builder, opt => opt
        .MaxDegreeOfParallelism(8)
        .MaxQueueLength(100)
        .DropOldestOnBackpressure()
        .OutputBufferCapacity(50)
        .AllowUnorderedOutput()
        .MetricsInterval(TimeSpan.FromSeconds(2)))
    .AddSink<MySink>();

The ParallelOptionsBuilder provides full configuration:

public class ParallelOptionsBuilder
{
    // Configure degree of parallelism
    public ParallelOptionsBuilder MaxDegreeOfParallelism(int value);

    // Configure input queue behavior
    public ParallelOptionsBuilder MaxQueueLength(int value);
    public ParallelOptionsBuilder BlockOnBackpressure();
    public ParallelOptionsBuilder DropOldestOnBackpressure();
    public ParallelOptionsBuilder DropNewestOnBackpressure();

    // Configure output buffering
    public ParallelOptionsBuilder OutputBufferCapacity(int value);
    public ParallelOptionsBuilder AllowUnorderedOutput();

    // Configure metrics
    public ParallelOptionsBuilder MetricsInterval(TimeSpan interval);

    // Build the final options
    public ParallelOptions Build();
}

Comparison: Configuration Methods

| Aspect                   | Manual API      | Preset API      | Builder API       |
| ------------------------ | --------------- | --------------- | ----------------- |
| Lines of code            | 5-6             | 1               | 2-3               |
| Parameters to understand | All 5+          | 0               | 1-2 (as needed)   |
| Configuration style      | Explicit        | Predefined      | Fluent            |
| When to use              | Advanced tuning | Common patterns | Custom needs      |
| Learning curve           | Steeper         | Gentle          | Gradual           |

Manual Configuration:

.WithBlockingParallelism(
    builder,
    maxDegreeOfParallelism: Environment.ProcessorCount * 4,
    maxQueueLength: Environment.ProcessorCount * 8,
    outputBufferCapacity: Environment.ProcessorCount * 16)

Preset API:

.RunParallel(builder, ParallelWorkloadType.IoBound)

Builder API:

.RunParallel(builder, opt => opt
    .MaxDegreeOfParallelism(Environment.ProcessorCount * 4)
    .MaxQueueLength(Environment.ProcessorCount * 8))

When to Use Each Approach

| Scenario                    | Recommendation                           |
| --------------------------- | ---------------------------------------- |
| New to NPipeline            | Preset API with ParallelWorkloadType     |
| Typical workloads           | Preset API                               |
| Need slight customization   | Builder API                              |
| Advanced performance tuning | Manual API                               |
| Prototyping                 | Preset API                               |
| Production optimization     | Builder or Manual API (after profiling)  |

Example: Full Pipeline with Simplified API

using NPipeline.Extensions.Parallelism;
using NPipeline.Pipeline;

public class FileProcessingPipeline : IPipelineDefinition
{
    public void DefinePipeline(PipelineBuilder builder)
    {
        builder
            // Read files (I/O-bound)
            .AddTransform<FileReaderTransform, string, FileContent>()
            .RunParallel(builder, ParallelWorkloadType.IoBound)

            // Parse content (CPU-bound)
            .AddTransform<ParserTransform, FileContent, ParsedData>()
            .RunParallel(builder, ParallelWorkloadType.CpuBound)

            // Upload results (network-bound)
            .AddTransform<UploaderTransform, ParsedData, UploadResult>()
            .RunParallel(builder, ParallelWorkloadType.NetworkBound)

            // Store results
            .AddSink<DatabaseSinkNode<UploadResult>>();
    }
}

This example shows how different stages of a pipeline can each use a different workload type, so every stage is automatically configured for its specific characteristics.

  • Validation: Learn about parallel configuration validation rules
  • Best Practices: Guidelines for optimizing parallelism in your pipelines
  • Thread Safety: Comprehensive guide to thread safety and shared state management