## 💡 Proposed PraisonAI Agents Implementation

### 1. Granular Token Tracking
```python
from dataclasses import dataclass
from praisonaiagents import Agent, Process

# New metrics classes
@dataclass
class TokenMetrics:
    """Comprehensive token tracking"""
    input_tokens: int = 0
    output_tokens: int = 0
    total_tokens: int = 0

    # Special tokens
    audio_tokens: int = 0
    input_audio_tokens: int = 0
    output_audio_tokens: int = 0
    cached_tokens: int = 0
    cache_write_tokens: int = 0
    reasoning_tokens: int = 0

    def __add__(self, other):
        """Enable metric aggregation"""
        return TokenMetrics(
            input_tokens=self.input_tokens + other.input_tokens,
            output_tokens=self.output_tokens + other.output_tokens,
            # ... aggregate all fields
        )
```
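The `# ... aggregate all fields` step could be implemented without listing every counter by iterating over `dataclasses.fields`. A minimal sketch, reusing the `TokenMetrics` dataclass defined above (the helper name is illustrative, not existing praisonaiagents code):

```python
from dataclasses import fields

def add_token_metrics(a: "TokenMetrics", b: "TokenMetrics") -> "TokenMetrics":
    """Field-wise sum of two TokenMetrics instances (usable as the body of __add__)."""
    return TokenMetrics(**{
        f.name: getattr(a, f.name) + getattr(b, f.name)
        for f in fields(a)
    })
```

Any counter added to the dataclass later would then be aggregated without touching the aggregation logic.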
```python
# Client usage
agent = Agent(
    name="Assistant",
    model="gpt-4o",
    track_metrics=True  # Enable detailed metrics
)

# Automatic token tracking
response = agent.chat("Analyze this audio file", audio_file="speech.mp3")

# Access detailed metrics
print(f"Text tokens: {response.metrics.input_tokens}")
print(f"Audio tokens: {response.metrics.audio_tokens}")
print(f"Cached tokens: {response.metrics.cached_tokens}")

# Session-level aggregation
process = Process(agents=[agent])
result = process.run()
print(f"Total session tokens: {result.metrics.total_tokens}")
```
### 2. Time-to-First-Token (TTFT) Tracking
```python
from praisonaiagents import Agent
from praisonaiagents.metrics import PerformanceMetrics

# TTFT automatically tracked for streaming responses
agent = Agent(
    name="StreamingBot",
    model="claude-3",
    stream=True,
    track_performance=True
)

# Stream with TTFT tracking
for chunk in agent.stream("Generate a poem"):
    print(chunk, end="")

# Access performance metrics
metrics = agent.last_metrics
print(f"\nTime to first token: {metrics.time_to_first_token:.3f}s")
print(f"Total generation time: {metrics.total_time:.3f}s")
print(f"Tokens per second: {metrics.tokens_per_second:.1f}")
```
### 3. Session-Level Metric Aggregation
```python
from praisonaiagents import Agent, Task, Process
from praisonaiagents.metrics import MetricsCollector

# Automatic session aggregation
collector = MetricsCollector()

agent1 = Agent(name="Researcher", metrics_collector=collector)
agent2 = Agent(name="Writer", metrics_collector=collector)

process = Process(
    agents=[agent1, agent2],
    metrics_collector=collector
)
result = process.run()

# Get aggregated metrics
session_metrics = collector.get_session_metrics()
print(f"Total tokens used: {session_metrics.total_tokens}")
print("By agent:")
for agent_name, metrics in session_metrics.by_agent.items():
    print(f"  {agent_name}: {metrics.total_tokens} tokens")

# Export metrics
collector.export_metrics("metrics.json")
```
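A minimal sketch of what the collector could look like internally, reusing the `TokenMetrics` dataclass from section 1 (method names such as `record` are assumptions, not the library's API):

```python
import json
from collections import defaultdict
from dataclasses import asdict
from typing import Dict

class MetricsCollector:
    """Aggregates TokenMetrics per agent and for the whole session."""

    def __init__(self) -> None:
        self.by_agent: Dict[str, "TokenMetrics"] = defaultdict(TokenMetrics)

    def record(self, agent_name: str, metrics: "TokenMetrics") -> None:
        # Relies on TokenMetrics.__add__ for field-wise aggregation
        self.by_agent[agent_name] = self.by_agent[agent_name] + metrics

    @property
    def total_tokens(self) -> int:
        return sum(m.total_tokens for m in self.by_agent.values())

    def export_metrics(self, path: str) -> None:
        with open(path, "w") as fh:
            json.dump({name: asdict(m) for name, m in self.by_agent.items()}, fh, indent=2)
```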
### 4. Enhanced Telemetry Integration
```python
from praisonaiagents import Agent
from praisonaiagents.telemetry import TelemetryConfig, MetricsExporter

# Configure telemetry
config = TelemetryConfig(
    # Privacy-first defaults
    track_tokens=True,
    track_performance=True,
    track_errors=True,

    # Optional integrations
    export_to_opentelemetry=True,
    export_to_prometheus=False,

    # Control via environment
    respect_env_vars=True  # PRAISONAI_TELEMETRY_DISABLED
)

# Option 1: Global configuration
Agent.configure_telemetry(config)

# Option 2: Per-agent configuration
agent = Agent(
    name="Assistant",
    telemetry_config=config
)

# Option 3: Custom exporters
class CustomExporter(MetricsExporter):
    def export(self, metrics):
        # Send to your monitoring system
        send_to_datadog(metrics)

agent = Agent(
    name="Assistant",
    metrics_exporters=[CustomExporter()]
)

# Real-time metrics access
with agent.track_metrics() as tracker:
    response = agent.chat("Complex task")
    # Mid-execution metrics
    print(f"Current tokens: {tracker.current_tokens}")
    print(f"Elapsed time: {tracker.elapsed_time}")
```
## 📋 Implementation Recommendations

### 1. Phased Rollout
**Phase 1: Core Token Metrics**

```python
# Add to praisonaiagents/metrics.py
class TokenMetrics:
    """Basic token tracking"""

class PerformanceMetrics:
    """Time tracking, including TTFT"""
```
**Phase 2: Provider Integration**

```python
# Update each LLM provider to extract metrics from its raw response
def _extract_openai_metrics(response):
    usage = response.usage
    # cached_tokens is nested under prompt_tokens_details in newer OpenAI responses
    details = getattr(usage, "prompt_tokens_details", None)
    return TokenMetrics(
        input_tokens=usage.prompt_tokens,
        output_tokens=usage.completion_tokens,
        cached_tokens=getattr(details, "cached_tokens", 0) or 0
    )
```
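An analogous extractor for Anthropic responses might look like the following sketch; the prompt-caching counters depend on the SDK version and on whether caching was requested:

```python
def _extract_anthropic_metrics(response):
    # Anthropic's Messages API reports usage.input_tokens / usage.output_tokens;
    # cache counters are only present when prompt caching is in play
    usage = response.usage
    return TokenMetrics(
        input_tokens=usage.input_tokens,
        output_tokens=usage.output_tokens,
        cached_tokens=getattr(usage, "cache_read_input_tokens", 0) or 0,
        cache_write_tokens=getattr(usage, "cache_creation_input_tokens", 0) or 0,
    )
```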
**Phase 3: Aggregation & Export**

```python
# Add session-level aggregation
class MetricsCollector:
    def aggregate_by_agent(self): ...
    def aggregate_by_task(self): ...
    def export_to_json(self): ...
    def export_to_opentelemetry(self): ...
```
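For `export_to_opentelemetry`, one option is to publish counters through the standard OpenTelemetry metrics API. A sketch, assuming `opentelemetry-api`/`opentelemetry-sdk` are installed and a `MeterProvider` is configured; the metric name `praisonai.tokens.total` is an assumption:

```python
from opentelemetry import metrics as otel_metrics

_meter = otel_metrics.get_meter("praisonaiagents")
_token_counter = _meter.create_counter(
    "praisonai.tokens.total", unit="token", description="Total tokens per agent run"
)

def export_to_opentelemetry(agent_name: str, token_metrics: "TokenMetrics") -> None:
    # Counter increments are tagged with the agent name for per-agent dashboards
    _token_counter.add(token_metrics.total_tokens, attributes={"agent": agent_name})
```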
### 2. Integration with Existing Telemetry
```python
# Extend the existing MinimalTelemetry
class EnhancedTelemetry(MinimalTelemetry):
    def track_tokens(self, metrics: TokenMetrics):
        self._post_event("tokens_used", {
            "total": metrics.total_tokens,
            "cached": metrics.cached_tokens,
            "reasoning": metrics.reasoning_tokens
        })

    def track_performance(self, metrics: PerformanceMetrics):
        self._post_event("performance", {
            "ttft": metrics.time_to_first_token,
            "total_time": metrics.total_time,
            "tokens_per_second": metrics.tokens_per_second
        })
```
### 3. Backward Compatibility
```python
# Maintain the simple API while adding advanced features
agent = Agent(name="Bot")  # Works as before

# Opt in to enhanced monitoring
agent = Agent(
    name="Bot",
    track_metrics=True  # New feature
)
```
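Keeping the flag off by default is what preserves compatibility; a simplified sketch of the idea (not the real `Agent.__init__`):

```python
class Agent:
    def __init__(self, name: str, track_metrics: bool = False, **kwargs):
        self.name = name
        # Metrics machinery is only created when explicitly requested,
        # so existing callers see identical behavior and overhead
        self.metrics = TokenMetrics() if track_metrics else None
```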