Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#787 - Move encoder implementation details to external shim library (not dependent on the Spark 4 release) #800

Open
wants to merge 73 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
d3ddaf1
#755 - correct version number in readme for non 3.5 build
chris-twiner Sep 29, 2023
a435adc
Merge branch 'master' of github.com:typelevel/frameless
chris-twiner Dec 27, 2023
24bde95
Merge branch 'master' of github.com:typelevel/frameless
chris-twiner Feb 20, 2024
cb259fa
#787 - base required for shim and 14.3.dbr
chris-twiner Feb 26, 2024
b8d4f05
#787 - [Un]WrapOption, Invoke, NewInstance, GetStructField, ifisnull,…
chris-twiner Feb 28, 2024
7944fe9
#787 - [Un]WrapOption, Invoke, NewInstance, GetStructField, ifisnull,…
chris-twiner Feb 28, 2024
71bb38c
#787 - forced reformatting
chris-twiner Feb 28, 2024
c843c6a
#787 - forced reformatting
chris-twiner Feb 28, 2024
9a0c55b
#787 - forced reformatting
chris-twiner Feb 28, 2024
0616953
#787 - mima MapGroups removal
chris-twiner Feb 28, 2024
a70d5c3
#787 - Spark 4 starter pack
chris-twiner Mar 1, 2024
1ef1d9b
#787 - Spark 4 starter pack
chris-twiner Mar 1, 2024
7a96748
#787 - Spark 4 starter pack
chris-twiner Mar 1, 2024
7d0e131
#787 - Spark 4 starter pack, doh
chris-twiner Mar 1, 2024
c6a4341
#787 - resolve conflict for auto merge
chris-twiner Mar 1, 2024
025ee64
Merge branch 'master' into temp/787_shim
chris-twiner Mar 1, 2024
7b17009
Merge branch 'temp/787_shim' of github.com:chris-twiner/frameless int…
chris-twiner Mar 7, 2024
6717e4b
Merge remote-tracking branch 'frameless/master' into temp/787_shim
chris-twiner Mar 7, 2024
4933a90
#787 - reduce the case class api usage even further and CreateStruct,…
chris-twiner Mar 7, 2024
0f9b7cf
#787 - disable local maven again
chris-twiner Mar 7, 2024
059a8e6
#787 - remove all sql package private code
chris-twiner Mar 7, 2024
9c506df
#787 - remove all sql package private code
chris-twiner Mar 7, 2024
11aece0
#787 - ml internals removal - all public - #300
chris-twiner Mar 8, 2024
089cb3a
#787 - ml internals removal - all public - #300 - use rc1
chris-twiner Mar 8, 2024
c7fa1c7
#787 - ml internals removal - all public - #300 - use rc1, so1 not a …
chris-twiner Mar 8, 2024
28071ff
#787 - ml internals removal - all public - #300 - use rc2
chris-twiner Mar 8, 2024
3806e03
#787 - ml internals removal - all public - #300 - use rc2
chris-twiner Mar 8, 2024
1c1d370
#787 - ml internals removal - all public - #300 - use rc2
chris-twiner Mar 8, 2024
728c935
#787 - ml internals removal - all public - #300 - use rc2
chris-twiner Mar 8, 2024
768d467
#787 - rc2
chris-twiner Mar 8, 2024
5a30614
#787 - rc2
chris-twiner Mar 8, 2024
95c66cc
#787 - rc2 - seems each sub object needs adding
chris-twiner Mar 8, 2024
2e11b6d
#787 - rc2 - doc is now an issue?
chris-twiner Mar 8, 2024
1888f4e
#787 - rc2 - mc reflection
chris-twiner Mar 9, 2024
d146f00
#787 - rc2 - add test artefacts
chris-twiner Mar 9, 2024
692475f
#787 - allow testing of all frameless logic
chris-twiner Mar 11, 2024
1008b85
#787 - compilation issue on interface
chris-twiner Mar 11, 2024
dd10cee
#787 - fix test to run on dbr 14.3
chris-twiner Mar 12, 2024
f253d45
#787 #803 - rc4 usage and fix udf with expressionproxy
chris-twiner Mar 14, 2024
7c1e603
#787 #803 - rc4 usage and fix udf with expressionproxy - deeply neste…
chris-twiner Mar 14, 2024
b161067
#787 #803 - rc4 usage and fix udf with expressionproxy - deeply neste…
chris-twiner Mar 14, 2024
2fa1bb0
(cherry picked from commit 955ba829779010d43b9f37ec438f0c8eaea76e0e)
chris-twiner Mar 20, 2024
ee38804
#804 - starter fix, set needed
chris-twiner Mar 20, 2024
fb1c109
#804 - encoding for Set derivatives as well - test build
chris-twiner Mar 20, 2024
ae8b69a
#804 - encoding for Set derivatives as well - test build
chris-twiner Mar 20, 2024
0435c3a
#804 - encoding for Set derivatives as well - test build
chris-twiner Mar 20, 2024
52034b2
#804 - encoding for Set derivatives as well - test build, hashtrieset…
chris-twiner Mar 20, 2024
9e45d92
#804 - encoding for Set derivatives as well - test build, hashtrieset…
chris-twiner Mar 20, 2024
e7881c0
#804 - encoding for Set derivatives as well - test build, 2.13 forced…
chris-twiner Mar 20, 2024
594fceb
#804 - encoding for Set derivatives as well - test build, 2.13 forced…
chris-twiner Mar 20, 2024
5a01976
#804 - encoding for Set derivatives as well - test build, 2.13 forced…
chris-twiner Mar 20, 2024
365b21f
#804 - encoding for Set derivatives as well - test build, 2.13 forced…
chris-twiner Mar 20, 2024
4395c16
#804 - encoding for Set derivatives as well - test build, 2.13 forced…
chris-twiner Mar 20, 2024
c792c05
Merge remote-tracking branch 'upstream/master' into temp/804_clean
chris-twiner Mar 20, 2024
f0d5f16
#804 - rebased
chris-twiner Mar 20, 2024
3bdb8ad
#803 - clean udf from #804, no shim start
chris-twiner Mar 21, 2024
c2f3492
#803 - clean udf eval needs #804
chris-twiner Mar 21, 2024
08d7c3d
#803 - clean udf eval needs #804
chris-twiner Mar 21, 2024
b82d266
(cherry picked from commit 3bdb8ad8af5e3eb77f24c65fbde6d603c58b11ab)
chris-twiner Mar 21, 2024
e36eac2
(cherry picked from commit c2f349299caebc74dd3370ee82edd9134c45eb32)
chris-twiner Mar 21, 2024
aa1e6de
#787 - merge #803 / #804
chris-twiner Mar 21, 2024
be4c35e
#787 - Seq can be stream, fails on dbr, do the same as for arb
chris-twiner Mar 21, 2024
b880261
#787 #804 - stream
chris-twiner Mar 21, 2024
f793fc7
#787 - tests have ordering and precision issues when run on clusters
chris-twiner Apr 10, 2024
e582962
#787 - tests have ordering and precision issues when run on clusters
chris-twiner Apr 10, 2024
986891a
#787 - tests have ordering and precision issues when run on clusters …
chris-twiner Apr 10, 2024
66b31e9
#787 - attempt to solve all but covar_pop and kurtosis
chris-twiner Apr 11, 2024
80de4f2
#787 - attempt covar_pop and kurtosis through tolerances
chris-twiner Apr 11, 2024
a89542e
#787 - tolerance on map members and on vectors for cluster runs
chris-twiner Apr 11, 2024
271e953
#787 - pivottest was random ordering
chris-twiner Apr 12, 2024
fa75889
#787 - ensure last/first are run on a single partition - 15.0 databri…
chris-twiner Apr 12, 2024
b6189b1
#787 - ensure last/first are run on a single partition - 15.0 databri…
chris-twiner Apr 12, 2024
25cc5c3
#787 - ensure last/first are run on a single partition - 15.0 databri…
chris-twiner Apr 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 66 additions & 11 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
val sparkVersion = "3.5.1"
val sparkVersion =
"3.5.1" // "4.0.0-SNAPSHOT" must have the apache_snaps configured
val spark34Version = "3.4.2"
val spark33Version = "3.3.4"
val catsCoreVersion = "2.10.0"
Expand All @@ -11,10 +12,32 @@ val scalacheck = "1.17.0"
val scalacheckEffect = "1.0.4"
val refinedVersion = "0.11.1"
val nakedFSVersion = "0.1.0"
val shimVersion = "0.0.1-RC4"

val Scala212 = "2.12.19"
val Scala213 = "2.13.13"

resolvers in Global += Resolver.mavenLocal
resolvers in Global += MavenRepository(
"sonatype-s01-snapshots",
Resolver.SonatypeS01RepositoryRoot + "/snapshots"
)
resolvers in Global += MavenRepository(
"sonatype-s01-releases",
Resolver.SonatypeS01RepositoryRoot + "/releases"
)
resolvers in Global += MavenRepository(
"apache_snaps",
"https://repository.apache.org/content/repositories/snapshots"
)

import scala.concurrent.duration.DurationInt
import lmcoursier.definitions.CachePolicy

csrConfiguration := csrConfiguration.value
.withTtl(Some(1.minute))
.withCachePolicies(Vector(CachePolicy.LocalOnly))

ThisBuild / tlBaseVersion := "0.16"

ThisBuild / crossScalaVersions := Seq(Scala213, Scala212)
Expand Down Expand Up @@ -87,10 +110,10 @@ lazy val `cats-spark33` = project
lazy val dataset = project
.settings(name := "frameless-dataset")
.settings(
Compile / unmanagedSourceDirectories += baseDirectory.value / "src" / "main" / "spark-3.4+"
Test / unmanagedSourceDirectories += baseDirectory.value / "src" / "test" / "spark-3.3+"
)
.settings(
Test / unmanagedSourceDirectories += baseDirectory.value / "src" / "test" / "spark-3.3+"
libraryDependencies += "com.sparkutils" %% "shim_runtime_3.5.0.oss_3.5" % shimVersion changing () // 4.0.0.oss_4.0 for 4 snapshot
)
.settings(datasetSettings)
.settings(sparkDependencies(sparkVersion))
Expand All @@ -100,10 +123,10 @@ lazy val `dataset-spark34` = project
.settings(name := "frameless-dataset-spark34")
.settings(sourceDirectory := (dataset / sourceDirectory).value)
.settings(
Compile / unmanagedSourceDirectories += (dataset / baseDirectory).value / "src" / "main" / "spark-3.4+"
Test / unmanagedSourceDirectories += (dataset / baseDirectory).value / "src" / "test" / "spark-3.3+"
)
.settings(
Test / unmanagedSourceDirectories += (dataset / baseDirectory).value / "src" / "test" / "spark-3.3+"
libraryDependencies += "com.sparkutils" %% "shim_runtime_3.4.1.oss_3.4" % shimVersion changing ()
)
.settings(datasetSettings)
.settings(sparkDependencies(spark34Version))
Expand All @@ -114,10 +137,10 @@ lazy val `dataset-spark33` = project
.settings(name := "frameless-dataset-spark33")
.settings(sourceDirectory := (dataset / sourceDirectory).value)
.settings(
Compile / unmanagedSourceDirectories += (dataset / baseDirectory).value / "src" / "main" / "spark-3"
Test / unmanagedSourceDirectories += (dataset / baseDirectory).value / "src" / "test" / "spark-3.3+"
)
.settings(
Test / unmanagedSourceDirectories += (dataset / baseDirectory).value / "src" / "test" / "spark-3.3+"
libraryDependencies += "com.sparkutils" %% "shim_runtime_3.3.2.oss_3.3" % shimVersion changing ()
)
.settings(datasetSettings)
.settings(sparkDependencies(spark33Version))
Expand Down Expand Up @@ -239,11 +262,29 @@ lazy val datasetSettings =
imt("frameless.RecordEncoderFields.deriveRecordLast"),
mc("frameless.functions.FramelessLit"),
mc(f"frameless.functions.FramelessLit$$"),
mc("org.apache.spark.sql.FramelessInternals"),
mc(f"org.apache.spark.sql.FramelessInternals$$"),
mc("org.apache.spark.sql.FramelessInternals$DisambiguateLeft"),
mc("org.apache.spark.sql.FramelessInternals$DisambiguateLeft$"),
mc("org.apache.spark.sql.FramelessInternals$DisambiguateRight"),
mc("org.apache.spark.sql.FramelessInternals$DisambiguateRight$"),
mc("org.apache.spark.sql.reflection.package"),
mc("org.apache.spark.sql.reflection.package$"),
mc("org.apache.spark.sql.reflection.package$ScalaSubtypeLock$"),
mc("frameless.MapGroups"),
mc(f"frameless.MapGroups$$"),
dmm("frameless.functions.package.litAggr"),
dmm("org.apache.spark.sql.FramelessInternals.column")
dmm("org.apache.spark.sql.FramelessInternals.column"),
dmm("frameless.TypedEncoder.collectionEncoder"),
dmm("frameless.TypedEncoder.setEncoder"),
dmm("frameless.functions.FramelessUdf.evalCode"),
dmm("frameless.functions.FramelessUdf.copy"),
dmm("frameless.functions.FramelessUdf.this"),
dmm("frameless.functions.FramelessUdf.apply"),
imt("frameless.functions.FramelessUdf.apply")
)
},
coverageExcludedPackages := "org.apache.spark.sql.reflection",
coverageExcludedPackages := "frameless.reflection",
libraryDependencies += "com.globalmentor" % "hadoop-bare-naked-local-fs" % nakedFSVersion % Test exclude ("org.apache.hadoop", "hadoop-commons")
)

Expand All @@ -252,7 +293,18 @@ lazy val refinedSettings =
libraryDependencies += "eu.timepit" %% "refined" % refinedVersion
)

lazy val mlSettings = framelessSettings ++ framelessTypedDatasetREPL
lazy val mlSettings = framelessSettings ++ framelessTypedDatasetREPL ++ Seq(
mimaBinaryIssueFilters ++= {
import com.typesafe.tools.mima.core._

val mc = ProblemFilters.exclude[MissingClassProblem](_)

Seq(
mc("org.apache.spark.ml.FramelessInternals"),
mc(f"org.apache.spark.ml.FramelessInternals$$")
)
}
)

lazy val scalac212Options = Seq(
"-Xlint:-missing-interpolator,-unused,_",
Expand Down Expand Up @@ -324,7 +376,10 @@ lazy val framelessSettings = Seq(
* [error] +- org.scoverage:scalac-scoverage-reporter_2.12:2.0.7 (depends on 2.1.0)
* [error] +- org.scala-lang:scala-compiler:2.12.16 (depends on 1.0.6)
*/
libraryDependencySchemes += "org.scala-lang.modules" %% "scala-xml" % VersionScheme.Always
libraryDependencySchemes += "org.scala-lang.modules" %% "scala-xml" % VersionScheme.Always,
// allow testing on different runtimes, but don't publish / run docs
Test / publishArtifact := true,
Test / packageDoc / publishArtifact := false
) ++ consoleSettings

lazy val spark34Settings = Seq[Setting[_]](
Expand Down
66 changes: 52 additions & 14 deletions cats/src/test/scala/frameless/cats/test.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import _root_.cats.syntax.all._
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext => SC}
import org.apache.spark.{ SparkConf, SparkContext => SC }

import org.scalatest.compatible.Assertion
import org.scalactic.anyvals.PosInt
Expand All @@ -21,24 +21,39 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.propspec.AnyPropSpec

trait SparkTests {
val appID: String = new java.util.Date().toString + math.floor(math.random() * 10E4).toLong.toString

val appID: String = new java.util.Date().toString + math
.floor(math.random() * 10e4)
.toLong
.toString

val conf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("test")
.set("spark.ui.enabled", "false")
.set("spark.app.id", appID)

implicit def session: SparkSession = SparkSession.builder().config(conf).getOrCreate()
implicit def session: SparkSession =
SparkSession.builder().config(conf).getOrCreate()
implicit def sc: SparkContext = session.sparkContext

implicit class seqToRdd[A: ClassTag](seq: Seq[A])(implicit sc: SC) {
implicit class seqToRdd[A: ClassTag](
seq: Seq[A]
)(implicit
sc: SC) {
Comment on lines +40 to +43
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder, is it how scalafmt formats it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the builds won't run unless scalafmt is run on files, so yes, those settings lead to some hideous looking code.

def toRdd: RDD[A] = sc.makeRDD(seq)
}
}

object Tests {
def innerPairwise(mx: Map[String, Int], my: Map[String, Int], check: (Any, Any) => Assertion)(implicit sc: SC): Assertion = {

def innerPairwise(
mx: Map[String, Int],
my: Map[String, Int],
check: (Any, Any) => Assertion
)(implicit
sc: SC
): Assertion = {
import frameless.cats.implicits._
import frameless.cats.inner._
val xs = sc.parallelize(mx.toSeq)
Expand All @@ -63,21 +78,31 @@ object Tests {
}
}

class Test extends AnyPropSpec with Matchers with ScalaCheckPropertyChecks with SparkTests {
class Test
extends AnyPropSpec
with Matchers
with ScalaCheckPropertyChecks
with SparkTests {

implicit override val generatorDrivenConfig =
PropertyCheckConfiguration(minSize = PosInt(10))

property("spark is working") {
sc.parallelize(Seq(1, 2, 3)).collect() shouldBe Array(1,2,3)
sc.parallelize(Seq(1, 2, 3)).collect() shouldBe Array(1, 2, 3)
}

property("inner pairwise monoid") {
// Make sure we have non-empty map
forAll { (xh: (String, Int), mx: Map[String, Int], yh: (String, Int), my: Map[String, Int]) =>
Tests.innerPairwise(mx + xh, my + yh, _ shouldBe _)
forAll {
(xh: (String, Int),
mx: Map[String, Int],
yh: (String, Int),
my: Map[String, Int]
) => Tests.innerPairwise(mx + xh, my + yh, _ shouldBe _)
}
}

org.scalatestplus.scalacheck.Checkers
property("rdd simple numeric commutative semigroup") {
import frameless.cats.implicits._

Expand Down Expand Up @@ -110,7 +135,8 @@ class Test extends AnyPropSpec with Matchers with ScalaCheckPropertyChecks with
property("rdd tuple commutative semigroup example") {
import frameless.cats.implicits._
forAll { seq: List[(Int, Int)] =>
val expectedSum = if (seq.isEmpty) None else Some(Foldable[List].fold(seq))
val expectedSum =
if (seq.isEmpty) None else Some(Foldable[List].fold(seq))
Comment on lines +138 to +139
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fmt is super weird

val rdd = seq.toRdd

rdd.csum shouldBe expectedSum.getOrElse(0 -> 0)
Expand All @@ -120,10 +146,22 @@ class Test extends AnyPropSpec with Matchers with ScalaCheckPropertyChecks with

property("pair rdd numeric commutative semigroup example") {
import frameless.cats.implicits._
val seq = Seq( ("a",2), ("b",3), ("d",6), ("b",2), ("d",1) )
val seq = Seq(("a", 2), ("b", 3), ("d", 6), ("b", 2), ("d", 1))
val rdd = seq.toRdd
rdd.cminByKey.collect().toSeq should contain theSameElementsAs Seq( ("a",2), ("b",2), ("d",1) )
rdd.cmaxByKey.collect().toSeq should contain theSameElementsAs Seq( ("a",2), ("b",3), ("d",6) )
rdd.csumByKey.collect().toSeq should contain theSameElementsAs Seq( ("a",2), ("b",5), ("d",7) )
rdd.cminByKey.collect().toSeq should contain theSameElementsAs Seq(
("a", 2),
("b", 2),
("d", 1)
)
rdd.cmaxByKey.collect().toSeq should contain theSameElementsAs Seq(
("a", 2),
("b", 3),
("d", 6)
)
rdd.csumByKey.collect().toSeq should contain theSameElementsAs Seq(
("a", 2),
("b", 5),
("d", 7)
)
}
}
67 changes: 67 additions & 0 deletions dataset/src/main/scala/frameless/CollectionCaster.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package frameless

import frameless.TypedEncoder.CollectionConversion
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.{
CodegenContext,
CodegenFallback,
ExprCode
}
import org.apache.spark.sql.catalyst.expressions.{ Expression, UnaryExpression }
import org.apache.spark.sql.types.{ DataType, ObjectType }

case class CollectionCaster[F[_], C[_], Y](
child: Expression,
conversion: CollectionConversion[F, C, Y])
extends UnaryExpression
with CodegenFallback {

protected def withNewChildInternal(newChild: Expression): Expression =
copy(child = newChild)

override def eval(input: InternalRow): Any = {
val o = child.eval(input).asInstanceOf[Object]
o match {
case col: F[Y] @unchecked =>
conversion.convert(col)
case _ => o
}
}

override def dataType: DataType = child.dataType
}

case class SeqCaster[C[X] <: Iterable[X], Y](child: Expression)
extends UnaryExpression {

protected def withNewChildInternal(newChild: Expression): Expression =
copy(child = newChild)

// eval on interpreted works, fallback on codegen does not, e.g. with ColumnTests.asCol and Vectors, the code generated still has child of type Vector but child eval returns X2, which is not good
override def eval(input: InternalRow): Any = {
val o = child.eval(input).asInstanceOf[Object]
o match {
case col: Set[Y] @unchecked =>
col.toSeq
case _ => o
}
}

def toSeqOr[T](isSet: => T, or: => T): T =
child.dataType match {
case ObjectType(cls)
if classOf[scala.collection.Set[_]].isAssignableFrom(cls) =>
isSet
case t => or
}

override def dataType: DataType =
toSeqOr(ObjectType(classOf[scala.collection.Seq[_]]), child.dataType)

override protected def doGenCode(
ctx: CodegenContext,
ev: ExprCode
): ExprCode =
defineCodeGen(ctx, ev, c => toSeqOr(s"$c.toVector()", s"$c"))

}
Loading
Loading