diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java index 31d9db17dafea..112ba17d14429 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java @@ -22,7 +22,6 @@ import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.common.base.MoreObjects; -import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.collect.ImmutableList; import org.joda.time.Instant; @@ -110,12 +109,12 @@ public Instant getSynchronizedProcessingOutputWatermark() { @Override public String toString() { - ToStringHelper toStringHelper = - MoreObjects.toStringHelper(this).add("pcollection", pcollection); - if (keyed) { - toStringHelper = toStringHelper.add("key", key); - } - return toStringHelper.add("elements", committedElements).toString(); + return MoreObjects.toStringHelper(this) + .omitNullValues() + .add("pcollection", pcollection) + .add("key", key) + .add("elements", committedElements) + .toString(); } }; } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java index 5d49c17d83adf..6cd8640d72c85 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java @@ -26,10 +26,8 @@ import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger; -import com.google.cloud.dataflow.sdk.util.ConcurrentPartitionedPriorityQueue; import com.google.cloud.dataflow.sdk.util.ExecutionContext; import com.google.cloud.dataflow.sdk.util.SideInputReader; import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData; @@ -40,20 +38,14 @@ import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PCollectionView; import com.google.cloud.dataflow.sdk.values.PValue; -import com.google.common.collect.ComparisonChain; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import com.google.common.collect.Ordering; - -import org.joda.time.Instant; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import javax.annotation.Nullable; @@ -85,10 +77,10 @@ class InProcessEvaluationContext { /** The current processing time and event time watermarks and timers. */ private final InMemoryWatermarkManager watermarkManager; - private final ConcurrentPartitionedPriorityQueue, WatermarkCallback> - pendingCallbacks; + /** Executes callbacks based on the progression of the watermark. */ + private final WatermarkCallbackExecutor callbackExecutor; - // The stateInternals of the world, by applied PTransform and key. + /** The stateInternals of the world, by applied PTransform and key. */ private final ConcurrentMap> applicationStateInternals; @@ -96,8 +88,6 @@ class InProcessEvaluationContext { private final CounterSet mergedCounters; - private final Executor callbackExecutor; - public static InProcessEvaluationContext create( InProcessPipelineOptions options, Collection> rootTransforms, @@ -121,7 +111,6 @@ private InProcessEvaluationContext( checkNotNull(views); this.stepNames = stepNames; - this.pendingCallbacks = ConcurrentPartitionedPriorityQueue.create(new CallbackOrdering()); this.watermarkManager = InMemoryWatermarkManager.create( NanosOffsetClock.create(), rootTransforms, valueToConsumers); @@ -130,7 +119,7 @@ private InProcessEvaluationContext( this.applicationStateInternals = new ConcurrentHashMap<>(); this.mergedCounters = new CounterSet(); - this.callbackExecutor = Executors.newCachedThreadPool(); + this.callbackExecutor = WatermarkCallbackExecutor.create(); } /** @@ -161,7 +150,7 @@ public synchronized Iterable> handleResult( result.getTimerUpdate().withCompletedTimers(completedTimers), committedBundles, result.getWatermarkHold()); - watermarksUpdated(); + fireAllAvailableCallbacks(); // Update counters if (result.getCounters() != null) { mergedCounters.merge(result.getCounters()); @@ -200,36 +189,15 @@ private Iterable> commitBundles( return completed.build(); } - private void watermarksUpdated() { + private void fireAllAvailableCallbacks() { for (AppliedPTransform transform : stepNames.keySet()) { - checkCallbacks(transform); + fireAvailableCallbacks(transform); } } - private void checkCallbacks(AppliedPTransform producingTransform) { + private void fireAvailableCallbacks(AppliedPTransform producingTransform) { TransformWatermarks watermarks = watermarkManager.getWatermarks(producingTransform); - ShouldFirePredicate shouldFire = new ShouldFirePredicate(watermarks.getOutputWatermark()); - WatermarkCallback callback; - do { - callback = pendingCallbacks.pollIfSatisfies(producingTransform, shouldFire); - if (callback != null) { - callbackExecutor.execute(callback.getCallback()); - } - } while (callback != null); - } - - private static class ShouldFirePredicate - implements SerializableFunction { - private final Instant currentInstant; - - public ShouldFirePredicate(Instant currentInstant) { - this.currentInstant = currentInstant; - } - - @Override - public Boolean apply(WatermarkCallback input) { - return input.shouldFire(currentInstant); - } + callbackExecutor.fireForWatermark(producingTransform, watermarks.getOutputWatermark()); } /** @@ -273,23 +241,26 @@ public void add(Iterable> values) { } /** - * Execute the specified callback after the watermark for a {@link PValue} passes the point at + * Schedule a callback to be executed after output would be produced for the given window + * if there had been input. + * + *

Output would be produced when the watermark for a {@link PValue} passes the point at * which the trigger for the specified window (with the specified windowing strategy) must have * fired from the perspective of that {@link PValue}, as specified by the value of * {@link Trigger#getWatermarkThatGuaranteesFiring(BoundedWindow)} for the trigger of the * {@link WindowingStrategy}. When the callback has fired, either values will have been produced - * for a key in that window, or the window is empty. + * for a key in that window, or the window is empty. The callback will be executed regardless of + * weather values have been produced. */ - public void callAfterOutputMustHaveBeenProduced( + public void scheduleAfterOutputWouldBeProduced( PValue value, BoundedWindow window, WindowingStrategy windowingStrategy, Runnable runnable) { - WatermarkCallback callback = - WatermarkCallback.onGuaranteedFiring(window, windowingStrategy, runnable); AppliedPTransform producing = getProducing(value); - pendingCallbacks.offer(producing, callback); - checkCallbacks(producing); + callbackExecutor.callOnGuaranteedFiring(producing, window, windowingStrategy, runnable); + + fireAvailableCallbacks(lookupProducing(value)); } private AppliedPTransform getProducing(PValue value) { @@ -351,7 +322,7 @@ public String getStepName(AppliedPTransform application) { * {@link PCollectionView PCollectionViews} */ public SideInputReader createSideInputReader(final List> sideInputs) { - return sideInputContainer.withViews(sideInputs); + return sideInputContainer.createReaderForViews(sideInputs); } /** @@ -374,47 +345,11 @@ public CounterSet getCounters() { return mergedCounters; } - private static class WatermarkCallback { - public static WatermarkCallback onGuaranteedFiring( - BoundedWindow window, WindowingStrategy strategy, Runnable callback) { - @SuppressWarnings("unchecked") - Instant firingAfter = - strategy.getTrigger().getSpec().getWatermarkThatGuaranteesFiring((W) window); - return new WatermarkCallback(firingAfter, callback); - } - - private final Instant fireAfter; - private final Runnable callback; - - private WatermarkCallback(Instant fireAfter, Runnable callback) { - this.fireAfter = fireAfter; - this.callback = callback; - } - - public boolean shouldFire(Instant currentWatermark) { - return currentWatermark.isAfter(fireAfter) - || currentWatermark.equals(BoundedWindow.TIMESTAMP_MAX_VALUE); - } - - public Runnable getCallback() { - return callback; - } - } - - private static class CallbackOrdering extends Ordering { - @Override - public int compare(WatermarkCallback left, WatermarkCallback right) { - return ComparisonChain.start() - .compare(left.fireAfter, right.fireAfter) - .compare(left.callback, right.callback, Ordering.arbitrary()) - .result(); - } - } - /** * Extracts all timers that have been fired and have not already been extracted. * - *

This is a destructive operation. Timers will only appear in the result of this method once. + *

This is a destructive operation. Timers will only appear in the result of this method once + * for each time they are set. */ public Map, Map> extractFiredTimers() { return watermarkManager.extractFiredTimers(); diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java index d8469677bb0a0..37c9fcfa653f7 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java @@ -89,7 +89,7 @@ private InProcessSideInputContainer(InProcessEvaluationContext context, * the provided argument. The returned {@link InProcessSideInputContainer} is unmodifiable without * casting, but will change as this {@link InProcessSideInputContainer} is modified. */ - public SideInputReader withViews(Collection> newContainedViews) { + public SideInputReader createReaderForViews(Collection> newContainedViews) { if (!containedViews.containsAll(newContainedViews)) { Set> currentlyContained = ImmutableSet.copyOf(containedViews); Set> newRequested = ImmutableSet.copyOf(newContainedViews); @@ -113,7 +113,7 @@ public void write(PCollectionView view, Iterable> indexValuesByWindow(values); for (Map.Entry>> windowValues : valuesPerWindow.entrySet()) { - setPCollectionViewWindowValues(view, windowValues.getKey(), windowValues.getValue()); + updatePCollectionViewWindowValues(view, windowValues.getKey(), windowValues.getValue()); } } @@ -141,7 +141,7 @@ private Map>> indexValuesByWindow( * specified values, if the values are part of a later pane than currently exist within the * {@link PCollectionViewWindow}. */ - private void setPCollectionViewWindowValues( + private void updatePCollectionViewWindowValues( PCollectionView view, BoundedWindow window, Collection> windowValues) { PCollectionViewWindow windowedView = PCollectionViewWindow.of(view, window); SettableFuture>> future = null; @@ -188,7 +188,7 @@ public T get(final PCollectionView view, final BoundedWindow window) { viewByWindows.get(windowedView); WindowingStrategy windowingStrategy = view.getWindowingStrategyInternal(); - evaluationContext.callAfterOutputMustHaveBeenProduced( + evaluationContext.scheduleAfterOutputWouldBeProduced( view, window, windowingStrategy, new Runnable() { @Override public void run() { diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java index ba5609578ba32..0c8cb7e80aca2 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java @@ -27,6 +27,10 @@ import javax.annotation.Nullable; +/** + * A {@link TransformEvaluatorFactory} that delegates to primitive {@link TransformEvaluatorFactory} + * implementations based on the type of {@link PTransform} of the application. + */ class TransformEvaluatorRegistry implements TransformEvaluatorFactory { public static TransformEvaluatorRegistry defaultRegistry() { @SuppressWarnings("rawtypes") diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutor.java new file mode 100644 index 0000000000000..27d59b9a6487f --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutor.java @@ -0,0 +1,143 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.util.WindowingStrategy; +import com.google.common.collect.ComparisonChain; +import com.google.common.collect.Ordering; + +import org.joda.time.Instant; + +import java.util.PriorityQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Executes callbacks that occur based on the progression of the watermark per-step. + * + *

Callbacks are registered by calls to + * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)}, + * and are executed after a call to {@link #fireForWatermark(AppliedPTransform, Instant)} with the + * same {@link AppliedPTransform} and a watermark sufficient to ensure that the trigger for the + * windowing strategy would have been produced. + * + *

NOTE: {@link WatermarkCallbackExecutor} does not track the latest observed watermark for any + * {@link AppliedPTransform} - any call to + * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)} + * that could have potentially already fired should be followed by a call to + * {@link #fireForWatermark(AppliedPTransform, Instant)} for the same transform with the current + * value of the watermark. + */ +class WatermarkCallbackExecutor { + /** + * Create a new {@link WatermarkCallbackExecutor}. + */ + public static WatermarkCallbackExecutor create() { + return new WatermarkCallbackExecutor(); + } + + private final ConcurrentMap, PriorityQueue> + callbacks; + private final ExecutorService executor; + + private WatermarkCallbackExecutor() { + this.callbacks = new ConcurrentHashMap<>(); + this.executor = Executors.newSingleThreadExecutor(); + } + + /** + * Execute the provided {@link Runnable} after the next call to + * {@link #fireForWatermark(AppliedPTransform, Instant)} where the window is guaranteed to have + * produced output. + */ + public void callOnGuaranteedFiring( + AppliedPTransform step, + BoundedWindow window, + WindowingStrategy windowingStrategy, + Runnable runnable) { + WatermarkCallback callback = + WatermarkCallback.onGuaranteedFiring(window, windowingStrategy, runnable); + + PriorityQueue callbackQueue = callbacks.get(step); + if (callbackQueue == null) { + callbackQueue = new PriorityQueue<>(11, new CallbackOrdering()); + if (callbacks.putIfAbsent(step, callbackQueue) != null) { + callbackQueue = callbacks.get(step); + } + } + + synchronized (callbackQueue) { + callbackQueue.offer(callback); + } + } + + /** + * Schedule all pending callbacks that must have produced output by the time of the provided + * watermark. + */ + public void fireForWatermark(AppliedPTransform step, Instant watermark) { + PriorityQueue callbackQueue = callbacks.get(step); + if (callbackQueue == null) { + return; + } + synchronized (callbackQueue) { + while (!callbackQueue.isEmpty() && callbackQueue.peek().shouldFire(watermark)) { + executor.submit(callbackQueue.poll().getCallback()); + } + } + } + + private static class WatermarkCallback { + public static WatermarkCallback onGuaranteedFiring( + BoundedWindow window, WindowingStrategy strategy, Runnable callback) { + @SuppressWarnings("unchecked") + Instant firingAfter = + strategy.getTrigger().getSpec().getWatermarkThatGuaranteesFiring((W) window); + return new WatermarkCallback(firingAfter, callback); + } + + private final Instant fireAfter; + private final Runnable callback; + + private WatermarkCallback(Instant fireAfter, Runnable callback) { + this.fireAfter = fireAfter; + this.callback = callback; + } + + public boolean shouldFire(Instant currentWatermark) { + return currentWatermark.isAfter(fireAfter) + || currentWatermark.equals(BoundedWindow.TIMESTAMP_MAX_VALUE); + } + + public Runnable getCallback() { + return callback; + } + } + + private static class CallbackOrdering extends Ordering { + @Override + public int compare(WatermarkCallback left, WatermarkCallback right) { + return ComparisonChain.start() + .compare(left.fireAfter, right.fireAfter) + .compare(left.callback, right.callback, Ordering.arbitrary()) + .result(); + } + } +} diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ConcurrentPartitionedPriorityQueue.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ConcurrentPartitionedPriorityQueue.java deleted file mode 100644 index 105842c30c14d..0000000000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ConcurrentPartitionedPriorityQueue.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright (C) 2016 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.dataflow.sdk.util; - -import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; -import com.google.common.collect.Ordering; - -import java.util.Comparator; -import java.util.PriorityQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * A {@link PriorityQueue} that is partitioned by key and thread-safe. - */ -public class ConcurrentPartitionedPriorityQueue { - public static ConcurrentPartitionedPriorityQueue create(Comparator comparator) { - return new ConcurrentPartitionedPriorityQueue<>(Ordering.from(comparator)); - } - - private final Ordering ordering; - private final ConcurrentMap> queues; - - private ConcurrentPartitionedPriorityQueue(Ordering ordering) { - this.ordering = ordering; - queues = new ConcurrentHashMap<>(); - } - - /** - * Place the provided value in the priority queue belonging to the key. - */ - public void offer(K key, V value) { - PriorityQueue queue = queues.get(key); - if (queue == null) { - queue = new PriorityQueue<>(ordering); - queues.putIfAbsent(key, queue); - queue = queues.get(key); - } - synchronized (queue) { - queue.offer(value); - } - } - - /** - * Retrieve the head of the queue for the provided key if it satisfies the provided prediate. - */ - public V pollIfSatisfies(K key, SerializableFunction toSatisfy) { - PriorityQueue queue = queues.get(key); - if (queue == null) { - return null; - } - synchronized (queue) { - if (!queue.isEmpty() && toSatisfy.apply(queue.peek())) { - return queue.poll(); - } - return null; - } - } -} diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java index 951ebd8500af7..149096040a3de 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java @@ -20,12 +20,14 @@ import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; import com.google.cloud.dataflow.sdk.coders.VarIntCoder; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers; import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter; import com.google.cloud.dataflow.sdk.testing.TestPipeline; @@ -143,6 +145,35 @@ public void writeToViewWriterThenReadReads() { assertThat(reader.get(view, second), containsInAnyOrder(4444)); } + @Test + public void getExecutionContextSameStepSameKeyState() { + InProcessExecutionContext fooContext = + context.getExecutionContext(created.getProducingTransformInternal(), "foo"); + + StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); + + InProcessStepContext stepContext = fooContext.getOrCreateStepContext("s1", "s1", null); + stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1); + + context.handleResult( + InProcessBundle.keyed(created, "foo").commit(Instant.now()), + ImmutableList.of(), + StepTransformResult.withoutHold(created.getProducingTransformInternal()) + .withState(stepContext.commitState()) + .build()); + + InProcessExecutionContext secondFooContext = + context.getExecutionContext(created.getProducingTransformInternal(), "foo"); + assertThat( + secondFooContext + .getOrCreateStepContext("s1", "s1", null) + .stateInternals() + .state(StateNamespaces.global(), intBag) + .read(), + contains(1)); + } + + @Test public void getExecutionContextDifferentKeysIndependentState() { InProcessExecutionContext fooContext = @@ -158,6 +189,7 @@ public void getExecutionContextDifferentKeysIndependentState() { InProcessExecutionContext barContext = context.getExecutionContext(created.getProducingTransformInternal(), "bar"); + assertThat(barContext, not(equalTo(fooContext))); assertThat( barContext .getOrCreateStepContext("s1", "s1", null) @@ -267,7 +299,7 @@ public void run() { }; // Should call back after the end of the global window - context.callAfterOutputMustHaveBeenProduced( + context.scheduleAfterOutputWouldBeProduced( downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), callback); InProcessTransformResult result = @@ -301,9 +333,8 @@ public void run() { callLatch.countDown(); } }; - context.callAfterOutputMustHaveBeenProduced( + context.scheduleAfterOutputWouldBeProduced( downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), callback); - // if the callback is not scheduled, this test will never complete. assertThat(callLatch.await(1, TimeUnit.SECONDS), is(true)); } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java index dfcd66f17888b..16b4eb7d07e26 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java @@ -136,7 +136,7 @@ public void getAfterWriteReturnsPaneInWindow() throws Exception { container.write(mapView, ImmutableList.>of(one, two)); Map viewContents = - container.withViews(ImmutableList.>of(mapView)) + container.createReaderForViews(ImmutableList.>of(mapView)) .get(mapView, firstWindow); assertThat(viewContents, hasEntry("one", 1)); assertThat(viewContents, hasEntry("two", 2)); @@ -152,7 +152,7 @@ public void getReturnsLatestPaneInWindow() throws Exception { container.write(mapView, ImmutableList.>of(one, two)); Map viewContents = - container.withViews(ImmutableList.>of(mapView)) + container.createReaderForViews(ImmutableList.>of(mapView)) .get(mapView, secondWindow); assertThat(viewContents, hasEntry("one", 1)); assertThat(viewContents, hasEntry("two", 2)); @@ -163,7 +163,7 @@ public void getReturnsLatestPaneInWindow() throws Exception { container.write(mapView, ImmutableList.>of(three)); Map overwrittenViewContents = - container.withViews(ImmutableList.>of(mapView)) + container.createReaderForViews(ImmutableList.>of(mapView)) .get(mapView, secondWindow); assertThat(overwrittenViewContents, hasEntry("three", 3)); assertThat(overwrittenViewContents.size(), is(1)); @@ -175,15 +175,18 @@ public void getReturnsLatestPaneInWindow() throws Exception { */ @Test public void getBlocksUntilPaneAvailable() throws Exception { - BoundedWindow window = new BoundedWindow() { - @Override - public Instant maxTimestamp() { - return new Instant(1024L); - } - }; + BoundedWindow window = + new BoundedWindow() { + @Override + public Instant maxTimestamp() { + return new Instant(1024L); + } + }; Future singletonFuture = - getFutureOfView(container.withViews(ImmutableList.>of(singletonView)), - singletonView, window); + getFutureOfView( + container.createReaderForViews(ImmutableList.>of(singletonView)), + singletonView, + window); WindowedValue singletonValue = WindowedValue.of(4.75, new Instant(475L), window, PaneInfo.ON_TIME_AND_ONLY_FIRING); @@ -202,7 +205,7 @@ public Instant maxTimestamp() { } }; SideInputReader newReader = - container.withViews(ImmutableList.>of(singletonView)); + container.createReaderForViews(ImmutableList.>of(singletonView)); Future singletonFuture = getFutureOfView(newReader, singletonView, window); WindowedValue singletonValue = @@ -215,25 +218,31 @@ public Instant maxTimestamp() { @Test public void withPCollectionViewsErrorsForContainsNotInViews() { - PCollectionView>> newView = PCollectionViews.multimapView(pipeline, - WindowingStrategy.globalDefault(), KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); + PCollectionView>> newView = + PCollectionViews.multimapView( + pipeline, + WindowingStrategy.globalDefault(), + KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); thrown.expect(IllegalArgumentException.class); thrown.expectMessage("with unknown views " + ImmutableList.of(newView).toString()); - container.withViews(ImmutableList.>of(newView)); + container.createReaderForViews(ImmutableList.>of(newView)); } @Test public void withViewsForViewNotInContainerFails() { - PCollectionView>> newView = PCollectionViews.multimapView(pipeline, - WindowingStrategy.globalDefault(), KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); + PCollectionView>> newView = + PCollectionViews.multimapView( + pipeline, + WindowingStrategy.globalDefault(), + KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); thrown.expect(IllegalArgumentException.class); thrown.expectMessage("unknown views"); thrown.expectMessage(newView.toString()); - container.withViews(ImmutableList.>of(newView)); + container.createReaderForViews(ImmutableList.>of(newView)); } @Test @@ -241,7 +250,7 @@ public void getOnReaderForViewNotInReaderFails() { thrown.expect(IllegalArgumentException.class); thrown.expectMessage("unknown view: " + iterableView.toString()); - container.withViews(ImmutableList.>of(mapView)) + container.createReaderForViews(ImmutableList.>of(mapView)) .get(iterableView, GlobalWindow.INSTANCE); } @@ -254,11 +263,11 @@ public void writeForMultipleElementsInDifferentWindowsSucceeds() throws Exceptio PaneInfo.ON_TIME_AND_ONLY_FIRING); container.write(singletonView, ImmutableList.of(firstWindowedValue, secondWindowedValue)); assertThat( - container.withViews(ImmutableList.>of(singletonView)) + container.createReaderForViews(ImmutableList.>of(singletonView)) .get(singletonView, firstWindow), equalTo(2.875)); assertThat( - container.withViews(ImmutableList.>of(singletonView)) + container.createReaderForViews(ImmutableList.>of(singletonView)) .get(singletonView, secondWindow), equalTo(4.125)); } @@ -273,7 +282,7 @@ public void writeForMultipleIdenticalElementsInSameWindowSucceeds() throws Excep container.write(iterableView, ImmutableList.of(firstValue, secondValue)); assertThat( - container.withViews(ImmutableList.>of(iterableView)) + container.createReaderForViews(ImmutableList.>of(iterableView)) .get(iterableView, firstWindow), contains(44, 44)); } @@ -285,11 +294,11 @@ public void writeForElementInMultipleWindowsSucceeds() throws Exception { ImmutableList.of(firstWindow, secondWindow), PaneInfo.ON_TIME_AND_ONLY_FIRING); container.write(singletonView, ImmutableList.of(multiWindowedValue)); assertThat( - container.withViews(ImmutableList.>of(singletonView)) + container.createReaderForViews(ImmutableList.>of(singletonView)) .get(singletonView, firstWindow), equalTo(2.875)); assertThat( - container.withViews(ImmutableList.>of(singletonView)) + container.createReaderForViews(ImmutableList.>of(singletonView)) .get(singletonView, secondWindow), equalTo(2.875)); } @@ -305,7 +314,7 @@ public void finishDoesNotOverwriteWrittenElements() throws Exception { immediatelyInvokeCallback(mapView, secondWindow); Map viewContents = - container.withViews(ImmutableList.>of(mapView)) + container.createReaderForViews(ImmutableList.>of(mapView)) .get(mapView, secondWindow); assertThat(viewContents, hasEntry("one", 1)); @@ -316,8 +325,11 @@ public void finishDoesNotOverwriteWrittenElements() throws Exception { @Test public void finishOnPendingViewsSetsEmptyElements() throws Exception { immediatelyInvokeCallback(mapView, secondWindow); - Future> mapFuture = getFutureOfView( - container.withViews(ImmutableList.>of(mapView)), mapView, secondWindow); + Future> mapFuture = + getFutureOfView( + container.createReaderForViews(ImmutableList.>of(mapView)), + mapView, + secondWindow); assertThat(mapFuture.get().isEmpty(), is(true)); } @@ -328,18 +340,21 @@ public void finishOnPendingViewsSetsEmptyElements() throws Exception { */ private void immediatelyInvokeCallback(PCollectionView view, BoundedWindow window) { doAnswer( - new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - Object callback = invocation.getArguments()[3]; - Runnable callbackRunnable = (Runnable) callback; - callbackRunnable.run(); - return null; - } - }) + new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + Object callback = invocation.getArguments()[3]; + Runnable callbackRunnable = (Runnable) callback; + callbackRunnable.run(); + return null; + } + }) .when(context) - .callAfterOutputMustHaveBeenProduced(Mockito.eq(view), Mockito.eq(window), - Mockito.eq(view.getWindowingStrategyInternal()), Mockito.any(Runnable.class)); + .scheduleAfterOutputWouldBeProduced( + Mockito.eq(view), + Mockito.eq(window), + Mockito.eq(view.getWindowingStrategyInternal()), + Mockito.any(Runnable.class)); } private Future getFutureOfView(final SideInputReader myReader, diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutorTest.java new file mode 100644 index 0000000000000..be3e062fbde18 --- /dev/null +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutorTest.java @@ -0,0 +1,126 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.Sum; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; +import com.google.cloud.dataflow.sdk.util.WindowingStrategy; +import com.google.cloud.dataflow.sdk.values.PCollection; + +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * Tests for {@link WatermarkCallbackExecutor}. + */ +@RunWith(JUnit4.class) +public class WatermarkCallbackExecutorTest { + private WatermarkCallbackExecutor executor = WatermarkCallbackExecutor.create(); + private AppliedPTransform create; + private AppliedPTransform sum; + + @Before + public void setup() { + TestPipeline p = TestPipeline.create(); + PCollection created = p.apply(Create.of(1, 2, 3)); + create = created.getProducingTransformInternal(); + sum = created.apply(Sum.integersGlobally()).getProducingTransformInternal(); + } + + @Test + public void onGuaranteedFiringFiresAfterTrigger() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + executor.callOnGuaranteedFiring( + create, + GlobalWindow.INSTANCE, + WindowingStrategy.globalDefault(), + new CountDownLatchCallback(latch)); + + executor.fireForWatermark(create, BoundedWindow.TIMESTAMP_MAX_VALUE); + assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(true)); + } + + @Test + public void multipleCallbacksShouldFireFires() throws Exception { + CountDownLatch latch = new CountDownLatch(2); + WindowFn windowFn = FixedWindows.of(Duration.standardMinutes(10)); + IntervalWindow window = + new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardMinutes(10))); + executor.callOnGuaranteedFiring( + create, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch)); + executor.callOnGuaranteedFiring( + create, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch)); + + executor.fireForWatermark(create, new Instant(0L).plus(Duration.standardMinutes(10))); + assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(true)); + } + + @Test + public void noCallbacksShouldFire() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + WindowFn windowFn = FixedWindows.of(Duration.standardMinutes(10)); + IntervalWindow window = + new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardMinutes(10))); + executor.callOnGuaranteedFiring( + create, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch)); + + executor.fireForWatermark(create, new Instant(0L).plus(Duration.standardMinutes(5))); + assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(false)); + } + + @Test + public void unrelatedStepShouldNotFire() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + WindowFn windowFn = FixedWindows.of(Duration.standardMinutes(10)); + IntervalWindow window = + new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardMinutes(10))); + executor.callOnGuaranteedFiring( + sum, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch)); + + executor.fireForWatermark(create, new Instant(0L).plus(Duration.standardMinutes(20))); + assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(false)); + } + + private static class CountDownLatchCallback implements Runnable { + private final CountDownLatch latch; + + public CountDownLatchCallback(CountDownLatch latch) { + this.latch = latch; + } + + @Override + public void run() { + latch.countDown(); + } + } +}