diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index c5f5bbf385dec..31227dba78e4a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.LongAdder; import java.util.stream.Collectors; import org.apache.bookkeeper.mledger.Entry; @@ -78,6 +79,8 @@ public class Consumer { private final String consumerName; private final Rate msgOut; private final Rate msgRedeliver; + private final LongAdder msgOutCounter; + private final LongAdder bytesOutCounter; private long lastConsumedTimestamp; private long lastAckedTimestamp; @@ -129,6 +132,8 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo this.cnx = cnx; this.msgOut = new Rate(); this.msgRedeliver = new Rate(); + this.bytesOutCounter = new LongAdder(); + this.msgOutCounter = new LongAdder(); this.appId = appId; this.authenticationData = cnx.authenticationData; PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0); @@ -222,6 +227,8 @@ public ChannelPromise sendMessages(final List entries, EntryBatchSizes ba MESSAGE_PERMITS_UPDATER.addAndGet(this, -totalMessages); incrementUnackedMessages(totalMessages); msgOut.recordMultipleEvents(totalMessages, totalBytes); + msgOutCounter.add(totalMessages); + bytesOutCounter.add(totalBytes); ctx.channel().eventLoop().execute(() -> { for (int i = 0; i < entries.size(); i++) { @@ -457,6 +464,8 @@ public void updateRates() { } public ConsumerStats getStats() { + stats.msgOutCounter = msgOutCounter.longValue(); + stats.bytesOutCounter = bytesOutCounter.longValue(); stats.lastAckedTimestamp = lastAckedTimestamp; stats.lastConsumedTimestamp = lastConsumedTimestamp; stats.availablePermits = getAvailablePermits(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index f653ee52b4953..316024a314846 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -384,6 +384,8 @@ public NonPersistentSubscriptionStats getStats() { subStats.consumers.add(consumerStats); subStats.msgRateOut += consumerStats.msgRateOut; subStats.msgThroughputOut += consumerStats.msgThroughputOut; + subStats.bytesOutCounter += consumerStats.bytesOutCounter; + subStats.msgOutCounter += consumerStats.msgOutCounter; subStats.msgRateRedeliver += consumerStats.msgRateRedeliver; }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 332abfbe163fa..0a99a64aeaacf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -778,6 +778,8 @@ public NonPersistentTopicStats getStats(boolean getPreciseBacklog) { stats.msgRateOut += subStats.msgRateOut; stats.msgThroughputOut += subStats.msgThroughputOut; + stats.bytesOutCounter += subStats.bytesOutCounter; + stats.msgOutCounter += subStats.msgOutCounter; stats.getSubscriptions().put(name, subStats); }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 5e6a2135fdd92..25633f9c38414 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -946,6 +946,8 @@ public SubscriptionStats getStats(Boolean getPreciseBacklog) { subStats.consumers.add(consumerStats); subStats.msgRateOut += consumerStats.msgRateOut; subStats.msgThroughputOut += consumerStats.msgThroughputOut; + subStats.bytesOutCounter += consumerStats.bytesOutCounter; + subStats.msgOutCounter += consumerStats.msgOutCounter; subStats.msgRateRedeliver += consumerStats.msgRateRedeliver; subStats.unackedMessages += consumerStats.unackedMessages; subStats.lastConsumedTimestamp = Math.max(subStats.lastConsumedTimestamp, consumerStats.lastConsumedTimestamp); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index b7470fa2f5cb3..28911bcdbf49d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1510,6 +1510,8 @@ public TopicStats getStats(boolean getPreciseBacklog) { stats.msgRateOut += subStats.msgRateOut; stats.msgThroughputOut += subStats.msgThroughputOut; + stats.bytesOutCounter += subStats.bytesOutCounter; + stats.msgOutCounter += subStats.msgOutCounter; stats.subscriptions.put(name, subStats); }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java index 0fadf3e6ac8ba..8b6bf7d5c9691 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java @@ -31,4 +31,8 @@ public class AggregatedConsumerStats { public double msgThroughputOut; public long availablePermits; + + long msgOutCounter; + + long bytesOutCounter; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java index ea05ed074c09e..1100523cd0693 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java @@ -34,6 +34,11 @@ public class AggregatedNamespaceStats { public double throughputIn; public double throughputOut; + public long bytesInCounter; + public long msgInCounter; + public long bytesOutCounter; + public long msgOutCounter; + public long storageSize; public long msgBacklog; public long msgDelayed; @@ -65,6 +70,11 @@ void updateStats(TopicStats stats) { throughputIn += stats.throughputIn; throughputOut += stats.throughputOut; + bytesInCounter += stats.bytesInCounter; + msgInCounter += stats.msgInCounter; + bytesOutCounter += stats.bytesOutCounter; + msgOutCounter += stats.msgOutCounter; + storageSize += stats.storageSize; backlogSize += stats.backlogSize; offloadedStorageUsed += stats.offloadedStorageUsed; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java index d5b53537dcc9a..1f1e879972bc1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java @@ -41,5 +41,9 @@ public class AggregatedSubscriptionStats { public long msgDelayed; + long msgOutCounter; + + long bytesOutCounter; + public Map consumerStat = new HashMap<>(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index a7b35b8c63337..f032ea172906a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -27,6 +27,7 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.common.policies.data.ConsumerStats; import org.apache.pulsar.common.policies.data.ReplicatorStats; import org.apache.pulsar.common.util.SimpleTextOutputStream; @@ -104,8 +105,11 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include stats.storageReadRate = mlStats.getReadEntriesRate(); } - stats.msgInCounter = topic.getStats(getPreciseBacklog).msgInCounter; - stats.bytesInCounter = topic.getStats(getPreciseBacklog).bytesInCounter; + org.apache.pulsar.common.policies.data.TopicStats tStatus = topic.getStats(getPreciseBacklog); + stats.msgInCounter = tStatus.msgInCounter; + stats.bytesInCounter = tStatus.bytesInCounter; + stats.msgOutCounter = tStatus.msgOutCounter; + stats.bytesOutCounter = tStatus.bytesOutCounter; stats.producersCount = 0; topic.getProducers().values().forEach(producer -> { @@ -123,43 +127,53 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include } }); - topic.getSubscriptions().forEach((name, subscription) -> { + tStatus.subscriptions.forEach((subName, subscriptionStats) -> { stats.subscriptionsCount++; - stats.msgBacklog += subscription.getNumberOfEntriesInBacklog(getPreciseBacklog); + stats.msgBacklog += subscriptionStats.msgBacklog; AggregatedSubscriptionStats subsStats = stats.subscriptionStats - .computeIfAbsent(name, k -> new AggregatedSubscriptionStats()); - subsStats.msgBacklog = subscription.getNumberOfEntriesInBacklog(getPreciseBacklog); - subsStats.msgDelayed = subscription.getNumberOfEntriesDelayed(); + .computeIfAbsent(subName, k -> new AggregatedSubscriptionStats()); + subsStats.msgBacklog = subscriptionStats.msgBacklog; + subsStats.msgDelayed = subscriptionStats.msgDelayed; subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed; + stats.rateOut += subsStats.msgRateOut; + stats.throughputOut += subsStats.msgThroughputOut; + subscriptionStats.consumers.forEach(cStats -> { + stats.consumersCount++; + subsStats.unackedMessages += cStats.unackedMessages; + subsStats.msgRateRedeliver += cStats.msgRateRedeliver; + subsStats.msgRateOut += cStats.msgRateOut; + subsStats.msgThroughputOut += cStats.msgThroughputOut; + subsStats.bytesOutCounter += cStats.bytesOutCounter; + subsStats.msgOutCounter += cStats.msgOutCounter; + if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) { + subsStats.blockedSubscriptionOnUnackedMsgs = true; + } + }); + }); - subscription.getConsumers().forEach(consumer -> { + // Consumer stats can be a lot if a subscription has many consumers + if (includeConsumerMetrics) { + topic.getSubscriptions().forEach((name, subscription) -> { + AggregatedSubscriptionStats subsStats = stats.subscriptionStats + .computeIfAbsent(name, k -> new AggregatedSubscriptionStats()); + subscription.getConsumers().forEach(consumer -> { + ConsumerStats conStats = consumer.getStats(); - // Consumer stats can be a lot if a subscription has many consumers - if (includeConsumerMetrics) { AggregatedConsumerStats consumerStats = subsStats.consumerStat .computeIfAbsent(consumer, k -> new AggregatedConsumerStats()); - consumerStats.unackedMessages = consumer.getStats().unackedMessages; - consumerStats.msgRateRedeliver = consumer.getStats().msgRateRedeliver; - consumerStats.msgRateOut = consumer.getStats().msgRateOut; - consumerStats.msgThroughputOut = consumer.getStats().msgThroughputOut; - consumerStats.availablePermits = consumer.getStats().availablePermits; - consumerStats.blockedSubscriptionOnUnackedMsgs = consumer.getStats().blockedConsumerOnUnackedMsgs; - } - - subsStats.unackedMessages += consumer.getStats().unackedMessages; - subsStats.msgRateRedeliver += consumer.getStats().msgRateRedeliver; - subsStats.msgRateOut += consumer.getStats().msgRateOut; - subsStats.msgThroughputOut += consumer.getStats().msgThroughputOut; - if (!subsStats.blockedSubscriptionOnUnackedMsgs && consumer.getStats().blockedConsumerOnUnackedMsgs) { - subsStats.blockedSubscriptionOnUnackedMsgs = true; - } - stats.consumersCount++; - stats.rateOut += consumer.getStats().msgRateOut; - stats.throughputOut += consumer.getStats().msgThroughputOut; + consumerStats.unackedMessages = conStats.unackedMessages; + consumerStats.msgRateRedeliver = conStats.msgRateRedeliver; + consumerStats.msgRateOut = conStats.msgRateOut; + consumerStats.msgThroughputOut = conStats.msgThroughputOut; + consumerStats.bytesOutCounter = conStats.bytesOutCounter; + consumerStats.msgOutCounter = conStats.msgOutCounter; + consumerStats.availablePermits = conStats.availablePermits; + consumerStats.blockedSubscriptionOnUnackedMsgs = conStats.blockedConsumerOnUnackedMsgs; + }); }); - }); + } topic.getReplicators().forEach((cluster, replicator) -> { AggregatedReplicationStats aggReplStats = stats.replicationStats.computeIfAbsent(cluster, @@ -206,6 +220,11 @@ private static void printNamespaceStats(SimpleTextOutputStream stream, String cl metric(stream, cluster, namespace, "pulsar_throughput_in", stats.throughputIn); metric(stream, cluster, namespace, "pulsar_throughput_out", stats.throughputOut); + metric(stream, cluster, namespace, "pulsar_in_bytes_total", stats.bytesInCounter); + metric(stream, cluster, namespace, "pulsar_in_messages_total", stats.msgInCounter); + metric(stream, cluster, namespace, "pulsar_out_bytes_total", stats.bytesOutCounter); + metric(stream, cluster, namespace, "pulsar_out_messages_total", stats.msgOutCounter); + metric(stream, cluster, namespace, "pulsar_storage_size", stats.storageSize); metric(stream, cluster, namespace, "pulsar_storage_backlog_size", stats.backlogSize); metric(stream, cluster, namespace, "pulsar_storage_offloaded_size", stats.offloadedStorageUsed); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index caf0ce842e2b3..0510d341e21a2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -36,6 +36,8 @@ class TopicStats { double throughputOut; long msgInCounter; long bytesInCounter; + long msgOutCounter; + long bytesOutCounter; long storageSize; public long msgBacklog; @@ -67,6 +69,8 @@ public void reset() { throughputOut = 0; bytesInCounter = 0; msgInCounter = 0; + bytesOutCounter = 0; + msgOutCounter = 0; storageSize = 0; msgBacklog = 0; @@ -141,6 +145,8 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin metric(stream, cluster, namespace, topic, n, "pulsar_subscription_blocked_on_unacked_messages", subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out", subsStats.msgRateOut); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out", subsStats.msgThroughputOut); + metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total", subsStats.bytesOutCounter); + metric(stream, cluster, namespace, topic, n, "pulsar_out_messages_total", subsStats.msgOutCounter); subsStats.consumerStat.forEach((c, consumerStats) -> { metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver); metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_unacked_messages", consumerStats.unackedMessages); @@ -148,6 +154,8 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut); metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut); metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_available_permits", consumerStats.availablePermits); + metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_out_bytes_total", consumerStats.bytesOutCounter); + metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_out_messages_total", consumerStats.msgOutCounter); }); }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index e569cafdf5044..92abb5ccace9d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -34,9 +34,12 @@ import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import com.google.common.base.MoreObjects; @@ -45,13 +48,14 @@ import com.google.common.collect.Multimap; public class PrometheusMetricsTest extends BrokerTestBase { - @BeforeClass + + @BeforeMethod @Override protected void setup() throws Exception { super.baseSetup(); } - @AfterClass + @AfterMethod @Override protected void cleanup() throws Exception { super.internalCleanup(); @@ -61,12 +65,30 @@ protected void cleanup() throws Exception { public void testPerTopicStats() throws Exception { Producer p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create(); Producer p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create(); - for (int i = 0; i < 10; i++) { + + Consumer c1 = pulsarClient.newConsumer() + .topic("persistent://my-property/use/my-ns/my-topic1") + .subscriptionName("test") + .subscribe(); + + Consumer c2 = pulsarClient.newConsumer() + .topic("persistent://my-property/use/my-ns/my-topic2") + .subscriptionName("test") + .subscribe(); + + final int messages = 10; + + for (int i = 0; i < messages; i++) { String message = "my-message-" + i; p1.send(message.getBytes()); p2.send(message.getBytes()); } + for (int i = 0; i < messages; i++) { + c1.acknowledge(c1.receive()); + c2.acknowledge(c2.receive()); + } + ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); PrometheusMetricsGenerator.generate(pulsar, true, false, statsOut); String metricsStr = new String(statsOut.toByteArray()); @@ -109,20 +131,58 @@ public void testPerTopicStats() throws Exception { assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns"); + cm = (List) metrics.get("pulsar_out_bytes_total"); + assertEquals(cm.size(), 2); + assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2"); + assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + assertEquals(cm.get(0).tags.get("subscription"), "test"); + assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); + assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns"); + assertEquals(cm.get(1).tags.get("subscription"), "test"); + + cm = (List) metrics.get("pulsar_out_messages_total"); + assertEquals(cm.size(), 2); + assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2"); + assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + assertEquals(cm.get(0).tags.get("subscription"), "test"); + assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); + assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns"); + assertEquals(cm.get(1).tags.get("subscription"), "test"); + p1.close(); p2.close(); + c1.close(); + c2.close(); } @Test public void testPerNamespaceStats() throws Exception { Producer p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create(); Producer p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create(); - for (int i = 0; i < 10; i++) { + + Consumer c1 = pulsarClient.newConsumer() + .topic("persistent://my-property/use/my-ns/my-topic1") + .subscriptionName("test") + .subscribe(); + + Consumer c2 = pulsarClient.newConsumer() + .topic("persistent://my-property/use/my-ns/my-topic2") + .subscriptionName("test") + .subscribe(); + + final int messages = 10; + + for (int i = 0; i < messages; i++) { String message = "my-message-" + i; p1.send(message.getBytes()); p2.send(message.getBytes()); } + for (int i = 0; i < messages; i++) { + c1.acknowledge(c1.receive()); + c2.acknowledge(c2.receive()); + } + ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); PrometheusMetricsGenerator.generate(pulsar, false, false, statsOut); String metricsStr = new String(statsOut.toByteArray()); @@ -141,12 +201,114 @@ public void testPerNamespaceStats() throws Exception { cm = (List) metrics.get("pulsar_producers_count"); assertEquals(cm.size(), 2); - assertEquals(cm.get(1).value, 2.0); assertNull(cm.get(1).tags.get("topic")); assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns"); + cm = (List) metrics.get("pulsar_in_bytes_total"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + + cm = (List) metrics.get("pulsar_in_messages_total"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + + cm = (List) metrics.get("pulsar_out_bytes_total"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + + cm = (List) metrics.get("pulsar_out_messages_total"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + + p1.close(); + p2.close(); + c1.close(); + c2.close(); + } + + @Test + public void testPerConsumerStats() throws Exception { + Producer p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create(); + Producer p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create(); + + Consumer c1 = pulsarClient.newConsumer() + .topic("persistent://my-property/use/my-ns/my-topic1") + .subscriptionName("test") + .subscribe(); + + Consumer c2 = pulsarClient.newConsumer() + .topic("persistent://my-property/use/my-ns/my-topic2") + .subscriptionName("test") + .subscribe(); + + final int messages = 10; + + for (int i = 0; i < messages; i++) { + String message = "my-message-" + i; + p1.send(message.getBytes()); + p2.send(message.getBytes()); + } + + for (int i = 0; i < messages; i++) { + c1.acknowledge(c1.receive()); + c2.acknowledge(c2.receive()); + } + + ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, true, true, statsOut); + String metricsStr = new String(statsOut.toByteArray()); + + Multimap metrics = parseMetrics(metricsStr); + + metrics.entries().forEach(e -> { + System.out.println(e.getKey() + ": " + e.getValue()); + }); + + // There should be 1 metric aggregated per namespace + List cm = (List) metrics.get("pulsar_out_bytes_total"); + assertEquals(cm.size(), 4); + assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2"); + assertEquals(cm.get(0).tags.get("subscription"), "test"); + + assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns"); + assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2"); + assertEquals(cm.get(1).tags.get("subscription"), "test"); + assertEquals(cm.get(1).tags.get("consumer_id"), "1"); + + assertEquals(cm.get(2).tags.get("namespace"), "my-property/use/my-ns"); + assertEquals(cm.get(2).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); + assertEquals(cm.get(2).tags.get("subscription"), "test"); + + assertEquals(cm.get(3).tags.get("namespace"), "my-property/use/my-ns"); + assertEquals(cm.get(3).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); + assertEquals(cm.get(3).tags.get("subscription"), "test"); + assertEquals(cm.get(3).tags.get("consumer_id"), "0"); + + cm = (List) metrics.get("pulsar_out_messages_total"); + assertEquals(cm.size(), 4); + assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2"); + assertEquals(cm.get(0).tags.get("subscription"), "test"); + + assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns"); + assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2"); + assertEquals(cm.get(1).tags.get("subscription"), "test"); + assertEquals(cm.get(1).tags.get("consumer_id"), "1"); + + assertEquals(cm.get(2).tags.get("namespace"), "my-property/use/my-ns"); + assertEquals(cm.get(2).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); + assertEquals(cm.get(2).tags.get("subscription"), "test"); + + assertEquals(cm.get(3).tags.get("namespace"), "my-property/use/my-ns"); + assertEquals(cm.get(3).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); + assertEquals(cm.get(3).tags.get("subscription"), "test"); + assertEquals(cm.get(3).tags.get("consumer_id"), "0"); + p1.close(); p2.close(); + c1.close(); + c2.close(); } /** Checks for duplicate type definitions for a metric in the Prometheus metrics output. If the Prometheus parser diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java index 7411f03a29f4c..0ecb9448bd1c5 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java @@ -32,6 +32,12 @@ public class ConsumerStats { /** Total throughput delivered to the consumer (bytes/s). */ public double msgThroughputOut; + /** Total bytes delivered to consumer (bytes). */ + public long bytesOutCounter; + + /** Total messages delivered to consumer (msg). */ + public long msgOutCounter; + /** Total rate of messages redelivered by this consumer (msg/s). */ public double msgRateRedeliver; @@ -75,6 +81,8 @@ public ConsumerStats add(ConsumerStats stats) { checkNotNull(stats); this.msgRateOut += stats.msgRateOut; this.msgThroughputOut += stats.msgThroughputOut; + this.bytesOutCounter += stats.bytesOutCounter; + this.msgOutCounter += stats.msgOutCounter; this.msgRateRedeliver += stats.msgRateRedeliver; this.availablePermits += stats.availablePermits; this.unackedMessages += stats.unackedMessages; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java index 7064883a04c2a..df8fc72dfe52a 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java @@ -34,6 +34,12 @@ public class SubscriptionStats { /** Total throughput delivered on this subscription (bytes/s). */ public double msgThroughputOut; + /** Total bytes delivered to consumer (bytes). */ + public long bytesOutCounter; + + /** Total messages delivered to consumer (msg). */ + public long msgOutCounter; + /** Total rate of messages redelivered on this subscription (msg/s). */ public double msgRateRedeliver; @@ -86,6 +92,8 @@ public SubscriptionStats() { public void reset() { msgRateOut = 0; msgThroughputOut = 0; + bytesOutCounter = 0; + msgOutCounter = 0; msgRateRedeliver = 0; msgBacklog = 0; msgBacklogNoDelayed = 0; @@ -101,6 +109,8 @@ public SubscriptionStats add(SubscriptionStats stats) { checkNotNull(stats); this.msgRateOut += stats.msgRateOut; this.msgThroughputOut += stats.msgThroughputOut; + this.bytesOutCounter += stats.bytesOutCounter; + this.msgOutCounter += stats.msgOutCounter; this.msgRateRedeliver += stats.msgRateRedeliver; this.msgBacklog += stats.msgBacklog; this.msgBacklogNoDelayed += stats.msgBacklogNoDelayed; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java index 9962fa95ae383..be471e3bf7c78 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java @@ -43,6 +43,18 @@ public class TopicStats { /** Total throughput of messages dispatched for the topic (byte/s). */ public double msgThroughputOut; + /** Total bytes published to the topic (bytes). */ + public long bytesInCounter; + + /** Total messages published to the topic (msg). */ + public long msgInCounter; + + /** Total bytes delivered to consumer (bytes). */ + public long bytesOutCounter; + + /** Total messages delivered to consumer (msg). */ + public long msgOutCounter; + /** Average size of published messages (bytes). */ public double averageMsgSize; @@ -63,9 +75,6 @@ public class TopicStats { public String deduplicationStatus; - public long bytesInCounter; - public long msgInCounter; - public TopicStats() { this.publishers = Lists.newArrayList(); this.subscriptions = Maps.newHashMap(); @@ -83,6 +92,8 @@ public void reset() { this.backlogSize = 0; this.bytesInCounter = 0; this.msgInCounter = 0; + this.bytesOutCounter = 0; + this.msgOutCounter = 0; this.publishers.clear(); this.subscriptions.clear(); this.replication.clear(); @@ -100,6 +111,8 @@ public TopicStats add(TopicStats stats) { this.msgThroughputOut += stats.msgThroughputOut; this.bytesInCounter += stats.bytesInCounter; this.msgInCounter += stats.msgInCounter; + this.bytesOutCounter += stats.bytesOutCounter; + this.msgOutCounter += stats.msgOutCounter; double newAverageMsgSize = (this.averageMsgSize * (this.count - 1) + stats.averageMsgSize) / this.count; this.averageMsgSize = newAverageMsgSize; this.storageSize += stats.storageSize; diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md index 3983dcd275c75..b57406c14f9dc 100644 --- a/site2/docs/reference-metrics.md +++ b/site2/docs/reference-metrics.md @@ -174,6 +174,8 @@ All the topic metrics are labelled with the following labels: | pulsar_entry_size_le_* | Histogram | The entry rate of a topic that the entry size is smaller with a given threshold.
Available thresholds:
  • pulsar_entry_size_le_128: <= 128 bytes
  • pulsar_entry_size_le_512: <= 512 bytes
  • pulsar_entry_size_le_1_kb: <= 1 KB
  • pulsar_entry_size_le_2_kb: <= 2 KB
  • pulsar_entry_size_le_4_kb: <= 4 KB
  • pulsar_entry_size_le_16_kb: <= 16 KB
  • pulsar_entry_size_le_100_kb: <= 100 KB
  • pulsar_entry_size_le_1_mb: <= 1 MB
  • pulsar_entry_size_le_overflow: > 1 MB
| | pulsar_in_bytes_total | Counter | The total number of bytes received for this topic | | pulsar_producers_count | Counter | The total number of messages received for this topic | +| pulsar_out_bytes_total | Counter | The total number of bytes read from this topic | +| pulsar_out_messages_total | Counter | The total number of messages read from this topic | #### Replication metrics