Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/octopal/cli/branding.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ def print_banner() -> None:
""").strip()

output_encoding = (sys.stdout.encoding or "utf-8").lower()
banner_text.encode(output_encoding, errors="strict")
try:
banner_text.encode(output_encoding, errors="strict")
except UnicodeEncodeError:
banner_text = "OCTOPAL"

tagline = Text("Your trusted AI pal", style=f"italic {OCTO_SILVER}")
subline = Text("SECURE MULTI-AGENT EXECUTION RUNTIME", style=OCTO_WHITE)
Expand Down
52 changes: 39 additions & 13 deletions src/octopal/infrastructure/providers/litellm_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ async def complete_with_tools(
messages: list[Message | dict],
*,
tools: list[dict],
tool_choice: str = "auto",
tool_choice: object = "auto",
**kwargs: object,
) -> dict:
"""Complete a chat request with tool/function calling."""
Expand Down Expand Up @@ -537,7 +537,7 @@ async def _complete_with_tools_adaptive_response_format(
*,
messages: list[dict[str, Any]],
tools: list[dict[str, Any]],
tool_choice: str,
tool_choice: object,
request_kwargs: dict[str, object],
) -> tuple[Any, str]:
requested_response_format = request_kwargs.get("response_format")
Expand Down Expand Up @@ -606,30 +606,40 @@ async def _acompletion_with_resilience(self, **kwargs: object) -> Any:

async def _acompletion_guarded(self, **kwargs: object) -> Any:
async with self._semaphore:
timeout_seconds = _coerce_timeout_seconds(kwargs.get("timeout"))
try:
response = await acompletion(
model=self._model,
api_base=self._api_base,
api_key=self._api_key,
**kwargs,
response = await _await_with_runtime_timeout(
acompletion(
model=self._model,
api_base=self._api_base,
api_key=self._api_key,
**kwargs,
),
timeout_seconds=timeout_seconds,
)
except Exception as exc:
if not _is_closed_client_error(exc):
raise
logger.warning(
"LiteLLM client was closed mid-request; retrying once with a fresh completion call"
)
response = await acompletion(
model=self._model,
api_base=self._api_base,
api_key=self._api_key,
**kwargs,
response = await _await_with_runtime_timeout(
acompletion(
model=self._model,
api_base=self._api_base,
api_key=self._api_key,
**kwargs,
),
timeout_seconds=timeout_seconds,
)
# LiteLLM can occasionally return a nested awaitable object on
# provider-error paths (seen on Python 3.14). Unwrap it to avoid
# "coroutine ... was never awaited" warnings and leaked coroutines.
while inspect.isawaitable(response):
response = await response
response = await _await_with_runtime_timeout(
response,
timeout_seconds=timeout_seconds,
)
return response


Expand All @@ -644,6 +654,22 @@ def _serialize_message(message: Message | dict) -> dict:
return serialized


def _coerce_timeout_seconds(value: object) -> float | None:
try:
timeout = float(value) if value is not None else None
except (TypeError, ValueError):
return None
if timeout is None or timeout <= 0:
return None
return timeout


async def _await_with_runtime_timeout(awaitable: Any, *, timeout_seconds: float | None) -> Any:
if timeout_seconds is None:
return await awaitable
return await asyncio.wait_for(awaitable, timeout=timeout_seconds)


def _normalize_plain_messages(messages: list[dict[str, Any]]) -> list[dict[str, str]]:
normalized: list[dict[str, str]] = []
for message in messages:
Expand Down
131 changes: 124 additions & 7 deletions src/octopal/runtime/workers/agent_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import json
import os
import random
import re
import time
import traceback
from pathlib import Path
Expand Down Expand Up @@ -131,6 +132,50 @@
"start_child_worker",
"start_workers_parallel",
}
# Words that, when present in a task description, count as write intent
# (see _task_requires_workspace_write). Grouped by word family.
_WRITE_TASK_TOKENS = {
    "append",
    "create", "created", "creates",
    "draft",
    "edit", "edits",
    "save", "saved",
    "update", "updates",
    "write", "writes", "writing",
}

# Words that count as naming a file-like target in a task description.
_FILE_TASK_TOKENS = {
    # generic nouns
    "artifact", "doc", "document", "draft",
    "file", "files",
    "note", "notes",
    "path", "report", "text", "workspace",
    # format names / extensions written as bare words
    "config", "csv", "json", "markdown", "md", "toml", "yaml", "yml",
}

# Matches a path-like token ending in one of the listed extensions. The token
# must start at the beginning of the text or after whitespace/a quote, and be
# followed by the end of the text, whitespace, a quote, or punctuation.
_FILE_PATH_HINT_RE = re.compile(
    r"(?:^|[\s`'\"])[\w./\\-]+\."
    r"(?:cfg|conf|csv|html|ini|json|log|md|py|toml|txt|ya?ml)"
    r"(?:$|[\s`'\",.:;])",
    flags=re.IGNORECASE,
)


def _parse_positive_int_env(name: str, default: int) -> int:
Expand All @@ -155,6 +200,31 @@ def _parse_nonnegative_int_env(name: str, default: int) -> int:
return value if value >= 0 else default


def _tokenize_task(text: str) -> set[str]:
    """Return the distinct lowercase word tokens found in ``text``.

    A token is a maximal run of ASCII lowercase letters, digits, or
    underscores after lowercasing. Falsy input yields an empty set.
    """
    lowered = text.lower() if text else ""
    return {match.group(0) for match in re.finditer(r"[a-z0-9_]+", lowered)}


def _task_requires_workspace_write(task: str) -> bool:
    """Heuristically decide whether ``task`` asks for a file to be written.

    The task qualifies when it contains a write-intent word from
    ``_WRITE_TASK_TOKENS`` and also either a file-target word from
    ``_FILE_TASK_TOKENS`` or a path-like token matched by
    ``_FILE_PATH_HINT_RE``.

    Args:
        task: Free-form task description; falsy values are treated as empty.

    Returns:
        ``True`` when both write intent and a file target are detected.
    """
    tokens = _tokenize_task(task)
    write_hits = tokens & _WRITE_TASK_TOKENS
    if not write_hits:
        return False
    file_hits = tokens & _FILE_TASK_TOKENS
    # A word listed in both sets (e.g. "draft") must not satisfy both
    # conditions alone, otherwise "Draft a reply" would count as a file
    # write. Requiring two distinct matched words guarantees the write word
    # and the file word differ.
    if file_hits and len(write_hits | file_hits) > 1:
        return True
    return bool(_FILE_PATH_HINT_RE.search(task or ""))


def _fs_write_completion_missing(task: str, available_tools: list[str], tools_used: list[str]) -> bool:
    """Report whether a write-style task has not yet called ``fs_write``.

    Tool names are compared after ``str()`` conversion, whitespace stripping,
    and lowercasing.

    Args:
        task: Task description, checked with ``_task_requires_workspace_write``.
        available_tools: Names of the tools offered to the worker.
        tools_used: Names of the tools the worker has already invoked.

    Returns:
        ``True`` only when ``fs_write`` is available, has not been used, and
        the task looks like it requires a workspace write.
    """
    offered = {str(name).strip().lower() for name in available_tools}
    invoked = {str(name).strip().lower() for name in tools_used}
    if "fs_write" not in offered:
        return False
    if "fs_write" in invoked:
        return False
    return _task_requires_workspace_write(task)


def _force_tool_choice(tool_name: str) -> dict[str, dict[str, str] | str]:
return {"type": "function", "function": {"name": tool_name}}


def _extract_tool_progress_key(tool_name: str | None, tool_result: Any) -> str | None:
normalized_tool = str(tool_name or "").strip()
structured = _decode_structured_tool_result(tool_result)
Expand Down Expand Up @@ -710,6 +780,7 @@ async def mcp_proxy_handler(args: dict, ctx: dict, s_id=s_id, t_name=t_name):
{tool_descriptions}

Use available tools through normal tool calls. Do not emit ad-hoc JSON tool_use blocks.
If the task asks you to create, write, save, update, or edit a workspace file and fs_write is available, you must call fs_write before returning a result. Do not claim a file was written until the fs_write tool returns successfully.

{coordination_prompt}

Expand Down Expand Up @@ -783,8 +854,19 @@ async def mcp_proxy_handler(args: dict, ctx: dict, s_id=s_id, t_name=t_name):

while thinking_steps < effective_max_steps:
llm_start = time.perf_counter()
force_fs_write = _fs_write_completion_missing(
spec.task,
list(spec.available_tools or []),
tools_used,
)
try:
response = await _call_llm(provider, messages, filtered_tools)
response = await _call_llm(
provider,
messages,
filtered_tools,
tool_choice=_force_tool_choice("fs_write") if force_fs_write else "auto",
response_format_enabled=not force_fs_write,
)
except Exception as exc:
telemetry["llm_latency_ms_total"] += int((time.perf_counter() - llm_start) * 1000)
error_text = str(exc)
Expand Down Expand Up @@ -1082,6 +1164,20 @@ async def mcp_proxy_handler(args: dict, ctx: dict, s_id=s_id, t_name=t_name):
# Try to parse structured JSON result, including fenced JSON blocks.
result_block = _extract_result_block(content)
if result_block is not None:
if _fs_write_completion_missing(spec.task, list(spec.available_tools or []), tools_used):
messages.append({"role": "assistant", "content": content})
messages.append(
{
"role": "user",
"content": (
"The task requires an actual fs_write tool call before completion. "
"Call fs_write with the requested path and content now, then return "
"the structured result only after fs_write succeeds."
),
}
)
thinking_steps += 1
continue
cycle_steps = thinking_steps + 1
return WorkerResult(
status=(
Expand All @@ -1100,6 +1196,20 @@ async def mcp_proxy_handler(args: dict, ctx: dict, s_id=s_id, t_name=t_name):

# If model produced plain text with no tool call, treat it as completion.
if content:
if _fs_write_completion_missing(spec.task, list(spec.available_tools or []), tools_used):
messages.append({"role": "assistant", "content": content})
messages.append(
{
"role": "user",
"content": (
"The task requires an actual fs_write tool call before completion. "
"Call fs_write with the requested path and content now, then return "
"the final answer only after fs_write succeeds."
),
}
)
thinking_steps += 1
continue
cycle_steps = thinking_steps + 1
return WorkerResult(
summary=content,
Expand Down Expand Up @@ -1168,6 +1278,9 @@ async def _call_llm(
provider: LiteLLMProvider,
messages: list[dict],
tools: list,
*,
tool_choice: object = "auto",
response_format_enabled: bool = True,
) -> dict:
"""Call LLM with tools using the centralized provider."""
# Build OpenAI-style tools format
Expand All @@ -1183,17 +1296,21 @@ async def _call_llm(
for t in tools
]

response_format = {
"type": "json_schema",
"json_schema": {"name": "worker_result", "schema": _RESULT_SCHEMA},
}
response_format = None
if response_format_enabled:
response_format = {
"type": "json_schema",
"json_schema": {"name": "worker_result", "schema": _RESULT_SCHEMA},
}
# Provider handles adaptive response_format downgrade when a route does not
# support schema-constrained outputs.
request_kwargs: dict[str, Any] = {"tool_choice": tool_choice}
if response_format is not None:
request_kwargs["response_format"] = response_format
response = await provider.complete_with_tools(
messages=messages,
tools=openai_tools if openai_tools else [],
tool_choice="auto",
response_format=response_format,
**request_kwargs,
)

# Return in expected format: {"content": "...", "tool_calls": [...]}
Expand Down
29 changes: 28 additions & 1 deletion src/octopal/runtime/workers/allowed_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,30 @@ def _workspace_relative_path(
return rel_path.as_posix()


def _existing_parent_workspace_path(
    raw_path: object,
    *,
    workspace_dir: Path | None = None,
) -> str | None:
    """Find the nearest existing parent directory of a file-like path.

    Args:
        raw_path: Path text taken from a task description; it may be wrapped
            in quotes/backticks or followed by sentence punctuation.
        workspace_dir: Workspace root forwarded to ``_workspace_relative_path``
            and ``_workspace_path``.

    Returns:
        The POSIX-style, workspace-relative path of the closest ancestor
        directory that exists, or ``None`` when the input is empty, cannot be
        mapped by ``_workspace_relative_path``, has no file suffix, or has no
        existing ancestor below the workspace root itself.
    """
    # Trim asymmetrically: only quote characters on the left, so a leading
    # "./" or "../" keeps its dots (a symmetric strip would turn
    # "./reports/x.md" into "/reports/x.md"); quotes and trailing sentence
    # punctuation on the right.
    raw = str(raw_path or "").strip().lstrip("`'\"").rstrip("`'\".,;:)")
    if not raw:
        return None
    rel = _workspace_relative_path(raw, workspace_dir=workspace_dir)
    if not rel:
        return None
    rel_path = Path(rel)
    # Only paths that look like files (have an extension) are considered.
    if not rel_path.suffix:
        return None

    workspace = _workspace_path(workspace_dir)
    # Path.parents walks from the closest ancestor outward and always ends,
    # even for an unexpected absolute path.
    for parent in rel_path.parents:
        if str(parent) in {"", "."}:
            # Reached the workspace root; never widen to the whole workspace.
            break
        if (workspace / parent).is_dir():
            return parent.as_posix()
    return None


def normalize_allowed_paths(
value: object,
*,
Expand Down Expand Up @@ -77,11 +101,14 @@ def infer_allowed_paths_from_task(
seen: set[str] = set()
inferred: list[str] = []
for match in _PATH_TOKEN_RE.finditer(task or ""):
raw_path = match.group("path")
rel = _workspace_relative_path(
match.group("path"),
raw_path,
workspace_dir=workspace_dir,
require_exists=True,
)
if not rel:
rel = _existing_parent_workspace_path(raw_path, workspace_dir=workspace_dir)
if not rel or rel in seen:
continue
seen.add(rel)
Expand Down
8 changes: 4 additions & 4 deletions src/octopal/runtime/workers/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ async def launch(
host_worker_dir = Path(cwd).resolve()
container_worker_dir = f"{container_ws}/workers/{worker_id}"
cmd_args.extend(["-v", f"{host_worker_dir}:{container_worker_dir}"])
container_env = _filter_container_env(env, worker_workspace=container_worker_dir)
container_env = _filter_container_env(env, container_workspace=container_ws)
for key, value in container_env.items():
cmd_args.extend(["-e", f"{key}={value}"])
cmd_args.extend(["-e", f"HOME={container_worker_dir}"])
Expand Down Expand Up @@ -149,7 +149,7 @@ async def launch(


def _filter_container_env(
env: dict[str, str], *, worker_workspace: str | None = None
env: dict[str, str], *, container_workspace: str | None = None
) -> dict[str, str]:
# Container env must be explicit; keep only a safe subset.
allowed = {
Expand All @@ -168,8 +168,8 @@ def _filter_container_env(
"FIRECRAWL_API_KEY",
}
filtered = {key: value for key, value in env.items() if key in allowed}
if worker_workspace:
filtered["OCTOPAL_WORKSPACE_DIR"] = worker_workspace
if container_workspace:
filtered["OCTOPAL_WORKSPACE_DIR"] = container_workspace
return filtered


Expand Down
Loading