Data Lake Connector

The NPipeline.Connectors.DataLake package adds table-level semantics on top of the Parquet connector. It provides:

  • Hive-style partitioning — automatically routes records into column=value/ directory hierarchies
  • Manifest tracking — an NDJSON-based file inventory that records snapshot IDs, row counts, and partition metadata
  • Time travel — read the table as of any past timestamp or snapshot ID
  • Small-file compaction — merge many small files into fewer, query-optimised files
  • Format adapter interface — extend to Iceberg, Delta Lake, or custom table formats

Parquet is the default storage format. All file I/O uses the NPipeline storage abstraction so the same connector works with local files, S3, Azure Blob Storage, and any other IStorageProvider.

Installation

dotnet add package NPipeline.Connectors.DataLake

This automatically pulls in NPipeline.Connectors.Parquet, NPipeline.Connectors, and NPipeline.StorageProviders.

For the core NPipeline package and other available extensions, see the Installation Guide.

Relationship to the Parquet Connector

NPipeline.Connectors.Parquet handles single-file columnar I/O. NPipeline.Connectors.DataLake builds on top of it to add table-level concerns:

| Concern | Parquet Connector | Data Lake Connector |
| --- | --- | --- |
| Read / write Parquet I/O | ✓ | ✓ (via Parquet) |
| Column projection | ✓ | ✓ (via Parquet) |
| Hive-style partitioning | — | ✓ |
| Snapshot / manifest | — | ✓ |
| Time travel | — | ✓ |
| Small-file compaction | — | ✓ |

You can use each package independently or together.

Partition Specifications

A PartitionSpec<T> defines which properties determine the partition path. The fluent API compiles property expressions into efficient delegates at startup.

using NPipeline.Connectors.DataLake.Partitioning;

public class SalesRecord
{
    public long Id { get; set; }
    public string ProductName { get; set; } = string.Empty;
    public decimal Amount { get; set; }
    public DateTime EventDate { get; set; }
    public string Region { get; set; } = string.Empty;
}

// Single-level — produces: event_date=2025-01-15/
var spec = PartitionSpec<SalesRecord>.By(x => x.EventDate);

// Multi-level — produces: event_date=2025-01-15/region=EU/
var spec = PartitionSpec<SalesRecord>
    .By(x => x.EventDate)
    .ThenBy(x => x.Region);

// Custom column names
var spec = PartitionSpec<SalesRecord>
    .By(x => x.EventDate, "date")
    .ThenBy(x => x.Region, "geo");

// No partitioning — all files written to the table root
var spec = PartitionSpec<SalesRecord>.None();

Column names default to snake_case (e.g., EventDate → event_date), matching Hive conventions and ensuring compatibility with Spark, Athena, Trino, and similar query engines.
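The default naming rule can be illustrated with a small sketch. This `ToSnakeCase` helper is hypothetical — it is not the connector's actual implementation, which may handle additional edge cases (acronyms, digits) differently:

```csharp
using System;
using System.Text;

// Hypothetical sketch of the PascalCase -> snake_case rule used for
// default partition column names.
static string ToSnakeCase(string propertyName)
{
    var sb = new StringBuilder(propertyName.Length + 4);
    for (var i = 0; i < propertyName.Length; i++)
    {
        if (char.IsUpper(propertyName[i]))
        {
            // Insert an underscore before every upper-case letter except the first
            if (i > 0) sb.Append('_');
            sb.Append(char.ToLowerInvariant(propertyName[i]));
        }
        else
        {
            sb.Append(propertyName[i]);
        }
    }
    return sb.ToString();
}

Console.WriteLine(ToSnakeCase("EventDate"));   // event_date
Console.WriteLine(ToSnakeCase("ProductName")); // product_name
```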

Hive-Style Path Formatting

| CLR Type | Path Format | Example |
| --- | --- | --- |
| DateOnly | yyyy-MM-dd | 2025-01-15 |
| DateTime | yyyy-MM-dd-HH-mm-ss | 2025-01-15-14-30-00 |
| DateTimeOffset | yyyy-MM-dd-HH-mm-ss | 2025-01-15-14-30-00 |
| string | URL-encoded | Hello%20World |
| enum | Lowercase name | active |
| Guid | Lowercase D format | a1b2c3d4-... |
| Numeric types | Invariant culture | 12345, 3.14 |
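The formatting rules in the table can be approximated with a type switch. This is an illustrative stand-in, not the connector's real formatter:

```csharp
using System;
using System.Globalization;

// Hypothetical sketch of the per-type formatting rules from the table above.
static string FormatPartitionValue(object value) => value switch
{
    DateOnly d          => d.ToString("yyyy-MM-dd", CultureInfo.InvariantCulture),
    DateTime dt         => dt.ToString("yyyy-MM-dd-HH-mm-ss", CultureInfo.InvariantCulture),
    DateTimeOffset dto  => dto.ToString("yyyy-MM-dd-HH-mm-ss", CultureInfo.InvariantCulture),
    string s            => Uri.EscapeDataString(s),          // URL-encoded
    Enum e              => e.ToString().ToLowerInvariant(),  // lowercase name
    Guid g              => g.ToString("D"),                  // lowercase D format
    IFormattable f      => f.ToString(null, CultureInfo.InvariantCulture),
    _                   => value.ToString() ?? string.Empty
};

Console.WriteLine(FormatPartitionValue(new DateOnly(2025, 1, 15))); // 2025-01-15
Console.WriteLine(FormatPartitionValue("Hello World"));             // Hello%20World
```

Invariant-culture formatting matters here: a culture-sensitive `ToString()` could emit `3,14` instead of `3.14` and silently split one logical partition into two directories.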

DataLakePartitionedSinkNode<T>

DataLakePartitionedSinkNode<T> receives a stream of records and writes them to a partitioned table, flushing row groups as buffers fill.

Constructors

// Resolver-based (default resolver used when null)
public DataLakePartitionedSinkNode(
    StorageUri tableBasePath,
    PartitionSpec<T>? partitionSpec = null,
    IStorageResolver? resolver = null,
    ParquetConfiguration? configuration = null)

// Direct provider injection
public DataLakePartitionedSinkNode(
    IStorageProvider provider,
    StorageUri tableBasePath,
    PartitionSpec<T>? partitionSpec = null,
    ParquetConfiguration? configuration = null)

Example: Partitioned Write

using NPipeline.Connectors.DataLake;
using NPipeline.Connectors.DataLake.Partitioning;
using NPipeline.Connectors.Parquet;
using NPipeline.Pipeline;
using NPipeline.StorageProviders;
using NPipeline.StorageProviders.Models;

public sealed class SalesWritePipeline : IPipelineDefinition
{
    private static readonly StorageUri TableUri =
        StorageUri.Parse("s3://warehouse/sales_table/");

    public void Define(PipelineBuilder builder, PipelineContext context)
    {
        var resolver = StorageProviderFactory.CreateResolver();
        var partitionSpec = PartitionSpec<SalesRecord>
            .By(x => x.EventDate)
            .ThenBy(x => x.Region);

        var source = builder.AddSource<SalesSourceNode, SalesRecord>("source");

        var sink = builder.AddSink(
            new DataLakePartitionedSinkNode<SalesRecord>(
                StorageProviderFactory.GetProviderOrThrow(resolver, TableUri),
                TableUri,
                partitionSpec,
                new ParquetConfiguration
                {
                    RowGroupSize = 100_000,
                    TargetFileSizeBytes = 512L * 1024 * 1024
                }),
            "lake_sink");

        builder.Connect(source, sink);
    }
}

Generated directory structure:

sales_table/
├── _manifest/
│   ├── manifest.ndjson
│   └── snapshots/
│       └── 20250215093045abcd1234.ndjson
├── event_date=2025-01-15/
│   ├── region=EU/
│   │   └── part-0001.parquet
│   └── region=US/
│       └── part-0001.parquet
└── event_date=2025-01-16/
    └── region=APAC/
        └── part-0001.parquet

Each write operation creates a new snapshot ID and appends entries to the manifest.

DataLakeTableSourceNode<T>

DataLakeTableSourceNode<T> reads a table by consulting the manifest to discover data files, then streams all rows.

Constructors

// Latest snapshot
public DataLakeTableSourceNode(
    StorageUri tableBasePath,
    IStorageResolver? resolver = null,
    ParquetConfiguration? configuration = null)

// Direct provider
public DataLakeTableSourceNode(
    IStorageProvider provider,
    StorageUri tableBasePath,
    ParquetConfiguration? configuration = null)

// Time travel — as of a timestamp
public DataLakeTableSourceNode(
    StorageUri tableBasePath,
    DateTimeOffset asOf,
    IStorageResolver? resolver = null,
    ParquetConfiguration? configuration = null)

// Time travel — specific snapshot ID
public DataLakeTableSourceNode(
    StorageUri tableBasePath,
    string snapshotId,
    IStorageResolver? resolver = null,
    ParquetConfiguration? configuration = null)

Example: Reading the Latest Snapshot

var source = builder.AddSource(
    new DataLakeTableSourceNode<SalesRecord>(provider, TableUri),
    "lake_source");

Example: Time Travel

// By timestamp — returns data as it existed at that moment
var asOf = new DateTimeOffset(2025, 1, 15, 12, 0, 0, TimeSpan.Zero);
var source = new DataLakeTableSourceNode<SalesRecord>(provider, TableUri, asOf);

// By snapshot ID — pinpoints an exact write operation
var source = new DataLakeTableSourceNode<SalesRecord>(
    provider,
    TableUri,
    snapshotId: "20250215093045abcd1234");

Time travel is useful for debugging, point-in-time reporting, and auditing data at earlier states.
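Conceptually, an as-of read keeps only the manifest entries whose snapshot was written at or before the cut-off. This self-contained sketch models that selection with local tuples standing in for the connector's manifest entries — it is an illustration of the idea, not the connector's internals:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// (Path, SnapshotId, WrittenAt) tuples stand in for manifest entries.
var entries = new List<(string Path, string SnapshotId, DateTimeOffset WrittenAt)>
{
    ("event_date=2025-01-14/part-0001.parquet", "snap-a", DateTimeOffset.Parse("2025-01-14T08:00:00Z")),
    ("event_date=2025-01-15/part-0001.parquet", "snap-b", DateTimeOffset.Parse("2025-01-15T09:00:00Z")),
    ("event_date=2025-01-15/part-0002.parquet", "snap-c", DateTimeOffset.Parse("2025-01-16T10:00:00Z")),
};

var asOf = new DateTimeOffset(2025, 1, 15, 12, 0, 0, TimeSpan.Zero);

// Keep only files visible at the cut-off; the snap-c file, written a day
// later, is excluded from the as-of view.
var visible = entries
    .Where(e => e.WrittenAt <= asOf)
    .Select(e => e.Path)
    .ToList();

foreach (var path in visible)
    Console.WriteLine(path);
```

Because old data files are never rewritten in place, reconstructing a past state is purely a matter of filtering the manifest — no data copying is involved.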

DataLakeTableWriter<T>

DataLakeTableWriter<T> is a lower-level type used when you need explicit control over snapshot lifecycle outside of a pipeline node. It exposes AppendAsync and implements IAsyncDisposable to flush the manifest on completion.

using NPipeline.Connectors.DataLake;

await using var writer = new DataLakeTableWriter<SalesRecord>(
    provider,
    tableUri,
    partitionSpec,
    new ParquetConfiguration { RowGroupSize = 100_000 });

Console.WriteLine($"Snapshot ID: {writer.SnapshotId}");

var dataPipe = new InMemoryDataPipe<SalesRecord>(records, "SalesData");
await writer.AppendAsync(dataPipe, CancellationToken.None);
// Manifest is flushed when the writer is disposed

Manifest

Every write appends entries to _manifest/manifest.ndjson (NDJSON format). Each ManifestEntry contains:

| Field | Description |
| --- | --- |
| path | Relative path from the table base |
| row_count | Number of rows in this file |
| written_at | UTC timestamp of the write |
| file_size_bytes | File size in bytes |
| partition_values | Map of partition column names to values |
| snapshot_id | ID of the containing snapshot |
| content_hash | Optional SHA-256 hash for integrity |
| file_format | "parquet" |
| compression | Codec used (e.g., "snappy") |

NDJSON is used because it supports append-only writes and allows streaming reads — no full-file rewrite is required when adding new entries.
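The append-only property is easy to demonstrate. The sketch below writes one JSON object per line to a local temp file — the connector does the equivalent through `IStorageProvider`, and the field names mirror the documented manifest fields, but this is an illustration rather than the connector's own code:

```csharp
using System;
using System.IO;
using System.Text.Json;

var manifestPath = Path.Combine(Path.GetTempPath(), "manifest-demo.ndjson");

// An illustrative entry using the documented snake_case field names.
var entry = new
{
    path = "event_date=2025-01-15/region=EU/part-0001.parquet",
    row_count = 100_000,
    written_at = DateTimeOffset.UtcNow,
    snapshot_id = "20250215093045abcd1234"
};

// One JSON object per line: adding an entry is a pure append,
// so existing entries are never rewritten.
File.AppendAllText(manifestPath, JsonSerializer.Serialize(entry) + Environment.NewLine);

// Reading is a streaming line-by-line parse — no need to load the whole file.
foreach (var line in File.ReadLines(manifestPath))
    Console.WriteLine(JsonDocument.Parse(line).RootElement.GetProperty("path"));
```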

Inspecting the Manifest Programmatically

using NPipeline.Connectors.DataLake.Manifest;

var reader = new ManifestReader(provider, tableUri);
var entries = await reader.ReadAllAsync(cancellationToken);

foreach (var entry in entries)
{
    Console.WriteLine($"{entry.Path}: {entry.RowCount:N0} rows, {entry.FileSizeBytes:N0} B");
    Console.WriteLine($"  Snapshot: {entry.SnapshotId}  Written: {entry.WrittenAt:u}");
}

var snapshotIds = await reader.GetSnapshotIdsAsync(cancellationToken);

Compaction

Small files reduce query performance because each file incurs metadata overhead and network round trips. DataLakeCompactor consolidates files below a size threshold into larger files while updating the manifest.

using NPipeline.Connectors.DataLake;
using NPipeline.Connectors.DataLake.FormatAdapters;

var compactor = new DataLakeCompactor(provider, tableUri);

// Dry run first to see impact
var request = new TableCompactRequest
{
    SmallFileThresholdBytes = 32L * 1024 * 1024,  // compact files < 32 MB
    TargetFileSizeBytes = 256L * 1024 * 1024,     // target 256 MB output files
    MinFilesToCompact = 5,
    MaxFilesToCompact = 100,
    DeleteOriginalFiles = true,
    DryRun = true
};

var preview = await compactor.CompactAsync(request, cancellationToken);
Console.WriteLine($"Would compact {preview.FilesCompacted} → {preview.FilesCreated} files");

// Apply for real
var result = await compactor.CompactAsync(request with { DryRun = false }, cancellationToken);
Console.WriteLine($"Done in {result.Duration.TotalSeconds:N1}s: {result.BytesBefore:N0} → {result.BytesAfter:N0} B");

Run compaction as a scheduled background job, triggered when the manifest contains more than a threshold number of small files.

ParquetConfiguration for Data Lake

DataLakePartitionedSinkNode and DataLakeTableSourceNode accept the same ParquetConfiguration as the Parquet connector. The most relevant options are:

| Property | Recommended Value | Reason |
| --- | --- | --- |
| RowGroupSize | 100,000+ | Larger groups improve query engine scan speed |
| TargetFileSizeBytes | 256 MB–1 GB | Fewer files means lower metadata overhead |
| MaxBufferedRows | 500,000 | Prevents OOM during high-cardinality partition fan-out |
| Compression | Snappy | Fast codec compatible with all major query engines |

See the Parquet connector docs for the full reference.

Format Adapter Interface

Implement ITableFormatAdapter to target alternative table formats such as Apache Iceberg or Delta Lake:

using NPipeline.Connectors.DataLake.FormatAdapters;

public class IcebergFormatAdapter : ITableFormatAdapter
{
    public string Name => "iceberg";

    public Task AppendAsync(TableAppendRequest request, CancellationToken ct)
    {
        // Write Iceberg metadata files and update snapshot log
        throw new NotImplementedException();
    }

    public Task<TableSnapshot> GetSnapshotAsync(TableSnapshotRequest request, CancellationToken ct)
    {
        // Resolve snapshot from Iceberg metadata
        throw new NotImplementedException();
    }

    public Task<TableCompactResult> CompactAsync(TableCompactRequest request, CancellationToken ct)
    {
        // Compact with Iceberg metadata updates
        throw new NotImplementedException();
    }

    public Task<IReadOnlyList<SnapshotSummary>> ListSnapshotsAsync(StorageUri tableBasePath, CancellationToken ct)
    {
        // Read Iceberg snapshot log
        throw new NotImplementedException();
    }

    public Task<bool> TableExistsAsync(StorageUri tableBasePath, CancellationToken ct)
    {
        // Check for Iceberg metadata files
        throw new NotImplementedException();
    }

    public Task CreateTableAsync(StorageUri tableBasePath, CancellationToken ct)
    {
        // Initialise Iceberg table metadata
        throw new NotImplementedException();
    }
}

Production Considerations

File sizing. Target 256 MB–1 GB per output file. Use TargetFileSizeBytes on ParquetConfiguration and run the compactor periodically to recover from streaming ingestion that naturally produces many small files.

Memory during fan-out. High-cardinality partition keys (e.g., user_id) can create thousands of open partition buffers. Set MaxBufferedRows to a value your heap can accommodate, and reduce RowGroupSize to increase flush frequency.

Idempotent writes. Pass an IdempotencyKey on the append request to prevent duplicate entries when a pipeline is retried after failure.

Manifest growth. The main manifest file grows with each write. Once it is large enough to affect read latency, snapshot files provide isolation — each snapshot is stored separately under _manifest/snapshots/ and referenced from the top-level manifest.

Compaction scheduling. Run compaction when the manifest contains more than ~10–20 small files per partition. A lightweight cron or Azure Function / AWS Lambda reading ManifestReader.ReadAllAsync to count small files is sufficient.
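The scheduling check described above amounts to grouping manifest entries by partition directory and counting files below the threshold. This self-contained sketch uses local tuples in place of the connector's manifest entries; the threshold values are illustrative, not recommendations:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// (Path, FileSizeBytes) tuples stand in for manifest entries.
var entries = new List<(string Path, long FileSizeBytes)>
{
    ("event_date=2025-01-15/region=EU/part-0001.parquet", 4L * 1024 * 1024),
    ("event_date=2025-01-15/region=EU/part-0002.parquet", 6L * 1024 * 1024),
    ("event_date=2025-01-15/region=US/part-0001.parquet", 300L * 1024 * 1024),
};

const long smallFileThreshold = 32L * 1024 * 1024;  // same cut-off as the compactor
const int maxSmallFilesPerPartition = 1;

// Partitions with more small files than the limit are due for compaction.
var partitionsToCompact = entries
    .Where(e => e.FileSizeBytes < smallFileThreshold)
    .GroupBy(e => e.Path[..e.Path.LastIndexOf('/')])  // partition directory
    .Where(g => g.Count() > maxSmallFilesPerPartition)
    .Select(g => g.Key)
    .ToList();

foreach (var partition in partitionsToCompact)
    Console.WriteLine(partition); // event_date=2025-01-15/region=EU
```

In a scheduled job, the same grouping would run over the output of `ManifestReader.ReadAllAsync`, with a `TableCompactRequest` issued only for the flagged partitions.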

Complete Pipeline Example

using NPipeline.Connectors.DataLake;
using NPipeline.Connectors.DataLake.Partitioning;
using NPipeline.Connectors.Parquet;
using NPipeline.Connectors.Parquet.Attributes;
using NPipeline.Pipeline;
using NPipeline.StorageProviders;
using NPipeline.StorageProviders.Models;

public class SalesRecord
{
    [ParquetColumn("sale_id")]
    public long Id { get; set; }

    [ParquetColumn("product_name")]
    public string ProductName { get; set; } = string.Empty;

    [ParquetDecimal(18, 2)]
    public decimal Amount { get; set; }

    public DateTime EventDate { get; set; }             // Partition key 1
    public string Region { get; set; } = string.Empty;  // Partition key 2
}

public sealed class DataLakePipeline : IPipelineDefinition
{
    private static readonly StorageUri TableUri =
        StorageUri.Parse("s3://warehouse/sales_table/");

    public void Define(PipelineBuilder builder, PipelineContext context)
    {
        var resolver = StorageProviderFactory.CreateResolver();
        var provider = StorageProviderFactory.GetProviderOrThrow(resolver, TableUri);

        var partitionSpec = PartitionSpec<SalesRecord>
            .By(x => x.EventDate)
            .ThenBy(x => x.Region);

        var config = new ParquetConfiguration
        {
            RowGroupSize = 100_000,
            TargetFileSizeBytes = 512L * 1024 * 1024
        };

        // Read the table as of a specific date for reprocessing
        var asOf = new DateTimeOffset(2025, 1, 15, 0, 0, 0, TimeSpan.Zero);
        var source = builder.AddSource(
            new DataLakeTableSourceNode<SalesRecord>(provider, TableUri, asOf),
            "lake_source");

        var transform = builder.AddTransform<SalesTransform, SalesRecord, SalesRecord>("transform");

        // Write the results back to the same table (new snapshot)
        var sink = builder.AddSink(
            new DataLakePartitionedSinkNode<SalesRecord>(
                provider,
                TableUri,
                partitionSpec,
                config),
            "lake_sink");

        builder.Connect(source, transform);
        builder.Connect(transform, sink);
    }
}