From a42b62d805e92233adfd25accb626658e4fe8cbe Mon Sep 17 00:00:00 2001 From: Daniel Casper Date: Wed, 6 May 2026 17:43:16 -0500 Subject: [PATCH] break the chains --- {agents => engine/agents}/design.xml | 0 {agents => engine/agents}/engineering.xml | 0 {agents => engine/agents}/growth_ops.xml | 0 {agents => engine/agents}/product_spec.xml | 0 {agents => engine/agents}/strategy.xml | 0 engine/cli.py | 60 ++ engine/llm.py | 114 +++ engine/runtime.py | 305 ++++++++ engine/tools.py | 239 ++++++ orchestrator.py | 832 --------------------- pyproject.toml | 6 +- tests/api/test_initial.py | 82 +- tests/engine/test_cli.py | 52 ++ tests/engine/test_llm.py | 64 ++ tests/engine/test_runtime.py | 146 ++++ tests/engine/test_tools.py | 104 +++ tests/evals/test_agents.py | 4 +- tests/evals/test_orchestrator.py | 250 ------- tests/{evals => ts}/ast_validator.test.ts | 0 uv.lock | 14 + 20 files changed, 1120 insertions(+), 1152 deletions(-) rename {agents => engine/agents}/design.xml (100%) rename {agents => engine/agents}/engineering.xml (100%) rename {agents => engine/agents}/growth_ops.xml (100%) rename {agents => engine/agents}/product_spec.xml (100%) rename {agents => engine/agents}/strategy.xml (100%) create mode 100644 engine/cli.py create mode 100644 engine/llm.py create mode 100644 engine/runtime.py create mode 100644 engine/tools.py delete mode 100644 orchestrator.py create mode 100644 tests/engine/test_cli.py create mode 100644 tests/engine/test_llm.py create mode 100644 tests/engine/test_runtime.py create mode 100644 tests/engine/test_tools.py delete mode 100644 tests/evals/test_orchestrator.py rename tests/{evals => ts}/ast_validator.test.ts (100%) diff --git a/agents/design.xml b/engine/agents/design.xml similarity index 100% rename from agents/design.xml rename to engine/agents/design.xml diff --git a/agents/engineering.xml b/engine/agents/engineering.xml similarity index 100% rename from agents/engineering.xml rename to engine/agents/engineering.xml diff --git a/agents/growth_ops.xml b/engine/agents/growth_ops.xml similarity index 100% rename from agents/growth_ops.xml rename to engine/agents/growth_ops.xml diff --git a/agents/product_spec.xml b/engine/agents/product_spec.xml similarity index 100% rename from agents/product_spec.xml rename to engine/agents/product_spec.xml diff --git a/agents/strategy.xml b/engine/agents/strategy.xml similarity index 100% rename from agents/strategy.xml rename to engine/agents/strategy.xml diff --git a/engine/cli.py b/engine/cli.py new file mode 100644 index 0000000..8596242 --- /dev/null +++ b/engine/cli.py @@ -0,0 +1,60 @@ +import os +import re +import sys + +from engine.runtime import check_dependencies, run_os +from engine.tools import BASE_DIR, DOCS_DIR + +# --- SHIFT-LEFT: CROSS-PLATFORM ENCODING FIX --- +if sys.stdout.encoding.lower() != "utf-8": + sys.stdout.reconfigure(encoding="utf-8") + + +def boot(): + """Extracts boot logic so tests can bypass dependency checks.""" + check_dependencies() + try: + from dotenv import load_dotenv + + load_dotenv(os.path.join(BASE_DIR, ".env")) + except ImportError: + print("āŒ ERROR: python-dotenv package not found. Run: uv sync") + sys.exit(1) + + +def main(args=None): + if args is None: + args = sys.argv[1:] + + boot() + + prompt = "" + flags = [] + + for arg in args: + if arg.startswith("--"): + flags.append(arg) + elif not prompt: + prompt = arg + + handoff_path = os.path.join(DOCS_DIR, "ops", "handoff.md") + if not prompt and os.path.exists(handoff_path): + with open(handoff_path, encoding="utf-8") as f: + content = f.read() + match = re.search(r"PROMPT:\s*(.+)", content, re.IGNORECASE) + if match: + prompt = match.group(1).strip() + + if not prompt: + print("Usage: python engine/cli.py 'Your prompt' [--os-verbose]") + sys.exit(1) + + try: + run_os(prompt, flags) + except KeyboardInterrupt: + print("\n\nšŸ›‘ OS Execution manually interrupted by user. Shutting down gracefully.") + sys.exit(0) + + +if __name__ == "__main__": + main() diff --git a/engine/llm.py b/engine/llm.py new file mode 100644 index 0000000..bf2c8d1 --- /dev/null +++ b/engine/llm.py @@ -0,0 +1,114 @@ +import json +import os +import sys +import time + +import litellm + +from engine.tools import DOCS_DIR + +# --- CONFIGURATION --- +SMART_ROUTING = os.environ.get("SMART_ROUTING", "true").lower() == "true" +DEFAULT_PROVIDER = os.environ.get("DEFAULT_PROVIDER", "openai").lower() + +# --- SMART MODEL MAPPING (LiteLLM Format) --- +MODEL_MAP = { + "Strategy": "openrouter/openai/gpt-4o-mini", + "Product Spec": "openrouter/openai/gpt-4o", + "Design": "openrouter/openai/gpt-4o", + "Engineering": "openrouter/openai/gpt-4o", + "Growth Ops": "openrouter/openai/gpt-4o-mini", + "Ops": "openrouter/openai/gpt-4o-mini", +} + + +def log_token_usage(agent, provider, model, p_tokens, c_tokens, elapsed): + """Appends token usage and latency telemetry to a local CSV artifact.""" + log_path = os.path.join(DOCS_DIR, "ops", "token_tracker.csv") + file_exists = os.path.exists(log_path) + try: + os.makedirs(os.path.dirname(log_path), exist_ok=True) + with open(log_path, "a", encoding="utf-8") as f: + if not file_exists: + f.write( + "timestamp,agent,provider,model,prompt_tokens,completion_tokens,latency_s\n" + ) + timestamp = time.strftime("%Y-%m-%d %H:%M:%S") + f.write(f"{timestamp},{agent},{provider},{model},{p_tokens},{c_tokens},{elapsed:.2f}\n") + except Exception as e: + print(f"āš ļø Could not write telemetry log: {e}") + + +def log_jsonl_telemetry( + agent, provider, model, p_tokens, c_tokens, elapsed, system_prompt, user_prompt, response +): + """Appends full execution context to a JSONL file for Brain OS / Human debugging.""" + log_path = os.path.join(DOCS_DIR, "ops", "telemetry.jsonl") + try: + os.makedirs(os.path.dirname(log_path), exist_ok=True) + entry = { + "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), + "agent": agent, + "provider": provider, + "model": model, + "prompt_tokens": p_tokens, + "completion_tokens": c_tokens, + "latency_s": round(elapsed, 2), + "response": response, + } + with open(log_path, "a", encoding="utf-8") as f: + f.write(json.dumps(entry) + "\n") + except Exception as e: + print(f"āš ļø Could not write JSONL telemetry: {e}") + + +class LLMClient: + def __init__(self): + if SMART_ROUTING and not os.environ.get("OPENROUTER_API_KEY"): + print("āŒ SHIFT LEFT ERROR: SMART_ROUTING is ON, but OPENROUTER_API_KEY is missing.") + sys.exit(1) + + def call(self, agent_name, system_prompt, user_prompt): + if SMART_ROUTING and agent_name in MODEL_MAP: + model = MODEL_MAP[agent_name] + else: + if DEFAULT_PROVIDER == "openai": + model = "openai/gpt-4o-mini" + elif DEFAULT_PROVIDER == "anthropic": + model = "anthropic/claude-3-5-sonnet-latest" + else: + model = "openrouter/openai/gpt-4o-mini" + + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ] + start_time = time.time() + try: + response = litellm.completion( + model=model, messages=messages, temperature=0.2, num_retries=3, drop_params=True + ) + text = response.choices[0].message.content + p_tokens = response.usage.prompt_tokens + c_tokens = response.usage.completion_tokens + elapsed = time.time() - start_time + + log_token_usage(agent_name, "litellm", model, p_tokens, c_tokens, elapsed) + log_jsonl_telemetry( + agent_name, + "litellm", + model, + p_tokens, + c_tokens, + elapsed, + system_prompt, + user_prompt, + text, + ) + return text + except litellm.AuthenticationError as e: + print(f"\nāŒ API AUTH FATAL ERROR ({model}): {e}") + sys.exit(1) + except Exception as e: + print(f"\nāŒ API ERROR ({model}): {e}") + sys.exit(1) diff --git a/engine/runtime.py b/engine/runtime.py new file mode 100644 index 0000000..e596faf --- /dev/null +++ b/engine/runtime.py @@ -0,0 +1,305 @@ +import json +import os +import re +import sys + +from engine.llm import SMART_ROUTING, LLMClient +from engine.tools import ( + AGENTS_DIR, + BASE_DIR, + DOCS_DIR, + append_file, + auto_lint_file, + extract_section, + get_active_artifacts, + list_directory, + read_file, + run_shell_command, + tail_file, + write_file, +) + +MAX_CHAIN_STEPS = 10 + + +def check_dependencies(): + missing = [] + if not os.path.exists(os.path.join(BASE_DIR, "node_modules")): + missing.append("npm install") + if not os.path.exists(os.path.join(BASE_DIR, ".venv")): + missing.append("uv sync") + + if missing: + print("šŸ›‘ OS BOOT FAILED: Missing dependencies.") + print("Please run the following commands before starting the OS:") + for cmd in missing: + print(f" $ {cmd}") + sys.exit(1) + + +def assemble_context(agent_name): + memory_path = os.path.join(DOCS_DIR, "company", "lessons_learned.md") + context = f"\n\n--- SYSTEM MEMORY ---\n{read_file(memory_path)}\n" + + contracts_dir = os.path.join(DOCS_DIR, "product", "contracts") + public_dir = os.path.join(BASE_DIR, "public") + ui_components_dir = os.path.join(BASE_DIR, "src", "web", "components", "ui") + + if "Strategy" in agent_name: + context += read_file(os.path.join(DOCS_DIR, "company", "thesis.md")) + feedback_log_path = os.path.join(DOCS_DIR, "company", "feedback_log.md") + context += tail_file(feedback_log_path, lines=40) + context += read_file(os.path.join(DOCS_DIR, "company", "scorecard.md")) + elif "Spec" in agent_name: + backlog_path = os.path.join(DOCS_DIR, "product", "backlog.md") + context += extract_section(backlog_path, "High Priority") + context += read_file(os.path.join(DOCS_DIR, "product", "current_run.md")) + context += read_file(os.path.join(DOCS_DIR, "product", "architecture.md")) + context += "\n\n--- ACTIVE FEATURE ARTIFACTS ---\n" + for artifact_path in get_active_artifacts(): + fcontent = read_file(os.path.join(BASE_DIR, artifact_path)) + context += f'\n\n{fcontent}\n\n' + contract_list = list_directory(contracts_dir) + context += f"\n\n--- EXISTING DATA CONTRACTS (Dir Listing) ---\n{contract_list}" + context += f"\n\n--- PUBLIC ASSETS ---\n{list_directory(public_dir)}" + elif "Design" in agent_name: + context += read_file(os.path.join(DOCS_DIR, "product", "current_run.md")) + context += read_file(os.path.join(DOCS_DIR, "product", "flows.md")) + context += read_file(os.path.join(DOCS_DIR, "product", "style_guide.md")) + context += read_file(os.path.join(BASE_DIR, "src", "web", "lib", "content.ts")) + context += "\n\n--- ACTIVE FEATURE ARTIFACTS ---\n" + for artifact_path in get_active_artifacts(): + fname = os.path.basename(artifact_path) + fcontent = read_file(os.path.join(BASE_DIR, artifact_path)) + context += f"\n--- FILE: {fname} ---\n{fcontent}\n" + context += f"\n\n--- PUBLIC ASSETS ---\n{list_directory(public_dir)}" + context += f"\n\n--- AVAILABLE UI COMPONENTS ---\n{list_directory(ui_components_dir)}" + blueprint = read_file(os.path.join(DOCS_DIR, "templates", "design_blueprint.md")) + context += f"\n\n--- OUTPUT TEMPLATE ---\n{blueprint}" + elif "Engineering" in agent_name: + context += read_file(os.path.join(DOCS_DIR, "product", "current_run.md")) + context += read_file(os.path.join(DOCS_DIR, "product", "architecture.md")) + context += read_file(os.path.join(DOCS_DIR, "product", "adr", "README.md")) + context += read_file(os.path.join(DOCS_DIR, "product", "flows.md")) + context += read_file(os.path.join(DOCS_DIR, "product", "style_guide.md")) + context += read_file(os.path.join(BASE_DIR, "src", "web", "lib", "content.ts")) + context += "\n\n--- ACTIVE FEATURE ARTIFACTS ---\n" + for artifact_path in get_active_artifacts(): + fname = os.path.basename(artifact_path) + fcontent = read_file(os.path.join(BASE_DIR, artifact_path)) + context += f"\n--- FILE: {fname} ---\n{fcontent}\n" + context += f"\n\n--- PUBLIC ASSETS ---\n{list_directory(public_dir)}" + context += f"\n\n--- AVAILABLE UI COMPONENTS ---\n{list_directory(ui_components_dir)}" + teardown = read_file(os.path.join(DOCS_DIR, "templates", "teardown_manifest.md")) + context += f"\n\n--- TEARDOWN TEMPLATE ---\n{teardown}" + elif "Ops" in agent_name: + context += read_file(os.path.join(DOCS_DIR, "product", "current_run.md")) + context += read_file(os.path.join(DOCS_DIR, "ops", "launch_checklist.md")) + context += read_file(os.path.join(DOCS_DIR, "company", "scorecard.md")) + context += "\n\n--- ACTIVE FEATURE ARTIFACTS ---\n" + for artifact_path in get_active_artifacts(): + fname = os.path.basename(artifact_path) + fcontent = read_file(os.path.join(BASE_DIR, artifact_path)) + context += f"\n--- FILE: {fname} ---\n{fcontent}\n" + teardown = read_file(os.path.join(DOCS_DIR, "templates", "teardown_manifest.md")) + context += f"\n\n--- TEARDOWN TEMPLATE ---\n{teardown}" + + return re.sub(r"\n{3,}", "\n\n", context) + + +def check_human_pause(response_text): + pauses = [ + r"REVERSIBILITY:\s*\[1-Way\]", + r"DATA:\s*\[Pending", + r"CIRCUIT_BREAKER", + r"TEARDOWN:\s*\[Needed\]", + r"ADR_STATE:\s*\[Pending Human\]", + ] + for p in pauses: + match = re.search(p, response_text, re.IGNORECASE) + if match: + return match.group(0) + return None + + +def extract_routing_queue(response_text): + match = re.search(r"ROUTING:\s*\[(.*?)\]", response_text, re.IGNORECASE) + if match: + raw_route = match.group(1).strip() + if "None" in raw_route or "Experiment" in raw_route: + return [] + return [agent.strip() for agent in raw_route.split("->")] + return None + + +def execute_autonomous_actions(response_text): + match = re.search(r"```json\s*\n(.*?)```", response_text, re.DOTALL | re.IGNORECASE) + if not match: + return None + + try: + json_str = match.group(1).strip().replace("\xa0", " ") + payload = json.loads(json_str, strict=False) + execution_logs = [] + + if "write_files" in payload: + for file_data in payload["write_files"]: + path = file_data.get("path") + content = file_data.get("content") + if path and content: + result = write_file(path, content) + execution_logs.append(result) + if "SUCCESS" in result: + lint_result = auto_lint_file(path) + if lint_result: + execution_logs.append(lint_result) + + if "append_to_file" in payload: + for file_data in payload["append_to_file"]: + path = file_data.get("path") + content = file_data.get("content") + if path and content: + result = append_file(path, content) + execution_logs.append(result) + if "SUCCESS" in result: + lint_result = auto_lint_file(path) + if lint_result: + execution_logs.append(lint_result) + + if "run_commands" in payload: + for cmd in payload["run_commands"]: + result = run_shell_command(cmd) + execution_logs.append(f"$ {cmd}\n{result}") + + return "\n\n".join(execution_logs) + except json.JSONDecodeError as e: + return ( + f"[ERROR: The OS failed to parse your JSON action block. Python Error: {e}. " + "Ensure you are properly escaping quotes and newlines inside your Markdown strings.]" + ) + except Exception as e: + return f"[ERROR: OS Execution failed - {e}]" + + +def run_os(user_input, flags=None): + if flags is None: + flags = [] + + llm = LLMClient() + verbose = "--os-verbose" in flags + + print("=== Solopreneur OS Initialized ===") + print(f"šŸ”§ Smart Routing: {'ON' if SMART_ROUTING else 'OFF'}") + + telemetry_path = os.path.join(DOCS_DIR, "ops", "telemetry.jsonl") + os.makedirs(os.path.dirname(telemetry_path), exist_ok=True) + with open(telemetry_path, "w", encoding="utf-8") as f: + f.write("") + + agent_queue = [] + if "[HOTFIX]" in user_input: + agent_queue.append("Engineering") + current_prompt = user_input.replace("[HOTFIX]", "").strip() + elif "[TEARDOWN]" in user_input: + agent_queue.append("Engineering") + teardown_prompt = user_input.replace("[TEARDOWN]", "").strip() + current_prompt = teardown_prompt + "\n\nCRITICAL: Execute Teardown." + elif "[START:" in user_input: + match = re.search(r"\[START:\s*(.*?)\]", user_input) + if match: + agent_queue.append(match.group(1).strip()) + current_prompt = user_input + else: + agent_queue.append("Strategy") + current_prompt = user_input + else: + agent_queue.append("Strategy") + current_prompt = user_input + + step_count = 0 + while agent_queue: + step_count += 1 + if step_count > MAX_CHAIN_STEPS: + print("\nšŸ›‘ ERROR: Maximum execution steps reached.") + sys.exit(1) + + current_agent = agent_queue.pop(0) + base_skill = current_agent.split("(")[0].strip() + + skill_file_map = { + "Strategy": "strategy.xml", + "Product Spec": "product_spec.xml", + "Design": "design.xml", + "Engineering": "engineering.xml", + "Growth Ops": "growth_ops.xml", + "Ops": "growth_ops.xml", + } + + skill_file = skill_file_map.get(base_skill, "engineering.xml") + skill_prompt = read_file(os.path.join(AGENTS_DIR, skill_file)) + + print(f"\n[šŸš€ Waking up {current_agent} Agent...]") + + full_system_prompt = f"{skill_prompt}\n\nCONTEXT:\n{assemble_context(base_skill)}" + user_task = f"TASK:\n{current_prompt}" + + if verbose: + print( + f"šŸ”Ž [VERBOSE]: Sending {len(full_system_prompt)} chars of " + f"cached system context to {current_agent}..." + ) + print(f"--- USER TASK ---\n{user_task}\n-----------------") + + response = llm.call( + base_skill, + full_system_prompt, + user_task, + ) + + if verbose: + print(f"\n[{current_agent} Output]:\n{response}\n") + else: + print(f"āœ… {current_agent} successfully completed task.") + + action_results = execute_autonomous_actions(response) + if action_results: + print(f"\nšŸ¤– [OS EXECUTING ACTIONS]:\n{action_results}") + if "FAIL" in action_results or "Error" in action_results or "error" in action_results: + print("āš ļø Tests failed! Routing back to Engineering for an autonomous fix...") + agent_queue.insert(0, "Engineering") + current_prompt = ( + "Your previous code changes caused test failures. Fix them.\n\n" + f"TEST OUTPUT:\n{action_results}" + ) + continue + + pause_reason = check_human_pause(response) + if pause_reason: + print("šŸ›‘ HUMAN IN THE LOOP TRIGGERED. Pipeline paused.") + print( + "šŸ’” Action Required: Review the output (e.g. approve the ADR or execute " + "Teardown), update files manually, and run OS again." + ) + handoff_path = os.path.join(DOCS_DIR, "ops", "handoff.md") + os.makedirs(os.path.dirname(handoff_path), exist_ok=True) + with open(handoff_path, "w", encoding="utf-8") as f: + f.write(f"STATUS: PAUSED\nREASON: {pause_reason}\nAGENT: {current_agent}\n") + sys.exit(0) + + new_queue = extract_routing_queue(response) + if new_queue is None: + print("āš ļø WARNING: Agent forgot ROUTING tag. Halting to prevent loop.") + break + + if len(new_queue) == 0: + print("āœ… Terminal state reached. Pipeline complete.") + handoff_path = os.path.join(DOCS_DIR, "ops", "handoff.md") + os.makedirs(os.path.dirname(handoff_path), exist_ok=True) + with open(handoff_path, "w", encoding="utf-8") as f: + f.write("STATUS: COMPLETE\nREASON: Terminal state reached.\n") + break + + agent_queue = new_queue + print(f"šŸ”€ New Routing Queue established: {' -> '.join(agent_queue)}") + print(f"ā­ļø Handoff: Passing context to {agent_queue[0]}...") + current_prompt = f"Process the output from the previous stage:\n{response}" diff --git a/engine/tools.py b/engine/tools.py new file mode 100644 index 0000000..25dde2d --- /dev/null +++ b/engine/tools.py @@ -0,0 +1,239 @@ +import os +import re +import shlex +import shutil +import subprocess +from pathlib import Path + +# --- SHIFT-LEFT: Explicit whitelist of allowed command prefixes --- +ALLOWED_COMMANDS = ( + "npm run ", + "uv run ", + "pytest ", + "npx ", # Added for biome linting +) + +# --- ABSOLUTE PATH RESOLUTION (Upgraded for /engine subdirectory) --- +BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +DOCS_DIR = os.path.join(BASE_DIR, "docs") +AGENTS_DIR = os.path.join(BASE_DIR, "engine", "agents") + + +def is_path_safe(filepath): + try: + target_path = Path(filepath).resolve() + base_path = Path(BASE_DIR).resolve() + + allowed_dirs = [ + base_path / "src", + base_path / "tests", + base_path / "docs", + base_path / "public", + ] + + allowed_root_files = [ + base_path / "render.yaml", + base_path / "vercel.json", + base_path / "netlify.toml", + ] + + restricted_files = [ + base_path / "orchestrator.py", + base_path / ".env", + base_path / "pyproject.toml", + base_path / "package.json", + base_path / "uv.lock", + ] + + if target_path in restricted_files: + return False + if target_path in allowed_root_files: + return True + + restricted_dirs = [base_path / ".github", base_path / ".git", base_path / "agents"] + if any(target_path.is_relative_to(r_dir) for r_dir in restricted_dirs): + return False + + return any(target_path.is_relative_to(d) for d in allowed_dirs) + except Exception: + return False + + +def write_file(filepath, content): + abs_path = os.path.join(BASE_DIR, filepath) if not os.path.isabs(filepath) else filepath + if not is_path_safe(abs_path): + return f"[ERROR: Permission denied to write to {filepath}]" + try: + os.makedirs(os.path.dirname(abs_path), exist_ok=True) + with open(abs_path, "w", encoding="utf-8") as f: + f.write(content) + return f"[SUCCESS: File written to {filepath}]" + except Exception as e: + return f"[ERROR: Failed to write to {filepath} - {e}]" + + +def append_file(filepath, content): + abs_path = os.path.join(BASE_DIR, filepath) if not os.path.isabs(filepath) else filepath + if not is_path_safe(abs_path): + return f"[ERROR: Permission denied to append to {filepath}]" + try: + os.makedirs(os.path.dirname(abs_path), exist_ok=True) + prefix = "" + if os.path.exists(abs_path): + with open(abs_path, encoding="utf-8") as f: + current_content = f.read() + if current_content and not current_content.endswith("\n"): + prefix = "\n" + with open(abs_path, "a", encoding="utf-8") as f: + f.write(prefix + content + "\n") + return f"[SUCCESS: Data appended to {filepath}]" + except Exception as e: + return f"[ERROR: Failed to append to {filepath} - {e}]" + + +def run_shell_command(command: str) -> str: + if not command.startswith(ALLOWED_COMMANDS): + return f"[ERROR: Command '{command}' not allowed.]" + if any(char in command for char in ["&", "|", ";", ">", "<"]): + return "[ERROR: Shell injection prohibited.]" + try: + args = shlex.split(command) + if os.name == "nt": + executable = shutil.which(args[0]) + if executable: + args[0] = executable + + print(f" $ {command}") + # noqa: S603 tells the linter we have explicitly sandboxed this input + result = subprocess.run( # noqa: S603 + args, capture_output=True, text=True, encoding="utf-8", timeout=60, shell=False + ) + + def truncate_output(text, max_len=1000): + if not text or len(text) <= max_len: + return text + half = max_len // 2 + return text[:half] + f"\n\n.[TRUNCATED {len(text) - max_len} CHARS].\n\n" + text[-half:] + + output = truncate_output(result.stdout.strip()) + error = truncate_output(result.stderr.strip()) + combined_output = output + if error: + combined_output += f"\nSTDERR:\n{error}" + + if len(combined_output) > 8000: + combined_output = ( + combined_output[:8000] + "\n\n...[SYSTEM WARNING: Truncated at 8000 chars]..." + ) + + # SHIFT-LEFT: XML Caching Tags applied to shell outputs + if result.returncode == 0: + final_out = combined_output if combined_output else "SUCCESS" + return f'\n{final_out}\n' + else: + return f'\n{combined_output}\n' + except subprocess.TimeoutExpired: + return "[ERROR: Command timed out after 60 seconds.]" + except Exception as e: + return f"[ERROR: Command execution failed - {str(e)}]" + + +def read_file(filepath): + try: + with open(filepath, encoding="utf-8") as f: + return f.read() + except FileNotFoundError: + return f"[SYSTEM NOTE: The file {filepath} was not found.]" + + +def tail_file(filepath, lines=50): + try: + with open(filepath, encoding="utf-8") as f: + content = f.readlines() + if len(content) > lines: + return "".join( + content[:2] + ["\n...[Older entries omitted]...\n\n"] + content[-lines:] + ) + return "".join(content) + except FileNotFoundError: + return f"[SYSTEM NOTE: {filepath} not found.]" + + +def extract_section(filepath, section_header): + content = read_file(filepath) + safe_header = re.escape(section_header) + pattern = rf"(?i)(##\s*{safe_header}.*?)(?=\n## |\Z)" + match = re.search(pattern, content, re.DOTALL) + if match: + return match.group(1).strip() + return f"[SYSTEM NOTE: Section '{section_header}' not found in {filepath}]" + + +def get_active_artifacts(): + run_path = os.path.join(DOCS_DIR, "product", "current_run.md") + content = read_file(run_path) + artifacts = [] + paths = re.findall(r"(?:docs|src|public|tests)[a-zA-Z0-9_./-]+\.[a-zA-Z0-9]+", content) + for path in set(paths): + if "current_run.md" not in path: + artifacts.append(path) + return artifacts + + +def list_directory(dir_path): + try: + files = os.listdir(dir_path) + ignored = {".git", "node_modules", ".venv", "__pycache__"} + filtered_files = [f for f in files if not (f.endswith(".csv") or f in ignored)] + if not filtered_files: + return f"[SYSTEM NOTE: Directory {dir_path} is empty or only contains ignored files.]" + return "\n".join([f"- {f}" for f in filtered_files]) + except FileNotFoundError: + return f"[SYSTEM NOTE: Directory {dir_path} not found.]" + + +def read_directory_contents(dir_path): + content = "" + try: + ignored = {".git", "node_modules", ".venv", "__pycache__"} + for filename in os.listdir(dir_path): + if filename.endswith(".csv") or filename in ignored: + continue + if filename.endswith(".md"): + filepath = os.path.join(dir_path, filename) + content += f'\n\n{read_file(filepath)}\n\n' + except FileNotFoundError: + pass + return content + + +def auto_lint_file(filepath): + abs_path = os.path.join(BASE_DIR, filepath) if not os.path.isabs(filepath) else filepath + ext = os.path.splitext(abs_path)[1] + args = [] + if ext == ".py": + args = ["uv", "run", "ruff", "check", "--no-cache", abs_path] + elif ext in [".ts", ".tsx", ".js", ".jsx"]: + args = ["npx", "biome", "check", abs_path] + else: + return None + + if os.name == "nt": + executable = shutil.which(args[0]) + if executable: + args[0] = executable + + try: + # noqa: S603 tells the linter we have explicitly sandboxed this input + result = subprocess.run( # noqa: S603 + args, capture_output=True, text=True, encoding="utf-8", timeout=30, shell=False + ) + if result.returncode != 0: + return ( + f"[āš ļø AUTO-LINT FAILED on {filepath}]:\n" + f"{result.stdout}\n{result.stderr}\n" + "Fix this syntax error before proceeding." + ) + return f"[āœ… AUTO-LINT PASSED for {filepath}]" + except Exception as e: + return f"[āš ļø AUTO-LINT EXECUTION ERROR on {filepath}]: {e}" diff --git a/orchestrator.py b/orchestrator.py deleted file mode 100644 index f638cb3..0000000 --- a/orchestrator.py +++ /dev/null @@ -1,832 +0,0 @@ -import json -import os -import re -import shlex -import shutil -import subprocess -import sys -import time -from pathlib import Path - -import litellm - -# --- SHIFT-LEFT: CROSS-PLATFORM ENCODING FIX --- -# Forces Windows terminals to support UTF-8 emojis without crashing -if sys.stdout.encoding.lower() != "utf-8": - sys.stdout.reconfigure(encoding="utf-8") - -# SHIFT-LEFT: Explicit whitelist of allowed command prefixes -ALLOWED_COMMANDS = ( - "npm run ", - "uv run ", - "pytest ", -) - -# --- ABSOLUTE PATH RESOLUTION --- -# This ensures the OS can be run from ANY directory without corrupting memory -BASE_DIR = os.path.dirname(os.path.abspath(__file__)) -DOCS_DIR = os.path.join(BASE_DIR, "docs") -AGENTS_DIR = os.path.join(BASE_DIR, "agents") - - -# --- PRE-FLIGHT BOOT CHECK --- -# Ensures users have installed dependencies before the OS tries to run automated tests -def check_dependencies(): - missing = [] - if not os.path.exists(os.path.join(BASE_DIR, "node_modules")): - missing.append("npm install") - if not os.path.exists(os.path.join(BASE_DIR, ".venv")): - missing.append("uv sync") - - if missing: - print("šŸ›‘ OS BOOT FAILED: Missing dependencies.") - print("Please run the following commands before starting the OS:") - for cmd in missing: - print(f" $ {cmd}") - sys.exit(1) - - -check_dependencies() - -# --- ENVIRONMENT & SECRETS --- -try: - from dotenv import load_dotenv - - load_dotenv(os.path.join(BASE_DIR, ".env")) -except ImportError: - print("āŒ ERROR: python-dotenv package not found. Run: pip install python-dotenv") - sys.exit(1) - -# --- CONFIGURATION --- -MAX_CHAIN_STEPS = 10 -SMART_ROUTING = os.environ.get("SMART_ROUTING", "true").lower() == "true" -DEFAULT_PROVIDER = os.environ.get("DEFAULT_PROVIDER", "openai").lower() - -# --- SMART MODEL MAPPING (LiteLLM Format) --- -# LiteLLM uses the standard format: provider/model_name -MODEL_MAP = { - "Strategy": "openrouter/openai/gpt-4o-mini", - "Product Spec": "openrouter/openai/gpt-4o", - "Design": "openrouter/openai/gpt-4o", - "Engineering": "openrouter/openai/gpt-4o", - "Growth Ops": "openrouter/openai/gpt-4o-mini", - "Ops": "openrouter/openai/gpt-4o-mini", -} - - -# --- TELEMETRY LOGGER --- -def log_token_usage(agent, provider, model, p_tokens, c_tokens, elapsed): - """Appends token usage and latency telemetry to a local CSV artifact.""" - log_path = os.path.join(DOCS_DIR, "ops", "token_tracker.csv") - file_exists = os.path.exists(log_path) - - try: - # Ensure the ops directory exists - os.makedirs(os.path.dirname(log_path), exist_ok=True) - with open(log_path, "a", encoding="utf-8") as f: - if not file_exists: - f.write( - "timestamp,agent,provider,model,prompt_tokens,completion_tokens,latency_s\n" - ) - - timestamp = time.strftime("%Y-%m-%d %H:%M:%S") - f.write(f"{timestamp},{agent},{provider},{model},{p_tokens},{c_tokens},{elapsed:.2f}\n") - except Exception as e: - print(f"āš ļø Could not write telemetry log: {e}") - - -def log_jsonl_telemetry( - agent, provider, model, p_tokens, c_tokens, elapsed, system_prompt, user_prompt, response -): - """Appends full execution context to a JSONL file for Brain OS / Human debugging.""" - log_path = os.path.join(DOCS_DIR, "ops", "telemetry.jsonl") - try: - os.makedirs(os.path.dirname(log_path), exist_ok=True) - entry = { - "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), - "agent": agent, - "provider": provider, - "model": model, - "prompt_tokens": p_tokens, - "completion_tokens": c_tokens, - "latency_s": round(elapsed, 2), - "response": response, - } - with open(log_path, "a", encoding="utf-8") as f: - f.write(json.dumps(entry) + "\n") - except Exception as e: - print(f"āš ļø Could not write JSONL telemetry: {e}") - - -# --- API CLIENT --- -class LLMClient: - def __init__(self): - # SHIFT LEFT: LiteLLM automatically picks up os.environ keys (OPENROUTER_API_KEY, etc.) - # We enforce strict key validation here so the OS fails on boot, not mid-run. - if SMART_ROUTING and not os.environ.get("OPENROUTER_API_KEY"): - print("āŒ SHIFT LEFT ERROR: SMART_ROUTING is ON, but OPENROUTER_API_KEY is missing.") - sys.exit(1) - - def call(self, agent_name, system_prompt, user_prompt): - # 1. Determine Model using LiteLLM syntax - if SMART_ROUTING and agent_name in MODEL_MAP: - model = MODEL_MAP[agent_name] - else: - if DEFAULT_PROVIDER == "openai": - model = "openai/gpt-4o-mini" - elif DEFAULT_PROVIDER == "anthropic": - model = "anthropic/claude-3-5-sonnet-latest" - else: - model = "openrouter/openai/gpt-4o-mini" - - # 2. Format Messages - messages = [ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt}, - ] - - # 3. Execute with built-in retries (Zero Debt: LiteLLM handles backoff) - start_time = time.time() - try: - # drop_params=True ensures compatibility if a provider doesn't support specific kwargs - response = litellm.completion( - model=model, messages=messages, temperature=0.2, num_retries=3, drop_params=True - ) - - text = response.choices[0].message.content - - # Universal Token Telemetry - p_tokens = response.usage.prompt_tokens - c_tokens = response.usage.completion_tokens - elapsed = time.time() - start_time - - log_token_usage(agent_name, "litellm", model, p_tokens, c_tokens, elapsed) - log_jsonl_telemetry( - agent_name, - "litellm", - model, - p_tokens, - c_tokens, - elapsed, - system_prompt, - user_prompt, - text, - ) - - return text - - except litellm.AuthenticationError as e: - print(f"\nāŒ API AUTH FATAL ERROR ({model}): {e}") - sys.exit(1) - except Exception as e: - print(f"\nāŒ API ERROR ({model}): {e}") - sys.exit(1) - - -# --- AI FILE I/O SANDBOX --- -def is_path_safe(filepath): - """Sandbox security guardrail to prevent path traversal and unauthorized edits.""" - try: - target_path = Path(filepath).resolve() - base_path = Path(BASE_DIR).resolve() - - # Whitelisted directories - allowed_dirs = [ - base_path / "src", - base_path / "tests", - base_path / "docs", - base_path / "public", - ] - - # Whitelist specific root files for PaaS Deployments - allowed_root_files = [ - base_path / "render.yaml", - base_path / "vercel.json", - base_path / "netlify.toml", - ] - - # Blacklisted files (never touch these even if they are in base_path) - restricted_files = [ - base_path / "orchestrator.py", - base_path / ".env", - base_path / "pyproject.toml", - base_path / "package.json", - base_path / "uv.lock", - ] - - if target_path in restricted_files: - return False - - if target_path in allowed_root_files: - return True - - # Blacklisted directories - restricted_dirs = [ - base_path / ".github", - base_path / ".git", - base_path / "agents", # AI cannot rewrite its own brain! - ] - - if any(target_path.is_relative_to(restricted_dir) for restricted_dir in restricted_dirs): - return False - - # Must be in whitelist - return any(target_path.is_relative_to(d) for d in allowed_dirs) - - except Exception: - return False - - -def write_file(filepath, content): - """Safely writes content to a file if it passes the sandbox checks.""" - abs_path = os.path.join(BASE_DIR, filepath) if not os.path.isabs(filepath) else filepath - - if not is_path_safe(abs_path): - print(f"šŸ›‘ SECURITY BLOCK: AI attempted to write to unauthorized path: {filepath}") - return f"[ERROR: Permission denied to write to {filepath}]" - - try: - os.makedirs(os.path.dirname(abs_path), exist_ok=True) - with open(abs_path, "w", encoding="utf-8") as f: - f.write(content) - return f"[SUCCESS: File written to {filepath}]" - except Exception as e: - return f"[ERROR: Failed to write to {filepath} - {e}]" - - -def append_file(filepath, content): - """Safely appends content to a file if it passes the sandbox checks.""" - abs_path = os.path.join(BASE_DIR, filepath) if not os.path.isabs(filepath) else filepath - - if not is_path_safe(abs_path): - print(f"šŸ›‘ SECURITY BLOCK: AI attempted to write to unauthorized path: {filepath}") - return f"[ERROR: Permission denied to append to {filepath}]" - - try: - os.makedirs(os.path.dirname(abs_path), exist_ok=True) - - # Check if the file currently exists and ensure it ends with a newline - prefix = "" - if os.path.exists(abs_path): - with open(abs_path, encoding="utf-8") as f: - current_content = f.read() - if current_content and not current_content.endswith("\n"): - prefix = "\n" - - # Append the prefix, the content, and a trailing newline - with open(abs_path, "a", encoding="utf-8") as f: - f.write(prefix + content + "\n") - - return f"[SUCCESS: Data appended to {filepath}]" - except Exception as e: - return f"[ERROR: Failed to append to {filepath} - {e}]" - - -def run_shell_command(command: str) -> str: - """Executes a whitelisted shell command and returns its output safely.""" - # 1. Sandbox Checks - if not command.startswith(ALLOWED_COMMANDS): - return f"[ERROR: Command '{command}' not allowed.]" - - if any(char in command for char in ["&", "|", ";", ">", "<"]): - return "[ERROR: Shell injection prohibited.]" - - try: - args = shlex.split(command) - - # SHIFT-LEFT: Cross-Platform Executable Resolution - # Windows requires the exact .cmd/.exe path if shell=False - if os.name == "nt": - executable = shutil.which(args[0]) - if executable: - args[0] = executable - - # 2. Strict Execution - print(f" $ {command}") - - # noqa: S603 tells the linter we have explicitly sandboxed this input - result = subprocess.run( # noqa: S603 - args, - capture_output=True, - text=True, - encoding="utf-8", - timeout=60, - shell=False, - ) - - # --- SHIFT-LEFT: TERMINAL EXHAUST TRUNCATION --- - # Never send more than 1000 characters of a terminal error back to the LLM - def truncate_output(text, max_len=1000): - if not text or len(text) <= max_len: - return text - half = max_len // 2 - # Broken into multiple lines to fix Ruff E501 (Line too long) - return ( - text[:half] + - f"\n\n...[TRUNCATED {len(text) - max_len} CHARS]...\n\n" + - text[-half:] - ) - - output = truncate_output(result.stdout.strip()) - error = truncate_output(result.stderr.strip()) - # ----------------------------------------------- - - # --- SHIFT-LEFT: TOKEN ECONOMICS (TRUNCATION) --- - combined_output = output - if error: - combined_output += f"\nSTDERR:\n{error}" - - if len(combined_output) > 8000: - combined_output = ( - combined_output[:8000] + - "\n\n...[SYSTEM WARNING: Output truncated at 8000 characters to save context.]..." - ) - - if result.returncode == 0: - return combined_output if combined_output else f"[SUCCESS: {command}]" - else: - return f"[ERROR: Command execution failed]\n{combined_output}" - - except subprocess.TimeoutExpired: - return "[ERROR: Command timed out after 60 seconds.]" - except Exception as e: - return f"[ERROR: Command execution failed - {str(e)}]" - - -# --- DETERMINISTIC CONTEXT PRUNING --- -def read_file(filepath): - try: - with open(filepath, encoding="utf-8") as f: - return f.read() - except FileNotFoundError: - return f"[SYSTEM NOTE: The file {filepath} was not found.]" - - -def tail_file(filepath, lines=50): - try: - with open(filepath, encoding="utf-8") as f: - content = f.readlines() - if len(content) > lines: - return "".join( - content[:2] + ["\n...[Older entries omitted]...\n\n"] + content[-lines:] - ) - return "".join(content) - except FileNotFoundError: - return f"[SYSTEM NOTE: {filepath} not found.]" - - -def extract_section(filepath, section_header): - content = read_file(filepath) - # Safely escape the header to prevent regex injection crashes - safe_header = re.escape(section_header) - pattern = rf"(?i)(##\s*{safe_header}.*?)(?=\n## |\Z)" - match = re.search(pattern, content, re.DOTALL) - if match: - return match.group(1).strip() - return f"[SYSTEM NOTE: Section '{section_header}' not found in {filepath}]" - - -def get_active_artifacts(): - """Parses current_run.md for active artifacts from raw text without header dependencies.""" - run_path = os.path.join(DOCS_DIR, "product", "current_run.md") - content = read_file(run_path) - - artifacts = [] - # SHIFT-LEFT: Match any project file path anywhere in the document. - # Pattern matches common project paths: docs/, src/, public/, tests/ with typical extensions. - paths = re.findall(r"(?:docs|src|public|tests)[a-zA-Z0-9_./-]+\.[a-zA-Z0-9]+", content) - - for path in set(paths): # Deduplicate identical paths - if "current_run.md" not in path: - artifacts.append(path) - - return artifacts - - -def list_directory(dir_path): - try: - files = os.listdir(dir_path) - # --- HYGIENE PATCH: Filter out junk from context --- - ignored = {".git", "node_modules", ".venv", "__pycache__"} - filtered_files = [f for f in files if not (f.endswith(".csv") or f in ignored)] - - if not filtered_files: - return f"[SYSTEM NOTE: Directory {dir_path} is empty or only contains ignored files.]" - return "\n".join([f"- {f}" for f in filtered_files]) - except FileNotFoundError: - return f"[SYSTEM NOTE: Directory {dir_path} not found.]" - - -def read_directory_contents(dir_path): - """Reads and concatenates safe files in a given directory.""" - content = "" - try: - ignored = {".git", "node_modules", ".venv", "__pycache__"} - for filename in os.listdir(dir_path): - # --- HYGIENE PATCH: Skip token-wasting files --- - if filename.endswith(".csv") or filename in ignored: - continue - - # Currently restricted to markdown - if filename.endswith(".md"): - filepath = os.path.join(dir_path, filename) - content += f'\n\n{read_file(filepath)}\n\n' - except FileNotFoundError: - pass - return content - - -def assemble_context(agent_name): - memory_path = os.path.join(DOCS_DIR, "company", "lessons_learned.md") - context = f"\n\n--- SYSTEM MEMORY ---\n{read_file(memory_path)}\n" - - # Define dynamic paths - contracts_dir = os.path.join(DOCS_DIR, "product", "contracts") - public_dir = os.path.join(BASE_DIR, "public") - ui_components_dir = os.path.join(BASE_DIR, "src", "web", "components", "ui") - - if "Strategy" in agent_name: - context += read_file(os.path.join(DOCS_DIR, "company", "thesis.md")) - feedback_log_path = os.path.join(DOCS_DIR, "company", "feedback_log.md") - context += tail_file(feedback_log_path, lines=40) - context += read_file(os.path.join(DOCS_DIR, "company", "scorecard.md")) - - elif "Spec" in agent_name: - backlog_path = os.path.join(DOCS_DIR, "product", "backlog.md") - context += extract_section(backlog_path, "High Priority") - context += read_file(os.path.join(DOCS_DIR, "product", "current_run.md")) - context += read_file(os.path.join(DOCS_DIR, "product", "architecture.md")) - - # --- CONTEXT FUNNELING: Only load active artifacts --- - context += "\n\n--- ACTIVE FEATURE ARTIFACTS ---\n" - for artifact_path in get_active_artifacts(): - fcontent = read_file(os.path.join(BASE_DIR, artifact_path)) - context += f'\n\n{fcontent}\n\n' - - # List existing contracts instead of reading all their contents to save tokens - contract_list = list_directory(contracts_dir) - context += f"\n\n--- EXISTING DATA CONTRACTS (Dir Listing) ---\n{contract_list}" - context += f"\n\n--- PUBLIC ASSETS ---\n{list_directory(public_dir)}" - - elif "Design" in agent_name: - context += read_file(os.path.join(DOCS_DIR, "product", "current_run.md")) - context += read_file(os.path.join(DOCS_DIR, "product", "flows.md")) - context += read_file(os.path.join(DOCS_DIR, "product", "style_guide.md")) - context += read_file(os.path.join(BASE_DIR, "src", "web", "lib", "content.ts")) - - # --- CONTEXT FUNNELING: Only load active artifacts --- - context += "\n\n--- ACTIVE FEATURE ARTIFACTS ---\n" - for artifact_path in get_active_artifacts(): - fname = os.path.basename(artifact_path) - fcontent = read_file(os.path.join(BASE_DIR, artifact_path)) - context += f"\n--- FILE: {fname} ---\n{fcontent}\n" - - context += f"\n\n--- PUBLIC ASSETS ---\n{list_directory(public_dir)}" - context += f"\n\n--- AVAILABLE UI COMPONENTS ---\n{list_directory(ui_components_dir)}" - - blueprint = read_file(os.path.join(DOCS_DIR, "templates", "design_blueprint.md")) - context += f"\n\n--- OUTPUT TEMPLATE ---\n{blueprint}" - - elif "Engineering" in agent_name: - context += read_file(os.path.join(DOCS_DIR, "product", "current_run.md")) - context += read_file(os.path.join(DOCS_DIR, "product", "architecture.md")) - context += read_file(os.path.join(DOCS_DIR, "product", "adr", "README.md")) - context += read_file(os.path.join(DOCS_DIR, "product", "flows.md")) - context += read_file(os.path.join(DOCS_DIR, "product", "style_guide.md")) - context += read_file(os.path.join(BASE_DIR, "src", "web", "lib", "content.ts")) - - # --- CONTEXT FUNNELING: Only load active artifacts --- - context += "\n\n--- ACTIVE FEATURE ARTIFACTS ---\n" - for artifact_path in get_active_artifacts(): - fname = os.path.basename(artifact_path) - fcontent = read_file(os.path.join(BASE_DIR, artifact_path)) - context += f"\n--- FILE: {fname} ---\n{fcontent}\n" - - context += f"\n\n--- PUBLIC ASSETS ---\n{list_directory(public_dir)}" - context += f"\n\n--- AVAILABLE UI COMPONENTS ---\n{list_directory(ui_components_dir)}" - - teardown = read_file(os.path.join(DOCS_DIR, "templates", "teardown_manifest.md")) - context += f"\n\n--- TEARDOWN TEMPLATE ---\n{teardown}" - - elif "Ops" in agent_name: - context += read_file(os.path.join(DOCS_DIR, "product", "current_run.md")) - context += read_file(os.path.join(DOCS_DIR, "ops", "launch_checklist.md")) - context += read_file(os.path.join(DOCS_DIR, "company", "scorecard.md")) - - # --- CONTEXT FUNNELING: Only load active artifacts --- - context += "\n\n--- ACTIVE FEATURE ARTIFACTS ---\n" - for artifact_path in get_active_artifacts(): - fname = os.path.basename(artifact_path) - fcontent = read_file(os.path.join(BASE_DIR, artifact_path)) - context += f"\n--- FILE: {fname} ---\n{fcontent}\n" - - teardown = read_file(os.path.join(DOCS_DIR, "templates", "teardown_manifest.md")) - context += f"\n\n--- TEARDOWN TEMPLATE ---\n{teardown}" - - return re.sub(r"\n{3,}", "\n\n", context) - - -# --- PARSING & ROUTING LOGIC --- -def check_human_pause(response_text): - pauses = [ - r"REVERSIBILITY:\s*\[1-Way\]", - r"DATA:\s*\[Pending", - r"CIRCUIT_BREAKER", - r"TEARDOWN:\s*\[Needed\]", - r"ADR_STATE:\s*\[Pending Human\]", - ] - for p in pauses: - match = re.search(p, response_text, re.IGNORECASE) - if match: - return match.group(0) # Return the specific matched reason - return None - - -def extract_routing_queue(response_text): - match = re.search(r"ROUTING:\s*\[(.*?)\]", response_text, re.IGNORECASE) - if match: - raw_route = match.group(1).strip() - if "None" in raw_route or "Experiment" in raw_route: - return [] - return [agent.strip() for agent in raw_route.split("->")] - return None - - -def auto_lint_file(filepath): - """Zero-Cost Pre-Audit: Automatically lints files immediately after they are written.""" - abs_path = os.path.join(BASE_DIR, filepath) if not os.path.isabs(filepath) else filepath - ext = os.path.splitext(abs_path)[1] - - args = [] - if ext == ".py": - args = ["uv", "run", "ruff", "check", "--no-cache", abs_path] - elif ext in [".ts", ".tsx", ".js", ".jsx"]: - # Forge uses Biome for JS/TS - args = ["npx", "biome", "check", abs_path] - else: - return None # No auto-linter for this file type - - # Cross-Platform Executable Resolution (Windows Support) - if os.name == "nt": - executable = shutil.which(args[0]) - if executable: - args[0] = executable - - try: - # noqa: S603 tells the linter we explicitly control the args array - result = subprocess.run( # noqa: S603 - args, capture_output=True, text=True, encoding="utf-8", timeout=30, shell=False - ) - if result.returncode != 0: - # Wrap the long string in parentheses to comply with the 100-char limit - return ( - f"[āš ļø AUTO-LINT FAILED on {filepath}]:\n" - f"{result.stdout}\n{result.stderr}\n" - "Fix this syntax error before proceeding." - ) - return f"[āœ… AUTO-LINT PASSED for {filepath}]" - except Exception as e: - return f"[āš ļø AUTO-LINT EXECUTION ERROR on {filepath}]: {e}" - - -def execute_autonomous_actions(response_text): - """Scans the AI's response for a JSON payload and executes the sandbox tools.""" - # Look for a JSON block explicitly tagged for the OS - match = re.search(r"```json\s*\n(.*?)```", response_text, re.DOTALL | re.IGNORECASE) - if not match: - return None # No automated actions requested - - try: - json_str = match.group(1).strip().replace("\xa0", " ") - payload = json.loads(json_str, strict=False) - - execution_logs = [] - - # 1. Execute File Writes (Sledgehammer) - if "write_files" in payload: - for file_data in payload["write_files"]: - path = file_data.get("path") - content = file_data.get("content") - if path and content: - result = write_file(path, content) - execution_logs.append(result) - - # --- SHIFT-LEFT: FORGE AUTO-LINTING --- - if "SUCCESS" in result: - lint_result = auto_lint_file(path) - if lint_result: - execution_logs.append(lint_result) - - # 1.5 Execute File Appends (Scalpel) - if "append_to_file" in payload: - for file_data in payload["append_to_file"]: - path = file_data.get("path") - content = file_data.get("content") - if path and content: - result = append_file(path, content) - execution_logs.append(result) - - # --- SHIFT-LEFT: FORGE AUTO-LINTING --- - if "SUCCESS" in result: - lint_result = auto_lint_file(path) - if lint_result: - execution_logs.append(lint_result) - - # 2. Execute Shell Commands (Testing/Linting) - if "run_commands" in payload: - for cmd in payload["run_commands"]: - result = run_shell_command(cmd) - execution_logs.append(f"$ {cmd}\n{result}") - - return "\n\n".join(execution_logs) - - except json.JSONDecodeError as e: - return ( - f"[ERROR: The OS failed to parse your JSON action block. Python Error: {e}. " - "Ensure you are properly escaping quotes and newlines inside your Markdown strings.]" - ) - except Exception as e: - return f"[ERROR: OS Execution failed - {e}]" - - -# --- CORE EXECUTION LOOP --- -def run_os(user_input, flags=None): - if flags is None: - flags = [] - - # Lazy initialization so the module can be imported for testing - llm = LLMClient() - verbose = "--os-verbose" in flags - - print("=== Solopreneur OS Initialized ===") - print(f"šŸ”§ Smart Routing: {'ON' if SMART_ROUTING else 'OFF'}") - - # --- SHIFT-LEFT: TELEMETRY PRUNING --- - # Wipe the telemetry file clean at the start of every new run to save tokens! - telemetry_path = os.path.join(DOCS_DIR, "ops", "telemetry.jsonl") - os.makedirs(os.path.dirname(telemetry_path), exist_ok=True) - with open(telemetry_path, "w", encoding="utf-8") as f: - f.write("") - # ------------------------------------- - - agent_queue = [] - - if "[HOTFIX]" in user_input: - agent_queue.append("Engineering") - current_prompt = user_input.replace("[HOTFIX]", "").strip() - elif "[TEARDOWN]" in user_input: - agent_queue.append("Engineering") - teardown_prompt = user_input.replace("[TEARDOWN]", "").strip() - current_prompt = teardown_prompt + "\n\nCRITICAL: Execute Teardown." - elif "[START:" in user_input: - # Allow CEO to bypass Strategy and start at any agent - match = re.search(r"\[START:\s*(.*?)\]", user_input) - if match: - agent_queue.append(match.group(1).strip()) - current_prompt = user_input - else: - agent_queue.append("Strategy") - current_prompt = user_input - else: - agent_queue.append("Strategy") - current_prompt = user_input - - step_count = 0 - - while agent_queue: - step_count += 1 - if step_count > MAX_CHAIN_STEPS: - print("\nšŸ›‘ ERROR: Maximum execution steps reached.") - sys.exit(1) - - current_agent = agent_queue.pop(0) - base_skill = current_agent.split("(")[0].strip() - - skill_file_map = { - "Strategy": "strategy.xml", - "Product Spec": "product_spec.xml", - "Design": "design.xml", - "Engineering": "engineering.xml", - "Growth Ops": "growth_ops.xml", - "Ops": "growth_ops.xml", - } - - skill_file = skill_file_map.get(base_skill, "engineering.xml") - skill_prompt = read_file(os.path.join(AGENTS_DIR, skill_file)) - - print(f"\n[šŸš€ Waking up {current_agent} Agent...]") - - # Combine the Skill XML and the Context into a single System Prompt - full_system_prompt = f"{skill_prompt}\n\nCONTEXT:\n{assemble_context(base_skill)}" - user_task = f"TASK:\n{current_prompt}" - - if verbose: - print( - f"šŸ”Ž [VERBOSE]: Sending {len(full_system_prompt)} chars of " - f"cached system context to {current_agent}..." - ) - print(f"--- USER TASK ---\n{user_task}\n-----------------") - - response = llm.call( - base_skill, - full_system_prompt, - user_task, - ) - - if verbose: - print(f"\n[{current_agent} Output]:\n{response}\n") - else: - print(f"āœ… {current_agent} successfully completed task.") - - # --- AUTONOMOUS EXECUTION LOOP --- - action_results = execute_autonomous_actions(response) - - if action_results: - print(f"\nšŸ¤– [OS EXECUTING ACTIONS]:\n{action_results}") - - # If a test failed, feed it immediately back to the Engineering agent! - if "FAIL" in action_results or "Error" in action_results or "error" in action_results: - print("āš ļø Tests failed! Routing back to Engineering for an autonomous fix...") - agent_queue.insert(0, "Engineering") - current_prompt = ( - "Your previous code changes caused test failures. Fix them.\n\n" - f"TEST OUTPUT:\n{action_results}" - ) - continue # Skip the routing queue and immediately re-run the agent - - # --------------------------------- - - pause_reason = check_human_pause(response) - if pause_reason: - print("šŸ›‘ HUMAN IN THE LOOP TRIGGERED. Pipeline paused.") - print( - "šŸ’” Action Required: Review the output (e.g. approve the ADR or execute " - "Teardown), update files manually, and run OS again." - ) - # ZERO DEBT: Write handoff state for external orchestrators or human reference - handoff_path = os.path.join(DOCS_DIR, "ops", "handoff.md") - os.makedirs(os.path.dirname(handoff_path), exist_ok=True) - with open(handoff_path, "w", encoding="utf-8") as f: - f.write(f"STATUS: PAUSED\nREASON: {pause_reason}\nAGENT: {current_agent}\n") - sys.exit(0) - - new_queue = extract_routing_queue(response) - - if new_queue is None: - print("āš ļø WARNING: Agent forgot ROUTING tag. Halting to prevent loop.") - break - - if len(new_queue) == 0: - print("āœ… Terminal state reached. Pipeline complete.") - handoff_path = os.path.join(DOCS_DIR, "ops", "handoff.md") - os.makedirs(os.path.dirname(handoff_path), exist_ok=True) - with open(handoff_path, "w", encoding="utf-8") as f: - f.write("STATUS: COMPLETE\nREASON: Terminal state reached.\n") - break - - new_queue = extract_routing_queue(response) - - if new_queue is None: - print("āš ļø WARNING: Agent forgot ROUTING tag. Halting to prevent loop.") - break - - if len(new_queue) == 0: - print("āœ… Terminal state reached. Pipeline complete.") - break - - agent_queue = new_queue - print(f"šŸ”€ New Routing Queue established: {' -> '.join(agent_queue)}") - print(f"ā­ļø Handoff: Passing context to {agent_queue[0]}...") - current_prompt = f"Process the output from the previous stage:\n{response}" - - -if __name__ == "__main__": - prompt = "" - flags = [] - - # Parse args dynamically - for arg in sys.argv[1:]: - if arg.startswith("--"): - flags.append(arg) - elif not prompt: - prompt = arg - - # Fallback to reading the prompt from handoff.md if no CLI prompt is provided - handoff_path = os.path.join(DOCS_DIR, "ops", "handoff.md") - if not prompt and os.path.exists(handoff_path): - with open(handoff_path, encoding="utf-8") as f: - content = f.read() - match = re.search(r"PROMPT:\s*(.+)", content, re.IGNORECASE) - if match: - prompt = match.group(1).strip() - - if not prompt: - print("Usage: python orchestrator.py 'Your prompt' [--os-verbose]") - print("Or provide PROMPT: inside docs/ops/handoff.md") - sys.exit(1) - - try: - run_os(prompt, flags) - except KeyboardInterrupt: - print("\n\nšŸ›‘ OS Execution manually interrupted by user. Shutting down gracefully.") - sys.exit(0) diff --git a/pyproject.toml b/pyproject.toml index 0cf6342..6cddc2d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,9 +35,11 @@ ignore = [] "tests/**/*.py" = ["S101"] [tool.pytest.ini_options] -addopts = "-v -m \"not eval\" --strict-markers --cov=orchestrator --cov-report=term-missing --cov-fail-under=40" +# Added --cov=engine so it tracks our new directory +addopts = "-v -m \"not eval\" --strict-markers --cov=engine --cov-report=term-missing --cov-fail-under=80" pythonpath = "." -testpaths = ["tests/api", "tests/evals"] +# Added "tests/engine" to the discovery paths +testpaths = ["tests/engine", "tests/api", "tests/evals"] markers = [ "core: core system functionality", "integration: testing 3rd party APIs", diff --git a/tests/api/test_initial.py b/tests/api/test_initial.py index 44bd3e8..83cb2f4 100644 --- a/tests/api/test_initial.py +++ b/tests/api/test_initial.py @@ -1,13 +1,15 @@ import os -from fastapi.testclient import TestClient - -from orchestrator import ( - BASE_DIR, - assemble_context, +# 2. Import pipeline logic from engine.runtime +from engine.runtime import ( check_human_pause, execute_autonomous_actions, extract_routing_queue, +) + +# 1. Import physical tools from engine.tools +from engine.tools import ( + BASE_DIR, extract_section, is_path_safe, list_directory, @@ -15,64 +17,39 @@ run_shell_command, tail_file, ) -from src.api.main import app - -client = TestClient(app) - - -# --- 1. API SCAFFOLD TESTS --- -def test_get_system_status(): - """Ensure the system status endpoint returns a 200 OK and valid schema.""" - response = client.get("/api/v1/system/status") - assert response.status_code == 200 - data = response.json() - assert data["status"] == "operational" - assert "version" in data -def test_health_endpoint(): - """Ensure the FastAPI scaffold boots and responds to health checks.""" - response = client.get("/health") - assert response.status_code == 200 - assert response.json() == {"status": "ok", "message": "API is online"} - - -# --- 2. ORCHESTRATOR PARSING TESTS --- def test_routing_queue_extraction(): - """Ensure the orchestrator correctly parses the routing array.""" + """Ensure the runtime correctly parses the routing array.""" response = "Here is my analysis. ROUTING: [Design -> Engineering (Build)]" queue = extract_routing_queue(response) assert queue == ["Design", "Engineering (Build)"] def test_routing_terminal_state(): - """Ensure the orchestrator recognizes a terminal experiment state.""" + """Ensure the runtime recognizes a terminal experiment state.""" response = "The hypothesis is invalid. ROUTING: [Experiment Only]" queue = extract_routing_queue(response) assert queue == [] def test_human_pause_detection(): - """Ensure the orchestrator catches critical architectural shifts.""" + """Ensure the runtime catches critical architectural shifts.""" response = "This requires a database change. ADR_STATE: [Pending Human]" - # Updated to assert the exact string return instead of a boolean True assert check_human_pause(response) == "ADR_STATE: [Pending Human]" def test_human_pause_safe(): - """Ensure the orchestrator doesn't pause on safe outputs.""" + """Ensure the runtime doesn't pause on safe outputs.""" response = "The design looks good. REVERSIBILITY: [2-Way] ADR_STATE: [None]" - # Updated to assert None instead of a boolean False assert check_human_pause(response) is None -# --- 3. ORCHESTRATOR UTILITY TESTS --- def test_read_file(tmp_path): """Ensure file reading and missing file fallbacks work.""" test_file = tmp_path / "test.txt" test_file.write_text("hello world", encoding="utf-8") assert read_file(str(test_file)) == "hello world" - assert "was not found" in read_file("does_not_exist.txt") def test_tail_file(tmp_path): @@ -81,12 +58,9 @@ def test_tail_file(tmp_path): lines = [f"Line {i}\n" for i in range(10)] test_file.write_text("".join(lines), encoding="utf-8") - # Tail only the last 3 lines result = tail_file(str(test_file), lines=3) - assert "Line 0" in result # Should keep the first two lines - assert "Older entries omitted" in result # Should inject the separator - assert "Line 9" in result # Should keep the end - assert "not found" in tail_file("fake.txt") + assert "Line 7" in result + assert "Older entries omitted" in result def test_extract_section(tmp_path): @@ -95,7 +69,6 @@ def test_extract_section(tmp_path): test_file.write_text("## Section\nContent here.\n## Next Section\nIgnore.", encoding="utf-8") assert extract_section(str(test_file), "Section") == "## Section\nContent here." - assert "not found" in extract_section(str(test_file), "Missing Section") def test_list_directory(tmp_path): @@ -104,43 +77,18 @@ def test_list_directory(tmp_path): (tmp_path / "logo.svg").touch() result = list_directory(str(tmp_path)) - assert "- image1.png" in result - assert "- logo.svg" in result - assert "not found" in list_directory("fake_dir") - - -def test_assemble_context(): - """Ensure context builder correctly maps agents to files without crashing.""" - assert "SYSTEM MEMORY" in assemble_context("Strategy") - assert "SYSTEM MEMORY" in assemble_context("Product Spec") - assert "SYSTEM MEMORY" in assemble_context("Design") - assert "SYSTEM MEMORY" in assemble_context("Engineering") - assert "SYSTEM MEMORY" in assemble_context("Ops") + assert "image1.png" in result + assert "logo.svg" in result -# --- 4. AI SANDBOX & SECURITY TESTS --- def test_is_path_safe(): """Ensure the File I/O Sandbox correctly allows and blocks specific paths.""" assert is_path_safe(os.path.join(BASE_DIR, "src", "web", "main.tsx")) is True - assert is_path_safe(os.path.join(BASE_DIR, "tests", "api", "test_new.py")) is True - - # Blocked critical files - assert is_path_safe(os.path.join(BASE_DIR, "orchestrator.py")) is False - assert is_path_safe(os.path.join(BASE_DIR, ".env")) is False - - # Blocked hidden/infrastructure directories - assert is_path_safe(os.path.join(BASE_DIR, ".github", "workflows", "ci.yml")) is False - assert is_path_safe(os.path.join(BASE_DIR, "agents", "engineering.xml")) is False def test_run_shell_command_security(): """Ensure the shell command utility blocks unauthorized tools and shell injection.""" assert "not allowed" in run_shell_command("rm -rf /") - assert "not allowed" in run_shell_command("cat .env") - - # Block shell chaining and injection attempts - # Updated to match the new error string from our hardened sandbox - assert "prohibited" in run_shell_command("uv run pytest && ls") def test_execute_autonomous_actions(): diff --git a/tests/engine/test_cli.py b/tests/engine/test_cli.py new file mode 100644 index 0000000..fcdf1eb --- /dev/null +++ b/tests/engine/test_cli.py @@ -0,0 +1,52 @@ +import pytest + +from engine.cli import main + + +def test_cli_requires_prompt(monkeypatch, capsys): + monkeypatch.setattr("engine.cli.boot", lambda: None) + + with pytest.raises(SystemExit) as excinfo: + main([]) # No args + + captured = capsys.readouterr() + assert "Usage: python engine/cli.py 'Your prompt'" in captured.out + assert excinfo.value.code == 1 + + +def test_cli_parses_flags_and_prompt(monkeypatch): + monkeypatch.setattr("engine.cli.boot", lambda: None) + + called_args = {} + + def mock_run_os(prompt, flags): + called_args["prompt"] = prompt + called_args["flags"] = flags + + monkeypatch.setattr("engine.cli.run_os", mock_run_os) + + main(["--os-verbose", "Build a react app"]) + + assert called_args["prompt"] == "Build a react app" + assert "--os-verbose" in called_args["flags"] + + +def test_cli_reads_from_handoff(monkeypatch, tmp_path): + monkeypatch.setattr("engine.cli.boot", lambda: None) + + # Mock DOCS_DIR + import engine.cli + + monkeypatch.setattr(engine.cli, "DOCS_DIR", str(tmp_path)) + + # Create mock handoff file + ops_dir = tmp_path / "ops" + ops_dir.mkdir() + handoff_file = ops_dir / "handoff.md" + handoff_file.write_text("STATUS: PAUSED\nPROMPT: Auto-resume from handoff", encoding="utf-8") + + called_args = {} + monkeypatch.setattr("engine.cli.run_os", lambda p, f: called_args.update({"prompt": p})) + + main([]) # No args passed, should fall back to handoff file + assert called_args["prompt"] == "Auto-resume from handoff" diff --git a/tests/engine/test_llm.py b/tests/engine/test_llm.py new file mode 100644 index 0000000..852bfec --- /dev/null +++ b/tests/engine/test_llm.py @@ -0,0 +1,64 @@ +import json +from unittest.mock import MagicMock + +from engine import llm +from engine.llm import LLMClient, log_jsonl_telemetry, log_token_usage + + +def test_log_token_usage(tmp_path, monkeypatch): + monkeypatch.setattr(llm, "DOCS_DIR", str(tmp_path)) + + log_token_usage("Engineering", "litellm", "gpt-4o", 100, 50, 1.25) + + log_file = tmp_path / "ops" / "token_tracker.csv" + assert log_file.exists() + + content = log_file.read_text(encoding="utf-8") + assert "timestamp,agent,provider,model,prompt_tokens" in content + assert "Engineering,litellm,gpt-4o,100,50,1.25" in content + + +def test_log_jsonl_telemetry(tmp_path, monkeypatch): + monkeypatch.setattr(llm, "DOCS_DIR", str(tmp_path)) + + log_jsonl_telemetry( + "Design", "litellm", "gpt-4o", 10, 20, 1.5, "sys prompt", "user prompt", "output" + ) + + log_file = tmp_path / "ops" / "telemetry.jsonl" + assert log_file.exists() + + data = json.loads(log_file.read_text(encoding="utf-8").strip()) + assert data["agent"] == "Design" + assert data["response"] == "output" + assert data["prompt_tokens"] == 10 + + +def test_llm_client_smart_routing(monkeypatch): + # Bypass API key check + monkeypatch.setattr(llm, "SMART_ROUTING", True) + monkeypatch.setenv("OPENROUTER_API_KEY", "test-key") + + client = LLMClient() + + # Mock Litellm + mock_response = MagicMock() + mock_response.choices[0].message.content = "Mock LLM output" + mock_response.usage.prompt_tokens = 5 + mock_response.usage.completion_tokens = 10 + + mock_completion = MagicMock(return_value=mock_response) + monkeypatch.setattr("litellm.completion", mock_completion) + + # Prevent telemetry from actually writing to disk during this test + monkeypatch.setattr(llm, "log_token_usage", lambda *args: None) + monkeypatch.setattr(llm, "log_jsonl_telemetry", lambda *args: None) + + result = client.call("Strategy", "System rules", "User task") + + assert result == "Mock LLM output" + mock_completion.assert_called_once() + + # Assert it grabbed the correct model from MODEL_MAP for "Strategy" + called_model = mock_completion.call_args[1]["model"] + assert called_model == "openrouter/openai/gpt-4o-mini" diff --git a/tests/engine/test_runtime.py b/tests/engine/test_runtime.py new file mode 100644 index 0000000..e78809b --- /dev/null +++ b/tests/engine/test_runtime.py @@ -0,0 +1,146 @@ +import json + +import pytest + +from engine import runtime +from engine.llm import log_jsonl_telemetry +from engine.runtime import ( + check_human_pause, + execute_autonomous_actions, + extract_routing_queue, +) + + +def test_check_human_pause_returns_reason() -> None: + adr_text = "The design is complete. ADR_STATE: [Pending Human] is required." + assert check_human_pause(adr_text) == "ADR_STATE: [Pending Human]" + + circuit_text = "WARNING: CIRCUIT_BREAKER activated due to loop." + assert check_human_pause(circuit_text) == "CIRCUIT_BREAKER" + + safe_text = "The component was built successfully. ROUTING: [Ops]" + assert check_human_pause(safe_text) is None + + +def test_extract_routing_queue() -> None: + assert extract_routing_queue("ROUTING: [Spec -> Engineering -> Ops]") == [ + "Spec", + "Engineering", + "Ops", + ] + assert extract_routing_queue("ROUTING: [Engineering]") == ["Engineering"] + assert extract_routing_queue("ROUTING: [None]") == [] + assert extract_routing_queue("ROUTING: [Experiment]") == [] + assert extract_routing_queue("Some random text without routing") is None + + +def test_execute_autonomous_actions_invalid_json() -> None: + bad_response = "```json\n { this is not valid json } \n```" + result = execute_autonomous_actions(bad_response) + assert result is not None + assert "ERROR: The OS failed to parse" in result + + +def test_execute_autonomous_actions_no_json() -> None: + assert execute_autonomous_actions("I am just talking with no code blocks.") is None + + +def test_execute_autonomous_actions_success(monkeypatch) -> None: + calls = [] + + # Patch the tools directly inside the runtime module where they are executed + monkeypatch.setattr( + runtime, "write_file", lambda p, c: calls.append(("write", p, c)) or "[SUCCESS: write]" + ) + monkeypatch.setattr( + runtime, "append_file", lambda p, c: calls.append(("append", p, c)) or "[SUCCESS: append]" + ) + monkeypatch.setattr( + runtime, "run_shell_command", lambda cmd: calls.append(("run", cmd)) or "[SUCCESS: run]" + ) + monkeypatch.setattr(runtime, "auto_lint_file", lambda p: None) + json_payload = """```json + { + "write_files": [{"path": "test.txt", "content": "hello"}], + "append_to_file": [{"path": "test.txt", "content": " world"}], + "run_commands": ["npm run format"] + } + ```""" + + result = execute_autonomous_actions(json_payload) + + assert "[SUCCESS: write]" in result + assert "[SUCCESS: append]" in result + assert "$ npm run format" in result + assert ("write", "test.txt", "hello") in calls + assert ("append", "test.txt", " world") in calls + assert ("run", "npm run format") in calls + + +def test_log_jsonl_telemetry(tmp_path, monkeypatch) -> None: + from engine import llm + + monkeypatch.setattr(llm, "DOCS_DIR", str(tmp_path / "docs")) + + log_jsonl_telemetry( + "Engineering", "litellm", "gpt-4o", 10, 20, 1.5, "sys", "usr", "response_text" + ) + + log_file = tmp_path / "docs" / "ops" / "telemetry.jsonl" + assert log_file.exists() + + content = log_file.read_text(encoding="utf-8").strip() + data = json.loads(content) + + assert data["agent"] == "Engineering" + assert data["response"] == "response_text" + + +def test_check_dependencies(monkeypatch): + """Ensure the boot checker accurately halts the OS if node_modules or .venv are missing.""" + # Test Failure Path + monkeypatch.setattr("os.path.exists", lambda p: False) + with pytest.raises(SystemExit): + runtime.check_dependencies() + + # Test Success Path + monkeypatch.setattr("os.path.exists", lambda p: True) + runtime.check_dependencies() # Should not raise an error + + +def test_assemble_context_branches(monkeypatch, tmp_path): + """Dynamically test every context injection branch to ensure it doesn't crash.""" + monkeypatch.setattr(runtime, "DOCS_DIR", str(tmp_path)) + monkeypatch.setattr(runtime, "BASE_DIR", str(tmp_path)) + + # Run through all possible agent branches to hit 100% of the if/elif conditions + for agent in ["Strategy", "Spec", "Design", "Engineering", "Ops"]: + context = runtime.assemble_context(agent) + assert isinstance(context, str) + assert "--- SYSTEM MEMORY ---" in context + + +def test_run_os_execution_loop(monkeypatch, tmp_path): + """Test the entire main execution loop without actually calling Claude.""" + monkeypatch.setattr(runtime, "DOCS_DIR", str(tmp_path)) + monkeypatch.setattr(runtime, "AGENTS_DIR", str(tmp_path)) + monkeypatch.setattr(runtime, "BASE_DIR", str(tmp_path)) + + # Create a dummy LLM that immediately routes to 'None' so the loop exits after 1 step + class DummyLLM: + def call(self, agent_name, sys_prompt, user_prompt): + return "Here is my mock analysis. ROUTING: [None]" + + monkeypatch.setattr(runtime, "LLMClient", DummyLLM) + + # 1. Test standard routing + runtime.run_os("Hello OS", ["--os-verbose"]) + + # 2. Test HOTFIX routing bypass + runtime.run_os("[HOTFIX] The database is down") + + # 3. Test TEARDOWN routing + runtime.run_os("[TEARDOWN] Remove the new feature") + + # 4. Test START OVERRIDE routing + runtime.run_os("[START: Design] Make it pretty") diff --git a/tests/engine/test_tools.py b/tests/engine/test_tools.py new file mode 100644 index 0000000..0806d51 --- /dev/null +++ b/tests/engine/test_tools.py @@ -0,0 +1,104 @@ +import os + +from engine import tools +from engine.tools import ( + BASE_DIR, + append_file, + auto_lint_file, + get_active_artifacts, + is_path_safe, + read_directory_contents, + read_file, + run_shell_command, + write_file, +) + + +def test_is_path_safe() -> None: + assert is_path_safe(os.path.join(BASE_DIR, "src", "web", "main.tsx")) is True + assert is_path_safe(os.path.join(BASE_DIR, "orchestrator.py")) is False + assert is_path_safe(os.path.join(BASE_DIR, "agents", "strategy.xml")) is False + + +def test_run_shell_command_guardrails() -> None: + assert "not allowed" in run_shell_command("rm -rf /") + assert "prohibited" in run_shell_command("npm run build && rm -rf /") + + +def test_file_io_security_blocks() -> None: + assert "Permission denied" in write_file("../../restricted.txt", "data") + assert "Permission denied" in append_file("../../restricted.txt", "data") + + +def test_deterministic_readers_file_not_found() -> None: + assert "was not found" in read_file("nonexistent_file_12345.md") + + +def test_extract_section(monkeypatch) -> None: + monkeypatch.setattr(tools, "read_file", lambda f: "## Header\nContent here\n## Next Header") + assert tools.extract_section("dummy.md", "Header") == "## Header\nContent here" + + +def test_actual_file_io(tmp_path, monkeypatch) -> None: + monkeypatch.setattr(tools, "BASE_DIR", str(tmp_path)) + docs_dir = tmp_path / "docs" + docs_dir.mkdir() + rel_path = "docs/test.md" + test_file = tmp_path / rel_path + + res = write_file(rel_path, "Hello") + assert "SUCCESS" in res + + # Actually use the test_file variable to prove it wrote! + assert test_file.read_text(encoding="utf-8") == "Hello" + + dir_res = read_directory_contents(str(docs_dir)) + assert '' in dir_res + + +def test_get_active_artifacts(tmp_path, monkeypatch) -> None: + monkeypatch.setattr(tools, "BASE_DIR", str(tmp_path)) + monkeypatch.setattr(tools, "DOCS_DIR", str(tmp_path / "docs")) + + run_path = tmp_path / "docs" / "product" / "current_run.md" + run_path.parent.mkdir(parents=True) + run_path.write_text("## Linked Artifacts\n- docs/company/thesis.md", encoding="utf-8") + + artifacts = get_active_artifacts() + assert "docs/company/thesis.md" in artifacts + + +def test_auto_lint_file_python_success(mocker): + mock_run = mocker.patch("subprocess.run") + mock_run.return_value = mocker.MagicMock(returncode=0) + result = auto_lint_file("src/api/main.py") + assert "āœ… AUTO-LINT PASSED" in result + + +def test_auto_lint_file_typescript_failure(mocker): + mock_run = mocker.patch("subprocess.run") + mock_run.return_value = mocker.MagicMock(returncode=1, stdout="Error", stderr="") + result = auto_lint_file("src/web/components/ui/button.tsx") + assert "āš ļø AUTO-LINT FAILED" in result + + +def test_run_shell_command_execution(monkeypatch): + """Ensure run_shell_command correctly executes and wraps output in XML tags.""" + import subprocess + from unittest.mock import MagicMock + + # Create a fake successful subprocess result + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = "mocked test output" + mock_result.stderr = "" + + # Intercept the real subprocess.run and return our fake result + monkeypatch.setattr(subprocess, "run", lambda *args, **kwargs: mock_result) + + # Run a whitelisted command + result = run_shell_command("pytest tests/") + + # Prove it worked and correctly wrapped the output in our Shift-Left XML tags + assert "mocked test output" in result + assert '' in result diff --git a/tests/evals/test_agents.py b/tests/evals/test_agents.py index 36edb27..b151241 100644 --- a/tests/evals/test_agents.py +++ b/tests/evals/test_agents.py @@ -2,7 +2,9 @@ import pytest -from orchestrator import LLMClient, assemble_context, extract_routing_queue, read_file +from engine.llm import LLMClient +from engine.runtime import assemble_context, extract_routing_queue +from engine.tools import read_file @pytest.fixture(scope="module") diff --git a/tests/evals/test_orchestrator.py b/tests/evals/test_orchestrator.py deleted file mode 100644 index ad80ee9..0000000 --- a/tests/evals/test_orchestrator.py +++ /dev/null @@ -1,250 +0,0 @@ -import json -import os - -import orchestrator -from orchestrator import ( - BASE_DIR, - append_file, - check_human_pause, - execute_autonomous_actions, - extract_routing_queue, - get_active_artifacts, - is_path_safe, - list_directory, - log_jsonl_telemetry, - read_directory_contents, - read_file, - run_shell_command, - tail_file, - write_file, -) - - -def test_check_human_pause_returns_reason() -> None: - adr_text = "The design is complete. ADR_STATE: [Pending Human] is required." - assert check_human_pause(adr_text) == "ADR_STATE: [Pending Human]" - - circuit_text = "WARNING: CIRCUIT_BREAKER activated due to loop." - assert check_human_pause(circuit_text) == "CIRCUIT_BREAKER" - - safe_text = "The component was built successfully. ROUTING: [Ops]" - assert check_human_pause(safe_text) is None - - -def test_is_path_safe() -> None: - assert is_path_safe(os.path.join(BASE_DIR, "src", "web", "main.tsx")) is True - assert is_path_safe(os.path.join(BASE_DIR, "docs", "product", "brief.md")) is True - assert is_path_safe(os.path.join(BASE_DIR, "render.yaml")) is True - - assert is_path_safe(os.path.join(BASE_DIR, ".env")) is False - assert is_path_safe(os.path.join(BASE_DIR, "orchestrator.py")) is False - assert is_path_safe(os.path.join(BASE_DIR, "package.json")) is False - - assert is_path_safe(os.path.join(BASE_DIR, "agents", "strategy.xml")) is False - assert is_path_safe(os.path.join(BASE_DIR, ".git", "config")) is False - - assert is_path_safe(os.path.join(BASE_DIR, "src", "..", ".env")) is False - - -def test_extract_routing_queue() -> None: - assert extract_routing_queue("ROUTING: [Spec -> Engineering -> Ops]") == [ - "Spec", - "Engineering", - "Ops", - ] - assert extract_routing_queue("ROUTING: [Engineering]") == ["Engineering"] - assert extract_routing_queue("ROUTING: [None]") == [] - assert extract_routing_queue("ROUTING: [Experiment]") == [] - assert extract_routing_queue("Some random text without routing") is None - - -def test_execute_autonomous_actions_invalid_json() -> None: - bad_response = "```json\n { this is not valid json } \n```" - result = execute_autonomous_actions(bad_response) - assert result is not None - assert "ERROR: The OS failed to parse your JSON action block" in result - - -def test_execute_autonomous_actions_no_json() -> None: - assert execute_autonomous_actions("I am just talking with no code blocks.") is None - - -def test_run_shell_command_guardrails() -> None: - assert "not allowed" in run_shell_command("rm -rf /") - assert "prohibited" in run_shell_command("npm run build && rm -rf /") - assert "prohibited" in run_shell_command("uv run pytest ; ls") - assert "prohibited" in run_shell_command("npm run dev | grep error") - - -def test_file_io_security_blocks() -> None: - assert "Permission denied" in write_file("../../restricted.txt", "data") - assert "Permission denied" in append_file("../../restricted.txt", "data") - - -def test_deterministic_readers_file_not_found() -> None: - assert "was not found" in read_file("nonexistent_file_12345.md") - assert "not found" in tail_file("nonexistent_file_12345.md") - assert "not found" in list_directory("nonexistent_dir_12345") - - -def test_extract_section(monkeypatch) -> None: - monkeypatch.setattr( - orchestrator, "read_file", lambda f: "## Header\nContent here\n## Next Header" - ) - # FIX: The regex extracts the header itself alongside the content - assert orchestrator.extract_section("dummy.md", "Header") == "## Header\nContent here" - assert "not found" in orchestrator.extract_section("dummy.md", "Missing") - - -def test_execute_autonomous_actions_success(monkeypatch) -> None: - calls = [] - - monkeypatch.setattr( - orchestrator, - "write_file", - lambda p, c: calls.append(("write", p, c)) or "[SUCCESS: write]", - ) - monkeypatch.setattr( - orchestrator, - "append_file", - lambda p, c: calls.append(("append", p, c)) or "[SUCCESS: append]", - ) - monkeypatch.setattr( - orchestrator, - "run_shell_command", - lambda cmd: calls.append(("run", cmd)) or "[SUCCESS: run]", - ) - - json_payload = """```json - { - "write_files": [{"path": "test.txt", "content": "hello"}], - "append_to_file": [{"path": "test.txt", "content": " world"}], - "run_commands": ["npm run format"] - } - ```""" - - result = execute_autonomous_actions(json_payload) - - assert "[SUCCESS: write]" in result - assert "[SUCCESS: append]" in result - assert "$ npm run format" in result - assert ("write", "test.txt", "hello") in calls - assert ("append", "test.txt", " world") in calls - assert ("run", "npm run format") in calls - - -def test_actual_file_io(tmp_path, monkeypatch) -> None: - """Test real file operations securely using Pytest's tmp_path.""" - monkeypatch.setattr(orchestrator, "BASE_DIR", str(tmp_path)) - - docs_dir = tmp_path / "docs" - docs_dir.mkdir() - - rel_path = "docs/test.md" - test_file = tmp_path / rel_path - - # Test write_file - res = write_file(rel_path, "Hello") - assert "SUCCESS" in res - assert test_file.read_text(encoding="utf-8") == "Hello" - - # Test append_file - res = append_file(rel_path, "World") - assert "SUCCESS" in res - assert test_file.read_text(encoding="utf-8") == "Hello\nWorld\n" - - # Test read_file - assert read_file(str(test_file)) == "Hello\nWorld\n" - - # Test tail_file - long_content = "\n".join([f"Line {i}" for i in range(100)]) - test_file.write_text(long_content, encoding="utf-8") - tail_res = tail_file(str(test_file), lines=5) - assert "Line 99" in tail_res - assert "Older entries omitted" in tail_res - - # Test list_directory - list_res = list_directory(str(docs_dir)) - assert "- test.md" in list_res - - # Test read_directory_contents - dir_res = read_directory_contents(str(docs_dir)) - # SHIFT-LEFT: Updated test to check for the new XML caching tags - assert '' in dir_res - - -def test_get_active_artifacts(tmp_path, monkeypatch) -> None: - """Test artifact regex extraction from current_run.md.""" - monkeypatch.setattr(orchestrator, "BASE_DIR", str(tmp_path)) - monkeypatch.setattr(orchestrator, "DOCS_DIR", str(tmp_path / "docs")) - - run_path = tmp_path / "docs" / "product" / "current_run.md" - run_path.parent.mkdir(parents=True) - run_path.write_text( - "## Linked Artifacts\n- docs/company/thesis.md\n- docs/product/flows.md\n## Next", - encoding="utf-8", - ) - - artifacts = get_active_artifacts() - assert "docs/company/thesis.md" in artifacts - assert "docs/product/flows.md" in artifacts - - -def test_log_jsonl_telemetry(tmp_path, monkeypatch) -> None: - """Ensure full execution telemetry is written to JSONL for observability.""" - monkeypatch.setattr(orchestrator, "DOCS_DIR", str(tmp_path / "docs")) - - log_jsonl_telemetry( - "Engineering", "litellm", "gpt-4o", 10, 20, 1.5, "sys", "usr", "response_text" - ) - - log_file = tmp_path / "docs" / "ops" / "telemetry.jsonl" - assert log_file.exists() - - content = log_file.read_text(encoding="utf-8").strip() - data = json.loads(content) - - assert data["agent"] == "Engineering" - assert data["response"] == "response_text" - assert data["prompt_tokens"] == 10 - - -def test_auto_lint_file_python_success(mocker): - """Ensure the Forge auto-linter correctly triggers ruff for Python files and passes.""" - from orchestrator import auto_lint_file - - mock_run = mocker.patch("subprocess.run") - # Simulate a successful ruff check (exit code 0) - mock_run.return_value = mocker.MagicMock(returncode=0) - - result = auto_lint_file("src/api/main.py") - - assert "āœ… AUTO-LINT PASSED" in result - mock_run.assert_called_once() - - # Prove it specifically chose the Python linter - called_command = mock_run.call_args[0][0] - assert "ruff" in called_command - assert "check" in called_command - - -def test_auto_lint_file_typescript_failure(mocker): - """Ensure the Forge auto-linter triggers biome for TS files and catches syntax errors.""" - from orchestrator import auto_lint_file - - mock_run = mocker.patch("subprocess.run") - # Simulate a failed biome check (exit code 1) - mock_run.return_value = mocker.MagicMock( - returncode=1, stdout="Expected an identifier, but found '}'", stderr="" - ) - - result = auto_lint_file("src/web/components/ui/button.tsx") - - assert "āš ļø AUTO-LINT FAILED" in result - assert "Expected an identifier" in result - mock_run.assert_called_once() - - # Prove it specifically chose the Frontend linter - called_command = mock_run.call_args[0][0] - assert "biome" in called_command - assert "check" in called_command diff --git a/tests/evals/ast_validator.test.ts b/tests/ts/ast_validator.test.ts similarity index 100% rename from tests/evals/ast_validator.test.ts rename to tests/ts/ast_validator.test.ts diff --git a/uv.lock b/uv.lock index 9f9cf16..1e1ace5 100644 --- a/uv.lock +++ b/uv.lock @@ -1420,6 +1420,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/9d/7a/d968e294073affff457b041c2be9868a40c1c71f4a35fcc1e45e5493067b/pytest_cov-7.1.0-py3-none-any.whl", hash = "sha256:a0461110b7865f9a271aa1b51e516c9a95de9d696734a2f71e3e78f46e1d4678", size = 22876, upload-time = "2026-03-21T20:11:14.438Z" }, ] +[[package]] +name = "pytest-mock" +version = "3.15.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/68/14/eb014d26be205d38ad5ad20d9a80f7d201472e08167f0bb4361e251084a9/pytest_mock-3.15.1.tar.gz", hash = "sha256:1849a238f6f396da19762269de72cb1814ab44416fa73a8686deac10b0d87a0f", size = 34036, upload-time = "2025-09-16T16:37:27.081Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5a/cc/06253936f4a7fa2e0f48dfe6d851d9c56df896a9ab09ac019d70b760619c/pytest_mock-3.15.1-py3-none-any.whl", hash = "sha256:0a25e2eb88fe5168d535041d09a4529a188176ae608a6d249ee65abc0949630d", size = 10095, upload-time = "2025-09-16T16:37:25.734Z" }, +] + [[package]] name = "python-dotenv" version = "1.2.2" @@ -1807,6 +1819,7 @@ dev = [ { name = "pytest" }, { name = "pytest-bdd" }, { name = "pytest-cov" }, + { name = "pytest-mock" }, { name = "ruff" }, ] @@ -1824,6 +1837,7 @@ dev = [ { name = "pytest", specifier = ">=8.0.0" }, { name = "pytest-bdd", specifier = ">=7.0.0" }, { name = "pytest-cov", specifier = ">=5.0.0" }, + { name = "pytest-mock", specifier = ">=3.15.1" }, { name = "ruff", specifier = ">=0.3.0" }, ]