This project demonstrates how a service can provide both aggregates and their corresponding change events in a consistent way.
An aggregate is an entity that is stored and retrieved as a whole. The project showcases two example aggregates:
- A "person" aggregate provides information about a person.
- A "location" aggregate provides statistical data about all persons in a city.
Aggregates can be built from any source. In this project, they are created via REST requests, as shown in the table below.
Aggregates are delivered to consumers as JSON objects via HTTP GET requests.

| # | Input operation | Resulting person aggregates | Resulting location aggregates |
|---|---|---|---|
| 1 | POST /persons {"name":"Ann","city":"Rome"} | {"1":{"name":"Ann","city":"Rome"}} | {"Rome":{"total":1}} |
| 2 | POST /persons {"name":"Bob"} | {"1":{"name":"Ann","city":"Rome"}}<br>{"2":{"name":"Bob"}} | {"Rome":{"total":1}} |
| 3 | PATCH /persons/2 {"city":"Rome"} | {"1":{"name":"Ann","city":"Rome"}}<br>{"2":{"name":"Bob","city":"Rome"}} | {"Rome":{"total":2}} |
| 4 | PATCH /persons/1 {"city":null} | {"1":{"name":"Ann"}}<br>{"2":{"name":"Bob","city":"Rome"}} | {"Rome":{"total":1}} |
| 5 | DELETE /persons/2 | {"1":{"name":"Ann"}} | (none) |

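
The relation between the two aggregate columns can be sketched in a few lines of JavaScript. This is only an illustration under assumptions: `deriveLocations` is a hypothetical helper, not a function of the project, and the server may well maintain the totals incrementally rather than recomputing them.

```javascript
// Hypothetical helper (not taken from the project): derive the location
// aggregates from the person aggregates by counting persons per city.
function deriveLocations(persons) {
  const locations = {};
  for (const person of Object.values(persons)) {
    if (person.city == null) continue; // persons without a city are not counted
    locations[person.city] ??= { total: 0 };
    locations[person.city].total += 1;
  }
  return locations;
}

// Person aggregates after operation 3 in the table above
const personsAfterStep3 = {
  "1": { name: "Ann", city: "Rome" },
  "2": { name: "Bob", city: "Rome" },
};
console.log(JSON.stringify(deriveLocations(personsAfterStep3)));
// prints {"Rome":{"total":2}}
```

A city without any persons does not appear in the result at all, which corresponds to the "(none)" entry after operation 5.
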
Every change event encodes the difference between two states of an aggregate. A consumer can rebuild the aggregate by listening to the stream of change events. The events are encoded as JSON Merge Patch documents (RFC 7396), not to be confused with JSON Patch (RFC 6902).

| # | Input operation | Resulting person event | Resulting location event |
|---|---|---|---|
| 1 | POST /persons {"name":"Ann","city":"Rome"} | {"1":{"name":"Ann","city":"Rome"}} | {"Rome":{"total":1}} |
| 2 | POST /persons {"name":"Bob"} | {"2":{"name":"Bob"}} | (none) |
| 3 | PATCH /persons/2 {"city":"Rome"} | {"2":{"city":"Rome"}} | {"Rome":{"total":2}} |
| 4 | PATCH /persons/1 {"city":null} | {"1":{"city":null}} | {"Rome":{"total":1}} |
| 5 | DELETE /persons/2 | {"2":null} | {"Rome":null} |

In contrast to Event Sourcing, the consumer does not need to read the entire event stream. It can bootstrap from the aggregates and keep itself up to date by applying subsequent changes. Older events can be deleted, avoiding storage bottlenecks and potential GDPR violations.
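
To make the difference between a full replay and bootstrapping concrete, here is a minimal sketch of how a consumer could apply JSON Merge Patch events. The `mergePatch` function follows the algorithm of RFC 7396 but is written for this illustration; it is not the code of the project's example consumer. The events are the person events 1 to 4 from the table above.

```javascript
// JSON Merge Patch (RFC 7396): returns a patched copy of `target`.
function mergePatch(target, patch) {
  const isObject = (v) => v !== null && typeof v === "object" && !Array.isArray(v);
  if (!isObject(patch)) return patch; // scalars and arrays replace the target
  const result = isObject(target) ? { ...target } : {};
  for (const [key, value] of Object.entries(patch)) {
    if (value === null) delete result[key]; // null removes the member
    else result[key] = mergePatch(result[key], value);
  }
  return result;
}

// Person events 1 to 4 from the event table
const events = [
  { "1": { name: "Ann", city: "Rome" } },
  { "2": { name: "Bob" } },
  { "2": { city: "Rome" } },
  { "1": { city: null } },
];

// Full replay: start from an empty state and apply every event.
const replayed = events.reduce((state, event) => mergePatch(state, event), {});

// Bootstrap: load the person aggregates as they were after operation 2,
// then apply only the later events 3 and 4.
const snapshot = { "1": { name: "Ann", city: "Rome" }, "2": { name: "Bob" } };
const bootstrapped = events.slice(2).reduce((state, event) => mergePatch(state, event), snapshot);

// Starting one event too early (event 2 is already part of the snapshot) is harmless.
const overlapping = events.slice(1).reduce((state, event) => mergePatch(state, event), snapshot);

console.log(JSON.stringify(replayed));
console.log(JSON.stringify(bootstrapped));
console.log(JSON.stringify(overlapping));
// all three print {"1":{"name":"Ann"},"2":{"name":"Bob","city":"Rome"}}
```

All three variants end in the same state, so the consumer only needs the events that are newer than the aggregates it loaded.
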
The event stream can be transported by any messaging system, such as ActiveMQ. This project uses Server-Sent Events as an even simpler solution. No shared infrastructure is required; both aggregates and events are delivered over HTTP, although through different server endpoints.
The separation into two steps, bootstrapping and event consumption, requires synchronisation: the consumer must receive change events in order, and it must not miss change events for the previously loaded aggregates. The challenge lies on the server side: the server must store the aggregate and publish the corresponding change event in one atomic operation.
Atomicity can be guaranteed with the Transactional Outbox pattern, which persists the aggregate and the corresponding event in one transaction. A message relay periodically publishes new events and drops older ones. On failure, events may be published repeatedly. Therefore, events must be idempotent. This holds for the JSON Merge Patch protocol.
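
The following sketch illustrates the idea of the Transactional Outbox pattern with an in-memory stand-in for the database. Everything in it is an assumption made for illustration: the names `transaction`, `insertPerson`, and `relay`, the `published` flag, and the shape of the outbox entries are hypothetical, and the project itself uses SQLite tables and transactions instead.

```javascript
// In-memory stand-in for a database with an aggregate table and an outbox table.
let db = { persons: {}, outbox: [] };

// Run `work` on a deep copy of the state and swap the copy in only if `work`
// succeeds, so aggregate and event are committed together or not at all.
function transaction(work) {
  const draft = JSON.parse(JSON.stringify(db));
  work(draft); // if this throws, `db` stays untouched
  db = draft;
}

function insertPerson(id, person) {
  transaction((draft) => {
    draft.persons[id] = person;
    draft.outbox.push({ createdAt: Date.now(), published: false, patch: { [id]: person } });
  });
}

// Message relay: publish events that are not yet marked as published,
// then drop events that are older than the retention time.
function relay(publish, retentionMs, now = Date.now()) {
  for (const event of db.outbox) {
    if (!event.published) {
      publish(event.patch); // a crash right after this call leads to a repeated publication
      event.published = true;
    }
  }
  db.outbox = db.outbox.filter((event) => now - event.createdAt < retentionMs);
}

insertPerson("1", { name: "Ann", city: "Rome" });
try {
  transaction((draft) => {
    draft.persons["2"] = { name: "Bob" };
    throw new Error("simulated failure before the event was written");
  });
} catch (err) {
  // rolled back: neither the person nor an event was stored
}

const published = [];
relay((patch) => published.push(patch), 60_000);
console.log(JSON.stringify(db.persons)); // only person 1 exists
console.log(JSON.stringify(published));  // exactly one event was published
```

Because the failed transaction leaves no trace, the outbox never contains an event without its aggregate change, and vice versa.
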
To match aggregate state and event, another table stores the latest revision of the aggregates.
Every event in the outbox event table is annotated with the aggregate revision.
The aggregate endpoint delivers the revision in the X-Revision header. The request

curl -D - http://localhost:3000/persons

produces

HTTP/1.1 200 OK
content-type: application/json
x-revision: 7

After reading the aggregates, the consumer can use the revision value to subscribe to all subsequent change events:

curl -N -H "X-Revision: 8" http://localhost:3000/person-events

Note that the consumer may also use 7 or any smaller value instead, because the events are idempotent. The only limitation is the event retention time on the server, which periodically deletes older events.
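
The two steps can also be sketched in JavaScript. This is not the project's `consumer.js` but a minimal illustration under several assumptions: Node.js 18 or later (global `fetch`), an aggregate endpoint that returns one JSON object keyed by person id, an event stream that uses LF line endings and carries one JSON Merge Patch document per `data:` field, and a caller-supplied `applyPatch(state, patch)` function that implements JSON Merge Patch. The names `parseSseEvents` and `consume` are hypothetical.

```javascript
// Extract the JSON payloads from a piece of a Server-Sent Events stream.
// Lines other than "data:" (comments, "id:", "event:") are ignored.
function parseSseEvents(text) {
  return text
    .split("\n\n") // events are separated by blank lines
    .map((block) =>
      block
        .split("\n")
        .filter((line) => line.startsWith("data:"))
        .map((line) => line.slice(5).replace(/^ /, ""))
        .join("\n"))
    .filter((data) => data.length > 0)
    .map((data) => JSON.parse(data));
}

async function consume(baseUrl, applyPatch) {
  // Step 1: bootstrap from the aggregates and remember their revision.
  const response = await fetch(`${baseUrl}/persons`);
  let persons = await response.json();
  const revision = Number(response.headers.get("x-revision"));

  // Step 2: subscribe to all change events after that revision.
  const stream = await fetch(`${baseUrl}/person-events`, {
    headers: { "X-Revision": String(revision + 1) },
  });
  const decoder = new TextDecoder();
  let buffer = "";
  for await (const chunk of stream.body) {
    buffer += decoder.decode(chunk, { stream: true });
    const end = buffer.lastIndexOf("\n\n");
    if (end < 0) continue; // no complete event received yet
    for (const patch of parseSseEvents(buffer.slice(0, end))) {
      persons = applyPatch(persons, patch);
      console.log(JSON.stringify(persons));
    }
    buffer = buffer.slice(end + 2);
  }
}
```

With the server running, a call such as `consume("http://localhost:3000", applyPatch)` would print the person aggregates after every received event.
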
You need Rust for the server.

git clone https://github.com/mouton0815/aggregate-event-duality.git
cd aggregate-event-duality
cargo build

To use the example consumer, you also need Node.js:

cd node
npm install

Start the server with

RUST_LOG=info cargo run

Note that the server uses an in-memory SQLite database, so the aggregates are lost when the server restarts. This can be changed by replacing the ":memory:" argument of the Aggregator constructor in server.rs with a path to a database file, for example "database.db".

When the server is running, you can start the example consumer in another shell:

node node/consumer.js

To build aggregates and produce the corresponding change events, you need to create, update, or delete persons via the REST endpoint of the server. Example requests:

curl -X POST -H 'Content-Type: application/json' -d '{"name":"Ann","city":"Rome"}' http://localhost:3000/persons
curl -X POST -H 'Content-Type: application/json' -d '{"name":"Bob"}' http://localhost:3000/persons
curl -X PATCH -H 'Content-Type: application/json' -d '{"city":null}' http://localhost:3000/persons/1
curl -X DELETE http://localhost:3000/persons/1

The aggregates are available at the following endpoints:

curl http://localhost:3000/persons
curl http://localhost:3000/locations

The corresponding change streams can be accessed via

curl -N -H "X-Revision: 1" http://localhost:3000/person-events
curl -N -H "X-Revision: 1" http://localhost:3000/location-events

For more examples, see curl-examples.sh.

