Skip to content

Commit

Permalink
Provide module specific kryo coder registrar (#4753)
Browse files Browse the repository at this point in the history
* Provide module specific kryo coder registrar

* Add back avro dep in core
  • Loading branch information
RustedBones authored May 25, 2023
1 parent 924282d commit 15dba83
Show file tree
Hide file tree
Showing 10 changed files with 285 additions and 103 deletions.
3 changes: 1 addition & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,6 @@ lazy val `scio-core`: Project = project
"com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion,
"com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion,
"com.google.api-client" % "google-api-client" % googleClientsVersion,
"com.google.apis" % "google-api-services-dataflow" % googleApiServicesDataflowVersion,
"com.google.auto.service" % "auto-service" % autoServiceVersion,
"com.google.guava" % "guava" % guavaVersion,
"com.google.http-client" % "google-http-client" % googleHttpClientsVersion,
Expand Down Expand Up @@ -650,7 +649,7 @@ lazy val `scio-google-cloud-platform`: Project = project
.dependsOn(
`scio-core` % "compile;it->it",
`scio-avro` % "test",
`scio-test` % "test;it"
`scio-test` % "test->test;it"
)
.configs(IntegrationTest)
.settings(commonSettings)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2023 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.spotify.scio.coders

import com.spotify.scio.coders.instances.kryo.{GenericAvroSerializer, SpecificAvroSerializer}
import com.twitter.chill._
import org.apache.avro.generic.GenericRecord
import org.apache.avro.specific.SpecificRecord

@KryoRegistrar
class AvroKryoRegistrar extends IKryoRegistrar {
override def apply(k: Kryo): Unit = {
k.forSubclass[SpecificRecord](new SpecificAvroSerializer)
k.forSubclass[GenericRecord](new GenericAvroSerializer)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,13 @@ import com.esotericsoftware.kryo.KryoException
import com.esotericsoftware.kryo.io.{InputChunked, OutputChunked}
import com.esotericsoftware.kryo.serializers.JavaSerializer
import com.google.protobuf.{ByteString, Message}
import com.spotify.scio.coders.instances.kryo.{GrpcSerializers => grpc, _}
import com.spotify.scio.coders.instances.kryo._
import com.spotify.scio.options.ScioOptions
import com.twitter.chill._
import com.twitter.chill.algebird.AlgebirdRegistrar
import com.twitter.chill.protobuf.ProtobufSerializer
import org.apache.avro.generic.GenericRecord
import org.apache.avro.specific.SpecificRecord
import org.apache.beam.sdk.coders.Coder.NonDeterministicException
import org.apache.beam.sdk.coders.{AtomicCoder, CoderException => BCoderException, InstantCoder}
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder
import org.apache.beam.sdk.options.{PipelineOptions, PipelineOptionsFactory}
import org.apache.beam.sdk.util.VarInt
import org.apache.beam.sdk.util.common.ElementByteSizeObserver
Expand Down Expand Up @@ -98,7 +95,6 @@ final private class ScioKryoRegistrar extends IKryoRegistrar {
override def apply(k: Kryo): Unit = {
logger.debug("Loading common Kryo serializers...")
k.forClass(new CoderSerializer(InstantCoder.of()))
k.forClass(new CoderSerializer(TableRowJsonCoder.of()))
// Java Iterable/Collection are missing proper equality check, use custom CBF as a
// workaround
k.register(
Expand All @@ -116,8 +112,6 @@ final private class ScioKryoRegistrar extends IKryoRegistrar {
new JTraversableSerializer[Any, mutable.Buffer[Any]]
)

k.forSubclass[SpecificRecord](new SpecificAvroSerializer)
k.forSubclass[GenericRecord](new GenericAvroSerializer)
k.forSubclass[Message](new ProtobufSerializer)
k.forClass[LocalDate](new JodaLocalDateSerializer)
k.forClass[LocalTime](new JodaLocalTimeSerializer)
Expand All @@ -126,8 +120,8 @@ final private class ScioKryoRegistrar extends IKryoRegistrar {
k.forSubclass[Path](new JPathSerializer)
k.forSubclass[ByteString](new ByteStringSerializer)
k.forClass(new KVSerializer)
k.forClass[io.grpc.Status](new grpc.StatusSerializer)
k.forSubclass[io.grpc.StatusRuntimeException](new grpc.StatusRuntimeExceptionSerializer)
k.forClass[io.grpc.Status](new StatusSerializer)
k.forSubclass[io.grpc.StatusRuntimeException](new StatusRuntimeExceptionSerializer)
k.addDefaultSerializer(classOf[Throwable], new JavaSerializer)
()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2020 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.scio.coders.instances.kryo

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import com.twitter.chill.KSerializer
import io.grpc.{Metadata, Status, StatusRuntimeException}

private[coders] class StatusSerializer extends KSerializer[Status] {
override def write(kryo: Kryo, output: Output, status: Status): Unit = {
output.writeInt(status.getCode().value())
output.writeString(status.getDescription)
kryo.writeClassAndObject(output, status.getCause)
}

override def read(kryo: Kryo, input: Input, `type`: Class[Status]): Status = {
val code = input.readInt()
val description = input.readString()
val cause = kryo.readClassAndObject(input).asInstanceOf[Throwable]

Status
.fromCodeValue(code)
.withDescription(description)
.withCause(cause)
}
}

private[coders] class StatusRuntimeExceptionSerializer extends KSerializer[StatusRuntimeException] {
lazy val statusSer = new StatusSerializer()

override def write(kryo: Kryo, output: Output, e: StatusRuntimeException): Unit = {
kryo.writeObject(output, e.getStatus, statusSer)
kryo.writeObjectOrNull(output, e.getTrailers, classOf[Metadata])
}

override def read(
kryo: Kryo,
input: Input,
`type`: Class[StatusRuntimeException]
): StatusRuntimeException = {
val status = kryo.readObject(input, classOf[Status], statusSer)
val trailers = kryo.readObjectOrNull(input, classOf[Metadata])

new StatusRuntimeException(status, trailers)
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2023 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.spotify.scio.coders

import com.google.cloud.bigtable.grpc.scanner.BigtableRetriesExhaustedException
import com.spotify.scio.bigquery.TableRow
import com.spotify.scio.coders.instances.kryo.CoderSerializer
import com.spotify.scio.coders.instances.kryo.BigtableRetriesExhaustedExceptionSerializer
import com.twitter.chill._
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder

@KryoRegistrar
class GcpKryoRegistrar extends IKryoRegistrar {
override def apply(k: Kryo): Unit = {
k.forClass[TableRow](new CoderSerializer(TableRowJsonCoder.of()))
k.forClass[BigtableRetriesExhaustedException](new BigtableRetriesExhaustedExceptionSerializer)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2023 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.spotify.scio.coders.instances.kryo

import com.esotericsoftware.kryo.serializers.DefaultSerializers.StringSerializer
import com.google.cloud.bigtable.grpc.scanner.BigtableRetriesExhaustedException
import com.twitter.chill._

private[coders] class BigtableRetriesExhaustedExceptionSerializer
extends KSerializer[BigtableRetriesExhaustedException] {

private lazy val stringSerializer = new StringSerializer()
private lazy val statusExceptionSerializer = new StatusRuntimeExceptionSerializer()

override def write(kryo: Kryo, output: Output, e: BigtableRetriesExhaustedException): Unit = {
kryo.writeObject(output, e.getMessage, stringSerializer)
kryo.writeObject(output, e.getCause, statusExceptionSerializer)
}

override def read(
kryo: Kryo,
input: Input,
`type`: Class[BigtableRetriesExhaustedException]
): BigtableRetriesExhaustedException = {
val message = kryo.readObject(input, classOf[String], stringSerializer)
val cause = kryo.readObject(input, classOf[Throwable], statusExceptionSerializer)
new BigtableRetriesExhaustedException(message, cause)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2023 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.spotify.scio.coders.instance.kryo

import com.google.cloud.bigtable.grpc.scanner.BigtableRetriesExhaustedException
import com.spotify.scio.coders.instances.kryo.GrpcSerializerTest.eqStatusRuntimeException
import com.spotify.scio.coders.{Coder, CoderMaterializer}
import io.grpc.{Metadata, Status, StatusRuntimeException}
import org.apache.beam.sdk.util.CoderUtils
import org.scalactic.Equality
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

object GcpSerializerTest {

implicit val eqBigtableRetriesExhaustedException: Equality[BigtableRetriesExhaustedException] = {
case (a: BigtableRetriesExhaustedException, b: BigtableRetriesExhaustedException) =>
a.getMessage == b.getMessage &&
((Option(a.getCause), Option(b.getCause)) match {
case (None, None) => true
case (Some(ac: StatusRuntimeException), Some(bc: StatusRuntimeException)) =>
eqStatusRuntimeException.areEqual(ac, bc)
case _ =>
false
})
case _ => false
}

}

class GcpSerializerTest extends AnyFlatSpec with Matchers {

import GcpSerializerTest._

"GcpSerializer" should "roundtrip" in {
val metadata = new Metadata()
metadata.put(Metadata.Key.of[String]("k", Metadata.ASCII_STRING_MARSHALLER), "v")
val cause = new StatusRuntimeException(
Status.OK.withCause(new RuntimeException("bar")).withDescription("bar"),
metadata
)
roundtrip(new BigtableRetriesExhaustedException("Error", cause))
}

private def roundtrip(t: BigtableRetriesExhaustedException): Unit = {
val kryoBCoder = CoderMaterializer.beamWithDefault(Coder[BigtableRetriesExhaustedException])

val bytes = CoderUtils.encodeToByteArray(kryoBCoder, t)
val copy = CoderUtils.decodeFromByteArray(kryoBCoder, bytes)

t shouldEqual copy
}

}
Loading

0 comments on commit 15dba83

Please sign in to comment.