Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish relative to PushMeterRegistry initialization time and align StepMeter boundaries to that #3450

Closed
Original file line number Diff line number Diff line change
Expand Up @@ -441,28 +441,30 @@ public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
void shouldNotExportCumulativeHistogramDataByDefault_Timer() {
MockClock mockClock = new MockClock();
SignalFxMeterRegistry registry = new SignalFxMeterRegistry(config, mockClock);
Timer timer = Timer.builder("my.timer")
.serviceLevelObjectives(Duration.ofMillis(1), Duration.ofMillis(10), Duration.ofMillis(100),
Duration.ofMillis(1000))
.distributionStatisticExpiry(Duration.ofSeconds(10)).distributionStatisticBufferLength(1)
.register(registry);
Timer timer = Timer.builder("my.timer").serviceLevelObjectives(Duration.ofMillis(1), Duration.ofMillis(10),
Duration.ofMillis(100), Duration.ofMillis(1000)).register(registry);

timer.record(50, TimeUnit.MILLISECONDS);
timer.record(5000, TimeUnit.MILLISECONDS);
mockClock.add(config.step());
getDataPoints(registry, mockClock.wallTime());

// histogram data recorded at the beginning of the step would be rotated out by
// the time of publish
// record in the middle of the step to avoid this issue
mockClock.add(config.step().dividedBy(2));
timer.record(5, TimeUnit.MILLISECONDS);
timer.record(500, TimeUnit.MILLISECONDS);
mockClock.add(config.step().minus(Duration.ofMillis(1)));

mockClock.add(config.step().dividedBy(2));

assertThat(getDataPoints(registry, mockClock.wallTime())).hasSize(8)
.has(gaugePoint("my.timer.avg", 0.2525), atIndex(0)).has(counterPoint("my.timer.count", 2), atIndex(1))
.has(allOf(gaugePoint("my.timer.histogram", 0), bucket(Duration.ofMillis(1))), atIndex(2))
.has(allOf(gaugePoint("my.timer.histogram", 1), bucket(Duration.ofMillis(10))), atIndex(3))
.has(allOf(gaugePoint("my.timer.histogram", 1), bucket(Duration.ofMillis(100))), atIndex(4))
.has(allOf(gaugePoint("my.timer.histogram", 2), bucket(Duration.ofMillis(1000))), atIndex(5))
.has(gaugePoint("my.timer.max", 0.5), atIndex(6))
.has(gaugePoint("my.timer.max", 5), atIndex(6))
.has(counterPoint("my.timer.totalTime", 0.505), atIndex(7));

registry.close();
Expand All @@ -473,17 +475,20 @@ void shouldNotExportCumulativeHistogramDataByDefault_DistributionSummary() {
MockClock mockClock = new MockClock();
SignalFxMeterRegistry registry = new SignalFxMeterRegistry(config, mockClock);
DistributionSummary summary = DistributionSummary.builder("my.distribution")
.serviceLevelObjectives(1, 10, 100, 1000).distributionStatisticExpiry(Duration.ofSeconds(10))
.distributionStatisticBufferLength(1).register(registry);
.serviceLevelObjectives(1, 10, 100, 1000).register(registry);

summary.record(50);
summary.record(5000);
mockClock.add(config.step());
getDataPoints(registry, mockClock.wallTime());

// histogram data recorded at the beginning of the step would be rotated out by
// the time of publish
// record in the middle of the step to avoid this issue
mockClock.add(config.step().dividedBy(2));
summary.record(5);
summary.record(500);
mockClock.add(config.step().minus(Duration.ofMillis(1)));
mockClock.add(config.step().dividedBy(2));

assertThat(getDataPoints(registry, mockClock.wallTime())).hasSize(8)
.has(gaugePoint("my.distribution.avg", 252.5), atIndex(0))
Expand All @@ -492,7 +497,7 @@ void shouldNotExportCumulativeHistogramDataByDefault_DistributionSummary() {
.has(allOf(gaugePoint("my.distribution.histogram", 1), bucket(10)), atIndex(3))
.has(allOf(gaugePoint("my.distribution.histogram", 1), bucket(100)), atIndex(4))
.has(allOf(gaugePoint("my.distribution.histogram", 2), bucket(1000)), atIndex(5))
.has(gaugePoint("my.distribution.max", 500), atIndex(6))
.has(gaugePoint("my.distribution.max", 5000), atIndex(6))
.has(counterPoint("my.distribution.totalTime", 505), atIndex(7));

registry.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public abstract class PushMeterRegistry extends MeterRegistry {

private final PushRegistryConfig config;

private final long registryCreationOffsetFromEpochStepMillis;

@Nullable
private ScheduledExecutorService scheduledExecutorService;

Expand All @@ -42,6 +44,7 @@ protected PushMeterRegistry(PushRegistryConfig config, Clock clock) {
config.requireValid();

this.config = config;
this.registryCreationOffsetFromEpochStepMillis = clock.wallTime() % this.config.step().toMillis();
}

protected abstract void publish();
Expand Down Expand Up @@ -77,12 +80,32 @@ public void start(ThreadFactory threadFactory) {
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
// time publication to happen just after StepValue finishes the step
long stepMillis = config.step().toMillis();
long initialDelayMillis = stepMillis - (clock.wallTime() % stepMillis) + 1;
long initialDelayMillis = calculateInitialDelayMillis();

scheduledExecutorService.scheduleAtFixedRate(this::publishSafely, initialDelayMillis, stepMillis,
TimeUnit.MILLISECONDS);
}
}

// VisibleForTesting
/**
 * Computes the delay before the first scheduled publish.
 * <p>
 * The delay is the time left in the current step, where steps are measured from
 * {@link #getPushOffsetFromEpochStepMillis()} rather than from the raw wall time,
 * plus one extra millisecond.
 * @return initial delay in milliseconds
 */
long calculateInitialDelayMillis() {
    final long stepLengthMillis = config.step().toMillis();
    // Position of "now" inside the current step, measured from the offset boundary.
    final long elapsedInStepMillis = (clock.wallTime() - getPushOffsetFromEpochStepMillis()) % stepLengthMillis;
    final long remainingInStepMillis = stepLengthMillis - elapsedInStepMillis;
    // One millisecond past the boundary so publishing happens after the step rolls over.
    return remainingInStepMillis + 1;
}

/**
* Publishing may be aligned globally (for example, offset 0 from step intervals
* starting at the Unix Epoch) with a fixed offset such that all application instances
* publish metrics at the same point in time, or it may be dynamic for each
* application instance relative to when the registry was created. This value is given
* in milliseconds as the offset from the Unix Epoch-based step intervals.
* @return how many milliseconds publishing is offset from Unix Epoch-based step
* intervals
*/
protected long getPushOffsetFromEpochStepMillis() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the name of this method and added a JavaDoc in hopes of making things more clear.

return config.isPushAlignedGlobally() ? 0 : registryCreationOffsetFromEpochStepMillis;
}

public void stop() {
if (scheduledExecutorService != null) {
scheduledExecutorService.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,22 @@ default int batchSize() {
return getInteger(this, "batchSize").orElse(10000);
}

/**
* This controls when the call to {@link PushMeterRegistry#publish()} is scheduled. If
* this returns true, publishing is scheduled at the same time relative to the Unix
* Epoch regardless of when the registry was created. This has the effect of causing
* all application instances publishing metrics with the same configuration to publish
* at the same point in time globally, which may overload resources by concentrating
* the work to publish and ingest metrics from many application instances. The more
* instances you have publishing metrics at the same time, the more of a problem this
* will be.
* @return {@code true} if publishing should be aligned to Unix Epoch-based step
* intervals, {@code false} if publishing should be scheduled relative to registry
* instantiation time. Default is {@code false} to avoid the documented resource
* exhaustion issue.
*/
default boolean isPushAlignedGlobally() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could probably spend endless time coming up with better and better names, but here's a third iteration where I've avoided using "epoch" in the name because I think it will be less clear what this does compared to focusing on the effect of pushes being aligned globally if you set this to true. Anyone who has feedback about the naming, feel free to share (before we release it, ideally).

I've updated the JavaDoc to explain the issue that motivated this configuration being introduced and the reason for its default.

return getBoolean(this, "isPushAlignedGlobally").orElse(false);
}

@Override
default Validated<?> validate() {
return validate(this);
Expand All @@ -105,7 +121,8 @@ static Validated<?> validate(PushRegistryConfig config) {
return checkAll(config, check("step", PushRegistryConfig::step),
check("connectTimeout", PushRegistryConfig::connectTimeout),
check("readTimeout", PushRegistryConfig::readTimeout),
check("batchSize", PushRegistryConfig::batchSize), check("numThreads", PushRegistryConfig::numThreads));
check("batchSize", PushRegistryConfig::batchSize), check("numThreads", PushRegistryConfig::numThreads),
check("isPushAlignedGlobally", PushRegistryConfig::isPushAlignedGlobally));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ public class StepCounter extends AbstractMeter implements Counter {
private final StepDouble value;

/**
 * Creates a step counter without a push offset.
 * <p>
 * Delegates to the four-argument constructor, passing {@code 0} as
 * {@code pushOffsetFromEpochStepMillis}.
 * @param id meter id
 * @param clock clock forwarded to the delegate constructor
 * @param stepMillis step size in milliseconds
 */
public StepCounter(Id id, Clock clock, long stepMillis) {
this(id, clock, stepMillis, 0);
}

public StepCounter(Id id, Clock clock, long stepMillis, long pushOffsetFromEpochStepMillis) {
super(id);
this.value = new StepDouble(clock, stepMillis);
this.value = new StepDouble(clock, stepMillis, pushOffsetFromEpochStepMillis);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,18 @@ public class StepDistributionSummary extends AbstractDistributionSummary {
* @param supportsAggregablePercentiles whether it supports aggregable percentiles
*/
public StepDistributionSummary(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig,
double scale, long stepMillis, boolean supportsAggregablePercentiles) {
double scale, long stepMillis, long pushOffsetFromEpochStepMillis, boolean supportsAggregablePercentiles) {
super(id, clock, distributionStatisticConfig, scale, supportsAggregablePercentiles);
this.countTotal = new StepTuple2<>(clock, stepMillis, 0L, 0.0, count::sumThenReset, total::sumThenReset);
this.countTotal = new StepTuple2<>(clock, stepMillis, pushOffsetFromEpochStepMillis, 0L, 0.0,
count::sumThenReset, total::sumThenReset);
this.max = new TimeWindowMax(clock, distributionStatisticConfig);
}

/**
 * Creates a step distribution summary without a push offset.
 * <p>
 * Delegates to the constructor that accepts {@code pushOffsetFromEpochStepMillis},
 * passing {@code 0} for it; all other arguments are forwarded unchanged.
 * @param id meter id
 * @param clock clock forwarded to the delegate constructor
 * @param distributionStatisticConfig distribution statistic configuration
 * @param scale scale forwarded to the delegate constructor
 * @param stepMillis step size in milliseconds
 * @param supportsAggregablePercentiles whether it supports aggregable percentiles
 */
public StepDistributionSummary(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig,
double scale, long stepMillis, boolean supportsAggregablePercentiles) {
this(id, clock, distributionStatisticConfig, scale, stepMillis, 0, supportsAggregablePercentiles);
}

@Override
protected void recordNonNegative(double amount) {
count.add(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ public class StepDouble extends StepValue<Double> {
private final DoubleAdder current = new DoubleAdder();

public StepDouble(Clock clock, long stepMillis) {
super(clock, stepMillis);
this(clock, stepMillis, 0);
}

/**
 * Creates a step double, forwarding all three arguments to the superclass
 * constructor.
 * @param clock clock forwarded to the superclass
 * @param stepMillis step size in milliseconds
 * @param pushOffsetFromEpochStepMillis offset in milliseconds forwarded to the
 * superclass; NOTE(review): presumably shifts the step boundaries away from
 * Unix Epoch-based intervals - confirm against the superclass implementation
 */
public StepDouble(Clock clock, long stepMillis, long pushOffsetFromEpochStepMillis) {
super(clock, stepMillis, pushOffsetFromEpochStepMillis);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,16 @@ public class StepFunctionCounter<T> extends AbstractMeter implements FunctionCou

private StepDouble count;

public StepFunctionCounter(Id id, Clock clock, long stepMillis, T obj, ToDoubleFunction<T> f) {
public StepFunctionCounter(Id id, Clock clock, long stepMillis, long pushOffsetFromEpochStepMillis, T obj,
ToDoubleFunction<T> f) {
super(id);
this.ref = new WeakReference<>(obj);
this.f = f;
this.count = new StepDouble(clock, stepMillis);
this.count = new StepDouble(clock, stepMillis, pushOffsetFromEpochStepMillis);
}

/**
 * Creates a step function counter without a push offset.
 * <p>
 * Delegates to the constructor that accepts {@code pushOffsetFromEpochStepMillis},
 * passing {@code 0} for it; all other arguments are forwarded unchanged.
 * @param id meter id
 * @param clock clock forwarded to the delegate constructor
 * @param stepMillis step size in milliseconds
 * @param obj object forwarded to the delegate constructor together with {@code f}
 * @param f function forwarded to the delegate constructor
 */
public StepFunctionCounter(Id id, Clock clock, long stepMillis, T obj, ToDoubleFunction<T> f) {
this(id, clock, stepMillis, 0, obj, f);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,23 @@ public class StepFunctionTimer<T> implements FunctionTimer {

private final StepTuple2<Long, Double> countTotal;

public StepFunctionTimer(Id id, Clock clock, long stepMillis, T obj, ToLongFunction<T> countFunction,
ToDoubleFunction<T> totalTimeFunction, TimeUnit totalTimeFunctionUnit, TimeUnit baseTimeUnit) {
public StepFunctionTimer(Id id, Clock clock, long stepMillis, long pushOffsetFromEpochStepMillis, T obj,
ToLongFunction<T> countFunction, ToDoubleFunction<T> totalTimeFunction, TimeUnit totalTimeFunctionUnit,
TimeUnit baseTimeUnit) {
this.id = id;
this.clock = clock;
this.ref = new WeakReference<>(obj);
this.countFunction = countFunction;
this.totalTimeFunction = totalTimeFunction;
this.totalTimeFunctionUnit = totalTimeFunctionUnit;
this.baseTimeUnit = baseTimeUnit;
this.countTotal = new StepTuple2<>(clock, stepMillis, 0L, 0.0, count::sumThenReset, total::sumThenReset);
this.countTotal = new StepTuple2<>(clock, stepMillis, pushOffsetFromEpochStepMillis, 0L, 0.0,
count::sumThenReset, total::sumThenReset);
}

/**
 * Creates a step function timer without a push offset.
 * <p>
 * Delegates to the constructor that accepts {@code pushOffsetFromEpochStepMillis},
 * passing {@code 0} for it; all other arguments are forwarded unchanged.
 * @param id meter id
 * @param clock clock forwarded to the delegate constructor
 * @param stepMillis step size in milliseconds
 * @param obj object forwarded to the delegate constructor together with the functions
 * @param countFunction count function forwarded to the delegate constructor
 * @param totalTimeFunction total time function forwarded to the delegate constructor
 * @param totalTimeFunctionUnit time unit forwarded to the delegate constructor
 * @param baseTimeUnit base time unit forwarded to the delegate constructor
 */
public StepFunctionTimer(Id id, Clock clock, long stepMillis, T obj, ToLongFunction<T> countFunction,
ToDoubleFunction<T> totalTimeFunction, TimeUnit totalTimeFunctionUnit, TimeUnit baseTimeUnit) {
this(id, clock, stepMillis, 0, obj, countFunction, totalTimeFunction, totalTimeFunctionUnit, baseTimeUnit);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ public class StepLong extends StepValue<Long> {
private final LongAdder current = new LongAdder();

public StepLong(Clock clock, long stepMillis) {
super(clock, stepMillis);
this(clock, stepMillis, 0);
}

/**
 * Creates a step long, forwarding all three arguments to the superclass
 * constructor.
 * @param clock clock forwarded to the superclass
 * @param stepMillis step size in milliseconds
 * @param pushOffsetFromEpochStepMillis offset in milliseconds forwarded to the
 * superclass; NOTE(review): presumably shifts the step boundaries away from
 * Unix Epoch-based intervals - confirm against the superclass implementation
 */
public StepLong(Clock clock, long stepMillis, long pushOffsetFromEpochStepMillis) {
super(clock, stepMillis, pushOffsetFromEpochStepMillis);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ protected <T> Gauge newGauge(Meter.Id id, @Nullable T obj, ToDoubleFunction<T> v

@Override
protected Counter newCounter(Meter.Id id) {
return new StepCounter(id, clock, config.step().toMillis());
return new StepCounter(id, clock, config.step().toMillis(), getPushOffsetFromEpochStepMillis());
}

@Override
Expand All @@ -65,7 +65,7 @@ protected LongTaskTimer newLongTaskTimer(Meter.Id id, DistributionStatisticConfi
protected Timer newTimer(Meter.Id id, DistributionStatisticConfig distributionStatisticConfig,
PauseDetector pauseDetector) {
Timer timer = new StepTimer(id, clock, distributionStatisticConfig, pauseDetector, getBaseTimeUnit(),
this.config.step().toMillis(), false);
this.config.step().toMillis(), getPushOffsetFromEpochStepMillis(), false);
HistogramGauges.registerWithCommonFormat(timer, this);
return timer;
}
Expand All @@ -74,21 +74,22 @@ protected Timer newTimer(Meter.Id id, DistributionStatisticConfig distributionSt
protected DistributionSummary newDistributionSummary(Meter.Id id,
DistributionStatisticConfig distributionStatisticConfig, double scale) {
DistributionSummary summary = new StepDistributionSummary(id, clock, distributionStatisticConfig, scale,
config.step().toMillis(), false);
config.step().toMillis(), getPushOffsetFromEpochStepMillis(), false);
HistogramGauges.registerWithCommonFormat(summary, this);
return summary;
}

@Override
protected <T> FunctionTimer newFunctionTimer(Meter.Id id, T obj, ToLongFunction<T> countFunction,
ToDoubleFunction<T> totalTimeFunction, TimeUnit totalTimeFunctionUnit) {
return new StepFunctionTimer<>(id, clock, config.step().toMillis(), obj, countFunction, totalTimeFunction,
totalTimeFunctionUnit, getBaseTimeUnit());
return new StepFunctionTimer<>(id, clock, config.step().toMillis(), getPushOffsetFromEpochStepMillis(), obj,
countFunction, totalTimeFunction, totalTimeFunctionUnit, getBaseTimeUnit());
}

@Override
protected <T> FunctionCounter newFunctionCounter(Meter.Id id, T obj, ToDoubleFunction<T> countFunction) {
return new StepFunctionCounter<>(id, clock, config.step().toMillis(), obj, countFunction);
return new StepFunctionCounter<>(id, clock, config.step().toMillis(), getPushOffsetFromEpochStepMillis(), obj,
countFunction);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,20 @@ public class StepTimer extends AbstractTimer {
*/
public StepTimer(final Id id, final Clock clock, final DistributionStatisticConfig distributionStatisticConfig,
final PauseDetector pauseDetector, final TimeUnit baseTimeUnit, final long stepDurationMillis,
final boolean supportsAggregablePercentiles) {
final long pushOffsetFromEpochStepMillis, final boolean supportsAggregablePercentiles) {
super(id, clock, distributionStatisticConfig, pauseDetector, baseTimeUnit, supportsAggregablePercentiles);
countTotal = new StepTuple2<>(clock, stepDurationMillis, 0L, 0L, count::sumThenReset, total::sumThenReset);
countTotal = new StepTuple2<>(clock, stepDurationMillis, pushOffsetFromEpochStepMillis, 0L, 0L,
count::sumThenReset, total::sumThenReset);
max = new TimeWindowMax(clock, distributionStatisticConfig);
}

/**
 * Creates a step timer without a push offset.
 * <p>
 * Delegates to the constructor that accepts {@code pushOffsetFromEpochStepMillis},
 * passing {@code 0} for it; all other arguments are forwarded unchanged.
 * @param id meter id
 * @param clock clock forwarded to the delegate constructor
 * @param distributionStatisticConfig distribution statistic configuration
 * @param pauseDetector pause detector forwarded to the delegate constructor
 * @param baseTimeUnit base time unit forwarded to the delegate constructor
 * @param stepDurationMillis step size in milliseconds
 * @param supportsAggregablePercentiles whether it supports aggregable percentiles
 */
public StepTimer(final Id id, final Clock clock, final DistributionStatisticConfig distributionStatisticConfig,
final PauseDetector pauseDetector, final TimeUnit baseTimeUnit, final long stepDurationMillis,
final boolean supportsAggregablePercentiles) {
this(id, clock, distributionStatisticConfig, pauseDetector, baseTimeUnit, stepDurationMillis, 0,
supportsAggregablePercentiles);
}

@Override
protected void recordNonNegative(final long amount, final TimeUnit unit) {
final long nanoAmount = (long) TimeUtils.convert(amount, unit, TimeUnit.NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class StepTuple2<T1, T2> {

private final long stepMillis;

private final long pushOffsetFromEpochStepMillis;

private AtomicLong lastInitPos;

private final T1 t1NoValue;
Expand All @@ -47,21 +49,27 @@ public class StepTuple2<T1, T2> {

private volatile T2 t2Previous;

public StepTuple2(Clock clock, long stepMillis, T1 t1NoValue, T2 t2NoValue, Supplier<T1> t1Supplier,
Supplier<T2> t2Supplier) {
public StepTuple2(Clock clock, long stepMillis, long pushOffsetFromEpochStepMillis, T1 t1NoValue, T2 t2NoValue,
Supplier<T1> t1Supplier, Supplier<T2> t2Supplier) {
this.clock = clock;
this.stepMillis = stepMillis;
this.pushOffsetFromEpochStepMillis = pushOffsetFromEpochStepMillis;
this.t1NoValue = t1NoValue;
this.t2NoValue = t2NoValue;
this.t1Supplier = t1Supplier;
this.t2Supplier = t2Supplier;
this.t1Previous = t1NoValue;
this.t2Previous = t2NoValue;
lastInitPos = new AtomicLong(clock.wallTime() / stepMillis);
lastInitPos = new AtomicLong((clock.wallTime() - pushOffsetFromEpochStepMillis) / stepMillis);
}

/**
 * Creates a step tuple without a push offset.
 * <p>
 * Delegates to the constructor that accepts {@code pushOffsetFromEpochStepMillis},
 * passing {@code 0} for it; all other arguments are forwarded unchanged.
 * @param clock clock forwarded to the delegate constructor
 * @param stepMillis step size in milliseconds
 * @param t1NoValue first "no value" placeholder forwarded to the delegate constructor
 * @param t2NoValue second "no value" placeholder forwarded to the delegate constructor
 * @param t1Supplier supplier for the first value
 * @param t2Supplier supplier for the second value
 */
public StepTuple2(Clock clock, long stepMillis, T1 t1NoValue, T2 t2NoValue, Supplier<T1> t1Supplier,
Supplier<T2> t2Supplier) {
this(clock, stepMillis, 0, t1NoValue, t2NoValue, t1Supplier, t2Supplier);
}

private void rollCount(long now) {
long stepTime = now / stepMillis;
long stepTime = (now - pushOffsetFromEpochStepMillis) / stepMillis;
long lastInit = lastInitPos.get();
if (lastInit < stepTime && lastInitPos.compareAndSet(lastInit, stepTime)) {
// Need to check if there was any activity during the previous step interval.
Expand Down
Loading