Lookup Nodes

Lookup nodes are specialized transforms that enrich incoming data items by querying an external data source (e.g., a database, an in-memory cache, an API) to retrieve additional information. This is a common pattern in ETL and data processing pipelines where raw data needs to be augmented with reference data.

NPipeline provides an abstract LookupNode<TIn, TKey, TValue, TOut> base class and a concrete InMemoryLookupNode<TIn, TKey, TValue, TOut> for in-memory lookups.

LookupNode<TIn, TKey, TValue, TOut>

This abstract base class allows you to define custom lookup logic. You need to specify:

  • TIn: The type of the input item to be enriched.
  • TKey: The type of the key used to perform the lookup (extracted from TIn).
  • TValue: The type of the value retrieved from the lookup source.
  • TOut: The type of the enriched output item.

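To implement a custom lookup, you typically override methods to:

  • Extract the lookup key from the input item (ExtractKey in the example below).
  • Perform the lookup against your data source (LookupAsync, for sources that are queried asynchronously).
  • Build the output item from the input and the retrieved value (CreateOutput in the example below).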
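In outline, a lookup node extracts a key from the input, looks that key up, and builds the output from the input plus whatever the lookup returned. The following is a minimal, library-independent sketch of that flow. LookupStep, ProductNameLookup, NamedOrderLine, and ProcessAsync are hypothetical names invented for illustration; they are not part of NPipeline, and the real LookupNode signatures (which also take a PipelineContext) may differ.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Usage: enrich order lines with a product name held in a dictionary.
var names = new Dictionary<int, string> { [1] = "Laptop", [3] = "Keyboard" };
var step = new ProductNameLookup(names);

var found = await step.ProcessAsync(new OrderLine(1, 1), CancellationToken.None);
var missing = await step.ProcessAsync(new OrderLine(2, 99), CancellationToken.None);
Console.WriteLine(found.ProductName);    // Laptop
Console.WriteLine(missing.ProductName);  // Unknown Product

public sealed record OrderLine(int OrderLineId, int ProductId);
public sealed record NamedOrderLine(int OrderLineId, string ProductName);

// Hypothetical stand-in for a lookup base class; NOT the NPipeline type.
public abstract class LookupStep<TIn, TKey, TValue, TOut>
    where TValue : class
{
    protected abstract TKey ExtractKey(TIn input);
    protected abstract Task<TValue?> LookupAsync(TKey key, CancellationToken ct);
    protected abstract TOut CreateOutput(TIn input, TValue? value);

    // The three steps, in order: extract key, look up, create output.
    public async Task<TOut> ProcessAsync(TIn input, CancellationToken ct)
    {
        var key = ExtractKey(input);
        var value = await LookupAsync(key, ct);
        return CreateOutput(input, value);
    }
}

public sealed class ProductNameLookup : LookupStep<OrderLine, int, string, NamedOrderLine>
{
    private readonly IReadOnlyDictionary<int, string> _names;

    public ProductNameLookup(IReadOnlyDictionary<int, string> names) => _names = names;

    protected override int ExtractKey(OrderLine input) => input.ProductId;

    // A dictionary read is synchronous, so the result is wrapped in a completed task.
    protected override Task<string?> LookupAsync(int key, CancellationToken ct) =>
        Task.FromResult(_names.TryGetValue(key, out var name) ? name : null);

    // A null value means the key was not found; substitute a default name.
    protected override NamedOrderLine CreateOutput(OrderLine input, string? value) =>
        new(input.OrderLineId, value ?? "Unknown Product");
}
```

Keeping the three steps separate means the key extraction and output mapping stay the same when the data source changes; only the lookup step needs to be replaced.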

InMemoryLookupNode<TIn, TKey, TValue, TOut>

InMemoryLookupNode is a concrete implementation of LookupNode that performs lookups against an in-memory dictionary. It's useful for small to medium-sized reference datasets that can be loaded entirely into memory.

Example: Enriching Product Orders with Product Details

Let's say we have a stream of OrderLine items containing a ProductId and a static, in-memory collection of Product details. We want to enrich each OrderLine with the ProductName.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using NPipeline;
using NPipeline.Nodes;

// Define input and lookup data structures
public sealed record OrderLine(int OrderLineId, int ProductId, int Quantity);
public sealed record Product(int ProductId, string ProductName, decimal UnitPrice);
public sealed record EnrichedOrderLine(int OrderLineId, int ProductId, string ProductName, int Quantity);

public sealed class ProductLookupNode : InMemoryLookupNode<OrderLine, int, Product, EnrichedOrderLine>
{
    public ProductLookupNode(IReadOnlyDictionary<int, Product> lookupData) : base(lookupData) { }

    protected override int ExtractKey(OrderLine input, PipelineContext context) => input.ProductId;

    protected override EnrichedOrderLine CreateOutput(OrderLine input, Product? lookupValue, PipelineContext context)
    {
        if (lookupValue is null)
        {
            Console.WriteLine($"Warning: Product with ID {input.ProductId} not found for OrderLine {input.OrderLineId}. Skipping enrichment.");
            return new EnrichedOrderLine(input.OrderLineId, input.ProductId, "Unknown Product", input.Quantity);
        }

        return new EnrichedOrderLine(
            input.OrderLineId,
            input.ProductId,
            lookupValue.ProductName,
            input.Quantity
        );
    }
}

public static class Program
{
    public static async Task Main(string[] args)
    {
        // Reference data for the lookup, keyed by ProductId
        var productCatalog = new Dictionary<int, Product>
        {
            { 1, new Product(1, "Laptop", 1200.00m) },
            { 2, new Product(2, "Mouse", 25.00m) },
            { 3, new Product(3, "Keyboard", 75.00m) }
        };

        var orderLineSource = new InMemorySourceNode<OrderLine>(
            new OrderLine(1, 1, 1),
            new OrderLine(2, 3, 2),
            new OrderLine(3, 99, 1) // Product 99 does not exist
        );

        // Note: this snippet does not show how productCatalog and orderLineSource
        // are handed to the nodes that the runner creates; that wiring depends on
        // how node instances are registered or resolved in your application.

        var context = PipelineContext.Default;
        var runner = new PipelineRunner();

        Console.WriteLine("Starting lookup pipeline...");
        await runner.RunAsync<LookupPipelineDefinition>(context);
        Console.WriteLine("Lookup pipeline finished.");
    }
}

public sealed class LookupPipelineDefinition : IPipelineDefinition
{
    public void Define(PipelineBuilder builder, PipelineContext context)
    {
        var sourceHandle = builder.AddSource<InMemorySourceNode<OrderLine>, OrderLine>("source");
        var transformHandle = builder.AddTransform<ProductLookupNode, OrderLine, EnrichedOrderLine>("lookup");
        var sinkHandle = builder.AddSink<ConsoleSink<EnrichedOrderLine>, EnrichedOrderLine>("sink");

        builder.Connect(sourceHandle, transformHandle);
        builder.Connect(transformHandle, sinkHandle);
    }
}

Expected Output:

Starting lookup pipeline...
Sink received: EnrichedOrderLine { OrderLineId = 1, ProductId = 1, ProductName = "Laptop", Quantity = 1 }
Sink received: EnrichedOrderLine { OrderLineId = 2, ProductId = 3, ProductName = "Keyboard", Quantity = 2 }
Warning: Product with ID 99 not found for OrderLine 3. Skipping enrichment.
Sink received: EnrichedOrderLine { OrderLineId = 3, ProductId = 99, ProductName = "Unknown Product", Quantity = 1 }
Lookup pipeline finished.

PipelineBuilderLookupExtensions

The PipelineBuilderLookupExtensions provide convenient extension methods for integrating lookup nodes into your pipelines, often simplifying the syntax for common lookup scenarios.

// Example using lookup extension
// CustomerIdSource, EnrichedCustomerSink, CustomerDetails, and EnrichedCustomer
// are assumed to be defined elsewhere in your project.
public sealed class LookupWithExtensionPipelineDefinition : IPipelineDefinition
{
    public void Define(PipelineBuilder builder, PipelineContext context)
    {
        var sourceHandle = builder.AddSource<CustomerIdSource, int>("source");
        var sinkHandle = builder.AddSink<EnrichedCustomerSink, EnrichedCustomer>("sink");

        // Use the lookup extension for concise syntax
        var lookupHandle = builder.AddLookup<int, EnrichedCustomer>("lookup",
            keySelector: (customerId) => customerId,
            lookupFunction: async (key, ct) => await GetCustomerDetailsAsync(key, ct),
            combineFunction: (input, customerValue) => new EnrichedCustomer(input, customerValue)
        );

        builder.Connect(sourceHandle, lookupHandle);
        builder.Connect(lookupHandle, sinkHandle);
    }

    // Placeholder implementation: returns a completed task with fixed data.
    // Replace the body with a real asynchronous call (database, HTTP API, ...).
    private Task<CustomerDetails> GetCustomerDetailsAsync(int customerId, CancellationToken ct)
    {
        return Task.FromResult(new CustomerDetails(customerId, "Customer Name"));
    }
}

Configuration Pattern

The lookup nodes use a configuration pattern where the lookup data, key extraction logic, and output creation logic are encapsulated in a configuration object. This pattern allows for flexible setup while maintaining clean separation of concerns.

// The configuration pattern used internally
var config = new InMemoryLookupNode<TIn, TKey, TValue, TOut>.Configuration(
    lookupData: dictionary,              // The data to look up against
    keyExtractor: input => input.Id,     // How to extract the key from the input
    outputCreator: (input, value) =>     // How to create the output
        new EnrichedInput(input, value)
);

Considerations for Lookup Nodes

  • Lookup Source Performance: The performance of your lookup node is heavily dependent on the underlying lookup source. Optimize your data access (e.g., indexing, caching) for frequently accessed lookup data.
  • Memory vs. External Calls: For large lookup datasets, consider whether an InMemoryLookupNode is feasible or if an external lookup (e.g., database query, API call) is more appropriate.
  • Error Handling: Implement robust error handling for lookup failures (e.g., key not found, external service unavailable). Decide whether to skip the item, return a default value, or halt the pipeline.
  • Asynchronous Lookups: Ensure your LookupAsync implementation is truly asynchronous to avoid blocking the pipeline.
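
Several of these considerations can be combined in one small wrapper around an external lookup. The sketch below is self-contained and does not use NPipeline: CachedLookup, GetAsync, and FetchNameAsync are hypothetical names introduced for illustration, and the simulated source stands in for a real database or API call. It caches results per key, awaits the source without blocking a thread, and applies one of the possible failure policies, namely returning a default value when the key is missing or the source fails.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Simulated external source; a real one would query a database or call an API.
var calls = 0;
async Task<string?> FetchNameAsync(int id, CancellationToken ct)
{
    calls++;
    await Task.Delay(10, ct);            // stands in for real I/O; no thread is blocked
    if (id < 0) throw new InvalidOperationException("source unavailable");
    return id == 1 ? "Laptop" : null;    // null means "key not found"
}

var lookup = new CachedLookup<int, string>(FetchNameAsync, fallback: "Unknown Product");

Console.WriteLine(await lookup.GetAsync(1, CancellationToken.None));   // Laptop
Console.WriteLine(await lookup.GetAsync(1, CancellationToken.None));   // Laptop (served from cache)
Console.WriteLine(await lookup.GetAsync(2, CancellationToken.None));   // Unknown Product (key not found)
Console.WriteLine(await lookup.GetAsync(-1, CancellationToken.None));  // Unknown Product (source failed)
Console.WriteLine(calls);                                              // 3

// Hypothetical helper, not an NPipeline type.
public sealed class CachedLookup<TKey, TValue>
    where TKey : notnull
    where TValue : class
{
    private readonly ConcurrentDictionary<TKey, Task<TValue?>> _cache = new();
    private readonly Func<TKey, CancellationToken, Task<TValue?>> _fetch;
    private readonly TValue _fallback;

    public CachedLookup(Func<TKey, CancellationToken, Task<TValue?>> fetch, TValue fallback)
    {
        _fetch = fetch;
        _fallback = fallback;
    }

    public async Task<TValue> GetAsync(TKey key, CancellationToken ct)
    {
        try
        {
            // Cache the task itself so repeated keys reuse an earlier fetch.
            // Under contention the factory may run more than once for a key.
            var value = await _cache.GetOrAdd(key, k => _fetch(k, ct));
            return value ?? _fallback;
        }
        catch (Exception ex)
        {
            _cache.TryRemove(key, out _);                 // never keep failed or cancelled fetches
            if (ex is OperationCanceledException) throw;  // let cancellation propagate
            return _fallback;                             // policy choice: default value on failure
        }
    }
}
```

Returning a default is only one policy. Rethrowing from the catch block would halt processing instead, and returning a nullable result would let the caller skip the item.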

Lookup nodes are a powerful way to create rich, contextual data streams within your NPipeline pipelines.

Next Steps

  • Branch Nodes: Learn about duplicating data streams to multiple downstream paths.