From 12d2ba93c399c0996d2656653d1172ad23187cce Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Fri, 20 Oct 2023 19:15:36 +0200 Subject: [PATCH] IT test with null and empty values --- .../scio/bigquery/PopulateTestData.scala | 107 +++++--- .../bigquery/types/BigQueryStorageIT.scala | 232 +++++++++++------- 2 files changed, 208 insertions(+), 131 deletions(-) diff --git a/integration/src/main/scala/com/spotify/scio/bigquery/PopulateTestData.scala b/integration/src/main/scala/com/spotify/scio/bigquery/PopulateTestData.scala index 1141404a3e..1ef25d6a6d 100644 --- a/integration/src/main/scala/com/spotify/scio/bigquery/PopulateTestData.scala +++ b/integration/src/main/scala/com/spotify/scio/bigquery/PopulateTestData.scala @@ -132,7 +132,6 @@ object PopulateTestData { log.info(s"Dataset $projectId:$datasetId exists.") case NonFatal(e) => e.printStackTrace() } - () } private def populatePartitionedTables(bq: BigQuery, projectId: String): Unit = { @@ -165,17 +164,13 @@ object PopulateTestData { s"Populated $projectId:samples_eu.shakespeare, $projectId:partition_a, " + s"$projectId:partition_b and $projectId:partition_c." ) - () } private def populateStorageTables(bq: BigQuery, projectId: String): Unit = { val required = (0 until 10).toList.map(newRequired) val optional = (0 until 10).toList.map(newOptional) val repeated = (0 until 10).toList.map(newRepeated) - val nested = (0 until 10).toList.map { i => - val r = Record(i.toLong, s"s$i") - Nested(r, Some(r), List(r)) - } + val nested = (0 until 10).toList.map(newNested) bq.writeTypedRows(s"$projectId:storage.required", required, WRITE_TRUNCATE) bq.writeTypedRows(s"$projectId:storage.optional", optional, WRITE_TRUNCATE) @@ -184,7 +179,6 @@ object PopulateTestData { log.info( s"Populated $projectId:storage, $projectId:optional, $projectId:repeated, and $projectId:nested." ) - () } private def newRequired(i: Int): Required = { @@ -195,8 +189,8 @@ object PopulateTestData { i.toLong, i.toDouble, BigDecimal(i), - s"s$i", - ByteString.copyFromUtf8(s"s$i"), + i.toString, + ByteString.copyFromUtf8(i.toString), t.plus(Duration.millis(i.toLong)), dt.toLocalDate.plusDays(i), dt.toLocalTime.plusMillis(i), @@ -205,36 +199,75 @@ object PopulateTestData { } private def newOptional(i: Int): Optional = { - val t = new Instant(0) - val dt = t.toDateTime(DateTimeZone.UTC) - Optional( - Some(true), - Some(i.toLong), - Some(i.toDouble), - Some(BigDecimal(i)), - Some(s"s$i"), - Some(ByteString.copyFromUtf8(s"s$i")), - Some(t.plus(Duration.millis(i.toLong))), - Some(dt.toLocalDate.plusDays(i)), - Some(dt.toLocalTime.plusMillis(i)), - Some(dt.toLocalDateTime.plusMillis(i)) - ) + if (i == 0) { + Optional( + None, + None, + None, + None, + None, + None, + None, + None, + None, + None + ) + } else { + val t = new Instant(0) + val dt = t.toDateTime(DateTimeZone.UTC) + Optional( + Some(true), + Some(i.toLong), + Some(i.toDouble), + Some(BigDecimal(i)), + Some(i.toString), + Some(ByteString.copyFromUtf8(i.toString)), + Some(t.plus(Duration.millis(i.toLong))), + Some(dt.toLocalDate.plusDays(i)), + Some(dt.toLocalTime.plusMillis(i)), + Some(dt.toLocalDateTime.plusMillis(i)) + ) + } } private def newRepeated(i: Int): Repeated = { - val t = new Instant(0) - val dt = t.toDateTime(DateTimeZone.UTC) - Repeated( - List(true), - List(i.toLong), - List(i.toDouble), - List(BigDecimal(i)), - List(s"s$i"), - List(ByteString.copyFromUtf8(s"s$i")), - List(t.plus(Duration.millis(i.toLong))), - List(dt.toLocalDate.plusDays(i)), - List(dt.toLocalTime.plusMillis(i)), - List(dt.toLocalDateTime.plusMillis(i)) - ) + if (i == 0) { + Repeated( + Nil, + Nil, + Nil, + Nil, + Nil, + Nil, + Nil, + Nil, + Nil, + Nil + ) + } else { + val t = new Instant(0) + val dt = t.toDateTime(DateTimeZone.UTC) + Repeated( + List(true), + List(i.toLong), + List(i.toDouble), + List(BigDecimal(i)), + List(i.toString), + List(ByteString.copyFromUtf8(i.toString)), + List(t.plus(Duration.millis(i.toLong))), + List(dt.toLocalDate.plusDays(i)), + List(dt.toLocalTime.plusMillis(i)), + List(dt.toLocalDateTime.plusMillis(i)) + ) + } + } + + private def newNested(i: Int): Nested = { + val required = Record(i, i.toString) + if (i == 0) { + Nested(required, None, Nil) + } else { + Nested(required, Some(required), List(required)) + } } } diff --git a/integration/src/test/scala/com/spotify/scio/bigquery/types/BigQueryStorageIT.scala b/integration/src/test/scala/com/spotify/scio/bigquery/types/BigQueryStorageIT.scala index 7affc8bfd6..262479381d 100644 --- a/integration/src/test/scala/com/spotify/scio/bigquery/types/BigQueryStorageIT.scala +++ b/integration/src/test/scala/com/spotify/scio/bigquery/types/BigQueryStorageIT.scala @@ -36,22 +36,7 @@ class BigQueryStorageIT extends AnyFlatSpec with Matchers { import BigQueryStorageIT._ "fromStorage" should "work with REQUIRED fields" in { - val t = new Instant(0) - val dt = t.toDateTime(DateTimeZone.UTC) - val expected = (0 until 10).map { i => - Required( - true, - i.toLong, - i.toDouble, - BigDecimal(i), - s"s$i", - ByteString.copyFromUtf8(s"s$i"), - t.plus(Duration.millis(i.toLong)), - dt.toLocalDate.plusDays(i), - dt.toLocalTime.plusMillis(i), - dt.toLocalDateTime.plusMillis(i) - ) - }.asJava + val expected = (0 until 10).map(newRequired).asJava val (sc, _) = ContextAndArgs( Array("--project=data-integration-test", "--tempLocation=gs://data-integration-test-eu/temp") ) @@ -61,22 +46,7 @@ class BigQueryStorageIT extends AnyFlatSpec with Matchers { } it should "work with OPTIONAL fields" in { - val t = new Instant(0) - val dt = t.toDateTime(DateTimeZone.UTC) - val expected = (0 until 10).map { i => - Optional( - Some(true), - Some(i.toLong), - Some(i.toDouble), - Some(BigDecimal(i)), - Some(s"s$i"), - Some(ByteString.copyFromUtf8(s"s$i")), - Some(t.plus(Duration.millis(i.toLong))), - Some(dt.toLocalDate.plusDays(i)), - Some(dt.toLocalTime.plusMillis(i)), - Some(dt.toLocalDateTime.plusMillis(i)) - ) - }.asJava + val expected = (0 until 10).map(newOptional).asJava val (sc, _) = ContextAndArgs( Array("--project=data-integration-test", "--tempLocation=gs://data-integration-test-eu/temp") ) @@ -86,22 +56,7 @@ class BigQueryStorageIT extends AnyFlatSpec with Matchers { } it should "work with REPEATED fields" in { - val t = new Instant(0) - val dt = t.toDateTime(DateTimeZone.UTC) - val expected = (0 until 10).map { i => - Repeated( - List(true), - List(i.toLong), - List(i.toDouble), - List(BigDecimal(i)), - List(s"s$i"), - List(ByteString.copyFromUtf8(s"s$i")), - List(t.plus(Duration.millis(i.toLong))), - List(dt.toLocalDate.plusDays(i)), - List(dt.toLocalTime.plusMillis(i)), - List(dt.toLocalDateTime.plusMillis(i)) - ) - }.asJava + val expected = (0 until 10).map(newRepeated).asJava val (sc, _) = ContextAndArgs( Array("--project=data-integration-test", "--tempLocation=gs://data-integration-test-eu/temp") ) @@ -110,62 +65,43 @@ class BigQueryStorageIT extends AnyFlatSpec with Matchers { sc.run() } - it should "work with selectedFields" in { - val expected = (0 until 10).map(i => (i.toLong, s"s$i", i.toLong)).asJava + it should "work with nested fields" in { + val expected = (0 until 10).map(newNested).asJava val (sc, _) = ContextAndArgs( Array("--project=data-integration-test", "--tempLocation=gs://data-integration-test-eu/temp") ) - val p = sc - .typedBigQuery[NestedWithFields]() - .map(r => (r.required.int, r.required.string, r.optional.get.int)) - .internal + val p = sc.typedBigQuery[Nested]().internal PAssert.that(p).containsInAnyOrder(expected) sc.run() } - it should "work with rowRestriction" in { - val expected = - (0 until 5).map(i => (i.toLong, s"s$i", i.toLong, s"s$i", i.toLong, s"s$i")).asJava + it should "work with selectedFields" in { + val expected = (0 until 10).map(newNestedWithFields).asJava val (sc, _) = ContextAndArgs( Array("--project=data-integration-test", "--tempLocation=gs://data-integration-test-eu/temp") ) - val p = sc - .typedBigQuery[NestedWithRestriction]() - .map { r => - val (req, opt, rep) = (r.required, r.optional.get, r.repeated.head) - (req.int, req.string, opt.int, opt.string, rep.int, rep.string) - } - .internal + val p = sc.typedBigQuery[NestedWithFields]().internal PAssert.that(p).containsInAnyOrder(expected) sc.run() } - it should "work with rowRestriction override" in { - val expected = - (0 until 3).map(i => (i.toLong, s"s$i", i.toLong, s"s$i", i.toLong, s"s$i")).asJava + it should "work with rowRestriction" in { + val expected = (0 until 5).map(nestedWithRestriction).asJava val (sc, _) = ContextAndArgs( Array("--project=data-integration-test", "--tempLocation=gs://data-integration-test-eu/temp") ) - val p = sc - .typedBigQueryStorage[NestedWithRestriction](rowRestriction = "required.int < 3") - .map { r => - val (req, opt, rep) = (r.required, r.optional.get, r.repeated.head) - (req.int, req.string, opt.int, opt.string, rep.int, rep.string) - } - .internal + val p = sc.typedBigQuery[NestedWithRestriction]().internal PAssert.that(p).containsInAnyOrder(expected) sc.run() } - it should "work with all options" in { - val expected = (0 until 5).map(i => (i.toLong, s"s$i", i.toLong)).asJava + it should "work with rowRestriction override" in { + val expected = (0 until 3).map(nestedWithRestriction).asJava val (sc, _) = ContextAndArgs( Array("--project=data-integration-test", "--tempLocation=gs://data-integration-test-eu/temp") ) - val p = sc - .typedBigQuery[NestedWithAll](Table.Spec(NestedWithAll.table.format("nested"))) - .map(r => (r.required.int, r.required.string, r.optional.get.int)) - .internal + val p = + sc.typedBigQueryStorage[NestedWithRestriction](rowRestriction = "required.int < 3").internal PAssert.that(p).containsInAnyOrder(expected) sc.run() } @@ -179,8 +115,8 @@ class BigQueryStorageIT extends AnyFlatSpec with Matchers { Some(i.toLong), Some(i.toDouble), Some(BigDecimal(i)), - Some(s"s$i"), - Some(ByteString.copyFromUtf8(s"s$i")), + Some(i.toString), + Some(ByteString.copyFromUtf8(i.toString)), Some(t.plus(Duration.millis(i.toLong))), Some(dt.toLocalDate.plusDays(i)), Some(dt.toLocalTime.plusMillis(i)), @@ -206,8 +142,8 @@ class BigQueryStorageIT extends AnyFlatSpec with Matchers { i.toLong, i.toDouble, BigDecimal(i), - s"s$i", - ByteString.copyFromUtf8(s"s$i"), + i.toString, + ByteString.copyFromUtf8(i.toString), t.plus(Duration.millis(i.toLong)), dt.toLocalDate.plusDays(i), dt.toLocalTime.plusMillis(i), @@ -243,8 +179,8 @@ class BigQueryStorageIT extends AnyFlatSpec with Matchers { Some(i.toLong), Some(i.toDouble), Some(BigDecimal(i)), - Some(s"s$i"), - Some(ByteString.copyFromUtf8(s"s$i")), + Some(i.toString), + Some(ByteString.copyFromUtf8(i.toString)), Some(t.plus(Duration.millis(i.toLong))), Some(dt.toLocalDate.plusDays(i)), Some(dt.toLocalTime.plusMillis(i)), @@ -293,6 +229,9 @@ object BigQueryStorageIT { @BigQueryType.fromStorage("data-integration-test:storage.repeated") class Repeated + @BigQueryType.fromStorage("data-integration-test:storage.nested") + class Nested + @BigQueryType.fromStorage( "data-integration-test:storage.nested", selectedFields = List("required", "optional.int") @@ -305,14 +244,6 @@ object BigQueryStorageIT { ) class NestedWithRestriction - @BigQueryType.fromStorage( - "data-integration-test:storage.%s", - List("nested"), - selectedFields = List("required", "optional.int"), - rowRestriction = "required.int < 5" - ) - class NestedWithAll - @BigQueryType.fromStorage("data-integration-test:partition_a.table_%s", List("$LATEST")) class StorageLatest @@ -328,4 +259,117 @@ object BigQueryStorageIT { @BigQueryType.toTable case class ToTableRequired(bool: Boolean) + + private def newRequired(i: Int): Required = { + val t = new Instant(0) + val dt = t.toDateTime(DateTimeZone.UTC) + Required( + true, + i.toLong, + i.toDouble, + BigDecimal(i), + i.toString, + ByteString.copyFromUtf8(i.toString), + t.plus(Duration.millis(i.toLong)), + dt.toLocalDate.plusDays(i), + dt.toLocalTime.plusMillis(i), + dt.toLocalDateTime.plusMillis(i) + ) + } + + private def newOptional(i: Int): Optional = { + if (i == 0) { + Optional( + None, + None, + None, + None, + None, + None, + None, + None, + None, + None + ) + } else { + val t = new Instant(0) + val dt = t.toDateTime(DateTimeZone.UTC) + Optional( + Some(true), + Some(i.toLong), + Some(i.toDouble), + Some(BigDecimal(i)), + Some(i.toString), + Some(ByteString.copyFromUtf8(i.toString)), + Some(t.plus(Duration.millis(i.toLong))), + Some(dt.toLocalDate.plusDays(i)), + Some(dt.toLocalTime.plusMillis(i)), + Some(dt.toLocalDateTime.plusMillis(i)) + ) + } + } + + private def newRepeated(i: Int): Repeated = { + if (i == 0) { + Repeated( + Nil, + Nil, + Nil, + Nil, + Nil, + Nil, + Nil, + Nil, + Nil, + Nil + ) + } else { + val t = new Instant(0) + val dt = t.toDateTime(DateTimeZone.UTC) + Repeated( + List(true), + List(i.toLong), + List(i.toDouble), + List(BigDecimal(i)), + List(i.toString), + List(ByteString.copyFromUtf8(i.toString)), + List(t.plus(Duration.millis(i.toLong))), + List(dt.toLocalDate.plusDays(i)), + List(dt.toLocalTime.plusMillis(i)), + List(dt.toLocalDateTime.plusMillis(i)) + ) + } + } + + private def newNested(i: Int): Nested = { + val required = Required$1(i, i.toString) + if (i == 0) { + Nested(required, None, Nil) + } else { + val optional = Optional$1(i, i.toString) + val repeated = Repeated$1(i, i.toString) + Nested(required, Some(optional), List(repeated)) + } + } + + private def newNestedWithFields(i: Int): NestedWithFields = { + val required = Required$2(i, i.toString) + if (i == 0) { + NestedWithFields(required, None) + } else { + val optional = Optional$2(i) + NestedWithFields(required, Some(optional)) + } + } + + private def nestedWithRestriction(i: Int): NestedWithRestriction = { + val required = Required$3(i, i.toString) + if (i == 0) { + NestedWithRestriction(required, None, Nil) + } else { + val optional = Optional$3(i, i.toString) + val repeated = Repeated$2(i, i.toString) + NestedWithRestriction(required, Some(optional), List(repeated)) + } + } }