Skip to content

Commit

Permalink
No need to use F-bounded type for TransformNameable.
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewsmartin committed Jan 17, 2017
1 parent 64a7247 commit 98c2539
Show file tree
Hide file tree
Showing 12 changed files with 107 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ object ScioContext {
// scalastyle:off number.of.methods
class ScioContext private[scio] (val options: PipelineOptions,
private var artifacts: List[String])
extends TransformNameable[ScioContext] {
extends TransformNameable {

private implicit val context: ScioContext = this

Expand Down
176 changes: 86 additions & 90 deletions scio-core/src/main/scala/com/spotify/scio/util/MultiJoin.scala

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import com.spotify.scio.{Implicits, ScioContext}

import scala.reflect.ClassTag

private[values] trait PCollectionWrapper[T] {
this: TransformNameable[_ <: PCollectionWrapper[T]] =>
private[values] trait PCollectionWrapper[T] extends TransformNameable {

import Implicits._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ object SCollection {
* @groupname transform Transformations
* @groupname window Windowing Operations
*/
sealed trait SCollection[T] extends PCollectionWrapper[T] with TransformNameable[SCollection[T]] {
sealed trait SCollection[T] extends PCollectionWrapper[T] {

import TupleFunctions._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import scala.reflect.ClassTag
*/
class SCollectionWithAccumulator[T: ClassTag] private[values]
(val internal: PCollection[T], val context: ScioContext, acc: Seq[Accumulator[_]])
extends PCollectionWrapper[T] with TransformNameable[SCollectionWithAccumulator[T]] {
extends PCollectionWrapper[T] {

protected val ct: ClassTag[T] = implicitly[ClassTag[T]]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import scala.reflect.ClassTag
class SCollectionWithFanout[T: ClassTag] private[values] (val internal: PCollection[T],
val context: ScioContext,
private val fanout: Int)
extends PCollectionWrapper[T] with TransformNameable[SCollectionWithFanout[T]] {
extends PCollectionWrapper[T] {

protected val ct: ClassTag[T] = implicitly[ClassTag[T]]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import scala.reflect.ClassTag
class SCollectionWithHotKeyFanout[K: ClassTag, V: ClassTag]
(private val self: PairSCollectionFunctions[K, V],
private val hotKeyFanout: Either[K => Int, Int])
extends TransformNameable[SCollectionWithHotKeyFanout[K, V]] {
extends TransformNameable {

private def withFanout[K, I, O](combine: Combine.PerKey[K, I, O])
: PerKeyWithHotKeyFanout[K, I, O] = this.hotKeyFanout match {
Expand All @@ -43,7 +43,7 @@ class SCollectionWithHotKeyFanout[K: ClassTag, V: ClassTag]
combine.withHotKeyFanout(f)
}

override def withName(name: String): SCollectionWithHotKeyFanout[K, V] = {
override def withName(name: String): this.type = {
self.self.withName(name)
this
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import scala.util.Try
class SCollectionWithSideInput[T: ClassTag] private[values] (val internal: PCollection[T],
val context: ScioContext,
sides: Iterable[SideInput[_]])
extends PCollectionWrapper[T] with TransformNameable[SCollectionWithSideInput[T]] {
extends PCollectionWrapper[T] {

protected val ct: ClassTag[T] = implicitly[ClassTag[T]]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class SCollectionWithSideOutput[T: ClassTag] private[values]
(val internal: PCollection[T],
val context: ScioContext,
sides: Iterable[SideOutput[_]])
extends PCollectionWrapper[T] with TransformNameable[SCollectionWithSideOutput[T]] {
extends PCollectionWrapper[T] {

protected val ct: ClassTag[T] = implicitly[ClassTag[T]]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package com.spotify.scio.values

import com.spotify.scio.util.CallSites

trait TransformNameable[T <: TransformNameable[T]] { this: T =>
trait TransformNameable {
private var nameProvider: TransformNameProvider = CallSiteNameProvider

def tfName: String = {
Expand All @@ -28,20 +28,20 @@ trait TransformNameable[T <: TransformNameable[T]] { this: T =>
n
}

def withName(name: String): T = {
def withName(name: String): this.type = {
require(nameProvider.getClass != classOf[ConstNameProvider],
s"withName() has already been used to set '${tfName}' as the name for the next transform.")
nameProvider = new ConstNameProvider(name)
this
}
}

private[scio] trait TransformNameProvider {
private trait TransformNameProvider {
def name: String
}

private[scio] object CallSiteNameProvider extends TransformNameProvider {
private object CallSiteNameProvider extends TransformNameProvider {
def name: String = CallSites.getCurrent
}

private[scio] class ConstNameProvider(val name: String) extends TransformNameProvider
private class ConstNameProvider(val name: String) extends TransformNameProvider
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ case class WindowedValue[T](value: T, timestamp: Instant, window: BoundedWindow,

class WindowedSCollection[T: ClassTag] private[values] (val internal: PCollection[T],
val context: ScioContext)
extends PCollectionWrapper[T] with TransformNameable[WindowedSCollection[T]] {
extends PCollectionWrapper[T] {

protected val ct: ClassTag[T] = implicitly[ClassTag[T]]

Expand Down
16 changes: 6 additions & 10 deletions scripts/multijoin.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def cogroup(out, n):
print >> out, ' .and(tag%s, %s.toKV.internal)' % (x, x.lower())
print >> out, ' .apply("CoGroupByKey", CoGroupByKey.create())'

print >> out, ' a.context.wrap(keyed).withName(this.tfNameProvider.name).map { kv =>'
print >> out, ' a.context.wrap(keyed).withName(this.tfName).map { kv =>'
print >> out, ' val (key, result) = (kv.getKey, kv.getValue)'
print >> out, ' (key, (%s))' % ', '.join('result.getAll(tag%s).asScala' % x for x in vals) # NOQA
print >> out, ' }'
Expand All @@ -87,7 +87,7 @@ def join(out, n):
print >> out, ' .and(tag%s, %s.toKV.internal)' % (x, x.lower())
print >> out, ' .apply("CoGroupByKey", CoGroupByKey.create())'

print >> out, ' a.context.wrap(keyed).withName(this.tfNameProvider.name).flatMap { kv =>'
print >> out, ' a.context.wrap(keyed).withName(this.tfName).flatMap { kv =>'
print >> out, ' val (key, result) = (kv.getKey, kv.getValue)'
print >> out, ' for {'
for x in reversed(vals):
Expand All @@ -113,7 +113,7 @@ def left(out, n):
print >> out, ' .and(tag%s, %s.toKV.internal)' % (x, x.lower())
print >> out, ' .apply("CoGroupByKey", CoGroupByKey.create())'

print >> out, ' a.context.wrap(keyed).withName(this.tfNameProvider.name).flatMap { kv =>'
print >> out, ' a.context.wrap(keyed).withName(this.tfName).flatMap { kv =>'
print >> out, ' val (key, result) = (kv.getKey, kv.getValue)'
print >> out, ' for {'
for (i, x) in enumerate(reversed(vals)):
Expand Down Expand Up @@ -142,7 +142,7 @@ def outer(out, n):
print >> out, ' .and(tag%s, %s.toKV.internal)' % (x, x.lower())
print >> out, ' .apply("CoGroupByKey", CoGroupByKey.create())'

print >> out, ' a.context.wrap(keyed).withName(this.tfNameProvider.name).flatMap { kv =>'
print >> out, ' a.context.wrap(keyed).withName(this.tfName).flatMap { kv =>'
print >> out, ' val (key, result) = (kv.getKey, kv.getValue)'
print >> out, ' for {'
for (i, x) in enumerate(reversed(vals)):
Expand Down Expand Up @@ -186,16 +186,12 @@ def main(out):
import com.google.cloud.dataflow.sdk.transforms.join.{CoGroupByKey, KeyedPCollectionTuple} # NOQA
import com.google.cloud.dataflow.sdk.values.TupleTag
import com.google.common.collect.Lists
import com.spotify.scio.values.{CallSiteNameProvider, ConstNameProvider, SCollection, TransformNameProvider}
import com.spotify.scio.values.{SCollection, TransformNameable}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
object MultiJoin {
private var tfNameProvider: TransformNameProvider = CallSiteNameProvider
def withName(name: String): MultiJoin.type = {tfNameProvider = new ConstNameProvider(name); this}
object MultiJoin extends TransformNameable {
def toOptions[T](xs: Iterator[T]): Iterator[Option[T]] = if (xs.isEmpty) Iterator(None) else xs.map(Option(_))
''').replace(' # NOQA', '').lstrip('\n')
Expand Down

0 comments on commit 98c2539

Please sign in to comment.