Skip to content

Commit

Permalink
fixed streaming test scala 2.12
Browse files Browse the repository at this point in the history
  • Loading branch information
Jolanrensen committed Mar 25, 2024
1 parent 37eec64 commit 0ab212b
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class SparkKotlinCompilerGradlePlugin : KotlinCompilerPluginSupportPlugin {
it.extensions.findByType<KotlinJvmProjectExtension>()?.apply {
compilerOptions {
// Make sure the parameters of data classes are visible to scala
javaParameters.set(true)
// javaParameters.set(true)

// Avoid NotSerializableException by making lambdas serializable
freeCompilerArgs.add("-Xlambdas=class")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,13 @@ private fun checkpointFile(checkpointDir: String, checkpointTime: Time): Path {
private fun getCheckpointFiles(
checkpointDir: String,
fs: scala.Option<FileSystem>
): scala.collection.immutable.Seq<Path> {
): scala.collection.Seq<Path> {
val klass = Class.forName("org.apache.spark.streaming.Checkpoint$")
val moduleField = klass.getField("MODULE$").also { it.isAccessible = true }
val module = moduleField.get(null)
val getCheckpointFilesMethod = klass.getMethod("getCheckpointFiles", String::class.java, scala.Option::class.java)
.also { it.isAccessible = true }
return getCheckpointFilesMethod.invoke(module, checkpointDir, fs) as scala.collection.immutable.Seq<Path>
return getCheckpointFilesMethod.invoke(module, checkpointDir, fs) as scala.collection.Seq<Path>
}

private fun createCorruptedCheckpoint(): String {
Expand Down

0 comments on commit 0ab212b

Please sign in to comment.