-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathAggregateCommandHandler.ts
More file actions
257 lines (214 loc) · 7.91 KB
/
AggregateCommandHandler.ts
File metadata and controls
257 lines (214 loc) · 7.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
import {
assertBoolean, assertDefined, assertMessage, assertNonNegativeInteger, assertObservable, assertStringArray,
Lock, MapAssertable
} from './utils/index.ts';
import { ConcurrencyError } from './errors/index.ts';
import type {
AggregateEventsQueryParams,
IAggregate,
IAggregateConstructor,
IAggregateFactory,
ICommand,
ICommandHandler,
IContainer,
Identifier,
IEventSet,
IEventStore,
ILocker,
ILogger,
IObservable,
RetryOnConcurrencyErrorDecision,
RetryOnConcurrencyErrorOptions,
RetryOnConcurrencyErrorResolver
} from './interfaces/index.ts';
import { isObject } from './interfaces/isObject.ts';
/** Retry limit applied when the caller does not configure one explicitly */
const DEFAULT_MAX_RETRY_ATTEMPTS = 5;

/**
 * Turn any supported `retryOnConcurrencyError` option form into a resolver function.
 *
 * Supported forms:
 * - function — used as the resolver unchanged;
 * - number — retry a ConcurrencyError while the attempt index is below that number;
 * - `false` — never retry;
 * - `'ignore'` — answer `'ignore'` for a ConcurrencyError, `false` for any other error;
 * - object — `maxRetries` (defaults to DEFAULT_MAX_RETRY_ATTEMPTS) and `ignoreAfterMaxRetries`
 *   (defaults to `false`), both validated before the resolver is built;
 * - anything else (`undefined`, `true`) — retry a ConcurrencyError up to DEFAULT_MAX_RETRY_ATTEMPTS.
 */
function normalizeRetryResolver(value?: RetryOnConcurrencyErrorOptions): RetryOnConcurrencyErrorResolver {
	// Shared by the numeric form and the default form: both differ only in the limit
	const retryWhileBelow = (limit: number): RetryOnConcurrencyErrorResolver =>
		(err, events, attempt) => err instanceof ConcurrencyError && attempt < limit;

	if (typeof value === 'function')
		return value;

	if (typeof value === 'number')
		return retryWhileBelow(value);

	if (value === false)
		return () => false;

	if (value === 'ignore') {
		return err => {
			if (err instanceof ConcurrencyError)
				return 'ignore';

			return false;
		};
	}

	if (isObject(value)) {
		// Destructuring defaults apply to `undefined` only, so `null` still reaches the assertions
		const { maxRetries = DEFAULT_MAX_RETRY_ATTEMPTS, ignoreAfterMaxRetries = false } = value;

		assertNonNegativeInteger(maxRetries, 'retryOnConcurrencyError.maxRetries');
		assertBoolean(ignoreAfterMaxRetries, 'retryOnConcurrencyError.ignoreAfterMaxRetries');

		return (err, events, attempt): RetryOnConcurrencyErrorDecision => {
			if (err instanceof ConcurrencyError) {
				if (attempt < maxRetries)
					return true;

				if (ignoreAfterMaxRetries)
					return 'ignore';
			}

			return false;
		};
	}

	// undefined or true — default behavior
	return retryWhileBelow(DEFAULT_MAX_RETRY_ATTEMPTS);
}
/**
 * Aggregate command handler.
 *
 * Subscribes to a command bus and awaits aggregate commands.
 * Upon receiving a command it obtains an aggregate instance, restores its state from the event store,
 * passes the command to the aggregate and commits the emitted events to the event store.
 */
export class AggregateCommandHandler<TAggregate extends IAggregate> implements ICommandHandler {

	/** Source of aggregate events and new ids; destination of the events produced by commands */
	readonly #eventStore: IEventStore;

	/** Optional logger; a child logger is used when the provided logger exposes `child` */
	readonly #logger?: ILogger;

	/** Creates an aggregate instance for a given id */
	readonly #aggregateFactory: IAggregateFactory<TAggregate, any>;

	/** Command types this handler subscribes to */
	readonly #handles: Readonly<string[]>;

	/** Optional event types used to narrow the event-store query when restoring an aggregate */
	readonly #restoresFrom?: Readonly<string[]>;

	/** Decides whether a failed dispatch is retried, ignored (force-dispatched) or rethrown */
	readonly #shouldRetry: RetryOnConcurrencyErrorResolver;

	/** Aggregate instances cache for concurrent command handling */
	#aggregatesCache: MapAssertable<Identifier, Promise<TAggregate>> = new MapAssertable();

	/** Lock for sequential aggregate command execution */
	#executionLock: ILocker;

	/**
	 * @param options.eventStore required event store
	 * @param options.aggregateType aggregate class; when given, `handles`, `restoresFrom` and the default
	 *   `retryOnConcurrencyError` are read from its static members
	 * @param options.aggregateFactory alternative to `aggregateType`; requires `handles` to be a string array
	 * @param options.handles command types to handle (used with `aggregateFactory` only)
	 * @param options.restoresFrom event types to restore from (used with `aggregateFactory` only)
	 * @param options.retryOnConcurrencyError retry policy; takes precedence over the aggregate type's own setting
	 * @param options.executionLocker lock used to serialize commands per aggregate; defaults to a new `Lock`
	 * @param options.logger optional logger
	 * @throws {TypeError} when neither `aggregateType` nor `aggregateFactory` is provided
	 */
	constructor({
		eventStore,
		aggregateType,
		aggregateFactory,
		handles,
		executionLocker = new Lock(),
		restoresFrom,
		retryOnConcurrencyError,
		logger
	}: Pick<IContainer, 'eventStore' | 'executionLocker' | 'logger'> & {
		aggregateType?: IAggregateConstructor<TAggregate, any>,
		aggregateFactory?: IAggregateFactory<TAggregate, any>,
		handles?: Readonly<string[]>,
		restoresFrom?: Readonly<string[]>,
		retryOnConcurrencyError?: RetryOnConcurrencyErrorOptions
	}) {
		assertDefined(eventStore, 'eventStore');

		this.#eventStore = eventStore;
		this.#executionLock = executionLocker;
		this.#logger = logger && 'child' in logger ?
			logger.child({ service: new.target.name }) :
			logger;

		if (aggregateType) {
			const AggregateType = aggregateType;
			this.#aggregateFactory = params => new AggregateType(params);
			this.#handles = AggregateType.handles;
			this.#restoresFrom = AggregateType.restoresFrom;
			// An explicit option wins over the static setting declared on the aggregate class
			this.#shouldRetry = normalizeRetryResolver(retryOnConcurrencyError ??
				AggregateType.retryOnConcurrencyError);
		}
		else if (aggregateFactory) {
			// A factory carries no static metadata, so the handled command types must be passed explicitly
			assertStringArray(handles, 'handles');
			this.#aggregateFactory = aggregateFactory;
			this.#handles = handles;
			this.#restoresFrom = restoresFrom;
			this.#shouldRetry = normalizeRetryResolver(retryOnConcurrencyError);
		}
		else {
			throw new TypeError('either aggregateType or aggregateFactory is required');
		}
	}

	/** Subscribe to all command types handled by aggregateType */
	subscribe(commandBus: IObservable) {
		assertObservable(commandBus, 'commandBus');

		for (const commandType of this.#handles)
			commandBus.on(commandType, (cmd: ICommand) => this.execute(cmd));
	}

	/** Restore aggregate from event store events */
	async #restoreAggregate(id: Identifier): Promise<TAggregate> {
		assertDefined(id, 'id');

		const aggregate = this.#aggregateFactory({ id });

		// Narrow the query only when a non-empty list of event types is configured
		const queryOptions = this.#restoresFrom?.length ?
			{ eventTypes: this.#restoresFrom, tail: 'last' } satisfies AggregateEventsQueryParams :
			undefined;

		const eventsIterable = this.#eventStore.getAggregateEvents(id, queryOptions);

		let eventCount = 0;
		for await (const event of eventsIterable) {
			aggregate.mutate(event);
			eventCount += 1;
		}

		this.#logger?.info(`${aggregate} state restored from ${eventCount} event(s)`);

		return aggregate;
	}

	/**
	 * Start aggregate restoration and mark the resulting promise as handled.
	 *
	 * The promise is stored in the cache and may reject before anyone awaits it (the caller is still
	 * waiting for the lock) or without ever being awaited (the entry is released right after a reset
	 * that precedes a rethrow). Without a handler attached, a failed event-store read would surface as
	 * an unhandled promise rejection. The original promise is returned, so every awaiter still
	 * receives the restoration error.
	 */
	#startRestoration(aggregateId: Identifier): Promise<TAggregate> {
		const restoration = this.#restoreAggregate(aggregateId);
		restoration.catch(() => { /* surfaced to awaiters through the original promise */ });
		return restoration;
	}

	/** Create new aggregate with new Id generated by event store */
	async #createAggregate(): Promise<TAggregate> {
		const id = await this.#eventStore.getNewId();
		const aggregate = this.#aggregateFactory({ id });
		this.#logger?.info(`${aggregate} created`);

		return aggregate;
	}

	/**
	 * Register interest in the cache entry before acquiring the lock, so concurrent callers for the same aggregateId
	 * share one restoration promise instead of each triggering a separate event-store read
	 */
	#allocateCacheEntry(aggregateId: Identifier | undefined) {
		if (aggregateId)
			this.#aggregatesCache.assert(aggregateId, () => this.#startRestoration(aggregateId));
	}

	/**
	 * Replace the dirty cache entry with a fresh restoration promise
	 * so both the retry and any commands queued on the lock start from a clean state.
	 */
	#resetCacheEntry(aggregateId: Identifier | undefined) {
		if (aggregateId)
			this.#aggregatesCache.set(aggregateId, this.#startRestoration(aggregateId));
	}

	/**
	 * Decrement the usage counter registered by `#allocateCacheEntry`;
	 * deletes the entry when the last concurrent caller for this aggregateId is done.
	 */
	#releaseCacheEntry(aggregateId: Identifier | undefined) {
		if (aggregateId)
			this.#aggregatesCache.release(aggregateId);
	}

	/**
	 * Pass a command to corresponding aggregate.
	 *
	 * Commands with an `aggregateId` run one at a time per id and operate on the cached (restored)
	 * aggregate; commands without it get a new aggregate with an id generated by the event store.
	 * Produced events are dispatched to the event store; when dispatching fails, the retry resolver
	 * decides whether to retry with a freshly restored aggregate, force-dispatch the events with
	 * `ignoreConcurrencyError`, or rethrow.
	 *
	 * @param cmd command to execute
	 * @returns events produced by the aggregate (possibly empty)
	 * @throws errors thrown by the aggregate's `handle`, and dispatch errors the retry resolver declines to retry
	 */
	async execute(cmd: ICommand): Promise<IEventSet> {
		assertMessage(cmd, 'cmd');

		const { aggregateId } = cmd;

		this.#allocateCacheEntry(aggregateId);

		let lease: Awaited<ReturnType<ILocker['acquire']>> | undefined;
		try {
			// Serialize execution per aggregate — commands for the same id queue here.
			// Acquired inside `try` so that a failed acquisition still releases the cache entry allocated above.
			if (aggregateId)
				lease = await this.#executionLock.acquire(String(aggregateId));

			for (let attempt = 0; ; attempt++) {
				// Read the current cache entry after acquiring the lock. On the first attempt
				// this is the pre-warmed (possibly shared) instance; on retries it is the
				// fresh instance placed into the cache by the error handler below.
				const aggregate = aggregateId ?
					await this.#aggregatesCache.get(aggregateId)! :
					await this.#createAggregate();

				let events: IEventSet;
				try {
					events = await aggregate.handle(cmd);

					this.#logger?.info(`${aggregate} "${cmd.type}" command processed, ${events.length} event(s) produced`);
				}
				catch (error: unknown) {
					// Handling errors are never retried; the cached instance is discarded before rethrowing
					this.#resetCacheEntry(aggregateId);
					throw error;
				}

				try {
					if (events.length)
						await this.#eventStore.dispatch(events);

					return events;
				}
				catch (error: unknown) {
					this.#resetCacheEntry(aggregateId);

					const retryDecision = this.#shouldRetry(error, events, attempt);
					if (!retryDecision)
						throw error;

					if (retryDecision === 'ignore') {
						this.#logger?.warn(`"${cmd.type}" command error ignored after ${attempt + 1} attempt(s), force-dispatching`, { error });

						if (events.length)
							await this.#eventStore.dispatch(events, { ignoreConcurrencyError: true });

						return events;
					}

					this.#logger?.warn(`"${cmd.type}" command failed on attempt ${attempt + 1}, will retry`, { error });
				}
			}
		}
		finally {
			// Nested so the cache entry is released even if releasing the lease throws
			try {
				lease?.release();
			}
			finally {
				this.#releaseCacheEntry(aggregateId);
			}
		}
	}
}