-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathRabbitMqEventBus.ts
More file actions
153 lines (134 loc) · 5.22 KB
/
RabbitMqEventBus.ts
File metadata and controls
153 lines (134 loc) · 5.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import type { IContainer } from 'node-cqrs';
import type { IEventBus, IMessage, IMessageHandler, IObservable, IObservableQueueProvider } from '../interfaces/index.ts';
import { assertBoolean, assertDefined, assertNonNegativeInteger, assertString } from '../utils/index.ts';
import { RabbitMqCommandBus } from './RabbitMqCommandBus.ts';
import { RabbitMqGateway, type Subscription, type SubscribeResult } from './RabbitMqGateway.ts';
import { type ConfigProvider, resolveProvider } from './utils/index.ts';
/**
 * User-supplied event bus settings.
 *
 * Every field is optional and mirrors the same-named field of the gateway's
 * {@link Subscription} parameters.
 */
export type RabbitMqEventBusConfig = Partial<Pick<
	Subscription,
	| 'exchange'
	| 'queueName'
	| 'ignoreOwn'
	| 'concurrentLimit'
	| 'handlerProcessTimeout'
	| 'queueExpires'
	| 'deadLetterQueue'
>>;

/**
 * Event bus settings after resolution: same shape as {@link RabbitMqEventBusConfig},
 * except `exchange` and `ignoreOwn` are always present.
 */
type ResolvedRabbitMqEventBusConfig =
	& RabbitMqEventBusConfig
	& Required<Pick<RabbitMqEventBusConfig, 'exchange' | 'ignoreOwn'>>;
/**
 * Resolves the event bus config from an optional provider, fills in defaults
 * and validates every field.
 *
 * Defaults: `exchange` falls back to `RabbitMqEventBus.DEFAULT_EXCHANGE`,
 * `ignoreOwn` to `true` and `deadLetterQueue` to `false`. The remaining fields
 * stay `undefined` when not provided and are only validated when present.
 *
 * @param provider Optional config provider, resolved via `resolveProvider`.
 * @returns The validated config with defaults applied.
 */
async function resolveConfig(
	provider?: ConfigProvider<RabbitMqEventBusConfig>
): Promise<ResolvedRabbitMqEventBusConfig> {
	const provided = await resolveProvider(provider);

	// Destructuring defaults only replace `undefined`; any other value
	// (including `null`) is passed on to the assertions below.
	const {
		// eslint-disable-next-line no-use-before-define
		exchange = RabbitMqEventBus.DEFAULT_EXCHANGE,
		ignoreOwn = true,
		deadLetterQueue = false,
		concurrentLimit,
		handlerProcessTimeout,
		queueName,
		queueExpires
	} = provided ?? {};

	assertString(exchange, 'rabbitMqEventConfig.exchange');
	assertBoolean(ignoreOwn, 'rabbitMqEventConfig.ignoreOwn');

	if (concurrentLimit !== undefined) {
		assertNonNegativeInteger(concurrentLimit, 'rabbitMqEventConfig.concurrentLimit');
	}
	if (handlerProcessTimeout !== undefined) {
		assertNonNegativeInteger(handlerProcessTimeout, 'rabbitMqEventConfig.handlerProcessTimeout');
	}
	if (queueName !== undefined) {
		assertString(queueName, 'rabbitMqEventConfig.queueName');
	}
	if (queueExpires !== undefined) {
		assertNonNegativeInteger(queueExpires, 'rabbitMqEventConfig.queueExpires');
	}

	assertBoolean(deadLetterQueue, 'rabbitMqEventConfig.deadLetterQueue');

	return {
		exchange,
		ignoreOwn,
		concurrentLimit,
		handlerProcessTimeout,
		queueName,
		queueExpires,
		deadLetterQueue
	};
}
/**
 * RabbitMQ-backed event bus: delivers each published message
 * to all subscribers.
 *
 * By default uses an exclusive (non-durable) queue per connection.
 * Set `queueName` in config for a durable queue that survives restarts.
 *
 * Dead letter queue is disabled by default (`deadLetterQueue: false`).
 * When a handler fails or times out, the message is discarded.
 * Set `deadLetterQueue: true` to route rejected messages to a `.failed` queue.
 *
 * Optionally ignores messages published by this instance (default: true).
 *
 * Supports named durable queues via {@link queue} for single-consumer delivery.
 *
 * The config provider is resolved lazily on first use and the result is shared
 * by all subsequent operations of this instance.
 */
export class RabbitMqEventBus implements IEventBus, IObservableQueueProvider {

	/** Wildcard event type, taken from `RabbitMqGateway.ALL_EVENTS_WILDCARD`. */
	static get allEventsWildcard(): string {
		return RabbitMqGateway.ALL_EVENTS_WILDCARD;
	}

	/** Exchange name used when the config does not specify one. */
	static DEFAULT_EXCHANGE = 'node-cqrs.events';

	readonly #gateway: RabbitMqGateway;

	/** Command buses created by {@link queue}, keyed by queue name. */
	readonly #queues = new Map<string, RabbitMqCommandBus>();

	readonly #configProvider?: ConfigProvider<RabbitMqEventBusConfig>;

	/** In-flight or settled config resolution; `undefined` until first use or after a failure. */
	#configPromise?: Promise<ResolvedRabbitMqEventBusConfig>;

	/**
	 * @param container Dependencies: the required `rabbitMqGateway` and an optional
	 *   `rabbitMqEventBusConfig` provider.
	 * @throws When `rabbitMqGateway` is not provided (via `assertDefined`).
	 */
	constructor({
		rabbitMqGateway,
		rabbitMqEventBusConfig
	}: Pick<IContainer, 'rabbitMqGateway' | 'rabbitMqEventBusConfig'>) {
		assertDefined(rabbitMqGateway, 'rabbitMqGateway');

		this.#gateway = rabbitMqGateway;
		this.#configProvider = rabbitMqEventBusConfig;
	}

	/**
	 * Resolves and validates the config once, sharing the result between callers.
	 *
	 * The promise itself is cached (not the awaited value), so calls that arrive
	 * while the first resolution is still pending reuse it instead of invoking
	 * the config provider again. A failed resolution is not cached: the next call
	 * starts a fresh attempt.
	 */
	#resolveConfig(): Promise<ResolvedRabbitMqEventBusConfig> {
		const cached = this.#configPromise;
		if (cached)
			return cached;

		const pending: Promise<ResolvedRabbitMqEventBusConfig> = resolveConfig(this.#configProvider);
		this.#configPromise = pending;

		// Drop the cached promise on failure so a later call can retry.
		// Callers still observe the rejection through `pending` itself.
		pending.catch(() => {
			if (this.#configPromise === pending)
				this.#configPromise = undefined;
		});

		return pending;
	}

	/**
	 * Publishes a message to the event exchange.
	 * The message will be delivered to all subscribers.
	 */
	async publish(message: IMessage): Promise<void> {
		const { exchange } = await this.#resolveConfig();
		await this.#gateway.publish(exchange, message);
	}

	/**
	 * Registers a message handler for a specific message type.
	 *
	 * When `queueName` is set in config, uses a durable queue
	 * that survives broker restarts. Otherwise uses an exclusive
	 * (non-durable) queue that is deleted on disconnect.
	 *
	 * All resolved config fields are forwarded to the gateway subscription.
	 *
	 * @returns The gateway's subscription result.
	 */
	async on(eventType: string, handler: IMessageHandler): Promise<SubscribeResult> {
		const subscriptionParams = await this.#resolveConfig();

		return this.#gateway.subscribe({
			...subscriptionParams,
			eventType,
			handler,
			// NOTE(review): always requested here; confirm against RabbitMqGateway
			// that this matches the intended delivery semantics for event subscribers.
			singleActiveConsumer: true
		});
	}

	/**
	 * Removes a previously registered message handler for a specific message type.
	 */
	async off(eventType: string, handler: IMessageHandler): Promise<void> {
		const { exchange, queueName } = await this.#resolveConfig();

		await this.#gateway.unsubscribe({
			exchange,
			queueName,
			eventType,
			handler
		});
	}

	/**
	 * Returns a {@link RabbitMqCommandBus} that uses a durable queue with the given name.
	 * Messages published to the event exchange are also delivered to this queue,
	 * but only one consumer will process each message.
	 *
	 * The bus is created on first request and reused for later calls with the same name.
	 * It inherits `exchange`, `concurrentLimit`, `handlerProcessTimeout`, `queueExpires`
	 * and `deadLetterQueue` from this event bus's config.
	 */
	queue(queueName: string): IObservable {
		let queue = this.#queues.get(queueName);
		if (!queue) {
			queue = new RabbitMqCommandBus({
				rabbitMqGateway: this.#gateway,
				// Provider function: the event bus config is resolved only when
				// the command bus asks for its own config.
				rabbitMqCommandBusConfig: async () => {
					const { exchange, concurrentLimit, handlerProcessTimeout, queueExpires, deadLetterQueue } =
						await this.#resolveConfig();

					return {
						exchange,
						queueName,
						concurrentLimit,
						handlerProcessTimeout,
						queueExpires,
						deadLetterQueue
					};
				}
			});
			this.#queues.set(queueName, queue);
		}
		return queue;
	}
}