forked from apache/beam
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request apache#5 from seznam/pete/examples
Add euphoria-examples
- Loading branch information
Showing
5 changed files
with
344 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
101 changes: 101 additions & 0 deletions
101
sdks/java/extensions/euphoria/euphoria-examples/pom.xml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<!-- | ||
Copyright 2016 Seznam.cz, a.s. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
--> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<parent> | ||
<artifactId>euphoria-parent</artifactId> | ||
<groupId>cz.seznam.euphoria</groupId> | ||
<version>0.5.0-SNAPSHOT</version> | ||
</parent> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<artifactId>euphoria-examples</artifactId> | ||
<packaging>jar</packaging> | ||
<name>${project.groupId}:${project.artifactId}</name> | ||
<description>Example programs demonstrating the Euphoria API.</description> | ||
|
||
<!--
  Shaded jar: the shade goal runs in the package phase and writes the result
  to the "assembly" directory under the name ${project.artifactId}. No
  dependency-reduced POM is generated.
-->
<build> | ||
<plugins> | ||
<plugin> | ||
<groupId>org.apache.maven.plugins</groupId> | ||
<artifactId>maven-shade-plugin</artifactId> | ||
<version>2.4.3</version> | ||
<executions> | ||
<execution> | ||
<phase>package</phase> | ||
<goals> | ||
<goal>shade</goal> | ||
</goals> | ||
<configuration> | ||
<finalName>${project.artifactId}</finalName> | ||
<outputDirectory>assembly</outputDirectory> | ||
<createDependencyReducedPom>false</createDependencyReducedPom> | ||
<transformers> | ||
<!-- merge META-INF/services/* (in particular from hadoop-* dependencies) --> | ||
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> | ||
<!-- merge LICENSE and LICENSE.txt --> | ||
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer" /> | ||
<!-- assemble NOTICE --> | ||
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer" /> | ||
</transformers> | ||
</configuration> | ||
</execution> | ||
</executions> | ||
</plugin> | ||
</plugins> | ||
</build> | ||
|
||
<dependencies> | ||
<!--
  Euphoria modules used by the examples. No version is declared here;
  presumably it is managed by the euphoria-parent POM (confirm there).
-->
<dependency> | ||
<groupId>cz.seznam.euphoria</groupId> | ||
<artifactId>euphoria-core</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>cz.seznam.euphoria</groupId> | ||
<artifactId>euphoria-hadoop</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>cz.seznam.euphoria</groupId> | ||
<artifactId>euphoria-flink</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>cz.seznam.euphoria</groupId> | ||
<artifactId>euphoria-spark</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>cz.seznam.euphoria</groupId> | ||
<artifactId>euphoria-inmem</artifactId> | ||
</dependency> | ||
|
||
<!--
  Declared with "provided" scope: available at compile time (the examples
  reference SparkConf and KryoSerializer) and expected to be supplied by the
  environment at run time.
  NOTE(review): the _2.10 Scala suffix and the 2.0.1 version presumably have
  to match what euphoria-spark is built against; confirm.
-->
<dependency> | ||
<groupId>org.apache.spark</groupId> | ||
<artifactId>spark-core_2.10</artifactId> | ||
<version>2.0.1</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
|
||
<!-- Test-only dependency; version presumably managed by the parent POM. -->
<dependency> | ||
<groupId>junit</groupId> | ||
<artifactId>junit</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
</dependencies> | ||
|
||
</project> |
106 changes: 106 additions & 0 deletions
106
...sions/euphoria/euphoria-examples/src/main/java/cz/seznam/euphoria/examples/Executors.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
/** | ||
* Copyright 2016 Seznam.cz, a.s. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package cz.seznam.euphoria.examples; | ||
|
||
import cz.seznam.euphoria.core.executor.Executor; | ||
import cz.seznam.euphoria.flink.FlinkExecutor; | ||
import cz.seznam.euphoria.flink.TestFlinkExecutor; | ||
import cz.seznam.euphoria.inmem.InMemExecutor; | ||
import cz.seznam.euphoria.spark.SparkExecutor; | ||
import cz.seznam.euphoria.spark.TestSparkExecutor; | ||
import org.apache.spark.SparkConf; | ||
import org.apache.spark.serializer.KryoSerializer; | ||
|
||
import java.io.IOException; | ||
|
||
/** | ||
* A collection of helpers for easy allocation/creation of a specific executor. | ||
*/ | ||
public class Executors { | ||
|
||
private interface Factory { | ||
Executor create() throws IOException; | ||
} | ||
|
||
private static class InMemFactory implements Factory { | ||
@Override | ||
public Executor create() throws IOException { | ||
return new InMemExecutor(); | ||
} | ||
} | ||
|
||
private static class SparkFactory implements Factory { | ||
private final boolean test; | ||
SparkFactory(boolean test) { | ||
this.test = test; | ||
} | ||
@Override | ||
public Executor create() { | ||
if (test) { | ||
return new TestSparkExecutor(); | ||
} else { | ||
SparkConf conf = new SparkConf(); | ||
conf.set("spark.serializer", KryoSerializer.class.getName()); | ||
return new SparkExecutor(conf); | ||
} | ||
} | ||
} | ||
|
||
private static class FlinkFactory implements Factory { | ||
private final boolean test; | ||
FlinkFactory(boolean test) { | ||
this.test = test; | ||
} | ||
@Override | ||
public Executor create() throws IOException { | ||
if (test) { | ||
return new TestFlinkExecutor(); | ||
} else { | ||
return new FlinkExecutor(); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Creates an executor by name or fails if the specified name is not recognized. | ||
*/ | ||
public static Executor createExecutor(String executorName) throws IOException { | ||
// ~ be sure to go through factories to leverage java lazy class loading; | ||
// this avoids for example loading spark dependencies in a flink environment | ||
final Factory f; | ||
switch (executorName) { | ||
case "inmem": | ||
f = new InMemFactory(); | ||
break; | ||
case "flink-test": | ||
f = new FlinkFactory(true); | ||
break; | ||
case "flink": | ||
f = new FlinkFactory(false); | ||
break; | ||
case "spark-test": | ||
f = new SparkFactory(true); | ||
break; | ||
case "spark": | ||
f = new SparkFactory(false); | ||
break; | ||
default: | ||
throw new IllegalArgumentException("Executor not supported: " + executorName); | ||
} | ||
return f.create(); | ||
} | ||
|
||
} |
126 changes: 126 additions & 0 deletions
126
...uphoria-examples/src/main/java/cz/seznam/euphoria/examples/wordcount/SimpleWordCount.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
/** | ||
* Copyright 2016 Seznam.cz, a.s. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package cz.seznam.euphoria.examples.wordcount; | ||
|
||
import cz.seznam.euphoria.core.client.dataset.Dataset; | ||
import cz.seznam.euphoria.core.client.flow.Flow; | ||
import cz.seznam.euphoria.core.client.io.Context; | ||
import cz.seznam.euphoria.core.client.io.StdoutSink; | ||
import cz.seznam.euphoria.core.client.operator.FlatMap; | ||
import cz.seznam.euphoria.core.client.operator.MapElements; | ||
import cz.seznam.euphoria.core.client.operator.ReduceByKey; | ||
import cz.seznam.euphoria.core.client.util.Pair; | ||
import cz.seznam.euphoria.core.client.util.Sums; | ||
import cz.seznam.euphoria.core.executor.Executor; | ||
import cz.seznam.euphoria.core.util.Settings; | ||
import cz.seznam.euphoria.examples.Executors; | ||
import cz.seznam.euphoria.hadoop.input.SimpleHadoopTextFileSource; | ||
import cz.seznam.euphoria.hadoop.output.SimpleHadoopTextFileSink; | ||
|
||
import java.net.URI; | ||
import java.util.regex.Pattern; | ||
|
||
/** | ||
* Demonstrates a very simple word-count supporting batched input. | ||
* | ||
* Example usage on flink: | ||
* <pre>{@code | ||
* $ flink run -m yarn-cluster \ | ||
* -yn 1 -ys 2 -ytm 800 \ | ||
* -c cz.seznam.euphoria.examples.wordcount.SimpleWordCount \ | ||
* euphoria-examples/assembly/euphoria-examples.jar \ | ||
* "flink" \ | ||
* "hdfs:///tmp/swc-input" \ | ||
* "hdfs:///tmp/swc-output" \ | ||
* "2" | ||
*}</pre> | ||
* | ||
* Example usage on spark: | ||
* <pre>{@code | ||
* $ spark-submit --verbose --deploy-mode cluster \ | ||
* --master yarn \ | ||
* --executor-memory 1g \ | ||
* --num-executors 1 \ | ||
* --class cz.seznam.euphoria.examples.wordcount.SimpleWordCount \ | ||
* euphoria-examples/assembly/euphoria-examples.jar \ | ||
* "spark" \ | ||
* "hdfs:///tmp/swc-input" \ | ||
* "hdfs:///tmp/swc-output" \ | ||
* "1" | ||
* }</pre> | ||
*/ | ||
public class SimpleWordCount { | ||
|
||
private static final Pattern SPLIT_RE = Pattern.compile("\\s+"); | ||
|
||
public static void main(String[] args) throws Exception { | ||
if (args.length < 3) { | ||
System.err.println("Usage: " + SimpleWordCount.class | ||
+ " <executor-name> <input-uri> <output-uri> [num-output-partitions]"); | ||
System.exit(1); | ||
} | ||
|
||
Settings settings = new Settings(); | ||
settings.setClass("euphoria.io.datasource.factory.file", SimpleHadoopTextFileSource.Factory.class); | ||
settings.setClass("euphoria.io.datasource.factory.hdfs", SimpleHadoopTextFileSource.Factory.class); | ||
settings.setClass("euphoria.io.datasink.factory.file", SimpleHadoopTextFileSink.Factory.class); | ||
settings.setClass("euphoria.io.datasink.factory.hdfs", SimpleHadoopTextFileSink.Factory.class); | ||
settings.setClass("euphoria.io.datasink.factory.stdout", StdoutSink.Factory.class); | ||
|
||
final String executorName = args[0]; | ||
final String input = args[1]; | ||
final String output = args[2]; | ||
final int partitions = args.length > 3 ? Integer.parseInt(args[3]) : -1; | ||
|
||
Flow flow = buildFlow(settings, URI.create(input), URI.create(output), partitions); | ||
|
||
Executor executor = Executors.createExecutor(executorName); | ||
executor.submit(flow).get(); | ||
} | ||
|
||
private static Flow buildFlow(Settings settings, URI input, URI output, int partitions) | ||
throws Exception | ||
{ | ||
Flow flow = Flow.create(SimpleWordCount.class.getSimpleName(), settings); | ||
|
||
Dataset<String> lines = flow.createInput(input); | ||
|
||
// split lines to words | ||
Dataset<String> words = FlatMap.named("TOKENIZER") | ||
.of(lines) | ||
.using((String line, Context<String> c) -> | ||
SPLIT_RE.splitAsStream(line).forEachOrdered(c::collect)) | ||
.output(); | ||
|
||
// count per word | ||
Dataset<Pair<String, Long>> counted = ReduceByKey.named("REDUCE") | ||
.of(words) | ||
.keyBy(e -> e) | ||
.valueBy(e -> 1L) | ||
.combineBy(Sums.ofLongs()) | ||
.applyIf(partitions > 0, op -> op.setNumPartitions(partitions)) | ||
.output(); | ||
|
||
// format output | ||
MapElements.named("FORMAT") | ||
.of(counted) | ||
.using(p -> p.getKey() + "\t" + p.getSecond()) | ||
.output() | ||
.persist(output); | ||
|
||
return flow; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters