
Commit

Use default beam avro version (#5415)
RustedBones authored Jul 8, 2024
1 parent c915a53 commit 80a0ddb
Showing 12 changed files with 150 additions and 151 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
@@ -358,8 +358,8 @@ jobs:
         with:
           token: ${{ secrets.CODECOV_TOKEN }}
 
-  avro-latest:
-    name: Test Latest Avro
+  avro-legacy:
+    name: Test Legacy Avro
     if: github.event_name != 'pull_request' && github.ref == 'refs/heads/main'
     strategy:
       matrix:
@@ -388,7 +388,7 @@ jobs:
 
       - name: Test
         env:
-          JAVA_OPTS: '-Davro.version=1.11.3'
+          JAVA_OPTS: '-Davro.version=1.8.2'
         run: sbt '++ ${{ matrix.scala }}' scio-avro/test
 
   it-test:
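With the build's default Avro now tracking the version Beam ships (1.11.x, see project/plugins.sbt below), this nightly job flips from testing a newer-than-default Avro to guarding the legacy 1.8.2 line. The WorkflowJob entry in build.sbt below is the sbt-github-actions definition this workflow file is generated from, and the run can be reproduced locally with JAVA_OPTS='-Davro.version=1.8.2' sbt scio-avro/test.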
17 changes: 9 additions & 8 deletions build.sbt
@@ -317,14 +317,14 @@ ThisBuild / githubWorkflowAddedJobs ++= Seq(
     javas = List(javaDefault)
   ),
   WorkflowJob(
-    "avro-latest",
-    "Test Latest Avro",
+    "avro-legacy",
+    "Test Legacy Avro",
     WorkflowStep.CheckoutFull ::
       WorkflowStep.SetupJava(List(javaDefault)) :::
       List(
         WorkflowStep.Sbt(
           List("scio-avro/test"),
-          env = Map("JAVA_OPTS" -> "-Davro.version=1.11.3"),
+          env = Map("JAVA_OPTS" -> "-Davro.version=1.8.2"),
           name = Some("Test")
         )
       ),
@@ -815,7 +815,7 @@ lazy val `scio-test-parquet` = project
       "com.spotify" %% "magnolify-parquet" % magnolifyVersion,
       "org.apache.avro" % "avro" % avroVersion,
       "org.apache.hadoop" % "hadoop-common" % hadoopVersion,
-      "org.apache.parquet" % "parquet-avro" % parquetVersion excludeAll (Exclude.avro),
+      "org.apache.parquet" % "parquet-avro" % parquetVersion,
       "org.apache.parquet" % "parquet-column" % parquetVersion,
       "org.apache.parquet" % "parquet-common" % parquetVersion,
       "org.apache.parquet" % "parquet-hadoop" % parquetVersion,
@@ -853,7 +853,7 @@ lazy val `scio-avro` = project
       "me.lyh" %% "protobuf-generic" % protobufGenericVersion,
       "org.apache.avro" % "avro" % avroVersion,
       "org.apache.beam" % "beam-sdks-java-core" % beamVersion,
-      "org.apache.beam" % "beam-sdks-java-extensions-avro" % beamVersion excludeAll (Exclude.avro),
+      "org.apache.beam" % "beam-sdks-java-extensions-avro" % beamVersion,
       "org.apache.beam" % "beam-vendor-guava-32_1_2-jre" % beamVendorVersion,
       "org.slf4j" % "slf4j-api" % slf4jVersion,
       // test
@@ -927,7 +927,7 @@ lazy val `scio-google-cloud-platform` = project
       "org.apache.avro" % "avro" % avroVersion,
       "org.apache.beam" % "beam-sdks-java-core" % beamVersion,
       "org.apache.beam" % "beam-sdks-java-extensions-google-cloud-platform-core" % beamVersion,
-      "org.apache.beam" % "beam-sdks-java-io-google-cloud-platform" % beamVersion excludeAll (Exclude.avro),
+      "org.apache.beam" % "beam-sdks-java-io-google-cloud-platform" % beamVersion,
       "org.apache.beam" % "beam-vendor-guava-32_1_2-jre" % beamVendorVersion,
       "org.slf4j" % "slf4j-api" % slf4jVersion,
       // patch jackson versions
@@ -1183,7 +1183,7 @@ lazy val `scio-parquet` = project
       "org.apache.beam" % "beam-vendor-guava-32_1_2-jre" % beamVendorVersion,
       "org.apache.hadoop" % "hadoop-common" % hadoopVersion,
       "org.apache.hadoop" % "hadoop-mapreduce-client-core" % hadoopVersion,
-      "org.apache.parquet" % "parquet-avro" % parquetVersion excludeAll (Exclude.avro),
+      "org.apache.parquet" % "parquet-avro" % parquetVersion,
       "org.apache.parquet" % "parquet-column" % parquetVersion,
       "org.apache.parquet" % "parquet-common" % parquetVersion,
       "org.apache.parquet" % "parquet-hadoop" % parquetVersion,
@@ -1557,7 +1557,7 @@ lazy val `scio-smb` = project
       "org.apache.beam" % "beam-sdks-java-io-google-cloud-platform" % beamVersion % Provided, // scio-gcp
       "org.apache.beam" % "beam-sdks-java-io-hadoop-common" % beamVersion % Provided, // scio-parquet
       "org.apache.hadoop" % "hadoop-common" % hadoopVersion % Provided, // scio-parquet
-      "org.apache.parquet" % "parquet-avro" % parquetVersion % Provided excludeAll (Exclude.avro), // scio-parquet
+      "org.apache.parquet" % "parquet-avro" % parquetVersion % Provided, // scio-parquet
       "org.apache.parquet" % "parquet-column" % parquetVersion % Provided, // scio-parquet
       "org.apache.parquet" % "parquet-common" % parquetVersion % Provided, // scio-parquet
       "org.apache.parquet" % "parquet-hadoop" % parquetVersion % Provided, // scio-parquet
@@ -1878,6 +1878,7 @@ ThisBuild / dependencyOverrides ++= Seq(
   "io.opencensus" % "opencensus-contrib-http-util" % opencensusVersion,
   "io.perfmark" % "perfmark-api" % perfmarkVersion,
   "org.apache.avro" % "avro" % avroVersion,
+  "org.apache.commons" % "commons-compress" % commonsCompressVersion,
   "org.apache.commons" % "commons-lang3" % commonsLang3Version,
   "org.apache.httpcomponents" % "httpclient" % httpClientVersion,
   "org.apache.httpcomponents" % "httpcore" % httpCoreVersion,
2 changes: 0 additions & 2 deletions project/Exclude.scala
@@ -1,8 +1,6 @@
 import sbt._
 
 object Exclude {
-  // do not pull newer avro version
-  val avro: ExclusionRule = "org.apache.avro" % "avro"
   val gcsio: ExclusionRule = "com.google.cloud.bigdataoss" % "gcsio"
   // do not pull newer jackson version
   val jacksons: Seq[ExclusionRule] = Seq(
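With the default Avro now aligned with what Beam and Parquet pull in transitively, the rule stripping org.apache.avro:avro from those artifacts (used as excludeAll (Exclude.avro) throughout build.sbt above) has no remaining call sites, so it is deleted. For reference, a standalone equivalent of the removed rule:

import sbt._

// Strip org.apache.avro:avro from a dependency's transitive graph so the
// explicitly pinned Avro version stays in charge.
val avroExclusion: ExclusionRule = ExclusionRule("org.apache.avro", "avro")

// Usage, as it appeared in build.sbt before this commit:
// "org.apache.parquet" % "parquet-avro" % parquetVersion excludeAll avroExclusion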
2 changes: 1 addition & 1 deletion project/plugins.sbt
@@ -18,7 +18,7 @@ addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.5.3")
 addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.1.0")
 addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.4.7")
 
-val avroVersion = sys.props.get("avro.version").getOrElse("1.8.2")
+val avroVersion = sys.props.get("avro.version").getOrElse("1.11.3")
 libraryDependencies ++= Seq(
   "org.apache.avro" % "avro-compiler" % avroVersion,
   "org.typelevel" %% "scalac-options" % "0.1.5"
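The -Davro.version escape hatch survives the default flip: a JVM system property set through JAVA_OPTS (as the avro-legacy job does) still takes precedence over the new 1.11.3 fallback. A minimal sketch of the same lookup, runnable on its own (object name is hypothetical):

object AvroVersionDemo extends App {
  // prints 1.8.2 when launched with -Davro.version=1.8.2, else the default
  val avroVersion: String = sys.props.get("avro.version").getOrElse("1.11.3")
  println(s"building against Avro $avroVersion")
}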
@@ -96,7 +96,7 @@ object BeamExample {
       // Beam `PTransform`
       .applyTransform(window)
       // Scio `map` transform
-      .map(a => KV.of(a.getName.toString, a.getAmount))
+      .map(a => KV.of(a.getName.toString, Double.box(a.getAmount)))
       // Beam `PTransform`
       .applyTransform(sumByKey)
       // Scio `map` transform
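KV.of is a Java generic factory, so its value side must be a reference type; scala.Double is the unboxed primitive and no longer satisfies inference here, plausibly because the record regenerated under Avro 1.11 exposes getAmount as a primitive double. Double.box makes the java.lang.Double conversion explicit. A self-contained sketch (the amount value stands in for a.getAmount):

import org.apache.beam.sdk.values.KV

object KvBoxingDemo extends App {
  val amount: Double = 42.0
  // KV's value-side type parameter needs a reference type, not a primitive
  val kv: KV[String, java.lang.Double] = KV.of("user", Double.box(amount))
  println(kv) // prints something like KV{user, 42.0}
}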
@@ -18,6 +18,7 @@
 package com.spotify.scio.extra.bigquery
 
 import com.spotify.scio.extra.bigquery.AvroConverters.AvroConversionException
+
 import java.math.{BigDecimal => JBigDecimal}
 import java.nio.ByteBuffer
 import java.util
@@ -27,48 +28,61 @@ import org.apache.avro.Schema
 import org.apache.avro.generic.GenericData.EnumSymbol
 import org.apache.avro.generic.{GenericFixed, IndexedRecord}
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding
-import org.joda.time.format.DateTimeFormat
-import org.joda.time.{DateTime, LocalDate, LocalTime}
 
+import java.time.format.DateTimeFormatter
+import java.time.{Instant, LocalDate, LocalTime, ZoneOffset}
 import scala.jdk.CollectionConverters._
 
-/**
- * Converts an [[org.apache.avro.generic.IndexedRecord IndexedRecord]] into a
- * [[com.spotify.scio.bigquery.TableRow TableRow]].
- */
-private[bigquery] trait ToTableRow {
-  private lazy val encodingPropName: String = "bigquery.bytes.encoder"
-  private lazy val base64Encoding: BaseEncoding = BaseEncoding.base64()
-  private lazy val hexEncoding: BaseEncoding = BaseEncoding.base16()
+private object ToTableRow {
+  private lazy val EncodingPropName: String = "bigquery.bytes.encoder"
+  private lazy val Base64Encoding: BaseEncoding = BaseEncoding.base64()
+  private lazy val HexEncoding: BaseEncoding = BaseEncoding.base16()
 
   // YYYY-[M]M-[D]D
-  private[this] val localDateFormatter =
-    DateTimeFormat.forPattern("yyyy-MM-dd").withZoneUTC()
+  private lazy val JodaLocalDateFormatter =
+    org.joda.time.format.DateTimeFormat.forPattern("yyyy-MM-dd").withZoneUTC()
+  private lazy val LocalDateFormatter =
+    DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC)
 
   // YYYY-[M]M-[D]D[( |T)[H]H:[M]M:[S]S[.DDDDDD]]
-  private[this] val localTimeFormatter =
-    DateTimeFormat.forPattern("HH:mm:ss.SSSSSS")
+  private lazy val JodaLocalTimeFormatter =
+    org.joda.time.format.DateTimeFormat.forPattern("HH:mm:ss.SSSSSS")
+  private lazy val LocalTimeFormatter =
+    DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS")
 
   // YYYY-[M]M-[D]D[( |T)[H]H:[M]M:[S]S[.DDDDDD]][time zone]
-  private[this] val timestampFormatter =
-    DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS")
+  private lazy val JodaTimestampFormatter =
+    org.joda.time.format.DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS")
+  private lazy val TimestampFormatter =
+    DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS").withZone(ZoneOffset.UTC)
+}
 
+/**
+ * Converts an [[org.apache.avro.generic.IndexedRecord IndexedRecord]] into a
+ * [[com.spotify.scio.bigquery.TableRow TableRow]].
+ */
+private[bigquery] trait ToTableRow {
+  import ToTableRow._
 
   private[bigquery] def toTableRowField(fieldValue: Any, field: Schema.Field): Any =
     fieldValue match {
-      case x: CharSequence          => x.toString
-      case x: EnumSymbol            => x.toString
-      case x: Enum[_]               => x.name()
-      case x: JBigDecimal           => x.toString
-      case x: Number                => x
-      case x: Boolean               => x
-      case x: GenericFixed          => encodeByteArray(x.bytes(), field.schema())
-      case x: ByteBuffer            => encodeByteArray(toByteArray(x), field.schema())
-      case x: util.Map[_, _]        => toTableRowFromMap(x.asScala, field)
-      case x: java.lang.Iterable[_] => toTableRowFromIterable(x.asScala, field)
-      case x: IndexedRecord         => AvroConverters.toTableRow(x)
-      case x: LocalDate             => localDateFormatter.print(x)
-      case x: LocalTime             => localTimeFormatter.print(x)
-      case x: DateTime              => timestampFormatter.print(x)
+      case x: CharSequence            => x.toString
+      case x: EnumSymbol              => x.toString
+      case x: Enum[_]                 => x.name()
+      case x: JBigDecimal             => x.toString
+      case x: Number                  => x
+      case x: Boolean                 => x
+      case x: GenericFixed            => encodeByteArray(x.bytes(), field.schema())
+      case x: ByteBuffer              => encodeByteArray(toByteArray(x), field.schema())
+      case x: util.Map[_, _]          => toTableRowFromMap(x.asScala, field)
+      case x: java.lang.Iterable[_]   => toTableRowFromIterable(x.asScala, field)
+      case x: IndexedRecord           => AvroConverters.toTableRow(x)
+      case x: LocalDate               => LocalDateFormatter.format(x)
+      case x: LocalTime               => LocalTimeFormatter.format(x)
+      case x: Instant                 => TimestampFormatter.format(x)
+      case x: org.joda.time.LocalDate => JodaLocalDateFormatter.print(x)
+      case x: org.joda.time.LocalTime => JodaLocalTimeFormatter.print(x)
+      case x: org.joda.time.DateTime  => JodaTimestampFormatter.print(x)
       case _ =>
         throw AvroConversionException(
           s"ToTableRow conversion failed:" +
@@ -101,12 +115,12 @@ private[bigquery] trait ToTableRow {
       .asJava
 
   private def encodeByteArray(bytes: Array[Byte], fieldSchema: Schema): String =
-    Option(fieldSchema.getProp(encodingPropName)) match {
-      case Some("BASE64") => base64Encoding.encode(bytes)
-      case Some("HEX")    => hexEncoding.encode(bytes)
+    Option(fieldSchema.getProp(EncodingPropName)) match {
+      case Some("BASE64") => Base64Encoding.encode(bytes)
+      case Some("HEX")    => HexEncoding.encode(bytes)
       case Some(encoding) =>
         throw AvroConversionException(s"Unsupported encoding $encoding")
-      case None => base64Encoding.encode(bytes)
+      case None => Base64Encoding.encode(bytes)
     }
 
   private def toByteArray(buffer: ByteBuffer) = {
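Two changes land together above: the converter migrates from Joda-Time to java.time while keeping Joda cases so callers passing Joda values still convert, and the shared state moves from the trait into a companion object, so the formatters are created once rather than once per instance that mixes the trait in. A minimal sketch of that companion-object pattern, with hypothetical names:

import java.time.LocalDate
import java.time.format.DateTimeFormatter

object DateHelpers {
  // allocated lazily and only once, however many instances mix in the trait
  lazy val DateFormatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
}

trait DateHelpers {
  import DateHelpers._
  def render(d: LocalDate): String = DateFormatter.format(d)
}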
@@ -24,10 +24,10 @@ import com.spotify.scio.bigquery.TableRow
 import org.apache.avro.generic.GenericRecordBuilder
 import org.apache.avro.generic.GenericData.EnumSymbol
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding
-import org.joda.time.{DateTime, LocalDate, LocalTime}
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.matchers.should.Matchers
 
+import java.time.{Instant, LocalDate, LocalTime}
 import scala.jdk.CollectionConverters._
 
 class ToTableRowTest extends AnyFlatSpec with Matchers {
@@ -95,9 +95,9 @@ class ToTableRowTest extends AnyFlatSpec with Matchers {
 
   val date: LocalDate = LocalDate.parse("2019-10-29")
   val timeMillis: LocalTime = LocalTime.parse("01:24:52.211")
-  val timeMicros = 1234L
-  val timestampMillis: DateTime = DateTime.parse("2019-10-29T05:24:52.215")
-  val timestampMicros = 4325L
+  val timeMicros: LocalTime = LocalTime.parse("01:24:52.211112")
+  val timestampMillis: Instant = Instant.parse("2019-10-29T05:24:52.215Z")
+  val timestampMicros: Instant = Instant.parse("2019-10-29T05:24:52.215521Z")
   val decimal = new JBigDecimal("3.14")
 
   val expectedLogicalTypeOutput: TableRow = new TableRow()
@@ -111,9 +111,9 @@
     .set("dateField", "2019-10-29")
     .set("decimalField", decimal.toString)
     .set("timeMillisField", "01:24:52.211000")
-    .set("timeMicrosField", timeMicros)
+    .set("timeMicrosField", "01:24:52.211112")
     .set("timestampMillisField", "2019-10-29T05:24:52.215000")
-    .set("timestampMicrosField", timestampMicros)
+    .set("timestampMicrosField", "2019-10-29T05:24:52.215521")
 
   "ToTableRowWithLogicalType" should "convert a SpecificRecord with Logical Types to TableRow" in {
     val specificRecord = AvroExampleWithLogicalType
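The expected strings pin down the new formatter behavior: fractional seconds are rendered with exactly six digits, so millisecond-precision inputs gain trailing zeros while microsecond inputs pass through unchanged. A quick standalone check of the timestamp case:

import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object TimestampFormatDemo extends App {
  val TimestampFormatter =
    DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS").withZone(ZoneOffset.UTC)

  // millisecond input, padded: 2019-10-29T05:24:52.215000
  println(TimestampFormatter.format(Instant.parse("2019-10-29T05:24:52.215Z")))
  // microsecond input, preserved: 2019-10-29T05:24:52.215521
  println(TimestampFormatter.format(Instant.parse("2019-10-29T05:24:52.215521Z")))
}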
@@ -112,22 +112,8 @@ public ParquetAvroWriter(
     protected void prepareWrite(WritableByteChannel channel) throws Exception {
       BeamOutputFile outputFile = BeamOutputFile.of(channel);
       Configuration configuration = conf.get();
-
       AvroParquetWriter.Builder<T> builder =
           AvroParquetWriter.<T>builder(outputFile).withSchema(schema);
-
-      // Workaround for PARQUET-2265
-      if (configuration.getClass(AvroWriteSupport.AVRO_DATA_SUPPLIER, null) != null) {
-        Class<? extends AvroDataSupplier> dataModelSupplier =
-            configuration.getClass(
-                AvroWriteSupport.AVRO_DATA_SUPPLIER,
-                SpecificDataSupplier.class,
-                AvroDataSupplier.class);
-        builder =
-            builder.withDataModel(
-                ReflectionUtils.newInstance(dataModelSupplier, configuration).get());
-      }
-
       writer = WriterUtils.build(builder, configuration, compression);
     }
 
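The removed block force-fed the configured AvroDataSupplier into the builder as a workaround for PARQUET-2265; dropping it suggests the Parquet release this build pins now honors the supplier straight from the Hadoop Configuration. Callers would keep setting it the standard way; a sketch:

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.{AvroDataSupplier, AvroWriteSupport, SpecificDataSupplier}

object SupplierConfigDemo extends App {
  val conf = new Configuration()
  // the standard knob read by parquet-avro's write path
  conf.setClass(
    AvroWriteSupport.AVRO_DATA_SUPPLIER,
    classOf[SpecificDataSupplier],
    classOf[AvroDataSupplier]
  )
  println(conf.get(AvroWriteSupport.AVRO_DATA_SUPPLIER))
}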
