Skip to content

Commit

Permalink
Merge branch 'main' into relax-transform-coder
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones authored Apr 18, 2023
2 parents 0b0546d + 01ca7c8 commit c65c6ad
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 159 deletions.
20 changes: 2 additions & 18 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -188,25 +188,9 @@ def previousVersion(currentVersion: String): Option[String] = {

lazy val mimaSettings = Def.settings(
mimaBinaryIssueFilters := Seq(
// minor scio-tensorflow breaking changes for 0.12.6
// minor scio-core breaking changes for 0.12.8
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.spotify.scio.tensorflow.syntax.SeqExampleSCollectionOps.saveAsTfRecordFile"
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.spotify.scio.tensorflow.syntax.SeqExampleSCollectionOps.saveAsTfRecordFile$extension"
),
// minor scio-grpc breaking changes for 0.12.6
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.spotify.scio.grpc.GrpcSCollectionOps.grpcLookup"
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.spotify.scio.grpc.GrpcSCollectionOps.grpcLookup$extension"
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.spotify.scio.grpc.GrpcSCollectionOps.grpcLookupStream"
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.spotify.scio.grpc.GrpcSCollectionOps.grpcLookupStream$extension"
"com.spotify.scio.values.SCollectionWithSideOutput.this"
)
),
mimaPreviousArtifacts :=
Expand Down
2 changes: 2 additions & 0 deletions scio-core/src/main/scala/com/spotify/scio/coders/Coder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,8 @@ object Coder
implicit val jDurationCoder: Coder[java.time.Duration] = JavaCoders.jDurationCoder
implicit val jPeriodCoder: Coder[java.time.Period] = JavaCoders.jPeriodCoder
implicit val jSqlTimestamp: Coder[java.sql.Timestamp] = JavaCoders.jSqlTimestamp
implicit val jSqDate: Coder[java.sql.Date] = JavaCoders.jSqlDate
implicit val jSqlTime: Coder[java.sql.Time] = JavaCoders.jSqlTime
implicit def coderJEnum[E <: java.lang.Enum[E]: ClassTag]: Coder[E] = JavaCoders.coderJEnum

def fallback[T](implicit lp: shapeless.LowPriority): Coder[T] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,12 @@ trait JavaCoders extends JavaBeanCoders {
implicit def jSqlTimestamp: Coder[java.sql.Timestamp] =
Coder.xmap(jInstantCoder)(java.sql.Timestamp.from, _.toInstant())

implicit def jSqlDate: Coder[java.sql.Date] =
Coder.xmap(jLocalDateCoder)(java.sql.Date.valueOf, _.toLocalDate())

implicit def jSqlTime: Coder[java.sql.Time] =
Coder.xmap(jLocalTimeCoder)(java.sql.Time.valueOf, _.toLocalTime())

implicit def coderJEnum[E <: java.lang.Enum[E]: ClassTag]: Coder[E] =
Coder.xmap(Coder[String])(
value => java.lang.Enum.valueOf(ScioUtil.classOf[E], value),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,7 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] {
* @group side
*/
def withSideOutputs(sides: SideOutput[_]*): SCollectionWithSideOutput[T] =
new SCollectionWithSideOutput[T](internal, context, sides)
new SCollectionWithSideOutput[T](this, sides)

// =======================================================================
// Windowing operations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,27 @@ import scala.jdk.CollectionConverters._
* s of the [[SideOutput]] s are accessed via the additional [[SideOutputCollections]] return value.
*/
class SCollectionWithSideOutput[T] private[values] (
val internal: PCollection[T],
val context: ScioContext,
coll: SCollection[T],
sides: Iterable[SideOutput[_]]
) extends PCollectionWrapper[T] {

override val internal: PCollection[T] = coll.internal
override val context: ScioContext = coll.context

private val sideTags = TupleTagList.of(sides.map(_.tupleTag).toList.asJava)

override def withName(name: String): this.type = {
coll.withName(name)
this
}

private def apply[U: Coder](f: DoFn[T, U]): (SCollection[U], SideOutputCollections) = {
val mainTag = new TupleTag[U]

val dofn = ParDo.of(f).withOutputTags(mainTag, sideTags)
val tuple = this.applyInternal(dofn)
val tuple = this.applyInternal(coll.tfName, dofn)

val main =
tuple.get(mainTag).setCoder(CoderMaterializer.beam(context, Coder[U]))
val main = tuple.get(mainTag).setCoder(CoderMaterializer.beam(context, Coder[U]))

sides.foreach { s =>
tuple
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ trait TransformNameable {
nameProvider.getClass != classOf[ConstNameProvider],
s"withName() has already been used to set '$tfName' as the name for the next transform."
)
nameProvider = new ConstNameProvider(name)
nameProvider = ConstNameProvider(name)
this
}
}
Expand Down
33 changes: 24 additions & 9 deletions scio-test/src/test/scala/com/spotify/scio/coders/CoderTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ import org.scalatest.matchers.should.Matchers

import java.io.{ByteArrayInputStream, ObjectOutputStream, ObjectStreamClass}
import java.nio.charset.Charset
import java.time.Instant
import java.time.format.DateTimeFormatter
import java.time._
import java.util.UUID
import scala.collection.{mutable => mut}
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -301,7 +302,7 @@ final class CoderTest extends AnyFlatSpec with Matchers {
)

t22.take(2) coderShould roundtrip()
t22.take(3) coderShould roundtrip()
t22.take(3) coderShould roundtrip
t22.take(4) coderShould roundtrip()
t22.take(5) coderShould roundtrip()
t22.take(6) coderShould roundtrip()
Expand Down Expand Up @@ -426,14 +427,28 @@ final class CoderTest extends AnyFlatSpec with Matchers {
new BigInteger("123456789") coderShould notFallback()
new jBigDecimal("123456789.98765") coderShould notFallback()

// java time
Instant.now() coderShould notFallback()
LocalTime.now() coderShould notFallback()
LocalDate.now() coderShould notFallback()
LocalTime.now() coderShould notFallback()
LocalDateTime.now() coderShould notFallback()
Duration.ofSeconds(123) coderShould notFallback()
Period.ofDays(123) coderShould notFallback()

// java sql
java.sql.Timestamp.valueOf("1971-02-03 04:05:06.789") coderShould notFallback()
java.sql.Date.valueOf("1971-02-03") coderShould notFallback()
java.sql.Time.valueOf("01:02:03") coderShould notFallback()

// joda time
new org.joda.time.Instant() coderShould notFallback()
new org.joda.time.DateTime() coderShould notFallback()
new org.joda.time.LocalDate() coderShould notFallback()
new org.joda.time.LocalTime() coderShould notFallback()
new org.joda.time.LocalDateTime() coderShould notFallback()
new org.joda.time.Duration(123) coderShould notFallback()
val now = org.joda.time.Instant.now()
now coderShould notFallback()
new org.joda.time.LocalDate coderShould notFallback()
new org.joda.time.LocalTime coderShould notFallback()
new org.joda.time.LocalDateTime coderShould notFallback()
new org.joda.time.DateTime coderShould notFallback()
new java.sql.Timestamp(1) coderShould notFallback()

new IntervalWindow(now.minus(4000), now) coderShould notFallback()
}

Expand Down
Loading

0 comments on commit c65c6ad

Please sign in to comment.