diff --git a/api/src/org/labkey/api/search/SearchService.java b/api/src/org/labkey/api/search/SearchService.java index f897b8b5ae1..0f302daac59 100644 --- a/api/src/org/labkey/api/search/SearchService.java +++ b/api/src/org/labkey/api/search/SearchService.java @@ -120,6 +120,7 @@ static void setInstance(SearchService impl) */ boolean drainQueue(PRIORITY priority, long timeout, TimeUnit unit) throws InterruptedException; + /** From lowest to highest priority */ enum PRIORITY { commit, @@ -211,9 +212,6 @@ default void addRunnable(@NotNull SearchService.PRIORITY pri, @NotNull Runnable void addResource(@NotNull WebdavResource r, SearchService.PRIORITY pri); - /* This adds do nothing item to the queue, this is only useful for tracking progress of the queue. see TaskListener. */ - void addNoop(SearchService.PRIORITY pri); - default void addResourceList(List list, int batchSize, Function mapper) { ListUtils.partition(list, batchSize).forEach(sublist -> @@ -431,7 +429,7 @@ public String normalizeHref(Path contextPath, Container c) // helper to call when not found exception is detected void notFound(URLHelper url); - List getTasks(); + List getTasks(); void addPathToCrawl(Path path, @Nullable Date nextCrawl); diff --git a/core/src/org/labkey/core/search/NoopSearchService.java b/core/src/org/labkey/core/search/NoopSearchService.java index 52b095c92f5..3fb5884a6f7 100644 --- a/core/src/org/labkey/core/search/NoopSearchService.java +++ b/core/src/org/labkey/core/search/NoopSearchService.java @@ -57,11 +57,6 @@ public void addResource(@NotNull WebdavResource r, PRIORITY pri) { } - @Override - public void addNoop(PRIORITY pri) - { - } - @Override public void setReady() { @@ -169,7 +164,7 @@ public Map getIndexFormatProperties() } @Override - public WebPartView getSearchView(boolean includeSubfolders, int textBoxWidth, boolean includeHelpLink, boolean isWebpart) + public WebPartView getSearchView(boolean includeSubfolders, int textBoxWidth, boolean includeHelpLink, boolean isWebpart) { return null; } @@ -235,7 +230,7 @@ public WebdavResource resolveResource(@NotNull String resourceIdentifier) } @Override - public HttpView getCustomSearchResult(User user, @NotNull String resourceIdentifier) + public HttpView getCustomSearchResult(User user, @NotNull String resourceIdentifier) { return null; } diff --git a/search/src/org/labkey/search/SearchController.java b/search/src/org/labkey/search/SearchController.java index 1792b2bfc53..a6dccaafc8e 100644 --- a/search/src/org/labkey/search/SearchController.java +++ b/search/src/org/labkey/search/SearchController.java @@ -20,6 +20,7 @@ import org.apache.commons.lang3.EnumUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hc.core5.http.HttpStatus; +import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.json.JSONObject; @@ -58,6 +59,7 @@ import org.labkey.api.util.Path; import org.labkey.api.util.ResponseHelper; import org.labkey.api.util.URLHelper; +import org.labkey.api.util.logging.LogHelper; import org.labkey.api.view.ActionURL; import org.labkey.api.view.FolderManagement.FolderManagementViewPostAction; import org.labkey.api.view.HtmlView; @@ -93,6 +95,8 @@ public class SearchController extends SpringActionController { private static final DefaultActionResolver _actionResolver = new DefaultActionResolver(SearchController.class); + private static final Logger LOG = LogHelper.getLogger(SearchController.class, "Search UI and admin"); + public SearchController() { setActionResolver(_actionResolver); @@ -858,8 +862,11 @@ public static class WaitForIndexerAction extends ExportAction public void export(PriorityForm form, HttpServletResponse response, BindException errors) throws Exception { SearchService ss = SearchService.get(); + long startTime = System.currentTimeMillis(); boolean success = ss.drainQueue(form.getPriority(), 5, TimeUnit.MINUTES); + LOG.info("Spent {}ms draining the search indexer queue. Success: {}", System.currentTimeMillis() - startTime, success); + // Return an error if we time out if (!success) response.setStatus(HttpStatus.SC_INTERNAL_SERVER_ERROR); diff --git a/search/src/org/labkey/search/model/AbstractIndexTask.java b/search/src/org/labkey/search/model/AbstractIndexTask.java index f17080861d2..f6e5caf71e0 100644 --- a/search/src/org/labkey/search/model/AbstractIndexTask.java +++ b/search/src/org/labkey/search/model/AbstractIndexTask.java @@ -15,6 +15,7 @@ */ package org.labkey.search.model; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.labkey.api.search.SearchService; @@ -219,7 +220,7 @@ public SearchService.IndexTask get() throws InterruptedException @Override - public SearchService.IndexTask get(long timeout, TimeUnit unit) throws InterruptedException + public SearchService.IndexTask get(long timeout, @NotNull TimeUnit unit) throws InterruptedException { synchronized (_completeEvent) { diff --git a/search/src/org/labkey/search/model/AbstractSearchService.java b/search/src/org/labkey/search/model/AbstractSearchService.java index d70caf3488e..728d85b6d7c 100644 --- a/search/src/org/labkey/search/model/AbstractSearchService.java +++ b/search/src/org/labkey/search/model/AbstractSearchService.java @@ -65,7 +65,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -78,6 +77,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; public abstract class AbstractSearchService implements SearchService, ShutdownListener { @@ -98,12 +98,12 @@ private static Logger getLoggerForCategory(String category) } // Runnables go here, and get pulled off in a single-threaded manner (assumption is that Runnables can create work very quickly) - final PriorityBlockingQueue _runQueue = new PriorityBlockingQueue<>(1000, itemCompare); + final PriorityBlockingQueue _runQueue = new PriorityBlockingQueue<>(1000); // Resources go here for preprocessing (this can be multi-threaded) - final PriorityBlockingQueue _itemQueue = new PriorityBlockingQueue<>(1000, itemCompare); + final PriorityBlockingQueue _itemQueue = new PriorityBlockingQueue<>(1000); - private final List _tasks = new CopyOnWriteArrayList<>(); + private final List<_IndexTask> _tasks = new CopyOnWriteArrayList<>(); private final _IndexTask _defaultTask = new _IndexTask("default"); private Throwable _configurationError = null; @@ -113,9 +113,6 @@ enum OPERATION add, delete, noop } - static final Comparator itemCompare = Comparator.comparing(o -> o._pri); - - public AbstractSearchService() { addSearchCategory(fileCategory); @@ -124,7 +121,7 @@ public AbstractSearchService() @Override - public IndexTask createTask(String description) + public _IndexTask createTask(String description) { _IndexTask task = new _IndexTask(description); addTask(task); @@ -132,7 +129,7 @@ public IndexTask createTask(String description) } @Override - public IndexTask createTask(String description, TaskListener l) + public _IndexTask createTask(String description, TaskListener l) { _IndexTask task = new _IndexTask(description, l); addTask(task); @@ -140,7 +137,7 @@ public IndexTask createTask(String description, TaskListener l) } @Override - public IndexTask defaultTask() + public _IndexTask defaultTask() { return _defaultTask; } @@ -160,7 +157,7 @@ public void addPathToCrawl(Path path, Date next) } - class _IndexTask extends AbstractIndexTask + public class _IndexTask extends AbstractIndexTask { _IndexTask(String description) { @@ -201,17 +198,22 @@ public void addResource(@NotNull WebdavResource r, PRIORITY pri) queueItem(i); } - - @Override public void addNoop(PRIORITY pri) { - final Item i = new Item( this, OPERATION.noop, "noop://noop", null, pri); + final Item i = new Item( this, OPERATION.noop, "noop://noop", null, pri) + { + @Override + void complete(boolean success) + { + logQueueStatus("addNoop() complete"); + super.complete(success); + } + }; addItem(i); final Item r = new Item(this, () -> queueItem(i), pri); queueItem(r); } - @Override public void completeItem(Item item, boolean success) { @@ -241,17 +243,21 @@ public void setReady() } } - + // Consider: remove _op/OPERATION (not used), subclasses for resource vs. runnable (would clarify invariants and make // hashCode() & equals() more straightforward), formalize _id (using Runnable.toString() seems weak). - class Item + public class Item implements Comparable { + static final AtomicLong seq = new AtomicLong(); + OPERATION _op; String _id; - IndexTask _task; + _IndexTask _task; WebdavResource _res; Runnable _run; PRIORITY _pri; + // Used to ensure FIFO ordering in the queue + final long seqNum = seq.incrementAndGet(); int _preprocessAttempts = 0; @@ -259,7 +265,7 @@ class Item long _start = 0; // used by setLastIndexed long _complete = 0; // really just for debugging - Item(IndexTask task, OPERATION op, String id, WebdavResource r, PRIORITY pri) + Item(_IndexTask task, OPERATION op, String id, WebdavResource r, PRIORITY pri) { if (null != r) _start = HeartBeat.currentTimeMillis(); @@ -270,7 +276,7 @@ class Item _task = task; } - Item(IndexTask task, Runnable r, PRIORITY pri) + Item(_IndexTask task, Runnable r, PRIORITY pri) { _run = r; _pri = null == pri ? PRIORITY.bulk : pri; @@ -297,7 +303,7 @@ void complete(boolean success) { if (null != _task) { - ((_IndexTask)_task).completeItem(this, success); + _task.completeItem(this, success); } if (!success) @@ -337,6 +343,15 @@ public String toString() { return "Item{" + (null != _res ? _res.toString() : null != _run ? _run.toString() : _op.name()) + '}'; } + + @Override + public int compareTo(@NotNull AbstractSearchService.Item o) + { + int res = _pri.compareTo(o._pri) * -1; + if (res == 0 && o != this) + res = (seqNum < o.seqNum ? -1 : 1); + return res; + } } @@ -462,7 +477,7 @@ private void queueItem(Item i) return; } - _log.debug("_submitQueue.put(" + i._id + ")"); + logQueueStatus("_submitQueue.put(" + i._id + ")"); if (null != i._run) { @@ -518,14 +533,14 @@ public String toString() } - public void addTask(IndexTask task) + private void addTask(_IndexTask task) { _tasks.add(task); } @Override - public List getTasks() + public List<_IndexTask> getTasks() { return new LinkedList<>(_tasks); } @@ -605,22 +620,41 @@ public boolean _eq(URLHelper a, URLHelper b) public boolean drainQueue(PRIORITY priority, long timeout, TimeUnit unit) throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); - SearchService.IndexTask task = createTask("WaitForIndexer", new SearchService.TaskListener() + long start = System.currentTimeMillis(); + _IndexTask task = createTask("WaitForIndexer", new TaskListener() { - @Override public void success() + @Override + public void success() + { + logQueueStatus("drainQueue's TaskListener.success()"); + latch.countDown(); + } + + @Override + public void indexError(Resource r, Throwable t) { + logQueueStatus("drainQueue's TaskListener.indexError()"); latch.countDown(); } - @Override public void indexError(Resource r, Throwable t) { } }); - task.addNoop(priority); + // The indexer uses multiple threads for different types of work. Queue a Runnable first, and when it executes, + // queue an Item to ensure all queues are cleared + task.addRunnable(priority, () -> { + logQueueStatus("drainQueue's Runnable.run()"); + task.addNoop(priority); + }); task.setReady(); - boolean success = latch.await(timeout, unit); refreshNow(); + logQueueStatus("drainQueue() complete after " + (System.currentTimeMillis() - start) + " ms"); return success; } + private void logQueueStatus(String message) + { + _log.debug("{}; _runQueue.size() = {}, _itemQueue.size() = {}", message, _runQueue.size(), _itemQueue.size()); + } + @Override public void notFound(URLHelper in) { @@ -864,10 +898,10 @@ public boolean isRunning() public void purgeQueues() { _defaultTask._subtasks.clear(); - for (IndexTask t : getTasks()) + for (AbstractIndexTask t : getTasks()) { t.cancel(true); - ((AbstractIndexTask)t)._subtasks.clear(); + t._subtasks.clear(); } _runQueue.clear(); _itemQueue.clear(); @@ -1137,7 +1171,7 @@ private void _indexLoop() if (null != out[0]) { - _IndexTask t = (_IndexTask)i._task; + _IndexTask t = i._task; if (null != t && null != t._listener) t._listener.indexError(r,out[0]); }