Skip to content

Commit

Permalink
Expose pulsar_out_bytes_total and pulsar_out_messages_total for names…
Browse files Browse the repository at this point in the history
…pace/subscription/consumer. (apache#6918)

Fixes apache#6891
Rated to apache#5802

Add pulsar_out_bytes_total and pulsar_out_messages_total for namespace/subscription/consumer.

New unit test added.
(cherry picked from commit 204f327)
  • Loading branch information
codelipenghui authored and Addison Higham committed Jun 11, 2020
1 parent dd2b967 commit 06815c8
Show file tree
Hide file tree
Showing 15 changed files with 294 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;

import org.apache.bookkeeper.mledger.Entry;
Expand Down Expand Up @@ -78,6 +79,8 @@ public class Consumer {
private final String consumerName;
private final Rate msgOut;
private final Rate msgRedeliver;
private final LongAdder msgOutCounter;
private final LongAdder bytesOutCounter;

private long lastConsumedTimestamp;
private long lastAckedTimestamp;
Expand Down Expand Up @@ -129,6 +132,8 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.cnx = cnx;
this.msgOut = new Rate();
this.msgRedeliver = new Rate();
this.bytesOutCounter = new LongAdder();
this.msgOutCounter = new LongAdder();
this.appId = appId;
this.authenticationData = cnx.authenticationData;
PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
Expand Down Expand Up @@ -222,6 +227,8 @@ public ChannelPromise sendMessages(final List<Entry> entries, EntryBatchSizes ba
MESSAGE_PERMITS_UPDATER.addAndGet(this, -totalMessages);
incrementUnackedMessages(totalMessages);
msgOut.recordMultipleEvents(totalMessages, totalBytes);
msgOutCounter.add(totalMessages);
bytesOutCounter.add(totalBytes);

ctx.channel().eventLoop().execute(() -> {
for (int i = 0; i < entries.size(); i++) {
Expand Down Expand Up @@ -457,6 +464,8 @@ public void updateRates() {
}

public ConsumerStats getStats() {
stats.msgOutCounter = msgOutCounter.longValue();
stats.bytesOutCounter = bytesOutCounter.longValue();
stats.lastAckedTimestamp = lastAckedTimestamp;
stats.lastConsumedTimestamp = lastConsumedTimestamp;
stats.availablePermits = getAvailablePermits();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,8 @@ public NonPersistentSubscriptionStats getStats() {
subStats.consumers.add(consumerStats);
subStats.msgRateOut += consumerStats.msgRateOut;
subStats.msgThroughputOut += consumerStats.msgThroughputOut;
subStats.bytesOutCounter += consumerStats.bytesOutCounter;
subStats.msgOutCounter += consumerStats.msgOutCounter;
subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,8 @@ public NonPersistentTopicStats getStats(boolean getPreciseBacklog) {

stats.msgRateOut += subStats.msgRateOut;
stats.msgThroughputOut += subStats.msgThroughputOut;
stats.bytesOutCounter += subStats.bytesOutCounter;
stats.msgOutCounter += subStats.msgOutCounter;
stats.getSubscriptions().put(name, subStats);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,8 @@ public SubscriptionStats getStats(Boolean getPreciseBacklog) {
subStats.consumers.add(consumerStats);
subStats.msgRateOut += consumerStats.msgRateOut;
subStats.msgThroughputOut += consumerStats.msgThroughputOut;
subStats.bytesOutCounter += consumerStats.bytesOutCounter;
subStats.msgOutCounter += consumerStats.msgOutCounter;
subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
subStats.unackedMessages += consumerStats.unackedMessages;
subStats.lastConsumedTimestamp = Math.max(subStats.lastConsumedTimestamp, consumerStats.lastConsumedTimestamp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1510,6 +1510,8 @@ public TopicStats getStats(boolean getPreciseBacklog) {

stats.msgRateOut += subStats.msgRateOut;
stats.msgThroughputOut += subStats.msgThroughputOut;
stats.bytesOutCounter += subStats.bytesOutCounter;
stats.msgOutCounter += subStats.msgOutCounter;
stats.subscriptions.put(name, subStats);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,8 @@ public class AggregatedConsumerStats {
public double msgThroughputOut;

public long availablePermits;

long msgOutCounter;

long bytesOutCounter;
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public class AggregatedNamespaceStats {
public double throughputIn;
public double throughputOut;

public long bytesInCounter;
public long msgInCounter;
public long bytesOutCounter;
public long msgOutCounter;

public long storageSize;
public long msgBacklog;
public long msgDelayed;
Expand Down Expand Up @@ -65,6 +70,11 @@ void updateStats(TopicStats stats) {
throughputIn += stats.throughputIn;
throughputOut += stats.throughputOut;

bytesInCounter += stats.bytesInCounter;
msgInCounter += stats.msgInCounter;
bytesOutCounter += stats.bytesOutCounter;
msgOutCounter += stats.msgOutCounter;

storageSize += stats.storageSize;
backlogSize += stats.backlogSize;
offloadedStorageUsed += stats.offloadedStorageUsed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,9 @@ public class AggregatedSubscriptionStats {

public long msgDelayed;

long msgOutCounter;

long bytesOutCounter;

public Map<Consumer, AggregatedConsumerStats> consumerStat = new HashMap<>();
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.util.SimpleTextOutputStream;

Expand Down Expand Up @@ -104,8 +105,11 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
stats.storageReadRate = mlStats.getReadEntriesRate();
}

stats.msgInCounter = topic.getStats(getPreciseBacklog).msgInCounter;
stats.bytesInCounter = topic.getStats(getPreciseBacklog).bytesInCounter;
org.apache.pulsar.common.policies.data.TopicStats tStatus = topic.getStats(getPreciseBacklog);
stats.msgInCounter = tStatus.msgInCounter;
stats.bytesInCounter = tStatus.bytesInCounter;
stats.msgOutCounter = tStatus.msgOutCounter;
stats.bytesOutCounter = tStatus.bytesOutCounter;

stats.producersCount = 0;
topic.getProducers().values().forEach(producer -> {
Expand All @@ -123,43 +127,53 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
}
});

topic.getSubscriptions().forEach((name, subscription) -> {
tStatus.subscriptions.forEach((subName, subscriptionStats) -> {
stats.subscriptionsCount++;
stats.msgBacklog += subscription.getNumberOfEntriesInBacklog(getPreciseBacklog);
stats.msgBacklog += subscriptionStats.msgBacklog;

AggregatedSubscriptionStats subsStats = stats.subscriptionStats
.computeIfAbsent(name, k -> new AggregatedSubscriptionStats());
subsStats.msgBacklog = subscription.getNumberOfEntriesInBacklog(getPreciseBacklog);
subsStats.msgDelayed = subscription.getNumberOfEntriesDelayed();
.computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
subsStats.msgBacklog = subscriptionStats.msgBacklog;
subsStats.msgDelayed = subscriptionStats.msgDelayed;
subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
stats.rateOut += subsStats.msgRateOut;
stats.throughputOut += subsStats.msgThroughputOut;
subscriptionStats.consumers.forEach(cStats -> {
stats.consumersCount++;
subsStats.unackedMessages += cStats.unackedMessages;
subsStats.msgRateRedeliver += cStats.msgRateRedeliver;
subsStats.msgRateOut += cStats.msgRateOut;
subsStats.msgThroughputOut += cStats.msgThroughputOut;
subsStats.bytesOutCounter += cStats.bytesOutCounter;
subsStats.msgOutCounter += cStats.msgOutCounter;
if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) {
subsStats.blockedSubscriptionOnUnackedMsgs = true;
}
});
});

subscription.getConsumers().forEach(consumer -> {
// Consumer stats can be a lot if a subscription has many consumers
if (includeConsumerMetrics) {
topic.getSubscriptions().forEach((name, subscription) -> {
AggregatedSubscriptionStats subsStats = stats.subscriptionStats
.computeIfAbsent(name, k -> new AggregatedSubscriptionStats());
subscription.getConsumers().forEach(consumer -> {
ConsumerStats conStats = consumer.getStats();

// Consumer stats can be a lot if a subscription has many consumers
if (includeConsumerMetrics) {
AggregatedConsumerStats consumerStats = subsStats.consumerStat
.computeIfAbsent(consumer, k -> new AggregatedConsumerStats());
consumerStats.unackedMessages = consumer.getStats().unackedMessages;
consumerStats.msgRateRedeliver = consumer.getStats().msgRateRedeliver;
consumerStats.msgRateOut = consumer.getStats().msgRateOut;
consumerStats.msgThroughputOut = consumer.getStats().msgThroughputOut;
consumerStats.availablePermits = consumer.getStats().availablePermits;
consumerStats.blockedSubscriptionOnUnackedMsgs = consumer.getStats().blockedConsumerOnUnackedMsgs;
}

subsStats.unackedMessages += consumer.getStats().unackedMessages;
subsStats.msgRateRedeliver += consumer.getStats().msgRateRedeliver;
subsStats.msgRateOut += consumer.getStats().msgRateOut;
subsStats.msgThroughputOut += consumer.getStats().msgThroughputOut;
if (!subsStats.blockedSubscriptionOnUnackedMsgs && consumer.getStats().blockedConsumerOnUnackedMsgs) {
subsStats.blockedSubscriptionOnUnackedMsgs = true;
}

stats.consumersCount++;
stats.rateOut += consumer.getStats().msgRateOut;
stats.throughputOut += consumer.getStats().msgThroughputOut;
consumerStats.unackedMessages = conStats.unackedMessages;
consumerStats.msgRateRedeliver = conStats.msgRateRedeliver;
consumerStats.msgRateOut = conStats.msgRateOut;
consumerStats.msgThroughputOut = conStats.msgThroughputOut;
consumerStats.bytesOutCounter = conStats.bytesOutCounter;
consumerStats.msgOutCounter = conStats.msgOutCounter;
consumerStats.availablePermits = conStats.availablePermits;
consumerStats.blockedSubscriptionOnUnackedMsgs = conStats.blockedConsumerOnUnackedMsgs;
});
});
});
}

topic.getReplicators().forEach((cluster, replicator) -> {
AggregatedReplicationStats aggReplStats = stats.replicationStats.computeIfAbsent(cluster,
Expand Down Expand Up @@ -206,6 +220,11 @@ private static void printNamespaceStats(SimpleTextOutputStream stream, String cl
metric(stream, cluster, namespace, "pulsar_throughput_in", stats.throughputIn);
metric(stream, cluster, namespace, "pulsar_throughput_out", stats.throughputOut);

metric(stream, cluster, namespace, "pulsar_in_bytes_total", stats.bytesInCounter);
metric(stream, cluster, namespace, "pulsar_in_messages_total", stats.msgInCounter);
metric(stream, cluster, namespace, "pulsar_out_bytes_total", stats.bytesOutCounter);
metric(stream, cluster, namespace, "pulsar_out_messages_total", stats.msgOutCounter);

metric(stream, cluster, namespace, "pulsar_storage_size", stats.storageSize);
metric(stream, cluster, namespace, "pulsar_storage_backlog_size", stats.backlogSize);
metric(stream, cluster, namespace, "pulsar_storage_offloaded_size", stats.offloadedStorageUsed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class TopicStats {
double throughputOut;
long msgInCounter;
long bytesInCounter;
long msgOutCounter;
long bytesOutCounter;

long storageSize;
public long msgBacklog;
Expand Down Expand Up @@ -67,6 +69,8 @@ public void reset() {
throughputOut = 0;
bytesInCounter = 0;
msgInCounter = 0;
bytesOutCounter = 0;
msgOutCounter = 0;

storageSize = 0;
msgBacklog = 0;
Expand Down Expand Up @@ -141,13 +145,17 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_blocked_on_unacked_messages", subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out", subsStats.msgRateOut);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out", subsStats.msgThroughputOut);
metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total", subsStats.bytesOutCounter);
metric(stream, cluster, namespace, topic, n, "pulsar_out_messages_total", subsStats.msgOutCounter);
subsStats.consumerStat.forEach((c, consumerStats) -> {
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_unacked_messages", consumerStats.unackedMessages);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_blocked_on_unacked_messages", consumerStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_available_permits", consumerStats.availablePermits);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_out_bytes_total", consumerStats.bytesOutCounter);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_out_messages_total", consumerStats.msgOutCounter);
});
});

Expand Down
Loading

0 comments on commit 06815c8

Please sign in to comment.