From bb55c6b37481a13fd56da7ae92d8732fc9ec43a6 Mon Sep 17 00:00:00 2001 From: RS146BIJAY Date: Thu, 11 Apr 2024 13:01:29 +0530 Subject: [PATCH] Grouping segments during flushing --- ...riteriaBasedGroupingTieredMergePolicy.java | 72 +++++++++++++++++ .../index/DWPTGroupingCriteriaDefinition.java | 80 +++++++++++++++++++ .../apache/lucene/index/DocumentsWriter.java | 33 +++++++- .../index/DocumentsWriterFlushControl.java | 4 +- .../index/DocumentsWriterPerThread.java | 6 +- .../index/DocumentsWriterPerThreadPool.java | 70 +++++++++++----- .../lucene/index/LiveIndexWriterConfig.java | 25 ++++++ .../lucene/index/TestDocumentWriter.java | 42 ++++++++++ .../TestDocumentsWriterPerThreadPool.java | 24 +++--- .../apache/lucene/index/TestIndexWriter.java | 4 +- .../apache/lucene/tests/index/DocHelper.java | 50 ++++++++++++ 11 files changed, 372 insertions(+), 38 deletions(-) create mode 100644 lucene/core/src/java/org/apache/lucene/index/CriteriaBasedGroupingTieredMergePolicy.java create mode 100644 lucene/core/src/java/org/apache/lucene/index/DWPTGroupingCriteriaDefinition.java diff --git a/lucene/core/src/java/org/apache/lucene/index/CriteriaBasedGroupingTieredMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/CriteriaBasedGroupingTieredMergePolicy.java new file mode 100644 index 000000000000..410d7469845b --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/index/CriteriaBasedGroupingTieredMergePolicy.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.index; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Decorator Merge policy over the existing Tiered Merge policy. During a segment merge, this policy + * would categorize segments according to their grouping function outcomes before merging segments + * within the same category, thus maintaining the grouping criteria’s integrity throughout the merge + * process. 
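+ *
+ * <p>A minimal wiring sketch (the {@code analyzer}, {@code directory} and
+ * {@code groupingCriteriaDefinition} below are illustrative placeholders, not part of this change):
+ *
+ * <pre class="prettyprint">
+ * IndexWriterConfig config = new IndexWriterConfig(analyzer);
+ * config.setMergePolicy(new CriteriaBasedGroupingTieredMergePolicy());
+ * config.setDWPTGroupingCriteriaFunction(groupingCriteriaDefinition);
+ * IndexWriter writer = new IndexWriter(directory, config);
+ * </pre>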
+ * + * @lucene.experimental + */ +public class CriteriaBasedGroupingTieredMergePolicy extends TieredMergePolicy { + + @Override + public MergeSpecification findMerges( + MergeTrigger mergeTrigger, SegmentInfos infos, MergeContext mergeContext) throws IOException { + final Set merging = mergeContext.getMergingSegments(); + MergeSpecification spec = null; + final Map> commitInfos = new HashMap<>(); + for (SegmentCommitInfo si : infos) { + if (merging.contains(si)) { + continue; + } + + final String dwptGroupNumber = si.info.getAttribute("dwptGroupNumber"); + commitInfos.computeIfAbsent(dwptGroupNumber, k -> new ArrayList<>()).add(si); + } + + for (String dwptGroupNumber : commitInfos.keySet()) { + if (commitInfos.get(dwptGroupNumber).size() > 1) { + final SegmentInfos newSIS = new SegmentInfos(infos.getIndexCreatedVersionMajor()); + for (SegmentCommitInfo info : commitInfos.get(dwptGroupNumber)) { + newSIS.add(info); + } + + final MergeSpecification tieredMergePolicySpec = + super.findMerges(mergeTrigger, infos, mergeContext); + if (tieredMergePolicySpec != null) { + if (spec == null) { + spec = new MergeSpecification(); + } + + spec.merges.addAll(tieredMergePolicySpec.merges); + } + } + } + + return spec; + } +} diff --git a/lucene/core/src/java/org/apache/lucene/index/DWPTGroupingCriteriaDefinition.java b/lucene/core/src/java/org/apache/lucene/index/DWPTGroupingCriteriaDefinition.java new file mode 100644 index 000000000000..c3f628ae7958 --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/index/DWPTGroupingCriteriaDefinition.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.index; + +import java.util.function.Function; + +/** Structure of criteria on the basis of which group af a segment is selected. */ +public class DWPTGroupingCriteriaDefinition { + /** Grouping function which determines the DWPT group using which documents will be indexed. */ + private final Function>, Integer> + dwptGroupingCriteriaFunction; + + /** + * Maximum limit on DWPT group size. Any document evaluated in a group number greater this limit + * is indexed using default DWPT group + */ + private final int maxDWPTGroupSize; + + /** + * Group number of default DWPT. This group is returned for documents whose grouping function + * outcome is greater than max group limit. + */ + public static final int DEFAULT_DWPT_GROUP_NUMBER = -1; + + /** Grouping criteria function to select the DWPT pool group of a document. 
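+   * This default definition maps every document to {@link #DEFAULT_DWPT_GROUP_NUMBER} with a
+   * maximum group size of 1, which effectively disables grouping.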
*/ + public static final DWPTGroupingCriteriaDefinition DEFAULT_DWPT_GROUPING_CRITERIA_DEFINITION = + new DWPTGroupingCriteriaDefinition( + (document) -> { + return DEFAULT_DWPT_GROUP_NUMBER; + }, + 1); + + /** + * Constructor to create a DWPTGroupingCriteriaDefinition on the basis of a criteria function and + * a max DWPT group size. + * + * @param dwptGroupingCriteriaFunction the grouping criteria function. + * @param maxDWPTGroupSize maximum number of groups allowed by grouping criteria function. + */ + public DWPTGroupingCriteriaDefinition( + final Function>, Integer> + dwptGroupingCriteriaFunction, + final int maxDWPTGroupSize) { + this.dwptGroupingCriteriaFunction = dwptGroupingCriteriaFunction; + this.maxDWPTGroupSize = maxDWPTGroupSize; + } + + /** + * Returns the grouping criteria function. + * + * @return the grouping criteria function. + */ + public Function>, Integer> + getDwptGroupingCriteriaFunction() { + return dwptGroupingCriteriaFunction; + } + + /** + * Returns the max number of groups allowed for this grouping criteria function. + * + * @return the max number of groups allowed for this grouping criteria function. + */ + public int getMaxDWPTGroupSize() { + return maxDWPTGroupSize; + } +} diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java index 7955df5630e4..10b84ae1bab6 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import java.util.function.Supplier; import java.util.function.ToLongFunction; import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment; @@ -50,7 +51,7 @@ *
<p>
Threads: * *
<p>
Multiple threads are allowed into addDocument at once. There is an initial synchronized call - * to {@link DocumentsWriterFlushControl#obtainAndLock()} which allocates a DWPT for this indexing + * to {@link DocumentsWriterFlushControl#obtainAndLock} which allocates a DWPT for this indexing * thread. The same thread will not necessarily get the same DWPT over time. Then updateDocuments is * called on that DWPT without synchronization (most of the "heavy lifting" is in this call). Once a * DWPT fills up enough RAM or hold enough documents in memory the DWPT is checked out for flush and @@ -119,7 +120,7 @@ final class DocumentsWriter implements Closeable, Accountable { this.deleteQueue = new DocumentsWriterDeleteQueue(infoStream); this.perThreadPool = new DocumentsWriterPerThreadPool( - () -> { + (dwptGroupNumber) -> { final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldNumberMap); return new DocumentsWriterPerThread( indexCreatedVersionMajor, @@ -130,7 +131,8 @@ final class DocumentsWriter implements Closeable, Accountable { deleteQueue, infos, pendingNumDocs, - enableTestPoints); + enableTestPoints, + dwptGroupNumber); }); this.pendingNumDocs = pendingNumDocs; flushControl = new DocumentsWriterFlushControl(this, config); @@ -412,7 +414,7 @@ long updateDocuments( throws IOException { boolean hasEvents = preUpdate(); - final DocumentsWriterPerThread dwpt = flushControl.obtainAndLock(); + final DocumentsWriterPerThread dwpt = flushControl.obtainAndLock(getDWPTGroupNumber(docs)); final DocumentsWriterPerThread flushingDWPT; long seqNo; @@ -453,6 +455,29 @@ private boolean maybeFlush() throws IOException { return false; } + /** + * Fetches dwpt group number for a given list of docs. For any group number greater than + * dwptGroupLimit we return the default group. + * + * @param docs the passed list of docs. 
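+   *     This is the same batch of documents that is subsequently indexed into the selected DWPT.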
+ * @return dwpt group number for a given list of docs + */ + private int getDWPTGroupNumber( + final Iterable> docs) { + final DWPTGroupingCriteriaDefinition dwptGroupingCriteriaDefinition = + config.getDwptGroupingCriteriaDefinition(); + final Function>, Integer> + dwptGroupingCriteriaFunction = + dwptGroupingCriteriaDefinition.getDwptGroupingCriteriaFunction(); + final int dwptGroupLimit = dwptGroupingCriteriaDefinition.getMaxDWPTGroupSize(); + int dwptGroupNumber = dwptGroupingCriteriaFunction.apply(docs); + if (dwptGroupNumber >= dwptGroupLimit) { + dwptGroupNumber = DWPTGroupingCriteriaDefinition.DEFAULT_DWPT_GROUP_NUMBER; + } + + return dwptGroupNumber; + } + private void doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException { assert flushingDWPT != null : "Flushing DWPT must not be null"; do { diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java index 170966e8ae49..77ba5c0cf3ed 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java @@ -521,9 +521,9 @@ public void setApplyAllDeletes() { flushDeletes.set(true); } - DocumentsWriterPerThread obtainAndLock() { + DocumentsWriterPerThread obtainAndLock(int dwptGroupNumber) { while (closed == false) { - final DocumentsWriterPerThread perThread = perThreadPool.getAndLock(); + final DocumentsWriterPerThread perThread = perThreadPool.getAndLock(dwptGroupNumber); if (perThread.deleteQueue == documentsWriter.deleteQueue) { // simply return the DWPT even in a flush all case since we already hold the lock and the // DWPT is not stale diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java index 3fe76a6338f3..ef27ea36f98b 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java @@ -52,6 +52,7 @@ final class DocumentsWriterPerThread implements Accountable, Lock { private Throwable abortingException; + public final int dwptGroupNumber; private void onAbortingException(Throwable throwable) { assert throwable != null : "aborting exception must not be null"; @@ -151,7 +152,8 @@ void abort() throws IOException { DocumentsWriterDeleteQueue deleteQueue, FieldInfos.Builder fieldInfos, AtomicLong pendingNumDocs, - boolean enableTestPoints) { + boolean enableTestPoints, + int dwptGroupNumber) { this.indexMajorVersionCreated = indexMajorVersionCreated; this.directory = new TrackingDirectoryWrapper(directory); this.fieldInfos = fieldInfos; @@ -163,6 +165,7 @@ void abort() throws IOException { this.deleteQueue = Objects.requireNonNull(deleteQueue); assert numDocsInRAM == 0 : "num docs " + numDocsInRAM; deleteSlice = deleteQueue.newSlice(); + this.dwptGroupNumber = dwptGroupNumber; segmentInfo = new SegmentInfo( @@ -178,6 +181,7 @@ void abort() throws IOException { StringHelper.randomId(), Collections.emptyMap(), indexWriterConfig.getIndexSort()); + segmentInfo.putAttribute("dwptGroupNumber", String.valueOf(dwptGroupNumber)); assert numDocsInRAM == 0; if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) { infoStream.message( diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java index 
d69a71bfea57..380a0504e37a 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java @@ -22,9 +22,11 @@ import java.util.IdentityHashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; import java.util.function.Predicate; -import java.util.function.Supplier; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.ThreadInterruptedException; @@ -42,21 +44,25 @@ */ final class DocumentsWriterPerThreadPool implements Iterable, Closeable { - private final Set dwpts = - Collections.newSetFromMap(new IdentityHashMap<>()); - private final LockableConcurrentApproximatePriorityQueue freeList = - new LockableConcurrentApproximatePriorityQueue<>(); - private final Supplier dwptFactory; + private final Map> dwpts = new ConcurrentHashMap<>(); + private final Map> + freeList = new ConcurrentHashMap<>(); + private final Function dwptFactory; private int takenWriterPermits = 0; private volatile boolean closed; - DocumentsWriterPerThreadPool(Supplier dwptFactory) { + DocumentsWriterPerThreadPool(Function dwptFactory) { this.dwptFactory = dwptFactory; } /** Returns the active number of {@link DocumentsWriterPerThread} instances. */ synchronized int size() { - return dwpts.size(); + int si = 0; + for (int dwptGroupNumber : dwpts.keySet()) { + si += dwpts.get(dwptGroupNumber).size(); + } + + return si; } synchronized void lockNewWriters() { @@ -82,7 +88,7 @@ synchronized void unlockNewWriters() { * * @return a new {@link DocumentsWriterPerThread} */ - private synchronized DocumentsWriterPerThread newWriter() { + private synchronized DocumentsWriterPerThread newWriter(int dwptGroupNumber) { assert takenWriterPermits >= 0; while (takenWriterPermits > 0) { // we can't create new DWPTs while not all permits are available @@ -99,12 +105,20 @@ private synchronized DocumentsWriterPerThread newWriter() { // end of the world it's violating the contract that we don't release any new DWPT after this // pool is closed ensureOpen(); - DocumentsWriterPerThread dwpt = dwptFactory.get(); + DocumentsWriterPerThread dwpt = dwptFactory.apply(dwptGroupNumber); dwpt.lock(); // lock so nobody else will get this DWPT - dwpts.add(dwpt); + getDwpts(dwptGroupNumber).add(dwpt); return dwpt; } + private synchronized Set getDwpts(int dwptGroupNumber) { + if (!dwpts.containsKey(dwptGroupNumber)) { + dwpts.put(dwptGroupNumber, Collections.newSetFromMap(new IdentityHashMap<>())); + } + + return dwpts.get(dwptGroupNumber); + } + // TODO: maybe we should try to do load leveling here: we want roughly even numbers // of items (docs, deletes, DV updates) to most take advantage of concurrency while flushing @@ -112,9 +126,9 @@ private synchronized DocumentsWriterPerThread newWriter() { * This method is used by DocumentsWriter/FlushControl to obtain a DWPT to do an indexing * operation (add/updateDocument). 
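+   * Each DWPT group keeps its own free list, so the returned writer always belongs to the
+   * requested {@code dwptGroupNumber}; if no free writer exists for that group, a new one is
+   * created.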
*/ - DocumentsWriterPerThread getAndLock() { + DocumentsWriterPerThread getAndLock(final int dwptGroupNumber) { ensureOpen(); - DocumentsWriterPerThread dwpt = freeList.lockAndPoll(); + DocumentsWriterPerThread dwpt = getFreeList(dwptGroupNumber).lockAndPoll(); if (dwpt != null) { return dwpt; } @@ -123,7 +137,16 @@ DocumentsWriterPerThread getAndLock() { // `freeList` at this point, it will be added later on once DocumentsWriter has indexed a // document into this DWPT and then gives it back to the pool by calling // #marksAsFreeAndUnlock. - return newWriter(); + return newWriter(dwptGroupNumber); + } + + private LockableConcurrentApproximatePriorityQueue getFreeList( + final int dwptGroupNumber) { + if (!freeList.containsKey(dwptGroupNumber)) { + freeList.put(dwptGroupNumber, new LockableConcurrentApproximatePriorityQueue<>()); + } + + return freeList.get(dwptGroupNumber); } private void ensureOpen() { @@ -133,20 +156,25 @@ private void ensureOpen() { } private synchronized boolean contains(DocumentsWriterPerThread state) { - return dwpts.contains(state); + return getDwpts(state.dwptGroupNumber).contains(state); } void marksAsFreeAndUnlock(DocumentsWriterPerThread state) { final long ramBytesUsed = state.ramBytesUsed(); assert contains(state) : "we tried to add a DWPT back to the pool but the pool doesn't know about this DWPT"; - freeList.addAndUnlock(state, ramBytesUsed); + getFreeList(state.dwptGroupNumber).addAndUnlock(state, ramBytesUsed); } @Override public synchronized Iterator iterator() { // copy on read - this is a quick op since num states is low - return List.copyOf(dwpts).iterator(); + List list = new ArrayList<>(); + for (int groupId : dwpts.keySet()) { + list.addAll(List.copyOf(dwpts.get(groupId))); + } + + return list.iterator(); } /** @@ -182,10 +210,10 @@ synchronized boolean checkout(DocumentsWriterPerThread perThread) { // #getAndLock cannot pull this DWPT out of the pool since #getAndLock does a DWPT#tryLock to // check if the DWPT is available. assert perThread.isHeldByCurrentThread(); - if (dwpts.remove(perThread)) { - freeList.remove(perThread); + if (getDwpts(perThread.dwptGroupNumber).remove(perThread)) { + getFreeList(perThread.dwptGroupNumber).remove(perThread); } else { - assert freeList.contains(perThread) == false; + assert getFreeList(perThread.dwptGroupNumber).contains(perThread) == false; return false; } return true; @@ -193,7 +221,7 @@ synchronized boolean checkout(DocumentsWriterPerThread perThread) { /** Returns true if this DWPT is still part of the pool */ synchronized boolean isRegistered(DocumentsWriterPerThread perThread) { - return dwpts.contains(perThread); + return getDwpts(perThread.dwptGroupNumber).contains(perThread); } @Override diff --git a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java index c9db6d0c6f66..4115bf9a8723 100644 --- a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java +++ b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java @@ -116,6 +116,9 @@ public class LiveIndexWriterConfig { /** The IndexWriter event listener to record key events * */ protected IndexWriterEventListener eventListener; + /** DWPT criteria definition used to select pool group from which DWPT will be selected. 
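+   * Defaults to {@link DWPTGroupingCriteriaDefinition#DEFAULT_DWPT_GROUPING_CRITERIA_DEFINITION},
+   * which assigns every document to the default group.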
*/ + protected DWPTGroupingCriteriaDefinition dwptGroupingCriteriaDefinition; + // used by IndexWriterConfig LiveIndexWriterConfig(Analyzer analyzer) { this.analyzer = analyzer; @@ -139,6 +142,8 @@ public class LiveIndexWriterConfig { perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB; maxFullFlushMergeWaitMillis = IndexWriterConfig.DEFAULT_MAX_FULL_FLUSH_MERGE_WAIT_MILLIS; eventListener = IndexWriterEventListener.NO_OP_LISTENER; + dwptGroupingCriteriaDefinition = + DWPTGroupingCriteriaDefinition.DEFAULT_DWPT_GROUPING_CRITERIA_DEFINITION; } /** Returns the default analyzer to use for indexing documents. */ @@ -377,6 +382,26 @@ public LiveIndexWriterConfig setUseCompoundFile(boolean useCompoundFile) { return this; } + /** + * Set DWPT grouping criteria definition + * + * @param dwptGroupingCriteriaDefinition the passed DWPT grouping criteria definition. + */ + public LiveIndexWriterConfig setDWPTGroupingCriteriaFunction( + final DWPTGroupingCriteriaDefinition dwptGroupingCriteriaDefinition) { + this.dwptGroupingCriteriaDefinition = dwptGroupingCriteriaDefinition; + return this; + } + + /** + * Get DWPT grouping criteria definition + * + * @return dwptGroupingCriteriaDefinition + */ + public DWPTGroupingCriteriaDefinition getDwptGroupingCriteriaDefinition() { + return dwptGroupingCriteriaDefinition; + } + /** * Returns true iff the {@link IndexWriter} packs newly written segments in a * compound file. Default is true. diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDocumentWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestDocumentWriter.java index 7d8cdf95d233..563fffaffeee 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestDocumentWriter.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestDocumentWriter.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.io.Reader; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.function.Function; import org.apache.lucene.analysis.Analyzer; @@ -605,4 +606,45 @@ public void testIndexBinaryValueWithoutTokenStream() throws IOException { } } } + + public void testCriteriaBasedDWPTGrouping() throws IOException { + int[] statusCodes = new int[] {200, 201, 400, 404, 500, 200, 300}; + final IndexWriterConfig config = newIndexWriterConfig(new MockAnalyzer(random())); + config.setMergePolicy(new CriteriaBasedGroupingTieredMergePolicy()); + config.setDWPTGroupingCriteriaFunction(DocHelper.getDWPTCriteriaDefinition()); + IndexWriter writer = new IndexWriter(dir, config); + for (int statusCode : statusCodes) { + final Document logDocument = DocHelper.createLogDocument(17, "test", statusCode); + writer.addDocument(logDocument); + } + + writer.commit(); + SegmentInfos infos = writer.cloneSegmentInfos(); + assertEquals(infos.size(), 2); + Iterator segInfoIterator = infos.iterator(); + SegmentCommitInfo si = segInfoIterator.next(); + + // First segment will contain 2xx and 3xx logs. + SegmentReader reader = new SegmentReader(si, Version.LATEST.major, newIOContext(random())); + assertTrue(reader != null); + Document doc = reader.storedFields().document(0); + assertTrue(doc != null); + IndexableField[] fields = doc.getFields("statuscode"); + assertTrue(fields != null && fields.length == 1); + assertTrue(fields[0].stringValue().startsWith("2") || fields[0].stringValue().startsWith("3")); + reader.close(); + + si = segInfoIterator.next(); + + // Second segments will contain 4xx and 5xx logs. 
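+      // Per DocHelper.getDWPTCriteriaDefinition(), 2xx/3xx documents map to group 1 and 4xx/5xx
+      // documents map to group 2, so the seven log documents flush into exactly two grouped segments.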
+ reader = new SegmentReader(si, Version.LATEST.major, newIOContext(random())); + assertTrue(reader != null); + doc = reader.storedFields().document(0); + assertTrue(doc != null); + fields = doc.getFields("statuscode"); + assertTrue(fields != null && fields.length == 1); + assertTrue(fields[0].stringValue().startsWith("4") || fields[0].stringValue().startsWith("5")); + writer.close(); + reader.close(); + } } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterPerThreadPool.java b/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterPerThreadPool.java index 4f52c1f5854e..b2e320fa412b 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterPerThreadPool.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterPerThreadPool.java @@ -31,7 +31,7 @@ public void testLockReleaseAndClose() throws IOException { try (Directory directory = newDirectory()) { DocumentsWriterPerThreadPool pool = new DocumentsWriterPerThreadPool( - () -> + (dwptGroupNumber) -> new DocumentsWriterPerThread( Version.LATEST.major, "", @@ -41,15 +41,19 @@ public void testLockReleaseAndClose() throws IOException { new DocumentsWriterDeleteQueue(null), null, new AtomicLong(), - false)); + false, + dwptGroupNumber)); - DocumentsWriterPerThread first = pool.getAndLock(); + DocumentsWriterPerThread first = + pool.getAndLock(DWPTGroupingCriteriaDefinition.DEFAULT_DWPT_GROUP_NUMBER); assertEquals(1, pool.size()); - DocumentsWriterPerThread second = pool.getAndLock(); + DocumentsWriterPerThread second = + pool.getAndLock(DWPTGroupingCriteriaDefinition.DEFAULT_DWPT_GROUP_NUMBER); assertEquals(2, pool.size()); pool.marksAsFreeAndUnlock(first); assertEquals(2, pool.size()); - DocumentsWriterPerThread third = pool.getAndLock(); + DocumentsWriterPerThread third = + pool.getAndLock(DWPTGroupingCriteriaDefinition.DEFAULT_DWPT_GROUP_NUMBER); assertSame(first, third); assertEquals(2, pool.size()); pool.checkout(third); @@ -71,7 +75,7 @@ public void testCloseWhileNewWritersLocked() throws IOException, InterruptedExce try (Directory directory = newDirectory()) { DocumentsWriterPerThreadPool pool = new DocumentsWriterPerThreadPool( - () -> + (dwptGroupNumber) -> new DocumentsWriterPerThread( Version.LATEST.major, "", @@ -81,9 +85,11 @@ public void testCloseWhileNewWritersLocked() throws IOException, InterruptedExce new DocumentsWriterDeleteQueue(null), null, new AtomicLong(), - false)); + false, + dwptGroupNumber)); - DocumentsWriterPerThread first = pool.getAndLock(); + DocumentsWriterPerThread first = + pool.getAndLock(DWPTGroupingCriteriaDefinition.DEFAULT_DWPT_GROUP_NUMBER); pool.lockNewWriters(); CountDownLatch latch = new CountDownLatch(1); Thread t = @@ -91,7 +97,7 @@ public void testCloseWhileNewWritersLocked() throws IOException, InterruptedExce () -> { try { latch.countDown(); - pool.getAndLock(); + pool.getAndLock(DWPTGroupingCriteriaDefinition.DEFAULT_DWPT_GROUP_NUMBER); fail(); } catch ( @SuppressWarnings("unused") diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java index 4cee1635c42e..87e92401da78 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java @@ -4126,7 +4126,9 @@ public void testFlushWhileStartingNewThreads() throws IOException, InterruptedEx List states = new ArrayList<>(); try { for (int i = 0; i < 100; i++) { - DocumentsWriterPerThread state = 
w.docWriter.perThreadPool.getAndLock(); + DocumentsWriterPerThread state = + w.docWriter.perThreadPool.getAndLock( + DWPTGroupingCriteriaDefinition.DEFAULT_DWPT_GROUP_NUMBER); states.add(state::unlock); state.deleteQueue.getNextSequenceNumber(); } diff --git a/lucene/test-framework/src/java/org/apache/lucene/tests/index/DocHelper.java b/lucene/test-framework/src/java/org/apache/lucene/tests/index/DocHelper.java index b51842c8a401..757695ec3158 100644 --- a/lucene/test-framework/src/java/org/apache/lucene/tests/index/DocHelper.java +++ b/lucene/test-framework/src/java/org/apache/lucene/tests/index/DocHelper.java @@ -19,8 +19,10 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Random; +import java.util.function.Function; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; @@ -28,6 +30,7 @@ import org.apache.lucene.document.StoredField; import org.apache.lucene.document.StringField; import org.apache.lucene.document.TextField; +import org.apache.lucene.index.DWPTGroupingCriteriaDefinition; import org.apache.lucene.index.IndexOptions; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; @@ -326,4 +329,51 @@ public static Document createDocument(int n, String indexName, int numFields) { } return doc; } + + /** + * Creates a log document with a given id, indexName and status code + * + * @param n id of the document + * @param indexName index name where document is added + * @param statusCode status code of the log entry + * @return a log document with a given id, indexName and status code + */ + public static Document createLogDocument(int n, String indexName, int statusCode) { + Document doc = new Document(); + doc.add(new Field("id", Integer.toString(n), STRING_TYPE_STORED_WITH_TVS)); + doc.add(new Field("indexname", indexName, STRING_TYPE_STORED_WITH_TVS)); + doc.add(new Field("statuscode", Integer.toString(statusCode), STRING_TYPE_STORED_WITH_TVS)); + doc.add(new Field("@timestamp", "898787158", STRING_TYPE_STORED_WITH_TVS)); + doc.add(new Field("clientip", "5.12.211.1", STRING_TYPE_STORED_WITH_TVS)); + doc.add(new Field("request", "GET /gematu/lowsea.gif HTTP/1.1", STRING_TYPE_STORED_WITH_TVS)); + + return doc; + } + + /** + * Returns a criteria definition which groups data by status code. + * + * @return a criteria definition which groups data by status code. + */ + public static DWPTGroupingCriteriaDefinition getDWPTCriteriaDefinition() { + final Function>, Integer> + criteriaFunction = + (docs) -> { + Iterator docIt = docs.iterator().next().iterator(); + while (docIt.hasNext()) { + IndexableField field = docIt.next(); + if (field.stringValue() != null && field.name().equals("statuscode")) { + int statusCode = Integer.parseInt(field.stringValue()); + if ((statusCode / 100) == 2 || (statusCode / 100) == 3) { + return 1; + } else if ((statusCode / 100) == 4 || (statusCode / 100) == 5) { + return 2; + } + } + } + return 0; + }; + + return new DWPTGroupingCriteriaDefinition(criteriaFunction, 4); + } }
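
For reference, a sketch of a caller-supplied grouping criteria in the spirit of DocHelper.getDWPTCriteriaDefinition() above. This is a hypothetical example, not part of the patch: the class name is illustrative, the "statuscode" field mirrors the test helper, and the Function parameterization is the one implied by DocumentsWriter#getDWPTGroupNumber. It routes documents to a DWPT group by HTTP status-code class and falls back to the default group when no status code is present.

import java.util.function.Function;
import org.apache.lucene.index.DWPTGroupingCriteriaDefinition;
import org.apache.lucene.index.IndexableField;

public final class StatusCodeGroupingExample {

  /** Builds a definition that groups documents by the class of their "statuscode" field. */
  public static DWPTGroupingCriteriaDefinition byStatusClass() {
    Function<Iterable<? extends Iterable<? extends IndexableField>>, Integer> criteria =
        docs -> {
          // Only the first document of the batch decides the group, mirroring
          // DocHelper.getDWPTCriteriaDefinition().
          for (Iterable<? extends IndexableField> doc : docs) {
            for (IndexableField field : doc) {
              if ("statuscode".equals(field.name()) && field.stringValue() != null) {
                return Integer.parseInt(field.stringValue()) / 100; // 2 for 2xx, 4 for 4xx, ...
              }
            }
            break;
          }
          return DWPTGroupingCriteriaDefinition.DEFAULT_DWPT_GROUP_NUMBER;
        };
    // Group numbers >= 6 are folded into the default group by DocumentsWriter#getDWPTGroupNumber.
    return new DWPTGroupingCriteriaDefinition(criteria, 6);
  }
}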