Add opt-in Logical type support to 0.11.x branch (#4792)
* Support logical types in SpecificRecord Parquet writes (#4772)

* Reproduce SpecificRecords/logicalType bug

* Fix bug

* Cleanup test

* logical types must be added from worker VM, not launcher

* Fix for both reads and writes

* cleanup

* Don't set supplier if user has explicitly configured one

* Test for custom logical type provider

* Pass dataModel to AvroParquetWriter.Builder

* Only override .withDataModel if Configuration key exists

* Isolate SpecificData instances

* fix test

* update copyright year

* cleanup

* Add support for scio-smb Parquet writes

* Manage config defaults via core-site.xml

* Fix formatting

* Should be a standalone class

* add test

* +header

* newline

* Add remaining Converters

* simplify SpecificData creation

* Correctly encode logical types in GenericRecord

* remove unused import

* Make Parquet logical type support opt-in (#4782)

* Make logical type support for Parquet-Avro opt-in

* Update documentation

* conf->jobConf

* scalafmt

* Update scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala

Co-authored-by: Michel Davit <micheld@spotify.com>

* Add parens

---------

Co-authored-by: Michel Davit <micheld@spotify.com>

---------

Co-authored-by: Michel Davit <micheld@spotify.com>
clairemcginty and RustedBones authored May 3, 2023
1 parent c4c51d9 commit 5de2e2a
Showing 9 changed files with 408 additions and 9 deletions.
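Before the file-by-file diff, a minimal sketch of the opt-in behavior this commit introduces. MyRecord and records are hypothetical stand-ins for a generated Avro SpecificRecord class and its values; everything else mirrors the APIs added below. Without the supplier keys set, logical type fields keep their raw underlying representation.

import com.spotify.scio.ScioContext
import com.spotify.scio.parquet.ParquetConfiguration
import com.spotify.scio.parquet.avro._
import org.apache.parquet.avro.{AvroReadSupport, AvroWriteSupport}

val sc = ScioContext()

// Write side: opt in by pointing the write supplier key at LogicalTypeSupplier.
sc.parallelize(records) // records: Seq[MyRecord], hypothetical
  .saveAsParquetAvroFile(
    "/tmp/output",
    conf = ParquetConfiguration.of(
      AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
    )
  )
sc.run()

// Read side: the reader has its own key, configured the same way.
val sc2 = ScioContext()
sc2.parquetAvroFile[MyRecord](
  "/tmp/output/*.parquet",
  conf = ParquetConfiguration.of(
    AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
  )
)
sc2.run()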
@@ -0,0 +1,40 @@
/*
* Copyright 2023 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.scio.parquet.avro;

import org.apache.avro.Conversions;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.specific.SpecificData;
import org.apache.parquet.avro.SpecificDataSupplier;

/** A SpecificDataSupplier that supplies built-in conversions for Avro LogicalTypes. */
public class LogicalTypeSupplier extends SpecificDataSupplier {
@Override
public GenericData get() {
SpecificData specificData = new SpecificData();
specificData.addLogicalTypeConversion(new TimeConversions.DateConversion());
specificData.addLogicalTypeConversion(new TimeConversions.TimeConversion());
specificData.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
specificData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
specificData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
specificData.addLogicalTypeConversion(new Conversions.DecimalConversion());
specificData.addLogicalTypeConversion(new Conversions.UUIDConversion());
return specificData;
}
}
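A small sketch of what the supplier above yields, assuming the Avro 1.8-era joda conversions that these TimeConversions classes provide (the tests later in this diff rely on the same DateTime mapping):

import org.apache.avro.LogicalTypes

val model = new LogicalTypeSupplier().get()
// With TimestampConversion registered, timestamp-millis fields round-trip
// as org.joda.time.DateTime rather than raw Long values.
val conversion = model.getConversionFor(LogicalTypes.timestampMillis())
assert(conversion.getConvertedType == classOf[org.joda.time.DateTime])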
@@ -27,7 +27,11 @@
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.parquet.avro.AvroDataSupplier;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.avro.SpecificDataSupplier;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

@@ -107,9 +111,24 @@ public ParquetAvroWriter(
@Override
protected void prepareWrite(WritableByteChannel channel) throws Exception {
BeamOutputFile outputFile = BeamOutputFile.of(channel);
Configuration configuration = conf.get();

AvroParquetWriter.Builder<T> builder =
AvroParquetWriter.<T>builder(outputFile).withSchema(schema);
writer = WriterUtils.build(builder, conf.get(), compression);

// Workaround for PARQUET-2265
if (configuration.getClass(AvroWriteSupport.AVRO_DATA_SUPPLIER, null) != null) {
Class<? extends AvroDataSupplier> dataModelSupplier =
configuration.getClass(
AvroWriteSupport.AVRO_DATA_SUPPLIER,
SpecificDataSupplier.class,
AvroDataSupplier.class);
builder =
builder.withDataModel(
ReflectionUtils.newInstance(dataModelSupplier, configuration).get());
}

writer = WriterUtils.build(builder, configuration, compression);
}

@Override
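The guard in prepareWrite above hinges on Hadoop's Configuration.getClass returning the supplied default (here null) when the key is absent, so .withDataModel is only applied on explicit opt-in, sidestepping PARQUET-2265 for everyone else. A standalone sketch of that check:

import com.spotify.scio.parquet.avro.LogicalTypeSupplier
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.{AvroDataSupplier, AvroWriteSupport}

val conf = new Configuration()
// Key unset: getClass falls through to the null default, so no data model override.
assert(conf.getClass(AvroWriteSupport.AVRO_DATA_SUPPLIER, null) == null)

// Explicit opt-in: the key now resolves, and the writer swaps in the supplier.
conf.setClass(
  AvroWriteSupport.AVRO_DATA_SUPPLIER,
  classOf[LogicalTypeSupplier],
  classOf[AvroDataSupplier]
)
assert(conf.getClass(AvroWriteSupport.AVRO_DATA_SUPPLIER, null) != null)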
40 changes: 40 additions & 0 deletions scio-parquet/src/main/scala/com/spotify/scio/parquet/package.scala
@@ -0,0 +1,40 @@
/*
* Copyright 2022 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.spotify.scio

import org.apache.hadoop.conf.Configuration

package object parquet {
object ParquetConfiguration {
def of(entries: (String, Any)*): Configuration = {
val conf = new Configuration()
entries.foreach { case (k, v) =>
v match {
case b: Boolean => conf.setBoolean(k, b)
case f: Float => conf.setFloat(k, f)
case d: Double => conf.setDouble(k, d)
case i: Int => conf.setInt(k, i)
case l: Long => conf.setLong(k, l)
case s: String => conf.set(k, s)
case c: Class[_] => conf.setClass(k, c, c)
case _ => conf.set(k, v.toString)
}
}
conf
}
}
}
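A usage sketch for the helper above; the specific keys and values are illustrative, showing which overload each value type hits (class values go through setClass with the class as its own interface):

import com.spotify.scio.parquet.ParquetConfiguration
import com.spotify.scio.parquet.avro.LogicalTypeSupplier
import org.apache.parquet.avro.AvroWriteSupport
import org.apache.parquet.hadoop.ParquetOutputFormat

val conf = ParquetConfiguration.of(
  ParquetOutputFormat.BLOCK_SIZE -> (128 * 1024 * 1024), // Int    -> setInt
  ParquetOutputFormat.COMPRESSION -> "SNAPPY", // String -> set
  AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier] // Class -> setClass
)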
@@ -22,15 +22,22 @@ import com.spotify.scio._
import com.spotify.scio.coders.Coder
import com.spotify.scio.avro._
import com.spotify.scio.io.{TapSpec, TextIO}
import com.spotify.scio.parquet.ParquetConfiguration
import com.spotify.scio.testing._
import com.spotify.scio.values.{SCollection, WindowOptions}
import org.apache.avro.generic.GenericRecord
import org.apache.avro.data.TimeConversions
import org.apache.avro.{Conversion, Conversions, LogicalType, Schema}
import org.apache.avro.generic.{GenericData, GenericRecord, GenericRecordBuilder}
import org.apache.avro.specific.SpecificData
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.transforms.windowing.{BoundedWindow, IntervalWindow, PaneInfo}
import org.apache.commons.io.FileUtils
import org.joda.time.{DateTimeFieldType, Duration, Instant}
import org.apache.parquet.avro.{AvroDataSupplier, AvroReadSupport, AvroWriteSupport}
import org.joda.time.{DateTime, DateTimeFieldType, Duration, Instant}
import org.scalatest.BeforeAndAfterAll

import java.lang

class ParquetAvroIOTest extends ScioIOSpec with TapSpec with BeforeAndAfterAll {
private val dir = tmpDir
private val specificRecords = (1 to 10).map(AvroUtils.newSpecificRecord)
@@ -94,6 +101,127 @@ class ParquetAvroIOTest extends ScioIOSpec with TapSpec with BeforeAndAfterAll {
()
}

it should "write and read SpecificRecords with default logical types" in {
val records =
(1 to 10).map(_ =>
TestLogicalTypes
.newBuilder()
.setTimestamp(DateTime.now())
.setDecimal(BigDecimal.decimal(1.0).setScale(2).bigDecimal)
.build()
)
val path = dir.toPath.resolve("logicalTypesSr").toString

val sc1 = ScioContext()
sc1
.parallelize(records)
.saveAsParquetAvroFile(
path,
conf = ParquetConfiguration.of(
AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
)
)
sc1.run()
()

val sc2 = ScioContext()
sc2
.parquetAvroFile[TestLogicalTypes](
s"$path/*.parquet",
conf = ParquetConfiguration.of(
AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
)
)
.map(identity) should containInAnyOrder(records)

sc2.run()
()
}

it should "write and read GenericRecords with default logical types" in {

val records: Seq[GenericRecord] = (1 to 10).map { _ =>
val gr = new GenericRecordBuilder(TestLogicalTypes.SCHEMA$)
gr.set("timestamp", DateTime.now())
gr.set(
"decimal",
BigDecimal.decimal(1.0).setScale(2).bigDecimal
)
gr.build()
}
val path = dir.toPath.resolve("logicalTypesGr").toString

implicit val coder = {
GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion)
GenericData.get().addLogicalTypeConversion(new Conversions.DecimalConversion)
Coder.avroGenericRecordCoder(TestLogicalTypes.SCHEMA$)
}

val sc1 = ScioContext()
sc1
.parallelize(records)
.saveAsParquetAvroFile(
path,
schema = TestLogicalTypes.SCHEMA$,
conf = ParquetConfiguration.of(
AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
)
)
sc1.run()
()

val sc2 = ScioContext()
sc2
.parquetAvroFile[GenericRecord](
s"$path/*.parquet",
projection = TestLogicalTypes.SCHEMA$,
conf = ParquetConfiguration.of(
AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
)
)
.map(identity) should containInAnyOrder(records)

sc2.run()
()
}

it should "write and read SpecificRecords with custom logical types" in {
val records =
(1 to 10).map(_ =>
TestLogicalTypes
.newBuilder()
.setTimestamp(DateTime.now())
.setDecimal(BigDecimal.decimal(1.0).setScale(2).bigDecimal)
.build()
)
val path = dir.toPath.resolve("logicalTypesCustom").toString

val sc1 = ScioContext()
sc1
.parallelize(records)
.saveAsParquetAvroFile(
path,
conf = ParquetConfiguration.of(
AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[CustomLogicalTypeSupplier]
)
)
sc1.run()
()

val sc2 = ScioContext()
sc2
.parquetAvroFile[TestLogicalTypes](
s"$path/*.parquet",
conf = ParquetConfiguration.of(
AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[CustomLogicalTypeSupplier]
)
)
.map(identity) should containInAnyOrder(records)

sc2.run()
()
}

it should "read with incomplete projection" in {
val dir = tmpDir

@@ -301,3 +429,25 @@ object ParquetTestJob {
sc.run().waitUntilDone()
}
}

case class CustomLogicalTypeSupplier() extends AvroDataSupplier {
override def get(): GenericData = {
val specificData = new SpecificData()
specificData.addLogicalTypeConversion(new Conversion[DateTime] {
override def getConvertedType: Class[DateTime] = classOf[DateTime]
override def getLogicalTypeName: String = "timestamp-millis"

override def toLong(
value: DateTime,
schema: Schema,
`type`: LogicalType
): lang.Long =
value.toInstant.getMillis

override def fromLong(value: lang.Long, schema: Schema, `type`: LogicalType): DateTime =
Instant.ofEpochMilli(value).toDateTime
})
specificData.addLogicalTypeConversion(new Conversions.DecimalConversion)
specificData
}
}
@@ -0,0 +1,39 @@
/*
* Copyright 2023 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.beam.sdk.extensions.smb;

import org.apache.avro.Conversions;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.specific.SpecificData;
import org.apache.parquet.avro.SpecificDataSupplier;

/** A SpecificDataSupplier that supplies built-in conversions for Avro LogicalTypes. */
public class AvroLogicalTypeSupplier extends SpecificDataSupplier {
@Override
public GenericData get() {
SpecificData specificData = new SpecificData();
specificData.addLogicalTypeConversion(new TimeConversions.DateConversion());
specificData.addLogicalTypeConversion(new TimeConversions.TimeConversion());
specificData.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
specificData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
specificData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
specificData.addLogicalTypeConversion(new Conversions.DecimalConversion());
specificData.addLogicalTypeConversion(new Conversions.UUIDConversion());
return specificData;
}
}
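For scio-smb, the same opt-in pattern applies with this supplier; a sketch of building the Configuration follows, with the actual SortedBucketIO read/write wiring elided since the exact builder methods depend on the scio-smb version in use:

import com.spotify.scio.parquet.ParquetConfiguration
import org.apache.beam.sdk.extensions.smb.AvroLogicalTypeSupplier
import org.apache.parquet.avro.{AvroReadSupport, AvroWriteSupport}

// Carries the opt-in supplier for both sides; pass this wherever the SMB
// Parquet transform accepts a Hadoop Configuration.
val smbConf = ParquetConfiguration.of(
  AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[AvroLogicalTypeSupplier],
  AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[AvroLogicalTypeSupplier]
)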
@@ -32,9 +32,13 @@
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.parquet.avro.AvroDataSupplier;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.avro.SpecificDataSupplier;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetOutputFormat;
@@ -194,15 +198,30 @@ private ParquetAvroSink(
@Override
public void open(WritableByteChannel channel) throws IOException {
// https://github.com/apache/parquet-mr/tree/master/parquet-hadoop#class-parquetoutputformat
final Configuration configuration = conf.get();

int rowGroupSize =
conf.get().getInt(ParquetOutputFormat.BLOCK_SIZE, ParquetWriter.DEFAULT_BLOCK_SIZE);
writer =
configuration.getInt(ParquetOutputFormat.BLOCK_SIZE, ParquetWriter.DEFAULT_BLOCK_SIZE);
AvroParquetWriter.Builder<ValueT> builder =
AvroParquetWriter.<ValueT>builder(new ParquetOutputFile(channel))
.withSchema(schemaSupplier.get())
.withCompressionCodec(compression)
.withConf(conf.get())
.withRowGroupSize(rowGroupSize)
.build();
.withConf(configuration)
.withRowGroupSize(rowGroupSize);

// Workaround for PARQUET-2265
if (configuration.getClass(AvroWriteSupport.AVRO_DATA_SUPPLIER, null) != null) {
Class<? extends AvroDataSupplier> dataModelSupplier =
configuration.getClass(
AvroWriteSupport.AVRO_DATA_SUPPLIER,
SpecificDataSupplier.class,
AvroDataSupplier.class);
builder =
builder.withDataModel(
ReflectionUtils.newInstance(dataModelSupplier, configuration).get());
}

writer = builder.build();
}

@Override