diff --git a/scio-parquet/src/main/java/com/spotify/scio/parquet/avro/LogicalTypeSupplier.java b/scio-parquet/src/main/java/com/spotify/scio/parquet/avro/LogicalTypeSupplier.java
new file mode 100644
index 0000000000..baffbbbec5
--- /dev/null
+++ b/scio-parquet/src/main/java/com/spotify/scio/parquet/avro/LogicalTypeSupplier.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2023 Spotify AB.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.spotify.scio.parquet.avro;
+
+import org.apache.avro.Conversions;
+import org.apache.avro.data.TimeConversions;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.specific.SpecificData;
+import org.apache.parquet.avro.SpecificDataSupplier;
+
+/** A SpecificDataSupplier that supplies built-in conversions for Avro LogicalTypes. */
+public class LogicalTypeSupplier extends SpecificDataSupplier {
+  @Override
+  public GenericData get() {
+    SpecificData specificData = new SpecificData();
+    specificData.addLogicalTypeConversion(new TimeConversions.DateConversion());
+    specificData.addLogicalTypeConversion(new TimeConversions.TimeConversion());
+    specificData.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
+    specificData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
+    specificData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
+    specificData.addLogicalTypeConversion(new Conversions.DecimalConversion());
+    specificData.addLogicalTypeConversion(new Conversions.UUIDConversion());
+    return specificData;
+  }
+}
diff --git a/scio-parquet/src/main/java/com/spotify/scio/parquet/avro/ParquetAvroSink.java b/scio-parquet/src/main/java/com/spotify/scio/parquet/avro/ParquetAvroSink.java
index b8175ca4ea..26836c1194 100644
--- a/scio-parquet/src/main/java/com/spotify/scio/parquet/avro/ParquetAvroSink.java
+++ b/scio-parquet/src/main/java/com/spotify/scio/parquet/avro/ParquetAvroSink.java
@@ -27,7 +27,11 @@
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.parquet.avro.AvroDataSupplier;
 import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.avro.AvroWriteSupport;
+import org.apache.parquet.avro.SpecificDataSupplier;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
@@ -107,9 +111,24 @@ public ParquetAvroWriter(
     @Override
     protected void prepareWrite(WritableByteChannel channel) throws Exception {
       BeamOutputFile outputFile = BeamOutputFile.of(channel);
+      Configuration configuration = conf.get();
+
       AvroParquetWriter.Builder<T> builder =
           AvroParquetWriter.<T>builder(outputFile).withSchema(schema);
-      writer = WriterUtils.build(builder, conf.get(), compression);
+
+      // Workaround for PARQUET-2265
+      if (configuration.getClass(AvroWriteSupport.AVRO_DATA_SUPPLIER, null) != null) {
+        Class<? extends AvroDataSupplier> dataModelSupplier =
+            configuration.getClass(
+                AvroWriteSupport.AVRO_DATA_SUPPLIER,
+                SpecificDataSupplier.class,
+                AvroDataSupplier.class);
+        builder =
+            builder.withDataModel(
+                ReflectionUtils.newInstance(dataModelSupplier, configuration).get());
+      }
+
+      writer = WriterUtils.build(builder, configuration, compression);
     }
 
     @Override
diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/package.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/package.scala
new file mode 100644
index 0000000000..e01fdf70c4
--- /dev/null
+++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/package.scala
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2022 Spotify AB
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.spotify.scio
+
+import org.apache.hadoop.conf.Configuration
+
+package object parquet {
+  object ParquetConfiguration {
+    def of(entries: (String, Any)*): Configuration = {
+      val conf = new Configuration()
+      entries.foreach { case (k, v) =>
+        v match {
+          case b: Boolean => conf.setBoolean(k, b)
+          case f: Float => conf.setFloat(k, f)
+          case d: Double => conf.setDouble(k, d)
+          case i: Int => conf.setInt(k, i)
+          case l: Long => conf.setLong(k, l)
+          case s: String => conf.set(k, s)
+          case c: Class[_] => conf.setClass(k, c, c)
+          case _ => conf.set(k, v.toString)
+        }
+      }
+      conf
+    }
+  }
+}
diff --git a/scio-parquet/src/test/scala/com/spotify/scio/parquet/avro/ParquetAvroIOTest.scala b/scio-parquet/src/test/scala/com/spotify/scio/parquet/avro/ParquetAvroIOTest.scala
index 0ea3f9bfe4..bf355a7a32 100644
--- a/scio-parquet/src/test/scala/com/spotify/scio/parquet/avro/ParquetAvroIOTest.scala
+++ b/scio-parquet/src/test/scala/com/spotify/scio/parquet/avro/ParquetAvroIOTest.scala
@@ -22,15 +22,22 @@ import com.spotify.scio._
 import com.spotify.scio.coders.Coder
 import com.spotify.scio.avro._
 import com.spotify.scio.io.{TapSpec, TextIO}
+import com.spotify.scio.parquet.ParquetConfiguration
 import com.spotify.scio.testing._
 import com.spotify.scio.values.{SCollection, WindowOptions}
-import org.apache.avro.generic.GenericRecord
+import org.apache.avro.data.TimeConversions
+import org.apache.avro.{Conversion, Conversions, LogicalType, Schema}
+import org.apache.avro.generic.{GenericData, GenericRecord, GenericRecordBuilder}
+import org.apache.avro.specific.SpecificData
 import org.apache.beam.sdk.options.PipelineOptionsFactory
 import org.apache.beam.sdk.transforms.windowing.{BoundedWindow, IntervalWindow, PaneInfo}
 import org.apache.commons.io.FileUtils
-import org.joda.time.{DateTimeFieldType, Duration, Instant}
+import org.apache.parquet.avro.{AvroDataSupplier, AvroReadSupport, AvroWriteSupport}
+import org.joda.time.{DateTime, DateTimeFieldType, Duration, Instant}
 import org.scalatest.BeforeAndAfterAll
+
+import java.lang
 
 class ParquetAvroIOTest extends ScioIOSpec with TapSpec with BeforeAndAfterAll {
   private val dir = tmpDir
   private val specificRecords = (1 to 10).map(AvroUtils.newSpecificRecord)
@@ -94,6 +101,127 @@ class ParquetAvroIOTest extends ScioIOSpec with TapSpec with BeforeAndAfterAll {
     ()
   }
 
+  it should "write and read SpecificRecords with default logical types" in {
"write and read SpecificRecords with default logical types" in { + val records = + (1 to 10).map(_ => + TestLogicalTypes + .newBuilder() + .setTimestamp(DateTime.now()) + .setDecimal(BigDecimal.decimal(1.0).setScale(2).bigDecimal) + .build() + ) + val path = dir.toPath.resolve("logicalTypesSr").toString + + val sc1 = ScioContext() + sc1 + .parallelize(records) + .saveAsParquetAvroFile( + path, + conf = ParquetConfiguration.of( + AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier] + ) + ) + sc1.run() + () + + val sc2 = ScioContext() + sc2 + .parquetAvroFile[TestLogicalTypes]( + s"$path/*.parquet", + conf = ParquetConfiguration.of( + AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier] + ) + ) + .map(identity) should containInAnyOrder(records) + + sc2.run() + () + } + + it should "write and read GenericRecords with default logical types" in { + + val records: Seq[GenericRecord] = (1 to 10).map { _ => + val gr = new GenericRecordBuilder(TestLogicalTypes.SCHEMA$) + gr.set("timestamp", DateTime.now()) + gr.set( + "decimal", + BigDecimal.decimal(1.0).setScale(2).bigDecimal + ) + gr.build() + } + val path = dir.toPath.resolve("logicalTypesGr").toString + + implicit val coder = { + GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion) + GenericData.get().addLogicalTypeConversion(new Conversions.DecimalConversion) + Coder.avroGenericRecordCoder(TestLogicalTypes.SCHEMA$) + } + + val sc1 = ScioContext() + sc1 + .parallelize(records) + .saveAsParquetAvroFile( + path, + schema = TestLogicalTypes.SCHEMA$, + conf = ParquetConfiguration.of( + AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier] + ) + ) + sc1.run() + () + + val sc2 = ScioContext() + sc2 + .parquetAvroFile[GenericRecord]( + s"$path/*.parquet", + projection = TestLogicalTypes.SCHEMA$, + conf = ParquetConfiguration.of( + AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier] + ) + ) + .map(identity) should containInAnyOrder(records) + + sc2.run() + () + } + + it should "write and read SpecificRecords with custom logical types" in { + val records = + (1 to 10).map(_ => + TestLogicalTypes + .newBuilder() + .setTimestamp(DateTime.now()) + .setDecimal(BigDecimal.decimal(1.0).setScale(2).bigDecimal) + .build() + ) + val path = dir.toPath.resolve("logicalTypesCustom").toString + + val sc1 = ScioContext() + sc1 + .parallelize(records) + .saveAsParquetAvroFile( + path, + conf = ParquetConfiguration.of( + AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[CustomLogicalTypeSupplier] + ) + ) + sc1.run() + () + + val sc2 = ScioContext() + sc2 + .parquetAvroFile[TestLogicalTypes]( + s"$path/*.parquet", + conf = ParquetConfiguration.of( + AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[CustomLogicalTypeSupplier] + ) + ) + .map(identity) should containInAnyOrder(records) + + sc2.run() + () + } + it should "read with incomplete projection" in { val dir = tmpDir @@ -301,3 +429,25 @@ object ParquetTestJob { sc.run().waitUntilDone() } } + +case class CustomLogicalTypeSupplier() extends AvroDataSupplier { + override def get(): GenericData = { + val specificData = new SpecificData() + specificData.addLogicalTypeConversion(new Conversion[DateTime] { + override def getConvertedType: Class[DateTime] = classOf[DateTime] + override def getLogicalTypeName: String = "timestamp-millis" + + override def toLong( + value: DateTime, + schema: Schema, + `type`: LogicalType + ): lang.Long = + value.toInstant.getMillis + + override def fromLong(value: lang.Long, schema: Schema, `type`: 
+        Instant.ofEpochMilli(value).toDateTime
+    })
+    specificData.addLogicalTypeConversion(new Conversions.DecimalConversion)
+    specificData
+  }
+}
diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroLogicalTypeSupplier.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroLogicalTypeSupplier.java
new file mode 100644
index 0000000000..a3875f24ff
--- /dev/null
+++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroLogicalTypeSupplier.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2023 Spotify AB.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.beam.sdk.extensions.smb;
+
+import org.apache.avro.Conversions;
+import org.apache.avro.data.TimeConversions;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.specific.SpecificData;
+import org.apache.parquet.avro.SpecificDataSupplier;
+
+public class AvroLogicalTypeSupplier extends SpecificDataSupplier {
+  @Override
+  public GenericData get() {
+    SpecificData specificData = new SpecificData();
+    specificData.addLogicalTypeConversion(new TimeConversions.DateConversion());
+    specificData.addLogicalTypeConversion(new TimeConversions.TimeConversion());
+    specificData.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
+    specificData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
+    specificData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
+    specificData.addLogicalTypeConversion(new Conversions.DecimalConversion());
+    specificData.addLogicalTypeConversion(new Conversions.UUIDConversion());
+    return specificData;
+  }
+}
diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperations.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperations.java
index 959fb901e4..f0009cdfd8 100644
--- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperations.java
+++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperations.java
@@ -32,9 +32,13 @@
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.parquet.avro.AvroDataSupplier;
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.AvroWriteSupport;
+import org.apache.parquet.avro.SpecificDataSupplier;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.hadoop.ParquetOutputFormat;
@@ -194,15 +198,30 @@ private ParquetAvroSink(
     @Override
     public void open(WritableByteChannel channel) throws IOException {
       // https://github.com/apache/parquet-mr/tree/master/parquet-hadoop#class-parquetoutputformat
+      final Configuration configuration = conf.get();
+
       int rowGroupSize =
-          conf.get().getInt(ParquetOutputFormat.BLOCK_SIZE, ParquetWriter.DEFAULT_BLOCK_SIZE);
-      writer =
+          configuration.getInt(ParquetOutputFormat.BLOCK_SIZE, ParquetWriter.DEFAULT_BLOCK_SIZE);
+      AvroParquetWriter.Builder<ValueT> builder =
           AvroParquetWriter.<ValueT>builder(new ParquetOutputFile(channel))
               .withSchema(schemaSupplier.get())
               .withCompressionCodec(compression)
-              .withConf(conf.get())
-              .withRowGroupSize(rowGroupSize)
-              .build();
+              .withConf(configuration)
+              .withRowGroupSize(rowGroupSize);
+
+      // Workaround for PARQUET-2265
+      if (configuration.getClass(AvroWriteSupport.AVRO_DATA_SUPPLIER, null) != null) {
+        Class<? extends AvroDataSupplier> dataModelSupplier =
+            configuration.getClass(
+                AvroWriteSupport.AVRO_DATA_SUPPLIER,
+                SpecificDataSupplier.class,
+                AvroDataSupplier.class);
+        builder =
+            builder.withDataModel(
+                ReflectionUtils.newInstance(dataModelSupplier, configuration).get());
+      }
+
+      writer = builder.build();
     }
 
     @Override
diff --git a/scio-smb/src/test/avro/schema.avsc b/scio-smb/src/test/avro/schema.avsc
new file mode 100644
index 0000000000..93e46c8676
--- /dev/null
+++ b/scio-smb/src/test/avro/schema.avsc
@@ -0,0 +1,25 @@
+[
+{
+  "type": "record",
+  "name": "TestLogicalTypes",
+  "namespace": "com.spotify.scio.smb",
+  "doc": "Record for testing logical types",
+  "fields": [
+    {
+      "name": "timestamp",
+      "type": {
+        "type": "long", "logicalType": "timestamp-millis"
+      }
+    },
+    {
+      "name": "decimal",
+      "type": {
+        "type": "bytes",
+        "logicalType": "decimal",
+        "precision": 4,
+        "scale": 2
+      }
+    }
+  ]
+}
+]
diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperationsTest.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperationsTest.java
index deb7161ab5..1c2447c335 100644
--- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperationsTest.java
+++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperationsTest.java
@@ -20,6 +20,7 @@
 import static org.apache.beam.sdk.extensions.smb.TestUtils.fromFolder;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 
+import com.spotify.scio.smb.TestLogicalTypes;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -35,14 +36,20 @@
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.avro.AvroDataSupplier;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.AvroWriteSupport;
 import org.apache.parquet.filter2.predicate.FilterApi;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.hamcrest.MatcherAssert;
+import org.joda.time.DateTime;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import scala.math.BigDecimal;
 
 /** Unit tests for {@link ParquetAvroFileOperations}. */
 public class ParquetAvroFileOperationsTest {
@@ -114,6 +121,42 @@ public void testSpecificRecord() throws Exception {
     Assert.assertEquals(records, actual);
   }
 
+  @Test
+  public void testLogicalTypes() throws Exception {
+    final Configuration conf = new Configuration();
+    conf.setClass(
+        AvroWriteSupport.AVRO_DATA_SUPPLIER, AvroLogicalTypeSupplier.class, AvroDataSupplier.class);
+    conf.setClass(
+        AvroReadSupport.AVRO_DATA_SUPPLIER, AvroLogicalTypeSupplier.class, AvroDataSupplier.class);
+
+    final ParquetAvroFileOperations<TestLogicalTypes> fileOperations =
+        ParquetAvroFileOperations.of(
+            TestLogicalTypes.getClassSchema(), CompressionCodecName.UNCOMPRESSED, conf);
+    final ResourceId file =
+        fromFolder(output)
+            .resolve("file.parquet", ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
+
+    final List<TestLogicalTypes> records =
+        IntStream.range(0, 10)
+            .mapToObj(
+                i ->
+                    TestLogicalTypes.newBuilder()
+                        .setTimestamp(DateTime.now())
+                        .setDecimal(BigDecimal.decimal(1.0).setScale(2).bigDecimal())
+                        .build())
+            .collect(Collectors.toList());
+    final FileOperations.Writer<TestLogicalTypes> writer = fileOperations.createWriter(file);
+    for (TestLogicalTypes record : records) {
+      writer.write(record);
+    }
+    writer.close();
+
+    final List<TestLogicalTypes> actual = new ArrayList<>();
+    fileOperations.iterator(file).forEachRemaining(actual::add);
+
+    Assert.assertEquals(records, actual);
+  }
+
   @Test
   public void testProjection() throws Exception {
     final ResourceId file =
diff --git a/scio-test/src/test/avro/schema.avsc b/scio-test/src/test/avro/schema.avsc
index 0165509cc9..b3d294e7de 100644
--- a/scio-test/src/test/avro/schema.avsc
+++ b/scio-test/src/test/avro/schema.avsc
@@ -81,4 +81,28 @@
       "null", {"type": "array", "items": "string"}
     ], "default": null}
   ]
-}]
+},
+{
+  "type": "record",
+  "name": "TestLogicalTypes",
+  "namespace": "com.spotify.scio.avro",
+  "doc": "Record for testing logical types",
+  "fields": [
+    {
+      "name": "timestamp",
+      "type": {
+        "type": "long", "logicalType": "timestamp-millis"
+      }
+    },
+    {
+      "name": "decimal",
+      "type": {
+        "type": "bytes",
+        "logicalType": "decimal",
+        "precision": 4,
+        "scale": 2
+      }
+    }
+  ]
+}
+]
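
Usage sketch: based on the ParquetAvroIOTest cases in this diff, this is how a pipeline
opts in to the logical-type conversions added here. `MyRecord` is a hypothetical Avro
SpecificRecord compiled with logical-type fields, and the paths are placeholders.

    import com.spotify.scio.ScioContext
    import com.spotify.scio.parquet.ParquetConfiguration
    import com.spotify.scio.parquet.avro._
    import org.apache.parquet.avro.{AvroReadSupport, AvroWriteSupport}

    val sc = ScioContext()

    // Reading: supply a data model that carries the logical-type conversions.
    // MyRecord is a placeholder for a generated Avro SpecificRecord class.
    val data = sc
      .parquetAvroFile[MyRecord](
        "input/*.parquet",
        conf = ParquetConfiguration.of(
          AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
        )
      )
      .map(identity)

    // Writing: the same supplier on the write side; ParquetAvroSink applies it to the
    // writer builder directly (the PARQUET-2265 workaround above).
    data.saveAsParquetAvroFile(
      "output",
      conf = ParquetConfiguration.of(
        AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
      )
    )

    sc.run()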