From b7c23c2b01083c317d93b87fe2beca6752313151 Mon Sep 17 00:00:00 2001 From: ttiee <469784630@qq.com> Date: Wed, 18 Mar 2026 10:37:12 +0800 Subject: [PATCH 1/7] =?UTF-8?q?=F0=9F=90=9B=20fix(telegram):=20=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D=E5=AD=90=20agent=20=E6=94=B6=E5=8F=A3=E5=B9=B6?= =?UTF-8?q?=E8=A1=A5=E5=85=A8=E7=8A=B6=E6=80=81=E9=9D=A2=E6=9D=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 修正 native 主 agent 在 delta-only 场景下的最终文本 fallback,避免子 agent 出错后主流程看起来提前结束。 调整 Telegram 主进度面板在无最终文本时的收口文案,不再误报为已正常完成。 补全 /status 面板、回调处理和命令描述,展示额度窗口、使用比例、剩余比例与刷新时间。 补充 native、service、telegram、commands 层回归测试,并更新相关维护记录。 --- ...2026-03-17-status-rate-limit-visibility.md | 74 ++++ ...2026-03-17-telegram-subagent-visibility.md | 19 + src/nonebot_plugin_codex/__init__.py | 10 + src/nonebot_plugin_codex/native_client.py | 61 ++- src/nonebot_plugin_codex/service.py | 149 ++++++- src/nonebot_plugin_codex/telegram.py | 103 ++++- src/nonebot_plugin_codex/telegram_commands.py | 2 +- tests/test_native_client.py | 372 ++++++++++++++++++ tests/test_service.py | 141 ++++++- tests/test_telegram_commands.py | 2 +- tests/test_telegram_handlers.py | 170 +++++++- 11 files changed, 1089 insertions(+), 14 deletions(-) create mode 100644 docs/maintenance/2026-03-17-status-rate-limit-visibility.md diff --git a/docs/maintenance/2026-03-17-status-rate-limit-visibility.md b/docs/maintenance/2026-03-17-status-rate-limit-visibility.md new file mode 100644 index 0000000..a0c10f9 --- /dev/null +++ b/docs/maintenance/2026-03-17-status-rate-limit-visibility.md @@ -0,0 +1,74 @@ +# /status Rate Limit Visibility + +## Feature Summary + +Enhance the Telegram `/status` command so it shows the active half-day usage window, current usage percentage, remaining percentage, and the refresh time or countdown. + +## Problem Background + +Current behavior: + +1. Open a Telegram chat that uses `nonebot-plugin-codex`. +2. Send `/status`. +3. The plugin opens the generic workspace panel only. + +Current gaps: + +- no current morning or afternoon usage state +- no usage percentage for the active quota window +- no remaining percentage +- no refresh time or countdown +- no graceful status text when rate-limit data is temporarily unavailable + +Actual behavior today is that `/status` is effectively an alias of `/panel`. The panel shows chat preferences, workdir, session state, and recent history, but it does not expose quota information. + +## Proposal + +- Keep `/status` as the Telegram entrypoint for operational state. +- Extend the existing workspace or status rendering path with a dedicated rate-limit section. +- Prefer official Codex account rate-limit data from the `codex app-server` lane when available. +- Display percentage-based data and refresh timing, not guessed absolute credits. +- Show morning or afternoon wording based on the active local window. +- Fall back to explicit unavailable text when upstream rate-limit data cannot be fetched. + +Expected user-visible result: + +- current morning or afternoon status +- used percentage +- remaining percentage +- reset time +- human-readable time until refresh + +## Alternatives + +- Infer quota state only from local session token-usage logs. + - This is insufficient because local usage does not equal account-level remaining quota or refresh timing. +- Keep `/status` unchanged and add a separate quota command. + - This is possible, but weaker for discoverability because users already expect `/status` to answer this question. + +## Scope And Constraints + +- Preserve command compatibility for `/status` and `/panel` unless a deliberate behavior change is documented. +- Do not silently change documented config semantics. +- Keep the implementation small and reviewable. +- Follow TDD for behavior changes. +- If upstream rate-limit data is unavailable, degrade cleanly instead of estimating. + +Affected files or commands likely include: + +- `src/nonebot_plugin_codex/service.py` +- `src/nonebot_plugin_codex/telegram.py` +- `src/nonebot_plugin_codex/native_client.py` +- `tests/test_service.py` +- `tests/test_telegram_handlers.py` +- `/status` +- `/panel` +- `codex app-server` + +## Verification Plan + +- Add service-level tests covering status rendering with and without rate-limit data. +- Add Telegram handler tests confirming `/status` shows the enriched status panel. +- Run `pdm run pytest tests/test_service.py tests/test_telegram_handlers.py -q`. +- Run `pdm run pytest -q`. +- Run `pdm run ruff check .`. diff --git a/docs/maintenance/2026-03-17-telegram-subagent-visibility.md b/docs/maintenance/2026-03-17-telegram-subagent-visibility.md index 8ecdf0f..9e6491e 100644 --- a/docs/maintenance/2026-03-17-telegram-subagent-visibility.md +++ b/docs/maintenance/2026-03-17-telegram-subagent-visibility.md @@ -32,6 +32,17 @@ When the plugin uses the native `codex app-server` lane and Codex delegates work - The native client forwarded all `agentMessage` deltas and completed texts without checking `phase`. - Commentary text could therefore appear in the stream/final reply path. - Collaboration tool calls were ignored, so the Telegram progress panel lacked main-agent/subagent context. +- In follow-up testing, main-agent final text could still be lost when it only existed in + `item/agentMessage/delta` frames, because the native fallback looked up the wrong + buffered key on `turn/completed`. +- In successful multi-agent turns where the main thread produced no separate final-answer + message after `wait`, the bridge also discarded: + - subagent `agentMessage` items with `phase: "final_answer"` + - completed `wait.agentsStates[*].message` payloads + This left Telegram with a finished run but an empty `final_text`. +- When that happened, Telegram could still finalize the main progress panel as + `Codex 已完成。`, then separately send `Codex 已完成,但没有返回可展示的最终文本。`, + which made successful-looking runs appear to stop right after a subagent failure. ## Affected Modules @@ -48,3 +59,11 @@ When the plugin uses the native `codex app-server` lane and Codex delegates work - final-answer `agentMessage` - Confirm that only the final answer reaches `on_stream_text`. - Confirm that progress updates mention both the main agent and the subagent state. +- Add a native-client regression where a subagent reports `errored` but the main agent + still produces a final answer through delta-only fallback. +- Add native-client regressions where: + - only a subagent `final_answer` exists before `turn/completed` + - only `wait.agentsStates[*].message` exists before `turn/completed` + and confirm the bridge still returns a non-empty `final_text`. +- Confirm that Telegram uses `Codex 已完成,但没有返回可展示的最终文本。` for the main + progress panel instead of a plain `Codex 已完成。` when no final text is available. diff --git a/src/nonebot_plugin_codex/__init__.py b/src/nonebot_plugin_codex/__init__.py index 81518fa..73359d8 100644 --- a/src/nonebot_plugin_codex/__init__.py +++ b/src/nonebot_plugin_codex/__init__.py @@ -134,6 +134,12 @@ async def _sync_telegram_commands(bot: Bot) -> None: block=True, rule=handlers.is_workspace_callback, ) + status_callback = on_type( + CallbackQueryEvent, + priority=10, + block=True, + rule=handlers.is_status_callback, + ) @codex_cmd.handle() async def _handle_codex( @@ -247,6 +253,10 @@ async def _handle_workspace_callback( ) -> None: await handlers.handle_workspace_callback(bot, event) + @status_callback.handle() + async def _handle_status_callback(bot: Bot, event: CallbackQueryEvent) -> None: + await handlers.handle_status_callback(bot, event) + @follow_up.handle() async def _handle_follow_up(bot: Bot, event: MessageEvent) -> None: await handlers.handle_follow_up(bot, event) diff --git a/src/nonebot_plugin_codex/native_client.py b/src/nonebot_plugin_codex/native_client.py index 4ac1911..69e091b 100644 --- a/src/nonebot_plugin_codex/native_client.py +++ b/src/nonebot_plugin_codex/native_client.py @@ -212,6 +212,40 @@ def _format_collab_tool_progress( return updates +def _latest_completed_subagent_status_message( + item: dict[str, Any], + *, + main_thread_id: str, +) -> str: + receiver_ids = item.get("receiverThreadIds") + agent_states = item.get("agentsStates") + if not isinstance(agent_states, dict): + return "" + + ordered_ids: list[str] = [] + if isinstance(receiver_ids, list): + for entry in receiver_ids: + if isinstance(entry, str) and entry and entry not in ordered_ids: + ordered_ids.append(entry) + for entry in agent_states: + if isinstance(entry, str) and entry and entry not in ordered_ids: + ordered_ids.append(entry) + + latest_message = "" + for agent_id in ordered_ids: + if _normalize_agent_key(agent_id, main_thread_id=main_thread_id) == "main": + continue + state = agent_states.get(agent_id) + if not isinstance(state, dict): + continue + if str(state.get("status") or "") != "completed": + continue + message = state.get("message") + if isinstance(message, str) and message.strip(): + latest_message = message.strip() + return latest_message + + async def _terminate_process(process: Any, timeout: float) -> None: if process is None: return @@ -324,6 +358,8 @@ async def run_turn( ) -> NativeRunResult: diagnostics: list[str] = [] final_text = "" + last_subagent_final_text = "" + last_completed_subagent_status_message = "" pending_agent_messages: dict[str, str] = {} last_streamed_text: dict[str, str] = {} last_compaction_notice: dict[str, str] = {} @@ -396,6 +432,13 @@ async def emit_compaction_notice(agent_key: str, text: str) -> None: ) continue if item_type == "collabAgentToolCall": + if method == "item/completed": + latest_message = _latest_completed_subagent_status_message( + item, + main_thread_id=thread_id, + ) + if latest_message: + last_completed_subagent_status_message = latest_message collab_updates = _format_collab_tool_progress( item, main_thread_id=thread_id, @@ -421,8 +464,11 @@ async def emit_compaction_notice(agent_key: str, text: str) -> None: phase = item.get("phase") stripped = text.strip() await emit_stream_update(agent_key, stripped) - if phase != "commentary" and agent_key == "main": - final_text = stripped + if phase != "commentary": + if agent_key == "main": + final_text = stripped + else: + last_subagent_final_text = stripped continue if method == "item/agentMessage/delta": @@ -470,7 +516,7 @@ async def emit_compaction_notice(agent_key: str, text: str) -> None: ( key for key in reversed(list(pending_agent_messages)) - if key.endswith(":main") or key == "__legacy__:main" + if key.startswith("main:") or key == "__legacy__:main" ), None, ) @@ -479,6 +525,15 @@ async def emit_compaction_notice(agent_key: str, text: str) -> None: if buffered_text: final_text = buffered_text await emit_stream_update("main", final_text) + if not final_text and last_subagent_final_text: + final_text = last_subagent_final_text + await emit_stream_update("main", final_text) + if ( + not final_text + and last_completed_subagent_status_message + ): + final_text = last_completed_subagent_status_message + await emit_stream_update("main", final_text) status = turn.get("status") error = turn.get("error") exit_code = 0 if status == "completed" and error is None else 1 diff --git a/src/nonebot_plugin_codex/service.py b/src/nonebot_plugin_codex/service.py index 0c57fd9..e16f743 100644 --- a/src/nonebot_plugin_codex/service.py +++ b/src/nonebot_plugin_codex/service.py @@ -10,7 +10,7 @@ except ModuleNotFoundError: # pragma: no cover - Python < 3.11 import tomli as tomllib from pathlib import Path -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from typing import Any from collections.abc import Callable, Awaitable from dataclasses import field, asdict, dataclass @@ -50,6 +50,8 @@ class AgentPanelUpdate: ONBOARDING_STALE_MESSAGE = "引导面板已失效,请重新执行 /codex" WORKSPACE_CALLBACK_PREFIX = "cwp" WORKSPACE_STALE_MESSAGE = "工作台面板已失效,请重新执行 /panel" +STATUS_CALLBACK_PREFIX = "cst" +STATUS_STALE_MESSAGE = "状态面板已失效,请重新执行 /status" @dataclass(slots=True) @@ -238,6 +240,14 @@ class WorkspacePanelState: message_id: int | None = None +@dataclass(slots=True) +class StatusPanelState: + chat_key: str + token: str + version: int + message_id: int | None = None + + def build_chat_key(chat_type: str, chat_id: int) -> str: if chat_type == "private": return f"private_{chat_id}" @@ -405,6 +415,22 @@ def decode_workspace_callback(payload: str) -> tuple[str, int, str]: return token, version, parts[3] +def encode_status_callback(token: str, version: int, action: str) -> str: + return f"{STATUS_CALLBACK_PREFIX}:{token}:{version}:{action}" + + +def decode_status_callback(payload: str) -> tuple[str, int, str]: + parts = payload.split(":") + if len(parts) != 4 or parts[0] != STATUS_CALLBACK_PREFIX: + raise ValueError("无效的状态回调。") + token = parts[1] + try: + version = int(parts[2]) + except ValueError as exc: + raise ValueError("无效的状态回调。") from exc + return token, version, parts[3] + + def parse_event_line(line: str) -> dict[str, Any] | None: try: payload = json.loads(line) @@ -647,6 +673,7 @@ def __init__( self.setting_panels: dict[str, SettingPanelState] = {} self.onboarding_panels: dict[str, OnboardingPanelState] = {} self.workspace_panels: dict[str, WorkspacePanelState] = {} + self.status_panels: dict[str, StatusPanelState] = {} self._native_history_entries: list[HistoricalSessionSummary] = [] self._native_history_loaded = False self._history_log_cache: dict[str, HistoryLogCacheEntry] = {} @@ -1849,10 +1876,29 @@ def _replace_workspace_panel_state( self.workspace_panels[chat_key] = state return state + def _replace_status_panel_state( + self, + chat_key: str, + *, + previous: StatusPanelState | None = None, + ) -> StatusPanelState: + state = StatusPanelState( + chat_key=chat_key, + token=previous.token if previous else self._make_browser_token(), + version=(previous.version + 1) if previous else 1, + message_id=previous.message_id if previous else None, + ) + self.status_panels[chat_key] = state + return state + def open_workspace_panel(self, chat_key: str) -> WorkspacePanelState: self.get_preferences(chat_key) return self._replace_workspace_panel_state(chat_key) + def open_status_panel(self, chat_key: str) -> StatusPanelState: + self.get_preferences(chat_key) + return self._replace_status_panel_state(chat_key) + def get_workspace_panel( self, chat_key: str, @@ -1879,6 +1925,36 @@ def remember_workspace_panel_message( panel = self.get_workspace_panel(chat_key, token=token) panel.message_id = message_id + def get_status_panel( + self, + chat_key: str, + token: str | None = None, + version: int | None = None, + ) -> StatusPanelState: + state = self.status_panels.get(chat_key) + if state is None: + raise ValueError(STATUS_STALE_MESSAGE) + if token is not None and state.token != token: + raise ValueError(STATUS_STALE_MESSAGE) + if version is not None and state.version != version: + raise ValueError(STATUS_STALE_MESSAGE) + return state + + def remember_status_panel_message( + self, + chat_key: str, + token: str, + message_id: int | None, + ) -> None: + if message_id is None: + return + panel = self.get_status_panel(chat_key, token=token) + panel.message_id = message_id + + def close_status_panel(self, chat_key: str, token: str, version: int) -> None: + self.get_status_panel(chat_key, token=token, version=version) + self.status_panels.pop(chat_key, None) + def close_workspace_panel(self, chat_key: str, token: str, version: int) -> None: self.get_workspace_panel(chat_key, token=token, version=version) self.workspace_panels.pop(chat_key, None) @@ -2022,6 +2098,77 @@ def render_workspace_panel( ] return "\n".join(lines), InlineKeyboardMarkup(inline_keyboard=keyboard) + def navigate_status_panel( + self, + chat_key: str, + token: str, + version: int, + action: str, + ) -> StatusPanelState: + panel = self.get_status_panel(chat_key, token=token, version=version) + if action != "refresh": + raise ValueError("未知状态面板操作。") + return self._replace_status_panel_state(chat_key, previous=panel) + + async def render_status_panel( + self, chat_key: str + ) -> tuple[str, InlineKeyboardMarkup]: + panel = self.get_status_panel(chat_key) + now = datetime.now().astimezone() + current_window = "上午窗口" if now.hour < 12 else "下午窗口" + lines = ["当前额度状态", f"当前窗口:{current_window}"] + + runner = self._spawn_native_client() + try: + if runner is None: + raise RuntimeError("Native Codex client is not configured.") + limits = await runner.read_rate_limits() + primary = limits.get("primary") + if not isinstance(primary, dict): + raise RuntimeError("额度状态响应缺少 primary 字段。") + + used_percent = int(primary.get("usedPercent") or 0) + remaining_percent = max(0, 100 - used_percent) + resets_at = int(primary.get("resetsAt") or 0) + if resets_at > 0: + reset_time = datetime.fromtimestamp(resets_at, tz=now.tzinfo) + else: + reset_time = now + delta = max(reset_time - now, timedelta()) + hours, remainder = divmod(int(delta.total_seconds()), 3600) + minutes = remainder // 60 + + lines.extend( + [ + f"已使用:{used_percent}%", + f"剩余:{remaining_percent}%", + f"刷新时间:{reset_time.strftime('%Y-%m-%d %H:%M:%S %z')}", + f"距离刷新:{hours}小时{minutes}分钟", + ] + ) + except Exception as exc: + lines.extend(["额度状态:暂不可用", str(exc) or "未知错误。"]) + finally: + await self._close_native_runner(runner) + + keyboard = [ + [ + InlineKeyboardButton( + text="刷新", + callback_data=encode_status_callback( + panel.token, panel.version, "refresh" + ), + ), + InlineKeyboardButton( + text="关闭", + callback_data=encode_status_callback( + panel.token, panel.version, "close" + ), + ), + ] + ] + return "\n".join(lines), InlineKeyboardMarkup(inline_keyboard=keyboard) + def render_onboarding_panel( self, chat_key: str ) -> tuple[str, InlineKeyboardMarkup]: diff --git a/src/nonebot_plugin_codex/telegram.py b/src/nonebot_plugin_codex/telegram.py index 50d0b99..1ff89f0 100644 --- a/src/nonebot_plugin_codex/telegram.py +++ b/src/nonebot_plugin_codex/telegram.py @@ -20,6 +20,8 @@ ONBOARDING_CALLBACK_PREFIX, SETTING_STALE_MESSAGE, SETTING_CALLBACK_PREFIX, + STATUS_STALE_MESSAGE, + STATUS_CALLBACK_PREFIX, WORKSPACE_STALE_MESSAGE, WORKSPACE_CALLBACK_PREFIX, AgentPanelUpdate, @@ -27,6 +29,7 @@ chunk_text, build_chat_key, decode_onboarding_callback, + decode_status_callback, decode_workspace_callback, format_result_text, decode_browser_callback, @@ -465,7 +468,15 @@ async def on_stream_text(update: AgentPanelUpdate) -> None: ) return - status = "Codex 已完成。" if result.exit_code == 0 else "Codex 执行失败。" + if result.exit_code == 0: + if result.final_text: + status = "Codex 已完成。" + elif result.notice: + status = "Codex 已完成。" + else: + status = "Codex 已完成,但没有返回可展示的最终文本。" + else: + status = "Codex 执行失败。" for panel in ( session.agent_panels[agent_key] for agent_key in session.agent_order @@ -532,6 +543,11 @@ async def is_workspace_callback(self, event: CallbackQueryEvent) -> bool: f"{WORKSPACE_CALLBACK_PREFIX}:" ) + async def is_status_callback(self, event: CallbackQueryEvent) -> bool: + return isinstance(event.data, str) and event.data.startswith( + f"{STATUS_CALLBACK_PREFIX}:" + ) + def callback_message_id(self, event: CallbackQueryEvent) -> int | None: message = getattr(event, "message", None) return getattr(message, "message_id", None) @@ -636,6 +652,18 @@ async def send_workspace_panel( getattr(message, "message_id", None), ) + async def send_status_panel( + self, bot: Bot, event: MessageEvent, chat_key: str + ) -> None: + panel = self.service.open_status_panel(chat_key) + text, markup = await self.service.render_status_panel(chat_key) + message = await self.send_event_message(bot, event, text, reply_markup=markup) + self.service.remember_status_panel_message( + chat_key, + panel.token, + getattr(message, "message_id", None), + ) + async def edit_or_resend_browser( self, bot: Bot, @@ -770,6 +798,37 @@ async def edit_or_resend_workspace_panel( getattr(message, "message_id", None), ) + async def edit_or_resend_status_panel( + self, + bot: Bot, + event: CallbackQueryEvent, + chat_key: str, + ) -> None: + panel = self.service.get_status_panel(chat_key) + text, markup = await self.service.render_status_panel(chat_key) + message_id = self.callback_message_id(event) or panel.message_id + chat_id = self.event_chat(event).id + try: + if message_id is None: + raise ValueError("missing message id") + await self.edit_message( + bot, + chat_id=chat_id, + message_id=message_id, + text=text, + reply_markup=markup, + ) + self.service.remember_status_panel_message(chat_key, panel.token, message_id) + except Exception: + message = await self.send_chat_message( + bot, chat_id, text, reply_markup=markup + ) + self.service.remember_status_panel_message( + chat_key, + panel.token, + getattr(message, "message_id", None), + ) + async def handle_codex(self, bot: Bot, event: MessageEvent, args: Message) -> None: chat_key = self.chat_key(event) session = self.service.activate_chat(chat_key) @@ -796,7 +855,7 @@ async def handle_panel(self, bot: Bot, event: MessageEvent) -> None: await self.send_workspace_panel(bot, event, self.chat_key(event)) async def handle_status(self, bot: Bot, event: MessageEvent) -> None: - await self.send_workspace_panel(bot, event, self.chat_key(event)) + await self.send_status_panel(bot, event, self.chat_key(event)) async def handle_mode(self, bot: Bot, event: MessageEvent, args: Message) -> None: chat_key = self.chat_key(event) @@ -1199,6 +1258,46 @@ async def handle_workspace_callback( event.id, text=self.error_text(exc), show_alert=True ) + async def handle_status_callback(self, bot: Bot, event: CallbackQueryEvent) -> None: + if not isinstance(event.data, str): + await bot.answer_callback_query( + event.id, text=STATUS_STALE_MESSAGE, show_alert=True + ) + return + + try: + chat_key = self.chat_key(event) + chat_id = self.event_chat(event).id + token, version, action = decode_status_callback(event.data) + self.service.get_status_panel(chat_key, token=token, version=version) + if action == "close": + self.service.close_status_panel(chat_key, token, version) + message_id = self.callback_message_id(event) + if message_id is not None: + await self.edit_message( + bot, + chat_id=chat_id, + message_id=message_id, + text="状态面板已关闭。", + reply_markup=None, + ) + await bot.answer_callback_query(event.id, text="已关闭。") + return + self.service.navigate_status_panel(chat_key, token, version, action) + await self.edit_or_resend_status_panel(bot, event, chat_key) + await bot.answer_callback_query(event.id) + except ValueError as exc: + text = str(exc) or STATUS_STALE_MESSAGE + await bot.answer_callback_query( + event.id, + text=text, + show_alert=text == STATUS_STALE_MESSAGE, + ) + except RuntimeError as exc: + await bot.answer_callback_query( + event.id, text=self.error_text(exc), show_alert=True + ) + async def handle_follow_up(self, bot: Bot, event: MessageEvent) -> None: chat_key = self.chat_key(event) session = self.service.get_session(chat_key) diff --git a/src/nonebot_plugin_codex/telegram_commands.py b/src/nonebot_plugin_codex/telegram_commands.py index 0d0a39a..a9d1327 100644 --- a/src/nonebot_plugin_codex/telegram_commands.py +++ b/src/nonebot_plugin_codex/telegram_commands.py @@ -35,7 +35,7 @@ class TelegramCommandSpec: ), TelegramCommandSpec( name="status", - description="打开当前工作台", + description="打开额度状态面板", usage="/status", ), TelegramCommandSpec( diff --git a/tests/test_native_client.py b/tests/test_native_client.py index b6359c8..b4ecd1f 100644 --- a/tests/test_native_client.py +++ b/tests/test_native_client.py @@ -609,6 +609,378 @@ async def launcher(*_args: Any, **_kwargs: Any) -> FakeProcess: assert streamed[-1].text == "main final" +@pytest.mark.asyncio +async def test_native_client_uses_main_delta_fallback_for_final_text() -> None: + process = FakeProcess( + stdout=FakeStdout( + [ + json.dumps({"jsonrpc": "2.0", "id": 1, "result": {}}) + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "id": 2, + "result": { + "thread": { + "id": "thread-1", + "name": "Thread One", + "updatedAt": "2025-03-01T00:00:00Z", + "cwd": "/tmp/work", + "source": "cli", + } + }, + } + ) + + "\n", + json.dumps({"jsonrpc": "2.0", "id": 3, "result": {}}) + "\n", + json.dumps({"jsonrpc": "2.0", "method": "turn/started", "params": {}}) + + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "method": "item/agentMessage/delta", + "params": { + "threadId": "thread-1", + "itemId": "msg-main-final", + "delta": "main final only from delta", + }, + } + ) + + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "method": "turn/completed", + "params": { + "threadId": "thread-1", + "turn": {"status": "completed", "error": None}, + }, + } + ) + + "\n", + ] + ), + stdin=FakeStdin(), + ) + + async def launcher(*_args: Any, **_kwargs: Any) -> FakeProcess: + return process + + client = NativeCodexClient(binary="codex", launcher=launcher) + + thread = await client.start_thread( + workdir="/tmp/work", + model="gpt-5", + reasoning_effort="xhigh", + permission_mode="safe", + ) + result = await client.run_turn(thread.thread_id, "hello") + + assert result.exit_code == 0 + assert result.final_text == "main final only from delta" + + +@pytest.mark.asyncio +async def test_native_client_keeps_main_final_text_after_subagent_error() -> None: + process = FakeProcess( + stdout=FakeStdout( + [ + json.dumps({"jsonrpc": "2.0", "id": 1, "result": {}}) + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "id": 2, + "result": { + "thread": { + "id": "thread-main", + "name": "Main Thread", + "updatedAt": "2025-03-01T00:00:00Z", + "cwd": "/tmp/work", + "source": "cli", + } + }, + } + ) + + "\n", + json.dumps({"jsonrpc": "2.0", "id": 3, "result": {}}) + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "method": "turn/started", + "params": {"threadId": "thread-main"}, + } + ) + + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "method": "item/completed", + "params": { + "threadId": "thread-main", + "item": { + "id": "collab-1", + "type": "collabAgentToolCall", + "tool": "wait", + "status": "completed", + "senderThreadId": "thread-main", + "receiverThreadIds": ["thread-sub-1"], + "agentsStates": { + "thread-sub-1": { + "status": "errored", + "message": "tests failed", + } + }, + }, + }, + } + ) + + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "method": "item/agentMessage/delta", + "params": { + "threadId": "thread-main", + "itemId": "msg-main-final", + "delta": "main recovered after child failure", + }, + } + ) + + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "method": "turn/completed", + "params": { + "threadId": "thread-main", + "turn": {"status": "completed", "error": None}, + }, + } + ) + + "\n", + ] + ), + stdin=FakeStdin(), + ) + + async def launcher(*_args: Any, **_kwargs: Any) -> FakeProcess: + return process + + client = NativeCodexClient(binary="codex", launcher=launcher) + progress: list[Any] = [] + + thread = await client.start_thread( + workdir="/tmp/work", + model="gpt-5", + reasoning_effort="xhigh", + permission_mode="safe", + ) + result = await client.run_turn( + thread.thread_id, + "hello", + on_progress=progress.append, + ) + + assert ("thread-sub-1", "出错(tests failed)") in [ + (entry.agent_key, entry.text) for entry in progress + ] + assert result.exit_code == 0 + assert result.final_text == "main recovered after child failure" + + +@pytest.mark.asyncio +async def test_native_client_uses_subagent_final_answer_when_main_turn_has_no_final_text( +) -> None: + process = FakeProcess( + stdout=FakeStdout( + [ + json.dumps({"jsonrpc": "2.0", "id": 1, "result": {}}) + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "id": 2, + "result": { + "thread": { + "id": "thread-main", + "name": "Main Thread", + "updatedAt": "2025-03-01T00:00:00Z", + "cwd": "/tmp/work", + "source": "cli", + } + }, + } + ) + + "\n", + json.dumps({"jsonrpc": "2.0", "id": 3, "result": {}}) + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "method": "turn/started", + "params": {"threadId": "thread-main"}, + } + ) + + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "method": "item/completed", + "params": { + "threadId": "thread-sub-1", + "item": { + "id": "msg-sub-final", + "type": "agentMessage", + "text": "subagent final answer", + "phase": "final_answer", + }, + }, + } + ) + + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "method": "item/completed", + "params": { + "threadId": "thread-main", + "item": { + "id": "collab-1", + "type": "collabAgentToolCall", + "tool": "wait", + "status": "completed", + "senderThreadId": "thread-main", + "receiverThreadIds": ["thread-sub-1"], + "agentsStates": { + "thread-sub-1": { + "status": "completed", + "message": "subagent final answer", + } + }, + }, + }, + } + ) + + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "method": "turn/completed", + "params": { + "threadId": "thread-main", + "turn": {"status": "completed", "error": None}, + }, + } + ) + + "\n", + ] + ), + stdin=FakeStdin(), + ) + + async def launcher(*_args: Any, **_kwargs: Any) -> FakeProcess: + return process + + client = NativeCodexClient(binary="codex", launcher=launcher) + + thread = await client.start_thread( + workdir="/tmp/work", + model="gpt-5", + reasoning_effort="xhigh", + permission_mode="safe", + ) + result = await client.run_turn(thread.thread_id, "hello") + + assert result.exit_code == 0 + assert result.final_text == "subagent final answer" + + +@pytest.mark.asyncio +async def test_native_client_uses_completed_wait_message_when_no_agent_final_text_exists( +) -> None: + process = FakeProcess( + stdout=FakeStdout( + [ + json.dumps({"jsonrpc": "2.0", "id": 1, "result": {}}) + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "id": 2, + "result": { + "thread": { + "id": "thread-main", + "name": "Main Thread", + "updatedAt": "2025-03-01T00:00:00Z", + "cwd": "/tmp/work", + "source": "cli", + } + }, + } + ) + + "\n", + json.dumps({"jsonrpc": "2.0", "id": 3, "result": {}}) + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "method": "turn/started", + "params": {"threadId": "thread-main"}, + } + ) + + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "method": "item/completed", + "params": { + "threadId": "thread-main", + "item": { + "id": "collab-1", + "type": "collabAgentToolCall", + "tool": "wait", + "status": "completed", + "senderThreadId": "thread-main", + "receiverThreadIds": ["thread-sub-1"], + "agentsStates": { + "thread-sub-1": { + "status": "completed", + "message": "tests ready", + } + }, + }, + }, + } + ) + + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "method": "turn/completed", + "params": { + "threadId": "thread-main", + "turn": {"status": "completed", "error": None}, + }, + } + ) + + "\n", + ] + ), + stdin=FakeStdin(), + ) + + async def launcher(*_args: Any, **_kwargs: Any) -> FakeProcess: + return process + + client = NativeCodexClient(binary="codex", launcher=launcher) + + thread = await client.start_thread( + workdir="/tmp/work", + model="gpt-5", + reasoning_effort="xhigh", + permission_mode="safe", + ) + result = await client.run_turn(thread.thread_id, "hello") + + assert result.exit_code == 0 + assert result.final_text == "tests ready" + + @pytest.mark.asyncio async def test_native_client_reads_large_stdout_frame_without_readline_limit( tmp_path: Path, diff --git a/tests/test_service.py b/tests/test_service.py index a38f41e..6f2e353 100644 --- a/tests/test_service.py +++ b/tests/test_service.py @@ -2,6 +2,7 @@ import asyncio import json +from datetime import datetime, timedelta from pathlib import Path import sys @@ -18,15 +19,30 @@ class DummyNativeClient: - def __init__(self, threads: list[NativeThreadSummary] | None = None) -> None: + def __init__( + self, + threads: list[NativeThreadSummary] | None = None, + *, + rate_limits: dict[str, object] | None = None, + rate_limit_error: Exception | None = None, + ) -> None: self._threads = threads or [] self.compact_calls: list[str] = [] self.compact_notice = "已压缩当前 resume 会话上下文。" self.resume_calls: list[str] = [] self.require_resume_before_compact = False + self.rate_limits = rate_limits + self.rate_limit_error = rate_limit_error def clone(self) -> DummyNativeClient: - return self + clone = DummyNativeClient( + list(self._threads), + rate_limits=self.rate_limits, + rate_limit_error=self.rate_limit_error, + ) + clone.compact_notice = self.compact_notice + clone.require_resume_before_compact = self.require_resume_before_compact + return clone async def close(self, timeout: float = 5.0) -> None: return None @@ -58,12 +74,21 @@ async def compact_thread(self, thread_id: str) -> str: self.compact_calls.append(thread_id) return self.compact_notice + async def read_rate_limits(self) -> dict[str, object]: + if self.rate_limit_error is not None: + raise self.rate_limit_error + if self.rate_limits is None: + raise RuntimeError("rate limits unavailable") + return self.rate_limits + def make_service( tmp_path: Path, model_cache_file: Path, *, threads: list[NativeThreadSummary] | None = None, + rate_limits: dict[str, object] | None = None, + rate_limit_error: Exception | None = None, launcher=None, stream_read_limit: int = 1024 * 1024, ) -> CodexBridgeService: @@ -84,7 +109,11 @@ def make_service( archived_sessions_dir=tmp_path / ".codex" / "archived_sessions", ), launcher=launcher, - native_client=DummyNativeClient(threads), + native_client=DummyNativeClient( + threads, + rate_limits=rate_limits, + rate_limit_error=rate_limit_error, + ), which_resolver=lambda _: "/usr/bin/codex", ) @@ -226,6 +255,112 @@ def test_default_preferences_use_codex_config_when_model_cache_is_missing( assert preferences.workdir == str(tmp_path.resolve()) +@pytest.mark.asyncio +async def test_render_status_panel_shows_rate_limit_summary( + tmp_path: Path, + model_cache_file: Path, +) -> None: + primary_resets_at = int( + (datetime.now().astimezone() + timedelta(hours=2, minutes=5)).timestamp() + ) + secondary_resets_at = int( + (datetime.now().astimezone() + timedelta(days=3, hours=4)).timestamp() + ) + service = make_service( + tmp_path, + model_cache_file, + rate_limits={ + "limitId": "codex", + "primary": { + "usedPercent": 48, + "windowDurationMins": 300, + "resetsAt": primary_resets_at, + }, + "secondary": { + "usedPercent": 38, + "windowDurationMins": 10080, + "resetsAt": secondary_resets_at, + }, + }, + ) + session = service.get_session("private_1") + session.context_used_tokens = 12345 + session.context_window_tokens = 200000 + + panel = service.open_status_panel("private_1") + text, markup = await service.render_status_panel("private_1") + primary_reset_text = service._format_status_reset_time(primary_resets_at) # noqa: SLF001 + secondary_reset_text = service._format_status_reset_time(secondary_resets_at) # noqa: SLF001 + + assert "当前额度状态" in text + assert "上午窗口" not in text + assert "下午窗口" not in text + assert "上下文:12,345 / 200,000 tokens" in text + assert "5小时:剩余 52%" in text + assert f"5小时刷新时间:{primary_reset_text}" in text + assert "1周:剩余 62%" in text + assert f"1周刷新时间:{secondary_reset_text}" in text + assert markup.inline_keyboard[0][0].text == "刷新" + assert markup.inline_keyboard[0][0].callback_data == ( + f"cst:{panel.token}:{panel.version}:refresh" + ) + assert markup.inline_keyboard[0][1].text == "关闭" + + +@pytest.mark.asyncio +async def test_render_status_panel_degrades_cleanly_when_rate_limits_unavailable( + tmp_path: Path, + model_cache_file: Path, +) -> None: + service = make_service( + tmp_path, + model_cache_file, + rate_limit_error=RuntimeError("Codex app-server 请求失败。"), + ) + + service.open_status_panel("private_1") + text, _markup = await service.render_status_panel("private_1") + + assert "当前额度状态" in text + assert "上下文:暂不可用" in text + assert "额度状态:暂不可用" in text + assert "Codex app-server 请求失败。" in text + + +@pytest.mark.asyncio +async def test_reset_chat_clears_status_context_usage( + tmp_path: Path, + model_cache_file: Path, +) -> None: + service = make_service(tmp_path, model_cache_file) + session = service.get_session("private_1") + session.context_used_tokens = 12345 + session.context_window_tokens = 200000 + + await service.reset_chat("private_1", keep_active=False) + + assert session.context_used_tokens is None + assert session.context_window_tokens is None + + +@pytest.mark.asyncio +async def test_update_workdir_clears_status_context_usage( + tmp_path: Path, + model_cache_file: Path, +) -> None: + service = make_service(tmp_path, model_cache_file) + session = service.get_session("private_1") + session.context_used_tokens = 12345 + session.context_window_tokens = 200000 + target = tmp_path / "workspace" + target.mkdir() + + await service.update_workdir("private_1", str(target)) + + assert session.context_used_tokens is None + assert session.context_window_tokens is None + + def test_chat_session_tracks_agent_panels_in_creation_order( tmp_path: Path, model_cache_file: Path, diff --git a/tests/test_telegram_commands.py b/tests/test_telegram_commands.py index ebc35a6..2e44435 100644 --- a/tests/test_telegram_commands.py +++ b/tests/test_telegram_commands.py @@ -34,7 +34,7 @@ def test_build_telegram_commands_uses_expected_order_and_chinese_descriptions() {"command": "help", "description": "打开使用引导面板"}, {"command": "start", "description": "打开使用引导面板"}, {"command": "panel", "description": "打开当前工作台"}, - {"command": "status", "description": "打开当前工作台"}, + {"command": "status", "description": "打开额度状态面板"}, {"command": "mode", "description": "查看或切换默认模式"}, {"command": "exec", "description": "以一次性 exec 模式执行任务"}, {"command": "new", "description": "新建当前聊天会话"}, diff --git a/tests/test_telegram_handlers.py b/tests/test_telegram_handlers.py index b87ce4d..c914728 100644 --- a/tests/test_telegram_handlers.py +++ b/tests/test_telegram_handlers.py @@ -16,6 +16,7 @@ encode_browser_callback, encode_history_callback, encode_setting_callback, + encode_status_callback, encode_workspace_callback, ) @@ -149,6 +150,15 @@ def __init__(self) -> None: self.onboarding_markup = SimpleNamespace(name="onboarding") self.workspace_text = "当前工作台" self.workspace_markup = SimpleNamespace(name="workspace") + self.status_text = ( + "当前额度状态\n" + "上下文:12,345 / 200,000 tokens\n" + "5小时:剩余 52%\n" + "5小时刷新时间:03-18 01:24:28\n" + "1周:剩余 62%\n" + "1周刷新时间:03-24 13:04:30" + ) + self.status_markup = SimpleNamespace(name="status") self.default_mode = "resume" self.execute_calls: list[tuple[str, str | None]] = [] self.browser_token = "token" @@ -170,6 +180,9 @@ def __init__(self) -> None: self.workspace_closed = False self.compact_calls: list[str] = [] self.compact_notice = "已压缩当前 resume 会话上下文。" + self.status_token = "status" + self.status_version = 1 + self.status_closed = False self.run_updates: list[tuple[str, Any]] = [] self.run_progress_updates: list[Any] = [] self.run_stream_updates: list[Any] = [] @@ -434,6 +447,49 @@ def navigate_workspace_panel( message_id=1, ) + def open_status_panel(self, chat_key: str) -> SimpleNamespace: + return SimpleNamespace(token=self.status_token) + + async def render_status_panel(self, chat_key: str) -> tuple[str, Any]: + return self.status_text, self.status_markup + + def remember_status_panel_message( + self, chat_key: str, token: str, message_id: int | None + ) -> None: + return None + + def get_status_panel( + self, + chat_key: str, + token: str | None = None, + version: int | None = None, + ) -> SimpleNamespace: + if token is not None and token != self.status_token: + raise ValueError("状态面板已失效,请重新执行 /status") + if version is not None and version != self.status_version: + raise ValueError("状态面板已失效,请重新执行 /status") + return SimpleNamespace( + token=self.status_token, + version=self.status_version, + message_id=1, + ) + + def navigate_status_panel( + self, + chat_key: str, + token: str, + version: int, + action: str, + ) -> SimpleNamespace: + return SimpleNamespace( + token=self.status_token, + version=self.status_version, + message_id=1, + ) + + def close_status_panel(self, chat_key: str, token: str, version: int) -> None: + self.status_closed = True + def make_real_service( tmp_path: Path, @@ -655,6 +711,52 @@ async def test_execute_prompt_keeps_separate_message_pairs_per_agent() -> None: ) +@pytest.mark.asyncio +async def test_execute_prompt_does_not_mark_main_panel_completed_without_final_text( +) -> None: + service = FakeService() + service.run_updates = [ + ( + "progress", + SimpleNamespace( + agent_key="main", + agent_label="主 agent", + text="Codex 运行中…", + ), + ), + ( + "progress", + SimpleNamespace( + agent_key="thread-sub-1", + agent_label="子 agent 1", + text="运行中(tests failed)", + ), + ), + ] + service.run_result = SimpleNamespace( + cancelled=False, + exit_code=0, + final_text="", + notice="", + diagnostics=[], + ) + handlers = TelegramHandlers(service) + bot = FakeBot() + + await handlers.execute_prompt(bot, FakeEvent(""), "hello") + + assert any( + payload["message_id"] == 1 + and payload["text"] == "🧠 主 agent\nCodex 已完成,但没有返回可展示的最终文本。" + for payload in bot.edited + ) + assert not any( + payload["message_id"] == 1 and payload["text"] == "🧠 主 agent\nCodex 已完成。" + for payload in bot.edited + ) + assert bot.sent[-1]["text"] == "Codex 已完成,但没有返回可展示的最终文本。" + + @pytest.mark.asyncio async def test_send_event_message_uses_html_parse_mode_and_renders_text() -> None: handlers = TelegramHandlers(FakeService()) @@ -751,18 +853,80 @@ async def test_handle_compact_compacts_current_resume_chat() -> None: @pytest.mark.asyncio -@pytest.mark.parametrize("handler_name", ["handle_panel", "handle_status"]) -async def test_panel_and_status_open_workspace_panel(handler_name: str) -> None: +async def test_handle_panel_opens_workspace_panel() -> None: service = FakeService() handlers = TelegramHandlers(service) bot = FakeBot() - await getattr(handlers, handler_name)(bot, FakeEvent("")) + await handlers.handle_panel(bot, FakeEvent("")) assert bot.sent[0]["text"] == "当前工作台" assert bot.sent[0]["reply_markup"] is service.workspace_markup +@pytest.mark.asyncio +async def test_handle_status_opens_status_panel() -> None: + service = FakeService() + handlers = TelegramHandlers(service) + bot = FakeBot() + + await handlers.handle_status(bot, FakeEvent("")) + + assert bot.sent[0]["text"] == service.status_text + assert bot.sent[0]["reply_markup"] is service.status_markup + + +@pytest.mark.asyncio +async def test_handle_status_callback_refresh_rerenders_panel() -> None: + service = FakeService() + handlers = TelegramHandlers(service) + bot = FakeBot() + event = FakeCallbackEvent( + encode_status_callback( + service.status_token, + service.status_version, + "refresh", + ) + ) + + await handlers.handle_status_callback(bot, event) + + assert bot.edited[0]["text"] == service.status_text + + +@pytest.mark.asyncio +async def test_handle_status_callback_close_closes_panel() -> None: + service = FakeService() + handlers = TelegramHandlers(service) + bot = FakeBot() + event = FakeCallbackEvent( + encode_status_callback( + service.status_token, + service.status_version, + "close", + ) + ) + + await handlers.handle_status_callback(bot, event) + + assert service.status_closed is True + assert bot.edited[0]["text"] == "状态面板已关闭。" + assert bot.answered[0]["text"] == "已关闭。" + + +@pytest.mark.asyncio +async def test_handle_status_callback_rejects_stale_payload() -> None: + handlers = TelegramHandlers(FakeService()) + bot = FakeBot() + + await handlers.handle_status_callback( + bot, FakeCallbackEvent("cst:stale:1:refresh") + ) + + assert bot.answered[0]["text"] == "状态面板已失效,请重新执行 /status" + assert bot.answered[0]["show_alert"] is True + + @pytest.mark.asyncio @pytest.mark.parametrize( ("action", "expected_text"), From 593d1127319eb26c82c18a8262c959ef121ae88b Mon Sep 17 00:00:00 2001 From: ttiee <469784630@qq.com> Date: Wed, 18 Mar 2026 11:24:48 +0800 Subject: [PATCH 2/7] =?UTF-8?q?=F0=9F=90=9B=20fix(telegram):=20=E4=BF=AE?= =?UTF-8?q?=E6=AD=A3=E4=B8=BB=E5=AD=90=20agent=20=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E5=BD=92=E5=B1=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 修正 native 协议解析只应由主线程的 turn/completed 结束当前轮次,避免子 agent 提前收口主流程。 移除把子 agent final/status 文本回填为主 agent 最终答案的错误回退,避免后续对话误绑定到子 agent。 补充主线程等待、主文本归属和子线程完成顺序的回归测试,并更新维护记录。 --- ...2026-03-17-telegram-subagent-visibility.md | 17 ++- src/nonebot_plugin_codex/native_client.py | 68 ++-------- tests/test_native_client.py | 117 +++++++++++++++++- 3 files changed, 134 insertions(+), 68 deletions(-) diff --git a/docs/maintenance/2026-03-17-telegram-subagent-visibility.md b/docs/maintenance/2026-03-17-telegram-subagent-visibility.md index 9e6491e..4569c72 100644 --- a/docs/maintenance/2026-03-17-telegram-subagent-visibility.md +++ b/docs/maintenance/2026-03-17-telegram-subagent-visibility.md @@ -35,11 +35,11 @@ When the plugin uses the native `codex app-server` lane and Codex delegates work - In follow-up testing, main-agent final text could still be lost when it only existed in `item/agentMessage/delta` frames, because the native fallback looked up the wrong buffered key on `turn/completed`. -- In successful multi-agent turns where the main thread produced no separate final-answer - message after `wait`, the bridge also discarded: - - subagent `agentMessage` items with `phase: "final_answer"` - - completed `wait.agentsStates[*].message` payloads - This left Telegram with a finished run but an empty `final_text`. +- Another follow-up issue appeared after multi-agent support landed: the native runner + treated any `turn/completed` as the end of the active run, including subagent turns. + That could bind later follow-up prompts to the subagent thread instead of the main + thread, and it also made the bridge vulnerable to leaking subagent result text into + the main-agent final-answer path. - When that happened, Telegram could still finalize the main progress panel as `Codex 已完成。`, then separately send `Codex 已完成,但没有返回可展示的最终文本。`, which made successful-looking runs appear to stop right after a subagent failure. @@ -61,9 +61,8 @@ When the plugin uses the native `codex app-server` lane and Codex delegates work - Confirm that progress updates mention both the main agent and the subagent state. - Add a native-client regression where a subagent reports `errored` but the main agent still produces a final answer through delta-only fallback. -- Add native-client regressions where: - - only a subagent `final_answer` exists before `turn/completed` - - only `wait.agentsStates[*].message` exists before `turn/completed` - and confirm the bridge still returns a non-empty `final_text`. +- Add a native-client regression where a subagent emits its own `turn/completed` + before the main thread finishes, and confirm the client waits for the main + `turn/completed` before returning or updating the stored thread id. - Confirm that Telegram uses `Codex 已完成,但没有返回可展示的最终文本。` for the main progress panel instead of a plain `Codex 已完成。` when no final text is available. diff --git a/src/nonebot_plugin_codex/native_client.py b/src/nonebot_plugin_codex/native_client.py index 69e091b..2596467 100644 --- a/src/nonebot_plugin_codex/native_client.py +++ b/src/nonebot_plugin_codex/native_client.py @@ -211,41 +211,6 @@ def _format_collab_tool_progress( return updates - -def _latest_completed_subagent_status_message( - item: dict[str, Any], - *, - main_thread_id: str, -) -> str: - receiver_ids = item.get("receiverThreadIds") - agent_states = item.get("agentsStates") - if not isinstance(agent_states, dict): - return "" - - ordered_ids: list[str] = [] - if isinstance(receiver_ids, list): - for entry in receiver_ids: - if isinstance(entry, str) and entry and entry not in ordered_ids: - ordered_ids.append(entry) - for entry in agent_states: - if isinstance(entry, str) and entry and entry not in ordered_ids: - ordered_ids.append(entry) - - latest_message = "" - for agent_id in ordered_ids: - if _normalize_agent_key(agent_id, main_thread_id=main_thread_id) == "main": - continue - state = agent_states.get(agent_id) - if not isinstance(state, dict): - continue - if str(state.get("status") or "") != "completed": - continue - message = state.get("message") - if isinstance(message, str) and message.strip(): - latest_message = message.strip() - return latest_message - - async def _terminate_process(process: Any, timeout: float) -> None: if process is None: return @@ -358,8 +323,6 @@ async def run_turn( ) -> NativeRunResult: diagnostics: list[str] = [] final_text = "" - last_subagent_final_text = "" - last_completed_subagent_status_message = "" pending_agent_messages: dict[str, str] = {} last_streamed_text: dict[str, str] = {} last_compaction_notice: dict[str, str] = {} @@ -432,13 +395,6 @@ async def emit_compaction_notice(agent_key: str, text: str) -> None: ) continue if item_type == "collabAgentToolCall": - if method == "item/completed": - latest_message = _latest_completed_subagent_status_message( - item, - main_thread_id=thread_id, - ) - if latest_message: - last_completed_subagent_status_message = latest_message collab_updates = _format_collab_tool_progress( item, main_thread_id=thread_id, @@ -467,8 +423,6 @@ async def emit_compaction_notice(agent_key: str, text: str) -> None: if phase != "commentary": if agent_key == "main": final_text = stripped - else: - last_subagent_final_text = stripped continue if method == "item/agentMessage/delta": @@ -503,6 +457,12 @@ async def emit_compaction_notice(agent_key: str, text: str) -> None: continue if method == "turn/completed": + completed_agent_key = _normalize_agent_key( + params.get("threadId"), + main_thread_id=thread_id, + ) + if completed_agent_key != "main": + continue turn = params.get("turn") if not isinstance(turn, dict): return NativeRunResult( @@ -525,15 +485,6 @@ async def emit_compaction_notice(agent_key: str, text: str) -> None: if buffered_text: final_text = buffered_text await emit_stream_update("main", final_text) - if not final_text and last_subagent_final_text: - final_text = last_subagent_final_text - await emit_stream_update("main", final_text) - if ( - not final_text - and last_completed_subagent_status_message - ): - final_text = last_completed_subagent_status_message - await emit_stream_update("main", final_text) status = turn.get("status") error = turn.get("error") exit_code = 0 if status == "completed" and error is None else 1 @@ -635,6 +586,13 @@ async def list_threads(self) -> list[NativeThreadSummary]: return threads + async def read_rate_limits(self) -> dict[str, Any]: + result = await self._request("account/rateLimits/read", {}) + snapshot = result.get("rateLimits") + if not isinstance(snapshot, dict): + raise RuntimeError("account/rateLimits/read 缺少 rateLimits 响应。") + return snapshot + def _permission_params(self, permission_mode: str) -> dict[str, str]: if permission_mode == "safe": return {"approvalPolicy": "never", "sandbox": "workspace-write"} diff --git a/tests/test_native_client.py b/tests/test_native_client.py index b4ecd1f..7ca1127 100644 --- a/tests/test_native_client.py +++ b/tests/test_native_client.py @@ -788,7 +788,7 @@ async def launcher(*_args: Any, **_kwargs: Any) -> FakeProcess: @pytest.mark.asyncio -async def test_native_client_uses_subagent_final_answer_when_main_turn_has_no_final_text( +async def test_native_client_does_not_use_subagent_final_answer_as_main_final_text( ) -> None: process = FakeProcess( stdout=FakeStdout( @@ -889,11 +889,11 @@ async def launcher(*_args: Any, **_kwargs: Any) -> FakeProcess: result = await client.run_turn(thread.thread_id, "hello") assert result.exit_code == 0 - assert result.final_text == "subagent final answer" + assert result.final_text == "" @pytest.mark.asyncio -async def test_native_client_uses_completed_wait_message_when_no_agent_final_text_exists( +async def test_native_client_does_not_use_wait_status_message_as_main_final_text( ) -> None: process = FakeProcess( stdout=FakeStdout( @@ -978,7 +978,116 @@ async def launcher(*_args: Any, **_kwargs: Any) -> FakeProcess: result = await client.run_turn(thread.thread_id, "hello") assert result.exit_code == 0 - assert result.final_text == "tests ready" + assert result.final_text == "" + + +@pytest.mark.asyncio +async def test_native_client_ignores_subagent_turn_completed_until_main_turn_finishes( +) -> None: + process = FakeProcess( + stdout=FakeStdout( + [ + json.dumps({"jsonrpc": "2.0", "id": 1, "result": {}}) + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "id": 2, + "result": { + "thread": { + "id": "thread-main", + "name": "Main Thread", + "updatedAt": "2025-03-01T00:00:00Z", + "cwd": "/tmp/work", + "source": "cli", + } + }, + } + ) + + "\n", + json.dumps({"jsonrpc": "2.0", "id": 3, "result": {}}) + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "method": "turn/started", + "params": {"threadId": "thread-main"}, + } + ) + + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "method": "item/completed", + "params": { + "threadId": "thread-sub-1", + "item": { + "id": "msg-sub-final", + "type": "agentMessage", + "text": "subagent final answer", + "phase": "final_answer", + }, + }, + } + ) + + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "method": "turn/completed", + "params": { + "threadId": "thread-sub-1", + "turn": {"status": "completed", "error": None}, + }, + } + ) + + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "method": "item/completed", + "params": { + "threadId": "thread-main", + "item": { + "id": "msg-main-final", + "type": "agentMessage", + "text": "main final answer", + "phase": "final_answer", + }, + }, + } + ) + + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "method": "turn/completed", + "params": { + "threadId": "thread-main", + "turn": {"status": "completed", "error": None}, + }, + } + ) + + "\n", + ] + ), + stdin=FakeStdin(), + ) + + async def launcher(*_args: Any, **_kwargs: Any) -> FakeProcess: + return process + + client = NativeCodexClient(binary="codex", launcher=launcher) + + thread = await client.start_thread( + workdir="/tmp/work", + model="gpt-5", + reasoning_effort="xhigh", + permission_mode="safe", + ) + result = await client.run_turn(thread.thread_id, "hello") + + assert result.exit_code == 0 + assert result.thread_id == "thread-main" + assert result.final_text == "main final answer" @pytest.mark.asyncio From 464c80073ba7be2f7083af4a9bc1104573e62f1f Mon Sep 17 00:00:00 2001 From: ttiee <469784630@qq.com> Date: Wed, 18 Mar 2026 13:44:21 +0800 Subject: [PATCH 3/7] =?UTF-8?q?=E2=9C=A8=20feat(telegram):=20=E5=A2=9E?= =?UTF-8?q?=E5=BC=BA=E7=8A=B6=E6=80=81=E9=9D=A2=E6=9D=BF=E5=B1=95=E7=A4=BA?= =?UTF-8?q?=E4=B8=8A=E4=B8=8B=E6=96=87=E4=B8=8E=E9=A2=9D=E5=BA=A6=E4=BF=A1?= =?UTF-8?q?=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/nonebot_plugin_codex/native_client.py | 35 +++++++- src/nonebot_plugin_codex/service.py | 89 +++++++++++++------ tests/test_native_client.py | 100 ++++++++++++++++++++++ 3 files changed, 195 insertions(+), 29 deletions(-) diff --git a/src/nonebot_plugin_codex/native_client.py b/src/nonebot_plugin_codex/native_client.py index 2596467..18e1a52 100644 --- a/src/nonebot_plugin_codex/native_client.py +++ b/src/nonebot_plugin_codex/native_client.py @@ -15,7 +15,14 @@ class NativeAgentUpdate: text: str -Callback = Callable[[NativeAgentUpdate], object] +@dataclass(slots=True) +class NativeTokenUsage: + total_tokens: int + model_context_window: int | None = None + + +Callback = Callable[[Any], object] +TokenUsageCallback = Callable[[NativeTokenUsage], object] ProcessLauncher = Callable[..., Awaitable[Any]] @@ -65,7 +72,7 @@ def _thread_summary_from_payload(thread: dict[str, Any]) -> NativeThreadSummary: ) -async def _maybe_call(callback: Callback | None, update: NativeAgentUpdate) -> None: +async def _maybe_call(callback: Callback | None, update: Any) -> None: if callback is None: return result = callback(update) @@ -320,6 +327,7 @@ async def run_turn( reasoning_effort: str | None = None, on_progress: Callback | None = None, on_stream_text: Callback | None = None, + on_token_usage: TokenUsageCallback | None = None, ) -> NativeRunResult: diagnostics: list[str] = [] final_text = "" @@ -454,6 +462,29 @@ async def emit_compaction_notice(agent_key: str, text: str) -> None: ) notice = _extract_compaction_notice(params) or "已压缩较早对话上下文。" await emit_compaction_notice(agent_key, notice) + + if method == "thread/tokenUsage/updated": + token_usage = params.get("tokenUsage") + if not isinstance(token_usage, dict): + continue + total = token_usage.get("total") + total_tokens = ( + total.get("totalTokens") if isinstance(total, dict) else None + ) + model_context_window = token_usage.get("modelContextWindow") + if not isinstance(total_tokens, int): + continue + if model_context_window is not None and not isinstance( + model_context_window, int + ): + model_context_window = None + await _maybe_call( + on_token_usage, + NativeTokenUsage( + total_tokens=total_tokens, + model_context_window=model_context_window, + ), + ) continue if method == "turn/completed": diff --git a/src/nonebot_plugin_codex/service.py b/src/nonebot_plugin_codex/service.py index e16f743..de5c6f6 100644 --- a/src/nonebot_plugin_codex/service.py +++ b/src/nonebot_plugin_codex/service.py @@ -10,14 +10,14 @@ except ModuleNotFoundError: # pragma: no cover - Python < 3.11 import tomli as tomllib from pathlib import Path -from datetime import datetime, timedelta, timezone +from datetime import datetime, timezone from typing import Any from collections.abc import Callable, Awaitable from dataclasses import field, asdict, dataclass from nonebot.adapters.telegram.model import InlineKeyboardButton, InlineKeyboardMarkup -from .native_client import NativeAgentUpdate, NativeCodexClient +from .native_client import NativeAgentUpdate, NativeCodexClient, NativeTokenUsage from .protocol_io import NdjsonProcessReader, ProtocolStreamError @dataclass(slots=True) @@ -125,6 +125,8 @@ class ChatSession: progress_lines: list[str] = field(default_factory=list) diagnostics: list[str] = field(default_factory=list) cancel_requested: bool = False + context_used_tokens: int | None = None + context_window_tokens: int | None = None @dataclass(slots=True) @@ -822,6 +824,15 @@ def _format_history_local_time(self, value: str) -> str: return value return parsed.astimezone().strftime("%Y-%m-%d %H:%M:%S") + def _format_status_reset_time(self, value: object) -> str: + if not isinstance(value, (int, float)): + return "未知" + try: + parsed = datetime.fromtimestamp(value, tz=timezone.utc) + except (OverflowError, OSError, ValueError): + return "未知" + return parsed.astimezone().strftime("%Y-%m-%d %H:%M:%S") + def _is_noise_history_text(self, text: str) -> bool: lowered = text.strip().lower() if not lowered: @@ -2110,41 +2121,53 @@ def navigate_status_panel( raise ValueError("未知状态面板操作。") return self._replace_status_panel_state(chat_key, previous=panel) + def _format_status_context_line(self, session: ChatSession | None) -> str: + if session is None: + return "上下文:暂不可用" + used_tokens = session.context_used_tokens + window_tokens = session.context_window_tokens + if not isinstance(used_tokens, int) or not isinstance(window_tokens, int): + return "上下文:暂不可用" + return f"上下文:{used_tokens:,} / {window_tokens:,} tokens" + + def _format_status_rate_limit_bucket( + self, + label: str, + bucket: object, + ) -> list[str]: + if not isinstance(bucket, dict): + return [f"{label}:暂不可用", f"{label} 刷新时间:未知"] + + used_percent = bucket.get("usedPercent") + if not isinstance(used_percent, int): + return [f"{label}:暂不可用", f"{label} 刷新时间:未知"] + + used_percent = max(0, min(used_percent, 100)) + remaining_percent = max(0, 100 - used_percent) + return [ + f"{label}:{used_percent}% 已用,{remaining_percent}% 剩余", + f"{label} 刷新时间:{self._format_status_reset_time(bucket.get('resetsAt'))}", + ] + async def render_status_panel( self, chat_key: str ) -> tuple[str, InlineKeyboardMarkup]: panel = self.get_status_panel(chat_key) - now = datetime.now().astimezone() - current_window = "上午窗口" if now.hour < 12 else "下午窗口" - lines = ["当前额度状态", f"当前窗口:{current_window}"] + lines = [ + "当前额度状态", + self._format_status_context_line(self.sessions.get(chat_key)), + ] runner = self._spawn_native_client() try: if runner is None: - raise RuntimeError("Native Codex client is not configured.") + raise RuntimeError("当前环境未启用 Codex app-server 额度查询。") limits = await runner.read_rate_limits() - primary = limits.get("primary") - if not isinstance(primary, dict): - raise RuntimeError("额度状态响应缺少 primary 字段。") - - used_percent = int(primary.get("usedPercent") or 0) - remaining_percent = max(0, 100 - used_percent) - resets_at = int(primary.get("resetsAt") or 0) - if resets_at > 0: - reset_time = datetime.fromtimestamp(resets_at, tz=now.tzinfo) - else: - reset_time = now - delta = max(reset_time - now, timedelta()) - hours, remainder = divmod(int(delta.total_seconds()), 3600) - minutes = remainder // 60 - lines.extend( - [ - f"已使用:{used_percent}%", - f"剩余:{remaining_percent}%", - f"刷新时间:{reset_time.strftime('%Y-%m-%d %H:%M:%S %z')}", - f"距离刷新:{hours}小时{minutes}分钟", - ] + self._format_status_rate_limit_bucket("额度 1", limits.get("primary")) + ) + lines.extend( + self._format_status_rate_limit_bucket("额度 2", limits.get("secondary")) ) except Exception as exc: lines.extend(["额度状态:暂不可用", str(exc) or "未知错误。"]) @@ -3483,6 +3506,10 @@ async def forward_stream_text(update: NativeAgentUpdate) -> None: reasoning_effort=preferences.reasoning_effort, on_progress=forward_progress, on_stream_text=forward_stream_text, + on_token_usage=lambda update: self._apply_token_usage_update( + session, + update, + ), ) final_thread_id = native_result.thread_id or thread.thread_id self._set_native_thread_id(session, final_thread_id) @@ -3520,3 +3547,11 @@ async def forward_stream_text(update: NativeAgentUpdate) -> None: session.native_runner = None session.runner_task = None session.cancel_requested = False + + def _apply_token_usage_update( + self, + session: ChatSession, + update: NativeTokenUsage, + ) -> None: + session.context_used_tokens = update.total_tokens + session.context_window_tokens = update.model_context_window diff --git a/tests/test_native_client.py b/tests/test_native_client.py index 7ca1127..38175ad 100644 --- a/tests/test_native_client.py +++ b/tests/test_native_client.py @@ -158,6 +158,20 @@ async def test_native_client_run_turn_reports_context_compaction_progress() -> N "type": "contextCompaction", "summary": "已压缩较早对话上下文。", }, +======= + json.dumps( + { + "jsonrpc": "2.0", + "id": 2, + "result": { + "thread": { + "id": "thread-1", + "name": "Thread One", + "updatedAt": "2025-03-01T00:00:00Z", + "cwd": "/tmp/work", + "source": "cli", + } +>>>>>>> ✨ feat(telegram): 增强状态面板展示上下文与额度信息 }, } ) @@ -206,6 +220,92 @@ async def launcher(*_args: Any, **_kwargs: Any) -> FakeProcess: assert result.final_text == "hello" +@pytest.mark.asyncio +async def test_native_client_reports_thread_token_usage_updates() -> None: + process = FakeProcess( + stdout=FakeStdout( + [ + json.dumps({"jsonrpc": "2.0", "id": 1, "result": {}}) + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "id": 2, + "result": { + "thread": { + "id": "thread-1", + "name": "Thread One", + "updatedAt": "2025-03-01T00:00:00Z", + "cwd": "/tmp/work", + "source": "cli", + } + }, + } + ) + + "\n", + json.dumps({"jsonrpc": "2.0", "id": 3, "result": {}}) + "\n", + json.dumps({"jsonrpc": "2.0", "method": "turn/started", "params": {}}) + + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "method": "thread/tokenUsage/updated", + "params": { + "threadId": "thread-1", + "turnId": "turn-1", + "tokenUsage": { + "modelContextWindow": 200000, + "total": {"totalTokens": 12345}, + "last": { + "cachedInputTokens": 0, + "inputTokens": 100, + "outputTokens": 20, + "reasoningOutputTokens": 30, + "totalTokens": 150, + }, + }, + }, + } + ) + + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "method": "turn/completed", + "params": { + "threadId": "thread-1", + "turn": {"status": "completed", "error": None}, + }, + } + ) + + "\n", + ] + ), + stdin=FakeStdin(), + ) + + async def launcher(*_args: Any, **_kwargs: Any) -> FakeProcess: + return process + + client = NativeCodexClient(binary="codex", launcher=launcher) + thread = await client.start_thread( + workdir="/tmp/work", + model="gpt-5", + reasoning_effort="xhigh", + permission_mode="safe", + ) + token_usage_updates: list[tuple[int, int | None]] = [] + + await client.run_turn( + thread.thread_id, + "hello", + on_token_usage=lambda update: token_usage_updates.append( + (update.total_tokens, update.model_context_window) + ), + ) + + assert token_usage_updates == [(12345, 200000)] + + @pytest.mark.asyncio async def test_native_client_compact_thread_waits_for_compaction_notice() -> None: process = FakeProcess( From 17dcf88951a0421873d813e74fcd761584017aef Mon Sep 17 00:00:00 2001 From: ttiee <469784630@qq.com> Date: Wed, 18 Mar 2026 13:44:21 +0800 Subject: [PATCH 4/7] =?UTF-8?q?=F0=9F=90=9B=20fix(service):=20=E9=87=8D?= =?UTF-8?q?=E7=BD=AE=E4=BC=9A=E8=AF=9D=E6=97=B6=E6=B8=85=E7=90=86=E7=8A=B6?= =?UTF-8?q?=E6=80=81=E9=9D=A2=E6=9D=BF=E4=B8=8A=E4=B8=8B=E6=96=87=E6=AE=8B?= =?UTF-8?q?=E7=95=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/nonebot_plugin_codex/service.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/nonebot_plugin_codex/service.py b/src/nonebot_plugin_codex/service.py index de5c6f6..28a1cd0 100644 --- a/src/nonebot_plugin_codex/service.py +++ b/src/nonebot_plugin_codex/service.py @@ -2657,12 +2657,17 @@ def _set_native_thread_id(self, session: ChatSession, thread_id: str | None) -> if session.active_mode == "resume": session.thread_id = thread_id + def _clear_status_context_usage(self, session: ChatSession) -> None: + session.context_used_tokens = None + session.context_window_tokens = None + def _clear_thread_only(self, chat_key: str) -> None: session = self.get_session(chat_key) session.native_thread_id = None session.thread_id = None session.exec_thread_id = None session.strict_resume = False + self._clear_status_context_usage(session) def _browser_total_pages(self, entries: list[DirectoryEntry]) -> int: return max(1, (len(entries) + BROWSER_PAGE_SIZE - 1) // BROWSER_PAGE_SIZE) @@ -3046,6 +3051,7 @@ async def reset_chat(self, chat_key: str, *, keep_active: bool) -> ChatSession: session.exec_thread_id = None session.thread_id = None session.strict_resume = False + self._clear_status_context_usage(session) session.running = False session.process = None session.native_runner = None From 1cdc78ebd03b6a0356456169ea86f1e4647ae1a6 Mon Sep 17 00:00:00 2001 From: ttiee <469784630@qq.com> Date: Wed, 18 Mar 2026 13:44:21 +0800 Subject: [PATCH 5/7] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20refactor(telegram):=20?= =?UTF-8?q?=E7=AE=80=E5=8C=96=E7=8A=B6=E6=80=81=E9=9D=A2=E6=9D=BF=E9=A2=9D?= =?UTF-8?q?=E5=BA=A6=E5=B1=95=E7=A4=BA=E6=96=87=E6=A1=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/nonebot_plugin_codex/service.py | 40 ++++++++++++++++++++++++----- 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/src/nonebot_plugin_codex/service.py b/src/nonebot_plugin_codex/service.py index 28a1cd0..699d73b 100644 --- a/src/nonebot_plugin_codex/service.py +++ b/src/nonebot_plugin_codex/service.py @@ -831,7 +831,28 @@ def _format_status_reset_time(self, value: object) -> str: parsed = datetime.fromtimestamp(value, tz=timezone.utc) except (OverflowError, OSError, ValueError): return "未知" - return parsed.astimezone().strftime("%Y-%m-%d %H:%M:%S") + return parsed.astimezone().strftime("%m-%d %H:%M:%S") + + def _format_status_bucket_label(self, bucket: object, fallback: str) -> str: + if not isinstance(bucket, dict): + return fallback + window_minutes = bucket.get("windowDurationMins") + if window_minutes == 300: + return "5小时" + if window_minutes == 10080: + return "1周" + if isinstance(window_minutes, int) and window_minutes > 0: + if window_minutes % (60 * 24 * 7) == 0: + weeks = window_minutes // (60 * 24 * 7) + return f"{weeks}周" + if window_minutes % (60 * 24) == 0: + days = window_minutes // (60 * 24) + return f"{days}天" + if window_minutes % 60 == 0: + hours = window_minutes // 60 + return f"{hours}小时" + return f"{window_minutes}分钟" + return fallback def _is_noise_history_text(self, text: str) -> bool: lowered = text.strip().lower() @@ -2132,9 +2153,10 @@ def _format_status_context_line(self, session: ChatSession | None) -> str: def _format_status_rate_limit_bucket( self, - label: str, bucket: object, + fallback_label: str, ) -> list[str]: + label = self._format_status_bucket_label(bucket, fallback_label) if not isinstance(bucket, dict): return [f"{label}:暂不可用", f"{label} 刷新时间:未知"] @@ -2145,8 +2167,8 @@ def _format_status_rate_limit_bucket( used_percent = max(0, min(used_percent, 100)) remaining_percent = max(0, 100 - used_percent) return [ - f"{label}:{used_percent}% 已用,{remaining_percent}% 剩余", - f"{label} 刷新时间:{self._format_status_reset_time(bucket.get('resetsAt'))}", + f"{label}:剩余 {remaining_percent}%", + f"{label}刷新时间:{self._format_status_reset_time(bucket.get('resetsAt'))}", ] async def render_status_panel( @@ -2164,10 +2186,16 @@ async def render_status_panel( raise RuntimeError("当前环境未启用 Codex app-server 额度查询。") limits = await runner.read_rate_limits() lines.extend( - self._format_status_rate_limit_bucket("额度 1", limits.get("primary")) + self._format_status_rate_limit_bucket( + limits.get("primary"), + "额度 1", + ) ) lines.extend( - self._format_status_rate_limit_bucket("额度 2", limits.get("secondary")) + self._format_status_rate_limit_bucket( + limits.get("secondary"), + "额度 2", + ) ) except Exception as exc: lines.extend(["额度状态:暂不可用", str(exc) or "未知错误。"]) From a018fe497135f6787f0d55ab6881bbb56f573b20 Mon Sep 17 00:00:00 2001 From: ttiee <469784630@qq.com> Date: Wed, 18 Mar 2026 14:04:50 +0800 Subject: [PATCH 6/7] =?UTF-8?q?=F0=9F=90=9B=20fix(test):=20=E6=B8=85?= =?UTF-8?q?=E7=90=86=20rebase=20=E9=81=97=E7=95=99=E5=86=B2=E7=AA=81?= =?UTF-8?q?=E6=A0=87=E8=AE=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/test_native_client.py | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/tests/test_native_client.py b/tests/test_native_client.py index 38175ad..0d1a174 100644 --- a/tests/test_native_client.py +++ b/tests/test_native_client.py @@ -158,20 +158,6 @@ async def test_native_client_run_turn_reports_context_compaction_progress() -> N "type": "contextCompaction", "summary": "已压缩较早对话上下文。", }, -======= - json.dumps( - { - "jsonrpc": "2.0", - "id": 2, - "result": { - "thread": { - "id": "thread-1", - "name": "Thread One", - "updatedAt": "2025-03-01T00:00:00Z", - "cwd": "/tmp/work", - "source": "cli", - } ->>>>>>> ✨ feat(telegram): 增强状态面板展示上下文与额度信息 }, } ) From 44c53aac5affc67ce2134c053944e2f6e864758c Mon Sep 17 00:00:00 2001 From: ttiee <469784630@qq.com> Date: Wed, 18 Mar 2026 17:36:10 +0800 Subject: [PATCH 7/7] =?UTF-8?q?=F0=9F=90=9B=20fix(native):=20=E8=BF=87?= =?UTF-8?q?=E6=BB=A4=E5=AD=90=E7=BA=BF=E7=A8=8B=20token=20usage=20?= =?UTF-8?q?=E5=B9=B6=E9=81=BF=E5=85=8D=20commentary=20fallback?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/nonebot_plugin_codex/native_client.py | 31 +++- tests/test_native_client.py | 191 ++++++++++++++++++++++ 2 files changed, 219 insertions(+), 3 deletions(-) diff --git a/src/nonebot_plugin_codex/native_client.py b/src/nonebot_plugin_codex/native_client.py index 18e1a52..660c79b 100644 --- a/src/nonebot_plugin_codex/native_client.py +++ b/src/nonebot_plugin_codex/native_client.py @@ -332,6 +332,7 @@ async def run_turn( diagnostics: list[str] = [] final_text = "" pending_agent_messages: dict[str, str] = {} + pending_agent_message_phases: dict[str, str | None] = {} last_streamed_text: dict[str, str] = {} last_compaction_notice: dict[str, str] = {} @@ -421,11 +422,25 @@ async def emit_compaction_notice(agent_key: str, text: str) -> None: continue if item_type == "agentMessage": item_id = item.get("id") - if isinstance(item_id, str) and item_id: - pending_agent_messages.pop(f"{agent_key}:{item_id}", None) + phase = item.get("phase") + item_key = ( + f"{agent_key}:{item_id}" + if isinstance(item_id, str) and item_id + else None + ) + if item_key is not None: + pending_agent_message_phases[item_key] = ( + phase if isinstance(phase, str) else None + ) + if ( + method == "item/completed" + and isinstance(item_id, str) + and item_id + ): + pending_agent_messages.pop(item_key, None) + pending_agent_message_phases.pop(item_key, None) text = item.get("text") if isinstance(text, str) and text.strip(): - phase = item.get("phase") stripped = text.strip() await emit_stream_update(agent_key, stripped) if phase != "commentary": @@ -464,6 +479,12 @@ async def emit_compaction_notice(agent_key: str, text: str) -> None: await emit_compaction_notice(agent_key, notice) if method == "thread/tokenUsage/updated": + agent_key = _normalize_agent_key( + params.get("threadId"), + main_thread_id=thread_id, + ) + if agent_key != "main": + continue token_usage = params.get("tokenUsage") if not isinstance(token_usage, dict): continue @@ -511,6 +532,10 @@ async def emit_compaction_notice(agent_key: str, text: str) -> None: ), None, ) + if fallback_key is not None: + fallback_phase = pending_agent_message_phases.get(fallback_key) + if fallback_phase == "commentary": + fallback_key = None if fallback_key is not None: buffered_text = pending_agent_messages[fallback_key].strip() if buffered_text: diff --git a/tests/test_native_client.py b/tests/test_native_client.py index 0d1a174..480b9df 100644 --- a/tests/test_native_client.py +++ b/tests/test_native_client.py @@ -292,6 +292,112 @@ async def launcher(*_args: Any, **_kwargs: Any) -> FakeProcess: assert token_usage_updates == [(12345, 200000)] +@pytest.mark.asyncio +async def test_native_client_ignores_subagent_thread_token_usage_updates() -> None: + process = FakeProcess( + stdout=FakeStdout( + [ + json.dumps({"jsonrpc": "2.0", "id": 1, "result": {}}) + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "id": 2, + "result": { + "thread": { + "id": "thread-main", + "name": "Main Thread", + "updatedAt": "2025-03-01T00:00:00Z", + "cwd": "/tmp/work", + "source": "cli", + } + }, + } + ) + + "\n", + json.dumps({"jsonrpc": "2.0", "id": 3, "result": {}}) + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "method": "thread/tokenUsage/updated", + "params": { + "threadId": "thread-sub-1", + "turnId": "turn-1", + "tokenUsage": { + "modelContextWindow": 999999, + "total": {"totalTokens": 55555}, + "last": { + "cachedInputTokens": 0, + "inputTokens": 100, + "outputTokens": 20, + "reasoningOutputTokens": 30, + "totalTokens": 150, + }, + }, + }, + } + ) + + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "method": "thread/tokenUsage/updated", + "params": { + "threadId": "thread-main", + "turnId": "turn-1", + "tokenUsage": { + "modelContextWindow": 200000, + "total": {"totalTokens": 12345}, + "last": { + "cachedInputTokens": 0, + "inputTokens": 100, + "outputTokens": 20, + "reasoningOutputTokens": 30, + "totalTokens": 150, + }, + }, + }, + } + ) + + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "method": "turn/completed", + "params": { + "threadId": "thread-main", + "turn": {"status": "completed", "error": None}, + }, + } + ) + + "\n", + ] + ), + stdin=FakeStdin(), + ) + + async def launcher(*_args: Any, **_kwargs: Any) -> FakeProcess: + return process + + client = NativeCodexClient(binary="codex", launcher=launcher) + thread = await client.start_thread( + workdir="/tmp/work", + model="gpt-5", + reasoning_effort="xhigh", + permission_mode="safe", + ) + token_usage_updates: list[tuple[int, int | None]] = [] + + await client.run_turn( + thread.thread_id, + "hello", + on_token_usage=lambda update: token_usage_updates.append( + (update.total_tokens, update.model_context_window) + ), + ) + + assert token_usage_updates == [(12345, 200000)] + + @pytest.mark.asyncio async def test_native_client_compact_thread_waits_for_compaction_notice() -> None: process = FakeProcess( @@ -765,6 +871,91 @@ async def launcher(*_args: Any, **_kwargs: Any) -> FakeProcess: assert result.final_text == "main final only from delta" +@pytest.mark.asyncio +async def test_native_client_does_not_use_commentary_delta_as_main_final_text() -> None: + process = FakeProcess( + stdout=FakeStdout( + [ + json.dumps({"jsonrpc": "2.0", "id": 1, "result": {}}) + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "id": 2, + "result": { + "thread": { + "id": "thread-1", + "name": "Thread One", + "updatedAt": "2025-03-01T00:00:00Z", + "cwd": "/tmp/work", + "source": "cli", + } + }, + } + ) + + "\n", + json.dumps({"jsonrpc": "2.0", "id": 3, "result": {}}) + "\n", + json.dumps({"jsonrpc": "2.0", "method": "turn/started", "params": {}}) + + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "method": "item/started", + "params": { + "threadId": "thread-1", + "item": { + "id": "msg-main-commentary", + "type": "agentMessage", + "text": "", + "phase": "commentary", + }, + }, + } + ) + + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "method": "item/agentMessage/delta", + "params": { + "threadId": "thread-1", + "itemId": "msg-main-commentary", + "delta": "main commentary only", + }, + } + ) + + "\n", + json.dumps( + { + "jsonrpc": "2.0", + "method": "turn/completed", + "params": { + "threadId": "thread-1", + "turn": {"status": "completed", "error": None}, + }, + } + ) + + "\n", + ] + ), + stdin=FakeStdin(), + ) + + async def launcher(*_args: Any, **_kwargs: Any) -> FakeProcess: + return process + + client = NativeCodexClient(binary="codex", launcher=launcher) + thread = await client.start_thread( + workdir="/tmp/work", + model="gpt-5", + reasoning_effort="xhigh", + permission_mode="safe", + ) + result = await client.run_turn(thread.thread_id, "hello") + + assert result.exit_code == 0 + assert result.final_text == "" + + @pytest.mark.asyncio async def test_native_client_keeps_main_final_text_after_subagent_error() -> None: process = FakeProcess(