Allow Frameless to explode Map[A,B] #488

Merged 3 commits on Nov 27, 2021
54 changes: 51 additions & 3 deletions dataset/src/main/scala/frameless/TypedDataset.scala
@@ -1195,12 +1195,60 @@ class TypedDataset[T] protected[frameless](val dataset: Dataset[T])(implicit val
i6: Tupler.Aux[OutModValues, Out],
i7: TypedEncoder[Out]
): TypedDataset[Out] = {
import org.apache.spark.sql.functions.{explode => sparkExplode}
val df = dataset.toDF()

val trans =
df
.withColumn(column.value.name, sparkExplode(df(column.value.name)))
.as[Out](TypedExpressionEncoder[Out])
TypedDataset.create[Out](trans)
}

/**
* Explodes a single column at a time. It only compiles if the type of column supports this operation.
*
* @example
*
* {{{
* case class X(i: Int, j: Map[Int, Int])
* case class Y(i: Int, j: (Int, Int))
*
* val f: TypedDataset[X] = ???
* val fNew: TypedDataset[Y] = f.explodeMap('j).as[Y]
Comment on lines +1214 to +1218
Member

This is actually a working example

* }}}
* @param column the column we wish to explode
*/
def explodeMap[A, B, V[_, _], TRep <: HList, OutMod <: HList, OutModValues <: HList, Out]
(column: Witness.Lt[Symbol])
(implicit
i0: TypedColumn.Exists[T, column.T, V[A, B]],
i1: TypedEncoder[A],
i2: TypedEncoder[B],
i3: LabelledGeneric.Aux[T, TRep],
i4: Modifier.Aux[TRep, column.T, V[A,B], (A, B), OutMod],
i5: Values.Aux[OutMod, OutModValues],
i6: Tupler.Aux[OutModValues, Out],
i7: TypedEncoder[Out]
): TypedDataset[Out] = {
import org.apache.spark.sql.functions.{explode => sparkExplode, struct => sparkStruct, col => sparkCol}
val df = dataset.toDF()

// preserve the original list of columns
val columns = df.columns.toSeq.map(sparkCol)
// select all original columns plus the [key, value] columns produced by the map explode
// .withColumn(column.value.name, sparkExplode(df(column.value.name))) in this case would not work
// since the map explode produces two columns
val exploded = df.select(sparkCol("*"), sparkExplode(df(column.value.name)))
val trans =
exploded
// map explode explodes it into [key, value] columns
// the only way to put it into a column is to create a struct
// TODO: handle org.apache.spark.sql.AnalysisException: Reference 'key / value' is ambiguous, could be: key / value, key / value
Member

@ayoub-benali I pushed to your branch. One minor thing here: we don't handle duplicate column names, e.g.

// for the case class
case class Test(key: Int, m: Map[String, Int])
// explode function produces the following schema
// key, key, value
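
To make the collision concrete, here is a minimal plain-Scala sketch of name-based column lookup. It does not call Spark; `DuplicateColumnModel` and `resolve` are hypothetical names used only for illustration, and the schema is written out in full (`key, m, key, value`), assuming `select("*", explode(m))` keeps the original map column next to the two generated ones.

```scala
object DuplicateColumnModel {
  // Hypothetical model of name-based lookup: a name must match exactly one column.
  def resolve(schema: Seq[String], name: String): Either[String, Int] = {
    val matches = schema.zipWithIndex.filter(_._1 == name).map(_._2)
    matches match {
      case Seq(single) => Right(single)
      case Seq()       => Left(s"cannot resolve '$name'")
      case many        => Left(s"reference '$name' is ambiguous: ${many.size} matches")
    }
  }

  // Column names of `case class Test(key: Int, m: Map[String, Int])`.
  val input: Seq[String] = Seq("key", "m")

  // Assumed schema after selecting "*" plus the map explode,
  // which appends the generated `key` and `value` columns.
  val afterExplode: Seq[String] = input ++ Seq("key", "value")
}
```

In this model `resolve(afterExplode, "value")` succeeds, while `resolve(afterExplode, "key")` returns a `Left`, which mirrors the ambiguity mentioned in the TODO.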

Member

@pomadchin pomadchin Nov 8, 2021

There are multiple ways to work around it (e.g. rename all input columns first, perform the explode, then rename them back), but I didn't have time to try that.

I also don't remember whether there is a select-by-index or some other elegant way to handle it.
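
The rename idea can be sketched on column names alone. This is untested against Spark and only models the schema bookkeeping: `RenameWorkaroundModel`, its helpers, and the `__in_` prefix are all hypothetical, and nothing here shows that the approach survives the encoder or analyzer.

```scala
object RenameWorkaroundModel {
  // Hypothetical prefix; any string that cannot collide with `key` / `value` would do.
  val Prefix = "__in_"

  // Step 1: rename every input column so none is called `key` or `value`.
  def renameInputs(schema: Seq[String]): Seq[String] = schema.map(Prefix + _)

  // Step 2: the map explode appends `key` and `value` to the renamed schema.
  def explode(schema: Seq[String]): Seq[String] = schema ++ Seq("key", "value")

  // Step 3: keep only the renamed input columns and strip the prefix again.
  def restore(schema: Seq[String]): Seq[String] =
    schema.filter(_.startsWith(Prefix)).map(_.stripPrefix(Prefix))

  // True when every name in the schema identifies exactly one column.
  def unambiguous(schema: Seq[String]): Boolean = schema.distinct.size == schema.size
}
```

For the `Test(key, m)` schema, the intermediate schema has no duplicate names, and `restore` returns the original column list.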

Member

Hm, maybe aliasing would not work either, due to interpreter limitations. It can just be a feature then (: I don't think there are good ways to solve that behavior in the native Spark API either.

.withColumn(column.value.name, sparkStruct(exploded("key"), exploded("value")))
// selecting only original columns, we don't need [key, value] columns left in the DataFrame after the map explode
.select(columns: _*)
.as[Out](TypedExpressionEncoder[Out])
TypedDataset.create[Out](trans)
}
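
For readers less familiar with Spark's map explode, the row-level behaviour `explodeMap` is expected to have can be modelled on plain Scala collections. This is only a sketch: `ExplodeMapModel` and `explodeMapModel` are hypothetical names, and the code reuses the `X` / `Y` shapes from the Scaladoc example rather than calling Spark or Frameless.

```scala
object ExplodeMapModel {
  // Same shapes as the Scaladoc example: the map column becomes a (key, value) pair.
  final case class X(i: Int, j: Map[Int, Int])
  final case class Y(i: Int, j: (Int, Int))

  // Hypothetical helper: one output row per map entry, other fields copied unchanged.
  // Rows whose map is empty contribute no output rows.
  def explodeMapModel(rows: Seq[X]): Seq[Y] =
    rows.flatMap(x => x.j.toSeq.map(entry => Y(x.i, entry)))
}
```

Given `Seq(X(1, Map(10 -> 100)), X(2, Map.empty))`, the model yields the single row `Y(1, (10, 100))`.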

31 changes: 30 additions & 1 deletion dataset/src/test/scala/frameless/ExplodeTests.scala
@@ -7,7 +7,6 @@ import org.scalacheck.Prop._

import scala.reflect.ClassTag


class ExplodeTests extends TypedDatasetSuite {
test("simple explode test") {
val ds = TypedDataset.create(Seq((1,Array(1,2))))
@@ -48,4 +47,34 @@ class ExplodeTests extends TypedDatasetSuite {
check(forAll(prop[Int] _))
check(forAll(prop[String] _))
}

test("explode on maps") {
def prop[A: TypedEncoder: ClassTag, B: TypedEncoder: ClassTag](xs: List[X1[Map[A, B]]]): Prop = {
val tds = TypedDataset.create(xs)

val framelessResults = tds.explodeMap('a).collect().run().toVector
Contributor Author

@OlivierBlanvillain @imarios I would need your help here if possible, since I know nothing about shapeless :/
The compiler is complaining about:
No column Symbol with shapeless.tag.Tagged[String("a")] of type scala.collection.immutable.Map[A,B] in frameless.X1[scala.collection.immutable.Map[A,A]]

Am I missing something in the definition of explodeMap?

Contributor

Hey @ayoub-benali, sorry for the delay. I was trying to get the new version out and then help get another, older PR merged. Let me take a look and help you out.

val scalaResults = xs.flatMap(_.a.toList).map(t => Tuple1(Tuple2(t._1, t._2))).toVector

framelessResults ?= scalaResults
}

check(forAll(prop[Long, String] _))
check(forAll(prop[Int, Long] _))
check(forAll(prop[String, Int] _))
}

test("explode on maps preserving other columns") {
Member

An extra check that verifies that explode does not drop other columns.

def prop[K: TypedEncoder: ClassTag, A: TypedEncoder: ClassTag, B: TypedEncoder: ClassTag](xs: List[X2[K, Map[A, B]]]): Prop = {
val tds = TypedDataset.create(xs)

val framelessResults = tds.explodeMap('b).collect().run().toVector
val scalaResults = xs.flatMap { x2 => x2.b.toList.map((x2.a, _)) }.toVector

framelessResults ?= scalaResults
}

check(forAll(prop[Int, Long, String] _))
check(forAll(prop[String, Int, Long] _))
check(forAll(prop[Long, String, Int] _))
}
}
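
The two expected-result expressions in these tests differ only in the row shape. The sketch below restates them outside the test harness. `X1` and `X2` here are local stand-ins whose field names are assumed from how the tests use them (`_.a`, `x2.b`), and `ExplodeTestExpectations` is a hypothetical name.

```scala
object ExplodeTestExpectations {
  // Local stand-ins for the test fixtures (assumed shapes).
  final case class X1[A](a: A)
  final case class X2[A, B](a: A, b: B)

  // Single-column case: each map entry becomes a one-field row, hence Tuple1.
  def expectedSingleColumn[A, B](xs: List[X1[Map[A, B]]]): Vector[Tuple1[(A, B)]] =
    xs.flatMap(_.a.toList).map(entry => Tuple1(entry)).toVector

  // Two-column case: the untouched column `a` is kept next to each (key, value) entry.
  def expectedWithOtherColumn[K, A, B](xs: List[X2[K, Map[A, B]]]): Vector[(K, (A, B))] =
    xs.flatMap(x2 => x2.b.toList.map(entry => (x2.a, entry))).toVector
}
```

The `Tuple1` wrapper in the first case reflects that the output row type is a tuple of the remaining fields even when only one field is left.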