From e33238f899b82f049d1b1d4c1551225b0964cd53 Mon Sep 17 00:00:00 2001 From: ltamber Date: Sun, 16 Feb 2020 13:27:50 +0800 Subject: [PATCH] [Issue 5904]Support `unload` all partitions of a partitioned topic (#6187) Fixes #5904 ### Motivation Pulsar supports unload a non-partitioned-topic or a partition of a partitioned topic. If there has a partitioned topic with too many partitions, users need to get all partition and unload them one by one. We need to support unload all partition of a partitioned topic. (cherry picked from commit d35e6c1a711c686b0af8072bfff36b11214825ea) --- .../pulsar/broker/admin/AdminResource.java | 26 +++++++ .../admin/impl/PersistentTopicsBase.java | 71 ++++++++++++++----- .../broker/admin/v1/NonPersistentTopics.java | 18 ++--- .../broker/admin/v1/PersistentTopics.java | 7 +- .../broker/admin/v2/NonPersistentTopics.java | 13 ++-- .../broker/admin/v2/PersistentTopics.java | 11 ++- .../broker/admin/PersistentTopicsTest.java | 44 +++++++++--- 7 files changed, 146 insertions(+), 44 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index e9d559ef014e6..76b5aa6db210b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -552,6 +552,32 @@ protected ZooKeeperChildrenCache failureDomainListCache() { return pulsar().getConfigurationCache().failureDomainListCache(); } + protected CompletableFuture getPartitionedTopicMetadataAsync( + TopicName topicName, boolean authoritative, boolean checkAllowAutoCreation) { + validateClusterOwnership(topicName.getCluster()); + // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can + // serve/redirect request else fail partitioned-metadata-request so, client fails while creating + // producer/consumer + validateGlobalNamespaceOwnership(topicName.getNamespaceObject()); + + try { + checkConnect(topicName); + } catch (WebApplicationException e) { + validateAdminAccessForTenant(topicName.getTenant()); + } catch (Exception e) { + // unknown error marked as internal server error + log.warn("Unexpected error while authorizing lookup. topic={}, role={}. Error: {}", topicName, + clientAppId(), e.getMessage(), e); + return FutureUtil.failedFuture(e); + } + + if (checkAllowAutoCreation) { + return pulsar().getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName); + } else { + return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName); + } + } + protected PartitionedTopicMetadata getPartitionedTopicMetadata(TopicName topicName, boolean authoritative, boolean checkAllowAutoCreation) { validateClusterOwnership(topicName.getCluster()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 0add56e311cc4..4a1021f9aa366 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -655,12 +655,63 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole }); } - protected void internalUnloadTopic(boolean authoritative) { + protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean authoritative) { log.info("[{}] Unloading topic {}", clientAppId(), topicName); if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); } - unloadTopic(topicName, authoritative); + + getPartitionedTopicMetadataAsync(topicName, authoritative, false).whenComplete((meta, t) -> { + if (meta.partitions > 0) { + final List> futures = Lists.newArrayList(); + + for (int i = 0; i < meta.partitions; i++) { + TopicName topicNamePartition = topicName.getPartition(i); + try { + futures.add(pulsar().getAdminClient().topics().unloadAsync(topicNamePartition.toString())); + } catch (Exception e) { + log.error("[{}] Failed to unload topic {}", clientAppId(), topicNamePartition, e); + asyncResponse.resume(new RestException(e)); + return; + } + } + + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + Throwable th = exception.getCause(); + if (th instanceof NotFoundException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, th.getMessage())); + } else { + log.error("[{}] Failed to unload topic {}", clientAppId(), topicName, exception); + asyncResponse.resume(new RestException(exception)); + } + return null; + } + + asyncResponse.resume(Response.noContent().build()); + return null; + }); + } else { + validateAdminAccessForTenant(topicName.getTenant()); + validateTopicOwnership(topicName, authoritative); + + Topic topic = getTopicReference(topicName); + topic.close(false).whenComplete((r, ex) -> { + if (ex != null) { + log.error("[{}] Failed to unload topic {}, {}", clientAppId(), topicName, ex.getMessage(), ex); + asyncResponse.resume(new RestException(ex)); + + } else { + log.info("[{}] Successfully unloaded topic {}", clientAppId(), topicName); + asyncResponse.resume(Response.noContent().build()); + } + }); + } + }).exceptionally(t -> { + Throwable th = t.getCause(); + asyncResponse.resume(new RestException(th)); + return null; + }); } protected void internalDeleteTopic(boolean authoritative, boolean force) { @@ -1893,22 +1944,6 @@ private CompletableFuture createSubscriptions(TopicName topicName, int num return result; } - protected void unloadTopic(TopicName topicName, boolean authoritative) { - validateSuperUserAccess(); - validateTopicOwnership(topicName, authoritative); - try { - Topic topic = getTopicReference(topicName); - topic.close(false).get(); - log.info("[{}] Successfully unloaded topic {}", clientAppId(), topicName); - } catch (NullPointerException e) { - log.error("[{}] topic {} not found", clientAppId(), topicName); - throw new RestException(Status.NOT_FOUND, "Topic does not exist"); - } catch (Exception e) { - log.error("[{}] Failed to unload topic {}, {}", clientAppId(), topicName, e.getMessage(), e); - throw new RestException(e); - } - } - // as described at : (PR: #836) CPP-client old client lib should not be allowed to connect on partitioned-topic. // So, all requests from old-cpp-client (< v1.21) must be rejected. // Pulsar client-java lib always passes user-agent as X-Java-$version. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java index 0179847590bb7..12b9622d7d430 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java @@ -161,16 +161,18 @@ public void createPartitionedTopic(@PathParam("property") String property, @Path @ApiOperation(hidden = true, value = "Unload a topic") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist") }) - public void unloadTopic(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, + public void unloadTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, + @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - validateTopicName(property, cluster, namespace, encodedTopic); - log.info("[{}] Unloading topic {}", clientAppId(), topicName); - - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); + try { + validateTopicName(property, cluster, namespace, encodedTopic); + internalUnloadTopic(asyncResponse, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); } - unloadTopic(topicName, authoritative); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index f4fbbe0a4ad8f..9944ca36469da 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -213,11 +213,12 @@ public void deletePartitionedTopic(@Suspended final AsyncResponse asyncResponse, @ApiOperation(hidden = true, value = "Unload a topic") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist") }) - public void unloadTopic(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, + public void unloadTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, + @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(property, cluster, namespace, encodedTopic); - internalUnloadTopic(authoritative); + internalUnloadTopic(asyncResponse, authoritative); } @DELETE diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index a41db33b9f19c..add815d13c02b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -214,6 +214,7 @@ public void createPartitionedTopic( @ApiResponse(code = 503, message = "Failed to validate global cluster configuration"), }) public void unloadTopic( + @Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Specify the tenant", required = true) @PathParam("tenant") String tenant, @ApiParam(value = "Specify the namespace", required = true) @@ -222,12 +223,14 @@ public void unloadTopic( @PathParam("topic") @Encoded String encodedTopic, @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - validateTopicName(tenant, namespace, encodedTopic); - log.info("[{}] Unloading topic {}", clientAppId(), topicName); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); + try { + validateTopicName(tenant, namespace, encodedTopic); + internalUnloadTopic(asyncResponse, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); } - unloadTopic(topicName, authoritative); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 08411dd5a33d7..57dd7e1726236 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -365,6 +365,7 @@ public void deletePartitionedTopic( @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration") }) public void unloadTopic( + @Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Specify the tenant", required = true) @PathParam("tenant") String tenant, @ApiParam(value = "Specify the namespace", required = true) @@ -373,8 +374,14 @@ public void unloadTopic( @PathParam("topic") @Encoded String encodedTopic, @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - validateTopicName(tenant, namespace, encodedTopic); - internalUnloadTopic(authoritative); + try { + validateTopicName(tenant, namespace, encodedTopic); + internalUnloadTopic(asyncResponse, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } } @DELETE diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index a4cd3258198ac..1825d313124e7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -276,18 +276,46 @@ public void testUpdatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix @Test public void testUnloadTopic() { final String topicName = "standard-topic-to-be-unload"; + final String partitionTopicName = "partition-topic-to-be-unload"; + + // 1) not exist topic + AsyncResponse response = mock(AsyncResponse.class); + persistentTopics.unloadTopic(response, testTenant, testNamespace, "topic-not-exist", true); + ArgumentCaptor errCaptor = ArgumentCaptor.forClass(RestException.class); + verify(response, timeout(5000).times(1)).resume(errCaptor.capture()); + Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode()); + + // 2) create non partitioned topic and unload + response = mock(AsyncResponse.class); persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true); - persistentTopics.unloadTopic(testTenant, testNamespace, topicName, true); + persistentTopics.unloadTopic(response, testTenant, testNamespace, topicName, true); + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + + // 3) create partitioned topic and unload + response = mock(AsyncResponse.class); + persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionTopicName, 6); + persistentTopics.unloadTopic(response, testTenant, testNamespace, partitionTopicName, true); + responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + + // 4) delete partitioned topic + response = mock(AsyncResponse.class); + persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, partitionTopicName, true, true); + responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); } - @Test(expectedExceptions = RestException.class) + @Test public void testUnloadTopicShallThrowNotFoundWhenTopicNotExist() { - try { - persistentTopics.unloadTopic(testTenant, testNamespace,"non-existent-topic", true); - } catch (RestException e) { - Assert.assertEquals(e.getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode()); - throw e; - } + AsyncResponse response = mock(AsyncResponse.class); + persistentTopics.unloadTopic(response, testTenant, testNamespace,"non-existent-topic", true); + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(RestException.class); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode()); } @Test