Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ proptest = "1.0.0"
regex = "1.5.6"
criterion = { version = "0.7.0", features = ["async_tokio"] }
ldk-node-062 = { package = "ldk-node", version = "=0.6.2" }
ldk-node-070 = { package = "ldk-node", version = "=0.7.0" }

[target.'cfg(not(no_download))'.dev-dependencies]
electrsd = { version = "0.36.1", default-features = false, features = ["legacy", "esplora_a33e97e1", "corepc-node_27_2"] }
Expand Down
263 changes: 206 additions & 57 deletions src/io/sqlite_store/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,74 +9,136 @@ use lightning::io;
use rusqlite::Connection;

pub(super) fn migrate_schema(
connection: &mut Connection, kv_table_name: &str, from_version: u16, to_version: u16,
connection: &mut Connection, kv_table_name: &str, mut from_version: u16, to_version: u16,
) -> io::Result<()> {
assert!(from_version < to_version);
if from_version == 1 && to_version == 2 {
let tx = connection.transaction().map_err(|e| {
let msg = format!(
"Failed to migrate table {} from user_version {} to {}: {}",
kv_table_name, from_version, to_version, e
);
io::Error::new(io::ErrorKind::Other, msg)
})?;

// Rename 'namespace' column to 'primary_namespace'
let sql = format!(
"ALTER TABLE {}
RENAME COLUMN namespace TO primary_namespace;",
kv_table_name
);

tx.execute(&sql, []).map_err(|e| {
let msg = format!(
"Failed to migrate table {} from user_version {} to {}: {}",
kv_table_name, from_version, to_version, e
);
io::Error::new(io::ErrorKind::Other, msg)
})?;

// Add new 'secondary_namespace' column
let sql = format!(
"ALTER TABLE {}
ADD secondary_namespace TEXT DEFAULT \"\" NOT NULL;",
kv_table_name
);

tx.execute(&sql, []).map_err(|e| {
let msg = format!(
"Failed to migrate table {} from user_version {} to {}: {}",
kv_table_name, from_version, to_version, e
);
if from_version == 1 && to_version >= 2 {
migrate_v1_to_v2(connection, kv_table_name)?;
from_version = 2;
}
if from_version == 2 && to_version >= 3 {
migrate_v2_to_v3(connection, kv_table_name)?;
}

Ok(())
}

fn migrate_v1_to_v2(connection: &mut Connection, kv_table_name: &str) -> io::Result<()> {
let tx = connection.transaction().map_err(|e| {
let msg = format!("Failed to migrate table {} from v1 to v2: {}", kv_table_name, e);
io::Error::new(io::ErrorKind::Other, msg)
})?;

// Rename 'namespace' column to 'primary_namespace'
let sql = format!(
"ALTER TABLE {}
RENAME COLUMN namespace TO primary_namespace;",
kv_table_name
);

tx.execute(&sql, []).map_err(|e| {
let msg = format!("Failed to migrate table {} from v1 to v2: {}", kv_table_name, e);
io::Error::new(io::ErrorKind::Other, msg)
})?;

// Add new 'secondary_namespace' column
let sql = format!(
"ALTER TABLE {}
ADD secondary_namespace TEXT DEFAULT \"\" NOT NULL;",
kv_table_name
);

tx.execute(&sql, []).map_err(|e| {
let msg = format!("Failed to migrate table {} from v1 to v2: {}", kv_table_name, e);
io::Error::new(io::ErrorKind::Other, msg)
})?;

// Update user_version
tx.pragma(Some(rusqlite::DatabaseName::Main), "user_version", 2u16, |_| Ok(())).map_err(
|e| {
let msg = format!("Failed to upgrade user_version from 1 to 2: {}", e);
io::Error::new(io::ErrorKind::Other, msg)
})?;

// Update user_version
tx.pragma(Some(rusqlite::DatabaseName::Main), "user_version", to_version, |_| Ok(()))
.map_err(|e| {
let msg = format!(
"Failed to upgrade user_version from {} to {}: {}",
from_version, to_version, e
);
io::Error::new(io::ErrorKind::Other, msg)
})?;
},
)?;

tx.commit().map_err(|e| {
let msg = format!(
"Failed to migrate table {} from user_version {} to {}: {}",
kv_table_name, from_version, to_version, e
);
tx.commit().map_err(|e| {
let msg = format!("Failed to migrate table {} from v1 to v2: {}", kv_table_name, e);
io::Error::new(io::ErrorKind::Other, msg)
})?;

Ok(())
}

fn migrate_v2_to_v3(connection: &mut Connection, kv_table_name: &str) -> io::Result<()> {
let tx = connection.transaction().map_err(|e| {
let msg = format!("Failed to migrate table {} from v2 to v3: {}", kv_table_name, e);
io::Error::new(io::ErrorKind::Other, msg)
})?;

let old_table = format!("{}_v2_old", kv_table_name);
let map_err = |e: rusqlite::Error| -> io::Error {
let msg = format!("Failed to migrate table {} from v2 to v3: {}", kv_table_name, e);
io::Error::new(io::ErrorKind::Other, msg)
};

// Recreate the table to ensure the correct PRIMARY KEY regardless of migration history.
// Tables migrated from v1 have PK (primary_namespace, key) only — missing
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh, I'm confused, why is this true? We explictly have

	// Add new 'secondary_namespace' column
	let sql = format!(
		"ALTER TABLE {}
			ADD secondary_namespace TEXT DEFAULT \"\" NOT NULL;",
		kv_table_name
	);

in migrate_v1_to_v2? Am I missing something, or is this Claude hallucinating?

Copy link
Contributor Author

@benthecarman benthecarman Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In v1 to v2 is added but its not a part of the primary key so we can't use ON CONFLICT on writes for just updating the value. Before we could use replace but now we want to keep the created_at date. This rewrites the table to make it a part of the primary key, otherwise we need to do a query inside to look up the created_at date when replacing. Could add a unique index instead but this felt cleaner

// secondary_namespace. Recreating normalizes the schema for all databases.
let rename_sql = format!("ALTER TABLE {} RENAME TO {}", kv_table_name, old_table);
tx.execute(&rename_sql, []).map_err(map_err)?;

let create_table_sql = format!(
"CREATE TABLE {} (
primary_namespace TEXT NOT NULL,
secondary_namespace TEXT DEFAULT \"\" NOT NULL,
key TEXT NOT NULL CHECK (key <> ''),
value BLOB,
created_at INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (primary_namespace, secondary_namespace, key)
)",
kv_table_name
);
tx.execute(&create_table_sql, []).map_err(map_err)?;

// Copy data and backfill created_at from ROWID for relative ordering
let set_created_at_sql = format!(
"INSERT INTO {} (primary_namespace, secondary_namespace, key, value, created_at)
SELECT primary_namespace, secondary_namespace, key, value, ROWID FROM {}",
kv_table_name, old_table
);
tx.execute(&set_created_at_sql, []).map_err(map_err)?;

let drop_old_sql = format!("DROP TABLE {}", old_table);
tx.execute(&drop_old_sql, []).map_err(map_err)?;

// Create composite index for paginated listing
let sql = format!(
"CREATE INDEX idx_{}_paginated ON {} (primary_namespace, secondary_namespace, created_at DESC, key ASC)",
kv_table_name, kv_table_name
);
tx.execute(&sql, []).map_err(map_err)?;

// Update user_version
tx.pragma(Some(rusqlite::DatabaseName::Main), "user_version", 3u16, |_| Ok(())).map_err(
|e| {
let msg = format!("Failed to upgrade user_version from 2 to 3: {}", e);
io::Error::new(io::ErrorKind::Other, msg)
})?;
}
},
)?;

tx.commit().map_err(|e| {
let msg = format!("Failed to migrate table {} from v2 to v3: {}", kv_table_name, e);
io::Error::new(io::ErrorKind::Other, msg)
})?;

Ok(())
}

#[cfg(test)]
mod tests {
use std::fs;

use lightning::util::persist::KVStoreSync;
use lightning::util::persist::{KVStoreSync, PaginatedKVStoreSync};
use rusqlite::{named_params, Connection};

use crate::io::sqlite_store::SqliteStore;
Expand Down Expand Up @@ -128,7 +190,7 @@ mod tests {
let sql = format!(
"INSERT OR REPLACE INTO {} (namespace, key, value) VALUES (:namespace, :key, :value);",
kv_table_name
);
);
let mut stmt = connection.prepare_cached(&sql).unwrap();

stmt.execute(named_params! {
Expand Down Expand Up @@ -166,4 +228,91 @@ mod tests {
// Check we can continue to use the store just fine.
do_read_write_remove_list_persist(&store);
}

#[test]
fn rwrl_post_schema_2_migration() {
let old_schema_version = 2u16;

let mut temp_path = random_storage_path();
temp_path.push("rwrl_post_schema_2_migration");

let db_file_name = "test_db".to_string();
let kv_table_name = "test_table".to_string();

let test_ns = "testspace";
let test_sub = "testsub";

{
// Create a v2 database manually
fs::create_dir_all(temp_path.clone()).unwrap();
let mut db_file_path = temp_path.clone();
db_file_path.push(db_file_name.clone());

let connection = Connection::open(db_file_path.clone()).unwrap();

connection
.pragma(
Some(rusqlite::DatabaseName::Main),
"user_version",
old_schema_version,
|_| Ok(()),
)
.unwrap();

let sql = format!(
"CREATE TABLE IF NOT EXISTS {} (
primary_namespace TEXT NOT NULL,
secondary_namespace TEXT DEFAULT \"\" NOT NULL,
key TEXT NOT NULL CHECK (key <> ''),
value BLOB, PRIMARY KEY ( primary_namespace, secondary_namespace, key )
);",
kv_table_name
);
connection.execute(&sql, []).unwrap();

// Insert 3 rows in a known order
for i in 0..3 {
let key = format!("key_{}", i);
let sql = format!(
"INSERT INTO {} (primary_namespace, secondary_namespace, key, value) VALUES (:ns, :sub, :key, :value);",
kv_table_name
);
let mut stmt = connection.prepare_cached(&sql).unwrap();
stmt.execute(named_params! {
":ns": test_ns,
":sub": test_sub,
":key": key,
":value": vec![i as u8; 8],
})
.unwrap();
}
}

// Open with new code, triggering v2→v3 migration
let store = SqliteStore::new(temp_path, Some(db_file_name), Some(kv_table_name)).unwrap();

// Verify data survived
for i in 0..3 {
let key = format!("key_{}", i);
let data = store.read(test_ns, test_sub, &key).unwrap();
assert_eq!(data, vec![i as u8; 8]);
}

// Verify paginated listing works and returns entries in ROWID-backfilled order (newest first)
let response =
PaginatedKVStoreSync::list_paginated(&store, test_ns, test_sub, None).unwrap();
assert_eq!(response.keys.len(), 3);
// ROWIDs were 1, 2, 3 so created_at was backfilled as 1, 2, 3
// Newest first: key_2, key_1, key_0
assert_eq!(response.keys, vec!["key_2", "key_1", "key_0"]);

// Verify we can write new entries and they get proper ordering
KVStoreSync::write(&store, test_ns, test_sub, "key_new", vec![99u8; 8]).unwrap();
let response =
PaginatedKVStoreSync::list_paginated(&store, test_ns, test_sub, None).unwrap();
assert_eq!(response.keys[0], "key_new");

// Check we can continue to use the store just fine.
do_read_write_remove_list_persist(&store);
}
}
Loading
Loading