The MintPlayer.Spark.SubscriptionWorker package provides a framework for building RavenDB subscription workers with built-in retry logic, categorized exception handling, and ASP.NET Core lifecycle management. It runs independently of the core Spark CRUD framework -- any project with a RavenDB IDocumentStore can use it.
```shell
dotnet add package MintPlayer.Spark.SubscriptionWorker
```

If you also use the Spark source generators (for auto-registration), ensure the MintPlayer.Spark.SourceGenerators package is referenced.
A subscription worker continuously listens for document changes in RavenDB via the Data Subscriptions mechanism. When documents match the subscription's RQL query, RavenDB delivers them in batches to the worker for processing.
SparkSubscriptionWorker<T> wraps this into an ASP.NET Core BackgroundService with:
- Automatic subscription creation/update on startup
- A connection loop that reconnects after errors or normal completion
- Categorized exception handling (retryable vs. fatal)
- Per-document retry tracking via `RetryNumerator`
- Lifecycle hooks for startup, shutdown, and batch completion
Extend SparkSubscriptionWorker<T> and implement two abstract methods:
- `ConfigureSubscription()` -- returns the RQL query that filters which documents are delivered
- `ProcessBatchAsync()` -- handles each batch of documents
```csharp
using MintPlayer.Spark.SubscriptionWorker;
using Raven.Client.Documents;
using Raven.Client.Documents.Subscriptions;

public class OrderProcessingWorker : SparkSubscriptionWorker<Order>
{
    private readonly RetryNumerator _retryNumerator = new();

    public OrderProcessingWorker(IDocumentStore store, ILogger<OrderProcessingWorker> logger)
        : base(store, logger) { }

    protected override SubscriptionCreationOptions ConfigureSubscription()
        => new() { Query = "from Orders where Status = 'Pending'" };

    protected override async Task ProcessBatchAsync(
        SubscriptionBatch<Order> batch, CancellationToken cancellationToken)
    {
        using var session = batch.OpenAsyncSession();
        foreach (var item in batch.Items)
        {
            try
            {
                var order = item.Result;
                // Process the order...
                order.Status = "Processed";
                await _retryNumerator.ClearRetryAsync(session, order);
            }
            catch (Exception ex)
            {
                var willRetry = await _retryNumerator.TrackRetryAsync(
                    session, item.Result, ex, Logger);
                if (!willRetry)
                {
                    Logger.LogError(ex, "Permanently failed processing order {Id}", item.Id);
                }
            }
        }
        await session.SaveChangesAsync(cancellationToken);
    }
}
```

By default, the subscription name in RavenDB is derived from the class name by stripping common suffixes:
- `OrderProcessingWorker` becomes `"OrderProcessing"`
- `OrderProcessingSubscriptionWorker` becomes `"OrderProcessing"`
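The naming rule can be sketched as a small helper. This is illustrative only -- `StripWorkerSuffix` is a hypothetical function written for this example, not part of the package's API:

```csharp
using System;

// Hypothetical sketch of the documented naming rule: strip a trailing
// "SubscriptionWorker" or "Worker" suffix from the worker class name.
static string StripWorkerSuffix(string typeName)
{
    // Check the longer suffix first so "...SubscriptionWorker"
    // is not reduced to "...Subscription".
    foreach (var suffix in new[] { "SubscriptionWorker", "Worker" })
    {
        if (typeName.Length > suffix.Length &&
            typeName.EndsWith(suffix, StringComparison.Ordinal))
        {
            return typeName[..^suffix.Length];
        }
    }
    return typeName;
}

Console.WriteLine(StripWorkerSuffix("OrderProcessingWorker"));             // OrderProcessing
Console.WriteLine(StripWorkerSuffix("OrderProcessingSubscriptionWorker")); // OrderProcessing
```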
Override SubscriptionName to set a custom name:
```csharp
protected override string SubscriptionName => "MyCustomSubscription";
```

If your project references MintPlayer.Spark.SourceGenerators, a source generator discovers all SparkSubscriptionWorker<T> subclasses in your project and generates an `AddSparkSubscriptionWorkers()` extension method:
```csharp
// Program.cs
builder.Services.AddSparkSubscriptions();
builder.Services.AddSparkSubscriptionWorkers(); // source-generated
```

The generated code calls `AddSubscriptionWorker<T>()` for each worker class found.
Register workers individually:
```csharp
builder.Services.AddSparkSubscriptions();
builder.Services.AddSubscriptionWorker<OrderProcessingWorker>();
```

Override virtual properties on your worker class to tune behavior:
```csharp
public class OrderProcessingWorker : SparkSubscriptionWorker<Order>
{
    // Target database (default: null = store default)
    protected override string? Database => null;

    // Max documents per batch (default: 256)
    protected override int MaxDocsPerBatch => 100;

    // Whether to reconnect after normal subscription completion (default: true)
    protected override bool KeepRunning => true;

    // Wait time before connection retry (default: 30 seconds)
    protected override TimeSpan RetryDelay => TimeSpan.FromSeconds(30);

    // Max erroneous period before giving up on connection (default: 5 minutes)
    protected override TimeSpan MaxDownTime => TimeSpan.FromMinutes(5);
}
```

`AddSparkSubscriptions()` accepts an optional configuration callback:
```csharp
builder.Services.AddSparkSubscriptions(options =>
{
    options.WaitForNonStaleIndexes = true;                  // default: true
    options.NonStaleIndexTimeout = TimeSpan.FromMinutes(2); // default: 2 minutes
});
```

Each worker follows this lifecycle as a BackgroundService:
- Startup: `EnsureSubscriptionExistsAsync` creates or updates the RavenDB subscription (idempotent -- if it already exists, the query is updated), then the `OnWorkerStartedAsync()` lifecycle hook is called once the subscription is ready.
- Connection loop: opens a subscription worker connection and starts receiving batches.
- Batch processing: calls `ProcessBatchAsync()` for each batch, then `OnBatchCompletedAsync(itemCount)`.
- Error recovery: catches and categorizes exceptions (see table below).
- Shutdown: triggered by `CancellationToken` cancellation (e.g., app shutdown); calls `OnWorkerStoppedAsync()`.
The connection loop classifies exceptions into three categories:
These errors cause the worker to wait and then reconnect:
| Exception | Wait Time | Description |
|---|---|---|
| `SubscriptionInUseException` | `RetryDelay * 2` | Another node holds the subscription |
| `SubscriberErrorException` | `RetryDelay` | Error in the subscriber callback |
| Other unexpected exceptions (when `KeepRunning = true`) | `RetryDelay` | Transient errors |
These errors cause the worker to stop permanently and call OnNonRecoverableErrorAsync():
| Exception | Description |
|---|---|
| `SubscriptionClosedException` | The subscription was deleted or disabled |
| `DatabaseDoesNotExistException` | The target database does not exist |
| `SubscriptionDoesNotExistException` | The subscription was removed |
| `SubscriptionInvalidStateException` | The subscription is in an invalid state |
| `AuthorizationException` | Authentication/authorization failure |
| Other unexpected exceptions (when `KeepRunning = false`) | Any error when auto-reconnect is disabled |
An `OperationCanceledException` raised when the `CancellationToken` is cancelled triggers a graceful shutdown.
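The three categories above amount to a single decision table. The sketch below is illustrative, not the package's code: `Classify` and its string results are hypothetical, and it keys on exception type names so it runs without a RavenDB client reference, whereas the real connection loop catches the typed exceptions:

```csharp
using System;

// Illustrative decision table mirroring the categories described above.
// The type names match the RavenDB client exceptions listed in the tables.
static string Classify(string exceptionType, bool keepRunning) => exceptionType switch
{
    "SubscriptionInUseException"        => "retry (wait RetryDelay * 2)",
    "SubscriberErrorException"          => "retry (wait RetryDelay)",
    "SubscriptionClosedException"       => "fatal",
    "DatabaseDoesNotExistException"     => "fatal",
    "SubscriptionDoesNotExistException" => "fatal",
    "SubscriptionInvalidStateException" => "fatal",
    "AuthorizationException"            => "fatal",
    "OperationCanceledException"        => "graceful shutdown",
    // Anything else depends on the KeepRunning setting.
    _ => keepRunning ? "retry (wait RetryDelay)" : "fatal",
};

Console.WriteLine(Classify("SubscriptionInUseException", keepRunning: true));
Console.WriteLine(Classify("IOException", keepRunning: false));
```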
Override these virtual methods to react to worker events:
```csharp
// Called after startup, before the first batch
protected override Task OnWorkerStartedAsync() => Task.CompletedTask;

// Called when the worker stops (graceful or error)
protected override Task OnWorkerStoppedAsync() => Task.CompletedTask;

// Called after each batch is successfully processed
protected override Task OnBatchCompletedAsync(int itemCount) => Task.CompletedTask;

// Called when a non-recoverable error occurs (before stopping)
protected override Task OnNonRecoverableErrorAsync(Exception exception) => Task.CompletedTask;
```

`RetryNumerator` tracks failed processing attempts for individual documents using RavenDB counters and the `@refresh` metadata mechanism.
- When `TrackRetryAsync()` is called for a failed document, it increments a RavenDB counter on the document.
- It sets the `@refresh` metadata to a future timestamp, which causes RavenDB to redeliver the document to the subscription at that time.
- If the maximum number of attempts is exhausted, the counter is cleared and the document is "parked" for a longer delay (default: 1 day).
```csharp
var retryNumerator = new RetryNumerator
{
    MaxAttempts = 5,                       // default: 5
    BaseDelay = TimeSpan.FromSeconds(30),  // default: 30s
    CounterName = "SparkRetryAttempts",    // default
    ExhaustedDelay = TimeSpan.FromDays(1), // default: 1 day
};
```

`RetryNumerator` uses linear incremental backoff (`BaseDelay * attempt`):
| Attempt | Delay |
|---|---|
| 1 | 30 seconds |
| 2 | 60 seconds |
| 3 | 90 seconds |
| 4 | 120 seconds |
| 5 | 150 seconds |
| Exhausted | 1 day (parked) |
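The schedule follows directly from the `BaseDelay * attempt` rule. A minimal sketch with the default settings -- the `NextDelay` helper is hypothetical, shown only to make the arithmetic concrete:

```csharp
using System;

// Hypothetical helper mirroring the documented backoff rule:
// a linear delay (BaseDelay * attempt) while attempts remain,
// then the long "parked" ExhaustedDelay.
static TimeSpan NextDelay(int attempt, int maxAttempts, TimeSpan baseDelay, TimeSpan exhaustedDelay)
    => attempt > maxAttempts ? exhaustedDelay : baseDelay * attempt;

var baseDelay = TimeSpan.FromSeconds(30); // RetryNumerator default
var exhausted = TimeSpan.FromDays(1);     // RetryNumerator default

for (var attempt = 1; attempt <= 6; attempt++)
    Console.WriteLine($"attempt {attempt}: {NextDelay(attempt, 5, baseDelay, exhausted)}");
```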
```csharp
protected override async Task ProcessBatchAsync(
    SubscriptionBatch<Order> batch, CancellationToken cancellationToken)
{
    using var session = batch.OpenAsyncSession();
    foreach (var item in batch.Items)
    {
        try
        {
            // Process the document...
            await _retryNumerator.ClearRetryAsync(session, item.Result);
        }
        catch (Exception ex)
        {
            var willRetry = await _retryNumerator.TrackRetryAsync(
                session, item.Result, ex, Logger);
            // willRetry = false when max attempts are exhausted
        }
    }
    await session.SaveChangesAsync(cancellationToken);
}
```

Call `ClearRetryAsync()` after successful processing to remove any leftover retry counters from previous failures.
For change detection (comparing previous vs. current document state), subscribe to Revision<T>:
```csharp
public class CompanyChangeWorker : SparkSubscriptionWorker<Revision<Company>>
{
    public CompanyChangeWorker(IDocumentStore store, ILogger<CompanyChangeWorker> logger)
        : base(store, logger) { }

    protected override SubscriptionCreationOptions ConfigureSubscription()
        => new() { Query = "from Companies (Revisions = true)" };

    protected override async Task ProcessBatchAsync(
        SubscriptionBatch<Revision<Company>> batch, CancellationToken cancellationToken)
    {
        using var session = batch.OpenAsyncSession();
        foreach (var item in batch.Items)
        {
            var previous = item.Result.Previous;
            var current = item.Result.Current;
            // React to changes between previous and current...
        }
        await session.SaveChangesAsync(cancellationToken);
    }
}
```

This requires RavenDB document revisions to be enabled on the collection.
The MintPlayer.Spark.Messaging package uses SparkSubscriptionWorker<T> internally for its message processing pipeline. MessageSubscriptionWorker subscribes to SparkMessage documents filtered by queue name and status:
```csharp
internal sealed class MessageSubscriptionWorker : SparkSubscriptionWorker<SparkMessage>
{
    protected override string SubscriptionName => $"SparkMessaging-{_queueName}";
    protected override int MaxDocsPerBatch => 1;

    protected override SubscriptionCreationOptions ConfigureSubscription()
    {
        return new SubscriptionCreationOptions
        {
            Query = $@"from SparkMessages
                       where QueueName = '{_queueName}'
                       and Status = 'Pending'
                       and (NextAttemptAtUtc = null or NextAttemptAtUtc <= now())"
        };
    }

    protected override async Task ProcessBatchAsync(
        SubscriptionBatch<SparkMessage> batch, CancellationToken cancellationToken)
    {
        foreach (var item in batch.Items)
        {
            var message = item.Result;
            using var session = batch.OpenAsyncSession();
            // Mark as Processing, deserialize payload, resolve handlers,
            // handle retries, dead-lettering, and expiration...
        }
    }
}
```

This demonstrates a pattern where the subscription query does server-side filtering (only pending messages past their retry delay), and the worker handles retries, dead-lettering, and state transitions within `ProcessBatchAsync`.
The SubscriptionWorkerRegistrationGenerator source generator scans your project for all non-abstract classes that inherit from SparkSubscriptionWorker<T> (at any depth in the inheritance chain). It generates a static extension method:
```csharp
// Auto-generated: SparkSubscriptionWorkerRegistrations.g.cs
namespace YourProject
{
    internal static class SparkSubscriptionWorkersExtensions
    {
        internal static IServiceCollection AddSparkSubscriptionWorkers(
            this IServiceCollection services)
        {
            SparkSubscriptionExtensions.AddSubscriptionWorker<OrderProcessingWorker>(services);
            SparkSubscriptionExtensions.AddSubscriptionWorker<CompanyChangeWorker>(services);
            return services;
        }
    }
}
```

This eliminates the need to manually register each worker in Program.cs.
- .NET 10.0+
- RavenDB 6.2+
- An `IDocumentStore` registered in the DI container (provided by `AddSpark()` or registered manually)
See the following files for working implementations:
- `MintPlayer.Spark.SubscriptionWorker/SparkSubscriptionWorker.cs` -- abstract base class with connection loop and error handling
- `MintPlayer.Spark.SubscriptionWorker/RetryNumerator.cs` -- per-document retry tracking
- `MintPlayer.Spark.SubscriptionWorker/SparkSubscriptionExtensions.cs` -- DI registration helpers
- `MintPlayer.Spark.Messaging/Services/MessageSubscriptionWorker.cs` -- real-world usage in the messaging package
- `MintPlayer.Spark.SourceGenerators/Generators/SubscriptionWorkerRegistrationGenerator.cs` -- source generator for auto-registration