From 189c19395d71e36e623e67e31c7537a5bfae7a15 Mon Sep 17 00:00:00 2001 From: Liyun Xiu Date: Fri, 12 Jul 2024 14:28:16 +0800 Subject: [PATCH] Add deprecation log for batch_size Signed-off-by: Liyun Xiu --- .../org/opensearch/ingest/IngestService.java | 60 +------------------ .../rest/action/document/RestBulkAction.java | 8 ++- 2 files changed, 8 insertions(+), 60 deletions(-) diff --git a/server/src/main/java/org/opensearch/ingest/IngestService.java b/server/src/main/java/org/opensearch/ingest/IngestService.java index a7c2150d0eb1b..17eb23422e68b 100644 --- a/server/src/main/java/org/opensearch/ingest/IngestService.java +++ b/server/src/main/java/org/opensearch/ingest/IngestService.java @@ -525,61 +525,7 @@ public void onFailure(Exception e) { @Override protected void doRun() { - int batchSize = originalBulkRequest.batchSize(); - if (shouldExecuteBulkRequestInBatch(originalBulkRequest.requests().size(), batchSize)) { - runBulkRequestInBatch(numberOfActionRequests, actionRequests, onFailure, onCompletion, onDropped, originalBulkRequest); - return; - } - - final Thread originalThread = Thread.currentThread(); - final AtomicInteger counter = new AtomicInteger(numberOfActionRequests); - int i = 0; - for (DocWriteRequest actionRequest : actionRequests) { - IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest); - if (indexRequest == null) { - if (counter.decrementAndGet() == 0) { - onCompletion.accept(originalThread, null); - } - assert counter.get() >= 0; - i++; - continue; - } - final String pipelineId = indexRequest.getPipeline(); - indexRequest.setPipeline(NOOP_PIPELINE_NAME); - final String finalPipelineId = indexRequest.getFinalPipeline(); - indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME); - boolean hasFinalPipeline = true; - final List pipelines; - if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false - && IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) { - pipelines = Arrays.asList(pipelineId, finalPipelineId); - } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false) { - pipelines = Collections.singletonList(pipelineId); - hasFinalPipeline = false; - } else if (IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) { - pipelines = Collections.singletonList(finalPipelineId); - } else { - if (counter.decrementAndGet() == 0) { - onCompletion.accept(originalThread, null); - } - assert counter.get() >= 0; - i++; - continue; - } - - executePipelines( - i, - pipelines.iterator(), - hasFinalPipeline, - indexRequest, - onDropped, - onFailure, - counter, - onCompletion, - originalThread - ); - i++; - } + runBulkRequestInBatch(numberOfActionRequests, actionRequests, onFailure, onCompletion, onDropped, originalBulkRequest); } }); } @@ -654,10 +600,6 @@ private void runBulkRequestInBatch( } } - private boolean shouldExecuteBulkRequestInBatch(int documentSize, int batchSize) { - return batchSize > 1; - } - /** * IndexRequests are grouped by unique (index + pipeline_ids) before batching. * Only IndexRequests in the same group could be batched. It's to ensure batched documents always diff --git a/server/src/main/java/org/opensearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/opensearch/rest/action/document/RestBulkAction.java index 0bc4234c9b8b8..ce52c5620b968 100644 --- a/server/src/main/java/org/opensearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/opensearch/rest/action/document/RestBulkAction.java @@ -38,6 +38,7 @@ import org.opensearch.action.support.ActiveShardCount; import org.opensearch.client.Requests; import org.opensearch.client.node.NodeClient; +import org.opensearch.common.logging.DeprecationLogger; import org.opensearch.common.settings.Settings; import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.RestRequest; @@ -66,6 +67,8 @@ public class RestBulkAction extends BaseRestHandler { private final boolean allowExplicitIndex; + private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(RestBulkAction.class); + static final String BATCH_SIZE_DEPRECATED_MESSAGE = "The batch size option in bulk API is deprecated and will be removed in 3.0."; public RestBulkAction(Settings settings) { this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings); @@ -97,7 +100,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, null); bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT)); bulkRequest.setRefreshPolicy(request.param("refresh")); - bulkRequest.batchSize(request.paramAsInt("batch_size", 1)); + if (request.hasParam("batch_size")) { + deprecationLogger.deprecate("batch_size_deprecation", BATCH_SIZE_DEPRECATED_MESSAGE); + } + bulkRequest.batchSize(request.paramAsInt("batch_size", Integer.MAX_VALUE)); bulkRequest.add( request.requiredContent(), defaultIndex,