Remove logical type workarounds for parquet-avro
clairemcginty committed Jan 17, 2024
1 parent 2f5492c commit db70d18
Showing 5 changed files with 2 additions and 176 deletions.
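In short: the logical-type warning on read and write, the `containsLogicalType` schema check, and the supplier registration in the tests are all removed, presumably because parquet-avro now handles Avro logical types without a custom data supplier. A minimal before/after sketch of the user-facing change, based on the removed test code below (`sc`, `records`, `dir`, and `LogicalTypeSupplier` are the names used there):

    // Before this commit: callers were warned to register a logical-type supplier.
    sc.parallelize(records)
      .saveAsParquetAvroFile(
        path = dir.getAbsolutePath,
        conf = ParquetConfiguration.of(
          AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
        )
      )

    // After this commit: the default configuration round-trips logical types.
    sc.parallelize(records)
      .saveAsParquetAvroFile(path = dir.getAbsolutePath)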

This file was deleted.

@@ -119,16 +119,6 @@ final case class ParquetAvroIO[T: ClassTag: Coder](path: String) extends ScioIO[
val isAssignable = classOf[SpecificRecord].isAssignableFrom(cls)
val writerSchema = if (isAssignable) ReflectData.get().getSchema(cls) else params.schema
val conf = ParquetConfiguration.ofNullable(params.conf)
- if (
- conf.get(AvroWriteSupport.AVRO_DATA_SUPPLIER) == null && ParquetAvroIO.containsLogicalType(
- writerSchema
- )
- ) {
- ParquetAvroIO.log.warn(
- s"Detected a logical type in schema `$writerSchema`, but Configuration key `${AvroWriteSupport.AVRO_DATA_SUPPLIER}`" +
- s"was not set to a logical type supplier. See https://spotify.github.io/scio/io/Parquet.html#logical-types for more information."
- )
- }

data.applyInternal(
parquetOut(
@@ -186,18 +176,6 @@ object ParquetAvroIO {
def read(sc: ScioContext, path: String)(implicit coder: Coder[T]): SCollection[T] = {
val jobConf = ParquetConfiguration.ofNullable(conf)

- if (
- jobConf.get(AvroReadSupport.AVRO_DATA_SUPPLIER) == null && ParquetAvroIO
- .containsLogicalType(
- readSchema
- )
- ) {
- log.warn(
- s"Detected a logical type in schema `$readSchema`, but Configuration key `${AvroReadSupport.AVRO_DATA_SUPPLIER}`" +
- s"was not set to a logical type supplier. See https://spotify.github.io/scio/io/Parquet.html#logical-types for more information."
- )
- }
-
// Needed to make GenericRecord read by parquet-avro work with Beam's
// org.apache.beam.sdk.extensions.avro.coders.AvroCoder
if (!isSpecific) {
@@ -283,16 +261,6 @@ object ParquetAvroIO {
}
}

- private[avro] def containsLogicalType(s: Schema): Boolean = {
- s.getLogicalType != null || (s.getType match {
- case Schema.Type.RECORD => s.getFields.asScala.exists(f => containsLogicalType(f.schema()))
- case Schema.Type.ARRAY => containsLogicalType(s.getElementType)
- case Schema.Type.UNION => s.getTypes.asScala.exists(t => containsLogicalType(t))
- case Schema.Type.MAP => containsLogicalType(s.getValueType)
- case _ => false
- })
- }
-
object WriteParam {
val DefaultSchema: Schema = null
val DefaultNumShards: Int = 0
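For reference, the deleted `containsLogicalType` helper above recursively walked records, arrays, unions, and maps looking for any logical-type annotation. A small usage sketch mirroring the deleted test further down (hypothetical single-field schema):

    // Sketch only: containsLogicalType was private[avro] and is deleted by this commit.
    import org.apache.avro.Schema

    val schema = new Schema.Parser().parse(
      """{"type":"record", "name":"SomeRecord", "fields":[
        |{"name":"ts", "type":{"type": "long", "logicalType": "timestamp-millis"}}
        |]}""".stripMargin
    )
    ParquetAvroIO.containsLogicalType(schema) // true: the `ts` field carries a logical type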
@@ -151,21 +151,13 @@ class ParquetAvroIOTest extends ScioIOSpec with TapSpec with BeforeAndAfterAll {
val sc1 = ScioContext()
sc1
.parallelize(records)
- .saveAsParquetAvroFile(
- path = dir.getAbsolutePath,
- conf = ParquetConfiguration.of(
- AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
- )
- )
+ .saveAsParquetAvroFile(path = dir.getAbsolutePath)
sc1.run()

val sc2 = ScioContext()
sc2
.parquetAvroFile[TestLogicalTypes](
path = dir.getAbsolutePath,
- conf = ParquetConfiguration.of(
- AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
- ),
suffix = ".parquet"
)
.map(identity) should containInAnyOrder(records)
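The `conf` hook itself appears to remain; what this commit removes is the requirement (and the warning) to set it. If a suitable `AvroDataSupplier` implementation is available, it can presumably still be registered explicitly. A hedged sketch, where `MyDataSupplier` is a hypothetical stand-in (the bundled supplier class may itself be among the files deleted here):

    // Hypothetical: MyDataSupplier is any AvroDataSupplier implementation on the classpath.
    sc2.parquetAvroFile[TestLogicalTypes](
      path = dir.getAbsolutePath,
      conf = ParquetConfiguration.of(
        AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[MyDataSupplier]
      ),
      suffix = ".parquet"
    )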
@@ -195,10 +187,7 @@ class ParquetAvroIOTest extends ScioIOSpec with TapSpec with BeforeAndAfterAll {
.parallelize(records)
.saveAsParquetAvroFile(
path = dir.getAbsolutePath,
- schema = TestLogicalTypes.SCHEMA$,
- conf = ParquetConfiguration.of(
- AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
- )
+ schema = TestLogicalTypes.SCHEMA$
)
sc1.run()

@@ -207,9 +196,6 @@ class ParquetAvroIOTest extends ScioIOSpec with TapSpec with BeforeAndAfterAll {
.parquetAvroFile[GenericRecord](
path = dir.getAbsolutePath,
projection = TestLogicalTypes.SCHEMA$,
- conf = ParquetConfiguration.of(
- AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
- ),
suffix = ".parquet"
)
.map(identity) should containInAnyOrder(records)
@@ -444,48 +430,6 @@ class ParquetAvroIOTest extends ScioIOSpec with TapSpec with BeforeAndAfterAll {
.output(TextIO("output"))(_ should containSingleValue(("foo", 2.0).toString))
.run()
}

it should "detect logical types in schemas" in {
val schemaParser = new Schema.Parser()

ParquetAvroIO.containsLogicalType(
schemaParser.parse(
"""{"type":"record", "name":"SomeRecord1", "fields":[{"name":"someField","type":"string"}]}"""
)
) shouldBe false

ParquetAvroIO.containsLogicalType(
schemaParser.parse(
"""{"type":"record", "name":"SomeRecord2", "fields":[
|{"name":"someField","type":{"type": "long", "logicalType": "timestamp-millis"}}
|]}""".stripMargin
)
) shouldBe true

ParquetAvroIO.containsLogicalType(
schemaParser.parse(
"""{"type":"record", "name":"SomeRecord3", "fields":[
|{"name":"someField","type": {"type": "array", "items": "SomeRecord2"}}
|]}""".stripMargin
)
) shouldBe true

ParquetAvroIO.containsLogicalType(
schemaParser.parse(
"""{"type":"record", "name":"SomeRecord4", "fields":[
|{"name":"someField","type": {"type": "map", "values": "SomeRecord2"}}
|]}""".stripMargin
)
) shouldBe true

ParquetAvroIO.containsLogicalType(
schemaParser.parse(
"""{"type":"record", "name":"SomeRecord5", "fields":[
|{"name":"someField","type":["null", {"type": "long", "logicalType": "timestamp-millis"}]}
|]}""".stripMargin
)
) shouldBe true
}
}

object ParquetTestJob {

This file was deleted.

@@ -37,9 +37,6 @@
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.hadoop.conf.Configuration;
- import org.apache.parquet.avro.AvroDataSupplier;
- import org.apache.parquet.avro.AvroReadSupport;
- import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -128,10 +125,6 @@ public void testSpecificRecord() throws Exception {
@Test
public void testLogicalTypes() throws Exception {
final Configuration conf = new Configuration();
- conf.setClass(
- AvroWriteSupport.AVRO_DATA_SUPPLIER, AvroLogicalTypeSupplier.class, AvroDataSupplier.class);
- conf.setClass(
- AvroReadSupport.AVRO_DATA_SUPPLIER, AvroLogicalTypeSupplier.class, AvroDataSupplier.class);

final ParquetAvroFileOperations<TestLogicalTypes> fileOperations =
ParquetAvroFileOperations.of(
