Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion crates/terraphim_sessions/src/connector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,21 @@ pub trait SessionConnector: Send + Sync {
false
}

/// Start watching for new sessions in real-time
/// Start watching for new sessions in real-time.
///
/// Returns a receiver that emits sessions as they are detected.
///
/// ## Deduplication contract
///
/// Implementations MUST debounce rapid filesystem events and deduplicate emissions:
/// - Multiple `Modify` events for the same file within a quiescent period (≤ 250 ms)
/// MUST result in at most one session emission per quiescent window.
/// - A session is only emitted when its `messages.len()` has increased since the last
/// emission for that path (dedup key: `(path, messages.len)`).
///
/// Callers may see a session emitted more than once if the file grows across separate
/// quiescent windows, but MUST NOT see repeated emissions carrying identical content.
///
/// Default implementation returns an error if not supported.
async fn watch(&self) -> Result<mpsc::Receiver<Session>> {
anyhow::bail!("Watch not supported for this connector")
Expand Down
276 changes: 243 additions & 33 deletions crates/terraphim_sessions/src/connector/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ use anyhow::{Context, Result};
use async_trait::async_trait;
use notify::{Event, EventKind, RecursiveMode, Watcher, recommended_watcher};
use serde::Deserialize;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;

/// Native Claude Code session connector
Expand Down Expand Up @@ -125,7 +127,6 @@ impl SessionConnector for NativeClaudeConnector {
}

async fn watch(&self) -> Result<mpsc::Receiver<Session>> {
let (tx, rx) = mpsc::channel(32);
let base_path = self
.default_path()
.ok_or_else(|| anyhow::anyhow!("No default path found for watch"))?;
Expand All @@ -134,6 +135,22 @@ impl SessionConnector for NativeClaudeConnector {
anyhow::bail!("Watch path does not exist: {}", base_path.display());
}

self.watch_at(base_path).await
}
}

impl NativeClaudeConnector {
/// Debounce window in milliseconds: events arriving within this period for the same path
/// are coalesced.
///
/// The watch loop converts this to a `Duration` and only flushes a pending path once at
/// least this long has passed since the most recent filesystem event recorded for it.
/// The watch integration test also reads it to size its collection window.
const DEBOUNCE_MS: u64 = 200;

/// Start watching `base_path` for JSONL session file changes.
///
/// Separated from `watch()` to allow injection of a test-controlled path.
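/// Deduplication strategy: two per-path maps. `pending: HashMap<PathBuf, Instant>` records
/// the time of the most recent filesystem event for a path (the debounce clock), and
/// `last_emitted: HashMap<PathBuf, usize>` records `messages.len()` at the last emission.
/// A session is emitted only when `messages.len()` has grown since the last emission
/// AND the debounce window has elapsed since the most recent filesystem event.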
async fn watch_at(&self, base_path: PathBuf) -> Result<mpsc::Receiver<Session>> {
let (tx, rx) = mpsc::channel(32);
let path = Arc::new(base_path);

tokio::task::spawn_blocking(move || -> Result<()> {
Expand All @@ -149,46 +166,67 @@ impl SessionConnector for NativeClaudeConnector {
path.display()
);

for res in watcher_rx {
match res {
Ok(event) => {
// pending: path -> time of the most-recent filesystem event (debounce clock).
let mut pending: HashMap<PathBuf, Instant> = HashMap::new();
// last_emitted: dedup key (PathBuf -> messages_len_at_last_emission).
let mut last_emitted: HashMap<PathBuf, usize> = HashMap::new();
let poll_interval = Duration::from_millis(50);
let debounce = Duration::from_millis(Self::DEBOUNCE_MS);

loop {
// Exit when all receivers have been dropped (e.g. test teardown or caller gone).
if tx.is_closed() {
break;
}

match watcher_rx.recv_timeout(poll_interval) {
Ok(Ok(event)) => {
if let Event {
kind: EventKind::Create(_) | EventKind::Modify(_),
paths,
..
} = event
{
for path in paths {
if path.extension().is_some_and(|ext| ext == "jsonl") {
let tx = tx.clone();
let path_clone = path.clone();

tokio::spawn(async move {
let connector = NativeClaudeConnector;
match connector.parse_session_file(&path_clone).await {
Ok(Some(session)) => {
if tx.send(session).await.is_err() {
tracing::warn!(
"Failed to send session from watch"
);
}
}
Ok(None) => {}
Err(e) => {
tracing::warn!(
"Failed to parse new session {}: {}",
path_clone.display(),
e
);
}
}
});
for p in paths {
if p.extension().is_some_and(|ext| ext == "jsonl") {
pending.insert(p, Instant::now());
}
}
}
}
Err(e) => {
tracing::warn!("Watch error: {}", e);
Ok(Err(e)) => tracing::warn!("Watch error: {}", e),
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
}

// Flush paths whose debounce window has elapsed.
let now = Instant::now();
let ready: Vec<PathBuf> = pending
.iter()
.filter(|(_, t)| now.duration_since(**t) >= debounce)
.map(|(p, _)| p.clone())
.collect();

for p in ready {
pending.remove(&p);

// Parse synchronously on the blocking thread to avoid spawn-inside-spawn
// and single-threaded-runtime stall in tests.
match parse_session_file_sync(&p) {
Ok(Some(session)) => {
let new_len = session.messages.len();
let prev_len = last_emitted.get(&p).copied().unwrap_or(0);
if new_len > prev_len {
last_emitted.insert(p.clone(), new_len);
if tx.blocking_send(session).is_err() {
tracing::warn!("Failed to send session from watch");
}
}
}
Ok(None) => {}
Err(e) => {
tracing::warn!("Failed to parse session {}: {}", p.display(), e);
}
}
}
}
Expand All @@ -198,9 +236,7 @@ impl SessionConnector for NativeClaudeConnector {

Ok(rx)
}
}

impl NativeClaudeConnector {
/// Parse a single session file
async fn parse_session_file(&self, path: &PathBuf) -> Result<Option<Session>> {
let content = tokio::fs::read_to_string(path)
Expand Down Expand Up @@ -392,6 +428,65 @@ struct ToolResultContent {
content: String,
}

/// Synchronous version of `NativeClaudeConnector::parse_session_file` for use from
/// blocking threads (avoids `tokio::spawn`-inside-`spawn_blocking` deadlocks on
/// single-threaded test runtimes).
fn parse_session_file_sync(path: &PathBuf) -> Result<Option<Session>> {
let content = std::fs::read_to_string(path)
.with_context(|| format!("Failed to read {}", path.display()))?;

let mut entries: Vec<LogEntry> = Vec::new();
for line in content.lines() {
if line.trim().is_empty() {
continue;
}
match serde_json::from_str::<LogEntry>(line) {
Ok(entry) => entries.push(entry),
Err(e) => tracing::trace!("Skipping malformed line: {}", e),
}
}

if entries.is_empty() {
return Ok(None);
}

let connector = NativeClaudeConnector;
let session_id = entries
.first()
.and_then(|e| e.session_id.clone())
.unwrap_or_else(|| {
path.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("unknown")
.to_string()
});
let project_path = entries.first().and_then(|e| e.cwd.clone());
let messages: Vec<Message> = entries
.iter()
.enumerate()
.filter_map(|(idx, entry)| connector.entry_to_message(idx, entry))
.collect();

if messages.is_empty() {
return Ok(None);
}

let started_at = entries.first().and_then(|e| parse_timestamp(&e.timestamp));
let ended_at = entries.last().and_then(|e| parse_timestamp(&e.timestamp));

Ok(Some(Session {
id: format!("claude-code-native:{}", session_id),
source: "claude-code-native".to_string(),
external_id: session_id,
title: project_path.clone(),
source_path: path.clone(),
started_at,
ended_at,
messages,
metadata: SessionMetadata::new(project_path, None, vec![], serde_json::Value::Null),
}))
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -565,4 +660,119 @@ mod tests {
let connector = NativeClaudeConnector;
assert!(connector.supports_watch());
}

/// Regression test for #815: checks the value the watcher's dedup comparison is keyed on.
///
/// Rewrites one JSONL file so that it holds 1, 2 and then 3 messages and verifies that
/// `parse_session_file_sync` reports a strictly growing `messages.len()`. It then parses
/// the unchanged file once more and verifies the count stays the same. The watch loop
/// itself is not exercised here.
#[tokio::test]
async fn test_parse_session_file_sync_dedup_key() {
    let tmp = tempfile::tempdir().unwrap();
    let file_path = tmp.path().join("dedup.jsonl");
    let lines = [
        r#"{"sessionId":"sess1","cwd":"/p","timestamp":"2024-01-15T10:00:00Z","message":{"role":"user","content":"hello"}}"#,
        r#"{"sessionId":"sess1","cwd":"/p","timestamp":"2024-01-15T10:00:01Z","message":{"role":"assistant","content":[{"type":"text","text":"world"}]}}"#,
        r#"{"sessionId":"sess1","cwd":"/p","timestamp":"2024-01-15T10:00:02Z","message":{"role":"user","content":"more"}}"#,
    ];

    // Grow the file one line at a time: 1-, 2- and 3-message states of the same session.
    let mut seen_counts: Vec<usize> = Vec::new();
    for expected in 1..=lines.len() {
        tokio::fs::write(&file_path, lines[..expected].join("\n"))
            .await
            .unwrap();
        let parsed = parse_session_file_sync(&file_path).unwrap().unwrap();
        let count = parsed.messages.len();
        assert_eq!(count, expected);
        if let Some(&previous) = seen_counts.last() {
            assert!(count > previous, "dedup: emit when grown");
        }
        seen_counts.push(count);
    }

    // Same state again: an unchanged file must report an unchanged messages.len(),
    // which is the condition under which the watcher suppresses a repeat emission.
    let reparsed = parse_session_file_sync(&file_path).unwrap().unwrap();
    assert_eq!(
        reparsed.messages.len(),
        *seen_counts.last().unwrap(),
        "dedup: no-op when unchanged"
    );
}

/// Integration test for #815: writing a JSONL file in N rapid increments must not produce
/// N emissions for the same content.
///
/// The test starts `watch_at` on a temporary directory, rewrites one file three times
/// about 10 ms apart (1, 2, then 3 messages), collects every emitted session until a fixed
/// deadline, and checks that the last emission carries the final 3-message state.
/// The bound on the number of emissions is deliberately lenient (at most 3) because the
/// test is timing-sensitive.
///
/// Marked ignore: requires real inotify on a non-tmpfs filesystem; timing-sensitive.
/// Run manually with: cargo test test_watch_deduplicates_incremental_appends -- --ignored
#[ignore = "integration test: real inotify; run manually with -- --ignored"]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_watch_deduplicates_incremental_appends() {
let dir = tempfile::tempdir().unwrap();
let file_path = dir.path().join("incremental.jsonl");

// Start watching the temp directory, then pause so the filesystem watcher has time to
// initialise before the first write (otherwise the early events could be missed).
let connector = NativeClaudeConnector;
let mut rx = connector.watch_at(dir.path().to_path_buf()).await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;

// Write the same session in 3 rapid increments (simulating JSONL line-by-line flushes).
// Each write replaces the whole file with a longer prefix of the same three lines.
let line1 = r#"{"sessionId":"sess1","cwd":"/p","timestamp":"2024-01-15T10:00:00Z","message":{"role":"user","content":"hello"}}"#;
let line2 = r#"{"sessionId":"sess1","cwd":"/p","timestamp":"2024-01-15T10:00:01Z","message":{"role":"assistant","content":[{"type":"text","text":"world"}]}}"#;
let line3 = r#"{"sessionId":"sess1","cwd":"/p","timestamp":"2024-01-15T10:00:02Z","message":{"role":"user","content":"more"}}"#;

tokio::fs::write(&file_path, line1).await.unwrap();
tokio::time::sleep(Duration::from_millis(10)).await;
tokio::fs::write(&file_path, format!("{line1}\n{line2}"))
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(10)).await;
tokio::fs::write(&file_path, format!("{line1}\n{line2}\n{line3}"))
.await
.unwrap();

// Collect every emission until one fixed deadline (debounce window + 600 ms of slack).
// The sleep future is created and pinned once outside the loop, so receiving a session
// does not push the deadline out. `biased` makes select! check the deadline branch first
// on every iteration, so the loop stops once the deadline has fired.
let collect_ms = NativeClaudeConnector::DEBOUNCE_MS + 600;
let mut emissions: Vec<usize> = Vec::new();
let deadline = tokio::time::sleep(Duration::from_millis(collect_ms));
tokio::pin!(deadline);

loop {
tokio::select! {
biased;
_ = &mut deadline => break,
msg = rx.recv() => match msg {
Some(session) => emissions.push(session.messages.len()),
None => break,
},
}
}

// Must have received at least one emission, and the last one must carry the final
// state (3 messages).
assert!(
!emissions.is_empty(),
"expected at least one session emission; watcher may not have initialised before writes"
);
assert_eq!(
*emissions.last().unwrap(),
3,
"last emission must carry the final 3-message state"
);
// Must NOT have received more than a small constant number of emissions.
// The 3 rapid writes fall within one debounce window, so ideally there is exactly 1;
// up to 3 are tolerated here to keep the test from being flaky.
assert!(
emissions.len() <= 3,
"received {} emissions for 3 rapid increments; expected at most 3",
emissions.len()
);
}
}
Loading