From bf69c26c5491b08ece3cf70e1436dac3e22ff8e2 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Tue, 16 May 2023 16:03:23 +0200 Subject: [PATCH] Deprecate old skewed-join API --- .../PairSkewedSCollectionFunctions.scala | 316 +++--------------- 1 file changed, 55 insertions(+), 261 deletions(-) diff --git a/scio-core/src/main/scala/com/spotify/scio/values/PairSkewedSCollectionFunctions.scala b/scio-core/src/main/scala/com/spotify/scio/values/PairSkewedSCollectionFunctions.scala index 23612c80da..1daeb44615 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/PairSkewedSCollectionFunctions.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/PairSkewedSCollectionFunctions.scala @@ -59,14 +59,26 @@ object HotKeyMethod { object SkewedJoins { // some sensible defaults for skewed joins - val DefaultHotKeyThreshold: Int = 9000 - val DefaultHotKeyMethod: HotKeyMethod.Threshold = HotKeyMethod.Threshold(DefaultHotKeyThreshold) + /** Default is 9000 occurrence threshold. */ + val DefaultHotKeyMethod: HotKeyMethod.Threshold = HotKeyMethod.Threshold(9000) + + /** Default is 1 - no fanout. */ val DefaultHotKeyFanout: Int = 1 + + /** Default is 0.001. */ val DefaultCmsEpsilon: Double = 0.001 + + /** Default is 1e-10. */ val DefaultCmsDelta: Double = 1e-10 + + /** Default is 42. */ val DefaultCmsSeed: Int = 42 + + /** Default is `1.0` - no sampling. */ val DefaultSampleFraction: Double = 1.0 - val DefaultSampleWithReplacement: Boolean = true + + /** Default is false - Bernoulli sampling (without replacement). */ + val DefaultSampleWithReplacement: Boolean = false private[scio] def union[T](hot: SCollection[T], chill: SCollection[T]): SCollection[T] = hot.withName("Union hot and chill join results").union(chill) @@ -153,8 +165,7 @@ class PairSkewedSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { * Read more about CMS: [[com.twitter.algebird.CMS]]. * @group join * @param hotKeyMethod - * Method used to compute hot-keys from the left side collection. 
Default is 9000 occurrence - * threshold. + * Method used to compute hot-keys from the left side collection. * @param hotKeyFanout * The number of intermediate keys that will be used during the CMS computation. * @param cmsEps @@ -174,13 +185,13 @@ class PairSkewedSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { */ def skewedJoin[W]( rhs: SCollection[(K, W)], - hotKeyMethod: HotKeyMethod, - hotKeyFanout: Int, - cmsEps: Double, - cmsDelta: Double, - cmsSeed: Int, - sampleFraction: Double, - sampleWithReplacement: Boolean + hotKeyMethod: HotKeyMethod = SkewedJoins.DefaultHotKeyMethod, + hotKeyFanout: Int = SkewedJoins.DefaultHotKeyFanout, + cmsEps: Double = SkewedJoins.DefaultCmsEpsilon, + cmsDelta: Double = SkewedJoins.DefaultCmsDelta, + cmsSeed: Int = SkewedJoins.DefaultCmsSeed, + sampleFraction: Double = SkewedJoins.DefaultSampleFraction, + sampleWithReplacement: Boolean = SkewedJoins.DefaultSampleWithReplacement )(implicit hasher: CMSHasher[K]): SCollection[(K, (V, W))] = self.transform { lhs => val lhsKeys = LargeLeftSide.sampleKeys(lhs, sampleFraction, sampleWithReplacement) import com.twitter.algebird._ @@ -200,57 +211,15 @@ class PairSkewedSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { } } - /** - * N to 1 skew-proof flavor of [[PairSCollectionFunctions.join]]. - * - * Perform a skewed full join where some keys on the left hand may be hot. Frequency of a key is - * estimated with `1 - delta` probability, and the estimate is within `eps * N` of the true - * frequency. - * - * `true frequency <= estimate <= true frequency + eps * N` - * - * where N is the total size of the left hand side stream so far. - * - * @note - * Make sure to `import com.twitter.algebird.CMSHasherImplicits` before using this join. - * @example - * {{{ - * // Implicits that enabling CMS-hashing - * import com.twitter.algebird.CMSHasherImplicits._ - * val p = logs.skewedJoin(logMetadata) - * }}} - * - * Read more about CMS: [[com.twitter.algebird.CMS]]. 
- * @group join - * @param hotKeyThreshold - * key with `hotKeyThreshold` values will be considered hot. Some runners have inefficient - * `GroupByKey` implementation for groups with more than 10K values. Thus it is recommended to - * set `hotKeyThreshold` to below 10K, keep upper estimation error in mind. If you sample input - * via `sampleFraction` make sure to adjust `hotKeyThreshold` accordingly. Default is 9000. - * @param eps - * One-sided error bound on the error of each point query, i.e. frequency estimate. Must lie in - * `(0, 1)`. Default is 0.001. - * @param seed - * A seed to initialize the random number generator used to create the pairwise independent hash - * functions. Default is 42. - * @param delta - * A bound on the probability that a query estimate does not lie within some small interval (an - * interval that depends on `eps`) around the truth. Must lie in `(0, 1)`. Default is 1e-10. - * @param sampleFraction - * left side sample fraction. Default is `1.0` - no sampling. - * @param withReplacement - * whether to use sampling with replacement, see - * [[SCollection.sample(withReplacement:Boolean,fraction:Double)* SCollection.sample]]. Default - * is true. 
- */ + @deprecated("Use skewedJoin with hotKeyMethod instead", since = "0.13.0") def skewedJoin[W]( rhs: SCollection[(K, W)], - hotKeyThreshold: Long = SkewedJoins.DefaultHotKeyThreshold, - eps: Double = SkewedJoins.DefaultCmsEpsilon, - seed: Int = SkewedJoins.DefaultCmsSeed, - delta: Double = SkewedJoins.DefaultCmsDelta, - sampleFraction: Double = SkewedJoins.DefaultSampleFraction, - withReplacement: Boolean = SkewedJoins.DefaultSampleWithReplacement + hotKeyThreshold: Long, + eps: Double, + seed: Int, + delta: Double, + sampleFraction: Double, + withReplacement: Boolean )(implicit hasher: CMSHasher[K]): SCollection[(K, (V, W))] = skewedJoin( rhs, @@ -263,37 +232,7 @@ class PairSkewedSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { withReplacement ) - /** - * N to 1 skew-proof flavor of [[PairSCollectionFunctions.join]]. - * - * Perform a skewed join where some keys on the left hand may be hot, i.e. appear more than - * `hotKeyThreshold` times. Frequency of a key is estimated with `1 - delta` probability, and the - * estimate is within `eps * N` of the true frequency. - * - * `true frequency <= estimate <= true frequency + eps * N` - * - * where N is the total size of the left hand side stream so far. - * - * @note - * Make sure to `import com.twitter.algebird.CMSHasherImplicits` before using this join. - * @example - * {{{ - * // Implicits that enabling CMS-hashing - * import com.twitter.algebird.CMSHasherImplicits._ - * val keyAggregator = CMS.aggregator[K](eps, delta, seed) - * val hotKeyCMS = self.keys.aggregate(keyAggregator) - * val p = logs.skewedJoin(logMetadata, hotKeyThreshold=8500, cms=hotKeyCMS) - * }}} - * - * Read more about CMS: [[com.twitter.algebird.CMS]]. - * @group join - * @param hotKeyThreshold - * key with `hotKeyThreshold` values will be considered hot. Some runners have inefficient - * `GroupByKey` implementation for groups with more than 10K values. 
Thus it is recommended to - * set `hotKeyThreshold` to below 10K, keep upper estimation error in mind. - * @param cms - * left hand side key [[com.twitter.algebird.CMS]] - */ + @deprecated("Use skewedJoin with hotKeyMethod instead", since = "0.13.0") def skewedJoin[W]( rhs: SCollection[(K, W)], hotKeyThreshold: Long, @@ -362,8 +301,7 @@ class PairSkewedSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { * Read more about CMS: [[com.twitter.algebird.CMS]]. * @group join * @param hotKeyMethod - * Method used to compute hot-keys from the left side collection. Default is 9000 occurrence - * threshold. + * Method used to compute hot-keys from the left side collection. * @param hotKeyFanout * The number of intermediate keys that will be used during the CMS computation. * @param cmsEps @@ -409,57 +347,15 @@ class PairSkewedSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { } } - /** - * N to 1 skew-proof flavor of [[PairSCollectionFunctions.leftOuterJoin]]. - * - * Perform a skewed full join where some keys on the left hand may be hot. Frequency of a key is - * estimated with `1 - delta` probability, and the estimate is within `eps * N` of the true - * frequency. - * - * `true frequency <= estimate <= true frequency + eps * N` - * - * where N is the total size of the left hand side stream so far. - * - * @note - * Make sure to `import com.twitter.algebird.CMSHasherImplicits` before using this join. - * @example - * {{{ - * // Implicits that enabling CMS-hashing - * import com.twitter.algebird.CMSHasherImplicits._ - * val p = logs.skewedLeftOuterJoin(logMetadata) - * }}} - * - * Read more about CMS: [[com.twitter.algebird.CMS]]. - * @group join - * @param hotKeyThreshold - * key with `hotKeyThreshold` values will be considered hot. Some runners have inefficient - * `GroupByKey` implementation for groups with more than 10K values. Thus it is recommended to - * set `hotKeyThreshold` to below 10K, keep upper estimation error in mind. 
If you sample input - * via `sampleFraction` make sure to adjust `hotKeyThreshold` accordingly. Default is 9000. - * @param eps - * One-sided error bound on the error of each point query, i.e. frequency estimate. Must lie in - * `(0, 1)`. Default is 0.001. - * @param seed - * A seed to initialize the random number generator used to create the pairwise independent hash - * functions. Default is 42. - * @param delta - * A bound on the probability that a query estimate does not lie within some small interval (an - * interval that depends on `eps`) around the truth. Must lie in `(0, 1)`. Default is 1e-10. - * @param sampleFraction - * left side sample fraction. Default is `1.0` - no sampling. - * @param withReplacement - * whether to use sampling with replacement, see - * [[SCollection.sample(withReplacement:Boolean,fraction:Double)* SCollection.sample]]. Default - * is true. - */ + @deprecated("Use skewedLeftOuterJoin with hotKeyMethod instead", since = "0.13.0") def skewedLeftOuterJoin[W]( rhs: SCollection[(K, W)], - hotKeyThreshold: Long = SkewedJoins.DefaultHotKeyThreshold, - eps: Double = SkewedJoins.DefaultCmsEpsilon, - seed: Int = SkewedJoins.DefaultCmsSeed, - delta: Double = SkewedJoins.DefaultCmsDelta, - sampleFraction: Double = SkewedJoins.DefaultSampleFraction, - withReplacement: Boolean = SkewedJoins.DefaultSampleWithReplacement + hotKeyThreshold: Long, + eps: Double, + seed: Int, + delta: Double, + sampleFraction: Double, + withReplacement: Boolean )(implicit hasher: CMSHasher[K]): SCollection[(K, (V, Option[W]))] = skewedLeftOuterJoin( rhs, @@ -472,37 +368,7 @@ class PairSkewedSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { withReplacement ) - /** - * N to 1 skew-proof flavor of [[PairSCollectionFunctions.leftOuterJoin]]. - * - * Perform a skewed left join where some keys on the left hand may be hot, i.e. appear more than - * `hotKeyThreshold` times. 
Frequency of a key is estimated with `1 - delta` probability, and the - * estimate is within `eps * N` of the true frequency. - * - * `true frequency <= estimate <= true frequency + eps * N` - * - * where N is the total size of the left hand side stream so far. - * - * @note - * Make sure to `import com.twitter.algebird.CMSHasherImplicits` before using this join. - * @example - * {{{ - * // Implicits that enabling CMS-hashing - * import com.twitter.algebird.CMSHasherImplicits._ - * val keyAggregator = CMS.aggregator[K](eps, delta, seed) - * val hotKeyCMS = self.keys.aggregate(keyAggregator) - * val p = logs.skewedLeftOuterJoin(logMetadata, hotKeyThreshold=8500, cms=hotKeyCMS) - * }}} - * - * Read more about CMS: [[com.twitter.algebird.CMS]]. - * @group join - * @param hotKeyThreshold - * key with `hotKeyThreshold` values will be considered hot. Some runners have inefficient - * `GroupByKey` implementation for groups with more than 10K values. Thus it is recommended to - * set `hotKeyThreshold` to below 10K, keep upper estimation error in mind. 
- * @param cms - * left hand side key [[com.twitter.algebird.CMS]] - */ + @deprecated("Use skewedLeftOuterJoin with hotKeyMethod instead", since = "0.13.0") def skewedLeftOuterJoin[W]( rhs: SCollection[(K, W)], hotKeyThreshold: Long, @@ -589,13 +455,13 @@ class PairSkewedSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { */ def skewedFullOuterJoin[W]( rhs: SCollection[(K, W)], - hotKeyMethod: HotKeyMethod, - hotKeyFanout: Int, - cmsEps: Double, - cmsDelta: Double, - cmsSeed: Int, - sampleFraction: Double, - sampleWithReplacement: Boolean + hotKeyMethod: HotKeyMethod = SkewedJoins.DefaultHotKeyMethod, + hotKeyFanout: Int = SkewedJoins.DefaultHotKeyFanout, + cmsEps: Double = SkewedJoins.DefaultCmsEpsilon, + cmsDelta: Double = SkewedJoins.DefaultCmsDelta, + cmsSeed: Int = SkewedJoins.DefaultCmsSeed, + sampleFraction: Double = SkewedJoins.DefaultSampleFraction, + sampleWithReplacement: Boolean = SkewedJoins.DefaultSampleWithReplacement )(implicit hasher: CMSHasher[K]): SCollection[(K, (Option[V], Option[W]))] = self.transform { lhs => import com.twitter.algebird._ @@ -616,57 +482,15 @@ class PairSkewedSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { } } - /** - * N to 1 skew-proof flavor of [[PairSCollectionFunctions.fullOuterJoin]]. - * - * Perform a skewed full join where some keys on the left hand may be hot. Frequency of a key is - * estimated with `1 - delta` probability, and the estimate is within `eps * N` of the true - * frequency. - * - * `true frequency <= estimate <= true frequency + eps * N` - * - * where N is the total size of the left hand side stream so far. - * - * @note - * Make sure to `import com.twitter.algebird.CMSHasherImplicits` before using this join. - * @example - * {{{ - * // Implicits that enabling CMS-hashing - * import com.twitter.algebird.CMSHasherImplicits._ - * val p = logs.skewedFullOuterJoin(logMetadata) - * }}} - * - * Read more about CMS: [[com.twitter.algebird.CMS]]. 
- * @group join - * @param hotKeyThreshold - * key with `hotKeyThreshold` values will be considered hot. Some runners have inefficient - * `GroupByKey` implementation for groups with more than 10K values. Thus it is recommended to - * set `hotKeyThreshold` to below 10K, keep upper estimation error in mind. If you sample input - * via `sampleFraction` make sure to adjust `hotKeyThreshold` accordingly. Default is 9000. - * @param eps - * One-sided error bound on the error of each point query, i.e. frequency estimate. Must lie in - * `(0, 1)`. Default is 0.001. - * @param seed - * A seed to initialize the random number generator used to create the pairwise independent hash - * functions. Default is 42. - * @param delta - * A bound on the probability that a query estimate does not lie within some small interval (an - * interval that depends on `eps`) around the truth. Must lie in `(0, 1)`. Default is 1e-10. - * @param sampleFraction - * left side sample fraction. Default is `1.0` - no sampling. - * @param withReplacement - * whether to use sampling with replacement, see - * [[SCollection.sample(withReplacement:Boolean,fraction:Double)* SCollection.sample]]. Default - * is true. 
- */ + @deprecated("Use skewedFullOuterJoin with hotKeyMethod instead", since = "0.13.0") def skewedFullOuterJoin[W]( rhs: SCollection[(K, W)], - hotKeyThreshold: Long = SkewedJoins.DefaultHotKeyThreshold, - eps: Double = SkewedJoins.DefaultCmsEpsilon, - seed: Int = SkewedJoins.DefaultCmsSeed, - delta: Double = SkewedJoins.DefaultCmsDelta, - sampleFraction: Double = SkewedJoins.DefaultSampleFraction, - withReplacement: Boolean = SkewedJoins.DefaultSampleWithReplacement + hotKeyThreshold: Long, + eps: Double, + seed: Int, + delta: Double, + sampleFraction: Double, + withReplacement: Boolean )(implicit hasher: CMSHasher[K]): SCollection[(K, (Option[V], Option[W]))] = skewedFullOuterJoin( rhs, @@ -679,37 +503,7 @@ class PairSkewedSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { withReplacement ) - /** - * N to 1 skew-proof flavor of [[PairSCollectionFunctions.fullOuterJoin]]. - * - * Perform a skewed full outer join where some keys on the left hand may be hot, i.e.appear more - * than`hotKeyThreshold` times. Frequency of a key is estimated with `1 - delta` probability, and - * the estimate is within `eps * N` of the true frequency. - * - * `true frequency <= estimate <= true frequency + eps * N` - * - * where N is the total size of the left hand side stream so far. - * - * @note - * Make sure to `import com.twitter.algebird.CMSHasherImplicits` before using this join. - * @example - * {{{ - * // Implicits that enabling CMS-hashing - * import com.twitter.algebird.CMSHasherImplicits._ - * val keyAggregator = CMS.aggregator[K](eps, delta, seed) - * val hotKeyCMS = self.keys.aggregate(keyAggregator) - * val p = logs.skewedFullOuterJoin(logMetadata, hotKeyThreshold=8500, cms=hotKeyCMS) - * }}} - * - * Read more about CMS: [[com.twitter.algebird.CMSMonoid]]. - * @group join - * @param hotKeyThreshold - * key with `hotKeyThreshold` values will be considered hot. Some runners have inefficient - * `GroupByKey` implementation for groups with more than 10K values. 
Thus it is recommended to - * set `hotKeyThreshold` to below 10K, keep upper estimation error in mind. - * @param cms - * left hand side key [[com.twitter.algebird.CMSMonoid]] - */ + @deprecated("Use skewedFullOuterJoin with hotKeyMethod instead", since = "0.13.0") def skewedFullOuterJoin[W]( rhs: SCollection[(K, W)], hotKeyThreshold: Long,