diff --git a/sdks/java/extensions/euphoria/euphoria-core/build.gradle b/sdks/java/extensions/euphoria/euphoria-core/build.gradle index 82599d950dac9..b930121ec6bdc 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/build.gradle +++ b/sdks/java/extensions/euphoria/euphoria-core/build.gradle @@ -17,8 +17,7 @@ */ apply plugin: org.apache.beam.gradle.BeamModulePlugin -//applyJavaNature(enableFindbugs: false, failOnWarning: false) //TODO remove -applyJavaNature(enableFindbugs: false) +applyJavaNature() description = "Apache Beam :: SDKs :: Java :: Extensions :: Euphoria Java 8 DSL :: Core" @@ -30,7 +29,6 @@ dependencies { shadow project(path: ":beam-sdks-java-core", configuration: "shadow") compile "com.esotericsoftware:kryo:${kryoVersion}" shadow library.java.guava - //compileOnly library.java.findbugs_jsr305 //TODO remove if not needed testCompile library.java.mockito_core testCompile project(path: ':beam-sdks-java-extensions-euphoria-testing') shadow 'com.google.code.findbugs:annotations:3.0.+' diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/io/ListDataSink.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/io/ListDataSink.java index 3941044cb120a..c005a4663f7ac 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/io/ListDataSink.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/io/ListDataSink.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.extensions.euphoria.core.client.io; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -40,7 +41,10 @@ public class ListDataSink implements DataSink { Collections.synchronizedMap(new WeakHashMap<>()); private final int sinkId = System.identityHashCode(this); + + @SuppressFBWarnings("SE_BAD_FIELD") private final List writers = Collections.synchronizedList(new ArrayList<>()); + @Nullable private Consumer> prepareDataset = null; @SuppressWarnings("unchecked") diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Operators.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Operators.java index 8fdd94f0f1062..566daa5cc04c7 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Operators.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Operators.java @@ -29,7 +29,7 @@ public class Operators { * implementation. */ @SuppressWarnings({"unchecked", "rawtypes"}) - public static final Set>> BASIC_OPS = + static final Set>> BASIC_OPS = (Set) Sets.newHashSet(FlatMap.class, ReduceStateByKey.class, Union.class); /** diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceWindow.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceWindow.java index ce1666fa2cf95..b7c9a7ced7c4d 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceWindow.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceWindow.java @@ -154,59 +154,31 @@ public TypeDescriptor getValueType() { // implement this operator via `ReduceByKey` final ReduceByKey rbk; final DAG> dag = DAG.empty(); - if (windowing != null) { - rbk = - new ReduceByKey( - getName() + "::ReduceByKey", - getFlow(), - input, - getKeyExtractor(), - getKeyType(), - valueExtractor, - valueType, - windowing, - euphoriaWindowing, - reducer, - valueComparator, - getHints(), - TypeUtils.keyValues(getKeyType(), outputType)); - dag.add(rbk); - } else { - // otherwise we use attached windowing, therefore - // we already know the window lables and can do group-by these - // labels to increase parallelism - FlatMap, InputT>> map = - new FlatMap<>( - getName() + "::window-to-key", - getFlow(), - input, - (InputT in, Collector, InputT>> c) -> { - c.collect(KV.of(c.getWindow(), in)); - }, - null, - null); - rbk = - new ReduceByKey, InputT>, Window, V, OutputT, W>( - getName() + "::ReduceByKey::attached", - getFlow(), - map.output(), - (KV, InputT> p) -> p.getKey(), - null, - p -> valueExtractor.apply(p.getValue()), - valueType, - null, - null, - reducer, - valueComparator, - getHints(), - null); - dag.add(map); - dag.add(rbk); - } + + rbk = + new ReduceByKey( + getName() + "::ReduceByKey", + getFlow(), + input, + getKeyExtractor(), + getKeyType(), + valueExtractor, + valueType, + windowing, + euphoriaWindowing, + reducer, + valueComparator, + getHints(), + TypeUtils.keyValues(getKeyType(), outputType)); + dag.add(rbk); MapElements, OutputT> format = new MapElements, OutputT>( - getName() + "::MapElements", getFlow(), (Dataset) rbk.output(), KV::getValue, null); + getName() + "::MapElements", + getFlow(), + (Dataset) rbk.output(), + KV::getValue, + outputType); dag.add(format); return dag; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/util/Fold.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/util/Fold.java index 68ff05153178a..a48e4f0077938 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/util/Fold.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/util/Fold.java @@ -34,7 +34,7 @@ public class Fold implements Serializable { * fold of all input data. * * @param element type - * @param fold the fold function + * @param fold the associative fold function * @return the {@link CombinableReduceFunction} */ public static CombinableReduceFunction of(BinaryFunction fold) { @@ -49,7 +49,7 @@ public static CombinableReduceFunction of(BinaryFunction fold) { * * @param element type * @param identity the zero element - * @param fold the fold function + * @param fold the associative fold function * @return the {@link CombinableReduceFunction} */ public static CombinableReduceFunction of(T identity, BinaryFunction fold) { @@ -63,7 +63,7 @@ public static CombinableReduceFunction of(T identity, BinaryFunction element type * @param identity the zero element - * @param fold the fold function + * @param fold the associative fold function * @return the {@link CombinableReduceFunction} */ public static ReduceFunctor of(T identity, BinaryFunctor fold) { @@ -88,7 +88,7 @@ public static ReduceFunctor of(T identity, BinaryFunctor fold * @param type of input value * @param type of output value * @param identity the zero element - * @param fold the fold function + * @param fold the associative fold function * @return the {@link ReduceFunctor} */ @SuppressWarnings("ReturnValueIgnored") // TODO : remove diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/executor/FlowValidator.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/executor/FlowValidator.java index 42168f3707a41..2d67708e82a63 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/executor/FlowValidator.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/executor/FlowValidator.java @@ -25,7 +25,6 @@ import java.util.stream.Collectors; import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset; import org.apache.beam.sdk.extensions.euphoria.core.client.io.DataSink; -import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join; import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Operator; import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.WindowWiseOperator; import org.apache.beam.sdk.extensions.euphoria.core.client.operator.windowing.WindowingDesc; @@ -43,11 +42,9 @@ class FlowValidator { * * @param dag the user defined flow as a DAG * @return the input dag if the validation succeeds - * @throws WindowingRequiredException if the given DAG contains operators that require explicit - * windowing strategies to make meaningful executions */ static DAG> preTranslate(DAG> dag) { - checkJoinWindowing(dag); + // no-op left for future use return dag; } @@ -61,35 +58,6 @@ class FlowValidator { return dag; } - /** - * Validate that join operators' windowing semantics can be meaningfully implemented. I.e. only - * instances which join "batched" windowed data sets do not need to explicitly be provided with a - * user defined windowing strategy. - */ - private static void checkJoinWindowing(DAG> dag) { - List>> joins = - dag.traverse().filter(node -> node.get() instanceof Join).collect(Collectors.toList()); - for (Node> join : joins) { - checkJoinWindowing(join); - } - } - - private static void checkJoinWindowing(Node> node) { - checkState(node.get() instanceof Join); - - // ~ if a windowing strategy is explicitly provided by the user, all is fine - if (((Join) node.get()).getWindowing() != null) { - return; - } - for (Node> parent : node.getParents()) { - if (!isBatched(parent)) { - throw new WindowingRequiredException( - "Join operator requires either an explicit windowing" - + " strategy or needs to be supplied with batched inputs."); - } - } - } - private static boolean isBatched(Node> node) { Operator operator = node.get(); if (operator instanceof FlowUnfolder.InputOperator) { diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/executor/io/FsSpillingListStorage.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/executor/io/FsSpillingListStorage.java index b5ee255088725..5c3df7fce7b52 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/executor/io/FsSpillingListStorage.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/executor/io/FsSpillingListStorage.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.extensions.euphoria.core.executor.io; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -38,11 +39,12 @@ * @param the type of elements stored in this list storage */ @Audience(Audience.Type.EXECUTOR) +@SuppressFBWarnings("OBL_UNSATISFIED_OBLIGATION") public class FsSpillingListStorage implements ListStorage, ExternalIterable { private final SerializerFactory serializerFactory; private final SpillFileFactory spillFileFactory; - private final int maxElemsInMemory; + private final int maxElementsInMemory; // ~ new elements are appended to this list and eventually this list // will be spilled to disk. therefore, this list contains the lastly @@ -57,17 +59,17 @@ public class FsSpillingListStorage implements ListStorage, ExternalIterabl public FsSpillingListStorage( SerializerFactory serializerFactory, SpillFileFactory spillFileFactory, - int maxElemsInMemory) { + int maxElementsInMemory) { this.serializerFactory = Objects.requireNonNull(serializerFactory); this.spillFileFactory = Objects.requireNonNull(spillFileFactory); - this.maxElemsInMemory = maxElemsInMemory; + this.maxElementsInMemory = maxElementsInMemory; } @Override public void add(T element) { elems.add(element); - if (elems.size() > maxElemsInMemory) { + if (elems.size() > maxElementsInMemory) { spillElems(); } } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/executor/io/GenericSpillTools.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/executor/io/GenericSpillTools.java index 92f216df89e44..6469776ac2ead 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/executor/io/GenericSpillTools.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/executor/io/GenericSpillTools.java @@ -97,7 +97,9 @@ private static SpillFileFactory spillFactory(Settings settings) { + " settings"); } } else { - tmpDir.mkdirs(); + if (!tmpDir.mkdirs()) { + throw new IllegalStateException("Unable to create '" + tmpDir + "' directory."); + } } return () -> new File(tmpDir, String.format("euphoria-spill-%s.bin", UUID.randomUUID().toString())); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamAccumulatorProvider.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamAccumulatorProvider.java index 87cf453550462..893c95b3c0cec 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamAccumulatorProvider.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamAccumulatorProvider.java @@ -53,7 +53,7 @@ public static Factory getFactory() { public Counter getCounter(String name) { throw new UnsupportedOperationException( "BeamAccumulatorProvider doesn't support" - + " getCounter(String name). Need specify operatorName and name"); + + " getCounter(String name). Please specify namespace and name."); } @Override @@ -66,7 +66,7 @@ public Counter getCounter(final String namespace, final String name) { public Histogram getHistogram(String name) { throw new UnsupportedOperationException( "BeamAccumulatorProvider doesn't support" - + " getHistogram(String name). Need specify namespace and name"); + + " getHistogram(String name). Please specify namespace and name."); } @Override @@ -79,7 +79,7 @@ public Histogram getHistogram(final String namespace, final String name) { public Timer getTimer(String name) { throw new UnsupportedOperationException( "BeamAccumulatorProvider doesn't support" - + " getTimer(String name). Need specify namespace and name"); + + " getTimer(String name). Please specify namespace and name."); } /** diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamFlow.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamFlow.java index af27ec8b65fc3..600a36f1dd30e 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamFlow.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamFlow.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.time.Duration; import java.util.HashMap; import java.util.List; @@ -34,7 +35,6 @@ import org.apache.beam.sdk.extensions.euphoria.core.client.io.DataSource; import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Operator; import org.apache.beam.sdk.extensions.euphoria.core.executor.graph.DAG; -import org.apache.beam.sdk.extensions.euphoria.core.translate.coder.EuphoriaCoderProvider; import org.apache.beam.sdk.extensions.euphoria.core.translate.io.BeamWriteSink; import org.apache.beam.sdk.extensions.euphoria.core.util.Settings; import org.apache.beam.sdk.options.PipelineOptions; @@ -47,7 +47,9 @@ */ public class BeamFlow extends Flow { + @SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED") private final transient Map, Dataset> wrapped = new HashMap<>(); + private final transient TranslationContext context; private final transient Pipeline pipeline; private Duration allowedLateness = Duration.ZERO; @@ -143,8 +145,8 @@ public static BeamFlow of(String name, Pipeline pipeline, boolean allowKryoCoder } /** - * Create flow from input {@link PCollection} which is {@linkplain - * PCollection#isFinishedSpecifying() finished specifying}. + * Create flow from input {@link PCollection} which is + * {@linkplain PCollection#isFinishedSpecifying() finished specifying}. * * @param pCollection the input {@link PCollection} to wrap into new flow * @param type of {@link PCollection} element @@ -178,10 +180,11 @@ public Dataset createInput(DataSource source) { /** * Registers the provided {@link Coder} for the given class. * - *

Consider using {@link - * org.apache.beam.sdk.extensions.euphoria.core.translate.coder.RegisterCoders} when registering - * more coders at once. - * + *

+ * Consider using + * {@link org.apache.beam.sdk.extensions.euphoria.core.translate.coder.RegisterCoders} + * when registering more coders at once. + *

* @param clazz class of element to be encoded/decoded * @param coder coder to encode/decode instances of given class * @param type parameter of the class to be encoded @@ -193,10 +196,11 @@ public void registerCoder(Class clazz, Coder coder) { /** * Registers the provided {@link Coder} for the given type. * - *

Consider using {@link - * org.apache.beam.sdk.extensions.euphoria.core.translate.coder.RegisterCoders} when registering - * more coders at once. - * + *

+ * Consider using + * {@link org.apache.beam.sdk.extensions.euphoria.core.translate.coder.RegisterCoders} + * when registering more coders at once. + *

* @param typeDescriptor type of element to be encoded/decoded * @param coder coder to encode/decode instances of given class * @param type parameter of the class to be encoded @@ -208,9 +212,11 @@ public void registerCoder(TypeDescriptor typeDescriptor, Coder coder) /** * Registers the provided {@link Coder} for the given class. * - *

Consider using {@link - * org.apache.beam.sdk.extensions.euphoria.core.translate.coder.RegisterCoders} when registering - * more coders at once. + *

+ * Consider using + * {@link org.apache.beam.sdk.extensions.euphoria.core.translate.coder.RegisterCoders} + * when registering more coders at once. + *

* * @param coderProvider */ diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamRunnerWrapper.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamRunnerWrapper.java index ef632265095a6..7e0f4741d29cb 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamRunnerWrapper.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamRunnerWrapper.java @@ -25,9 +25,9 @@ import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.extensions.euphoria.core.client.accumulators.AccumulatorProvider; import org.apache.beam.sdk.extensions.euphoria.core.client.flow.Flow; +import org.apache.beam.sdk.extensions.euphoria.core.translate.common.PipelineUtils; import org.apache.beam.sdk.extensions.euphoria.core.util.Settings; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,8 +60,7 @@ private BeamRunnerWrapper(PipelineOptions options, Settings settings) { /** @return wrapper around Beam's direct runner. It allows to run {@link Flow} locally. */ public static BeamRunnerWrapper ofDirect() { - final String[] args = {"--runner=DirectRunner"}; - final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class); + final PipelineOptions options = PipelineUtils.getDirectPipelineOptions(); return new BeamRunnerWrapper(options).withAllowedLateness(java.time.Duration.ofHours(1)); } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamPTransform.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/EuphoriaPTransform.java similarity index 87% rename from sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamPTransform.java rename to sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/EuphoriaPTransform.java index a9c439759177f..ad9a3396858a8 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamPTransform.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/EuphoriaPTransform.java @@ -29,19 +29,19 @@ * @param type of input elements * @param type of output elements */ -public class BeamPTransform +public class EuphoriaPTransform extends PTransform, PCollection> { private final Function, Dataset> euphoriaTransform; - private BeamPTransform(Function, Dataset> euphoriaTransform) { + private EuphoriaPTransform(Function, Dataset> euphoriaTransform) { this.euphoriaTransform = euphoriaTransform; } - public static BeamPTransform of( + public static EuphoriaPTransform of( Function, Dataset> euphoriaDatasetTransform) { Objects.requireNonNull(euphoriaDatasetTransform); - return new BeamPTransform<>(euphoriaDatasetTransform); + return new EuphoriaPTransform<>(euphoriaDatasetTransform); } @Override diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/UnionTranslator.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/UnionTranslator.java index 6e21b5f284c59..92cad1dffbb79 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/UnionTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/UnionTranslator.java @@ -22,6 +22,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; +/** Euphoria to Beam translation of {@link Union} operator. */ class UnionTranslator implements OperatorTranslator { private static PCollection doTranslate(Union operator, TranslationContext context) { diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/coder/ClassAwareKryoCoder.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/coder/ClassAwareKryoCoder.java deleted file mode 100644 index c285a9f577401..0000000000000 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/coder/ClassAwareKryoCoder.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.euphoria.core.translate.coder; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.List; -import java.util.Objects; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.extensions.euphoria.core.client.functional.VoidFunction; -import org.apache.commons.compress.utils.BoundedInputStream; -import org.objenesis.strategy.StdInstantiatorStrategy; - -/** {@link Coder} which encodes/decodes objects of given {@link Class} through {@link Kryo}. */ -public class ClassAwareKryoCoder extends Coder { - - /** Initial size of byte buffers in {@link Output}, {@link Input}. */ - private static final int DEFAULT_BUFFER_SIZE = 4096; - - // this factory needs to be serializable, since the Coder itself is - private static final VoidFunction KRYO_FACTORY = - () -> { - final Kryo instance = new Kryo(); - ((Kryo.DefaultInstantiatorStrategy) instance.getInstantiatorStrategy()) - .setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); - return instance; - }; - - private static ThreadLocal threadLocalKryo = ThreadLocal.withInitial(KRYO_FACTORY::apply); - - private static ThreadLocal threadLocalOutput = - ThreadLocal.withInitial(() -> new Output(DEFAULT_BUFFER_SIZE, -1)); - private static ThreadLocal threadLocalInput = - ThreadLocal.withInitial(() -> new Input(DEFAULT_BUFFER_SIZE)); - - private Class clazz; - - public ClassAwareKryoCoder(Class clazz) { - Objects.requireNonNull(clazz); - this.clazz = clazz; - } - - public static ClassAwareKryoCoder of(Class clazz) { - return new ClassAwareKryoCoder<>(clazz); - } - - @Override - public synchronized void encode(T value, OutputStream outStream) - throws CoderException, IOException { - - Output output = threadLocalOutput.get(); - output.clear(); - threadLocalKryo.get().writeObjectOrNull(output, value, clazz); - - DataOutputStream dos = new DataOutputStream(outStream); - // write length of encoded object first to get ability to limit read in decode() to one object - dos.writeInt(output.position()); - outStream.write(output.getBuffer(), 0, output.position()); - } - - @Override - public synchronized T decode(InputStream inStream) throws CoderException, IOException { - DataInputStream dis = new DataInputStream(inStream); - int lengthOfDecodedObject = dis.readInt(); - - Input input = threadLocalInput.get(); - - BoundedInputStream limitedStream = new BoundedInputStream(inStream, lengthOfDecodedObject); - input.setInputStream(limitedStream); - - return threadLocalKryo.get().readObjectOrNull(input, clazz); - } - - @Override - public List> getCoderArguments() { - return null; - } //TODO Is there a way of knowing this ? - - @Override - public void verifyDeterministic() throws NonDeterministicException { - //no op - } - - public Class getClazz() { - return clazz; - } -} diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/common/PipelineUtils.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/common/PipelineUtils.java new file mode 100644 index 0000000000000..4c2212b7fd3c8 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/common/PipelineUtils.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.euphoria.core.translate.common; + +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +/** A collection of handy methods. */ +public class PipelineUtils { + + public static PipelineOptions getDirectPipelineOptions() { + final String[] args = {"--runner=DirectRunner"}; + return PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class); + } +} diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/io/BeamUnboundedSource.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/io/BeamUnboundedSource.java index b9724a8260c2f..46b65c817bc91 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/io/BeamUnboundedSource.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/io/BeamUnboundedSource.java @@ -219,7 +219,7 @@ public BeamCheckpointMark decode(InputStream inStream) @Override public List> getCoderArguments() { - return null; + return Collections.singletonList(offsetTCoder); } @Override diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/OperatorsTestSuite.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/OperatorsTestSuite.java similarity index 81% rename from sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/OperatorsTestSuite.java rename to sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/OperatorsTestSuite.java index acc9c9a3988e5..ce1df10f8d454 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/OperatorsTestSuite.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/OperatorsTestSuite.java @@ -15,12 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.extensions.euphoria.core.translate; +package org.apache.beam.sdk.extensions.euphoria.core; import org.apache.beam.sdk.extensions.euphoria.core.testkit.CountByKeyTest; import org.apache.beam.sdk.extensions.euphoria.core.testkit.DistinctTest; import org.apache.beam.sdk.extensions.euphoria.core.testkit.FilterTest; +import org.apache.beam.sdk.extensions.euphoria.core.testkit.FlatMapTest; +import org.apache.beam.sdk.extensions.euphoria.core.testkit.JoinTest; import org.apache.beam.sdk.extensions.euphoria.core.testkit.MapElementsTest; +import org.apache.beam.sdk.extensions.euphoria.core.testkit.ReduceByKeyTest; +import org.apache.beam.sdk.extensions.euphoria.core.testkit.ReduceWindowTest; import org.apache.beam.sdk.extensions.euphoria.core.testkit.SumByKeyTest; import org.apache.beam.sdk.extensions.euphoria.core.testkit.UnionTest; import org.apache.beam.sdk.extensions.euphoria.core.testkit.junit.TestSuiteRunner; @@ -30,7 +34,7 @@ /** Euphoria operators test suite. */ @RunWith(TestSuiteRunner.class) @Suite.SuiteClasses({ - // BroadcastHashJoinTest.class, + // BroadcastHashJoinTest.class, CountByKeyTest.class, DistinctTest.class, FilterTest.class, @@ -43,5 +47,6 @@ // TopPerKeyTest.class, - uncomment when ReduceStateByKey is supported UnionTest.class, // WindowingTest.class, + ReduceWindowTest.class }) public class OperatorsTestSuite {} diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/MapElementsTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/MapElementsTest.java index 60ff4f6a34bb5..ec87eab9afe9a 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/MapElementsTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/MapElementsTest.java @@ -24,6 +24,9 @@ import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset; import org.apache.beam.sdk.extensions.euphoria.core.client.flow.Flow; import org.apache.beam.sdk.extensions.euphoria.core.client.operator.hint.SizeHint; +import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypePropagationAssert; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; import org.junit.Test; /** Test operator MapElement. */ @@ -98,4 +101,18 @@ public void testBuild_Hints() { Dataset dataSetWithoutHint = MapElements.of(dataset).using(i -> i).output(); assertEquals(0, dataSetWithoutHint.getProducer().getHints().size()); } + + @Test + public void testTypePropagation() { + Flow flow1 = Flow.create("TEST1"); + Dataset input = Util.createMockDataset(flow1, 2); + + TypeDescriptor outputType = TypeDescriptors.strings(); + + Dataset mappedElements = + MapElements.named("Int2Str").of(input).using(String::valueOf, outputType).output(); + + MapElements map = (MapElements) flow1.operators().iterator().next(); + TypePropagationAssert.assertOperatorTypeAwareness(map, outputType); + } } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/client/type/TypePropagationAssert.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/client/type/TypePropagationAssert.java index 15ca2ef81b5ad..d810b89a91a3b 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/client/type/TypePropagationAssert.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/client/type/TypePropagationAssert.java @@ -23,7 +23,7 @@ import org.apache.beam.sdk.values.TypeDescriptor; import org.junit.Assert; -/** Buncg of methods to assert type descriptors in operators. */ +/** Bunch of methods to assert type descriptors in operators. */ public class TypePropagationAssert { public static void assertOperatorTypeAwareness( diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java new file mode 100644 index 0000000000000..7fa0960638d9d --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java @@ -0,0 +1,846 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.euphoria.core.docs; + +import static java.util.Arrays.asList; + +import com.google.common.base.Splitter; +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.stream.Stream; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset; +import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction; +import org.apache.beam.sdk.extensions.euphoria.core.client.io.Collector; +import org.apache.beam.sdk.extensions.euphoria.core.client.io.ListDataSource; +import org.apache.beam.sdk.extensions.euphoria.core.client.operator.AssignEventTime; +import org.apache.beam.sdk.extensions.euphoria.core.client.operator.CountByKey; +import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Distinct; +import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Filter; +import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FlatMap; +import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FullJoin; +import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join; +import org.apache.beam.sdk.extensions.euphoria.core.client.operator.LeftJoin; +import org.apache.beam.sdk.extensions.euphoria.core.client.operator.MapElements; +import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceByKey; +import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceWindow; +import org.apache.beam.sdk.extensions.euphoria.core.client.operator.RightJoin; +import org.apache.beam.sdk.extensions.euphoria.core.client.operator.SumByKey; +import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Union; +import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Util; +import org.apache.beam.sdk.extensions.euphoria.core.client.util.Fold; +import org.apache.beam.sdk.extensions.euphoria.core.translate.BeamFlow; +import org.apache.beam.sdk.extensions.euphoria.core.translate.EuphoriaPTransform; +import org.apache.beam.sdk.extensions.euphoria.core.translate.coder.KryoCoder; +import org.apache.beam.sdk.extensions.euphoria.core.translate.coder.RegisterCoders; +import org.apache.beam.sdk.extensions.euphoria.core.translate.common.PipelineUtils; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.joda.time.Duration; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; + +/** + * Contains all the examples from documentation page. Not all of them contains asserts, some do, but + * the rest is often here just to confirm that they compile. Once something break or changes, the + * documentation needs to change too. + */ +public class DocumentationExamplesTest { + private List textLineByLine = + Arrays.asList( + "Lorem ipsum dolor sit amet, consectetur adipiscing elit. ", + "Vestibulum volutpat pellentesque risus at sodales.", + "Interdum et malesuada fames ac ante ipsum primis in faucibus.", + "Donec sit amet arcu nec tellus sodales ultricies.", + "Quisque ipsum fermentum nisl at libero accumsan consectetur.", + "Praesent lobortis ex eget ex rhoncus, quis malesuada risus tristique.", + "Aliquam risus at orci, porttitor eu turpis et, porttitor semper ligula."); + @Rule public final TestPipeline pipeline = TestPipeline.create(); + + @Ignore("We do not want to actually write output files from this test.") + @Test + public void wordCountExample() { + PipelineOptions options = PipelineUtils.getDirectPipelineOptions(); + + Pipeline pipeline = Pipeline.create(options); + + // Transform to euphoria's flow. + BeamFlow flow = BeamFlow.of(pipeline); + + // Source of data loaded from Beam IO. + PCollection input = + pipeline + .apply(Create.of(textLineByLine)) + .setTypeDescriptor(TypeDescriptor.of(String.class)); + // Transform PCollection to euphoria's Dataset. + Dataset lines = flow.wrapped(input); + + // FlatMap processes one input element at a time and allows user code to emit + // zero, one, or more output elements. From input lines we will get data set of words. + Dataset words = + FlatMap.named("TOKENIZER") + .of(lines) + .using( + (String line, Collector context) -> { + for (String word : Splitter.onPattern("\\s+").split(line)) { + context.collect(word); + } + }) + .output(); + + // Now we can count input words - the operator ensures that all values for the same + // key (word in this case) end up being processed together. Then it counts number of appearances + // of the same key in 'words' dataset and emits it to output. + Dataset> counted = CountByKey.named("COUNT").of(words).keyBy(w -> w).output(); + + // Format output. + Dataset output = + MapElements.named("FORMAT") + .of(counted) + .using(p -> p.getKey() + ": " + p.getValue()) + .output(); + + // Transform Dataset back to PCollection. It can be done in any step of this flow. + PCollection outputCollection = flow.unwrapped(output); + + // Now we can again use Beam transformation. In this case we save words and their count + // into the text file. + outputCollection.apply(TextIO.write().to("counted_words")); + + pipeline.run(); + } + + @Test + public void inputsAndOutputsSection() { + BeamFlow flow = BeamFlow.of(pipeline); + + PCollection input = + pipeline + .apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck")) + .setTypeDescriptor(TypeDescriptor.of(String.class)); + + Dataset dataset = flow.wrapped(input); + + pipeline.run(); + } + + @Test + public void addOperatorSection() { + BeamFlow flow = BeamFlow.of(pipeline); + Dataset input = flow.createInput(ListDataSource.bounded(asList(1, 2, 4, 3))); + + Dataset mappedElements = + MapElements.named("Int2Str").of(input).using(String::valueOf).output(); + + PAssert.that(flow.unwrapped(mappedElements)).containsInAnyOrder("1", "2", "4", "3"); + + pipeline.run(); + } + + @Test + public void metricsAndAccumulatorsSection() { + Pipeline pipeline = Pipeline.create(PipelineUtils.getDirectPipelineOptions()); + + // Transform to euphoria's flow. + BeamFlow flow = BeamFlow.of(pipeline); + + Dataset dataset = Util.createMockDataset(flow, 1); + + Dataset flatMapped = + FlatMap.named("FlatMap1") + .of(dataset) + .using( + (String value, Collector context) -> { + context.getCounter("my-counter").increment(); + context.collect(value); + }) + .output(); + + Dataset mapped = + MapElements.named("MapThem") + .of(dataset) + .using( + (value, context) -> { + // use simple counter + context.getCounter("my-counter").increment(); + + return value.toLowerCase(); + }) + .output(); + } + + @Test + public void codersAndTypesSection() { + Pipeline pipeline = Pipeline.create(PipelineUtils.getDirectPipelineOptions()); + + // Transform to euphoria's flow. + BeamFlow flow = BeamFlow.of(pipeline); + + KryoCoder beamCoder = KryoCoder.withoutClassRegistration(); + KryoCoder> typeParametrizedCoder = + KryoCoder.withoutClassRegistration(); + + RegisterCoders.to(flow) + .setKryoClassRegistrar( + (kryo) -> { + kryo.register(KryoSerializedElementType.class); //other may follow + }) + .registerCoder(AnotherElementType.class, beamCoder) + .registerCoder( + new TypeDescriptor>() {}, typeParametrizedCoder) + .done(); + + Dataset input = Util.createMockDataset(flow, 1); + + MapElements.named("Int2Str") + .of(input) + .using(String::valueOf, TypeDescriptors.strings()) + .output(); + } + + @Test + public void windowingSection() { + + BeamFlow flow = BeamFlow.of(pipeline); + + PCollection inputPcoll = + pipeline.apply(Create.of(1, 2, 3, 4)).setTypeDescriptor(TypeDescriptors.integers()); + + Dataset input = flow.wrapped(inputPcoll); + + Dataset> countedElements = + CountByKey.of(input) + .keyBy(e -> e) + .windowBy(FixedWindows.of(Duration.standardSeconds(1))) + .triggeredBy(DefaultTrigger.of()) + .discardingFiredPanes() + .output(); + + pipeline.run(); + } + + private static class KryoSerializedElementType {} + + private static class AnotherElementType {} + + private static class ParametrizedTestDataType {} + + @Test + public void countByKeyOperator() { + // Transform to euphoria's flow. + BeamFlow flow = BeamFlow.of(pipeline); + + Dataset input = flow.createInput(ListDataSource.bounded(asList(1, 2, 4, 1, 1, 3))); + + // suppose input: [1, 2, 4, 1, 1, 3] + Dataset> output = CountByKey.of(input).keyBy(e -> e).output(); + // Output will contain: [KV(1, 3), KV(2, 1), KV(3, 1), (4, 1)] + + PAssert.that(flow.unwrapped(output)) + .containsInAnyOrder(asList(KV.of(1, 3L), KV.of(2, 1L), KV.of(3, 1L), KV.of(4, 1L))); + + pipeline.run(); + } + + @Test + public void distinctOperator() { + + // Transform to euphoria's flow. + BeamFlow flow = BeamFlow.of(pipeline); + + Dataset input = flow.createInput(ListDataSource.bounded(asList(1, 2, 3, 3, 2, 1))); + + // suppose input: [1, 2, 3, 3, 2, 1] + Distinct.named("unique-integers-only").of(input).output(); + // Output will contain: 1, 2, 3 + + Dataset> keyValueInput = + flow.createInput( + ListDataSource.bounded( + asList( + KV.of(1, 100L), + KV.of(3, 100_000L), + KV.of(42, 10L), + KV.of(1, 0L), + KV.of(3, 0L)))); + + // suppose input: [KV(1, 100L), KV(3, 100_000L), KV(42, 10L), KV(1, 0L), KV(3, 0L)] + Dataset uniqueKeys = + Distinct.named("unique-keys-only").of(keyValueInput).mapped(KV::getKey).output(); + // Output will contain: 1, 3, 42 + + PAssert.that(flow.unwrapped(uniqueKeys)).containsInAnyOrder(1, 3, 42); + + pipeline.run(); + } + + @Test + public void batchJoinOperator() { + + // Transform to euphoria's flow. + BeamFlow flow = BeamFlow.of(pipeline); + + Dataset left = flow.createInput(ListDataSource.bounded(asList(1, 2, 3, 0, 4, 3, 1))); + Dataset right = + flow.createInput( + ListDataSource.bounded(asList("mouse", "rat", "elephant", "cat", "X", "duck"))); + + // suppose that left contains: [1, 2, 3, 0, 4, 3, 1] + // suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"] + Dataset> joined = + Join.named("join-length-to-words") + .of(left, right) + .by(le -> le, String::length) // key extractors + .using((Integer l, String r, Collector c) -> c.collect(l + "+" + r)) + .output(); + + // joined will contain: [ KV(1, "1+X"), KV(3, "3+cat"), KV(3, "3+rat"), KV(4, "4+duck"), + // KV(3, "3+cat"), KV(3, "3+rat"), KV(1, "1+X")] + + PCollection> outputPCollection = flow.unwrapped(joined); + PAssert.that(outputPCollection) + .containsInAnyOrder( + asList( + KV.of(1, "1+X"), + KV.of(3, "3+cat"), + KV.of(3, "3+rat"), + KV.of(4, "4+duck"), + KV.of(3, "3+cat"), + KV.of(3, "3+rat"), + KV.of(1, "1+X"))); + + pipeline.run(); + } + + @Test + public void batchLeftJoinOperator() { + + // Transform to euphoria's flow. + BeamFlow flow = BeamFlow.of(pipeline); + + Dataset left = flow.createInput(ListDataSource.bounded(asList(1, 2, 3, 0, 4, 3, 1))); + Dataset right = + flow.createInput( + ListDataSource.bounded(asList("mouse", "rat", "elephant", "cat", "X", "duck"))); + + // suppose that left contains: [1, 2, 3, 0, 4, 3, 1] + // suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"] + Dataset> joined = + LeftJoin.named("left-join-length-to-words") + .of(left, right) + .by(le -> le, String::length) // key extractors + .using( + (Integer l, Optional r, Collector c) -> + c.collect(l + "+" + r.orElse(null))) + .output(); + + // joined will contain: [KV(1, "1+X"), KV(2, "2+null"), KV(3, "3+cat"), + // KV(3, "3+rat"), KV(0, "0+null"), KV(4, "4+duck"), KV(3, "3+cat"), + // KV(3, "3+rat"), KV(1, "1+X")] + + PCollection> outputPCollection = flow.unwrapped(joined); + PAssert.that(outputPCollection) + .containsInAnyOrder( + asList( + KV.of(1, "1+X"), + KV.of(2, "2+null"), + KV.of(3, "3+cat"), + KV.of(3, "3+rat"), + KV.of(0, "0+null"), + KV.of(4, "4+duck"), + KV.of(3, "3+cat"), + KV.of(3, "3+rat"), + KV.of(1, "1+X"))); + + pipeline.run(); + } + + @Test + public void batchRightJoinFullOperator() { + + // Transform to euphoria's flow. + BeamFlow flow = BeamFlow.of(pipeline); + + Dataset left = flow.createInput(ListDataSource.bounded(asList(1, 2, 3, 0, 4, 3, 1))); + Dataset right = + flow.createInput( + ListDataSource.bounded(asList("mouse", "rat", "elephant", "cat", "X", "duck"))); + + // suppose that left contains: [1, 2, 3, 0, 4, 3, 1] + // suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"] + Dataset> joined = + RightJoin.named("right-join-length-to-words") + .of(left, right) + .by(le -> le, String::length) // key extractors + .using( + (Optional l, String r, Collector c) -> + c.collect(l.orElse(null) + "+" + r)) + .output(); + + // joined will contain: [ KV(1, "1+X"), KV(3, "3+cat"), KV(3, "3+rat"), + // KV(4, "4+duck"), KV(3, "3+cat"), KV(3, "3+rat"), KV(1, "1+X"), + // KV(8, "null+elephant"), KV(5, "null+mouse")] + + PCollection> outputPCollection = flow.unwrapped(joined); + PAssert.that(outputPCollection) + .containsInAnyOrder( + asList( + KV.of(1, "1+X"), + KV.of(3, "3+cat"), + KV.of(3, "3+rat"), + KV.of(4, "4+duck"), + KV.of(3, "3+cat"), + KV.of(3, "3+rat"), + KV.of(1, "1+X"), + KV.of(8, "null+elephant"), + KV.of(5, "null+mouse"))); + + pipeline.run(); + } + + @Test + public void batchFullJoinOperator() { + + BeamFlow flow = BeamFlow.of(pipeline); + + Dataset left = flow.createInput(ListDataSource.bounded(asList(1, 2, 3, 0, 4, 3, 1))); + Dataset right = + flow.createInput( + ListDataSource.bounded(asList("mouse", "rat", "elephant", "cat", "X", "duck"))); + + // suppose that left contains: [1, 2, 3, 0, 4, 3, 1] + // suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"] + Dataset> joined = + FullJoin.named("join-length-to-words") + .of(left, right) + .by(le -> le, String::length) // key extractors + .using( + (Optional l, Optional r, Collector c) -> + c.collect(l.orElse(null) + "+" + r.orElse(null))) + .output(); + + // joined will contain: [ KV(1, "1+X"), KV(2, "2+null"), KV(3, "3+cat"), KV(3, "3+rat"), + // KV(0, "0+null"), KV(4, "4+duck"), KV(3, "3+cat"), KV(3, "3+rat"),KV(1, "1+X"), + // KV(1, "null+elephant"), KV(5, "null+mouse")]; + + PCollection> outputPCollection = flow.unwrapped(joined); + PAssert.that(outputPCollection) + .containsInAnyOrder( + asList( + KV.of(1, "1+X"), + KV.of(2, "2+null"), + KV.of(3, "3+cat"), + KV.of(3, "3+rat"), + KV.of(0, "0+null"), + KV.of(4, "4+duck"), + KV.of(3, "3+cat"), + KV.of(3, "3+rat"), + KV.of(1, "1+X"), + KV.of(8, "null+elephant"), + KV.of(5, "null+mouse"))); + + pipeline.run(); + } + + @Test + public void mapElementsOperator() { + + BeamFlow flow = BeamFlow.of(pipeline); + Dataset input = flow.createInput(ListDataSource.bounded(asList(0, 1, 2, 3, 4, 5))); + + // suppose inputs contains: [ 0, 1, 2, 3, 4, 5] + Dataset strings = MapElements.named("int2str").of(input).using(i -> "#" + i).output(); + // strings will contain: [ "#0", "#1", "#2", "#3", "#4", "#5"] + + PAssert.that(flow.unwrapped(strings)).containsInAnyOrder("#0", "#1", "#2", "#3", "#4", "#5"); + + pipeline.run(); + } + + @Test + public void flatMapOperator() { + BeamFlow flow = BeamFlow.of(pipeline); + + Dataset words = + flow.createInput(ListDataSource.bounded(asList("Brown", "fox", ".", ""))); + + // suppose words contain: ["Brown", "fox", ".", ""] + Dataset letters = + FlatMap.named("str2char") + .of(words) + .using( + (String s, Collector collector) -> { + for (int i = 0; i < s.length(); i++) { + char c = s.charAt(i); + collector.collect(String.valueOf(c)); + } + }) + .output(); + // characters will contain: ["B", "r", "o", "w", "n", "f", "o", "x", "."] + + PAssert.that(flow.unwrapped(letters)) + .containsInAnyOrder("B", "r", "o", "w", "n", "f", "o", "x", "."); + pipeline.run(); + } + + @Test + public void flatMapWithTimeExtractorOperator() { + BeamFlow flow = BeamFlow.of(pipeline); + + Dataset events = + flow.createInput( + ListDataSource.bounded( + asList( + new SomeEventObject(0), + new SomeEventObject(1), + new SomeEventObject(2), + new SomeEventObject(3), + new SomeEventObject(4)))); + + // suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods returns time-stamp + Dataset timeStampedEvents = + FlatMap.named("extract-event-time") + .of(events) + .using((SomeEventObject e, Collector c) -> c.collect(e)) + .eventTimeBy(SomeEventObject::getEventTimeInMillis) + .output(); + //Euphoria will now know event time for each event + + // PAssert.that(flow.unwrapped(timeStampedEvents)) + // .inWindow(new IntervalWindow(new Instant(0), new Instant(5))); + + pipeline.run(); + } + + @Test + public void filterOperator() { + BeamFlow flow = BeamFlow.of(pipeline); + + Dataset nums = + flow.createInput(ListDataSource.bounded(asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))); + + // suppose nums contains: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + Dataset divisibleBythree = + Filter.named("divisibleByFive").of(nums).by(e -> e % 3 == 0).output(); + //divisibleBythree will contain: [ 0, 3, 6, 9] + + PAssert.that(flow.unwrapped(divisibleBythree)).containsInAnyOrder(0, 3, 6, 9); + pipeline.run(); + } + + @Test + public void reduceByKeyTestOperator1() { + BeamFlow flow = BeamFlow.of(pipeline); + + Dataset animals = + flow.createInput( + ListDataSource.bounded(asList("mouse", "rat", "elephant", "cat", "X", "duck"))); + + //suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"] + Dataset> countOfAnimalNamesByLength = + ReduceByKey.named("to-letters-couts") + .of(animals) + .keyBy(String::length) // length of animal name will be used as groupping key + // we need to count each animal name once, so why not to optimize each string to 1 + .valueBy(e -> 1) + .reduceBy(Stream::count) + .output(); + // countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ] + + PAssert.that(flow.unwrapped(countOfAnimalNamesByLength)) + .containsInAnyOrder( + asList(KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L))); + + pipeline.run(); + } + + @Test + public void reduceByKeyTestOperatorCombinable() { + BeamFlow flow = BeamFlow.of(pipeline); + + Dataset animals = + flow.createInput( + ListDataSource.bounded(asList("mouse", "rat", "elephant", "cat", "X", "duck"))); + + //suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"] + Dataset> countOfAnimalNamesByLenght = + ReduceByKey.named("to-letters-couts") + .of(animals) + .keyBy(String::length) // length of animal name will be used as grouping key + // we need to count each animal name once, so why not to optimize each string to 1 + .valueBy(e -> 1L) + .combineBy(s -> s.mapToLong(l -> l).sum()) + .output(); + // countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ] + + PAssert.that(flow.unwrapped(countOfAnimalNamesByLenght)) + .containsInAnyOrder( + asList(KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L))); + + pipeline.run(); + } + + @Test + public void reduceByKeyTestOperatorContext() { + BeamFlow flow = BeamFlow.of(pipeline); + + Dataset animals = + flow.createInput( + ListDataSource.bounded(asList("mouse", "rat", "elephant", "cat", "X", "duck"))); + + //suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"] + Dataset> countOfAnimalNamesByLenght = + ReduceByKey.named("to-letters-couts") + .of(animals) + .keyBy(String::length) // length of animal name will e used as grouping key + // we need to count each animal name once, so why not to optimize each string to 1 + .valueBy(e -> 1) + .reduceBy( + (Stream s, Collector collector) -> { + collector.collect(s.count()); + collector.asContext().getCounter("num-of-keys").increment(); + }) + .output(); + // countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ] + + PAssert.that(flow.unwrapped(countOfAnimalNamesByLenght)) + .containsInAnyOrder( + asList(KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L))); + + pipeline.run(); + } + + /** + * Note that this one is not mentioned in documentation due to high number of RBK examples + * and rather lower explanation value. Please consider to include it in future + */ + @Test + public void reduceByKeyTestOperatorContextManyOutputs() { + BeamFlow flow = BeamFlow.of(pipeline); + + Dataset animals = + flow.createInput( + ListDataSource.bounded(asList("mouse", "rat", "elephant", "cat", "X", "duck"))); + + Dataset> countOfAnimalNamesByLenght = + ReduceByKey.named("to-letters-couts") + .of(animals) + .keyBy(String::length) // length of animal name will e used as grouping key + // we need to count each animal name once, so why not to optimize each string to 1 + .valueBy(e -> 1) + .reduceBy( + (Stream s, Collector collector) -> { + long count = s.count(); + collector.collect(count); + collector.collect(2L * count); + }) + .output(); + + PAssert.that(flow.unwrapped(countOfAnimalNamesByLenght)) + .containsInAnyOrder( + asList( + KV.of(1, 1L), + KV.of(3, 2L), + KV.of(4, 1L), + KV.of(5, 1L), + KV.of(8, 1L), + KV.of(1, 2L), + KV.of(3, 4L), + KV.of(4, 2L), + KV.of(5, 2L), + KV.of(8, 2L))); + + pipeline.run(); + } + + @Test + public void reduceByKeyTestOperatorFold() { + BeamFlow flow = BeamFlow.of(pipeline); + + Dataset animals = + flow.createInput( + ListDataSource.bounded(asList("mouse", "rat", "elephant", "cat", "X", "duck"))); + + //suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"] + Dataset> countOfAnimalNamesByLenght = + ReduceByKey.named("to-letters-couts") + .of(animals) + .keyBy(String::length) // length of animal name will be used as grouping key + // we need to count each animal name once, so why not to optimize each string to 1 + .valueBy(e -> 1L) + .combineBy(Fold.of((l1, l2) -> l1 + l2)) + .output(); + // countOfAnimalNamesByLength will contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ] + + PAssert.that(flow.unwrapped(countOfAnimalNamesByLenght)) + .containsInAnyOrder( + asList(KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L))); + + pipeline.run(); + } + + @Test + public void testSumByKeyOperator() { + BeamFlow flow = BeamFlow.of(pipeline); + Dataset input = + flow.createInput(ListDataSource.bounded(asList(1, 2, 3, 4, 5, 6, 7, 8, 9))); + + //suppose input contains: [ 1, 2, 3, 4, 5, 6, 7, 8, 9 ] + Dataset> output = + SumByKey.named("sum-odd-and-even") + .of(input) + .keyBy(e -> e % 2) + .valueBy(e -> (long) e) + .output(); + // output will contain: [ KV.of(0, 20L), KV.of(1, 25L)] + + PAssert.that(flow.unwrapped(output)).containsInAnyOrder(asList(KV.of(0, 20L), KV.of(1, 25L))); + pipeline.run(); + } + + @Test + public void testUnionOperator() { + BeamFlow flow = BeamFlow.of(pipeline); + + Dataset cats = + flow.createInput(ListDataSource.bounded(asList("cheetah", "cat", "lynx", "jaguar"))); + + Dataset rodents = + flow.createInput( + ListDataSource.bounded(asList("squirrel", "mouse", "rat", "lemming", "beaver"))); + + //suppose cats contains: [ "cheetah", "cat", "lynx", "jaguar" ] + //suppose rodents conains: [ "squirrel", "mouse", "rat", "lemming", "beaver" ] + Dataset animals = Union.named("to-animals").of(cats, rodents).output(); + + // animal will contain: "cheetah", "cat", "lynx", "jaguar", "squirrel", "mouse", "rat", "lemming", "beaver" + PAssert.that(flow.unwrapped(animals)) + .containsInAnyOrder( + "cheetah", "cat", "lynx", "jaguar", "squirrel", "mouse", "rat", "lemming", "beaver"); + + pipeline.run(); + } + + @Test + public void testAssignEventTimeOperator() { + BeamFlow flow = BeamFlow.of(pipeline); + + Dataset events = + flow.createInput( + ListDataSource.bounded( + asList( + new SomeEventObject(0), + new SomeEventObject(1), + new SomeEventObject(2), + new SomeEventObject(3), + new SomeEventObject(4)))); + + // suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods returns time-stamp + Dataset timeStampedEvents = + AssignEventTime.named("extract-event-time") + .of(events) + .using(SomeEventObject::getEventTimeInMillis) + .output(); + //Euphoria will now know event time for each event + + // PAssert.that(flow.unwrapped(timeStampedEvents)) + // .inWindow(new IntervalWindow(new Instant(0), new Instant(5))); + + pipeline.run(); + } + + private static class SomeEventObject implements Serializable { + + private long timestamp; + + SomeEventObject(long timestamp) { + this.timestamp = timestamp; + } + + long getEventTimeInMillis() { + return timestamp; + } + } + + @Test + public void testEuphoriaPTransformExample() { + + PCollection inputs = + pipeline.apply( + "Create", Create.of("a", "b", "c", "A", "a", "C", "x").withCoder(StringUtf8Coder.of())); + + //suppose inputs PCollection contains: [ "a", "b", "c", "A", "a", "C", "x"] + PCollection> lettersWithCounts = + inputs.apply( + "count-uppercase-letters-in-Euphoria", + EuphoriaPTransform.of( + (Dataset input) -> { + Dataset upperCase = + MapElements.of(input) + .using((UnaryFunction) String::toUpperCase) + .output(); + + return CountByKey.of(upperCase).keyBy(e -> e).output(); + })); + //now the 'lettersWithCounts' will contain [ KV("A", 3L), KV("B", 1L), KV("C", 2L), KV("X", 1L) ] + + PAssert.that(lettersWithCounts) + .containsInAnyOrder(asList(KV.of("A", 3L), KV.of("B", 1L), KV.of("C", 2L), KV.of("X", 1L))); + + pipeline.run(); + } + + @Test + public void testReduceWithWindowOperator() { + + BeamFlow flow = BeamFlow.of(pipeline); + + Dataset input = + flow.createInput(ListDataSource.bounded(asList(1, 2, 3, 4, 5, 6, 7, 8))); + + //suppose input contains [ 1, 2, 3, 4, 5, 6, 7, 8 ] + //lets assign time-stamp to each input element + Dataset withEventTime = AssignEventTime.of(input).using(i -> 1000L * i).output(); + + Dataset output = + ReduceWindow.of(withEventTime) + .combineBy(Fold.of((i1, i2) -> i1 + i2)) + .windowBy(FixedWindows.of(Duration.millis(5000))) + .triggeredBy(DefaultTrigger.of()) + .discardingFiredPanes() + .output(); + //output will contain: [ 10, 26 ] + + PAssert.that(flow.unwrapped(output)).containsInAnyOrder(10, 26); + + pipeline.run(); + } +} diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/JoinTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/JoinTest.java index 43f727aceec86..844305d74c458 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/JoinTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/JoinTest.java @@ -102,6 +102,51 @@ public List> getUnorderedOutput() { }); } + @Processing(Processing.Type.BOUNDED) + @Test + public void batchJoinFullOuterExample() { + execute( + new JoinTestCase>() { + + @Override + protected Dataset> getOutput( + Dataset left, Dataset right) { + + return FullJoin.of(left, right) + .by(le -> le, String::length) + .using( + (Optional l, Optional r, Collector c) -> + c.collect(l.orElse(null) + "+" + r.orElse(null))) + .output(); + } + + @Override + protected List getLeftInput() { + return Arrays.asList(1, 2, 3, 0, 4, 3, 1); + } + + @Override + protected List getRightInput() { + return Arrays.asList("mouse", "rat", "cat", "X", "duck"); + } + + @Override + public List> getUnorderedOutput() { + return Arrays.asList( + KV.of(1, "1+X"), + KV.of(2, "2+null"), + KV.of(3, "3+cat"), + KV.of(3, "3+rat"), + KV.of(0, "0+null"), + KV.of(4, "4+duck"), + KV.of(3, "3+cat"), + KV.of(3, "3+rat"), + KV.of(1, "1+X"), + KV.of(5, "null+mouse")); + } + }); + } + @Processing(Processing.Type.BOUNDED) @Test public void batchJoinFullOuter_outputValues() { diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/ReduceWindowTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/ReduceWindowTest.java index 6aae808507b1d..c08890555d532 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/ReduceWindowTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/ReduceWindowTest.java @@ -17,22 +17,32 @@ */ package org.apache.beam.sdk.extensions.euphoria.core.testkit; +import static java.util.Arrays.asList; + import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset; +import org.apache.beam.sdk.extensions.euphoria.core.client.io.ListDataSource; import org.apache.beam.sdk.extensions.euphoria.core.client.operator.AssignEventTime; import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceWindow; import org.apache.beam.sdk.extensions.euphoria.core.client.util.Sums; import org.apache.beam.sdk.extensions.euphoria.core.testkit.junit.AbstractOperatorTest; import org.apache.beam.sdk.extensions.euphoria.core.testkit.junit.Processing; +import org.apache.beam.sdk.extensions.euphoria.core.translate.BeamFlow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.joda.time.Duration; +import org.junit.Rule; import org.junit.Test; /** Test operator {@code ReduceByKey}. */ @Processing(Processing.Type.ALL) public class ReduceWindowTest extends AbstractOperatorTest { + @Rule public final TestPipeline pipeline = TestPipeline.create(); + @Test public void testReduceWithWindowing() { execute( @@ -93,4 +103,58 @@ public List getUnorderedOutput() { } }); } + + @Test + public void testReduceWithAttachedWindowingMoreWindows() { + execute( + new AbstractTestCase() { + @Override + protected Dataset getOutput(Dataset input) { + Dataset withEventTime = + AssignEventTime.of(input).using(i -> 1000L * i).output(); + + Dataset first = + ReduceWindow.of(withEventTime) + .combineBy(Sums.ofInts()) + .windowBy(FixedWindows.of(org.joda.time.Duration.standardSeconds(5))) + .triggeredBy(DefaultTrigger.of()) + .discardingFiredPanes() + .output(); + + return ReduceWindow.of(first).combineBy(Sums.ofInts()).output(); + } + + @Override + protected List getInput() { + return Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 100); + } + + @Override + public List getUnorderedOutput() { + return Arrays.asList(10, 35, 10, 100); + } + }); + } + + @Test + public void testReduceWithWindowingMoreWindowsTestPipeline() { + + BeamFlow flow = BeamFlow.of(pipeline); + Dataset input = + flow.createInput(ListDataSource.bounded(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 100))); + + Dataset withEventTime = AssignEventTime.of(input).using(i -> 1000L * i).output(); + + Dataset output = + ReduceWindow.of(withEventTime) + .combineBy(Sums.ofInts()) + .windowBy(FixedWindows.of(Duration.standardSeconds(5))) + .triggeredBy(DefaultTrigger.of()) + .discardingFiredPanes() + .output(); + + PAssert.that(flow.unwrapped(output)).containsInAnyOrder(10, 35, 10, 100); + + pipeline.run(); + } } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/junit/TestSuiteRunner.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/junit/TestSuiteRunner.java index 9cf6267214b6f..548745d70ad8d 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/junit/TestSuiteRunner.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/junit/TestSuiteRunner.java @@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory; /** TODO: add javadoc. */ -public class TestSuiteRunner extends Suite { //TODO rename +public class TestSuiteRunner extends Suite { private static final Logger LOG = LoggerFactory.getLogger(TestSuiteRunner.class); @@ -56,10 +56,10 @@ public TestSuiteRunner(Class klass) throws Throwable { // ~ for each encountered test method set up a special runner Optional kPType = getProcessingType(klass); - BeamRunnerWrapper runner = - BeamRunnerWrapper.ofDirect().withAllowedLateness(Duration.ofHours(1)); Class[] testClasses = getAnnotatedClasses(klass); + for (Class testClass : testClasses) { + LOG.info(String.format("Testkit will schedule tests in '%s'.", testClass)); boolean isOperatorTest = isAbstractOperatorTest(testClass); TestClass tc = new TestClass(testClass); @@ -68,24 +68,27 @@ public TestSuiteRunner(Class klass) throws Throwable { Optional cPType = getProcessingType(testClass); for (FrameworkMethod method : methods) { - if (isOperatorTest) { - Optional mPType = getProcessingType(method.getMethod()); - checkArgument( - cPType.isPresent() || mPType.isPresent(), - "Processing annotation is missing either on method or class!"); - Optional definedPType = merged(cPType, mPType); - checkArgument(definedPType.isPresent(), "Conflicting processings!"); - - Optional rPType = merged(kPType, definedPType); - if (rPType.isPresent()) { - for (Processing.Type ptype : rPType.get().asList()) { - addRunner(runners, testClass, method, runner, ptype, paramsList); - } - } else { - addRunner(runners, testClass, method, runner, null, paramsList); + LOG.info(String.format("Test method found '%s' it will be scheduled to run.", method)); + + if (!isOperatorTest) { + addRunner(runners, testClass, method, null, paramsList); + continue; + } + + Optional mPType = getProcessingType(method.getMethod()); + checkArgument( + cPType.isPresent() || mPType.isPresent(), + "Processing annotation is missing either on method or class!"); + Optional definedPType = merged(cPType, mPType); + checkArgument(definedPType.isPresent(), "Conflicting processings!"); + + Optional rPType = merged(kPType, definedPType); + if (rPType.isPresent()) { + for (Processing.Type ptype : rPType.get().asList()) { + addRunner(runners, testClass, method, ptype, paramsList); } } else { - addRunner(runners, testClass, method, runner, null, paramsList); + addRunner(runners, testClass, method, null, paramsList); } } } @@ -95,15 +98,15 @@ private static void addRunner( List acc, Class testClass, FrameworkMethod method, - BeamRunnerWrapper runner, Processing.Type pType, List paramsList) throws Throwable { + if (paramsList == null || paramsList.isEmpty()) { - acc.add(new ExecutorProviderTestMethodRunner(testClass, method, runner, pType, null)); + acc.add(new ExecutorProviderTestMethodRunner(testClass, method, pType, null)); } else { for (Object[] params : paramsList) { - acc.add(new ExecutorProviderTestMethodRunner(testClass, method, runner, pType, params)); + acc.add(new ExecutorProviderTestMethodRunner(testClass, method, pType, params)); } } } @@ -174,11 +177,7 @@ static class ExecutorProviderTestMethodRunner extends BlockJUnit4ClassRunner { private final Object[] parameterList; ExecutorProviderTestMethodRunner( - Class testClass, - FrameworkMethod method, - BeamRunnerWrapper runner, - Processing.Type ptype, - Object[] parameterList) + Class testClass, FrameworkMethod method, Processing.Type ptype, Object[] parameterList) throws InitializationError { super(testClass); this.procType = ptype; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamFlowTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamFlowTest.java index febdb830583f9..e55e7ed6553f5 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamFlowTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamFlowTest.java @@ -44,7 +44,6 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; -import org.junit.Ignore; import org.junit.Test; /** Test for {@link BeamFlow}. */ @@ -143,8 +142,8 @@ public void testPipelineToAndFromBeam() { pipeline.run(); } + @Test @SuppressWarnings("unchecked") - @Ignore public void testPipelineWithRBK() { String raw = "hi there hi hi sue bob hi sue ZOW bob"; List words = Arrays.asList(raw.split(" ")); @@ -165,7 +164,7 @@ public void testPipelineWithRBK() { pipeline.run(); } - @Ignore + @Test public void testPipelineWithEventTime() { List> raw = Arrays.asList( diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamPTransformTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/EuphoriaPTransformTest.java similarity index 93% rename from sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamPTransformTest.java rename to sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/EuphoriaPTransformTest.java index 435a5c07335d5..afc58b12605a3 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamPTransformTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/EuphoriaPTransformTest.java @@ -33,8 +33,8 @@ import org.junit.Rule; import org.junit.Test; -/** A group of test focused at {@link BeamPTransform}. */ -public class BeamPTransformTest implements Serializable { +/** A group of test focused at {@link EuphoriaPTransform}. */ +public class EuphoriaPTransformTest implements Serializable { private static final String BASE_STRING = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Curabitur et imperdiet nulla," @@ -53,7 +53,7 @@ public void basicBeamTransformTest() { .apply("Create", Create.of(words).withCoder(StringUtf8Coder.of())) .apply( "To-UpperCase", - BeamPTransform.of( + EuphoriaPTransform.of( input -> MapElements.of(input).using(s -> s.toUpperCase()).output())); PAssert.that(pCollection).containsInAnyOrder(upperCaseWords); @@ -72,7 +72,7 @@ public void testChainedOperations() { .apply("Create", Create.of(words).withCoder(StringUtf8Coder.of())) .apply( "To-UpperCase", - BeamPTransform.of( + EuphoriaPTransform.of( (Dataset input) -> { Dataset upperCase = MapElements.of(input).using(s -> s.toUpperCase()).output(); @@ -97,7 +97,7 @@ public void testBeamTransformWhenFlowIsExecuted() { .apply("Create", Create.of(words).withCoder(StringUtf8Coder.of())) .apply( "To-UpperCase", - BeamPTransform.of( + EuphoriaPTransform.of( input -> MapElements.of(input).using(s -> s.toUpperCase()).output())); PAssert.that(pCollection).containsInAnyOrder(upperCaseWords); @@ -122,7 +122,7 @@ public void testFlowToPCollection() { PCollection twicePCollection = unwrapped.apply( - "Twice", BeamPTransform.of(in -> MapElements.of(in).using(i -> 2 * i).output())); + "Twice", EuphoriaPTransform.of(in -> MapElements.of(in).using(i -> 2 * i).output())); PAssert.that(twicePCollection).containsInAnyOrder(4, 6, 8, 10, 12, 14); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/coder/ClassAwareKryoCoderTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/coder/ClassAwareKryoCoderTest.java deleted file mode 100644 index a56831b538919..0000000000000 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/coder/ClassAwareKryoCoderTest.java +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.euphoria.core.translate.coder; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.values.KV; -import org.junit.Assert; -import org.junit.Test; - -/** Test targeted at {@link ClassAwareKryoCoder}. */ -public class ClassAwareKryoCoderTest { - - @Test - public void testCoding() throws IOException { - ClassAwareKryoCoder coder = new ClassAwareKryoCoder<>(ClassToBeEncoded.class); - assertEncoding(coder); - } - - @Test - public void testCodingOfTwoClassesInSerial() throws IOException { - ClassAwareKryoCoder coder = new ClassAwareKryoCoder<>(ClassToBeEncoded.class); - ClassAwareKryoCoder secondCoder = new ClassAwareKryoCoder<>(TestClass.class); - - ClassToBeEncoded originalValue = new ClassToBeEncoded("XyZ", 42, Double.NaN); - TestClass secondOriginalValue = new TestClass("just a parameter"); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - - coder.encode(originalValue, outputStream); - secondCoder.encode(secondOriginalValue, outputStream); - - byte[] buf = outputStream.toByteArray(); - ByteArrayInputStream inputStream = new ByteArrayInputStream(buf); - - ClassToBeEncoded decodedValue = coder.decode(inputStream); - TestClass secondDecodedValue = secondCoder.decode(inputStream); - - Assert.assertNotNull(decodedValue); - Assert.assertEquals(originalValue, decodedValue); - - Assert.assertNotNull(secondDecodedValue); - Assert.assertNotNull(secondDecodedValue.param); - Assert.assertEquals("just a parameter", secondDecodedValue.param); - } - - /** Test whenever the {@link ClassAwareKryoCoder} is serializable. */ - @Test - public void testCoderSerialization() throws IOException, ClassNotFoundException { - ClassAwareKryoCoder coder = new ClassAwareKryoCoder<>(ClassToBeEncoded.class); - ByteArrayOutputStream outStr = new ByteArrayOutputStream(); - ObjectOutputStream oss = new ObjectOutputStream(outStr); - - oss.writeObject(coder); - oss.flush(); - oss.close(); - - ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(outStr.toByteArray())); - ClassAwareKryoCoder coderDeserialized = - (ClassAwareKryoCoder) ois.readObject(); - - assertEncoding(coderDeserialized); - } - - @Test - public void testCodingWithKvCoderKeyIsClassAware() throws IOException { - - final ListCoder listCoder = ListCoder.of(VoidCoder.of()); - final KvCoder> kvCoder = - KvCoder.of(ClassAwareKryoCoder.of(TestClass.class), listCoder); - - List inputValue = new ArrayList<>(); - inputValue.add(null); - inputValue.add(null); - - final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - - TestClass inputKey = new TestClass("something"); - kvCoder.encode(KV.of(inputKey, inputValue), byteArrayOutputStream); - - final KV> decoded = - kvCoder.decode(new ByteArrayInputStream(byteArrayOutputStream.toByteArray())); - - Assert.assertNotNull(decoded); - Assert.assertNotNull(decoded.getKey()); - Assert.assertEquals(inputKey, decoded.getKey()); - - Assert.assertNotNull(decoded.getValue()); - Assert.assertEquals(inputValue, decoded.getValue()); - } - - @Test - public void testCodingWithKvCoderValueIsClassAware() throws IOException { - - final KvCoder kvCoder = - KvCoder.of(StringUtf8Coder.of(), ClassAwareKryoCoder.of(TestClass.class)); - - final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - - String inputKey = "key"; - TestClass inputValue = new TestClass("something"); - kvCoder.encode(KV.of(inputKey, inputValue), byteArrayOutputStream); - - final KV decoded = - kvCoder.decode(new ByteArrayInputStream(byteArrayOutputStream.toByteArray())); - - Assert.assertNotNull(decoded); - Assert.assertNotNull(decoded.getKey()); - Assert.assertEquals(inputKey, decoded.getKey()); - - Assert.assertNotNull(decoded.getValue()); - Assert.assertEquals(inputValue, decoded.getValue()); - } - - @Test - public void testCodingWithKvCoderClassToBeEncoded() throws IOException { - - final ListCoder listCoder = ListCoder.of(VoidCoder.of()); - final KvCoder> kvCoder = - KvCoder.of(ClassAwareKryoCoder.of(ClassToBeEncoded.class), listCoder); - List inputValue = new ArrayList<>(); - inputValue.add(null); - inputValue.add(null); - - final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - - ClassToBeEncoded inputKey = new ClassToBeEncoded("something", 1, 0.2); - kvCoder.encode(KV.of(inputKey, inputValue), byteArrayOutputStream); - - final KV> decoded = - kvCoder.decode(new ByteArrayInputStream(byteArrayOutputStream.toByteArray())); - - Assert.assertNotNull(decoded); - Assert.assertNotNull(decoded.getKey()); - Assert.assertEquals(inputKey, decoded.getKey()); - - Assert.assertNotNull(decoded.getValue()); - Assert.assertEquals(inputValue, decoded.getValue()); - } - - private void assertEncoding(ClassAwareKryoCoder coder) throws IOException { - ClassToBeEncoded originalValue = new ClassToBeEncoded("XyZ", 42, Double.NaN); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - - coder.encode(originalValue, outputStream); - - byte[] buf = outputStream.toByteArray(); - ByteArrayInputStream inputStream = new ByteArrayInputStream(buf); - - ClassToBeEncoded decodedValue = coder.decode(inputStream); - - Assert.assertNotNull(decodedValue); - Assert.assertEquals(originalValue, decodedValue); - } - - private static class ClassToBeEncoded { - - private String firstField; - private Integer secondField; - private Double thirdField; - - public ClassToBeEncoded(String firstField, Integer secondField, Double thirdField) { - this.firstField = firstField; - this.secondField = secondField; - this.thirdField = thirdField; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - ClassToBeEncoded that = (ClassToBeEncoded) o; - return Objects.equals(firstField, that.firstField) - && Objects.equals(secondField, that.secondField) - && Objects.equals(thirdField, that.thirdField); - } - - @Override - public int hashCode() { - - return Objects.hash(firstField, secondField, thirdField); - } - } - - static class TestClass { - - String param; - - public TestClass(String param) { - this.param = param; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - TestClass testClass = (TestClass) o; - return Objects.equals(param, testClass.param); - } - - @Override - public int hashCode() { - - return Objects.hash(param); - } - } -} diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/coder/KryoCoderTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/coder/KryoCoderTest.java index 84aee3740a987..61cf87e21c573 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/coder/KryoCoderTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/coder/KryoCoderTest.java @@ -34,14 +34,15 @@ import org.junit.Assert; import org.junit.Test; -/** Test targeted at {@link KryoCoder}. */ +/** + * Test targeted at {@link KryoCoder}. + */ public class KryoCoderTest { @Test public void testBasicCoding() throws IOException { - IdentifiedRegistrar registrar = - IdentifiedRegistrar.of((k) -> k.register(ClassToBeEncoded.class)); + KryoRegistrar registrar = (k) -> k.register(ClassToBeEncoded.class); KryoCoder coder = KryoCoder.of(registrar); assertEncoding(coder); @@ -50,11 +51,7 @@ public void testBasicCoding() throws IOException { @Test(expected = CoderException.class) public void testWrongRegistrarCoding() throws IOException { - IdentifiedRegistrar registrar = - IdentifiedRegistrar.of( - (k) -> { - /* No-op */ - }); + KryoRegistrar registrar = (k) -> { /* No-op */}; KryoCoder coder = KryoCoder.of(registrar); assertEncoding(coder); @@ -63,13 +60,8 @@ public void testWrongRegistrarCoding() throws IOException { @Test(expected = CoderException.class) public void testWrongRegistrarDecoding() throws IOException { - IdentifiedRegistrar registrarCoding = - IdentifiedRegistrar.of((k) -> k.register(ClassToBeEncoded.class)); - IdentifiedRegistrar registrarDecoding = - IdentifiedRegistrar.of( - (k) -> { - /* No-op */ - }); + KryoRegistrar registrarCoding = (k) -> k.register(ClassToBeEncoded.class); + KryoRegistrar registrarDecoding = (k) -> { /* No-op */}; KryoCoder coderToEncode = KryoCoder.of(registrarCoding); KryoCoder coderToDecode = KryoCoder.of(registrarDecoding); @@ -79,12 +71,10 @@ public void testWrongRegistrarDecoding() throws IOException { @Test public void testCodingOfTwoClassesInSerial() throws IOException { - IdentifiedRegistrar registrar = - IdentifiedRegistrar.of( - (k) -> { - k.register(ClassToBeEncoded.class); - k.register(TestClass.class); - }); + KryoRegistrar registrar = (k) -> { + k.register(ClassToBeEncoded.class); + k.register(TestClass.class); + }; KryoCoder coder = KryoCoder.of(registrar); KryoCoder secondCoder = KryoCoder.of(registrar); @@ -110,11 +100,12 @@ public void testCodingOfTwoClassesInSerial() throws IOException { Assert.assertEquals("just a parameter", secondDecodedValue.param); } - /** Test whenever the {@link KryoCoder} is serializable. */ + /** + * Test whenever the {@link KryoCoder} is serializable. + */ @Test public void testCoderSerialization() throws IOException, ClassNotFoundException { - IdentifiedRegistrar registrar = - IdentifiedRegistrar.of((k) -> k.register(ClassToBeEncoded.class)); + KryoRegistrar registrar = (k) -> k.register(ClassToBeEncoded.class); KryoCoder coder = KryoCoder.of(registrar); ByteArrayOutputStream outStr = new ByteArrayOutputStream(); @@ -130,12 +121,14 @@ public void testCoderSerialization() throws IOException, ClassNotFoundException assertEncoding(coder, coderDeserialized); } + @Test public void testCodingWithKvCoderKeyIsKryoCoder() throws IOException { - IdentifiedRegistrar registrar = IdentifiedRegistrar.of((k) -> k.register(TestClass.class)); + KryoRegistrar registrar = (k) -> k.register(TestClass.class); final ListCoder listCoder = ListCoder.of(VoidCoder.of()); - final KvCoder> kvCoder = KvCoder.of(KryoCoder.of(registrar), listCoder); + final KvCoder> kvCoder = KvCoder + .of(KryoCoder.of(registrar), listCoder); List inputValue = new ArrayList<>(); inputValue.add(null); @@ -146,8 +139,9 @@ public void testCodingWithKvCoderKeyIsKryoCoder() throws IOException { TestClass inputKey = new TestClass("something"); kvCoder.encode(KV.of(inputKey, inputValue), byteArrayOutputStream); - final KV> decoded = - kvCoder.decode(new ByteArrayInputStream(byteArrayOutputStream.toByteArray())); + final KV> decoded = kvCoder + .decode(new ByteArrayInputStream(byteArrayOutputStream + .toByteArray())); Assert.assertNotNull(decoded); Assert.assertNotNull(decoded.getKey()); @@ -155,14 +149,15 @@ public void testCodingWithKvCoderKeyIsKryoCoder() throws IOException { Assert.assertNotNull(decoded.getValue()); Assert.assertEquals(inputValue, decoded.getValue()); + } @Test public void testCodingWithKvCoderValueIsKryoCoder() throws IOException { - IdentifiedRegistrar registrar = IdentifiedRegistrar.of((k) -> k.register(TestClass.class)); + KryoRegistrar registrar = (k) -> k.register(TestClass.class); - final KvCoder kvCoder = - KvCoder.of(StringUtf8Coder.of(), KryoCoder.of(registrar)); + final KvCoder kvCoder = KvCoder + .of(StringUtf8Coder.of(), KryoCoder.of(registrar)); final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); @@ -170,8 +165,9 @@ public void testCodingWithKvCoderValueIsKryoCoder() throws IOException { TestClass inputValue = new TestClass("something"); kvCoder.encode(KV.of(inputKey, inputValue), byteArrayOutputStream); - final KV decoded = - kvCoder.decode(new ByteArrayInputStream(byteArrayOutputStream.toByteArray())); + final KV decoded = kvCoder + .decode(new ByteArrayInputStream(byteArrayOutputStream + .toByteArray())); Assert.assertNotNull(decoded); Assert.assertNotNull(decoded.getKey()); @@ -183,12 +179,10 @@ public void testCodingWithKvCoderValueIsKryoCoder() throws IOException { @Test public void testCodingWithKvCoderClassToBeEncoded() throws IOException { - IdentifiedRegistrar registrar = - IdentifiedRegistrar.of( - (k) -> { - k.register(TestClass.class); - k.register(ClassToBeEncoded.class); - }); + KryoRegistrar registrar = (k) -> { + k.register(TestClass.class); + k.register(ClassToBeEncoded.class); + }; final ListCoder listCoder = ListCoder.of(VoidCoder.of()); final KvCoder> kvCoder = @@ -202,8 +196,9 @@ public void testCodingWithKvCoderClassToBeEncoded() throws IOException { ClassToBeEncoded inputKey = new ClassToBeEncoded("something", 1, 0.2); kvCoder.encode(KV.of(inputKey, inputValue), byteArrayOutputStream); - final KV> decoded = - kvCoder.decode(new ByteArrayInputStream(byteArrayOutputStream.toByteArray())); + final KV> decoded = kvCoder + .decode(new ByteArrayInputStream(byteArrayOutputStream + .toByteArray())); Assert.assertNotNull(decoded); Assert.assertNotNull(decoded.getKey()); @@ -217,9 +212,8 @@ private void assertEncoding(KryoCoder coder) throws IOExceptio assertEncoding(coder, coder); } - private void assertEncoding( - KryoCoder coderToEncode, KryoCoder coderToDecode) - throws IOException { + private void assertEncoding(KryoCoder coderToEncode, + KryoCoder coderToDecode) throws IOException { ClassToBeEncoded originalValue = new ClassToBeEncoded("XyZ", 42, Double.NaN); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/coder/RegisterCodersTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/coder/RegisterCodersTest.java index 36f43f135e6b3..29d75e29e40f2 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/coder/RegisterCodersTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/coder/RegisterCodersTest.java @@ -50,8 +50,8 @@ public void testCodersRegistration() throws CannotProvideCoderException { RegisterCoders.to(flow) .setKryoClassRegistrar( - (k) -> { - k.register(KryoSerializedTestType.class); + (kryo) -> { + kryo.register(KryoSerializedTestType.class); }) .registerCoder(FirstTestDataType.class, firstCoder) .registerCoder(new TypeDescriptor>() {}, parametrizedCoder)