[WIP] Window functions and column sorting #225

Closed
wants to merge 15 commits

Conversation

@frosforever (Contributor) commented Dec 9, 2017

Connects to #164 and #136.

Still a WIP, but I wanted to put this out there for early feedback before going whole hog.
Currently the only window function supported is dense_rank, and window frames aren't supported. There's also no support yet for sorting on user-defined types.
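
For context, here is a minimal sketch of the vanilla, untyped Spark behaviour this PR aims to expose in a typed way (dense_rank over a window plus column sorting). The dataset and column names are made up for illustration; only the Spark calls themselves are real.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, dense_rank}

object VanillaWindowSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("window-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq(("a", 1), ("a", 3), ("b", 2)).toDF("key", "value")
    val w  = Window.partitionBy(col("key")).orderBy(col("value").desc)

    df.withColumn("rank", dense_rank().over(w)).show() // dense_rank per key, ordered by value desc
    df.sort(col("value").desc).show()                  // plain column sorting
    spark.stop()
  }
}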

 * TODO: UDF
 */

implicit def orderedEvidence[A](implicit catalystOrdered: CatalystOrdered[A]): CatalystRowOrdered[A] = of[A]
Contributor Author:

This might be dangerous if something gets added to CatalystOrdered that's not supported in sorting. Does anyone know if there even are any types like that? Would it be safer to explicitly state all the allowable types here, even if there is some duplication?

Contributor:

If it works nicely with CatalystOrdered I would keep it this way for now. Just add a note in object CatalystOrdered saying that if someone adds a new case there, they should also update the window function tests.
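
For illustration only, a minimal sketch of the two options being weighed here (the derived orderedEvidence is what the diff actually contains; the explicit instances below are an example of the alternative, not an exhaustive list):

trait CatalystRowOrdered[A]

object CatalystRowOrdered {
  private val theInstance = new CatalystRowOrdered[Any] {}
  private def of[A]: CatalystRowOrdered[A] = theInstance.asInstanceOf[CatalystRowOrdered[A]]

  // Option 1 (what the diff does): derive from CatalystOrdered, so the two stay in sync
  // automatically but silently pick up any future CatalystOrdered instance.
  // implicit def orderedEvidence[A](implicit catalystOrdered: CatalystOrdered[A]): CatalystRowOrdered[A] = of[A]

  // Option 2: enumerate the sortable types explicitly, at the cost of some duplication.
  implicit val intOrdered: CatalystRowOrdered[Int]               = of[Int]
  implicit val longOrdered: CatalystRowOrdered[Long]             = of[Long]
  implicit val stringOrdered: CatalystRowOrdered[String]         = of[String]
  implicit val sqlDateOrdered: CatalystRowOrdered[java.sql.Date] = of[java.sql.Date]
  // ... and so on for the remaining Catalyst-sortable types
}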

Contributor:

Do we need this extra type class? Why wouldn't CatalystOrdered cover this case as well?

Contributor Author:

I'm not sure I follow. Are you asking why CatalystRowOrdered is needed, and whether CatalystOrdered shouldn't be enough? The reason is that you can order rows by things that aren't comparable within a row, for example Arrays:

scala> spark.createDataset(Seq((Seq(1, 2, 3), Seq(2,3,1))))
res3: org.apache.spark.sql.Dataset[(Seq[Int], Seq[Int])] = [_1: array<int>, _2: array<int>]

scala> res3.select(untyped.col("_1") > untyped.col("_2")).show
org.apache.spark.sql.AnalysisException: cannot resolve '(`_1` > `_2`)' due to data type mismatch: '(`_1` > `_2`)' requires (boolean or tinyint or smallint or int or bigint or float or double or decimal or timestamp or date or string or binary) type, not array<int>;;

scala> res3.sort(untyped.col("_1")).show()
+---------+---------+
|       _1|       _2|
+---------+---------+
|[1, 2, 3]|[2, 3, 1]|
+---------+---------+

Contributor:

That looks more like a bug than a feature... not yours of course, on the Spark side. Is there any particular reason why they made this design choice?

@imarios (Contributor), Dec 22, 2017:

Seems that this is fixed in Spark 2.3: https://issues.apache.org/jira/browse/SPARK-21110. Can you take a look and verify whether it's the same bug? It might save us a lot of extra code if this magically goes away. We could then just focus on one type class, CatalystOrdered, it seems.
Here is the PR:
apache/spark#18818

Contributor Author:

Good find @imarios! I honestly have no idea if there's a reason for the design choice. I'll take a more serious look at the fix sometime in the next few days.
If this does indeed fix everything, are we targeting 2.3 for the next frameless version? Does it make sense to keep this in for earlier version support?

Contributor Author:

Following up on this: I was able to test the above code snippet ☝️ https://github.com/typelevel/frameless/pull/225/files#r158376192 using Spark 2.3.0-SNAPSHOT and it does indeed work!

 * apache/spark
 */
def descNullsFirst(implicit isOption: U <:< Option[_], catalystRowOrdering: CatalystRowOrdered[U]): TypedSortedColumn[T, U] =
Contributor Author:

Should we keep the naming similar to Spark's, with Nulls, or rename this to a more Scala-friendly descNonesFirst?

Contributor:

Given that there are Options involved, I think I like descNonesFirst more.

trait WindowFunctions {
  import WindowFunctionsHelpers.dense_rankObj

  def dense_rank() = dense_rankObj
Contributor Author:

Do we want to keep the same syntax as Spark, where you call dense_rank().over(windowSpec), or just take the window spec as an argument to dense_rank? There's no valid dense_rank without a window spec.

Contributor:

I don't have a strong opinion on this. Staying close to vanilla is a plus, but here it really makes little sense 😄. To be consistent I would camelCase it either way.
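
To make the trade-off concrete, here is a tiny self-contained sketch of the two shapes. The stub types stand in for the PR's TypedWindow/TypedAggregate; everything below is illustrative, not the actual frameless API.

object DenseRankShapes {
  // Stub stand-ins for the PR's typed window and aggregate types.
  final case class WindowStub[T](partitionBy: Seq[String], orderBy: Seq[String])
  final case class AggregateStub[T, U](description: String)

  // Shape 1: mirror Spark, a no-arg dense_rank that must be completed with .over(...).
  object sparkStyle {
    def dense_rank(): DenseRankBuilder.type = DenseRankBuilder
    object DenseRankBuilder {
      def over[T](w: WindowStub[T]): AggregateStub[T, Int] =
        AggregateStub(s"dense_rank over $w")
    }
  }

  // Shape 2: take the window spec directly, since dense_rank has no meaning without one.
  object directStyle {
    def denseRank[T](w: WindowStub[T]): AggregateStub[T, Int] =
      AggregateStub(s"dense_rank over $w")
  }

  // Both shapes in use:
  val w = WindowStub[(String, Int)](partitionBy = Seq("_1"), orderBy = Seq("_2"))
  val a = sparkStyle.dense_rank().over(w)
  val b = directStyle.denseRank(w)
}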

codecov-io commented Dec 9, 2017

Codecov Report

Merging #225 into master will decrease coverage by 1.1%.
The diff coverage is 75.47%.


@@            Coverage Diff             @@
##           master     #225      +/-   ##
==========================================
- Coverage   96.15%   95.04%   -1.11%     
==========================================
  Files          52       55       +3     
  Lines         936      989      +53     
  Branches       15       14       -1     
==========================================
+ Hits          900      940      +40     
- Misses         36       49      +13
Impacted Files Coverage Δ
...in/scala/frameless/functions/WindowFunctions.scala 100% <100%> (ø)
...ataset/src/main/scala/frameless/TypedDataset.scala 91.97% <50%> (-1.93%) ⬇️
dataset/src/main/scala/frameless/TypedWindow.scala 55.55% <55.55%> (ø)
.../src/main/scala/frameless/CatalystRowOrdered.scala 88.88% <88.88%> (ø)
dataset/src/main/scala/frameless/TypedColumn.scala 98.9% <94.44%> (-1.1%) ⬇️

Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@@ -99,7 +99,7 @@ lazy val commonScalacOptions = Seq(
   "-encoding", "UTF-8",
   "-feature",
   "-unchecked",
-  "-Xfatal-warnings",
+  // "-Xfatal-warnings",
Contributor Author:

I know, it won't be there in the final version.

}

class SortTests extends TypedDatasetSuite {
  test("bad udt") {
@frosforever (Contributor Author), Dec 12, 2017:

I could use some help with this. It's unclear how to support UDTs via Record types etc. while preventing those whose underlying sqlType is not sortable. The generic Record support is super helpful for StructTypes.
While this is an admittedly contrived example, it's certainly possible to hit this.
Any ideas?

imarios (Contributor) commented Dec 21, 2017

Will give this a look in the next few days.

@@ -485,6 +539,28 @@ sealed class TypedAggregate[T, U](val expr: Expression)(
}
}

sealed class TypedSortedColumn[T, U](val expr: Expression)(
Contributor:

Trying to see if we really need a new type here. I assume there are methods that only work on sorted columns?

Contributor Author:

Yes, exactly! Annoyingly, there are also methods that specifically don't work on already-sorted columns. I can post a few examples when I get a moment.

Contributor Author:

For example, selecting a sum on a sorted column blows up

import org.apache.spark.sql.{functions=>untyped}

ds.select(untyped.sum(untyped.col("a"))).show()
/*
+------+
|sum(a)|
+------+
|  null|
+------+
*/
ds.select(untyped.sum(untyped.col("a").desc)).show()
/*
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition
+- *HashAggregate(keys=[], functions=[partial_sum(cast(a#2 DESC NULLS LAST as double))], output=[sum#21])
   +- LocalTableScan <empty>, [a#2]

  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
  at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:115)
...
*/

@OlivierBlanvillain (Contributor):
@frosforever thanks for the PR! Is it still a work in progress? Just ping us when you need a review 😄

@frosforever (Contributor Author):
Hey @OlivierBlanvillain, yes this is still a WIP, and it's still an open question whether 2.3 will remove a bunch of the code required here. See #225 (comment). It seems like 2.3-RC1 was recently released. I need to find the time to try it out, but looking at apache/spark#18818 it does seem like it fixes that issue by consolidating row ordering and regular orderings.

Perhaps the newer version of frameless should only target 2.3? Do we have a story on supporting multiple Spark versions?

@OlivierBlanvillain (Contributor):
> Do we have a story on supporting multiple Spark versions?

No, so far master has only targeted the latest Spark version. If someone is interested it shouldn't be too hard to cross-compile, but it's not something that I personally want to spend time on.
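
Purely to illustrate the cross-compiling idea (not something the build does today): the Spark version could be parametrized in build.sbt and picked via a system property such as -Dspark.version=2.3.0. The property name, default version, and provided scope below are made up for the sketch.

// Hypothetical build.sbt fragment.
val sparkVersion = sys.props.getOrElse("spark.version", "2.2.1")

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql"  % sparkVersion % "provided"
)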

imarios (Contributor) commented Jan 21, 2018

@frosforever Reviewing this PR again makes me wonder why I had to reinvent column ordering in another PR O.o. Oh well, let me read this again to see if there is any difference. We can probably take the best of the two and merge them. I think ordering on columns is a big enough addition to be in a separate PR, leaving this one to focus on solving window functions. What do you think?

imarios (Contributor) commented Jan 21, 2018

@frosforever Working on #231 made me appreciate this PR more now :). I like the idea of using Mapper and the Poly function to make sorting work for regular columns even if they are not explicitly SortedTypedColumns. Let's do this: can you take just the column sorting part of this PR, follow the orderByMany pattern from PR #231, copy the unit tests from there, and create a separate PR? Let's not confuse things with the CatalystRowOrdered part, since that may change in 2.3 when they fix SPARK-21110.
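
Purely for illustration, a self-contained sketch of that Mapper/Poly idea with stub column types (none of these names are the PR's actual API): a Poly1 lifts plain columns to sorted ones, so an orderByMany-style method can accept a mix of both.

import shapeless._
import shapeless.ops.hlist.Mapper

object SortPolySketch {
  final case class PlainCol(name: String)
  final case class SortedCol(name: String, ascending: Boolean)

  // Lift every column to a SortedCol: plain columns default to ascending,
  // already-sorted columns pass through unchanged.
  object toSorted extends Poly1 {
    implicit val fromPlain: Case.Aux[PlainCol, SortedCol]   = at[PlainCol](c => SortedCol(c.name, ascending = true))
    implicit val fromSorted: Case.Aux[SortedCol, SortedCol] = at[SortedCol](c => c)
  }

  def orderByMany[L <: HList, O <: HList](cols: L)(implicit m: Mapper.Aux[toSorted.type, L, O]): O =
    cols.map(toSorted)

  // Mixing plain and already-sorted columns in a single call:
  val sorted = orderByMany(PlainCol("a") :: SortedCol("b", ascending = false) :: HNil)
}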

imarios (Contributor) commented Jan 21, 2018

Regarding sorted collections, I got this one working:

implicit def derivedOrderingForCollections[C[_], A]
  (implicit
   a: CatalystOrdered[A],
   b: CatalystCollection[C]
  ): CatalystOrdered[C[A]] = theInstance.asInstanceOf[CatalystOrdered[C[A]]]

This will cover all Catalyst collections, given that they all encode to Catalyst's array<>, so they should follow similar ordering logic.
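
For illustration, here is a self-contained toy version of that derivation (stubbed type classes, not the real frameless ones) showing how an instance for, say, Vector[Int] gets resolved:

import scala.language.higherKinds

object CollectionOrderingSketch {
  trait CatalystOrdered[A]
  trait CatalystCollection[C[_]]

  private val theInstance = new CatalystOrdered[Any] {}

  implicit val intOrdered: CatalystOrdered[Int] = theInstance.asInstanceOf[CatalystOrdered[Int]]
  implicit val vectorIsCollection: CatalystCollection[Vector] = new CatalystCollection[Vector] {}

  implicit def derivedOrderingForCollections[C[_], A]
    (implicit
     a: CatalystOrdered[A],
     b: CatalystCollection[C]
    ): CatalystOrdered[C[A]] = theInstance.asInstanceOf[CatalystOrdered[C[A]]]

  // Resolves because Int is ordered and Vector is a Catalyst collection;
  // something like Vector[java.util.UUID] would not compile here.
  implicitly[CatalystOrdered[Vector[Int]]]
}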

frosforever mentioned this pull request Jan 29, 2018
frosforever mentioned this pull request Feb 10, 2018
@frosforever (Contributor Author):
Closing in favor of #248.
