Spark 3.5 support #759

Merged · 8 commits · Sep 28, 2023
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
@@ -16,6 +16,7 @@ on:

env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SBT_OPTS: '-Xms1g -Xmx4g'
SPARK_LOCAL_IP: localhost


@@ -162,7 +163,7 @@ jobs:
- name: Submit Dependencies
uses: scalacenter/sbt-dependency-submission@v2
with:
modules-ignore: root-spark33_2.13 root-spark33_2.12 docs_2.13 docs_2.12 root-spark34_2.13 root-spark34_2.12 root-spark32_2.13 root-spark32_2.12
modules-ignore: root-spark33_2.13 root-spark33_2.12 docs_2.13 docs_2.12 root-spark34_2.13 root-spark34_2.12 root-spark35_2.13 root-spark35_2.12 root-spark32_2.13 root-spark32_2.12
configs-ignore: test scala-tool scala-doc-tool test-internal

site:
33 changes: 17 additions & 16 deletions README.md
@@ -25,22 +25,23 @@ associated channels (e.g. GitHub, Discord) to be a safe and friendly environment
The compatible versions of [Spark](http://spark.apache.org/) and
[cats](https://github.com/typelevel/cats) are as follows:

| Frameless | Spark | Cats | Cats-Effect | Scala |
|-----------|-----------------------|----------|-------------|-------------|
| 0.14.1 | 3.4.0 / 3.3.0 / 3.2.2 | 2.x | 3.x | 2.12 / 2.13 |
| 0.14.0 | 3.3.0 / 3.2.2 / 3.1.3 | 2.x | 3.x | 2.12 / 2.13 |
| 0.13.0 | 3.3.0 / 3.2.2 / 3.1.3 | 2.x | 3.x | 2.12 / 2.13 |
| 0.12.0 | 3.2.1 / 3.1.3 / 3.0.3 | 2.x | 3.x | 2.12 / 2.13 |
| 0.11.1 | 3.2.0 / 3.1.2 / 3.0.1 | 2.x | 2.x | 2.12 / 2.13 |
| 0.11.0* | 3.2.0 / 3.1.2 / 3.0.1 | 2.x | 2.x | 2.12 / 2.13 |
| 0.10.1 | 3.1.0 | 2.x | 2.x | 2.12 |
| 0.9.0 | 3.0.0 | 1.x | 1.x | 2.12 |
| 0.8.0 | 2.4.0 | 1.x | 1.x | 2.11 / 2.12 |
| 0.7.0 | 2.3.1 | 1.x | 1.x | 2.11 |
| 0.6.1 | 2.3.0 | 1.x | 0.8 | 2.11 |
| 0.5.2 | 2.2.1 | 1.x | 0.8 | 2.11 |
| 0.4.1 | 2.2.0 | 1.x | 0.8 | 2.11 |
| 0.4.0 | 2.2.0 | 1.0.0-IF | 0.4 | 2.11 |
| Frameless | Spark | Cats | Cats-Effect | Scala |
|-----------|-------------------------------|----------|-------------|-------------|
| 0.14.2 | 3.5.0 / 3.4.0 / 3.3.0 / 3.2.2 | 2.x | 3.x | 2.12 / 2.13 |
| 0.14.1 | 3.4.0 / 3.3.0 / 3.2.2 | 2.x | 3.x | 2.12 / 2.13 |
| 0.14.0 | 3.3.0 / 3.2.2 / 3.1.3 | 2.x | 3.x | 2.12 / 2.13 |
| 0.13.0 | 3.3.0 / 3.2.2 / 3.1.3 | 2.x | 3.x | 2.12 / 2.13 |
| 0.12.0 | 3.2.1 / 3.1.3 / 3.0.3 | 2.x | 3.x | 2.12 / 2.13 |
| 0.11.1 | 3.2.0 / 3.1.2 / 3.0.1 | 2.x | 2.x | 2.12 / 2.13 |
| 0.11.0* | 3.2.0 / 3.1.2 / 3.0.1 | 2.x | 2.x | 2.12 / 2.13 |
| 0.10.1 | 3.1.0 | 2.x | 2.x | 2.12 |
| 0.9.0 | 3.0.0 | 1.x | 1.x | 2.12 |
| 0.8.0 | 2.4.0 | 1.x | 1.x | 2.11 / 2.12 |
| 0.7.0 | 2.3.1 | 1.x | 1.x | 2.11 |
| 0.6.1 | 2.3.0 | 1.x | 0.8 | 2.11 |
| 0.5.2 | 2.2.1 | 1.x | 0.8 | 2.11 |
| 0.4.1 | 2.2.0 | 1.x | 0.8 | 2.11 |
| 0.4.0 | 2.2.0 | 1.0.0-IF | 0.4 | 2.11 |

_\* 0.11.0 was published with broken artifacts for Spark 3.1.2 and 3.0.1._
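
For orientation, pulling a release from the table into an sbt build looks roughly like this. The `org.typelevel` coordinates are Frameless's usual ones, but treat the exact artifact names and version as assumptions and check what is actually published for your Spark line:

```scala
// build.sbt -- a minimal sketch; 0.14.2 is the first row above with Spark 3.5 support.
libraryDependencies ++= Seq(
  "org.typelevel" %% "frameless-dataset" % "0.14.2", // typed Dataset API
  "org.typelevel" %% "frameless-cats"    % "0.14.2"  // cats / cats-effect integration
)
```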

82 changes: 77 additions & 5 deletions build.sbt
@@ -1,4 +1,5 @@
val sparkVersion = "3.4.1"
val sparkVersion = "3.5.0"
val spark34Version = "3.4.1"
val spark33Version = "3.3.3"
val spark32Version = "3.2.4"
val catsCoreVersion = "2.10.0"
@@ -15,7 +16,7 @@ val nakedFSVersion = "0.1.0"
val Scala212 = "2.12.18"
val Scala213 = "2.13.12"

ThisBuild / tlBaseVersion := "0.14"
ThisBuild / tlBaseVersion := "0.15"

ThisBuild / crossScalaVersions := Seq(Scala213, Scala212)
ThisBuild / scalaVersion := Scala212
@@ -24,12 +25,29 @@ lazy val root = project
.in(file("."))
.enablePlugins(NoPublishPlugin)
.settings(crossScalaVersions := Nil)
.aggregate(`root-spark34`, `root-spark33`, `root-spark32`, docs)
.aggregate(
`root-spark35`,
`root-spark34`,
`root-spark33`,
`root-spark32`,
docs
)

lazy val `root-spark35` = project
.in(file(".spark35"))
.enablePlugins(NoPublishPlugin)
.aggregate(core, cats, dataset, refined, ml)

lazy val `root-spark34` = project
.in(file(".spark34"))
.enablePlugins(NoPublishPlugin)
.aggregate(core, cats, dataset, refined, ml)
.aggregate(
core,
`cats-spark34`,
`dataset-spark34`,
`refined-spark34`,
`ml-spark34`
)
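
Note the asymmetry introduced here: `root-spark35` aggregates the unsuffixed modules (`core`, `cats`, `dataset`, `refined`, `ml`), making Spark 3.5 the primary build, while `root-spark34` now aggregates the newly split-out `-spark34` variants.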

lazy val `root-spark33` = project
.in(file(".spark33"))
@@ -61,6 +79,15 @@ lazy val cats = project
.settings(catsSettings)
.dependsOn(dataset % "test->test;compile->compile;provided->provided")

lazy val `cats-spark34` = project
.settings(name := "frameless-cats-spark34")
.settings(sourceDirectory := (cats / sourceDirectory).value)
.settings(catsSettings)
.settings(spark34Settings)
.dependsOn(
`dataset-spark34` % "test->test;compile->compile;provided->provided"
)

lazy val `cats-spark33` = project
.settings(name := "frameless-cats-spark33")
.settings(sourceDirectory := (cats / sourceDirectory).value)
@@ -91,6 +118,20 @@ lazy val dataset = project
.settings(sparkDependencies(sparkVersion))
.dependsOn(core % "test->test;compile->compile")

lazy val `dataset-spark34` = project
.settings(name := "frameless-dataset-spark34")
.settings(sourceDirectory := (dataset / sourceDirectory).value)
.settings(
Compile / unmanagedSourceDirectories += (dataset / baseDirectory).value / "src" / "main" / "spark-3.4+"
)
.settings(
Test / unmanagedSourceDirectories += (dataset / baseDirectory).value / "src" / "test" / "spark-3.3+"
)
.settings(datasetSettings)
.settings(sparkDependencies(spark34Version))
.settings(spark34Settings)
.dependsOn(core % "test->test;compile->compile")
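
The pattern in `dataset-spark34` is the one every `-sparkNN` module follows: reuse the main module's `sourceDirectory` wholesale, then layer version-banded directories on top. A generic sketch of the technique, with hypothetical names (`mylib`, `spark-X.Y+`):

```scala
// Generic sketch of version-banded sources in sbt (module and directory names hypothetical).
lazy val mylib = project // shared code lives in mylib/src/main/scala

lazy val `mylib-sparkXY` = project
  .settings(
    // Reuse every shared source from the main module...
    sourceDirectory := (mylib / sourceDirectory).value,
    // ...plus sources that only compile against Spark X.Y and newer.
    Compile / unmanagedSourceDirectories +=
      (mylib / baseDirectory).value / "src" / "main" / "spark-X.Y+"
  )
```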

lazy val `dataset-spark33` = project
.settings(name := "frameless-dataset-spark33")
.settings(sourceDirectory := (dataset / sourceDirectory).value)
@@ -124,6 +165,15 @@ lazy val refined = project
.settings(refinedSettings)
.dependsOn(dataset % "test->test;compile->compile;provided->provided")

lazy val `refined-spark34` = project
.settings(name := "frameless-refined-spark34")
.settings(sourceDirectory := (refined / sourceDirectory).value)
.settings(refinedSettings)
.settings(spark34Settings)
.dependsOn(
`dataset-spark34` % "test->test;compile->compile;provided->provided"
)

lazy val `refined-spark33` = project
.settings(name := "frameless-refined-spark33")
.settings(sourceDirectory := (refined / sourceDirectory).value)
@@ -151,6 +201,17 @@ lazy val ml = project
dataset % "test->test;compile->compile;provided->provided"
)

lazy val `ml-spark34` = project
.settings(name := "frameless-ml-spark34")
.settings(sourceDirectory := (ml / sourceDirectory).value)
.settings(mlSettings)
.settings(sparkMlDependencies(spark34Version))
.settings(spark34Settings)
.dependsOn(
core % "test->test;compile->compile",
`dataset-spark33` % "test->test;compile->compile;provided->provided"
)

lazy val `ml-spark33` = project
.settings(name := "frameless-ml-spark33")
.settings(sourceDirectory := (ml / sourceDirectory).value)
@@ -326,9 +387,18 @@ lazy val spark32Settings = Seq(
tlVersionIntroduced := Map("2.12" -> "0.13.0", "2.13" -> "0.13.0")
)

lazy val spark34Settings = Seq[Setting[_]](
tlVersionIntroduced := Map("2.12" -> "0.14.1", "2.13" -> "0.14.1"),
mimaPreviousArtifacts := Set(
organization.value %% moduleName.value
.split("-")
.dropRight(1)
.mkString("-") % "0.14.1"
)
)
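
The `mimaPreviousArtifacts` expression strips the `-spark34` suffix from the module name, so MiMa compares, e.g., `frameless-dataset-spark34` against the previously published, unsuffixed `frameless-dataset` 0.14.1. The string manipulation, isolated as a plain Scala check:

```scala
// The suffix-stripping used in spark34Settings, in isolation:
val module   = "frameless-dataset-spark34"
val previous = module.split("-").dropRight(1).mkString("-")
assert(previous == "frameless-dataset") // MiMa then compares against 0.14.1
```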

lazy val spark33Settings = Seq[Setting[_]](
tlVersionIntroduced := Map("2.12" -> "0.13.0", "2.13" -> "0.13.0"),
// frameless-dataset-spark33 was originally frameless-dataset
mimaPreviousArtifacts := Set(
organization.value %% moduleName.value
.split("-")
@@ -399,3 +469,5 @@ ThisBuild / githubWorkflowBuildMatrixAdditions += "project" -> roots
ThisBuild / githubWorkflowBuildMatrixExclusions ++= roots.init.map { project =>
MatrixExclude(Map("scala" -> "2.13", "project" -> project))
}

ThisBuild / githubWorkflowEnv += "SBT_OPTS" -> "-Xms1g -Xmx4g"
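
Judging by the key name, this is the setting behind the `SBT_OPTS` entry added to `.github/workflows/ci.yml` in the first diff: sbt-typelevel renders `ThisBuild / githubWorkflowEnv` into the generated workflow's top-level `env:` block, so the two stay in sync.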
110 changes: 72 additions & 38 deletions dataset/src/main/scala/frameless/functions/package.scala
@@ -5,36 +5,50 @@ import scala.reflect.ClassTag
import shapeless._
import shapeless.labelled.FieldType
import shapeless.ops.hlist.IsHCons
import shapeless.ops.record.{Keys, Values}
import shapeless.ops.record.{ Keys, Values }

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.{ reflection => ScalaReflection }
import org.apache.spark.sql.catalyst.expressions.Literal

package object functions extends Udf with UnaryFunctions {

object aggregate extends AggregateFunctions
object nonAggregate extends NonAggregateFunctions

/** Creates a [[frameless.TypedAggregate]] of literal value. If A is to be encoded using an Injection make
* sure the injection instance is in scope.
*
* apache/spark
*/
def litAggr[A, T](value: A)(implicit i0: TypedEncoder[A], i1: Refute[IsValueClass[A]]): TypedAggregate[T, A] =
/**
* Creates a [[frameless.TypedAggregate]] of literal value. If A is to be encoded using an Injection make
* sure the injection instance is in scope.
*
* apache/spark
*/
def litAggr[A, T](
value: A
)(implicit
i0: TypedEncoder[A],
i1: Refute[IsValueClass[A]]
): TypedAggregate[T, A] =
new TypedAggregate[T, A](lit(value).expr)

/** Creates a [[frameless.TypedColumn]] of literal value. If A is to be encoded using an Injection make
* sure the injection instance is in scope.
*
* apache/spark
*
* @tparam A the literal value type
* @tparam T the row type
*/
def lit[A, T](value: A)(
implicit encoder: TypedEncoder[A]): TypedColumn[T, A] = {

if (ScalaReflection.isNativeType(encoder.jvmRepr) && encoder.catalystRepr == encoder.jvmRepr) {
/**
* Creates a [[frameless.TypedColumn]] of literal value. If A is to be encoded using an Injection make
* sure the injection instance is in scope.
*
* apache/spark
*
* @tparam A the literal value type
* @tparam T the row type
*/
def lit[A, T](
value: A
)(implicit
encoder: TypedEncoder[A]
): TypedColumn[T, A] = {

if (
ScalaReflection.isNativeType(
encoder.jvmRepr
) && encoder.catalystRepr == encoder.jvmRepr
) {
val expr = Literal(value, encoder.catalystRepr)

new TypedColumn(expr)
@@ -52,14 +66,24 @@
}
}
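
For readers skimming the diff: `lit` lifts a plain Scala value into a `TypedColumn`, short-circuiting to a Catalyst `Literal` whenever the encoder's JVM and Catalyst representations coincide (the native-type fast path above). A hedged usage sketch, with invented names:

```scala
// Hypothetical usage of frameless.functions.lit (Person and ds are invented):
import frameless.functions.lit

case class Person(name: String, age: Int)
// val ds: TypedDataset[Person] = TypedDataset.create(Seq(Person("a", 1)))
// Int is a native type whose JVM and Catalyst representations match, so this
// literal takes the fast path and wraps a Catalyst Literal directly:
// val one: TypedColumn[Person, Int] = lit(1)
// ds.select(ds('age) + one)
```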

/** Creates a [[frameless.TypedColumn]] of literal value
* for a Value class `A`.
*
* @tparam A the value class
* @tparam T the row type
*/
def litValue[A : IsValueClass, T, G <: ::[_, HNil], H <: ::[_ <: FieldType[_ <: Symbol, _], HNil], K <: Symbol, V, KS <: ::[_ <: Symbol, HNil], VS <: HList](value: A)(
implicit
/**
* Creates a [[frameless.TypedColumn]] of literal value
* for a Value class `A`.
*
* @tparam A the value class
* @tparam T the row type
*/
def litValue[
A: IsValueClass,
T,
G <: ::[_, HNil],
H <: ::[_ <: FieldType[_ <: Symbol, _], HNil],
K <: Symbol,
V,
KS <: ::[_ <: Symbol, HNil],
VS <: HList
](value: A
)(implicit
i0: LabelledGeneric.Aux[A, G],
i1: DropUnitValues.Aux[G, H],
i2: IsHCons.Aux[H, _ <: FieldType[K, V], HNil],
Expand All @@ -69,7 +93,7 @@ package object functions extends Udf with UnaryFunctions {
i6: IsHCons.Aux[VS, V, HNil],
i7: TypedEncoder[V],
i8: ClassTag[A]
): TypedColumn[T, A] = {
): TypedColumn[T, A] = {
val expr = {
val field: H = i1(i0.to(value))
val v: V = i6.head(i4(field))
Expand All @@ -90,14 +114,24 @@ package object functions extends Udf with UnaryFunctions {
)
}
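
`litValue` does the same for `AnyVal` value classes, where it is the single wrapped field that actually gets encoded; the shapeless machinery in the implicit list recovers that field's type and encoder. A hedged sketch (the value class and row type are invented):

```scala
// Hypothetical usage of frameless.functions.litValue (UserId and Rec are invented):
import frameless.functions.litValue

final case class UserId(value: Long) extends AnyVal
case class Rec(id: UserId)
// The expected type drives the heavy type-level inference:
// val c: TypedColumn[Rec, UserId] = litValue(UserId(42L))
```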

/** Creates a [[frameless.TypedColumn]] of literal value
* for an optional Value class `A`.
*
* @tparam A the value class
* @tparam T the row type
*/
def litValue[A : IsValueClass, T, G <: ::[_, HNil], H <: ::[_ <: FieldType[_ <: Symbol, _], HNil], K <: Symbol, V, KS <: ::[_ <: Symbol, HNil], VS <: HList](value: Option[A])(
implicit
/**
* Creates a [[frameless.TypedColumn]] of literal value
* for an optional Value class `A`.
*
* @tparam A the value class
* @tparam T the row type
*/
def litValue[
A: IsValueClass,
T,
G <: ::[_, HNil],
H <: ::[_ <: FieldType[_ <: Symbol, _], HNil],
K <: Symbol,
V,
KS <: ::[_ <: Symbol, HNil],
VS <: HList
](value: Option[A]
)(implicit
i0: LabelledGeneric.Aux[A, G],
i1: DropUnitValues.Aux[G, H],
i2: IsHCons.Aux[H, _ <: FieldType[K, V], HNil],
@@ -107,7 +141,7 @@
i6: IsHCons.Aux[VS, V, HNil],
i7: TypedEncoder[V],
i8: ClassTag[A]
): TypedColumn[T, Option[A]] = {
): TypedColumn[T, Option[A]] = {
val expr = value match {
case Some(some) => {
val field: H = i1(i0.to(some))