fix: track join_arrays memory in reservation after SMJ spill#21962

Open
SubhamSinghal wants to merge 7 commits into apache:main from SubhamSinghal:smj-spill-join-arrays-memory-accounting

Conversation

@SubhamSinghal
Contributor

Which issue does this PR close?

Related to the TODO at materializing_stream.rs:283 (from #17429): spilled BufferedBatch join key arrays are not tracked in memory reservation.

Rationale for this change

When a BufferedBatch is spilled to disk in Sort Merge Join, only the RecordBatch data is written to the IPC file. The join_arrays (evaluated join key columns) remain in memory because the merge-scan comparator needs them to detect key group boundaries.
Before this fix, these in-memory join_arrays were invisible to the memory pool:

```
allocate_reservation():
  try_grow(size_estimation) → FAILS (pool full)
  spill batch to disk
  → join_arrays still in memory, but reservation was never grown
  → pool thinks 0 bytes are used for this batch

free_reservation():
  if InMemory → shrink(size_estimation)
  if Spilled  → no-op ← correct (nothing was grown), but join_arrays are invisible
```

With many spilled batches for a skewed key (e.g., millions of rows sharing the same join key), the untracked join_arrays memory accumulates. The memory pool cannot account for this when making spill decisions for concurrent operators.
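The pre-fix gap can be sketched with simplified stand-in types (`PreFixPool` and `pre_fix_tracked` are hypothetical names for illustration, not the DataFusion API):

```rust
// Hypothetical, simplified stand-ins for the pre-fix accounting; not the
// actual DataFusion types. Illustrates how a spilled batch's retained
// join_arrays end up invisible to the pool.

struct PreFixPool {
    limit: usize,
    reserved: usize,
}

impl PreFixPool {
    fn try_grow(&mut self, bytes: usize) -> Result<(), ()> {
        if self.reserved + bytes > self.limit {
            Err(()) // pool full: caller falls back to spilling
        } else {
            self.reserved += bytes;
            Ok(())
        }
    }
}

/// Bytes the pool ends up tracking for one batch under the pre-fix logic.
fn pre_fix_tracked(pool: &mut PreFixPool, size_estimation: usize) -> usize {
    match pool.try_grow(size_estimation) {
        Ok(()) => size_estimation, // InMemory: fully tracked
        // Spilled: the RecordBatch goes to disk, but the join_arrays stay
        // resident in memory with nothing reserved for them.
        Err(()) => 0,
    }
}

fn main() {
    let mut pool = PreFixPool { limit: 500, reserved: 400 };
    // A 200-byte batch cannot fit, so it spills...
    let tracked = pre_fix_tracked(&mut pool, 200);
    // ...and the pool still believes nothing is held for this batch, even
    // though the join key arrays remain in memory.
    assert_eq!(tracked, 0);
    assert_eq!(pool.reserved, 400);
}
```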

What changes are included in this PR?

Memory accounting fix (materializing_stream.rs):

  • Add reserved_amount field to BufferedBatch — tracks how much memory was actually reserved in the pool for this batch
  • Add join_arrays_mem() helper — computes total memory of join key arrays
  • allocate_reservation(): after spilling, calls try_grow(join_arrays_mem) to track the remaining in-memory data. If the pool is too tight for even that, reserved_amount stays 0 (best-effort, safe)
  • free_reservation(): shrinks by reserved_amount instead of checking InMemory variant. Invariant: only shrink by what was actually grown — no underflow risk
| Scenario | try_grow | reserved_amount | try_shrink | Safe? |
|---|---|---|---|---|
| InMemory | Ok(size_estimation) | size_estimation | size_estimation | Yes |
| Spilled, tracked | Ok(join_arrays_mem) | join_arrays_mem | join_arrays_mem | Yes |
| Spilled, pool tight | Err | 0 | 0 (no-op) | Yes |
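The three rows above can be sketched end to end with simplified stand-ins (`TrackedPool` and the two free functions are illustrative, not the actual materializing_stream.rs code):

```rust
// Simplified stand-ins for the fixed accounting; not the DataFusion API.
struct TrackedPool {
    limit: usize,
    reserved: usize,
}

impl TrackedPool {
    fn try_grow(&mut self, bytes: usize) -> Result<(), ()> {
        if self.reserved + bytes > self.limit {
            Err(())
        } else {
            self.reserved += bytes;
            Ok(())
        }
    }
    fn shrink(&mut self, bytes: usize) {
        self.reserved -= bytes;
    }
}

/// allocate_reservation: returns the batch's reserved_amount.
fn allocate_reservation(
    pool: &mut TrackedPool,
    size_estimation: usize,
    join_arrays_mem: usize,
) -> usize {
    if pool.try_grow(size_estimation).is_ok() {
        size_estimation // InMemory: fully tracked
    } else if pool.try_grow(join_arrays_mem).is_ok() {
        join_arrays_mem // Spilled: only the retained join_arrays tracked
    } else {
        0 // Spilled, pool too tight: best-effort, free becomes a no-op
    }
}

/// free_reservation: shrink by exactly what was grown, so no underflow.
fn free_reservation(pool: &mut TrackedPool, reserved_amount: usize) {
    pool.shrink(reserved_amount);
}

fn main() {
    let mut pool = TrackedPool { limit: 500, reserved: 450 };
    // A 200-byte batch cannot fit, but its 40-byte join_arrays can.
    let reserved = allocate_reservation(&mut pool, 200, 40);
    assert_eq!(reserved, 40);
    assert_eq!(pool.reserved, 490);
    free_reservation(&mut pool, reserved);
    assert_eq!(pool.reserved, 450); // balanced on free
}
```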

Tests (tests.rs):

  • spill_many_batches_same_key — 10+5 batches all sharing key=1, verifies correctness under heavy spilling
  • spill_string_join_keys — Utf8 join keys to exercise larger join_arrays footprint
  • spill_mixed_keys_some_match — multiple distinct keys with partial matching, tests Full outer join NULL rows from spilled batches
  • spill_join_arrays_memory_accounting — verifies memory pool is fully released after join completes (memory_pool.reserved() == 0) and peak_mem_used > 0

Are these changes tested?

Yes. Four new tests added covering heavy spilling with same-key batches, string join keys, mixed keys with partial matching, and memory pool accounting verification.

Are there any user-facing changes?

No.

@github-actions github-actions Bot added the physical-plan Changes to the physical-plan crate label Apr 30, 2026
@mbutrovich mbutrovich self-requested a review April 30, 2026 15:38
@mbutrovich
Contributor

mbutrovich commented Apr 30, 2026

Thanks for picking this up, the accounting fix itself reads cleanly and the reserved_amount invariant (only shrink by what you grew) is easy to follow.

A few things I wanted to ask about:

Tests that fail without the fix?

overallocation_multi_batch_spill already covers the same shape as spill_many_batches_same_key: N+M batches all on one key, 500-byte limit, same join types, same [1, 50] batch sizes, same spilled-vs-non-spilled equality check. The new tests assert the same kinds of things, so I think they pass on main too.

spill_join_arrays_memory_accounting asserts pool.reserved() == 0 after the join, but that was already true before this PR: pre-fix, spilled batches never grew the reservation, so there was nothing left to release either.

Since this is a pure accounting fix with no observable behavior change, could you point to a test here that fails on main and passes on this branch? If one is hard to construct, I wonder if we'd be better off with a single targeted assertion added to an existing spill test (something like a mid-execution check that pool.reserved() reflects the spilled batches' join_arrays), rather than ~440 lines of new tests. Happy to hear if I'm missing what these catch.

peak_mem_used on the spill path

In allocate_reservation, the Ok(_) arm calls peak_mem_used().set_max(self.reservation.size()), but the new try_grow(join_arrays_mem) after a spill doesn't. For spill-heavy workloads that metric will under-report now that the pool is actually being grown on that path. Worth mirroring the set_max call there?
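The suggested mirroring can be sketched with a hypothetical helper (the real call is `peak_mem_used().set_max(self.reservation.size())`; this is a simplified stand-in, not the DataFusion metrics API):

```rust
// Hypothetical helper standing in for peak_mem_used().set_max().
fn track_peak(peak: &mut usize, reservation_size: usize) {
    *peak = (*peak).max(reservation_size);
}

fn main() {
    let mut peak = 0usize;
    // The InMemory arm already records the peak after try_grow succeeds:
    track_peak(&mut peak, 300);
    // The spill arm would mirror it after try_grow(join_arrays_mem),
    // so spill-heavy workloads no longer under-report:
    track_peak(&mut peak, 340);
    assert_eq!(peak, 340);
}
```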

Test duplication

The three spill_* tests are structurally very similar (spill run + baseline, nested batch_size and join_type loops). Would a small helper parameterized over (left, right, on, memory_limit) be worth it, or do you think the explicit form reads better here?

Minor

  • reserved_amount: 0, // set by allocate_reservation() is already covered by the doc comment on the field itself, could probably drop the inline comment.
  • The field doc listing the three cases (InMemory / Spilled-tracked / Spilled-untracked) is really helpful, thanks for including it.

Overall the change looks safe and the invariant is nice. Mainly curious about the test angle before signing off.

@SubhamSinghal
Contributor Author

@mbutrovich Thanks for reviewing this PR. I have removed the redundant tests and added a unit test that always hits the spill path; with it, peak_memory is 0 on main and non-zero on this branch.

I have also added the set_max call on the spill path.

Contributor

@kosiew kosiew left a comment


@SubhamSinghal
Thanks for working on this. I think there is still a memory accounting gap in the spill path that needs to be addressed before this can merge.

```rust
// usually succeed. If it fails, reserved_amount stays 0 -
// best-effort tracking, free_reservation will safely be a no-op.
let join_arrays_mem = buffered_batch.join_arrays_mem();
if self.reservation.try_grow(join_arrays_mem).is_ok() {
```

I think the spill path can still leave retained join key arrays invisible to the memory pool.

Right now, if the full batch try_grow(size_estimation) fails because the pool is full, and the follow-up try_grow(join_arrays_mem) also fails, we spill the IPC batch but still push buffered_batch with reserved_amount = 0.

At that point the operator is still holding the retained join_arrays, but the pool is no longer aware of them when making spill decisions for other operators. This seems like the same invariant violation we were trying to avoid.

I think this can still happen with concurrent reservations or when the memory limit is below a single join-array allocation, and in those cases many skewed spilled batches could accumulate untracked memory.

Can we make the retained in-memory portion accounted deterministically here? For example, by growing or resizing the reservation after the physical memory is retained, or by returning an error instead of continuing untracked.

It would also be great to add a regression test that covers the no-headroom path where try_grow(join_arrays_mem) fails, since the current test only exercises the successful reservation case.

@SubhamSinghal
Copy link
Copy Markdown
Contributor Author

SubhamSinghal commented May 6, 2026

Thanks for the review @kosiew

I've updated the implementation to use unconditional grow() instead. The join key arrays are physically in memory. Not tracking them gives the pool a stale view and could let concurrent operators over-allocate. Since grow() is infallible (returns ()), the accounting is now deterministic — no conditional path.
I've also added a regression test (spill_join_arrays_no_headroom) that configures the pool smaller than join_arrays_mem,
verifying:

  • The join completes correctly
  • peak_mem_used > 0 (force-grow always tracks)
  • runtime.memory_pool.reserved() == 0 after completion (grow/shrink balanced)
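The force-grow scheme can be sketched with simplified stand-ins (`ForcePool` and the function below are illustrative, not the actual code): since grow() is infallible, the spilled batch's join_arrays are tracked deterministically even in the no-headroom case.

```rust
// Simplified stand-ins (not the DataFusion API) for the force-grow scheme.
struct ForcePool {
    limit: usize,
    reserved: usize,
}

impl ForcePool {
    fn try_grow(&mut self, bytes: usize) -> Result<(), ()> {
        if self.reserved + bytes > self.limit {
            Err(())
        } else {
            self.reserved += bytes;
            Ok(())
        }
    }
    // Infallible: records the bytes even past the limit.
    fn grow(&mut self, bytes: usize) {
        self.reserved += bytes;
    }
    fn shrink(&mut self, bytes: usize) {
        self.reserved -= bytes;
    }
}

/// Returns reserved_amount for the batch.
fn allocate_reservation(
    pool: &mut ForcePool,
    size_estimation: usize,
    join_arrays_mem: usize,
) -> usize {
    if pool.try_grow(size_estimation).is_ok() {
        size_estimation // InMemory
    } else {
        // Spill path: the join key arrays are physically retained, so
        // account for them unconditionally. This may push reserved past
        // the limit; that is intentional, the memory really is in use.
        pool.grow(join_arrays_mem);
        join_arrays_mem
    }
}

fn main() {
    // No-headroom case: the limit is smaller than join_arrays_mem.
    let mut pool = ForcePool { limit: 30, reserved: 0 };
    let reserved = allocate_reservation(&mut pool, 200, 40);
    assert_eq!(reserved, 40);
    assert!(pool.reserved > pool.limit); // deliberately over the limit
    pool.shrink(reserved);
    assert_eq!(pool.reserved, 0); // grow/shrink balanced after completion
}
```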

@mbutrovich
Copy link
Copy Markdown
Contributor

Thanks for turning this around quickly, the force-grow version is easier to reason about than the conditional one.

A few things I wanted to ask about before signing off:

grow() can push over the pool limit

reservation.grow() is infallible, so in the no-headroom case (which spill_join_arrays_no_headroom exercises) we'll end up with pool.reserved() > memory_limit. I think that's the right call since the memory is physically there, but it does mean the next operator to call try_grow sees a pool that's already over limit and fails. Could we add a one-line comment at the grow() site noting this is intentional? Future readers running into a "why is the pool over limit" question will appreciate the pointer.

Recomputing join_arrays memory

BufferedBatch::new already sums get_array_memory_size() over join_arrays as part of size_estimation (line 262-265). join_arrays_mem() walks the same arrays again on the spill path. Cheap, but would you be open to caching the sum as a field during construction? One less place for the two size calculations to drift if someone touches the estimation formula later.
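The caching idea could look roughly like this (field and constructor shapes are hypothetical; the real constructor sums get_array_memory_size() over Arrow arrays, stood in for here by a slice of byte counts):

```rust
// Hypothetical sketch: cache the join_arrays byte total at construction so
// size_estimation and the spill-path accounting use the same number.
struct BufferedBatchSketch {
    size_estimation: usize,
    join_arrays_mem: usize, // cached once; no second walk on the spill path
}

impl BufferedBatchSketch {
    fn new(batch_bytes: usize, join_array_sizes: &[usize]) -> Self {
        // Stand-in for summing get_array_memory_size() over join_arrays.
        let join_arrays_mem: usize = join_array_sizes.iter().sum();
        Self {
            // Assumed estimation formula for illustration only.
            size_estimation: batch_bytes + join_arrays_mem,
            join_arrays_mem,
        }
    }
}

fn main() {
    let b = BufferedBatchSketch::new(100, &[16, 24]);
    assert_eq!(b.join_arrays_mem, 40);
    assert_eq!(b.size_estimation, 140);
}
```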

try_shrink vs shrink on free

Given the invariant that we only shrink by what we grew, would shrink() read more symmetrically with the force-grow on the alloc side? The ? on try_shrink(reserved_amount) is propagating a Result that shouldn't be reachable. Not a blocker, just noticed the asymmetry.

peak_mem assertion strength

The two new tests check peak_mem > 0, which catches the "never tracked at all" regression nicely. Would peak_mem >= join_arrays_mem be worth tightening to? That would also catch a future bug where we track something but not the right amount. Happy to leave as-is if you think the current form is enough.

Perf-wise I looked at this against #20729 and I don't think the concerns there apply here. The extra grow() only fires on the spill branch, and spilling already pays for disk I/O, so one more pool call on a custom backend is in the noise. Non-spill workloads see the same pool-call count as before.
