From 4616dfac5382062a5149c799554dda4ff601369f Mon Sep 17 00:00:00 2001 From: Andrew Ross Date: Wed, 30 Nov 2022 11:47:24 -0800 Subject: [PATCH] Fix 1.x compatibility bug with stored Tasks (#5412) When the new 'cancelled' field was introduced it was a miss not to increment the version number on the mapping definitions for the .tasks index. This commit fixes that oversight, as well as modifies the existing backward compatiblity test to ensure that it will catch future mistakes like this one. Closes #5376 Signed-off-by: Andrew Ross --- CHANGELOG.md | 1 + .../upgrades/SystemIndicesUpgradeIT.java | 53 +++++++++++-------- .../opensearch/tasks/TaskResultsService.java | 2 +- .../opensearch/tasks/task-index-mapping.json | 2 +- 4 files changed, 35 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index aaa53632a4c08..28548021d8aa9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -97,6 +97,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Deprecated ### Removed ### Fixed +- Fix 1.x compatibility bug with stored Tasks ([#5412](https://github.com/opensearch-project/OpenSearch/pull/5412)) ### Security [Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.4...HEAD diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/SystemIndicesUpgradeIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/SystemIndicesUpgradeIT.java index 8bebb3881e3fd..2d26238763a09 100644 --- a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/SystemIndicesUpgradeIT.java +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/SystemIndicesUpgradeIT.java @@ -32,14 +32,16 @@ package org.opensearch.upgrades; -import org.opensearch.LegacyESVersion; -import org.opensearch.Version; +import org.hamcrest.MatcherAssert; import org.opensearch.client.Request; +import org.opensearch.client.Response; import org.opensearch.client.ResponseException; import org.opensearch.test.XContentTestUtils.JsonMapView; +import java.io.IOException; import java.util.Map; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -62,25 +64,7 @@ public void testSystemIndicesUpgrades() throws Exception { "{\"f1\": \"v1\", \"f2\": \"v2\"}\n"); client().performRequest(bulk); - // start a async reindex job - Request reindex = new Request("POST", "/_reindex"); - reindex.setJsonEntity( - "{\n" + - " \"source\":{\n" + - " \"index\":\"test_index_old\"\n" + - " },\n" + - " \"dest\":{\n" + - " \"index\":\"test_index_reindex\"\n" + - " }\n" + - "}"); - reindex.addParameter("wait_for_completion", "false"); - Map response = entityAsMap(client().performRequest(reindex)); - String taskId = (String) response.get("task"); - - // wait for task - Request getTask = new Request("GET", "/_tasks/" + taskId); - getTask.addParameter("wait_for_completion", "true"); - client().performRequest(getTask); + createAndVerifyStoredTask(); // make sure .tasks index exists Request getTasksIndex = new Request("GET", "/.tasks"); @@ -97,6 +81,8 @@ public void testSystemIndicesUpgrades() throws Exception { } }); } else if (CLUSTER_TYPE == ClusterType.UPGRADED) { + createAndVerifyStoredTask(); + assertBusy(() -> { Request clusterStateRequest = new Request("GET", "/_cluster/state/metadata"); Map indices = new JsonMapView(entityAsMap(client().performRequest(clusterStateRequest))) @@ -115,4 +101,29 @@ public void testSystemIndicesUpgrades() throws Exception { }); } } + + /** + * Completed tasks get persisted into the .tasks index, so this method waits + * until the task is completed in order to verify that it has been successfully + * written to the index and can be retrieved. + */ + private static void createAndVerifyStoredTask() throws Exception { + // Use update by query to create an async task + final Request updateByQueryRequest = new Request("POST", "/test_index_old/_update_by_query"); + updateByQueryRequest.addParameter("wait_for_completion", "false"); + final Response updateByQueryResponse = client().performRequest(updateByQueryRequest); + MatcherAssert.assertThat(updateByQueryResponse.getStatusLine().getStatusCode(), equalTo(200)); + final String taskId = (String) entityAsMap(updateByQueryResponse).get("task"); + + // wait for task to complete + waitUntil(() -> { + try { + final Response getTaskResponse = client().performRequest(new Request("GET", "/_tasks/" + taskId)); + MatcherAssert.assertThat(getTaskResponse.getStatusLine().getStatusCode(), equalTo(200)); + return (Boolean) entityAsMap(getTaskResponse).get("completed"); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + } } diff --git a/server/src/main/java/org/opensearch/tasks/TaskResultsService.java b/server/src/main/java/org/opensearch/tasks/TaskResultsService.java index 66d3aeb748cf7..accc02624f71c 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskResultsService.java +++ b/server/src/main/java/org/opensearch/tasks/TaskResultsService.java @@ -86,7 +86,7 @@ public class TaskResultsService { public static final String TASK_RESULT_MAPPING_VERSION_META_FIELD = "version"; - public static final int TASK_RESULT_MAPPING_VERSION = 3; // must match version in task-index-mapping.json + public static final int TASK_RESULT_MAPPING_VERSION = 4; // must match version in task-index-mapping.json /** * The backoff policy to use when saving a task result fails. The total wait diff --git a/server/src/main/resources/org/opensearch/tasks/task-index-mapping.json b/server/src/main/resources/org/opensearch/tasks/task-index-mapping.json index 54e9d39902f03..58b6b2d3bc873 100644 --- a/server/src/main/resources/org/opensearch/tasks/task-index-mapping.json +++ b/server/src/main/resources/org/opensearch/tasks/task-index-mapping.json @@ -1,7 +1,7 @@ { "_doc" : { "_meta": { - "version": 3 + "version": 4 }, "dynamic" : "strict", "properties" : {