fix: propagate errors from take in sort merge join spill path #20480

Open
andygrove wants to merge 1 commit into apache:main from andygrove:fix/smj-spill-error-propagation

Conversation

@andygrove
Member

Summary

  • Fix silent error swallowing in fetch_right_columns_from_batch_by_idxs spill path where vec.extend(take(...)) silently dropped errors (since Result<T> implements IntoIterator, yielding 0 items on Err)
  • Replace with vec.push(take(...)?) to properly propagate errors
  • Fix incorrect Vec::with_capacity(buffered_indices.len()) which pre-allocated based on row count instead of column count
  • Add unit test verifying spill path produces identical results to in-memory path
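The mechanism behind the first two bullets can be reproduced without Arrow. The following is a minimal sketch using only the standard library; the function names `collect_with_extend` and `collect_with_push` are hypothetical and exist only for illustration, and plain `i32` values stand in for the arrays returned by `take`.

```rust
/// Buggy pattern: `Result` implements `IntoIterator`, so `extend` accepts it,
/// appending one item on `Ok` and nothing on `Err`.
fn collect_with_extend(results: Vec<Result<i32, String>>) -> Vec<i32> {
    let mut out = Vec::new();
    for r in results {
        out.extend(r); // an `Err` contributes zero items and is discarded
    }
    out
}

/// Fixed pattern: `?` returns the first error to the caller.
fn collect_with_push(results: Vec<Result<i32, String>>) -> Result<Vec<i32>, String> {
    let mut out = Vec::with_capacity(results.len());
    for r in results {
        out.push(r?);
    }
    Ok(out)
}
```

Given the input `[Ok(1), Err("boom"), Ok(3)]`, the first function returns a shorter vector with no sign that anything failed, while the second returns the error.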

Test plan

  • cargo test -p datafusion-physical-plan sort_merge_join — all 49 tests pass
  • cargo clippy -p datafusion-physical-plan -- -D warnings — clean
  • New spill_path_matches_in_memory_path test verifies column-for-column parity between spill and in-memory paths

🤖 Generated with Claude Code

The spill path in `fetch_right_columns_from_batch_by_idxs` used
`vec.extend(take(column, indices, None))` which silently swallowed
errors. Since `Result<T>` implements `IntoIterator` (yielding 0 items
on `Err`), any error from `take` would silently drop that column from
the output rather than propagating the error.

Replace with `vec.push(take(...)?)` to properly propagate errors, and
also fix incorrect `Vec::with_capacity(buffered_indices.len())` which
pre-allocated based on row count instead of column count.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
github-actions bot added the physical-plan (Changes to the physical-plan crate) label on Feb 22, 2026
andygrove marked this pull request as ready for review on February 22, 2026, 18:04
Contributor

@jonathanc-n jonathanc-n left a comment

This LGTM, nice catch

) -> Result<Vec<ArrayRef>> {
match &buffered_batch.batch {
// In memory batch
// In memory batch
Contributor

Expanded upwards and found this duplicate comment. If possible, can you quickly remove it? 😆

@mbutrovich mbutrovich self-requested a review February 23, 2026 14:20
Contributor

@kosiew kosiew left a comment

@andygrove

Thanks for working on this.

/// (yielding 0 items on `Err`), any `take` error would silently drop
/// that column from the output instead of propagating the error.
#[test]
fn spill_path_matches_in_memory_path() -> Result<()> {
Contributor

The new regression test only covers the happy path.
It does not exercise the silent error-swallowing bug that this patch fixes.

The previous bug appeared when take(...) returned Err. Because Result<T, E> implements IntoIterator, vec.extend(result) would quietly add zero items instead of failing.

In this test, all indices are in bounds, so every take(...) succeeds. The old buggy implementation would also pass this test.

Please add a failure-path case. Use an out-of-bounds index (or another way to make take return an error).

The test should verify that the spilled path returns an error rather than a truncated column list.
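As a rough illustration of the shape such a failure-path test could take, here is a sketch that uses only the standard library. `take_stub` and `take_columns` are hypothetical stand-ins, not DataFusion or Arrow functions: `take_stub` returns `Err` for an out-of-bounds index, and `take_columns` mirrors the fixed `push(take(...)?)` loop. In Arrow itself, whether an out-of-bounds index yields `Err` or a panic may depend on the `TakeOptions` passed, so a real test would need to confirm that first.

```rust
/// Hypothetical stand-in for Arrow's `take`: selects `indices` from `column`
/// and returns an error for any out-of-bounds index.
fn take_stub(column: &[i32], indices: &[usize]) -> Result<Vec<i32>, String> {
    indices
        .iter()
        .map(|&i| {
            column.get(i).copied().ok_or_else(|| {
                format!("index {} out of bounds for length {}", i, column.len())
            })
        })
        .collect()
}

/// Column-by-column loop in the fixed style: the first error is propagated.
fn take_columns(columns: &[Vec<i32>], indices: &[usize]) -> Result<Vec<Vec<i32>>, String> {
    let mut out = Vec::with_capacity(columns.len()); // sized by column count
    for column in columns {
        out.push(take_stub(column, indices)?);
    }
    Ok(out)
}

#[test]
fn out_of_bounds_index_returns_error() {
    let columns = vec![vec![1, 2, 3], vec![4, 5, 6]];
    // Index 3 is past the end of every three-element column.
    let result = take_columns(&columns, &[0, 3]);
    assert!(result.is_err(), "expected an error, not a truncated column list");
}
```

A loop written with `extend` instead of `push(...?)` would return an empty column list for the same input, so a test of this shape distinguishes the old behavior from the new one.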

Comment on lines +1643 to +1646
  let mut buffered_cols: Vec<ArrayRef> = Vec::new();
  for batch in reader {
-     batch?.columns().iter().for_each(|column| {
-         buffered_cols.extend(take(column, &buffered_indices, None))
-     });
+     for column in batch?.columns() {
+         buffered_cols.push(take(column, buffered_indices, None)?);
Contributor

The spilled branch still reimplements the column-by-column take logic.
It would be clearer if both branches shared a small helper over &[ArrayRef].
Alternatively, the spilled branch could call take_arrays(batch.columns(), buffered_indices, None) after reading the spilled batch.
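One possible shape for such a shared helper is sketched below with the standard library only. `take_all` is a hypothetical name, and it is generic over the column type and the per-column operation so that the sketch does not depend on Arrow; in the real code the closure would wrap the call to `take` for a fixed set of indices.

```rust
/// Hypothetical shared helper: applies a fallible per-column operation to
/// every column and stops at the first error.
fn take_all<C, T, E>(
    columns: &[C],
    take_one: impl Fn(&C) -> Result<T, E>,
) -> Result<Vec<T>, E> {
    // Collecting into `Result<Vec<_>, _>` short-circuits on the first `Err`,
    // so no column can be dropped silently.
    columns.iter().map(take_one).collect()
}
```

Both the in-memory and the spilled branch could then pass their column slice to the same function, which keeps the error handling in one place.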


/// Creates a `BufferedBatch` with `BufferedBatchState::Spilled` by writing
/// the given batch to an IPC temp file.
fn make_spilled_batch(batch: &RecordBatch) -> Result<BufferedBatch> {
Contributor

make_spilled_batch writes IPC files directly with StreamWriter, which is fine for this test, but using the existing spill machinery (SpillManager) would keep the test closer to the production path.
