-
Notifications
You must be signed in to change notification settings - Fork 526
Report Beam Lineage from Parquet reads #5850
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
e0dc0a4
88cc967
d651108
785ae4a
4ea6923
4a650a7
fe790e7
4c920e3
b0a3741
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,124 @@ | ||
| /* | ||
| * Copyright 2026 Spotify AB. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package com.spotify.scio.parquet | ||
|
|
||
| import com.spotify.scio.ScioContext | ||
| import com.spotify.scio.coders.{Coder, CoderMaterializer} | ||
| import com.spotify.scio.util.ScioUtil | ||
| import com.spotify.scio.values.SCollection | ||
| import com.twitter.chill.ClosureCleaner | ||
| import org.apache.beam.sdk.io.FileSystems | ||
| import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO | ||
| import org.apache.beam.sdk.transforms.DoFn.ProcessElement | ||
| import org.apache.beam.sdk.transforms.{DoFn, SimpleFunction} | ||
| import org.apache.beam.sdk.values.TypeDescriptor | ||
| import org.apache.hadoop.conf.Configuration | ||
| import org.slf4j.LoggerFactory | ||
|
|
||
| import java.util.concurrent.atomic.AtomicBoolean | ||
| import scala.reflect.ClassTag | ||
|
|
||
| private[parquet] object HadoopParquet { | ||
|
|
||
| /** Read data from Hadoop format using HadoopFormatIO. */ | ||
| def readHadoopFormatIO[A: ClassTag, T: ClassTag]( | ||
| sc: ScioContext, | ||
| conf: Configuration, | ||
| projectionFn: Option[A => T], | ||
| skipValueClone: Option[Boolean] | ||
| )(implicit coder: Coder[T]): SCollection[T] = { | ||
| val inputType = ScioUtil.classOf[A] | ||
| val outputType = ScioUtil.classOf[T] | ||
| val bcoder = CoderMaterializer.beam(sc, coder) | ||
|
|
||
| val hadoop = HadoopFormatIO | ||
| .read[java.lang.Boolean, T]() | ||
| // Hadoop input always emits key-value pairs, and `Void` causes NPE in Beam coder | ||
| .withKeyTranslation(new SimpleFunction[Void, java.lang.Boolean]() { | ||
| override def apply(input: Void): java.lang.Boolean = true | ||
| }) | ||
|
|
||
| val withSkipClone = skipValueClone.fold(hadoop)(skip => hadoop.withSkipValueClone(skip)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
|
|
||
| val withValueTranslation = projectionFn.fold { | ||
| withSkipClone.withValueTranslation( | ||
| new SimpleFunction[T, T]() { | ||
| override def apply(input: T): T = input | ||
|
|
||
| override def getInputTypeDescriptor = TypeDescriptor.of(outputType) | ||
|
|
||
| override def getOutputTypeDescriptor = TypeDescriptor.of(outputType) | ||
| }, | ||
| bcoder | ||
| ) | ||
| } { fn => | ||
| val g = ClosureCleaner.clean(fn) // defeat closure | ||
| withSkipClone.withValueTranslation( | ||
| new SimpleFunction[A, T]() { | ||
| // Workaround for incomplete Avro objects | ||
| // `SCollection#map` might throw NPE on incomplete Avro objects when the runner tries | ||
| // to serialize them. Lifting the mapping function here fixes the problem. | ||
| override def apply(input: A): T = g(input) | ||
|
|
||
| override def getInputTypeDescriptor = TypeDescriptor.of(inputType) | ||
|
|
||
| override def getOutputTypeDescriptor = TypeDescriptor.of(outputType) | ||
| }, | ||
| bcoder | ||
| ) | ||
| } | ||
|
|
||
| sc.applyTransform(withValueTranslation.withConfiguration(conf)).map(_.getValue) | ||
| } | ||
| } | ||
|
|
||
| private[parquet] object LineageReportDoFn { | ||
| // Atomic flag to ensure lineage is reported only once per JVM | ||
| private val lineageReported = new AtomicBoolean(false) | ||
| } | ||
|
|
||
| /** | ||
| * DoFn that reports directory-level source lineage for legacy Parquet reads. | ||
| * | ||
| * @param filePattern | ||
| * The file pattern or path to report lineage for | ||
| */ | ||
| private[parquet] class LineageReportDoFn[T](filePattern: String) extends DoFn[T, T] { | ||
|
|
||
| @transient | ||
| private lazy val logger = LoggerFactory.getLogger(this.getClass) | ||
|
|
||
| @ProcessElement | ||
| def processElement(@DoFn.Element element: T, out: DoFn.OutputReceiver[T]): Unit = { | ||
| if (LineageReportDoFn.lineageReported.compareAndSet(false, true)) { | ||
| try { | ||
| val isDirectory = filePattern.endsWith("/") | ||
| val resourceId = FileSystems.matchNewResource(filePattern, isDirectory) | ||
| val directory = resourceId.getCurrentDirectory | ||
| FileSystems.reportSourceLineage(directory) | ||
| } catch { | ||
| case e: Exception => | ||
| logger.warn( | ||
| s"Error when reporting lineage for pattern: $filePattern", | ||
| e | ||
| ) | ||
| } | ||
| } | ||
| out.output(element) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,12 +17,16 @@ | |
|
|
||
| package com.spotify.scio.parquet.avro | ||
|
|
||
| import java.lang.{Boolean => JBoolean} | ||
| import com.spotify.scio.ScioContext | ||
| import com.spotify.scio.coders.{Coder, CoderMaterializer} | ||
| import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT} | ||
| import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory} | ||
| import com.spotify.scio.parquet.{GcsConnectorUtil, ParquetConfiguration} | ||
| import com.spotify.scio.parquet.{ | ||
| GcsConnectorUtil, | ||
| HadoopParquet, | ||
| LineageReportDoFn, | ||
| ParquetConfiguration | ||
| } | ||
| import com.spotify.scio.testing.TestDataManager | ||
| import com.spotify.scio.util.{FilenamePolicySupplier, Functions, ScioUtil} | ||
| import com.spotify.scio.values.SCollection | ||
|
|
@@ -32,12 +36,9 @@ import org.apache.avro.reflect.ReflectData | |
| import org.apache.avro.specific.SpecificRecord | ||
| import org.apache.beam.sdk.io._ | ||
| import org.apache.beam.sdk.transforms.SerializableFunctions | ||
| import org.apache.beam.sdk.transforms.SimpleFunction | ||
| import org.apache.beam.sdk.io.fs.ResourceId | ||
| import org.apache.beam.sdk.io.hadoop.SerializableConfiguration | ||
| import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO | ||
| import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider | ||
| import org.apache.beam.sdk.values.TypeDescriptor | ||
| import org.apache.hadoop.conf.Configuration | ||
| import org.apache.hadoop.mapreduce.Job | ||
| import org.apache.parquet.avro.{ | ||
|
|
@@ -245,26 +246,14 @@ object ParquetAvroIO { | |
| job.getConfiguration.setClass("key.class", classOf[Void], classOf[Void]) | ||
| job.getConfiguration.setClass("value.class", avroClass, avroClass) | ||
|
|
||
| val g = ClosureCleaner.clean(projectionFn) // defeat closure | ||
| val aCls = avroClass | ||
| val oCls = ScioUtil.classOf[T] | ||
| val transform = HadoopFormatIO | ||
| .read[JBoolean, T]() | ||
| // Hadoop input always emits key-value pairs, and `Void` causes NPE in Beam coder | ||
| .withKeyTranslation(new SimpleFunction[Void, JBoolean]() { | ||
| override def apply(input: Void): JBoolean = true | ||
| }) | ||
| .withValueTranslation(new SimpleFunction[A, T]() { | ||
| // Workaround for incomplete Avro objects | ||
| // `SCollection#map` might throw NPE on incomplete Avro objects when the runner tries | ||
| // to serialize them. Lifting the mapping function here fixes the problem. | ||
| override def apply(input: A): T = g(input) | ||
| override def getInputTypeDescriptor = TypeDescriptor.of(aCls) | ||
| override def getOutputTypeDescriptor = TypeDescriptor.of(oCls) | ||
| }) | ||
| .withConfiguration(job.getConfiguration) | ||
|
|
||
| sc.applyTransform(transform).map(_.getValue) | ||
| HadoopParquet | ||
| .readHadoopFormatIO[A, T]( | ||
| sc, | ||
| job.getConfiguration, | ||
| Some(projectionFn), | ||
| None | ||
| ) | ||
| .parDo(new LineageReportDoFn(filePattern)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't this going to result in a new node in the graph? Why are we doing this in sequence w/ the read if it's not actually using any of the read elements; we should be doing like the scio init metrics which is just its own distinct graph
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am trying to correspond Beam conventions and have this metric associated with the actual read transform. This way we keep transform-level lineage (which is supported in Beam) |
||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ package com.spotify.scio.parquet.read | |
| import com.spotify.scio.parquet.BeamInputFile | ||
| import com.spotify.scio.parquet.read.ParquetReadFn._ | ||
| import org.apache.beam.sdk.io.FileIO.ReadableFile | ||
| import org.apache.beam.sdk.io.FileSystems | ||
| import org.apache.beam.sdk.io.hadoop.SerializableConfiguration | ||
| import org.apache.beam.sdk.io.range.OffsetRange | ||
| import org.apache.beam.sdk.transforms.DoFn._ | ||
|
|
@@ -167,6 +168,8 @@ class ParquetReadFn[T, R]( | |
| tracker.currentRestriction.getFrom, | ||
| if (splitGranularity == SplitGranularity.File) "end" else tracker.currentRestriction().getTo | ||
| ) | ||
| FileSystems.reportSourceLineage(file.getMetadata.resourceId()) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is different than the hadoop one insofar as we get every file here, right? That seems bad/annoying for using the lineage for anything
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually file-level lineage is the default approach in Beam. Which we might not need directly. Both Lineage Metric implementations (legacy and new one) work ok with many files: |
||
|
|
||
| val reader = parquetFileReader(file) | ||
| try { | ||
| val filter = options.getRecordFilter | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,32 +22,32 @@ import com.spotify.parquet.tensorflow.{ | |
| TensorflowExampleParquetReader, | ||
| TensorflowExampleReadSupport | ||
| } | ||
|
|
||
| import java.lang.{Boolean => JBoolean} | ||
| import com.spotify.scio.ScioContext | ||
| import com.spotify.scio.coders.{Coder, CoderMaterializer} | ||
| import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT} | ||
| import com.spotify.scio.parquet.ParquetConfiguration | ||
| import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory} | ||
| import com.spotify.scio.parquet.{BeamInputFile, GcsConnectorUtil} | ||
| import com.spotify.scio.parquet.{ | ||
| BeamInputFile, | ||
| GcsConnectorUtil, | ||
| HadoopParquet, | ||
| LineageReportDoFn, | ||
| ParquetConfiguration | ||
| } | ||
| import com.spotify.scio.testing.TestDataManager | ||
| import com.spotify.scio.util.ScioUtil | ||
| import com.spotify.scio.util.FilenamePolicySupplier | ||
| import com.spotify.scio.util.{FilenamePolicySupplier, ScioUtil} | ||
| import com.spotify.scio.values.SCollection | ||
| import org.apache.beam.sdk.io.hadoop.SerializableConfiguration | ||
| import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO | ||
| import org.apache.beam.sdk.io._ | ||
| import org.apache.beam.sdk.io.fs.ResourceId | ||
| import org.apache.beam.sdk.io.hadoop.SerializableConfiguration | ||
| import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider | ||
| import org.apache.beam.sdk.transforms.SerializableFunctions | ||
| import org.apache.beam.sdk.transforms.SimpleFunction | ||
| import org.apache.hadoop.conf.Configuration | ||
| import org.apache.hadoop.mapreduce.Job | ||
| import org.apache.parquet.filter2.predicate.FilterPredicate | ||
| import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetReader} | ||
| import org.apache.parquet.hadoop.metadata.CompressionCodecName | ||
| import org.tensorflow.proto.{Example, Features} | ||
| import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetReader} | ||
| import org.tensorflow.metadata.v0.Schema | ||
| import org.tensorflow.proto.{Example, Features} | ||
|
|
||
| import scala.jdk.CollectionConverters._ | ||
|
|
||
|
|
@@ -100,7 +100,8 @@ final case class ParquetExampleIO(path: String) extends ScioIO[Example] { | |
| params: ReadP | ||
| ): SCollection[Example] = { | ||
| val job = Job.getInstance(conf) | ||
| GcsConnectorUtil.setInputPaths(sc, job, path) | ||
| val filePattern = ScioUtil.filePattern(path, params.suffix) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am surprised suffix was not used initially. Or was it intentional? |
||
| GcsConnectorUtil.setInputPaths(sc, job, filePattern) | ||
| job.setInputFormatClass(classOf[TensorflowExampleParquetInputFormat]) | ||
| job.getConfiguration.setClass("key.class", classOf[Void], classOf[Void]) | ||
| job.getConfiguration.setClass("value.class", classOf[Example], classOf[Example]) | ||
|
|
@@ -115,14 +116,14 @@ final case class ParquetExampleIO(path: String) extends ScioIO[Example] { | |
| ParquetInputFormat.setFilterPredicate(job.getConfiguration, predicate) | ||
| } | ||
|
|
||
| val source = HadoopFormatIO | ||
| .read[JBoolean, Example]() | ||
| // Hadoop input always emits key-value pairs, and `Void` causes NPE in Beam coder | ||
| .withKeyTranslation(new SimpleFunction[Void, JBoolean]() { | ||
| override def apply(input: Void): JBoolean = true | ||
| }) | ||
| .withConfiguration(job.getConfiguration) | ||
| sc.applyTransform(source).map(_.getValue) | ||
| HadoopParquet | ||
| .readHadoopFormatIO[Example, Example]( | ||
| sc, | ||
| job.getConfiguration, | ||
| projectionFn = None, | ||
| skipValueClone = None | ||
| ) | ||
| .parDo(new LineageReportDoFn(filePattern)) | ||
| } | ||
|
|
||
| override protected def readTest(sc: ScioContext, params: ReadP): SCollection[Example] = { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just to reduce duplication or is there a functional change here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just to reduce, no new functionality, except I noticed that in some cases Scio's derived coder was not set to HadoopFormatIO transformation. Probably Beam auto-derives the same coder, but anyway it is better to set explicitly