Allow column operations on 'Option[X]' that are valid for 'X' #204

Closed
imarios opened this issue Nov 8, 2017 · 19 comments

@imarios
Contributor

imarios commented Nov 8, 2017

In Frameless, if you have a column that may contain null values, you should encode it using Option[]. This is great and typesafe, but it does make working with these columns harder. For example, you cannot do simple math operations like *, +, -. I think there is a clear strategy here: if the value is null, then the result of the operation is also null (which is what vanilla Spark does).

Conceptually:

Option(2) * 2
> Option(4)
None * 10
> None

One way to do this is to write a lot of near-duplicate code:

// What we have for the + op
def +(u: U)(implicit n: CatalystNumeric[U]): TypedColumn[T, U] = self.untyped.plus(u).typed
// We can re-write this to support what we described 
def +[X](u: X)(implicit n: U <:< Option[X], x: CatalystNumeric[X]): TypedColumn[T, U] = self.untyped.plus(u).typed

// Usage example
case class X(a: Option[Int])
val t = TypedDataset.create(Seq(X(None), X(Some(2))))
scala> t.select(t('a)+10+90).show().run
+----+
|  _1|
+----+
|null|
| 102|
+----+

scala> t.select(t('a)+10+90).collect().run
res6: Seq[Option[Int]] = WrappedArray(None, Some(102))
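
Every other operator would need the same treatment. A rough sketch of what that repetition might look like for - and * (mirroring the rewritten + above; the implicit constraints and the .untyped/.typed round-trip are assumed to stay the same):

// hypothetical sketch: the Option-aware rewrite repeated for two more operators
def -[X](u: X)(implicit n: U <:< Option[X], x: CatalystNumeric[X]): TypedColumn[T, U] =
  self.untyped.minus(u).typed
def *[X](u: X)(implicit n: U <:< Option[X], x: CatalystNumeric[X]): TypedColumn[T, U] =
  self.untyped.multiply(u).typed
// ... and likewise for /, the comparison operators, and so on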

This would work for sure, but it would require us to write a LOT of boilerplate code. Any other ideas on how we can do this without duplicating all the methods?

@imarios imarios changed the title Allow column operations on Option[X] that are valid for X Allow column operations on 'Option[X]' that are valid for 'X' Nov 8, 2017
@OlivierBlanvillain
Contributor

OlivierBlanvillain commented Nov 9, 2017

If you want to factor out these two functions, I guess you could parametrise over a higher-kinded type to have a + function that works on both Id and Option:

trait OptionOrId[X, U] { type Out[t] }

trait LowPrioOptionOrId {
  // fallback: X is a plain U, so the result type is left unwrapped
  implicit def id[X, U](implicit ev: X <:< U): OptionOrId[X, U] { type Out[t] = t } =
    new OptionOrId[X, U] { type Out[t] = t }
}
object OptionOrId extends LowPrioOptionOrId {
  // X is an Option[U], so the result type gets wrapped in Option
  implicit def opt[X, U](implicit ev: X <:< Option[U]): OptionOrId[X, U] { type Out[t] = Option[t] } =
    new OptionOrId[X, U] { type Out[t] = Option[t] }
}

def +[X, F[_]](u: X)
  (implicit
    o: OptionOrId[X, U] { type Out[t] = F[t] },
    x: CatalystNumeric[X]
  ): TypedColumn[T, F[U]] =
    self.untyped.plus(u).typed

But that doesn't sound very clean to me. The operation you are describing is simply map; I would much rather see it spelled out in user code (it should be possible to do that without generating anything different at runtime):

t.select(t('a).map(_ + 10 + 90)).collect().run

@imarios
Contributor Author

imarios commented Nov 9, 2017

Nice :) it is a map().

@imarios
Contributor Author

imarios commented Nov 10, 2017

@OlivierBlanvillain Here is a quick implementation for map.

 def map[X, G](u: TypedColumn[T, X] => TypedColumn[T, G])(implicit x: U <:< Option[X]): TypedColumn[T, Option[G]] =
    u(self.asInstanceOf[TypedColumn[T, X]]).asInstanceOf[TypedColumn[T, Option[G]]]

Here is how to use it (the explicit type arguments are needed; inference doesn't work here).

t.select(t('a).map[Int,Int](_ * 2)).collect().run

@OlivierBlanvillain
Contributor

OlivierBlanvillain commented Nov 10, 2017

Is it even possible to change the type of a column with such a map? If you remove G and add it via an implicit class, you might be able to avoid annotating the types.
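
One possible reading of that, as a rough sketch (OptionColumnOps and optMap are made-up names here, reusing the casts from the map above):

implicit class OptionColumnOps[T, A](self: TypedColumn[T, Option[A]]) {
  // A is fixed by the wrapped column, so only the result type of the
  // function needs to be inferred
  def optMap[G](u: TypedColumn[T, A] => TypedColumn[T, G]): TypedColumn[T, Option[G]] =
    u(self.asInstanceOf[TypedColumn[T, A]]).asInstanceOf[TypedColumn[T, Option[G]]]
}

// usage, without type annotations:
// t.select(t('a).optMap(_ * 2)).collect().run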

@imarios
Contributor Author

imarios commented Nov 11, 2017

@OlivierBlanvillain it can generate a different type. For example, division on an integer results in a double.

I also discovered that Spark shows some rather interesting behaviors with nulls. For example, null * 2 = null (makes sense), but null / 2 = 0.0 ... interesting. So if we leave it up to Spark, do we need to accept these as reasonable conversions?

@OlivierBlanvillain
Contributor

I would always wrap results in Option when Options are involved 😄

@imarios
Contributor Author

imarios commented Nov 26, 2017

@OlivierBlanvillain here is an alternative syntax that has better type inference:

sealed class TypedColumn[T, U](
  val expr: Expression)(
  implicit
  val uencoder: TypedEncoder[U]
) extends UntypedExpression[T] { self =>

   // ...
   trait Mapper[X] {
      def map[G](u: TypedColumn[T, X] => TypedColumn[T, G]): TypedColumn[T, Option[G]] = 
        u(self.asInstanceOf[TypedColumn[T, X]]).asInstanceOf[TypedColumn[T, Option[G]]]
   }
   def opt[X](implicit x: U <:< Option[X]): Mapper[X] = new Mapper[X] {}
   // ...
}

Usage:

case class X(i: Option[Int])
val t = TypedDataset.create(Seq(X(None), X(None), X(Some(2))))
t.select(t('i).opt.map(_*2)).show().run
+----+
|  _1|
+----+
|null|
|null|
|   4|
+----+

I am not a super fan of it ... I would love to hear your input.

@imarios
Contributor Author

imarios commented Nov 26, 2017

@OlivierBlanvillain is this kind of type inference better in Dotty? :)

@imarios
Contributor Author

imarios commented Nov 26, 2017

Hmm ... there is a problem with allowing any TypedColumn[T, X] => TypedColumn[T, G]. Spark can handle null * 2 and will return null, but this does not work as smoothly for UDFs. So if we apply a UDF inside our TypedColumn[T, X] => TypedColumn[T, G], it will not handle null correctly and will fail.

val u = t.makeUDF( (x: Int) => x + 1)

t.select(t('i).opt.map(x => u(x))).show().run
java.lang.ClassCastException: scala.None$ cannot be cast to java.lang.Integer
  at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
  at $anonfun$1.apply(<console>:25)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$FramelessUdfEvalImpl.apply(Unknown Source)
  at frameless.functions.FramelessUdf.eval(Udf.scala:125)
  at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:144)
  at org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:48)
  at org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:30)

@OlivierBlanvillain
Contributor

OlivierBlanvillain commented Nov 26, 2017

I'm fine with the syntax. About UDFs, I guess we could restrict them a bit to make them incompatible with .opt.map, something like df.udf.apply(myUdf).

@imarios
Contributor Author

imarios commented Nov 26, 2017

@OlivierBlanvillain I am not sure how we can restrict the use of UDFs inside map(). A UDF takes a column as input and generates a column as output, and if you look at map inside Mapper, its argument is just a function that takes a column and returns a column. Anyone can throw a UDF in there, and there is nothing we can do to constrain that.
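
For example (restating the earlier failing snippet, just to make the shapes explicit):

// the created UDF is (roughly) a TypedColumn[X, Int] => TypedColumn[X, Int],
// i.e. exactly the shape that map expects, so it is accepted at compile time
val u = t.makeUDF((x: Int) => x + 1)
t.select(t('i).opt.map(x => u(x)))  // type checks, but fails at runtime on None rows (see the stack trace above)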

@OlivierBlanvillain
Contributor

OlivierBlanvillain commented Nov 27, 2017

Inheritance could do the trick (I'm not saying that this is a good idea, but it should be possible 😄):

// rough sketch: UDF application only lives on a sub-trait, so a plain
// TypedColumn (and therefore opt.map) stays incompatible with UDFs
trait TypedColumn[T, U] { self =>
  def +(u: U): TypedColumn[T, U] // ... and the rest of the existing operators

  // ...
  trait Mapper[X] {
    def map[G](u: TypedColumn[T, X] => TypedColumn[T, G]): TypedColumn[T, Option[G]] =
      u(self.asInstanceOf[TypedColumn[T, X]]).asInstanceOf[TypedColumn[T, Option[G]]]
  }
  def opt[X](implicit x: U <:< Option[X]): Mapper[X] = new Mapper[X] {}
  // ...
}

trait UDFable_TypedColumn[T, U] extends TypedColumn[T, U] {
  def applyUdf[G](u: UDF): TypedColumn[T, G] // Assuming we remove the apply from UDF,
                                             // making this the only way to use them.
}

@imarios
Contributor Author

imarios commented Jan 27, 2018

I have a fix for this that requires the changes we are doing to unify the projection and aggregation column types.

@frosforever
Contributor

Hey, just wanted to check in on this and offer up a very slight modification that I've been using, in case it helps anyone in the meantime (in AbstractTypedColumn):

  trait Mapper[X] {
    def map[G, TC[_]](u: ThisType[T, X] => TC[G])(implicit ev: TC[G] <:< AbstractTypedColumn[T, G]): TC[Option[G]] =
      u(self.asInstanceOf[ThisType[T, X]]).asInstanceOf[TC[Option[G]]]
  }
  def opt[X](implicit x: U <:< Option[X]): Mapper[X] = new Mapper[X] {}

It behaves the same way as the opt above, but handles either TypedColumn or TypedAggregate, for use in agg etc. I find this pretty much essential for working with Options in frameless.
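
For example (an illustrative sketch, assuming first from frameless.functions.aggregate and the X/t definitions from the earlier comments):

// projection: the mapped function returns a TypedColumn, usable in select
t.select(t('i).opt.map(_ * 2)).collect().run

// aggregation: the mapped function returns a TypedAggregate, usable in agg
t.agg(t('i).opt.map(c => first(c * 2))).collect().run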

@OlivierBlanvillain
Contributor

OlivierBlanvillain commented Apr 9, 2018

@frosforever Looks pretty neat! Given that it's not a method from vanilla Spark, we would need a bit more documentation to get it in, but I definitely see how it would be useful.

@imarios
Contributor Author

imarios commented Apr 9, 2018

It still has the same sensitivity to UDFs. I wanted to try to get this working with UDFs, but it has not been that easy; I needed to better understand how the UDF operates internally. Do you think we can add this even though it has a UDF bug, and maybe just document it somewhere?

@frosforever
Contributor

frosforever commented Apr 10, 2018

Yes, in my view we should get something like this in, with a caveat comment. We've already merged some things that don't behave well (e.g. ordering with Spark 2.2, #236 (comment)), and this, or something like it, is essential for working with real-world data, which is very often full of Options.

Sadly, I don't have an idea how to get this to behave well with UDFs. Options in general are proving difficult to work with in a sane manner (I've been struggling with some ordering stuff). Perhaps we can try it as a macro that simply doesn't allow UDFs?

@imarios imarios self-assigned this Dec 16, 2020
@ayoub-benali
Contributor

@imarios I guess this one can be closed due to #479?

@imarios
Contributor Author

imarios commented Jan 6, 2021

@ayoub-benali yup. Let me close this one.

@imarios imarios closed this as completed Jan 6, 2021