Skip to content

Commit

Permalink
Revert li spark changes (apache#4)
Browse files Browse the repository at this point in the history
* Revert "Use LinkedIn spark for job-server too"

This reverts commit b086a72.

* Revert "Use LinkedIn spark to build spark runner"

This reverts commit ff0c00d.

* Checkstyle fixes
  • Loading branch information
dxichen authored Aug 17, 2021
1 parent 849b03c commit dc7d41b
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 69 deletions.
37 changes: 4 additions & 33 deletions runners/spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,6 @@ evaluationDependsOn(":sdks:java:core")
evaluationDependsOn(":sdks:java:io:hadoop-format")
evaluationDependsOn(":runners:core-java")

repositories {
maven {
url "https://artifactory.corp.linkedin.com:8083/artifactory/DDS/"
metadataSources {
mavenPom()
artifact()
}
}
maven {
url "https://artifactory.corp.linkedin.com:8083/artifactory/mintdev-publish-repo/"
metadataSources {
mavenPom()
artifact()
}
}
maven {
url "file://" + System.getProperty("user.home") + "/local-repo"
metadataSources {
mavenPom()
artifact()
}
}
}

configurations {
validatesRunner
}
Expand Down Expand Up @@ -92,9 +68,6 @@ test {
}
}

// Use LinkedIn spark
def spark_version = "2.3.0.263"

dependencies {
compile project(path: ":model:pipeline", configuration: "shadow")
compile project(path: ":sdks:java:core", configuration: "shadow")
Expand All @@ -106,12 +79,10 @@ dependencies {
compile library.java.slf4j_api
compile library.java.joda_time
compile library.java.args4j

compile "com.linkedin.spark:spark-core_2.11:$spark_version"
compile "com.linkedin.spark:spark-sql_2.11:$spark_version"
compile "com.linkedin.spark:spark-streaming_2.11:$spark_version"
compile "com.linkedin.spark:spark-network-common_2.11:$spark_version"

provided library.java.spark_core
provided library.java.spark_sql
provided library.java.spark_streaming
provided library.java.spark_network_common
provided library.java.hadoop_common
provided library.java.commons_io
provided library.java.hamcrest_core
Expand Down
24 changes: 0 additions & 24 deletions runners/spark/job-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,6 @@ apply plugin: 'application'
// we need to set mainClassName before applying shadow plugin
mainClassName = "org.apache.beam.runners.spark.SparkJobServerDriver"

repositories {
maven {
url "https://artifactory.corp.linkedin.com:8083/artifactory/DDS/"
metadataSources {
mavenPom()
artifact()
}
}
maven {
url "https://artifactory.corp.linkedin.com:8083/artifactory/mintdev-publish-repo/"
metadataSources {
mavenPom()
artifact()
}
}
maven {
url "file://" + System.getProperty("user.home") + "/local-repo"
metadataSources {
mavenPom()
artifact()
}
}
}

applyJavaNature(
automaticModuleName: 'org.apache.beam.runners.spark.jobserver',
validateShadowJar: false,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/** Spark sinks that supports beam aggregate metrics. */
package org.apache.beam.runners.spark.aggregators.metrics.sink;
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.NonSQLExpression;
import org.apache.spark.sql.catalyst.expressions.UnaryExpression;
import org.apache.spark.sql.catalyst.expressions.codegen.Block;
import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator;
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext;
import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode;
import org.apache.spark.sql.types.DataType;
Expand Down Expand Up @@ -95,7 +97,7 @@ public Expression child() {
public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) {
String accessCode = ctx.addReferenceObj("coder", coder, coder.getClass().getName());
ExprCode input = child.genCode(ctx);
String javaType = ctx.javaType(dataType());
String javaType = CodeGenerator.javaType(dataType());

List<String> parts = new ArrayList<>();
List<Object> args = new ArrayList<>();
Expand All @@ -118,12 +120,10 @@ public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) {

StringContext sc =
new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
// Block code =
// (new
// Block.BlockHelper(sc)).code(JavaConversions.collectionAsScalaIterable(args).toSeq());
String code = sc.s(JavaConversions.collectionAsScalaIterable(args).toSeq());
Block code =
(new Block.BlockHelper(sc)).code(JavaConversions.collectionAsScalaIterable(args).toSeq());

return ev.copy(input.code() + "\n" + code, input.isNull(), ev.value());
return ev.copy(input.code().$plus(code), input.isNull(), ev.value());
}

@Override
Expand Down Expand Up @@ -206,7 +206,7 @@ public Expression child() {
public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) {
String accessCode = ctx.addReferenceObj("coder", coder, coder.getClass().getName());
ExprCode input = child.genCode(ctx);
String javaType = ctx.javaType(dataType());
String javaType = CodeGenerator.javaType(dataType());

List<String> parts = new ArrayList<>();
List<Object> args = new ArrayList<>();
Expand All @@ -231,11 +231,9 @@ public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) {

StringContext sc =
new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
// Block code =
// (new
// Block.BlockHelper(sc)).code(JavaConversions.collectionAsScalaIterable(args).toSeq());
String code = sc.s(JavaConversions.collectionAsScalaIterable(args).toSeq());
return ev.copy(input.code() + "\n" + code, input.isNull(), ev.value());
Block code =
(new Block.BlockHelper(sc)).code(JavaConversions.collectionAsScalaIterable(args).toSeq());
return ev.copy(input.code().$plus(code), input.isNull(), ev.value());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@
<Match>
<Package name="org.apache.beam.sdk.extensions.sql.impl.parser.impl"/>
</Match>
<Match>
<Package name="org.apache.beam.runners.spark.aggregators.metrics.sink"/>
</Match>
<Match>
<Package name="org.apache.beam.sdk.schemas.parser.generated"/>
</Match>
Expand Down

0 comments on commit dc7d41b

Please sign in to comment.