Skip to content

Commit

Permalink
fixup!
Browse files Browse the repository at this point in the history
Add ConcurrentPartitionedPriorityQueue, use in
InProcessEvaluationContext to handle WatermarkCallbacks
  • Loading branch information
tgroh committed Mar 9, 2016
1 parent bdbd70b commit 24d9846
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger;
import com.google.cloud.dataflow.sdk.util.ConcurrentPartitionedPriorityQueue;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.SideInputReader;
import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
Expand All @@ -38,9 +40,9 @@
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.PValue;
import com.google.common.base.Predicate;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;

Expand All @@ -50,7 +52,6 @@
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
Expand All @@ -59,8 +60,9 @@
import javax.annotation.Nullable;

/**
* The evaluation context for the {@link InProcessPipelineRunner}. Contains state shared within
* the current evaluation.
* The evaluation context for a specific pipeline being executed by the
* {@link InProcessPipelineRunner}. Contains state shared within the execution across all
* transforms.
*
* <p>{@link InProcessEvaluationContext} contains shared state for an execution of the
* {@link InProcessPipelineRunner} that can be used while evaluating a {@link PTransform}. This
Expand All @@ -76,12 +78,16 @@
* can be executed.
*/
class InProcessEvaluationContext {
private final Set<AppliedPTransform<?, ?, ?>> allTransforms;
/** The step name for each {@link AppliedPTransform} in the {@link Pipeline}. */
private final Map<AppliedPTransform<?, ?, ?>, String> stepNames;

/** The options that were used to create this {@link Pipeline}. */
private final InProcessPipelineOptions options;

/** The current processing time and event time watermarks and timers. */
private final InMemoryWatermarkManager watermarkManager;
private final ConcurrentMap<AppliedPTransform<?, ?, ?>, PriorityQueue<WatermarkCallback>>

private final ConcurrentPartitionedPriorityQueue<AppliedPTransform<?, ?, ?>, WatermarkCallback>
pendingCallbacks;

// The stateInternals of the world, by applied PTransform and key.
Expand Down Expand Up @@ -115,15 +121,9 @@ private InProcessEvaluationContext(
checkNotNull(valueToConsumers);
checkNotNull(stepNames);
checkNotNull(views);

this.allTransforms =
ImmutableSet.<AppliedPTransform<?, ?, ?>>builder()
.addAll(rootTransforms)
.addAll(Iterables.concat(valueToConsumers.values()))
.build();
this.stepNames = stepNames;

this.pendingCallbacks = new ConcurrentHashMap<>();
this.pendingCallbacks = ConcurrentPartitionedPriorityQueue.create(new CallbackOrdering());
this.watermarkManager = InMemoryWatermarkManager.create(
NanosOffsetClock.create(), rootTransforms, valueToConsumers);
this.sideInputContainer = InProcessSideInputContainer.create(this, views);
Expand Down Expand Up @@ -203,31 +203,37 @@ private Iterable<? extends CommittedBundle<?>> commitBundles(

private void watermarksUpdated() {
for (
Map.Entry<AppliedPTransform<?, ?, ?>, PriorityQueue<WatermarkCallback>> transformCallbacks :
pendingCallbacks.entrySet()) {
checkCallbacks(transformCallbacks.getKey(), transformCallbacks.getValue());
AppliedPTransform<?, ?, ?> transform : stepNames. keySet()) {
checkCallbacks(transform);
}
}

private void checkCallbacks(
AppliedPTransform<?, ?, ?> producingTransform,
PriorityQueue<WatermarkCallback> pendingTransformCallbacks) {
private void checkCallbacks(AppliedPTransform<?, ?, ?> producingTransform) {
TransformWatermarks watermarks = watermarkManager.getWatermarks(producingTransform);
ShouldFirePredicate shouldFire = new ShouldFirePredicate(watermarks.getOutputWatermark());
WatermarkCallback callback;
do {
callback = null;
synchronized (pendingTransformCallbacks) {
if (!pendingTransformCallbacks.isEmpty()
&& pendingTransformCallbacks.peek().shouldFire(watermarks.getOutputWatermark())) {
callback = pendingTransformCallbacks.poll();
}
}
callback = pendingCallbacks.pollIfSatisfies(producingTransform, shouldFire);
if (callback != null) {
callbackExecutor.execute(callback.getCallback());
}
} while (callback != null);
}

/**
 * A predicate over {@link WatermarkCallback WatermarkCallbacks} that is satisfied when the
 * callback reports that it should fire at the {@link Instant} this predicate was created with.
 */
private static class ShouldFirePredicate
    implements SerializableFunction<WatermarkCallback, Boolean> {
  /** The instant every callback is evaluated against; fixed at construction time. */
  private final Instant evaluationInstant;

  public ShouldFirePredicate(Instant evaluationInstant) {
    this.evaluationInstant = evaluationInstant;
  }

  /** Returns the result of {@code candidate.shouldFire} for the captured instant. */
  @Override
  public Boolean apply(WatermarkCallback candidate) {
    return candidate.shouldFire(evaluationInstant);
  }
}

/**
* Create a {@link UncommittedBundle} for use by a source.
*/
Expand Down Expand Up @@ -284,17 +290,8 @@ public void callAfterOutputMustHaveBeenProduced(
WatermarkCallback callback =
WatermarkCallback.onGuaranteedFiring(window, windowingStrategy, runnable);
AppliedPTransform<?, ?, ?> producing = getProducing(value);
PriorityQueue<WatermarkCallback> callbacks = pendingCallbacks.get(producing);
if (callbacks == null) {
PriorityQueue<WatermarkCallback> newCallbacks =
new PriorityQueue<>(10, new CallbackOrdering());
pendingCallbacks.putIfAbsent(producing, newCallbacks);
callbacks = pendingCallbacks.get(producing);
}
synchronized (callbacks) {
callbacks.offer(callback);
}
checkCallbacks(producing, callbacks);
pendingCallbacks.offer(producing, callback);
checkCallbacks(producing);
}

private AppliedPTransform<?, ?, ?> getProducing(PValue value) {
Expand All @@ -305,7 +302,7 @@ public void callAfterOutputMustHaveBeenProduced(
}

private AppliedPTransform<?, ?, ?> lookupProducing(PValue value) {
for (AppliedPTransform<?, ?, ?> transform : allTransforms) {
for (AppliedPTransform<?, ?, ?> transform : stepNames. keySet()) {
if (transform.getOutput().equals(value) || transform.getOutput().expand().contains(value)) {
return transform;
}
Expand Down Expand Up @@ -335,7 +332,7 @@ public InProcessExecutionContext getExecutionContext(
* Get all of the steps used in this {@link Pipeline}.
*/
public Collection<AppliedPTransform<?, ?, ?>> getSteps() {
return allTransforms;
return stepNames.keySet();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (C) 2016 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.dataflow.sdk.util;

import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.common.collect.Ordering;

import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * A collection of {@link PriorityQueue PriorityQueues}, one per key, that can be safely accessed
 * from multiple threads.
 *
 * <p>Each key owns an independent queue whose elements are ordered by the {@link Ordering}
 * supplied at construction. Queues are created lazily on the first {@link #offer} for a key and
 * are never removed. All access to an individual queue is guarded by that queue's monitor, so
 * operations on different keys do not contend with each other.
 *
 * @param <K> the type of key used to select a partition
 * @param <V> the type of element stored in each partition
 */
public class ConcurrentPartitionedPriorityQueue<K, V> {
  /** Matches the default initial capacity {@link PriorityQueue} itself uses. */
  private static final int DEFAULT_INITIAL_CAPACITY = 11;

  /**
   * Create a new, empty {@link ConcurrentPartitionedPriorityQueue} whose partitions are ordered
   * by the provided {@link Comparator}.
   */
  public static <K, V> ConcurrentPartitionedPriorityQueue<K, V> create(Comparator<V> comparator) {
    return new ConcurrentPartitionedPriorityQueue<>(Ordering.from(comparator));
  }

  private final Ordering<V> ordering;
  private final ConcurrentMap<K, PriorityQueue<V>> queues;

  public ConcurrentPartitionedPriorityQueue(Ordering<V> ordering) {
    this.ordering = ordering;
    queues = new ConcurrentHashMap<>();
  }

  /**
   * Place the provided value in the priority queue belonging to the key, creating that queue if
   * this is the first value offered for the key.
   */
  public void offer(K key, V value) {
    PriorityQueue<V> queue = queues.get(key);
    if (queue == null) {
      // The (int, Comparator) constructor is used because the Comparator-only constructor was
      // only added in Java 8.
      PriorityQueue<V> created = new PriorityQueue<V>(DEFAULT_INITIAL_CAPACITY, ordering);
      // putIfAbsent returns the queue another thread installed first, or null if ours won.
      PriorityQueue<V> raced = queues.putIfAbsent(key, created);
      queue = (raced == null) ? created : raced;
    }
    synchronized (queue) {
      queue.offer(value);
    }
  }

  /**
   * Remove and return the head of the queue for the provided key if it satisfies the provided
   * predicate.
   *
   * <p>The predicate is applied while the lock on the key's queue is held, so that the element
   * that was tested is the element that is removed.
   *
   * @return the head of the key's queue, or {@code null} if no value has been offered for the
   *     key, the queue is empty, or the head does not satisfy the predicate
   */
  public V pollIfSatisfies(K key, SerializableFunction<V, Boolean> toSatisfy) {
    PriorityQueue<V> queue = queues.get(key);
    if (queue == null) {
      return null;
    }
    synchronized (queue) {
      if (!queue.isEmpty() && toSatisfy.apply(queue.peek())) {
        return queue.poll();
      }
      return null;
    }
  }
}

0 comments on commit 24d9846

Please sign in to comment.