feat: Single-pass WAL streaming for LOG_BASED replication #772
Open
bdewilde wants to merge 19 commits into
Conversation
Member:
Thanks @bdewilde! There are some typing errors. I might take a longer look later in the week.
Author:
Hi Edgar, thanks in advance for giving it a longer look! My bad for the typing errors -- if you can point me at them (without having to do a whole review ;), I can try to fix them sometime this week. And apologies for the test failures. I was never able to get the full unit test suite running green in my local dev, but all of the new tests I added did pass for me.
problem
`PostgresLogBasedStream.get_records()` opens its own `LogicalReplicationConnection` per selected stream. With N LOG_BASED streams, the tap runs N sequential WAL scans -- each rereads the same segments, with `add-tables` discarding most records server-side. End-to-end sync time scales ~linearly in N. For pipelines with multiple LOG_BASED streams against a large backlog, this dominates run time.

changes
A new `SingleConnectionWALReader` opens one logical replication connection with `add-tables` covering all selected LOG_BASED tables, scans the WAL once, and dispatches each parsed wal2json message inline to the owning stream's new `emit_record()` method for immediate Singer RECORD emission. STATE flushes every 30s, and the slot is advanced to the WAL tip on idle/max-run exit.

- `_wal_helpers.py` (FQN/escaping/parsing helpers) and `wal_reader.py` (the reader and read loop)
- `client.py` gains an `emit_record` method and a config-flag branch in `get_records`
- `tap.py` adds the `log_based_single_connection` config setting (default False!) and `_sync_log_based_streams_shared` orchestration
- `tests/test_wal_helpers.py`, `tests/test_wal_reader.py`, `tests/test_consume.py`

Full disclosure: I had Claude Code implement those three test modules, and then I iterated a bit. If it's still excessive / not testing usefully -- something Claude is known to do, sigh -- just let me know, and I will take a hatchet to it.
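To make the dispatch shape concrete, here's a minimal sketch of how a single-connection reader can route wal2json changes to per-stream `emit_record()` calls. This is illustrative only -- the class name mirrors the PR's description, but the bodies are simplified stand-ins, not the actual implementation:

```python
import json


class SingleConnectionWALReaderSketch:
    """Illustrative only: route each parsed wal2json change to its owning stream."""

    def __init__(self, streams_by_fqn):
        # streams_by_fqn: {"schema.table": stream}, where each stream exposes
        # an emit_record(record) method (per the new client.py API)
        self.streams_by_fqn = streams_by_fqn

    def dispatch(self, payload: str) -> None:
        """Parse one wal2json message and emit records inline."""
        for change in json.loads(payload).get("change", []):
            fqn = f"{change['schema']}.{change['table']}"
            stream = self.streams_by_fqn.get(fqn)
            if stream is None:
                continue  # add-tables should already filter server-side; be defensive
            # wal2json (format version 1) carries parallel name/value arrays
            record = dict(zip(change["columnnames"], change["columnvalues"]))
            stream.emit_record(record)
```

The key difference from the per-stream approach: the WAL is parsed once and fanned out in-process, instead of N connections each filtering the same segments down to one table.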
This is a very belated follow-up to PR #667 and Issue #587.
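For reference, enabling the new behavior might look like the following tap config fragment. The `log_based_single_connection` key comes from this PR, and the two `replication_*` settings are mentioned below; the values shown are placeholders, not recommendations:

```json
{
  "log_based_single_connection": true,
  "replication_max_run_seconds": 3600,
  "replication_idle_exit_seconds": 60
}
```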
constraints
- `Tap.sync_all` is `@typing.final`, so dispatch can't be restructured at the SDK boundary -- this was a bummer. My next best option was to trigger at the first LOG_BASED stream's `get_records()` call, gated by a `_shared_wal_run_completed` flag on the tap so siblings become no-ops.
- `_sync_log_based_streams_shared` pre-writes every stream's schema before the reader runs. Since the SDK's `Stream.sync()` later calls `_write_schema_message()` again, the override on `PostgresLogBasedStream` is idempotent. (Without that flag every SCHEMA would be emitted twice, which is not great.)
- The shared slot starts from `min(start_lsn)` across all streams, so each stream's own bookmark is captured at construction and used to drop messages that it's already past.
- I had to rely on internal SDK methods -- `_write_record_message()` and `_increment_stream_state()` -- for this to work. I consolidated them in one place -- `emit_record()` -- so SDK renames hit one method. Not sure how stable the API is here...

questions
- `_write_schema_message` idempotency: I made the smallest fix I could for the duplicate-SCHEMA bug given the `@final` constraint on `sync_all`. Is there another / cleaner approach?
- `emit_record()` uses internal SDK calls. Is this going to be an issue? Is there a safer / "public" equivalent?
- `get_records()` as trigger: the "first stream's get_records fires the shared reader" pattern is a not-great workaround for `sync_all` being final. Is it okay as documented, or is there a cleaner SDK hook?
- `replication_max_run_seconds` / `replication_idle_exit_seconds` now bound the whole LOG_BASED batch instead of each stream. To me that feels like an improvement, but I don't know the whole system / downstream use cases. For example, does anything downstream assume per-stream bounds?
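For what it's worth, the two per-stream mechanisms described in the constraints -- the idempotent schema write and the bookmark-based message filter -- are both tiny. A hedged sketch (names mirror the PR's description; the bodies are illustrative stand-ins, not the actual code):

```python
class LogBasedStreamSketch:
    """Illustrative stand-in for the PR's PostgresLogBasedStream overrides."""

    def __init__(self, bookmark_lsn: int = 0):
        self._schema_written = False
        self.bookmark_lsn = bookmark_lsn  # captured at stream construction
        self.messages = []

    def _write_schema_message(self) -> None:
        # Idempotent: shared orchestration pre-writes every schema, and the
        # SDK's Stream.sync() calls this again later; only the first call emits.
        if self._schema_written:
            return
        self._schema_written = True
        self.messages.append("SCHEMA")

    def wants(self, msg_lsn: int) -> bool:
        # The shared slot starts at min(start_lsn) across streams, so each
        # stream drops messages at or before its own bookmark.
        return msg_lsn > self.bookmark_lsn
```

The guard flag is what prevents the double-SCHEMA emission mentioned in question 1; whether a cleaner SDK hook exists is exactly what the question asks.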