diff --git a/src/octopal/runtime/octo/control_replies.py b/src/octopal/runtime/octo/control_replies.py index 8b34dd8..c3490d6 100644 --- a/src/octopal/runtime/octo/control_replies.py +++ b/src/octopal/runtime/octo/control_replies.py @@ -113,6 +113,8 @@ def _coerce_scheduled_octo_control_reply(text: str) -> str: async def _normalize_scheduled_octo_control_reply( provider: InferenceProvider | None, text: str, + *, + bounded_control: bool = True, ) -> str: raw_value = str(text or "") explicit = extract_heartbeat_user_visible_message(raw_value) @@ -129,15 +131,27 @@ async def _normalize_scheduled_octo_control_reply( if provider is None: return _coerce_scheduled_octo_control_reply(value) + if bounded_control: + blocked_rule = ( + "Use SCHEDULED_TASK_BLOCKED when the task cannot complete from the bounded route " + "because it needs workers, external access, or unavailable tools." + ) + rewrite_context = "scheduled Octo control" + else: + blocked_rule = ( + "Use SCHEDULED_TASK_BLOCKED only when the task cannot complete even with the normal " + "scheduled Octo task toolset." + ) + rewrite_context = "scheduled Octo task" rewrite_prompt = ( - "Rewrite the draft scheduled Octo control reply into the strict completion contract.\n" + f"Rewrite the draft {rewrite_context} reply into the strict completion contract.\n" "Return exactly one of:\n" "- SCHEDULED_TASK_DONE\n" "- SCHEDULED_TASK_BLOCKED\n" "- NO_USER_RESPONSE\n" "- ...\n" "Use SCHEDULED_TASK_DONE only if the task completed successfully with no user-visible update.\n" - "Use SCHEDULED_TASK_BLOCKED when the task cannot complete from the bounded route because it needs workers, external access, or unavailable tools.\n" + f"{blocked_rule}\n" "Use only for a concise completed user-facing update.\n" "Use NO_USER_RESPONSE if the task did not complete or there is no completion signal.\n" "Do not include any extra text outside the token or wrapper." diff --git a/src/octopal/runtime/octo/core.py b/src/octopal/runtime/octo/core.py index 6a2013d..5c6648f 100644 --- a/src/octopal/runtime/octo/core.py +++ b/src/octopal/runtime/octo/core.py @@ -58,6 +58,7 @@ from octopal.runtime.octo.router import route_or_reply as route_or_reply from octopal.runtime.octo.router import route_proactive_tick as route_proactive_tick from octopal.runtime.octo.router import route_scheduled_octo_control as route_scheduled_octo_control +from octopal.runtime.octo.router import route_scheduled_octo_task as route_scheduled_octo_task from octopal.runtime.octo.router import route_scheduler_tick as route_scheduler_tick from octopal.runtime.octo.runtime_config import _env_flag, _env_int from octopal.runtime.octo.scheduled_runtime import OctoScheduledRuntimeMixin diff --git a/src/octopal/runtime/octo/router.py b/src/octopal/runtime/octo/router.py index 4f86b9d..7e526be 100644 --- a/src/octopal/runtime/octo/router.py +++ b/src/octopal/runtime/octo/router.py @@ -173,6 +173,7 @@ _SCHEDULER_ALLOWED_TOOL_NAMES = { "check_schedule", "scheduler_status", + "repair_scheduled_tasks", "octo_context_health", "list_workers", "list_active_workers", @@ -656,6 +657,8 @@ async def route_scheduler_tick( "Scheduler route rules:\n" "- Keep this turn operational and bounded.\n" "- You may inspect schedule state and worker availability.\n" + "- You may apply safe scheduled-task route repairs with repair_scheduled_tasks(apply=true) " + "when the candidate is unambiguous.\n" "- Do not dispatch workers directly from this route.\n" "- Return one of: SCHEDULER_IDLE, NO_USER_RESPONSE, or ...." ), @@ -721,8 +724,8 @@ async def route_proactive_tick( "- Keep this turn bounded to initiative discovery and self-queue maintenance.\n" "- You may add, claim, execute, cancel, or mark self-queue items only when the payload supports it.\n" "- You may preview scheduled-task repair candidates with repair_scheduled_tasks(apply=false).\n" - "- You may apply scheduled-task repairs only for an unambiguous blocked_by_route -> worker repair " - "where the task already has a valid worker_id; never provide worker_id from this route.\n" + "- You may apply scheduled-task repairs only for unambiguous blocked_by_route candidates. " + "For worker repairs the task must already have a valid worker_id; never provide worker_id from this route.\n" "- Do not start workers directly, schedule recurring tasks, use filesystem tools, use network/MCP tools, " "or perform external side effects from this route.\n" "- Use execute_self_queue_item only for an existing low/medium-risk queue item with an explicit worker_id; " @@ -834,6 +837,31 @@ async def route_scheduled_octo_control( await octo.set_thinking(False) +async def route_scheduled_octo_task( + octo: Any, + task: dict[str, Any], + *, + chat_id: int = 0, +) -> str: + """Run a scheduled task as a full Octo workspace task with normal tools and context.""" + task_text = _build_scheduled_octo_task_input(task) + bootstrap_context = await build_bootstrap_context_prompt(octo.store, chat_id) + return await route_or_reply( + octo, + octo.provider, + octo.memory, + task_text, + chat_id, + bootstrap_context.content, + internal_followup=True, + show_typing=False, + images=None, + saved_file_paths=None, + include_wakeup=False, + route_mode=RouteMode.CONVERSATION, + ) + + async def _complete_route_with_tools( *, octo: Any, @@ -1880,6 +1908,11 @@ def _build_scheduler_tick_input(octo: Any, *, max_tasks: int = 10) -> str: "worker_id": task.get("worker_id"), "frequency": task.get("frequency"), "notify_user": task.get("notify_user"), + "execution_mode": task.get("execution_mode"), + "dispatch_ready": task.get("dispatch_ready"), + "dispatch_policy_reason": task.get("dispatch_policy_reason"), + "blocked_reason": task.get("blocked_reason"), + "suggested_execution_mode": task.get("suggested_execution_mode"), "task_text": task.get("task_text"), } for task in due_tasks[: max(1, int(max_tasks))] @@ -1891,6 +1924,11 @@ def _build_scheduler_tick_input(octo: Any, *, max_tasks: int = 10) -> str: "due_now": bool(task.get("due_now")), "next_run_at": task.get("next_run_at"), "notify_user": task.get("notify_user"), + "execution_mode": task.get("execution_mode"), + "dispatch_ready": task.get("dispatch_ready"), + "dispatch_policy_reason": task.get("dispatch_policy_reason"), + "blocked_reason": task.get("blocked_reason"), + "suggested_execution_mode": task.get("suggested_execution_mode"), } for task in preview_tasks ], @@ -1940,8 +1978,8 @@ async def _build_proactive_tick_input(octo: Any, *, chat_id: int, reason: str) - f"{json.dumps(payload, ensure_ascii=False, sort_keys=True)}\n" "If there is already pending self-queue work with an explicit worker_id, you may use execute_self_queue_item. " "If pending work lacks a worker_id, prefer decision=blocked or noop. " - "If an opportunity kind is scheduled_task_repair and the task already has worker_id, you may preview " - "repair_scheduled_tasks and apply it only when the candidate is safe. " + "If an opportunity kind is scheduled_task_repair, you may preview repair_scheduled_tasks and apply it " + "only when the candidate is safe. Worker repairs require an existing worker_id. " "If the best opportunity is confidence >= 0.75, low/medium risk, and no pending work exists, " "use octo_self_queue_add to queue exactly one concrete initiative. " "Do not call start_worker directly from this route." @@ -1967,6 +2005,33 @@ def _build_scheduled_octo_control_input(task: dict[str, Any]) -> str: ) +def _build_scheduled_octo_task_input(task: dict[str, Any]) -> str: + payload = { + "task_id": task.get("id"), + "name": task.get("name"), + "frequency": task.get("frequency"), + "execution_mode": task.get("execution_mode"), + "notify_user": task.get("notify_user"), + "description": task.get("description"), + "task_text": task.get("task_text"), + "inputs": task.get("inputs") if isinstance(task.get("inputs"), dict) else {}, + "last_run_at": task.get("last_run_at"), + } + return ( + "Run this scheduled Octo task as a full autonomous workspace task:\n" + f"{json.dumps(payload, ensure_ascii=False, sort_keys=True)}\n\n" + "Use the normal tools, workspace context, memory, filesystem, MCP, web, and workers as needed. " + "Complete the task end-to-end before returning a completion signal. " + "If you create or update a file, verify it exists before finishing. " + "Do not treat this as a bounded control-plane route.\n\n" + "When the task is complete, return exactly one of:\n" + "- SCHEDULED_TASK_DONE if it completed successfully and no user-facing update is needed.\n" + "- ... if it completed and the user should receive a concise update.\n" + "- NO_USER_RESPONSE only if the task intentionally produced no change.\n" + "Return SCHEDULED_TASK_BLOCKED only if the task truly cannot be completed even with the full Octo toolset." + ) + + def _ensure_mandatory_octo_tools( active_tools: list[ToolSpec], all_tools: list[ToolSpec] ) -> list[ToolSpec]: diff --git a/src/octopal/runtime/octo/scheduled_runtime.py b/src/octopal/runtime/octo/scheduled_runtime.py index 2e4e8bc..d705cf3 100644 --- a/src/octopal/runtime/octo/scheduled_runtime.py +++ b/src/octopal/runtime/octo/scheduled_runtime.py @@ -21,6 +21,9 @@ from octopal.runtime.octo.router import ( route_scheduled_octo_control as _default_route_scheduled_octo_control, ) +from octopal.runtime.octo.router import ( + route_scheduled_octo_task as _default_route_scheduled_octo_task, +) from octopal.runtime.octo.runtime_config import _env_int from octopal.runtime.octo.scheduler_helpers import _coerce_positive_chat_id from octopal.runtime.scheduler.service import ( @@ -154,7 +157,10 @@ def _update_scheduled_octo_control_backoff_metadata( else: metadata.pop(SCHEDULED_TASK_BLOCKED_REASON_KEY, None) if reason_value == "blocked_by_route": - metadata[SCHEDULED_TASK_SUGGESTED_EXECUTION_MODE_KEY] = "worker" + if str(task.get("worker_id") or "").strip(): + metadata[SCHEDULED_TASK_SUGGESTED_EXECUTION_MODE_KEY] = "worker" + else: + metadata[SCHEDULED_TASK_SUGGESTED_EXECUTION_MODE_KEY] = "octo_task" elif blocked_until is None: metadata.pop(SCHEDULED_TASK_SUGGESTED_EXECUTION_MODE_KEY, None) try: @@ -312,6 +318,25 @@ async def _dispatch_due_scheduled_tasks_once( elif str(result.get("status") or "").strip().lower() == "failed": summary["errors"] += 1 continue + if execution_mode == "octo_task": + summary["attempted"] += 1 + try: + result = await self._run_scheduled_octo_task_once( + task=task, + chat_id=dispatch_chat_id, + ) + except Exception: + summary["errors"] += 1 + logger.exception( + "Scheduled Octo task failed", + task_id=task_id or None, + ) + continue + if bool(result.get("completed")): + summary["completed"] += 1 + elif str(result.get("status") or "").strip().lower() == "failed": + summary["errors"] += 1 + continue if not worker_id or not task_text: summary["rejected_by_policy"] += 1 reason = "missing_worker_id" if not worker_id else "missing_task_text" @@ -471,3 +496,104 @@ async def _run_scheduled_octo_control_task_once( "user_visible_sent": user_visible_sent, "delivery_mode": delivery.mode, } + + async def _run_scheduled_octo_task_once( + self, + *, + task: dict[str, Any], + chat_id: int = 0, + ) -> dict[str, Any]: + scheduler = self.scheduler + task_id = str(task.get("id") or "").strip() + notify_user = normalize_notify_user_policy(task.get("notify_user")) + route_scheduled_octo_task = _core_callable( + "route_scheduled_octo_task", + _default_route_scheduled_octo_task, + ) + reply_text = await route_scheduled_octo_task( + self, + task, + chat_id=chat_id, + ) + normalized_reply = await _normalize_scheduled_octo_control_reply( + self.provider, + reply_text, + bounded_control=False, + ) + if normalized_reply == _SCHEDULED_OCTO_CONTROL_BLOCKED: + logger.warning( + "Scheduled Octo task reported blocked", + task_id=task_id or None, + chat_id=chat_id, + raw_reply_preview=safe_preview(reply_text, limit=200), + ) + return { + "status": "failed", + "completed": False, + "reason": "blocked", + } + if normalized_reply == "NO_USER_RESPONSE": + logger.warning( + "Scheduled Octo task missing explicit completion signal", + task_id=task_id or None, + chat_id=chat_id, + raw_reply_preview=safe_preview(reply_text, limit=200), + ) + return { + "status": "failed", + "completed": False, + "reason": "missing_completion_signal", + } + if normalized_reply == _SCHEDULED_OCTO_CONTROL_DONE: + if scheduler is not None and task_id: + scheduler.mark_executed(task_id) + logger.info( + "Scheduled Octo task completed silently", + task_id=task_id or None, + notify_user=notify_user, + chat_id=chat_id, + ) + return { + "status": "completed", + "completed": True, + "user_visible_sent": False, + "delivery_mode": DeliveryMode.SILENT, + } + + delivery = resolve_user_delivery(normalized_reply) + user_visible_sent = False + if delivery.user_visible and notify_user != "never": + send_scheduler_control_update = _core_callable( + "_send_scheduler_control_update", + _default_send_scheduler_control_update, + ) + await send_scheduler_control_update( + self, + chat_id, + task_id or None, + delivery.text, + ) + user_visible_sent = True + elif delivery.user_visible: + logger.info( + "Scheduled Octo task update suppressed by notify policy", + task_id=task_id or None, + notify_user=notify_user, + chat_id=chat_id, + ) + if scheduler is not None and task_id: + scheduler.mark_executed(task_id) + logger.info( + "Scheduled Octo task completed", + task_id=task_id or None, + notify_user=notify_user, + chat_id=chat_id, + user_visible_sent=user_visible_sent, + delivery_mode=delivery.mode, + ) + return { + "status": "completed", + "completed": True, + "user_visible_sent": user_visible_sent, + "delivery_mode": delivery.mode, + } diff --git a/src/octopal/runtime/octo/self_queue.py b/src/octopal/runtime/octo/self_queue.py index ecdba04..fd37ccd 100644 --- a/src/octopal/runtime/octo/self_queue.py +++ b/src/octopal/runtime/octo/self_queue.py @@ -84,19 +84,34 @@ def _scheduler_opportunity_cards( if not task_id: continue suggested_mode = str(scheduled_task.get("suggested_execution_mode") or "").strip().lower() - if suggested_mode != "worker": + if suggested_mode not in {"worker", "octo_task"}: continue - dedupe_key = f"scheduled-task:{task_id}:suggested-worker" + worker_id = str(scheduled_task.get("worker_id") or "").strip() + if suggested_mode == "worker" and not worker_id: + continue + dedupe_key = f"scheduled-task:{task_id}:suggested-{suggested_mode}" if dedupe_key in active_dedupe_keys: continue name = str(scheduled_task.get("name") or task_id).strip() - worker_id = str(scheduled_task.get("worker_id") or "").strip() or "ops_sre" blocked_reason = str( scheduled_task.get("blocked_reason") or scheduled_task.get("dispatch_policy_reason") - or "suggested_execution_mode=worker" + or f"suggested_execution_mode={suggested_mode}" ).strip() + if suggested_mode == "worker": + task = ( + f"Inspect scheduled task {task_id!r} ({name!r}) blocked from its current route. " + "Find the least-risk repair or migration path, verify whether worker execution is appropriate, " + "and report the exact recommended change." + ) + suggested_worker_id = worker_id + else: + task = ( + f"Repair scheduled task {task_id!r} ({name!r}) from its blocked control route " + "to the full scheduled Octo task execution mode, then verify it is dispatch-ready." + ) + suggested_worker_id = None cards.append( _build_opportunity_card( kind="scheduled_task_repair", @@ -106,12 +121,8 @@ def _scheduler_opportunity_cards( effort="medium", confidence=0.88, risk="medium", - suggested_worker_id=worker_id, - task=( - f"Inspect scheduled task {task_id!r} ({name!r}) blocked from its current route. " - "Find the least-risk repair or migration path, verify whether worker execution is appropriate, " - "and report the exact recommended change." - ), + suggested_worker_id=suggested_worker_id, + task=task, dedupe_key=dedupe_key, inputs={ "scheduled_task_id": task_id, diff --git a/src/octopal/runtime/scheduler/service.py b/src/octopal/runtime/scheduler/service.py index ac816a2..3c45878 100644 --- a/src/octopal/runtime/scheduler/service.py +++ b/src/octopal/runtime/scheduler/service.py @@ -18,7 +18,7 @@ _EVERY_HOURS_RE = re.compile(r"^every\s+(\d+)\s+hours?$", re.IGNORECASE) _DAILY_AT_RE = re.compile(r"^daily\s+at\s+(\d{1,2}):(\d{2})$", re.IGNORECASE) _NOTIFY_USER_POLICIES = {"never", "if_significant", "always"} -_EXECUTION_MODES = {"worker", "octo_control"} +_EXECUTION_MODES = {"worker", "octo_control", "octo_task"} SCHEDULED_TASK_DELIVERY_CHAT_ID_KEY = "delivery_chat_id" SCHEDULED_TASK_TARGET_CHAT_ID_KEY = "target_chat_id" SCHEDULED_TASK_BLOCKED_UNTIL_KEY = "blocked_until" @@ -40,8 +40,14 @@ def normalize_execution_mode( worker_id: str | None = None, ) -> str: value = str(execution_mode or "").strip().lower() + aliases = { + "octo": "octo_task", + "octo_self": "octo_task", + "self": "octo_task", + } + value = aliases.get(value, value) if not value: - return "worker" if str(worker_id or "").strip() else "octo_control" + return "worker" if str(worker_id or "").strip() else "octo_task" if value not in _EXECUTION_MODES: allowed = ", ".join(sorted(_EXECUTION_MODES)) raise ValueError(f"execution_mode must be one of: {allowed}.") @@ -100,8 +106,10 @@ def schedule_task( normalized_notify_user = "never" if normalized_execution_mode == "worker" and not worker_id_value: raise ValueError("worker_id is required when execution_mode=worker.") - if normalized_execution_mode == "octo_control" and worker_id_value: - raise ValueError("worker_id must be omitted when execution_mode=octo_control.") + if normalized_execution_mode in {"octo_control", "octo_task"} and worker_id_value: + raise ValueError( + f"worker_id must be omitted when execution_mode={normalized_execution_mode}." + ) task_id = self._generate_id(name) self.store.upsert_scheduled_task( task_id=task_id, @@ -163,6 +171,8 @@ def repair_suggested_tasks( elif get_worker_template(self.workspace_dir, resolved_worker_id) is None: can_apply = False skip_reason = skip_reason or "unknown_worker_id" + elif suggested_mode == "octo_task": + resolved_worker_id = None candidate = { "task_id": task_id, "name": task.get("name"), @@ -441,11 +451,11 @@ def _normalize_task_record(self, task: dict[str, Any]) -> dict[str, Any]: blocked_reason = str(metadata.get(SCHEDULED_TASK_BLOCKED_REASON_KEY) or "").strip() or None suggested_execution_mode = parse_scheduled_task_suggested_execution_mode(metadata) if ( - suggested_execution_mode is None - and normalized["execution_mode"] == "octo_control" + normalized["execution_mode"] == "octo_control" and blocked_reason == "blocked_by_route" + and not str(normalized.get("worker_id") or "").strip() ): - suggested_execution_mode = "worker" + suggested_execution_mode = "octo_task" normalized["blocked_until"] = blocked_until.isoformat() if blocked_until is not None else None normalized["blocked_reason"] = blocked_reason normalized["suggested_execution_mode"] = suggested_execution_mode @@ -457,7 +467,10 @@ def _normalize_task_record(self, task: dict[str, Any]) -> dict[str, Any]: def _dispatch_readiness(self, task: dict[str, Any]) -> tuple[bool, str | None]: execution_mode = str(task.get("execution_mode") or "").strip().lower() suggested_execution_mode = str(task.get("suggested_execution_mode") or "").strip().lower() - if execution_mode == "octo_control" and suggested_execution_mode == "worker": + if execution_mode == "octo_control" and suggested_execution_mode in { + "worker", + "octo_task", + }: return False, str(task.get("blocked_reason") or "").strip() or "blocked_by_route" blocked_until_value = str(task.get("blocked_until") or "").strip() if blocked_until_value: @@ -467,7 +480,7 @@ def _dispatch_readiness(self, task: dict[str, Any]) -> tuple[bool, str | None]: blocked_until = None if blocked_until is not None and blocked_until.tzinfo is not None and blocked_until > utc_now(): return False, str(task.get("blocked_reason") or "").strip() or "blocked_by_route_backoff" - if execution_mode == "octo_control": + if execution_mode in {"octo_control", "octo_task"}: task_text = str(task.get("task_text") or "").strip() if not task_text: return False, "missing_task_text" diff --git a/src/octopal/tools/catalog.py b/src/octopal/tools/catalog.py index c14b94b..ad1115d 100644 --- a/src/octopal/tools/catalog.py +++ b/src/octopal/tools/catalog.py @@ -683,8 +683,12 @@ def get_tools(mcp_manager=None) -> list[ToolSpec]: "description": {"type": "string", "description": "Brief description of the task purpose."}, "execution_mode": { "type": "string", - "enum": ["worker", "octo_control"], - "description": "Execution mode for this scheduled task. Use worker for worker dispatch, or octo_control for future direct Octo control-plane tasks.", + "enum": ["worker", "octo_task", "octo_control"], + "description": ( + "Execution mode for this scheduled task. Use worker for worker dispatch, " + "octo_task for a full autonomous Octo workspace task, or octo_control for " + "bounded scheduler/control-plane maintenance." + ), }, "worker_id": {"type": "string", "description": "Specific worker template ID to use when execution_mode=worker."}, "inputs": {"type": "object", "description": "Optional: Inputs for the worker."}, @@ -720,7 +724,7 @@ def get_tools(mcp_manager=None) -> list[ToolSpec]: name="repair_scheduled_tasks", description=( "Preview or apply safe repairs for scheduled tasks with known route-compatibility suggestions. " - "Proactive mode is guarded to existing-worker blocked_by_route repairs and cannot provide worker_id." + "Proactive mode is guarded to blocked_by_route repairs and cannot provide worker_id." ), parameters={ "type": "object", diff --git a/tests/test_scheduler_safety.py b/tests/test_scheduler_safety.py index 75637d6..d7dcf61 100644 --- a/tests/test_scheduler_safety.py +++ b/tests/test_scheduler_safety.py @@ -199,7 +199,22 @@ def test_schedule_task_rejects_worker_id_for_octo_control_mode(tmp_path: Path) - assert result == "schedule_task error: worker_id must be omitted when execution_mode=octo_control." -def test_schedule_task_derives_legacy_octo_control_mode_without_worker_id(tmp_path: Path) -> None: +def test_schedule_task_rejects_worker_id_for_octo_task_mode(tmp_path: Path) -> None: + scheduler = SchedulerService(store=_StoreStub(), workspace_dir=tmp_path) + result = _tool_schedule_task( + { + "name": "Digest", + "frequency": "Every 30 minutes", + "task": "Generate digest", + "execution_mode": "octo_task", + "worker_id": "writer", + }, + {"octo": SimpleNamespace(scheduler=scheduler)}, + ) + assert result == "schedule_task error: worker_id must be omitted when execution_mode=octo_task." + + +def test_schedule_task_derives_octo_task_mode_without_worker_id(tmp_path: Path) -> None: store = _StoreStub() scheduler = SchedulerService(store=store, workspace_dir=tmp_path) payload = json.loads( @@ -214,13 +229,33 @@ def test_schedule_task_derives_legacy_octo_control_mode_without_worker_id(tmp_pa ) assert payload["status"] == "scheduled" - assert payload["execution_mode"] == "octo_control" - assert payload["notify_user"] == "never" + assert payload["execution_mode"] == "octo_task" + assert payload["notify_user"] == "if_significant" assert store.last_upsert is not None assert store.last_upsert["metadata"] == { - "notify_user": "never", - "execution_mode": "octo_control", + "notify_user": "if_significant", + "execution_mode": "octo_task", } + + +def test_schedule_task_accepts_octo_control_for_bounded_maintenance(tmp_path: Path) -> None: + store = _StoreStub() + scheduler = SchedulerService(store=store, workspace_dir=tmp_path) + + payload = json.loads( + _tool_schedule_task( + { + "name": "Compact memory", + "frequency": "Every 30 minutes", + "task": "Compact memory", + "execution_mode": "octo_control", + }, + {"octo": SimpleNamespace(scheduler=scheduler)}, + ) + ) + + assert payload["execution_mode"] == "octo_control" + assert payload["notify_user"] == "never" def test_check_schedule_returns_json_with_inputs(tmp_path: Path) -> None: @@ -298,7 +333,7 @@ def test_scheduler_status_reports_due_and_next_run_preview(tmp_path: Path) -> No assert payload["tasks"][0]["execution_mode"] == "worker" assert payload["tasks"][0]["dispatch_ready"] is True assert payload["tasks"][0]["suggested_execution_mode"] is None - assert payload["tasks"][1]["execution_mode"] == "octo_control" + assert payload["tasks"][1]["execution_mode"] == "octo_task" assert payload["tasks"][1]["dispatch_ready"] is True assert payload["tasks"][1]["dispatch_policy_reason"] is None assert payload["tasks"][1]["suggested_execution_mode"] is None @@ -329,7 +364,7 @@ def test_scheduler_sync_to_markdown_includes_dispatch_readiness(tmp_path: Path) scheduler.sync_to_markdown() heartbeat = (tmp_path / "HEARTBEAT.md").read_text(encoding="utf-8") - assert "**Execution mode**: octo_control" in heartbeat + assert "**Execution mode**: octo_task" in heartbeat assert "**Dispatch**: ready" in heartbeat @@ -382,7 +417,7 @@ def test_describe_tasks_marks_blocked_octo_control_backoff(tmp_path: Path) -> No assert described[0]["dispatch_policy_reason"] == "blocked_by_route" assert described[0]["blocked_until"] == blocked_until.isoformat() assert described[0]["blocked_reason"] == "blocked_by_route" - assert described[0]["suggested_execution_mode"] == "worker" + assert described[0]["suggested_execution_mode"] == "octo_task" assert described[0]["due_now"] is False @@ -455,7 +490,7 @@ def test_route_blocked_octo_control_stays_not_ready_after_backoff_expires( assert scheduler.get_actionable_tasks() == [] assert described[0]["dispatch_ready"] is False assert described[0]["dispatch_policy_reason"] == "blocked_by_route" - assert described[0]["suggested_execution_mode"] == "worker" + assert described[0]["suggested_execution_mode"] == "octo_task" assert described[0]["due_now"] is False @@ -498,7 +533,7 @@ def test_scheduler_sync_to_markdown_shows_suggested_execution_mode_for_blocked_t scheduler.sync_to_markdown() heartbeat = (tmp_path / "HEARTBEAT.md").read_text(encoding="utf-8") - assert "**Suggested execution mode**: worker" in heartbeat + assert "**Suggested execution mode**: octo_task" in heartbeat def test_scheduler_status_reports_suggested_execution_mode_for_blocked_tasks(tmp_path: Path) -> None: @@ -534,7 +569,7 @@ def test_scheduler_status_reports_suggested_execution_mode_for_blocked_tasks(tmp assert payload["tasks"][0]["execution_mode"] == "octo_control" assert payload["tasks"][0]["dispatch_ready"] is False assert payload["tasks"][0]["dispatch_policy_reason"] == "blocked_by_route" - assert payload["tasks"][0]["suggested_execution_mode"] == "worker" + assert payload["tasks"][0]["suggested_execution_mode"] == "octo_task" assert payload["due_count"] == 0 assert any("suggested execution mode" in hint for hint in payload["hints"]) @@ -574,12 +609,62 @@ def test_repair_scheduled_tasks_previews_candidates_without_applying(tmp_path: P assert payload["candidate_count"] == 1 assert payload["applied_count"] == 0 assert payload["candidates"][0]["task_id"] == "weather_check" - assert payload["candidates"][0]["suggested_execution_mode"] == "worker" - assert payload["candidates"][0]["can_apply"] is False - assert payload["candidates"][0]["skip_reason"] == "missing_worker_id" + assert payload["candidates"][0]["suggested_execution_mode"] == "octo_task" + assert payload["candidates"][0]["can_apply"] is True + assert "skip_reason" not in payload["candidates"][0] assert store.last_upsert is None +def test_repair_scheduled_tasks_applies_octo_task_migration(tmp_path: Path) -> None: + blocked_until = utc_now() + timedelta(minutes=30) + store = _StoreStub( + tasks=[ + { + "id": "draft_write", + "name": "Draft Write", + "description": "Write a draft", + "frequency": "Every 30 minutes", + "worker_id": None, + "task_text": "Write a draft to memory/draft.md", + "inputs_json": "{}", + "metadata_json": json.dumps( + { + "notify_user": "never", + "execution_mode": "octo_control", + "blocked_until": blocked_until.isoformat(), + "blocked_reason": "blocked_by_route", + "suggested_execution_mode": "worker", + } + ), + "last_run_at": None, + "enabled": 1, + } + ] + ) + scheduler = SchedulerService(store=store, workspace_dir=tmp_path) + + payload = json.loads( + _tool_repair_scheduled_tasks( + {"apply": True, "task_ids": ["draft_write"]}, + {"octo": SimpleNamespace(scheduler=scheduler)}, + ) + ) + + assert payload["status"] == "applied" + assert payload["applied"][0] == { + "task_id": "draft_write", + "name": "Draft Write", + "execution_mode": "octo_task", + "worker_id": None, + } + assert store.last_upsert is not None + assert store.last_upsert["worker_id"] is None + assert store.last_upsert["metadata"] == { + "notify_user": "never", + "execution_mode": "octo_task", + } + + def test_repair_scheduled_tasks_applies_worker_migration_with_valid_worker_id(tmp_path: Path) -> None: blocked_until = utc_now() + timedelta(minutes=30) _write_worker_template(tmp_path) @@ -590,7 +675,7 @@ def test_repair_scheduled_tasks_applies_worker_migration_with_valid_worker_id(tm "name": "Weather Check", "description": "Check weather", "frequency": "Every 30 minutes", - "worker_id": None, + "worker_id": "weather_worker", "task_text": "Check the weather", "inputs_json": "{}", "metadata_json": json.dumps( @@ -611,7 +696,7 @@ def test_repair_scheduled_tasks_applies_worker_migration_with_valid_worker_id(tm payload = json.loads( _tool_repair_scheduled_tasks( - {"apply": True, "task_ids": ["weather_check"], "worker_id": "weather_worker"}, + {"apply": True, "task_ids": ["weather_check"]}, {"octo": SimpleNamespace(scheduler=scheduler)}, ) ) @@ -1160,7 +1245,10 @@ async def test_add_self_queue_item_dedupes_active_items(tmp_path, monkeypatch): @pytest.mark.asyncio -async def test_scan_opportunities_includes_blocked_scheduled_task_worker_candidate(tmp_path, monkeypatch): +async def test_scan_opportunities_includes_blocked_scheduled_task_octo_task_candidate( + tmp_path, + monkeypatch, +): monkeypatch.setattr(octo_core, "_workspace_dir", lambda: tmp_path) store = _StoreStub( tasks=[ @@ -1205,13 +1293,13 @@ async def _health(chat_id: int): card = result["opportunities"][0] assert card["kind"] == "scheduled_task_repair" - assert card["dedupe_key"] == "scheduled-task:weather_digest:suggested-worker" - assert card["suggested_worker_id"] == "ops_sre" + assert card["dedupe_key"] == "scheduled-task:weather_digest:suggested-octo_task" + assert "suggested_worker_id" not in card assert card["risk"] == "medium" assert card["inputs"] == { "scheduled_task_id": "weather_digest", "blocked_reason": "blocked_by_route", - "suggested_execution_mode": "worker", + "suggested_execution_mode": "octo_task", } @@ -1261,7 +1349,7 @@ async def _health(chat_id: int): { "title": "Already queued", "task": "Inspect scheduled task.", - "dedupe_key": "scheduled-task:weather_digest:suggested-worker", + "dedupe_key": "scheduled-task:weather_digest:suggested-octo_task", }, ) @@ -1357,6 +1445,57 @@ def _build_plan_should_not_run(*args, **kwargs): assert calls == {"control_prompt": 1, "complete_route": 1} +@pytest.mark.asyncio +async def test_route_scheduled_octo_task_uses_full_conversation_route(monkeypatch): + calls = {"bootstrap": 0, "route": 0} + + class DummyOcto: + provider = object() + memory = object() + store = object() + + task = { + "id": "draft_write", + "name": "Draft Write", + "frequency": "Every 30 minutes", + "execution_mode": "octo_task", + "notify_user": "never", + "task_text": "Write a draft to memory/draft.md", + "inputs": {"path": "memory/draft.md"}, + } + + async def _build_bootstrap_context_prompt(store, chat_id): + calls["bootstrap"] += 1 + assert chat_id == 123 + return SimpleNamespace(content="full context") + + async def _route_or_reply(octo, provider, memory, user_text, chat_id, bootstrap_context, **kwargs): + calls["route"] += 1 + assert "full autonomous workspace task" in user_text + assert "memory/draft.md" in user_text + assert bootstrap_context == "full context" + assert kwargs["show_typing"] is False + assert kwargs["internal_followup"] is True + assert kwargs["route_mode"] == octo_router.RouteMode.CONVERSATION + return "SCHEDULED_TASK_DONE" + + async def _build_control_plane_prompt_should_not_run(**kwargs): + raise AssertionError("control-plane prompt should not run for octo_task") + + monkeypatch.setattr(octo_router, "build_bootstrap_context_prompt", _build_bootstrap_context_prompt) + monkeypatch.setattr(octo_router, "route_or_reply", _route_or_reply) + monkeypatch.setattr( + octo_router, + "build_control_plane_prompt", + _build_control_plane_prompt_should_not_run, + ) + + result = await octo_router.route_scheduled_octo_task(DummyOcto(), task, chat_id=123) + + assert result == "SCHEDULED_TASK_DONE" + assert calls == {"bootstrap": 1, "route": 1} + + @pytest.mark.asyncio async def test_octo_run_scheduler_tick_once_uses_bounded_scheduler_route(monkeypatch): calls = {"scheduler_tick": 0, "dispatch": 0} @@ -1796,7 +1935,9 @@ async def test_octo_dispatch_due_scheduled_tasks_runs_octo_control_tasks(monkeyp "worker_id": None, "task_text": "Compact memory", "inputs_json": "{}", - "metadata_json": json.dumps({"notify_user": "never"}), + "metadata_json": json.dumps( + {"notify_user": "never", "execution_mode": "octo_control"} + ), "last_run_at": None, "enabled": 1, } @@ -1846,6 +1987,69 @@ async def _start_worker_async(self, **kwargs): assert scheduler.store.marked_task_ids == ["memory_compact"] +@pytest.mark.asyncio +async def test_octo_dispatch_due_scheduled_tasks_runs_full_octo_tasks(monkeypatch): + scheduler = SchedulerService( + store=_StoreStub( + tasks=[ + { + "id": "draft_write", + "name": "Draft Write", + "description": "Write a draft", + "frequency": "Every 30 minutes", + "worker_id": None, + "task_text": "Write a draft to memory/draft.md", + "inputs_json": "{}", + "metadata_json": json.dumps( + {"notify_user": "never", "execution_mode": "octo_task"} + ), + "last_run_at": None, + "enabled": 1, + } + ] + ), + workspace_dir=Path("."), + ) + route_calls: list[dict] = [] + + async def _start_worker_async(self, **kwargs): + raise AssertionError("_start_worker_async should not be called directly for octo_task tasks") + + async def _route_scheduled_octo_task(octo, task, *, chat_id=0): + route_calls.append({"task": task, "chat_id": chat_id}) + return "SCHEDULED_TASK_DONE" + + monkeypatch.setattr(octo_core.Octo, "_start_worker_async", _start_worker_async) + monkeypatch.setattr(octo_router, "route_scheduled_octo_task", _route_scheduled_octo_task) + monkeypatch.setattr(octo_core, "route_scheduled_octo_task", _route_scheduled_octo_task) + + octo = Octo( + provider=object(), + store=_StoreStub(), + policy=object(), + runtime=_RuntimeStub(), + approvals=_ApprovalsStub(), + memory=_MemoryStub(), + canon=SimpleNamespace(workspace_dir=Path(".")), + scheduler=scheduler, + ) + + summary = await octo._dispatch_due_scheduled_tasks_once(chat_id=0, max_tasks=5) + + assert summary == { + "due_count": 1, + "attempted": 1, + "started": 0, + "completed": 1, + "duplicates": 0, + "rejected_by_policy": 0, + "policy_reasons": {}, + "errors": 0, + } + assert route_calls and route_calls[0]["task"]["id"] == "draft_write" + assert scheduler.store.marked_task_ids == ["draft_write"] + + @pytest.mark.asyncio async def test_octo_dispatch_due_scheduled_tasks_requires_explicit_octo_control_completion_signal(monkeypatch): scheduler = SchedulerService( @@ -1859,7 +2063,9 @@ async def test_octo_dispatch_due_scheduled_tasks_requires_explicit_octo_control_ "worker_id": None, "task_text": "Compact memory", "inputs_json": "{}", - "metadata_json": json.dumps({"notify_user": "never"}), + "metadata_json": json.dumps( + {"notify_user": "never", "execution_mode": "octo_control"} + ), "last_run_at": None, "enabled": 1, } @@ -1921,7 +2127,9 @@ async def test_octo_dispatch_due_scheduled_tasks_backs_off_blocked_octo_control_ "worker_id": None, "task_text": "Check the weather", "inputs_json": "{}", - "metadata_json": json.dumps({"notify_user": "never"}), + "metadata_json": json.dumps( + {"notify_user": "never", "execution_mode": "octo_control"} + ), "last_run_at": None, "enabled": 1, } @@ -1984,7 +2192,7 @@ async def _route_scheduled_octo_control(octo, task, *, chat_id=0): assert isinstance(metadata, dict) assert metadata["blocked_reason"] == "blocked_by_route" assert "blocked_until" in metadata - assert metadata["suggested_execution_mode"] == "worker" + assert metadata["suggested_execution_mode"] == "octo_task" @pytest.mark.asyncio @@ -2137,7 +2345,9 @@ async def test_octo_control_if_significant_is_treated_as_never_for_delivery(monk "worker_id": None, "task_text": "Send daily digest", "inputs_json": "{}", - "metadata_json": json.dumps({"notify_user": "if_significant"}), + "metadata_json": json.dumps( + {"notify_user": "if_significant", "execution_mode": "octo_control"} + ), "last_run_at": None, "enabled": 1, }