
Add origin and timestamp preservation to table repair #73

Open
ibrarahmad wants to merge 7 commits into pgEdge:main from ibrarahmad:PRESERVE-TIME-ORIGINID

Conversation

@ibrarahmad
Contributor

This commit introduces logic to preserve replication origin metadata and commit timestamps during table repair operations. When enabled, repairs now maintain the original source node and timestamp information instead of marking repaired rows with current repair-time metadata.

The implementation extracts origin information from source row metadata and applies it during repair using PostgreSQL's replication origin API. Timestamp parsing supports multiple formats including RFC3339, PostgreSQL text format, and numeric Unix timestamps with automatic scale detection (seconds, milliseconds, microseconds, nanoseconds). Error handling ensures proper session cleanup and graceful degradation when origin metadata is unavailable.

Origin preservation works across repair modes including standard repair, fix-nulls updates, and bidirectional synchronization. New integration tests verify correct preservation of timestamps, origin node IDs, and row content integrity throughout the repair process.

@ibrarahmad
Contributor Author

For detailed reference, see the PR.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

🤖 Fix all issues with AI agents
In `@db/queries/templates.go`:
- Around line 1552-1569: Add a brief comment above the replication origin SQL
templates (near GetReplicationOriginByName, CreateReplicationOrigin,
SetupReplicationOriginSession, ResetReplicationOriginSession,
SetupReplicationOriginXact, ResetReplicationOriginXact) stating that the
pg_replication_origin_* functions require PostgreSQL 9.5 or newer and that
calling them typically requires superuser or equivalent replication/owner
privileges; also add a short note to the README or database setup docs
documenting the minimum PostgreSQL version (>=9.5) and the privilege
requirements for these functions.

In `@internal/consistency/repair/table_repair_batch_test.go`:
- Around line 217-224: The comment for TestParseNumericTimestamp incorrectly
states that Unix seconds value 1704067200 corresponds to 2024-01-01 12:00:00
UTC; update the comment to the correct timestamp (2024-01-01 00:00:00 UTC) so it
accurately documents the expected value used in the sec variable and
parseNumericTimestamp test; locate the comment in TestParseNumericTimestamp near
the sec := time.Unix(1704067200, 0) line and change the annotation only.

In `@internal/consistency/repair/table_repair.go`:
- Around line 2736-2750: The code in resolveOriginLSN (the block that picks
survivorNode using t.Pools and t.SourceOfTruth and currently returns fmt.Errorf
when none found) should not hard-fail; instead log or warn and return (nil,
false, nil) to allow repairs to continue in degraded preserve-origin mode.
Change the branch that checks survivorNode == "" so it emits a warning
mentioning batchKey.nodeOrigin (use the existing logger or
fmt.Printf/processLogger) and then returns nil, false, nil rather than an error;
keep the survivor selection logic using t.Pools and t.SourceOfTruth unchanged.
- Around line 2301-2343: The current loop in executeUpserts processes multiple
originGroups in a single transaction so pg_replication_origin_xact_setup() is
overwritten and all rows get the last batch's origin; fix by ensuring each
origin group runs in its own transaction or by resetting the origin xact before
re-setting it: update executeUpserts (referencing executeUpsertBatch,
task.setupOriginForBatchKey, task.resetReplicationOriginXact,
task.resetReplicationOriginSession) to either (A) begin a fresh transaction for
each batchKey, call task.setupOriginForBatchKey, run executeUpsertBatch, commit
that transaction and then reset session as needed, or (B) if keeping one outer
tx, call task.resetReplicationOriginXact() (and handle errors) before calling
task.setupOriginForBatchKey for the next batch so per-batch origin
timestamps/LSNs are preserved.
🧹 Nitpick comments (1)
pkg/logger/logger.go (1)

31-33: *Accept io.Writer instead of os.File in SetOutput.

The underlying Log.SetOutput accepts io.Writer. Restricting the wrapper to *os.File is unnecessarily limiting and goes against Go best practices of accepting interfaces, not concrete types. Since *os.File implements io.Writer, this change is fully backward compatible while enabling greater flexibility for testing and custom output destinations.

♻️ Proposed refactor
 import (
 	"fmt"
+	"io"
 	"os"

 	"github.com/charmbracelet/log"
 )
@@
-func SetOutput(w *os.File) {
+func SetOutput(w io.Writer) {
 	Log.SetOutput(w)
 }

@mason-sharp mason-sharp marked this pull request as draft February 12, 2026 00:28
@ibrarahmad ibrarahmad force-pushed the PRESERVE-TIME-ORIGINID branch 5 times, most recently from cca05e9 to 5596050 on February 17, 2026 15:00

coderabbitai bot commented Feb 17, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Adds replication-origin preservation: new SQL templates and DB query functions, adjusted origin-LSN lookup, PreserveOrigin flag wired through CLI/HTTP and repair logic (per-origin batching, session/xact setup, timestamp handling), extensive tests and docs, sqlite3 version bump, logger.SetOutput, and minor merkle/mtree test and implementation adjustments.

Changes

  • DB queries & templates (db/queries/queries.go, db/queries/templates.go): Renamed the GetSpockOriginLSNForNode parameter to originNodeName (a signature change); added six replication-origin SQL templates and six public query functions to get/create/setup/reset replication origins; adjusted the origin-LSN lookup join path.
  • Core repair logic (internal/consistency/repair/table_repair.go): Added the PreserveOrigin flag and its wiring; per-row origin extraction/types, originBatchKey grouping, numeric/RFC3339Nano timestamp parsing, per-origin transaction/session setup/reset, origin-aware upsert/fix-null execution, and setupTransactionMode.
  • Repair batching tests (internal/consistency/repair/table_repair_batch_test.go): New unit tests covering originBatchKey construction, grouping semantics, timestamp parsing, and origin extraction edge cases.
  • Integration tests & helpers (tests/integration/table_repair_test.go, tests/integration/timestamp_comparison_test.go, tests/integration/crash_recovery_test.go, tests/integration/helpers_test.go, tests/integration/main_test.go, tests/integration/merkle_tree_test.go, tests/integration/cdc_busy_table_test.go, tests/integration/table_diff_test.go): Added multiple preserve-origin integration tests and helpers (commit timestamp, replication origin, timestamp compare); adjusted the test harness to capture logger output; added a resetSharedTable helper; relaxed some assertions (CDC LSN non-regression); updated merkle tests and cleanup.
  • CLI & HTTP (internal/cli/cli.go, internal/api/http/handler.go): Added the --preserve-origin CLI flag and preserve_origin HTTP request field; propagated the value into TableRepairTask.PreserveOrigin.
  • Documentation (docs/commands/repair/table-repair.md, docs/api.md, docs/http-api.md): Documented the --preserve-origin flag and preserve_origin API field; added a detailed "Preserving replication origin" section describing workflow, requirements, and timestamp precision.
  • Merkle / mtree (internal/consistency/mtree/merkle.go, tests/integration/merkle_tree_test.go): Synchronized the metadata leaf count with derived blockRanges in BuildMtree; made merkle tests less strict on per-node diffs and ensured setup/cleanup stability.
  • Miscellaneous (go.mod, pkg/logger/logger.go): Bumped the github.com/mattn/go-sqlite3 version; added pkg/logger.SetOutput(*os.File) to allow redirecting logger output.

Poem

🐇 I twitch my nose and trace each line,
I stitch LSNs and timestamps fine,
per-row origins held with care,
sessions, txns, and fixes there,
soft patch hops — the data's mine.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)
  • Docstring Coverage ⚠️ Warning: Docstring coverage is 53.45%, which is below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
  • Title check ✅ Passed: The title "Add origin and timestamp preservation to table repair" clearly and concisely summarizes the main change: adding functionality to preserve replication origin metadata and timestamps during repairs.
  • Description check ✅ Passed: The description comprehensively covers the changeset, explaining the origin/timestamp preservation feature; the implementation approach, including metadata extraction, timestamp parsing, and error handling; and its scope across repair modes.


@pgEdge pgEdge deleted a comment from coderabbitai bot Feb 25, 2026
@mason-sharp mason-sharp marked this pull request as ready for review February 25, 2026 02:17

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
db/queries/templates.go (1)

1542-1550: ⚠️ Potential issue | 🟠 Major

Make origin LSN selection deterministic.

LIMIT 1 without ORDER BY can return an arbitrary remote_lsn when multiple rows match, which can pick stale origin progress. The paired slot query (GetSpockSlotLSNForNode) uses ORDER BY rs.confirmed_flush_lsn DESC for deterministic ordering; apply the same to the origin query.

💡 Proposed fix
 GetSpockOriginLSNForNode: template.Must(template.New("getSpockOriginLSNForNode").Parse(`
 		SELECT ros.remote_lsn::text
 		FROM pg_catalog.pg_replication_origin_status ros
 		JOIN pg_catalog.pg_replication_origin ro ON ro.roident = ros.local_id
 		JOIN spock.subscription s ON ro.roname LIKE '%' || s.sub_name
 		JOIN spock.node o ON o.node_id = s.sub_origin
 		WHERE o.node_name = $1
 			AND ros.remote_lsn IS NOT NULL
+		ORDER BY ros.remote_lsn DESC
 		LIMIT 1
 	`)),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@db/queries/templates.go` around lines 1542 - 1550, GetSpockOriginLSNForNode
currently uses LIMIT 1 with no ordering which can return an arbitrary
remote_lsn; update the SQL in the GetSpockOriginLSNForNode template to order
deterministically (e.g., add "ORDER BY ros.remote_lsn DESC" before LIMIT 1) so
the query returns the latest origin LSN consistently, mirroring the
deterministic ordering used by GetSpockSlotLSNForNode.
tests/integration/table_repair_test.go (1)

161-187: ⚠️ Potential issue | 🟠 Major

captureOutput does not save and restore the logger's previous writer — currently resets to os.Stderr directly.

pkgLogger.SetOutput(w) redirects the global logger, but the logger's prior output writer is never captured. The restore on line 181 unconditionally sets the logger back to oldStderr (the OS-level os.Stderr file descriptor). While this currently works because the logger is initialized to os.Stderr at module load time and SetOutput is never called elsewhere, the pattern is fragile: if the logger is ever configured to write elsewhere, or if captureOutput is called recursively, the previous writer will be lost.

Additionally, charmbracelet/log v0.4.2 (the underlying logger library) does not expose a GetOutput() getter. To fix this safely, the logger wrapper in pkg/logger/logger.go would need to add a GetOutput() method alongside SetOutput(), or use a pattern that saves the writer before redirecting:

🛠️ Proposed fix (requires exposing GetOutput)
 func captureOutput(t *testing.T, task func()) string {
 	t.Helper()
 	oldStdout := os.Stdout
 	oldStderr := os.Stderr
 	r, w, err := os.Pipe()
 	require.NoError(t, err)
 	os.Stdout = w
 	os.Stderr = w

-	// Redirect package logger to capture WARN logs
-	pkgLogger.SetOutput(w)
+	// Redirect package logger to capture WARN logs; save and restore its prior writer.
+	prevLogWriter := pkgLogger.GetOutput()
+	pkgLogger.SetOutput(w)

 	task()

 	err = w.Close()
 	require.NoError(t, err)
 	os.Stdout = oldStdout
 	os.Stderr = oldStderr

-	// Restore package logger
-	pkgLogger.SetOutput(oldStderr)
+	// Restore package logger to its previous writer
+	pkgLogger.SetOutput(prevLogWriter)

 	var buf bytes.Buffer
 	_, err = io.Copy(&buf, r)
 	require.NoError(t, err)
 	return buf.String()
 }

Add GetOutput() to pkg/logger/logger.go:

func GetOutput() io.Writer {
	// charmbracelet/log exposes no getter, so the wrapper must track the
	// writer itself: have SetOutput store w in a package-level variable
	// (e.g. currentOutput) before forwarding to Log.SetOutput, then
	// return that variable here.
	return currentOutput
}

Additionally, pkgLogger.SetOutput mutates global state without synchronization. If any test using captureOutput is marked t.Parallel() or if sub-tests run concurrently, there will be a data race on the logger's writer. Document that captureOutput is not safe for concurrent use, or guard with a mutex.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/integration/table_repair_test.go` around lines 161 - 187, captureOutput
mutates the global logger output via pkgLogger.SetOutput(w) but never
captures/restores the logger's prior writer and is racy; modify captureOutput to
first retrieve and save the current logger writer (add a GetOutput method in
pkg/logger/logger.go on the pkgLogger wrapper to return the current io.Writer or
*os.File), call pkgLogger.SetOutput(w) and after the task restore the saved
writer via pkgLogger.SetOutput(saved), and protect SetOutput/GetOutput usage
with a package-level mutex in pkg/logger/logger.go (or document that
captureOutput is not concurrency-safe) so tests calling captureOutput cannot
race when run in parallel.
🧹 Nitpick comments (1)
tests/integration/table_repair_test.go (1)

1464-1477: Duplicate doc-comment — consolidate into one.

The function has two consecutive GoDoc comment blocks (lines 1464–1470 and 1470–1477) that describe the same test scenario with slightly different wording. Only one block is needed; the first should be removed.

📝 Proposed fix
-// TestTableRepair_FixNulls_PreserveOrigin verifies that fix-nulls with PreserveOrigin=true
-// preserves replication origin (and commit timestamp when available) on the target node.
-// Fix-nulls UPDATE scenario: only NULL columns are updated, not a full row INSERT.
-// Source row (n1) has origin metadata (node_n3); target row (n2) has a NULL column.
-// Preserve-origin must extract origin/timestamp from source and apply during UPDATE.
-// Validates origin preservation for UPDATE operations, not just INSERT operations.
-// TestTableRepair_FixNulls_PreserveOrigin verifies that fix-nulls with
-// PreserveOrigin=true preserves the original replication origin after
-// updating NULL columns.
-//
-// Scenario: rows originate on n3 (origin=node_n3) and replicate to n1/n2.
-// Then specific columns are set to NULL on n2 via repair_mode, creating
-// NULL divergence. Fix-nulls fills in the NULLs from n1, and the repaired
-// rows on n2 must still show origin=node_n3.
+// TestTableRepair_FixNulls_PreserveOrigin verifies that fix-nulls with
+// PreserveOrigin=true preserves the original replication origin after
+// updating NULL columns.
+//
+// Scenario: rows originate on n3 (origin=node_n3) and replicate to n1/n2.
+// Then specific columns are set to NULL on n2 via repair_mode, creating
+// NULL divergence. Fix-nulls fills in the NULLs from n1, and the repaired
+// rows on n2 must still show origin=node_n3.
+// Validates origin preservation for UPDATE operations (not just INSERTs);
+// commit timestamp preservation is intentionally not checked here since
+// UPDATE paths do not carry timestamp metadata in the current implementation.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/integration/table_repair_test.go` around lines 1464 - 1477, There are
two duplicate GoDoc comment blocks above the
TestTableRepair_FixNulls_PreserveOrigin test; remove the first/comment block
(the one starting at the earlier comment lines) so only a single descriptive
doc-comment remains for the TestTableRepair_FixNulls_PreserveOrigin function,
keeping the clearer wording and leaving the test body and the remaining comment
block unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/commands/repair/table-repair.md`:
- Around line 97-101: The doc incorrectly states that deletes and upserts run
atomically in a single transaction; update the wording to reflect the actual
preserve-origin execution: explain that deletes are committed in a separate
step, then upserts are applied in per-(origin,LSN,timestamp) batches (each batch
configured with its replication origin/commit timestamp) so failure/rollback
boundaries are per-step and per-batch rather than “all operations commit
together”; reference the preserve-origin behavior and the
per-(origin,timestamp/LSN) upsert batching when rewriting the paragraph to avoid
implying a single all-or-nothing transaction.

In `@internal/cli/cli.go`:
- Around line 226-230: Update the BoolFlag with Name "preserve-origin" so its
Usage string accurately describes what is preserved: replace "node ID and LSN"
with "node ID and commit timestamp" and mention that this corresponds to the
replication origin preservation done via pg_replication_origin_session_setup /
pg_replication_origin_xact_setup; modify the Usage field in the
&cli.BoolFlag{...} for "preserve-origin" accordingly.

In `@internal/consistency/repair/table_repair.go`:
- Around line 1306-1327: When processing a batch with no origin, the code never
resets a previously set session origin, so currentSessionOrigin can leak into
subsequent transactions; update the logic around batchKey.nodeOrigin and
currentSessionOrigin to call queries.ResetReplicationOriginSession(t.Ctx, conn)
whenever batchKey.nodeOrigin == "" and currentSessionOrigin != "" (and then
clear currentSessionOrigin), mirroring the existing setup path that calls
queries.SetupReplicationOriginSession; apply the same change to the other
preserve-origin loop that contains similar handling (the block using
batchKey.nodeOrigin, currentSessionOrigin, queries.GetReplicationOriginByName,
queries.CreateReplicationOrigin, queries.SetupReplicationOriginSession).
- Around line 2163-2171: The numeric timestamp detection switch using variable n
should treat exact boundary values as the higher-precision branch: change the
comparisons in the switch (the cases checking 1e12, 1e15, 1e18 and their
negatives) to be inclusive (use >= / <=) so values exactly equal to 1e12, 1e15,
1e18 (and -1e12, -1e15, -1e18) fall into the intended higher-magnitude cases;
keep the same branch order and time.Unix multipliers so the cases for n >
1e18/1e15/1e12 become n >= 1e18/1e15/1e12 and similarly for negatives to ensure
correct parsing of boundary timestamps.

---

Outside diff comments:
In `@db/queries/templates.go`:
- Around line 1542-1550: GetSpockOriginLSNForNode currently uses LIMIT 1 with no
ordering which can return an arbitrary remote_lsn; update the SQL in the
GetSpockOriginLSNForNode template to order deterministically (e.g., add "ORDER
BY ros.remote_lsn DESC" before LIMIT 1) so the query returns the latest origin
LSN consistently, mirroring the deterministic ordering used by
GetSpockSlotLSNForNode.

In `@tests/integration/table_repair_test.go`:
- Around line 161-187: captureOutput mutates the global logger output via
pkgLogger.SetOutput(w) but never captures/restores the logger's prior writer and
is racy; modify captureOutput to first retrieve and save the current logger
writer (add a GetOutput method in pkg/logger/logger.go on the pkgLogger wrapper
to return the current io.Writer or *os.File), call pkgLogger.SetOutput(w) and
after the task restore the saved writer via pkgLogger.SetOutput(saved), and
protect SetOutput/GetOutput usage with a package-level mutex in
pkg/logger/logger.go (or document that captureOutput is not concurrency-safe) so
tests calling captureOutput cannot race when run in parallel.

---

Nitpick comments:
In `@tests/integration/table_repair_test.go`:
- Around line 1464-1477: There are two duplicate GoDoc comment blocks above the
TestTableRepair_FixNulls_PreserveOrigin test; remove the first/comment block
(the one starting at the earlier comment lines) so only a single descriptive
doc-comment remains for the TestTableRepair_FixNulls_PreserveOrigin function,
keeping the clearer wording and leaving the test body and the remaining comment
block unchanged.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cc1d1ae and 4058953.

📒 Files selected for processing (10)
  • db/queries/queries.go
  • db/queries/templates.go
  • docs/api.md
  • docs/commands/repair/table-repair.md
  • docs/http-api.md
  • internal/api/http/handler.go
  • internal/cli/cli.go
  • internal/consistency/repair/table_repair.go
  • tests/integration/helpers_test.go
  • tests/integration/table_repair_test.go
🚧 Files skipped from review as they are similar to previous changes (4)
  • internal/api/http/handler.go
  • docs/http-api.md
  • tests/integration/helpers_test.go
  • docs/api.md


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

♻️ Duplicate comments (1)
internal/consistency/repair/table_repair.go (1)

2516-2539: Past session-origin leak concern is addressed, but ResetReplicationOriginSession errors are silently discarded.

The transition to no-origin batches (lines 2536-2539) correctly resets the session origin, addressing the previously flagged issue. However, errors from ResetReplicationOriginSession at lines 2518 and 2537 are discarded. If a reset fails, the stale session origin remains active and could taint the next batch's rows.

Consider at minimum logging the error:

💡 Suggested improvement
 		if currentSessionOrigin != "" {
-			queries.ResetReplicationOriginSession(t.Ctx, conn)
+			if err := queries.ResetReplicationOriginSession(t.Ctx, conn); err != nil {
+				logger.Warn("preserve-origin: failed to reset session origin: %v", err)
+			}
 		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/consistency/repair/table_repair.go` around lines 2516 - 2539, The
code currently ignores errors returned by queries.ResetReplicationOriginSession
when switching replication origins (the calls surrounding currentSessionOrigin
and batchKey.nodeOrigin), which can leave a stale origin session active; capture
the error return from both ResetReplicationOriginSession calls and handle it
(either return a wrapped error from the surrounding function or at minimum log
the error via the existing logger in this context before proceeding) so failures
to reset the session are not silently discarded and can be observed; update the
ResetReplicationOriginSession invocations used with currentSessionOrigin and the
branch where batchKey.nodeOrigin == "" to check the error and act accordingly.
🧹 Nitpick comments (3)
internal/consistency/repair/table_repair.go (3)

1638-1657: Inconsistent error flow: preserve-origin upsert failure doesn't skip reporting.

When executePreserveOriginUpserts fails (line 1640), the code logs the error and appends to repairErrors but does not continue to the next node. This means lines 1659-1672 still execute, potentially writing a report entry for a partially-failed node. The standard path (line 1650-1653) rolls back and continues immediately on upsert failure.

While not strictly a bug (the outer error aggregation catches it), adding continue after the error branch would keep the flow consistent and avoid misleading report entries.

💡 Proposed fix
 			if upsertErr != nil {
 				logger.Error("executing preserve-origin upserts on node %s: %v", nodeName, upsertErr)
 				repairErrors = append(repairErrors, fmt.Sprintf("preserve-origin upsert ops failed for %s: %v", nodeName, upsertErr))
+				continue
 			}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/consistency/repair/table_repair.go` around lines 1638 - 1657, The
preserve-origin branch handling executePreserveOriginUpserts currently logs and
appends to repairErrors on upsertErr but then continues executing the
success-path reporting; update the error flow so that after detecting upsertErr
from executePreserveOriginUpserts(nodeName,... ) you immediately skip further
processing for that node (i.e., add a continue) similar to the standard path
rollback/continue logic; ensure you reference upsertErr,
executePreserveOriginUpserts, nodeName, repairErrors, logger and avoid writing
to totalOps or logging a success Info for that node when upsertErr is non-nil.

2222-2233: PostgreSQL text format string may not cover all timezone offset patterns.

The format "2006-01-02 15:04:05.999999-07" handles offsets like +05 or -07 but will fail for offsets with minutes (e.g., +05:30) or UTC (+00:00). Since failure gracefully falls back to non-origin repair (line 2251), this isn't a blocker, but rows with those timezone formats would silently lose origin preservation.

Consider adding an additional fallback format like "2006-01-02 15:04:05.999999-07:00" to cover PostgreSQL's common +HH:MM output.

💡 Proposed fix
 			ts, err = time.Parse("2006-01-02 15:04:05.999999-07", v)
+			if err != nil {
+				ts, err = time.Parse("2006-01-02 15:04:05.999999-07:00", v)
+			}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/consistency/repair/table_repair.go` around lines 2222 - 2233, The
string timestamp parsing in the switch case (variables ts, err and the
time.Parse calls) currently falls back to the PostgreSQL format "2006-01-02
15:04:05.999999-07" which misses timezone offsets like "+05:30"; update the
fallback sequence so after trying time.RFC3339Nano and time.RFC3339 you also
attempt parsing with the PostgreSQL/ISO style that includes minute offsets
("2006-01-02 15:04:05.999999-07:00") before the existing "-07" format, so
timestamps with "+HH:MM" or "Z" style offsets are preserved.

3004-3010: Synthetic LSN offset is architecturally safe but could benefit from clearer documentation.

The concern about replication origin progress tracking is unfounded: pg_replication_origin_xact_setup() does not validate that the LSN corresponds to actual WAL segments. Per PostgreSQL's replication origin design, it simply records the passed LSN as a progress marker when the local transaction commits. Data consistency is determined by the applied DML operations themselves, not the LSN value.

That said, the offset logic—adding commitTS.UnixMicro() % 1000000 to create synthetic LSN values—is semantically questionable because it:

  • Creates gaps in apparent LSN progression that don't reflect real WAL structure
  • Could confuse monitoring or debugging tools expecting LSN monotonicity
  • Relies on modulo arithmetic that, while unlikely to collide, weakens the "uniqueness" argument

Consider documenting why synthetic LSNs are necessary here, or using a simpler uniqueness approach (e.g., baseLSN + 1 with a per-batch counter) to avoid semantic LSN pollution.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/consistency/repair/table_repair.go` around lines 3004 - 3010, The
synthetic LSN creation using baseLSN := *fetchedLSN; offset :=
uint64(commitTS.UnixMicro() % 1000000); uniqueLSN := baseLSN + offset should be
clarified and made less semantically confusing: update the code around the
synthetic LSN generation (references: fetchedLSN, commitTS, uniqueLSN and the
call site that feeds this into pg_replication_origin_xact_setup()) to either (a)
replace the modulo timestamp offset with a deterministic per-batch monotonic
increment (e.g., baseLSN + 1 with an in-memory per-batch counter) to preserve
apparent LSN monotonicity, or (b) if keeping the timestamp offset, add a concise
comment explaining why synthetic LSNs are required, why
pg_replication_origin_xact_setup() accepts arbitrary values, and the low
collision risk; implement the chosen approach and ensure any new counter is
initialized and incremented consistently within the same batch before returning
&uniqueLSN, true, nil.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/commands/repair/table-repair.md`:
- Around line 90-95: Update the step list to explicitly state that the
replication origin session is reset after the transaction commit: when ACE
"Executes the repairs" it should then call tx.Commit(), and only after a
successful commit call resetReplicationOriginSession for the origin session (the
previous bug occurred when resetReplicationOriginSession ran before
tx.Commit()). Mention the ordering explicitly and reference the replication
origin/session and origin LSN/timestamp steps to make the correct sequence
clear.
- Around line 102-105: Update the "Timestamp precision" paragraph to clarify
that RFC3339Nano is Go's format name while PostgreSQL timestamptz stores
microsecond precision; specifically, edit the text under the "Timestamp
precision" section (the line mentioning "RFC3339Nano format") to add a
parenthetical like "(RFC3339Nano is Go's format name; Postgres timestamptz
stores up to microsecond precision)" so the example timestamp and surrounding
bullets accurately reflect microsecond-level storage and avoid implying
nanosecond storage.
- Line 33: Update the `--preserve-origin` option text to consistently reflect
that LSN availability is expected but not strictly required: change "Requires
LSN to be available from a survivor node." to "LSN should be available from a
survivor node; if absent, the repair will gracefully fall back to using the
repair node's origin/LSN behavior." Also replace the ambiguous phrase "instead
of the local node ID" with "instead of the repair node's replication origin ID",
and make the same "should be available" wording in the requirements section (the
paragraph describing LSN availability) so both the options table
(`--preserve-origin`) and the requirements text use identical, fallback-friendly
phrasing.

In `@internal/consistency/repair/table_repair.go`:
- Around line 1255-1263: The preserve-origin "fix-nulls" path is missing a check
that colTypes[col] exists before using it, causing an empty colType (and invalid
SQL in buildFixNullsBatchSQL); modify the loop over columns so after obtaining
colType := colTypes[col] you validate it's non-empty and return an error
(mirroring the standard path's behavior around the earlier check), so
buildFixNullsBatchSQL never receives an empty type for a column.
- Around line 3013-3093: The three wrapper methods
setupReplicationOriginSession, resetReplicationOriginXact, and
resetReplicationOriginSession are unused and should be removed to reduce dead
code; delete the functions setupReplicationOriginSession (which wraps
GetReplicationOriginByName/CreateReplicationOrigin/SetupReplicationOriginSession),
resetReplicationOriginXact (which wraps ResetReplicationOriginXact), and
resetReplicationOriginSession (which wraps ResetReplicationOriginSession) from
internal/consistency/repair/table_repair.go, and run a quick project-wide search
to ensure no callers reference those symbols before committing.

---

Duplicate comments:
In `@internal/consistency/repair/table_repair.go`:
- Around line 2516-2539: The code currently ignores errors returned by
queries.ResetReplicationOriginSession when switching replication origins (the
calls surrounding currentSessionOrigin and batchKey.nodeOrigin), which can leave
a stale origin session active; capture the error return from both
ResetReplicationOriginSession calls and handle it (either return a wrapped error
from the surrounding function or at minimum log the error via the existing
logger in this context before proceeding) so failures to reset the session are
not silently discarded and can be observed; update the
ResetReplicationOriginSession invocations used with currentSessionOrigin and the
branch where batchKey.nodeOrigin == "" to check the error and act accordingly.

---

Nitpick comments:
In `@internal/consistency/repair/table_repair.go`:
- Around line 1638-1657: The preserve-origin branch handling
executePreserveOriginUpserts currently logs and appends to repairErrors on
upsertErr but then continues executing the success-path reporting; update the
error flow so that after detecting upsertErr from
executePreserveOriginUpserts(nodeName,... ) you immediately skip further
processing for that node (i.e., add a continue) similar to the standard path
rollback/continue logic; ensure you reference upsertErr,
executePreserveOriginUpserts, nodeName, repairErrors, logger and avoid writing
to totalOps or logging a success Info for that node when upsertErr is non-nil.
- Around line 2222-2233: The string timestamp parsing in the switch case
(variables ts, err and the time.Parse calls) currently falls back to the
PostgreSQL format "2006-01-02 15:04:05.999999-07" which misses timezone offsets
like "+05:30"; update the fallback sequence so after trying time.RFC3339Nano and
time.RFC3339 you also attempt parsing with the PostgreSQL/ISO style that
includes minute offsets ("2006-01-02 15:04:05.999999-07:00") before the existing
"-07" format, so timestamps with "+HH:MM" or "Z" style offsets are preserved.
- Around line 3004-3010: The synthetic LSN creation using baseLSN :=
*fetchedLSN; offset := uint64(commitTS.UnixMicro() % 1000000); uniqueLSN :=
baseLSN + offset should be clarified and made less semantically confusing:
update the code around the synthetic LSN generation (references: fetchedLSN,
commitTS, uniqueLSN and the call site that feeds this into
pg_replication_origin_xact_setup()) to either (a) replace the modulo timestamp
offset with a deterministic per-batch monotonic increment (e.g., baseLSN + 1
with an in-memory per-batch counter) to preserve apparent LSN monotonicity, or
(b) if keeping the timestamp offset, add a concise comment explaining why
synthetic LSNs are required, why pg_replication_origin_xact_setup() accepts
arbitrary values, and the low collision risk; implement the chosen approach and
ensure any new counter is initialized and incremented consistently within the
same batch before returning &uniqueLSN, true, nil.
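Option (a) above, a deterministic per-batch monotonic increment, could look like the sketch below; `lsnAllocator` and its method names are hypothetical stand-ins, and the base value represents whatever LSN is fetched from the survivor node:

```go
package main

import "fmt"

// lsnAllocator sketches option (a): instead of offsetting the fetched
// base LSN by commitTS.UnixMicro() % 1000000, hand out baseLSN+1,
// baseLSN+2, ... within a batch so synthetic LSNs stay monotonic and
// collision-free for that batch.
type lsnAllocator struct {
	base uint64
	next uint64
}

func newLSNAllocator(base uint64) *lsnAllocator {
	return &lsnAllocator{base: base}
}

// Next returns the next unique, monotonically increasing synthetic LSN.
func (a *lsnAllocator) Next() uint64 {
	a.next++
	return a.base + a.next
}

func main() {
	alloc := newLSNAllocator(0x16B3A80)
	fmt.Println(alloc.Next(), alloc.Next(), alloc.Next())
}
```

An in-memory counter like this only needs to live for the duration of one batch, matching the "initialized and incremented consistently within the same batch" requirement above.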

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4058953 and bcab7c0.

📒 Files selected for processing (3)
  • docs/commands/repair/table-repair.md
  • internal/cli/cli.go
  • internal/consistency/repair/table_repair.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal/cli/cli.go

@ibrarahmad added the enhancement (New feature or request) label on Feb 25, 2026
Ibrar Ahmed and others added 5 commits February 25, 2026 11:15
This commit introduces logic to preserve replication origin metadata
and commit timestamps during table repair operations. When enabled,
repairs now maintain the original source node and timestamp information
instead of marking repaired rows with current repair-time metadata.

The implementation extracts origin information from source row metadata
and applies it during repair using PostgreSQL's replication origin API.
Timestamp parsing supports multiple formats including RFC3339, PostgreSQL
text format, and numeric Unix timestamps with automatic scale detection
(seconds, milliseconds, microseconds, nanoseconds). Error handling
ensures proper session cleanup and graceful degradation when origin
metadata is unavailable.

Origin preservation works across repair modes including standard repair,
fix-nulls updates, and bidirectional synchronization. New integration
tests verify correct preservation of timestamps, origin node IDs, and
row content integrity throughout the repair process.
Three bugs prevented preserve-origin from working end-to-end:

1. GetSpockOriginLSNForNode used a broken LIKE pattern that never matched
   actual spock origin names. Replaced with JOINs through spock.subscription
   and spock.node, matching the approach used by GetSpockSlotLSNForNode.

2. executeUpserts called resetReplicationOriginSession before tx.Commit(),
   clearing the origin from the WAL commit record (roident=0).

3. pg_replication_origin_xact_setup is per-transaction, so all rows in a
   single transaction received the same timestamp. Added
   executePreserveOriginUpserts which uses a dedicated connection and
   separate transactions per (origin, timestamp) batch key, preserving
   per-row timestamps.

When PreserveOrigin is active, deletes now commit in a separate transaction
before the per-batch-key upsert transactions. The standard (non-preserve)
path is unchanged.

Also adds TestTableRepair_MixedOps_PreserveOrigin integration test covering
DELETE+INSERT+UPDATE with origin/timestamp verification, and fixes the
getReplicationOrigin test helper to use spock.xact_commit_timestamp_origin.
Skip FixNulls test (UPDATE doesn't preserve origin). Fix Bidirectional
test (proper transaction isolation for divergence creation).
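The ordering fix in bug 2, resetting the origin session only after the commit, can be sketched with toy types; `fakeTx`, `setupOrigin`, and `resetOrigin` stand in for the real pgx transaction and the `queries.SetupReplicationOriginSession` / `ResetReplicationOriginSession` helpers:

```go
package main

import "fmt"

// fakeTx is a stand-in for a pgx transaction.
type fakeTx struct{ committed bool }

func (t *fakeTx) Commit() error { t.committed = true; return nil }

// originActive models whether a replication origin session is set up
// on the connection.
var originActive bool

func setupOrigin() { originActive = true }
func resetOrigin() { originActive = false }

// applyWithOrigin shows the corrected ordering: the origin session must
// still be active when the transaction commits, so the WAL commit
// record carries the preserved roident. Resetting before Commit (the
// old bug) would stamp roident=0.
func applyWithOrigin(tx *fakeTx) bool {
	setupOrigin()
	// ... execute upserts inside tx ...
	if err := tx.Commit(); err != nil { // commit FIRST
		return false
	}
	committedWithOrigin := originActive // origin was live at commit time
	resetOrigin()                       // reset only AFTER commit
	return committedWithOrigin
}

func main() {
	fmt.Println(applyWithOrigin(&fakeTx{})) // true
}
```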
@mason-sharp mason-sharp force-pushed the PRESERVE-TIME-ORIGINID branch from bcab7c0 to baeb117 Compare February 25, 2026 19:25

@mason-sharp
Member

I did a rebase. I squashed a couple of small commits at the same time.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
tests/integration/table_repair_test.go (1)

161-186: ⚠️ Potential issue | 🟠 Major

Make output redirection fail-safe with deferred restoration.

If task() exits early (e.g., require.* triggers FailNow), stdout/stderr and pkgLogger can remain redirected and leak into other tests.

Proposed fix
 func captureOutput(t *testing.T, task func()) string {
 	t.Helper()
 	oldStdout := os.Stdout
 	oldStderr := os.Stderr
 	r, w, err := os.Pipe()
 	require.NoError(t, err)
+	defer func() {
+		_ = r.Close()
+		_ = w.Close()
+		os.Stdout = oldStdout
+		os.Stderr = oldStderr
+		pkgLogger.SetOutput(oldStderr)
+	}()

 	os.Stdout = w
 	os.Stderr = w

 	// Redirect package logger to capture WARN logs
 	pkgLogger.SetOutput(w)

 	task()

 	err = w.Close()
 	require.NoError(t, err)
-	os.Stdout = oldStdout
-	os.Stderr = oldStderr
-
-	// Restore package logger
-	pkgLogger.SetOutput(oldStderr)

 	var buf bytes.Buffer
 	_, err = io.Copy(&buf, r)
 	require.NoError(t, err)
 	return buf.String()
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/integration/table_repair_test.go` around lines 161 - 186, In
captureOutput, make the stdout/stderr and pkgLogger redirection fail-safe by
moving restoration and resource cleanup into defers: after creating r,w replace
os.Stdout, os.Stderr and pkgLogger.SetOutput(w) and immediately defer restoring
the originals via pkgLogger.SetOutput(oldStderr) and setting os.Stdout/os.Stderr
back to oldStdout/oldStderr; also defer closing w and copying from r into the
buffer (or at minimum defer w.Close() and ensure io.Copy(&buf, r) happens after
restoration) so that if task() calls FailNow/returns early the pipe is closed
and outputs are restored. Reference: captureOutput function, variables
oldStdout/oldStderr, r/w, pkgLogger.SetOutput, task().
♻️ Duplicate comments (3)
docs/commands/repair/table-repair.md (1)

33-33: ⚠️ Potential issue | 🟡 Minor

Align --preserve-origin option text with the documented fallback behavior.

The option table still states a hard requirement for LSN availability, but the later requirements section documents graceful fallback when LSN is unavailable.

📝 Proposed wording update
-| `--preserve-origin` |  | Preserve replication origin node ID and LSN with per-row timestamp accuracy for repaired rows. When enabled, repaired rows will have commits with the original node's origin ID and exact commit timestamp (microsecond precision) instead of the local node ID. Requires LSN to be available from a survivor node. | `false` |
+| `--preserve-origin` |  | Preserve replication origin node ID and commit timestamp for repaired rows. When enabled, repaired rows retain the original node origin and commit timestamp (microsecond precision) when metadata is available. LSN should be available from a survivor node; if unavailable, affected rows fall back to regular repair behavior. | `false` |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/commands/repair/table-repair.md` at line 33, Update the
`--preserve-origin` option text to match the documented fallback behavior:
change the hard requirement phrasing that "Requires LSN to be available from a
survivor node" to indicate LSN is used when available but the command will
gracefully fall back to using the local node ID and a new commit timestamp if
LSN is not available; keep the description that when available it preserves
origin node ID and exact commit timestamp (microsecond precision). Reference the
`--preserve-origin` option name when making the edit.
internal/consistency/repair/table_repair.go (1)

1255-1257: ⚠️ Potential issue | 🟠 Major

Validate colTypes[col] before building preserve-origin fix-nulls batches.

colType := colTypes[col] can be empty when the key is absent, which leads to invalid cast SQL downstream.

🛡️ Proposed fix
 	for _, col := range columns {
-		colType := colTypes[col]
+		colType, ok := colTypes[col]
+		if !ok || strings.TrimSpace(colType) == "" {
+			return totalUpdated, fmt.Errorf("column type for %s not found in preserve-origin fix-nulls path on %s", col, nodeName)
+		}
 		var rowsForCol []*nullUpdate
 		for _, nu := range columnsByRow[col] {
 			rowsForCol = append(rowsForCol, nu)
 		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/consistency/repair/table_repair.go` around lines 1255 - 1257, The
loop uses colType := colTypes[col] without checking the map for the key, which
can produce an empty colType and generate invalid cast SQL; update the code in
the columns loop (the variables columns, colTypes, and colType) to check the map
membership (e.g., ok := colTypes[col]) before using colType, and if the key is
missing either skip that column (continue) or return an explicit error/log
message; ensure this validation is applied before creating rowsForCol or
building preserve-origin fix-nulls batches that rely on nullUpdate and colType.
internal/consistency/repair/table_repair_batch_test.go (1)

219-220: ⚠️ Potential issue | 🟡 Minor

Correct the Unix-seconds timestamp annotation (1704067200).

The inline documentation says 12:00:00 UTC, but 1704067200 corresponds to 2024-01-01 00:00:00 UTC.

Proposed correction
-	// Unix seconds: 1704067200 = 2024-01-01 12:00:00 UTC
+	// Unix seconds: 1704067200 = 2024-01-01 00:00:00 UTC
...
-				"commit_ts":   int64(1704067200), // 2024-01-01 12:00:00 UTC
+				"commit_ts":   int64(1704067200), // 2024-01-01 00:00:00 UTC

Also applies to: 261-262

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/consistency/repair/table_repair_batch_test.go` around lines 219 -
220, The inline timestamp comment for the time.Unix call is incorrect: the value
1704067200 yields 2024-01-01 00:00:00 UTC, not 12:00:00 UTC; update the comment
next to the sec := time.Unix(1704067200, 0) expression (and the other occurrence
around the sec variable at the later occurrence) to reflect "00:00:00 UTC" or,
if you intended noon, replace the numeric Unix seconds with the correct value
for 2024-01-01 12:00:00 UTC so the sec variable's value and its comment match.
🧹 Nitpick comments (1)
pkg/logger/logger.go (1)

31-33: Use io.Writer for SetOutput instead of *os.File.

The wrapper currently uses a narrower type than the upstream charmbracelet/log API, which accepts io.Writer. This prevents valid outputs like bytes.Buffer and other writer-backed test sinks.

♻️ Proposed change
 import (
 	"fmt"
+	"io"
 	"os"
 )
 
-func SetOutput(w *os.File) {
+func SetOutput(w io.Writer) {
 	Log.SetOutput(w)
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/logger/logger.go` around lines 31 - 33, The SetOutput wrapper currently
accepts *os.File which is too narrow; change the function signature of SetOutput
to accept io.Writer (import "io"), update its parameter name if needed, and call
Log.SetOutput(w) as before so existing *os.File callers still work; ensure any
call sites that relied on *os.File continue to compile (no changes needed) and
run tests to verify writer-backed sinks like bytes.Buffer can be passed.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/consistency/repair/table_repair.go`:
- Around line 1294-1296: The calls to
queries.ResetReplicationOriginSession(t.Ctx, conn) currently ignore errors and
may let a stale replication origin remain on a pooled connection; update each
occurrence (including inside executePreserveOriginFixNulls and
executePreserveOriginUpserts and the other noted locations) to check the
returned error, log it with context (e.g., batch/shard/txn identifiers), and
prevent returning a tainted connection to the pool — typically by calling
conn.Close() (or otherwise marking it unusable) when
ResetReplicationOriginSession returns an error instead of letting conn.Release()
reuse it; ensure the existing conn.Release() or defer behavior is adjusted so
Release is skipped after a Close, and apply this same pattern at all listed
callsites.

In `@tests/integration/table_repair_test.go`:
- Around line 1430-1443: The transaction started with pgCluster.Node2Pool.Begin
(tx) around the DELETE loop needs a deferred rollback to ensure the tx is
cleaned up on test assertion failures; after obtaining tx, add a defer that
attempts tx.Rollback(ctx) unless the transaction has already been
committed/closed so open transactions are not left behind (apply the same
pattern to the other Begin blocks referenced—those using tx, qualifiedTableName,
and sampleIDs at the other locations). Ensure the defer runs immediately after
Begin and does not mask commit errors (i.e., it should ignore
ErrTxClosed/“already closed” semantics and allow commit errors to surface).
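The deferred-rollback pattern asked for here, a rollback that ignores already-closed semantics without masking commit errors, can be sketched with a toy transaction type; `errTxClosed` stands in for pgx's `ErrTxClosed`:

```go
package main

import (
	"errors"
	"fmt"
)

// errTxClosed stands in for pgx.ErrTxClosed; tx is a minimal fake so
// the sketch is self-contained.
var errTxClosed = errors.New("tx is closed")

type tx struct{ closed bool }

func (t *tx) Commit() error {
	if t.closed {
		return errTxClosed
	}
	t.closed = true
	return nil
}

func (t *tx) Rollback() error {
	if t.closed {
		return errTxClosed
	}
	t.closed = true
	return nil
}

// doWork defers the rollback immediately after Begin-equivalent setup.
// After a successful Commit, Rollback returns errTxClosed, which is
// deliberately ignored; any other rollback failure is joined onto the
// returned error instead of silently replacing it.
func doWork() (err error) {
	t := &tx{}
	defer func() {
		if rbErr := t.Rollback(); rbErr != nil && !errors.Is(rbErr, errTxClosed) {
			err = errors.Join(err, rbErr)
		}
	}()
	// ... DELETE loop would run here ...
	return t.Commit()
}

func main() {
	fmt.Println(doWork()) // <nil>
}
```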

---

Outside diff comments:
In `@tests/integration/table_repair_test.go`:
- Around line 161-186: In captureOutput, make the stdout/stderr and pkgLogger
redirection fail-safe by moving restoration and resource cleanup into defers:
after creating r,w replace os.Stdout, os.Stderr and pkgLogger.SetOutput(w) and
immediately defer restoring the originals via pkgLogger.SetOutput(oldStderr) and
setting os.Stdout/os.Stderr back to oldStdout/oldStderr; also defer closing w
and copying from r into the buffer (or at minimum defer w.Close() and ensure
io.Copy(&buf, r) happens after restoration) so that if task() calls
FailNow/returns early the pipe is closed and outputs are restored. Reference:
captureOutput function, variables oldStdout/oldStderr, r/w, pkgLogger.SetOutput,
task().

---

Duplicate comments:
In `@docs/commands/repair/table-repair.md`:
- Line 33: Update the `--preserve-origin` option text to match the documented
fallback behavior: change the hard requirement phrasing that "Requires LSN to be
available from a survivor node" to indicate LSN is used when available but the
command will gracefully fall back to using the local node ID and a new commit
timestamp if LSN is not available; keep the description that when available it
preserves origin node ID and exact commit timestamp (microsecond precision).
Reference the `--preserve-origin` option name when making the edit.

In `@internal/consistency/repair/table_repair_batch_test.go`:
- Around line 219-220: The inline timestamp comment for the time.Unix call is
incorrect: the value 1704067200 yields 2024-01-01 00:00:00 UTC, not 12:00:00
UTC; update the comment next to the sec := time.Unix(1704067200, 0) expression
(and the other occurrence around the sec variable at the later occurrence) to
reflect "00:00:00 UTC" or, if you intended noon, replace the numeric Unix
seconds with the correct value for 2024-01-01 12:00:00 UTC so the sec variable's
value and its comment match.

In `@internal/consistency/repair/table_repair.go`:
- Around line 1255-1257: The loop uses colType := colTypes[col] without checking
the map for the key, which can produce an empty colType and generate invalid
cast SQL; update the code in the columns loop (the variables columns, colTypes,
and colType) to check the map membership (e.g., ok := colTypes[col]) before
using colType, and if the key is missing either skip that column (continue) or
return an explicit error/log message; ensure this validation is applied before
creating rowsForCol or building preserve-origin fix-nulls batches that rely on
nullUpdate and colType.

---

Nitpick comments:
In `@pkg/logger/logger.go`:
- Around line 31-33: The SetOutput wrapper currently accepts *os.File which is
too narrow; change the function signature of SetOutput to accept io.Writer
(import "io"), update its parameter name if needed, and call Log.SetOutput(w) as
before so existing *os.File callers still work; ensure any call sites that
relied on *os.File continue to compile (no changes needed) and run tests to
verify writer-backed sinks like bytes.Buffer can be passed.


📥 Commits

Reviewing files that changed from the base of the PR and between bcab7c0 and baeb117.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (15)
  • db/queries/queries.go
  • db/queries/templates.go
  • docs/api.md
  • docs/commands/repair/table-repair.md
  • docs/http-api.md
  • go.mod
  • internal/api/http/handler.go
  • internal/cli/cli.go
  • internal/consistency/repair/table_repair.go
  • internal/consistency/repair/table_repair_batch_test.go
  • pkg/logger/logger.go
  • tests/integration/crash_recovery_test.go
  • tests/integration/helpers_test.go
  • tests/integration/table_repair_test.go
  • tests/integration/timestamp_comparison_test.go
🚧 Files skipped from review as they are similar to previous changes (8)
  • docs/http-api.md
  • internal/api/http/handler.go
  • tests/integration/timestamp_comparison_test.go
  • go.mod
  • tests/integration/helpers_test.go
  • docs/api.md
  • tests/integration/crash_recovery_test.go
  • db/queries/templates.go

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (2)
internal/consistency/repair/table_repair.go (2)

1259-1261: ⚠️ Potential issue | 🟡 Minor

Validate column type existence in preserve-origin fix-nulls batching.

Line 1260 reads colTypes[col] without checking presence. Missing entries can produce invalid cast SQL ($n::).

🛡️ Proposed fix
-		colType := colTypes[col]
+		colType, ok := colTypes[col]
+		if !ok || strings.TrimSpace(colType) == "" {
+			return 0, fmt.Errorf("column type for %s not found in preserve-origin fix-nulls path", col)
+		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/consistency/repair/table_repair.go` around lines 1259 - 1261, The
loop over columns uses colTypes[col] without verifying the map contains the key,
which can produce an empty cast in generated SQL; update the code in the loop
that iterates over columns (the block referencing columns, colTypes, rowsForCol
and nullUpdate) to first check for existence (e.g., typ, ok := colTypes[col])
and if missing either skip the column or handle it explicitly (log/warn and
continue) so you never use an empty typ value when constructing cast SQL
(`$n::`), ensuring downstream SQL building always receives a valid type string.

1297-1333: ⚠️ Potential issue | 🟠 Major

Handle ResetReplicationOriginSession failures before releasing pooled connections.

Line 1298, Line 1313, Line 1332, Line 2507, Line 2522, and Line 2541 ignore reset errors. If reset fails, the connection may carry stale origin state into later work.

🔧 Proposed fix pattern
-	defer func() {
-		queries.ResetReplicationOriginSession(t.Ctx, conn)
-		conn.Release()
-	}()
+	defer func() {
+		if err := queries.ResetReplicationOriginSession(t.Ctx, conn); err != nil {
+			logger.Warn("preserve-origin: failed to reset replication origin session on %s: %v; closing connection", nodeName, err)
+			conn.Conn().Close(t.Ctx)
+			return
+		}
+		conn.Release()
+	}()
@@
-				queries.ResetReplicationOriginSession(t.Ctx, conn)
+				if err := queries.ResetReplicationOriginSession(t.Ctx, conn); err != nil {
+					batchTx.Rollback(t.Ctx)
+					return totalUpdated, fmt.Errorf("failed to reset origin session before switching origin: %w", err)
+				}
@@
-			queries.ResetReplicationOriginSession(t.Ctx, conn)
+			if err := queries.ResetReplicationOriginSession(t.Ctx, conn); err != nil {
+				batchTx.Rollback(t.Ctx)
+				return totalUpdated, fmt.Errorf("failed to reset origin session for no-origin batch: %w", err)
+			}

Also applies to: 2505-2542

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/consistency/repair/table_repair.go` around lines 1297 - 1333, The
code ignores errors returned by queries.ResetReplicationOriginSession before
releasing pooled connections, which can leave a connection with stale origin
state; update all places using ResetReplicationOriginSession (e.g., the deferred
cleanup block and the branch that clears currentSessionOrigin) to check its
error return and handle it (return an error or wrap and return it with context)
before calling conn.Release; locate calls to ResetReplicationOriginSession in
table_repair.go (near the loop handling batchGroups and the defer) and change
them to capture the error, wrap it with context (including currentSessionOrigin
or relevant originName) and return totalUpdated plus that error so the caller
can handle the failed reset rather than silently proceeding.
🧹 Nitpick comments (1)
tests/integration/table_repair_test.go (1)

170-186: Add deferred restoration for panic safety in captureOutput.

The logger and stdout/stderr restoration (lines 175-181) happens after task() completes normally. If task() panics, the restoration code won't execute, leaving the package logger redirected to a closed pipe for subsequent tests.

♻️ Proposed fix using defer for restoration
 func captureOutput(t *testing.T, task func()) string {
 	t.Helper()
 	oldStdout := os.Stdout
 	oldStderr := os.Stderr
 	r, w, err := os.Pipe()
 	require.NoError(t, err)
 	os.Stdout = w
 	os.Stderr = w
 
 	// Redirect package logger to capture WARN logs
 	pkgLogger.SetOutput(w)
+	defer pkgLogger.SetOutput(oldStderr)
 
+	defer func() {
+		os.Stdout = oldStdout
+		os.Stderr = oldStderr
+	}()
+
 	task()
 
 	err = w.Close()
 	require.NoError(t, err)
-	os.Stdout = oldStdout
-	os.Stderr = oldStderr
-
-	// Restore package logger
-	pkgLogger.SetOutput(oldStderr)
 
 	var buf bytes.Buffer
 	_, err = io.Copy(&buf, r)
 	require.NoError(t, err)
 	return buf.String()
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/integration/table_repair_test.go` around lines 170 - 186, captureOutput
currently redirects pkgLogger and os.Stdout/os.Stderr but only restores them
after calling task(), so a panic in task() leaves the logger pointing at a
closed pipe; fix this by performing the redirections first then immediately
deferring restoration and pipe closure (restore pkgLogger.SetOutput(oldStderr),
os.Stdout=os.Stderr=oldStdout/oldStderr, and close w and r) so they run even on
panic; reference the captureOutput function, the pkgLogger variable, the task()
invocation, and the oldStdout/oldStderr, w and r variables when making the
change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/consistency/mtree/merkle.go`:
- Around line 1353-1356: numBlocks can be left as 0 when blockRanges is
populated from m.RangesFile because the current correction only runs for the
computed-path branch; update the logic that sets metadata to always derive
numBlocks from the actual blockRanges slice (regardless of whether it came from
the computed path or m.RangesFile) by assigning numBlocks = len(blockRanges)
whenever blockRanges is non-nil/has length before writing metadata—look for the
variables numBlocks, blockRanges and the code paths that populate blockRanges
(e.g., the m.RangesFile handling) and ensure the metadata write uses the
corrected numBlocks.

In `@internal/consistency/repair/table_repair.go`:
- Around line 2574-2575: executePreserveOriginUpserts currently calls
executeUpsertBatch with originUpserts directly and thus skips stale-write
filtering; update executePreserveOriginUpserts to run the originUpserts through
filterStaleRepairs (the same filtering used by the standard upsert path) before
calling executeUpsertBatch so stale repairs are removed, while still preserving
origin metadata; specifically, locate executePreserveOriginUpserts and ensure it
invokes filterStaleRepairs(origins/upserts, ...) or reuses the same filtering
helper that the normal upsert flow uses, then pass the filtered slice into
executeUpsertBatch (keeping any flags/fields needed to mark preserved-origin
writes).

---

Duplicate comments:
In `@internal/consistency/repair/table_repair.go`:
- Around line 1259-1261: The loop over columns uses colTypes[col] without
verifying the map contains the key, which can produce an empty cast in generated
SQL; update the code in the loop that iterates over columns (the block
referencing columns, colTypes, rowsForCol and nullUpdate) to first check for
existence (e.g., typ, ok := colTypes[col]) and if missing either skip the column
or handle it explicitly (log/warn and continue) so you never use an empty typ
value when constructing cast SQL (`$n::`), ensuring downstream SQL building
always receives a valid type string.
- Around line 1297-1333: The code ignores errors returned by
queries.ResetReplicationOriginSession before releasing pooled connections, which
can leave a connection with stale origin state; update all places using
ResetReplicationOriginSession (e.g., the deferred cleanup block and the branch
that clears currentSessionOrigin) to check its error return and handle it
(return an error or wrap and return it with context) before calling
conn.Release; locate calls to ResetReplicationOriginSession in table_repair.go
(near the loop handling batchGroups and the defer) and change them to capture
the error, wrap it with context (including currentSessionOrigin or relevant
originName) and return totalUpdated plus that error so the caller can handle the
failed reset rather than silently proceeding.

---

Nitpick comments:
In `@tests/integration/table_repair_test.go`:
- Around line 170-186: captureOutput currently redirects pkgLogger and
os.Stdout/os.Stderr but only restores them after calling task(), so a panic in
task() leaves the logger pointing at a closed pipe; fix this by performing the
redirections first then immediately deferring restoration and pipe closure
(restore pkgLogger.SetOutput(oldStderr),
os.Stdout=os.Stderr=oldStdout/oldStderr, and close w and r) so they run even on
panic; reference the captureOutput function, the pkgLogger variable, the task()
invocation, and the oldStdout/oldStderr, w and r variables when making the
change.


📥 Commits

Reviewing files that changed from the base of the PR and between baeb117 and c8254ef.

📒 Files selected for processing (7)
  • internal/consistency/mtree/merkle.go
  • internal/consistency/repair/table_repair.go
  • tests/integration/cdc_busy_table_test.go
  • tests/integration/main_test.go
  • tests/integration/merkle_tree_test.go
  • tests/integration/table_diff_test.go
  • tests/integration/table_repair_test.go
