diff --git a/src/octopal/runtime/octo/prompts/octo_system.md b/src/octopal/runtime/octo/prompts/octo_system.md index 8f14963..6c0d875 100644 --- a/src/octopal/runtime/octo/prompts/octo_system.md +++ b/src/octopal/runtime/octo/prompts/octo_system.md @@ -318,6 +318,7 @@ You are the manager of your own schedule. - Use `list_schedule` to see all your planned tasks. - Use `schedule_task` to add new recurring tasks or update existing ones. - Use `remove_task` to stop a recurring task. +- For `execution_mode="worker"`, set `allowed_paths` when the scheduled worker must read or write files from your main workspace. Workers have private scratch workspaces by default and cannot assume main-workspace file visibility. Pass only the smallest workspace-relative files or directories needed for that scheduled task. Omit `allowed_paths` for `octo_task` and `octo_control` schedules. - When creating schedules, set `notify_user` explicitly: - `never` for quiet maintenance/checks - `if_significant` for most background work diff --git a/src/octopal/runtime/octo/scheduled_runtime.py b/src/octopal/runtime/octo/scheduled_runtime.py index d705cf3..b56d1ff 100644 --- a/src/octopal/runtime/octo/scheduled_runtime.py +++ b/src/octopal/runtime/octo/scheduled_runtime.py @@ -35,6 +35,10 @@ normalize_notify_user_policy, parse_scheduled_task_blocked_until, ) +from octopal.runtime.workers.allowed_paths import ( + infer_allowed_paths_from_task, + normalize_allowed_paths, +) from octopal.utils import utc_now logger = structlog.get_logger(__name__) @@ -74,6 +78,39 @@ def _scheduled_octo_control_backoff_seconds() -> float: ) +def _scheduled_workspace_dir(octo: Any) -> Any: + scheduler = getattr(octo, "scheduler", None) + workspace_dir = getattr(scheduler, "workspace_dir", None) + if workspace_dir is not None: + return workspace_dir + canon = getattr(octo, "canon", None) + return getattr(canon, "workspace_dir", None) + + +def _scheduled_task_allowed_paths( + octo: Any, + task: dict[str, Any], + *, + task_text: str, + inputs: dict[str, Any], +) -> list[str] | None: + workspace_dir = _scheduled_workspace_dir(octo) + metadata = task.get("metadata") if isinstance(task.get("metadata"), dict) else {} + explicit = normalize_allowed_paths( + metadata.get("allowed_paths") if isinstance(metadata, dict) else None, + workspace_dir=workspace_dir, + ) + if explicit: + return explicit + explicit = normalize_allowed_paths( + inputs.get("allowed_paths"), + workspace_dir=workspace_dir, + ) + if explicit: + return explicit + return infer_allowed_paths_from_task(task_text, workspace_dir=workspace_dir) + + class OctoScheduledRuntimeMixin: def _get_scheduled_octo_control_backoff(self, task_id: str) -> tuple[float, str] | None: task_id_value = str(task_id or "").strip() @@ -354,6 +391,12 @@ async def _dispatch_due_scheduled_tasks_once( summary["attempted"] += 1 try: + allowed_paths = _scheduled_task_allowed_paths( + self, + task, + task_text=task_text, + inputs=inputs, + ) result = await self._start_worker_async( worker_id=worker_id, task=task_text, @@ -362,6 +405,7 @@ async def _dispatch_due_scheduled_tasks_once( tools=None, model=None, timeout_seconds=None, + allowed_paths=allowed_paths, scheduled_task_id=task_id or None, ) except Exception: diff --git a/src/octopal/runtime/scheduler/service.py b/src/octopal/runtime/scheduler/service.py index 3c45878..64a7504 100644 --- a/src/octopal/runtime/scheduler/service.py +++ b/src/octopal/runtime/scheduler/service.py @@ -9,6 +9,7 @@ import structlog from octopal.infrastructure.store.base import Store +from octopal.runtime.workers.allowed_paths import normalize_allowed_paths from octopal.runtime.workers.loader import get_worker_template from octopal.utils import utc_now @@ -91,6 +92,7 @@ def schedule_task( description: str | None = None, worker_id: str | None = None, inputs: dict | None = None, + allowed_paths: list[str] | None = None, notify_user: str | None = None, execution_mode: str | None = None, ) -> str: @@ -110,7 +112,19 @@ def schedule_task( raise ValueError( f"worker_id must be omitted when execution_mode={normalized_execution_mode}." ) + normalized_allowed_paths = normalize_allowed_paths( + allowed_paths, + workspace_dir=self.workspace_dir, + ) + if normalized_allowed_paths and normalized_execution_mode != "worker": + raise ValueError("allowed_paths can only be used when execution_mode=worker.") task_id = self._generate_id(name) + metadata: dict[str, Any] = { + "notify_user": normalized_notify_user, + "execution_mode": normalized_execution_mode, + } + if normalized_allowed_paths: + metadata["allowed_paths"] = normalized_allowed_paths self.store.upsert_scheduled_task( task_id=task_id, name=name, @@ -119,10 +133,7 @@ def schedule_task( description=description, worker_id=worker_id_value, inputs=inputs, - metadata={ - "notify_user": normalized_notify_user, - "execution_mode": normalized_execution_mode, - }, + metadata=metadata, ) self.sync_to_markdown() return task_id diff --git a/src/octopal/runtime/workers/allowed_paths.py b/src/octopal/runtime/workers/allowed_paths.py new file mode 100644 index 0000000..86306e5 --- /dev/null +++ b/src/octopal/runtime/workers/allowed_paths.py @@ -0,0 +1,89 @@ +from __future__ import annotations + +import os +import re +from pathlib import Path + +_PATH_TOKEN_RE = re.compile(r"(?P(?:[A-Za-z]:[\\/])?(?:[\w.@()+-]+[\\/])+[\w.@()+-]+)") + + +def _workspace_path(workspace_dir: Path | None = None) -> Path: + if workspace_dir is not None: + return Path(workspace_dir).resolve() + return Path(os.getenv("OCTOPAL_WORKSPACE_DIR", "workspace")).resolve() + + +def _workspace_relative_path( + raw_path: object, + *, + workspace_dir: Path | None = None, + require_exists: bool = False, +) -> str | None: + raw = str(raw_path or "").strip().strip("`'\".,;:)") + if not raw: + return None + + workspace = _workspace_path(workspace_dir) + path = Path(raw) + if path.is_absolute(): + try: + resolved = path.resolve() + rel_path = resolved.relative_to(workspace) + except (OSError, ValueError): + return None + else: + rel_path = path + try: + resolved = (workspace / rel_path).resolve() + rel_path = resolved.relative_to(workspace) + except (OSError, ValueError): + return None + + if require_exists and not (workspace / rel_path).exists(): + return None + return rel_path.as_posix() + + +def normalize_allowed_paths( + value: object, + *, + workspace_dir: Path | None = None, +) -> list[str] | None: + if value is None: + return None + if isinstance(value, str): + raw_items = [value] + elif isinstance(value, (list, tuple, set)): + raw_items = list(value) + else: + return None + + seen: set[str] = set() + normalized: list[str] = [] + for item in raw_items: + rel = _workspace_relative_path(item, workspace_dir=workspace_dir) + if not rel or rel in seen: + continue + seen.add(rel) + normalized.append(rel) + return normalized or None + + +def infer_allowed_paths_from_task( + task: str, + *, + workspace_dir: Path | None = None, +) -> list[str] | None: + seen: set[str] = set() + inferred: list[str] = [] + for match in _PATH_TOKEN_RE.finditer(task or ""): + rel = _workspace_relative_path( + match.group("path"), + workspace_dir=workspace_dir, + require_exists=True, + ) + if not rel or rel in seen: + continue + seen.add(rel) + inferred.append(rel) + return inferred or None diff --git a/src/octopal/tools/catalog.py b/src/octopal/tools/catalog.py index ad1115d..e23c0b8 100644 --- a/src/octopal/tools/catalog.py +++ b/src/octopal/tools/catalog.py @@ -18,12 +18,13 @@ ) from octopal.runtime.metrics import read_metrics_snapshot from octopal.runtime.octo_status import build_octo_status -from octopal.runtime.scheduler.service import ( - normalize_execution_mode, - normalize_notify_user_policy, -) -from octopal.runtime.state import is_pid_running, read_status -from octopal.tools.browser.actions import ( +from octopal.runtime.scheduler.service import ( + normalize_execution_mode, + normalize_notify_user_policy, +) +from octopal.runtime.state import is_pid_running, read_status +from octopal.runtime.workers.allowed_paths import normalize_allowed_paths +from octopal.tools.browser.actions import ( browser_click, browser_close, browser_extract, @@ -692,9 +693,19 @@ def get_tools(mcp_manager=None) -> list[ToolSpec]: }, "worker_id": {"type": "string", "description": "Specific worker template ID to use when execution_mode=worker."}, "inputs": {"type": "object", "description": "Optional: Inputs for the worker."}, - "notify_user": { - "type": "string", - "enum": ["never", "if_significant", "always"], + "allowed_paths": { + "type": "array", + "items": {"type": "string"}, + "description": ( + "For execution_mode=worker only: workspace-relative files or directories " + "from Octo's main workspace that the scheduled worker must read or write. " + "Use the smallest explicit set needed; omit this for octo_task and " + "octo_control schedules." + ), + }, + "notify_user": { + "type": "string", + "enum": ["never", "if_significant", "always"], "description": "When the user should hear about this scheduled task: never, only if significant, or always.", }, }, @@ -1874,6 +1885,7 @@ def _tool_schedule_task(args, ctx) -> str: description=args.get("description"), worker_id=args.get("worker_id"), inputs=args.get("inputs"), + allowed_paths=args.get("allowed_paths"), notify_user=args.get("notify_user"), execution_mode=args.get("execution_mode"), ) @@ -1887,6 +1899,10 @@ def _tool_schedule_task(args, ctx) -> str: notify_user = normalize_notify_user_policy(args.get("notify_user")) if execution_mode == "octo_control" and notify_user == "if_significant": notify_user = "never" + allowed_paths = normalize_allowed_paths( + args.get("allowed_paths"), + workspace_dir=ctx["octo"].scheduler.workspace_dir, + ) return json.dumps( { @@ -1896,6 +1912,7 @@ def _tool_schedule_task(args, ctx) -> str: "frequency": args["frequency"], "execution_mode": execution_mode, "notify_user": notify_user, + "allowed_paths": allowed_paths or [], }, ensure_ascii=False, ) diff --git a/src/octopal/tools/workers/management.py b/src/octopal/tools/workers/management.py index ce4d84c..aa01967 100644 --- a/src/octopal/tools/workers/management.py +++ b/src/octopal/tools/workers/management.py @@ -3,7 +3,6 @@ import asyncio import hashlib import json -import os import re from datetime import timedelta from pathlib import Path @@ -13,6 +12,9 @@ SYNTHESIZE_WORKER_OUTPUT_CONTEXT_BUDGET, summarize_worker_output_for_context, ) +from octopal.runtime.workers.allowed_paths import ( + infer_allowed_paths_from_task as _infer_allowed_paths_from_task, +) from octopal.tools.registry import ToolSpec from octopal.utils import utc_now @@ -21,7 +23,6 @@ _WORKER_ID_PATTERN = re.compile(r"^[a-z0-9][a-z0-9_-]*$") _TOKEN_RE = re.compile(r"[a-z0-9_]+") -_PATH_TOKEN_RE = re.compile(r"(?P(?:[A-Za-z]:[\\/])?(?:[\w.@()+-]+[\\/])+[\w.@()+-]+)") _MAX_PARALLEL_BATCH = 10 _WORKER_BLOCKED_TOOL_NAMES = { "send_file_to_user", @@ -1900,36 +1901,6 @@ def _normalize_tool_name_list(value: object) -> list[str]: return normalized -def _infer_allowed_paths_from_task(task: str) -> list[str] | None: - workspace = Path(os.getenv("OCTOPAL_WORKSPACE_DIR", "workspace")).resolve() - inferred: list[str] = [] - seen: set[str] = set() - for match in _PATH_TOKEN_RE.finditer(task or ""): - raw_path = match.group("path").strip().strip("`'\".,;:)") - if not raw_path: - continue - candidate = Path(raw_path) - if candidate.is_absolute(): - try: - rel_path = candidate.resolve().relative_to(workspace) - except (OSError, ValueError): - continue - else: - rel_path = Path(raw_path) - candidate = workspace / rel_path - try: - candidate.resolve().relative_to(workspace) - except (OSError, ValueError): - continue - if not candidate.exists(): - continue - rel = rel_path.as_posix() - if rel not in seen: - seen.add(rel) - inferred.append(rel) - return inferred or None - - def _task_mentions_image_analysis(task: str) -> bool: tokens = _tokenize(task) return bool(tokens & _IMAGE_TASK_TOKENS) diff --git a/tests/test_scheduler_safety.py b/tests/test_scheduler_safety.py index d7dcf61..f14af16 100644 --- a/tests/test_scheduler_safety.py +++ b/tests/test_scheduler_safety.py @@ -214,6 +214,49 @@ def test_schedule_task_rejects_worker_id_for_octo_task_mode(tmp_path: Path) -> N assert result == "schedule_task error: worker_id must be omitted when execution_mode=octo_task." +def test_schedule_task_accepts_allowed_paths_for_worker_mode(tmp_path: Path) -> None: + store = _StoreStub() + scheduler = SchedulerService(store=store, workspace_dir=tmp_path) + payload = json.loads( + _tool_schedule_task( + { + "name": "Publish report", + "frequency": "Daily at 22:00", + "task": "Read the report and publish it", + "execution_mode": "worker", + "worker_id": "publisher", + "allowed_paths": ["memory/reports/latest.md", "memory/reports/latest.md"], + }, + {"octo": SimpleNamespace(scheduler=scheduler)}, + ) + ) + + assert payload["status"] == "scheduled" + assert payload["allowed_paths"] == ["memory/reports/latest.md"] + assert store.last_upsert is not None + assert store.last_upsert["metadata"] == { + "notify_user": "if_significant", + "execution_mode": "worker", + "allowed_paths": ["memory/reports/latest.md"], + } + + +def test_schedule_task_rejects_allowed_paths_for_octo_task_mode(tmp_path: Path) -> None: + scheduler = SchedulerService(store=_StoreStub(), workspace_dir=tmp_path) + result = _tool_schedule_task( + { + "name": "Write report", + "frequency": "Daily at 22:00", + "task": "Write the report", + "execution_mode": "octo_task", + "allowed_paths": ["memory/reports/latest.md"], + }, + {"octo": SimpleNamespace(scheduler=scheduler)}, + ) + + assert result == "schedule_task error: allowed_paths can only be used when execution_mode=worker." + + def test_schedule_task_derives_octo_task_mode_without_worker_id(tmp_path: Path) -> None: store = _StoreStub() scheduler = SchedulerService(store=store, workspace_dir=tmp_path) @@ -1755,11 +1798,121 @@ async def _start_worker_async(self, **kwargs): "tools": None, "model": None, "timeout_seconds": None, + "allowed_paths": None, "scheduled_task_id": "daily_digest", } ] +@pytest.mark.asyncio +async def test_octo_dispatch_due_scheduled_tasks_passes_stored_allowed_paths(monkeypatch, tmp_path: Path): + started_calls = [] + scheduler = SchedulerService( + store=_StoreStub( + tasks=[ + { + "id": "publish_report", + "name": "Publish Report", + "description": "Publish report", + "frequency": "Daily at 22:00", + "worker_id": "publisher", + "task_text": "Read the report and publish it", + "inputs_json": "{}", + "metadata_json": json.dumps( + { + "notify_user": "never", + "execution_mode": "worker", + "allowed_paths": ["memory/reports/latest.md"], + } + ), + "last_run_at": None, + "enabled": 1, + } + ] + ), + workspace_dir=tmp_path, + ) + + async def _start_worker_async(self, **kwargs): + started_calls.append(kwargs) + return {"status": "started", "run_id": "run-1", "worker_id": "run-1"} + + monkeypatch.setattr(octo_core.Octo, "_start_worker_async", _start_worker_async) + + octo = Octo( + provider=object(), + store=_StoreStub(), + policy=object(), + runtime=_RuntimeStub(), + approvals=_ApprovalsStub(), + memory=_MemoryStub(), + canon=SimpleNamespace(workspace_dir=tmp_path), + scheduler=scheduler, + ) + + summary = await octo._dispatch_due_scheduled_tasks_once(chat_id=0, max_tasks=5) + + assert summary["started"] == 1 + assert started_calls[0]["allowed_paths"] == ["memory/reports/latest.md"] + + +@pytest.mark.asyncio +async def test_octo_dispatch_due_scheduled_tasks_infers_existing_workspace_paths( + monkeypatch, + tmp_path: Path, +): + started_calls = [] + report_path = tmp_path / "memory" / "reports" / "latest.md" + report_path.parent.mkdir(parents=True) + report_path.write_text("report", encoding="utf-8") + scheduler = SchedulerService( + store=_StoreStub( + tasks=[ + { + "id": "publish_report", + "name": "Publish Report", + "description": "Publish report", + "frequency": "Daily at 22:00", + "worker_id": "publisher", + "task_text": "Read memory/reports/latest.md and publish it", + "inputs_json": "{}", + "metadata_json": json.dumps( + { + "notify_user": "never", + "execution_mode": "worker", + } + ), + "last_run_at": None, + "enabled": 1, + } + ] + ), + workspace_dir=tmp_path, + ) + + async def _start_worker_async(self, **kwargs): + started_calls.append(kwargs) + return {"status": "started", "run_id": "run-1", "worker_id": "run-1"} + + monkeypatch.setattr(octo_core.Octo, "_start_worker_async", _start_worker_async) + + octo = Octo( + provider=object(), + store=_StoreStub(), + policy=object(), + runtime=_RuntimeStub(), + approvals=_ApprovalsStub(), + memory=_MemoryStub(), + canon=SimpleNamespace(workspace_dir=tmp_path), + scheduler=scheduler, + ) + + summary = await octo._dispatch_due_scheduled_tasks_once(chat_id=0, max_tasks=5) + + assert summary["started"] == 1 + assert started_calls[0]["allowed_paths"] == ["memory/reports/latest.md"] + + @pytest.mark.asyncio async def test_octo_dispatch_due_scheduled_tasks_targets_single_configured_chat(monkeypatch): started_calls = []