
Add helper methods for Parquet ReadFiles transforms #4801

Merged
merged 13 commits into from
Jun 20, 2023

Conversation

@clairemcginty (Contributor, author) commented May 8, 2023

Adds helper methods to create Parquet ReadFiles transforms.

Motivation: I realized it was really hard for users to construct a correct Parquet ReadFiles transform for Avro, because the required configuration is so specific. So I added helpers, designed to be used similarly to Beam's AvroIO.readFiles:

// beam AvroIO.readFiles
sc.parallelize(files).readFiles(BAvroIO.readFiles(schema))

// Scio Parquet ReadFiles
sc.parallelize(files).readFiles(ParquetRead.readAvroGenericRecordFiles(schema))

Feedback on the method naming/placement is welcome -- I guess ParquetRead.readAvroGenericRecordFiles is kind of redundant?

@clairemcginty clairemcginty changed the title Parquet sdf projection Add helper methods for Parquet ReadFiles transforms May 8, 2023
@@ -43,8 +58,92 @@ object ParquetRead {
readSupportFactory: ReadSupportFactory[T],
conf: SerializableConfiguration,
projectionFn: SerializableFunction[T, R]
- ): ParDo.SingleOutput[ReadableFile, R] = {
+ ): PTransform[PCollection[ReadableFile], PCollection[R]] = {
@clairemcginty (Contributor, author) May 8, 2023
I think this will technically break MiMa :( but on the bright side, I think this method has basically zero adoption, so it should be ok. The reason is that ParDo.of(..) constructs a PTransform[PCollection[_ <: ReadableFile], ...]. Thus, the pre-existing signature didn't work when used with SCollection#readFiles, as the signatures didn't match.

Contributor

maybe a candidate for 0.13?

@codecov codecov bot commented May 8, 2023

Codecov Report

Merging #4801 (306d57c) into main (f32058c) will increase coverage by 0.11%.
The diff coverage is 86.29%.

❗ Current head 306d57c differs from pull request most recent head 968fd72. Consider uploading reports for the commit 968fd72 to get more accurate results

@@            Coverage Diff             @@
##             main    #4801      +/-   ##
==========================================
+ Coverage   62.84%   62.96%   +0.11%     
==========================================
  Files         282      282              
  Lines       10573    10641      +68     
  Branches      795      779      -16     
==========================================
+ Hits         6645     6700      +55     
- Misses       3928     3941      +13     
Impacted Files Coverage Δ
...la/com/spotify/scio/testing/BigtableMatchers.scala 100.00% <ø> (ø)
...ain/scala/com/spotify/scio/coders/BeamCoders.scala 65.11% <72.22%> (-11.08%) ⬇️
...la/com/spotify/scio/parquet/read/ParquetRead.scala 84.09% <82.05%> (-15.91%) ⬇️
...potify/scio/estimators/ApproxDistinctCounter.scala 100.00% <100.00%> (ø)
...ify/scio/values/PairHashSCollectionFunctions.scala 100.00% <100.00%> (ø)
...y/scio/extra/hll/sketching/SketchHllPlusPlus.scala 93.33% <100.00%> (ø)
...cala/com/spotify/scio/grpc/SCollectionSyntax.scala 100.00% <100.00%> (ø)

... and 2 files with indirect coverage changes

@clairemcginty (Contributor, author)

On second thought, maybe I can move these methods into the ParquetAvroIO and ParquetTypeIO objects, so they can be accessed as e.g. ParquetAvroIO.readFiles(schema)... that's more analogous to Beam-style transform constructors, I think.

@@ -43,8 +58,92 @@ object ParquetRead {
readSupportFactory: ReadSupportFactory[T],
conf: SerializableConfiguration,
projectionFn: SerializableFunction[T, R]
- ): ParDo.SingleOutput[ReadableFile, R] = {
+ ): PTransform[PCollection[ReadableFile], PCollection[R]] = {
val sdf = new ParquetReadFn[T, R](readSupportFactory, conf, projectionFn)
Contributor

So the readFiles API uses only splittable reads, right? Does it perform the same as the legacy read on Runner v1?

@clairemcginty (Contributor, author)

Yeah, I think it's a little less performant on Runner v1 (it doesn't scale up as well).

)

def readTyped[T: ClassTag: Coder: ParquetType, R](
projectionFn: T => R,
Contributor

Why two APIs rather than setting identity as the default projectionFn?

@clairemcginty (Contributor, author)

Just to make the API friendlier for users who aren't supplying a projection, so they don't have to specify their T type twice.
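The type-inference point above can be sketched with simplified stand-ins (the names and signatures below are hypothetical, not Scio's API):

```scala
// Hypothetical, simplified stand-ins to illustrate why a dedicated
// no-projection method is friendlier than identity-as-default.

// One combined API: R is only inferable from projectionFn, so a caller who
// wants no projection must pin both type parameters explicitly.
def readWithProjection[T, R](projectionFn: T => R): List[T] => List[R] =
  _.map(projectionFn)

// Dedicated no-projection variant: T is stated once, and R = T.
def readPlain[T]: List[T] => List[T] = identity

val projected = readWithProjection[Int, String](_.toString)(List(1, 2))
// Without the dedicated method, the same caller would have to write:
//   readWithProjection[Int, Int](identity)
val plain = readPlain[Int](List(1, 2))
```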


def readAvroGenericRecordFiles[T](
schema: Schema,
projectionFn: GenericRecord => T,
Contributor

This does not look like a projection: we are reading the whole schema's data even if the function selects a single field of the GenericRecord.

Option(predicate).foreach(p => ParquetInputFormat.setFilterPredicate(configuration, p))

val cleanedFn = Functions.serializableFn(ClosureCleaner.clean(projectionFn))
readFiles(ReadSupportFactory.typed[T], new SerializableConfiguration(configuration), cleanedFn)
Contributor

I'm not really sure the projectionFn does what it's supposed to do.
I checked ParquetReadFn here, and it seems to me it's the equivalent of a chained map step.

IMHO we should either:

  • simplify the API by removing the projection and let the user do a map post-read (I am also surprised that we do not have an implicit Coder[R] for the output type here)
  • ask the user to create a model T which is a projection of the model used when writing the Parquet file

@clairemcginty (Contributor, author) May 11, 2023

Re: implicit Coder[R] -- that would be required when the ReadFiles transform is actually applied, here: https://github.com/spotify/scio/blob/main/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala#L1396

(Maybe it makes sense to move these methods to the respective avro/typed SCollectionSyntax traits, and have the method actually apply the transform to an SCollection? that way we could set the Coder...)

will circle back on the projectionFn, I want to check something...

@clairemcginty (Contributor, author)

OK, so for the projection -- the main advantage of offering the projection, as I understand it, is that it happens pre-Coder#encode.

I think that for GenericRecords, this doesn't offer any advantage, as long as the user correctly specifies an implicit Coder using the projected schema -- see unit test.

But for SpecificRecords, say you have a schema with two non-nullable fields, but you only want to project one of those fields. Without being able to apply a projectionFn pre-Coder#encode, serialization will fail due to the absence of a non-nullable field. I just updated the unit test to use Account as a test record, which has non-nullable fields; the SpecificRecord test w/ no projectionFn is currently failing.

So overall, I think that:

a. ProjectionFn support for GenericRecord reads isn't important, and could be removed; however, I'm worried that users will be confused about setting the correct Coder here.
b. ProjectionFn support should be a requirement for SpecificRecord reads if your projection excludes non-nullable, non-default fields.
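The SpecificRecord failure mode described in points a/b above can be sketched in miniature. The `encode` below is a hypothetical, strict stand-in for a Coder#encode that rejects unset non-nullable fields; the types are simplified stand-ins, not Avro classes:

```scala
// Hypothetical stand-ins: Account has a non-nullable field `name`, and
// encode rejects records where that required field is unset.
final case class Account(id: java.lang.Long, name: String)
final case class AccountId(id: Long)

def encode(a: Account): String = {
  require(a.name != null, "non-nullable field 'name' is unset")
  s"${a.id},${a.name}"
}

// A column-projected read materializes only `id`, leaving `name` null;
// encoding this partial Account would throw at Coder#encode time.
val partial = Account(42L, null)

// Applying the projectionFn pre-encode never touches the missing field:
val projectionFn: Account => AccountId = a => AccountId(a.id)
val safe = projectionFn(partial)
```

This is why, in the sketch, the projection must run before encoding: once the output type no longer carries the unset required field, the strict encoder has nothing to reject.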

Comment on lines +128 to +129
projection: Schema,
projectionFn: T => R,
Contributor

I don't really like these two being separated, as they are strongly coupled.
If we want to offer projection, I think we'd better rework the Projection helper so it encapsulates both the mapping function and the schema.

@clairemcginty (Contributor, author)

I'm happy to rework the Parquet-Avro ReadParam to make this clearer! I think it depends a bit on whether we'd like to include this change in a 0.12 release (I can revert the readFiles signature changes to make MiMa happy), in which case it should be left as-is, or save it for 0.13.

Comment on lines +141 to +142
predicate: FilterPredicate,
conf: Configuration
Contributor

Should we create method overloads to avoid setting those to null?

@clairemcginty (Contributor, author) Jun 19, 2023

I've been thinking about that -- my concern is that overloading might make things more confusing from the user's POV. Say we have:

def readAvro[T <: SpecificRecord: ClassTag](): PTransform[PCollection[ReadableFile], PCollection[T]] =
    readAvro[T](null, null, null)

def readAvro[T <: SpecificRecord: ClassTag](
    projection: Schema,
    predicate: FilterPredicate,
    conf: Configuration
  ): PTransform[PCollection[ReadableFile], PCollection[T]] = ...

In this case, users can either pass 0 arguments or all 3. But I think it's a somewhat common use case to pass in only a projection, or only a predicate -- and that would be more difficult for the user with the overloaded API.
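For comparison, covering the partial cases explicitly would look roughly like the sketch below. The types here are simplified, hypothetical stand-ins for Schema / FilterPredicate / the resulting read configuration, not the real Scio signatures:

```scala
// Hypothetical stand-in types. With n optional settings, covering every
// partial combination needs up to 2^n overloads, which is the trade-off
// against the single null-tolerant full-arity signature.
final case class Projection(fields: List[String])
final case class Predicate(expr: String)
final case class ReadConfig(projection: Option[Projection], predicate: Option[Predicate])

object ReadAvro {
  // Full-arity variant, null-tolerant like the original signature.
  def apply(projection: Projection, predicate: Predicate): ReadConfig =
    ReadConfig(Option(projection), Option(predicate))
  // Overloads for the common partial cases:
  def apply(): ReadConfig = apply(null, null)
  def apply(projection: Projection): ReadConfig = apply(projection, null)
  def apply(predicate: Predicate): ReadConfig = apply(null, predicate)
}
```

Note the partial overloads only resolve cleanly here because Projection and Predicate are distinct types; overloads on same-typed parameters would be ambiguous.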

Contributor

I meant we can create overloads for all possibilities.

@clairemcginty (Contributor, author)

👍 sounds good. thanks for updating the PR!

Comment on lines +200 to +201
predicate: FilterPredicate,
conf: Configuration
Contributor

same here?

Comment on lines +97 to +98
predicate: FilterPredicate,
conf: Configuration
Contributor

and here

@RustedBones RustedBones added this to the 0.13.0 milestone Jun 20, 2023
@clairemcginty clairemcginty merged commit 508f5f4 into main Jun 20, 2023
@clairemcginty clairemcginty deleted the parquet_sdf_projection branch June 20, 2023 13:30