Skip to content

Commit

Permalink
Fix 1.x compatibility bug with stored Tasks (opensearch-project#5412)
Browse files Browse the repository at this point in the history
When the new 'cancelled' field was introduced it was a miss not to
increment the version number on the mapping definitions for the .tasks
index. This commit fixes that oversight, as well as modifies the
existing backward compatiblity test to ensure that it will catch future
mistakes like this one.

Closes opensearch-project#5376

Signed-off-by: Andrew Ross <andrross@amazon.com>
  • Loading branch information
andrross authored Nov 30, 2022
1 parent c7ba253 commit 4616dfa
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Deprecated
### Removed
### Fixed
- Fix 1.x compatibility bug with stored Tasks ([#5412](https://github.com/opensearch-project/OpenSearch/pull/5412))
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.4...HEAD
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,16 @@

package org.opensearch.upgrades;

import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.hamcrest.MatcherAssert;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.test.XContentTestUtils.JsonMapView;

import java.io.IOException;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
Expand All @@ -62,25 +64,7 @@ public void testSystemIndicesUpgrades() throws Exception {
"{\"f1\": \"v1\", \"f2\": \"v2\"}\n");
client().performRequest(bulk);

// start a async reindex job
Request reindex = new Request("POST", "/_reindex");
reindex.setJsonEntity(
"{\n" +
" \"source\":{\n" +
" \"index\":\"test_index_old\"\n" +
" },\n" +
" \"dest\":{\n" +
" \"index\":\"test_index_reindex\"\n" +
" }\n" +
"}");
reindex.addParameter("wait_for_completion", "false");
Map<String, Object> response = entityAsMap(client().performRequest(reindex));
String taskId = (String) response.get("task");

// wait for task
Request getTask = new Request("GET", "/_tasks/" + taskId);
getTask.addParameter("wait_for_completion", "true");
client().performRequest(getTask);
createAndVerifyStoredTask();

// make sure .tasks index exists
Request getTasksIndex = new Request("GET", "/.tasks");
Expand All @@ -97,6 +81,8 @@ public void testSystemIndicesUpgrades() throws Exception {
}
});
} else if (CLUSTER_TYPE == ClusterType.UPGRADED) {
createAndVerifyStoredTask();

assertBusy(() -> {
Request clusterStateRequest = new Request("GET", "/_cluster/state/metadata");
Map<String, Object> indices = new JsonMapView(entityAsMap(client().performRequest(clusterStateRequest)))
Expand All @@ -115,4 +101,29 @@ public void testSystemIndicesUpgrades() throws Exception {
});
}
}

/**
* Completed tasks get persisted into the .tasks index, so this method waits
* until the task is completed in order to verify that it has been successfully
* written to the index and can be retrieved.
*/
private static void createAndVerifyStoredTask() throws Exception {
// Use update by query to create an async task
final Request updateByQueryRequest = new Request("POST", "/test_index_old/_update_by_query");
updateByQueryRequest.addParameter("wait_for_completion", "false");
final Response updateByQueryResponse = client().performRequest(updateByQueryRequest);
MatcherAssert.assertThat(updateByQueryResponse.getStatusLine().getStatusCode(), equalTo(200));
final String taskId = (String) entityAsMap(updateByQueryResponse).get("task");

// wait for task to complete
waitUntil(() -> {
try {
final Response getTaskResponse = client().performRequest(new Request("GET", "/_tasks/" + taskId));
MatcherAssert.assertThat(getTaskResponse.getStatusLine().getStatusCode(), equalTo(200));
return (Boolean) entityAsMap(getTaskResponse).get("completed");
} catch (IOException e) {
throw new AssertionError(e);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public class TaskResultsService {

public static final String TASK_RESULT_MAPPING_VERSION_META_FIELD = "version";

public static final int TASK_RESULT_MAPPING_VERSION = 3; // must match version in task-index-mapping.json
public static final int TASK_RESULT_MAPPING_VERSION = 4; // must match version in task-index-mapping.json

/**
* The backoff policy to use when saving a task result fails. The total wait
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"_doc" : {
"_meta": {
"version": 3
"version": 4
},
"dynamic" : "strict",
"properties" : {
Expand Down

0 comments on commit 4616dfa

Please sign in to comment.