Skip to content

Commit

Permalink
Initial start of AD profile test in tool
Browse files Browse the repository at this point in the history
Signed-off-by: Tyler Ohlsen <ohltyler@amazon.com>
  • Loading branch information
ohltyler committed Dec 29, 2023
1 parent 051b79a commit 47b267a
Show file tree
Hide file tree
Showing 5 changed files with 381 additions and 52 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ dependencies {
implementation fileTree(dir: sqlJarDirectory, include: ["opensearch-sql-${version}.jar", "ppl-${version}.jar", "protocol-${version}.jar"])
compileOnly "org.opensearch:common-utils:${version}"
compileOnly "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${version}"


// ZipArchive dependencies used for integration tests
zipArchive group: 'org.opensearch.plugin', name:'opensearch-ml-plugin', version: "${version}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,22 @@
package org.opensearch.agent.tools;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.ad.client.AnomalyDetectionNodeClient;
import org.opensearch.ad.model.ADTask;
import org.opensearch.ad.transport.GetAnomalyDetectorRequest;
import org.opensearch.ad.transport.GetAnomalyDetectorResponse;
import org.opensearch.agent.tools.utils.ToolConstants.DetectorStateString;
import org.opensearch.client.Client;
import org.opensearch.common.lucene.uid.Versions;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
Expand Down Expand Up @@ -123,24 +131,98 @@ public <T> void run(Map<String, String> parameters, ActionListener<T> listener)

SearchRequest searchDetectorRequest = new SearchRequest().source(searchSourceBuilder);

if (running != null || disabled != null || failed != null) {
// TODO: add a listener to trigger when the first response is received, to trigger the profile API call
// to fetch the detector state, etc.
// Will need AD client to onboard the profile API first.
}

ActionListener<SearchResponse> searchDetectorListener = ActionListener.<SearchResponse>wrap(response -> {
StringBuilder sb = new StringBuilder();
SearchHit[] hits = response.getHits().getHits();
List<SearchHit> hits = Arrays.asList(response.getHits().getHits());
Map<String, SearchHit> hitsAsMap = hits.stream().collect(Collectors.toMap(SearchHit::getId, hit -> hit));

// If we need to filter by detector state, make subsequent profile API calls to each detector
if (running != null || disabled != null || failed != null) {

// Send out individual AD client calls to fetch detector profiles, continuously adding to a
// tracked list of CompletableFutures
List<CompletableFuture<GetAnomalyDetectorResponse>> profileFutures = new ArrayList<>();
for (SearchHit hit : hits) {
CompletableFuture<GetAnomalyDetectorResponse> profileFuture = new CompletableFuture<>();
profileFutures.add(profileFuture);
ActionListener<GetAnomalyDetectorResponse> profileListener = ActionListener
.<GetAnomalyDetectorResponse>wrap(profileResponse -> {
profileFuture.complete(profileResponse);
}, e -> {
log.error("Failed to get anomaly detector profile.", e);
profileFuture.completeExceptionally(e);
listener.onFailure(e);
});

Check warning on line 155 in src/main/java/org/opensearch/agent/tools/SearchAnomalyDetectorsTool.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/agent/tools/SearchAnomalyDetectorsTool.java#L152-L155

Added lines #L152 - L155 were not covered by tests

GetAnomalyDetectorRequest profileRequest = new GetAnomalyDetectorRequest(
hit.getId(),
Versions.MATCH_ANY,
true,
true,
"",
"",
false,
null
);
adClient.getDetectorProfile(profileRequest, profileListener);
}

// Wait for all CompletableFutures to complete, and iterate through the responses. Filter out
// detectors with unwanted detector states.
CompletableFuture<List<GetAnomalyDetectorResponse>> listFuture = CompletableFuture
.allOf(profileFutures.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> profileFutures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
final List<GetAnomalyDetectorResponse> profileResponses = listFuture.join();
for (GetAnomalyDetectorResponse profileResponse : profileResponses) {
if (profileResponse != null && profileResponse.getDetector() != null) {
String detectorId = profileResponse.getDetector().getId();

// We follow the existing logic as the frontend to determine overall detector state
// https://github.com/opensearch-project/anomaly-detection-dashboards-plugin/blob/main/server/routes/utils/adHelpers.ts#L437
String detectorState;
ADTask realtimeTask = profileResponse.getRealtimeAdTask();

if (realtimeTask != null) {
String taskState = realtimeTask.getState();
if (taskState.equalsIgnoreCase("CREATED")) {
detectorState = DetectorStateString.Initializing.name();

Check warning on line 188 in src/main/java/org/opensearch/agent/tools/SearchAnomalyDetectorsTool.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/agent/tools/SearchAnomalyDetectorsTool.java#L188

Added line #L188 was not covered by tests
} else if (taskState.equalsIgnoreCase("RUNNING")) {
detectorState = DetectorStateString.Running.name();
} else if (taskState.equalsIgnoreCase("INIT_FAILURE")
|| taskState.equalsIgnoreCase("UNEXPECTED_FAILURE")
|| taskState.equalsIgnoreCase("FAILED")) {
detectorState = DetectorStateString.Failed.name();
} else {
// Task states may fall under other values, such as "FEATURE_REQUIRED" / "STOPPED" / etc.
// We assume here that these will all fall under the disabled category
detectorState = DetectorStateString.Disabled.name();
}
} else {
detectorState = DetectorStateString.Disabled.name();

Check warning on line 201 in src/main/java/org/opensearch/agent/tools/SearchAnomalyDetectorsTool.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/agent/tools/SearchAnomalyDetectorsTool.java#L201

Added line #L201 was not covered by tests
}

if (running != null && !running && detectorState.equals(DetectorStateString.Running.name())) {
hitsAsMap.remove(detectorId);
}
if (disabled != null && !disabled && detectorState.equals(DetectorStateString.Disabled.name())) {
hitsAsMap.remove(detectorId);
}
if (failed != null && !failed && detectorState.equals(DetectorStateString.Failed.name())) {
hitsAsMap.remove(detectorId);
}
}
}
}

sb.append("AnomalyDetectors=[");
for (SearchHit hit : hits) {
for (SearchHit hit : hitsAsMap.values()) {
sb.append("{");
sb.append("id=").append(hit.getId()).append(",");
sb.append("name=").append(hit.getSourceAsMap().get("name"));
sb.append("}");
}
sb.append("]");
sb.append("TotalAnomalyDetectors=").append(response.getHits().getTotalHits().value);
sb.append("TotalAnomalyDetectors=").append(hitsAsMap.values().size());
listener.onResponse((T) sb.toString());
}, e -> {
log.error("Failed to search anomaly detectors.", e);
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/org/opensearch/agent/tools/utils/ToolConstants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.agent.tools.utils;

public class ToolConstants {

Check warning on line 8 in src/main/java/org/opensearch/agent/tools/utils/ToolConstants.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/agent/tools/utils/ToolConstants.java#L8

Added line #L8 was not covered by tests
public static enum DetectorStateString {
Running,
Disabled,
Failed,
Initializing
}
}
62 changes: 62 additions & 0 deletions src/test/java/org/opensearch/agent/TestHelpers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.agent;

import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;

import org.apache.lucene.search.TotalHits;
import org.mockito.Mockito;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchResponseSections;
import org.opensearch.ad.model.ADTask;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.transport.GetAnomalyDetectorResponse;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.aggregations.Aggregations;

public class TestHelpers {

public static SearchResponse generateSearchResponse(SearchHit[] hits) {
TotalHits totalHits = new TotalHits(hits.length, TotalHits.Relation.EQUAL_TO);
return new SearchResponse(
new SearchResponseSections(new SearchHits(hits, totalHits, 0), new Aggregations(new ArrayList<>()), null, false, null, null, 0),
null,
0,
0,
0,
0,
null,
null
);
}

public static GetAnomalyDetectorResponse generateGetAnomalyDetectorResponses(String[] detectorIds, String[] detectorStates) {
AnomalyDetector detector = Mockito.mock(AnomalyDetector.class);
when(detector.getId()).thenReturn(detectorIds[0], Arrays.copyOfRange(detectorIds, 1, detectorIds.length));
ADTask realtimeAdTask = Mockito.mock(ADTask.class);
when(realtimeAdTask.getState()).thenReturn(detectorStates[0], Arrays.copyOfRange(detectorStates, 1, detectorStates.length));
GetAnomalyDetectorResponse getDetectorProfileResponse = Mockito.mock(GetAnomalyDetectorResponse.class);
when(getDetectorProfileResponse.getRealtimeAdTask()).thenReturn(realtimeAdTask);
when(getDetectorProfileResponse.getDetector()).thenReturn(detector);
return getDetectorProfileResponse;
}

public static SearchHit generateSearchDetectorHit(String detectorName, String detectorId) throws IOException {
XContentBuilder content = XContentBuilder.builder(XContentType.JSON.xContent());
content.startObject();
content.field("name", detectorName);
content.endObject();
return new SearchHit(0, detectorId, null, null).sourceRef(BytesReference.bytes(content));
}
}
Loading

0 comments on commit 47b267a

Please sign in to comment.