Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 5904]Support unload all partitions of a partitioned topic #6187

Merged
merged 8 commits into from
Feb 16, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,32 @@ protected ZooKeeperChildrenCache failureDomainListCache() {
return pulsar().getConfigurationCache().failureDomainListCache();
}

protected CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadataAsync(
TopicName topicName, boolean authoritative, boolean checkAllowAutoCreation) {
validateClusterOwnership(topicName.getCluster());
// validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
// serve/redirect request else fail partitioned-metadata-request so, client fails while creating
// producer/consumer
validateGlobalNamespaceOwnership(topicName.getNamespaceObject());

try {
checkConnect(topicName);
} catch (WebApplicationException e) {
validateAdminAccessForTenant(topicName.getTenant());
} catch (Exception e) {
// unknown error marked as internal server error
log.warn("Unexpected error while authorizing lookup. topic={}, role={}. Error: {}", topicName,
clientAppId(), e.getMessage(), e);
return FutureUtil.failedFuture(e);
}

if (checkAllowAutoCreation) {
return pulsar().getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName);
} else {
return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName);
}
}

protected PartitionedTopicMetadata getPartitionedTopicMetadata(TopicName topicName,
boolean authoritative, boolean checkAllowAutoCreation) {
validateClusterOwnership(topicName.getCluster());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -655,12 +655,63 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole
});
}

protected void internalUnloadTopic(boolean authoritative) {
protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean authoritative) {
log.info("[{}] Unloading topic {}", clientAppId(), topicName);
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
unloadTopic(topicName, authoritative);

getPartitionedTopicMetadataAsync(topicName, authoritative, false).whenComplete((meta, t) -> {
if (meta.partitions > 0) {
final List<CompletableFuture<Void>> futures = Lists.newArrayList();

for (int i = 0; i < meta.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics().unloadAsync(topicNamePartition.toString()));
} catch (Exception e) {
log.error("[{}] Failed to unload topic {}", clientAppId(), topicNamePartition, e);
asyncResponse.resume(new RestException(e));
return;
}
}

FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable th = exception.getCause();
if (th instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, th.getMessage()));
} else {
log.error("[{}] Failed to unload topic {}", clientAppId(), topicName, exception);
asyncResponse.resume(new RestException(exception));
}
return null;
}

asyncResponse.resume(Response.noContent().build());
return null;
});
} else {
validateAdminAccessForTenant(topicName.getTenant());
validateTopicOwnership(topicName, authoritative);

Topic topic = getTopicReference(topicName);
topic.close(false).whenComplete((r, ex) -> {
if (ex != null) {
log.error("[{}] Failed to unload topic {}, {}", clientAppId(), topicName, ex.getMessage(), ex);
asyncResponse.resume(new RestException(ex));

} else {
log.info("[{}] Successfully unloaded topic {}", clientAppId(), topicName);
asyncResponse.resume(Response.noContent().build());
}
});
}
}).exceptionally(t -> {
Throwable th = t.getCause();
asyncResponse.resume(new RestException(th));
return null;
});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix indention

}

protected void internalDeleteTopic(boolean authoritative, boolean force) {
Expand Down Expand Up @@ -1893,22 +1944,6 @@ private CompletableFuture<Void> createSubscriptions(TopicName topicName, int num
return result;
}

protected void unloadTopic(TopicName topicName, boolean authoritative) {
validateSuperUserAccess();
validateTopicOwnership(topicName, authoritative);
try {
Topic topic = getTopicReference(topicName);
topic.close(false).get();
log.info("[{}] Successfully unloaded topic {}", clientAppId(), topicName);
} catch (NullPointerException e) {
log.error("[{}] topic {} not found", clientAppId(), topicName);
throw new RestException(Status.NOT_FOUND, "Topic does not exist");
} catch (Exception e) {
log.error("[{}] Failed to unload topic {}, {}", clientAppId(), topicName, e.getMessage(), e);
throw new RestException(e);
}
}

// as described at : (PR: #836) CPP-client old client lib should not be allowed to connect on partitioned-topic.
// So, all requests from old-cpp-client (< v1.21) must be rejected.
// Pulsar client-java lib always passes user-agent as X-Java-$version.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,18 @@ public void createPartitionedTopic(@PathParam("property") String property, @Path
@ApiOperation(hidden = true, value = "Unload a topic")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist") })
public void unloadTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
public void unloadTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
log.info("[{}] Unloading topic {}", clientAppId(), topicName);

if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalUnloadTopic(asyncResponse, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
unloadTopic(topicName, authoritative);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,12 @@ public void deletePartitionedTopic(@Suspended final AsyncResponse asyncResponse,
@ApiOperation(hidden = true, value = "Unload a topic")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist") })
public void unloadTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
public void unloadTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
internalUnloadTopic(authoritative);
internalUnloadTopic(asyncResponse, authoritative);
}

@DELETE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ public void createPartitionedTopic(
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration"),
})
public void unloadTopic(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
Expand All @@ -222,12 +223,14 @@ public void unloadTopic(
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
log.info("[{}] Unloading topic {}", clientAppId(), topicName);
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
try {
validateTopicName(tenant, namespace, encodedTopic);
internalUnloadTopic(asyncResponse, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
unloadTopic(topicName, authoritative);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ public void deletePartitionedTopic(
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public void unloadTopic(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
Expand All @@ -373,8 +374,14 @@ public void unloadTopic(
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
internalUnloadTopic(authoritative);
try {
validateTopicName(tenant, namespace, encodedTopic);
internalUnloadTopic(asyncResponse, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@DELETE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,18 +276,46 @@ public void testUpdatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix
@Test
public void testUnloadTopic() {
final String topicName = "standard-topic-to-be-unload";
final String partitionTopicName = "partition-topic-to-be-unload";

// 1) not exist topic
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.unloadTopic(response, testTenant, testNamespace, "topic-not-exist", true);
ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class);
verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());

// 2) create non partitioned topic and unload
response = mock(AsyncResponse.class);
persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true);
persistentTopics.unloadTopic(testTenant, testNamespace, topicName, true);
persistentTopics.unloadTopic(response, testTenant, testNamespace, topicName, true);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

// 3) create partitioned topic and unload
response = mock(AsyncResponse.class);
persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionTopicName, 6);
persistentTopics.unloadTopic(response, testTenant, testNamespace, partitionTopicName, true);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

// 4) delete partitioned topic
response = mock(AsyncResponse.class);
persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, partitionTopicName, true, true);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
}

@Test(expectedExceptions = RestException.class)
@Test
public void testUnloadTopicShallThrowNotFoundWhenTopicNotExist() {
try {
persistentTopics.unloadTopic(testTenant, testNamespace,"non-existent-topic", true);
} catch (RestException e) {
Assert.assertEquals(e.getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());
throw e;
}
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.unloadTopic(response, testTenant, testNamespace,"non-existent-topic", true);
ArgumentCaptor<RestException> responseCaptor = ArgumentCaptor.forClass(RestException.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());
}

@Test
Expand Down