Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion scio-core/src/main/scala/com/spotify/scio/ScioContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,10 @@ class ScioContext private[scio] (

new ScioResult(pipelineResult) {
Option(sc.optionsAs[ScioOptions].getMetricsLocation).foreach(saveMetrics)
Option(sc.optionsAs[ScioOptions].getLineageLocation).foreach(saveLineage)
List(
Option(sc.optionsAs[ScioOptions].getLineageLocation),
sys.env.get("SCIO_LINEAGE_LOCATION")
Comment thread
shnapz marked this conversation as resolved.
Outdated
).flatten.foreach(saveLineage)

override def getMetrics: Metrics =
Metrics(
Expand Down
27 changes: 20 additions & 7 deletions scio-core/src/main/scala/com/spotify/scio/ScioResult.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import org.apache.beam.sdk.io.fs.{ResolveOptions, ResourceId}
import org.apache.beam.sdk.metrics.{DistributionResult, GaugeResult, Lineage}
import org.apache.beam.sdk.util.MimeTypes
import org.apache.beam.sdk.{metrics => beam, PipelineResult}

import org.joda.time.Instant
import org.slf4j.LoggerFactory

import java.io.File
import java.nio.ByteBuffer
Expand All @@ -46,6 +46,7 @@ trait RunnerResult {

/** Represent a Scio pipeline result. */
abstract class ScioResult private[scio] (val internal: PipelineResult) {
private val logger = LoggerFactory.getLogger(this.getClass)
Comment thread
shnapz marked this conversation as resolved.
Outdated

/** Get a Beam runner specific result. */
def as[T <: RunnerResult: ClassTag]: T = {
Expand Down Expand Up @@ -74,6 +75,7 @@ abstract class ScioResult private[scio] (val internal: PipelineResult) {
val isDirectory = path.endsWith(File.separator)
saveJsonFile(
getResourceId(path, enforceNewFile = isDirectory, newFilePrefix = "metrics"),
path,
getMetrics
)
}
Expand All @@ -83,6 +85,7 @@ abstract class ScioResult private[scio] (val internal: PipelineResult) {
val isDirectory = path.endsWith(File.separator)
saveJsonFile(
getResourceId(path, enforceNewFile = isDirectory, newFilePrefix = "lineage"),
path,
getBeamLineage
)
}
Expand All @@ -102,18 +105,28 @@ abstract class ScioResult private[scio] (val internal: PipelineResult) {
}
}

private def saveJsonFile(resourceId: ResourceId, value: Object): Unit = {
private def saveJsonFile(resourceId: => ResourceId, path: String, value: Object): Unit = {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, where is path getting used in this method outside of the logging? can we just log the resourceID directly?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the problem is that creation of resourceId can fail if bucket does not exist

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's ok; if we do:

      logger.info(f"Saved metrics to '$resourceId'")
    } catch {
      case e: Throwable =>
        logger.warn(
          f"Failed to save metrics: ${mapper.writeValueAsString(value)}",
          e
        )

in the success case, the resourceID exists so it's safe to log, and in the failure case, the error msg would include the path anyway, right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually I re-tested it and resourceID is safely created even if bucket does not exist! So I have updated the code and replaced path with resourceId!

val mapper = ScioUtil.getScalaJsonMapper
val out = FileSystems.create(resourceId, MimeTypes.TEXT)
try {
out.write(ByteBuffer.wrap(mapper.writeValueAsBytes(value)))
} finally {
if (out != null) {
out.close()
val out = FileSystems.create(resourceId, MimeTypes.TEXT)
try {
out.write(ByteBuffer.wrap(mapper.writeValueAsBytes(value)))
} finally {
if (out != null) {
out.close()
}
}
logger.info(f"Saved metrics to '$path'")
} catch {
case e: Throwable =>
logger.warn(
f"Failed to save metrics to '$path': ${mapper.writeValueAsString(value)}",
e
)
}
}

/** Get lineage metric values. */
def getBeamLineage: BeamLineage = {
def asScalaCrossCompatible(set: java.util.Set[String]): Iterable[String] = {
val iterator = set.iterator()
Expand Down