Skip to content

Commit

Permalink
fixup!
Browse files Browse the repository at this point in the history
Hide guava interfaces. Formatter EvaluationContext.
  • Loading branch information
tgroh committed Mar 9, 2016
1 parent 24d9846 commit ab5c0ed
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.PValue;
import com.google.common.base.Predicate;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
Expand All @@ -51,7 +50,6 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -83,7 +81,7 @@ class InProcessEvaluationContext {

/** The options that were used to create this {@link Pipeline}. */
private final InProcessPipelineOptions options;

/** The current processing time and event time watermarks and timers. */
private final InMemoryWatermarkManager watermarkManager;

Expand Down Expand Up @@ -124,8 +122,9 @@ private InProcessEvaluationContext(
this.stepNames = stepNames;

this.pendingCallbacks = ConcurrentPartitionedPriorityQueue.create(new CallbackOrdering());
this.watermarkManager = InMemoryWatermarkManager.create(
NanosOffsetClock.create(), rootTransforms, valueToConsumers);
this.watermarkManager =
InMemoryWatermarkManager.create(
NanosOffsetClock.create(), rootTransforms, valueToConsumers);
this.sideInputContainer = InProcessSideInputContainer.create(this, views);

this.applicationStateInternals = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -202,8 +201,7 @@ private Iterable<? extends CommittedBundle<?>> commitBundles(
}

private void watermarksUpdated() {
for (
AppliedPTransform<?, ?, ?> transform : stepNames. keySet()) {
for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) {
checkCallbacks(transform);
}
}
Expand Down Expand Up @@ -302,7 +300,7 @@ public void callAfterOutputMustHaveBeenProduced(
}

private AppliedPTransform<?, ?, ?> lookupProducing(PValue value) {
for (AppliedPTransform<?, ?, ?> transform : stepNames. keySet()) {
for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) {
if (transform.getOutput().equals(value) || transform.getOutput().expand().contains(value)) {
return transform;
}
Expand All @@ -323,7 +321,9 @@ public InProcessPipelineOptions getPipelineOptions() {
public InProcessExecutionContext getExecutionContext(
AppliedPTransform<?, ?, ?> application, Object key) {
StepAndKey stepAndKey = StepAndKey.of(application, key);
return new InProcessExecutionContext(options.getClock(), key,
return new InProcessExecutionContext(
options.getClock(),
key,
(CopyOnAccessInMemoryStateInternals<Object>) applicationStateInternals.get(stepAndKey),
watermarkManager.getWatermarks(application));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public static <K, V> ConcurrentPartitionedPriorityQueue<K, V> create(Comparator<
private final Ordering<V> ordering;
private final ConcurrentMap<K, PriorityQueue<V>> queues;

public ConcurrentPartitionedPriorityQueue(Ordering<V> ordering) {
private ConcurrentPartitionedPriorityQueue(Ordering<V> ordering) {
this.ordering = ordering;
queues = new ConcurrentHashMap<>();
}
Expand Down

0 comments on commit ab5c0ed

Please sign in to comment.