Skip to main content

Nested Composition

Overview

Nested composition allows composite nodes to contain other composite nodes, creating deep hierarchical pipeline structures. This enables building complex workflows from layers of simpler sub-pipelines.

Basic Nesting

Two-Level Nesting

The simplest form of nesting - a composite node within a parent pipeline:

// Level 2: Inner sub-pipeline (multiplies by 2)
public class DoubleTransformPipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var input = builder.AddSource<PipelineInputSource<int>, int>("input");
var transform = builder.AddTransform<DoubleTransform, int, int>("double");
var output = builder.AddSink<PipelineOutputSink<int>, int>("output");

builder.Connect(input, transform);
builder.Connect(transform, output);
}
}

// Level 1: Outer sub-pipeline (contains composite)
public class ProcessingPipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var input = builder.AddSource<PipelineInputSource<int>, int>("input");

// Nested composite node
var double = builder.AddComposite<int, int, DoubleTransformPipeline>("double");

var output = builder.AddSink<PipelineOutputSink<int>, int>("output");

builder.Connect(input, double);
builder.Connect(double, output);
}
}

// Level 0: Main pipeline
public class MainPipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var source = builder.AddSource<IntSource, int>("source");

// Top-level composite node
var process = builder.AddComposite<int, int, ProcessingPipeline>("process");

var sink = builder.AddSink<IntSink, int>("sink");

builder.Connect(source, process);
builder.Connect(process, sink);
}
}

Execution Flow

MainPipeline:
[Source: 1, 2, 3] → [ProcessingPipeline] → [Sink]

ProcessingPipeline:
[Input] → [DoubleTransformPipeline] → [Output]

DoubleTransformPipeline:
[Input] → [Transform x2] → [Output]

Result: 2, 4, 6

Deep Nesting

Three or More Levels

Unlimited nesting depth is supported:

// Level 3: Core processing
public class CorePipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var input = builder.AddSource<PipelineInputSource<Data>, Data>("input");
var process = builder.AddTransform<CoreTransform, Data, Data>("process");
var output = builder.AddSink<PipelineOutputSink<Data>, Data>("output");

builder.Connect(input, process);
builder.Connect(process, output);
}
}

// Level 2: Validation + Core
public class ValidationProcessingPipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var input = builder.AddSource<PipelineInputSource<Data>, Data>("input");

var validate = builder.AddTransform<Validator, Data, Data>("validate");
var core = builder.AddComposite<Data, Data, CorePipeline>("core");

var output = builder.AddSink<PipelineOutputSink<Data>, Data>("output");

builder.Connect(input, validate);
builder.Connect(validate, core);
builder.Connect(core, output);
}
}

// Level 1: Enrichment + Validation + Core
public class EnrichmentPipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var input = builder.AddSource<PipelineInputSource<Data>, Data>("input");

var enrich = builder.AddTransform<Enricher, Data, Data>("enrich");
var validateAndProcess = builder.AddComposite<Data, Data, ValidationProcessingPipeline>("validate-process");

var output = builder.AddSink<PipelineOutputSink<Data>, Data>("output");

builder.Connect(input, enrich);
builder.Connect(enrich, validateAndProcess);
builder.Connect(validateAndProcess, output);
}
}

// Level 0: Main pipeline
public class MainPipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var source = builder.AddSource<DataSource, Data>("source");
var process = builder.AddComposite<Data, Data, EnrichmentPipeline>("process");
var sink = builder.AddSink<DataSink, Data>("sink");

builder.Connect(source, process);
builder.Connect(process, sink);
}
}

Context Propagation

Inheritance Through Levels

Context inheritance can be configured at each level:

// Level 2: Core - no inheritance
public class CorePipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
// Uses Default context - isolated
var input = builder.AddSource<PipelineInputSource<T>, T>("input");
var transform = builder.AddTransform<CoreTransform, T, T>("core");
var output = builder.AddSink<PipelineOutputSink<T>, T>("output");

builder.Connect(input, transform);
builder.Connect(transform, output);
}
}

// Level 1: Middle - inherits from Level 0
public class MiddlePipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var input = builder.AddSource<PipelineInputSource<T>, T>("input");

// Pass context to Level 2 with inheritance
var core = builder.AddComposite<T, T, CorePipeline>(
contextConfiguration: CompositeContextConfiguration.InheritAll);

var output = builder.AddSink<PipelineOutputSink<T>, T>("output");

builder.Connect(input, core);
builder.Connect(core, output);
}
}

// Level 0: Main - sets up initial context
public class MainPipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var source = builder.AddSource<DataSource, T>("source");

// Inherit context to Level 1
var middle = builder.AddComposite<T, T, MiddlePipeline>(
contextConfiguration: CompositeContextConfiguration.InheritAll);

var sink = builder.AddSink<DataSink, T>("sink");

builder.Connect(source, middle);
builder.Connect(middle, sink);
}
}

// Usage
var context = new PipelineContext();
context.Parameters["Config"] = "value";

// Config propagates: Level 0 → Level 1 → Level 2
await runner.RunAsync<MainPipeline>(context);

Selective Propagation

Different inheritance at each level:

// Level 1: Inherits parameters only
builder.AddComposite<T, T, MiddlePipeline>(
contextConfiguration: new CompositeContextConfiguration
{
InheritParentParameters = true, // Pass params down
InheritParentItems = false, // Don't pass items
InheritParentProperties = false
});

// In MiddlePipeline, Level 2: Inherits everything from Level 1
builder.AddComposite<T, T, CorePipeline>(
contextConfiguration: CompositeContextConfiguration.InheritAll);

Common Patterns

Pattern 1: Layered Architecture

Organize pipelines by responsibility layers:

// Layer 3: Business Logic
public class BusinessLogicPipeline : IPipelineDefinition { }

// Layer 2: Validation + Business Logic
public class ValidationLayerPipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var input = builder.AddSource<PipelineInputSource<T>, T>("input");
var validate = builder.AddTransform<Validator, T, T>("validate");
var business = builder.AddComposite<T, T, BusinessLogicPipeline>("business");
var output = builder.AddSink<PipelineOutputSink<T>, T>("output");

builder.Connect(input, validate);
builder.Connect(validate, business);
builder.Connect(business, output);
}
}

// Layer 1: Enrichment + Validation + Business Logic
public class EnrichmentLayerPipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var input = builder.AddSource<PipelineInputSource<T>, T>("input");
var enrich = builder.AddTransform<Enricher, T, T>("enrich");
var validateAndProcess = builder.AddComposite<T, T, ValidationLayerPipeline>("validate-process");
var output = builder.AddSink<PipelineOutputSink<T>, T>("output");

builder.Connect(input, enrich);
builder.Connect(enrich, validateAndProcess);
builder.Connect(validateAndProcess, output);
}
}

// Layer 0: Orchestration
public class OrchestrationPipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var source = builder.AddSource<DataSource, T>("source");
var process = builder.AddComposite<T, T, EnrichmentLayerPipeline>("process");
var sink = builder.AddSink<DataSink, T>("sink");

builder.Connect(source, process);
builder.Connect(process, sink);
}
}

Pattern 2: Recursive Processing

Process hierarchical data structures:

// Process node and children
public class TreeProcessingPipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var input = builder.AddSource<PipelineInputSource<TreeNode>, TreeNode>("input");

// Process current node
var processNode = builder.AddTransform<NodeProcessor, TreeNode, TreeNode>("process-node");

// Process children (recursive composite)
var processChildren = builder.AddTransform<ChildrenProcessor, TreeNode, TreeNode>("process-children");

var output = builder.AddSink<PipelineOutputSink<TreeNode>, TreeNode>("output");

builder.Connect(input, processNode);
builder.Connect(processNode, processChildren);
builder.Connect(processChildren, output);
}
}

// ChildrenProcessor creates composite nodes for each child
public class ChildrenProcessor : TransformNode<TreeNode, TreeNode>
{
public override async Task<TreeNode> ExecuteAsync(TreeNode node, PipelineContext context, CancellationToken ct)
{
if (!node.HasChildren)
return node;

// Process each child with a nested pipeline
foreach (var child in node.Children)
{
// Create and run sub-pipeline for child
// (Implementation details omitted for brevity)
}

return node;
}
}

Pattern 3: Pipeline Templates

Reusable pipeline templates with different core logic:

// Template structure
public class PipelineTemplate<TCoreLogic> : IPipelineDefinition
where TCoreLogic : IPipelineDefinition, new()
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var input = builder.AddSource<PipelineInputSource<Data>, Data>("input");

// Common preprocessing
var preprocess = builder.AddTransform<Preprocessor, Data, Data>("preprocess");

// Core logic (varies)
var core = builder.AddComposite<Data, Data, TCoreLogic>("core");

// Common postprocessing
var postprocess = builder.AddTransform<Postprocessor, Data, Data>("postprocess");

var output = builder.AddSink<PipelineOutputSink<Data>, Data>("output");

builder.Connect(input, preprocess);
builder.Connect(preprocess, core);
builder.Connect(core, postprocess);
builder.Connect(postprocess, output);
}
}

// Different core implementations
public class ValidationCorePipeline : IPipelineDefinition { }
public class TransformationCorePipeline : IPipelineDefinition { }

// Usage
builder.AddComposite<Data, Data, PipelineTemplate<ValidationCorePipeline>>("validate");
builder.AddComposite<Data, Data, PipelineTemplate<TransformationCorePipeline>>("transform");

Performance Considerations

Nesting Depth Impact

Each nesting level adds overhead:

AspectImpact per Level
Context Creation~1-2μs
Memory~1-5KB (depending on context)
Stack DepthMinimal (async execution)

Practical Limit: 5-10 levels before performance degrades noticeably.

Optimization Strategies

1. Flatten When Possible

❌ Avoid excessive nesting:
[Composite A] → [Composite B] → [Composite C] → [Transform]

✅ Prefer flatter structure:
[Composite ABC] → [Transform]

2. Cache Composite Definitions

Reuse sub-pipeline definitions:

// Define once, use many times
var validationConfig = CompositeContextConfiguration.Default;

builder.AddComposite<T, T, ValidationPipeline>("validate-1", validationConfig);
builder.AddComposite<T, T, ValidationPipeline>("validate-2", validationConfig);

3. Minimize Context Inheritance

Only inherit at necessary levels:

✅ Good: Selective inheritance
// Level 0 → Level 1: InheritAll
// Level 1 → Level 2: Default (no inheritance)

❌ Bad: Unnecessary inheritance
// Level 0 → Level 1: InheritAll
// Level 1 → Level 2: InheritAll
// Level 2 → Level 3: InheritAll

Testing Nested Pipelines

Test Each Level Independently

[Fact]
public async Task Level3_CorePipeline_ShouldProcess()
{
// Test deepest level first
var context = new PipelineContext();
await runner.RunAsync<CorePipeline>(context);
// Assert
}

[Fact]
public async Task Level2_MiddlePipeline_ShouldProcess()
{
// Test middle level
var context = new PipelineContext();
await runner.RunAsync<MiddlePipeline>(context);
// Assert
}

[Fact]
public async Task Level1_MainPipeline_ShouldProcess()
{
// Test full hierarchy
var context = new PipelineContext();
await runner.RunAsync<MainPipeline>(context);
// Assert
}

Test Integration Between Levels

[Fact]
public async Task NestedPipelines_ShouldPassDataCorrectly()
{
var context = new PipelineContext();
context.Parameters["Input"] = "test";

await runner.RunAsync<MainPipeline>(context);

// Verify data flowed through all levels
var result = context.Parameters["Output"];
Assert.Equal("expected", result);
}

Best Practices

1. Limit Nesting Depth

Keep nesting to 2-3 levels for most use cases:

✅ Good: 2-3 levels
Main → Processing → Validation

❌ Excessive: 5+ levels
Main → Orchestration → Processing → Transformation → Validation → Core

2. Name Levels Clearly

Use clear, hierarchical naming:

✅ Good names:
"order-processing"
↳ "validation"
↳ "schema-validation"

❌ Bad names:
"pipeline1"
↳ "pipeline2"
↳ "pipeline3"

3. Document Hierarchy

Document the pipeline structure:

/// <summary>
/// Order processing pipeline with nested validation.
/// </summary>
/// <remarks>
/// Pipeline Structure:
/// - Level 0: OrderProcessingPipeline
/// - Level 1: ValidationPipeline
/// - Level 2: SchemaValidationPipeline
/// - Level 1: TransformationPipeline
/// </remarks>
public class OrderProcessingPipeline : IPipelineDefinition
{
// ...
}

4. Use Composition for Reusability

Nest when reusing sub-pipelines:

✅ Good: Reusable validation
public class OrderPipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var validate = builder.AddComposite<Order, ValidatedOrder, OrderValidationPipeline>("validate");
// Use same validation in multiple pipelines
}
}

public class InvoicePipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var validate = builder.AddComposite<Invoice, ValidatedInvoice, InvoiceValidationPipeline>("validate");
// Reuse validation logic
}
}

5. Balance Isolation and Integration

Choose appropriate inheritance at each level:

// Level 0: Set up shared config
context.Parameters["ApiKey"] = key;

// Level 1: Inherit config, isolate processing
builder.AddComposite<T, T, ProcessingPipeline>(
contextConfiguration: new CompositeContextConfiguration
{
InheritParentParameters = true // Get config
});

// In ProcessingPipeline, Level 2: Full isolation for core logic
builder.AddComposite<T, T, CorePipeline>(
contextConfiguration: CompositeContextConfiguration.Default);

Summary

Nested composition enables building complex pipelines from simple building blocks:

  • Unlimited depth supported (practical limit 5-10 levels)
  • Context propagation configurable at each level
  • Independent testing of each level
  • Performance impact increases with depth
  • Best practices favor 2-3 levels for most use cases

Use nesting when it improves modularity, reusability, and maintainability - but avoid excessive depth that adds complexity without benefit.