
Cooperative memory reclaim via async MemoryReclaimer #22043

Draft
JanKaul wants to merge 4 commits into apache:main from Embucket:memory-reclaimer


@JanKaul (Contributor) commented May 6, 2026

Adds an async hook that lets a MemoryPool ask other consumers to free memory before failing an allocation.

This PR is complementary to #21425, not a replacement. It offers a concrete alternative to broaden the design discussion there; it is not meant to supersede that work.

Design

  • trait MemoryReclaimer (async), attached to a MemoryConsumer via with_reclaimer. Methods: reclaim(target), optional reclaimable_bytes, optional priority.
  • MemoryPool::try_grow_async — default delegates to sync try_grow. TrackConsumersPool overrides it to walk registered reclaimers (priority desc, size desc) on OOM, retry the grow after
    each, then fall through to inner.try_grow_async so a wrapped reclaim-aware pool isn't shadowed.
  • Operator-side state machine (SortExec): a channel-based ExternalSorterReclaimer hands a oneshot to the partition's stream loop. The loop runs tokio::select! biased { reclaim_rx.recv() … ; input.next() … } and, on a reclaim request, spills end-to-end before replying with the freed-byte count. The stream loop is the sole owner of the sorter's batches, so the spill is ordered before the report: the bytes the pool sees are bytes already on disk.
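
The reclaim walk described in the second bullet can be sketched roughly as follows. This is a simplified, synchronous model and not the PR's code: the `Reclaimer` trait, `Pool`, `Buffered`, and `try_grow_with_reclaim` are hypothetical stand-ins, the real trait is async, and here the pool itself subtracts freed bytes, whereas in DataFusion the consumer's reservation would shrink.

```rust
use std::cmp::Reverse;

/// Hypothetical, synchronous stand-in for the PR's async MemoryReclaimer.
trait Reclaimer {
    /// Try to free up to `target` bytes; returns the bytes actually freed.
    fn reclaim(&mut self, target: usize) -> usize;
    /// How many bytes this consumer could free right now.
    fn reclaimable_bytes(&self) -> usize;
    /// Higher priority is asked first.
    fn priority(&self) -> i32 {
        0
    }
}

/// Toy pool with a hard limit and a list of registered reclaimers.
struct Pool {
    limit: usize,
    used: usize,
    reclaimers: Vec<Box<dyn Reclaimer>>,
}

impl Pool {
    /// Plain grow: fails as soon as the limit would be exceeded.
    fn try_grow(&mut self, additional: usize) -> Result<(), String> {
        if self.used + additional <= self.limit {
            self.used += additional;
            Ok(())
        } else {
            Err(format!("cannot allocate {additional} bytes"))
        }
    }

    /// Reclaim-aware grow: on failure, walk reclaimers (priority desc,
    /// size desc) and retry the grow after each one.
    fn try_grow_with_reclaim(&mut self, additional: usize) -> Result<(), String> {
        if self.try_grow(additional).is_ok() {
            return Ok(());
        }
        let mut order: Vec<usize> = (0..self.reclaimers.len()).collect();
        order.sort_by_key(|&i| {
            let r = &self.reclaimers[i];
            (Reverse(r.priority()), Reverse(r.reclaimable_bytes()))
        });
        for i in order {
            let shortfall = (self.used + additional).saturating_sub(self.limit);
            let freed = self.reclaimers[i].reclaim(shortfall);
            self.used = self.used.saturating_sub(freed);
            if self.try_grow(additional).is_ok() {
                return Ok(());
            }
        }
        // Final attempt; stands in for the fall-through to the inner pool.
        self.try_grow(additional)
    }
}

/// Toy consumer that can give back whatever it currently holds.
struct Buffered {
    held: usize,
    priority: i32,
}

impl Reclaimer for Buffered {
    fn reclaim(&mut self, target: usize) -> usize {
        let freed = self.held.min(target);
        self.held -= freed;
        freed
    }
    fn reclaimable_bytes(&self) -> usize {
        self.held
    }
    fn priority(&self) -> i32 {
        self.priority
    }
}
```

In this model, a full pool with two `Buffered` consumers first asks the higher-priority one for the whole shortfall, then asks the next one only for what is still missing.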

How this differs from #21425

  • Async trait + try_grow_async instead of sync pool.reclaim(...) — matches the channel hand-off pattern needed for cooperative spill inside DataFusion's async execution.
  • Auto-triggered on OOM rather than caller-driven.
  • Includes operator wiring for SortExec to demonstrate the full flow.
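
The channel hand-off mentioned above can be illustrated with a small sketch. It is an approximation under stated assumptions: std threads and `std::sync::mpsc` stand in for tokio tasks and the oneshot channel, polling the reclaim channel first stands in for `tokio::select! biased`, "spilling" only drops batches from memory, and `ReclaimRequest`, `ChannelReclaimer`, `run_sorter`, and `spawn_sorter` are hypothetical names.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender};
use std::thread;
use std::time::Duration;

/// A reclaim request: target bytes plus a reply channel that is used once.
struct ReclaimRequest {
    target: usize,
    reply: Sender<usize>,
}

/// Pool-side handle (hypothetical analogue of ExternalSorterReclaimer).
struct ChannelReclaimer {
    tx: Sender<ReclaimRequest>,
}

impl ChannelReclaimer {
    /// Ask the sorter loop to free memory and wait for the freed-byte count.
    fn reclaim(&self, target: usize) -> usize {
        let (reply, reply_rx) = mpsc::channel();
        if self.tx.send(ReclaimRequest { target, reply }).is_err() {
            return 0; // sorter loop has already finished
        }
        reply_rx.recv().unwrap_or(0)
    }
}

/// Sorter loop: sole owner of the buffered batches.
/// Returns (spilled bytes, bytes still buffered) when the input ends.
fn run_sorter(input: Receiver<Vec<u8>>, reclaim_rx: Receiver<ReclaimRequest>) -> (usize, usize) {
    let mut in_memory: Vec<Vec<u8>> = Vec::new();
    let mut spilled = 0usize;
    loop {
        // Biased: always look at reclaim requests before taking more input.
        if let Ok(req) = reclaim_rx.try_recv() {
            let mut freed = 0;
            while freed < req.target {
                match in_memory.pop() {
                    Some(batch) => freed += batch.len(), // the "spill" happens here
                    None => break,
                }
            }
            spilled += freed;
            // Reply only after the spill is complete.
            let _ = req.reply.send(freed);
            continue;
        }
        match input.recv_timeout(Duration::from_millis(1)) {
            Ok(batch) => in_memory.push(batch),
            Err(RecvTimeoutError::Timeout) => {}
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    let buffered = in_memory.iter().map(Vec::len).sum();
    (spilled, buffered)
}

/// Wire up the channels and start the sorter loop on its own thread.
fn spawn_sorter() -> (Sender<Vec<u8>>, ChannelReclaimer, thread::JoinHandle<(usize, usize)>) {
    let (input_tx, input_rx) = mpsc::channel();
    let (reclaim_tx, reclaim_rx) = mpsc::channel();
    let handle = thread::spawn(move || run_sorter(input_rx, reclaim_rx));
    (input_tx, ChannelReclaimer { tx: reclaim_tx }, handle)
}
```

Because only the loop touches `in_memory`, the count returned by `reclaim` always refers to batches that have already left memory, which is the ordering property the description relies on.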

@github-actions (Bot) commented May 6, 2026

Thank you for opening this pull request!

Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).

Details
     Cloning apache/main
    Building datafusion-execution v53.1.0 (current)
       Built [  29.287s] (current)
     Parsing datafusion-execution v53.1.0 (current)
      Parsed [   0.027s] (current)
    Building datafusion-execution v53.1.0 (baseline)
       Built [  29.205s] (baseline)
     Parsing datafusion-execution v53.1.0 (baseline)
      Parsed [   0.028s] (baseline)
    Checking datafusion-execution v53.1.0 -> v53.1.0 (no change; assume patch)
     Checked [   0.291s] 222 checks: 221 pass, 1 fail, 0 warn, 30 skip

--- failure auto_trait_impl_removed: auto trait no longer implemented ---

Description:
A public type has stopped implementing one or more auto traits. This can break downstream code that depends on the traits being implemented.
        ref: https://doc.rust-lang.org/reference/special-types-and-traits.html#auto-traits
       impl: https://github.com/obi1kenobi/cargo-semver-checks/tree/v0.47.0/src/lints/auto_trait_impl_removed.ron

Failed in:
  type MemoryConsumer is no longer UnwindSafe, in /home/runner/work/datafusion/datafusion/datafusion/execution/src/memory_pool/mod.rs:263
  type MemoryConsumer is no longer RefUnwindSafe, in /home/runner/work/datafusion/datafusion/datafusion/execution/src/memory_pool/mod.rs:263
  type TrackConsumersPool is no longer UnwindSafe, in /home/runner/work/datafusion/datafusion/datafusion/execution/src/memory_pool/pool.rs:410

     Summary semver requires new major version: 1 major and 0 minor checks failed
    Finished [  60.335s] datafusion-execution
    Building datafusion-physical-plan v53.1.0 (current)
       Built [  31.760s] (current)
     Parsing datafusion-physical-plan v53.1.0 (current)
      Parsed [   0.131s] (current)
    Building datafusion-physical-plan v53.1.0 (baseline)
       Built [  31.563s] (baseline)
     Parsing datafusion-physical-plan v53.1.0 (baseline)
      Parsed [   0.132s] (baseline)
    Checking datafusion-physical-plan v53.1.0 -> v53.1.0 (no change; assume patch)
     Checked [   0.826s] 222 checks: 222 pass, 30 skip
     Summary no semver update required
    Finished [  65.813s] datafusion-physical-plan
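
One plausible cause of the auto_trait_impl_removed failure, assuming (not confirmed from the diff) that MemoryConsumer now stores the reclaimer as a trait object: a plain `dyn Trait` is neither UnwindSafe nor RefUnwindSafe, so any struct holding `Arc<dyn Trait>` loses both. The sketch below uses hypothetical names (`Reclaimer`, `ConsumerPlain`, `ConsumerBounded`, `Noop`) and shows one way to keep the auto traits by naming them on the trait object.

```rust
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::sync::Arc;

/// Hypothetical stand-in for the reclaimer trait (sync here for brevity).
trait Reclaimer: Send + Sync {
    fn reclaim(&self, target: usize) -> usize;
}

/// Holding a plain trait object drops UnwindSafe and RefUnwindSafe
/// for the whole struct.
#[allow(dead_code)]
struct ConsumerPlain {
    name: String,
    reclaimer: Option<Arc<dyn Reclaimer>>,
}

/// Naming the auto traits on the trait object restores them.
type SafeReclaimer = dyn Reclaimer + UnwindSafe + RefUnwindSafe;

struct ConsumerBounded {
    name: String,
    reclaimer: Option<Arc<SafeReclaimer>>,
}

/// Compile-time check: only callable for types with both auto traits.
fn assert_unwind_safe<T: UnwindSafe + RefUnwindSafe>() {}

/// Trivial reclaimer used to build a value of the bounded type.
struct Noop;

impl Reclaimer for Noop {
    fn reclaim(&self, _target: usize) -> usize {
        0
    }
}

fn check() {
    assert_unwind_safe::<ConsumerBounded>();
    // assert_unwind_safe::<ConsumerPlain>(); // would not compile
}
```

The trade-off is that every reclaimer implementation must then itself be unwind safe, which may or may not be acceptable for the PR.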

github-actions Bot added the "auto detected api change" label May 6, 2026
@JanKaul (Contributor, Author) commented May 7, 2026

If I use this branch to query a larger-than-memory dataset, I get:

Error: Resources exhausted: Additional allocation failed for ExternalSorterMerge[3] with top memory consumers (across reservations) as:
  ExternalSorter[6]#13(can spill: true) consumed 597.0 MB, peak 597.0 MB,
  ExternalSorter[12]#138(can spill: true) consumed 597.0 MB, peak 597.0 MB,
  ExternalSorter[2]#11(can spill: true) consumed 596.6 MB, peak 596.6 MB.
Error: Failed to allocate additional 192.0 KB for ExternalSorterMerge[3] with 277.7 MB already allocated for this reservation - 2.2 KB remain available for the total memory pool: greedy(used: 8.0 GB, pool_size: 8.0 GB)

With vanilla DataFusion, it is the ExternalSorter that fails to allocate memory, so this branch appears to solve memory reclaim for a single operator. However, the next operator, ExternalSorterMerge, now fails instead, so this solution doesn't handle cross-operator reclamation.

I think we need a hierarchical design, with a tree of MemoryPools and Reclaimers, so that we have full control. I think the Velox design would be a really good fit.
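
As a rough illustration of what a hierarchical design could mean here, the sketch below models a tree in which the root arbitrates: when any operator's request would exceed the limit, the root reclaims from spillable nodes anywhere in the tree before granting it. This is one possible reading, not the Velox API and not a proposal from this PR; `PoolNode`, `find_mut`, and `try_grow` are hypothetical.

```rust
use std::cmp::Reverse;

/// Hypothetical node in a pool tree: leaves are operators,
/// inner nodes only aggregate their children.
struct PoolNode {
    name: String,
    used: usize,     // bytes reserved directly by this node
    spillable: bool, // whether this node can give memory back
    children: Vec<PoolNode>,
}

impl PoolNode {
    /// Bytes used by this node and everything below it.
    fn total_used(&self) -> usize {
        self.used + self.children.iter().map(PoolNode::total_used).sum::<usize>()
    }

    /// Reclaim up to `target` bytes from this subtree, largest child first.
    fn reclaim(&mut self, target: usize) -> usize {
        let mut freed = 0;
        if self.spillable {
            freed = self.used.min(target);
            self.used -= freed;
        }
        self.children.sort_by_key(|c| Reverse(c.total_used()));
        for child in &mut self.children {
            if freed >= target {
                break;
            }
            freed += child.reclaim(target - freed);
        }
        freed
    }

    /// Depth-first lookup of a node by name.
    fn find_mut(&mut self, name: &str) -> Option<&mut PoolNode> {
        if self.name == name {
            return Some(self);
        }
        self.children.iter_mut().find_map(|c| c.find_mut(name))
    }
}

/// Root-level grow: reclaim across the whole tree before failing.
fn try_grow(
    root: &mut PoolNode,
    limit: usize,
    requester: &str,
    additional: usize,
) -> Result<(), String> {
    let shortfall = (root.total_used() + additional).saturating_sub(limit);
    if shortfall > 0 {
        root.reclaim(shortfall);
    }
    if root.total_used() + additional > limit {
        return Err(format!("cannot allocate {additional} bytes for {requester}"));
    }
    let node = root
        .find_mut(requester)
        .ok_or_else(|| format!("unknown consumer {requester}"))?;
    node.used += additional;
    Ok(())
}
```

In this model a non-spillable merge node can still obtain memory because the root can take it from a sibling sorter, which is the cross-operator case that fails above.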


Labels

  • auto detected api change: Auto detected API change
  • execution: Related to the execution crate
  • physical-plan: Changes to the physical-plan crate
