From 2706e393a3d5ed11e33bad962ce35c177018bf7e Mon Sep 17 00:00:00 2001 From: Jonathan Conway Date: Tue, 17 Feb 2026 18:11:42 +0000 Subject: [PATCH] feat: add RAG pipeline analytics endpoint and dashboard tab Add /v1/llm/rag endpoint with DuckDB queries over rag.* metadata on retrieval spans. Dashboard gets a RAG tab with stat cards (total retrievals, avg relevance, context utilization), source table with latency percentiles, and relevance distribution. Trace detail view shows RAG metadata inline on retrieval spans. - Migration 021: span_type index for efficient retrieval span filtering - 3 DuckDB SQL queries: overview, hourly metrics, relevance summary - Uses json_extract with quoted key notation for dot-containing keys - 3 new integration tests + E2E seed data and spec updates --- migrations/021_llm_rag_index.sql | 3 + src/dashboard.html | 120 ++++++++++ src/llm_tracing/query.rs | 85 +++++++ src/llm_tracing/query_handler.rs | 91 ++++++++ src/llm_tracing/types.rs | 49 ++++ src/main.rs | 1 + src/storage/migrations.rs | 2 + tests/e2e/llm-dashboard.spec.ts | 18 +- tests/e2e/seed.ts | 43 +++- tests/llm_tracing_test.rs | 374 +++++++++++++++++++++++++++++++ 10 files changed, 782 insertions(+), 4 deletions(-) create mode 100644 migrations/021_llm_rag_index.sql diff --git a/migrations/021_llm_rag_index.sql b/migrations/021_llm_rag_index.sql new file mode 100644 index 0000000..bd39b56 --- /dev/null +++ b/migrations/021_llm_rag_index.sql @@ -0,0 +1,3 @@ +-- Speed up RAG / span-type analytics queries +CREATE INDEX IF NOT EXISTS idx_llm_spans_type + ON llm_spans(project_id, span_type, started_at DESC); diff --git a/src/dashboard.html b/src/dashboard.html index 08a797b..6c3578f 100644 --- a/src/dashboard.html +++ b/src/dashboard.html @@ -1345,6 +1345,7 @@

LLM Tracing

+
Select a tab to view LLM data
@@ -3164,6 +3165,7 @@

search: loadLlmSearch, prompts: loadLlmPrompts, scores: loadLlmScores, + rag: loadLlmRag, }; if (loaders[tab]) loaders[tab](); } @@ -3429,6 +3431,21 @@

s.time_to_first_token_ms ? el('span', {}, 'TTFT: ' + s.time_to_first_token_ms + 'ms') : null, ].filter(Boolean)), ]); + // RAG metadata for retrieval spans + if (s.span_type === 'retrieval' && s.metadata) { + const m = s.metadata; + const hasRag = m['rag.chunks_retrieved'] != null || m['rag.source'] != null; + if (hasRag) { + const ragRow = el('div', { style: 'display:flex;gap:12px;font-size:10px;color:var(--text-dim);margin-top:6px;padding-top:6px;border-top:1px solid var(--border-subtle);' }, [ + m['rag.source'] ? el('span', {}, 'Source: ' + m['rag.source']) : null, + m['rag.chunks_used'] != null && m['rag.chunks_retrieved'] != null ? el('span', {}, 'Chunks: ' + m['rag.chunks_used'] + '/' + m['rag.chunks_retrieved']) : null, + m['rag.context_tokens'] != null ? el('span', {}, 'Context: ' + m['rag.context_tokens'] + ' tokens') : null, + m['rag.max_context_tokens'] != null && m['rag.context_tokens'] != null ? el('span', {}, 'Utilization: ' + ((m['rag.context_tokens'] / m['rag.max_context_tokens']) * 100).toFixed(1) + '%') : null, + m['rag.relevance_scores'] && m['rag.relevance_scores'].length ? el('span', {}, 'Top relevance: ' + m['rag.relevance_scores'][0].toFixed(3)) : null, + ].filter(Boolean)); + card.appendChild(ragRow); + } + } content.appendChild(card); }); } @@ -3542,6 +3559,109 @@

} } +// ── LLM RAG Tab ── +async function loadLlmRag() { + const content = document.getElementById('llmContent'); + try { + const pp = projectParam(); + const r = await authFetch(API + '/v1/llm/rag?hours=24' + (pp ? '&' + pp : '')); + if (!r) return; + const d = await r.json(); + content.innerHTML = ''; + + // Stat cards + const totalRetrievals = d.relevance ? d.relevance.total_retrievals : 0; + const avgTopRelevance = d.relevance ? d.relevance.avg_top_relevance : 0; + const avgChunksRetrieved = d.relevance ? d.relevance.avg_chunks_retrieved : 0; + const chunkUtil = d.relevance ? d.relevance.chunk_utilization_pct : 0; + + if (totalRetrievals === 0) { + content.innerHTML = '
No RAG data in the last ' + d.window_hours + ' hours
'; + return; + } + + const cards = el('div', { style: 'display:grid;grid-template-columns:repeat(4,1fr);gap:8px;margin-bottom:16px;' }, [ + el('div', { style: 'background:var(--surface-raised);border:1px solid var(--border);border-radius:8px;padding:12px;text-align:center;' }, [ + el('div', { style: 'font-size:18px;font-weight:700;color:var(--text);' }, String(totalRetrievals)), + el('div', { style: 'font-size:10px;color:var(--text-dim);margin-top:2px;' }, 'Total Retrievals'), + ]), + el('div', { style: 'background:var(--surface-raised);border:1px solid var(--border);border-radius:8px;padding:12px;text-align:center;' }, [ + el('div', { style: 'font-size:18px;font-weight:700;color:var(--text);' }, avgTopRelevance.toFixed(3)), + el('div', { style: 'font-size:10px;color:var(--text-dim);margin-top:2px;' }, 'Avg Top Relevance'), + ]), + el('div', { style: 'background:var(--surface-raised);border:1px solid var(--border);border-radius:8px;padding:12px;text-align:center;' }, [ + el('div', { style: 'font-size:18px;font-weight:700;color:var(--text);' }, avgChunksRetrieved.toFixed(1)), + el('div', { style: 'font-size:10px;color:var(--text-dim);margin-top:2px;' }, 'Avg Chunks Retrieved'), + ]), + el('div', { style: 'background:var(--surface-raised);border:1px solid var(--border);border-radius:8px;padding:12px;text-align:center;' }, [ + el('div', { style: 'font-size:18px;font-weight:700;color:' + (chunkUtil > 80 ? 'var(--red-text)' : 'var(--text)') + ';' }, chunkUtil.toFixed(1) + '%'), + el('div', { style: 'font-size:10px;color:var(--text-dim);margin-top:2px;' }, 'Chunk Utilization'), + ]), + ]); + content.appendChild(cards); + + // Source table + if (d.sources && d.sources.length) { + const table = el('table', { style: 'width:100%;border-collapse:collapse;font-size:11px;' }); + const thead = el('tr', {}, [ + el('th', { style: 'text-align:left;padding:6px 8px;color:var(--text-dim);font-weight:500;border-bottom:1px solid var(--border);' }, 'Name'), + el('th', { style: 'text-align:left;padding:6px 8px;color:var(--text-dim);font-weight:500;border-bottom:1px solid var(--border);' }, 'Source'), + el('th', { style: 'text-align:right;padding:6px 8px;color:var(--text-dim);font-weight:500;border-bottom:1px solid var(--border);' }, 'Calls'), + el('th', { style: 'text-align:right;padding:6px 8px;color:var(--text-dim);font-weight:500;border-bottom:1px solid var(--border);' }, 'Avg Latency'), + el('th', { style: 'text-align:right;padding:6px 8px;color:var(--text-dim);font-weight:500;border-bottom:1px solid var(--border);' }, 'P95'), + el('th', { style: 'text-align:right;padding:6px 8px;color:var(--text-dim);font-weight:500;border-bottom:1px solid var(--border);' }, 'Chunks'), + el('th', { style: 'text-align:right;padding:6px 8px;color:var(--text-dim);font-weight:500;border-bottom:1px solid var(--border);' }, 'Ctx Tokens'), + el('th', { style: 'text-align:right;padding:6px 8px;color:var(--text-dim);font-weight:500;border-bottom:1px solid var(--border);' }, 'Utilization'), + el('th', { style: 'text-align:right;padding:6px 8px;color:var(--text-dim);font-weight:500;border-bottom:1px solid var(--border);' }, 'Error Rate'), + ]); + table.appendChild(thead); + d.sources.forEach(s => { + const utilPct = s.avg_context_utilization_pct || 0; + table.appendChild(el('tr', {}, [ + el('td', { style: 'padding:6px 8px;color:var(--text);font-weight:600;border-bottom:1px solid var(--border-subtle);' }, s.retrieval_name || '-'), + el('td', { style: 'padding:6px 8px;color:var(--text-dim);border-bottom:1px solid var(--border-subtle);' }, s.source || '-'), + el('td', { style: 'text-align:right;padding:6px 8px;color:var(--text);font-family:var(--mono);border-bottom:1px solid var(--border-subtle);' }, String(s.call_count)), + el('td', { style: 'text-align:right;padding:6px 8px;color:var(--text);font-family:var(--mono);border-bottom:1px solid var(--border-subtle);' }, s.avg_latency_ms.toFixed(0) + 'ms'), + el('td', { style: 'text-align:right;padding:6px 8px;color:var(--text);font-family:var(--mono);border-bottom:1px solid var(--border-subtle);' }, s.p95_latency_ms.toFixed(0) + 'ms'), + el('td', { style: 'text-align:right;padding:6px 8px;color:var(--text);font-family:var(--mono);border-bottom:1px solid var(--border-subtle);' }, s.avg_chunks_retrieved.toFixed(1)), + el('td', { style: 'text-align:right;padding:6px 8px;color:var(--text);font-family:var(--mono);border-bottom:1px solid var(--border-subtle);' }, s.avg_context_tokens.toFixed(0)), + el('td', { style: 'text-align:right;padding:6px 8px;font-family:var(--mono);border-bottom:1px solid var(--border-subtle);color:' + (utilPct > 80 ? 'var(--red-text)' : 'var(--text)') + ';' }, utilPct.toFixed(1) + '%'), + el('td', { style: 'text-align:right;padding:6px 8px;font-family:var(--mono);border-bottom:1px solid var(--border-subtle);' + (s.error_rate > 0 ? 'color:var(--red-text);' : 'color:var(--text-dim);') }, s.error_rate.toFixed(1) + '%'), + ])); + }); + content.appendChild(table); + } + + // Relevance summary bar + if (d.relevance && d.relevance.total_retrievals > 0) { + const rel = d.relevance; + const pct = Math.max(0, Math.min(100, rel.avg_top_relevance * 100)); + const barColor = pct >= 70 ? 'var(--green, #4ade80)' : pct >= 40 ? 'var(--yellow, #facc15)' : 'var(--red-text, #f87171)'; + const relCard = el('div', { style: 'background:var(--surface-raised);border:1px solid var(--border);border-radius:8px;padding:14px;margin-top:12px;' }, [ + el('div', { style: 'font-size:11px;color:var(--text-dim);text-transform:uppercase;letter-spacing:0.05em;margin-bottom:8px;' }, 'Relevance Distribution'), + el('div', { style: 'display:flex;justify-content:space-between;font-size:10px;color:var(--text-dim);margin-bottom:4px;' }, [ + el('span', {}, 'Avg: ' + rel.avg_top_relevance.toFixed(3)), + el('span', {}, 'Min: ' + rel.min_top_relevance.toFixed(3) + ' / Max: ' + rel.max_top_relevance.toFixed(3)), + ]), + (() => { + const bar = el('div', { style: 'width:100%;height:8px;background:var(--border);border-radius:4px;overflow:hidden;' }); + const fill = el('div', { style: 'height:100%;width:' + pct + '%;background:' + barColor + ';border-radius:4px;' }); + bar.appendChild(fill); + return bar; + })(), + el('div', { style: 'display:flex;gap:16px;font-size:10px;color:var(--text-dim);margin-top:8px;' }, [ + el('span', {}, 'Avg chunks retrieved: ' + rel.avg_chunks_retrieved.toFixed(1)), + el('span', {}, 'Avg chunks used: ' + rel.avg_chunks_used.toFixed(1)), + el('span', {}, 'Chunk utilization: ' + rel.chunk_utilization_pct.toFixed(1) + '%'), + ]), + ]); + content.appendChild(relCard); + } + } catch (e) { + content.innerHTML = '
Failed to load RAG data
'; + } +} + // ── LLM Scores Tab ── async function loadLlmScores() { const content = document.getElementById('llmContent'); diff --git a/src/llm_tracing/query.rs b/src/llm_tracing/query.rs index 672a4b1..39209f5 100644 --- a/src/llm_tracing/query.rs +++ b/src/llm_tracing/query.rs @@ -280,3 +280,88 @@ GROUP BY prompt_name ORDER BY total_traces DESC LIMIT $3 "#; + +/// RAG source overview: per-source aggregates for retrieval spans with rag.* metadata. +/// Parameters: $1 = since_ms, $2 = project_id (or NULL), $3 = limit +pub const RAG_OVERVIEW_SQL: &str = r#" +SELECT + COALESCE(name, '') AS retrieval_name, + COALESCE(json_extract_string(metadata, '$."rag.source"'), 'unknown') AS source, + COUNT(*) AS call_count, + COALESCE(AVG(latency_ms), 0) AS avg_latency_ms, + COALESCE(PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY latency_ms), 0) AS p50_latency_ms, + COALESCE(PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY latency_ms), 0) AS p95_latency_ms, + SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) AS error_count, + CASE WHEN COUNT(*) > 0 + THEN (SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) * 100.0 / COUNT(*)) + ELSE 0.0 END AS error_rate, + COALESCE(AVG(CAST(json_extract(metadata, '$."rag.chunks_retrieved"') AS DOUBLE)), 0) AS avg_chunks_retrieved, + COALESCE(AVG(CAST(json_extract(metadata, '$."rag.chunks_used"') AS DOUBLE)), 0) AS avg_chunks_used, + COALESCE(AVG(CAST(json_extract(metadata, '$."rag.context_tokens"') AS DOUBLE)), 0) AS avg_context_tokens, + COALESCE(AVG( + CASE WHEN CAST(json_extract(metadata, '$."rag.max_context_tokens"') AS DOUBLE) > 0 + THEN CAST(json_extract(metadata, '$."rag.context_tokens"') AS DOUBLE) * 100.0 + / CAST(json_extract(metadata, '$."rag.max_context_tokens"') AS DOUBLE) + ELSE NULL END + ), 0) AS avg_context_utilization_pct +FROM bloop.llm_spans +WHERE started_at >= $1 + AND ($2 IS NULL OR project_id = $2) + AND span_type = 'retrieval' + AND metadata IS NOT NULL + AND json_extract(metadata, '$."rag.chunks_retrieved"') IS NOT NULL +GROUP BY name, json_extract_string(metadata, '$."rag.source"') +ORDER BY call_count DESC +LIMIT $3 +"#; + +/// RAG hourly metrics: time-series for retrieval span activity. +/// Parameters: $1 = since_ms, $2 = project_id (or NULL) +pub const RAG_METRICS_SQL: &str = r#" +SELECT + (started_at / 3600000) * 3600000 AS hour_bucket, + COUNT(*) AS retrieval_count, + COALESCE(AVG(latency_ms), 0) AS avg_latency_ms, + SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) AS error_count, + COALESCE(AVG(CAST(json_extract(metadata, '$."rag.chunks_retrieved"') AS DOUBLE)), 0) AS avg_chunks_retrieved, + COALESCE(AVG(CAST(json_extract(metadata, '$."rag.context_tokens"') AS DOUBLE)), 0) AS avg_context_tokens, + COALESCE(AVG( + CASE WHEN CAST(json_extract(metadata, '$."rag.max_context_tokens"') AS DOUBLE) > 0 + THEN CAST(json_extract(metadata, '$."rag.context_tokens"') AS DOUBLE) * 100.0 + / CAST(json_extract(metadata, '$."rag.max_context_tokens"') AS DOUBLE) + ELSE NULL END + ), 0) AS avg_context_utilization_pct, + COALESCE(AVG(CAST(json_extract(metadata, '$."rag.top_k"') AS DOUBLE)), 0) AS avg_top_k +FROM bloop.llm_spans +WHERE started_at >= $1 + AND ($2 IS NULL OR project_id = $2) + AND span_type = 'retrieval' + AND metadata IS NOT NULL + AND json_extract(metadata, '$."rag.chunks_retrieved"') IS NOT NULL +GROUP BY (started_at / 3600000) * 3600000 +ORDER BY hour_bucket DESC +"#; + +/// RAG relevance summary: aggregate relevance score stats across retrieval spans. +/// Parameters: $1 = since_ms, $2 = project_id (or NULL) +pub const RAG_RELEVANCE_SQL: &str = r#" +SELECT + COUNT(*) AS total_retrievals, + COALESCE(AVG(CAST(json_extract(metadata, '$."rag.relevance_scores"[0]') AS DOUBLE)), 0) AS avg_top_relevance, + COALESCE(MIN(CAST(json_extract(metadata, '$."rag.relevance_scores"[0]') AS DOUBLE)), 0) AS min_top_relevance, + COALESCE(MAX(CAST(json_extract(metadata, '$."rag.relevance_scores"[0]') AS DOUBLE)), 0) AS max_top_relevance, + COALESCE(AVG(CAST(json_extract(metadata, '$."rag.chunks_retrieved"') AS DOUBLE)), 0) AS avg_chunks_retrieved, + COALESCE(AVG(CAST(json_extract(metadata, '$."rag.chunks_used"') AS DOUBLE)), 0) AS avg_chunks_used, + COALESCE(AVG( + CASE WHEN CAST(json_extract(metadata, '$."rag.chunks_retrieved"') AS DOUBLE) > 0 + THEN CAST(json_extract(metadata, '$."rag.chunks_used"') AS DOUBLE) * 100.0 + / CAST(json_extract(metadata, '$."rag.chunks_retrieved"') AS DOUBLE) + ELSE NULL END + ), 0) AS chunk_utilization_pct +FROM bloop.llm_spans +WHERE started_at >= $1 + AND ($2 IS NULL OR project_id = $2) + AND span_type = 'retrieval' + AND metadata IS NOT NULL + AND json_extract(metadata, '$."rag.chunks_retrieved"') IS NOT NULL +"#; diff --git a/src/llm_tracing/query_handler.rs b/src/llm_tracing/query_handler.rs index 6e87f9a..ca4a91f 100644 --- a/src/llm_tracing/query_handler.rs +++ b/src/llm_tracing/query_handler.rs @@ -858,3 +858,94 @@ pub async fn update_settings( content_storage: policy.as_str().to_string(), })) } + +/// GET /v1/llm/rag +pub async fn rag( + State(state): State>, + token_auth: Option>, + Query(mut qp): Query, +) -> AppResult> { + qp.project_id = resolve_project(&token_auth, qp.project_id)?; + let hours = qp.hours(); + let limit = qp.limit(); + let now_ms = chrono::Utc::now().timestamp_millis(); + let since = now_ms - (hours * 3_600_000); + + // Sources query + let pid1 = qp.project_id.clone(); + let sources = state + .conn + .query(move |conn: &duckdb::Connection| { + let mut stmt = conn.prepare(query::RAG_OVERVIEW_SQL)?; + let rows = stmt.query_map(params![since, pid1, limit], |row| { + Ok(RagSourceEntry { + retrieval_name: row.get(0)?, + source: row.get(1)?, + call_count: row.get(2)?, + avg_latency_ms: row.get(3)?, + p50_latency_ms: row.get(4)?, + p95_latency_ms: row.get(5)?, + error_count: row.get(6)?, + error_rate: row.get(7)?, + avg_chunks_retrieved: row.get(8)?, + avg_chunks_used: row.get(9)?, + avg_context_tokens: row.get(10)?, + avg_context_utilization_pct: row.get(11)?, + }) + })?; + rows.collect::, _>>() + }) + .await + .map_err(AppError::LlmTracing)?; + + // Metrics query + let pid2 = qp.project_id.clone(); + let metrics = state + .conn + .query(move |conn: &duckdb::Connection| { + let mut stmt = conn.prepare(query::RAG_METRICS_SQL)?; + let rows = stmt.query_map(params![since, pid2], |row| { + Ok(RagMetricsEntry { + hour_bucket: row.get(0)?, + retrieval_count: row.get(1)?, + avg_latency_ms: row.get(2)?, + error_count: row.get(3)?, + avg_chunks_retrieved: row.get(4)?, + avg_context_tokens: row.get(5)?, + avg_context_utilization_pct: row.get(6)?, + avg_top_k: row.get(7)?, + }) + })?; + rows.collect::, _>>() + }) + .await + .map_err(AppError::LlmTracing)?; + + // Relevance query + let pid3 = qp.project_id.clone(); + let relevance = state + .conn + .query(move |conn: &duckdb::Connection| { + let mut stmt = conn.prepare(query::RAG_RELEVANCE_SQL)?; + stmt.query_row(params![since, pid3], |row| { + Ok(RagRelevanceSummary { + total_retrievals: row.get(0)?, + avg_top_relevance: row.get(1)?, + min_top_relevance: row.get(2)?, + max_top_relevance: row.get(3)?, + avg_chunks_retrieved: row.get(4)?, + avg_chunks_used: row.get(5)?, + chunk_utilization_pct: row.get(6)?, + }) + }) + }) + .await + .map_err(AppError::LlmTracing)?; + + Ok(Json(RagResponse { + sources, + metrics, + relevance, + window_hours: hours, + })) +} diff --git a/src/llm_tracing/types.rs b/src/llm_tracing/types.rs index 0ca576b..a6fbbfd 100644 --- a/src/llm_tracing/types.rs +++ b/src/llm_tracing/types.rs @@ -583,3 +583,52 @@ pub struct BudgetResponse { pub days_elapsed: i64, pub days_remaining: i64, } + +// ── RAG Types ── + +#[derive(Debug, Serialize)] +pub struct RagSourceEntry { + pub retrieval_name: String, + pub source: String, + pub call_count: i64, + pub avg_latency_ms: f64, + pub p50_latency_ms: f64, + pub p95_latency_ms: f64, + pub error_count: i64, + pub error_rate: f64, + pub avg_chunks_retrieved: f64, + pub avg_chunks_used: f64, + pub avg_context_tokens: f64, + pub avg_context_utilization_pct: f64, +} + +#[derive(Debug, Serialize)] +pub struct RagMetricsEntry { + pub hour_bucket: i64, + pub retrieval_count: i64, + pub avg_latency_ms: f64, + pub error_count: i64, + pub avg_chunks_retrieved: f64, + pub avg_context_tokens: f64, + pub avg_context_utilization_pct: f64, + pub avg_top_k: f64, +} + +#[derive(Debug, Serialize)] +pub struct RagRelevanceSummary { + pub total_retrievals: i64, + pub avg_top_relevance: f64, + pub min_top_relevance: f64, + pub max_top_relevance: f64, + pub avg_chunks_retrieved: f64, + pub avg_chunks_used: f64, + pub chunk_utilization_pct: f64, +} + +#[derive(Debug, Serialize)] +pub struct RagResponse { + pub sources: Vec, + pub metrics: Vec, + pub relevance: RagRelevanceSummary, + pub window_hours: i64, +} diff --git a/src/main.rs b/src/main.rs index c90b34d..a194021 100644 --- a/src/main.rs +++ b/src/main.rs @@ -637,6 +637,7 @@ async fn main() -> Result<(), Box> { get(llm_tracing::query_handler::session_detail), ) .route("/v1/llm/tools", get(llm_tracing::query_handler::tools)) + .route("/v1/llm/rag", get(llm_tracing::query_handler::rag)) .route( "/v1/llm/settings", get(llm_tracing::query_handler::get_settings), diff --git a/src/storage/migrations.rs b/src/storage/migrations.rs index 2dcbd47..cc5be06 100644 --- a/src/storage/migrations.rs +++ b/src/storage/migrations.rs @@ -20,6 +20,7 @@ const MIGRATION_017: &str = include_str!("../../migrations/017_pricing_overrides const MIGRATION_018: &str = include_str!("../../migrations/018_llm_sessions.sql"); const MIGRATION_019: &str = include_str!("../../migrations/019_llm_feedback.sql"); const MIGRATION_020: &str = include_str!("../../migrations/020_llm_budgets.sql"); +const MIGRATION_021: &str = include_str!("../../migrations/021_llm_rag_index.sql"); pub fn run_migrations(conn: &Connection) -> rusqlite::Result<()> { // Create migrations tracking table @@ -53,6 +54,7 @@ pub fn run_migrations(conn: &Connection) -> rusqlite::Result<()> { (18, "018_llm_sessions", MIGRATION_018), (19, "019_llm_feedback", MIGRATION_019), (20, "020_llm_budgets", MIGRATION_020), + (21, "021_llm_rag_index", MIGRATION_021), ]; for &(id, name, sql) in migrations { diff --git a/tests/e2e/llm-dashboard.spec.ts b/tests/e2e/llm-dashboard.spec.ts index c633d1d..9674bb5 100644 --- a/tests/e2e/llm-dashboard.spec.ts +++ b/tests/e2e/llm-dashboard.spec.ts @@ -44,10 +44,10 @@ test.describe('LLM Dashboard', () => { await openLlmPanel(page); const tabs = page.locator('#llmPanel .insights-tab'); const count = await tabs.count(); - expect(count).toBeGreaterThanOrEqual(8); + expect(count).toBeGreaterThanOrEqual(9); const names = await tabs.allTextContents(); - // These 8 tabs are always present - for (const expected of ['Overview', 'Usage', 'Latency', 'Models', 'Traces', 'Search', 'Prompts', 'Scores']) { + // These 9 tabs are always present + for (const expected of ['Overview', 'Usage', 'Latency', 'Models', 'Traces', 'Search', 'Prompts', 'Scores', 'RAG']) { expect(names).toContain(expected); } }); @@ -184,6 +184,18 @@ test.describe('LLM Dashboard', () => { } }); + test('RAG tab — renders stat cards or empty state', async ({ page }) => { + await openLlmPanel(page); + await switchTab(page, 'RAG'); + const content = page.locator('#llmContent'); + await expect(content).not.toContainText('Failed to load'); + // Should show either RAG data or the empty state + const text = await content.textContent(); + const hasData = text?.includes('Total Retrievals'); + const isEmpty = text?.includes('No RAG data'); + expect(hasData || isEmpty).toBeTruthy(); + }); + test('Search tab — empty query shows placeholder', async ({ page }) => { await openLlmPanel(page); await switchTab(page, 'Search'); diff --git a/tests/e2e/seed.ts b/tests/e2e/seed.ts index 40bb5b8..2e47d06 100644 --- a/tests/e2e/seed.ts +++ b/tests/e2e/seed.ts @@ -166,7 +166,48 @@ export async function seed(config: SeedConfig) { ], }); - console.log('Seed data ingested: 4 traces, 6 spans'); + // ── Trace 5: RAG pipeline trace with retrieval span + rag.* metadata ── + await hmacPost(config, '/v1/traces', { + id: 'trace-e2e-005', + name: 'rag-pipeline', + status: 'completed', + session_id: 'session-e2e-1', + user_id: 'e2e-user', + spans: [ + { + id: 'span-e2e-rag-gen', + span_type: 'generation', + name: 'rag-generation', + model: 'gpt-4o', + provider: 'openai', + input_tokens: 3200, + output_tokens: 500, + cost: 0.013, + latency_ms: 2000, + status: 'ok', + }, + { + id: 'span-e2e-rag-retrieval', + parent_span_id: 'span-e2e-rag-gen', + span_type: 'retrieval', + name: 'vector_search', + latency_ms: 120, + status: 'ok', + metadata: { + 'rag.source': 'pinecone', + 'rag.chunks_retrieved': 10, + 'rag.chunks_used': 5, + 'rag.context_tokens': 3200, + 'rag.max_context_tokens': 8192, + 'rag.relevance_scores': [0.95, 0.87, 0.82, 0.71, 0.65], + 'rag.top_k': 10, + 'rag.query': 'How does quantum computing work?', + }, + }, + ], + }); + + console.log('Seed data ingested: 5 traces, 8 spans'); } /** diff --git a/tests/llm_tracing_test.rs b/tests/llm_tracing_test.rs index e630e81..472dee3 100644 --- a/tests/llm_tracing_test.rs +++ b/tests/llm_tracing_test.rs @@ -2504,3 +2504,377 @@ async fn test_budget_validation() { .unwrap(); assert_eq!(resp.status(), 400); } + +// ── RAG Tests ── + +/// SQLite-backed RAG endpoint for tests (no DuckDB). +async fn rag_sqlite( + axum::extract::State(pool): axum::extract::State>, + axum::extract::Query(qp): axum::extract::Query, +) -> bloop::error::AppResult> { + let hours = qp.hours(); + let limit = qp.limit(); + let now_ms = chrono::Utc::now().timestamp_millis(); + let since = now_ms - (hours * 3_600_000); + + let conn = pool + .get() + .await + .map_err(|e| bloop::error::AppError::Internal(format!("pool: {e}")))?; + + let result = conn + .interact(move |conn| { + // Sources: aggregate retrieval spans with rag.* metadata + let mut stmt = conn.prepare( + "SELECT + name, + json_extract(metadata, '$.\"rag.source\"') AS source, + COUNT(*) AS call_count, + COALESCE(AVG(latency_ms), 0) AS avg_latency_ms, + SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) AS error_count, + CASE WHEN COUNT(*) > 0 + THEN (CAST(SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) AS REAL) * 100.0 / COUNT(*)) + ELSE 0.0 END AS error_rate, + COALESCE(AVG(json_extract(metadata, '$.\"rag.chunks_retrieved\"')), 0) AS avg_chunks_retrieved, + COALESCE(AVG(json_extract(metadata, '$.\"rag.chunks_used\"')), 0) AS avg_chunks_used, + COALESCE(AVG(json_extract(metadata, '$.\"rag.context_tokens\"')), 0) AS avg_context_tokens + FROM llm_spans + WHERE started_at >= ?1 + AND span_type = 'retrieval' + AND metadata IS NOT NULL + AND json_extract(metadata, '$.\"rag.chunks_retrieved\"') IS NOT NULL + GROUP BY name, json_extract(metadata, '$.\"rag.source\"') + ORDER BY call_count DESC + LIMIT ?2", + )?; + let sources: Vec = stmt + .query_map(rusqlite::params![since, limit], |row| { + Ok(serde_json::json!({ + "retrieval_name": row.get::<_, String>(0)?, + "source": row.get::<_, Option>(1)?, + "call_count": row.get::<_, i64>(2)?, + "avg_latency_ms": row.get::<_, f64>(3)?, + "p50_latency_ms": row.get::<_, f64>(3)?, + "p95_latency_ms": row.get::<_, f64>(3)?, + "error_count": row.get::<_, i64>(4)?, + "error_rate": row.get::<_, f64>(5)?, + "avg_chunks_retrieved": row.get::<_, f64>(6)?, + "avg_chunks_used": row.get::<_, f64>(7)?, + "avg_context_tokens": row.get::<_, f64>(8)?, + "avg_context_utilization_pct": 0.0, + })) + })? + .collect::, _>>()?; + + // Relevance summary + let relevance: serde_json::Value = conn.query_row( + "SELECT + COUNT(*) AS total_retrievals, + COALESCE(AVG(json_extract(metadata, '$.\"rag.chunks_retrieved\"')), 0), + COALESCE(AVG(json_extract(metadata, '$.\"rag.chunks_used\"')), 0) + FROM llm_spans + WHERE started_at >= ?1 + AND span_type = 'retrieval' + AND metadata IS NOT NULL + AND json_extract(metadata, '$.\"rag.chunks_retrieved\"') IS NOT NULL", + rusqlite::params![since], + |row| { + let total: i64 = row.get(0)?; + let avg_chunks_retrieved: f64 = row.get(1)?; + let avg_chunks_used: f64 = row.get(2)?; + let chunk_util = if avg_chunks_retrieved > 0.0 { + (avg_chunks_used / avg_chunks_retrieved) * 100.0 + } else { + 0.0 + }; + Ok(serde_json::json!({ + "total_retrievals": total, + "avg_top_relevance": 0.0, + "min_top_relevance": 0.0, + "max_top_relevance": 0.0, + "avg_chunks_retrieved": avg_chunks_retrieved, + "avg_chunks_used": avg_chunks_used, + "chunk_utilization_pct": chunk_util, + })) + }, + )?; + + Ok(serde_json::json!({ + "sources": sources, + "metrics": [], + "relevance": relevance, + "window_hours": hours, + })) + }) + .await + .map_err(|e| bloop::error::AppError::Internal(format!("interact: {e}")))? + .map_err(|e: rusqlite::Error| bloop::error::AppError::Database(e))?; + + Ok(axum::Json(result)) +} + +/// Spawn server with RAG endpoint included. +async fn spawn_llm_server_with_rag() -> (SocketAddr, tokio::task::JoinHandle<()>) { + let tmp = tempfile::NamedTempFile::new().unwrap(); + let db_path = tmp.path().to_path_buf(); + std::mem::forget(tmp); + + let pool = { + let cfg = deadpool_sqlite::Config::new(&db_path); + cfg.create_pool(deadpool_sqlite::Runtime::Tokio1).unwrap() + }; + + { + let conn = pool.get().await.unwrap(); + conn.interact(|conn| { + conn.execute_batch("PRAGMA journal_mode = WAL; PRAGMA synchronous = NORMAL;") + .unwrap(); + bloop::storage::migrations::run_migrations(conn).unwrap(); + }) + .await + .unwrap(); + } + + let llm_config = bloop::config::LlmTracingConfig { + enabled: true, + channel_capacity: 256, + flush_interval_secs: 1, + flush_batch_size: 50, + max_spans_per_trace: 100, + max_batch_size: 50, + default_content_storage: "full".to_string(), + cache_ttl_secs: 5, + pricing_refresh_interval_secs: 0, + pricing_url: String::new(), + }; + + let (llm_tx, llm_rx) = mpsc::channel(llm_config.channel_capacity); + let worker_pool = pool.clone(); + let worker_config = llm_config.clone(); + tokio::spawn(async move { + bloop::llm_tracing::worker::run_worker(llm_rx, worker_pool, worker_config).await; + }); + + let settings_cache = Arc::new(bloop::llm_tracing::settings::SettingsCache::new( + pool.clone(), + llm_config.cache_ttl_secs, + )); + + let pricing = Arc::new(bloop::llm_tracing::pricing::PricingTable::new()); + + let llm_ingest_state = Arc::new(bloop::llm_tracing::LlmIngestState { + config: llm_config.clone(), + tx: llm_tx, + pool: pool.clone(), + settings_cache: settings_cache.clone(), + pricing: Some(pricing), + }); + + let hmac_secret = bloop::ingest::auth::HmacSecret("test-secret".to_string()); + let hmac_body_limit = bloop::ingest::auth::HmacBodyLimit(32768); + + use axum::extract::DefaultBodyLimit; + use axum::routing::{get, post, put}; + use axum::{middleware, Router}; + + let ingest_routes = Router::new() + .route( + "/v1/traces", + post(bloop::llm_tracing::handler::ingest_trace), + ) + .route( + "/v1/traces/batch", + post(bloop::llm_tracing::handler::ingest_trace_batch), + ) + .route( + "/v1/traces/{trace_id}", + put(bloop::llm_tracing::handler::update_trace), + ) + .layer(DefaultBodyLimit::max(32768)) + .layer(middleware::from_fn(bloop::ingest::auth::hmac_auth)) + .layer(axum::Extension(hmac_secret)) + .layer(axum::Extension(hmac_body_limit)) + .with_state(llm_ingest_state); + + let query_pool = Arc::new(pool.clone()); + let query_routes = Router::new() + .route("/v1/llm/traces/{id}", get(trace_detail_sqlite)) + .route("/v1/llm/rag", get(rag_sqlite)) + .with_state(query_pool); + + let app = ingest_routes.merge(query_routes); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let handle = tokio::spawn(async move { + axum::serve(listener, app).await.unwrap(); + }); + + (addr, handle) +} + +#[tokio::test] +async fn test_rag_pipeline_analytics() { + let (addr, _handle) = spawn_llm_server_with_rag().await; + let client = reqwest::Client::new(); + let base = format!("http://{addr}"); + + // Ingest a trace with a retrieval span carrying rag.* metadata + let body = serde_json::json!({ + "id": "rag-trace-1", + "name": "rag-pipeline", + "status": "completed", + "spans": [ + { + "id": "rag-gen-1", + "span_type": "generation", + "model": "gpt-4o", + "provider": "openai", + "input_tokens": 3200, + "output_tokens": 500, + "cost": 0.013, + "latency_ms": 2000, + "status": "ok" + }, + { + "id": "rag-ret-1", + "parent_span_id": "rag-gen-1", + "span_type": "retrieval", + "name": "vector_search", + "latency_ms": 120, + "status": "ok", + "metadata": { + "rag.source": "pinecone", + "rag.chunks_retrieved": 10, + "rag.chunks_used": 5, + "rag.context_tokens": 3200, + "rag.max_context_tokens": 8192, + "rag.relevance_scores": [0.95, 0.87, 0.82, 0.71, 0.65], + "rag.top_k": 10 + } + } + ] + }); + + let body_bytes = serde_json::to_vec(&body).unwrap(); + let sig = sign("test-secret", &body_bytes); + + let resp = client + .post(format!("{base}/v1/traces")) + .header("Content-Type", "application/json") + .header("X-Signature", &sig) + .body(body_bytes) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), 200); + + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + + // Query RAG endpoint + let resp = client + .get(format!("{base}/v1/llm/rag")) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), 200); + let data: serde_json::Value = resp.json().await.unwrap(); + + // Should have sources + let sources = data["sources"].as_array().unwrap(); + assert_eq!(sources.len(), 1); + assert_eq!(sources[0]["retrieval_name"], "vector_search"); + assert_eq!(sources[0]["source"], "pinecone"); + assert_eq!(sources[0]["call_count"], 1); + + // Should have relevance + let relevance = &data["relevance"]; + assert_eq!(relevance["total_retrievals"], 1); + assert!(relevance["avg_chunks_retrieved"].as_f64().unwrap() > 0.0); + + // window_hours should be present + assert!(data["window_hours"].is_number()); +} + +#[tokio::test] +async fn test_rag_metadata_preserved() { + let (addr, _handle) = spawn_llm_server_with_rag().await; + let client = reqwest::Client::new(); + let base = format!("http://{addr}"); + + // Ingest trace with RAG metadata + let body = serde_json::json!({ + "id": "rag-meta-1", + "name": "rag-with-metadata", + "status": "completed", + "spans": [{ + "id": "rag-meta-ret-1", + "span_type": "retrieval", + "name": "doc_search", + "latency_ms": 80, + "status": "ok", + "metadata": { + "rag.source": "weaviate", + "rag.chunks_retrieved": 8, + "rag.chunks_used": 3, + "rag.context_tokens": 2400 + } + }] + }); + + let body_bytes = serde_json::to_vec(&body).unwrap(); + let sig = sign("test-secret", &body_bytes); + + let resp = client + .post(format!("{base}/v1/traces")) + .header("Content-Type", "application/json") + .header("X-Signature", &sig) + .body(body_bytes) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), 200); + + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + + // Query trace detail to verify metadata is preserved + let resp = client + .get(format!("{base}/v1/llm/traces/rag-meta-1")) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), 200); + let data: serde_json::Value = resp.json().await.unwrap(); + assert_eq!(data["trace"]["name"], "rag-with-metadata"); + // Verify spans exist + let spans = data["spans"].as_array().unwrap(); + assert_eq!(spans.len(), 1); + assert_eq!(spans[0]["span_type"], "retrieval"); +} + +#[tokio::test] +async fn test_rag_empty_when_no_data() { + let (addr, _handle) = spawn_llm_server_with_rag().await; + let client = reqwest::Client::new(); + let base = format!("http://{addr}"); + + // Query RAG with no data ingested + let resp = client + .get(format!("{base}/v1/llm/rag")) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), 200); + let data: serde_json::Value = resp.json().await.unwrap(); + + // Sources should be empty + let sources = data["sources"].as_array().unwrap(); + assert!(sources.is_empty()); + + // Relevance should show zero + assert_eq!(data["relevance"]["total_retrievals"], 0); + + // No errors + assert!(data["window_hours"].is_number()); +}