Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@

from typing import Dict, Any, List, Callable
from pipecat.services.llm_service import FunctionCallParams
from pipecat.frames.frames import LLMMessagesUpdateFrame
from call_processing.log.logger import logger
from call_processing.constants.language_config import LANGUAGE_INSTRUCTIONS
from call_processing.services.tts_service import TTSServiceFactory
from call_processing.services.stt_service import STTServiceFactory


class LanguageDetectionToolFactory:
Expand All @@ -17,8 +18,9 @@ class LanguageDetectionToolFactory:
@staticmethod
def create_language_detection_tool(
task_container: Dict[str, Any],
language_switcher: Any,
stt_language_switcher: Any,
tts_provider: str,
stt_provider: str,
tts_voice_ids: Dict[str, str],
context_container: Dict[str, Any],
supported_languages: List[str],
default_language: str,
Expand All @@ -30,8 +32,9 @@ def create_language_detection_tool(
Args:
task_container: Dictionary containing PipelineTask (populated after task creation)
Format: {'task': PipelineTask | None}
language_switcher: LanguageSwitcher instance that manages TTS routing
stt_language_switcher: STTLanguageSwitcher instance that manages STT routing
tts_provider: TTS provider name (e.g. 'elevenlabs', 'azure')
stt_provider: STT provider name (e.g. 'deepgram', 'azure')
tts_voice_ids: Dict mapping language code -> voice ID for TTS
context_container: Dictionary containing LLMContext (populated after context creation)
Format: {'context': LLMContext | None}
supported_languages: List of supported language codes
Expand Down Expand Up @@ -123,15 +126,35 @@ async def detect_and_switch_language(params: FunctionCallParams):

# Perform language switch
try:
# Update TTS language switcher state
language_switcher.set_language(target_language)

# Update STT language switcher state
stt_language_switcher.set_language(target_language)

logger.info(
f'Switched TTS and STT language from {current_language} to {target_language}'
# Queue TTS settings update (voice + language)
tts_frame = TTSServiceFactory.create_language_update_frame(
tts_provider,
target_language,
tts_voice_ids.get(target_language),
)
tts_frame_queued = False
if tts_frame:
await task.queue_frame(tts_frame)
tts_frame_queued = True

# Queue STT settings update (language)
stt_frame = STTServiceFactory.create_language_update_frame(
stt_provider, target_language
)
stt_frame_queued = False
if stt_frame:
await task.queue_frame(stt_frame)
stt_frame_queued = True

log_msg = (
f'Language update {current_language} -> {target_language}: '
f'TTS={"queued" if tts_frame_queued else "skipped"}, '
f'STT={"queued" if stt_frame_queued else "skipped"}'
)
if tts_frame_queued or stt_frame_queued:
logger.info(log_msg)
else:
logger.error(log_msg)

# Update system prompt with language instruction
language_instruction = LANGUAGE_INSTRUCTIONS.get(
Expand All @@ -151,17 +174,16 @@ async def detect_and_switch_language(params: FunctionCallParams):

# Append new language instruction to clean base prompt
updated_content = f'{language_instruction}\n\n{base_prompt}'
updated_system_message = {
'role': 'system',
'content': updated_content,
}

# Update context
# Mutate the system message in-place on the context object so
# the full conversation history (including the current tool
# call + result being appended by pipecat) is preserved.
# Using LLMMessagesUpdateFrame would snapshot messages BEFORE
# pipecat appends the tool result, stripping it from context
# and causing a second spurious tool call on LLM continuation.
current_messages = context.get_messages()
new_messages = [updated_system_message] + current_messages[1:]
await task.queue_frame(
LLMMessagesUpdateFrame(new_messages, run_llm=False)
)
current_messages[0] = {'role': 'system', 'content': updated_content}
context.set_messages(current_messages)

logger.info(
f'Updated system prompt with {target_language} instruction'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,13 @@
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
StopFrame,
TTSSpeakFrame,
BotSpeakingFrame,
BotStartedSpeakingFrame,
UserSpeakingFrame,
UserStartedSpeakingFrame,
)
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
Expand All @@ -44,10 +42,6 @@
LLMUserAggregatorParams,
UserTurnStoppedMessage,
)
from pipecat.processors.filters.function_filter import FunctionFilter
from pipecat.processors.transcript_processor import (
TranscriptProcessor,
)

# from pipecat.pipeline.service_switcher import (
# ServiceSwitcher,
Expand Down Expand Up @@ -101,103 +95,6 @@
)


class STTLanguageSwitcher(ParallelPipeline):
    """
    ParallelPipeline that routes STT to different language-specific services
    based on current language state. Same pattern as LanguageSwitcher for TTS.

    One branch is built per supported language. Each branch is a
    FunctionFilter followed by that language's STT service, and the branch's
    filter function returns True only while its language is the current one.
    """

    def __init__(
        self,
        stt_services: Dict[str, Any],
        supported_languages: List[str],
        default_language: str,
    ):
        """
        Build one filtered STT branch per supported language.

        Args:
            stt_services: Mapping of language code -> STT service instance.
            supported_languages: Language codes to build branches for.
            default_language: Language code that is active initially.

        Raises:
            KeyError: If a supported language has no entry in stt_services.
        """
        # Snapshot the language list: routes are fixed at construction time,
        # so set_language() must validate against the languages that actually
        # have a route, not against a list the caller may mutate later.
        languages = list(supported_languages)

        # Mirror the check done in set_language(): an unsupported default
        # matches none of the per-language filters below.
        if default_language not in languages:
            logger.warning(
                f'STTLanguageSwitcher: Default language {default_language} '
                f'is not in supported languages {languages}; '
                f'no route matches it'
            )

        self._current_language = default_language
        self._stt_services = stt_services
        self._supported_languages = languages

        # Build parallel routes: one per language
        # Each route: [FunctionFilter, STT service]
        routes = []
        for lang_code in languages:
            try:
                stt_service = stt_services[lang_code]
            except KeyError as err:
                # Same exception type as before, but say which service is missing.
                raise KeyError(
                    f'STTLanguageSwitcher: No STT service for language {lang_code}'
                ) from err
            filter_func = self._create_language_filter(lang_code)
            routes.append(
                [FunctionFilter(filter_func, filter_system_frames=True), stt_service]
            )

        super().__init__(*routes)

    def _create_language_filter(self, lang_code: str):
        """Create filter function for specific language"""

        # The closure reads self._current_language on every call, so a later
        # set_language() takes effect without rebuilding the routes.
        async def language_filter(_: Frame) -> bool:
            return self._current_language == lang_code

        return language_filter

    @property
    def current_language(self):
        """Language code whose route filter currently returns True."""
        return self._current_language

    def set_language(self, language_code: str):
        """Update current language (called by language detection tool)

        Unsupported codes are ignored with a warning; the current language is
        left unchanged in that case.
        """
        if language_code in self._supported_languages:
            self._current_language = language_code
            logger.info(f'STTLanguageSwitcher: Language set to {language_code}')
        else:
            logger.warning(f'STTLanguageSwitcher: Invalid language {language_code}')


class LanguageSwitcher(ParallelPipeline):
    """
    ParallelPipeline that routes TTS to different language-specific services
    based on current language state.

    One branch is built per supported language. Each branch is a
    FunctionFilter followed by that language's TTS service, and the branch's
    filter function returns True only while its language is the current one.
    """

    def __init__(
        self,
        tts_services: Dict[str, Any],
        supported_languages: List[str],
        default_language: str,
    ):
        """
        Build one filtered TTS branch per supported language.

        Args:
            tts_services: Mapping of language code -> TTS service instance.
            supported_languages: Language codes to build branches for.
            default_language: Language code that is active initially.

        Raises:
            KeyError: If a supported language has no entry in tts_services.
        """
        # Snapshot the language list: routes are fixed at construction time,
        # so set_language() must validate against the languages that actually
        # have a route, not against a list the caller may mutate later.
        languages = list(supported_languages)

        # Mirror the check done in set_language(): an unsupported default
        # matches none of the per-language filters below.
        if default_language not in languages:
            logger.warning(
                f'LanguageSwitcher: Default language {default_language} '
                f'is not in supported languages {languages}; '
                f'no route matches it'
            )

        self._current_language = default_language
        self._tts_services = tts_services
        self._supported_languages = languages

        # Build parallel routes: one per language
        # Each route: [FunctionFilter, TTS service]
        routes = []
        for lang_code in languages:
            try:
                tts_service = tts_services[lang_code]
            except KeyError as err:
                # Same exception type as before, but say which service is missing.
                raise KeyError(
                    f'LanguageSwitcher: No TTS service for language {lang_code}'
                ) from err
            filter_func = self._create_language_filter(lang_code)
            routes.append(
                [FunctionFilter(filter_func, filter_system_frames=True), tts_service]
            )

        super().__init__(*routes)

    def _create_language_filter(self, lang_code: str):
        """Create filter function for specific language"""

        # The closure reads self._current_language on every call, so a later
        # set_language() takes effect without rebuilding the routes.
        async def language_filter(_: Frame) -> bool:
            return self._current_language == lang_code

        return language_filter

    @property
    def current_language(self):
        """Language code whose route filter currently returns True."""
        return self._current_language

    def set_language(self, language_code: str):
        """Update current language (called by language detection tool)

        Unsupported codes are ignored with a warning; the current language is
        left unchanged in that case.
        """
        if language_code in self._supported_languages:
            self._current_language = language_code
            logger.info(f'LanguageSwitcher: Language set to {language_code}')
        else:
            logger.warning(f'LanguageSwitcher: Invalid language {language_code}')


class PipecatService:
"""Service for creating and running Pipecat pipelines"""

Expand Down Expand Up @@ -275,74 +172,28 @@ async def run_conversation(
'parameters': stt_parameters or {},
}

# Create TTS services (one per language for multi-language mode)
tts_services = {}

if is_multi_language:
logger.info(
f'Multi-language mode enabled for languages: {supported_languages}'
f'Multi-language mode enabled for languages: {supported_languages}. '
f'Creating single TTS/STT service with default language: {default_language}'
)

# Create TTS services for each supported language
for lang_code in supported_languages:
# Get voice ID for this language
voice_id_for_lang = tts_voice_ids_dict.get(lang_code)
if not voice_id_for_lang:
logger.warning(
f'No voice ID for language {lang_code}, using default'
)
voice_id_for_lang = default_voice_id

# Deep clone config to avoid mutating original
tts_config_lang = deepcopy(tts_config_with_params)

# Update language parameters
if 'parameters' not in tts_config_lang:
tts_config_lang['parameters'] = {}
tts_config_lang['parameters']['language'] = lang_code
tts_config_lang['voice_id'] = voice_id_for_lang

# Create TTS service
tts_services[lang_code] = TTSServiceFactory.create_tts_service(
tts_config_lang
)
logger.info(
f'Created TTS service for language: {lang_code} '
f'with voice: {voice_id_for_lang}'
)

# Create per-language STT services (same pattern as TTS)
stt_services = {}
for lang_code in supported_languages:
stt_config_lang = deepcopy(stt_config_with_params)
if 'parameters' not in stt_config_lang:
stt_config_lang['parameters'] = {}
stt_config_lang['parameters']['language'] = lang_code

stt_services[lang_code] = STTServiceFactory.create_stt_service(
stt_config_lang
)
logger.info(f'Created STT service for language: {lang_code}')

# Create STTLanguageSwitcher for STT routing
stt = STTLanguageSwitcher(
stt_services=stt_services,
supported_languages=supported_languages,
default_language=default_language,
)
logger.info(
f'Initialized STTLanguageSwitcher with default language: {default_language}'
# Create a single TTS service with the default language; language switches
# are handled at runtime via TTSUpdateSettingsFrame / STTUpdateSettingsFrame
tts_config_lang = deepcopy(tts_config_with_params)
if 'parameters' not in tts_config_lang:
tts_config_lang['parameters'] = {}
tts_config_lang['parameters']['language'] = default_language
tts_config_lang['voice_id'] = tts_voice_ids_dict.get(
default_language, default_voice_id
)
tts = TTSServiceFactory.create_tts_service(tts_config_lang)

# Create LanguageSwitcher for TTS routing
tts = LanguageSwitcher(
tts_services=tts_services,
supported_languages=supported_languages,
default_language=default_language,
)
logger.info(
f'Initialized LanguageSwitcher with default language: {default_language}'
)
stt_config_lang = deepcopy(stt_config_with_params)
if 'parameters' not in stt_config_lang:
stt_config_lang['parameters'] = {}
stt_config_lang['parameters']['language'] = default_language
stt = STTServiceFactory.create_stt_service(stt_config_lang)

else:
logger.info('Single language mode - no language detection needed')
Expand Down Expand Up @@ -414,8 +265,7 @@ async def run_conversation(
{
'role': 'system',
'content': system_content,
},
{'role': 'assistant', 'content': agent_config['welcome_message']},
}
]

# Load and register tools for this agent
Expand Down Expand Up @@ -459,8 +309,9 @@ async def run_conversation(
language_detection_func = (
LanguageDetectionToolFactory.create_language_detection_tool(
task_container=task_container,
language_switcher=tts, # Pass the TTS LanguageSwitcher instance
stt_language_switcher=stt, # Pass the STT LanguageSwitcher instance
tts_provider=tts_config['provider'],
stt_provider=stt_config['provider'],
tts_voice_ids=tts_voice_ids_dict,
context_container=context_container,
supported_languages=supported_languages,
default_language=default_language,
Expand Down Expand Up @@ -570,9 +421,6 @@ async def run_conversation(
),
)

# Create transcript processor for language detection
transcript = TranscriptProcessor()

# --- Call evaluation: transcript log and stats ---
transcript_log: List[Dict[str, Any]] = []
call_evaluation_tasks: List[asyncio.Task] = []
Expand Down Expand Up @@ -624,13 +472,11 @@ async def on_assistant_turn_stopped(
# Build pipeline components list
pipeline_components = [
transport.input(), # Audio input from Twilio
stt, # Speech-to-Text (ServiceSwitcher for multi-lang, direct for single)
transcript.user(), # Transcript processor for user messages
stt, # Speech-to-Text
context_aggregator.user(), # Add user message to context
llm, # LLM processing
tts, # Text-to-Speech (ServiceSwitcher for multi-lang, direct for single)
tts, # Text-to-Speech
transport.output(), # Audio output to Twilio
transcript.assistant(), # Transcript processor for assistant messages
context_aggregator.assistant(), # Add assistant response to context
]

Expand Down Expand Up @@ -734,7 +580,9 @@ async def on_pipeline_finished(task, frame):
@transport.event_handler('on_client_connected')
async def on_client_connected(transport, client):
logger.info(f"Client connected for agent: {agent_config['name']}")
await task.queue_frame(TTSSpeakFrame(agent_config['welcome_message']))
await task.queue_frame(
TTSSpeakFrame(agent_config['welcome_message'], append_to_context=True)
)

@transport.event_handler('on_client_disconnected')
async def on_client_disconnected(transport, client):
Expand Down
Loading
Loading