-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathmain.py
More file actions
311 lines (247 loc) · 9.88 KB
/
main.py
File metadata and controls
311 lines (247 loc) · 9.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
import logging
from dataclasses import dataclass, field
from typing import Optional
from dotenv import load_dotenv
from livekit import api
from livekit.agents import (
Agent,
AgentSession,
ChatContext,
JobContext,
JobProcess,
RoomInputOptions,
RoomOutputOptions,
RunContext,
WorkerOptions,
cli,
metrics,
)
from livekit.agents.job import get_job_context
from livekit.agents.llm import function_tool
from livekit.agents.voice import MetricsCollectedEvent
from livekit.plugins import deepgram, openai, silero
# uncomment to enable Krisp BVC noise cancellation, currently supported on Linux and MacOS
# from livekit.plugins import noise_cancellation
## The storyteller agent is a multi-agent that can handoff the session to another agent.
## This example demonstrates more complex workflows with multiple agents.
## Each agent could have its own instructions, as well as different STT, LLM, TTS,
## or realtime models.
logger = logging.getLogger("multi-agent")
load_dotenv(dotenv_path=".env.local")
common_instructions = (
"You are an editor at a leading publishing house, with a strong track record "
"of discovering and nurturing new talent. You are a great communicator and ask "
"the right questions to get the best out of people. You want the best for your "
"authors, and you are not afraid to tell them when their ideas are not good enough."
)
@dataclass
class CharacterData:
# Shared data that's used by the editor agent.
# This structure is passed as a parameter to function calls.
name: Optional[str] = None
background: Optional[str] = None
@dataclass
class StoryData:
# Shared data that's used by the editor agent.
# This structure is passed as a parameter to function calls.
characters: list[CharacterData] = field(default_factory=list)
locations: list[str] = field(default_factory=list)
theme: Optional[str] = None
class LeadEditorAgent(Agent):
def __init__(self) -> None:
super().__init__(
instructions=f"{common_instructions} You are the lead editor at this business, "
"and are yourself a generalist -- but empoly several specialist editors, "
"specializing in childrens' books and fiction, respectively. You trust your "
"editors to do their jobs, and will hand off the conversation to them when you feel "
"you have an idea of the right one."
"Your goal is to gather a few pieces of information from the user about their next"
"idea for a short story, and then hand off to the right agent."
"Start the conversation with a short introduction, then get straight to the "
"details. You may hand off to either editor as soon as you know which one is the right fit.",
)
async def on_enter(self):
# when the agent is added to the session, it'll generate a reply
# according to its instructions
self.session.generate_reply()
@function_tool
async def character_introduction(
self,
context: RunContext[StoryData],
name: str,
background: str,
):
"""Called when the user has provided a character.
Args:
name: The name of the character
background: The character's history, occupation, and other details
"""
character = CharacterData(name=name, background=background)
context.userdata.characters.append(character)
logger.info(
"added character to the story: %s", name
)
@function_tool
async def location_introduction(
self,
context: RunContext[StoryData],
location: str,
):
"""Called when the user has provided a location.
Args:
location: The name of the location
"""
context.userdata.locations.append(location)
logger.info(
"added location to the story: %s", location
)
@function_tool
async def theme_introduction(
self,
context: RunContext[StoryData],
theme: str,
):
"""Called when the user has provided a theme.
Args:
theme: The name of the theme
"""
context.userdata.theme = theme
logger.info(
"set theme to the story: %s", theme
)
@function_tool
async def detected_childrens_book(
self,
context: RunContext[StoryData],
):
"""Called when the user has provided enough information to suggest a children's book.
"""
childrens_editor = SpecialistEditorAgent("children's books", chat_ctx=context.session._chat_ctx)
# here we are creating a ChilrensEditorAgent with the full chat history,
# as if they were there in the room with the user the whole time.
# we could also omit it and rely on the userdata to share context.
logger.info(
"switching to the children's book editor with the provided user data: %s", context.userdata
)
return childrens_editor, "Let's switch to the children's book editor."
@function_tool
async def detected_novel(
self,
context: RunContext[StoryData],
):
"""Called when the user has provided enough information to suggest a children's book.
"""
childrens_editor = SpecialistEditorAgent("novels", chat_ctx=context.session._chat_ctx)
# here we are creating a ChilrensEditorAgent with the full chat history,
# as if they were there in the room with the user the whole time.
# we could also omit it and rely on the userdata to share context.
logger.info(
"switching to the children's book editor with the provided user data: %s", context.userdata
)
return childrens_editor, "Let's switch to the children's book editor."
class SpecialistEditorAgent(Agent):
def __init__(self, specialty: str, chat_ctx: Optional[ChatContext] = None) -> None:
super().__init__(
instructions=f"{common_instructions}. You specialize in {specialty}, and have "
"worked with some of the greats, and have even written a few books yourself.",
# each agent could override any of the model services, including mixing
# realtime and non-realtime models
tts=openai.TTS(voice="echo"),
chat_ctx=chat_ctx,
)
async def on_enter(self):
# when the agent is added to the session, we'll initiate the conversation by
# using the LLM to generate a reply
self.session.generate_reply()
@function_tool
async def character_introduction(
self,
context: RunContext[StoryData],
name: str,
background: str,
):
"""Called when the user has provided a character.
Args:
name: The name of the character
background: The character's history, occupation, and other details
"""
character = CharacterData(name=name, background=background)
context.userdata.characters.append(character)
logger.info(
"added character to the story: %s", name
)
@function_tool
async def location_introduction(
self,
context: RunContext[StoryData],
location: str,
):
"""Called when the user has provided a location.
Args:
location: The name of the location
"""
context.userdata.locations.append(location)
logger.info(
"added location to the story: %s", location
)
@function_tool
async def theme_introduction(
self,
context: RunContext[StoryData],
theme: str,
):
"""Called when the user has provided a theme.
Args:
theme: The name of the theme
"""
context.userdata.theme = theme
logger.info(
"set theme to the story: %s", theme
)
@function_tool
async def story_finished(self, context: RunContext[StoryData]):
"""When the editor think the broad strokes of the story have been hammered out,
they can stop you with their final thoughts.
"""
# interrupt any existing generation
self.session.interrupt()
# generate a goodbye message and hang up
# awaiting it will ensure the message is played out before returning
await self.session.generate_reply(
instructions="give brief but honest feedback on the story idea", allow_interruptions=False
)
job_ctx = get_job_context()
await job_ctx.api.room.delete_room(api.DeleteRoomRequest(room=job_ctx.room.name))
def prewarm(proc: JobProcess):
proc.userdata["vad"] = silero.VAD.load()
async def entrypoint(ctx: JobContext):
await ctx.connect()
session = AgentSession[StoryData](
vad=ctx.proc.userdata["vad"],
# any combination of STT, LLM, TTS, or realtime API can be used
llm=openai.LLM(model="gpt-4o-mini"),
stt=deepgram.STT(model="nova-3"),
tts=openai.TTS(voice="ash"),
userdata=StoryData(),
)
# log metrics as they are emitted, and total usage after session is over
usage_collector = metrics.UsageCollector()
@session.on("metrics_collected")
def _on_metrics_collected(ev: MetricsCollectedEvent):
metrics.log_metrics(ev.metrics)
usage_collector.collect(ev.metrics)
async def log_usage():
summary = usage_collector.get_summary()
logger.info(f"Usage: {summary}")
ctx.add_shutdown_callback(log_usage)
await session.start(
agent=LeadEditorAgent(),
room=ctx.room,
room_input_options=RoomInputOptions(
# uncomment to enable Krisp BVC noise cancellation
# noise_cancellation=noise_cancellation.BVC(),
),
room_output_options=RoomOutputOptions(transcription_enabled=True),
)
if __name__ == "__main__":
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm))