[All] Graceful close on arrow streams#258

Open
elenagaljak-db wants to merge 4 commits into main from arrow-graceful-stream-close

Conversation

@elenagaljak-db
Contributor

What changes are proposed in this pull request?

  • Adds stream_paused_max_wait_time_ms config option to ArrowStreamConfigurationOptions across all SDKs (Rust, Go, Python, TypeScript, Java)
  • Implements client-side graceful close state machine: when the server signals it will close the stream, the SDK enters a paused state, stops sending new batches but drains in-flight ack responses before triggering recovery
  • Graceful close recoveries don't count against the recovery_retries budget, since they're intentional server-side restarts (e.g. rolling deploys)
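As a rough illustration of the paused-wait semantics described in this PR, the effective wait could be computed as follows (a sketch only; `paused_wait` is an invented name, not an SDK function):

```rust
use std::time::Duration;

// Sketch only: how long the SDK might stay paused after a close signal.
// None = wait the full server-announced duration, Some(0) = recover
// immediately, Some(cap) = wait up to min(cap, server duration).
fn paused_wait(client_max_ms: Option<u64>, server_duration_ms: u64) -> Duration {
    match client_max_ms {
        None => Duration::from_millis(server_duration_ms),
        Some(cap) => Duration::from_millis(cap.min(server_duration_ms)),
    }
}
```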

How is this tested?

  • 5 new integration tests in rust/tests/src/arrow_tests.rs covering: full server duration wait, immediate recovery, client-capped wait, early exit when all batches acked, ingest accepted during grace period

Signed-off-by: elenagaljak-db <elena.galjak@databricks.com>
@elenagaljak-db elenagaljak-db force-pushed the arrow-graceful-stream-close branch from e64589d to e5de6d8 Compare April 29, 2026 14:47
));
}

is_paused.store(true, Ordering::Relaxed);
Contributor

I think the is_paused flag is set but never read on the send path. We flip it to true here once a close signal is received, but send_flight_data_internal never checks it, so ingest calls during the grace period continue to push FlightData onto the existing batch_tx mpsc channel and out over the DoPut stream that the server has already announced it is closing.

In send_flight_data_internal, after appending to pending_batches, we can skip the sender.send(...) loop while is_paused.load(Relaxed) is true (still return Ok(offset_id)). The supervisor will initialize a fresh batch_tx on recovery and the pending batches will be replayed.
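A rough sketch of the suggested guard (the signature and types here are invented for illustration; the real send_flight_data_internal differs):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Sender;

// Sketch only: append to pending_batches unconditionally, but skip the
// channel send while the stream is paused. The supervisor is assumed to
// replay pending batches over a fresh channel after recovery.
fn send_flight_data_internal(
    is_paused: &AtomicBool,
    pending_batches: &mut Vec<Vec<u8>>,
    sender: &Sender<Vec<u8>>,
    batch: Vec<u8>,
) -> Result<usize, String> {
    pending_batches.push(batch.clone());
    let offset_id = pending_batches.len() - 1;
    if !is_paused.load(Ordering::Relaxed) {
        sender.send(batch).map_err(|e| e.to_string())?;
    }
    Ok(offset_id) // caller still gets an offset even while paused
}
```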

* Returns the maximum time in milliseconds to wait during graceful stream close.
*
* <p>When the server sends a close stream signal, the SDK enters a "paused" state where it stops
* accepting new batches but continues processing acknowledgments for in-flight batches.
Contributor


I think this is a bit confusingly worded: the SDK doesn't stop accepting batches, it just stops sending them.

Comment thread typescript/src/lib.rs
flush_timeout_ms: opts.flush_timeout_ms.map(|v| v as u64).unwrap_or(default.flush_timeout_ms),
connection_timeout_ms: opts.connection_timeout_ms.map(|v| v as u64).unwrap_or(default.connection_timeout_ms),
ipc_compression,
stream_paused_max_wait_time_ms: opts.stream_paused_max_wait_time_ms.map(|v| v as u64),
Contributor


Since the TS SDK depends on a pinned Rust SDK version, I don't think we should update TS here; instead, first release a new Rust SDK version and then pick up that version in TS.

Comment thread python/rust/src/arrow.rs
/// Maximum time in milliseconds to wait during graceful stream close.
/// None = wait full server duration, 0 = immediate recovery, >0 = wait up to min(this, server_duration).
#[pyo3(get, set)]
pub stream_paused_max_wait_time_ms: Option<i64>,
Contributor


Let's just add a negative check in to_rust, like we do for the other fields.
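Something along these lines (a sketch; the helper name and error text are illustrative, not the actual to_rust code):

```rust
// Sketch only: reject negative values when converting the Python-facing
// Option<i64> into the Rust config's Option<u64>.
fn validate_paused_wait(value: Option<i64>) -> Result<Option<u64>, String> {
    match value {
        Some(v) if v < 0 => Err(format!(
            "stream_paused_max_wait_time_ms must be non-negative, got {v}"
        )),
        Some(v) => Ok(Some(v as u64)),
        None => Ok(None),
    }
}
```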


// Capture whether this was a graceful close recovery before resetting.
// Graceful close recoveries should not consume the retry budget.
let was_graceful_close = is_paused.load(Ordering::Relaxed);
Contributor


The supervisor decides whether to skip retry-budget accounting by reading is_paused after process_acks returns. But for two of the three close-signal branches in process_acks, is_paused is never set to true before the function returns:

  • stream_paused_max_wait_time_ms == Some(0) - returns at line 1205 without touching is_paused.
  • wait_duration_ms == 0 (the server says "close now", or min(client_max, server_duration) == 0) - returns at line 1218 without touching is_paused.

In both cases was_graceful_close is observed as false, so the supervisor falls through to the else branch at line 836 and runs recovery_attempts.fetch_add(1, Relaxed), consuming a retry slot.
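One possible shape of the fix (a sketch with invented names, not the SDK's actual code): set the flag once at the top of the close-signal handling, before any early return, so every branch is observed as graceful:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Sketch only: mark the pause flag before computing the wait, so even the
// zero-wait branches leave is_paused == true for the supervisor to read.
fn on_close_signal(
    is_paused: &AtomicBool,
    client_max_ms: Option<u64>,
    server_duration_ms: u64,
) -> u64 {
    is_paused.store(true, Ordering::Relaxed); // before any early return
    match client_max_ms {
        Some(0) => 0,                             // immediate recovery
        Some(cap) => cap.min(server_duration_ms), // client-capped wait
        None => server_duration_ms,               // full server duration
    }
}
```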
