Skip to content

Commit

Permalink
formating + batch-number counting for msg-drop + ack-publish on same …
Browse files Browse the repository at this point in the history
…thread
  • Loading branch information
rdhabalia committed Jul 27, 2017
1 parent 669e7a9 commit 50fdd31
Show file tree
Hide file tree
Showing 16 changed files with 69 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,7 @@ public NonPersistentTopicStats getStats(@PathParam("property") String property,
DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
validateAdminOperationOnDestination(dn, authoritative);
Topic topic = getTopicReference(dn);
if(topic instanceof NonPersistentTopic) {
return ((NonPersistentTopic)topic).getStats();
}else {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Can not get stats for persistent topic");
}
return ((NonPersistentTopic)topic).getStats();
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -775,11 +775,6 @@ public void skipAllMessages(@PathParam("property") String property, @PathParam("
}
} else {
validateAdminOperationOnDestination(dn, authoritative);
if (!(getTopicReference(dn) instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), dn, subName);
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Skip messages on a non-persistent topic is not allowed");
}
PersistentTopic topic = (PersistentTopic) getTopicReference(dn);
try {
if (subName.startsWith(topic.replicatorPrefix)) {
Expand Down Expand Up @@ -820,11 +815,6 @@ public void skipMessages(@PathParam("property") String property, @PathParam("clu
throw new RestException(Status.METHOD_NOT_ALLOWED, "Skip messages on a partitioned topic is not allowed");
}
validateAdminOperationOnDestination(dn, authoritative);
if (!(getTopicReference(dn) instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), dn, subName);
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Skip messages on a non-persistent topic is not allowed");
}
PersistentTopic topic = (PersistentTopic) getTopicReference(dn);
try {
if (subName.startsWith(topic.replicatorPrefix)) {
Expand Down Expand Up @@ -887,11 +877,6 @@ public void expireMessagesForAllSubscriptions(@PathParam("property") String prop
} else {
// validate ownership and redirect if current broker is not owner
validateAdminOperationOnDestination(dn, authoritative);
if (!(getTopicReference(dn) instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {}", clientAppId(), dn);
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Expire messages on a non-persistent topic is not allowed");
}
PersistentTopic topic = (PersistentTopic) getTopicReference(dn);
topic.getReplicators().forEach((subName, replicator) -> {
expireMessages(property, cluster, namespace, destination, subName, expireTimeInSeconds, authoritative);
Expand Down Expand Up @@ -951,11 +936,6 @@ public void resetCursor(@PathParam("property") String property, @PathParam("clus
validateAdminOperationOnDestination(dn, authoritative);
log.info("[{}][{}] received reset cursor on subscription {} to time {}", clientAppId(), destination,
subName, timestamp);
if (!(getTopicReference(dn) instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), dn, subName);
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Reset cursor on a non-persistent topic is not allowed");
}
PersistentTopic topic = (PersistentTopic) getTopicReference(dn);
try {
PersistentSubscription sub = topic.getSubscription(subName);
Expand Down Expand Up @@ -1136,13 +1116,7 @@ public MessageId terminate(@PathParam("property") String property, @PathParam("c
validateAdminOperationOnDestination(dn, authoritative);
Topic topic = getTopicReference(dn);
try {
if (topic instanceof PersistentTopic) {
return ((PersistentTopic) topic).terminate().get();
} else {
log.error("[{}] Not supported operation of non-persistent topic {}", clientAppId(), dn);
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Expire messages on a non-persistent topic is not allowed");
}
return ((PersistentTopic) topic).terminate().get();
} catch (Exception exception) {
log.error("[{}] Failed to terminated topic {}", clientAppId(), dn, exception);
throw new RestException(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ private void incrementUnackedMessages(int ackedMessages) {
}
}

int getBatchSizeforEntry(ByteBuf metadataAndPayload) {
public static int getBatchSizeforEntry(ByteBuf metadataAndPayload, String subscription, long consumerId) {
try {
// save the reader index and restore after parsing
metadataAndPayload.markReaderIndex();
Expand All @@ -244,7 +244,7 @@ int updatePermitsAndPendingAcks(final List<Entry> entries) throws PulsarServerEx
while (iter.hasNext()) {
Entry entry = iter.next();
ByteBuf metadataAndPayload = entry.getDataBuffer();
int batchSize = getBatchSizeforEntry(metadataAndPayload);
int batchSize = getBatchSizeforEntry(metadataAndPayload, subscription.toString(), consumerId);
if (batchSize == -1) {
// this would suggest that the message might have been corrupted
iter.remove();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class Producer {
private final String appId;
private Rate msgIn;
// it records msg-drop rate only for non-persistent topic
private Rate msgDrop;
private final Rate msgDrop;

private volatile long pendingPublishAcks = 0;
private static final AtomicLongFieldUpdater<Producer> pendingPublishAcksUpdater = AtomicLongFieldUpdater
Expand All @@ -81,9 +81,8 @@ public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName
this.closeFuture = new CompletableFuture<>();
this.appId = appId;
this.msgIn = new Rate();
this.msgDrop = new Rate();
this.isNonPersistentTopic = topic instanceof NonPersistentTopic;

this.msgDrop = this.isNonPersistentTopic ? new Rate() : null;
this.stats = isNonPersistentTopic ? new NonPersistentPublisherStats() : new PublisherStats();
stats.address = cnx.clientAddress().toString();
stats.connectedSince = DATE_FORMAT.format(Instant.now());
Expand Down Expand Up @@ -181,8 +180,10 @@ private void publishOperationCompleted() {
}
}

public void recordMessageDrop() {
msgDrop.recordEvent();
public void recordMessageDrop(int batchSize) {
if (this.isNonPersistentTopic) {
msgDrop.recordEvent(batchSize);
}
}

private static final class MessagePublishedCallback implements PublishCallback, Runnable {
Expand Down Expand Up @@ -347,11 +348,11 @@ public CompletableFuture<Void> disconnect() {

public void updateRates() {
msgIn.calculateRate();
msgDrop.calculateRate();
stats.msgRateIn = msgIn.getRate();
stats.msgThroughputIn = msgIn.getValueRate();
stats.averageMsgSize = msgIn.getAverageValue();
if (stats instanceof NonPersistentPublisherStats) {
if (this.isNonPersistentTopic) {
msgDrop.calculateRate();
((NonPersistentPublisherStats) stats).msgDropRate = msgDrop.getRate();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
service.getTopicOrderedExecutor().submitOrdered(producer.getTopic(), SafeRun.safeRun(() -> {
ctx.writeAndFlush(Commands.newSendReceipt(producerId, sequenceId, -1, -1), ctx.voidPromise());
}));
producer.recordMessageDrop();
producer.recordMessageDrop(send.getNumMessages());
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import static org.apache.pulsar.broker.service.Consumer.getBatchSizeforEntry;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.utils.CopyOnWriteArrayList;
import org.slf4j.Logger;
Expand Down Expand Up @@ -146,8 +147,13 @@ public void sendMessages(List<Entry> entries) {
if (consumer != null) {
TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -consumer.sendMessages(entries).getRight());
} else {
msgDrop.recordEvent(entries.size());
entries.forEach(Entry::release);
entries.forEach(entry -> {
int totalMsgs = getBatchSizeforEntry(entry.getDataBuffer(), name, -1);
if (totalMsgs > 0) {
msgDrop.recordEvent();
}
entry.release();
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.service.nonpersistent;

import static org.apache.pulsar.broker.service.Consumer.getBatchSizeforEntry;

import java.util.List;

import org.apache.bookkeeper.mledger.Entry;
Expand All @@ -31,10 +33,12 @@
public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer implements NonPersistentDispatcher {

private final Rate msgDrop;
private final String name;

public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
NonPersistentTopic topic) {
NonPersistentTopic topic, String subName) {
super(subscriptionType, partitionIndex, topic.getName());
this.name = topic.getName() + " / " + subName;
this.msgDrop = new Rate();
}

Expand All @@ -44,8 +48,13 @@ public void sendMessages(List<Entry> entries) {
if (currentConsumer != null && currentConsumer.getAvailablePermits() > 0 && currentConsumer.isWritable()) {
currentConsumer.sendMessages(entries);
} else {
msgDrop.recordEvent(entries.size());
entries.forEach(Entry::release);
entries.forEach(entry -> {
int totalMsgs = getBatchSizeforEntry(entry.getDataBuffer(), name, -1);
if (totalMsgs > 0) {
msgDrop.recordEvent();
}
entry.release();
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
switch (consumer.subType()) {
case Exclusive:
if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) {
dispatcher = new NonPersistentDispatcherSingleActiveConsumer(SubType.Exclusive, 0, topic);
dispatcher = new NonPersistentDispatcherSingleActiveConsumer(SubType.Exclusive, 0, topic, this.subName);
}
break;
case Shared:
Expand All @@ -95,7 +95,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce

if (dispatcher == null || dispatcher.getType() != SubType.Failover) {
dispatcher = new NonPersistentDispatcherSingleActiveConsumer(SubType.Failover, partitionIndex,
topic);
topic, this.subName);
}
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ public NonPersistentTopicStats getStats() {
if (producer.isRemote()) {
remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
} else {
stats.publishers.add(publisherStats);
stats.getPublishers().add(publisherStats);
}
});

Expand All @@ -760,7 +760,7 @@ public NonPersistentTopicStats getStats() {

stats.msgRateOut += subStats.msgRateOut;
stats.msgThroughputOut += subStats.msgThroughputOut;
stats.subscriptions.put(name, subStats);
stats.getSubscriptions().put(name, subStats);
});

replicators.forEach((cluster, replicator) -> {
Expand All @@ -778,7 +778,7 @@ public NonPersistentTopicStats getStats() {
stats.msgRateOut += ReplicatorStats.msgRateOut;
stats.msgThroughputOut += ReplicatorStats.msgThroughputOut;

stats.replication.put(replicator.getRemoteCluster(), ReplicatorStats);
stats.getReplication().put(replicator.getRemoteCluster(), ReplicatorStats);
});

return stats;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1836,9 +1836,9 @@ public void nonPersistentTopics() throws Exception {
publishMessagesOnPersistentTopic("non-persistent://prop-xyz/use/ns1/" + topicName, 10);

NonPersistentTopicStats topicStats = admin.nonPersistentTopics().getStats(persistentTopicName);
assertEquals(topicStats.subscriptions.keySet(), Sets.newTreeSet(Lists.newArrayList("my-sub")));
assertEquals(topicStats.subscriptions.get("my-sub").consumers.size(), 1);
assertEquals(topicStats.publishers.size(), 0);
assertEquals(topicStats.getSubscriptions().keySet(), Sets.newTreeSet(Lists.newArrayList("my-sub")));
assertEquals(topicStats.getSubscriptions().get("my-sub").consumers.size(), 1);
assertEquals(topicStats.getPublishers().size(), 0);

PersistentTopicInternalStats internalStats = admin.nonPersistentTopics().getInternalStats(persistentTopicName);
assertEquals(internalStats.cursors.keySet(), Sets.newTreeSet(Lists.newArrayList("my-sub")));
Expand All @@ -1847,8 +1847,8 @@ public void nonPersistentTopics() throws Exception {
client.close();

topicStats = admin.nonPersistentTopics().getStats(persistentTopicName);
assertTrue(topicStats.subscriptions.keySet().contains("my-sub"));
assertEquals(topicStats.publishers.size(), 0);
assertTrue(topicStats.getSubscriptions().keySet().contains("my-sub"));
assertEquals(topicStats.getPublishers().size(), 0);

// test partitioned-topic
final String partitionedTopicName = "non-persistent://prop-xyz/use/ns1/paritioned";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,10 +397,10 @@ public void testTopicStats() throws Exception {

rolloverPerIntervalStats(pulsar);
stats = topicRef.getStats();
subStats = stats.subscriptions.values().iterator().next();
subStats = stats.getSubscriptions().values().iterator().next();

// subscription stats
assertEquals(stats.subscriptions.keySet().size(), 1);
assertEquals(stats.getSubscriptions().keySet().size(), 1);
assertEquals(subStats.consumers.size(), 1);

Producer producer = pulsarClient.createProducer(topicName);
Expand All @@ -415,7 +415,7 @@ public void testTopicStats() throws Exception {

rolloverPerIntervalStats(pulsar);
stats = topicRef.getStats();
subStats = stats.subscriptions.values().iterator().next();
subStats = stats.getSubscriptions().values().iterator().next();

assertTrue(subStats.msgRateOut > 0);
assertEquals(subStats.consumers.size(), 1);
Expand Down Expand Up @@ -476,10 +476,10 @@ public void testReplicator() throws Exception {

rolloverPerIntervalStats(replicationPulasr);
stats = topicRef.getStats();
subStats = stats.subscriptions.values().iterator().next();
subStats = stats.getSubscriptions().values().iterator().next();

// subscription stats
assertEquals(stats.subscriptions.keySet().size(), 2);
assertEquals(stats.getSubscriptions().keySet().size(), 2);
assertEquals(subStats.consumers.size(), 1);

Thread.sleep(timeWaitToSync);
Expand Down Expand Up @@ -547,7 +547,7 @@ public void testReplicator() throws Exception {

rolloverPerIntervalStats(replicationPulasr);
stats = topicRef.getStats();
subStats = stats.subscriptions.values().iterator().next();
subStats = stats.getSubscriptions().values().iterator().next();

assertTrue(subStats.msgRateOut > 0);
assertEquals(subStats.consumers.size(), 1);
Expand Down Expand Up @@ -762,9 +762,9 @@ public void testMsgDropStat() throws Exception {
NonPersistentTopic topic = (NonPersistentTopic) pulsar.getBrokerService().getTopic(topicName).get();
pulsar.getBrokerService().updateRates();
NonPersistentTopicStats stats = topic.getStats();
NonPersistentPublisherStats npStats = stats.publishers.get(0);
NonPersistentSubscriptionStats sub1Stats = stats.subscriptions.get("subscriber-1");
NonPersistentSubscriptionStats sub2Stats = stats.subscriptions.get("subscriber-2");
NonPersistentPublisherStats npStats = stats.getPublishers().get(0);
NonPersistentSubscriptionStats sub1Stats = stats.getSubscriptions().get("subscriber-1");
NonPersistentSubscriptionStats sub2Stats = stats.getSubscriptions().get("subscriber-2");
assertTrue(npStats.msgDropRate > 0);
assertTrue(sub1Stats.msgDropRate > 0);
assertTrue(sub2Stats.msgDropRate > 0);
Expand Down
4 changes: 2 additions & 2 deletions pulsar-broker/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
#
# Format is "<default threshold> (, <appender>)+

log4j.rootLogger=OFF, CONSOLE
log4j.rootLogger=INFO, CONSOLE

log4j.logger.org.apache.zookeeper=OFF
log4j.logger.org.apache.bookkeeper=OFF

log4j.logger.org.apache.pulsar=OFF
log4j.logger.org.apache.pulsar=INFO

log4j.logger.org.testng.listener.TestListener=INFO

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,16 @@ public NonPersistentTopicStats add(NonPersistentTopicStats stats) {
this.msgDropRate += stats.msgDropRate;
return this;
}

public List<NonPersistentPublisherStats> getPublishers() {
return this.publishers;
}

public Map<String, NonPersistentSubscriptionStats> getSubscriptions() {
return this.subscriptions;
}

public Map<String, NonPersistentReplicatorStats> getReplication() {
return this.replication;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class PublisherStats {

/** Client library version */
public String clientVersion;

public PublisherStats add(PublisherStats stats) {
checkNotNull(stats);
this.msgRateIn += stats.msgRateIn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class ReplicatorStats {

/** Timestamp of outbound connection establishment time */
public String outboundConnectedSince;

public ReplicatorStats add(ReplicatorStats stats) {
checkNotNull(stats);
this.msgRateIn += stats.msgRateIn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class SubscriptionStats {

/** List of connected consumers on this subscription w/ their stats */
public List<ConsumerStats> consumers;

public SubscriptionStats() {
this.consumers = Lists.newArrayList();
}
Expand Down

0 comments on commit 50fdd31

Please sign in to comment.