Skip to content

Commit

Permalink
[signalfxregistry]: Add option to send delta histogram count buckets.
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Aug 11, 2022
1 parent fcb005d commit e5a38e9
Show file tree
Hide file tree
Showing 8 changed files with 338 additions and 9 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,8 @@ subprojects {
'io.micrometer.core.instrument.LongTaskTimer#record(java.util.function.BooleanSupplier)',
'io.micrometer.core.instrument.LongTaskTimer#record(java.util.function.IntSupplier)',
'io.micrometer.core.instrument.LongTaskTimer#record(java.util.function.LongSupplier)',
'io.micrometer.core.instrument.LongTaskTimer#record(java.util.function.DoubleSupplier)'
'io.micrometer.core.instrument.LongTaskTimer#record(java.util.function.DoubleSupplier)',
'io.micrometer.signalfx.SignalFxConfig#publishDeltaHistogram()'
]
onlyIf { compatibleVersion != 'SKIP' }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2022 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micrometer.signalfx;

import io.micrometer.common.lang.Nullable;
import io.micrometer.core.instrument.distribution.CountAtBucket;

final class DeltaHistogramCounts {

@Nullable
private CountAtBucket[] lastHistogramCounts;

CountAtBucket[] calculate(CountAtBucket[] currentHistogramCounts) {
if (lastHistogramCounts == null || lastHistogramCounts.length == 0) {
lastHistogramCounts = currentHistogramCounts;
return currentHistogramCounts;
}

CountAtBucket[] retHistogramCounts = new CountAtBucket[currentHistogramCounts.length];
for (int i = 0; i < currentHistogramCounts.length; i++) {
retHistogramCounts[i] = new CountAtBucket(currentHistogramCounts[i].bucket(),
currentHistogramCounts[i].count() - lastHistogramCounts[i].count());
}
lastHistogramCounts = currentHistogramCounts;
return retHistogramCounts;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ default String accessToken() {
}

/**
* If both "publishCumulativeHistogram" and "publishDeltaHistogram" are set, then
* delta will be used.
* @return {@code true} if the SignalFx registry should emit cumulative histogram
* buckets.
* @since 1.9.0
Expand All @@ -51,6 +53,16 @@ default boolean publishCumulativeHistogram() {
return getBoolean(this, "publishCumulativeHistogram").orElse(false);
}

/**
* If both "publishCumulativeHistogram" and "publishDeltaHistogram" are set, then
* delta will be used.
* @return {@code true} if the SignalFx registry should emit delta histogram buckets.
* @since 1.10.0
*/
default boolean publishDeltaHistogram() {
return getBoolean(this, "publishDeltaHistogram").orElse(false);
}

/**
* @return The URI to ship metrics to. If you need to publish metrics to an internal
* proxy en route to SignalFx, you can define the location of the proxy with this.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class SignalFxMeterRegistry extends StepMeterRegistry {

private final boolean publishCumulativeHistogram;

private final boolean publishDeltaHistogram;

public SignalFxMeterRegistry(SignalFxConfig config, Clock clock) {
this(config, clock, DEFAULT_THREAD_FACTORY);
}
Expand All @@ -93,6 +95,7 @@ else if ("https".equals(apiUri.getScheme())) {
this.dataPointReceiverFactory = new HttpDataPointProtobufReceiverFactory(signalFxEndpoint);
this.eventReceiverFactory = new HttpEventProtobufReceiverFactory(signalFxEndpoint);
this.publishCumulativeHistogram = config.publishCumulativeHistogram();
this.publishDeltaHistogram = config.publishDeltaHistogram();

config().namingConvention(new SignalFxNamingConvention());

Expand Down Expand Up @@ -136,23 +139,23 @@ protected void publish() {
@Override
protected Timer newTimer(Meter.Id id, DistributionStatisticConfig distributionStatisticConfig,
PauseDetector pauseDetector) {
if (!publishCumulativeHistogram) {
if (!publishCumulativeHistogram && !publishDeltaHistogram) {
return super.newTimer(id, distributionStatisticConfig, pauseDetector);
}
Timer timer = new SignalfxTimer(id, clock, distributionStatisticConfig, pauseDetector, getBaseTimeUnit(),
config.step().toMillis());
config.step().toMillis(), publishDeltaHistogram);
HistogramGauges.registerWithCommonFormat(timer, this);
return timer;
}

@Override
protected DistributionSummary newDistributionSummary(Meter.Id id,
DistributionStatisticConfig distributionStatisticConfig, double scale) {
if (!publishCumulativeHistogram) {
if (!publishCumulativeHistogram && !publishDeltaHistogram) {
return super.newDistributionSummary(id, distributionStatisticConfig, scale);
}
DistributionSummary summary = new SignalfxDistributionSummary(id, clock, distributionStatisticConfig, scale,
config.step().toMillis());
config.step().toMillis(), publishDeltaHistogram);
HistogramGauges.registerWithCommonFormat(summary, this);
return summary;
}
Expand Down Expand Up @@ -208,6 +211,10 @@ Stream<SignalFxProtocolBuffers.DataPoint.Builder> addTimeGauge(TimeGauge timeGau
}

Stream<SignalFxProtocolBuffers.DataPoint.Builder> addGauge(Gauge gauge) {
if (publishDeltaHistogram && gauge.getId().syntheticAssociation() != null
&& gauge.getId().getName().endsWith(".histogram")) {
return Stream.of(addDatapoint(gauge, COUNTER, null, gauge.value()));
}
if (publishCumulativeHistogram && gauge.getId().syntheticAssociation() != null
&& gauge.getId().getName().endsWith(".histogram")) {
return Stream.of(addDatapoint(gauge, CUMULATIVE_COUNTER, null, gauge.value()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/
package io.micrometer.signalfx;

import io.micrometer.common.lang.Nullable;
import io.micrometer.core.instrument.AbstractDistributionSummary;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.HistogramSnapshot;
import io.micrometer.core.instrument.distribution.TimeWindowMax;
import io.micrometer.core.instrument.step.StepTuple2;

Expand All @@ -44,11 +46,20 @@ final class SignalfxDistributionSummary extends AbstractDistributionSummary {

private final TimeWindowMax max;

@Nullable
private final DeltaHistogramCounts deltaHistogramCounts;

SignalfxDistributionSummary(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig,
double scale, long stepMillis) {
double scale, long stepMillis, boolean isDelta) {
super(id, clock, CumulativeHistogramConfigUtil.updateConfig(distributionStatisticConfig), scale, false);
this.countTotal = new StepTuple2<>(clock, stepMillis, 0L, 0.0, count::sumThenReset, total::sumThenReset);
max = new TimeWindowMax(clock, distributionStatisticConfig);
if (isDelta) {
deltaHistogramCounts = new DeltaHistogramCounts();
}
else {
deltaHistogramCounts = null;
}
}

@Override
Expand All @@ -73,4 +84,20 @@ public double max() {
return max.poll();
}

@Override
public HistogramSnapshot takeSnapshot() {
HistogramSnapshot currentSnapshot = super.takeSnapshot();
if (deltaHistogramCounts == null) {
return currentSnapshot;
}
return new HistogramSnapshot(currentSnapshot.count(), // Already delta in sfx
// implementation.
currentSnapshot.total(), // Already delta in sfx implementation.
currentSnapshot.max(), // Max cannot be calculated as delta, keep the
// current.
currentSnapshot.percentileValues(), // No changes to the percentile
// values.
deltaHistogramCounts.calculate(currentSnapshot.histogramCounts()), currentSnapshot::outputSummary);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/
package io.micrometer.signalfx;

import io.micrometer.common.lang.Nullable;
import io.micrometer.core.instrument.AbstractTimer;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.HistogramSnapshot;
import io.micrometer.core.instrument.distribution.TimeWindowMax;
import io.micrometer.core.instrument.distribution.pause.PauseDetector;
import io.micrometer.core.instrument.step.StepTuple2;
Expand Down Expand Up @@ -45,12 +47,22 @@ final class SignalfxTimer extends AbstractTimer {

private final TimeWindowMax max;

@Nullable
private final DeltaHistogramCounts deltaHistogramCounts;

SignalfxTimer(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig,
PauseDetector pauseDetector, TimeUnit baseTimeUnit, long stepMillis) {
PauseDetector pauseDetector, TimeUnit baseTimeUnit, long stepMillis, boolean isDelta) {
super(id, clock, CumulativeHistogramConfigUtil.updateConfig(distributionStatisticConfig), pauseDetector,
baseTimeUnit, false);
countTotal = new StepTuple2<>(clock, stepMillis, 0L, 0L, count::sumThenReset, total::sumThenReset);
max = new TimeWindowMax(clock, distributionStatisticConfig);
if (!distributionStatisticConfig.isPublishingPercentiles()
&& distributionStatisticConfig.isPublishingHistogram() && isDelta) {
deltaHistogramCounts = new DeltaHistogramCounts();
}
else {
deltaHistogramCounts = null;
}
}

@Override
Expand All @@ -76,4 +88,19 @@ public double max(TimeUnit unit) {
return max.poll(unit);
}

@Override
public HistogramSnapshot takeSnapshot() {
HistogramSnapshot currentSnapshot = super.takeSnapshot();
if (deltaHistogramCounts == null) {
return currentSnapshot;
}
return new HistogramSnapshot(currentSnapshot.count(), // Already delta in sfx
// implementation
currentSnapshot.total(), // Already delta in sfx implementation
currentSnapshot.max(), // Max cannot be calculated as delta, keep the
// current.
null, // No percentile values
deltaHistogramCounts.calculate(currentSnapshot.histogramCounts()), currentSnapshot::outputSummary);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2022 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.micrometer.signalfx;

import io.micrometer.core.instrument.distribution.CountAtBucket;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class DeltaHistogramCountsTest {

@Test
void empty() {
DeltaHistogramCounts deltaHistogramCounts = new DeltaHistogramCounts();
assertThat(deltaHistogramCounts.calculate(new CountAtBucket[] {})).isEmpty();
assertThat(deltaHistogramCounts.calculate(new CountAtBucket[] {})).isEmpty();
}

@Test
void nonEmpty() {
DeltaHistogramCounts deltaHistogramCounts = new DeltaHistogramCounts();
CountAtBucket[] first = new CountAtBucket[] { new CountAtBucket(1.0, 0), new CountAtBucket(5.0, 1),
new CountAtBucket(Double.MAX_VALUE, 1) };
assertThat(deltaHistogramCounts.calculate(first)).isEqualTo(first);
CountAtBucket[] second = new CountAtBucket[] { new CountAtBucket(1.0, 0), new CountAtBucket(5.0, 2),
new CountAtBucket(Double.MAX_VALUE, 3) };
assertThat(deltaHistogramCounts.calculate(second)).isEqualTo(new CountAtBucket[] { new CountAtBucket(1.0, 0),
new CountAtBucket(5.0, 1), new CountAtBucket(Double.MAX_VALUE, 2) });
}

}
Loading

0 comments on commit e5a38e9

Please sign in to comment.