diff --git a/scio-core/src/main/scala/com/spotify/scio/values/PairSCollectionFunctions.scala b/scio-core/src/main/scala/com/spotify/scio/values/PairSCollectionFunctions.scala index 92add90305..c5c32edf0a 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/PairSCollectionFunctions.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/PairSCollectionFunctions.scala @@ -35,6 +35,8 @@ import org.apache.beam.sdk.values.{KV, PCollection} import org.joda.time.{Duration, Instant} import org.slf4j.LoggerFactory +import java.lang.{Double => JDouble} + import scala.collection.compat._ private object PairSCollectionFunctions { @@ -1010,16 +1012,6 @@ class PairSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { def mapValues[U: Coder](f: V => U): SCollection[(K, U)] = self.map(kv => (kv._1, f(kv._2))) - /** - * Return the max of values for each key as defined by the implicit `Ordering[T]`. - * @return - * a new SCollection of (key, maximum value) pairs - * @group per_key - */ - // Scala lambda is simpler and more powerful than transforms.Max - def maxByKey(implicit ord: Ordering[V]): SCollection[(K, V)] = - this.reduceByKey(ord.max) - /** * Return the min of values for each key as defined by the implicit `Ordering[T]`. * @return @@ -1030,6 +1022,16 @@ class PairSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { def minByKey(implicit ord: Ordering[V]): SCollection[(K, V)] = this.reduceByKey(ord.min) + /** + * Return the max of values for each key as defined by the implicit `Ordering[T]`. + * @return + * a new SCollection of (key, maximum value) pairs + * @group per_key + */ + // Scala lambda is simpler and more powerful than transforms.Max + def maxByKey(implicit ord: Ordering[V]): SCollection[(K, V)] = + this.reduceByKey(ord.max) + /** * Return latest of values for each key according to its event time, or null if there are no * elements. @@ -1040,6 +1042,30 @@ class PairSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { def latestByKey: SCollection[(K, V)] = self.applyPerKey(Latest.perKey[K, V]())(kvToTuple) + /** + * Reduce by key with [[com.twitter.algebird.Semigroup Semigroup]]. This could be more powerful + * and better optimized than [[reduceByKey]] in some cases. + * @group per_key + */ + def sumByKey(implicit sg: Semigroup[V]): SCollection[(K, V)] = { + PairSCollectionFunctions.logger.warn( + "combineByKey/sumByKey does not support default value and may fail in some streaming " + + "scenarios. Consider aggregateByKey/foldByKey instead." + ) + this.applyPerKey(Combine.perKey(Functions.reduceFn(context, sg)))(kvToTuple) + } + + /** + * Return the mean of values for each key as defined by the implicit `Numeric[T]`. + * @return + * a new SCollection of (key, mean value) pairs + * @group per_key + */ + def meanByKey(implicit ev: Numeric[V]): SCollection[(K, Double)] = + self.transform { in => + in.mapValues[JDouble](ev.toDouble).applyPerKey(Mean.perKey[K, JDouble]())(kdToTuple) + } + /** * Merge the values for each key using an associative reduce function. This will also perform the * merging locally on each mapper before sending results to a reducer, similarly to a "combiner" @@ -1095,19 +1121,6 @@ class PairSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { } } - /** - * Reduce by key with [[com.twitter.algebird.Semigroup Semigroup]]. This could be more powerful - * and better optimized than [[reduceByKey]] in some cases. - * @group per_key - */ - def sumByKey(implicit sg: Semigroup[V]): SCollection[(K, V)] = { - PairSCollectionFunctions.logger.warn( - "combineByKey/sumByKey does not support default value and may fail in some streaming " + - "scenarios. Consider aggregateByKey/foldByKey instead." - ) - this.applyPerKey(Combine.perKey(Functions.reduceFn(context, sg)))(kvToTuple) - } - /** * Swap the keys with the values. * @group transform diff --git a/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala b/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala index 2decd759ab..4955f7f1f6 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala @@ -773,6 +773,16 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] { */ def map[U: Coder](f: T => U): SCollection[U] = this.parDo(Functions.mapFn(f)) + /** + * Return the min of this SCollection as defined by the implicit `Ordering[T]`. + * @return + * a new SCollection with the minimum element + * @group transform + */ + // Scala lambda is simpler and more powerful than transforms.Min + def min(implicit ord: Ordering[T]): SCollection[T] = + this.reduce(ord.min) + /** * Return the max of this SCollection as defined by the implicit `Ordering[T]`. * @return @@ -784,37 +794,39 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] { this.reduce(ord.max) /** - * Return the mean of this SCollection as defined by the implicit `Numeric[T]`. + * Return the latest of this SCollection according to its event time. * @return - * a new SCollection with the mean of elements + * a new SCollection with the latest element * @group transform */ - def mean(implicit ev: Numeric[T]): SCollection[Double] = this.transform { in => - val e = ev // defeat closure - in.map(e.toDouble) - .asInstanceOf[SCollection[JDouble]] - .pApply(Mean.globally().withoutDefaults()) - .asInstanceOf[SCollection[Double]] - } + def latest: SCollection[T] = + this.withTimestamp.max(Ordering.by(_._2)).keys /** - * Return the min of this SCollection as defined by the implicit `Ordering[T]`. - * @return - * a new SCollection with the minimum element + * Reduce with [[com.twitter.algebird.Semigroup Semigroup]]. This could be more powerful and + * better optimized than [[reduce]] in some cases. * @group transform */ - // Scala lambda is simpler and more powerful than transforms.Min - def min(implicit ord: Ordering[T]): SCollection[T] = - this.reduce(ord.min) + def sum(implicit sg: Semigroup[T]): SCollection[T] = { + SCollection.logger.warn( + "combine/sum does not support default value and may fail in some streaming scenarios. " + + "Consider aggregate/fold instead." + ) + this.pApply(Combine.globally(Functions.reduceFn(context, sg)).withoutDefaults()) + } /** - * Return the latest of this SCollection according to its event time. + * Return the mean of this SCollection as defined by the implicit `Numeric[T]`. * @return - * a new SCollection with the latest element + * a new SCollection with the mean of elements * @group transform */ - def latest: SCollection[T] = - this.withTimestamp.max(Ordering.by(_._2)).keys + def mean(implicit ev: Numeric[T]): SCollection[Double] = this.transform { in => + val e = ev // defeat closure + in.map[JDouble](e.toDouble) + .pApply(Mean.globally().withoutDefaults()) + .asInstanceOf[SCollection[Double]] + } /** * Compute the SCollection's data distribution using approximate `N`-tiles. @@ -945,19 +957,6 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] { _.map((_, ())).subtractByKey(that).keys } - /** - * Reduce with [[com.twitter.algebird.Semigroup Semigroup]]. This could be more powerful and - * better optimized than [[reduce]] in some cases. - * @group transform - */ - def sum(implicit sg: Semigroup[T]): SCollection[T] = { - SCollection.logger.warn( - "combine/sum does not support default value and may fail in some streaming scenarios. " + - "Consider aggregate/fold instead." - ) - this.pApply(Combine.globally(Functions.reduceFn(context, sg)).withoutDefaults()) - } - /** * Return a sampled subset of any `num` elements of the SCollection. * @group transform diff --git a/scio-core/src/main/scala/com/spotify/scio/values/SCollectionWithFanout.scala b/scio-core/src/main/scala/com/spotify/scio/values/SCollectionWithFanout.scala index 5b9d0b2f9c..0e3b9b4007 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/SCollectionWithFanout.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/SCollectionWithFanout.scala @@ -104,6 +104,21 @@ class SCollectionWithFanout[T] private[values] (coll: SCollection[T], fanout: In Combine.globally(Functions.reduceFn(context, op)).withoutDefaults().withFanout(fanout) ) + /** [[SCollection.min]] with fan out. */ + def min(implicit ord: Ordering[T]): SCollection[T] = + this.reduce(ord.min) + + /** [[SCollection.max]] with fan out. */ + def max(implicit ord: Ordering[T]): SCollection[T] = + this.reduce(ord.max) + + /** [[SCollection.latest]] with fan out. */ + def latest: SCollection[T] = { + coll.transform { in => + new SCollectionWithFanout(in.withTimestamp, this.fanout).max(Ordering.by(_._2)).keys + } + } + /** [[SCollection.sum]] with fan out. */ def sum(implicit sg: Semigroup[T]): SCollection[T] = { SCollection.logger.warn( @@ -115,14 +130,6 @@ class SCollectionWithFanout[T] private[values] (coll: SCollection[T], fanout: In ) } - /** [[SCollection.min]] with fan out. */ - def min(implicit ord: Ordering[T]): SCollection[T] = - this.reduce(ord.min) - - /** [[SCollection.max]] with fan out. */ - def max(implicit ord: Ordering[T]): SCollection[T] = - this.reduce(ord.max) - /** [[SCollection.mean]] with fan out. */ def mean(implicit ev: Numeric[T]): SCollection[Double] = { val e = ev // defeat closure @@ -133,13 +140,6 @@ class SCollectionWithFanout[T] private[values] (coll: SCollection[T], fanout: In } } - /** [[SCollection.latest]] with fan out. */ - def latest: SCollection[T] = { - coll.transform { in => - new SCollectionWithFanout(in.withTimestamp, this.fanout).max(Ordering.by(_._2)).keys - } - } - /** [[SCollection.top]] with fan out. */ def top(num: Int)(implicit ord: Ordering[T]): SCollection[Iterable[T]] = { coll.transform { in => diff --git a/scio-core/src/main/scala/com/spotify/scio/values/SCollectionWithHotKeyFanout.scala b/scio-core/src/main/scala/com/spotify/scio/values/SCollectionWithHotKeyFanout.scala index c177557523..116d0f5ca8 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/SCollectionWithHotKeyFanout.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/SCollectionWithHotKeyFanout.scala @@ -135,15 +135,6 @@ class SCollectionWithHotKeyFanout[K, V] private[values] ( def reduceByKey(op: (V, V) => V): SCollection[(K, V)] = self.applyPerKey(withFanout(Combine.perKey(Functions.reduceFn(context, op))))(kvToTuple) - /** [[PairSCollectionFunctions.sumByKey]] with hot key fanout. */ - def sumByKey(implicit sg: Semigroup[V]): SCollection[(K, V)] = { - SCollection.logger.warn( - "combineByKey/sumByKey does not support default value and may fail in some streaming " + - "scenarios. Consider aggregateByKey/foldByKey instead." - ) - self.applyPerKey(withFanout(Combine.perKey(Functions.reduceFn(context, sg))))(kvToTuple) - } - /** [[SCollection.min]] with hot key fan out. */ def minByKey(implicit ord: Ordering[V]): SCollection[(K, V)] = self.reduceByKey(ord.min) @@ -152,6 +143,15 @@ class SCollectionWithHotKeyFanout[K, V] private[values] ( def maxByKey(implicit ord: Ordering[V]): SCollection[(K, V)] = self.reduceByKey(ord.max) + /** [[SCollection.latest]] with hot key fan out. */ + def latestByKey: SCollection[(K, V)] = { + self.self.transform { in => + new SCollectionWithHotKeyFanout(in.withTimestampedValues, this.hotKeyFanout) + .maxByKey(Ordering.by(_._2)) + .mapValues(_._1) + } + } + /** [[SCollection.mean]] with hot key fan out. */ def meanByKey(implicit ev: Numeric[V]): SCollection[(K, Double)] = { val e = ev // defeat closure @@ -160,13 +160,13 @@ class SCollectionWithHotKeyFanout[K, V] private[values] ( } } - /** [[SCollection.latest]] with hot key fan out. */ - def latestByKey: SCollection[(K, V)] = { - self.self.transform { in => - new SCollectionWithHotKeyFanout(in.withTimestampedValues, this.hotKeyFanout) - .maxByKey(Ordering.by(_._2)) - .mapValues(_._1) - } + /** [[PairSCollectionFunctions.sumByKey]] with hot key fanout. */ + def sumByKey(implicit sg: Semigroup[V]): SCollection[(K, V)] = { + SCollection.logger.warn( + "combineByKey/sumByKey does not support default value and may fail in some streaming " + + "scenarios. Consider aggregateByKey/foldByKey instead." + ) + self.applyPerKey(withFanout(Combine.perKey(Functions.reduceFn(context, sg))))(kvToTuple) } /** [[PairSCollectionFunctions.topByKey]] with hot key fanout. */ diff --git a/scio-core/src/test/scala/com/spotify/scio/values/PairSCollectionFunctionsTest.scala b/scio-core/src/test/scala/com/spotify/scio/values/PairSCollectionFunctionsTest.scala index 7a15632217..6764c16655 100644 --- a/scio-core/src/test/scala/com/spotify/scio/values/PairSCollectionFunctionsTest.scala +++ b/scio-core/src/test/scala/com/spotify/scio/values/PairSCollectionFunctionsTest.scala @@ -713,30 +713,66 @@ class PairSCollectionFunctionsTest extends PipelineSpec { } } - it should "support maxByKey()" in { + it should "support minByKey()" in { runWithContext { sc => - val p = - sc.parallelize(Seq(("a", 1), ("a", 10), ("b", 2), ("b", 20))).maxByKey - p should containInAnyOrder(Seq(("a", 10), ("b", 20))) + def minByKey(elems: (String, Int)*): SCollection[(String, Int)] = + sc.parallelize(elems).minByKey + + minByKey() should beEmpty + minByKey(("a", 1), ("a", 10), ("b", 2), ("b", 20)) should containInAnyOrder( + Seq(("a", 1), ("b", 2)) + ) } } - it should "support minByKey()" in { + it should "support maxByKey()" in { runWithContext { sc => - val p = - sc.parallelize(Seq(("a", 1), ("a", 10), ("b", 2), ("b", 20))).minByKey - p should containInAnyOrder(Seq(("a", 1), ("b", 2))) + def maxByKey(elems: (String, Int)*): SCollection[(String, Int)] = + sc.parallelize(elems).maxByKey + + maxByKey() should beEmpty + maxByKey(("a", 1), ("a", 10), ("b", 2), ("b", 20)) should containInAnyOrder( + Seq(("a", 10), ("b", 20)) + ) } } it should "support latestByKey()" in { runWithContext { sc => - val p = sc - .parallelize(Seq(("a", 1L), ("a", 10L), ("b", 2L), ("b", 20L))) - .timestampBy { case (_, v) => Instant.ofEpochMilli(v) } - .latestByKey + def latestByKey(elems: (String, Int)*): SCollection[(String, Int)] = + sc + .parallelize(elems) + .timestampBy { case (_, v) => Instant.ofEpochMilli(v.toLong) } + .latestByKey + + latestByKey() should beEmpty + latestByKey(("a", 1), ("a", 10), ("b", 2), ("b", 20)) should containInAnyOrder( + Seq(("a", 10), ("b", 20)) + ) + } + } + + it should "support sumByKey" in { + runWithContext { sc => + def sumByKey(elems: (String, Int)*): SCollection[(String, Int)] = + sc.parallelize(elems).sumByKey - p should containInAnyOrder(Seq(("a", 10L), ("b", 20L))) + sumByKey() should beEmpty + sumByKey( + Seq(("a", 1), ("b", 2), ("b", 2)) ++ (1 to 100).map(("c", _)): _* + ) should containInAnyOrder(Seq(("a", 1), ("b", 4), ("c", 5050))) + } + } + + it should "support meanByKey" in { + runWithContext { sc => + def meanByKey(elems: (String, Int)*): SCollection[(String, Double)] = + sc.parallelize(elems).meanByKey + + meanByKey() should beEmpty + meanByKey( + Seq(("a", 1), ("b", 2), ("b", 3)) ++ (0 to 100).map(("c", _)): _* + ) should containInAnyOrder(Seq(("a", 1.0), ("b", 2.5), ("c", 50.0))) } } @@ -820,15 +856,6 @@ class PairSCollectionFunctionsTest extends PipelineSpec { } } - it should "support sumByKey()" in { - runWithContext { sc => - val p = sc - .parallelize(List(("a", 1), ("b", 2), ("b", 2)) ++ (1 to 100).map(("c", _))) - .sumByKey - p should containInAnyOrder(Seq(("a", 1), ("b", 4), ("c", 5050))) - } - } - it should "support swap()" in { runWithContext { sc => val p = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3))).swap diff --git a/scio-core/src/test/scala/com/spotify/scio/values/SCollectionTest.scala b/scio-core/src/test/scala/com/spotify/scio/values/SCollectionTest.scala index cd1cf8ffcf..f31fb33723 100644 --- a/scio-core/src/test/scala/com/spotify/scio/values/SCollectionTest.scala +++ b/scio-core/src/test/scala/com/spotify/scio/values/SCollectionTest.scala @@ -599,6 +599,7 @@ class SCollectionTest extends PipelineSpec { runWithContext { sc => def sum[T: Coder: Semigroup](elems: T*): SCollection[T] = sc.parallelize(elems).sum + sum[Int]() should beEmpty sum(1, 2, 3) should containSingleValue(6) sum(1L, 2L, 3L) should containSingleValue(6L) sum(1f, 2f, 3f) should containSingleValue(6f)