From ec0f29afa935f900b295ed26e100489bb4e413ec Mon Sep 17 00:00:00 2001 From: Cody Date: Mon, 6 Apr 2026 00:42:19 -0400 Subject: [PATCH] fix(catalog): harden extract_ddl_table_name for quoted and schema-qualified names MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The function now works on original-case SQL (not uppercased), handles quoted identifiers containing spaces, and extracts the table segment from schema-qualified names (schema.table → table). Adds 17 unit tests covering edge cases. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/dkdc-db-core/src/catalog.rs | 291 ++++++++++++++++++++++++++--- 1 file changed, 261 insertions(+), 30 deletions(-) diff --git a/crates/dkdc-db-core/src/catalog.rs b/crates/dkdc-db-core/src/catalog.rs index 6a3c7b1..166b66c 100644 --- a/crates/dkdc-db-core/src/catalog.rs +++ b/crates/dkdc-db-core/src/catalog.rs @@ -90,7 +90,7 @@ impl SqliteSchemaProvider { if upper.starts_with("CREATE TABLE") || upper.starts_with("CREATE TEMP TABLE") { // Extract table name and add just that table - if let Some(name) = extract_ddl_table_name(&upper) { + if let Some(name) = extract_ddl_table_name(sql) { let conn = self.db.connect()?; if let Ok(columns) = schema::introspect_table(&conn, &name).await { let arrow_schema = schema::build_arrow_schema(&columns); @@ -104,12 +104,12 @@ impl SqliteSchemaProvider { } } } else if upper.starts_with("DROP TABLE") { - if let Some(name) = extract_ddl_table_name(&upper) { + if let Some(name) = extract_ddl_table_name(sql) { self.tables.write().await.remove(&name); return Ok(()); } } else if upper.starts_with("ALTER TABLE") { - if let Some(name) = extract_ddl_table_name(&upper) { + if let Some(name) = extract_ddl_table_name(sql) { let conn = self.db.connect()?; if let Ok(columns) = schema::introspect_table(&conn, &name).await { let arrow_schema = schema::build_arrow_schema(&columns); @@ -135,41 +135,272 @@ impl fmt::Debug for SqliteSchemaProvider { } } -/// Extract the table name from a DDL statement (uppercased). +/// Extract the table name from a DDL statement (original case). /// Handles: CREATE [TEMP] TABLE [IF NOT EXISTS] name, DROP TABLE [IF EXISTS] name, -/// ALTER TABLE name. -fn extract_ddl_table_name(upper: &str) -> Option { - let tokens: Vec<&str> = upper.split_whitespace().collect(); - let name_idx = if tokens.first() == Some(&"CREATE") { - // CREATE [TEMP] TABLE [IF NOT EXISTS] name - let table_pos = tokens.iter().position(|t| *t == "TABLE")?; - let mut idx = table_pos + 1; - // Skip IF NOT EXISTS - if tokens.get(idx) == Some(&"IF") { - idx += 3; // skip IF NOT EXISTS +/// ALTER TABLE name. Supports quoted identifiers (double quotes, backticks, single +/// quotes) including those containing spaces, and schema-qualified names (extracts +/// the last segment after dot). +fn extract_ddl_table_name(sql: &str) -> Option { + let sql = sql.trim_start(); + + // Find the keyword position after TABLE where the name begins. + // We do case-insensitive matching on keywords but preserve the name's original case. + let upper = sql.to_uppercase(); + let tokens_upper: Vec<&str> = upper.split_whitespace().collect(); + + let skip_keywords = if tokens_upper.first() == Some(&"CREATE") { + let table_pos = tokens_upper.iter().position(|t| *t == "TABLE")?; + let mut skip = table_pos + 1; + if tokens_upper.get(skip) == Some(&"IF") { + skip += 3; // IF NOT EXISTS } - idx - } else if tokens.first() == Some(&"DROP") { - // DROP TABLE [IF EXISTS] name - let table_pos = tokens.iter().position(|t| *t == "TABLE")?; - let mut idx = table_pos + 1; - if tokens.get(idx) == Some(&"IF") { - idx += 2; // skip IF EXISTS + skip + } else if tokens_upper.first() == Some(&"DROP") { + let table_pos = tokens_upper.iter().position(|t| *t == "TABLE")?; + let mut skip = table_pos + 1; + if tokens_upper.get(skip) == Some(&"IF") { + skip += 2; // IF EXISTS } - idx - } else if tokens.first() == Some(&"ALTER") { - // ALTER TABLE name - let table_pos = tokens.iter().position(|t| *t == "TABLE")?; + skip + } else if tokens_upper.first() == Some(&"ALTER") { + let table_pos = tokens_upper.iter().position(|t| *t == "TABLE")?; table_pos + 1 } else { return None; }; - tokens.get(name_idx).map(|name| { - // Remove any surrounding quotes or backticks, and convert to lowercase - name.trim_matches(|c| c == '"' || c == '`' || c == '\'') - .to_lowercase() - }) + // Now walk the original SQL to skip that many whitespace-delimited tokens, + // then extract the name (which may be quoted and contain spaces). + let mut pos = 0; + let bytes = sql.as_bytes(); + let len = bytes.len(); + + for _ in 0..skip_keywords { + // Skip whitespace + while pos < len && bytes[pos].is_ascii_whitespace() { + pos += 1; + } + // Skip token + while pos < len && !bytes[pos].is_ascii_whitespace() { + pos += 1; + } + } + + // Skip whitespace before the name + while pos < len && bytes[pos].is_ascii_whitespace() { + pos += 1; + } + + if pos >= len { + return None; + } + + // Extract the name token. If it starts with a quote, read until the matching + // closing quote (handling dot-separated quoted segments). + let raw_name = extract_name_token(&sql[pos..])?; + + // Handle schema-qualified names: take the last segment after dot + let table_part = if let Some(dot_pos) = raw_name.rfind('.') { + &raw_name[dot_pos + 1..] + } else { + &raw_name + }; + + // Strip surrounding quotes + let name = strip_quotes(table_part); + if name.is_empty() { + return None; + } + + Some(name.to_lowercase()) +} + +/// Extract a possibly-quoted, possibly-schema-qualified name token from the +/// start of `s`. Returns the raw token including any quotes and dots. +fn extract_name_token(s: &str) -> Option { + let mut result = String::new(); + let mut chars = s.chars().peekable(); + + loop { + let Some(&ch) = chars.peek() else { + break; // end of input + }; + + if ch == '"' || ch == '`' || ch == '\'' { + // Quoted segment: read until matching close quote + let quote = ch; + result.push(chars.next().unwrap()); + loop { + let c = chars.next()?; // unclosed quote → None + result.push(c); + if c == quote { + break; + } + } + } else if ch.is_ascii_whitespace() || ch == '(' || ch == ';' { + // End of name token + break; + } else { + result.push(chars.next().unwrap()); + } + } + + if result.is_empty() { + None + } else { + Some(result) + } +} + +/// Strip one layer of surrounding quotes (double quotes, backticks, or single quotes). +fn strip_quotes(s: &str) -> &str { + let bytes = s.as_bytes(); + if bytes.len() >= 2 { + let first = bytes[0]; + let last = bytes[bytes.len() - 1]; + if (first == b'"' && last == b'"') + || (first == b'`' && last == b'`') + || (first == b'\'' && last == b'\'') + { + return &s[1..s.len() - 1]; + } + } + s +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_simple_create() { + assert_eq!( + extract_ddl_table_name("CREATE TABLE users (id INTEGER)"), + Some("users".to_string()) + ); + } + + #[test] + fn test_create_if_not_exists() { + assert_eq!( + extract_ddl_table_name("CREATE TABLE IF NOT EXISTS users (id INTEGER)"), + Some("users".to_string()) + ); + } + + #[test] + fn test_create_temp_table() { + assert_eq!( + extract_ddl_table_name("CREATE TEMP TABLE staging (id INTEGER)"), + Some("staging".to_string()) + ); + } + + #[test] + fn test_drop_table() { + assert_eq!( + extract_ddl_table_name("DROP TABLE users"), + Some("users".to_string()) + ); + } + + #[test] + fn test_drop_table_if_exists() { + assert_eq!( + extract_ddl_table_name("DROP TABLE IF EXISTS users"), + Some("users".to_string()) + ); + } + + #[test] + fn test_alter_table() { + assert_eq!( + extract_ddl_table_name("ALTER TABLE users ADD COLUMN email TEXT"), + Some("users".to_string()) + ); + } + + #[test] + fn test_quoted_double() { + assert_eq!( + extract_ddl_table_name(r#"CREATE TABLE "MyTable" (id INTEGER)"#), + Some("mytable".to_string()) + ); + } + + #[test] + fn test_quoted_backtick() { + assert_eq!( + extract_ddl_table_name("CREATE TABLE `MyTable` (id INTEGER)"), + Some("mytable".to_string()) + ); + } + + #[test] + fn test_quoted_with_spaces() { + assert_eq!( + extract_ddl_table_name(r#"CREATE TABLE "my table" (id INTEGER)"#), + Some("my table".to_string()) + ); + } + + #[test] + fn test_schema_qualified() { + assert_eq!( + extract_ddl_table_name("CREATE TABLE myschema.mytable (id INTEGER)"), + Some("mytable".to_string()) + ); + } + + #[test] + fn test_schema_qualified_quoted() { + assert_eq!( + extract_ddl_table_name(r#"CREATE TABLE "myschema"."MyTable" (id INTEGER)"#), + Some("mytable".to_string()) + ); + } + + #[test] + fn test_case_insensitive_keywords() { + assert_eq!( + extract_ddl_table_name("create table Users (id INTEGER)"), + Some("users".to_string()) + ); + } + + #[test] + fn test_leading_whitespace() { + assert_eq!( + extract_ddl_table_name(" CREATE TABLE users (id INTEGER)"), + Some("users".to_string()) + ); + } + + #[test] + fn test_no_table_keyword() { + assert_eq!(extract_ddl_table_name("SELECT * FROM users"), None); + } + + #[test] + fn test_empty_input() { + assert_eq!(extract_ddl_table_name(""), None); + } + + #[test] + fn test_name_followed_by_semicolon() { + assert_eq!( + extract_ddl_table_name("DROP TABLE users;"), + Some("users".to_string()) + ); + } + + #[test] + fn test_name_with_no_trailing() { + assert_eq!( + extract_ddl_table_name("DROP TABLE users"), + Some("users".to_string()) + ); + } } #[async_trait]