Valir supports multiple message brokers for event-driven architectures.
The IEventBroker interface provides a unified API for publishing and subscribing to events:
public interface IEventBroker
{
Task PublishAsync(string topic, EventEnvelope envelope);
Task SubscribeAsync(string topic, string subscriptionId,
Func<EventEnvelope, Task> onMessage, CancellationToken ct);
Task UnsubscribeAsync(string topic, string subscriptionId);
}dotnet add package Valir.Brokers.Kafkabuilder.Services.AddValirKafka(options =>
{
options.BootstrapServers = "localhost:9092";
options.GroupId = "my-service";
options.EnableAutoCommit = false; // At-least-once delivery
options.AutoOffsetReset = "earliest";
});| Property | Type | Default | Description |
|---|---|---|---|
BootstrapServers |
string |
localhost:9092 |
Kafka broker addresses |
GroupId |
string |
required | Consumer group ID |
EnableAutoCommit |
bool |
false |
Auto-commit offsets |
AutoOffsetReset |
string |
earliest |
Where to start reading |
SecurityProtocol |
string |
Plaintext |
SASL security |
SaslMechanism |
string |
- | SASL mechanism |
SaslUsername |
string |
- | SASL username |
SaslPassword |
string |
- | SASL password |
app.MapPost("/events", async (IEventBroker broker, OrderCreatedEvent evt) =>
{
var envelope = new EventEnvelope(
Id: Guid.NewGuid().ToString("N"),
Topic: "orders.created",
Payload: JsonSerializer.SerializeToUtf8Bytes(evt),
PublishedAt: DateTimeOffset.UtcNow
);
await broker.PublishAsync("orders.created", envelope);
return Results.Accepted();
});dotnet add package Valir.Brokers.RabbitMQbuilder.Services.AddValirRabbitMQ(options =>
{
options.HostName = "localhost";
options.Port = 5672;
options.UserName = "guest";
options.Password = "guest";
options.VirtualHost = "/";
options.ExchangeName = "valir.events";
options.ExchangeType = "topic";
});| Property | Type | Default | Description |
|---|---|---|---|
HostName |
string |
localhost |
RabbitMQ host |
Port |
int |
5672 |
RabbitMQ port |
UserName |
string |
guest |
Username |
Password |
string |
guest |
Password |
VirtualHost |
string |
/ |
Virtual host |
ExchangeName |
string |
valir.events |
Exchange name |
ExchangeType |
string |
topic |
Exchange type |
Durable |
bool |
true |
Durable queues |
PrefetchCount |
ushort |
10 |
Prefetch count |
public class OrderEventHandler : BackgroundService
{
private readonly IEventBroker _broker;
protected override async Task ExecuteAsync(CancellationToken ct)
{
await _broker.SubscribeAsync(
topic: "orders.*",
subscriptionId: "order-handler",
onMessage: async envelope =>
{
var order = JsonSerializer.Deserialize<OrderEvent>(envelope.Payload);
await ProcessOrderEventAsync(order);
},
ct: ct
);
}
}dotnet add package Valir.Brokers.AzureSBbuilder.Services.AddValirAzureServiceBus(options =>
{
options.ConnectionString = "Endpoint=sb://mybus.servicebus.windows.net/;SharedAccessKeyName=...";
options.MaxConcurrentCalls = 10;
options.PrefetchCount = 20;
options.MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(5);
});| Property | Type | Default | Description |
|---|---|---|---|
ConnectionString |
string |
required | Azure SB connection string |
EnableSessions |
bool |
false |
Enable sessions for ordering |
MaxConcurrentCalls |
int |
10 |
Concurrent message handlers |
PrefetchCount |
int |
20 |
Prefetch count |
MaxAutoLockRenewalDuration |
TimeSpan |
5m |
Lock renewal duration |
All brokers implement at-least-once delivery semantics:
- Kafka: Manual offset commit after processing
- RabbitMQ: Manual ACK after processing
- Azure SB: CompleteMessageAsync after processing
Handle duplicates using idempotency keys:
await broker.SubscribeAsync("orders.created", "handler", async envelope =>
{
// Check if already processed
if (await _cache.ExistsAsync($"processed:{envelope.Id}"))
return;
// Process event
await ProcessEventAsync(envelope);
// Mark as processed (with TTL)
await _cache.SetAsync($"processed:{envelope.Id}", "1", TimeSpan.FromHours(24));
});| Feature | Kafka | RabbitMQ | Azure SB |
|---|---|---|---|
| Throughput | Very High | High | High |
| Ordering | Per-partition | Per-queue | With sessions |
| Replay | Yes | No | Yes (topics) |
| Managed | Confluent Cloud | CloudAMQP | Azure |
| Complexity | Higher | Lower | Medium |
| Best For | Event streaming | Task queues | Azure apps |