From 6e71d314753b13b6a4568d3ae08b9dc03346a99f Mon Sep 17 00:00:00 2001 From: subhramit Date: Wed, 8 Apr 2026 06:22:07 +0530 Subject: [PATCH 1/6] Correlate avro schema fields with TableSchema Signed-off-by: subhramit --- .../scio/bigquery/types/SchemaProvider.scala | 64 ++++++++++++++++++- 1 file changed, 61 insertions(+), 3 deletions(-) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala index e60d33d7a3..5343200f4e 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala @@ -37,21 +37,79 @@ private[types] object SchemaProvider { def avroSchemaOf[T: TypeTag]: Schema = AvroSchemaCache.get( typeTag[T].tpe, { - // BigQueryUtils converts nested record namespaces, but not top-level namespace + val tableSchema = schemaOf[T] val converted = - BigQueryUtils.toGenericAvroSchema(typeTag[T].tpe.toString, schemaOf[T].getFields, true) + BigQueryUtils.toGenericAvroSchema(typeTag[T].tpe.toString, tableSchema.getFields, true) Schema.createRecord( converted.getName, converted.getDoc, BeamAvroConverterNamespace, converted.isError, converted.getFields.asScala - .map(f => new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal())) + .map(f => + new Schema.Field( + f.name(), + withBigQueryLogicalType( + f.schema(), + tableSchema.getFields.asScala.find(_.getName == f.name()) + ), + f.doc(), + f.defaultVal() + ) + ) .asJava ) } ) + /** + * Stamps BigQuery-specific logicalType properties onto Avro schemas for temporal types. + * BigQueryUtils.toGenericAvroSchema converts BQ DATE/TIME/DATETIME/TIMESTAMP to bare Avro + * primitives without logicalType annotations; BigQuery's Avro load job requires them. + * + * See: https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#avro_conversions + */ + private def withBigQueryLogicalType( + avroSchema: Schema, + bqField: Option[TableFieldSchema] + ): Schema = { + // unwrap NULLABLE (["null", actual]) to get the real schema, then rewrap + val (isNullable, inner) = avroSchema.getType match { + case Schema.Type.UNION => + val types = avroSchema.getTypes.asScala + val nonNull = types.find(_.getType != Schema.Type.NULL) + (true, nonNull.getOrElse(avroSchema)) + case _ => + (false, avroSchema) + } + + val stamped = bqField.map(_.getType) match { + case Some("DATETIME") => addLogicalType(inner, "datetime") + case Some("DATE") => addLogicalType(inner, "date") + case Some("TIME") => addLogicalType(inner, "time-micros") + case Some("TIMESTAMP") => addLogicalType(inner, "timestamp-micros") + case _ => inner + } + + if (isNullable && (stamped ne inner)) { + // rebuild the union with the stamped inner schema + Schema.createUnion( + avroSchema.getTypes.asScala + .map(t => if (t.getType != Schema.Type.NULL) stamped else t) + .asJava + ) + } else { + stamped + } + } + + private def addLogicalType(schema: Schema, logicalType: String): Schema = { + // schema is immutable post-parse for properties, so copy via JSON round-trip + val copy = new Schema.Parser().parse(schema.toString) + copy.addProp("logicalType", logicalType: AnyRef) + copy + } + def schemaOf[T: TypeTag]: TableSchema = TableSchemaCache.get( typeOf[T].erasure, { From b8f71154b0add151cd472e4b9d7f2011411ef9d1 Mon Sep 17 00:00:00 2001 From: subhramit Date: Wed, 8 Apr 2026 06:24:06 +0530 Subject: [PATCH 2/6] Shift old comment down Signed-off-by: subhramit --- .../scala/com/spotify/scio/bigquery/types/SchemaProvider.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala index 5343200f4e..844a036d28 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala @@ -38,6 +38,7 @@ private[types] object SchemaProvider { AvroSchemaCache.get( typeTag[T].tpe, { val tableSchema = schemaOf[T] + // BigQueryUtils converts nested record namespaces, but not top-level namespace val converted = BigQueryUtils.toGenericAvroSchema(typeTag[T].tpe.toString, tableSchema.getFields, true) Schema.createRecord( From 094c57dbe4c9e5c4d41068b200294635b648c6f0 Mon Sep 17 00:00:00 2001 From: subhramit Date: Wed, 8 Apr 2026 07:12:35 +0530 Subject: [PATCH 3/6] Add tests Signed-off-by: subhramit --- .../bigquery/types/SchemaProviderTest.scala | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaProviderTest.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaProviderTest.scala index 1bde9bfba9..0473a8b248 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaProviderTest.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaProviderTest.scala @@ -21,6 +21,25 @@ import com.spotify.scio.bigquery.BigQueryUtil.parseSchema import org.apache.beam.sdk.util.SerializableUtils import org.scalatest.matchers.should.Matchers import org.scalatest.flatspec.AnyFlatSpec +import org.apache.avro.Schema +import org.joda.time.{Instant, LocalDate, LocalDateTime, LocalTime} +import scala.jdk.CollectionConverters._ + +@BigQueryType.toTable +case class RequiredTemporal( + timestampF: Instant, + dateF: LocalDate, + timeF: LocalTime, + datetimeF: LocalDateTime + ) + +@BigQueryType.toTable +case class NullableTemporal( + timestampF: Option[Instant], + dateF: Option[LocalDate], + timeF: Option[LocalTime], + datetimeF: Option[LocalDateTime] + ) class SchemaProviderTest extends AnyFlatSpec with Matchers { import Schemas._ @@ -114,4 +133,31 @@ class SchemaProviderTest extends AnyFlatSpec with Matchers { it should "ignore methods in case classes" in { SchemaProvider.schemaOf[CaseClassWithMethods] shouldBe parseSchema(recordFields("REQUIRED")) } + + it should "stamp logicalType on required temporal Avro fields" in { + val avroSchema = BigQueryType.avroSchemaOf[RequiredTemporal] + + def logicalType(fieldName: String): String = + avroSchema.getField(fieldName).schema().getProp("logicalType") + + logicalType("timestampF") shouldBe "timestamp-micros" + logicalType("dateF") shouldBe "date" + logicalType("timeF") shouldBe "time-micros" + logicalType("datetimeF") shouldBe "datetime" + } + + it should "stamp logicalType on nullable temporal Avro fields" in { + val avroSchema = BigQueryType.avroSchemaOf[NullableTemporal] + + def logicalType(fieldName: String): String = + avroSchema.getField(fieldName).schema().getTypes.asScala + .find(_.getType != Schema.Type.NULL) + .map(_.getProp("logicalType")) + .orNull + + logicalType("timestampF") shouldBe "timestamp-micros" + logicalType("dateF") shouldBe "date" + logicalType("timeF") shouldBe "time-micros" + logicalType("datetimeF") shouldBe "datetime" + } } From 1bd6299a63b452fc04e464fe4a8ad7d2a6a516f8 Mon Sep 17 00:00:00 2001 From: subhramit Date: Wed, 8 Apr 2026 07:14:30 +0530 Subject: [PATCH 4/6] Fix existing test failures Signed-off-by: subhramit --- .../scala/com/spotify/scio/bigquery/types/SchemaProvider.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala index 844a036d28..6f2b3c0d3a 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala @@ -100,7 +100,7 @@ private[types] object SchemaProvider { .asJava ) } else { - stamped + avroSchema } } From 959527590b796632fd7ac92dc295d7844ecd9307 Mon Sep 17 00:00:00 2001 From: subhramit Date: Wed, 8 Apr 2026 07:15:51 +0530 Subject: [PATCH 5/6] Add comment Signed-off-by: subhramit --- .../scala/com/spotify/scio/bigquery/types/SchemaProvider.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala index 6f2b3c0d3a..b8e7ed788a 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala @@ -100,6 +100,7 @@ private[types] object SchemaProvider { .asJava ) } else { + // non-temporal nullable field (or required temporal): preserve original schema avroSchema } } From f483563d6e05facd84f5058a697c1fa323df6d19 Mon Sep 17 00:00:00 2001 From: Subhramit Basu Date: Wed, 8 Apr 2026 01:58:53 +0000 Subject: [PATCH 6/6] scalafmt --- .../bigquery/types/SchemaProviderTest.scala | 38 ++++++++++--------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaProviderTest.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaProviderTest.scala index 0473a8b248..25f4859bdd 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaProviderTest.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaProviderTest.scala @@ -27,19 +27,19 @@ import scala.jdk.CollectionConverters._ @BigQueryType.toTable case class RequiredTemporal( - timestampF: Instant, - dateF: LocalDate, - timeF: LocalTime, - datetimeF: LocalDateTime - ) + timestampF: Instant, + dateF: LocalDate, + timeF: LocalTime, + datetimeF: LocalDateTime +) @BigQueryType.toTable case class NullableTemporal( - timestampF: Option[Instant], - dateF: Option[LocalDate], - timeF: Option[LocalTime], - datetimeF: Option[LocalDateTime] - ) + timestampF: Option[Instant], + dateF: Option[LocalDate], + timeF: Option[LocalTime], + datetimeF: Option[LocalDateTime] +) class SchemaProviderTest extends AnyFlatSpec with Matchers { import Schemas._ @@ -141,23 +141,27 @@ class SchemaProviderTest extends AnyFlatSpec with Matchers { avroSchema.getField(fieldName).schema().getProp("logicalType") logicalType("timestampF") shouldBe "timestamp-micros" - logicalType("dateF") shouldBe "date" - logicalType("timeF") shouldBe "time-micros" - logicalType("datetimeF") shouldBe "datetime" + logicalType("dateF") shouldBe "date" + logicalType("timeF") shouldBe "time-micros" + logicalType("datetimeF") shouldBe "datetime" } it should "stamp logicalType on nullable temporal Avro fields" in { val avroSchema = BigQueryType.avroSchemaOf[NullableTemporal] def logicalType(fieldName: String): String = - avroSchema.getField(fieldName).schema().getTypes.asScala + avroSchema + .getField(fieldName) + .schema() + .getTypes + .asScala .find(_.getType != Schema.Type.NULL) .map(_.getProp("logicalType")) .orNull logicalType("timestampF") shouldBe "timestamp-micros" - logicalType("dateF") shouldBe "date" - logicalType("timeF") shouldBe "time-micros" - logicalType("datetimeF") shouldBe "datetime" + logicalType("dateF") shouldBe "date" + logicalType("timeF") shouldBe "time-micros" + logicalType("datetimeF") shouldBe "datetime" } }