From 3d6b01f8d3b05d6ece3c11dc3b0950a0314bed52 Mon Sep 17 00:00:00 2001 From: Hailong Cui Date: Mon, 15 Jan 2024 18:10:55 +0800 Subject: [PATCH] update index name and error handling Signed-off-by: Hailong Cui --- .../java/org/opensearch/agent/ToolPlugin.java | 4 +-- .../agent/indices/IndicesHelper.java | 2 +- .../agent/indices/SkillsIndexEnum.java | 8 ++--- .../opensearch/agent/job/AgentMonitorJob.java | 1 + .../agent/job/IndexSummaryEmbeddingJob.java | 36 ++++++++++++++++--- .../org/opensearch/agent/job/MLClients.java | 13 +++++-- .../SkillsClusterManagerEventListener.java | 7 ++-- .../agent/tools/IndexRoutingTool.java | 29 ++++++++++++--- .../agent/tools/utils/LLMProvider.java | 2 +- ...plugins-skills-index-summary-mapping.json} | 0 ...plugins-skills-index-summary-setting.json} | 0 11 files changed, 79 insertions(+), 23 deletions(-) rename src/main/resources/{.index_summary_embedding_index_mapping.json => .plugins-skills-index-summary-mapping.json} (100%) rename src/main/resources/{.index_summary_embedding_index_setting.json => .plugins-skills-index-summary-setting.json} (100%) diff --git a/src/main/java/org/opensearch/agent/ToolPlugin.java b/src/main/java/org/opensearch/agent/ToolPlugin.java index 909985bc..c4186511 100644 --- a/src/main/java/org/opensearch/agent/ToolPlugin.java +++ b/src/main/java/org/opensearch/agent/ToolPlugin.java @@ -72,7 +72,7 @@ public Collection createComponents( this.clusterService = clusterService; this.xContentRegistry = xContentRegistry; - mlClients = new MLClients(client, xContentRegistry); + mlClients = new MLClients(client, xContentRegistry, clusterService); indicesHelper = new IndicesHelper(clusterService, client, mlClients); SkillsClusterManagerEventListener clusterManagerEventListener = new SkillsClusterManagerEventListener( clusterService, @@ -94,7 +94,7 @@ public Collection createComponents( SearchAnomalyDetectorsTool.Factory.getInstance().init(client); SearchAnomalyResultsTool.Factory.getInstance().init(client); SearchMonitorsTool.Factory.getInstance().init(client); - IndexRoutingTool.Factory.getInstance().init(client, xContentRegistry); + IndexRoutingTool.Factory.getInstance().init(client, xContentRegistry, clusterService); return List.of(clusterManagerEventListener); } diff --git a/src/main/java/org/opensearch/agent/indices/IndicesHelper.java b/src/main/java/org/opensearch/agent/indices/IndicesHelper.java index efec0334..d3c60a39 100644 --- a/src/main/java/org/opensearch/agent/indices/IndicesHelper.java +++ b/src/main/java/org/opensearch/agent/indices/IndicesHelper.java @@ -56,7 +56,7 @@ public class IndicesHelper { } public void initIndexSummaryEmbeddingIndex(ActionListener listener) { - initIndexIfAbsent(SkillsIndexEnum.SKILLS_INDEX_SUMMARY_EMBEDDING_INDEX, listener); + initIndexIfAbsent(SkillsIndexEnum.SKILLS_INDEX_SUMMARY, listener); } public void initIndexIfAbsent(SkillsIndexEnum skillsIndexEnum, ActionListener listener) { diff --git a/src/main/java/org/opensearch/agent/indices/SkillsIndexEnum.java b/src/main/java/org/opensearch/agent/indices/SkillsIndexEnum.java index 4a8f9e99..8c2fcbb5 100644 --- a/src/main/java/org/opensearch/agent/indices/SkillsIndexEnum.java +++ b/src/main/java/org/opensearch/agent/indices/SkillsIndexEnum.java @@ -15,10 +15,10 @@ @Getter public enum SkillsIndexEnum { - SKILLS_INDEX_SUMMARY_EMBEDDING_INDEX( - ".index_summary_embedding_index", - "/.index_summary_embedding_index_setting.json", - "/.index_summary_embedding_index_mapping.json", + SKILLS_INDEX_SUMMARY( + ".plugins-skills-index-summary", + "/.plugins-skills-index-summary-setting.json", + "/.plugins-skills-index-summary-mapping.json", 0 ); diff --git a/src/main/java/org/opensearch/agent/job/AgentMonitorJob.java b/src/main/java/org/opensearch/agent/job/AgentMonitorJob.java index a22f7e29..59f31249 100644 --- a/src/main/java/org/opensearch/agent/job/AgentMonitorJob.java +++ b/src/main/java/org/opensearch/agent/job/AgentMonitorJob.java @@ -56,6 +56,7 @@ public void run() { embeddingModelIds.removeAll(EMBEDDING_MODEL_IDS); if (!embeddingModelIds.isEmpty()) { IndexSummaryEmbeddingJob job = new IndexSummaryEmbeddingJob(client, clusterService, indicesHelper, mlClients); + job.setAdhocModelIds(embeddingModelIds); threadPool.schedule(job, TimeValue.timeValueSeconds(5), GENERIC); } }, exception -> log.info("Query agent for index routing tool failed.", exception))); diff --git a/src/main/java/org/opensearch/agent/job/IndexSummaryEmbeddingJob.java b/src/main/java/org/opensearch/agent/job/IndexSummaryEmbeddingJob.java index 587bf896..9f9c84c9 100644 --- a/src/main/java/org/opensearch/agent/job/IndexSummaryEmbeddingJob.java +++ b/src/main/java/org/opensearch/agent/job/IndexSummaryEmbeddingJob.java @@ -5,7 +5,7 @@ package org.opensearch.agent.job; -import static org.opensearch.agent.indices.SkillsIndexEnum.SKILLS_INDEX_SUMMARY_EMBEDDING_INDEX; +import static org.opensearch.agent.indices.SkillsIndexEnum.SKILLS_INDEX_SUMMARY; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -63,7 +63,7 @@ public class IndexSummaryEmbeddingJob implements Runnable { private IndicesHelper indicesHelper; private MLClients mlClients; - public static String INDEX_SUMMARY_EMBEDDING_INDEX = ".index_summary_embedding_index"; + public static String INDEX_SUMMARY_EMBEDDING_INDEX = SKILLS_INDEX_SUMMARY.getIndexName(); public static String INDEX_SUMMARY_EMBEDDING_FIELD_PREFIX = "index_summary_embedding"; public static String INDEX_NAME_FIELD = "index_name"; public static String DATA_STREAM_FIELD = "data_stream"; @@ -79,6 +79,7 @@ public class IndexSummaryEmbeddingJob implements Runnable { public static String INDEX_EMBEDDING = "embedding"; public static String SENTENCE_EMBEDDING = "sentence_embedding"; public static int DEFAULT_TIMEOUT_SECOND = 30; + public static int TOKEN_LIMIT = 8192; @Setter private List adhocIndexName; @@ -96,6 +97,7 @@ public IndexSummaryEmbeddingJob(Client client, ClusterService clusterService, In @Override public void run() { + // TODO to distribute to other nodes to execute the workload other than cluster manager node // search agent with IndexRoutingTool mlClients.getModelIdsForIndexRoutingTool(adhocModelIds, ActionListener.wrap(embeddingModelIds -> { // no embedding model @@ -205,9 +207,19 @@ private List> getAllIndexMappingAndSampleData() { SearchResponse searchResponse = searchFuture.get(DEFAULT_TIMEOUT_SECOND, TimeUnit.SECONDS); List documents = new ArrayList<>(); - // TODO add token limit check + int tokenNumber = 0; for (SearchHit hit : searchResponse.getHits()) { - documents.add(Strings.toString(MediaTypeRegistry.JSON, hit)); + String docContent = Strings.toString(MediaTypeRegistry.JSON, hit); + int tokens = countToken(docContent); + if (tokenNumber + tokens > TOKEN_LIMIT) { + // at least 1 sample data + if (documents.isEmpty()) { + documents.add(docContent); + } + break; + } + documents.add(docContent); + tokenNumber += tokens; } String indexSummary = String.format(Locale.ROOT, "Index Mappings:%s\\nSample data:\\n%s", mapping, documents); @@ -224,6 +236,20 @@ private List> getAllIndexMappingAndSampleData() { return indexSummaryList; } + /** + * 1 token ~= 4 chars in English + * 1 token ~= ¾ words + * 100 tokens ~= 75 words + * @param sentence + * @return + */ + private int countToken(String sentence) { + if (sentence == null) { + return 0; + } + return sentence.getBytes(StandardCharsets.UTF_8).length / 4; + } + private void indexSummaryVector(String writeIndex, List> docs, String modelId) { // init index and mapping indicesHelper.initIndexSummaryEmbeddingIndex(ActionListener.wrap(initialed -> { @@ -240,7 +266,7 @@ private void indexSummaryVector(String writeIndex, List> doc } private void BulkUpdateVectorField(String writeIndex, List> docs, String modelId) { - indicesHelper.addNewVectorField(SKILLS_INDEX_SUMMARY_EMBEDDING_INDEX.getIndexName(), modelId, ActionListener.wrap(r -> { + indicesHelper.addNewVectorField(SKILLS_INDEX_SUMMARY.getIndexName(), modelId, ActionListener.wrap(r -> { if (r) { bulkWrite(writeIndex, docs, modelId); } else { diff --git a/src/main/java/org/opensearch/agent/job/MLClients.java b/src/main/java/org/opensearch/agent/job/MLClients.java index 289aee4a..0250e071 100644 --- a/src/main/java/org/opensearch/agent/job/MLClients.java +++ b/src/main/java/org/opensearch/agent/job/MLClients.java @@ -25,6 +25,7 @@ import org.opensearch.agent.tools.IndexRoutingTool; import org.opensearch.client.Client; import org.opensearch.client.Requests; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.action.ActionFuture; import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.XContentHelper; @@ -35,6 +36,7 @@ import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.query.QueryBuilders; import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.ml.common.CommonValue; import org.opensearch.ml.common.FunctionName; import org.opensearch.ml.common.MLTaskState; import org.opensearch.ml.common.agent.MLAgent; @@ -64,9 +66,12 @@ public class MLClients { private NamedXContentRegistry xContentRegistry; - public MLClients(Client client, NamedXContentRegistry xContentRegistry) { + private ClusterService clusterService; + + public MLClients(Client client, NamedXContentRegistry xContentRegistry, ClusterService clusterService) { this.client = client; this.xContentRegistry = xContentRegistry; + this.clusterService = clusterService; } public T getEmbeddingResult(String modelId, List texts, boolean deploy, Function parser) { @@ -123,6 +128,10 @@ public void getModelIdsForIndexRoutingTool(List adhocModelIds, ActionLis } public void getModelIdsForIndexRoutingTool(ActionListener> listener) { + if (!clusterService.state().metadata().hasIndex(CommonValue.ML_AGENT_INDEX)) { + listener.onResponse(Collections.emptyList()); + return; + } // search agent with IndexRoutingTool SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); String termQueryKey = String.format(Locale.ROOT, "%s.%s", MLAgent.TOOLS_FIELD, MLToolSpec.TOOL_TYPE_FIELD); @@ -143,7 +152,7 @@ public void getModelIdsForIndexRoutingTool(ActionListener> listener } listener.onResponse(new ArrayList<>(embeddingModelIds)); }, ex -> { - if (ExceptionsHelper.unwrap(ex) instanceof IndexNotFoundException) { + if (ExceptionsHelper.unwrapCause(ex) instanceof IndexNotFoundException) { listener.onResponse(Collections.emptyList()); } else { listener.onFailure(ex); diff --git a/src/main/java/org/opensearch/agent/job/SkillsClusterManagerEventListener.java b/src/main/java/org/opensearch/agent/job/SkillsClusterManagerEventListener.java index 7fcfd73a..05e03551 100644 --- a/src/main/java/org/opensearch/agent/job/SkillsClusterManagerEventListener.java +++ b/src/main/java/org/opensearch/agent/job/SkillsClusterManagerEventListener.java @@ -5,7 +5,7 @@ package org.opensearch.agent.job; -import static org.opensearch.agent.indices.SkillsIndexEnum.SKILLS_INDEX_SUMMARY_EMBEDDING_INDEX; +import static org.opensearch.agent.indices.SkillsIndexEnum.SKILLS_INDEX_SUMMARY; import static org.opensearch.threadpool.ThreadPool.Names.GENERIC; import java.util.List; @@ -122,10 +122,9 @@ public void clusterChanged(ClusterChangedEvent event) { threadPool.schedule(job, TimeValue.timeValueSeconds(30), GENERIC); } - if (!event.indicesDeleted().isEmpty() - && clusterService.state().metadata().hasIndex(SKILLS_INDEX_SUMMARY_EMBEDDING_INDEX.getIndexName())) { + if (!event.indicesDeleted().isEmpty() && clusterService.state().metadata().hasIndex(SKILLS_INDEX_SUMMARY.getIndexName())) { List indexNames = event.indicesDeleted().stream().map(Index::getName).collect(Collectors.toList()); - job.bulkDelete(SKILLS_INDEX_SUMMARY_EMBEDDING_INDEX.getIndexName(), indexNames); + job.bulkDelete(SKILLS_INDEX_SUMMARY.getIndexName(), indexNames); } } } diff --git a/src/main/java/org/opensearch/agent/tools/IndexRoutingTool.java b/src/main/java/org/opensearch/agent/tools/IndexRoutingTool.java index 26849e0c..5b2b4cfa 100644 --- a/src/main/java/org/opensearch/agent/tools/IndexRoutingTool.java +++ b/src/main/java/org/opensearch/agent/tools/IndexRoutingTool.java @@ -32,6 +32,7 @@ import org.opensearch.agent.job.MLClients; import org.opensearch.agent.tools.utils.LLMProvider; import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.io.Streams; import org.opensearch.core.action.ActionListener; import org.opensearch.core.xcontent.NamedXContentRegistry; @@ -75,6 +76,8 @@ public class IndexRoutingTool extends VectorDBTool { private final MLClients mlClients; + private final ClusterService clusterService; + @Setter private String prompt; @@ -84,7 +87,8 @@ public IndexRoutingTool( Integer docSize, Integer k, String embeddingModelId, - String inferenceModelId + String inferenceModelId, + ClusterService clusterService ) { super( client, @@ -101,8 +105,9 @@ public IndexRoutingTool( embeddingModelId, Optional.ofNullable(k).orElse(DEFAULT_K) ); - this.mlClients = new MLClients(client, xContentRegistry); + this.mlClients = new MLClients(client, xContentRegistry, clusterService); this.inferenceModelId = inferenceModelId; + this.clusterService = clusterService; } @Override @@ -128,6 +133,11 @@ protected Parser searchResponseParser() { @Override public void run(Map parameters, ActionListener listener) { log.debug("input={}", parameters.get(INPUT_FIELD)); + if (!clusterService.state().metadata().hasIndex(IndexSummaryEmbeddingJob.INDEX_SUMMARY_EMBEDDING_INDEX)) { + log.debug("Index summary index not exists, return not sure directly"); + listener.onResponse((T) "Not sure"); + return; + } // get index of knn-index super.run(parameters, ActionListener.wrap(res -> { List> summaries = (List>) res; @@ -240,6 +250,8 @@ public static class Factory implements Tool.Factory { private Client client; private NamedXContentRegistry xContentRegistry; + private ClusterService clusterService; + private static IndexRoutingTool.Factory INSTANCE; public static IndexRoutingTool.Factory getInstance() { @@ -255,9 +267,10 @@ public static IndexRoutingTool.Factory getInstance() { } } - public void init(Client client, NamedXContentRegistry xContentRegistry) { + public void init(Client client, NamedXContentRegistry xContentRegistry, ClusterService clusterService) { this.client = client; this.xContentRegistry = xContentRegistry; + this.clusterService = clusterService; } @Override @@ -282,7 +295,15 @@ public IndexRoutingTool create(Map params) { Integer docSize = params.containsKey(DOC_SIZE_FIELD) ? Integer.parseInt((String) params.get(DOC_SIZE_FIELD)) : DEFAULT_K; Integer k = params.containsKey(K_FIELD) ? Integer.parseInt((String) params.get(K_FIELD)) : DEFAULT_K; - IndexRoutingTool tool = new IndexRoutingTool(client, xContentRegistry, docSize, k, embeddingModelId, inferenceModelId); + IndexRoutingTool tool = new IndexRoutingTool( + client, + xContentRegistry, + docSize, + k, + embeddingModelId, + inferenceModelId, + clusterService + ); tool.setPrompt(llmProvider.getPromptFormat().replace("${prompt}", promptTemplate)); return tool; diff --git a/src/main/java/org/opensearch/agent/tools/utils/LLMProvider.java b/src/main/java/org/opensearch/agent/tools/utils/LLMProvider.java index c36c159f..dd8a5b22 100644 --- a/src/main/java/org/opensearch/agent/tools/utils/LLMProvider.java +++ b/src/main/java/org/opensearch/agent/tools/utils/LLMProvider.java @@ -17,7 +17,7 @@ @FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE) public enum LLMProvider { OPENAI("${prompt}"), - ANTHROPIC("\\n\\nHuman: ${prompt} \\n\\nAssistant:"), + ANTHROPIC("\n\nHuman: ${prompt} \n\nAssistant:"), MISTRAL("[INST] ${prompt} [/INST]"), NONE("${prompt}"); diff --git a/src/main/resources/.index_summary_embedding_index_mapping.json b/src/main/resources/.plugins-skills-index-summary-mapping.json similarity index 100% rename from src/main/resources/.index_summary_embedding_index_mapping.json rename to src/main/resources/.plugins-skills-index-summary-mapping.json diff --git a/src/main/resources/.index_summary_embedding_index_setting.json b/src/main/resources/.plugins-skills-index-summary-setting.json similarity index 100% rename from src/main/resources/.index_summary_embedding_index_setting.json rename to src/main/resources/.plugins-skills-index-summary-setting.json