From 73ca4714f9e2b08187c6f9579fb33b0e09a9008c Mon Sep 17 00:00:00 2001 From: Amit Galitzky Date: Tue, 3 Sep 2024 09:45:05 -0700 Subject: [PATCH 1/2] Adding remote index and multi index checks in validation Signed-off-by: Amit Galitzky --- .../timeseries/constant/CommonMessages.java | 4 + .../timeseries/rest/RestValidateAction.java | 1 - .../AbstractTimeSeriesActionHandler.java | 218 ++++++++++++------ .../BaseValidateConfigTransportAction.java | 6 +- .../util/CrossClusterConfigUtils.java | 102 ++++++++ .../MultiResponsesDelegateActionListener.java | 2 - ...stractForecasterActionHandlerTestCase.java | 5 + ...ndexAnomalyDetectorActionHandlerTests.java | 5 + .../IndexForecasterActionHandlerTests.java | 9 +- ...dateAnomalyDetectorActionHandlerTests.java | 6 +- ...teAnomalyDetectorTransportActionTests.java | 16 +- .../util/CrossClusterConfigUtilsTests.java | 66 ++++++ 12 files changed, 348 insertions(+), 92 deletions(-) create mode 100644 src/main/java/org/opensearch/timeseries/util/CrossClusterConfigUtils.java create mode 100644 src/test/java/org/opensearch/timeseries/util/CrossClusterConfigUtilsTests.java diff --git a/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java b/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java index 059947f91..16aa3fda2 100644 --- a/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java +++ b/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java @@ -37,6 +37,7 @@ public static String getTooManyCategoricalFieldErr(int limit) { "Result index name has invalid character. Valid characters are a-z, 0-9, -(hyphen) and _(underscore)"; public static String FAIL_TO_VALIDATE = "failed to validate"; public static String INVALID_TIMESTAMP = "Timestamp field: (%s) must be of type date"; + public static String NON_EXISTENT_TIMESTAMP_IN_INDEX = "Timestamp field: (%s) is not found in the (%s) index mapping"; public static String NON_EXISTENT_TIMESTAMP = "Timestamp field: (%s) is not found in index mapping"; public static String INVALID_NAME = "Valid characters for name are a-z, A-Z, 0-9, -(hyphen), _(underscore) and .(period)"; // change this error message to make it compatible with old version's integration(nexus) test @@ -74,6 +75,9 @@ public static String getTooManyCategoricalFieldErr(int limit) { + " characters."; public static final String INDEX_NOT_FOUND = "index does not exist"; public static final String FAIL_TO_GET_MAPPING_MSG = "Fail to get the index mapping of %s"; + public static final String FAIL_TO_GET_MAPPING = "Fail to get the index mapping"; + public static final String TIMESTAMP_VALIDATION_FAILED = "Validation failed for timefield of %s "; + public static final String FAIL_TO_GET_CONFIG_MSG = "Fail to get config"; // ====================================== diff --git a/src/main/java/org/opensearch/timeseries/rest/RestValidateAction.java b/src/main/java/org/opensearch/timeseries/rest/RestValidateAction.java index fa546c3d9..e9da98c8d 100644 --- a/src/main/java/org/opensearch/timeseries/rest/RestValidateAction.java +++ b/src/main/java/org/opensearch/timeseries/rest/RestValidateAction.java @@ -84,7 +84,6 @@ private Boolean validationTypesAreAccepted(String validationType) { public ValidateConfigRequest prepareRequest(RestRequest request, NodeClient client, String typesStr) throws IOException { XContentParser parser = request.contentParser(); ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); - // if type param isn't blank and isn't a part of possible validation types throws exception if (!StringUtils.isBlank(typesStr)) { if (!validationTypesAreAccepted(typesStr)) { diff --git a/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java b/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java index bba0a4f09..deb7fed0a 100644 --- a/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java +++ b/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java @@ -7,21 +7,14 @@ import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; import static org.opensearch.timeseries.constant.CommonMessages.CATEGORICAL_FIELD_TYPE_ERR_MSG; +import static org.opensearch.timeseries.constant.CommonMessages.TIMESTAMP_VALIDATION_FAILED; import static org.opensearch.timeseries.util.ParseUtils.parseAggregators; import static org.opensearch.timeseries.util.RestHandlerUtils.XCONTENT_WITH_TYPE; import static org.opensearch.timeseries.util.RestHandlerUtils.isExceptionCausedByInvalidQuery; import java.io.IOException; import java.time.Clock; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; import org.apache.commons.lang.StringUtils; @@ -76,10 +69,7 @@ import org.opensearch.timeseries.model.ValidationIssueType; import org.opensearch.timeseries.task.TaskCacheManager; import org.opensearch.timeseries.task.TaskManager; -import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener; -import org.opensearch.timeseries.util.ParseUtils; -import org.opensearch.timeseries.util.RestHandlerUtils; -import org.opensearch.timeseries.util.SecurityClientUtil; +import org.opensearch.timeseries.util.*; import org.opensearch.transport.TransportService; import com.google.common.collect.Sets; @@ -241,7 +231,6 @@ public void start(ActionListener listener) { createOrUpdateConfig(listener); return; } - if (this.isDryRun) { if (timeSeriesIndices.doesIndexExist(resultIndexOrAlias) || timeSeriesIndices.doesAliasExist(resultIndexOrAlias)) { timeSeriesIndices @@ -304,64 +293,110 @@ protected void validateName(boolean indexingDryRun, ActionListener listener) protected void validateTimeField(boolean indexingDryRun, ActionListener listener) { String givenTimeField = config.getTimeField(); - GetFieldMappingsRequest getMappingsRequest = new GetFieldMappingsRequest(); - getMappingsRequest.indices(config.getIndices().toArray(new String[0])).fields(givenTimeField); - getMappingsRequest.indicesOptions(IndicesOptions.strictExpand()); - - // comments explaining fieldMappingResponse parsing can be found inside validateCategoricalField(String, boolean) - ActionListener mappingsListener = ActionListener.wrap(getMappingsResponse -> { - boolean foundField = false; - Map> mappingsByIndex = getMappingsResponse.mappings(); + HashMap> clusterIndicesMap = CrossClusterConfigUtils + .separateClusterIndexes(config.getIndices(), clusterService); - for (Map mappingsByField : mappingsByIndex.values()) { - for (Map.Entry field2Metadata : mappingsByField.entrySet()) { + ActionListener>> validateGetMappingForTimeFieldListener = ActionListener.wrap(response -> { + prepareConfigIndexing(indexingDryRun, listener); + }, exception -> { listener.onFailure(createValidationException(exception.getMessage(), ValidationIssueType.TIMEFIELD_FIELD)); }); + MultiResponsesDelegateActionListener>> multiGetMappingResponseListener = + new MultiResponsesDelegateActionListener<>( + validateGetMappingForTimeFieldListener, + clusterIndicesMap.entrySet().size(), + String.format(Locale.ROOT, TIMESTAMP_VALIDATION_FAILED, config.getName()), + false + ); - GetFieldMappingsResponse.FieldMappingMetadata fieldMetadata = field2Metadata.getValue(); - if (fieldMetadata != null) { - // sourceAsMap returns sth like {host2={type=keyword}} with host2 being a nested field - Map fieldMap = fieldMetadata.sourceAsMap(); - if (fieldMap != null) { - for (Object type : fieldMap.values()) { - if (type instanceof Map) { - foundField = true; - Map metadataMap = (Map) type; - String typeName = (String) metadataMap.get(CommonName.TYPE); - if (!typeName.equals(CommonName.DATE_TYPE) && !typeName.equals(CommonName.DATE_NANOS_TYPE)) { - listener - .onFailure( - new ValidationException( - String.format(Locale.ROOT, CommonMessages.INVALID_TIMESTAMP, givenTimeField), - ValidationIssueType.TIMEFIELD_FIELD, - configValidationAspect - ) - ); - return; + for (Map.Entry> clusterIndicesEntry : clusterIndicesMap.entrySet()) { + GetFieldMappingsRequest getMappingsRequestForIndex = new GetFieldMappingsRequest(); + getMappingsRequestForIndex.indices((clusterIndicesEntry.getValue().toArray(new String[0]))).fields(givenTimeField); + getMappingsRequestForIndex.indicesOptions(IndicesOptions.strictExpand()); + Client targetClusterClient = CrossClusterConfigUtils.getClientForCluster(clusterIndicesEntry.getKey(), client, clusterService); + ActionListener getMappingResponseListener = ActionListener.wrap(getMappingsResponse -> { + boolean foundField = false; + Map> mappingsByIndex = getMappingsResponse.mappings(); + for (Map.Entry> mappingsByField : mappingsByIndex + .entrySet()) { + if (mappingsByField.getValue().isEmpty()) { + multiGetMappingResponseListener + .onFailure( + new ValidationException( + String + .format( + Locale.ROOT, + CommonMessages.NON_EXISTENT_TIMESTAMP_IN_INDEX, + givenTimeField, + mappingsByField.getKey() + ), + ValidationIssueType.TIMEFIELD_FIELD, + configValidationAspect + ) + ); + return; + } + for (Map.Entry field2Metadata : mappingsByField + .getValue() + .entrySet()) { + GetFieldMappingsResponse.FieldMappingMetadata fieldMetadata = field2Metadata.getValue(); + if (fieldMetadata != null) { + // sourceAsMap returns sth like {host2={type=keyword}} with host2 being a nested field + Map fieldMap = fieldMetadata.sourceAsMap(); + if (fieldMap != null) { + for (Object type : fieldMap.values()) { + if (type instanceof Map) { + foundField = true; + Map metadataMap = (Map) type; + String typeName = (String) metadataMap.get(CommonName.TYPE); + if (!typeName.equals(CommonName.DATE_TYPE) && !typeName.equals(CommonName.DATE_NANOS_TYPE)) { + multiGetMappingResponseListener + .onFailure( + new ValidationException( + String.format(Locale.ROOT, CommonMessages.INVALID_TIMESTAMP, givenTimeField), + ValidationIssueType.TIMEFIELD_FIELD, + configValidationAspect + ) + ); + return; + } } } } } } } - } - if (!foundField) { - listener - .onFailure( - new ValidationException( - String.format(Locale.ROOT, CommonMessages.NON_EXISTENT_TIMESTAMP, givenTimeField), - ValidationIssueType.TIMEFIELD_FIELD, - configValidationAspect + if (!foundField) { + multiGetMappingResponseListener + .onFailure( + new ValidationException( + String.format(Locale.ROOT, CommonMessages.NON_EXISTENT_TIMESTAMP, givenTimeField), + ValidationIssueType.TIMEFIELD_FIELD, + configValidationAspect + ) + ); + return; + } + + multiGetMappingResponseListener + .onResponse( + new MergeableList>( + new ArrayList>(Collections.singletonList(Optional.empty())) ) ); - return; - } - prepareConfigIndexing(indexingDryRun, listener); - }, error -> { - String message = String.format(Locale.ROOT, "Fail to get the index mapping of %s", config.getIndices()); - logger.error(message, error); - listener.onFailure(new IllegalArgumentException(message)); - }); - clientUtil - .executeWithInjectedSecurity(GetFieldMappingsAction.INSTANCE, getMappingsRequest, user, client, context, mappingsListener); + }, e -> { + String errorMessage = String.format(Locale.ROOT, "Fail to get the index mapping of %s", clusterIndicesEntry.getValue()); + logger.error(errorMessage, e); + multiGetMappingResponseListener.onFailure(new OpenSearchStatusException(errorMessage, RestStatus.BAD_REQUEST, e)); + }); + clientUtil + .executeWithInjectedSecurity( + GetFieldMappingsAction.INSTANCE, + getMappingsRequestForIndex, + user, + targetClusterClient, + context, + getMappingResponseListener + ); + } } /** @@ -448,7 +483,6 @@ protected void validateAgainstExistingHCConfig(String configId, boolean indexing QueryBuilder query = QueryBuilders.boolQuery().filter(QueryBuilders.existsQuery(Config.CATEGORY_FIELD)); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(query).size(0).timeout(requestTimeout); - SearchRequest searchRequest = new SearchRequest(CommonName.CONFIG_INDEX).source(searchSourceBuilder); client .search( @@ -460,7 +494,7 @@ protected void validateAgainstExistingHCConfig(String configId, boolean indexing ) ); } else { - validateCategoricalField(configId, indexingDryRun, listener); + validateCategoricalFieldsInAllIndices(configId, indexingDryRun, listener); } } @@ -522,12 +556,33 @@ protected void onSearchHCConfigResponse(SearchResponse response, String detector } listener.onFailure(new IllegalArgumentException(errorMsg)); } else { - validateCategoricalField(detectorId, indexingDryRun, listener); + validateCategoricalFieldsInAllIndices(detectorId, indexingDryRun, listener); } } - @SuppressWarnings("unchecked") - protected void validateCategoricalField(String configId, boolean indexingDryRun, ActionListener listener) { + protected void validateCategoricalFieldsInAllIndices(String configId, boolean indexingDryRun, ActionListener listener) { + HashMap> clusterIndicesMap = CrossClusterConfigUtils + .separateClusterIndexes(config.getIndices(), clusterService); + + Iterator>> iterator = clusterIndicesMap.entrySet().iterator(); + + validateCategoricalFieldRecursive(iterator, configId, indexingDryRun, listener); + + } + + protected void validateCategoricalFieldRecursive( + Iterator>> iterator, + String configId, + boolean indexingDryRun, + ActionListener listener + ) { + if (!iterator.hasNext()) { + searchConfigInputIndices(configId, indexingDryRun, listener); // Call after all indices are validated + return; + } + + // Get the next cluster indices entry + Map.Entry> clusterIndicesEntry = iterator.next(); List categoryField = config.getCategoryFields(); // categoryField should have at least 1 element. Otherwise, we won't reach here. @@ -537,12 +592,16 @@ protected void validateCategoricalField(String configId, boolean indexingDryRun, // throws validation exception before reaching here String categoryField0 = categoryField.get(0); - - GetFieldMappingsRequest getMappingsRequest = new GetFieldMappingsRequest(); - getMappingsRequest.indices(config.getIndices().toArray(new String[0])).fields(categoryField.toArray(new String[0])); - getMappingsRequest.indicesOptions(IndicesOptions.strictExpand()); - - ActionListener mappingsListener = ActionListener.wrap(getMappingsResponse -> { + Client targetClusterClient = CrossClusterConfigUtils.getClientForCluster(clusterIndicesEntry.getKey(), client, clusterService); + // Create the GetFieldMappingsRequest for each index + GetFieldMappingsRequest getMappingsRequestForIndex = new GetFieldMappingsRequest(); + getMappingsRequestForIndex + .indices(clusterIndicesEntry.getValue().toArray(new String[0])) + .fields(categoryField.toArray(new String[0])); + getMappingsRequestForIndex.indicesOptions(IndicesOptions.strictExpand()); + + // Define the listener for each getMapping request + ActionListener getMappingsListener = ActionListener.wrap(getMappingsResponse -> { // example getMappingsResponse: // GetFieldMappingsResponse{mappings={server-metrics={_doc={service=FieldMappingMetadata{fullName='service', // source=org.opensearch.core.common.bytes.BytesArray@7ba87dbd}}}}} @@ -596,18 +655,25 @@ protected void validateCategoricalField(String configId, boolean indexingDryRun, ); return; } + validateCategoricalFieldRecursive(iterator, configId, indexingDryRun, listener); - searchConfigInputIndices(configId, indexingDryRun, listener); }, error -> { String message = String.format(Locale.ROOT, CommonMessages.FAIL_TO_GET_MAPPING_MSG, config.getIndices()); logger.error(message, error); listener.onFailure(new IllegalArgumentException(message)); }); - clientUtil - .executeWithInjectedSecurity(GetFieldMappingsAction.INSTANCE, getMappingsRequest, user, client, context, mappingsListener); + .executeWithInjectedSecurity( + GetFieldMappingsAction.INSTANCE, + getMappingsRequestForIndex, + user, + targetClusterClient, + context, + getMappingsListener + ); } + @SuppressWarnings("unchecked") protected void searchConfigInputIndices(String configId, boolean indexingDryRun, ActionListener listener) { SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() .query(QueryBuilders.matchAllQuery()) diff --git a/src/main/java/org/opensearch/timeseries/transport/BaseValidateConfigTransportAction.java b/src/main/java/org/opensearch/timeseries/transport/BaseValidateConfigTransportAction.java index dca5d9ee1..d2fe30c02 100644 --- a/src/main/java/org/opensearch/timeseries/transport/BaseValidateConfigTransportAction.java +++ b/src/main/java/org/opensearch/timeseries/transport/BaseValidateConfigTransportAction.java @@ -8,10 +8,7 @@ import static org.opensearch.timeseries.util.ParseUtils.checkFilterByBackendRoles; import java.time.Clock; -import java.util.HashMap; -import java.util.List; -import java.util.Locale; -import java.util.Map; +import java.util.*; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -205,7 +202,6 @@ public void validateExecute( storedContext.restore(); Config config = request.getConfig(); ActionListener validateListener = ActionListener.wrap(response -> { - logger.debug("Result of validation process " + response); // forcing response to be empty listener.onResponse(new ValidateConfigResponse((ConfigValidationIssue) null)); }, exception -> { diff --git a/src/main/java/org/opensearch/timeseries/util/CrossClusterConfigUtils.java b/src/main/java/org/opensearch/timeseries/util/CrossClusterConfigUtils.java new file mode 100644 index 000000000..72cf2f400 --- /dev/null +++ b/src/main/java/org/opensearch/timeseries/util/CrossClusterConfigUtils.java @@ -0,0 +1,102 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.timeseries.util; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.client.Client; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.service.ClusterService; + +public class CrossClusterConfigUtils { + private static final Logger logger = LogManager.getLogger(ParseUtils.class); + + /** + * Uses the clusterName to determine whether the target client is the local or a remote client, + * and returns the appropriate client. + * @param clusterName The name of the cluster to evaluate. + * @param client The local {@link NodeClient}. + * @param localClusterName The name of the local cluster. + * @return The local {@link NodeClient} for the local cluster, or a remote client for a remote cluster. + */ + public static Client getClientForCluster(String clusterName, Client client, String localClusterName) { + return clusterName.equals(localClusterName) ? client : client.getRemoteClusterClient(clusterName); + } + + /** + * Uses the clusterName to determine whether the target client is the local or a remote client, + * and returns the appropriate client. + * @param clusterName The name of the cluster to evaluate. + * @param client The local {@link NodeClient}. + * @param clusterService Used to retrieve the name of the local cluster. + * @return The local {@link NodeClient} for the local cluster, or a remote client for a remote cluster. + */ + public static Client getClientForCluster(String clusterName, Client client, ClusterService clusterService) { + logger.info("clusterName1: " + clusterName); + logger.info("clusterService.getClusterName().value(): " + clusterService.getClusterName().value()); + + return getClientForCluster(clusterName, client, clusterService.getClusterName().value()); + } + + /** + * Parses the list of indexes into a map of cluster_name to List of index names + * @param indexes A list of index names in cluster_name:index_name format. + * Local indexes can also be in index_name format. + * @param clusterService Used to retrieve the name of the local cluster. + * @return A map of cluster_name:index names + */ + public static HashMap> separateClusterIndexes(List indexes, ClusterService clusterService) { + return separateClusterIndexes(indexes, clusterService.getClusterName().value()); + } + + /** + * Parses the list of indexes into a map of cluster_name to list of index_name + * @param indexes A list of index names in cluster_name:index_name format. + * @param localClusterName The name of the local cluster. + * @return A map of cluster_name to List index_name + */ + public static HashMap> separateClusterIndexes(List indexes, String localClusterName) { + HashMap> output = new HashMap<>(); + for (String index : indexes) { + String clusterName = parseClusterName(index); + String indexName = parseIndexName(index); + + // If the index entry does not have a cluster_name, it indicates the index is on the local cluster. + if (clusterName.isEmpty()) { + clusterName = localClusterName; + } + output.computeIfAbsent(clusterName, k -> new ArrayList<>()).add(indexName); + } + return output; + } + + /** + * @param index The name of the index to evaluate. + * Can be in either cluster_name:index_name or index_name format. + * @return The index name. + */ + public static String parseIndexName(String index) { + if (index.contains(":")) { + String[] parts = index.split(":"); + return parts.length > 1 ? parts[1] : index; + } else { + return index; + } + } + + /** + * @param index The name of the index to evaluate. + * Can be in either cluster_name:index_name or index_name format. + * @return The index name. + */ + public static String parseClusterName(String index) { + return index.contains(":") ? index.substring(0, index.indexOf(':')) : ""; + } +} diff --git a/src/main/java/org/opensearch/timeseries/util/MultiResponsesDelegateActionListener.java b/src/main/java/org/opensearch/timeseries/util/MultiResponsesDelegateActionListener.java index 7dd830435..c2c1d7e19 100644 --- a/src/main/java/org/opensearch/timeseries/util/MultiResponsesDelegateActionListener.java +++ b/src/main/java/org/opensearch/timeseries/util/MultiResponsesDelegateActionListener.java @@ -11,8 +11,6 @@ package org.opensearch.timeseries.util; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.concurrent.atomic.AtomicInteger; diff --git a/src/test/java/org/opensearch/action/admin/indices/mapping/get/AbstractForecasterActionHandlerTestCase.java b/src/test/java/org/opensearch/action/admin/indices/mapping/get/AbstractForecasterActionHandlerTestCase.java index 5bee84cba..d1228ae98 100644 --- a/src/test/java/org/opensearch/action/admin/indices/mapping/get/AbstractForecasterActionHandlerTestCase.java +++ b/src/test/java/org/opensearch/action/admin/indices/mapping/get/AbstractForecasterActionHandlerTestCase.java @@ -73,6 +73,8 @@ public class AbstractForecasterActionHandlerTestCase extends AbstractTimeSeriesT protected ThreadContext threadContext; protected SecurityClientUtil clientUtil; protected String categoricalField; + // @Mock + protected ClusterName clusterName; @SuppressWarnings("unchecked") @Override @@ -85,6 +87,9 @@ public void setUp() throws Exception { clusterService = mock(ClusterService.class); ClusterName clusterName = new ClusterName("test"); + clusterName = mock(ClusterName.class); + when(clusterService.getClusterName()).thenReturn(clusterName); + when(clusterName.value()).thenReturn("test"); ClusterState clusterState = ClusterState.builder(clusterName).metadata(Metadata.builder().build()).build(); when(clusterService.state()).thenReturn(clusterState); diff --git a/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java b/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java index c62e975cf..36d8157d7 100644 --- a/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java +++ b/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java @@ -100,6 +100,7 @@ public class IndexAnomalyDetectorActionHandlerTests extends AbstractTimeSeriesTe private RestRequest.Method method; private ADTaskManager adTaskManager; private SearchFeatureDao searchFeatureDao; + private ClusterName clusterName; @BeforeClass public static void beforeClass() { @@ -157,6 +158,10 @@ public void setUp() throws Exception { searchFeatureDao = mock(SearchFeatureDao.class); + clusterName = mock(ClusterName.class); + when(clusterService.getClusterName()).thenReturn(clusterName); + when(clusterName.value()).thenReturn("test"); + handler = new IndexAnomalyDetectorActionHandler( clusterService, clientMock, diff --git a/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexForecasterActionHandlerTests.java b/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexForecasterActionHandlerTests.java index e78b154ea..86702129e 100644 --- a/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexForecasterActionHandlerTests.java +++ b/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexForecasterActionHandlerTests.java @@ -40,6 +40,7 @@ import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.action.support.replication.ReplicationResponse.ShardInfo; import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.routing.AllocationId; import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; @@ -240,7 +241,7 @@ public void doE verify(clientSpy, times(1)).execute(eq(GetAction.INSTANCE), any(), any()); } - public void testFaiToParse() throws InterruptedException { + public void testFailToParse() throws InterruptedException { NodeClient client = new NodeClient(Settings.EMPTY, threadPool) { @Override public void doExecute( @@ -273,6 +274,9 @@ public void doE } }; NodeClient clientSpy = spy(client); + clusterName = mock(ClusterName.class); + when(clusterService.getClusterName()).thenReturn(clusterName); + when(clusterName.value()).thenReturn("test"); method = RestRequest.Method.PUT; @@ -508,6 +512,9 @@ public void doE } }; NodeClient clientSpy = spy(client); + clusterName = mock(ClusterName.class); + when(clusterService.getClusterName()).thenReturn(clusterName); + when(clusterName.value()).thenReturn("test"); method = RestRequest.Method.POST; diff --git a/src/test/java/org/opensearch/action/admin/indices/mapping/get/ValidateAnomalyDetectorActionHandlerTests.java b/src/test/java/org/opensearch/action/admin/indices/mapping/get/ValidateAnomalyDetectorActionHandlerTests.java index 2fea7b5db..a5d88ef53 100644 --- a/src/test/java/org/opensearch/action/admin/indices/mapping/get/ValidateAnomalyDetectorActionHandlerTests.java +++ b/src/test/java/org/opensearch/action/admin/indices/mapping/get/ValidateAnomalyDetectorActionHandlerTests.java @@ -43,6 +43,7 @@ import org.opensearch.ad.task.ADTaskManager; import org.opensearch.client.Client; import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -91,6 +92,7 @@ public class ValidateAnomalyDetectorActionHandlerTests extends AbstractTimeSerie @Mock protected ThreadPool threadPool; protected ThreadContext threadContext; + protected ClusterName mockClusterName; @SuppressWarnings("unchecked") @Override @@ -106,7 +108,9 @@ public void setUp() throws Exception { anomalyDetectionIndices = mock(ADIndexManagement.class); when(anomalyDetectionIndices.doesConfigIndexExist()).thenReturn(true); - + mockClusterName = mock(ClusterName.class); + when(clusterService.getClusterName()).thenReturn(mockClusterName); + when(mockClusterName.value()).thenReturn("test"); detectorId = "123"; seqNo = 0L; primaryTerm = 0L; diff --git a/src/test/java/org/opensearch/ad/transport/ValidateAnomalyDetectorTransportActionTests.java b/src/test/java/org/opensearch/ad/transport/ValidateAnomalyDetectorTransportActionTests.java index 8550dce8a..041c1c512 100644 --- a/src/test/java/org/opensearch/ad/transport/ValidateAnomalyDetectorTransportActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/ValidateAnomalyDetectorTransportActionTests.java @@ -490,9 +490,11 @@ public void testValidateAnomalyDetectorWithNonExistentTimefield() throws IOExcep ValidateConfigResponse response = client().execute(ValidateAnomalyDetectorAction.INSTANCE, request).actionGet(5_000); assertEquals(ValidationIssueType.TIMEFIELD_FIELD, response.getIssue().getType()); assertEquals(ValidationAspect.DETECTOR, response.getIssue().getAspect()); - assertEquals( - String.format(Locale.ROOT, CommonMessages.NON_EXISTENT_TIMESTAMP, anomalyDetector.getTimeField()), - response.getIssue().getMessage() + assertTrue( + response + .getIssue() + .getMessage() + .contains(String.format(Locale.ROOT, CommonMessages.INVALID_TIMESTAMP, anomalyDetector.getTimeField())) ); } @@ -513,9 +515,11 @@ public void testValidateAnomalyDetectorWithNonDateTimeField() throws IOException ValidateConfigResponse response = client().execute(ValidateAnomalyDetectorAction.INSTANCE, request).actionGet(5_000); assertEquals(ValidationIssueType.TIMEFIELD_FIELD, response.getIssue().getType()); assertEquals(ValidationAspect.DETECTOR, response.getIssue().getAspect()); - assertEquals( - String.format(Locale.ROOT, CommonMessages.INVALID_TIMESTAMP, anomalyDetector.getTimeField()), - response.getIssue().getMessage() + assertTrue( + response + .getIssue() + .getMessage() + .contains(String.format(Locale.ROOT, CommonMessages.INVALID_TIMESTAMP, anomalyDetector.getTimeField())) ); } diff --git a/src/test/java/org/opensearch/timeseries/util/CrossClusterConfigUtilsTests.java b/src/test/java/org/opensearch/timeseries/util/CrossClusterConfigUtilsTests.java new file mode 100644 index 000000000..19fe6ffd6 --- /dev/null +++ b/src/test/java/org/opensearch/timeseries/util/CrossClusterConfigUtilsTests.java @@ -0,0 +1,66 @@ +package org.opensearch.timeseries.util; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +import org.mockito.Mock; +import org.opensearch.client.Client; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.test.OpenSearchTestCase; + +public class CrossClusterConfigUtilsTests extends OpenSearchTestCase { + + @Mock + private Client clientMock; + + public void testGetClientForClusterLocalCluster() { + String clusterName = "localCluster"; + Client mockClient = mock(NodeClient.class); + String localClusterName = "localCluster"; + + Client result = CrossClusterConfigUtils.getClientForCluster(clusterName, mockClient, localClusterName); + + assertEquals(mockClient, result); + } + + public void testGetClientForClusterRemoteCluster() { + String clusterName = "remoteCluster"; + Client mockClient = mock(NodeClient.class); + // Client mockRemoteClient = mock(Client.class); + + when(mockClient.getRemoteClusterClient(clusterName)).thenReturn(mockClient); + + Client result = CrossClusterConfigUtils.getClientForCluster(clusterName, mockClient, "localCluster"); + + assertEquals(mockClient, result); + } + + public void testSeparateClusterIndexesRemoteCluster() { + List indexes = Arrays.asList("remoteCluster:index1", "index2"); + ClusterService mockClusterService = mock(ClusterService.class); + when(mockClusterService.getClusterName()).thenReturn(new ClusterName("localCluster")); + + HashMap> result = CrossClusterConfigUtils.separateClusterIndexes(indexes, mockClusterService); + + assertEquals(2, result.size()); + assertEquals(Arrays.asList("index1"), result.get("remoteCluster")); + assertEquals(Arrays.asList("index2"), result.get("localCluster")); + } + + public void testParseIndexName() { + assertEquals("index1", CrossClusterConfigUtils.parseIndexName("remoteCluster:index1")); + assertEquals("index2", CrossClusterConfigUtils.parseIndexName("index2")); + } + + public void testParseClusterName() { + assertEquals("remoteCluster", CrossClusterConfigUtils.parseClusterName("remoteCluster:index1")); + assertEquals("", CrossClusterConfigUtils.parseClusterName("index2")); + } +} From 988ebfb4867cf885ae958d5075cc2a78a7f64526 Mon Sep 17 00:00:00 2001 From: Amit Galitzky Date: Tue, 3 Sep 2024 14:28:01 -0700 Subject: [PATCH 2/2] adding more tests Signed-off-by: Amit Galitzky --- .../timeseries/constant/CommonMessages.java | 2 +- .../AbstractTimeSeriesActionHandler.java | 8 +-- .../util/CrossClusterConfigUtils.java | 36 +++++----- .../MultiResponsesDelegateActionListener.java | 2 + .../ValidateForecasterActionHandlerTests.java | 3 +- .../ad/rest/AnomalyDetectorRestApiIT.java | 67 +++++++++++++++++++ ...teAnomalyDetectorTransportActionTests.java | 5 +- .../util/CrossClusterConfigUtilsTests.java | 63 +++++++++-------- 8 files changed, 127 insertions(+), 59 deletions(-) diff --git a/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java b/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java index 16aa3fda2..8ffed43b7 100644 --- a/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java +++ b/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java @@ -76,7 +76,7 @@ public static String getTooManyCategoricalFieldErr(int limit) { public static final String INDEX_NOT_FOUND = "index does not exist"; public static final String FAIL_TO_GET_MAPPING_MSG = "Fail to get the index mapping of %s"; public static final String FAIL_TO_GET_MAPPING = "Fail to get the index mapping"; - public static final String TIMESTAMP_VALIDATION_FAILED = "Validation failed for timefield of %s "; + public static final String TIMESTAMP_VALIDATION_FAILED = "Validation failed for timefield of %s, "; public static final String FAIL_TO_GET_CONFIG_MSG = "Fail to get config"; diff --git a/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java b/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java index deb7fed0a..44e29f6ae 100644 --- a/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java +++ b/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java @@ -377,15 +377,11 @@ protected void validateTimeField(boolean indexingDryRun, ActionListener liste } multiGetMappingResponseListener - .onResponse( - new MergeableList>( - new ArrayList>(Collections.singletonList(Optional.empty())) - ) - ); + .onResponse(new MergeableList<>(new ArrayList<>(Collections.singletonList(Optional.empty())))); }, e -> { String errorMessage = String.format(Locale.ROOT, "Fail to get the index mapping of %s", clusterIndicesEntry.getValue()); logger.error(errorMessage, e); - multiGetMappingResponseListener.onFailure(new OpenSearchStatusException(errorMessage, RestStatus.BAD_REQUEST, e)); + multiGetMappingResponseListener.onFailure(new IllegalArgumentException(errorMessage, e)); }); clientUtil .executeWithInjectedSecurity( diff --git a/src/main/java/org/opensearch/timeseries/util/CrossClusterConfigUtils.java b/src/main/java/org/opensearch/timeseries/util/CrossClusterConfigUtils.java index 72cf2f400..61c97a814 100644 --- a/src/main/java/org/opensearch/timeseries/util/CrossClusterConfigUtils.java +++ b/src/main/java/org/opensearch/timeseries/util/CrossClusterConfigUtils.java @@ -9,6 +9,7 @@ import java.util.HashMap; import java.util.List; +import org.apache.commons.lang3.tuple.Pair; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.client.Client; @@ -39,9 +40,6 @@ public static Client getClientForCluster(String clusterName, Client client, Stri * @return The local {@link NodeClient} for the local cluster, or a remote client for a remote cluster. */ public static Client getClientForCluster(String clusterName, Client client, ClusterService clusterService) { - logger.info("clusterName1: " + clusterName); - logger.info("clusterService.getClusterName().value(): " + clusterService.getClusterName().value()); - return getClientForCluster(clusterName, client, clusterService.getClusterName().value()); } @@ -65,8 +63,12 @@ public static HashMap> separateClusterIndexes(List public static HashMap> separateClusterIndexes(List indexes, String localClusterName) { HashMap> output = new HashMap<>(); for (String index : indexes) { - String clusterName = parseClusterName(index); - String indexName = parseIndexName(index); + // Use the refactored method to get both cluster and index names in one call + Pair clusterAndIndex = parseClusterAndIndexName(index); + String clusterName = clusterAndIndex.getKey(); + String indexName = clusterAndIndex.getValue(); + logger.info("clusterName: " + clusterName); + logger.info("indexName: " + indexName); // If the index entry does not have a cluster_name, it indicates the index is on the local cluster. if (clusterName.isEmpty()) { @@ -78,25 +80,19 @@ public static HashMap> separateClusterIndexes(List } /** + * Parses the cluster and index names from the given input string. + * The input can be in either "cluster_name:index_name" format or just "index_name". * @param index The name of the index to evaluate. - * Can be in either cluster_name:index_name or index_name format. - * @return The index name. + * @return A Pair where the left is the cluster name (or empty if not present), and the right is the index name. */ - public static String parseIndexName(String index) { + public static Pair parseClusterAndIndexName(String index) { if (index.contains(":")) { - String[] parts = index.split(":"); - return parts.length > 1 ? parts[1] : index; + String[] parts = index.split(":", 2); + String clusterName = parts[0]; + String indexName = parts.length > 1 ? parts[1] : ""; + return Pair.of(clusterName, indexName); } else { - return index; + return Pair.of("", index); } } - - /** - * @param index The name of the index to evaluate. - * Can be in either cluster_name:index_name or index_name format. - * @return The index name. - */ - public static String parseClusterName(String index) { - return index.contains(":") ? index.substring(0, index.indexOf(':')) : ""; - } } diff --git a/src/main/java/org/opensearch/timeseries/util/MultiResponsesDelegateActionListener.java b/src/main/java/org/opensearch/timeseries/util/MultiResponsesDelegateActionListener.java index c2c1d7e19..7dd830435 100644 --- a/src/main/java/org/opensearch/timeseries/util/MultiResponsesDelegateActionListener.java +++ b/src/main/java/org/opensearch/timeseries/util/MultiResponsesDelegateActionListener.java @@ -11,6 +11,8 @@ package org.opensearch.timeseries.util; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.concurrent.atomic.AtomicInteger; diff --git a/src/test/java/org/opensearch/action/admin/indices/mapping/get/ValidateForecasterActionHandlerTests.java b/src/test/java/org/opensearch/action/admin/indices/mapping/get/ValidateForecasterActionHandlerTests.java index e179a326f..10077f462 100644 --- a/src/test/java/org/opensearch/action/admin/indices/mapping/get/ValidateForecasterActionHandlerTests.java +++ b/src/test/java/org/opensearch/action/admin/indices/mapping/get/ValidateForecasterActionHandlerTests.java @@ -22,6 +22,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.action.ActionResponse; import org.opensearch.forecast.rest.handler.ValidateForecasterActionHandler; +import org.opensearch.timeseries.common.exception.ValidationException; import org.opensearch.timeseries.model.ValidationAspect; public class ValidateForecasterActionHandlerTests extends AbstractForecasterActionHandlerTestCase { @@ -100,7 +101,7 @@ public void doE assertTrue("should not reach here", false); inProgressLatch.countDown(); }, e -> { - assertTrue(e instanceof IllegalArgumentException); + assertTrue(e instanceof ValidationException); inProgressLatch.countDown(); })); assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); diff --git a/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java b/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java index 95cd0917a..d53b68011 100644 --- a/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java +++ b/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java @@ -1377,6 +1377,73 @@ public void testValidateAnomalyDetectorWithNoTimeField() throws Exception { assertEquals("time field missing", CommonMessages.NULL_TIME_FIELD, messageMap.get("time_field").get("message")); } + public void testValidateAnomalyDetectorWithMultipleIndicesOneNotFound() throws Exception { + TestHelpers.createIndex(client(), "test-index", TestHelpers.toHttpEntity("{\"timestamp\": " + Instant.now().toEpochMilli() + "}")); + Response resp = TestHelpers + .makeRequest( + client(), + "POST", + TestHelpers.AD_BASE_DETECTORS_URI + "/_validate", + ImmutableMap.of(), + TestHelpers + .toHttpEntity( + "{\"name\":\"" + + "test-detector" + + "\",\"description\":\"Test detector\",\"time_field\":\"timestamp\"," + + "\"indices\":[\"test-index\", \"test-index-2\"],\"feature_attributes\":[{\"feature_name\":\"cpu-sum\",\"" + + "feature_enabled\":true,\"aggregation_query\":{\"total_cpu\":{\"sum\":{\"field\":\"cpu\"}}}}," + + "{\"feature_name\":\"error-sum\",\"feature_enabled\":true,\"aggregation_query\":" + + "{\"total_error\":" + + "{\"sum\":{\"field\":\"error\"}}}}],\"filter_query\":{\"bool\":{\"filter\":[{\"exists\":" + + "{\"field\":" + + "\"cpu\",\"boost\":1}}],\"adjust_pure_negative\":true,\"boost\":1}},\"detection_interval\":" + + "{\"period\":{\"interval\":1,\"unit\":\"Minutes\"}}," + + "\"window_delay\":{\"period\":{\"interval\":2,\"unit\":\"Minutes\"}}," + + "\"shingle_size\": 8}" + ), + null + ); + Map responseMap = entityAsMap(resp); + + @SuppressWarnings("unchecked") + Map> messageMap = (Map>) XContentMapValues + .extractValue("detector", responseMap); + String errorMessage = "index does not exist"; + assertEquals("index does not exist", errorMessage, messageMap.get("indices").get("message")); + } + + public void testValidateAnomalyDetectorWithMultipleIndices() throws Exception { + TestHelpers.createIndexWithTimeField(client(), "test-index", TIME_FIELD); + TestHelpers.createIndexWithTimeField(client(), "test-index-2", TIME_FIELD); + + Response resp = TestHelpers + .makeRequest( + client(), + "POST", + TestHelpers.AD_BASE_DETECTORS_URI + "/_validate", + ImmutableMap.of(), + TestHelpers + .toHttpEntity( + "{\"name\":\"" + + "test-detector" + + "\",\"description\":\"Test detector\",\"time_field\":\"timestamp\"," + + "\"indices\":[\"test-index\", \"test-index-2\"],\"feature_attributes\":[{\"feature_name\":\"cpu-sum\",\"" + + "feature_enabled\":true,\"aggregation_query\":{\"total_cpu\":{\"sum\":{\"field\":\"cpu\"}}}}," + + "{\"feature_name\":\"error-sum\",\"feature_enabled\":true,\"aggregation_query\":" + + "{\"total_error\":" + + "{\"sum\":{\"field\":\"error\"}}}}],\"filter_query\":{\"bool\":{\"filter\":[{\"exists\":" + + "{\"field\":" + + "\"cpu\",\"boost\":1}}],\"adjust_pure_negative\":true,\"boost\":1}},\"detection_interval\":" + + "{\"period\":{\"interval\":1,\"unit\":\"Minutes\"}}," + + "\"window_delay\":{\"period\":{\"interval\":2,\"unit\":\"Minutes\"}}," + + "\"shingle_size\": 8}" + ), + null + ); + Map responseMap = entityAsMap(resp); + assertEquals("no issue, empty response body", new HashMap(), responseMap); + } + public void testValidateAnomalyDetectorWithIncorrectShingleSize() throws Exception { TestHelpers.createIndex(client(), "test-index", TestHelpers.toHttpEntity("{\"timestamp\": " + Instant.now().toEpochMilli() + "}")); Response resp = TestHelpers diff --git a/src/test/java/org/opensearch/ad/transport/ValidateAnomalyDetectorTransportActionTests.java b/src/test/java/org/opensearch/ad/transport/ValidateAnomalyDetectorTransportActionTests.java index 041c1c512..53f6f0ab5 100644 --- a/src/test/java/org/opensearch/ad/transport/ValidateAnomalyDetectorTransportActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/ValidateAnomalyDetectorTransportActionTests.java @@ -491,10 +491,7 @@ public void testValidateAnomalyDetectorWithNonExistentTimefield() throws IOExcep assertEquals(ValidationIssueType.TIMEFIELD_FIELD, response.getIssue().getType()); assertEquals(ValidationAspect.DETECTOR, response.getIssue().getAspect()); assertTrue( - response - .getIssue() - .getMessage() - .contains(String.format(Locale.ROOT, CommonMessages.INVALID_TIMESTAMP, anomalyDetector.getTimeField())) + response.getIssue().getMessage().contains("Timestamp field: (" + anomalyDetector.getTimeField() + ") is not found in the") ); } diff --git a/src/test/java/org/opensearch/timeseries/util/CrossClusterConfigUtilsTests.java b/src/test/java/org/opensearch/timeseries/util/CrossClusterConfigUtilsTests.java index 19fe6ffd6..dfc46c6d8 100644 --- a/src/test/java/org/opensearch/timeseries/util/CrossClusterConfigUtilsTests.java +++ b/src/test/java/org/opensearch/timeseries/util/CrossClusterConfigUtilsTests.java @@ -1,14 +1,23 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.timeseries.util; -import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.Arrays; import java.util.HashMap; import java.util.List; -import org.mockito.Mock; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.Before; import org.opensearch.client.Client; import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.ClusterName; @@ -17,50 +26,50 @@ public class CrossClusterConfigUtilsTests extends OpenSearchTestCase { - @Mock - private Client clientMock; - - public void testGetClientForClusterLocalCluster() { - String clusterName = "localCluster"; - Client mockClient = mock(NodeClient.class); - String localClusterName = "localCluster"; + private Client mockClient; + private String remoteClusterName; + private String localClusterName; - Client result = CrossClusterConfigUtils.getClientForCluster(clusterName, mockClient, localClusterName); + @Before + public void setup() { + // Initialize the mock clients + mockClient = mock(NodeClient.class); + localClusterName = "localCluster"; + remoteClusterName = "remoteCluster"; + } + public void testGetClientForClusterLocalCluster() { + Client result = CrossClusterConfigUtils.getClientForCluster(localClusterName, mockClient, localClusterName); assertEquals(mockClient, result); + verify(mockClient, never()).getRemoteClusterClient(anyString()); } public void testGetClientForClusterRemoteCluster() { - String clusterName = "remoteCluster"; Client mockClient = mock(NodeClient.class); - // Client mockRemoteClient = mock(Client.class); + CrossClusterConfigUtils.getClientForCluster(remoteClusterName, mockClient, localClusterName); - when(mockClient.getRemoteClusterClient(clusterName)).thenReturn(mockClient); - - Client result = CrossClusterConfigUtils.getClientForCluster(clusterName, mockClient, "localCluster"); - - assertEquals(mockClient, result); + // Verify that getRemoteClusterClient was called once with the correct cluster name + verify(mockClient, times(1)).getRemoteClusterClient("remoteCluster"); + when(mockClient.getRemoteClusterClient(remoteClusterName)).thenReturn(mockClient); } public void testSeparateClusterIndexesRemoteCluster() { - List indexes = Arrays.asList("remoteCluster:index1", "index2"); + List indexes = Arrays.asList("remoteCluster:index1", "index2", "remoteCluster2:index2"); ClusterService mockClusterService = mock(ClusterService.class); when(mockClusterService.getClusterName()).thenReturn(new ClusterName("localCluster")); HashMap> result = CrossClusterConfigUtils.separateClusterIndexes(indexes, mockClusterService); - assertEquals(2, result.size()); + assertEquals(3, result.size()); assertEquals(Arrays.asList("index1"), result.get("remoteCluster")); assertEquals(Arrays.asList("index2"), result.get("localCluster")); + assertEquals(Arrays.asList("index2"), result.get("remoteCluster2")); } - public void testParseIndexName() { - assertEquals("index1", CrossClusterConfigUtils.parseIndexName("remoteCluster:index1")); - assertEquals("index2", CrossClusterConfigUtils.parseIndexName("index2")); - } - - public void testParseClusterName() { - assertEquals("remoteCluster", CrossClusterConfigUtils.parseClusterName("remoteCluster:index1")); - assertEquals("", CrossClusterConfigUtils.parseClusterName("index2")); + public void testParseClusterAndIndexName_WithClusterAndIndex() { + String input = "clusterA:index1"; + Pair result = CrossClusterConfigUtils.parseClusterAndIndexName(input); + assertEquals("clusterA", result.getKey()); + assertEquals("index1", result.getValue()); } }