Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add prefix & suffix param for all IO APIs #4809

Merged
merged 30 commits into from
Jun 7, 2023
Merged

Conversation

RustedBones
Copy link
Contributor

@RustedBones RustedBones commented May 17, 2023

This PR allows all file based IO to define either prefix + shardnameTemplate or filenamePolicySupplier to create files.

To make sure the tap returned by the write method is capturing written files, they must be adapted to match on the file suffix, only deterministic part of the filename.
To do so, all file based IO read are give a suffix param, with null as default with the following behavior:

  • if set, the file pattern is constructed with path as a folder with $path/*$suffix
  • if null, path will be used as before, either as a pattern or a file.
  • if set and path s a pattern (contains a *), an exception is thrown

@RustedBones
Copy link
Contributor Author

ScioIO tap relies on files to be named with fixed part prefix.
IMHO path in the ScioIO must not be a pattern. (This is also a limitation in the SMB IOs).
Matcher should be constructed form prefix + suffix provided in the read param

@RustedBones RustedBones marked this pull request as draft May 19, 2023 13:44
}

final case class WriteParam(
final case class WriteParam private (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you sure nobody is calling it directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only restrict the new constructor, the apply stays public. I think it is for API evolution where we only would have to overload apply in the companion object.

@RustedBones RustedBones changed the title Add prefix param for all IO APIs Add prefix & suffix param for all IO APIs May 24, 2023
@codecov
Copy link

codecov bot commented May 24, 2023

Codecov Report

Merging #4809 (0a4c41c) into main (f3e737e) will increase coverage by 0.18%.
The diff coverage is 88.28%.

❗ Current head 0a4c41c differs from pull request most recent head b501af1. Consider uploading reports for the commit b501af1 to get more accurate results

@@            Coverage Diff             @@
##             main    #4809      +/-   ##
==========================================
+ Coverage   62.54%   62.72%   +0.18%     
==========================================
  Files         281      281              
  Lines       10431    10539     +108     
  Branches      781      775       -6     
==========================================
+ Hits         6524     6611      +87     
- Misses       3907     3928      +21     
Impacted Files Coverage Δ
...m/spotify/scio/avro/syntax/SCollectionSyntax.scala 100.00% <ø> (ø)
...ain/scala/com/spotify/scio/cassandra/package.scala 100.00% <ø> (ø)
...tify/scio/extra/csv/syntax/SCollectionSyntax.scala 66.66% <ø> (ø)
...tify/scio/datastore/syntax/SCollectionSyntax.scala 100.00% <ø> (ø)
...tify/scio/datastore/syntax/ScioContextSyntax.scala 100.00% <ø> (ø)
...m/spotify/scio/jdbc/syntax/SCollectionSyntax.scala 33.33% <ø> (ø)
...m/spotify/scio/jdbc/syntax/ScioContextSyntax.scala 80.00% <ø> (ø)
.../spotify/scio/neo4j/syntax/SCollectionSyntax.scala 66.66% <ø> (ø)
...y/scio/parquet/avro/syntax/SCollectionSyntax.scala 75.00% <ø> (ø)
.../parquet/tensorflow/syntax/SCollectionSyntax.scala 100.00% <ø> (ø)
... and 44 more

@RustedBones RustedBones marked this pull request as ready for review May 25, 2023 08:00
@RustedBones RustedBones added this to the 0.13.0 milestone Jun 2, 2023
@@ -93,8 +91,22 @@ private[scio] object ScioUtil {

private def stripPath(path: String): String = StringUtils.stripEnd(path, "/")
def strippedPath(path: String): String = s"${stripPath(path)}/"
def pathWithPrefix(path: String, filePrefix: String): String = s"${stripPath(path)}/${filePrefix}"
def pathWithPartPrefix(path: String): String = s"${stripPath(path)}/part"
def pathWithPrefix(path: String, prefix: String): String = Option(prefix) match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ambivalent about allowing null here

parts.head == 0 && parts.last + 1 == parts.size // xxxxx part
} else {
} else if (writtenShards.isEmpty) {
// assume progress is complete when shard info is not retrieved and files are present
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this all we can do here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was the previous behavior. when user give a custom naming policy/shard template, we do not have this info the the `ReadParameter, thus we can't find shard numbers easily.

Comment on lines +90 to +95
path = path,
destinationFn = destinationFn,
numShards = numShards,
prefix = prefix,
suffix = suffix,
tempDirectory = tempDirectory
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the provide the parameter names explicitly here? Did you mix up prefix/suffix?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, Since API is not too strongly types, using named parameter helped to make sure I did not mess up with parameter ordering

val collections = Seq(
"gs://bucket1/data/",
"gs://bucket2/data/"
).map(path => sc.avroFile[TestRecord](path, suffix=".avro"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the way we want to recommend users perform reads going forward? I marginally prefer the old version since it seems more compact & explicit, but that might just be Stockholm syndrome

Copy link
Contributor Author

@RustedBones RustedBones Jun 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It depends. I think extension and format are strongly coupled. In term of job parameter, if the paths are taken from the args, extention matcher should not be artificially added. If the file format changes, We'd only have to update the job.
If the path is not a suffix matcher (eg. part-*), it is fine to use path only.
Usin path + suffix is also more consistent with the SMB way, where the IO needs to read additional metadata from the path

@RustedBones RustedBones force-pushed the streamline-io-prefix branch 2 times, most recently from 21725ef to b501af1 Compare June 7, 2023 12:41
@RustedBones RustedBones merged commit 81f8316 into main Jun 7, 2023
@RustedBones RustedBones deleted the streamline-io-prefix branch June 7, 2023 14:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants