Skip to content

Commit

Permalink
[Issue 5904]Support unload all partitions of a partitioned topic (a…
Browse files Browse the repository at this point in the history
…pache#6187)

Fixes apache#5904 

### Motivation
Pulsar supports unload a non-partitioned-topic or a partition of a partitioned topic. If there has a partitioned topic with too many partitions, users need to get all partition and unload them one by one. We need to support unload all partition of a partitioned topic.

(cherry picked from commit d35e6c1)
  • Loading branch information
ltamber authored and tuteng committed Mar 21, 2020
1 parent 88dda60 commit e33238f
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,32 @@ protected ZooKeeperChildrenCache failureDomainListCache() {
return pulsar().getConfigurationCache().failureDomainListCache();
}

protected CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadataAsync(
TopicName topicName, boolean authoritative, boolean checkAllowAutoCreation) {
validateClusterOwnership(topicName.getCluster());
// validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
// serve/redirect request else fail partitioned-metadata-request so, client fails while creating
// producer/consumer
validateGlobalNamespaceOwnership(topicName.getNamespaceObject());

try {
checkConnect(topicName);
} catch (WebApplicationException e) {
validateAdminAccessForTenant(topicName.getTenant());
} catch (Exception e) {
// unknown error marked as internal server error
log.warn("Unexpected error while authorizing lookup. topic={}, role={}. Error: {}", topicName,
clientAppId(), e.getMessage(), e);
return FutureUtil.failedFuture(e);
}

if (checkAllowAutoCreation) {
return pulsar().getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName);
} else {
return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName);
}
}

protected PartitionedTopicMetadata getPartitionedTopicMetadata(TopicName topicName,
boolean authoritative, boolean checkAllowAutoCreation) {
validateClusterOwnership(topicName.getCluster());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -655,12 +655,63 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole
});
}

protected void internalUnloadTopic(boolean authoritative) {
protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean authoritative) {
log.info("[{}] Unloading topic {}", clientAppId(), topicName);
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
unloadTopic(topicName, authoritative);

getPartitionedTopicMetadataAsync(topicName, authoritative, false).whenComplete((meta, t) -> {
if (meta.partitions > 0) {
final List<CompletableFuture<Void>> futures = Lists.newArrayList();

for (int i = 0; i < meta.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics().unloadAsync(topicNamePartition.toString()));
} catch (Exception e) {
log.error("[{}] Failed to unload topic {}", clientAppId(), topicNamePartition, e);
asyncResponse.resume(new RestException(e));
return;
}
}

FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable th = exception.getCause();
if (th instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, th.getMessage()));
} else {
log.error("[{}] Failed to unload topic {}", clientAppId(), topicName, exception);
asyncResponse.resume(new RestException(exception));
}
return null;
}

asyncResponse.resume(Response.noContent().build());
return null;
});
} else {
validateAdminAccessForTenant(topicName.getTenant());
validateTopicOwnership(topicName, authoritative);

Topic topic = getTopicReference(topicName);
topic.close(false).whenComplete((r, ex) -> {
if (ex != null) {
log.error("[{}] Failed to unload topic {}, {}", clientAppId(), topicName, ex.getMessage(), ex);
asyncResponse.resume(new RestException(ex));

} else {
log.info("[{}] Successfully unloaded topic {}", clientAppId(), topicName);
asyncResponse.resume(Response.noContent().build());
}
});
}
}).exceptionally(t -> {
Throwable th = t.getCause();
asyncResponse.resume(new RestException(th));
return null;
});
}

protected void internalDeleteTopic(boolean authoritative, boolean force) {
Expand Down Expand Up @@ -1893,22 +1944,6 @@ private CompletableFuture<Void> createSubscriptions(TopicName topicName, int num
return result;
}

protected void unloadTopic(TopicName topicName, boolean authoritative) {
validateSuperUserAccess();
validateTopicOwnership(topicName, authoritative);
try {
Topic topic = getTopicReference(topicName);
topic.close(false).get();
log.info("[{}] Successfully unloaded topic {}", clientAppId(), topicName);
} catch (NullPointerException e) {
log.error("[{}] topic {} not found", clientAppId(), topicName);
throw new RestException(Status.NOT_FOUND, "Topic does not exist");
} catch (Exception e) {
log.error("[{}] Failed to unload topic {}, {}", clientAppId(), topicName, e.getMessage(), e);
throw new RestException(e);
}
}

// as described at : (PR: #836) CPP-client old client lib should not be allowed to connect on partitioned-topic.
// So, all requests from old-cpp-client (< v1.21) must be rejected.
// Pulsar client-java lib always passes user-agent as X-Java-$version.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,18 @@ public void createPartitionedTopic(@PathParam("property") String property, @Path
@ApiOperation(hidden = true, value = "Unload a topic")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist") })
public void unloadTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
public void unloadTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
log.info("[{}] Unloading topic {}", clientAppId(), topicName);

if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalUnloadTopic(asyncResponse, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
unloadTopic(topicName, authoritative);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,12 @@ public void deletePartitionedTopic(@Suspended final AsyncResponse asyncResponse,
@ApiOperation(hidden = true, value = "Unload a topic")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist") })
public void unloadTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
public void unloadTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
internalUnloadTopic(authoritative);
internalUnloadTopic(asyncResponse, authoritative);
}

@DELETE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ public void createPartitionedTopic(
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration"),
})
public void unloadTopic(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
Expand All @@ -222,12 +223,14 @@ public void unloadTopic(
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
log.info("[{}] Unloading topic {}", clientAppId(), topicName);
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
try {
validateTopicName(tenant, namespace, encodedTopic);
internalUnloadTopic(asyncResponse, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
unloadTopic(topicName, authoritative);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ public void deletePartitionedTopic(
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public void unloadTopic(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
Expand All @@ -373,8 +374,14 @@ public void unloadTopic(
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
internalUnloadTopic(authoritative);
try {
validateTopicName(tenant, namespace, encodedTopic);
internalUnloadTopic(asyncResponse, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@DELETE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,18 +276,46 @@ public void testUpdatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix
@Test
public void testUnloadTopic() {
final String topicName = "standard-topic-to-be-unload";
final String partitionTopicName = "partition-topic-to-be-unload";

// 1) not exist topic
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.unloadTopic(response, testTenant, testNamespace, "topic-not-exist", true);
ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class);
verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());

// 2) create non partitioned topic and unload
response = mock(AsyncResponse.class);
persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true);
persistentTopics.unloadTopic(testTenant, testNamespace, topicName, true);
persistentTopics.unloadTopic(response, testTenant, testNamespace, topicName, true);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

// 3) create partitioned topic and unload
response = mock(AsyncResponse.class);
persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionTopicName, 6);
persistentTopics.unloadTopic(response, testTenant, testNamespace, partitionTopicName, true);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

// 4) delete partitioned topic
response = mock(AsyncResponse.class);
persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, partitionTopicName, true, true);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
}

@Test(expectedExceptions = RestException.class)
@Test
public void testUnloadTopicShallThrowNotFoundWhenTopicNotExist() {
try {
persistentTopics.unloadTopic(testTenant, testNamespace,"non-existent-topic", true);
} catch (RestException e) {
Assert.assertEquals(e.getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());
throw e;
}
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.unloadTopic(response, testTenant, testNamespace,"non-existent-topic", true);
ArgumentCaptor<RestException> responseCaptor = ArgumentCaptor.forClass(RestException.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());
}

@Test
Expand Down

0 comments on commit e33238f

Please sign in to comment.