Skip to content

Commit 4622073

Browse files
committed
Bootstrap metadata from append deltas
Why these changes are being introduced: For a brand new dataset there is a possible bootstrapping issue after the first write. When attempting the first read, it formerly would fail because a metadata.db file did not yet exist. It would correctly log that it should be rebuilt, but that's not strictly necessary. With the append deltas written, we can project over the append deltas themselves for the metadata layer, just like we do in between metadata rebuilds or append delta merges. How this addresses that need: If no metadata.db is found, but there exist append deltas for a TIMDEXDataSource, proceed with metadata table creation, thereby skipping the need for an immediate metadata rebuild. Side effects of this change: * TDA is able to write and read from a brand new dataset location without a metadata rebuild. Relevant ticket(s): * https://mitlibraries.atlassian.net/browse/USE-496
1 parent 8382797 commit 4622073

3 files changed

Lines changed: 336 additions & 254 deletions

File tree

tests/test_metadata.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -740,6 +740,45 @@ def test_tdm_keyset_paginated_query_on_prejoined_embeddings_view(tmp_path):
740740
assert set(result_df.columns) == expected_cols
741741

742742

743+
def test_tdm_records_bootstrap_from_append_deltas_without_static_db(tmp_path):
744+
record_count = 20
745+
td = TIMDEXDataset(str(tmp_path / "records_append_deltas_bootstrap"))
746+
747+
td.records.write(
748+
generate_sample_records(
749+
num_records=record_count,
750+
source="alma",
751+
run_date="2025-03-01",
752+
run_type="full",
753+
run_id="records-bootstrap-run",
754+
)
755+
)
756+
757+
assert td.metadata.database_exists() is False
758+
assert len(td.records.read_dataframe()) == record_count
759+
assert len(td.records.read_dataframe(table="current_records")) == record_count
760+
761+
762+
def test_tdm_embeddings_bootstrap_from_append_deltas_without_static_db(tmp_path):
763+
record_count = 20
764+
td = TIMDEXDataset(str(tmp_path / "embeddings_append_deltas_bootstrap"))
765+
766+
td.records.write(
767+
generate_sample_records(
768+
num_records=record_count,
769+
source="alma",
770+
run_date="2025-03-02",
771+
run_type="full",
772+
run_id="emb-delta-run",
773+
)
774+
)
775+
td.embeddings.write(generate_sample_embeddings_for_run(td, run_id="emb-delta-run"))
776+
777+
assert td.metadata.database_exists() is False
778+
assert len(td.embeddings.read_dataframe()) == record_count
779+
assert len(td.embeddings.read_dataframe(table="current_embeddings")) == record_count
780+
781+
743782
def test_tdm_embeddings_write_append_deltas_without_static_embeddings_table(tmp_path):
744783
record_count = 20
745784
td = TIMDEXDataset(str(tmp_path / "embeddings_append_deltas_only"))

timdex_dataset_api/metadata.py

Lines changed: 56 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from duckdb import BinderException as DuckDBBinderException
1111
from duckdb import CatalogException as DuckDBCatalogException
1212
from duckdb import DuckDBPyConnection
13+
from duckdb import HTTPException as DuckDBHTTPException
1314
from duckdb import IOException as DuckDBIOException
1415
from duckdb_engine import Dialect as DuckDBDialect
1516
from sqlalchemy import func, literal, select, text, tuple_
@@ -256,18 +257,35 @@ def _setup_metadata_schema(self) -> None:
256257
"""Set up metadata schema views in the DuckDB connection.
257258
258259
Creates views for accessing static metadata DB and append deltas.
259-
If static DB doesn't exist, logs warning but doesn't fail.
260+
If the static DB does not exist yet, bootstrap metadata views from append
261+
deltas when available.
260262
"""
261263
start_time = time.perf_counter()
262264

263-
if not self.database_exists():
264-
logger.warning(
265-
f"Static metadata database not found @ '{self.metadata_database_path}'. "
266-
"Consider rebuild via TIMDEXDataset.metadata.rebuild_dataset_metadata()."
267-
)
268-
return
269-
270-
self._attach_database_file(self.timdex_dataset.conn)
265+
if self.database_exists():
266+
self._attach_database_file(self.timdex_dataset.conn)
267+
else:
268+
bootstrap_sources = [
269+
source_class.NAME
270+
for source_class in self.source_classes
271+
if self._append_delta_count(self.timdex_dataset.conn, source_class) > 0
272+
]
273+
if bootstrap_sources:
274+
logger.warning(
275+
"Static metadata database not found @ "
276+
f"'{self.metadata_database_path}'. "
277+
"Bootstrapping metadata views from append deltas for: "
278+
f"{', '.join(bootstrap_sources)}. "
279+
"Consider rebuild via "
280+
"TIMDEXDataset.metadata.rebuild_dataset_metadata()."
281+
)
282+
else:
283+
logger.warning(
284+
"Static metadata database not found @ "
285+
f"'{self.metadata_database_path}'. "
286+
"Consider rebuild via "
287+
"TIMDEXDataset.metadata.rebuild_dataset_metadata()."
288+
)
271289

272290
for source_class in self.source_classes:
273291
self._create_append_deltas_view(self.timdex_dataset.conn, source_class)
@@ -313,10 +331,7 @@ def _create_append_deltas_view(
313331
logger.debug(f"creating view metadata.{view_name}")
314332

315333
# get current append delta count
316-
append_delta_count = conn.execute(f"""
317-
select count(*) as file_count
318-
from glob('{deltas_path}/*.parquet')
319-
""").fetchone()[0] # type: ignore[index]
334+
append_delta_count = self._append_delta_count(conn, source_class)
320335
logger.debug(
321336
f"{append_delta_count} append deltas found for '{source_class.NAME}'"
322337
)
@@ -416,6 +431,13 @@ def _create_union_view(
416431
return
417432

418433
if source_class.PREJOIN_RECORDS:
434+
if not self._metadata_table_exists(conn, "records"):
435+
logger.warning(
436+
f"Skipping metadata.{view_name} view creation because missing "
437+
"dependency: records"
438+
)
439+
return
440+
419441
prejoin_cols = ",".join(f"r.{c}" for c in self.PREJOIN_RECORDS_COLUMNS)
420442
join_keys = "timdex_record_id, run_id, run_record_offset"
421443
conn.execute(f"""
@@ -502,6 +524,27 @@ def _should_preload_table(self, table_config: "DataSourceTableConfig") -> bool:
502524
getattr(self.timdex_dataset, table_config.preload_setting_attribute, False)
503525
)
504526

527+
def _append_delta_count(
528+
self, conn: DuckDBPyConnection, source_class: type["TIMDEXDataSource"]
529+
) -> int:
530+
"""Return append delta parquet file count for a single data source."""
531+
deltas_glob = f"{self.append_deltas_path_for(source_class)}/*.parquet"
532+
533+
try:
534+
return cast(
535+
"int",
536+
conn.execute(f"""
537+
select count(*) as file_count
538+
from glob('{deltas_glob}')
539+
""").fetchone()[0], # type: ignore[index]
540+
)
541+
except (DuckDBHTTPException, DuckDBIOException):
542+
logger.debug(
543+
"Could not inspect append deltas for "
544+
f"'{source_class.NAME}' at '{deltas_glob}'; assuming none exist."
545+
)
546+
return 0
547+
505548
def _metadata_table_exists(self, conn: DuckDBPyConnection, table_name: str) -> bool:
506549
"""Return True if a metadata schema table or view exists by name."""
507550
table_exists = conn.execute(f"""

0 commit comments

Comments
 (0)