From 4f3717026757f1b81fe05ba8f413d7b47e544a7e Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Tue, 16 May 2023 16:03:23 +0200 Subject: [PATCH 1/5] Deprecate old skewed-join API --- .../PairSkewedSCollectionFunctions.scala | 330 ++++-------------- .../scio/examples/cookbook/JoinExamples.scala | 13 +- 2 files changed, 68 insertions(+), 275 deletions(-) diff --git a/scio-core/src/main/scala/com/spotify/scio/values/PairSkewedSCollectionFunctions.scala b/scio-core/src/main/scala/com/spotify/scio/values/PairSkewedSCollectionFunctions.scala index 23612c80da..0b64aa25fd 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/PairSkewedSCollectionFunctions.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/PairSkewedSCollectionFunctions.scala @@ -59,14 +59,26 @@ object HotKeyMethod { object SkewedJoins { // some sensible defaults for skewed joins - val DefaultHotKeyThreshold: Int = 9000 - val DefaultHotKeyMethod: HotKeyMethod.Threshold = HotKeyMethod.Threshold(DefaultHotKeyThreshold) + /** Default is 9000 occurrence threshold. */ + val DefaultHotKeyMethod: HotKeyMethod.Threshold = HotKeyMethod.Threshold(9000) + + /** Default is 1 - no fanout. */ val DefaultHotKeyFanout: Int = 1 + + /** Default is 0.001. */ val DefaultCmsEpsilon: Double = 0.001 + + /** Default is 1e-10. */ val DefaultCmsDelta: Double = 1e-10 + + /** Default is 42. */ val DefaultCmsSeed: Int = 42 + + /** Default is `1.0` - no sampling. */ val DefaultSampleFraction: Double = 1.0 - val DefaultSampleWithReplacement: Boolean = true + + /** Default is false - Bernoulli sampling */ + val DefaultSampleWithReplacement: Boolean = false private[scio] def union[T](hot: SCollection[T], chill: SCollection[T]): SCollection[T] = hot.withName("Union hot and chill join results").union(chill) @@ -153,8 +165,7 @@ class PairSkewedSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { * Read more about CMS: [[com.twitter.algebird.CMS]]. * @group join * @param hotKeyMethod - * Method used to compute hot-keys from the left side collection. Default is 9000 occurrence - * threshold. + * Method used to compute hot-keys from the left side collection. * @param hotKeyFanout * The number of intermediate keys that will be used during the CMS computation. * @param cmsEps @@ -174,13 +185,13 @@ class PairSkewedSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { */ def skewedJoin[W]( rhs: SCollection[(K, W)], - hotKeyMethod: HotKeyMethod, - hotKeyFanout: Int, - cmsEps: Double, - cmsDelta: Double, - cmsSeed: Int, - sampleFraction: Double, - sampleWithReplacement: Boolean + hotKeyMethod: HotKeyMethod = SkewedJoins.DefaultHotKeyMethod, + hotKeyFanout: Int = SkewedJoins.DefaultHotKeyFanout, + cmsEps: Double = SkewedJoins.DefaultCmsEpsilon, + cmsDelta: Double = SkewedJoins.DefaultCmsDelta, + cmsSeed: Int = SkewedJoins.DefaultCmsSeed, + sampleFraction: Double = SkewedJoins.DefaultSampleFraction, + sampleWithReplacement: Boolean = SkewedJoins.DefaultSampleWithReplacement )(implicit hasher: CMSHasher[K]): SCollection[(K, (V, W))] = self.transform { lhs => val lhsKeys = LargeLeftSide.sampleKeys(lhs, sampleFraction, sampleWithReplacement) import com.twitter.algebird._ @@ -200,57 +211,15 @@ class PairSkewedSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { } } - /** - * N to 1 skew-proof flavor of [[PairSCollectionFunctions.join]]. - * - * Perform a skewed full join where some keys on the left hand may be hot. Frequency of a key is - * estimated with `1 - delta` probability, and the estimate is within `eps * N` of the true - * frequency. - * - * `true frequency <= estimate <= true frequency + eps * N` - * - * where N is the total size of the left hand side stream so far. - * - * @note - * Make sure to `import com.twitter.algebird.CMSHasherImplicits` before using this join. - * @example - * {{{ - * // Implicits that enabling CMS-hashing - * import com.twitter.algebird.CMSHasherImplicits._ - * val p = logs.skewedJoin(logMetadata) - * }}} - * - * Read more about CMS: [[com.twitter.algebird.CMS]]. - * @group join - * @param hotKeyThreshold - * key with `hotKeyThreshold` values will be considered hot. Some runners have inefficient - * `GroupByKey` implementation for groups with more than 10K values. Thus it is recommended to - * set `hotKeyThreshold` to below 10K, keep upper estimation error in mind. If you sample input - * via `sampleFraction` make sure to adjust `hotKeyThreshold` accordingly. Default is 9000. - * @param eps - * One-sided error bound on the error of each point query, i.e. frequency estimate. Must lie in - * `(0, 1)`. Default is 0.001. - * @param seed - * A seed to initialize the random number generator used to create the pairwise independent hash - * functions. Default is 42. - * @param delta - * A bound on the probability that a query estimate does not lie within some small interval (an - * interval that depends on `eps`) around the truth. Must lie in `(0, 1)`. Default is 1e-10. - * @param sampleFraction - * left side sample fraction. Default is `1.0` - no sampling. - * @param withReplacement - * whether to use sampling with replacement, see - * [[SCollection.sample(withReplacement:Boolean,fraction:Double)* SCollection.sample]]. Default - * is true. - */ + @deprecated("Use skewedJoin with hotKeyMethod instead", since = "0.13.0") def skewedJoin[W]( rhs: SCollection[(K, W)], - hotKeyThreshold: Long = SkewedJoins.DefaultHotKeyThreshold, - eps: Double = SkewedJoins.DefaultCmsEpsilon, - seed: Int = SkewedJoins.DefaultCmsSeed, - delta: Double = SkewedJoins.DefaultCmsDelta, - sampleFraction: Double = SkewedJoins.DefaultSampleFraction, - withReplacement: Boolean = SkewedJoins.DefaultSampleWithReplacement + hotKeyThreshold: Long, + eps: Double, + seed: Int, + delta: Double, + sampleFraction: Double, + withReplacement: Boolean )(implicit hasher: CMSHasher[K]): SCollection[(K, (V, W))] = skewedJoin( rhs, @@ -263,37 +232,7 @@ class PairSkewedSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { withReplacement ) - /** - * N to 1 skew-proof flavor of [[PairSCollectionFunctions.join]]. - * - * Perform a skewed join where some keys on the left hand may be hot, i.e. appear more than - * `hotKeyThreshold` times. Frequency of a key is estimated with `1 - delta` probability, and the - * estimate is within `eps * N` of the true frequency. - * - * `true frequency <= estimate <= true frequency + eps * N` - * - * where N is the total size of the left hand side stream so far. - * - * @note - * Make sure to `import com.twitter.algebird.CMSHasherImplicits` before using this join. - * @example - * {{{ - * // Implicits that enabling CMS-hashing - * import com.twitter.algebird.CMSHasherImplicits._ - * val keyAggregator = CMS.aggregator[K](eps, delta, seed) - * val hotKeyCMS = self.keys.aggregate(keyAggregator) - * val p = logs.skewedJoin(logMetadata, hotKeyThreshold=8500, cms=hotKeyCMS) - * }}} - * - * Read more about CMS: [[com.twitter.algebird.CMS]]. - * @group join - * @param hotKeyThreshold - * key with `hotKeyThreshold` values will be considered hot. Some runners have inefficient - * `GroupByKey` implementation for groups with more than 10K values. Thus it is recommended to - * set `hotKeyThreshold` to below 10K, keep upper estimation error in mind. - * @param cms - * left hand side key [[com.twitter.algebird.CMS]] - */ + @deprecated("Use skewedJoin with hotKeyMethod instead", since = "0.13.0") def skewedJoin[W]( rhs: SCollection[(K, W)], hotKeyThreshold: Long, @@ -362,8 +301,7 @@ class PairSkewedSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { * Read more about CMS: [[com.twitter.algebird.CMS]]. * @group join * @param hotKeyMethod - * Method used to compute hot-keys from the left side collection. Default is 9000 occurrence - * threshold. + * Method used to compute hot-keys from the left side collection. * @param hotKeyFanout * The number of intermediate keys that will be used during the CMS computation. * @param cmsEps @@ -383,13 +321,13 @@ class PairSkewedSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { */ def skewedLeftOuterJoin[W]( rhs: SCollection[(K, W)], - hotKeyMethod: HotKeyMethod, - hotKeyFanout: Int, - cmsEps: Double, - cmsDelta: Double, - cmsSeed: Int, - sampleFraction: Double, - sampleWithReplacement: Boolean + hotKeyMethod: HotKeyMethod = SkewedJoins.DefaultHotKeyMethod, + hotKeyFanout: Int = SkewedJoins.DefaultHotKeyFanout, + cmsEps: Double = SkewedJoins.DefaultCmsEpsilon, + cmsDelta: Double = SkewedJoins.DefaultCmsDelta, + cmsSeed: Int = SkewedJoins.DefaultCmsSeed, + sampleFraction: Double = SkewedJoins.DefaultSampleFraction, + sampleWithReplacement: Boolean = SkewedJoins.DefaultSampleWithReplacement )(implicit hasher: CMSHasher[K]): SCollection[(K, (V, Option[W]))] = self.transform { lhs => import com.twitter.algebird._ val lhsKeys = LargeLeftSide.sampleKeys(lhs, sampleFraction, sampleWithReplacement) @@ -409,57 +347,15 @@ class PairSkewedSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { } } - /** - * N to 1 skew-proof flavor of [[PairSCollectionFunctions.leftOuterJoin]]. - * - * Perform a skewed full join where some keys on the left hand may be hot. Frequency of a key is - * estimated with `1 - delta` probability, and the estimate is within `eps * N` of the true - * frequency. - * - * `true frequency <= estimate <= true frequency + eps * N` - * - * where N is the total size of the left hand side stream so far. - * - * @note - * Make sure to `import com.twitter.algebird.CMSHasherImplicits` before using this join. - * @example - * {{{ - * // Implicits that enabling CMS-hashing - * import com.twitter.algebird.CMSHasherImplicits._ - * val p = logs.skewedLeftOuterJoin(logMetadata) - * }}} - * - * Read more about CMS: [[com.twitter.algebird.CMS]]. - * @group join - * @param hotKeyThreshold - * key with `hotKeyThreshold` values will be considered hot. Some runners have inefficient - * `GroupByKey` implementation for groups with more than 10K values. Thus it is recommended to - * set `hotKeyThreshold` to below 10K, keep upper estimation error in mind. If you sample input - * via `sampleFraction` make sure to adjust `hotKeyThreshold` accordingly. Default is 9000. - * @param eps - * One-sided error bound on the error of each point query, i.e. frequency estimate. Must lie in - * `(0, 1)`. Default is 0.001. - * @param seed - * A seed to initialize the random number generator used to create the pairwise independent hash - * functions. Default is 42. - * @param delta - * A bound on the probability that a query estimate does not lie within some small interval (an - * interval that depends on `eps`) around the truth. Must lie in `(0, 1)`. Default is 1e-10. - * @param sampleFraction - * left side sample fraction. Default is `1.0` - no sampling. - * @param withReplacement - * whether to use sampling with replacement, see - * [[SCollection.sample(withReplacement:Boolean,fraction:Double)* SCollection.sample]]. Default - * is true. - */ + @deprecated("Use skewedLeftOuterJoin with hotKeyMethod instead", since = "0.13.0") def skewedLeftOuterJoin[W]( rhs: SCollection[(K, W)], - hotKeyThreshold: Long = SkewedJoins.DefaultHotKeyThreshold, - eps: Double = SkewedJoins.DefaultCmsEpsilon, - seed: Int = SkewedJoins.DefaultCmsSeed, - delta: Double = SkewedJoins.DefaultCmsDelta, - sampleFraction: Double = SkewedJoins.DefaultSampleFraction, - withReplacement: Boolean = SkewedJoins.DefaultSampleWithReplacement + hotKeyThreshold: Long, + eps: Double, + seed: Int, + delta: Double, + sampleFraction: Double, + withReplacement: Boolean )(implicit hasher: CMSHasher[K]): SCollection[(K, (V, Option[W]))] = skewedLeftOuterJoin( rhs, @@ -472,37 +368,7 @@ class PairSkewedSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { withReplacement ) - /** - * N to 1 skew-proof flavor of [[PairSCollectionFunctions.leftOuterJoin]]. - * - * Perform a skewed left join where some keys on the left hand may be hot, i.e. appear more than - * `hotKeyThreshold` times. Frequency of a key is estimated with `1 - delta` probability, and the - * estimate is within `eps * N` of the true frequency. - * - * `true frequency <= estimate <= true frequency + eps * N` - * - * where N is the total size of the left hand side stream so far. - * - * @note - * Make sure to `import com.twitter.algebird.CMSHasherImplicits` before using this join. - * @example - * {{{ - * // Implicits that enabling CMS-hashing - * import com.twitter.algebird.CMSHasherImplicits._ - * val keyAggregator = CMS.aggregator[K](eps, delta, seed) - * val hotKeyCMS = self.keys.aggregate(keyAggregator) - * val p = logs.skewedLeftOuterJoin(logMetadata, hotKeyThreshold=8500, cms=hotKeyCMS) - * }}} - * - * Read more about CMS: [[com.twitter.algebird.CMS]]. - * @group join - * @param hotKeyThreshold - * key with `hotKeyThreshold` values will be considered hot. Some runners have inefficient - * `GroupByKey` implementation for groups with more than 10K values. Thus it is recommended to - * set `hotKeyThreshold` to below 10K, keep upper estimation error in mind. - * @param cms - * left hand side key [[com.twitter.algebird.CMS]] - */ + @deprecated("Use skewedLeftOuterJoin with hotKeyMethod instead", since = "0.13.0") def skewedLeftOuterJoin[W]( rhs: SCollection[(K, W)], hotKeyThreshold: Long, @@ -589,13 +455,13 @@ class PairSkewedSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { */ def skewedFullOuterJoin[W]( rhs: SCollection[(K, W)], - hotKeyMethod: HotKeyMethod, - hotKeyFanout: Int, - cmsEps: Double, - cmsDelta: Double, - cmsSeed: Int, - sampleFraction: Double, - sampleWithReplacement: Boolean + hotKeyMethod: HotKeyMethod = SkewedJoins.DefaultHotKeyMethod, + hotKeyFanout: Int = SkewedJoins.DefaultHotKeyFanout, + cmsEps: Double = SkewedJoins.DefaultCmsEpsilon, + cmsDelta: Double = SkewedJoins.DefaultCmsDelta, + cmsSeed: Int = SkewedJoins.DefaultCmsSeed, + sampleFraction: Double = SkewedJoins.DefaultSampleFraction, + sampleWithReplacement: Boolean = SkewedJoins.DefaultSampleWithReplacement )(implicit hasher: CMSHasher[K]): SCollection[(K, (Option[V], Option[W]))] = self.transform { lhs => import com.twitter.algebird._ @@ -616,57 +482,15 @@ class PairSkewedSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { } } - /** - * N to 1 skew-proof flavor of [[PairSCollectionFunctions.fullOuterJoin]]. - * - * Perform a skewed full join where some keys on the left hand may be hot. Frequency of a key is - * estimated with `1 - delta` probability, and the estimate is within `eps * N` of the true - * frequency. - * - * `true frequency <= estimate <= true frequency + eps * N` - * - * where N is the total size of the left hand side stream so far. - * - * @note - * Make sure to `import com.twitter.algebird.CMSHasherImplicits` before using this join. - * @example - * {{{ - * // Implicits that enabling CMS-hashing - * import com.twitter.algebird.CMSHasherImplicits._ - * val p = logs.skewedFullOuterJoin(logMetadata) - * }}} - * - * Read more about CMS: [[com.twitter.algebird.CMS]]. - * @group join - * @param hotKeyThreshold - * key with `hotKeyThreshold` values will be considered hot. Some runners have inefficient - * `GroupByKey` implementation for groups with more than 10K values. Thus it is recommended to - * set `hotKeyThreshold` to below 10K, keep upper estimation error in mind. If you sample input - * via `sampleFraction` make sure to adjust `hotKeyThreshold` accordingly. Default is 9000. - * @param eps - * One-sided error bound on the error of each point query, i.e. frequency estimate. Must lie in - * `(0, 1)`. Default is 0.001. - * @param seed - * A seed to initialize the random number generator used to create the pairwise independent hash - * functions. Default is 42. - * @param delta - * A bound on the probability that a query estimate does not lie within some small interval (an - * interval that depends on `eps`) around the truth. Must lie in `(0, 1)`. Default is 1e-10. - * @param sampleFraction - * left side sample fraction. Default is `1.0` - no sampling. - * @param withReplacement - * whether to use sampling with replacement, see - * [[SCollection.sample(withReplacement:Boolean,fraction:Double)* SCollection.sample]]. Default - * is true. - */ + @deprecated("Use skewedFullOuterJoin with hotKeyMethod instead", since = "0.13.0") def skewedFullOuterJoin[W]( rhs: SCollection[(K, W)], - hotKeyThreshold: Long = SkewedJoins.DefaultHotKeyThreshold, - eps: Double = SkewedJoins.DefaultCmsEpsilon, - seed: Int = SkewedJoins.DefaultCmsSeed, - delta: Double = SkewedJoins.DefaultCmsDelta, - sampleFraction: Double = SkewedJoins.DefaultSampleFraction, - withReplacement: Boolean = SkewedJoins.DefaultSampleWithReplacement + hotKeyThreshold: Long, + eps: Double, + seed: Int, + delta: Double, + sampleFraction: Double, + withReplacement: Boolean )(implicit hasher: CMSHasher[K]): SCollection[(K, (Option[V], Option[W]))] = skewedFullOuterJoin( rhs, @@ -679,37 +503,7 @@ class PairSkewedSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { withReplacement ) - /** - * N to 1 skew-proof flavor of [[PairSCollectionFunctions.fullOuterJoin]]. - * - * Perform a skewed full outer join where some keys on the left hand may be hot, i.e.appear more - * than`hotKeyThreshold` times. Frequency of a key is estimated with `1 - delta` probability, and - * the estimate is within `eps * N` of the true frequency. - * - * `true frequency <= estimate <= true frequency + eps * N` - * - * where N is the total size of the left hand side stream so far. - * - * @note - * Make sure to `import com.twitter.algebird.CMSHasherImplicits` before using this join. - * @example - * {{{ - * // Implicits that enabling CMS-hashing - * import com.twitter.algebird.CMSHasherImplicits._ - * val keyAggregator = CMS.aggregator[K](eps, delta, seed) - * val hotKeyCMS = self.keys.aggregate(keyAggregator) - * val p = logs.skewedFullOuterJoin(logMetadata, hotKeyThreshold=8500, cms=hotKeyCMS) - * }}} - * - * Read more about CMS: [[com.twitter.algebird.CMSMonoid]]. - * @group join - * @param hotKeyThreshold - * key with `hotKeyThreshold` values will be considered hot. Some runners have inefficient - * `GroupByKey` implementation for groups with more than 10K values. Thus it is recommended to - * set `hotKeyThreshold` to below 10K, keep upper estimation error in mind. - * @param cms - * left hand side key [[com.twitter.algebird.CMSMonoid]] - */ + @deprecated("Use skewedFullOuterJoin with hotKeyMethod instead", since = "0.13.0") def skewedFullOuterJoin[W]( rhs: SCollection[(K, W)], hotKeyThreshold: Long, diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/JoinExamples.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/JoinExamples.scala index 979ff7fd62..61bc7a1fd7 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/JoinExamples.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/JoinExamples.scala @@ -26,6 +26,7 @@ package com.spotify.scio.examples.cookbook import com.spotify.scio.bigquery._ import com.spotify.scio._ import com.spotify.scio.examples.common.ExampleData +import com.spotify.scio.values.HotKeyMethod // ## Utilities used in all examples object JoinUtil { @@ -167,12 +168,11 @@ object SkewedJoinExamples { * Internally it identifies two groups of keys: "Hot" and the rest. Hot keys are joined using * Hash Join and the rest with the regular join. There are 3 ways to identify the set of hot * keys: - * - "threshold" as a cutoff frequency; + * - "threshold" as a cutoff frequency. * - "top percentage" to specify the maximum relative part of all keys can be considered - * hot; + * hot. * - "top N" to specify the absolute number of hot keys. - */ - /** + * * Also, there are several optional parameters in different overloads that could tune the * default behavior: * - `sampleFraction` - the fraction to sample keys in LHS, can improve performance if less @@ -195,9 +195,8 @@ object SkewedJoinExamples { * - `cmsSeed` - random value generator seed for CMS. No need to specify unless you * deliberately expect some (non)deterministic results. */ - .skewedLeftOuterJoin(countryInfo, hotKeyThreshold = 100) - .map { t => - val (countryCode, (eventInfo, countryNameOpt)) = t + .skewedLeftOuterJoin(countryInfo, hotKeyMethod = HotKeyMethod.Threshold(100)) + .map { case (countryCode, (eventInfo, countryNameOpt)) => val countryName = countryNameOpt.getOrElse("none") formatOutput(countryCode, countryName, eventInfo) } From 378cd9782c400126e5fb848aca7ab8d6c6a68b6a Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Wed, 17 May 2023 13:21:57 +0200 Subject: [PATCH 2/5] Remove API --- .../PairSkewedSCollectionFunctions.scala | 159 ++++++++++-------- 1 file changed, 93 insertions(+), 66 deletions(-) diff --git a/scio-core/src/main/scala/com/spotify/scio/values/PairSkewedSCollectionFunctions.scala b/scio-core/src/main/scala/com/spotify/scio/values/PairSkewedSCollectionFunctions.scala index 0b64aa25fd..1676822385 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/PairSkewedSCollectionFunctions.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/PairSkewedSCollectionFunctions.scala @@ -211,28 +211,37 @@ class PairSkewedSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { } } - @deprecated("Use skewedJoin with hotKeyMethod instead", since = "0.13.0") - def skewedJoin[W]( - rhs: SCollection[(K, W)], - hotKeyThreshold: Long, - eps: Double, - seed: Int, - delta: Double, - sampleFraction: Double, - withReplacement: Boolean - )(implicit hasher: CMSHasher[K]): SCollection[(K, (V, W))] = - skewedJoin( - rhs, - HotKeyMethod.Threshold(hotKeyThreshold), - SkewedJoins.DefaultHotKeyFanout, - eps, - delta, - seed, - sampleFraction, - withReplacement - ) - - @deprecated("Use skewedJoin with hotKeyMethod instead", since = "0.13.0") + /** + * N to 1 skew-proof flavor of [[PairSCollectionFunctions.join]]. + * + * Perform a skewed join where some keys on the left hand may be hot, i.e. appear more than + * `hotKeyThreshold` times. Frequency of a key is estimated with `1 - delta` probability, and the + * estimate is within `eps * N` of the true frequency. + * + * `true frequency <= estimate <= true frequency + eps * N` + * + * where N is the total size of the left hand side stream so far. + * + * @note + * Make sure to `import com.twitter.algebird.CMSHasherImplicits` before using this join. + * @example + * {{{ + * // Implicits that enabling CMS-hashing + * import com.twitter.algebird.CMSHasherImplicits._ + * val keyAggregator = CMS.aggregator[K](eps, delta, seed) + * val hotKeyCMS = self.keys.aggregate(keyAggregator) + * val p = logs.skewedJoin(logMetadata, hotKeyThreshold=8500, cms=hotKeyCMS) + * }}} + * + * Read more about CMS: [[com.twitter.algebird.CMS]]. + * @group join + * @param hotKeyThreshold + * key with `hotKeyThreshold` values will be considered hot. Some runners have inefficient + * `GroupByKey` implementation for groups with more than 10K values. Thus it is recommended to + * set `hotKeyThreshold` to below 10K, keep upper estimation error in mind. + * @param cms + * left hand side key [[com.twitter.algebird.CMS]] + */ def skewedJoin[W]( rhs: SCollection[(K, W)], hotKeyThreshold: Long, @@ -347,28 +356,37 @@ class PairSkewedSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { } } - @deprecated("Use skewedLeftOuterJoin with hotKeyMethod instead", since = "0.13.0") - def skewedLeftOuterJoin[W]( - rhs: SCollection[(K, W)], - hotKeyThreshold: Long, - eps: Double, - seed: Int, - delta: Double, - sampleFraction: Double, - withReplacement: Boolean - )(implicit hasher: CMSHasher[K]): SCollection[(K, (V, Option[W]))] = - skewedLeftOuterJoin( - rhs, - HotKeyMethod.Threshold(hotKeyThreshold), - SkewedJoins.DefaultHotKeyFanout, - eps, - delta, - seed, - sampleFraction, - withReplacement - ) - - @deprecated("Use skewedLeftOuterJoin with hotKeyMethod instead", since = "0.13.0") + /** + * N to 1 skew-proof flavor of [[PairSCollectionFunctions.leftOuterJoin]]. + * + * Perform a skewed left join where some keys on the left hand may be hot, i.e. appear more than + * `hotKeyThreshold` times. Frequency of a key is estimated with `1 - delta` probability, and the + * estimate is within `eps * N` of the true frequency. + * + * `true frequency <= estimate <= true frequency + eps * N` + * + * where N is the total size of the left hand side stream so far. + * + * @note + * Make sure to `import com.twitter.algebird.CMSHasherImplicits` before using this join. + * @example + * {{{ + * // Implicits that enabling CMS-hashing + * import com.twitter.algebird.CMSHasherImplicits._ + * val keyAggregator = CMS.aggregator[K](eps, delta, seed) + * val hotKeyCMS = self.keys.aggregate(keyAggregator) + * val p = logs.skewedLeftOuterJoin(logMetadata, hotKeyThreshold=8500, cms=hotKeyCMS) + * }}} + * + * Read more about CMS: [[com.twitter.algebird.CMS]]. + * @group join + * @param hotKeyThreshold + * key with `hotKeyThreshold` values will be considered hot. Some runners have inefficient + * `GroupByKey` implementation for groups with more than 10K values. Thus it is recommended to + * set `hotKeyThreshold` to below 10K, keep upper estimation error in mind. + * @param cms + * left hand side key [[com.twitter.algebird.CMS]] + */ def skewedLeftOuterJoin[W]( rhs: SCollection[(K, W)], hotKeyThreshold: Long, @@ -482,28 +500,37 @@ class PairSkewedSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { } } - @deprecated("Use skewedFullOuterJoin with hotKeyMethod instead", since = "0.13.0") - def skewedFullOuterJoin[W]( - rhs: SCollection[(K, W)], - hotKeyThreshold: Long, - eps: Double, - seed: Int, - delta: Double, - sampleFraction: Double, - withReplacement: Boolean - )(implicit hasher: CMSHasher[K]): SCollection[(K, (Option[V], Option[W]))] = - skewedFullOuterJoin( - rhs, - HotKeyMethod.Threshold(hotKeyThreshold), - SkewedJoins.DefaultHotKeyFanout, - eps, - delta, - seed, - sampleFraction, - withReplacement - ) - - @deprecated("Use skewedFullOuterJoin with hotKeyMethod instead", since = "0.13.0") + /** + * N to 1 skew-proof flavor of [[PairSCollectionFunctions.fullOuterJoin]]. + * + * Perform a skewed full outer join where some keys on the left hand may be hot, i.e.appear more + * than`hotKeyThreshold` times. Frequency of a key is estimated with `1 - delta` probability, and + * the estimate is within `eps * N` of the true frequency. + * + * `true frequency <= estimate <= true frequency + eps * N` + * + * where N is the total size of the left hand side stream so far. + * + * @note + * Make sure to `import com.twitter.algebird.CMSHasherImplicits` before using this join. + * @example + * {{{ + * // Implicits that enabling CMS-hashing + * import com.twitter.algebird.CMSHasherImplicits._ + * val keyAggregator = CMS.aggregator[K](eps, delta, seed) + * val hotKeyCMS = self.keys.aggregate(keyAggregator) + * val p = logs.skewedFullOuterJoin(logMetadata, hotKeyThreshold=8500, cms=hotKeyCMS) + * }}} + * + * Read more about CMS: [[com.twitter.algebird.CMSMonoid]]. + * @group join + * @param hotKeyThreshold + * key with `hotKeyThreshold` values will be considered hot. Some runners have inefficient + * `GroupByKey` implementation for groups with more than 10K values. Thus it is recommended to + * set `hotKeyThreshold` to below 10K, keep upper estimation error in mind. + * @param cms + * left hand side key [[com.twitter.algebird.CMSMonoid]] + */ def skewedFullOuterJoin[W]( rhs: SCollection[(K, W)], hotKeyThreshold: Long, From 15273e7432ce58db021b555abcc1ee4789ac5d49 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Wed, 31 May 2023 15:18:43 +0200 Subject: [PATCH 3/5] Add scalafix rule for skewedJoin API migration --- scalafix/build.sbt | 18 ++++- .../src/main/scala/fix/FixSkewedJoins.scala | 48 +++++++++++ .../src/main/scala/fix/FixSkewedJoins.scala | 22 ++++++ scalafix/project/Scio.scala | 3 +- .../META-INF/services/scalafix.v1.Rule | 3 +- .../src/main/scala/fix/FixSkewedJoins.scala | 79 +++++++++++++++++++ 6 files changed, 169 insertions(+), 4 deletions(-) create mode 100644 scalafix/input-0_13/src/main/scala/fix/FixSkewedJoins.scala create mode 100644 scalafix/output-0_13/src/main/scala/fix/FixSkewedJoins.scala create mode 100644 scalafix/rules/src/main/scala/fix/FixSkewedJoins.scala diff --git a/scalafix/build.sbt b/scalafix/build.sbt index 42245970cc..3a5116efc4 100644 --- a/scalafix/build.sbt +++ b/scalafix/build.sbt @@ -88,6 +88,16 @@ lazy val `output-0_12` = project libraryDependencies ++= scio(Scio.`0.12`) ) +//lazy val `input-0_13` = project +// .settings( +// libraryDependencies ++= scio(Scio.`0.12`) +// ) +// +//lazy val `output-0_13` = project +// .settings( +// libraryDependencies ++= scio(Scio.`0.13`) +// ) + lazy val tests = project .enablePlugins(ScalafixTestkitPlugin) .dependsOn(rules) @@ -98,7 +108,8 @@ lazy val tests = project `input-0_7` / Compile / compile, `input-0_8` / Compile / compile, `input-0_10` / Compile / compile, - `input-0_12` / Compile / compile + `input-0_12` / Compile / compile, + // `input-0_13` / Compile / compile ) .value, scalafixTestkitOutputSourceDirectories := @@ -106,14 +117,17 @@ lazy val tests = project (`output-0_8` / Compile / sourceDirectories).value ++ (`output-0_10` / Compile / sourceDirectories).value ++ (`output-0_12` / Compile / sourceDirectories).value, + // (`output-0_13` / Compile / sourceDirectories).value, scalafixTestkitInputSourceDirectories := (`input-0_7` / Compile / sourceDirectories).value ++ (`input-0_8` / Compile / sourceDirectories).value ++ (`input-0_10` / Compile / sourceDirectories).value ++ (`input-0_12` / Compile / sourceDirectories).value, + // (`input-0_13` / Compile / sourceDirectories).value, scalafixTestkitInputClasspath := (`input-0_7` / Compile / fullClasspath).value ++ (`input-0_8` / Compile / fullClasspath).value ++ (`input-0_10` / Compile / fullClasspath).value ++ - (`input-0_12` / Compile / fullClasspath).value + (`input-0_12` / Compile / fullClasspath).value // ++ + // (`input-0_13` / Compile / fullClasspath).value ) diff --git a/scalafix/input-0_13/src/main/scala/fix/FixSkewedJoins.scala b/scalafix/input-0_13/src/main/scala/fix/FixSkewedJoins.scala new file mode 100644 index 0000000000..7ab41ca7a5 --- /dev/null +++ b/scalafix/input-0_13/src/main/scala/fix/FixSkewedJoins.scala @@ -0,0 +1,48 @@ +/* +rule = FixSkewedJoins + */ +package fix +package v0_13_0 + +import com.spotify.scio.values.SCollection + +object FixSkewedJoins { + + // pair SCollection + val scoll: SCollection[(String, Int)] = ??? + val rhs: SCollection[(String, String)] = ??? + + // unamed parameters + scoll.skewedJoin(rhs, 9000, 0.001, 42, 1e-10, 1.0, true) + scoll.skewedLeftOuterJoin(rhs, 9000, 0.001, 42, 1e-10, 1.0, true) + scoll.skewedFullOuterJoin(rhs, 9000, 0.001, 42, 1e-10, 1.0, true) + + // named parameters + scoll.skewedJoin( + rhs = rhs, + hotKeyThreshold = 9000, + eps = 0.001, + seed = 42, + delta = 1e-10, + sampleFraction = 1.0, + withReplacement = true + ) + scoll.skewedLeftOuterJoin( + rhs = rhs, + hotKeyThreshold = 9000, + eps = 0.001, + seed = 42, + delta = 1e-10, + sampleFraction = 1.0, + withReplacement = true + ) + scoll.skewedFullOuterJoin( + rhs = rhs, + hotKeyThreshold = 9000, + eps = 0.001, + seed = 42, + delta = 1e-10, + sampleFraction = 1.0, + withReplacement = true + ) +} \ No newline at end of file diff --git a/scalafix/output-0_13/src/main/scala/fix/FixSkewedJoins.scala b/scalafix/output-0_13/src/main/scala/fix/FixSkewedJoins.scala new file mode 100644 index 0000000000..d55a5d8880 --- /dev/null +++ b/scalafix/output-0_13/src/main/scala/fix/FixSkewedJoins.scala @@ -0,0 +1,22 @@ +package fix +package v0_13_0 + +import com.spotify.scio.values.SCollection +import com.spotify.scio.values.HotKeyMethod + +object FixSkewedJoins { + + // pair SCollection + val scoll: SCollection[(String, Int)] = ??? + val rhs: SCollection[(String, String)] = ??? + + // unamed parameters + scoll.skewedJoin(rhs = rhs, hotKeyMethod = HotKeyMethod.Threshold(9000), cmsEps = 0.001d, cmsDelta = 1E-10d, cmsSeed = 42, sampleFraction = 1.0d, sampleWithReplacement = true) + scoll.skewedLeftOuterJoin(rhs = rhs, hotKeyMethod = HotKeyMethod.Threshold(9000), cmsEps = 0.001d, cmsDelta = 1E-10d, cmsSeed = 42, sampleFraction = 1.0d, sampleWithReplacement = true) + scoll.skewedFullOuterJoin(rhs = rhs, hotKeyMethod = HotKeyMethod.Threshold(9000), cmsEps = 0.001d, cmsDelta = 1E-10d, cmsSeed = 42, sampleFraction = 1.0d, sampleWithReplacement = true) + + // named parameters + scoll.skewedJoin(rhs = rhs, hotKeyMethod = HotKeyMethod.Threshold(9000), cmsEps = 0.001d, cmsDelta = 1E-10d, cmsSeed = 42, sampleFraction = 1.0d, sampleWithReplacement = true) + scoll.skewedLeftOuterJoin(rhs = rhs, hotKeyMethod = HotKeyMethod.Threshold(9000), cmsEps = 0.001d, cmsDelta = 1E-10d, cmsSeed = 42, sampleFraction = 1.0d, sampleWithReplacement = true) + scoll.skewedFullOuterJoin(rhs = rhs, hotKeyMethod = HotKeyMethod.Threshold(9000), cmsEps = 0.001d, cmsDelta = 1E-10d, cmsSeed = 42, sampleFraction = 1.0d, sampleWithReplacement = true) +} \ No newline at end of file diff --git a/scalafix/project/Scio.scala b/scalafix/project/Scio.scala index 7bbde17908..ec39837fe7 100644 --- a/scalafix/project/Scio.scala +++ b/scalafix/project/Scio.scala @@ -5,5 +5,6 @@ object Scio { val `0.9` = "0.9.5" val `0.10` = "0.10.4" val `0.11` = "0.11.9" - val `0.12` = "0.12.0-RC1" + val `0.12` = "0.12.0" + val `0.13` = "0.12.8+42-378cd978+20230531-1404-SNAPSHOT" // TODO } diff --git a/scalafix/rules/src/main/resources/META-INF/services/scalafix.v1.Rule b/scalafix/rules/src/main/resources/META-INF/services/scalafix.v1.Rule index 5ce91590c9..3a9fefb060 100644 --- a/scalafix/rules/src/main/resources/META-INF/services/scalafix.v1.Rule +++ b/scalafix/rules/src/main/resources/META-INF/services/scalafix.v1.Rule @@ -11,4 +11,5 @@ fix.v0_8_0.FixBigQueryDeprecations fix.v0_8_0.ConsistenceJoinNames fix.v0_10_0.FixCoderPropagation fix.v0_12_0.FixBqSaveAsTable -fix.v0_12_0.FixPubsubSpecializations \ No newline at end of file +fix.v0_12_0.FixPubsubSpecializations +fix.v0_13_0.FixSkewedJoins \ No newline at end of file diff --git a/scalafix/rules/src/main/scala/fix/FixSkewedJoins.scala b/scalafix/rules/src/main/scala/fix/FixSkewedJoins.scala new file mode 100644 index 0000000000..075e1d3814 --- /dev/null +++ b/scalafix/rules/src/main/scala/fix/FixSkewedJoins.scala @@ -0,0 +1,79 @@ +package fix +package v0_13_0 + +import scalafix.v1.{MethodSignature, _} + +import scala.meta._ +import scala.meta.contrib._ + +object FixSkewedJoins { + + private val PairSkewedSCollectionFunctions = "com/spotify/scio/values/PairSkewedSCollectionFunctions" + + val SkewedJoins: SymbolMatcher = + SymbolMatcher.normalized(PairSkewedSCollectionFunctions + "#skewedJoin") + + SymbolMatcher.normalized(PairSkewedSCollectionFunctions + "#skewedLeftOuterJoin") + + SymbolMatcher.normalized(PairSkewedSCollectionFunctions + "#skewedFullOuterJoin") + + val HotKeyMethodImport = importer"com.spotify.scio.values.HotKeyMethod" + + val ParamRhs = Term.Name("rhs") + val ParamHotKeyThreshold = Term.Name("hotKeyThreshold") + val ParamEps = Term.Name("eps") + val ParamSeed = Term.Name("seed") + val ParamDelta = Term.Name("delta") + val ParamSampleFraction = Term.Name("sampleFraction") + val ParamWithReplacement = Term.Name("withReplacement") + + val OldParameters = List( + ParamRhs.value, + ParamHotKeyThreshold.value, + ParamEps.value, + ParamSeed.value, + ParamDelta.value, + ParamSampleFraction.value, + ParamWithReplacement.value, + "hasher" + ) + +} + +class FixSkewedJoins extends SemanticRule("FixSkewedJoins") { + + import FixSkewedJoins._ + + private def isOldSkewedJoinApi(fn: Term)(implicit doc: SemanticDocument): Boolean = { + val symbol = fn.symbol + SkewedJoins.matches(symbol) && (symbol.info.get.signature match { + case MethodSignature(_, parameterLists, _) => parameterLists.flatten.map(_.symbol.displayName) == OldParameters + case _ => false + }) + } + + + private def findParam(param: Term.Name, pos: Int)(args: List[Term]): Option[Term] = { + args + .collectFirst { + case q"$name = $value" if name.isEqual(param) => value + } + .orElse { + args.takeWhile(!_.isInstanceOf[Term.Assign]).lift(pos) + } + } + + override def fix(implicit doc: SemanticDocument): Patch = { + doc.tree.collect { + case t @ q"$fn(..$params)" if isOldSkewedJoinApi(fn) => + val rhs = findParam(ParamRhs, 0)(params).map(p => q"rhs = $p") + val hotKeyMethod = findParam(ParamHotKeyThreshold, 1)(params).map(p => q"hotKeyMethod = HotKeyMethod.Threshold($p)") + val cmsEps = findParam(ParamEps, 2)(params).map(p => q"cmsEps = $p") + val cmsDelta = findParam(ParamDelta, 4)(params).map(p => q"cmsDelta = $p") + val cmsSeed = findParam(ParamSeed, 3)(params).map(p => q"cmsSeed = $p") + val sampleFraction = findParam(ParamSampleFraction, 5)(params).map(p => q"sampleFraction = $p") + val sampleWithReplacement = findParam(ParamWithReplacement, 6)(params).map(p => q"sampleWithReplacement = $p") + + val updated = (rhs ++ hotKeyMethod ++ cmsEps ++ cmsDelta ++ cmsSeed ++ sampleFraction ++ sampleWithReplacement).toList + Patch.addGlobalImport(HotKeyMethodImport) + Patch.replaceTree(t, q"$fn(..$updated)".syntax) + }.asPatch + } +} From 30ae6f076bdfe89ed4039caf7a792be998271727 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Mon, 5 Jun 2023 11:02:17 +0200 Subject: [PATCH 4/5] Respect scalafix packaging --- .../src/main/scala/fix/{ => v0_13_0}/FixSkewedJoins.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) rename scalafix/rules/src/main/scala/fix/{ => v0_13_0}/FixSkewedJoins.scala (99%) diff --git a/scalafix/rules/src/main/scala/fix/FixSkewedJoins.scala b/scalafix/rules/src/main/scala/fix/v0_13_0/FixSkewedJoins.scala similarity index 99% rename from scalafix/rules/src/main/scala/fix/FixSkewedJoins.scala rename to scalafix/rules/src/main/scala/fix/v0_13_0/FixSkewedJoins.scala index 075e1d3814..bb807ecdc2 100644 --- a/scalafix/rules/src/main/scala/fix/FixSkewedJoins.scala +++ b/scalafix/rules/src/main/scala/fix/v0_13_0/FixSkewedJoins.scala @@ -1,5 +1,4 @@ -package fix -package v0_13_0 +package fix.v0_13_0 import scalafix.v1.{MethodSignature, _} From c5a827837219ac2ae8398678bfc8d72eb4eca5a6 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Mon, 5 Jun 2023 15:18:28 +0200 Subject: [PATCH 5/5] Move scalafix packages --- .../src/main/scala/fix/{ => v0_13_0}/FixSkewedJoins.scala | 3 +-- .../src/main/scala/fix/{ => v0_13_0}/FixSkewedJoins.scala | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) rename scalafix/input-0_13/src/main/scala/fix/{ => v0_13_0}/FixSkewedJoins.scala (97%) rename scalafix/output-0_13/src/main/scala/fix/{ => v0_13_0}/FixSkewedJoins.scala (98%) diff --git a/scalafix/input-0_13/src/main/scala/fix/FixSkewedJoins.scala b/scalafix/input-0_13/src/main/scala/fix/v0_13_0/FixSkewedJoins.scala similarity index 97% rename from scalafix/input-0_13/src/main/scala/fix/FixSkewedJoins.scala rename to scalafix/input-0_13/src/main/scala/fix/v0_13_0/FixSkewedJoins.scala index 7ab41ca7a5..1ef98f22bb 100644 --- a/scalafix/input-0_13/src/main/scala/fix/FixSkewedJoins.scala +++ b/scalafix/input-0_13/src/main/scala/fix/v0_13_0/FixSkewedJoins.scala @@ -1,8 +1,7 @@ /* rule = FixSkewedJoins */ -package fix -package v0_13_0 +package fix.v0_13_0 import com.spotify.scio.values.SCollection diff --git a/scalafix/output-0_13/src/main/scala/fix/FixSkewedJoins.scala b/scalafix/output-0_13/src/main/scala/fix/v0_13_0/FixSkewedJoins.scala similarity index 98% rename from scalafix/output-0_13/src/main/scala/fix/FixSkewedJoins.scala rename to scalafix/output-0_13/src/main/scala/fix/v0_13_0/FixSkewedJoins.scala index d55a5d8880..9e6ef8208a 100644 --- a/scalafix/output-0_13/src/main/scala/fix/FixSkewedJoins.scala +++ b/scalafix/output-0_13/src/main/scala/fix/v0_13_0/FixSkewedJoins.scala @@ -1,5 +1,4 @@ -package fix -package v0_13_0 +package fix.v0_13_0 import com.spotify.scio.values.SCollection import com.spotify.scio.values.HotKeyMethod