From bee4bd582ca8137824ac58077a4642430c30c907 Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Thu, 19 Sep 2024 12:33:45 -0700 Subject: [PATCH] resolved comments --- .../channel/ServiceUnitStateTableViewSyncer.java | 3 +-- .../tableview/impl/MetadataStoreTableViewImpl.java | 9 ++++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java index 202e3b0efed69..10ab39a66d279 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java @@ -89,8 +89,7 @@ private CompletableFuture syncToMetadataStore(String key, ServiceUnitState return metadataStoreTableView.put(key, data); } - private CompletableFuture dummy(String key, ServiceUnitStateData data) { - return CompletableFuture.completedFuture(null); + private void dummy(String key, ServiceUnitStateData data) { } private void syncExistingItems() diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/tableview/impl/MetadataStoreTableViewImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/tableview/impl/MetadataStoreTableViewImpl.java index aa08784b259e8..4f9aad0ba658b 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/tableview/impl/MetadataStoreTableViewImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/tableview/impl/MetadataStoreTableViewImpl.java @@ -234,14 +234,15 @@ private CompletableFuture handleExisting(String path) { } private void fill() throws MetadataStoreException { - long started = System.currentTimeMillis(); + final var deadline = System.currentTimeMillis() + FILL_TIMEOUT_IN_MILLIS; log.info("{} start filling existing items under the pathPrefix:{}", name, pathPrefix); ConcurrentLinkedDeque q = new ConcurrentLinkedDeque<>(); List> futures = new ArrayList<>(); q.add(pathPrefix); LongAdder count = new LongAdder(); while (!q.isEmpty()) { - if (System.currentTimeMillis() - started > FILL_TIMEOUT_IN_MILLIS) { + var now = System.currentTimeMillis(); + if (now >= deadline) { String err = name + " failed to fill existing items in " + TimeUnit.MILLISECONDS.toSeconds(FILL_TIMEOUT_IN_MILLIS) + " secs. Filled count:" + count.sum(); @@ -266,7 +267,9 @@ private void fill() throws MetadataStoreException { })); } try { - FutureUtil.waitForAll(futures).get(timeoutInMillis, TimeUnit.MILLISECONDS); + FutureUtil.waitForAll(futures).get( + Math.min(timeoutInMillis, deadline - now), + TimeUnit.MILLISECONDS); } catch (Throwable e) { Throwable c = FutureUtil.unwrapCompletionException(e); log.error("{} failed to fill existing items", name, c);