Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions crates/dkdc-db-cli/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use clap::{Parser, Subcommand};

pub const TMUX_SESSION: &str = "dkdc-db";
const DEFAULT_LOG_LINES: usize = 50;

#[derive(Parser)]
#[command(name = "db", about = "dkdc-db: HTAP database")]
Expand All @@ -14,11 +15,11 @@ pub enum Commands {
/// Start the database server (in tmux)
Serve {
/// Host to bind to
#[arg(long, default_value = "127.0.0.1")]
#[arg(long, default_value = dkdc_db_core::DEFAULT_HOST)]
host: String,

/// Port to bind to
#[arg(long, default_value_t = 4200)]
#[arg(long, default_value_t = dkdc_db_core::DEFAULT_PORT)]
port: u16,

/// Run in foreground (skip tmux)
Expand All @@ -30,15 +31,15 @@ pub enum Commands {
/// Show database server status
Status {
/// Port to check
#[arg(long, default_value_t = 4200)]
#[arg(long, default_value_t = dkdc_db_core::DEFAULT_PORT)]
port: u16,
},
/// Attach to database server tmux session
Attach,
/// Show recent logs from tmux session
Logs {
/// Number of lines to show
#[arg(short, long, default_value_t = 50)]
#[arg(short, long, default_value_t = DEFAULT_LOG_LINES)]
lines: usize,
},
/// Generate a starter db.toml in the current directory
Expand All @@ -55,7 +56,7 @@ pub enum Commands {
#[arg()]
name: String,
/// Server URL
#[arg(long, default_value = "http://127.0.0.1:4200")]
#[arg(long, default_value = dkdc_db_core::DEFAULT_SERVER_URL)]
url: String,
},
/// Drop a database
Expand All @@ -64,13 +65,13 @@ pub enum Commands {
#[arg()]
name: String,
/// Server URL
#[arg(long, default_value = "http://127.0.0.1:4200")]
#[arg(long, default_value = dkdc_db_core::DEFAULT_SERVER_URL)]
url: String,
},
/// Interactive SQL REPL
Repl {
/// Server URL
#[arg(long, default_value = "http://127.0.0.1:4200")]
#[arg(long, default_value = dkdc_db_core::DEFAULT_SERVER_URL)]
url: String,
/// Initial database to use
#[arg(long)]
Expand All @@ -79,7 +80,7 @@ pub enum Commands {
/// Execute a read query and print results
Query {
/// Server URL
#[arg(long, default_value = "http://127.0.0.1:4200")]
#[arg(long, default_value = dkdc_db_core::DEFAULT_SERVER_URL)]
url: String,
/// Database for OLTP query (omit for global analytical)
#[arg(long)]
Expand All @@ -90,7 +91,7 @@ pub enum Commands {
/// Execute a write statement
Execute {
/// Server URL
#[arg(long, default_value = "http://127.0.0.1:4200")]
#[arg(long, default_value = dkdc_db_core::DEFAULT_SERVER_URL)]
url: String,
/// Target database (required)
#[arg(long)]
Expand All @@ -101,7 +102,7 @@ pub enum Commands {
/// List tables in a database
Tables {
/// Server URL
#[arg(long, default_value = "http://127.0.0.1:4200")]
#[arg(long, default_value = dkdc_db_core::DEFAULT_SERVER_URL)]
url: String,
/// Database name (required)
#[arg(long)]
Expand All @@ -110,7 +111,7 @@ pub enum Commands {
/// List databases
List {
/// Server URL
#[arg(long, default_value = "http://127.0.0.1:4200")]
#[arg(long, default_value = dkdc_db_core::DEFAULT_SERVER_URL)]
url: String,
},
}
8 changes: 6 additions & 2 deletions crates/dkdc-db-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,16 @@ async fn async_main() -> anyhow::Result<()> {
// Apply server config (CLI flags override config file)
let (effective_host, effective_port) = match &config {
Some(c) => {
let h = if host != "127.0.0.1" {
let h = if host != dkdc_db_core::DEFAULT_HOST {
host.clone()
} else {
c.server.host.clone()
};
let p = if port != 4200 { port } else { c.server.port };
let p = if port != dkdc_db_core::DEFAULT_PORT {
port
} else {
c.server.port
};
(h, p)
}
None => (host.clone(), port),
Expand Down
10 changes: 5 additions & 5 deletions crates/dkdc-db-client/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub enum Commands {
/// Interactive SQL REPL
Repl {
/// Server URL
#[arg(long, default_value = "http://127.0.0.1:4200")]
#[arg(long, default_value = dkdc_db_core::DEFAULT_SERVER_URL)]
url: String,
/// Initial database to use
#[arg(long)]
Expand All @@ -21,7 +21,7 @@ pub enum Commands {
/// Execute a read query and print results
Query {
/// Server URL
#[arg(long, default_value = "http://127.0.0.1:4200")]
#[arg(long, default_value = dkdc_db_core::DEFAULT_SERVER_URL)]
url: String,
/// Database for OLTP query (omit for global analytical)
#[arg(long)]
Expand All @@ -32,7 +32,7 @@ pub enum Commands {
/// Execute a write statement
Execute {
/// Server URL
#[arg(long, default_value = "http://127.0.0.1:4200")]
#[arg(long, default_value = dkdc_db_core::DEFAULT_SERVER_URL)]
url: String,
/// Target database (required)
#[arg(long)]
Expand All @@ -43,7 +43,7 @@ pub enum Commands {
/// List tables in a database
Tables {
/// Server URL
#[arg(long, default_value = "http://127.0.0.1:4200")]
#[arg(long, default_value = dkdc_db_core::DEFAULT_SERVER_URL)]
url: String,
/// Database name (required)
#[arg(long)]
Expand All @@ -52,7 +52,7 @@ pub enum Commands {
/// List databases
List {
/// Server URL
#[arg(long, default_value = "http://127.0.0.1:4200")]
#[arg(long, default_value = dkdc_db_core::DEFAULT_SERVER_URL)]
url: String,
},
}
82 changes: 32 additions & 50 deletions crates/dkdc-db-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#[cfg(feature = "cli")]
pub mod repl;

use serde::{Deserialize, Serialize};
use serde::{Deserialize, Serialize, de::DeserializeOwned};

#[derive(thiserror::Error, Debug)]
pub enum Error {
Expand Down Expand Up @@ -74,12 +74,7 @@ impl DbClient {
.send()
.await?;

if resp.status().is_success() {
Ok(())
} else {
let body: ErrorResponse = resp.json().await?;
Err(Error::Server(body.error))
}
self.handle_response_unit(resp).await
}

/// Drop a database.
Expand All @@ -90,12 +85,7 @@ impl DbClient {
.send()
.await?;

if resp.status().is_success() {
Ok(())
} else {
let body: ErrorResponse = resp.json().await?;
Err(Error::Server(body.error))
}
self.handle_response_unit(resp).await
}

/// List all databases.
Expand All @@ -106,12 +96,7 @@ impl DbClient {
.send()
.await?;

if resp.status().is_success() {
Ok(resp.json().await?)
} else {
let body: ErrorResponse = resp.json().await?;
Err(Error::Server(body.error))
}
self.handle_response(resp).await
}

/// Execute a write statement against a specific database.
Expand All @@ -125,13 +110,8 @@ impl DbClient {
.send()
.await?;

if resp.status().is_success() {
let body: ExecuteResponse = resp.json().await?;
Ok(body.affected)
} else {
let body: ErrorResponse = resp.json().await?;
Err(Error::Server(body.error))
}
let body: ExecuteResponse = self.handle_response(resp).await?;
Ok(body.affected)
}

/// Analytical query through DataFusion. Supports cross-db joins.
Expand All @@ -145,12 +125,7 @@ impl DbClient {
.send()
.await?;

if resp.status().is_success() {
Ok(resp.json().await?)
} else {
let body: ErrorResponse = resp.json().await?;
Err(Error::Server(body.error))
}
self.handle_response(resp).await
}

/// OLTP fast-path read against a specific database.
Expand All @@ -164,12 +139,7 @@ impl DbClient {
.send()
.await?;

if resp.status().is_success() {
Ok(resp.json().await?)
} else {
let body: ErrorResponse = resp.json().await?;
Err(Error::Server(body.error))
}
self.handle_response(resp).await
}

/// List tables in a specific database.
Expand All @@ -180,12 +150,7 @@ impl DbClient {
.send()
.await?;

if resp.status().is_success() {
Ok(resp.json().await?)
} else {
let body: ErrorResponse = resp.json().await?;
Err(Error::Server(body.error))
}
self.handle_response(resp).await
}

/// Get table schema for a specific database.
Expand All @@ -196,12 +161,7 @@ impl DbClient {
.send()
.await?;

if resp.status().is_success() {
Ok(resp.json().await?)
} else {
let body: ErrorResponse = resp.json().await?;
Err(Error::Server(body.error))
}
self.handle_response(resp).await
}

/// Health check.
Expand All @@ -213,4 +173,26 @@ impl DbClient {
.await?;
Ok(resp.status().is_success())
}

// -- internal --

/// Deserialize a successful JSON response or extract the server error.
async fn handle_response<T: DeserializeOwned>(&self, resp: reqwest::Response) -> Result<T> {
if resp.status().is_success() {
Ok(resp.json().await?)
} else {
let body: ErrorResponse = resp.json().await?;
Err(Error::Server(body.error))
}
}

/// Check for success on responses with no meaningful body.
async fn handle_response_unit(&self, resp: reqwest::Response) -> Result<()> {
if resp.status().is_success() {
Ok(())
} else {
let body: ErrorResponse = resp.json().await?;
Err(Error::Server(body.error))
}
}
}
3 changes: 1 addition & 2 deletions crates/dkdc-db-core/benches/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ async fn setup_multi_table(size: usize) -> DbManager {

// Insert regions
mgr.execute("bench", "BEGIN").await.unwrap();
for i in 0..10 {
let name = REGIONS[i];
for (i, name) in REGIONS.iter().enumerate() {
mgr.execute(
"bench",
&format!("INSERT INTO regions VALUES ({i}, '{name}', 'US')"),
Expand Down
6 changes: 4 additions & 2 deletions crates/dkdc-db-core/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use crate::error::Result;
use crate::provider::SqliteTableProvider;
use crate::schema;

const DEFAULT_SCHEMA: &str = "public";

/// One catalog per database. Contains a single "public" schema.
pub struct SqliteCatalogProvider {
schema: Arc<SqliteSchemaProvider>,
Expand Down Expand Up @@ -40,12 +42,12 @@ impl CatalogProvider for SqliteCatalogProvider {
}

fn schema_names(&self) -> Vec<String> {
vec!["public".to_string()]
vec![DEFAULT_SCHEMA.to_string()]
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
match name {
"public" => Some(self.schema.clone()),
DEFAULT_SCHEMA => Some(self.schema.clone()),
_ => None,
}
}
Expand Down
20 changes: 13 additions & 7 deletions crates/dkdc-db-core/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ use crate::router;
use crate::schema;
use crate::write::WriteEngine;

const IN_MEMORY_PATH: &str = ":memory:";
const WAL_MODE_PRAGMA: &str = "journal_mode";
const WAL_MODE_VALUE: &str = "'wal'";

/// Try to extract a table name from a simple SELECT query for PRAGMA fallback.
/// Handles `SELECT ... FROM table_name` patterns.
fn extract_table_name(sql: &str) -> Option<String> {
Expand All @@ -21,6 +25,12 @@ fn extract_table_name(sql: &str) -> Option<String> {
if name.is_empty() { None } else { Some(name) }
}

/// Enable WAL mode on a connection for concurrent read+write.
async fn enable_wal(conn: &turso::Connection) -> Result<()> {
conn.pragma_update(WAL_MODE_PRAGMA, WAL_MODE_VALUE).await?;
Ok(())
}

pub struct DkdcDb {
write: WriteEngine,
db: turso::Database,
Expand All @@ -40,9 +50,7 @@ impl DkdcDb {
.await?;

let write_conn = db.connect()?;

// Enable WAL mode for concurrent read+write
write_conn.pragma_update("journal_mode", "'wal'").await?;
enable_wal(&write_conn).await?;

let write = WriteEngine::new(write_conn);

Expand All @@ -52,11 +60,9 @@ impl DkdcDb {
/// Open an in-memory database (for testing).
/// Turso connections from the same Database share the same in-memory data.
pub async fn open_in_memory() -> Result<Self> {
let db = turso::Builder::new_local(":memory:").build().await?;
let db = turso::Builder::new_local(IN_MEMORY_PATH).build().await?;
let write_conn = db.connect()?;

// Enable WAL mode
write_conn.pragma_update("journal_mode", "'wal'").await?;
enable_wal(&write_conn).await?;

let write = WriteEngine::new(write_conn);

Expand Down
Loading
Loading