Skip to content

Commit f707af7

Browse files
committed
feat: implement BullMQ task queue integration with configuration options for concurrency, retries, and Redis setup in backend-nodejs
1 parent 1cf6933 commit f707af7

18 files changed

Lines changed: 2355 additions & 9 deletions

File tree

backend-nodejs/cmd/server/main.ts

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import {
2424
registerDomainRoutes,
2525
} from '../../infrastructure/bootstrap/server.js';
2626
import { initDependencies } from '../../infrastructure/bootstrap/dependencies.js';
27+
import { startQueueWorkers } from '../../infrastructure/queue/worker_bootstrap.js';
2728
import type { RedisClientType } from 'redis';
2829

2930
async function main() {
@@ -100,10 +101,17 @@ async function main() {
100101

101102
// 7. 初始化依赖注入容器
102103
console.log('🏗️ Initializing dependency injection container...');
103-
const container = initDependencies(config, db, redis);
104+
const container = await initDependencies(config, db, redis);
104105
console.log('✅ Dependencies initialized');
105106

106-
// 8. 注册领域路由
107+
// 8. 启动队列 Worker(如果启用)
108+
if (container.workerManager && config.queue.enabled && container.queueProcessors) {
109+
console.log('⚙️ Starting queue workers...');
110+
startQueueWorkers(container.workerManager, container.queueProcessors, config);
111+
console.log('✅ Queue workers started');
112+
}
113+
114+
// 9. 注册领域路由
107115
console.log('📚 Registering domain routes...');
108116
await registerDomainRoutes(
109117
fastify,
@@ -116,7 +124,7 @@ async function main() {
116124
redis
117125
);
118126

119-
// 9. 启动服务器
127+
// 10. 启动服务器
120128
const address = `http://${config.server.host}:${config.server.port}`;
121129
try {
122130
await fastify.listen({
@@ -134,7 +142,7 @@ async function main() {
134142
process.exit(1);
135143
}
136144

137-
// 10. 优雅关闭
145+
// 11. 优雅关闭
138146
const shutdown = async (signal: string) => {
139147
console.log(`\n🛑 Received ${signal}, shutting down gracefully...`);
140148
try {
@@ -143,6 +151,13 @@ async function main() {
143151
if (redis) {
144152
await closeRedisConnection(redis);
145153
}
154+
// 关闭队列
155+
if (container.queueClient) {
156+
await container.queueClient.close();
157+
}
158+
if (container.workerManager) {
159+
await container.workerManager.stopAll();
160+
}
146161
// 关闭事件总线
147162
await container.eventBus.close();
148163
// 刷新日志缓冲区

backend-nodejs/env.example

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,19 @@ TRACING_ENDPOINT=http://localhost:4318
5656
# ANTHROPIC_API_KEY=your-anthropic-api-key
5757

5858
# ==================== 队列配置(可选,用于 BullMQ)====================
59-
# QUEUE_REDIS_HOST=localhost
60-
# QUEUE_REDIS_PORT=6379
61-
# QUEUE_REDIS_PASSWORD=
59+
# 队列功能开关
60+
QUEUE_ENABLED=true
61+
62+
# 队列使用的 Redis DB(与缓存分离,避免冲突)
63+
# 缓存使用 REDIS_DB=0,队列使用 QUEUE_REDIS_DB=1
64+
QUEUE_REDIS_DB=1
65+
66+
# Worker 配置
67+
QUEUE_WORKER_CONCURRENCY=10 # Worker 并发数(同时处理的任务数)
68+
QUEUE_WORKER_MAX_RETRIES=3 # 最大重试次数
69+
QUEUE_WORKER_RETRY_DELAY=5000 # 重试延迟(毫秒)
70+
71+
# 注意:队列使用与 Redis 缓存相同的 Redis 实例
72+
# 但使用不同的 DB 编号(REDIS_DB vs QUEUE_REDIS_DB)
73+
# 队列配置会从上面的 REDIS_HOST、REDIS_PORT、REDIS_PASSWORD 读取连接信息
6274

backend-nodejs/infrastructure/bootstrap/dependencies.ts

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ import type { Kysely } from 'kysely';
1414
import type { RedisClientType } from 'redis';
1515
import type { Database } from '../persistence/postgres/database.js';
1616
import type { Config } from '../config/config.js';
17+
import type { QueueClient, WorkerManager } from '../queue/index.js';
18+
import type { QueueProcessorsResult } from '../queue/bootstrap.js';
19+
import { bootstrapQueueProcessors } from '../queue/bootstrap.js';
1720

1821
// Repository 实现
1922
import { TaskRepositoryImpl } from '../../domains/task/repository/task_repo.js';
@@ -37,13 +40,20 @@ import { createAuthMiddleware } from '../middleware/auth.js';
3740
// Event Bus
3841
import { createEventBus, type EventBus } from '../../domains/shared/events/event_bus.js';
3942

43+
// Queue
44+
import { createQueueClient } from '../queue/client.js';
45+
import { createWorkerManager } from '../queue/worker.js';
46+
4047
/**
4148
* 应用依赖容器
4249
* 包含所有领域的 Handler Dependencies 和 Middleware
4350
*/
4451
export interface AppContainer {
4552
// 基础设施
4653
eventBus: EventBus;
54+
queueClient: QueueClient | null;
55+
workerManager: WorkerManager | null;
56+
queueProcessors: QueueProcessorsResult | null; // 队列处理器引导结果
4757

4858
// Auth 领域
4959
authHandlerDeps: AuthHandlerDependencies;
@@ -64,18 +74,40 @@ export interface AppContainer {
6474
* @param _redis Redis 连接(可选,保留用于未来扩展)
6575
* @returns 应用依赖容器
6676
*/
67-
export function initDependencies(
77+
export async function initDependencies(
6878
config: Config,
6979
db: Kysely<Database>,
7080
_redis: RedisClientType | null = null
71-
): AppContainer {
81+
): Promise<AppContainer> {
7282
// ============================================
7383
// Infrastructure Layer(基础设施层)
7484
// ============================================
7585

7686
// 事件总线(用于领域间通信)
7787
const eventBus = createEventBus();
7888

89+
// 队列客户端和 Worker 管理器(如果启用)
90+
let queueClient: QueueClient | null = null;
91+
let workerManager: WorkerManager | null = null;
92+
let queueProcessors: QueueProcessorsResult | null = null;
93+
94+
if (config.queue.enabled && _redis) {
95+
const queueRedisConfig = {
96+
host: config.redis.host,
97+
port: config.redis.port,
98+
password: config.redis.password || undefined,
99+
db: config.queue.redisDb,
100+
};
101+
102+
queueClient = createQueueClient(queueRedisConfig);
103+
workerManager = createWorkerManager(queueRedisConfig);
104+
105+
// 引导并注册所有队列处理器(领域自注册模式)
106+
queueProcessors = await bootstrapQueueProcessors({
107+
includeExamples: config.server.env === 'development',
108+
});
109+
}
110+
79111
// ============================================
80112
// Repository Layer(基础设施层):数据访问
81113
// ============================================
@@ -139,6 +171,9 @@ export function initDependencies(
139171

140172
return {
141173
eventBus,
174+
queueClient,
175+
workerManager,
176+
queueProcessors,
142177
authHandlerDeps,
143178
authMiddleware,
144179
userHandlerDeps,

backend-nodejs/infrastructure/config/config.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,15 @@ const ConfigSchema = z.object({
7474
password: z.string().default(''),
7575
db: z.coerce.number().int().nonnegative().default(0),
7676
}),
77+
queue: z.object({
78+
enabled: z.coerce.boolean().default(true),
79+
redisDb: z.coerce.number().int().nonnegative().default(1), // 队列使用 DB 1,与缓存分离
80+
worker: z.object({
81+
concurrency: z.coerce.number().int().positive().default(10),
82+
maxRetries: z.coerce.number().int().nonnegative().default(3),
83+
retryDelay: z.coerce.number().int().nonnegative().default(5000), // 毫秒
84+
}),
85+
}),
7786
jwt: z.object({
7887
secret: z
7988
.string()
@@ -157,6 +166,15 @@ export function loadConfig(): Config {
157166
password: process.env.REDIS_PASSWORD,
158167
db: process.env.REDIS_DB,
159168
},
169+
queue: {
170+
enabled: process.env.QUEUE_ENABLED,
171+
redisDb: process.env.QUEUE_REDIS_DB,
172+
worker: {
173+
concurrency: process.env.QUEUE_WORKER_CONCURRENCY,
174+
maxRetries: process.env.QUEUE_WORKER_MAX_RETRIES,
175+
retryDelay: process.env.QUEUE_WORKER_RETRY_DELAY,
176+
},
177+
},
160178
jwt: {
161179
secret: process.env.JWT_SECRET,
162180
accessTokenExpiry: process.env.JWT_ACCESS_TOKEN_EXPIRY,

0 commit comments

Comments
 (0)