fix: propagate errors from take in sort merge join spill path #20480
andygrove wants to merge 1 commit into apache:main
The spill path in `fetch_right_columns_from_batch_by_idxs` used `vec.extend(take(column, indices, None))` which silently swallowed errors. Since `Result<T>` implements `IntoIterator` (yielding 0 items on `Err`), any error from `take` would silently drop that column from the output rather than propagating the error. Replace with `vec.push(take(...)?)` to properly propagate errors, and also fix incorrect `Vec::with_capacity(buffered_indices.len())` which pre-allocated based on row count instead of column count. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
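The mechanism described above can be reproduced with the standard library alone. The following is a minimal sketch, not the DataFusion code: `fallible`, `collect_with_extend`, and `collect_with_push` are hypothetical names, and `fallible` stands in for a per-column `take` call.

```rust
// Hypothetical stand-in for a fallible per-column operation such as `take`.
fn fallible(ok: bool) -> Result<i32, String> {
    if ok {
        Ok(7)
    } else {
        Err("index out of bounds".to_string())
    }
}

// Buggy pattern: `extend` treats each `Result` as an iterator of 0 or 1 items,
// so an `Err` contributes nothing and the error value is discarded.
fn collect_with_extend(flags: &[bool]) -> Vec<i32> {
    let mut out = Vec::new();
    for &ok in flags {
        out.extend(fallible(ok));
    }
    out
}

// Fixed pattern: `?` returns the first error to the caller.
// The capacity is sized by the number of inputs (the "column count" here).
fn collect_with_push(flags: &[bool]) -> Result<Vec<i32>, String> {
    let mut out = Vec::with_capacity(flags.len());
    for &ok in flags {
        out.push(fallible(ok)?);
    }
    Ok(out)
}

fn main() {
    let flags = [true, false, true];
    println!("{:?}", collect_with_extend(&flags)); // prints [7, 7]
    println!("{:?}", collect_with_push(&flags)); // prints Err("index out of bounds")
}
```

The first call returns a shorter vector with no sign that anything failed, which is the silent column drop the description refers to.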
jonathanc-n
left a comment
This LGTM, nice catch
    ) -> Result<Vec<ArrayRef>> {
        match &buffered_batch.batch {
            // In memory batch
            // In memory batch
Expanded upwards and found this duplicate comment. If possible, can you quickly remove it? 😆
kosiew
left a comment
Thanks for working on this.
    /// (yielding 0 items on `Err`), any `take` error would silently drop
    /// that column from the output instead of propagating the error.
    #[test]
    fn spill_path_matches_in_memory_path() -> Result<()> {
The new regression test only covers the happy path.
It does not exercise the silent error-swallowing bug that this patch fixes.
The previous bug appeared when take(...) returned Err. Because Result<T, E> implements IntoIterator, vec.extend(result) would quietly add zero items instead of failing.
In this test, all indices are in bounds, so every take(...) succeeds. The old buggy implementation would also pass this test.
Please add a failure-path case. Use an out-of-bounds index (or another way to make take return an error).
The test should verify that the spilled path returns an error rather than a truncated column list.
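As a rough illustration of the requested failure-path case, here is a standard-library sketch. It does not use Arrow or the real `BufferedBatch`: `take_stub`, `fetch_buggy`, and `fetch_fixed` are hypothetical stand-ins, and `take_stub` is assumed to return an error for an out-of-bounds index.

```rust
type Column = Vec<i32>;

// Hypothetical stand-in for `take`: returns an error for an out-of-bounds index.
fn take_stub(column: &Column, indices: &[usize]) -> Result<Column, String> {
    indices
        .iter()
        .map(|&i| {
            column
                .get(i)
                .copied()
                .ok_or_else(|| format!("index {} out of bounds", i))
        })
        .collect()
}

// Models the old spill path: errors vanish inside `extend`.
fn fetch_buggy(columns: &[Column], indices: &[usize]) -> Vec<Column> {
    let mut out = Vec::new();
    for column in columns {
        out.extend(take_stub(column, indices));
    }
    out
}

// Models the patched spill path: the first error is returned to the caller.
fn fetch_fixed(columns: &[Column], indices: &[usize]) -> Result<Vec<Column>, String> {
    let mut out = Vec::with_capacity(columns.len());
    for column in columns {
        out.push(take_stub(column, indices)?);
    }
    Ok(out)
}

fn main() {
    let columns = vec![vec![1, 2, 3], vec![10, 20, 30]];

    // Happy path: both versions agree, so this case cannot detect the bug.
    assert_eq!(
        fetch_buggy(&columns, &[0, 2]),
        fetch_fixed(&columns, &[0, 2]).unwrap()
    );

    // Failure path: the old version returns a truncated (here empty) column
    // list, while the patched version reports the error.
    assert!(fetch_buggy(&columns, &[0, 5]).is_empty());
    assert!(fetch_fixed(&columns, &[0, 5]).is_err());
}
```

Only the last two assertions distinguish the two implementations, which is why a test with in-bounds indices alone would pass against the old code.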
    let mut buffered_cols: Vec<ArrayRef> = Vec::new();
    for batch in reader {
        batch?.columns().iter().for_each(|column| {
            buffered_cols.extend(take(column, &buffered_indices, None))
        });
        for column in batch?.columns() {
            buffered_cols.push(take(column, buffered_indices, None)?);
The spilled branch still reimplements the column-by-column take logic.
It would be clearer if both branches shared a small helper over &[ArrayRef].
Alternatively, the spilled branch could call take_arrays(batch.columns(), buffered_indices, None) after reading the spilled batch.
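One possible shape for the shared helper is sketched below using only the standard library. The names `take_one`, `take_all`, `Batch`, and `fetch` are hypothetical; `take_all` plays the role that a helper over `&[ArrayRef]` (or `take_arrays`) would play in the real code, and `Batch::Spilled` models the batches read back from a spill file as an in-memory list.

```rust
type Column = Vec<i32>;

// Hypothetical stand-in for a single-column `take`.
fn take_one(column: &Column, indices: &[usize]) -> Result<Column, String> {
    indices
        .iter()
        .map(|&i| {
            column
                .get(i)
                .copied()
                .ok_or_else(|| format!("index {} out of bounds", i))
        })
        .collect()
}

// Shared helper: collecting an iterator of `Result`s into `Result<Vec<_>, _>`
// stops at the first error and returns it.
fn take_all(columns: &[Column], indices: &[usize]) -> Result<Vec<Column>, String> {
    columns.iter().map(|c| take_one(c, indices)).collect()
}

// Hypothetical model of the two batch states.
enum Batch {
    InMemory(Vec<Column>),
    Spilled(Vec<Vec<Column>>), // batches as read back from a spill file
}

fn fetch(batch: &Batch, indices: &[usize]) -> Result<Vec<Column>, String> {
    match batch {
        Batch::InMemory(columns) => take_all(columns, indices),
        Batch::Spilled(batches) => {
            let mut out = Vec::new();
            for columns in batches {
                // `?` unwraps first, so `extend` receives a Vec, not a Result.
                out.extend(take_all(columns, indices)?);
            }
            Ok(out)
        }
    }
}

fn main() {
    let columns = vec![vec![1, 2, 3], vec![10, 20, 30]];
    let in_memory = Batch::InMemory(columns.clone());
    let spilled = Batch::Spilled(vec![columns]);

    assert_eq!(fetch(&in_memory, &[2, 0]), fetch(&spilled, &[2, 0]));
    assert!(fetch(&spilled, &[9]).is_err());
}
```

With this structure both branches go through the same error-propagating code, so a regression like the original `extend` bug would have to be introduced in one place rather than slipping into a single branch.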
    /// Creates a `BufferedBatch` with `BufferedBatchState::Spilled` by writing
    /// the given batch to an IPC temp file.
    fn make_spilled_batch(batch: &RecordBatch) -> Result<BufferedBatch> {
make_spilled_batch writes IPC files directly with StreamWriter, which is fine for this test, but using the existing spill machinery (SpillManager) would keep the test closer to the production path.
Summary
- Fix the `fetch_right_columns_from_batch_by_idxs` spill path, where `vec.extend(take(...))` silently dropped errors (since `Result<T>` implements `IntoIterator`, yielding 0 items on `Err`)
- Replace with `vec.push(take(...)?)` to properly propagate errors
- Fix incorrect `Vec::with_capacity(buffered_indices.len())`, which pre-allocated based on row count instead of column count

Test plan
- `cargo test -p datafusion-physical-plan sort_merge_join`: all 49 tests pass
- `cargo clippy -p datafusion-physical-plan -- -D warnings`: clean
- `spill_path_matches_in_memory_path` test verifies column-for-column parity between spill and in-memory paths

🤖 Generated with Claude Code