Skip to content

Commit

Permalink
Grouping segments during flushing
Browse files Browse the repository at this point in the history
  • Loading branch information
RS146BIJAY committed May 24, 2024
1 parent 13cf882 commit bbb8d2c
Show file tree
Hide file tree
Showing 11 changed files with 376 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Decorator Merge policy over the existing Tiered Merge policy. During a segment merge, this policy would categorize
* segments according to their grouping function outcomes before merging segments within the same category, thus
* maintaining the grouping criteria’s integrity throughout the merge process.
*
* @lucene.experimental
*/
public class CriteriaBasedGroupingTieredMergePolicy extends TieredMergePolicy {

@Override
public MergeSpecification findMerges(
MergeTrigger mergeTrigger, SegmentInfos infos, MergeContext mergeContext) throws IOException {
final Set<SegmentCommitInfo> merging = mergeContext.getMergingSegments();
MergeSpecification spec = null;
final Map<String, List<SegmentCommitInfo>> commitInfos = new HashMap<>();
for (SegmentCommitInfo si : infos) {
if (merging.contains(si)) {
continue;
}

final String dwptGroupNumber = si.info.getAttribute("dwptGroupNumber");
commitInfos.computeIfAbsent(dwptGroupNumber, k -> new ArrayList<>()).add(si);
}

for (String dwptGroupNumber : commitInfos.keySet()) {
if (commitInfos.get(dwptGroupNumber).size() > 1) {
final SegmentInfos newSIS = new SegmentInfos(infos.getIndexCreatedVersionMajor());
for (SegmentCommitInfo info : commitInfos.get(dwptGroupNumber)) {
newSIS.add(info);
}

final MergeSpecification tieredMergePolicySpec =
super.findMerges(mergeTrigger, infos, mergeContext);
if (tieredMergePolicySpec != null) {
if (spec == null) {
spec = new MergeSpecification();
}

spec.merges.addAll(tieredMergePolicySpec.merges);
}
}
}

return spec;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;

import java.util.function.Function;

/**
* Structure of criteria on the basis of which group af a segment is selected.
*
*/
public class DWPTGroupingCriteriaDefinition {
/** Grouping function which determines the DWPT group using which documents will be indexed. */
private final Function<Iterable<? extends Iterable<? extends IndexableField>>, Integer>
dwptGroupingCriteriaFunction;

/**
* Maximum limit on DWPT group size. Any document evaluated in a group number greater this limit
* is indexed using default DWPT group
*/
private final int maxDWPTGroupSize;

/**
* Group number of default DWPT. This group is returned for documents whose grouping function outcome is greater than
* max group limit.
*/
public static final int DEFAULT_DWPT_GROUP_NUMBER = -1;

/**
* Grouping criteria function to select the DWPT pool group of a document.
*/
public static final DWPTGroupingCriteriaDefinition DEFAULT_DWPT_GROUPING_CRITERIA_DEFINITION =
new DWPTGroupingCriteriaDefinition(
(document) -> {
return DEFAULT_DWPT_GROUP_NUMBER;
},
1);

/**
* Constructor to create a DWPTGroupingCriteriaDefinition on the basis of a criteria function and a max DWPT group
* size.
*
* @param dwptGroupingCriteriaFunction
* @param maxDWPTGroupSize
*/
public DWPTGroupingCriteriaDefinition(
final Function<Iterable<? extends Iterable<? extends IndexableField>>, Integer>
dwptGroupingCriteriaFunction,
final int maxDWPTGroupSize) {
this.dwptGroupingCriteriaFunction = dwptGroupingCriteriaFunction;
this.maxDWPTGroupSize = maxDWPTGroupSize;
}

/**
* Returns the grouping criteria function.
*
* @return the grouping criteria function.
*/
public Function<Iterable<? extends Iterable<? extends IndexableField>>, Integer>
getDwptGroupingCriteriaFunction() {
return dwptGroupingCriteriaFunction;
}

/**
* Returns the max number of groups allowed for this grouping criteria function.
*
* @return the max number of groups allowed for this grouping criteria function.
*/
public int getMaxDWPTGroupSize() {
return maxDWPTGroupSize;
}
}
33 changes: 29 additions & 4 deletions lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
Expand All @@ -50,7 +51,7 @@
* <p>Threads:
*
* <p>Multiple threads are allowed into addDocument at once. There is an initial synchronized call
* to {@link DocumentsWriterFlushControl#obtainAndLock()} which allocates a DWPT for this indexing
* to {@link DocumentsWriterFlushControl#obtainAndLock} which allocates a DWPT for this indexing
* thread. The same thread will not necessarily get the same DWPT over time. Then updateDocuments is
* called on that DWPT without synchronization (most of the "heavy lifting" is in this call). Once a
* DWPT fills up enough RAM or hold enough documents in memory the DWPT is checked out for flush and
Expand Down Expand Up @@ -119,7 +120,7 @@ final class DocumentsWriter implements Closeable, Accountable {
this.deleteQueue = new DocumentsWriterDeleteQueue(infoStream);
this.perThreadPool =
new DocumentsWriterPerThreadPool(
() -> {
(dwptGroupNumber) -> {
final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldNumberMap);
return new DocumentsWriterPerThread(
indexCreatedVersionMajor,
Expand All @@ -130,7 +131,8 @@ final class DocumentsWriter implements Closeable, Accountable {
deleteQueue,
infos,
pendingNumDocs,
enableTestPoints);
enableTestPoints,
dwptGroupNumber);
});
this.pendingNumDocs = pendingNumDocs;
flushControl = new DocumentsWriterFlushControl(this, config);
Expand Down Expand Up @@ -412,7 +414,7 @@ long updateDocuments(
throws IOException {
boolean hasEvents = preUpdate();

final DocumentsWriterPerThread dwpt = flushControl.obtainAndLock();
final DocumentsWriterPerThread dwpt = flushControl.obtainAndLock(getDWPTGroupNumber(docs));
final DocumentsWriterPerThread flushingDWPT;
long seqNo;

Expand Down Expand Up @@ -453,6 +455,29 @@ private boolean maybeFlush() throws IOException {
return false;
}

/**
* Fetches dwpt group number for a given list of docs. For any group number greater than
* dwptGroupLimit we return the default group.
*
* @param docs the passed list of docs.
* @return dwpt group number for a given list of docs
*/
private int getDWPTGroupNumber(
final Iterable<? extends Iterable<? extends IndexableField>> docs) {
final DWPTGroupingCriteriaDefinition dwptGroupingCriteriaDefinition =
config.getDwptGroupingCriteriaDefinition();
final Function<Iterable<? extends Iterable<? extends IndexableField>>, Integer>
dwptGroupingCriteriaFunction =
dwptGroupingCriteriaDefinition.getDwptGroupingCriteriaFunction();
final int dwptGroupLimit = dwptGroupingCriteriaDefinition.getMaxDWPTGroupSize();
int dwptGroupNumber = dwptGroupingCriteriaFunction.apply(docs);
if (dwptGroupNumber >= dwptGroupLimit) {
dwptGroupNumber = DWPTGroupingCriteriaDefinition.DEFAULT_DWPT_GROUP_NUMBER;
}

return dwptGroupNumber;
}

private void doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
assert flushingDWPT != null : "Flushing DWPT must not be null";
do {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,9 +521,9 @@ public void setApplyAllDeletes() {
flushDeletes.set(true);
}

DocumentsWriterPerThread obtainAndLock() {
DocumentsWriterPerThread obtainAndLock(int dwptGroupNumber) {
while (closed == false) {
final DocumentsWriterPerThread perThread = perThreadPool.getAndLock();
final DocumentsWriterPerThread perThread = perThreadPool.getAndLock(dwptGroupNumber);
if (perThread.deleteQueue == documentsWriter.deleteQueue) {
// simply return the DWPT even in a flush all case since we already hold the lock and the
// DWPT is not stale
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
final class DocumentsWriterPerThread implements Accountable, Lock {

private Throwable abortingException;
public final int dwptGroupNumber;

private void onAbortingException(Throwable throwable) {
assert throwable != null : "aborting exception must not be null";
Expand Down Expand Up @@ -151,7 +152,8 @@ void abort() throws IOException {
DocumentsWriterDeleteQueue deleteQueue,
FieldInfos.Builder fieldInfos,
AtomicLong pendingNumDocs,
boolean enableTestPoints) {
boolean enableTestPoints,
int dwptGroupNumber) {
this.indexMajorVersionCreated = indexMajorVersionCreated;
this.directory = new TrackingDirectoryWrapper(directory);
this.fieldInfos = fieldInfos;
Expand All @@ -163,6 +165,7 @@ void abort() throws IOException {
this.deleteQueue = Objects.requireNonNull(deleteQueue);
assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
deleteSlice = deleteQueue.newSlice();
this.dwptGroupNumber = dwptGroupNumber;

segmentInfo =
new SegmentInfo(
Expand All @@ -178,6 +181,7 @@ void abort() throws IOException {
StringHelper.randomId(),
Collections.emptyMap(),
indexWriterConfig.getIndexSort());
segmentInfo.putAttribute("dwptGroupNumber", String.valueOf(dwptGroupNumber));
assert numDocsInRAM == 0;
if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
infoStream.message(
Expand Down
Loading

0 comments on commit bbb8d2c

Please sign in to comment.