diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 0af2971af5927..7177b32956e9a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.bookkeeper.mledger.util.StatsBuckets; import org.apache.pulsar.broker.admin.AdminResource; @@ -81,6 +82,9 @@ public abstract class AbstractTopic implements Topic { protected volatile PublishRateLimiter topicPublishRateLimiter; + private LongAdder bytesInCounter = new LongAdder(); + private LongAdder msgInCounter = new LongAdder(); + public AbstractTopic(String topic, BrokerService brokerService) { this.topic = topic; this.brokerService = brokerService; @@ -254,6 +258,9 @@ public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) { this.topicPublishRateLimiter.incrementPublishCount(numOfMessages, msgSizeInBytes); // increase broker publish rate limiter getBrokerPublishRateLimiter().incrementPublishCount(numOfMessages, msgSizeInBytes); + // increase counters + bytesInCounter.add(msgSizeInBytes); + msgInCounter.add(numOfMessages); } @Override @@ -376,5 +383,11 @@ private void updatePublishDispatcher(Policies policies) { } } + public long getMsgInCounter() { return this.msgInCounter.longValue(); } + + public long getBytesInCounter() { + return this.bytesInCounter.longValue(); + } + private static final Logger log = LoggerFactory.getLogger(AbstractTopic.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 33a4d30c91302..4f5d91f13a6c4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -762,6 +762,8 @@ public NonPersistentTopicStats getStats() { }); stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 : (stats.msgThroughputIn / stats.msgRateIn); + stats.msgInCounter = getMsgInCounter(); + stats.bytesInCounter = getBytesInCounter(); subscriptions.forEach((name, subscription) -> { NonPersistentSubscriptionStats subStats = subscription.getStats(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index f45bb96262087..6dbcc52cadd76 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1466,6 +1466,8 @@ public TopicStats getStats() { }); stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 : (stats.msgThroughputIn / stats.msgRateIn); + stats.msgInCounter = getMsgInCounter(); + stats.bytesInCounter = getBytesInCounter(); subscriptions.forEach((name, subscription) -> { SubscriptionStats subStats = subscription.getStats(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index fc055b3e436e2..19aa043ae823a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -104,6 +104,9 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include stats.storageReadRate = mlStats.getReadEntriesRate(); } + stats.msgInCounter = topic.getStats().msgInCounter; + stats.bytesInCounter = topic.getStats().bytesInCounter; + stats.producersCount = 0; topic.getProducers().values().forEach(producer -> { if (producer.isRemote()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index 3d57cddf145f3..846104441fac3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -34,6 +34,8 @@ class TopicStats { double rateOut; double throughputIn; double throughputOut; + long msgInCounter; + long bytesInCounter; long storageSize; public long msgBacklog; @@ -63,6 +65,8 @@ public void reset() { rateOut = 0; throughputIn = 0; throughputOut = 0; + bytesInCounter = 0; + msgInCounter = 0; storageSize = 0; msgBacklog = 0; @@ -162,6 +166,9 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin replStats.replicationBacklog); }); } + + metric(stream, cluster, namespace, topic, "pulsar_in_bytes_total", stats.bytesInCounter); + metric(stream, cluster, namespace, topic, "pulsar_in_messages_total", stats.msgInCounter); } static void metricType(SimpleTextOutputStream stream, String name) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index 0282971a252b0..e569cafdf5044 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -95,6 +95,20 @@ public void testPerTopicStats() throws Exception { assertEquals(cm.size(), 1); assertEquals(cm.get(0).tags.get("cluster"), "test"); + cm = (List) metrics.get("pulsar_in_bytes_total"); + assertEquals(cm.size(), 2); + assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2"); + assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); + assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns"); + + cm = (List) metrics.get("pulsar_in_messages_total"); + assertEquals(cm.size(), 2); + assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2"); + assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); + assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns"); + p1.close(); p2.close(); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java index 422b6588ff189..6f7fcc16e2d39 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java @@ -63,6 +63,9 @@ public class TopicStats { public String deduplicationStatus; + public long bytesInCounter; + public long msgInCounter; + public TopicStats() { this.publishers = Lists.newArrayList(); this.subscriptions = Maps.newHashMap(); @@ -78,6 +81,8 @@ public void reset() { this.averageMsgSize = 0; this.storageSize = 0; this.backlogSize = 0; + this.bytesInCounter = 0; + this.msgInCounter = 0; this.publishers.clear(); this.subscriptions.clear(); this.replication.clear(); @@ -93,6 +98,8 @@ public TopicStats add(TopicStats stats) { this.msgThroughputIn += stats.msgThroughputIn; this.msgRateOut += stats.msgRateOut; this.msgThroughputOut += stats.msgThroughputOut; + this.bytesInCounter += stats.bytesInCounter; + this.msgInCounter += stats.msgInCounter; double newAverageMsgSize = (this.averageMsgSize * (this.count - 1) + stats.averageMsgSize) / this.count; this.averageMsgSize = newAverageMsgSize; this.storageSize += stats.storageSize; diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md index 68804718837a7..3983dcd275c75 100644 --- a/site2/docs/reference-metrics.md +++ b/site2/docs/reference-metrics.md @@ -172,6 +172,8 @@ All the topic metrics are labelled with the following labels: | pulsar_subscription_delayed | Gauge | The total message batches (entries) are delayed for dispatching. | | pulsar_storage_write_latency_le_* | Histogram | The entry rate of a topic that the storage write latency is smaller with a given threshold.
Available thresholds:
| | pulsar_entry_size_le_* | Histogram | The entry rate of a topic that the entry size is smaller with a given threshold.
Available thresholds:
| +| pulsar_in_bytes_total | Counter | The total number of bytes received for this topic | +| pulsar_producers_count | Counter | The total number of messages received for this topic | #### Replication metrics