Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update beam to 2.52 #5054

Merged
merged 9 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ jobs:
java:
- 11
- 17
- 21
scala:
- 2.12.18
- 2.13.11
Expand Down
60 changes: 37 additions & 23 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import _root_.io.github.davidgregory084.DevMode
ThisBuild / turbo := true

val beamVendorVersion = "0.1"
val beamVersion = "2.51.0"
val beamVersion = "2.52.0"

// check version used by beam
// https://github.com/apache/beam/blob/v2.51.0/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
// https://github.com/apache/beam/blob/v2.52.0/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
val autoServiceVersion = "1.0.1"
val autoValueVersion = "1.9"
val avroVersion = "1.8.2"
Expand All @@ -43,8 +43,7 @@ val commonsCompressVersion = "1.21"
val commonsIoVersion = "2.13.0"
val commonsLang3Version = "3.9"
val commonsMath3Version = "3.6.1"
val datastoreV1ProtoClientVersion = "2.16.3"
val flinkVersion = "1.16.0"
val datastoreV1ProtoClientVersion = "2.17.1"
val googleClientsVersion = "2.0.0"
val googleOauthClientVersion = "1.34.1"
val guavaVersion = "32.1.2-jre"
Expand All @@ -57,36 +56,38 @@ val jodaTimeVersion = "2.10.10"
val nettyTcNativeVersion = "2.0.52.Final"
val nettyVersion = "4.1.87.Final"
val slf4jVersion = "1.7.30"
val sparkVersion = "3.4.1"
val zetasketchVersion = "0.1.0"
// dependent versions
val googleApiServicesBigQueryVersion = s"v2-rev20230520-$googleClientsVersion"
val googleApiServicesBigQueryVersion = s"v2-rev20230812-$googleClientsVersion"
val googleApiServicesDataflowVersion = s"v1b3-rev20220920-$googleClientsVersion"
val googleApiServicesPubsubVersion = s"v1-rev20220904-$googleClientsVersion"
val googleApiServicesStorageVersion = s"v1-rev20230617-$googleClientsVersion"
// beam tested versions
val flinkVersion = "1.16.0" // runners/flink/1.16/build.gradle
val sparkVersion = "3.4.1" // runners/spark/3/build.gradle

// check versions from libraries-bom
// https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/26.22.0/index.html
// https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/26.23.0/index.html
val animalSnifferAnnotationsVersion = "1.23"
val bigQueryStorageBetaVersion = "0.165.1"
val bigQueryStorageVersion = "2.41.1"
val bigQueryStorageBetaVersion = "0.166.0"
val bigQueryStorageVersion = "2.42.0"
val checkerFrameworkVersion = "3.33.0"
val errorProneAnnotationsVersion = "2.18.0"
val failureAccessVersion = "1.0.1"
val floggerVersion = "0.7.4"
val gaxVersion = "2.32.0"
val googleApiCommonVersion = "2.15.0"
val gaxVersion = "2.33.0"
val googleApiCommonVersion = "2.16.0"
val googleAuthVersion = "1.19.0"
val googleCloudBigTableVersion = "2.26.0"
val googleCloudCoreVersion = "2.22.0"
val googleCloudDatastoreVersion = "0.107.3"
val googleCloudMonitoringVersion = "3.24.0"
val googleCloudPubSubVersion = "1.106.1"
val googleCloudSpannerVersion = "6.45.0"
val googleCloudStorageVersion = "2.26.0"
val googleCommonsProtoVersion = "2.23.0"
val googleCloudBigTableVersion = "2.27.2"
val googleCloudCoreVersion = "2.23.0"
val googleCloudDatastoreVersion = "0.108.1"
val googleCloudMonitoringVersion = "3.25.0"
val googleCloudPubSubVersion = "1.107.0"
val googleCloudSpannerVersion = "6.47.0"
val googleCloudStorageVersion = "2.27.0"
val googleCommonsProtoVersion = "2.24.0"
val googleHttpClientsVersion = "1.43.3"
val googleIAMVersion = "1.18.0"
val googleIAMVersion = "1.19.0"
val grpcVersion = "1.56.1"
val j2objcAnnotationsVersion = "2.8"
val jsr305Version = "3.0.2"
Expand Down Expand Up @@ -240,8 +241,8 @@ lazy val keepExistingHeader =
.trim()
})

lazy val java17Settings = sys.props("java.version") match {
case v if v.startsWith("17.") =>
lazy val javaSettings = sys.props("java.version") match {
case v if v.startsWith("17.") || v.startsWith("21.") =>
Def.settings(
javaOptions ++= Seq(
"--add-opens",
Expand All @@ -255,7 +256,7 @@ lazy val java17Settings = sys.props("java.version") match {

val commonSettings = formatSettings ++
mimaSettings ++
java17Settings ++
javaSettings ++
Def.settings(
organization := "com.spotify",
headerLicense := Some(HeaderLicense.ALv2(currentYear.toString, "Spotify AB")),
Expand Down Expand Up @@ -1283,6 +1284,18 @@ lazy val `scio-repl`: Project = project
case None => Left("Error merging beam avro classes")
}
}
case PathList("com", "google", "errorprone", _*) =>
// prefer original errorprone classes instead of the ones packaged by beam
CustomMergeStrategy("ErrorProne") { conflicts =>
import sbtassembly.Assembly._
conflicts.collectFirst {
case Library(ModuleCoordinate("com.google.errorprone", _, _), _, t, s) =>
JarEntry(t, s)
} match {
case Some(e) => Right(Vector(e))
case None => Left("Error merging errorprone classes")
}
}
case PathList("org", "checkerframework", _*) =>
// prefer checker-qual classes packaged in checkerframework libs
CustomMergeStrategy("CheckerQual") { conflicts =>
Expand Down Expand Up @@ -1411,6 +1424,7 @@ lazy val `scio-smb`: Project = project
"io.dropwizard.metrics" % "metrics-core" % metricsVersion % Runtime,
// test
"org.apache.beam" % "beam-sdks-java-core" % beamVersion % "it,test" classifier "tests",
"org.apache.beam" % "beam-sdks-java-extensions-avro" % beamVersion % "it,test" classifier "tests",
"org.hamcrest" % "hamcrest" % hamcrestVersion % "it,test",
"org.scalatest" %% "scalatest" % scalatestVersion % "it,test",
"org.slf4j" % "slf4j-simple" % slf4jVersion % "it,test"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.smb.BucketMetadata.HashType;
import org.apache.beam.sdk.extensions.smb.SortedBucketSource.PrimaryKeyedBucketedInput;
import org.apache.beam.sdk.io.AvroGeneratedUser;
import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUser;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.AvroGeneratedUser;
import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUser;
import org.apache.beam.sdk.extensions.avro.io.AvroIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.beam.sdk.extensions.smb.BucketMetadata;
import org.apache.beam.sdk.extensions.smb.JsonSortedBucketIO;
import org.apache.beam.sdk.extensions.smb.SortedBucketSink;
import org.apache.beam.sdk.io.AvroGeneratedUser;
import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUser;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.beam.sdk.extensions.smb.JsonSortedBucketIO;
import org.apache.beam.sdk.extensions.smb.SortedBucketIO;
import org.apache.beam.sdk.extensions.smb.SortedBucketSource;
import org.apache.beam.sdk.io.AvroGeneratedUser;
import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUser;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUser;
import org.apache.beam.sdk.extensions.smb.BucketMetadata.HashType;
import org.apache.beam.sdk.io.AvroGeneratedUser;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.hamcrest.MatcherAssert;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.AvroGeneratedUser;
import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUser;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.beam.sdk.extensions.smb;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUser;
import org.apache.beam.sdk.extensions.smb.SortedBucketIO.CoGbkTransform;
import org.apache.beam.sdk.io.AvroGeneratedUser;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.util.SerializableUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUser;
import org.apache.beam.sdk.extensions.smb.BucketMetadata.HashType;
import org.apache.beam.sdk.io.AvroGeneratedUser;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.sdk.io.AvroGeneratedUser;
import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUser;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.AvroGeneratedUser;
import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUser;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.util.SerializableUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@
import java.util.stream.StreamSupport;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUser;
import org.apache.beam.sdk.extensions.smb.FileOperations.Writer;
import org.apache.beam.sdk.extensions.smb.SMBFilenamePolicy.FileAssignment;
import org.apache.beam.sdk.extensions.smb.SortedBucketSource.Predicate;
import org.apache.beam.sdk.io.AvroGeneratedUser;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.LocalResources;
import org.apache.beam.sdk.io.Read;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.smb
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.beam.sdk.coders.StringUtf8Coder
import org.apache.beam.sdk.extensions.smb.BucketMetadata.HashType
import org.apache.beam.sdk.io.AvroGeneratedUser
import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUser
import org.apache.beam.sdk.transforms.display.DisplayData
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import com.spotify.scio.smb._
import com.spotify.scio.testing._
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.beam.sdk.io.AvroGeneratedUser
import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUser
import org.apache.beam.sdk.values.TupleTag

import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -211,7 +211,7 @@ class ParquetEndToEndTest extends PipelineSpec {
.from(usersDir.toString)
)
.map(kv => (kv._1.toString, kv._2.toList))
val expected = users.map(u => (u.getName, List(u)))
val expected = users.map(u => (u.getName.toString, List(u)))
actual should containInAnyOrder(expected)
}
}