Skip to content
Draft
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright 2026 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.scio.parquet

import com.spotify.scio.ScioContext
import com.spotify.scio.coders.{Coder, CoderMaterializer}
import com.spotify.scio.util.ScioUtil
import com.spotify.scio.values.SCollection
import com.twitter.chill.ClosureCleaner
import org.apache.beam.sdk.io.FileSystems
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO
import org.apache.beam.sdk.transforms.DoFn.{ProcessElement, Setup}
import org.apache.beam.sdk.transforms.{DoFn, SimpleFunction}
import org.apache.beam.sdk.values.TypeDescriptor
import org.apache.hadoop.conf.Configuration
import org.slf4j.LoggerFactory

import scala.reflect.ClassTag

private[parquet] object HadoopParquet {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just to reduce duplication or is there a functional change here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to reduce duplication — no new functionality, except that I noticed in some cases Scio's derived coder was not set on the HadoopFormatIO transform. Beam probably auto-derives the same coder, but it is better to set it explicitly anyway.


/**
 * Read data from a Hadoop input format via Beam's `HadoopFormatIO`.
 *
 * @param sc
 *   the [[ScioContext]] the read transform is applied to
 * @param conf
 *   Hadoop configuration carrying input paths, format and key/value classes
 * @param projectionFn
 *   optional mapping from the Hadoop record type `A` to the output type `T`;
 *   when `None`, `A` and `T` are expected to coincide and an identity
 *   translation is installed so the output coder/type are still set explicitly
 * @param skipValueClone
 *   optional override for `HadoopFormatIO`'s value-cloning behavior; `None`
 *   keeps the transform's default
 */
def readHadoopFormatIO[A: ClassTag, T: ClassTag](
  sc: ScioContext,
  conf: Configuration,
  projectionFn: Option[A => T],
  skipValueClone: Option[Boolean]
)(implicit coder: Coder[T]): SCollection[T] = {
  val aCls = ScioUtil.classOf[A]
  val tCls = ScioUtil.classOf[T]
  // Materialize Scio's derived coder explicitly instead of relying on Beam's inference
  val beamCoder = CoderMaterializer.beam(sc, Coder[T])

  val baseRead = HadoopFormatIO
    .read[java.lang.Boolean, T]()
    // Hadoop input always emits key-value pairs, and a `Void` key causes an NPE
    // in Beam's coder, so translate the key to a constant Boolean.
    .withKeyTranslation(new SimpleFunction[Void, java.lang.Boolean]() {
      override def apply(input: Void): java.lang.Boolean = true
    })

  val readWithCloneOpt = skipValueClone match {
    case Some(skip) => baseRead.withSkipValueClone(skip)
    case None       => baseRead
  }

  val translated = projectionFn match {
    case Some(fn) =>
      val cleaned = ClosureCleaner.clean(fn) // defeat closure capture
      readWithCloneOpt.withValueTranslation(
        new SimpleFunction[A, T]() {
          // Workaround for incomplete Avro objects:
          // `SCollection#map` might throw NPE on incomplete Avro objects when the
          // runner tries to serialize them. Lifting the mapping function here
          // fixes the problem.
          override def apply(input: A): T = cleaned(input)

          override def getInputTypeDescriptor = TypeDescriptor.of(aCls)

          override def getOutputTypeDescriptor = TypeDescriptor.of(tCls)
        },
        beamCoder
      )
    case None =>
      // Identity translation: still installed so the output type descriptor and
      // coder are attached to the transform.
      readWithCloneOpt.withValueTranslation(
        new SimpleFunction[T, T]() {
          override def apply(input: T): T = input

          override def getInputTypeDescriptor: TypeDescriptor[T] = TypeDescriptor.of(tCls)

          override def getOutputTypeDescriptor = TypeDescriptor.of(tCls)
        },
        beamCoder
      )
  }

  sc.applyTransform(translated.withConfiguration(conf)).map(_.getValue)
}
}

/**
* DoFn that reports directory-level source lineage for legacy Parquet reads.
*
* @param filePattern
* The file pattern or path to report lineage for
*/
/**
 * DoFn that reports directory-level source lineage for legacy Parquet reads.
 *
 * Lineage reporting is best-effort: any failure is logged and never fails the bundle.
 * Elements are passed through unchanged.
 *
 * @param filePattern
 *   The file pattern or path to report lineage for
 */
private[parquet] class LineageReportDoFn[T](filePattern: String) extends DoFn[T, T] {

  // Transient + lazy: DoFns are serialized to workers, so the logger is
  // (re)created on first use on each worker instead of being shipped over the wire.
  @transient
  private lazy val logger = LoggerFactory.getLogger(this.getClass)

  /** Reports the pattern's parent directory as source lineage once per DoFn instance. */
  @Setup
  def setup(): Unit =
    try {
      // A trailing slash means the pattern itself denotes a directory.
      val treatAsDirectory = filePattern.endsWith("/")
      val resource = FileSystems.matchNewResource(filePattern, treatAsDirectory)
      FileSystems.reportSourceLineage(resource.getCurrentDirectory)
    } catch {
      case e: Exception =>
        logger.warn(s"Error when reporting lineage for pattern: $filePattern", e)
    }

  /** Pass-through: emits every element unchanged. */
  @ProcessElement
  def processElement(@DoFn.Element element: T, out: DoFn.OutputReceiver[T]): Unit =
    out.output(element)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@

package com.spotify.scio.parquet.avro

import java.lang.{Boolean => JBoolean}
import com.spotify.scio.ScioContext
import com.spotify.scio.coders.{Coder, CoderMaterializer}
import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT}
import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory}
import com.spotify.scio.parquet.{GcsConnectorUtil, ParquetConfiguration}
import com.spotify.scio.parquet.{
GcsConnectorUtil,
HadoopParquet,
LineageReportDoFn,
ParquetConfiguration
}
import com.spotify.scio.testing.TestDataManager
import com.spotify.scio.util.{FilenamePolicySupplier, Functions, ScioUtil}
import com.spotify.scio.values.SCollection
Expand All @@ -32,12 +36,9 @@ import org.apache.avro.reflect.ReflectData
import org.apache.avro.specific.SpecificRecord
import org.apache.beam.sdk.io._
import org.apache.beam.sdk.transforms.SerializableFunctions
import org.apache.beam.sdk.transforms.SimpleFunction
import org.apache.beam.sdk.io.fs.ResourceId
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider
import org.apache.beam.sdk.values.TypeDescriptor
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.avro.{
Expand Down Expand Up @@ -245,26 +246,14 @@ object ParquetAvroIO {
job.getConfiguration.setClass("key.class", classOf[Void], classOf[Void])
job.getConfiguration.setClass("value.class", avroClass, avroClass)

val g = ClosureCleaner.clean(projectionFn) // defeat closure
val aCls = avroClass
val oCls = ScioUtil.classOf[T]
val transform = HadoopFormatIO
.read[JBoolean, T]()
// Hadoop input always emit key-value, and `Void` causes NPE in Beam coder
.withKeyTranslation(new SimpleFunction[Void, JBoolean]() {
override def apply(input: Void): JBoolean = true
})
.withValueTranslation(new SimpleFunction[A, T]() {
// Workaround for incomplete Avro objects
// `SCollection#map` might throw NPE on incomplete Avro objects when the runner tries
// to serialized them. Lifting the mapping function here fixes the problem.
override def apply(input: A): T = g(input)
override def getInputTypeDescriptor = TypeDescriptor.of(aCls)
override def getOutputTypeDescriptor = TypeDescriptor.of(oCls)
})
.withConfiguration(job.getConfiguration)

sc.applyTransform(transform).map(_.getValue)
HadoopParquet
.readHadoopFormatIO[A, T](
sc,
job.getConfiguration,
Some(projectionFn),
None
)
.parDo(new LineageReportDoFn(filePattern))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this going to result in a new node in the graph? Why are we doing this in sequence w/ the read if it's not actually using any of the read elements; we should be doing like the scio init metrics which is just its own distinct graph create impulse -> submit parquet lineage

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am trying to follow Beam conventions and keep this metric associated with the actual read transform. This way we retain transform-level lineage (which Beam supports).

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.spotify.scio.parquet.read
import com.spotify.scio.parquet.BeamInputFile
import com.spotify.scio.parquet.read.ParquetReadFn._
import org.apache.beam.sdk.io.FileIO.ReadableFile
import org.apache.beam.sdk.io.FileSystems
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration
import org.apache.beam.sdk.io.range.OffsetRange
import org.apache.beam.sdk.transforms.DoFn._
Expand Down Expand Up @@ -167,6 +168,8 @@ class ParquetReadFn[T, R](
tracker.currentRestriction.getFrom,
if (splitGranularity == SplitGranularity.File) "end" else tracker.currentRestriction().getTo
)
FileSystems.reportSourceLineage(file.getMetadata.resourceId())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is different than the hadoop one insofar as we get every file here, right? That seems bad/annoying for using the lineage for anything

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, file-level lineage is the default approach in Beam, although we might not need that granularity directly. Both lineage metric implementations (the legacy one and the new one) handle many files well: StringSet truncates internally at 100 entries, and BoundedTrie is a data structure that stores hierarchical data efficiently.


val reader = parquetFileReader(file)
try {
val filter = options.getRecordFilter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,32 +22,32 @@ import com.spotify.parquet.tensorflow.{
TensorflowExampleParquetReader,
TensorflowExampleReadSupport
}

import java.lang.{Boolean => JBoolean}
import com.spotify.scio.ScioContext
import com.spotify.scio.coders.{Coder, CoderMaterializer}
import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT}
import com.spotify.scio.parquet.ParquetConfiguration
import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory}
import com.spotify.scio.parquet.{BeamInputFile, GcsConnectorUtil}
import com.spotify.scio.parquet.{
BeamInputFile,
GcsConnectorUtil,
HadoopParquet,
LineageReportDoFn,
ParquetConfiguration
}
import com.spotify.scio.testing.TestDataManager
import com.spotify.scio.util.ScioUtil
import com.spotify.scio.util.FilenamePolicySupplier
import com.spotify.scio.util.{FilenamePolicySupplier, ScioUtil}
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO
import org.apache.beam.sdk.io._
import org.apache.beam.sdk.io.fs.ResourceId
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider
import org.apache.beam.sdk.transforms.SerializableFunctions
import org.apache.beam.sdk.transforms.SimpleFunction
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.filter2.predicate.FilterPredicate
import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetReader}
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.tensorflow.proto.{Example, Features}
import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetReader}
import org.tensorflow.metadata.v0.Schema
import org.tensorflow.proto.{Example, Features}

import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -100,7 +100,8 @@ final case class ParquetExampleIO(path: String) extends ScioIO[Example] {
params: ReadP
): SCollection[Example] = {
val job = Job.getInstance(conf)
GcsConnectorUtil.setInputPaths(sc, job, path)
val filePattern = ScioUtil.filePattern(path, params.suffix)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am surprised suffix was not used initially. Or was it intentional?

GcsConnectorUtil.setInputPaths(sc, job, filePattern)
job.setInputFormatClass(classOf[TensorflowExampleParquetInputFormat])
job.getConfiguration.setClass("key.class", classOf[Void], classOf[Void])
job.getConfiguration.setClass("value.class", classOf[Example], classOf[Example])
Expand All @@ -115,14 +116,14 @@ final case class ParquetExampleIO(path: String) extends ScioIO[Example] {
ParquetInputFormat.setFilterPredicate(job.getConfiguration, predicate)
}

val source = HadoopFormatIO
.read[JBoolean, Example]()
// Hadoop input always emit key-value, and `Void` causes NPE in Beam coder
.withKeyTranslation(new SimpleFunction[Void, JBoolean]() {
override def apply(input: Void): JBoolean = true
})
.withConfiguration(job.getConfiguration)
sc.applyTransform(source).map(_.getValue)
HadoopParquet
.readHadoopFormatIO[Example, Example](
sc,
job.getConfiguration,
projectionFn = None,
skipValueClone = None
)
.parDo(new LineageReportDoFn(filePattern))
}

override protected def readTest(sc: ScioContext, params: ReadP): SCollection[Example] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,25 @@

package com.spotify.scio.parquet.types

import java.lang.{Boolean => JBoolean}
import com.spotify.scio.ScioContext
import com.spotify.scio.coders.{Coder, CoderMaterializer}
import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT}
import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory}
import com.spotify.scio.parquet.{BeamInputFile, GcsConnectorUtil, ParquetConfiguration}
import com.spotify.scio.util.ScioUtil
import com.spotify.scio.util.FilenamePolicySupplier
import com.spotify.scio.parquet.{
BeamInputFile,
GcsConnectorUtil,
HadoopParquet,
LineageReportDoFn,
ParquetConfiguration
}
import com.spotify.scio.util.{FilenamePolicySupplier, ScioUtil}
import com.spotify.scio.values.SCollection
import magnolify.parquet.ParquetType
import org.apache.beam.sdk.io.fs.ResourceId
import org.apache.beam.sdk.transforms.SerializableFunctions
import org.apache.beam.sdk.transforms.SimpleFunction
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO
import org.apache.beam.sdk.io.{DynamicFileDestinations, FileSystems, WriteFiles}
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider
import org.apache.beam.sdk.values.TypeDescriptor
import org.apache.beam.sdk.transforms.SerializableFunctions
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.filter2.predicate.FilterPredicate
Expand Down Expand Up @@ -99,27 +100,19 @@ final case class ParquetTypeIO[T: ClassTag: Coder: ParquetType](
ParquetInputFormat.setFilterPredicate(job.getConfiguration, params.predicate)
}

val source = HadoopFormatIO
.read[JBoolean, T]
// Hadoop input always emit key-value, and `Void` causes NPE in Beam coder
.withKeyTranslation(new SimpleFunction[Void, JBoolean]() {
override def apply(input: Void): JBoolean = true
})
.withValueTranslation(
new SimpleFunction[T, T]() {
override def apply(input: T): T = input
override def getInputTypeDescriptor: TypeDescriptor[T] = TypeDescriptor.of(cls)
},
CoderMaterializer.beam(sc, Coder[T])
)
.withConfiguration(job.getConfiguration)
.withSkipValueClone(
job.getConfiguration.getBoolean(
ParquetReadConfiguration.SkipClone: @nowarn("cat=deprecation"),
true
HadoopParquet
.readHadoopFormatIO[T, T](
sc,
job.getConfiguration,
projectionFn = None,
skipValueClone = Some(
job.getConfiguration.getBoolean(
ParquetReadConfiguration.SkipClone: @nowarn("cat=deprecation"),
true
)
)
)
sc.applyTransform(source).map(_.getValue)
.parDo(new LineageReportDoFn(filePattern))
}

private def parquetOut(
Expand Down