Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
291 changes: 261 additions & 30 deletions crates/dkdc-db-core/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl SqliteSchemaProvider {

if upper.starts_with("CREATE TABLE") || upper.starts_with("CREATE TEMP TABLE") {
// Extract table name and add just that table
if let Some(name) = extract_ddl_table_name(&upper) {
if let Some(name) = extract_ddl_table_name(sql) {
let conn = self.db.connect()?;
if let Ok(columns) = schema::introspect_table(&conn, &name).await {
let arrow_schema = schema::build_arrow_schema(&columns);
Expand All @@ -104,12 +104,12 @@ impl SqliteSchemaProvider {
}
}
} else if upper.starts_with("DROP TABLE") {
if let Some(name) = extract_ddl_table_name(&upper) {
if let Some(name) = extract_ddl_table_name(sql) {
self.tables.write().await.remove(&name);
return Ok(());
}
} else if upper.starts_with("ALTER TABLE") {
if let Some(name) = extract_ddl_table_name(&upper) {
if let Some(name) = extract_ddl_table_name(sql) {
let conn = self.db.connect()?;
if let Ok(columns) = schema::introspect_table(&conn, &name).await {
let arrow_schema = schema::build_arrow_schema(&columns);
Expand All @@ -135,41 +135,272 @@ impl fmt::Debug for SqliteSchemaProvider {
}
}

/// Extract the table name from a DDL statement (uppercased).
/// Extract the table name from a DDL statement (original case).
/// Handles: CREATE [TEMP] TABLE [IF NOT EXISTS] name, DROP TABLE [IF EXISTS] name,
/// ALTER TABLE name.
fn extract_ddl_table_name(upper: &str) -> Option<String> {
let tokens: Vec<&str> = upper.split_whitespace().collect();
let name_idx = if tokens.first() == Some(&"CREATE") {
// CREATE [TEMP] TABLE [IF NOT EXISTS] name
let table_pos = tokens.iter().position(|t| *t == "TABLE")?;
let mut idx = table_pos + 1;
// Skip IF NOT EXISTS
if tokens.get(idx) == Some(&"IF") {
idx += 3; // skip IF NOT EXISTS
/// ALTER TABLE name. Supports quoted identifiers (double quotes, backticks, single
/// quotes) including those containing spaces, and schema-qualified names (extracts
/// the last segment after dot).
fn extract_ddl_table_name(sql: &str) -> Option<String> {
let sql = sql.trim_start();

// Find the keyword position after TABLE where the name begins.
// We do case-insensitive matching on keywords but preserve the name's original case.
let upper = sql.to_uppercase();
let tokens_upper: Vec<&str> = upper.split_whitespace().collect();

let skip_keywords = if tokens_upper.first() == Some(&"CREATE") {
let table_pos = tokens_upper.iter().position(|t| *t == "TABLE")?;
let mut skip = table_pos + 1;
if tokens_upper.get(skip) == Some(&"IF") {
skip += 3; // IF NOT EXISTS
}
idx
} else if tokens.first() == Some(&"DROP") {
// DROP TABLE [IF EXISTS] name
let table_pos = tokens.iter().position(|t| *t == "TABLE")?;
let mut idx = table_pos + 1;
if tokens.get(idx) == Some(&"IF") {
idx += 2; // skip IF EXISTS
skip
} else if tokens_upper.first() == Some(&"DROP") {
let table_pos = tokens_upper.iter().position(|t| *t == "TABLE")?;
let mut skip = table_pos + 1;
if tokens_upper.get(skip) == Some(&"IF") {
skip += 2; // IF EXISTS
}
idx
} else if tokens.first() == Some(&"ALTER") {
// ALTER TABLE name
let table_pos = tokens.iter().position(|t| *t == "TABLE")?;
skip
} else if tokens_upper.first() == Some(&"ALTER") {
let table_pos = tokens_upper.iter().position(|t| *t == "TABLE")?;
table_pos + 1
} else {
return None;
};

tokens.get(name_idx).map(|name| {
// Remove any surrounding quotes or backticks, and convert to lowercase
name.trim_matches(|c| c == '"' || c == '`' || c == '\'')
.to_lowercase()
})
// Now walk the original SQL to skip that many whitespace-delimited tokens,
// then extract the name (which may be quoted and contain spaces).
let mut pos = 0;
let bytes = sql.as_bytes();
let len = bytes.len();

for _ in 0..skip_keywords {
// Skip whitespace
while pos < len && bytes[pos].is_ascii_whitespace() {
pos += 1;
}
// Skip token
while pos < len && !bytes[pos].is_ascii_whitespace() {
pos += 1;
}
}

// Skip whitespace before the name
while pos < len && bytes[pos].is_ascii_whitespace() {
pos += 1;
}

if pos >= len {
return None;
}

// Extract the name token. If it starts with a quote, read until the matching
// closing quote (handling dot-separated quoted segments).
let raw_name = extract_name_token(&sql[pos..])?;

// Handle schema-qualified names: take the last segment after dot
let table_part = if let Some(dot_pos) = raw_name.rfind('.') {
&raw_name[dot_pos + 1..]
} else {
&raw_name
};

// Strip surrounding quotes
let name = strip_quotes(table_part);
if name.is_empty() {
return None;
}

Some(name.to_lowercase())
}

/// Read the leading name token from `s` and return it verbatim.
///
/// The token is the longest prefix of `s` that ends before the first unquoted
/// ASCII whitespace character, `(` or `;`. Sections delimited by `"`, `` ` `` or
/// `'` are taken whole, so they may contain whitespace and the terminator
/// characters. Quotes and dots are kept in the returned token.
///
/// Returns `None` if the token would be empty or if a quoted section is never
/// closed.
fn extract_name_token(s: &str) -> Option<String> {
    // Byte offset just past the token; the whole input unless a terminator is hit.
    let mut end = s.len();
    let mut cursor = s.char_indices();

    while let Some((idx, ch)) = cursor.next() {
        match ch {
            // Opening quote: consume up to and including the matching close quote.
            // Running out of input first means the quote is unclosed.
            '"' | '`' | '\'' => {
                cursor.find(|&(_, c)| c == ch)?;
            }
            // An unquoted terminator ends the token just before it.
            c if c.is_ascii_whitespace() || c == '(' || c == ';' => {
                end = idx;
                break;
            }
            _ => {}
        }
    }

    let token = &s[..end];
    if token.is_empty() {
        None
    } else {
        Some(token.to_owned())
    }
}

/// Remove a single pair of matching surrounding quotes from `s`.
///
/// The pair must be the same character at both ends: `"`, `` ` `` or `'`. Only
/// the outermost layer is removed. Input that is not wrapped in such a pair
/// (including a lone quote character) is returned unchanged.
fn strip_quotes(s: &str) -> &str {
    ['"', '`', '\'']
        .iter()
        // `strip_suffix` runs on what is left after the prefix is removed, so a
        // one-character string can never count as both opening and closing quote.
        .find_map(|&quote| s.strip_prefix(quote)?.strip_suffix(quote))
        .unwrap_or(s)
}

#[cfg(test)]
mod tests {
    use super::*;

    // Each test feeds one statement shape to `extract_ddl_table_name` and checks
    // the extracted name, which is always reported lowercased.

    /// Plain `CREATE TABLE` followed by a column list.
    #[test]
    fn test_simple_create() {
        let name = extract_ddl_table_name("CREATE TABLE users (id INTEGER)");
        assert_eq!(name.as_deref(), Some("users"));
    }

    /// The `IF NOT EXISTS` clause is skipped.
    #[test]
    fn test_create_if_not_exists() {
        let name = extract_ddl_table_name("CREATE TABLE IF NOT EXISTS users (id INTEGER)");
        assert_eq!(name.as_deref(), Some("users"));
    }

    /// `TEMP` between `CREATE` and `TABLE` is tolerated.
    #[test]
    fn test_create_temp_table() {
        let name = extract_ddl_table_name("CREATE TEMP TABLE staging (id INTEGER)");
        assert_eq!(name.as_deref(), Some("staging"));
    }

    /// Plain `DROP TABLE`.
    #[test]
    fn test_drop_table() {
        let name = extract_ddl_table_name("DROP TABLE users");
        assert_eq!(name.as_deref(), Some("users"));
    }

    /// The `IF EXISTS` clause is skipped.
    #[test]
    fn test_drop_table_if_exists() {
        let name = extract_ddl_table_name("DROP TABLE IF EXISTS users");
        assert_eq!(name.as_deref(), Some("users"));
    }

    /// `ALTER TABLE` with trailing clauses after the name.
    #[test]
    fn test_alter_table() {
        let name = extract_ddl_table_name("ALTER TABLE users ADD COLUMN email TEXT");
        assert_eq!(name.as_deref(), Some("users"));
    }

    /// Double-quoted identifiers are unquoted and lowercased.
    #[test]
    fn test_quoted_double() {
        let name = extract_ddl_table_name(r#"CREATE TABLE "MyTable" (id INTEGER)"#);
        assert_eq!(name.as_deref(), Some("mytable"));
    }

    /// Backtick-quoted identifiers are unquoted and lowercased.
    #[test]
    fn test_quoted_backtick() {
        let name = extract_ddl_table_name("CREATE TABLE `MyTable` (id INTEGER)");
        assert_eq!(name.as_deref(), Some("mytable"));
    }

    /// Whitespace inside a quoted identifier belongs to the name.
    #[test]
    fn test_quoted_with_spaces() {
        let name = extract_ddl_table_name(r#"CREATE TABLE "my table" (id INTEGER)"#);
        assert_eq!(name.as_deref(), Some("my table"));
    }

    /// Only the segment after the schema qualifier is returned.
    #[test]
    fn test_schema_qualified() {
        let name = extract_ddl_table_name("CREATE TABLE myschema.mytable (id INTEGER)");
        assert_eq!(name.as_deref(), Some("mytable"));
    }

    /// Schema qualification combined with quoting on both segments.
    #[test]
    fn test_schema_qualified_quoted() {
        let name = extract_ddl_table_name(r#"CREATE TABLE "myschema"."MyTable" (id INTEGER)"#);
        assert_eq!(name.as_deref(), Some("mytable"));
    }

    /// Keywords are recognised regardless of case.
    #[test]
    fn test_case_insensitive_keywords() {
        let name = extract_ddl_table_name("create table Users (id INTEGER)");
        assert_eq!(name.as_deref(), Some("users"));
    }

    /// Whitespace before the statement is ignored.
    #[test]
    fn test_leading_whitespace() {
        let name = extract_ddl_table_name("  CREATE TABLE users (id INTEGER)");
        assert_eq!(name.as_deref(), Some("users"));
    }

    /// Statements that are not CREATE/DROP/ALTER yield no name.
    #[test]
    fn test_no_table_keyword() {
        assert!(extract_ddl_table_name("SELECT * FROM users").is_none());
    }

    /// Empty input yields no name.
    #[test]
    fn test_empty_input() {
        assert!(extract_ddl_table_name("").is_none());
    }

    /// A `;` directly after the name terminates it.
    #[test]
    fn test_name_followed_by_semicolon() {
        let name = extract_ddl_table_name("DROP TABLE users;");
        assert_eq!(name.as_deref(), Some("users"));
    }

    /// The name may be the very last thing in the statement.
    #[test]
    fn test_name_with_no_trailing() {
        let name = extract_ddl_table_name("DROP TABLE users");
        assert_eq!(name.as_deref(), Some("users"));
    }
}

#[async_trait]
Expand Down
Loading