Spark 3.5 support (#759)
* #755 - spark 3.5.0 version

* #755 - spark 3.5.0 version - re-formatting only

* #755 - spark 3.5.0 version - re-formatting only

* #755 - spark 3.5.0 version - build root 34 fix

* #755 - spark 3.5.0 version - build root 34 fix - gc

* #755 - spark 3.5.0 version - build root 34 fix - gc oops

* #755 - remove jvmopts, add ci param, per cchantep
chris-twiner committed Sep 28, 2023
1 parent 79384ed commit f559a6c
Showing 5 changed files with 210 additions and 77 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
@@ -16,6 +16,7 @@ on:

env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+SBT_OPTS: '-Xms1g -Xmx4g'
SPARK_LOCAL_IP: localhost


@@ -162,7 +163,7 @@ jobs:
- name: Submit Dependencies
uses: scalacenter/sbt-dependency-submission@v2
with:
-modules-ignore: root-spark33_2.13 root-spark33_2.12 docs_2.13 docs_2.12 root-spark34_2.13 root-spark34_2.12 root-spark32_2.13 root-spark32_2.12
+modules-ignore: root-spark33_2.13 root-spark33_2.12 docs_2.13 docs_2.12 root-spark34_2.13 root-spark34_2.12 root-spark35_2.13 root-spark35_2.12 root-spark32_2.13 root-spark32_2.12
configs-ignore: test scala-tool scala-doc-tool test-internal

site:
33 changes: 17 additions & 16 deletions README.md
@@ -25,22 +25,23 @@ associated channels (e.g. GitHub, Discord) to be a safe and friendly environment
The compatible versions of [Spark](http://spark.apache.org/) and
[cats](https://github.com/typelevel/cats) are as follows:

-| Frameless | Spark | Cats | Cats-Effect | Scala |
-|-----------|-----------------------|----------|-------------|-------------|
-| 0.14.1 | 3.4.0 / 3.3.0 / 3.2.2 | 2.x | 3.x | 2.12 / 2.13 |
-| 0.14.0 | 3.3.0 / 3.2.2 / 3.1.3 | 2.x | 3.x | 2.12 / 2.13 |
-| 0.13.0 | 3.3.0 / 3.2.2 / 3.1.3 | 2.x | 3.x | 2.12 / 2.13 |
-| 0.12.0 | 3.2.1 / 3.1.3 / 3.0.3 | 2.x | 3.x | 2.12 / 2.13 |
-| 0.11.1 | 3.2.0 / 3.1.2 / 3.0.1 | 2.x | 2.x | 2.12 / 2.13 |
-| 0.11.0* | 3.2.0 / 3.1.2 / 3.0.1 | 2.x | 2.x | 2.12 / 2.13 |
-| 0.10.1 | 3.1.0 | 2.x | 2.x | 2.12 |
-| 0.9.0 | 3.0.0 | 1.x | 1.x | 2.12 |
-| 0.8.0 | 2.4.0 | 1.x | 1.x | 2.11 / 2.12 |
-| 0.7.0 | 2.3.1 | 1.x | 1.x | 2.11 |
-| 0.6.1 | 2.3.0 | 1.x | 0.8 | 2.11 |
-| 0.5.2 | 2.2.1 | 1.x | 0.8 | 2.11 |
-| 0.4.1 | 2.2.0 | 1.x | 0.8 | 2.11 |
-| 0.4.0 | 2.2.0 | 1.0.0-IF | 0.4 | 2.11 |
+| Frameless | Spark | Cats | Cats-Effect | Scala |
+|-----------|-------------------------------|----------|-------------|-------------|
+| 0.14.2 | 3.5.0 / 3.4.0 / 3.3.0 / 3.2.2 | 2.x | 3.x | 2.12 / 2.13 |
+| 0.14.1 | 3.4.0 / 3.3.0 / 3.2.2 | 2.x | 3.x | 2.12 / 2.13 |
+| 0.14.0 | 3.3.0 / 3.2.2 / 3.1.3 | 2.x | 3.x | 2.12 / 2.13 |
+| 0.13.0 | 3.3.0 / 3.2.2 / 3.1.3 | 2.x | 3.x | 2.12 / 2.13 |
+| 0.12.0 | 3.2.1 / 3.1.3 / 3.0.3 | 2.x | 3.x | 2.12 / 2.13 |
+| 0.11.1 | 3.2.0 / 3.1.2 / 3.0.1 | 2.x | 2.x | 2.12 / 2.13 |
+| 0.11.0* | 3.2.0 / 3.1.2 / 3.0.1 | 2.x | 2.x | 2.12 / 2.13 |
+| 0.10.1 | 3.1.0 | 2.x | 2.x | 2.12 |
+| 0.9.0 | 3.0.0 | 1.x | 1.x | 2.12 |
+| 0.8.0 | 2.4.0 | 1.x | 1.x | 2.11 / 2.12 |
+| 0.7.0 | 2.3.1 | 1.x | 1.x | 2.11 |
+| 0.6.1 | 2.3.0 | 1.x | 0.8 | 2.11 |
+| 0.5.2 | 2.2.1 | 1.x | 0.8 | 2.11 |
+| 0.4.1 | 2.2.0 | 1.x | 0.8 | 2.11 |
+| 0.4.0 | 2.2.0 | 1.0.0-IF | 0.4 | 2.11 |

_\* 0.11.0 was published with broken Spark 3.1.2 and 3.0.1 artifacts._
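
For reference, a consumer build targeting one of the rows above might look like the sketch below. The `org.typelevel` organization and the `frameless-dataset` / `frameless-dataset-spark34` module names are assumptions from the published artifact naming; take the version from the row matching your Spark line.

```scala
// build.sbt sketch — version taken from the table above, coordinates assumed
libraryDependencies ++= Seq(
  // the unsuffixed module tracks the newest supported Spark line (3.5.x here)
  "org.typelevel" %% "frameless-dataset" % "0.14.2"
  // for an older Spark line, use the suffixed module instead, e.g.
  // "org.typelevel" %% "frameless-dataset-spark34" % "0.14.2"
)
```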

82 changes: 77 additions & 5 deletions build.sbt
@@ -1,4 +1,5 @@
val sparkVersion = "3.4.1"
val sparkVersion = "3.5.0"
val spark34Version = "3.4.1"
val spark33Version = "3.3.3"
val spark32Version = "3.2.4"
val catsCoreVersion = "2.10.0"
@@ -15,7 +16,7 @@ val nakedFSVersion = "0.1.0"
val Scala212 = "2.12.18"
val Scala213 = "2.13.12"

ThisBuild / tlBaseVersion := "0.14"
ThisBuild / tlBaseVersion := "0.15"

ThisBuild / crossScalaVersions := Seq(Scala213, Scala212)
ThisBuild / scalaVersion := Scala212
@@ -24,12 +25,29 @@ lazy val root = project
.in(file("."))
.enablePlugins(NoPublishPlugin)
.settings(crossScalaVersions := Nil)
-  .aggregate(`root-spark34`, `root-spark33`, `root-spark32`, docs)
+  .aggregate(
+    `root-spark35`,
+    `root-spark34`,
+    `root-spark33`,
+    `root-spark32`,
+    docs
+  )

+lazy val `root-spark35` = project
+  .in(file(".spark35"))
+  .enablePlugins(NoPublishPlugin)
+  .aggregate(core, cats, dataset, refined, ml)

lazy val `root-spark34` = project
.in(file(".spark34"))
.enablePlugins(NoPublishPlugin)
-  .aggregate(core, cats, dataset, refined, ml)
+  .aggregate(
+    core,
+    `cats-spark34`,
+    `dataset-spark34`,
+    `refined-spark34`,
+    `ml-spark34`
+  )

lazy val `root-spark33` = project
.in(file(".spark33"))
@@ -61,6 +79,15 @@ lazy val cats = project
.settings(catsSettings)
.dependsOn(dataset % "test->test;compile->compile;provided->provided")

+lazy val `cats-spark34` = project
+  .settings(name := "frameless-cats-spark34")
+  .settings(sourceDirectory := (cats / sourceDirectory).value)
+  .settings(catsSettings)
+  .settings(spark34Settings)
+  .dependsOn(
+    `dataset-spark34` % "test->test;compile->compile;provided->provided"
+  )

lazy val `cats-spark33` = project
.settings(name := "frameless-cats-spark33")
.settings(sourceDirectory := (cats / sourceDirectory).value)
@@ -91,6 +118,20 @@ lazy val dataset = project
.settings(sparkDependencies(sparkVersion))
.dependsOn(core % "test->test;compile->compile")

+lazy val `dataset-spark34` = project
+  .settings(name := "frameless-dataset-spark34")
+  .settings(sourceDirectory := (dataset / sourceDirectory).value)
+  .settings(
+    Compile / unmanagedSourceDirectories += (dataset / baseDirectory).value / "src" / "main" / "spark-3.4+"
+  )
+  .settings(
+    Test / unmanagedSourceDirectories += (dataset / baseDirectory).value / "src" / "test" / "spark-3.3+"
+  )
+  .settings(datasetSettings)
+  .settings(sparkDependencies(spark34Version))
+  .settings(spark34Settings)
+  .dependsOn(core % "test->test;compile->compile")
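
The `sourceDirectory := (dataset / sourceDirectory).value` redirection makes `dataset-spark34` compile the primary module's sources against Spark 3.4.1, while the `unmanagedSourceDirectories` settings layer in version-gated code. A sketch of the layout this implies — only the `spark-3.4+` and `spark-3.3+` folder names come from the settings above, the rest is the conventional sbt layout:

```scala
// dataset/src/
//   main/scala/       -- shared sources, compiled for every Spark line
//   main/spark-3.4+/  -- sources for Spark 3.4 and newer only
//   test/scala/       -- shared tests
//   test/spark-3.3+/  -- tests shared by Spark 3.3 and newer
```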

lazy val `dataset-spark33` = project
.settings(name := "frameless-dataset-spark33")
.settings(sourceDirectory := (dataset / sourceDirectory).value)
@@ -124,6 +165,15 @@ lazy val refined = project
.settings(refinedSettings)
.dependsOn(dataset % "test->test;compile->compile;provided->provided")

+lazy val `refined-spark34` = project
+  .settings(name := "frameless-refined-spark34")
+  .settings(sourceDirectory := (refined / sourceDirectory).value)
+  .settings(refinedSettings)
+  .settings(spark34Settings)
+  .dependsOn(
+    `dataset-spark34` % "test->test;compile->compile;provided->provided"
+  )

lazy val `refined-spark33` = project
.settings(name := "frameless-refined-spark33")
.settings(sourceDirectory := (refined / sourceDirectory).value)
@@ -151,6 +201,17 @@ lazy val ml = project
dataset % "test->test;compile->compile;provided->provided"
)

+lazy val `ml-spark34` = project
+  .settings(name := "frameless-ml-spark34")
+  .settings(sourceDirectory := (ml / sourceDirectory).value)
+  .settings(mlSettings)
+  .settings(sparkMlDependencies(spark34Version))
+  .settings(spark34Settings)
+  .dependsOn(
+    core % "test->test;compile->compile",
+    `dataset-spark33` % "test->test;compile->compile;provided->provided"
+  )

lazy val `ml-spark33` = project
.settings(name := "frameless-ml-spark33")
.settings(sourceDirectory := (ml / sourceDirectory).value)
@@ -326,9 +387,18 @@ lazy val spark32Settings = Seq(
tlVersionIntroduced := Map("2.12" -> "0.13.0", "2.13" -> "0.13.0")
)

+lazy val spark34Settings = Seq[Setting[_]](
+  tlVersionIntroduced := Map("2.12" -> "0.14.1", "2.13" -> "0.14.1"),
+  mimaPreviousArtifacts := Set(
+    organization.value %% moduleName.value
+      .split("-")
+      .dropRight(1)
+      .mkString("-") % "0.14.1"
+  )
+)
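
The `split("-").dropRight(1).mkString("-")` in `mimaPreviousArtifacts` strips the version suffix from the module name, so each pinned module is checked for binary compatibility against the artifact it was split out of. A worked example of just that expression (the `org.typelevel` organization is an assumption):

```scala
// what the name rewrite in spark34Settings evaluates to
val module   = "frameless-dataset-spark34"
val previous = module.split("-").dropRight(1).mkString("-")
// previous == "frameless-dataset", so MiMa compares against
// "org.typelevel" %% "frameless-dataset" % "0.14.1"
```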

lazy val spark33Settings = Seq[Setting[_]](
tlVersionIntroduced := Map("2.12" -> "0.13.0", "2.13" -> "0.13.0"),
// frameless-dataset-spark33 was originally frameless-dataset
mimaPreviousArtifacts := Set(
organization.value %% moduleName.value
.split("-")
@@ -399,3 +469,5 @@ ThisBuild / githubWorkflowBuildMatrixAdditions += "project" -> roots
ThisBuild / githubWorkflowBuildMatrixExclusions ++= roots.init.map { project =>
MatrixExclude(Map("scala" -> "2.13", "project" -> project))
}

+ThisBuild / githubWorkflowEnv += "SBT_OPTS" -> "-Xms1g -Xmx4g"
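
`roots` is defined earlier in build.sbt, outside this hunk; since `roots.init` drops the last element, the exclusions above leave Scala 2.13 CI jobs only for the final root in the list. A hypothetical illustration — the real contents and ordering of `roots` are not visible in this diff:

```scala
// hypothetical contents; the real `roots` is defined outside this hunk
val roots = Seq("root-spark32", "root-spark33", "root-spark34", "root")
val excludedFrom213 = roots.init // every pinned line loses its 2.13 job,
// leaving only the last entry (the Spark 3.5 build) on Scala 2.13
```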
110 changes: 72 additions & 38 deletions dataset/src/main/scala/frameless/functions/package.scala
@@ -5,36 +5,50 @@ import scala.reflect.ClassTag
import shapeless._
import shapeless.labelled.FieldType
import shapeless.ops.hlist.IsHCons
-import shapeless.ops.record.{Keys, Values}
+import shapeless.ops.record.{ Keys, Values }

-import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.{ reflection => ScalaReflection }
import org.apache.spark.sql.catalyst.expressions.Literal

package object functions extends Udf with UnaryFunctions {

object aggregate extends AggregateFunctions
object nonAggregate extends NonAggregateFunctions

-/** Creates a [[frameless.TypedAggregate]] of literal value. If A is to be encoded using an Injection make
-* sure the injection instance is in scope.
-*
-* apache/spark
-*/
-def litAggr[A, T](value: A)(implicit i0: TypedEncoder[A], i1: Refute[IsValueClass[A]]): TypedAggregate[T, A] =
+/**
+* Creates a [[frameless.TypedAggregate]] of literal value. If A is to be encoded using an Injection make
+* sure the injection instance is in scope.
+*
+* apache/spark
+*/
+def litAggr[A, T](
+value: A
+)(implicit
+i0: TypedEncoder[A],
+i1: Refute[IsValueClass[A]]
+): TypedAggregate[T, A] =
new TypedAggregate[T, A](lit(value).expr)
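
A usage sketch for `litAggr` — the `Person` dataset and the implicit `SparkSession` wiring are assumptions for illustration, not part of this diff:

```scala
// minimal sketch, assuming an implicit SparkSession is in scope
import frameless.TypedDataset
import frameless.functions.litAggr
import frameless.functions.aggregate.count

case class Person(name: String, age: Int)

val ds: TypedDataset[Person] = TypedDataset.create(Seq(Person("a", 1)))

// pair the row count with a constant aggregate column
val out = ds.agg(count(), litAggr(1L)) // TypedDataset[(Long, Long)]
```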

-/** Creates a [[frameless.TypedColumn]] of literal value. If A is to be encoded using an Injection make
-* sure the injection instance is in scope.
-*
-* apache/spark
-*
-* @tparam A the literal value type
-* @tparam T the row type
-*/
-def lit[A, T](value: A)(
-implicit encoder: TypedEncoder[A]): TypedColumn[T, A] = {
-
-if (ScalaReflection.isNativeType(encoder.jvmRepr) && encoder.catalystRepr == encoder.jvmRepr) {
+/**
+* Creates a [[frameless.TypedColumn]] of literal value. If A is to be encoded using an Injection make
+* sure the injection instance is in scope.
+*
+* apache/spark
+*
+* @tparam A the literal value type
+* @tparam T the row type
+*/
+def lit[A, T](
+value: A
+)(implicit
+encoder: TypedEncoder[A]
+): TypedColumn[T, A] = {
+
+if (
+ScalaReflection.isNativeType(
+encoder.jvmRepr
+) && encoder.catalystRepr == encoder.jvmRepr
+) {
val expr = Literal(value, encoder.catalystRepr)

new TypedColumn(expr)
Expand All @@ -52,14 +66,24 @@ package object functions extends Udf with UnaryFunctions {
}
}
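
A matching sketch for `lit`, reusing the assumed `Person` dataset above; the literal is typed against the row type `T`, so it can sit next to ordinary columns in a `select`:

```scala
import frameless.functions.lit

// an existing column next to a literal one
val tagged = ds.select(ds('name), lit[Int, Person](42)) // TypedDataset[(String, Int)]
```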

-/** Creates a [[frameless.TypedColumn]] of literal value
-* for a Value class `A`.
-*
-* @tparam A the value class
-* @tparam T the row type
-*/
-def litValue[A : IsValueClass, T, G <: ::[_, HNil], H <: ::[_ <: FieldType[_ <: Symbol, _], HNil], K <: Symbol, V, KS <: ::[_ <: Symbol, HNil], VS <: HList](value: A)(
-implicit
+/**
+* Creates a [[frameless.TypedColumn]] of literal value
+* for a Value class `A`.
+*
+* @tparam A the value class
+* @tparam T the row type
+*/
+def litValue[
+A: IsValueClass,
+T,
+G <: ::[_, HNil],
+H <: ::[_ <: FieldType[_ <: Symbol, _], HNil],
+K <: Symbol,
+V,
+KS <: ::[_ <: Symbol, HNil],
+VS <: HList
+](value: A
+)(implicit
i0: LabelledGeneric.Aux[A, G],
i1: DropUnitValues.Aux[G, H],
i2: IsHCons.Aux[H, _ <: FieldType[K, V], HNil],
Expand All @@ -69,7 +93,7 @@ package object functions extends Udf with UnaryFunctions {
i6: IsHCons.Aux[VS, V, HNil],
i7: TypedEncoder[V],
i8: ClassTag[A]
-): TypedColumn[T, A] = {
+): TypedColumn[T, A] = {
val expr = {
val field: H = i1(i0.to(value))
val v: V = i6.head(i4(field))
@@ -90,14 +114,24 @@
)
}
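
A sketch for `litValue` with a single-field value class — `Name` is illustrative, the shapeless type parameters are left to inference, and frameless is assumed to derive the required `IsValueClass` and `TypedEncoder` instances:

```scala
import frameless.functions.litValue

// a value class wrapper, the case litValue exists for
case class Name(value: String) extends AnyVal

val justName = ds.select(litValue(Name("alice"))) // TypedDataset[Name]
```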

-/** Creates a [[frameless.TypedColumn]] of literal value
-* for an optional Value class `A`.
-*
-* @tparam A the value class
-* @tparam T the row type
-*/
-def litValue[A : IsValueClass, T, G <: ::[_, HNil], H <: ::[_ <: FieldType[_ <: Symbol, _], HNil], K <: Symbol, V, KS <: ::[_ <: Symbol, HNil], VS <: HList](value: Option[A])(
-implicit
+/**
+* Creates a [[frameless.TypedColumn]] of literal value
+* for an optional Value class `A`.
+*
+* @tparam A the value class
+* @tparam T the row type
+*/
+def litValue[
+A: IsValueClass,
+T,
+G <: ::[_, HNil],
+H <: ::[_ <: FieldType[_ <: Symbol, _], HNil],
+K <: Symbol,
+V,
+KS <: ::[_ <: Symbol, HNil],
+VS <: HList
+](value: Option[A]
+)(implicit
i0: LabelledGeneric.Aux[A, G],
i1: DropUnitValues.Aux[G, H],
i2: IsHCons.Aux[H, _ <: FieldType[K, V], HNil],
Expand All @@ -107,7 +141,7 @@ package object functions extends Udf with UnaryFunctions {
i6: IsHCons.Aux[VS, V, HNil],
i7: TypedEncoder[V],
i8: ClassTag[A]
-): TypedColumn[T, Option[A]] = {
+): TypedColumn[T, Option[A]] = {
val expr = value match {
case Some(some) => {
val field: H = i1(i0.to(some))