Remove logical type workarounds for parquet-avro
clairemcginty committed Jan 17, 2024
1 parent ac71a21 commit 971d4a8
Showing 5 changed files with 4 additions and 180 deletions.
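
This commit drops the logical-type supplier workaround: schemas containing Avro logical types no longer need an AvroDataSupplier wired into the Parquet Configuration, and the warning that nagged about it is gone. Below is a minimal sketch of the simplified round trip the updated tests exercise; `TestLogicalTypes` stands in for any SpecificRecord whose schema uses logical types, and the import paths are approximate for a recent Scio version.

import com.spotify.scio._
import com.spotify.scio.avro._
import com.spotify.scio.parquet.avro._

// Write and read back records with logical-type fields (e.g. timestamp-millis)
// without setting AvroWriteSupport/AvroReadSupport.AVRO_DATA_SUPPLIER.
def roundTrip(records: Seq[TestLogicalTypes], path: String): Unit = {
  val sc1 = ScioContext()
  sc1
    .parallelize(records)
    .saveAsParquetAvroFile(path) // no supplier Configuration needed
  sc1.run()

  val sc2 = ScioContext()
  sc2.parquetAvroFile[TestLogicalTypes](s"$path/*.parquet") // logical types resolve directly
  sc2.run()
}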

This file was deleted.

@@ -120,16 +120,6 @@ final case class ParquetAvroIO[T: ClassTag: Coder](path: String) extends ScioIO[
val isAssignable = classOf[SpecificRecord].isAssignableFrom(cls)
val writerSchema = if (isAssignable) ReflectData.get().getSchema(cls) else params.schema
val conf = ParquetConfiguration.ofNullable(params.conf)
-    if (
-      conf.get(AvroWriteSupport.AVRO_DATA_SUPPLIER) == null && ParquetAvroIO.containsLogicalType(
-        writerSchema
-      )
-    ) {
-      ParquetAvroIO.log.warn(
-        s"Detected a logical type in schema `$writerSchema`, but Configuration key `${AvroWriteSupport.AVRO_DATA_SUPPLIER}`" +
-          s"was not set to a logical type supplier. See https://spotify.github.io/scio/io/Parquet.html#logical-types for more information."
-      )
-    }

data.applyInternal(
parquetOut(
@@ -187,18 +177,6 @@ object ParquetAvroIO {
def read(sc: ScioContext, path: String)(implicit coder: Coder[T]): SCollection[T] = {
val jobConf = ParquetConfiguration.ofNullable(conf)

-      if (
-        jobConf.get(AvroReadSupport.AVRO_DATA_SUPPLIER) == null && ParquetAvroIO
-          .containsLogicalType(
-            readSchema
-          )
-      ) {
-        log.warn(
-          s"Detected a logical type in schema `$readSchema`, but Configuration key `${AvroReadSupport.AVRO_DATA_SUPPLIER}`" +
-            s"was not set to a logical type supplier. See https://spotify.github.io/scio/io/Parquet.html#logical-types for more information."
-        )
-      }

// Needed to make GenericRecord read by parquet-avro work with Beam's
// org.apache.beam.sdk.coders.AvroCoder.
if (!isSpecific) {
@@ -289,16 +267,6 @@ object ParquetAvroIO {
}
}

-  private[avro] def containsLogicalType(s: Schema): Boolean = {
-    s.getLogicalType != null || (s.getType match {
-      case Schema.Type.RECORD => s.getFields.asScala.exists(f => containsLogicalType(f.schema()))
-      case Schema.Type.ARRAY => containsLogicalType(s.getElementType)
-      case Schema.Type.UNION => s.getTypes.asScala.exists(t => containsLogicalType(t))
-      case Schema.Type.MAP => containsLogicalType(s.getValueType)
-      case _ => false
-    })
-  }

object WriteParam {
private[scio] val DefaultSchema = null
private[scio] val DefaultNumShards = 0
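
For reference, the deleted containsLogicalType helper recursed through records, arrays, unions, and maps. A self-contained sketch of the same logic, with the imports it needs (the collection-converter import assumes Scala 2.13):

import org.apache.avro.Schema
import scala.jdk.CollectionConverters._

// True if the schema itself, or any schema nested inside it, carries a logical type.
def containsLogicalType(s: Schema): Boolean =
  s.getLogicalType != null || (s.getType match {
    case Schema.Type.RECORD => s.getFields.asScala.exists(f => containsLogicalType(f.schema()))
    case Schema.Type.ARRAY  => containsLogicalType(s.getElementType)
    case Schema.Type.UNION  => s.getTypes.asScala.exists(containsLogicalType)
    case Schema.Type.MAP    => containsLogicalType(s.getValueType)
    case _                  => false
  })
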
@@ -140,23 +140,13 @@ class ParquetAvroIOTest extends ScioIOSpec with TapSpec with BeforeAndAfterAll {
val sc1 = ScioContext()
sc1
.parallelize(records)
-      .saveAsParquetAvroFile(
-        path,
-        conf = ParquetConfiguration.of(
-          AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
-        )
-      )
+      .saveAsParquetAvroFile(path)
sc1.run()
()

val sc2 = ScioContext()
sc2
-      .parquetAvroFile[TestLogicalTypes](
-        s"$path/*.parquet",
-        conf = ParquetConfiguration.of(
-          AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
-        )
-      )
+      .parquetAvroFile[TestLogicalTypes](s"$path/*.parquet")
.map(identity) should containInAnyOrder(records)

sc2.run()
@@ -187,10 +177,7 @@ class ParquetAvroIOTest extends ScioIOSpec with TapSpec with BeforeAndAfterAll {
.parallelize(records)
.saveAsParquetAvroFile(
path,
-        schema = TestLogicalTypes.SCHEMA$,
-        conf = ParquetConfiguration.of(
-          AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
-        )
+        schema = TestLogicalTypes.SCHEMA$
)
sc1.run()
()
@@ -199,10 +186,7 @@
sc2
.parquetAvroFile[GenericRecord](
s"$path/*.parquet",
-        projection = TestLogicalTypes.SCHEMA$,
-        conf = ParquetConfiguration.of(
-          AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
-        )
+        projection = TestLogicalTypes.SCHEMA$
)
.map(identity) should containInAnyOrder(records)

@@ -454,48 +438,6 @@ class ParquetAvroIOTest extends ScioIOSpec with TapSpec with BeforeAndAfterAll {
.output(TextIO("output"))(_ should containSingleValue(("foo", 2.0).toString))
.run()
}

-  it should "detect logical types in schemas" in {
-    val schemaParser = new Schema.Parser()
-
-    ParquetAvroIO.containsLogicalType(
-      schemaParser.parse(
-        """{"type":"record", "name":"SomeRecord1", "fields":[{"name":"someField","type":"string"}]}"""
-      )
-    ) shouldBe false
-
-    ParquetAvroIO.containsLogicalType(
-      schemaParser.parse(
-        """{"type":"record", "name":"SomeRecord2", "fields":[
-          |{"name":"someField","type":{"type": "long", "logicalType": "timestamp-millis"}}
-          |]}""".stripMargin
-      )
-    ) shouldBe true
-
-    ParquetAvroIO.containsLogicalType(
-      schemaParser.parse(
-        """{"type":"record", "name":"SomeRecord3", "fields":[
-          |{"name":"someField","type": {"type": "array", "items": "SomeRecord2"}}
-          |]}""".stripMargin
-      )
-    ) shouldBe true
-
-    ParquetAvroIO.containsLogicalType(
-      schemaParser.parse(
-        """{"type":"record", "name":"SomeRecord4", "fields":[
-          |{"name":"someField","type": {"type": "map", "values": "SomeRecord2"}}
-          |]}""".stripMargin
-      )
-    ) shouldBe true
-
-    ParquetAvroIO.containsLogicalType(
-      schemaParser.parse(
-        """{"type":"record", "name":"SomeRecord5", "fields":[
-          |{"name":"someField","type":["null", {"type": "long", "logicalType": "timestamp-millis"}]}
-          |]}""".stripMargin
-      )
-    ) shouldBe true
-  }
}

object ParquetTestJob {
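
Dropping the conf arguments from these tests removes the requirement, not the knob: the AVRO_DATA_SUPPLIER Configuration keys should still be honored when a caller wants an explicit supplier. A sketch, where MyDataSupplier is a hypothetical AvroDataSupplier implementation and the ParquetConfiguration import path is assumed:

import com.spotify.scio.parquet.ParquetConfiguration
import org.apache.avro.generic.GenericData
import org.apache.parquet.avro.{AvroDataSupplier, AvroReadSupport, AvroWriteSupport}

// Hypothetical supplier; any AvroDataSupplier implementation can be plugged in here.
class MyDataSupplier extends AvroDataSupplier {
  override def get(): GenericData = GenericData.get()
}

val writeConf = ParquetConfiguration.of(
  AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[MyDataSupplier]
)
val readConf = ParquetConfiguration.of(
  AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[MyDataSupplier]
)
// e.g. data.saveAsParquetAvroFile(path, conf = writeConf)
//      sc.parquetAvroFile[TestLogicalTypes](s"$path/*.parquet", conf = readConf)
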

This file was deleted.

@@ -37,9 +37,6 @@
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
-import org.apache.parquet.avro.AvroDataSupplier;
-import org.apache.parquet.avro.AvroReadSupport;
-import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -124,10 +121,6 @@ public void testSpecificRecord() throws Exception {
@Test
public void testLogicalTypes() throws Exception {
final Configuration conf = new Configuration();
-    conf.setClass(
-        AvroWriteSupport.AVRO_DATA_SUPPLIER, AvroLogicalTypeSupplier.class, AvroDataSupplier.class);
-    conf.setClass(
-        AvroReadSupport.AVRO_DATA_SUPPLIER, AvroLogicalTypeSupplier.class, AvroDataSupplier.class);

final ParquetAvroFileOperations<TestLogicalTypes> fileOperations =
ParquetAvroFileOperations.of(
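
The same opt-in applies to code built on a raw Hadoop Configuration, as this SMB test was. A sketch using the hypothetical MyDataSupplier from the earlier note:

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.{AvroDataSupplier, AvroReadSupport, AvroWriteSupport}

val conf = new Configuration()
// Register the supplier for both the write and read paths, as the deleted lines did.
conf.setClass(AvroWriteSupport.AVRO_DATA_SUPPLIER, classOf[MyDataSupplier], classOf[AvroDataSupplier])
conf.setClass(AvroReadSupport.AVRO_DATA_SUPPLIER, classOf[MyDataSupplier], classOf[AvroDataSupplier])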
