Partial hash aggregation produces batches reporting huge memory size #22526

@ariel-miculas

Describe the bug

For partial aggregation with GroupOrdering::None, a single huge batch is produced once all the input RecordBatches have been consumed. That batch is then sliced to produce output batches of batch_size rows.
In row_hash.rs, the ExecutionState::ProducingOutput(batch) state slices the large batch:

                        let remaining = batch.slice(size, num_remaining);
                        let output = batch.slice(0, size);

Unfortunately, get_array_memory_size on each of these small RecordBatches returns the physical memory of the initial huge batch, because the slices share its underlying buffers. This causes unnecessary spills in the downstream operator.

Operators such as RepartitionExec use batch.get_array_memory_size() to decide whether to spill.
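
To make the mechanism concrete, here is a minimal std-only model of a zero-copy slice. It is not Arrow's implementation: `SliceView` and `physical_memory_size` are hypothetical names standing in for an array and for a physical-size query such as get_array_memory_size. The sketch only assumes that a slice keeps a reference to the whole backing buffer.

```rust
use std::mem::size_of;
use std::sync::Arc;

/// Hypothetical stand-in for an array: a shared buffer plus a window into it.
#[derive(Clone)]
struct SliceView {
    buffer: Arc<Vec<i64>>, // shared backing allocation, never copied by `slice`
    offset: usize,
    len: usize,
}

impl SliceView {
    fn new(values: Vec<i64>) -> Self {
        let len = values.len();
        Self { buffer: Arc::new(values), offset: 0, len }
    }

    /// Zero-copy slice: only the window changes, the buffer is shared.
    fn slice(&self, offset: usize, len: usize) -> Self {
        assert!(offset + len <= self.len, "slice out of bounds");
        Self {
            buffer: Arc::clone(&self.buffer),
            offset: self.offset + offset,
            len,
        }
    }

    /// The values visible through this window.
    fn values(&self) -> &[i64] {
        &self.buffer[self.offset..self.offset + self.len]
    }

    /// Physical accounting: counts the whole backing allocation,
    /// regardless of how small the window is.
    fn physical_memory_size(&self) -> usize {
        self.buffer.capacity() * size_of::<i64>()
    }
}
```

In this model, an 8192-row slice of a 1,000,000-row view reports the same physical size as the full view, which is the behavior that misleads a downstream spill decision.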

Related to #19481

To Reproduce

See the linked PR: #22527

Expected behavior

Option 1. Avoid producing the initial huge RecordBatch

Avoid producing a large RecordBatch and slicing it afterwards; instead, emit RecordBatches of only batch_size rows at a time. This would also fix #18907.
The issue with this approach is described in #19906: Vec::drain becomes a bottleneck because it shifts all the remaining elements in the vector (though I'm not sure this is more significant than the performance impact of spilling).
Merging #15591 would probably make this solution more feasible. Alternatively, VecDeque could be considered: #19906 (comment)
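
The trade-off between the two containers can be sketched with the standard library alone. The helpers `emit_from_vec` and `emit_from_deque` below are hypothetical and operate on plain values rather than on the aggregation's group state; they only illustrate why removing a prefix from a Vec touches the remaining elements while a ring buffer does not.

```rust
use std::collections::VecDeque;

/// Emits up to `batch_size` values from the front of a Vec.
/// Draining a prefix forces the remaining elements to be shifted
/// down to index 0, so each call costs O(remaining length).
fn emit_from_vec<T>(values: &mut Vec<T>, batch_size: usize) -> Vec<T> {
    let n = batch_size.min(values.len());
    values.drain(..n).collect()
}

/// Emits up to `batch_size` values from the front of a VecDeque.
/// `pop_front` only advances the ring buffer's head, so the
/// remaining elements are not moved.
fn emit_from_deque<T>(values: &mut VecDeque<T>, batch_size: usize) -> Vec<T> {
    let n = batch_size.min(values.len());
    let mut out = Vec::with_capacity(n);
    for _ in 0..n {
        if let Some(v) = values.pop_front() {
            out.push(v);
        }
    }
    out
}
```

Both helpers return the same values; the difference is only in how much work is done on what stays behind, which matters when a large state is emitted in many small batches.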

Option 2. Fix memory accounting

Alternatively, fix the memory accounting for these small RecordBatches so that they report only the memory occupied by their slice rather than that of the entire underlying array.
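
As a rough illustration of what slice-aware accounting could compute, the sketch below sizes a window of rows rather than the backing buffers. The function names are hypothetical, validity bitmaps are ignored, and the variable-width case assumes an Arrow-style layout with an i32 offsets buffer of rows + 1 entries plus a data buffer.

```rust
use std::mem::size_of;

/// Bytes attributable to `len` rows of a fixed-width column of type `T`.
fn fixed_width_slice_bytes<T>(len: usize) -> usize {
    len * size_of::<T>()
}

/// Bytes attributable to rows `start..start + len` of a variable-width
/// column described by `offsets` (one entry per row, plus a final end offset).
fn var_width_slice_bytes(offsets: &[i32], start: usize, len: usize) -> usize {
    let end = start + len;
    assert!(end < offsets.len(), "slice out of bounds");
    // The slice needs len + 1 offsets to delimit its rows.
    let offset_bytes = (len + 1) * size_of::<i32>();
    // Data bytes actually referenced by the rows in the window.
    let data_bytes = (offsets[end] - offsets[start]) as usize;
    offset_bytes + data_bytes
}
```

For example, with the offsets `[0, 1, 3, 6, 10]` (the strings "a", "bb", "ccc", "dddd"), rows 1..3 account for 12 bytes of offsets and 5 bytes of data, independent of how large the rest of the column is.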

Additional context

No response
