Skip to content

Improve detection of an idle indexer #6829

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: release25.7-SNAPSHOT
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions api/src/org/labkey/api/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ static void setInstance(SearchService impl)
*/
boolean drainQueue(PRIORITY priority, long timeout, TimeUnit unit) throws InterruptedException;

/** From lowest to highest priority */
enum PRIORITY
{
commit,
Expand Down Expand Up @@ -211,9 +212,6 @@ default void addRunnable(@NotNull SearchService.PRIORITY pri, @NotNull Runnable

void addResource(@NotNull WebdavResource r, SearchService.PRIORITY pri);

/* This adds do nothing item to the queue, this is only useful for tracking progress of the queue. see TaskListener. */
void addNoop(SearchService.PRIORITY pri);

default <T> void addResourceList(List<T> list, int batchSize, Function<T,WebdavResource> mapper)
{
ListUtils.partition(list, batchSize).forEach(sublist ->
Expand Down Expand Up @@ -431,7 +429,7 @@ public String normalizeHref(Path contextPath, Container c)
// helper to call when not found exception is detected
void notFound(URLHelper url);

List<IndexTask> getTasks();
List<? extends IndexTask> getTasks();

void addPathToCrawl(Path path, @Nullable Date nextCrawl);

Expand Down
9 changes: 2 additions & 7 deletions core/src/org/labkey/core/search/NoopSearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,6 @@ public void addResource(@NotNull WebdavResource r, PRIORITY pri)
{
}

@Override
public void addNoop(PRIORITY pri)
{
}

@Override
public void setReady()
{
Expand Down Expand Up @@ -169,7 +164,7 @@ public Map<String, String> getIndexFormatProperties()
}

@Override
public WebPartView getSearchView(boolean includeSubfolders, int textBoxWidth, boolean includeHelpLink, boolean isWebpart)
public WebPartView<?> getSearchView(boolean includeSubfolders, int textBoxWidth, boolean includeHelpLink, boolean isWebpart)
{
return null;
}
Expand Down Expand Up @@ -235,7 +230,7 @@ public WebdavResource resolveResource(@NotNull String resourceIdentifier)
}

@Override
public HttpView getCustomSearchResult(User user, @NotNull String resourceIdentifier)
public HttpView<?> getCustomSearchResult(User user, @NotNull String resourceIdentifier)
{
return null;
}
Expand Down
7 changes: 7 additions & 0 deletions search/src/org/labkey/search/SearchController.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.commons.lang3.EnumUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.json.JSONObject;
Expand Down Expand Up @@ -58,6 +59,7 @@
import org.labkey.api.util.Path;
import org.labkey.api.util.ResponseHelper;
import org.labkey.api.util.URLHelper;
import org.labkey.api.util.logging.LogHelper;
import org.labkey.api.view.ActionURL;
import org.labkey.api.view.FolderManagement.FolderManagementViewPostAction;
import org.labkey.api.view.HtmlView;
Expand Down Expand Up @@ -93,6 +95,8 @@ public class SearchController extends SpringActionController
{
private static final DefaultActionResolver _actionResolver = new DefaultActionResolver(SearchController.class);

private static final Logger LOG = LogHelper.getLogger(SearchController.class, "Search UI and admin");

public SearchController()
{
setActionResolver(_actionResolver);
Expand Down Expand Up @@ -858,8 +862,11 @@ public static class WaitForIndexerAction extends ExportAction<PriorityForm>
public void export(PriorityForm form, HttpServletResponse response, BindException errors) throws Exception
{
SearchService ss = SearchService.get();
long startTime = System.currentTimeMillis();
boolean success = ss.drainQueue(form.getPriority(), 5, TimeUnit.MINUTES);

LOG.info("Spent {}ms draining the search indexer queue. Success: {}", System.currentTimeMillis() - startTime, success);

// Return an error if we time out
if (!success)
response.setStatus(HttpStatus.SC_INTERNAL_SERVER_ERROR);
Expand Down
3 changes: 2 additions & 1 deletion search/src/org/labkey/search/model/AbstractIndexTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.labkey.search.model;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.labkey.api.search.SearchService;

Expand Down Expand Up @@ -219,7 +220,7 @@ public SearchService.IndexTask get() throws InterruptedException


@Override
public SearchService.IndexTask get(long timeout, TimeUnit unit) throws InterruptedException
public SearchService.IndexTask get(long timeout, @NotNull TimeUnit unit) throws InterruptedException
{
synchronized (_completeEvent)
{
Expand Down
98 changes: 66 additions & 32 deletions search/src/org/labkey/search/model/AbstractSearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -78,6 +77,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public abstract class AbstractSearchService implements SearchService, ShutdownListener
{
Expand All @@ -98,12 +98,12 @@ private static Logger getLoggerForCategory(String category)
}

// Runnables go here, and get pulled off in a single-threaded manner (assumption is that Runnables can create work very quickly)
final PriorityBlockingQueue<Item> _runQueue = new PriorityBlockingQueue<>(1000, itemCompare);
final PriorityBlockingQueue<Item> _runQueue = new PriorityBlockingQueue<>(1000);

// Resources go here for preprocessing (this can be multi-threaded)
final PriorityBlockingQueue<Item> _itemQueue = new PriorityBlockingQueue<>(1000, itemCompare);
final PriorityBlockingQueue<Item> _itemQueue = new PriorityBlockingQueue<>(1000);

private final List<IndexTask> _tasks = new CopyOnWriteArrayList<>();
private final List<_IndexTask> _tasks = new CopyOnWriteArrayList<>();
private final _IndexTask _defaultTask = new _IndexTask("default");

private Throwable _configurationError = null;
Expand All @@ -113,9 +113,6 @@ enum OPERATION
add, delete, noop
}

static final Comparator<Item> itemCompare = Comparator.comparing(o -> o._pri);


public AbstractSearchService()
{
addSearchCategory(fileCategory);
Expand All @@ -124,23 +121,23 @@ public AbstractSearchService()


@Override
public IndexTask createTask(String description)
public _IndexTask createTask(String description)
{
_IndexTask task = new _IndexTask(description);
addTask(task);
return task;
}

@Override
public IndexTask createTask(String description, TaskListener l)
public _IndexTask createTask(String description, TaskListener l)
{
_IndexTask task = new _IndexTask(description, l);
addTask(task);
return task;
}

@Override
public IndexTask defaultTask()
public _IndexTask defaultTask()
{
return _defaultTask;
}
Expand All @@ -160,7 +157,7 @@ public void addPathToCrawl(Path path, Date next)
}


class _IndexTask extends AbstractIndexTask
public class _IndexTask extends AbstractIndexTask
{
_IndexTask(String description)
{
Expand Down Expand Up @@ -201,17 +198,22 @@ public void addResource(@NotNull WebdavResource r, PRIORITY pri)
queueItem(i);
}


@Override
public void addNoop(PRIORITY pri)
{
final Item i = new Item( this, OPERATION.noop, "noop://noop", null, pri);
final Item i = new Item( this, OPERATION.noop, "noop://noop", null, pri)
{
@Override
void complete(boolean success)
{
logQueueStatus("addNoop() complete");
super.complete(success);
}
};
addItem(i);
final Item r = new Item(this, () -> queueItem(i), pri);
queueItem(r);
}


@Override
public void completeItem(Item item, boolean success)
{
Expand Down Expand Up @@ -241,25 +243,29 @@ public void setReady()
}
}


// Consider: remove _op/OPERATION (not used), subclasses for resource vs. runnable (would clarify invariants and make
// hashCode() & equals() more straightforward), formalize _id (using Runnable.toString() seems weak).
class Item
public class Item implements Comparable<Item>
{
static final AtomicLong seq = new AtomicLong();

OPERATION _op;
String _id;
IndexTask _task;
_IndexTask _task;
WebdavResource _res;
Runnable _run;
PRIORITY _pri;
// Used to ensure FIFO ordering in the queue
final long seqNum = seq.incrementAndGet();

int _preprocessAttempts = 0;

long _modified = 0; // used by setLastIndexed
long _start = 0; // used by setLastIndexed
long _complete = 0; // really just for debugging

Item(IndexTask task, OPERATION op, String id, WebdavResource r, PRIORITY pri)
Item(_IndexTask task, OPERATION op, String id, WebdavResource r, PRIORITY pri)
{
if (null != r)
_start = HeartBeat.currentTimeMillis();
Expand All @@ -270,7 +276,7 @@ class Item
_task = task;
}

Item(IndexTask task, Runnable r, PRIORITY pri)
Item(_IndexTask task, Runnable r, PRIORITY pri)
{
_run = r;
_pri = null == pri ? PRIORITY.bulk : pri;
Expand All @@ -297,7 +303,7 @@ void complete(boolean success)
{
if (null != _task)
{
((_IndexTask)_task).completeItem(this, success);
_task.completeItem(this, success);
}

if (!success)
Expand Down Expand Up @@ -337,6 +343,15 @@ public String toString()
{
return "Item{" + (null != _res ? _res.toString() : null != _run ? _run.toString() : _op.name()) + '}';
}

@Override
public int compareTo(@NotNull AbstractSearchService.Item o)
{
int res = _pri.compareTo(o._pri) * -1;
if (res == 0 && o != this)
res = (seqNum < o.seqNum ? -1 : 1);
return res;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

}


Expand Down Expand Up @@ -462,7 +477,7 @@ private void queueItem(Item i)
return;
}

_log.debug("_submitQueue.put(" + i._id + ")");
logQueueStatus("_submitQueue.put(" + i._id + ")");

if (null != i._run)
{
Expand Down Expand Up @@ -518,14 +533,14 @@ public String toString()
}


public void addTask(IndexTask task)
private void addTask(_IndexTask task)
{
_tasks.add(task);
}


@Override
public List<IndexTask> getTasks()
public List<_IndexTask> getTasks()
{
return new LinkedList<>(_tasks);
}
Expand Down Expand Up @@ -605,22 +620,41 @@ public boolean _eq(URLHelper a, URLHelper b)
public boolean drainQueue(PRIORITY priority, long timeout, TimeUnit unit) throws InterruptedException
{
final CountDownLatch latch = new CountDownLatch(1);
SearchService.IndexTask task = createTask("WaitForIndexer", new SearchService.TaskListener()
long start = System.currentTimeMillis();
_IndexTask task = createTask("WaitForIndexer", new TaskListener()
{
@Override public void success()
@Override
public void success()
{
logQueueStatus("drainQueue's TaskListener.success()");
latch.countDown();
}

@Override
public void indexError(Resource r, Throwable t)
{
logQueueStatus("drainQueue's TaskListener.indexError()");
latch.countDown();
}
@Override public void indexError(Resource r, Throwable t) { }
});
task.addNoop(priority);
// The indexer uses multiple threads for different types of work. Queue a Runnable first, and when it executes,
// queue an Item to ensure all queues are cleared
task.addRunnable(priority, () -> {
logQueueStatus("drainQueue's Runnable.run()");
task.addNoop(priority);
});
task.setReady();

boolean success = latch.await(timeout, unit);
refreshNow();
logQueueStatus("drainQueue() complete after " + (System.currentTimeMillis() - start) + " ms");
return success;
}

private void logQueueStatus(String message)
{
_log.debug("{}; _runQueue.size() = {}, _itemQueue.size() = {}", message, _runQueue.size(), _itemQueue.size());
}

@Override
public void notFound(URLHelper in)
{
Expand Down Expand Up @@ -864,10 +898,10 @@ public boolean isRunning()
public void purgeQueues()
{
_defaultTask._subtasks.clear();
for (IndexTask t : getTasks())
for (AbstractIndexTask t : getTasks())
{
t.cancel(true);
((AbstractIndexTask)t)._subtasks.clear();
t._subtasks.clear();
}
_runQueue.clear();
_itemQueue.clear();
Expand Down Expand Up @@ -1137,7 +1171,7 @@ private void _indexLoop()

if (null != out[0])
{
_IndexTask t = (_IndexTask)i._task;
_IndexTask t = i._task;
if (null != t && null != t._listener)
t._listener.indexError(r,out[0]);
}
Expand Down