From 50fdd31dd78b1fbe9bffd9d083108903bfe82d97 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Thu, 27 Jul 2017 13:43:02 -0700 Subject: [PATCH] formating + batch-number counting for msg-drop + ack-publish on same thread --- .../broker/admin/NonPersistentTopics.java | 6 +--- .../pulsar/broker/admin/PersistentTopics.java | 28 +------------------ .../pulsar/broker/service/Consumer.java | 4 +-- .../pulsar/broker/service/Producer.java | 15 +++++----- .../pulsar/broker/service/ServerCnx.java | 2 +- ...PersistentDispatcherMultipleConsumers.java | 10 +++++-- ...sistentDispatcherSingleActiveConsumer.java | 15 ++++++++-- .../NonPersistentSubscription.java | 4 +-- .../nonpersistent/NonPersistentTopic.java | 6 ++-- .../pulsar/broker/admin/AdminApiTest.java | 10 +++---- .../client/api/NonPersistentTopicTest.java | 18 ++++++------ .../src/test/resources/log4j.properties | 4 +-- .../data/NonPersistentTopicStats.java | 12 ++++++++ .../common/policies/data/PublisherStats.java | 2 +- .../common/policies/data/ReplicatorStats.java | 2 +- .../policies/data/SubscriptionStats.java | 2 +- 16 files changed, 69 insertions(+), 71 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/NonPersistentTopics.java index 2ecb0bec25488..d6fa05addf107 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/NonPersistentTopics.java @@ -80,11 +80,7 @@ public NonPersistentTopicStats getStats(@PathParam("property") String property, DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination); validateAdminOperationOnDestination(dn, authoritative); Topic topic = getTopicReference(dn); - if(topic instanceof NonPersistentTopic) { - return ((NonPersistentTopic)topic).getStats(); - }else { - throw new RestException(Status.METHOD_NOT_ALLOWED, "Can not get stats for persistent topic"); - } + return ((NonPersistentTopic)topic).getStats(); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java index b47c1f8f0bba5..cb4bd17c56803 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java @@ -775,11 +775,6 @@ public void skipAllMessages(@PathParam("property") String property, @PathParam(" } } else { validateAdminOperationOnDestination(dn, authoritative); - if (!(getTopicReference(dn) instanceof PersistentTopic)) { - log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), dn, subName); - throw new RestException(Status.METHOD_NOT_ALLOWED, - "Skip messages on a non-persistent topic is not allowed"); - } PersistentTopic topic = (PersistentTopic) getTopicReference(dn); try { if (subName.startsWith(topic.replicatorPrefix)) { @@ -820,11 +815,6 @@ public void skipMessages(@PathParam("property") String property, @PathParam("clu throw new RestException(Status.METHOD_NOT_ALLOWED, "Skip messages on a partitioned topic is not allowed"); } validateAdminOperationOnDestination(dn, authoritative); - if (!(getTopicReference(dn) instanceof PersistentTopic)) { - log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), dn, subName); - throw new RestException(Status.METHOD_NOT_ALLOWED, - "Skip messages on a non-persistent topic is not allowed"); - } PersistentTopic topic = (PersistentTopic) getTopicReference(dn); try { if (subName.startsWith(topic.replicatorPrefix)) { @@ -887,11 +877,6 @@ public void expireMessagesForAllSubscriptions(@PathParam("property") String prop } else { // validate ownership and redirect if current broker is not owner validateAdminOperationOnDestination(dn, authoritative); - if (!(getTopicReference(dn) instanceof PersistentTopic)) { - log.error("[{}] Not supported operation of non-persistent topic {}", clientAppId(), dn); - throw new RestException(Status.METHOD_NOT_ALLOWED, - "Expire messages on a non-persistent topic is not allowed"); - } PersistentTopic topic = (PersistentTopic) getTopicReference(dn); topic.getReplicators().forEach((subName, replicator) -> { expireMessages(property, cluster, namespace, destination, subName, expireTimeInSeconds, authoritative); @@ -951,11 +936,6 @@ public void resetCursor(@PathParam("property") String property, @PathParam("clus validateAdminOperationOnDestination(dn, authoritative); log.info("[{}][{}] received reset cursor on subscription {} to time {}", clientAppId(), destination, subName, timestamp); - if (!(getTopicReference(dn) instanceof PersistentTopic)) { - log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), dn, subName); - throw new RestException(Status.METHOD_NOT_ALLOWED, - "Reset cursor on a non-persistent topic is not allowed"); - } PersistentTopic topic = (PersistentTopic) getTopicReference(dn); try { PersistentSubscription sub = topic.getSubscription(subName); @@ -1136,13 +1116,7 @@ public MessageId terminate(@PathParam("property") String property, @PathParam("c validateAdminOperationOnDestination(dn, authoritative); Topic topic = getTopicReference(dn); try { - if (topic instanceof PersistentTopic) { - return ((PersistentTopic) topic).terminate().get(); - } else { - log.error("[{}] Not supported operation of non-persistent topic {}", clientAppId(), dn); - throw new RestException(Status.METHOD_NOT_ALLOWED, - "Expire messages on a non-persistent topic is not allowed"); - } + return ((PersistentTopic) topic).terminate().get(); } catch (Exception exception) { log.error("[{}] Failed to terminated topic {}", clientAppId(), dn, exception); throw new RestException(exception); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index cdfa7b2014aab..83235f2c688b7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -217,7 +217,7 @@ private void incrementUnackedMessages(int ackedMessages) { } } - int getBatchSizeforEntry(ByteBuf metadataAndPayload) { + public static int getBatchSizeforEntry(ByteBuf metadataAndPayload, String subscription, long consumerId) { try { // save the reader index and restore after parsing metadataAndPayload.markReaderIndex(); @@ -244,7 +244,7 @@ int updatePermitsAndPendingAcks(final List entries) throws PulsarServerEx while (iter.hasNext()) { Entry entry = iter.next(); ByteBuf metadataAndPayload = entry.getDataBuffer(); - int batchSize = getBatchSizeforEntry(metadataAndPayload); + int batchSize = getBatchSizeforEntry(metadataAndPayload, subscription.toString(), consumerId); if (batchSize == -1) { // this would suggest that the message might have been corrupted iter.remove(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index f29efb5ad548e..84f273287cf9c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -59,7 +59,7 @@ public class Producer { private final String appId; private Rate msgIn; // it records msg-drop rate only for non-persistent topic - private Rate msgDrop; + private final Rate msgDrop; private volatile long pendingPublishAcks = 0; private static final AtomicLongFieldUpdater pendingPublishAcksUpdater = AtomicLongFieldUpdater @@ -81,9 +81,8 @@ public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName this.closeFuture = new CompletableFuture<>(); this.appId = appId; this.msgIn = new Rate(); - this.msgDrop = new Rate(); this.isNonPersistentTopic = topic instanceof NonPersistentTopic; - + this.msgDrop = this.isNonPersistentTopic ? new Rate() : null; this.stats = isNonPersistentTopic ? new NonPersistentPublisherStats() : new PublisherStats(); stats.address = cnx.clientAddress().toString(); stats.connectedSince = DATE_FORMAT.format(Instant.now()); @@ -181,8 +180,10 @@ private void publishOperationCompleted() { } } - public void recordMessageDrop() { - msgDrop.recordEvent(); + public void recordMessageDrop(int batchSize) { + if (this.isNonPersistentTopic) { + msgDrop.recordEvent(batchSize); + } } private static final class MessagePublishedCallback implements PublishCallback, Runnable { @@ -347,11 +348,11 @@ public CompletableFuture disconnect() { public void updateRates() { msgIn.calculateRate(); - msgDrop.calculateRate(); stats.msgRateIn = msgIn.getRate(); stats.msgThroughputIn = msgIn.getValueRate(); stats.averageMsgSize = msgIn.getAverageValue(); - if (stats instanceof NonPersistentPublisherStats) { + if (this.isNonPersistentTopic) { + msgDrop.calculateRate(); ((NonPersistentPublisherStats) stats).msgDropRate = msgDrop.getRate(); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 5c75d8b68e3e9..323f49a7b47f2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -582,7 +582,7 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) { service.getTopicOrderedExecutor().submitOrdered(producer.getTopic(), SafeRun.safeRun(() -> { ctx.writeAndFlush(Commands.newSendReceipt(producerId, sequenceId, -1, -1), ctx.voidPromise()); })); - producer.recordMessageDrop(); + producer.recordMessageDrop(send.getNumMessages()); return; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java index 160e5d939be76..2b2bcb3abed84 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java @@ -27,6 +27,7 @@ import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.Consumer; +import static org.apache.pulsar.broker.service.Consumer.getBatchSizeforEntry; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.utils.CopyOnWriteArrayList; import org.slf4j.Logger; @@ -146,8 +147,13 @@ public void sendMessages(List entries) { if (consumer != null) { TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -consumer.sendMessages(entries).getRight()); } else { - msgDrop.recordEvent(entries.size()); - entries.forEach(Entry::release); + entries.forEach(entry -> { + int totalMsgs = getBatchSizeforEntry(entry.getDataBuffer(), name, -1); + if (totalMsgs > 0) { + msgDrop.recordEvent(); + } + entry.release(); + }); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java index 6001808f21a74..f9ce95d921500 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.service.nonpersistent; +import static org.apache.pulsar.broker.service.Consumer.getBatchSizeforEntry; + import java.util.List; import org.apache.bookkeeper.mledger.Entry; @@ -31,10 +33,12 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer implements NonPersistentDispatcher { private final Rate msgDrop; + private final String name; public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex, - NonPersistentTopic topic) { + NonPersistentTopic topic, String subName) { super(subscriptionType, partitionIndex, topic.getName()); + this.name = topic.getName() + " / " + subName; this.msgDrop = new Rate(); } @@ -44,8 +48,13 @@ public void sendMessages(List entries) { if (currentConsumer != null && currentConsumer.getAvailablePermits() > 0 && currentConsumer.isWritable()) { currentConsumer.sendMessages(entries); } else { - msgDrop.recordEvent(entries.size()); - entries.forEach(Entry::release); + entries.forEach(entry -> { + int totalMsgs = getBatchSizeforEntry(entry.getDataBuffer(), name, -1); + if (totalMsgs > 0) { + msgDrop.recordEvent(); + } + entry.release(); + }); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index f842e96aa2c5e..25ef976824f37 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -78,7 +78,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce switch (consumer.subType()) { case Exclusive: if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) { - dispatcher = new NonPersistentDispatcherSingleActiveConsumer(SubType.Exclusive, 0, topic); + dispatcher = new NonPersistentDispatcherSingleActiveConsumer(SubType.Exclusive, 0, topic, this.subName); } break; case Shared: @@ -95,7 +95,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce if (dispatcher == null || dispatcher.getType() != SubType.Failover) { dispatcher = new NonPersistentDispatcherSingleActiveConsumer(SubType.Failover, partitionIndex, - topic); + topic, this.subName); } break; default: diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 8217b70a535fa..8599ce7e844ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -749,7 +749,7 @@ public NonPersistentTopicStats getStats() { if (producer.isRemote()) { remotePublishersStats.put(producer.getRemoteCluster(), publisherStats); } else { - stats.publishers.add(publisherStats); + stats.getPublishers().add(publisherStats); } }); @@ -760,7 +760,7 @@ public NonPersistentTopicStats getStats() { stats.msgRateOut += subStats.msgRateOut; stats.msgThroughputOut += subStats.msgThroughputOut; - stats.subscriptions.put(name, subStats); + stats.getSubscriptions().put(name, subStats); }); replicators.forEach((cluster, replicator) -> { @@ -778,7 +778,7 @@ public NonPersistentTopicStats getStats() { stats.msgRateOut += ReplicatorStats.msgRateOut; stats.msgThroughputOut += ReplicatorStats.msgThroughputOut; - stats.replication.put(replicator.getRemoteCluster(), ReplicatorStats); + stats.getReplication().put(replicator.getRemoteCluster(), ReplicatorStats); }); return stats; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 1558bf1056ff6..da63d13be7a03 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -1836,9 +1836,9 @@ public void nonPersistentTopics() throws Exception { publishMessagesOnPersistentTopic("non-persistent://prop-xyz/use/ns1/" + topicName, 10); NonPersistentTopicStats topicStats = admin.nonPersistentTopics().getStats(persistentTopicName); - assertEquals(topicStats.subscriptions.keySet(), Sets.newTreeSet(Lists.newArrayList("my-sub"))); - assertEquals(topicStats.subscriptions.get("my-sub").consumers.size(), 1); - assertEquals(topicStats.publishers.size(), 0); + assertEquals(topicStats.getSubscriptions().keySet(), Sets.newTreeSet(Lists.newArrayList("my-sub"))); + assertEquals(topicStats.getSubscriptions().get("my-sub").consumers.size(), 1); + assertEquals(topicStats.getPublishers().size(), 0); PersistentTopicInternalStats internalStats = admin.nonPersistentTopics().getInternalStats(persistentTopicName); assertEquals(internalStats.cursors.keySet(), Sets.newTreeSet(Lists.newArrayList("my-sub"))); @@ -1847,8 +1847,8 @@ public void nonPersistentTopics() throws Exception { client.close(); topicStats = admin.nonPersistentTopics().getStats(persistentTopicName); - assertTrue(topicStats.subscriptions.keySet().contains("my-sub")); - assertEquals(topicStats.publishers.size(), 0); + assertTrue(topicStats.getSubscriptions().keySet().contains("my-sub")); + assertEquals(topicStats.getPublishers().size(), 0); // test partitioned-topic final String partitionedTopicName = "non-persistent://prop-xyz/use/ns1/paritioned"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index 01a23664aaa10..724ee8a2c4ce8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -397,10 +397,10 @@ public void testTopicStats() throws Exception { rolloverPerIntervalStats(pulsar); stats = topicRef.getStats(); - subStats = stats.subscriptions.values().iterator().next(); + subStats = stats.getSubscriptions().values().iterator().next(); // subscription stats - assertEquals(stats.subscriptions.keySet().size(), 1); + assertEquals(stats.getSubscriptions().keySet().size(), 1); assertEquals(subStats.consumers.size(), 1); Producer producer = pulsarClient.createProducer(topicName); @@ -415,7 +415,7 @@ public void testTopicStats() throws Exception { rolloverPerIntervalStats(pulsar); stats = topicRef.getStats(); - subStats = stats.subscriptions.values().iterator().next(); + subStats = stats.getSubscriptions().values().iterator().next(); assertTrue(subStats.msgRateOut > 0); assertEquals(subStats.consumers.size(), 1); @@ -476,10 +476,10 @@ public void testReplicator() throws Exception { rolloverPerIntervalStats(replicationPulasr); stats = topicRef.getStats(); - subStats = stats.subscriptions.values().iterator().next(); + subStats = stats.getSubscriptions().values().iterator().next(); // subscription stats - assertEquals(stats.subscriptions.keySet().size(), 2); + assertEquals(stats.getSubscriptions().keySet().size(), 2); assertEquals(subStats.consumers.size(), 1); Thread.sleep(timeWaitToSync); @@ -547,7 +547,7 @@ public void testReplicator() throws Exception { rolloverPerIntervalStats(replicationPulasr); stats = topicRef.getStats(); - subStats = stats.subscriptions.values().iterator().next(); + subStats = stats.getSubscriptions().values().iterator().next(); assertTrue(subStats.msgRateOut > 0); assertEquals(subStats.consumers.size(), 1); @@ -762,9 +762,9 @@ public void testMsgDropStat() throws Exception { NonPersistentTopic topic = (NonPersistentTopic) pulsar.getBrokerService().getTopic(topicName).get(); pulsar.getBrokerService().updateRates(); NonPersistentTopicStats stats = topic.getStats(); - NonPersistentPublisherStats npStats = stats.publishers.get(0); - NonPersistentSubscriptionStats sub1Stats = stats.subscriptions.get("subscriber-1"); - NonPersistentSubscriptionStats sub2Stats = stats.subscriptions.get("subscriber-2"); + NonPersistentPublisherStats npStats = stats.getPublishers().get(0); + NonPersistentSubscriptionStats sub1Stats = stats.getSubscriptions().get("subscriber-1"); + NonPersistentSubscriptionStats sub2Stats = stats.getSubscriptions().get("subscriber-2"); assertTrue(npStats.msgDropRate > 0); assertTrue(sub1Stats.msgDropRate > 0); assertTrue(sub2Stats.msgDropRate > 0); diff --git a/pulsar-broker/src/test/resources/log4j.properties b/pulsar-broker/src/test/resources/log4j.properties index 8fbbe7e99ac0c..394e77f5f494f 100644 --- a/pulsar-broker/src/test/resources/log4j.properties +++ b/pulsar-broker/src/test/resources/log4j.properties @@ -23,12 +23,12 @@ # # Format is " (, )+ -log4j.rootLogger=OFF, CONSOLE +log4j.rootLogger=INFO, CONSOLE log4j.logger.org.apache.zookeeper=OFF log4j.logger.org.apache.bookkeeper=OFF -log4j.logger.org.apache.pulsar=OFF +log4j.logger.org.apache.pulsar=INFO log4j.logger.org.testng.listener.TestListener=INFO diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentTopicStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentTopicStats.java index dbed2351856a6..c154426efa96f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentTopicStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentTopicStats.java @@ -64,4 +64,16 @@ public NonPersistentTopicStats add(NonPersistentTopicStats stats) { this.msgDropRate += stats.msgDropRate; return this; } + + public List getPublishers() { + return this.publishers; + } + + public Map getSubscriptions() { + return this.subscriptions; + } + + public Map getReplication() { + return this.replication; + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java index 60bee0c564acb..ce3f67e3da03f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java @@ -46,7 +46,7 @@ public class PublisherStats { /** Client library version */ public String clientVersion; - + public PublisherStats add(PublisherStats stats) { checkNotNull(stats); this.msgRateIn += stats.msgRateIn; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ReplicatorStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ReplicatorStats.java index 4914596d6f34e..9428f061d2a0f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ReplicatorStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ReplicatorStats.java @@ -57,7 +57,7 @@ public class ReplicatorStats { /** Timestamp of outbound connection establishment time */ public String outboundConnectedSince; - + public ReplicatorStats add(ReplicatorStats stats) { checkNotNull(stats); this.msgRateIn += stats.msgRateIn; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java index a83c94a08ed30..4e34950e37a4d 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java @@ -55,7 +55,7 @@ public class SubscriptionStats { /** List of connected consumers on this subscription w/ their stats */ public List consumers; - + public SubscriptionStats() { this.consumers = Lists.newArrayList(); }