From 5a8e42a77753edd51b75407b04555290cd97846a Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Mon, 27 Feb 2023 14:47:44 -0500 Subject: [PATCH 1/4] Make ZSTD default compression for Parquet writes --- .../scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala | 2 +- .../com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala | 2 +- .../scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala | 2 +- .../beam/sdk/extensions/smb/ParquetAvroFileOperations.java | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala index b390468798..9755925306 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala @@ -303,7 +303,7 @@ object ParquetAvroIO { private[scio] val DefaultSchema = null private[scio] val DefaultNumShards = 0 private[scio] val DefaultSuffix = ".parquet" - private[scio] val DefaultCompression = CompressionCodecName.GZIP + private[scio] val DefaultCompression = CompressionCodecName.ZSTD private[scio] val DefaultConfiguration = null private[scio] val DefaultShardNameTemplate = null private[scio] val DefaultTempDirectory = null diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala index f551bb112e..3480060e6b 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala @@ -221,7 +221,7 @@ object ParquetExampleIO { object WriteParam { private[tensorflow] val DefaultNumShards = 0 private[tensorflow] val DefaultSuffix = ".parquet" - private[tensorflow] val DefaultCompression = CompressionCodecName.GZIP + private[tensorflow] val DefaultCompression = CompressionCodecName.ZSTD private[tensorflow] val DefaultConfiguration = null private[tensorflow] val DefaultShardNameTemplate = null private[tensorflow] val DefaultTempDirectory = null diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala index 5d7cbc23c4..b8a2a267ac 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala @@ -188,7 +188,7 @@ object ParquetTypeIO { object WriteParam { private[scio] val DefaultNumShards = 0 private[scio] val DefaultSuffix = ".parquet" - private[scio] val DefaultCompression = CompressionCodecName.GZIP + private[scio] val DefaultCompression = CompressionCodecName.ZSTD private[scio] val DefaultConfiguration = null private[scio] val DefaultShardNameTemplate = null private[scio] val DefaultTempDirectory = null diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperations.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperations.java index 880486e6af..0892f1df4b 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperations.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperations.java @@ -51,7 +51,7 @@ * Avro records. */ public class ParquetAvroFileOperations extends FileOperations { - static final CompressionCodecName DEFAULT_COMPRESSION = CompressionCodecName.GZIP; + static final CompressionCodecName DEFAULT_COMPRESSION = CompressionCodecName.ZSTD; private final SerializableSchemaSupplier schemaSupplier; private final CompressionCodecName compression; private final SerializableConfiguration conf; From a12516db6b77d5163172ecf176362fb719016112 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Mon, 27 Feb 2023 15:04:42 -0500 Subject: [PATCH 2/4] Update site --- site/src/main/paradox/io/Parquet.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/site/src/main/paradox/io/Parquet.md b/site/src/main/paradox/io/Parquet.md index 89adc0de99..009065e8f2 100644 --- a/site/src/main/paradox/io/Parquet.md +++ b/site/src/main/paradox/io/Parquet.md @@ -287,7 +287,9 @@ val parquetConf: Configuration = { Here are some other recommended settings. - `numShards` - This should be explicitly set so that the size of each output file is smaller than but close to `parquet.block.size`, i.e. 1 GiB. This guarantees that each file contains 1 row group only and reduces seeks. -- `compression` - `SNAPPY` and `GZIP` work out of the box. Snappy is less CPU intensive but has lower compression ratio. In our benchmarks GZIP seem to work better on GCS. +- `compression` - Parquet defaults to ZSTD compression with a level of 3; compression level can be set to any integer from 1-22 using the configuration option `parquet.compression.codec.zstd.level`. `SNAPPY` and `GZIP` compression types also work out of the box; Snappy is less CPU intensive but has lower compression ratio. In our benchmarks GZIP seem to work better than Snappy on GCS. + +A full list of Parquet configuration options can be found [here](https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/README.md). ## Parquet Reads in Scio 0.12.0+ From 8ee316089bafb0c9c17dcf9c25d187266242d252 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Mon, 27 Feb 2023 15:30:24 -0500 Subject: [PATCH 3/4] Update tests --- .../beam/sdk/extensions/smb/JsonFileOperationsTest.java | 2 +- .../sdk/extensions/smb/ParquetAvroFileOperationsTest.java | 4 ++-- .../beam/sdk/extensions/smb/TensorFlowFileOperationsTest.java | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/JsonFileOperationsTest.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/JsonFileOperationsTest.java index 59bfef14b9..d58618a3a1 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/JsonFileOperationsTest.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/JsonFileOperationsTest.java @@ -47,7 +47,7 @@ public void testUncompressed() throws Exception { @Test public void testCompression() throws Exception { - test(Compression.GZIP); + test(Compression.ZSTD); } private void test(Compression compression) throws Exception { diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperationsTest.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperationsTest.java index 1c2447c335..75cc5f20e7 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperationsTest.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperationsTest.java @@ -217,13 +217,13 @@ public void testDisplayData() { MatcherAssert.assertThat( displayData, hasDisplayItem("compression", Compression.UNCOMPRESSED.toString())); MatcherAssert.assertThat( - displayData, hasDisplayItem("compressionCodecName", CompressionCodecName.GZIP.name())); + displayData, hasDisplayItem("compressionCodecName", CompressionCodecName.ZSTD.name())); MatcherAssert.assertThat(displayData, hasDisplayItem("schema", USER_SCHEMA.getFullName())); } private void writeFile(ResourceId file) throws IOException { final ParquetAvroFileOperations fileOperations = - ParquetAvroFileOperations.of(USER_SCHEMA, CompressionCodecName.GZIP); + ParquetAvroFileOperations.of(USER_SCHEMA, CompressionCodecName.ZSTD); final FileOperations.Writer writer = fileOperations.createWriter(file); for (GenericRecord record : USER_RECORDS) { writer.write(record); diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TensorFlowFileOperationsTest.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TensorFlowFileOperationsTest.java index 04877fa734..3427401e2f 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TensorFlowFileOperationsTest.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TensorFlowFileOperationsTest.java @@ -53,7 +53,7 @@ public void testUncompressed() throws Exception { @Test public void testCompressed() throws Exception { - test(Compression.GZIP); + test(Compression.ZSTD); } private void test(Compression compression) throws Exception { From 3eac71fc5868058fc4d463a8ea08b917592bb33c Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Mon, 27 Feb 2023 15:32:15 -0500 Subject: [PATCH 4/4] Update ParquetTypeFileOperations --- .../beam/sdk/extensions/smb/ParquetTypeFileOperations.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeFileOperations.scala b/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeFileOperations.scala index a84aafc0d0..0dee032f13 100644 --- a/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeFileOperations.scala +++ b/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeFileOperations.scala @@ -33,7 +33,7 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName import java.nio.channels.{ReadableByteChannel, WritableByteChannel} object ParquetTypeFileOperations { - val DefaultCompression = CompressionCodecName.GZIP + val DefaultCompression = CompressionCodecName.ZSTD val DefaultConfiguration: Configuration = null def apply[T: Coder: ParquetType](): ParquetTypeFileOperations[T] = apply(DefaultCompression)