diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 8c10f6ca20c84..ba5a11444a6b6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1181,7 +1181,8 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, closeClientFuture.thenAccept(delete -> { CompletableFuture deleteTopicAuthenticationFuture = new CompletableFuture<>(); brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5); - deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema()) + deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema ? deleteSchema() : + CompletableFuture.completedFuture(null)) .thenCompose(__ -> deleteTopicPolicies()) .thenCompose(__ -> transactionBufferCleanupAndClose()) .whenComplete((v, ex) -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index b906941770413..056208c6798f0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -29,14 +29,13 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; - +import com.google.common.collect.Sets; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; - -import com.google.common.collect.Sets; +import lombok.Data; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.pulsar.broker.service.BrokerTestBase; @@ -44,6 +43,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -51,6 +51,7 @@ import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.TopicStats; import org.awaitility.Awaitility; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -271,4 +272,69 @@ public void testPersistentPartitionedTopicUnload() throws Exception { producer.close(); } } + + @Test + public void testCreateSchemaAfterDeletion() throws Exception { + //init namespace + final String myNamespace = "prop/ns"; + admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test")); + final String topicName = "persistent://prop/ns/test-create-schema-after-deletion" + UUID.randomUUID(); + + // create namespace + // Create a topic with `Person` + try (Producer producer = pulsarClient.newProducer(Schema.AVRO(Person.class)) + .topic(topicName) + .create() + ) { + Person person = new Person(); + person.setName("Tom Hanks"); + person.setAge(60); + + producer.send(person); + + } + + // delete the topic + admin.topics().delete(topicName); + + try (Producer ignored = pulsarClient.newProducer(Schema.AVRO(Student.class)) + .topic(topicName) + .create()) { + Assert.fail("Should fail to create a the producer with a new schema since the schema is not deleted."); + } catch (PulsarClientException pce) { + Assert.assertTrue(pce instanceof PulsarClientException.IncompatibleSchemaException); + } + + // delete the schema + admin.schemas().deleteSchema(topicName); + + // after deleting the schema, try to create a topic with a different schema + try (Producer producer = pulsarClient.newProducer(Schema.AVRO(Student.class)) + .topic(topicName) + .create() + ) { + Student student = new Student(); + student.setName("Tom Jerry"); + student.setAge(30); + student.setGpa(10); + + producer.send(student); + + } + } + + @Data + public static class Student { + private String name; + private int age; + private int gpa; + private int grade; + + } + + @Data + public static class Person { + private String name; + private int age; + } }