Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update beam to 2.56 #5346

Merged
merged 5 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 31 additions & 32 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import org.typelevel.scalacoptions.JavaMajorVersion.javaMajorVersion
// To test release candidates, find the beam repo and add it as a resolver
// ThisBuild / resolvers += "apache-beam-staging" at "https://repository.apache.org/content/repositories/"
val beamVendorVersion = "0.1"
val beamVersion = "2.55.1"
val beamVersion = "2.56.0"

// check version used by beam
// https://github.com/apache/beam/blob/v2.55.1/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
// https://github.com/apache/beam/blob/v2.56.0/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
val autoServiceVersion = "1.0.1"
val autoValueVersion = "1.9"
val bigdataossVersion = "2.2.16"
Expand All @@ -41,7 +41,6 @@ val commonsCompressVersion = "1.21"
val commonsIoVersion = "2.13.0"
val commonsLang3Version = "3.9"
val commonsMath3Version = "3.6.1"
val datastoreV1ProtoClientVersion = "2.18.3"
val googleClientsVersion = "2.0.0"
val googleOauthClientVersion = "1.34.1"
val guavaVersion = "32.1.2-jre"
Expand All @@ -51,51 +50,52 @@ val httpCoreVersion = "4.4.14"
val jacksonVersion = "2.14.1"
val jodaTimeVersion = "2.10.10"
val nettyTcNativeVersion = "2.0.52.Final"
val nettyVersion = "4.1.100.Final"
val slf4jVersion = "1.7.30"
// dependent versions
val googleApiServicesBigQueryVersion = s"v2-rev20240124-$googleClientsVersion"
val googleApiServicesDataflowVersion = s"v1b3-rev20240113-$googleClientsVersion"
val googleApiServicesBigQueryVersion = s"v2-rev20240229-$googleClientsVersion"
val googleApiServicesDataflowVersion = s"v1b3-rev20240218-$googleClientsVersion"
val googleApiServicesPubsubVersion = s"v1-rev20220904-$googleClientsVersion"
val googleApiServicesStorageVersion = s"v1-rev20240205-$googleClientsVersion"
val googleApiServicesStorageVersion = s"v1-rev20240311-$googleClientsVersion"
// beam tested versions
val zetasketchVersion = "0.1.0" // sdks/java/extensions/zetasketch/build.gradle
val avroVersion = "1.8.2" // sdks/java/extensions/avro/build.gradle
val flinkVersion = "1.16.0" // runners/flink/1.16/build.gradle
val flinkVersion = "1.17.0" // runners/flink/1.17/build.gradle
val hadoopVersion = "3.2.4" // sdks/java/io/parquet/build.gradle
val sparkVersion = "3.5.0" // runners/spark/3/build.gradle

// check versions from libraries-bom
// https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/26.32.0/index.html
// https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/26.36.0/index.html
val animalSnifferAnnotationsVersion = "1.23"
val checkerQualVersion = "3.42.0"
val errorProneAnnotationsVersion = "2.24.1"
val failureAccessVersion = "1.0.1"
val datastoreV1ProtoClientVersion = "2.19.0"
val errorProneAnnotationsVersion = "2.26.1"
val failureAccessVersion = "1.0.2"
val floggerVersion = "0.8"
val gaxVersion = "2.42.0"
val googleApiClientVersion = "2.2.0" // very strangely not in sync with googleClientsVersion
val googleApiCommonVersion = "2.25.0"
val googleAuthVersion = "1.22.0"
val googleCloudBigQueryStorageVersion = "3.1.0"
val googleCloudBigTableVersion = "2.33.0"
val googleCloudCoreVersion = "2.32.0"
val googleCloudMonitoringVersion = "3.36.0"
val googleCloudProtoBigQueryStorageBetaVersion = "0.173.0"
val gaxVersion = "2.46.1"
val googleApiClientVersion = "2.4.0" // very strangely not in sync with googleClientsVersion
val googleApiCommonVersion = "2.29.1"
val googleAuthVersion = "1.23.0"
val googleCloudBigQueryStorageVersion = "3.4.0"
val googleCloudBigTableVersion = "2.37.0"
val googleCloudCoreVersion = "2.36.1"
val googleCloudMonitoringVersion = "3.41.0"
val googleCloudProtoBigQueryStorageBetaVersion = "0.176.0"
val googleCloudProtoBigTableVersion = googleCloudBigTableVersion
val googleCloudProtoDatastoreVersion = "0.109.3"
val googleCloudProtoPubSubVersion = "1.108.5"
val googleCloudSpannerVersion = "6.58.0"
val googleCloudStorageVersion = "2.33.0"
val googleHttpClientVersion = "1.43.3"
val googleProtoCommonVersion = "2.33.0"
val googleProtoIAMVersion = "1.28.0"
val grpcVersion = "1.61.0"
val j2objcAnnotationsVersion = "2.8"
val googleCloudProtoDatastoreVersion = "0.110.0"
val googleCloudProtoPubSubVersion = "1.109.3"
val googleCloudSpannerVersion = "6.62.0"
val googleCloudStorageVersion = "2.36.1"
val googleHttpClientVersion = "1.44.1"
val googleProtoCommonVersion = "2.37.1"
val googleProtoIAMVersion = "1.32.1"
val grpcVersion = "1.62.2"
val j2objcAnnotationsVersion = "3.0.0"
val jsr305Version = "3.0.2"
val okioVersion = "3.4.0"
val nettyVersion = "4.1.100.Final"
val okioVersion = "3.6.0"
val opencensusVersion = "0.31.1"
val perfmarkVersion = "0.27.0"
val protobufVersion = "3.25.2"
val protobufVersion = "3.25.3"

val algebirdVersion = "0.13.10"
val algebraVersion = "2.10.0"
Expand Down Expand Up @@ -1510,7 +1510,6 @@ lazy val `scio-smb` = project
"org.tensorflow" % "tensorflow-core-api" % tensorFlowVersion % Provided, // scio-tensorflow
// test
"org.apache.beam" % "beam-sdks-java-core" % beamVersion % Test classifier "tests",
"org.apache.beam" % "beam-sdks-java-extensions-avro" % beamVersion % Test classifier "tests",
"org.hamcrest" % "hamcrest" % hamcrestVersion % Test,
"org.scalatest" %% "scalatest" % scalatestVersion % Test,
"org.slf4j" % "slf4j-simple" % slf4jVersion % Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ object GrpcSerializerTest {
case _ => false
}

private val eqStatusCode: Equality[StatusCode] = {
val eqStatusCode: Equality[StatusCode] = {
case (a: StatusCode, b: StatusCode) =>
a.getCode == b.getCode
case _ => false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@

package com.spotify.scio.coders.instances.kryo

import com.google.api.gax.rpc.ApiException
import com.google.api.gax.rpc.{ApiException, StatusCode}
import com.google.cloud.bigtable.data.v2.models.MutateRowsException
import com.twitter.chill._

private[coders] class MutateRowsExceptionSerializer extends KSerializer[MutateRowsException] {
override def write(kryo: Kryo, output: Output, e: MutateRowsException): Unit = {
kryo.writeClassAndObject(output, e.getCause)
kryo.writeObject(output, e.getStatusCode.getCode)
val failedMutations = e.getFailedMutations
kryo.writeObject(output, failedMutations.size())
failedMutations.forEach { fm =>
Expand All @@ -38,6 +39,12 @@ private[coders] class MutateRowsExceptionSerializer extends KSerializer[MutateRo
`type`: Class[MutateRowsException]
): MutateRowsException = {
val cause = kryo.readClassAndObject(input).asInstanceOf[Throwable]
// Generic status code: the transport-specific information is lost during serialization.
val code = kryo.readObject(input, classOf[StatusCode.Code])
val statusCode = new StatusCode() {
override def getCode: StatusCode.Code = code
override def getTransportCode: AnyRef = null
}
val size = kryo.readObject(input, classOf[Integer])
val failedMutations = new _root_.java.util.ArrayList[MutateRowsException.FailedMutation](size)
(0 until size).foreach { _ =>
Expand All @@ -46,6 +53,6 @@ private[coders] class MutateRowsExceptionSerializer extends KSerializer[MutateRo
failedMutations.add(MutateRowsException.FailedMutation.create(index, error))
}
val retryable = kryo.readObject(input, classOf[Boolean])
MutateRowsException.create(cause, failedMutations, retryable)
MutateRowsException.create(cause, statusCode, failedMutations, retryable)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ object GcpSerializerTest {
implicit val eqMutateRowsException: Equality[MutateRowsException] = {
case (a: MutateRowsException, b: MutateRowsException) =>
eqCause.areEqual(a.getCause, b.getCause) &&
a.getStatusCode == b.getStatusCode &&
eqStatusCode.areEqual(a.getStatusCode, b.getStatusCode) &&
a.isRetryable == b.isRetryable &&
a.getFailedMutations.size() == b.getFailedMutations.size() &&
a.getFailedMutations.asScala.zip(b.getFailedMutations.asScala).forall { case (x, y) =>
Expand Down Expand Up @@ -71,9 +71,10 @@ class GcpSerializerTest extends AnyFlatSpec with Matchers {

"MutateRowsExceptionSerializer" should "roundtrip" in {
val cause = new StatusRuntimeException(Status.OK)
val code = GrpcStatusCode.of(Status.OK.getCode)
val apiException = new InternalException(cause, GrpcStatusCode.of(Code.OK), false)
val failedMutations = List(MutateRowsException.FailedMutation.create(1, apiException))
val mutateRowsException = MutateRowsException.create(cause, failedMutations.asJava, false)
val mutateRowsException = MutateRowsException.create(cause, code, failedMutations.asJava, false)

mutateRowsException coderShould roundtrip()
}
Expand Down
10 changes: 10 additions & 0 deletions scio-smb/src/test/avro/user.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"namespace": "org.apache.beam.sdk.extensions.avro.io",
"type": "record",
"name": "AvroGeneratedUser",
Comment on lines +3 to +4
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did the Beam Avro class get deleted?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The artifact is published with a class compiled against another, incompatible Avro version.

"fields": [
{ "name": "name", "type": "string"},
{ "name": "favorite_number", "type": ["int", "null"]},
{ "name": "favorite_color", "type": ["string", "null"]}
]
}