Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Sep 23, 2024
1 parent d57a715 commit 38e0fe9
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1259,7 +1259,11 @@ public void deleteTopicAuthenticationWithRetry(String topic, CompletableFuture<V
private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) {
CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
topicFuture.exceptionally(t -> {
pulsarStats.recordTopicLoadFailed();
if (t instanceof BrokerServiceException.BundleUnloadingException) {
pulsarStats.recordConcurrencyLoadTopicAndUnloadBundle();
} else {
pulsarStats.recordTopicLoadFailed();
}
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ Map<String, String> getDimensionMap(String metricsName) {
Metrics getTopicLoadMetrics() {
Metrics metrics = getDimensionMetrics("pulsar_topic_load_times", "topic_load", topicLoadStats);
metrics.put("brk_topic_load_failed_count", TOPIC_LOAD_FAILED.get());
metrics.put("brk_concurrency_load_topic_and_unload_bundle_count",
CONCURRENCY_LOAD_TOPIC_AND_UNLOAD_BUNDLE.get());
return metrics;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -78,6 +79,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.impl.NonAppendableLedgerOffloader;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
Expand Down Expand Up @@ -1674,6 +1676,18 @@ public void testMetricsPersistentTopicLoadFails() throws Exception {
admin.topics().createNonPartitionedTopic(topic);
admin.topics().unload(topic);

// Get original counter.
MutableInt failedLoadTopic1 = new MutableInt(0);
MutableInt concurrencyLoadTopicAndUnloadBundle1 = new MutableInt(0);
JerseyClient httpClient = JerseyClientBuilder.createClient();
String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
.request().get(String.class);
long failedLoadTopic = parseLongMetric(response, "pulsar_topic_load_failed_count");
long concurrencyLoadTopicAndUnloadBundle =
parseLongMetric(response, "pulsar_concurrency_load_topic_and_unload_bundle_count");
failedLoadTopic1.setValue(failedLoadTopic);
concurrencyLoadTopicAndUnloadBundle1.setValue(concurrencyLoadTopicAndUnloadBundle);

// Inject an error that makes the topic load fails.
AtomicBoolean failMarker = new AtomicBoolean(true);
mockZooKeeper.failConditional(KeeperException.Code.NODEEXISTS, (op, path) -> {
Expand All @@ -1686,19 +1700,14 @@ public void testMetricsPersistentTopicLoadFails() throws Exception {

// Do test
CompletableFuture<Producer<byte[]>> producer = pulsarClient.newProducer().topic(topic).createAsync();
JerseyClient httpClient = JerseyClientBuilder.createClient();
Awaitility.await().until(() -> {
String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
Awaitility.await().untilAsserted(() -> {
String response2 = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
.request().get(String.class);
Multimap<String, PrometheusMetricsClient.Metric> metricMap = PrometheusMetricsClient.parseMetrics(response);
if (!metricMap.containsKey("pulsar_topic_load_failed_count")) {
return false;
}
double topic_load_failed_count = 0;
for (PrometheusMetricsClient.Metric metric : metricMap.get("pulsar_topic_load_failed_count")) {
topic_load_failed_count += metric.value;
}
return topic_load_failed_count >= 1D;
long failedLoadTopic2 = parseLongMetric(response2, "pulsar_topic_load_failed_count");
long concurrencyLoadTopicAndUnloadBundle2 =
parseLongMetric(response2, "pulsar_concurrency_load_topic_and_unload_bundle_count");
assertTrue(failedLoadTopic2 > failedLoadTopic1.getValue());
assertTrue(concurrencyLoadTopicAndUnloadBundle2 == concurrencyLoadTopicAndUnloadBundle1.getValue());
});

// Remove the injection.
Expand All @@ -1710,6 +1719,74 @@ public void testMetricsPersistentTopicLoadFails() throws Exception {
admin.namespaces().deleteNamespace(namespace);
}

@Test
public void testMetricsPersistentTopicLoadFailsDueToBundleUnloading() throws Exception {
final String namespace = "prop/" + UUID.randomUUID().toString().replaceAll("-", "");
String topic = "persistent://" + namespace + "/topic1_" + UUID.randomUUID();
admin.namespaces().createNamespace(namespace);
admin.topics().createNonPartitionedTopic(topic);
admin.namespaces().unload(namespace);

// Get original counter.
MutableInt failedLoadTopic1 = new MutableInt(0);
MutableInt concurrencyLoadTopicAndUnloadBundle1 = new MutableInt(0);
JerseyClient httpClient = JerseyClientBuilder.createClient();
String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
.request().get(String.class);
long failedLoadTopic = parseLongMetric(response, "pulsar_topic_load_failed_count");
long concurrencyLoadTopicAndUnloadBundle =
parseLongMetric(response, "pulsar_concurrency_load_topic_and_unload_bundle_count");
failedLoadTopic1.setValue(failedLoadTopic);
concurrencyLoadTopicAndUnloadBundle1.setValue(concurrencyLoadTopicAndUnloadBundle);

// Inject an error that makes the topic load fails.
AtomicBoolean failMarker = new AtomicBoolean(true);
mockZooKeeper.failConditional(KeeperException.Code.NODEEXISTS, (op, path) -> {
if (failMarker.get() && op.equals(MockZooKeeper.Op.CREATE) &&
path.startsWith("/namespace/" + namespace)) {
return true;
}
return false;
});

// Do test
try {
pulsar.getBrokerService().loadOrCreatePersistentTopic(topic, true, Collections.emptyMap(), null).join();
} catch (Exception ex) {
// ignore, because we injected an error above.
}
Awaitility.await().untilAsserted(() -> {
String response2 = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
.request().get(String.class);
long failedLoadTopic2 = parseLongMetric(response2, "pulsar_topic_load_failed_count");
long concurrencyLoadTopicAndUnloadBundle2 =
parseLongMetric(response2, "pulsar_concurrency_load_topic_and_unload_bundle_count");
assertTrue(failedLoadTopic2 == failedLoadTopic1.getValue());
assertTrue(concurrencyLoadTopicAndUnloadBundle2 > concurrencyLoadTopicAndUnloadBundle1.getValue());
});

// Remove the injection.
failMarker.set(false);
// cleanup.
httpClient.close();
admin.topics().delete(topic);
admin.namespaces().deleteNamespace(namespace);
}

private long parseLongMetric(String metricsResponse, String metricName) {
Multimap<String, PrometheusMetricsClient.Metric> metricMap =
PrometheusMetricsClient.parseMetrics(metricsResponse);
if (!metricMap.containsKey(metricName)) {
return 0;
}
double counter = 0;
for (PrometheusMetricsClient.Metric metric :
metricMap.get(metricName)) {
counter += metric.value;
}
return Double.valueOf(counter).longValue();
}

@Test
public void testMetricsNonPersistentTopicLoadFails() throws Exception {
final String namespace = "prop/" + UUID.randomUUID().toString().replaceAll("-", "");
Expand Down

0 comments on commit 38e0fe9

Please sign in to comment.