From 3d25bd97425f38f7401d8dbf86d8008b177171a4 Mon Sep 17 00:00:00 2001
From: Slava Trofimov <26082149+pmbstyle@users.noreply.github.com>
Date: Wed, 13 May 2026 18:44:01 -0400
Subject: [PATCH] allow scheduled Octo self tasks
---
src/octopal/runtime/octo/control_replies.py | 18 +-
src/octopal/runtime/octo/core.py | 1 +
src/octopal/runtime/octo/router.py | 73 ++++-
src/octopal/runtime/octo/scheduled_runtime.py | 128 ++++++++-
src/octopal/runtime/octo/self_queue.py | 31 ++-
src/octopal/runtime/scheduler/service.py | 31 ++-
src/octopal/tools/catalog.py | 10 +-
tests/test_scheduler_safety.py | 262 ++++++++++++++++--
8 files changed, 499 insertions(+), 55 deletions(-)
diff --git a/src/octopal/runtime/octo/control_replies.py b/src/octopal/runtime/octo/control_replies.py
index 8b34dd8..c3490d6 100644
--- a/src/octopal/runtime/octo/control_replies.py
+++ b/src/octopal/runtime/octo/control_replies.py
@@ -113,6 +113,8 @@ def _coerce_scheduled_octo_control_reply(text: str) -> str:
async def _normalize_scheduled_octo_control_reply(
provider: InferenceProvider | None,
text: str,
+ *,
+ bounded_control: bool = True,
) -> str:
raw_value = str(text or "")
explicit = extract_heartbeat_user_visible_message(raw_value)
@@ -129,15 +131,27 @@ async def _normalize_scheduled_octo_control_reply(
if provider is None:
return _coerce_scheduled_octo_control_reply(value)
+ if bounded_control:
+ blocked_rule = (
+ "Use SCHEDULED_TASK_BLOCKED when the task cannot complete from the bounded route "
+ "because it needs workers, external access, or unavailable tools."
+ )
+ rewrite_context = "scheduled Octo control"
+ else:
+ blocked_rule = (
+ "Use SCHEDULED_TASK_BLOCKED only when the task cannot complete even with the normal "
+ "scheduled Octo task toolset."
+ )
+ rewrite_context = "scheduled Octo task"
rewrite_prompt = (
- "Rewrite the draft scheduled Octo control reply into the strict completion contract.\n"
+ f"Rewrite the draft {rewrite_context} reply into the strict completion contract.\n"
"Return exactly one of:\n"
"- SCHEDULED_TASK_DONE\n"
"- SCHEDULED_TASK_BLOCKED\n"
"- NO_USER_RESPONSE\n"
"- ...\n"
"Use SCHEDULED_TASK_DONE only if the task completed successfully with no user-visible update.\n"
- "Use SCHEDULED_TASK_BLOCKED when the task cannot complete from the bounded route because it needs workers, external access, or unavailable tools.\n"
+ f"{blocked_rule}\n"
"Use only for a concise completed user-facing update.\n"
"Use NO_USER_RESPONSE if the task did not complete or there is no completion signal.\n"
"Do not include any extra text outside the token or wrapper."
diff --git a/src/octopal/runtime/octo/core.py b/src/octopal/runtime/octo/core.py
index 6a2013d..5c6648f 100644
--- a/src/octopal/runtime/octo/core.py
+++ b/src/octopal/runtime/octo/core.py
@@ -58,6 +58,7 @@
from octopal.runtime.octo.router import route_or_reply as route_or_reply
from octopal.runtime.octo.router import route_proactive_tick as route_proactive_tick
from octopal.runtime.octo.router import route_scheduled_octo_control as route_scheduled_octo_control
+from octopal.runtime.octo.router import route_scheduled_octo_task as route_scheduled_octo_task
from octopal.runtime.octo.router import route_scheduler_tick as route_scheduler_tick
from octopal.runtime.octo.runtime_config import _env_flag, _env_int
from octopal.runtime.octo.scheduled_runtime import OctoScheduledRuntimeMixin
diff --git a/src/octopal/runtime/octo/router.py b/src/octopal/runtime/octo/router.py
index 4f86b9d..7e526be 100644
--- a/src/octopal/runtime/octo/router.py
+++ b/src/octopal/runtime/octo/router.py
@@ -173,6 +173,7 @@
_SCHEDULER_ALLOWED_TOOL_NAMES = {
"check_schedule",
"scheduler_status",
+ "repair_scheduled_tasks",
"octo_context_health",
"list_workers",
"list_active_workers",
@@ -656,6 +657,8 @@ async def route_scheduler_tick(
"Scheduler route rules:\n"
"- Keep this turn operational and bounded.\n"
"- You may inspect schedule state and worker availability.\n"
+ "- You may apply safe scheduled-task route repairs with repair_scheduled_tasks(apply=true) "
+ "when the candidate is unambiguous.\n"
"- Do not dispatch workers directly from this route.\n"
"- Return one of: SCHEDULER_IDLE, NO_USER_RESPONSE, or ...."
),
@@ -721,8 +724,8 @@ async def route_proactive_tick(
"- Keep this turn bounded to initiative discovery and self-queue maintenance.\n"
"- You may add, claim, execute, cancel, or mark self-queue items only when the payload supports it.\n"
"- You may preview scheduled-task repair candidates with repair_scheduled_tasks(apply=false).\n"
- "- You may apply scheduled-task repairs only for an unambiguous blocked_by_route -> worker repair "
- "where the task already has a valid worker_id; never provide worker_id from this route.\n"
+ "- You may apply scheduled-task repairs only for unambiguous blocked_by_route candidates. "
+ "For worker repairs the task must already have a valid worker_id; never provide worker_id from this route.\n"
"- Do not start workers directly, schedule recurring tasks, use filesystem tools, use network/MCP tools, "
"or perform external side effects from this route.\n"
"- Use execute_self_queue_item only for an existing low/medium-risk queue item with an explicit worker_id; "
@@ -834,6 +837,31 @@ async def route_scheduled_octo_control(
await octo.set_thinking(False)
+async def route_scheduled_octo_task(
+ octo: Any,
+ task: dict[str, Any],
+ *,
+ chat_id: int = 0,
+) -> str:
+ """Run a scheduled task as a full Octo workspace task with normal tools and context."""
+ task_text = _build_scheduled_octo_task_input(task)
+ bootstrap_context = await build_bootstrap_context_prompt(octo.store, chat_id)
+ return await route_or_reply(
+ octo,
+ octo.provider,
+ octo.memory,
+ task_text,
+ chat_id,
+ bootstrap_context.content,
+ internal_followup=True,
+ show_typing=False,
+ images=None,
+ saved_file_paths=None,
+ include_wakeup=False,
+ route_mode=RouteMode.CONVERSATION,
+ )
+
+
async def _complete_route_with_tools(
*,
octo: Any,
@@ -1880,6 +1908,11 @@ def _build_scheduler_tick_input(octo: Any, *, max_tasks: int = 10) -> str:
"worker_id": task.get("worker_id"),
"frequency": task.get("frequency"),
"notify_user": task.get("notify_user"),
+ "execution_mode": task.get("execution_mode"),
+ "dispatch_ready": task.get("dispatch_ready"),
+ "dispatch_policy_reason": task.get("dispatch_policy_reason"),
+ "blocked_reason": task.get("blocked_reason"),
+ "suggested_execution_mode": task.get("suggested_execution_mode"),
"task_text": task.get("task_text"),
}
for task in due_tasks[: max(1, int(max_tasks))]
@@ -1891,6 +1924,11 @@ def _build_scheduler_tick_input(octo: Any, *, max_tasks: int = 10) -> str:
"due_now": bool(task.get("due_now")),
"next_run_at": task.get("next_run_at"),
"notify_user": task.get("notify_user"),
+ "execution_mode": task.get("execution_mode"),
+ "dispatch_ready": task.get("dispatch_ready"),
+ "dispatch_policy_reason": task.get("dispatch_policy_reason"),
+ "blocked_reason": task.get("blocked_reason"),
+ "suggested_execution_mode": task.get("suggested_execution_mode"),
}
for task in preview_tasks
],
@@ -1940,8 +1978,8 @@ async def _build_proactive_tick_input(octo: Any, *, chat_id: int, reason: str) -
f"{json.dumps(payload, ensure_ascii=False, sort_keys=True)}\n"
"If there is already pending self-queue work with an explicit worker_id, you may use execute_self_queue_item. "
"If pending work lacks a worker_id, prefer decision=blocked or noop. "
- "If an opportunity kind is scheduled_task_repair and the task already has worker_id, you may preview "
- "repair_scheduled_tasks and apply it only when the candidate is safe. "
+ "If an opportunity kind is scheduled_task_repair, you may preview repair_scheduled_tasks and apply it "
+ "only when the candidate is safe. Worker repairs require an existing worker_id. "
"If the best opportunity is confidence >= 0.75, low/medium risk, and no pending work exists, "
"use octo_self_queue_add to queue exactly one concrete initiative. "
"Do not call start_worker directly from this route."
@@ -1967,6 +2005,33 @@ def _build_scheduled_octo_control_input(task: dict[str, Any]) -> str:
)
+def _build_scheduled_octo_task_input(task: dict[str, Any]) -> str:
+ payload = {
+ "task_id": task.get("id"),
+ "name": task.get("name"),
+ "frequency": task.get("frequency"),
+ "execution_mode": task.get("execution_mode"),
+ "notify_user": task.get("notify_user"),
+ "description": task.get("description"),
+ "task_text": task.get("task_text"),
+ "inputs": task.get("inputs") if isinstance(task.get("inputs"), dict) else {},
+ "last_run_at": task.get("last_run_at"),
+ }
+ return (
+ "Run this scheduled Octo task as a full autonomous workspace task:\n"
+ f"{json.dumps(payload, ensure_ascii=False, sort_keys=True)}\n\n"
+ "Use the normal tools, workspace context, memory, filesystem, MCP, web, and workers as needed. "
+ "Complete the task end-to-end before returning a completion signal. "
+ "If you create or update a file, verify it exists before finishing. "
+ "Do not treat this as a bounded control-plane route.\n\n"
+ "When the task is complete, return exactly one of:\n"
+ "- SCHEDULED_TASK_DONE if it completed successfully and no user-facing update is needed.\n"
+ "- ... if it completed and the user should receive a concise update.\n"
+ "- NO_USER_RESPONSE only if the task intentionally produced no change.\n"
+ "Return SCHEDULED_TASK_BLOCKED only if the task truly cannot be completed even with the full Octo toolset."
+ )
+
+
def _ensure_mandatory_octo_tools(
active_tools: list[ToolSpec], all_tools: list[ToolSpec]
) -> list[ToolSpec]:
diff --git a/src/octopal/runtime/octo/scheduled_runtime.py b/src/octopal/runtime/octo/scheduled_runtime.py
index 2e4e8bc..d705cf3 100644
--- a/src/octopal/runtime/octo/scheduled_runtime.py
+++ b/src/octopal/runtime/octo/scheduled_runtime.py
@@ -21,6 +21,9 @@
from octopal.runtime.octo.router import (
route_scheduled_octo_control as _default_route_scheduled_octo_control,
)
+from octopal.runtime.octo.router import (
+ route_scheduled_octo_task as _default_route_scheduled_octo_task,
+)
from octopal.runtime.octo.runtime_config import _env_int
from octopal.runtime.octo.scheduler_helpers import _coerce_positive_chat_id
from octopal.runtime.scheduler.service import (
@@ -154,7 +157,10 @@ def _update_scheduled_octo_control_backoff_metadata(
else:
metadata.pop(SCHEDULED_TASK_BLOCKED_REASON_KEY, None)
if reason_value == "blocked_by_route":
- metadata[SCHEDULED_TASK_SUGGESTED_EXECUTION_MODE_KEY] = "worker"
+ if str(task.get("worker_id") or "").strip():
+ metadata[SCHEDULED_TASK_SUGGESTED_EXECUTION_MODE_KEY] = "worker"
+ else:
+ metadata[SCHEDULED_TASK_SUGGESTED_EXECUTION_MODE_KEY] = "octo_task"
elif blocked_until is None:
metadata.pop(SCHEDULED_TASK_SUGGESTED_EXECUTION_MODE_KEY, None)
try:
@@ -312,6 +318,25 @@ async def _dispatch_due_scheduled_tasks_once(
elif str(result.get("status") or "").strip().lower() == "failed":
summary["errors"] += 1
continue
+ if execution_mode == "octo_task":
+ summary["attempted"] += 1
+ try:
+ result = await self._run_scheduled_octo_task_once(
+ task=task,
+ chat_id=dispatch_chat_id,
+ )
+ except Exception:
+ summary["errors"] += 1
+ logger.exception(
+ "Scheduled Octo task failed",
+ task_id=task_id or None,
+ )
+ continue
+ if bool(result.get("completed")):
+ summary["completed"] += 1
+ elif str(result.get("status") or "").strip().lower() == "failed":
+ summary["errors"] += 1
+ continue
if not worker_id or not task_text:
summary["rejected_by_policy"] += 1
reason = "missing_worker_id" if not worker_id else "missing_task_text"
@@ -471,3 +496,104 @@ async def _run_scheduled_octo_control_task_once(
"user_visible_sent": user_visible_sent,
"delivery_mode": delivery.mode,
}
+
+ async def _run_scheduled_octo_task_once(
+ self,
+ *,
+ task: dict[str, Any],
+ chat_id: int = 0,
+ ) -> dict[str, Any]:
+ scheduler = self.scheduler
+ task_id = str(task.get("id") or "").strip()
+ notify_user = normalize_notify_user_policy(task.get("notify_user"))
+ route_scheduled_octo_task = _core_callable(
+ "route_scheduled_octo_task",
+ _default_route_scheduled_octo_task,
+ )
+ reply_text = await route_scheduled_octo_task(
+ self,
+ task,
+ chat_id=chat_id,
+ )
+ normalized_reply = await _normalize_scheduled_octo_control_reply(
+ self.provider,
+ reply_text,
+ bounded_control=False,
+ )
+ if normalized_reply == _SCHEDULED_OCTO_CONTROL_BLOCKED:
+ logger.warning(
+ "Scheduled Octo task reported blocked",
+ task_id=task_id or None,
+ chat_id=chat_id,
+ raw_reply_preview=safe_preview(reply_text, limit=200),
+ )
+ return {
+ "status": "failed",
+ "completed": False,
+ "reason": "blocked",
+ }
+ if normalized_reply == "NO_USER_RESPONSE":
+ logger.warning(
+ "Scheduled Octo task missing explicit completion signal",
+ task_id=task_id or None,
+ chat_id=chat_id,
+ raw_reply_preview=safe_preview(reply_text, limit=200),
+ )
+ return {
+ "status": "failed",
+ "completed": False,
+ "reason": "missing_completion_signal",
+ }
+ if normalized_reply == _SCHEDULED_OCTO_CONTROL_DONE:
+ if scheduler is not None and task_id:
+ scheduler.mark_executed(task_id)
+ logger.info(
+ "Scheduled Octo task completed silently",
+ task_id=task_id or None,
+ notify_user=notify_user,
+ chat_id=chat_id,
+ )
+ return {
+ "status": "completed",
+ "completed": True,
+ "user_visible_sent": False,
+ "delivery_mode": DeliveryMode.SILENT,
+ }
+
+ delivery = resolve_user_delivery(normalized_reply)
+ user_visible_sent = False
+ if delivery.user_visible and notify_user != "never":
+ send_scheduler_control_update = _core_callable(
+ "_send_scheduler_control_update",
+ _default_send_scheduler_control_update,
+ )
+ await send_scheduler_control_update(
+ self,
+ chat_id,
+ task_id or None,
+ delivery.text,
+ )
+ user_visible_sent = True
+ elif delivery.user_visible:
+ logger.info(
+ "Scheduled Octo task update suppressed by notify policy",
+ task_id=task_id or None,
+ notify_user=notify_user,
+ chat_id=chat_id,
+ )
+ if scheduler is not None and task_id:
+ scheduler.mark_executed(task_id)
+ logger.info(
+ "Scheduled Octo task completed",
+ task_id=task_id or None,
+ notify_user=notify_user,
+ chat_id=chat_id,
+ user_visible_sent=user_visible_sent,
+ delivery_mode=delivery.mode,
+ )
+ return {
+ "status": "completed",
+ "completed": True,
+ "user_visible_sent": user_visible_sent,
+ "delivery_mode": delivery.mode,
+ }
diff --git a/src/octopal/runtime/octo/self_queue.py b/src/octopal/runtime/octo/self_queue.py
index ecdba04..fd37ccd 100644
--- a/src/octopal/runtime/octo/self_queue.py
+++ b/src/octopal/runtime/octo/self_queue.py
@@ -84,19 +84,34 @@ def _scheduler_opportunity_cards(
if not task_id:
continue
suggested_mode = str(scheduled_task.get("suggested_execution_mode") or "").strip().lower()
- if suggested_mode != "worker":
+ if suggested_mode not in {"worker", "octo_task"}:
continue
- dedupe_key = f"scheduled-task:{task_id}:suggested-worker"
+ worker_id = str(scheduled_task.get("worker_id") or "").strip()
+ if suggested_mode == "worker" and not worker_id:
+ continue
+ dedupe_key = f"scheduled-task:{task_id}:suggested-{suggested_mode}"
if dedupe_key in active_dedupe_keys:
continue
name = str(scheduled_task.get("name") or task_id).strip()
- worker_id = str(scheduled_task.get("worker_id") or "").strip() or "ops_sre"
blocked_reason = str(
scheduled_task.get("blocked_reason")
or scheduled_task.get("dispatch_policy_reason")
- or "suggested_execution_mode=worker"
+ or f"suggested_execution_mode={suggested_mode}"
).strip()
+ if suggested_mode == "worker":
+ task = (
+ f"Inspect scheduled task {task_id!r} ({name!r}) blocked from its current route. "
+ "Find the least-risk repair or migration path, verify whether worker execution is appropriate, "
+ "and report the exact recommended change."
+ )
+ suggested_worker_id = worker_id
+ else:
+ task = (
+ f"Repair scheduled task {task_id!r} ({name!r}) from its blocked control route "
+ "to the full scheduled Octo task execution mode, then verify it is dispatch-ready."
+ )
+ suggested_worker_id = None
cards.append(
_build_opportunity_card(
kind="scheduled_task_repair",
@@ -106,12 +121,8 @@ def _scheduler_opportunity_cards(
effort="medium",
confidence=0.88,
risk="medium",
- suggested_worker_id=worker_id,
- task=(
- f"Inspect scheduled task {task_id!r} ({name!r}) blocked from its current route. "
- "Find the least-risk repair or migration path, verify whether worker execution is appropriate, "
- "and report the exact recommended change."
- ),
+ suggested_worker_id=suggested_worker_id,
+ task=task,
dedupe_key=dedupe_key,
inputs={
"scheduled_task_id": task_id,
diff --git a/src/octopal/runtime/scheduler/service.py b/src/octopal/runtime/scheduler/service.py
index ac816a2..3c45878 100644
--- a/src/octopal/runtime/scheduler/service.py
+++ b/src/octopal/runtime/scheduler/service.py
@@ -18,7 +18,7 @@
_EVERY_HOURS_RE = re.compile(r"^every\s+(\d+)\s+hours?$", re.IGNORECASE)
_DAILY_AT_RE = re.compile(r"^daily\s+at\s+(\d{1,2}):(\d{2})$", re.IGNORECASE)
_NOTIFY_USER_POLICIES = {"never", "if_significant", "always"}
-_EXECUTION_MODES = {"worker", "octo_control"}
+_EXECUTION_MODES = {"worker", "octo_control", "octo_task"}
SCHEDULED_TASK_DELIVERY_CHAT_ID_KEY = "delivery_chat_id"
SCHEDULED_TASK_TARGET_CHAT_ID_KEY = "target_chat_id"
SCHEDULED_TASK_BLOCKED_UNTIL_KEY = "blocked_until"
@@ -40,8 +40,14 @@ def normalize_execution_mode(
worker_id: str | None = None,
) -> str:
value = str(execution_mode or "").strip().lower()
+ aliases = {
+ "octo": "octo_task",
+ "octo_self": "octo_task",
+ "self": "octo_task",
+ }
+ value = aliases.get(value, value)
if not value:
- return "worker" if str(worker_id or "").strip() else "octo_control"
+ return "worker" if str(worker_id or "").strip() else "octo_task"
if value not in _EXECUTION_MODES:
allowed = ", ".join(sorted(_EXECUTION_MODES))
raise ValueError(f"execution_mode must be one of: {allowed}.")
@@ -100,8 +106,10 @@ def schedule_task(
normalized_notify_user = "never"
if normalized_execution_mode == "worker" and not worker_id_value:
raise ValueError("worker_id is required when execution_mode=worker.")
- if normalized_execution_mode == "octo_control" and worker_id_value:
- raise ValueError("worker_id must be omitted when execution_mode=octo_control.")
+ if normalized_execution_mode in {"octo_control", "octo_task"} and worker_id_value:
+ raise ValueError(
+ f"worker_id must be omitted when execution_mode={normalized_execution_mode}."
+ )
task_id = self._generate_id(name)
self.store.upsert_scheduled_task(
task_id=task_id,
@@ -163,6 +171,8 @@ def repair_suggested_tasks(
elif get_worker_template(self.workspace_dir, resolved_worker_id) is None:
can_apply = False
skip_reason = skip_reason or "unknown_worker_id"
+ elif suggested_mode == "octo_task":
+ resolved_worker_id = None
candidate = {
"task_id": task_id,
"name": task.get("name"),
@@ -441,11 +451,11 @@ def _normalize_task_record(self, task: dict[str, Any]) -> dict[str, Any]:
blocked_reason = str(metadata.get(SCHEDULED_TASK_BLOCKED_REASON_KEY) or "").strip() or None
suggested_execution_mode = parse_scheduled_task_suggested_execution_mode(metadata)
if (
- suggested_execution_mode is None
- and normalized["execution_mode"] == "octo_control"
+ normalized["execution_mode"] == "octo_control"
and blocked_reason == "blocked_by_route"
+ and not str(normalized.get("worker_id") or "").strip()
):
- suggested_execution_mode = "worker"
+ suggested_execution_mode = "octo_task"
normalized["blocked_until"] = blocked_until.isoformat() if blocked_until is not None else None
normalized["blocked_reason"] = blocked_reason
normalized["suggested_execution_mode"] = suggested_execution_mode
@@ -457,7 +467,10 @@ def _normalize_task_record(self, task: dict[str, Any]) -> dict[str, Any]:
def _dispatch_readiness(self, task: dict[str, Any]) -> tuple[bool, str | None]:
execution_mode = str(task.get("execution_mode") or "").strip().lower()
suggested_execution_mode = str(task.get("suggested_execution_mode") or "").strip().lower()
- if execution_mode == "octo_control" and suggested_execution_mode == "worker":
+ if execution_mode == "octo_control" and suggested_execution_mode in {
+ "worker",
+ "octo_task",
+ }:
return False, str(task.get("blocked_reason") or "").strip() or "blocked_by_route"
blocked_until_value = str(task.get("blocked_until") or "").strip()
if blocked_until_value:
@@ -467,7 +480,7 @@ def _dispatch_readiness(self, task: dict[str, Any]) -> tuple[bool, str | None]:
blocked_until = None
if blocked_until is not None and blocked_until.tzinfo is not None and blocked_until > utc_now():
return False, str(task.get("blocked_reason") or "").strip() or "blocked_by_route_backoff"
- if execution_mode == "octo_control":
+ if execution_mode in {"octo_control", "octo_task"}:
task_text = str(task.get("task_text") or "").strip()
if not task_text:
return False, "missing_task_text"
diff --git a/src/octopal/tools/catalog.py b/src/octopal/tools/catalog.py
index c14b94b..ad1115d 100644
--- a/src/octopal/tools/catalog.py
+++ b/src/octopal/tools/catalog.py
@@ -683,8 +683,12 @@ def get_tools(mcp_manager=None) -> list[ToolSpec]:
"description": {"type": "string", "description": "Brief description of the task purpose."},
"execution_mode": {
"type": "string",
- "enum": ["worker", "octo_control"],
- "description": "Execution mode for this scheduled task. Use worker for worker dispatch, or octo_control for future direct Octo control-plane tasks.",
+ "enum": ["worker", "octo_task", "octo_control"],
+ "description": (
+ "Execution mode for this scheduled task. Use worker for worker dispatch, "
+ "octo_task for a full autonomous Octo workspace task, or octo_control for "
+ "bounded scheduler/control-plane maintenance."
+ ),
},
"worker_id": {"type": "string", "description": "Specific worker template ID to use when execution_mode=worker."},
"inputs": {"type": "object", "description": "Optional: Inputs for the worker."},
@@ -720,7 +724,7 @@ def get_tools(mcp_manager=None) -> list[ToolSpec]:
name="repair_scheduled_tasks",
description=(
"Preview or apply safe repairs for scheduled tasks with known route-compatibility suggestions. "
- "Proactive mode is guarded to existing-worker blocked_by_route repairs and cannot provide worker_id."
+ "Proactive mode is guarded to blocked_by_route repairs and cannot provide worker_id."
),
parameters={
"type": "object",
diff --git a/tests/test_scheduler_safety.py b/tests/test_scheduler_safety.py
index 75637d6..d7dcf61 100644
--- a/tests/test_scheduler_safety.py
+++ b/tests/test_scheduler_safety.py
@@ -199,7 +199,22 @@ def test_schedule_task_rejects_worker_id_for_octo_control_mode(tmp_path: Path) -
assert result == "schedule_task error: worker_id must be omitted when execution_mode=octo_control."
-def test_schedule_task_derives_legacy_octo_control_mode_without_worker_id(tmp_path: Path) -> None:
+def test_schedule_task_rejects_worker_id_for_octo_task_mode(tmp_path: Path) -> None:
+ scheduler = SchedulerService(store=_StoreStub(), workspace_dir=tmp_path)
+ result = _tool_schedule_task(
+ {
+ "name": "Digest",
+ "frequency": "Every 30 minutes",
+ "task": "Generate digest",
+ "execution_mode": "octo_task",
+ "worker_id": "writer",
+ },
+ {"octo": SimpleNamespace(scheduler=scheduler)},
+ )
+ assert result == "schedule_task error: worker_id must be omitted when execution_mode=octo_task."
+
+
+def test_schedule_task_derives_octo_task_mode_without_worker_id(tmp_path: Path) -> None:
store = _StoreStub()
scheduler = SchedulerService(store=store, workspace_dir=tmp_path)
payload = json.loads(
@@ -214,13 +229,33 @@ def test_schedule_task_derives_legacy_octo_control_mode_without_worker_id(tmp_pa
)
assert payload["status"] == "scheduled"
- assert payload["execution_mode"] == "octo_control"
- assert payload["notify_user"] == "never"
+ assert payload["execution_mode"] == "octo_task"
+ assert payload["notify_user"] == "if_significant"
assert store.last_upsert is not None
assert store.last_upsert["metadata"] == {
- "notify_user": "never",
- "execution_mode": "octo_control",
+ "notify_user": "if_significant",
+ "execution_mode": "octo_task",
}
+
+
+def test_schedule_task_accepts_octo_control_for_bounded_maintenance(tmp_path: Path) -> None:
+ store = _StoreStub()
+ scheduler = SchedulerService(store=store, workspace_dir=tmp_path)
+
+ payload = json.loads(
+ _tool_schedule_task(
+ {
+ "name": "Compact memory",
+ "frequency": "Every 30 minutes",
+ "task": "Compact memory",
+ "execution_mode": "octo_control",
+ },
+ {"octo": SimpleNamespace(scheduler=scheduler)},
+ )
+ )
+
+ assert payload["execution_mode"] == "octo_control"
+ assert payload["notify_user"] == "never"
def test_check_schedule_returns_json_with_inputs(tmp_path: Path) -> None:
@@ -298,7 +333,7 @@ def test_scheduler_status_reports_due_and_next_run_preview(tmp_path: Path) -> No
assert payload["tasks"][0]["execution_mode"] == "worker"
assert payload["tasks"][0]["dispatch_ready"] is True
assert payload["tasks"][0]["suggested_execution_mode"] is None
- assert payload["tasks"][1]["execution_mode"] == "octo_control"
+ assert payload["tasks"][1]["execution_mode"] == "octo_task"
assert payload["tasks"][1]["dispatch_ready"] is True
assert payload["tasks"][1]["dispatch_policy_reason"] is None
assert payload["tasks"][1]["suggested_execution_mode"] is None
@@ -329,7 +364,7 @@ def test_scheduler_sync_to_markdown_includes_dispatch_readiness(tmp_path: Path)
scheduler.sync_to_markdown()
heartbeat = (tmp_path / "HEARTBEAT.md").read_text(encoding="utf-8")
- assert "**Execution mode**: octo_control" in heartbeat
+ assert "**Execution mode**: octo_task" in heartbeat
assert "**Dispatch**: ready" in heartbeat
@@ -382,7 +417,7 @@ def test_describe_tasks_marks_blocked_octo_control_backoff(tmp_path: Path) -> No
assert described[0]["dispatch_policy_reason"] == "blocked_by_route"
assert described[0]["blocked_until"] == blocked_until.isoformat()
assert described[0]["blocked_reason"] == "blocked_by_route"
- assert described[0]["suggested_execution_mode"] == "worker"
+ assert described[0]["suggested_execution_mode"] == "octo_task"
assert described[0]["due_now"] is False
@@ -455,7 +490,7 @@ def test_route_blocked_octo_control_stays_not_ready_after_backoff_expires(
assert scheduler.get_actionable_tasks() == []
assert described[0]["dispatch_ready"] is False
assert described[0]["dispatch_policy_reason"] == "blocked_by_route"
- assert described[0]["suggested_execution_mode"] == "worker"
+ assert described[0]["suggested_execution_mode"] == "octo_task"
assert described[0]["due_now"] is False
@@ -498,7 +533,7 @@ def test_scheduler_sync_to_markdown_shows_suggested_execution_mode_for_blocked_t
scheduler.sync_to_markdown()
heartbeat = (tmp_path / "HEARTBEAT.md").read_text(encoding="utf-8")
- assert "**Suggested execution mode**: worker" in heartbeat
+ assert "**Suggested execution mode**: octo_task" in heartbeat
def test_scheduler_status_reports_suggested_execution_mode_for_blocked_tasks(tmp_path: Path) -> None:
@@ -534,7 +569,7 @@ def test_scheduler_status_reports_suggested_execution_mode_for_blocked_tasks(tmp
assert payload["tasks"][0]["execution_mode"] == "octo_control"
assert payload["tasks"][0]["dispatch_ready"] is False
assert payload["tasks"][0]["dispatch_policy_reason"] == "blocked_by_route"
- assert payload["tasks"][0]["suggested_execution_mode"] == "worker"
+ assert payload["tasks"][0]["suggested_execution_mode"] == "octo_task"
assert payload["due_count"] == 0
assert any("suggested execution mode" in hint for hint in payload["hints"])
@@ -574,12 +609,62 @@ def test_repair_scheduled_tasks_previews_candidates_without_applying(tmp_path: P
assert payload["candidate_count"] == 1
assert payload["applied_count"] == 0
assert payload["candidates"][0]["task_id"] == "weather_check"
- assert payload["candidates"][0]["suggested_execution_mode"] == "worker"
- assert payload["candidates"][0]["can_apply"] is False
- assert payload["candidates"][0]["skip_reason"] == "missing_worker_id"
+ assert payload["candidates"][0]["suggested_execution_mode"] == "octo_task"
+ assert payload["candidates"][0]["can_apply"] is True
+ assert "skip_reason" not in payload["candidates"][0]
assert store.last_upsert is None
+def test_repair_scheduled_tasks_applies_octo_task_migration(tmp_path: Path) -> None:
+ blocked_until = utc_now() + timedelta(minutes=30)
+ store = _StoreStub(
+ tasks=[
+ {
+ "id": "draft_write",
+ "name": "Draft Write",
+ "description": "Write a draft",
+ "frequency": "Every 30 minutes",
+ "worker_id": None,
+ "task_text": "Write a draft to memory/draft.md",
+ "inputs_json": "{}",
+ "metadata_json": json.dumps(
+ {
+ "notify_user": "never",
+ "execution_mode": "octo_control",
+ "blocked_until": blocked_until.isoformat(),
+ "blocked_reason": "blocked_by_route",
+ "suggested_execution_mode": "worker",
+ }
+ ),
+ "last_run_at": None,
+ "enabled": 1,
+ }
+ ]
+ )
+ scheduler = SchedulerService(store=store, workspace_dir=tmp_path)
+
+ payload = json.loads(
+ _tool_repair_scheduled_tasks(
+ {"apply": True, "task_ids": ["draft_write"]},
+ {"octo": SimpleNamespace(scheduler=scheduler)},
+ )
+ )
+
+ assert payload["status"] == "applied"
+ assert payload["applied"][0] == {
+ "task_id": "draft_write",
+ "name": "Draft Write",
+ "execution_mode": "octo_task",
+ "worker_id": None,
+ }
+ assert store.last_upsert is not None
+ assert store.last_upsert["worker_id"] is None
+ assert store.last_upsert["metadata"] == {
+ "notify_user": "never",
+ "execution_mode": "octo_task",
+ }
+
+
def test_repair_scheduled_tasks_applies_worker_migration_with_valid_worker_id(tmp_path: Path) -> None:
blocked_until = utc_now() + timedelta(minutes=30)
_write_worker_template(tmp_path)
@@ -590,7 +675,7 @@ def test_repair_scheduled_tasks_applies_worker_migration_with_valid_worker_id(tm
"name": "Weather Check",
"description": "Check weather",
"frequency": "Every 30 minutes",
- "worker_id": None,
+ "worker_id": "weather_worker",
"task_text": "Check the weather",
"inputs_json": "{}",
"metadata_json": json.dumps(
@@ -611,7 +696,7 @@ def test_repair_scheduled_tasks_applies_worker_migration_with_valid_worker_id(tm
payload = json.loads(
_tool_repair_scheduled_tasks(
- {"apply": True, "task_ids": ["weather_check"], "worker_id": "weather_worker"},
+ {"apply": True, "task_ids": ["weather_check"]},
{"octo": SimpleNamespace(scheduler=scheduler)},
)
)
@@ -1160,7 +1245,10 @@ async def test_add_self_queue_item_dedupes_active_items(tmp_path, monkeypatch):
@pytest.mark.asyncio
-async def test_scan_opportunities_includes_blocked_scheduled_task_worker_candidate(tmp_path, monkeypatch):
+async def test_scan_opportunities_includes_blocked_scheduled_task_octo_task_candidate(
+ tmp_path,
+ monkeypatch,
+):
monkeypatch.setattr(octo_core, "_workspace_dir", lambda: tmp_path)
store = _StoreStub(
tasks=[
@@ -1205,13 +1293,13 @@ async def _health(chat_id: int):
card = result["opportunities"][0]
assert card["kind"] == "scheduled_task_repair"
- assert card["dedupe_key"] == "scheduled-task:weather_digest:suggested-worker"
- assert card["suggested_worker_id"] == "ops_sre"
+ assert card["dedupe_key"] == "scheduled-task:weather_digest:suggested-octo_task"
+ assert "suggested_worker_id" not in card
assert card["risk"] == "medium"
assert card["inputs"] == {
"scheduled_task_id": "weather_digest",
"blocked_reason": "blocked_by_route",
- "suggested_execution_mode": "worker",
+ "suggested_execution_mode": "octo_task",
}
@@ -1261,7 +1349,7 @@ async def _health(chat_id: int):
{
"title": "Already queued",
"task": "Inspect scheduled task.",
- "dedupe_key": "scheduled-task:weather_digest:suggested-worker",
+ "dedupe_key": "scheduled-task:weather_digest:suggested-octo_task",
},
)
@@ -1357,6 +1445,57 @@ def _build_plan_should_not_run(*args, **kwargs):
assert calls == {"control_prompt": 1, "complete_route": 1}
+@pytest.mark.asyncio
+async def test_route_scheduled_octo_task_uses_full_conversation_route(monkeypatch):
+ calls = {"bootstrap": 0, "route": 0}
+
+ class DummyOcto:
+ provider = object()
+ memory = object()
+ store = object()
+
+ task = {
+ "id": "draft_write",
+ "name": "Draft Write",
+ "frequency": "Every 30 minutes",
+ "execution_mode": "octo_task",
+ "notify_user": "never",
+ "task_text": "Write a draft to memory/draft.md",
+ "inputs": {"path": "memory/draft.md"},
+ }
+
+ async def _build_bootstrap_context_prompt(store, chat_id):
+ calls["bootstrap"] += 1
+ assert chat_id == 123
+ return SimpleNamespace(content="full context")
+
+ async def _route_or_reply(octo, provider, memory, user_text, chat_id, bootstrap_context, **kwargs):
+ calls["route"] += 1
+ assert "full autonomous workspace task" in user_text
+ assert "memory/draft.md" in user_text
+ assert bootstrap_context == "full context"
+ assert kwargs["show_typing"] is False
+ assert kwargs["internal_followup"] is True
+ assert kwargs["route_mode"] == octo_router.RouteMode.CONVERSATION
+ return "SCHEDULED_TASK_DONE"
+
+ async def _build_control_plane_prompt_should_not_run(**kwargs):
+ raise AssertionError("control-plane prompt should not run for octo_task")
+
+ monkeypatch.setattr(octo_router, "build_bootstrap_context_prompt", _build_bootstrap_context_prompt)
+ monkeypatch.setattr(octo_router, "route_or_reply", _route_or_reply)
+ monkeypatch.setattr(
+ octo_router,
+ "build_control_plane_prompt",
+ _build_control_plane_prompt_should_not_run,
+ )
+
+ result = await octo_router.route_scheduled_octo_task(DummyOcto(), task, chat_id=123)
+
+ assert result == "SCHEDULED_TASK_DONE"
+ assert calls == {"bootstrap": 1, "route": 1}
+
+
@pytest.mark.asyncio
async def test_octo_run_scheduler_tick_once_uses_bounded_scheduler_route(monkeypatch):
calls = {"scheduler_tick": 0, "dispatch": 0}
@@ -1796,7 +1935,9 @@ async def test_octo_dispatch_due_scheduled_tasks_runs_octo_control_tasks(monkeyp
"worker_id": None,
"task_text": "Compact memory",
"inputs_json": "{}",
- "metadata_json": json.dumps({"notify_user": "never"}),
+ "metadata_json": json.dumps(
+ {"notify_user": "never", "execution_mode": "octo_control"}
+ ),
"last_run_at": None,
"enabled": 1,
}
@@ -1846,6 +1987,69 @@ async def _start_worker_async(self, **kwargs):
assert scheduler.store.marked_task_ids == ["memory_compact"]
+@pytest.mark.asyncio
+async def test_octo_dispatch_due_scheduled_tasks_runs_full_octo_tasks(monkeypatch):
+ scheduler = SchedulerService(
+ store=_StoreStub(
+ tasks=[
+ {
+ "id": "draft_write",
+ "name": "Draft Write",
+ "description": "Write a draft",
+ "frequency": "Every 30 minutes",
+ "worker_id": None,
+ "task_text": "Write a draft to memory/draft.md",
+ "inputs_json": "{}",
+ "metadata_json": json.dumps(
+ {"notify_user": "never", "execution_mode": "octo_task"}
+ ),
+ "last_run_at": None,
+ "enabled": 1,
+ }
+ ]
+ ),
+ workspace_dir=Path("."),
+ )
+ route_calls: list[dict] = []
+
+ async def _start_worker_async(self, **kwargs):
+ raise AssertionError("_start_worker_async should not be called directly for octo_task tasks")
+
+ async def _route_scheduled_octo_task(octo, task, *, chat_id=0):
+ route_calls.append({"task": task, "chat_id": chat_id})
+ return "SCHEDULED_TASK_DONE"
+
+ monkeypatch.setattr(octo_core.Octo, "_start_worker_async", _start_worker_async)
+ monkeypatch.setattr(octo_router, "route_scheduled_octo_task", _route_scheduled_octo_task)
+ monkeypatch.setattr(octo_core, "route_scheduled_octo_task", _route_scheduled_octo_task)
+
+ octo = Octo(
+ provider=object(),
+ store=_StoreStub(),
+ policy=object(),
+ runtime=_RuntimeStub(),
+ approvals=_ApprovalsStub(),
+ memory=_MemoryStub(),
+ canon=SimpleNamespace(workspace_dir=Path(".")),
+ scheduler=scheduler,
+ )
+
+ summary = await octo._dispatch_due_scheduled_tasks_once(chat_id=0, max_tasks=5)
+
+ assert summary == {
+ "due_count": 1,
+ "attempted": 1,
+ "started": 0,
+ "completed": 1,
+ "duplicates": 0,
+ "rejected_by_policy": 0,
+ "policy_reasons": {},
+ "errors": 0,
+ }
+ assert route_calls and route_calls[0]["task"]["id"] == "draft_write"
+ assert scheduler.store.marked_task_ids == ["draft_write"]
+
+
@pytest.mark.asyncio
async def test_octo_dispatch_due_scheduled_tasks_requires_explicit_octo_control_completion_signal(monkeypatch):
scheduler = SchedulerService(
@@ -1859,7 +2063,9 @@ async def test_octo_dispatch_due_scheduled_tasks_requires_explicit_octo_control_
"worker_id": None,
"task_text": "Compact memory",
"inputs_json": "{}",
- "metadata_json": json.dumps({"notify_user": "never"}),
+ "metadata_json": json.dumps(
+ {"notify_user": "never", "execution_mode": "octo_control"}
+ ),
"last_run_at": None,
"enabled": 1,
}
@@ -1921,7 +2127,9 @@ async def test_octo_dispatch_due_scheduled_tasks_backs_off_blocked_octo_control_
"worker_id": None,
"task_text": "Check the weather",
"inputs_json": "{}",
- "metadata_json": json.dumps({"notify_user": "never"}),
+ "metadata_json": json.dumps(
+ {"notify_user": "never", "execution_mode": "octo_control"}
+ ),
"last_run_at": None,
"enabled": 1,
}
@@ -1984,7 +2192,7 @@ async def _route_scheduled_octo_control(octo, task, *, chat_id=0):
assert isinstance(metadata, dict)
assert metadata["blocked_reason"] == "blocked_by_route"
assert "blocked_until" in metadata
- assert metadata["suggested_execution_mode"] == "worker"
+ assert metadata["suggested_execution_mode"] == "octo_task"
@pytest.mark.asyncio
@@ -2137,7 +2345,9 @@ async def test_octo_control_if_significant_is_treated_as_never_for_delivery(monk
"worker_id": None,
"task_text": "Send daily digest",
"inputs_json": "{}",
- "metadata_json": json.dumps({"notify_user": "if_significant"}),
+ "metadata_json": json.dumps(
+ {"notify_user": "if_significant", "execution_mode": "octo_control"}
+ ),
"last_run_at": None,
"enabled": 1,
}