Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Criteria based DWPT selection inside DocumentWriter #13409

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Decorator Merge policy over the existing Tiered Merge policy. During a segment merge, this policy
* would categorize segments according to their grouping function outcomes before merging segments
* within the same category, thus maintaining the grouping criteria’s integrity throughout the merge
* process.
*
* @lucene.experimental
*/
public class CriteriaBasedGroupingTieredMergePolicy extends TieredMergePolicy {

  /** Name of the segment attribute that is read to decide which group a segment belongs to. */
  private static final String DWPT_GROUP_ATTRIBUTE = "dwptGroupNumber";

  /**
   * Finds merges separately for every group of segments that share the same {@code
   * dwptGroupNumber} attribute value, so that a proposed merge never mixes segments of different
   * groups.
   *
   * <p>Segments that are already being merged are ignored. Groups holding fewer than two segments
   * are skipped since there is nothing to merge within them. For every remaining group the tiered
   * merge selection of the superclass is run against a {@link SegmentInfos} that contains only the
   * segments of that group, and the resulting merges are accumulated into one specification.
   *
   * @param mergeTrigger the event that triggered the merge
   * @param infos all segments of the index
   * @param mergeContext context used to look up the segments that are currently merging
   * @return the accumulated merges of all groups, or {@code null} if no group produced a merge
   * @throws IOException if the superclass merge selection fails
   */
  @Override
  public MergeSpecification findMerges(
      MergeTrigger mergeTrigger, SegmentInfos infos, MergeContext mergeContext) throws IOException {
    final Set<SegmentCommitInfo> merging = mergeContext.getMergingSegments();

    // Bucket all non-merging segments by the value of their group attribute. HashMap accepts a
    // null key, so segments for which the attribute lookup yields null end up in a shared bucket.
    final Map<String, List<SegmentCommitInfo>> segmentsByGroup = new HashMap<>();
    for (SegmentCommitInfo si : infos) {
      if (merging.contains(si)) {
        continue;
      }
      final String dwptGroupNumber = si.info.getAttribute(DWPT_GROUP_ATTRIBUTE);
      segmentsByGroup.computeIfAbsent(dwptGroupNumber, k -> new ArrayList<>()).add(si);
    }

    MergeSpecification spec = null;
    for (List<SegmentCommitInfo> group : segmentsByGroup.values()) {
      if (group.size() <= 1) {
        continue;
      }

      final SegmentInfos groupInfos = new SegmentInfos(infos.getIndexCreatedVersionMajor());
      for (SegmentCommitInfo info : group) {
        groupInfos.add(info);
      }

      // Hand only this group's segments to the tiered policy. Passing the complete infos here
      // would let it pick merges across groups and would add the same merges once per group.
      final MergeSpecification groupSpec =
          super.findMerges(mergeTrigger, groupInfos, mergeContext);
      if (groupSpec != null) {
        if (spec == null) {
          spec = new MergeSpecification();
        }
        spec.merges.addAll(groupSpec.merges);
      }
    }

    return spec;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;

import java.util.function.Function;

/** Describes the criteria used to select the DWPT group with which documents are indexed. */
public class DWPTGroupingCriteriaDefinition {
  /** Function that maps the documents being indexed to the number of a DWPT group. */
  private final Function<Iterable<? extends Iterable<? extends IndexableField>>, Integer>
      dwptGroupingCriteriaFunction;

  /**
   * Upper limit on the DWPT group number. Documents whose grouping function outcome reaches or
   * exceeds this limit are indexed using the default DWPT group.
   */
  private final int maxDWPTGroupSize;

  /**
   * Number of the default DWPT group. It is used for documents whose grouping function outcome
   * does not stay below the max group limit.
   */
  public static final int DEFAULT_DWPT_GROUP_NUMBER = -1;

  /** Default definition: every document is assigned to the default DWPT group. */
  public static final DWPTGroupingCriteriaDefinition DEFAULT_DWPT_GROUPING_CRITERIA_DEFINITION =
      new DWPTGroupingCriteriaDefinition(docs -> DEFAULT_DWPT_GROUP_NUMBER, 1);

  /**
   * Creates a definition from a grouping criteria function and a max DWPT group size.
   *
   * @param dwptGroupingCriteriaFunction function computing the group number for given documents.
   * @param maxDWPTGroupSize maximum number of groups the grouping criteria function may use.
   */
  public DWPTGroupingCriteriaDefinition(
      Function<Iterable<? extends Iterable<? extends IndexableField>>, Integer>
          dwptGroupingCriteriaFunction,
      int maxDWPTGroupSize) {
    this.maxDWPTGroupSize = maxDWPTGroupSize;
    this.dwptGroupingCriteriaFunction = dwptGroupingCriteriaFunction;
  }

  /**
   * Gives access to the grouping criteria function.
   *
   * @return the function passed at construction time.
   */
  public Function<Iterable<? extends Iterable<? extends IndexableField>>, Integer>
      getDwptGroupingCriteriaFunction() {
    return this.dwptGroupingCriteriaFunction;
  }

  /**
   * Gives access to the max number of groups allowed for the grouping criteria function.
   *
   * @return the max group size passed at construction time.
   */
  public int getMaxDWPTGroupSize() {
    return this.maxDWPTGroupSize;
  }
}
33 changes: 29 additions & 4 deletions lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
Expand All @@ -50,7 +51,7 @@
* <p>Threads:
*
* <p>Multiple threads are allowed into addDocument at once. There is an initial synchronized call
* to {@link DocumentsWriterFlushControl#obtainAndLock()} which allocates a DWPT for this indexing
* to {@link DocumentsWriterFlushControl#obtainAndLock} which allocates a DWPT for this indexing
* thread. The same thread will not necessarily get the same DWPT over time. Then updateDocuments is
* called on that DWPT without synchronization (most of the "heavy lifting" is in this call). Once a
* DWPT fills up enough RAM or holds enough documents in memory the DWPT is checked out for flush and
Expand Down Expand Up @@ -119,7 +120,7 @@ final class DocumentsWriter implements Closeable, Accountable {
this.deleteQueue = new DocumentsWriterDeleteQueue(infoStream);
this.perThreadPool =
new DocumentsWriterPerThreadPool(
() -> {
(dwptGroupNumber) -> {
final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldNumberMap);
return new DocumentsWriterPerThread(
indexCreatedVersionMajor,
Expand All @@ -130,7 +131,8 @@ final class DocumentsWriter implements Closeable, Accountable {
deleteQueue,
infos,
pendingNumDocs,
enableTestPoints);
enableTestPoints,
dwptGroupNumber);
});
this.pendingNumDocs = pendingNumDocs;
flushControl = new DocumentsWriterFlushControl(this, config);
Expand Down Expand Up @@ -412,7 +414,7 @@ long updateDocuments(
throws IOException {
boolean hasEvents = preUpdate();

final DocumentsWriterPerThread dwpt = flushControl.obtainAndLock();
final DocumentsWriterPerThread dwpt = flushControl.obtainAndLock(getDWPTGroupNumber(docs));
final DocumentsWriterPerThread flushingDWPT;
long seqNo;

Expand Down Expand Up @@ -453,6 +455,29 @@ private boolean maybeFlush() throws IOException {
return false;
}

/**
 * Fetches the dwpt group number for a given list of docs by applying the configured grouping
 * criteria function. Any outcome outside the range {@code [0, maxDWPTGroupSize)}, i.e. a negative
 * number or a number greater than or equal to the configured limit, is replaced by the default
 * group number.
 *
 * @param docs the passed list of docs.
 * @return the dwpt group number for the given docs, or {@link
 *     DWPTGroupingCriteriaDefinition#DEFAULT_DWPT_GROUP_NUMBER} if the function outcome is out of
 *     range.
 */
private int getDWPTGroupNumber(
    final Iterable<? extends Iterable<? extends IndexableField>> docs) {
  final DWPTGroupingCriteriaDefinition criteria = config.getDwptGroupingCriteriaDefinition();
  final int dwptGroupLimit = criteria.getMaxDWPTGroupSize();
  final int dwptGroupNumber = criteria.getDwptGroupingCriteriaFunction().apply(docs);

  // Negative outcomes are normalized as well, so that the default group number is the only
  // negative value that is ever handed on as a group number.
  if (dwptGroupNumber < 0 || dwptGroupNumber >= dwptGroupLimit) {
    return DWPTGroupingCriteriaDefinition.DEFAULT_DWPT_GROUP_NUMBER;
  }

  return dwptGroupNumber;
}

private void doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
assert flushingDWPT != null : "Flushing DWPT must not be null";
do {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,9 +521,9 @@ public void setApplyAllDeletes() {
flushDeletes.set(true);
}

DocumentsWriterPerThread obtainAndLock() {
DocumentsWriterPerThread obtainAndLock(int dwptGroupNumber) {
while (closed == false) {
final DocumentsWriterPerThread perThread = perThreadPool.getAndLock();
final DocumentsWriterPerThread perThread = perThreadPool.getAndLock(dwptGroupNumber);
if (perThread.deleteQueue == documentsWriter.deleteQueue) {
// simply return the DWPT even in a flush all case since we already hold the lock and the
// DWPT is not stale
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
final class DocumentsWriterPerThread implements Accountable, Lock {

private Throwable abortingException;
public final int dwptGroupNumber;

private void onAbortingException(Throwable throwable) {
assert throwable != null : "aborting exception must not be null";
Expand Down Expand Up @@ -151,7 +152,8 @@ void abort() throws IOException {
DocumentsWriterDeleteQueue deleteQueue,
FieldInfos.Builder fieldInfos,
AtomicLong pendingNumDocs,
boolean enableTestPoints) {
boolean enableTestPoints,
int dwptGroupNumber) {
this.indexMajorVersionCreated = indexMajorVersionCreated;
this.directory = new TrackingDirectoryWrapper(directory);
this.fieldInfos = fieldInfos;
Expand All @@ -163,6 +165,7 @@ void abort() throws IOException {
this.deleteQueue = Objects.requireNonNull(deleteQueue);
assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
deleteSlice = deleteQueue.newSlice();
this.dwptGroupNumber = dwptGroupNumber;

segmentInfo =
new SegmentInfo(
Expand All @@ -178,6 +181,7 @@ void abort() throws IOException {
StringHelper.randomId(),
Collections.emptyMap(),
indexWriterConfig.getIndexSort());
segmentInfo.putAttribute("dwptGroupNumber", String.valueOf(dwptGroupNumber));
assert numDocsInRAM == 0;
if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
infoStream.message(
Expand Down
Loading