Skip to content

Commit

Permalink
Require import for kryo implicit fallback coder (#5199)
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones authored Jan 24, 2024
1 parent 9f4603e commit 91d4d80
Show file tree
Hide file tree
Showing 24 changed files with 320 additions and 347 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package com.spotify.scio.coders.avro

import com.spotify.scio.avro.{GenericRecordDatumFactory, SpecificRecordDatumFactory}
import com.spotify.scio.coders.Coder
import com.spotify.scio.coders.{Coder, CoderGrammar}
import com.spotify.scio.util.ScioUtil
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
Expand Down Expand Up @@ -113,7 +113,7 @@ private object SpecificFixedCoder {
}
}

trait AvroCoders {
trait AvroCoders extends CoderGrammar {

/**
* Create a Coder for Avro GenericRecord given the schema of the GenericRecord.
Expand All @@ -128,7 +128,7 @@ trait AvroCoders {

// XXX: similar to GenericAvroSerializer
def avroGenericRecordCoder: Coder[GenericRecord] =
Coder.beam(new SlowGenericRecordCoder)
beam(new SlowGenericRecordCoder)

implicit def avroSpecificRecordCoder[T <: SpecificRecord: ClassTag]: Coder[T] = {
val recordClass = ScioUtil.classOf[T]
Expand All @@ -138,8 +138,10 @@ trait AvroCoders {
}

def avroCoder[T <: IndexedRecord](factory: AvroDatumFactory[T], schema: Schema): Coder[T] =
Coder.beam(AvroCoder.of(factory, schema))
beam(AvroCoder.of(factory, schema))

implicit def avroSpecificFixedCoder[T <: SpecificFixed: ClassTag]: Coder[T] =
SpecificFixedCoder[T]
}

/** Singleton instance of the [[AvroCoders]] trait; access is restricted to the `coders` package. */
private[coders] object AvroCoders extends AvroCoders
98 changes: 13 additions & 85 deletions scio-core/src/main/scala/com/spotify/scio/coders/Coder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,14 @@

package com.spotify.scio.coders

import com.spotify.scio.IsJavaBean
import com.spotify.scio.{IsJavaBean, MagnoliaMacros}
import com.spotify.scio.coders.instances._
import com.spotify.scio.transforms.BaseAsyncLookupDoFn
import org.apache.beam.sdk.coders.{Coder => BCoder}
import org.apache.beam.sdk.values.KV
import org.typelevel.scalaccompat.annotation.unused

import java.util.{List => JList}
import scala.annotation.implicitNotFound
import scala.collection.compat._
import scala.collection.{mutable => m, BitSet, SortedSet}
import scala.reflect.ClassTag
import scala.util.Try
import java.util.UUID

@implicitNotFound(
"""
Expand Down Expand Up @@ -151,7 +146,7 @@ final case class AggregateCoder[T] private (coder: Coder[T]) extends Coder[java.
*
* - To explicitly use kryo Coder use [[Coder.kryo]]
*/
sealed trait CoderGrammar {
private[coders] trait CoderGrammar {

/** Create a ScioCoder from a Beam Coder */
def raw[T](beam: BCoder[T]): Coder[T] =
Expand Down Expand Up @@ -215,93 +210,26 @@ sealed trait CoderGrammar {
}

object Coder
extends CoderGrammar
extends ScalaCoders
with TupleCoders
with JavaCoders
with JodaCoders
with BeamTypeCoders
with ProtobufCoders
with AlgebirdCoders
with GuavaCoders
with JodaCoders
with BeamTypeCoders
with CoderDerivation
with LowPriorityCoders {
@inline final def apply[T](implicit c: Coder[T]): Coder[T] = c

implicit val charCoder: Coder[Char] = ScalaCoders.charCoder
implicit val byteCoder: Coder[Byte] = ScalaCoders.byteCoder
implicit val stringCoder: Coder[String] = ScalaCoders.stringCoder
implicit val shortCoder: Coder[Short] = ScalaCoders.shortCoder
implicit val intCoder: Coder[Int] = ScalaCoders.intCoder
implicit val longCoder: Coder[Long] = ScalaCoders.longCoder
implicit val floatCoder: Coder[Float] = ScalaCoders.floatCoder
implicit val doubleCoder: Coder[Double] = ScalaCoders.doubleCoder
implicit val booleanCoder: Coder[Boolean] = ScalaCoders.booleanCoder
implicit val unitCoder: Coder[Unit] = ScalaCoders.unitCoder
implicit val nothingCoder: Coder[Nothing] = ScalaCoders.nothingCoder
implicit val bigIntCoder: Coder[BigInt] = ScalaCoders.bigIntCoder
implicit val bigDecimalCoder: Coder[BigDecimal] = ScalaCoders.bigDecimalCoder
implicit def tryCoder[A: Coder]: Coder[Try[A]] = ScalaCoders.tryCoder
implicit def eitherCoder[A: Coder, B: Coder]: Coder[Either[A, B]] = ScalaCoders.eitherCoder
implicit def optionCoder[T: Coder, S[_] <: Option[_]]: Coder[S[T]] = ScalaCoders.optionCoder
implicit val noneCoder: Coder[None.type] = ScalaCoders.noneCoder
implicit val bitSetCoder: Coder[BitSet] = ScalaCoders.bitSetCoder
implicit def seqCoder[T: Coder]: Coder[Seq[T]] = ScalaCoders.seqCoder
implicit def iterableCoder[T: Coder]: Coder[Iterable[T]] = ScalaCoders.iterableCoder
implicit def throwableCoder[T <: Throwable: ClassTag]: Coder[T] = ScalaCoders.throwableCoder
implicit def listCoder[T: Coder]: Coder[List[T]] = ScalaCoders.listCoder
implicit def iterableOnceCoder[T: Coder]: Coder[IterableOnce[T]] =
ScalaCoders.iterableOnceCoder
implicit def setCoder[T: Coder]: Coder[Set[T]] = ScalaCoders.setCoder
implicit def mutableSetCoder[T: Coder]: Coder[m.Set[T]] = ScalaCoders.mutableSetCoder
implicit def vectorCoder[T: Coder]: Coder[Vector[T]] = ScalaCoders.vectorCoder
implicit def arrayBufferCoder[T: Coder]: Coder[m.ArrayBuffer[T]] = ScalaCoders.arrayBufferCoder
implicit def bufferCoder[T: Coder]: Coder[m.Buffer[T]] = ScalaCoders.bufferCoder
implicit def listBufferCoder[T: Coder]: Coder[m.ListBuffer[T]] = ScalaCoders.listBufferCoder
implicit def arrayCoder[T: Coder: ClassTag]: Coder[Array[T]] = ScalaCoders.arrayCoder
implicit val arrayByteCoder: Coder[Array[Byte]] = ScalaCoders.arrayByteCoder
implicit def wrappedArrayCoder[T: Coder: ClassTag](implicit
wrap: Array[T] => m.WrappedArray[T]
): Coder[m.WrappedArray[T]] = ScalaCoders.wrappedArrayCoder
implicit def mutableMapCoder[K: Coder, V: Coder]: Coder[m.Map[K, V]] = ScalaCoders.mutableMapCoder
implicit def mapCoder[K: Coder, V: Coder]: Coder[Map[K, V]] = ScalaCoders.mapCoder
implicit def sortedSetCoder[T: Coder: Ordering]: Coder[SortedSet[T]] = ScalaCoders.sortedSetCoder

implicit val voidCoder: Coder[Void] = JavaCoders.voidCoder
implicit val uuidCoder: Coder[UUID] = JavaCoders.uuidCoder
implicit val uriCoder: Coder[java.net.URI] = JavaCoders.uriCoder
implicit val pathCoder: Coder[java.nio.file.Path] = JavaCoders.pathCoder
implicit def jIterableCoder[T: Coder]: Coder[java.lang.Iterable[T]] = JavaCoders.jIterableCoder
implicit def jListCoder[T: Coder]: Coder[JList[T]] = JavaCoders.jListCoder
implicit def jArrayListCoder[T: Coder]: Coder[java.util.ArrayList[T]] = JavaCoders.jArrayListCoder
implicit def jMapCoder[K: Coder, V: Coder]: Coder[java.util.Map[K, V]] = JavaCoders.jMapCoder
implicit def jTryCoder[A](implicit c: Coder[Try[A]]): Coder[BaseAsyncLookupDoFn.Try[A]] =
JavaCoders.jTryCoder
implicit val jBitSetCoder: Coder[java.util.BitSet] = JavaCoders.jBitSetCoder
implicit val jShortCoder: Coder[java.lang.Short] = JavaCoders.jShortCoder
implicit val jByteCoder: Coder[java.lang.Byte] = JavaCoders.jByteCoder
implicit val jIntegerCoder: Coder[java.lang.Integer] = JavaCoders.jIntegerCoder
implicit val jLongCoder: Coder[java.lang.Long] = JavaCoders.jLongCoder
implicit val jFloatCoder: Coder[java.lang.Float] = JavaCoders.jFloatCoder
implicit val jDoubleCoder: Coder[java.lang.Double] = JavaCoders.jDoubleCoder
implicit val jBooleanCoder: Coder[java.lang.Boolean] = JavaCoders.jBooleanCoder
implicit val jBigIntegerCoder: Coder[java.math.BigInteger] = JavaCoders.jBigIntegerCoder
implicit val jBigDecimalCoder: Coder[java.math.BigDecimal] = JavaCoders.jBigDecimalCoder
implicit val serializableCoder: Coder[Serializable] = Coder.kryo[Serializable]
implicit val jInstantCoder: Coder[java.time.Instant] = JavaCoders.jInstantCoder
implicit val jLocalDateCoder: Coder[java.time.LocalDate] = JavaCoders.jLocalDateCoder
implicit val jLocalTimeCoder: Coder[java.time.LocalTime] = JavaCoders.jLocalTimeCoder
implicit val jLocalDateTimeCoder: Coder[java.time.LocalDateTime] = JavaCoders.jLocalDateTimeCoder
implicit val jDurationCoder: Coder[java.time.Duration] = JavaCoders.jDurationCoder
implicit val jPeriodCoder: Coder[java.time.Period] = JavaCoders.jPeriodCoder
implicit val jSqlTimestamp: Coder[java.sql.Timestamp] = JavaCoders.jSqlTimestamp
implicit val jSqDate: Coder[java.sql.Date] = JavaCoders.jSqlDate
implicit val jSqlTime: Coder[java.sql.Time] = JavaCoders.jSqlTime
implicit def coderJEnum[E <: java.lang.Enum[E]: ClassTag]: Coder[E] = JavaCoders.coderJEnum

def fallback[T](implicit lp: shapeless.LowPriority): Coder[T] =
@deprecated("Use Coder.kryo[T] instead", "0.14.0")
def fallback[T](@unused lp: shapeless.LowPriority): Coder[T] =
macro CoderMacros.issueFallbackWarning[T]
}

trait LowPriorityCoders extends LowPriorityCoderDerivation {
implicit def javaBeanCoder[T: IsJavaBean: ClassTag]: Coder[T] = JavaCoders.javaBeanCoder
trait LowPriorityCoders { self: CoderDerivation with JavaBeanCoders =>
implicit override def javaBeanCoder[T: IsJavaBean: ClassTag]: Coder[T] = JavaCoders.javaBeanCoder
implicit override def gen[T]: Coder[T] = macro MagnoliaMacros.genWithoutAnnotations[T]
}

private[coders] object CoderStackTrace {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@

package com.spotify.scio.coders

import com.spotify.scio.MagnoliaMacros
import com.twitter.chill.ClosureCleaner
import magnolia1._

import scala.reflect.ClassTag

object LowPriorityCoderDerivation {
private object CoderDerivation {

private object ProductIndexedSeqLike {
def apply(p: Product): ProductIndexedSeqLike = new ProductIndexedSeqLike(p)
Expand Down Expand Up @@ -101,9 +102,9 @@ object LowPriorityCoderDerivation {

}

trait LowPriorityCoderDerivation {
trait CoderDerivation extends CoderGrammar {

import LowPriorityCoderDerivation._
import CoderDerivation._

type Typeclass[T] = Coder[T]

Expand All @@ -113,21 +114,21 @@ trait LowPriorityCoderDerivation {

if (ctx.isValueClass) {
val p = ctx.parameters.head
Coder.xmap(p.typeclass.asInstanceOf[Coder[Any]])(
xmap(p.typeclass.asInstanceOf[Coder[Any]])(
closureFunction(constructor)(c => v => c.rawConstruct(Seq(v))),
p.dereference
)
} else if (ctx.isObject) {
Coder.singleton(typeName, closureSupplier(constructor)(_.rawConstruct(Seq.empty)))
singleton(typeName, closureSupplier(constructor)(_.rawConstruct(Seq.empty)))
} else {
Coder.ref(typeName) {
ref(typeName) {
val cs = Array.ofDim[(String, Coder[Any])](ctx.parameters.length)

ctx.parameters.foreach { p =>
cs.update(p.index, p.label -> p.typeclass.asInstanceOf[Coder[Any]])
}

Coder.record[T](typeName, cs)(
record[T](typeName, cs)(
closureFunction(constructor)(_.rawConstruct),
v => ProductIndexedSeqLike(v.asInstanceOf[Product])
)
Expand All @@ -144,24 +145,21 @@ trait LowPriorityCoderDerivation {
if (sealedTrait.subtypes.length <= 2) {
val booleanId: Int => Boolean = _ != 0
val cs = coders.map { case (key, v) => (booleanId(key), v) }
Coder.disjunction[T, Boolean](typeName, cs)(
disjunction[T, Boolean](typeName, cs)(
closureFunction(identifier)(_.id).andThen(booleanId)
)
} else {
Coder.disjunction[T, Int](typeName, coders)(closureFunction(identifier)(_.id))
disjunction[T, Int](typeName, coders)(closureFunction(identifier)(_.id))
}
}

/**
* Derive a Coder for a type T, provided implicit coders for all parameters in the constructor of
* type T are in scope. For a sealed trait, implicit coders for the parameters of the constructors
* of all sub-types should be in scope.
*
* In case of a missing [[shapeless.LowPriority]] implicit error when calling this method as
* [[Coder.gen[Type]] ], it means that Scio is unable to derive a BeamCoder for some parameter [P]
* in the constructor of Type. This happens when no implicit Coder instance for type P is in
* scope. This is fixed by placing an implicit Coder of type P in scope, using [[Coder.kryo[P]] ]
* or defining the Coder manually (see also [[Coder.xmap]])
*/
implicit def gen[T]: Coder[T] = macro CoderMacros.wrappedCoder[T]
def gen[T]: Coder[T] = macro MagnoliaMacros.genWithoutAnnotations[T]
}

/**
 * Empty trait that only extends [[CoderDerivation]] and declares no members of its own.
 * Deprecated since 0.14.0; use [[CoderDerivation]] directly.
 */
@deprecated("Use CoderDerivation instead", "0.14.0")
trait LowPriorityCoderDerivation extends CoderDerivation
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,27 @@

package com.spotify.scio.coders.instances

import com.spotify.scio.coders.Coder
import com.spotify.scio.coders.{Coder, CoderGrammar}
import com.twitter.algebird.{BF, Batched, CMS, Moments, TopCMS, TopK}

trait AlgebirdCoders {
implicit def cmsCoder[K]: Coder[CMS[K]] = Coder.kryo
implicit def topCmsCoder[K]: Coder[TopCMS[K]] = Coder.kryo
implicit def bfCoder[K]: Coder[BF[K]] = Coder.kryo
implicit def topKCoder[K]: Coder[TopK[K]] = Coder.kryo
implicit def batchedCoder[U]: Coder[Batched[U]] = Coder.kryo
implicit def momentsCoder[U]: Coder[Moments] =
Coder.xmap(Coder[(Double, Double, Double, Double, Double)])(
trait AlgebirdCoders extends CoderGrammar {
private lazy val kryoCmsCoder: Coder[CMS[_]] = kryo
private lazy val kryoTopCmsCoder: Coder[TopCMS[_]] = kryo
private lazy val kryoBfCoder: Coder[BF[_]] = kryo
private lazy val kryoTopKCoder: Coder[TopK[_]] = kryo
private lazy val kryoBatchedCoder: Coder[Batched[_]] = kryo

implicit def cmsCoder[T]: Coder[CMS[T]] = kryoCmsCoder.asInstanceOf[Coder[CMS[T]]]
implicit def topCmsCoder[T]: Coder[TopCMS[T]] = kryoTopCmsCoder.asInstanceOf[Coder[TopCMS[T]]]
implicit def bfCoder[T]: Coder[BF[T]] = kryoBfCoder.asInstanceOf[Coder[BF[T]]]
implicit def topKCoder[T]: Coder[TopK[T]] = kryoTopKCoder.asInstanceOf[Coder[TopK[T]]]
implicit def batchedCoder[T]: Coder[Batched[T]] = kryoBatchedCoder.asInstanceOf[Coder[Batched[T]]]

implicit lazy val momentsCoder: Coder[Moments] =
xmap(Coder[(Double, Double, Double, Double, Double)])(
{ case (m0D, m1, m2, m3, m4) => new Moments(m0D, m1, m2, m3, m4) },
m => (m.m0D, m.m1, m.m2, m.m3, m.m4)
)
}

/** Singleton instance of the [[AlgebirdCoders]] trait; access is restricted to the `coders` package. */
private[coders] object AlgebirdCoders extends AlgebirdCoders
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package com.spotify.scio.coders.instances
import com.google.api.client.json.GenericJson
import com.google.api.client.json.JsonObjectParser
import com.google.api.client.json.gson.GsonFactory
import com.spotify.scio.coders.Coder
import com.spotify.scio.coders.{Coder, CoderGrammar}
import com.spotify.scio.util.ScioUtil

import java.io.StringReader
Expand All @@ -39,28 +39,28 @@ import org.apache.beam.sdk.values.{KV, Row}

import scala.reflect.ClassTag

trait BeamTypeCoders {
trait BeamTypeCoders extends CoderGrammar {
import BeamTypeCoders._

implicit def intervalWindowCoder: Coder[IntervalWindow] = Coder.beam(IntervalWindow.getCoder)
implicit lazy val intervalWindowCoder: Coder[IntervalWindow] = beam(IntervalWindow.getCoder)

implicit def globalWindowCoder: Coder[GlobalWindow] = Coder.beam(GlobalWindow.Coder.INSTANCE)
implicit lazy val globalWindowCoder: Coder[GlobalWindow] = beam(GlobalWindow.Coder.INSTANCE)

implicit def boundedWindowCoder: Coder[BoundedWindow] = Coder.kryo[BoundedWindow]
implicit lazy val boundedWindowCoder: Coder[BoundedWindow] = kryo[BoundedWindow]

implicit def paneInfoCoder: Coder[PaneInfo] = Coder.beam(PaneInfo.PaneInfoCoder.of())
implicit lazy val paneInfoCoder: Coder[PaneInfo] = beam(PaneInfo.PaneInfoCoder.of())

def row(schema: BSchema): Coder[Row] = Coder.beam(RowCoder.of(schema))
def row(schema: BSchema): Coder[Row] = beam(RowCoder.of(schema))

implicit def beamKVCoder[K: Coder, V: Coder]: Coder[KV[K, V]] = Coder.kv(Coder[K], Coder[V])
implicit def beamKVCoder[K: Coder, V: Coder]: Coder[KV[K, V]] = kv(Coder[K], Coder[V])

implicit def readableFileCoder: Coder[ReadableFile] = Coder.beam(ReadableFileCoder.of())
implicit lazy val readableFileCoder: Coder[ReadableFile] = beam(ReadableFileCoder.of())

implicit def matchResultMetadataCoder: Coder[MatchResult.Metadata] =
Coder.beam(MetadataCoderV2.of())
implicit lazy val matchResultMetadataCoder: Coder[MatchResult.Metadata] =
beam(MetadataCoderV2.of())

implicit def genericJsonCoder[T <: GenericJson: ClassTag]: Coder[T] =
Coder.xmap(Coder[String])(
xmap(Coder[String])(
str => DefaultJsonObjectParser.parseAndClose(new StringReader(str), ScioUtil.classOf[T]),
DefaultJsonObjectParser.getJsonFactory().toString(_)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,22 @@
package com.spotify.scio.coders.instances

import java.io.{InputStream, OutputStream}
import com.google.common.hash.BloomFilter
import com.spotify.scio.coders.Coder
import com.google.common.{hash => g}
import com.google.common.hash.{BloomFilter, Funnel}
import com.spotify.scio.coders.{Coder, CoderGrammar}
import org.apache.beam.sdk.coders.CustomCoder

class GuavaBloomFilterCoder[T](implicit val funnel: g.Funnel[_ >: T])
extends CustomCoder[g.BloomFilter[T]] {
class GuavaBloomFilterCoder[T](implicit val funnel: Funnel[_ >: T])
extends CustomCoder[BloomFilter[T]] {
override def encode(value: BloomFilter[T], outStream: OutputStream): Unit =
value.writeTo(outStream)
override def decode(inStream: InputStream): BloomFilter[T] =
BloomFilter.readFrom[T](inStream, funnel)
override def verifyDeterministic(): Unit = {}
}

trait GuavaCoders {
implicit def guavaBFCoder[T](implicit x: g.Funnel[_ >: T]): Coder[g.BloomFilter[T]] =
Coder.beam(new GuavaBloomFilterCoder[T])
trait GuavaCoders extends CoderGrammar {
implicit def guavaBFCoder[T](implicit x: Funnel[_ >: T]): Coder[BloomFilter[T]] =
beam(new GuavaBloomFilterCoder[T])
}

/** Singleton instance of the [[GuavaCoders]] trait; access is restricted to the `coders` package. */
private[coders] object GuavaCoders extends GuavaCoders
Loading

0 comments on commit 91d4d80

Please sign in to comment.