Bump sparkey to 3.7.0, add heap budget support for side inputs #5901
spkrka wants to merge 2 commits into spotify:main
Conversation
Codecov Report
❌ Patch coverage is
Additional details and impacted files:

@@ Coverage Diff @@
## main #5901 +/- ##
==========================================
+ Coverage 61.54% 61.67% +0.12%
==========================================
Files 317 318 +1
Lines 11653 11715 +62
Branches 822 844 +22
==========================================
+ Hits 7172 7225 +53
- Misses 4481 4490 +9
Force-pushed 1fc0253 to f3cd63f
Force-pushed 244d4fd to 57dba3f
As written, this API only helps when loading from an externally produced sparkey, but we also have ones that can be produced in the same pipeline as they are read, e.g. via the
Good point. Sorry about converting to PR form; draft form was better. I was hoping it would trigger some type of snapshot build I could use! I'll switch back.

On whether locally produced sparkeys benefit from prefetching: I think it depends on the data flow. If the sparkey files go through GCS (write, then download on each worker), the page cache will be cold and
Force-pushed b0f141e to 8faf9b3
- Upgrade sparkey from 3.5.1 to 3.7.0 (adds heap-backed reader, builder API)
- Add SparkeyReadConfig with heapBudgetBytes parameter controlling how much sparkey data is loaded into JVM heap (byte[]) vs memory-mapped files
- Shards are sorted largest-first and greedily allocated to heap until the budget is exhausted; remaining shards use mmap (current default behavior)
- Default heapBudgetBytes=0 preserves existing all-mmap behavior

Usage:

val config = SparkeyReadConfig(heapBudgetBytes = 100L * 1024 * 1024 * 1024)
sc.sparkeySideInput(path, config)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
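The largest-first greedy allocation described in the commit message can be sketched roughly as follows. This is a sketch under stated assumptions, not the PR's actual code: `Shard`, `LoadMode`, and `assignLoadModes` are hypothetical names introduced here for illustration.

```scala
// Hypothetical model of the shard-to-heap assignment: sort shards
// largest-first, put each one on heap if it still fits in the budget,
// otherwise fall back to mmap. Whether the real code keeps trying
// smaller shards after one fails to fit is an assumption.
final case class Shard(path: String, sizeBytes: Long)

sealed trait LoadMode
case object Heap extends LoadMode
case object Mmap extends LoadMode

def assignLoadModes(shards: Seq[Shard], heapBudgetBytes: Long): Map[Shard, LoadMode] = {
  val sorted = shards.sortBy(-_.sizeBytes) // largest first
  val (assignments, _) = sorted.foldLeft((Map.empty[Shard, LoadMode], heapBudgetBytes)) {
    case ((acc, remaining), shard) =>
      if (shard.sizeBytes <= remaining)
        (acc + (shard -> Heap), remaining - shard.sizeBytes) // fits: load into heap
      else
        (acc + (shard -> Mmap), remaining) // budget exhausted for this shard: mmap
  }
  assignments
}
```

With heapBudgetBytes = 0 every shard falls into the `Mmap` branch, which matches the stated default of preserving the existing all-mmap behavior.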
Force-pushed 8faf9b3 to fbe4642
SideInput.get() is called per DoFn clone (one per vCPU). With mmap this was fine (the OS deduplicates pages), but with heap-backed readers each clone allocated its own byte[] copies, causing OOM on high-vCPU machines.

Use a static ConcurrentHashMap[path, CompletableFuture[SparkeyReader]] so all DoFn instances on the same worker share a single reader. The future is created eagerly via supplyAsync so computeIfAbsent returns quickly without holding the bucket lock during slow loading.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
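A minimal sketch of that sharing scheme, assuming a stand-in `SparkeyReader` trait (the real type comes from the sparkey library) and a hypothetical `SharedReaderCache` object name:

```scala
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}

// Stand-in for the real reader type from the sparkey library.
trait SparkeyReader

object SharedReaderCache {
  // Static map shared by all DoFn clones in the same worker JVM.
  private val readers = new ConcurrentHashMap[String, CompletableFuture[SparkeyReader]]()

  def get(path: String, load: String => SparkeyReader): SparkeyReader =
    readers
      .computeIfAbsent(
        path,
        // supplyAsync kicks the (possibly slow) load off to another thread,
        // so computeIfAbsent only installs a future and returns without
        // holding the map's bucket lock for the duration of the load.
        (p: String) => CompletableFuture.supplyAsync(() => load(p))
      )
      .join() // every caller blocks on the same future and shares one reader
}
```

The key point is that the expensive work happens inside the future, not inside the `computeIfAbsent` mapping function itself, so concurrent lookups of other paths in the same hash bucket are not blocked while one reader loads.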
Closing, made a cleaner one here: #5903
Summary
- SparkeyReadConfig with heapBudgetBytes parameter controlling how much sparkey data is loaded into JVM heap (byte[]) vs memory-mapped files
- heapBudgetBytes=0 preserves existing all-mmap behavior

Usage

val config = SparkeyReadConfig(heapBudgetBytes = 100L * 1024 * 1024 * 1024)
sc.sparkeySideInput(path, config)
Motivation
Large sparkey side inputs using mmap can cause major GC and page fault overhead on Dataflow workers. Loading sparkey data into JVM heap instead eliminates page faults and allows the JVM to manage memory directly. The heap budget lets users control the tradeoff per side input.
Test plan
🤖 Generated with Claude Code