-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconductor_server.py
More file actions
73 lines (62 loc) · 3.38 KB
/
conductor_server.py
File metadata and controls
73 lines (62 loc) · 3.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
"""
MCP Server exposing the Conductor Agent functionality.
"""
import asyncio
from agents import Agent, Runner
from mcp.server.fastmcp import FastMCP
# Import the sub-agents used by the Conductor
from search_agent import search_agent
from classify_agent import classify_agent
from compose_agent import compose_agent
# Define the Conductor Agent (originally test_agent in main.py)
# This agent uses the other agents as tools to perform the orchestration.
conductor_agent = Agent(
name="Conductor",
instructions="You are the mighty orchestrator. Your workflow is as follows: "
"1. Use the search tool to find the list of events in the city provided by the user input. "
"2. For each event found, use the classifier tool to determine the most relevant group of people interested in it. "
"3. For each event and its corresponding audience group, use the compose tool to generate a push notification. "
"4. Format the final output for EACH processed event as follows: "
" Line 1: The event name"
" Line 2: The determined audience group name"
" Line 3: The generated push notification text"
"Separate the entries for each event with a single empty line.",
tools=[
search_agent.as_tool(tool_name="search", tool_description="Searches for events in a given city and returns a list of event details."),
classify_agent.as_tool(tool_name="classifier", tool_description="Takes event details (and potentially user PII from the prompt context) and determines the target audience groups."),
compose_agent.as_tool(tool_name="compose", tool_description="Generates a push notification text for a given event name and target audience group.")
],
model="gpt-4.1", # Ensure consistency or use appropriate model
)
# Initialize the MCP Server
mcp = FastMCP("Conductor Event Pipeline Server")
@mcp.tool(description="Runs a complete pipeline to find events in a specified city, "
"classify their target audience, and generate tailored push notifications. "
"Use this when a user wants to discover events and get notifications ready "
"for different groups.")
async def run_event_pipeline(city: str) -> str:
"""
Runs the full event processing pipeline for a given city.
1. Searches for events in the city.
2. Classifies the audience for each event.
3. Composes a push notification for each event and audience pair.
Args:
city: The name of the city to search for events in.
Returns:
A string containing the formatted results (event name, group, notification)
for each event, separated by empty lines.
"""
print(f"Received request to run event pipeline for city: {city}") # Server-side log
try:
# Use the Runner to execute the conductor_agent with the city name as input
# The agent's instructions will guide the LLM to use the configured tools.
result = await Runner.run(conductor_agent, city)
print(f"Pipeline result: {result.final_output}") # Server-side log
return result.final_output
except Exception as e:
print(f"Error running event pipeline for city {city}: {e}")
# Depending on requirements, might return an error message or raise exception
return f"An error occurred while processing events for {city}: {str(e)}"
if __name__ == "__main__":
# Run the MCP server (likely via stdio by default with mcp-sdk)
mcp.run()