From 6ceb9e6f7489d74ace53915d461e50321edac5f4 Mon Sep 17 00:00:00 2001 From: lenovo Date: Mon, 2 Mar 2020 21:56:24 +0800 Subject: [PATCH 1/3] Add configuration to disable auto creation of subscriptions. --- conf/broker.conf | 3 +++ conf/standalone.conf | 3 +++ .../pulsar/broker/ServiceConfiguration.java | 5 +++++ .../broker/service/BrokerServiceException.java | 6 ++++++ .../pulsar/broker/service/ServerCnx.java | 13 +++++++++++-- .../BrokerServiceAutoTopicCreationTest.java | 18 ++++++++++++++++++ .../pulsar/functions/worker/WorkerService.java | 5 +++++ site2/docs/reference-configuration.md | 1 + 8 files changed, 52 insertions(+), 2 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index bfb7f7ceee12d..59898800773c9 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -91,6 +91,9 @@ allowAutoTopicCreation=true # The type of topic that is allowed to be automatically created.(partitioned/non-partitioned) allowAutoTopicCreationType=non-partitioned +# Enable subscription auto creation if new consumer connected (disable auto creation with value false) +allowAutoSubscriptionCreation=true + # The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned. defaultNumPartitions=1 diff --git a/conf/standalone.conf b/conf/standalone.conf index 7dff0c3c913ab..f351d8d2bff5f 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -632,6 +632,9 @@ allowAutoTopicCreation=true # The type of topic that is allowed to be automatically created.(partitioned/non-partitioned) allowAutoTopicCreationType=non-partitioned +# Enable subscription auto creation if new consumer connected (disable auto creation with value false) +allowAutoSubscriptionCreation=true + # The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned. defaultNumPartitions=1 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 86da1027f34d6..0d554f0c5dc74 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -998,6 +998,11 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)" ) private String allowAutoTopicCreationType = "non-partitioned"; + @FieldContext( + category = CATEGORY_STORAGE_ML, + doc = "Allow automated creation of subscriptions if set to true (default value)." + ) + private boolean allowAutoSubscriptionCreation = true; @FieldContext( category = CATEGORY_STORAGE_ML, doc = "The number of partitioned topics that is allowed to be automatically created" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java index e794718c5fb1b..b4bfed518ad3c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java @@ -123,6 +123,12 @@ public TopicNotFoundException(String msg) { } } + public static class SubscriptionNotFoundException extends BrokerServiceException { + public SubscriptionNotFoundException(String msg) { + super(msg); + } + } + public static class SubscriptionBusyException extends BrokerServiceException { public SubscriptionBusyException(String msg) { super(msg); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index bef63fc7f5b0a..436e05bbf9e20 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -58,6 +58,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; +import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; @@ -720,7 +721,6 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(), subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex()) : null; - final String subscription = subscribe.getSubscription(); final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0; final boolean readCompacted = subscribe.getReadCompacted(); final Map metadata = CommandUtils.metadataFromCommand(subscribe); @@ -746,7 +746,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { if (service.isAuthorizationEnabled()) { authorizationFuture = service.getAuthorizationService().canConsumeAsync(topicName, originalPrincipal != null ? originalPrincipal : authRole, authenticationData, - subscription); + subscriptionName); } else { authorizationFuture = CompletableFuture.completedFuture(true); } @@ -809,6 +809,15 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { Topic topic = optTopic.get(); + boolean rejectSubscriptionIfDoesNotExist = isDurable + && !service.pulsar().getConfig().isAllowAutoSubscriptionCreation() + && !topic.getSubscriptions().containsKey(subscriptionName); + + if (rejectSubscriptionIfDoesNotExist) { + return FutureUtil + .failedFuture(new SubscriptionNotFoundException("Subscription does not exist")); + } + if (schema != null) { return topic.addSchemaIfIdleOrCheckCompatible(schema) .thenCompose(v -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java index 929f9c24133fc..9ad764451cb37 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java @@ -104,6 +104,24 @@ public void testAutoTopicCreationDisableIfNonPartitionedTopicAlreadyExist() thro assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicName)); } + @Test + public void testAutoSubscriptionCreationDisable() throws Exception{ + pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false); + + final String topicName = "persistent://prop/ns-abc/test-topic"; + final String subscriptionName = "test-topic-sub"; + + admin.topics().createNonPartitionedTopic(topicName); + + try { + pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe(); + fail("Subscribe operation should have failed"); + } catch (Exception e) { + assertTrue(e instanceof PulsarClientException); + } + assertFalse(admin.topics().getSubscriptions(topicName).contains(subscriptionName)); + } + /** * CheckAllowAutoCreation's default value is false. * So using getPartitionedTopicMetadata() directly will not produce partitioned topic diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index fdfb7dec9b626..b23c707cc5c3a 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -34,6 +34,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -162,6 +163,10 @@ public void start(URI dlogUri, this.connectorsManager = new ConnectorsManager(workerConfig); //create membership manager + String coordinationTopic = workerConfig.getClusterCoordinationTopic(); + if (!brokerAdmin.topics().getSubscriptions(coordinationTopic).contains(MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION)) { + brokerAdmin.topics().createSubscription(coordinationTopic, MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION, MessageId.earliest); + } this.membershipManager = new MembershipManager(this, this.client, this.brokerAdmin); // create function runtime manager diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md index cdfe0d419a70d..edc3f996451b3 100644 --- a/site2/docs/reference-configuration.md +++ b/site2/docs/reference-configuration.md @@ -131,6 +131,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di |backlogQuotaDefaultLimitGB| The default per-topic backlog quota limit | -1 | |allowAutoTopicCreation| Enable topic auto creation if a new producer or consumer connected |true| |allowAutoTopicCreationType| The topic type (partitioned or non-partitioned) that is allowed to be automatically created. |Partitioned| +|allowAutoSubscriptionCreation| Enable subscription auto creation if a new consumer connected |true| |defaultNumPartitions| The number of partitioned topics that is allowed to be automatically created if `allowAutoTopicCreationType` is partitioned |1| |brokerDeleteInactiveTopicsEnabled| Enable the deletion of inactive topics |true| |brokerDeleteInactiveTopicsFrequencySeconds| How often to check for inactive topics |60| From 12630cab0fa6bd8a1c2b1f13c8d42490da77be12 Mon Sep 17 00:00:00 2001 From: lenovo Date: Tue, 3 Mar 2020 11:23:28 +0800 Subject: [PATCH 2/3] Add a unit test to cover subscription creation --- .../BrokerServiceAutoTopicCreationTest.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java index 9ad764451cb37..e5b7e0c8bdfcf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -122,6 +123,24 @@ public void testAutoSubscriptionCreationDisable() throws Exception{ assertFalse(admin.topics().getSubscriptions(topicName).contains(subscriptionName)); } + @Test + public void testSubscriptionCreationWithAutoCreationDisable() throws Exception{ + pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false); + + final String topicName = "persistent://prop/ns-abc/test-topic"; + final String subscriptionName = "test-topic-sub"; + + admin.topics().createNonPartitionedTopic(topicName); + assertFalse(admin.topics().getSubscriptions(topicName).contains(subscriptionName)); + + // Create the subscription by PulsarAdmin + admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest); + assertTrue(admin.topics().getSubscriptions(topicName).contains(subscriptionName)); + + // Subscribe operation should be successful + pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe(); + } + /** * CheckAllowAutoCreation's default value is false. * So using getPartitionedTopicMetadata() directly will not produce partitioned topic From 5bc343d8ffd20a8a8627591e18d4036ac30c7383 Mon Sep 17 00:00:00 2001 From: lenovo Date: Tue, 3 Mar 2020 14:21:20 +0800 Subject: [PATCH 3/3] Fix the flaky test --- .../BrokerServiceAutoTopicCreationTest.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java index e5b7e0c8bdfcf..d595a64648cd6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java @@ -109,8 +109,8 @@ public void testAutoTopicCreationDisableIfNonPartitionedTopicAlreadyExist() thro public void testAutoSubscriptionCreationDisable() throws Exception{ pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false); - final String topicName = "persistent://prop/ns-abc/test-topic"; - final String subscriptionName = "test-topic-sub"; + final String topicName = "persistent://prop/ns-abc/test-subtopic"; + final String subscriptionName = "test-subtopic-sub"; admin.topics().createNonPartitionedTopic(topicName); @@ -121,14 +121,17 @@ public void testAutoSubscriptionCreationDisable() throws Exception{ assertTrue(e instanceof PulsarClientException); } assertFalse(admin.topics().getSubscriptions(topicName).contains(subscriptionName)); + + // Reset to default + pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true); } @Test public void testSubscriptionCreationWithAutoCreationDisable() throws Exception{ pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false); - final String topicName = "persistent://prop/ns-abc/test-topic"; - final String subscriptionName = "test-topic-sub"; + final String topicName = "persistent://prop/ns-abc/test-subtopic"; + final String subscriptionName = "test-subtopic-sub"; admin.topics().createNonPartitionedTopic(topicName); assertFalse(admin.topics().getSubscriptions(topicName).contains(subscriptionName)); @@ -139,6 +142,9 @@ public void testSubscriptionCreationWithAutoCreationDisable() throws Exception{ // Subscribe operation should be successful pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe(); + + // Reset to default + pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true); } /**