Skip to content

Commit 28c68e6

Browse files
committed
Upgrade com.google.cloud.bigdataoss libraries to 3.x
1 parent 05379a9 commit 28c68e6

File tree

3 files changed

+38
-31
lines changed

3 files changed

+38
-31
lines changed

build.sbt

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ val beamVersion = "2.66.0"
3636
val autoServiceVersion = "1.0.1"
3737
val autoValueVersion = "1.9"
3838
val avroVersion = sys.props.getOrElse("avro.version", "1.11.4")
39-
val bigdataossVersion = "2.2.26"
39+
val bigdataossVersion = "3.1.3"
4040
val bigtableClientVersion = "1.28.0"
4141
val commonsCodecVersion = "1.18.0"
4242
val commonsCompressVersion = "1.26.2"
@@ -879,9 +879,9 @@ lazy val `scio-core` = project
879879
"org.apache.beam" % s"beam-runners-spark-$sparkMajorVersion" % beamVersion % Provided,
880880
"org.apache.beam" % "beam-sdks-java-extensions-google-cloud-platform-core" % beamVersion % Provided,
881881
"org.apache.hadoop" % "hadoop-common" % hadoopVersion % Provided,
882-
"com.google.cloud.bigdataoss" % "gcs-connector" % s"hadoop2-$bigdataossVersion" % Provided,
882+
"com.google.cloud.bigdataoss" % "gcs-connector" % bigdataossVersion % Provided,
883883
"com.google.cloud.bigdataoss" % "gcsio" % bigdataossVersion % Provided,
884-
"com.google.cloud.bigdataoss" % "util-hadoop" % s"hadoop2-$bigdataossVersion" % Provided,
884+
"com.google.cloud.bigdataoss" % "util-hadoop" % bigdataossVersion % Provided,
885885
// test
886886
"com.lihaoyi" %% "fansi" % fansiVersion % Test,
887887
"com.lihaoyi" %% "pprint" % pprintVersion % Test,
@@ -1355,7 +1355,7 @@ lazy val `scio-parquet` = project
13551355
libraryDependencies ++= Seq(
13561356
// compile
13571357
"com.google.auth" % "google-auth-library-oauth2-http" % gcpBom.key.value,
1358-
"com.google.cloud.bigdataoss" % "util-hadoop" % s"hadoop2-$bigdataossVersion",
1358+
"com.google.cloud.bigdataoss" % "util-hadoop" % bigdataossVersion,
13591359
"com.google.protobuf" % "protobuf-java" % protobufVersion,
13601360
"com.spotify" %% "magnolify-parquet" % magnolifyVersion,
13611361
"com.twitter" %% "chill" % chillVersion,
@@ -1563,7 +1563,8 @@ lazy val `scio-examples` = project
15631563
"org.tensorflow" % "tensorflow-core-api" % tensorFlowVersion,
15641564
"redis.clients" % "jedis" % jedisVersion,
15651565
// runtime
1566-
"com.google.cloud.bigdataoss" % "gcs-connector" % s"hadoop2-$bigdataossVersion" % Runtime,
1566+
"com.google.cloud.bigdataoss" % "gcs-connector" % bigdataossVersion % Runtime,
1567+
"com.google.cloud.bigdataoss" % "gcsio" % bigdataossVersion % Runtime,
15671568
"com.google.cloud.sql" % "mysql-socket-factory-connector-j-8" % "1.25.2" % Runtime,
15681569
// test
15691570
"org.scalacheck" %% "scalacheck" % scalacheckVersion % Test

scio-core/src/main/scala/com/spotify/scio/ScioContext.scala

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ import scala.util.control.NoStackTrace
5353
import scala.util.{Failure, Success, Try}
5454
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
5555

56+
import java.util.concurrent.TimeUnit
57+
5658
/** Runner specific context. */
5759
trait RunnerContext {
5860
def prepareOptions(options: PipelineOptions, artifacts: List[String]): Unit
@@ -472,12 +474,12 @@ class ScioContext private[scio] (
472474
.pipe(o =>
473475
Option(config.get(GfsConfig.GCS_INPUT_STREAM_FAST_FAIL_ON_NOT_FOUND_ENABLE.getKey))
474476
.map(_.toBoolean)
475-
.fold(o)(o.setFastFailOnNotFound)
477+
.fold(o)(o.setFastFailOnNotFoundEnabled)
476478
)
477479
.pipe(o =>
478480
Option(config.get(GfsConfig.GCS_INPUT_STREAM_SUPPORT_GZIP_ENCODING_ENABLE.getKey))
479481
.map(_.toBoolean)
480-
.fold(o)(o.setSupportGzipEncoding)
482+
.fold(o)(o.setGzipEncodingSupportEnabled)
481483
)
482484
.pipe(o =>
483485
Option(config.get(GfsConfig.GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT.getKey))
@@ -491,7 +493,7 @@ class ScioContext private[scio] (
491493
)
492494
.pipe(o =>
493495
Option(config.get(GfsConfig.GCS_INPUT_STREAM_MIN_RANGE_REQUEST_SIZE.getKey))
494-
.map(_.toInt)
496+
.map(_.toLong)
495497
.fold(o)(o.setMinRangeRequestSize)
496498
)
497499
.pipe(o =>
@@ -500,40 +502,41 @@ class ScioContext private[scio] (
500502
.fold(o)(o.setGrpcChecksumsEnabled)
501503
)
502504
.pipe(o =>
503-
Option(config.get(GfsConfig.GCS_GRPC_READ_TIMEOUT_MS.getKey))
504-
.map(_.toLong)
505-
.fold(o)(o.setGrpcReadTimeoutMillis)
505+
Option(config.get(GfsConfig.GCS_GRPC_READ_TIMEOUT.getKey))
506+
.map(v =>
507+
java.time.Duration.ofMillis(
508+
config.getTimeDurationHelper(
509+
GfsConfig.GCS_GRPC_READ_TIMEOUT.getKey,
510+
v,
511+
TimeUnit.MILLISECONDS
512+
)
513+
)
514+
)
515+
.fold(o)(o.setGrpcReadTimeout)
506516
)
507517
.pipe(o =>
508-
Option(config.get(GfsConfig.GCS_GRPC_READ_MESSAGE_TIMEOUT_MS.getKey))
509-
.map(_.toLong)
510-
.fold(o)(o.setGrpcReadMessageTimeoutMillis)
511-
)
512-
.pipe(o =>
513-
Option(config.get(GfsConfig.GCS_GRPC_READ_METADATA_TIMEOUT_MS.getKey))
514-
.map(_.toLong)
515-
.fold(o)(o.setGrpcReadMetadataTimeoutMillis)
518+
Option(config.get(GfsConfig.GCS_GRPC_READ_MESSAGE_TIMEOUT.getKey))
519+
.map(v =>
520+
java.time.Duration.ofMillis(
521+
config.getTimeDurationHelper(
522+
GfsConfig.GCS_GRPC_READ_TIMEOUT.getKey,
523+
v,
524+
TimeUnit.MILLISECONDS
525+
)
526+
)
527+
)
528+
.fold(o)(o.setGrpcReadMessageTimeout)
516529
)
517530
.pipe(o =>
518531
Option(config.get(GfsConfig.GCS_GRPC_READ_ZEROCOPY_ENABLE.getKey))
519532
.map(_.toBoolean)
520533
.fold(o)(o.setGrpcReadZeroCopyEnabled)
521534
)
522-
.pipe(o =>
523-
Option(config.get(GfsConfig.GCS_TRACE_LOG_ENABLE.getKey))
524-
.map(_.toBoolean)
525-
.fold(o)(o.setTraceLogEnabled)
526-
)
527-
.pipe(o =>
528-
Option(config.get(GfsConfig.GCS_TRACE_LOG_TIME_THRESHOLD_MS.getKey))
529-
.map(_.toLong)
530-
.fold(o)(o.setTraceLogTimeThreshold)
531-
)
532535
.build()
533536
)
534537
} catch {
535538
// Hadoop and/or gcs-connector is excluded from classpath, do not try to set options
536-
case _: NoClassDefFoundError | _: NoSuchMethodException =>
539+
case _: LinkageError =>
537540
}
538541
}
539542

scio-parquet/src/main/scala/com/spotify/scio/parquet/GcsConnectorUtil.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,10 @@ class ApplicationDefaultTokenProvider() extends AccessTokenProvider {
106106

107107
override def getAccessToken: AccessTokenProvider.AccessToken = {
108108
val gToken = Option(adc.getAccessToken).getOrElse { adc.refresh(); adc.getAccessToken }
109-
new AccessTokenProvider.AccessToken(gToken.getTokenValue, gToken.getExpirationTime.getTime)
109+
new AccessTokenProvider.AccessToken(
110+
gToken.getTokenValue,
111+
gToken.getExpirationTime.toInstant
112+
)
110113
}
111114
override def refresh(): Unit = adc.refresh()
112115
override def setConf(c: Configuration): Unit = conf = Some(c)

0 commit comments

Comments
 (0)