Skip to content

Commit

Permalink
Merge 14e04de into 1f32fd6
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones authored Jun 12, 2023
2 parents 1f32fd6 + 14e04de commit 4c08ee5
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class PairSCollectionFunctions[K, V](val self: SCollection[(K, V)]) {
* intermediate node.
*/
def withHotKeyFanout(hotKeyFanout: K => Int): SCollectionWithHotKeyFanout[K, V] =
new SCollectionWithHotKeyFanout(context, this, Left(hotKeyFanout))
new SCollectionWithHotKeyFanout(this, Left(hotKeyFanout))

/**
* Convert this SCollection to an [[SCollectionWithHotKeyFanout]] that uses an intermediate node
Expand All @@ -98,7 +98,7 @@ class PairSCollectionFunctions[K, V](val self: SCollection[(K, V)]) {
* constant value for every key
*/
def withHotKeyFanout(hotKeyFanout: Int): SCollectionWithHotKeyFanout[K, V] =
new SCollectionWithHotKeyFanout(context, this, Right(hotKeyFanout))
new SCollectionWithHotKeyFanout(this, Right(hotKeyFanout))

// =======================================================================
// CoGroups
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ import org.apache.beam.sdk.transforms.{Combine, SerializableFunction}
* performing the full combine.
*/
class SCollectionWithHotKeyFanout[K, V] private[values] (
private val context: ScioContext,
private val self: PairSCollectionFunctions[K, V],
private val hotKeyFanout: Either[K => Int, Int]
) extends TransformNameable {

private[this] val context: ScioContext = self.self.context
implicit private[this] val valueCoder: Coder[V] = self.valueCoder

private def withFanout[K0, I, O](
Expand Down Expand Up @@ -73,7 +74,7 @@ class SCollectionWithHotKeyFanout[K, V] private[values] (
def aggregateByKey[A: Coder, U: Coder](aggregator: Aggregator[V, A, U]): SCollection[(K, U)] =
self.self.transform { in =>
val a = aggregator // defeat closure
new SCollectionWithHotKeyFanout(context, in.mapValues(a.prepare), hotKeyFanout)
new SCollectionWithHotKeyFanout(in.mapValues(a.prepare), hotKeyFanout)
.sumByKey(a.semigroup)
.mapValues(a.present)
}
Expand All @@ -87,7 +88,7 @@ class SCollectionWithHotKeyFanout[K, V] private[values] (
): SCollection[(K, U)] = {
self.self.transform { in =>
val a = aggregator // defeat closure
new SCollectionWithHotKeyFanout(context, in.mapValues(a.prepare), hotKeyFanout)
new SCollectionWithHotKeyFanout(in.mapValues(a.prepare), hotKeyFanout)
.foldByKey(a.monoid)
.mapValues(a.present)
}
Expand Down

0 comments on commit 4c08ee5

Please sign in to comment.