Dump/restore for subgraphs #6397

Open

lutter wants to merge 20 commits into master from lutter/dump

Conversation

lutter (Collaborator) commented Feb 24, 2026

This PR implements `graphman dump` and `graphman restore` commands that do what they say on the tin.

Dumps are consistent since they are taken in a single transaction; that can have very bad effects on the overall system if a dump takes a very long time. Restore is a little nicer in that it splits the data import into multiple jobs.

There are lots of ways in which this could be improved, but I feel this is a useful starting point, at the very least for development and test systems. The help text for the dump and restore commands has scary warnings about not using them in production - and they shouldn't be, even though I think that at least for smaller subgraphs they might work ok.

Define the directory layout, metadata.json schema, and per-entity
Parquet file format for dumping and restoring subgraph data.

Add `OidValue::Int4Range(Bound<i32>, Bound<i32>)` variant to properly
deserialize PostgreSQL int4range columns (OID 3904). Update `select_cols`
in dsl.rs to bind block_range as `Range<Integer>` instead of using the
Bytes placeholder, resolving the existing TODO.
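
As a rough illustration of what decoding an int4range column involves (not the PR's actual code; the helper below and its return shape are invented for the example), the two `Bound<i32>` values can be collapsed into concrete lower/upper block numbers:

```rust
use std::ops::Bound;

// Hypothetical helper: turn the bounds PostgreSQL reports for an int4range
// column into a concrete lower block and an optional upper block. Postgres
// normalizes int4range to inclusive-lower/exclusive-upper, but the other
// Bound variants are handled defensively.
fn block_range_bounds(lower: Bound<i32>, upper: Bound<i32>) -> (i32, Option<i32>) {
    let lo = match lower {
        Bound::Included(n) => n,
        Bound::Excluded(n) => n + 1,
        Bound::Unbounded => 0,
    };
    let hi = match upper {
        Bound::Excluded(n) => Some(n),
        Bound::Included(n) => Some(n + 1),
        Bound::Unbounded => None, // open-ended: the entity version is still live
    };
    (lo, hi)
}
```
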
Add block$ (immutable), block_range (mutable), and causality_region
to the column lookup so callers that need these columns by name can
find them. Also add CAUSALITY_REGION_COL pseudo-column static and
tighten with_nsp visibility to pub(crate).

Create the foundation for Parquet-based dump/restore: a schema mapping
module that converts relational Table definitions to Arrow Schema objects.

- arrow_schema() maps Table -> Arrow Schema with system columns (vid,
  block tracking, causality_region) followed by data columns
- data_sources_arrow_schema() provides fixed schema for data_sources$ table
- All ColumnType variants mapped (TSVector skipped, Enum -> Utf8)
- List columns wrapped in Arrow List type based on Column.is_list()
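
A minimal sketch of the Table -> Arrow Schema direction described above, using the arrow crate directly. The `ColType` enum, the system column names, and the BigDecimal -> Utf8 choice are simplified stand-ins, not the PR's actual mapping:

```rust
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};

// Simplified stand-in for relational::ColumnType; the real mapping also
// handles Enum (-> Utf8) and skips TSVector columns entirely.
enum ColType { Boolean, Int, Int8, BigDecimal, Bytes, String, Timestamp }

fn to_arrow_type(ct: &ColType) -> DataType {
    match ct {
        ColType::Boolean => DataType::Boolean,
        ColType::Int => DataType::Int32,
        ColType::Int8 => DataType::Int64,
        // Arbitrary-precision decimals have no exact Arrow scalar; storing them
        // as strings is one plausible choice (an assumption, not the PR's).
        ColType::BigDecimal => DataType::Utf8,
        ColType::Bytes => DataType::Binary,
        ColType::String => DataType::Utf8,
        ColType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, None),
    }
}

fn arrow_schema(cols: &[(&str, ColType, bool /* is_list */)]) -> Schema {
    // System columns first (vid, block tracking, causality_region), then data
    // columns; the block column names here are assumed, not taken from the PR.
    let mut fields = vec![
        Field::new("vid", DataType::Int64, false),
        Field::new("block_range_lower", DataType::Int32, true),
        Field::new("block_range_upper", DataType::Int32, true),
        Field::new("causality_region", DataType::Int32, true),
    ];
    for (name, ct, is_list) in cols {
        let dt = to_arrow_type(ct);
        let dt = if *is_list {
            DataType::List(Arc::new(Field::new("item", dt, true)))
        } else {
            dt
        };
        fields.push(Field::new(*name, dt, true));
    }
    Schema::new(fields)
}
```
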
Implement rows_to_record_batch() which converts Vec<DynamicRow<OidValue>>
into Arrow RecordBatch objects. Uses a type-erased ColumnBuilder enum that
dispatches on Arrow DataType to create the appropriate array builder.

Supports all OidValue scalar variants (Bool, Int, Int8, Bytes, String,
BigDecimal, Timestamp) and all array variants (BoolArray, Ints, Int8Array,
BytesArray, StringArray, BigDecimalArray, TimestampArray). Block range
columns arrive as separate Int32 values from the dump query, keeping the
converter as a clean 1:1 mapping.
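
The type-erased builder pattern, in miniature. This sketch invents its own tiny `Value` enum and only covers a few builder kinds, so it illustrates the dispatch idea rather than the PR's `ColumnBuilder`:

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, Int64Builder, StringBuilder};
use arrow::datatypes::DataType;

// Illustrative scalar value, standing in for OidValue.
enum Value { Bool(bool), Int(i32), Int8(i64), Str(String), Null }

// One builder per Arrow DataType; a real version would also cover Binary,
// Timestamp, and List builders.
enum ColumnBuilder {
    Bool(BooleanBuilder),
    Int(Int32Builder),
    Int8(Int64Builder),
    Str(StringBuilder),
}

impl ColumnBuilder {
    fn for_type(dt: &DataType) -> Self {
        match dt {
            DataType::Boolean => ColumnBuilder::Bool(BooleanBuilder::new()),
            DataType::Int32 => ColumnBuilder::Int(Int32Builder::new()),
            DataType::Int64 => ColumnBuilder::Int8(Int64Builder::new()),
            DataType::Utf8 => ColumnBuilder::Str(StringBuilder::new()),
            other => unimplemented!("no builder for {other:?} in this sketch"),
        }
    }

    fn append(&mut self, v: &Value) {
        match (self, v) {
            (ColumnBuilder::Bool(b), Value::Bool(x)) => b.append_value(*x),
            (ColumnBuilder::Int(b), Value::Int(x)) => b.append_value(*x),
            (ColumnBuilder::Int8(b), Value::Int8(x)) => b.append_value(*x),
            (ColumnBuilder::Str(b), Value::Str(x)) => b.append_value(x),
            (ColumnBuilder::Bool(b), Value::Null) => b.append_null(),
            (ColumnBuilder::Int(b), Value::Null) => b.append_null(),
            (ColumnBuilder::Int8(b), Value::Null) => b.append_null(),
            (ColumnBuilder::Str(b), Value::Null) => b.append_null(),
            _ => panic!("value does not match column type"),
        }
    }

    fn finish(self) -> ArrayRef {
        match self {
            ColumnBuilder::Bool(mut b) => Arc::new(b.finish()),
            ColumnBuilder::Int(mut b) => Arc::new(b.finish()),
            ColumnBuilder::Int8(mut b) => Arc::new(b.finish()),
            ColumnBuilder::Str(mut b) => Arc::new(b.finish()),
        }
    }
}
```

The finished arrays line up positionally with the schema's fields and can be assembled with `RecordBatch::try_new(schema, arrays)`.
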
Add ParquetChunkWriter that wraps Arrow's ArrowWriter to stream
RecordBatches into a ZSTD-compressed Parquet file while tracking
row count and vid range. Returns ChunkInfo metadata on finish for
inclusion in metadata.json.
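
The parquet-crate pieces involved are `ArrowWriter` plus ZSTD writer properties. A stripped-down version that ignores the vid-range and ChunkInfo bookkeeping might look like this (names are illustrative):

```rust
use std::fs::File;
use std::sync::Arc;
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::file::properties::WriterProperties;

// Minimal chunk writer: streams RecordBatches into one ZSTD-compressed
// Parquet file and counts rows. The PR's writer additionally tracks the
// vid range and returns ChunkInfo for metadata.json.
struct ChunkWriter {
    writer: ArrowWriter<File>,
    rows: usize,
}

impl ChunkWriter {
    fn new(path: &str, schema: Arc<Schema>) -> anyhow::Result<Self> {
        let props = WriterProperties::builder()
            .set_compression(Compression::ZSTD(ZstdLevel::try_new(3)?))
            .build();
        let writer = ArrowWriter::try_new(File::create(path)?, schema, Some(props))?;
        Ok(ChunkWriter { writer, rows: 0 })
    }

    fn write(&mut self, batch: &RecordBatch) -> anyhow::Result<()> {
        self.rows += batch.num_rows();
        Ok(self.writer.write(batch)?)
    }

    fn finish(self) -> anyhow::Result<usize> {
        self.writer.close()?;
        Ok(self.rows)
    }
}
```
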
Wire dump queries and Parquet writing into Layout::dump(). This is the
core of the dump feature, covering both entity tables and the special
data_sources$ table.

For entity tables: build a DynamicSelectClause that selects vid, block
columns (split into lower/upper for mutable tables), causality_region,
and data columns. Use VidBatcher for adaptive batching, convert rows
to Arrow RecordBatch, and write via ParquetChunkWriter.

For data_sources$: use a concrete QueryableByName struct with raw SQL
(fixed schema, no DynamicSelectClause needed). Check table existence
via catalog::table_exists before attempting dump.

Metadata includes version, network, block pointers, entity count,
graft info, health, indexes, and per-table chunk tracking. Written
atomically via tmp+rename so its presence signals a complete dump.

Add entity_count() helper in detail.rs. Wire dump() through
DeploymentStore and SubgraphStore.
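
The "written atomically via tmp+rename" step is a standard pattern; here is a hedged sketch with a drastically reduced Metadata struct (the real one also carries block pointers, graft info, health, indexes, and the per-table chunk lists):

```rust
use std::fs;
use std::path::Path;
use serde::Serialize;

// Heavily reduced stand-in for the PR's Metadata struct.
#[derive(Serialize)]
struct Metadata {
    version: u32,
    network: String,
    entity_count: u64,
}

// Write metadata.json last, via a temp file plus rename, so that the mere
// presence of metadata.json signals a complete dump.
fn write_metadata(dir: &Path, meta: &Metadata) -> anyhow::Result<()> {
    let tmp = dir.join("metadata.json.tmp");
    let dest = dir.join("metadata.json");
    fs::write(&tmp, serde_json::to_vec_pretty(meta)?)?;
    fs::rename(&tmp, &dest)?;
    Ok(())
}
```
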
Wire the dump subcommand into graphman CLI. Takes a deployment search
argument and an output directory, resolves the deployment, and delegates
to SubgraphStore::dump().
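In other words, an invocation along the lines of `graphman dump some-deployment /var/tmp/dump-sgd42` (the exact argument syntax here is inferred from this description, not copied from the final CLI help).
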
Add `parquet/reader.rs` with `read_batches()` to read Parquet files
back into Arrow RecordBatches. This is the foundation for the restore
pipeline (step 1 of the restore plan).
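
Reading a chunk back is largely what the parquet crate's Arrow reader already provides; a sketch of what `read_batches()` presumably boils down to (the batch size is an arbitrary choice here):

```rust
use std::fs::File;
use std::path::Path;
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

// Read every RecordBatch from one Parquet chunk file.
fn read_batches(path: &Path) -> anyhow::Result<Vec<RecordBatch>> {
    let file = File::open(path)?;
    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
        .with_batch_size(8192)
        .build()?;
    // The reader yields Result<RecordBatch, ArrowError>; collect and propagate errors.
    let batches = reader.collect::<Result<Vec<_>, _>>()?;
    Ok(batches)
}
```
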
Add `RestoreRow`, `DataSourceRestoreRow` structs and conversion
functions `record_batch_to_restore_rows()` and
`record_batch_to_data_source_rows()` that convert Arrow RecordBatches
back into typed data suitable for database insertion. This is step 2
of the restore plan.
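
In outline, that conversion downcasts each Arrow column and walks the rows. The `Row` struct and column names below are invented for the example; the PR's `RestoreRow` carries the full set of system and data columns:

```rust
use arrow::array::{Array, Int64Array, StringArray};
use arrow::record_batch::RecordBatch;

// Illustrative row type, much smaller than the real RestoreRow.
struct Row {
    vid: i64,
    id: String,
}

// Convert one RecordBatch back into typed rows by downcasting its columns.
fn to_rows(batch: &RecordBatch) -> anyhow::Result<Vec<Row>> {
    let vids = batch
        .column_by_name("vid")
        .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
        .ok_or_else(|| anyhow::anyhow!("missing or mistyped vid column"))?;
    let ids = batch
        .column_by_name("id")
        .and_then(|c| c.as_any().downcast_ref::<StringArray>())
        .ok_or_else(|| anyhow::anyhow!("missing or mistyped id column"))?;
    Ok((0..batch.num_rows())
        .map(|i| Row { vid: vids.value(i), id: ids.value(i).to_string() })
        .collect())
}
```
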
Add a new constructor to InsertQuery that accepts RestoreRow data
from Parquet files, bypassing the WriteChunk/EntityWrite pipeline.
This is the insertion foundation for the restore path.

Also add From<i32> impl for CausalityRegion to allow constructing
values from deserialized Parquet data.

Add a method to read and validate metadata.json from a dump
directory. Make fields of Metadata, Manifest, BlockPtr, Health,
and Error structs pub(crate) so they are accessible from the
restore module.

Add import_data() to the restore module that reads Parquet chunks
and inserts entity data into PostgreSQL tables. Supports resumability
by checking max(vid) in each table and skipping already-imported rows.

Entity tables use InsertQuery::for_restore() for efficient batch
inserts. The data_sources$ table uses raw SQL with bind parameters
since it has a fixed schema outside the Layout.
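
The resumability rule is simply "skip everything at or below the table's current max(vid)"; roughly (the function below is an illustration, not the PR's code):

```rust
// Rows whose vid is at or below the table's current max(vid) were written by
// an earlier, interrupted run and can be skipped; `current_max_vid` would come
// from a `select max(vid) from <table>` query. Whole chunks whose recorded vid
// range falls below the maximum can be skipped without reading them at all.
fn rows_to_import<R>(rows: Vec<(i64, R)>, current_max_vid: Option<i64>) -> Vec<(i64, R)> {
    match current_max_vid {
        None => rows, // empty table: import everything
        Some(max) => rows.into_iter().filter(|(vid, _)| *vid > max).collect(),
    }
}
```
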
Wire up the restore pipeline through SubgraphStore::restore() and
DeploymentStore::restore(). Uses plan_restore() to determine whether to
create or replace the deployment site, validates the target shard exists,
resolves the subgraph name for deployment rule matching, and assigns the
restored deployment to a node.

Changes:
- DeploymentStore::restore() coordinates schema creation, data import,
  and finalization
- Inner::restore() handles conflict resolution, site allocation, and
  node assignment via deployment rules
- Expose create_site() and find_active_site() on primary::Connection
- Make create_site() accept an `active` parameter

Add `graphman restore` CLI with options:
  --directory  Path to dump directory
  --shard      Target shard (default: primary)
  --name       Subgraph name for deployment rule matching
  --replace    Drop and recreate if exists in target shard
  --add        Create copy in a different shard
  --force      Restore regardless of current state
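
For example, restoring a dump into a non-default shard as a fresh deployment might look like `graphman restore --directory /data/dumps/sgd42 --shard shard_a --name my/subgraph` (an invocation sketched from the option list above, not copied from the CLI help).
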
Wrap the dump's data-reading operations in a REPEATABLE READ READ ONLY
transaction to get a consistent MVCC snapshot. This prevents head block
vs entity data mismatches, cross-table inconsistency, and missing or
phantom rows caused by concurrent indexing or pruning.

Add a TransactionBuilder for PermittedConnection since diesel-async's
TransactionBuilder requires TransactionManager = AnsiTransactionManager,
which pool-wrapped connection types don't satisfy.
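
A hedged sketch of the snapshot bracketing, using raw SQL over a diesel-async connection instead of the PR's TransactionBuilder (the pooled connection types in graph-node differ from the plain `AsyncPgConnection` used here):

```rust
use diesel_async::{AsyncPgConnection, SimpleAsyncConnection};

// Take one consistent MVCC snapshot for the whole dump by bracketing all
// reads in a REPEATABLE READ READ ONLY transaction.
async fn dump_under_snapshot(conn: &mut AsyncPgConnection) -> anyhow::Result<()> {
    conn.batch_execute("begin transaction isolation level repeatable read read only")
        .await?;
    let result = run_dump_queries(conn).await;
    // Nothing is persisted by a read-only transaction; just end it either way.
    match &result {
        Ok(()) => conn.batch_execute("commit").await?,
        Err(_) => conn.batch_execute("rollback").await?,
    }
    result
}

// Placeholder for the per-table dump work described earlier in this PR.
async fn run_dump_queries(_conn: &mut AsyncPgConnection) -> anyhow::Result<()> {
    Ok(())
}
```
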