Make Parquet logical type support opt-in (#4782)
* Make logical type support for Parquet-Avro opt-in

* Update documentation

* conf->jobConf

* scalafmt

* Update scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala

Co-authored-by: Michel Davit <micheld@spotify.com>

* Add parens

---------

Co-authored-by: Michel Davit <micheld@spotify.com>
clairemcginty and RustedBones committed May 3, 2023
1 parent 8cce51c commit 3044403
Showing 6 changed files with 236 additions and 30 deletions.
10 changes: 0 additions & 10 deletions scio-parquet/src/main/resources/core-site.xml
@@ -22,14 +22,4 @@
<name>fs.gs.inputstream.fadvise</name>
<value>RANDOM</value>
</property>
<property>
<!-- Supplies logical type support for writes -->
<name>parquet.avro.write.data.supplier</name>
<value>com.spotify.scio.parquet.avro.LogicalTypeSupplier</value>
</property>
<property>
<!-- Supplies logical type support for reads -->
<name>parquet.avro.data.supplier</name>
<value>com.spotify.scio.parquet.avro.LogicalTypeSupplier</value>
</property>
</configuration>
@@ -115,13 +115,23 @@ class ParquetAvroIOTest extends ScioIOSpec with TapSpec with BeforeAndAfterAll {
val sc1 = ScioContext()
sc1
.parallelize(records)
.saveAsParquetAvroFile(path)
.saveAsParquetAvroFile(
path,
conf = ParquetConfiguration.of(
AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
)
)
sc1.run()
()

val sc2 = ScioContext()
sc2
.parquetAvroFile[TestLogicalTypes](s"$path/*.parquet")
.parquetAvroFile[TestLogicalTypes](
s"$path/*.parquet",
conf = ParquetConfiguration.of(
AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
)
)
.map(identity) should containInAnyOrder(records)

sc2.run()
@@ -150,13 +160,25 @@ class ParquetAvroIOTest extends ScioIOSpec with TapSpec with BeforeAndAfterAll {
val sc1 = ScioContext()
sc1
.parallelize(records)
.saveAsParquetAvroFile(path, schema = TestLogicalTypes.SCHEMA$)
.saveAsParquetAvroFile(
path,
schema = TestLogicalTypes.SCHEMA$,
conf = ParquetConfiguration.of(
AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
)
)
sc1.run()
()

val sc2 = ScioContext()
sc2
.parquetAvroFile[GenericRecord](s"$path/*.parquet", projection = TestLogicalTypes.SCHEMA$)
.parquetAvroFile[GenericRecord](
s"$path/*.parquet",
projection = TestLogicalTypes.SCHEMA$,
conf = ParquetConfiguration.of(
AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
)
)
.map(identity) should containInAnyOrder(records)

sc2.run()
@@ -392,6 +414,48 @@ class ParquetAvroIOTest extends ScioIOSpec with TapSpec with BeforeAndAfterAll {
.output(TextIO("output"))(_ should containSingleValue(("foo", 2.0).toString))
.run()
}

it should "detect logical types in schemas" in {
val schemaParser = new Schema.Parser()

ParquetAvroIO.containsLogicalType(
schemaParser.parse(
"""{"type":"record", "name":"SomeRecord1", "fields":[{"name":"someField","type":"string"}]}"""
)
) shouldBe false

ParquetAvroIO.containsLogicalType(
schemaParser.parse(
"""{"type":"record", "name":"SomeRecord2", "fields":[
|{"name":"someField","type":{"type": "long", "logicalType": "timestamp-millis"}}
|]}""".stripMargin
)
) shouldBe true

ParquetAvroIO.containsLogicalType(
schemaParser.parse(
"""{"type":"record", "name":"SomeRecord3", "fields":[
|{"name":"someField","type": {"type": "array", "items": "SomeRecord2"}}
|]}""".stripMargin
)
) shouldBe true

ParquetAvroIO.containsLogicalType(
schemaParser.parse(
"""{"type":"record", "name":"SomeRecord4", "fields":[
|{"name":"someField","type": {"type": "map", "values": "SomeRecord2"}}
|]}""".stripMargin
)
) shouldBe true

ParquetAvroIO.containsLogicalType(
schemaParser.parse(
"""{"type":"record", "name":"SomeRecord5", "fields":[
|{"name":"someField","type":["null", {"type": "long", "logicalType": "timestamp-millis"}]}
|]}""".stripMargin
)
) shouldBe true
}
}

object ParquetTestJob {
10 changes: 0 additions & 10 deletions scio-smb/src/main/resources/core-site.xml
@@ -22,14 +22,4 @@
<name>fs.gs.inputstream.fadvise</name>
<value>RANDOM</value>
</property>
<property>
<!-- Supplies logical type support for writes -->
<name>parquet.avro.write.data.supplier</name>
<value>org.apache.beam.sdk.extensions.smb.AvroLogicalTypeSupplier</value>
</property>
<property>
<!-- Supplies logical type support for reads -->
<name>parquet.avro.data.supplier</name>
<value>org.apache.beam.sdk.extensions.smb.AvroLogicalTypeSupplier</value>
</property>
</configuration>
@@ -36,6 +36,10 @@
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroDataSupplier;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -119,8 +123,15 @@ public void testSpecificRecord() throws Exception {

@Test
public void testLogicalTypes() throws Exception {
final Configuration conf = new Configuration();
conf.setClass(
AvroWriteSupport.AVRO_DATA_SUPPLIER, AvroLogicalTypeSupplier.class, AvroDataSupplier.class);
conf.setClass(
AvroReadSupport.AVRO_DATA_SUPPLIER, AvroLogicalTypeSupplier.class, AvroDataSupplier.class);

final ParquetAvroFileOperations<TestLogicalTypes> fileOperations =
ParquetAvroFileOperations.of(TestLogicalTypes.getClassSchema());
ParquetAvroFileOperations.of(
TestLogicalTypes.getClassSchema(), CompressionCodecName.UNCOMPRESSED, conf);
final ResourceId file =
fromFolder(output)
.resolve("file.parquet", ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
45 changes: 45 additions & 0 deletions site/src/main/paradox/extras/Sort-Merge-Bucket.md
@@ -124,6 +124,51 @@ mySchema
)
```

## Parquet

SMB supports Parquet reads and writes in both Avro and case class formats.

If you're using Parquet-Avro and your schema contains a _logical type_, you'll have to opt in to a logical type _supplier_
via the Parquet `Configuration` parameter:

```scala mdoc:reset
import org.apache.avro.specific.SpecificRecordBase

import org.apache.beam.sdk.extensions.smb.{AvroLogicalTypeSupplier, ParquetAvroSortedBucketIO}
import org.apache.beam.sdk.values.TupleTag
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.{AvroDataSupplier, AvroReadSupport, AvroWriteSupport}
import com.spotify.scio.avro.TestRecord

// Reads
val readConf = new Configuration()
readConf.setClass(AvroReadSupport.AVRO_DATA_SUPPLIER, classOf[AvroLogicalTypeSupplier], classOf[AvroDataSupplier])

ParquetAvroSortedBucketIO
.read[TestRecord](new TupleTag[TestRecord], classOf[TestRecord])
.withConfiguration(readConf)

// Writes
val writeConf = new Configuration()
writeConf.setClass(AvroWriteSupport.AVRO_DATA_SUPPLIER, classOf[AvroLogicalTypeSupplier], classOf[AvroDataSupplier])

ParquetAvroSortedBucketIO
.write(classOf[String], "myKeyField", classOf[TestRecord])
.withConfiguration(writeConf)

// Transforms
val transformConf = new Configuration()
transformConf.setClass(AvroReadSupport.AVRO_DATA_SUPPLIER, classOf[AvroLogicalTypeSupplier], classOf[AvroDataSupplier])
transformConf.setClass(AvroWriteSupport.AVRO_DATA_SUPPLIER, classOf[AvroLogicalTypeSupplier], classOf[AvroDataSupplier])

ParquetAvroSortedBucketIO
.transformOutput(classOf[String], "myKeyField", classOf[TestRecord])
.withConfiguration(transformConf)
```

Note that if you're using a non-default Avro version (i.e. Avro 1.11), you'll have to supply a custom logical type supplier
using Avro 1.11 classes. See @ref:[Logical Types in Parquet](../io/Parquet.md#logical-types) for more information.
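
As a minimal sketch, assuming you've written such a supplier (the class name `com.example.Avro11LogicalTypeSupplier` below is hypothetical), the wiring is the same as above:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.{AvroDataSupplier, AvroReadSupport, AvroWriteSupport}

// com.example.Avro11LogicalTypeSupplier is a placeholder for your own
// AvroDataSupplier implementation built against Avro 1.11 classes.
val avro11Conf = new Configuration()
avro11Conf.setClass(
  AvroReadSupport.AVRO_DATA_SUPPLIER,
  classOf[com.example.Avro11LogicalTypeSupplier],
  classOf[AvroDataSupplier]
)
avro11Conf.setClass(
  AvroWriteSupport.AVRO_DATA_SUPPLIER,
  classOf[com.example.Avro11LogicalTypeSupplier],
  classOf[AvroDataSupplier]
)
```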

## Tuning parameters for SMB transforms

### numBuckets/numShards
116 changes: 111 additions & 5 deletions site/src/main/paradox/io/Parquet.md
@@ -112,17 +112,81 @@ def yourAvroSchema: org.apache.avro.Schema = ???
def result = input.saveAsParquetAvroFile("gs://path-to-data/lake/output", schema = yourAvroSchema)
```

### Logical Types

If your Avro schema contains a logical type, you'll need to supply an additional Configuration parameter for your reads and writes.

If you're using the default version of Avro (1.8), you can use Scio's pre-built logical type conversions:

```scala mdoc:reset
import com.spotify.scio._
import com.spotify.scio.values.SCollection
import com.spotify.scio.parquet.avro._
import com.spotify.scio.avro.TestRecord

val sc: ScioContext = ScioContext()
val data: SCollection[TestRecord] = sc.parallelize(List[TestRecord]())

// Reads
import com.spotify.scio.parquet.ParquetConfiguration

import org.apache.parquet.avro.AvroReadSupport

sc.parquetAvroFile(
"somePath",
conf = ParquetConfiguration.of(AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier])
)

// Writes
import org.apache.parquet.avro.AvroWriteSupport

data.saveAsParquetAvroFile(
"somePath",
conf = ParquetConfiguration.of(AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier])
)
```

(If you're using `scio-smb`, you can use the provided class `org.apache.beam.sdk.extensions.smb.AvroLogicalTypeSupplier` instead.)
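
For example, a minimal sketch of the same opt-in using the `scio-smb` supplier (this assumes a dependency on `scio-smb`):

```scala
import com.spotify.scio.parquet.ParquetConfiguration
import org.apache.beam.sdk.extensions.smb.AvroLogicalTypeSupplier
import org.apache.parquet.avro.{AvroReadSupport, AvroWriteSupport}

// Opts both reads and writes into the SMB-provided logical type supplier.
val smbSupplierConf = ParquetConfiguration.of(
  AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[AvroLogicalTypeSupplier],
  AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[AvroLogicalTypeSupplier]
)
```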

If you're using Avro 1.11, you'll have to create your own logical type supplier class, as Scio's `LogicalTypeSupplier` uses
classes present in Avro 1.8 but not 1.11. A sample Avro 1.11 logical-type supplier might look like:

```scala
import org.apache.avro.Conversions
import org.apache.avro.data.TimeConversions
import org.apache.avro.generic.GenericData
import org.apache.avro.specific.SpecificData
import org.apache.parquet.avro.AvroDataSupplier

case class AvroLogicalTypeSupplier() extends AvroDataSupplier {
  override def get(): GenericData = {
    val specificData = SpecificData.get()

    // Add conversions as needed
    specificData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion())

    specificData
  }
}
```

Then, you'll have to specify your logical type supplier class in your `Configuration` as outlined above.
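
For instance, a sketch that plugs the supplier above into reads and writes (assuming `AvroLogicalTypeSupplier` refers to your own class from the previous snippet):

```scala
import com.spotify.scio.parquet.ParquetConfiguration
import org.apache.parquet.avro.{AvroReadSupport, AvroWriteSupport}

// Points Parquet-Avro at the custom supplier for both reads and writes.
val logicalTypeConf = ParquetConfiguration.of(
  AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[AvroLogicalTypeSupplier],
  AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[AvroLogicalTypeSupplier]
)

// sc.parquetAvroFile("somePath", conf = logicalTypeConf)
// data.saveAsParquetAvroFile("somePath", conf = logicalTypeConf)
```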

## Case classes

Scio uses [magnolify-parquet](https://github.com/spotify/magnolify/blob/master/docs/parquet.md) to derive Parquet reader and writer for case classes at compile time, similar to how @ref:[coders](../internals/Coders.md) work. See this [mapping table](https://github.com/spotify/magnolify/blob/master/docs/mapping.md) for how Scala and Parquet types map; enum type mapping is also specifically [documented](https://github.com/spotify/magnolify/blob/main/docs/enums.md).

### Read Parquet files as case classes

When reading Parquet files as case classes, all fields in the case class definition are read. Therefore, it's desirable to construct a case class type with only the fields needed for processing.
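
For instance, a sketch of a trimmed-down read type (the record and field names here are hypothetical):

```scala
// Hypothetical full record as written to Parquet...
case class UserEvent(userId: Long, eventType: String, payload: String, timestampMillis: Long)

// ...and a narrower case class used for reading: only these two columns are read.
case class UserEventProjection(userId: Long, eventType: String)

// sc.typedParquetFile[UserEventProjection]("events.parquet")
```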

Starting in Magnolify 0.4.8 (corresponding to Scio 0.11.6 and above), predicates for case classes have Magnolify support at the _field level only_. You can use Parquet's `FilterApi.or` and `FilterApi.and` to chain them:

```scala mdoc:reset:silent
import com.spotify.scio._
import com.spotify.scio.parquet.types._
import magnolify.parquet._
import org.apache.parquet.filter2.predicate.FilterApi

object ParquetJob {
case class MyRecord(int_field: Int, string_field: String)
@@ -131,7 +195,10 @@ object ParquetJob {

val (sc, args) = ContextAndArgs(cmdlineArgs)

sc.typedParquetFile[MyRecord]("input.parquet")
sc.typedParquetFile[MyRecord]("input.parquet", predicate = FilterApi.and(
Predicate.onField[String]("string_field")(_.startsWith("a")),
Predicate.onField[Int]("int_field")(_ % 2 == 0))
)

sc.run()
()
@@ -177,14 +244,53 @@ import magnolify.parquet.ParquetArray.AvroCompat._

The same Avro schema evolution principles apply to Parquet, i.e. only append `OPTIONAL` or `REPEATED` fields with default `null` or `[]`. See this [test](https://github.com/spotify/magnolify/blob/master/parquet/src/test/scala/magnolify/parquet/test/SchemaEvolutionSuite.scala) for some common scenarios w.r.t. Parquet schema evolution.
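
As a sketch (with a hypothetical record), a compatible evolution appends an optional field with a `null` default rather than modifying or removing existing fields:

```scala
// Writer's original schema...
case class LakeRecordV1(id: Long, name: String)

// ...and a compatible evolution: an appended OPTIONAL field, defaulting to None (null).
case class LakeRecordV2(id: Long, name: String, description: Option[String] = None)
```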

## Configuring Parquet

The Parquet Java [library](https://github.com/apache/parquet-mr) heavily relies on Hadoop's `Job` API. Therefore, in both the Parquet library and in scio-parquet, we use Hadoop's [Configuration](https://hadoop.apache.org/docs/r2.10.2/api/org/apache/hadoop/conf/Configuration.html) class to manage most Parquet read and write options.

The `Configuration` class, when initialized, will load default values from the first available `core-site.xml` found on the classpath. Scio-parquet provides a default [`core-site.xml` implementation](https://github.com/spotify/scio/blob/main/scio-parquet/src/main/resources/core-site.xml): if your Scio pipeline has a dependency on `scio-parquet`, these default options will be picked up in your pipeline.

### Overriding the Default Configuration

You can override the default configuration in two ways:

1. Declare a `core-site.xml` file of your own in your project's `src/main/resources` folder. Note that Hadoop can only pick one `core-site.xml` to read: if you override the file in your project, Hadoop will not read Scio's default `core-site.xml` at all, and none of its default options will be loaded.

2. Create an in-memory `Configuration` object for use with scio-parquet's `ReadParam` and `WriteParam`. Any options provided this way will be _appended_ to Scio's default configuration.

You can create and pass in a custom Configuration using our `ParquetConfiguration` helper, available in Scio 0.12.x and above:

```scala
import com.spotify.scio.parquet.ParquetConfiguration

data
.saveAsParquetAvroFile(args("output"), conf = ParquetConfiguration.of("parquet.block.size" -> 536870912))
```

Some tuning might be required when writing Parquet files to maximize read performance. Parquet settings can be configured via the Hadoop [core-site.xml](https://github.com/spotify/scio/blob/master/scio-parquet/src/main/resources/core-site.xml) file or a `Configuration` argument.
If you're on Scio 0.11.x or below, you'll have to create a `Configuration` object directly:

```scala
import org.apache.hadoop.conf.Configuration

val parquetConf: Configuration = {
  val conf: Configuration = new Configuration()
  conf.setInt("parquet.block.size", 536870912)
  conf
}
```

### Common Configuration Options

- `parquet.block.size` - This determines block size for HDFS and row group size. 1 GiB is recommended over the default 128 MiB, although you'll have to weigh the tradeoffs: a larger block size means fewer seek operations on blob storage, at the cost of having to load a larger row group into memory.
- `fs.gs.inputstream.fadvise` - Parquet relies heavily on random seeks, so this GCS connector setting should be set to `RANDOM`. See this [blog post](https://cloud.google.com/blog/products/data-analytics/new-release-of-cloud-storage-connector-for-hadoop-improving-performance-throughput-and-more) for more detail.

Here are some other recommended settings.

- `numShards` - This should be explicitly set so that the size of each output file is smaller than, but close to, `parquet.block.size`, i.e. 1 GiB. This guarantees that each file contains only one row group, which reduces seeks.
- `compression` - `SNAPPY` and `GZIP` work out of the box. Snappy is less CPU-intensive but has a lower compression ratio. In our benchmarks, GZIP seems to work better on GCS. A combined example is sketched below.
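
The sketch below combines these recommendations (values are illustrative; `data` is an Avro `SCollection` as in the earlier write examples, and parameter names assume Scio's standard Parquet-Avro write options):

```scala
import com.spotify.scio.parquet.ParquetConfiguration
import org.apache.parquet.hadoop.metadata.CompressionCodecName

data.saveAsParquetAvroFile(
  "gs://path-to-data/lake/output",
  numShards = 10,                           // sized so each output file stays near one row group
  compression = CompressionCodecName.GZIP,  // SNAPPY also works out of the box
  conf = ParquetConfiguration.of("parquet.block.size" -> 1073741824) // 1 GiB row groups
)
```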

## Parquet Reads in Scio 0.12.0+

Parquet read internals have been reworked in Scio 0.12.0. As of 0.12.0, you can opt into the new Parquet read implementation,
backed by the new Beam [SplittableDoFn](https://beam.apache.org/blog/splittable-do-fn/) API, by following the instructions
@ref:[here](../migrations/v0.12.0-Migration-Guide.md#parquet-reads).
