Skip to content

Commit

Permalink
Revert back refresh policy in RequestConverters. (#55)
Browse files Browse the repository at this point in the history
This PR reverts back the deleted code (#16, #54) related to refresh policies.

Signed-off-by: Peter Nied <petern@amazon.com>
  • Loading branch information
adnapibar authored and peternied committed Mar 13, 2021
1 parent a1b80a5 commit 5f92d71
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ private static Request getStyleRequest(String method, GetRequest getRequest) {
Params parameters = new Params();
parameters.withPreference(getRequest.preference());
parameters.withRouting(getRequest.routing());
parameters.withRefresh(getRequest.refresh());
parameters.withRealtime(getRequest.realtime());
parameters.withStoredFields(getRequest.storedFields());
parameters.withVersion(getRequest.version());
Expand All @@ -292,6 +293,7 @@ private static Request sourceRequest(GetSourceRequest getSourceRequest, String h
Params parameters = new Params();
parameters.withPreference(getSourceRequest.preference());
parameters.withRouting(getSourceRequest.routing());
parameters.withRefresh(getSourceRequest.refresh());
parameters.withRealtime(getSourceRequest.realtime());
parameters.withFetchSourceContext(getSourceRequest.fetchSourceContext());

Expand All @@ -313,6 +315,7 @@ static Request multiGet(MultiGetRequest multiGetRequest) throws IOException {
Params parameters = new Params();
parameters.withPreference(multiGetRequest.preference());
parameters.withRealtime(multiGetRequest.realtime());
parameters.withRefresh(multiGetRequest.refresh());
request.addParameters(parameters.asMap());
request.setEntity(createEntity(multiGetRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
Expand Down Expand Up @@ -588,6 +591,7 @@ private static Request prepareReindexRequest(ReindexRequest reindexRequest, bool
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
Params params = new Params()
.withWaitForCompletion(waitForCompletion)
.withRefresh(reindexRequest.isRefresh())
.withTimeout(reindexRequest.getTimeout())
.withWaitForActiveShards(reindexRequest.getWaitForActiveShards())
.withRequestsPerSecond(reindexRequest.getRequestsPerSecond())
Expand All @@ -608,6 +612,7 @@ private static Request prepareDeleteByQueryRequest(DeleteByQueryRequest deleteBy
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
Params params = new Params()
.withRouting(deleteByQueryRequest.getRouting())
.withRefresh(deleteByQueryRequest.isRefresh())
.withTimeout(deleteByQueryRequest.getTimeout())
.withWaitForActiveShards(deleteByQueryRequest.getWaitForActiveShards())
.withRequestsPerSecond(deleteByQueryRequest.getRequestsPerSecond())
Expand Down Expand Up @@ -639,6 +644,7 @@ static Request prepareUpdateByQueryRequest(UpdateByQueryRequest updateByQueryReq
Params params = new Params()
.withRouting(updateByQueryRequest.getRouting())
.withPipeline(updateByQueryRequest.getPipeline())
.withRefresh(updateByQueryRequest.isRefresh())
.withTimeout(updateByQueryRequest.getTimeout())
.withWaitForActiveShards(updateByQueryRequest.getWaitForActiveShards())
.withRequestsPerSecond(updateByQueryRequest.getRequestsPerSecond())
Expand Down Expand Up @@ -908,6 +914,13 @@ Params withRealtime(boolean realtime) {
return this;
}

Params withRefresh(boolean refresh) {
if (refresh) {
return withRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
}
return this;
}

/**
* @deprecated
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,13 @@ private static void doTestSourceExists(BiFunction<String, String, GetSourceReque
expectedParams.put("realtime", "false");
}
}
if (randomBoolean()) {
boolean refresh = randomBoolean();
getRequest.refresh(refresh);
if (refresh) {
expectedParams.put("refresh", "true");
}
}
Request request = RequestConverters.sourceExists(getRequest);
assertEquals(HttpHead.METHOD_NAME, request.getMethod());
String type = getRequest.type();
Expand Down Expand Up @@ -235,6 +242,13 @@ private static void doTestGetSource(BiFunction<String, String, GetSourceRequest>
expectedParams.put("realtime", "false");
}
}
if (randomBoolean()) {
boolean refresh = randomBoolean();
getRequest.refresh(refresh);
if (refresh) {
expectedParams.put("refresh", "true");
}
}
Request request = RequestConverters.getSource(getRequest);
assertEquals(HttpGet.METHOD_NAME, request.getMethod());
assertEquals("/" + index + "/_source/" + id, request.getEndpoint());
Expand All @@ -257,6 +271,12 @@ public void testMultiGet() throws IOException {
expectedParams.put("realtime", "false");
}
}
if (randomBoolean()) {
multiGetRequest.refresh(randomBoolean());
if (multiGetRequest.refresh()) {
expectedParams.put("refresh", "true");
}
}

int numberOfRequests = randomIntBetween(0, 32);
for (int i = 0; i < numberOfRequests; i++) {
Expand Down Expand Up @@ -307,6 +327,7 @@ public void testDelete() {
Map<String, String> expectedParams = new HashMap<>();

setRandomTimeout(deleteRequest::timeout, ReplicationRequest.DEFAULT_TIMEOUT, expectedParams);
setRandomRefreshPolicy(deleteRequest::setRefreshPolicy, expectedParams);
setRandomVersion(deleteRequest, expectedParams);
setRandomVersionType(deleteRequest::versionType, expectedParams);
setRandomIfSeqNoAndTerm(deleteRequest, expectedParams);
Expand Down Expand Up @@ -370,6 +391,13 @@ private static void getAndExistsTest(Function<GetRequest, Request> requestConver
expectedParams.put("realtime", "false");
}
}
if (randomBoolean()) {
boolean refresh = randomBoolean();
getRequest.refresh(refresh);
if (refresh) {
expectedParams.put("refresh", "true");
}
}
if (randomBoolean()) {
long version = randomLong();
getRequest.version(version);
Expand Down Expand Up @@ -693,6 +721,7 @@ public void testIndex() throws IOException {
}

setRandomTimeout(indexRequest::timeout, ReplicationRequest.DEFAULT_TIMEOUT, expectedParams);
setRandomRefreshPolicy(indexRequest::setRefreshPolicy, expectedParams);

// There is some logic around _create endpoint and version/version type
if (indexRequest.opType() == DocWriteRequest.OpType.CREATE) {
Expand Down Expand Up @@ -829,6 +858,13 @@ public void testUpdate() throws IOException {
} else {
expectedParams.put("timeout", ReplicationRequest.DEFAULT_TIMEOUT.getStringRep());
}
if (randomBoolean()) {
WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values());
updateRequest.setRefreshPolicy(refreshPolicy);
if (refreshPolicy != WriteRequest.RefreshPolicy.NONE) {
expectedParams.put("refresh", refreshPolicy.getValue());
}
}
setRandomWaitForActiveShards(updateRequest::waitForActiveShards, expectedParams);
setRandomIfSeqNoAndTerm(updateRequest, new HashMap<>()); // if* params are passed in the body
if (randomBoolean()) {
Expand Down Expand Up @@ -931,6 +967,7 @@ public void testBulk() throws IOException {
expectedParams.put("timeout", BulkShardRequest.DEFAULT_TIMEOUT.getStringRep());
}

setRandomRefreshPolicy(bulkRequest::setRefreshPolicy, expectedParams);

XContentType xContentType = randomFrom(XContentType.JSON, XContentType.SMILE);

Expand Down Expand Up @@ -2203,6 +2240,16 @@ static void setRandomWaitForActiveShards(Consumer<ActiveShardCount> setter, Acti
}
}

private static void setRandomRefreshPolicy(Consumer<WriteRequest.RefreshPolicy> setter, Map<String, String> expectedParams) {
if (randomBoolean()) {
WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values());
setter.accept(refreshPolicy);
if (refreshPolicy != WriteRequest.RefreshPolicy.NONE) {
expectedParams.put("refresh", refreshPolicy.getValue());
}
}
}

private static void setRandomVersion(DocWriteRequest<?> request, Map<String, String> expectedParams) {
if (randomBoolean()) {
long version = randomFrom(Versions.MATCH_ANY, Versions.MATCH_DELETED, Versions.NOT_FOUND, randomNonNegativeLong());
Expand Down

0 comments on commit 5f92d71

Please sign in to comment.