From 489b8ed7b19b1f74b68ab0cf96ba7b9561955dc1 Mon Sep 17 00:00:00 2001 From: Cody Date: Sun, 5 Apr 2026 10:51:05 -0400 Subject: [PATCH 1/3] refactor(core): extract magic strings to named constants Replace repeated magic strings/numbers with named constants: - DEFAULT_HOST, DEFAULT_PORT, DEFAULT_SERVER_URL in dkdc-db-core (shared) - IN_MEMORY_PATH in manager.rs (crate-local) - DEFAULT_LOG_LINES in cli.rs (crate-local) - Reuse api::MAX_BODY_SIZE in server lib.rs instead of duplicating 16*1024*1024 Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/dkdc-db-cli/src/cli.rs | 23 ++++++++++++----------- crates/dkdc-db-cli/src/lib.rs | 8 ++++++-- crates/dkdc-db-client/src/cli.rs | 10 +++++----- crates/dkdc-db-core/src/lib.rs | 9 +++++++++ crates/dkdc-db-core/src/manager.rs | 6 ++++-- crates/dkdc-db-core/src/toml_config.rs | 4 ++-- crates/dkdc-db-server/src/api.rs | 2 +- crates/dkdc-db-server/src/lib.rs | 2 +- 8 files changed, 40 insertions(+), 24 deletions(-) diff --git a/crates/dkdc-db-cli/src/cli.rs b/crates/dkdc-db-cli/src/cli.rs index b06eb96..615716c 100644 --- a/crates/dkdc-db-cli/src/cli.rs +++ b/crates/dkdc-db-cli/src/cli.rs @@ -1,6 +1,7 @@ use clap::{Parser, Subcommand}; pub const TMUX_SESSION: &str = "dkdc-db"; +const DEFAULT_LOG_LINES: usize = 50; #[derive(Parser)] #[command(name = "db", about = "dkdc-db: HTAP database")] @@ -14,11 +15,11 @@ pub enum Commands { /// Start the database server (in tmux) Serve { /// Host to bind to - #[arg(long, default_value = "127.0.0.1")] + #[arg(long, default_value = dkdc_db_core::DEFAULT_HOST)] host: String, /// Port to bind to - #[arg(long, default_value_t = 4200)] + #[arg(long, default_value_t = dkdc_db_core::DEFAULT_PORT)] port: u16, /// Run in foreground (skip tmux) @@ -30,7 +31,7 @@ pub enum Commands { /// Show database server status Status { /// Port to check - #[arg(long, default_value_t = 4200)] + #[arg(long, default_value_t = dkdc_db_core::DEFAULT_PORT)] port: u16, }, /// Attach to database server tmux session @@ -38,7 +39,7 @@ pub enum Commands { /// Show recent logs from tmux session Logs { /// Number of lines to show - #[arg(short, long, default_value_t = 50)] + #[arg(short, long, default_value_t = DEFAULT_LOG_LINES)] lines: usize, }, /// Generate a starter db.toml in the current directory @@ -55,7 +56,7 @@ pub enum Commands { #[arg()] name: String, /// Server URL - #[arg(long, default_value = "http://127.0.0.1:4200")] + #[arg(long, default_value = dkdc_db_core::DEFAULT_SERVER_URL)] url: String, }, /// Drop a database @@ -64,13 +65,13 @@ pub enum Commands { #[arg()] name: String, /// Server URL - #[arg(long, default_value = "http://127.0.0.1:4200")] + #[arg(long, default_value = dkdc_db_core::DEFAULT_SERVER_URL)] url: String, }, /// Interactive SQL REPL Repl { /// Server URL - #[arg(long, default_value = "http://127.0.0.1:4200")] + #[arg(long, default_value = dkdc_db_core::DEFAULT_SERVER_URL)] url: String, /// Initial database to use #[arg(long)] @@ -79,7 +80,7 @@ pub enum Commands { /// Execute a read query and print results Query { /// Server URL - #[arg(long, default_value = "http://127.0.0.1:4200")] + #[arg(long, default_value = dkdc_db_core::DEFAULT_SERVER_URL)] url: String, /// Database for OLTP query (omit for global analytical) #[arg(long)] @@ -90,7 +91,7 @@ pub enum Commands { /// Execute a write statement Execute { /// Server URL - #[arg(long, default_value = "http://127.0.0.1:4200")] + #[arg(long, default_value = dkdc_db_core::DEFAULT_SERVER_URL)] url: String, /// Target database (required) #[arg(long)] @@ -101,7 +102,7 @@ pub enum Commands { /// List tables in a database Tables { /// Server URL - #[arg(long, default_value = "http://127.0.0.1:4200")] + #[arg(long, default_value = dkdc_db_core::DEFAULT_SERVER_URL)] url: String, /// Database name (required) #[arg(long)] @@ -110,7 +111,7 @@ pub enum Commands { /// List databases List { /// Server URL - #[arg(long, default_value = "http://127.0.0.1:4200")] + #[arg(long, default_value = dkdc_db_core::DEFAULT_SERVER_URL)] url: String, }, } diff --git a/crates/dkdc-db-cli/src/lib.rs b/crates/dkdc-db-cli/src/lib.rs index d55dde7..4e2a1af 100644 --- a/crates/dkdc-db-cli/src/lib.rs +++ b/crates/dkdc-db-cli/src/lib.rs @@ -54,12 +54,16 @@ async fn async_main() -> anyhow::Result<()> { // Apply server config (CLI flags override config file) let (effective_host, effective_port) = match &config { Some(c) => { - let h = if host != "127.0.0.1" { + let h = if host != dkdc_db_core::DEFAULT_HOST { host.clone() } else { c.server.host.clone() }; - let p = if port != 4200 { port } else { c.server.port }; + let p = if port != dkdc_db_core::DEFAULT_PORT { + port + } else { + c.server.port + }; (h, p) } None => (host.clone(), port), diff --git a/crates/dkdc-db-client/src/cli.rs b/crates/dkdc-db-client/src/cli.rs index cfe8dea..10d262b 100644 --- a/crates/dkdc-db-client/src/cli.rs +++ b/crates/dkdc-db-client/src/cli.rs @@ -12,7 +12,7 @@ pub enum Commands { /// Interactive SQL REPL Repl { /// Server URL - #[arg(long, default_value = "http://127.0.0.1:4200")] + #[arg(long, default_value = dkdc_db_core::DEFAULT_SERVER_URL)] url: String, /// Initial database to use #[arg(long)] @@ -21,7 +21,7 @@ pub enum Commands { /// Execute a read query and print results Query { /// Server URL - #[arg(long, default_value = "http://127.0.0.1:4200")] + #[arg(long, default_value = dkdc_db_core::DEFAULT_SERVER_URL)] url: String, /// Database for OLTP query (omit for global analytical) #[arg(long)] @@ -32,7 +32,7 @@ pub enum Commands { /// Execute a write statement Execute { /// Server URL - #[arg(long, default_value = "http://127.0.0.1:4200")] + #[arg(long, default_value = dkdc_db_core::DEFAULT_SERVER_URL)] url: String, /// Target database (required) #[arg(long)] @@ -43,7 +43,7 @@ pub enum Commands { /// List tables in a database Tables { /// Server URL - #[arg(long, default_value = "http://127.0.0.1:4200")] + #[arg(long, default_value = dkdc_db_core::DEFAULT_SERVER_URL)] url: String, /// Database name (required) #[arg(long)] @@ -52,7 +52,7 @@ pub enum Commands { /// List databases List { /// Server URL - #[arg(long, default_value = "http://127.0.0.1:4200")] + #[arg(long, default_value = dkdc_db_core::DEFAULT_SERVER_URL)] url: String, }, } diff --git a/crates/dkdc-db-core/src/lib.rs b/crates/dkdc-db-core/src/lib.rs index 47c2090..75c2195 100644 --- a/crates/dkdc-db-core/src/lib.rs +++ b/crates/dkdc-db-core/src/lib.rs @@ -18,3 +18,12 @@ pub use manager::DbManager; pub use arrow::record_batch::RecordBatch; pub use datafusion::dataframe::DataFrame; + +/// Default host for the database server. +pub const DEFAULT_HOST: &str = "127.0.0.1"; + +/// Default port for the database server. +pub const DEFAULT_PORT: u16 = 4200; + +/// Default server URL (`http://{DEFAULT_HOST}:{DEFAULT_PORT}`). +pub const DEFAULT_SERVER_URL: &str = "http://127.0.0.1:4200"; diff --git a/crates/dkdc-db-core/src/manager.rs b/crates/dkdc-db-core/src/manager.rs index f84bcfc..61e38f3 100644 --- a/crates/dkdc-db-core/src/manager.rs +++ b/crates/dkdc-db-core/src/manager.rs @@ -15,6 +15,8 @@ use crate::error::{self, Error, Result}; use crate::router; use crate::toml_config::DbTomlConfig; +const IN_MEMORY_PATH: &str = ":memory:"; + struct ManagedDb { db: DkdcDb, catalog: Arc, @@ -51,7 +53,7 @@ impl DbManager { ctx: SessionContext::new(), dbs: RwLock::new(HashMap::new()), known: RwLock::new(Vec::new()), - base_path: PathBuf::from(":memory:"), + base_path: PathBuf::from(IN_MEMORY_PATH), }) } @@ -226,7 +228,7 @@ impl DbManager { // -- internal -- async fn open_db(&self, name: &str) -> Result { - let is_memory = self.base_path.to_string_lossy() == ":memory:"; + let is_memory = self.base_path.to_string_lossy() == IN_MEMORY_PATH; let db = if is_memory { DkdcDb::open_in_memory().await? } else { diff --git a/crates/dkdc-db-core/src/toml_config.rs b/crates/dkdc-db-core/src/toml_config.rs index 181549e..c6bf789 100644 --- a/crates/dkdc-db-core/src/toml_config.rs +++ b/crates/dkdc-db-core/src/toml_config.rs @@ -30,11 +30,11 @@ impl Default for ServerConfig { } fn default_host() -> String { - "127.0.0.1".to_string() + crate::DEFAULT_HOST.to_string() } fn default_port() -> u16 { - 4200 + crate::DEFAULT_PORT } #[derive(Debug, Deserialize, Default)] diff --git a/crates/dkdc-db-server/src/api.rs b/crates/dkdc-db-server/src/api.rs index 65914cb..4ca37bc 100644 --- a/crates/dkdc-db-server/src/api.rs +++ b/crates/dkdc-db-server/src/api.rs @@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize}; use tower_http::trace::TraceLayer; /// Max request body size: 16 MB -const MAX_BODY_SIZE: usize = 16 * 1024 * 1024; +pub const MAX_BODY_SIZE: usize = 16 * 1024 * 1024; /// Request timeout const REQUEST_TIMEOUT: Duration = Duration::from_secs(30); diff --git a/crates/dkdc-db-server/src/lib.rs b/crates/dkdc-db-server/src/lib.rs index 4405d5e..6a9d9f7 100644 --- a/crates/dkdc-db-server/src/lib.rs +++ b/crates/dkdc-db-server/src/lib.rs @@ -9,7 +9,7 @@ use dkdc_db_core::DbManager; pub async fn serve(manager: Arc, host: &str, port: u16) -> std::io::Result<()> { let app = api::routes() .merge(ui::ui_routes()) - .layer(axum::extract::DefaultBodyLimit::max(16 * 1024 * 1024)) + .layer(axum::extract::DefaultBodyLimit::max(api::MAX_BODY_SIZE)) .layer(tower_http::trace::TraceLayer::new_for_http()) .layer(axum::middleware::from_fn(api::timeout_middleware)) .with_state(manager); From ce7148f6c81e9f6b452d9ed851d267b004442a44 Mon Sep 17 00:00:00 2001 From: Cody Date: Sun, 5 Apr 2026 13:47:06 -0400 Subject: [PATCH 2/3] =?UTF-8?q?fix(ui):=20security=20hardening=20=E2=80=94?= =?UTF-8?q?=20SQL=20injection,=20XSS,=20input=20validation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Quote table names in SQL queries to prevent SQL injection - Validate table names against actual table list before use in SQL - Add single-quote escaping to escape_html to prevent XSS - Validate database exists in onboarding_create_table before rendering Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/dkdc-db-server/src/ui.rs | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/crates/dkdc-db-server/src/ui.rs b/crates/dkdc-db-server/src/ui.rs index daad052..aaf8c81 100644 --- a/crates/dkdc-db-server/src/ui.rs +++ b/crates/dkdc-db-server/src/ui.rs @@ -293,6 +293,7 @@ fn escape_html(s: &str) -> String { .replace('<', "<") .replace('>', ">") .replace('"', """) + .replace('\'', "'") } // --------------------------------------------------------------------------- @@ -360,9 +361,15 @@ fn onboarding_welcome() -> Html { } async fn onboarding_create_table( - State(_mgr): State, + State(mgr): State, Path(name): Path, -) -> Html { +) -> Response { + // Validate database exists before showing the create-table form + let dbs = mgr.list_dbs().await; + if !dbs.contains(&name) { + return Redirect::to("/ui").into_response(); + } + let n = escape_html(&name); let content = format!( r##"
@@ -419,7 +426,7 @@ async fn onboarding_create_table(
"##, n = n, ); - layout("Create Table", &content, "") + layout("Create Table", &content, "").into_response() } async fn onboarding_insert(State(mgr): State, Path(name): Path) -> Html { @@ -497,7 +504,7 @@ async fn database_view(State(mgr): State, Path(name): Path) -> let mut table_rows = String::new(); for table_name in &tables { let row_count = mgr - .query_oltp(&name, &format!("SELECT COUNT(*) FROM {}", table_name)) + .query_oltp(&name, &format!("SELECT COUNT(*) FROM \"{}\"", table_name)) .await .ok() .and_then(|batches| { @@ -605,6 +612,18 @@ async fn table_view( let db = escape_html(&db_name); let tbl = escape_html(&table_name); + // Validate table_name exists in this database before using it in SQL + let valid_tables = mgr.list_tables(&db_name).await.unwrap_or_default(); + if !valid_tables.contains(&table_name) { + let content = format!( + r##"
Table not found: {tbl}
+← Back to database"##, + tbl = tbl, + db = db, + ); + return layout("Error", &content, "dashboard").into_response(); + } + // Schema let schema_html = match mgr.table_schema(&db_name, &table_name).await { Ok(batches) => { @@ -637,7 +656,10 @@ async fn table_view( // Data (first 100 rows) let data_html = match mgr - .query_oltp(&db_name, &format!("SELECT * FROM {} LIMIT 100", table_name)) + .query_oltp( + &db_name, + &format!("SELECT * FROM \"{}\" LIMIT 100", table_name), + ) .await { Ok(batches) => { From 8273af83fb843d9ada8538951ae992fa6bdb37e5 Mon Sep 17 00:00:00 2001 From: Cody Date: Sun, 5 Apr 2026 18:22:43 -0400 Subject: [PATCH 3/3] refactor: extract helpers, constants, and idiomatic patterns - Extract handle_response/handle_response_unit in client (~100 lines saved) - Extract db_not_found() helper in manager (6 call sites) - Extract enable_wal() helper and WAL constants in db.rs - Extract DEFAULT_SCHEMA constant in catalog.rs - Use IN_MEMORY_PATH constant in db.rs (manager.rs already had it) - Fix indexed loop to iterator pattern in benchmark Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/dkdc-db-client/src/lib.rs | 82 +++++++++--------------- crates/dkdc-db-core/benches/benchmark.rs | 3 +- crates/dkdc-db-core/src/catalog.rs | 6 +- crates/dkdc-db-core/src/db.rs | 20 ++++-- crates/dkdc-db-core/src/manager.rs | 18 ++++-- 5 files changed, 61 insertions(+), 68 deletions(-) diff --git a/crates/dkdc-db-client/src/lib.rs b/crates/dkdc-db-client/src/lib.rs index d3b8dfb..61984ec 100644 --- a/crates/dkdc-db-client/src/lib.rs +++ b/crates/dkdc-db-client/src/lib.rs @@ -1,7 +1,7 @@ #[cfg(feature = "cli")] pub mod repl; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Serialize, de::DeserializeOwned}; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -74,12 +74,7 @@ impl DbClient { .send() .await?; - if resp.status().is_success() { - Ok(()) - } else { - let body: ErrorResponse = resp.json().await?; - Err(Error::Server(body.error)) - } + self.handle_response_unit(resp).await } /// Drop a database. @@ -90,12 +85,7 @@ impl DbClient { .send() .await?; - if resp.status().is_success() { - Ok(()) - } else { - let body: ErrorResponse = resp.json().await?; - Err(Error::Server(body.error)) - } + self.handle_response_unit(resp).await } /// List all databases. @@ -106,12 +96,7 @@ impl DbClient { .send() .await?; - if resp.status().is_success() { - Ok(resp.json().await?) - } else { - let body: ErrorResponse = resp.json().await?; - Err(Error::Server(body.error)) - } + self.handle_response(resp).await } /// Execute a write statement against a specific database. @@ -125,13 +110,8 @@ impl DbClient { .send() .await?; - if resp.status().is_success() { - let body: ExecuteResponse = resp.json().await?; - Ok(body.affected) - } else { - let body: ErrorResponse = resp.json().await?; - Err(Error::Server(body.error)) - } + let body: ExecuteResponse = self.handle_response(resp).await?; + Ok(body.affected) } /// Analytical query through DataFusion. Supports cross-db joins. @@ -145,12 +125,7 @@ impl DbClient { .send() .await?; - if resp.status().is_success() { - Ok(resp.json().await?) - } else { - let body: ErrorResponse = resp.json().await?; - Err(Error::Server(body.error)) - } + self.handle_response(resp).await } /// OLTP fast-path read against a specific database. @@ -164,12 +139,7 @@ impl DbClient { .send() .await?; - if resp.status().is_success() { - Ok(resp.json().await?) - } else { - let body: ErrorResponse = resp.json().await?; - Err(Error::Server(body.error)) - } + self.handle_response(resp).await } /// List tables in a specific database. @@ -180,12 +150,7 @@ impl DbClient { .send() .await?; - if resp.status().is_success() { - Ok(resp.json().await?) - } else { - let body: ErrorResponse = resp.json().await?; - Err(Error::Server(body.error)) - } + self.handle_response(resp).await } /// Get table schema for a specific database. @@ -196,12 +161,7 @@ impl DbClient { .send() .await?; - if resp.status().is_success() { - Ok(resp.json().await?) - } else { - let body: ErrorResponse = resp.json().await?; - Err(Error::Server(body.error)) - } + self.handle_response(resp).await } /// Health check. @@ -213,4 +173,26 @@ impl DbClient { .await?; Ok(resp.status().is_success()) } + + // -- internal -- + + /// Deserialize a successful JSON response or extract the server error. + async fn handle_response(&self, resp: reqwest::Response) -> Result { + if resp.status().is_success() { + Ok(resp.json().await?) + } else { + let body: ErrorResponse = resp.json().await?; + Err(Error::Server(body.error)) + } + } + + /// Check for success on responses with no meaningful body. + async fn handle_response_unit(&self, resp: reqwest::Response) -> Result<()> { + if resp.status().is_success() { + Ok(()) + } else { + let body: ErrorResponse = resp.json().await?; + Err(Error::Server(body.error)) + } + } } diff --git a/crates/dkdc-db-core/benches/benchmark.rs b/crates/dkdc-db-core/benches/benchmark.rs index 393ec5c..5e23291 100644 --- a/crates/dkdc-db-core/benches/benchmark.rs +++ b/crates/dkdc-db-core/benches/benchmark.rs @@ -66,8 +66,7 @@ async fn setup_multi_table(size: usize) -> DbManager { // Insert regions mgr.execute("bench", "BEGIN").await.unwrap(); - for i in 0..10 { - let name = REGIONS[i]; + for (i, name) in REGIONS.iter().enumerate() { mgr.execute( "bench", &format!("INSERT INTO regions VALUES ({i}, '{name}', 'US')"), diff --git a/crates/dkdc-db-core/src/catalog.rs b/crates/dkdc-db-core/src/catalog.rs index 633233d..6a3c7b1 100644 --- a/crates/dkdc-db-core/src/catalog.rs +++ b/crates/dkdc-db-core/src/catalog.rs @@ -11,6 +11,8 @@ use crate::error::Result; use crate::provider::SqliteTableProvider; use crate::schema; +const DEFAULT_SCHEMA: &str = "public"; + /// One catalog per database. Contains a single "public" schema. pub struct SqliteCatalogProvider { schema: Arc, @@ -40,12 +42,12 @@ impl CatalogProvider for SqliteCatalogProvider { } fn schema_names(&self) -> Vec { - vec!["public".to_string()] + vec![DEFAULT_SCHEMA.to_string()] } fn schema(&self, name: &str) -> Option> { match name { - "public" => Some(self.schema.clone()), + DEFAULT_SCHEMA => Some(self.schema.clone()), _ => None, } } diff --git a/crates/dkdc-db-core/src/db.rs b/crates/dkdc-db-core/src/db.rs index 4681b21..a7502f1 100644 --- a/crates/dkdc-db-core/src/db.rs +++ b/crates/dkdc-db-core/src/db.rs @@ -7,6 +7,10 @@ use crate::router; use crate::schema; use crate::write::WriteEngine; +const IN_MEMORY_PATH: &str = ":memory:"; +const WAL_MODE_PRAGMA: &str = "journal_mode"; +const WAL_MODE_VALUE: &str = "'wal'"; + /// Try to extract a table name from a simple SELECT query for PRAGMA fallback. /// Handles `SELECT ... FROM table_name` patterns. fn extract_table_name(sql: &str) -> Option { @@ -21,6 +25,12 @@ fn extract_table_name(sql: &str) -> Option { if name.is_empty() { None } else { Some(name) } } +/// Enable WAL mode on a connection for concurrent read+write. +async fn enable_wal(conn: &turso::Connection) -> Result<()> { + conn.pragma_update(WAL_MODE_PRAGMA, WAL_MODE_VALUE).await?; + Ok(()) +} + pub struct DkdcDb { write: WriteEngine, db: turso::Database, @@ -40,9 +50,7 @@ impl DkdcDb { .await?; let write_conn = db.connect()?; - - // Enable WAL mode for concurrent read+write - write_conn.pragma_update("journal_mode", "'wal'").await?; + enable_wal(&write_conn).await?; let write = WriteEngine::new(write_conn); @@ -52,11 +60,9 @@ impl DkdcDb { /// Open an in-memory database (for testing). /// Turso connections from the same Database share the same in-memory data. pub async fn open_in_memory() -> Result { - let db = turso::Builder::new_local(":memory:").build().await?; + let db = turso::Builder::new_local(IN_MEMORY_PATH).build().await?; let write_conn = db.connect()?; - - // Enable WAL mode - write_conn.pragma_update("journal_mode", "'wal'").await?; + enable_wal(&write_conn).await?; let write = WriteEngine::new(write_conn); diff --git a/crates/dkdc-db-core/src/manager.rs b/crates/dkdc-db-core/src/manager.rs index 61e38f3..869da64 100644 --- a/crates/dkdc-db-core/src/manager.rs +++ b/crates/dkdc-db-core/src/manager.rs @@ -76,7 +76,7 @@ impl DbManager { pub async fn drop_db(&self, name: &str) -> Result<()> { let mut dbs = self.dbs.write().await; if dbs.remove(name).is_none() { - return Err(Error::Schema(format!("database '{name}' not found"))); + return Err(db_not_found(name)); } // DataFusion has no deregister_catalog — replace with an empty catalog. self.ctx @@ -116,6 +116,7 @@ impl DbManager { "database '{name}' not found — create it with POST /db" ))); } + // Lazy open — use create_db, but tolerate "already exists" from a concurrent ensure_db match self.create_db(name).await { Ok(()) => Ok(()), @@ -129,9 +130,7 @@ impl DbManager { error::validate_sql(sql)?; self.ensure_db(db_name).await?; let dbs = self.dbs.read().await; - let managed = dbs - .get(db_name) - .ok_or_else(|| Error::Schema(format!("database '{db_name}' not found")))?; + let managed = dbs.get(db_name).ok_or_else(|| db_not_found(db_name))?; let result = managed.db.execute(sql).await?; // Refresh catalog if DDL (selective: only the affected table) if router::is_ddl(sql) { @@ -163,7 +162,7 @@ impl DbManager { self.ensure_db(db_name).await?; let dbs = self.dbs.read().await; dbs.get(db_name) - .ok_or_else(|| Error::Schema(format!("database '{db_name}' not found")))? + .ok_or_else(|| db_not_found(db_name))? .db .query_oltp(sql) .await @@ -174,7 +173,7 @@ impl DbManager { self.ensure_db(db_name).await?; let dbs = self.dbs.read().await; dbs.get(db_name) - .ok_or_else(|| Error::Schema(format!("database '{db_name}' not found")))? + .ok_or_else(|| db_not_found(db_name))? .db .list_tables() .await @@ -188,7 +187,7 @@ impl DbManager { // Table name is validated above (alphanumeric + underscores only), safe for interpolation let sql = format!("SELECT name, type FROM pragma_table_info('{table}')"); dbs.get(db_name) - .ok_or_else(|| Error::Schema(format!("database '{db_name}' not found")))? + .ok_or_else(|| db_not_found(db_name))? .db .query_oltp(&sql) .await @@ -241,6 +240,11 @@ impl DbManager { } } +/// Return a "database not found" schema error. +fn db_not_found(db_name: &str) -> Error { + Error::Schema(format!("database '{db_name}' not found")) +} + /// Map database name to catalog name. Slashes become underscores /// because DataFusion catalog names can't contain slashes. fn catalog_name(db_name: &str) -> String {