diff --git a/meltano.yml b/meltano.yml
index 37a5c443..9968648a 100644
--- a/meltano.yml
+++ b/meltano.yml
@@ -42,6 +42,12 @@ plugins:
       - name: ssl_client_private_key
         kind: password
         sensitive: true
+      - name: replication_max_run_seconds
+        kind: integer
+        description: Maximum number of seconds the LOG_BASED replication loop will run before exiting (default 600).
+      - name: replication_idle_exit_seconds
+        kind: integer
+        description: Exit LOG_BASED replication if no data messages arrive for this many seconds (default 60).
     config:
       sqlalchemy_url: postgresql://postgres:postgres@localhost:5432/postgres
     select:
diff --git a/tap_postgres/client.py b/tap_postgres/client.py
index 436cf01d..0beb9ceb 100644
--- a/tap_postgres/client.py
+++ b/tap_postgres/client.py
@@ -307,24 +307,45 @@ def _increment_stream_state(
     @override
     def get_records(self, context: Context | None) -> Iterable[dict[str, t.Any]]:
-        """Return a generator of row-type dictionary objects."""
-        status_interval = 5  # if no records in 5 seconds the tap can exit
+        """Return a generator of row-type dictionary objects.
+
+        Runs a long-lived replication session (up to
+        ``replication_max_run_seconds``, default 600 s) so the tap can drain
+        large WAL backlogs in a single sync. Sends periodic flush feedback
+        while yielding records so the slot releases retained WAL incrementally.
+
+        Uses the server's ``wal_end`` keepalive signal to detect when the
+        server has finished scanning WAL for this table, allowing a fast exit
+        (typically 5-35 s) instead of waiting out the full idle timeout.
+        ``replication_idle_exit_seconds`` (default 60 s) acts as a safety net
+        when keepalive detection is unavailable.
+        """
+        status_interval = 30
+        max_run_seconds = int(
+            self.config.get("replication_max_run_seconds", 600),
+        )
+        idle_exit_seconds = int(
+            self.config.get("replication_idle_exit_seconds", 60),
+        )
+        fast_exit_data_seconds = 5
+        feedback_interval = 30
+
         start_lsn = self.get_starting_replication_key_value(context=context)
         if start_lsn is None:
             start_lsn = 0
+
         logical_replication_connection = self.logical_replication_connection()
         logical_replication_cursor = logical_replication_connection.cursor()
-        # Flush logs from the previous sync. send_feedback() will only flush LSNs before
-        # the value of flush_lsn, not including the value of flush_lsn, so this is safe
-        # even though we still want logs with an LSN == start_lsn.
         logical_replication_cursor.send_feedback(flush_lsn=start_lsn)
-        # get the slot name from the configuration or use the default value
-        replication_slot_name = self.config.get("replication_slot_name", "tappostgres")
+        replication_slot_name = self.config.get(
+            "replication_slot_name",
+            "tappostgres",
+        )
         logical_replication_cursor.start_replication(
-            slot_name=replication_slot_name,  # use slot name
+            slot_name=replication_slot_name,
             decode=True,
             start_lsn=start_lsn,
             status_interval=status_interval,
@@ -335,34 +356,208 @@ def get_records(self, context: Context | None) -> Iterable[dict[str, t.Any]]:
             },
         )
-        # Using scaffolding layout from:
-        # https://www.psycopg.org/docs/extras.html#psycopg2.extras.ReplicationCursor
+        run_start = datetime.datetime.now()
+        last_data_message = run_start
+        last_feedback_time = run_start
+        records_yielded = 0
+        connection_lost = False
+
+        prev_wal_end = 0
+        wal_end_seen = False
+        last_wal_end_change = run_start
+
         while True:
-            message = logical_replication_cursor.read_message()
+            now = datetime.datetime.now()
+            elapsed = (now - run_start).total_seconds()
+            if elapsed > max_run_seconds:
+                self.logger.info(
+                    "Reached max run time of %d seconds (%d records yielded)",
+                    max_run_seconds,
+                    records_yielded,
+                )
+                break
+
+            try:
+                message = logical_replication_cursor.read_message()
+            except psycopg2.DatabaseError as exc:
+                self.logger.warning(
+                    "Replication connection lost after %d records in %.0f seconds: %s",
+                    records_yielded,
+                    elapsed,
+                    exc,
+                )
+                connection_lost = True
+                break
+
+            current_wal_end = getattr(logical_replication_cursor, "wal_end", 0) or 0
+            if current_wal_end != prev_wal_end:
+                if current_wal_end > 0:
+                    wal_end_seen = True
+                    last_wal_end_change = datetime.datetime.now()
+                prev_wal_end = current_wal_end
+
             if message:
+                last_data_message = datetime.datetime.now()
                 row = self.consume(message, logical_replication_cursor)
                 if row:
+                    records_yielded += 1
                     yield row
-            else:
-                timeout = (
-                    status_interval
-                    - (
-                        datetime.datetime.now() - logical_replication_cursor.feedback_timestamp
-                    ).total_seconds()
-                )
-                try:
-                    # If the timeout has passed and the cursor still has no new
-                    # messages, the sync has completed.
-                    if (
-                        select.select([logical_replication_cursor], [], [], max(0, timeout))[0]
-                        == []
-                    ):
-                        break
-                except InterruptedError:
-                    pass
-
-        logical_replication_cursor.close()
-        logical_replication_connection.close()
+                if (
+                    datetime.datetime.now() - last_feedback_time
+                ).total_seconds() >= feedback_interval:
+                    try:
+                        logical_replication_cursor.send_feedback(
+                            flush_lsn=message.data_start,
+                        )
+                        last_feedback_time = datetime.datetime.now()
+                    except Exception:
+                        pass
+                continue
+
+            try:
+                ready = select.select(
+                    [logical_replication_cursor],
+                    [],
+                    [],
+                    1.0,
+                )[0]
+            except (InterruptedError, OSError):
+                ready = True
+
+            if not ready:
+                now = datetime.datetime.now()
+                data_idle = (now - last_data_message).total_seconds()
+                wal_end_stable_for = (now - last_wal_end_change).total_seconds()
+
+                if (
+                    wal_end_seen
+                    and wal_end_stable_for >= fast_exit_data_seconds
+                    and data_idle >= fast_exit_data_seconds
+                ):
+                    self.logger.info(
+                        "Server caught up (wal_end stable for %.0fs, no data "
+                        "for %.0fs), ending sync (%d records yielded in "
+                        "%.0f seconds)",
+                        wal_end_stable_for,
+                        data_idle,
+                        records_yielded,
+                        elapsed,
+                    )
+                    break
+
+                if data_idle >= idle_exit_seconds:
+                    self.logger.info(
+                        "No data messages for %.0f seconds, ending sync "
+                        "(%d records yielded in %.0f seconds)",
+                        data_idle,
+                        records_yielded,
+                        elapsed,
+                    )
+                    break
+
+        if not connection_lost:
+            self._advance_slot_and_state(
+                logical_replication_cursor,
+                start_lsn,
+                context,
+            )
+        else:
+            self.logger.info(
+                "Skipping slot advancement after connection loss so no "
+                "unprocessed records are skipped. The bookmark stays at the "
+                "last yielded record; the next run will resume from there. "
+                "(%d records yielded before disconnect)",
+                records_yielded,
+            )
+
+        try:
+            logical_replication_cursor.close()
+            logical_replication_connection.close()
+        except Exception:
+            pass
+
+    def _advance_slot_and_state(
+        self,
+        replication_cursor,
+        start_lsn: int,
+        context: Context | None,
+    ) -> None:
+        """Advance the replication slot and bookmark to the current WAL tip.
+
+        When ``add-tables`` filters out most WAL records, the slot's confirmed
+        flush position can fall far behind the actual WAL tip, causing
+        PostgreSQL to retain gigabytes of WAL that will never be consumed.
+
+        This method queries the server for its current WAL flush position on a
+        separate (regular) connection and, if it is ahead of ``start_lsn``:
+
+        1. Sends ``send_feedback`` on the replication cursor so the slot can
+           release retained WAL.
+        2. Updates ``replication_key_value`` in the stream state so the next
+           sync resumes from the advanced position rather than re-scanning the
+           same WAL segment.
+
+        Records between ``start_lsn`` and the new position for *other* tables
+        are irrelevant (filtered out by ``add-tables``). Any matching records
+        for *this* table that fell within the scanned window were already
+        yielded by ``get_records``; records beyond the scan window will be
+        picked up from the new, advanced position on the next run.
+        """
+        flush_lsn: int | None = None
+
+        # Prefer the wal_end reported by the server during the replication
+        # session (set from keepalive or data messages).
+        try:
+            wal_end = getattr(replication_cursor, "wal_end", 0) or 0
+            if wal_end > start_lsn:
+                flush_lsn = wal_end
+        except Exception:
+            pass
+
+        # Fallback: query the server directly for the current WAL position.
+        if not flush_lsn or flush_lsn <= start_lsn:
+            flush_lsn = self._query_current_wal_lsn()
+
+        if not flush_lsn or flush_lsn <= start_lsn:
+            return
+
+        try:
+            replication_cursor.send_feedback(flush_lsn=flush_lsn)
+            self.logger.info(
+                "Advanced replication slot confirmed position from %d to %d (delta %.2f MB)",
+                start_lsn,
+                flush_lsn,
+                (flush_lsn - start_lsn) / (1024 * 1024),
+            )
+        except Exception as exc:
+            self.logger.warning("Failed to send final slot feedback: %s", exc)
+            return
+
+        state_dict = self.get_context_state(context)
+        state_dict["replication_key"] = self.replication_key
+        state_dict["replication_key_value"] = flush_lsn
+
+    def _query_current_wal_lsn(self) -> int | None:
+        """Query ``pg_current_wal_flush_lsn()`` and return the result as an int."""
+        try:
+            conn = psycopg2.connect(
+                self.connection_parameters.render_as_psycopg2_dsn(),
+            )
+            try:
+                conn.autocommit = True
+                with conn.cursor() as cur:
+                    cur.execute("SELECT pg_current_wal_flush_lsn()")
+                    row = cur.fetchone()
+                    if row is None:
+                        return None
+                    lsn_str = row[0]  # e.g. '6/4A3B2C10'
+                    hi, lo = lsn_str.split("/")
+                    return (int(hi, 16) << 32) + int(lo, 16)
+            finally:
+                conn.close()
+        except Exception as exc:
+            self.logger.warning("Could not query current WAL LSN: %s", exc)
+            return None

     def consume(self, message, cursor) -> dict | None:
         """Ingest WAL message."""
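
As a quick sanity check of the text-to-integer LSN conversion that _query_current_wal_lsn performs, the following pure-Python snippet (no server required) exercises the sample value from the inline comment. The helper name lsn_to_int is illustrative, not part of the tap:

def lsn_to_int(lsn_str: str) -> int:
    """Convert a PostgreSQL LSN string such as '6/4A3B2C10' to an integer."""
    # An LSN prints as two hex fields: the high 32 bits, '/', the low 32 bits.
    hi, lo = lsn_str.split("/")
    return (int(hi, 16) << 32) + int(lo, 16)

assert lsn_to_int("0/0") == 0
assert lsn_to_int("6/4A3B2C10") == (0x6 << 32) + 0x4A3B2C10
assert lsn_to_int("6/4A3B2C10") > lsn_to_int("5/FFFFFFFF")  # ordering survives the conversion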
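
To verify end-to-end that _advance_slot_and_state keeps the slot's confirmed position near the WAL tip, a standalone script can compare the two with pg_wal_lsn_diff. This is a minimal sketch, not part of the tap, under a few assumptions: psycopg2 is installed, the slot uses the default name "tappostgres" from the config above, and the DSN matches the local sqlalchemy_url; adjust for your environment:

import psycopg2

def retained_wal_bytes(dsn: str, slot_name: str = "tappostgres") -> int:
    """Return how many bytes of WAL the server still retains for the slot."""
    with psycopg2.connect(dsn) as conn:
        with conn.cursor() as cur:
            # pg_wal_lsn_diff() subtracts two LSNs, giving the byte distance
            # between the WAL tip and the slot's confirmed flush position.
            cur.execute(
                """
                SELECT pg_wal_lsn_diff(pg_current_wal_flush_lsn(), confirmed_flush_lsn)
                FROM pg_replication_slots
                WHERE slot_name = %s
                """,
                (slot_name,),
            )
            row = cur.fetchone()
            if row is None:
                raise ValueError(f"replication slot {slot_name!r} not found")
            return int(row[0])

lag = retained_wal_bytes("postgresql://postgres:postgres@localhost:5432/postgres")
print(f"retained WAL: {lag / (1024 * 1024):.2f} MB")  # should stay near 0 after a sync

Before this change, that figure could grow by gigabytes on a busy server whose filtered tables see little traffic; with the final send_feedback in _advance_slot_and_state, it stays bounded by the WAL written since the last run.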