Skip to content

Commit

Permalink
apache#18 Use @nullable annotation
Browse files Browse the repository at this point in the history
  • Loading branch information
vanekjar authored and David Moravek committed May 15, 2018
1 parent d76fea1 commit cf430c9
Show file tree
Hide file tree
Showing 44 changed files with 230 additions and 98 deletions.
5 changes: 5 additions & 0 deletions sdks/java/extensions/euphoria/euphoria-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import cz.seznam.euphoria.core.client.io.DataSource;
import cz.seznam.euphoria.core.client.operator.Operator;

import javax.annotation.Nullable;
import java.io.Serializable;
import java.net.URI;
import java.util.Collection;
Expand All @@ -44,11 +45,13 @@ public interface Dataset<T> extends Serializable {
*
* @return this dataset's explicit source - if any
*/
@Nullable
DataSource<T> getSource();

/**
* @return the operator that produced this dataset - if any
*/
@Nullable
Operator<?, T> getProducer();

/**
Expand Down Expand Up @@ -105,6 +108,7 @@ default void checkpoint(URI uri) throws Exception {
* data set is supposed to be persisted to, otherwise the
* sink provided through {@link #persist(DataSink)}.
*/
@Nullable
default DataSink<T> getOutputSink() {
return null;
}
Expand All @@ -115,6 +119,7 @@ default DataSink<T> getOutputSink() {
* @return {@code null} if no checkpoint sink has been defined,
* otherwise the sink provided through {@link #checkpoint(DataSink)}
*/
@Nullable
default DataSink<T> getCheckpointSink() {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
*/
package cz.seznam.euphoria.core.client.dataset;

import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.io.DataSource;
import cz.seznam.euphoria.core.client.operator.Operator;

import javax.annotation.Nullable;
import java.util.Collection;

/**
Expand All @@ -37,11 +38,13 @@ public InputDataset(Flow flow, DataSource<T> source, boolean bounded) {
this.bounded = bounded;
}

@Nullable
@Override
public DataSource<T> getSource() {
return source;
}

@Nullable
@Override
public Operator<?, T> getProducer() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.io.DataSource;
import cz.seznam.euphoria.core.client.operator.Operator;

import javax.annotation.Nullable;
import java.util.Collection;

/**
Expand All @@ -39,11 +41,13 @@ public OutputDataset(Flow flow, Operator<?, T> producer, boolean bounded) {
this.bounded = bounded;
}

@Nullable
@Override
public DataSource<T> getSource() {
return null;
}

@Nullable
@Override
public Operator<?, T> getProducer() {
return producer;
Expand All @@ -59,11 +63,13 @@ public void checkpoint(DataSink<T> sink) {
checkpointSink = sink;
}

@Nullable
@Override
public DataSink<T> getOutputSink() {
return outputSink;
}

@Nullable
@Override
public DataSink<T> getCheckpointSink() {
return checkpointSink;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import cz.seznam.euphoria.core.client.triggers.Trigger;
import cz.seznam.euphoria.core.client.util.Pair;

import javax.annotation.Nullable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -38,6 +39,7 @@
public final class Session<T> implements MergingWindowing<T, TimeInterval> {

private final long gapDurationMillis;
@Nullable
private Duration earlyTriggeringPeriod;

public static <T> Session<T> of(Duration gapDuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import cz.seznam.euphoria.core.client.triggers.TimeTrigger;
import cz.seznam.euphoria.core.client.triggers.Trigger;

import javax.annotation.Nullable;
import java.time.Duration;
import java.util.Arrays;
import java.util.Objects;
Expand All @@ -34,6 +35,7 @@
public class Time<T> implements Windowing<T, TimeInterval> {

private final long durationMillis;
@Nullable
private Duration earlyTriggeringPeriod;

public static <T> Time<T> of(Duration duration) {
Expand Down Expand Up @@ -79,6 +81,7 @@ public Trigger<TimeInterval> getTrigger() {
return new TimeTrigger();
}

@Nullable
public Duration getEarlyTriggeringPeriod() {
return earlyTriggeringPeriod;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -88,7 +89,7 @@ public class Flow implements Serializable {
= new HashMap<>();


private Flow(String name, Settings settings) {
private Flow(@Nullable String name, Settings settings) {
this.name = name == null ? "" : name;
this.settings = cloneSettings(settings);
}
Expand All @@ -110,7 +111,7 @@ public static Flow create() {
*
* @return a newly created flow
*/
public static Flow create(String flowName) {
public static Flow create(@Nullable String flowName) {
return new Flow(flowName, new Settings());
}

Expand Down Expand Up @@ -157,7 +158,7 @@ public <IN, OUT, T extends Operator<IN, OUT>> T add(T operator) {
*
* @return the added operator
*/
<IN, OUT, T extends Operator<IN, OUT>> T add(T operator, String logicalName) {
<IN, OUT, T extends Operator<IN, OUT>> T add(T operator, @Nullable String logicalName) {

operatorNames.put(operator, buildOperatorName(operator, logicalName));
operators.add(operator);
Expand Down Expand Up @@ -211,7 +212,7 @@ private void validateSerializable(Operator o) {
}
}

private String buildOperatorName(Operator op, String logicalName) {
private String buildOperatorName(Operator op, @Nullable String logicalName) {
StringBuilder sb = new StringBuilder(64);
sb.append(op.getName()).append('@').append(operatorNames.size() + 1);
logicalName = Util.trimToNull(logicalName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package cz.seznam.euphoria.core.client.flow;

import javax.annotation.Nullable;

class Util {

/**
Expand All @@ -23,7 +25,8 @@ class Util {
* @param s input string
* @return non-empty trimmed string or null
*/
static String trimToNull(String s) {
@Nullable
static String trimToNull(@Nullable String s) {
if (s == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package cz.seznam.euphoria.core.client.graph;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -26,17 +27,18 @@
public final class Node<T> {

final List<Node<T>> children = new ArrayList<>();
@Nullable
final T value;
final List<Node<T>> parents = new ArrayList<>();

@SuppressWarnings("unchecked")
private static final Node NULL_NODE = new Node(null);

Node(T value) {
Node(@Nullable T value) {
this.value = value;
}

Node(T value, List<Node<T>> parents) {
Node(@Nullable T value, List<Node<T>> parents) {
this(value);
this.parents.addAll(parents);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import cz.seznam.euphoria.core.client.graph.DAG;
import cz.seznam.euphoria.core.client.util.Pair;

import javax.annotation.Nullable;
import java.util.Objects;

/**
Expand Down Expand Up @@ -97,15 +98,17 @@ public static class OutputBuilder<IN, KEY, W extends Window>
private final String name;
private final Dataset<IN> input;
private final UnaryFunction<IN, KEY> keyExtractor;
@Nullable
private final Windowing<IN, W> windowing;
@Nullable
private final UnaryFunction<IN, Long> eventTimeAssigner;


OutputBuilder(String name,
Dataset<IN> input,
UnaryFunction<IN, KEY> keyExtractor,
Windowing<IN, W> windowing /* optional */,
UnaryFunction<IN, Long> eventTimeAssigner /* optional */,
@Nullable Windowing<IN, W> windowing,
@Nullable UnaryFunction<IN, Long> eventTimeAssigner,
PartitioningBuilder<KEY, ?> partitioning) {

//initialize partitioning
Expand Down Expand Up @@ -137,12 +140,12 @@ public static OfBuilder named(String name) {
}

CountByKey(String name,
Flow flow,
Dataset<IN> input,
UnaryFunction<IN, KEY> extractor,
Windowing<IN, W> windowing /* optional */,
UnaryFunction<IN, Long> eventTimeAssigner /* optional */,
Partitioning<KEY> partitioning) {
Flow flow,
Dataset<IN> input,
UnaryFunction<IN, KEY> extractor,
@Nullable Windowing<IN, W> windowing,
@Nullable UnaryFunction<IN, Long> eventTimeAssigner,
Partitioning<KEY> partitioning) {

super(name, flow, input, extractor, windowing, eventTimeAssigner, partitioning);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import cz.seznam.euphoria.core.client.graph.DAG;
import cz.seznam.euphoria.core.client.util.Pair;

import javax.annotation.Nullable;
import java.util.Objects;

/**
Expand Down Expand Up @@ -64,11 +65,13 @@ public static class WindowingBuilder<IN, ELEM>
{
private final String name;
private final Dataset<IN> input;
@Nullable
private final UnaryFunction<IN, ELEM> mapper;

WindowingBuilder(
String name,
Dataset<IN> input,
UnaryFunction<IN, ELEM> mapper /* optional */) {
@Nullable UnaryFunction<IN, ELEM> mapper) {

// define default partitioning
super(new DefaultPartitioning<>(input.getNumPartitions()));
Expand Down Expand Up @@ -100,16 +103,19 @@ public static class OutputBuilder<IN, ELEM, W extends Window>
{
private final String name;
private final Dataset<IN> input;
@Nullable
private final UnaryFunction<IN, ELEM> mapper;
@Nullable
private final Windowing<IN, W> windowing;
@Nullable
private final UnaryFunction<IN, Long> eventTimeAssigner;

OutputBuilder(String name,
Dataset<IN> input,
UnaryFunction<IN, ELEM> mapper /* optional */,
@Nullable UnaryFunction<IN, ELEM> mapper,
PartitioningBuilder<ELEM, ?> partitioning,
Windowing<IN, W> windowing /* optional */,
UnaryFunction<IN, Long> eventTimeAssigner /* optional */) {
@Nullable Windowing<IN, W> windowing,
@Nullable UnaryFunction<IN, Long> eventTimeAssigner) {

super(partitioning);
this.name = Objects.requireNonNull(name);
Expand Down Expand Up @@ -143,8 +149,8 @@ public static OfBuilder named(String name) {
Dataset<IN> input,
UnaryFunction<IN, ELEM> mapper,
Partitioning<ELEM> partitioning,
Windowing<IN, W> windowing /* optional */,
UnaryFunction<IN, Long> eventTimeAssigner /* optional */) {
@Nullable Windowing<IN, W> windowing,
@Nullable UnaryFunction<IN, Long> eventTimeAssigner) {

super(name, flow, input, mapper, windowing, eventTimeAssigner, partitioning);
}
Expand Down
Loading

0 comments on commit cf430c9

Please sign in to comment.