diff --git a/build.sbt b/build.sbt index 5fdee6367c..0045fd2d0c 100644 --- a/build.sbt +++ b/build.sbt @@ -28,10 +28,10 @@ import org.typelevel.scalacoptions.JavaMajorVersion.javaMajorVersion // To test release candidates, find the beam repo and add it as a resolver // ThisBuild / resolvers += "apache-beam-staging" at "https://repository.apache.org/content/repositories/" val beamVendorVersion = "0.1" -val beamVersion = "2.55.1" +val beamVersion = "2.56.0" // check version used by beam -// https://github.com/apache/beam/blob/v2.55.1/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +// https://github.com/apache/beam/blob/v2.56.0/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy val autoServiceVersion = "1.0.1" val autoValueVersion = "1.9" val bigdataossVersion = "2.2.16" @@ -41,7 +41,6 @@ val commonsCompressVersion = "1.21" val commonsIoVersion = "2.13.0" val commonsLang3Version = "3.9" val commonsMath3Version = "3.6.1" -val datastoreV1ProtoClientVersion = "2.18.3" val googleClientsVersion = "2.0.0" val googleOauthClientVersion = "1.34.1" val guavaVersion = "32.1.2-jre" @@ -51,51 +50,52 @@ val httpCoreVersion = "4.4.14" val jacksonVersion = "2.14.1" val jodaTimeVersion = "2.10.10" val nettyTcNativeVersion = "2.0.52.Final" -val nettyVersion = "4.1.100.Final" val slf4jVersion = "1.7.30" // dependent versions -val googleApiServicesBigQueryVersion = s"v2-rev20240124-$googleClientsVersion" -val googleApiServicesDataflowVersion = s"v1b3-rev20240113-$googleClientsVersion" +val googleApiServicesBigQueryVersion = s"v2-rev20240229-$googleClientsVersion" +val googleApiServicesDataflowVersion = s"v1b3-rev20240218-$googleClientsVersion" val googleApiServicesPubsubVersion = s"v1-rev20220904-$googleClientsVersion" -val googleApiServicesStorageVersion = s"v1-rev20240205-$googleClientsVersion" +val googleApiServicesStorageVersion = s"v1-rev20240311-$googleClientsVersion" // beam tested versions val zetasketchVersion = "0.1.0" // sdks/java/extensions/zetasketch/build.gradle val avroVersion = "1.8.2" // sdks/java/extensions/avro/build.gradle -val flinkVersion = "1.16.0" // runners/flink/1.16/build.gradle +val flinkVersion = "1.17.0" // runners/flink/1.17/build.gradle val hadoopVersion = "3.2.4" // sdks/java/io/parquet/build.gradle val sparkVersion = "3.5.0" // runners/spark/3/build.gradle // check versions from libraries-bom -// https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/26.32.0/index.html +// https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/26.36.0/index.html val animalSnifferAnnotationsVersion = "1.23" val checkerQualVersion = "3.42.0" -val errorProneAnnotationsVersion = "2.24.1" -val failureAccessVersion = "1.0.1" +val datastoreV1ProtoClientVersion = "2.19.0" +val errorProneAnnotationsVersion = "2.26.1" +val failureAccessVersion = "1.0.2" val floggerVersion = "0.8" -val gaxVersion = "2.42.0" -val googleApiClientVersion = "2.2.0" // very strangely not in sync with googleClientsVersion -val googleApiCommonVersion = "2.25.0" -val googleAuthVersion = "1.22.0" -val googleCloudBigQueryStorageVersion = "3.1.0" -val googleCloudBigTableVersion = "2.33.0" -val googleCloudCoreVersion = "2.32.0" -val googleCloudMonitoringVersion = "3.36.0" -val googleCloudProtoBigQueryStorageBetaVersion = "0.173.0" +val gaxVersion = "2.46.1" +val googleApiClientVersion = "2.4.0" // very strangely not in sync with googleClientsVersion +val googleApiCommonVersion = "2.29.1" +val googleAuthVersion = "1.23.0" +val googleCloudBigQueryStorageVersion = "3.4.0" +val googleCloudBigTableVersion = "2.37.0" +val googleCloudCoreVersion = "2.36.1" +val googleCloudMonitoringVersion = "3.41.0" +val googleCloudProtoBigQueryStorageBetaVersion = "0.176.0" val googleCloudProtoBigTableVersion = googleCloudBigTableVersion -val googleCloudProtoDatastoreVersion = "0.109.3" -val googleCloudProtoPubSubVersion = "1.108.5" -val googleCloudSpannerVersion = "6.58.0" -val googleCloudStorageVersion = "2.33.0" -val googleHttpClientVersion = "1.43.3" -val googleProtoCommonVersion = "2.33.0" -val googleProtoIAMVersion = "1.28.0" -val grpcVersion = "1.61.0" -val j2objcAnnotationsVersion = "2.8" +val googleCloudProtoDatastoreVersion = "0.110.0" +val googleCloudProtoPubSubVersion = "1.109.3" +val googleCloudSpannerVersion = "6.62.0" +val googleCloudStorageVersion = "2.36.1" +val googleHttpClientVersion = "1.44.1" +val googleProtoCommonVersion = "2.37.1" +val googleProtoIAMVersion = "1.32.1" +val grpcVersion = "1.62.2" +val j2objcAnnotationsVersion = "3.0.0" val jsr305Version = "3.0.2" -val okioVersion = "3.4.0" +val nettyVersion = "4.1.100.Final" +val okioVersion = "3.6.0" val opencensusVersion = "0.31.1" val perfmarkVersion = "0.27.0" -val protobufVersion = "3.25.2" +val protobufVersion = "3.25.3" val algebirdVersion = "0.13.10" val algebraVersion = "2.10.0" @@ -1510,7 +1510,6 @@ lazy val `scio-smb` = project "org.tensorflow" % "tensorflow-core-api" % tensorFlowVersion % Provided, // scio-tensorflow // test "org.apache.beam" % "beam-sdks-java-core" % beamVersion % Test classifier "tests", - "org.apache.beam" % "beam-sdks-java-extensions-avro" % beamVersion % Test classifier "tests", "org.hamcrest" % "hamcrest" % hamcrestVersion % Test, "org.scalatest" %% "scalatest" % scalatestVersion % Test, "org.slf4j" % "slf4j-simple" % slf4jVersion % Test diff --git a/scio-core/src/test/scala/com/spotify/scio/coders/instances/kryo/GrpcSerializerTest.scala b/scio-core/src/test/scala/com/spotify/scio/coders/instances/kryo/GrpcSerializerTest.scala index fb3268ed21..6d9205dc73 100644 --- a/scio-core/src/test/scala/com/spotify/scio/coders/instances/kryo/GrpcSerializerTest.scala +++ b/scio-core/src/test/scala/com/spotify/scio/coders/instances/kryo/GrpcSerializerTest.scala @@ -58,7 +58,7 @@ object GrpcSerializerTest { case _ => false } - private val eqStatusCode: Equality[StatusCode] = { + val eqStatusCode: Equality[StatusCode] = { case (a: StatusCode, b: StatusCode) => a.getCode == b.getCode case _ => false diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/coders/instances/kryo/GcpSerializer.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/coders/instances/kryo/GcpSerializer.scala index fe9ac36196..f4c4b92a7c 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/coders/instances/kryo/GcpSerializer.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/coders/instances/kryo/GcpSerializer.scala @@ -16,13 +16,14 @@ package com.spotify.scio.coders.instances.kryo -import com.google.api.gax.rpc.ApiException +import com.google.api.gax.rpc.{ApiException, StatusCode} import com.google.cloud.bigtable.data.v2.models.MutateRowsException import com.twitter.chill._ private[coders] class MutateRowsExceptionSerializer extends KSerializer[MutateRowsException] { override def write(kryo: Kryo, output: Output, e: MutateRowsException): Unit = { kryo.writeClassAndObject(output, e.getCause) + kryo.writeObject(output, e.getStatusCode.getCode) val failedMutations = e.getFailedMutations kryo.writeObject(output, failedMutations.size()) failedMutations.forEach { fm => @@ -38,6 +39,12 @@ private[coders] class MutateRowsExceptionSerializer extends KSerializer[MutateRo `type`: Class[MutateRowsException] ): MutateRowsException = { val cause = kryo.readClassAndObject(input).asInstanceOf[Throwable] + // generic status code. we lost transport information during serialization + val code = kryo.readObject(input, classOf[StatusCode.Code]) + val statusCode = new StatusCode() { + override def getCode: StatusCode.Code = code + override def getTransportCode: AnyRef = null + } val size = kryo.readObject(input, classOf[Integer]) val failedMutations = new _root_.java.util.ArrayList[MutateRowsException.FailedMutation](size) (0 until size).foreach { _ => @@ -46,6 +53,6 @@ private[coders] class MutateRowsExceptionSerializer extends KSerializer[MutateRo failedMutations.add(MutateRowsException.FailedMutation.create(index, error)) } val retryable = kryo.readObject(input, classOf[Boolean]) - MutateRowsException.create(cause, failedMutations, retryable) + MutateRowsException.create(cause, statusCode, failedMutations, retryable) } } diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/coders/instance/kryo/GcpSerializerTest.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/coders/instance/kryo/GcpSerializerTest.scala index 8d67ae7ad1..ddf4c41360 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/coders/instance/kryo/GcpSerializerTest.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/coders/instance/kryo/GcpSerializerTest.scala @@ -41,7 +41,7 @@ object GcpSerializerTest { implicit val eqMutateRowsException: Equality[MutateRowsException] = { case (a: MutateRowsException, b: MutateRowsException) => eqCause.areEqual(a.getCause, b.getCause) && - a.getStatusCode == b.getStatusCode && + eqStatusCode.areEqual(a.getStatusCode, b.getStatusCode) && a.isRetryable == b.isRetryable && a.getFailedMutations.size() == b.getFailedMutations.size() && a.getFailedMutations.asScala.zip(b.getFailedMutations.asScala).forall { case (x, y) => @@ -71,9 +71,10 @@ class GcpSerializerTest extends AnyFlatSpec with Matchers { "MutateRowsExceptionSerializer" should "roundtrip" in { val cause = new StatusRuntimeException(Status.OK) + val code = GrpcStatusCode.of(Status.OK.getCode) val apiException = new InternalException(cause, GrpcStatusCode.of(Code.OK), false) val failedMutations = List(MutateRowsException.FailedMutation.create(1, apiException)) - val mutateRowsException = MutateRowsException.create(cause, failedMutations.asJava, false) + val mutateRowsException = MutateRowsException.create(cause, code, failedMutations.asJava, false) mutateRowsException coderShould roundtrip() } diff --git a/scio-smb/src/test/avro/user.avsc b/scio-smb/src/test/avro/user.avsc new file mode 100644 index 0000000000..134829746e --- /dev/null +++ b/scio-smb/src/test/avro/user.avsc @@ -0,0 +1,10 @@ +{ + "namespace": "org.apache.beam.sdk.extensions.avro.io", + "type": "record", + "name": "AvroGeneratedUser", + "fields": [ + { "name": "name", "type": "string"}, + { "name": "favorite_number", "type": ["int", "null"]}, + { "name": "favorite_color", "type": ["string", "null"]} + ] +}