diff --git a/build.sbt b/build.sbt index dd69361e31..73d76d8730 100644 --- a/build.sbt +++ b/build.sbt @@ -147,7 +147,7 @@ val scalatestplusVersion = s"$scalatestVersion.0" val NothingFilter: explicitdeps.ModuleFilter = { _ => false } // project -ThisBuild / tlBaseVersion := "0.14" +ThisBuild / tlBaseVersion := "0.15" ThisBuild / tlSonatypeUseLegacyHost := true ThisBuild / organization := "com.spotify" ThisBuild / organizationName := "Spotify AB" @@ -738,6 +738,10 @@ lazy val `scio-avro` = project .settings(macroSettings) .settings( description := "Scio add-on for working with Avro", + unusedCompileDependenciesFilter -= Seq( + // transitively required for parquet-avro projection macros + moduleFilter("org.apache.avro", "avro-compiler") + ).reduce(_ | _), libraryDependencies ++= Seq( // compile "com.esotericsoftware" % "kryo-shaded" % kryoVersion, @@ -746,6 +750,7 @@ lazy val `scio-avro` = project "com.twitter" % "chill-java" % chillVersion, "me.lyh" %% "protobuf-generic" % protobufGenericVersion, "org.apache.avro" % "avro" % avroVersion, + "org.apache.avro" % "avro-compiler" % avroVersion, "org.apache.beam" % "beam-sdks-java-core" % beamVersion, "org.apache.beam" % "beam-sdks-java-extensions-avro" % beamVersion, "org.apache.beam" % "beam-vendor-guava-32_1_2-jre" % beamVendorVersion, @@ -1035,8 +1040,8 @@ lazy val `scio-parquet` = project .in(file("scio-parquet")) .dependsOn( `scio-core`, + `scio-avro` % "provided", `scio-tensorflow` % "provided", - `scio-avro` % Test, `scio-test` % "test->test" ) .settings(commonSettings) @@ -1050,7 +1055,7 @@ lazy val `scio-parquet` = project javacOptions ++= Seq("-s", (sourceManaged.value / "main").toString), description := "Scio add-on for Parquet", unusedCompileDependenciesFilter -= Seq( - // required by me.lyh:parquet-avro + // required by me.lyh:parquet-avro macros moduleFilter("org.apache.avro", "avro-compiler"), // replacing log4j compile time dependency moduleFilter("org.slf4j", "log4j-over-slf4j") @@ -1061,10 +1066,7 @@ 
lazy val `scio-parquet` = project "com.google.cloud.bigdataoss" % "util-hadoop" % s"hadoop2-$bigdataossVersion", "com.google.protobuf" % "protobuf-java" % protobufVersion, "com.spotify" %% "magnolify-parquet" % magnolifyVersion, - "com.twitter" %% "chill" % chillVersion, "me.lyh" %% "parquet-avro" % parquetExtraVersion, - "org.apache.avro" % "avro" % avroVersion, - "org.apache.avro" % "avro-compiler" % avroVersion, "org.apache.beam" % "beam-sdks-java-core" % beamVersion, "org.apache.beam" % "beam-sdks-java-io-hadoop-common" % beamVersion, "org.apache.beam" % "beam-sdks-java-io-hadoop-format" % beamVersion, @@ -1078,6 +1080,9 @@ lazy val `scio-parquet` = project "org.slf4j" % "log4j-over-slf4j" % slf4jVersion, // log4j is excluded from hadoop "org.slf4j" % "slf4j-api" % slf4jVersion, // provided + "org.apache.avro" % "avro" % avroVersion % Provided, + "org.apache.avro" % "avro-compiler" % avroVersion % Provided, + "org.apache.beam" % "beam-sdks-java-extensions-avro" % beamVersion % Provided, "org.tensorflow" % "tensorflow-core-api" % tensorFlowVersion % Provided, // runtime "org.apache.hadoop" % "hadoop-client" % hadoopVersion % Runtime excludeAll (Exclude.metricsCore), diff --git a/scio-core/src/main/scala/com/spotify/scio/coders/instances/JavaCoders.scala b/scio-core/src/main/scala/com/spotify/scio/coders/instances/JavaCoders.scala index c845162378..b519f325e5 100644 --- a/scio-core/src/main/scala/com/spotify/scio/coders/instances/JavaCoders.scala +++ b/scio-core/src/main/scala/com/spotify/scio/coders/instances/JavaCoders.scala @@ -37,7 +37,7 @@ import scala.util.{Failure, Success} private[coders] object VoidCoder extends AtomicCoder[Void] { override def encode(value: Void, outStream: OutputStream): Unit = () - override def decode(inStream: InputStream): Void = ??? 
+ override def decode(inStream: InputStream): Void = null override def structuralValue(value: Void): AnyRef = AnyRef } diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/ParquetExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/ParquetExample.scala index 4ea605a82a..5c000a3683 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/ParquetExample.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/ParquetExample.scala @@ -41,7 +41,13 @@ object ParquetExample { * These case classes represent both full and projected field mappings from the [[Account]] Avro * record. */ - case class AccountFull(id: Int, `type`: String, name: Option[String], amount: Double) + case class AccountFull( + id: Int, + `type`: String, + name: Option[String], + amount: Double, + accountStatus: Option[AccountStatus] + ) case class AccountProjection(id: Int, name: Option[String]) /** @@ -108,13 +114,11 @@ object ParquetExample { private def avroSpecificIn(sc: ScioContext, args: Args): ClosedTap[String] = { // Macros for generating column projections and row predicates - val projection = Projection[Account](_.getId, _.getName, _.getAmount) + // account_status is the only field with a default value that can be left out of the projection + val projection = Projection[Account](_.getId, _.getType, _.getName, _.getAmount) val predicate = Predicate[Account](x => x.getAmount > 0) sc.parquetAvroFile[Account](args("input"), projection, predicate) - // The result Account records are not complete Avro objects. Only the projected columns are present while the rest are null. - // These objects may fail serialization and it’s recommended that you map them out to tuples or case classes right after reading. 
- .map(x => AccountProjection(x.getId, Some(x.getName.toString))) .saveAsTextFile(args("output")) } @@ -122,7 +126,7 @@ object ParquetExample { val schema = Account.getClassSchema implicit val genericRecordCoder: Coder[GenericRecord] = avroGenericRecordCoder(schema) - val parquetIn = sc.parquetAvroFile[GenericRecord](args("input"), schema) + val parquetIn = sc.parquetAvroGenericRecordFile(args("input"), schema) // Catches a specific bug with encoding GenericRecords read by parquet-avro parquetIn @@ -146,12 +150,19 @@ object ParquetExample { // but close to `parquet.block.size`, i.e. 1 GiB. This guarantees that each file contains 1 row group only and reduces seeks. .saveAsParquetAvroFile(args("output"), numShards = 1, conf = fineTunedParquetWriterConfig) + private[extra] def toScalaFull(account: Account): AccountFull = + AccountFull( + account.getId, + account.getType.toString, + Some(account.getName.toString), + account.getAmount, + Some(account.getAccountStatus) + ) + private def typedOut(sc: ScioContext, args: Args): ClosedTap[AccountFull] = sc.parallelize(fakeData) - .map(x => AccountFull(x.getId, x.getType.toString, Some(x.getName.toString), x.getAmount)) - .saveAsTypedParquetFile( - args("output") - ) + .map(toScalaFull) + .saveAsTypedParquetFile(args("output")) private[extra] def toExample(account: Account): Example = { val amount = Feature diff --git a/scio-examples/src/test/scala/com/spotify/scio/examples/extra/ParquetExampleTest.scala b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/ParquetExampleTest.scala index 554a3a0095..89cc90c21d 100644 --- a/scio-examples/src/test/scala/com/spotify/scio/examples/extra/ParquetExampleTest.scala +++ b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/ParquetExampleTest.scala @@ -46,7 +46,8 @@ class ParquetExampleTest extends PipelineSpec { "ParquetExample" should "work for SpecificRecord input" in { val expected = ParquetExample.fakeData - .map(x => AccountProjection(x.getId, 
Some(x.getName.toString))) + // set default value on field outside projection + .map(x => Account.newBuilder(x).setAccountStatus(null).build()) .map(_.toString) JobTest[com.spotify.scio.examples.extra.ParquetExample.type] @@ -79,8 +80,7 @@ class ParquetExampleTest extends PipelineSpec { } it should "work for typed output" in { - val expected = ParquetExample.fakeData - .map(a => AccountFull(a.getId, a.getType.toString, Some(a.getName.toString), a.getAmount)) + val expected = ParquetExample.fakeData.map(ParquetExample.toScalaFull) JobTest[com.spotify.scio.examples.extra.ParquetExample.type] .args("--output=out.parquet", "--method=typedOut") diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala index c9be7bdd24..51fa88b221 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala @@ -17,60 +17,157 @@ package com.spotify.scio.parquet.avro -import java.lang.{Boolean => JBoolean} import com.spotify.scio.ScioContext +import com.spotify.scio.avro._ import com.spotify.scio.coders.{Coder, CoderMaterializer} -import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT} -import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory} +import com.spotify.scio.io._ +import com.spotify.scio.parquet.avro.ParquetAvroIO.ReadParam.{ + DefaultConfiguration, + DefaultPredicate, + DefaultProjection, + DefaultSuffix +} +import com.spotify.scio.parquet.avro.ParquetAvroIO.WriteParam._ +import com.spotify.scio.parquet.read.ParquetReadConfiguration import com.spotify.scio.parquet.{GcsConnectorUtil, ParquetConfiguration} import com.spotify.scio.testing.TestDataManager -import com.spotify.scio.util.{FilenamePolicySupplier, Functions, ScioUtil} +import com.spotify.scio.transforms._ +import 
com.spotify.scio.transforms.DoFnWithResource.ResourceType +import com.spotify.scio.util.{FilenamePolicySupplier, ScioUtil} import com.spotify.scio.values.SCollection -import com.twitter.chill.ClosureCleaner import org.apache.avro.Schema -import org.apache.avro.reflect.ReflectData -import org.apache.avro.specific.SpecificRecord +import org.apache.avro.generic.{GenericRecord, IndexedRecord} +import org.apache.avro.specific.{SpecificData, SpecificRecord, SpecificRecordBase} +import org.apache.beam.sdk.extensions.avro.io.AvroDatumFactory +import org.apache.beam.sdk.io.FileIO.ReadableFile import org.apache.beam.sdk.io._ -import org.apache.beam.sdk.transforms.SerializableFunctions -import org.apache.beam.sdk.transforms.SimpleFunction import org.apache.beam.sdk.io.fs.ResourceId -import org.apache.beam.sdk.io.hadoop.SerializableConfiguration import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider -import org.apache.beam.sdk.values.TypeDescriptor +import org.apache.beam.sdk.transforms.{PTransform, SerializableFunctions, SimpleFunction} +import org.apache.beam.sdk.values.{PBegin, PCollection, TypeDescriptor} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job -import org.apache.parquet.avro.{ - AvroDataSupplier, - AvroParquetInputFormat, - AvroReadSupport, - GenericDataSupplier -} +import org.apache.parquet.avro._ import org.apache.parquet.filter2.predicate.FilterPredicate import org.apache.parquet.hadoop.ParquetInputFormat import org.apache.parquet.hadoop.metadata.CompressionCodecName -import scala.reflect.{classTag, ClassTag} +import scala.jdk.CollectionConverters._ +import scala.reflect.ClassTag -final case class ParquetAvroIO[T: ClassTag: Coder](path: String) extends ScioIO[T] { - override type ReadP = ParquetAvroIO.ReadParam[_, T] - override type WriteP = ParquetAvroIO.WriteParam +sealed trait ParquetAvroIO[T <: IndexedRecord] extends ScioIO[T] { + import 
ParquetAvroIO._ + + override type ReadP = ParquetAvroIO.ReadParam[T] + override type WriteP = ParquetAvroIO.WriteParam[T] override val tapT: TapT.Aux[T, T] = TapOf[T] + override def testId: String = s"ParquetAvroIO($path)" + + def path: String + protected def schema: Schema + protected def defaultDatumFactory: AvroDatumFactory[T] + + protected def readFiles( + projection: Schema, + predicate: FilterPredicate, + configuration: Configuration + ): PTransform[PCollection[ReadableFile], PCollection[T]] + override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = { - val bCoder = CoderMaterializer.beam(sc, Coder[T]) - sc.pipeline.getCoderRegistry.registerCoderForClass(ScioUtil.classOf[T], bCoder) - params.setupConfig() - params.read(sc, path)(Coder[T]) + val conf = ParquetConfiguration.ofNullable(params.conf) + if (ParquetReadConfiguration.getUseSplittableDoFn(conf, sc.options)) { + readSplittableDoFn(sc, conf, params) + } else { + readLegacy(sc, conf, params) + } + } + + private def readSplittableDoFn( + sc: ScioContext, + conf: Configuration, + params: ReadP + ): SCollection[T] = { + val filePattern = ScioUtil.filePattern(path, params.suffix) + val datumFactory = Option(params.datumFactory).getOrElse(defaultDatumFactory) + val coder = avroCoder(datumFactory, schema) + val bCoder = CoderMaterializer.beam(sc, coder) + val transform = new PTransform[PBegin, PCollection[T]] { + override def expand(input: PBegin): PCollection[T] = { + input + .apply(FileIO.`match`().filepattern(filePattern)) + .apply(FileIO.readMatches) + .apply(readFiles(params.projection, params.predicate, conf)) + } + } + sc.applyTransform(transform).setCoder(bCoder) + } + + private def readLegacy( + sc: ScioContext, + conf: Configuration, + params: ReadP + ): SCollection[T] = { + conf.setClass("key.class", classOf[Void], classOf[Void]) + implicit val keyCoder: Coder[Void] = Coder.voidCoder + val bKeyCoder = CoderMaterializer.beam(sc, keyCoder) + + val datumFactory = 
Option(params.datumFactory).getOrElse(defaultDatumFactory) + val recordClass = datumFactory.getType + conf.setClass("value.class", recordClass, recordClass) + implicit val valueCoder: Coder[T] = avroCoder(datumFactory, schema) + val bValueCoder = CoderMaterializer.beam(sc, valueCoder) + + AvroReadSupport.setAvroReadSchema(conf, schema) + if (recordClass == classOf[GenericRecord]) { + conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true) + } + + Option(params.projection).foreach(p => AvroReadSupport.setRequestedProjection(conf, p)) + Option(params.predicate).foreach(p => ParquetInputFormat.setFilterPredicate(conf, p)) + + val job = Job.getInstance(conf) + val filePattern = ScioUtil.filePattern(path, params.suffix) + GcsConnectorUtil.setInputPaths(sc, job, filePattern) + job.setInputFormatClass(classOf[AvroParquetInputFormat[T]]) + val transform = HadoopFormatIO + .read[Void, T]() + // Force coders for hadoop job + .withKeyTranslation(ParquetAvroIO.Identity[Void], bKeyCoder) + .withValueTranslation(ParquetAvroIO.Identity(recordClass), bValueCoder) + .withConfiguration(job.getConfiguration) + + sc.applyTransform(transform).map(_.getValue) } override protected def readTest(sc: ScioContext, params: ReadP): SCollection[T] = { - type AvroType = params.avroClass.type + val datumFactory = Option(params.datumFactory).getOrElse(defaultDatumFactory) + implicit val coder: Coder[T] = avroCoder(datumFactory, schema) + // The projection function is not part of the test input, so it must be applied directly + val projectedFields = Option(params.projection).map(_.getFields.asScala.map(_.name()).toSet) TestDataManager - .getInput(sc.testId.get)(ParquetAvroIO[AvroType](path)(classTag, null)) + .getInput(sc.testId.get)(this) .toSCollection(sc) - .map(params.projectionFn.asInstanceOf[AvroType => T]) + .mapWithResource(dataForClass(datumFactory.getType), ResourceType.PER_INSTANCE) { + case (data, record) => + projectedFields match { + case None => record + case Some(projection) => + 
// Beam forbids mutations. Create a new record + val copy = data.deepCopy(record.getSchema, record) + record.getSchema.getFields.asScala + .foldLeft(copy) { (c, f) => + val names = Set(f.name()) ++ f.aliases().asScala.toSet + if (projection.intersect(names).isEmpty) { + // field is not part of the projection. Use the default value + c.put(f.pos(), data.getDefaultValue(f)) + } + c + } + } + } + } private def parquetOut( @@ -111,18 +208,15 @@ final case class ParquetAvroIO[T: ClassTag: Coder](path: String) extends ScioIO[ } override protected def write(data: SCollection[T], params: WriteP): Tap[T] = { - val avroClass = ScioUtil.classOf[T] - val isSpecific: Boolean = classOf[SpecificRecord] isAssignableFrom avroClass - val writerSchema = if (isSpecific) ReflectData.get().getSchema(avroClass) else params.schema - + val conf = ParquetConfiguration.ofNullable(params.conf) data.applyInternal( parquetOut( path, - writerSchema, + schema, params.suffix, params.numShards, params.compression, - ParquetConfiguration.ofNullable(params.conf), + conf, params.filenamePolicySupplier, params.prefix, params.shardNameTemplate, @@ -133,124 +227,64 @@ final case class ParquetAvroIO[T: ClassTag: Coder](path: String) extends ScioIO[ ) tap(ParquetAvroIO.ReadParam(params)) } - - override def tap(params: ReadP): Tap[T] = - ParquetAvroTap(path, params) } object ParquetAvroIO { + + // SpecificData.getForClass is only available for 1.9+ + private def dataForClass[T](recordClass: Class[T]) = { + if (classOf[SpecificRecordBase].isAssignableFrom(recordClass)) { + val classModelField = recordClass.getDeclaredField("MODEL$") + classModelField.setAccessible(true) + classModelField.get(null).asInstanceOf[SpecificData] + } else { + SpecificData.get() + } + } + + private class Identity[T](cls: Class[T]) + extends SimpleFunction[T, T](SerializableFunctions.identity[T]) { + override def getInputTypeDescriptor: TypeDescriptor[T] = TypeDescriptor.of(cls) + override def getOutputTypeDescriptor: TypeDescriptor[T] = 
TypeDescriptor.of(cls) + } + + private object Identity { + def apply[T: ClassTag]: Identity[T] = new Identity(ScioUtil.classOf[T]) + def apply[T](cls: Class[T]): Identity[T] = new Identity(cls) + } + + @inline final def apply[T](path: String): TestIO[T] = + new TestIO[T] { + override val tapT: TapT.Aux[T, T] = TapOf[T] + override def testId: String = s"ParquetAvroIO($path)" + } + object ReadParam { val DefaultProjection: Schema = null val DefaultPredicate: FilterPredicate = null val DefaultConfiguration: Configuration = null val DefaultSuffix: String = null + val DefaultDatumFactory: Null = null - private[scio] def apply[T: ClassTag](params: WriteParam): ReadParam[T, T] = - new ReadParam[T, T]( - projectionFn = identity, - projection = params.schema, + private[scio] def apply[T](params: WriteParam[T]): ReadParam[T] = + new ReadParam( + projection = DefaultProjection, + predicate = DefaultPredicate, conf = params.conf, - suffix = params.suffix + suffix = params.suffix, + datumFactory = params.datumFactory ) } - final case class ReadParam[A: ClassTag, T: ClassTag] private ( - projectionFn: A => T, - projection: Schema = ReadParam.DefaultProjection, - predicate: FilterPredicate = ReadParam.DefaultPredicate, - conf: Configuration = ReadParam.DefaultConfiguration, - suffix: String = null - ) { - lazy val confOrDefault = ParquetConfiguration.ofNullable(conf) - val avroClass: Class[A] = ScioUtil.classOf[A] - val isSpecific: Boolean = classOf[SpecificRecord] isAssignableFrom avroClass - val readSchema: Schema = - if (isSpecific) ReflectData.get().getSchema(avroClass) else projection - - def read(sc: ScioContext, path: String)(implicit coder: Coder[T]): SCollection[T] = { - if (ParquetReadConfiguration.getUseSplittableDoFn(confOrDefault, sc.options)) { - readSplittableDoFn(sc, path) - } else { - readLegacy(sc, path) - } - } - - def setupConfig(): Unit = { - AvroReadSupport.setAvroReadSchema(confOrDefault, readSchema) - if (projection != null) { - 
AvroReadSupport.setRequestedProjection(confOrDefault, projection) - } - - if (predicate != null) { - ParquetInputFormat.setFilterPredicate(confOrDefault, predicate) - } - - // Needed to make GenericRecord read by parquet-avro work with Beam's - // org.apache.beam.sdk.extensions.avro.coders.AvroCoder - if (!isSpecific) { - confOrDefault.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false) - if (confOrDefault.get(AvroReadSupport.AVRO_DATA_SUPPLIER) == null) { - confOrDefault.setClass( - AvroReadSupport.AVRO_DATA_SUPPLIER, - classOf[GenericDataSupplier], - classOf[AvroDataSupplier] - ) - } - } - } - - private def readSplittableDoFn(sc: ScioContext, path: String)(implicit - coder: Coder[T] - ): SCollection[T] = { - val filePattern = ScioUtil.filePattern(path, suffix) - val bCoder = CoderMaterializer.beam(sc, coder) - val cleanedProjectionFn = ClosureCleaner.clean(projectionFn) - - sc.applyTransform( - ParquetRead.read[A, T]( - ReadSupportFactory.avro, - new SerializableConfiguration(confOrDefault), - filePattern, - Functions.serializableFn(cleanedProjectionFn) - ) - ).setCoder(bCoder) - } - - private def readLegacy(sc: ScioContext, path: String)(implicit - coder: Coder[T] - ): SCollection[T] = { - val job = Job.getInstance(confOrDefault) - val filePattern = ScioUtil.filePattern(path, suffix) - GcsConnectorUtil.setInputPaths(sc, job, filePattern) - job.setInputFormatClass(classOf[AvroParquetInputFormat[T]]) - job.getConfiguration.setClass("key.class", classOf[Void], classOf[Void]) - job.getConfiguration.setClass("value.class", avroClass, avroClass) - - val g = ClosureCleaner.clean(projectionFn) // defeat closure - val aCls = avroClass - val oCls = ScioUtil.classOf[T] - val transform = HadoopFormatIO - .read[JBoolean, T]() - // Hadoop input always emit key-value, and `Void` causes NPE in Beam coder - .withKeyTranslation(new SimpleFunction[Void, JBoolean]() { - override def apply(input: Void): JBoolean = true - }) - .withValueTranslation(new SimpleFunction[A, T]() { - // 
Workaround for incomplete Avro objects - // `SCollection#map` might throw NPE on incomplete Avro objects when the runner tries - // to serialized them. Lifting the mapping function here fixes the problem. - override def apply(input: A): T = g(input) - override def getInputTypeDescriptor = TypeDescriptor.of(aCls) - override def getOutputTypeDescriptor = TypeDescriptor.of(oCls) - }) - .withConfiguration(job.getConfiguration) - - sc.applyTransform(transform).map(_.getValue) - } - } + final case class ReadParam[T] private ( + projection: Schema = DefaultProjection, + predicate: FilterPredicate = DefaultPredicate, + conf: Configuration = DefaultConfiguration, + suffix: String = DefaultSuffix, + datumFactory: AvroDatumFactory[T] = DefaultDatumFactory + ) object WriteParam { - val DefaultSchema: Schema = null val DefaultNumShards: Int = 0 val DefaultSuffix: String = ".parquet" val DefaultCompression: CompressionCodecName = CompressionCodecName.ZSTD @@ -259,17 +293,69 @@ object ParquetAvroIO { val DefaultPrefix: String = null val DefaultShardNameTemplate: String = null val DefaultTempDirectory: String = null + val DefaultDatumFactory: Null = null } - final case class WriteParam private ( - schema: Schema = WriteParam.DefaultSchema, - numShards: Int = WriteParam.DefaultNumShards, - suffix: String = WriteParam.DefaultSuffix, - compression: CompressionCodecName = WriteParam.DefaultCompression, - conf: Configuration = WriteParam.DefaultConfiguration, - filenamePolicySupplier: FilenamePolicySupplier = WriteParam.DefaultFilenamePolicySupplier, - prefix: String = WriteParam.DefaultPrefix, - shardNameTemplate: String = WriteParam.DefaultShardNameTemplate, - tempDirectory: String = WriteParam.DefaultTempDirectory + final case class WriteParam[T] private ( + numShards: Int = DefaultNumShards, + suffix: String = DefaultSuffix, + compression: CompressionCodecName = DefaultCompression, + conf: Configuration = DefaultConfiguration, + filenamePolicySupplier: FilenamePolicySupplier = 
DefaultFilenamePolicySupplier, + prefix: String = DefaultPrefix, + shardNameTemplate: String = DefaultShardNameTemplate, + tempDirectory: String = DefaultTempDirectory, + datumFactory: AvroDatumFactory[T] = DefaultDatumFactory ) } + +final case class ParquetGenericRecordIO( + path: String, + schema: Schema +) extends ParquetAvroIO[GenericRecord] { + override protected def defaultDatumFactory: AvroDatumFactory[GenericRecord] = + GenericRecordDatumFactory + override protected def readFiles( + projection: Schema, + predicate: FilterPredicate, + configuration: Configuration + ): PTransform[PCollection[ReadableFile], PCollection[GenericRecord]] = + ParquetAvroRead.readAvroGenericRecordFiles(schema, projection, predicate, configuration) + override def tap(read: ReadP): Tap[GenericRecord] = + ParquetGenericRecordTap(path, schema, read) + +} + +object ParquetGenericRecordIO { + type ReadParam = ParquetAvroIO.ReadParam[GenericRecord] + val ReadParam = ParquetAvroIO.ReadParam + type WriteParam = ParquetAvroIO.WriteParam[GenericRecord] + val WriteParam = ParquetAvroIO.WriteParam +} + +final case class ParquetSpecificRecordIO[T <: SpecificRecord: ClassTag](path: String) + extends ParquetAvroIO[T] { + + private lazy val recordClass: Class[T] = ScioUtil.classOf[T] + override protected val schema: Schema = + SpecificData.get().getSchema(recordClass) + override protected val defaultDatumFactory: AvroDatumFactory[T] = + new SpecificRecordDatumFactory(recordClass) + + override protected def readFiles( + projection: Schema, + predicate: FilterPredicate, + configuration: Configuration + ): PTransform[PCollection[ReadableFile], PCollection[T]] = + ParquetAvroRead.readAvroFiles[T](projection, predicate, configuration) + override def tap(read: ReadP): Tap[T] = + ParquetSpecificRecordTap(path, read) + +} + +object ParquetSpecificRecordIO { + type ReadParam[T] = ParquetAvroIO.ReadParam[T] + val ReadParam = ParquetAvroIO.ReadParam + type WriteParam[T] = ParquetAvroIO.WriteParam[T] + val 
WriteParam = ParquetAvroIO.WriteParam +} diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroRead.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroRead.scala new file mode 100644 index 0000000000..c8275cbdec --- /dev/null +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroRead.scala @@ -0,0 +1,127 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.spotify.scio.parquet.avro + +import com.spotify.scio.parquet.ParquetConfiguration +import com.spotify.scio.parquet.read.{ParquetRead, ReadSupportFactory} +import com.spotify.scio.util.ScioUtil +import org.apache.avro.Schema +import org.apache.avro.generic.GenericRecord +import org.apache.avro.specific.{SpecificData, SpecificRecord} +import org.apache.beam.sdk.io.FileIO.ReadableFile +import org.apache.beam.sdk.io.hadoop.SerializableConfiguration +import org.apache.beam.sdk.transforms.PTransform +import org.apache.beam.sdk.values.PCollection +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.avro.AvroReadSupport +import org.apache.parquet.filter2.predicate.FilterPredicate +import org.apache.parquet.hadoop.ParquetInputFormat + +import scala.reflect.ClassTag + +trait ParquetAvroRead { self: ParquetRead => + + /** + * A ReadFiles implementation that reads Parquet file(s) into Avro [[GenericRecord]]s using the + * supplied schema + * + * @param schema + * The Avro 
[[Schema]] to use for Parquet reads; can be a projection of the full file schema + * @param projection + * a [[Schema]] used for projection, made up of a subset of fields from the full schema + * @param predicate + * a Parquet [[FilterPredicate]] predicate + * @param conf + * a Parquet [[Configuration]] + */ + def readAvroGenericRecordFiles( + schema: Schema, + projection: Schema = null, + predicate: FilterPredicate = null, + conf: Configuration = null + ): PTransform[PCollection[ReadableFile], PCollection[GenericRecord]] = { + val configuration = ParquetConfiguration.ofNullable(conf) + + configuration.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false) + AvroReadSupport.setAvroReadSchema(configuration, schema) + + Option(projection).foreach(p => AvroReadSupport.setRequestedProjection(configuration, p)) + Option(predicate).foreach(p => ParquetInputFormat.setFilterPredicate(configuration, p)) + + readFiles(ReadSupportFactory.avro, new SerializableConfiguration(configuration)) + } + + /** + * A ReadFiles implementation that reads Parquet file(s) into Avro [[SpecificRecord]]s. 
+ * + * @param projection + * a [[Schema]] used for projection, made up of a subset of fields from the full Avro type `T` + * @param predicate + * a Parquet [[FilterPredicate]] predicate + * @param conf + * a Parquet [[Configuration]] + */ + def readAvroFiles[T <: SpecificRecord: ClassTag]( + projection: Schema = null, + predicate: FilterPredicate = null, + conf: Configuration = null + ): PTransform[PCollection[ReadableFile], PCollection[T]] = { + val configuration = ParquetConfiguration.ofNullable(conf) + + val recordClass = ScioUtil.classOf[T] + val schema = SpecificData.get().getSchema(recordClass) + AvroReadSupport.setAvroReadSchema(configuration, schema) + + Option(projection).foreach(p => AvroReadSupport.setRequestedProjection(configuration, p)) + Option(predicate).foreach(p => ParquetInputFormat.setFilterPredicate(configuration, p)) + + readFiles(ReadSupportFactory.avro, new SerializableConfiguration(configuration)) + } + + /** + * A ReadFiles implementation that reads Parquet file(s) into possibly incomplete Avro + * [[SpecificRecord]]s and directly transforms them into the desired type with the given parse function. 
+ * + * @param parseFn + * the function transforming the Avro [[SpecificRecord]]s into the desired type + * @param projection + * a [[Schema]] used for projection, made up of a subset of fields from the full Avro type `T` + * @param predicate + * a Parquet [[FilterPredicate]] predicate + * @param conf + * a Parquet [[Configuration]] + */ + def parseAvroFiles[A <: SpecificRecord: ClassTag, T]( + parseFn: A => T, + projection: Schema = null, + predicate: FilterPredicate = null, + conf: Configuration = null + ): PTransform[PCollection[ReadableFile], PCollection[T]] = { + val configuration = ParquetConfiguration.ofNullable(conf) + + val recordClass = ScioUtil.classOf[A] + val schema = SpecificData.get().getSchema(recordClass) + AvroReadSupport.setAvroReadSchema(configuration, schema) + + Option(projection).foreach(p => AvroReadSupport.setRequestedProjection(configuration, p)) + Option(predicate).foreach(p => ParquetInputFormat.setFilterPredicate(configuration, p)) + + parseFiles(ReadSupportFactory.avro, new SerializableConfiguration(configuration), parseFn) + } +} + +object ParquetAvroRead extends ParquetRead with ParquetAvroRead diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroTap.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroTap.scala index 83a178d5de..ae7f8a6d7f 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroTap.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroTap.scala @@ -18,42 +18,67 @@ package com.spotify.scio.parquet.avro import com.spotify.scio.ScioContext -import com.spotify.scio.coders.Coder import com.spotify.scio.io.Tap -import com.spotify.scio.parquet.BeamInputFile +import com.spotify.scio.parquet.{BeamInputFile, ParquetConfiguration} import com.spotify.scio.util.ScioUtil import com.spotify.scio.values.SCollection +import org.apache.avro.Schema +import org.apache.avro.generic.{GenericRecord, IndexedRecord} +import 
org.apache.avro.specific.{SpecificData, SpecificRecord} import org.apache.beam.sdk.io._ -import org.apache.parquet.avro.AvroParquetReader +import org.apache.parquet.avro.{AvroParquetReader, AvroReadSupport} +import org.apache.parquet.hadoop.ParquetInputFormat import scala.jdk.CollectionConverters._ import scala.reflect.ClassTag -final case class ParquetAvroTap[A, T: ClassTag: Coder]( - path: String, - params: ParquetAvroIO.ReadParam[A, T] -) extends Tap[T] { +sealed trait ParquetAvroTap[T <: IndexedRecord] extends Tap[T] { + def path: String + def schema: Schema + def params: ParquetAvroIO.ReadParam[T] + override def value: Iterator[T] = { val filePattern = ScioUtil.filePattern(path, params.suffix) - params.setupConfig() - val xs = FileSystems.`match`(filePattern).metadata().asScala.toList + val conf = ParquetConfiguration.ofNullable(params.conf) + Option(params.projection).foreach(AvroReadSupport.setRequestedProjection(conf, _)) + Option(params.predicate).foreach(ParquetInputFormat.setFilterPredicate(conf, _)) + xs.iterator.flatMap { metadata => val reader = AvroParquetReader - .builder[A](BeamInputFile.of(metadata.resourceId())) - .withConf(params.confOrDefault) + .builder[T](BeamInputFile.of(metadata.resourceId())) + .withConf(conf) .build() + new Iterator[T] { - private var current: A = reader.read() + private var current: T = reader.read() + override def hasNext: Boolean = current != null + override def next(): T = { - val r = params.projectionFn(current) + val prev = current current = reader.read() - r + prev } } } } +} + +final case class ParquetGenericRecordTap( + path: String, + schema: Schema, + params: ParquetGenericRecordIO.ReadParam = ParquetGenericRecordIO.ReadParam() +) extends ParquetAvroTap[GenericRecord] { + override def open(sc: ScioContext): SCollection[GenericRecord] = + sc.read(ParquetGenericRecordIO(path, schema))(params) +} + +final case class ParquetSpecificRecordTap[T <: SpecificRecord: ClassTag]( + path: String, + params: 
ParquetSpecificRecordIO.ReadParam[T] = ParquetSpecificRecordIO.ReadParam[T]() +) extends ParquetAvroTap[T] { + override def schema: Schema = SpecificData.get().getSchema(ScioUtil.classOf[T]) override def open(sc: ScioContext): SCollection[T] = - sc.read(ParquetAvroIO[T](path))(params) + sc.read(ParquetSpecificRecordIO[T](path))(params) } diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/dynamic/syntax/SCollectionSyntax.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/dynamic/syntax/SCollectionSyntax.scala index 35afd68592..c7cbfab7ab 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/dynamic/syntax/SCollectionSyntax.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/dynamic/syntax/SCollectionSyntax.scala @@ -23,22 +23,62 @@ import com.spotify.scio.parquet.avro.{ParquetAvroIO, ParquetAvroSink} import com.spotify.scio.util.ScioUtil import com.spotify.scio.values.SCollection import org.apache.avro.Schema -import org.apache.avro.reflect.ReflectData -import org.apache.avro.specific.SpecificRecord +import org.apache.avro.generic.GenericRecord +import org.apache.avro.specific.{SpecificData, SpecificRecord} import org.apache.beam.sdk.io.hadoop.SerializableConfiguration import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.metadata.CompressionCodecName import scala.reflect.ClassTag -final class DynamicParquetAvroSCollectionOps[T]( +final class DynamicAvroGenericSCollectionOps( + private val self: SCollection[GenericRecord] +) extends AnyVal { + + /** Save this SCollection of Avro records as a Parquet files written to dynamic destinations. 
*/ + def saveAsDynamicParquetAvroFile( + path: String, + schema: Schema, + numShards: Int = ParquetAvroIO.WriteParam.DefaultNumShards, + suffix: String = ParquetAvroIO.WriteParam.DefaultSuffix, + compression: CompressionCodecName = ParquetAvroIO.WriteParam.DefaultCompression, + conf: Configuration = ParquetAvroIO.WriteParam.DefaultConfiguration, + tempDirectory: String = ParquetAvroIO.WriteParam.DefaultTempDirectory, + prefix: String = ParquetAvroIO.WriteParam.DefaultPrefix + )( + destinationFn: GenericRecord => String + ): ClosedTap[Nothing] = { + if (self.context.isTest) { + throw new NotImplementedError( + "Parquet avro file with dynamic destinations cannot be used in a test context" + ) + } else { + val sink = new ParquetAvroSink[GenericRecord]( + schema, + compression, + new SerializableConfiguration(ParquetConfiguration.ofNullable(conf)) + ) + val write = writeDynamic( + path = path, + destinationFn = destinationFn, + numShards = numShards, + prefix = prefix, + suffix = suffix, + tempDirectory = tempDirectory + ).via(sink) + self.applyInternal(write) + } + ClosedTap[Nothing](EmptyTap) + } +} + +final class DynamicParquetAvroSpecificSCollectionOps[T <: SpecificRecord]( private val self: SCollection[T] ) extends AnyVal { /** Save this SCollection of Avro records as a Parquet files written to dynamic destinations. 
*/ def saveAsDynamicParquetAvroFile( path: String, - schema: Schema = ParquetAvroIO.WriteParam.DefaultSchema, numShards: Int = ParquetAvroIO.WriteParam.DefaultNumShards, suffix: String = ParquetAvroIO.WriteParam.DefaultSuffix, compression: CompressionCodecName = ParquetAvroIO.WriteParam.DefaultCompression, @@ -54,12 +94,10 @@ final class DynamicParquetAvroSCollectionOps[T]( ) } else { val cls = ScioUtil.classOf[T] - val isAssignable = classOf[SpecificRecord].isAssignableFrom(cls) - val writerSchema = if (isAssignable) ReflectData.get().getSchema(cls) else schema - if (writerSchema == null) throw new IllegalArgumentException("Schema must not be null") + val schema = SpecificData.get().getSchema(cls) val sink = new ParquetAvroSink[T]( - writerSchema, + schema, compression, new SerializableConfiguration(ParquetConfiguration.ofNullable(conf)) ) @@ -78,8 +116,11 @@ final class DynamicParquetAvroSCollectionOps[T]( } trait SCollectionSyntax { - implicit def dynamicParquetAvroSCollectionOps[T]( + implicit def dynamicParquetAvroGenericSCollectionOps( + sc: SCollection[GenericRecord] + ): DynamicAvroGenericSCollectionOps = new DynamicAvroGenericSCollectionOps(sc) + + implicit def dynamicParquetAvroSpecificSCollectionOps[T <: SpecificRecord]( sc: SCollection[T] - ): DynamicParquetAvroSCollectionOps[T] = - new DynamicParquetAvroSCollectionOps(sc) + ): DynamicParquetAvroSpecificSCollectionOps[T] = new DynamicParquetAvroSpecificSCollectionOps(sc) } diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/syntax/SCollectionSyntax.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/syntax/SCollectionSyntax.scala index d2b789ba7a..95b3c0968d 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/syntax/SCollectionSyntax.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/syntax/SCollectionSyntax.scala @@ -17,14 +17,15 @@ package com.spotify.scio.parquet.avro.syntax -import com.spotify.scio.coders.Coder import 
com.spotify.scio.io.ClosedTap import com.spotify.scio.parquet.avro.ParquetAvroIO.WriteParam -import com.spotify.scio.parquet.avro.ParquetAvroIO +import com.spotify.scio.parquet.avro.{ParquetGenericRecordIO, ParquetSpecificRecordIO} import com.spotify.scio.util.FilenamePolicySupplier import com.spotify.scio.values.SCollection import org.apache.avro.Schema -import org.apache.avro.generic.IndexedRecord +import org.apache.avro.generic.GenericRecord +import org.apache.avro.specific.SpecificRecord +import org.apache.beam.sdk.extensions.avro.io.AvroDatumFactory import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.metadata.CompressionCodecName @@ -34,14 +35,15 @@ import scala.reflect.ClassTag * Enhanced version of [[com.spotify.scio.values.SCollection SCollection]] with Parquet Avro * methods. */ -class SCollectionOps[T <: IndexedRecord](private val self: SCollection[T]) extends AnyVal { +class ParquetAvroGenericSCollectionOps(private val self: SCollection[GenericRecord]) + extends AnyVal { /** * Save this SCollection of Avro records as a Parquet file. * @param path * output location of the write operation * @param schema - * must be not null if `T` is of type [[org.apache.avro.generic.GenericRecord GenericRecord]]. 
+ * the avro schema * @param numShards * number of shards per output directory * @param suffix @@ -56,7 +58,7 @@ class SCollectionOps[T <: IndexedRecord](private val self: SCollection[T]) exten */ def saveAsParquetAvroFile( path: String, - schema: Schema = WriteParam.DefaultSchema, + schema: Schema, numShards: Int = WriteParam.DefaultNumShards, suffix: String = WriteParam.DefaultSuffix, compression: CompressionCodecName = WriteParam.DefaultCompression, @@ -64,10 +66,10 @@ class SCollectionOps[T <: IndexedRecord](private val self: SCollection[T]) exten shardNameTemplate: String = WriteParam.DefaultShardNameTemplate, tempDirectory: String = WriteParam.DefaultTempDirectory, filenamePolicySupplier: FilenamePolicySupplier = WriteParam.DefaultFilenamePolicySupplier, - prefix: String = WriteParam.DefaultPrefix - )(implicit ct: ClassTag[T], coder: Coder[T]): ClosedTap[T] = { - val param = WriteParam( - schema = schema, + prefix: String = WriteParam.DefaultPrefix, + datumFactory: AvroDatumFactory[GenericRecord] = WriteParam.DefaultDatumFactory + ): ClosedTap[GenericRecord] = { + val param = ParquetGenericRecordIO.WriteParam( numShards = numShards, suffix = suffix, compression = compression, @@ -75,17 +77,67 @@ class SCollectionOps[T <: IndexedRecord](private val self: SCollection[T]) exten filenamePolicySupplier = filenamePolicySupplier, prefix = prefix, shardNameTemplate = shardNameTemplate, - tempDirectory = tempDirectory + tempDirectory = tempDirectory, + datumFactory = datumFactory ) - self.write(ParquetAvroIO[T](path))(param) + self.write(ParquetGenericRecordIO(path, schema))(param) + } +} + +class ParquetAvroSpecificSCollectionOps[T <: SpecificRecord](private val self: SCollection[T]) + extends AnyVal { + + /** + * Save this SCollection of Avro records as a Parquet file. 
+ * @param path + * output location of the write operation + * @param numShards + * number of shards per output directory + * @param suffix + * defaults to .parquet + * @param compression + * defaults to snappy + * @param conf + * @param shardNameTemplate + * @param tempDirectory + * @param filenamePolicySupplier + * @param prefix + */ + def saveAsParquetAvroFile( + path: String, + numShards: Int = WriteParam.DefaultNumShards, + suffix: String = WriteParam.DefaultSuffix, + compression: CompressionCodecName = WriteParam.DefaultCompression, + conf: Configuration = WriteParam.DefaultConfiguration, + shardNameTemplate: String = WriteParam.DefaultShardNameTemplate, + tempDirectory: String = WriteParam.DefaultTempDirectory, + filenamePolicySupplier: FilenamePolicySupplier = WriteParam.DefaultFilenamePolicySupplier, + prefix: String = WriteParam.DefaultPrefix, + datumFactory: AvroDatumFactory[T] = WriteParam.DefaultDatumFactory + )(implicit ct: ClassTag[T]): ClosedTap[T] = { + val param = ParquetSpecificRecordIO.WriteParam( + numShards = numShards, + suffix = suffix, + compression = compression, + conf = conf, + filenamePolicySupplier = filenamePolicySupplier, + prefix = prefix, + shardNameTemplate = shardNameTemplate, + tempDirectory = tempDirectory, + datumFactory = datumFactory + ) + self.write(ParquetSpecificRecordIO[T](path))(param) } } trait SCollectionSyntax { - implicit def parquetAvroSCollectionOps[T <: IndexedRecord](c: SCollection[T]): SCollectionOps[T] = - new SCollectionOps[T](c) - implicit def parquetAvroSCollection[T <: IndexedRecord: Coder]( - self: ParquetAvroFile[T] - ): SCollectionOps[T] = - new SCollectionOps[T](self.toSCollection) + implicit def parquetAvroGenericSCollectionOps( + c: SCollection[GenericRecord] + ): ParquetAvroGenericSCollectionOps = + new ParquetAvroGenericSCollectionOps(c) + + implicit def parquetAvroSpecificSCollectionOps[T <: SpecificRecord]( + c: SCollection[T] + ): ParquetAvroSpecificSCollectionOps[T] = + new 
ParquetAvroSpecificSCollectionOps(c) } diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/syntax/ScioContextSyntax.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/syntax/ScioContextSyntax.scala index 5a430a7eba..225b6146d2 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/syntax/ScioContextSyntax.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/syntax/ScioContextSyntax.scala @@ -18,88 +18,47 @@ package com.spotify.scio.parquet.avro.syntax import com.spotify.scio.ScioContext -import com.spotify.scio.coders.Coder -import com.spotify.scio.parquet.avro.ParquetAvroIO +import com.spotify.scio.parquet.avro.{ParquetGenericRecordIO, ParquetSpecificRecordIO} import com.spotify.scio.parquet.avro.ParquetAvroIO.ReadParam import com.spotify.scio.values.SCollection import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord +import org.apache.avro.specific.SpecificRecord +import org.apache.beam.sdk.extensions.avro.io.AvroDatumFactory import org.apache.hadoop.conf.Configuration import org.apache.parquet.filter2.predicate.FilterPredicate -import org.slf4j.LoggerFactory import scala.reflect.ClassTag /** Enhanced version of [[ScioContext]] with Parquet Avro methods. */ final class ScioContextOps(@transient private val self: ScioContext) extends AnyVal { - /** - * Get an SCollection for a Parquet file as Avro records. Since Avro records produced by Parquet - * column projection may be incomplete and may fail serialization, you must - * [[ParquetAvroFile.map map]] the result to extract projected fields from the Avro records. - * - * Note that due to limitations of the underlying `HadoopInputFormatIO`, dynamic work rebalancing - * is not supported. Pipelines may not autoscale up or down during the initial read and subsequent - * fused transforms. 
- */ - def parquetAvroFile[T <: GenericRecord: ClassTag]( + def parquetAvroGenericRecordFile( path: String, + schema: Schema, projection: Schema = ReadParam.DefaultProjection, predicate: FilterPredicate = ReadParam.DefaultPredicate, conf: Configuration = ReadParam.DefaultConfiguration, - suffix: String = ReadParam.DefaultSuffix - ): ParquetAvroFile[T] = - self.requireNotClosed { - new ParquetAvroFile[T](self, path, projection, predicate, conf, suffix) - } -} - -class ParquetAvroFile[T: ClassTag] private[avro] ( - context: ScioContext, - path: String, - projection: Schema, - predicate: FilterPredicate, - conf: Configuration, - suffix: String -) { - private val logger = LoggerFactory.getLogger(this.getClass) - - /** - * Return a new SCollection by applying a function to all Parquet Avro records of this Parquet - * file. - */ - def map[U: ClassTag: Coder](f: T => U): SCollection[U] = { - val param = ParquetAvroIO.ReadParam[T, U](f, projection, predicate, conf, suffix) - context.read(ParquetAvroIO[U](path))(param) - } - - /** - * Return a new SCollection by first applying a function to all Parquet Avro records of this - * Parquet file, and then flattening the results. 
- */ - def flatMap[U: Coder](f: T => TraversableOnce[U]): SCollection[U] = { - implicit val coder: Coder[TraversableOnce[U]] = Coder.kryo - this - // HadoopInputFormatIO does not support custom coder, force SerializableCoder - .map(x => f(x)) - .asInstanceOf[SCollection[TraversableOnce[U]]] - .flatten + suffix: String = ReadParam.DefaultSuffix, + datumFactory: AvroDatumFactory[GenericRecord] = ReadParam.DefaultDatumFactory + ): SCollection[GenericRecord] = { + val param = ParquetGenericRecordIO.ReadParam(projection, predicate, conf, suffix, datumFactory) + self.read(ParquetGenericRecordIO(path, schema))(param) } - private[avro] def toSCollection(implicit c: Coder[T]): SCollection[T] = { - if (projection != null) { - logger.warn( - "Materializing Parquet Avro records with projection may cause " + - "NullPointerException. Perform a `map` or `flatMap` immediately after " + - "`parquetAvroFile` to map out projected fields." - ) - } - this.map(identity) + def parquetAvroFile[T <: SpecificRecord: ClassTag]( + path: String, + projection: Schema = ReadParam.DefaultProjection, + predicate: FilterPredicate = ReadParam.DefaultPredicate, + conf: Configuration = ReadParam.DefaultConfiguration, + suffix: String = ReadParam.DefaultSuffix, + datumFactory: AvroDatumFactory[T] = ReadParam.DefaultDatumFactory + ): SCollection[T] = { + val param = ParquetSpecificRecordIO.ReadParam(projection, predicate, conf, suffix, datumFactory) + self.read(ParquetSpecificRecordIO[T](path))(param) } } trait ScioContextSyntax { implicit def parquetAvroScioContextOps(c: ScioContext): ScioContextOps = new ScioContextOps(c) - implicit def parquetAvroFileToSCollection[T: Coder](self: ParquetAvroFile[T]): SCollection[T] = - self.toSCollection } diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/read/ParquetRead.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/read/ParquetRead.scala index ecb169f109..d86c3941c2 100644 --- 
a/scio-parquet/src/main/scala/com/spotify/scio/parquet/read/ParquetRead.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/read/ParquetRead.scala @@ -17,379 +17,71 @@ package com.spotify.scio.parquet.read import com.spotify.scio.parquet.ParquetConfiguration -import com.spotify.scio.util.{Functions, ScioUtil} -import com.twitter.chill.ClosureCleaner +import com.spotify.scio.parquet.avro.ParquetAvroRead import magnolify.parquet.ParquetType -import org.apache.avro.Schema -import org.apache.avro.generic.GenericRecord -import org.apache.avro.reflect.ReflectData -import org.apache.avro.specific.SpecificRecord import org.apache.beam.sdk.io.FileIO import org.apache.beam.sdk.io.FileIO.ReadableFile import org.apache.beam.sdk.io.hadoop.SerializableConfiguration -import org.apache.beam.sdk.transforms.{PTransform, ParDo, SerializableFunction} +import org.apache.beam.sdk.transforms.{PTransform, ParDo} import org.apache.beam.sdk.values.{PBegin, PCollection} import org.apache.hadoop.conf.Configuration -import org.apache.parquet.avro.{AvroReadSupport, GenericDataSupplier} import org.apache.parquet.filter2.predicate.FilterPredicate import org.apache.parquet.hadoop.ParquetInputFormat -import scala.reflect.ClassTag +trait ParquetRead { -object ParquetRead { - - def read[T, R]( + def read[T]( readSupportFactory: ReadSupportFactory[T], conf: SerializableConfiguration, - filePattern: String, - projectionFn: SerializableFunction[T, R] - ): PTransform[PBegin, PCollection[R]] = - new PTransform[PBegin, PCollection[R]] { - override def expand(input: PBegin): PCollection[R] = { + filePattern: String + ): PTransform[PBegin, PCollection[T]] = + new PTransform[PBegin, PCollection[T]] { + override def expand(input: PBegin): PCollection[T] = { input .apply(FileIO.`match`().filepattern(filePattern)) .apply(FileIO.readMatches) - .apply(readFiles(readSupportFactory, conf, projectionFn)) + .apply(readFiles(readSupportFactory, conf)) } } - def readFiles[T, R]( + def readFiles[T]( 
readSupportFactory: ReadSupportFactory[T], - conf: SerializableConfiguration, - projectionFn: SerializableFunction[T, R] - ): PTransform[PCollection[ReadableFile], PCollection[R]] = { - val sdf = new ParquetReadFn[T, R](readSupportFactory, conf, projectionFn) - val tfx: PTransform[PCollection[_ <: ReadableFile], PCollection[R]] = ParDo.of(sdf) + conf: SerializableConfiguration + ): PTransform[PCollection[ReadableFile], PCollection[T]] = { + val sdf = ParquetReadFn(readSupportFactory, conf) + val tfx: PTransform[PCollection[_ <: ReadableFile], PCollection[T]] = ParDo.of(sdf) - tfx.asInstanceOf[PTransform[PCollection[ReadableFile], PCollection[R]]] + tfx.asInstanceOf[PTransform[PCollection[ReadableFile], PCollection[T]]] } - /** - * A ReadFiles implementation that reads Parquet file(s) into Scala case classes of type T - * - * @param predicate - * a Parquet [[FilterPredicate]] predicate, if desired - * @param conf - * a Parquet [[Configuration]], if desired - */ - def readTyped[T: ParquetType]( - predicate: FilterPredicate = null, - conf: Configuration = null - ): PTransform[PCollection[ReadableFile], PCollection[T]] = readTyped( - identity[T], - predicate, - conf - ) - - /** - * A ReadFiles implementation that reads Parquet file(s) into Scala case classes of type R, where - * R is mapped from type T - * - * @param projectionFn - * a function mapping T => R - */ - def readTyped[T: ParquetType, R]( - projectionFn: T => R - ): PTransform[PCollection[ReadableFile], PCollection[R]] = readTyped( - projectionFn, - null, - null - ) - - /** - * A ReadFiles implementation that reads Parquet file(s) into Scala case classes of type R, where - * R is mapped from type T - * - * @param projectionFn - * a function mapping T => R - * @param predicate - * a Parquet [[FilterApi]] predicate - */ - def readTyped[T: ParquetType, R]( - projectionFn: T => R, - predicate: FilterPredicate - ): PTransform[PCollection[ReadableFile], PCollection[R]] = readTyped( - projectionFn, - predicate, - 
null - ) - - /** - * A ReadFiles implementation that reads Parquet file(s) into Scala case classes of type R, where - * R is mapped from type T - * - * @param projectionFn - * a function mapping T => R - * @param conf - * a Parquet [[Configuration]] - */ - def readTyped[T: ParquetType, R]( - projectionFn: T => R, - conf: Configuration - ): PTransform[PCollection[ReadableFile], PCollection[R]] = readTyped( - projectionFn, - null, - conf - ) - - /** - * A ReadFiles implementation that reads Parquet file(s) into Scala case classes of type R, where - * R is mapped from type T - * - * @param projectionFn - * a function mapping T => R - * @param predicate - * a Parquet [[FilterApi]] predicate - * @param conf - * a Parquet [[Configuration]] - */ - def readTyped[T: ParquetType, R]( - projectionFn: T => R, - predicate: FilterPredicate, - conf: Configuration - ): PTransform[PCollection[ReadableFile], PCollection[R]] = { - val configuration = ParquetConfiguration.ofNullable(conf) - Option(predicate).foreach(p => ParquetInputFormat.setFilterPredicate(configuration, p)) + def parseFiles[T, U]( + readSupportFactory: ReadSupportFactory[T], + conf: SerializableConfiguration, + parseFn: T => U + ): PTransform[PCollection[ReadableFile], PCollection[U]] = { + val sdf = ParquetReadFn(readSupportFactory, conf, parseFn) + val tfx: PTransform[PCollection[_ <: ReadableFile], PCollection[U]] = ParDo.of(sdf) - val cleanedFn = Functions.serializableFn(ClosureCleaner.clean(projectionFn)) - readFiles(ReadSupportFactory.typed[T], new SerializableConfiguration(configuration), cleanedFn) + tfx.asInstanceOf[PTransform[PCollection[ReadableFile], PCollection[U]]] } /** - * A ReadFiles implementation that reads Parquet file(s) into Avro [[GenericRecord]]s using the - * supplied schema + * A ReadFiles implementation that reads Parquet file(s) into Scala case classes of type T * - * @param schema - * The Avro [[Schema]] to use for Parquet reads; can be a projection of the full file schema * @param 
predicate * a Parquet [[FilterPredicate]] predicate, if desired * @param conf * a Parquet [[Configuration]], if desired */ - def readAvroGenericRecordFiles( - schema: Schema, + def readTypedFiles[T: ParquetType]( predicate: FilterPredicate = null, conf: Configuration = null - ): PTransform[PCollection[ReadableFile], PCollection[GenericRecord]] = - readAvroGenericRecordFiles(schema, identity[GenericRecord], predicate, conf) - - /** - * A ReadFiles implementation that reads Parquet file(s) into Avro [[GenericRecord]]s using the - * supplied schema, then applies a mapping function to convert the Avro records into type T - * - * @param schema - * The Avro [[Schema]] to use for Parquet reads; can be a projection of the full file schema - * @param projectionFn - * a function mapping [[GenericRecord]] => T - */ - def readAvroGenericRecordFiles[T]( - schema: Schema, - projectionFn: GenericRecord => T - ): PTransform[PCollection[ReadableFile], PCollection[T]] = readAvroGenericRecordFiles( - schema, - projectionFn, - null, - null - ) - - /** - * A ReadFiles implementation that reads Parquet file(s) into Avro [[GenericRecord]]s using the - * supplied schema, then applies a mapping function to convert the Avro records into type T - * - * @param schema - * The Avro [[Schema]] to use for Parquet reads; can be a projection of the full file schema - * @param projectionFn - * a function mapping [[GenericRecord]] => T - * @param predicate - * a Parquet [[FilterPredicate]] predicate - */ - def readAvroGenericRecordFiles[T]( - schema: Schema, - projectionFn: GenericRecord => T, - predicate: FilterPredicate - ): PTransform[PCollection[ReadableFile], PCollection[T]] = readAvroGenericRecordFiles( - schema, - projectionFn, - predicate, - null - ) - - /** - * A ReadFiles implementation that reads Parquet file(s) into Avro [[GenericRecord]]s using the - * supplied schema, then applies a mapping function to convert the Avro records into type T - * - * @param schema - * The Avro [[Schema]] to 
use for Parquet reads; can be a projection of the full file schema - * @param projectionFn - * a function mapping [[GenericRecord]] => T - * @param conf - * a Parquet [[Configuration]] - */ - def readAvroGenericRecordFiles[T]( - schema: Schema, - projectionFn: GenericRecord => T, - conf: Configuration - ): PTransform[PCollection[ReadableFile], PCollection[T]] = readAvroGenericRecordFiles( - schema, - projectionFn, - conf - ) - - /** - * A ReadFiles implementation that reads Parquet file(s) into Avro [[GenericRecord]]s using the - * supplied schema, then applies a mapping function to convert the Avro records into type T - * - * @param schema - * The Avro [[Schema]] to use for Parquet reads; can be a projection of the full file schema - * @param projectionFn - * a function mapping [[GenericRecord]] => T - * @param predicate - * a Parquet [[FilterPredicate]] predicate - * @param conf - * a Parquet [[Configuration]] - */ - def readAvroGenericRecordFiles[T]( - schema: Schema, - projectionFn: GenericRecord => T, - predicate: FilterPredicate, - conf: Configuration ): PTransform[PCollection[ReadableFile], PCollection[T]] = { val configuration = ParquetConfiguration.ofNullable(conf) - configuration.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false) - AvroReadSupport.setAvroReadSchema(configuration, schema) - AvroReadSupport.setRequestedProjection(configuration, schema) - - Option(predicate).foreach(p => ParquetInputFormat.setFilterPredicate(configuration, p)) - if (configuration.get(AvroReadSupport.AVRO_DATA_SUPPLIER) == null) { - AvroReadSupport.setAvroDataSupplier(configuration, classOf[GenericDataSupplier]) - } - - val cleanedFn = Functions.serializableFn(ClosureCleaner.clean(projectionFn)) - readFiles( - ReadSupportFactory.avro, - new SerializableConfiguration(configuration), - cleanedFn - ) - } - - /** - * A ReadFiles implementation that reads Parquet file(s) into Avro [[SpecificRecord]]s - * - * @param projection - * an optional [[Schema]] used for Projection, made 
up of a subset of fields from the full Avro - * type `T`. If left unspecified, all fields from the generated class `T` will be read. Note - * that all fields excluded from the projection MUST be nullable in the Avro schema; if they are - * non-nullable, the resulting PTransform will fail serialization. You can solve this by adding - * a `projectionFn` param mapping type `T` to a serializable type `R` (i.e., a Scala case - * class): see [[readAvro(projection, projectionFn, predicate, conf)]] - * @param predicate - * a Parquet [[FilterPredicate]] predicate, if desired - * @param conf - * a Parquet [[Configuration]], if desired - */ - def readAvro[T <: SpecificRecord: ClassTag]( - projection: Schema = null, - predicate: FilterPredicate = null, - conf: Configuration = null - ): PTransform[PCollection[ReadableFile], PCollection[T]] = - readAvro(projection, identity[T], predicate, conf) - - /** - * A ReadFiles implementation that reads Parquet file(s) into Avro [[SpecificRecord]]s using the - * supplied schema, then applies a mapping function to convert the Avro records into type T - * - * @param projection - * an [[Schema]] used for Projection, made up of a subset of fields from the full Avro type `T` - * @param projectionFn - * a function mapping T => R - */ - def readAvro[T <: SpecificRecord: ClassTag, R]( - projection: Schema, - projectionFn: T => R - ): PTransform[PCollection[ReadableFile], PCollection[R]] = readAvro( - projection, - projectionFn, - null, - null - ) - - /** - * A ReadFiles implementation that reads Parquet file(s) into Avro [[SpecificRecord]]s using the - * supplied schema, then applies a mapping function to convert the Avro records into type T - * - * @param projection - * an [[Schema]] used for Projection, made up of a subset of fields from the full Avro type `T` - * @param projectionFn - * a function mapping T => R - * @param predicate - * a Parquet [[FilterPredicate]] predicate - */ - def readAvro[T <: SpecificRecord: ClassTag, R]( - projection: 
Schema, - projectionFn: T => R, - predicate: FilterPredicate - ): PTransform[PCollection[ReadableFile], PCollection[R]] = readAvro( - projection, - projectionFn, - predicate, - null - ) - - /** - * A ReadFiles implementation that reads Parquet file(s) into Avro [[SpecificRecord]]s using the - * supplied schema, then applies a mapping function to convert the Avro records into type T - * - * @param projection - * an [[Schema]] used for Projection, made up of a subset of fields from the full Avro type `T` - * @param projectionFn - * a function mapping T => R - * @param conf - * a Parquet [[Configuration]] - */ - def readAvro[T <: SpecificRecord: ClassTag, R]( - projection: Schema, - projectionFn: T => R, - conf: Configuration - ): PTransform[PCollection[ReadableFile], PCollection[R]] = readAvro( - projection, - projectionFn, - null, - conf - ) - - /** - * A ReadFiles implementation that reads Parquet file(s) into Avro [[SpecificRecord]]s using the - * supplied schema, then applies a mapping function to convert the Avro records into type T - * - * @param projection - * an [[Schema]] used for Projection, made up of a subset of fields from the full Avro type `T` - * @param projectionFn - * a function mapping T => R - * @param predicate - * a Parquet [[FilterPredicate]] predicate - * @param conf - * a Parquet [[Configuration]] - */ - def readAvro[T <: SpecificRecord: ClassTag, R]( - projection: Schema, - projectionFn: T => R, - predicate: FilterPredicate, - conf: Configuration - ): PTransform[PCollection[ReadableFile], PCollection[R]] = { - val configuration = ParquetConfiguration.ofNullable(conf) - - val avroClass = ScioUtil.classOf[T] - val readSchema = ReflectData.get().getSchema(avroClass) - AvroReadSupport.setAvroReadSchema(configuration, readSchema) - - Option(projection).foreach(p => AvroReadSupport.setRequestedProjection(configuration, p)) Option(predicate).foreach(p => ParquetInputFormat.setFilterPredicate(configuration, p)) - val cleanedFn = 
Functions.serializableFn(ClosureCleaner.clean(projectionFn)) - readFiles( - ReadSupportFactory.avro[T], - new SerializableConfiguration(configuration), - cleanedFn - ) + readFiles(ReadSupportFactory.typed[T], new SerializableConfiguration(configuration)) } } + +object ParquetRead extends ParquetRead with ParquetAvroRead diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/read/ParquetReadFn.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/read/ParquetReadFn.scala index 425277ac41..7ad618922a 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/read/ParquetReadFn.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/read/ParquetReadFn.scala @@ -17,7 +17,7 @@ package com.spotify.scio.parquet.read import com.spotify.scio.parquet.BeamInputFile -import com.spotify.scio.parquet.read.ParquetReadFn._ +import com.spotify.scio.util.Functions import org.apache.beam.sdk.io.FileIO.ReadableFile import org.apache.beam.sdk.io.hadoop.SerializableConfiguration import org.apache.beam.sdk.io.range.OffsetRange @@ -58,13 +58,28 @@ object ParquetReadFn { // Constants private val SplitLimit = 64000000L private val EntireFileRange = new OffsetRange(0, 1) + + def apply[T, U]( + readSupportFactory: ReadSupportFactory[T], + conf: SerializableConfiguration, + parseFn: T => U + ): ParquetReadFn[T, U] = + new ParquetReadFn(readSupportFactory, conf, Functions.serializableFn(parseFn)) + + def apply[T]( + readSupportFactory: ReadSupportFactory[T], + conf: SerializableConfiguration + ): ParquetReadFn[T, T] = + new ParquetReadFn(readSupportFactory, conf, Functions.serializableFn(identity)) } -class ParquetReadFn[T, R]( +class ParquetReadFn[T, U] private ( readSupportFactory: ReadSupportFactory[T], conf: SerializableConfiguration, - projectionFn: SerializableFunction[T, R] -) extends DoFn[ReadableFile, R] { + parseFn: SerializableFunction[T, U] +) extends DoFn[ReadableFile, U] { + import ParquetReadFn._ + @transient private lazy val options = 
HadoopReadOptions.builder(conf.get()).build() @@ -160,7 +175,7 @@ class ParquetReadFn[T, R]( def processElement( @Element file: ReadableFile, tracker: RestrictionTracker[OffsetRange, Long], - out: DoFn.OutputReceiver[R] + out: DoFn.OutputReceiver[U] ): Unit = { logger.debug( "reading file from offset {} to {}", @@ -207,8 +222,8 @@ class ParquetReadFn[T, R]( pages.getRowCount, file, recordReader, - out, - projectionFn + parseFn, + out ) pages = filterGranularity.readNextRowGroup(reader) } @@ -228,8 +243,8 @@ class ParquetReadFn[T, R]( pages.getRowCount, file, recordReader, - out, - projectionFn + parseFn, + out ) currentRowGroupIndex += 1 @@ -245,8 +260,8 @@ class ParquetReadFn[T, R]( rowCount: Long, file: ReadableFile, recordReader: RecordReader[T], - outputReceiver: DoFn.OutputReceiver[R], - projectionFn: SerializableFunction[T, R] + parseFn: SerializableFunction[T, U], + outputReceiver: DoFn.OutputReceiver[U] ): Unit = { logger.debug( "row group {} read in memory. row count = {}", @@ -272,7 +287,7 @@ class ParquetReadFn[T, R]( ) } else { - outputReceiver.output(projectionFn(record)) + outputReceiver.output(parseFn(record)) } currentRow += 1 diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/read/ReadSupportFactory.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/read/ReadSupportFactory.scala index 47a161fb8b..aca95b0f3c 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/read/ReadSupportFactory.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/read/ReadSupportFactory.scala @@ -32,7 +32,7 @@ object ReadSupportFactory { } def avro[T]: ReadSupportFactory[T] = new ReadSupportFactory[T] { - def readSupport: ReadSupport[T] = new AvroReadSupport() + def readSupport: ReadSupport[T] = new AvroReadSupport } def example: ReadSupportFactory[Example] = new ReadSupportFactory[Example] { diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala 
b/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala index a2fb8d8d41..2eaaa0b584 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala @@ -44,7 +44,7 @@ import org.apache.beam.sdk.transforms.SimpleFunction import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.parquet.filter2.predicate.FilterPredicate -import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetReader} +import org.apache.parquet.hadoop.ParquetInputFormat import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.tensorflow.proto.example.{Example, Features} import org.tensorflow.metadata.v0.Schema @@ -88,8 +88,7 @@ final case class ParquetExampleIO(path: String) extends ScioIO[Example] { ParquetRead.read( ReadSupportFactory.example, new SerializableConfiguration(conf), - filePattern, - identity[Example] + filePattern ) ).setCoder(coder) } @@ -258,7 +257,7 @@ final case class ParquetExampleTap(path: String, params: ParquetExampleIO.ReadPa val filePattern = ScioUtil.filePattern(path, params.suffix) val xs = FileSystems.`match`(filePattern).metadata().asScala.toList xs.iterator.flatMap { metadata => - val reader: ParquetReader[Example] = TensorflowExampleParquetReader + val reader = TensorflowExampleParquetReader .builder(BeamInputFile.of(metadata.resourceId())) .withConf(Option(params.conf).getOrElse(new Configuration())) .build() diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala index 98c4c6ee7a..62c23be3cf 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala @@ -80,8 +80,7 @@ final case class ParquetTypeIO[T: ClassTag: 
Coder: ParquetType]( ParquetRead.read( ReadSupportFactory.typed, new SerializableConfiguration(conf), - filePattern, - identity[T] + filePattern ) ).setCoder(coder) } diff --git a/scio-parquet/src/test/scala/com/spotify/scio/parquet/avro/ParquetAvroIOTest.scala b/scio-parquet/src/test/scala/com/spotify/scio/parquet/avro/ParquetAvroIOTest.scala index 119bb60167..440bd31c49 100644 --- a/scio-parquet/src/test/scala/com/spotify/scio/parquet/avro/ParquetAvroIOTest.scala +++ b/scio-parquet/src/test/scala/com/spotify/scio/parquet/avro/ParquetAvroIOTest.scala @@ -17,20 +17,21 @@ package com.spotify.scio.parquet.avro -import java.io.File import com.spotify.scio._ import com.spotify.scio.avro._ -import com.spotify.scio.io.{ClosedTap, FileNamePolicySpec, ScioIOTest, TapSpec, TextIO} +import com.spotify.scio.coders.Coder +import com.spotify.scio.io._ import com.spotify.scio.parquet.ParquetConfiguration import com.spotify.scio.parquet.read.ParquetReadConfiguration import com.spotify.scio.testing._ import com.spotify.scio.util.FilenamePolicySupplier import com.spotify.scio.values.{SCollection, WindowOptions} import org.apache.avro.data.TimeConversions -import org.apache.avro.{Conversion, Conversions, LogicalType, Schema} -import org.apache.avro.generic.{GenericData, GenericRecord, GenericRecordBuilder} -import org.apache.avro.specific.SpecificData +import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord} +import org.apache.avro.io.{DatumReader, DatumWriter} +import org.apache.avro.{Conversions, Schema} import org.apache.beam.sdk.Pipeline.PipelineExecutionException +import org.apache.beam.sdk.extensions.avro.io.AvroDatumFactory.GenericDatumFactory import org.apache.beam.sdk.options.PipelineOptionsFactory import org.apache.beam.sdk.transforms.windowing.{BoundedWindow, IntervalWindow, PaneInfo} import org.apache.commons.io.FileUtils @@ -41,8 +42,10 @@ import org.scalatest.BeforeAndAfterAll import 
org.scalatest.prop.TableDrivenPropertyChecks.{forAll => forAllCases, Table} import org.typelevel.scalaccompat.annotation.unused -import java.lang +import java.io.File import java.nio.file.Files +import scala.concurrent.duration._ +import scala.util.chaining._ class ParquetAvroIOFileNamePolicyTest extends FileNamePolicySpec[TestRecord] { override val suffix: String = ".parquet" @@ -71,9 +74,44 @@ class ParquetAvroIOFileNamePolicyTest extends FileNamePolicySpec[TestRecord] { ) } +object ParquetAvroIOTest { + + final class TestLogicalTypesGenericDatumFactory extends GenericDatumFactory { + override def apply(writer: Schema): DatumWriter[GenericRecord] = { + val data = new GenericData() + data.addLogicalTypeConversion(new TimeConversions.TimestampConversion) + data.addLogicalTypeConversion(new Conversions.DecimalConversion) + new GenericDatumWriter[GenericRecord](writer, data) + } + override def apply(writer: Schema, reader: Schema): DatumReader[GenericRecord] = { + val data = new GenericData() + data.addLogicalTypeConversion(new TimeConversions.TimestampConversion) + data.addLogicalTypeConversion(new Conversions.DecimalConversion) + new GenericDatumReader[GenericRecord](writer, reader, data) + } + } + + final class TestLogicalTypesDataSupplier extends AvroDataSupplier { + override def get(): GenericData = new GenericData() + .tap(_.addLogicalTypeConversion(new TimeConversions.TimestampConversion)) + .tap(_.addLogicalTypeConversion(new Conversions.DecimalConversion)) + } + + class TestRecordProjection(@unused str: String) +} + class ParquetAvroIOTest extends ScioIOSpec with TapSpec with BeforeAndAfterAll { + + import ParquetAvroIOTest._ + private val testDir = Files.createTempDirectory("scio-test-").toFile + private val genericRecords = (1 to 10).map(AvroUtils.newGenericRecord) private val specificRecords = (1 to 10).map(AvroUtils.newSpecificRecord) + private val projection = Projection[TestRecord](_.getIntField) + private val predicate = 
Predicate[TestRecord](_.getIntField <= 5) + private val projectionRecords = (1 to 10).map { i => + TestRecord.newBuilder().setIntField(i).build() + } override protected def beforeAll(): Unit = { val sc = ScioContext() @@ -97,80 +135,106 @@ class ParquetAvroIOTest extends ScioIOSpec with TapSpec with BeforeAndAfterAll { (() => ParquetAvroIO.ReadParam.DefaultConfiguration, "default") ) - it should "work with specific records" in { - val xs = (1 to 100).map(AvroUtils.newSpecificRecord) - + "ParquetAvroIO" should "work with specific records" in { forAllCases(readConfigs) { case (c, _) => - testTap(xs)(_.saveAsParquetAvroFile(_))(".parquet") - testJobTest(xs)(ParquetAvroIO(_))( - _.parquetAvroFile[TestRecord](_, conf = c()).map(identity) - )(_.saveAsParquetAvroFile(_)) + testTap(specificRecords)(_.saveAsParquetAvroFile(_))(".parquet") + testJobTest(specificRecords)(ParquetAvroIO(_))(_.parquetAvroFile[TestRecord](_, conf = c()))( + _.saveAsParquetAvroFile(_) + ) } } it should "read specific records with projection" in { forAllCases(readConfigs) { case (c, _) => - val sc = ScioContext() - val projection = Projection[TestRecord](_.getIntField) - val data = sc.parquetAvroFile[TestRecord]( - path = testDir.getAbsolutePath, - projection = projection, - suffix = ".parquet", - conf = c() - ) - - data.map(_.getIntField.toInt) should containInAnyOrder(1 to 10) - data.map(identity) should forAll[TestRecord] { r => - r.getLongField == null && r.getFloatField == null && r.getDoubleField == null && - r.getBooleanField == null && r.getStringField == null && r.getArrayField - .size() == 0 + runWithRealContext() { sc => + val result = sc.parquetAvroFile[TestRecord]( + path = testDir.getAbsolutePath, + projection = projection, + suffix = ".parquet", + conf = c() + ) + result should containInAnyOrder(projectionRecords) } - sc.run() } } it should "read specific records with predicate" in { forAllCases(readConfigs) { case (c, _) => - val sc = ScioContext() - val predicate = 
Predicate[TestRecord](_.getIntField <= 5) - val data = sc.parquetAvroFile[TestRecord]( - path = testDir.getAbsolutePath, - predicate = predicate, - suffix = ".parquet", - conf = c() - ) - val expected = specificRecords.filter(_.getIntField <= 5) - data.map(identity) should containInAnyOrder(expected) - sc.run() + runWithRealContext() { sc => + val result = sc.parquetAvroFile[TestRecord]( + path = testDir.getAbsolutePath, + predicate = predicate, + suffix = ".parquet", + conf = c() + ) + val expected = specificRecords.filter(_.getIntField <= 5) + result should containInAnyOrder(expected) + } } } it should "read specific records with projection and predicate" in { forAllCases(readConfigs) { case (c, _) => - val sc = ScioContext() - val projection = Projection[TestRecord](_.getIntField) - val predicate = Predicate[TestRecord](_.getIntField <= 5) - val data = sc.parquetAvroFile[TestRecord]( - path = testDir.getAbsolutePath, - projection = projection, - predicate = predicate, - suffix = ".parquet", - conf = c() + runWithRealContext() { sc => + val result = sc.parquetAvroFile[TestRecord]( + path = testDir.getAbsolutePath, + projection = projection, + predicate = predicate, + suffix = ".parquet", + conf = c() + ) + val expected = projectionRecords.filter(_.getIntField <= 5) + result should containInAnyOrder(expected) + } + } + } + + it should "write and read GenericRecords with logical types" in withTempDir { dir => + val schema = TestLogicalTypes.getClassSchema + val records: Seq[GenericRecord] = (1 to 10).map { i => + TestLogicalTypes + .newBuilder() + .setTimestamp(DateTime.now()) + .setDecimal(BigDecimal(i).setScale(2).bigDecimal) + .build() + } + val datumFactory = new TestLogicalTypesGenericDatumFactory + implicit val coder: Coder[GenericRecord] = avroCoder(datumFactory, schema) + + forAllCases(readConfigs) { case (readConf, testCase) => + val testCaseDir = new File(dir, testCase) + + runWithRealContext() { sc => + sc.parallelize(records) + .saveAsParquetAvroFile( + 
path = testCaseDir.getAbsolutePath, + schema = schema, + datumFactory = datumFactory + ) + } + + val conf = ParquetConfiguration.ofNullable(readConf()) + conf.setClass( + AvroWriteSupport.AVRO_DATA_SUPPLIER, + classOf[TestLogicalTypesDataSupplier], + classOf[AvroDataSupplier] ) - data.map(_.getIntField.toInt) should containInAnyOrder(1 to 5) - data.map(identity) should forAll[TestRecord] { r => - r.getLongField == null && - r.getFloatField == null && - r.getDoubleField == null && - r.getBooleanField == null && - r.getStringField == null && - r.getArrayField.size() == 0 + + runWithRealContext() { sc => + val result = sc + .parquetAvroGenericRecordFile( + path = testCaseDir.getAbsolutePath, + schema = schema, + conf = conf, + suffix = ".parquet", + datumFactory = datumFactory + ) + result should containInAnyOrder(records) } - sc.run() } } - it should "write and read SpecificRecords with default logical types" in withTempDir { dir => + it should "write and read SpecificRecords with logical types" in withTempDir { dir => forAllCases(readConfigs) { case (readConf, testCase) => val testCaseDir = new File(dir, testCase) val records = (1 to 10).map(_ => @@ -181,211 +245,92 @@ class ParquetAvroIOTest extends ScioIOSpec with TapSpec with BeforeAndAfterAll { .build() ) - val sc1 = ScioContext() - sc1 - .parallelize(records) - .saveAsParquetAvroFile(path = testCaseDir.getAbsolutePath) - sc1.run() - - val sc2 = ScioContext() - sc2 - .parquetAvroFile[TestLogicalTypes]( - path = testCaseDir.getAbsolutePath, - conf = readConf(), - suffix = ".parquet" - ) - .map(identity) should containInAnyOrder(records) - - sc2.run() - } - } - - it should "write and read GenericRecords with default logical types" in withTempDir { dir => - forAllCases(readConfigs) { case (readConf, testCase) => - val testCaseDir = new File(dir, testCase) - val records: Seq[GenericRecord] = (1 to 10).map { _ => - val gr = new GenericRecordBuilder(TestLogicalTypes.SCHEMA$) - gr.set("timestamp", DateTime.now()) - 
gr.set( - "decimal", - BigDecimal.decimal(1.0).setScale(2).bigDecimal - ) - gr.build() + runWithRealContext() { sc => + sc + .parallelize(records) + .saveAsParquetAvroFile(path = testCaseDir.getAbsolutePath) } - implicit val coder = { - GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion) - GenericData.get().addLogicalTypeConversion(new Conversions.DecimalConversion) - avroGenericRecordCoder(TestLogicalTypes.SCHEMA$) + runWithRealContext() { sc => + val result = sc + .parquetAvroFile[TestLogicalTypes]( + path = testCaseDir.getAbsolutePath, + conf = readConf(), + suffix = ".parquet" + ) + result should containInAnyOrder(records) } - - val sc1 = ScioContext() - sc1 - .parallelize(records) - .saveAsParquetAvroFile( - path = testCaseDir.getAbsolutePath, - schema = TestLogicalTypes.SCHEMA$ - ) - sc1.run() - - val sc2 = ScioContext() - sc2 - .parquetAvroFile[GenericRecord]( - path = testCaseDir.getAbsolutePath, - projection = TestLogicalTypes.SCHEMA$, - conf = readConf(), - suffix = ".parquet" - ) - .map(identity) should containInAnyOrder(records) - - sc2.run() } } - it should "write and read SpecificRecords with custom logical types" in withTempDir { dir => + it should "read/write generic records" in withTempDir { dir => + implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder(AvroUtils.schema) + forAllCases(readConfigs) { case (readConf, testCase) => val testCaseDir = new File(dir, testCase) - val records = - (1 to 10).map(_ => - TestLogicalTypes - .newBuilder() - .setTimestamp(DateTime.now()) - .setDecimal(BigDecimal.decimal(1.0).setScale(2).bigDecimal) - .build() - ) - val sc1 = ScioContext() - sc1 - .parallelize(records) - .saveAsParquetAvroFile( - path = testCaseDir.getAbsolutePath - ) - sc1.run() + runWithRealContext() { sc => + sc.parallelize(genericRecords) + .saveAsParquetAvroFile( + testCaseDir.getAbsolutePath, + numShards = 1, + schema = AvroUtils.schema + ) + } + + val files = testCaseDir.listFiles() + 
files.map(_.isDirectory).length shouldBe 1 - val sc2 = ScioContext() - sc2 - .parquetAvroFile[TestLogicalTypes]( + runWithRealContext() { sc => + val result = sc.parquetAvroGenericRecordFile( path = testCaseDir.getAbsolutePath, + schema = AvroUtils.schema, conf = readConf(), suffix = ".parquet" ) - .map(identity) should containInAnyOrder(records) - - sc2.run() - () + result should containInAnyOrder(genericRecords) + } } } - it should "read with incomplete projection" in withTempDir { dir => - forAllCases(readConfigs) { case (readConf, testCase) => - val testCaseDir = new File(dir, testCase) - val sc1 = ScioContext() - val nestedRecords = - (1 to 10).map(x => new Account(x, x.toString, x.toString, x.toDouble, AccountStatus.Active)) - sc1 - .parallelize(nestedRecords) - .saveAsParquetAvroFile(testCaseDir.getAbsolutePath) - sc1.run() - - val sc2 = ScioContext() - val projection = Projection[Account](_.getName) - val data = sc2.parquetAvroFile[Account]( - path = testCaseDir.getAbsolutePath, - projection = projection, - conf = readConf(), - suffix = ".parquet" - ) - val expected = nestedRecords.map(_.getName.toString) - data.map(_.getName.toString) should containInAnyOrder(expected) - data.flatMap(a => Some(a.getName.toString)) should containInAnyOrder(expected) - sc2.run() - } - } + it should "write windowed generic records to dynamic destinations" in withTempDir { dir => + // This test follows the same pattern as com.spotify.scio.io.dynamic.DynamicFileTest + implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder(AvroUtils.schema) + val streamingOptions = PipelineOptionsFactory.fromArgs("--streaming=true").create() + + val filenamePolicySupplier = FilenamePolicySupplier.filenamePolicySupplierOf( + windowed = (shardNumber: Int, numShards: Int, window: BoundedWindow, _: PaneInfo) => { + val intervalWindow = window.asInstanceOf[IntervalWindow] + val year = intervalWindow.start().get(DateTimeFieldType.year()) + val month = 
intervalWindow.start().get(DateTimeFieldType.monthOfYear()) + val day = intervalWindow.start().get(DateTimeFieldType.dayOfMonth()) + val hour = intervalWindow.start().get(DateTimeFieldType.hourOfDay()) + "y=%02d/m=%02d/d=%02d/h=%02d/part-%s-of-%s" + .format(year, month, day, hour, shardNumber, numShards) + } + ) - it should "read/write generic records" in withTempDir { dir => - forAllCases(readConfigs) { case (readConf, testCase) => - val testCaseDir = new File(dir, testCase) - val genericRecords = (1 to 100).map(AvroUtils.newGenericRecord) - val sc1 = ScioContext() - implicit val coder = avroGenericRecordCoder(AvroUtils.schema) - sc1 + runWithRealContext(streamingOptions) { sc => + sc .parallelize(genericRecords) + // Explicit optional arguments `Duration.Zero` and `WindowOptions()` as a workaround for the + // mysterious "Could not find proxy for val sc1" compiler error + // take each records int value and multiply it by half hour, so we should have 2 records in each hour window + .timestampBy( + x => + Instant.ofEpochMilli( + (30.minutes * x.get("long_field").asInstanceOf[Long]).toMillis - 1 + ), + Duration.ZERO + ) + .withFixedWindows(Duration.standardHours(1), Duration.ZERO, WindowOptions()) .saveAsParquetAvroFile( - testCaseDir.getAbsolutePath, + dir.getAbsolutePath, numShards = 1, - schema = AvroUtils.schema + schema = AvroUtils.schema, + filenamePolicySupplier = filenamePolicySupplier ) - sc1.run() - - val files = testCaseDir.listFiles() - files.map(_.isDirectory).length shouldBe 1 - - val sc2 = ScioContext() - val data: SCollection[GenericRecord] = sc2.parquetAvroFile[GenericRecord]( - path = testCaseDir.getAbsolutePath, - projection = AvroUtils.schema, - conf = readConf(), - suffix = ".parquet" - ) - data should containInAnyOrder(genericRecords) - sc2.run() - } - } - - class TestRecordProjection(@unused str: String) - - "tap" should "use projection schema and GenericDataSupplier" in { - val schema = new Schema.Parser().parse( - """ - |{ - 
|"type":"record", - |"name":"TestRecordProjection", - |"namespace":"com.spotify.scio.parquet.avro.ParquetAvroIOTest$", - |"fields":[{"name":"int_field","type":["null", "int"]}]} - |""".stripMargin - ) - - implicit val coder = avroGenericRecordCoder(schema) - - ParquetAvroTap( - s"${testDir.toPath}", - ParquetAvroIO.ReadParam(identity[GenericRecord], schema, suffix = "*.parquet") - ).value.foreach { gr => - gr.get("int_field") should not be null - gr.get("string_field") should be(null) } - } - - it should "write windowed generic records to dynamic destinations" in withTempDir { dir => - // This test follows the same pattern as com.spotify.scio.io.dynamic.DynamicFileTest - val genericRecords = (0 until 10).map(AvroUtils.newGenericRecord) - val options = PipelineOptionsFactory.fromArgs("--streaming=true").create() - val sc1 = ScioContext(options) - implicit val coder = avroGenericRecordCoder(AvroUtils.schema) - sc1 - .parallelize(genericRecords) - // Explicit optional arguments `Duration.Zero` and `WindowOptions()` as a workaround for the - // mysterious "Could not find proxy for val sc1" compiler error - // take each records int value and multiply it by half hour, so we should have 2 records in each hour window - .timestampBy(x => new Instant(x.get("int_field").asInstanceOf[Int] * 1800000L), Duration.ZERO) - .withFixedWindows(Duration.standardHours(1), Duration.ZERO, WindowOptions()) - .saveAsParquetAvroFile( - dir.getAbsolutePath, - numShards = 1, - schema = AvroUtils.schema, - filenamePolicySupplier = FilenamePolicySupplier.filenamePolicySupplierOf( - windowed = (shardNumber: Int, numShards: Int, window: BoundedWindow, _: PaneInfo) => { - val intervalWindow = window.asInstanceOf[IntervalWindow] - val year = intervalWindow.start().get(DateTimeFieldType.year()) - val month = intervalWindow.start().get(DateTimeFieldType.monthOfYear()) - val day = intervalWindow.start().get(DateTimeFieldType.dayOfMonth()) - val hour = 
intervalWindow.start().get(DateTimeFieldType.hourOfDay()) - "y=%02d/m=%02d/d=%02d/h=%02d/part-%s-of-%s" - .format(year, month, day, hour, shardNumber, numShards) - } - ) - ) - sc1.run() def recursiveListFiles(directory: File): List[File] = { val files = directory.listFiles() @@ -395,150 +340,123 @@ class ParquetAvroIOTest extends ScioIOSpec with TapSpec with BeforeAndAfterAll { val files = recursiveListFiles(dir) files.length shouldBe 5 - val params = - ParquetAvroIO.ReadParam[GenericRecord, GenericRecord]( - identity[GenericRecord], - AvroUtils.schema, - null - ) - - (0 until 10) + (1 to 10) .sliding(2, 2) .zipWithIndex - .map { case (window, idx) => - ( - f"y=1970/m=01/d=01/h=$idx%02d/part-0-of-1.parquet", - window.map(AvroUtils.newGenericRecord) - ) - } - .foreach { - case (filename, records) => { - val tap = ParquetAvroTap(s"$dir/$filename", params) - tap.value.toList should contain theSameElementsAs records - } + .foreach { case (window, idx) => + val filename = f"y=1970/m=01/d=01/h=$idx%02d/part-0-of-1.parquet" + val records = window.map(AvroUtils.newGenericRecord) + val tap = ParquetGenericRecordTap(s"$dir/$filename", AvroUtils.schema) + tap.value.toList should contain theSameElementsAs records } } it should "write generic records to dynamic destinations" in withTempDir { dir => - val genericRecords = (1 to 100).map(AvroUtils.newGenericRecord) - val sc = ScioContext() - implicit val coder = avroGenericRecordCoder(AvroUtils.schema) - sc.parallelize(genericRecords) - .saveAsParquetAvroFile( - dir.getAbsolutePath, - numShards = 1, - schema = AvroUtils.schema, - filenamePolicySupplier = FilenamePolicySupplier.filenamePolicySupplierOf( - unwindowed = (shardNumber: Int, numShards: Int) => - s"part-$shardNumber-of-$numShards-with-custom-naming" + implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder(AvroUtils.schema) + + val filenamePolicySupplier = FilenamePolicySupplier.filenamePolicySupplierOf( + unwindowed = + (shardNumber: Int, numShards: Int) => 
s"part-$shardNumber-of-$numShards-with-custom-naming" + ) + + runWithRealContext() { sc => + sc.parallelize(genericRecords) + .saveAsParquetAvroFile( + dir.getAbsolutePath, + numShards = 1, + schema = AvroUtils.schema, + filenamePolicySupplier = filenamePolicySupplier ) - ) - sc.run() + } val files = dir.listFiles() files.length shouldBe 1 files.head.getAbsolutePath should include("part-0-of-1-with-custom-naming.parquet") - val params = ParquetAvroIO.ReadParam[GenericRecord, GenericRecord]( - identity[GenericRecord], - AvroUtils.schema, - null - ) - - val tap = ParquetAvroTap(files.head.getAbsolutePath, params) - + val tap = ParquetGenericRecordTap(files.head.getAbsolutePath, AvroUtils.schema) tap.value.toList should contain theSameElementsAs genericRecords } - it should "throw exception when filename functions not correctly defined for dynamic destinations" in withTempDir { + it should "throw exception when filename functions not correctly defined for un-windows dynamic destinations" in withTempDir { dir => - val genericRecords = (1 to 100).map(AvroUtils.newGenericRecord) - implicit val coder = avroGenericRecordCoder(AvroUtils.schema) + implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder(AvroUtils.schema) - an[NotImplementedError] should be thrownBy { - val sc = ScioContext() - sc.parallelize(genericRecords) - .saveAsParquetAvroFile( - dir.getAbsolutePath, - numShards = 1, - schema = AvroUtils.schema, - filenamePolicySupplier = FilenamePolicySupplier.filenamePolicySupplierOf( - windowed = (_, _, _, _) => "test for exception handling" + val filenamePolicySupplier = FilenamePolicySupplier.filenamePolicySupplierOf( + windowed = (_, _, _, _) => "test for exception handling" + ) + + val e = the[PipelineExecutionException] thrownBy { + runWithRealContext() { sc => + sc.parallelize(genericRecords) + .saveAsParquetAvroFile( + dir.getAbsolutePath, + numShards = 1, + schema = AvroUtils.schema, + filenamePolicySupplier = filenamePolicySupplier ) - ) - try { - 
sc.run() - } catch { - case e: PipelineExecutionException => - throw e.getCause } } + e.getCause shouldBe a[NotImplementedError] + } + + it should "throw exception when filename functions not correctly defined for windowed dynamic destinations" in withTempDir { + dir => + implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder(AvroUtils.schema) + + val filenamePolicySupplier = FilenamePolicySupplier.filenamePolicySupplierOf( + unwindowed = (_, _) => "test for exception handling" + ) val e = the[PipelineExecutionException] thrownBy { - val sc = ScioContext() - sc.parallelize(genericRecords) - .timestampBy( - x => new Instant(x.get("int_field").asInstanceOf[Int] * 1800000L), - Duration.ZERO - ) - .withFixedWindows(Duration.standardHours(1), Duration.ZERO, WindowOptions()) - .saveAsParquetAvroFile( - dir.getAbsolutePath, - numShards = 1, - schema = AvroUtils.schema, - filenamePolicySupplier = FilenamePolicySupplier.filenamePolicySupplierOf( - unwindowed = (_, _) => "test for exception handling" + runWithRealContext() { sc => + sc.parallelize(genericRecords) + .timestampBy( + x => new Instant(x.get("int_field").asInstanceOf[Int] * 1800000L), + Duration.ZERO ) - ) - sc.run() + .withFixedWindows(Duration.standardHours(1), Duration.ZERO, WindowOptions()) + .saveAsParquetAvroFile( + dir.getAbsolutePath, + numShards = 1, + schema = AvroUtils.schema, + filenamePolicySupplier = filenamePolicySupplier + ) + } } e.getCause shouldBe a[NotImplementedError] } it should "apply map functions to test input" in { + val expected = specificRecords.map(_.getIntField.toString) JobTest[ParquetTestJob.type] .args("--input=input", "--output=output") - .input( - ParquetAvroIO[Account]("input"), - List(Account.newBuilder().setId(1).setName("foo").setType("bar").setAmount(2.0).build()) - ) - .output(TextIO("output"))(_ should containSingleValue(("foo", 2.0).toString)) + .input(ParquetAvroIO[TestRecord]("input"), specificRecords) + .output(TextIO("output"))(_ should 
containInAnyOrder(expected)) .run() } + + "ParquetTap" should "use projection schema" in { + ParquetGenericRecordTap( + testDir.getAbsolutePath, + TestRecord.getClassSchema, + params = ParquetAvroIO.ReadParam(projection, suffix = ".parquet") + ).value.foreach { gr => + gr.get("int_field") should not be null + gr.get("string_field") shouldBe null + } + } } object ParquetTestJob { def main(cmdLineArgs: Array[String]): Unit = { val (sc, args) = ContextAndArgs(cmdLineArgs) sc - .parquetAvroFile[Account]( + .parquetAvroFile[TestRecord]( args("input"), - projection = Projection[Account](_.getName, _.getAmount) + projection = Projection[TestRecord](_.getIntField) ) - .map(a => (a.getName.toString, a.getAmount)) + .map(_.getIntField) .saveAsTextFile(args("output")) sc.run().waitUntilDone() } } - -case class CustomLogicalTypeSupplier() extends AvroDataSupplier { - override def get(): GenericData = { - val specificData = new SpecificData() - specificData.addLogicalTypeConversion(new Conversion[DateTime] { - override def getConvertedType: Class[DateTime] = classOf[DateTime] - override def getLogicalTypeName: String = "timestamp-millis" - - override def toLong( - value: DateTime, - schema: Schema, - `type`: LogicalType - ): lang.Long = - value.toInstant.getMillis - - override def fromLong(value: lang.Long, schema: Schema, `type`: LogicalType): DateTime = - Instant.ofEpochMilli(value).toDateTime - }) - specificData.addLogicalTypeConversion(new Conversions.DecimalConversion) - specificData - } -} diff --git a/scio-parquet/src/test/scala/com/spotify/scio/parquet/read/ParquetReadFnTest.scala b/scio-parquet/src/test/scala/com/spotify/scio/parquet/read/ParquetReadFnTest.scala index 26ab3dbe3a..a945f8d11c 100644 --- a/scio-parquet/src/test/scala/com/spotify/scio/parquet/read/ParquetReadFnTest.scala +++ b/scio-parquet/src/test/scala/com/spotify/scio/parquet/read/ParquetReadFnTest.scala @@ -17,76 +17,65 @@ package com.spotify.scio.parquet.read import com.spotify.scio.ScioContext 
-import com.spotify.scio.avro._ import com.spotify.scio.parquet.ParquetConfiguration -import com.spotify.scio.parquet.avro._ import com.spotify.scio.parquet.types._ import com.spotify.scio.testing.PipelineSpec -import org.apache.commons.io.FileUtils -import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder} import org.apache.beam.sdk.options.PipelineOptionsFactory import org.apache.beam.sdk.util.SerializableUtils +import org.apache.commons.io.FileUtils import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.io.api.Binary import org.scalatest.BeforeAndAfterAll import java.io.File -import java.nio.file.{Files, Paths} +import java.nio.file.Files import scala.jdk.CollectionConverters._ case class Record(strField: String) class ParquetReadFnTest extends PipelineSpec with BeforeAndAfterAll { private val testDir = Files.createTempDirectory("scio-test").toFile + + // Multiple row-groups + private val multiRowGroupConf = ParquetConfiguration.of("parquet.block.size" -> 16) private val testMultiDir = new File(testDir, "multi") + // Single row-group + private val singleRowGroupConf = ParquetConfiguration.of("parquet.block.size" -> 1073741824) private val testSingleDir = new File(testDir, "single") + private val typedRecords = (1 to 250).map(i => Record(i.toString)).toList - private val avroRecords = (251 to 500).map(i => - Account - .newBuilder() - .setId(i) - .setType(i.toString) - .setName(i.toString) - .setAmount(i.toDouble) - .build - ) override def beforeAll(): Unit = { - // Multiple row-groups - val multiRowGroupConf = ParquetConfiguration.of("parquet.block.size" -> 16) - // Single row-group - val singleRowGroupConf = ParquetConfiguration.of("parquet.block.size" -> 1073741824) - val sc = ScioContext() val typedData = sc.parallelize(typedRecords) - val avroData = sc.parallelize(avroRecords) - - typedData.saveAsTypedParquetFile(s"$testMultiDir/typed", conf = multiRowGroupConf) - typedData.saveAsTypedParquetFile(s"$testSingleDir/typed", 
conf = singleRowGroupConf) - - avroData.saveAsParquetAvroFile(s"$testMultiDir/avro", conf = multiRowGroupConf) - avroData.saveAsParquetAvroFile(s"$testSingleDir/avro", conf = singleRowGroupConf) - + typedData.saveAsTypedParquetFile(testMultiDir.getAbsolutePath, conf = multiRowGroupConf) + typedData.saveAsTypedParquetFile(testSingleDir.getAbsolutePath, conf = singleRowGroupConf) sc.run() } override def afterAll(): Unit = FileUtils.deleteDirectory(testDir) + private def listFiles(dir: File): Seq[String] = + Files + .list(dir.toPath) + .iterator() + .asScala + .map(_.toFile.toPath.toString) + .toSeq + "Parquet ReadFn" should "read at file-level granularity for files with multiple row groups" in { val granularityConf = ParquetConfiguration.of( ParquetReadConfiguration.SplitGranularity -> ParquetReadConfiguration.SplitGranularityFile, ParquetReadConfiguration.UseSplittableDoFn -> true ) - val sc = ScioContext() - - sc - .parallelize(listFiles(s"${testMultiDir.getAbsolutePath}/typed")) - .readFiles( - ParquetRead.readTyped[Record](conf = granularityConf) - ) should containInAnyOrder(typedRecords) + runWithRealContext() { sc => + val result = sc + .parallelize(listFiles(testMultiDir)) + .readFiles(ParquetRead.readTypedFiles[Record](conf = granularityConf)) - sc.run() + result should containInAnyOrder(typedRecords) + } } it should "read at file-level granularity for files with a single row group" in { @@ -95,15 +84,13 @@ class ParquetReadFnTest extends PipelineSpec with BeforeAndAfterAll { ParquetReadConfiguration.UseSplittableDoFn -> true ) - val sc = ScioContext() - - sc - .parallelize(listFiles(s"${testMultiDir.getAbsolutePath}/typed")) - .readFiles( - ParquetRead.readTyped[Record](conf = granularityConf) - ) should containInAnyOrder(typedRecords) + runWithRealContext() { sc => + val result = sc + .parallelize(listFiles(testMultiDir)) + .readFiles(ParquetRead.readTypedFiles[Record](conf = granularityConf)) - sc.run() + result should containInAnyOrder(typedRecords) + } } 
it should "read at row-group granularity for files with multiple row groups" in { @@ -112,15 +99,15 @@ class ParquetReadFnTest extends PipelineSpec with BeforeAndAfterAll { ParquetReadConfiguration.UseSplittableDoFn -> true ) - val sc = ScioContext() - sc - .typedParquetFile[Record]( - path = s"${testMultiDir.getAbsolutePath}/typed", - conf = granularityConf, - suffix = ".parquet" - ) should containInAnyOrder(typedRecords) - - sc.run() + runWithRealContext() { sc => + val result = sc + .typedParquetFile[Record]( + path = testMultiDir.getAbsolutePath, + conf = granularityConf, + suffix = ".parquet" + ) + result should containInAnyOrder(typedRecords) + } } it should "read at row-group granularity for files with a single row groups" in { @@ -129,191 +116,53 @@ class ParquetReadFnTest extends PipelineSpec with BeforeAndAfterAll { ParquetReadConfiguration.UseSplittableDoFn -> true ) - val sc = ScioContext() - sc - .typedParquetFile[Record]( - path = s"${testSingleDir.getAbsolutePath}/typed", - conf = granularityConf, - suffix = ".parquet" - ) should containInAnyOrder(typedRecords) - - sc.run() + runWithRealContext() { sc => + val result = sc + .typedParquetFile[Record]( + path = testSingleDir.getAbsolutePath, + conf = granularityConf, + suffix = ".parquet" + ) + result should containInAnyOrder(typedRecords) + } } "readTyped" should "work with a predicate" in { - val sc = ScioContext() - sc - .parallelize(listFiles(s"${testSingleDir.getAbsolutePath}/typed")) - .readFiles( - ParquetRead.readTyped[Record]( - FilterApi.eq(FilterApi.binaryColumn("strField"), Binary.fromString("1")) + runWithRealContext() { sc => + val result = sc + .parallelize(listFiles(testSingleDir)) + .readFiles( + ParquetRead.readTypedFiles[Record]( + FilterApi.eq(FilterApi.binaryColumn("strField"), Binary.fromString("1")) + ) ) - ) should containSingleValue(Record("1")) - - sc.run() + result should containSingleValue(Record("1")) + } } - it should "work with a predicate and projection fn" in { - val sc 
= ScioContext() - sc - .parallelize(listFiles(s"${testSingleDir.getAbsolutePath}/typed")) - .readFiles( - ParquetRead.readTyped( - (r: Record) => r.strField, - FilterApi.eq(FilterApi.binaryColumn("strField"), Binary.fromString("1")), - ParquetConfiguration.empty() + it should "work with a predicate and projection" in { + runWithRealContext() { sc => + val result = sc + .parallelize(listFiles(testSingleDir)) + .readFiles( + ParquetRead.readTypedFiles[Record]( + FilterApi.eq(FilterApi.binaryColumn("strField"), Binary.fromString("1")), + ParquetConfiguration.empty() + ) ) - ) should containSingleValue("1") - - sc.run() + result should containSingleValue(Record("1")) + } } it should "be serializable" in { SerializableUtils.ensureSerializable( - ParquetRead.readTyped( - (r: Record) => r.strField, + ParquetRead.readTypedFiles[Record]( FilterApi.eq(FilterApi.binaryColumn("strField"), Binary.fromString("1")), ParquetConfiguration.empty() ) ) } - "readAvroGenericRecordFiles" should "work with a projection but no projectionFn" in { - val projection = Projection[Account](_.getId) - val expectedOut: Seq[GenericRecord] = (251 to 300).map { i => - new GenericRecordBuilder(projection).set("id", i).build() - } - - implicit val coder = avroGenericRecordCoder(projection) - val sc = ScioContext() - sc - .parallelize(listFiles(s"${testSingleDir.getAbsolutePath}/avro")) - .readFiles( - ParquetRead.readAvroGenericRecordFiles( - projection, - predicate = Predicate[Account](_.getId <= 300) - ) - ) should containInAnyOrder(expectedOut) - - sc.run() - } - - it should "work with a projection and projectionFn" in { - val projection = Projection[Account](_.getId) - val sc = ScioContext() - sc - .parallelize(listFiles(s"${testSingleDir.getAbsolutePath}/avro")) - .readFiles( - ParquetRead.readAvroGenericRecordFiles( - projection, - _.get("id").toString.toInt, - predicate = Predicate[Account](_.getId <= 300) - ) - ) should containInAnyOrder(251 to 300) - - sc.run() - } - - it should "work with a 
projection and projectionFn on files with multiple row groups" in { - val projection = Projection[Account](_.getId) - val sc = ScioContext() - sc - .parallelize(listFiles(s"${testMultiDir.getAbsolutePath}/avro")) - .readFiles( - ParquetRead.readAvroGenericRecordFiles( - projection, - _.get("id").toString.toInt, - predicate = Predicate[Account](_.getId <= 300) - ) - ) should containInAnyOrder(251 to 300) - - sc.run() - } - - it should "be serializable" in { - SerializableUtils.ensureSerializable( - ParquetRead.readAvroGenericRecordFiles( - Projection[Account](_.getId), - _.get("int_field").toString.toInt, - predicate = Predicate[Account](_.getId <= 300) - ) - ) - } - - "readAvro" should "work without a projection or a projectionFn" in { - val sc = ScioContext() - sc - .parallelize(listFiles(s"${testSingleDir.getAbsolutePath}/avro")) - .readFiles( - ParquetRead.readAvro[Account]( - predicate = Predicate[Account](_.getId == 300) - ) - ) should containSingleValue(avroRecords.find(_.getId == 300).get) - sc.run() - } - - it should "work with a projection but not a projectionFn as long as excluded fields are nullable" in { - val projection = Projection[Account](_.getId, _.getType, _.getAmount) - - val sc = ScioContext() - val output = sc - .parallelize(listFiles(s"${testSingleDir.getAbsolutePath}/avro")) - .readFiles( - ParquetRead.readAvro[Account]( - projection, - Predicate[Account](_.getId == 300) - ) - ) - - output should haveSize(1) - output should satisfy[Account]( - _.forall(a => - a.getId == 300 && a.getName == null && a.getType == "300" && a.getAmount == 300.0 - ) - ) - sc.run() - } - - it should "work with a projection and a projectionFn" in { - val projection = Projection[Account](_.getId) - val sc = ScioContext() - sc - .parallelize(listFiles(s"${testSingleDir.getAbsolutePath}/avro")) - .readFiles( - ParquetRead.readAvro( - projection, - (a: Account) => a.getId.toInt, - Predicate[Account](_.getId == 300) - ) - ) should containSingleValue(300) - sc.run() - } - 
- it should "work with a projection and a projectionFn on files with multiple row groups" in { - val projection = Projection[Account](_.getId) - val sc = ScioContext() - sc - .parallelize(listFiles(s"${testMultiDir.getAbsolutePath}/avro")) - .readFiles( - ParquetRead.readAvro( - projection, - (tr: Account) => tr.getId.toInt, - Predicate[Account](_.getId == 300) - ) - ) should containSingleValue(300) - sc.run() - } - - it should "be serializable" in { - SerializableUtils.ensureSerializable( - ParquetRead.readAvro( - Projection[Account](_.getId), - (tr: Account) => tr.getId.toInt, - Predicate[Account](_.getId == 300) - ) - ) - } - "ParquetReadConfiguration" should "default to using splittableDoFn only if RunnerV2 experiment is enabled" in { // Default to true if RunnerV2 is set and user hasn't configured SDF explicitly ParquetReadConfiguration.getUseSplittableDoFn( @@ -343,12 +192,4 @@ class ParquetReadFnTest extends PipelineSpec with BeforeAndAfterAll { PipelineOptionsFactory.fromArgs().create() ) shouldBe true } - - private def listFiles(dir: String): Seq[String] = - Files - .list(Paths.get(dir)) - .iterator() - .asScala - .map(_.toFile.toPath.toString) - .toSeq } diff --git a/scio-test/src/test/avro/schema.avsc b/scio-test/src/test/avro/schema.avsc index b3d294e7de..612956ac2b 100644 --- a/scio-test/src/test/avro/schema.avsc +++ b/scio-test/src/test/avro/schema.avsc @@ -1,108 +1,235 @@ -[{ +[ + { "type": "record", "name": "Account", "namespace": "com.spotify.scio.avro", "doc": "Record for an account", "fields": [ - {"name": "id", "type": "int"}, - {"name": "type", "type": "string"}, - {"name": "name", "type": ["null", "string"]}, - {"name": "amount", "type": "double"}, - { - "name": "account_status", - "type": [ - "null", - {"type": "enum", "name": "AccountStatus", "symbols": ["Active", "Inactive"]} - ], - "default": null - } + { + "name": "id", + "type": "int" + }, + { + "name": "type", + "type": "string" + }, + { + "name": "name", + "type": [ + "null", + 
"string" + ] + }, + { + "name": "amount", + "type": "double" + }, + { + "name": "account_status", + "type": [ + "null", + { + "type": "enum", + "name": "AccountStatus", + "symbols": [ + "Active", + "Inactive" + ] + } + ], + "default": null + } ] -}, { + }, + { "type": "record", "name": "User", "namespace": "com.spotify.scio.avro", "doc": "Record for a user", "fields": [ - {"name": "id", "type": "int"}, - {"name": "last_name", "type": "string"}, - {"name": "first_name", "type": "string"}, - {"name": "email", "type": "string"}, - { - "name": "accounts", - "type": { - "type": "array", - "items": "Account" - } - }, - { - "name": "address", - "type": { - "type": "record", - "name": "Address", - "namespace": "com.spotify.scio.avro", - "doc": "Record for an address", - "fields": [ - {"name": "street1", "type": "string"}, - {"name": "street2", "type": "string"}, - {"name": "city", "type": "string"}, - {"name": "state", "type": "string"}, - {"name": "zip", "type": "string"}, - {"name": "country", "type": "string"} - ] + { + "name": "id", + "type": "int" + }, + { + "name": "last_name", + "type": "string" + }, + { + "name": "first_name", + "type": "string" + }, + { + "name": "email", + "type": "string" + }, + { + "name": "accounts", + "type": { + "type": "array", + "items": "Account" + } + }, + { + "name": "address", + "type": { + "type": "record", + "name": "Address", + "namespace": "com.spotify.scio.avro", + "doc": "Record for an address", + "fields": [ + { + "name": "street1", + "type": "string" + }, + { + "name": "street2", + "type": "string" + }, + { + "name": "city", + "type": "string" + }, + { + "name": "state", + "type": "string" + }, + { + "name": "zip", + "type": "string" + }, + { + "name": "country", + "type": "string" } + ] } + } ] -}, { + }, + { "type": "record", "name": "TestRecord", "namespace": "com.spotify.scio.avro", "doc": "Record for testing", "fields": [ - {"name": "int_field", "type": ["null", "int"], "default": null}, - {"name": "long_field", "type": 
["null", "long"], "default": null}, - {"name": "float_field", "type": ["null", "float"], "default": null}, - {"name": "double_field", "type": ["null", "double"], "default": null}, - {"name": "boolean_field", "type": ["null", "boolean"], "default": null}, - {"name": "string_field", "type": ["null", "string"], "default": null}, - {"name": "array_field", "type": {"type": "array", "items": "string"}, "default": null} + { + "name": "int_field", + "type": [ + "null", + "int" + ], + "default": null + }, + { + "name": "long_field", + "type": [ + "null", + "long" + ], + "default": null + }, + { + "name": "float_field", + "type": [ + "null", + "float" + ], + "default": null + }, + { + "name": "double_field", + "type": [ + "null", + "double" + ], + "default": null + }, + { + "name": "boolean_field", + "type": [ + "null", + "boolean" + ], + "default": null + }, + { + "name": "string_field", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "array_field", + "type": { + "type": "array", + "items": "string" + }, + "default": [] + } ] -}, -{ + }, + { "name": "StringFieldTest", "namespace": "com.spotify.scio.avro", "type": "record", "doc": "Record for testing string decoding", "fields": [ - {"name": "strField", "type": ["null", "string"]}, - {"name": "mapField", "type": [ - "null", {"type": "map", "values": "string"} - ], "default": null}, - {"name": "arrayField", "type": [ - "null", {"type": "array", "items": "string"} - ], "default": null} + { + "name": "strField", + "type": [ + "null", + "string" + ] + }, + { + "name": "mapField", + "type": [ + "null", + { + "type": "map", + "values": "string" + } + ], + "default": null + }, + { + "name": "arrayField", + "type": [ + "null", + { + "type": "array", + "items": "string" + } + ], + "default": null + } ] -}, -{ + }, + { "type": "record", "name": "TestLogicalTypes", "namespace": "com.spotify.scio.avro", "doc": "Record for testing logical types", "fields": [ - { - "name": "timestamp", - "type": { - "type": 
"long", "logicalType": "timestamp-millis" - } - }, - { - "name": "decimal", - "type": { - "type": "bytes", - "logicalType": "decimal", - "precision": 4, - "scale": 2 - } + { + "name": "timestamp", + "type": { + "type": "long", + "logicalType": "timestamp-millis" + } + }, + { + "name": "decimal", + "type": { + "type": "bytes", + "logicalType": "decimal", + "precision": 4, + "scale": 2 } + } ] -} + } ] diff --git a/scio-test/src/test/scala/com/spotify/scio/avro/AvroUtils.scala b/scio-test/src/test/scala/com/spotify/scio/avro/AvroUtils.scala index 6405028eed..66694b4e7f 100644 --- a/scio-test/src/test/scala/com/spotify/scio/avro/AvroUtils.scala +++ b/scio-test/src/test/scala/com/spotify/scio/avro/AvroUtils.scala @@ -20,6 +20,7 @@ package com.spotify.scio.avro import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder} import org.apache.avro.{Schema, SchemaBuilder} +import java.util.Collections import scala.jdk.CollectionConverters._ object AvroUtils { @@ -38,7 +39,7 @@ object AvroUtils { .array() .items() .stringType() - .noDefault() + .arrayDefault(Collections.emptyList[CharSequence]()) .endRecord() def newGenericRecord(i: Int): GenericRecord = diff --git a/site/src/main/paradox/io/Parquet.md b/site/src/main/paradox/io/Parquet.md index 1e425a5eda..973c178bcc 100644 --- a/site/src/main/paradox/io/Parquet.md +++ b/site/src/main/paradox/io/Parquet.md @@ -19,11 +19,12 @@ object ParquetJob { val (sc, args) = ContextAndArgs(cmdlineArgs) // Macros for generating column projections and row predicates + // Fields left out from projection must have default value val projection = Projection[TestRecord](_.getIntField, _.getLongField, _.getBooleanField) val predicate = Predicate[TestRecord](x => x.getIntField > 0 && x.getBooleanField) sc.parquetAvroFile[TestRecord]("input.parquet", projection, predicate) - // Map out projected fields right after reading + // Map out projected fields after reading .map(r => (r.getIntField, r.getStringField, r.getBooleanField)) sc.run() @@ 
-32,34 +33,8 @@ object ParquetJob { } ``` -Note that the result `TestRecord`s are not complete Avro objects. Only the projected columns (`intField`, `stringField`, `booleanField`) are present while the rest are null. These objects may fail serialization and it's recommended that you map them out to tuples or case classes right after reading. +Note that `predicate` logic is only applied when reading actual Parquet files but not in `JobTest`. Make sure the mocked input respects the predicate. -Also note that `predicate` logic is only applied when reading actual Parquet files but not in `JobTest`. To retain the filter behavior while using mock input, it's recommend that you do the following. - -```scala -import com.spotify.scio._ -import com.spotify.scio.parquet.avro._ -import com.spotify.scio.avro.TestRecord - -object ParquetJob { - def main(cmdlineArgs: Array[String]): Unit = { - - val (sc, args) = ContextAndArgs(cmdlineArgs) - - val projection = Projection[TestRecord](_.getIntField, _.getLongField, _.getBooleanField) - // Build both native filter function and Parquet FilterPredicate - // case class Predicates[T](native: T => Boolean, parquet: FilterPredicate) - val predicate = Predicate.build[TestRecord](x => x.getIntField > 0 && x.getBooleanField) - - sc.parquetAvroFile[TestRecord]("input.parquet", projection, predicate.parquet) - // filter natively with the same logic in case of mock input in `JobTest` - .filter(predicate.native) - - sc.run() - () - } -} -``` You can also read Avro generic records by specifying a reader schema. @@ -74,8 +49,8 @@ object ParquetJob { val (sc, args) = ContextAndArgs(cmdlineArgs) - sc.parquetAvroFile[GenericRecord]("input.parquet", TestRecord.getClassSchema) - // Map out projected fields into something type safe + sc.parquetAvroGenericRecordFile("input.parquet", TestRecord.getClassSchema) + // Map out fields into something type safe .map(r => (r.get("int_field").asInstanceOf[Int], r.get("string_field").toString)) sc.run()