Skip to content

Commit

Permalink
Merge branch 'main' into fixup-0.13-release
Browse files — browse the repository at this point in the history
  • Branch information
RustedBones authored Jun 19, 2023
2 parents 1c65655 + f32058c commit d415fe5
Show file tree
Hide file tree
Showing 25 changed files with 51 additions and 47 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ jobs:
strategy:
matrix:
java:
- 8
- 11
- 17
scala:
Expand All @@ -38,8 +37,6 @@ jobs:
java: 11
coverage: true
exclude:
- scala: 2.12.18
java: 8
- scala: 2.12.18
java: 17
repl-test:
Expand Down
11 changes: 7 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ val httpAsyncClientVersion = "4.1.5"
val hamcrestVersion = "2.2"
val jakartaJsonVersion = "2.1.2"
val javaLshVersion = "0.12"
val jedisVersion = "4.4.2"
val jedisVersion = "4.4.3"
val jnaVersion = "5.13.0"
val junitInterfaceVersion = "0.13.3"
val junitVersion = "4.13.2"
Expand All @@ -131,13 +131,13 @@ val parquetVersion = "1.12.3"
val pprintVersion = "0.8.1"
val protobufGenericVersion = "0.2.9"
val scalacheckVersion = "1.17.0"
val scalaCollectionCompatVersion = "2.10.0"
val scalaCollectionCompatVersion = "2.11.0"
val scalaMacrosVersion = "2.1.1"
val scalatestVersion = "3.2.16"
val shapelessVersion = "2.3.10"
val sparkeyVersion = "3.2.5"
val tensorFlowVersion = "0.4.2"
val testContainersVersion = "0.40.16"
val testContainersVersion = "0.40.17"
val zoltarVersion = "0.6.0"
// dependent versions
val scalatestplusVersion = s"$scalatestVersion.0"
Expand Down Expand Up @@ -559,6 +559,7 @@ lazy val `scio-core`: Project = project
"org.apache.avro" % "avro" % avroVersion, // TODO remove from core
"org.apache.beam" % "beam-runners-core-construction-java" % beamVersion,
"org.apache.beam" % "beam-sdks-java-core" % beamVersion,
"org.apache.beam" % "beam-sdks-java-extensions-avro" % beamVersion,
"org.apache.beam" % "beam-sdks-java-extensions-protobuf" % beamVersion,
"org.apache.beam" % "beam-vendor-guava-26_0-jre" % beamVendorVersion,
"org.apache.commons" % "commons-compress" % commonsCompressVersion,
Expand Down Expand Up @@ -683,6 +684,7 @@ lazy val `scio-avro`: Project = project
"com.thoughtworks.paranamer" % "paranamer"
),
"org.apache.beam" % "beam-sdks-java-core" % beamVersion,
"org.apache.beam" % "beam-sdks-java-extensions-avro" % beamVersion,
"org.apache.beam" % "beam-vendor-guava-26_0-jre" % beamVendorVersion,
"org.scala-lang.modules" %% "scala-collection-compat" % scalaCollectionCompatVersion,
"org.slf4j" % "slf4j-api" % slf4jVersion,
Expand Down Expand Up @@ -1184,7 +1186,7 @@ lazy val `scio-examples`: Project = project
"org.slf4j" % "slf4j-api" % slf4jVersion,
// runtime
"com.google.cloud.bigdataoss" % "gcs-connector" % s"hadoop2-$bigdataossVersion" % Runtime,
"com.google.cloud.sql" % "mysql-socket-factory" % "1.11.1" % Runtime,
"com.google.cloud.sql" % "mysql-socket-factory-connector-j-8" % "1.12.0" % Runtime,
"org.slf4j" % "log4j-over-slf4j" % slf4jVersion % Runtime,
"org.slf4j" % "slf4j-simple" % slf4jVersion % Runtime,
// test
Expand Down Expand Up @@ -1342,6 +1344,7 @@ lazy val `scio-smb`: Project = project
"com.spotify" %% "magnolify-parquet" % magnolifyVersion,
"joda-time" % "joda-time" % jodaTimeVersion,
"org.apache.beam" % "beam-sdks-java-core" % beamVersion,
"org.apache.beam" % "beam-sdks-java-extensions-avro" % beamVersion,
"org.apache.beam" % "beam-sdks-java-extensions-protobuf" % beamVersion,
// #3260 work around for sorter memory limit until we patch upstream
// "org.apache.beam" % "beam-sdks-java-extensions-sorter" % beamVersion,
Expand Down
24 changes: 12 additions & 12 deletions scio-avro/src/main/scala/com/spotify/scio/avro/AvroIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.avro.specific.SpecificRecord
import org.apache.beam.sdk.io.fs.ResourceId
import org.apache.beam.sdk.transforms.DoFn.{Element, OutputReceiver, ProcessElement}
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.{io => beam}
import org.apache.beam.sdk.extensions.avro.io.{AvroIO => BAvroIO}

import scala.jdk.CollectionConverters._
import scala.reflect.runtime.universe._
Expand Down Expand Up @@ -136,7 +136,7 @@ sealed trait AvroIO[T] extends ScioIO[T] {
final override val tapT: TapT.Aux[T, T] = TapOf[T]

protected[scio] def avroOut[U](
write: beam.AvroIO.Write[U],
write: BAvroIO.Write[U],
path: String,
numShards: Int,
suffix: String,
Expand All @@ -147,7 +147,7 @@ sealed trait AvroIO[T] extends ScioIO[T] {
shardNameTemplate: String,
isWindowed: Boolean,
tempDirectory: ResourceId
): beam.AvroIO.Write[U] = {
): BAvroIO.Write[U] = {
require(tempDirectory != null, "tempDirectory must not be null")
val fp = FilenamePolicySupplier.resolve(
filenamePolicySupplier = filenamePolicySupplier,
Expand Down Expand Up @@ -180,7 +180,7 @@ final case class SpecificRecordIO[T <: SpecificRecord: ClassTag: Coder](path: St
val coder = CoderMaterializer.beam(sc, Coder[T])
val cls = ScioUtil.classOf[T]
val filePattern = ScioUtil.filePattern(path, params.suffix)
val t = beam.AvroIO
val t = BAvroIO
.read(cls)
.from(filePattern)
sc
Expand All @@ -194,7 +194,7 @@ final case class SpecificRecordIO[T <: SpecificRecord: ClassTag: Coder](path: St
*/
override protected def write(data: SCollection[T], params: WriteP): Tap[T] = {
val cls = ScioUtil.classOf[T]
val t = beam.AvroIO.write(cls)
val t = BAvroIO.write(cls)

data.applyInternal(
avroOut(
Expand Down Expand Up @@ -231,7 +231,7 @@ final case class GenericRecordIO(path: String, schema: Schema) extends AvroIO[Ge
override protected def read(sc: ScioContext, params: ReadP): SCollection[GenericRecord] = {
val coder = CoderMaterializer.beam(sc, Coder.avroGenericRecordCoder(schema))
val filePattern = ScioUtil.filePattern(path, params.suffix)
val t = beam.AvroIO
val t = BAvroIO
.readGenericRecords(schema)
.from(filePattern)
sc
Expand All @@ -246,7 +246,7 @@ final case class GenericRecordIO(path: String, schema: Schema) extends AvroIO[Ge
data: SCollection[GenericRecord],
params: WriteP
): Tap[GenericRecord] = {
val t = beam.AvroIO.writeGenericRecords(schema)
val t = BAvroIO.writeGenericRecords(schema)
data.applyInternal(
avroOut(
t,
Expand Down Expand Up @@ -290,7 +290,7 @@ final case class GenericRecordParseIO[T](path: String, parseFn: GenericRecord =>
*/
override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = {
val filePattern = ScioUtil.filePattern(path, params.suffix)
val t = beam.AvroIO
val t = BAvroIO
.parseGenericRecords(Functions.serializableFn(parseFn))
.from(filePattern)
.withCoder(CoderMaterializer.beam(sc, coder))
Expand Down Expand Up @@ -348,9 +348,9 @@ object AvroIO {

object AvroTyped {
private[scio] def writeTransform[T <: HasAvroAnnotation: TypeTag: Coder]()
: beam.AvroIO.TypedWrite[T, Void, GenericRecord] = {
: BAvroIO.TypedWrite[T, Void, GenericRecord] = {
val avroT = AvroType[T]
beam.AvroIO
BAvroIO
.writeCustomTypeToGenericRecords()
.withFormatFunction(Functions.serializableFn(avroT.toGenericRecord))
.withSchema(avroT.schema)
Expand All @@ -362,7 +362,7 @@ object AvroTyped {
final override val tapT: TapT.Aux[T, T] = TapOf[T]

private[scio] def typedAvroOut[U](
write: beam.AvroIO.TypedWrite[U, Void, GenericRecord],
write: BAvroIO.TypedWrite[U, Void, GenericRecord],
path: String,
numShards: Int,
suffix: String,
Expand Down Expand Up @@ -401,7 +401,7 @@ object AvroTyped {
override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = {
val avroT = AvroType[T]
val filePattern = ScioUtil.filePattern(path, params.suffix)
val t = beam.AvroIO.readGenericRecords(avroT.schema).from(filePattern)
val t = BAvroIO.readGenericRecords(avroT.schema).from(filePattern)
sc.applyTransform(t).map(avroT.fromGenericRecord)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import com.twitter.chill.KSerializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.avro.specific.SpecificRecord
import org.apache.beam.sdk.coders.AvroCoder
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder

import scala.collection.mutable.{Map => MMap}
import scala.util.Try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.avro.specific.{SpecificData, SpecificFixed, SpecificRecord}
import org.apache.beam.sdk.coders.Coder.NonDeterministicException
import org.apache.beam.sdk.coders.{AtomicCoder, AvroCoder, CustomCoder, StringUtf8Coder}
import org.apache.beam.sdk.coders.{AtomicCoder, CustomCoder, StringUtf8Coder}
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder
import org.apache.beam.sdk.util.common.ElementByteSizeObserver

import java.io.{InputStream, OutputStream}
Expand Down Expand Up @@ -71,7 +72,7 @@ final private class SlowGenericRecordCoder extends AtomicCoder[GenericRecord] {
/**
* Implementation is legit only for SpecificFixed, not GenericFixed
* @see
* [[org.apache.beam.sdk.coders.AvroCoder]]
* [[org.apache.beam.sdk.extensions.avro.coders.AvroCoder]]
*/
final private class SpecificFixedCoder[A <: SpecificFixed](cls: Class[A]) extends CustomCoder[A] {
// lazy because AVRO Schema isn't serializable
Expand Down
2 changes: 1 addition & 1 deletion scio-core/src/main/scala/com/spotify/scio/io/Tap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import com.spotify.scio.values.SCollection
import com.spotify.scio.{ScioContext, ScioResult}
import org.apache.avro.generic.GenericRecord
import org.apache.beam.sdk.coders.{Coder => BCoder}
import org.apache.beam.sdk.io.AvroIO
import org.apache.beam.sdk.extensions.avro.io.AvroIO
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.{Element, OutputReceiver, ProcessElement}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.avro.specific.SpecificRecord
import org.apache.beam.sdk.coders.StringUtf8Coder
import org.apache.beam.sdk.io.{Compression, FileIO}
import org.apache.beam.sdk.{io => beam}
import org.apache.beam.sdk.extensions.avro.io.{AvroIO => BAvroIO}

import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
Expand Down Expand Up @@ -81,7 +82,7 @@ final class DynamicSpecificRecordSCollectionOps[T <: SpecificRecord](
val cls = ct.runtimeClass.asInstanceOf[Class[T]]
val nm = new JHashMap[String, AnyRef]()
nm.putAll(metadata.asJava)
val sink = beam.AvroIO
val sink = BAvroIO
.sink(cls)
.withCodec(codec)
.withMetadata(nm)
Expand Down Expand Up @@ -130,7 +131,7 @@ final class DynamicGenericRecordSCollectionOps[T <: GenericRecord](private val s
} else {
val nm = new JHashMap[String, AnyRef]()
nm.putAll(metadata.asJava)
val sink = beam.AvroIO
val sink = BAvroIO
.sinkViaGenericRecords(
schema,
(element: T, _: Schema) => element
Expand Down Expand Up @@ -227,7 +228,7 @@ final class DynamicProtobufSCollectionOps[T <: Message](private val self: SColle
"Protobuf file with dynamic destinations cannot be used in a test context"
)
} else {
val sink = beam.AvroIO
val sink = BAvroIO
.sinkViaGenericRecords(
avroSchema,
(element: T, _: Schema) => AvroBytesUtil.encode(elemCoder, element)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ package com.spotify.scio.schemas.instances
import com.spotify.scio.schemas.{RawRecord, Schema}
import org.apache.avro.specific.SpecificRecord
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.beam.sdk.schemas.utils.AvroUtils
import org.apache.beam.sdk.schemas.{AvroRecordSchema, Schema => BSchema}
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils
import org.apache.beam.sdk.extensions.avro.schemas.AvroRecordSchema
import org.apache.beam.sdk.schemas.{Schema => BSchema}
import org.apache.beam.sdk.transforms.SerializableFunction
import org.apache.beam.sdk.values.{Row, TypeDescriptor}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import org.apache.beam.sdk.util.SerializableUtils
import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode
import org.apache.beam.sdk.values._
import org.apache.beam.sdk.{io => beam}
import org.apache.beam.sdk.extensions.avro.io.{AvroIO => BAvroIO}
import org.joda.time.{Duration, Instant}
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -1554,7 +1555,7 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] {
val elemCoder = CoderMaterializer.beam(context, coder)
val schema = AvroBytesUtil.schema
val avroCoder = Coder.avroGenericRecordCoder(schema)
val write = beam.AvroIO
val write = BAvroIO
.writeGenericRecords(schema)
.to(ScioUtil.pathWithPrefix(path, "part"))
.withSuffix(".obj.avro")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.avro.reflect.Nullable;
import org.apache.beam.examples.complete.game.utils.WriteToText;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
import java.util.stream.Stream;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package org.apache.beam.examples.subprocess.utils;

import org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.beam.examples.complete.game.LeaderBoard.CalculateTeamScores;
import org.apache.beam.examples.complete.game.LeaderBoard.CalculateUserScores;
import org.apache.beam.examples.complete.game.UserScore.GameActionInfo;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

import org.apache.beam.examples.complete.game.StatefulTeamScore.UpdateTeamScoreFn;
import org.apache.beam.examples.complete.game.UserScore.GameActionInfo;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ object ParquetAvroIO {
}

// Needed to make GenericRecord read by parquet-avro work with Beam's
// org.apache.beam.sdk.coders.AvroCoder.
// org.apache.beam.sdk.extensions.avro.coders.AvroCoder
if (!isSpecific) {
jobConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.AvroGeneratedUser;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.extensions.avro.io.AvroIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.io.IOException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.smb.AvroSortedBucketIO;
Expand Down
Loading

0 comments on commit d415fe5

Please sign in to comment.