-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathagents_simulation.py
More file actions
211 lines (179 loc) · 8.24 KB
/
agents_simulation.py
File metadata and controls
211 lines (179 loc) · 8.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
from dag import DAGEngine, ConditionalSubgraph, DAGEngineFactory
from hooks import paragraph_count_hook
# =======================
# EXECUTORS & FACTORY
# =======================
class Executor:
    """Base interface for objects that compute the output of one DAG node.

    Subclasses override :meth:`execute`. The base implementation only signals
    that no behaviour has been supplied.
    """

    def execute(self, node_id, node_info, dep_outputs):
        """Produce the output for a single node.

        Args:
            node_id: Identifier of the node being executed.
            node_info: The node's definition (subclasses in this file read it
                with ``.get``, so a mapping is expected).
            dep_outputs: Outputs made available from the node's dependencies.

        Raises:
            NotImplementedError: Always; the message names the concrete class
                so a missing override is easy to trace.
        """
        raise NotImplementedError(
            f"{type(self).__name__} must implement execute()"
        )
class LLMExecutor(Executor):
    """Executor that delegates a node to an agent callable.

    The callable is invoked as
    ``run_agent(node_id, system_prompt, user_prompts, dep_outputs)``.
    """

    def __init__(self, run_agent):
        # Kept as an attribute so the callable can be swapped per instance.
        self.run_agent = run_agent

    def execute(self, node_id, node_info, dep_outputs):
        """Run the agent for ``node_id`` and return whatever it returns.

        Missing prompt keys fall back to an empty string / empty list.
        """
        system_prompt = node_info.get("system_prompt", "")
        user_prompts = node_info.get("user_prompts", [])
        return self.run_agent(node_id, system_prompt, user_prompts, dep_outputs)
class ExternalContextExecutor(Executor):
    """Executor that hands back a fixed value supplied at construction time."""

    def __init__(self, value):
        self.value = value

    def execute(self, node_id, node_info, dep_outputs):
        """Return the stored value; all three arguments are ignored."""
        stored = self.value
        return stored
class ExecutorFactory:
    """Chooses an executor for a node by inspecting the keys of its definition."""

    def __init__(self, run_agent):
        # Agent callable handed to every LLMExecutor this factory creates.
        self.run_agent = run_agent

    def create(self, node_info):
        """Return an executor instance based on node properties.

        Args:
            node_info: Node definition; only key membership (and the
                ``"value"`` entry, when used) is inspected.

        Returns:
            An ``LLMExecutor`` when ``"system_prompt"`` is present, otherwise
            an ``ExternalContextExecutor`` when ``"value"`` is present.

        Raises:
            ValueError: If neither key is present.
        """
        # "system_prompt" is checked first, so it wins if both keys exist.
        if "system_prompt" in node_info:
            return LLMExecutor(self.run_agent)
        if "value" in node_info:
            return ExternalContextExecutor(node_info["value"])
        raise ValueError(f"No executor found for node: {node_info}")
# =======================
# MOCK AGENT LOGIC
# =======================
# Module-level scenario switch; the WorkExperienceParagraphs mock reads it at
# call time to decide between one and two paragraphs.
current_scenario = "default"


def _mock_filter_applicant_facts(deps):
    """Keep the '. '-separated facts that mention 'product' or 'customer'."""
    kept = []
    for sentence in deps.get("applicant_facts", "").split(". "):
        lowered = sentence.lower()
        if "product" in lowered or "customer" in lowered:
            kept.append(sentence)
    return f"Filtered facts: {', '.join(kept)}"


def _mock_company_profile(deps):
    """Canned company profile echoing the start of the job description."""
    description = deps.get("job_description", "")
    return f"Company profile: SaaS products for enterprise customers. (Based on: {description[:50]}...)"


def _mock_matching_applicant_facts(deps):
    """Canned match of filtered facts against the company profile."""
    facts = deps.get("FilterApplicantFacts", "")
    profile = deps.get("CompanyProfile", "")
    return f"Matched applicant facts with company profile. (Used: {facts[:30]}... + {profile[:30]}...)"


def _mock_mission_paragraph(deps):
    """Canned mission/product/customer paragraph."""
    profile = deps.get("CompanyProfile", "")
    matched = deps.get("MatchingApplicantFacts", "")
    return f"Applicant's product experience supports the company's mission. (From: {profile[:25]}... + {matched[:25]}...)"


def _mock_job_qualifications(deps):
    """Canned qualification list echoing the start of the job description."""
    description = deps.get("job_description", "")
    return f"Qualifications: Python, Leadership, Distributed Systems. (Extracted from: {description[:40]}...)"


def _mock_matched_work_experience(deps):
    """Canned mapping of work experience onto qualifications."""
    experience = deps.get("work_experience", "")
    quals = deps.get("JobQualifications", "")
    return f"Work matches: Senior Engineer → Leadership, Software Engineer → Python. (Matched: {experience[:25]}... with {quals[:25]}...)"


def _mock_work_experience_paragraphs(deps):
    """One paragraph in the 'single_paragraph' scenario, two otherwise."""
    matched = deps.get("QualificationMatchedApplicantWorkExperience", "")
    if current_scenario != "single_paragraph":
        # The blank line between the two sentences separates the paragraphs.
        return (
            f"Senior Engineer: leadership in distributed systems.\n\n"
            f"Software Engineer: built scalable Python solutions. (Generated from: {matched[:40]}...)"
        )
    return f"Senior Engineer: leadership in distributed systems. (Based on: {matched[:30]}...)"


def _mock_matched_project_experience(deps):
    """Canned mapping of project experience onto qualifications."""
    projects = deps.get("project_experience", "")
    quals = deps.get("JobQualifications", "")
    return f"Projects: Alpha → ML, Beta → Cloud Infra. (Matched: {projects[:30]}... with {quals[:25]}...)"


def _mock_project_experience_paragraphs(deps):
    """Canned project paragraph."""
    matched = deps.get("QualificationMatchedApplicantProjectExperience", "")
    return f"Project Alpha: implemented ML pipeline. (Based on: {matched[:40]}...)"


# agent_id -> handler producing that agent's canned response.
_MOCK_AGENT_HANDLERS = {
    "FilterApplicantFacts": _mock_filter_applicant_facts,
    "CompanyProfile": _mock_company_profile,
    "MatchingApplicantFacts": _mock_matching_applicant_facts,
    "MissionProductCustomerParagraph": _mock_mission_paragraph,
    "JobQualifications": _mock_job_qualifications,
    "QualificationMatchedApplicantWorkExperience": _mock_matched_work_experience,
    "WorkExperienceParagraphs": _mock_work_experience_paragraphs,
    "QualificationMatchedApplicantProjectExperience": _mock_matched_project_experience,
    "ProjectExperienceParagraphs": _mock_project_experience_paragraphs,
}


def run_agent(agent_id, system_prompt, user_prompts, dep_outputs):
    """Simulated LLM behavior based on agent_id.

    Args:
        agent_id: Name of the agent/node; selects the canned response.
        system_prompt: Accepted for interface compatibility; not used.
        user_prompts: Accepted for interface compatibility; not used.
        dep_outputs: Mapping of dependency name to its output string; only
            consulted for recognised agent ids.

    Returns:
        The canned string for a recognised agent, otherwise
        ``"Generic output from <agent_id>"``.
    """
    handler = _MOCK_AGENT_HANDLERS.get(agent_id)
    if handler is None:
        return f"Generic output from {agent_id}"
    return handler(dep_outputs)
# =======================
# DAG DEFINITION
# =======================
def _work_paragraphs_below_two(outputs, meta):
    """Condition for ProjectExperienceParagraphs.

    True when the ``paragraph_count`` recorded in ``meta`` for
    WorkExperienceParagraphs is less than 2; a missing entry counts as 0.
    ``outputs`` is accepted but not inspected.
    """
    work_meta = meta.get("WorkExperienceParagraphs", {})
    return work_meta.get("paragraph_count", 0) < 2


# Node id -> node definition. Nodes carrying "value" are fixed inputs; nodes
# carrying "system_prompt" are handled by the agent callable.
dag = {
    # ---- fixed input nodes ----
    "job_description": {
        "value": "The company builds SaaS products for retail and healthcare.",
        "dependencies": [],
    },
    "applicant_facts": {
        "value": "Experience in software, product strategy, customer success.",
        "dependencies": [],
    },
    "work_experience": {
        "value": "Senior Engineer and Software Engineer roles.",
        "dependencies": [],
    },
    "project_experience": {
        "value": "Project Alpha ML pipeline, Project Beta cloud infra.",
        "dependencies": [],
    },
    # ---- company / mission branch ----
    "CompanyProfile": {
        "dependencies": ["job_description"],
        "system_prompt": "Summarize the company's profile.",
        "user_prompts": ["What is the company profile?"],
    },
    "FilterApplicantFacts": {
        "dependencies": ["applicant_facts"],
        "system_prompt": "Filter applicant facts for product and customer tags.",
        "user_prompts": ["Filter relevant facts."],
    },
    "MatchingApplicantFacts": {
        "dependencies": ["FilterApplicantFacts", "CompanyProfile"],
        "system_prompt": "Match applicant facts to company profile.",
        "user_prompts": ["Which facts align with the company?"],
    },
    "MissionProductCustomerParagraph": {
        "dependencies": ["CompanyProfile", "MatchingApplicantFacts"],
        "system_prompt": "Write a paragraph aligning applicant with company mission.",
        "user_prompts": ["Write the paragraph."],
    },
    # ---- qualifications / work experience branch ----
    "JobQualifications": {
        "dependencies": ["job_description"],
        "system_prompt": "Extract job qualifications.",
        "user_prompts": ["List qualifications."],
    },
    "QualificationMatchedApplicantWorkExperience": {
        "dependencies": ["work_experience", "JobQualifications"],
        "system_prompt": "Match work experience to job qualifications.",
        "user_prompts": ["Which experiences match qualifications?"],
    },
    "WorkExperienceParagraphs": {
        "dependencies": ["QualificationMatchedApplicantWorkExperience"],
        "system_prompt": "Write paragraphs based on matched work experience.",
        "user_prompts": ["Write the paragraphs."],
        # NOTE(review): presumably this hook records "paragraph_count" in the
        # node's metadata, which the condition above reads — confirm in hooks.py.
        "hooks": [paragraph_count_hook],
    },
    # ---- project experience branch (conditional) ----
    "QualificationMatchedApplicantProjectExperience": {
        "dependencies": ["project_experience", "JobQualifications"],
        "system_prompt": "Match project experience to job qualifications.",
        "user_prompts": ["Which projects match qualifications?"],
    },
    "ProjectExperienceParagraphs": {
        "dependencies": ["QualificationMatchedApplicantProjectExperience"],
        "system_prompt": "Write paragraphs for matched projects.",
        "user_prompts": ["Write project paragraphs."],
        # NOTE(review): how the engine interprets "trigger"/"condition" is
        # defined in the dag module — verify there.
        "conditional": {
            "trigger": "WorkExperienceParagraphs",
            "condition": _work_paragraphs_below_two,
        },
    },
}
# =======================
# SIMULATION
# =======================
def run_simulation(scenario="default"):
    """Run the DAG once under ``scenario`` and print every output.

    The scenario is stored in the module-level ``current_scenario`` (which the
    mock agent reads), an engine is built from ``dag`` and executed, then each
    node's output is printed followed by the final assembled sections.

    Args:
        scenario: ``"single_paragraph"`` selects the single-paragraph banner;
            any other value prints the default banner.
    """
    global current_scenario

    banner = (
        "\n=== Running Simulation (Single Paragraph - Should Trigger Subgraph) ==="
        if scenario == "single_paragraph"
        else "\n=== Running Simulation (Default - 2 Paragraphs) ==="
    )
    print(banner)

    current_scenario = scenario

    engine = DAGEngineFactory.create(dag, ExecutorFactory(run_agent))
    engine.execute()

    # ---- PRINT ALL OUTPUTS ----
    for node_id, output in engine.outputs.items():
        print(f"\n[{node_id}] OUTPUT:\n{output}")

    # ---- FINAL ASSEMBLED OUTPUT ----
    print("\n=== FINAL OUTPUT ===")
    # These two sections are always printed (empty line when absent).
    for section in ("MissionProductCustomerParagraph", "WorkExperienceParagraphs"):
        print(engine.outputs.get(section, ""))
    # The project section is printed only if the engine produced it.
    if "ProjectExperienceParagraphs" in engine.outputs:
        print(engine.outputs["ProjectExperienceParagraphs"])
# =======================
# MAIN ENTRY
# =======================
if __name__ == "__main__":
    # Runs the default scenario. To exercise the other mock branch, call
    # run_simulation("single_paragraph") instead.
    run_simulation()