Add support for Zstd coders #5321
Conversation
Codecov Report
Attention: Patch coverage is

Additional details and impacted files

```
@@            Coverage Diff             @@
##             main    #5321      +/-   ##
==========================================
- Coverage   62.69%   61.08%    -1.61%
==========================================
  Files         301      306        +5
  Lines       10848    10993      +145
  Branches      773      774        +1
==========================================
- Hits         6801     6715       -86
- Misses       4047     4278      +231
```

☔ View full report in Codecov by Sentry.
```scala
className.replaceAll("\\$", ".") -> path
```
Maybe missing some other cases here?

The current version is there because `Class.forName` requires `OuterClass$InnerClass`, but the type name in the coders is `OuterClass.InnerClass`.
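The mapping under discussion can be illustrated with a standalone sketch (class names here are hypothetical, not from the PR): JVM binary names separate a nested class with `$`, while the type names seen by the coders use `.`.

```scala
// Hypothetical sketch: JVM binary names (as Class.forName expects them) use
// '$' before a nested class, while coder type names use '.'.
object ClassNameMappingDemo {
  def main(args: Array[String]): Unit = {
    // binary name, suitable for Class.forName
    val binaryName = "com.example.OuterClass$InnerClass"
    // type name as it would appear in the coders; "\\$" is a regex for a literal '$'
    val typeName = binaryName.replaceAll("\\$", ".")
    assert(typeName == "com.example.OuterClass.InnerClass")
  }
}
```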
A blacklist based on package name may also make sense
Added blacklist
Resolved review threads (outdated):
- scio-core/src/main/scala/com/spotify/scio/values/PairSCollectionFunctions.scala
- scio-core/src/main/scala/com/spotify/scio/coders/CoderMaterializer.scala
- scio-core/src/main/scala/com/spotify/scio/coders/CoderMaterializer.scala
- scio-core/src/main/java/com/spotify/scio/options/ScioOptions.java
```scala
s.split(":", 2).toList match {
  case className :: path :: Nil =>
```
The default case should catch when we have more than one `:` separator.
Suggested change:

```diff
-s.split(":", 2).toList match {
-  case className :: path :: Nil =>
+s.split(":") match {
+  case Array(className, path) =>
```
The path part can contain a `:`, a la `gs://`. I suppose we could use a variety of other characters, e.g. `,`, but `:` makes the most sense as a 'mapping' IMO.
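A quick sketch of why the limit-2 split matters here (the mapping string below is illustrative): the value side may itself contain `:`, so only the first `:` should separate the class name from the path.

```scala
// Sketch: split(":", 2) keeps the gs:// URL intact, a plain split would not.
object SplitDemo {
  def main(args: Array[String]): Unit = {
    val s = "com.spotify.scio.MyClass:gs://bucket/path/dict.bin"
    val Array(className, path) = s.split(":", 2)
    assert(className == "com.spotify.scio.MyClass")
    assert(path == "gs://bucket/path/dict.bin")
    // an unlimited split shreds the URL into three pieces
    assert(s.split(":").length == 3)
  }
}
```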
```scala
private def unwrapZstd[T](options: CoderOptions, coder: BCoder[T]): BCoder[T] =
  coder match {
    case c: BZstdCoder[T] =>
      val underlying = c.getCoderArguments.get(0).asInstanceOf[BCoder[T]]
      unwrap(options, underlying)
    case _ => coder
  }
```
Why not factor this into `unwrap`, since it is always chained?
In e.g. `getTupleCoders`, we unwrap only the top-level Zstd coder for `Tuple2[K, V]` but want to retain any Zstd coder on `K`. Putting the Zstd unwrapping in `unwrap` leads to the dict being discarded on many/every unwrap call.
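The distinction can be modeled with a toy coder ADT (the types below are invented for illustration, not Beam's API): unwrapping is applied only at the top level, so a dictionary-carrying coder nested under the pair survives.

```scala
// Toy model (not Beam's API) of top-level-only Zstd unwrapping.
sealed trait ToyCoder
case object Plain extends ToyCoder
case class ZstdC(inner: ToyCoder, dict: String) extends ToyCoder
case class Tuple2C(key: ToyCoder, value: ToyCoder) extends ToyCoder

object UnwrapDemo {
  // strip compression from the top-level coder only
  def unwrapZstdTop(c: ToyCoder): ToyCoder = c match {
    case ZstdC(inner, _) => inner
    case other           => other
  }

  def main(args: Array[String]): Unit = {
    val pair = ZstdC(Tuple2C(ZstdC(Plain, "keyDict"), Plain), "pairDict")
    // the pair-level dictionary is discarded, the key-level one is retained
    assert(unwrapZstdTop(pair) == Tuple2C(ZstdC(Plain, "keyDict"), Plain))
  }
}
```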
```scala
keyDict: Array[Byte] = null,
valueDict: Array[Byte] = null
```
I'm not a fan of null defaults. Also, we used `kv` for `beam.KV` in many places, not tuples, so this signature is a bit confusing. Do we actually need this API, since this is a regular `Tuple2Coder` with implicitly (un)compressed key and value coders?
This is for user ergonomics when setting the coder manually:

```scala
sc.parallelize[(String, String)](
  List(...)
)(ZstdCoder.kv(valueDict = dictBytes))
```

Not providing this means users need to manually lift their dict into a Beam coder and then into a Scio coder. And using optionals here makes the API unpleasant IMO, even though I in general agree with you. Is `ZstdCoder.tuple2` less annoying/confusing?
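The ergonomics trade-off can be sketched with two hypothetical signatures (neither is the actual Scio API): with null defaults the caller passes raw bytes, while with `Option` defaults every call site must wrap in `Some(...)`.

```scala
// Hypothetical signature comparison: null defaults vs Option defaults.
object DictApiDemo {
  val dictBytes: Array[Byte] = Array[Byte](1, 2, 3)

  // terse call sites; nulls are normalized to Option internally
  def kvNulls(
    keyDict: Array[Byte] = null,
    valueDict: Array[Byte] = null
  ): (Option[Array[Byte]], Option[Array[Byte]]) =
    (Option(keyDict), Option(valueDict))

  // honest types, but callers must wrap every argument
  def kvOptions(
    keyDict: Option[Array[Byte]] = None,
    valueDict: Option[Array[Byte]] = None
  ): (Option[Array[Byte]], Option[Array[Byte]]) =
    (keyDict, valueDict)

  def main(args: Array[String]): Unit = {
    assert(kvNulls(valueDict = dictBytes)._2.isDefined)
    assert(kvOptions(valueDict = Some(dictBytes))._2.isDefined)
    assert(kvNulls()._1.isEmpty && kvOptions()._1.isEmpty)
  }
}
```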
I'm lightly in favor of `tuple2` (unfortunately) -- I think `kv` is already semantically "taken" by Beam's `KV` class.
```scala
// training bytes may not exceed 2GiB a.k.a. the max value of an Int
val trainingBytesTargetActual: Int = Option(trainingBytesTarget).getOrElse {
  val computed =
    Try(Math.multiplyExact(zstdDictSizeBytes, 100)).toOption.getOrElse(Int.MaxValue)
```
Do we want to catch this, or let it fail?
Hmm, actually I am unsure of what happens in this case, i.e. whether the training size is related to the elements added or to the dict size itself. 2 GiB / 100 implies that the max zstd dictionary size should be 20-ish megs, so maybe we should throw on that.
+1 on throwing an exception
Re-reading, ~100x is just a recommendation, so using fewer items would probably decrease its effectiveness but wouldn't be catastrophic. Going to throw anyway, since the recommendations argue against large dictionaries.
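The overflow ceiling behind this thread can be checked directly (the helper below mirrors the guarded multiply from the snippet; the dict sizes are illustrative):

```scala
import scala.util.Try

object TrainingBytesDemo {
  // mirror of the guarded multiply: None signals Int overflow
  def trainingBytes(dictSizeBytes: Int): Option[Int] =
    Try(Math.multiplyExact(dictSizeBytes, 100)).toOption

  def main(args: Array[String]): Unit = {
    assert(trainingBytes(1 << 20).contains(100 << 20)) // 1 MiB dict -> 100 MiB target
    assert(trainingBytes(64 << 20).isEmpty)            // 64 MiB dict overflows Int
    assert(Int.MaxValue / 100 == 21474836)             // dict ceiling is ~20 MiB at 100x
  }
}
```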
```scala
// estimate the sample rate we need by examining numElementsForSizeEstimation elements
val streamsCntSI = scoll.count.asSingletonSideInput(0L)
val sampleRateSI = scoll
  .take(numElementsForSizeEstimation)
  .map(v => toBytes(v).length)
  .sum
  .withSideInputs(streamsCntSI)
  .map { case (totalSize, ctx) =>
    val avgSize = totalSize / numElementsForSizeEstimation
    val targetNumElements = trainingBytesTargetActual / avgSize
    val sampleRate = targetNumElements.toDouble / ctx(streamsCntSI)
    logger.info(s"Computed sample rate for Zstd dictionary: ${sampleRate}")
    sampleRate
  }
  .toSCollection
  .asSingletonSideInput

scoll
  .withSideInputs(sampleRateSI)
  .flatMap {
    case (s, ctx) if new Random().nextDouble() <= ctx(sampleRateSI) =>
      Some(toBytes(s))
    case _ => None
  }
  .toSCollection
  .keyBy(_ => ())
  .groupByKey
  .map { case (_, elements) =>
```
Why not use `sample` with `sampleSize`, as the result should fit in memory anyway? We'll just have to use `Int` for `numElementsForSizeEstimation`.
Suggested change, replacing the whole sample-rate block quoted above with:

```scala
scoll
  .sample(numElementsForSizeEstimation)
  .map { elements =>
```
This is so users don't need to know the average size of elements in the pipeline; we estimate it based on `numElementsForSizeEstimation`. Taking only those elements is not useful, since you may need many more elements to actually train.
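A back-of-envelope version of that estimate (all numbers below are hypothetical) shows how the sample rate falls out of the measured average element size and the training target:

```scala
// Illustrative arithmetic behind the sample-rate side input.
object SampleRateDemo {
  def main(args: Array[String]): Unit = {
    val numElementsForSizeEstimation = 1000L
    val measuredBytes = 250000L          // total bytes over the first 1000 elements
    val trainingBytesTarget = 100 << 20  // 100 MiB target training set
    val totalElements = 50000000L        // full collection size (from the count side input)

    val avgSize = measuredBytes / numElementsForSizeEstimation
    val targetNumElements = trainingBytesTarget / avgSize
    val sampleRate = targetNumElements.toDouble / totalElements

    assert(avgSize == 250L)
    assert(targetNumElements == 419430L)        // ~420k elements to hit 100 MiB
    assert(sampleRate > 0.008 && sampleRate < 0.009)
  }
}
```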
Maybe something like this can help: #5352?
Yeah. I imagine doing a bunch of priority-queue merges is somewhat less efficient (though more accurate), but it perhaps doesn't matter in this case since we have small amounts of data in the end.
Adds:
- `saveAsZstdDictionary` to train a Zstd dictionary on some arbitrary `SCollection[T]`. Estimates the average size of elements `T`, collects `n` elements based on a target training set size, then trains and saves the Zstd dictionary.
- A `ZstdCoder` object with transforms of `Coder`s for a simple `T` or for each side of a `(K, V)`.
- A `--zstdDictionary=com.spotify.scio.MyClass:gs://bucket/path/dict.bin` pipeline option, so instances of `MyClass` get Zstd compression automagically. Probably fails if the type is parameterized.