Skip to content

Commit

Permalink
Use LinkedIn spark to build spark runner
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyuiscool committed Apr 9, 2020
1 parent 95f0bd9 commit ff0c00d
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 15 deletions.
27 changes: 22 additions & 5 deletions runners/spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ evaluationDependsOn(":sdks:java:core")
evaluationDependsOn(":sdks:java:io:hadoop-format")
evaluationDependsOn(":runners:core-java")

repositories {
maven {
url "https://artifactory.corp.linkedin.com:8083/artifactory/DDS/"
}
maven {
url "https://artifactory.corp.linkedin.com:8083/artifactory/mintdev-publish-repo/"
}
maven {
url "file://" + System.getProperty("user.home") + "/local-repo"
}
}

configurations {
validatesRunner
}
Expand All @@ -55,6 +67,9 @@ test {
}
}

// Use LinkedIn spark
def spark_version = "2.3.0.263"

dependencies {
compile project(path: ":model:pipeline", configuration: "shadow")
compile project(path: ":sdks:java:core", configuration: "shadow")
Expand All @@ -65,10 +80,12 @@ dependencies {
compile library.java.slf4j_api
compile library.java.joda_time
compile library.java.args4j
provided library.java.spark_core
provided library.java.spark_sql
provided library.java.spark_streaming
provided library.java.spark_network_common

compile "com.linkedin.spark:spark-core_2.11:$spark_version"
compile "com.linkedin.spark:spark-sql_2.11:$spark_version"
compile "com.linkedin.spark:spark-streaming_2.11:$spark_version"
compile "com.linkedin.spark:spark-network-common_2.11:$spark_version"

provided library.java.hadoop_common
provided library.java.commons_io
provided library.java.hamcrest_core
Expand Down Expand Up @@ -235,7 +252,7 @@ task validatesRunner {
description "Validates Spark runner"
dependsOn validatesRunnerBatch
dependsOn validatesRunnerStreaming
// It should be uncommented once all "validatesStructuredStreamingRunnerBatch" tests will pass.
// It should be uncommented once all "validatesStructuredStreamingRunnerBatch" tests will pass.
// Otherwise, it breaks Spark runner ValidatesRunner tests.
//dependsOn validatesStructuredStreamingRunnerBatch
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.NonSQLExpression;
import org.apache.spark.sql.catalyst.expressions.UnaryExpression;
import org.apache.spark.sql.catalyst.expressions.codegen.Block;
import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator;
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext;
import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode;
import org.apache.spark.sql.types.DataType;
Expand Down Expand Up @@ -94,7 +92,7 @@ public Expression child() {
public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) {
String accessCode = ctx.addReferenceObj("coder", coder, coder.getClass().getName());
ExprCode input = child.genCode(ctx);
String javaType = CodeGenerator.javaType(dataType());
String javaType = ctx.javaType(dataType());

List<String> parts = new ArrayList<>();
List<Object> args = new ArrayList<>();
Expand All @@ -117,10 +115,12 @@ public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) {

StringContext sc =
new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
Block code =
(new Block.BlockHelper(sc)).code(JavaConversions.collectionAsScalaIterable(args).toSeq());
// Block code =
// (new
// Block.BlockHelper(sc)).code(JavaConversions.collectionAsScalaIterable(args).toSeq());
String code = sc.s(JavaConversions.collectionAsScalaIterable(args).toSeq());

return ev.copy(input.code().$plus(code), input.isNull(), ev.value());
return ev.copy(input.code() + "\n" + code, input.isNull(), ev.value());
}

@Override
Expand Down Expand Up @@ -203,7 +203,7 @@ public Expression child() {
public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) {
String accessCode = ctx.addReferenceObj("coder", coder, coder.getClass().getName());
ExprCode input = child.genCode(ctx);
String javaType = CodeGenerator.javaType(dataType());
String javaType = ctx.javaType(dataType());

List<String> parts = new ArrayList<>();
List<Object> args = new ArrayList<>();
Expand All @@ -228,9 +228,11 @@ public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) {

StringContext sc =
new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
Block code =
(new Block.BlockHelper(sc)).code(JavaConversions.collectionAsScalaIterable(args).toSeq());
return ev.copy(input.code().$plus(code), input.isNull(), ev.value());
// Block code =
// (new
// Block.BlockHelper(sc)).code(JavaConversions.collectionAsScalaIterable(args).toSeq());
String code = sc.s(JavaConversions.collectionAsScalaIterable(args).toSeq());
return ev.copy(input.code() + "\n" + code, input.isNull(), ev.value());
}

@Override
Expand Down

0 comments on commit ff0c00d

Please sign in to comment.