diff --git a/sdks/java/extensions/euphoria/euphoria-core/pom.xml b/sdks/java/extensions/euphoria/euphoria-core/pom.xml
index 7079901e45d31..7ccaefc3d0b65 100644
--- a/sdks/java/extensions/euphoria/euphoria-core/pom.xml
+++ b/sdks/java/extensions/euphoria/euphoria-core/pom.xml
@@ -41,6 +41,11 @@
guava
+
+ com.google.code.findbugs
+ jsr305
+
+
org.slf4j
slf4j-api
diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java
index f632e31d2f517..d03898ddccef1 100644
--- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java
+++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java
@@ -20,6 +20,7 @@
import cz.seznam.euphoria.core.client.io.DataSource;
import cz.seznam.euphoria.core.client.operator.Operator;
+import javax.annotation.Nullable;
import java.io.Serializable;
import java.net.URI;
import java.util.Collection;
@@ -44,11 +45,13 @@ public interface Dataset extends Serializable {
*
* @return this dataset's explicit source - if any
*/
+ @Nullable
DataSource getSource();
/**
* @return the operator that produced this dataset - if any
*/
+ @Nullable
Operator, T> getProducer();
/**
@@ -105,6 +108,7 @@ default void checkpoint(URI uri) throws Exception {
* data set is supposed to be persisted to, otherwise the
* sink provided through {@link #persist(DataSink)}.
*/
+ @Nullable
default DataSink getOutputSink() {
return null;
}
@@ -115,6 +119,7 @@ default DataSink getOutputSink() {
* @return {@code null} if no checkpoint sink has been defined,
* otherwise the sink provided through {@link #checkpoint(DataSink)}
*/
+ @Nullable
default DataSink getCheckpointSink() {
return null;
}
diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/InputDataset.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/InputDataset.java
index 0ab13bb80ac27..4a777ecd2acf2 100644
--- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/InputDataset.java
+++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/InputDataset.java
@@ -15,11 +15,12 @@
*/
package cz.seznam.euphoria.core.client.dataset;
-import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.io.DataSource;
import cz.seznam.euphoria.core.client.operator.Operator;
+
+import javax.annotation.Nullable;
import java.util.Collection;
/**
@@ -37,11 +38,13 @@ public InputDataset(Flow flow, DataSource source, boolean bounded) {
this.bounded = bounded;
}
+ @Nullable
@Override
public DataSource getSource() {
return source;
}
+ @Nullable
@Override
public Operator, T> getProducer() {
return null;
diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/OutputDataset.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/OutputDataset.java
index c287e73c08b83..612881431e39c 100644
--- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/OutputDataset.java
+++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/OutputDataset.java
@@ -19,6 +19,8 @@
import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.io.DataSource;
import cz.seznam.euphoria.core.client.operator.Operator;
+
+import javax.annotation.Nullable;
import java.util.Collection;
/**
@@ -39,11 +41,13 @@ public OutputDataset(Flow flow, Operator, T> producer, boolean bounded) {
this.bounded = bounded;
}
+ @Nullable
@Override
public DataSource getSource() {
return null;
}
+ @Nullable
@Override
public Operator, T> getProducer() {
return producer;
@@ -59,11 +63,13 @@ public void checkpoint(DataSink sink) {
checkpointSink = sink;
}
+ @Nullable
@Override
public DataSink getOutputSink() {
return outputSink;
}
+ @Nullable
@Override
public DataSink getCheckpointSink() {
return checkpointSink;
diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/Session.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/Session.java
index 9b519b42a5ae5..e2df7054c05d2 100644
--- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/Session.java
+++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/Session.java
@@ -22,6 +22,7 @@
import cz.seznam.euphoria.core.client.triggers.Trigger;
import cz.seznam.euphoria.core.client.util.Pair;
+import javax.annotation.Nullable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -38,6 +39,7 @@
public final class Session implements MergingWindowing {
private final long gapDurationMillis;
+ @Nullable
private Duration earlyTriggeringPeriod;
public static Session of(Duration gapDuration) {
diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/Time.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/Time.java
index 4266d1f53e0b4..6633f720dc01a 100644
--- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/Time.java
+++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/Time.java
@@ -21,6 +21,7 @@
import cz.seznam.euphoria.core.client.triggers.TimeTrigger;
import cz.seznam.euphoria.core.client.triggers.Trigger;
+import javax.annotation.Nullable;
import java.time.Duration;
import java.util.Arrays;
import java.util.Objects;
@@ -34,6 +35,7 @@
public class Time implements Windowing {
private final long durationMillis;
+ @Nullable
private Duration earlyTriggeringPeriod;
public static Time of(Duration duration) {
@@ -79,6 +81,7 @@ public Trigger getTrigger() {
return new TimeTrigger();
}
+ @Nullable
public Duration getEarlyTriggeringPeriod() {
return earlyTriggeringPeriod;
}
diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/flow/Flow.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/flow/Flow.java
index 7646480a08578..7f2addc0b8d2d 100644
--- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/flow/Flow.java
+++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/flow/Flow.java
@@ -25,6 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
@@ -88,7 +89,7 @@ public class Flow implements Serializable {
= new HashMap<>();
- private Flow(String name, Settings settings) {
+ private Flow(@Nullable String name, Settings settings) {
this.name = name == null ? "" : name;
this.settings = cloneSettings(settings);
}
@@ -110,7 +111,7 @@ public static Flow create() {
*
* @return a newly created flow
*/
- public static Flow create(String flowName) {
+ public static Flow create(@Nullable String flowName) {
return new Flow(flowName, new Settings());
}
@@ -157,7 +158,7 @@ public > T add(T operator) {
*
* @return the added operator
*/
- > T add(T operator, String logicalName) {
+ > T add(T operator, @Nullable String logicalName) {
operatorNames.put(operator, buildOperatorName(operator, logicalName));
operators.add(operator);
@@ -211,7 +212,7 @@ private void validateSerializable(Operator o) {
}
}
- private String buildOperatorName(Operator op, String logicalName) {
+ private String buildOperatorName(Operator op, @Nullable String logicalName) {
StringBuilder sb = new StringBuilder(64);
sb.append(op.getName()).append('@').append(operatorNames.size() + 1);
logicalName = Util.trimToNull(logicalName);
diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/flow/Util.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/flow/Util.java
index a9f0a432ebdba..f493a277b2922 100644
--- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/flow/Util.java
+++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/flow/Util.java
@@ -15,6 +15,8 @@
*/
package cz.seznam.euphoria.core.client.flow;
+import javax.annotation.Nullable;
+
class Util {
/**
@@ -23,7 +25,8 @@ class Util {
* @param s input string
* @return non-empty trimmed string or null
*/
- static String trimToNull(String s) {
+ @Nullable
+ static String trimToNull(@Nullable String s) {
if (s == null) {
return null;
}
diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/graph/Node.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/graph/Node.java
index 6398108f9c079..ffab7afcfa642 100644
--- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/graph/Node.java
+++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/graph/Node.java
@@ -15,6 +15,7 @@
*/
package cz.seznam.euphoria.core.client.graph;
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -26,17 +27,18 @@
public final class Node {
final List> children = new ArrayList<>();
+ @Nullable
final T value;
final List> parents = new ArrayList<>();
@SuppressWarnings("unchecked")
private static final Node NULL_NODE = new Node(null);
- Node(T value) {
+ Node(@Nullable T value) {
this.value = value;
}
- Node(T value, List> parents) {
+ Node(@Nullable T value, List> parents) {
this(value);
this.parents.addAll(parents);
}
diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java
index 64604c78a5294..31b045e9db5b0 100644
--- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java
+++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java
@@ -26,6 +26,7 @@
import cz.seznam.euphoria.core.client.graph.DAG;
import cz.seznam.euphoria.core.client.util.Pair;
+import javax.annotation.Nullable;
import java.util.Objects;
/**
@@ -97,15 +98,17 @@ public static class OutputBuilder
private final String name;
private final Dataset input;
private final UnaryFunction keyExtractor;
+ @Nullable
private final Windowing windowing;
+ @Nullable
private final UnaryFunction eventTimeAssigner;
OutputBuilder(String name,
Dataset input,
UnaryFunction keyExtractor,
- Windowing windowing /* optional */,
- UnaryFunction eventTimeAssigner /* optional */,
+ @Nullable Windowing windowing,
+ @Nullable UnaryFunction eventTimeAssigner,
PartitioningBuilder partitioning) {
//initialize partitioning
@@ -137,12 +140,12 @@ public static OfBuilder named(String name) {
}
CountByKey(String name,
- Flow flow,
- Dataset input,
- UnaryFunction extractor,
- Windowing windowing /* optional */,
- UnaryFunction eventTimeAssigner /* optional */,
- Partitioning partitioning) {
+ Flow flow,
+ Dataset input,
+ UnaryFunction extractor,
+ @Nullable Windowing windowing,
+ @Nullable UnaryFunction eventTimeAssigner,
+ Partitioning partitioning) {
super(name, flow, input, extractor, windowing, eventTimeAssigner, partitioning);
}
diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java
index ae2b98877dd4d..e36d4e66ab8ca 100644
--- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java
+++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java
@@ -27,6 +27,7 @@
import cz.seznam.euphoria.core.client.graph.DAG;
import cz.seznam.euphoria.core.client.util.Pair;
+import javax.annotation.Nullable;
import java.util.Objects;
/**
@@ -64,11 +65,13 @@ public static class WindowingBuilder
{
private final String name;
private final Dataset input;
+ @Nullable
private final UnaryFunction mapper;
+
WindowingBuilder(
String name,
Dataset input,
- UnaryFunction mapper /* optional */) {
+ @Nullable UnaryFunction mapper) {
// define default partitioning
super(new DefaultPartitioning<>(input.getNumPartitions()));
@@ -100,16 +103,19 @@ public static class OutputBuilder
{
private final String name;
private final Dataset input;
+ @Nullable
private final UnaryFunction mapper;
+ @Nullable
private final Windowing windowing;
+ @Nullable
private final UnaryFunction eventTimeAssigner;
OutputBuilder(String name,
Dataset input,
- UnaryFunction mapper /* optional */,
+ @Nullable UnaryFunction mapper,
PartitioningBuilder partitioning,
- Windowing windowing /* optional */,
- UnaryFunction eventTimeAssigner /* optional */) {
+ @Nullable Windowing windowing,
+ @Nullable UnaryFunction eventTimeAssigner) {
super(partitioning);
this.name = Objects.requireNonNull(name);
@@ -143,8 +149,8 @@ public static OfBuilder named(String name) {
Dataset input,
UnaryFunction mapper,
Partitioning partitioning,
- Windowing windowing /* optional */,
- UnaryFunction eventTimeAssigner /* optional */) {
+ @Nullable Windowing windowing,
+ @Nullable UnaryFunction eventTimeAssigner) {
super(name, flow, input, mapper, windowing, eventTimeAssigner, partitioning);
}
diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java
index 885b151b18314..0bb3e7832c0d2 100644
--- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java
+++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java
@@ -33,6 +33,7 @@
import cz.seznam.euphoria.core.client.util.Either;
import cz.seznam.euphoria.core.client.util.Pair;
+import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
@@ -203,7 +204,9 @@ public static class OutputBuilder<
private final UnaryFunction rightKeyExtractor;
private final BinaryFunctor joinFunc;
private final boolean outer;
+ @Nullable
private final Windowing, W> windowing;
+ @Nullable
private final UnaryFunction, Long> eventTimeAssigner;
OutputBuilder(String name,
@@ -214,8 +217,8 @@ public static class OutputBuilder<
BinaryFunctor joinFunc,
boolean outer,
PartitioningBuilder partitioning,
- Windowing, W> windowing /* optional */,
- UnaryFunction, Long> eventTimeAssigner /* optional */) {
+ @Nullable Windowing, W> windowing,
+ @Nullable UnaryFunction, Long> eventTimeAssigner) {
super(partitioning);
@@ -262,15 +265,15 @@ public static OfBuilder named(String name) {
boolean outer = false;
Join(String name,
- Flow flow,
- Dataset left, Dataset right,
- Windowing, W> windowing /* optional */,
- UnaryFunction, Long> eventTimeAssigner /* optional */,
- Partitioning partitioning,
- UnaryFunction leftKeyExtractor,
- UnaryFunction rightKeyExtractor,
- BinaryFunctor functor,
- boolean outer) {
+ Flow flow,
+ Dataset left, Dataset right,
+ @Nullable Windowing, W> windowing,
+ @Nullable UnaryFunction, Long> eventTimeAssigner,
+ Partitioning partitioning,
+ UnaryFunction leftKeyExtractor,
+ UnaryFunction rightKeyExtractor,
+ BinaryFunctor functor,
+ boolean outer) {
super(name, flow, windowing, eventTimeAssigner, (Either elem) -> {
if (elem.isLeft()) {
diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/PartitioningAware.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/PartitioningAware.java
index 928d64f3506d0..9b1c70743ee64 100644
--- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/PartitioningAware.java
+++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/PartitioningAware.java
@@ -18,6 +18,7 @@
import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioner;
import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning;
+import javax.annotation.Nullable;
import java.util.Objects;
/**
@@ -30,6 +31,7 @@ public interface PartitioningAware {
abstract class PartitioningBuilder implements OptionalMethodBuilder {
private final DefaultPartitioning defaultPartitioning;
+ @Nullable
private Partitioning partitioning;
public PartitioningBuilder(DefaultPartitioning defaultPartitioning) {
@@ -37,8 +39,7 @@ public PartitioningBuilder(DefaultPartitioning defaultPartitioning) {
}
public PartitioningBuilder(DefaultPartitioning defaultPartitioning,
- Partitioning partitioning)
- {
+ @Nullable Partitioning partitioning) {
this.defaultPartitioning = Objects.requireNonNull(defaultPartitioning);
this.partitioning = partitioning;
}
diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java
index 2312f55dcbda4..6cadb6f318429 100644
--- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java
+++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java
@@ -33,6 +33,7 @@
import cz.seznam.euphoria.core.client.operator.state.StorageProvider;
import cz.seznam.euphoria.core.client.util.Pair;
+import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.Objects;
@@ -181,7 +182,9 @@ public static class DatasetBuilder5<
private final UnaryFunction keyExtractor;
private final UnaryFunction valueExtractor;
private final ReduceFunction reducer;
+ @Nullable
private final Windowing windowing;
+ @Nullable
private final UnaryFunction eventTimeAssigner;
DatasetBuilder5(String name,
@@ -189,8 +192,8 @@ public static class DatasetBuilder5<
UnaryFunction keyExtractor,
UnaryFunction valueExtractor,
ReduceFunction reducer,
- Windowing windowing /* optional */,
- UnaryFunction eventTimeAssigner /* optional */,
+ @Nullable Windowing windowing,
+ @Nullable UnaryFunction eventTimeAssigner,
PartitioningBuilder partitioning) {
// initialize default partitioning according to input
@@ -233,8 +236,8 @@ public static OfBuilder named(String name) {
Dataset input,
UnaryFunction keyExtractor,
UnaryFunction valueExtractor,
- Windowing windowing /* optional */,
- UnaryFunction eventTimeAssigner /* optional */,
+ @Nullable Windowing windowing,
+ @Nullable UnaryFunction eventTimeAssigner,
ReduceFunction reducer,
Partitioning partitioning) {
super(name, flow, input, keyExtractor, windowing, eventTimeAssigner, partitioning);
diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java
index 674229d96d158..4bc27f140b343 100644
--- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java
+++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java
@@ -28,6 +28,7 @@
import cz.seznam.euphoria.core.client.operator.state.State;
import cz.seznam.euphoria.core.client.util.Pair;
+import javax.annotation.Nullable;
import java.util.Objects;
/**
@@ -155,8 +156,7 @@ public static class DatasetBuilder5<
UnaryFunction keyExtractor,
UnaryFunction valueExtractor,
StateFactory stateFactory,
- CombinableReduceFunction stateCombiner)
- {
+ CombinableReduceFunction stateCombiner) {
// initialize default partitioning according to input
super(new DefaultPartitioning<>(input.getNumPartitions()));
@@ -175,8 +175,7 @@ public static class DatasetBuilder5<
}
public
DatasetBuilder6
- windowBy(Windowing windowing, UnaryFunction eventTimeAssigner)
- {
+ windowBy(Windowing windowing, UnaryFunction eventTimeAssigner) {
return new DatasetBuilder6<>(name, input, keyExtractor, valueExtractor,
stateFactory, stateCombiner,
Objects.requireNonNull(windowing), eventTimeAssigner, this);
@@ -194,15 +193,17 @@ public static class DatasetBuilder6<
W extends Window>
extends PartitioningBuilder<
KEY,DatasetBuilder6>
- implements OutputBuilder>
- {
+ implements OutputBuilder> {
+
private final String name;
private final Dataset input;
private final UnaryFunction keyExtractor;
private final UnaryFunction valueExtractor;
private final StateFactory stateFactory;
private final CombinableReduceFunction stateCombiner;
+ @Nullable
private final Windowing windowing;
+ @Nullable
private final UnaryFunction eventTimeAssigner;
DatasetBuilder6(String name,
@@ -211,8 +212,8 @@ public static class DatasetBuilder6<
UnaryFunction valueExtractor,
StateFactory stateFactory,
CombinableReduceFunction stateCombiner,
- Windowing windowing /* optional */,
- UnaryFunction eventTimeAssigner /* optional*/,
+ @Nullable Windowing windowing,
+ @Nullable UnaryFunction eventTimeAssigner,
PartitioningBuilder partitioning)
{
// initialize partitioning
@@ -261,8 +262,8 @@ public static OfBuilder named(String name) {
Dataset input,
UnaryFunction keyExtractor,
UnaryFunction valueExtractor,
- Windowing windowing /* optional */,
- UnaryFunction eventTimeAssigner /* optional */,
+ @Nullable Windowing windowing,
+ @Nullable UnaryFunction eventTimeAssigner,
StateFactory stateFactory,
CombinableReduceFunction stateCombiner,
Partitioning partitioning)
diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java
index 2d572c5b106dd..2814bd81fc1f6 100644
--- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java
+++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java
@@ -29,6 +29,8 @@
import cz.seznam.euphoria.core.client.graph.DAG;
import cz.seznam.euphoria.core.client.util.Pair;
+import javax.annotation.Nullable;
+
/**
* Reduce all elements in window.
*/
@@ -158,14 +160,14 @@ public static OfBuilder named(String name) {
static final Byte B_ZERO = (byte) 0;
private ReduceWindow(
- String name,
- Flow flow,
- Dataset input,
- UnaryFunction valueExtractor,
- Windowing windowing /* optional */,
- UnaryFunction eventTimeAssigner /* optional */,
- ReduceFunction reducer,
- int numPartitions) {
+ String name,
+ Flow flow,
+ Dataset input,
+ UnaryFunction valueExtractor,
+ @Nullable Windowing windowing,
+ @Nullable UnaryFunction eventTimeAssigner,
+ ReduceFunction reducer,
+ int numPartitions) {
super(name, flow, input, e -> B_ZERO, windowing, eventTimeAssigner,
new Partitioning() {
diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SingleInputOperator.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SingleInputOperator.java
index e6c20dc305398..c32ae78b7b8bb 100644
--- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SingleInputOperator.java
+++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SingleInputOperator.java
@@ -18,8 +18,8 @@
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.flow.Flow;
-import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
/**
* Operator with single input.
@@ -41,7 +41,7 @@ public Dataset input() {
/** @return all of this operator's input as single element collection */
@Override
public Collection> listInputs() {
- return Arrays.asList(input);
+ return Collections.singletonList(input);
}
diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseOperator.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseOperator.java
index 891b20b5ee7e1..61d7e0ebc1bfc 100644
--- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseOperator.java
+++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseOperator.java
@@ -22,6 +22,8 @@
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
+import javax.annotation.Nullable;
+
/**
* Operator with internal state.
*/
@@ -38,8 +40,8 @@ public abstract class StateAwareWindowWiseOperator<
protected StateAwareWindowWiseOperator(
String name,
Flow flow,
- Windowing windowing /* optional */,
- UnaryFunction eventTimeAssigner /* optional */,
+ @Nullable Windowing windowing,
+ @Nullable UnaryFunction eventTimeAssigner,
UnaryFunction keyExtractor,
Partitioning partitioning) {
diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java
index 1785a70f5845a..9afc42e982855 100644
--- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java
+++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java
@@ -22,6 +22,7 @@
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
+import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Collections;
@@ -40,8 +41,8 @@ protected StateAwareWindowWiseSingleInputOperator(
String name,
Flow flow, Dataset input,
UnaryFunction extractor,
- Windowing windowing /* optional */,
- UnaryFunction eventTimeAssigner /* optional */,
+ @Nullable Windowing windowing,
+ @Nullable UnaryFunction eventTimeAssigner,
Partitioning partitioning) {
super(name, flow, windowing, eventTimeAssigner, extractor, partitioning);
diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java
index 698a96370b7f4..b96b9f441fce0 100644
--- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java
+++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java
@@ -27,6 +27,7 @@
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.core.client.util.Sums;
+import javax.annotation.Nullable;
import java.util.Objects;
/**
@@ -109,16 +110,18 @@ public static class OutputBuilder
private final Dataset input;
private final UnaryFunction keyExtractor;
private final UnaryFunction valueExtractor;
- private final Windowing windowing; /* optional */
- private final UnaryFunction eventTimeAssigner; /* optional */
+ @Nullable
+ private final Windowing windowing;
+ @Nullable
+ private final UnaryFunction eventTimeAssigner;
+
OutputBuilder(String name,
Dataset input,
UnaryFunction keyExtractor,
UnaryFunction valueExtractor,
- Windowing windowing /* optional */,
- UnaryFunction eventTimeAssigner /* optional */,
- PartitioningBuilder partitioning)
- {
+ @Nullable Windowing windowing,
+ @Nullable UnaryFunction eventTimeAssigner,
+ PartitioningBuilder partitioning) {
super(partitioning);
this.name = Objects.requireNonNull(name);
@@ -154,8 +157,8 @@ public static OfBuilder named(String name) {
Dataset input,
UnaryFunction keyExtractor,
UnaryFunction valueExtractor,
- Windowing windowing /* optional */,
- UnaryFunction eventTimeAssigner /* optional */,
+ @Nullable Windowing windowing,
+ @Nullable UnaryFunction eventTimeAssigner,
Partitioning partitioning)
{
super(name, flow, input, keyExtractor, windowing, eventTimeAssigner, partitioning);
diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java
index c0bcbec4b6e58..8ce57b8e7d1ff 100644
--- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java
+++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java
@@ -33,6 +33,7 @@
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.core.client.util.Triple;
+import javax.annotation.Nullable;
import java.util.Iterator;
import static java.util.Objects.requireNonNull;
@@ -209,7 +210,9 @@ public static class OutputBuilder<
private final UnaryFunction keyFn;
private final UnaryFunction valueFn;
private final UnaryFunction scoreFn;
+ @Nullable
private final Windowing windowing;
+ @Nullable
private final UnaryFunction eventTimeAssigner;
OutputBuilder(String name,
@@ -218,8 +221,8 @@ public static class OutputBuilder<
UnaryFunction valueFn,
UnaryFunction scoreFn,
PartitioningBuilder partitioning,
- Windowing windowing /* optional */,
- UnaryFunction eventTimeAssigner /*optional */) {
+ @Nullable Windowing windowing,
+ @Nullable UnaryFunction eventTimeAssigner) {
super(partitioning);
@@ -263,8 +266,8 @@ public static OfBuilder named(String name) {
UnaryFunction valueFn,
UnaryFunction scoreFn,
Partitioning partitioning,
- Windowing windowing /* optional */,
- UnaryFunction eventTimeAssigner /*optional */) {
+ @Nullable Windowing windowing,
+ @Nullable UnaryFunction eventTimeAssigner) {
super(name, flow, input, keyFn, windowing, eventTimeAssigner, partitioning);
this.valueFn = valueFn;
diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/WindowWiseOperator.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/WindowWiseOperator.java
index a35265b6a1cfc..d47e6674856dc 100644
--- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/WindowWiseOperator.java
+++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/WindowWiseOperator.java
@@ -20,6 +20,8 @@
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
+import javax.annotation.Nullable;
+
/**
* Operator working on some context.
*/
@@ -27,24 +29,27 @@ public abstract class WindowWiseOperator<
IN, WIN, OUT, W extends Window>
extends Operator implements WindowAware {
+ @Nullable
protected Windowing windowing;
+ @Nullable
protected UnaryFunction eventTimeAssigner;
public WindowWiseOperator(String name,
Flow flow,
- Windowing windowing /* optional */,
- UnaryFunction eventTimeAssigner /* optional */)
- {
+ @Nullable Windowing windowing,
+ @Nullable UnaryFunction eventTimeAssigner) {
super(name, flow);
this.windowing = windowing;
this.eventTimeAssigner = eventTimeAssigner;
}
+ @Nullable
@Override
public Windowing getWindowing() {
return windowing;
}
+ @Nullable
@Override
public UnaryFunction