Skip to content

Commit

Permalink
Adding percentiles to distribution metric (apache#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajothomas authored Nov 2, 2021
1 parent 2e00168 commit 9459eef
Show file tree
Hide file tree
Showing 23 changed files with 689 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ class BeamModulePlugin implements Plugin<Project> {
commons_io : "commons-io:commons-io:2.6",
commons_lang3 : "org.apache.commons:commons-lang3:3.9",
commons_math3 : "org.apache.commons:commons-math3:3.6.1",
datasketches : "org.apache.datasketches:datasketches-java:3.0.0",
error_prone_annotations : "com.google.errorprone:error_prone_annotations:$errorprone_version",
flogger_system_backend : "com.google.flogger:flogger-system-backend:0.6",
gax : "com.google.api:gax", // google_cloud_platform_libraries_bom sets version
Expand Down
1 change: 1 addition & 0 deletions runners/core-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ dependencies {
compile project(path: ":model:job-management", configuration: "shadow")
compile project(":runners:core-construction-java")
compile project(":sdks:java:fn-execution")
compile library.java.datasketches
compile library.java.vendored_guava_26_0_jre
compile library.java.joda_time
compile library.java.vendored_grpc_1_36_0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.beam.runners.core.ReduceFnContextFactory.StateStyle;
import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory;
import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@
*/
public class DistributionCell implements Distribution, MetricCell<DistributionData> {

private final DirtyState dirty = new DirtyState();
private final AtomicReference<DistributionData> value =
new AtomicReference<>(DistributionData.EMPTY);
private final DirtyState dirty;
private final AtomicReference<DistributionData> value;
private final MetricName name;

/**
Expand All @@ -45,31 +44,36 @@ public class DistributionCell implements Distribution, MetricCell<DistributionDa
* MetricsContainer}. These constructors are *only* public so runners can instantiate.
*/
public DistributionCell(MetricName name) {
this.dirty = new DirtyState();
this.value = new AtomicReference<>(DistributionData.empty());
this.name = name;
}

public DistributionCell(DistributionMetricKey metricKey) {
this.dirty = new DirtyState();
this.value =
new AtomicReference<>(DistributionData.withPercentiles(metricKey.getPercentiles()));
this.name = metricKey.getMetricName();
}

@Override
public void reset() {
dirty.afterModification();
value.set(DistributionData.EMPTY);
value.get().reset();
}

/** Increment the distribution by the given amount. */
@Override
public void update(long n) {
update(DistributionData.singleton(n));
}

@Override
public void update(long sum, long count, long min, long max) {
update(DistributionData.create(sum, count, min, max));
value.get().update(n);
dirty.afterModification();
}

void update(DistributionData data) {
void update(DistributionData other) {
DistributionData original;
do {
original = value.get();
} while (!value.compareAndSet(original, original.combine(data)));
} while (!value.compareAndSet(original, original.combine(other)));
dirty.afterModification();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,48 +17,231 @@
*/
package org.apache.beam.runners.core.metrics;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.quantiles.DoublesSketchBuilder;
import org.apache.datasketches.quantiles.DoublesUnion;
import org.apache.datasketches.quantiles.DoublesUnionBuilder;
import org.apache.datasketches.quantiles.UpdateDoublesSketch;

/**
* Data describing the distribution. This should retain enough detail that it can be combined
* with other {@link DistributionData}.
*
* <p>The Apache DataSketches library is used to compute percentiles. See <a
* href="https://datasketches.apache.org/">https://datasketches.apache.org/</a>.
*
* <p>This is kept distinct from {@link DistributionResult} since this may be extended to include
* data necessary to approximate quantiles, etc. while {@link DistributionResult} would just include
* the approximate value of those quantiles.
*/
@AutoValue
public abstract class DistributionData implements Serializable {

public abstract long sum();

public abstract long count();
public class DistributionData implements Serializable {
// k = 256 should yield an approximate error ε of less than 1%
private static final int SKETCH_SUMMARY_SIZE = 256;

public abstract long min();
private final Set<Double> percentiles;
private long sum;
private long count;
private long min;
private long max;
private transient Optional<UpdateDoublesSketch> sketch;

public abstract long max();
/**
 * Creates an empty {@link DistributionData} that additionally tracks custom percentiles.
 *
 * @param percentiles percentiles to report; {@link #percentiles()} divides each one by 100 before
 *     querying the sketch
 * @return a distribution with zero sum and count, and min/max at their sentinel extremes
 */
public static DistributionData withPercentiles(Set<Double> percentiles) {
  final long initialMin = Long.MAX_VALUE;
  final long initialMax = Long.MIN_VALUE;
  return new DistributionData(0L, 0L, initialMin, initialMax, percentiles);
}

public static final DistributionData EMPTY = create(0, 0, Long.MAX_VALUE, Long.MIN_VALUE);
/**
 * Backward compatible static factory method: creates an empty distribution that tracks no
 * percentiles.
 *
 * @return a new instance with zero sum and count, and min/max at their sentinel extremes
 */
public static DistributionData empty() {
  final Set<Double> noPercentiles = ImmutableSet.of();
  return new DistributionData(0L, 0L, Long.MAX_VALUE, Long.MIN_VALUE, noPercentiles);
}

/** Static factory method primarily used for testing. */
@VisibleForTesting
public static DistributionData create(long sum, long count, long min, long max) {
return new AutoValue_DistributionData(sum, count, min, max);
return new DistributionData(sum, count, min, max, ImmutableSet.of());
}

/**
 * Creates a distribution seeded with the given aggregates.
 *
 * <p>A quantile sketch is allocated only when at least one percentile is requested; otherwise the
 * sketch stays absent and only sum/count/min/max are tracked.
 *
 * @param sum initial sum of the recorded values
 * @param count initial number of recorded values
 * @param min initial minimum
 * @param max initial maximum
 * @param percentiles percentiles to track; copied, so later changes to the argument are not seen
 */
private DistributionData(long sum, long count, long min, long max, Set<Double> percentiles) {
  this.sum = sum;
  this.count = count;
  this.min = min;
  this.max = max;
  // Take an immutable snapshot rather than keeping the caller's reference: reset() and
  // readObject() both decide whether a sketch exists from this set, so it must not change after
  // construction, and it is written out by default serialization.
  this.percentiles = ImmutableSet.copyOf(percentiles);
  if (this.percentiles.isEmpty()) {
    this.sketch = Optional.empty();
  } else {
    this.sketch = Optional.of(new DoublesSketchBuilder().setK(SKETCH_SUMMARY_SIZE).build());
  }
}

public static DistributionData singleton(long value) {
return create(value, 1, value, value);
final DistributionData distributionData = empty();
distributionData.update(value);
return distributionData;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////
// Getters

/** Returns the sum of all values recorded so far. */
public long sum() {
  return this.sum;
}

/** Returns the number of values recorded so far. */
public long count() {
  return this.count;
}

/** Returns the smallest value recorded so far. */
public long min() {
  return this.min;
}

public DistributionData combine(DistributionData value) {
return create(
sum() + value.sum(),
count() + value.count(),
Math.min(value.min(), min()),
Math.max(value.max(), max()));
public long max() {
return max;
}

/**
 * Gets the percentiles and the percentile values as a map.
 *
 * <p>Keys are the percentiles exactly as they were requested. They are deliberately not rebuilt
 * from the fractional ranks, because {@code (p / 100) * 100} is not an identity in floating point
 * (for example {@code 7.0} comes back as {@code 7.000000000000001}), which would make lookups by
 * the requested percentile miss.
 *
 * @return map from requested percentile to the sketch's estimate, or an empty map when no sketch
 *     is present or the sketch has not seen any value
 */
public Map<Double, Double> percentiles() {
  if (!sketch.isPresent() || sketch.get().getN() == 0) {
    // if the sketch is not present or is empty, do not compute the percentile
    return ImmutableMap.of();
  }

  final double[] requested = percentiles.stream().mapToDouble(Double::doubleValue).toArray();
  // The sketch is queried with fractional ranks, so scale each percentile down by 100.
  final double[] fractions = new double[requested.length];
  for (int k = 0; k < requested.length; k++) {
    fractions[k] = requested[k] / 100;
  }
  final double[] quantileResults = sketch.get().getQuantiles(fractions);

  final ImmutableMap.Builder<Double, Double> resultBuilder = ImmutableMap.builder();
  for (int k = 0; k < requested.length; k++) {
    resultBuilder.put(requested[k], quantileResults[k]);
  }
  return resultBuilder.build();
}

////////////////////////////////////////////////////////////////////////////////////////////////////////

/**
 * Records a single value in this distribution. For percentiles, the value is only added to the
 * sketch; the percentiles themselves are computed prior to calling {@link
 * DistributionCell#getCumulative()} or in {@link #extractResult()}.
 *
 * @param value value to update the distribution with.
 */
public void update(long value) {
  count++;
  if (value < min) {
    min = value;
  }
  if (value > max) {
    max = value;
  }
  sum += value;
  if (sketch.isPresent()) {
    sketch.get().update(value);
  }
}

/**
 * Merges {@code other} into this distribution.
 *
 * <p>This instance is modified in place and returned; {@code other} is left untouched.
 *
 * @param other distribution whose aggregates and sketch contents are folded into this one
 * @return this instance, after the merge
 */
public DistributionData combine(DistributionData other) {
  if (other.sketch.isPresent() && other.sketch.get().getN() > 0) {
    // Size the union like the sketches themselves. The builder's default maxK is smaller than
    // SKETCH_SUMMARY_SIZE and would down-sample the merged sketch, losing accuracy.
    final DoublesUnion union = new DoublesUnionBuilder().setMaxK(SKETCH_SUMMARY_SIZE).build();
    if (sketch.isPresent() && sketch.get().getN() > 0) {
      union.update(sketch.get());
    }
    union.update(other.sketch.get());
    // Always go through the union, even when this side has nothing to contribute, so that the two
    // distributions never share one mutable sketch instance.
    sketch = Optional.of(union.getResult());
  }
  sum += other.sum;
  count += other.count;
  max = Math.max(max, other.max);
  min = Math.min(min, other.min);
  return this;
}

/**
 * Restores this distribution to its empty state: zero sum and count, min/max back at their
 * sentinel extremes, and a fresh sketch when percentiles are tracked.
 *
 * @return this instance, after the reset
 */
public DistributionData reset() {
  sum = 0L;
  count = 0L;
  min = Long.MAX_VALUE;
  max = Long.MIN_VALUE;
  if (percentiles.isEmpty()) {
    sketch = Optional.empty();
    return this;
  }
  // Percentiles are tracked: start over with a new, empty sketch of the same size.
  sketch = Optional.of(new DoublesSketchBuilder().setK(SKETCH_SUMMARY_SIZE).build());
  return this;
}

/** Generates DistributionResult from DistributionData. */
public DistributionResult extractResult() {
return DistributionResult.create(sum(), count(), min(), max());
return DistributionResult.create(sum(), count(), min(), max(), percentiles());
}

/**
 * Two distributions are equal when their sum, count, min, max and computed percentile values all
 * match.
 */
@Override
public boolean equals(Object object) {
  if (!(object instanceof DistributionData)) {
    return false;
  }
  final DistributionData that = (DistributionData) object;
  return max == that.max()
      && min == that.min()
      && count == that.count()
      && sum == that.sum()
      && Objects.equals(percentiles(), that.percentiles());
}

/** Hashes the same components that {@link #equals(Object)} compares. */
@Override
public int hashCode() {
  final Map<Double, Double> percentileValues = percentiles();
  return Objects.hash(min, max, sum, count, percentileValues);
}

/** Renders the aggregates and the currently computed percentile values for debugging. */
@Override
public String toString() {
  final StringBuilder text = new StringBuilder("DistributionData{");
  text.append("sum=").append(sum).append(", ");
  text.append("count=").append(count).append(", ");
  text.append("min=").append(min).append(", ");
  text.append("max=").append(max).append(", ");
  text.append("percentiles=").append(percentiles());
  text.append("}");
  return text.toString();
}

/**
 * Custom serialization hook: writes the non-transient fields and then, if a sketch is present,
 * its length-prefixed byte form (the sketch field itself is transient).
 */
private void writeObject(ObjectOutputStream out) throws IOException {
  out.defaultWriteObject();
  if (!sketch.isPresent()) {
    return;
  }
  final byte[] serializedSketch = sketch.get().toByteArray();
  out.writeInt(serializedSketch.length);
  out.write(serializedSketch);
}

/**
 * Custom deserialization hook: restores the non-transient fields and then rebuilds the transient
 * sketch from its length-prefixed byte form when percentiles are tracked.
 *
 * @throws IOException if the stream ends before the whole sketch has been read, or on any other
 *     I/O failure
 * @throws ClassNotFoundException if a class of a serialized field cannot be found
 */
private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
  in.defaultReadObject();
  if (this.percentiles.isEmpty()) {
    this.sketch = Optional.empty();
    return;
  }
  final int len = in.readInt();
  final byte[] bytes = new byte[len];
  // read(byte[]) may return after filling only part of the buffer; readFully blocks until all
  // len bytes have arrived or fails with EOFException, so a truncated sketch is never heapified.
  in.readFully(bytes);
  this.sketch = Optional.of(UpdateDoublesSketch.heapify(Memory.wrap(bytes)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.metrics;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.Set;
import org.apache.beam.sdk.metrics.MetricName;

/**
 * Value class pairing a {@link MetricName} with the percentiles requested for it. {@link
 * MetricsContainerImpl} uses a map of this key and the Distribution Metric.
 */
@AutoValue
public abstract class DistributionMetricKey implements Serializable {

  /**
   * Builds a key from a metric name and the percentiles to report for that metric.
   *
   * @param metricName name of the distribution metric
   * @param percentiles percentiles associated with the metric
   */
  public static DistributionMetricKey create(MetricName metricName, Set<Double> percentiles) {
    return new AutoValue_DistributionMetricKey(metricName, percentiles);
  }

  /** Returns the name of the metric this key identifies. */
  public abstract MetricName getMetricName();

  /** Returns the percentiles associated with the metric. */
  public abstract Set<Double> getPercentiles();
}
Loading

0 comments on commit 9459eef

Please sign in to comment.