RabbitMQ Connector

The NPipeline.Connectors.RabbitMQ package provides specialized source and sink nodes for working with RabbitMQ. This allows you to integrate RabbitMQ message queues into your pipelines as an input source, an output destination, or both.

This connector uses the official RabbitMQ.Client 7.x library, providing fully asynchronous operations with IChannel, push-based consumers with backpressure, publisher confirms, quorum queue support, automatic topology declaration, and comprehensive dead-letter handling.

Installation

dotnet add package NPipeline.Connectors.RabbitMQ

For the core NPipeline package and other available extensions, see the Installation Guide.

Quick Start

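Here's a minimal setup for a pipeline that consumes messages from a RabbitMQ queue, processes them, and publishes the results to an exchange. It registers the connection, a source for the input queue, and a sink for the output exchange: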

using System.Reflection;
using NPipeline.Connectors.RabbitMQ.Configuration;
using NPipeline.Connectors.RabbitMQ.DependencyInjection;
using NPipeline.Extensions.DependencyInjection;

// `services` is the application's IServiceCollection
services.AddNPipeline(Assembly.GetExecutingAssembly());

// Register RabbitMQ connection
services.AddRabbitMq(o =>
{
    o.HostName = "localhost";
    o.Port = 5672;
    o.UserName = "guest";
    o.Password = "guest";
});

// Register source and sink
services.AddRabbitMqSource<OrderEvent>(new RabbitMqSourceOptions
{
    QueueName = "orders",
    PrefetchCount = 100,
});
services.AddRabbitMqSink<EnrichedOrder>(new RabbitMqSinkOptions
{
    ExchangeName = "enriched-orders",
    RoutingKey = "order.enriched",
});

Configuration

Connection Options

The RabbitMqConnectionOptions class configures the connection to the RabbitMQ broker:

services.AddRabbitMq(o =>
{
    o.HostName = "rabbitmq.example.com";
    o.Port = 5672;
    o.VirtualHost = "/production";
    o.UserName = "app-user";
    o.Password = "secret";
    o.ClientProvidedName = "my-service";     // Visible in RabbitMQ Management UI
    o.AutomaticRecoveryEnabled = true;       // Default: true
    o.TopologyRecoveryEnabled = true;        // Default: true
    o.RequestedHeartbeat = TimeSpan.FromSeconds(60);
    o.MaxChannelPoolSize = 4;                // Default: 4
});

AMQP URI

You can use an AMQP URI instead of individual connection properties:

services.AddRabbitMq(o =>
{
    o.Uri = new Uri("amqp://user:pass@host:5672/vhost");
});

TLS

services.AddRabbitMq(o =>
{
    o.HostName = "rabbitmq.example.com";
    o.Port = 5671;
    o.Tls = new RabbitMqTlsOptions
    {
        Enabled = true,
        ServerName = "rabbitmq.example.com",
        CertificatePath = "/path/to/client.pfx",
        CertificatePassphrase = "cert-password",
    };
});

Source Options

The RabbitMqSourceOptions class configures the consumer node:

services.AddRabbitMqSource<MyMessage>(new RabbitMqSourceOptions
{
    QueueName = "my-queue",                  // Required
    PrefetchCount = 100,                     // QoS prefetch (default: 100)
    InternalBufferCapacity = 1000,           // Backpressure buffer (default: 1000)
    ConsumerDispatchConcurrency = 1,         // Preserves ordering (default: 1)
    MaxDeliveryAttempts = 5,                 // Poison message rejection (default: 5)
    RejectOnMaxDeliveryAttempts = true,      // Reject without requeue (default: true)
    ContinueOnDeserializationError = false,  // Reject bad messages (default: false)
    RequeueOnNack = true,                    // Requeue nack'd messages (default: true)
});

Sink Options

The RabbitMqSinkOptions class configures the publisher node:

services.AddRabbitMqSink<MyMessage>(new RabbitMqSinkOptions
{
    ExchangeName = "my-exchange",     // Required (use "" for default exchange)
    RoutingKey = "my.routing.key",    // Default routing key
    EnablePublisherConfirms = true,   // Wait for broker confirms (default: true)
    Persistent = true,                // Durable messages (default: true)
    Mandatory = false,                // Return unroutable messages (default: false)
    MaxRetries = 3,                   // Retry on transient failure (default: 3)
    RetryBaseDelayMs = 100,           // Exponential backoff base (default: 100)
    ContinueOnError = false,          // Stop pipeline on publish error (default: false)
});
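
The MaxRetries and RetryBaseDelayMs comments above describe retries with exponential backoff. The exact schedule the connector uses is not documented here, so the following is only a sketch that assumes the delay doubles on each attempt (100 ms, 200 ms, 400 ms for a base of 100). The Backoff class and its members are hypothetical and not part of NPipeline.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Demo: an operation that fails twice with a transient error, then succeeds.
var calls = 0;
await Backoff.RetryAsync(
    _ =>
    {
        calls++;
        if (calls < 3) throw new TimeoutException("transient failure");
        return Task.CompletedTask;
    },
    maxRetries: 3,
    baseDelayMs: 100);
Console.WriteLine($"Succeeded after {calls} calls");

// Hypothetical helper for illustration only; not part of NPipeline.
static class Backoff
{
    // Delay before retry number `attempt` (0-based): base, 2x base, 4x base, ...
    // Assumption: plain doubling with no jitter or upper bound.
    public static TimeSpan DelayFor(int attempt, int baseDelayMs) =>
        TimeSpan.FromMilliseconds(baseDelayMs * Math.Pow(2, attempt));

    public static async Task RetryAsync(
        Func<CancellationToken, Task> operation,
        int maxRetries,
        int baseDelayMs,
        CancellationToken ct = default)
    {
        for (var attempt = 0; ; attempt++)
        {
            try
            {
                await operation(ct);
                return;
            }
            catch (Exception ex) when (attempt < maxRetries && ex is not OperationCanceledException)
            {
                // Retries remain: wait, then loop. Once retries are exhausted
                // the filter is false and the exception propagates to the caller.
                await Task.Delay(DelayFor(attempt, baseDelayMs), ct);
            }
        }
    }
}
```

With MaxRetries = 3 under this assumption, a publish is attempted at most four times before the error surfaces, at which point ContinueOnError decides whether the pipeline stops.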

Dynamic Routing Keys

services.AddRabbitMqSink<MyMessage>(new RabbitMqSinkOptions
{
    ExchangeName = "events",
    RoutingKeySelector = obj =>
    {
        if (obj is MyMessage msg)
            return $"events.{msg.Type.ToLowerInvariant()}";
        return "events.unknown";
    },
});

Batch Publishing

services.AddRabbitMqSink<MyMessage>(new RabbitMqSinkOptions
{
    ExchangeName = "my-exchange",
    Batching = new BatchPublishOptions
    {
        BatchSize = 100,
        LingerTime = TimeSpan.FromMilliseconds(50),
    },
});

Topology Auto-Declaration

Both source and sink nodes can automatically declare exchanges, queues, and bindings at startup. This is useful for development and testing but can be disabled in production where topology is managed externally.

Source Topology

services.AddRabbitMqSource<MyMessage>(new RabbitMqSourceOptions
{
    QueueName = "orders",
    Topology = new RabbitMqTopologyOptions
    {
        AutoDeclare = true,
        Durable = true,
        QueueType = QueueType.Quorum,          // Classic, Quorum, or Stream
        DeadLetterExchange = "orders-dlx",     // Broker-level dead-letter exchange
        DeadLetterRoutingKey = "orders.dead",
        MessageTtlMs = 60_000,                 // Per-queue message TTL
        MaxLength = 100_000,                   // Max queue length
        Bindings =
        [
            new BindingOptions("orders-exchange", "order.created"),
            new BindingOptions("orders-exchange", "order.updated"),
        ],
        ExchangeType = "topic",
    },
});

Sink Topology

services.AddRabbitMqSink<MyMessage>(new RabbitMqSinkOptions
{
    ExchangeName = "enriched-orders",
    Topology = new RabbitMqTopologyOptions
    {
        AutoDeclare = true,
        Durable = true,
        ExchangeType = "topic",
    },
});

Message Model

The RabbitMqMessage<T> class wraps consumed messages with RabbitMQ-specific metadata and ack/nack capabilities:

public class OrderProcessor : TransformNode<RabbitMqMessage<OrderEvent>, EnrichedOrder>
{
    public override async Task<EnrichedOrder> ExecuteAsync(
        RabbitMqMessage<OrderEvent> input,
        PipelineContext context,
        CancellationToken cancellationToken)
    {
        // Access the deserialized body
        var order = input.Body;

        // Access RabbitMQ metadata
        Console.WriteLine($"Exchange: {input.Exchange}");
        Console.WriteLine($"Routing Key: {input.RoutingKey}");
        Console.WriteLine($"Delivery Tag: {input.DeliveryTag}");
        Console.WriteLine($"Redelivered: {input.Redelivered}");
        Console.WriteLine($"Correlation ID: {input.CorrelationId}");

        // Manual acknowledgment (if not using AutoOnSinkSuccess)
        await input.AcknowledgeAsync(cancellationToken);

        // Or negative acknowledgment with requeue
        // await input.NegativeAcknowledgeAsync(requeue: true, cancellationToken);

        return new EnrichedOrder(order, DateTime.UtcNow);
    }
}

Acknowledgment Strategies

| Strategy | Description |
|---|---|
| AutoOnSinkSuccess | Messages are acknowledged after the sink node successfully publishes them. This is the default and provides at-least-once delivery. |
| Manual | Your code must call AcknowledgeAsync() or NegativeAcknowledgeAsync() explicitly. |

Thread-Safe Ack State Machine

RabbitMqMessage<T> uses atomic Interlocked.CompareExchange operations for ack state transitions:

  • Pending → Acknowledged (via AcknowledgeAsync)
  • Pending → Nacked (via NegativeAcknowledgeAsync)
  • Double-ack is idempotent (no-op)
  • Ack after nack (or vice versa) throws InvalidOperationException
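
The transition rules in this list can be sketched with a single integer field and Interlocked.CompareExchange. This is an illustration only: AckState and its members are hypothetical stand-ins for the connector's internals, and treating a repeated nack as a no-op (like a repeated ack) is an assumption.

```csharp
using System;
using System.Threading;

var state = new AckState();
Console.WriteLine(state.TryAcknowledge()); // True: Pending -> Acknowledged
Console.WriteLine(state.TryAcknowledge()); // False: already acknowledged, no-op
try
{
    state.TryNegativeAcknowledge();        // nack after ack is rejected
}
catch (InvalidOperationException ex)
{
    Console.WriteLine(ex.Message);
}

// Hypothetical stand-in for the connector's internal ack tracking.
sealed class AckState
{
    private const int Pending = 0, Acknowledged = 1, Nacked = 2;
    private int _state = Pending;

    // Each method returns true if this call performed the transition,
    // false if the message was already in the requested state.
    public bool TryAcknowledge() => Transition(Acknowledged);
    public bool TryNegativeAcknowledge() => Transition(Nacked);

    private bool Transition(int target)
    {
        // Atomically swap Pending -> target; the return value is the state
        // observed before the swap, so exactly one caller can see Pending.
        var previous = Interlocked.CompareExchange(ref _state, target, Pending);
        if (previous == Pending) return true;   // this call settled the message
        if (previous == target) return false;   // idempotent repeat
        throw new InvalidOperationException(
            "Message was already settled with the opposite outcome.");
    }
}
```

Because only the caller that observes Pending performs the broker call, concurrent ack attempts from different threads cannot send two acknowledgments for the same delivery tag.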

Dead-Letter Handling

Broker-Level Dead Lettering

Configure a dead-letter exchange (DLX) via the topology options. The broker then routes rejected or expired messages to it automatically:

Topology = new RabbitMqTopologyOptions
{
    DeadLetterExchange = "my-dlx",
    DeadLetterRoutingKey = "dead-letter",
}

Pipeline-Level Dead Lettering

For transform/processing failures, use RabbitMqDeadLetterSink:

var deadLetterSink = new RabbitMqDeadLetterSink(
    connectionManager,
    deadLetterExchange: "pipeline-dlx",
    routingKey: "pipeline.errors");

Dead-lettered messages include enriched headers:

| Header | Description |
|---|---|
| x-death-reason | The exception message |
| x-death-node | The pipeline node ID that failed |
| x-death-timestamp | ISO 8601 timestamp of the failure |
| x-death-exception-type | The .NET exception type name |
| x-death-stack-trace | Truncated stack trace (max 2048 chars) |
| x-original-exchange | Original exchange (if from RabbitMQ source) |
| x-original-routing-key | Original routing key |
| x-original-message-id | Original message ID |
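
To make the failure headers concrete, here is a rough sketch of how such a header set could be derived from an exception. The header names come from the table above; the DeadLetterHeaders helper, the use of the full type name, and the simple cut at 2048 characters are assumptions, not the connector's actual implementation. The x-original-* headers are omitted because they depend on the source message.

```csharp
using System;
using System.Collections.Generic;

var headers = DeadLetterHeaders.Build(
    new InvalidOperationException("Price lookup failed"),
    nodeId: "enrich-order");
foreach (var (key, value) in headers)
    Console.WriteLine($"{key}: {value}");

// Hypothetical helper for illustration only.
static class DeadLetterHeaders
{
    private const int MaxStackTraceLength = 2048;

    public static Dictionary<string, object> Build(Exception error, string nodeId)
    {
        // StackTrace is null for exceptions that were constructed but never thrown.
        var stackTrace = error.StackTrace ?? string.Empty;
        if (stackTrace.Length > MaxStackTraceLength)
            stackTrace = stackTrace[..MaxStackTraceLength];

        return new Dictionary<string, object>
        {
            ["x-death-reason"] = error.Message,
            ["x-death-node"] = nodeId,
            // "O" is the round-trip format, which is ISO 8601 compliant.
            ["x-death-timestamp"] = DateTimeOffset.UtcNow.ToString("O"),
            ["x-death-exception-type"] = error.GetType().FullName ?? error.GetType().Name,
            ["x-death-stack-trace"] = stackTrace,
        };
    }
}
```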

Connection Management

The RabbitMqConnectionManager provides:

  • Lazy connection creation — connects on first use
  • Automatic recovery — reconnects on connection loss
  • Channel pooling — bounded pool with configurable size for publisher channels
  • Publisher confirms — pooled channels have confirms enabled automatically

Consumer channels are created separately (not pooled) since they are long-lived.
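
As a rough illustration of what a bounded pool means for publishers, the sketch below caps the number of pooled objects with a semaphore and creates them lazily. BoundedPool<T> is a hypothetical class written for this example; it says nothing about how RabbitMqConnectionManager is actually implemented, and plain objects stand in for channels.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

var created = 0;
var pool = new BoundedPool<object>(maxSize: 4, factory: () =>
{
    Interlocked.Increment(ref created);
    return new object();
});

// 20 concurrent publishers share at most 4 pooled objects.
var publishers = new Task[20];
for (var i = 0; i < publishers.Length; i++)
{
    publishers[i] = Task.Run(async () =>
    {
        var channel = await pool.RentAsync();
        try { await Task.Delay(10); }      // stand-in for a publish
        finally { pool.Return(channel); }
    });
}
await Task.WhenAll(publishers);
Console.WriteLine($"Objects created: {created}"); // never more than 4

// Hypothetical pool for illustration only.
sealed class BoundedPool<T> where T : class
{
    private readonly SemaphoreSlim _slots;
    private readonly ConcurrentBag<T> _idle = new();
    private readonly Func<T> _factory;

    public BoundedPool(int maxSize, Func<T> factory)
    {
        _slots = new SemaphoreSlim(maxSize, maxSize);
        _factory = factory;
    }

    public async Task<T> RentAsync(CancellationToken ct = default)
    {
        // Waits when maxSize items are already rented out.
        await _slots.WaitAsync(ct);
        // Reuse an idle item if one exists, otherwise create one lazily.
        return _idle.TryTake(out var item) ? item : _factory();
    }

    public void Return(T item)
    {
        // Add before releasing the slot so the next renter finds the idle item
        // instead of creating an extra one.
        _idle.Add(item);
        _slots.Release();
    }
}
```

In this model, MaxChannelPoolSize plays the role of maxSize: when every pooled channel is busy, additional publishers wait rather than opening more channels.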

Metrics

Implement IRabbitMqMetrics to capture connector metrics. The default NullRabbitMqMetrics is a no-op:

public class PrometheusRabbitMqMetrics : IRabbitMqMetrics
{
    // Source metrics
    public void RecordConsumed(string queue, int count) { /* ... */ }
    public void RecordConsumeLatency(string queue, double milliseconds) { /* ... */ }
    public void RecordDeserializationError(string queue) { /* ... */ }
    public void RecordAck(string queue, int count) { /* ... */ }
    public void RecordNack(string queue, int count, bool requeued) { /* ... */ }

    // Sink metrics
    public void RecordPublished(string exchange, string routingKey, int count) { /* ... */ }
    public void RecordPublishLatency(string exchange, double milliseconds) { /* ... */ }
    public void RecordConfirmLatency(string exchange, double milliseconds) { /* ... */ }
    // ... and more
}

// Register before AddRabbitMq to override the default no-op
services.AddSingleton<IRabbitMqMetrics, PrometheusRabbitMqMetrics>();

Serialization

The connector uses IMessageSerializer for message serialization. The default RabbitMqJsonSerializer uses System.Text.Json with camelCase naming.

Override with a custom serializer:

services.AddSingleton<IMessageSerializer, MyCustomSerializer>();
services.AddRabbitMq(o => { /* ... */ });
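
For reference, this is what System.Text.Json with camelCase naming produces on the wire, using only the standard library. The OrderEvent record below is a made-up shape for illustration, and the options shown are an assumption about what "camelCase naming" implies; RabbitMqJsonSerializer may configure additional settings.

```csharp
using System;
using System.Text;
using System.Text.Json;

var options = new JsonSerializerOptions
{
    PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
};

// Serialize to the UTF-8 bytes that would form the message body.
var order = new OrderEvent("A-1001", 3);
byte[] body = JsonSerializer.SerializeToUtf8Bytes(order, options);
Console.WriteLine(Encoding.UTF8.GetString(body)); // {"orderId":"A-1001","quantity":3}

// Deserialize with the same options so property names map back.
var roundTripped = JsonSerializer.Deserialize<OrderEvent>(body, options);
Console.WriteLine(roundTripped == order); // True (records compare by value)

// Hypothetical message shape used only for this illustration.
record OrderEvent(string OrderId, int Quantity);
```

Producers and consumers written in other languages need to agree on this casing, which is the main reason to swap in a custom IMessageSerializer when an existing system already uses a different convention.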

Architecture

┌──────────────────────────────────┐
│         RabbitMQ Broker          │
│                                  │
│  ┌──────────┐   ┌─────────────┐  │
│  │ Exchange │──▶│    Queue    │  │
│  └──────────┘   └──────┬──────┘  │
└────────────────────────┼─────────┘
                         │ Push (BasicDeliver)
┌────────────────────────▼─────────┐
│    AsyncEventingBasicConsumer    │
│   (prefetch QoS controls rate)   │
└────────────────────────┬─────────┘
                         │ Write
┌────────────────────────▼─────────┐
│        Bounded Channel<T>        │
│      (backpressure buffer)       │
└────────────────────────┬─────────┘
                         │ ReadAllAsync
┌────────────────────────▼─────────┐
│  IAsyncEnumerable<T> / DataPipe  │
│  (NPipeline streaming surface)   │
└────────────────────────┬─────────┘
                         │
┌────────────────────────▼─────────┐
│         Transform Nodes          │
│  (enrichment, filtering, etc.)   │
└────────────────────────┬─────────┘
                         │
┌────────────────────────▼─────────┐
│       RabbitMqSinkNode<T>        │
│  (BasicPublishAsync + confirms)  │
└────────────────────────┬─────────┘
                         │ Publish
┌────────────────────────▼─────────┐
│     RabbitMQ Broker (output)     │
└──────────────────────────────────┘
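
The backpressure step in the diagram can be demonstrated with System.Threading.Channels alone. In this sketch a fast producer stands in for the push-based consumer callback and a slow reader stands in for the pipeline; the capacity of 5 and the delays are arbitrary values chosen for the example, and none of this is the connector's actual source code.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Bounded buffer: when it is full, WriteAsync waits instead of dropping or growing.
var buffer = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity: 5)
{
    FullMode = BoundedChannelFullMode.Wait,
    SingleReader = true,
});

// Producer: stands in for the push-based consumer callback.
var producer = Task.Run(async () =>
{
    for (var i = 1; i <= 20; i++)
        await buffer.Writer.WriteAsync(i); // suspends while 5 items are waiting
    buffer.Writer.Complete();              // lets ReadAllAsync finish
});

// Reader: stands in for the pipeline consuming the stream.
var received = 0;
await foreach (var item in buffer.Reader.ReadAllAsync())
{
    await Task.Delay(5); // simulate slow downstream work
    received++;
}
await producer;
Console.WriteLine($"Received {received} items"); // Received 20 items
```

The producer never gets more than the buffer capacity ahead of the reader. In the connector, InternalBufferCapacity bounds this buffer while PrefetchCount limits how many unacknowledged messages the broker will push in the first place.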

Sample

See the Sample_RabbitMqConnector for a complete working example with docker-compose.

Troubleshooting

Consumer stops receiving messages

  • Check that PrefetchCount is not set to 0 (minimum is 1)
  • Verify the queue exists and has messages using rabbitmqctl list_queues
  • Check consumer tag in RabbitMQ Management UI under Connections/Channels

Publisher confirms timeout

  • Ensure the channel was created with PublisherConfirmationsEnabled (automatic with DI)
  • Check broker health and disk space (alarms block publishing)
  • Increase ConfirmTimeout if the broker is under heavy load

Messages not routed

  • Verify exchange type matches the routing key pattern
  • Check bindings in RabbitMQ Management UI
  • Enable Mandatory = true on the sink to get unroutable message notifications
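
When checking whether a topic exchange binding matches a routing key, it can help to test the pattern offline. The sketch below mirrors the standard AMQP topic rules, where * matches exactly one dot-separated word and # matches zero or more words. TopicMatcher is a hypothetical helper for debugging; the broker performs the real matching, and edge cases such as empty routing keys are not covered.

```csharp
using System;

Console.WriteLine(TopicMatcher.IsMatch("order.*", "order.created"));    // True
Console.WriteLine(TopicMatcher.IsMatch("order.*", "order.created.eu")); // False
Console.WriteLine(TopicMatcher.IsMatch("order.#", "order.created.eu")); // True
Console.WriteLine(TopicMatcher.IsMatch("order.#", "order"));            // True

// Hypothetical debugging helper; not part of NPipeline or RabbitMQ.Client.
static class TopicMatcher
{
    public static bool IsMatch(string bindingKey, string routingKey) =>
        Match(bindingKey.Split('.'), 0, routingKey.Split('.'), 0);

    private static bool Match(string[] pattern, int p, string[] words, int w)
    {
        // Pattern exhausted: match only if every word was consumed too.
        if (p == pattern.Length) return w == words.Length;

        if (pattern[p] == "#")
        {
            // '#' may absorb zero or more words; try each possibility.
            for (var next = w; next <= words.Length; next++)
                if (Match(pattern, p + 1, words, next)) return true;
            return false;
        }

        // '*' or a literal needs exactly one word to consume.
        if (w == words.Length) return false;
        return (pattern[p] == "*" || pattern[p] == words[w])
            && Match(pattern, p + 1, words, w + 1);
    }
}
```

Direct exchanges require an exact routing key match and fanout exchanges ignore the key entirely, so wildcard bindings like these only behave as patterns on a topic exchange.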