From 9343a35255c4f27fa8551d0a96f7b62ede4d6e46 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Mon, 21 Nov 2022 15:31:19 +0800 Subject: [PATCH] [fix][schema] Fix cherry-pick issue from #18283 (#18555) --- .../service/persistent/PersistentTopic.java | 3 +- .../persistent/PersistentTopicTest.java | 72 ++++++++++++++++++- 2 files changed, 71 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 8c10f6ca20c84..ba5a11444a6b6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1181,7 +1181,8 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, closeClientFuture.thenAccept(delete -> { CompletableFuture deleteTopicAuthenticationFuture = new CompletableFuture<>(); brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5); - deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema()) + deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema ? deleteSchema() : + CompletableFuture.completedFuture(null)) .thenCompose(__ -> deleteTopicPolicies()) .thenCompose(__ -> transactionBufferCleanupAndClose()) .whenComplete((v, ex) -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index b906941770413..056208c6798f0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -29,14 +29,13 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; - +import com.google.common.collect.Sets; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; - -import com.google.common.collect.Sets; +import lombok.Data; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.pulsar.broker.service.BrokerTestBase; @@ -44,6 +43,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -51,6 +51,7 @@ import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.TopicStats; import org.awaitility.Awaitility; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -271,4 +272,69 @@ public void testPersistentPartitionedTopicUnload() throws Exception { producer.close(); } } + + @Test + public void testCreateSchemaAfterDeletion() throws Exception { + //init namespace + final String myNamespace = "prop/ns"; + admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test")); + final String topicName = "persistent://prop/ns/test-create-schema-after-deletion" + UUID.randomUUID(); + + // create namespace + // Create a topic with `Person` + try (Producer producer = pulsarClient.newProducer(Schema.AVRO(Person.class)) + .topic(topicName) + .create() + ) { + Person person = new Person(); + person.setName("Tom Hanks"); + person.setAge(60); + + producer.send(person); + + } + + // delete the topic + admin.topics().delete(topicName); + + try (Producer ignored = pulsarClient.newProducer(Schema.AVRO(Student.class)) + .topic(topicName) + .create()) { + Assert.fail("Should fail to create a the producer with a new schema since the schema is not deleted."); + } catch (PulsarClientException pce) { + Assert.assertTrue(pce instanceof PulsarClientException.IncompatibleSchemaException); + } + + // delete the schema + admin.schemas().deleteSchema(topicName); + + // after deleting the schema, try to create a topic with a different schema + try (Producer producer = pulsarClient.newProducer(Schema.AVRO(Student.class)) + .topic(topicName) + .create() + ) { + Student student = new Student(); + student.setName("Tom Jerry"); + student.setAge(30); + student.setGpa(10); + + producer.send(student); + + } + } + + @Data + public static class Student { + private String name; + private int age; + private int gpa; + private int grade; + + } + + @Data + public static class Person { + private String name; + private int age; + } }