
PIP-2: Introduce non-persistent topics #538

Merged: 11 commits on Jul 28, 2017

Conversation

rdhabalia
Contributor

Motivation

Replacing #476 (due to rebase conflicts and the namespace-configuration implementation), and creating this PR on top of #452.

Modifications

The broker can be configured to start in one of these modes:

  • Load only persistent topics
  • Load only non-persistent topics
  • Load both kinds of topics

The load manager receives a topic-lookup request and assigns it to a broker that supports that topic type. A broker fails to load a topic if it doesn't support that type of topic.

Result

The broker can start in a mode where it serves only non-persistent topics.
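
For example, a broker could be limited to non-persistent topics with broker.conf settings along these lines (a hedged sketch; the key names follow the enablePersistentTopics field visible in the diff further down and may not match the final configuration exactly):

# Sketch of broker.conf for a broker that serves only non-persistent topics
# (key names assumed from the fields introduced in this PR)
enablePersistentTopics=false
enableNonPersistentTopics=true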

@rdhabalia rdhabalia added the type/enhancement label Jun 28, 2017
@rdhabalia rdhabalia added this to the 1.19 milestone Jun 28, 2017
@rdhabalia rdhabalia self-assigned this Jun 28, 2017
@saandrews
Contributor

This commit is trying to do more than just make the broker configurable to own non-persistent topics. Or is that because PR #452 is not merged yet?

@rdhabalia
Contributor Author

This commit is trying to do more than just make the broker configurable to own non-persistent topics. Or is that because PR #452 is not merged yet?

Yes, it has two commits, but you can review this PR's specific commits at db8e757c7

fail("topic loading should have failed");
} catch (Exception e) {
// Ok
}
Contributor

Shouldn't it attempt to create a persistent topic and ensure it is ok?

Contributor Author

Actually, in this test we start the broker in non-persistent mode and try to load a persistent topic, which fails because the load manager doesn't find any broker that supports persistent topics.

However, the producer retries the lookup and re-creation, so we put a timeout on producer creation, which times out and completes the test:
pulsarClient.createProducerAsync(topicName, producerConf).get(1, TimeUnit.SECONDS);
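
A minimal sketch of that pattern inside the test class (the topic name is hypothetical; this is not the exact test in the PR), assuming the broker under test has persistent topics disabled:

// Hedged sketch: producer creation on a persistent topic should time out when the
// broker is configured to load only non-persistent topics, because the lookup keeps retrying.
String topicName = "persistent://prop/cluster/ns/unsupported-topic"; // hypothetical topic
try {
    pulsarClient.createProducerAsync(topicName, producerConf).get(1, TimeUnit.SECONDS);
    fail("topic loading should have failed");
} catch (Exception e) {
    // expected: no broker owns persistent topics, so the producer never becomes ready
}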

}

}

Contributor

Do we have a case where both types are enabled and topic creation succeeds?

Contributor Author

We have two test cases: one with a non-persistent-enabled broker and one without it. In the other cases the broker has the default configuration and loads both types of topics.

Contributor

@merlimat merlimat left a comment

The change looks good to me. I'd like to see a bit more code sharing with the persistent version of the dispatcher/subscription/replicator.

@@ -42,6 +42,9 @@
private final String webServiceUrlTls;
private final String pulsarServiceUrl;
private final String pulsarServiceUrlTls;
private boolean enablePersistentTopics=true;
Contributor

Minor and probably a bit of nit-picking: this should refer to whether the feature is "enabled" or "disabled", rather than being imperative ("enable").

Should we rename to persistentTopicsEnabled, nonPersistentTopicsEnabled?

@@ -35,6 +35,9 @@
private final NamespaceName nsname;
private final Range<Long> keyRange;
private final NamespaceBundleFactory factory;
// TODO: remove this once we remove broker persistent/non-persistent mode configuration
Contributor

Can you create a GitHub issue for the TODO and mention it here?

@@ -236,5 +238,9 @@ public int compareTo(EntryCache other) {

}

public static Entry create(long ledgerId, long entryId, ByteBuf data) {
Contributor

Is this change still needed?

Contributor Author

Yes, for non-persistent topics we want to create an Entry with a specific ledgerId/entryId and payload so it can be passed to other entities (e.g. dispatcher, consumer), as those entities' APIs require an Entry wrapper to carry the payload/msgId information.
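
A hedged sketch of that intent (the placeholder ids and the dispatcher call are illustrative, not the PR's exact code):

// Hedged sketch: wrap the raw payload in an Entry via the factory added in this diff,
// using placeholder ids since non-persistent topics have no real ledger position,
// then hand it to the dispatcher, whose API expects Entry instances.
Entry entry = create(-1L /* ledgerId */, -1L /* entryId */, headersAndPayload);
dispatcher.sendMessages(Collections.singletonList(entry)); // illustrative dispatcher call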

@@ -103,6 +109,8 @@ protected String domain() {
return "topic";
} else if (uri.getPath().startsWith("persistent/")) {
return "persistent";
} else if (uri.getPath().startsWith("non-persistent/")) {
Contributor

Side note: should we remove queue and topic here?

return TOTAL_AVAILABLE_PERMITS_UPDATER.get(this) > 0;
}

private Consumer getNextConsumer() {
Contributor

Can this logic be shared with the persistent version of the dispatcher?
(Maybe using an abstract class?)

Contributor Author

Sure, I will revisit it to share things across both the persistent and non-persistent entities.

Contributor

Great, you can introduce some base class like AbstractDispatcherMultipleConsumers with the common logic.
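
A hedged sketch of that shape (field and method details are illustrative): the round-robin consumer selection lives in the shared base class, and the persistent and non-persistent dispatchers differ only in where entries come from.

// Hedged sketch of a shared base class; names and details are illustrative, not the final API.
public abstract class AbstractDispatcherMultipleConsumers implements Dispatcher {
    protected final List<Consumer> consumerList = new CopyOnWriteArrayList<>();
    private int roundRobinIndex = 0;

    // Common logic: pick the next consumer that still has permits, in round-robin order.
    protected Consumer getNextConsumer() {
        if (consumerList.isEmpty()) {
            return null;
        }
        for (int i = 0; i < consumerList.size(); i++) {
            int idx = (roundRobinIndex + i) % consumerList.size();
            Consumer consumer = consumerList.get(idx);
            if (consumer.getAvailablePermits() > 0) {
                roundRobinIndex = idx + 1;
                return consumer;
            }
        }
        return null;
    }
}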

ACTIVE_CONSUMER_UPDATER.set(this, null);
}

private void pickAndScheduleActiveConsumer() {
Contributor

Same here, most of these methods look very similar to the "persistent" version. Share the same code if possible.


}

public void sendMessage(Entry entry) {
Contributor

Is it possible to share any logic with the other replicator?

@merlimat merlimat changed the title Make broker configurable to own non-persistent topic Introduce non-persistent topics Jul 25, 2017
@merlimat merlimat changed the title Introduce non-persistent topics PIP-2: Introduce non-persistent topics Jul 25, 2017
@merlimat
Contributor

@rdhabalia One small issue I'm seeing on the CLI tool:

$ bin/pulsar-admin non-persistent stats non-persistent://prop/cluster/ns/my-topic
Need to provide a persistent topic name

Usage: pulsar-admin non-persistent [options] [command] [command options]
  Commands:
....

@merlimat
Contributor

One more exception in the broker:

2017-07-25 11:53:32,957 - ERROR - [pulsar-stats-updater-73-1:PulsarStats@121] - Failed to generate topic stats for topic non-persistent://prop/cluster/ns/my-topic: org.apache.pulsar.common.policies.data.PublisherStats cannot be cast to org.apache.pulsar.common.policies.data.NonPersistentPublisherStats
java.lang.ClassCastException: org.apache.pulsar.common.policies.data.PublisherStats cannot be cast to org.apache.pulsar.common.policies.data.NonPersistentPublisherStats
	at org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.lambda$updateRates$23(NonPersistentTopic.java:607)
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet$Section.forEach(ConcurrentOpenHashSet.java:405)
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet.forEach(ConcurrentOpenHashSet.java:135)
	at org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.updateRates(NonPersistentTopic.java:605)
	at org.apache.pulsar.broker.service.PulsarStats.lambda$null$1(PulsarStats.java:118)
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:386)
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:160)
	at org.apache.pulsar.broker.service.PulsarStats.lambda$null$2(PulsarStats.java:116)
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:386)
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:160)
	at org.apache.pulsar.broker.service.PulsarStats.lambda$updateStats$3(PulsarStats.java:108)
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:386)
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:160)
	at org.apache.pulsar.broker.service.PulsarStats.updateStats(PulsarStats.java:98)
	at org.apache.pulsar.broker.service.BrokerService.updateRates(BrokerService.java:752)
	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:30)
	at org.apache.bookkeeper.util.SafeRunnable.run(SafeRunnable.java:31)

@rdhabalia
Contributor Author

rdhabalia commented Jul 25, 2017

One small issue I'm seeing on the CLI tool:

yes, thanks for catching it. I will fix it.

exception
java.lang.ClassCastException: org.apache.pulsar.common.policies.data.PublisherStats cannot be cast to org.apache.pulsar.common.policies.data.NonPersistentPublisherStats

Yes, yesterday I added a commit to collect messageDrop stats for non-persistent topics and added an inherited stats class for them, which caused this. I will fix it as well.
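
A hedged sketch of the likely shape of that fix (only the class names from the stack trace are taken from the source; the field is illustrative):

// Hedged sketch: the non-persistent stats type extends the base stats type and adds msgDropRate;
// the stats-update path must create/populate the subclass instead of casting a plain PublisherStats.
public class NonPersistentPublisherStats extends PublisherStats {
    public double msgDropRate;
}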

@rdhabalia rdhabalia force-pushed the nonPersist_configure branch 4 times, most recently from 175db00 to f8a1192 Compare July 26, 2017 07:27
@merlimat
Contributor

@rdhabalia One test failure seems genuine:

Tests run: 521, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 996.983 sec <<< FAILURE! - in TestSuite
testClosingReplicationProducerTwice(org.apache.pulsar.broker.service.PersistentTopicTest)  Time elapsed: 0.025 sec  <<< FAILURE!
java.lang.NoSuchFieldException: producerConfiguration
	at org.apache.pulsar.broker.service.PersistentTopicTest.testClosingReplicationProducerTwice(PersistentTopicTest.java:954)

@rdhabalia
Contributor Author

testClosingReplicationProducerTwice(org.apache.pulsar.broker.service.PersistentTopicTest) Time elapsed: 0.025 sec <<< FAILURE!
java.lang.NoSuchFieldException: producerConfiguration

Actually, I had fixed it as part of the last commit; it was failing because it was trying to get, via reflection, a field variable that had been moved to the abstract parent class. The build is also green.

I have addressed all your comments. However, I still have to add stats documentation for non-persistent topics. Also, I added the msgDrop stat for publisher and subscription, and it also shows up for persistent topics. Therefore, I was thinking of extending the non-persistent stats from the existing stats so that msgDrop doesn't show up for persistent topics.

@rdhabalia rdhabalia force-pushed the nonPersist_configure branch 2 times, most recently from e35dba9 to 238f37c Compare July 27, 2017 01:59
@merlimat
Contributor

@rdhabalia I'm running a 1 prod - 1 cons test with non-persistent. Everything local.

At a rate of 10K writes/s, with no batching, I'm seeing the consumer getting 1 msg/s dropped. Ideally that should not happen, since the system is not really overloaded.

@merlimat
Contributor

$ bin/pulsar-admin non-persistent stats non-persistent://prop/cluster/ns/my-topic
{
  "msgRateIn" : 10000.319421984088,
  "msgThroughputIn" : 1.0550336990193212E7,
  "msgRateOut" : 9998.314769761299,
  "msgThroughputOut" : 1.054822208209817E7,
  "averageMsgSize" : 1055.0,
  "storageSize" : 0,
  "publishers" : [ {
    "msgRateIn" : 10000.319421984088,
    "msgThroughputIn" : 1.0550336990193212E7,
    "averageMsgSize" : 1055.0,
    "producerId" : 0,
    "producerName" : "standalone-1-3",
    "address" : "/127.0.0.1:51246",
    "connectedSince" : "2017-07-27 11:10:49.446-0700",
    "clientVersion" : "1.19-incubating-SNAPSHOT",
    "msgDropRate" : 0.0
  } ],
  "subscriptions" : {
    "sub" : {
      "msgRateOut" : 9998.314769761299,
      "msgThroughputOut" : 1.054822208209817E7,
      "msgRateRedeliver" : 0.0,
      "msgBacklog" : 0,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "unackedMessages" : 0,
      "type" : "Exclusive",
      "msgRateExpired" : 0.0,
      "consumers" : [ {
        "msgRateOut" : 9998.314769761299,
        "msgThroughputOut" : 1.054822208209817E7,
        "msgRateRedeliver" : 0.0,
        "consumerName" : "0ce8d",
        "availablePermits" : 791,
        "unackedMessages" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "address" : "/127.0.0.1:51080",
        "connectedSince" : "2017-07-27 11:07:36.213-0700",
        "clientVersion" : "1.19-incubating-SNAPSHOT"
      } ],
      "msgDropRate" : 1.9501263804427469
    }
  },
  "replication" : { },
  "msgDropRate" : 0.0
}

@rdhabalia
Contributor Author

At the rate of 10K write/s, with no batching

I think it's happening because the permits are not available?

We drop a message for a subscription in two cases:

  • not enough permits
  • the socket is not writable

In this case, it seems the consumer doesn't have enough permits? Let me run the perf test again with debug logging to confirm the behavior.
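
A hedged sketch of those two checks as they would sit in the non-persistent dispatcher (names are illustrative):

// Hedged sketch: drop (and count) the message instead of delivering it when the selected
// consumer has no permits or its connection is not writable.
Consumer consumer = getNextConsumer();
if (consumer == null || consumer.getAvailablePermits() <= 0 || !consumer.isWritable()) {
    msgDrop.recordEvent();   // feeds the msgDropRate stat shown in the output above
    entry.release();         // assumed cleanup of the pooled buffer
} else {
    consumer.sendMessages(Collections.singletonList(entry)); // illustrative delivery call
}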

@merlimat
Contributor

I feel it's happening due to not available permits?

Not sure; I increased the receiver queue size to 50K and it still happens

@merlimat
Contributor

       "consumerName" : "28513",
        "availablePermits" : 417242,
        "unackedMessages" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "address" : "/127.0.0.1:52409",
        "connectedSince" : "2017-07-27 11:33:51.270-0700",
        "clientVersion" : "1.19-incubating-SNAPSHOT"
      } ],
      "msgDropRate" : 3.1666701082037405

@merlimat
Contributor

This was even with 500K permits

Contributor

@merlimat merlimat left a comment

The change looks overall good to me. Just the concern about dropped messages in seemingly normal conditions.

Also, I would say to mark this as "Experimental" in the documentation, to set the right expectation, and note that implementation details might still change in future releases.

@@ -775,15 +775,20 @@ public void skipAllMessages(@PathParam("property") String property, @PathParam("
}
} else {
validateAdminOperationOnDestination(dn, authoritative);
PersistentTopic topic = getTopicReference(dn);
if (!(getTopicReference(dn) instanceof PersistentTopic)) {
Contributor

Why do we need to check if it's a persistent topic? Shouldn't this already be guaranteed by having this handler on /admin/persistent/... ?

@@ -56,6 +58,8 @@
private final long producerId;
private final String appId;
private Rate msgIn;
// it records msg-drop rate only for non-persistent topic
private Rate msgDrop;
Contributor

Can this be final? And maybe we can set it to null if it's a persistent topic.
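
A hedged sketch of that suggestion (the constructor shape and the topic check are illustrative):

// Hedged sketch: make the field final and allocate it only for non-persistent topics.
private final Rate msgDrop;

public Producer(Topic topic /* other arguments elided */) {
    // isNonPersistentTopic(...) is an assumed helper; the real check may differ.
    this.msgDrop = isNonPersistentTopic(topic) ? new Rate() : null;
}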

@@ -100,6 +102,8 @@ public ServerCnx(BrokerService service) {
this.producers = new ConcurrentLongHashMap<>(8, 1);
this.consumers = new ConcurrentLongHashMap<>(8, 1);
this.replicatorPrefix = service.pulsar().getConfiguration().getReplicatorPrefix();
this.nonPersistentMessageSemaphore = new Semaphore(
Contributor

All the events from the same connection come from the same thread. Even when we write, we typically switch to the event-loop thread for that connection.

Given that, the semaphore could just be a regular (not even volatile) variable that we check and increment. No need for thread safety.

Contributor Author

So you mean that instead of a semaphore we can have just a counter like pendingSendRequest? But then we need to decrement this counter in completedSendOperation(), and that is called from a different thread.
So it might not be thread-safe and might not throttle at exactly the configured number, right?

Contributor

But completedSendOperation() is already called within the IO thread, so an additional unlocked variable should be fine.

Contributor Author

Actually, for non-persistent topics it is called from the ordered-executor thread. So the counter would be incremented by the IO thread and decremented by the ordered thread. I'm not sure it would be thread-safe in terms of throttling at the correct count.
So, do you still think a single unlocked counter variable would be fine here, or should it be atomic?

if (producer.isNonPersistentTopic() && !nonPersistentMessageSemaphore.tryAcquire()) {
final long producerId = send.getProducerId();
final long sequenceId = send.getSequenceId();
service.getTopicOrderedExecutor().submitOrdered(producer.getTopic(), SafeRun.safeRun(() -> {
Contributor

Why schedule it on the ordered thread? We are in the IO thread; we should write it from this thread already.

Contributor Author

Actually, we have to write it on the ordered thread only, otherwise send-acks go out in random order; because of that, the client will not receive sequence ids in order, and it will fail the pending messages and close the connection.

  • If the message isn't throttled here, the broker processes it by delivering it to subscribers on the ordered thread, and after completion that ordered thread sends the ack back to the client.
  • So all send-ack operations have to be on the ordered thread only.

Contributor

@merlimat merlimat Jul 27, 2017

Uhm, but why wait for the message to be dispatched to consumers before sending the ack to the producer?

I think the best option here is to send the ack to the producer immediately. We got the message into broker memory; that's the level of guarantee.

Later, if the consumers are not ready, the message will be dropped.

if (--pendingSendRequest == ResumeReadsThreshold) {
// Resume reading from socket
ctx.channel().config().setAutoRead(true);
}
if (isNonPersistentTopic) {
nonPersistentMessageSemaphore.release();
Contributor

As I was mentioning above, the pendingSendRequest is already a counter of pending requests from the producer and it's not synchronized or a semaphore.
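
A hedged sketch of that plain-counter approach (the field name and limit are illustrative), assuming, as suggested above, that the send completion runs back on the connection's event loop:

// Hedged sketch: throttle non-persistent publishes with a plain int on the connection,
// mirroring pendingSendRequest. It is incremented when a send arrives on the IO thread and
// decremented when the send completes, so no semaphore or atomic is needed under that assumption.
private int nonPersistentPendingMessages = 0;                     // illustrative field
private static final int MaxNonPersistentPendingMessages = 1000;  // illustrative limit

void handleNonPersistentSend(ByteBuf headersAndPayload) {
    if (nonPersistentPendingMessages >= MaxNonPersistentPendingMessages) {
        // over the limit: drop or reject instead of blocking the IO thread
        return;
    }
    nonPersistentPendingMessages++;
    // ... publish to the topic; the completion path below runs on this same event loop
}

void onNonPersistentSendCompleted() {
    nonPersistentPendingMessages--;
}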

@@ -46,7 +46,7 @@

/** Client library version */
public String clientVersion;

Contributor

Whitespace addition

@@ -57,7 +57,7 @@

/** Timestamp of outbound connection establishment time */
public String outboundConnectedSince;

Contributor

Whitespace addition

@@ -55,8 +55,8 @@

/** List of connected consumers on this subscription w/ their stats */
public List<ConsumerStats> consumers;

public PersistentSubscriptionStats() {

Contributor

Whitespace addition

@merlimat
Contributor

@rdhabalia One other note: if I enable batching, the drop rate falls to 0.016/s, though I suspect it's only counting the number of dropped batches.

@rdhabalia
Contributor Author

At the rate of 10K write/s, with no batching

After adding more debug logs where we record the message drop, I found out that all the message drops happened because the connection was not writable.
Before sending a message we check permits and currentConsumer.isWritable(), and 3 out of 20K messages were dropped because the connection was not writable: they fail the currentConsumer.isWritable() condition (the connection's IO thread is not ready to process the request).

@rdhabalia
Contributor Author

If I enable batching the drop-rate falls to 0.016 /s though I suspect it's only counting the number of dropped batches

Yes, we record the msg-drop at the dispatcher when the consumer is not available or the connection is not writable, so we don't deserialize the message metadata to get the actual count.
But it's a good point: we can deserialize the entry to get the actual count from the batch message. I will make that change.
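
A hedged sketch of counting the individual messages in a dropped batch (the metadata-peeking helper and the Rate call are assumed; the exact APIs may differ):

// Hedged sketch: when dropping an entry, parse the message metadata so a dropped batch
// counts as N messages rather than one.
MessageMetadata metadata = peekMessageMetadata(entry.getDataBuffer()); // assumed helper
int droppedMessages = Math.max(metadata.getNumMessagesInBatch(), 1);
msgDrop.recordMultipleEvents(droppedMessages, 0 /* bytes not tracked */); // assumed Rate API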

@merlimat
Contributor

@rdhabalia One other thing: what happens when a subscription has no more consumers available? Right now the subscription stays around. Is that the intended behavior? I feel it doesn't really need to be kept, since messages will be dropped anyway.

Contributor

@merlimat merlimat left a comment

👍 Nice work!
