Skip to content

Commit

Permalink
Add deprecation log for batch_size
Browse files Browse the repository at this point in the history
Signed-off-by: Liyun Xiu <xiliyun@amazon.com>
  • Loading branch information
chishui committed Jul 12, 2024
1 parent acb54c1 commit 189c193
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 60 deletions.
60 changes: 1 addition & 59 deletions server/src/main/java/org/opensearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -525,61 +525,7 @@ public void onFailure(Exception e) {

@Override
protected void doRun() {
int batchSize = originalBulkRequest.batchSize();
if (shouldExecuteBulkRequestInBatch(originalBulkRequest.requests().size(), batchSize)) {
runBulkRequestInBatch(numberOfActionRequests, actionRequests, onFailure, onCompletion, onDropped, originalBulkRequest);
return;
}

final Thread originalThread = Thread.currentThread();
final AtomicInteger counter = new AtomicInteger(numberOfActionRequests);
int i = 0;
for (DocWriteRequest<?> actionRequest : actionRequests) {
IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest);
if (indexRequest == null) {
if (counter.decrementAndGet() == 0) {
onCompletion.accept(originalThread, null);
}
assert counter.get() >= 0;
i++;
continue;
}
final String pipelineId = indexRequest.getPipeline();
indexRequest.setPipeline(NOOP_PIPELINE_NAME);
final String finalPipelineId = indexRequest.getFinalPipeline();
indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME);
boolean hasFinalPipeline = true;
final List<String> pipelines;
if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false
&& IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) {
pipelines = Arrays.asList(pipelineId, finalPipelineId);
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false) {
pipelines = Collections.singletonList(pipelineId);
hasFinalPipeline = false;
} else if (IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) {
pipelines = Collections.singletonList(finalPipelineId);
} else {
if (counter.decrementAndGet() == 0) {
onCompletion.accept(originalThread, null);
}
assert counter.get() >= 0;
i++;
continue;
}

executePipelines(
i,
pipelines.iterator(),
hasFinalPipeline,
indexRequest,
onDropped,
onFailure,
counter,
onCompletion,
originalThread
);
i++;
}
runBulkRequestInBatch(numberOfActionRequests, actionRequests, onFailure, onCompletion, onDropped, originalBulkRequest);
}
});
}
Expand Down Expand Up @@ -654,10 +600,6 @@ private void runBulkRequestInBatch(
}
}

private boolean shouldExecuteBulkRequestInBatch(int documentSize, int batchSize) {
return batchSize > 1;
}

/**
* IndexRequests are grouped by unique (index + pipeline_ids) before batching.
* Only IndexRequests in the same group could be batched. It's to ensure batched documents always
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.client.Requests;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.logging.DeprecationLogger;
import org.opensearch.common.settings.Settings;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
Expand Down Expand Up @@ -66,6 +67,8 @@
public class RestBulkAction extends BaseRestHandler {

private final boolean allowExplicitIndex;
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(RestBulkAction.class);
static final String BATCH_SIZE_DEPRECATED_MESSAGE = "The batch size option in bulk API is deprecated and will be removed in 3.0.";

public RestBulkAction(Settings settings) {
this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings);
Expand Down Expand Up @@ -97,7 +100,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, null);
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.setRefreshPolicy(request.param("refresh"));
bulkRequest.batchSize(request.paramAsInt("batch_size", 1));
if (request.hasParam("batch_size")) {
deprecationLogger.deprecate("batch_size_deprecation", BATCH_SIZE_DEPRECATED_MESSAGE);
}
bulkRequest.batchSize(request.paramAsInt("batch_size", Integer.MAX_VALUE));
bulkRequest.add(
request.requiredContent(),
defaultIndex,
Expand Down

0 comments on commit 189c193

Please sign in to comment.