diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala new file mode 100644 index 0000000000..0ca9e34b87 --- /dev/null +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala @@ -0,0 +1,124 @@ +/* + * Copyright 2026 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.scio.parquet + +import com.spotify.scio.ScioContext +import com.spotify.scio.coders.{Coder, CoderMaterializer} +import com.spotify.scio.util.ScioUtil +import com.spotify.scio.values.SCollection +import com.twitter.chill.ClosureCleaner +import org.apache.beam.sdk.io.FileSystems +import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO +import org.apache.beam.sdk.transforms.DoFn.ProcessElement +import org.apache.beam.sdk.transforms.{DoFn, SimpleFunction} +import org.apache.beam.sdk.values.TypeDescriptor +import org.apache.hadoop.conf.Configuration +import org.slf4j.LoggerFactory + +import java.util.concurrent.atomic.AtomicBoolean +import scala.reflect.ClassTag + +private[parquet] object HadoopParquet { + + /** Read data from Hadoop format using HadoopFormatIO. 
*/ + def readHadoopFormatIO[A: ClassTag, T: ClassTag]( + sc: ScioContext, + conf: Configuration, + projectionFn: Option[A => T], + skipValueClone: Option[Boolean] + )(implicit coder: Coder[T]): SCollection[T] = { + val inputType = ScioUtil.classOf[A] + val outputType = ScioUtil.classOf[T] + val bcoder = CoderMaterializer.beam(sc, coder) + + val hadoop = HadoopFormatIO + .read[java.lang.Boolean, T]() + // Hadoop input always emits key-value, and `Void` causes NPE in Beam coder + .withKeyTranslation(new SimpleFunction[Void, java.lang.Boolean]() { + override def apply(input: Void): java.lang.Boolean = true + }) + + val withSkipClone = skipValueClone.fold(hadoop)(skip => hadoop.withSkipValueClone(skip)) + + val withValueTranslation = projectionFn.fold { + withSkipClone.withValueTranslation( + new SimpleFunction[T, T]() { + override def apply(input: T): T = input + + override def getInputTypeDescriptor = TypeDescriptor.of(outputType) + + override def getOutputTypeDescriptor = TypeDescriptor.of(outputType) + }, + bcoder + ) + } { fn => + val g = ClosureCleaner.clean(fn) // defeat closure + withSkipClone.withValueTranslation( + new SimpleFunction[A, T]() { + // Workaround for incomplete Avro objects + // `SCollection#map` might throw NPE on incomplete Avro objects when the runner tries + // to serialize them. Lifting the mapping function here fixes the problem. + override def apply(input: A): T = g(input) + + override def getInputTypeDescriptor = TypeDescriptor.of(inputType) + + override def getOutputTypeDescriptor = TypeDescriptor.of(outputType) + }, + bcoder + ) + } + + sc.applyTransform(withValueTranslation.withConfiguration(conf)).map(_.getValue) + } +} + +private[parquet] object LineageReportDoFn { + // Atomic flag to ensure lineage is reported only once per JVM + private val lineageReported = new AtomicBoolean(false) +} + +/** + * DoFn that reports directory-level source lineage for legacy Parquet reads. 
+ * + * @param filePattern + * The file pattern or path to report lineage for + */ +private[parquet] class LineageReportDoFn[T](filePattern: String) extends DoFn[T, T] { + + @transient + private lazy val logger = LoggerFactory.getLogger(this.getClass) + + @ProcessElement + def processElement(@DoFn.Element element: T, out: DoFn.OutputReceiver[T]): Unit = { + if (LineageReportDoFn.lineageReported.compareAndSet(false, true)) { + try { + val isDirectory = filePattern.endsWith("/") + val resourceId = FileSystems.matchNewResource(filePattern, isDirectory) + val directory = resourceId.getCurrentDirectory + FileSystems.reportSourceLineage(directory) + } catch { + case e: Exception => + logger.warn( + s"Error when reporting lineage for pattern: $filePattern", + e + ) + } + } + out.output(element) + } +} diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala index 12520c9c0e..bf10161bf8 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala @@ -17,12 +17,16 @@ package com.spotify.scio.parquet.avro -import java.lang.{Boolean => JBoolean} import com.spotify.scio.ScioContext import com.spotify.scio.coders.{Coder, CoderMaterializer} import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT} import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory} -import com.spotify.scio.parquet.{GcsConnectorUtil, ParquetConfiguration} +import com.spotify.scio.parquet.{ + GcsConnectorUtil, + HadoopParquet, + LineageReportDoFn, + ParquetConfiguration +} import com.spotify.scio.testing.TestDataManager import com.spotify.scio.util.{FilenamePolicySupplier, Functions, ScioUtil} import com.spotify.scio.values.SCollection @@ -32,12 +36,9 @@ import org.apache.avro.reflect.ReflectData import org.apache.avro.specific.SpecificRecord import 
org.apache.beam.sdk.io._ import org.apache.beam.sdk.transforms.SerializableFunctions -import org.apache.beam.sdk.transforms.SimpleFunction import org.apache.beam.sdk.io.fs.ResourceId import org.apache.beam.sdk.io.hadoop.SerializableConfiguration -import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider -import org.apache.beam.sdk.values.TypeDescriptor import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.parquet.avro.{ @@ -245,26 +246,14 @@ object ParquetAvroIO { job.getConfiguration.setClass("key.class", classOf[Void], classOf[Void]) job.getConfiguration.setClass("value.class", avroClass, avroClass) - val g = ClosureCleaner.clean(projectionFn) // defeat closure - val aCls = avroClass - val oCls = ScioUtil.classOf[T] - val transform = HadoopFormatIO - .read[JBoolean, T]() - // Hadoop input always emit key-value, and `Void` causes NPE in Beam coder - .withKeyTranslation(new SimpleFunction[Void, JBoolean]() { - override def apply(input: Void): JBoolean = true - }) - .withValueTranslation(new SimpleFunction[A, T]() { - // Workaround for incomplete Avro objects - // `SCollection#map` might throw NPE on incomplete Avro objects when the runner tries - // to serialized them. Lifting the mapping function here fixes the problem. 
- override def apply(input: A): T = g(input) - override def getInputTypeDescriptor = TypeDescriptor.of(aCls) - override def getOutputTypeDescriptor = TypeDescriptor.of(oCls) - }) - .withConfiguration(job.getConfiguration) - - sc.applyTransform(transform).map(_.getValue) + HadoopParquet + .readHadoopFormatIO[A, T]( + sc, + job.getConfiguration, + Some(projectionFn), + None + ) + .parDo(new LineageReportDoFn(filePattern)) } } diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/read/ParquetReadFn.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/read/ParquetReadFn.scala index f0fa14fd4e..e724a748d0 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/read/ParquetReadFn.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/read/ParquetReadFn.scala @@ -19,6 +19,7 @@ package com.spotify.scio.parquet.read import com.spotify.scio.parquet.BeamInputFile import com.spotify.scio.parquet.read.ParquetReadFn._ import org.apache.beam.sdk.io.FileIO.ReadableFile +import org.apache.beam.sdk.io.FileSystems import org.apache.beam.sdk.io.hadoop.SerializableConfiguration import org.apache.beam.sdk.io.range.OffsetRange import org.apache.beam.sdk.transforms.DoFn._ @@ -167,6 +168,8 @@ class ParquetReadFn[T, R]( tracker.currentRestriction.getFrom, if (splitGranularity == SplitGranularity.File) "end" else tracker.currentRestriction().getTo ) + FileSystems.reportSourceLineage(file.getMetadata.resourceId()) + val reader = parquetFileReader(file) try { val filter = options.getRecordFilter diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala index 3efca5477d..8e615d8bf8 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala @@ -22,32 +22,32 @@ import 
com.spotify.parquet.tensorflow.{ TensorflowExampleParquetReader, TensorflowExampleReadSupport } - -import java.lang.{Boolean => JBoolean} import com.spotify.scio.ScioContext import com.spotify.scio.coders.{Coder, CoderMaterializer} import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT} -import com.spotify.scio.parquet.ParquetConfiguration import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory} -import com.spotify.scio.parquet.{BeamInputFile, GcsConnectorUtil} +import com.spotify.scio.parquet.{ + BeamInputFile, + GcsConnectorUtil, + HadoopParquet, + LineageReportDoFn, + ParquetConfiguration +} import com.spotify.scio.testing.TestDataManager -import com.spotify.scio.util.ScioUtil -import com.spotify.scio.util.FilenamePolicySupplier +import com.spotify.scio.util.{FilenamePolicySupplier, ScioUtil} import com.spotify.scio.values.SCollection -import org.apache.beam.sdk.io.hadoop.SerializableConfiguration -import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO import org.apache.beam.sdk.io._ import org.apache.beam.sdk.io.fs.ResourceId +import org.apache.beam.sdk.io.hadoop.SerializableConfiguration import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider import org.apache.beam.sdk.transforms.SerializableFunctions -import org.apache.beam.sdk.transforms.SimpleFunction import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.parquet.filter2.predicate.FilterPredicate -import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetReader} import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.tensorflow.proto.{Example, Features} +import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetReader} import org.tensorflow.metadata.v0.Schema +import org.tensorflow.proto.{Example, Features} import scala.jdk.CollectionConverters._ @@ -100,7 +100,8 @@ final case class ParquetExampleIO(path: String) extends ScioIO[Example] { params: ReadP ): SCollection[Example] = { val 
job = Job.getInstance(conf) - GcsConnectorUtil.setInputPaths(sc, job, path) + val filePattern = ScioUtil.filePattern(path, params.suffix) + GcsConnectorUtil.setInputPaths(sc, job, filePattern) job.setInputFormatClass(classOf[TensorflowExampleParquetInputFormat]) job.getConfiguration.setClass("key.class", classOf[Void], classOf[Void]) job.getConfiguration.setClass("value.class", classOf[Example], classOf[Example]) @@ -115,14 +116,14 @@ final case class ParquetExampleIO(path: String) extends ScioIO[Example] { ParquetInputFormat.setFilterPredicate(job.getConfiguration, predicate) } - val source = HadoopFormatIO - .read[JBoolean, Example]() - // Hadoop input always emit key-value, and `Void` causes NPE in Beam coder - .withKeyTranslation(new SimpleFunction[Void, JBoolean]() { - override def apply(input: Void): JBoolean = true - }) - .withConfiguration(job.getConfiguration) - sc.applyTransform(source).map(_.getValue) + HadoopParquet + .readHadoopFormatIO[Example, Example]( + sc, + job.getConfiguration, + projectionFn = None, + skipValueClone = None + ) + .parDo(new LineageReportDoFn(filePattern)) } override protected def readTest(sc: ScioContext, params: ReadP): SCollection[Example] = { diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala index 7acbcf8154..807c172428 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala @@ -17,24 +17,25 @@ package com.spotify.scio.parquet.types -import java.lang.{Boolean => JBoolean} import com.spotify.scio.ScioContext import com.spotify.scio.coders.{Coder, CoderMaterializer} import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT} import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory} -import com.spotify.scio.parquet.{BeamInputFile, GcsConnectorUtil, 
ParquetConfiguration} -import com.spotify.scio.util.ScioUtil -import com.spotify.scio.util.FilenamePolicySupplier +import com.spotify.scio.parquet.{ + BeamInputFile, + GcsConnectorUtil, + HadoopParquet, + LineageReportDoFn, + ParquetConfiguration +} +import com.spotify.scio.util.{FilenamePolicySupplier, ScioUtil} import com.spotify.scio.values.SCollection import magnolify.parquet.ParquetType import org.apache.beam.sdk.io.fs.ResourceId -import org.apache.beam.sdk.transforms.SerializableFunctions -import org.apache.beam.sdk.transforms.SimpleFunction import org.apache.beam.sdk.io.hadoop.SerializableConfiguration -import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO import org.apache.beam.sdk.io.{DynamicFileDestinations, FileSystems, WriteFiles} import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider -import org.apache.beam.sdk.values.TypeDescriptor +import org.apache.beam.sdk.transforms.SerializableFunctions import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.parquet.filter2.predicate.FilterPredicate @@ -99,27 +100,19 @@ final case class ParquetTypeIO[T: ClassTag: Coder: ParquetType]( ParquetInputFormat.setFilterPredicate(job.getConfiguration, params.predicate) } - val source = HadoopFormatIO - .read[JBoolean, T] - // Hadoop input always emit key-value, and `Void` causes NPE in Beam coder - .withKeyTranslation(new SimpleFunction[Void, JBoolean]() { - override def apply(input: Void): JBoolean = true - }) - .withValueTranslation( - new SimpleFunction[T, T]() { - override def apply(input: T): T = input - override def getInputTypeDescriptor: TypeDescriptor[T] = TypeDescriptor.of(cls) - }, - CoderMaterializer.beam(sc, Coder[T]) - ) - .withConfiguration(job.getConfiguration) - .withSkipValueClone( - job.getConfiguration.getBoolean( - ParquetReadConfiguration.SkipClone: @nowarn("cat=deprecation"), - true + HadoopParquet + .readHadoopFormatIO[T, T]( + sc, + job.getConfiguration, + projectionFn = None, + 
skipValueClone = Some( + job.getConfiguration.getBoolean( + ParquetReadConfiguration.SkipClone: @nowarn("cat=deprecation"), + true + ) ) ) - sc.applyTransform(source).map(_.getValue) + .parDo(new LineageReportDoFn(filePattern)) } private def parquetOut(