From d4f9f772ff65cb96ddd1ffff22345fc9395fb4db Mon Sep 17 00:00:00 2001 From: zhangzhiyong Date: Thu, 31 Aug 2023 15:25:54 +0800 Subject: [PATCH] update --- hera-all/trace-etl/trace-etl-server/pom.xml | 8 + .../trace/etl/consumer/ConsumerService.java | 38 ++++- .../trace/etl/consumer/DataCacheService.java | 157 ++++++++++++++++++ .../hera/trace/etl/consumer/EnterManager.java | 42 +++++ .../etl/consumer/MetricsParseService.java | 132 +++++++-------- .../trace/etl/consumer/MutiMetricsCall.java | 60 +++++++ .../trace/etl/util/nacos/NacosClientUtil.java | 10 +- .../trace/etl/util/prometheus/HTTPServer.java | 127 +++++--------- .../youpin/trace/etl/test/FastWriter.java | 68 ++++++++ .../youpin/trace/etl/test/WriterTest.java | 149 +++++++++++++++++ 10 files changed, 618 insertions(+), 173 deletions(-) create mode 100644 hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/consumer/DataCacheService.java create mode 100644 hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/consumer/EnterManager.java create mode 100644 hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/consumer/MutiMetricsCall.java create mode 100644 hera-all/trace-etl/trace-etl-server/src/test/java/com/xiaomi/youpin/trace/etl/test/FastWriter.java create mode 100644 hera-all/trace-etl/trace-etl-server/src/test/java/com/xiaomi/youpin/trace/etl/test/WriterTest.java diff --git a/hera-all/trace-etl/trace-etl-server/pom.xml b/hera-all/trace-etl/trace-etl-server/pom.xml index cfb415beb..10c56d684 100644 --- a/hera-all/trace-etl/trace-etl-server/pom.xml +++ b/hera-all/trace-etl/trace-etl-server/pom.xml @@ -20,6 +20,14 @@ 1.0.0-SNAPSHOT + + + com.squareup.okio + okio + 3.5.0 + + + diff --git a/hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/consumer/ConsumerService.java b/hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/consumer/ConsumerService.java index 5fa5db390..bb4afc81e 100644 --- a/hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/consumer/ConsumerService.java +++ b/hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/consumer/ConsumerService.java @@ -7,9 +7,7 @@ import com.xiaomi.hera.tspandata.TSpanData; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.thrift.TDeserializer; @@ -19,6 +17,9 @@ import javax.annotation.PostConstruct; import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** * @author dingtao @@ -44,6 +45,12 @@ public class ConsumerService { @Autowired private ClientMessageQueue clientMessageQueue; + @Autowired + private DataCacheService cacheService; + + @Autowired + private EnterManager enterManager; + @PostConstruct public void takeMessage() throws MQClientException { // Before initializing rocketmq consumer, @@ -60,13 +67,19 @@ public void takeMessage() throws MQClientException { log.info("init consumer end ..."); } - private class TraceEtlMessageListener implements MessageListenerConcurrently { + private class TraceEtlMessageListener implements MessageListenerOrderly { + + private AtomicInteger i = new AtomicInteger(); + + private AtomicLong time = new AtomicLong(System.currentTimeMillis()); + @Override - public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { + public ConsumeOrderlyStatus consumeMessage(List list, final ConsumeOrderlyContext context) { if (list == null || list.isEmpty()) { - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + return ConsumeOrderlyStatus.SUCCESS; } + enterManager.enter(); for (MessageExt message : list) { String traceId = ""; try { @@ -79,7 +92,18 @@ public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeCo } clientMessageQueue.enqueue(traceId, message); } - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + + long now = System.currentTimeMillis(); + + i.addAndGet(list.size()); + if ((now - time.get() >= TimeUnit.SECONDS.toMillis(15))) { + i.set(0); + time.set(now); + cacheService.cacheData(); + } + + enterManager.processEnter(); + return ConsumeOrderlyStatus.SUCCESS; } } } diff --git a/hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/consumer/DataCacheService.java b/hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/consumer/DataCacheService.java new file mode 100644 index 000000000..d0fb12cae --- /dev/null +++ b/hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/consumer/DataCacheService.java @@ -0,0 +1,157 @@ +package com.xiaomi.hera.trace.etl.consumer; + +import com.google.common.base.Stopwatch; +import com.google.common.collect.Sets; +import com.xiaomi.youpin.prometheus.client.MetricsManager; +import com.xiaomi.youpin.prometheus.client.Prometheus; +import com.xiaomi.youpin.prometheus.client.multi.MutiMetrics; +import io.prometheus.client.*; +import io.prometheus.client.exporter.common.TextFormat; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; +import java.io.ByteArrayOutputStream; +import java.io.OutputStreamWriter; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * @author goodjava@qq.com + * @date 2023/8/29 10:02 + */ +@Service +@Slf4j +public class DataCacheService { + + private CopyOnWriteArrayList cacheData = new CopyOnWriteArrayList<>(); + + @Resource + private MutiMetricsCall call; + + public int dataSize() { + return cacheData.size(); + } + + @PostConstruct + public void init() { + Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { + try { + if (cacheData.size() > 4) { + log.info("clear cache data:{}", cacheData.size()); + cacheData.clear(); + } + } catch (Throwable ex) { + log.error(ex.getMessage()); + } + }, 0, 60, TimeUnit.SECONDS); + + Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { + try { + enterManager.getMonitor().enter(); + cacheData(); + } catch (Throwable ex) { + log.error(ex.getMessage(), ex); + } finally { + enterManager.getMonitor().leave(); + } + }, 0, 15, TimeUnit.SECONDS); + } + + @Resource + private EnterManager enterManager; + + + + + public byte[] getData() { + log.info("get data"); + Stopwatch sw = Stopwatch.createStarted(); + try { + if (cacheData.size() >= 1) { + return cacheData.remove(0); + } + } finally { + log.info("get data use time:{}ms", sw.elapsed(TimeUnit.MILLISECONDS)); + } + return new byte[]{}; + } + + + public void cacheData() { + call.change(); + Executors.newSingleThreadExecutor().submit(() -> { + log.info("cache data"); + Stopwatch sw = Stopwatch.createStarted(); + List list = new ArrayList<>(); + MutiMetrics old = call.old(); + CollectorRegistry registry = old.getRegistry(); + try { + Field field = registry.getClass().getDeclaredField("namesToCollectors"); + field.setAccessible(true); + Map namesToCollectors = (Map) field.get(registry); + list = namesToCollectors.keySet().stream() + .filter(it -> !it.endsWith("created")) + .collect(Collectors.toList()); + } catch (Exception e) { + log.info("export metrics error : ", e); + } + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); OutputStreamWriter writer = new OutputStreamWriter(baos)) { + TextFormat.writeFormat(TextFormat.CONTENT_TYPE_004, writer, registry.filteredMetricFamilySamples(Sets.newHashSet(list))); + writer.flush(); + byte[] bytes = baos.toByteArray(); + this.cacheData.add(bytes); + } catch (Throwable ex) { + log.error(ex.getMessage()); + } finally { + clearMetrics(old); + } + log.info("cache data use time:{} ms", sw.elapsed(TimeUnit.MILLISECONDS)); + }); + } + + + private void clearMetrics(MutiMetrics old) { + try { + MetricsManager gMetricsMgr = old.gMetricsMgr; + if (gMetricsMgr instanceof Prometheus) { + Prometheus prometheus = (Prometheus) gMetricsMgr; + Map prometheusMetrics = prometheus.prometheusMetrics; + clearTypeMetrics(prometheusMetrics,old.getRegistry()); + prometheus.prometheusMetrics.clear(); + prometheus.prometheusTypeMetrics.clear(); + } + } catch (Exception e) { + log.error("clear metrics error", e); + } + } + + private void clearTypeMetrics(Map prometheusMetrics, CollectorRegistry registry) { + for (String key : prometheusMetrics.keySet()) { + Object o = prometheusMetrics.get(key); + if (o instanceof Counter) { + Counter counter = (Counter) o; + registry.unregister(counter); + } else if (o instanceof Gauge) { + Gauge gauge = (Gauge) o; + gauge.clear(); + registry.unregister(gauge); + } else if (o instanceof Histogram) { + Histogram histogram = (Histogram) o; + histogram.clear(); + registry.unregister(histogram); + } else { + log.error("metrics : " + key + " Type conversion failed, original type : " + o.getClass().getName()); + } + } + } + + +} diff --git a/hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/consumer/EnterManager.java b/hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/consumer/EnterManager.java new file mode 100644 index 000000000..735f38f6d --- /dev/null +++ b/hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/consumer/EnterManager.java @@ -0,0 +1,42 @@ +package com.xiaomi.hera.trace.etl.consumer; + +import com.google.common.util.concurrent.Monitor; +import lombok.Getter; +import org.springframework.stereotype.Service; + +import java.util.function.BooleanSupplier; + +/** + * @author goodjava@qq.com + * @date 2023/8/31 13:49 + */ +@Service +public class EnterManager { + + @Getter + private Monitor monitor = new Monitor(); + + @Getter + private Monitor processMonitor = new Monitor(); + + private Monitor.Guard guard = monitor.newGuard(new BooleanSupplier() { + @Override + public boolean getAsBoolean() { + return false; + } + }); + + + public void enter() { + monitor.enter(); + monitor.leave(); + } + + public void processEnter() { + processMonitor.enter(); + processMonitor.leave(); + } + + + +} diff --git a/hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/consumer/MetricsParseService.java b/hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/consumer/MetricsParseService.java index 51521da09..d6e655fdc 100644 --- a/hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/consumer/MetricsParseService.java +++ b/hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/consumer/MetricsParseService.java @@ -12,12 +12,7 @@ import com.xiaomi.hera.trace.etl.service.HeraContextService; import com.xiaomi.hera.trace.etl.service.WriteEsService; import com.xiaomi.hera.trace.etl.util.ThriftUtil; -import com.xiaomi.hera.tspandata.TAttributeKey; -import com.xiaomi.hera.tspandata.TAttributes; -import com.xiaomi.hera.tspandata.TResource; -import com.xiaomi.hera.tspandata.TSpanData; -import com.xiaomi.hera.tspandata.TValue; -import com.xiaomi.youpin.prometheus.client.Metrics; +import com.xiaomi.hera.tspandata.*; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -74,11 +69,12 @@ public class MetricsParseService { @Autowired private HeraContextService heraContextService; - private Metrics singleMetrics = Metrics.getInstance(); + @Autowired + private MutiMetricsCall mutiMetrics; @PostConstruct public void init() { - singleMetrics.init(env, ""); + mutiMetrics.init(env, ""); } private static final double[] redisBuckets = new double[]{0.1D, 0.5D, 1.0D, 10.0D, 100.0D, 500.0D, 1000.0D}; @@ -97,13 +93,11 @@ public void parse(TSpanData tSpanData) { traceStatistics(serviceName); // 解析TSpanData,转换为指标类 MetricsParseResult metricsParseResult = metricsParse(tSpanData); - if(metricsParseResult.isIgnore()){ + if (metricsParseResult.isIgnore()) { return; } if (metricsParseResult.isValidate()) { - synchronized (LockUtil.lock) { - computeMetrics(metricsParseResult.getJaegerTracerDomain(), metricsParseResult.getHeraTraceEtlConfig()); - } + computeMetrics(metricsParseResult.getJaegerTracerDomain(), metricsParseResult.getHeraTraceEtlConfig()); } if (metricsParseResult.getDriverDomain() != null) { esService.insertDriver(metricsParseResult.getDriverDomain()); @@ -115,11 +109,9 @@ public void parse(TSpanData tSpanData) { } private void traceStatistics(String applicationName) { - synchronized (LockUtil.lock) { - singleMetrics.newCounter("trace_statistics_span_count", "application") - .with(applicationName) - .add(1, applicationName); - } + mutiMetrics.newCounter("trace_statistics_span_count", "application") + .with(applicationName) + .add(1, applicationName); } public MetricsParseResult metricsParse(TSpanData tSpanData) { @@ -304,57 +296,57 @@ private void computeMetrics(JaegerTracerDomain jtc, HeraTraceEtlConfig config) { if (exclude(config == null ? excludeHttpServer : config.getExcludeHttpserverMethod(), jtc.getMethod())) { return; } - singleMetrics.newCounter(httpMetricsName+jtc.getType() + "TotalMethodCount", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(httpMetricsName + jtc.getType() + "TotalMethodCount", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); // success or fail counter if (jtc.isSuccess()) { - singleMetrics.newCounter(httpMetricsName+jtc.getType() + "SuccessMethodCount", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(httpMetricsName + jtc.getType() + "SuccessMethodCount", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); // slow query if (jtc.getDuration() > (config == null ? httpSlowTime : config.getHttpSlowThreshold())) { - singleMetrics.newCounter(httpMetricsName+"httpSlowQuery", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(httpMetricsName + "httpSlowQuery", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); esService.submitErrorEsTrace(esDomain, jtc.getMethod(), metricsServiceName, jtc.getTraceId(), "http", jtc.getServerIp(), String.valueOf(jtc.getEndTime()), "", String.valueOf(jtc.getDuration()), "timeout", jtc.getHttpCode(), jtc.getServiceEnv()); } } else { - singleMetrics.newCounter(httpMetricsName+"httpError", "methodName", "application", "serverIp", "errorCode", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(httpMetricsName + "httpError", "methodName", "application", "serverIp", "errorCode", "serverEnv", "serverEnvId") .with(jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getHttpCode(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getHttpCode(), jtc.getServiceEnv(), jtc.getServiceEnvId()); esService.submitErrorEsTrace(esDomain, jtc.getMethod(), metricsServiceName, jtc.getTraceId(), "http", jtc.getServerIp(), String.valueOf(jtc.getEndTime()), "", String.valueOf(jtc.getDuration()), "error", jtc.getHttpCode(), jtc.getServiceEnv()); } - singleMetrics.newHistogram(httpMetricsName+jtc.getType() + "MethodTimeCount", aopDubboBuckets, new String[]{"methodName", "application", "serverIp", "serverEnv", "serverEnvId"}) + mutiMetrics.newHistogram(httpMetricsName + jtc.getType() + "MethodTimeCount", aopDubboBuckets, new String[]{"methodName", "application", "serverIp", "serverEnv", "serverEnvId"}) .with(jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .observe(jtc.getDuration(), jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); - singleMetrics.newHistogram(httpMetricsName+jtc.getType() + "MethodTimeCount_without_methodName", aopDubboBuckets, new String[]{"application", "serverIp", "serverEnv", "serverEnvId"}) + mutiMetrics.newHistogram(httpMetricsName + jtc.getType() + "MethodTimeCount_without_methodName", aopDubboBuckets, new String[]{"application", "serverIp", "serverEnv", "serverEnvId"}) .with(metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .observe(jtc.getDuration(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); } else if (SpanKind.CLIENT.equals(jtc.getKind())) { - singleMetrics.newCounter(httpMetricsName+jtc.getType() + "ClientTotalMethodCount", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(httpMetricsName + jtc.getType() + "ClientTotalMethodCount", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); if (jtc.isSuccess()) { - singleMetrics.newCounter(httpMetricsName+jtc.getType() + "ClientSuccessMethodCount", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(httpMetricsName + jtc.getType() + "ClientSuccessMethodCount", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); if (jtc.getDuration() > (config == null ? httpSlowTime : config.getHttpSlowThreshold())) { - singleMetrics.newCounter(httpMetricsName+"httpClientSlowQuery", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(httpMetricsName + "httpClientSlowQuery", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); esService.submitErrorEsTrace(esDomain, jtc.getMethod(), metricsServiceName, jtc.getTraceId(), "http_client", jtc.getServerIp(), String.valueOf(jtc.getEndTime()), "", String.valueOf(jtc.getDuration()), "timeout", jtc.getHttpCode(), jtc.getServiceEnv()); } } else { - singleMetrics.newCounter(httpMetricsName+"httpClientError", "methodName", "application", "serverIp", "errorCode", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(httpMetricsName + "httpClientError", "methodName", "application", "serverIp", "errorCode", "serverEnv", "serverEnvId") .with(jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getHttpCode(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getHttpCode(), jtc.getServiceEnv(), jtc.getServiceEnvId()); esService.submitErrorEsTrace(esDomain, jtc.getMethod(), metricsServiceName, jtc.getTraceId(), "http_client", jtc.getServerIp(), String.valueOf(jtc.getEndTime()), "", String.valueOf(jtc.getDuration()), "error", jtc.getHttpCode(), jtc.getServiceEnv()); } - singleMetrics.newHistogram(httpMetricsName+jtc.getType() + "ClientMethodTimeCount", aopDubboBuckets, new String[]{"methodName", "application", "serverIp", "serverEnv", "serverEnvId"}) + mutiMetrics.newHistogram(httpMetricsName + jtc.getType() + "ClientMethodTimeCount", aopDubboBuckets, new String[]{"methodName", "application", "serverIp", "serverEnv", "serverEnvId"}) .with(jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .observe(jtc.getDuration(), jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); - singleMetrics.newHistogram(httpMetricsName+jtc.getType() + "ClientMethodTimeCount_without_methodName", aopDubboBuckets, new String[]{"application", "serverIp", "serverEnv", "serverEnvId"}) + mutiMetrics.newHistogram(httpMetricsName + jtc.getType() + "ClientMethodTimeCount_without_methodName", aopDubboBuckets, new String[]{"application", "serverIp", "serverEnv", "serverEnvId"}) .with(metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .observe(jtc.getDuration(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); } @@ -363,53 +355,53 @@ private void computeMetrics(JaegerTracerDomain jtc, HeraTraceEtlConfig config) { if (SpanType.DUBBO.equals(jtc.getType())) { String dubboMetricsName = "hera_"; if (SpanKind.CLIENT.equals(jtc.getKind())) { - singleMetrics.newHistogram(dubboMetricsName + "dubboConsumerTimeCost", aopDubboBuckets, new String[]{"serviceName", "methodName", "application", "serverIp", "serverEnv", "serverEnvId"}) + mutiMetrics.newHistogram(dubboMetricsName + "dubboConsumerTimeCost", aopDubboBuckets, new String[]{"serviceName", "methodName", "application", "serverIp", "serverEnv", "serverEnvId"}) .with(jtc.getDubboServiceName(), jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .observe(jtc.getDuration(), jtc.getDubboServiceName(), jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); - singleMetrics.newHistogram(dubboMetricsName + "dubboConsumerTimeCost_without_methodName", aopDubboBuckets, new String[]{"serviceName", "application", "serverIp", "serverEnv", "serverEnvId"}) + mutiMetrics.newHistogram(dubboMetricsName + "dubboConsumerTimeCost_without_methodName", aopDubboBuckets, new String[]{"serviceName", "application", "serverIp", "serverEnv", "serverEnvId"}) .with(jtc.getDubboServiceName(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .observe(jtc.getDuration(), jtc.getDubboServiceName(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); - singleMetrics.newCounter(dubboMetricsName + "dubboBisTotalCount", "serviceName", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(dubboMetricsName + "dubboBisTotalCount", "serviceName", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getDubboServiceName(), jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getDubboServiceName(), jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); if (jtc.isSuccess()) { - singleMetrics.newCounter(dubboMetricsName + "dubboBisSuccessCount", "serviceName", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(dubboMetricsName + "dubboBisSuccessCount", "serviceName", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getDubboServiceName(), jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getDubboServiceName(), jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); if (jtc.getDuration() > (config == null ? dubboSlowTime : config.getDubboSlowThreshold())) { - singleMetrics.newCounter(dubboMetricsName + "dubboConsumerSlowQuery", "serviceName", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(dubboMetricsName + "dubboConsumerSlowQuery", "serviceName", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getDubboServiceName(), jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getDubboServiceName(), jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); esService.submitErrorEsTrace(esDomain, jtc.getDubboServiceName() + "/" + jtc.getMethod(), metricsServiceName, jtc.getTraceId(), "dubbo_consumer", jtc.getServerIp(), String.valueOf(jtc.getEndTime()), "", String.valueOf(jtc.getDuration()), "timeout", jtc.getHttpCode(), jtc.getServiceEnv()); } } else { - singleMetrics.newCounter(dubboMetricsName + "dubboConsumerError", "serviceName", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(dubboMetricsName + "dubboConsumerError", "serviceName", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getDubboServiceName(), jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getDubboServiceName(), jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); esService.submitErrorEsTrace(esDomain, jtc.getDubboServiceName() + "/" + jtc.getMethod(), metricsServiceName, jtc.getTraceId(), "dubbo_consumer", jtc.getServerIp(), String.valueOf(jtc.getEndTime()), "", String.valueOf(jtc.getDuration()), "error", jtc.getHttpCode(), jtc.getServiceEnv()); } } else if (SpanKind.SERVER.equals(jtc.getKind())) { - singleMetrics.newCounter(dubboMetricsName + "dubboMethodCalledCount", "serviceName", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(dubboMetricsName + "dubboMethodCalledCount", "serviceName", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getDubboServiceName(), jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getDubboServiceName(), jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); - singleMetrics.newHistogram(dubboMetricsName + "dubboProviderCount", aopDubboBuckets, new String[]{"serviceName", "methodName", "application", "serverIp", "serverEnv", "serverEnvId"}) + mutiMetrics.newHistogram(dubboMetricsName + "dubboProviderCount", aopDubboBuckets, new String[]{"serviceName", "methodName", "application", "serverIp", "serverEnv", "serverEnvId"}) .with(jtc.getDubboServiceName(), jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .observe(jtc.getDuration(), jtc.getDubboServiceName(), jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); - singleMetrics.newHistogram(dubboMetricsName + "dubboProviderCount_without_methodName", aopDubboBuckets, new String[]{"serviceName", "application", "serverIp", "serverEnv", "serverEnvId"}) + mutiMetrics.newHistogram(dubboMetricsName + "dubboProviderCount_without_methodName", aopDubboBuckets, new String[]{"serviceName", "application", "serverIp", "serverEnv", "serverEnvId"}) .with(jtc.getDubboServiceName(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .observe(jtc.getDuration(), jtc.getDubboServiceName(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); if (jtc.isSuccess()) { - singleMetrics.newCounter(dubboMetricsName + "dubboMethodCalledSuccessCount", "serviceName", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(dubboMetricsName + "dubboMethodCalledSuccessCount", "serviceName", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getDubboServiceName(), jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getDubboServiceName(), jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); if (jtc.getDuration() > (config == null ? dubboSlowTime : config.getDubboSlowThreshold())) { - singleMetrics.newCounter(dubboMetricsName + "dubboProviderSlowQuery", "serviceName", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(dubboMetricsName + "dubboProviderSlowQuery", "serviceName", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getDubboServiceName(), jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getDubboServiceName(), jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); esService.submitErrorEsTrace(esDomain, jtc.getDubboServiceName() + "/" + jtc.getMethod(), metricsServiceName, jtc.getTraceId(), "dubbo_provider", jtc.getServerIp(), String.valueOf(jtc.getEndTime()), "", String.valueOf(jtc.getDuration()), "timeout", jtc.getHttpCode(), jtc.getServiceEnv()); } } else { - singleMetrics.newCounter(dubboMetricsName + "dubboProviderError", "serviceName", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(dubboMetricsName + "dubboProviderError", "serviceName", "methodName", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getDubboServiceName(), jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getDubboServiceName(), jtc.getMethod(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); esService.submitErrorEsTrace(esDomain, jtc.getDubboServiceName() + "/" + jtc.getMethod(), metricsServiceName, jtc.getTraceId(), "dubbo_provider", jtc.getServerIp(), String.valueOf(jtc.getEndTime()), "", String.valueOf(jtc.getDuration()), "error", jtc.getHttpCode(), jtc.getServiceEnv()); @@ -420,19 +412,19 @@ private void computeMetrics(JaegerTracerDomain jtc, HeraTraceEtlConfig config) { if (SpanType.REDIS.equals(jtc.getType())) { String redisMetricsName = "hera_"; if (jtc.isSuccess()) { - singleMetrics.newCounter(redisMetricsName + "RedisSuccessCount", "method", "host", "port", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(redisMetricsName + "RedisSuccessCount", "method", "host", "port", "application", "serverIp", "serverEnv", "serverEnvId") .with(reduceString(jtc.getMethod(), 100), jtc.getDbHost(), jtc.getDbPort(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, reduceString(jtc.getMethod(), 100), jtc.getDbHost(), jtc.getDbPort(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); } else { - singleMetrics.newCounter(redisMetricsName + "redisError", "method", "host", "port", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(redisMetricsName + "redisError", "method", "host", "port", "application", "serverIp", "serverEnv", "serverEnvId") .with(reduceString(jtc.getMethod(), 100), jtc.getDbHost(), jtc.getDbPort(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, reduceString(jtc.getMethod(), 100), jtc.getDbHost(), jtc.getDbPort(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); esService.submitErrorEsTrace(esDomain, jtc.getMethod(), metricsServiceName, jtc.getTraceId(), "redis", jtc.getServerIp(), String.valueOf(jtc.getEndTime()), jtc.getDbHost() + ":" + jtc.getDbPort(), String.valueOf(jtc.getDuration()), "error", jtc.getHttpCode(), jtc.getServiceEnv()); } - singleMetrics.newCounter(redisMetricsName + "RedisTotalCount", "method", "host", "port", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(redisMetricsName + "RedisTotalCount", "method", "host", "port", "application", "serverIp", "serverEnv", "serverEnvId") .with(reduceString(jtc.getMethod(), 100), jtc.getDbHost(), jtc.getDbPort(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, reduceString(jtc.getMethod(), 100), jtc.getDbHost(), jtc.getDbPort(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); - singleMetrics.newHistogram(redisMetricsName + "RedisMethodTimeCost", redisBuckets, new String[]{"method", "host", "port", "application", "serverIp", "serverEnv", "serverEnvId"}) + mutiMetrics.newHistogram(redisMetricsName + "RedisMethodTimeCost", redisBuckets, new String[]{"method", "host", "port", "application", "serverIp", "serverEnv", "serverEnvId"}) .with(reduceString(jtc.getMethod(), 100), jtc.getDbHost(), jtc.getDbPort(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .observe(jtc.getDuration(), reduceString(jtc.getMethod(), 100), jtc.getDbHost(), jtc.getDbPort(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); } @@ -440,25 +432,25 @@ private void computeMetrics(JaegerTracerDomain jtc, HeraTraceEtlConfig config) { if (SpanType.MYSQL.equals(jtc.getType())) { String mysqlMetricsName = "hera_"; if (jtc.isSuccess()) { - singleMetrics.newCounter(mysqlMetricsName + "sqlSuccessCount", "dataSource", "sqlMethod", "sql", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(mysqlMetricsName + "sqlSuccessCount", "dataSource", "sqlMethod", "sql", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getDataSource() + "/" + jtc.getDbName(), jtc.getSqlMethod(), reduceString(jtc.getSql(), 100), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getDataSource() + "/" + jtc.getDbName(), jtc.getSqlMethod(), reduceString(jtc.getSql(), 100), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); if (jtc.getDuration() > (config == null ? mysqlSlowTime : config.getMysqlSlowThreshold())) { - singleMetrics.newCounter(mysqlMetricsName + "dbSlowQuery", "dataSource", "sqlMethod", "sql", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(mysqlMetricsName + "dbSlowQuery", "dataSource", "sqlMethod", "sql", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getDataSource() + "/" + jtc.getDbName(), jtc.getSqlMethod(), reduceString(jtc.getSql(), 100), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getDataSource() + "/" + jtc.getDbName(), jtc.getSqlMethod(), reduceString(jtc.getSql(), 100), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); esService.submitErrorEsTrace(esDomain, reduceString(jtc.getSql(), 200), metricsServiceName, jtc.getTraceId(), "mysql", jtc.getServerIp(), String.valueOf(jtc.getEndTime()), jtc.getDataSource() + "/" + jtc.getDbName(), String.valueOf(jtc.getDuration()), "timeout", jtc.getHttpCode(), jtc.getServiceEnv()); } } else { - singleMetrics.newCounter(mysqlMetricsName + "dbError", "dataSource", "sqlMethod", "sql", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(mysqlMetricsName + "dbError", "dataSource", "sqlMethod", "sql", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getDataSource() + "/" + jtc.getDbName(), jtc.getSqlMethod(), reduceString(jtc.getSql(), 100), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getDataSource() + "/" + jtc.getDbName(), jtc.getSqlMethod(), reduceString(jtc.getSql(), 100), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); esService.submitErrorEsTrace(esDomain, reduceString(jtc.getSql(), 200), metricsServiceName, jtc.getTraceId(), "mysql", jtc.getServerIp(), String.valueOf(jtc.getEndTime()), jtc.getDataSource() + "/" + jtc.getDbName(), String.valueOf(jtc.getDuration()), "error", jtc.getHttpCode(), jtc.getServiceEnv()); } - singleMetrics.newCounter(mysqlMetricsName + "sqlTotalCount", "dataSource", "sqlMethod", "sql", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(mysqlMetricsName + "sqlTotalCount", "dataSource", "sqlMethod", "sql", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getDataSource() + "/" + jtc.getDbName(), jtc.getSqlMethod(), reduceString(jtc.getSql(), 100), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getDataSource() + "/" + jtc.getDbName(), jtc.getSqlMethod(), reduceString(jtc.getSql(), 100), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); - singleMetrics.newHistogram(mysqlMetricsName + "sqlTotalTimer", sqlBuckets, new String[]{"dataSource", "sqlMethod", "sql", "application", "serverIp", "serverEnv", "serverEnvId"}) + mutiMetrics.newHistogram(mysqlMetricsName + "sqlTotalTimer", sqlBuckets, new String[]{"dataSource", "sqlMethod", "sql", "application", "serverIp", "serverEnv", "serverEnvId"}) .with(jtc.getDataSource() + "/" + jtc.getDbName(), jtc.getSqlMethod(), reduceString(jtc.getSql(), 100), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .observe(jtc.getDuration(), jtc.getDataSource() + "/" + jtc.getDbName(), jtc.getSqlMethod(), reduceString(jtc.getSql(), 100), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); } @@ -466,25 +458,25 @@ private void computeMetrics(JaegerTracerDomain jtc, HeraTraceEtlConfig config) { if (SpanType.MONGODB.equals(jtc.getType())) { String mysqlMetricsName = "hera_"; if (jtc.isSuccess()) { - singleMetrics.newCounter(mysqlMetricsName + "mongoSuccessCount", "dataSource", "method", "command", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(mysqlMetricsName + "mongoSuccessCount", "dataSource", "method", "command", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getDataSource(), jtc.getSqlMethod(), reduceString(jtc.getSql(), 100), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getDataSource(), jtc.getSqlMethod(), reduceString(jtc.getSql(), 100), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); if (jtc.getDuration() > mysqlSlowTime) { - singleMetrics.newCounter(mysqlMetricsName + "mongodbSlowQuery", "dataSource", "method", "command", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(mysqlMetricsName + "mongodbSlowQuery", "dataSource", "method", "command", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getDataSource(), jtc.getSqlMethod(), reduceString(jtc.getSql(), 100), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getDataSource(), jtc.getSqlMethod(), reduceString(jtc.getSql(), 100), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); esService.submitErrorEsTrace(esDomain, reduceString(jtc.getSql(), 200), metricsServiceName, jtc.getTraceId(), SpanType.MONGODB, jtc.getServerIp(), String.valueOf(jtc.getEndTime()), jtc.getDataSource(), String.valueOf(jtc.getDuration()), "timeout", jtc.getHttpCode(), jtc.getServiceEnv()); } } else { - singleMetrics.newCounter(mysqlMetricsName + "mongodbError", "dataSource", "method", "command", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(mysqlMetricsName + "mongodbError", "dataSource", "method", "command", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getDataSource(), jtc.getSqlMethod(), reduceString(jtc.getSql(), 100), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getDataSource(), jtc.getSqlMethod(), reduceString(jtc.getSql(), 100), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); esService.submitErrorEsTrace(esDomain, reduceString(jtc.getSql(), 200), metricsServiceName, jtc.getTraceId(), SpanType.MONGODB, jtc.getServerIp(), String.valueOf(jtc.getEndTime()), jtc.getDataSource(), String.valueOf(jtc.getDuration()), "error", jtc.getHttpCode(), jtc.getServiceEnv()); } - singleMetrics.newCounter(mysqlMetricsName + "mongoTotalCount", "dataSource", "method", "command", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(mysqlMetricsName + "mongoTotalCount", "dataSource", "method", "command", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getDataSource(), jtc.getSqlMethod(), reduceString(jtc.getSql(), 100), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getDataSource(), jtc.getSqlMethod(), reduceString(jtc.getSql(), 100), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); - singleMetrics.newHistogram(mysqlMetricsName + "mongoTotalTimer", sqlBuckets, new String[]{"dataSource", "method", "command", "application", "serverIp", "serverEnv", "serverEnvId"}) + mutiMetrics.newHistogram(mysqlMetricsName + "mongoTotalTimer", sqlBuckets, new String[]{"dataSource", "method", "command", "application", "serverIp", "serverEnv", "serverEnvId"}) .with(jtc.getDataSource(), jtc.getSqlMethod(), reduceString(jtc.getSql(), 100), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .observe(jtc.getDuration(), jtc.getDataSource(), jtc.getSqlMethod(), reduceString(jtc.getSql(), 100), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); } @@ -492,47 +484,47 @@ private void computeMetrics(JaegerTracerDomain jtc, HeraTraceEtlConfig config) { if (SpanType.ROCKETMQ.equals(jtc.getType())) { String metricsName = "hera_"; if (SpanKind.PRODUCER.equals(jtc.getKind())) { - singleMetrics.newHistogram(metricsName + "rocketmqProducerTimeCost", aopDubboBuckets, new String[]{"topic", "application", "serverIp", "serverEnv", "serverEnvId"}) + mutiMetrics.newHistogram(metricsName + "rocketmqProducerTimeCost", aopDubboBuckets, new String[]{"topic", "application", "serverIp", "serverEnv", "serverEnvId"}) .with(jtc.getTopic(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .observe(jtc.getDuration(), jtc.getTopic(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); - singleMetrics.newCounter(metricsName + "rocketmqProducerTotalCount", "topic", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(metricsName + "rocketmqProducerTotalCount", "topic", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getTopic(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getTopic(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); if (jtc.isSuccess()) { - singleMetrics.newCounter(metricsName + "rocketmqProducerSuccessCount", "topic", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(metricsName + "rocketmqProducerSuccessCount", "topic", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getTopic(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getTopic(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); if (jtc.getDuration() > dubboSlowTime) { - singleMetrics.newCounter(metricsName + "rocketmqProducerSlowQuery", "topic", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(metricsName + "rocketmqProducerSlowQuery", "topic", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getTopic(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getTopic(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); esService.submitErrorEsTrace(esDomain, jtc.getMethod(), metricsServiceName, jtc.getTraceId(), "rocketmq_producer", jtc.getServerIp(), String.valueOf(jtc.getEndTime()), "", String.valueOf(jtc.getDuration()), "timeout", jtc.getHttpCode(), jtc.getServiceEnv()); } } else { - singleMetrics.newCounter(metricsName + "rocketmqProducerError", "topic", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(metricsName + "rocketmqProducerError", "topic", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getTopic(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getTopic(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); esService.submitErrorEsTrace(esDomain, jtc.getMethod(), metricsServiceName, jtc.getTraceId(), "rocketmq_producer", jtc.getServerIp(), String.valueOf(jtc.getEndTime()), "", String.valueOf(jtc.getDuration()), "error", jtc.getHttpCode(), jtc.getServiceEnv()); } } else if (SpanKind.CONSUMER.equals(jtc.getKind())) { - singleMetrics.newHistogram(metricsName + "rocketmqConsumerTimeCost", aopDubboBuckets, new String[]{"topic", "application", "serverIp", "serverEnv", "serverEnvId"}) + mutiMetrics.newHistogram(metricsName + "rocketmqConsumerTimeCost", aopDubboBuckets, new String[]{"topic", "application", "serverIp", "serverEnv", "serverEnvId"}) .with(jtc.getTopic(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .observe(jtc.getDuration(), jtc.getTopic(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); - singleMetrics.newCounter(metricsName + "rocketmqConsumerTotalCount", "topic", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(metricsName + "rocketmqConsumerTotalCount", "topic", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getTopic(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getTopic(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); if (jtc.isSuccess()) { - singleMetrics.newCounter(metricsName + "rocketmqConsumerSuccessCount", "topic", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(metricsName + "rocketmqConsumerSuccessCount", "topic", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getTopic(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getTopic(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); if (jtc.getDuration() > dubboSlowTime) { - singleMetrics.newCounter(metricsName + "rocketmqConsumerSlowQuery", "topic", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(metricsName + "rocketmqConsumerSlowQuery", "topic", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getTopic(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getTopic(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); esService.submitErrorEsTrace(esDomain, jtc.getMethod(), metricsServiceName, jtc.getTraceId(), "rocketmq_consumer", jtc.getServerIp(), String.valueOf(jtc.getEndTime()), "", String.valueOf(jtc.getDuration()), "timeout", jtc.getHttpCode(), jtc.getServiceEnv()); } } else { - singleMetrics.newCounter(metricsName + "rocketmqConsumerError", "topic", "application", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(metricsName + "rocketmqConsumerError", "topic", "application", "serverIp", "serverEnv", "serverEnvId") .with(jtc.getTopic(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, jtc.getTopic(), metricsServiceName, jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); esService.submitErrorEsTrace(esDomain, jtc.getMethod(), metricsServiceName, jtc.getTraceId(), "rocketmq_consumer", jtc.getServerIp(), String.valueOf(jtc.getEndTime()), "", String.valueOf(jtc.getDuration()), "error", jtc.getHttpCode(), jtc.getServiceEnv()); @@ -542,24 +534,24 @@ private void computeMetrics(JaegerTracerDomain jtc, HeraTraceEtlConfig config) { // customizeAnno if (SpanType.CUSTOMIZE_MTTHOD.equals(jtc.getType())) { String metricsName = "hera_"; - singleMetrics.newHistogram(metricsName + "CustomizeMethodTimeCost", aopDubboBuckets, new String[]{"application", "methodName", "serverIp", "serverEnv", "serverEnvId"}) + mutiMetrics.newHistogram(metricsName + "CustomizeMethodTimeCost", aopDubboBuckets, new String[]{"application", "methodName", "serverIp", "serverEnv", "serverEnvId"}) .with(metricsServiceName, jtc.getMethod(), jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .observe(jtc.getDuration(), metricsServiceName, jtc.getMethod(), jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); - singleMetrics.newCounter(metricsName + "CustomizeMethodTotalCount", "application", "methodName", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(metricsName + "CustomizeMethodTotalCount", "application", "methodName", "serverIp", "serverEnv", "serverEnvId") .with(metricsServiceName, jtc.getMethod(), jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, metricsServiceName, jtc.getMethod(), jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); if (jtc.isSuccess()) { - singleMetrics.newCounter(metricsName + "CustomizeMethodSuccessCount", "application", "methodName", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(metricsName + "CustomizeMethodSuccessCount", "application", "methodName", "serverIp", "serverEnv", "serverEnvId") .with(metricsServiceName, jtc.getMethod(), jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, metricsServiceName, jtc.getMethod(), jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); if (jtc.getDuration() > dubboSlowTime) { - singleMetrics.newCounter(metricsName + "CustomizeMethodSlowQuery", "application", "methodName", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(metricsName + "CustomizeMethodSlowQuery", "application", "methodName", "serverIp", "serverEnv", "serverEnvId") .with(metricsServiceName, jtc.getMethod(), jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, metricsServiceName, jtc.getMethod(), jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); esService.submitErrorEsTrace(esDomain, jtc.getMethod(), metricsServiceName, jtc.getTraceId(), "customize_method", jtc.getServerIp(), String.valueOf(jtc.getEndTime()), "", String.valueOf(jtc.getDuration()), "timeout", jtc.getHttpCode(), jtc.getServiceEnv()); } } else { - singleMetrics.newCounter(metricsName + "CustomizeMethodError", "application", "methodName", "serverIp", "serverEnv", "serverEnvId") + mutiMetrics.newCounter(metricsName + "CustomizeMethodError", "application", "methodName", "serverIp", "serverEnv", "serverEnvId") .with(metricsServiceName, jtc.getMethod(), jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()) .add(1, metricsServiceName, jtc.getMethod(), jtc.getServerIp(), jtc.getServiceEnv(), jtc.getServiceEnvId()); esService.submitErrorEsTrace(esDomain, jtc.getMethod(), metricsServiceName, jtc.getTraceId(), "customize_method", jtc.getServerIp(), String.valueOf(jtc.getEndTime()), "", String.valueOf(jtc.getDuration()), "error", jtc.getHttpCode(), jtc.getServiceEnv()); diff --git a/hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/consumer/MutiMetricsCall.java b/hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/consumer/MutiMetricsCall.java new file mode 100644 index 000000000..dc355542f --- /dev/null +++ b/hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/consumer/MutiMetricsCall.java @@ -0,0 +1,60 @@ +package com.xiaomi.hera.trace.etl.consumer; + +import com.xiaomi.youpin.prometheus.client.XmCounter; +import com.xiaomi.youpin.prometheus.client.XmHistogram; +import com.xiaomi.youpin.prometheus.client.multi.MutiMetrics; +import lombok.Getter; +import org.springframework.stereotype.Service; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author goodjava@qq.com + * @date 2023/8/29 16:22 + */ +@Service +public class MutiMetricsCall { + + + private MutiMetrics[] mutiMetricsArray = new MutiMetrics[2]; + + @Getter + private AtomicInteger index = new AtomicInteger(0); + + public void init(String group, String service) { + Arrays.stream(mutiMetricsArray).forEach(it -> { + it.init(group, service); + }); + } + + public void change() { + index.updateAndGet((i) -> { + if (i == 0) { + return 1; + } + return 0; + }); + } + + public MutiMetrics old() { + return mutiMetricsArray[this.index.get() == 0 ? 1 : 0]; + } + + + public MutiMetricsCall() { + mutiMetricsArray[0] = new MutiMetrics(); + mutiMetricsArray[1] = new MutiMetrics(); + } + + + public XmCounter newCounter(String metricName, String... labelNames) { + return mutiMetricsArray[index.get()].newCounter(metricName, labelNames); + } + + public XmHistogram newHistogram(String metricName, double[] buckets, String... labelNames) { + return mutiMetricsArray[index.get()].newHistogram(metricName, buckets, labelNames); + } + + +} diff --git a/hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/util/nacos/NacosClientUtil.java b/hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/util/nacos/NacosClientUtil.java index cf7569e23..9a08136c3 100644 --- a/hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/util/nacos/NacosClientUtil.java +++ b/hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/util/nacos/NacosClientUtil.java @@ -1,12 +1,10 @@ package com.xiaomi.hera.trace.etl.util.nacos; -import com.alibaba.fastjson.JSONObject; import com.alibaba.nacos.api.NacosFactory; import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.config.annotation.NacosValue; import com.alibaba.nacos.api.naming.pojo.Instance; import com.alibaba.nacos.client.naming.NacosNamingService; -import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,13 +13,7 @@ import javax.annotation.PostConstruct; import java.net.InetAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.logging.Level; +import java.util.*; @Component public class NacosClientUtil { diff --git a/hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/util/prometheus/HTTPServer.java b/hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/util/prometheus/HTTPServer.java index c6ab97d2e..fa947e324 100644 --- a/hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/util/prometheus/HTTPServer.java +++ b/hera-all/trace-etl/trace-etl-server/src/main/java/com/xiaomi/hera/trace/etl/util/prometheus/HTTPServer.java @@ -6,10 +6,7 @@ import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import com.sun.net.httpserver.HttpServer; -import com.xiaomi.hera.trace.etl.constant.LockUtil; -import com.xiaomi.youpin.prometheus.client.Metrics; -import com.xiaomi.youpin.prometheus.client.MetricsManager; -import com.xiaomi.youpin.prometheus.client.Prometheus; +import com.xiaomi.hera.trace.etl.consumer.DataCacheService; import com.xiaomi.youpin.prometheus.client.binder.ClassLoaderMetricsReduced; import com.xiaomi.youpin.prometheus.client.binder.JvmGcMetricsReduced; import com.xiaomi.youpin.prometheus.client.binder.JvmMemoryMetricsReduced; @@ -21,9 +18,6 @@ import io.micrometer.prometheus.PrometheusMeterRegistry; import io.prometheus.client.Collector; import io.prometheus.client.CollectorRegistry; -import io.prometheus.client.Counter; -import io.prometheus.client.Gauge; -import io.prometheus.client.Histogram; import io.prometheus.client.exporter.common.TextFormat; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -32,6 +26,7 @@ import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; +import javax.annotation.Resource; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -43,12 +38,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.FutureTask; -import java.util.concurrent.ThreadFactory; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -68,6 +58,9 @@ public class HTTPServer { @Value("${metrics.uri.whitelist}") private String uriWhitelist; + @Resource + private DataCacheService dataCacheService; + private HttpServer server; private ExecutorService executorService; public static final String APPLICATION = "application"; @@ -159,60 +152,56 @@ private Map getData(String contentType, CollectorRegistry regist @Override public void handle(HttpExchange exchange) throws IOException { long a = System.currentTimeMillis(); - if(!filterRequest(exchange)){ + if (!filterRequest(exchange)) { return; } String hostString = exchange.getRemoteAddress().getHostString(); String path = exchange.getRequestURI().getPath(); - boolean isCache = false; - synchronized (LockUtil.lock) { - try (OutputStream os = exchange.getResponseBody()) { - if ("/-/healthy".equals(path)) { - exchange.sendResponseHeaders(200, HEALTHY_RESPONSE.length); - os.write(HEALTHY_RESPONSE); - os.flush(); - return; - } else { - String contentType = TextFormat.chooseContentType(exchange.getRequestHeaders().getFirst("Accept")); - exchange.getResponseHeaders().set("Content-Type", contentType); - CollectorRegistry registry = this.registryMap.get("default"); - if ("/jvm".equals(path)) { - registry = this.registryMap.get("jvm"); - } + byte[] data = null; + try (OutputStream os = exchange.getResponseBody()) { + if ("/-/healthy".equals(path)) { + exchange.sendResponseHeaders(200, HEALTHY_RESPONSE.length); + os.write(HEALTHY_RESPONSE); + os.flush(); + } else { + String contentType = TextFormat.chooseContentType(exchange.getRequestHeaders().getFirst("Accept")); + exchange.getResponseHeaders().set("Content-Type", contentType); + if ("/jvm".equals(path)) { + CollectorRegistry registry = this.registryMap.get("jvm"); Map dataMap = getData(contentType, registry, path, hostString); - byte[] data = (byte[]) dataMap.get("data"); - isCache = (boolean) dataMap.get("isCache"); - exchange.sendResponseHeaders(200, data.length); - os.write(data); - os.flush(); - } - } catch (Throwable ex) { - ex.printStackTrace(); - exchange.sendResponseHeaders(200, 0); - exchange.getResponseBody().write(new byte[]{}); - } finally { - String query = exchange.getRequestURI().getRawQuery(); - long b = System.currentTimeMillis(); - log.info("prometheus request uri : " + path + " queryString : " + query + " remoteAddr:" + hostString + " duration : " + (b - a)); - if (!isCache && "/metrics".equals(path) && token.equals(getToken(query))) { - clearMetrics(); + data = (byte[]) dataMap.get("data"); + } else { + data = dataCacheService.getData(); } + exchange.sendResponseHeaders(200, data.length); + os.write(data); + os.flush(); } + } catch (Throwable ex) { + ex.printStackTrace(); + exchange.sendResponseHeaders(200, 0); + exchange.getResponseBody().write(new byte[]{}); + } finally { + String query = exchange.getRequestURI().getRawQuery(); + long b = System.currentTimeMillis(); + int len = null != data ? data.length : 0; + log.info("prometheus request uri : " + path + " queryString : " + query + " remoteAddr:" + hostString + " duration : " + (b - a) + " data size:" + len); + } } - private boolean filterRequest(HttpExchange exchange){ - if(StringUtils.isEmpty(ua)){ + private boolean filterRequest(HttpExchange exchange) { + if (StringUtils.isEmpty(ua)) { return true; } // Filter by User-Agent Headers requestHeaders = exchange.getRequestHeaders(); - if(requestHeaders != null && requestHeaders.size() > 0) { + if (requestHeaders != null && requestHeaders.size() > 0) { List headers = requestHeaders.get("User-agent"); if (headers != null && headers.size() > 0) { - for (String header : headers){ - for(String uaBlack : ua.split(";")){ - if(header.contains(uaBlack)){ + for (String header : headers) { + for (String uaBlack : ua.split(";")) { + if (header.contains(uaBlack)) { return false; } } @@ -226,42 +215,6 @@ private boolean filterRequest(HttpExchange exchange){ return true; } - private void clearMetrics() { -// synchronized (LockUtil.lock) { - try { - MetricsManager gMetricsMgr = Metrics.getInstance().gMetricsMgr; - if (gMetricsMgr instanceof Prometheus) { - Prometheus prometheus = (Prometheus) gMetricsMgr; - Map prometheusMetrics = prometheus.prometheusMetrics; - clearTypeMetrics(prometheusMetrics); - prometheus.prometheusMetrics.clear(); - prometheus.prometheusTypeMetrics.clear(); - } - } catch (Exception e) { - log.error("clear metrics error", e); - } -// } - } - - private void clearTypeMetrics(Map prometheusMetrics) { - for (String key : prometheusMetrics.keySet()) { - Object o = prometheusMetrics.get(key); - if (o instanceof Counter) { - Counter counter = (Counter) o; - CollectorRegistry.defaultRegistry.unregister(counter); - } else if (o instanceof Gauge) { - Gauge gauge = (Gauge) o; - gauge.clear(); - CollectorRegistry.defaultRegistry.unregister(gauge); - } else if (o instanceof Histogram) { - Histogram histogram = (Histogram) o; - histogram.clear(); - CollectorRegistry.defaultRegistry.unregister(histogram); - } else { - log.error("metrics : " + key + " Type conversion failed, original type : " + o.getClass().getName()); - } - } - } } protected static String getToken(String query) throws IOException { diff --git a/hera-all/trace-etl/trace-etl-server/src/test/java/com/xiaomi/youpin/trace/etl/test/FastWriter.java b/hera-all/trace-etl/trace-etl-server/src/test/java/com/xiaomi/youpin/trace/etl/test/FastWriter.java new file mode 100644 index 000000000..c20ede8ec --- /dev/null +++ b/hera-all/trace-etl/trace-etl-server/src/test/java/com/xiaomi/youpin/trace/etl/test/FastWriter.java @@ -0,0 +1,68 @@ +package com.xiaomi.youpin.trace.etl.test; + +import io.prometheus.client.Collector; +import org.apache.commons.lang3.tuple.Pair; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Enumeration; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** + * @author goodjava@qq.com + * @date 2023/8/29 00:27 + */ +public class FastWriter { + + + public byte[] getBytes(Enumeration mfs) throws IOException { + ByteArrayOutputStream b = new ByteArrayOutputStream(); + OutputStreamWriter w = new OutputStreamWriter(b); + + ArrayList> list = new ArrayList<>(); + + int v =2; + + IntStream.range(0, v).forEach(i -> { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + OutputStreamWriter writer = new OutputStreamWriter(baos); + list.add(Pair.of(baos, writer)); + }); + + + Collections.list(mfs).stream().parallel().forEach(it -> { + Pair pair = list.get((it.hashCode()& Integer.MAX_VALUE) % v); + OutputStreamWriter writer = pair.getValue(); + IntStream.range(0, 100).forEach(i -> { + try { + writer.write(it); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + }); + if (true) { + return new byte[]{}; + } + +// list.stream().map(p->{ +// try { +// p.getValue().flush(); +// return p.getKey().toByteArray(); +// } catch (IOException e) { +// throw new RuntimeException(e); +// } +// }).flatMap(Arrays::stream); +// w.flush(); +// return b.toByteArray(); + return null; + + } + +} diff --git a/hera-all/trace-etl/trace-etl-server/src/test/java/com/xiaomi/youpin/trace/etl/test/WriterTest.java b/hera-all/trace-etl/trace-etl-server/src/test/java/com/xiaomi/youpin/trace/etl/test/WriterTest.java new file mode 100644 index 000000000..d0f36a2e6 --- /dev/null +++ b/hera-all/trace-etl/trace-etl-server/src/test/java/com/xiaomi/youpin/trace/etl/test/WriterTest.java @@ -0,0 +1,149 @@ +package com.xiaomi.youpin.trace.etl.test; + +import com.google.common.base.Strings; +import lombok.SneakyThrows; +import okio.Buffer; +import okio.BufferedSink; +import okio.Okio; +import org.junit.Test; +import org.springframework.util.StopWatch; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.Charset; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * @author goodjava@qq.com + * @date 2023/8/28 22:55 + */ +public class WriterTest { + + private int num = 1000000; + + private String str = Strings.repeat("a", 10); + + + @SneakyThrows + @Test + public void test0() { + List list = IntStream.range(0, num).mapToObj(i -> str).collect(Collectors.toList()); + StopWatch sw = new StopWatch(); + Enumeration e = Collections.enumeration(list); + sw.start(); + byte[] data = new FastWriter().getBytes(e); + System.out.println(data.length); + sw.stop(); + System.out.println(sw.getTotalTimeMillis()); + } + + + @Test + public void test1() { + StopWatch sw = new StopWatch(); + sw.start(); + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); OutputStreamWriter writer = new OutputStreamWriter(baos)) { + IntStream.range(0, num).forEach(i -> { + IntStream.range(0, 100).forEach(j -> { + try { + writer.write(str); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + }); + writer.flush(); + byte[] bytes = baos.toByteArray(); + System.out.println(bytes.length); + } catch (IOException e) { + throw new RuntimeException(e); + } + sw.stop(); + System.out.println(sw.getTotalTimeMillis()); + } + + @Test + public void test2() { + StopWatch sw = new StopWatch(); + sw.start(); + Buffer buffer = new Buffer(); + IntStream.range(0, num).forEach(i -> { + buffer.writeUtf8(str); + }); + long size = buffer.size(); + System.out.println(size); + sw.stop(); + System.out.println(sw.getTotalTimeMillis()); + } + + @SneakyThrows + @Test + public void test22() { + StopWatch sw = new StopWatch(); + sw.start(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(100000); + BufferedSink sink = Okio.buffer(Okio.sink(bos)); + IntStream.range(0, num).forEach(i -> { + try { + sink.write(str.getBytes()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + sink.flush(); + ; + long size = bos.size(); + System.out.println(size); + sw.stop(); + System.out.println(sw.getTotalTimeMillis()); + } + + + @Test + public void test3() { + Buffer buffer = new Buffer(); + buffer.write("abc".getBytes()); + buffer.write("abc".getBytes()); + System.out.println(buffer.readByteString().string(Charset.defaultCharset())); + } + + @SneakyThrows + @Test + public void test4() { + ByteArrayOutputStream bos = new ByteArrayOutputStream(100000); + BufferedSink sink = Okio.buffer(Okio.sink(bos)); + sink.write("abc".getBytes()); + sink.write("abc".getBytes()); + sink.flush(); + System.out.println(bos.toByteArray().length); + + } + + + @Test + public void test5() { + Vector v = new Vector<>(); + v.add("a"); + v.add("b"); + v.add("c"); + ArrayList list = Collections.list(v.elements()); + list.stream().parallel().forEach(i -> { + System.out.println(i + " " + Thread.currentThread().getName()); + }); + } + + + @Test + public void test6() { + Buffer buffer = new Buffer(); + buffer.write("abc".getBytes()); + System.out.println(buffer.readByteString().string(Charset.defaultCharset())); + buffer.clear(); + System.out.println(buffer.readByteString().string(Charset.defaultCharset())); + } + + +}