From 178e40edf84cdf43f81adfc15303c94ef1ef4f90 Mon Sep 17 00:00:00 2001 From: Stefan Steiner Date: Thu, 21 May 2026 00:47:32 -0700 Subject: [PATCH] feat: single-instance daemon for shared hyperd across MCP clients A lightweight daemon manages one shared hyperd process per user so multiple AI clients (Claude Code, Cursor, VS Code, etc.) can access the same persistent databases simultaneously with reduced resource overhead. Architecture: - TCP port binding as cross-platform single-instance lock - Discovery file at ~/.hyperdb/daemon.json (overridable via HYPERDB_STATE_DIR) - Health protocol (PING/HEARTBEAT/STOP/STATUS) for liveness and idle tracking - Auto-spawn: MCP clients transparently start the daemon if none is running - Idle timeout (default 30 min) with heartbeat-based keep-alive - Ephemeral databases DETACH + delete on session end (Windows-safe) - --no-daemon flag to opt out and use legacy per-client hyperd New files: - hyperdb-mcp/src/daemon/{mod,discovery,health,run,spawn}.rs - hyperdb-mcp/tests/daemon_tests.rs (26 tests: unit + integration) --- Cargo.lock | 15 +- hyperdb-mcp/Cargo.toml | 5 +- hyperdb-mcp/src/daemon/discovery.rs | 125 ++++++ hyperdb-mcp/src/daemon/health.rs | 186 +++++++++ hyperdb-mcp/src/daemon/mod.rs | 21 + hyperdb-mcp/src/daemon/run.rs | 152 +++++++ hyperdb-mcp/src/daemon/spawn.rs | 104 +++++ hyperdb-mcp/src/engine.rs | 170 ++++++-- hyperdb-mcp/src/lib.rs | 1 + hyperdb-mcp/src/main.rs | 133 +++++- hyperdb-mcp/src/server.rs | 54 ++- hyperdb-mcp/tests/daemon_tests.rs | 612 ++++++++++++++++++++++++++++ 12 files changed, 1521 insertions(+), 57 deletions(-) create mode 100644 hyperdb-mcp/src/daemon/discovery.rs create mode 100644 hyperdb-mcp/src/daemon/health.rs create mode 100644 hyperdb-mcp/src/daemon/mod.rs create mode 100644 hyperdb-mcp/src/daemon/run.rs create mode 100644 hyperdb-mcp/src/daemon/spawn.rs create mode 100644 hyperdb-mcp/tests/daemon_tests.rs diff --git a/Cargo.lock b/Cargo.lock index 733d888..15c8a7d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1811,7 +1811,7 @@ dependencies = [ [[package]] name = "hyperdb-api" -version = "0.1.2" +version = "0.1.3" dependencies = [ "arrow", "bytes", @@ -1832,7 +1832,7 @@ dependencies = [ [[package]] name = "hyperdb-api-core" -version = "0.1.2" +version = "0.1.3" dependencies = [ "arrow", "base64", @@ -1872,7 +1872,7 @@ dependencies = [ [[package]] name = "hyperdb-api-node" -version = "0.1.2" +version = "0.1.3" dependencies = [ "hyperdb-api", "napi", @@ -1884,7 +1884,7 @@ dependencies = [ [[package]] name = "hyperdb-api-salesforce" -version = "0.1.2" +version = "0.1.3" dependencies = [ "arrow", "base64", @@ -1905,7 +1905,7 @@ dependencies = [ [[package]] name = "hyperdb-bootstrap" -version = "0.1.2" +version = "0.1.3" dependencies = [ "anyhow", "clap", @@ -1923,7 +1923,7 @@ dependencies = [ [[package]] name = "hyperdb-mcp" -version = "0.1.2" +version = "0.1.3" dependencies = [ "arrow", "base64", @@ -1931,6 +1931,7 @@ dependencies = [ "clap", "csv", "hyperdb-api", + "libc", "notify", "parquet", "plotters", @@ -3767,7 +3768,7 @@ dependencies = [ [[package]] name = "sea-query-hyperdb" -version = "0.1.2" +version = "0.1.3" dependencies = [ "sea-query", ] diff --git a/hyperdb-mcp/Cargo.toml b/hyperdb-mcp/Cargo.toml index 17ac7f2..ae36e8f 100644 --- a/hyperdb-mcp/Cargo.toml +++ b/hyperdb-mcp/Cargo.toml @@ -22,7 +22,7 @@ path = "src/main.rs" [dependencies] hyperdb-api = { path = "../hyperdb-api", version = "0.1.1" } rmcp = { version = "1.7", features = ["server", "transport-io"] } -tokio = { version = "1", features = ["rt-multi-thread", "macros", "io-std", "signal"] } +tokio = { version = "1", features = ["rt-multi-thread", "macros", "io-std", "signal", "time"] } serde = { workspace = true } serde_json = { workspace = true, features = ["preserve_order"] } clap = { version = "4", features = ["derive"] } @@ -42,6 +42,9 @@ tokio-util = { version = "0.7", features = ["rt"] } tempfile = { workspace = true } sqlformat = "0.5.0" +[target.'cfg(unix)'.dependencies] +libc = "0.2" + [lints] workspace = true diff --git a/hyperdb-mcp/src/daemon/discovery.rs b/hyperdb-mcp/src/daemon/discovery.rs new file mode 100644 index 0000000..002ffb9 --- /dev/null +++ b/hyperdb-mcp/src/daemon/discovery.rs @@ -0,0 +1,125 @@ +// Copyright (c) 2026, Salesforce, Inc. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR MIT + +//! Discovery file management for the single-instance daemon. +//! +//! The daemon writes a JSON file to `~/.hyperdb/daemon.json` containing its +//! PID and the `hyperd` endpoint. Clients read this file to locate the running +//! daemon, validating liveness via a TCP health check before trusting it. + +use std::io; +use std::net::TcpStream; +use std::path::PathBuf; +use std::time::Duration; + +use serde::{Deserialize, Serialize}; + +use super::DEFAULT_DAEMON_PORT; + +/// Information written by the daemon so clients can discover and connect. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DaemonInfo { + /// OS process ID of the daemon. + pub pid: u32, + /// The `hyperd` libpq endpoint clients should connect to (e.g. `127.0.0.1:54321`). + pub hyperd_endpoint: String, + /// The TCP port the daemon's health listener is bound to. + pub health_port: u16, + /// ISO-8601 timestamp when the daemon started. + pub started_at: String, + /// Version of the daemon binary. + pub version: String, +} + +/// Returns the directory used for daemon state files. +/// +/// Resolution order: +/// 1. `HYPERDB_STATE_DIR` environment variable (if set) +/// 2. `~/.hyperdb/` (where `~` is `HOME` on Unix, `USERPROFILE` on Windows) +/// +/// # Errors +/// Returns an error if neither the env var nor the home directory can be determined. +pub fn state_dir() -> io::Result { + if let Some(dir) = std::env::var_os("HYPERDB_STATE_DIR") { + return Ok(PathBuf::from(dir)); + } + let home = home_dir().ok_or_else(|| { + io::Error::new(io::ErrorKind::NotFound, "cannot determine home directory") + })?; + Ok(home.join(".hyperdb")) +} + +/// Returns the path to the discovery file. +/// +/// # Errors +/// Returns an error if the home directory cannot be determined. +pub fn discovery_file_path() -> io::Result { + Ok(state_dir()?.join("daemon.json")) +} + +/// Write the discovery file atomically (write-to-temp then rename). +/// +/// # Errors +/// Returns an error if the state directory cannot be created or the file cannot be written. +pub fn write_discovery_file(info: &DaemonInfo) -> io::Result<()> { + let dir = state_dir()?; + std::fs::create_dir_all(&dir)?; + + let path = dir.join("daemon.json"); + let tmp_path = dir.join("daemon.json.tmp"); + let json = serde_json::to_string_pretty(info).map_err(|e| io::Error::other(e.to_string()))?; + std::fs::write(&tmp_path, json.as_bytes())?; + // On Windows, rename fails if target exists. Remove stale target first. + let _ = std::fs::remove_file(&path); + std::fs::rename(&tmp_path, &path)?; + Ok(()) +} + +/// Read the discovery file and validate that the daemon is still alive. +/// Returns `None` if no daemon is running (file missing, stale, or unreachable). +pub fn discover() -> Option { + let path = discovery_file_path().ok()?; + let contents = std::fs::read_to_string(&path).ok()?; + let info: DaemonInfo = serde_json::from_str(&contents).ok()?; + + // Validate liveness by connecting to the health port + if is_daemon_alive(info.health_port) { + Some(info) + } else { + // Stale file — daemon crashed. Clean up. + let _ = std::fs::remove_file(&path); + None + } +} + +/// Remove the discovery file (called during graceful shutdown). +pub fn remove_discovery_file() { + if let Ok(path) = discovery_file_path() { + let _ = std::fs::remove_file(&path); + } +} + +/// Check if the daemon is alive by attempting a TCP connection to its health port. +fn is_daemon_alive(port: u16) -> bool { + TcpStream::connect_timeout( + &std::net::SocketAddr::from(([127, 0, 0, 1], port)), + Duration::from_secs(2), + ) + .is_ok() +} + +/// Resolve the daemon health port from environment or default. +pub fn resolve_port() -> u16 { + std::env::var(super::ENV_DAEMON_PORT) + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(DEFAULT_DAEMON_PORT) +} + +/// Cross-platform home directory resolution. +fn home_dir() -> Option { + // Try HOME (Unix) then USERPROFILE (Windows) + std::env::var_os("HOME") + .or_else(|| std::env::var_os("USERPROFILE")) + .map(PathBuf::from) +} diff --git a/hyperdb-mcp/src/daemon/health.rs b/hyperdb-mcp/src/daemon/health.rs new file mode 100644 index 0000000..d9a4c69 --- /dev/null +++ b/hyperdb-mcp/src/daemon/health.rs @@ -0,0 +1,186 @@ +// Copyright (c) 2026, Salesforce, Inc. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR MIT + +//! TCP health listener for the daemon. +//! +//! The health listener serves two purposes: +//! 1. **Single-instance lock** — binding the port guarantees at most one daemon per user. +//! 2. **Liveness probe + heartbeat** — clients connect and send simple text commands. +//! +//! Protocol (line-based, newline-terminated): +//! - `PING\n` → `PONG\n` (liveness check) +//! - `HEARTBEAT\n` → `OK\n` (resets idle timer) +//! - `STOP\n` → `STOPPING\n` (triggers graceful shutdown) +//! - `STATUS\n` → JSON line with daemon info + +use std::io::{BufRead, BufReader, Write}; +use std::net::{TcpListener, TcpStream}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Instant; + +use tracing::{debug, warn}; + +use super::discovery::DaemonInfo; + +/// Handle to the health listener, used to check binding success and manage lifecycle. +#[derive(Debug)] +pub struct HealthListener { + listener: TcpListener, + pub port: u16, +} + +/// Shared state between the health listener and the daemon main loop. +#[derive(Debug)] +pub struct DaemonState { + /// Last time any client sent a heartbeat or query. + pub last_activity: std::sync::Mutex, + /// Signal to shut down the daemon. + pub shutdown: AtomicBool, +} + +impl Default for DaemonState { + fn default() -> Self { + Self::new() + } +} + +impl DaemonState { + pub fn new() -> Self { + Self { + last_activity: std::sync::Mutex::new(Instant::now()), + shutdown: AtomicBool::new(false), + } + } + + /// Record activity (resets idle timer). + /// + /// # Panics + /// Panics if the internal mutex is poisoned. + pub fn touch(&self) { + *self.last_activity.lock().expect("mutex poisoned") = Instant::now(); + } + + /// Duration since the last activity. + /// + /// # Panics + /// Panics if the internal mutex is poisoned. + pub fn idle_duration(&self) -> std::time::Duration { + self.last_activity.lock().expect("mutex poisoned").elapsed() + } + + pub fn request_shutdown(&self) { + self.shutdown.store(true, Ordering::Release); + } + + pub fn should_shutdown(&self) -> bool { + self.shutdown.load(Ordering::Acquire) + } +} + +impl HealthListener { + /// Try to bind the health port. + /// + /// # Errors + /// Returns `Err` if the port is already in use (another daemon is running) + /// or the bind fails for another reason. + pub fn bind(port: u16) -> std::io::Result { + let addr = std::net::SocketAddr::from(([127, 0, 0, 1], port)); + let listener = TcpListener::bind(addr)?; + listener.set_nonblocking(true)?; + let port = listener.local_addr()?.port(); + Ok(Self { listener, port }) + } + + /// Run the health listener loop. Spawns per-connection threads until shutdown. + /// Consumes `self` because this is intended to be called from a dedicated thread. + #[expect( + clippy::needless_pass_by_value, + reason = "Arc and DaemonInfo are cloned into per-connection threads" + )] + pub fn run(self, state: Arc, info: DaemonInfo) { + loop { + if state.should_shutdown() { + break; + } + + match self.listener.accept() { + Ok((stream, _addr)) => { + let state = Arc::clone(&state); + let info = info.clone(); + std::thread::spawn(move || { + handle_client(stream, &state, &info); + }); + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + std::thread::sleep(std::time::Duration::from_millis(100)); + } + Err(e) => { + warn!(error = %e, "health listener accept error"); + std::thread::sleep(std::time::Duration::from_millis(500)); + } + } + } + debug!("health listener shut down"); + } +} + +#[expect( + clippy::needless_pass_by_value, + reason = "TcpStream must be owned for BufReader" +)] +fn handle_client(stream: TcpStream, state: &DaemonState, info: &DaemonInfo) { + let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(5))); + let mut reader = BufReader::new(&stream); + let mut writer = &stream; + let mut line = String::new(); + + loop { + line.clear(); + match reader.read_line(&mut line) { + Ok(0) => break, + Ok(_) => { + let cmd = line.trim(); + let response = match cmd { + "PING" => "PONG\n".to_string(), + "HEARTBEAT" => { + state.touch(); + "OK\n".to_string() + } + "STOP" => { + state.request_shutdown(); + "STOPPING\n".to_string() + } + "STATUS" => { + let json = serde_json::to_string(info).unwrap_or_default(); + format!("{json}\n") + } + _ => "ERR unknown command\n".to_string(), + }; + if writer.write_all(response.as_bytes()).is_err() { + break; + } + } + Err(_) => break, + } + } +} + +/// Send a command to the daemon's health port and return the response. +/// +/// # Errors +/// Returns an error if the connection fails or the response cannot be read. +pub fn send_command(port: u16, command: &str) -> std::io::Result { + let addr = std::net::SocketAddr::from(([127, 0, 0, 1], port)); + let mut stream = TcpStream::connect_timeout(&addr, std::time::Duration::from_secs(2))?; + stream.set_read_timeout(Some(std::time::Duration::from_secs(5)))?; + + let msg = format!("{command}\n"); + stream.write_all(msg.as_bytes())?; + stream.flush()?; + + let mut reader = BufReader::new(&stream); + let mut response = String::new(); + reader.read_line(&mut response)?; + Ok(response) +} diff --git a/hyperdb-mcp/src/daemon/mod.rs b/hyperdb-mcp/src/daemon/mod.rs new file mode 100644 index 0000000..8f870a7 --- /dev/null +++ b/hyperdb-mcp/src/daemon/mod.rs @@ -0,0 +1,21 @@ +// Copyright (c) 2026, Salesforce, Inc. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR MIT + +//! Single-instance daemon for sharing a `hyperd` process across MCP clients. + +pub mod discovery; +pub mod health; +pub mod run; +pub mod spawn; + +/// Default TCP port the daemon binds for health checks and single-instance locking. +pub const DEFAULT_DAEMON_PORT: u16 = 7484; + +/// Default idle timeout in seconds before the daemon shuts down. +pub const DEFAULT_IDLE_TIMEOUT_SECS: u64 = 30 * 60; // 30 minutes + +/// Environment variable to override the daemon port. +pub const ENV_DAEMON_PORT: &str = "HYPERDB_DAEMON_PORT"; + +/// Environment variable to override the idle timeout (seconds). +pub const ENV_IDLE_TIMEOUT: &str = "HYPERDB_DAEMON_IDLE_TIMEOUT"; diff --git a/hyperdb-mcp/src/daemon/run.rs b/hyperdb-mcp/src/daemon/run.rs new file mode 100644 index 0000000..86b95a3 --- /dev/null +++ b/hyperdb-mcp/src/daemon/run.rs @@ -0,0 +1,152 @@ +// Copyright (c) 2026, Salesforce, Inc. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR MIT + +//! Daemon main loop: spawns `hyperd`, runs health listener, monitors idle timeout. + +use std::sync::Arc; +use std::time::Duration; + +use tokio::signal; +use tracing::info; + +use hyperdb_api::{HyperProcess, Parameters, TransportMode}; + +use super::discovery::{self, DaemonInfo}; +use super::health::{DaemonState, HealthListener}; +use super::{DEFAULT_IDLE_TIMEOUT_SECS, ENV_IDLE_TIMEOUT}; + +/// Configuration for the daemon process. +#[derive(Debug)] +pub struct DaemonConfig { + pub port: u16, + pub idle_timeout: Duration, +} + +impl DaemonConfig { + pub fn from_args(port: u16, idle_timeout_secs: Option) -> Self { + let idle_timeout_secs = idle_timeout_secs + .or_else(|| { + std::env::var(ENV_IDLE_TIMEOUT) + .ok() + .and_then(|v| v.parse().ok()) + }) + .unwrap_or(DEFAULT_IDLE_TIMEOUT_SECS); + + Self { + port, + idle_timeout: Duration::from_secs(idle_timeout_secs), + } + } +} + +/// Run the daemon. This function blocks until shutdown is triggered. +/// +/// # Errors +/// Returns an error if the health port cannot be bound, `hyperd` fails to start, +/// or the discovery file cannot be written. +pub async fn run_daemon(config: DaemonConfig) -> Result<(), Box> { + // Step 1: Bind health port (single-instance lock) + let listener = HealthListener::bind(config.port).map_err(|e| { + if e.kind() == std::io::ErrorKind::AddrInUse { + format!( + "Another hyperdb daemon is already running on port {}. \ + Use `hyperdb-mcp daemon status` to check or `hyperdb-mcp daemon stop` to stop it.", + config.port + ) + } else { + format!("Failed to bind health port {}: {e}", config.port) + } + })?; + let bound_port = listener.port; + info!(port = bound_port, "daemon health listener bound"); + + // Step 2: Spawn HyperProcess with TCP transport (shared across clients) + let log_dir = discovery::state_dir()?.join("logs"); + std::fs::create_dir_all(&log_dir)?; + + let mut params = Parameters::new(); + params.set("log_file_max_count", "2"); + params.set("log_file_size_limit", "100M"); + params.set("log_dir", log_dir.to_string_lossy().as_ref()); + params.set_transport_mode(TransportMode::Tcp); + + let hyper = HyperProcess::new(None, Some(¶ms))?; + let endpoint = hyper + .endpoint() + .ok_or("hyperd did not report an endpoint")? + .to_string(); + info!(endpoint = %endpoint, "hyperd started"); + + // Step 3: Write discovery file + let info = DaemonInfo { + pid: std::process::id(), + hyperd_endpoint: endpoint.clone(), + health_port: bound_port, + started_at: chrono::Utc::now().to_rfc3339(), + version: env!("CARGO_PKG_VERSION").to_string(), + }; + discovery::write_discovery_file(&info)?; + info!(path = %discovery::discovery_file_path()?.display(), "discovery file written"); + + // Step 4: Start health listener in background thread + let state = Arc::new(DaemonState::new()); + let health_state = Arc::clone(&state); + let health_info = info.clone(); + let health_handle = std::thread::spawn(move || { + listener.run(health_state, health_info); + }); + + // Step 5: Monitor idle timeout + OS signals + let idle_timeout = config.idle_timeout; + let shutdown_state = Arc::clone(&state); + + tokio::select! { + () = async { + loop { + tokio::time::sleep(Duration::from_secs(10)).await; + if shutdown_state.idle_duration() >= idle_timeout { + info!( + idle_secs = idle_timeout.as_secs(), + "idle timeout reached, shutting down" + ); + shutdown_state.request_shutdown(); + break; + } + if shutdown_state.should_shutdown() { + break; + } + } + } => {} + () = shutdown_signal() => { + info!("received shutdown signal"); + state.request_shutdown(); + } + } + + // Step 6: Graceful shutdown + info!("shutting down daemon"); + discovery::remove_discovery_file(); + drop(hyper); // closes callback connection → hyperd exits + let _ = health_handle.join(); + + Ok(()) +} + +async fn shutdown_signal() { + let ctrl_c = signal::ctrl_c(); + + #[cfg(unix)] + { + let mut sigterm = + signal::unix::signal(signal::unix::SignalKind::terminate()).expect("sigterm handler"); + tokio::select! { + _ = ctrl_c => {} + _ = sigterm.recv() => {} + } + } + + #[cfg(not(unix))] + { + ctrl_c.await.ok(); + } +} diff --git a/hyperdb-mcp/src/daemon/spawn.rs b/hyperdb-mcp/src/daemon/spawn.rs new file mode 100644 index 0000000..deb3de4 --- /dev/null +++ b/hyperdb-mcp/src/daemon/spawn.rs @@ -0,0 +1,104 @@ +// Copyright (c) 2026, Salesforce, Inc. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR MIT + +//! Spawn the daemon as a detached background process. +//! +//! When an MCP client starts and no daemon is running, it spawns one using the +//! current binary with the `daemon` subcommand. The spawned process is fully +//! detached so it outlives the parent MCP session. + +use std::io; +use std::process::Command; +use std::time::{Duration, Instant}; + +use tracing::{debug, info}; + +use super::discovery::{self, DaemonInfo}; + +/// Maximum time to wait for the daemon to write its discovery file after spawning. +const SPAWN_TIMEOUT: Duration = Duration::from_secs(10); + +/// Polling interval while waiting for the discovery file. +const POLL_INTERVAL: Duration = Duration::from_millis(100); + +/// Ensure a daemon is running and return its info. +/// If no daemon is detected, spawn one and wait for it to become ready. +/// +/// # Errors +/// Returns an error if the daemon cannot be spawned or does not become ready +/// within the timeout period. +pub fn ensure_daemon(port: u16) -> io::Result { + // Check if already running + if let Some(info) = discovery::discover() { + debug!(endpoint = %info.hyperd_endpoint, "daemon already running"); + return Ok(info); + } + + info!("no running daemon detected, spawning one"); + spawn_detached(port)?; + wait_for_daemon() +} + +/// Spawn `hyperdb-mcp daemon` as a fully detached background process. +fn spawn_detached(port: u16) -> io::Result<()> { + let exe = std::env::current_exe()?; + let port_str = port.to_string(); + + let mut cmd = Command::new(&exe); + cmd.arg("daemon").arg("--port").arg(&port_str); + + // Detach from parent: redirect stdio to null + cmd.stdin(std::process::Stdio::null()); + cmd.stdout(std::process::Stdio::null()); + cmd.stderr(std::process::Stdio::null()); + + // Platform-specific detach flags + #[cfg(unix)] + { + use std::os::unix::process::CommandExt; + // SAFETY: setsid() is async-signal-safe per POSIX. Called in pre_exec + // (between fork and exec) to create a new session so the daemon isn't + // killed when the parent terminal/process exits. + unsafe { + cmd.pre_exec(|| { + libc::setsid(); + Ok(()) + }); + } + } + + #[cfg(windows)] + { + use std::os::windows::process::CommandExt; + const CREATE_NO_WINDOW: u32 = 0x0800_0000; + const DETACHED_PROCESS: u32 = 0x0000_0008; + cmd.creation_flags(CREATE_NO_WINDOW | DETACHED_PROCESS); + } + + let child = cmd.spawn()?; + info!(pid = child.id(), "daemon process spawned"); + Ok(()) +} + +/// Poll for the discovery file to appear (daemon is ready). +fn wait_for_daemon() -> io::Result { + let start = Instant::now(); + loop { + if let Some(info) = discovery::discover() { + info!(endpoint = %info.hyperd_endpoint, "daemon is ready"); + return Ok(info); + } + + if start.elapsed() >= SPAWN_TIMEOUT { + return Err(io::Error::new( + io::ErrorKind::TimedOut, + format!( + "daemon did not become ready within {} seconds", + SPAWN_TIMEOUT.as_secs() + ), + )); + } + + std::thread::sleep(POLL_INTERVAL); + } +} diff --git a/hyperdb-mcp/src/engine.rs b/hyperdb-mcp/src/engine.rs index f47cc4e..b225909 100644 --- a/hyperdb-mcp/src/engine.rs +++ b/hyperdb-mcp/src/engine.rs @@ -38,23 +38,29 @@ //! optimization could use `spawn_blocking` or an async connection API, but the //! current approach is correct and simple. +use crate::daemon; use crate::error::{ErrorCode, McpError}; use crate::schema::ColumnSchema; use hyperdb_api::{Catalog, Connection, CreateMode, HyperProcess, Parameters, SqlType}; use serde_json::{json, Value}; use std::path::{Path, PathBuf}; -/// Owns a running `HyperProcess` and the single `Connection` to its workspace -/// `.hyper` file. All SQL execution flows through this struct. +/// Owns a connection to `hyperd` and the workspace `.hyper` file. All SQL +/// execution flows through this struct. /// -/// Two workspace modes are supported: -/// - **Persistent** — caller supplies a path; the `.hyper` file survives across -/// sessions so tables can be built up incrementally. -/// - **Ephemeral** — a temp directory is created per process; everything is -/// discarded when the server exits. +/// Two process modes: +/// - **Local** — this engine owns the `HyperProcess` subprocess directly. +/// - **Daemon** — a shared daemon manages `hyperd`; the engine only holds a connection. +/// +/// Two workspace modes: +/// - **Persistent** — caller supplies a path; the `.hyper` file survives across sessions. +/// - **Ephemeral** — a temp directory is created per process; discarded on exit. #[derive(Debug)] pub struct Engine { - hyper: HyperProcess, + /// `None` in daemon mode (the daemon owns the process). + hyper: Option, + /// Stored endpoint for daemon mode (the daemon advertises this). + daemon_endpoint: Option, connection: Connection, workspace_path: PathBuf, log_dir: PathBuf, @@ -62,17 +68,10 @@ pub struct Engine { } impl Engine { - #[expect( - clippy::needless_pass_by_value, - reason = "call-site ergonomics: function consumes logically-owned parameters, refactoring signatures is not worth per-site churn" - )] /// Create a new Engine. If `workspace_path` is Some, use that path (persistent mode). /// If None, use a temp file (ephemeral mode). /// - /// Logs from `hyperd` are written to the directory returned by - /// [`resolve_log_dir`]. The same directory should be used by the MCP - /// binary for its own client-side log so operators can find everything - /// in one place when debugging. + /// Connects to the shared daemon if available, falling back to a local `hyperd`. /// /// # Errors /// @@ -85,6 +84,22 @@ impl Engine { /// reports the `hyperd` executable is missing or unreachable via /// `HYPERD_PATH`. pub fn new(workspace_path: Option) -> Result { + Self::new_with_mode(workspace_path, false) + } + + /// Create an engine that bypasses the shared daemon and spawns a private `hyperd`. + /// + /// # Errors + /// Same as [`Self::new`]. + pub fn new_no_daemon(workspace_path: Option) -> Result { + Self::new_with_mode(workspace_path, true) + } + + #[expect( + clippy::needless_pass_by_value, + reason = "Option is consumed by the workspace path resolution logic" + )] + fn new_with_mode(workspace_path: Option, no_daemon: bool) -> Result { let (path, is_persistent) = if let Some(ref p) = workspace_path { let path = PathBuf::from(shellexpand_tilde(p)); if let Some(parent) = path.parent() { @@ -115,6 +130,14 @@ impl Engine { ) })?; + // Try daemon mode first unless disabled + if !no_daemon { + if let Some(engine) = Self::try_daemon_mode(&path, &log_dir, is_persistent)? { + return Ok(engine); + } + } + + // Fall back to spawning a local HyperProcess let mut params = Parameters::new(); params.set("log_file_max_count", "2"); params.set("log_file_size_limit", "100M"); @@ -135,20 +158,11 @@ impl Engine { McpError::new(ErrorCode::InternalError, format!("Failed to connect: {e}")) })?; - // Ensure the `public` schema exists in the workspace database so that - // `load_file`, `load_data`, and unqualified `CREATE TABLE` statements - // resolve without a "could not resolve the schema (3F000)" error. - connection - .execute_command("CREATE SCHEMA IF NOT EXISTS public") - .map_err(|e| { - McpError::new( - ErrorCode::InternalError, - format!("Failed to bootstrap public schema: {e}"), - ) - })?; + bootstrap_public_schema(&connection)?; Ok(Self { - hyper, + hyper: Some(hyper), + daemon_endpoint: None, connection, workspace_path: path, log_dir, @@ -156,22 +170,80 @@ impl Engine { }) } - /// Whether the `hyperd` child process is still alive. + /// Attempt to connect via the shared daemon. Returns `None` if the daemon + /// cannot be reached (falls back to local mode). + fn try_daemon_mode( + path: &Path, + log_dir: &Path, + is_persistent: bool, + ) -> Result, McpError> { + let port = daemon::discovery::resolve_port(); + let info = match daemon::spawn::ensure_daemon(port) { + Ok(info) => info, + Err(e) => { + tracing::debug!(error = %e, "daemon unavailable, falling back to local mode"); + return Ok(None); + } + }; + + let endpoint = &info.hyperd_endpoint; + let connection = Connection::connect( + endpoint, + &path.to_string_lossy(), + CreateMode::CreateIfNotExists, + ) + .map_err(|e| { + McpError::new( + ErrorCode::InternalError, + format!("Failed to connect to daemon hyperd at {endpoint}: {e}"), + ) + })?; + + bootstrap_public_schema(&connection)?; + + // Send heartbeat so daemon knows we're active + let _ = daemon::health::send_command(info.health_port, "HEARTBEAT"); + + Ok(Some(Self { + hyper: None, + daemon_endpoint: Some(info.hyperd_endpoint), + connection, + workspace_path: path.to_path_buf(), + log_dir: log_dir.to_path_buf(), + is_persistent, + })) + } + + /// Whether the backing `hyperd` process is still alive. + /// In daemon mode, checks the daemon health port. pub fn is_running(&self) -> bool { - self.hyper.is_running() + if let Some(ref hyper) = self.hyper { + hyper.is_running() + } else { + // Daemon mode: check if daemon is still reachable + daemon::discovery::discover().is_some() + } } - /// `host:port` endpoint of the hyperd child process. Used by the + /// `host:port` endpoint of the `hyperd` process. Used by the /// watcher to build additional async connections via `hyperdb_api::pool` /// without touching the primary sync connection this engine holds. /// /// # Errors /// - /// Returns [`ErrorCode::InternalError`] if the underlying - /// [`HyperProcess::require_endpoint`] call fails — typically when - /// `hyperd` has exited or never successfully reported an endpoint. + /// Returns [`ErrorCode::InternalError`] if the endpoint is unavailable. pub fn hyperd_endpoint(&self) -> Result { + if let Some(ref endpoint) = self.daemon_endpoint { + return Ok(endpoint.clone()); + } self.hyper + .as_ref() + .ok_or_else(|| { + McpError::new( + ErrorCode::InternalError, + "no hyperd endpoint available".to_string(), + ) + })? .require_endpoint() .map(std::string::ToString::to_string) .map_err(|e| McpError::new(ErrorCode::InternalError, e.to_string())) @@ -766,7 +838,7 @@ impl Engine { }; Ok(json!({ - "hyperd_running": self.hyper.is_running(), + "hyperd_running": self.is_running(), "workspace_path": self.workspace_path.to_string_lossy(), "workspace_mode": if self.is_persistent { "persistent" } else { "ephemeral" }, "table_count": table_count, @@ -1070,6 +1142,34 @@ fn strip_leading_sql_comments(sql: &str) -> &str { s } +impl Drop for Engine { + fn drop(&mut self) { + // In daemon mode with ephemeral databases, DETACH the workspace from hyperd + // (releases the file handle — critical on Windows) then delete the temp file. + if !self.is_persistent && self.daemon_endpoint.is_some() { + let db_name = self.primary_db_name(); + let detach = format!("DETACH DATABASE \"{db_name}\""); + let _ = self.connection.execute_command(&detach); + // Remove the temp directory containing the ephemeral .hyper file + if let Some(parent) = self.workspace_path.parent() { + let _ = std::fs::remove_dir_all(parent); + } + } + } +} + +fn bootstrap_public_schema(connection: &Connection) -> Result<(), McpError> { + connection + .execute_command("CREATE SCHEMA IF NOT EXISTS public") + .map(|_| ()) + .map_err(|e| { + McpError::new( + ErrorCode::InternalError, + format!("Failed to bootstrap public schema: {e}"), + ) + }) +} + /// Minimal `~/` (and `~\` on Windows) expansion. Resolves the home /// directory via `$HOME` on Unix and `%USERPROFILE%` (falling back to /// `%HOMEDRIVE%%HOMEPATH%`) on Windows. `~username/` is not supported — diff --git a/hyperdb-mcp/src/lib.rs b/hyperdb-mcp/src/lib.rs index 4fba0ba..2d187ca 100644 --- a/hyperdb-mcp/src/lib.rs +++ b/hyperdb-mcp/src/lib.rs @@ -38,6 +38,7 @@ pub mod attach; pub mod chart; +pub mod daemon; pub mod engine; pub mod error; pub mod export; diff --git a/hyperdb-mcp/src/main.rs b/hyperdb-mcp/src/main.rs index 43a4966..d83917f 100644 --- a/hyperdb-mcp/src/main.rs +++ b/hyperdb-mcp/src/main.rs @@ -4,6 +4,7 @@ //! Binary entry point for the `hyperdb-mcp` MCP server. //! //! Starts an MCP server on stdio, optionally backed by a persistent workspace. +//! Can also run in daemon mode to manage a shared `hyperd` process. //! //! # Logging //! @@ -19,7 +20,11 @@ //! [`hyperdb_mcp::engine::resolve_log_dir`]). Check the `status` tool for //! the exact paths. -use clap::Parser; +use clap::{Parser, Subcommand}; +use hyperdb_mcp::daemon; +use hyperdb_mcp::daemon::discovery; +use hyperdb_mcp::daemon::health; +use hyperdb_mcp::daemon::run::DaemonConfig; use hyperdb_mcp::engine::{resolve_log_dir, CLIENT_LOG_FILE_NAME}; use hyperdb_mcp::server::HyperMcpServer; use rmcp::ServiceExt; @@ -37,29 +42,107 @@ const VERSION: &str = concat!(env!("CARGO_PKG_VERSION"), ".r", env!("HYPERDB_GIT about = "MCP server for Hyper database analytics" )] struct Cli { + #[command(subcommand)] + command: Option, + /// Path to the `.hyper` workspace file for persistent mode (omit for ephemeral mode) - #[arg(long)] + #[arg(long, global = true)] workspace: Option, /// Run in read-only mode: disables execute, `load_data`, `load_file`, and export to hyper format - #[arg(long)] + #[arg(long, global = true)] read_only: bool, /// Bare mode: disable MCP-managed auxiliary tables. Skips creating /// `_table_catalog` and forces saved queries into in-memory /// (non-persistent) storage, even with --workspace. - #[arg(long)] + #[arg(long, global = true)] bare: bool, + + /// Disable the shared daemon and spawn a private `hyperd` (legacy behavior) + #[arg(long, global = true)] + no_daemon: bool, +} + +#[derive(Subcommand)] +enum Commands { + /// Run as a background daemon managing a shared hyperd process + Daemon { + #[command(subcommand)] + action: Option, + + /// TCP port for health listener and single-instance lock + #[arg(long, default_value_t = daemon::DEFAULT_DAEMON_PORT)] + port: u16, + + /// Idle timeout in seconds before the daemon shuts down + #[arg(long)] + idle_timeout: Option, + }, +} + +#[derive(Subcommand)] +enum DaemonAction { + /// Stop a running daemon + Stop, + /// Show status of the running daemon + Status, } #[tokio::main] async fn main() -> Result<(), Box> { - // Parse CLI first so the log directory matches whatever workspace the - // user requested. This has to happen before tracing is initialized so - // the file layer points at the right place. let cli = Cli::parse(); - // Compute and create the log dir (same logic Engine will use later). + match cli.command { + Some(Commands::Daemon { + action: Some(DaemonAction::Stop), + port, + .. + }) => { + daemon_stop(port); + Ok(()) + } + Some(Commands::Daemon { + action: Some(DaemonAction::Status), + .. + }) => { + daemon_status(); + Ok(()) + } + Some(Commands::Daemon { + action: None, + port, + idle_timeout, + }) => run_daemon_mode(port, idle_timeout).await, + None => run_mcp_mode(cli).await, + } +} + +async fn run_daemon_mode( + port: u16, + idle_timeout: Option, +) -> Result<(), Box> { + // Daemon logs go to ~/.hyperdb/logs/ + let log_dir = discovery::state_dir()?.join("logs"); + std::fs::create_dir_all(&log_dir)?; + + let file_appender = tracing_appender::rolling::never(&log_dir, "hyperdb-daemon.log"); + let (file_writer, _file_guard) = tracing_appender::non_blocking(file_appender); + + let filter = EnvFilter::try_from_default_env() + .unwrap_or_else(|_| EnvFilter::new("info,hyperdb_mcp=debug")); + + tracing_subscriber::registry() + .with(filter) + .with(fmt::layer().with_writer(std::io::stderr)) + .with(fmt::layer().with_writer(file_writer).with_ansi(false)) + .init(); + + let config = DaemonConfig::from_args(port, idle_timeout); + daemon::run::run_daemon(config).await +} + +async fn run_mcp_mode(cli: Cli) -> Result<(), Box> { let log_dir = resolve_log_dir(cli.workspace.as_deref()); if let Err(e) = std::fs::create_dir_all(&log_dir) { eprintln!( @@ -68,13 +151,9 @@ async fn main() -> Result<(), Box> { ); } - // tracing_appender writes via a background thread; we keep the guard - // alive for the duration of `main` so buffered logs get flushed cleanly. let file_appender = tracing_appender::rolling::never(&log_dir, CLIENT_LOG_FILE_NAME); let (file_writer, _file_guard) = tracing_appender::non_blocking(file_appender); - // Default to `info` when RUST_LOG is unset so the log files are actually - // populated. Users can still override via RUST_LOG=debug,hyperdb_api=trace etc. let filter = EnvFilter::try_from_default_env() .unwrap_or_else(|_| EnvFilter::new("info,hyperdb_mcp=debug")); @@ -89,12 +168,40 @@ async fn main() -> Result<(), Box> { workspace = cli.workspace.as_deref().unwrap_or(""), read_only = cli.read_only, bare = cli.bare, + no_daemon = cli.no_daemon, "hyperdb-mcp starting" ); - let server = HyperMcpServer::new(cli.workspace, cli.read_only, cli.bare); + let server = + HyperMcpServer::with_no_daemon(cli.workspace, cli.read_only, cli.bare, cli.no_daemon); let service = server.serve(rmcp::transport::io::stdio()).await?; service.waiting().await?; Ok(()) } + +fn daemon_stop(port: u16) { + match health::send_command(port, "STOP") { + Ok(response) => { + println!("Daemon responded: {}", response.trim()); + } + Err(e) => { + eprintln!("No daemon running on port {port} (or cannot connect): {e}"); + std::process::exit(1); + } + } +} + +fn daemon_status() { + if let Some(info) = discovery::discover() { + println!("Daemon is running:"); + println!(" PID: {}", info.pid); + println!(" Hyperd endpoint: {}", info.hyperd_endpoint); + println!(" Health port: {}", info.health_port); + println!(" Started: {}", info.started_at); + println!(" Version: {}", info.version); + } else { + eprintln!("No daemon is currently running."); + std::process::exit(1); + } +} diff --git a/hyperdb-mcp/src/server.rs b/hyperdb-mcp/src/server.rs index 38c8873..0159eae 100644 --- a/hyperdb-mcp/src/server.rs +++ b/hyperdb-mcp/src/server.rs @@ -755,6 +755,10 @@ pub struct HyperMcpServer { /// [`crate::saved_queries::SessionStore`] and the catalog is never /// created or updated. bare: bool, + /// Skip the shared daemon and spawn a private `hyperd` (legacy behavior). + no_daemon: bool, + /// Last time a heartbeat was sent to the daemon (debounced to avoid per-call TCP overhead). + last_heartbeat: std::sync::Mutex, // Under rmcp 1.x the router fields are constructed for downstream // macro-generated dispatch but not read through a direct field access // that the compiler can see. Keep them; the `#[tool_router]` / @@ -797,6 +801,25 @@ impl HyperMcpServer { /// workspace file. Useful when callers want a pristine `.hyper` file /// containing only their own data. pub fn new(workspace_path: Option, read_only: bool, bare: bool) -> Self { + Self::with_options(workspace_path, read_only, bare, false) + } + + /// Create a server instance with explicit daemon control. + pub fn with_no_daemon( + workspace_path: Option, + read_only: bool, + bare: bool, + no_daemon: bool, + ) -> Self { + Self::with_options(workspace_path, read_only, bare, no_daemon) + } + + fn with_options( + workspace_path: Option, + read_only: bool, + bare: bool, + no_daemon: bool, + ) -> Self { // Bare mode forces a SessionStore regardless of workspace so the // `_hyperdb_saved_queries` meta-table is never created. let saved_queries: Arc = if bare { @@ -818,6 +841,8 @@ impl HyperMcpServer { workspace_path, read_only, bare, + no_daemon, + last_heartbeat: std::sync::Mutex::new(std::time::Instant::now()), tool_router: Self::tool_router(), prompt_router: Self::prompt_router(), } @@ -927,7 +952,11 @@ impl HyperMcpServer { bare = self.bare, "initializing hyper engine" ); - let engine = Engine::new(self.workspace_path.clone())?; + let engine = if self.no_daemon { + Engine::new_no_daemon(self.workspace_path.clone())? + } else { + Engine::new(self.workspace_path.clone())? + }; tracing::info!( workspace_path = %engine.workspace_path().display(), log_dir = %engine.log_dir().display(), @@ -1045,6 +1074,11 @@ impl HyperMcpServer { // catalog SQL can see errors classified via the normal error // path. No-op in bare or read-only mode. self.ensure_catalog_ready(engine); + // In daemon mode, send a heartbeat so the daemon knows we're still active. + // Debounced to avoid per-call TCP overhead (only sends if >60s since last). + if !self.no_daemon { + self.maybe_send_heartbeat(); + } let result = f(engine); if let Err(e) = &result { tracing::debug!(code = ?e.code, message = %e.message, "tool call returned error"); @@ -1069,6 +1103,24 @@ impl HyperMcpServer { result } + /// Best-effort heartbeat to keep the daemon alive while this client is active. + /// Debounced: only sends if more than 60 seconds have elapsed since the last heartbeat, + /// avoiding a new TCP connection on every tool call. + fn maybe_send_heartbeat(&self) { + const HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60); + let should_send = self + .last_heartbeat + .lock() + .is_ok_and(|guard| guard.elapsed() >= HEARTBEAT_INTERVAL); + if should_send { + let port = crate::daemon::discovery::resolve_port(); + let _ = crate::daemon::health::send_command(port, "HEARTBEAT"); + if let Ok(mut guard) = self.last_heartbeat.lock() { + *guard = std::time::Instant::now(); + } + } + } + /// Run a closure that accesses the saved-query store. /// /// Some store variants (notably diff --git a/hyperdb-mcp/tests/daemon_tests.rs b/hyperdb-mcp/tests/daemon_tests.rs new file mode 100644 index 0000000..9af31d8 --- /dev/null +++ b/hyperdb-mcp/tests/daemon_tests.rs @@ -0,0 +1,612 @@ +// Copyright (c) 2026, Salesforce, Inc. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR MIT + +//! Tests for the single-instance daemon: discovery file, health protocol, +//! idle timeout, and full lifecycle integration with a real `hyperd`. +//! +//! Many tests mutate process-global environment variables (`HYPERDB_STATE_DIR`, +//! `HYPERDB_DAEMON_PORT`) to isolate their state directories. Because env vars +//! are process-global, these tests MUST run sequentially. We enforce this via a +//! shared mutex — every test that touches env vars acquires `ENV_LOCK` first. + +use std::net::TcpListener; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use hyperdb_mcp::daemon::discovery::{self, DaemonInfo}; +use hyperdb_mcp::daemon::health::{self, DaemonState, HealthListener}; +use tempfile::TempDir; + +/// Process-wide lock for tests that mutate environment variables. +/// Cargo runs tests in the same process by default — this prevents races. +static ENV_LOCK: Mutex<()> = Mutex::new(()); + +// ─── Unit tests: DaemonState (no env vars, safe to run in parallel) ─────────── + +#[test] +fn daemon_state_touch_resets_idle_duration() { + let state = DaemonState::new(); + std::thread::sleep(Duration::from_millis(50)); + assert!(state.idle_duration() >= Duration::from_millis(50)); + + state.touch(); + assert!(state.idle_duration() < Duration::from_millis(30)); +} + +#[test] +fn daemon_state_shutdown_flag() { + let state = DaemonState::new(); + assert!(!state.should_shutdown()); + + state.request_shutdown(); + assert!(state.should_shutdown()); +} + +#[test] +fn daemon_state_default_is_equivalent_to_new() { + let default_state = DaemonState::default(); + assert!(!default_state.should_shutdown()); + assert!(default_state.idle_duration() < Duration::from_millis(100)); +} + +// ─── Unit tests: Health protocol (no env vars, safe to run in parallel) ─────── + +#[test] +fn health_listener_bind_succeeds_on_free_port() { + let listener = HealthListener::bind(0).unwrap(); + assert_ne!(listener.port, 0); +} + +#[test] +fn health_listener_second_bind_same_port_fails() { + let listener = HealthListener::bind(0).unwrap(); + let port = listener.port; + + let result = HealthListener::bind(port); + assert!(result.is_err()); + assert_eq!(result.unwrap_err().kind(), std::io::ErrorKind::AddrInUse); +} + +#[test] +fn health_protocol_ping_pong() { + let (port, _handle, _state) = start_health_listener(); + + let response = health::send_command(port, "PING").unwrap(); + assert_eq!(response.trim(), "PONG"); +} + +#[test] +fn health_protocol_heartbeat_resets_idle() { + let (port, _handle, state) = start_health_listener(); + + std::thread::sleep(Duration::from_millis(50)); + assert!(state.idle_duration() >= Duration::from_millis(50)); + + let response = health::send_command(port, "HEARTBEAT").unwrap(); + assert_eq!(response.trim(), "OK"); + + assert!(state.idle_duration() < Duration::from_millis(30)); +} + +#[test] +fn health_protocol_stop_triggers_shutdown() { + let (port, handle, state) = start_health_listener(); + + assert!(!state.should_shutdown()); + + let response = health::send_command(port, "STOP").unwrap(); + assert_eq!(response.trim(), "STOPPING"); + + assert!(state.should_shutdown()); + + // Health listener should exit its loop + handle.join().unwrap(); +} + +#[test] +fn health_protocol_status_returns_json() { + let (port, _handle, _state) = start_health_listener(); + + let response = health::send_command(port, "STATUS").unwrap(); + let parsed: serde_json::Value = serde_json::from_str(response.trim()).unwrap(); + assert_eq!(parsed["pid"], 12345); + assert_eq!(parsed["hyperd_endpoint"], "127.0.0.1:54321"); +} + +#[test] +fn health_protocol_unknown_command_returns_error() { + let (port, _handle, _state) = start_health_listener(); + + let response = health::send_command(port, "INVALID").unwrap(); + assert!(response.contains("ERR")); +} + +#[test] +fn health_protocol_multi_command_session() { + let (port, _handle, _state) = start_health_listener(); + + let response1 = health::send_command(port, "PING").unwrap(); + assert_eq!(response1.trim(), "PONG"); + + let response2 = health::send_command(port, "STATUS").unwrap(); + let parsed: serde_json::Value = serde_json::from_str(response2.trim()).unwrap(); + assert_eq!(parsed["health_port"], port); + + let response3 = health::send_command(port, "HEARTBEAT").unwrap(); + assert_eq!(response3.trim(), "OK"); +} + +// ─── Unit tests: idle timeout logic (no env vars) ───────────────────────────── + +#[test] +fn daemon_idle_timeout_shuts_down_daemon() { + let state = Arc::new(DaemonState::new()); + let idle_timeout = Duration::from_secs(2); + + let monitor_state = Arc::clone(&state); + let monitor = std::thread::spawn(move || loop { + std::thread::sleep(Duration::from_millis(100)); + if monitor_state.idle_duration() >= idle_timeout { + monitor_state.request_shutdown(); + break; + } + if monitor_state.should_shutdown() { + break; + } + }); + + let start = Instant::now(); + monitor.join().unwrap(); + let elapsed = start.elapsed(); + + assert!(state.should_shutdown()); + assert!(elapsed >= Duration::from_secs(2)); + assert!(elapsed < Duration::from_secs(4)); +} + +#[test] +fn daemon_heartbeat_prevents_idle_shutdown() { + let state = Arc::new(DaemonState::new()); + let idle_timeout = Duration::from_secs(1); + + let monitor_state = Arc::clone(&state); + let heartbeat_state = Arc::clone(&state); + + let heartbeat = std::thread::spawn(move || { + let start = Instant::now(); + while start.elapsed() < Duration::from_millis(1500) { + heartbeat_state.touch(); + std::thread::sleep(Duration::from_millis(200)); + } + }); + + let monitor = std::thread::spawn(move || loop { + std::thread::sleep(Duration::from_millis(100)); + if monitor_state.idle_duration() >= idle_timeout { + monitor_state.request_shutdown(); + break; + } + if monitor_state.should_shutdown() { + break; + } + }); + + heartbeat.join().unwrap(); + let start = Instant::now(); + monitor.join().unwrap(); + let after_heartbeat_stop = start.elapsed(); + + assert!(state.should_shutdown()); + assert!( + after_heartbeat_stop >= Duration::from_millis(800), + "daemon should have waited for idle timeout after heartbeats stopped" + ); +} + +// ─── Unit tests: Discovery file (require ENV_LOCK) ──────────────────────────── + +#[test] +fn discovery_file_write_and_read() { + let _lock = ENV_LOCK.lock().unwrap(); + let tmp = TempDir::new().unwrap(); + let _guard = EnvGuard::set("HYPERDB_STATE_DIR", tmp.path().to_str().unwrap()); + + let info = DaemonInfo { + pid: 12345, + hyperd_endpoint: "127.0.0.1:54321".to_string(), + health_port: 7484, + started_at: "2026-05-20T10:30:00Z".to_string(), + version: "0.1.3".to_string(), + }; + + discovery::write_discovery_file(&info).unwrap(); + + let path = tmp.path().join("daemon.json"); + assert!(path.exists()); + + let contents = std::fs::read_to_string(&path).unwrap(); + let read_back: DaemonInfo = serde_json::from_str(&contents).unwrap(); + assert_eq!(read_back.pid, 12345); + assert_eq!(read_back.hyperd_endpoint, "127.0.0.1:54321"); + assert_eq!(read_back.health_port, 7484); + assert_eq!(read_back.version, "0.1.3"); +} + +#[test] +fn discovery_file_overwrite_replaces_content() { + let _lock = ENV_LOCK.lock().unwrap(); + let tmp = TempDir::new().unwrap(); + let _guard = EnvGuard::set("HYPERDB_STATE_DIR", tmp.path().to_str().unwrap()); + + let info1 = DaemonInfo { + pid: 100, + hyperd_endpoint: "127.0.0.1:1111".to_string(), + health_port: 7484, + started_at: "2026-01-01T00:00:00Z".to_string(), + version: "0.1.0".to_string(), + }; + discovery::write_discovery_file(&info1).unwrap(); + + let info2 = DaemonInfo { + pid: 200, + hyperd_endpoint: "127.0.0.1:2222".to_string(), + health_port: 7485, + started_at: "2026-02-02T00:00:00Z".to_string(), + version: "0.2.0".to_string(), + }; + discovery::write_discovery_file(&info2).unwrap(); + + let path = tmp.path().join("daemon.json"); + let contents = std::fs::read_to_string(&path).unwrap(); + let read_back: DaemonInfo = serde_json::from_str(&contents).unwrap(); + assert_eq!(read_back.pid, 200); + assert_eq!(read_back.hyperd_endpoint, "127.0.0.1:2222"); +} + +#[test] +fn remove_discovery_file_deletes_it() { + let _lock = ENV_LOCK.lock().unwrap(); + let tmp = TempDir::new().unwrap(); + let _guard = EnvGuard::set("HYPERDB_STATE_DIR", tmp.path().to_str().unwrap()); + + let info = DaemonInfo { + pid: 1, + hyperd_endpoint: "127.0.0.1:1".to_string(), + health_port: 7484, + started_at: "2026-01-01T00:00:00Z".to_string(), + version: "0.0.1".to_string(), + }; + discovery::write_discovery_file(&info).unwrap(); + let path = tmp.path().join("daemon.json"); + assert!(path.exists()); + + discovery::remove_discovery_file(); + assert!(!path.exists()); +} + +#[test] +fn discover_returns_none_when_no_file_exists() { + let _lock = ENV_LOCK.lock().unwrap(); + let tmp = TempDir::new().unwrap(); + let _guard = EnvGuard::set("HYPERDB_STATE_DIR", tmp.path().to_str().unwrap()); + + assert!(discovery::discover().is_none()); +} + +#[test] +fn discover_returns_none_for_stale_file() { + let _lock = ENV_LOCK.lock().unwrap(); + let tmp = TempDir::new().unwrap(); + let _guard = EnvGuard::set("HYPERDB_STATE_DIR", tmp.path().to_str().unwrap()); + + let info = DaemonInfo { + pid: 99999, + hyperd_endpoint: "127.0.0.1:1".to_string(), + health_port: 1, + started_at: "2026-01-01T00:00:00Z".to_string(), + version: "0.0.1".to_string(), + }; + discovery::write_discovery_file(&info).unwrap(); + + assert!(discovery::discover().is_none()); + + let path = tmp.path().join("daemon.json"); + assert!(!path.exists()); +} + +#[test] +fn resolve_port_uses_env_var() { + let _lock = ENV_LOCK.lock().unwrap(); + let _guard = EnvGuard::set("HYPERDB_DAEMON_PORT", "9999"); + assert_eq!(discovery::resolve_port(), 9999); +} + +#[test] +fn resolve_port_uses_default_when_env_unset() { + let _lock = ENV_LOCK.lock().unwrap(); + let _guard = EnvGuard::remove("HYPERDB_DAEMON_PORT"); + assert_eq!( + discovery::resolve_port(), + hyperdb_mcp::daemon::DEFAULT_DAEMON_PORT + ); +} + +#[test] +fn discover_finds_live_daemon() { + let _lock = ENV_LOCK.lock().unwrap(); + let tmp = TempDir::new().unwrap(); + let _guard = EnvGuard::set("HYPERDB_STATE_DIR", tmp.path().to_str().unwrap()); + + let (port, _handle, _state) = start_health_listener(); + + let info = DaemonInfo { + pid: 12345, + hyperd_endpoint: "127.0.0.1:54321".to_string(), + health_port: port, + started_at: "2026-05-20T10:30:00Z".to_string(), + version: "0.1.3".to_string(), + }; + discovery::write_discovery_file(&info).unwrap(); + + let discovered = discovery::discover().expect("should discover live daemon"); + assert_eq!(discovered.pid, 12345); + assert_eq!(discovered.health_port, port); +} + +// ─── Integration tests: full daemon lifecycle with real hyperd ───────────────── + +#[test] +fn daemon_mode_engine_connects_to_shared_hyperd() { + let _lock = ENV_LOCK.lock().unwrap(); + let daemon = TestDaemon::start(); + + let tmp = TempDir::new().unwrap(); + let workspace_path = tmp.path().join("test.hyper"); + + let engine = + hyperdb_mcp::engine::Engine::new(Some(workspace_path.to_str().unwrap().to_string())) + .expect("engine should connect to daemon"); + + assert!(engine.is_running()); + + let endpoint = engine.hyperd_endpoint().unwrap(); + assert_eq!(endpoint, daemon.info.hyperd_endpoint); +} + +#[test] +fn daemon_mode_two_engines_share_same_hyperd() { + let _lock = ENV_LOCK.lock().unwrap(); + let _daemon = TestDaemon::start(); + + let tmp1 = TempDir::new().unwrap(); + let tmp2 = TempDir::new().unwrap(); + let path1 = tmp1.path().join("db1.hyper"); + let path2 = tmp2.path().join("db2.hyper"); + + let engine1 = + hyperdb_mcp::engine::Engine::new(Some(path1.to_str().unwrap().to_string())).unwrap(); + + let engine2 = + hyperdb_mcp::engine::Engine::new(Some(path2.to_str().unwrap().to_string())).unwrap(); + + assert_eq!( + engine1.hyperd_endpoint().unwrap(), + engine2.hyperd_endpoint().unwrap() + ); + + engine1.execute_command("CREATE TABLE foo (x INT)").unwrap(); + engine1 + .execute_command("INSERT INTO foo VALUES (42)") + .unwrap(); + + let tables = engine2.describe_tables().unwrap(); + assert!( + tables.iter().all(|t| t["name"] != "foo"), + "engine2 should not see engine1's table" + ); +} + +#[test] +fn daemon_mode_persistent_database_file_survives_engine_drop() { + let _lock = ENV_LOCK.lock().unwrap(); + let _daemon = TestDaemon::start(); + let tmp = TempDir::new().unwrap(); + let path = tmp.path().join("persistent.hyper"); + let path_str = path.to_str().unwrap().to_string(); + + { + let engine = hyperdb_mcp::engine::Engine::new(Some(path_str.clone())).unwrap(); + engine + .execute_command("CREATE TABLE survive (val TEXT)") + .unwrap(); + engine + .execute_command("INSERT INTO survive VALUES ('hello')") + .unwrap(); + } + + assert!( + path.exists(), + "persistent .hyper file should survive engine drop" + ); +} + +#[test] +fn daemon_mode_persistent_engine_data_is_queryable() { + let _lock = ENV_LOCK.lock().unwrap(); + let daemon = TestDaemon::start(); + let tmp = TempDir::new().unwrap(); + let path = tmp.path().join("queryable.hyper"); + let path_str = path.to_str().unwrap().to_string(); + + let engine = hyperdb_mcp::engine::Engine::new(Some(path_str)).unwrap(); + engine + .execute_command("CREATE TABLE items (id INT, name TEXT)") + .unwrap(); + engine + .execute_command("INSERT INTO items VALUES (1, 'alpha'), (2, 'beta')") + .unwrap(); + + let rows = engine + .execute_query_to_json("SELECT * FROM items ORDER BY id") + .unwrap(); + assert_eq!(rows.len(), 2); + assert_eq!(rows[0]["name"], "alpha"); + assert_eq!(rows[1]["name"], "beta"); + + let resp = health::send_command(daemon.info.health_port, "PING").unwrap(); + assert_eq!(resp.trim(), "PONG"); +} + +#[test] +fn daemon_mode_ephemeral_database_cleaned_up_on_drop() { + let _lock = ENV_LOCK.lock().unwrap(); + let _daemon = TestDaemon::start(); + + let engine = hyperdb_mcp::engine::Engine::new(None).unwrap(); + let workspace_path = engine.workspace_path().to_path_buf(); + + assert!(workspace_path.exists()); + + engine + .execute_command("CREATE TABLE ephemeral_test (id INT)") + .unwrap(); + + drop(engine); + + assert!( + !workspace_path.exists(), + "ephemeral .hyper file should be deleted after engine drop" + ); +} + +// ─── Test helpers ───────────────────────────────────────────────────────────── + +/// Starts a health listener on a random port and returns the port, join handle, +/// and shared state. Does NOT touch env vars — safe for parallel use. +fn start_health_listener() -> (u16, std::thread::JoinHandle<()>, Arc) { + let listener = HealthListener::bind(0).unwrap(); + let port = listener.port; + let state = Arc::new(DaemonState::new()); + let run_state = Arc::clone(&state); + + let info = DaemonInfo { + pid: 12345, + hyperd_endpoint: "127.0.0.1:54321".to_string(), + health_port: port, + started_at: "2026-05-20T10:30:00Z".to_string(), + version: "0.1.3".to_string(), + }; + + let handle = std::thread::spawn(move || { + listener.run(run_state, info); + }); + + // Give the listener a moment to start accepting + std::thread::sleep(Duration::from_millis(50)); + + (port, handle, state) +} + +/// A real daemon running in a background thread for integration tests. +/// Sets `HYPERDB_STATE_DIR` and `HYPERDB_DAEMON_PORT` to isolated values. +/// Caller MUST hold `ENV_LOCK` before calling `start()`. +struct TestDaemon { + info: DaemonInfo, + _state_dir_guard: EnvGuard, + _port_guard: EnvGuard, +} + +impl TestDaemon { + fn start() -> Self { + let tmp = TempDir::new().unwrap(); + // Leak the TempDir so it persists for the lifetime of the test. + let tmp = Box::leak(Box::new(tmp)); + + let state_dir_guard = EnvGuard::set("HYPERDB_STATE_DIR", tmp.path().to_str().unwrap()); + + let port = find_free_port(); + let port_guard = EnvGuard::set("HYPERDB_DAEMON_PORT", &port.to_string()); + + // Start the daemon in a background tokio runtime + let daemon_port = port; + std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let config = hyperdb_mcp::daemon::run::DaemonConfig { + port: daemon_port, + idle_timeout: Duration::from_secs(300), + }; + let _ = hyperdb_mcp::daemon::run::run_daemon(config).await; + }); + }); + + // Wait for daemon to become ready + let start = Instant::now(); + loop { + if let Some(info) = discovery::discover() { + return Self { + info, + _state_dir_guard: state_dir_guard, + _port_guard: port_guard, + }; + } + assert!( + start.elapsed() <= Duration::from_secs(15), + "TestDaemon did not start within 15 seconds" + ); + std::thread::sleep(Duration::from_millis(100)); + } + } +} + +impl Drop for TestDaemon { + fn drop(&mut self) { + let _ = health::send_command(self.info.health_port, "STOP"); + std::thread::sleep(Duration::from_millis(200)); + } +} + +/// Find a free TCP port by binding to port 0 and reading the assigned port. +fn find_free_port() -> u16 { + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + listener.local_addr().unwrap().port() +} + +/// RAII guard that sets/removes an environment variable and restores it on drop. +struct EnvGuard { + key: String, + previous: Option, +} + +impl EnvGuard { + fn set(key: &str, value: &str) -> Self { + let previous = std::env::var(key).ok(); + // SAFETY: Callers hold ENV_LOCK, ensuring no concurrent env var access. + unsafe { std::env::set_var(key, value) }; + Self { + key: key.to_string(), + previous, + } + } + + fn remove(key: &str) -> Self { + let previous = std::env::var(key).ok(); + // SAFETY: Callers hold ENV_LOCK, ensuring no concurrent env var access. + unsafe { std::env::remove_var(key) }; + Self { + key: key.to_string(), + previous, + } + } +} + +impl Drop for EnvGuard { + fn drop(&mut self) { + match &self.previous { + // SAFETY: Callers hold ENV_LOCK for the lifetime of this guard. + Some(val) => unsafe { std::env::set_var(&self.key, val) }, + // SAFETY: Callers hold ENV_LOCK for the lifetime of this guard. + None => unsafe { std::env::remove_var(&self.key) }, + } + } +}