From c57eaaf5c2109b1d46eb7c93dc6fd49a5965a373 Mon Sep 17 00:00:00 2001
From: Jolan Rensen
Date: Fri, 18 Feb 2022 21:00:04 +0100
Subject: [PATCH 1/6] added streaming and exploring how it works with kotlin
 spark api

---
 examples/pom-3.2_2.12.xml                          |  5 ++
 .../kotlinx/spark/examples/Streaming.kt            | 61 +++++++++++++++++++
 kotlin-spark-api/3.2/pom_2.12.xml                  |  6 ++
 3 files changed, 72 insertions(+)
 create mode 100644 examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Streaming.kt

diff --git a/examples/pom-3.2_2.12.xml b/examples/pom-3.2_2.12.xml
index b5267352..5f214b69 100644
--- a/examples/pom-3.2_2.12.xml
+++ b/examples/pom-3.2_2.12.xml
@@ -24,6 +24,11 @@
             <artifactId>spark-sql_${scala.compat.version}</artifactId>
             <version>${spark3.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_${scala.compat.version}</artifactId>
+            <version>${spark3.version}</version>
+        </dependency>
     </dependencies>

diff --git a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Streaming.kt b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Streaming.kt
new file mode 100644
index 00000000..7c562bd5
--- /dev/null
+++ b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Streaming.kt
@@ -0,0 +1,61 @@
+/*-
+ * =LICENSE=
+ * Kotlin Spark API: Examples for Spark 3.2+ (Scala 2.12)
+ * ----------
+ * Copyright (C) 2019 - 2022 JetBrains
+ * ----------
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * =LICENSEEND=
+ */
+package org.jetbrains.kotlinx.spark.examples
+
+import org.apache.spark.SparkConf
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.streaming.Durations
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.jetbrains.kotlinx.spark.api.withSpark
+import scala.Tuple2
+import java.io.Serializable
+
+data class Row @JvmOverloads constructor(
+    var word: String = "",
+) : Serializable
+
+fun main() = withSpark {
+
+    val context = JavaStreamingContext(
+        SparkConf()
+            .setMaster("local[*]")
+            .setAppName("Test"),
+        Durations.seconds(1),
+    )
+
+    val lines = context.socketTextStream("localhost", 9999)
+
+    val words = lines.flatMap { it.split(" ").iterator() }
+
+    words.foreachRDD { rdd, time ->
+
+        // todo convert rdd to dataset using kotlin data class?
+
+        val rowRdd = rdd.map { Row(it) }
+
+        val dataframe = spark.createDataFrame(rowRdd, Row::class.java)
+
+    }
+
+    context.start()
+    context.awaitTermination()
+}
\ No newline at end of file

diff --git a/kotlin-spark-api/3.2/pom_2.12.xml b/kotlin-spark-api/3.2/pom_2.12.xml
index 756d9c2b..826547d2 100644
--- a/kotlin-spark-api/3.2/pom_2.12.xml
+++ b/kotlin-spark-api/3.2/pom_2.12.xml
@@ -36,6 +36,12 @@
             <version>${spark3.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_${scala.compat.version}</artifactId>
+            <version>${spark3.version}</version>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
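The TODO left in foreachRDD above — getting from the untyped JavaRDD of a DStream batch to a
typed Dataset — is the crux of this exploration; createDataFrame with a bean class works but
bypasses the Kotlin encoder machinery. A minimal sketch of where this is heading, assuming the
rdd.toDS() extension that PATCH 2 below introduces (hypothetical at this point in the series):

    words.foreachRDD { rdd, time ->
        // Hypothetical resolution of the TODO: map each word into the example's
        // data class, then convert with the KSparkSession extension from PATCH 2.
        val dataset = rdd.map { Row(it) }.toDS()
        dataset.show()
    }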
From bb39fc79f4b528850337428a738761e84168c752 Mon Sep 17 00:00:00 2001
From: Jolan Rensen
Date: Mon, 21 Feb 2022 15:26:31 +0100
Subject: [PATCH 2/6] Adds helpful rdd to dataset conversion, as well as a new
 withSpark function taking a SparkConf

---
 .../org/jetbrains/kotlinx/spark/api/ApiV1.kt  | 14 +++++++++++
 .../kotlinx/spark/api/SparkHelper.kt          | 23 ++++++++++++++++++-
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
index 1061a21a..d41958ec 100644
--- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
+++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
@@ -23,9 +23,11 @@ package org.jetbrains.kotlinx.spark.api
 
 import org.apache.hadoop.shaded.org.apache.commons.math3.exception.util.ArgUtils
 import org.apache.spark.SparkContext
+import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.api.java.function.*
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.*
 import org.apache.spark.sql.Encoders.*
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -154,6 +156,18 @@ inline fun <reified T> SparkSession.dsOf(vararg t: T): Dataset<T> =
 inline fun <reified T> List<T>.toDS(spark: SparkSession): Dataset<T> =
     spark.createDataset(this, encoder())
 
+/**
+ * Utility method to create dataset from RDD
+ */
+inline fun <reified T> RDD<T>.toDS(spark: SparkSession): Dataset<T> =
+    spark.createDataset(this, encoder())
+
+/**
+ * Utility method to create dataset from JavaRDD
+ */
+inline fun <reified T> JavaRDD<T>.toDS(spark: SparkSession): Dataset<T> =
+    spark.createDataset(this.rdd(), encoder())
+
 /**
  * Main method of API, which gives you seamless integration with Spark:
  * It creates encoder for any given supported type T

diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkHelper.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkHelper.kt
index 6188daae..213636a8 100644
--- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkHelper.kt
+++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkHelper.kt
@@ -20,6 +20,8 @@
 package org.jetbrains.kotlinx.spark.api
 
 import org.apache.spark.SparkConf
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession.Builder
 import org.apache.spark.sql.UDFRegistration
 import org.jetbrains.kotlinx.spark.api.SparkLogLevel.ERROR
@@ -83,13 +85,32 @@ inline fun withSpark(builder: Builder, logLevel: SparkLogLevel = ERROR, func: KS
         .also { it.stop() }
 }
 
+/**
+ * Wrapper for spark creation which copies params from [sparkConf].
+ *
+ * @param sparkConf Sets a list of config options based on this.
+ * @param logLevel Control our logLevel. This overrides any user-defined log settings.
+ * @param func function which will be executed in context of [KSparkSession] (it means that `this` inside block will point to [KSparkSession])
+ */
+@JvmOverloads
+inline fun withSpark(sparkConf: SparkConf, logLevel: SparkLogLevel = ERROR, func: KSparkSession.() -> Unit) {
+    withSpark(
+        builder = SparkSession.builder().config(sparkConf),
+        logLevel = logLevel,
+        func = func,
+    )
+}
+
 /**
  * This wrapper over [SparkSession] which provides several additional methods to create [org.apache.spark.sql.Dataset]
  */
+@JvmInline
 @Suppress("EXPERIMENTAL_FEATURE_WARNING", "unused")
-inline class KSparkSession(val spark: SparkSession) {
+value class KSparkSession(val spark: SparkSession) {
     inline fun <reified T> List<T>.toDS() = toDS(spark)
     inline fun <reified T> Array<T>.toDS() = spark.dsOf(*this)
     inline fun <reified T> dsOf(vararg arg: T) = spark.dsOf(*arg)
+    inline fun <reified T> RDD<T>.toDS() = toDS(spark)
+    inline fun <reified T> JavaRDD<T>.toDS() = toDS(spark)
     val udf: UDFRegistration get() = spark.udf()
 }
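Taken together, the additions in this patch let a caller configure the session from an existing
SparkConf and lift RDDs into typed Datasets without touching createDataset or encoders by hand.
A usage sketch (master URL and app name are placeholder values, not from the patch itself):

    import org.apache.spark.SparkConf
    import org.apache.spark.api.java.JavaSparkContext
    import org.jetbrains.kotlinx.spark.api.*

    fun main() {
        val conf = SparkConf()
            .setMaster("local[*]") // placeholder master URL
            .setAppName("rddToDs") // placeholder app name

        withSpark(sparkConf = conf) {
            // Inside KSparkSession the reified toDS() extensions are in scope,
            // so the encoder for Int is derived automatically.
            val rdd = JavaSparkContext(spark.sparkContext).parallelize(listOf(1, 2, 3))
            rdd.toDS().show()
        }
    }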
From 4bd3fe190146ce0fb9a089a3bd26e2018b2d9996 Mon Sep 17 00:00:00 2001
From: Jolan Rensen
Date: Mon, 21 Feb 2022 16:16:30 +0100
Subject: [PATCH 3/6] makes javaRDD toDS function more generic. Adds sc
 JavaSparkContext to KSparkSession

---
 .../org/jetbrains/kotlinx/spark/api/ApiV1.kt  |  5 +--
 .../kotlinx/spark/api/SparkHelper.kt          | 15 ++++---
 .../jetbrains/kotlinx/spark/api/ApiTest.kt    | 42 +++++++++++++++++++
 3 files changed, 54 insertions(+), 8 deletions(-)

diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
index d41958ec..7e9ef135 100644
--- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
+++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
@@ -23,8 +23,7 @@ package org.jetbrains.kotlinx.spark.api
 
 import org.apache.hadoop.shaded.org.apache.commons.math3.exception.util.ArgUtils
 import org.apache.spark.SparkContext
-import org.apache.spark.api.java.JavaRDD
-import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.api.java.*
 import org.apache.spark.api.java.function.*
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
@@ -165,7 +164,7 @@ inline fun <reified T> RDD<T>.toDS(spark: SparkSession): Dataset<T> =
 /**
  * Utility method to create dataset from JavaRDD
  */
-inline fun <reified T> JavaRDD<T>.toDS(spark: SparkSession): Dataset<T> =
+inline fun <reified T> JavaRDDLike<T, *>.toDS(spark: SparkSession): Dataset<T> =
     spark.createDataset(this.rdd(), encoder())

diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkHelper.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkHelper.kt
index 213636a8..d9b4823a 100644
--- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkHelper.kt
+++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkHelper.kt
@@ -21,7 +21,10 @@ package org.jetbrains.kotlinx.spark.api
 
 import org.apache.spark.SparkConf
 import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.java.JavaRDDLike
+import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.SparkSession.Builder
 import org.apache.spark.sql.UDFRegistration
 import org.jetbrains.kotlinx.spark.api.SparkLogLevel.ERROR
@@ -80,9 +83,10 @@ inline fun withSpark(builder: Builder, logLevel: SparkLogLevel = ERROR, func: KS
             KSparkSession(this).apply {
                 sparkContext.setLogLevel(logLevel)
                 func()
+                sc.stop()
+                spark.stop()
             }
         }
-        .also { it.stop() }
 }
@@ -104,13 +108,14 @@ inline fun withSpark(sparkConf: SparkConf, logLevel: SparkLogLevel = ERROR, func
 /**
  * This wrapper over [SparkSession] which provides several additional methods to create [org.apache.spark.sql.Dataset]
  */
-@JvmInline
-@Suppress("EXPERIMENTAL_FEATURE_WARNING", "unused")
-value class KSparkSession(val spark: SparkSession) {
+class KSparkSession(val spark: SparkSession) {
+
+    val sc: JavaSparkContext = JavaSparkContext(spark.sparkContext)
+
     inline fun <reified T> List<T>.toDS() = toDS(spark)
     inline fun <reified T> Array<T>.toDS() = spark.dsOf(*this)
     inline fun <reified T> dsOf(vararg arg: T) = spark.dsOf(*arg)
     inline fun <reified T> RDD<T>.toDS() = toDS(spark)
-    inline fun <reified T> JavaRDD<T>.toDS() = toDS(spark)
+    inline fun <reified T> JavaRDDLike<T, *>.toDS() = toDS(spark)
     val udf: UDFRegistration get() = spark.udf()
 }

diff --git a/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
index ed784b13..936b5b2c 100644
--- a/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
+++ b/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
@@ -21,6 +21,11 @@ import ch.tutteli.atrium.api.fluent.en_GB.*
 import ch.tutteli.atrium.api.verbs.expect
 import io.kotest.core.spec.style.ShouldSpec
 import io.kotest.matchers.shouldBe
+import org.apache.spark.api.java.JavaDoubleRDD
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.functions.*
 import org.apache.spark.sql.streaming.GroupState
@@ -593,6 +598,43 @@ class ApiTest : ShouldSpec({
                 it.nullable() shouldBe true
             }
         }
+        should("Easily convert a (Java)RDD to a Dataset") {
+            // scala RDD
+            val rdd0: RDD<Int> = sc.parallelize(
+                listOf(1, 2, 3, 4, 5, 6)
+            ).rdd()
+            val dataset0: Dataset<Int> = rdd0.toDS()
+            dataset0.show()
+
+            dataset0.toList<Int>() shouldBe listOf(1, 2, 3, 4, 5, 6)
+
+            // normal JavaRDD
+            val rdd1: JavaRDD<Int> = sc.parallelize(
+                listOf(1, 2, 3, 4, 5, 6)
+            )
+            val dataset1: Dataset<Int> = rdd1.toDS()
+            dataset1.show()
+
+            dataset1.toList<Int>() shouldBe listOf(1, 2, 3, 4, 5, 6)
+
+            // JavaDoubleRDD
+            val rdd2: JavaDoubleRDD = sc.parallelizeDoubles(
+                listOf(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)
+            )
+            val dataset2: Dataset<Double> = rdd2.toDS()
+            dataset2.show()
+
+            dataset2.toList<Double>() shouldBe listOf(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)
+
+            // JavaPairRDD
+            val rdd3: JavaPairRDD<Int, Double> = sc.parallelizePairs(
+                listOf(Tuple2(1, 1.0), Tuple2(2, 2.0), Tuple2(3, 3.0))
+            )
+            val dataset3: Dataset<Tuple2<Int, Double>> = rdd3.toDS()
+            dataset3.show()
+
+            dataset3.toList<Tuple2<Int, Double>>() shouldBe listOf(Tuple2(1, 1.0), Tuple2(2, 2.0), Tuple2(3, 3.0))
+        }
     }
 }
 })
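Widening the receiver from JavaRDD<T> to JavaRDDLike<T, *> is what makes the specialized Java
RDDs convertible too: JavaDoubleRDD implements JavaRDDLike<java.lang.Double, JavaDoubleRDD> and
JavaPairRDD<K, V> implements JavaRDDLike<Tuple2<K, V>, JavaPairRDD<K, V>>, and neither is a
subtype of JavaRDD. Combined with the new sc property, a conversion collapses to a one-liner;
a sketch assuming a local session:

    import org.jetbrains.kotlinx.spark.api.*
    import scala.Tuple2

    fun main() = withSpark {
        // sc is the JavaSparkContext this patch adds to KSparkSession.
        val pairs = sc.parallelizePairs(listOf(Tuple2(1, 1.0), Tuple2(2, 2.0)))
        // JavaPairRDD<Int, Double> is a JavaRDDLike<Tuple2<Int, Double>, *>,
        // so toDS() yields a Dataset<Tuple2<Int, Double>>.
        pairs.toDS().show()
    }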
From 09e9bb5438c929f798084935ced429be1a657e8b Mon Sep 17 00:00:00 2001
From: Jolan Rensen
Date: Tue, 22 Feb 2022 13:06:26 +0100
Subject: [PATCH 4/6] Arity is now Serializable, removed sc.stop(), sc is now
 lazy, updates tests, removed streaming example

---
 .../kotlinx/spark/examples/Streaming.kt       | 61 ------------------
 .../kotlinx/spark/api/SparkHelper.kt          |  3 +-
 .../jetbrains/kotlinx/spark/api/VarArities.kt | 54 ++++++++--------
 .../jetbrains/kotlinx/spark/api/ApiTest.kt    | 22 +++++++
 4 files changed, 51 insertions(+), 89 deletions(-)
 delete mode 100644 examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Streaming.kt

diff --git a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Streaming.kt b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Streaming.kt
deleted file mode 100644
index 7c562bd5..00000000
--- a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Streaming.kt
+++ /dev/null
@@ -1,61 +0,0 @@
-/*-
- * =LICENSE=
- * Kotlin Spark API: Examples for Spark 3.2+ (Scala 2.12)
- * ----------
- * Copyright (C) 2019 - 2022 JetBrains
- * ----------
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * =LICENSEEND=
- */
-package org.jetbrains.kotlinx.spark.examples
-
-import org.apache.spark.SparkConf
-import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.streaming.Durations
-import org.apache.spark.streaming.api.java.JavaStreamingContext
-import org.jetbrains.kotlinx.spark.api.withSpark
-import scala.Tuple2
-import java.io.Serializable
-
-data class Row @JvmOverloads constructor(
-    var word: String = "",
-) : Serializable
-
-fun main() = withSpark {
-
-    val context = JavaStreamingContext(
-        SparkConf()
-            .setMaster("local[*]")
-            .setAppName("Test"),
-        Durations.seconds(1),
-    )
-
-    val lines = context.socketTextStream("localhost", 9999)
-
-    val words = lines.flatMap { it.split(" ").iterator() }
-
-    words.foreachRDD { rdd, time ->
-
-        // todo convert rdd to dataset using kotlin data class?
-
-        val rowRdd = rdd.map { Row(it) }
-
-        val dataframe = spark.createDataFrame(rowRdd, Row::class.java)
-
-    }
-
-    context.start()
-    context.awaitTermination()
-}
\ No newline at end of file
diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkHelper.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkHelper.kt
index d9b4823a..98fdae8d 100644
--- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkHelper.kt
+++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkHelper.kt
@@ -83,7 +83,6 @@ inline fun withSpark(builder: Builder, logLevel: SparkLogLevel = ERROR, func: KS
             KSparkSession(this).apply {
                 sparkContext.setLogLevel(logLevel)
                 func()
-                sc.stop()
                 spark.stop()
             }
         }
@@ -110,7 +109,7 @@
  */
 class KSparkSession(val spark: SparkSession) {
 
-    val sc: JavaSparkContext = JavaSparkContext(spark.sparkContext)
+    val sc: JavaSparkContext by lazy { JavaSparkContext(spark.sparkContext) }
 
     inline fun <reified T> List<T>.toDS() = toDS(spark)
     inline fun <reified T> Array<T>.toDS() = spark.dsOf(*this)

diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/VarArities.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/VarArities.kt
index a4b2bdd7..af870038 100644
--- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/VarArities.kt
+++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/VarArities.kt
@@ -22,32 +22,34 @@
  */
 package org.jetbrains.kotlinx.spark.api
 
-data class Arity1<T1>(val _1: T1)
-data class Arity2<T1, T2>(val _1: T1, val _2: T2)
-data class Arity3<T1, T2, T3>(val _1: T1, val _2: T2, val _3: T3)
-data class Arity4<T1, T2, T3, T4>(val _1: T1, val _2: T2, val _3: T3, val _4: T4)
-data class Arity5<T1, T2, T3, T4, T5>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5)
-data class Arity6<T1, T2, T3, T4, T5, T6>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6)
-data class Arity7<T1, T2, T3, T4, T5, T6, T7>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7)
-data class Arity8<T1, T2, T3, T4, T5, T6, T7, T8>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8)
-data class Arity9<T1, T2, T3, T4, T5, T6, T7, T8, T9>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9)
-data class Arity10<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10)
-data class Arity11<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11)
-data class Arity12<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12)
-data class Arity13<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13)
-data class Arity14<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14)
-data class Arity15<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15)
-data class Arity16<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16)
-data class Arity17<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17)
-data class Arity18<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18)
-data class Arity19<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19)
-data class Arity20<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20)
-data class Arity21<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20, val _21: T21)
-data class Arity22<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20, val _21: T21, val _22: T22)
-data class Arity23<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20, val _21: T21, val _22: T22, val _23: T23)
-data class Arity24<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20, val _21: T21, val _22: T22, val _23: T23, val _24: T24)
-data class Arity25<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24, T25>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20, val _21: T21, val _22: T22, val _23: T23, val _24: T24, val _25: T25)
-data class Arity26<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24, T25, T26>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20, val _21: T21, val _22: T22, val _23: T23, val _24: T24, val _25: T25, val _26: T26)
+import java.io.Serializable
+
+data class Arity1<T1>(val _1: T1): Serializable
+data class Arity2<T1, T2>(val _1: T1, val _2: T2): Serializable
+data class Arity3<T1, T2, T3>(val _1: T1, val _2: T2, val _3: T3): Serializable
+data class Arity4<T1, T2, T3, T4>(val _1: T1, val _2: T2, val _3: T3, val _4: T4): Serializable
+data class Arity5<T1, T2, T3, T4, T5>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5): Serializable
+data class Arity6<T1, T2, T3, T4, T5, T6>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6): Serializable
+data class Arity7<T1, T2, T3, T4, T5, T6, T7>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7): Serializable
+data class Arity8<T1, T2, T3, T4, T5, T6, T7, T8>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8): Serializable
+data class Arity9<T1, T2, T3, T4, T5, T6, T7, T8, T9>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9): Serializable
+data class Arity10<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10): Serializable
+data class Arity11<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11): Serializable
+data class Arity12<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12): Serializable
+data class Arity13<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13): Serializable
+data class Arity14<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14): Serializable
+data class Arity15<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15): Serializable
+data class Arity16<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16): Serializable
+data class Arity17<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17): Serializable
+data class Arity18<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18): Serializable
+data class Arity19<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19): Serializable
+data class Arity20<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20): Serializable
+data class Arity21<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20, val _21: T21): Serializable
+data class Arity22<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20, val _21: T21, val _22: T22): Serializable
+data class Arity23<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20, val _21: T21, val _22: T22, val _23: T23): Serializable
+data class Arity24<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20, val _21: T21, val _22: T22, val _23: T23, val _24: T24): Serializable
+data class Arity25<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24, T25>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20, val _21: T21, val _22: T22, val _23: T23, val _24: T24, val _25: T25): Serializable
+data class Arity26<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24, T25, T26>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20, val _21: T21, val _22: T22, val _23: T23, val _24: T24, val _25: T25, val _26: T26): Serializable
 fun <T1> c(_1: T1) = Arity1(_1)
 fun <T1, T2> c(_1: T1, _2: T2) = Arity2(_1, _2)
 fun <T1, T2, T3> c(_1: T1, _2: T2, _3: T3) = Arity3(_1, _2, _3)

diff --git a/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
index 936b5b2c..149e6500 100644
--- a/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
+++ b/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
@@ -634,11 +634,33 @@ class ApiTest : ShouldSpec({
             dataset3.show()
 
             dataset3.toList<Tuple2<Int, Double>>() shouldBe listOf(Tuple2(1, 1.0), Tuple2(2, 2.0), Tuple2(3, 3.0))
+
+            // Kotlin Serializable data class RDD
+            val rdd4 = sc.parallelize(
+                listOf(SomeClass(intArrayOf(1, 2), 0))
+            )
+            val dataset4 = rdd4.toDS()
+            dataset4.show()
+
+            dataset4.toList<SomeClass>().first().let { (a, b) ->
+                a contentEquals intArrayOf(1, 2) shouldBe true
+                b shouldBe 0
+            }
+
+            // Arity
+            val rdd5 = sc.parallelize(
+                listOf(c(1.0, 4))
+            )
+            val dataset5 = rdd5.toDS()
+            dataset5.show()
+
+            dataset5.toList<Arity2<Double, Int>>() shouldBe listOf(c(1.0, 4))
         }
     }
 }
 })
+
 data class DataClassWithTuple<T>(val tuple: T)
 
 data class LonLat(val lon: Double, val lat: Double)
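Making the Arity classes Serializable matters before any encoder is involved: RDD elements are
shipped between driver and executors with Java serialization by default, so an
RDD<Arity2<Double, Int>> of non-Serializable instances would typically fail with a
NotSerializableException at job execution, before toDS() is ever reached. With this patch an
Arity can round-trip through an RDD; a sketch assuming a local session:

    import org.jetbrains.kotlinx.spark.api.*

    fun main() = withSpark {
        // c(1.0, 4) builds an Arity2<Double, Int>, Serializable as of this patch,
        // so it can travel inside an RDD and then be encoded into a Dataset.
        val ds = sc.parallelize(listOf(c(1.0, 4), c(2.0, 8))).toDS()
        ds.show()
    }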
From 38486bbd5931ff4e9e9ee27696969c81798335df Mon Sep 17 00:00:00 2001
From: Jolan Rensen
Date: Wed, 23 Feb 2022 17:25:09 +0100
Subject: [PATCH 5/6] removed .show() from rdd test

---
 .../test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
index 149e6500..c8ad1a41 100644
--- a/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
+++ b/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
@@ -604,7 +604,6 @@ class ApiTest : ShouldSpec({
                 listOf(1, 2, 3, 4, 5, 6)
             ).rdd()
             val dataset0: Dataset<Int> = rdd0.toDS()
-            dataset0.show()
 
             dataset0.toList<Int>() shouldBe listOf(1, 2, 3, 4, 5, 6)
 
@@ -613,7 +612,6 @@ class ApiTest : ShouldSpec({
                 listOf(1, 2, 3, 4, 5, 6)
             )
             val dataset1: Dataset<Int> = rdd1.toDS()
-            dataset1.show()
 
             dataset1.toList<Int>() shouldBe listOf(1, 2, 3, 4, 5, 6)
 
@@ -622,7 +620,6 @@ class ApiTest : ShouldSpec({
                 listOf(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)
             )
             val dataset2: Dataset<Double> = rdd2.toDS()
-            dataset2.show()
 
             dataset2.toList<Double>() shouldBe listOf(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)
 
@@ -631,7 +628,6 @@ class ApiTest : ShouldSpec({
                 listOf(Tuple2(1, 1.0), Tuple2(2, 2.0), Tuple2(3, 3.0))
             )
             val dataset3: Dataset<Tuple2<Int, Double>> = rdd3.toDS()
-            dataset3.show()
 
             dataset3.toList<Tuple2<Int, Double>>() shouldBe listOf(Tuple2(1, 1.0), Tuple2(2, 2.0), Tuple2(3, 3.0))
 
@@ -640,7 +636,6 @@ class ApiTest : ShouldSpec({
                 listOf(SomeClass(intArrayOf(1, 2), 0))
             )
             val dataset4 = rdd4.toDS()
-            dataset4.show()
 
             dataset4.toList<SomeClass>().first().let { (a, b) ->
                 a contentEquals intArrayOf(1, 2) shouldBe true
@@ -652,7 +647,6 @@ class ApiTest : ShouldSpec({
                 listOf(c(1.0, 4))
             )
             val dataset5 = rdd5.toDS()
-            dataset5.show()
 
             dataset5.toList<Arity2<Double, Int>>() shouldBe listOf(c(1.0, 4))
         }
From bcf99b8fe3b3950e99a9b90cb17bcd1add8f810e Mon Sep 17 00:00:00 2001
From: Jolan Rensen
Date: Thu, 24 Feb 2022 15:07:42 +0100
Subject: [PATCH 6/6] split up rdd tests, added list test. Added kotlin
 official code style prop for later

---
 .../jetbrains/kotlinx/spark/api/ApiTest.kt | 30 ++++++++++++-------
 pom.xml                                    |  1 +
 2 files changed, 21 insertions(+), 10 deletions(-)

diff --git a/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
index c8ad1a41..06c5628f 100644
--- a/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
+++ b/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
@@ -598,22 +598,24 @@ class ApiTest : ShouldSpec({
                 it.nullable() shouldBe true
             }
         }
-        should("Easily convert a (Java)RDD to a Dataset") {
-            // scala RDD
+        should("Convert Scala RDD to Dataset") {
             val rdd0: RDD<Int> = sc.parallelize(
                 listOf(1, 2, 3, 4, 5, 6)
             ).rdd()
             val dataset0: Dataset<Int> = rdd0.toDS()
 
             dataset0.toList<Int>() shouldBe listOf(1, 2, 3, 4, 5, 6)
+        }
 
-            // normal JavaRDD
+        should("Convert a JavaRDD to a Dataset") {
             val rdd1: JavaRDD<Int> = sc.parallelize(
                 listOf(1, 2, 3, 4, 5, 6)
             )
             val dataset1: Dataset<Int> = rdd1.toDS()
 
             dataset1.toList<Int>() shouldBe listOf(1, 2, 3, 4, 5, 6)
+        }
 
+        should("Convert JavaDoubleRDD to Dataset") {
             // JavaDoubleRDD
             val rdd2: JavaDoubleRDD = sc.parallelizeDoubles(
@@ -622,16 +624,16 @@ class ApiTest : ShouldSpec({
                 listOf(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)
             )
             val dataset2: Dataset<Double> = rdd2.toDS()
 
             dataset2.toList<Double>() shouldBe listOf(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)
-
-            // JavaPairRDD
+        }
+        should("Convert JavaPairRDD to Dataset") {
             val rdd3: JavaPairRDD<Int, Double> = sc.parallelizePairs(
                 listOf(Tuple2(1, 1.0), Tuple2(2, 2.0), Tuple2(3, 3.0))
             )
             val dataset3: Dataset<Tuple2<Int, Double>> = rdd3.toDS()
 
             dataset3.toList<Tuple2<Int, Double>>() shouldBe listOf(Tuple2(1, 1.0), Tuple2(2, 2.0), Tuple2(3, 3.0))
-
-            // Kotlin Serializable data class RDD
+        }
+        should("Convert Kotlin Serializable data class RDD to Dataset") {
             val rdd4 = sc.parallelize(
                 listOf(SomeClass(intArrayOf(1, 2), 0))
             )
             val dataset4 = rdd4.toDS()
@@ -641,8 +643,8 @@ class ApiTest : ShouldSpec({
                 a contentEquals intArrayOf(1, 2) shouldBe true
                 b shouldBe 0
             }
-
-            // Arity
+        }
+        should("Convert Arity RDD to Dataset") {
             val rdd5 = sc.parallelize(
                 listOf(c(1.0, 4))
             )
@@ -650,6 +652,14 @@ class ApiTest : ShouldSpec({
 
             dataset5.toList<Arity2<Double, Int>>() shouldBe listOf(c(1.0, 4))
         }
+        should("Convert List RDD to Dataset") {
+            val rdd6 = sc.parallelize(
+                listOf(listOf(1, 2, 3), listOf(4, 5, 6))
+            )
+            val dataset6 = rdd6.toDS()
+
+            dataset6.toList<List<Int>>() shouldBe listOf(listOf(1, 2, 3), listOf(4, 5, 6))
+        }
     }
 }
 })
@@ -684,5 +694,5 @@ data class ComplexEnumDataClass(
 
 data class NullFieldAbleDataClass(
     val optionList: List<Int>?,
-    val optionMap: Map<String, Int>?
+    val optionMap: Map<String, Int>?,
 )
\ No newline at end of file

diff --git a/pom.xml b/pom.xml
index 47043737..2ced5eb7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -32,6 +32,7 @@
         3.0.0-M5
         1.6.8
         4.5.6
+        <kotlin.code.style>official</kotlin.code.style>