Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove old skewed-join API #4808

Merged
merged 7 commits into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions scalafix/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ lazy val `output-0_12` = project
libraryDependencies ++= scio(Scio.`0.12`)
)

//lazy val `input-0_13` = project
// .settings(
// libraryDependencies ++= scio(Scio.`0.12`)
// )
//
//lazy val `output-0_13` = project
// .settings(
// libraryDependencies ++= scio(Scio.`0.13`)
// )

lazy val tests = project
.enablePlugins(ScalafixTestkitPlugin)
.dependsOn(rules)
Expand All @@ -98,22 +108,26 @@ lazy val tests = project
`input-0_7` / Compile / compile,
`input-0_8` / Compile / compile,
`input-0_10` / Compile / compile,
`input-0_12` / Compile / compile
`input-0_12` / Compile / compile,
// `input-0_13` / Compile / compile
)
.value,
scalafixTestkitOutputSourceDirectories :=
(`output-0_7` / Compile / sourceDirectories).value ++
(`output-0_8` / Compile / sourceDirectories).value ++
(`output-0_10` / Compile / sourceDirectories).value ++
(`output-0_12` / Compile / sourceDirectories).value,
// (`output-0_13` / Compile / sourceDirectories).value,
scalafixTestkitInputSourceDirectories :=
(`input-0_7` / Compile / sourceDirectories).value ++
(`input-0_8` / Compile / sourceDirectories).value ++
(`input-0_10` / Compile / sourceDirectories).value ++
(`input-0_12` / Compile / sourceDirectories).value,
// (`input-0_13` / Compile / sourceDirectories).value,
scalafixTestkitInputClasspath :=
(`input-0_7` / Compile / fullClasspath).value ++
(`input-0_8` / Compile / fullClasspath).value ++
(`input-0_10` / Compile / fullClasspath).value ++
(`input-0_12` / Compile / fullClasspath).value
(`input-0_12` / Compile / fullClasspath).value // ++
// (`input-0_13` / Compile / fullClasspath).value
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
rule = FixSkewedJoins
*/
package fix.v0_13_0

import com.spotify.scio.values.SCollection

object FixSkewedJoins {

// pair SCollection
val scoll: SCollection[(String, Int)] = ???
val rhs: SCollection[(String, String)] = ???

// unamed parameters
scoll.skewedJoin(rhs, 9000, 0.001, 42, 1e-10, 1.0, true)
scoll.skewedLeftOuterJoin(rhs, 9000, 0.001, 42, 1e-10, 1.0, true)
scoll.skewedFullOuterJoin(rhs, 9000, 0.001, 42, 1e-10, 1.0, true)

// named parameters
scoll.skewedJoin(
rhs = rhs,
hotKeyThreshold = 9000,
eps = 0.001,
seed = 42,
delta = 1e-10,
sampleFraction = 1.0,
withReplacement = true
)
scoll.skewedLeftOuterJoin(
rhs = rhs,
hotKeyThreshold = 9000,
eps = 0.001,
seed = 42,
delta = 1e-10,
sampleFraction = 1.0,
withReplacement = true
)
scoll.skewedFullOuterJoin(
rhs = rhs,
hotKeyThreshold = 9000,
eps = 0.001,
seed = 42,
delta = 1e-10,
sampleFraction = 1.0,
withReplacement = true
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package fix.v0_13_0

import com.spotify.scio.values.SCollection
import com.spotify.scio.values.HotKeyMethod

object FixSkewedJoins {

// pair SCollection
val scoll: SCollection[(String, Int)] = ???
val rhs: SCollection[(String, String)] = ???

// unamed parameters
scoll.skewedJoin(rhs = rhs, hotKeyMethod = HotKeyMethod.Threshold(9000), cmsEps = 0.001d, cmsDelta = 1E-10d, cmsSeed = 42, sampleFraction = 1.0d, sampleWithReplacement = true)
scoll.skewedLeftOuterJoin(rhs = rhs, hotKeyMethod = HotKeyMethod.Threshold(9000), cmsEps = 0.001d, cmsDelta = 1E-10d, cmsSeed = 42, sampleFraction = 1.0d, sampleWithReplacement = true)
scoll.skewedFullOuterJoin(rhs = rhs, hotKeyMethod = HotKeyMethod.Threshold(9000), cmsEps = 0.001d, cmsDelta = 1E-10d, cmsSeed = 42, sampleFraction = 1.0d, sampleWithReplacement = true)

// named parameters
scoll.skewedJoin(rhs = rhs, hotKeyMethod = HotKeyMethod.Threshold(9000), cmsEps = 0.001d, cmsDelta = 1E-10d, cmsSeed = 42, sampleFraction = 1.0d, sampleWithReplacement = true)
scoll.skewedLeftOuterJoin(rhs = rhs, hotKeyMethod = HotKeyMethod.Threshold(9000), cmsEps = 0.001d, cmsDelta = 1E-10d, cmsSeed = 42, sampleFraction = 1.0d, sampleWithReplacement = true)
scoll.skewedFullOuterJoin(rhs = rhs, hotKeyMethod = HotKeyMethod.Threshold(9000), cmsEps = 0.001d, cmsDelta = 1E-10d, cmsSeed = 42, sampleFraction = 1.0d, sampleWithReplacement = true)
}
3 changes: 2 additions & 1 deletion scalafix/project/Scio.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ object Scio {
val `0.9` = "0.9.5"
val `0.10` = "0.10.4"
val `0.11` = "0.11.9"
val `0.12` = "0.12.0-RC1"
val `0.12` = "0.12.0"
val `0.13` = "0.12.8+42-378cd978+20230531-1404-SNAPSHOT" // TODO
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ fix.v0_8_0.FixBigQueryDeprecations
fix.v0_8_0.ConsistenceJoinNames
fix.v0_10_0.FixCoderPropagation
fix.v0_12_0.FixBqSaveAsTable
fix.v0_12_0.FixPubsubSpecializations
fix.v0_12_0.FixPubsubSpecializations
fix.v0_13_0.FixSkewedJoins
78 changes: 78 additions & 0 deletions scalafix/rules/src/main/scala/fix/v0_13_0/FixSkewedJoins.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package fix.v0_13_0

import scalafix.v1.{MethodSignature, _}

import scala.meta._
import scala.meta.contrib._

/** Constants shared with the `FixSkewedJoins` rule: symbol matchers, the import to add and old parameter names. */
object FixSkewedJoins {

  // SemanticDB-style owner path of the class declaring the skewed join methods.
  private val PairSkewedSCollectionFunctions =
    "com/spotify/scio/values/PairSkewedSCollectionFunctions"

  /** Matches any of the three skewed join methods declared on `PairSkewedSCollectionFunctions`. */
  val SkewedJoins: SymbolMatcher =
    List("skewedJoin", "skewedLeftOuterJoin", "skewedFullOuterJoin")
      .map(method => SymbolMatcher.normalized(s"$PairSkewedSCollectionFunctions#$method"))
      .reduce(_ + _)

  /** Import required by the rewritten call sites that reference `HotKeyMethod`. */
  val HotKeyMethodImport: Importer = importer"com.spotify.scio.values.HotKeyMethod"

  // Parameter names of the old skewed join signature, in declaration order.
  val ParamRhs: Term.Name = Term.Name("rhs")
  val ParamHotKeyThreshold: Term.Name = Term.Name("hotKeyThreshold")
  val ParamEps: Term.Name = Term.Name("eps")
  val ParamSeed: Term.Name = Term.Name("seed")
  val ParamDelta: Term.Name = Term.Name("delta")
  val ParamSampleFraction: Term.Name = Term.Name("sampleFraction")
  val ParamWithReplacement: Term.Name = Term.Name("withReplacement")

  /**
   * Flattened parameter names identifying the old signature. The trailing `"hasher"` entry is
   * presumably the implicit hasher parameter of the old API (TODO confirm against the old
   * method signature).
   */
  val OldParameters: List[String] =
    List(
      ParamRhs,
      ParamHotKeyThreshold,
      ParamEps,
      ParamSeed,
      ParamDelta,
      ParamSampleFraction,
      ParamWithReplacement
    ).map(_.value) :+ "hasher"

}

/**
 * Semantic rule rewriting calls to the old `skewedJoin`, `skewedLeftOuterJoin` and
 * `skewedFullOuterJoin` signatures so that every provided argument is passed by its new name
 * (`hotKeyMethod`, `cmsEps`, `cmsDelta`, `cmsSeed`, `sampleFraction`, `sampleWithReplacement`).
 */
class FixSkewedJoins extends SemanticRule("FixSkewedJoins") {

  import FixSkewedJoins._

  /**
   * Tells whether `fn` resolves to one of the skewed join methods AND its flattened parameter
   * names are exactly [[FixSkewedJoins.OldParameters]].
   *
   * A symbol without resolvable information is treated as "not the old API" instead of
   * throwing, so a single unresolved symbol cannot abort the whole rule.
   */
  private def isOldSkewedJoinApi(fn: Term)(implicit doc: SemanticDocument): Boolean = {
    val symbol = fn.symbol
    SkewedJoins.matches(symbol) && symbol.info.map(_.signature).exists {
      case MethodSignature(_, parameterLists, _) =>
        parameterLists.flatten.map(_.symbol.displayName) == OldParameters
      case _ => false
    }
  }

  /**
   * Looks up the value given for `param` in `args`.
   *
   * A named argument (`param = value`) wins; otherwise the argument at index `pos` among the
   * leading positional arguments (those before the first named one) is used.
   *
   * @return the argument term, or `None` when the call site did not provide it
   */
  private def findParam(param: Term.Name, pos: Int)(args: List[Term]): Option[Term] = {
    val named = args.collectFirst {
      case q"$name = $value" if name.isEqual(param) => value
    }
    named.orElse {
      args.takeWhile(!_.isInstanceOf[Term.Assign]).lift(pos)
    }
  }

  /**
   * Replaces each old-style skewed join call with the same call using the new parameter names.
   * Arguments the call site did not provide are left out of the rewritten call.
   */
  override def fix(implicit doc: SemanticDocument): Patch = {
    doc.tree.collect {
      case t @ q"$fn(..$params)" if isOldSkewedJoinApi(fn) =>
        val rhs = findParam(ParamRhs, 0)(params).map(p => q"rhs = $p")
        val hotKeyMethod = findParam(ParamHotKeyThreshold, 1)(params)
          .map(p => q"hotKeyMethod = HotKeyMethod.Threshold($p)")
        val cmsEps = findParam(ParamEps, 2)(params).map(p => q"cmsEps = $p")
        // seed (position 3) and delta (position 4) are emitted as cmsDelta before cmsSeed;
        // since every argument is named in the rewritten call, only the lookup position matters.
        val cmsDelta = findParam(ParamDelta, 4)(params).map(p => q"cmsDelta = $p")
        val cmsSeed = findParam(ParamSeed, 3)(params).map(p => q"cmsSeed = $p")
        val sampleFraction =
          findParam(ParamSampleFraction, 5)(params).map(p => q"sampleFraction = $p")
        val sampleWithReplacement =
          findParam(ParamWithReplacement, 6)(params).map(p => q"sampleWithReplacement = $p")

        val updated =
          (rhs ++ hotKeyMethod ++ cmsEps ++ cmsDelta ++ cmsSeed ++ sampleFraction ++ sampleWithReplacement).toList

        // Only import HotKeyMethod when the rewritten call actually references it; otherwise
        // the rule would leave an unused import behind.
        val importPatch =
          if (hotKeyMethod.isDefined) Patch.addGlobalImport(HotKeyMethodImport) else Patch.empty

        importPatch + Patch.replaceTree(t, q"$fn(..$updated)".syntax)
    }.asPatch
  }
}
Loading