Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 6394] Add configuration to disable auto creation of subscriptions #6456

Merged
merged 3 commits into from
Mar 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ allowAutoTopicCreation=true
# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)
allowAutoTopicCreationType=non-partitioned

# Enable subscription auto creation if new consumer connected (disable auto creation with value false)
allowAutoSubscriptionCreation=true

# The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned.
defaultNumPartitions=1

Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,9 @@ allowAutoTopicCreation=true
# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)
allowAutoTopicCreationType=non-partitioned

# Enable subscription auto creation if new consumer connected (disable auto creation with value false)
allowAutoSubscriptionCreation=true

# The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned.
defaultNumPartitions=1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -998,6 +998,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)"
)
private String allowAutoTopicCreationType = "non-partitioned";
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Allow automated creation of subscriptions if set to true (default value)."
)
private boolean allowAutoSubscriptionCreation = true;
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "The number of partitioned topics that is allowed to be automatically created"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ public TopicNotFoundException(String msg) {
}
}

public static class SubscriptionNotFoundException extends BrokerServiceException {
public SubscriptionNotFoundException(String msg) {
super(msg);
}
}

public static class SubscriptionBusyException extends BrokerServiceException {
public SubscriptionBusyException(String msg) {
super(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
Expand Down Expand Up @@ -720,7 +721,6 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(),
subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex())
: null;
final String subscription = subscribe.getSubscription();
final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
final boolean readCompacted = subscribe.getReadCompacted();
final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
Expand All @@ -746,7 +746,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
if (service.isAuthorizationEnabled()) {
authorizationFuture = service.getAuthorizationService().canConsumeAsync(topicName,
originalPrincipal != null ? originalPrincipal : authRole, authenticationData,
subscription);
subscriptionName);
} else {
authorizationFuture = CompletableFuture.completedFuture(true);
}
Expand Down Expand Up @@ -809,6 +809,15 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {

Topic topic = optTopic.get();

boolean rejectSubscriptionIfDoesNotExist = isDurable
&& !service.pulsar().getConfig().isAllowAutoSubscriptionCreation()
&& !topic.getSubscriptions().containsKey(subscriptionName);

if (rejectSubscriptionIfDoesNotExist) {
return FutureUtil
.failedFuture(new SubscriptionNotFoundException("Subscription does not exist"));
}

if (schema != null) {
return topic.addSchemaIfIdleOrCheckCompatible(schema)
.thenCompose(v -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
Expand Down Expand Up @@ -104,6 +105,48 @@ public void testAutoTopicCreationDisableIfNonPartitionedTopicAlreadyExist() thro
assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
}

@Test
public void testAutoSubscriptionCreationDisable() throws Exception{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@murong00

Can you add a test to cover the case - if allowAutoSubscriptionCreation is disabled people can still create a subscription using pulsar-admin?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);

final String topicName = "persistent://prop/ns-abc/test-subtopic";
final String subscriptionName = "test-subtopic-sub";

admin.topics().createNonPartitionedTopic(topicName);

try {
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
fail("Subscribe operation should have failed");
} catch (Exception e) {
assertTrue(e instanceof PulsarClientException);
}
assertFalse(admin.topics().getSubscriptions(topicName).contains(subscriptionName));

// Reset to default
pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);
}

@Test
public void testSubscriptionCreationWithAutoCreationDisable() throws Exception{
pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);

final String topicName = "persistent://prop/ns-abc/test-subtopic";
final String subscriptionName = "test-subtopic-sub";

admin.topics().createNonPartitionedTopic(topicName);
assertFalse(admin.topics().getSubscriptions(topicName).contains(subscriptionName));

// Create the subscription by PulsarAdmin
admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
assertTrue(admin.topics().getSubscriptions(topicName).contains(subscriptionName));

// Subscribe operation should be successful
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();

// Reset to default
pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);
}

/**
* CheckAllowAutoCreation's default value is false.
* So using getPartitionedTopicMetadata() directly will not produce partitioned topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

Expand Down Expand Up @@ -162,6 +163,10 @@ public void start(URI dlogUri,
this.connectorsManager = new ConnectorsManager(workerConfig);

//create membership manager
String coordinationTopic = workerConfig.getClusterCoordinationTopic();
if (!brokerAdmin.topics().getSubscriptions(coordinationTopic).contains(MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION)) {
brokerAdmin.topics().createSubscription(coordinationTopic, MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION, MessageId.earliest);
}
this.membershipManager = new MembershipManager(this, this.client, this.brokerAdmin);

// create function runtime manager
Expand Down
1 change: 1 addition & 0 deletions site2/docs/reference-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|backlogQuotaDefaultLimitGB| The default per-topic backlog quota limit | -1 |
|allowAutoTopicCreation| Enable topic auto creation if a new producer or consumer connected |true|
|allowAutoTopicCreationType| The topic type (partitioned or non-partitioned) that is allowed to be automatically created. |Partitioned|
|allowAutoSubscriptionCreation| Enable subscription auto creation if a new consumer connected |true|
|defaultNumPartitions| The number of partitioned topics that is allowed to be automatically created if `allowAutoTopicCreationType` is partitioned |1|
|brokerDeleteInactiveTopicsEnabled| Enable the deletion of inactive topics |true|
|brokerDeleteInactiveTopicsFrequencySeconds| How often to check for inactive topics |60|
Expand Down