Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[signalfxregistry]: Add option to send delta histogram count buckets. #3350

Merged
merged 1 commit into from
Aug 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,8 @@ subprojects {
'io.micrometer.core.instrument.LongTaskTimer#record(java.util.function.BooleanSupplier)',
'io.micrometer.core.instrument.LongTaskTimer#record(java.util.function.IntSupplier)',
'io.micrometer.core.instrument.LongTaskTimer#record(java.util.function.LongSupplier)',
'io.micrometer.core.instrument.LongTaskTimer#record(java.util.function.DoubleSupplier)'
'io.micrometer.core.instrument.LongTaskTimer#record(java.util.function.DoubleSupplier)',
'io.micrometer.signalfx.SignalFxConfig#publishDeltaHistogram()'
]
onlyIf { compatibleVersion != 'SKIP' }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2022 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micrometer.signalfx;

import io.micrometer.common.lang.Nullable;
import io.micrometer.core.instrument.distribution.CountAtBucket;

final class DeltaHistogramCounts {

@Nullable
private CountAtBucket[] lastHistogramCounts;

CountAtBucket[] calculate(CountAtBucket[] currentHistogramCounts) {
if (lastHistogramCounts == null || lastHistogramCounts.length == 0) {
lastHistogramCounts = currentHistogramCounts;
return currentHistogramCounts;
}

CountAtBucket[] retHistogramCounts = new CountAtBucket[currentHistogramCounts.length];
for (int i = 0; i < currentHistogramCounts.length; i++) {
retHistogramCounts[i] = new CountAtBucket(currentHistogramCounts[i].bucket(),
currentHistogramCounts[i].count() - lastHistogramCounts[i].count());
}
lastHistogramCounts = currentHistogramCounts;
return retHistogramCounts;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ default String accessToken() {
}

/**
* If both "publishCumulativeHistogram" and "publishDeltaHistogram" are set, then
* delta will be used.
* @return {@code true} if the SignalFx registry should emit cumulative histogram
* buckets.
* @since 1.9.0
Expand All @@ -51,6 +53,16 @@ default boolean publishCumulativeHistogram() {
return getBoolean(this, "publishCumulativeHistogram").orElse(false);
}

/**
* If both "publishCumulativeHistogram" and "publishDeltaHistogram" are set, then
* delta will be used.
* @return {@code true} if the SignalFx registry should emit delta histogram buckets.
* @since 1.10.0
*/
default boolean publishDeltaHistogram() {
return getBoolean(this, "publishDeltaHistogram").orElse(false);
}

/**
* @return The URI to ship metrics to. If you need to publish metrics to an internal
* proxy en route to SignalFx, you can define the location of the proxy with this.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class SignalFxMeterRegistry extends StepMeterRegistry {

private final boolean publishCumulativeHistogram;

private final boolean publishDeltaHistogram;

public SignalFxMeterRegistry(SignalFxConfig config, Clock clock) {
this(config, clock, DEFAULT_THREAD_FACTORY);
}
Expand All @@ -93,6 +95,7 @@ else if ("https".equals(apiUri.getScheme())) {
this.dataPointReceiverFactory = new HttpDataPointProtobufReceiverFactory(signalFxEndpoint);
this.eventReceiverFactory = new HttpEventProtobufReceiverFactory(signalFxEndpoint);
this.publishCumulativeHistogram = config.publishCumulativeHistogram();
this.publishDeltaHistogram = config.publishDeltaHistogram();

config().namingConvention(new SignalFxNamingConvention());

Expand Down Expand Up @@ -136,23 +139,23 @@ protected void publish() {
@Override
protected Timer newTimer(Meter.Id id, DistributionStatisticConfig distributionStatisticConfig,
PauseDetector pauseDetector) {
if (!publishCumulativeHistogram) {
if (!publishCumulativeHistogram && !publishDeltaHistogram) {
return super.newTimer(id, distributionStatisticConfig, pauseDetector);
}
Timer timer = new SignalfxTimer(id, clock, distributionStatisticConfig, pauseDetector, getBaseTimeUnit(),
config.step().toMillis());
config.step().toMillis(), publishDeltaHistogram);
HistogramGauges.registerWithCommonFormat(timer, this);
return timer;
}

@Override
protected DistributionSummary newDistributionSummary(Meter.Id id,
DistributionStatisticConfig distributionStatisticConfig, double scale) {
if (!publishCumulativeHistogram) {
if (!publishCumulativeHistogram && !publishDeltaHistogram) {
return super.newDistributionSummary(id, distributionStatisticConfig, scale);
}
DistributionSummary summary = new SignalfxDistributionSummary(id, clock, distributionStatisticConfig, scale,
config.step().toMillis());
config.step().toMillis(), publishDeltaHistogram);
HistogramGauges.registerWithCommonFormat(summary, this);
return summary;
}
Expand Down Expand Up @@ -208,6 +211,10 @@ Stream<SignalFxProtocolBuffers.DataPoint.Builder> addTimeGauge(TimeGauge timeGau
}

Stream<SignalFxProtocolBuffers.DataPoint.Builder> addGauge(Gauge gauge) {
if (publishDeltaHistogram && gauge.getId().syntheticAssociation() != null
&& gauge.getId().getName().endsWith(".histogram")) {
return Stream.of(addDatapoint(gauge, COUNTER, null, gauge.value()));
}
if (publishCumulativeHistogram && gauge.getId().syntheticAssociation() != null
&& gauge.getId().getName().endsWith(".histogram")) {
return Stream.of(addDatapoint(gauge, CUMULATIVE_COUNTER, null, gauge.value()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/
package io.micrometer.signalfx;

import io.micrometer.common.lang.Nullable;
import io.micrometer.core.instrument.AbstractDistributionSummary;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.HistogramSnapshot;
import io.micrometer.core.instrument.distribution.TimeWindowMax;
import io.micrometer.core.instrument.step.StepTuple2;

Expand All @@ -44,11 +46,20 @@ final class SignalfxDistributionSummary extends AbstractDistributionSummary {

private final TimeWindowMax max;

@Nullable
private final DeltaHistogramCounts deltaHistogramCounts;

SignalfxDistributionSummary(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig,
double scale, long stepMillis) {
double scale, long stepMillis, boolean isDelta) {
super(id, clock, CumulativeHistogramConfigUtil.updateConfig(distributionStatisticConfig), scale, false);
this.countTotal = new StepTuple2<>(clock, stepMillis, 0L, 0.0, count::sumThenReset, total::sumThenReset);
max = new TimeWindowMax(clock, distributionStatisticConfig);
if (isDelta) {
deltaHistogramCounts = new DeltaHistogramCounts();
}
else {
deltaHistogramCounts = null;
}
}

@Override
Expand All @@ -73,4 +84,20 @@ public double max() {
return max.poll();
}

@Override
public HistogramSnapshot takeSnapshot() {
HistogramSnapshot currentSnapshot = super.takeSnapshot();
if (deltaHistogramCounts == null) {
return currentSnapshot;
}
return new HistogramSnapshot(currentSnapshot.count(), // Already delta in sfx
// implementation.
currentSnapshot.total(), // Already delta in sfx implementation.
currentSnapshot.max(), // Max cannot be calculated as delta, keep the
// current.
currentSnapshot.percentileValues(), // No changes to the percentile
// values.
deltaHistogramCounts.calculate(currentSnapshot.histogramCounts()), currentSnapshot::outputSummary);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/
package io.micrometer.signalfx;

import io.micrometer.common.lang.Nullable;
import io.micrometer.core.instrument.AbstractTimer;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.HistogramSnapshot;
import io.micrometer.core.instrument.distribution.TimeWindowMax;
import io.micrometer.core.instrument.distribution.pause.PauseDetector;
import io.micrometer.core.instrument.step.StepTuple2;
Expand Down Expand Up @@ -45,12 +47,22 @@ final class SignalfxTimer extends AbstractTimer {

private final TimeWindowMax max;

@Nullable
private final DeltaHistogramCounts deltaHistogramCounts;

SignalfxTimer(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig,
PauseDetector pauseDetector, TimeUnit baseTimeUnit, long stepMillis) {
PauseDetector pauseDetector, TimeUnit baseTimeUnit, long stepMillis, boolean isDelta) {
super(id, clock, CumulativeHistogramConfigUtil.updateConfig(distributionStatisticConfig), pauseDetector,
baseTimeUnit, false);
countTotal = new StepTuple2<>(clock, stepMillis, 0L, 0L, count::sumThenReset, total::sumThenReset);
max = new TimeWindowMax(clock, distributionStatisticConfig);
if (!distributionStatisticConfig.isPublishingPercentiles()
&& distributionStatisticConfig.isPublishingHistogram() && isDelta) {
deltaHistogramCounts = new DeltaHistogramCounts();
}
else {
deltaHistogramCounts = null;
}
}

@Override
Expand All @@ -76,4 +88,19 @@ public double max(TimeUnit unit) {
return max.poll(unit);
}

@Override
public HistogramSnapshot takeSnapshot() {
HistogramSnapshot currentSnapshot = super.takeSnapshot();
if (deltaHistogramCounts == null) {
return currentSnapshot;
}
return new HistogramSnapshot(currentSnapshot.count(), // Already delta in sfx
// implementation
currentSnapshot.total(), // Already delta in sfx implementation
currentSnapshot.max(), // Max cannot be calculated as delta, keep the
// current.
null, // No percentile values
deltaHistogramCounts.calculate(currentSnapshot.histogramCounts()), currentSnapshot::outputSummary);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2022 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.micrometer.signalfx;

import io.micrometer.core.instrument.distribution.CountAtBucket;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class DeltaHistogramCountsTest {

@Test
void empty() {
DeltaHistogramCounts deltaHistogramCounts = new DeltaHistogramCounts();
assertThat(deltaHistogramCounts.calculate(new CountAtBucket[] {})).isEmpty();
assertThat(deltaHistogramCounts.calculate(new CountAtBucket[] {})).isEmpty();
}

@Test
void nonEmpty() {
DeltaHistogramCounts deltaHistogramCounts = new DeltaHistogramCounts();
CountAtBucket[] first = new CountAtBucket[] { new CountAtBucket(1.0, 0), new CountAtBucket(5.0, 1),
new CountAtBucket(Double.MAX_VALUE, 1) };
assertThat(deltaHistogramCounts.calculate(first)).isEqualTo(first);
CountAtBucket[] second = new CountAtBucket[] { new CountAtBucket(1.0, 0), new CountAtBucket(5.0, 2),
new CountAtBucket(Double.MAX_VALUE, 3) };
assertThat(deltaHistogramCounts.calculate(second)).isEqualTo(new CountAtBucket[] { new CountAtBucket(1.0, 0),
new CountAtBucket(5.0, 1), new CountAtBucket(Double.MAX_VALUE, 2) });
}

}
Loading