Skip to content

Commit 6c5b7e9

Browse files
rafalmaciagclaude
andcommitted
Add MetadataFactory to centralize Metadata construction
Introduces MetadataFactory with 4 Create overloads: - Full (EventStore read path with all fields) - From event object (conventions compute streamId + enrichers) - Explicit fields (tests with known sourceStreamId) - Generic <TEvent, TId> (conventions + duck-typed Id from event) PlumberEngine.ReadEventData and EventAggregatorEventHandlerStarter now use MetadataFactory instead of direct Metadata construction. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent e311eca commit 6c5b7e9

6 files changed

Lines changed: 141 additions & 30 deletions

File tree

CLAUDE.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ MicroPlumberd is a CQRS/Event Sourcing framework built on EventStore that emphas
4444
#### 1. Event Sourcing Layer
4545
- **PlumberEngine** serves as the core abstraction over EventStore operations
4646
- **IPlumber/IPlumberInstance** interfaces provide the main API surface
47+
- **MetadataFactory** centralizes `Metadata` construction with proper JSON schema (`plumber.MetadataFactory` or standalone `new MetadataFactory()`)
4748
- Events are stored in streams following naming conventions (e.g., `agg-{type}-{id}` for aggregates)
4849
- Supports snapshots for aggregate performance optimization
4950

README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,24 @@ services.AddPlumberd(configure: c => {
315315
.AddCommandHandler<FooCommandHandler>()
316316
```
317317

318+
### MetadataFactory
319+
320+
`MetadataFactory` centralizes `Metadata` construction with proper JSON schema. Available via `plumber.MetadataFactory` or standalone with `new MetadataFactory()` (uses default conventions).
321+
322+
```csharp
323+
// From an event object — conventions compute sourceStreamId and enrich metadata
324+
var metadata = plumber.MetadataFactory.Create(context, evt, recipientId);
325+
326+
// With explicit sourceStreamId — for tests and simple scenarios
327+
var factory = new MetadataFactory();
328+
var metadata = factory.Create($"Category-{id}", created: DateTimeOffset.Now, userId: "admin");
329+
330+
// Typed — conventions compute sourceStreamId from event type
331+
var metadata = factory.Create<OrderCreated, Guid>(orderId, evt, created: DateTimeOffset.Now);
332+
```
333+
334+
The factory ensures `Metadata.StreamId<T>()`, `Metadata.Created()`, `Metadata.UserId()` and other accessors work correctly by building the proper JSON `Data` schema.
335+
318336
### Conventions
319337
- SteamNameConvention - from aggregate type, and aggregate id
320338
- EventNameConvention - from aggregate? instance and event instance

src/MicroPlumberd.Services.BatchOperations.Tests/BatchOperationModelTests.cs

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
using System.Text.Json;
21
using FluentAssertions;
32
using MicroPlumberd;
43
using Xunit;
@@ -53,28 +52,13 @@ namespace MicroPlumberd.Services.BatchOperations.Tests;
5352
/// </summary>
5453
public class BatchOperationModelTests
5554
{
55+
private static readonly MetadataFactory _metadataFactory = new();
56+
5657
/// <summary>
5758
/// Creates test Metadata with minimal required fields.
5859
/// </summary>
5960
private static Metadata CreateMetadata(Guid id, DateTimeOffset? created = null)
60-
{
61-
var eventId = Guid.NewGuid();
62-
var streamId = $"BatchOperations-{id}";
63-
64-
// Build metadata JSON with Created timestamp if provided
65-
var metadataJson = created.HasValue
66-
? JsonDocument.Parse($"{{\"Created\":\"{created.Value:O}\"}}")
67-
: JsonDocument.Parse("{}");
68-
69-
return new Metadata(
70-
id: id,
71-
eventId: eventId,
72-
sourceStreamPosition: 0,
73-
linkStreamPosition: null,
74-
sourceStreamId: streamId,
75-
data: metadataJson.RootElement
76-
);
77-
}
61+
=> _metadataFactory.Create($"BatchOperations-{id}", created: created);
7862

7963
[Fact]
8064
public async Task Given_BatchOperationStarted_CreatesOperation()

src/MicroPlumberd.Services.EventAggregator/EventAggregatorEventHandlerStarter.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
using System.Text.Json;
21
using MicroPlumberd;
32
using MicroPlumberd.Services;
43
using ModelingEvolution.EventAggregator;
@@ -62,11 +61,7 @@ private void SubscribeToEnvelope<TEvent>(IEventAggregator eventAggregator, THand
6261
{
6362
try
6463
{
65-
var sourceStreamId = _plumber.Conventions.StreamNameFromEventConvention(
66-
context, typeof(TEvent), envelope.RecipientId);
67-
var metadataObj = _plumber.Conventions.GetMetadata(context, null, envelope.Event!, null);
68-
var data = JsonSerializer.SerializeToElement(metadataObj);
69-
var metadata = new Metadata(Guid.Empty, Guid.NewGuid(), 0, null, sourceStreamId, data);
64+
var metadata = _plumber.MetadataFactory.Create(context, envelope.Event!, envelope.RecipientId);
7065
await handler.Handle(metadata, envelope.Event!);
7166
}
7267
catch (Exception ex)

src/MicroPlumberd/PlumberEngine.cs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ internal PlumberEngine(EventStoreClientSettings settings, PlumberConfig? config
2828
PersistentSubscriptionClient = new EventStorePersistentSubscriptionsClient(settings);
2929
ProjectionManagementClient = new EventStoreProjectionManagementClient(settings);
3030
Conventions = config.Conventions;
31+
MetadataFactory = new MetadataFactory(Conventions);
3132
SerializerFactory = config.SerializerFactory;
3233
ServiceProvider = config.ServiceProvider;
3334
_extension = config.Extension; // Shouldn't we make a copy?
@@ -80,6 +81,11 @@ internal PlumberEngine(EventStoreClientSettings settings, PlumberConfig? config
8081
/// <inheritdoc/>
8182
public IReadOnlyConventions Conventions { get; }
8283

84+
/// <summary>
85+
/// Gets the metadata factory for creating <see cref="Metadata"/> instances with proper JSON schema.
86+
/// </summary>
87+
public MetadataFactory MetadataFactory { get; }
88+
8389
/// <summary>
8490
/// Creates a subscription to a stream starting from the specified position.
8591
/// </summary>
@@ -1175,17 +1181,15 @@ public static PlumberEngine Create(EventStoreClientSettings? settings = null, Ac
11751181
internal (object, Metadata) ReadEventData(OperationContext context, EventRecord er, EventRecord? eLink, Type t)
11761182
{
11771183
var streamIdSuffix = er.EventStreamId.Substring(er.EventStreamId.IndexOf('-') + 1);
1178-
if (!Guid.TryParse(streamIdSuffix, out var aggregateId))
1184+
if (!Guid.TryParse(streamIdSuffix, out var aggregateId))
11791185
aggregateId = streamIdSuffix.ToGuid();
11801186

11811187
var s = Serializer(t);
11821188
var ev = s.Deserialize(context,er.Data.Span, t)!;
11831189
var m = s.ParseMetadata(context, er.Metadata.Span);
11841190

1185-
long? linkStreamPosition = eLink?.EventNumber.ToInt64();
1186-
long sourceStreamPosition = er.EventNumber.ToInt64();
1187-
1188-
var metadata = new Metadata(aggregateId, er.EventId.ToGuid(), sourceStreamPosition, linkStreamPosition, er.EventStreamId, m);
1191+
var metadata = MetadataFactory.Create(aggregateId, er.EventStreamId, er.EventId.ToGuid(),
1192+
er.EventNumber.ToInt64(), eLink?.EventNumber.ToInt64(), m);
11891193
return (ev, metadata);
11901194
}
11911195

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
using System.Dynamic;
2+
using System.Text.Json;
3+
using MicroPlumberd.Utils;
4+
5+
namespace MicroPlumberd;
6+
7+
/// <summary>
8+
/// Factory for creating <see cref="Metadata"/> instances with proper JSON schema.
9+
/// Centralizes metadata construction that was previously scattered across PlumberEngine, EventAggregator, and tests.
10+
/// </summary>
11+
public class MetadataFactory
12+
{
13+
private readonly IReadOnlyConventions _conventions;
14+
15+
/// <summary>
16+
/// Creates a factory with default conventions (Created timestamp, InvocationContext enrichers).
17+
/// </summary>
18+
public MetadataFactory() : this(new Conventions()) { }
19+
20+
/// <summary>
21+
/// Creates a factory with custom conventions.
22+
/// </summary>
23+
/// <param name="conventions">The conventions to use for metadata enrichment and stream naming.</param>
24+
public MetadataFactory(IReadOnlyConventions conventions) => _conventions = conventions;
25+
26+
/// <summary>
27+
/// Creates metadata from raw EventStore fields.
28+
/// Used by PlumberEngine when reading events from EventStore.
29+
/// </summary>
30+
public Metadata Create(
31+
Guid id,
32+
string sourceStreamId,
33+
Guid eventId,
34+
long sourceStreamPosition,
35+
long? linkStreamPosition,
36+
JsonElement data)
37+
{
38+
return new Metadata(id, eventId, sourceStreamPosition, linkStreamPosition, sourceStreamId, data);
39+
}
40+
41+
/// <summary>
42+
/// Creates metadata from an event object. Conventions compute the sourceStreamId and enrich metadata.
43+
/// The <see cref="Metadata.Id"/> is extracted from the event's Id property via duck typing if present.
44+
/// Used by EventAggregator and other in-process event sources.
45+
/// </summary>
46+
public Metadata Create(
47+
OperationContext context,
48+
object evt,
49+
object? id = null,
50+
object? customMetadata = null,
51+
IAggregate? aggregate = null)
52+
{
53+
var sourceStreamId = _conventions.StreamNameFromEventConvention(context, evt.GetType(), id);
54+
var enriched = _conventions.GetMetadata(context, aggregate, evt, customMetadata);
55+
var data = JsonSerializer.SerializeToElement(enriched);
56+
var eventId = _conventions.GetEventIdConvention(context, aggregate, evt).ToGuid();
57+
IdDuckTyping.Instance.TryGetGuidId(evt, out var metadataId);
58+
return new Metadata(metadataId, eventId, 0, null, sourceStreamId, data);
59+
}
60+
61+
/// <summary>
62+
/// Creates metadata with explicit field values.
63+
/// Useful for tests and simple scenarios where you know the sourceStreamId.
64+
/// </summary>
65+
public Metadata Create(
66+
string sourceStreamId,
67+
DateTimeOffset? created = null,
68+
Guid? correlationId = null,
69+
Guid? causationId = null,
70+
string? userId = null)
71+
{
72+
var data = BuildData(created, correlationId, causationId, userId);
73+
return new Metadata(Guid.Empty, Guid.NewGuid(), 0, null, sourceStreamId, data);
74+
}
75+
76+
/// <summary>
77+
/// Creates metadata for a specific event type and id. Conventions compute the sourceStreamId.
78+
/// The <see cref="Metadata.Id"/> and <see cref="Metadata.EventId"/> are extracted from the event's Id property via duck typing if present.
79+
/// </summary>
80+
public Metadata Create<TEvent, TId>(
81+
TId id,
82+
TEvent evt,
83+
DateTimeOffset? created = null,
84+
Guid? correlationId = null,
85+
Guid? causationId = null,
86+
string? userId = null)
87+
{
88+
var context = OperationContext.Create(Flow.Component);
89+
var sourceStreamId = _conventions.StreamNameFromEventConvention(context, typeof(TEvent), id);
90+
var data = BuildData(created, correlationId, causationId, userId);
91+
var eventId = _conventions.GetEventIdConvention(context, null, evt!).ToGuid();
92+
IdDuckTyping.Instance.TryGetGuidId(evt!, out var metadataId);
93+
return new Metadata(metadataId, eventId, 0, null, sourceStreamId, data);
94+
}
95+
96+
private static JsonElement BuildData(
97+
DateTimeOffset? created,
98+
Guid? correlationId,
99+
Guid? causationId,
100+
string? userId)
101+
{
102+
IDictionary<string, object> obj = new ExpandoObject();
103+
if (created.HasValue) obj["Created"] = created.Value;
104+
if (correlationId.HasValue) obj["$correlationId"] = correlationId.Value;
105+
if (causationId.HasValue) obj["$causationId"] = causationId.Value;
106+
if (userId != null) obj["UserId"] = userId;
107+
return JsonSerializer.SerializeToElement(obj);
108+
}
109+
}

0 commit comments

Comments
 (0)