Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions src/octopal/runtime/octo/control_replies.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ def _coerce_scheduled_octo_control_reply(text: str) -> str:
async def _normalize_scheduled_octo_control_reply(
provider: InferenceProvider | None,
text: str,
*,
bounded_control: bool = True,
) -> str:
raw_value = str(text or "")
explicit = extract_heartbeat_user_visible_message(raw_value)
Expand All @@ -129,15 +131,27 @@ async def _normalize_scheduled_octo_control_reply(
if provider is None:
return _coerce_scheduled_octo_control_reply(value)

if bounded_control:
blocked_rule = (
"Use SCHEDULED_TASK_BLOCKED when the task cannot complete from the bounded route "
"because it needs workers, external access, or unavailable tools."
)
rewrite_context = "scheduled Octo control"
else:
blocked_rule = (
"Use SCHEDULED_TASK_BLOCKED only when the task cannot complete even with the normal "
"scheduled Octo task toolset."
)
rewrite_context = "scheduled Octo task"
rewrite_prompt = (
"Rewrite the draft scheduled Octo control reply into the strict completion contract.\n"
f"Rewrite the draft {rewrite_context} reply into the strict completion contract.\n"
"Return exactly one of:\n"
"- SCHEDULED_TASK_DONE\n"
"- SCHEDULED_TASK_BLOCKED\n"
"- NO_USER_RESPONSE\n"
"- <user_visible>...</user_visible>\n"
"Use SCHEDULED_TASK_DONE only if the task completed successfully with no user-visible update.\n"
"Use SCHEDULED_TASK_BLOCKED when the task cannot complete from the bounded route because it needs workers, external access, or unavailable tools.\n"
f"{blocked_rule}\n"
"Use <user_visible> only for a concise completed user-facing update.\n"
"Use NO_USER_RESPONSE if the task did not complete or there is no completion signal.\n"
"Do not include any extra text outside the token or wrapper."
Expand Down
1 change: 1 addition & 0 deletions src/octopal/runtime/octo/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
from octopal.runtime.octo.router import route_or_reply as route_or_reply
from octopal.runtime.octo.router import route_proactive_tick as route_proactive_tick
from octopal.runtime.octo.router import route_scheduled_octo_control as route_scheduled_octo_control
from octopal.runtime.octo.router import route_scheduled_octo_task as route_scheduled_octo_task
from octopal.runtime.octo.router import route_scheduler_tick as route_scheduler_tick
from octopal.runtime.octo.runtime_config import _env_flag, _env_int
from octopal.runtime.octo.scheduled_runtime import OctoScheduledRuntimeMixin
Expand Down
73 changes: 69 additions & 4 deletions src/octopal/runtime/octo/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@
_SCHEDULER_ALLOWED_TOOL_NAMES = {
"check_schedule",
"scheduler_status",
"repair_scheduled_tasks",
"octo_context_health",
"list_workers",
"list_active_workers",
Expand Down Expand Up @@ -656,6 +657,8 @@ async def route_scheduler_tick(
"Scheduler route rules:\n"
"- Keep this turn operational and bounded.\n"
"- You may inspect schedule state and worker availability.\n"
"- You may apply safe scheduled-task route repairs with repair_scheduled_tasks(apply=true) "
"when the candidate is unambiguous.\n"
"- Do not dispatch workers directly from this route.\n"
"- Return one of: SCHEDULER_IDLE, NO_USER_RESPONSE, or <user_visible>...</user_visible>."
),
Expand Down Expand Up @@ -721,8 +724,8 @@ async def route_proactive_tick(
"- Keep this turn bounded to initiative discovery and self-queue maintenance.\n"
"- You may add, claim, execute, cancel, or mark self-queue items only when the payload supports it.\n"
"- You may preview scheduled-task repair candidates with repair_scheduled_tasks(apply=false).\n"
"- You may apply scheduled-task repairs only for an unambiguous blocked_by_route -> worker repair "
"where the task already has a valid worker_id; never provide worker_id from this route.\n"
"- You may apply scheduled-task repairs only for unambiguous blocked_by_route candidates. "
"For worker repairs the task must already have a valid worker_id; never provide worker_id from this route.\n"
"- Do not start workers directly, schedule recurring tasks, use filesystem tools, use network/MCP tools, "
"or perform external side effects from this route.\n"
"- Use execute_self_queue_item only for an existing low/medium-risk queue item with an explicit worker_id; "
Expand Down Expand Up @@ -834,6 +837,31 @@ async def route_scheduled_octo_control(
await octo.set_thinking(False)


async def route_scheduled_octo_task(
octo: Any,
task: dict[str, Any],
*,
chat_id: int = 0,
) -> str:
"""Run a scheduled task as a full Octo workspace task with normal tools and context."""
task_text = _build_scheduled_octo_task_input(task)
bootstrap_context = await build_bootstrap_context_prompt(octo.store, chat_id)
return await route_or_reply(
octo,
octo.provider,
octo.memory,
task_text,
chat_id,
bootstrap_context.content,
internal_followup=True,
show_typing=False,
images=None,
saved_file_paths=None,
include_wakeup=False,
route_mode=RouteMode.CONVERSATION,
)


async def _complete_route_with_tools(
*,
octo: Any,
Expand Down Expand Up @@ -1880,6 +1908,11 @@ def _build_scheduler_tick_input(octo: Any, *, max_tasks: int = 10) -> str:
"worker_id": task.get("worker_id"),
"frequency": task.get("frequency"),
"notify_user": task.get("notify_user"),
"execution_mode": task.get("execution_mode"),
"dispatch_ready": task.get("dispatch_ready"),
"dispatch_policy_reason": task.get("dispatch_policy_reason"),
"blocked_reason": task.get("blocked_reason"),
"suggested_execution_mode": task.get("suggested_execution_mode"),
"task_text": task.get("task_text"),
}
for task in due_tasks[: max(1, int(max_tasks))]
Expand All @@ -1891,6 +1924,11 @@ def _build_scheduler_tick_input(octo: Any, *, max_tasks: int = 10) -> str:
"due_now": bool(task.get("due_now")),
"next_run_at": task.get("next_run_at"),
"notify_user": task.get("notify_user"),
"execution_mode": task.get("execution_mode"),
"dispatch_ready": task.get("dispatch_ready"),
"dispatch_policy_reason": task.get("dispatch_policy_reason"),
"blocked_reason": task.get("blocked_reason"),
"suggested_execution_mode": task.get("suggested_execution_mode"),
}
for task in preview_tasks
],
Expand Down Expand Up @@ -1940,8 +1978,8 @@ async def _build_proactive_tick_input(octo: Any, *, chat_id: int, reason: str) -
f"{json.dumps(payload, ensure_ascii=False, sort_keys=True)}\n"
"If there is already pending self-queue work with an explicit worker_id, you may use execute_self_queue_item. "
"If pending work lacks a worker_id, prefer decision=blocked or noop. "
"If an opportunity kind is scheduled_task_repair and the task already has worker_id, you may preview "
"repair_scheduled_tasks and apply it only when the candidate is safe. "
"If an opportunity kind is scheduled_task_repair, you may preview repair_scheduled_tasks and apply it "
"only when the candidate is safe. Worker repairs require an existing worker_id. "
"If the best opportunity is confidence >= 0.75, low/medium risk, and no pending work exists, "
"use octo_self_queue_add to queue exactly one concrete initiative. "
"Do not call start_worker directly from this route."
Expand All @@ -1967,6 +2005,33 @@ def _build_scheduled_octo_control_input(task: dict[str, Any]) -> str:
)


def _build_scheduled_octo_task_input(task: dict[str, Any]) -> str:
payload = {
"task_id": task.get("id"),
"name": task.get("name"),
"frequency": task.get("frequency"),
"execution_mode": task.get("execution_mode"),
"notify_user": task.get("notify_user"),
"description": task.get("description"),
"task_text": task.get("task_text"),
"inputs": task.get("inputs") if isinstance(task.get("inputs"), dict) else {},
"last_run_at": task.get("last_run_at"),
}
return (
"Run this scheduled Octo task as a full autonomous workspace task:\n"
f"{json.dumps(payload, ensure_ascii=False, sort_keys=True)}\n\n"
"Use the normal tools, workspace context, memory, filesystem, MCP, web, and workers as needed. "
"Complete the task end-to-end before returning a completion signal. "
"If you create or update a file, verify it exists before finishing. "
"Do not treat this as a bounded control-plane route.\n\n"
"When the task is complete, return exactly one of:\n"
"- SCHEDULED_TASK_DONE if it completed successfully and no user-facing update is needed.\n"
"- <user_visible>...</user_visible> if it completed and the user should receive a concise update.\n"
"- NO_USER_RESPONSE only if the task intentionally produced no change.\n"
"Return SCHEDULED_TASK_BLOCKED only if the task truly cannot be completed even with the full Octo toolset."
)


def _ensure_mandatory_octo_tools(
active_tools: list[ToolSpec], all_tools: list[ToolSpec]
) -> list[ToolSpec]:
Expand Down
128 changes: 127 additions & 1 deletion src/octopal/runtime/octo/scheduled_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
from octopal.runtime.octo.router import (
route_scheduled_octo_control as _default_route_scheduled_octo_control,
)
from octopal.runtime.octo.router import (
route_scheduled_octo_task as _default_route_scheduled_octo_task,
)
from octopal.runtime.octo.runtime_config import _env_int
from octopal.runtime.octo.scheduler_helpers import _coerce_positive_chat_id
from octopal.runtime.scheduler.service import (
Expand Down Expand Up @@ -154,7 +157,10 @@ def _update_scheduled_octo_control_backoff_metadata(
else:
metadata.pop(SCHEDULED_TASK_BLOCKED_REASON_KEY, None)
if reason_value == "blocked_by_route":
metadata[SCHEDULED_TASK_SUGGESTED_EXECUTION_MODE_KEY] = "worker"
if str(task.get("worker_id") or "").strip():
metadata[SCHEDULED_TASK_SUGGESTED_EXECUTION_MODE_KEY] = "worker"
else:
metadata[SCHEDULED_TASK_SUGGESTED_EXECUTION_MODE_KEY] = "octo_task"
elif blocked_until is None:
metadata.pop(SCHEDULED_TASK_SUGGESTED_EXECUTION_MODE_KEY, None)
try:
Expand Down Expand Up @@ -312,6 +318,25 @@ async def _dispatch_due_scheduled_tasks_once(
elif str(result.get("status") or "").strip().lower() == "failed":
summary["errors"] += 1
continue
if execution_mode == "octo_task":
summary["attempted"] += 1
try:
result = await self._run_scheduled_octo_task_once(
task=task,
chat_id=dispatch_chat_id,
)
except Exception:
summary["errors"] += 1
logger.exception(
"Scheduled Octo task failed",
task_id=task_id or None,
)
continue
if bool(result.get("completed")):
summary["completed"] += 1
elif str(result.get("status") or "").strip().lower() == "failed":
summary["errors"] += 1
continue
if not worker_id or not task_text:
summary["rejected_by_policy"] += 1
reason = "missing_worker_id" if not worker_id else "missing_task_text"
Expand Down Expand Up @@ -471,3 +496,104 @@ async def _run_scheduled_octo_control_task_once(
"user_visible_sent": user_visible_sent,
"delivery_mode": delivery.mode,
}

async def _run_scheduled_octo_task_once(
self,
*,
task: dict[str, Any],
chat_id: int = 0,
) -> dict[str, Any]:
scheduler = self.scheduler
task_id = str(task.get("id") or "").strip()
notify_user = normalize_notify_user_policy(task.get("notify_user"))
route_scheduled_octo_task = _core_callable(
"route_scheduled_octo_task",
_default_route_scheduled_octo_task,
)
reply_text = await route_scheduled_octo_task(
self,
task,
chat_id=chat_id,
)
normalized_reply = await _normalize_scheduled_octo_control_reply(
self.provider,
reply_text,
bounded_control=False,
)
if normalized_reply == _SCHEDULED_OCTO_CONTROL_BLOCKED:
logger.warning(
"Scheduled Octo task reported blocked",
task_id=task_id or None,
chat_id=chat_id,
raw_reply_preview=safe_preview(reply_text, limit=200),
)
return {
"status": "failed",
"completed": False,
"reason": "blocked",
}
if normalized_reply == "NO_USER_RESPONSE":
logger.warning(
"Scheduled Octo task missing explicit completion signal",
task_id=task_id or None,
chat_id=chat_id,
raw_reply_preview=safe_preview(reply_text, limit=200),
)
return {
"status": "failed",
"completed": False,
"reason": "missing_completion_signal",
}
if normalized_reply == _SCHEDULED_OCTO_CONTROL_DONE:
if scheduler is not None and task_id:
scheduler.mark_executed(task_id)
logger.info(
"Scheduled Octo task completed silently",
task_id=task_id or None,
notify_user=notify_user,
chat_id=chat_id,
)
return {
"status": "completed",
"completed": True,
"user_visible_sent": False,
"delivery_mode": DeliveryMode.SILENT,
}

delivery = resolve_user_delivery(normalized_reply)
user_visible_sent = False
if delivery.user_visible and notify_user != "never":
send_scheduler_control_update = _core_callable(
"_send_scheduler_control_update",
_default_send_scheduler_control_update,
)
await send_scheduler_control_update(
self,
chat_id,
task_id or None,
delivery.text,
)
user_visible_sent = True
elif delivery.user_visible:
logger.info(
"Scheduled Octo task update suppressed by notify policy",
task_id=task_id or None,
notify_user=notify_user,
chat_id=chat_id,
)
if scheduler is not None and task_id:
scheduler.mark_executed(task_id)
logger.info(
"Scheduled Octo task completed",
task_id=task_id or None,
notify_user=notify_user,
chat_id=chat_id,
user_visible_sent=user_visible_sent,
delivery_mode=delivery.mode,
)
return {
"status": "completed",
"completed": True,
"user_visible_sent": user_visible_sent,
"delivery_mode": delivery.mode,
}
31 changes: 21 additions & 10 deletions src/octopal/runtime/octo/self_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,34 @@ def _scheduler_opportunity_cards(
if not task_id:
continue
suggested_mode = str(scheduled_task.get("suggested_execution_mode") or "").strip().lower()
if suggested_mode != "worker":
if suggested_mode not in {"worker", "octo_task"}:
continue
dedupe_key = f"scheduled-task:{task_id}:suggested-worker"
worker_id = str(scheduled_task.get("worker_id") or "").strip()
if suggested_mode == "worker" and not worker_id:
continue
dedupe_key = f"scheduled-task:{task_id}:suggested-{suggested_mode}"
if dedupe_key in active_dedupe_keys:
continue

name = str(scheduled_task.get("name") or task_id).strip()
worker_id = str(scheduled_task.get("worker_id") or "").strip() or "ops_sre"
blocked_reason = str(
scheduled_task.get("blocked_reason")
or scheduled_task.get("dispatch_policy_reason")
or "suggested_execution_mode=worker"
or f"suggested_execution_mode={suggested_mode}"
).strip()
if suggested_mode == "worker":
task = (
f"Inspect scheduled task {task_id!r} ({name!r}) blocked from its current route. "
"Find the least-risk repair or migration path, verify whether worker execution is appropriate, "
"and report the exact recommended change."
)
suggested_worker_id = worker_id
else:
task = (
f"Repair scheduled task {task_id!r} ({name!r}) from its blocked control route "
"to the full scheduled Octo task execution mode, then verify it is dispatch-ready."
)
suggested_worker_id = None
cards.append(
_build_opportunity_card(
kind="scheduled_task_repair",
Expand All @@ -106,12 +121,8 @@ def _scheduler_opportunity_cards(
effort="medium",
confidence=0.88,
risk="medium",
suggested_worker_id=worker_id,
task=(
f"Inspect scheduled task {task_id!r} ({name!r}) blocked from its current route. "
"Find the least-risk repair or migration path, verify whether worker execution is appropriate, "
"and report the exact recommended change."
),
suggested_worker_id=suggested_worker_id,
task=task,
dedupe_key=dedupe_key,
inputs={
"scheduled_task_id": task_id,
Expand Down
Loading