diff --git a/Cargo.lock b/Cargo.lock index 733d888..15c8a7d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1811,7 +1811,7 @@ dependencies = [ [[package]] name = "hyperdb-api" -version = "0.1.2" +version = "0.1.3" dependencies = [ "arrow", "bytes", @@ -1832,7 +1832,7 @@ dependencies = [ [[package]] name = "hyperdb-api-core" -version = "0.1.2" +version = "0.1.3" dependencies = [ "arrow", "base64", @@ -1872,7 +1872,7 @@ dependencies = [ [[package]] name = "hyperdb-api-node" -version = "0.1.2" +version = "0.1.3" dependencies = [ "hyperdb-api", "napi", @@ -1884,7 +1884,7 @@ dependencies = [ [[package]] name = "hyperdb-api-salesforce" -version = "0.1.2" +version = "0.1.3" dependencies = [ "arrow", "base64", @@ -1905,7 +1905,7 @@ dependencies = [ [[package]] name = "hyperdb-bootstrap" -version = "0.1.2" +version = "0.1.3" dependencies = [ "anyhow", "clap", @@ -1923,7 +1923,7 @@ dependencies = [ [[package]] name = "hyperdb-mcp" -version = "0.1.2" +version = "0.1.3" dependencies = [ "arrow", "base64", @@ -1931,6 +1931,7 @@ dependencies = [ "clap", "csv", "hyperdb-api", + "libc", "notify", "parquet", "plotters", @@ -3767,7 +3768,7 @@ dependencies = [ [[package]] name = "sea-query-hyperdb" -version = "0.1.2" +version = "0.1.3" dependencies = [ "sea-query", ] diff --git a/hyperdb-mcp/Cargo.toml b/hyperdb-mcp/Cargo.toml index 17ac7f2..ae36e8f 100644 --- a/hyperdb-mcp/Cargo.toml +++ b/hyperdb-mcp/Cargo.toml @@ -22,7 +22,7 @@ path = "src/main.rs" [dependencies] hyperdb-api = { path = "../hyperdb-api", version = "0.1.1" } rmcp = { version = "1.7", features = ["server", "transport-io"] } -tokio = { version = "1", features = ["rt-multi-thread", "macros", "io-std", "signal"] } +tokio = { version = "1", features = ["rt-multi-thread", "macros", "io-std", "signal", "time"] } serde = { workspace = true } serde_json = { workspace = true, features = ["preserve_order"] } clap = { version = "4", features = ["derive"] } @@ -42,6 +42,9 @@ tokio-util = { version = "0.7", features = ["rt"] } tempfile = { workspace = true } sqlformat = "0.5.0" +[target.'cfg(unix)'.dependencies] +libc = "0.2" + [lints] workspace = true diff --git a/hyperdb-mcp/src/daemon/discovery.rs b/hyperdb-mcp/src/daemon/discovery.rs new file mode 100644 index 0000000..002ffb9 --- /dev/null +++ b/hyperdb-mcp/src/daemon/discovery.rs @@ -0,0 +1,125 @@ +// Copyright (c) 2026, Salesforce, Inc. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR MIT + +//! Discovery file management for the single-instance daemon. +//! +//! The daemon writes a JSON file to `~/.hyperdb/daemon.json` containing its +//! PID and the `hyperd` endpoint. Clients read this file to locate the running +//! daemon, validating liveness via a TCP health check before trusting it. + +use std::io; +use std::net::TcpStream; +use std::path::PathBuf; +use std::time::Duration; + +use serde::{Deserialize, Serialize}; + +use super::DEFAULT_DAEMON_PORT; + +/// Information written by the daemon so clients can discover and connect. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DaemonInfo { + /// OS process ID of the daemon. + pub pid: u32, + /// The `hyperd` libpq endpoint clients should connect to (e.g. `127.0.0.1:54321`). + pub hyperd_endpoint: String, + /// The TCP port the daemon's health listener is bound to. + pub health_port: u16, + /// ISO-8601 timestamp when the daemon started. + pub started_at: String, + /// Version of the daemon binary. + pub version: String, +} + +/// Returns the directory used for daemon state files. +/// +/// Resolution order: +/// 1. `HYPERDB_STATE_DIR` environment variable (if set) +/// 2. `~/.hyperdb/` (where `~` is `HOME` on Unix, `USERPROFILE` on Windows) +/// +/// # Errors +/// Returns an error if neither the env var nor the home directory can be determined. +pub fn state_dir() -> io::Result { + if let Some(dir) = std::env::var_os("HYPERDB_STATE_DIR") { + return Ok(PathBuf::from(dir)); + } + let home = home_dir().ok_or_else(|| { + io::Error::new(io::ErrorKind::NotFound, "cannot determine home directory") + })?; + Ok(home.join(".hyperdb")) +} + +/// Returns the path to the discovery file. +/// +/// # Errors +/// Returns an error if the home directory cannot be determined. +pub fn discovery_file_path() -> io::Result { + Ok(state_dir()?.join("daemon.json")) +} + +/// Write the discovery file atomically (write-to-temp then rename). +/// +/// # Errors +/// Returns an error if the state directory cannot be created or the file cannot be written. +pub fn write_discovery_file(info: &DaemonInfo) -> io::Result<()> { + let dir = state_dir()?; + std::fs::create_dir_all(&dir)?; + + let path = dir.join("daemon.json"); + let tmp_path = dir.join("daemon.json.tmp"); + let json = serde_json::to_string_pretty(info).map_err(|e| io::Error::other(e.to_string()))?; + std::fs::write(&tmp_path, json.as_bytes())?; + // On Windows, rename fails if target exists. Remove stale target first. + let _ = std::fs::remove_file(&path); + std::fs::rename(&tmp_path, &path)?; + Ok(()) +} + +/// Read the discovery file and validate that the daemon is still alive. +/// Returns `None` if no daemon is running (file missing, stale, or unreachable). +pub fn discover() -> Option { + let path = discovery_file_path().ok()?; + let contents = std::fs::read_to_string(&path).ok()?; + let info: DaemonInfo = serde_json::from_str(&contents).ok()?; + + // Validate liveness by connecting to the health port + if is_daemon_alive(info.health_port) { + Some(info) + } else { + // Stale file — daemon crashed. Clean up. + let _ = std::fs::remove_file(&path); + None + } +} + +/// Remove the discovery file (called during graceful shutdown). +pub fn remove_discovery_file() { + if let Ok(path) = discovery_file_path() { + let _ = std::fs::remove_file(&path); + } +} + +/// Check if the daemon is alive by attempting a TCP connection to its health port. +fn is_daemon_alive(port: u16) -> bool { + TcpStream::connect_timeout( + &std::net::SocketAddr::from(([127, 0, 0, 1], port)), + Duration::from_secs(2), + ) + .is_ok() +} + +/// Resolve the daemon health port from environment or default. +pub fn resolve_port() -> u16 { + std::env::var(super::ENV_DAEMON_PORT) + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(DEFAULT_DAEMON_PORT) +} + +/// Cross-platform home directory resolution. +fn home_dir() -> Option { + // Try HOME (Unix) then USERPROFILE (Windows) + std::env::var_os("HOME") + .or_else(|| std::env::var_os("USERPROFILE")) + .map(PathBuf::from) +} diff --git a/hyperdb-mcp/src/daemon/health.rs b/hyperdb-mcp/src/daemon/health.rs new file mode 100644 index 0000000..d9a4c69 --- /dev/null +++ b/hyperdb-mcp/src/daemon/health.rs @@ -0,0 +1,186 @@ +// Copyright (c) 2026, Salesforce, Inc. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR MIT + +//! TCP health listener for the daemon. +//! +//! The health listener serves two purposes: +//! 1. **Single-instance lock** — binding the port guarantees at most one daemon per user. +//! 2. **Liveness probe + heartbeat** — clients connect and send simple text commands. +//! +//! Protocol (line-based, newline-terminated): +//! - `PING\n` → `PONG\n` (liveness check) +//! - `HEARTBEAT\n` → `OK\n` (resets idle timer) +//! - `STOP\n` → `STOPPING\n` (triggers graceful shutdown) +//! - `STATUS\n` → JSON line with daemon info + +use std::io::{BufRead, BufReader, Write}; +use std::net::{TcpListener, TcpStream}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Instant; + +use tracing::{debug, warn}; + +use super::discovery::DaemonInfo; + +/// Handle to the health listener, used to check binding success and manage lifecycle. +#[derive(Debug)] +pub struct HealthListener { + listener: TcpListener, + pub port: u16, +} + +/// Shared state between the health listener and the daemon main loop. +#[derive(Debug)] +pub struct DaemonState { + /// Last time any client sent a heartbeat or query. + pub last_activity: std::sync::Mutex, + /// Signal to shut down the daemon. + pub shutdown: AtomicBool, +} + +impl Default for DaemonState { + fn default() -> Self { + Self::new() + } +} + +impl DaemonState { + pub fn new() -> Self { + Self { + last_activity: std::sync::Mutex::new(Instant::now()), + shutdown: AtomicBool::new(false), + } + } + + /// Record activity (resets idle timer). + /// + /// # Panics + /// Panics if the internal mutex is poisoned. + pub fn touch(&self) { + *self.last_activity.lock().expect("mutex poisoned") = Instant::now(); + } + + /// Duration since the last activity. + /// + /// # Panics + /// Panics if the internal mutex is poisoned. + pub fn idle_duration(&self) -> std::time::Duration { + self.last_activity.lock().expect("mutex poisoned").elapsed() + } + + pub fn request_shutdown(&self) { + self.shutdown.store(true, Ordering::Release); + } + + pub fn should_shutdown(&self) -> bool { + self.shutdown.load(Ordering::Acquire) + } +} + +impl HealthListener { + /// Try to bind the health port. + /// + /// # Errors + /// Returns `Err` if the port is already in use (another daemon is running) + /// or the bind fails for another reason. + pub fn bind(port: u16) -> std::io::Result { + let addr = std::net::SocketAddr::from(([127, 0, 0, 1], port)); + let listener = TcpListener::bind(addr)?; + listener.set_nonblocking(true)?; + let port = listener.local_addr()?.port(); + Ok(Self { listener, port }) + } + + /// Run the health listener loop. Spawns per-connection threads until shutdown. + /// Consumes `self` because this is intended to be called from a dedicated thread. + #[expect( + clippy::needless_pass_by_value, + reason = "Arc and DaemonInfo are cloned into per-connection threads" + )] + pub fn run(self, state: Arc, info: DaemonInfo) { + loop { + if state.should_shutdown() { + break; + } + + match self.listener.accept() { + Ok((stream, _addr)) => { + let state = Arc::clone(&state); + let info = info.clone(); + std::thread::spawn(move || { + handle_client(stream, &state, &info); + }); + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + std::thread::sleep(std::time::Duration::from_millis(100)); + } + Err(e) => { + warn!(error = %e, "health listener accept error"); + std::thread::sleep(std::time::Duration::from_millis(500)); + } + } + } + debug!("health listener shut down"); + } +} + +#[expect( + clippy::needless_pass_by_value, + reason = "TcpStream must be owned for BufReader" +)] +fn handle_client(stream: TcpStream, state: &DaemonState, info: &DaemonInfo) { + let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(5))); + let mut reader = BufReader::new(&stream); + let mut writer = &stream; + let mut line = String::new(); + + loop { + line.clear(); + match reader.read_line(&mut line) { + Ok(0) => break, + Ok(_) => { + let cmd = line.trim(); + let response = match cmd { + "PING" => "PONG\n".to_string(), + "HEARTBEAT" => { + state.touch(); + "OK\n".to_string() + } + "STOP" => { + state.request_shutdown(); + "STOPPING\n".to_string() + } + "STATUS" => { + let json = serde_json::to_string(info).unwrap_or_default(); + format!("{json}\n") + } + _ => "ERR unknown command\n".to_string(), + }; + if writer.write_all(response.as_bytes()).is_err() { + break; + } + } + Err(_) => break, + } + } +} + +/// Send a command to the daemon's health port and return the response. +/// +/// # Errors +/// Returns an error if the connection fails or the response cannot be read. +pub fn send_command(port: u16, command: &str) -> std::io::Result { + let addr = std::net::SocketAddr::from(([127, 0, 0, 1], port)); + let mut stream = TcpStream::connect_timeout(&addr, std::time::Duration::from_secs(2))?; + stream.set_read_timeout(Some(std::time::Duration::from_secs(5)))?; + + let msg = format!("{command}\n"); + stream.write_all(msg.as_bytes())?; + stream.flush()?; + + let mut reader = BufReader::new(&stream); + let mut response = String::new(); + reader.read_line(&mut response)?; + Ok(response) +} diff --git a/hyperdb-mcp/src/daemon/mod.rs b/hyperdb-mcp/src/daemon/mod.rs new file mode 100644 index 0000000..8f870a7 --- /dev/null +++ b/hyperdb-mcp/src/daemon/mod.rs @@ -0,0 +1,21 @@ +// Copyright (c) 2026, Salesforce, Inc. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR MIT + +//! Single-instance daemon for sharing a `hyperd` process across MCP clients. + +pub mod discovery; +pub mod health; +pub mod run; +pub mod spawn; + +/// Default TCP port the daemon binds for health checks and single-instance locking. +pub const DEFAULT_DAEMON_PORT: u16 = 7484; + +/// Default idle timeout in seconds before the daemon shuts down. +pub const DEFAULT_IDLE_TIMEOUT_SECS: u64 = 30 * 60; // 30 minutes + +/// Environment variable to override the daemon port. +pub const ENV_DAEMON_PORT: &str = "HYPERDB_DAEMON_PORT"; + +/// Environment variable to override the idle timeout (seconds). +pub const ENV_IDLE_TIMEOUT: &str = "HYPERDB_DAEMON_IDLE_TIMEOUT"; diff --git a/hyperdb-mcp/src/daemon/run.rs b/hyperdb-mcp/src/daemon/run.rs new file mode 100644 index 0000000..86b95a3 --- /dev/null +++ b/hyperdb-mcp/src/daemon/run.rs @@ -0,0 +1,152 @@ +// Copyright (c) 2026, Salesforce, Inc. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR MIT + +//! Daemon main loop: spawns `hyperd`, runs health listener, monitors idle timeout. + +use std::sync::Arc; +use std::time::Duration; + +use tokio::signal; +use tracing::info; + +use hyperdb_api::{HyperProcess, Parameters, TransportMode}; + +use super::discovery::{self, DaemonInfo}; +use super::health::{DaemonState, HealthListener}; +use super::{DEFAULT_IDLE_TIMEOUT_SECS, ENV_IDLE_TIMEOUT}; + +/// Configuration for the daemon process. +#[derive(Debug)] +pub struct DaemonConfig { + pub port: u16, + pub idle_timeout: Duration, +} + +impl DaemonConfig { + pub fn from_args(port: u16, idle_timeout_secs: Option) -> Self { + let idle_timeout_secs = idle_timeout_secs + .or_else(|| { + std::env::var(ENV_IDLE_TIMEOUT) + .ok() + .and_then(|v| v.parse().ok()) + }) + .unwrap_or(DEFAULT_IDLE_TIMEOUT_SECS); + + Self { + port, + idle_timeout: Duration::from_secs(idle_timeout_secs), + } + } +} + +/// Run the daemon. This function blocks until shutdown is triggered. +/// +/// # Errors +/// Returns an error if the health port cannot be bound, `hyperd` fails to start, +/// or the discovery file cannot be written. +pub async fn run_daemon(config: DaemonConfig) -> Result<(), Box> { + // Step 1: Bind health port (single-instance lock) + let listener = HealthListener::bind(config.port).map_err(|e| { + if e.kind() == std::io::ErrorKind::AddrInUse { + format!( + "Another hyperdb daemon is already running on port {}. \ + Use `hyperdb-mcp daemon status` to check or `hyperdb-mcp daemon stop` to stop it.", + config.port + ) + } else { + format!("Failed to bind health port {}: {e}", config.port) + } + })?; + let bound_port = listener.port; + info!(port = bound_port, "daemon health listener bound"); + + // Step 2: Spawn HyperProcess with TCP transport (shared across clients) + let log_dir = discovery::state_dir()?.join("logs"); + std::fs::create_dir_all(&log_dir)?; + + let mut params = Parameters::new(); + params.set("log_file_max_count", "2"); + params.set("log_file_size_limit", "100M"); + params.set("log_dir", log_dir.to_string_lossy().as_ref()); + params.set_transport_mode(TransportMode::Tcp); + + let hyper = HyperProcess::new(None, Some(¶ms))?; + let endpoint = hyper + .endpoint() + .ok_or("hyperd did not report an endpoint")? + .to_string(); + info!(endpoint = %endpoint, "hyperd started"); + + // Step 3: Write discovery file + let info = DaemonInfo { + pid: std::process::id(), + hyperd_endpoint: endpoint.clone(), + health_port: bound_port, + started_at: chrono::Utc::now().to_rfc3339(), + version: env!("CARGO_PKG_VERSION").to_string(), + }; + discovery::write_discovery_file(&info)?; + info!(path = %discovery::discovery_file_path()?.display(), "discovery file written"); + + // Step 4: Start health listener in background thread + let state = Arc::new(DaemonState::new()); + let health_state = Arc::clone(&state); + let health_info = info.clone(); + let health_handle = std::thread::spawn(move || { + listener.run(health_state, health_info); + }); + + // Step 5: Monitor idle timeout + OS signals + let idle_timeout = config.idle_timeout; + let shutdown_state = Arc::clone(&state); + + tokio::select! { + () = async { + loop { + tokio::time::sleep(Duration::from_secs(10)).await; + if shutdown_state.idle_duration() >= idle_timeout { + info!( + idle_secs = idle_timeout.as_secs(), + "idle timeout reached, shutting down" + ); + shutdown_state.request_shutdown(); + break; + } + if shutdown_state.should_shutdown() { + break; + } + } + } => {} + () = shutdown_signal() => { + info!("received shutdown signal"); + state.request_shutdown(); + } + } + + // Step 6: Graceful shutdown + info!("shutting down daemon"); + discovery::remove_discovery_file(); + drop(hyper); // closes callback connection → hyperd exits + let _ = health_handle.join(); + + Ok(()) +} + +async fn shutdown_signal() { + let ctrl_c = signal::ctrl_c(); + + #[cfg(unix)] + { + let mut sigterm = + signal::unix::signal(signal::unix::SignalKind::terminate()).expect("sigterm handler"); + tokio::select! { + _ = ctrl_c => {} + _ = sigterm.recv() => {} + } + } + + #[cfg(not(unix))] + { + ctrl_c.await.ok(); + } +} diff --git a/hyperdb-mcp/src/daemon/spawn.rs b/hyperdb-mcp/src/daemon/spawn.rs new file mode 100644 index 0000000..deb3de4 --- /dev/null +++ b/hyperdb-mcp/src/daemon/spawn.rs @@ -0,0 +1,104 @@ +// Copyright (c) 2026, Salesforce, Inc. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR MIT + +//! Spawn the daemon as a detached background process. +//! +//! When an MCP client starts and no daemon is running, it spawns one using the +//! current binary with the `daemon` subcommand. The spawned process is fully +//! detached so it outlives the parent MCP session. + +use std::io; +use std::process::Command; +use std::time::{Duration, Instant}; + +use tracing::{debug, info}; + +use super::discovery::{self, DaemonInfo}; + +/// Maximum time to wait for the daemon to write its discovery file after spawning. +const SPAWN_TIMEOUT: Duration = Duration::from_secs(10); + +/// Polling interval while waiting for the discovery file. +const POLL_INTERVAL: Duration = Duration::from_millis(100); + +/// Ensure a daemon is running and return its info. +/// If no daemon is detected, spawn one and wait for it to become ready. +/// +/// # Errors +/// Returns an error if the daemon cannot be spawned or does not become ready +/// within the timeout period. +pub fn ensure_daemon(port: u16) -> io::Result { + // Check if already running + if let Some(info) = discovery::discover() { + debug!(endpoint = %info.hyperd_endpoint, "daemon already running"); + return Ok(info); + } + + info!("no running daemon detected, spawning one"); + spawn_detached(port)?; + wait_for_daemon() +} + +/// Spawn `hyperdb-mcp daemon` as a fully detached background process. +fn spawn_detached(port: u16) -> io::Result<()> { + let exe = std::env::current_exe()?; + let port_str = port.to_string(); + + let mut cmd = Command::new(&exe); + cmd.arg("daemon").arg("--port").arg(&port_str); + + // Detach from parent: redirect stdio to null + cmd.stdin(std::process::Stdio::null()); + cmd.stdout(std::process::Stdio::null()); + cmd.stderr(std::process::Stdio::null()); + + // Platform-specific detach flags + #[cfg(unix)] + { + use std::os::unix::process::CommandExt; + // SAFETY: setsid() is async-signal-safe per POSIX. Called in pre_exec + // (between fork and exec) to create a new session so the daemon isn't + // killed when the parent terminal/process exits. + unsafe { + cmd.pre_exec(|| { + libc::setsid(); + Ok(()) + }); + } + } + + #[cfg(windows)] + { + use std::os::windows::process::CommandExt; + const CREATE_NO_WINDOW: u32 = 0x0800_0000; + const DETACHED_PROCESS: u32 = 0x0000_0008; + cmd.creation_flags(CREATE_NO_WINDOW | DETACHED_PROCESS); + } + + let child = cmd.spawn()?; + info!(pid = child.id(), "daemon process spawned"); + Ok(()) +} + +/// Poll for the discovery file to appear (daemon is ready). +fn wait_for_daemon() -> io::Result { + let start = Instant::now(); + loop { + if let Some(info) = discovery::discover() { + info!(endpoint = %info.hyperd_endpoint, "daemon is ready"); + return Ok(info); + } + + if start.elapsed() >= SPAWN_TIMEOUT { + return Err(io::Error::new( + io::ErrorKind::TimedOut, + format!( + "daemon did not become ready within {} seconds", + SPAWN_TIMEOUT.as_secs() + ), + )); + } + + std::thread::sleep(POLL_INTERVAL); + } +} diff --git a/hyperdb-mcp/src/engine.rs b/hyperdb-mcp/src/engine.rs index f47cc4e..b225909 100644 --- a/hyperdb-mcp/src/engine.rs +++ b/hyperdb-mcp/src/engine.rs @@ -38,23 +38,29 @@ //! optimization could use `spawn_blocking` or an async connection API, but the //! current approach is correct and simple. +use crate::daemon; use crate::error::{ErrorCode, McpError}; use crate::schema::ColumnSchema; use hyperdb_api::{Catalog, Connection, CreateMode, HyperProcess, Parameters, SqlType}; use serde_json::{json, Value}; use std::path::{Path, PathBuf}; -/// Owns a running `HyperProcess` and the single `Connection` to its workspace -/// `.hyper` file. All SQL execution flows through this struct. +/// Owns a connection to `hyperd` and the workspace `.hyper` file. All SQL +/// execution flows through this struct. /// -/// Two workspace modes are supported: -/// - **Persistent** — caller supplies a path; the `.hyper` file survives across -/// sessions so tables can be built up incrementally. -/// - **Ephemeral** — a temp directory is created per process; everything is -/// discarded when the server exits. +/// Two process modes: +/// - **Local** — this engine owns the `HyperProcess` subprocess directly. +/// - **Daemon** — a shared daemon manages `hyperd`; the engine only holds a connection. +/// +/// Two workspace modes: +/// - **Persistent** — caller supplies a path; the `.hyper` file survives across sessions. +/// - **Ephemeral** — a temp directory is created per process; discarded on exit. #[derive(Debug)] pub struct Engine { - hyper: HyperProcess, + /// `None` in daemon mode (the daemon owns the process). + hyper: Option, + /// Stored endpoint for daemon mode (the daemon advertises this). + daemon_endpoint: Option, connection: Connection, workspace_path: PathBuf, log_dir: PathBuf, @@ -62,17 +68,10 @@ pub struct Engine { } impl Engine { - #[expect( - clippy::needless_pass_by_value, - reason = "call-site ergonomics: function consumes logically-owned parameters, refactoring signatures is not worth per-site churn" - )] /// Create a new Engine. If `workspace_path` is Some, use that path (persistent mode). /// If None, use a temp file (ephemeral mode). /// - /// Logs from `hyperd` are written to the directory returned by - /// [`resolve_log_dir`]. The same directory should be used by the MCP - /// binary for its own client-side log so operators can find everything - /// in one place when debugging. + /// Connects to the shared daemon if available, falling back to a local `hyperd`. /// /// # Errors /// @@ -85,6 +84,22 @@ impl Engine { /// reports the `hyperd` executable is missing or unreachable via /// `HYPERD_PATH`. pub fn new(workspace_path: Option) -> Result { + Self::new_with_mode(workspace_path, false) + } + + /// Create an engine that bypasses the shared daemon and spawns a private `hyperd`. + /// + /// # Errors + /// Same as [`Self::new`]. + pub fn new_no_daemon(workspace_path: Option) -> Result { + Self::new_with_mode(workspace_path, true) + } + + #[expect( + clippy::needless_pass_by_value, + reason = "Option is consumed by the workspace path resolution logic" + )] + fn new_with_mode(workspace_path: Option, no_daemon: bool) -> Result { let (path, is_persistent) = if let Some(ref p) = workspace_path { let path = PathBuf::from(shellexpand_tilde(p)); if let Some(parent) = path.parent() { @@ -115,6 +130,14 @@ impl Engine { ) })?; + // Try daemon mode first unless disabled + if !no_daemon { + if let Some(engine) = Self::try_daemon_mode(&path, &log_dir, is_persistent)? { + return Ok(engine); + } + } + + // Fall back to spawning a local HyperProcess let mut params = Parameters::new(); params.set("log_file_max_count", "2"); params.set("log_file_size_limit", "100M"); @@ -135,20 +158,11 @@ impl Engine { McpError::new(ErrorCode::InternalError, format!("Failed to connect: {e}")) })?; - // Ensure the `public` schema exists in the workspace database so that - // `load_file`, `load_data`, and unqualified `CREATE TABLE` statements - // resolve without a "could not resolve the schema (3F000)" error. - connection - .execute_command("CREATE SCHEMA IF NOT EXISTS public") - .map_err(|e| { - McpError::new( - ErrorCode::InternalError, - format!("Failed to bootstrap public schema: {e}"), - ) - })?; + bootstrap_public_schema(&connection)?; Ok(Self { - hyper, + hyper: Some(hyper), + daemon_endpoint: None, connection, workspace_path: path, log_dir, @@ -156,22 +170,80 @@ impl Engine { }) } - /// Whether the `hyperd` child process is still alive. + /// Attempt to connect via the shared daemon. Returns `None` if the daemon + /// cannot be reached (falls back to local mode). + fn try_daemon_mode( + path: &Path, + log_dir: &Path, + is_persistent: bool, + ) -> Result, McpError> { + let port = daemon::discovery::resolve_port(); + let info = match daemon::spawn::ensure_daemon(port) { + Ok(info) => info, + Err(e) => { + tracing::debug!(error = %e, "daemon unavailable, falling back to local mode"); + return Ok(None); + } + }; + + let endpoint = &info.hyperd_endpoint; + let connection = Connection::connect( + endpoint, + &path.to_string_lossy(), + CreateMode::CreateIfNotExists, + ) + .map_err(|e| { + McpError::new( + ErrorCode::InternalError, + format!("Failed to connect to daemon hyperd at {endpoint}: {e}"), + ) + })?; + + bootstrap_public_schema(&connection)?; + + // Send heartbeat so daemon knows we're active + let _ = daemon::health::send_command(info.health_port, "HEARTBEAT"); + + Ok(Some(Self { + hyper: None, + daemon_endpoint: Some(info.hyperd_endpoint), + connection, + workspace_path: path.to_path_buf(), + log_dir: log_dir.to_path_buf(), + is_persistent, + })) + } + + /// Whether the backing `hyperd` process is still alive. + /// In daemon mode, checks the daemon health port. pub fn is_running(&self) -> bool { - self.hyper.is_running() + if let Some(ref hyper) = self.hyper { + hyper.is_running() + } else { + // Daemon mode: check if daemon is still reachable + daemon::discovery::discover().is_some() + } } - /// `host:port` endpoint of the hyperd child process. Used by the + /// `host:port` endpoint of the `hyperd` process. Used by the /// watcher to build additional async connections via `hyperdb_api::pool` /// without touching the primary sync connection this engine holds. /// /// # Errors /// - /// Returns [`ErrorCode::InternalError`] if the underlying - /// [`HyperProcess::require_endpoint`] call fails — typically when - /// `hyperd` has exited or never successfully reported an endpoint. + /// Returns [`ErrorCode::InternalError`] if the endpoint is unavailable. pub fn hyperd_endpoint(&self) -> Result { + if let Some(ref endpoint) = self.daemon_endpoint { + return Ok(endpoint.clone()); + } self.hyper + .as_ref() + .ok_or_else(|| { + McpError::new( + ErrorCode::InternalError, + "no hyperd endpoint available".to_string(), + ) + })? .require_endpoint() .map(std::string::ToString::to_string) .map_err(|e| McpError::new(ErrorCode::InternalError, e.to_string())) @@ -766,7 +838,7 @@ impl Engine { }; Ok(json!({ - "hyperd_running": self.hyper.is_running(), + "hyperd_running": self.is_running(), "workspace_path": self.workspace_path.to_string_lossy(), "workspace_mode": if self.is_persistent { "persistent" } else { "ephemeral" }, "table_count": table_count, @@ -1070,6 +1142,34 @@ fn strip_leading_sql_comments(sql: &str) -> &str { s } +impl Drop for Engine { + fn drop(&mut self) { + // In daemon mode with ephemeral databases, DETACH the workspace from hyperd + // (releases the file handle — critical on Windows) then delete the temp file. + if !self.is_persistent && self.daemon_endpoint.is_some() { + let db_name = self.primary_db_name(); + let detach = format!("DETACH DATABASE \"{db_name}\""); + let _ = self.connection.execute_command(&detach); + // Remove the temp directory containing the ephemeral .hyper file + if let Some(parent) = self.workspace_path.parent() { + let _ = std::fs::remove_dir_all(parent); + } + } + } +} + +fn bootstrap_public_schema(connection: &Connection) -> Result<(), McpError> { + connection + .execute_command("CREATE SCHEMA IF NOT EXISTS public") + .map(|_| ()) + .map_err(|e| { + McpError::new( + ErrorCode::InternalError, + format!("Failed to bootstrap public schema: {e}"), + ) + }) +} + /// Minimal `~/` (and `~\` on Windows) expansion. Resolves the home /// directory via `$HOME` on Unix and `%USERPROFILE%` (falling back to /// `%HOMEDRIVE%%HOMEPATH%`) on Windows. `~username/` is not supported — diff --git a/hyperdb-mcp/src/lib.rs b/hyperdb-mcp/src/lib.rs index 4fba0ba..2d187ca 100644 --- a/hyperdb-mcp/src/lib.rs +++ b/hyperdb-mcp/src/lib.rs @@ -38,6 +38,7 @@ pub mod attach; pub mod chart; +pub mod daemon; pub mod engine; pub mod error; pub mod export; diff --git a/hyperdb-mcp/src/main.rs b/hyperdb-mcp/src/main.rs index 43a4966..d83917f 100644 --- a/hyperdb-mcp/src/main.rs +++ b/hyperdb-mcp/src/main.rs @@ -4,6 +4,7 @@ //! Binary entry point for the `hyperdb-mcp` MCP server. //! //! Starts an MCP server on stdio, optionally backed by a persistent workspace. +//! Can also run in daemon mode to manage a shared `hyperd` process. //! //! # Logging //! @@ -19,7 +20,11 @@ //! [`hyperdb_mcp::engine::resolve_log_dir`]). Check the `status` tool for //! the exact paths. -use clap::Parser; +use clap::{Parser, Subcommand}; +use hyperdb_mcp::daemon; +use hyperdb_mcp::daemon::discovery; +use hyperdb_mcp::daemon::health; +use hyperdb_mcp::daemon::run::DaemonConfig; use hyperdb_mcp::engine::{resolve_log_dir, CLIENT_LOG_FILE_NAME}; use hyperdb_mcp::server::HyperMcpServer; use rmcp::ServiceExt; @@ -37,29 +42,107 @@ const VERSION: &str = concat!(env!("CARGO_PKG_VERSION"), ".r", env!("HYPERDB_GIT about = "MCP server for Hyper database analytics" )] struct Cli { + #[command(subcommand)] + command: Option, + /// Path to the `.hyper` workspace file for persistent mode (omit for ephemeral mode) - #[arg(long)] + #[arg(long, global = true)] workspace: Option, /// Run in read-only mode: disables execute, `load_data`, `load_file`, and export to hyper format - #[arg(long)] + #[arg(long, global = true)] read_only: bool, /// Bare mode: disable MCP-managed auxiliary tables. Skips creating /// `_table_catalog` and forces saved queries into in-memory /// (non-persistent) storage, even with --workspace. - #[arg(long)] + #[arg(long, global = true)] bare: bool, + + /// Disable the shared daemon and spawn a private `hyperd` (legacy behavior) + #[arg(long, global = true)] + no_daemon: bool, +} + +#[derive(Subcommand)] +enum Commands { + /// Run as a background daemon managing a shared hyperd process + Daemon { + #[command(subcommand)] + action: Option, + + /// TCP port for health listener and single-instance lock + #[arg(long, default_value_t = daemon::DEFAULT_DAEMON_PORT)] + port: u16, + + /// Idle timeout in seconds before the daemon shuts down + #[arg(long)] + idle_timeout: Option, + }, +} + +#[derive(Subcommand)] +enum DaemonAction { + /// Stop a running daemon + Stop, + /// Show status of the running daemon + Status, } #[tokio::main] async fn main() -> Result<(), Box> { - // Parse CLI first so the log directory matches whatever workspace the - // user requested. This has to happen before tracing is initialized so - // the file layer points at the right place. let cli = Cli::parse(); - // Compute and create the log dir (same logic Engine will use later). + match cli.command { + Some(Commands::Daemon { + action: Some(DaemonAction::Stop), + port, + .. + }) => { + daemon_stop(port); + Ok(()) + } + Some(Commands::Daemon { + action: Some(DaemonAction::Status), + .. + }) => { + daemon_status(); + Ok(()) + } + Some(Commands::Daemon { + action: None, + port, + idle_timeout, + }) => run_daemon_mode(port, idle_timeout).await, + None => run_mcp_mode(cli).await, + } +} + +async fn run_daemon_mode( + port: u16, + idle_timeout: Option, +) -> Result<(), Box> { + // Daemon logs go to ~/.hyperdb/logs/ + let log_dir = discovery::state_dir()?.join("logs"); + std::fs::create_dir_all(&log_dir)?; + + let file_appender = tracing_appender::rolling::never(&log_dir, "hyperdb-daemon.log"); + let (file_writer, _file_guard) = tracing_appender::non_blocking(file_appender); + + let filter = EnvFilter::try_from_default_env() + .unwrap_or_else(|_| EnvFilter::new("info,hyperdb_mcp=debug")); + + tracing_subscriber::registry() + .with(filter) + .with(fmt::layer().with_writer(std::io::stderr)) + .with(fmt::layer().with_writer(file_writer).with_ansi(false)) + .init(); + + let config = DaemonConfig::from_args(port, idle_timeout); + daemon::run::run_daemon(config).await +} + +async fn run_mcp_mode(cli: Cli) -> Result<(), Box> { let log_dir = resolve_log_dir(cli.workspace.as_deref()); if let Err(e) = std::fs::create_dir_all(&log_dir) { eprintln!( @@ -68,13 +151,9 @@ async fn main() -> Result<(), Box> { ); } - // tracing_appender writes via a background thread; we keep the guard - // alive for the duration of `main` so buffered logs get flushed cleanly. let file_appender = tracing_appender::rolling::never(&log_dir, CLIENT_LOG_FILE_NAME); let (file_writer, _file_guard) = tracing_appender::non_blocking(file_appender); - // Default to `info` when RUST_LOG is unset so the log files are actually - // populated. Users can still override via RUST_LOG=debug,hyperdb_api=trace etc. let filter = EnvFilter::try_from_default_env() .unwrap_or_else(|_| EnvFilter::new("info,hyperdb_mcp=debug")); @@ -89,12 +168,40 @@ async fn main() -> Result<(), Box> { workspace = cli.workspace.as_deref().unwrap_or(""), read_only = cli.read_only, bare = cli.bare, + no_daemon = cli.no_daemon, "hyperdb-mcp starting" ); - let server = HyperMcpServer::new(cli.workspace, cli.read_only, cli.bare); + let server = + HyperMcpServer::with_no_daemon(cli.workspace, cli.read_only, cli.bare, cli.no_daemon); let service = server.serve(rmcp::transport::io::stdio()).await?; service.waiting().await?; Ok(()) } + +fn daemon_stop(port: u16) { + match health::send_command(port, "STOP") { + Ok(response) => { + println!("Daemon responded: {}", response.trim()); + } + Err(e) => { + eprintln!("No daemon running on port {port} (or cannot connect): {e}"); + std::process::exit(1); + } + } +} + +fn daemon_status() { + if let Some(info) = discovery::discover() { + println!("Daemon is running:"); + println!(" PID: {}", info.pid); + println!(" Hyperd endpoint: {}", info.hyperd_endpoint); + println!(" Health port: {}", info.health_port); + println!(" Started: {}", info.started_at); + println!(" Version: {}", info.version); + } else { + eprintln!("No daemon is currently running."); + std::process::exit(1); + } +} diff --git a/hyperdb-mcp/src/server.rs b/hyperdb-mcp/src/server.rs index 38c8873..0159eae 100644 --- a/hyperdb-mcp/src/server.rs +++ b/hyperdb-mcp/src/server.rs @@ -755,6 +755,10 @@ pub struct HyperMcpServer { /// [`crate::saved_queries::SessionStore`] and the catalog is never /// created or updated. bare: bool, + /// Skip the shared daemon and spawn a private `hyperd` (legacy behavior). + no_daemon: bool, + /// Last time a heartbeat was sent to the daemon (debounced to avoid per-call TCP overhead). + last_heartbeat: std::sync::Mutex, // Under rmcp 1.x the router fields are constructed for downstream // macro-generated dispatch but not read through a direct field access // that the compiler can see. Keep them; the `#[tool_router]` / @@ -797,6 +801,25 @@ impl HyperMcpServer { /// workspace file. Useful when callers want a pristine `.hyper` file /// containing only their own data. pub fn new(workspace_path: Option, read_only: bool, bare: bool) -> Self { + Self::with_options(workspace_path, read_only, bare, false) + } + + /// Create a server instance with explicit daemon control. + pub fn with_no_daemon( + workspace_path: Option, + read_only: bool, + bare: bool, + no_daemon: bool, + ) -> Self { + Self::with_options(workspace_path, read_only, bare, no_daemon) + } + + fn with_options( + workspace_path: Option, + read_only: bool, + bare: bool, + no_daemon: bool, + ) -> Self { // Bare mode forces a SessionStore regardless of workspace so the // `_hyperdb_saved_queries` meta-table is never created. let saved_queries: Arc = if bare { @@ -818,6 +841,8 @@ impl HyperMcpServer { workspace_path, read_only, bare, + no_daemon, + last_heartbeat: std::sync::Mutex::new(std::time::Instant::now()), tool_router: Self::tool_router(), prompt_router: Self::prompt_router(), } @@ -927,7 +952,11 @@ impl HyperMcpServer { bare = self.bare, "initializing hyper engine" ); - let engine = Engine::new(self.workspace_path.clone())?; + let engine = if self.no_daemon { + Engine::new_no_daemon(self.workspace_path.clone())? + } else { + Engine::new(self.workspace_path.clone())? + }; tracing::info!( workspace_path = %engine.workspace_path().display(), log_dir = %engine.log_dir().display(), @@ -1045,6 +1074,11 @@ impl HyperMcpServer { // catalog SQL can see errors classified via the normal error // path. No-op in bare or read-only mode. self.ensure_catalog_ready(engine); + // In daemon mode, send a heartbeat so the daemon knows we're still active. + // Debounced to avoid per-call TCP overhead (only sends if >60s since last). + if !self.no_daemon { + self.maybe_send_heartbeat(); + } let result = f(engine); if let Err(e) = &result { tracing::debug!(code = ?e.code, message = %e.message, "tool call returned error"); @@ -1069,6 +1103,24 @@ impl HyperMcpServer { result } + /// Best-effort heartbeat to keep the daemon alive while this client is active. + /// Debounced: only sends if more than 60 seconds have elapsed since the last heartbeat, + /// avoiding a new TCP connection on every tool call. + fn maybe_send_heartbeat(&self) { + const HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60); + let should_send = self + .last_heartbeat + .lock() + .is_ok_and(|guard| guard.elapsed() >= HEARTBEAT_INTERVAL); + if should_send { + let port = crate::daemon::discovery::resolve_port(); + let _ = crate::daemon::health::send_command(port, "HEARTBEAT"); + if let Ok(mut guard) = self.last_heartbeat.lock() { + *guard = std::time::Instant::now(); + } + } + } + /// Run a closure that accesses the saved-query store. /// /// Some store variants (notably diff --git a/hyperdb-mcp/tests/daemon_tests.rs b/hyperdb-mcp/tests/daemon_tests.rs new file mode 100644 index 0000000..9af31d8 --- /dev/null +++ b/hyperdb-mcp/tests/daemon_tests.rs @@ -0,0 +1,612 @@ +// Copyright (c) 2026, Salesforce, Inc. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR MIT + +//! Tests for the single-instance daemon: discovery file, health protocol, +//! idle timeout, and full lifecycle integration with a real `hyperd`. +//! +//! Many tests mutate process-global environment variables (`HYPERDB_STATE_DIR`, +//! `HYPERDB_DAEMON_PORT`) to isolate their state directories. Because env vars +//! are process-global, these tests MUST run sequentially. We enforce this via a +//! shared mutex — every test that touches env vars acquires `ENV_LOCK` first. + +use std::net::TcpListener; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use hyperdb_mcp::daemon::discovery::{self, DaemonInfo}; +use hyperdb_mcp::daemon::health::{self, DaemonState, HealthListener}; +use tempfile::TempDir; + +/// Process-wide lock for tests that mutate environment variables. +/// Cargo runs tests in the same process by default — this prevents races. +static ENV_LOCK: Mutex<()> = Mutex::new(()); + +// ─── Unit tests: DaemonState (no env vars, safe to run in parallel) ─────────── + +#[test] +fn daemon_state_touch_resets_idle_duration() { + let state = DaemonState::new(); + std::thread::sleep(Duration::from_millis(50)); + assert!(state.idle_duration() >= Duration::from_millis(50)); + + state.touch(); + assert!(state.idle_duration() < Duration::from_millis(30)); +} + +#[test] +fn daemon_state_shutdown_flag() { + let state = DaemonState::new(); + assert!(!state.should_shutdown()); + + state.request_shutdown(); + assert!(state.should_shutdown()); +} + +#[test] +fn daemon_state_default_is_equivalent_to_new() { + let default_state = DaemonState::default(); + assert!(!default_state.should_shutdown()); + assert!(default_state.idle_duration() < Duration::from_millis(100)); +} + +// ─── Unit tests: Health protocol (no env vars, safe to run in parallel) ─────── + +#[test] +fn health_listener_bind_succeeds_on_free_port() { + let listener = HealthListener::bind(0).unwrap(); + assert_ne!(listener.port, 0); +} + +#[test] +fn health_listener_second_bind_same_port_fails() { + let listener = HealthListener::bind(0).unwrap(); + let port = listener.port; + + let result = HealthListener::bind(port); + assert!(result.is_err()); + assert_eq!(result.unwrap_err().kind(), std::io::ErrorKind::AddrInUse); +} + +#[test] +fn health_protocol_ping_pong() { + let (port, _handle, _state) = start_health_listener(); + + let response = health::send_command(port, "PING").unwrap(); + assert_eq!(response.trim(), "PONG"); +} + +#[test] +fn health_protocol_heartbeat_resets_idle() { + let (port, _handle, state) = start_health_listener(); + + std::thread::sleep(Duration::from_millis(50)); + assert!(state.idle_duration() >= Duration::from_millis(50)); + + let response = health::send_command(port, "HEARTBEAT").unwrap(); + assert_eq!(response.trim(), "OK"); + + assert!(state.idle_duration() < Duration::from_millis(30)); +} + +#[test] +fn health_protocol_stop_triggers_shutdown() { + let (port, handle, state) = start_health_listener(); + + assert!(!state.should_shutdown()); + + let response = health::send_command(port, "STOP").unwrap(); + assert_eq!(response.trim(), "STOPPING"); + + assert!(state.should_shutdown()); + + // Health listener should exit its loop + handle.join().unwrap(); +} + +#[test] +fn health_protocol_status_returns_json() { + let (port, _handle, _state) = start_health_listener(); + + let response = health::send_command(port, "STATUS").unwrap(); + let parsed: serde_json::Value = serde_json::from_str(response.trim()).unwrap(); + assert_eq!(parsed["pid"], 12345); + assert_eq!(parsed["hyperd_endpoint"], "127.0.0.1:54321"); +} + +#[test] +fn health_protocol_unknown_command_returns_error() { + let (port, _handle, _state) = start_health_listener(); + + let response = health::send_command(port, "INVALID").unwrap(); + assert!(response.contains("ERR")); +} + +#[test] +fn health_protocol_multi_command_session() { + let (port, _handle, _state) = start_health_listener(); + + let response1 = health::send_command(port, "PING").unwrap(); + assert_eq!(response1.trim(), "PONG"); + + let response2 = health::send_command(port, "STATUS").unwrap(); + let parsed: serde_json::Value = serde_json::from_str(response2.trim()).unwrap(); + assert_eq!(parsed["health_port"], port); + + let response3 = health::send_command(port, "HEARTBEAT").unwrap(); + assert_eq!(response3.trim(), "OK"); +} + +// ─── Unit tests: idle timeout logic (no env vars) ───────────────────────────── + +#[test] +fn daemon_idle_timeout_shuts_down_daemon() { + let state = Arc::new(DaemonState::new()); + let idle_timeout = Duration::from_secs(2); + + let monitor_state = Arc::clone(&state); + let monitor = std::thread::spawn(move || loop { + std::thread::sleep(Duration::from_millis(100)); + if monitor_state.idle_duration() >= idle_timeout { + monitor_state.request_shutdown(); + break; + } + if monitor_state.should_shutdown() { + break; + } + }); + + let start = Instant::now(); + monitor.join().unwrap(); + let elapsed = start.elapsed(); + + assert!(state.should_shutdown()); + assert!(elapsed >= Duration::from_secs(2)); + assert!(elapsed < Duration::from_secs(4)); +} + +#[test] +fn daemon_heartbeat_prevents_idle_shutdown() { + let state = Arc::new(DaemonState::new()); + let idle_timeout = Duration::from_secs(1); + + let monitor_state = Arc::clone(&state); + let heartbeat_state = Arc::clone(&state); + + let heartbeat = std::thread::spawn(move || { + let start = Instant::now(); + while start.elapsed() < Duration::from_millis(1500) { + heartbeat_state.touch(); + std::thread::sleep(Duration::from_millis(200)); + } + }); + + let monitor = std::thread::spawn(move || loop { + std::thread::sleep(Duration::from_millis(100)); + if monitor_state.idle_duration() >= idle_timeout { + monitor_state.request_shutdown(); + break; + } + if monitor_state.should_shutdown() { + break; + } + }); + + heartbeat.join().unwrap(); + let start = Instant::now(); + monitor.join().unwrap(); + let after_heartbeat_stop = start.elapsed(); + + assert!(state.should_shutdown()); + assert!( + after_heartbeat_stop >= Duration::from_millis(800), + "daemon should have waited for idle timeout after heartbeats stopped" + ); +} + +// ─── Unit tests: Discovery file (require ENV_LOCK) ──────────────────────────── + +#[test] +fn discovery_file_write_and_read() { + let _lock = ENV_LOCK.lock().unwrap(); + let tmp = TempDir::new().unwrap(); + let _guard = EnvGuard::set("HYPERDB_STATE_DIR", tmp.path().to_str().unwrap()); + + let info = DaemonInfo { + pid: 12345, + hyperd_endpoint: "127.0.0.1:54321".to_string(), + health_port: 7484, + started_at: "2026-05-20T10:30:00Z".to_string(), + version: "0.1.3".to_string(), + }; + + discovery::write_discovery_file(&info).unwrap(); + + let path = tmp.path().join("daemon.json"); + assert!(path.exists()); + + let contents = std::fs::read_to_string(&path).unwrap(); + let read_back: DaemonInfo = serde_json::from_str(&contents).unwrap(); + assert_eq!(read_back.pid, 12345); + assert_eq!(read_back.hyperd_endpoint, "127.0.0.1:54321"); + assert_eq!(read_back.health_port, 7484); + assert_eq!(read_back.version, "0.1.3"); +} + +#[test] +fn discovery_file_overwrite_replaces_content() { + let _lock = ENV_LOCK.lock().unwrap(); + let tmp = TempDir::new().unwrap(); + let _guard = EnvGuard::set("HYPERDB_STATE_DIR", tmp.path().to_str().unwrap()); + + let info1 = DaemonInfo { + pid: 100, + hyperd_endpoint: "127.0.0.1:1111".to_string(), + health_port: 7484, + started_at: "2026-01-01T00:00:00Z".to_string(), + version: "0.1.0".to_string(), + }; + discovery::write_discovery_file(&info1).unwrap(); + + let info2 = DaemonInfo { + pid: 200, + hyperd_endpoint: "127.0.0.1:2222".to_string(), + health_port: 7485, + started_at: "2026-02-02T00:00:00Z".to_string(), + version: "0.2.0".to_string(), + }; + discovery::write_discovery_file(&info2).unwrap(); + + let path = tmp.path().join("daemon.json"); + let contents = std::fs::read_to_string(&path).unwrap(); + let read_back: DaemonInfo = serde_json::from_str(&contents).unwrap(); + assert_eq!(read_back.pid, 200); + assert_eq!(read_back.hyperd_endpoint, "127.0.0.1:2222"); +} + +#[test] +fn remove_discovery_file_deletes_it() { + let _lock = ENV_LOCK.lock().unwrap(); + let tmp = TempDir::new().unwrap(); + let _guard = EnvGuard::set("HYPERDB_STATE_DIR", tmp.path().to_str().unwrap()); + + let info = DaemonInfo { + pid: 1, + hyperd_endpoint: "127.0.0.1:1".to_string(), + health_port: 7484, + started_at: "2026-01-01T00:00:00Z".to_string(), + version: "0.0.1".to_string(), + }; + discovery::write_discovery_file(&info).unwrap(); + let path = tmp.path().join("daemon.json"); + assert!(path.exists()); + + discovery::remove_discovery_file(); + assert!(!path.exists()); +} + +#[test] +fn discover_returns_none_when_no_file_exists() { + let _lock = ENV_LOCK.lock().unwrap(); + let tmp = TempDir::new().unwrap(); + let _guard = EnvGuard::set("HYPERDB_STATE_DIR", tmp.path().to_str().unwrap()); + + assert!(discovery::discover().is_none()); +} + +#[test] +fn discover_returns_none_for_stale_file() { + let _lock = ENV_LOCK.lock().unwrap(); + let tmp = TempDir::new().unwrap(); + let _guard = EnvGuard::set("HYPERDB_STATE_DIR", tmp.path().to_str().unwrap()); + + let info = DaemonInfo { + pid: 99999, + hyperd_endpoint: "127.0.0.1:1".to_string(), + health_port: 1, + started_at: "2026-01-01T00:00:00Z".to_string(), + version: "0.0.1".to_string(), + }; + discovery::write_discovery_file(&info).unwrap(); + + assert!(discovery::discover().is_none()); + + let path = tmp.path().join("daemon.json"); + assert!(!path.exists()); +} + +#[test] +fn resolve_port_uses_env_var() { + let _lock = ENV_LOCK.lock().unwrap(); + let _guard = EnvGuard::set("HYPERDB_DAEMON_PORT", "9999"); + assert_eq!(discovery::resolve_port(), 9999); +} + +#[test] +fn resolve_port_uses_default_when_env_unset() { + let _lock = ENV_LOCK.lock().unwrap(); + let _guard = EnvGuard::remove("HYPERDB_DAEMON_PORT"); + assert_eq!( + discovery::resolve_port(), + hyperdb_mcp::daemon::DEFAULT_DAEMON_PORT + ); +} + +#[test] +fn discover_finds_live_daemon() { + let _lock = ENV_LOCK.lock().unwrap(); + let tmp = TempDir::new().unwrap(); + let _guard = EnvGuard::set("HYPERDB_STATE_DIR", tmp.path().to_str().unwrap()); + + let (port, _handle, _state) = start_health_listener(); + + let info = DaemonInfo { + pid: 12345, + hyperd_endpoint: "127.0.0.1:54321".to_string(), + health_port: port, + started_at: "2026-05-20T10:30:00Z".to_string(), + version: "0.1.3".to_string(), + }; + discovery::write_discovery_file(&info).unwrap(); + + let discovered = discovery::discover().expect("should discover live daemon"); + assert_eq!(discovered.pid, 12345); + assert_eq!(discovered.health_port, port); +} + +// ─── Integration tests: full daemon lifecycle with real hyperd ───────────────── + +#[test] +fn daemon_mode_engine_connects_to_shared_hyperd() { + let _lock = ENV_LOCK.lock().unwrap(); + let daemon = TestDaemon::start(); + + let tmp = TempDir::new().unwrap(); + let workspace_path = tmp.path().join("test.hyper"); + + let engine = + hyperdb_mcp::engine::Engine::new(Some(workspace_path.to_str().unwrap().to_string())) + .expect("engine should connect to daemon"); + + assert!(engine.is_running()); + + let endpoint = engine.hyperd_endpoint().unwrap(); + assert_eq!(endpoint, daemon.info.hyperd_endpoint); +} + +#[test] +fn daemon_mode_two_engines_share_same_hyperd() { + let _lock = ENV_LOCK.lock().unwrap(); + let _daemon = TestDaemon::start(); + + let tmp1 = TempDir::new().unwrap(); + let tmp2 = TempDir::new().unwrap(); + let path1 = tmp1.path().join("db1.hyper"); + let path2 = tmp2.path().join("db2.hyper"); + + let engine1 = + hyperdb_mcp::engine::Engine::new(Some(path1.to_str().unwrap().to_string())).unwrap(); + + let engine2 = + hyperdb_mcp::engine::Engine::new(Some(path2.to_str().unwrap().to_string())).unwrap(); + + assert_eq!( + engine1.hyperd_endpoint().unwrap(), + engine2.hyperd_endpoint().unwrap() + ); + + engine1.execute_command("CREATE TABLE foo (x INT)").unwrap(); + engine1 + .execute_command("INSERT INTO foo VALUES (42)") + .unwrap(); + + let tables = engine2.describe_tables().unwrap(); + assert!( + tables.iter().all(|t| t["name"] != "foo"), + "engine2 should not see engine1's table" + ); +} + +#[test] +fn daemon_mode_persistent_database_file_survives_engine_drop() { + let _lock = ENV_LOCK.lock().unwrap(); + let _daemon = TestDaemon::start(); + let tmp = TempDir::new().unwrap(); + let path = tmp.path().join("persistent.hyper"); + let path_str = path.to_str().unwrap().to_string(); + + { + let engine = hyperdb_mcp::engine::Engine::new(Some(path_str.clone())).unwrap(); + engine + .execute_command("CREATE TABLE survive (val TEXT)") + .unwrap(); + engine + .execute_command("INSERT INTO survive VALUES ('hello')") + .unwrap(); + } + + assert!( + path.exists(), + "persistent .hyper file should survive engine drop" + ); +} + +#[test] +fn daemon_mode_persistent_engine_data_is_queryable() { + let _lock = ENV_LOCK.lock().unwrap(); + let daemon = TestDaemon::start(); + let tmp = TempDir::new().unwrap(); + let path = tmp.path().join("queryable.hyper"); + let path_str = path.to_str().unwrap().to_string(); + + let engine = hyperdb_mcp::engine::Engine::new(Some(path_str)).unwrap(); + engine + .execute_command("CREATE TABLE items (id INT, name TEXT)") + .unwrap(); + engine + .execute_command("INSERT INTO items VALUES (1, 'alpha'), (2, 'beta')") + .unwrap(); + + let rows = engine + .execute_query_to_json("SELECT * FROM items ORDER BY id") + .unwrap(); + assert_eq!(rows.len(), 2); + assert_eq!(rows[0]["name"], "alpha"); + assert_eq!(rows[1]["name"], "beta"); + + let resp = health::send_command(daemon.info.health_port, "PING").unwrap(); + assert_eq!(resp.trim(), "PONG"); +} + +#[test] +fn daemon_mode_ephemeral_database_cleaned_up_on_drop() { + let _lock = ENV_LOCK.lock().unwrap(); + let _daemon = TestDaemon::start(); + + let engine = hyperdb_mcp::engine::Engine::new(None).unwrap(); + let workspace_path = engine.workspace_path().to_path_buf(); + + assert!(workspace_path.exists()); + + engine + .execute_command("CREATE TABLE ephemeral_test (id INT)") + .unwrap(); + + drop(engine); + + assert!( + !workspace_path.exists(), + "ephemeral .hyper file should be deleted after engine drop" + ); +} + +// ─── Test helpers ───────────────────────────────────────────────────────────── + +/// Starts a health listener on a random port and returns the port, join handle, +/// and shared state. Does NOT touch env vars — safe for parallel use. +fn start_health_listener() -> (u16, std::thread::JoinHandle<()>, Arc) { + let listener = HealthListener::bind(0).unwrap(); + let port = listener.port; + let state = Arc::new(DaemonState::new()); + let run_state = Arc::clone(&state); + + let info = DaemonInfo { + pid: 12345, + hyperd_endpoint: "127.0.0.1:54321".to_string(), + health_port: port, + started_at: "2026-05-20T10:30:00Z".to_string(), + version: "0.1.3".to_string(), + }; + + let handle = std::thread::spawn(move || { + listener.run(run_state, info); + }); + + // Give the listener a moment to start accepting + std::thread::sleep(Duration::from_millis(50)); + + (port, handle, state) +} + +/// A real daemon running in a background thread for integration tests. +/// Sets `HYPERDB_STATE_DIR` and `HYPERDB_DAEMON_PORT` to isolated values. +/// Caller MUST hold `ENV_LOCK` before calling `start()`. +struct TestDaemon { + info: DaemonInfo, + _state_dir_guard: EnvGuard, + _port_guard: EnvGuard, +} + +impl TestDaemon { + fn start() -> Self { + let tmp = TempDir::new().unwrap(); + // Leak the TempDir so it persists for the lifetime of the test. + let tmp = Box::leak(Box::new(tmp)); + + let state_dir_guard = EnvGuard::set("HYPERDB_STATE_DIR", tmp.path().to_str().unwrap()); + + let port = find_free_port(); + let port_guard = EnvGuard::set("HYPERDB_DAEMON_PORT", &port.to_string()); + + // Start the daemon in a background tokio runtime + let daemon_port = port; + std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let config = hyperdb_mcp::daemon::run::DaemonConfig { + port: daemon_port, + idle_timeout: Duration::from_secs(300), + }; + let _ = hyperdb_mcp::daemon::run::run_daemon(config).await; + }); + }); + + // Wait for daemon to become ready + let start = Instant::now(); + loop { + if let Some(info) = discovery::discover() { + return Self { + info, + _state_dir_guard: state_dir_guard, + _port_guard: port_guard, + }; + } + assert!( + start.elapsed() <= Duration::from_secs(15), + "TestDaemon did not start within 15 seconds" + ); + std::thread::sleep(Duration::from_millis(100)); + } + } +} + +impl Drop for TestDaemon { + fn drop(&mut self) { + let _ = health::send_command(self.info.health_port, "STOP"); + std::thread::sleep(Duration::from_millis(200)); + } +} + +/// Find a free TCP port by binding to port 0 and reading the assigned port. +fn find_free_port() -> u16 { + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + listener.local_addr().unwrap().port() +} + +/// RAII guard that sets/removes an environment variable and restores it on drop. +struct EnvGuard { + key: String, + previous: Option, +} + +impl EnvGuard { + fn set(key: &str, value: &str) -> Self { + let previous = std::env::var(key).ok(); + // SAFETY: Callers hold ENV_LOCK, ensuring no concurrent env var access. + unsafe { std::env::set_var(key, value) }; + Self { + key: key.to_string(), + previous, + } + } + + fn remove(key: &str) -> Self { + let previous = std::env::var(key).ok(); + // SAFETY: Callers hold ENV_LOCK, ensuring no concurrent env var access. + unsafe { std::env::remove_var(key) }; + Self { + key: key.to_string(), + previous, + } + } +} + +impl Drop for EnvGuard { + fn drop(&mut self) { + match &self.previous { + // SAFETY: Callers hold ENV_LOCK for the lifetime of this guard. + Some(val) => unsafe { std::env::set_var(&self.key, val) }, + // SAFETY: Callers hold ENV_LOCK for the lifetime of this guard. + None => unsafe { std::env::remove_var(&self.key) }, + } + } +}