A reference implementation of a production-grade multi-agent pipeline built on LangGraph. Takes a plain-English data transformation request (or a screenshot) and delivers a tested, documented, sandboxed Python script.
The primary goal is engineering: demonstrating LangGraph's Send API for parallel fan-out, Annotated reducers for concurrent state, Docker SDK sandboxing, model tiering, and multimodal input — all wired together in one coherent system.
flowchart TD
A([User Input\ntext or image]) --> B{route_after_input}
B -->|image| C[Vision Agent\nSonnet]
B -->|text| D
C --> D[Orchestrator Agent\nSonnet]
D -->|Send API — parallel fan-out| E
D -->|Send API — parallel fan-out| F
D -->|Send API — parallel fan-out| G
D -->|Send API — parallel fan-out| H
subgraph Parallel ["⚡ Parallel Execution"]
E[Schema Agent\nHaiku]
F[Code Agent\nSonnet]
G[Test Agent\nHaiku]
H[Docs Agent\nHaiku]
end
E --> I[Merge\nAnnotated reducer]
F --> I
G --> I
H --> I
I --> J[Assemble\nfile bundle]
J --> K[Sandbox Executor\nDocker SDK]
K -->|success| L[Review\ninterrupt — HITL]
K -->|failure| M[Code Fix\n1 auto-retry]
M --> J
L -->|approved| N([Delivered])
L -->|rejected| O([Aborted])
An interactive architecture diagram (pipeline, AgentState, model tiering, tech stack) is available at http://localhost:8000/static/architecture.html when running locally.
- Send API for parallel fan-out. The orchestrator dispatches four specialist agents simultaneously via Send. Total latency is max(agents), not sum(agents). This is the right pattern for any pipeline with independent specialist work.
- Annotated reducer for concurrent writes. All four agents write to the same AgentState. Without a reducer, last-writer-wins drops data. Annotated[list[dict], operator.add] accumulates outputs safely; the merge node unpacks by discriminator field.
  agent_outputs: Annotated[list[dict], operator.add]
- Model tiering. Sonnet handles reasoning-heavy nodes (orchestrator, code, vision). Haiku handles mechanical work (schema inference, test generation, docs). Three of five agents run on Haiku, a significant cost reduction with no quality loss on deterministic tasks.
- Docker SDK over subprocess. Each execution gets an ephemeral container via client.containers.run() with mem_limit, cpu_quota, network_mode="none", and pids_limit. No persistent state between runs, no network egress, hard timeout. This is the same pattern as production code execution services.
- No ge/le on LLM output schemas. Anthropic's structured output format rejects JSON Schema minimum/maximum keywords on number fields. Numeric bounds are encoded in field descriptions instead.
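The fan-out and reducer points can be illustrated without LangGraph. The sketch below is a stdlib-only simulation, not the project's graph code: `asyncio.gather` stands in for the Send fan-out, the agent delays are invented, and `run_agent`, `merge_by_agent`, and the `agent`/`content` keys are hypothetical names. It shows list updates concatenated with `operator.add` and then unpacked by a discriminator field, with wall-clock time tracking the slowest agent.

```python
import asyncio
import operator
import time
from functools import reduce

# Hypothetical stand-ins for the four specialist agents; delays are invented.
AGENT_DELAYS = {"schema": 0.05, "code": 0.15, "test": 0.05, "docs": 0.05}


async def run_agent(name: str, delay: float) -> dict:
    """Simulate one specialist agent returning a partial state update."""
    await asyncio.sleep(delay)  # stands in for an LLM call
    # Each update is a one-element list tagged with a discriminator field.
    return {"agent_outputs": [{"agent": name, "content": f"<{name} output>"}]}


def merge_by_agent(agent_outputs: list[dict]) -> dict[str, str]:
    """Unpack the accumulated list by its discriminator field."""
    return {item["agent"]: item["content"] for item in agent_outputs}


async def fan_out() -> tuple[dict[str, str], float]:
    start = time.perf_counter()
    # All four agents run concurrently, so elapsed time tracks the slowest one.
    updates = await asyncio.gather(
        *(run_agent(name, delay) for name, delay in AGENT_DELAYS.items())
    )
    elapsed = time.perf_counter() - start
    # operator.add concatenates the lists, which is how a reducer declared as
    # Annotated[list[dict], operator.add] combines concurrent updates.
    accumulated = reduce(operator.add, (u["agent_outputs"] for u in updates), [])
    return merge_by_agent(accumulated), elapsed


merged, elapsed = asyncio.run(fan_out())
```

With these invented delays, `elapsed` should sit near the slowest agent's 0.15 s, not the 0.30 s total, and `merged` holds one entry per agent because no update overwrote another.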
| Layer | Technology |
|---|---|
| Orchestration | LangGraph — StateGraph, Send API, interrupt(), MemorySaver |
| LLM | Claude Sonnet 4.5 + Haiku 4.5 via OpenRouter |
| Serving | FastAPI + SSE (astream_events v2) |
| Sandboxing | Docker Python SDK — containers.run() with resource limits |
| Data generation | Faker + Polars — schema-aware synthetic CSV |
| Schemas | Pydantic v2 throughout — all agent I/O typed |
| Config | pydantic-settings with env-prefix namespacing |
| Observability | Langfuse (self-hosted) — parallel spans, session tracking |
| Prompts | Jinja2 templates |
| Tooling | uv, ruff, pytest — 50 tests, 0 lint errors |
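The sandbox limits named above map onto keyword arguments of the Docker SDK's `containers.run()`. The following is a minimal sketch under stated assumptions, not the project's executor: the limit values, the `/work` mount, the `script.py` entry point, and both helper names are hypothetical; only the image tag comes from the setup steps in this README. The hard timeout is enforced here through `container.wait(timeout=...)`, which is one way to do it.

```python
def sandbox_run_kwargs(workdir: str) -> dict:
    """Keyword arguments for docker-py's client.containers.run().

    All limit values below are illustrative assumptions.
    """
    return {
        "mem_limit": "256m",
        "cpu_period": 100_000,
        "cpu_quota": 50_000,  # half of one CPU per period
        "network_mode": "none",  # no network egress
        "pids_limit": 64,
        "volumes": {workdir: {"bind": "/work", "mode": "ro"}},
        "working_dir": "/work",
        "detach": True,  # return a Container so we can wait with a timeout
    }


def run_in_sandbox(workdir: str, timeout_s: int = 30) -> tuple[int, str]:
    """Run a script in an ephemeral container; returns (exit code, logs)."""
    import docker  # imported lazily so the helper above has no dependency

    client = docker.from_env()
    container = client.containers.run(
        "script-sandbox:latest",
        ["python", "script.py"],  # hypothetical entry point
        **sandbox_run_kwargs(workdir),
    )
    try:
        result = container.wait(timeout=timeout_s)  # raises if the timeout expires
        return result["StatusCode"], container.logs().decode()
    finally:
        # force=True also kills a container that is still running.
        container.remove(force=True)
```

Removing the container in `finally` keeps each run ephemeral even when the script hangs or the wait times out.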
Prerequisites: Python 3.12+, uv, Docker Desktop, OpenRouter API key.
git clone https://github.com/shreyabaid007/multi-agent-script-generator.git
cd multi-agent-script-generator
uv sync
cp .env.example .env # add ANTHROPIC_API_KEY (OpenRouter key)
docker build -t script-sandbox:latest ./sandbox/
uv run serve # http://localhost:8000
Optional (Langfuse tracing):
docker compose up -d # UI at http://localhost:3000
Set LANGFUSE_ENABLED=false in .env to skip.
Generate (SSE stream):
curl -N -X POST http://localhost:8000/generate \
-H 'Content-Type: application/json' \
-d '{"description": "Read a file with columns product and price. Add a discounted_price column at 10% off."}'
Resume after HITL pause (thread_id from done event):
curl -X POST http://localhost:8000/resume \
-H 'Content-Type: application/json' \
-d '{"thread_id": "<id>", "approved": true}'
Image input: add "image_b64": "<base64-encoded PNG>" to the generate request body.
SSE event types: node_start · node_end · stream · interrupt · error · done
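A client consuming the stream needs to split it into typed events. The sketch below assumes standard SSE framing (`event:` and `data:` lines separated by blank lines) with JSON payloads; the actual wire format and payload fields of this service are not documented here, so the sample payloads, including the `thread_id` key, are hypothetical.

```python
import json
from collections.abc import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[tuple[str, dict]]:
    """Yield (event_type, payload) pairs from SSE-framed text lines."""
    event, data = "message", []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if data:
                yield event, json.loads("\n".join(data))
            event, data = "message", []
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data.append(line[len("data:"):].strip())
        # Other lines (comments, id:, retry:) are ignored in this sketch.
    if data:
        # Flush a final event that lacks a trailing blank line.
        yield event, json.loads("\n".join(data))


# Hypothetical sample stream; the payload shapes are assumptions.
sample = [
    "event: node_start", 'data: {"node": "orchestrator"}', "",
    "event: done", 'data: {"thread_id": "abc123"}', "",
]
events = list(parse_sse(sample))
```

Under these assumptions, a client would keep the identifier from the done event and send it to /resume after the HITL pause.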
uv run ruff check . && uv run ruff format . && uv run pytest
Open an issue before submitting a PR for non-trivial changes.
MIT © 2026 Shreya Baid