Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,81 @@ private[types] object SchemaProvider {
/**
 * Returns the Avro schema for `T`, cached per type.
 *
 * The record is rebuilt so that the top-level namespace is normalized to
 * `BeamAvroConverterNamespace` and BigQuery temporal logicalType annotations are
 * re-applied via `withBigQueryLogicalType` (BigQueryUtils drops them).
 */
def avroSchemaOf[T: TypeTag]: Schema =
  AvroSchemaCache.get(
    typeTag[T].tpe, {
      val tableSchema = schemaOf[T]
      // BigQueryUtils converts nested record namespaces, but not top-level namespace
      val converted =
        BigQueryUtils.toGenericAvroSchema(typeTag[T].tpe.toString, tableSchema.getFields, true)
      // index BigQuery fields by name once, instead of a linear scan per Avro field
      val bqFieldsByName =
        tableSchema.getFields.asScala.map(f => f.getName -> f).toMap
      Schema.createRecord(
        converted.getName,
        converted.getDoc,
        BeamAvroConverterNamespace,
        converted.isError,
        converted.getFields.asScala
          .map(f =>
            new Schema.Field(
              f.name(),
              withBigQueryLogicalType(f.schema(), bqFieldsByName.get(f.name())),
              f.doc(),
              f.defaultVal()
            )
          )
          .asJava
      )
    }
  )

/**
* Stamps BigQuery-specific logicalType properties onto Avro schemas for temporal types.
* BigQueryUtils.toGenericAvroSchema converts BQ DATE/TIME/DATETIME/TIMESTAMP to bare Avro
* primitives without logicalType annotations; BigQuery's Avro load job requires them.
*
* See: https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#avro_conversions
*/
/**
 * Stamps BigQuery-specific logicalType properties onto Avro schemas for temporal types.
 * BigQueryUtils.toGenericAvroSchema converts BQ DATE/TIME/DATETIME/TIMESTAMP to bare Avro
 * primitives without logicalType annotations; BigQuery's Avro load job requires them.
 *
 * See: https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#avro_conversions
 *
 * @param avroSchema the Avro field schema, possibly a NULLABLE union of ["null", actual]
 * @param bqField    the BigQuery field matched by name, if any
 * @return the schema with a logicalType prop added for temporal BQ types, otherwise unchanged
 */
private def withBigQueryLogicalType(
  avroSchema: Schema,
  bqField: Option[TableFieldSchema]
): Schema = {
  // unwrap NULLABLE (["null", actual]) to get the real schema; rewrapped below if stamped
  val (isNullable, inner) = avroSchema.getType match {
    case Schema.Type.UNION =>
      val types = avroSchema.getTypes.asScala
      val nonNull = types.find(_.getType != Schema.Type.NULL)
      (true, nonNull.getOrElse(avroSchema))
    case _ =>
      (false, avroSchema)
  }

  val stamped = bqField.map(_.getType) match {
    case Some("DATETIME") => addLogicalType(inner, "datetime")
    case Some("DATE") => addLogicalType(inner, "date")
    case Some("TIME") => addLogicalType(inner, "time-micros")
    case Some("TIMESTAMP") => addLogicalType(inner, "timestamp-micros")
    case _ => inner
  }

  if (stamped eq inner) {
    // non-temporal field: nothing was stamped, preserve the original schema
    avroSchema
  } else if (isNullable) {
    // rebuild the union with the stamped inner schema
    Schema.createUnion(
      avroSchema.getTypes.asScala
        .map(t => if (t.getType != Schema.Type.NULL) stamped else t)
        .asJava
    )
  } else {
    // BUGFIX: a REQUIRED temporal field previously fell through to the
    // `avroSchema` branch and lost its logicalType stamp entirely
    stamped
  }
}

/**
 * Returns a copy of `schema` carrying the given `logicalType` property.
 *
 * Parsed Avro schemas reject new properties once read, so we clone the schema
 * via a JSON round-trip before attaching the property.
 */
private def addLogicalType(schema: Schema, logicalType: String): Schema = {
  val cloned = new Schema.Parser().parse(schema.toString)
  cloned.addProp("logicalType", logicalType: AnyRef)
  cloned
}

def schemaOf[T: TypeTag]: TableSchema =
TableSchemaCache.get(
typeOf[T].erasure, {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,25 @@ import com.spotify.scio.bigquery.BigQueryUtil.parseSchema
import org.apache.beam.sdk.util.SerializableUtils
import org.scalatest.matchers.should.Matchers
import org.scalatest.flatspec.AnyFlatSpec
import org.apache.avro.Schema
import org.joda.time.{Instant, LocalDate, LocalDateTime, LocalTime}
import scala.jdk.CollectionConverters._

// Fixture: row type whose temporal columns are all REQUIRED in BigQuery.
// Used to verify that logicalType is stamped directly on non-nullable Avro fields.
@BigQueryType.toTable
case class RequiredTemporal(
timestampF: Instant,
dateF: LocalDate,
timeF: LocalTime,
datetimeF: LocalDateTime
)

// Fixture: row type whose temporal columns are all NULLABLE in BigQuery.
// Used to verify that logicalType is stamped on the non-null branch of the Avro union.
@BigQueryType.toTable
case class NullableTemporal(
timestampF: Option[Instant],
dateF: Option[LocalDate],
timeF: Option[LocalTime],
datetimeF: Option[LocalDateTime]
)

class SchemaProviderTest extends AnyFlatSpec with Matchers {
import Schemas._
Expand Down Expand Up @@ -114,4 +133,35 @@ class SchemaProviderTest extends AnyFlatSpec with Matchers {
it should "ignore methods in case classes" in {
  // methods on the case class must not contribute fields to the schema
  val expected = parseSchema(recordFields("REQUIRED"))
  SchemaProvider.schemaOf[CaseClassWithMethods] shouldBe expected
}

it should "stamp logicalType on required temporal Avro fields" in {
  val avroSchema = BigQueryType.avroSchemaOf[RequiredTemporal]

  // required fields carry the logicalType prop directly on the field schema
  val expected = Seq(
    "timestampF" -> "timestamp-micros",
    "dateF" -> "date",
    "timeF" -> "time-micros",
    "datetimeF" -> "datetime"
  )

  expected.foreach { case (fieldName, logicalType) =>
    avroSchema.getField(fieldName).schema().getProp("logicalType") shouldBe logicalType
  }
}

it should "stamp logicalType on nullable temporal Avro fields" in {
  val avroSchema = BigQueryType.avroSchemaOf[NullableTemporal]

  // nullable columns are unions of ["null", actual]; the prop lives on the non-null branch
  def logicalType(fieldName: String): String = {
    val branches = avroSchema.getField(fieldName).schema().getTypes.asScala
    branches
      .collectFirst { case s if s.getType != Schema.Type.NULL => s.getProp("logicalType") }
      .orNull
  }

  val expected = Seq(
    "timestampF" -> "timestamp-micros",
    "dateF" -> "date",
    "timeF" -> "time-micros",
    "datetimeF" -> "datetime"
  )

  expected.foreach { case (fieldName, lt) => logicalType(fieldName) shouldBe lt }
}
}