@@ -26,7 +26,7 @@ import com.spotify.scio.io.{ClosedTap, EmptyTap}
2626import com .spotify .scio .testing .TestDataManager
2727import com .spotify .scio .util .{Cache , RemoteFileUtil }
2828import com .spotify .scio .values .{SCollection , SideInput }
29- import com .spotify .sparkey .{CompressionType , SparkeyReader }
29+ import com .spotify .sparkey .{CompressionType , LoadMode , SparkeyReader }
3030import org .apache .beam .sdk .transforms .{DoFn , View }
3131import org .apache .beam .sdk .util .CoderUtils
3232import org .apache .beam .sdk .values .PCollectionView
@@ -113,7 +113,11 @@ package object sparkey extends SparkeyReaderInstances with SparkeyCoders {
113113 /** Enhanced version of [[ScioContext ]] with Sparkey methods. */
114114 implicit class SparkeyScioContext (private val self : ScioContext ) extends AnyVal {
115115
116- private def sparkeySideInput [T ](basePath : String , mapFn : SparkeyReader => T ): SideInput [T ] = {
116+ private def sparkeySideInput [T ](
117+ basePath : String ,
118+ mapFn : SparkeyReader => T ,
119+ loadMode : LoadMode
120+ ): SideInput [T ] = {
117121 if (self.isTest) {
118122 val id = self.testId.get
119123 val view = TestDataManager
@@ -126,7 +130,7 @@ package object sparkey extends SparkeyReaderInstances with SparkeyCoders {
126130 val view : PCollectionView [SparkeyUri ] = self
127131 .parallelize(paths)
128132 .applyInternal(View .asSingleton())
129- new SparkeySideInput (view, mapFn)
133+ new SparkeySideInput (view, mapFn, loadMode )
130134 }
131135 }
132136
@@ -135,10 +139,18 @@ package object sparkey extends SparkeyReaderInstances with SparkeyCoders {
135139 * [[com.spotify.scio.values.SCollection.withSideInputs SCollection.withSideInputs ]]. If the
136140 * provided base path ends with "*", it will be treated as a sharded collection of Sparkey
137141 * files.
142+ *
143+ * @param loadMode
144+ * (optional) page cache prefetch mode. Use `LoadMode.ALL` to prefetch both index and log
145+ * files into memory on first access, reducing page faults for large files on slow storage.
146+ * Default is `LoadMode.NONE` (no prefetching).
138147 */
139148 @ experimental
140- def sparkeySideInput (basePath : String ): SideInput [SparkeyReader ] =
141- sparkeySideInput(basePath, identity)
149+ def sparkeySideInput (
150+ basePath : String ,
151+ loadMode : LoadMode = LoadMode .NONE
152+ ): SideInput [SparkeyReader ] =
153+ sparkeySideInput(basePath, identity, loadMode)
142154
143155 /**
144156 * Create a SideInput of `TypedSparkeyReader` from a [[SparkeyUri ]] base path, to be used with
@@ -159,7 +171,8 @@ package object sparkey extends SparkeyReaderInstances with SparkeyCoders {
159171 reader,
160172 decoder,
161173 Option (cache).getOrElse(Cache .noOp[String , T ])
162- )
174+ ),
175+ LoadMode .NONE
163176 )
164177
165178 /**
@@ -171,7 +184,11 @@ package object sparkey extends SparkeyReaderInstances with SparkeyCoders {
171184 basePath : String ,
172185 cache : Cache [String , String ]
173186 ): SideInput [CachedStringSparkeyReader ] =
174- sparkeySideInput(basePath, reader => new CachedStringSparkeyReader (reader, cache))
187+ sparkeySideInput(
188+ basePath,
189+ reader => new CachedStringSparkeyReader (reader, cache),
190+ LoadMode .NONE
191+ )
175192 }
176193
177194 /** Enhanced version of [[com.spotify.scio.values.SCollection SCollection ]] with Sparkey methods. */
@@ -476,7 +493,7 @@ package object sparkey extends SparkeyReaderInstances with SparkeyCoders {
476493 */
477494 @ experimental
478495 def asSparkeySideInput : SideInput [SparkeyReader ] =
479- new SparkeySideInput (self.applyInternal(View .asSingleton()), identity)
496+ new SparkeySideInput (self.applyInternal(View .asSingleton()), identity, LoadMode . NONE )
480497
481498 /**
482499 * Convert this SCollection to a SideInput of `SparkeyReader`, to be used with
@@ -490,7 +507,8 @@ package object sparkey extends SparkeyReaderInstances with SparkeyCoders {
490507 ): SideInput [TypedSparkeyReader [T ]] = {
491508 new SparkeySideInput (
492509 self.applyInternal(View .asSingleton()),
493- reader => new TypedSparkeyReader [T ](reader, decoder, cache)
510+ reader => new TypedSparkeyReader [T ](reader, decoder, cache),
511+ LoadMode .NONE
494512 )
495513 }
496514
@@ -504,7 +522,8 @@ package object sparkey extends SparkeyReaderInstances with SparkeyCoders {
504522 def asTypedSparkeySideInput [T ](decoder : Array [Byte ] => T ): SideInput [TypedSparkeyReader [T ]] =
505523 new SparkeySideInput (
506524 self.applyInternal(View .asSingleton()),
507- reader => new TypedSparkeyReader [T ](reader, decoder, Cache .noOp)
525+ reader => new TypedSparkeyReader [T ](reader, decoder, Cache .noOp),
526+ LoadMode .NONE
508527 )
509528
510529 /**
@@ -517,7 +536,8 @@ package object sparkey extends SparkeyReaderInstances with SparkeyCoders {
517536 ): SideInput [CachedStringSparkeyReader ] =
518537 new SparkeySideInput (
519538 self.applyInternal(View .asSingleton()),
520- reader => new CachedStringSparkeyReader (reader, cache)
539+ reader => new CachedStringSparkeyReader (reader, cache),
540+ LoadMode .NONE
521541 )
522542 }
523543
@@ -542,15 +562,17 @@ package object sparkey extends SparkeyReaderInstances with SparkeyCoders {
542562
543563 private class SparkeySideInput [T ](
544564 val view : PCollectionView [SparkeyUri ],
545- mapFn : SparkeyReader => T
565+ mapFn : SparkeyReader => T ,
566+ loadMode : LoadMode
546567 ) extends SideInput [T ] {
547568 override def updateCacheOnGlobalWindow : Boolean = false
548- override def get [I , O ](context : DoFn [I , O ]# ProcessContext ): T =
549- mapFn(
550- SparkeySideInput .checkMemory(
551- context.sideInput(view).getReader(RemoteFileUtil .create(context.getPipelineOptions))
552- )
569+ override def get [I , O ](context : DoFn [I , O ]# ProcessContext ): T = {
570+ val reader = SparkeySideInput .checkMemory(
571+ context.sideInput(view).getReader(RemoteFileUtil .create(context.getPipelineOptions))
553572 )
573+ reader.load(loadMode)
574+ mapFn(reader)
575+ }
554576 }
555577
556578 /**
0 commit comments