[All] Graceful close on arrow streams #258
Signed-off-by: elenagaljak-db <elena.galjak@databricks.com>
    is_paused.store(true, Ordering::Relaxed);
I think the is_paused flag is never checked where it matters. We flip it to true here once a close signal is received, but send_flight_data_internal never reads it, so ingest calls during the grace period keep pushing FlightData onto the existing batch_tx mpsc channel and out over the DoPut stream that the server has already announced it is closing.
In send_flight_data_internal, after appending to pending_batches, we can skip the sender.send(...) loop while is_paused.load(Relaxed) is true (still returning Ok(offset_id)). The supervisor will initialize a fresh batch_tx on recovery and the pending batches will be replayed.
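To illustrate the suggestion, here is a minimal sketch with hypothetical, simplified stand-ins for the SDK's internal state (StreamState, String batches, and an mpsc channel are placeholders, not the real types): the batch is always buffered, but the send onto batch_tx is skipped while paused.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;

// Hypothetical, simplified stand-ins for the SDK's internal stream state.
struct StreamState {
    is_paused: AtomicBool,
    pending_batches: Vec<String>,   // stands in for buffered FlightData
    batch_tx: mpsc::Sender<String>, // stands in for the DoPut channel
}

impl StreamState {
    /// Sketch of send_flight_data_internal: always buffer the batch,
    /// but skip pushing onto batch_tx while the stream is paused.
    fn send_flight_data_internal(
        &mut self,
        batch: String,
    ) -> Result<usize, mpsc::SendError<String>> {
        self.pending_batches.push(batch.clone());
        let offset_id = self.pending_batches.len() - 1;
        if self.is_paused.load(Ordering::Relaxed) {
            // Grace period: the batch stays in pending_batches and will be
            // replayed once the supervisor installs a fresh batch_tx.
            return Ok(offset_id);
        }
        self.batch_tx.send(batch)?;
        Ok(offset_id)
    }
}
```

The caller still gets Ok(offset_id) either way, so ingest-side code does not need to know whether the stream is in its grace period.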
    * Returns the maximum time in milliseconds to wait during graceful stream close.
    *
    * <p>When the server sends a close stream signal, the SDK enters a "paused" state where it stops
    * accepting new batches but continues processing acknowledgments for in-flight batches.
I think this is a bit confusingly worded: the SDK doesn't stop accepting new batches, it just stops sending them.
    flush_timeout_ms: opts.flush_timeout_ms.map(|v| v as u64).unwrap_or(default.flush_timeout_ms),
    connection_timeout_ms: opts.connection_timeout_ms.map(|v| v as u64).unwrap_or(default.connection_timeout_ms),
    ipc_compression,
    stream_paused_max_wait_time_ms: opts.stream_paused_max_wait_time_ms.map(|v| v as u64),
Since the TS SDK depends on a pinned Rust SDK version, I don't think we should update TS here; instead, we should first release a new Rust SDK version and then use that version in TS.
    /// Maximum time in milliseconds to wait during graceful stream close.
    /// None = wait full server duration, 0 = immediate recovery, >0 = wait up to min(this, server_duration).
    #[pyo3(get, set)]
    pub stream_paused_max_wait_time_ms: Option<i64>,
Let's just add a negative check in to_rust like we do for other fields.
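A sketch of what that check could look like, pulled out as a free function for clarity (the function name and error type are hypothetical; the real to_rust presumably raises a Python-side error via PyO3 instead of returning a String):

```rust
/// Hypothetical sketch of the to_rust validation: reject negative values
/// for stream_paused_max_wait_time_ms, mirroring the non-negative checks
/// applied to the other timeout fields before the i64 -> u64 conversion.
fn validate_stream_paused_max_wait_time_ms(
    value: Option<i64>,
) -> Result<Option<u64>, String> {
    match value {
        Some(v) if v < 0 => Err(format!(
            "stream_paused_max_wait_time_ms must be non-negative, got {v}"
        )),
        // Safe cast: v is known to be >= 0 here.
        Some(v) => Ok(Some(v as u64)),
        // None keeps its meaning of "wait the full server duration".
        None => Ok(None),
    }
}
```

Validating before the `as u64` cast matters because a bare `-1 as u64` would silently wrap to a huge wait time instead of failing loudly.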
    // Capture whether this was a graceful close recovery before resetting.
    // Graceful close recoveries should not consume the retry budget.
    let was_graceful_close = is_paused.load(Ordering::Relaxed);
The supervisor decides whether to skip retry-budget accounting by reading is_paused after process_acks returns. But for two of the three close-signal branches in process_acks, is_paused is never set to true before the function returns:

- stream_paused_max_wait_time_ms == Some(0): returns at line 1205 without touching is_paused.
- wait_duration_ms == 0 (server says "close now", or min(client_max, server_duration) == 0): returns at line 1218 without touching is_paused.

In both cases was_graceful_close is observed as false, so the supervisor falls through to the else branch at line 836 and runs recovery_attempts.fetch_add(1, Relaxed), consuming a retry slot.
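One way to make all three branches consistent is to flip is_paused before any early return. A minimal sketch, assuming a simplified close-signal handler (the function name and signature here are hypothetical, not the real process_acks):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical simplification of the close-signal handling in
/// process_acks: mark the close as graceful *before* computing the wait
/// duration, so every branch (including the zero-wait early returns)
/// leaves is_paused set and the supervisor skips retry-budget accounting.
fn wait_duration_on_close(
    is_paused: &AtomicBool,
    client_max_wait_ms: Option<u64>,
    server_duration_ms: u64,
) -> u64 {
    // Set first: even an immediate (0 ms) recovery is still a graceful close.
    is_paused.store(true, Ordering::Relaxed);

    match client_max_wait_ms {
        Some(0) => 0, // client opted out of waiting: recover immediately
        Some(max) => max.min(server_duration_ms),
        None => server_duration_ms, // wait the full server-announced duration
    }
}
```

With this shape, was_graceful_close reads true for all three branches, and only genuine failures consume recovery_attempts.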