Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/octopal/runtime/octo/prompts/octo_system.md
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ You are the manager of your own schedule.
- Use `list_schedule` to see all your planned tasks.
- Use `schedule_task` to add new recurring tasks or update existing ones.
- Use `remove_task` to stop a recurring task.
- For `execution_mode="worker"`, set `allowed_paths` when the scheduled worker must read or write files from your main workspace. Workers have private scratch workspaces by default and cannot assume main-workspace file visibility. Pass only the smallest workspace-relative files or directories needed for that scheduled task. Omit `allowed_paths` for `octo_task` and `octo_control` schedules.
- When creating schedules, set `notify_user` explicitly:
- `never` for quiet maintenance/checks
- `if_significant` for most background work
Expand Down
44 changes: 44 additions & 0 deletions src/octopal/runtime/octo/scheduled_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
normalize_notify_user_policy,
parse_scheduled_task_blocked_until,
)
from octopal.runtime.workers.allowed_paths import (
infer_allowed_paths_from_task,
normalize_allowed_paths,
)
from octopal.utils import utc_now

logger = structlog.get_logger(__name__)
Expand Down Expand Up @@ -74,6 +78,39 @@ def _scheduled_octo_control_backoff_seconds() -> float:
)


def _scheduled_workspace_dir(octo: Any) -> Any:
    """Return the workspace directory to use for scheduled-task path handling.

    The scheduler's ``workspace_dir`` is used whenever it is not ``None``;
    otherwise the value of ``octo.canon.workspace_dir`` is returned. Missing
    attributes are treated as ``None``, so the result is ``None`` when neither
    source provides a directory.
    """
    from_scheduler = getattr(getattr(octo, "scheduler", None), "workspace_dir", None)
    if from_scheduler is None:
        # Fall back to the canon's workspace only when the scheduler has none.
        return getattr(getattr(octo, "canon", None), "workspace_dir", None)
    return from_scheduler


def _scheduled_task_allowed_paths(
    octo: Any,
    task: dict[str, Any],
    *,
    task_text: str,
    inputs: dict[str, Any],
) -> list[str] | None:
    """Resolve the workspace paths a scheduled worker run may access.

    Sources are tried in priority order and the first one that normalizes to
    a non-empty list wins:

    1. ``task["metadata"]["allowed_paths"]``
    2. ``inputs["allowed_paths"]``
    3. paths inferred from ``task_text``

    Args:
        octo: Object used to look up the workspace directory.
        task: Scheduled task record; its ``metadata`` entry is consulted.
        task_text: Task description used for path inference as a last resort.
        inputs: Worker inputs for this run.

    Returns:
        The normalized path list from the first source that yields one, or
        whatever the inference step returns when no explicit list is usable.
    """
    workspace_dir = _scheduled_workspace_dir(octo)
    # A source that is not a dict (for example ``inputs=None``) is treated as
    # "no explicit paths" so the lookup falls through to the next source
    # instead of raising AttributeError and aborting the scheduled run.
    for source in (task.get("metadata"), inputs):
        if not isinstance(source, dict):
            continue
        explicit = normalize_allowed_paths(
            source.get("allowed_paths"),
            workspace_dir=workspace_dir,
        )
        if explicit:
            return explicit
    return infer_allowed_paths_from_task(task_text, workspace_dir=workspace_dir)


class OctoScheduledRuntimeMixin:
def _get_scheduled_octo_control_backoff(self, task_id: str) -> tuple[float, str] | None:
task_id_value = str(task_id or "").strip()
Expand Down Expand Up @@ -354,6 +391,12 @@ async def _dispatch_due_scheduled_tasks_once(

summary["attempted"] += 1
try:
allowed_paths = _scheduled_task_allowed_paths(
self,
task,
task_text=task_text,
inputs=inputs,
)
result = await self._start_worker_async(
worker_id=worker_id,
task=task_text,
Expand All @@ -362,6 +405,7 @@ async def _dispatch_due_scheduled_tasks_once(
tools=None,
model=None,
timeout_seconds=None,
allowed_paths=allowed_paths,
scheduled_task_id=task_id or None,
)
except Exception:
Expand Down
19 changes: 15 additions & 4 deletions src/octopal/runtime/scheduler/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import structlog

from octopal.infrastructure.store.base import Store
from octopal.runtime.workers.allowed_paths import normalize_allowed_paths
from octopal.runtime.workers.loader import get_worker_template
from octopal.utils import utc_now

Expand Down Expand Up @@ -91,6 +92,7 @@ def schedule_task(
description: str | None = None,
worker_id: str | None = None,
inputs: dict | None = None,
allowed_paths: list[str] | None = None,
notify_user: str | None = None,
execution_mode: str | None = None,
) -> str:
Expand All @@ -110,7 +112,19 @@ def schedule_task(
raise ValueError(
f"worker_id must be omitted when execution_mode={normalized_execution_mode}."
)
normalized_allowed_paths = normalize_allowed_paths(
allowed_paths,
workspace_dir=self.workspace_dir,
)
if normalized_allowed_paths and normalized_execution_mode != "worker":
raise ValueError("allowed_paths can only be used when execution_mode=worker.")
task_id = self._generate_id(name)
metadata: dict[str, Any] = {
"notify_user": normalized_notify_user,
"execution_mode": normalized_execution_mode,
}
if normalized_allowed_paths:
metadata["allowed_paths"] = normalized_allowed_paths
self.store.upsert_scheduled_task(
task_id=task_id,
name=name,
Expand All @@ -119,10 +133,7 @@ def schedule_task(
description=description,
worker_id=worker_id_value,
inputs=inputs,
metadata={
"notify_user": normalized_notify_user,
"execution_mode": normalized_execution_mode,
},
metadata=metadata,
)
self.sync_to_markdown()
return task_id
Expand Down
89 changes: 89 additions & 0 deletions src/octopal/runtime/workers/allowed_paths.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from __future__ import annotations

import os
import re
from pathlib import Path

# Matches path-like tokens embedded in free text: an optional drive prefix
# ("C:\" or "C:/"), then one or more segments each followed by "/" or "\",
# then a final segment. Segments consist of word characters and ".@()+-".
# A token therefore needs at least one separator; a bare name never matches.
_PATH_TOKEN_RE = re.compile(r"(?P<path>(?:[A-Za-z]:[\\/])?(?:[\w.@()+-]+[\\/])+[\w.@()+-]+)")


def _workspace_path(workspace_dir: Path | None = None) -> Path:
    """Return the resolved workspace root.

    An explicit ``workspace_dir`` takes precedence. Without one, the root is
    read from the ``OCTOPAL_WORKSPACE_DIR`` environment variable, defaulting
    to ``workspace`` when the variable is unset.
    """
    base = (
        workspace_dir
        if workspace_dir is not None
        else os.getenv("OCTOPAL_WORKSPACE_DIR", "workspace")
    )
    return Path(base).resolve()


def _workspace_relative_path(
    raw_path: object,
    *,
    workspace_dir: Path | None = None,
    require_exists: bool = False,
) -> str | None:
    """Normalize ``raw_path`` to a POSIX-style path relative to the workspace.

    Args:
        raw_path: Candidate path; converted with ``str()`` and trimmed of
            surrounding whitespace, quotes and trailing punctuation.
        workspace_dir: Workspace root; resolved via ``_workspace_path`` when
            omitted.
        require_exists: When true, the resolved target must exist on disk.

    Returns:
        The workspace-relative path using forward slashes, or ``None`` when
        the input is empty after trimming, cannot be resolved, resolves to a
        location outside the workspace, or (with ``require_exists``) does not
        exist.
    """
    # Quotes and punctuation picked up from surrounding prose are trimmed on
    # both sides, but "." only on the right: a leading dot is part of the path
    # ("./notes/a.md", ".config/app.toml"). Stripping it on the left turned
    # such inputs into a different path or into a rejected absolute path.
    text = str(raw_path or "").strip()
    text = text.lstrip("`'\",;:)").rstrip("`'\".,;:)")
    if not text:
        return None

    workspace = _workspace_path(workspace_dir)
    candidate = Path(text)
    if not candidate.is_absolute():
        candidate = workspace / candidate
    try:
        # resolve() collapses ".." segments and symlinks before the containment
        # check, so traversal out of the workspace is rejected by relative_to().
        rel_path = candidate.resolve().relative_to(workspace)
    except (OSError, ValueError):
        return None

    if require_exists and not (workspace / rel_path).exists():
        return None
    return rel_path.as_posix()


def normalize_allowed_paths(
    value: object,
    *,
    workspace_dir: Path | None = None,
) -> list[str] | None:
    """Turn a raw ``allowed_paths`` value into unique workspace-relative paths.

    Accepts a single string or a list, tuple or set of items. Each item is
    passed through ``_workspace_relative_path``; items that do not normalize
    are dropped, and duplicates keep only their first occurrence.

    Returns:
        The normalized paths in first-seen order, or ``None`` when ``value``
        is ``None``, has an unsupported type, or yields no usable path.
    """
    if isinstance(value, str):
        candidates = [value]
    elif isinstance(value, (list, tuple, set)):
        candidates = list(value)
    else:
        # Covers None as well as any unsupported container type.
        return None

    resolved = (
        _workspace_relative_path(item, workspace_dir=workspace_dir)
        for item in candidates
    )
    # dict.fromkeys drops duplicates while preserving first-seen order.
    unique = list(dict.fromkeys(rel for rel in resolved if rel))
    return unique or None


def infer_allowed_paths_from_task(
    task: str,
    *,
    workspace_dir: Path | None = None,
) -> list[str] | None:
    """Collect existing workspace paths mentioned in a task description.

    Every token matched by ``_PATH_TOKEN_RE`` is normalized with
    ``_workspace_relative_path`` using ``require_exists=True``; tokens that do
    not normalize are ignored and repeated paths are reported once.

    Returns:
        The inferred paths in order of first appearance, or ``None`` when the
        text is empty or mentions no usable path.
    """
    found: dict[str, None] = {}
    for token in _PATH_TOKEN_RE.finditer(task or ""):
        rel = _workspace_relative_path(
            token.group("path"),
            workspace_dir=workspace_dir,
            require_exists=True,
        )
        if rel:
            # Insertion-ordered dict keeps the first occurrence only.
            found.setdefault(rel, None)
    return list(found) or None
35 changes: 26 additions & 9 deletions src/octopal/tools/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
)
from octopal.runtime.metrics import read_metrics_snapshot
from octopal.runtime.octo_status import build_octo_status
from octopal.runtime.scheduler.service import (
normalize_execution_mode,
normalize_notify_user_policy,
)
from octopal.runtime.state import is_pid_running, read_status
from octopal.tools.browser.actions import (
from octopal.runtime.scheduler.service import (
normalize_execution_mode,
normalize_notify_user_policy,
)
from octopal.runtime.state import is_pid_running, read_status
from octopal.runtime.workers.allowed_paths import normalize_allowed_paths
from octopal.tools.browser.actions import (
browser_click,
browser_close,
browser_extract,
Expand Down Expand Up @@ -692,9 +693,19 @@ def get_tools(mcp_manager=None) -> list[ToolSpec]:
},
"worker_id": {"type": "string", "description": "Specific worker template ID to use when execution_mode=worker."},
"inputs": {"type": "object", "description": "Optional: Inputs for the worker."},
"notify_user": {
"type": "string",
"enum": ["never", "if_significant", "always"],
"allowed_paths": {
"type": "array",
"items": {"type": "string"},
"description": (
"For execution_mode=worker only: workspace-relative files or directories "
"from Octo's main workspace that the scheduled worker must read or write. "
"Use the smallest explicit set needed; omit this for octo_task and "
"octo_control schedules."
),
},
"notify_user": {
"type": "string",
"enum": ["never", "if_significant", "always"],
"description": "When the user should hear about this scheduled task: never, only if significant, or always.",
},
},
Expand Down Expand Up @@ -1874,6 +1885,7 @@ def _tool_schedule_task(args, ctx) -> str:
description=args.get("description"),
worker_id=args.get("worker_id"),
inputs=args.get("inputs"),
allowed_paths=args.get("allowed_paths"),
notify_user=args.get("notify_user"),
execution_mode=args.get("execution_mode"),
)
Expand All @@ -1887,6 +1899,10 @@ def _tool_schedule_task(args, ctx) -> str:
notify_user = normalize_notify_user_policy(args.get("notify_user"))
if execution_mode == "octo_control" and notify_user == "if_significant":
notify_user = "never"
allowed_paths = normalize_allowed_paths(
args.get("allowed_paths"),
workspace_dir=ctx["octo"].scheduler.workspace_dir,
)

return json.dumps(
{
Expand All @@ -1896,6 +1912,7 @@ def _tool_schedule_task(args, ctx) -> str:
"frequency": args["frequency"],
"execution_mode": execution_mode,
"notify_user": notify_user,
"allowed_paths": allowed_paths or [],
},
ensure_ascii=False,
)
Expand Down
35 changes: 3 additions & 32 deletions src/octopal/tools/workers/management.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import asyncio
import hashlib
import json
import os
import re
from datetime import timedelta
from pathlib import Path
Expand All @@ -13,6 +12,9 @@
SYNTHESIZE_WORKER_OUTPUT_CONTEXT_BUDGET,
summarize_worker_output_for_context,
)
from octopal.runtime.workers.allowed_paths import (
infer_allowed_paths_from_task as _infer_allowed_paths_from_task,
)
from octopal.tools.registry import ToolSpec
from octopal.utils import utc_now

Expand All @@ -21,7 +23,6 @@

_WORKER_ID_PATTERN = re.compile(r"^[a-z0-9][a-z0-9_-]*$")
_TOKEN_RE = re.compile(r"[a-z0-9_]+")
_PATH_TOKEN_RE = re.compile(r"(?P<path>(?:[A-Za-z]:[\\/])?(?:[\w.@()+-]+[\\/])+[\w.@()+-]+)")
_MAX_PARALLEL_BATCH = 10
_WORKER_BLOCKED_TOOL_NAMES = {
"send_file_to_user",
Expand Down Expand Up @@ -1900,36 +1901,6 @@ def _normalize_tool_name_list(value: object) -> list[str]:
return normalized


def _infer_allowed_paths_from_task(task: str) -> list[str] | None:
    """Collect existing workspace paths mentioned in a task description.

    The workspace root is read from ``OCTOPAL_WORKSPACE_DIR`` (default
    ``workspace``). Each token matched by ``_PATH_TOKEN_RE`` is kept only if it
    resolves to a location inside the workspace and exists on disk.

    Returns:
        Workspace-relative POSIX paths in order of first appearance, or
        ``None`` when nothing usable is found.
    """
    workspace = Path(os.getenv("OCTOPAL_WORKSPACE_DIR", "workspace")).resolve()
    inferred: list[str] = []
    seen: set[str] = set()
    for match in _PATH_TOKEN_RE.finditer(task or ""):
        # Trim quotes/punctuation on both sides but "." only on the right: a
        # leading dot belongs to the path ("./notes/a.md", ".config/app.toml")
        # and stripping it produced a different or rejected path.
        raw_path = match.group("path").strip()
        raw_path = raw_path.lstrip("`'\",;:)").rstrip("`'\".,;:)")
        if not raw_path:
            continue
        candidate = Path(raw_path)
        if candidate.is_absolute():
            try:
                rel_path = candidate.resolve().relative_to(workspace)
            except (OSError, ValueError):
                continue
        else:
            rel_path = Path(raw_path)
            candidate = workspace / rel_path
            try:
                # Containment check only; the relative form is kept as written.
                candidate.resolve().relative_to(workspace)
            except (OSError, ValueError):
                continue
        if not candidate.exists():
            continue
        rel = rel_path.as_posix()
        if rel not in seen:
            seen.add(rel)
            inferred.append(rel)
    return inferred or None


def _task_mentions_image_analysis(task: str) -> bool:
tokens = _tokenize(task)
return bool(tokens & _IMAGE_TASK_TOKENS)
Expand Down
Loading