AgenticFleet combines Microsoft's agent-framework with DSPy's intelligent prompt optimization to create self-improving multi-agent workflows.
```mermaid
graph TB
    subgraph Client[Clients]
        UI[React/Vite Frontend]
        CLI[CLI: agentic-fleet]
    end
    subgraph Backend[FastAPI Backend]
        HTTP["HTTP API: /api/*"]
        WS["WebSocket: /api/ws/chat"]
        CHAT[Chat WebSocket Service]
    end
    subgraph Workflow["Supervisor Workflow (agent-framework)"]
        A[Analysis]
        R[Routing]
        E[Execution]
        P[Progress]
        Q[Quality]
    end
    subgraph Intelligence[DSPy Intelligence Layer]
        REASONER["DSPyReasoner (cached, offline-compiled)"]
        SIGS[Typed signatures + assertions]
    end
    subgraph Runtime[Agents + Tools]
        AGENTS[ChatAgent instances]
        MODEL[OpenAIResponsesClient]
        TOOLS[ToolRegistry + tools]
    end
    subgraph StateObs["State & Observability"]
        CONV[(Conversations store)]
        THREAD[(AgentThread / multi-turn state)]
        CKPT[(Checkpoint storage)]
        HIST[(Execution history JSONL)]
        OTEL[(OpenTelemetry tracing)]
    end
    UI -->|REST| HTTP
    UI -->|stream events| WS
    WS --> CHAT
    CHAT -->|"new run: message (+ enable_checkpointing)"| Workflow
    CHAT -->|"resume: checkpoint_id only"| Workflow
    CHAT --> CONV
    CHAT --> THREAD
    CHAT --> CKPT
    CLI -->|run / run_stream| Workflow
    A --> REASONER
    R --> REASONER
    P --> REASONER
    Q --> REASONER
    REASONER --> SIGS
    E --> AGENTS
    AGENTS --> MODEL
    AGENTS --> TOOLS
    Workflow --> HIST
    Workflow --> OTEL
```
```text
┌─────────────────────────────────────────────────────────────┐
│                        Entry Point                          │
├─────────────────────────────────────────────────────────────┤
│                    cli/console.py (CLI)                     │
└──────────────────────────┬──────────────────────────────────┘
                           │
              ┌────────────▼──────────┐
              │  SupervisorWorkflow   │
              │    (Orchestrator)     │
              └────────────┬──────────┘
                           │
        ┌──────────────────┼──────────────────┐
        │                  │                  │
┌───────▼──────┐  ┌────────▼─────┐  ┌─────────▼───────┐
│ DSPyReasoner │  │    Agents    │  │  ToolRegistry   │
│  (Routing)   │  │  (Execute)   │  │   (Metadata)    │
└───────┬──────┘  └──────────────┘  └─────────────────┘
        │                  ▲
┌───────▼──────┐           │
│  Signatures  │           │
│  (Prompts)   │           │
└───────┬──────┘           │
        │                  │
┌───────▼──────────────────┴──────┐
│          Offline Layer          │
│ (Cached Modules & Optimization) │
└─────────────────────────────────┘
```
- Task Input → Console receives the user task
- DSPy Analysis → Reasoner (using a cached module) analyzes the task
- DSPy Routing → Reasoner routes the task to the appropriate agents
- Agent Execution → Agents execute in parallel, sequential, or delegated mode
- Progress Evaluation → DSPy evaluates execution progress
- Quality Assessment → DSPy evaluates output quality
- History Persistence → Execution saved via `BridgeMiddleware`
Note: The Judge/Refinement phase was removed in v0.6.6 for ~66% latency improvement (from ~6 min to ~2 min for complex queries).
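The phase flow above can be sketched as a linear pipeline of phase functions passing a shared state forward. The phase bodies below are illustrative stand-ins, not AgenticFleet's real executors:

```python
from typing import Callable

State = dict


def analysis(state: State) -> State:
    # Stand-in for the analysis phase: classify task complexity.
    state["complexity"] = "simple" if len(state["task"]) < 40 else "complex"
    return state


def routing(state: State) -> State:
    # Stand-in for routing: pick an execution mode from the analysis.
    state["mode"] = "delegated" if state["complexity"] == "simple" else "parallel"
    return state


def execution(state: State) -> State:
    state["result"] = f"handled in {state['mode']} mode"
    return state


def progress(state: State) -> State:
    state["progress"] = 1.0
    return state


def quality(state: State) -> State:
    state["quality_ok"] = bool(state.get("result"))
    return state


# Mirrors the executor chain: Analysis -> Routing -> Execution -> Progress -> Quality.
PIPELINE: list[Callable[[State], State]] = [analysis, routing, execution, progress, quality]


def run(task: str) -> State:
    state: State = {"task": task}
    for phase in PIPELINE:
        state = phase(state)
    return state
```

Each phase only reads and extends the shared state, which is why the Judge/Refinement step could be dropped without touching the other phases.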
Operational note (concurrency): Per-request `reasoning_effort` can require mutating agent client state in `SupervisorWorkflow._apply_reasoning_effort`. To avoid cross-request interference under concurrent WebSocket sessions, the WebSocket service initializes a fresh `SupervisorWorkflow` per socket session, isolating agent instances.
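The isolation pattern can be illustrated with a hypothetical per-session factory: each connection gets its own workflow object, so mutating per-request settings cannot leak across sessions. The names below are illustrative stand-ins, not the actual service code:

```python
class FakeWorkflow:
    """Stand-in for SupervisorWorkflow with mutable per-request state."""

    def __init__(self) -> None:
        self.reasoning_effort = "medium"

    def apply_reasoning_effort(self, effort: str) -> None:
        # Mutates workflow-local state; safe only because the workflow
        # is never shared between sessions.
        self.reasoning_effort = effort


def open_session() -> FakeWorkflow:
    # One fresh workflow per WebSocket session, never a shared singleton.
    return FakeWorkflow()


session_a = open_session()
session_b = open_session()
session_a.apply_reasoning_effort("high")
# session_b is unaffected by session_a's mutation.
```

A shared singleton would make the two sessions race on the same `reasoning_effort` field; the factory trades a little per-connection setup cost for correctness.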
The workflow is built entirely on agent-framework primitives:
```mermaid
graph TB
    subgraph "Entry Point"
        CLI[cli/console.py]
        API[API Entry Points]
    end
    subgraph "Workflow Creation"
        FACTORY[create_supervisor_workflow]
        BUILDER["WorkflowBuilder<br/>agent-framework"]
        CONTEXT["SupervisorContext<br/>DSPy + Agents + Tools"]
    end
    subgraph "Agent-Framework Executors"
        AE["AnalysisExecutor<br/>extends Executor"]
        RE["RoutingExecutor<br/>extends Executor"]
        EE["ExecutionExecutor<br/>extends Executor"]
        PE["ProgressExecutor<br/>extends Executor"]
        QE["QualityExecutor<br/>extends Executor"]
    end
    subgraph "DSPy Intelligence Layer"
        DSPY[DSPyReasoner]
        SIGS["Enhanced Signatures<br/>EnhancedTaskRouting<br/>JudgeEvaluation"]
    end
    subgraph "Agent Execution"
        CA["ChatAgent<br/>agent-framework"]
        OAI["OpenAIResponsesClient<br/>agent-framework.openai"]
        TOOLS[Tool Registry]
    end
    subgraph "Event Streaming"
        MAE["MagenticAgentMessageEvent<br/>agent-framework"]
        WOE["WorkflowOutputEvent<br/>agent-framework"]
        CM["ChatMessage + Role<br/>agent-framework"]
    end
    CLI --> FACTORY
    API --> FACTORY
    FACTORY --> BUILDER
    FACTORY --> CONTEXT
    BUILDER -->|set_start_executor| AE
    BUILDER -->|add_edge| RE
    BUILDER -->|add_edge| EE
    BUILDER -->|add_edge| PE
    BUILDER -->|add_edge| QE
    AE -->|uses| DSPY
    RE -->|uses| DSPY
    PE -->|uses| DSPY
    QE -->|uses| DSPY
    DSPY -->|uses| SIGS
    EE -->|creates| CA
    CA -->|uses| OAI
    CA -->|uses| TOOLS
    AE -->|emits| MAE
    RE -->|emits| MAE
    EE -->|emits| MAE
    PE -->|emits| MAE
    QE -->|emits| WOE
    MAE -->|contains| CM
    WOE -->|contains| CM
    style BUILDER fill:#e1f5ff
    style AE fill:#fff4e1
    style RE fill:#fff4e1
    style EE fill:#fff4e1
    style PE fill:#fff4e1
    style QE fill:#fff4e1
    style CA fill:#e8f5e9
    style OAI fill:#e8f5e9
    style MAE fill:#fce4ec
    style WOE fill:#fce4ec
    style CM fill:#fce4ec
```
Note: `JudgeRefineExecutor` was removed in v0.6.6 for latency optimization. The workflow now terminates at `QualityExecutor`.
Key Agent-Framework Components:
- WorkflowBuilder (blue): Constructs the executor graph with `.set_start_executor()` and `.add_edge()`
- Executors (orange): All 5 fleet executors extend `agent_framework.Executor` and use the `@handler` decorator
- ChatAgent (green): Created via `agent_framework.ChatAgent` with `OpenAIResponsesClient`
- Events (pink): `MagenticAgentMessageEvent` and `WorkflowOutputEvent` with `ChatMessage` + `Role` enum
AgenticFleet uses the OpenAI Responses format throughout, ensuring compatibility with agent-framework's event system:

ChatMessage + Role Enum:

```python
from agent_framework._types import ChatMessage, Role

# Used in event streaming
MagenticAgentMessageEvent(
    agent_id="fleet",
    message=ChatMessage(
        role=Role.ASSISTANT,  # Uses Role enum (SYSTEM, USER, ASSISTANT)
        text="Processing task...",
    ),
)
```

OpenAIResponsesClient:

```python
from agent_framework.openai import OpenAIResponsesClient

# Used for all ChatAgent instances
chat_client = OpenAIResponsesClient(
    model_id=model_id,
    api_key=api_key,
    reasoning_effort=reasoning_effort,
    reasoning_verbosity=reasoning_verbosity,
    temperature=temperature,
    max_tokens=max_tokens,
)

agent = ChatAgent(
    name=agent_name,
    chat_client=chat_client,  # OpenAIResponsesClient (not OpenAIChatClient)
    instructions=instructions,
    tools=tools_param,
)
```

Why OpenAIResponsesClient:
- ✅ Structured output support with better type safety
- ✅ Access to newer OpenAI Responses API features
- ✅ Better integration with Pydantic models for tool responses
- ✅ Official recommended pattern for agent applications
- ✅ Proper `ChatMessage` + `Role` enum support for event streaming
```mermaid
graph TD
    A[Task input] --> B[DSPy analysis]
    B --> C[DSPy routing]
    C --> D1[Agent execution delegated]
    C --> D2[Agent execution sequential]
    C --> D3[Agent execution parallel]
    D1 --> E[Quality assessment]
    D2 --> E
    D3 --> E
    E --> F[Final output]
```
```mermaid
sequenceDiagram
    participant U as User
    participant CLI as CLI (agentic-fleet)
    participant WF as Supervisor Workflow
    participant AG as Agents/Tools
    U->>CLI: agentic-fleet run -m "..."
    CLI->>WF: run_stream(message)
    WF->>AG: execute phases + tool calls
    loop Stream events
        WF-->>CLI: StreamEvent
        CLI-->>U: Render output/progress
    end
    WF-->>CLI: final output
    CLI-->>U: exit 0
```
Entry point: `cli/console.py` provides the Typer CLI used to start workflows.
All workflow orchestration uses agent-framework primitives:
| Component | Agent-Framework Usage | Location |
|---|---|---|
| Workflow Graph | `WorkflowBuilder().set_start_executor().add_edge()` | `workflows/builder.py:79-87` |
| Executors | All extend `agent_framework.Executor` with `@handler` | `workflows/executors/` |
| Workflow Runtime | `workflow_builder.build().as_agent()` → `WorkflowAgent` | `workflows/supervisor.py:275-276` |
| Execution | `workflow_agent.run()` / `run_stream()` | `workflows/supervisor.py:66, 116` |
| Chat Agents | `ChatAgent` with `OpenAIResponsesClient` | `agents/coordinator.py:96-115` |
| Events | `MagenticAgentMessageEvent`, `WorkflowOutputEvent` | `workflows/supervisor.py:118-120` |
| Messages | `ChatMessage(role=Role.ASSISTANT, text=...)` | `workflows/strategies.py` |
Code Examples:

```python
# 1. Build workflow graph
workflow_builder = (
    WorkflowBuilder()
    .set_start_executor(analysis_executor)
    .add_edge(analysis_executor, routing_executor)
    # ... more edges
)

# 2. Create runtime agent
workflow = workflow_builder.build()
workflow_agent = workflow.as_agent()

# 3. Execute workflow
result = await workflow_agent.run(task_msg)

async for event in workflow_agent.run_stream(task_msg):
    if isinstance(event, MagenticAgentMessageEvent):
        yield event
```
- `src/agentic_fleet/workflows/` - Orchestration logic
  - `supervisor.py` - Main entry point and workflow runtime
  - `builder.py` - WorkflowBuilder configuration
  - `executors/` - Phase executors (Analysis, Routing, Execution, Progress, Quality)
  - `strategies.py` - Execution strategies (delegated, sequential, parallel)
  - `handoff.py` - Handoff logic
  - `context.py` - `SupervisorContext` definition
  - `models.py` - Shared data models
  - `helpers/` - Execution helpers and thread utilities
  - `initialization.py` - Bootstraps supervisor contexts and caches
  - `exceptions.py` - Custom exceptions
- `src/agentic_fleet/dspy_modules/` - DSPy integration
  - `reasoner.py` - `DSPyReasoner` orchestrating analysis, routing, progress, and quality
  - `signatures.py` - Core DSPy signatures: `TaskAnalysis`, `TaskRouting`, `QualityAssessment`, `ProgressEvaluation`
  - `typed_models.py` - Pydantic output models for typed signatures
  - `assertions.py` - DSPy assertions/constraints
  - `gepa/` - GEPA optimization modules
  - `nlu.py` + `nlu_signatures.py` - NLU endpoints and signatures
- `src/agentic_fleet/agents/` - Agent layer
  - `coordinator.py` - `AgentFactory` and agent creation
  - `prompts.py` - Prompt templates
  - `helpers.py` - Agent helper utilities
- `src/agentic_fleet/cli/` - Command-line interface (modular structure)
  - `cli/console.py` - Minimal Typer app (~61 lines) that registers commands
  - `runner.py` - `WorkflowRunner` for executing workflows
  - `display.py` - Rich console display utilities
  - `utils.py` - CLI helper functions (tracing, resource resolution)
  - `commands/` - Individual command modules
    - `run.py` - Run workflow command
    - `handoff.py` - Handoff exploration command
    - `analyze.py` - Task analysis command
    - `benchmark.py` - Performance benchmarking command
    - `agents.py` - List agents command
    - `history.py` - Export history command
    - `optimize.py` - GEPA optimization command
    - `improve.py` - Self-improvement command
    - `evaluate.py` - Batch evaluation command
- `src/agentic_fleet/utils/` - Utilities (organized into subpackages)
  - `cfg/` - Configuration utilities
    - `config_loader.py` - YAML/environment configuration loading
    - `config_schema.py` - Pydantic configuration validation
  - `infra/` - Infrastructure concerns
    - `tracing.py` - OpenTelemetry tracing integration
    - `resilience.py` - Circuit breakers, retries, fault tolerance
    - `telemetry.py` - Metrics and telemetry collection
    - `logging.py` - Structured logging setup
  - `storage/` - Data persistence
    - `cosmos.py` - Azure Cosmos DB integration
    - `persistence.py` - Local file persistence
    - `history_manager.py` - Execution history management
  - Root utilities:
    - `compiler.py` - DSPy compilation with caching
    - `dspy_manager.py` - DSPy settings and LM management
    - `gepa_optimizer.py` - GEPA optimization utilities
    - `models.py` - Data models and type definitions
    - `self_improvement.py` - Self-improvement engine
    - `tool_registry.py` - Tool metadata registry
    - `cache.py` - TTL cache utilities
    - `constants.py` - Centralized constants and defaults
    - `async_compiler.py` - Async compilation utilities
- `src/tools/` - Tool implementations
  - `tavily_tool.py` - Tavily web search tool
  - `tavily_mcp_tool.py` - Tavily MCP tool adapter
  - `browser_tool.py` - Browser automation tool
  - `hosted_code_adapter.py` - Hosted code interpreter adapter
- `src/evaluation/` - Evaluation framework
  - `evaluator.py` - Batch evaluation engine
  - `metrics.py` - Evaluation metrics computation
- Delegated: A single agent handles the entire task end-to-end.
- Sequential: The task flows through agents in order; the output of one becomes the input of the next.
- Parallel: Multiple agents work simultaneously on subtasks, and the results are synthesized.
- Group Chat: Agents participate in a multi-turn group chat orchestrated by `DSPyGroupChatManager`. The `DSPyReasoner` dynamically selects the next speaker based on conversation history.
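As a rough sketch (with toy agent callables, not the real strategies in `workflows/strategies.py`), the three single-shot modes differ only in how agent calls are composed:

```python
import asyncio
from typing import Awaitable, Callable

# An "agent" here is just an async function from task text to result text.
Agent = Callable[[str], Awaitable[str]]


async def delegated(agents: list[Agent], task: str) -> str:
    # One agent handles the whole task.
    return await agents[0](task)


async def sequential(agents: list[Agent], task: str) -> str:
    # Output of each agent becomes the next agent's input.
    out = task
    for agent in agents:
        out = await agent(out)
    return out


async def parallel(agents: list[Agent], task: str) -> str:
    # All agents run concurrently; results are synthesized (here: joined).
    results = await asyncio.gather(*(agent(task) for agent in agents))
    return " | ".join(results)


# Toy agents for demonstration.
async def upper(text: str) -> str:
    return text.upper()


async def exclaim(text: str) -> str:
    return text + "!"
```

For example, `sequential([upper, exclaim], "hi")` chains the calls, while `parallel` fans the same input out to both agents.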
The framework uses DSPy for:
- Task Analysis: Understanding task complexity and requirements
- Task Routing: Intelligent agent selection and mode selection
- Quality Assessment: Evaluating output quality
- Progress Evaluation: Monitoring execution progress
DSPy modules are compiled using `BootstrapFewShot` optimization with training examples from `data/supervisor_examples.json`.
Tools are registered in the `ToolRegistry` and made available to DSPy modules for:
- Tool-aware routing decisions with concise tool descriptions and latency hints (`low|medium|high`)
- Pre-analysis tool usage (e.g., web search for context) with TTL-cached results to reduce repeated network calls
- Tool requirement identification and a compact, ReAct-style tool plan emitted by the enhanced routing signature

The `EnhancedTaskRouting` signature outputs:
- `tool_plan`: ordered list of tools to use
- `tool_goals`: short justification/goals for tool use
- `latency_budget`: `low|medium|high` guidance

Additional notes:
- The reasoner helper `decide_tools(task, team, current_context)` provides a compact tool plan to execution.
- Per-phase timings (analysis, routing, progress) are recorded in `phase_timings`; warnings are logged when they exceed `slow_execution_threshold`.
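The per-phase timing bookkeeping can be sketched with a small context manager. This is a hypothetical helper with an illustrative threshold value, not the workflow's actual code:

```python
import time
from contextlib import contextmanager

phase_timings: dict[str, float] = {}
slow_phases: list[str] = []
SLOW_EXECUTION_THRESHOLD = 0.05  # seconds; illustrative value


@contextmanager
def timed_phase(name: str):
    """Record the wall-clock duration of a phase and flag slow ones."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        phase_timings[name] = elapsed
        if elapsed > SLOW_EXECUTION_THRESHOLD:
            slow_phases.append(name)  # real code would log a warning here


with timed_phase("analysis"):
    pass  # fast phase, stays under the threshold

with timed_phase("routing"):
    time.sleep(0.06)  # deliberately slower than the threshold
```

Wrapping each phase this way keeps the timing concern out of the phase bodies themselves.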
Configuration is loaded from `src/agentic_fleet/config/workflow_config.yaml` and validated using Pydantic schemas. Environment variables override YAML settings.
Execution history is saved in JSONL format (preferred) or JSON format (legacy). The history manager supports:
- Automatic rotation (keep last N entries)
- Statistics generation
- Format conversion
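The JSONL append-plus-rotation behavior can be sketched like this (a simplified stand-in for `history_manager.py`, with a hypothetical `keep_last` parameter):

```python
import json
import tempfile
from pathlib import Path


def append_history(path: Path, entry: dict, keep_last: int = 100) -> None:
    """Append one JSON object per line, then rotate to the newest N entries."""
    with path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(entry) + "\n")
    lines = path.read_text(encoding="utf-8").splitlines()
    if len(lines) > keep_last:
        path.write_text("\n".join(lines[-keep_last:]) + "\n", encoding="utf-8")


history = Path(tempfile.mkdtemp()) / "history.jsonl"
for i in range(7):
    append_history(history, {"run": i, "quality": 0.9}, keep_last=5)

entries = [json.loads(line) for line in history.read_text().splitlines()]
```

One object per line is what makes rotation and statistics cheap: the file can be truncated or scanned line by line without parsing a single large JSON document.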
DSPy compilation results are cached with:
- Version-based invalidation
- File modification time checking
- Metadata tracking
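A minimal sketch of the invalidation idea (hypothetical helper; the real logic lives in `compiler.py`): the cache key folds in a version string and the modification times of tracked source files, so bumping the version or editing a tracked file yields a new key:

```python
import hashlib
import os
import tempfile


def cache_key(version: str, tracked_files: list[str]) -> str:
    """Derive a key that changes when the version or any file mtime changes."""
    h = hashlib.sha256(version.encode())
    for path in sorted(tracked_files):  # sorted for a stable key
        h.update(path.encode())
        h.update(str(os.path.getmtime(path)).encode())
    return h.hexdigest()


# Demonstration with a throwaway file standing in for training data.
with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as f:
    f.write(b"{}")
    examples = f.name

key_v1 = cache_key("0.6.6", [examples])
key_v2 = cache_key("0.6.7", [examples])   # version bump -> new key
os.utime(examples, (0, 0))                # touching the file -> new key
key_touched = cache_key("0.6.6", [examples])
```

Compiled artifacts stored under such a key are simply never found again once their inputs change, which is the whole invalidation mechanism.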
The codebase has been refactored to improve maintainability and reduce complexity:
- Fleet Workflow Architecture: Workflow implemented via the agent-framework `WorkflowBuilder` and executors:
  - `workflows/` flattened structure (executors, strategies, logic in a single level)
  - `workflows/supervisor.py` is the main entry point
  - Execution strategies in `workflows/strategies.py`
  - Executors in `workflows/executors/`
  - Shared typed models in `workflows/models.py`
  - Shared utilities in `workflows/utils.py`
- CLI Modularization: Commands separated into individual modules in `cli/commands/`:
  - Each command is self-contained with its own Typer app
  - `cli/console.py` reduced to ~61 lines, focusing on registration
  - Better separation of concerns and easier testing
- Benefits:
  - Reduced code duplication through shared execution strategies
  - Improved testability with focused modules
  - Better maintainability with clear separation of concerns
  - Easier to extend with new execution modes or quality metrics
Custom exception hierarchy:
- `WorkflowError` - Base exception
- `AgentExecutionError` - Agent failures
- `RoutingError` - Routing failures
- `ConfigurationError` - Config validation failures
- `HistoryError` - History operation failures
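The hierarchy maps directly onto Python classes; a sketch consistent with the names above (docstrings and the `classify` helper are illustrative):

```python
class WorkflowError(Exception):
    """Base exception for all workflow failures."""


class AgentExecutionError(WorkflowError):
    """An agent failed while executing its assigned task."""


class RoutingError(WorkflowError):
    """The reasoner could not produce a valid routing decision."""


class ConfigurationError(WorkflowError):
    """Configuration failed validation."""


class HistoryError(WorkflowError):
    """A history read/write operation failed."""


def classify(exc: Exception) -> str:
    # Callers can catch WorkflowError to handle any fleet failure uniformly.
    return "workflow" if isinstance(exc, WorkflowError) else "other"
```

Deriving everything from one base lets call sites choose between a broad `except WorkflowError` and targeted handling of, say, `RoutingError` alone.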
Typical slow phases and tuning guidance:
- DSPy compilation on first run
  - Use the cached compiled reasoner on subsequent runs; clear via `scripts/manage_cache.py`
  - Reduce GEPA effort in `src/agentic_fleet/config/workflow_config.yaml`, e.g. `gepa_max_metric_calls`, `max_bootstrapped_demos`
  - Temporarily set `DSPY_COMPILE=false` for rapid iteration
- External tool calls and network latency
  - Prefer a lighter Reasoner model, e.g. `dspy.model: gpt-5-mini`
  - Disable pre-analysis tool usage for simple tasks
- Parallel fan-out synthesis
  - Cap `execution.max_parallel_agents` to a small number
  - Enable streaming to surface progress early
- History and tracing I/O
  - Reduce logging verbosity in production
  - Batch writes or use buffered logging handlers
Measure and diagnose latency using history analytics:

```shell
uv run python -m agentic_fleet.scripts.analyze_history --timing
```

Focus improvements on compilation time, external API latency, and minimizing unnecessary refinement rounds.