Support for named transforms. #357
I'm OK with the user-facing API, but feel that we should rework the internals a bit, especially the things carried over in d0e1590. I'd remove the optional argument and use a name provider instead:
trait NameProvider {
  def apply(): String
}
object CallSitesNameProvider extends NameProvider {
  override def apply(): String = CallSites.getCurrent
}
class StaticNameProvided(private val name: String) extends NameProvider {
  override def apply(): String = name
}
This way we can undo some of the optional args (which are always tricky) in d0e1590.
Basically I'm suggesting that we revert d0e1590 and use the provider approach, which is cleaner and more extensible.
+1, do as part of this PR?
@@ -78,6 +78,12 @@ package object scio {
      Await.result(self.flatMap(identity), atMost)
  }

  /** Set the name of the next transform. */
  implicit class NamedTfSCollection[T: ClassTag](val wrapped: SCollection[T]) {
    def withName(tfName: String): SCollection[T] =
This should fail if wrapped is already a NamedTfSCollection. Also I'd name this NamedSCollection.
True. Easiest way is probably just to have wrapped be SCollectionImpl instead, right?
Actually that won't work because user only sees SCollection[T]
Probably true. You might not even need a NamedSCollectionImpl; just reflecting on self.nameProvider should be sufficient.
I mean you might not need an implicit class. Having def withName on PCollectionWrapper would be the best/cleanest, if we can get rid of CallSites.getCurrent everywhere else and replace it with this.nameProvider.get().
Sure. You can do multi-commits PRs. It's clearer if we keep a revert commit ( |
@nevillelyh @ravwojdyla This solution works for all |
Should note that this still doesn't prevent calling |
Current coverage is 81.65% (diff: 100%)
@@             master   #357   diff @@
==========================================
  Files            66     67     +1
  Lines          2540   2627    +87
  Methods        2267   2327    +60
  Messages          0      0
  Branches        273    300    +27
==========================================
+ Hits           2088   2145    +57
- Misses          452    482    +30
  Partials          0      0
import com.spotify.scio.util.CallSites

trait TransformNameable[T <: TransformNameable[T]] {
  this: T =>
I think the convention is to keep this: T => on the previous line.
  def tfName: String = nameProvider.name

  def withName(name: String): T = {
    nameProvider = new ConstNameProvider(name)
Add a require() here against withName().withName()?
Also do a |
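The guard requested above, together with the "always reset NameProvider after tfName requested" item from the final commit list, could look roughly like the sketch below. It is a simplified illustration and not Scio's actual code: Numbers is a hypothetical toy collection, CallSiteNameProvider returns a fixed placeholder instead of inspecting the stack as CallSites.getCurrent does, and the error message wording is an assumption.

```scala
// Hypothetical sketch; names follow the discussion but this is not Scio's implementation.
object NamedTransformSketch {
  trait TransformNameProvider { def name: String }

  // Stand-in for the call-site based default. The real provider would derive
  // the name from the caller's stack frame; here it is a fixed placeholder.
  object CallSiteNameProvider extends TransformNameProvider {
    def name: String = "callSite"
  }

  class ConstNameProvider(val name: String) extends TransformNameProvider

  trait TransformNameable {
    private var nameProvider: TransformNameProvider = CallSiteNameProvider

    // Reading the name resets the provider, so an explicit name
    // applies to the next transform only.
    def tfName: String = {
      val n = nameProvider.name
      nameProvider = CallSiteNameProvider
      n
    }

    // Reject withName(...).withName(...): a pending explicit name must be
    // consumed by a transform before another one can be set.
    def withName(name: String): this.type = {
      require(nameProvider eq CallSiteNameProvider,
        s"withName() was already called with '${nameProvider.name}' for the next transform")
      nameProvider = new ConstNameProvider(name)
      this
    }
  }

  // Toy collection that records the name its producing transform was given.
  final class Numbers(val values: List[Int], val producedBy: String = "")
      extends TransformNameable {
    def map(f: Int => Int): Numbers = new Numbers(values.map(f), tfName)
  }
}
```

With these definitions, new Numbers(List(1, 2)).withName("Triple").map(_ * 3) is produced by a transform named "Triple", a following unnamed map falls back to the default provider, and chaining two withName calls fails with an IllegalArgumentException.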
@nevillelyh just pushed two more commits to address your comments.
👍
Don't leak variable transform name to subclasses. Also fix whitespace and make test names consistent. Add TransformNameProvider so CallSites.getCurrent can be deferred to the proper call site. Fix test.
Also give explicit names to internal transforms in DoubleSCollectionFunctions
@@ -79,8 +79,8 @@ package object accumulators {

   /** Count positive and negative results in a filter with automatically created accumulators. */
   def accumulateCountFilter(f: T => Boolean): SCollection[T] = {
-    val accPos = self.context.sumAccumulator[Long]("Positive#" + CallSites.getCurrentName)
-    val accNeg = self.context.sumAccumulator[Long]("Negative#" + CallSites.getCurrentName)
+    val accPos = self.context.sumAccumulator[Long]("Positive#" + self.tfName)
These 2 are not PTransform names but accumulator identifiers. We should keep the original so that they're distinguishable.
@@ -60,7 +60,7 @@ package object accumulators {

  /** Count elements with an automatically created accumulator. */
  def accumulateCount: SCollection[T] =
    self.accumulateCount(self.context.sumAccumulator[Long](CallSites.getCurrentName))
This is an accumulator name. Keep the original.
@@ -39,22 +39,26 @@ class DoubleSCollectionFunctions(self: SCollection[Double]) {
   // def sum: SCollection[Double] = this.stats().map(_.sum)

   /** Compute the standard deviation of this SCollection's elements. */
-  def stdev: SCollection[Double] = self.transform(_.stats.map(_.stdev))
+  def stdev: SCollection[Double] = self.transform(_.stats.withName("StdDev").map(_.stdev))
Let's leave transforms in this file alone for now? There are too many of them and it's impossible to catch them all. For example, there are a few in histogram() that we didn't cover.
Plus I doubt these are super useful in streaming. Better to catch them later as they pop up.
I did this up front so that we can have a test for naming transforms on DoubleSCollectionFunctions. Since they're composite transforms I wanted to make the internal name deterministic to make an assertion. https://github.com/spotify/scio/blob/playground/scio-test/src/test/scala/com/spotify/scio/values/NamedTransformTest.scala#L45
Maybe instead I can remove these, and just assert the transform name starts with the given name and ignore the inner transform name?
Agree we can just assert the part before /. Leave the inner ones alone for now and fix when it becomes a problem.
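A test helper along the lines agreed above might compare only the outer component of a composite transform's name. This is a minimal sketch under stated assumptions: outerName and assertOuterName are hypothetical names, not the helpers in NamedTransformTest, and the full name is assumed to be a plain string with "/" separating the outer transform from its inner transforms.

```scala
// Hypothetical helper; not the actual code in NamedTransformTest.
object TransformNameAssertions {
  // Everything before the first '/', i.e. the user-visible outer transform name.
  // A name without '/' is returned unchanged.
  def outerName(fullName: String): String = fullName.takeWhile(_ != '/')

  // Passes when the outer component matches, whatever the inner transforms are called.
  def assertOuterName(fullName: String, expected: String): Unit =
    assert(outerName(fullName) == expected,
      s"expected outer transform name '$expected' but got '${outerName(fullName)}'")
}
```

Checking only the prefix keeps the test stable while the inner transforms are still named non-deterministically.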
@@ -60,8 +60,8 @@ object SCollection {
   def unionAll[T: ClassTag](scs: Iterable[SCollection[T]]): SCollection[T] = {
     val o = PCollectionList
       .of(scs.map(_.internal).asJava)
-      .apply(CallSites.getCurrent, Flatten.pCollections())
-    new SCollectionImpl(o, scs.head.context)
+      .apply("FlattenList", Flatten.pCollections())
Use UnionAll to be more precise?
@@ -163,7 +163,7 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] {
   def union(that: SCollection[T]): SCollection[T] = {
     val o = PCollectionList
       .of(internal).and(that.internal)
-      .apply(CallSites.getCurrent, Flatten.pCollections())
+      .apply("FlattenList", Flatten.pCollections())
Can use this.tfName here? More consistent with other transforms.
trait TransformNameable {
  private var nameProvider: TransformNameProvider = CallSiteNameProvider

  def tfName: String = {
Make this private[values]?
ScioContext also needs this, will make it private[scio]
  .withName("MyTransform").map(_ * 3)
val p3 = p1
  .withName("MyTransform").map(_ * 4)
assertTransformName(p1, "MyTransform")
Can't find the incremental logic in this PR, where are they from? Is it better to fail duplicate names instead?
This logic is built-in to dataflow, I included a test for it because an earlier version of this PR actually broke in this scenario.
👍
* Add ability to name transforms in all SCollection variants.
  Don't leak variable transform name to subclasses. Also fix whitespace and make test names consistent. Add TransformNameProvider so CallSites.getCurrent can be deferred to the proper call site. Fix test.
* Remove all occurrences of CallSites.getCurrent
  Also give explicit names to internal transforms in DoubleSCollectionFunctions
* Prevent successive calls to withName
* Always reset NameProvider after tfName requested.
* No need to use F-bounded type for TransformNameable.
* Accumulator names should still use CallSites.getCurrentName
* Fix transform names for union and unionAll
* Ignore internal transform names in DoubleSCollectionFunctions for now.
* Make tfName scio-private
* Better test helper function names.
* Fix formatting
Conflicts:
  scio-core/src/main/scala/com/spotify/scio/util/MultiJoin.scala
  scio-core/src/main/scala/com/spotify/scio/values/PCollectionWrapper.scala
  scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala
After some discussion in #354 we decided for now this would be the best approach. This is ready for review, but there are a couple other things we will need to address before everything is fully supported
CallSites.getCurrent
), so even if the user gives a name to the outer transform, the inner transform is still named non-deterministically and would be a problem for updating. We will need to go through the Scio API and identify all the places where we internally use composite transforms, and give them all an appropriate name similar to what the Dataflow SDK does for its composite transforms.
I propose we get this out, which will solve the problem for the majority of transforms, and open another issue to address the remaining work in the above items.