diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala index e60d33d7a3..b8e7ed788a 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala @@ -37,21 +37,81 @@ private[types] object SchemaProvider { def avroSchemaOf[T: TypeTag]: Schema = AvroSchemaCache.get( typeTag[T].tpe, { + val tableSchema = schemaOf[T] // BigQueryUtils converts nested record namespaces, but not top-level namespace val converted = - BigQueryUtils.toGenericAvroSchema(typeTag[T].tpe.toString, schemaOf[T].getFields, true) + BigQueryUtils.toGenericAvroSchema(typeTag[T].tpe.toString, tableSchema.getFields, true) Schema.createRecord( converted.getName, converted.getDoc, BeamAvroConverterNamespace, converted.isError, converted.getFields.asScala - .map(f => new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal())) + .map(f => + new Schema.Field( + f.name(), + withBigQueryLogicalType( + f.schema(), + tableSchema.getFields.asScala.find(_.getName == f.name()) + ), + f.doc(), + f.defaultVal() + ) + ) .asJava ) } ) + /** + * Stamps BigQuery-specific logicalType properties onto Avro schemas for temporal types. + * BigQueryUtils.toGenericAvroSchema converts BQ DATE/TIME/DATETIME/TIMESTAMP to bare Avro + * primitives without logicalType annotations; BigQuery's Avro load job requires them. + * + * See: https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#avro_conversions + */ + private def withBigQueryLogicalType( + avroSchema: Schema, + bqField: Option[TableFieldSchema] + ): Schema = { + // unwrap NULLABLE (["null", actual]) to get the real schema, then rewrap + val (isNullable, inner) = avroSchema.getType match { + case Schema.Type.UNION => + val types = avroSchema.getTypes.asScala + val nonNull = types.find(_.getType != Schema.Type.NULL) + (true, nonNull.getOrElse(avroSchema)) + case _ => + (false, avroSchema) + } + + val stamped = bqField.map(_.getType) match { + case Some("DATETIME") => addLogicalType(inner, "datetime") + case Some("DATE") => addLogicalType(inner, "date") + case Some("TIME") => addLogicalType(inner, "time-micros") + case Some("TIMESTAMP") => addLogicalType(inner, "timestamp-micros") + case _ => inner + } + + if (isNullable && (stamped ne inner)) { + // rebuild the union with the stamped inner schema + Schema.createUnion( + avroSchema.getTypes.asScala + .map(t => if (t.getType != Schema.Type.NULL) stamped else t) + .asJava + ) + } else { + // non-temporal nullable field (or required temporal): preserve original schema + avroSchema + } + } + + private def addLogicalType(schema: Schema, logicalType: String): Schema = { + // schema is immutable post-parse for properties, so copy via JSON round-trip + val copy = new Schema.Parser().parse(schema.toString) + copy.addProp("logicalType", logicalType: AnyRef) + copy + } + def schemaOf[T: TypeTag]: TableSchema = TableSchemaCache.get( typeOf[T].erasure, { diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaProviderTest.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaProviderTest.scala index 1bde9bfba9..25f4859bdd 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaProviderTest.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaProviderTest.scala @@ -21,6 +21,25 @@ import com.spotify.scio.bigquery.BigQueryUtil.parseSchema import org.apache.beam.sdk.util.SerializableUtils import org.scalatest.matchers.should.Matchers import org.scalatest.flatspec.AnyFlatSpec +import org.apache.avro.Schema +import org.joda.time.{Instant, LocalDate, LocalDateTime, LocalTime} +import scala.jdk.CollectionConverters._ + +@BigQueryType.toTable +case class RequiredTemporal( + timestampF: Instant, + dateF: LocalDate, + timeF: LocalTime, + datetimeF: LocalDateTime +) + +@BigQueryType.toTable +case class NullableTemporal( + timestampF: Option[Instant], + dateF: Option[LocalDate], + timeF: Option[LocalTime], + datetimeF: Option[LocalDateTime] +) class SchemaProviderTest extends AnyFlatSpec with Matchers { import Schemas._ @@ -114,4 +133,35 @@ class SchemaProviderTest extends AnyFlatSpec with Matchers { it should "ignore methods in case classes" in { SchemaProvider.schemaOf[CaseClassWithMethods] shouldBe parseSchema(recordFields("REQUIRED")) } + + it should "stamp logicalType on required temporal Avro fields" in { + val avroSchema = BigQueryType.avroSchemaOf[RequiredTemporal] + + def logicalType(fieldName: String): String = + avroSchema.getField(fieldName).schema().getProp("logicalType") + + logicalType("timestampF") shouldBe "timestamp-micros" + logicalType("dateF") shouldBe "date" + logicalType("timeF") shouldBe "time-micros" + logicalType("datetimeF") shouldBe "datetime" + } + + it should "stamp logicalType on nullable temporal Avro fields" in { + val avroSchema = BigQueryType.avroSchemaOf[NullableTemporal] + + def logicalType(fieldName: String): String = + avroSchema + .getField(fieldName) + .schema() + .getTypes + .asScala + .find(_.getType != Schema.Type.NULL) + .map(_.getProp("logicalType")) + .orNull + + logicalType("timestampF") shouldBe "timestamp-micros" + logicalType("dateF") shouldBe "date" + logicalType("timeF") shouldBe "time-micros" + logicalType("datetimeF") shouldBe "datetime" + } }