From e3e41a6065f41abe278fc5b78a54ff68a4f02e9c Mon Sep 17 00:00:00 2001 From: labkey-jeckels Date: Tue, 8 Jul 2025 16:30:13 -0700 Subject: [PATCH 1/6] Improve detection of an idle indexer --- .../src/org/labkey/search/model/AbstractSearchService.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/search/src/org/labkey/search/model/AbstractSearchService.java b/search/src/org/labkey/search/model/AbstractSearchService.java index d70caf3488e..5eaea44443d 100644 --- a/search/src/org/labkey/search/model/AbstractSearchService.java +++ b/search/src/org/labkey/search/model/AbstractSearchService.java @@ -604,7 +604,10 @@ public boolean _eq(URLHelper a, URLHelper b) @Override public boolean drainQueue(PRIORITY priority, long timeout, TimeUnit unit) throws InterruptedException { - final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch latch = new CountDownLatch(2); + + // The indexer uses multiple threads for different types of work. Queue an item and a Runnable to be sure + // both get drained SearchService.IndexTask task = createTask("WaitForIndexer", new SearchService.TaskListener() { @Override public void success() @@ -616,6 +619,8 @@ public boolean drainQueue(PRIORITY priority, long timeout, TimeUnit unit) throws task.addNoop(priority); task.setReady(); + task.addRunnable(latch::countDown, priority); + boolean success = latch.await(timeout, unit); refreshNow(); return success; From b65eeea05bb281ca5f14570224c3aecba5b05f47 Mon Sep 17 00:00:00 2001 From: labkey-jeckels Date: Tue, 8 Jul 2025 16:38:12 -0700 Subject: [PATCH 2/6] Runnable first, then item --- .../org/labkey/api/search/SearchService.java | 1 + .../search/model/AbstractSearchService.java | 32 ++++++++++--------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/api/src/org/labkey/api/search/SearchService.java b/api/src/org/labkey/api/search/SearchService.java index f897b8b5ae1..0356c01a57e 100644 --- a/api/src/org/labkey/api/search/SearchService.java +++ b/api/src/org/labkey/api/search/SearchService.java @@ -120,6 +120,7 @@ static void setInstance(SearchService impl) */ boolean drainQueue(PRIORITY priority, long timeout, TimeUnit unit) throws InterruptedException; + /** From lowest to highest priority */ enum PRIORITY { commit, diff --git a/search/src/org/labkey/search/model/AbstractSearchService.java b/search/src/org/labkey/search/model/AbstractSearchService.java index 5eaea44443d..0726d933342 100644 --- a/search/src/org/labkey/search/model/AbstractSearchService.java +++ b/search/src/org/labkey/search/model/AbstractSearchService.java @@ -604,22 +604,24 @@ public boolean _eq(URLHelper a, URLHelper b) @Override public boolean drainQueue(PRIORITY priority, long timeout, TimeUnit unit) throws InterruptedException { - final CountDownLatch latch = new CountDownLatch(2); + final CountDownLatch latch = new CountDownLatch(1); - // The indexer uses multiple threads for different types of work. Queue an item and a Runnable to be sure - // both get drained - SearchService.IndexTask task = createTask("WaitForIndexer", new SearchService.TaskListener() - { - @Override public void success() - { - latch.countDown(); - } - @Override public void indexError(Resource r, Throwable t) { } - }); - task.addNoop(priority); - task.setReady(); - - task.addRunnable(latch::countDown, priority); + SearchService.IndexTask task = createTask("WaitForIndexerRunnable"); + task.addRunnable(priority, () -> + { + // The indexer uses multiple threads for different types of work. Queue a Runnable first, and when it executes, + // queue the Item + SearchService.IndexTask itemTask = createTask("WaitForIndexer", new SearchService.TaskListener() + { + @Override public void success() + { + latch.countDown(); + } + @Override public void indexError(Resource r, Throwable t) { } + }); + itemTask.addNoop(priority); + itemTask.setReady(); + }); boolean success = latch.await(timeout, unit); refreshNow(); From 6cce57914dbe84dd3b0669487c0dd36681121fed Mon Sep 17 00:00:00 2001 From: labkey-matthewb Date: Wed, 9 Jul 2025 10:17:47 -0700 Subject: [PATCH 3/6] simplify drainQueue() --- .../search/model/AbstractSearchService.java | 32 +++++++++---------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/search/src/org/labkey/search/model/AbstractSearchService.java b/search/src/org/labkey/search/model/AbstractSearchService.java index 0726d933342..df5633bc3fa 100644 --- a/search/src/org/labkey/search/model/AbstractSearchService.java +++ b/search/src/org/labkey/search/model/AbstractSearchService.java @@ -606,23 +606,21 @@ public boolean drainQueue(PRIORITY priority, long timeout, TimeUnit unit) throws { final CountDownLatch latch = new CountDownLatch(1); - SearchService.IndexTask task = createTask("WaitForIndexerRunnable"); - task.addRunnable(priority, () -> - { - // The indexer uses multiple threads for different types of work. Queue a Runnable first, and when it executes, - // queue the Item - SearchService.IndexTask itemTask = createTask("WaitForIndexer", new SearchService.TaskListener() - { - @Override public void success() - { - latch.countDown(); - } - @Override public void indexError(Resource r, Throwable t) { } - }); - itemTask.addNoop(priority); - itemTask.setReady(); - }); - + SearchService.IndexTask task = createTask("WaitForIndexerRunnable", new SearchService.TaskListener() + { + @Override public void success() + { + latch.countDown(); + } + @Override public void indexError(Resource r, Throwable t) + { + latch.countDown(); + } + }); + // The indexer uses multiple threads for different types of work. Queue a Runnable first, and when it executes, + // queue the Item + task.addRunnable(priority, () -> task.addNoop(priority)); + task.setReady(); boolean success = latch.await(timeout, unit); refreshNow(); return success; From cae7eae7be82fc9356082ccc95676fb5159b7d61 Mon Sep 17 00:00:00 2001 From: labkey-jeckels Date: Fri, 11 Jul 2025 17:51:02 -0700 Subject: [PATCH 4/6] Ensure that we check every task already in the queue --- .../org/labkey/api/search/SearchService.java | 5 +- .../labkey/core/search/NoopSearchService.java | 9 +- .../org/labkey/search/SearchController.java | 7 + .../search/model/AbstractIndexTask.java | 1 + .../search/model/AbstractSearchService.java | 126 ++++++++++++------ 5 files changed, 97 insertions(+), 51 deletions(-) diff --git a/api/src/org/labkey/api/search/SearchService.java b/api/src/org/labkey/api/search/SearchService.java index 0356c01a57e..0f302daac59 100644 --- a/api/src/org/labkey/api/search/SearchService.java +++ b/api/src/org/labkey/api/search/SearchService.java @@ -212,9 +212,6 @@ default void addRunnable(@NotNull SearchService.PRIORITY pri, @NotNull Runnable void addResource(@NotNull WebdavResource r, SearchService.PRIORITY pri); - /* This adds do nothing item to the queue, this is only useful for tracking progress of the queue. see TaskListener. */ - void addNoop(SearchService.PRIORITY pri); - default void addResourceList(List list, int batchSize, Function mapper) { ListUtils.partition(list, batchSize).forEach(sublist -> @@ -432,7 +429,7 @@ public String normalizeHref(Path contextPath, Container c) // helper to call when not found exception is detected void notFound(URLHelper url); - List getTasks(); + List getTasks(); void addPathToCrawl(Path path, @Nullable Date nextCrawl); diff --git a/core/src/org/labkey/core/search/NoopSearchService.java b/core/src/org/labkey/core/search/NoopSearchService.java index 52b095c92f5..3fb5884a6f7 100644 --- a/core/src/org/labkey/core/search/NoopSearchService.java +++ b/core/src/org/labkey/core/search/NoopSearchService.java @@ -57,11 +57,6 @@ public void addResource(@NotNull WebdavResource r, PRIORITY pri) { } - @Override - public void addNoop(PRIORITY pri) - { - } - @Override public void setReady() { @@ -169,7 +164,7 @@ public Map getIndexFormatProperties() } @Override - public WebPartView getSearchView(boolean includeSubfolders, int textBoxWidth, boolean includeHelpLink, boolean isWebpart) + public WebPartView getSearchView(boolean includeSubfolders, int textBoxWidth, boolean includeHelpLink, boolean isWebpart) { return null; } @@ -235,7 +230,7 @@ public WebdavResource resolveResource(@NotNull String resourceIdentifier) } @Override - public HttpView getCustomSearchResult(User user, @NotNull String resourceIdentifier) + public HttpView getCustomSearchResult(User user, @NotNull String resourceIdentifier) { return null; } diff --git a/search/src/org/labkey/search/SearchController.java b/search/src/org/labkey/search/SearchController.java index 1792b2bfc53..a6dccaafc8e 100644 --- a/search/src/org/labkey/search/SearchController.java +++ b/search/src/org/labkey/search/SearchController.java @@ -20,6 +20,7 @@ import org.apache.commons.lang3.EnumUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hc.core5.http.HttpStatus; +import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.json.JSONObject; @@ -58,6 +59,7 @@ import org.labkey.api.util.Path; import org.labkey.api.util.ResponseHelper; import org.labkey.api.util.URLHelper; +import org.labkey.api.util.logging.LogHelper; import org.labkey.api.view.ActionURL; import org.labkey.api.view.FolderManagement.FolderManagementViewPostAction; import org.labkey.api.view.HtmlView; @@ -93,6 +95,8 @@ public class SearchController extends SpringActionController { private static final DefaultActionResolver _actionResolver = new DefaultActionResolver(SearchController.class); + private static final Logger LOG = LogHelper.getLogger(SearchController.class, "Search UI and admin"); + public SearchController() { setActionResolver(_actionResolver); @@ -858,8 +862,11 @@ public static class WaitForIndexerAction extends ExportAction public void export(PriorityForm form, HttpServletResponse response, BindException errors) throws Exception { SearchService ss = SearchService.get(); + long startTime = System.currentTimeMillis(); boolean success = ss.drainQueue(form.getPriority(), 5, TimeUnit.MINUTES); + LOG.info("Spent {}ms draining the search indexer queue. Success: {}", System.currentTimeMillis() - startTime, success); + // Return an error if we time out if (!success) response.setStatus(HttpStatus.SC_INTERNAL_SERVER_ERROR); diff --git a/search/src/org/labkey/search/model/AbstractIndexTask.java b/search/src/org/labkey/search/model/AbstractIndexTask.java index f17080861d2..18800e3b499 100644 --- a/search/src/org/labkey/search/model/AbstractIndexTask.java +++ b/search/src/org/labkey/search/model/AbstractIndexTask.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.IdentityHashMap; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; diff --git a/search/src/org/labkey/search/model/AbstractSearchService.java b/search/src/org/labkey/search/model/AbstractSearchService.java index df5633bc3fa..7f80c05011f 100644 --- a/search/src/org/labkey/search/model/AbstractSearchService.java +++ b/search/src/org/labkey/search/model/AbstractSearchService.java @@ -34,7 +34,6 @@ import org.labkey.api.data.SQLFragment; import org.labkey.api.data.SqlExecutor; import org.labkey.api.data.dialect.SqlDialect; -import org.labkey.api.resource.Resource; import org.labkey.api.search.SearchResultTemplate; import org.labkey.api.search.SearchService; import org.labkey.api.security.User; @@ -65,7 +64,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -75,9 +73,10 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; public abstract class AbstractSearchService implements SearchService, ShutdownListener { @@ -98,12 +97,12 @@ private static Logger getLoggerForCategory(String category) } // Runnables go here, and get pulled off in a single-threaded manner (assumption is that Runnables can create work very quickly) - final PriorityBlockingQueue _runQueue = new PriorityBlockingQueue<>(1000, itemCompare); + final PriorityBlockingQueue _runQueue = new PriorityBlockingQueue<>(1000); // Resources go here for preprocessing (this can be multi-threaded) - final PriorityBlockingQueue _itemQueue = new PriorityBlockingQueue<>(1000, itemCompare); + final PriorityBlockingQueue _itemQueue = new PriorityBlockingQueue<>(1000); - private final List _tasks = new CopyOnWriteArrayList<>(); + private final List<_IndexTask> _tasks = new CopyOnWriteArrayList<>(); private final _IndexTask _defaultTask = new _IndexTask("default"); private Throwable _configurationError = null; @@ -113,9 +112,6 @@ enum OPERATION add, delete, noop } - static final Comparator itemCompare = Comparator.comparing(o -> o._pri); - - public AbstractSearchService() { addSearchCategory(fileCategory); @@ -140,7 +136,7 @@ public IndexTask createTask(String description, TaskListener l) } @Override - public IndexTask defaultTask() + public _IndexTask defaultTask() { return _defaultTask; } @@ -201,17 +197,23 @@ public void addResource(@NotNull WebdavResource r, PRIORITY pri) queueItem(i); } - - @Override - public void addNoop(PRIORITY pri) + public void addNoop(PRIORITY pri, CustomCountLatch latch) { - final Item i = new Item( this, OPERATION.noop, "noop://noop", null, pri); + latch.increment(); + final Item i = new Item( this, OPERATION.noop, "noop://noop", null, pri) + { + @Override + void complete(boolean success) + { + super.complete(success); + latch.decrement(); + } + }; addItem(i); final Item r = new Item(this, () -> queueItem(i), pri); queueItem(r); } - @Override public void completeItem(Item item, boolean success) { @@ -241,17 +243,20 @@ public void setReady() } } - + // Consider: remove _op/OPERATION (not used), subclasses for resource vs. runnable (would clarify invariants and make // hashCode() & equals() more straightforward), formalize _id (using Runnable.toString() seems weak). - class Item + class Item implements Comparable { + static final AtomicLong seq = new AtomicLong(); + OPERATION _op; String _id; - IndexTask _task; + _IndexTask _task; WebdavResource _res; Runnable _run; PRIORITY _pri; + final long seqNum = seq.incrementAndGet(); int _preprocessAttempts = 0; @@ -259,7 +264,7 @@ class Item long _start = 0; // used by setLastIndexed long _complete = 0; // really just for debugging - Item(IndexTask task, OPERATION op, String id, WebdavResource r, PRIORITY pri) + Item(_IndexTask task, OPERATION op, String id, WebdavResource r, PRIORITY pri) { if (null != r) _start = HeartBeat.currentTimeMillis(); @@ -270,7 +275,7 @@ class Item _task = task; } - Item(IndexTask task, Runnable r, PRIORITY pri) + Item(_IndexTask task, Runnable r, PRIORITY pri) { _run = r; _pri = null == pri ? PRIORITY.bulk : pri; @@ -297,7 +302,7 @@ void complete(boolean success) { if (null != _task) { - ((_IndexTask)_task).completeItem(this, success); + _task.completeItem(this, success); } if (!success) @@ -337,6 +342,15 @@ public String toString() { return "Item{" + (null != _res ? _res.toString() : null != _run ? _run.toString() : _op.name()) + '}'; } + + @Override + public int compareTo(@NotNull AbstractSearchService.Item o) + { + int res = _pri.compareTo(o._pri) * -1; + if (res == 0 && o != this) + res = (seqNum < o.seqNum ? -1 : 1); + return res; + } } @@ -518,14 +532,14 @@ public String toString() } - public void addTask(IndexTask task) + private void addTask(_IndexTask task) { _tasks.add(task); } @Override - public List getTasks() + public List<_IndexTask> getTasks() { return new LinkedList<>(_tasks); } @@ -604,22 +618,16 @@ public boolean _eq(URLHelper a, URLHelper b) @Override public boolean drainQueue(PRIORITY priority, long timeout, TimeUnit unit) throws InterruptedException { - final CountDownLatch latch = new CountDownLatch(1); + final CustomCountLatch latch = new CustomCountLatch(1); + SearchService.IndexTask task = createTask("WaitForIndexer"); + // The indexer uses multiple threads for different types of work. Queue a Runnable first, and when it executes, + // queue an Item on every task + task.addRunnable(priority, () -> { + _tasks.forEach(t -> t.addNoop(priority, latch)); + _defaultTask.addNoop(priority, latch); - SearchService.IndexTask task = createTask("WaitForIndexerRunnable", new SearchService.TaskListener() - { - @Override public void success() - { - latch.countDown(); - } - @Override public void indexError(Resource r, Throwable t) - { - latch.countDown(); - } + latch.decrement(); // Decrement one for the runnable itself }); - // The indexer uses multiple threads for different types of work. Queue a Runnable first, and when it executes, - // queue the Item - task.addRunnable(priority, () -> task.addNoop(priority)); task.setReady(); boolean success = latch.await(timeout, unit); refreshNow(); @@ -869,10 +877,10 @@ public boolean isRunning() public void purgeQueues() { _defaultTask._subtasks.clear(); - for (IndexTask t : getTasks()) + for (AbstractIndexTask t : getTasks()) { t.cancel(true); - ((AbstractIndexTask)t)._subtasks.clear(); + t._subtasks.clear(); } _runQueue.clear(); _itemQueue.clear(); @@ -1142,7 +1150,7 @@ private void _indexLoop() if (null != out[0]) { - _IndexTask t = (_IndexTask)i._task; + _IndexTask t = i._task; if (null != t && null != t._listener) t._listener.indexError(r,out[0]); } @@ -1513,4 +1521,42 @@ public long getFileSizeLimit() { return SearchPropertyManager.getFileSizeLimitMB() * (1024*1024); } + + public static class CustomCountLatch + { + private final AtomicInteger count; + + public CustomCountLatch(int initialCount) + { + this.count = new AtomicInteger(initialCount); + } + + public boolean await(long timeout, TimeUnit unit) throws InterruptedException + { + synchronized (this) + { + while (count.get() > 0) + { + wait(unit.toMillis(timeout)); + } + } + return count.get() == 0; + } + + public void increment() + { + count.incrementAndGet(); + } + + public void decrement() + { + if (count.decrementAndGet() == 0) + { + synchronized (this) + { + notifyAll(); + } + } + } + } } From b65a28b453fa13883169fd6738e97fe467656ed9 Mon Sep 17 00:00:00 2001 From: labkey-jeckels Date: Fri, 11 Jul 2025 17:51:54 -0700 Subject: [PATCH 5/6] Comment --- search/src/org/labkey/search/model/AbstractSearchService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/search/src/org/labkey/search/model/AbstractSearchService.java b/search/src/org/labkey/search/model/AbstractSearchService.java index 7f80c05011f..33681d5e002 100644 --- a/search/src/org/labkey/search/model/AbstractSearchService.java +++ b/search/src/org/labkey/search/model/AbstractSearchService.java @@ -256,6 +256,7 @@ class Item implements Comparable WebdavResource _res; Runnable _run; PRIORITY _pri; + // Used to ensure FIFO ordering in the queue final long seqNum = seq.incrementAndGet(); int _preprocessAttempts = 0; From 1015c7be9e197779f177b2416a9c22aa8e355b66 Mon Sep 17 00:00:00 2001 From: labkey-jeckels Date: Mon, 14 Jul 2025 17:55:25 -0700 Subject: [PATCH 6/6] Simplify monitoring for queue draining --- .../search/model/AbstractIndexTask.java | 4 +- .../search/model/AbstractSearchService.java | 90 ++++++++----------- 2 files changed, 38 insertions(+), 56 deletions(-) diff --git a/search/src/org/labkey/search/model/AbstractIndexTask.java b/search/src/org/labkey/search/model/AbstractIndexTask.java index 18800e3b499..f6e5caf71e0 100644 --- a/search/src/org/labkey/search/model/AbstractIndexTask.java +++ b/search/src/org/labkey/search/model/AbstractIndexTask.java @@ -15,6 +15,7 @@ */ package org.labkey.search.model; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.labkey.api.search.SearchService; @@ -25,7 +26,6 @@ import java.util.Collections; import java.util.IdentityHashMap; import java.util.Map; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -220,7 +220,7 @@ public SearchService.IndexTask get() throws InterruptedException @Override - public SearchService.IndexTask get(long timeout, TimeUnit unit) throws InterruptedException + public SearchService.IndexTask get(long timeout, @NotNull TimeUnit unit) throws InterruptedException { synchronized (_completeEvent) { diff --git a/search/src/org/labkey/search/model/AbstractSearchService.java b/search/src/org/labkey/search/model/AbstractSearchService.java index 33681d5e002..728d85b6d7c 100644 --- a/search/src/org/labkey/search/model/AbstractSearchService.java +++ b/search/src/org/labkey/search/model/AbstractSearchService.java @@ -34,6 +34,7 @@ import org.labkey.api.data.SQLFragment; import org.labkey.api.data.SqlExecutor; import org.labkey.api.data.dialect.SqlDialect; +import org.labkey.api.resource.Resource; import org.labkey.api.search.SearchResultTemplate; import org.labkey.api.search.SearchService; import org.labkey.api.security.User; @@ -73,9 +74,9 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; public abstract class AbstractSearchService implements SearchService, ShutdownListener @@ -120,7 +121,7 @@ public AbstractSearchService() @Override - public IndexTask createTask(String description) + public _IndexTask createTask(String description) { _IndexTask task = new _IndexTask(description); addTask(task); @@ -128,7 +129,7 @@ public IndexTask createTask(String description) } @Override - public IndexTask createTask(String description, TaskListener l) + public _IndexTask createTask(String description, TaskListener l) { _IndexTask task = new _IndexTask(description, l); addTask(task); @@ -156,7 +157,7 @@ public void addPathToCrawl(Path path, Date next) } - class _IndexTask extends AbstractIndexTask + public class _IndexTask extends AbstractIndexTask { _IndexTask(String description) { @@ -197,16 +198,15 @@ public void addResource(@NotNull WebdavResource r, PRIORITY pri) queueItem(i); } - public void addNoop(PRIORITY pri, CustomCountLatch latch) + public void addNoop(PRIORITY pri) { - latch.increment(); final Item i = new Item( this, OPERATION.noop, "noop://noop", null, pri) { @Override void complete(boolean success) { + logQueueStatus("addNoop() complete"); super.complete(success); - latch.decrement(); } }; addItem(i); @@ -246,7 +246,7 @@ public void setReady() // Consider: remove _op/OPERATION (not used), subclasses for resource vs. runnable (would clarify invariants and make // hashCode() & equals() more straightforward), formalize _id (using Runnable.toString() seems weak). - class Item implements Comparable + public class Item implements Comparable { static final AtomicLong seq = new AtomicLong(); @@ -477,7 +477,7 @@ private void queueItem(Item i) return; } - _log.debug("_submitQueue.put(" + i._id + ")"); + logQueueStatus("_submitQueue.put(" + i._id + ")"); if (null != i._run) { @@ -619,22 +619,42 @@ public boolean _eq(URLHelper a, URLHelper b) @Override public boolean drainQueue(PRIORITY priority, long timeout, TimeUnit unit) throws InterruptedException { - final CustomCountLatch latch = new CustomCountLatch(1); - SearchService.IndexTask task = createTask("WaitForIndexer"); + final CountDownLatch latch = new CountDownLatch(1); + long start = System.currentTimeMillis(); + _IndexTask task = createTask("WaitForIndexer", new TaskListener() + { + @Override + public void success() + { + logQueueStatus("drainQueue's TaskListener.success()"); + latch.countDown(); + } + + @Override + public void indexError(Resource r, Throwable t) + { + logQueueStatus("drainQueue's TaskListener.indexError()"); + latch.countDown(); + } + }); // The indexer uses multiple threads for different types of work. Queue a Runnable first, and when it executes, - // queue an Item on every task + // queue an Item to ensure all queues are cleared task.addRunnable(priority, () -> { - _tasks.forEach(t -> t.addNoop(priority, latch)); - _defaultTask.addNoop(priority, latch); - - latch.decrement(); // Decrement one for the runnable itself + logQueueStatus("drainQueue's Runnable.run()"); + task.addNoop(priority); }); task.setReady(); boolean success = latch.await(timeout, unit); refreshNow(); + logQueueStatus("drainQueue() complete after " + (System.currentTimeMillis() - start) + " ms"); return success; } + private void logQueueStatus(String message) + { + _log.debug("{}; _runQueue.size() = {}, _itemQueue.size() = {}", message, _runQueue.size(), _itemQueue.size()); + } + @Override public void notFound(URLHelper in) { @@ -1522,42 +1542,4 @@ public long getFileSizeLimit() { return SearchPropertyManager.getFileSizeLimitMB() * (1024*1024); } - - public static class CustomCountLatch - { - private final AtomicInteger count; - - public CustomCountLatch(int initialCount) - { - this.count = new AtomicInteger(initialCount); - } - - public boolean await(long timeout, TimeUnit unit) throws InterruptedException - { - synchronized (this) - { - while (count.get() > 0) - { - wait(unit.toMillis(timeout)); - } - } - return count.get() == 0; - } - - public void increment() - { - count.incrementAndGet(); - } - - public void decrement() - { - if (count.decrementAndGet() == 0) - { - synchronized (this) - { - notifyAll(); - } - } - } - } }