Skip to content

Commit

Permalink
Merge branch '1.10.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
shakuzen committed Apr 13, 2023
2 parents ed06c01 + 2084715 commit a165805
Showing 1 changed file with 25 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import io.micrometer.common.util.internal.logging.InternalLoggerFactory;
import io.micrometer.common.util.internal.logging.WarnThenDebugLogger;
import io.micrometer.core.instrument.*;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.distribution.HistogramSnapshot;
import io.micrometer.core.instrument.distribution.ValueAtPercentile;
import io.micrometer.core.instrument.util.AbstractPartition;
import io.micrometer.core.ipc.http.HttpSender;
import io.micrometer.dynatrace.AbstractDynatraceExporter;
import io.micrometer.dynatrace.DynatraceConfig;
Expand All @@ -33,10 +33,7 @@
import java.net.MalformedURLException;
import java.net.URI;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -136,12 +133,27 @@ private DimensionList parseDefaultDimensions(Map<String, String> defaultDimensio
*/
@Override
public void export(List<Meter> meters) {
// Lines that are too long to be ingested into Dynatrace, as well as lines that
// contain NaN or Inf values are not returned from "toMetricLines", and are
// therefore dropped.
List<String> metricLines = meters.stream().flatMap(this::toMetricLines).collect(Collectors.toList());
int partitionSize = Math.min(config.batchSize(), DynatraceMetricApiConstants.getPayloadLinesLimit());
List<String> batch = new ArrayList<>(partitionSize);

for (Meter meter : meters) {
// Lines that are too long to be ingested into Dynatrace, as well as lines
// that contain NaN or Inf values are not returned from "toMetricLines",
// and are therefore dropped.
Stream<String> metricLines = toMetricLines(meter);

metricLines.forEach(line -> {
batch.add(line);
if (batch.size() == partitionSize) {
send(batch);
batch.clear();
}
});
}

sendInBatches(metricLines);
if (!batch.isEmpty()) {
send(batch);
}
}

private Stream<String> toMetricLines(Meter meter) {
Expand Down Expand Up @@ -316,7 +328,8 @@ private void send(List<String> metricLines) {
return;
}
try {
logger.debug("Sending {} lines to {}", metricLines.size(), endpoint);
int lineCount = metricLines.size();
logger.debug("Sending {} lines to {}", lineCount, endpoint);

String body = String.join("\n", metricLines);
logger.debug("Sending lines:\n{}", body);
Expand All @@ -329,7 +342,7 @@ private void send(List<String> metricLines) {
requestBuilder.withHeader("User-Agent", "micrometer")
.withPlainText(body)
.send()
.onSuccess(response -> handleSuccess(metricLines.size(), response))
.onSuccess(response -> handleSuccess(lineCount, response))
.onError(response -> logger.error("Failed metric ingestion: Error Code={}, Response Body={}",
response.code(), getTruncatedBody(response)));
}
Expand Down Expand Up @@ -369,21 +382,4 @@ private void handleSuccess(int totalSent, HttpSender.Response response) {
}
}

private void sendInBatches(List<String> metricLines) {
int partitionSize = Math.min(config.batchSize(), DynatraceMetricApiConstants.getPayloadLinesLimit());
MetricLinePartition.partition(metricLines, partitionSize).forEach(this::send);
}

static class MetricLinePartition extends AbstractPartition<String> {

private MetricLinePartition(List<String> list, int partitionSize) {
super(list, partitionSize);
}

static List<List<String>> partition(List<String> list, int partitionSize) {
return new MetricLinePartition(list, partitionSize);
}

}

}

0 comments on commit a165805

Please sign in to comment.