diff --git a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala
index 7c87df7aa0..ed101b3cb4 100644
--- a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala
+++ b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala
@@ -33,7 +33,15 @@ import org.apache.beam.sdk.Pipeline.PipelineExecutionException
 import org.apache.beam.sdk.PipelineResult.State
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions
 import org.apache.beam.sdk.io.FileSystems
-import org.apache.beam.sdk.metrics.Counter
+import org.apache.beam.sdk.metrics.{
+  Counter,
+  Lineage,
+  MetricNameFilter,
+  MetricQueryResults,
+  MetricsFilter,
+  MetricsOptions,
+  MetricsSink
+}
 import org.apache.beam.sdk.options._
 import org.apache.beam.sdk.transforms._
 import org.apache.beam.sdk.values._
@@ -247,6 +255,59 @@ object ContextAndArgs {
   }
 }
 
+/**
+ * A [[MetricsSink]] that logs, at WARN level, the string-set and bounded-trie metric values it is
+ * handed each time `writeMetrics` is called.
+ */
+class CustomMetricsSink extends MetricsSink {
+  private[scio] val log = LoggerFactory.getLogger(this.getClass)
+
+  /** Logs every string-set and bounded-trie value found in `metricQueryResults`. */
+  override def writeMetrics(metricQueryResults: MetricQueryResults): Unit = {
+    log.warn("!!!!!!! WRITE METRIC stringSets = " + queryLineageStringSets(metricQueryResults))
+    log.warn("!!!!!!! WRITE METRIC boundedTries = " + queryLineageBoundedTries(metricQueryResults))
+  }
+
+  /**
+   * Collects the values of all string-set metrics in `results` (no name filter is applied).
+   *
+   * For each metric the committed value is used; if reading it throws, the attempted value is used;
+   * if that throws too, the metric contributes nothing.
+   */
+  private def queryLineageStringSets(results: MetricQueryResults): Set[String] =
+    results.getStringSets.asScala.flatMap { metric =>
+      Try(metric.getCommitted.getStringSet.asScala.toSet)
+        .orElse(Try(metric.getAttempted.getStringSet.asScala.toSet))
+        .getOrElse(Set.empty[String])
+    }.toSet
+
+  /**
+   * Collects the values of all bounded-trie metrics in `results` as flat strings, with the same
+   * committed-then-attempted fallback as [[queryLineageStringSets]].
+   */
+  private def queryLineageBoundedTries(results: MetricQueryResults): Set[String] =
+    results.getBoundedTries.asScala.flatMap { metric =>
+      Try(flattenBoundedTrie(metric.getCommitted.getResult))
+        .orElse(Try(flattenBoundedTrie(metric.getAttempted.getResult)))
+        .getOrElse(Set.empty[String])
+    }.toSet
+
+  /**
+   * Renders each trie entry as the concatenation of all its segments except the last, followed by
+   * "*" when the last segment, treated as a truncation flag, reads as "true".
+   *
+   * The flag is read with `java.lang.Boolean.parseBoolean`, which returns `false` for any text other
+   * than "true" (ignoring case). `String#toBoolean` would throw on such text, and the surrounding
+   * `Try` would then drop every entry of the metric instead of just treating it as not truncated.
+   */
+  private def flattenBoundedTrie(result: java.util.Set[java.util.List[String]]): Set[String] =
+    result.asScala.map { fqn =>
+      val segments = fqn.asScala.toList
+      val truncated = segments.lastOption.exists(flag => java.lang.Boolean.parseBoolean(flag))
+      segments.dropRight(1).mkString + (if (truncated) "*" else "")
+    }.toSet
+}
+
 /**
  * ScioExecutionContext is the result of [[ScioContext#run()]].
* @@ -378,6 +419,11 @@ object ScioContext { .as(classOf[ScioOptions]) .setAppArguments(sanitizedArgString) } + + val metricsOptions = pipelineOpts.as(classOf[MetricsOptions]) + metricsOptions.setMetricsSink(classOf[CustomMetricsSink]) + metricsOptions.setMetricsPushPeriod(5) + (pipelineOpts, args) } } diff --git a/scio-core/src/main/scala/com/spotify/scio/ScioResult.scala b/scio-core/src/main/scala/com/spotify/scio/ScioResult.scala index 21c464caef..ae8f7b3424 100644 --- a/scio-core/src/main/scala/com/spotify/scio/ScioResult.scala +++ b/scio-core/src/main/scala/com/spotify/scio/ScioResult.scala @@ -27,8 +27,8 @@ import org.apache.beam.sdk.io.fs.{ResolveOptions, ResourceId} import org.apache.beam.sdk.metrics.{DistributionResult, GaugeResult, Lineage} import org.apache.beam.sdk.util.MimeTypes import org.apache.beam.sdk.{metrics => beam, PipelineResult} - import org.joda.time.Instant +import org.slf4j.LoggerFactory import java.io.File import java.nio.ByteBuffer @@ -46,6 +46,7 @@ trait RunnerResult { /** Represent a Scio pipeline result. */ abstract class ScioResult private[scio] (val internal: PipelineResult) { + private val logger = LoggerFactory.getLogger(getClass) /** Get a Beam runner specific result. 
*/
   def as[T <: RunnerResult: ClassTag]: T = {
@@ -104,16 +105,31 @@ abstract class ScioResult private[scio] (val internal: PipelineResult) {
 
+  /**
+   * Serializes `value` as JSON and writes it to `resourceId`.
+   *
+   * Best effort: a non-fatal failure is logged as a warning and is not rethrown.
+   */
   private def saveJsonFile(resourceId: ResourceId, value: Object): Unit = {
     val mapper = ScioUtil.getScalaJsonMapper
-    val out = FileSystems.create(resourceId, MimeTypes.TEXT)
     try {
-      out.write(ByteBuffer.wrap(mapper.writeValueAsBytes(value)))
-    } finally {
-      if (out != null) {
-        out.close()
+      val out = FileSystems.create(resourceId, MimeTypes.TEXT)
+      try {
+        out.write(ByteBuffer.wrap(mapper.writeValueAsBytes(value)))
+      } finally {
+        out.close()
       }
+      logger.info(s"Saved metrics to '$resourceId'")
+    } catch {
+      // NonFatal lets OutOfMemoryError, InterruptedException and the like propagate.
+      case scala.util.control.NonFatal(e) =>
+        // Serialization may be what failed, so rendering the payload must not throw from here.
+        val payload =
+          scala.util.Try(mapper.writeValueAsString(value)).getOrElse("<unserializable value>")
+        logger.warn(s"Failed to save metrics to '$resourceId': $payload", e)
     }
   }
 
+  /** Get lineage metric values. */
   def getBeamLineage: BeamLineage = {
     def asScalaCrossCompatible(set: java.util.Set[String]): Iterable[String] = {
       val iterator = set.iterator()
diff --git a/scio-examples/src/test/scala/com/spotify/scio/examples/RunPreReleaseIT.scala b/scio-examples/src/test/scala/com/spotify/scio/examples/RunPreReleaseIT.scala
index e113ff39d2..de6c8ebb34 100644
--- a/scio-examples/src/test/scala/com/spotify/scio/examples/RunPreReleaseIT.scala
+++ b/scio-examples/src/test/scala/com/spotify/scio/examples/RunPreReleaseIT.scala
@@ -18,7 +18,7 @@
 package com.spotify.scio.examples
 
 import com.spotify.scio.util.ScioUtil
-import com.spotify.scio.{ContextAndArgs, ScioContext}
+import com.spotify.scio.{ContextAndArgs, ScioContext, ScioResult}
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration._
@@ -33,7 +33,8 @@ object RunPreReleaseIT {
     "--runner=DataflowRunner",
     "--project=data-integration-test",
     "--region=us-central1",
-    "--tempLocation=gs://dataflow-tmp-us-central1/gha"
+    "--tempLocation=gs://dataflow-tmp-us-central1/gha",
+    "--dataflowServiceOptions=enable_lineage=true"
   )
 
   private val log = LoggerFactory.getLogger(getClass)
@@ -51,6 +52,33 @@
object RunPreReleaseIT {
     }
   }
 
+  /** Fails unless some entry of `metrics` starts with `prefix` and contains `path`. */
+  private def assertLineageHasGcsItem(metrics: List[String], prefix: String, path: String): Unit = {
+    if (!metrics.exists(entry => entry.startsWith(prefix) && entry.contains(path))) {
+      throw new AssertionError(s"Expect to have '$prefix...$path...' in $metrics")
+    }
+  }
+
+  /**
+   * Checks the lineage reported by `scioResult`. `assertInput` and `assertInput2` are looked up in
+   * the lineage sources and `assertOutput` in the lineage sinks, each via
+   * [[assertLineageHasGcsItem]] with `prefix`. Arguments left as `null` are not checked.
+   */
+  private def assertLineageIsPopulated(
+    scioResult: ScioResult
+  )(
+    prefix: String = "gcs:",
+    assertInput: String = null,
+    assertInput2: String = null,
+    assertOutput: String = null
+  ): Unit = {
+    lazy val lineage = scioResult.getBeamLineage
+    // Expectations left as null are skipped; inputs are checked before the output.
+    List(assertInput, assertInput2)
+      .filter(_ != null)
+      .foreach(assertLineageHasGcsItem(lineage.sources, prefix, _))
+    Option(assertOutput).foreach(assertLineageHasGcsItem(lineage.sinks, prefix, _))
+  }
+
   private def avro(runId: String): Future[Unit] = {
     import com.spotify.scio.examples.extra.AvroExample
 
@@ -59,10 +87,12 @@ object RunPreReleaseIT {
 
     log.info("Starting Avro write tests... ")
     val write = invokeJob[AvroExample.type]("--method=specificOut", s"--output=$out1")
-    write.flatMap { _ =>
-      log.info("Starting Avro read tests... ")
-      invokeJob[AvroExample.type]("--method=specificIn", s"--input=$out1/*", s"--output=$out2")
-    }
+    write
+      .flatMap { _ =>
+        log.info("Starting Avro read tests... ")
+        invokeJob[AvroExample.type]("--method=specificIn", s"--input=$out1/*", s"--output=$out2")
+      }
+      .map(assertLineageIsPopulated(_)(assertInput = out1, assertOutput = out2))
   }
 
   private def parquet(runId: String): Future[Unit] = {
@@ -77,34 +107,37 @@ object RunPreReleaseIT {
 
     log.info("Starting Parquet write tests... ")
     val writes = List(
-      invokeJob[ParquetExample.type]("--method=avroOut", s"--output=$out1"),
+      invokeJob[ParquetExample.type]("--method=avroOut", s"--output=$out1")
+        .map(assertLineageIsPopulated(_)(assertOutput = out1)),
       invokeJob[ParquetExample.type]("--method=exampleOut", s"--output=$out5")
+        .map(assertLineageIsPopulated(_)(assertOutput = out5))
     )
 
     Future.sequence(writes).flatMap { _ =>
       log.info("Starting Parquet read tests... ")
       val reads = List(
-        invokeJob[ParquetExample.type]("--method=typedIn", s"--input=$out1/*", s"--output=$out2"),
+        invokeJob[ParquetExample.type]("--method=typedIn", s"--input=$out1/*", s"--output=$out2")
+          .map(assertLineageIsPopulated(_)(assertInput = out1, assertOutput = out2)),
         invokeJob[ParquetExample.type](
           "--method=avroSpecificIn",
           s"--input=$out1/*",
           s"--output=$out3"
-        ),
+        ).map(assertLineageIsPopulated(_)(assertInput = out1, assertOutput = out3)),
         invokeJob[ParquetExample.type](
           "--method=avroGenericIn",
           s"--input=$out1/*",
           s"--output=$out4"
-        ),
+        ).map(assertLineageIsPopulated(_)(assertInput = out1, assertOutput = out4)),
         invokeJob[ParquetExample.type](
           "--method=avroGenericIn",
           s"--input=$out1/*",
           s"--output=$out4"
-        ),
+        ).map(assertLineageIsPopulated(_)(assertInput = out1, assertOutput = out4)),
         invokeJob[ParquetExample.type](
           "--method=exampleIn",
           s"--input=$out5/*",
           s"--output=$out6"
-        )
+        ).map(assertLineageIsPopulated(_)(assertInput = out5, assertOutput = out6))
       )
       Future.sequence(reads).map(_ => ())
     }
@@ -118,21 +151,41 @@ object RunPreReleaseIT {
     }
     val out1 = gcsPath[SortMergeBucketWriteExample.type]("users", runId)
     val out2 = gcsPath[SortMergeBucketWriteExample.type]("accounts", runId)
+    val joinPath = gcsPath[SortMergeBucketJoinExample.type]("join", runId)
+    val transformPath = gcsPath[SortMergeBucketTransformExample.type]("transform", runId)
 
     log.info("Starting SMB write tests... ")
-    val write = invokeJob[SortMergeBucketWriteExample.type](s"--users=$out1", s"--accounts=$out2")
+    val write =
+      invokeJob[SortMergeBucketWriteExample.type](s"--users=$out1", s"--accounts=$out2").map {
+        result =>
+          // Both paths are passed to the write job as outputs, so both must appear as sinks.
+          assertLineageIsPopulated(result)(assertOutput = out1)
+          assertLineageIsPopulated(result)(assertOutput = out2)
+      }
     write.flatMap { _ =>
       log.info("Starting SMB read tests... ")
       val readJobs = List(
         invokeJob[SortMergeBucketJoinExample.type](
           s"--users=$out1",
           s"--accounts=$out2",
-          s"--output=${gcsPath[SortMergeBucketJoinExample.type]("join", runId)}"
+          s"--output=$joinPath"
+        ).map(
+          assertLineageIsPopulated(_)(
+            assertInput = out1,
+            assertInput2 = out2,
+            assertOutput = joinPath
+          )
         ),
         invokeJob[SortMergeBucketTransformExample.type](
           s"--users=$out1",
           s"--accounts=$out2",
-          s"--output=${gcsPath[SortMergeBucketTransformExample.type]("transform", runId)}"
+          s"--output=$transformPath"
+        ).map(
+          assertLineageIsPopulated(_)(
+            assertInput = out1,
+            assertInput2 = out2,
+            assertOutput = transformPath
+          )
         )
       )
       Future.sequence(readJobs).map(_ => ())
@@ -145,15 +198,25 @@ object RunPreReleaseIT {
     val jobs = List(
       invokeJob[TypedStorageBigQueryTornadoes.type](
         s"--output=data-integration-test:gha_it_us.typed_storage"
+      ).map(
+        assertLineageIsPopulated(_)(
+          prefix = "bigquery:",
+          assertOutput = "data-integration-test.gha_it_us.typed_storage"
+        )
       ),
       invokeJob[TypedBigQueryTornadoes.type](
         s"--output=data-integration-test:gha_it_us.typed_row"
+      ).map(
+        assertLineageIsPopulated(_)(
+          prefix = "bigquery:",
+          assertOutput = "data-integration-test.gha_it_us.typed_row"
+        )
       )
     )
     Future.sequence(jobs).map(_ => ())
   }
 
-  private def invokeJob[T: ClassTag](args: String*): Future[Unit] = {
+  private def invokeJob[T: ClassTag](args: String*): Future[ScioResult] = {
     val cls = ScioUtil.classOf[T]
     val jobObjName = cls.getName.replaceAll("\\$$", "")
     val pipelines = Class