perf: reclaim capacity in take_n during OOM-triggered partial-aggregation emit#22205
Open
RyanJamesStewart wants to merge 1 commit into
Open
perf: reclaim capacity in take_n during OOM-triggered partial-aggregation emit#22205RyanJamesStewart wants to merge 1 commit into
RyanJamesStewart wants to merge 1 commit into
Conversation
…tion emit Per apache#22164, the three Vec-based GroupColumn::take_n implementations (bytes.rs, primitive.rs, bytes_view.rs) use drain(0..n).collect() to extract the first-n elements. drain does not free capacity, so self.offsets / self.group_values / self.views keeps the pre-emit allocation, defeating the memory-pressure signal that triggered the early emit in the first place. Replace with mem::take + Vec::split_off(n) so the retained side becomes the freshly-sized buffer (capacity reclaimed) and the emitted side owns the original allocation, which is consumed and dropped immediately downstream (ScalarBuffer::from(Vec) is zero-copy via Vec::into_raw_parts). Allocation count is unchanged; the win is correct capacity assignment. Bytes.rs gets a secondary win: the subsequent first_n_offsets.push(offset_n) no longer triggers a realloc, since first_n_offsets now carries the pre-emit capacity rather than the exact-n capacity from collect(). boolean.rs::take_n is already capacity-correct via swap-then-truncate over BooleanBufferBuilder; out of scope. Tests: cargo test -p datafusion-physical-plan --lib aggregates::group_values passes (26 tests including test_byte_take_n, test_byte_view_take_n, test_byte_view_take_n_partial_completed_nonzero_index, test_emit_first_n_for_vectorized_group_values).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Closes #22164.
Rationale for this change
When partial hash aggregation hits the memory limit and switches to early-emit mode (
EmitTo::First(n)), three of the fourGroupColumn::take_nimplementations extract the first n elements withdrain(0..n).collect::<Vec<_>>():bytes.rs:383(self.offsets)primitive.rs:270(self.group_values)bytes_view.rs:366(self.views)As the issue reporter noted:
draindoes not affect the Vec's capacity, soself.offsets/self.group_values/self.viewskeeps the pre-emit allocation with reduced length. The whole point of the OOM-triggered early emit is to release memory; with the current code the builder ends up holding the same buffer it had a moment before the emit, which defeats the signal.(The fourth implementation,
boolean.rs::take_n, already does the right thing via swap-then-truncate overBooleanBufferBuilder; out of scope.)What changes are included in this PR?
For each of the three Vec-based call sites, replace
drain(0..n).collect()withstd::mem::take+Vec::split_off(n):Vec::split_off(at)keeps[0, at)inselfwith the original capacity unchanged and returns a new Vec for[at, len)sized to its actual length. Combined withmem::take, the retained side becomes the freshly-sized buffer and the emitted side owns the original allocation. That is the correct assignment for OOM-emit because:ScalarBuffer::from(Vec<T>), which is zero-copy viaVec::into_raw_parts; the original buffer travels into the outputArrayRefand is freed when downstream consumers drop it.Allocation accounting. Same allocation count as before.
mem::takeswaps withVec::new()(no allocation);split_offallocates exactly one Vec for the tail; the originaldrain(..n).collect()allocated exactly one Vec for the head. The fix changes which side gets the new allocation, not how many allocations happen. The win is correct capacity assignment, not reduced allocation work.Bonus on
bytes.rs. The drain-then-collect is immediately followed byfirst_n_offsets.push(offset_n)to close the offset range. Withcollect(),first_n_offsetshas capacity exactlyn, so the push triggered a reallocation. Withsplit_off,first_n_offsetscarries the pre-emit capacity, and the push fits without reallocating.Diff is 16 insertions, 3 deletions across the three files.
Are these changes tested?
Yes, by existing coverage:
test_byte_take_n(bytes.rs)test_byte_view_take_n,test_byte_view_take_n_partial_completed_nonzero_index(bytes_view.rs)test_emit_first_n_for_vectorized_group_values,test_hashtable_modifying_in_emit_first_nexercise the partial-aggregation emit path end-to-end through theGroupColumntrait, coveringprimitive.rs's implementation.cargo test -p datafusion-physical-plan --lib aggregates::group_valuespasses (26 tests, 0 failed).cargo clippy -p datafusion-physical-plan --lib -- -D warningsis clean.No new test added: the existing tests pin the take_n behavior (output array correctness and remaining-side state), and the fix is behaviorally identical at that level. The change is in which Vec each side ends up holding. The only observable difference is heap reservation on the retained side, which is what the issue reports;
Vec::capacity()is implementation-defined and asserting on it from a test would be flaky under allocator changes, so I have not added one. Happy to add a capacity assertion if you'd prefer it; flag and I'll push.Are there any user-facing changes?
No. No API changes, no behavior change at the operator level. Purely an internal memory-pressure improvement on the partial-aggregation early-emit path.
AI-assisted. The fix shape (mem::take + split_off direction) and the implementation are mine; I used Claude to help survey the three call sites and check that I had not missed a
take_nimpl elsewhere ingroup_values/. The substantive comprehension steps I went through:ScalarBuffer::from(Vec<T>) -> Buffer::from(Vec<T>) -> Vec::into_raw_parts, which is what justifies routing the original allocation to the emitted side.run benchmarkson a non-OOM workload. The improvement is qualitative (memory-pressure semantics), not throughput.boolean.rs::take_nis already capacity-correct (different primitive,BooleanBufferBuilderswap-then-truncate) so the PR is correctly scoped to the three Vec-based sites.Named unknown: I have not measured the heap-reservation improvement under an actual OOM-bound workload (e.g.
clickbench_partitionedwith a memory limit). The fix is justified by the mechanism analytically; an empirical peak-RSS confirmation belongs in a follow-up if you want the perf-evidence side strengthened.