Skip to content

feature(internal, cmd): protocol-migrate history#546

Open
aristidesstaffieri wants to merge 46 commits intofeature/data-migrationsfrom
feature/protocol-migrate-history
Open

feature(internal, cmd): protocol-migrate history#546
aristidesstaffieri wants to merge 46 commits intofeature/data-migrationsfrom
feature/protocol-migrate-history

Conversation

@aristidesstaffieri
Copy link
Copy Markdown
Contributor

@aristidesstaffieri aristidesstaffieri commented Mar 20, 2026

Closes #515

What

Implements the protocol-migrate history CLI subcommand, backfills protocol history for historical ledgers within the retention window, synchronizing with live ingestion through a shared per-protocol history CAS cursor.

  • ProtocolMigrateHistoryService — walks forward from the oldest ingest cursor to the latest cursor, calling PersistHistory at each ledger with batch processing and CAS-based progress tracking. On CAS failure, detects handoff from live ingestion and exits cleanly. Tracks history_migration_status lifecycle (in_progress → success/failed).
  • protocol-data-migrate CLI command (cmd/protocol_data_migrate.go) — accepts --protocol-id flags to select which protocols to backfill.
  • Shared helpers extracted to ingest_helpers.go — getLedgerWithRetry, buildProtocolProcessorMap, and cursor name formatters deduplicate logic between history migration and live ingestion.
  • UpdateHistoryMigrationStatus added to ProtocolsModel for migration lifecycle tracking.
  • Configurable cursor names — history migration respects --latest-ledger-cursor-name / --oldest-ledger-cursor-name overrides, matching live ingestion behavior.
  • Convergence poll — distinguishes poll-deadline-exceeded (converged) from transient RPC errors (retry) to avoid premature success marking during network blips.
  • Bulk StatusFailed — excludes protocols already handed off to live ingestion via CAS failure, preventing re-processing conflicts.
  • Integration tests covering history-to-live handoff, asymmetric cursor CAS paths (history ready but current_state lags), and BoundedRange→UnboundedRange transitions on shared ledger backends.

Why

Protocol state production needs historical data within the retention window before live ingestion can provide complete coverage. Without this command, protocols only have history state from the point live ingestion started using their processors — all prior ledgers are missing. This subcommand lets operators backfill per-protocol, with safe handoff to live
ingestion via CAS cursors ensuring no gaps or double-processing at the migration boundary.

Known limitations

N/A

Issue that this PR addresses

#515

Checklist

PR Structure

  • It is not possible to break this PR down into smaller PRs.
  • This PR does not mix refactoring changes with feature changes.
  • This PR's title starts with name of package that is most changed in the PR, or all if the changes are broad or impact many packages.

Thoroughness

  • This PR adds tests for the new functionality or fixes.
  • All updated queries have been tested (refer to this check if the data set returned by the updated query is expected to be same as the original one).

Release

  • This is not a breaking change.
  • This is ready to be tested in development.
  • The new functionality is gated with a feature flag if this is not ready for production.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a new protocol history backfill workflow to support protocol state production over the retention window, with a CLI entrypoint and shared ingest helpers to coordinate handoff with live ingestion via CAS cursors.

Changes:

  • Introduces ProtocolMigrateHistoryService (+ extensive unit tests) to backfill per-protocol history using CAS progress tracking and clean handoff semantics.
  • Adds protocol-migrate history CLI command and integration tests covering migration→live handoff and cursor edge-cases.
  • Extracts shared ingest helpers (ledger fetch retry, processor map building, cursor name helpers) and adds UpdateHistoryMigrationStatus to the protocols model.

Reviewed changes

Copilot reviewed 14 out of 14 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
internal/services/protocol_migrate_history.go Implements history migration service with CAS-based cursor advancement and convergence polling.
internal/services/protocol_migrate_history_test.go Adds comprehensive unit tests for migration success/failure, resume, handoff, and convergence polling.
internal/services/ingest_helpers.go Adds shared helpers: getLedgerWithRetry, buildProtocolProcessorMap, and protocol cursor name formatters.
internal/services/ingest.go Reuses buildProtocolProcessorMap; removes duplicated getLedgerWithRetry method.
internal/services/ingest_live.go Switches to shared cursor name helpers and shared getLedgerWithRetry.
internal/services/ingest_backfill.go Switches to shared getLedgerWithRetry.
internal/services/ingest_test.go Updates tests to call shared getLedgerWithRetry directly.
internal/integrationtests/data_migration_test.go Adds integration coverage for history migration→live ingestion handoff and cursor-asymmetry scenarios.
internal/data/protocols.go Adds UpdateHistoryMigrationStatus to support migration lifecycle tracking.
internal/data/protocols_test.go Adds test for UpdateHistoryMigrationStatus.
internal/data/mocks.go Updates ProtocolsModelMock to include UpdateHistoryMigrationStatus.
internal/data/ingest_store.go Adds exported constants for ingest cursor key names.
cmd/root.go Registers the new protocol-migrate command.
cmd/protocol_data_migrate.go Implements protocol-migrate history CLI subcommand and wires it to the service.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@aristidesstaffieri aristidesstaffieri force-pushed the feature/protocol-migrate-history branch 2 times, most recently from f8fe9b9 to a530835 Compare March 23, 2026 17:37
@aristidesstaffieri aristidesstaffieri marked this pull request as ready for review March 23, 2026 17:50
@aristidesstaffieri aristidesstaffieri requested a review from a team March 23, 2026 17:50
@aristidesstaffieri aristidesstaffieri force-pushed the feature/live-ingest-state-production branch 2 times, most recently from addd208 to aa7ee47 Compare March 25, 2026 20:51
@aristidesstaffieri aristidesstaffieri force-pushed the feature/protocol-migrate-history branch 4 times, most recently from 6674b16 to 262156b Compare March 25, 2026 22:35
@aristidesstaffieri aristidesstaffieri force-pushed the feature/live-ingest-state-production branch from aa7ee47 to 0f30c7c Compare March 30, 2026 18:39
@aristidesstaffieri aristidesstaffieri force-pushed the feature/protocol-migrate-history branch from 262156b to 1812163 Compare March 30, 2026 18:44
Base automatically changed from feature/live-ingest-state-production to feature/data-migrations April 2, 2026 21:38
aristidesstaffieri and others added 13 commits April 2, 2026 15:58
…_status to differentiate between not started and in progress migrations
…steps in the ContractData branch, removes Balance branch
…istinction between uploads and upgrades/deployments
…col-setup in the "When Checkpoint Classification Runs" section
…dow, in order to discard state changes outside of retention.

1. Schema changes: enabled field removed, display_name removed, status default is not_started
2. Status values: All updated to new naming scheme (not_started, classification_in_progress, classification_success, backfilling_in_progress, backfilling_success, failed)
3. protocol-setup: Now uses --protocol-id flag (opt-in), updated command examples and workflow
4. Classification section (line 125): Updated to describe ContractCode validation and ContractData lookup
5. Checkpoint population diagram: Removed Balance branch, updated to show WASM hash storage in known_wasms
6. Live ingestion classification diagram: Separated into ContractCode and ContractData paths with RPC fallback
7. Live State Production diagram: Updated classification box to mention ContractCode uploads and ContractData Instance changes
8. Backfill migration: Added retention-aware processing throughout (flow diagram, workflow diagram, parallel processing)
9. Parallel backfill worker pool: Added steps for retention window filtering
… relationship between classification and state production
…s tracking

  - Add known_wasms table (migration, model, mock, and data layer tests) for tracking WASM hashes during checkpoint population
  - Add KnownWasm field to Models struct
  - Create WasmIngestionService (wasm_ingestion.go) that runs protocol validators against WASM bytecode and batch-persists hashes to known_wasms
  - Create CheckpointService (checkpoint.go) that orchestrates single-pass checkpoint population, delegating ContractCode entries to both WasmIngestionService and
  TokenProcessor, and all other entries to TokenProcessor
  - Extract readerFactory on checkpointService for injectable checkpoint reader creation
  - Extract TokenProcessor interface and NewTokenProcessor from TokenIngestionService, moving checkpoint iteration logic out of token_ingestion.go into checkpoint.go
  - Remove db, archive, and PopulateAccountTokens from TokenIngestionService interface and struct
  - Remove dbPool parameter from NewTokenIngestionServiceForLoadtest
  - Wire CheckpointService into IngestServiceConfig and ingestService
  - Update ingest_live.go to call checkpointService.PopulateFromCheckpoint instead of tokenIngestionService.PopulateAccountTokens
  - Update ingest.go setupDeps to construct WasmIngestionService and CheckpointService
  - Add ContractValidatorMock, ProtocolValidatorMock, ChangeReaderMock, CheckpointServiceMock, WasmIngestionServiceMock, TokenProcessorMock, and TokenIngestionServiceMock
  updates to mocks.go
  - Add unit tests for WasmIngestionService (10 cases covering ProcessContractCode and PersistKnownWasms)
  - Add unit tests for CheckpointService (16 cases covering entry routing, error propagation, and context cancellation)
…IngestionService (#524)

* Initial plan

* Remove validator execution from WasmIngestionService

Co-authored-by: aristidesstaffieri <6886006+aristidesstaffieri@users.noreply.github.com>

* services/wasm_ingestion: remove ProtocolValidator execution from WasmIngestionService

Co-authored-by: aristidesstaffieri <6886006+aristidesstaffieri@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: aristidesstaffieri <6886006+aristidesstaffieri@users.noreply.github.com>
…IngestionService to use config struct

  WasmIngestionService.ProcessContractCode no longer receives the full
  bytecode—it only needs the hash to track protocol WASMs. This reduces
  memory pressure during checkpoint population.

  TokenIngestionService construction is consolidated into a single
  NewTokenIngestionService(config) constructor, eliminating the separate
  NewTokenIngestionServiceForLoadtest variant. The loadtest runner now
  uses the same constructor with only the fields it needs.

  Also refactors processContractInstanceChange to return a
  contractInstanceResult struct instead of multiple return values,
  extracts newCheckpointData() helper, uses idiomatic nil slices
  instead of make([]T, 0), and introduces a checkpointTestFixture
  struct to reduce boilerplate in checkpoint tests. Constructors
  return concrete types instead of interfaces to allow direct field
  access in tests.
  Persist contract-to-WASM-hash mappings by extending WasmIngestionService
  with ProcessContractData and PersistProtocolContracts methods. During
  checkpoint population, ContractData Instance entries are parsed to extract
  the wasm_hash and contract_id relationship, which is stored in a new
  protocol_contracts table (FK to protocol_wasms). This mapping will be used
  by protocol-setup and live ingestion to classify contracts by protocol.
aristidesstaffieri and others added 28 commits April 2, 2026 15:58
   Replace raw []byte with types.HashBytea for WasmHash and ContractID
   fields in ProtocolWasm and ProtocolContract models. HashBytea implements
   sql.Scanner and driver.Valuer to auto-convert between raw bytes (DB)
   and hex strings (Go), consistent with how Transaction.Hash is handled.

   Updated files:
   - internal/data/protocol_wasms.go, protocol_contracts.go (models + BatchInsert)
   - internal/indexer/processors/protocol_wasms.go, protocol_contracts.go
   - internal/services/wasm_ingestion.go
   - All corresponding test files
…kpointService

  WasmIngestionService was only used by CheckpointService, and
  TokenIngestionService's NewTokenProcessor/TokenProcessor interface was
  only used by CheckpointService. This inlines all checkpoint-specific
  logic directly into CheckpointService, eliminating unnecessary
  intermediate service abstractions.

  - Rewrite checkpoint.go to absorb all checkpoint logic: checkpointData,
    batch, trustline/contract/WASM processing, and protocol persistence
  - Replace positional NewCheckpointService args with CheckpointServiceConfig
  - Strip token_ingestion.go to live-only (ProcessTokenChanges); remove
    TokenProcessor interface, NewTokenProcessor, and checkpoint-only fields
    from TokenIngestionServiceConfig
  - Delete wasm_ingestion.go (absorbed into checkpoint.go)
  - Remove WasmIngestionServiceMock, TokenProcessorMock from mocks.go
  - Update ingest.go wiring and simplify TokenIngestionServiceConfig
  - Rewrite checkpoint_test.go with data model mocks; port WASM and
    checkpoint processor tests from deleted test files
  - Add TrustlineAssetModelMock to data/mocks.go
  - Add valid AccountId to makeAccountChange() helper to prevent nil pointer dereference
  - Add missing protocolWasmModel.BatchInsert mock expectation in ContractCodeEntry test
  - Fix ContextCancellation test to cancel context during reader.Read() instead of before PopulateFromCheckpoint, matching the expected error path
…s tracking

  - Add known_wasms table (migration, model, mock, and data layer tests) for tracking WASM hashes during checkpoint population
  - Add KnownWasm field to Models struct
  - Create WasmIngestionService (wasm_ingestion.go) that runs protocol validators against WASM bytecode and batch-persists hashes to known_wasms
  - Create CheckpointService (checkpoint.go) that orchestrates single-pass checkpoint population, delegating ContractCode entries to both WasmIngestionService and
  TokenProcessor, and all other entries to TokenProcessor
  - Extract readerFactory on checkpointService for injectable checkpoint reader creation
  - Extract TokenProcessor interface and NewTokenProcessor from TokenIngestionService, moving checkpoint iteration logic out of token_ingestion.go into checkpoint.go
  - Remove db, archive, and PopulateAccountTokens from TokenIngestionService interface and struct
  - Remove dbPool parameter from NewTokenIngestionServiceForLoadtest
  - Wire CheckpointService into IngestServiceConfig and ingestService
  - Update ingest_live.go to call checkpointService.PopulateFromCheckpoint instead of tokenIngestionService.PopulateAccountTokens
  - Update ingest.go setupDeps to construct WasmIngestionService and CheckpointService
  - Add ContractValidatorMock, ProtocolValidatorMock, ChangeReaderMock, CheckpointServiceMock, WasmIngestionServiceMock, TokenProcessorMock, and TokenIngestionServiceMock
  updates to mocks.go
  - Add unit tests for WasmIngestionService (10 cases covering ProcessContractCode and PersistKnownWasms)
  - Add unit tests for CheckpointService (16 cases covering entry routing, error propagation, and context cancellation)
   The known_wasms table was renamed to protocol_wasms, and the new
   ProtocolWasm model already exists. Remove the obsolete KnownWasm
   model, its tests, and the old migration file.
… names

  This aligns Protocol→Protocols and ProtocolWasm→ProtocolWasms (structs, interfaces, mocks, and Models struct fields) to match the protocols and protocol_wasms table names,
   consistent with the existing ProtocolContracts convention.
  Introduces the infrastructure for protocol processors to produce and
  persist protocol-specific state during live ledger ingestion, gated by
  per-protocol compare-and-swap cursors that coordinate with concurrent
  migration processes.

  Key changes:
  - ProtocolProcessor interface and ProtocolProcessorInput for protocol-
    specific ledger analysis and state persistence
  - Processor registry (RegisterProcessor/GetAllProcessors) for protocol
    processor discovery at startup
  - Dual CAS gating in PersistLedgerData (step 5.5): per-protocol history
    and current_state cursors ensure exactly-once writes even when live
    ingestion and migration run concurrently
  - Protocol contract cache with periodic refresh to avoid per-ledger DB
    queries for classified contracts
  - Data layer additions: IngestStoreModel.GetTx, CompareAndSwap,
    ProtocolContractsModel.GetByProtocolID, ProtocolsModel.GetClassified

  Tests:
  - Unit tests for processor registry (concurrent safety, overwrite, etc.)
  - 5 subtests for PersistLedgerData CAS gating (win, lose, behind, no
    cursor, no processors) using a real test DB and sentinel-writing
    testProtocolProcessor
  - Docker integration test (ProtocolStateProductionTestSuite) exercising
    CAS gating against a live ingest container's DB in three phases
   Combine protocol setup and protocol state tests into a shared
   DataMigrationTestSuite. Use real SEP41 setup classification plus
   manual cursor seeding to verify live ingestion produces protocol
   history/current state only when the protocol cursors are ready,
   and stays inert when they are absent.
  The two IngestStore.Get calls inside the RunInPgxTransaction callback
  read from the connection pool instead of dbTx, breaking transactional
  consistency and opening extra connections. They are also redundant:
  CompareAndSwap already handles all cursor states (at ledger-1, ahead,
  behind, or missing) with the correct outcome, making the pre-read
  guards unnecessary.
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
  When GetByProtocolID fails for a protocol during cache refresh, preserve
  the previously cached contracts instead of silently dropping them. Only
  advance lastRefreshLedger when all protocols refresh successfully so
  transient DB errors trigger a retry on the next ledger rather than
  serving empty data for the next 100 ledgers.
…rProcessors test

  The skippedProcessor mock was never included in the processors map passed
  to produceProtocolStateForProcessors, making AssertNotCalled trivially true.
  Removed the unused mock and renamed the test to accurately reflect what it
  verifies: that the function processes only the processors provided in the map.
  Scope PersistLedgerData's CAS loop to eligibleProtocolProcessors so
  processors that were skipped by protocolProcessorsEligibleForProduction
  cannot win a CAS race if a migration advances the cursor between the
  pre-check and the in-transaction swap.

  Add IncDBQueryError calls on rows.Scan and rows.Err failure paths in
  GetByProtocolID, GetByIDs, and GetClassified so scan/iteration errors
  are surfaced in metrics the same way Query() errors already are.

  Move DB queries outside the write lock in refreshProtocolContractCache
  so concurrent readers are not blocked for the full DB round-trip.
  When any GetByProtocolID call fails during cache refresh, lastRefreshLedger
  was never updated, causing the staleness check to trigger on every ledger
  instead of every 100th — a 100x query amplification. Make the ledger update
  unconditional since the cache already preserves previous entries on partial
  failure, so data integrity is not at risk. Add warn-level logging to
  distinguish partial from full refreshes.
… tests

  Introduce ProtocolMigrateHistoryService that backfills protocol state
  changes for historical ledgers, walking forward from the oldest ingest
  cursor to the latest cursor and persisting PersistHistory at each ledger.
  The service tracks progress via a per-protocol history_cursor using CAS,
  refreshes the protocol contract cache periodically, and marks
  history_migration_status on completion.

  Supporting changes:
  - Add `protocol-data-migrate` CLI command (cmd/protocol_data_migrate.go)
  - Add UpdateHistoryMigrationStatus to ProtocolsModel and its mock/tests
  - Add per-call tracking (persistedHistorySeqs, persistedCurrentStateSeqs)
    to integrationTestProcessor for verifying persistence call counts

  Integration test additions:
  - Enhance TestHistoryMigrationThenLiveIngestionHandoff with per-ledger
    PersistHistory verification across migration and live handoff phases
  - Add TestLiveIngestionHistoryCursorReadyCurrentStateLags proving the
    asymmetric cursor CAS path: when history_cursor is ready but
    current_state_cursor lags, only PersistHistory executes while
    PersistCurrentState is correctly skipped
  Move duplicated logic into ingest_helpers.go:
  - getLedgerWithRetry: was identical method on both ingestService and
    protocolMigrateHistoryService, now a package-level function
  - buildProtocolProcessorMap: deduplicates processor slice-to-map
    conversion with nil/duplicate validation
  - protocolHistoryCursorName/protocolCurrentStateCursorName: replaces
    scattered Sprintf calls for cursor key formatting

  Simplifies getLedgerWithRetry test to call the function directly
  without constructing a full ingestService.
  The convergence poll in processAllProtocols treated any error from
  PrepareRange/GetLedger as convergence, including transient RPC failures
  like connection refused. This could prematurely mark protocols as
  StatusSuccess during network blips. Now discriminates three cases:
  poll deadline exceeded (converged), parent context cancelled (propagate),
  anything else (transient — retry).
  The history migration service read cursor positions using hardcoded
  constants (data.OldestLedgerCursorName, data.LatestLedgerCursorName),
  ignoring operator overrides via CLI flags. Add configurable cursor name
  fields with defaults matching the ingest command, so operators who
  override --latest-ledger-cursor-name or --oldest-ledger-cursor-name
  get consistent behavior across live ingestion and history migration.
…dger backend

  The outer loop in protocol history migration transitions the same
  LedgerBackend instance between BoundedRange and UnboundedRange without
  explicit reset. This works because captive core internally closes the
  subprocess before opening a new range, but that behavior is an
  implementation detail not guaranteed by the LedgerBackend interface.

  Add an explanatory comment at the transition point and a new integration
  test (rangeTrackingBackend) that verifies the Bounded→Unbounded→Bounded
  PrepareRange sequence when the tip advances during the convergence poll.
  When processAllProtocols fails, the Run() method was marking all active
  protocols as StatusFailed, including ones already handed off to live
  ingestion via CAS failure. This caused handed-off protocols to be
  re-processed on the next Run(), conflicting with live ingestion's cursor
  ownership.

  Change processAllProtocols to return handed-off protocol IDs alongside
  the error, then split the status update: handed-off protocols get
  StatusSuccess (live ingestion owns them), while only non-handed-off
  protocols get StatusFailed.
  If the caller passes duplicate protocol IDs (e.g. --protocol-id foo
  --protocol-id foo), duplicate trackers would be created for the same
  protocol, causing self-induced CAS failures and incorrect handoff
  detection. Add order-preserving deduplication as the first operation
  in validate(), which is the single choke-point for both Run() and
  processAllProtocols().
  Move reusable logic into internal/utils/ as generic functions
  (RetryWithBackoff[T], BuildMap[T]) and move cursor name helpers to
  ingestion_utils.go. Inline all call sites in services to use utils
  directly and delete the ingest helpers file entirely.

  Also fix variable shadow lint errors in ingest_live.go and
  protocol_migrate_history.go.
  These tables were referenced by model code and tests but had no
  corresponding migration, causing test failures after rebase.
@aristidesstaffieri aristidesstaffieri force-pushed the feature/protocol-migrate-history branch from 13f61d7 to a22f382 Compare April 2, 2026 21:59
Copy link
Copy Markdown
Contributor

@aditya1702 aditya1702 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found some issues with the migration logic


// BatchGetByProtocolIDs returns all contracts for the given protocol IDs in a single query,
// grouped by protocol ID.
func (m *ProtocolContractsModel) BatchGetByProtocolIDs(ctx context.Context, protocolIDs []string) (map[string][]ProtocolContracts, error) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we remove this function (didnt we introduce in earlier PR?)


// SetEligibleProtocolProcessorsForTest sets the eligible protocol processors for testing.
// In production, this is set by ingestLiveLedgers before each PersistLedgerData call.
func (m *ingestService) SetEligibleProtocolProcessorsForTest(processors map[string]ProtocolProcessor) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should not have methods on receivers just for tests to use. We should find some way to do this in the test file. I think this might be against Go best practices. Can you double check this one?

}

// produceProtocolState runs all registered protocol processors against a ledger.
func (m *ingestService) produceProtocolState(ctx context.Context, ledgerMeta xdr.LedgerCloseMeta, ledgerSeq uint32) error {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same check for this function too - this function is only used in tests so is this fine adding a method for that on ingestService reciever?

// protocolContractCache caches classified protocol contracts to avoid per-ledger DB queries.
// Only accessed from the single-threaded live ingestion loop, so no mutex is needed.
type protocolContractCache struct {
mu sync.RWMutex
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this going to be shared by goroutines?

}

// processAllProtocols runs history migration for all protocols using ledger-first iteration.
// Each ledger is fetched once and processed by all eligible protocols, avoiding redundant RPC calls.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a historical ledger processing (migration), why not use the datastore backend? Because that is better suited for backfilling, with RPC we will have to ensure the historical range is supported by its history retention

}

// Find minimum cursor among non-handed-off trackers
var minCursor uint32
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we tracking the minCursor here?


// processAllProtocols runs history migration for all protocols using ledger-first iteration.
// Each ledger is fetched once and processed by all eligible protocols, avoiding redundant RPC calls.
func (s *protocolMigrateHistoryService) processAllProtocols(ctx context.Context, protocolIDs []string) ([]string, error) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we break this function in smaller sub functions for readability. It is very long and hard to understand on all the moving parts

@@ -0,0 +1,1191 @@
package services
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome test suite 👏

I think these 2 are missing:

  1. Context cancellation during processing — There's a select { case <-ctx.Done() } check at line 304 but no test cancels a context mid-batch. A test with
    context.WithCancel that cancels after N ledgers would verify the cleanup path returns correct handedOffProtocolIDs.
  2. oldest_ingest_ledger is 0 — The guard at line 226 (return nil, fmt.Errorf("ingestion has not started yet")) has no test.


log.Ctx(ctx).Infof("Processing ledgers %d to %d for %d protocol(s)", startLedger, latestLedger, len(protocolIDs))

if err := s.ledgerBackend.PrepareRange(ctx, ledgerbackend.BoundedRange(startLedger, latestLedger)); err != nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I found a valid bug here based on my knowledge of ledger backend. I used Claude to craft this reply:

Bug: PrepareRange called multiple times on RPCLedgerBackend — fails in production

processAllProtocols calls PrepareRange on every iteration of the outer for {} loop (line 299) and again during the convergence poll (line 411).
However, RPCLedgerBackend.PrepareRange returns an error if the backend is already
prepared
:

if b.preparedRange != nil {
    return fmt.Errorf("RPCLedgerBackend is already prepared with range [%d, %d]",
        b.preparedRange.from, b.preparedRange.to)
}

There is no reset mechanism — preparedRange is only nil at construction time.

Affected paths

Path Where What happens
Tip advances after first batch Line 299 (second outer loop iteration) PrepareRange returns error → function returns with failure
Convergence poll Line 411 PrepareRange(UnboundedRange(...)) returns error → misinterpreted as transient error → infinite retry loop
Transient poll error retry Line 421 → continue → line 299 Same as tip-advance case

Why tests don't catch this

All test backends (multiLedgerBackend, transientErrorBackend, rangeTrackingBackend) implement PrepareRange as a no-op that always returns nil.
None of them enforce the single-prepare constraint that RPCLedgerBackend has.

Impact

The function works correctly only when the entire backfill completes in a single batch — i.e., all ledgers from startLedger to latestLedger are
processed and the tip doesn't advance during that time. If the tip advances or the convergence poll is reached, the service errors out.

continue
}

// At tip — poll briefly for convergence.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following from the PrepareRange bug above — rather than patching the re-call sites, the entire outer loop and convergence poll can be replaced with a
single UnboundedRange.

Current design

PrepareRange(BoundedRange(start, latest))    ← first call
  process ledgers start..latest
  tip advanced?
    PrepareRange(BoundedRange(next, newLatest))  ← BUG: errors on RPCLedgerBackend
  at tip?
    PrepareRange(UnboundedRange(latest+1))       ← BUG: same
    GetLedger with 5s timeout (convergence poll)
    new ledger? loop again → PrepareRange(Bounded...) ← BUG: same

Proposed design

PrepareRange(UnboundedRange(startLedger))    ← once, at the top

for seq := startLedger; ; seq++ {
    // for each tracker, check if converged (via timeout on GetLedger)
    pollCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
    meta, err := s.ledgerBackend.GetLedger(pollCtx, seq)
    cancel()
    if errors.Is(err, context.DeadlineExceeded) {
        // no new ledger in 5s → converged at seq-1
        break
    }
    // process meta for eligible trackers...
}

Why this works with RPCLedgerBackend

  • PrepareRange is called exactly once — no conflict with the single-prepare constraint
  • UnboundedRange means GetLedger accepts any sequence number without range validation
  • RPCLedgerBackend.GetLedger already has built-in 2s polling when a ledger is beyond the RPC tip — so the timeout context is the only convergence
    mechanism needed
  • Bounded vs Unbounded has no behavioral difference in RPCLedgerBackend other than the range check at line
    131

What gets removed

  • The outer for {} loop (lines 268–441)
  • minCursor recomputation on each iteration
  • The 40-line convergence poll section (lines 402–440)
  • All Bounded/Unbounded mode switching
  • The PrepareRange bug entirely

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw this same refactor should also work with the datastore backend since that also supports live ingestion of new ledgers

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants