Skip to content

Commit a1a80de

Browse files
Merge pull request #67 from mainframecomputer/bugfix/MFE-524-fix-conduct-tool-and-json-handling
Bugfix/mfe 524 fix conduct tool and json handling
2 parents fe329ac + 24e62f8 commit a1a80de

6 files changed

Lines changed: 280 additions & 225 deletions

File tree

packages/python/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "mainframe-orchestra"
3-
version = "0.0.28"
3+
version = "0.0.29"
44
description = "Mainframe-Orchestra is a lightweight, open-source agentic framework for building LLM based pipelines and self-orchestrating multi-agent teams"
55
authors = [
66
"Mainframe Computer Inc. <hi@mainfra.me>",

packages/python/src/mainframe_orchestra/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"""
44
# Copyright 2024 Mainframe-Orchestra Contributors. Licensed under Apache License 2.0.
55

6-
__version__ = "0.0.28"
6+
__version__ = "0.0.29"
77

88
import importlib
99

packages/python/src/mainframe_orchestra/llm.py

Lines changed: 44 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
)
5757

5858
from .utils.braintrust_utils import wrap_openai
59+
from .utils.parse_json_response import parse_json_response
5960

6061
# Import the configured logger
6162
from .utils.logging_config import logger
@@ -125,53 +126,16 @@ def set_verbosity(value: Union[str, bool, int]):
125126
logger.setLevel(logging.WARNING)
126127

127128

128-
def parse_json_response(response: str) -> dict:
129-
"""
130-
Parse a JSON response, handling potential formatting issues.
131-
132-
Args:
133-
response (str): The JSON response string to parse.
134-
135-
Returns:
136-
dict: The parsed JSON data.
137-
138-
Raises:
139-
ValueError: If the JSON cannot be parsed after multiple attempts.
140-
"""
141-
# First attempt: Try to parse the entire response
142-
try:
143-
return json.loads(response)
144-
except json.JSONDecodeError:
145-
# Second attempt: Find the first complete JSON object
146-
json_pattern = r"(\{(?:[^{}]|(?:\{[^{}]*\}))*\})"
147-
json_matches = re.finditer(json_pattern, response, re.DOTALL)
148-
149-
for match in json_matches:
150-
try:
151-
result = json.loads(match.group(1))
152-
# Validate it's a dict and has expected structure
153-
if isinstance(result, dict):
154-
return result
155-
except json.JSONDecodeError:
156-
continue
157-
158-
# Third attempt: Try to cleave strings before and after JSON
159-
cleaved_json = response.strip().lstrip("`").rstrip("`")
160-
try:
161-
return json.loads(cleaved_json)
162-
except json.JSONDecodeError as e:
163-
logger.warning(f"All JSON parsing attempts failed: {e}")
164-
raise ValueError(f"Invalid JSON structure: {e}")
165-
166-
167129
class OpenAICompatibleProvider:
168130
"""
169131
Base class for handling OpenAI-compatible API providers.
170132
This handles providers that use the OpenAI API format but with different base URLs.
171133
"""
172134

173135
@staticmethod
174-
async def _prepare_image_data(image_data: Union[str, List[str]], provider_name: str) -> Union[str, List[str]]:
136+
async def _prepare_image_data(
137+
image_data: Union[str, List[str]], provider_name: str
138+
) -> Union[str, List[str]]:
175139
"""Prepare image data according to provider requirements"""
176140
if not image_data:
177141
return image_data
@@ -184,7 +148,7 @@ async def _prepare_image_data(image_data: Union[str, List[str]], provider_name:
184148
# Download and convert URL to base64
185149
response = requests.get(img)
186150
response.raise_for_status()
187-
base64_data = base64.b64encode(response.content).decode('utf-8')
151+
base64_data = base64.b64encode(response.content).decode("utf-8")
188152

189153
if provider_name in ["OpenAI", "Gemini"]:
190154
# These providers need data URL format
@@ -220,7 +184,9 @@ async def send_request(
220184
try:
221185
# Process image data if present
222186
if image_data:
223-
image_data = await OpenAICompatibleProvider._prepare_image_data(image_data, provider_name)
187+
image_data = await OpenAICompatibleProvider._prepare_image_data(
188+
image_data, provider_name
189+
)
224190

225191
spinner = Halo(text=f"Sending request to {provider_name}...", spinner="dots")
226192
spinner.start()
@@ -563,6 +529,17 @@ async def send_anthropic_request(
563529
{"role": "user", "content": f"Function result: {content}"}
564530
)
565531

532+
# If JSON output is required, add instruction to the system message
533+
if require_json_output:
534+
json_instruction = "Do not comment before or after the JSON, or provide backticks or language declarations, return only the JSON object."
535+
536+
# If we have a system message, append the instruction
537+
if system_message is not None:
538+
system_message += f"\n\n{json_instruction}"
539+
else:
540+
# If no system message exists, create one
541+
system_message = json_instruction
542+
566543
# Handle image data if present
567544
if image_data:
568545
if isinstance(image_data, str):
@@ -591,28 +568,32 @@ async def send_anthropic_request(
591568
try:
592569
response = requests.get(img)
593570
response.raise_for_status()
594-
image_base64 = base64.b64encode(response.content).decode('utf-8')
595-
last_msg["content"].append({
596-
"type": "image",
597-
"source": {
598-
"type": "base64",
599-
"media_type": "image/jpeg",
600-
"data": image_base64
571+
image_base64 = base64.b64encode(response.content).decode("utf-8")
572+
last_msg["content"].append(
573+
{
574+
"type": "image",
575+
"source": {
576+
"type": "base64",
577+
"media_type": "image/jpeg",
578+
"data": image_base64,
579+
},
601580
}
602-
})
581+
)
603582
except Exception as e:
604583
logger.error(f"Failed to process image URL: {str(e)}")
605584
raise
606585
else:
607586
# For base64 data, use it directly
608-
last_msg["content"].append({
609-
"type": "image",
610-
"source": {
611-
"type": "base64",
612-
"media_type": "image/jpeg",
613-
"data": img
587+
last_msg["content"].append(
588+
{
589+
"type": "image",
590+
"source": {
591+
"type": "base64",
592+
"media_type": "image/jpeg",
593+
"data": img,
594+
},
614595
}
615-
})
596+
)
616597

617598
# Log request details
618599
logger.debug(
@@ -1656,16 +1637,16 @@ def _clean_response_tags(text: str) -> str:
16561637

16571638
# Remove common tag patterns that appear in HuggingFace model responses
16581639
# This handles tags like <||, <|assistant|>, etc.
1659-
cleaned = re.sub(r'<\|[^>]*\|>', '', text)
1640+
cleaned = re.sub(r"<\|[^>]*\|>", "", text)
16601641

16611642
# Handle incomplete tags at the beginning or end
1662-
cleaned = re.sub(r'^<\|.*?(?=\w)', '', cleaned) # Beginning of text
1663-
cleaned = re.sub(r'(?<=\w).*?\|>$', '', cleaned) # End of text
1643+
cleaned = re.sub(r"^<\|.*?(?=\w)", "", cleaned) # Beginning of text
1644+
cleaned = re.sub(r"(?<=\w).*?\|>$", "", cleaned) # End of text
16641645

16651646
# Handle other special cases
1666-
cleaned = re.sub(r'<\|\|', '', cleaned)
1667-
cleaned = re.sub(r'<\|', '', cleaned)
1668-
cleaned = re.sub(r'\|>', '', cleaned)
1647+
cleaned = re.sub(r"<\|\|", "", cleaned)
1648+
cleaned = re.sub(r"<\|", "", cleaned)
1649+
cleaned = re.sub(r"\|>", "", cleaned)
16691650

16701651
return cleaned.strip()
16711652

packages/python/src/mainframe_orchestra/orchestration.py

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from pydantic import BaseModel
99
from .task import Task
1010
from .agent import Agent
11-
11+
from .utils.logging_config import logger
1212

1313
class TaskInstruction(BaseModel):
1414
task_id: str
@@ -30,15 +30,15 @@ def create_conduct_tool(agents: List[Any], tool_summaries: bool) -> Callable:
3030
}
3131

3232
# Format available agents string with their tools
33-
available_agents = "\n ".join(
33+
available_agents = "No agents have been installed yet. Notify the user to install or add some agents first." if not agents else "\n ".join(
3434
f"- {agent_id}\n ({agent_id}'s tools: {', '.join(agent_tools[agent_id] or ['No tools'])})"
3535
for agent_id in sorted(agent_map.keys())
3636
)
3737

3838
async def conduct_tool(
3939
tasks: List, event_queue: Optional[Queue] = None, **kwargs
4040
) -> Any:
41-
print(f"[DELEGATION] Starting conduct delegation with {len(tasks)} tasks")
41+
logger.debug(f"Starting conduct delegation with {len(tasks)} tasks")
4242

4343
# Add max iteration limits
4444
MAX_AGENT_ITERATIONS = 3 # Maximum times an agent can attempt to complete a task
@@ -77,22 +77,26 @@ async def conduct_tool(
7777
# Convert dict to TaskInstruction model
7878
task = TaskInstruction.model_validate(instruction_item)
7979

80+
# Add progress logging
81+
current_task_index = tasks.index(instruction_item) + 1
82+
logger.info(f"Processing task {current_task_index} of {len(tasks)}: '{task.task_id}' with agent '{task.agent_id}'")
83+
8084
target_agent = agent_map.get(task.agent_id)
81-
print(
82-
f"[DELEGATION] Processing task '{task.task_id}' with agent '{task.agent_id}'"
85+
logger.debug(
86+
f"Processing task '{task.task_id}' with agent '{task.agent_id}'"
8387
)
8488

8589
if not target_agent:
86-
print(
87-
f"[DELEGATION] Warning: Agent {task.agent_id} not found. Available agents: {list(agent_map.keys())}"
90+
logger.warning(
91+
f"Warning: Agent {task.agent_id} not found. Available agents: {list(agent_map.keys())}"
8892
)
8993
continue
9094

9195
# Track agent iterations
9296
agent_call_counts[task.agent_id] = agent_call_counts.get(task.agent_id, 0) + 1
9397
if agent_call_counts[task.agent_id] > MAX_AGENT_ITERATIONS:
94-
print(
95-
f"[DELEGATION] Warning: Agent {task.agent_id} exceeded maximum iterations"
98+
logger.warning(
99+
f"Warning: Agent {task.agent_id} exceeded maximum iterations"
96100
)
97101
continue
98102

@@ -108,7 +112,7 @@ async def conduct_tool(
108112
}
109113
]
110114

111-
print(f"\n[DELEGATION] Starting task for agent: {task.agent_id}")
115+
logger.debug(f"\nStarting task for agent: {task.agent_id}")
112116
instruction_text = task.instruction + (
113117
"\n\nUse the following information from previous tasks:\n\n"
114118
+ "\n\n".join(
@@ -121,20 +125,19 @@ async def conduct_tool(
121125
)
122126

123127
async def nested_callback(result):
124-
if isinstance(result, dict) and result.get("tool"):
128+
if isinstance(result, dict) and (result.get("tool") or result.get("type") == "delegation_result"):
125129
current_time = datetime.now().isoformat()
126130

127131
# Ensure any existing timestamp is serializable
128132
if "timestamp" in result and isinstance(result["timestamp"], datetime):
129133
result["timestamp"] = result["timestamp"].isoformat()
130134

131135
# Standardize message format for all delegation-related events
132-
if result.get("type") in ["delegation_result", "final_response"]:
136+
if result.get("type") == "delegation_result":
133137
message = {
134138
"type": "delegation_result",
135-
"role": "delegation",
136-
"name": target_agent.agent_id,
137139
"content": result.get("content", ""),
140+
"agent_id": target_agent.agent_id,
138141
"conducted_task_id": task.task_id,
139142
"timestamp": current_time,
140143
}
@@ -177,13 +180,12 @@ async def nested_callback(result):
177180
msg_signature += (
178181
f":{result.get('tool')}:{json.dumps(result.get('params', {}))}"
179182
)
180-
# print(f"[DELEGATION DEBUG] Tool call: {result.get('tool')}")
183+
181184
elif result.get("type") == "tool_result":
182185
msg_signature += f":{result.get('tool')}"
183-
# print(f"[DELEGATION DEBUG] Tool result received")
186+
184187
elif result.get("type") == "delegation_result":
185188
msg_signature += f":delegation:{result.get('conducted_task_id')}"
186-
# print(f"[DELEGATION DEBUG] Conductor result received for task: {result.get('conducted_task_id')}")
187189

188190
# Send to event queue if available
189191
if event_queue:
@@ -199,6 +201,15 @@ async def nested_callback(result):
199201
tool_summaries=tool_summaries,
200202
)
201203

204+
# Generate a delegation result event after task completion
205+
delegation_result = {
206+
"type": "delegation_result",
207+
"content": task_result,
208+
"agent_id": target_agent.agent_id,
209+
"conducted_task_id": task.task_id
210+
}
211+
await nested_callback(delegation_result)
212+
202213
# Include context in the result
203214
context = "\n\n".join(
204215
f"Results from task '{dep_id}':\n{all_results[dep_id]}"
@@ -209,13 +220,16 @@ async def nested_callback(result):
209220
f"{context}\n\n{task_result}" if context else task_result
210221
)
211222

212-
# Return the final combined results
213-
return "\n\n".join(
214-
f"Task '{task_id}':\n"
215-
f"Instruction: {next((item['instruction'] for item in tasks if item['task_id'] == task_id), '')}\n"
216-
f"Result: {result}"
223+
# Return results as JSON structure
224+
return json.dumps([
225+
{
226+
"task_id": task_id,
227+
"agent": next((item['agent_id'] for item in tasks if item['task_id'] == task_id), ''),
228+
"instruction": next((item['instruction'] for item in tasks if item['task_id'] == task_id), ''),
229+
"result": result
230+
}
217231
for task_id, result in all_results.items()
218-
)
232+
])
219233

220234
conduct_tool.__name__ = "conduct_tool"
221235
conduct_tool.__doc__ = f"""Tool function to orchestrate multiple agents in a single, coordinated multi-agent flow. Tasks should be submitted in a single list, and they will be executed in the order they are submitted. Do not make separate calls to the tool.
@@ -275,7 +289,6 @@ def create_composition_tool(agents: List[Agent]) -> Callable:
275289
async def composition_tool(
276290
goal: str, event_queue: Optional[Queue] = None, **kwargs
277291
) -> Any:
278-
# KEEP: Create composer agent instance with all these fields
279292
composer_agent = Agent(
280293
agent_id="composer",
281294
role="Composer",
@@ -287,7 +300,7 @@ async def composition_tool(
287300
llm=next(iter(agents)).llm,
288301
)
289302

290-
# KEEP: Initialize messages array with BOTH system and user messages
303+
# Initialize messages array with both system and user messages
291304
messages = [
292305
{
293306
"role": "system",
@@ -312,7 +325,6 @@ async def composition_tool(
312325
]
313326

314327
try:
315-
# KEEP: All these parameters to Task.create()
316328
task_result = await Task.create(
317329
agent=composer_agent,
318330
instruction=f"Create a detailed plan for achieving this goal: {goal}",
@@ -322,7 +334,7 @@ async def composition_tool(
322334
)
323335
return task_result
324336
except Exception as e:
325-
print(f"[COMPOSITION ERROR] Failed to create task: {str(e)}")
337+
logger.error(f"[COMPOSITION ERROR] Failed to create task: {str(e)}")
326338
raise
327339

328340
composition_tool.__name__ = "composition_flow"

0 commit comments

Comments
 (0)