Skip to content

Commit

Permalink
Update beam to 2.56 (#5346)
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones authored May 6, 2024
1 parent b544c6d commit 7de1259
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 37 deletions.
63 changes: 31 additions & 32 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import org.typelevel.scalacoptions.JavaMajorVersion.javaMajorVersion
// To test release candidates, find the beam repo and add it as a resolver
// ThisBuild / resolvers += "apache-beam-staging" at "https://repository.apache.org/content/repositories/"
val beamVendorVersion = "0.1"
val beamVersion = "2.55.1"
val beamVersion = "2.56.0"

// check version used by beam
// https://github.com/apache/beam/blob/v2.55.1/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
// https://github.com/apache/beam/blob/v2.56.0/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
val autoServiceVersion = "1.0.1"
val autoValueVersion = "1.9"
val bigdataossVersion = "2.2.16"
Expand All @@ -41,7 +41,6 @@ val commonsCompressVersion = "1.21"
val commonsIoVersion = "2.13.0"
val commonsLang3Version = "3.9"
val commonsMath3Version = "3.6.1"
val datastoreV1ProtoClientVersion = "2.18.3"
val googleClientsVersion = "2.0.0"
val googleOauthClientVersion = "1.34.1"
val guavaVersion = "32.1.2-jre"
Expand All @@ -51,51 +50,52 @@ val httpCoreVersion = "4.4.14"
val jacksonVersion = "2.14.1"
val jodaTimeVersion = "2.10.10"
val nettyTcNativeVersion = "2.0.52.Final"
val nettyVersion = "4.1.100.Final"
val slf4jVersion = "1.7.30"
// dependent versions
val googleApiServicesBigQueryVersion = s"v2-rev20240124-$googleClientsVersion"
val googleApiServicesDataflowVersion = s"v1b3-rev20240113-$googleClientsVersion"
val googleApiServicesBigQueryVersion = s"v2-rev20240229-$googleClientsVersion"
val googleApiServicesDataflowVersion = s"v1b3-rev20240218-$googleClientsVersion"
val googleApiServicesPubsubVersion = s"v1-rev20220904-$googleClientsVersion"
val googleApiServicesStorageVersion = s"v1-rev20240205-$googleClientsVersion"
val googleApiServicesStorageVersion = s"v1-rev20240311-$googleClientsVersion"
// beam tested versions
val zetasketchVersion = "0.1.0" // sdks/java/extensions/zetasketch/build.gradle
val avroVersion = "1.8.2" // sdks/java/extensions/avro/build.gradle
val flinkVersion = "1.16.0" // runners/flink/1.16/build.gradle
val flinkVersion = "1.17.0" // runners/flink/1.17/build.gradle
val hadoopVersion = "3.2.4" // sdks/java/io/parquet/build.gradle
val sparkVersion = "3.5.0" // runners/spark/3/build.gradle

// check versions from libraries-bom
// https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/26.32.0/index.html
// https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/26.36.0/index.html
val animalSnifferAnnotationsVersion = "1.23"
val checkerQualVersion = "3.42.0"
val errorProneAnnotationsVersion = "2.24.1"
val failureAccessVersion = "1.0.1"
val datastoreV1ProtoClientVersion = "2.19.0"
val errorProneAnnotationsVersion = "2.26.1"
val failureAccessVersion = "1.0.2"
val floggerVersion = "0.8"
val gaxVersion = "2.42.0"
val googleApiClientVersion = "2.2.0" // very strangely not in sync with googleClientsVersion
val googleApiCommonVersion = "2.25.0"
val googleAuthVersion = "1.22.0"
val googleCloudBigQueryStorageVersion = "3.1.0"
val googleCloudBigTableVersion = "2.33.0"
val googleCloudCoreVersion = "2.32.0"
val googleCloudMonitoringVersion = "3.36.0"
val googleCloudProtoBigQueryStorageBetaVersion = "0.173.0"
val gaxVersion = "2.46.1"
val googleApiClientVersion = "2.4.0" // very strangely not in sync with googleClientsVersion
val googleApiCommonVersion = "2.29.1"
val googleAuthVersion = "1.23.0"
val googleCloudBigQueryStorageVersion = "3.4.0"
val googleCloudBigTableVersion = "2.37.0"
val googleCloudCoreVersion = "2.36.1"
val googleCloudMonitoringVersion = "3.41.0"
val googleCloudProtoBigQueryStorageBetaVersion = "0.176.0"
val googleCloudProtoBigTableVersion = googleCloudBigTableVersion
val googleCloudProtoDatastoreVersion = "0.109.3"
val googleCloudProtoPubSubVersion = "1.108.5"
val googleCloudSpannerVersion = "6.58.0"
val googleCloudStorageVersion = "2.33.0"
val googleHttpClientVersion = "1.43.3"
val googleProtoCommonVersion = "2.33.0"
val googleProtoIAMVersion = "1.28.0"
val grpcVersion = "1.61.0"
val j2objcAnnotationsVersion = "2.8"
val googleCloudProtoDatastoreVersion = "0.110.0"
val googleCloudProtoPubSubVersion = "1.109.3"
val googleCloudSpannerVersion = "6.62.0"
val googleCloudStorageVersion = "2.36.1"
val googleHttpClientVersion = "1.44.1"
val googleProtoCommonVersion = "2.37.1"
val googleProtoIAMVersion = "1.32.1"
val grpcVersion = "1.62.2"
val j2objcAnnotationsVersion = "3.0.0"
val jsr305Version = "3.0.2"
val okioVersion = "3.4.0"
val nettyVersion = "4.1.100.Final"
val okioVersion = "3.6.0"
val opencensusVersion = "0.31.1"
val perfmarkVersion = "0.27.0"
val protobufVersion = "3.25.2"
val protobufVersion = "3.25.3"

val algebirdVersion = "0.13.10"
val algebraVersion = "2.10.0"
Expand Down Expand Up @@ -1510,7 +1510,6 @@ lazy val `scio-smb` = project
"org.tensorflow" % "tensorflow-core-api" % tensorFlowVersion % Provided, // scio-tensorflow
// test
"org.apache.beam" % "beam-sdks-java-core" % beamVersion % Test classifier "tests",
"org.apache.beam" % "beam-sdks-java-extensions-avro" % beamVersion % Test classifier "tests",
"org.hamcrest" % "hamcrest" % hamcrestVersion % Test,
"org.scalatest" %% "scalatest" % scalatestVersion % Test,
"org.slf4j" % "slf4j-simple" % slf4jVersion % Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ object GrpcSerializerTest {
case _ => false
}

private val eqStatusCode: Equality[StatusCode] = {
val eqStatusCode: Equality[StatusCode] = {
case (a: StatusCode, b: StatusCode) =>
a.getCode == b.getCode
case _ => false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@

package com.spotify.scio.coders.instances.kryo

import com.google.api.gax.rpc.ApiException
import com.google.api.gax.rpc.{ApiException, StatusCode}
import com.google.cloud.bigtable.data.v2.models.MutateRowsException
import com.twitter.chill._

private[coders] class MutateRowsExceptionSerializer extends KSerializer[MutateRowsException] {
override def write(kryo: Kryo, output: Output, e: MutateRowsException): Unit = {
kryo.writeClassAndObject(output, e.getCause)
kryo.writeObject(output, e.getStatusCode.getCode)
val failedMutations = e.getFailedMutations
kryo.writeObject(output, failedMutations.size())
failedMutations.forEach { fm =>
Expand All @@ -38,6 +39,12 @@ private[coders] class MutateRowsExceptionSerializer extends KSerializer[MutateRo
`type`: Class[MutateRowsException]
): MutateRowsException = {
val cause = kryo.readClassAndObject(input).asInstanceOf[Throwable]
// generic status code. we lost transport information during serialization
val code = kryo.readObject(input, classOf[StatusCode.Code])
val statusCode = new StatusCode() {
override def getCode: StatusCode.Code = code
override def getTransportCode: AnyRef = null
}
val size = kryo.readObject(input, classOf[Integer])
val failedMutations = new _root_.java.util.ArrayList[MutateRowsException.FailedMutation](size)
(0 until size).foreach { _ =>
Expand All @@ -46,6 +53,6 @@ private[coders] class MutateRowsExceptionSerializer extends KSerializer[MutateRo
failedMutations.add(MutateRowsException.FailedMutation.create(index, error))
}
val retryable = kryo.readObject(input, classOf[Boolean])
MutateRowsException.create(cause, failedMutations, retryable)
MutateRowsException.create(cause, statusCode, failedMutations, retryable)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ object GcpSerializerTest {
implicit val eqMutateRowsException: Equality[MutateRowsException] = {
case (a: MutateRowsException, b: MutateRowsException) =>
eqCause.areEqual(a.getCause, b.getCause) &&
a.getStatusCode == b.getStatusCode &&
eqStatusCode.areEqual(a.getStatusCode, b.getStatusCode) &&
a.isRetryable == b.isRetryable &&
a.getFailedMutations.size() == b.getFailedMutations.size() &&
a.getFailedMutations.asScala.zip(b.getFailedMutations.asScala).forall { case (x, y) =>
Expand Down Expand Up @@ -71,9 +71,10 @@ class GcpSerializerTest extends AnyFlatSpec with Matchers {

"MutateRowsExceptionSerializer" should "roundtrip" in {
val cause = new StatusRuntimeException(Status.OK)
val code = GrpcStatusCode.of(Status.OK.getCode)
val apiException = new InternalException(cause, GrpcStatusCode.of(Code.OK), false)
val failedMutations = List(MutateRowsException.FailedMutation.create(1, apiException))
val mutateRowsException = MutateRowsException.create(cause, failedMutations.asJava, false)
val mutateRowsException = MutateRowsException.create(cause, code, failedMutations.asJava, false)

mutateRowsException coderShould roundtrip()
}
Expand Down
10 changes: 10 additions & 0 deletions scio-smb/src/test/avro/user.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"namespace": "org.apache.beam.sdk.extensions.avro.io",
"type": "record",
"name": "AvroGeneratedUser",
"fields": [
{ "name": "name", "type": "string"},
{ "name": "favorite_number", "type": ["int", "null"]},
{ "name": "favorite_color", "type": ["string", "null"]}
]
}

0 comments on commit 7de1259

Please sign in to comment.