From 1e2ee1a2c2e05b12e14b2ad1d5f944c82346d345 Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Wed, 11 Mar 2026 10:45:26 +0530 Subject: [PATCH 1/8] add pipecat tracing, upgrade flo-ai to 1.1.2 --- .../services/pipecat_service.py | 34 +++++++ .../apps/call_processing/pyproject.toml | 2 +- .../modules/agents_module/pyproject.toml | 2 +- .../knowledge_base_module/pyproject.toml | 2 +- .../modules/tools_module/pyproject.toml | 2 +- wavefront/server/uv.lock | 95 +++++++++---------- 6 files changed, 82 insertions(+), 55 deletions(-) diff --git a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py index 2978c7de..613b6c55 100644 --- a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py +++ b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py @@ -13,6 +13,8 @@ from call_processing.utils import get_current_ist_time_str # Pipecat core imports +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from pipecat.utils.tracing.setup import setup_tracing from pipecat.adapters.schemas.tools_schema import ToolsSchema from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 @@ -62,6 +64,23 @@ ) from call_processing.constants.filler_phrases import FILLER_PHRASES +# Step 1: Initialize OpenTelemetry with your chosen exporter +exporter = OTLPSpanExporter( + endpoint=os.getenv('CALL_PROCESSING_OTLP_ENDPOINT', 'http://localhost:4317'), + insecure=True, +) + +setup_tracing( + service_name=os.getenv('CALL_PROCESSING_TRACING_SERVICE_NAME', 'call-processing'), + exporter=exporter, + console_export=False, # Set to True for debug output +) + +ENABLE_TRACING = os.getenv('CALL_PROCESSING_ENABLE_TRACING', 'true').lower() == 'true' +ENABLE_TURN_TRACKING = ( + os.getenv('CALL_PROCESSING_ENABLE_TURN_TRACKING', 'true').lower() == 'true' +) + class STTLanguageSwitcher(ParallelPipeline): """ @@ -545,6 +564,13 @@ async def run_conversation( # Create pipeline pipeline = Pipeline(pipeline_components) + # Mask customer number: keep last 4 digits + masked_customer_number = ( + '*' * (len(customer_number) - 4) + customer_number[-4:] + if customer_number and len(customer_number) > 4 + else customer_number + ) + # Create pipeline task with Twilio-specific parameters task = PipelineTask( pipeline, @@ -556,6 +582,14 @@ async def run_conversation( # report_only_initial_ttfb=True ), idle_timeout_secs=20, + enable_tracing=ENABLE_TRACING, + enable_turn_tracking=ENABLE_TURN_TRACKING, + conversation_id=None, + additional_span_attributes={ + 'customer.phone_number': masked_customer_number, + 'voice_agent.id': str(agent_id) if agent_id else '', + 'voice_agent.name': agent_config.get('name', ''), + }, ) # Populate task container for language detection tool (if multi-language) diff --git a/wavefront/server/apps/call_processing/pyproject.toml b/wavefront/server/apps/call_processing/pyproject.toml index 8a1c44c9..6b52839e 100644 --- a/wavefront/server/apps/call_processing/pyproject.toml +++ b/wavefront/server/apps/call_processing/pyproject.toml @@ -21,7 +21,7 @@ dependencies = [ "redis>=5.0.0", "tenacity>=8.0.0", # Pipecat and voice processing - "pipecat-ai[websocket,cartesia,google,silero,deepgram,groq,runner,azure,local-smart-turn-v3,sarvam]==0.0.103", + "pipecat-ai[websocket,cartesia,google,silero,deepgram,groq,runner,azure,local-smart-turn-v3,sarvam,tracing]==0.0.103", # Twilio "twilio>=8.0.0", ] diff --git a/wavefront/server/modules/agents_module/pyproject.toml b/wavefront/server/modules/agents_module/pyproject.toml index c2794206..bd37f885 100644 --- a/wavefront/server/modules/agents_module/pyproject.toml +++ b/wavefront/server/modules/agents_module/pyproject.toml @@ -13,7 +13,7 @@ dependencies = [ "flo-utils", "tools-module", "api-services-module", - "flo-ai==1.1.1", + "flo-ai==1.1.2", ] [tool.uv.sources] diff --git a/wavefront/server/modules/knowledge_base_module/pyproject.toml b/wavefront/server/modules/knowledge_base_module/pyproject.toml index bd0255ac..6e9600f8 100644 --- a/wavefront/server/modules/knowledge_base_module/pyproject.toml +++ b/wavefront/server/modules/knowledge_base_module/pyproject.toml @@ -17,7 +17,7 @@ dependencies = [ "pandas~=2.2.3", "ollama~=0.4.8", "textract~=1.6.5", - "flo-ai==1.1.1", + "flo-ai==1.1.2", "google-cloud-pubsub~=2.30.0", "boto3<=1.38.40", "pyyaml>=6.0.3,<7", diff --git a/wavefront/server/modules/tools_module/pyproject.toml b/wavefront/server/modules/tools_module/pyproject.toml index f20df982..b4cac1bb 100644 --- a/wavefront/server/modules/tools_module/pyproject.toml +++ b/wavefront/server/modules/tools_module/pyproject.toml @@ -3,7 +3,7 @@ name = "tools_module" version = "0.1.0" description = "Tools module for Flo AI agent system" dependencies = [ - "flo-ai==1.1.1", + "flo-ai==1.1.2", "flo_cloud", "datasource", diff --git a/wavefront/server/uv.lock b/wavefront/server/uv.lock index 89e95218..a2b789ee 100644 --- a/wavefront/server/uv.lock +++ b/wavefront/server/uv.lock @@ -91,7 +91,7 @@ dependencies = [ requires-dist = [ { name = "api-services-module", editable = "modules/api_services_module" }, { name = "common-module", editable = "modules/common_module" }, - { name = "flo-ai", specifier = "==1.1.1" }, + { name = "flo-ai", specifier = "==1.1.2" }, { name = "flo-cloud", editable = "packages/flo_cloud" }, { name = "flo-utils", editable = "packages/flo_utils" }, { name = "tools-module", editable = "modules/tools_module" }, @@ -625,7 +625,7 @@ dependencies = [ { name = "dependency-injector" }, { name = "fastapi" }, { name = "httpx" }, - { name = "pipecat-ai", extra = ["azure", "cartesia", "deepgram", "google", "groq", "local-smart-turn-v3", "runner", "sarvam", "silero", "websocket"] }, + { name = "pipecat-ai", extra = ["azure", "cartesia", "deepgram", "google", "groq", "local-smart-turn-v3", "runner", "sarvam", "silero", "tracing", "websocket"] }, { name = "pydantic" }, { name = "python-dotenv" }, { name = "python-multipart" }, @@ -640,7 +640,7 @@ requires-dist = [ { name = "dependency-injector", specifier = ">=4.46.0,<5.0.0" }, { name = "fastapi", specifier = ">=0.115.2,<1.0.0" }, { name = "httpx", specifier = ">=0.27.0" }, - { name = "pipecat-ai", extras = ["websocket", "cartesia", "google", "silero", "deepgram", "groq", "runner", "azure", "local-smart-turn-v3", "sarvam"], specifier = "==0.0.103" }, + { name = "pipecat-ai", extras = ["websocket", "cartesia", "google", "silero", "deepgram", "groq", "runner", "azure", "local-smart-turn-v3", "sarvam", "tracing"], specifier = "==0.0.103" }, { name = "pydantic", specifier = ">=2.0.0" }, { name = "python-dotenv", specifier = ">=1.1.0,<2.0.0" }, { name = "python-multipart", specifier = ">=0.0.9" }, @@ -1062,18 +1062,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/7c/2f/41598584075fef9e2bc33c102ba2e0b91ffb207d914b19402d3abf566de8/dependency_injector-4.48.2-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:05ff29398e23a08e840c9a89a0b516d988b337a38534d33791857bd1defd2d23", size = 1623553, upload-time = "2025-09-19T10:19:40.947Z" }, ] -[[package]] -name = "deprecated" -version = "1.2.18" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "wrapt" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/98/97/06afe62762c9a8a86af0cfb7bfdab22a43ad17138b07af5b1a58442690a2/deprecated-1.2.18.tar.gz", hash = "sha256:422b6f6d859da6f2ef57857761bfb392480502a64c3028ca9bbe86085d72115d", size = 2928744, upload-time = "2025-01-27T10:46:25.7Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/6e/c6/ac0b6c1e2d138f1002bcf799d330bd6d85084fece321e662a14223794041/Deprecated-1.2.18-py2.py3-none-any.whl", hash = "sha256:bd5011788200372a32418f888e326a09ff80d0214bd961147cfed01b5c018eec", size = 9998, upload-time = "2025-01-27T10:46:09.186Z" }, -] - [[package]] name = "deprecation" version = "2.1.0" @@ -1364,7 +1352,7 @@ wheels = [ [[package]] name = "flo-ai" -version = "1.1.1" +version = "1.1.2" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "aiohttp" }, @@ -1386,9 +1374,9 @@ dependencies = [ { name = "pypdf" }, { name = "pyyaml" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/02/45/481650564bb42834e047c64ffbacc84d8832123ae8b87732ce7d618ff2d6/flo_ai-1.1.1.tar.gz", hash = "sha256:7a7c42566e42ba116a9331621cfdc9c745f79eb857e3a759a90e7b5d82061c17", size = 92428, upload-time = "2026-03-06T14:12:24.645Z" } +sdist = { url = "https://files.pythonhosted.org/packages/4c/f7/b9eda4b7167ed3c586a677fcb43313c7748868bd795101ba061b5e2584b3/flo_ai-1.1.2.tar.gz", hash = "sha256:2829e7723f2bcba0ab2fffdabee52e9a7c690ea0e148692504203a47cbf7cd26", size = 92418, upload-time = "2026-03-11T04:53:04.664Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/5a/59/bb8321d385d85b15d79f241c8a2f6448748ddf102fbc0f942f33de470d11/flo_ai-1.1.1-py3-none-any.whl", hash = "sha256:08b08e3692121b1d65250bacadecf1d0873675323e6be00749c453f3b1d7fbea", size = 119699, upload-time = "2026-03-06T14:12:18.812Z" }, + { url = "https://files.pythonhosted.org/packages/03/69/e649b44c3780576516865bce50eea58fbeeb18b02895b1dc8c139d4d9784/flo_ai-1.1.2-py3-none-any.whl", hash = "sha256:25454cd0857f6962ed6141c4d56731cb207396db52f3e9d419125f857722e7c0", size = 119697, upload-time = "2026-03-11T04:53:02.905Z" }, ] [[package]] @@ -2592,7 +2580,7 @@ dev = [ requires-dist = [ { name = "boto3", specifier = "<=1.38.40" }, { name = "datasource", editable = "plugins/datasource" }, - { name = "flo-ai", specifier = "==1.1.1" }, + { name = "flo-ai", specifier = "==1.1.2" }, { name = "flo-cloud", editable = "packages/flo_cloud" }, { name = "google-cloud-pubsub", specifier = "~=2.30.0" }, { name = "numpy", specifier = ">=1.24,<2.0" }, @@ -3528,81 +3516,81 @@ wheels = [ [[package]] name = "opentelemetry-api" -version = "1.28.2" +version = "1.40.0" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "deprecated" }, { name = "importlib-metadata" }, + { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/51/34/e4e9245c868c6490a46ffedf6bd5b0f512bbc0a848b19e3a51f6bbad648c/opentelemetry_api-1.28.2.tar.gz", hash = "sha256:ecdc70c7139f17f9b0cf3742d57d7020e3e8315d6cffcdf1a12a905d45b19cc0", size = 62796, upload-time = "2024-11-18T18:29:42.747Z" } +sdist = { url = "https://files.pythonhosted.org/packages/2c/1d/4049a9e8698361cc1a1aa03a6c59e4fa4c71e0c0f94a30f988a6876a2ae6/opentelemetry_api-1.40.0.tar.gz", hash = "sha256:159be641c0b04d11e9ecd576906462773eb97ae1b657730f0ecf64d32071569f", size = 70851, upload-time = "2026-03-04T14:17:21.555Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/4d/58/b17393cdfc149e14ee84c662abf921993dcce8058628359ef1f49e2abb97/opentelemetry_api-1.28.2-py3-none-any.whl", hash = "sha256:6fcec89e265beb258fe6b1acaaa3c8c705a934bd977b9f534a2b7c0d2d4275a6", size = 64302, upload-time = "2024-11-18T18:29:16.783Z" }, + { url = "https://files.pythonhosted.org/packages/5f/bf/93795954016c522008da367da292adceed71cca6ee1717e1d64c83089099/opentelemetry_api-1.40.0-py3-none-any.whl", hash = "sha256:82dd69331ae74b06f6a874704be0cfaa49a1650e1537d4a813b86ecef7d0ecf9", size = 68676, upload-time = "2026-03-04T14:17:01.24Z" }, ] [[package]] name = "opentelemetry-exporter-otlp" -version = "1.28.2" +version = "1.40.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "opentelemetry-exporter-otlp-proto-grpc" }, { name = "opentelemetry-exporter-otlp-proto-http" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/8a/eb/ad88c61b4e51cdd294ad4ae7c45b35120fb381eb019675954c4fc15b6c4c/opentelemetry_exporter_otlp-1.28.2.tar.gz", hash = "sha256:45f8d7fe4cdd41526464b542ce91b1fd1ae661be92d2c6cba71a3d948b2bdf70", size = 6155, upload-time = "2024-11-18T18:29:45.549Z" } +sdist = { url = "https://files.pythonhosted.org/packages/d0/37/b6708e0eff5c5fb9aba2e0ea09f7f3bcbfd12a592d2a780241b5f6014df7/opentelemetry_exporter_otlp-1.40.0.tar.gz", hash = "sha256:7caa0870b95e2fcb59d64e16e2b639ecffb07771b6cd0000b5d12e5e4fef765a", size = 6152, upload-time = "2026-03-04T14:17:23.235Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/b8/16/65b0f0f9a85e6c0e1ce30e0ea96e0174ca4db85301883d1d6a9702700946/opentelemetry_exporter_otlp-1.28.2-py3-none-any.whl", hash = "sha256:b50f6d4a80e6bcd329e36f360ac486ecfa106ea704d6226ceea05d3a48455f70", size = 7010, upload-time = "2024-11-18T18:29:21.195Z" }, + { url = "https://files.pythonhosted.org/packages/2d/fc/aea77c28d9f3ffef2fdafdc3f4a235aee4091d262ddabd25882f47ce5c5f/opentelemetry_exporter_otlp-1.40.0-py3-none-any.whl", hash = "sha256:48c87e539ec9afb30dc443775a1334cc5487de2f72a770a4c00b1610bf6c697d", size = 7023, upload-time = "2026-03-04T14:17:03.612Z" }, ] [[package]] name = "opentelemetry-exporter-otlp-proto-common" -version = "1.28.2" +version = "1.40.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "opentelemetry-proto" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/60/cd/cd990f891b64e7698b8a6b6ab90dfac7f957db5a3d06788acd52f73ad4c0/opentelemetry_exporter_otlp_proto_common-1.28.2.tar.gz", hash = "sha256:7aebaa5fc9ff6029374546df1f3a62616fda07fccd9c6a8b7892ec130dd8baca", size = 19136, upload-time = "2024-11-18T18:29:46.87Z" } +sdist = { url = "https://files.pythonhosted.org/packages/51/bc/1559d46557fe6eca0b46c88d4c2676285f1f3be2e8d06bb5d15fbffc814a/opentelemetry_exporter_otlp_proto_common-1.40.0.tar.gz", hash = "sha256:1cbee86a4064790b362a86601ee7934f368b81cd4cc2f2e163902a6e7818a0fa", size = 20416, upload-time = "2026-03-04T14:17:23.801Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/2a/4d/769f3b1b1c6af5e603da50349ba31af757897540a75d666de22d39461055/opentelemetry_exporter_otlp_proto_common-1.28.2-py3-none-any.whl", hash = "sha256:545b1943b574f666c35b3d6cc67cb0b111060727e93a1e2866e346b33bff2a12", size = 18460, upload-time = "2024-11-18T18:29:22.79Z" }, + { url = "https://files.pythonhosted.org/packages/8b/ca/8f122055c97a932311a3f640273f084e738008933503d0c2563cd5d591fc/opentelemetry_exporter_otlp_proto_common-1.40.0-py3-none-any.whl", hash = "sha256:7081ff453835a82417bf38dccf122c827c3cbc94f2079b03bba02a3165f25149", size = 18369, upload-time = "2026-03-04T14:17:04.796Z" }, ] [[package]] name = "opentelemetry-exporter-otlp-proto-grpc" -version = "1.28.2" +version = "1.40.0" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "deprecated" }, { name = "googleapis-common-protos" }, { name = "grpcio" }, { name = "opentelemetry-api" }, { name = "opentelemetry-exporter-otlp-proto-common" }, { name = "opentelemetry-proto" }, { name = "opentelemetry-sdk" }, + { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/f7/4c/b5374467e97f2b290611de746d0e6cab3a07aec865d6b99d11535cd60059/opentelemetry_exporter_otlp_proto_grpc-1.28.2.tar.gz", hash = "sha256:07c10378380bbb01a7f621a5ce833fc1fab816e971140cd3ea1cd587840bc0e6", size = 26227, upload-time = "2024-11-18T18:29:47.576Z" } +sdist = { url = "https://files.pythonhosted.org/packages/8f/7f/b9e60435cfcc7590fa87436edad6822240dddbc184643a2a005301cc31f4/opentelemetry_exporter_otlp_proto_grpc-1.40.0.tar.gz", hash = "sha256:bd4015183e40b635b3dab8da528b27161ba83bf4ef545776b196f0fb4ec47740", size = 25759, upload-time = "2026-03-04T14:17:24.4Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/dd/7e/6af5a7de87988cfc951db86f7fd0ecaabc20bc112fd9cfe06b8a01f11400/opentelemetry_exporter_otlp_proto_grpc-1.28.2-py3-none-any.whl", hash = "sha256:6083d9300863aab35bfce7c172d5fc1007686e6f8dff366eae460cd9a21592e2", size = 18518, upload-time = "2024-11-18T18:29:23.71Z" }, + { url = "https://files.pythonhosted.org/packages/96/6f/7ee0980afcbdcd2d40362da16f7f9796bd083bf7f0b8e038abfbc0300f5d/opentelemetry_exporter_otlp_proto_grpc-1.40.0-py3-none-any.whl", hash = "sha256:2aa0ca53483fe0cf6405087a7491472b70335bc5c7944378a0a8e72e86995c52", size = 20304, upload-time = "2026-03-04T14:17:05.942Z" }, ] [[package]] name = "opentelemetry-exporter-otlp-proto-http" -version = "1.28.2" +version = "1.40.0" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "deprecated" }, { name = "googleapis-common-protos" }, { name = "opentelemetry-api" }, { name = "opentelemetry-exporter-otlp-proto-common" }, { name = "opentelemetry-proto" }, { name = "opentelemetry-sdk" }, { name = "requests" }, + { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/b1/91/4e32e52d13dbdf9560bc095dfe66a2c09e0034a886f7725fcda8fe10a052/opentelemetry_exporter_otlp_proto_http-1.28.2.tar.gz", hash = "sha256:d9b353d67217f091aaf4cfe8693c170973bb3e90a558992570d97020618fda79", size = 15043, upload-time = "2024-11-18T18:29:48.237Z" } +sdist = { url = "https://files.pythonhosted.org/packages/2e/fa/73d50e2c15c56be4d000c98e24221d494674b0cc95524e2a8cb3856d95a4/opentelemetry_exporter_otlp_proto_http-1.40.0.tar.gz", hash = "sha256:db48f5e0f33217588bbc00274a31517ba830da576e59503507c839b38fa0869c", size = 17772, upload-time = "2026-03-04T14:17:25.324Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/19/23/802b889cf8bf3e235f30fbcbaa2b3fd484fe8c76b5b4db00f00c0e9af20f/opentelemetry_exporter_otlp_proto_http-1.28.2-py3-none-any.whl", hash = "sha256:af921c18212a56ef4be68458ba475791c0517ebfd8a2ff04669c9cd477d90ff2", size = 17218, upload-time = "2024-11-18T18:29:25.474Z" }, + { url = "https://files.pythonhosted.org/packages/a0/3a/8865d6754e61c9fb170cdd530a124a53769ee5f740236064816eb0ca7301/opentelemetry_exporter_otlp_proto_http-1.40.0-py3-none-any.whl", hash = "sha256:a8d1dab28f504c5d96577d6509f80a8150e44e8f45f82cdbe0e34c99ab040069", size = 19960, upload-time = "2026-03-04T14:17:07.153Z" }, ] [[package]] name = "opentelemetry-instrumentation" -version = "0.49b2" +version = "0.61b0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "opentelemetry-api" }, @@ -3610,48 +3598,48 @@ dependencies = [ { name = "packaging" }, { name = "wrapt" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/6f/1f/9fa51f6f64f4d179f4e3370eb042176ff7717682428552f5e1f4c5efcc09/opentelemetry_instrumentation-0.49b2.tar.gz", hash = "sha256:8cf00cc8d9d479e4b72adb9bd267ec544308c602b7188598db5a687e77b298e2", size = 26480, upload-time = "2024-11-18T18:39:46.03Z" } +sdist = { url = "https://files.pythonhosted.org/packages/da/37/6bf8e66bfcee5d3c6515b79cb2ee9ad05fe573c20f7ceb288d0e7eeec28c/opentelemetry_instrumentation-0.61b0.tar.gz", hash = "sha256:cb21b48db738c9de196eba6b805b4ff9de3b7f187e4bbf9a466fa170514f1fc7", size = 32606, upload-time = "2026-03-04T14:20:16.825Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/ef/e3/ad23372525653b0221212d5e2a71bd97aae64cc35f90cbf0c70de57dfa4e/opentelemetry_instrumentation-0.49b2-py3-none-any.whl", hash = "sha256:f6d782b0ef9fef4a4c745298651c65f5c532c34cd4c40d230ab5b9f3b3b4d151", size = 30693, upload-time = "2024-11-18T18:38:31.962Z" }, + { url = "https://files.pythonhosted.org/packages/d8/3e/f6f10f178b6316de67f0dfdbbb699a24fbe8917cf1743c1595fb9dcdd461/opentelemetry_instrumentation-0.61b0-py3-none-any.whl", hash = "sha256:92a93a280e69788e8f88391247cc530fd81f16f2b011979d4d6398f805cfbc63", size = 33448, upload-time = "2026-03-04T14:19:02.447Z" }, ] [[package]] name = "opentelemetry-proto" -version = "1.28.2" +version = "1.40.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "protobuf" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/d0/45/96c4f34c79fd87dc8a1c0c432f23a5a202729f21e4e63c8b36fc8e57767a/opentelemetry_proto-1.28.2.tar.gz", hash = "sha256:7c0d125a6b71af88bfeeda16bfdd0ff63dc2cf0039baf6f49fa133b203e3f566", size = 34316, upload-time = "2024-11-18T18:29:57.324Z" } +sdist = { url = "https://files.pythonhosted.org/packages/4c/77/dd38991db037fdfce45849491cb61de5ab000f49824a00230afb112a4392/opentelemetry_proto-1.40.0.tar.gz", hash = "sha256:03f639ca129ba513f5819810f5b1f42bcb371391405d99c168fe6937c62febcd", size = 45667, upload-time = "2026-03-04T14:17:31.194Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/1d/12/646f48d6d698a6df0437a22b591387440dc4888c8752d1a1300f730da710/opentelemetry_proto-1.28.2-py3-none-any.whl", hash = "sha256:0837498f59db55086462915e5898d0b1a18c1392f6db4d7e937143072a72370c", size = 55818, upload-time = "2024-11-18T18:29:37.002Z" }, + { url = "https://files.pythonhosted.org/packages/b9/b2/189b2577dde745b15625b3214302605b1353436219d42b7912e77fa8dc24/opentelemetry_proto-1.40.0-py3-none-any.whl", hash = "sha256:266c4385d88923a23d63e353e9761af0f47a6ed0d486979777fe4de59dc9b25f", size = 72073, upload-time = "2026-03-04T14:17:16.673Z" }, ] [[package]] name = "opentelemetry-sdk" -version = "1.28.2" +version = "1.40.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "opentelemetry-api" }, { name = "opentelemetry-semantic-conventions" }, { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/4b/f4/840a5af4efe48d7fb4c456ad60fd624673e871a60d6494f7ff8a934755d4/opentelemetry_sdk-1.28.2.tar.gz", hash = "sha256:5fed24c5497e10df30282456fe2910f83377797511de07d14cec0d3e0a1a3110", size = 157272, upload-time = "2024-11-18T18:29:58.094Z" } +sdist = { url = "https://files.pythonhosted.org/packages/58/fd/3c3125b20ba18ce2155ba9ea74acb0ae5d25f8cd39cfd37455601b7955cc/opentelemetry_sdk-1.40.0.tar.gz", hash = "sha256:18e9f5ec20d859d268c7cb3c5198c8d105d073714db3de50b593b8c1345a48f2", size = 184252, upload-time = "2026-03-04T14:17:31.87Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/da/8b/4f2b418496c08016d4384f9b1c4725a8af7faafa248d624be4bb95993ce1/opentelemetry_sdk-1.28.2-py3-none-any.whl", hash = "sha256:93336c129556f1e3ccd21442b94d3521759541521861b2214c499571b85cb71b", size = 118757, upload-time = "2024-11-18T18:29:38.744Z" }, + { url = "https://files.pythonhosted.org/packages/2c/c5/6a852903d8bfac758c6dc6e9a68b015d3c33f2f1be5e9591e0f4b69c7e0a/opentelemetry_sdk-1.40.0-py3-none-any.whl", hash = "sha256:787d2154a71f4b3d81f20524a8ce061b7db667d24e46753f32a7bc48f1c1f3f1", size = 141951, upload-time = "2026-03-04T14:17:17.961Z" }, ] [[package]] name = "opentelemetry-semantic-conventions" -version = "0.49b2" +version = "0.61b0" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "deprecated" }, { name = "opentelemetry-api" }, + { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/7d/0a/e3b93f94aa3223c6fd8e743502a1fefd4fb3a753d8f501ce2a418f7c0bd4/opentelemetry_semantic_conventions-0.49b2.tar.gz", hash = "sha256:44e32ce6a5bb8d7c0c617f84b9dc1c8deda1045a07dc16a688cc7cbeab679997", size = 95213, upload-time = "2024-11-18T18:29:58.915Z" } +sdist = { url = "https://files.pythonhosted.org/packages/6d/c0/4ae7973f3c2cfd2b6e321f1675626f0dab0a97027cc7a297474c9c8f3d04/opentelemetry_semantic_conventions-0.61b0.tar.gz", hash = "sha256:072f65473c5d7c6dc0355b27d6c9d1a679d63b6d4b4b16a9773062cb7e31192a", size = 145755, upload-time = "2026-03-04T14:17:32.664Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/b1/be/6661c8f76708bb3ba38c90be8fa8d7ffe17ccbc5cbbc229334f5535f6448/opentelemetry_semantic_conventions-0.49b2-py3-none-any.whl", hash = "sha256:51e7e1d0daa958782b6c2a8ed05e5f0e7dd0716fc327ac058777b8659649ee54", size = 159199, upload-time = "2024-11-18T18:29:39.906Z" }, + { url = "https://files.pythonhosted.org/packages/b2/37/cc6a55e448deaa9b27377d087da8615a3416d8ad523d5960b78dbeadd02a/opentelemetry_semantic_conventions-0.61b0-py3-none-any.whl", hash = "sha256:fa530a96be229795f8cef353739b618148b0fe2b4b3f005e60e262926c4d38e2", size = 231621, upload-time = "2026-03-04T14:17:19.33Z" }, ] [[package]] @@ -3964,6 +3952,11 @@ sarvam = [ silero = [ { name = "onnxruntime" }, ] +tracing = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-instrumentation" }, + { name = "opentelemetry-sdk" }, +] websocket = [ { name = "fastapi" }, { name = "websockets" }, @@ -5530,7 +5523,7 @@ dev = [ requires-dist = [ { name = "common-module", editable = "modules/common_module" }, { name = "datasource", editable = "plugins/datasource" }, - { name = "flo-ai", specifier = "==1.1.1" }, + { name = "flo-ai", specifier = "==1.1.2" }, { name = "flo-cloud", editable = "packages/flo_cloud" }, { name = "knowledge-base-module", editable = "modules/knowledge_base_module" }, { name = "plugins-module", editable = "modules/plugins_module" }, From 720a9af71e96fd28abf52d5bf758ed40d01bca14 Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Wed, 11 Mar 2026 10:57:26 +0530 Subject: [PATCH 2/8] resolved review comment --- .../services/pipecat_service.py | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py index 613b6c55..0eb8720a 100644 --- a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py +++ b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py @@ -64,23 +64,26 @@ ) from call_processing.constants.filler_phrases import FILLER_PHRASES -# Step 1: Initialize OpenTelemetry with your chosen exporter -exporter = OTLPSpanExporter( - endpoint=os.getenv('CALL_PROCESSING_OTLP_ENDPOINT', 'http://localhost:4317'), - insecure=True, -) - -setup_tracing( - service_name=os.getenv('CALL_PROCESSING_TRACING_SERVICE_NAME', 'call-processing'), - exporter=exporter, - console_export=False, # Set to True for debug output -) - ENABLE_TRACING = os.getenv('CALL_PROCESSING_ENABLE_TRACING', 'true').lower() == 'true' ENABLE_TURN_TRACKING = ( os.getenv('CALL_PROCESSING_ENABLE_TURN_TRACKING', 'true').lower() == 'true' ) +OTLP_ENDPOINT = os.getenv('CALL_PROCESSING_OTLP_ENDPOINT') + +if ENABLE_TRACING and OTLP_ENDPOINT: + exporter = OTLPSpanExporter( + endpoint=OTLP_ENDPOINT, + insecure=True, + ) + setup_tracing( + service_name=os.getenv( + 'CALL_PROCESSING_TRACING_SERVICE_NAME', 'call-processing' + ), + exporter=exporter, + console_export=False, + ) + class STTLanguageSwitcher(ParallelPipeline): """ From a6935f13f8e6508a1ac100e302af9aac0f3f3eb9 Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Wed, 11 Mar 2026 14:38:15 +0530 Subject: [PATCH 3/8] added opentelemetry http and grpc exporter --- wavefront/server/apps/call_processing/pyproject.toml | 2 ++ wavefront/server/uv.lock | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/wavefront/server/apps/call_processing/pyproject.toml b/wavefront/server/apps/call_processing/pyproject.toml index 6b52839e..8e7f4644 100644 --- a/wavefront/server/apps/call_processing/pyproject.toml +++ b/wavefront/server/apps/call_processing/pyproject.toml @@ -22,6 +22,8 @@ dependencies = [ "tenacity>=8.0.0", # Pipecat and voice processing "pipecat-ai[websocket,cartesia,google,silero,deepgram,groq,runner,azure,local-smart-turn-v3,sarvam,tracing]==0.0.103", + "opentelemetry-exporter-otlp-proto-grpc>=1.0.0", + "opentelemetry-exporter-otlp-proto-http>=1.0.0", # Twilio "twilio>=8.0.0", ] diff --git a/wavefront/server/uv.lock b/wavefront/server/uv.lock index a2b789ee..fa12f388 100644 --- a/wavefront/server/uv.lock +++ b/wavefront/server/uv.lock @@ -625,6 +625,8 @@ dependencies = [ { name = "dependency-injector" }, { name = "fastapi" }, { name = "httpx" }, + { name = "opentelemetry-exporter-otlp-proto-grpc" }, + { name = "opentelemetry-exporter-otlp-proto-http" }, { name = "pipecat-ai", extra = ["azure", "cartesia", "deepgram", "google", "groq", "local-smart-turn-v3", "runner", "sarvam", "silero", "tracing", "websocket"] }, { name = "pydantic" }, { name = "python-dotenv" }, @@ -640,6 +642,8 @@ requires-dist = [ { name = "dependency-injector", specifier = ">=4.46.0,<5.0.0" }, { name = "fastapi", specifier = ">=0.115.2,<1.0.0" }, { name = "httpx", specifier = ">=0.27.0" }, + { name = "opentelemetry-exporter-otlp-proto-grpc", specifier = ">=1.0.0" }, + { name = "opentelemetry-exporter-otlp-proto-http", specifier = ">=1.0.0" }, { name = "pipecat-ai", extras = ["websocket", "cartesia", "google", "silero", "deepgram", "groq", "runner", "azure", "local-smart-turn-v3", "sarvam", "tracing"], specifier = "==0.0.103" }, { name = "pydantic", specifier = ">=2.0.0" }, { name = "python-dotenv", specifier = ">=1.1.0,<2.0.0" }, From 059c6101fb3426820f9894c941575ced3655d587 Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Thu, 12 Mar 2026 11:24:57 +0530 Subject: [PATCH 4/8] additional trace attributes --- .../call_processing/controllers/webhook_controller.py | 10 ++++++++++ .../call_processing/services/pipecat_service.py | 8 ++++++++ 2 files changed, 18 insertions(+) diff --git a/wavefront/server/apps/call_processing/call_processing/controllers/webhook_controller.py b/wavefront/server/apps/call_processing/call_processing/controllers/webhook_controller.py index 0a01f319..ff2dbaa2 100644 --- a/wavefront/server/apps/call_processing/call_processing/controllers/webhook_controller.py +++ b/wavefront/server/apps/call_processing/call_processing/controllers/webhook_controller.py @@ -201,6 +201,7 @@ async def websocket_endpoint( body_data = call_data.get('body', {}) voice_agent_id = body_data.get('voice_agent_id') customer_number = body_data.get('customer_number') + agent_number = body_data.get('agent_number', '') elif transport_type == 'exotel': custom_parameters = call_data.get('custom_parameters', {}) logger.info(f'Exotel custom_parameters: {custom_parameters}') @@ -217,6 +218,7 @@ async def websocket_endpoint( continue customer_number = call_data.get('from', '') + agent_number = call_data.get('to', '') else: logger.error(f'Unknown transport type: {transport_type}') await websocket.close( @@ -301,6 +303,10 @@ async def websocket_endpoint( stt_config=configs['stt_config'], tools=configs['tools'], customer_number=customer_number, + call_id=call_data.get('call_id', ''), + agent_number=agent_number, + provider=transport_type, + call_direction='outbound', ) except Exception as e: @@ -447,6 +453,10 @@ async def exotel_inbound_websocket( stt_config=configs['stt_config'], tools=configs['tools'], customer_number=normalized_from_number, + call_id=call_sid, + agent_number=normalized_to_number, + provider=transport_type, + call_direction='inbound', ) except Exception as e: diff --git a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py index 0eb8720a..0cfbedc2 100644 --- a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py +++ b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py @@ -194,6 +194,10 @@ async def run_conversation( stt_config: Dict[str, Any], tools: List[Dict[str, Any]], customer_number: str, + call_id: str, + agent_number: str, + provider: str, + call_direction: str, ): """ Create and run the Pipecat pipeline for a voice conversation @@ -592,6 +596,10 @@ async def run_conversation( 'customer.phone_number': masked_customer_number, 'voice_agent.id': str(agent_id) if agent_id else '', 'voice_agent.name': agent_config.get('name', ''), + 'call.direction': call_direction, + 'call.id': call_id, + 'telephony.provider': provider, + 'telephony.agent_number': agent_number, }, ) From 4d8eaaf0340d8567280e14b72b31cff1b44ceedb Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Mon, 16 Mar 2026 15:10:52 +0530 Subject: [PATCH 5/8] added post call analysis as metrics - added call_evalution_service which analyzes which pushes quantitative (turns count, tool calls count, etc) and qualitative (llm analyzed like summary, goal_completion, etc) --- .../services/call_evaluation_service.py | 279 ++++++++++++++++++ .../services/pipecat_service.py | 86 +++++- 2 files changed, 364 insertions(+), 1 deletion(-) create mode 100644 wavefront/server/apps/call_processing/call_processing/services/call_evaluation_service.py diff --git a/wavefront/server/apps/call_processing/call_processing/services/call_evaluation_service.py b/wavefront/server/apps/call_processing/call_processing/services/call_evaluation_service.py new file mode 100644 index 00000000..8301f04c --- /dev/null +++ b/wavefront/server/apps/call_processing/call_processing/services/call_evaluation_service.py @@ -0,0 +1,279 @@ +""" +Post-call evaluation metrics service + +Emits a `call.evaluation` OTel span with: + - Quantitative metrics: turn counts, interruptions, tool calls, language switches, word counts + - Qualitative LLM analysis: multi-dimensional rubric scoring via Azure OpenAI (optional) + +LLM analysis is best-effort — if Azure config is missing or the call fails, the metrics +span is still emitted with eval.llm_analysis_skipped=True. + +Required env vars for LLM analysis (all must be set to enable): + CALL_EVAL_AZURE_ENDPOINT e.g. https://my-resource.openai.azure.com + CALL_EVAL_AZURE_API_KEY + CALL_EVAL_AZURE_LLM_MODEL (optional, default: gpt-4.1) + CALL_EVAL_AZURE_API_VERSION (optional, default: 2024-02-01) +""" + +import json +import os +from typing import Any, Dict, List, Optional + +import aiohttp +from call_processing.log.logger import logger +from opentelemetry import trace + +tracer = trace.get_tracer(__name__) + +_EVAL_DIMENSIONS = [ + 'goal_completion', + 'instruction_adherence', + 'tone_professionalism', + 'naturalness', + 'conciseness', + 'handling_unknowns', + 'language_quality', +] + + +class CallEvaluationService: + """Emits a post-call OTel span with quantitative metrics and optional LLM analysis.""" + + # ------------------------------------------------------------------ + # Public entry point + # ------------------------------------------------------------------ + + @staticmethod + async def record_call_metrics( + call_id: str, + agent_config: Dict[str, Any], + call_outcome: str, + transcript_log: List[Dict[str, Any]], + stats: Dict[str, Any], + ) -> None: + """ + Record call evaluation metrics as an OTel span. + + Args: + call_id: Unique call identifier + agent_config: Voice agent configuration (id, name, system_prompt, etc.) + call_outcome: "completed" | "cancelled" | "error" | "stopped" + transcript_log: List of {"role", "content", "timestamp"} dicts + stats: Dict with keys: user_turns, assistant_turns, interruption_count, + tool_calls_count, language_switch_count + """ + try: + agent_id = str(agent_config.get('id', '')) + agent_name = agent_config.get('name', '') + + user_turns = stats.get('user_turns', 0) + assistant_turns = stats.get('assistant_turns', 0) + total_turns = user_turns + assistant_turns + + total_words_user = sum( + len(t['content'].split()) + for t in transcript_log + if t.get('role') == 'user' and t.get('content') + ) + total_words_assistant = sum( + len(t['content'].split()) + for t in transcript_log + if t.get('role') == 'assistant' and t.get('content') + ) + + logger.info( + f'Recording call evaluation for {call_id}: outcome={call_outcome}, ' + f'turns={total_turns}, user_words={total_words_user}, ' + f'assistant_words={total_words_assistant}' + ) + + with tracer.start_as_current_span( + 'call.evaluation', + attributes={ + 'call.id': call_id, + 'voice_agent.id': agent_id, + 'voice_agent.name': agent_name, + # --- Call outcome --- + 'call.outcome': call_outcome, + # --- Turn counts --- + 'call.total_turns': total_turns, + 'call.user_turns': user_turns, + 'call.assistant_turns': assistant_turns, + # --- Engagement metrics --- + 'call.interruption_count': stats.get('interruption_count', 0), + 'call.tool_calls_count': stats.get('tool_calls_count', 0), + 'call.language_switch_count': stats.get('language_switch_count', 0), + # --- Transcript volume --- + 'call.transcript_turns': len(transcript_log), + 'call.total_words_user': total_words_user, + 'call.total_words_assistant': total_words_assistant, + }, + ) as span: + # Add one span event per turn for searchable transcript + for entry in transcript_log: + content = entry.get('content', '') + span.add_event( + 'turn', + { + 'role': entry.get('role', ''), + 'content': content, + 'timestamp': entry.get('timestamp', ''), + 'word_count': len(content.split()) if content else 0, + }, + ) + + # --- LLM qualitative analysis (best-effort) --- + azure_config = CallEvaluationService._get_azure_eval_config() + if azure_config and transcript_log: + try: + prompt = CallEvaluationService._build_eval_prompt( + system_prompt=agent_config.get('system_prompt', ''), + transcript_log=transcript_log, + ) + analysis = await CallEvaluationService._call_azure_llm( + prompt, azure_config + ) + CallEvaluationService._apply_analysis_to_span(span, analysis) + logger.info( + f"LLM analysis complete for {call_id}: " + f"overall_rating={analysis.get('overall_rating')}" + ) + except Exception as e: + logger.error( + f'LLM analysis failed for {call_id}: {e}', exc_info=True + ) + span.set_attribute('eval.llm_analysis_skipped', True) + else: + reason = ( + 'no Azure config' if not azure_config else 'empty transcript' + ) + logger.info(f'LLM eval skipped for {call_id}: {reason}') + span.set_attribute('eval.llm_analysis_skipped', True) + + except Exception as e: + logger.error( + f'Error recording call evaluation for {call_id}: {e}', exc_info=True + ) + + # ------------------------------------------------------------------ + # Private helpers + # ------------------------------------------------------------------ + + @staticmethod + def _get_azure_eval_config() -> Optional[Dict[str, str]]: + """Read Azure OpenAI eval config from env vars. Returns None if incomplete.""" + endpoint = os.getenv('CALL_EVAL_AZURE_ENDPOINT', '').rstrip('/') + api_key = os.getenv('CALL_EVAL_AZURE_API_KEY', '') + llm_model = os.getenv('CALL_EVAL_AZURE_LLM_MODEL', 'gpt-4.1') + api_version = os.getenv('CALL_EVAL_AZURE_API_VERSION', '2025-01-01-preview') + + if not all([endpoint, api_key]): + return None + + return { + 'endpoint': endpoint, + 'api_key': api_key, + 'llm_model': llm_model, + 'api_version': api_version, + } + + @staticmethod + def _build_eval_prompt( + system_prompt: str, transcript_log: List[Dict[str, Any]] + ) -> str: + """Build the evaluation prompt with rubric and transcript.""" + transcript_text = '\n'.join( + f"{entry['role'].upper()}: {entry.get('content', '')}" + for entry in transcript_log + if entry.get('content', '').strip() + ) + + return f"""You are an expert AI quality assurance evaluator for voice agent conversations. +Evaluate the transcript below against the agent's configured objective. + +## Agent Objective (System Prompt) +{system_prompt} + +## Transcript +{transcript_text} + +## Evaluation Rubric +Score each dimension from 1 (very poor) to 10 (excellent). Include a brief comment (1-2 sentences). + +Dimensions: +- goal_completion: Did the agent achieve the conversation's objective as defined in the system prompt? +- instruction_adherence: Did the agent follow all rules, restrictions, persona and format instructions in the system prompt? +- tone_professionalism: Was the tone warm, professional and appropriate to the context? +- naturalness: Did the conversation flow naturally, avoiding robotic, repetitive or scripted-sounding phrasing? +- conciseness: Were responses appropriately brief without sacrificing clarity? Penalise over-verbose responses. +- handling_unknowns: Did the agent gracefully handle questions outside its scope — avoiding hallucination and staying in persona? +- language_quality: Clarity, grammar and vocabulary appropriateness. For multi-language calls, assess the switched language too. + +## Output Format +Respond ONLY with a valid JSON object in this exact structure: +{{ + "overall_rating": , + "summary": "<2-3 sentence summary of the conversation>", + "dimensions": {{ + "goal_completion": {{"score": , "comment": ""}}, + "instruction_adherence": {{"score": , "comment": ""}}, + "tone_professionalism": {{"score": , "comment": ""}}, + "naturalness": {{"score": , "comment": ""}}, + "conciseness": {{"score": , "comment": ""}}, + "handling_unknowns": {{"score": , "comment": ""}}, + "language_quality": {{"score": , "comment": ""}} + }}, + "strengths": ["", ...], + "improvement_areas": ["", ...] +}}""" + + @staticmethod + async def _call_azure_llm(prompt: str, config: Dict[str, str]) -> Dict[str, Any]: + """POST to Azure OpenAI and return parsed JSON result.""" + url = ( + f"{config['endpoint']}/openai/deployments/{config['llm_model']}" + f"/chat/completions?api-version={config['api_version']}" + ) + headers = { + 'api-key': config['api_key'], + 'Content-Type': 'application/json', + } + payload = { + 'messages': [{'role': 'user', 'content': prompt}], + 'temperature': 0.1, + 'response_format': {'type': 'json_object'}, + } + + async with aiohttp.ClientSession() as session: + async with session.post(url, headers=headers, json=payload) as resp: + if resp.status != 200: + body = await resp.text() + raise RuntimeError( + f'Azure LLM returned {resp.status}: {body[:200]}' + ) + data = await resp.json() + content = data['choices'][0]['message']['content'] + return json.loads(content) + + @staticmethod + def _apply_analysis_to_span(span: Any, analysis: Dict[str, Any]) -> None: + """Write LLM analysis result as OTel span attributes.""" + span.set_attribute('eval.llm_analysis_skipped', False) + span.set_attribute( + 'eval.overall_rating', int(analysis.get('overall_rating', 0)) + ) + span.set_attribute('eval.summary', str(analysis.get('summary', ''))) + + dimensions = analysis.get('dimensions', {}) + for dim in _EVAL_DIMENSIONS: + dim_data = dimensions.get(dim, {}) + span.set_attribute(f'eval.{dim}', int(dim_data.get('score', 0))) + span.set_attribute(f'eval.{dim}_comment', str(dim_data.get('comment', ''))) + + span.set_attribute( + 'eval.strengths', [str(s) for s in analysis.get('strengths', [])] + ) + span.set_attribute( + 'eval.improvement_areas', + [str(a) for a in analysis.get('improvement_areas', [])], + ) diff --git a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py index 0cfbedc2..93e582ab 100644 --- a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py +++ b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py @@ -6,9 +6,11 @@ from typing import Dict, Any, List from copy import deepcopy +import asyncio import os import random from call_processing.log.logger import logger +from call_processing.services.call_evaluation_service import CallEvaluationService from call_processing.services.tool_wrapper_service import ToolWrapperFactory from call_processing.utils import get_current_ist_time_str @@ -18,15 +20,24 @@ from pipecat.adapters.schemas.tools_schema import ToolsSchema from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 -from pipecat.frames.frames import Frame, TTSSpeakFrame +from pipecat.frames.frames import ( + CancelFrame, + EndFrame, + ErrorFrame, + Frame, + StopFrame, + TTSSpeakFrame, +) from pipecat.pipeline.parallel_pipeline import ParallelPipeline from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( + AssistantTurnStoppedMessage, LLMContextAggregatorPair, LLMUserAggregatorParams, + UserTurnStoppedMessage, ) from pipecat.processors.filters.function_filter import FunctionFilter from pipecat.processors.transcript_processor import ( @@ -555,6 +566,53 @@ async def run_conversation( # Create transcript processor for language detection transcript = TranscriptProcessor() + # --- Call evaluation: transcript log and stats --- + transcript_log: List[Dict[str, Any]] = [] + call_stats: Dict[str, Any] = { + 'user_turns': 0, + 'assistant_turns': 0, + 'interruption_count': 0, + 'tool_calls_count': 0, + 'language_switch_count': 0, + '_bot_speaking': False, + } + + @context_aggregator.user().event_handler('on_user_turn_started') + async def on_user_turn_started(aggregator, strategy): + if call_stats['_bot_speaking']: + call_stats['interruption_count'] += 1 + + @context_aggregator.user().event_handler('on_user_turn_stopped') + async def on_user_turn_stopped( + aggregator, strategy, message: UserTurnStoppedMessage + ): + call_stats['user_turns'] += 1 + transcript_log.append( + { + 'role': 'user', + 'content': message.content, + 'timestamp': message.timestamp, + } + ) + + @context_aggregator.assistant().event_handler('on_assistant_turn_started') + async def on_assistant_turn_started(aggregator): + call_stats['_bot_speaking'] = True + + @context_aggregator.assistant().event_handler('on_assistant_turn_stopped') + async def on_assistant_turn_stopped( + aggregator, message: AssistantTurnStoppedMessage + ): + call_stats['_bot_speaking'] = False + call_stats['assistant_turns'] += 1 + transcript_log.append( + { + 'role': 'assistant', + 'content': message.content, + 'timestamp': message.timestamp, + } + ) + # Build pipeline components list pipeline_components = [ transport.input(), # Audio input from Twilio @@ -619,6 +677,8 @@ async def on_function_calls_started(service, function_calls): call_names = [fc.function_name for fc in function_calls] if 'detect_and_switch_language' in call_names: return + # Count non-language-switch tool invocations + call_stats['tool_calls_count'] += len(function_calls) current_lang = language_state.get('current_language', 'en') phrases = FILLER_PHRASES.get(current_lang) if not phrases: @@ -626,6 +686,30 @@ async def on_function_calls_started(service, function_calls): phrase = random.choice(phrases) await task.queue_frame(TTSSpeakFrame(phrase)) + @task.event_handler('on_pipeline_finished') + async def on_pipeline_finished(task, frame): + if isinstance(frame, EndFrame): + outcome = 'completed' + elif isinstance(frame, CancelFrame): + outcome = 'cancelled' + elif isinstance(frame, ErrorFrame): + outcome = 'error' + elif isinstance(frame, StopFrame): + outcome = 'stopped' + else: + outcome = 'unknown' + # Pull language switch count from language_state (already tracked there) + call_stats['language_switch_count'] = language_state.get('switch_count', 0) + asyncio.create_task( + CallEvaluationService.record_call_metrics( + call_id=call_id, + agent_config=agent_config, + call_outcome=outcome, + transcript_log=transcript_log, + stats=call_stats, + ) + ) + @transport.event_handler('on_client_connected') async def on_client_connected(transport, client): logger.info(f"Client connected for agent: {agent_config['name']}") From b3c1228a5d8f46d0ef215f5da6e775edcd92f6af Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Mon, 16 Mar 2026 15:48:34 +0530 Subject: [PATCH 6/8] resolved review comments --- .../services/call_evaluation_service.py | 3 ++- .../services/pipecat_service.py | 21 ++++++++++++------- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/wavefront/server/apps/call_processing/call_processing/services/call_evaluation_service.py b/wavefront/server/apps/call_processing/call_processing/services/call_evaluation_service.py index 8301f04c..0a96f3d8 100644 --- a/wavefront/server/apps/call_processing/call_processing/services/call_evaluation_service.py +++ b/wavefront/server/apps/call_processing/call_processing/services/call_evaluation_service.py @@ -244,7 +244,8 @@ async def _call_azure_llm(prompt: str, config: Dict[str, str]) -> Dict[str, Any] 'response_format': {'type': 'json_object'}, } - async with aiohttp.ClientSession() as session: + timeout = aiohttp.ClientTimeout(total=40, connect=8, sock_read=32) + async with aiohttp.ClientSession(timeout=timeout) as session: async with session.post(url, headers=headers, json=payload) as resp: if resp.status != 200: body = await resp.text() diff --git a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py index 93e582ab..5c3e4bcc 100644 --- a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py +++ b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py @@ -568,6 +568,7 @@ async def run_conversation( # --- Call evaluation: transcript log and stats --- transcript_log: List[Dict[str, Any]] = [] + call_evaluation_tasks: List[asyncio.Task] = [] call_stats: Dict[str, Any] = { 'user_turns': 0, 'assistant_turns': 0, @@ -700,15 +701,17 @@ async def on_pipeline_finished(task, frame): outcome = 'unknown' # Pull language switch count from language_state (already tracked there) call_stats['language_switch_count'] = language_state.get('switch_count', 0) - asyncio.create_task( - CallEvaluationService.record_call_metrics( - call_id=call_id, - agent_config=agent_config, - call_outcome=outcome, - transcript_log=transcript_log, - stats=call_stats, + if ENABLE_TRACING and OTLP_ENDPOINT: + t = asyncio.create_task( + CallEvaluationService.record_call_metrics( + call_id=call_id, + agent_config=agent_config, + call_outcome=outcome, + transcript_log=transcript_log, + stats=call_stats, + ) ) - ) + call_evaluation_tasks.append(t) @transport.event_handler('on_client_connected') async def on_client_connected(transport, client): @@ -732,4 +735,6 @@ async def on_client_disconnected(transport, client): raise finally: await task.cancel() + if call_evaluation_tasks: + await asyncio.gather(*call_evaluation_tasks, return_exceptions=True) logger.info(f"Conversation ended for agent: {agent_config['name']}") From 68b558e894c2c95e6c8fb82634d932c735f05e5a Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Mon, 16 Mar 2026 16:29:04 +0530 Subject: [PATCH 7/8] passing otel parent context to call_evaluation_service - also removed content from turn span to avoid PII in metrics --- .../call_processing/services/call_evaluation_service.py | 7 ++++--- .../call_processing/services/pipecat_service.py | 6 ++++++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/wavefront/server/apps/call_processing/call_processing/services/call_evaluation_service.py b/wavefront/server/apps/call_processing/call_processing/services/call_evaluation_service.py index 0a96f3d8..7748ba35 100644 --- a/wavefront/server/apps/call_processing/call_processing/services/call_evaluation_service.py +++ b/wavefront/server/apps/call_processing/call_processing/services/call_evaluation_service.py @@ -21,7 +21,7 @@ import aiohttp from call_processing.log.logger import logger -from opentelemetry import trace +from opentelemetry import context as otel_context, trace tracer = trace.get_tracer(__name__) @@ -50,6 +50,7 @@ async def record_call_metrics( call_outcome: str, transcript_log: List[Dict[str, Any]], stats: Dict[str, Any], + parent_context: Optional[otel_context.Context] = None, ) -> None: """ Record call evaluation metrics as an OTel span. @@ -89,6 +90,7 @@ async def record_call_metrics( with tracer.start_as_current_span( 'call.evaluation', + context=parent_context, attributes={ 'call.id': call_id, 'voice_agent.id': agent_id, @@ -109,14 +111,13 @@ async def record_call_metrics( 'call.total_words_assistant': total_words_assistant, }, ) as span: - # Add one span event per turn for searchable transcript + # Add one span event per turn — no raw content to avoid PII in OTel for entry in transcript_log: content = entry.get('content', '') span.add_event( 'turn', { 'role': entry.get('role', ''), - 'content': content, 'timestamp': entry.get('timestamp', ''), 'word_count': len(content.split()) if content else 0, }, diff --git a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py index 5c3e4bcc..56a276b5 100644 --- a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py +++ b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py @@ -15,6 +15,7 @@ from call_processing.utils import get_current_ist_time_str # Pipecat core imports +from opentelemetry import context as otel_context from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from pipecat.utils.tracing.setup import setup_tracing from pipecat.adapters.schemas.tools_schema import ToolsSchema @@ -702,6 +703,10 @@ async def on_pipeline_finished(task, frame): # Pull language switch count from language_state (already tracked there) call_stats['language_switch_count'] = language_state.get('switch_count', 0) if ENABLE_TRACING and OTLP_ENDPOINT: + # Capture the current OTel context now, while the pipecat span is still + # active. The background task will use this as the parent so call.evaluation + # appears under the same trace rather than as a new root trace. + parent_ctx = otel_context.get_current() t = asyncio.create_task( CallEvaluationService.record_call_metrics( call_id=call_id, @@ -709,6 +714,7 @@ async def on_pipeline_finished(task, frame): call_outcome=outcome, transcript_log=transcript_log, stats=call_stats, + parent_context=parent_ctx, ) ) call_evaluation_tasks.append(t) From 3d6a1d498bd7fff388e5fdfb6cd9a2fb3130d9e3 Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Tue, 17 Mar 2026 18:18:20 +0530 Subject: [PATCH 8/8] modified call evaluation prompt --- .../services/call_evaluation_service.py | 91 +++++++++++++------ 1 file changed, 63 insertions(+), 28 deletions(-) diff --git a/wavefront/server/apps/call_processing/call_processing/services/call_evaluation_service.py b/wavefront/server/apps/call_processing/call_processing/services/call_evaluation_service.py index 7748ba35..499839b9 100644 --- a/wavefront/server/apps/call_processing/call_processing/services/call_evaluation_service.py +++ b/wavefront/server/apps/call_processing/call_processing/services/call_evaluation_service.py @@ -28,11 +28,13 @@ _EVAL_DIMENSIONS = [ 'goal_completion', 'instruction_adherence', - 'tone_professionalism', - 'naturalness', - 'conciseness', - 'handling_unknowns', - 'language_quality', + 'conversation_quality', + 'response_efficiency', + 'language_handling', + 'turn_management', + 'factual_accuracy', + 'voice_delivery_quality', + 'compliance_safety', ] @@ -191,6 +193,7 @@ def _build_eval_prompt( return f"""You are an expert AI quality assurance evaluator for voice agent conversations. Evaluate the transcript below against the agent's configured objective. +The voice agent may operate in multiple languages including English and regional languages (such as Hindi, Tamil, Malayalam, Kannada, Telugu, Bengali, Marathi, or code-mixed forms like Hinglish or Tanglish). ## Agent Objective (System Prompt) {system_prompt} @@ -198,34 +201,60 @@ def _build_eval_prompt( ## Transcript {transcript_text} -## Evaluation Rubric -Score each dimension from 1 (very poor) to 10 (excellent). Include a brief comment (1-2 sentences). +## Evaluation Process +Before assigning scores, perform the following reasoning internally: +1. Identify the agent's primary objective from the system prompt. +2. Determine whether the conversation successfully progressed toward that objective. +3. Identify any major failures, including: + hallucinated or fabricated information + compliance or policy violations + truncated responses or speech likely to break in TTS + incorrect language usage or unnatural translation + poor turn-taking or interruptions in the conversation +4. Check whether the agent stayed within its allowed scope and handled unknown questions safely. +5. Consider whether the responses would sound natural and complete when spoken aloud. +After completing this reasoning, assign scores for each evaluation dimension. +Do NOT output your reasoning. Only output the final JSON result. -Dimensions: -- goal_completion: Did the agent achieve the conversation's objective as defined in the system prompt? -- instruction_adherence: Did the agent follow all rules, restrictions, persona and format instructions in the system prompt? -- tone_professionalism: Was the tone warm, professional and appropriate to the context? -- naturalness: Did the conversation flow naturally, avoiding robotic, repetitive or scripted-sounding phrasing? -- conciseness: Were responses appropriately brief without sacrificing clarity? Penalise over-verbose responses. -- handling_unknowns: Did the agent gracefully handle questions outside its scope — avoiding hallucination and staying in persona? -- language_quality: Clarity, grammar and vocabulary appropriateness. For multi-language calls, assess the switched language too. +## Evaluation Rubric +Score each dimension from 1 (very poor) to 10 (excellent). +Scoring guidance: +1-3 = major failure +4-5 = below expectations +6-7 = acceptable but flawed +8-9 = strong performance +10 = near perfect execution + +## Dimensions +- goal_completion: Did the agent successfully achieve the objective defined in the system prompt? +- instruction_adherence: Did the agent follow all system prompt rules, restrictions, persona guidelines, and behavioural instructions? +- conversation_quality: Did the conversation flow naturally with a professional, clear, and human-like tone suitable for spoken interaction? +- response_efficiency: Were responses concise, relevant, and free from unnecessary repetition or verbosity? +- language_handling: Did the agent correctly understand and respond in the user's language with fluency and clarity, including appropriate grammar, vocabulary, and switching between languages such as English, Hindi, Tamil, Malayalam, Kannada, Telugu, Bengali, Marathi, or mixed forms like Hinglish? +- turn_management: Did the agent manage conversation turns effectively without interrupting the user, missing responses, causing awkward pauses, or breaking conversational flow? +- factual_accuracy: Did the agent avoid hallucinating or fabricating information and stay within its knowledge boundaries? +- voice_delivery_quality: Were responses suitable for spoken delivery without truncation, abrupt endings, incomplete words, or formatting that could break TTS playback? +- compliance_safety: Did the agent avoid sharing restricted information, violating policies, or creating regulatory risks? ## Output Format Respond ONLY with a valid JSON object in this exact structure: {{ - "overall_rating": , - "summary": "<2-3 sentence summary of the conversation>", + "overall_rating": , + "detected_languages": ["", "..."], "dimensions": {{ - "goal_completion": {{"score": , "comment": ""}}, - "instruction_adherence": {{"score": , "comment": ""}}, - "tone_professionalism": {{"score": , "comment": ""}}, - "naturalness": {{"score": , "comment": ""}}, - "conciseness": {{"score": , "comment": ""}}, - "handling_unknowns": {{"score": , "comment": ""}}, - "language_quality": {{"score": , "comment": ""}} + "goal_completion": {{"score": }}, + "instruction_adherence": {{"score": }}, + "conversation_quality": {{"score": }}, + "response_efficiency": {{"score": }}, + "language_handling": {{"score": }}, + "turn_management": {{"score": }}, + "factual_accuracy": {{"score": }}, + "voice_delivery_quality": {{"score": }}, + "compliance_safety": {{"score": }} }}, - "strengths": ["", ...], - "improvement_areas": ["", ...] + "strengths": ["", "..."], + "improvement_areas": ["", "..."], + "failure_tags": [""] }}""" @staticmethod @@ -264,13 +293,15 @@ def _apply_analysis_to_span(span: Any, analysis: Dict[str, Any]) -> None: span.set_attribute( 'eval.overall_rating', int(analysis.get('overall_rating', 0)) ) - span.set_attribute('eval.summary', str(analysis.get('summary', ''))) + span.set_attribute( + 'eval.detected_languages', + [str(lang) for lang in analysis.get('detected_languages', [])], + ) dimensions = analysis.get('dimensions', {}) for dim in _EVAL_DIMENSIONS: dim_data = dimensions.get(dim, {}) span.set_attribute(f'eval.{dim}', int(dim_data.get('score', 0))) - span.set_attribute(f'eval.{dim}_comment', str(dim_data.get('comment', ''))) span.set_attribute( 'eval.strengths', [str(s) for s in analysis.get('strengths', [])] @@ -279,3 +310,7 @@ def _apply_analysis_to_span(span: Any, analysis: Dict[str, Any]) -> None: 'eval.improvement_areas', [str(a) for a in analysis.get('improvement_areas', [])], ) + span.set_attribute( + 'eval.failure_tags', + [str(t) for t in analysis.get('failure_tags', [])], + )