Skip to content

Commit

Permalink
Distribute CMS hot key set as side input (#4724)
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones authored Mar 1, 2023
1 parent e2b92c7 commit e7bde42
Showing 1 changed file with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -623,23 +623,23 @@ private object CMSOperations {
implicit val vCoder: Coder[V] = lhs.valueCoder
implicit val wCoder: Coder[W] = rhs.valueCoder

val cmsSideInput = hotKeyCms.asSingletonSideInput
val hotKeysSideInput = hotKeyCms.map(_.heavyHitters).asSingletonSideInput

val (hotLhs, chillLhs) = (SideOutput[(K, V)](), SideOutput[(K, V)]())
val (hotRhs, chillRhs) = (SideOutput[(K, W)](), SideOutput[(K, W)]())

val partitionedLhs = lhs
.withSideInputs(cmsSideInput)
.withSideInputs(hotKeysSideInput)
.transformWithSideOutputs(Seq(hotLhs, chillLhs), "Partition LHS") { case ((k, _), ctx) =>
val cms = ctx(cmsSideInput)
if (cms.heavyHitters.contains(k)) hotLhs else chillLhs
val hotKeys = ctx(hotKeysSideInput)
if (hotKeys.contains(k)) hotLhs else chillLhs
}

val partitionedRhs = rhs
.withSideInputs(cmsSideInput)
.withSideInputs(hotKeysSideInput)
.transformWithSideOutputs(Seq(hotRhs, chillRhs), "Partition RHS") { case ((k, _), ctx) =>
val cms = ctx(cmsSideInput)
if (cms.heavyHitters.contains(k)) hotRhs else chillRhs
val hotKeys = ctx(hotKeysSideInput)
if (hotKeys.contains(k)) hotRhs else chillRhs
}

val lhsPartitions = Partitions(partitionedLhs(hotLhs), partitionedLhs(chillLhs))
Expand Down

0 comments on commit e7bde42

Please sign in to comment.