Azure Cosmos DB Connector


The NPipeline.Connectors.Azure.CosmosDb package provides specialized source and sink nodes for working with Azure Cosmos DB. With them, you can use Cosmos DB as an input source or an output destination in your pipelines, across several Cosmos DB API types.

This connector supports the SQL API with native change feed capabilities, plus Mongo API and Cassandra API adapters for multi-model support. It is built on the Azure Cosmos DB .NET SDK.

Installation

To use the Cosmos DB connector, install the NPipeline.Connectors.Azure.CosmosDb NuGet package:

```bash
dotnet add package NPipeline.Connectors.Azure.CosmosDb
```

For the core NPipeline package and other available extensions, see the Installation Guide.

Features

The Cosmos DB connector provides the following capabilities:

  • SQL API Source Node: Read data using Cosmos DB SQL queries with parameterization
  • Change Feed Source Node: Real-time streaming from Cosmos DB Change Feed
  • Sink Node: Write data with multiple strategies for different use cases
  • Write Strategies: PerRow, Batch, TransactionalBatch, and Bulk execution modes
  • Partition Key Handling: Attribute-based, explicit selector, or automatic detection
  • Authentication: Connection strings, or Azure AD (including managed identity) via DefaultAzureCredential
  • Multi-API Support: SQL, Mongo API, and Cassandra API with dedicated nodes
  • StorageUri Configuration: Environment-aware setup via URI schemes
  • Connection Pooling: Efficient resource management through dependency injection

Dependency Injection

The Cosmos DB connector supports dependency injection for managing connections and factories. This is the recommended approach for production applications.

Registering the Connector

Use AddCosmosDbConnector to register connection management and node factories:

```csharp
using Azure.Identity; // DefaultAzureCredential (only needed for the Azure AD option)
using Microsoft.Extensions.DependencyInjection;
using NPipeline.Connectors.Azure.CosmosDb.DependencyInjection;

var services = new ServiceCollection()
    .AddCosmosDbConnector(options =>
    {
        // Option 1: connection string (account endpoint + key)
        options.DefaultConnectionString = "AccountEndpoint=https://your-account.documents.azure.com:443/;AccountKey=your-key;";

        // Option 2: Azure AD via DefaultAzureCredential
        // options.DefaultUri = new Uri("https://your-account.documents.azure.com:443/");
        // options.DefaultCredential = new DefaultAzureCredential();

        // Additional named connections
        options.AddOrUpdateConnection("secondary", "secondary-connection-string");
    })
    .BuildServiceProvider();

// Factories that create source and sink nodes from the registered connections
var sourceFactory = services.GetRequiredService<CosmosSourceNodeFactory>();
var sinkFactory = services.GetRequiredService<CosmosSinkNodeFactory>();
```

Why Use Dependency Injection?

Using dependency injection provides several benefits:

  • Connection Pooling: Efficiently reuses connections across multiple nodes
  • Configuration Centralization: All Cosmos DB connections configured in one place
  • Testability: Easy to mock or replace dependencies in unit tests
  • Lifetime Management: Services properly disposed when the application shuts down
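Connection pooling here comes down to reusing one client per connection instead of creating a new one for every node; Microsoft's guidance for the Cosmos DB .NET SDK is likewise to keep a single CosmosClient per account for the lifetime of the application. The sketch below illustrates that reuse pattern with a ConcurrentDictionary keyed by connection name. It is not the connector's implementation: the GetClient helper is hypothetical, and an anonymous object stands in for a real SDK client so the example runs without an Azure account.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Counts how many "clients" are created, to show that reuse works.
var created = 0;
var clients = new ConcurrentDictionary<string, Lazy<object>>();

// Hypothetical helper: return the one client for a connection name, creating it on first use.
// Lazy<T> ensures the factory runs once per name even if two callers race on GetOrAdd.
object GetClient(string connectionName) =>
    clients.GetOrAdd(
        connectionName,
        name => new Lazy<object>(() =>
        {
            Interlocked.Increment(ref created);
            return new { Connection = name }; // placeholder for a real SDK client
        })).Value;

var first = GetClient("default");
var again = GetClient("default");
var secondary = GetClient("secondary");
Console.WriteLine($"{created} clients created for 3 requests"); // prints "2 clients created for 3 requests"
```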

Common Attributes

The Cosmos DB connector supports common attributes from NPipeline.Connectors.Attributes for consistent data mapping across all connectors.

[Column] Attribute

The [Column] attribute maps a property to a document field with a different name:

```csharp
using NPipeline.Connectors.Attributes;

public class Customer
{
    // Stored as "customer_id" in the document
    [Column("customer_id")]
    public string Id { get; set; } = string.Empty;

    // Stored as "customer_type" in the document
    [Column("customer_type")]
    public string CustomerType { get; set; } = string.Empty;

    // No attribute: the default name mapping applies
    public string Name { get; set; } = string.Empty;
    public string Email { get; set; } = string.Empty;
}
```

[IgnoreColumn] Attribute

The [IgnoreColumn] attribute excludes a property from mapping. This is useful for computed properties:

```csharp
using NPipeline.Connectors.Attributes;

public class Order
{
    public string Id { get; set; } = string.Empty;
    public decimal Subtotal { get; set; }
    public decimal Tax { get; set; }

    // Computed from other properties, so it is excluded from mapping
    [IgnoreColumn]
    public decimal Total => Subtotal + Tax;
}
```

[CosmosPartitionKey] Attribute

The Cosmos DB-specific [CosmosPartitionKey] attribute marks the property that holds the partition key value:

```csharp
using NPipeline.Connectors.Azure.CosmosDb.Mapping;

public class Customer
{
    public string Id { get; set; } = string.Empty;

    [CosmosPartitionKey]
    public string Region { get; set; } = string.Empty;

    public string Name { get; set; } = string.Empty;
}
```

SQL API: Query Source Node

The CosmosSourceNode<T> reads data using Cosmos DB SQL queries.

Basic Example

```csharp
using NPipeline.Connectors.Azure.CosmosDb.Nodes;

// Parameterized query: @status is bound from the parameters list
var sourceNode = new CosmosSourceNode<Customer>(
    connectionString: "your-connection-string",
    databaseId: "MyDatabase",
    containerId: "Customers",
    query: "SELECT * FROM c WHERE c.Status = @status",
    parameters: [new DatabaseParameter("status", "Active")]);

var pipeline = PipelineBuilder.Create<Customer>()
    .Source(sourceNode)
    .Transform(customer => new CustomerDto { /* ... */ })
    .Sink(consoleSink) // consoleSink: any sink node defined elsewhere
    .Build();
```
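Binding values through parameters keeps them out of the query text, so user input cannot change the structure of the query. A mismatch between placeholders and parameters is easy to introduce, though. The sketch below is a hypothetical pre-flight check, not part of the connector, that lists @placeholders with no matching parameter. It assumes parameter names are given without the @ prefix, as in DatabaseParameter("status", ...).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

// Hypothetical helper: list @placeholders in a Cosmos DB SQL query that have no parameter.
// Assumes parameter names are supplied without the leading '@'.
List<string> MissingParameters(string query, IEnumerable<string> parameterNames)
{
    var supplied = new HashSet<string>(parameterNames, StringComparer.Ordinal);
    return Regex.Matches(query, @"@([A-Za-z_][A-Za-z0-9_]*)")
        .Select(m => m.Groups[1].Value)
        .Distinct()
        .Where(name => !supplied.Contains(name))
        .ToList();
}

var query = "SELECT * FROM c WHERE c.Status = @status AND c.Region = @region";
var missing = MissingParameters(query, new[] { "status" });
Console.WriteLine(string.Join(", ", missing)); // prints "region"
```

The regular expression does not skip @ characters inside string literals (for example an e-mail address in a WHERE clause), so treat it as a rough check.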

Using StorageUri

Alternatively, pass a StorageUri that carries the account endpoint, database, container, and key in a single environment-aware URI:

```csharp
var uri = StorageUri.Parse("cosmosdb://account.documents.azure.com:443/MyDatabase/Customers?key=account-key");

var sourceNode = new CosmosSourceNode<Customer>(
    uri: uri,
    query: "SELECT * FROM c WHERE c.Region = @region",
    parameters: [new DatabaseParameter("region", "US")]);
```
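To see which pieces of information the example URI carries, the sketch below splits it with System.Uri. The layout (host and port, then database and container path segments, then a key query parameter) is inferred from this one example only; StorageUri.Parse may apply different or additional rules, and the Describe helper is hypothetical.

```csharp
using System;
using System.Linq;

// Hypothetical helper: split a cosmosdb:// URI into endpoint, database, container, and key.
// The layout is inferred from the example above, not taken from StorageUri's parser.
(string Endpoint, string Database, string Container, string Key) Describe(string text)
{
    var uri = new Uri(text);
    var segments = uri.AbsolutePath.Trim('/').Split('/');
    var key = uri.Query.TrimStart('?')
        .Split('&', StringSplitOptions.RemoveEmptyEntries)
        .Select(pair => pair.Split('=', 2))
        .Where(kv => kv.Length == 2 && kv[0] == "key")
        .Select(kv => Uri.UnescapeDataString(kv[1]))
        .FirstOrDefault();
    // Assumes the account endpoint is HTTPS on the same host and port.
    return ($"https://{uri.Host}:{uri.Port}/", segments[0], segments[1], key);
}

var parsed = Describe("cosmosdb://account.documents.azure.com:443/MyDatabase/Customers?key=account-key");
Console.WriteLine($"{parsed.Endpoint} | {parsed.Database} | {parsed.Container}");
```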

Change Feed Source Node

The CosmosChangeFeedSourceNode<T> enables real-time streaming from the Cosmos DB Change Feed.

```csharp
using NPipeline.Connectors.Azure.CosmosDb.ChangeFeed;
using NPipeline.Connectors.Azure.CosmosDb.Configuration;

var changeFeedConfig = new ChangeFeedConfiguration
{
    StartFrom = ChangeFeedStartFrom.Beginning,  // read the change feed from the beginning
    PollingInterval = TimeSpan.FromSeconds(1),  // interval between polls
    MaxItemCount = 100                          // maximum items returned per poll
};

var changeFeedSource = new CosmosChangeFeedSourceNode<Order>(
    connectionString: "your-connection-string",
    databaseId: "MyDatabase",
    containerId: "Orders",
    configuration: changeFeedConfig);

var pipeline = PipelineBuilder.Create<Order>()
    .Source(changeFeedSource)
    .Transform(order => ProcessOrder(order)) // ProcessOrder: your own processing method
    .Sink(orderSink)
    .Build();
```
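The configuration values above map onto a simple read loop: read up to MaxItemCount changes from the current position, remember where you stopped, and wait for PollingInterval when nothing new arrives. The sketch below simulates that loop against an in-memory list. It is an illustration only: the real Cosmos DB change feed hands out opaque continuation tokens rather than list indexes, and the connector's own loop may differ. ReadPage and the feed contents are hypothetical, and the timings are shortened so the example finishes quickly.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical in-memory feed; the "continuation" is the index of the next unread change.
var feed = new List<string> { "order-1", "order-2", "order-3", "order-4", "order-5" };

// Read at most maxItemCount changes starting at the continuation position.
(List<string> Items, int Next) ReadPage(int continuation, int maxItemCount)
{
    var items = feed.Skip(continuation).Take(maxItemCount).ToList();
    return (items, continuation + items.Count);
}

var received = new List<string>();
var continuationToken = 0;
var pollingInterval = TimeSpan.FromMilliseconds(50); // stands in for PollingInterval
using var stop = new CancellationTokenSource(TimeSpan.FromMilliseconds(300));

while (!stop.IsCancellationRequested)
{
    var (page, next) = ReadPage(continuationToken, maxItemCount: 2); // stands in for MaxItemCount
    received.AddRange(page);
    continuationToken = next; // resume from here on the next read

    if (page.Count == 0)
    {
        // Nothing new: wait one polling interval before asking again.
        try { await Task.Delay(pollingInterval, stop.Token); }
        catch (OperationCanceledException) { break; }
    }
}

Console.WriteLine(string.Join(", ", received)); // prints "order-1, order-2, order-3, order-4, order-5"
```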

Sink Node

The CosmosSinkNode<T> writes data to Cosmos DB with configurable strategies.

```csharp
using Microsoft.Azure.Cosmos; // PartitionKey
using NPipeline.Connectors.Azure.CosmosDb.Nodes;
using NPipeline.Connectors.Azure.CosmosDb.Configuration;

var sinkNode = new CosmosSinkNode<Customer>(
    connectionString: "your-connection-string",
    databaseId: "MyDatabase",
    containerId: "Customers",
    writeStrategy: CosmosWriteStrategy.Batch,
    idSelector: c => c.Id,                                  // document id
    partitionKeySelector: c => new PartitionKey(c.Region)); // partition key per item

var pipeline = PipelineBuilder.Create<CustomerDto>()
    .Source(customerSource)
    .Transform(dto => new Customer { /* ... */ })
    .Sink(sinkNode)
    .Build();
```

Write Strategies

Choose the write strategy that matches your requirements:

PerRow

Writes items one at a time. Best for:

  • Small data volumes
  • When each write must complete before the next item is processed
  • Individual error handling per item

Batch

Writes items in parallel batches. Best for:

  • High-throughput scenarios
  • Items distributed across partitions
  • When some failures are acceptable
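One way to picture the Batch strategy is to split incoming items into groups of WriteBatchSize and write several groups in parallel, capped by MaxConcurrency (both settings appear in the configuration table below). The sketch shows that general technique with Enumerable.Chunk (.NET 6 or later) and a SemaphoreSlim. WriteBatchAsync is a hypothetical stand-in that only records each batch, and none of this is the connector's own code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var written = new List<int[]>();
var gate = new object();
var inFlight = 0;
var maxObserved = 0;

// Hypothetical write: records the batch; a real sink would call the SDK here.
async Task WriteBatchAsync(int[] batch)
{
    var now = Interlocked.Increment(ref inFlight);
    lock (gate) { maxObserved = Math.Max(maxObserved, now); }
    await Task.Delay(10); // simulated network latency
    lock (gate) { written.Add(batch); }
    Interlocked.Decrement(ref inFlight);
}

// Split items into batches of writeBatchSize; write at most maxConcurrency batches at once.
async Task WriteAllAsync(IEnumerable<int> items, int writeBatchSize, int maxConcurrency)
{
    using var throttle = new SemaphoreSlim(maxConcurrency);
    var tasks = items.Chunk(writeBatchSize).Select(async batch =>
    {
        await throttle.WaitAsync();
        try { await WriteBatchAsync(batch); }
        finally { throttle.Release(); }
    });
    await Task.WhenAll(tasks);
}

await WriteAllAsync(Enumerable.Range(1, 250), writeBatchSize: 100, maxConcurrency: 2);
Console.WriteLine($"{written.Count} batches, at most {maxObserved} in flight");
```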

TransactionalBatch

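Writes items that share a partition key value as one atomic unit: either all of them are applied or none are. Best for:

  • When a group of writes must succeed or fail together (ACID guarantees)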
  • Related items in the same partition
  • Financial or critical data
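Cosmos DB scopes a transactional batch to a single partition key value and limits it to 100 operations, so a stream of items has to be grouped by partition key and then split before it can be written this way. The sketch below shows that grouping, with tuples standing in for documents. It is a hypothetical illustration, not the connector's logic. Each resulting chunk is atomic on its own, so atomicity does not span chunks, and the service's 2 MB request payload limit is not modeled.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Each item carries its partition key value (here, the Region).
var items = new List<(string Id, string Region)>();
for (var i = 0; i < 120; i++) items.Add(($"us-{i}", "US"));
for (var i = 0; i < 30; i++) items.Add(($"eu-{i}", "EU"));

// One partition key value per batch and at most 100 operations: group first, then chunk.
const int MaxOperationsPerBatch = 100;
var batches = items
    .GroupBy(item => item.Region)
    .SelectMany(group => group.Chunk(MaxOperationsPerBatch)
        .Select(chunk => (PartitionKey: group.Key, Items: chunk)))
    .ToList();

foreach (var batch in batches)
    Console.WriteLine($"{batch.PartitionKey}: {batch.Items.Length} items");
// Output, one line each: "US: 100 items", "US: 20 items", "EU: 30 items"
```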

Bulk

Uses Cosmos DB bulk execution mode. Best for:

  • Maximum throughput
  • Large data migrations
  • When operation order doesn't matter

Partition Key Handling

Cosmos DB routes every item by its partition key value, so the sink must know the partition key for each item it writes. The connector supports the following approaches.

Attribute-Based

Use [CosmosPartitionKey] to automatically detect the partition key:

```csharp
public class Customer
{
    public string Id { get; set; } = string.Empty;

    [CosmosPartitionKey]
    public string Region { get; set; } = string.Empty;
}
```

Explicit Selector

Pass a partitionKeySelector to the sink node to compute the partition key explicitly:

```csharp
var sinkNode = new CosmosSinkNode<Customer>(
    /* ... */
    partitionKeySelector: c => new PartitionKey(c.Region));
```

No Partition Key

For legacy containers created without a partition key, the connector uses PartitionKey.None automatically.

Mongo API

The connector provides dedicated source and sink nodes for the Cosmos DB Mongo API.

```csharp
using MongoDB.Bson; // BsonDocument
using NPipeline.Connectors.Azure.CosmosDb.Configuration; // CosmosWriteStrategy
using NPipeline.Connectors.Azure.CosmosDb.Nodes;

// The query is a MongoDB filter document written as JSON
var mongoSource = new CosmosMongoSourceNode<BsonDocument>(
    connectionString: "mongodb://user:pass@account.mongo.cosmos.azure.com:10255/?ssl=true",
    databaseId: "MyDatabase",
    containerId: "Customers",
    query: "{ \"status\": \"active\" }");

var mongoSink = new CosmosMongoSinkNode<BsonDocument>(
    connectionString: "mongodb://user:pass@account.mongo.cosmos.azure.com:10255/?ssl=true",
    databaseId: "MyDatabase",
    containerId: "Customers",
    writeStrategy: CosmosWriteStrategy.Bulk);
```
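The query argument in the Mongo source example is a JSON filter, which means hand-escaping quotes inside a C# string. A small alternative, sketched below as a hypothetical helper, is to serialize a dictionary with System.Text.Json so the escaping is done for you. It covers only simple equality filters, and the resulting string is assumed to be accepted wherever the hand-written JSON was.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helper: serialize field/value pairs into a MongoDB equality filter.
// System.Text.Json escapes quotes and special characters in the values.
string EqualityFilter(IDictionary<string, object> fields) => JsonSerializer.Serialize(fields);

var filter = EqualityFilter(new Dictionary<string, object> { ["status"] = "active" });
Console.WriteLine(filter); // prints {"status":"active"}
```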

Cassandra API

The connector provides dedicated source and sink nodes for the Cosmos DB Cassandra API.

```csharp
using NPipeline.Connectors.Azure.CosmosDb.Api.Cassandra;
using NPipeline.Connectors.Azure.CosmosDb.Configuration; // CosmosWriteStrategy
using NPipeline.Connectors.Azure.CosmosDb.Nodes;

// Rows are represented as column name -> value dictionaries
var cassandraSource = new CosmosCassandraSourceNode<Dictionary<string, object?>>(
    contactPoint: "account.cassandra.cosmos.azure.com",
    keyspace: "my_keyspace",
    query: "SELECT id, status FROM orders WHERE status = 'open';");

var cassandraSink = new CosmosCassandraSinkNode<Dictionary<string, object?>>(
    contactPoint: "account.cassandra.cosmos.azure.com",
    keyspace: "my_keyspace",
    writeStrategy: CosmosWriteStrategy.Batch);
```

Note: The connector does not provide a change feed source node for the Cassandra API. For streaming requirements, poll with a query or use an external change data capture (CDC) tool.
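Polling usually means keeping a high-water mark: remember the latest modification time you have processed and, on each poll, ask only for rows changed after it. The sketch below applies that idea to an in-memory list so it runs anywhere. It assumes a hypothetical UpdatedAt value that your application sets on every write; the Poll helper is hypothetical, and the CQL you would issue against Cassandra (and any indexing it needs) is not shown.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical rows: (id, last-modified timestamp set by the application on every write).
var rows = new List<(string Id, DateTime UpdatedAt)>
{
    ("o1", new DateTime(2024, 1, 1, 10, 0, 0, DateTimeKind.Utc)),
    ("o2", new DateTime(2024, 1, 1, 10, 5, 0, DateTimeKind.Utc)),
};

// Return the IDs changed after the watermark, plus the watermark to use next time.
(List<string> Changed, DateTime Watermark) Poll(DateTime watermark)
{
    var changed = rows.Where(r => r.UpdatedAt > watermark).OrderBy(r => r.UpdatedAt).ToList();
    var next = changed.Count > 0 ? changed[^1].UpdatedAt : watermark;
    return (changed.Select(r => r.Id).ToList(), next);
}

var first = Poll(DateTime.MinValue);   // returns o1 and o2
rows.Add(("o3", new DateTime(2024, 1, 1, 10, 9, 0, DateTimeKind.Utc)));
var second = Poll(first.Watermark);    // returns only o3
Console.WriteLine(string.Join(",", second.Changed)); // prints "o3"
```

Because the comparison is strictly greater than, a row written later with exactly the same timestamp as the watermark would be missed; production code typically adds a tie-breaker such as the row ID or re-reads a small overlap window.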

Configuration

CosmosConfiguration

| Property | Type | Default | Description |
|---|---|---|---|
| CommandTimeout | int | 30 | Command timeout in seconds |
| FetchSize | int | 100 | Number of items per request |
| StreamResults | bool | false | Stream results instead of buffering them |
| WriteBatchSize | int | 100 | Batch size for writes |
| MaxConcurrency | int? | null | Maximum number of concurrent operations |
| ContinueOnError | bool | false | Continue on row-level errors |

ChangeFeedConfiguration

| Property | Type | Default | Description |
|---|---|---|---|
| StartFrom | ChangeFeedStartFrom | Beginning | Start position (Beginning, Now, or Time) |
| StartTime | DateTime? | null | Start time when StartFrom is Time |
| PollingInterval | TimeSpan | 1 second | Interval between polls |
| MaxItemCount | int | 100 | Maximum items per poll |
| ContinueOnError | bool | false | Continue on errors |

Error Handling

Configure error handling to match your resilience requirements:

```csharp
var config = new CosmosConfiguration
{
    ContinueOnError = true,     // Continue processing on errors
    ThrowOnMappingError = false // Don't throw on mapping issues
};
```
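The sketch below illustrates what a ContinueOnError-style flag typically means for row-level processing: with the flag set, a failing row is recorded and skipped; without it, the first failure ends the run. The WriteRows helper and its simulated failure (negative numbers) are hypothetical, and the connector's actual error reporting may differ.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical per-row loop: failed rows are collected when continueOnError is true,
// otherwise the first failure propagates to the caller.
(List<int> Written, List<string> Errors) WriteRows(IEnumerable<int> rows, bool continueOnError)
{
    var written = new List<int>();
    var errors = new List<string>();
    foreach (var row in rows)
    {
        try
        {
            if (row < 0) throw new ArgumentException($"row {row} is invalid"); // simulated failure
            written.Add(row);
        }
        catch (ArgumentException ex) when (continueOnError)
        {
            errors.Add(ex.Message);
        }
    }
    return (written, errors);
}

var lenient = WriteRows(new[] { 1, -2, 3 }, continueOnError: true);
Console.WriteLine($"written {lenient.Written.Count}, errors {lenient.Errors.Count}"); // prints "written 2, errors 1"

try { WriteRows(new[] { 1, -2, 3 }, continueOnError: false); }
catch (ArgumentException ex) { Console.WriteLine($"stopped: {ex.Message}"); } // prints "stopped: row -2 is invalid"
```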

Custom Mapping

Provide custom mapping logic for complex transformations:

```csharp
var sourceNode = new CosmosSourceNode<Customer>(
    connectionString: "your-connection-string",
    databaseId: "MyDatabase",
    containerId: "Customers",
    query: "SELECT * FROM c",
    // Build each Customer from the raw row, falling back to empty strings for missing fields
    mapper: row => new Customer
    {
        Id = row.Get<string>("id") ?? string.Empty,
        Name = row.Get<string>("name") ?? string.Empty,
        Email = row.GetValue("email")?.ToString() ?? string.Empty
    });
```
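The mapper above relies on each field lookup returning null when a field is missing, with ?? string.Empty supplying the fallback. The sketch below reproduces that pattern with a plain dictionary in place of the connector's row type. The Get<T> helper is hypothetical and only mirrors how the example uses row.Get<string>; this version returns default(T) for missing, null, or differently typed fields, which the real row API may not do.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the row handed to a mapper: field name -> value.
var row = new Dictionary<string, object> { ["id"] = "c-1", ["name"] = "Ada", ["email"] = null };

// Hypothetical Get<T>: the value as T, or default(T) if missing, null, or of another type.
T Get<T>(IReadOnlyDictionary<string, object> fields, string field) =>
    fields.TryGetValue(field, out var value) && value is T typed ? typed : default;

var customer = (
    Id: Get<string>(row, "id") ?? string.Empty,
    Name: Get<string>(row, "name") ?? string.Empty,
    Email: Get<string>(row, "email") ?? string.Empty);

Console.WriteLine($"{customer.Id} {customer.Name} [{customer.Email}]"); // prints "c-1 Ada []"
```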

Next Steps