Skip to content

Commit

Permalink
fixup!
Browse files Browse the repository at this point in the history
Add ConcurrentPartitionedPriorityQueueTest

More docs. Minor renaming.
  • Loading branch information
tgroh committed Mar 12, 2016
1 parent ab5c0ed commit 2635f86
Show file tree
Hide file tree
Showing 9 changed files with 392 additions and 211 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.collect.ImmutableList;

import org.joda.time.Instant;
Expand Down Expand Up @@ -110,12 +109,12 @@ public Instant getSynchronizedProcessingOutputWatermark() {

@Override
public String toString() {
ToStringHelper toStringHelper =
MoreObjects.toStringHelper(this).add("pcollection", pcollection);
if (keyed) {
toStringHelper = toStringHelper.add("key", key);
}
return toStringHelper.add("elements", committedElements).toString();
return MoreObjects.toStringHelper(this)
.omitNullValues()
.add("pcollection", pcollection)
.add("key", key)
.add("elements", committedElements)
.toString();
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,8 @@
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger;
import com.google.cloud.dataflow.sdk.util.ConcurrentPartitionedPriorityQueue;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.SideInputReader;
import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
Expand All @@ -40,20 +38,14 @@
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.PValue;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;

import org.joda.time.Instant;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -85,19 +77,17 @@ class InProcessEvaluationContext {
/** The current processing time and event time watermarks and timers. */
private final InMemoryWatermarkManager watermarkManager;

private final ConcurrentPartitionedPriorityQueue<AppliedPTransform<?, ?, ?>, WatermarkCallback>
pendingCallbacks;
/** Executes callbacks based on the progression of the watermark. */
private final WatermarkCallbackExecutor callbackExecutor;

// The stateInternals of the world, by applied PTransform and key.
/** The stateInternals of the world, by applied PTransform and key. */
private final ConcurrentMap<StepAndKey, CopyOnAccessInMemoryStateInternals<?>>
applicationStateInternals;

private final InProcessSideInputContainer sideInputContainer;

private final CounterSet mergedCounters;

private final Executor callbackExecutor;

public static InProcessEvaluationContext create(
InProcessPipelineOptions options,
Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
Expand All @@ -121,7 +111,6 @@ private InProcessEvaluationContext(
checkNotNull(views);
this.stepNames = stepNames;

this.pendingCallbacks = ConcurrentPartitionedPriorityQueue.create(new CallbackOrdering());
this.watermarkManager =
InMemoryWatermarkManager.create(
NanosOffsetClock.create(), rootTransforms, valueToConsumers);
Expand All @@ -130,7 +119,7 @@ private InProcessEvaluationContext(
this.applicationStateInternals = new ConcurrentHashMap<>();
this.mergedCounters = new CounterSet();

this.callbackExecutor = Executors.newCachedThreadPool();
this.callbackExecutor = WatermarkCallbackExecutor.create();
}

/**
Expand Down Expand Up @@ -161,7 +150,7 @@ public synchronized Iterable<? extends CommittedBundle<?>> handleResult(
result.getTimerUpdate().withCompletedTimers(completedTimers),
committedBundles,
result.getWatermarkHold());
watermarksUpdated();
fireAllAvailableCallbacks();
// Update counters
if (result.getCounters() != null) {
mergedCounters.merge(result.getCounters());
Expand Down Expand Up @@ -200,36 +189,15 @@ private Iterable<? extends CommittedBundle<?>> commitBundles(
return completed.build();
}

private void watermarksUpdated() {
private void fireAllAvailableCallbacks() {
for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) {
checkCallbacks(transform);
fireAvailableCallbacks(transform);
}
}

private void checkCallbacks(AppliedPTransform<?, ?, ?> producingTransform) {
private void fireAvailableCallbacks(AppliedPTransform<?, ?, ?> producingTransform) {
TransformWatermarks watermarks = watermarkManager.getWatermarks(producingTransform);
ShouldFirePredicate shouldFire = new ShouldFirePredicate(watermarks.getOutputWatermark());
WatermarkCallback callback;
do {
callback = pendingCallbacks.pollIfSatisfies(producingTransform, shouldFire);
if (callback != null) {
callbackExecutor.execute(callback.getCallback());
}
} while (callback != null);
}

private static class ShouldFirePredicate
implements SerializableFunction<WatermarkCallback, Boolean> {
private final Instant currentInstant;

public ShouldFirePredicate(Instant currentInstant) {
this.currentInstant = currentInstant;
}

@Override
public Boolean apply(WatermarkCallback input) {
return input.shouldFire(currentInstant);
}
callbackExecutor.fireForWatermark(producingTransform, watermarks.getOutputWatermark());
}

/**
Expand Down Expand Up @@ -273,23 +241,26 @@ public void add(Iterable<WindowedValue<ElemT>> values) {
}

/**
* Execute the specified callback after the watermark for a {@link PValue} passes the point at
* Schedule a callback to be executed after output would be produced for the given window
* if there had been input.
*
* <p>Output would be produced when the watermark for a {@link PValue} passes the point at
* which the trigger for the specified window (with the specified windowing strategy) must have
* fired from the perspective of that {@link PValue}, as specified by the value of
* {@link Trigger#getWatermarkThatGuaranteesFiring(BoundedWindow)} for the trigger of the
* {@link WindowingStrategy}. When the callback has fired, either values will have been produced
* for a key in that window, or the window is empty.
* for a key in that window, or the window is empty. The callback will be executed regardless of
* whether values have been produced.
*/
public void callAfterOutputMustHaveBeenProduced(
public void scheduleAfterOutputWouldBeProduced(
PValue value,
BoundedWindow window,
WindowingStrategy<?, ?> windowingStrategy,
Runnable runnable) {
WatermarkCallback callback =
WatermarkCallback.onGuaranteedFiring(window, windowingStrategy, runnable);
AppliedPTransform<?, ?, ?> producing = getProducing(value);
pendingCallbacks.offer(producing, callback);
checkCallbacks(producing);
callbackExecutor.callOnGuaranteedFiring(producing, window, windowingStrategy, runnable);

fireAvailableCallbacks(lookupProducing(value));
}

private AppliedPTransform<?, ?, ?> getProducing(PValue value) {
Expand Down Expand Up @@ -351,7 +322,7 @@ public String getStepName(AppliedPTransform<?, ?, ?> application) {
* {@link PCollectionView PCollectionViews}
*/
public SideInputReader createSideInputReader(final List<PCollectionView<?>> sideInputs) {
return sideInputContainer.withViews(sideInputs);
return sideInputContainer.createReaderForViews(sideInputs);
}

/**
Expand All @@ -374,47 +345,11 @@ public CounterSet getCounters() {
return mergedCounters;
}

private static class WatermarkCallback {
public static <W extends BoundedWindow> WatermarkCallback onGuaranteedFiring(
BoundedWindow window, WindowingStrategy<?, W> strategy, Runnable callback) {
@SuppressWarnings("unchecked")
Instant firingAfter =
strategy.getTrigger().getSpec().getWatermarkThatGuaranteesFiring((W) window);
return new WatermarkCallback(firingAfter, callback);
}

private final Instant fireAfter;
private final Runnable callback;

private WatermarkCallback(Instant fireAfter, Runnable callback) {
this.fireAfter = fireAfter;
this.callback = callback;
}

public boolean shouldFire(Instant currentWatermark) {
return currentWatermark.isAfter(fireAfter)
|| currentWatermark.equals(BoundedWindow.TIMESTAMP_MAX_VALUE);
}

public Runnable getCallback() {
return callback;
}
}

private static class CallbackOrdering extends Ordering<WatermarkCallback> {
@Override
public int compare(WatermarkCallback left, WatermarkCallback right) {
return ComparisonChain.start()
.compare(left.fireAfter, right.fireAfter)
.compare(left.callback, right.callback, Ordering.arbitrary())
.result();
}
}

/**
* Extracts all timers that have been fired and have not already been extracted.
*
* <p>This is a destructive operation. Timers will only appear in the result of this method once.
* <p>This is a destructive operation. Timers will only appear in the result of this method once
* for each time they are set.
*/
public Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> extractFiredTimers() {
return watermarkManager.extractFiredTimers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private InProcessSideInputContainer(InProcessEvaluationContext context,
* the provided argument. The returned {@link SideInputReader} is unmodifiable without
* casting, but will change as this {@link InProcessSideInputContainer} is modified.
*/
public SideInputReader withViews(Collection<PCollectionView<?>> newContainedViews) {
public SideInputReader createReaderForViews(Collection<PCollectionView<?>> newContainedViews) {
if (!containedViews.containsAll(newContainedViews)) {
Set<PCollectionView<?>> currentlyContained = ImmutableSet.copyOf(containedViews);
Set<PCollectionView<?>> newRequested = ImmutableSet.copyOf(newContainedViews);
Expand All @@ -113,7 +113,7 @@ public void write(PCollectionView<?> view, Iterable<? extends WindowedValue<?>>
indexValuesByWindow(values);
for (Map.Entry<BoundedWindow, Collection<WindowedValue<?>>> windowValues :
valuesPerWindow.entrySet()) {
setPCollectionViewWindowValues(view, windowValues.getKey(), windowValues.getValue());
updatePCollectionViewWindowValues(view, windowValues.getKey(), windowValues.getValue());
}
}

Expand Down Expand Up @@ -141,7 +141,7 @@ private Map<BoundedWindow, Collection<WindowedValue<?>>> indexValuesByWindow(
* specified values, if the values are part of a later pane than currently exist within the
* {@link PCollectionViewWindow}.
*/
private void setPCollectionViewWindowValues(
private void updatePCollectionViewWindowValues(
PCollectionView<?> view, BoundedWindow window, Collection<WindowedValue<?>> windowValues) {
PCollectionViewWindow<?> windowedView = PCollectionViewWindow.of(view, window);
SettableFuture<Iterable<? extends WindowedValue<?>>> future = null;
Expand Down Expand Up @@ -188,7 +188,7 @@ public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
viewByWindows.get(windowedView);

WindowingStrategy<?, ?> windowingStrategy = view.getWindowingStrategyInternal();
evaluationContext.callAfterOutputMustHaveBeenProduced(
evaluationContext.scheduleAfterOutputWouldBeProduced(
view, window, windowingStrategy, new Runnable() {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@

import javax.annotation.Nullable;

/**
* A {@link TransformEvaluatorFactory} that delegates to primitive {@link TransformEvaluatorFactory}
* implementations based on the type of {@link PTransform} of the application.
*/
class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
public static TransformEvaluatorRegistry defaultRegistry() {
@SuppressWarnings("rawtypes")
Expand Down
Loading

0 comments on commit 2635f86

Please sign in to comment.