
feat(sparkey): Deduplicate SparkeyReader across DoFn clones#5918

Closed
spkrka wants to merge 1 commit into spotify:main from spkrka:krka/reader-dedup

Conversation


@spkrka spkrka commented Apr 3, 2026

Summary

Beam creates one DoFn clone per vCPU thread (e.g. 80 on an n4-standard-80 worker). Previously, each clone independently called uri.getReader(), which downloads files from GCS and opens new SparkeyReader instances — duplicating file I/O, file descriptors, and mmap regions.

This adds a CompletableFuture-based reader cache so the first thread loads the reader and all others wait on the same future. The reader is reused for the JVM lifetime, which is safe for Dataflow batch (one pipeline per JVM).

Changes

  • SparkeySideInput, LargeMapSideInput, and LargeSetSideInput now go through SparkeySideInput.getOrCreateReader() instead of calling uri.getReader() directly
  • Reader cache is a ConcurrentHashMap[String, CompletableFuture[SparkeyReader]] keyed by URI path
  • checkMemory is preserved but called once per reader (inside the cache loader) instead of on every get() call
  • No API changes, no new dependencies, no version bumps
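A minimal sketch of the caching approach described above, with hypothetical names (`Reader`, `ReaderCache`, `getOrCreateReader`) standing in for the real `SparkeyReader` and the cache inside `SparkeySideInput`:

```scala
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
import java.util.concurrent.atomic.AtomicInteger

// Stand-in for the real SparkeyReader; `loads` counts loader invocations.
final class Reader(val path: String)

object ReaderCache {
  val loads = new AtomicInteger(0)

  private val readers =
    new ConcurrentHashMap[String, CompletableFuture[Reader]]()

  // The first caller per path runs the (expensive) load; every other
  // thread joins the same future and gets the shared reader back.
  def getOrCreateReader(path: String): Reader =
    readers
      .computeIfAbsent(
        path,
        _ =>
          CompletableFuture.supplyAsync { () =>
            loads.incrementAndGet() // download + open (+ checkMemory) runs once
            new Reader(path)
          }
      )
      .join()
}
```

Because the map value is the future rather than the reader, a slow load never blocks `computeIfAbsent` itself, only the callers that need that particular path.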

Why this is safe

  • No behavior change for correct programs: readers were already expected to be long-lived; this just ensures they're shared
  • Thread safety: SparkeyReader from Sparkey.open() returns a PooledSparkeyReader which is already thread-safe
  • Cache semantics: keyed by path, never evicted. Acceptable for Dataflow batch where one pipeline runs per JVM. For DirectRunner/REPL reuse, stale readers could be returned if the same path is rewritten — same limitation as the existing checkMemory which also uses a static reference

What this does NOT include

This is intentionally minimal. The heap-backed reader fallback and memory-aware shard placement from #5903 / #5904 are separate concerns and will be proposed separately (likely as opt-in features).

Test plan

  • Existing sparkey tests pass (no behavior change for single-reader scenarios)
  • Verified on production Dataflow pipeline (n4-standard-80, 80 DoFn clones) — readers are loaded once instead of 80 times

🤖 Generated with Claude Code

.computeIfAbsent(
  uri.path,
  _ =>
    CompletableFuture.supplyAsync { () =>
spkrka (Member, author) commented:
Note: using a future here to support loading multiple readers in parallel if they are expensive to create - currently that's not true, but if we add loading/locking/load to heap later it might be.
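For illustration, the future-returning shape would let a caller kick off several loads before joining any of them (hypothetical `request`/`loadAll` helpers, with strings standing in for readers):

```scala
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}

object ParallelLoad {
  private val cache =
    new ConcurrentHashMap[String, CompletableFuture[String]]()

  // Returns the future without joining, so loads can overlap in the pool.
  def request(path: String): CompletableFuture[String] =
    cache.computeIfAbsent(
      path,
      p => CompletableFuture.supplyAsync(() => s"reader:$p") // stand-in load
    )

  def loadAll(paths: List[String]): List[String] = {
    val futures = paths.map(request) // start every load first
    futures.map(_.join())            // then wait for all of them
  }
}
```

As the comment notes, this only pays off if one thread actually requests several side inputs (or the loads become expensive); with a single path per call, the future is joined immediately.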

Contributor commented:

🤔 But is the CompletableFuture relevant if its call is always immediately followed by .join()? Removing the CompletableFuture might be simpler. If keeping it, consider leaving a comment explaining its motivation.
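The simplification the reviewer suggests could look like the sketch below (a string standing in for the reader): `computeIfAbsent` already guarantees the mapping function runs at most once per key, with concurrent callers for that key blocking until it completes, so the future adds nothing when every call joins immediately.

```scala
import java.util.concurrent.ConcurrentHashMap

object SimpleCache {
  private val readers = new ConcurrentHashMap[String, String]()

  // The mapping function runs at most once per key; concurrent callers
  // for the same key block inside the map until it completes.
  def getOrCreateReader(path: String): String =
    readers.computeIfAbsent(path, p => s"reader:$p") // stand-in for the load
}
```

One caveat to this simpler shape: the load runs while holding the map bin's lock, so unrelated keys that hash to the same bin can also be briefly blocked during a slow load.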

spkrka (Member, author) replied:

Hm, perhaps you're right and I reasoned incorrectly about this. My hope was that multiple threads loading the same side inputs would still allow loading multiple things in parallel, but if the file ordering is stable, that doesn't actually help. I'll think more about this and potentially simplify.

codecov bot commented Apr 3, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 61.72%. Comparing base (1480594) to head (2ce6af5).
⚠️ Report is 3 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #5918      +/-   ##
==========================================
+ Coverage   61.71%   61.72%   +0.01%     
==========================================
  Files         317      317              
  Lines       11654    11660       +6     
  Branches      815      827      +12     
==========================================
+ Hits         7192     7197       +5     
- Misses       4462     4463       +1     


Beam creates one DoFn clone per vCPU thread (e.g. 80 on n4-standard-80).
Previously, each clone independently called uri.getReader() which downloads
files from GCS and opens new SparkeyReader instances — duplicating work
and wasting file descriptors and mmap regions.

This adds a static ConcurrentHashMap<String, CompletableFuture<SparkeyReader>>
cache so the first thread loads the reader and all others wait on the same
future. The reader is reused for the lifetime of the JVM, which is safe for
Dataflow batch (one pipeline per JVM).

The cache is used by SparkeySideInput, LargeMapSideInput, and
LargeSetSideInput. No API changes, no new dependencies.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@spkrka spkrka force-pushed the krka/reader-dedup branch from efdd017 to 2ce6af5 Compare April 3, 2026 16:42
Comment on lines +609 to +611
val reader = uri.getReader(rfu)
checkMemory(reader)
reader
Contributor commented:
Suggested change:
- val reader = uri.getReader(rfu)
- checkMemory(reader)
- reader
+ checkMemory(uri.getReader(rfu))

Can this be simplified?


@spkrka spkrka closed this Apr 15, 2026
