From 59046baa38784d0ff6a5b693bbcd0679b2fb8434 Mon Sep 17 00:00:00 2001 From: Kaushal Kumar Date: Sun, 25 Aug 2024 16:31:22 -0700 Subject: [PATCH 1/9] add rejection listener Signed-off-by: Kaushal Kumar --- .../action/search/TransportSearchAction.java | 5 +++ .../main/java/org/opensearch/node/Node.java | 9 +++- .../wlm/QueryGroupFailureListener.java | 21 ++++++++++ .../org/opensearch/wlm/QueryGroupService.java | 32 ++++++++++++++ ...roupRequestRejectionOperationListener.java | 42 +++++++++++++++++++ 5 files changed, 108 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/org/opensearch/wlm/QueryGroupFailureListener.java create mode 100644 server/src/main/java/org/opensearch/wlm/QueryGroupService.java create mode 100644 server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListener.java diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 88bf7ebea8e52..e9b84863f72a2 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -316,6 +316,11 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener< listener ); } + + if (task instanceof QueryGroupTask) { + listener = + } + executeRequest(task, searchRequest, this::searchAsyncAction, listener); } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 1a9b233b387b2..913f44e7e4bf3 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -266,7 +266,9 @@ import org.opensearch.transport.TransportService; import org.opensearch.usage.UsageService; import org.opensearch.watcher.ResourceWatcherService; +import org.opensearch.wlm.QueryGroupService; import org.opensearch.wlm.WorkloadManagementTransportInterceptor; +import org.opensearch.wlm.listeners.QueryGroupRequestRejectionOperationListener; import javax.net.ssl.SNIHostName; @@ -1011,11 +1013,16 @@ protected Node( // Add the telemetryAwarePlugin components to the existing pluginComponents collection. pluginComponents.addAll(telemetryAwarePluginComponents); + final QueryGroupRequestRejectionOperationListener queryGroupRequestRejectionListener = new QueryGroupRequestRejectionOperationListener( + new QueryGroupService(), // We will need to replace this with actual instance of the queryGroupService + threadPool + ); + // register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory = new SearchRequestOperationsCompositeListenerFactory( Stream.concat( - Stream.of(searchRequestStats, searchRequestSlowLog, searchTaskRequestOperationsListener), + Stream.of(searchRequestStats, searchRequestSlowLog, searchTaskRequestOperationsListener, queryGroupRequestRejectionListener), pluginComponents.stream() .filter(p -> p instanceof SearchRequestOperationsListener) .map(p -> (SearchRequestOperationsListener) p) diff --git a/server/src/main/java/org/opensearch/wlm/QueryGroupFailureListener.java b/server/src/main/java/org/opensearch/wlm/QueryGroupFailureListener.java new file mode 100644 index 0000000000000..3e3a3dffe139d --- /dev/null +++ b/server/src/main/java/org/opensearch/wlm/QueryGroupFailureListener.java @@ -0,0 +1,21 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm; + +import org.opensearch.core.action.ActionListener; + +public class QueryGroupFailureListener { + public static ActionListener wrap(QueryGroupTask task, ActionListener originalListener) { + return ActionListener.wrap(originalListener::onResponse, exception -> { + // Call QueryGroup service to increment failures for query group of task + System.out.println("QueryGroup listener is invoked"); + originalListener.onFailure(exception); + }); + } +} diff --git a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java new file mode 100644 index 0000000000000..dc6532847a647 --- /dev/null +++ b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm; + +/** + * This is stub at this point in time + */ +public class QueryGroupService { + /** + * updates the failure stats for the query group + * @param queryGroupId query group identifier + */ + public void requestFailedFor(final String queryGroupId) { + + } + + /** + * + * @param queryGroupId query group identifier + * @return whether the queryGroup is contended and should reject new incoming requests + */ + public boolean shouldRejectFor(String queryGroupId) { + if (queryGroupId == null) return false; + return false; + } +} diff --git a/server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListener.java b/server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListener.java new file mode 100644 index 0000000000000..638d6ce8f360c --- /dev/null +++ b/server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListener.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm.listeners; + +import org.opensearch.action.search.SearchRequestContext; +import org.opensearch.action.search.SearchRequestOperationsListener; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.wlm.QueryGroupService; +import org.opensearch.wlm.QueryGroupTask; + +/** + * This listener is used to perform the rejections for incoming requests into a queryGroup + */ +public class QueryGroupRequestRejectionOperationListener extends SearchRequestOperationsListener { + + private final QueryGroupService queryGroupService; + private final ThreadPool threadPool; + + public QueryGroupRequestRejectionOperationListener(QueryGroupService queryGroupService, ThreadPool threadPool) { + this.queryGroupService = queryGroupService; + this.threadPool = threadPool; + } + + /** + * This method assumes that the queryGroupId is already populated in the {@link ThreadContext} + * @param searchRequestContext SearchRequestContext instance + */ + @Override + protected void onRequestStart(SearchRequestContext searchRequestContext) { + final String queryGroupId = threadPool.getThreadContext().getHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER); + if (queryGroupService.shouldRejectFor(queryGroupId)) { + throw new OpenSearchRejectedExecutionException("QueryGroup " + queryGroupId + " is already contended."); + } + } +} From b583b61e68f9e01de870f54d5509bad1cd5c16ae Mon Sep 17 00:00:00 2001 From: Kaushal Kumar Date: Mon, 26 Aug 2024 12:28:32 -0700 Subject: [PATCH 2/9] add rejection listener unit test Signed-off-by: Kaushal Kumar --- .../action/search/TransportSearchAction.java | 5 -- .../org/opensearch/wlm/QueryGroupService.java | 13 +++-- ...roupRequestRejectionOperationListener.java | 7 ++- ...equestRejectionOperationListenerTests.java | 58 +++++++++++++++++++ 4 files changed, 72 insertions(+), 11 deletions(-) create mode 100644 server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListenerTests.java diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index e9b84863f72a2..88bf7ebea8e52 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -316,11 +316,6 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener< listener ); } - - if (task instanceof QueryGroupTask) { - listener = - } - executeRequest(task, searchRequest, this::searchAsyncAction, listener); } diff --git a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java index dc6532847a647..12c5399b08ba5 100644 --- a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java +++ b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java @@ -8,8 +8,10 @@ package org.opensearch.wlm; +import java.util.Optional; + /** - * This is stub at this point in time + * This is stub at this point in time and will be replace by an acutal one in couple of days */ public class QueryGroupService { /** @@ -25,8 +27,11 @@ public void requestFailedFor(final String queryGroupId) { * @param queryGroupId query group identifier * @return whether the queryGroup is contended and should reject new incoming requests */ - public boolean shouldRejectFor(String queryGroupId) { - if (queryGroupId == null) return false; - return false; + public Optional shouldRejectFor(String queryGroupId) { + if (queryGroupId == null) return Optional.empty(); + // TODO: At this point this is dummy and we need to decide whether to cancel the request based on last + // reported resource usage for the queryGroup. We also need to increment the rejection count here for the + // query group + return Optional.of("Possible reason. "); } } diff --git a/server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListener.java b/server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListener.java index 638d6ce8f360c..c850535e23e2f 100644 --- a/server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListener.java +++ b/server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListener.java @@ -15,6 +15,8 @@ import org.opensearch.wlm.QueryGroupService; import org.opensearch.wlm.QueryGroupTask; +import java.util.Optional; + /** * This listener is used to perform the rejections for incoming requests into a queryGroup */ @@ -35,8 +37,9 @@ public QueryGroupRequestRejectionOperationListener(QueryGroupService queryGroupS @Override protected void onRequestStart(SearchRequestContext searchRequestContext) { final String queryGroupId = threadPool.getThreadContext().getHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER); - if (queryGroupService.shouldRejectFor(queryGroupId)) { - throw new OpenSearchRejectedExecutionException("QueryGroup " + queryGroupId + " is already contended."); + Optional reason = queryGroupService.shouldRejectFor(queryGroupId); + if (reason.isPresent()) { + throw new OpenSearchRejectedExecutionException("QueryGroup " + queryGroupId + " is already contended." + reason.get()); } } } diff --git a/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListenerTests.java b/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListenerTests.java new file mode 100644 index 0000000000000..11d25e13ba0ba --- /dev/null +++ b/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListenerTests.java @@ -0,0 +1,58 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm.listeners; + +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.wlm.QueryGroupService; +import org.opensearch.wlm.QueryGroupTask; + +import java.util.Optional; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class QueryGroupRequestRejectionOperationListenerTests extends OpenSearchTestCase { + ThreadPool testThreadPool; + QueryGroupService queryGroupService; + QueryGroupRequestRejectionOperationListener sut; + + + public void setUp() throws Exception { + super.setUp(); + testThreadPool = new TestThreadPool("RejectionTestThreadPool"); + } + + public void tearDown() throws Exception { + super.tearDown(); + testThreadPool.shutdown(); + } + + public void testRejectionCase() { + queryGroupService = mock(QueryGroupService.class); + sut = new QueryGroupRequestRejectionOperationListener(queryGroupService, testThreadPool); + final String testQueryGroupId = "asdgasgkajgkw3141_3rt4t"; + testThreadPool.getThreadContext().putHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER, testQueryGroupId); + when(queryGroupService.shouldRejectFor(testQueryGroupId)).thenReturn(Optional.of("Test query group is contended")); + + assertThrows(OpenSearchRejectedExecutionException.class, () -> sut.onRequestStart(null)); + } + + public void testNonRejectionCase() { + queryGroupService = mock(QueryGroupService.class); + sut = new QueryGroupRequestRejectionOperationListener(queryGroupService, testThreadPool); + final String testQueryGroupId = "asdgasgkajgkw3141_3rt4t"; + testThreadPool.getThreadContext().putHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER, testQueryGroupId); + when(queryGroupService.shouldRejectFor(testQueryGroupId)).thenReturn(Optional.empty()); + + sut.onRequestStart(null); + } +} From aa84c89bfc8c95d21eb78c344a1317fdb453ca52 Mon Sep 17 00:00:00 2001 From: Kaushal Kumar Date: Mon, 26 Aug 2024 12:47:24 -0700 Subject: [PATCH 3/9] add rejection logic for shard level requests Signed-off-by: Kaushal Kumar --- .../main/java/org/opensearch/node/Node.java | 3 ++- ...orkloadManagementTransportInterceptor.java | 20 ++++++++++++++++--- ...adManagementTransportInterceptorTests.java | 2 +- ...anagementTransportRequestHandlerTests.java | 20 +++++++++++++++---- 4 files changed, 36 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 913f44e7e4bf3..d302764fefab2 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1072,7 +1072,8 @@ protected Node( ); WorkloadManagementTransportInterceptor workloadManagementTransportInterceptor = new WorkloadManagementTransportInterceptor( - threadPool + threadPool, + new QueryGroupService() // We will need to replace this with actual implementation ); final Collection secureSettingsFactories = pluginsService.filterPlugins(Plugin.class) diff --git a/server/src/main/java/org/opensearch/wlm/WorkloadManagementTransportInterceptor.java b/server/src/main/java/org/opensearch/wlm/WorkloadManagementTransportInterceptor.java index 848df8712549a..fac0e7ac484c6 100644 --- a/server/src/main/java/org/opensearch/wlm/WorkloadManagementTransportInterceptor.java +++ b/server/src/main/java/org/opensearch/wlm/WorkloadManagementTransportInterceptor.java @@ -8,6 +8,7 @@ package org.opensearch.wlm; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportChannel; @@ -15,14 +16,18 @@ import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportRequestHandler; +import java.util.Optional; + /** * This class is used to intercept search traffic requests and populate the queryGroupId header in task headers */ public class WorkloadManagementTransportInterceptor implements TransportInterceptor { private final ThreadPool threadPool; + private final QueryGroupService queryGroupService; - public WorkloadManagementTransportInterceptor(ThreadPool threadPool) { + public WorkloadManagementTransportInterceptor(final ThreadPool threadPool, final QueryGroupService queryGroupService) { this.threadPool = threadPool; + this.queryGroupService = queryGroupService; } @Override @@ -32,7 +37,7 @@ public TransportRequestHandler interceptHandler( boolean forceExecution, TransportRequestHandler actualHandler ) { - return new RequestHandler(threadPool, actualHandler); + return new RequestHandler(threadPool, actualHandler, queryGroupService); } /** @@ -43,16 +48,25 @@ public static class RequestHandler implements Transp private final ThreadPool threadPool; TransportRequestHandler actualHandler; + private final QueryGroupService queryGroupService; - public RequestHandler(ThreadPool threadPool, TransportRequestHandler actualHandler) { + public RequestHandler(ThreadPool threadPool, TransportRequestHandler actualHandler, + QueryGroupService queryGroupService) { this.threadPool = threadPool; this.actualHandler = actualHandler; + this.queryGroupService = queryGroupService; } @Override public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { if (isSearchWorkloadRequest(task)) { ((QueryGroupTask) task).setQueryGroupId(threadPool.getThreadContext()); + final String queryGroupId = ((QueryGroupTask) (task)).getQueryGroupId(); + Optional reason = queryGroupService.shouldRejectFor(queryGroupId); + + if (reason.isPresent()) { + throw new OpenSearchRejectedExecutionException("QueryGroup " + queryGroupId + " is already contended." + reason.get()); + } } actualHandler.messageReceived(request, channel, task); } diff --git a/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportInterceptorTests.java b/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportInterceptorTests.java index db4e5e45d49ed..4668b845150a9 100644 --- a/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportInterceptorTests.java +++ b/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportInterceptorTests.java @@ -25,7 +25,7 @@ public class WorkloadManagementTransportInterceptorTests extends OpenSearchTestC public void setUp() throws Exception { super.setUp(); threadPool = new TestThreadPool(getTestName()); - sut = new WorkloadManagementTransportInterceptor(threadPool); + sut = new WorkloadManagementTransportInterceptor(threadPool, new QueryGroupService()); } public void tearDown() throws Exception { diff --git a/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportRequestHandlerTests.java b/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportRequestHandlerTests.java index 789c02345e774..8ac321b211b79 100644 --- a/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportRequestHandlerTests.java +++ b/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportRequestHandlerTests.java @@ -9,6 +9,7 @@ package org.opensearch.wlm; import org.opensearch.action.index.IndexRequest; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.tasks.Task; import org.opensearch.test.OpenSearchTestCase; @@ -19,13 +20,14 @@ import org.opensearch.transport.TransportRequestHandler; import java.util.Collections; +import java.util.Optional; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.*; public class WorkloadManagementTransportRequestHandlerTests extends OpenSearchTestCase { private WorkloadManagementTransportInterceptor.RequestHandler sut; private ThreadPool threadPool; + private QueryGroupService queryGroupService; private TestTransportRequestHandler actualHandler; @@ -33,8 +35,9 @@ public void setUp() throws Exception { super.setUp(); threadPool = new TestThreadPool(getTestName()); actualHandler = new TestTransportRequestHandler<>(); + queryGroupService = mock(QueryGroupService.class); - sut = new WorkloadManagementTransportInterceptor.RequestHandler<>(threadPool, actualHandler); + sut = new WorkloadManagementTransportInterceptor.RequestHandler<>(threadPool, actualHandler, queryGroupService); } public void tearDown() throws Exception { @@ -42,14 +45,23 @@ public void tearDown() throws Exception { threadPool.shutdown(); } - public void testMessageReceivedForSearchWorkload() throws Exception { + public void testMessageReceivedForSearchWorkload_nonRejectionCase() throws Exception { ShardSearchRequest request = mock(ShardSearchRequest.class); QueryGroupTask spyTask = getSpyTask(); + when(queryGroupService.shouldRejectFor(anyString())).thenReturn(Optional.empty()); sut.messageReceived(request, mock(TransportChannel.class), spyTask); assertTrue(sut.isSearchWorkloadRequest(spyTask)); } + public void testMessageReceivedForSearchWorkload_RejectionCase() throws Exception { + ShardSearchRequest request = mock(ShardSearchRequest.class); + QueryGroupTask spyTask = getSpyTask(); + when(queryGroupService.shouldRejectFor(anyString())).thenReturn(Optional.of("QueryGroup is contended.")); + + assertThrows(OpenSearchRejectedExecutionException.class, () -> sut.messageReceived(request, mock(TransportChannel.class), spyTask)); + } + public void testMessageReceivedForNonSearchWorkload() throws Exception { IndexRequest indexRequest = mock(IndexRequest.class); Task task = mock(Task.class); From 58111012e2d710b138c577889a902457d6c2a5ae Mon Sep 17 00:00:00 2001 From: Kaushal Kumar Date: Mon, 26 Aug 2024 12:49:03 -0700 Subject: [PATCH 4/9] add changelog entry Signed-off-by: Kaushal Kumar --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cd02af4f625b9..84e203f3649bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add allowlist setting for ingest-geoip and ingest-useragent ([#15325](https://github.com/opensearch-project/OpenSearch/pull/15325)) - Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895)) - Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336)) +- [Workload Management] Add rejection logic for co-ordinator and shard level requests ([#15428](https://github.com/opensearch-project/OpenSearch/pull/15428))) ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) From ca37873719f9af14a2571ce971eb5c117d159a7a Mon Sep 17 00:00:00 2001 From: Kaushal Kumar Date: Mon, 26 Aug 2024 13:05:14 -0700 Subject: [PATCH 5/9] apply spotless check Signed-off-by: Kaushal Kumar --- .../src/main/java/org/opensearch/node/Node.java | 16 +++++++++++----- .../org/opensearch/wlm/QueryGroupService.java | 4 ++-- .../WorkloadManagementTransportInterceptor.java | 3 +-- ...adManagementTransportRequestHandlerTests.java | 5 ++++- ...upRequestRejectionOperationListenerTests.java | 1 - 5 files changed, 18 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index d302764fefab2..12e50577f5fb7 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1013,16 +1013,22 @@ protected Node( // Add the telemetryAwarePlugin components to the existing pluginComponents collection. pluginComponents.addAll(telemetryAwarePluginComponents); - final QueryGroupRequestRejectionOperationListener queryGroupRequestRejectionListener = new QueryGroupRequestRejectionOperationListener( - new QueryGroupService(), // We will need to replace this with actual instance of the queryGroupService - threadPool - ); + final QueryGroupRequestRejectionOperationListener queryGroupRequestRejectionListener = + new QueryGroupRequestRejectionOperationListener( + new QueryGroupService(), // We will need to replace this with actual instance of the queryGroupService + threadPool + ); // register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory = new SearchRequestOperationsCompositeListenerFactory( Stream.concat( - Stream.of(searchRequestStats, searchRequestSlowLog, searchTaskRequestOperationsListener, queryGroupRequestRejectionListener), + Stream.of( + searchRequestStats, + searchRequestSlowLog, + searchTaskRequestOperationsListener, + queryGroupRequestRejectionListener + ), pluginComponents.stream() .filter(p -> p instanceof SearchRequestOperationsListener) .map(p -> (SearchRequestOperationsListener) p) diff --git a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java index 12c5399b08ba5..6ba286d68670b 100644 --- a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java +++ b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java @@ -30,8 +30,8 @@ public void requestFailedFor(final String queryGroupId) { public Optional shouldRejectFor(String queryGroupId) { if (queryGroupId == null) return Optional.empty(); // TODO: At this point this is dummy and we need to decide whether to cancel the request based on last - // reported resource usage for the queryGroup. We also need to increment the rejection count here for the - // query group + // reported resource usage for the queryGroup. We also need to increment the rejection count here for the + // query group return Optional.of("Possible reason. "); } } diff --git a/server/src/main/java/org/opensearch/wlm/WorkloadManagementTransportInterceptor.java b/server/src/main/java/org/opensearch/wlm/WorkloadManagementTransportInterceptor.java index fac0e7ac484c6..6bc66f1c5c04e 100644 --- a/server/src/main/java/org/opensearch/wlm/WorkloadManagementTransportInterceptor.java +++ b/server/src/main/java/org/opensearch/wlm/WorkloadManagementTransportInterceptor.java @@ -50,8 +50,7 @@ public static class RequestHandler implements Transp TransportRequestHandler actualHandler; private final QueryGroupService queryGroupService; - public RequestHandler(ThreadPool threadPool, TransportRequestHandler actualHandler, - QueryGroupService queryGroupService) { + public RequestHandler(ThreadPool threadPool, TransportRequestHandler actualHandler, QueryGroupService queryGroupService) { this.threadPool = threadPool; this.actualHandler = actualHandler; this.queryGroupService = queryGroupService; diff --git a/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportRequestHandlerTests.java b/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportRequestHandlerTests.java index 8ac321b211b79..a06f44da7078f 100644 --- a/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportRequestHandlerTests.java +++ b/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportRequestHandlerTests.java @@ -22,7 +22,10 @@ import java.util.Collections; import java.util.Optional; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; public class WorkloadManagementTransportRequestHandlerTests extends OpenSearchTestCase { private WorkloadManagementTransportInterceptor.RequestHandler sut; diff --git a/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListenerTests.java b/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListenerTests.java index 11d25e13ba0ba..77a699fecd13a 100644 --- a/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListenerTests.java +++ b/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListenerTests.java @@ -25,7 +25,6 @@ public class QueryGroupRequestRejectionOperationListenerTests extends OpenSearch QueryGroupService queryGroupService; QueryGroupRequestRejectionOperationListener sut; - public void setUp() throws Exception { super.setUp(); testThreadPool = new TestThreadPool("RejectionTestThreadPool"); From d8e41e12cf70cc8812287ce3c9fb2967db11e785 Mon Sep 17 00:00:00 2001 From: Kaushal Kumar Date: Mon, 26 Aug 2024 16:41:32 -0700 Subject: [PATCH 6/9] remove unused files and fix precommit Signed-off-by: Kaushal Kumar --- .../wlm/QueryGroupFailureListener.java | 21 ------------------- ...roupRequestRejectionOperationListener.java | 2 +- 2 files changed, 1 insertion(+), 22 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/wlm/QueryGroupFailureListener.java diff --git a/server/src/main/java/org/opensearch/wlm/QueryGroupFailureListener.java b/server/src/main/java/org/opensearch/wlm/QueryGroupFailureListener.java deleted file mode 100644 index 3e3a3dffe139d..0000000000000 --- a/server/src/main/java/org/opensearch/wlm/QueryGroupFailureListener.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.wlm; - -import org.opensearch.core.action.ActionListener; - -public class QueryGroupFailureListener { - public static ActionListener wrap(QueryGroupTask task, ActionListener originalListener) { - return ActionListener.wrap(originalListener::onResponse, exception -> { - // Call QueryGroup service to increment failures for query group of task - System.out.println("QueryGroup listener is invoked"); - originalListener.onFailure(exception); - }); - } -} diff --git a/server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListener.java b/server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListener.java index c850535e23e2f..09414b111547d 100644 --- a/server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListener.java +++ b/server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListener.java @@ -31,7 +31,7 @@ public QueryGroupRequestRejectionOperationListener(QueryGroupService queryGroupS } /** - * This method assumes that the queryGroupId is already populated in the {@link ThreadContext} + * This method assumes that the queryGroupId is already populated in the thread context * @param searchRequestContext SearchRequestContext instance */ @Override From a3df783cf93890b643d4c81ff80fdc2894360fef Mon Sep 17 00:00:00 2001 From: Kaushal Kumar Date: Tue, 27 Aug 2024 15:28:19 -0700 Subject: [PATCH 7/9] refactor code Signed-off-by: Kaushal Kumar --- .../org/opensearch/wlm/QueryGroupService.java | 13 ++++++++----- .../WorkloadManagementTransportInterceptor.java | 9 +-------- ...ryGroupRequestRejectionOperationListener.java | 8 +------- ...adManagementTransportRequestHandlerTests.java | 9 ++++----- ...upRequestRejectionOperationListenerTests.java | 16 ++++++---------- 5 files changed, 20 insertions(+), 35 deletions(-) diff --git a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java index 6ba286d68670b..7eb2d5d3a8223 100644 --- a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java +++ b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java @@ -8,7 +8,7 @@ package org.opensearch.wlm; -import java.util.Optional; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; /** * This is stub at this point in time and will be replace by an acutal one in couple of days @@ -25,13 +25,16 @@ public void requestFailedFor(final String queryGroupId) { /** * * @param queryGroupId query group identifier - * @return whether the queryGroup is contended and should reject new incoming requests */ - public Optional shouldRejectFor(String queryGroupId) { - if (queryGroupId == null) return Optional.empty(); + public void rejectIfNeeded(String queryGroupId) { + if (queryGroupId == null) return; + boolean reject = false; + final StringBuilder reason = new StringBuilder(); // TODO: At this point this is dummy and we need to decide whether to cancel the request based on last // reported resource usage for the queryGroup. We also need to increment the rejection count here for the // query group - return Optional.of("Possible reason. "); + if (reject) { + throw new OpenSearchRejectedExecutionException("QueryGroup " + queryGroupId + " is already contended." + reason.toString()); + } } } diff --git a/server/src/main/java/org/opensearch/wlm/WorkloadManagementTransportInterceptor.java b/server/src/main/java/org/opensearch/wlm/WorkloadManagementTransportInterceptor.java index 6bc66f1c5c04e..d382b4c729a38 100644 --- a/server/src/main/java/org/opensearch/wlm/WorkloadManagementTransportInterceptor.java +++ b/server/src/main/java/org/opensearch/wlm/WorkloadManagementTransportInterceptor.java @@ -8,7 +8,6 @@ package org.opensearch.wlm; -import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportChannel; @@ -16,8 +15,6 @@ import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportRequestHandler; -import java.util.Optional; - /** * This class is used to intercept search traffic requests and populate the queryGroupId header in task headers */ @@ -61,11 +58,7 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro if (isSearchWorkloadRequest(task)) { ((QueryGroupTask) task).setQueryGroupId(threadPool.getThreadContext()); final String queryGroupId = ((QueryGroupTask) (task)).getQueryGroupId(); - Optional reason = queryGroupService.shouldRejectFor(queryGroupId); - - if (reason.isPresent()) { - throw new OpenSearchRejectedExecutionException("QueryGroup " + queryGroupId + " is already contended." + reason.get()); - } + queryGroupService.rejectIfNeeded(queryGroupId); } actualHandler.messageReceived(request, channel, task); } diff --git a/server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListener.java b/server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListener.java index 09414b111547d..89f6fe709667f 100644 --- a/server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListener.java +++ b/server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListener.java @@ -10,13 +10,10 @@ import org.opensearch.action.search.SearchRequestContext; import org.opensearch.action.search.SearchRequestOperationsListener; -import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.threadpool.ThreadPool; import org.opensearch.wlm.QueryGroupService; import org.opensearch.wlm.QueryGroupTask; -import java.util.Optional; - /** * This listener is used to perform the rejections for incoming requests into a queryGroup */ @@ -37,9 +34,6 @@ public QueryGroupRequestRejectionOperationListener(QueryGroupService queryGroupS @Override protected void onRequestStart(SearchRequestContext searchRequestContext) { final String queryGroupId = threadPool.getThreadContext().getHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER); - Optional reason = queryGroupService.shouldRejectFor(queryGroupId); - if (reason.isPresent()) { - throw new OpenSearchRejectedExecutionException("QueryGroup " + queryGroupId + " is already contended." + reason.get()); - } + queryGroupService.rejectIfNeeded(queryGroupId); } } diff --git a/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportRequestHandlerTests.java b/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportRequestHandlerTests.java index a06f44da7078f..59818ad3dbbd2 100644 --- a/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportRequestHandlerTests.java +++ b/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportRequestHandlerTests.java @@ -20,12 +20,12 @@ import org.opensearch.transport.TransportRequestHandler; import java.util.Collections; -import java.util.Optional; import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; public class WorkloadManagementTransportRequestHandlerTests extends OpenSearchTestCase { private WorkloadManagementTransportInterceptor.RequestHandler sut; @@ -51,8 +51,7 @@ public void tearDown() throws Exception { public void testMessageReceivedForSearchWorkload_nonRejectionCase() throws Exception { ShardSearchRequest request = mock(ShardSearchRequest.class); QueryGroupTask spyTask = getSpyTask(); - when(queryGroupService.shouldRejectFor(anyString())).thenReturn(Optional.empty()); - + doNothing().when(queryGroupService).rejectIfNeeded(anyString()); sut.messageReceived(request, mock(TransportChannel.class), spyTask); assertTrue(sut.isSearchWorkloadRequest(spyTask)); } @@ -60,7 +59,7 @@ public void testMessageReceivedForSearchWorkload_nonRejectionCase() throws Excep public void testMessageReceivedForSearchWorkload_RejectionCase() throws Exception { ShardSearchRequest request = mock(ShardSearchRequest.class); QueryGroupTask spyTask = getSpyTask(); - when(queryGroupService.shouldRejectFor(anyString())).thenReturn(Optional.of("QueryGroup is contended.")); + doThrow(OpenSearchRejectedExecutionException.class).when(queryGroupService).rejectIfNeeded(anyString()); assertThrows(OpenSearchRejectedExecutionException.class, () -> sut.messageReceived(request, mock(TransportChannel.class), spyTask)); } diff --git a/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListenerTests.java b/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListenerTests.java index 77a699fecd13a..19e82aca26153 100644 --- a/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListenerTests.java +++ b/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListenerTests.java @@ -15,10 +15,9 @@ import org.opensearch.wlm.QueryGroupService; import org.opensearch.wlm.QueryGroupTask; -import java.util.Optional; - +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class QueryGroupRequestRejectionOperationListenerTests extends OpenSearchTestCase { ThreadPool testThreadPool; @@ -28,6 +27,8 @@ public class QueryGroupRequestRejectionOperationListenerTests extends OpenSearch public void setUp() throws Exception { super.setUp(); testThreadPool = new TestThreadPool("RejectionTestThreadPool"); + queryGroupService = mock(QueryGroupService.class); + sut = new QueryGroupRequestRejectionOperationListener(queryGroupService, testThreadPool); } public void tearDown() throws Exception { @@ -36,21 +37,16 @@ public void tearDown() throws Exception { } public void testRejectionCase() { - queryGroupService = mock(QueryGroupService.class); - sut = new QueryGroupRequestRejectionOperationListener(queryGroupService, testThreadPool); final String testQueryGroupId = "asdgasgkajgkw3141_3rt4t"; testThreadPool.getThreadContext().putHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER, testQueryGroupId); - when(queryGroupService.shouldRejectFor(testQueryGroupId)).thenReturn(Optional.of("Test query group is contended")); - + doThrow(OpenSearchRejectedExecutionException.class).when(queryGroupService).rejectIfNeeded(testQueryGroupId); assertThrows(OpenSearchRejectedExecutionException.class, () -> sut.onRequestStart(null)); } public void testNonRejectionCase() { - queryGroupService = mock(QueryGroupService.class); - sut = new QueryGroupRequestRejectionOperationListener(queryGroupService, testThreadPool); final String testQueryGroupId = "asdgasgkajgkw3141_3rt4t"; testThreadPool.getThreadContext().putHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER, testQueryGroupId); - when(queryGroupService.shouldRejectFor(testQueryGroupId)).thenReturn(Optional.empty()); + doNothing().when(queryGroupService).rejectIfNeeded(testQueryGroupId); sut.onRequestStart(null); } From d51b251eb1e52baaf3f41286dc2b501cdb69272c Mon Sep 17 00:00:00 2001 From: Kaushal Kumar Date: Wed, 28 Aug 2024 09:16:12 -0700 Subject: [PATCH 8/9] add package info file Signed-off-by: Kaushal Kumar --- .../org/opensearch/wlm/listeners/package-info.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 server/src/main/java/org/opensearch/wlm/listeners/package-info.java diff --git a/server/src/main/java/org/opensearch/wlm/listeners/package-info.java b/server/src/main/java/org/opensearch/wlm/listeners/package-info.java new file mode 100644 index 0000000000000..e900acf657085 --- /dev/null +++ b/server/src/main/java/org/opensearch/wlm/listeners/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * WLM related listener constructs + */ +package org.opensearch.wlm.listeners; From 5629506c40d1684dc69e9984803a2ef25ef9ef0b Mon Sep 17 00:00:00 2001 From: Kaushal Kumar Date: Thu, 29 Aug 2024 12:25:12 -0700 Subject: [PATCH 9/9] remove unused method from QueryGroupService stub Signed-off-by: Kaushal Kumar --- .../main/java/org/opensearch/wlm/QueryGroupService.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java index 7eb2d5d3a8223..97c4e5169b4ed 100644 --- a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java +++ b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java @@ -14,14 +14,6 @@ * This is stub at this point in time and will be replace by an acutal one in couple of days */ public class QueryGroupService { - /** - * updates the failure stats for the query group - * @param queryGroupId query group identifier - */ - public void requestFailedFor(final String queryGroupId) { - - } - /** * * @param queryGroupId query group identifier