diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java index 1c0279897aac4..2a164c3518d86 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java @@ -18,7 +18,6 @@ import com.google.cloud.dataflow.sdk.io.Read.Bounded; import com.google.cloud.dataflow.sdk.io.Source.Reader; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.PTransform; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java new file mode 100644 index 0000000000000..9e310711d1f89 --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.cloud.dataflow.sdk.options.DefaultValueFactory; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * A {@link DefaultValueFactory} that produces cached thread pools via + * {@link Executors#newCachedThreadPool()}. + */ +class CachedThreadPoolExecutorServiceFactory + implements DefaultValueFactory { + @Override + public ExecutorService create(PipelineOptions options) { + return Executors.newCachedThreadPool(); + } +} diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CompletionCallback.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CompletionCallback.java new file mode 100644 index 0000000000000..2792631560773 --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CompletionCallback.java @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; + +/** + * A callback for completing a bundle of input. + */ +interface CompletionCallback { + /** + * Handle a successful result. + */ + void handleResult(CommittedBundle inputBundle, InProcessTransformResult result); + + /** + * Handle a result that terminated abnormally due to the provided {@link Throwable}. + */ + void handleThrowable(CommittedBundle inputBundle, Throwable t); +} diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java new file mode 100644 index 0000000000000..2a6a8103809c8 --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java @@ -0,0 +1,118 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor; +import com.google.cloud.dataflow.sdk.runners.TransformTreeNode; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.values.PCollectionView; +import com.google.cloud.dataflow.sdk.values.PValue; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each {@link PValue} in the + * {@link Pipeline}. This is used to schedule consuming {@link PTransform PTransforms} to consume + * input after the upstream transform has produced and committed output. 
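For orientation, the runner is expected to drive this visitor roughly as in the following sketch, which mirrors the InProcessPipelineRunner#run method later in this diff (generic parameters are written out here even though the surrounding extract elides them):

    ConsumerTrackingPipelineVisitor visitor = new ConsumerTrackingPipelineVisitor();
    pipeline.traverseTopologically(visitor);
    // Any values that no downstream transform consumed still need to be finished.
    for (PValue unfinalized : visitor.getUnfinalizedPValues()) {
      unfinalized.finishSpecifying();
    }
    Collection<AppliedPTransform<?, ?, ?>> roots = visitor.getRootTransforms();
    Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers = visitor.getValueToConsumers();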
+ */ +public class ConsumerTrackingPipelineVisitor implements PipelineVisitor { + private Map>> valueToConsumers = new HashMap<>(); + private Collection> rootTransforms = new ArrayList<>(); + private Collection> views = new ArrayList<>(); + private Map, String> stepNames = new HashMap<>(); + private Set toFinalize = new HashSet<>(); + private int numTransforms = 0; + + @Override + public void enterCompositeTransform(TransformTreeNode node) {} + + @Override + public void leaveCompositeTransform(TransformTreeNode node) {} + + @Override + public void visitTransform(TransformTreeNode node) { + toFinalize.removeAll(node.getInputs().keySet()); + AppliedPTransform appliedTransform = getAppliedTransform(node); + if (node.getInput().expand().isEmpty()) { + rootTransforms.add(appliedTransform); + } else { + for (PValue value : node.getInputs().keySet()) { + valueToConsumers.get(value).add(appliedTransform); + stepNames.put(appliedTransform, genStepName()); + } + } + } + + private AppliedPTransform getAppliedTransform(TransformTreeNode node) { + @SuppressWarnings({"rawtypes", "unchecked"}) + AppliedPTransform application = AppliedPTransform.of( + node.getFullName(), node.getInput(), node.getOutput(), (PTransform) node.getTransform()); + return application; + } + + @Override + public void visitValue(PValue value, TransformTreeNode producer) { + toFinalize.add(value); + for (PValue expandedValue : value.expand()) { + valueToConsumers.put(expandedValue, new ArrayList>()); + if (expandedValue instanceof PCollectionView) { + views.add((PCollectionView) expandedValue); + } + expandedValue.recordAsOutput(getAppliedTransform(producer)); + } + value.recordAsOutput(getAppliedTransform(producer)); + } + + private String genStepName() { + return String.format("s%s", numTransforms++); + } + + public Map>> getValueToConsumers() { + return valueToConsumers; + } + + public Map, String> getStepNames() { + return stepNames; + } + + public Collection> getRootTransforms() { + return rootTransforms; + } + + public Collection> getViews() { + return views; + } + + public Map>> getValueToCustomers() { + return valueToConsumers; + } + + /** + * @return + */ + public Set getUnfinalizedPValues() { + return toFinalize; + } +} + + diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java index 745f8f2718a33..307bc5cdb5520 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java @@ -15,7 +15,6 @@ */ package com.google.cloud.dataflow.sdk.runners.inprocess; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import java.util.Objects; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java new file mode 100644 index 0000000000000..05cf55035856e --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java @@ -0,0 +1,361 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessExecutor; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.util.KeyedWorkItem; +import com.google.cloud.dataflow.sdk.util.KeyedWorkItems; +import com.google.cloud.dataflow.sdk.util.TimeDomain; +import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.common.base.MoreObjects; +import com.google.common.base.Optional; + +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; + +import javax.annotation.Nullable; + +/** + * An {@link InProcessExecutor} that uses an underlying {@link ExecutorService} and + * {@link InProcessEvaluationContext} to execute a {@link Pipeline}. 
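A minimal wiring sketch, assuming an already-constructed InProcessEvaluationContext and the root transforms collected by ConsumerTrackingPipelineVisitor (this mirrors how InProcessPipelineRunner#run, later in this diff, uses the create factory below):

    ExecutorService executorService = options.getExecutorService();
    InProcessExecutor executor =
        ExecutorServiceParallelExecutor.create(executorService, evaluationContext);
    executor.start(rootTransforms);   // schedules the root transforms for evaluation
    executor.awaitCompletion();       // blocks until done; rethrows any executor failure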
+ */ +final class ExecutorServiceParallelExecutor implements InProcessExecutor { + private static final Logger LOG = LoggerFactory.getLogger(ExecutorServiceParallelExecutor.class); + + private final ExecutorService executorService; + private final InProcessEvaluationContext evaluationContext; + private final Collection> incompleteRootNodes; + + private final ConcurrentMap currentEvaluations; + private final ConcurrentMap, Boolean> scheduledExecutors; + + private final Queue allUpdates; + private final BlockingQueue visibleUpdates; + + private final CompletionCallback defaultCompletionCallback; + + public static ExecutorServiceParallelExecutor create( + ExecutorService executorService, InProcessEvaluationContext context) { + return new ExecutorServiceParallelExecutor(executorService, context); + } + + private ExecutorServiceParallelExecutor( + ExecutorService executorService, InProcessEvaluationContext context) { + this.executorService = executorService; + this.evaluationContext = context; + this.incompleteRootNodes = new CopyOnWriteArrayList<>(); + + currentEvaluations = new ConcurrentHashMap<>(); + scheduledExecutors = new ConcurrentHashMap<>(); + + this.allUpdates = new ConcurrentLinkedQueue<>(); + this.visibleUpdates = new ArrayBlockingQueue<>(20); + + defaultCompletionCallback = new TimerlessCompletionCallback(); + } + + @Override + public void start(Collection> roots) { + incompleteRootNodes.addAll(roots); + Runnable monitorRunnable = new MonitorRunnable(); + executorService.submit(monitorRunnable); + } + + @SuppressWarnings("unchecked") + public void scheduleConsumption(AppliedPTransform consumer, CommittedBundle bundle, + CompletionCallback onComplete) { + evaluateBundle(consumer, bundle, onComplete); + } + + private void evaluateBundle(final AppliedPTransform transform, + @Nullable final CommittedBundle bundle, final CompletionCallback onComplete) { + final StepAndKey stepAndKey = StepAndKey.of(transform, bundle == null ? null : bundle.getKey()); + TransformExecutorService state = + getStepAndKeyExecutorService(stepAndKey, bundle == null ? true : !bundle.isKeyed()); + TransformExecutor callable = + TransformExecutor.create(evaluationContext, bundle, transform, onComplete, state); + state.schedule(callable); + } + + private void scheduleConsumers(CommittedBundle bundle) { + for (AppliedPTransform consumer : + evaluationContext.getConsumers(bundle.getPCollection())) { + scheduleConsumption(consumer, bundle, defaultCompletionCallback); + } + } + + private TransformExecutorService getStepAndKeyExecutorService( + StepAndKey stepAndKey, boolean parallelizable) { + if (!currentEvaluations.containsKey(stepAndKey)) { + TransformExecutorService evaluationState = + parallelizable + ? 
TransformExecutorServices.parallel(executorService, scheduledExecutors) + : TransformExecutorServices.serial(executorService, scheduledExecutors); + currentEvaluations.putIfAbsent(stepAndKey, evaluationState); + } + return currentEvaluations.get(stepAndKey); + } + + @Override + public void awaitCompletion() throws Throwable { + VisibleExecutorUpdate update; + do { + update = visibleUpdates.take(); + if (update.throwable.isPresent()) { + if (update.throwable.get() instanceof Exception) { + throw update.throwable.get(); + } else { + throw update.throwable.get(); + } + } + } while (!update.isDone()); + executorService.shutdown(); + } + + private class TimerlessCompletionCallback implements CompletionCallback { + @Override + public void handleResult(CommittedBundle inputBundle, InProcessTransformResult result) { + Iterable> resultBundles = + evaluationContext.handleResult(inputBundle, Collections.emptyList(), result); + for (CommittedBundle outputBundle : resultBundles) { + allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle)); + } + } + + @Override + public void handleThrowable(CommittedBundle inputBundle, Throwable t) { + allUpdates.offer(ExecutorUpdate.fromThrowable(t)); + } + } + + private class TimerCompletionCallback implements CompletionCallback { + private final Iterable timers; + + private TimerCompletionCallback(Iterable timers) { + this.timers = timers; + } + + @Override + public void handleResult(CommittedBundle inputBundle, InProcessTransformResult result) { + Iterable> resultBundles = + evaluationContext.handleResult(inputBundle, timers, result); + for (CommittedBundle outputBundle : resultBundles) { + allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle)); + } + } + + @Override + public void handleThrowable(CommittedBundle inputBundle, Throwable t) { + allUpdates.offer(ExecutorUpdate.fromThrowable(t)); + } + } + + /** + * An internal status update on the state of the executor. + * + * Used to signal when the executor should be shut down (due to an exception). + */ + private static class ExecutorUpdate { + private final Optional> bundle; + private final Optional throwable; + + public static ExecutorUpdate fromBundle(CommittedBundle bundle) { + return new ExecutorUpdate(bundle, null); + } + + public static ExecutorUpdate fromThrowable(Throwable t) { + return new ExecutorUpdate(null, t); + } + + private ExecutorUpdate(CommittedBundle producedBundle, Throwable throwable) { + this.bundle = Optional.fromNullable(producedBundle); + this.throwable = Optional.fromNullable(throwable); + } + + public Optional> getBundle() { + return bundle; + } + + public Optional getException() { + return throwable; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(ExecutorUpdate.class) + .add("bundle", bundle).add("exception", throwable) + .toString(); + } + } + + /** + * An update of interest to the user. Used in {@link #awaitCompletion} to decide weather to + * return normally or throw an exception. 
+ */ + private static class VisibleExecutorUpdate { + private final Optional throwable; + private final boolean done; + + public static VisibleExecutorUpdate fromThrowable(Throwable e) { + return new VisibleExecutorUpdate(false, e); + } + + public static VisibleExecutorUpdate finished() { + return new VisibleExecutorUpdate(true, null); + } + + private VisibleExecutorUpdate(boolean done, @Nullable Throwable exception) { + this.throwable = Optional.fromNullable(exception); + this.done = done; + } + + public boolean isDone() { + return done; + } + } + + private class MonitorRunnable implements Runnable { + @Override + public void run() { + Thread.currentThread() + .setName( + String.format( + "%s$%s-monitor", + evaluationContext.getPipelineOptions().getAppName(), + ExecutorServiceParallelExecutor.class.getSimpleName())); + try { + while (true) { + ExecutorUpdate update = allUpdates.poll(); + if (update != null) { + LOG.debug("Executor Update: {}", update); + if (update.getBundle().isPresent()) { + scheduleConsumers(update.getBundle().get()); + } else if (update.getException().isPresent()) { + visibleUpdates.offer( + VisibleExecutorUpdate.fromThrowable(update.getException().get())); + } + } else { + Thread.sleep(50L); + } + mightNeedMoreWork(); + fireTimers(); + if (finishIfDone()) { + break; + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Monitor died due to being interrupted"); + while (!visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(e))) { + visibleUpdates.poll(); + } + } catch (Throwable t) { + LOG.error("Monitor thread died due to throwable", t); + while (!visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(t))) { + visibleUpdates.poll(); + } + } + } + + private void addMoreWork(Collection> activeRoots) { + for (AppliedPTransform activeRoot : activeRoots) { + scheduleConsumption(activeRoot, null, defaultCompletionCallback); + } + } + + public void fireTimers() throws Exception { + try { + for (Map.Entry, Map> transformTimers : + evaluationContext.extractFiredTimers().entrySet()) { + AppliedPTransform transform = transformTimers.getKey(); + for (Map.Entry keyTimers : transformTimers.getValue().entrySet()) { + for (TimeDomain domain : TimeDomain.values()) { + Collection delivery = keyTimers.getValue().getTimers(domain); + if (delivery.isEmpty()) { + continue; + } + KeyedWorkItem work = + KeyedWorkItems.timersWorkItem(keyTimers.getKey(), delivery); + @SuppressWarnings({"unchecked", "rawtypes"}) + CommittedBundle bundle = + InProcessBundle + .>keyed( + (PCollection) transform.getInput(), keyTimers.getKey()) + .add(WindowedValue.valueInEmptyWindows(work)) + .commit(Instant.now()); + scheduleConsumption(transform, bundle, new TimerCompletionCallback(delivery)); + } + } + } + } catch (Exception e) { + LOG.error("Internal Error while delivering timers", e); + throw e; + } + } + + private boolean finishIfDone() { + if (evaluationContext.isDone()) { + LOG.debug("Pipeline is finished. Shutting down. 
{}"); + while (!visibleUpdates.offer(VisibleExecutorUpdate.finished())) { + visibleUpdates.poll(); + } + executorService.shutdown(); + return true; + } + return false; + } + + private void mightNeedMoreWork() { + synchronized (scheduledExecutors) { + for (TransformExecutor executor : scheduledExecutors.keySet()) { + Thread thread = executor.getThread(); + if (thread != null) { + switch (thread.getState()) { + case BLOCKED: + case WAITING: + case TERMINATED: + case TIMED_WAITING: + break; + default: + return; + } + } + } + } + addMoreWork(incompleteRootNodes); + } + } +} + diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java index 14428888e2b59..bde1df45e9b43 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java @@ -16,7 +16,6 @@ package com.google.cloud.dataflow.sdk.runners.inprocess; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.Flatten; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java index 0347281749cb1..ec63be84c9f10 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java @@ -22,7 +22,6 @@ import com.google.cloud.dataflow.sdk.coders.IterableCoder; import com.google.cloud.dataflow.sdk.coders.KvCoder; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.runners.inprocess.StepTransformResult.Builder; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java index e280e22d2bb99..7cf53aafe6d25 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java @@ -1209,8 +1209,11 @@ public TimerUpdateBuilder deletedTimer(TimerData deletedTimer) { * and deletedTimers. 
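The withCompletedTimers method added just below lets the evaluation context fold the timers that were actually delivered back into a result's TimerUpdate before advancing watermarks; its call site appears later in this diff, in InProcessEvaluationContext#handleResult:

    watermarkManager.updateWatermarks(
        completedBundle,
        result.getTransform(),
        result.getTimerUpdate().withCompletedTimers(completedTimers),
        committedBundles,
        result.getWatermarkHold());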
*/ public TimerUpdate build() { - return new TimerUpdate(key, ImmutableSet.copyOf(completedTimers), - ImmutableSet.copyOf(setTimers), ImmutableSet.copyOf(deletedTimers)); + return new TimerUpdate( + key, + ImmutableSet.copyOf(completedTimers), + ImmutableSet.copyOf(setTimers), + ImmutableSet.copyOf(deletedTimers)); } } @@ -1245,6 +1248,13 @@ Iterable getDeletedTimers() { return deletedTimers; } + /** + * Returns a {@link TimerUpdate} that is like this one, but with the specified completed timers. + */ + public TimerUpdate withCompletedTimers(Iterable completedTimers) { + return new TimerUpdate(this.key, completedTimers, setTimers, deletedTimers); + } + @Override public int hashCode() { return Objects.hash(key, completedTimers, setTimers, deletedTimers); diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java index cc20161097e8f..31d9db17dafea 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2015 Google Inc. + * Copyright (C) 2016 Google Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -64,6 +64,11 @@ private InProcessBundle(PCollection pcollection, boolean keyed, Object key) { this.elements = ImmutableList.builder(); } + @Override + public PCollection getPCollection() { + return pcollection; + } + @Override public InProcessBundle add(WindowedValue element) { checkState(!committed, "Can't add element %s to committed bundle %s", element, this); @@ -110,7 +115,7 @@ public String toString() { if (keyed) { toStringHelper = toStringHelper.add("key", key); } - return toStringHelper.add("elements", elements).toString(); + return toStringHelper.add("elements", committedElements).toString(); } }; } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java new file mode 100644 index 0000000000000..00a2310e6cf91 --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java @@ -0,0 +1,435 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly; +import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers; +import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TransformWatermarks; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger; +import com.google.cloud.dataflow.sdk.util.ExecutionContext; +import com.google.cloud.dataflow.sdk.util.SideInputReader; +import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.util.WindowingStrategy; +import com.google.cloud.dataflow.sdk.util.common.CounterSet; +import com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PCollectionView; +import com.google.cloud.dataflow.sdk.values.PValue; +import com.google.common.collect.ComparisonChain; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Ordering; + +import org.joda.time.Instant; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +import javax.annotation.Nullable; + +/** + * The evaluation context for the {@link InProcessPipelineRunner}. Contains state shared within + * the current evaluation. + */ +class InProcessEvaluationContext { + private final Set> allTransforms; + private final Map>> valueToConsumers; + private final Map, String> stepNames; + + private final InProcessPipelineOptions options; + private final TransformEvaluatorRegistry registry; + private final InMemoryWatermarkManager watermarkManager; + private final ConcurrentMap, PriorityQueue> + pendingCallbacks; + + // The stateInternals of the world, by applied PTransform and key. 
+ private final ConcurrentMap> + applicationStateInternals; + + private final InProcessSideInputContainer sideInputContainer; + + private final CounterSet mergedCounters; + + private final Executor callbackExecutor; + + public static InProcessEvaluationContext create(InProcessPipelineOptions options, + TransformEvaluatorRegistry evaluatorRegistry, + Collection> rootTransforms, + Map>> valueToConsumers, + Map, String> stepNames, Collection> views) { + return new InProcessEvaluationContext( + options, evaluatorRegistry, rootTransforms, valueToConsumers, stepNames, views); + } + + private InProcessEvaluationContext(InProcessPipelineOptions options, + TransformEvaluatorRegistry registry, + Collection> rootTransforms, + Map>> valueToConsumers, + Map, String> stepNames, Collection> views) { + this.options = checkNotNull(options); + this.registry = registry; + checkNotNull(rootTransforms); + checkNotNull(valueToConsumers); + checkNotNull(stepNames); + checkNotNull(views); + + this.allTransforms = + ImmutableSet.>builder() + .addAll(rootTransforms) + .addAll(Iterables.concat(valueToConsumers.values())) + .build(); + this.valueToConsumers = valueToConsumers; + this.stepNames = stepNames; + + this.pendingCallbacks = new ConcurrentHashMap<>(); + this.watermarkManager = InMemoryWatermarkManager.create( + NanosOffsetClock.create(), rootTransforms, valueToConsumers); + this.sideInputContainer = InProcessSideInputContainer.create(this, views); + + this.applicationStateInternals = new ConcurrentHashMap<>(); + this.mergedCounters = new CounterSet(); + + this.callbackExecutor = Executors.newCachedThreadPool(); + } + + /** + * Handle the provided {@link InProcessTransformResult}, produced after evaluating the provided + * {@link CommittedBundle} (potentially null, if the result of a root {@link PTransform}). + * + *
<p>
The result is the output of running the transform contained in the + * {@link InProcessTransformResult result} on the contents of the provided bundle. + * + * @param completedBundle the bundle that was processed to produce the result. Potentially + * {@code null} if a root transform + * @param completedTimers the timers that were delivered to produce the {@code completedBundle}, + * or an empty iterable if no timers were delivered + * @param result the result of evaluating the input bundle + */ + public synchronized Iterable> handleResult( + @Nullable CommittedBundle completedBundle, + Iterable completedTimers, + InProcessTransformResult result) { + Iterable> committedBundles = + commitBundles(result.getOutputBundles()); + // Update watermarks and timers + watermarkManager.updateWatermarks( + completedBundle, + result.getTransform(), + result.getTimerUpdate().withCompletedTimers(completedTimers), + committedBundles, + result.getWatermarkHold()); + watermarksUpdated(); + // Update counters + if (result.getCounters() != null) { + mergedCounters.merge(result.getCounters()); + } + // Update state internals + CopyOnAccessInMemoryStateInternals theirState = result.getState(); + if (theirState != null) { + CopyOnAccessInMemoryStateInternals committedState = theirState.commit(); + StepAndKey stepAndKey = + StepAndKey.of( + result.getTransform(), completedBundle == null ? null : completedBundle.getKey()); + if (!committedState.isEmpty()) { + applicationStateInternals.put(stepAndKey, committedState); + } else { + applicationStateInternals.remove(stepAndKey); + } + } + return committedBundles; + } + + private Iterable> commitBundles( + Iterable> bundles) { + ImmutableList.Builder> completed = ImmutableList.builder(); + for (UncommittedBundle inProgress : bundles) { + AppliedPTransform producing = + inProgress.getPCollection().getProducingTransformInternal(); + TransformWatermarks watermarks = watermarkManager.getWatermarks(producing); + CommittedBundle committed = + inProgress.commit(watermarks.getSynchronizedProcessingOutputTime()); + if (committed.getElements().iterator().hasNext()) { + // Empty bundles don't impact watermarks and shouldn't trigger downstream execution + completed.add(committed); + } + } + return completed.build(); + } + + private void watermarksUpdated() { + for ( + Map.Entry, PriorityQueue> transformCallbacks : + pendingCallbacks.entrySet()) { + checkCallbacks(transformCallbacks.getKey(), transformCallbacks.getValue()); + } + } + + private void checkCallbacks( + AppliedPTransform producingTransform, + PriorityQueue pendingTransformCallbacks) { + TransformWatermarks watermarks = watermarkManager.getWatermarks(producingTransform); + synchronized (pendingTransformCallbacks) { + while (!pendingTransformCallbacks.isEmpty() + && pendingTransformCallbacks.peek().shouldFire(watermarks.getOutputWatermark())) { + callbackExecutor.execute(pendingTransformCallbacks.poll().getCallback()); + } + } + } + + /** + * Create a {@link UncommittedBundle} for use by a source. + */ + public UncommittedBundle createRootBundle(PCollection output) { + return InProcessBundle.unkeyed(output); + } + + /** + * Create a {@link UncommittedBundle} whose elements belong to the specified {@link + * PCollection}. 
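Taken together with the UncommittedBundle and CommittedBundle interfaces in this change, a bundle moves through roughly the lifecycle sketched here (the element construction via valueInGlobalWindow is an illustrative assumption):

    UncommittedBundle<T> bundle = context.createBundle(inputBundle, outputPCollection);
    bundle.add(WindowedValue.valueInGlobalWindow(element));   // buffer elements
    CommittedBundle<T> committed = bundle.commit(Instant.now());
    for (WindowedValue<T> value : committed.getElements()) {
      // committed elements are what downstream consumers observe
    }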
+ */ + public UncommittedBundle createBundle(CommittedBundle input, PCollection output) { + if (input.isKeyed()) { + return InProcessBundle.keyed(output, input.getKey()); + } else { + return InProcessBundle.unkeyed(output); + } + } + + /** + * Create a {@link UncommittedBundle} with the specified keys at the specified step. For use by + * {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms}. + */ + public UncommittedBundle createKeyedBundle( + CommittedBundle input, Object key, PCollection output) { + return InProcessBundle.keyed(output, key); + } + + /** + * Create a {@link PCollectionViewWriter}, whose elements will be used in the provided + * {@link PCollectionView}. + */ + public PCollectionViewWriter createPCollectionViewWriter( + PCollection> input, final PCollectionView output) { + return new PCollectionViewWriter() { + @Override + public void add(Iterable> values) { + sideInputContainer.write(output, values); + } + }; + } + + /** + * Schedules a callback after the watermark for a {@link PValue} after the trigger for the + * specified window (with the specified windowing strategy) must have fired from the perspective + * of that {@link PValue}, as specified by the value of + * {@link Trigger#getWatermarkThatGuaranteesFiring(BoundedWindow)} for the trigger of the + * {@link WindowingStrategy}. + */ + public void callAfterOutputMustHaveBeenProduced( + PValue value, + BoundedWindow window, + WindowingStrategy windowingStrategy, + Runnable runnable) { + WatermarkCallback callback = + WatermarkCallback.onWindowClose(window, windowingStrategy, runnable); + AppliedPTransform producing = getProducing(value); + PriorityQueue callbacks = pendingCallbacks.get(producing); + if (callbacks == null) { + PriorityQueue newCallbacks = + new PriorityQueue<>(10, new CallbackOrdering()); + pendingCallbacks.putIfAbsent(producing, newCallbacks); + callbacks = pendingCallbacks.get(producing); + } + synchronized (callbacks) { + callbacks.offer(callback); + } + checkCallbacks(producing, callbacks); + } + + private AppliedPTransform getProducing(PValue value) { + if (value.getProducingTransformInternal() != null) { + return value.getProducingTransformInternal(); + } + return lookupProducing(value); + } + + private AppliedPTransform lookupProducing(PValue value) { + for (AppliedPTransform transform : allTransforms) { + if (transform.getOutput().equals(value) || transform.getOutput().expand().contains(value)) { + return transform; + } + } + return null; + } + + /** + * Get the options used by this {@link Pipeline}. + */ + public InProcessPipelineOptions getPipelineOptions() { + return options; + } + + /** + * Get an {@link ExecutionContext} for the provided {@link AppliedPTransform} and key. + */ + public InProcessExecutionContext getExecutionContext( + AppliedPTransform application, Object key) { + StepAndKey stepAndKey = StepAndKey.of(application, key); + return new InProcessExecutionContext(options.getClock(), key, + (CopyOnAccessInMemoryStateInternals) applicationStateInternals.get(stepAndKey), + watermarkManager.getWatermarks(application)); + } + + /** + * Get all of the steps used in this {@link Pipeline}. + */ + public Collection> getSteps() { + return allTransforms; + } + + /** + * Get the Step Name for the provided application. + */ + public String getStepName(AppliedPTransform application) { + return stepNames.get(application); + } + + /** + * Returns a {@link SideInputReader} capable of reading the provided + * {@link PCollectionView PCollectionViews}. 
+ * @param sideInputs the {@link PCollectionView PCollectionViews} the result should be able to + * read + * @return a {@link SideInputReader} that can read all of the provided + * {@link PCollectionView PCollectionViews} + */ + public SideInputReader createSideInputReader(final List> sideInputs) { + return sideInputContainer.withViews(sideInputs); + } + + /** + * Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent + * of all other {@link CounterSet CounterSets} created by this call. + * + * The {@link InProcessEvaluationContext} is responsible for unifying the counters present in + * all created {@link CounterSet CounterSets} when the transforms that call this method + * complete. + */ + public CounterSet createCounterSet() { + return new CounterSet(); + } + + /** + * Returns all of the counters that have been merged into this context via calls to + * {@link CounterSet#merge(CounterSet)}. + */ + public CounterSet getCounters() { + return mergedCounters; + } + + /** + * Gets all of the {@link AppliedPTransform transform applications} that consume the provided + * {@link PValue} in this {@link Pipeline}. + * + *
<p>
Each returned {@link AppliedPTransform} will contain the provide {@link PValue} in its + * expanded output (i.e., the result of + * {@code application.getInput().expand().contains(pvalue) == true} for each returned value.) + */ + public Collection> getConsumers(PValue pvalue) { + return valueToConsumers.get(pvalue); + } + + /** + * Gets a transform evaluator capable of evaluating the provided {@link AppliedPTransform} on the + * provided {@link CommittedBundle Bundle}. + * + * @throws Exception if constructing the evaluator throws an exception + */ + public TransformEvaluator getTransformEvaluator( + AppliedPTransform transform, CommittedBundle inputBundle) throws Exception { + return registry.forApplication(transform, inputBundle, this); + } + + private static class WatermarkCallback { + public static WatermarkCallback onWindowClose( + BoundedWindow window, WindowingStrategy strategy, Runnable callback) { + @SuppressWarnings("unchecked") + Instant firingAfter = + strategy.getTrigger().getSpec().getWatermarkThatGuaranteesFiring((W) window); + return new WatermarkCallback(firingAfter, callback); + } + + private final Instant fireAfter; + private final Runnable callback; + + private WatermarkCallback(Instant fireAfter, Runnable callback) { + this.fireAfter = fireAfter; + this.callback = callback; + } + + public boolean shouldFire(Instant currentWatermark) { + return currentWatermark.isAfter(fireAfter) + || currentWatermark.equals(BoundedWindow.TIMESTAMP_MAX_VALUE); + } + + public Runnable getCallback() { + return callback; + } + } + + private static class CallbackOrdering extends Ordering { + @Override + public int compare(WatermarkCallback left, WatermarkCallback right) { + return ComparisonChain.start() + .compare(left.fireAfter, right.fireAfter) + .compare(left.callback, right.callback, Ordering.arbitrary()) + .result(); + } + } + + /** + * Extracts all timers that have been fired and have not already been extracted. + * + *
<p>
This is a destructive operation. Timers will only appear in the result of this method once. + */ + public Map, Map> extractFiredTimers() { + return watermarkManager.extractFiredTimers(); + } + + /** + * Returns true if all steps are done. + */ + public boolean isDone() { + return watermarkManager.isDone(); + } +} diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java index d659d962f0e5b..1825da0b79edc 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java @@ -15,10 +15,45 @@ */ package com.google.cloud.dataflow.sdk.runners.inprocess; +import com.google.cloud.dataflow.sdk.options.ApplicationNameOptions; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.Validation.Required; + +import com.fasterxml.jackson.annotation.JsonIgnore; + +import java.util.concurrent.ExecutorService; /** * Options that can be used to configure the {@link InProcessPipelineRunner}. */ -public interface InProcessPipelineOptions extends PipelineOptions {} +public interface InProcessPipelineOptions extends PipelineOptions, ApplicationNameOptions { + @JsonIgnore + @Default.InstanceFactory(CachedThreadPoolExecutorServiceFactory.class) + ExecutorService getExecutorService(); + + void setExecutorService(ExecutorService executorService); + + /** + * Gets the {@link Clock} used by this pipeline. The clock is used in place of accessing the + * system time when time values are required by the evaluator. + */ + @Default.InstanceFactory(NanosOffsetClock.Factory.class) + @Required + @Description( + "The processing time source used by the pipeline. When the current time is " + + "needed by the evaluator, the result of clock#now() is used.") + Clock getClock(); + + void setClock(Clock clock); + + @Default.Boolean(false) + @Description("If the pipelien should block awaiting completion of the pipeline. If set to true, " + + "a call to Pipeline#run() will block until all PTransforms are complete. Otherwise, the " + + "Pipeline will execute asynchronously. If set to false, the completion of the pipeline can " + + "be awaited on by use of InProcessPipelineResult#awaitCompletion().") + boolean isBlockOnRun(); + void setBlockOnRun(boolean b); +} diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java index 124de46b94769..ae1baecfd92be 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2015 Google Inc. + * Copyright (C) 2016 Google Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. 
You may obtain a copy of @@ -15,35 +15,44 @@ */ package com.google.cloud.dataflow.sdk.runners.inprocess; -import static com.google.common.base.Preconditions.checkArgument; - import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.Pipeline.PipelineExecutionException; +import com.google.cloud.dataflow.sdk.PipelineResult; import com.google.cloud.dataflow.sdk.annotations.Experimental; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.runners.AggregatorPipelineExtractor; +import com.google.cloud.dataflow.sdk.runners.AggregatorRetrievalException; +import com.google.cloud.dataflow.sdk.runners.AggregatorValues; +import com.google.cloud.dataflow.sdk.runners.PipelineRunner; import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKey; -import com.google.cloud.dataflow.sdk.runners.inprocess.ViewEvaluatorFactory.InProcessCreatePCollectionView; +import com.google.cloud.dataflow.sdk.transforms.Aggregator; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.GroupByKey; -import com.google.cloud.dataflow.sdk.transforms.GroupByKey.GroupByKeyOnly; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger; -import com.google.cloud.dataflow.sdk.util.ExecutionContext; -import com.google.cloud.dataflow.sdk.util.SideInputReader; +import com.google.cloud.dataflow.sdk.util.InstanceBuilder; +import com.google.cloud.dataflow.sdk.util.MapAggregatorValues; import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData; +import com.google.cloud.dataflow.sdk.util.UserCodeException; import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.util.WindowingStrategy; +import com.google.cloud.dataflow.sdk.util.common.Counter; import com.google.cloud.dataflow.sdk.util.common.CounterSet; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PCollectionView; +import com.google.cloud.dataflow.sdk.values.PInput; +import com.google.cloud.dataflow.sdk.values.POutput; import com.google.cloud.dataflow.sdk.values.PValue; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import org.joda.time.Instant; -import java.util.List; +import java.util.Collection; +import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import javax.annotation.Nullable; @@ -52,28 +61,19 @@ * {@link PCollection PCollections}. 
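From a user's perspective, the runner added in this file would be selected and run roughly as sketched below; the options calls come from InProcessPipelineOptions above, while Pipeline.create and apply are assumed standard SDK entry points:

    InProcessPipelineOptions options = PipelineOptionsFactory.as(InProcessPipelineOptions.class);
    options.setBlockOnRun(true);   // make run() block until the pipeline completes
    Pipeline p = Pipeline.create(options);
    // ... p.apply(...) transforms ...
    InProcessPipelineRunner.fromOptions(options).run(p);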
*/ @Experimental -public class InProcessPipelineRunner { - @SuppressWarnings({"rawtypes", "unused"}) +public class InProcessPipelineRunner + extends PipelineRunner { + @SuppressWarnings("rawtypes") private static Map, Class> defaultTransformOverrides = ImmutableMap., Class>builder() + .put(Create.Values.class, InProcessCreate.class) .put(GroupByKey.class, InProcessGroupByKey.class) - .put(CreatePCollectionView.class, InProcessCreatePCollectionView.class) + .put( + CreatePCollectionView.class, + ViewEvaluatorFactory.InProcessCreatePCollectionView.class) .build(); - private static Map, TransformEvaluatorFactory> defaultEvaluatorFactories = - new ConcurrentHashMap<>(); - - /** - * Register a default transform evaluator. - */ - public static > void registerTransformEvaluatorFactory( - Class clazz, TransformEvaluatorFactory evaluator) { - checkArgument(defaultEvaluatorFactories.put(clazz, evaluator) == null, - "Defining a default factory %s to evaluate Transforms of type %s multiple times", evaluator, - clazz); - } - /** * Part of a {@link PCollection}. Elements are output to a bundle, which will cause them to be * executed by {@link PTransform PTransforms} that consume the {@link PCollection} this bundle is @@ -82,9 +82,16 @@ public class InProcessPipelineRunner { * @param the type of elements that can be added to this bundle */ public static interface UncommittedBundle { + /** + * Returns the PCollection that the elements of this {@link UncommittedBundle} belong to. + */ + PCollection getPCollection(); + /** * Outputs an element to this bundle. * + * The bundle implementation is responsible for properly propagating + * * @param element the element to add to this bundle * @return this bundle */ @@ -108,9 +115,8 @@ public static interface UncommittedBundle { * @param the type of elements contained within this bundle */ public static interface CommittedBundle { - /** - * @return the PCollection that the elements of this bundle belong to + * Returns the PCollection that the elements of this bundle belong to. */ PCollection getPCollection(); @@ -127,8 +133,8 @@ public static interface CommittedBundle { @Nullable Object getKey(); /** - * @return an {@link Iterable} containing all of the elements that have been added to this - * {@link CommittedBundle} + * Returns an {@link Iterable} containing all of the elements that have been added to this + * {@link CommittedBundle}. */ Iterable> getElements(); @@ -154,84 +160,157 @@ public static interface PCollectionViewWriter { void add(Iterable> values); } + //////////////////////////////////////////////////////////////////////////////////////////////// + private final InProcessPipelineOptions options; + + public static InProcessPipelineRunner createForTest() { + InProcessPipelineOptions options = PipelineOptionsFactory.as(InProcessPipelineOptions.class); + options.setBlockOnRun(true); + return new InProcessPipelineRunner(options); + } + + public static InProcessPipelineRunner fromOptions(PipelineOptions options) { + return new InProcessPipelineRunner(options.as(InProcessPipelineOptions.class)); + } + + private InProcessPipelineRunner(InProcessPipelineOptions options) { + this.options = options; + } + /** - * The evaluation context for the {@link InProcessPipelineRunner}. Contains state shared within - * the current evaluation. + * Returns the {@link PipelineOptions} used to create this {@link InProcessPipelineRunner}. */ - public static interface InProcessEvaluationContext { - /** - * Create a {@link UncommittedBundle} for use by a source. 
- */ - UncommittedBundle createRootBundle(PCollection output); + public InProcessPipelineOptions getPipelineOptions() { + return options; + } - /** - * Create a {@link UncommittedBundle} whose elements belong to the specified {@link - * PCollection}. - */ - UncommittedBundle createBundle(CommittedBundle input, PCollection output); + @Override + public OutputT apply( + PTransform transform, InputT input) { + Class overrideClass = defaultTransformOverrides.get(transform.getClass()); + if (overrideClass != null) { + transform.validate(input); + // It is the responsibility of whoever constructs overrides to ensure this is type safe. + @SuppressWarnings("unchecked") + Class> transformClass = + (Class>) transform.getClass(); - /** - * Create a {@link UncommittedBundle} with the specified keys at the specified step. For use by - * {@link GroupByKeyOnly} {@link PTransform PTransforms}. - */ - UncommittedBundle createKeyedBundle( - CommittedBundle input, Object key, PCollection output); + @SuppressWarnings("unchecked") + Class> customTransformClass = + (Class>) overrideClass; - /** - * Create a bundle whose elements will be used in a PCollectionView. - */ - PCollectionViewWriter createPCollectionViewWriter( - PCollection> input, PCollectionView output); + PTransform customTransform = + InstanceBuilder.ofType(customTransformClass) + .withArg(transformClass, transform) + .build(); - /** - * Get the options used by this {@link Pipeline}. - */ - InProcessPipelineOptions getPipelineOptions(); + // This overrides the contents of the apply method without changing the TransformTreeNode that + // is generated by the PCollection application. + return super.apply(customTransform, input); + } else { + return super.apply(transform, input); + } + } - /** - * Get an {@link ExecutionContext} for the provided application. - */ - InProcessExecutionContext getExecutionContext( - AppliedPTransform application, @Nullable Object key); + @Override + public InProcessPipelineResult run(Pipeline pipeline) { + ConsumerTrackingPipelineVisitor visitor = new ConsumerTrackingPipelineVisitor(); + pipeline.traverseTopologically(visitor); + for (PValue unfinalized : visitor.getUnfinalizedPValues()) { + unfinalized.finishSpecifying(); + } - /** - * Get the Step Name for the provided application. - */ - String getStepName(AppliedPTransform application); + InProcessEvaluationContext context = + InProcessEvaluationContext.create( + getPipelineOptions(), + TransformEvaluatorRegistry.defaultRegistry(), + visitor.getRootTransforms(), + visitor.getValueToConsumers(), + visitor.getStepNames(), + visitor.getViews()); - /** - * @param sideInputs the {@link PCollectionView PCollectionViews} the result should be able to - * read - * @return a {@link SideInputReader} that can read all of the provided - * {@link PCollectionView PCollectionViews} - */ - SideInputReader createSideInputReader(List> sideInputs); + ExecutorService executorService = context.getPipelineOptions().getExecutorService(); + InProcessExecutor executor = + ExecutorServiceParallelExecutor.create(executorService, context); + executor.start(visitor.getRootTransforms()); - /** - * Schedules a callback after the watermark for a {@link PValue} after the trigger for the - * specified window (with the specified windowing strategy) must have fired from the perspective - * of that {@link PValue}, as specified by the value of - * {@link Trigger#getWatermarkThatGuaranteesFiring(BoundedWindow)} for the trigger of the - * {@link WindowingStrategy}. 
- */ - void callAfterOutputMustHaveBeenProduced(PValue value, BoundedWindow window, - WindowingStrategy windowingStrategy, Runnable runnable); + Map, Collection>> aggregatorSteps = + new AggregatorPipelineExtractor(pipeline).getAggregatorSteps(); + InProcessPipelineResult result = + new InProcessPipelineResult(executor, context, aggregatorSteps); + if (options.isBlockOnRun()) { + try { + result.awaitCompletion(); + } catch (UserCodeException userException) { + throw new PipelineExecutionException(userException.getCause()); + } catch (Throwable t) { + Throwables.propagate(t); + } + } + return result; + } - /** - * Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent - * of all other {@link CounterSet CounterSets} created by this call. - * - * The {@link InProcessEvaluationContext} is responsible for unifying the counters present in - * all created {@link CounterSet CounterSets} when the transforms that call this method - * complete. - */ - CounterSet createCounterSet(); + /** + * The result of running a {@link Pipeline} with the {@link InProcessPipelineRunner}. + * + * Throws {@link UnsupportedOperationException} for all methods. + */ + public static class InProcessPipelineResult implements PipelineResult { + private final InProcessExecutor executor; + private final InProcessEvaluationContext evaluationContext; + private final Map, Collection>> aggregatorSteps; + private State state; - /** - * Returns all of the counters that have been merged into this context via calls to - * {@link CounterSet#merge(CounterSet)}. - */ - CounterSet getCounters(); + private InProcessPipelineResult( + InProcessExecutor executor, + InProcessEvaluationContext evaluationContext, + Map, Collection>> aggregatorSteps) { + this.executor = executor; + this.evaluationContext = evaluationContext; + this.aggregatorSteps = aggregatorSteps; + // Only ever constructed after the executor has started. + this.state = State.RUNNING; + } + + @Override + public State getState() { + return state; + } + + @Override + public AggregatorValues getAggregatorValues(Aggregator aggregator) + throws AggregatorRetrievalException { + CounterSet counters = evaluationContext.getCounters(); + Collection> steps = aggregatorSteps.get(aggregator); + Map stepValues = new HashMap<>(); + for (AppliedPTransform transform : evaluationContext.getSteps()) { + if (steps.contains(transform.getTransform())) { + String stepName = + String.format( + "user-%s-%s", evaluationContext.getStepName(transform), aggregator.getName()); + Counter counter = (Counter) counters.getExistingCounter(stepName); + if (counter == null) { + throw new IllegalArgumentException( + "Aggregator " + aggregator + " is not used in this pipeline"); + } + stepValues.put(transform.getFullName(), counter.getAggregate()); + } + } + return new MapAggregatorValues<>(stepValues); + } + + public State awaitCompletion() throws Throwable { + if (!state.isTerminal()) { + try { + executor.awaitCompletion(); + state = State.DONE; + } catch (Throwable t) { + state = State.FAILED; + throw t; + } + } + return state; + } } /** @@ -240,21 +319,21 @@ void callAfterOutputMustHaveBeenProduced(PValue value, BoundedWindow window, */ public static interface InProcessExecutor { /** - * @param root the root {@link AppliedPTransform} to schedule - */ - void scheduleRoot(AppliedPTransform root); - - /** - * @param consumer the {@link AppliedPTransform} to schedule - * @param bundle the input bundle to the consumer + * Starts this executor. 
The provided collection is the collection of root transforms to + * initially schedule. + * + * @param rootTransforms */ - void scheduleConsumption(AppliedPTransform consumer, CommittedBundle bundle); + void start(Collection> rootTransforms); /** * Blocks until the job being executed enters a terminal state. A job is completed after all * root {@link AppliedPTransform AppliedPTransforms} have completed, and all * {@link CommittedBundle Bundles} have been consumed. Jobs may also terminate abnormally. + * + * @throws Throwable whenever an executor thread throws anything, transfers the throwable to the + * waiting thread and rethrows it */ - void awaitCompletion(); + void awaitCompletion() throws Throwable; } } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java index bf9a2e1c53fed..0e5f2cdf73539 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java @@ -17,7 +17,6 @@ import static com.google.common.base.Preconditions.checkArgument; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; import com.google.cloud.dataflow.sdk.util.PCollectionViewWindow; @@ -26,6 +25,7 @@ import com.google.cloud.dataflow.sdk.util.WindowingStrategy; import com.google.cloud.dataflow.sdk.values.PCollectionView; import com.google.common.base.MoreObjects; +import com.google.common.base.Throwables; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -108,8 +108,7 @@ public SideInputReader withViews(Collection> newContainedView * *

The provided iterable is expected to contain only a single window and pane. */ - public void write(PCollectionView view, Iterable> values) - throws ExecutionException { + public void write(PCollectionView view, Iterable> values) { Map>> valuesPerWindow = new HashMap<>(); for (WindowedValue value : values) { for (BoundedWindow window : value.getWindows()) { @@ -124,9 +123,10 @@ public void write(PCollectionView view, Iterable> for (Map.Entry>> windowValues : valuesPerWindow.entrySet()) { PCollectionViewWindow windowedView = PCollectionViewWindow.of(view, windowValues.getKey()); - SettableFuture>> future = viewByWindows.get(windowedView); - if (future.isDone()) { - try { + SettableFuture>> future = null; + try { + future = viewByWindows.get(windowedView); + if (future.isDone()) { Iterator> existingValues = future.get().iterator(); PaneInfo newPane = windowValues.getValue().iterator().next().getPane(); // The current value may have no elements, if no elements were produced for the window, @@ -136,13 +136,16 @@ public void write(PCollectionView view, Iterable> viewByWindows.invalidate(windowedView); viewByWindows.get(windowedView).set(windowValues.getValue()); } - } catch (InterruptedException e) { - // TODO: Handle meaningfully. This should never really happen when the result remains - // useful, but the result could be available and the thread can still be interrupted. - Thread.currentThread().interrupt(); + } else { + future.set(windowValues.getValue()); } - } else { - future.set(windowValues.getValue()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + if (future != null && !future.isDone()) { + future.set(Collections.>emptyList()); + } + } catch (ExecutionException e) { + Throwables.propagate(e.getCause()); } } } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java index e3ae1a028c168..24142c2151c91 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java @@ -17,7 +17,6 @@ import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.runners.inprocess.ParDoInProcessEvaluator.BundleOutputManager; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java index cd79c219bd678..af5914bab051e 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java @@ -17,7 +17,6 @@ import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.runners.inprocess.ParDoInProcessEvaluator.BundleOutputManager; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/StepAndKey.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/StepAndKey.java new file mode 100644 index 0000000000000..d3c53dedeeffa --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/StepAndKey.java @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.common.base.MoreObjects; + +import java.util.Objects; + +/** + * A (Step, Key) pair. This is useful as a map key or cache key for things that are available + * per-step in a keyed manner (e.g. State). + */ +final class StepAndKey { + private final AppliedPTransform step; + private final Object key; + + /** + * Create a new {@link StepAndKey} with the provided step and key. 
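StepAndKey exists so per-step, per-key bookkeeping (for example keyed state, or the serial execution queue introduced later in this change) can sit behind a single map lookup. A minimal sketch of the same composite-key idea, with value-based equals and hashCode, follows; StepKeySketch is an illustrative stand-in rather than the SDK class.

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Standalone sketch of a (step, key) composite map key: equals and hashCode are defined
// over both components, so lookups hit the same entry for the same step-and-key pair.
public final class StepKeySketch {
  private final String stepName;
  private final Object key;

  StepKeySketch(String stepName, Object key) {
    this.stepName = stepName;
    this.key = key;
  }

  @Override
  public boolean equals(Object other) {
    if (!(other instanceof StepKeySketch)) {
      return false;
    }
    StepKeySketch that = (StepKeySketch) other;
    return Objects.equals(stepName, that.stepName) && Objects.equals(key, that.key);
  }

  @Override
  public int hashCode() {
    return Objects.hash(stepName, key);
  }

  public static void main(String[] args) {
    Map<StepKeySketch, String> perStepAndKey = new HashMap<>();
    perStepAndKey.put(new StepKeySketch("s2", "user-a"), "state for s2/user-a");
    // A structurally equal key retrieves the same entry.
    System.out.println(perStepAndKey.get(new StepKeySketch("s2", "user-a")));
  }
}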
+ */ + public static StepAndKey of(AppliedPTransform step, Object key) { + return new StepAndKey(step, key); + } + + private StepAndKey(AppliedPTransform step, Object key) { + this.step = step; + this.key = key; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(StepAndKey.class) + .add("step", step.getFullName()) + .add("key", key) + .toString(); + } + + @Override + public int hashCode() { + return Objects.hash(step, key); + } + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } else if (!(other instanceof StepAndKey)) { + return false; + } else { + StepAndKey that = (StepAndKey) other; + return Objects.equals(this.step, that.step) && Objects.equals(this.key, that.key); + } + } +} + diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorFactory.java index 3b672e0def5e1..860ddfe48f164 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorFactory.java @@ -16,7 +16,6 @@ package com.google.cloud.dataflow.sdk.runners.inprocess; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.DoFn; import com.google.cloud.dataflow.sdk.transforms.PTransform; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java new file mode 100644 index 0000000000000..704efd74fee40 --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java @@ -0,0 +1,53 @@ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.cloud.dataflow.sdk.io.Read; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.Flatten.FlattenPCollectionList; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.common.collect.ImmutableMap; + +import java.util.Map; + +import javax.annotation.Nullable; + +class TransformEvaluatorRegistry implements TransformEvaluatorFactory { + public static TransformEvaluatorRegistry defaultRegistry() { + @SuppressWarnings("rawtypes") + ImmutableMap, TransformEvaluatorFactory> primitives = + ImmutableMap., TransformEvaluatorFactory>builder() + .put(Read.Bounded.class, new BoundedReadEvaluatorFactory()) + .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory()) + .put(ParDo.Bound.class, new ParDoSingleEvaluatorFactory()) + .put(ParDo.BoundMulti.class, new ParDoMultiEvaluatorFactory()) + .put( + GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly.class, + new GroupByKeyEvaluatorFactory()) + .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory()) + .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory()) + .build(); + return new TransformEvaluatorRegistry(primitives); + } + + @SuppressWarnings("rawtypes") + 
private final Map, TransformEvaluatorFactory> factories; + + private TransformEvaluatorRegistry( + @SuppressWarnings("rawtypes") + Map, TransformEvaluatorFactory> factories) { + this.factories = factories; + } + + @Override + public TransformEvaluator forApplication( + AppliedPTransform application, + @Nullable CommittedBundle inputBundle, + InProcessEvaluationContext evaluationContext) + throws Exception { + @SuppressWarnings("rawtypes") + Class applicationTransformClass = application.getTransform().getClass(); + TransformEvaluatorFactory factory = factories.get(applicationTransformClass); + return factory.forApplication(application, inputBundle, evaluationContext); + } +} diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutor.java new file mode 100644 index 0000000000000..f9d7965eaa440 --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutor.java @@ -0,0 +1,96 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.common.base.Throwables; + +import java.util.concurrent.Callable; + +import javax.annotation.Nullable; + +class TransformExecutor implements Callable { + public static TransformExecutor create( + InProcessEvaluationContext evaluationContext, + CommittedBundle inputBundle, + AppliedPTransform transform, + CompletionCallback completionCallback, + TransformExecutorService transformEvaluationState) { + return new TransformExecutor<>( + evaluationContext, inputBundle, transform, completionCallback, transformEvaluationState); + } + + private final InProcessEvaluationContext evaluationContext; + + private final CommittedBundle inputBundle; + private final AppliedPTransform transform; + + private final CompletionCallback onComplete; + + private final TransformExecutorService transformEvaluationState; + + private Thread thread; + + private TransformExecutor( + InProcessEvaluationContext evaluationContext, + CommittedBundle inputBundle, + AppliedPTransform transform, + CompletionCallback completionCallback, + TransformExecutorService transformEvaluationState) { + this.evaluationContext = evaluationContext; + + this.inputBundle = inputBundle; + this.transform = transform; + + this.onComplete = completionCallback; + + this.transformEvaluationState = transformEvaluationState; + } + + @Override + public InProcessTransformResult call() { + this.thread = Thread.currentThread(); + try { + TransformEvaluator evaluator = + evaluationContext.getTransformEvaluator(transform, inputBundle); + if (inputBundle != null) { + for (WindowedValue value : inputBundle.getElements()) { + 
evaluator.processElement(value); + } + } + InProcessTransformResult result = evaluator.finishBundle(); + onComplete.handleResult(inputBundle, result); + return result; + } catch (Throwable t) { + onComplete.handleThrowable(inputBundle, t); + throw Throwables.propagate(t); + } finally { + this.thread = null; + transformEvaluationState.complete(this); + } + } + + /** + * If this {@link TransformExecutor} is currently executing, return the thread it is executing in. + * Otherwise, return null. + */ + @Nullable + public Thread getThread() { + return this.thread; + } +} diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorService.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorService.java new file mode 100644 index 0000000000000..3f00da6ebe517 --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorService.java @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +/** + * Schedules and completes {@link TransformExecutor TransformExecutors}, controlling concurrency as + * appropriate for the {@link StepAndKey} the executor exists for. + */ +interface TransformExecutorService { + /** + * Schedule the provided work to be eventually executed. + */ + void schedule(TransformExecutor work); + + /** + * Finish executing the provided work. This may cause additional + * {@link TransformExecutor TransformExecutors} to be evaluated. + */ + void complete(TransformExecutor completed); +} + diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServices.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServices.java new file mode 100644 index 0000000000000..f2d55ff9f50e6 --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServices.java @@ -0,0 +1,147 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.common.base.MoreObjects; + +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Static factory methods for constructing instances of {@link TransformExecutorService}. + */ +final class TransformExecutorServices { + private TransformExecutorServices() { + // Do not instantiate + } + + /** + * Returns an EvaluationState that evaluates {@link TransformExecutor TransformExecutors} in + * parallel. + */ + public static TransformExecutorService parallel( + ExecutorService executor, Map, Boolean> scheduled) { + return new ParallelEvaluationState(executor, scheduled); + } + + /** + * Returns an EvaluationState that evaluates {@link TransformExecutor TransformExecutors} in + * serial. + */ + public static TransformExecutorService serial( + ExecutorService executor, Map, Boolean> scheduled) { + return new SerialEvaluationState(executor, scheduled); + } + + /** + * The evaluation of a step where the input is unkeyed. Unkeyed inputs can be evaluated in + * parallel with an arbitrary amount of parallelism. + */ + private static class ParallelEvaluationState implements TransformExecutorService { + private final ExecutorService executor; + private final Map, Boolean> scheduled; + + private ParallelEvaluationState( + ExecutorService executor, Map, Boolean> scheduled) { + this.executor = executor; + this.scheduled = scheduled; + } + + @Override + public void schedule(TransformExecutor work) { + executor.submit(work); + scheduled.put(work, true); + } + + @Override + public void complete(TransformExecutor completed) { + scheduled.remove(completed); + } + } + + /** + * The evaluation of a (Step, Key) pair. (Step, Key) computations are processed serially; + * scheduling a computation will add it to the Work Queue if a computation is in progress, and + * completing a computation will schedule the next item in the work queue, if it exists. + */ + private static class SerialEvaluationState implements TransformExecutorService { + private final ExecutorService executor; + private final Map, Boolean> scheduled; + + private AtomicReference> currentlyEvaluating; + private final Queue> workQueue; + + private SerialEvaluationState( + ExecutorService executor, Map, Boolean> scheduled) { + this.scheduled = scheduled; + this.executor = executor; + this.currentlyEvaluating = new AtomicReference<>(); + this.workQueue = new ConcurrentLinkedQueue<>(); + } + + /** + * Schedules the work, adding it to the work queue if there is a bundle currently being + * evaluated and scheduling it immediately otherwise. 
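Serial evaluation admits one executor per (step, key) at a time: schedule() enqueues work, complete() clears the in-flight slot, and the next queued item is only submitted if it wins the compareAndSet from null. The sketch below reproduces that queue-plus-CAS shape over plain Runnables as a simplified illustration, not the SDK implementation; it also omits the shared scheduled map the real classes maintain.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Standalone sketch: at most one task runs at a time. schedule() enqueues, complete()
// clears the in-flight slot, and update() promotes the next queued task only if it wins
// the compareAndSet from null, so a task is never submitted twice.
public class SerialSchedulerSketch {
  private final ExecutorService executor;
  private final Queue<Runnable> workQueue = new ConcurrentLinkedQueue<>();
  private final AtomicReference<Runnable> currentlyEvaluating = new AtomicReference<>();

  SerialSchedulerSketch(ExecutorService executor) {
    this.executor = executor;
  }

  public void schedule(Runnable work) {
    workQueue.offer(work);
    update();
  }

  public void complete(Runnable completed) {
    if (!currentlyEvaluating.compareAndSet(completed, null)) {
      throw new IllegalStateException("completed work that was not in flight: " + completed);
    }
    update();
  }

  private void update() {
    if (currentlyEvaluating.get() == null) {
      synchronized (this) {
        Runnable next = workQueue.poll();
        if (next != null) {
          if (currentlyEvaluating.compareAndSet(null, next)) {
            executor.submit(next);
          } else {
            workQueue.offer(next); // lost the race; retried on the next completion
          }
        }
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    final SerialSchedulerSketch scheduler = new SerialSchedulerSketch(pool);
    for (int i = 0; i < 3; i++) {
      final int n = i;
      scheduler.schedule(new Runnable() {
        @Override
        public void run() {
          System.out.println("task " + n);
          scheduler.complete(this); // in this sketch the work signals its own completion
        }
      });
    }
    Thread.sleep(200); // crude wait for the demo only; real code tracks completion explicitly
    pool.shutdown();
  }
}

The parallel variant needs none of this machinery: it submits immediately and only tracks what is outstanding, which matches the much shorter ParallelEvaluationState above.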
+ */ + @Override + public void schedule(TransformExecutor work) { + workQueue.offer(work); + updateCurrentlyEvaluating(); + } + + @Override + public void complete(TransformExecutor completed) { + if (!currentlyEvaluating.compareAndSet(completed, null)) { + throw new IllegalStateException( + "Finished work " + + completed + + " but could not complete due to unexpected currently executing " + + currentlyEvaluating.get()); + } + scheduled.remove(completed); + updateCurrentlyEvaluating(); + } + + private void updateCurrentlyEvaluating() { + if (currentlyEvaluating.get() == null) { + // Only synchronize if we need to update what's currently evaluating + synchronized (this) { + TransformExecutor newWork = workQueue.poll(); + if (newWork != null) { + if (currentlyEvaluating.compareAndSet(null, newWork)) { + scheduled.put(newWork, true); + executor.submit(newWork); + } else { + workQueue.offer(newWork); + } + } + } + } + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(SerialEvaluationState.class) + .add("currentlyEvaluating", currentlyEvaluating) + .add("workQueue", workQueue) + .toString(); + } + } +} + diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java index 4beac337d6049..97f0e25d38b4b 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java @@ -21,7 +21,6 @@ import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader; import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.PTransform; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java index f47cd1de986b0..314d81f6aafcd 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java @@ -17,7 +17,6 @@ import com.google.cloud.dataflow.sdk.coders.KvCoder; import com.google.cloud.dataflow.sdk.coders.VoidCoder; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.GroupByKey; diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java index 9f22fbbe9e519..43955149ee032 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java +++ 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java @@ -25,7 +25,7 @@ import com.google.cloud.dataflow.sdk.io.BoundedSource; import com.google.cloud.dataflow.sdk.io.CountingSource; import com.google.cloud.dataflow.sdk.io.Read; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; +import com.google.cloud.dataflow.sdk.io.Read.Bounded; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java index bf25970affc15..0120b9880d654 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java @@ -22,7 +22,6 @@ import static org.mockito.Mockito.when; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java index 5c9e824afe417..4ced82f8c77d6 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java @@ -23,7 +23,6 @@ import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.coders.KvCoder; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java index 24251522d8248..52398cf73a0bd 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java @@ -1047,6 +1047,18 @@ public void timerUpdateBuilderWithCompletedAfterBuildNotAddedToBuilt() { assertThat(built.getCompletedTimers(), emptyIterable()); } + @Test + public void timerUpdateWithCompletedTimersNotAddedToExisting() { + TimerUpdateBuilder builder = TimerUpdate.builder(null); + TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME); + + TimerUpdate built = builder.build(); + 
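The new test pins down that withCompletedTimers produces a fresh TimerUpdate and leaves the already-built value empty. A small standalone sketch of that copy-on-"with" style for an immutable value follows; ImmutableUpdateSketch and its method names are illustrative only.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Standalone sketch of the copy-on-"with" pattern the test exercises: withCompleted(...)
// builds a new value rather than mutating the receiver, so the original stays empty.
public final class ImmutableUpdateSketch {
  private final List<String> completed;

  private ImmutableUpdateSketch(List<String> completed) {
    this.completed = Collections.unmodifiableList(new ArrayList<>(completed));
  }

  public static ImmutableUpdateSketch empty() {
    return new ImmutableUpdateSketch(Collections.<String>emptyList());
  }

  /** Returns a new value with the extra completed entries; the receiver is unchanged. */
  public ImmutableUpdateSketch withCompleted(List<String> more) {
    List<String> merged = new ArrayList<>(completed);
    merged.addAll(more);
    return new ImmutableUpdateSketch(merged);
  }

  public List<String> getCompleted() {
    return completed;
  }

  public static void main(String[] args) {
    ImmutableUpdateSketch built = ImmutableUpdateSketch.empty();
    ImmutableUpdateSketch updated = built.withCompleted(Collections.singletonList("timer-1"));
    System.out.println(built.getCompleted());   // [] -- original untouched
    System.out.println(updated.getCompleted()); // [timer-1]
  }
}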
assertThat(built.getCompletedTimers(), emptyIterable()); + assertThat( + built.withCompletedTimers(ImmutableList.of(timer)).getCompletedTimers(), contains(timer)); + assertThat(built.getCompletedTimers(), emptyIterable()); + } + private static Matcher earlierThan(final Instant laterInstant) { return new BaseMatcher() { @Override diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java new file mode 100644 index 0000000000000..6ff378fcfaecc --- /dev/null +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java @@ -0,0 +1,407 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.emptyIterable; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThat; + +import com.google.cloud.dataflow.sdk.coders.VarIntCoder; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers; +import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.View; +import com.google.cloud.dataflow.sdk.transforms.WithKeys; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; +import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo.Timing; +import com.google.cloud.dataflow.sdk.util.SideInputReader; +import com.google.cloud.dataflow.sdk.util.TimeDomain; +import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.util.WindowingStrategy; +import com.google.cloud.dataflow.sdk.util.common.Counter; +import com.google.cloud.dataflow.sdk.util.common.Counter.AggregationKind; +import com.google.cloud.dataflow.sdk.util.common.CounterSet; +import com.google.cloud.dataflow.sdk.util.state.BagState; +import com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals; +import 
com.google.cloud.dataflow.sdk.util.state.StateNamespaces; +import com.google.cloud.dataflow.sdk.util.state.StateTag; +import com.google.cloud.dataflow.sdk.util.state.StateTags; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded; +import com.google.cloud.dataflow.sdk.values.PCollectionView; +import com.google.cloud.dataflow.sdk.values.PValue; +import com.google.common.collect.ImmutableList; + +import org.hamcrest.Matchers; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Tests for {@link InProcessEvaluationContext}. + */ +@RunWith(JUnit4.class) +public class InProcessEvaluationContextTest { + private TestPipeline p; + private InProcessEvaluationContext context; + private PCollection created; + private PCollection> downstream; + private PCollectionView> view; + + @Before + public void setup() { + InProcessPipelineRunner runner = + InProcessPipelineRunner.fromOptions(PipelineOptionsFactory.create()); + p = TestPipeline.create(); + created = p.apply(Create.of(1, 2, 3)); + downstream = created.apply(WithKeys.of("foo")); + view = created.apply(View.asIterable()); + Collection> rootTransforms = + ImmutableList.>of(created.getProducingTransformInternal()); + Map>> valueToConsumers = new HashMap<>(); + valueToConsumers.put( + created, + ImmutableList.>of( + downstream.getProducingTransformInternal(), view.getProducingTransformInternal())); + valueToConsumers.put(downstream, ImmutableList.>of()); + valueToConsumers.put(view, ImmutableList.>of()); + + Map, String> stepNames = new HashMap<>(); + stepNames.put(created.getProducingTransformInternal(), "s1"); + stepNames.put(downstream.getProducingTransformInternal(), "s2"); + stepNames.put(view.getProducingTransformInternal(), "s3"); + + Collection> views = ImmutableList.>of(view); + context = + InProcessEvaluationContext.create( + runner.getPipelineOptions(), + TransformEvaluatorRegistry.defaultRegistry(), + rootTransforms, + valueToConsumers, + stepNames, + views); + } + + @Test + public void writeToViewWriterThenReadReads() { + PCollectionViewWriter> viewWriter = + context.createPCollectionViewWriter( + PCollection.>createPrimitiveOutputInternal( + p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED), + view); + BoundedWindow window = + new BoundedWindow() { + @Override + public Instant maxTimestamp() { + return new Instant(1248L); + } + }; + BoundedWindow second = + new BoundedWindow() { + @Override + public Instant maxTimestamp() { + return new Instant(899999L); + } + }; + WindowedValue firstValue = + WindowedValue.of(1, new Instant(1222), window, PaneInfo.ON_TIME_AND_ONLY_FIRING); + WindowedValue secondValue = + WindowedValue.of( + 2, new Instant(8766L), second, PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0)); + Iterable> values = ImmutableList.of(firstValue, secondValue); + viewWriter.add(values); + + SideInputReader reader = + context.createSideInputReader(ImmutableList.>of(view)); + assertThat(reader.get(view, window), containsInAnyOrder(1)); + assertThat(reader.get(view, second), containsInAnyOrder(2)); + + WindowedValue overrittenSecondValue = + WindowedValue.of( + 4444, new Instant(8677L), second, 
PaneInfo.createPane(false, true, Timing.LATE, 1, 1)); + viewWriter.add(Collections.singleton(overrittenSecondValue)); + assertThat(reader.get(view, second), containsInAnyOrder(4444)); + } + + @Test + public void getExecutionContextDifferentKeysIndependentState() { + InProcessExecutionContext fooContext = + context.getExecutionContext(created.getProducingTransformInternal(), "foo"); + + StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); + + fooContext + .getOrCreateStepContext("s1", "s1", null) + .stateInternals() + .state(StateNamespaces.global(), intBag) + .add(1); + + InProcessExecutionContext barContext = + context.getExecutionContext(created.getProducingTransformInternal(), "bar"); + assertThat( + barContext + .getOrCreateStepContext("s1", "s1", null) + .stateInternals() + .state(StateNamespaces.global(), intBag) + .read(), + emptyIterable()); + } + + @Test + public void getExecutionContextDifferentStepsIndependentState() { + String myKey = "foo"; + InProcessExecutionContext fooContext = + context.getExecutionContext(created.getProducingTransformInternal(), myKey); + + StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); + + fooContext + .getOrCreateStepContext("s1", "s1", null) + .stateInternals() + .state(StateNamespaces.global(), intBag) + .add(1); + + InProcessExecutionContext barContext = + context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); + assertThat( + barContext + .getOrCreateStepContext("s1", "s1", null) + .stateInternals() + .state(StateNamespaces.global(), intBag) + .read(), + emptyIterable()); + } + + @Test + public void handleResultMergesCounters() { + CounterSet counters = context.createCounterSet(); + Counter myCounter = Counter.longs("foo", AggregationKind.SUM); + counters.addCounter(myCounter); + + myCounter.addValue(4L); + InProcessTransformResult result = + StepTransformResult.withoutHold(created.getProducingTransformInternal()) + .withCounters(counters) + .build(); + context.handleResult(null, ImmutableList.of(), result); + assertThat((Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(4L)); + + CounterSet againCounters = context.createCounterSet(); + Counter myLongCounterAgain = Counter.longs("foo", AggregationKind.SUM); + againCounters.add(myLongCounterAgain); + myLongCounterAgain.addValue(8L); + + InProcessTransformResult secondResult = + StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) + .withCounters(againCounters) + .build(); + context.handleResult( + InProcessBundle.unkeyed(created).commit(Instant.now()), + ImmutableList.of(), + secondResult); + assertThat((Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(12L)); + } + + @Test + public void handleResultStoresState() { + String myKey = "foo"; + InProcessExecutionContext fooContext = + context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); + + StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); + + CopyOnAccessInMemoryStateInternals state = + fooContext.getOrCreateStepContext("s1", "s1", null).stateInternals(); + BagState bag = state.state(StateNamespaces.global(), intBag); + bag.add(1); + bag.add(2); + bag.add(4); + + InProcessTransformResult stateResult = + StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) + .withState(state) + .build(); + + context.handleResult( + InProcessBundle.keyed(created, myKey).commit(Instant.now()), + ImmutableList.of(), + stateResult); + + InProcessExecutionContext afterResultContext = + 
context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); + + CopyOnAccessInMemoryStateInternals afterResultState = + afterResultContext.getOrCreateStepContext("s1", "s1", null).stateInternals(); + assertThat(afterResultState.state(StateNamespaces.global(), intBag).read(), contains(1, 2, 4)); + } + + @Test + public void callAfterOutputMustHaveBeenProducedAfterEndOfWatermarkCallsback() throws Exception { + final ArrayBlockingQueue wasCalled = new ArrayBlockingQueue<>(1); + Runnable callback = + new Runnable() { + @Override + public void run() { + wasCalled.offer(true); + } + }; + + // Should call back after the end of the global window + context.callAfterOutputMustHaveBeenProduced( + downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), callback); + + InProcessTransformResult result = + StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0)) + .build(); + + context.handleResult(null, ImmutableList.of(), result); + + // Difficult to demonstrate that we took no action in a multithreaded world; poll for a bit + // will likely be flaky if this logic is broken + assertThat(wasCalled.poll(500L, TimeUnit.MILLISECONDS), nullValue()); + + InProcessTransformResult finishedResult = + StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); + context.handleResult(null, ImmutableList.of(), finishedResult); + // Obtain the value via blocking call + assertThat(wasCalled.take(), equalTo(true)); + } + + @Test + public void callAfterOutputMustHaveBeenProducedAlreadyAfterCallsImmediately() throws Exception { + InProcessTransformResult finishedResult = + StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); + context.handleResult(null, ImmutableList.of(), finishedResult); + + final ArrayBlockingQueue wasCalled = new ArrayBlockingQueue<>(1); + Runnable callback = + new Runnable() { + @Override + public void run() { + wasCalled.offer(true); + } + }; + context.callAfterOutputMustHaveBeenProduced( + downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), callback); + // Should be scheduled to execute asynchronously immediately, but still need a blocking call + assertThat(wasCalled.take(), equalTo(true)); + } + + @Test + public void extractFiredTimersExtractsTimers() { + InProcessTransformResult holdResult = + StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0)) + .build(); + context.handleResult(null, ImmutableList.of(), holdResult); + + String key = "foo"; + TimerData toFire = + TimerData.of(StateNamespaces.global(), new Instant(100L), TimeDomain.EVENT_TIME); + InProcessTransformResult timerResult = + StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) + .withState(CopyOnAccessInMemoryStateInternals.withUnderlying(key, null)) + .withTimerUpdate(TimerUpdate.builder(key).setTimer(toFire).build()) + .build(); + + // haven't added any timers, must be empty + assertThat(context.extractFiredTimers().entrySet(), emptyIterable()); + context.handleResult( + InProcessBundle.keyed(created, key).commit(Instant.now()), + ImmutableList.of(), + timerResult); + + // timer hasn't fired + assertThat(context.extractFiredTimers().entrySet(), emptyIterable()); + + InProcessTransformResult advanceResult = + StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); + // Should cause the downstream timer to fire + context.handleResult(null, ImmutableList.of(), advanceResult); + + Map, Map> fired = 
context.extractFiredTimers(); + assertThat( + fired, + Matchers.>hasKey(downstream.getProducingTransformInternal())); + Map downstreamFired = + fired.get(downstream.getProducingTransformInternal()); + assertThat(downstreamFired, Matchers.hasKey(key)); + + FiredTimers firedForKey = downstreamFired.get(key); + assertThat(firedForKey.getTimers(TimeDomain.PROCESSING_TIME), emptyIterable()); + assertThat(firedForKey.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), emptyIterable()); + assertThat(firedForKey.getTimers(TimeDomain.EVENT_TIME), contains(toFire)); + + // Don't reextract timers + assertThat(context.extractFiredTimers().entrySet(), emptyIterable()); + } + + @Test + public void createBundleUnkeyedResultUnkeyed() { + CommittedBundle> newBundle = + context + .createBundle(InProcessBundle.unkeyed(created).commit(Instant.now()), downstream) + .commit(Instant.now()); + assertThat(newBundle.isKeyed(), is(false)); + } + + @Test + public void createBundleKeyedResultPropagatesKey() { + CommittedBundle> newBundle = + context + .createBundle(InProcessBundle.keyed(created, "foo").commit(Instant.now()), downstream) + .commit(Instant.now()); + assertThat(newBundle.isKeyed(), is(true)); + assertThat(newBundle.getKey(), Matchers.equalTo("foo")); + } + + @Test + public void createRootBundleUnkeyed() { + assertThat(context.createRootBundle(created).commit(Instant.now()).isKeyed(), is(false)); + } + + @Test + public void createKeyedBundleKeyed() { + CommittedBundle> keyedBundle = + context + .createKeyedBundle( + InProcessBundle.unkeyed(created).commit(Instant.now()), "foo", downstream) + .commit(Instant.now()); + assertThat(keyedBundle.isKeyed(), is(true)); + assertThat(keyedBundle.getKey(), Matchers.equalTo("foo")); + } +} diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunnerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunnerTest.java new file mode 100644 index 0000000000000..adb64cd62588f --- /dev/null +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunnerTest.java @@ -0,0 +1,77 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessPipelineResult; +import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.transforms.Count; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.MapElements; +import com.google.cloud.dataflow.sdk.transforms.SimpleFunction; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.Serializable; + +/** + * Tests for basic {@link InProcessPipelineRunner} functionality. + */ +@RunWith(JUnit4.class) +public class InProcessPipelineRunnerTest implements Serializable { + @Test + public void wordCountShouldSucceed() throws Throwable { + Pipeline p = getPipeline(); + + PCollection> counts = + p.apply(Create.of("foo", "bar", "foo", "baz", "bar", "foo")) + .apply(MapElements.via(new SimpleFunction() { + @Override + public String apply(String input) { + return input; + } + })) + .apply(Count.perElement()); + PCollection countStrs = + counts.apply(MapElements.via(new SimpleFunction, String>() { + @Override + public String apply(KV input) { + String str = String.format("%s: %s", input.getKey(), input.getValue()); + return str; + } + })); + + DataflowAssert.that(countStrs).containsInAnyOrder("baz: 1", "bar: 2", "foo: 3"); + + InProcessPipelineResult result = ((InProcessPipelineResult) p.run()); + result.awaitCompletion(); + } + + private Pipeline getPipeline() { + PipelineOptions opts = PipelineOptionsFactory.create(); + opts.setRunner(InProcessPipelineRunner.class); + + Pipeline p = Pipeline.create(opts); + return p; + } +} + diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java index 4cfe782936877..dfcd66f17888b 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java @@ -24,7 +24,6 @@ import com.google.cloud.dataflow.sdk.coders.KvCoder; import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.Mean; diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java index 033f9de204d3a..66430b6a7f4dd 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java @@ -26,7 +26,6 @@ import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; import 
com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java index ae599bab62bc5..3b928b9077610 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java @@ -26,7 +26,6 @@ import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServicesTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServicesTest.java new file mode 100644 index 0000000000000..2c66dc2c325f8 --- /dev/null +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServicesTest.java @@ -0,0 +1,134 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import static org.hamcrest.Matchers.any; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import com.google.common.util.concurrent.MoreExecutors; + +import org.hamcrest.Matchers; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; + +/** + * Tests for {@link TransformExecutorServices}. 
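These tests hand the executor services a MoreExecutors.newDirectExecutorService(), so submitted work runs synchronously on the calling thread and scheduling order can be asserted without sleeps or timeouts. A short hedged example of that testing technique, independent of the SDK classes, is below; the test class name is illustrative.

import static org.junit.Assert.assertTrue;

import com.google.common.util.concurrent.MoreExecutors;

import org.junit.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Standalone sketch: a direct executor service runs submitted work on the calling thread,
 * so a test can assert on side effects immediately after submit() returns.
 */
public class DirectExecutorSketchTest {
  @Test
  public void submittedWorkRunsBeforeSubmitReturns() {
    ExecutorService direct = MoreExecutors.newDirectExecutorService();
    final AtomicBoolean ran = new AtomicBoolean(false);

    direct.submit(new Runnable() {
      @Override
      public void run() {
        ran.set(true);
      }
    });

    // No latches or sleeps needed: the task already ran on this thread.
    assertTrue(ran.get());
  }
}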
+ */ +@RunWith(JUnit4.class) +public class TransformExecutorServicesTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + private ExecutorService executorService; + private Map, Boolean> scheduled; + + @Before + public void setup() { + executorService = MoreExecutors.newDirectExecutorService(); + scheduled = new ConcurrentHashMap<>(); + } + + @Test + public void parallelScheduleMultipleSchedulesBothImmediately() { + @SuppressWarnings("unchecked") + TransformExecutor first = mock(TransformExecutor.class); + @SuppressWarnings("unchecked") + TransformExecutor second = mock(TransformExecutor.class); + + TransformExecutorService parallel = + TransformExecutorServices.parallel(executorService, scheduled); + parallel.schedule(first); + parallel.schedule(second); + + verify(first).call(); + verify(second).call(); + assertThat( + scheduled, + Matchers.allOf( + Matchers., Boolean>hasEntry(first, true), + Matchers., Boolean>hasEntry(second, true))); + + parallel.complete(first); + assertThat(scheduled, Matchers., Boolean>hasEntry(second, true)); + assertThat( + scheduled, + not( + Matchers., Boolean>hasEntry( + Matchers.>equalTo(first), any(Boolean.class)))); + parallel.complete(second); + assertThat(scheduled.isEmpty(), is(true)); + } + + @Test + public void serialScheduleTwoWaitsForFirstToComplete() { + @SuppressWarnings("unchecked") + TransformExecutor first = mock(TransformExecutor.class); + @SuppressWarnings("unchecked") + TransformExecutor second = mock(TransformExecutor.class); + + TransformExecutorService serial = TransformExecutorServices.serial(executorService, scheduled); + serial.schedule(first); + verify(first).call(); + + serial.schedule(second); + verify(second, never()).call(); + + assertThat(scheduled, Matchers., Boolean>hasEntry(first, true)); + assertThat( + scheduled, + not( + Matchers., Boolean>hasEntry( + Matchers.>equalTo(second), any(Boolean.class)))); + + serial.complete(first); + verify(second).call(); + assertThat(scheduled, Matchers., Boolean>hasEntry(second, true)); + assertThat( + scheduled, + not( + Matchers., Boolean>hasEntry( + Matchers.>equalTo(first), any(Boolean.class)))); + + serial.complete(second); + } + + @Test + public void serialCompleteNotExecutingTaskThrows() { + @SuppressWarnings("unchecked") + TransformExecutor first = mock(TransformExecutor.class); + @SuppressWarnings("unchecked") + TransformExecutor second = mock(TransformExecutor.class); + + TransformExecutorService serial = TransformExecutorServices.serial(executorService, scheduled); + serial.schedule(first); + thrown.expect(IllegalStateException.class); + thrown.expectMessage("unexpected currently executing"); + + serial.complete(second); + } +} diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java new file mode 100644 index 0000000000000..6358b275c1f7a --- /dev/null +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java @@ -0,0 +1,315 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.when; + +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.WithKeys; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.common.util.concurrent.MoreExecutors; + +import org.hamcrest.Matchers; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Tests for {@link TransformExecutor}. 
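The executor tests coordinate with the background thread through CountDownLatches: the completion callback counts down when handleResult or handleThrowable fires, and duringCallGetThreadIsNonNull parks the evaluator on a second latch so the test can observe getThread() mid-call. Below is a hedged standalone sketch of that two-latch handshake using only JDK classes; LatchHandshakeSketch is an illustrative name, not SDK code.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Standalone sketch of the two-latch handshake used to observe a task mid-execution:
// the worker signals "I have started" and then blocks until the observer releases it.
public class LatchHandshakeSketch {
  public static void main(String[] args) throws Exception {
    final CountDownLatch started = new CountDownLatch(1);
    final CountDownLatch release = new CountDownLatch(1);
    final AtomicReference<Thread> workerThread = new AtomicReference<>();

    ExecutorService pool = Executors.newSingleThreadExecutor();
    pool.submit(new Runnable() {
      @Override
      public void run() {
        workerThread.set(Thread.currentThread());
        started.countDown();   // tell the observer we are mid-call
        try {
          release.await();     // hold here until the observer has looked
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    });

    // Observer side: wait until the worker is demonstrably inside run(), then inspect it.
    started.await(5, TimeUnit.SECONDS);
    System.out.println("worker thread while running: " + workerThread.get());

    release.countDown();       // let the worker finish so shutdown is clean
    pool.shutdown();
  }
}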
+ */
+@RunWith(JUnit4.class)
+public class TransformExecutorTest {
+  private PCollection<String> created;
+  private PCollection<KV<Integer, String>> downstream;
+
+  private CountDownLatch evaluatorCompleted;
+
+  private RegisteringCompletionCallback completionCallback;
+  private TransformExecutorService transformEvaluationState;
+  @Mock private InProcessEvaluationContext evaluationContext;
+  private Map<TransformExecutor<?>, Boolean> scheduled;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+
+    scheduled = new HashMap<>();
+    transformEvaluationState =
+        TransformExecutorServices.parallel(MoreExecutors.newDirectExecutorService(), scheduled);
+
+    evaluatorCompleted = new CountDownLatch(1);
+    completionCallback = new RegisteringCompletionCallback(evaluatorCompleted);
+
+    TestPipeline p = TestPipeline.create();
+    created = p.apply(Create.of("foo", "spam", "third"));
+    downstream = created.apply(WithKeys.<Integer, String>of(3));
+  }
+
+  @Test
+  public void callWithNullInputBundleFinishesBundleAndCompletes() throws Exception {
+    final InProcessTransformResult result =
+        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
+    final AtomicBoolean finishCalled = new AtomicBoolean(false);
+    TransformEvaluator<String> evaluator =
+        new TransformEvaluator<String>() {
+          @Override
+          public void processElement(WindowedValue<String> element) throws Exception {
+            throw new IllegalArgumentException("Shouldn't be called");
+          }
+
+          @Override
+          public InProcessTransformResult finishBundle() throws Exception {
+            finishCalled.set(true);
+            return result;
+          }
+        };
+
+    when(evaluationContext.getTransformEvaluator(created.getProducingTransformInternal(), null))
+        .thenReturn(evaluator);
+
+    TransformExecutor<String> executor =
+        TransformExecutor.create(
+            evaluationContext,
+            null,
+            created.getProducingTransformInternal(),
+            completionCallback,
+            transformEvaluationState);
+    executor.call();
+
+    assertThat(finishCalled.get(), is(true));
+    assertThat(completionCallback.handledResult, equalTo(result));
+    assertThat(completionCallback.handledThrowable, is(nullValue()));
+    assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor)));
+  }
+
+  @Test
+  public void inputBundleProcessesEachElementFinishesAndCompletes() throws Exception {
+    final InProcessTransformResult result =
+        StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
+    final Collection<WindowedValue<String>> elementsProcessed = new ArrayList<>();
+    TransformEvaluator<String> evaluator =
+        new TransformEvaluator<String>() {
+          @Override
+          public void processElement(WindowedValue<String> element) throws Exception {
+            elementsProcessed.add(element);
+            return;
+          }
+
+          @Override
+          public InProcessTransformResult finishBundle() throws Exception {
+            return result;
+          }
+        };
+
+    WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo");
+    WindowedValue<String> spam = WindowedValue.valueInGlobalWindow("spam");
+    WindowedValue<String> third = WindowedValue.valueInGlobalWindow("third");
+    CommittedBundle<String> inputBundle =
+        InProcessBundle.unkeyed(created)
+            .add(foo)
+            .add(spam)
+            .add(third)
+            .commit(Instant.now());
+    when(
+            evaluationContext.getTransformEvaluator(
+                downstream.getProducingTransformInternal(), inputBundle))
+        .thenReturn(evaluator);
+
+    TransformExecutor<String> executor =
+        TransformExecutor.create(
+            evaluationContext,
+            inputBundle,
+            downstream.getProducingTransformInternal(),
+            completionCallback,
+            transformEvaluationState);
+
+    Executors.newSingleThreadExecutor().submit(executor);
+
+    evaluatorCompleted.await();
+
+    assertThat(elementsProcessed, containsInAnyOrder(spam, third, foo));
+    assertThat(completionCallback.handledResult, equalTo(result));
+    assertThat(completionCallback.handledThrowable, is(nullValue()));
+    assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor)));
+  }
+
+  @Test
+  public void processElementThrowsExceptionCallsback() throws Exception {
+    final InProcessTransformResult result =
+        StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
+    final Exception exception = new Exception();
+    TransformEvaluator<String> evaluator =
+        new TransformEvaluator<String>() {
+          @Override
+          public void processElement(WindowedValue<String> element) throws Exception {
+            throw exception;
+          }
+
+          @Override
+          public InProcessTransformResult finishBundle() throws Exception {
+            return result;
+          }
+        };
+
+    WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo");
+    CommittedBundle<String> inputBundle =
+        InProcessBundle.unkeyed(created)
+            .add(foo)
+            .commit(Instant.now());
+    when(
+            evaluationContext.getTransformEvaluator(
+                downstream.getProducingTransformInternal(), inputBundle))
+        .thenReturn(evaluator);
+
+    TransformExecutor<String> executor =
+        TransformExecutor.create(
+            evaluationContext,
+            inputBundle,
+            downstream.getProducingTransformInternal(),
+            completionCallback,
+            transformEvaluationState);
+    Executors.newSingleThreadExecutor().submit(executor);
+
+    evaluatorCompleted.await();
+
+    assertThat(completionCallback.handledResult, is(nullValue()));
+    assertThat(completionCallback.handledThrowable, Matchers.<Throwable>equalTo(exception));
+    assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor)));
+  }
+
+  @Test
+  public void finishBundleThrowsExceptionCallsback() throws Exception {
+    final Exception exception = new Exception();
+    TransformEvaluator<String> evaluator =
+        new TransformEvaluator<String>() {
+          @Override
+          public void processElement(WindowedValue<String> element) throws Exception {}
+
+          @Override
+          public InProcessTransformResult finishBundle() throws Exception {
+            throw exception;
+          }
+        };
+
+    CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(created).commit(Instant.now());
+    when(
+            evaluationContext.getTransformEvaluator(
+                downstream.getProducingTransformInternal(), inputBundle))
+        .thenReturn(evaluator);
+
+    TransformExecutor<String> executor =
+        TransformExecutor.create(
+            evaluationContext,
+            inputBundle,
+            downstream.getProducingTransformInternal(),
+            completionCallback,
+            transformEvaluationState);
+    Executors.newSingleThreadExecutor().submit(executor);
+
+    evaluatorCompleted.await();
+
+    assertThat(completionCallback.handledResult, is(nullValue()));
+    assertThat(completionCallback.handledThrowable, Matchers.<Throwable>equalTo(exception));
+    assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor)));
+  }
+
+  @Test
+  public void duringCallGetThreadIsNonNull() throws Exception {
+    final InProcessTransformResult result =
+        StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
+    final CountDownLatch testLatch = new CountDownLatch(1);
+    final CountDownLatch evaluatorLatch = new CountDownLatch(1);
+    TransformEvaluator<String> evaluator =
+        new TransformEvaluator<String>() {
+          @Override
+          public void processElement(WindowedValue<String> element) throws Exception {
+            throw new IllegalArgumentException("Shouldn't be called");
+          }
+
+          @Override
+          public InProcessTransformResult finishBundle() throws Exception {
+            testLatch.countDown();
+            evaluatorLatch.await();
+            return result;
+          }
+        };
+
+    when(
+            evaluationContext.getTransformEvaluator(
+                created.getProducingTransformInternal(), null))
+        .thenReturn(evaluator);
+
+    TransformExecutor<String> executor =
+        TransformExecutor.create(
+            evaluationContext,
+            null,
+            created.getProducingTransformInternal(),
+            completionCallback,
+            transformEvaluationState);
+
+    Executors.newSingleThreadExecutor().submit(executor);
+    testLatch.await();
+    assertThat(executor.getThread(), not(nullValue()));
+
+    // Finish the execution so everything can get closed down cleanly.
+    evaluatorLatch.countDown();
+  }
+
+  private static class RegisteringCompletionCallback implements CompletionCallback {
+    private InProcessTransformResult handledResult = null;
+    private Throwable handledThrowable = null;
+    private final CountDownLatch onMethod;
+
+    private RegisteringCompletionCallback(CountDownLatch onMethod) {
+      this.onMethod = onMethod;
+    }
+
+    @Override
+    public void handleResult(CommittedBundle<?> inputBundle, InProcessTransformResult result) {
+      handledResult = result;
+      onMethod.countDown();
+    }
+
+    @Override
+    public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
+      handledThrowable = t;
+      onMethod.countDown();
+    }
+  }
+}
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
index f139c5648e951..a9bbcc8cc56c2 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
@@ -25,7 +25,6 @@
 import com.google.cloud.dataflow.sdk.io.CountingSource;
 import com.google.cloud.dataflow.sdk.io.Read;
 import com.google.cloud.dataflow.sdk.io.UnboundedSource;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
index 2f5cd0fb888a4..2f5bdde6adcd9 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
@@ -25,7 +25,6 @@
 import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
 import com.google.cloud.dataflow.sdk.coders.VoidCoder;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.Create;