Change default bulk batch size to Integer.MAX_VALUE
Signed-off-by: Liyun Xiu <xiliyun@amazon.com>
chishui committed Jul 11, 2024
1 parent 2d8c68c commit 1c46922
Showing 3 changed files with 15 additions and 10 deletions.
2 changes: 1 addition & 1 deletion server/src/main/java/org/opensearch/action/bulk/BulkRequest.java
@@ -96,7 +96,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesRequest
     private String globalRouting;
     private String globalIndex;
     private Boolean globalRequireAlias;
-    private int batchSize = 1;
+    private int batchSize = Integer.MAX_VALUE;
 
     private long sizeInBytes = 0;
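
Context for the change above: IngestService clamps the requested batch size to the number of action requests in the bulk (see the IngestService.java diff below), so the new default of Integer.MAX_VALUE effectively means "process the whole request as one batch" unless the client asks for something smaller. A minimal sketch of that clamping arithmetic; the class and method names here are illustrative, not from the patch:

public class EffectiveBatchSizeSketch {
    // Mirrors the clamp added in IngestService.runBulkRequestInBatch:
    //   int batchSize = Math.min(numberOfActionRequests, originalBulkRequest.batchSize());
    static int effectiveBatchSize(int numberOfActionRequests, int requestedBatchSize) {
        return Math.min(numberOfActionRequests, requestedBatchSize);
    }

    public static void main(String[] args) {
        System.out.println(effectiveBatchSize(500, 1));                 // old default: batches of 1
        System.out.println(effectiveBatchSize(500, Integer.MAX_VALUE)); // new default: one batch of 500
        System.out.println(effectiveBatchSize(500, 100));               // an explicit batch_size still wins
    }
}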
6 changes: 3 additions & 3 deletions server/src/main/java/org/opensearch/ingest/IngestService.java
@@ -635,7 +635,7 @@ private void runBulkRequestInBatch(
             i++;
         }
 
-        int batchSize = originalBulkRequest.batchSize();
+        int batchSize = Math.min(numberOfActionRequests, originalBulkRequest.batchSize());
         List<List<IndexRequestWrapper>> batches = prepareBatches(batchSize, indexRequestWrappers);
         logger.debug("batchSize: {}, batches: {}", batchSize, batches.size());
 
@@ -655,7 +655,7 @@ private void runBulkRequestInBatch(
     }
 
     private boolean shouldExecuteBulkRequestInBatch(int documentSize, int batchSize) {
-        return documentSize > 1 && batchSize > 1;
+        return batchSize > 1;
     }
 
     /**
@@ -685,7 +685,7 @@ static List<List<IndexRequestWrapper>> prepareBatches(int batchSize, List<IndexRequestWrapper> indexRequestWrappers) {
         }
         List<List<IndexRequestWrapper>> batchedIndexRequests = new ArrayList<>();
         for (Map.Entry<Integer, List<IndexRequestWrapper>> indexRequestsPerKey : indexRequestsPerIndexAndPipelines.entrySet()) {
-            for (int i = 0; i < indexRequestsPerKey.getValue().size(); i += batchSize) {
+            for (int i = 0; i < indexRequestsPerKey.getValue().size(); i += Math.min(indexRequestsPerKey.getValue().size(), batchSize)) {
                 batchedIndexRequests.add(
                     new ArrayList<>(
                         indexRequestsPerKey.getValue().subList(i, i + Math.min(batchSize, indexRequestsPerKey.getValue().size() - i))
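
prepareBatches groups index requests by (index, pipeline) key and then slices each group into batchSize-sized sublists; the Math.min in the loop increment keeps each step within the group size now that batchSize can be Integer.MAX_VALUE, and the Math.min in the subList bound trims the final window to the remaining elements. A self-contained sketch of the same windowing, using generic stand-in names rather than the patch's exact code:

import java.util.ArrayList;
import java.util.List;

public class BatchSlicingSketch {
    // Same subList windowing as IngestService.prepareBatches, applied to a
    // single (index, pipeline) group.
    static <T> List<List<T>> slice(List<T> group, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < group.size(); i += Math.min(group.size(), batchSize)) {
            // Trim the last (possibly short) window to the remaining elements.
            batches.add(new ArrayList<>(group.subList(i, i + Math.min(batchSize, group.size() - i))));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> docs = List.of(1, 2, 3, 4, 5);
        System.out.println(slice(docs, 2));                 // [[1, 2], [3, 4], [5]]
        System.out.println(slice(docs, Integer.MAX_VALUE)); // [[1, 2, 3, 4, 5]] -- the new default
    }
}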
17 changes: 11 additions & 6 deletions server/src/test/java/org/opensearch/ingest/IngestServiceTests.java
@@ -1134,10 +1134,14 @@ public void testBulkRequestExecutionWithFailures() throws Exception {
         Exception error = new RuntimeException();
         doAnswer(args -> {
             @SuppressWarnings("unchecked")
-            BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
-            handler.accept(null, error);
+            List<IngestDocumentWrapper> ingestDocumentWrappers = (List) args.getArguments()[0];
+            Consumer<List<IngestDocumentWrapper>> handler = (Consumer) args.getArguments()[1];
+            for (IngestDocumentWrapper wrapper : ingestDocumentWrappers) {
+                wrapper.update(wrapper.getIngestDocument(), error);
+            }
+            handler.accept(ingestDocumentWrappers);
             return null;
-        }).when(processor).execute(any(), any());
+        }).when(processor).batchExecute(any(), any());
         IngestService ingestService = createWithProcessors(
             Collections.singletonMap("mock", (factories, tag, description, config) -> processor)
         );
@@ -1192,10 +1196,11 @@ public void testBulkRequestExecution() throws Exception {
         when(processor.getTag()).thenReturn("mockTag");
         doAnswer(args -> {
             @SuppressWarnings("unchecked")
-            BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
-            handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null);
+            List<IngestDocumentWrapper> ingestDocumentWrappers = (List) args.getArguments()[0];
+            Consumer<List<IngestDocumentWrapper>> handler = (Consumer) args.getArguments()[1];
+            handler.accept(ingestDocumentWrappers);
             return null;
-        }).when(processor).execute(any(), any());
+        }).when(processor).batchExecute(any(), any());
         Map<String, Processor.Factory> map = new HashMap<>(2);
         map.put("mock", (factories, tag, description, config) -> processor);
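
Both tests now stub the batch-oriented callback: instead of execute(IngestDocument, BiConsumer<IngestDocument, Exception>) firing once per document, batchExecute takes the whole list of IngestDocumentWrappers plus a Consumer that is invoked once with the completed batch, and per-document failures travel inside the wrappers. A minimal Mockito sketch of the same doAnswer pattern; the BatchProcessor interface below is a stand-in for OpenSearch's Processor, and mockito-core is assumed on the classpath:

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

import java.util.List;
import java.util.function.Consumer;

public class BatchExecuteStubSketch {
    // Stand-in for the batch contract the tests exercise.
    interface BatchProcessor {
        void batchExecute(List<String> docs, Consumer<List<String>> handler);
    }

    public static void main(String[] args) {
        BatchProcessor processor = mock(BatchProcessor.class);
        doAnswer(invocation -> {
            // Success path: hand the whole batch back through the completion
            // handler, as testBulkRequestExecution does with its wrappers.
            List<String> docs = invocation.getArgument(0);
            Consumer<List<String>> handler = invocation.getArgument(1);
            handler.accept(docs);
            return null;
        }).when(processor).batchExecute(any(), any());

        processor.batchExecute(List.of("doc1", "doc2"),
            batch -> System.out.println("completed: " + batch)); // completed: [doc1, doc2]
    }
}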
