Skip to content

Commit 97c986a

Browse files
spkrka and claude committed
Share sparkey readers across DoFn instances with memory-aware heap fallback
- Shared reader cache: a static ConcurrentHashMap deduplicates readers across all DoFn instances on a worker (was: one reader per vCPU thread). Readers are loaded concurrently via a dedicated 4-thread daemon pool.
- HostMemoryTracker: estimates an off-heap budget (totalPhysical - maxHeap - 2GB) and a heap budget (maxHeap - max(4GB, 10%)). Each shard atomically claims from the off-heap budget first; if that is exhausted, it claims from the heap budget and opens with Sparkey.reader().useHeap(true); if neither fits, it falls back to mmap with a warning.
- Bump sparkey 3.5.1 → 3.7.0 for the heap-backed reader API.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent d59fbba commit 97c986a

File tree

6 files changed

+366
-27
lines changed

6 files changed

+366
-27
lines changed

build.sbt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ val scalaCollectionCompatVersion = "2.14.0"
115115
val scalaMacrosVersion = "2.1.1"
116116
val scalatestVersion = "3.2.19"
117117
val shapelessVersion = "2.3.13"
118-
val sparkeyVersion = "3.5.1"
118+
val sparkeyVersion = "3.7.0"
119119
val tensorFlowVersion = "1.1.0"
120120
val tensorFlowMetadataVersion = "1.16.1"
121121
val testContainersVersion = "0.44.1"
@@ -330,6 +330,10 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
330330
),
331331
ProblemFilters.exclude[MissingClassProblem](
332332
"org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter$Options"
333+
),
334+
// checkMemory replaced by HostMemoryTracker (private object, no external callers)
335+
ProblemFilters.exclude[DirectMissingMethodProblem](
336+
"com.spotify.scio.extra.sparkey.package#SparkeySideInput.checkMemory"
333337
)
334338
)
335339

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright 2026 Spotify AB.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing,
11+
* software distributed under the License is distributed on an
12+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
13+
* KIND, either express or implied. See the License for the
14+
* specific language governing permissions and limitations
15+
* under the License.
16+
*/
17+
18+
package com.spotify.scio.extra.sparkey
19+
20+
import com.sun.management.OperatingSystemMXBean
21+
import java.lang.management.ManagementFactory
22+
import java.util.concurrent.atomic.AtomicLong
23+
import org.slf4j.LoggerFactory
24+
25+
/**
 * Per-JVM memory budgets for sparkey readers, split into an off-heap pool and an on-heap pool.
 * On Dataflow the JVM heap is hardcoded to ~70% of worker memory, so only ~30% remains for
 * off-heap use (page cache, OS, kernel, etc.).
 *
 * By default sparkey readers are opened via mmap (off-heap); once that pool is exhausted,
 * readers may fall back to heap-backed mode. Claims on both pools are atomic so concurrent
 * threads coordinate correctly.
 *
 * Budget is only ever claimed, never released — readers are cached for the lifetime of the JVM
 * and never closed. Should a reader fail to open partway through (e.g. a sharded sparkey with
 * some shards already claimed), the budget claimed for those shards leaks; this is acceptable
 * because a reader failure is fatal to the pipeline.
 */
private[sparkey] class HostMemoryTracker(offHeapBudget: Long, heapBudget: Long) {
  private val logger = LoggerFactory.getLogger(this.getClass)

  // Remaining capacity of each pool; claims decrement these atomically and nothing ever
  // increments them back.
  private val offHeapRemaining = new AtomicLong(offHeapBudget)
  private val heapRemaining = new AtomicLong(heapBudget)

  logger.info(
    "Memory budgets — off-heap: {} bytes, heap: {} bytes",
    Long.box(offHeapBudget),
    Long.box(heapBudget)
  )

  /**
   * Atomically try to claim `bytes` from the off-heap budget. Returns true and reduces the
   * budget when enough remains; otherwise returns false and leaves the budget unchanged.
   */
  def tryClaimOffHeap(bytes: Long): Boolean = tryClaim(offHeapRemaining, "off-heap", bytes)

  /**
   * Atomically try to claim `bytes` from the heap budget. Returns true and reduces the
   * budget when enough remains; otherwise returns false and leaves the budget unchanged.
   */
  def tryClaimHeap(bytes: Long): Boolean = tryClaim(heapRemaining, "heap", bytes)

  // Compare-and-swap retry loop: read the remaining budget, bail out if it cannot cover the
  // request, otherwise attempt to install the decremented value; retry on contention.
  @scala.annotation.tailrec
  private def tryClaim(budget: AtomicLong, name: String, bytes: Long): Boolean = {
    val available = budget.get()
    if (available < bytes) {
      logger.debug(
        "Cannot claim {} bytes of {} budget, only {} bytes remaining",
        Long.box(bytes),
        name,
        Long.box(available)
      )
      false
    } else if (budget.compareAndSet(available, available - bytes)) {
      logger.info(
        "Claimed {} bytes of {} budget, {} bytes remaining",
        Long.box(bytes),
        name,
        Long.box(available - bytes)
      )
      true
    } else {
      tryClaim(budget, name, bytes)
    }
  }
}
86+
87+
private[sparkey] object HostMemoryTracker {
  private val logger = LoggerFactory.getLogger(this.getClass)

  private val OneGiB: Long = 1L << 30

  // The JVM's -Xmx ceiling; read fresh at each use (it is stable for the process lifetime).
  private def maxHeapBytes: Long = Runtime.getRuntime.maxMemory()

  // Hold back 2 GB of off-heap memory for OS, kernel structures, JVM class files, Beam
  // shuffle, etc.
  private val OffHeapReserveBytes: Long = 2L * OneGiB

  // Hold back 10% of max heap (at least 4 GB) for GC headroom and non-sparkey allocations.
  private val HeapReserveBytes: Long =
    math.max(4L * OneGiB, (maxHeapBytes * 0.1).toLong)

  // Off-heap budget: physical memory not owned by the heap, minus the reserve, floored at 0.
  private val offHeapBudget: Long = {
    val totalPhysical = ManagementFactory.getOperatingSystemMXBean
      .asInstanceOf[OperatingSystemMXBean]
      .getTotalPhysicalMemorySize
    math.max(0L, totalPhysical - maxHeapBytes - OffHeapReserveBytes)
  }

  // Heap budget: max heap minus the reserve, floored at 0.
  private val heapBudget: Long = math.max(0L, maxHeapBytes - HeapReserveBytes)

  logger.info(
    "Host memory — off-heap reserve: {}, heap reserve: {}",
    Long.box(OffHeapReserveBytes),
    Long.box(HeapReserveBytes)
  )

  /** JVM-wide singleton tracker shared by all sparkey readers on this worker. */
  val instance: HostMemoryTracker = new HostMemoryTracker(offHeapBudget, heapBudget)
}

scio-extra/src/main/scala/com/spotify/scio/extra/sparkey/SparkeyUri.scala

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ import java.io.File
2121
import java.net.URI
2222
import com.spotify.scio.util.{RemoteFileUtil, ScioUtil}
2323
import com.spotify.scio.extra.sparkey.instances.ShardedSparkeyReader
24-
import com.spotify.sparkey.Sparkey
25-
import com.spotify.sparkey.SparkeyReader
24+
import com.spotify.sparkey.{Sparkey, SparkeyReader}
2625
import org.apache.beam.sdk.io.FileSystems
2726
import org.apache.beam.sdk.io.fs.{EmptyMatchTreatment, MatchResult, ResourceId}
2827
import org.apache.beam.sdk.options.PipelineOptions
28+
import org.slf4j.LoggerFactory
2929

3030
import java.nio.file.Path
3131
import java.util.UUID
@@ -80,7 +80,7 @@ case class SparkeyUri(path: String) {
8080
if (!isSharded) {
8181
val path =
8282
if (isLocal) new File(basePath) else downloadRemoteUris(Seq(basePath), rfu).head.toFile
83-
Sparkey.open(path)
83+
ShardedSparkeyUri.openWithMemoryTracking(path)
8484
} else {
8585
val (basePaths, numShards) =
8686
ShardedSparkeyUri.basePathsAndCount(EmptyMatchTreatment.DISALLOW, globExpression)
@@ -113,6 +113,36 @@ case class SparkeyUri(path: String) {
113113
}
114114

115115
private[sparkey] object ShardedSparkeyUri {
116+
private val logger = LoggerFactory.getLogger(this.getClass)
117+
118+
/**
 * Open a sparkey reader using a memory-aware strategy. The combined index + log size is first
 * claimed from the off-heap (page cache) budget and the reader opened via mmap; if that pool
 * is exhausted, the heap budget is tried and the reader is opened on-heap (byte[]-backed); if
 * neither pool has room, the reader falls back to mmap anyway, with a warning.
 */
private[sparkey] def openWithMemoryTracking(
  file: File,
  tracker: HostMemoryTracker = HostMemoryTracker.instance
): SparkeyReader = {
  val index = Sparkey.getIndexFile(file)
  val log = Sparkey.getLogFile(file)
  // Both files end up resident (page cache or heap), so budget for their combined size.
  val required = index.length() + log.length()

  def openMmap(): SparkeyReader = Sparkey.open(index)

  if (tracker.tryClaimOffHeap(required)) {
    openMmap()
  } else if (tracker.tryClaimHeap(required)) {
    logger.info("Opening {} on heap ({} bytes)", index.getName, Long.box(required))
    Sparkey.reader().file(index).useHeap(true).open()
  } else {
    logger.warn(
      "Neither off-heap nor heap budget available for {} ({} bytes), falling back to mmap",
      index.getName,
      Long.box(required)
    )
    openMmap()
  }
}
145+
116146
private[sparkey] def shardsFromPath(path: String): (Short, Short) = {
117147
"part-([0-9]+)-of-([0-9]+)".r
118148
.findFirstMatchIn(path)
@@ -131,7 +161,7 @@ private[sparkey] object ShardedSparkeyUri {
131161
): Map[Short, SparkeyReader] =
132162
localBasePaths.iterator.map { path =>
133163
val (shardIndex, _) = shardsFromPath(path)
134-
val reader = Sparkey.open(new File(path + ".spi"))
164+
val reader = openWithMemoryTracking(new File(path + ".spi"))
135165
(shardIndex, reader)
136166
}.toMap
137167

scio-extra/src/main/scala/com/spotify/scio/extra/sparkey/package.scala

Lines changed: 49 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ import org.apache.beam.sdk.util.CoderUtils
3232
import org.apache.beam.sdk.values.PCollectionView
3333
import org.slf4j.LoggerFactory
3434

35+
import java.util.concurrent.{
  CancellationException,
  CompletableFuture,
  CompletionException,
  ConcurrentHashMap,
  Executors
}
import java.util.concurrent.atomic.AtomicInteger
import scala.util.hashing.MurmurHash3
3638

3739
/**
@@ -545,12 +547,11 @@ package object sparkey extends SparkeyReaderInstances with SparkeyCoders {
545547
mapFn: SparkeyReader => T
546548
) extends SideInput[T] {
547549
override def updateCacheOnGlobalWindow: Boolean = false
548-
override def get[I, O](context: DoFn[I, O]#ProcessContext): T =
549-
mapFn(
550-
SparkeySideInput.checkMemory(
551-
context.sideInput(view).getReader(RemoteFileUtil.create(context.getPipelineOptions))
552-
)
553-
)
550+
override def get[I, O](context: DoFn[I, O]#ProcessContext): T = {
551+
val uri = context.sideInput(view)
552+
val rfu = RemoteFileUtil.create(context.getPipelineOptions)
553+
mapFn(SparkeySideInput.getOrCreateReader(uri, rfu))
554+
}
554555
}
555556

556557
/**
@@ -561,8 +562,10 @@ package object sparkey extends SparkeyReaderInstances with SparkeyCoders {
561562
extends SideInput[SparkeyMap[K, V]] {
562563
override def updateCacheOnGlobalWindow: Boolean = false
563564
override def get[I, O](context: DoFn[I, O]#ProcessContext): SparkeyMap[K, V] = {
565+
val uri = context.sideInput(view)
566+
val rfu = RemoteFileUtil.create(context.getPipelineOptions)
564567
new SparkeyMap(
565-
context.sideInput(view).getReader(RemoteFileUtil.create(context.getPipelineOptions)),
568+
SparkeySideInput.getOrCreateReader(uri, rfu),
566569
CoderMaterializer.beam(context.getPipelineOptions, Coder[K]),
567570
CoderMaterializer.beam(context.getPipelineOptions, Coder[V])
568571
)
@@ -576,29 +579,53 @@ package object sparkey extends SparkeyReaderInstances with SparkeyCoders {
576579
private class LargeSetSideInput[K: Coder](val view: PCollectionView[SparkeyUri])
    extends SideInput[SparkeySet[K]] {
  override def updateCacheOnGlobalWindow: Boolean = false

  /** Builds a set view over the shared, worker-wide reader for this side input's sparkey uri. */
  override def get[I, O](context: DoFn[I, O]#ProcessContext): SparkeySet[K] = {
    val sparkeyUri = context.sideInput(view)
    val remoteFileUtil = RemoteFileUtil.create(context.getPipelineOptions)
    new SparkeySet(
      SparkeySideInput.getOrCreateReader(sparkeyUri, remoteFileUtil),
      CoderMaterializer.beam(context.getPipelineOptions, Coder[K])
    )
  }
}
585591

592+
// Readers are cached for the lifetime of the JVM and never closed. This is intentional:
593+
// Beam side inputs have no close/teardown lifecycle, and in batch pipelines the JVM exits
594+
// when the pipeline finishes. The HostMemoryTracker budget is similarly never released.
586595
private object SparkeySideInput {
587596
private val logger = LoggerFactory.getLogger(this.getClass)
588-
def checkMemory(reader: SparkeyReader): SparkeyReader = {
589-
val memoryBytes = java.lang.management.ManagementFactory.getOperatingSystemMXBean
590-
.asInstanceOf[com.sun.management.OperatingSystemMXBean]
591-
.getTotalPhysicalMemorySize
592-
if (reader.getTotalBytes > memoryBytes) {
593-
logger.warn(
594-
"Sparkey size {} > total memory {}, look up performance will be severely degraded. " +
595-
"Increase memory or use faster SSD drives.",
596-
reader.getTotalBytes,
597-
memoryBytes
598-
)
597+
598+
// Small dedicated pool for loading readers concurrently. Multiple side inputs can be
599+
// opened in parallel (useful when on-heap loading is CPU-bound), without risking
600+
// starvation of the common ForkJoinPool.
601+
private val threadCount = new AtomicInteger()
602+
private val loaderPool = Executors.newFixedThreadPool(
603+
4,
604+
r => {
605+
val t = new Thread(r)
606+
t.setDaemon(true)
607+
t.setName(s"sparkey-reader-loader-${threadCount.getAndIncrement()}")
608+
t
599609
}
600-
reader
601-
}
610+
)
611+
612+
private val readerCache =
613+
new ConcurrentHashMap[String, CompletableFuture[SparkeyReader]]()
614+
615+
def getOrCreateReader(uri: SparkeyUri, rfu: RemoteFileUtil): SparkeyReader =
616+
readerCache
617+
.computeIfAbsent(
618+
uri.path,
619+
_ =>
620+
CompletableFuture.supplyAsync(
621+
() => {
622+
logger.info("Opening shared sparkey reader for {}", uri.path)
623+
uri.getReader(rfu)
624+
},
625+
loaderPool
626+
)
627+
)
628+
.join()
602629
}
603630

604631
sealed trait SparkeyWritable[K, V] extends Serializable {
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright 2026 Spotify AB.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing,
11+
* software distributed under the License is distributed on an
12+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
13+
* KIND, either express or implied. See the License for the
14+
* specific language governing permissions and limitations
15+
* under the License.
16+
*/
17+
18+
package com.spotify.scio.extra.sparkey
19+
20+
import org.scalatest.flatspec.AnyFlatSpec
21+
import org.scalatest.matchers.should.Matchers
22+
23+
class HostMemoryTrackerTest extends AnyFlatSpec with Matchers {

  // Convenience factory so each scenario reads as (off-heap, heap) capacities.
  private def newTracker(offHeap: Long, heap: Long): HostMemoryTracker =
    new HostMemoryTracker(offHeapBudget = offHeap, heapBudget = heap)

  "HostMemoryTracker" should "claim off-heap budget when sufficient" in {
    val t = newTracker(offHeap = 100, heap = 50)
    t.tryClaimOffHeap(60) shouldBe true
    t.tryClaimOffHeap(40) shouldBe true
  }

  it should "reject off-heap claim when insufficient" in {
    val t = newTracker(offHeap = 100, heap = 50)
    t.tryClaimOffHeap(60) shouldBe true
    t.tryClaimOffHeap(50) shouldBe false
  }

  it should "claim heap budget when sufficient" in {
    val t = newTracker(offHeap = 0, heap = 100)
    t.tryClaimHeap(60) shouldBe true
    t.tryClaimHeap(40) shouldBe true
  }

  it should "reject heap claim when insufficient" in {
    val t = newTracker(offHeap = 0, heap = 100)
    t.tryClaimHeap(60) shouldBe true
    t.tryClaimHeap(50) shouldBe false
  }

  it should "handle zero budgets" in {
    val t = newTracker(offHeap = 0, heap = 0)
    t.tryClaimOffHeap(1) shouldBe false
    t.tryClaimHeap(1) shouldBe false
  }

  it should "handle exact budget claims" in {
    val t = newTracker(offHeap = 100, heap = 50)
    t.tryClaimOffHeap(100) shouldBe true
    t.tryClaimOffHeap(1) shouldBe false
  }

  it should "track off-heap and heap budgets independently" in {
    val t = newTracker(offHeap = 100, heap = 100)
    t.tryClaimOffHeap(100) shouldBe true
    t.tryClaimOffHeap(1) shouldBe false
    // exhausting off-heap must leave the heap pool untouched
    t.tryClaimHeap(100) shouldBe true
    t.tryClaimHeap(1) shouldBe false
  }

  "HostMemoryTracker.instance" should "exist as a singleton" in {
    HostMemoryTracker.instance should not be null
    // repeated accesses yield the same reference
    HostMemoryTracker.instance shouldBe theSameInstanceAs(HostMemoryTracker.instance)
  }
}

0 commit comments

Comments
 (0)