Skip to content

Commit

Permalink
Remove old skewed-join API (#4808)
Browse files Browse the repository at this point in the history
* Deprecate old skewed-join API

* Remove API

* Add scalafix rule for skewedJoin API migration

* Respect scalafix packaging

* Move scalafix packages
  • Loading branch information
RustedBones authored Jun 7, 2023
1 parent 737a3a1 commit d0e2843
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 228 deletions.
18 changes: 16 additions & 2 deletions scalafix/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ lazy val `output-0_12` = project
libraryDependencies ++= scio(Scio.`0.12`)
)

//lazy val `input-0_13` = project
// .settings(
// libraryDependencies ++= scio(Scio.`0.12`)
// )
//
//lazy val `output-0_13` = project
// .settings(
// libraryDependencies ++= scio(Scio.`0.13`)
// )

lazy val tests = project
.enablePlugins(ScalafixTestkitPlugin)
.dependsOn(rules)
Expand All @@ -98,22 +108,26 @@ lazy val tests = project
`input-0_7` / Compile / compile,
`input-0_8` / Compile / compile,
`input-0_10` / Compile / compile,
`input-0_12` / Compile / compile
`input-0_12` / Compile / compile,
// `input-0_13` / Compile / compile
)
.value,
scalafixTestkitOutputSourceDirectories :=
(`output-0_7` / Compile / sourceDirectories).value ++
(`output-0_8` / Compile / sourceDirectories).value ++
(`output-0_10` / Compile / sourceDirectories).value ++
(`output-0_12` / Compile / sourceDirectories).value,
// (`output-0_13` / Compile / sourceDirectories).value,
scalafixTestkitInputSourceDirectories :=
(`input-0_7` / Compile / sourceDirectories).value ++
(`input-0_8` / Compile / sourceDirectories).value ++
(`input-0_10` / Compile / sourceDirectories).value ++
(`input-0_12` / Compile / sourceDirectories).value,
// (`input-0_13` / Compile / sourceDirectories).value,
scalafixTestkitInputClasspath :=
(`input-0_7` / Compile / fullClasspath).value ++
(`input-0_8` / Compile / fullClasspath).value ++
(`input-0_10` / Compile / fullClasspath).value ++
(`input-0_12` / Compile / fullClasspath).value
(`input-0_12` / Compile / fullClasspath).value // ++
// (`input-0_13` / Compile / fullClasspath).value
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
rule = FixSkewedJoins
*/
package fix.v0_13_0

import com.spotify.scio.values.SCollection

object FixSkewedJoins {

// pair SCollection
val scoll: SCollection[(String, Int)] = ???
val rhs: SCollection[(String, String)] = ???

// unamed parameters
scoll.skewedJoin(rhs, 9000, 0.001, 42, 1e-10, 1.0, true)
scoll.skewedLeftOuterJoin(rhs, 9000, 0.001, 42, 1e-10, 1.0, true)
scoll.skewedFullOuterJoin(rhs, 9000, 0.001, 42, 1e-10, 1.0, true)

// named parameters
scoll.skewedJoin(
rhs = rhs,
hotKeyThreshold = 9000,
eps = 0.001,
seed = 42,
delta = 1e-10,
sampleFraction = 1.0,
withReplacement = true
)
scoll.skewedLeftOuterJoin(
rhs = rhs,
hotKeyThreshold = 9000,
eps = 0.001,
seed = 42,
delta = 1e-10,
sampleFraction = 1.0,
withReplacement = true
)
scoll.skewedFullOuterJoin(
rhs = rhs,
hotKeyThreshold = 9000,
eps = 0.001,
seed = 42,
delta = 1e-10,
sampleFraction = 1.0,
withReplacement = true
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package fix.v0_13_0

import com.spotify.scio.values.SCollection
import com.spotify.scio.values.HotKeyMethod

object FixSkewedJoins {

// pair SCollection
val scoll: SCollection[(String, Int)] = ???
val rhs: SCollection[(String, String)] = ???

// unamed parameters
scoll.skewedJoin(rhs = rhs, hotKeyMethod = HotKeyMethod.Threshold(9000), cmsEps = 0.001d, cmsDelta = 1E-10d, cmsSeed = 42, sampleFraction = 1.0d, sampleWithReplacement = true)
scoll.skewedLeftOuterJoin(rhs = rhs, hotKeyMethod = HotKeyMethod.Threshold(9000), cmsEps = 0.001d, cmsDelta = 1E-10d, cmsSeed = 42, sampleFraction = 1.0d, sampleWithReplacement = true)
scoll.skewedFullOuterJoin(rhs = rhs, hotKeyMethod = HotKeyMethod.Threshold(9000), cmsEps = 0.001d, cmsDelta = 1E-10d, cmsSeed = 42, sampleFraction = 1.0d, sampleWithReplacement = true)

// named parameters
scoll.skewedJoin(rhs = rhs, hotKeyMethod = HotKeyMethod.Threshold(9000), cmsEps = 0.001d, cmsDelta = 1E-10d, cmsSeed = 42, sampleFraction = 1.0d, sampleWithReplacement = true)
scoll.skewedLeftOuterJoin(rhs = rhs, hotKeyMethod = HotKeyMethod.Threshold(9000), cmsEps = 0.001d, cmsDelta = 1E-10d, cmsSeed = 42, sampleFraction = 1.0d, sampleWithReplacement = true)
scoll.skewedFullOuterJoin(rhs = rhs, hotKeyMethod = HotKeyMethod.Threshold(9000), cmsEps = 0.001d, cmsDelta = 1E-10d, cmsSeed = 42, sampleFraction = 1.0d, sampleWithReplacement = true)
}
3 changes: 2 additions & 1 deletion scalafix/project/Scio.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ object Scio {
val `0.9` = "0.9.5"
val `0.10` = "0.10.4"
val `0.11` = "0.11.9"
val `0.12` = "0.12.0-RC1"
val `0.12` = "0.12.0"
val `0.13` = "0.12.8+42-378cd978+20230531-1404-SNAPSHOT" // TODO
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ fix.v0_8_0.FixBigQueryDeprecations
fix.v0_8_0.ConsistenceJoinNames
fix.v0_10_0.FixCoderPropagation
fix.v0_12_0.FixBqSaveAsTable
fix.v0_12_0.FixPubsubSpecializations
fix.v0_12_0.FixPubsubSpecializations
fix.v0_13_0.FixSkewedJoins
78 changes: 78 additions & 0 deletions scalafix/rules/src/main/scala/fix/v0_13_0/FixSkewedJoins.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package fix.v0_13_0

import scalafix.v1.{MethodSignature, _}

import scala.meta._
import scala.meta.contrib._

object FixSkewedJoins {

private val PairSkewedSCollectionFunctions = "com/spotify/scio/values/PairSkewedSCollectionFunctions"

val SkewedJoins: SymbolMatcher =
SymbolMatcher.normalized(PairSkewedSCollectionFunctions + "#skewedJoin") +
SymbolMatcher.normalized(PairSkewedSCollectionFunctions + "#skewedLeftOuterJoin") +
SymbolMatcher.normalized(PairSkewedSCollectionFunctions + "#skewedFullOuterJoin")

val HotKeyMethodImport = importer"com.spotify.scio.values.HotKeyMethod"

val ParamRhs = Term.Name("rhs")
val ParamHotKeyThreshold = Term.Name("hotKeyThreshold")
val ParamEps = Term.Name("eps")
val ParamSeed = Term.Name("seed")
val ParamDelta = Term.Name("delta")
val ParamSampleFraction = Term.Name("sampleFraction")
val ParamWithReplacement = Term.Name("withReplacement")

val OldParameters = List(
ParamRhs.value,
ParamHotKeyThreshold.value,
ParamEps.value,
ParamSeed.value,
ParamDelta.value,
ParamSampleFraction.value,
ParamWithReplacement.value,
"hasher"
)

}

class FixSkewedJoins extends SemanticRule("FixSkewedJoins") {

import FixSkewedJoins._

private def isOldSkewedJoinApi(fn: Term)(implicit doc: SemanticDocument): Boolean = {
val symbol = fn.symbol
SkewedJoins.matches(symbol) && (symbol.info.get.signature match {
case MethodSignature(_, parameterLists, _) => parameterLists.flatten.map(_.symbol.displayName) == OldParameters
case _ => false
})
}


private def findParam(param: Term.Name, pos: Int)(args: List[Term]): Option[Term] = {
args
.collectFirst {
case q"$name = $value" if name.isEqual(param) => value
}
.orElse {
args.takeWhile(!_.isInstanceOf[Term.Assign]).lift(pos)
}
}

override def fix(implicit doc: SemanticDocument): Patch = {
doc.tree.collect {
case t @ q"$fn(..$params)" if isOldSkewedJoinApi(fn) =>
val rhs = findParam(ParamRhs, 0)(params).map(p => q"rhs = $p")
val hotKeyMethod = findParam(ParamHotKeyThreshold, 1)(params).map(p => q"hotKeyMethod = HotKeyMethod.Threshold($p)")
val cmsEps = findParam(ParamEps, 2)(params).map(p => q"cmsEps = $p")
val cmsDelta = findParam(ParamDelta, 4)(params).map(p => q"cmsDelta = $p")
val cmsSeed = findParam(ParamSeed, 3)(params).map(p => q"cmsSeed = $p")
val sampleFraction = findParam(ParamSampleFraction, 5)(params).map(p => q"sampleFraction = $p")
val sampleWithReplacement = findParam(ParamWithReplacement, 6)(params).map(p => q"sampleWithReplacement = $p")

val updated = (rhs ++ hotKeyMethod ++ cmsEps ++ cmsDelta ++ cmsSeed ++ sampleFraction ++ sampleWithReplacement).toList
Patch.addGlobalImport(HotKeyMethodImport) + Patch.replaceTree(t, q"$fn(..$updated)".syntax)
}.asPatch
}
}
Loading

0 comments on commit d0e2843

Please sign in to comment.