Upgrade Parquet to 1.13.1 #5175

Merged · 5 commits · Jan 17, 2024
2 changes: 1 addition & 1 deletion build.sbt
@@ -126,7 +126,7 @@ val metricsVersion = "4.2.23"
val neo4jDriverVersion = "4.4.13"
val ndArrayVersion = "0.3.3"
val parquetExtraVersion = "0.4.3"
val parquetVersion = "1.12.3"
val parquetVersion = "1.13.1"
val pprintVersion = "0.8.1"
val protobufGenericVersion = "0.2.9"
val scalacheckVersion = "1.17.0"

This file was deleted.

@@ -119,16 +119,6 @@ final case class ParquetAvroIO[T: ClassTag: Coder](path: String) extends ScioIO[
val isAssignable = classOf[SpecificRecord].isAssignableFrom(cls)
val writerSchema = if (isAssignable) ReflectData.get().getSchema(cls) else params.schema
val conf = ParquetConfiguration.ofNullable(params.conf)
-    if (
-      conf.get(AvroWriteSupport.AVRO_DATA_SUPPLIER) == null && ParquetAvroIO.containsLogicalType(
-        writerSchema
-      )
-    ) {
-      ParquetAvroIO.log.warn(
-        s"Detected a logical type in schema `$writerSchema`, but Configuration key `${AvroWriteSupport.AVRO_DATA_SUPPLIER}`" +
-          s"was not set to a logical type supplier. See https://spotify.github.io/scio/io/Parquet.html#logical-types for more information."
-      )
-    }

data.applyInternal(
parquetOut(
@@ -186,18 +176,6 @@ object ParquetAvroIO {
def read(sc: ScioContext, path: String)(implicit coder: Coder[T]): SCollection[T] = {
val jobConf = ParquetConfiguration.ofNullable(conf)

-    if (
-      jobConf.get(AvroReadSupport.AVRO_DATA_SUPPLIER) == null && ParquetAvroIO
-        .containsLogicalType(
-          readSchema
-        )
-    ) {
-      log.warn(
-        s"Detected a logical type in schema `$readSchema`, but Configuration key `${AvroReadSupport.AVRO_DATA_SUPPLIER}`" +
-          s"was not set to a logical type supplier. See https://spotify.github.io/scio/io/Parquet.html#logical-types for more information."
-      )
-    }

// Needed to make GenericRecord read by parquet-avro work with Beam's
// org.apache.beam.sdk.extensions.avro.coders.AvroCoder
if (!isSpecific) {
@@ -283,16 +261,6 @@ object ParquetAvroIO {
}
}

-  private[avro] def containsLogicalType(s: Schema): Boolean = {
-    s.getLogicalType != null || (s.getType match {
-      case Schema.Type.RECORD => s.getFields.asScala.exists(f => containsLogicalType(f.schema()))
-      case Schema.Type.ARRAY  => containsLogicalType(s.getElementType)
-      case Schema.Type.UNION  => s.getTypes.asScala.exists(t => containsLogicalType(t))
-      case Schema.Type.MAP    => containsLogicalType(s.getValueType)
-      case _                  => false
-    })
-  }

object WriteParam {
val DefaultSchema: Schema = null
val DefaultNumShards: Int = 0
@@ -98,7 +98,7 @@ class TestWriterUtils extends AnyFlatSpec with Matchers with BeforeAndAfterAll {
assertColumn(
columnEncodings(4),
"account_status",
-      hasBloomFilter = true,
+      hasBloomFilter = false,
Contributor Author: bloom filters are no longer generated when all pages of a column have a dictionary encoding; see apache/parquet-java@4e9e79c. (A sketch of how to check for this follows the diff below.)

Seq(Encoding.BIT_PACKED, Encoding.RLE, Encoding.PLAIN_DICTIONARY)
)
}
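To see the new behavior concretely, here is a minimal sketch — the output path and the assumption that the file was written with bloom filters enabled are hypothetical, not part of this PR — that inspects a written file with parquet-java's `ParquetFileReader` and checks whether a column chunk carries a bloom filter:

```scala
import scala.jdk.CollectionConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Hypothetical output file, written with bloom filters requested.
val reader = ParquetFileReader.open(
  HadoopInputFile.fromPath(new Path("/tmp/out.parquet"), new Configuration())
)
val rowGroup = reader.getRowGroups.asScala.head
val column = rowGroup.getColumns.asScala
  .find(_.getPath.toDotString == "account_status")
  .get

// With Parquet 1.13.x this is expected to return null when every page of the
// column is dictionary-encoded: the dictionary already answers exact
// membership queries, so a bloom filter would be redundant.
val bloomFilter = reader.readBloomFilter(column)
println(s"bloom filter present: ${bloomFilter != null}")
reader.close()
```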
@@ -151,21 +151,13 @@ class ParquetAvroIOTest extends ScioIOSpec with TapSpec with BeforeAndAfterAll {
val sc1 = ScioContext()
sc1
.parallelize(records)
-      .saveAsParquetAvroFile(
-        path = dir.getAbsolutePath,
-        conf = ParquetConfiguration.of(
-          AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
-        )
-      )
+      .saveAsParquetAvroFile(path = dir.getAbsolutePath)
sc1.run()

val sc2 = ScioContext()
sc2
.parquetAvroFile[TestLogicalTypes](
path = dir.getAbsolutePath,
-        conf = ParquetConfiguration.of(
-          AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
-        ),
suffix = ".parquet"
)
.map(identity) should containInAnyOrder(records)
@@ -195,10 +187,7 @@ class ParquetAvroIOTest extends ScioIOSpec with TapSpec with BeforeAndAfterAll {
.parallelize(records)
.saveAsParquetAvroFile(
path = dir.getAbsolutePath,
-        schema = TestLogicalTypes.SCHEMA$,
-        conf = ParquetConfiguration.of(
-          AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
-        )
+        schema = TestLogicalTypes.SCHEMA$
)
sc1.run()

@@ -207,9 +196,6 @@ class ParquetAvroIOTest extends ScioIOSpec with TapSpec with BeforeAndAfterAll {
.parquetAvroFile[GenericRecord](
path = dir.getAbsolutePath,
projection = TestLogicalTypes.SCHEMA$,
-        conf = ParquetConfiguration.of(
-          AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
-        ),
suffix = ".parquet"
)
.map(identity) should containInAnyOrder(records)
@@ -444,48 +430,6 @@ class ParquetAvroIOTest extends ScioIOSpec with TapSpec with BeforeAndAfterAll {
.output(TextIO("output"))(_ should containSingleValue(("foo", 2.0).toString))
.run()
}

-  it should "detect logical types in schemas" in {
-    val schemaParser = new Schema.Parser()
-
-    ParquetAvroIO.containsLogicalType(
-      schemaParser.parse(
-        """{"type":"record", "name":"SomeRecord1", "fields":[{"name":"someField","type":"string"}]}"""
-      )
-    ) shouldBe false
-
-    ParquetAvroIO.containsLogicalType(
-      schemaParser.parse(
-        """{"type":"record", "name":"SomeRecord2", "fields":[
-          |{"name":"someField","type":{"type": "long", "logicalType": "timestamp-millis"}}
-          |]}""".stripMargin
-      )
-    ) shouldBe true
-
-    ParquetAvroIO.containsLogicalType(
-      schemaParser.parse(
-        """{"type":"record", "name":"SomeRecord3", "fields":[
-          |{"name":"someField","type": {"type": "array", "items": "SomeRecord2"}}
-          |]}""".stripMargin
-      )
-    ) shouldBe true
-
-    ParquetAvroIO.containsLogicalType(
-      schemaParser.parse(
-        """{"type":"record", "name":"SomeRecord4", "fields":[
-          |{"name":"someField","type": {"type": "map", "values": "SomeRecord2"}}
-          |]}""".stripMargin
-      )
-    ) shouldBe true
-
-    ParquetAvroIO.containsLogicalType(
-      schemaParser.parse(
-        """{"type":"record", "name":"SomeRecord5", "fields":[
-          |{"name":"someField","type":["null", {"type": "long", "logicalType": "timestamp-millis"}]}
-          |]}""".stripMargin
-      )
-    ) shouldBe true
-  }
}

object ParquetTestJob {

This file was deleted.

@@ -37,9 +37,7 @@
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.hadoop.conf.Configuration;
-import org.apache.parquet.avro.AvroDataSupplier;
-import org.apache.parquet.avro.AvroReadSupport;
-import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -128,10 +126,6 @@ public void testSpecificRecord() throws Exception {
@Test
public void testLogicalTypes() throws Exception {
final Configuration conf = new Configuration();
-    conf.setClass(
-        AvroWriteSupport.AVRO_DATA_SUPPLIER, AvroLogicalTypeSupplier.class, AvroDataSupplier.class);
-    conf.setClass(
-        AvroReadSupport.AVRO_DATA_SUPPLIER, AvroLogicalTypeSupplier.class, AvroDataSupplier.class);

final ParquetAvroFileOperations<TestLogicalTypes> fileOperations =
ParquetAvroFileOperations.of(
7 changes: 4 additions & 3 deletions site/src/main/paradox/extras/Sort-Merge-Bucket.md
@@ -149,10 +149,11 @@ mySchema

SMB supports Parquet reads and writes in both Avro and case class formats.

-If you're using Parquet-Avro and your schema contains a _logical type_, you'll have to opt in to a logical type _supplier_
-in your Parquet `Configuration` parameter:
+As of **Scio 0.14.0**, Scio supports logical types in parquet-avro out of the box.
+
+Earlier versions of Scio require you to manually supply a _logical type supplier_ in your Parquet `Configuration` parameter:

-```scala mdoc:reset
+```scala mdoc:fail:silent
import org.apache.avro.specific.SpecificRecordBase

import org.apache.beam.sdk.extensions.smb.{AvroLogicalTypeSupplier, ParquetAvroSortedBucketIO}
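The rest of that example is truncated in this view. For pre-0.14.0 users, the supplier wiring looks roughly like this sketch, mirroring the `Configuration` setup removed from the Java tests in this PR (how the `conf` is attached to `ParquetAvroSortedBucketIO` builders is omitted here):

```scala
import org.apache.beam.sdk.extensions.smb.AvroLogicalTypeSupplier
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.{AvroDataSupplier, AvroReadSupport, AvroWriteSupport}

// Register the SMB logical type supplier for both the write and read paths.
val conf = new Configuration()
conf.setClass(
  AvroWriteSupport.AVRO_DATA_SUPPLIER,
  classOf[AvroLogicalTypeSupplier],
  classOf[AvroDataSupplier]
)
conf.setClass(
  AvroReadSupport.AVRO_DATA_SUPPLIER,
  classOf[AvroLogicalTypeSupplier],
  classOf[AvroDataSupplier]
)
```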
6 changes: 4 additions & 2 deletions site/src/main/paradox/io/Parquet.md
@@ -114,11 +114,13 @@ def result = input.saveAsParquetAvroFile("gs://path-to-data/lake/output", schema

### Logical Types

-If your Avro schema contains a logical type, you'll need to supply an additional Configuration parameter for your reads and writes.
+As of **Scio 0.14.0**, Scio supports logical types in parquet-avro out of the box.
+
+If you're on an earlier version of Scio and your Avro schema contains a logical type, you'll need to supply an additional `Configuration` parameter for your reads and writes.

 If you're using the default version of Avro (1.8), you can use Scio's pre-built logical type conversions:

-```scala mdoc:compile-only
+```scala mdoc:fail:silent
import com.spotify.scio._
import com.spotify.scio.values.SCollection
import com.spotify.scio.parquet.avro._
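The rest of that example is truncated in this view. As a sketch of the full pre-0.14.0 pattern — reconstructed from the test code removed in this PR, with placeholder paths, and assuming `TestLogicalTypes` is an Avro-generated `SpecificRecord` and `LogicalTypeSupplier` is the scio-parquet supplier class deleted in this PR:

```scala
import com.spotify.scio.ScioContext
import com.spotify.scio.parquet.ParquetConfiguration
import com.spotify.scio.parquet.avro._
import org.apache.parquet.avro.{AvroReadSupport, AvroWriteSupport}

val sc = ScioContext()
val records: Seq[TestLogicalTypes] = ??? // placeholder records with logical-type fields

// Write: opt in to the logical type supplier via the Parquet Configuration.
sc.parallelize(records)
  .saveAsParquetAvroFile(
    path = "gs://path-to-data/lake/output",
    conf = ParquetConfiguration.of(
      AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
    )
  )

// Read: the same supplier is needed on the read side.
sc.parquetAvroFile[TestLogicalTypes](
  path = "gs://path-to-data/lake/output",
  conf = ParquetConfiguration.of(
    AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
  ),
  suffix = ".parquet"
)

sc.run()
```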