diff --git a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala index 73334ca8c4..46f75a1c44 100644 --- a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala +++ b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala @@ -138,7 +138,8 @@ object ScioContext { */ // scalastyle:off number.of.methods class ScioContext private[scio] (val options: PipelineOptions, - private var artifacts: List[String]) { + private var artifacts: List[String]) + extends TransformNameable { private implicit val context: ScioContext = this @@ -398,7 +399,7 @@ class ScioContext private[scio] (val options: PipelineOptions, private[scio] def applyInternal[Output <: POutput](root: PTransform[_ >: PBegin, Output]) : Output = - pipeline.apply(CallSites.getCurrent, root) + pipeline.apply(this.tfName, root) /** * Get an SCollection for an object file. diff --git a/scio-core/src/main/scala/com/spotify/scio/util/MultiJoin.scala b/scio-core/src/main/scala/com/spotify/scio/util/MultiJoin.scala index 807ac2f15d..0da52734a6 100644 --- a/scio-core/src/main/scala/com/spotify/scio/util/MultiJoin.scala +++ b/scio-core/src/main/scala/com/spotify/scio/util/MultiJoin.scala @@ -29,12 +29,12 @@ package com.spotify.scio.util import com.google.cloud.dataflow.sdk.transforms.join.{CoGroupByKey, KeyedPCollectionTuple} import com.google.cloud.dataflow.sdk.values.TupleTag import com.google.common.collect.Lists -import com.spotify.scio.values.SCollection +import com.spotify.scio.values.{SCollection, TransformNameable} import scala.collection.JavaConverters._ import scala.reflect.ClassTag -object MultiJoin { +object MultiJoin extends TransformNameable { def toOptions[T](xs: Iterator[T]): Iterator[Option[T]] = if (xs.isEmpty) Iterator(None) else xs.map(Option(_)) @@ -43,8 +43,8 @@ object MultiJoin { val keyed = KeyedPCollectionTuple .of(tagA, a.toKV.internal) .and(tagB, b.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) 
- a.context.wrap(keyed).map { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).map { kv => val (key, result) = (kv.getKey, kv.getValue) (key, (result.getAll(tagA).asScala, result.getAll(tagB).asScala)) } @@ -56,8 +56,8 @@ object MultiJoin { .of(tagA, a.toKV.internal) .and(tagB, b.toKV.internal) .and(tagC, c.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).map { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).map { kv => val (key, result) = (kv.getKey, kv.getValue) (key, (result.getAll(tagA).asScala, result.getAll(tagB).asScala, result.getAll(tagC).asScala)) } @@ -70,8 +70,8 @@ object MultiJoin { .and(tagB, b.toKV.internal) .and(tagC, c.toKV.internal) .and(tagD, d.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).map { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).map { kv => val (key, result) = (kv.getKey, kv.getValue) (key, (result.getAll(tagA).asScala, result.getAll(tagB).asScala, result.getAll(tagC).asScala, result.getAll(tagD).asScala)) } @@ -85,8 +85,8 @@ object MultiJoin { .and(tagC, c.toKV.internal) .and(tagD, d.toKV.internal) .and(tagE, e.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).map { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).map { kv => val (key, result) = (kv.getKey, kv.getValue) (key, (result.getAll(tagA).asScala, result.getAll(tagB).asScala, result.getAll(tagC).asScala, result.getAll(tagD).asScala, result.getAll(tagE).asScala)) } @@ -101,8 +101,8 @@ object MultiJoin { .and(tagD, d.toKV.internal) .and(tagE, e.toKV.internal) .and(tagF, f.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).map { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + 
a.context.wrap(keyed).withName(this.tfName).map { kv => val (key, result) = (kv.getKey, kv.getValue) (key, (result.getAll(tagA).asScala, result.getAll(tagB).asScala, result.getAll(tagC).asScala, result.getAll(tagD).asScala, result.getAll(tagE).asScala, result.getAll(tagF).asScala)) } @@ -118,8 +118,8 @@ object MultiJoin { .and(tagE, e.toKV.internal) .and(tagF, f.toKV.internal) .and(tagG, g.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).map { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).map { kv => val (key, result) = (kv.getKey, kv.getValue) (key, (result.getAll(tagA).asScala, result.getAll(tagB).asScala, result.getAll(tagC).asScala, result.getAll(tagD).asScala, result.getAll(tagE).asScala, result.getAll(tagF).asScala, result.getAll(tagG).asScala)) } @@ -136,8 +136,8 @@ object MultiJoin { .and(tagF, f.toKV.internal) .and(tagG, g.toKV.internal) .and(tagH, h.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).map { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).map { kv => val (key, result) = (kv.getKey, kv.getValue) (key, (result.getAll(tagA).asScala, result.getAll(tagB).asScala, result.getAll(tagC).asScala, result.getAll(tagD).asScala, result.getAll(tagE).asScala, result.getAll(tagF).asScala, result.getAll(tagG).asScala, result.getAll(tagH).asScala)) } @@ -155,8 +155,8 @@ object MultiJoin { .and(tagG, g.toKV.internal) .and(tagH, h.toKV.internal) .and(tagI, i.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).map { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).map { kv => val (key, result) = (kv.getKey, kv.getValue) (key, (result.getAll(tagA).asScala, result.getAll(tagB).asScala, result.getAll(tagC).asScala, result.getAll(tagD).asScala, result.getAll(tagE).asScala, 
result.getAll(tagF).asScala, result.getAll(tagG).asScala, result.getAll(tagH).asScala, result.getAll(tagI).asScala)) } @@ -175,8 +175,8 @@ object MultiJoin { .and(tagH, h.toKV.internal) .and(tagI, i.toKV.internal) .and(tagJ, j.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).map { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).map { kv => val (key, result) = (kv.getKey, kv.getValue) (key, (result.getAll(tagA).asScala, result.getAll(tagB).asScala, result.getAll(tagC).asScala, result.getAll(tagD).asScala, result.getAll(tagE).asScala, result.getAll(tagF).asScala, result.getAll(tagG).asScala, result.getAll(tagH).asScala, result.getAll(tagI).asScala, result.getAll(tagJ).asScala)) } @@ -196,8 +196,8 @@ object MultiJoin { .and(tagI, i.toKV.internal) .and(tagJ, j.toKV.internal) .and(tagK, k.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).map { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).map { kv => val (key, result) = (kv.getKey, kv.getValue) (key, (result.getAll(tagA).asScala, result.getAll(tagB).asScala, result.getAll(tagC).asScala, result.getAll(tagD).asScala, result.getAll(tagE).asScala, result.getAll(tagF).asScala, result.getAll(tagG).asScala, result.getAll(tagH).asScala, result.getAll(tagI).asScala, result.getAll(tagJ).asScala, result.getAll(tagK).asScala)) } @@ -218,8 +218,8 @@ object MultiJoin { .and(tagJ, j.toKV.internal) .and(tagK, k.toKV.internal) .and(tagL, l.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).map { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).map { kv => val (key, result) = (kv.getKey, kv.getValue) (key, (result.getAll(tagA).asScala, result.getAll(tagB).asScala, result.getAll(tagC).asScala, result.getAll(tagD).asScala, result.getAll(tagE).asScala, 
result.getAll(tagF).asScala, result.getAll(tagG).asScala, result.getAll(tagH).asScala, result.getAll(tagI).asScala, result.getAll(tagJ).asScala, result.getAll(tagK).asScala, result.getAll(tagL).asScala)) } @@ -241,8 +241,8 @@ object MultiJoin { .and(tagK, k.toKV.internal) .and(tagL, l.toKV.internal) .and(tagM, m.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).map { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).map { kv => val (key, result) = (kv.getKey, kv.getValue) (key, (result.getAll(tagA).asScala, result.getAll(tagB).asScala, result.getAll(tagC).asScala, result.getAll(tagD).asScala, result.getAll(tagE).asScala, result.getAll(tagF).asScala, result.getAll(tagG).asScala, result.getAll(tagH).asScala, result.getAll(tagI).asScala, result.getAll(tagJ).asScala, result.getAll(tagK).asScala, result.getAll(tagL).asScala, result.getAll(tagM).asScala)) } @@ -265,8 +265,8 @@ object MultiJoin { .and(tagL, l.toKV.internal) .and(tagM, m.toKV.internal) .and(tagN, n.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).map { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).map { kv => val (key, result) = (kv.getKey, kv.getValue) (key, (result.getAll(tagA).asScala, result.getAll(tagB).asScala, result.getAll(tagC).asScala, result.getAll(tagD).asScala, result.getAll(tagE).asScala, result.getAll(tagF).asScala, result.getAll(tagG).asScala, result.getAll(tagH).asScala, result.getAll(tagI).asScala, result.getAll(tagJ).asScala, result.getAll(tagK).asScala, result.getAll(tagL).asScala, result.getAll(tagM).asScala, result.getAll(tagN).asScala)) } @@ -290,8 +290,8 @@ object MultiJoin { .and(tagM, m.toKV.internal) .and(tagN, n.toKV.internal) .and(tagO, o.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).map { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + 
a.context.wrap(keyed).withName(this.tfName).map { kv => val (key, result) = (kv.getKey, kv.getValue) (key, (result.getAll(tagA).asScala, result.getAll(tagB).asScala, result.getAll(tagC).asScala, result.getAll(tagD).asScala, result.getAll(tagE).asScala, result.getAll(tagF).asScala, result.getAll(tagG).asScala, result.getAll(tagH).asScala, result.getAll(tagI).asScala, result.getAll(tagJ).asScala, result.getAll(tagK).asScala, result.getAll(tagL).asScala, result.getAll(tagM).asScala, result.getAll(tagN).asScala, result.getAll(tagO).asScala)) } @@ -316,8 +316,8 @@ object MultiJoin { .and(tagN, n.toKV.internal) .and(tagO, o.toKV.internal) .and(tagP, p.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).map { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).map { kv => val (key, result) = (kv.getKey, kv.getValue) (key, (result.getAll(tagA).asScala, result.getAll(tagB).asScala, result.getAll(tagC).asScala, result.getAll(tagD).asScala, result.getAll(tagE).asScala, result.getAll(tagF).asScala, result.getAll(tagG).asScala, result.getAll(tagH).asScala, result.getAll(tagI).asScala, result.getAll(tagJ).asScala, result.getAll(tagK).asScala, result.getAll(tagL).asScala, result.getAll(tagM).asScala, result.getAll(tagN).asScala, result.getAll(tagO).asScala, result.getAll(tagP).asScala)) } @@ -343,8 +343,8 @@ object MultiJoin { .and(tagO, o.toKV.internal) .and(tagP, p.toKV.internal) .and(tagQ, q.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).map { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).map { kv => val (key, result) = (kv.getKey, kv.getValue) (key, (result.getAll(tagA).asScala, result.getAll(tagB).asScala, result.getAll(tagC).asScala, result.getAll(tagD).asScala, result.getAll(tagE).asScala, result.getAll(tagF).asScala, result.getAll(tagG).asScala, result.getAll(tagH).asScala, 
result.getAll(tagI).asScala, result.getAll(tagJ).asScala, result.getAll(tagK).asScala, result.getAll(tagL).asScala, result.getAll(tagM).asScala, result.getAll(tagN).asScala, result.getAll(tagO).asScala, result.getAll(tagP).asScala, result.getAll(tagQ).asScala)) } @@ -371,8 +371,8 @@ object MultiJoin { .and(tagP, p.toKV.internal) .and(tagQ, q.toKV.internal) .and(tagR, r.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).map { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).map { kv => val (key, result) = (kv.getKey, kv.getValue) (key, (result.getAll(tagA).asScala, result.getAll(tagB).asScala, result.getAll(tagC).asScala, result.getAll(tagD).asScala, result.getAll(tagE).asScala, result.getAll(tagF).asScala, result.getAll(tagG).asScala, result.getAll(tagH).asScala, result.getAll(tagI).asScala, result.getAll(tagJ).asScala, result.getAll(tagK).asScala, result.getAll(tagL).asScala, result.getAll(tagM).asScala, result.getAll(tagN).asScala, result.getAll(tagO).asScala, result.getAll(tagP).asScala, result.getAll(tagQ).asScala, result.getAll(tagR).asScala)) } @@ -400,8 +400,8 @@ object MultiJoin { .and(tagQ, q.toKV.internal) .and(tagR, r.toKV.internal) .and(tagS, s.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).map { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).map { kv => val (key, result) = (kv.getKey, kv.getValue) (key, (result.getAll(tagA).asScala, result.getAll(tagB).asScala, result.getAll(tagC).asScala, result.getAll(tagD).asScala, result.getAll(tagE).asScala, result.getAll(tagF).asScala, result.getAll(tagG).asScala, result.getAll(tagH).asScala, result.getAll(tagI).asScala, result.getAll(tagJ).asScala, result.getAll(tagK).asScala, result.getAll(tagL).asScala, result.getAll(tagM).asScala, result.getAll(tagN).asScala, result.getAll(tagO).asScala, result.getAll(tagP).asScala, 
result.getAll(tagQ).asScala, result.getAll(tagR).asScala, result.getAll(tagS).asScala)) } @@ -430,8 +430,8 @@ object MultiJoin { .and(tagR, r.toKV.internal) .and(tagS, s.toKV.internal) .and(tagT, t.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).map { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).map { kv => val (key, result) = (kv.getKey, kv.getValue) (key, (result.getAll(tagA).asScala, result.getAll(tagB).asScala, result.getAll(tagC).asScala, result.getAll(tagD).asScala, result.getAll(tagE).asScala, result.getAll(tagF).asScala, result.getAll(tagG).asScala, result.getAll(tagH).asScala, result.getAll(tagI).asScala, result.getAll(tagJ).asScala, result.getAll(tagK).asScala, result.getAll(tagL).asScala, result.getAll(tagM).asScala, result.getAll(tagN).asScala, result.getAll(tagO).asScala, result.getAll(tagP).asScala, result.getAll(tagQ).asScala, result.getAll(tagR).asScala, result.getAll(tagS).asScala, result.getAll(tagT).asScala)) } @@ -461,8 +461,8 @@ object MultiJoin { .and(tagS, s.toKV.internal) .and(tagT, t.toKV.internal) .and(tagU, u.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).map { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).map { kv => val (key, result) = (kv.getKey, kv.getValue) (key, (result.getAll(tagA).asScala, result.getAll(tagB).asScala, result.getAll(tagC).asScala, result.getAll(tagD).asScala, result.getAll(tagE).asScala, result.getAll(tagF).asScala, result.getAll(tagG).asScala, result.getAll(tagH).asScala, result.getAll(tagI).asScala, result.getAll(tagJ).asScala, result.getAll(tagK).asScala, result.getAll(tagL).asScala, result.getAll(tagM).asScala, result.getAll(tagN).asScala, result.getAll(tagO).asScala, result.getAll(tagP).asScala, result.getAll(tagQ).asScala, result.getAll(tagR).asScala, result.getAll(tagS).asScala, result.getAll(tagT).asScala, 
result.getAll(tagU).asScala)) } @@ -493,8 +493,8 @@ object MultiJoin { .and(tagT, t.toKV.internal) .and(tagU, u.toKV.internal) .and(tagV, v.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).map { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).map { kv => val (key, result) = (kv.getKey, kv.getValue) (key, (result.getAll(tagA).asScala, result.getAll(tagB).asScala, result.getAll(tagC).asScala, result.getAll(tagD).asScala, result.getAll(tagE).asScala, result.getAll(tagF).asScala, result.getAll(tagG).asScala, result.getAll(tagH).asScala, result.getAll(tagI).asScala, result.getAll(tagJ).asScala, result.getAll(tagK).asScala, result.getAll(tagL).asScala, result.getAll(tagM).asScala, result.getAll(tagN).asScala, result.getAll(tagO).asScala, result.getAll(tagP).asScala, result.getAll(tagQ).asScala, result.getAll(tagR).asScala, result.getAll(tagS).asScala, result.getAll(tagT).asScala, result.getAll(tagU).asScala, result.getAll(tagV).asScala)) } @@ -505,8 +505,8 @@ object MultiJoin { val keyed = KeyedPCollectionTuple .of(tagA, a.toKV.internal) .and(tagB, b.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { b <- result.getAll(tagB).asScala.iterator @@ -521,8 +521,8 @@ object MultiJoin { .of(tagA, a.toKV.internal) .and(tagB, b.toKV.internal) .and(tagC, c.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { c <- result.getAll(tagC).asScala.iterator @@ -539,8 +539,8 @@ object MultiJoin { .and(tagB, b.toKV.internal) .and(tagC, c.toKV.internal) .and(tagD, 
d.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { d <- result.getAll(tagD).asScala.iterator @@ -559,8 +559,8 @@ object MultiJoin { .and(tagC, c.toKV.internal) .and(tagD, d.toKV.internal) .and(tagE, e.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { e <- result.getAll(tagE).asScala.iterator @@ -581,8 +581,8 @@ object MultiJoin { .and(tagD, d.toKV.internal) .and(tagE, e.toKV.internal) .and(tagF, f.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { f <- result.getAll(tagF).asScala.iterator @@ -605,8 +605,8 @@ object MultiJoin { .and(tagE, e.toKV.internal) .and(tagF, f.toKV.internal) .and(tagG, g.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { g <- result.getAll(tagG).asScala.iterator @@ -631,8 +631,8 @@ object MultiJoin { .and(tagF, f.toKV.internal) .and(tagG, g.toKV.internal) .and(tagH, h.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { h <- result.getAll(tagH).asScala.iterator @@ 
-659,8 +659,8 @@ object MultiJoin { .and(tagG, g.toKV.internal) .and(tagH, h.toKV.internal) .and(tagI, i.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { i <- result.getAll(tagI).asScala.iterator @@ -689,8 +689,8 @@ object MultiJoin { .and(tagH, h.toKV.internal) .and(tagI, i.toKV.internal) .and(tagJ, j.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { j <- result.getAll(tagJ).asScala.iterator @@ -721,8 +721,8 @@ object MultiJoin { .and(tagI, i.toKV.internal) .and(tagJ, j.toKV.internal) .and(tagK, k.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { k <- result.getAll(tagK).asScala.iterator @@ -755,8 +755,8 @@ object MultiJoin { .and(tagJ, j.toKV.internal) .and(tagK, k.toKV.internal) .and(tagL, l.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { l <- result.getAll(tagL).asScala.iterator @@ -791,8 +791,8 @@ object MultiJoin { .and(tagK, k.toKV.internal) .and(tagL, l.toKV.internal) .and(tagM, m.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { 
kv => val (key, result) = (kv.getKey, kv.getValue) for { m <- result.getAll(tagM).asScala.iterator @@ -829,8 +829,8 @@ object MultiJoin { .and(tagL, l.toKV.internal) .and(tagM, m.toKV.internal) .and(tagN, n.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { n <- result.getAll(tagN).asScala.iterator @@ -869,8 +869,8 @@ object MultiJoin { .and(tagM, m.toKV.internal) .and(tagN, n.toKV.internal) .and(tagO, o.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { o <- result.getAll(tagO).asScala.iterator @@ -911,8 +911,8 @@ object MultiJoin { .and(tagN, n.toKV.internal) .and(tagO, o.toKV.internal) .and(tagP, p.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { p <- result.getAll(tagP).asScala.iterator @@ -955,8 +955,8 @@ object MultiJoin { .and(tagO, o.toKV.internal) .and(tagP, p.toKV.internal) .and(tagQ, q.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { q <- result.getAll(tagQ).asScala.iterator @@ -1001,8 +1001,8 @@ object MultiJoin { .and(tagP, p.toKV.internal) .and(tagQ, q.toKV.internal) .and(tagR, r.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + 
.apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { r <- result.getAll(tagR).asScala.iterator @@ -1049,8 +1049,8 @@ object MultiJoin { .and(tagQ, q.toKV.internal) .and(tagR, r.toKV.internal) .and(tagS, s.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { s <- result.getAll(tagS).asScala.iterator @@ -1099,8 +1099,8 @@ object MultiJoin { .and(tagR, r.toKV.internal) .and(tagS, s.toKV.internal) .and(tagT, t.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { t <- result.getAll(tagT).asScala.iterator @@ -1151,8 +1151,8 @@ object MultiJoin { .and(tagS, s.toKV.internal) .and(tagT, t.toKV.internal) .and(tagU, u.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { u <- result.getAll(tagU).asScala.iterator @@ -1205,8 +1205,8 @@ object MultiJoin { .and(tagT, t.toKV.internal) .and(tagU, u.toKV.internal) .and(tagV, v.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { v <- result.getAll(tagV).asScala.iterator @@ -1240,8 +1240,8 @@ object MultiJoin { val keyed = KeyedPCollectionTuple .of(tagA, a.toKV.internal) 
.and(tagB, b.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { b <- toOptions(result.getAll(tagB).asScala.iterator) @@ -1256,8 +1256,8 @@ object MultiJoin { .of(tagA, a.toKV.internal) .and(tagB, b.toKV.internal) .and(tagC, c.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { c <- toOptions(result.getAll(tagC).asScala.iterator) @@ -1274,8 +1274,8 @@ object MultiJoin { .and(tagB, b.toKV.internal) .and(tagC, c.toKV.internal) .and(tagD, d.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { d <- toOptions(result.getAll(tagD).asScala.iterator) @@ -1294,8 +1294,8 @@ object MultiJoin { .and(tagC, c.toKV.internal) .and(tagD, d.toKV.internal) .and(tagE, e.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { e <- toOptions(result.getAll(tagE).asScala.iterator) @@ -1316,8 +1316,8 @@ object MultiJoin { .and(tagD, d.toKV.internal) .and(tagE, e.toKV.internal) .and(tagF, f.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, 
kv.getValue) for { f <- toOptions(result.getAll(tagF).asScala.iterator) @@ -1340,8 +1340,8 @@ object MultiJoin { .and(tagE, e.toKV.internal) .and(tagF, f.toKV.internal) .and(tagG, g.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { g <- toOptions(result.getAll(tagG).asScala.iterator) @@ -1366,8 +1366,8 @@ object MultiJoin { .and(tagF, f.toKV.internal) .and(tagG, g.toKV.internal) .and(tagH, h.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { h <- toOptions(result.getAll(tagH).asScala.iterator) @@ -1394,8 +1394,8 @@ object MultiJoin { .and(tagG, g.toKV.internal) .and(tagH, h.toKV.internal) .and(tagI, i.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { i <- toOptions(result.getAll(tagI).asScala.iterator) @@ -1424,8 +1424,8 @@ object MultiJoin { .and(tagH, h.toKV.internal) .and(tagI, i.toKV.internal) .and(tagJ, j.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { j <- toOptions(result.getAll(tagJ).asScala.iterator) @@ -1456,8 +1456,8 @@ object MultiJoin { .and(tagI, i.toKV.internal) .and(tagJ, j.toKV.internal) .and(tagK, k.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - 
a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { k <- toOptions(result.getAll(tagK).asScala.iterator) @@ -1490,8 +1490,8 @@ object MultiJoin { .and(tagJ, j.toKV.internal) .and(tagK, k.toKV.internal) .and(tagL, l.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { l <- toOptions(result.getAll(tagL).asScala.iterator) @@ -1526,8 +1526,8 @@ object MultiJoin { .and(tagK, k.toKV.internal) .and(tagL, l.toKV.internal) .and(tagM, m.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { m <- toOptions(result.getAll(tagM).asScala.iterator) @@ -1564,8 +1564,8 @@ object MultiJoin { .and(tagL, l.toKV.internal) .and(tagM, m.toKV.internal) .and(tagN, n.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { n <- toOptions(result.getAll(tagN).asScala.iterator) @@ -1604,8 +1604,8 @@ object MultiJoin { .and(tagM, m.toKV.internal) .and(tagN, n.toKV.internal) .and(tagO, o.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { o <- toOptions(result.getAll(tagO).asScala.iterator) @@ -1646,8 
+1646,8 @@ object MultiJoin { .and(tagN, n.toKV.internal) .and(tagO, o.toKV.internal) .and(tagP, p.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { p <- toOptions(result.getAll(tagP).asScala.iterator) @@ -1690,8 +1690,8 @@ object MultiJoin { .and(tagO, o.toKV.internal) .and(tagP, p.toKV.internal) .and(tagQ, q.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { q <- toOptions(result.getAll(tagQ).asScala.iterator) @@ -1736,8 +1736,8 @@ object MultiJoin { .and(tagP, p.toKV.internal) .and(tagQ, q.toKV.internal) .and(tagR, r.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { r <- toOptions(result.getAll(tagR).asScala.iterator) @@ -1784,8 +1784,8 @@ object MultiJoin { .and(tagQ, q.toKV.internal) .and(tagR, r.toKV.internal) .and(tagS, s.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { s <- toOptions(result.getAll(tagS).asScala.iterator) @@ -1834,8 +1834,8 @@ object MultiJoin { .and(tagR, r.toKV.internal) .and(tagS, s.toKV.internal) .and(tagT, t.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + 
a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { t <- toOptions(result.getAll(tagT).asScala.iterator) @@ -1886,8 +1886,8 @@ object MultiJoin { .and(tagS, s.toKV.internal) .and(tagT, t.toKV.internal) .and(tagU, u.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { u <- toOptions(result.getAll(tagU).asScala.iterator) @@ -1940,8 +1940,8 @@ object MultiJoin { .and(tagT, t.toKV.internal) .and(tagU, u.toKV.internal) .and(tagV, v.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { v <- toOptions(result.getAll(tagV).asScala.iterator) @@ -1975,8 +1975,8 @@ object MultiJoin { val keyed = KeyedPCollectionTuple .of(tagA, a.toKV.internal) .and(tagB, b.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { b <- toOptions(result.getAll(tagB).asScala.iterator) @@ -1991,8 +1991,8 @@ object MultiJoin { .of(tagA, a.toKV.internal) .and(tagB, b.toKV.internal) .and(tagC, c.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { c <- toOptions(result.getAll(tagC).asScala.iterator) @@ -2009,8 +2009,8 @@ object MultiJoin { .and(tagB, b.toKV.internal) .and(tagC, c.toKV.internal) 
.and(tagD, d.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { d <- toOptions(result.getAll(tagD).asScala.iterator) @@ -2029,8 +2029,8 @@ object MultiJoin { .and(tagC, c.toKV.internal) .and(tagD, d.toKV.internal) .and(tagE, e.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { e <- toOptions(result.getAll(tagE).asScala.iterator) @@ -2051,8 +2051,8 @@ object MultiJoin { .and(tagD, d.toKV.internal) .and(tagE, e.toKV.internal) .and(tagF, f.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { f <- toOptions(result.getAll(tagF).asScala.iterator) @@ -2075,8 +2075,8 @@ object MultiJoin { .and(tagE, e.toKV.internal) .and(tagF, f.toKV.internal) .and(tagG, g.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { g <- toOptions(result.getAll(tagG).asScala.iterator) @@ -2101,8 +2101,8 @@ object MultiJoin { .and(tagF, f.toKV.internal) .and(tagG, g.toKV.internal) .and(tagH, h.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, 
kv.getValue) for { h <- toOptions(result.getAll(tagH).asScala.iterator) @@ -2129,8 +2129,8 @@ object MultiJoin { .and(tagG, g.toKV.internal) .and(tagH, h.toKV.internal) .and(tagI, i.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { i <- toOptions(result.getAll(tagI).asScala.iterator) @@ -2159,8 +2159,8 @@ object MultiJoin { .and(tagH, h.toKV.internal) .and(tagI, i.toKV.internal) .and(tagJ, j.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { j <- toOptions(result.getAll(tagJ).asScala.iterator) @@ -2191,8 +2191,8 @@ object MultiJoin { .and(tagI, i.toKV.internal) .and(tagJ, j.toKV.internal) .and(tagK, k.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { k <- toOptions(result.getAll(tagK).asScala.iterator) @@ -2225,8 +2225,8 @@ object MultiJoin { .and(tagJ, j.toKV.internal) .and(tagK, k.toKV.internal) .and(tagL, l.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { l <- toOptions(result.getAll(tagL).asScala.iterator) @@ -2261,8 +2261,8 @@ object MultiJoin { .and(tagK, k.toKV.internal) .and(tagL, l.toKV.internal) .and(tagM, m.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - 
a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { m <- toOptions(result.getAll(tagM).asScala.iterator) @@ -2299,8 +2299,8 @@ object MultiJoin { .and(tagL, l.toKV.internal) .and(tagM, m.toKV.internal) .and(tagN, n.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { n <- toOptions(result.getAll(tagN).asScala.iterator) @@ -2339,8 +2339,8 @@ object MultiJoin { .and(tagM, m.toKV.internal) .and(tagN, n.toKV.internal) .and(tagO, o.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { o <- toOptions(result.getAll(tagO).asScala.iterator) @@ -2381,8 +2381,8 @@ object MultiJoin { .and(tagN, n.toKV.internal) .and(tagO, o.toKV.internal) .and(tagP, p.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { p <- toOptions(result.getAll(tagP).asScala.iterator) @@ -2425,8 +2425,8 @@ object MultiJoin { .and(tagO, o.toKV.internal) .and(tagP, p.toKV.internal) .and(tagQ, q.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { q <- toOptions(result.getAll(tagQ).asScala.iterator) @@ -2471,8 
+2471,8 @@ object MultiJoin { .and(tagP, p.toKV.internal) .and(tagQ, q.toKV.internal) .and(tagR, r.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { r <- toOptions(result.getAll(tagR).asScala.iterator) @@ -2519,8 +2519,8 @@ object MultiJoin { .and(tagQ, q.toKV.internal) .and(tagR, r.toKV.internal) .and(tagS, s.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { s <- toOptions(result.getAll(tagS).asScala.iterator) @@ -2569,8 +2569,8 @@ object MultiJoin { .and(tagR, r.toKV.internal) .and(tagS, s.toKV.internal) .and(tagT, t.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { t <- toOptions(result.getAll(tagT).asScala.iterator) @@ -2621,8 +2621,8 @@ object MultiJoin { .and(tagS, s.toKV.internal) .and(tagT, t.toKV.internal) .and(tagU, u.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { u <- toOptions(result.getAll(tagU).asScala.iterator) @@ -2675,8 +2675,8 @@ object MultiJoin { .and(tagT, t.toKV.internal) .and(tagU, u.toKV.internal) .and(tagV, v.toKV.internal) - .apply(CallSites.getCurrent, CoGroupByKey.create()) - a.context.wrap(keyed).flatMap { kv => + .apply("CoGroupByKey", CoGroupByKey.create()) + 
a.context.wrap(keyed).withName(this.tfName).flatMap { kv => val (key, result) = (kv.getKey, kv.getValue) for { v <- toOptions(result.getAll(tagV).asScala.iterator) diff --git a/scio-core/src/main/scala/com/spotify/scio/values/PCollectionWrapper.scala b/scio-core/src/main/scala/com/spotify/scio/values/PCollectionWrapper.scala index ad1fc59066..cdf5de82f8 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/PCollectionWrapper.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/PCollectionWrapper.scala @@ -20,12 +20,11 @@ package com.spotify.scio.values import com.google.cloud.dataflow.sdk.coders.Coder import com.google.cloud.dataflow.sdk.transforms.{Combine, DoFn, PTransform, ParDo} import com.google.cloud.dataflow.sdk.values.{KV, PCollection, POutput} -import com.spotify.scio.util.CallSites import com.spotify.scio.{Implicits, ScioContext} import scala.reflect.ClassTag -private[values] trait PCollectionWrapper[T] { +private[values] trait PCollectionWrapper[T] extends TransformNameable { import Implicits._ @@ -39,7 +38,7 @@ private[values] trait PCollectionWrapper[T] { private[scio] def applyInternal[Output <: POutput] (transform: PTransform[_ >: PCollection[T], Output]): Output = - internal.apply(CallSites.getCurrent, transform) + internal.apply(this.tfName, transform) protected def pApply[U: ClassTag] (transform: PTransform[_ >: PCollection[T], PCollection[U]]): SCollection[U] = { diff --git a/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala b/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala index 1fa25ffa97..3416ff88c1 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala @@ -60,8 +60,8 @@ object SCollection { def unionAll[T: ClassTag](scs: Iterable[SCollection[T]]): SCollection[T] = { val o = PCollectionList .of(scs.map(_.internal).asJava) - .apply(CallSites.getCurrent, Flatten.pCollections()) - new 
SCollectionImpl(o, scs.head.context) + .apply("UnionAll", Flatten.pCollections()) + scs.head.context.wrap(o) } import scala.language.implicitConversions @@ -126,7 +126,7 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] { /** Apply a transform. */ private[values] def transform[U: ClassTag](f: SCollection[T] => SCollection[U]) : SCollection[U] = { - val o = internal.apply(CallSites.getCurrent, new PTransform[PCollection[T], PCollection[U]]() { + val o = internal.apply(this.tfName, new PTransform[PCollection[T], PCollection[U]]() { override def apply(input: PCollection[T]): PCollection[U] = { f(context.wrap(input)).internal } @@ -163,7 +163,7 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] { def union(that: SCollection[T]): SCollection[T] = { val o = PCollectionList .of(internal).and(that.internal) - .apply(CallSites.getCurrent, Flatten.pCollections()) + .apply(this.tfName, Flatten.pCollections()) context.wrap(o) } diff --git a/scio-core/src/main/scala/com/spotify/scio/values/SCollectionWithHotKeyFanout.scala b/scio-core/src/main/scala/com/spotify/scio/values/SCollectionWithHotKeyFanout.scala index feaebde478..0685965429 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/SCollectionWithHotKeyFanout.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/SCollectionWithHotKeyFanout.scala @@ -31,7 +31,8 @@ import scala.reflect.ClassTag */ class SCollectionWithHotKeyFanout[K: ClassTag, V: ClassTag] (private val self: PairSCollectionFunctions[K, V], - private val hotKeyFanout: Either[K => Int, Int]) { + private val hotKeyFanout: Either[K => Int, Int]) + extends TransformNameable { private def withFanout[K, I, O](combine: Combine.PerKey[K, I, O]) : PerKeyWithHotKeyFanout[K, I, O] = this.hotKeyFanout match { @@ -42,6 +43,11 @@ class SCollectionWithHotKeyFanout[K: ClassTag, V: ClassTag] combine.withHotKeyFanout(f) } + override def withName(name: String): this.type = { + self.self.withName(name) + this + } + /** * Aggregate the values 
of each key, using given combine functions and a neutral "zero value". * This function can return a different result type, U, than the type of the values in this diff --git a/scio-core/src/main/scala/com/spotify/scio/values/SCollectionWithSideInput.scala b/scio-core/src/main/scala/com/spotify/scio/values/SCollectionWithSideInput.scala index 1df6507464..dcae4da52d 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/SCollectionWithSideInput.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/SCollectionWithSideInput.scala @@ -106,7 +106,7 @@ class SCollectionWithSideInput[T: ClassTag] private[values] (val internal: PColl .withOutputTags(_mainTag.tupleTag, sideTags) .of(transformWithSideOutputsFn(sideOutputs, f)) - val pCollectionWrapper = this.internal.apply(CallSites.getCurrent, transform) + val pCollectionWrapper = this.internal.apply("TransformWithSideOutputs", transform) pCollectionWrapper.getAll.asScala .mapValues(context.wrap(_).asInstanceOf[SCollection[T]].setCoder(internal.getCoder)) .flatMap{ case(tt, col) => Try{tagToSide(tt.getId) -> col}.toOption } diff --git a/scio-core/src/main/scala/com/spotify/scio/values/TransformNameable.scala b/scio-core/src/main/scala/com/spotify/scio/values/TransformNameable.scala new file mode 100644 index 0000000000..5cc9f54fcf --- /dev/null +++ b/scio-core/src/main/scala/com/spotify/scio/values/TransformNameable.scala @@ -0,0 +1,47 @@ +/* + * Copyright 2016 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.spotify.scio.values
+
+import com.spotify.scio.util.CallSites
+
+/**
+ * Mixin that lets a caller assign an explicit name to the next transform applied through
+ * the implementing object.
+ *
+ * A name set with [[withName]] is consumed by the next call to `tfName`. After that call the
+ * provider is reset to `CallSiteNameProvider`, which delegates to `CallSites.getCurrent`.
+ */
+trait TransformNameable {
+  // Source of the next transform name; replaced by withName and reset by tfName.
+  private var nameProvider: TransformNameProvider = CallSiteNameProvider
+
+  /** Returns the name for the next transform and resets the provider to the call-site default. */
+  private[scio] def tfName: String = {
+    val current = nameProvider.name
+    nameProvider = CallSiteNameProvider
+    current
+  }
+
+  /**
+   * Set the name to use for the next transform.
+   *
+   * @param name name for the next transform
+   * @return this object, so that the transform call can be chained
+   * @throws IllegalArgumentException if a name set by an earlier call has not been consumed yet
+   */
+  def withName(name: String): this.type = {
+    val alreadyNamed = nameProvider match {
+      case _: ConstNameProvider => true
+      case _ => false
+    }
+    // The message reads the pending name straight from the provider. Going through tfName
+    // would reset the provider as a side effect of merely formatting an error message.
+    require(!alreadyNamed,
+      s"withName() has already been used to set '${nameProvider.name}'" +
+        " as the name for the next transform.")
+    nameProvider = new ConstNameProvider(name)
+    this
+  }
+}
+
+/** Supplies the name for a transform. */
+private trait TransformNameProvider {
+  def name: String
+}
+
+/** Default provider: the name is whatever `CallSites.getCurrent` returns at the time of the call. */
+private object CallSiteNameProvider extends TransformNameProvider {
+  def name: String = CallSites.getCurrent
+}
+
+/** Provider that always returns the fixed name it was constructed with. */
+private class ConstNameProvider(val name: String) extends TransformNameProvider
diff --git a/scio-test/src/test/scala/com/spotify/scio/values/NamedTransformTest.scala b/scio-test/src/test/scala/com/spotify/scio/values/NamedTransformTest.scala
new file mode 100644
index 0000000000..9b80ab42ad
--- /dev/null
+++ b/scio-test/src/test/scala/com/spotify/scio/values/NamedTransformTest.scala
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2016 Spotify AB.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.spotify.scio.values
+
+import com.spotify.scio.testing.PipelineSpec
+import com.spotify.scio.util.MultiJoin
+
+/**
+ * Checks that a name supplied through `withName` appears in the full name of the transform
+ * that produced the resulting collection.
+ */
+class NamedTransformTest extends PipelineSpec {
+
+  "ScioContext" should "support custom transform name" in {
+    runWithContext { sc =>
+      val p = sc.withName("ReadInput").parallelize(Seq("a", "b", "c"))
+      assertTransformNameEquals(p, "ReadInput/Read(InMemorySource)")
+    }
+  }
+
+  "SCollection" should "support custom transform name" in {
+    runWithContext { sc =>
+      val p = sc.parallelize(Seq(1, 2, 3, 4, 5))
+        .map(_ * 3)
+        .withName("OnlyEven").filter(_ % 2 == 0)
+      assertTransformNameEquals(p, "OnlyEven/Filter")
+    }
+  }
+
+  "DoubleSCollectionFunctions" should "support custom transform name" in {
+    runWithContext { sc =>
+      val p = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0, 5.0))
+        .withName("CalcVariance").variance
+      // Only the first '/'-separated segment of the full name is compared here.
+      assertOuterTransformNameEquals(p, "CalcVariance")
+    }
+  }
+
+  "PairSCollectionFunctions" should "support custom transform name" in {
+    runWithContext { sc =>
+      val p = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
+        .withName("SumPerKey").sumByKey
+      assertTransformNameEquals(p, "SumPerKey/KvToTuple")
+    }
+  }
+
+  "SCollectionWithAccumulator" should "support custom transform name" in {
+    runWithContext { sc =>
+      val intSum = sc.sumAccumulator[Int]("IntSum")
+      val p = sc.parallelize(Seq(1, 2, 3, 4, 5))
+        .withAccumulator(intSum)
+        .withName("TripleSum").map { (i, c) =>
+          val n = i * 3
+          c.addValue(intSum, n)
+          n
+        }
+      assertTransformNameEquals(p, "TripleSum")
+    }
+  }
+
+  "SCollectionWithFanout" should "support custom transform name" in {
+    runWithContext { sc =>
+      val p = sc.parallelize(Seq(1, 2, 3)).withFanout(10)
+        .withName("Sum").sum
+      assertTransformNameEquals(p, "Sum/Values/Values")
+    }
+  }
+
+  "SCollectionWithHotKeyFanout" should "support custom transform name" in {
+    runWithContext { sc =>
+      val p = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3))).withHotKeyFanout(10)
+        .withName("Sum").sumByKey
+      assertTransformNameEquals(p, "Sum/KvToTuple")
+    }
+  }
+
+  "SCollectionWithSideInput" should "support custom transform name" in {
+    runWithContext { sc =>
+      val p1 = sc.parallelize(Seq("a", "b", "c"))
+      val p2 = sc.parallelize(Seq(1, 2, 3)).asListSideInput
+      val s = p1.withSideInputs(p2)
+        .withName("GetX").filter((x, s) => x == "a")
+      assertTransformNameEquals(s, "GetX")
+    }
+  }
+
+  "SCollectionWithSideOutput" should "support custom transform name" in {
+    runWithContext { sc =>
+      val p1 = sc.parallelize(Seq("a", "b", "c"))
+      val p2 = SideOutput[String]()
+      val (main, side) = p1.withSideOutputs(p2)
+        .withName("MakeSideOutput").map { (x, s) => s.output(p2, x + "2"); x + "1" }
+      // The main output and the side output are both expected to report the same name.
+      assertTransformNameEquals(main, "MakeSideOutput")
+      assertTransformNameEquals(side(p2), "MakeSideOutput")
+    }
+  }
+
+  "WindowedSCollection" should "support custom transform name" in {
+    runWithContext { sc =>
+      val p = sc.parallelize(Seq(1, 2, 3, 4, 5))
+        .toWindowed
+        .withName("Triple").map(x => x.withValue(x.value * 3))
+      assertTransformNameEquals(p, "Triple")
+    }
+  }
+
+  "MultiJoin" should "support custom transform name" in {
+    runWithContext { sc =>
+      val p1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3), ("c", 4)))
+      val p2 = sc.parallelize(Seq(("a", 11), ("b", 12), ("b", 13), ("d", 14)))
+      val p = MultiJoin.withName("JoinEm").left(p1, p2)
+      assertTransformNameEquals(p, "JoinEm")
+    }
+  }
+
+  "Duplicate transform name" should "have number to make unique" in {
+    runWithContext { sc =>
+      // The same name is requested three times; the second and third uses are expected to
+      // come back with the suffixes 2 and 3.
+      val p1 = sc.parallelize(1 to 5)
+        .withName("MyTransform").map(_ * 2)
+      val p2 = p1
+        .withName("MyTransform").map(_ * 3)
+      val p3 = p1
+        .withName("MyTransform").map(_ * 4)
+      assertTransformNameEquals(p1, "MyTransform")
+      assertTransformNameEquals(p2, "MyTransform2")
+      assertTransformNameEquals(p3, "MyTransform3")
+    }
+  }
+
+  "TransformNameable" should "prevent repeated calls to .withName" in {
+    // Two consecutive withName calls with no transform in between must be rejected, and the
+    // message must report the name set by the first call.
+    intercept[IllegalArgumentException](runWithContext { sc =>
+      val p1 = sc.parallelize(1 to 5)
+        .withName("Double").withName("DoubleMap").map(_ * 2)
+    }).getMessage shouldBe "requirement failed: withName() has already been used to set 'Double'" +
+      " as the name for the next transform."
+  }
+
+  /** Asserts that the full name of the transform that produced `p` equals `tfName`. */
+  private def assertTransformNameEquals(p: PCollectionWrapper[_], tfName: String) =
+    p.internal.getProducingTransformInternal.getFullName shouldBe tfName
+
+  /** Asserts that the first '/'-separated segment of the producing transform's full name equals `tfName`. */
+  private def assertOuterTransformNameEquals(p: PCollectionWrapper[_], tfName: String) =
+    p.internal.getProducingTransformInternal.getFullName.split("/").head shouldBe tfName
+}
diff --git a/scripts/multijoin.py b/scripts/multijoin.py index d4306e4e46..1f995426ee 100755 --- a/scripts/multijoin.py +++ b/scripts/multijoin.py @@ -61,9 +61,9 @@ def cogroup(out, n): print >> out, ' .of(tagA, a.toKV.internal)' for x in vals[1:]: print >> out, ' .and(tag%s, %s.toKV.internal)' % (x, x.lower()) - print >> out, ' .apply(CallSites.getCurrent, CoGroupByKey.create())' + print >> out, ' .apply("CoGroupByKey", CoGroupByKey.create())' - print >> out, ' a.context.wrap(keyed).map { kv =>' + print >> out, ' a.context.wrap(keyed).withName(this.tfName).map { kv =>' print >> out, ' val (key, result) = (kv.getKey, kv.getValue)' print >> out, ' (key, (%s))' % ', '.join('result.getAll(tag%s).asScala' % x for x in vals) # NOQA print >> out, ' }' @@ -85,9 +85,9 @@ def join(out, n): print >> out, ' .of(tagA, a.toKV.internal)' for x in vals[1:]: print >> out, ' .and(tag%s, %s.toKV.internal)' % (x, x.lower()) - print >> out, ' .apply(CallSites.getCurrent, CoGroupByKey.create())' + print >> out, ' .apply("CoGroupByKey", CoGroupByKey.create())' - print >> out, ' a.context.wrap(keyed).flatMap { kv =>' + print >> out, ' a.context.wrap(keyed).withName(this.tfName).flatMap { kv =>' print >> out, ' val (key, result) = (kv.getKey, kv.getValue)' print >> out, ' for {' for x in reversed(vals): @@ -111,9 +111,9 @@ def left(out, n): print >> out, ' .of(tagA, a.toKV.internal)' for x in vals[1:]: print >> out, ' .and(tag%s, %s.toKV.internal)' % (x, x.lower())
- print >> out, ' .apply(CallSites.getCurrent, CoGroupByKey.create())' + print >> out, ' .apply("CoGroupByKey", CoGroupByKey.create())' - print >> out, ' a.context.wrap(keyed).flatMap { kv =>' + print >> out, ' a.context.wrap(keyed).withName(this.tfName).flatMap { kv =>' print >> out, ' val (key, result) = (kv.getKey, kv.getValue)' print >> out, ' for {' for (i, x) in enumerate(reversed(vals)): @@ -140,9 +140,9 @@ def outer(out, n): print >> out, ' .of(tagA, a.toKV.internal)' for x in vals[1:]: print >> out, ' .and(tag%s, %s.toKV.internal)' % (x, x.lower()) - print >> out, ' .apply(CallSites.getCurrent, CoGroupByKey.create())' + print >> out, ' .apply("CoGroupByKey", CoGroupByKey.create())' - print >> out, ' a.context.wrap(keyed).flatMap { kv =>' + print >> out, ' a.context.wrap(keyed).withName(this.tfName).flatMap { kv =>' print >> out, ' val (key, result) = (kv.getKey, kv.getValue)' print >> out, ' for {' for (i, x) in enumerate(reversed(vals)): @@ -186,12 +186,12 @@ def main(out): import com.google.cloud.dataflow.sdk.transforms.join.{CoGroupByKey, KeyedPCollectionTuple} # NOQA import com.google.cloud.dataflow.sdk.values.TupleTag import com.google.common.collect.Lists - import com.spotify.scio.values.SCollection + import com.spotify.scio.values.{SCollection, TransformNameable} import scala.collection.JavaConverters._ import scala.reflect.ClassTag - object MultiJoin { + object MultiJoin extends TransformNameable { def toOptions[T](xs: Iterator[T]): Iterator[Option[T]] = if (xs.isEmpty) Iterator(None) else xs.map(Option(_)) ''').replace(' # NOQA', '').lstrip('\n')