Date time functions #355

Merged (1 commit, Jan 3, 2019)
@@ -707,6 +707,10 @@ trait NonAggregateFunctions {
def upper[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, String] =
str.typed(sparkFunctions.upper(str.untyped))

//////////////////////////////////////////////////////////////////////////////////////////////
// DateTime functions
//////////////////////////////////////////////////////////////////////////////////////////////

/** Non-Aggregate function: Extracts the year as an integer from a given date/timestamp/string.
*
* Differs from `Column#year` by wrapping its result into an `Option`.
@@ -715,4 +719,85 @@
*/
def year[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] =
str.typed(sparkFunctions.year(str.untyped))

/** Non-Aggregate function: Extracts the quarter as an integer from a given date/timestamp/string.
*
* Differs from `Column#quarter` by wrapping its result into an `Option`.
*
* apache/spark
*/
def quarter[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] =
str.typed(sparkFunctions.quarter(str.untyped))

/** Non-Aggregate function: Extracts the month as an integer from a given date/timestamp/string.
*
* Differs from `Column#month` by wrapping its result into an `Option`.
*
* apache/spark
*/
def month[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] =
str.typed(sparkFunctions.month(str.untyped))

/** Non-Aggregate function: Extracts the day of the week as an integer from a given date/timestamp/string.
*
* Differs from `Column#dayofweek` by wrapping its result into an `Option`.
*
* apache/spark
*/
def dayofweek[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] =
str.typed(sparkFunctions.dayofweek(str.untyped))

/** Non-Aggregate function: Extracts the day of the month as an integer from a given date/timestamp/string.
*
* Differs from `Column#dayofmonth` by wrapping its result into an `Option`.
*
* apache/spark
*/
def dayofmonth[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] =
str.typed(sparkFunctions.dayofmonth(str.untyped))

/** Non-Aggregate function: Extracts the day of the year as an integer from a given date/timestamp/string.
*
* Differs from `Column#dayofyear` by wrapping its result into an `Option`.
*
* apache/spark
*/
def dayofyear[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] =
str.typed(sparkFunctions.dayofyear(str.untyped))

/** Non-Aggregate function: Extracts the hours as an integer from a given date/timestamp/string.
*
* Differs from `Column#hour` by wrapping its result into an `Option`.
*
* apache/spark
*/
def hour[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] =
str.typed(sparkFunctions.hour(str.untyped))

/** Non-Aggregate function: Extracts the minutes as an integer from a given date/timestamp/string.
*
* Differs from `Column#minute` by wrapping its result into an `Option`.
*
* apache/spark
*/
def minute[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] =
str.typed(sparkFunctions.minute(str.untyped))

/** Non-Aggregate function: Extracts the seconds as an integer from a given date/timestamp/string.
*
* Differs from `Column#second` by wrapping its result into an `Option`.
*
* apache/spark
*/
def second[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] =
str.typed(sparkFunctions.second(str.untyped))

/** Non-Aggregate function: Extracts the week number as an integer from a given date/timestamp/string.
*
* Differs from `Column#weekofyear` by wrapping its result into an `Option`.
*
* apache/spark
*/
def weekofyear[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] =
str.typed(sparkFunctions.weekofyear(str.untyped))
}
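
For a sense of how the new functions are used, here is a minimal sketch. The `Event` case class, the `events` dataset and the sample values are made up for illustration and are not part of this PR; a SparkSession and the usual frameless implicits are assumed to be in scope.

import frameless.TypedDataset
import frameless.functions.nonAggregate._

// Hypothetical input; "not a date" exercises the None branch.
case class Event(ts: String)
val events = TypedDataset.create(Seq(Event("2019-01-03 12:34:56"), Event("not a date")))

// Each function returns Option[Int]: Some(value) when Spark can parse the string,
// None when the underlying Spark function returns null.
val parts = events.select(
  year(events[String]('ts)),       // Some(2019) / None
  month(events[String]('ts)),      // Some(1)    / None
  dayofmonth(events[String]('ts)), // Some(3)    / None
  hour(events[String]('ts))        // Some(12)   / None
)
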
@@ -0,0 +1,10 @@
package frameless.functions

import org.apache.spark.sql.Row

object DateTimeStringBehaviourUtils {
// Maps the first column of a Row to Some(i) when Spark produced an Int, or to None
// when the date/timestamp string could not be parsed and Spark returned null.
val nullHandler: Row => Option[Int] = _.get(0) match {
case i: Int => Some(i)
case _ => None
}
}
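
As a rough sketch of the behaviour this helper encodes (the sample strings are made up; `spark` is assumed to be the active SparkSession): Spark's date/time functions return null for strings they cannot parse, and `nullHandler` turns that row-level null into `None`.

import org.apache.spark.sql.{functions => sparkFunctions}
import spark.implicits._

val years = Seq("2019-01-03", "garbage").toDF("a")
  .select(sparkFunctions.year($"a"))
  .map(DateTimeStringBehaviourUtils.nullHandler) // Row(2019) => Some(2019), Row(null) => None
  .collect()
  .toList // List(Some(2019), None)
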
@@ -5,7 +5,7 @@ import java.io.File

import frameless.functions.nonAggregate._
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.{Column, Encoder, SaveMode, functions => sparkFunctions}
import org.scalacheck.Prop._
import org.scalacheck.{Arbitrary, Gen, Prop}

@@ -2206,32 +2206,151 @@ class NonAggregateFunctionsTests extends TypedDatasetSuite {
check(forAll(prop[Option[Boolean], Long] _))
}

// Compares the typed datetime column against the equivalent untyped Spark function,
// with Spark's nulls surfaced as None via DateTimeStringBehaviourUtils.nullHandler.
def dateTimeStringProp(typedDS: TypedDataset[X1[String]])
(typedCol: TypedColumn[X1[String], Option[Int]], sparkFunc: Column => Column): Prop = {
val spark = session
import spark.implicits._

val sparkResult = typedDS.dataset
.select(sparkFunc($"a"))
.map(DateTimeStringBehaviourUtils.nullHandler)
.collect()
.toList

val typed = typedDS
.select(typedCol)
.collect()
.run()
.toList

typed ?= sparkResult
}
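
The tests below exercise this helper with both arbitrary strings and a `dateTimeStringGen` generator that is defined elsewhere in this PR and not shown here. Purely as an illustration of the shape such a generator could take (this is not the PR's actual definition):

// Hypothetical sketch only; the real dateTimeStringGen lives elsewhere in the PR.
val dateTimeStringGenSketch: Gen[List[String]] = Gen.listOf(
  for {
    y <- Gen.chooseNum(1900, 2100)
    m <- Gen.chooseNum(1, 12)
    d <- Gen.chooseNum(1, 28)
  } yield f"$y%04d-$m%02d-$d%02d"
)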

test("year") {
val spark = session
import spark.implicits._

def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
val ds = TypedDataset.create(data)
dateTimeStringProp(ds)(year(ds[String]('a)), sparkFunctions.year)
}

check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
check(forAll(prop _))
}

test("quarter") {
val spark = session
import spark.implicits._

def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
val ds = TypedDataset.create(data)
dateTimeStringProp(ds)(quarter(ds[String]('a)), sparkFunctions.quarter)
}

check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
check(forAll(prop _))
}

test("month") {
val spark = session
import spark.implicits._

def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
val ds = TypedDataset.create(data)
dateTimeStringProp(ds)(month(ds[String]('a)), sparkFunctions.month)
}

check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
check(forAll(prop _))
}

test("dayofweek") {
val spark = session
import spark.implicits._

def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
val ds = TypedDataset.create(data)
dateTimeStringProp(ds)(dayofweek(ds[String]('a)), sparkFunctions.dayofweek)
}

check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
check(forAll(prop _))
}

test("dayofmonth") {
val spark = session
import spark.implicits._

def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
val ds = TypedDataset.create(data)
dateTimeStringProp(ds)(dayofmonth(ds[String]('a)), sparkFunctions.dayofmonth)
}

check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
check(forAll(prop _))
}

test("dayofyear") {
val spark = session
import spark.implicits._

def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
val ds = TypedDataset.create(data)
dateTimeStringProp(ds)(dayofyear(ds[String]('a)), sparkFunctions.dayofyear)
}

check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
check(forAll(prop _))
}

test("hour") {
val spark = session
import spark.implicits._

def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
val ds = TypedDataset.create(data)
dateTimeStringProp(ds)(hour(ds[String]('a)), sparkFunctions.hour)
}

check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
check(forAll(prop _))
}

test("minute") {
val spark = session
import spark.implicits._

def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
val ds = TypedDataset.create(data)
dateTimeStringProp(ds)(minute(ds[String]('a)), sparkFunctions.minute)
}

check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
check(forAll(prop _))
}

test("second") {
val spark = session
import spark.implicits._

def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
val ds = TypedDataset.create(data)
dateTimeStringProp(ds)(second(ds[String]('a)), sparkFunctions.second)
}

check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
check(forAll(prop _))
}

test("weekofyear") {
val spark = session
import spark.implicits._

def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
val ds = TypedDataset.create(data)
dateTimeStringProp(ds)(weekofyear(ds[String]('a)), sparkFunctions.weekofyear)
}

check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
check(forAll(prop _))