Skip to content

Commit

Permalink
Fixes #27
Browse files Browse the repository at this point in the history
This adds methods `toList` and `toArray`, which effectively perform a collect under the hood. `collect` works too, but its result has to be cast to a Kotlin `Array`, which is not very pretty, and we can't override it (because Kotlin forbids overloading an existing method).

Signed-off-by: Pasha Finkelshteyn <asm0dey@jetbrains.com>
  • Loading branch information
asm0dey committed Jul 8, 2020
1 parent ffdb41d commit 5c05b6f
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,23 @@
*/
package org.jetbrains.spark.extensions

import java.util

import org.apache.spark.SparkContext
import org.apache.spark.sql._

import scala.collection.JavaConverters

object KSparkExtensions {
/** Returns the column called `name` from dataset `d` by delegating to `Dataset.col`. */
def col(d: Dataset[_], name: String): Column = {
  d.col(name)
}

/** Builds a column reference for `name` by delegating to `functions.col`. */
def col(name: String): Column = {
  functions.col(name)
}

/** Wraps `literal` in a column by delegating to `functions.lit`. */
def lit(literal: Any): Column = {
  functions.lit(literal)
}

/**
 * Collects `ds` and exposes the result as a `java.util.List`.
 *
 * The dataset is materialised with `Dataset.collect()` and the resulting array is
 * handed to `JavaConverters.seqAsJavaList`.
 */
def collectAsList[T](ds: Dataset[T]): util.List[T] = {
  val collected = ds.collect()
  JavaConverters.seqAsJavaList(collected)
}


def debugCodegen(df: Dataset[_]): Unit = {
import org.apache.spark.sql.execution.debug._
df.debugCodegen()
Expand All @@ -39,5 +46,5 @@ object KSparkExtensions {
df.debug()
}

def sparkContext(s:SparkSession): SparkContext = s.sparkContext
/** Returns the `SparkContext` held by session `s`. */
def sparkContext(s: SparkSession): SparkContext = {
  s.sparkContext
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*-
* =LICENSE=
* Kotlin Spark API: Examples
* ----------
* Copyright (C) 2019 - 2020 JetBrains
* ----------
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =LICENSEEND=
*/
package org.jetbrains.spark.api.examples

import org.apache.spark.sql.Row
import org.jetbrains.spark.api.*

/**
 * Example: different ways of collecting a dataset.
 *
 * Registers a small dataset as the temp view "ds", queries it back through SQL and prints
 * the result via `toList` / `toArray`, then via `collect()` / `collectAsList()` after
 * re-typing it with `to<Int>()`. Finally maps a dataset with `c(...)`, selects the `_1`
 * column and prints every collected row.
 */
fun main() {
    withSpark {
        val numbers = dsOf(1, 2, 3)
        numbers.createOrReplaceTempView("ds")
        spark.sql("select * from ds")
            .withCached {
                println("asList: ${toList<Int>()}")
                println("asArray: ${toArray<Int>().contentToString()}")
                // Hand the dataset back so the chain can continue after the cached block.
                this
            }
            .to<Int>()
            .withCached {
                // collect() has to be cast to a Kotlin Array before array helpers can be used.
                println("typed collect: ${(collect() as Array<Int>).contentToString()}")
                println("typed collectAsList: ${collectAsList()}")
            }

        dsOf(1, 2, 3)
            .map { c(it, it + 1, it + 2) }
            .to<Row>()
            .select("_1")
            .collectAsList()
            .forEach { println(it) }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ inline fun <reified KEY, reified VALUE> KeyValueGroupedDataset<KEY, VALUE>.reduc
.map { t -> t._1 to t._2 }

/**
 * Re-types this dataset as `Dataset<R>` by passing the encoder obtained from
 * `encoder<R>()` to Spark's `as`.
 */
inline fun <T, reified R> Dataset<T>.downcast(): Dataset<R> {
    val targetEncoder = encoder<R>()
    return `as`(targetEncoder)
}
/**
 * Argument-free variant of Spark's `as`: the encoder is obtained from `encoder<R>()`
 * and passed to the member `as`, yielding a `Dataset<R>`.
 */
inline fun <reified R> Dataset<*>.`as`(): Dataset<R> {
    val targetEncoder = encoder<R>()
    return this.`as`(targetEncoder)
}
/**
 * Converts this dataset to a `Dataset<R>` by calling Spark's `as` with the encoder
 * obtained from `encoder<R>()`.
 */
inline fun <reified R> Dataset<*>.to(): Dataset<R> {
    return `as`(encoder<R>())
}

/**
 * Applies [func] to the elements of this dataset by wrapping it in a Spark
 * `ForeachFunction` and passing that to `foreach`.
 */
inline fun <reified T> Dataset<T>.forEach(noinline func: (T) -> Unit) {
    val action: ForeachFunction<T> = ForeachFunction(func)
    foreach(action)
}

Expand Down Expand Up @@ -245,6 +247,9 @@ inline fun <reified T, R> Dataset<T>.withCached(blockingUnpersist: Boolean = fal
return cached.executeOnCached().also { cached.unpersist(blockingUnpersist) }
}

/**
 * Collects this dataset into a list of [T].
 *
 * The dataset is first re-typed with `to<T>()` and then collected through
 * `KSparkExtensions.collectAsList`. The receiver is `Dataset<*>` so that it matches
 * `toArray`; datasets of `Row` are still accepted.
 *
 * @param T element type the dataset is converted to before collecting.
 * @return the collected elements, exposed as a read-only [List] rather than a Java platform type.
 */
inline fun <reified T> Dataset<*>.toList(): List<T> = KSparkExtensions.collectAsList(to<T>())
/**
 * Collects this dataset into an `Array<R>`.
 *
 * The dataset is re-typed with `to<R>()`, collected with `collect()`, and the result
 * is cast to `Array<R>`.
 */
inline fun <reified R> Dataset<*>.toArray(): Array<R> {
    val typed = to<R>()
    return typed.collect() as Array<R>
}

/**
* Alternative to [Dataset.show] which returns the source dataset.
* Useful for debugging purposes when you need to view the content of a dataset as an intermediate operation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
*/
package org.jetbrains.spark.api

import org.apache.spark.sql.SparkSession
import org.jetbrains.spark.api.SparkLogLevel.ERROR

/**
Expand Down

0 comments on commit 5c05b6f

Please sign in to comment.