Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-5123] Testsuite does not run all the tests it should #18

Merged
merged 7 commits into from
Aug 21, 2018
4 changes: 1 addition & 3 deletions sdks/java/extensions/euphoria/euphoria-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
*/

apply plugin: org.apache.beam.gradle.BeamModulePlugin
//applyJavaNature(enableFindbugs: false, failOnWarning: false) //TODO remove
applyJavaNature(enableFindbugs: false)
applyJavaNature()

description = "Apache Beam :: SDKs :: Java :: Extensions :: Euphoria Java 8 DSL :: Core"

Expand All @@ -30,7 +29,6 @@ dependencies {
shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
compile "com.esotericsoftware:kryo:${kryoVersion}"
shadow library.java.guava
//compileOnly library.java.findbugs_jsr305 //TODO remove if not needed
testCompile library.java.mockito_core
testCompile project(path: ':beam-sdks-java-extensions-euphoria-testing')
shadow 'com.google.code.findbugs:annotations:3.0.+'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.extensions.euphoria.core.client.io;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -40,7 +41,10 @@ public class ListDataSink<T> implements DataSink<T> {
Collections.synchronizedMap(new WeakHashMap<>());

private final int sinkId = System.identityHashCode(this);

@SuppressFBWarnings("SE_BAD_FIELD")
private final List<ListWriter> writers = Collections.synchronizedList(new ArrayList<>());

@Nullable private Consumer<Dataset<T>> prepareDataset = null;

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class Operators {
* implementation.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public static final Set<Class<? extends Operator<?, ?>>> BASIC_OPS =
static final Set<Class<? extends Operator<?, ?>>> BASIC_OPS =
(Set) Sets.newHashSet(FlatMap.class, ReduceStateByKey.class, Union.class);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,59 +154,31 @@ public TypeDescriptor<V> getValueType() {
// implement this operator via `ReduceByKey`
final ReduceByKey rbk;
final DAG<Operator<?, ?>> dag = DAG.empty();
if (windowing != null) {
rbk =
new ReduceByKey<InputT, Byte, V, OutputT, W>(
getName() + "::ReduceByKey",
getFlow(),
input,
getKeyExtractor(),
getKeyType(),
valueExtractor,
valueType,
windowing,
euphoriaWindowing,
reducer,
valueComparator,
getHints(),
TypeUtils.keyValues(getKeyType(), outputType));
dag.add(rbk);
} else {
// otherwise we use attached windowing, therefore
// we already know the window lables and can do group-by these
// labels to increase parallelism
FlatMap<InputT, KV<Window<?>, InputT>> map =
new FlatMap<>(
getName() + "::window-to-key",
getFlow(),
input,
(InputT in, Collector<KV<Window<?>, InputT>> c) -> {
c.collect(KV.of(c.getWindow(), in));
},
null,
null);
rbk =
new ReduceByKey<KV<Window<?>, InputT>, Window<?>, V, OutputT, W>(
getName() + "::ReduceByKey::attached",
getFlow(),
map.output(),
(KV<Window<?>, InputT> p) -> p.getKey(),
null,
p -> valueExtractor.apply(p.getValue()),
valueType,
null,
null,
reducer,
valueComparator,
getHints(),
null);
dag.add(map);
dag.add(rbk);
}

rbk =
new ReduceByKey<InputT, Byte, V, OutputT, W>(
getName() + "::ReduceByKey",
getFlow(),
input,
getKeyExtractor(),
getKeyType(),
valueExtractor,
valueType,
windowing,
euphoriaWindowing,
reducer,
valueComparator,
getHints(),
TypeUtils.keyValues(getKeyType(), outputType));
dag.add(rbk);

MapElements<KV<Object, OutputT>, OutputT> format =
new MapElements<KV<Object, OutputT>, OutputT>(
getName() + "::MapElements", getFlow(), (Dataset) rbk.output(), KV::getValue, null);
getName() + "::MapElements",
getFlow(),
(Dataset) rbk.output(),
KV::getValue,
outputType);

dag.add(format);
return dag;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class Fold implements Serializable {
* fold of all input data.
*
* @param <T> element type
* @param fold the fold function
* @param fold the associative fold function
* @return the {@link CombinableReduceFunction}
*/
public static <T> CombinableReduceFunction<T> of(BinaryFunction<T, T, T> fold) {
Expand All @@ -49,7 +49,7 @@ public static <T> CombinableReduceFunction<T> of(BinaryFunction<T, T, T> fold) {
*
* @param <T> element type
* @param identity the zero element
* @param fold the fold function
* @param fold the associative fold function
* @return the {@link CombinableReduceFunction}
*/
public static <T> CombinableReduceFunction<T> of(T identity, BinaryFunction<T, T, T> fold) {
Expand All @@ -63,7 +63,7 @@ public static <T> CombinableReduceFunction<T> of(T identity, BinaryFunction<T, T
*
* @param <T> element type
* @param identity the zero element
* @param fold the fold function
* @param fold the associative fold function
* @return the {@link CombinableReduceFunction}
*/
public static <T> ReduceFunctor<T, T> of(T identity, BinaryFunctor<T, T, T> fold) {
Expand All @@ -88,7 +88,7 @@ public static <T> ReduceFunctor<T, T> of(T identity, BinaryFunctor<T, T, T> fold
* @param <InputT> type of input value
* @param <OutputT> type of output value
* @param identity the zero element
* @param fold the fold function
* @param fold the associative fold function
* @return the {@link ReduceFunctor}
*/
@SuppressWarnings("ReturnValueIgnored") // TODO : remove
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
import org.apache.beam.sdk.extensions.euphoria.core.client.io.DataSink;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Operator;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.WindowWiseOperator;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.windowing.WindowingDesc;
Expand All @@ -43,11 +42,9 @@ class FlowValidator {
*
* @param dag the user defined flow as a DAG
* @return the input dag if the validation succeeds
* @throws WindowingRequiredException if the given DAG contains operators that require explicit
* windowing strategies to make meaningful executions
*/
static DAG<Operator<?, ?>> preTranslate(DAG<Operator<?, ?>> dag) {
checkJoinWindowing(dag);
// no-op left for future use
return dag;
}

Expand All @@ -61,35 +58,6 @@ class FlowValidator {
return dag;
}

/**
* Validate that join operators' windowing semantics can be meaningfully implemented. I.e. only
* instances which join "batched" windowed data sets do not need to explicitly be provided with a
* user defined windowing strategy.
*/
private static void checkJoinWindowing(DAG<Operator<?, ?>> dag) {
List<Node<Operator<?, ?>>> joins =
dag.traverse().filter(node -> node.get() instanceof Join).collect(Collectors.toList());
for (Node<Operator<?, ?>> join : joins) {
checkJoinWindowing(join);
}
}

private static void checkJoinWindowing(Node<Operator<?, ?>> node) {
checkState(node.get() instanceof Join);

// ~ if a windowing strategy is explicitly provided by the user, all is fine
if (((Join) node.get()).getWindowing() != null) {
return;
}
for (Node<Operator<?, ?>> parent : node.getParents()) {
if (!isBatched(parent)) {
throw new WindowingRequiredException(
"Join operator requires either an explicit windowing"
+ " strategy or needs to be supplied with batched inputs.");
}
}
}

private static boolean isBatched(Node<Operator<?, ?>> node) {
Operator<?, ?> operator = node.get();
if (operator instanceof FlowUnfolder.InputOperator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.extensions.euphoria.core.executor.io;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
Expand All @@ -38,11 +39,12 @@
* @param <T> the type of elements stored in this list storage
*/
@Audience(Audience.Type.EXECUTOR)
@SuppressFBWarnings("OBL_UNSATISFIED_OBLIGATION")
public class FsSpillingListStorage<T> implements ListStorage<T>, ExternalIterable<T> {

private final SerializerFactory serializerFactory;
private final SpillFileFactory spillFileFactory;
private final int maxElemsInMemory;
private final int maxElementsInMemory;

// ~ new elements are appended to this list and eventually this list
// will be spilled to disk. therefore, this list contains the lastly
Expand All @@ -57,17 +59,17 @@ public class FsSpillingListStorage<T> implements ListStorage<T>, ExternalIterabl
public FsSpillingListStorage(
SerializerFactory serializerFactory,
SpillFileFactory spillFileFactory,
int maxElemsInMemory) {
int maxElementsInMemory) {

this.serializerFactory = Objects.requireNonNull(serializerFactory);
this.spillFileFactory = Objects.requireNonNull(spillFileFactory);
this.maxElemsInMemory = maxElemsInMemory;
this.maxElementsInMemory = maxElementsInMemory;
}

@Override
public void add(T element) {
elems.add(element);
if (elems.size() > maxElemsInMemory) {
if (elems.size() > maxElementsInMemory) {
spillElems();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ private static SpillFileFactory spillFactory(Settings settings) {
+ " settings");
}
} else {
tmpDir.mkdirs();
if (!tmpDir.mkdirs()) {
throw new IllegalStateException("Unable to create '" + tmpDir + "' directory.");
}
}
return () ->
new File(tmpDir, String.format("euphoria-spill-%s.bin", UUID.randomUUID().toString()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public static Factory getFactory() {
public Counter getCounter(String name) {
throw new UnsupportedOperationException(
"BeamAccumulatorProvider doesn't support"
+ " getCounter(String name). Need specify operatorName and name");
+ " getCounter(String name). Please specify namespace and name.");
}

@Override
Expand All @@ -66,7 +66,7 @@ public Counter getCounter(final String namespace, final String name) {
public Histogram getHistogram(String name) {
throw new UnsupportedOperationException(
"BeamAccumulatorProvider doesn't support"
+ " getHistogram(String name). Need specify namespace and name");
+ " getHistogram(String name). Please specify namespace and name.");
}

@Override
Expand All @@ -79,7 +79,7 @@ public Histogram getHistogram(final String namespace, final String name) {
public Timer getTimer(String name) {
throw new UnsupportedOperationException(
"BeamAccumulatorProvider doesn't support"
+ " getTimer(String name). Need specify namespace and name");
+ " getTimer(String name). Please specify namespace and name.");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
Expand All @@ -34,7 +35,6 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.io.DataSource;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Operator;
import org.apache.beam.sdk.extensions.euphoria.core.executor.graph.DAG;
import org.apache.beam.sdk.extensions.euphoria.core.translate.coder.EuphoriaCoderProvider;
import org.apache.beam.sdk.extensions.euphoria.core.translate.io.BeamWriteSink;
import org.apache.beam.sdk.extensions.euphoria.core.util.Settings;
import org.apache.beam.sdk.options.PipelineOptions;
Expand All @@ -47,7 +47,9 @@
*/
public class BeamFlow extends Flow {

@SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED")
private final transient Map<PCollection<?>, Dataset<?>> wrapped = new HashMap<>();

private final transient TranslationContext context;
private final transient Pipeline pipeline;
private Duration allowedLateness = Duration.ZERO;
Expand Down Expand Up @@ -143,8 +145,8 @@ public static BeamFlow of(String name, Pipeline pipeline, boolean allowKryoCoder
}

/**
* Create flow from input {@link PCollection} which is {@linkplain
* PCollection#isFinishedSpecifying() finished specifying}.
* Create flow from input {@link PCollection} which is
* {@linkplain PCollection#isFinishedSpecifying() finished specifying}.
*
* @param pCollection the input {@link PCollection} to wrap into new flow
* @param <T> type of {@link PCollection} element
Expand Down Expand Up @@ -178,10 +180,11 @@ public <T> Dataset<T> createInput(DataSource<T> source) {
/**
* Registers the provided {@link Coder} for the given class.
*
* <p>Consider using {@link
* org.apache.beam.sdk.extensions.euphoria.core.translate.coder.RegisterCoders} when registering
* more coders at once.
*
* <p>
* Consider using
* {@link org.apache.beam.sdk.extensions.euphoria.core.translate.coder.RegisterCoders}
* when registering more coders at once.
* </p>
* @param clazz class of element to be encoded/decoded
* @param coder coder to encode/decode instances of given class
* @param <T> type parameter of the class to be encoded
Expand All @@ -193,10 +196,11 @@ public <T> void registerCoder(Class<T> clazz, Coder<T> coder) {
/**
* Registers the provided {@link Coder} for the given type.
*
* <p>Consider using {@link
* org.apache.beam.sdk.extensions.euphoria.core.translate.coder.RegisterCoders} when registering
* more coders at once.
*
* <p>
* Consider using
* {@link org.apache.beam.sdk.extensions.euphoria.core.translate.coder.RegisterCoders}
* when registering more coders at once.
* </p>
* @param typeDescriptor type of element to be encoded/decoded
* @param coder coder to encode/decode instances of given class
* @param <T> type parameter of the class to be encoded
Expand All @@ -208,9 +212,11 @@ public <T> void registerCoder(TypeDescriptor<T> typeDescriptor, Coder<T> coder)
/**
* Registers the provided {@link Coder} for the given class.
*
* <p>Consider using {@link
* org.apache.beam.sdk.extensions.euphoria.core.translate.coder.RegisterCoders} when registering
* more coders at once.
* <p>
* Consider using
* {@link org.apache.beam.sdk.extensions.euphoria.core.translate.coder.RegisterCoders}
* when registering more coders at once.
* </p>
*
* @param coderProvider
*/
Expand Down
Loading