Make Parquet logical type support opt-in #4782
Conversation
Codecov Report

@@           Coverage Diff           @@
##             main    #4782   +/-   ##
=======================================
+ Coverage   61.18%   61.26%   +0.07%
=======================================
  Files         286      286
  Lines       10553    10568      +15
  Branches      769      767       -2
=======================================
+ Hits         6457     6474      +17
+ Misses       4096     4094      -2

... and 1 file with indirect coverage changes
@@ -117,6 +119,17 @@ final case class ParquetAvroIO[T: ClassTag: Coder](path: String) extends ScioIO[
  override protected def write(data: SCollection[T], params: WriteP): Tap[T] = {
    val isAssignable = classOf[SpecificRecordBase].isAssignableFrom(cls)
    val writerSchema = if (isAssignable) ReflectData.get().getSchema(cls) else params.schema
    val conf = Option(params.conf).getOrElse(new Configuration())
We're starting to default to new Configuration() in many places. IMHO it is better to fall back only once in the code path.
Agreed! I'll clean up config usage in a follow-up PR.
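A minimal sketch of the pattern being suggested (the helper object and method name here are hypothetical, assuming Hadoop's Configuration): resolve the null-default exactly once at the entry point of the I/O path, so downstream code never repeats the Option(...).getOrElse fallback.

```scala
import org.apache.hadoop.conf.Configuration

object ParquetConfiguration {
  // Hypothetical helper: fall back to a fresh Configuration exactly once,
  // at the entry point of the read/write path.
  def ofNullable(conf: Configuration): Configuration =
    Option(conf).getOrElse(new Configuration())
}

// Call sites then receive a non-null Configuration and never re-default:
// val conf = ParquetConfiguration.ofNullable(params.conf)
// ... pass conf down; downstream code can assume conf != null
```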
scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala
case class AvroLogicalTypeSupplier() extends AvroDataSupplier {
  override def get(): GenericData = {
    val specificData = SpecificData.get()

    // Add conversions as needed
    specificData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion())

    specificData
  }
}
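For context, a supplier like the one above is not instantiated directly; Parquet constructs it reflectively from a Hadoop Configuration entry. A hedged sketch of the wiring, assuming parquet-avro's AvroReadSupport.AVRO_DATA_SUPPLIER key (which names the parquet.avro.data.supplier property):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.{AvroDataSupplier, AvroReadSupport}

val conf = new Configuration()
// Parquet instantiates the supplier reflectively, which is why it must
// have a no-arg constructor.
conf.setClass(
  AvroReadSupport.AVRO_DATA_SUPPLIER,
  classOf[AvroLogicalTypeSupplier],
  classOf[AvroDataSupplier]
)
```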
I feel we should be able to do better than this. With specific records, all conversions are added to a pre-generated model contained in the static $MODEL field of the class, and can be accessed with SpecificData.getForClass in Avro 1.9+. It is still possible to get this to work with Avro 1.8 with some tricks. This supplier should only be required for records generated with ReflectData or GenericData, where conversions must be explicitly given.
I did see $MODEL -- as well as a static field org.apache.avro.Conversion<?>[] conversions that has everything we need -- but was hesitant to use them because in Avro 1.8 at least, they're both private, so I'd have to use reflection on private and possibly unstable fields. It would be ideal to get the conversions directly from the specific record class, though.
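A sketch of the Avro 1.9+ route the reviewer describes (hedged; MyRecord is a hypothetical Avro-generated SpecificRecord class). SpecificData.getForClass looks up the class's generated static model field and returns a model that already carries the record's logical-type conversions, falling back to the global model when no such field exists:

```scala
import org.apache.avro.specific.SpecificData

// MyRecord is a hypothetical Avro-generated class; any generated
// SpecificRecord works. In Avro 1.9+, getForClass returns the class's
// pre-generated model (conversions included) rather than the bare
// global SpecificData.get().
val model: SpecificData = SpecificData.getForClass(classOf[MyRecord])

// The returned model can then encode/decode logical types without any
// manual addLogicalTypeConversion calls.
```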
Looked into this a bit more! I think you're right: we can definitely go from T/Schema to a SpecificData model populated with conversion types by re-using Avro's model-cache logic.

But the complex bit is actually wiring this model in. Parquet allows you to specify a model by either (a) setting .withDataModel() on the read/write builder, or (b) supplying the *DATA_SUPPLIER Configuration param. To use our derived SpecificData model, we'd have to use .withDataModel directly, I think -- I can't see how we'd make this work with a generic supplier class that requires a no-arg constructor.

For Parquet writes, we do invoke the write builder directly, so we could take our derived SpecificData and set it using .withDataModel. But for reads, we don't get direct access to the AvroParquetReader to set .withDataModel -- there are too many layers of indirection -- so the only way we can specify a data model is to configure the DATA_SUPPLIER parameter with the name of a supplier class, which, as mentioned above, I don't know how we could wire our custom data model into.
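A hedged sketch of the write-side option (a) described above, using parquet-avro's AvroParquetWriter builder (the output path and MyRecord class are assumptions for illustration):

```scala
import org.apache.avro.specific.SpecificData
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter

// Write side: we build the writer ourselves, so the derived model can be
// passed straight in via withDataModel -- no no-arg supplier needed.
val model = SpecificData.getForClass(classOf[MyRecord]) // Avro 1.9+
val writer = AvroParquetWriter
  .builder[MyRecord](new Path("/tmp/out.parquet")) // hypothetical path
  .withSchema(MyRecord.getClassSchema)             // generated accessor
  .withDataModel(model)                            // option (a)
  .build()
```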
* Make logical type support for Parquet-Avro opt-in
* Update documentation
* conf->jobConf
* scalafmt
* Update scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala (Co-authored-by: Michel Davit <micheld@spotify.com>)
* Add parens

Co-authored-by: Michel Davit <micheld@spotify.com>
Support logical types in SpecificRecord Parquet writes (#4772)

* Reproduce SpecificRecords/logicalType bug
* Fix bug
* Cleanup test
* logical types must be added from worker VM, not launcher
* Fix for both reads and writes
* cleanup
* Don't set supplier if user has explicitly configured one
* Test for custom logical type provider
* Pass dataModel to AvroParquetWriter.Builder
* Only override .withDataModel if Configuration key exists
* Isolate SpecificData instances
* fix test
* update copyright year
* cleanup
* Add support for scio-smb Parquet writes
* Manage config defaults via core-site.xml
* Fix formatting
* Should be a standalone class
* add test
* +header
* newline
* Add remaining Converters
* simplify SpecificData creation
* Correctly encode logical types in GenericRecord
* remove unused import
* Make Parquet logical type support opt-in (#4782)

Co-authored-by: Michel Davit <micheld@spotify.com>
Makes Parquet logical type support opt-in, rather than included by default. scio-parquet will warn if a logical type is detected in the schema but no logical type provider is configured.

The reasoning for this is that Avro logical type conversion classes are heavily tied to the Avro version: TimeConversions.TimestampConversion in Avro 1.8 is renamed to TimeConversions.TimestampMillisConversion in Avro 1.11, for example. So by including these converters by default, we'd essentially hard-fail any Avro 1.11 user immediately, even if their schema doesn't have any logical types.
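To illustrate the opt-in from a Scio pipeline, a hedged sketch of a write path (the conf parameter on saveAsParquetAvroFile and the bucket path are assumptions for illustration; AvroWriteSupport.AVRO_DATA_SUPPLIER is parquet-avro's write-side supplier key, and AvroLogicalTypeSupplier is the supplier shown earlier in this conversation):

```scala
import com.spotify.scio.parquet.avro._
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.{AvroDataSupplier, AvroWriteSupport}

val conf = new Configuration()
// Opt in explicitly: point Parquet at a supplier that registers the
// logical-type conversions matching your Avro version. Without this,
// scio-parquet only logs a warning when the schema has logical types.
conf.setClass(
  AvroWriteSupport.AVRO_DATA_SUPPLIER,
  classOf[AvroLogicalTypeSupplier],
  classOf[AvroDataSupplier]
)

// records: SCollection[MyRecord] with e.g. a timestamp-millis field
records.saveAsParquetAvroFile("gs://bucket/output", conf = conf)
```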