From 79fe037fea95b29f412fb213e38b18997493358b Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Wed, 2 Jul 2025 18:44:27 -0400 Subject: [PATCH 1/6] Configure metricsLocation from env var SCIO_LINEAGE_LOCATION --- .../scala/com/spotify/scio/ScioContext.scala | 5 +++- .../scala/com/spotify/scio/ScioResult.scala | 27 ++++++++++++++----- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala index 7c87df7aa0..53522c6882 100644 --- a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala +++ b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala @@ -700,7 +700,10 @@ class ScioContext private[scio] ( new ScioResult(pipelineResult) { Option(sc.optionsAs[ScioOptions].getMetricsLocation).foreach(saveMetrics) - Option(sc.optionsAs[ScioOptions].getLineageLocation).foreach(saveLineage) + List( + Option(sc.optionsAs[ScioOptions].getLineageLocation), + sys.env.get("SCIO_LINEAGE_LOCATION") + ).flatten.foreach(saveLineage) override def getMetrics: Metrics = Metrics( diff --git a/scio-core/src/main/scala/com/spotify/scio/ScioResult.scala b/scio-core/src/main/scala/com/spotify/scio/ScioResult.scala index 21c464caef..87042957d2 100644 --- a/scio-core/src/main/scala/com/spotify/scio/ScioResult.scala +++ b/scio-core/src/main/scala/com/spotify/scio/ScioResult.scala @@ -27,8 +27,8 @@ import org.apache.beam.sdk.io.fs.{ResolveOptions, ResourceId} import org.apache.beam.sdk.metrics.{DistributionResult, GaugeResult, Lineage} import org.apache.beam.sdk.util.MimeTypes import org.apache.beam.sdk.{metrics => beam, PipelineResult} - import org.joda.time.Instant +import org.slf4j.LoggerFactory import java.io.File import java.nio.ByteBuffer @@ -46,6 +46,7 @@ trait RunnerResult { /** Represent a Scio pipeline result. */ abstract class ScioResult private[scio] (val internal: PipelineResult) { + private val logger = LoggerFactory.getLogger(this.getClass) /** Get a Beam runner specific result. */ def as[T <: RunnerResult: ClassTag]: T = { @@ -74,6 +75,7 @@ abstract class ScioResult private[scio] (val internal: PipelineResult) { val isDirectory = path.endsWith(File.separator) saveJsonFile( getResourceId(path, enforceNewFile = isDirectory, newFilePrefix = "metrics"), + path, getMetrics ) } @@ -83,6 +85,7 @@ abstract class ScioResult private[scio] (val internal: PipelineResult) { val isDirectory = path.endsWith(File.separator) saveJsonFile( getResourceId(path, enforceNewFile = isDirectory, newFilePrefix = "lineage"), + path, getBeamLineage ) } @@ -102,18 +105,28 @@ abstract class ScioResult private[scio] (val internal: PipelineResult) { } } - private def saveJsonFile(resourceId: ResourceId, value: Object): Unit = { + private def saveJsonFile(resourceId: => ResourceId, path: String, value: Object): Unit = { val mapper = ScioUtil.getScalaJsonMapper - val out = FileSystems.create(resourceId, MimeTypes.TEXT) try { - out.write(ByteBuffer.wrap(mapper.writeValueAsBytes(value))) - } finally { - if (out != null) { - out.close() + val out = FileSystems.create(resourceId, MimeTypes.TEXT) + try { + out.write(ByteBuffer.wrap(mapper.writeValueAsBytes(value))) + } finally { + if (out != null) { + out.close() + } } + logger.info(f"Saved metrics to '$path'") + } catch { + case e: Throwable => + logger.warn( + f"Failed to save metrics to '$path': ${mapper.writeValueAsString(value)}", + e + ) } } + /** Get lineage metric values. */ def getBeamLineage: BeamLineage = { def asScalaCrossCompatible(set: java.util.Set[String]): Iterable[String] = { val iterator = set.iterator() From 8021bdce6270f2c5b85f2f52e8428b9ee0dfde6e Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Wed, 9 Jul 2025 17:47:56 -0400 Subject: [PATCH 2/6] Remove env var getter --- scio-core/src/main/scala/com/spotify/scio/ScioContext.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala index 53522c6882..7c87df7aa0 100644 --- a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala +++ b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala @@ -700,10 +700,7 @@ class ScioContext private[scio] ( new ScioResult(pipelineResult) { Option(sc.optionsAs[ScioOptions].getMetricsLocation).foreach(saveMetrics) - List( - Option(sc.optionsAs[ScioOptions].getLineageLocation), - sys.env.get("SCIO_LINEAGE_LOCATION") - ).flatten.foreach(saveLineage) + Option(sc.optionsAs[ScioOptions].getLineageLocation).foreach(saveLineage) override def getMetrics: Metrics = Metrics( From ee6b17947b93fcdf26412651cbb0a2cdeee03d7b Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Thu, 10 Jul 2025 11:28:50 -0400 Subject: [PATCH 3/6] simplify code --- .../src/main/scala/com/spotify/scio/ScioResult.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/scio-core/src/main/scala/com/spotify/scio/ScioResult.scala b/scio-core/src/main/scala/com/spotify/scio/ScioResult.scala index 87042957d2..1f94d8e830 100644 --- a/scio-core/src/main/scala/com/spotify/scio/ScioResult.scala +++ b/scio-core/src/main/scala/com/spotify/scio/ScioResult.scala @@ -75,7 +75,6 @@ abstract class ScioResult private[scio] (val internal: PipelineResult) { val isDirectory = path.endsWith(File.separator) saveJsonFile( getResourceId(path, enforceNewFile = isDirectory, newFilePrefix = "metrics"), - path, getMetrics ) } @@ -85,7 +84,6 @@ abstract class ScioResult private[scio] (val internal: PipelineResult) { val isDirectory = path.endsWith(File.separator) saveJsonFile( getResourceId(path, enforceNewFile = isDirectory, newFilePrefix = "lineage"), - path, getBeamLineage ) } @@ -105,7 +103,7 @@ abstract class ScioResult private[scio] (val internal: PipelineResult) { } } - private def saveJsonFile(resourceId: => ResourceId, path: String, value: Object): Unit = { + private def saveJsonFile(resourceId: ResourceId, value: Object): Unit = { val mapper = ScioUtil.getScalaJsonMapper try { val out = FileSystems.create(resourceId, MimeTypes.TEXT) @@ -116,11 +114,11 @@ abstract class ScioResult private[scio] (val internal: PipelineResult) { out.close() } } - logger.info(f"Saved metrics to '$path'") + logger.info(f"Saved metrics to '$resourceId'") } catch { case e: Throwable => logger.warn( - f"Failed to save metrics to '$path': ${mapper.writeValueAsString(value)}", + f"Failed to save metrics to '$resourceId': ${mapper.writeValueAsString(value)}", e ) } From 92a474efaffb33afacff2784450bff2079a7bc14 Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Thu, 10 Jul 2025 11:32:21 -0400 Subject: [PATCH 4/6] Update scio-core/src/main/scala/com/spotify/scio/ScioResult.scala Co-authored-by: Claire McGinty --- scio-core/src/main/scala/com/spotify/scio/ScioResult.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scio-core/src/main/scala/com/spotify/scio/ScioResult.scala b/scio-core/src/main/scala/com/spotify/scio/ScioResult.scala index 1f94d8e830..ae8f7b3424 100644 --- a/scio-core/src/main/scala/com/spotify/scio/ScioResult.scala +++ b/scio-core/src/main/scala/com/spotify/scio/ScioResult.scala @@ -46,7 +46,7 @@ trait RunnerResult { /** Represent a Scio pipeline result. */ abstract class ScioResult private[scio] (val internal: PipelineResult) { - private val logger = LoggerFactory.getLogger(this.getClass) + private val logger = LoggerFactory.getLogger(getClass) /** Get a Beam runner specific result. */ def as[T <: RunnerResult: ClassTag]: T = { From 0ed0cbf918578f265b7e5e635f96cfb1b14d7193 Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Thu, 10 Jul 2025 12:51:49 -0400 Subject: [PATCH 5/6] Assert Beam lineage in RunPreReleaseIT --- .../scio/examples/RunPreReleaseIT.scala | 95 +++++++++++++++---- 1 file changed, 79 insertions(+), 16 deletions(-) diff --git a/scio-examples/src/test/scala/com/spotify/scio/examples/RunPreReleaseIT.scala b/scio-examples/src/test/scala/com/spotify/scio/examples/RunPreReleaseIT.scala index e113ff39d2..de6c8ebb34 100644 --- a/scio-examples/src/test/scala/com/spotify/scio/examples/RunPreReleaseIT.scala +++ b/scio-examples/src/test/scala/com/spotify/scio/examples/RunPreReleaseIT.scala @@ -18,7 +18,7 @@ package com.spotify.scio.examples import com.spotify.scio.util.ScioUtil -import com.spotify.scio.{ContextAndArgs, ScioContext} +import com.spotify.scio.{ContextAndArgs, ScioContext, ScioResult} import org.slf4j.LoggerFactory import scala.concurrent.duration._ @@ -33,7 +33,8 @@ object RunPreReleaseIT { "--runner=DataflowRunner", "--project=data-integration-test", "--region=us-central1", - "--tempLocation=gs://dataflow-tmp-us-central1/gha" + "--tempLocation=gs://dataflow-tmp-us-central1/gha", + "--dataflowServiceOptions=enable_lineage=true" ) private val log = LoggerFactory.getLogger(getClass) @@ -51,6 +52,33 @@ object RunPreReleaseIT { } } + private def assertLineageHasGcsItem(metrics: List[String], prefix: String, path: String): Unit = { + if (!metrics.exists(x => x.startsWith(prefix) && x.contains(path))) { + throw new Throwable(f"Expect to have '$prefix...$path...' in $metrics"); + } + } + + private def assertLineageIsPopulated( + scioResult: ScioResult + )( + prefix: String = "gcs:", + assertInput: String = null, + assertInput2: String = null, + assertOutput: String = null + ): Unit = { + if (assertInput != null) { + assertLineageHasGcsItem(scioResult.getBeamLineage.sources, prefix, assertInput) + } + + if (assertInput2 != null) { + assertLineageHasGcsItem(scioResult.getBeamLineage.sources, prefix, assertInput2) + } + + if (assertOutput != null) { + assertLineageHasGcsItem(scioResult.getBeamLineage.sinks, prefix, assertOutput) + } + } + private def avro(runId: String): Future[Unit] = { import com.spotify.scio.examples.extra.AvroExample @@ -59,10 +87,12 @@ object RunPreReleaseIT { log.info("Starting Avro write tests... ") val write = invokeJob[AvroExample.type]("--method=specificOut", s"--output=$out1") - write.flatMap { _ => - log.info("Starting Avro read tests... ") - invokeJob[AvroExample.type]("--method=specificIn", s"--input=$out1/*", s"--output=$out2") - } + write + .flatMap { _ => + log.info("Starting Avro read tests... ") + invokeJob[AvroExample.type]("--method=specificIn", s"--input=$out1/*", s"--output=$out2") + } + .map(assertLineageIsPopulated(_)(assertInput = out1, assertOutput = out2)) } private def parquet(runId: String): Future[Unit] = { @@ -77,34 +107,37 @@ object RunPreReleaseIT { log.info("Starting Parquet write tests... ") val writes = List( - invokeJob[ParquetExample.type]("--method=avroOut", s"--output=$out1"), + invokeJob[ParquetExample.type]("--method=avroOut", s"--output=$out1") + .map(assertLineageIsPopulated(_)(assertOutput = out2)), invokeJob[ParquetExample.type]("--method=exampleOut", s"--output=$out5") + .map(assertLineageIsPopulated(_)(assertOutput = out5)) ) Future.sequence(writes).flatMap { _ => log.info("Starting Parquet read tests... ") val reads = List( - invokeJob[ParquetExample.type]("--method=typedIn", s"--input=$out1/*", s"--output=$out2"), + invokeJob[ParquetExample.type]("--method=typedIn", s"--input=$out1/*", s"--output=$out2") + .map(assertLineageIsPopulated(_)(assertInput = out1, assertOutput = out2)), invokeJob[ParquetExample.type]( "--method=avroSpecificIn", s"--input=$out1/*", s"--output=$out3" - ), + ).map(assertLineageIsPopulated(_)(assertInput = out1, assertOutput = out3)), invokeJob[ParquetExample.type]( "--method=avroGenericIn", s"--input=$out1/*", s"--output=$out4" - ), + ).map(assertLineageIsPopulated(_)(assertInput = out1, assertOutput = out4)), invokeJob[ParquetExample.type]( "--method=avroGenericIn", s"--input=$out1/*", s"--output=$out4" - ), + ).map(assertLineageIsPopulated(_)(assertInput = out1, assertOutput = out4)), invokeJob[ParquetExample.type]( "--method=exampleIn", s"--input=$out5/*", s"--output=$out6" - ) + ).map(assertLineageIsPopulated(_)(assertInput = out5, assertOutput = out6)) ) Future.sequence(reads).map(_ => ()) } @@ -118,21 +151,41 @@ object RunPreReleaseIT { } val out1 = gcsPath[SortMergeBucketWriteExample.type]("users", runId) val out2 = gcsPath[SortMergeBucketWriteExample.type]("accounts", runId) + val joinPath = gcsPath[SortMergeBucketWriteExample.type]("join", runId) + val transformPath = gcsPath[SortMergeBucketWriteExample.type]("transform", runId) log.info("Starting SMB write tests... ") - val write = invokeJob[SortMergeBucketWriteExample.type](s"--users=$out1", s"--accounts=$out2") + val write = + invokeJob[SortMergeBucketWriteExample.type](s"--users=$out1", s"--accounts=$out2").map( + assertLineageIsPopulated(_)( + assertInput = out1, + assertOutput = out2 + ) + ) write.flatMap { _ => log.info("Starting SMB read tests... ") val readJobs = List( invokeJob[SortMergeBucketJoinExample.type]( s"--users=$out1", s"--accounts=$out2", - s"--output=${gcsPath[SortMergeBucketJoinExample.type]("join", runId)}" + s"--output=$joinPath" + ).map( + assertLineageIsPopulated(_)( + assertInput = out1, + assertInput2 = out2, + assertOutput = joinPath + ) ), invokeJob[SortMergeBucketTransformExample.type]( s"--users=$out1", s"--accounts=$out2", - s"--output=${gcsPath[SortMergeBucketTransformExample.type]("transform", runId)}" + s"--output=$transformPath" + ).map( + assertLineageIsPopulated(_)( + assertInput = out1, + assertInput2 = out2, + assertOutput = transformPath + ) ) ) Future.sequence(readJobs).map(_ => ()) @@ -145,15 +198,25 @@ object RunPreReleaseIT { val jobs = List( invokeJob[TypedStorageBigQueryTornadoes.type]( s"--output=data-integration-test:gha_it_us.typed_storage" + ).map( + assertLineageIsPopulated(_)( + prefix = "bigquery:", + assertOutput = "data-integration-test.gha_it_us.typed_storage" + ) ), invokeJob[TypedBigQueryTornadoes.type]( s"--output=data-integration-test:gha_it_us.typed_row" + ).map( + assertLineageIsPopulated(_)( + prefix = "bigquery:", + assertOutput = "data-integration-test.gha_it_us.typed_row" + ) ) ) Future.sequence(jobs).map(_ => ()) } - private def invokeJob[T: ClassTag](args: String*): Future[Unit] = { + private def invokeJob[T: ClassTag](args: String*): Future[ScioResult] = { val cls = ScioUtil.classOf[T] val jobObjName = cls.getName.replaceAll("\\$$", "") val pipelines = Class From 8a6dd183e96935aa07c4c5c94b212200d7f3c011 Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Thu, 10 Jul 2025 14:10:34 -0400 Subject: [PATCH 6/6] Add experimental metrics sink --- .../scala/com/spotify/scio/ScioContext.scala | 48 ++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) diff --git a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala index 7c87df7aa0..ed101b3cb4 100644 --- a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala +++ b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala @@ -33,7 +33,15 @@ import org.apache.beam.sdk.Pipeline.PipelineExecutionException import org.apache.beam.sdk.PipelineResult.State import org.apache.beam.sdk.extensions.gcp.options.GcsOptions import org.apache.beam.sdk.io.FileSystems -import org.apache.beam.sdk.metrics.Counter +import org.apache.beam.sdk.metrics.{ + Counter, + Lineage, + MetricNameFilter, + MetricQueryResults, + MetricsFilter, + MetricsOptions, + MetricsSink +} import org.apache.beam.sdk.options._ import org.apache.beam.sdk.transforms._ import org.apache.beam.sdk.values._ @@ -247,6 +255,39 @@ object ContextAndArgs { } } +class CustomMetricsSink extends MetricsSink { + private[scio] val log = LoggerFactory.getLogger(this.getClass) + + override def writeMetrics(metricQueryResults: MetricQueryResults): Unit = { + log.warn("!!!!!!! WRITE METRIC stringSets = " + queryLineageStringSets(metricQueryResults)) + log.warn("!!!!!!! WRITE METRIC boundedTries = " + queryLineageBoundedTries(metricQueryResults)) + } + + private def queryLineageStringSets(results: MetricQueryResults): Set[String] = { + results.getStringSets.asScala.flatMap { metric => + Try(metric.getCommitted.getStringSet.asScala.toSet) + .orElse(Try(metric.getAttempted.getStringSet.asScala.toSet)) + .getOrElse(Set.empty[String]) + }.toSet + } + + private def queryLineageBoundedTries(results: MetricQueryResults): Set[String] = { + results.getBoundedTries.asScala.flatMap { metric => + val processResult = (result: java.util.Set[java.util.List[String]]) => + result.asScala.map { fqn => + val segments = fqn.asScala.toList + val truncated = if (segments.nonEmpty && segments.last.toBoolean) "*" else "" + segments.dropRight(1).mkString + truncated + }.toSet + + Try(processResult(metric.getCommitted.getResult)) + .orElse(Try(processResult(metric.getAttempted.getResult))) + .getOrElse(Set.empty[String]) + }.toSet + } + +} + /** * ScioExecutionContext is the result of [[ScioContext#run()]]. * @@ -378,6 +419,11 @@ object ScioContext { .as(classOf[ScioOptions]) .setAppArguments(sanitizedArgString) } + + val metricsOptions = pipelineOpts.as(classOf[MetricsOptions]) + metricsOptions.setMetricsSink(classOf[CustomMetricsSink]) + metricsOptions.setMetricsPushPeriod(5) + (pipelineOpts, args) } }