Skip to content

feat: ZMQ 消息总线 — Agent 常驻进程 + 实时通信#27

Merged
robscc merged 1 commit intomainfrom
feat/zmq-message-bus
Mar 19, 2026
Merged

feat: ZMQ 消息总线 — Agent 常驻进程 + 实时通信#27
robscc merged 1 commit intomainfrom
feat/zmq-message-bus

Conversation

@robscc
Copy link
Owner

@robscc robscc commented Mar 19, 2026

Summary

  • 引入 ZMQ inproc 消息总线(backend/agentpal/zmq_bus/),将 Agent 从一次性 HTTP 请求处理改为常驻 daemon 进程
  • 支持 DEALER/ROUTER 点对点通信和 PUB/SUB 事件广播,替代 asyncio.Queue 单进程事件总线
  • HTTP API(/chat/dispatch)自动检测 ZMQ 可用性,优先 ZMQ 桥接,不可用时回退到直接模式
  • MessageBus 改为 hybrid 模式:DB 持久化(审计 + 历史查询)+ ZMQ 实时投递

架构

AgentDaemonManager (singleton)
├── ROUTER socket ← 中心路由,按 identity 转发消息
├── XPUB/XSUB proxy ← 事件广播代理
├── PersonalAssistantDaemon (per-session, lazy)
│   ├── DEALER → ROUTER (接收/发送)
│   ├── PUB → XSUB (发布 SSE 事件)
│   └── asyncio.Queue (FIFO 处理)
├── SubAgentDaemon (on-demand)
│   └── 同上模式 + task event publishing
└── CronDaemon (singleton)
    └── 定时任务结果通过 AGENT_NOTIFY 推送

新增文件 (8 个模块 + 6 个测试)

文件 职责
zmq_bus/protocol.py Envelope 消息信封 + MessageType 枚举 + msgpack 序列化
zmq_bus/daemon.py AgentDaemon 基类 (DEALER + PUB + FIFO Queue)
zmq_bus/pa_daemon.py PA Daemon — 包装 PersonalAssistant,per-session 常驻
zmq_bus/sub_daemon.py SubAgent Daemon — 按需创建,任务完成后保活
zmq_bus/cron_daemon.py Cron Daemon — 全局单例,定时任务结果通知
zmq_bus/manager.py Manager — ROUTER 路由 + XPUB/XSUB 代理 + 生命周期管理
zmq_bus/event_subscriber.py EventSubscriber — SUB socket async iterator → SSE
tests/unit/test_zmq_*.py 26 个单元测试(协议、daemon、manager)
tests/integration/test_zmq_*.py 17 个集成测试(chat flow、dispatch、interagent)

修改文件

文件 变更
main.py lifespan 中启动/停止 AgentDaemonManager
config.py 添加 zmq_router_addrzmq_events_addr、idle timeout 配置
agent.py /chat /dispatch 端点 ZMQ 桥接模式 + 自动回退
message_bus.py hybrid 模式:DB 审计 + ZMQ 实时投递
pyproject.toml 添加 pyzmq>=26.0.0msgpack>=1.0.0
test_e2e.py 修复 networkidle 超时(SSE/WS 长连接导致),改用 domcontentloaded

Test plan

  • 26 个 ZMQ 单元测试全部通过
  • 17 个 ZMQ 集成测试全部通过
  • 553 个现有回归测试全部通过(无破坏性变更)
  • 17/17 E2E 非 LLM 测试通过(networkidle 修复后)
  • 手动验证:ChatPage 流式对话通过 ZMQ 正常工作
  • 手动验证:SubAgent 派遣任务通过 ZMQ 正常工作
  • 手动验证:多 session 并发对话互不干扰

🤖 Generated with Claude Code

引入 ZMQ inproc 消息总线,将 Agent 从一次性请求处理改为常驻 daemon 进程,
支持 DEALER/ROUTER 点对点通信和 PUB/SUB 事件广播。

新增模块 (backend/agentpal/zmq_bus/):
- protocol.py: Envelope 消息信封 + MessageType 枚举 + msgpack 序列化
- daemon.py: AgentDaemon 基类 (DEALER + PUB + asyncio.Queue FIFO)
- pa_daemon.py: PersonalAssistantDaemon (per-session 常驻)
- sub_daemon.py: SubAgentDaemon (按需创建,任务完成后保活)
- cron_daemon.py: CronDaemon (全局单例,定时任务结果通知)
- manager.py: AgentDaemonManager (ROUTER 路由 + XPUB/XSUB 事件代理 + 生命周期管理)
- event_subscriber.py: EventSubscriber (SUB socket async iterator,SSE 桥接)

修改:
- main.py: lifespan 中启动/停止 AgentDaemonManager
- config.py: 添加 zmq_router_addr/zmq_events_addr 等配置项
- agent.py: /chat 和 /dispatch 端点支持 ZMQ 桥接模式(自动回退)
- message_bus.py: hybrid 模式 — DB 审计 + ZMQ 实时投递
- pyproject.toml: 添加 pyzmq>=26.0.0, msgpack>=1.0.0
- test_e2e.py: 修复 networkidle 超时(改用 domcontentloaded)

测试: 43 个 ZMQ 专项测试 (26 unit + 17 integration) 全部通过

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@robscc robscc merged commit f9e29ea into main Mar 19, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant