Fix Spanner write mutation coder serialization#5919
Fix Spanner write mutation coder serialization#5919NA-V10 wants to merge 2 commits intospotify:mainfrom
Conversation
|
Thanks for reporting this issue. I updated the Spanner write path to explicitly use |
|
hey, thanks for the contribution! I think we shouldn't have to set the Coder explicitly here - as long as % sbt scio-examples/console
scala> import com.spotify.scio._
scala> val sc = ScioContext()
scala> val mutations = Seq(com.google.cloud.spanner.Mutation.newInsertBuilder("myTable").set("foo").to("bar").build())
scala> val selectedCoder = (sc.parallelize(mutations)).coder
^
error:
Cannot find an implicit Coder instance for type:
>> com.google.cloud.spanner.Mutation
This can happen for a few reasons, but the most common case is that a data
member somewhere within this type doesn't have an implicit Coder instance in scope.
Here are some hints:
- For collections, ensure that a Coder instance is in scope for the element type.
- For module specific types, you may need to explicitly import the coders, eg avro:
import com.spotify.scio.avro._
- For sealed traits and case classes, you can identify the missing member's coder:
scala> com.spotify.scio.coders.Coder.gen[Foo]
error: magnolia: could not find Coder.Typeclass for type Bar
in parameter 'xxx' of product type Foo
- For generic methods, you may need to add an implicit parameter so that:
def foo[T](coll: SCollection[SomeClass], param: String): SCollection[T]
may become:
def foo[T](coll: SCollection[SomeClass],
param: String)(implicit c: Coder[T]): SCollection[T]
^
Alternatively, you can use a context bound instead of an implicit parameter:
def foo[T: Coder](coll: SCollection[SomeClass], param: String): SCollection[T]
# Add in coders import and verify spannerMutationCoder is selected
scala> import com.spotify.scio.spanner.coders._
scala> val selectedCoder = (sc.parallelize(mutations)).coder
val selectedCoder: com.spotify.scio.coders.Coder[com.google.cloud.spanner.Mutation] = Beam(SerializableCoder(com.google.cloud.spanner.Mutation))Are you importing |
|
That makes sense, thanks! I had I'll remove the explicit coder assignment and rely on |
|
Updated this to remove the explicit mutation coder assignment and rely on the implicit Spanner coder import instead. |
Summary
This change explicitly sets
spannerMutationCoderon theSCollection[Mutation]before applying the Beam Spanner write transform.Previously, the write path relied on Beam/Kryo fallback serialization, which could lead to
UnsupportedOperationExceptionwhen serializingMutation/MutationGroup.Changes Made
spannerMutationCoderusingCoderMaterializer.beamSCollection[Mutation]data.setCoder(coder).applyInternal(transform)Why
A
spannerMutationCoderalready exists inCoderInstances, but it was not being explicitly used inSpannerWrite. This change ensures that Beam uses the intended serializer forMutationobjects during writes.