From cf430c91ce13d0b10d70696d49bd3119ab3b2deb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1ra=20Van=C4=9Bk?= Date: Fri, 17 Feb 2017 11:33:55 +0100 Subject: [PATCH] #18 Use @Nullable annotation --- .../extensions/euphoria/euphoria-core/pom.xml | 5 ++++ .../euphoria/core/client/dataset/Dataset.java | 5 ++++ .../core/client/dataset/InputDataset.java | 5 +++- .../core/client/dataset/OutputDataset.java | 6 +++++ .../client/dataset/windowing/Session.java | 2 ++ .../core/client/dataset/windowing/Time.java | 3 +++ .../euphoria/core/client/flow/Flow.java | 9 ++++--- .../euphoria/core/client/flow/Util.java | 5 +++- .../euphoria/core/client/graph/Node.java | 6 +++-- .../core/client/operator/CountByKey.java | 19 ++++++++------ .../core/client/operator/Distinct.java | 18 ++++++++----- .../euphoria/core/client/operator/Join.java | 25 +++++++++++-------- .../client/operator/PartitioningAware.java | 5 ++-- .../core/client/operator/ReduceByKey.java | 11 +++++--- .../client/operator/ReduceStateByKey.java | 21 ++++++++-------- .../core/client/operator/ReduceWindow.java | 18 +++++++------ .../client/operator/SingleInputOperator.java | 4 +-- .../StateAwareWindowWiseOperator.java | 6 +++-- ...ateAwareWindowWiseSingleInputOperator.java | 5 ++-- .../core/client/operator/SumByKey.java | 19 ++++++++------ .../core/client/operator/TopPerKey.java | 11 +++++--- .../client/operator/WindowWiseOperator.java | 11 +++++--- .../euphoria/core/client/util/Either.java | 6 +++-- .../core/executor/greduce/TimerSupport.java | 5 +++- .../seznam/euphoria/core/util/Settings.java | 5 +++- .../euphoria/euphoria-flink/pom.xml | 6 +++++ .../seznam/euphoria/flink/FlinkExecutor.java | 7 ++++-- .../seznam/euphoria/flink/FlowTranslator.java | 4 ++- .../batch/BatchStateStorageProvider.java | 3 +++ .../flink/streaming/io/DataSourceWrapper.java | 1 - .../streaming/windowing/StreamWindower.java | 3 ++- .../euphoria/euphoria-hadoop/pom.xml | 5 ++++ .../euphoria/hadoop/input/HadoopSource.java | 2 ++ .../hadoop/output/DataSinkOutputFormat.java | 3 +++ .../euphoria/euphoria-inmem/pom.xml | 5 ++++ .../euphoria/euphoria-kafka/pom.xml | 5 ++++ .../cz/seznam/euphoria/kafka/KafkaSource.java | 14 +++++++---- .../cz/seznam/euphoria/kafka/KafkaUtils.java | 5 ++-- .../euphoria-operator-testkit/pom.xml | 5 ++++ .../euphoria/euphoria-spark/pom.xml | 5 ++++ .../euphoria/spark/ReduceByKeyTranslator.java | 4 ++- .../spark/ReduceStateByKeyTranslator.java | 4 ++- .../euphoria/spark/SparkFlowTranslator.java | 4 ++- sdks/java/extensions/euphoria/pom.xml | 8 +++++- 44 files changed, 230 insertions(+), 98 deletions(-) diff --git a/sdks/java/extensions/euphoria/euphoria-core/pom.xml b/sdks/java/extensions/euphoria/euphoria-core/pom.xml index 7079901e45d31..7ccaefc3d0b65 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/pom.xml +++ b/sdks/java/extensions/euphoria/euphoria-core/pom.xml @@ -41,6 +41,11 @@ guava + + com.google.code.findbugs + jsr305 + + org.slf4j slf4j-api diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java index f632e31d2f517..d03898ddccef1 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java @@ -20,6 +20,7 @@ import cz.seznam.euphoria.core.client.io.DataSource; import cz.seznam.euphoria.core.client.operator.Operator; +import javax.annotation.Nullable; import java.io.Serializable; import java.net.URI; import java.util.Collection; @@ -44,11 +45,13 @@ public interface Dataset extends Serializable { * * @return this dataset's explicit source - if any */ + @Nullable DataSource getSource(); /** * @return the operator that produced this dataset - if any */ + @Nullable Operator getProducer(); /** @@ -105,6 +108,7 @@ default void checkpoint(URI uri) throws Exception { * data set is supposed to be persisted to, otherwise the * sink provided through {@link #persist(DataSink)}. */ + @Nullable default DataSink getOutputSink() { return null; } @@ -115,6 +119,7 @@ default DataSink getOutputSink() { * @return {@code null} if no checkpoint sink has been defined, * otherwise the sink provided through {@link #checkpoint(DataSink)} */ + @Nullable default DataSink getCheckpointSink() { return null; } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/InputDataset.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/InputDataset.java index 0ab13bb80ac27..4a777ecd2acf2 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/InputDataset.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/InputDataset.java @@ -15,11 +15,12 @@ */ package cz.seznam.euphoria.core.client.dataset; -import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.io.DataSink; import cz.seznam.euphoria.core.client.io.DataSource; import cz.seznam.euphoria.core.client.operator.Operator; + +import javax.annotation.Nullable; import java.util.Collection; /** @@ -37,11 +38,13 @@ public InputDataset(Flow flow, DataSource source, boolean bounded) { this.bounded = bounded; } + @Nullable @Override public DataSource getSource() { return source; } + @Nullable @Override public Operator getProducer() { return null; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/OutputDataset.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/OutputDataset.java index c287e73c08b83..612881431e39c 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/OutputDataset.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/OutputDataset.java @@ -19,6 +19,8 @@ import cz.seznam.euphoria.core.client.io.DataSink; import cz.seznam.euphoria.core.client.io.DataSource; import cz.seznam.euphoria.core.client.operator.Operator; + +import javax.annotation.Nullable; import java.util.Collection; /** @@ -39,11 +41,13 @@ public OutputDataset(Flow flow, Operator producer, boolean bounded) { this.bounded = bounded; } + @Nullable @Override public DataSource getSource() { return null; } + @Nullable @Override public Operator getProducer() { return producer; @@ -59,11 +63,13 @@ public void checkpoint(DataSink sink) { checkpointSink = sink; } + @Nullable @Override public DataSink getOutputSink() { return outputSink; } + @Nullable @Override public DataSink getCheckpointSink() { return checkpointSink; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/Session.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/Session.java index 9b519b42a5ae5..e2df7054c05d2 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/Session.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/Session.java @@ -22,6 +22,7 @@ import cz.seznam.euphoria.core.client.triggers.Trigger; import cz.seznam.euphoria.core.client.util.Pair; +import javax.annotation.Nullable; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -38,6 +39,7 @@ public final class Session implements MergingWindowing { private final long gapDurationMillis; + @Nullable private Duration earlyTriggeringPeriod; public static Session of(Duration gapDuration) { diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/Time.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/Time.java index 4266d1f53e0b4..6633f720dc01a 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/Time.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/Time.java @@ -21,6 +21,7 @@ import cz.seznam.euphoria.core.client.triggers.TimeTrigger; import cz.seznam.euphoria.core.client.triggers.Trigger; +import javax.annotation.Nullable; import java.time.Duration; import java.util.Arrays; import java.util.Objects; @@ -34,6 +35,7 @@ public class Time implements Windowing { private final long durationMillis; + @Nullable private Duration earlyTriggeringPeriod; public static Time of(Duration duration) { @@ -79,6 +81,7 @@ public Trigger getTrigger() { return new TimeTrigger(); } + @Nullable public Duration getEarlyTriggeringPeriod() { return earlyTriggeringPeriod; } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/flow/Flow.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/flow/Flow.java index 7646480a08578..7f2addc0b8d2d 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/flow/Flow.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/flow/Flow.java @@ -25,6 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.io.IOException; import java.io.ObjectOutputStream; import java.io.OutputStream; @@ -88,7 +89,7 @@ public class Flow implements Serializable { = new HashMap<>(); - private Flow(String name, Settings settings) { + private Flow(@Nullable String name, Settings settings) { this.name = name == null ? "" : name; this.settings = cloneSettings(settings); } @@ -110,7 +111,7 @@ public static Flow create() { * * @return a newly created flow */ - public static Flow create(String flowName) { + public static Flow create(@Nullable String flowName) { return new Flow(flowName, new Settings()); } @@ -157,7 +158,7 @@ public > T add(T operator) { * * @return the added operator */ - > T add(T operator, String logicalName) { + > T add(T operator, @Nullable String logicalName) { operatorNames.put(operator, buildOperatorName(operator, logicalName)); operators.add(operator); @@ -211,7 +212,7 @@ private void validateSerializable(Operator o) { } } - private String buildOperatorName(Operator op, String logicalName) { + private String buildOperatorName(Operator op, @Nullable String logicalName) { StringBuilder sb = new StringBuilder(64); sb.append(op.getName()).append('@').append(operatorNames.size() + 1); logicalName = Util.trimToNull(logicalName); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/flow/Util.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/flow/Util.java index a9f0a432ebdba..f493a277b2922 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/flow/Util.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/flow/Util.java @@ -15,6 +15,8 @@ */ package cz.seznam.euphoria.core.client.flow; +import javax.annotation.Nullable; + class Util { /** @@ -23,7 +25,8 @@ class Util { * @param s input string * @return non-empty trimmed string or null */ - static String trimToNull(String s) { + @Nullable + static String trimToNull(@Nullable String s) { if (s == null) { return null; } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/graph/Node.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/graph/Node.java index 6398108f9c079..ffab7afcfa642 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/graph/Node.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/graph/Node.java @@ -15,6 +15,7 @@ */ package cz.seznam.euphoria.core.client.graph; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -26,17 +27,18 @@ public final class Node { final List> children = new ArrayList<>(); + @Nullable final T value; final List> parents = new ArrayList<>(); @SuppressWarnings("unchecked") private static final Node NULL_NODE = new Node(null); - Node(T value) { + Node(@Nullable T value) { this.value = value; } - Node(T value, List> parents) { + Node(@Nullable T value, List> parents) { this(value); this.parents.addAll(parents); } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java index 64604c78a5294..31b045e9db5b0 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java @@ -26,6 +26,7 @@ import cz.seznam.euphoria.core.client.graph.DAG; import cz.seznam.euphoria.core.client.util.Pair; +import javax.annotation.Nullable; import java.util.Objects; /** @@ -97,15 +98,17 @@ public static class OutputBuilder private final String name; private final Dataset input; private final UnaryFunction keyExtractor; + @Nullable private final Windowing windowing; + @Nullable private final UnaryFunction eventTimeAssigner; OutputBuilder(String name, Dataset input, UnaryFunction keyExtractor, - Windowing windowing /* optional */, - UnaryFunction eventTimeAssigner /* optional */, + @Nullable Windowing windowing, + @Nullable UnaryFunction eventTimeAssigner, PartitioningBuilder partitioning) { //initialize partitioning @@ -137,12 +140,12 @@ public static OfBuilder named(String name) { } CountByKey(String name, - Flow flow, - Dataset input, - UnaryFunction extractor, - Windowing windowing /* optional */, - UnaryFunction eventTimeAssigner /* optional */, - Partitioning partitioning) { + Flow flow, + Dataset input, + UnaryFunction extractor, + @Nullable Windowing windowing, + @Nullable UnaryFunction eventTimeAssigner, + Partitioning partitioning) { super(name, flow, input, extractor, windowing, eventTimeAssigner, partitioning); } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java index ae2b98877dd4d..e36d4e66ab8ca 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java @@ -27,6 +27,7 @@ import cz.seznam.euphoria.core.client.graph.DAG; import cz.seznam.euphoria.core.client.util.Pair; +import javax.annotation.Nullable; import java.util.Objects; /** @@ -64,11 +65,13 @@ public static class WindowingBuilder { private final String name; private final Dataset input; + @Nullable private final UnaryFunction mapper; + WindowingBuilder( String name, Dataset input, - UnaryFunction mapper /* optional */) { + @Nullable UnaryFunction mapper) { // define default partitioning super(new DefaultPartitioning<>(input.getNumPartitions())); @@ -100,16 +103,19 @@ public static class OutputBuilder { private final String name; private final Dataset input; + @Nullable private final UnaryFunction mapper; + @Nullable private final Windowing windowing; + @Nullable private final UnaryFunction eventTimeAssigner; OutputBuilder(String name, Dataset input, - UnaryFunction mapper /* optional */, + @Nullable UnaryFunction mapper, PartitioningBuilder partitioning, - Windowing windowing /* optional */, - UnaryFunction eventTimeAssigner /* optional */) { + @Nullable Windowing windowing, + @Nullable UnaryFunction eventTimeAssigner) { super(partitioning); this.name = Objects.requireNonNull(name); @@ -143,8 +149,8 @@ public static OfBuilder named(String name) { Dataset input, UnaryFunction mapper, Partitioning partitioning, - Windowing windowing /* optional */, - UnaryFunction eventTimeAssigner /* optional */) { + @Nullable Windowing windowing, + @Nullable UnaryFunction eventTimeAssigner) { super(name, flow, input, mapper, windowing, eventTimeAssigner, partitioning); } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java index 885b151b18314..0bb3e7832c0d2 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java @@ -33,6 +33,7 @@ import cz.seznam.euphoria.core.client.util.Either; import cz.seznam.euphoria.core.client.util.Pair; +import javax.annotation.Nullable; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; @@ -203,7 +204,9 @@ public static class OutputBuilder< private final UnaryFunction rightKeyExtractor; private final BinaryFunctor joinFunc; private final boolean outer; + @Nullable private final Windowing, W> windowing; + @Nullable private final UnaryFunction, Long> eventTimeAssigner; OutputBuilder(String name, @@ -214,8 +217,8 @@ public static class OutputBuilder< BinaryFunctor joinFunc, boolean outer, PartitioningBuilder partitioning, - Windowing, W> windowing /* optional */, - UnaryFunction, Long> eventTimeAssigner /* optional */) { + @Nullable Windowing, W> windowing, + @Nullable UnaryFunction, Long> eventTimeAssigner) { super(partitioning); @@ -262,15 +265,15 @@ public static OfBuilder named(String name) { boolean outer = false; Join(String name, - Flow flow, - Dataset left, Dataset right, - Windowing, W> windowing /* optional */, - UnaryFunction, Long> eventTimeAssigner /* optional */, - Partitioning partitioning, - UnaryFunction leftKeyExtractor, - UnaryFunction rightKeyExtractor, - BinaryFunctor functor, - boolean outer) { + Flow flow, + Dataset left, Dataset right, + @Nullable Windowing, W> windowing, + @Nullable UnaryFunction, Long> eventTimeAssigner, + Partitioning partitioning, + UnaryFunction leftKeyExtractor, + UnaryFunction rightKeyExtractor, + BinaryFunctor functor, + boolean outer) { super(name, flow, windowing, eventTimeAssigner, (Either elem) -> { if (elem.isLeft()) { diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/PartitioningAware.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/PartitioningAware.java index 928d64f3506d0..9b1c70743ee64 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/PartitioningAware.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/PartitioningAware.java @@ -18,6 +18,7 @@ import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioner; import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; +import javax.annotation.Nullable; import java.util.Objects; /** @@ -30,6 +31,7 @@ public interface PartitioningAware { abstract class PartitioningBuilder implements OptionalMethodBuilder { private final DefaultPartitioning defaultPartitioning; + @Nullable private Partitioning partitioning; public PartitioningBuilder(DefaultPartitioning defaultPartitioning) { @@ -37,8 +39,7 @@ public PartitioningBuilder(DefaultPartitioning defaultPartitioning) { } public PartitioningBuilder(DefaultPartitioning defaultPartitioning, - Partitioning partitioning) - { + @Nullable Partitioning partitioning) { this.defaultPartitioning = Objects.requireNonNull(defaultPartitioning); this.partitioning = partitioning; } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java index 2312f55dcbda4..6cadb6f318429 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java @@ -33,6 +33,7 @@ import cz.seznam.euphoria.core.client.operator.state.StorageProvider; import cz.seznam.euphoria.core.client.util.Pair; +import javax.annotation.Nullable; import java.util.Iterator; import java.util.Objects; @@ -181,7 +182,9 @@ public static class DatasetBuilder5< private final UnaryFunction keyExtractor; private final UnaryFunction valueExtractor; private final ReduceFunction reducer; + @Nullable private final Windowing windowing; + @Nullable private final UnaryFunction eventTimeAssigner; DatasetBuilder5(String name, @@ -189,8 +192,8 @@ public static class DatasetBuilder5< UnaryFunction keyExtractor, UnaryFunction valueExtractor, ReduceFunction reducer, - Windowing windowing /* optional */, - UnaryFunction eventTimeAssigner /* optional */, + @Nullable Windowing windowing, + @Nullable UnaryFunction eventTimeAssigner, PartitioningBuilder partitioning) { // initialize default partitioning according to input @@ -233,8 +236,8 @@ public static OfBuilder named(String name) { Dataset input, UnaryFunction keyExtractor, UnaryFunction valueExtractor, - Windowing windowing /* optional */, - UnaryFunction eventTimeAssigner /* optional */, + @Nullable Windowing windowing, + @Nullable UnaryFunction eventTimeAssigner, ReduceFunction reducer, Partitioning partitioning) { super(name, flow, input, keyExtractor, windowing, eventTimeAssigner, partitioning); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java index 674229d96d158..4bc27f140b343 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java @@ -28,6 +28,7 @@ import cz.seznam.euphoria.core.client.operator.state.State; import cz.seznam.euphoria.core.client.util.Pair; +import javax.annotation.Nullable; import java.util.Objects; /** @@ -155,8 +156,7 @@ public static class DatasetBuilder5< UnaryFunction keyExtractor, UnaryFunction valueExtractor, StateFactory stateFactory, - CombinableReduceFunction stateCombiner) - { + CombinableReduceFunction stateCombiner) { // initialize default partitioning according to input super(new DefaultPartitioning<>(input.getNumPartitions())); @@ -175,8 +175,7 @@ public static class DatasetBuilder5< } public DatasetBuilder6 - windowBy(Windowing windowing, UnaryFunction eventTimeAssigner) - { + windowBy(Windowing windowing, UnaryFunction eventTimeAssigner) { return new DatasetBuilder6<>(name, input, keyExtractor, valueExtractor, stateFactory, stateCombiner, Objects.requireNonNull(windowing), eventTimeAssigner, this); @@ -194,15 +193,17 @@ public static class DatasetBuilder6< W extends Window> extends PartitioningBuilder< KEY,DatasetBuilder6> - implements OutputBuilder> - { + implements OutputBuilder> { + private final String name; private final Dataset input; private final UnaryFunction keyExtractor; private final UnaryFunction valueExtractor; private final StateFactory stateFactory; private final CombinableReduceFunction stateCombiner; + @Nullable private final Windowing windowing; + @Nullable private final UnaryFunction eventTimeAssigner; DatasetBuilder6(String name, @@ -211,8 +212,8 @@ public static class DatasetBuilder6< UnaryFunction valueExtractor, StateFactory stateFactory, CombinableReduceFunction stateCombiner, - Windowing windowing /* optional */, - UnaryFunction eventTimeAssigner /* optional*/, + @Nullable Windowing windowing, + @Nullable UnaryFunction eventTimeAssigner, PartitioningBuilder partitioning) { // initialize partitioning @@ -261,8 +262,8 @@ public static OfBuilder named(String name) { Dataset input, UnaryFunction keyExtractor, UnaryFunction valueExtractor, - Windowing windowing /* optional */, - UnaryFunction eventTimeAssigner /* optional */, + @Nullable Windowing windowing, + @Nullable UnaryFunction eventTimeAssigner, StateFactory stateFactory, CombinableReduceFunction stateCombiner, Partitioning partitioning) diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java index 2d572c5b106dd..2814bd81fc1f6 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java @@ -29,6 +29,8 @@ import cz.seznam.euphoria.core.client.graph.DAG; import cz.seznam.euphoria.core.client.util.Pair; +import javax.annotation.Nullable; + /** * Reduce all elements in window. */ @@ -158,14 +160,14 @@ public static OfBuilder named(String name) { static final Byte B_ZERO = (byte) 0; private ReduceWindow( - String name, - Flow flow, - Dataset input, - UnaryFunction valueExtractor, - Windowing windowing /* optional */, - UnaryFunction eventTimeAssigner /* optional */, - ReduceFunction reducer, - int numPartitions) { + String name, + Flow flow, + Dataset input, + UnaryFunction valueExtractor, + @Nullable Windowing windowing, + @Nullable UnaryFunction eventTimeAssigner, + ReduceFunction reducer, + int numPartitions) { super(name, flow, input, e -> B_ZERO, windowing, eventTimeAssigner, new Partitioning() { diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SingleInputOperator.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SingleInputOperator.java index e6c20dc305398..c32ae78b7b8bb 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SingleInputOperator.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SingleInputOperator.java @@ -18,8 +18,8 @@ import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.flow.Flow; -import java.util.Arrays; import java.util.Collection; +import java.util.Collections; /** * Operator with single input. @@ -41,7 +41,7 @@ public Dataset input() { /** @return all of this operator's input as single element collection */ @Override public Collection> listInputs() { - return Arrays.asList(input); + return Collections.singletonList(input); } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseOperator.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseOperator.java index 891b20b5ee7e1..61d7e0ebc1bfc 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseOperator.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseOperator.java @@ -22,6 +22,8 @@ import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryFunction; +import javax.annotation.Nullable; + /** * Operator with internal state. */ @@ -38,8 +40,8 @@ public abstract class StateAwareWindowWiseOperator< protected StateAwareWindowWiseOperator( String name, Flow flow, - Windowing windowing /* optional */, - UnaryFunction eventTimeAssigner /* optional */, + @Nullable Windowing windowing, + @Nullable UnaryFunction eventTimeAssigner, UnaryFunction keyExtractor, Partitioning partitioning) { diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java index 1785a70f5845a..9afc42e982855 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java @@ -22,6 +22,7 @@ import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryFunction; +import javax.annotation.Nullable; import java.util.Collection; import java.util.Collections; @@ -40,8 +41,8 @@ protected StateAwareWindowWiseSingleInputOperator( String name, Flow flow, Dataset input, UnaryFunction extractor, - Windowing windowing /* optional */, - UnaryFunction eventTimeAssigner /* optional */, + @Nullable Windowing windowing, + @Nullable UnaryFunction eventTimeAssigner, Partitioning partitioning) { super(name, flow, windowing, eventTimeAssigner, extractor, partitioning); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java index 698a96370b7f4..b96b9f441fce0 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java @@ -27,6 +27,7 @@ import cz.seznam.euphoria.core.client.util.Pair; import cz.seznam.euphoria.core.client.util.Sums; +import javax.annotation.Nullable; import java.util.Objects; /** @@ -109,16 +110,18 @@ public static class OutputBuilder private final Dataset input; private final UnaryFunction keyExtractor; private final UnaryFunction valueExtractor; - private final Windowing windowing; /* optional */ - private final UnaryFunction eventTimeAssigner; /* optional */ + @Nullable + private final Windowing windowing; + @Nullable + private final UnaryFunction eventTimeAssigner; + OutputBuilder(String name, Dataset input, UnaryFunction keyExtractor, UnaryFunction valueExtractor, - Windowing windowing /* optional */, - UnaryFunction eventTimeAssigner /* optional */, - PartitioningBuilder partitioning) - { + @Nullable Windowing windowing, + @Nullable UnaryFunction eventTimeAssigner, + PartitioningBuilder partitioning) { super(partitioning); this.name = Objects.requireNonNull(name); @@ -154,8 +157,8 @@ public static OfBuilder named(String name) { Dataset input, UnaryFunction keyExtractor, UnaryFunction valueExtractor, - Windowing windowing /* optional */, - UnaryFunction eventTimeAssigner /* optional */, + @Nullable Windowing windowing, + @Nullable UnaryFunction eventTimeAssigner, Partitioning partitioning) { super(name, flow, input, keyExtractor, windowing, eventTimeAssigner, partitioning); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java index c0bcbec4b6e58..8ce57b8e7d1ff 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java @@ -33,6 +33,7 @@ import cz.seznam.euphoria.core.client.util.Pair; import cz.seznam.euphoria.core.client.util.Triple; +import javax.annotation.Nullable; import java.util.Iterator; import static java.util.Objects.requireNonNull; @@ -209,7 +210,9 @@ public static class OutputBuilder< private final UnaryFunction keyFn; private final UnaryFunction valueFn; private final UnaryFunction scoreFn; + @Nullable private final Windowing windowing; + @Nullable private final UnaryFunction eventTimeAssigner; OutputBuilder(String name, @@ -218,8 +221,8 @@ public static class OutputBuilder< UnaryFunction valueFn, UnaryFunction scoreFn, PartitioningBuilder partitioning, - Windowing windowing /* optional */, - UnaryFunction eventTimeAssigner /*optional */) { + @Nullable Windowing windowing, + @Nullable UnaryFunction eventTimeAssigner) { super(partitioning); @@ -263,8 +266,8 @@ public static OfBuilder named(String name) { UnaryFunction valueFn, UnaryFunction scoreFn, Partitioning partitioning, - Windowing windowing /* optional */, - UnaryFunction eventTimeAssigner /*optional */) { + @Nullable Windowing windowing, + @Nullable UnaryFunction eventTimeAssigner) { super(name, flow, input, keyFn, windowing, eventTimeAssigner, partitioning); this.valueFn = valueFn; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/WindowWiseOperator.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/WindowWiseOperator.java index a35265b6a1cfc..d47e6674856dc 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/WindowWiseOperator.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/WindowWiseOperator.java @@ -20,6 +20,8 @@ import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryFunction; +import javax.annotation.Nullable; + /** * Operator working on some context. */ @@ -27,24 +29,27 @@ public abstract class WindowWiseOperator< IN, WIN, OUT, W extends Window> extends Operator implements WindowAware { + @Nullable protected Windowing windowing; + @Nullable protected UnaryFunction eventTimeAssigner; public WindowWiseOperator(String name, Flow flow, - Windowing windowing /* optional */, - UnaryFunction eventTimeAssigner /* optional */) - { + @Nullable Windowing windowing, + @Nullable UnaryFunction eventTimeAssigner) { super(name, flow); this.windowing = windowing; this.eventTimeAssigner = eventTimeAssigner; } + @Nullable @Override public Windowing getWindowing() { return windowing; } + @Nullable @Override public UnaryFunction getEventTimeAssigner() { return eventTimeAssigner; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/util/Either.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/util/Either.java index b0d27f3a0cb2d..d6ffff641c3aa 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/util/Either.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/util/Either.java @@ -15,7 +15,7 @@ */ package cz.seznam.euphoria.core.client.util; -import cz.seznam.euphoria.core.client.functional.UnaryFunction; +import javax.annotation.Nullable; import static java.util.Objects.requireNonNull; @@ -24,7 +24,9 @@ */ public final class Either { + @Nullable final LEFT left; + @Nullable final RIGHT right; public static Either left(LEFT left) { @@ -39,7 +41,7 @@ public static Either right(RIGHT right) { } - private Either(LEFT left, RIGHT right) { + private Either(@Nullable LEFT left, @Nullable RIGHT right) { this.left = left; this.right = right; } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/executor/greduce/TimerSupport.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/executor/greduce/TimerSupport.java index 5fa67874d0166..bc698c840fcd9 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/executor/greduce/TimerSupport.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/executor/greduce/TimerSupport.java @@ -17,6 +17,7 @@ import cz.seznam.euphoria.core.client.dataset.windowing.Window; +import javax.annotation.Nullable; import java.util.HashSet; import java.util.PriorityQueue; @@ -28,14 +29,16 @@ class TimerSupport { static final class Timer implements Comparable> { + @Nullable final W window; final long time; - Timer(W window, long time) { + Timer(@Nullable W window, long time) { this.window = window; this.time = time; } + @Nullable public W getWindow() { return window; } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/util/Settings.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/util/Settings.java index 7e4a7642e3d30..f9d29d2fd2e4a 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/util/Settings.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/util/Settings.java @@ -16,6 +16,8 @@ package cz.seznam.euphoria.core.util; import cz.seznam.euphoria.core.client.util.Pair; + +import javax.annotation.Nullable; import java.io.Serializable; import java.net.URI; import java.util.Map; @@ -31,6 +33,7 @@ */ public class Settings implements Serializable { + @Nullable private final String prefix; private final Map map; @@ -51,7 +54,7 @@ public Settings(Settings other) { }); } - private Settings(String prefix, Map map) { + private Settings(@Nullable String prefix, Map map) { this.prefix = prefix; this.map = map; } diff --git a/sdks/java/extensions/euphoria/euphoria-flink/pom.xml b/sdks/java/extensions/euphoria/euphoria-flink/pom.xml index 0ddcfb926f4d1..ee69ea85752da 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/pom.xml +++ b/sdks/java/extensions/euphoria/euphoria-flink/pom.xml @@ -49,6 +49,12 @@ guava + + com.google.code.findbugs + jsr305 + 1.3.9 + + org.apache.flink diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/FlinkExecutor.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/FlinkExecutor.java index bc57691d28b9a..30e6b11843e7d 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/FlinkExecutor.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/FlinkExecutor.java @@ -27,6 +27,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.time.Duration; import java.util.HashSet; import java.util.List; @@ -50,6 +52,7 @@ public class FlinkExecutor implements Executor { private Duration autoWatermarkInterval = Duration.ofMillis(200); private Duration allowedLateness = Duration.ofMillis(0); private final Set> registeredClasses = new HashSet<>(); + @Nullable private Duration checkpointInterval; // executor to submit flows, if closed all executions should be interrupted @@ -242,8 +245,8 @@ public FlinkExecutor registerClass(Class cls) { * * @return this instance (for method chaining purposes) */ - public FlinkExecutor setCheckpointInterval(Duration interval) { - this.checkpointInterval = interval; + public FlinkExecutor setCheckpointInterval(@Nonnull Duration interval) { + this.checkpointInterval = Objects.requireNonNull(interval); return this; } } diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/FlowTranslator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/FlowTranslator.java index 2b0b5ae7abd7f..7be52cd666598 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/FlowTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/FlowTranslator.java @@ -22,6 +22,7 @@ import cz.seznam.euphoria.core.client.operator.Operator; import cz.seznam.euphoria.core.executor.FlowUnfolder; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collection; import java.util.IdentityHashMap; @@ -47,13 +48,14 @@ public static final class TranslateAcceptor implements UnaryPredicate> { final Class type; + @Nullable final UnaryPredicate accept; public TranslateAcceptor(Class type) { this (type, null); } - public TranslateAcceptor(Class type, UnaryPredicate accept) { + public TranslateAcceptor(Class type, @Nullable UnaryPredicate accept) { this.type = Objects.requireNonNull(type); this.accept = accept; } diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchStateStorageProvider.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchStateStorageProvider.java index 60dde29d4b00c..4c529d621eb12 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchStateStorageProvider.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchStateStorageProvider.java @@ -45,6 +45,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + /** * Storage provider for batch processing. * We will store the data in memory, for some amount of data and then @@ -92,6 +94,7 @@ static class MemListStorage implements ListStorage { // serializer for the class Serializer serializer; List data = new ArrayList<>(); + @Nullable File serializedElements = null; Output output = null; diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/io/DataSourceWrapper.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/io/DataSourceWrapper.java index 8ec5fbe1ddd2a..84f0229da2189 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/io/DataSourceWrapper.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/io/DataSourceWrapper.java @@ -26,7 +26,6 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Deque; diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/StreamWindower.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/StreamWindower.java index 3dcdc4a84d81d..760eb9795d1fa 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/StreamWindower.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/StreamWindower.java @@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import javax.annotation.Nullable; import java.time.Duration; import java.util.Objects; @@ -78,7 +79,7 @@ public StreamWindower(Duration allowedLateness) { UnaryFunction keyFn, UnaryFunction valFn, Windowing windowing, - UnaryFunction eventTimeAssigner) { + @Nullable UnaryFunction eventTimeAssigner) { if (eventTimeAssigner != null) { input = input.assignTimestampsAndWatermarks( diff --git a/sdks/java/extensions/euphoria/euphoria-hadoop/pom.xml b/sdks/java/extensions/euphoria/euphoria-hadoop/pom.xml index 5aedc3558bd51..421f66bb5ecb2 100644 --- a/sdks/java/extensions/euphoria/euphoria-hadoop/pom.xml +++ b/sdks/java/extensions/euphoria/euphoria-hadoop/pom.xml @@ -44,6 +44,11 @@ guava + + com.google.code.findbugs + jsr305 + + org.apache.hadoop hadoop-common diff --git a/sdks/java/extensions/euphoria/euphoria-hadoop/src/main/java/cz/seznam/euphoria/hadoop/input/HadoopSource.java b/sdks/java/extensions/euphoria/euphoria-hadoop/src/main/java/cz/seznam/euphoria/hadoop/input/HadoopSource.java index ac6549db9c2fc..950884b17a8f8 100644 --- a/sdks/java/extensions/euphoria/euphoria-hadoop/src/main/java/cz/seznam/euphoria/hadoop/input/HadoopSource.java +++ b/sdks/java/extensions/euphoria/euphoria-hadoop/src/main/java/cz/seznam/euphoria/hadoop/input/HadoopSource.java @@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import javax.annotation.Nullable; import java.io.IOException; import java.util.Arrays; import java.util.List; @@ -50,6 +51,7 @@ public class HadoopSource implements DataSource> { private final Class> hadoopFormatCls; private final SerializableWritable conf; + @Nullable private transient InputFormat hadoopFormatInstance; public HadoopSource(Class keyClass, Class valueClass, diff --git a/sdks/java/extensions/euphoria/euphoria-hadoop/src/main/java/cz/seznam/euphoria/hadoop/output/DataSinkOutputFormat.java b/sdks/java/extensions/euphoria/euphoria-hadoop/src/main/java/cz/seznam/euphoria/hadoop/output/DataSinkOutputFormat.java index 4321ad6eaf24a..88fe6880e0582 100644 --- a/sdks/java/extensions/euphoria/euphoria-hadoop/src/main/java/cz/seznam/euphoria/hadoop/output/DataSinkOutputFormat.java +++ b/sdks/java/extensions/euphoria/euphoria-hadoop/src/main/java/cz/seznam/euphoria/hadoop/output/DataSinkOutputFormat.java @@ -31,6 +31,8 @@ import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import javax.annotation.Nullable; + /** * {@code OutputFormat} created from {@code DataSink}. * Because of the hadoop output format contract, we need to be able to @@ -100,6 +102,7 @@ private static DataSink fromBase64(String base64Bytes) // instance of the data sink private DataSink sink; // the single writer per output format instance and thread + @Nullable private Writer writer; @Override diff --git a/sdks/java/extensions/euphoria/euphoria-inmem/pom.xml b/sdks/java/extensions/euphoria/euphoria-inmem/pom.xml index a0675d0df30ff..12fd5d502e79c 100644 --- a/sdks/java/extensions/euphoria/euphoria-inmem/pom.xml +++ b/sdks/java/extensions/euphoria/euphoria-inmem/pom.xml @@ -44,6 +44,11 @@ guava + + com.google.code.findbugs + jsr305 + + diff --git a/sdks/java/extensions/euphoria/euphoria-kafka/pom.xml b/sdks/java/extensions/euphoria/euphoria-kafka/pom.xml index 86cc24d4c46c9..a0e550cb59e8d 100644 --- a/sdks/java/extensions/euphoria/euphoria-kafka/pom.xml +++ b/sdks/java/extensions/euphoria/euphoria-kafka/pom.xml @@ -46,6 +46,11 @@ guava + + com.google.code.findbugs + jsr305 + + cz.seznam.euphoria euphoria-core diff --git a/sdks/java/extensions/euphoria/euphoria-kafka/src/main/java/cz/seznam/euphoria/kafka/KafkaSource.java b/sdks/java/extensions/euphoria/euphoria-kafka/src/main/java/cz/seznam/euphoria/kafka/KafkaSource.java index cce69003ee661..9ab9d3afd5d71 100644 --- a/sdks/java/extensions/euphoria/euphoria-kafka/src/main/java/cz/seznam/euphoria/kafka/KafkaSource.java +++ b/sdks/java/extensions/euphoria/euphoria-kafka/src/main/java/cz/seznam/euphoria/kafka/KafkaSource.java @@ -34,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.io.IOException; import java.net.URI; import java.util.ArrayList; @@ -122,6 +123,7 @@ static final class KafkaPartition implements Partition> { private final String groupId; private final int partition; private final String host; + @Nullable private final Settings config; private final long startOffset; @@ -131,7 +133,7 @@ static final class KafkaPartition implements Partition> { KafkaPartition(String brokerList, String topicId, String groupId, int partition, String host, - Settings config /* optional */, + @Nullable Settings config, long startOffset, long stopReadingAtStamp) { @@ -171,7 +173,8 @@ static final class AllPartitionsConsumer implements Partition 0 private final long stopReadingAtStamp; @@ -179,7 +182,7 @@ static final class AllPartitionsConsumer implements Partition DataSource get(URI uri, Settings settings) { private final String brokerList; private final String topicId; private final String groupId; - private final Settings config; // ~ optional + @Nullable + private final Settings config; KafkaSource( String brokerList, String topicId, String groupId, - Settings config) { + @Nullable Settings config) { this.brokerList = requireNonNull(brokerList); this.topicId = requireNonNull(topicId); this.groupId = requireNonNull(groupId); diff --git a/sdks/java/extensions/euphoria/euphoria-kafka/src/main/java/cz/seznam/euphoria/kafka/KafkaUtils.java b/sdks/java/extensions/euphoria/euphoria-kafka/src/main/java/cz/seznam/euphoria/kafka/KafkaUtils.java index 580b64f305430..e4bcedfaa6f1b 100644 --- a/sdks/java/extensions/euphoria/euphoria-kafka/src/main/java/cz/seznam/euphoria/kafka/KafkaUtils.java +++ b/sdks/java/extensions/euphoria/euphoria-kafka/src/main/java/cz/seznam/euphoria/kafka/KafkaUtils.java @@ -36,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.io.IOException; import java.util.Collections; import java.util.HashMap; @@ -48,7 +49,7 @@ class KafkaUtils { private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class); - private static Properties toProperties(Settings properties) { + private static Properties toProperties(@Nullable Settings properties) { final Properties ps = new Properties(); if (properties != null) { for (Map.Entry e : properties.getAll().entrySet()) { @@ -72,7 +73,7 @@ private static Properties toProperties(Settings properties) { } public static Consumer - newConsumer(String brokerList, String groupId, Settings config) + newConsumer(String brokerList, String groupId, @Nullable Settings config) { Properties ps = toProperties(config); ps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); diff --git a/sdks/java/extensions/euphoria/euphoria-operator-testkit/pom.xml b/sdks/java/extensions/euphoria/euphoria-operator-testkit/pom.xml index 6488822e054af..86bf6e186dc9e 100644 --- a/sdks/java/extensions/euphoria/euphoria-operator-testkit/pom.xml +++ b/sdks/java/extensions/euphoria/euphoria-operator-testkit/pom.xml @@ -63,6 +63,11 @@ guava + + com.google.code.findbugs + jsr305 + + org.slf4j slf4j-api diff --git a/sdks/java/extensions/euphoria/euphoria-spark/pom.xml b/sdks/java/extensions/euphoria/euphoria-spark/pom.xml index 5f5a2772f7f61..c16fc3f0d633c 100644 --- a/sdks/java/extensions/euphoria/euphoria-spark/pom.xml +++ b/sdks/java/extensions/euphoria/euphoria-spark/pom.xml @@ -43,6 +43,11 @@ guava + + com.google.code.findbugs + jsr305 + + cz.seznam.euphoria euphoria-hadoop diff --git a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceByKeyTranslator.java b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceByKeyTranslator.java index cf691bbd40895..afc014ccad03f 100644 --- a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceByKeyTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceByKeyTranslator.java @@ -31,6 +31,7 @@ import org.apache.spark.api.java.function.PairFlatMapFunction; import scala.Tuple2; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; @@ -107,12 +108,13 @@ private static class CompositeKeyExtractor private final UnaryFunction keyExtractor; private final UnaryFunction valueExtractor; private final Windowing windowing; + @Nullable private final UnaryFunction eventTimeAssigner; public CompositeKeyExtractor(UnaryFunction keyExtractor, UnaryFunction valueExtractor, Windowing windowing, - UnaryFunction eventTimeAssigner /* optional */) { + @Nullable UnaryFunction eventTimeAssigner) { this.keyExtractor = keyExtractor; this.valueExtractor = valueExtractor; this.windowing = windowing; diff --git a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceStateByKeyTranslator.java b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceStateByKeyTranslator.java index 7420339903fda..31dbcb475c003 100644 --- a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceStateByKeyTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceStateByKeyTranslator.java @@ -35,6 +35,7 @@ import org.apache.spark.api.java.function.PairFlatMapFunction; import scala.Tuple2; +import javax.annotation.Nullable; import java.io.Serializable; import java.util.ArrayList; import java.util.Comparator; @@ -117,12 +118,13 @@ private static class CompositeKeyExtractor private final UnaryFunction keyExtractor; private final UnaryFunction valueExtractor; private final Windowing windowing; + @Nullable private final UnaryFunction eventTimeAssigner; public CompositeKeyExtractor(UnaryFunction keyExtractor, UnaryFunction valueExtractor, Windowing windowing, - UnaryFunction eventTimeAssigner /* optional */) { + @Nullable UnaryFunction eventTimeAssigner) { this.keyExtractor = keyExtractor; this.valueExtractor = valueExtractor; this.windowing = windowing; diff --git a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/SparkFlowTranslator.java b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/SparkFlowTranslator.java index ca901b09e5a1d..421d028373ca6 100644 --- a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/SparkFlowTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/SparkFlowTranslator.java @@ -38,6 +38,7 @@ import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; +import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -169,13 +170,14 @@ public static final class TranslateAcceptor implements UnaryPredicate> { final Class type; + @Nullable final UnaryPredicate accept; public TranslateAcceptor(Class type) { this (type, null); } - public TranslateAcceptor(Class type, UnaryPredicate accept) { + public TranslateAcceptor(Class type, @Nullable UnaryPredicate accept) { this.type = Objects.requireNonNull(type); this.accept = accept; } diff --git a/sdks/java/extensions/euphoria/pom.xml b/sdks/java/extensions/euphoria/pom.xml index 150e26dff78ad..f421e610663ab 100644 --- a/sdks/java/extensions/euphoria/pom.xml +++ b/sdks/java/extensions/euphoria/pom.xml @@ -432,7 +432,7 @@ sign - + org.apache.maven.plugins maven-gpg-plugin @@ -508,6 +508,12 @@ ${guava.version} + + com.google.code.findbugs + jsr305 + 1.3.9 + + org.projectlombok lombok