Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2026 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.scio.parquet

import org.apache.beam.sdk.io.FileSystems
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.{ProcessElement, Setup}
import org.slf4j.LoggerFactory

/**
 * DoFn that reports directory-level source lineage for legacy Parquet reads.
 *
 * Lineage is reported once per DoFn instance during `@Setup`; elements are then
 * passed through unchanged, so this transform is a pure identity at the data level.
 *
 * @param filePattern
 *   The file pattern or path to report lineage for
 */
private[parquet] class LineageReportingDoFn[T](filePattern: String) extends DoFn[T, T] {
  import scala.util.control.NonFatal

  // SLF4J loggers are not serializable; @transient + lazy defers creation until
  // the DoFn has been deserialized on the worker.
  @transient
  private lazy val logger = LoggerFactory.getLogger(classOf[LineageReportingDoFn[T]])

  /**
   * Resolves the pattern to its containing directory and reports it as source
   * lineage. Best-effort: any non-fatal failure is logged and swallowed so that
   * lineage reporting can never fail the bundle.
   */
  @Setup
  def setup(): Unit = {
    try {
      // A trailing slash marks the pattern as a directory; otherwise treat it
      // as a file path and report its parent directory instead.
      val isDirectory = filePattern.endsWith("/")
      val resourceId = FileSystems.matchNewResource(filePattern, isDirectory)
      val directory = resourceId.getCurrentDirectory
      FileSystems.reportSourceLineage(directory)
    } catch {
      // NonFatal (rather than Exception) lets InterruptedException and other
      // fatal throwables propagate instead of being silently swallowed.
      case NonFatal(e) =>
        logger.warn(
          s"Error when reporting lineage for pattern: $filePattern",
          e
        )
    }
  }

  // Identity pass-through: this DoFn exists only for its @Setup side effect.
  @ProcessElement
  def processElement(@DoFn.Element element: T, out: DoFn.OutputReceiver[T]): Unit =
    out.output(element)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import com.spotify.scio.ScioContext
import com.spotify.scio.coders.{Coder, CoderMaterializer}
import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT}
import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory}
import com.spotify.scio.parquet.{GcsConnectorUtil, ParquetConfiguration}
import com.spotify.scio.parquet.{GcsConnectorUtil, LineageReportingDoFn, ParquetConfiguration}
import com.spotify.scio.testing.TestDataManager
import com.spotify.scio.util.{FilenamePolicySupplier, Functions, ScioUtil}
import com.spotify.scio.values.SCollection
Expand Down Expand Up @@ -264,7 +264,12 @@ object ParquetAvroIO {
})
.withConfiguration(job.getConfiguration)

sc.applyTransform(transform).map(_.getValue)
sc.applyTransform(transform)
.applyTransform(
org.apache.beam.sdk.transforms.ParDo
.of(new LineageReportingDoFn[org.apache.beam.sdk.values.KV[JBoolean, T]](filePattern))
)
.map(_.getValue)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.spotify.scio.parquet.read
import com.spotify.scio.parquet.BeamInputFile
import com.spotify.scio.parquet.read.ParquetReadFn._
import org.apache.beam.sdk.io.FileIO.ReadableFile
import org.apache.beam.sdk.io.FileSystems
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration
import org.apache.beam.sdk.io.range.OffsetRange
import org.apache.beam.sdk.transforms.DoFn._
Expand Down Expand Up @@ -167,6 +168,8 @@ class ParquetReadFn[T, R](
tracker.currentRestriction.getFrom,
if (splitGranularity == SplitGranularity.File) "end" else tracker.currentRestriction().getTo
)
FileSystems.reportSourceLineage(file.getMetadata.resourceId())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is different from the Hadoop one insofar as we get every individual file here, right? That seems bad/annoying when trying to use the lineage for anything.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, file-level lineage is the default approach in Beam, though we might not need that granularity directly. Both Lineage metric implementations (the legacy one and the new one) handle many files well:
- StringSet truncates internally at 100 entries
- BoundedTrie is a data structure that stores hierarchical data efficiently


val reader = parquetFileReader(file)
try {
val filter = options.getRecordFilter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import com.spotify.scio.coders.{Coder, CoderMaterializer}
import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT}
import com.spotify.scio.parquet.ParquetConfiguration
import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory}
import com.spotify.scio.parquet.{BeamInputFile, GcsConnectorUtil}
import com.spotify.scio.parquet.{BeamInputFile, GcsConnectorUtil, LineageReportingDoFn}
import com.spotify.scio.testing.TestDataManager
import com.spotify.scio.util.ScioUtil
import com.spotify.scio.util.FilenamePolicySupplier
Expand All @@ -39,8 +39,8 @@ import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO
import org.apache.beam.sdk.io._
import org.apache.beam.sdk.io.fs.ResourceId
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider
import org.apache.beam.sdk.transforms.SerializableFunctions
import org.apache.beam.sdk.transforms.SimpleFunction
import org.apache.beam.sdk.transforms.{ParDo, SerializableFunctions, SimpleFunction}
import org.apache.beam.sdk.values.KV
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.filter2.predicate.FilterPredicate
Expand Down Expand Up @@ -100,7 +100,8 @@ final case class ParquetExampleIO(path: String) extends ScioIO[Example] {
params: ReadP
): SCollection[Example] = {
val job = Job.getInstance(conf)
GcsConnectorUtil.setInputPaths(sc, job, path)
val filePattern = ScioUtil.filePattern(path, params.suffix)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am surprised suffix was not used initially. Or was it intentional?

GcsConnectorUtil.setInputPaths(sc, job, filePattern)
job.setInputFormatClass(classOf[TensorflowExampleParquetInputFormat])
job.getConfiguration.setClass("key.class", classOf[Void], classOf[Void])
job.getConfiguration.setClass("value.class", classOf[Example], classOf[Example])
Expand All @@ -122,7 +123,14 @@ final case class ParquetExampleIO(path: String) extends ScioIO[Example] {
override def apply(input: Void): JBoolean = true
})
.withConfiguration(job.getConfiguration)
sc.applyTransform(source).map(_.getValue)

sc.applyTransform(source)
.applyTransform(
ParDo.of(
new LineageReportingDoFn[KV[JBoolean, Example]](filePattern)
)
)
.map(_.getValue)
}

override protected def readTest(sc: ScioContext, params: ReadP): SCollection[Example] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,23 @@ import com.spotify.scio.ScioContext
import com.spotify.scio.coders.{Coder, CoderMaterializer}
import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT}
import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory}
import com.spotify.scio.parquet.{BeamInputFile, GcsConnectorUtil, ParquetConfiguration}
import com.spotify.scio.parquet.{
BeamInputFile,
GcsConnectorUtil,
LineageReportingDoFn,
ParquetConfiguration
}
import com.spotify.scio.util.ScioUtil
import com.spotify.scio.util.FilenamePolicySupplier
import com.spotify.scio.values.SCollection
import magnolify.parquet.ParquetType
import org.apache.beam.sdk.io.fs.ResourceId
import org.apache.beam.sdk.transforms.SerializableFunctions
import org.apache.beam.sdk.transforms.SimpleFunction
import org.apache.beam.sdk.transforms.{ParDo, SerializableFunctions, SimpleFunction}
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO
import org.apache.beam.sdk.io.{DynamicFileDestinations, FileSystems, WriteFiles}
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider
import org.apache.beam.sdk.values.TypeDescriptor
import org.apache.beam.sdk.values.{KV, TypeDescriptor}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.filter2.predicate.FilterPredicate
Expand Down Expand Up @@ -119,7 +123,12 @@ final case class ParquetTypeIO[T: ClassTag: Coder: ParquetType](
true
)
)
sc.applyTransform(source).map(_.getValue)

sc.applyTransform(source)
.applyTransform(
ParDo.of(new LineageReportingDoFn[KV[JBoolean, T]](filePattern))
)
.map(_.getValue)
}

private def parquetOut(
Expand Down