
feat: Single-pass WAL streaming for LOG_BASED replication#772

Open
bdewilde wants to merge 19 commits into
MeltanoLabs:main from bdewilde:single-pass-wal-streaming

Conversation


@bdewilde bdewilde commented Apr 27, 2026

problem

PostgresLogBasedStream.get_records() opens its own LogicalReplicationConnection per selected stream. With N LOG_BASED streams the tap runs N sequential WAL scans -- each rereads the same segments, with add-tables discarding most records server-side. End-to-end sync time scales ~linearly in N. For pipelines with multiple LOG_BASED streams against a large backlog, this dominates run-time.
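For context on the server-side filtering mentioned above: wal2json's add-tables option is what drops non-selected tables, so with one connection per stream each scan still decodes the whole backlog but keeps only one table's changes. A toy helper (illustrative only, not the PR's actual _wal_helpers.py, and ignoring the escaping of special characters in table names) shows how a single connection's options can cover every selected table instead:

```python
def wal2json_options(tables: list[str]) -> dict[str, str]:
    """Build wal2json plugin options for ONE replication connection
    covering every selected table, instead of one connection per table.

    `tables` holds fully qualified 'schema.table' names.
    """
    return {
        "format-version": "2",                   # one JSON object per change
        "add-tables": ",".join(sorted(tables)),  # server-side table filter
    }
```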

changes

A new SingleConnectionWALReader opens one logical replication connection with add-tables covering all selected LOG_BASED tables, scans the WAL once, and dispatches each parsed wal2json message inline to the owning stream's new emit_record() method for immediate Singer RECORD emission. STATE flushes every 30s, and the slot is advanced to the WAL tip on idle/max-run exit.

  • new modules: _wal_helpers.py (FQN/escaping/parsing helpers) and wal_reader.py (the reader and read loop).
  • client.py gains an emit_record() method and a config-flag branch in get_records().
  • tap.py adds the log_based_single_connection config setting (default False!) and the _sync_log_based_streams_shared orchestration.
  • new tests: tests/test_wal_helpers.py, tests/test_wal_reader.py, tests/test_consume.py.
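To make the dispatch step concrete, here's a minimal sketch of routing one parsed wal2json (format-version 2) message to the owning stream's emit_record(). Everything besides the emit_record() name is illustrative, not the PR's actual API:

```python
import json
from dataclasses import dataclass, field

@dataclass
class FakeStream:
    """Stand-in for PostgresLogBasedStream with the new emit_record()."""
    records: list = field(default_factory=list)

    def emit_record(self, change: dict) -> None:
        self.records.append(change)

def dispatch(payload: str, streams: dict) -> bool:
    """Route one wal2json message to its owning stream.

    `streams` maps 'schema.table' -> stream object. Returns True if a
    stream consumed the message.
    """
    change = json.loads(payload)
    if change.get("action") not in ("I", "U", "D"):  # skip begin/commit/etc.
        return False
    fqn = f'{change["schema"]}.{change["table"]}'
    stream = streams.get(fqn)
    if stream is None:  # table not selected for LOG_BASED
        return False
    stream.emit_record(change)
    return True
```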

Full disclosure, I had Claude Code implement those three test modules, and then I iterated a bit. If it's still excessive / not testing usefully -- something Claude is known to do, sigh -- just let me know, and I will take a hatchet to it.

This is a very belated follow-up to PR #667 and Issue #587.

constraints

  • Tap.sync_all is @typing.final, so dispatch can't be restructured at the SDK boundary -- this was a bummer. My next best option was to trigger at the first LOG_BASED stream's get_records() call, gated by a _shared_wal_run_completed flag on the tap so siblings become no-ops.
  • SCHEMA-before-RECORD across streams: _sync_log_based_streams_shared pre-writes every stream's schema before the reader runs. Since the SDK's Stream.sync() later calls _write_schema_message() again, the override on PostgresLogBasedStream makes that second call idempotent. (Without that guard every SCHEMA would be emitted twice, which is not great.)
  • Per-stream LSN filter: Replication opens at min(start_lsn) across all streams, so each stream's own bookmark is captured at construction and used to drop messages that it's already past.
  • I had to dip into private SDK calls -- _write_record_message() and _increment_stream_state() -- for this to work. I consolidated them in one place -- emit_record() -- so SDK renames hit one method. Not sure how stable the API is here...
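The first and third constraints above can be sketched together: a run-once gate that the first get_records() call trips, plus the per-stream LSN drop filter. A hedged sketch -- the flag name mirrors the description above; everything else is illustrative:

```python
class SharedWALGate:
    """The first LOG_BASED stream's get_records() drives the shared
    reader; sibling streams see the flag already set and become no-ops."""

    def __init__(self) -> None:
        self._shared_wal_run_completed = False

    def run_once(self, shared_sync) -> bool:
        if self._shared_wal_run_completed:
            return False  # a sibling already drove the shared reader
        shared_sync()
        self._shared_wal_run_completed = True
        return True

def should_emit(message_lsn: int, stream_bookmark_lsn: int) -> bool:
    """Per-stream LSN filter: replication opens at min(start_lsn) across
    all streams, so each stream drops messages at or before its own
    bookmark, which was captured at construction."""
    return message_lsn > stream_bookmark_lsn
```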

questions

  1. _write_schema_message idempotency: I made the smallest fix I could for the duplicate-SCHEMA bug given the @final constraint on sync_all. Is there another / cleaner approach?
  2. emit_record() uses internal SDK calls. Is this going to be an issue? Is there a safer / "public" equivalent?
  3. get_records() as trigger: The "first stream's get_records fires the shared reader" pattern is a not-great workaround for sync_all being final. Is it okay as documented, or is there a cleaner SDK hook?
  4. replication_max_run_seconds / replication_idle_exit_seconds now bound the whole LOG_BASED batch instead of each stream. To me that feels like an improvement, but I don't know the whole system / downstream use cases. For example, does anything downstream assume per-stream bounds?

@edgarrmondragon
Member

Thanks @bdewilde!

There are some typing errors. I might take a longer look later in the week.

@edgarrmondragon edgarrmondragon changed the title Single-pass WAL streaming for LOG_BASED replication? feat: Single-pass WAL streaming for LOG_BASED replication May 4, 2026
@edgarrmondragon edgarrmondragon self-assigned this May 4, 2026
@edgarrmondragon edgarrmondragon added the enhancement New feature or request label May 4, 2026
@bdewilde
Author

bdewilde commented May 4, 2026

> Thanks @bdewilde!
>
> There are some typing errors. I might take a longer look later in the week.

Hi Edgar, thanks in advance for giving it a longer look! My bad for the typing errors -- if you can point me at them (without having to do a whole review ;), I can try to fix them sometime this week. And apologies for the test failures. I wasn't ever able to get the full unit test suite running green in my local dev, but all of the new tests I added did pass for me.

