From af776f81d5e40f1f700c0ab568115540dd7e295e Mon Sep 17 00:00:00 2001
From: Kashif Sohail
Date: Tue, 10 Mar 2026 22:13:48 +0500
Subject: [PATCH 1/4] Fix: LOG_BASED replication bookmark not advancing between
 syncs

---
 tap_postgres/client.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tap_postgres/client.py b/tap_postgres/client.py
index adaa086d..390ab1e6 100644
--- a/tap_postgres/client.py
+++ b/tap_postgres/client.py
@@ -213,6 +213,7 @@ class PostgresLogBasedStream(SQLStream):
     TYPE_CONFORMANCE_LEVEL = TypeConformanceLevel.ROOT_ONLY
 
     replication_key = "_sdc_lsn"
+    is_sorted = True
 
     _WAL2JSON_ENUM_QUOTE_RE = re.compile(r'"type":""([^"]+)""')
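[Reviewer note, not part of the patch series: `is_sorted = True` declares
that records arrive in ascending `_sdc_lsn` order, which is what allows the
SDK to advance the bookmark record-by-record and emit resumable state. The
flip side is the sort check that patch 2 then has to deal with. A rough
sketch of the contract, with illustrative names only, not SDK source:

    # Approximate contract of singer_sdk's increment_state for sorted
    # streams (illustrative sketch; the real SDK raises
    # InvalidStreamSortException rather than ValueError):
    def increment_state_sketch(state: dict, *, replication_key: str,
                               latest_record: dict, is_sorted: bool,
                               check_sorted: bool) -> None:
        new_value = latest_record[replication_key]
        old_value = state.get("replication_key_value")
        if (is_sorted and check_sorted and old_value is not None
                and new_value < old_value):
            raise ValueError(f"unsorted: {new_value!r} < {old_value!r}")
        if is_sorted or old_value is None or new_value > old_value:
            state["replication_key"] = replication_key
            state["replication_key_value"] = new_value

WAL delivery can dip slightly below the bookmark near transaction
boundaries, which trips exactly this check; patch 2 below replaces the
call with a forward-only update.]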
From cc2b9d79a2eb5b1849ac060b62fe71a29c0a4a72 Mon Sep 17 00:00:00 2001
From: Kashif Sohail
Date: Wed, 11 Mar 2026 22:10:41 +0500
Subject: [PATCH 2/4] Fix InvalidStreamSortException in LOG_BASED replication

---
 tap_postgres/client.py | 45 ++++++++++++++++++------------------------
 1 file changed, 19 insertions(+), 26 deletions(-)

diff --git a/tap_postgres/client.py b/tap_postgres/client.py
index 390ab1e6..c2ddf3fd 100644
--- a/tap_postgres/client.py
+++ b/tap_postgres/client.py
@@ -17,7 +17,6 @@
 import sqlalchemy as sa
 import sqlalchemy.types
 from psycopg2 import extras
-from singer_sdk.helpers._state import increment_state
 from singer_sdk.helpers._typing import TypeConformanceLevel
 from singer_sdk.sql import SQLConnector, SQLStream
 from singer_sdk.sql.connector import SQLToJSONSchema
@@ -261,34 +260,28 @@ def _increment_stream_state(
         *,
         context: Context | None = None,
     ) -> None:
-        """Update state of stream or partition with data from the provided record.
+        """Update the state bookmark, advancing the LSN forward only.
 
-        The default implementation does not advance any bookmarks unless
-        `self.replication_method == 'INCREMENTAL'`. For us, `self.replication_method ==
-        'LOG_BASED'`, so an override is required.
+        The base class only advances bookmarks for INCREMENTAL streams;
+        LOG_BASED streams need this override to track the replication position.
+
+        WAL records are delivered in LSN order overall, but records near
+        transaction boundaries can arrive with LSN values slightly below the
+        stored bookmark (e.g. when start_replication resumes mid-transaction).
+        We skip the bookmark update for those records rather than crashing
+        with InvalidStreamSortException.
         """
-        # This also creates a state entry if one does not yet exist:
-        state_dict = self.get_context_state(context)
+        if not latest_record or not self.replication_key:
+            return
 
-        # Advance state bookmark values if applicable
-        if latest_record:  # This is the only line that has been overridden.
-            if not self.replication_key:
-                msg = (
-                    f"Could not detect replication key for '{self.name}' "
-                    f"stream(replication method={self.replication_method})"
-                )
-                raise ValueError(msg)
-            treat_as_sorted = self.is_sorted
-            if not treat_as_sorted and self.state_partitioning_keys is not None:
-                # Streams with custom state partitioning are not resumable.
-                treat_as_sorted = False
-            increment_state(
-                state_dict,
-                replication_key=self.replication_key,  # ty:ignore[invalid-argument-type]
-                latest_record=latest_record,
-                is_sorted=treat_as_sorted,
-                check_sorted=self.check_sorted,
-            )
+        state_dict = self.get_context_state(context)
+        new_value = latest_record.get(self.replication_key)
+        if new_value is None:
+            return
+
+        old_value = state_dict.get("replication_key_value")
+        if old_value is None or new_value >= old_value:
+            state_dict["replication_key"] = self.replication_key
+            state_dict["replication_key_value"] = new_value
 
     def get_records(self, context: Context | None) -> Iterable[dict[str, t.Any]]:
         """Return a generator of row-type dictionary objects."""
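[Reviewer note, illustrative only: the override above reduces to a
forward-only max over the `_sdc_lsn` values seen during the sync, so a
small LSN dip at a transaction boundary no longer aborts the run and can
never move the bookmark backwards:

    # Forward-only bookmark semantics of the new _increment_stream_state:
    state: dict = {}

    def bump(state: dict, lsn: int) -> None:
        old = state.get("replication_key_value")
        if old is None or lsn >= old:
            state["replication_key"] = "_sdc_lsn"
            state["replication_key_value"] = lsn

    for lsn in (1000, 1005, 1003, 1010):  # 1003 simulates a boundary dip
        bump(state, lsn)
    assert state["replication_key_value"] == 1010
]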
From 6f8be47596461f27205079219503589009a4f144 Mon Sep 17 00:00:00 2001
From: Kashif Sohail
Date: Thu, 12 Mar 2026 20:53:21 +0500
Subject: [PATCH 3/4] Fix: Unbounded WAL retention and premature replication
 loop exit for LOG_BASED streams

---
 meltano.yml            |   6 ++
 tap_postgres/client.py | 187 +++++++++++++++++++++++++++++++++++------
 2 files changed, 167 insertions(+), 26 deletions(-)

diff --git a/meltano.yml b/meltano.yml
index 37a5c443..9968648a 100644
--- a/meltano.yml
+++ b/meltano.yml
@@ -42,6 +42,12 @@ plugins:
       - name: ssl_client_private_key
         kind: password
         sensitive: true
+      - name: replication_max_run_seconds
+        kind: integer
+        description: Max seconds the LOG_BASED replication loop will run before exiting (default 600).
+      - name: replication_idle_exit_seconds
+        kind: integer
+        description: Exit LOG_BASED replication if no data messages arrive for this many seconds (default 60).
     config:
       sqlalchemy_url: postgresql://postgres:postgres@localhost:5432/postgres
     select:
diff --git a/tap_postgres/client.py b/tap_postgres/client.py
index c2ddf3fd..dd3a276b 100644
--- a/tap_postgres/client.py
+++ b/tap_postgres/client.py
@@ -284,24 +284,42 @@ def _increment_stream_state(
             state_dict["replication_key_value"] = new_value
 
     def get_records(self, context: Context | None) -> Iterable[dict[str, t.Any]]:
-        """Return a generator of row-type dictionary objects."""
-        status_interval = 5  # if no records in 5 seconds the tap can exit
+        """Return a generator of row-type dictionary objects.
+
+        Runs a long-lived replication session (up to
+        ``replication_max_run_seconds``, default 600 s) so the tap can drain
+        large WAL backlogs in a single sync. Sends periodic flush feedback
+        while yielding records so the slot releases retained WAL incrementally.
+
+        After the loop ends -- either because no data messages arrived for
+        ``replication_idle_exit_seconds`` (default 60 s) or the time budget is
+        exhausted -- the slot is advanced to the current WAL tip to prevent
+        unbounded WAL retention.
+        """
+        status_interval = 10
+        max_run_seconds = int(
+            self.config.get("replication_max_run_seconds", 600),
+        )
+        idle_exit_seconds = int(
+            self.config.get("replication_idle_exit_seconds", 60),
+        )
+        feedback_interval = 30
+
         start_lsn = self.get_starting_replication_key_value(context=context)
         if start_lsn is None:
             start_lsn = 0
+
         logical_replication_connection = self.logical_replication_connection()
         logical_replication_cursor = logical_replication_connection.cursor()
 
-        # Flush logs from the previous sync. send_feedback() will only flush LSNs before
-        # the value of flush_lsn, not including the value of flush_lsn, so this is safe
-        # even though we still want logs with an LSN == start_lsn.
         logical_replication_cursor.send_feedback(flush_lsn=start_lsn)
 
-        # get the slot name from the configuration or use the default value
-        replication_slot_name = self.config.get("replication_slot_name", "tappostgres")
+        replication_slot_name = self.config.get(
+            "replication_slot_name", "tappostgres",
+        )
 
         logical_replication_cursor.start_replication(
-            slot_name=replication_slot_name,  # use slot name
+            slot_name=replication_slot_name,
             decode=True,
             start_lsn=start_lsn,
             status_interval=status_interval,
@@ -312,35 +330,152 @@ def get_records(self, context: Context | None) -> Iterable[dict[str, t.Any]]:
             },
         )
 
-        # Using scaffolding layout from:
-        # https://www.psycopg.org/docs/extras.html#psycopg2.extras.ReplicationCursor
+        run_start = datetime.datetime.now()
+        last_data_message = run_start
+        last_feedback_time = run_start
+        records_yielded = 0
+
         while True:
+            now = datetime.datetime.now()
+            elapsed = (now - run_start).total_seconds()
+            if elapsed > max_run_seconds:
+                self.logger.info(
+                    "Reached max run time of %d seconds (%d records yielded)",
+                    max_run_seconds,
+                    records_yielded,
+                )
+                break
+
             message = logical_replication_cursor.read_message()
             if message:
+                last_data_message = datetime.datetime.now()
                 row = self.consume(message, logical_replication_cursor)
                 if row:
+                    records_yielded += 1
                     yield row
-            else:
-                timeout = (
-                    status_interval
-                    - (
-                        datetime.datetime.now() - logical_replication_cursor.feedback_timestamp
-                    ).total_seconds()
-                )
-                try:
-                    # If the timeout has passed and the cursor still has no new
-                    # messages, the sync has completed.
-                    if (
-                        select.select([logical_replication_cursor], [], [], max(0, timeout))[0]
-                        == []
-                    ):
-                        break
-                except InterruptedError:
-                    pass
+                if (
+                    datetime.datetime.now() - last_feedback_time
+                ).total_seconds() >= feedback_interval:
+                    try:
+                        logical_replication_cursor.send_feedback(
+                            flush_lsn=message.data_start,
+                        )
+                        last_feedback_time = datetime.datetime.now()
+                    except Exception:
+                        pass
+                continue
+
+            try:
+                ready = select.select(
+                    [logical_replication_cursor], [], [], 1.0,
+                )[0]
+            except InterruptedError:
+                ready = True
+
+            if not ready:
+                data_idle = (
+                    datetime.datetime.now() - last_data_message
+                ).total_seconds()
+                if data_idle >= idle_exit_seconds:
+                    self.logger.info(
+                        "No data messages for %.0f seconds, ending sync "
+                        "(%d records yielded in %.0f seconds)",
+                        data_idle,
+                        records_yielded,
+                        elapsed,
+                    )
+                    break
+
+        self._advance_slot_and_state(
+            logical_replication_cursor,
+            start_lsn,
+            context,
+        )
 
         logical_replication_cursor.close()
         logical_replication_connection.close()
 
+    def _advance_slot_and_state(
+        self,
+        replication_cursor,
+        start_lsn: int,
+        context: Context | None,
+    ) -> None:
+        """Advance the replication slot and bookmark to the current WAL tip.
+
+        When ``add-tables`` filters out most WAL records, the slot's confirmed
+        flush position can fall far behind the actual WAL tip, causing
+        PostgreSQL to retain gigabytes of WAL that will never be consumed.
+
+        This method queries the server for its current WAL flush position on a
+        separate (regular) connection and, if it is ahead of ``start_lsn``:
+
+        1. Sends ``send_feedback`` on the replication cursor so the slot can
+           release retained WAL.
+        2. Updates ``replication_key_value`` in the stream state so the next
+           sync resumes from the advanced position rather than re-scanning the
+           same WAL segment.
+
+        Records between ``start_lsn`` and the new position for *other* tables
+        are irrelevant (filtered by ``add-tables``). Any matching records for
+        *this* table that fell within the scanned window were already yielded
+        by ``get_records``; records beyond the scan window will be picked up
+        from the new, advanced position on the next run.
+        """
+        flush_lsn: int | None = None
+
+        # Prefer the wal_end reported by the server during the replication
+        # session (set from keepalive or data messages).
+        try:
+            wal_end = getattr(replication_cursor, "wal_end", 0) or 0
+            if wal_end > start_lsn:
+                flush_lsn = wal_end
+        except Exception:
+            pass
+
+        # Fallback: query the server directly for the current WAL position.
+        if not flush_lsn or flush_lsn <= start_lsn:
+            flush_lsn = self._query_current_wal_lsn()
+
+        if not flush_lsn or flush_lsn <= start_lsn:
+            return
+
+        try:
+            replication_cursor.send_feedback(flush_lsn=flush_lsn)
+            self.logger.info(
+                "Advanced replication slot confirmed position from %d to %d "
+                "(delta %.2f MB)",
+                start_lsn,
+                flush_lsn,
+                (flush_lsn - start_lsn) / (1024 * 1024),
+            )
+        except Exception as exc:
+            self.logger.warning("Failed to send final slot feedback: %s", exc)
+            return
+
+        state_dict = self.get_context_state(context)
+        state_dict["replication_key"] = self.replication_key
+        state_dict["replication_key_value"] = flush_lsn
+
+    def _query_current_wal_lsn(self) -> int | None:
+        """Query pg_current_wal_flush_lsn() and return the result as an int."""
+        try:
+            conn = psycopg2.connect(
+                self.connection_parameters.render_as_psycopg2_dsn(),
+            )
+            try:
+                conn.autocommit = True
+                with conn.cursor() as cur:
+                    cur.execute("SELECT pg_current_wal_flush_lsn()")
+                    lsn_str = cur.fetchone()[0]  # e.g. '6/4A3B2C10'
+                    hi, lo = lsn_str.split("/")
+                    return (int(hi, 16) << 32) + int(lo, 16)
+            finally:
+                conn.close()
+        except Exception as exc:
+            self.logger.warning("Could not query current WAL LSN: %s", exc)
+            return None
+
     def consume(self, message, cursor) -> dict | None:
         """Ingest WAL message."""
         try:
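[Reviewer note, illustrative only: `pg_current_wal_flush_lsn()` returns a
pg_lsn rendered as two 32-bit hex halves, 'hi/lo'. The conversion used by
`_query_current_wal_lsn`, plus its inverse (handy when inspecting slot
positions), looks like this:

    def lsn_to_int(lsn: str) -> int:
        # pg_lsn text format is '<hi32 hex>/<lo32 hex>'.
        hi, lo = lsn.split("/")
        return (int(hi, 16) << 32) + int(lo, 16)

    def int_to_lsn(value: int) -> str:
        return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"

    assert lsn_to_int("6/4A3B2C10") == 0x64A3B2C10
    assert int_to_lsn(0x64A3B2C10) == "6/4A3B2C10"

Since LSNs are byte offsets into the WAL stream, the "delta %.2f MB" figure
in the log message is simply (new - old) / 1024 / 1024.]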
From c46b5adae38ca4dacbd888cd2c1dbb9e00fec136 Mon Sep 17 00:00:00 2001
From: Kashif Sohail
Date: Fri, 13 Mar 2026 21:57:39 +0500
Subject: [PATCH 4/4] fix: Handle replication connection loss (PGRES_COPY_BOTH)
 and reduce keepalive overhead

---
 tap_postgres/client.py | 104 ++++++++++++++++++++++++++++++++---------
 1 file changed, 82 insertions(+), 22 deletions(-)

diff --git a/tap_postgres/client.py b/tap_postgres/client.py
index dd3a276b..50bcff91 100644
--- a/tap_postgres/client.py
+++ b/tap_postgres/client.py
@@ -291,18 +291,20 @@ def get_records(self, context: Context | None) -> Iterable[dict[str, t.Any]]:
         large WAL backlogs in a single sync. Sends periodic flush feedback
         while yielding records so the slot releases retained WAL incrementally.
 
-        After the loop ends -- either because no data messages arrived for
-        ``replication_idle_exit_seconds`` (default 60 s) or the time budget is
-        exhausted -- the slot is advanced to the current WAL tip to prevent
-        unbounded WAL retention.
+        Uses the server's ``wal_end`` keepalive signal to detect when the
+        server has finished scanning WAL for this table, allowing a fast
+        exit (typically 5-35 s) instead of waiting out the full idle
+        timeout. ``replication_idle_exit_seconds`` (default 60 s) acts as
+        a safety net when keepalive detection is unavailable.
         """
-        status_interval = 10
+        status_interval = 30
         max_run_seconds = int(
             self.config.get("replication_max_run_seconds", 600),
         )
         idle_exit_seconds = int(
             self.config.get("replication_idle_exit_seconds", 60),
         )
+        fast_exit_data_seconds = 5
         feedback_interval = 30
 
         start_lsn = self.get_starting_replication_key_value(context=context)
@@ -315,7 +317,8 @@ def get_records(self, context: Context | None) -> Iterable[dict[str, t.Any]]:
         logical_replication_cursor.send_feedback(flush_lsn=start_lsn)
 
         replication_slot_name = self.config.get(
-            "replication_slot_name", "tappostgres",
+            "replication_slot_name",
+            "tappostgres",
         )
 
         logical_replication_cursor.start_replication(
@@ -334,6 +337,11 @@ def get_records(self, context: Context | None) -> Iterable[dict[str, t.Any]]:
         last_data_message = run_start
         last_feedback_time = run_start
         records_yielded = 0
+        connection_lost = False
+
+        prev_wal_end = 0
+        wal_end_seen = False
+        last_wal_end_change = run_start
 
         while True:
             now = datetime.datetime.now()
""" - status_interval = 10 + status_interval = 30 max_run_seconds = int( self.config.get("replication_max_run_seconds", 600), ) idle_exit_seconds = int( self.config.get("replication_idle_exit_seconds", 60), ) + fast_exit_data_seconds = 5 feedback_interval = 30 start_lsn = self.get_starting_replication_key_value(context=context) @@ -315,7 +317,8 @@ def get_records(self, context: Context | None) -> Iterable[dict[str, t.Any]]: logical_replication_cursor.send_feedback(flush_lsn=start_lsn) replication_slot_name = self.config.get( - "replication_slot_name", "tappostgres", + "replication_slot_name", + "tappostgres", ) logical_replication_cursor.start_replication( @@ -334,6 +337,11 @@ def get_records(self, context: Context | None) -> Iterable[dict[str, t.Any]]: last_data_message = run_start last_feedback_time = run_start records_yielded = 0 + connection_lost = False + + prev_wal_end = 0 + wal_end_seen = False + last_wal_end_change = run_start while True: now = datetime.datetime.now() @@ -346,7 +354,25 @@ def get_records(self, context: Context | None) -> Iterable[dict[str, t.Any]]: ) break - message = logical_replication_cursor.read_message() + try: + message = logical_replication_cursor.read_message() + except psycopg2.DatabaseError as exc: + self.logger.warning( + "Replication connection lost after %d records in %.0f seconds: %s", + records_yielded, + elapsed, + exc, + ) + connection_lost = True + break + + current_wal_end = getattr(logical_replication_cursor, "wal_end", 0) or 0 + if current_wal_end != prev_wal_end: + if current_wal_end > 0: + wal_end_seen = True + last_wal_end_change = datetime.datetime.now() + prev_wal_end = current_wal_end + if message: last_data_message = datetime.datetime.now() row = self.consume(message, logical_replication_cursor) @@ -367,15 +393,35 @@ def get_records(self, context: Context | None) -> Iterable[dict[str, t.Any]]: try: ready = select.select( - [logical_replication_cursor], [], [], 1.0, + [logical_replication_cursor], + [], + [], + 1.0, )[0] - except InterruptedError: + except (InterruptedError, OSError): ready = True if not ready: - data_idle = ( - datetime.datetime.now() - last_data_message - ).total_seconds() + now = datetime.datetime.now() + data_idle = (now - last_data_message).total_seconds() + wal_end_stable_for = (now - last_wal_end_change).total_seconds() + + if ( + wal_end_seen + and wal_end_stable_for >= fast_exit_data_seconds + and data_idle >= fast_exit_data_seconds + ): + self.logger.info( + "Server caught up (wal_end stable for %.0fs, no data " + "for %.0fs), ending sync (%d records yielded in " + "%.0f seconds)", + wal_end_stable_for, + data_idle, + records_yielded, + elapsed, + ) + break + if data_idle >= idle_exit_seconds: self.logger.info( "No data messages for %.0f seconds, ending sync " @@ -386,14 +432,26 @@ def get_records(self, context: Context | None) -> Iterable[dict[str, t.Any]]: ) break - self._advance_slot_and_state( - logical_replication_cursor, - start_lsn, - context, - ) + if not connection_lost: + self._advance_slot_and_state( + logical_replication_cursor, + start_lsn, + context, + ) + else: + self.logger.info( + "Skipping slot advancement after connection loss to avoid " + "skipping unprocessed records. Bookmark stays at the last " + "yielded record; next run will resume from there. 
" + "(%d records yielded before disconnect)", + records_yielded, + ) - logical_replication_cursor.close() - logical_replication_connection.close() + try: + logical_replication_cursor.close() + logical_replication_connection.close() + except Exception: + pass def _advance_slot_and_state( self, @@ -443,8 +501,7 @@ def _advance_slot_and_state( try: replication_cursor.send_feedback(flush_lsn=flush_lsn) self.logger.info( - "Advanced replication slot confirmed position from %d to %d " - "(delta %.2f MB)", + "Advanced replication slot confirmed position from %d to %d (delta %.2f MB)", start_lsn, flush_lsn, (flush_lsn - start_lsn) / (1024 * 1024), @@ -467,7 +524,10 @@ def _query_current_wal_lsn(self) -> int | None: conn.autocommit = True with conn.cursor() as cur: cur.execute("SELECT pg_current_wal_flush_lsn()") - lsn_str = cur.fetchone()[0] # e.g. '6/4A3B2C10' + row = cur.fetchone() + if row is None: + return None + lsn_str = row[0] # e.g. '6/4A3B2C10' hi, lo = lsn_str.split("/") return (int(hi, 16) << 32) + int(lo, 16) finally: