From 71a771e938f5faf5f6d2d00e0ba27e99e170ed88 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Wed, 4 Sep 2024 22:57:14 +0530 Subject: [PATCH 1/3] [Star tree][Bug] Fix for derived metrics (#15640) * Fix for derived metrics Signed-off-by: Bharathwaj G * fixes for byte Signed-off-by: Bharathwaj G --------- Signed-off-by: Bharathwaj G --- .../Composite99DocValuesReader.java | 2 +- .../index/compositeindex/datacube/Metric.java | 16 +++++ .../startree/builder/BaseStarTreeBuilder.java | 7 +- .../builder/OffHeapStarTreeBuilder.java | 2 +- .../builder/OnHeapStarTreeBuilder.java | 2 +- .../fileformats/meta/StarTreeMetadata.java | 29 ++++++-- .../startree/index/StarTreeValues.java | 2 +- .../index/mapper/NumberFieldMapper.java | 4 +- .../index/mapper/StarTreeMapper.java | 6 +- .../StarTreeDocValuesFormatTests.java | 69 ++++++++++++++++--- .../datacube/startree/StarTreeTestUtils.java | 40 +++++++++-- .../builder/AbstractStarTreeBuilderTests.java | 28 +++++--- 12 files changed, 163 insertions(+), 44 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/codec/composite/composite99/Composite99DocValuesReader.java b/server/src/main/java/org/opensearch/index/codec/composite/composite99/Composite99DocValuesReader.java index ca55e8e4573ab..7901336151c8e 100644 --- a/server/src/main/java/org/opensearch/index/codec/composite/composite99/Composite99DocValuesReader.java +++ b/server/src/main/java/org/opensearch/index/codec/composite/composite99/Composite99DocValuesReader.java @@ -164,7 +164,7 @@ public Composite99DocValuesReader(DocValuesProducer producer, SegmentReadState r // adding metric fields for (Metric metric : starTreeMetadata.getMetrics()) { - for (MetricStat metricStat : metric.getMetrics()) { + for (MetricStat metricStat : metric.getBaseMetrics()) { fields.add( fullyQualifiedFieldNameForStarTreeMetricsDocValues( compositeFieldName, diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/Metric.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/Metric.java index 9accb0201170a..be16f1e9886cd 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/Metric.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/Metric.java @@ -13,6 +13,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -23,10 +24,18 @@ public class Metric implements ToXContent { private final String field; private final List metrics; + private final List baseMetrics; public Metric(String field, List metrics) { this.field = field; this.metrics = metrics; + this.baseMetrics = new ArrayList<>(); + for (MetricStat metricStat : metrics) { + if (metricStat.isDerivedMetric()) { + continue; + } + baseMetrics.add(metricStat); + } } public String getField() { @@ -37,6 +46,13 @@ public List getMetrics() { return metrics; } + /** + * Returns only the base metrics + */ + public List getBaseMetrics() { + return baseMetrics; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java index ba4cb792c00df..a1d638616f2aa 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java 
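A note on the Metric.java hunk above and the builder changes that follow: the fix precomputes the base (non-derived) stats once in the Metric constructor, so every consumer that wires up doc-values readers calls getBaseMetrics() instead of re-filtering getMetrics() at each call site. Derived stats such as AVG never get their own doc-values file; they are recomputed from stored base stats. A minimal, self-contained sketch of the pattern, with stand-in names (Stat, MetricSketch) rather than the actual OpenSearch types:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

enum Stat {
    SUM(false), VALUE_COUNT(false), MIN(false), MAX(false),
    AVG(true); // derived: computable from SUM and VALUE_COUNT, never stored itself

    private final boolean derived;
    Stat(boolean derived) { this.derived = derived; }
    boolean isDerived() { return derived; }
}

final class MetricSketch {
    private final String field;
    private final List<Stat> stats;     // everything the user configured
    private final List<Stat> baseStats; // precomputed once: stats that are actually stored

    MetricSketch(String field, List<Stat> stats) {
        this.field = field;
        this.stats = stats;
        List<Stat> base = new ArrayList<>();
        for (Stat s : stats) {
            if (s.isDerived() == false) { // skip derived stats: no doc-values file exists for them
                base.add(s);
            }
        }
        this.baseStats = Collections.unmodifiableList(base);
    }

    List<Stat> getBaseStats() { return baseStats; }
}
```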
+++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java @@ -150,10 +150,7 @@ public List generateMetricAggregatorInfos(MapperService ma metricAggregatorInfos.add(metricAggregatorInfo); continue; } - for (MetricStat metricStat : metric.getMetrics()) { - if (metricStat.isDerivedMetric()) { - continue; - } + for (MetricStat metricStat : metric.getBaseMetrics()) { FieldValueConverter fieldValueConverter; Mapper fieldMapper = mapperService.documentMapper().mappers().getMapper(metric.getField()); if (fieldMapper instanceof FieldMapper && ((FieldMapper) fieldMapper).fieldType() instanceof FieldValueConverter) { @@ -185,7 +182,7 @@ public List getMetricReaders(SegmentWriteState stat List metricReaders = new ArrayList<>(); for (Metric metric : this.starTreeField.getMetrics()) { - for (MetricStat metricStat : metric.getMetrics()) { + for (MetricStat metricStat : metric.getBaseMetrics()) { SequentialDocValuesIterator metricReader; FieldInfo metricFieldInfo = state.fieldInfos.fieldInfo(metric.getField()); if (metricStat.equals(MetricStat.DOC_COUNT)) { diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilder.java index 62fda3e56d289..1613b7c5a3ac0 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilder.java @@ -158,7 +158,7 @@ Iterator mergeStarTrees(List starTreeValuesSub List metricReaders = new ArrayList<>(); // get doc id set iterators for metrics for (Metric metric : starTreeValues.getStarTreeField().getMetrics()) { - for (MetricStat metricStat : metric.getMetrics()) { + for (MetricStat metricStat : metric.getBaseMetrics()) { String metricFullName = fullyQualifiedFieldNameForStarTreeMetricsDocValues( starTreeValues.getStarTreeField().getName(), metric.getField(), diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java index ec9475caf7d6d..1a5c906ad413b 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java @@ -144,7 +144,7 @@ StarTreeDocument[] getSegmentsStarTreeDocuments(List starTreeVal List metricReaders = new ArrayList<>(); // get doc id set iterators for metrics for (Metric metric : starTreeValues.getStarTreeField().getMetrics()) { - for (MetricStat metricStat : metric.getMetrics()) { + for (MetricStat metricStat : metric.getBaseMetrics()) { String metricFullName = fullyQualifiedFieldNameForStarTreeMetricsDocValues( starTreeValues.getStarTreeField().getName(), metric.getField(), diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/fileformats/meta/StarTreeMetadata.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/fileformats/meta/StarTreeMetadata.java index e7bb32282ece7..7352c215ee390 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/fileformats/meta/StarTreeMetadata.java +++ 
b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/fileformats/meta/StarTreeMetadata.java @@ -220,16 +220,37 @@ private int readMetricsCount() throws IOException { private List readMetricEntries() throws IOException { int metricCount = readMetricsCount(); - Map starTreeMetricMap = new LinkedHashMap<>(); + Map> starTreeMetricStatMap = new LinkedHashMap<>(); for (int i = 0; i < metricCount; i++) { String metricName = meta.readString(); int metricStatOrdinal = meta.readVInt(); MetricStat metricStat = MetricStat.fromMetricOrdinal(metricStatOrdinal); - Metric metric = starTreeMetricMap.computeIfAbsent(metricName, field -> new Metric(field, new ArrayList<>())); - metric.getMetrics().add(metricStat); + List metricStats = starTreeMetricStatMap.computeIfAbsent(metricName, field -> new ArrayList<>()); + metricStats.add(metricStat); } + List starTreeMetricMap = new ArrayList<>(); + for (Map.Entry> metricStatsEntry : starTreeMetricStatMap.entrySet()) { + addEligibleDerivedMetrics(metricStatsEntry.getValue()); + starTreeMetricMap.add(new Metric(metricStatsEntry.getKey(), metricStatsEntry.getValue())); - return new ArrayList<>(starTreeMetricMap.values()); + } + return starTreeMetricMap; + } + + /** + * Add derived metrics if all associated base metrics are present + */ + private void addEligibleDerivedMetrics(List metricStatsList) { + Set metricStatsSet = new HashSet<>(metricStatsList); + for (MetricStat metric : MetricStat.values()) { + if (metric.isDerivedMetric() && !metricStatsSet.contains(metric)) { + List sourceMetrics = metric.getBaseMetrics(); + if (metricStatsSet.containsAll(sourceMetrics)) { + metricStatsList.add(metric); + metricStatsSet.add(metric); + } + } + } } private int readSegmentAggregatedDocCount() throws IOException { diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/index/StarTreeValues.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/index/StarTreeValues.java index 7d5f5ba02b9f8..a34bbbe9ee738 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/index/StarTreeValues.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/index/StarTreeValues.java @@ -171,7 +171,7 @@ public StarTreeValues( // get doc id set iterators for metrics for (Metric metric : starTreeMetadata.getMetrics()) { - for (MetricStat metricStat : metric.getMetrics()) { + for (MetricStat metricStat : metric.getBaseMetrics()) { String metricFullName = fullyQualifiedFieldNameForStarTreeMetricsDocValues( starTreeField.getName(), metric.getField(), diff --git a/server/src/main/java/org/opensearch/index/mapper/NumberFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/NumberFieldMapper.java index eb0694edc70ba..43e975f95757b 100644 --- a/server/src/main/java/org/opensearch/index/mapper/NumberFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/NumberFieldMapper.java @@ -653,9 +653,7 @@ public byte[] encodePoint(Number value) { @Override public double toDoubleValue(long value) { - byte[] bytes = new byte[8]; - NumericUtils.longToSortableBytes(value, bytes, 0); - return NumericUtils.sortableLongToDouble(NumericUtils.sortableBytesToLong(bytes, 0)); + return objectToDouble(value); } @Override diff --git a/server/src/main/java/org/opensearch/index/mapper/StarTreeMapper.java b/server/src/main/java/org/opensearch/index/mapper/StarTreeMapper.java index 9e2e7ef3c3889..52dab17e0b0bb 100644 --- 
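The readMetricEntries() rewrite above deserves a gloss: since the builders now register only base stats, the on-disk metadata carries base stats, and the reader re-adds any derived stat whose base stats are all present (for example AVG once both SUM and VALUE_COUNT are found). A standalone sketch of that eligibility check; the Stat enum and the AVG-from-SUM/VALUE_COUNT mapping are illustrative stand-ins consistent with the test expectations later in this patch:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public final class DerivedMetricEligibility {
    enum Stat { SUM, VALUE_COUNT, MIN, MAX, AVG }

    // Illustrative mapping; in the real MetricStat enum each derived stat knows its own base stats.
    private static final Map<Stat, List<Stat>> BASES_OF_DERIVED = Map.of(Stat.AVG, List.of(Stat.SUM, Stat.VALUE_COUNT));

    /** Re-adds a derived stat when every base stat it needs was found in the metadata. */
    static void addEligibleDerived(List<Stat> statsFromMetadata) {
        Set<Stat> present = new HashSet<>(statsFromMetadata);
        for (Map.Entry<Stat, List<Stat>> e : BASES_OF_DERIVED.entrySet()) {
            if (present.contains(e.getKey()) == false && present.containsAll(e.getValue())) {
                statsFromMetadata.add(e.getKey());
                present.add(e.getKey());
            }
        }
    }

    public static void main(String[] args) {
        List<Stat> stats = new ArrayList<>(List.of(Stat.SUM, Stat.VALUE_COUNT));
        addEligibleDerived(stats);
        System.out.println(stats); // [SUM, VALUE_COUNT, AVG]
    }
}
```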
a/server/src/main/java/org/opensearch/index/mapper/StarTreeMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/StarTreeMapper.java @@ -287,11 +287,7 @@ private List buildMetrics(String fieldName, Map map, Map } int numBaseMetrics = 0; for (Metric metric : metrics) { - for (MetricStat metricStat : metric.getMetrics()) { - if (metricStat.isDerivedMetric() == false) { - numBaseMetrics++; - } - } + numBaseMetrics += metric.getBaseMetrics().size(); } if (numBaseMetrics > context.getSettings() .getAsInt( diff --git a/server/src/test/java/org/opensearch/index/codec/composite99/datacube/startree/StarTreeDocValuesFormatTests.java b/server/src/test/java/org/opensearch/index/codec/composite99/datacube/startree/StarTreeDocValuesFormatTests.java index 0c6d21d28cc8a..1c267c67e60ed 100644 --- a/server/src/test/java/org/opensearch/index/codec/composite99/datacube/startree/StarTreeDocValuesFormatTests.java +++ b/server/src/test/java/org/opensearch/index/codec/composite99/datacube/startree/StarTreeDocValuesFormatTests.java @@ -119,23 +119,23 @@ public void testStarTreeDocValues() throws IOException { Document doc = new Document(); doc.add(new SortedNumericDocValuesField("sndv", 1)); doc.add(new SortedNumericDocValuesField("dv", 1)); - doc.add(new SortedNumericDocValuesField("field", 1)); + doc.add(new SortedNumericDocValuesField("field", -1)); iw.addDocument(doc); doc = new Document(); doc.add(new SortedNumericDocValuesField("sndv", 1)); doc.add(new SortedNumericDocValuesField("dv", 1)); - doc.add(new SortedNumericDocValuesField("field", 1)); + doc.add(new SortedNumericDocValuesField("field", -1)); iw.addDocument(doc); doc = new Document(); iw.forceMerge(1); doc.add(new SortedNumericDocValuesField("sndv", 2)); doc.add(new SortedNumericDocValuesField("dv", 2)); - doc.add(new SortedNumericDocValuesField("field", 2)); + doc.add(new SortedNumericDocValuesField("field", -2)); iw.addDocument(doc); doc = new Document(); doc.add(new SortedNumericDocValuesField("sndv", 2)); doc.add(new SortedNumericDocValuesField("dv", 2)); - doc.add(new SortedNumericDocValuesField("field", 2)); + doc.add(new SortedNumericDocValuesField("field", -2)); iw.addDocument(doc); iw.forceMerge(1); iw.close(); @@ -144,11 +144,39 @@ public void testStarTreeDocValues() throws IOException { TestUtil.checkReader(ir); assertEquals(1, ir.leaves().size()); + // Segment documents + /** + * sndv dv field + * [1, 1, -1] + * [1, 1, -1] + * [2, 2, -2] + * [2, 2, -2] + */ + // Star tree documents + /** + * sndv dv | [ sum, value_count, min, max[field]] , [ sum, value_count, min, max[sndv]], doc_count + * [1, 1] | [-2.0, 2.0, -1.0, -1.0, 2.0, 2.0, 1.0, 1.0, 2.0] + * [2, 2] | [-4.0, 2.0, -2.0, -2.0, 4.0, 2.0, 2.0, 2.0, 2.0] + * [null, 1] | [-2.0, 2.0, -1.0, -1.0, 2.0, 2.0, 1.0, 1.0, 2.0] + * [null, 2] | [-4.0, 2.0, -2.0, -2.0, 4.0, 2.0, 2.0, 2.0, 2.0] + */ StarTreeDocument[] expectedStarTreeDocuments = new StarTreeDocument[4]; - expectedStarTreeDocuments[0] = new StarTreeDocument(new Long[] { 1L, 1L }, new Double[] { 2.0, 2.0, 2.0 }); - expectedStarTreeDocuments[1] = new StarTreeDocument(new Long[] { 2L, 2L }, new Double[] { 4.0, 2.0, 4.0 }); - expectedStarTreeDocuments[2] = new StarTreeDocument(new Long[] { null, 1L }, new Double[] { 2.0, 2.0, 2.0 }); - expectedStarTreeDocuments[3] = new StarTreeDocument(new Long[] { null, 2L }, new Double[] { 4.0, 2.0, 4.0 }); + expectedStarTreeDocuments[0] = new StarTreeDocument( + new Long[] { 1L, 1L }, + new Double[] { -2.0, 2.0, -1.0, -1.0, 2.0, 2.0, 1.0, 1.0, 2.0 } + ); + expectedStarTreeDocuments[1]
= new StarTreeDocument( + new Long[] { 2L, 2L }, + new Double[] { -4.0, 2.0, -2.0, -2.0, 4.0, 2.0, 2.0, 2.0, 2.0 } + ); + expectedStarTreeDocuments[2] = new StarTreeDocument( + new Long[] { null, 1L }, + new Double[] { -2.0, 2.0, -1.0, -1.0, 2.0, 2.0, 1.0, 1.0, 2.0 } + ); + expectedStarTreeDocuments[3] = new StarTreeDocument( + new Long[] { null, 2L }, + new Double[] { -4.0, 2.0, -2.0, -2.0, 4.0, 2.0, 2.0, 2.0, 2.0 } + ); for (LeafReaderContext context : ir.leaves()) { SegmentReader reader = Lucene.segmentReader(context.reader()); @@ -159,7 +187,17 @@ public void testStarTreeDocValues() throws IOException { StarTreeValues starTreeValues = (StarTreeValues) starTreeDocValuesReader.getCompositeIndexValues(compositeIndexFieldInfo); StarTreeDocument[] starTreeDocuments = StarTreeTestUtils.getSegmentsStarTreeDocuments( List.of(starTreeValues), - List.of(NumberFieldMapper.NumberType.DOUBLE, NumberFieldMapper.NumberType.LONG, NumberFieldMapper.NumberType.LONG), + List.of( + NumberFieldMapper.NumberType.DOUBLE, + NumberFieldMapper.NumberType.LONG, + NumberFieldMapper.NumberType.DOUBLE, + NumberFieldMapper.NumberType.DOUBLE, + NumberFieldMapper.NumberType.DOUBLE, + NumberFieldMapper.NumberType.LONG, + NumberFieldMapper.NumberType.DOUBLE, + NumberFieldMapper.NumberType.DOUBLE, + NumberFieldMapper.NumberType.LONG + ), reader.maxDoc() ); assertStarTreeDocuments(starTreeDocuments, expectedStarTreeDocuments); @@ -190,6 +228,19 @@ private XContentBuilder getExpandedMapping() throws IOException { b.startArray("stats"); b.value("sum"); b.value("value_count"); + b.value("avg"); + b.value("min"); + b.value("max"); + b.endArray(); + b.endObject(); + b.startObject(); + b.field("name", "sndv"); + b.startArray("stats"); + b.value("sum"); + b.value("value_count"); + b.value("avg"); + b.value("min"); + b.value("max"); b.endArray(); b.endObject(); b.endArray(); diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/StarTreeTestUtils.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/StarTreeTestUtils.java index f3e111bf6caa6..b7395b993f67b 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/StarTreeTestUtils.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/StarTreeTestUtils.java @@ -61,6 +61,9 @@ public static StarTreeDocument[] getSegmentsStarTreeDocuments( // get doc id set iterators for metrics for (Metric metric : starTreeValues.getStarTreeField().getMetrics()) { for (MetricStat metricStat : metric.getMetrics()) { + if (metricStat.isDerivedMetric()) { + continue; + } String metricFullName = fullyQualifiedFieldNameForStarTreeMetricsDocValues( starTreeValues.getStarTreeField().getName(), metric.getField(), @@ -125,18 +128,18 @@ public static void assertStarTreeDocuments(StarTreeDocument[] starTreeDocuments, assertNotNull(resultStarTreeDocument.dimensions); assertNotNull(resultStarTreeDocument.metrics); - assertEquals(resultStarTreeDocument.dimensions.length, expectedStarTreeDocument.dimensions.length); - assertEquals(resultStarTreeDocument.metrics.length, expectedStarTreeDocument.metrics.length); + assertEquals(expectedStarTreeDocument.dimensions.length, resultStarTreeDocument.dimensions.length); + assertEquals(expectedStarTreeDocument.metrics.length, resultStarTreeDocument.metrics.length); for (int di = 0; di < resultStarTreeDocument.dimensions.length; di++) { - assertEquals(resultStarTreeDocument.dimensions[di], expectedStarTreeDocument.dimensions[di]); + 
assertEquals(expectedStarTreeDocument.dimensions[di], resultStarTreeDocument.dimensions[di]); } for (int mi = 0; mi < resultStarTreeDocument.metrics.length; mi++) { if (expectedStarTreeDocument.metrics[mi] instanceof Long) { - assertEquals(resultStarTreeDocument.metrics[mi], ((Long) expectedStarTreeDocument.metrics[mi]).doubleValue()); + assertEquals(((Long) expectedStarTreeDocument.metrics[mi]).doubleValue(), resultStarTreeDocument.metrics[mi]); } else { - assertEquals(resultStarTreeDocument.metrics[mi], expectedStarTreeDocument.metrics[mi]); + assertEquals(expectedStarTreeDocument.metrics[mi], resultStarTreeDocument.metrics[mi]); } } } @@ -267,9 +270,34 @@ public static void assertStarTreeMetadata(StarTreeMetadata expectedStarTreeMetad Metric expectedMetric = expectedStarTreeMetadata.getMetrics().get(i); Metric resultMetric = resultStarTreeMetadata.getMetrics().get(i); assertEquals(expectedMetric.getField(), resultMetric.getField()); + List metricStats = new ArrayList<>(); + for (MetricStat metricStat : expectedMetric.getMetrics()) { + if (metricStat.isDerivedMetric()) { + continue; + } + metricStats.add(metricStat); + } + Metric expectedMetricWithoutDerivedMetrics = new Metric(expectedMetric.getField(), metricStats); + metricStats = new ArrayList<>(); + for (MetricStat metricStat : resultMetric.getMetrics()) { + if (metricStat.isDerivedMetric()) { + continue; + } + metricStats.add(metricStat); + } + Metric resultantMetricWithoutDerivedMetrics = new Metric(resultMetric.getField(), metricStats); + + // assert base metrics are in order in metadata + for (int j = 0; j < expectedMetricWithoutDerivedMetrics.getMetrics().size(); j++) { + assertEquals( + expectedMetricWithoutDerivedMetrics.getMetrics().get(j), + resultantMetricWithoutDerivedMetrics.getMetrics().get(j) + ); + } + // assert all metrics ( including derived metrics are present ) for (int j = 0; j < expectedMetric.getMetrics().size(); j++) { - assertEquals(expectedMetric.getMetrics().get(j), resultMetric.getMetrics().get(j)); + assertTrue(resultMetric.getMetrics().contains(expectedMetric.getMetrics().get(j))); } } diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractStarTreeBuilderTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractStarTreeBuilderTests.java index ad54bda9a916e..65adc43ea8bea 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractStarTreeBuilderTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractStarTreeBuilderTests.java @@ -205,9 +205,7 @@ private SegmentReadState getReadState(int numDocs, List dimensionFields, int numMetrics = 0; for (Metric metric : metrics) { - for (MetricStat metricStat : metric.getMetrics()) { - numMetrics++; - } + numMetrics += metric.getBaseMetrics().size(); } FieldInfo[] fields = new FieldInfo[dimensionFields.size() + numMetrics]; @@ -237,7 +235,7 @@ private SegmentReadState getReadState(int numDocs, List dimensionFields, } for (Metric metric : metrics) { - for (MetricStat metricStat : metric.getMetrics()) { + for (MetricStat metricStat : metric.getBaseMetrics()) { fields[i] = new FieldInfo( fullyQualifiedFieldNameForStarTreeMetricsDocValues( compositeField.getName(), @@ -2047,7 +2045,7 @@ public void testFlushFlow() throws IOException { VERSION_CURRENT, builder.numStarTreeNodes, List.of("field1", "field3"), - List.of(new Metric("field2", List.of(MetricStat.SUM, 
MetricStat.VALUE_COUNT))), + List.of(new Metric("field2", List.of(MetricStat.SUM, MetricStat.VALUE_COUNT, MetricStat.AVG))), 6, builder.numStarTreeDocs, 1000, @@ -2138,7 +2136,7 @@ public void testFlushFlowDimsReverse() throws IOException { VERSION_CURRENT, builder.numStarTreeNodes, List.of("field1", "field3"), - List.of(new Metric("field2", List.of(MetricStat.SUM, MetricStat.VALUE_COUNT))), + List.of(new Metric("field2", List.of(MetricStat.SUM, MetricStat.VALUE_COUNT, MetricStat.AVG))), 6, builder.numStarTreeDocs, 1000, @@ -2270,8 +2268,9 @@ private StarTreeField getStarTreeFieldWithMultipleMetrics() { Dimension d2 = new NumericDimension("field3"); Metric m1 = new Metric("field2", List.of(MetricStat.SUM)); Metric m2 = new Metric("field2", List.of(MetricStat.VALUE_COUNT)); + Metric m3 = new Metric("field2", List.of(MetricStat.AVG)); List dims = List.of(d1, d2); - List metrics = List.of(m1, m2); + List metrics = List.of(m1, m2, m3); StarTreeFieldConfiguration c = new StarTreeFieldConfiguration(1000, new HashSet<>(), getBuildMode()); return new StarTreeField("sf", dims, metrics, c); } @@ -4069,12 +4068,19 @@ public void testMergeFlow() throws IOException { metricsWithField.add(i); } + List metricsListValueCount = new ArrayList<>(1000); + List metricsWithFieldValueCount = new ArrayList<>(1000); + for (int i = 0; i < 1000; i++) { + metricsListValueCount.add((long) i); + metricsWithFieldValueCount.add(i); + } + Dimension d1 = new NumericDimension("field1"); Dimension d2 = new NumericDimension("field3"); Dimension d3 = new NumericDimension("field5"); Dimension d4 = new NumericDimension("field8"); // Dimension d5 = new NumericDimension("field5"); - Metric m1 = new Metric("field2", List.of(MetricStat.SUM)); + Metric m1 = new Metric("field2", List.of(MetricStat.SUM, MetricStat.AVG, MetricStat.VALUE_COUNT)); Metric m2 = new Metric("_doc_count", List.of(MetricStat.DOC_COUNT)); List dims = List.of(d1, d2, d3, d4); List metrics = List.of(m1, m2); @@ -4085,6 +4091,7 @@ public void testMergeFlow() throws IOException { SortedNumericDocValues d3sndv = getSortedNumericMock(dimList3, docsWithField3); SortedNumericDocValues d4sndv = getSortedNumericMock(dimList4, docsWithField4); SortedNumericDocValues m1sndv = getSortedNumericMock(metricsList, metricsWithField); + SortedNumericDocValues valucountsndv = getSortedNumericMock(metricsListValueCount, metricsWithFieldValueCount); SortedNumericDocValues m2sndv = DocValues.emptySortedNumeric(); Map> dimDocIdSetIterators = Map.of( "field1", @@ -4100,6 +4107,8 @@ public void testMergeFlow() throws IOException { Map> metricDocIdSetIterators = Map.of( "sf_field2_sum_metric", () -> m1sndv, + "sf_field2_value_count_metric", + () -> valucountsndv, "sf__doc_count_doc_count_metric", () -> m2sndv ); @@ -4118,6 +4127,7 @@ public void testMergeFlow() throws IOException { SortedNumericDocValues f2d3sndv = getSortedNumericMock(dimList3, docsWithField3); SortedNumericDocValues f2d4sndv = getSortedNumericMock(dimList4, docsWithField4); SortedNumericDocValues f2m1sndv = getSortedNumericMock(metricsList, metricsWithField); + SortedNumericDocValues f2valucountsndv = getSortedNumericMock(metricsListValueCount, metricsWithFieldValueCount); SortedNumericDocValues f2m2sndv = DocValues.emptySortedNumeric(); Map> f2dimDocIdSetIterators = Map.of( "field1", @@ -4133,6 +4143,8 @@ public void testMergeFlow() throws IOException { Map> f2metricDocIdSetIterators = Map.of( "sf_field2_sum_metric", () -> f2m1sndv, + "sf_field2_value_count_metric", + () -> f2valucountsndv, 
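The mock iterator maps in this merge-flow test are keyed by fully qualified metric doc-values field names. From the strings used here (sf_field2_sum_metric, sf_field2_value_count_metric, sf__doc_count_doc_count_metric), the convention appears to concatenate star-tree name, field, and stat; the sketch below is purely illustrative, not the actual fullyQualifiedFieldNameForStarTreeMetricsDocValues implementation:

```java
import java.util.Locale;

public final class StarTreeFieldNames {
    /** Sketch of the naming convention inferred from the test fixtures above. */
    static String metricDocValuesField(String starTreeName, String field, String statName) {
        return String.format(Locale.ROOT, "%s_%s_%s_metric", starTreeName, field, statName);
    }

    public static void main(String[] args) {
        System.out.println(metricDocValuesField("sf", "field2", "value_count")); // sf_field2_value_count_metric
        System.out.println(metricDocValuesField("sf", "_doc_count", "doc_count")); // sf__doc_count_doc_count_metric
    }
}
```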
"sf__doc_count_doc_count_metric", () -> f2m2sndv ); From 3681b52081fc6e2f9c158800ce5639405811d792 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Wed, 4 Sep 2024 12:17:04 -0700 Subject: [PATCH 2/3] [RW Separation] Add routing preference to route requests only to search replicas. (#15563) * Add routing preference to route requests only to search replicas. This adds SEARCH_REPLICA routing preference and defaults to this preference for indices that have search replicas. Signed-off-by: Marc Handalian * add changelog entry Signed-off-by: Marc Handalian * PR feedback - extract a private method for replica filtering Signed-off-by: Marc Handalian * remove changelog entry Signed-off-by: Marc Handalian --------- Signed-off-by: Marc Handalian --- .../indices/settings/SearchOnlyReplicaIT.java | 37 ++++++++- .../routing/IndexShardRoutingTable.java | 28 ++++--- .../cluster/routing/OperationRouting.java | 12 +++ .../cluster/routing/Preference.java | 7 ++ .../routing/OperationRoutingTests.java | 76 +++++++++++++++++++ .../ClusterStateCreationUtils.java | 26 +++++++ 6 files changed, 176 insertions(+), 10 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java index 5fc8e30ed2c7a..6bd91df1de66f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java @@ -8,14 +8,17 @@ package org.opensearch.indices.settings; +import org.opensearch.action.search.SearchResponse; import org.opensearch.action.support.WriteRequest; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.Preference; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.query.QueryBuilders; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; @@ -110,7 +113,6 @@ public void testFailoverWithSearchReplica_WithWriterReplicas() throws IOExceptio // add back a node internalCluster().startDataOnlyNode(); ensureGreen(TEST_INDEX); - } public void testFailoverWithSearchReplica_WithoutWriterReplicas() throws IOException { @@ -175,6 +177,39 @@ public void testSearchReplicaScaling() { assertActiveSearchShards(0); } + public void testSearchReplicaRoutingPreference() throws IOException { + int numSearchReplicas = 1; + int numWriterReplicas = 1; + internalCluster().startClusterManagerOnlyNode(); + String primaryNodeName = internalCluster().startDataOnlyNode(); + createIndex( + TEST_INDEX, + Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numWriterReplicas) + .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, numSearchReplicas) + .build() + ); + ensureYellow(TEST_INDEX); + client().prepareIndex(TEST_INDEX).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + // add 2 nodes for the replicas + internalCluster().startDataOnlyNodes(2); + ensureGreen(TEST_INDEX); + + assertActiveShardCounts(numSearchReplicas, numWriterReplicas); + + // set preference to 
search replica here - we default to this when there are + // search replicas but tests will randomize this value if unset + SearchResponse response = client().prepareSearch(TEST_INDEX) + .setPreference(Preference.SEARCH_REPLICA.type()) + .setQuery(QueryBuilders.matchAllQuery()) + .get(); + + String nodeId = response.getHits().getAt(0).getShard().getNodeId(); + IndexShardRoutingTable indexShardRoutingTable = getIndexShardRoutingTable(); + assertEquals(nodeId, indexShardRoutingTable.searchOnlyReplicas().get(0).currentNodeId()); + } + /** * Helper to assert counts of active shards for each type. */ diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java index 4cc3300676986..f25cb14f65eca 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java @@ -647,15 +647,11 @@ public ShardIterator replicaActiveInitializingShardIt() { return new PlainShardIterator(shardId, Collections.emptyList()); } - LinkedList ordered = new LinkedList<>(); - for (ShardRouting replica : shuffler.shuffle(replicas)) { - if (replica.active()) { - ordered.addFirst(replica); - } else if (replica.initializing()) { - ordered.addLast(replica); - } - } - return new PlainShardIterator(shardId, ordered); + return filterAndOrderShards(replica -> true); + } + + public ShardIterator searchReplicaActiveInitializingShardIt() { + return filterAndOrderShards(ShardRouting::isSearchOnly); } /** @@ -686,6 +682,20 @@ public ShardIterator replicaFirstActiveInitializingShardsIt() { return new PlainShardIterator(shardId, ordered); } + private ShardIterator filterAndOrderShards(Predicate filter) { + LinkedList ordered = new LinkedList<>(); + for (ShardRouting replica : shuffler.shuffle(replicas)) { + if (filter.test(replica)) { + if (replica.active()) { + ordered.addFirst(replica); + } else if (replica.initializing()) { + ordered.addLast(replica); + } + } + } + return new PlainShardIterator(shardId, ordered); + } + /** * Returns an iterator on active and initializing shards residing on the provided nodeId. 
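The filterAndOrderShards helper extracted above encodes a routing idiom worth spelling out: replicas are shuffled for load spreading, then active copies are pushed to the front of a deque and initializing copies to the back, so a consumer walking the iterator always tries active shards first. A standalone sketch of the same idea, using stand-in types (Shard, State) instead of ShardRouting:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

public final class ShardOrderingSketch {
    enum State { ACTIVE, INITIALIZING, UNASSIGNED }
    record Shard(String id, State state, boolean searchOnly) { }

    /** Shuffle for load spreading, then put active copies first and initializing copies last. */
    static List<Shard> filterAndOrder(List<Shard> replicas, Predicate<Shard> filter) {
        List<Shard> shuffled = new ArrayList<>(replicas);
        Collections.shuffle(shuffled);
        Deque<Shard> ordered = new ArrayDeque<>();
        for (Shard shard : shuffled) {
            if (filter.test(shard)) {
                if (shard.state() == State.ACTIVE) {
                    ordered.addFirst(shard); // active copies are tried first
                } else if (shard.state() == State.INITIALIZING) {
                    ordered.addLast(shard); // initializing copies only as a last resort
                }
            }
        }
        return new ArrayList<>(ordered);
    }

    public static void main(String[] args) {
        List<Shard> replicas = List.of(
            new Shard("writer-replica", State.ACTIVE, false),
            new Shard("search-replica-1", State.INITIALIZING, true),
            new Shard("search-replica-2", State.ACTIVE, true)
        );
        // Only search replicas, active before initializing: [search-replica-2, search-replica-1]
        System.out.println(filterAndOrder(replicas, Shard::searchOnly));
    }
}
```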
*/ diff --git a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java index 6242247f34a93..fe9e00b250e70 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java @@ -121,6 +121,7 @@ public class OperationRouting { private volatile boolean isFailOpenEnabled; private volatile boolean isStrictWeightedShardRouting; private volatile boolean ignoreWeightedRouting; + private final boolean isReaderWriterSplitEnabled; public OperationRouting(Settings settings, ClusterSettings clusterSettings) { // whether to ignore awareness attributes when routing requests @@ -141,6 +142,7 @@ public OperationRouting(Settings settings, ClusterSettings clusterSettings) { clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_FAILOPEN_ENABLED, this::setFailOpenEnabled); clusterSettings.addSettingsUpdateConsumer(STRICT_WEIGHTED_SHARD_ROUTING_ENABLED, this::setStrictWeightedShardRouting); clusterSettings.addSettingsUpdateConsumer(IGNORE_WEIGHTED_SHARD_ROUTING, this::setIgnoreWeightedRouting); + this.isReaderWriterSplitEnabled = FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(settings); } void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) { @@ -254,6 +256,14 @@ public GroupShardsIterator searchShards( preference = Preference.PRIMARY_FIRST.type(); } + if (isReaderWriterSplitEnabled) { + if (preference == null || preference.isEmpty()) { + if (indexMetadataForShard.getNumberOfSearchOnlyReplicas() > 0) { + preference = Preference.SEARCH_REPLICA.type(); + } + } + } + ShardIterator iterator = preferenceActiveShardIterator( shard, clusterState.nodes().getLocalNodeId(), @@ -366,6 +376,8 @@ private ShardIterator preferenceActiveShardIterator( return indexShard.primaryFirstActiveInitializingShardsIt(); case REPLICA_FIRST: return indexShard.replicaFirstActiveInitializingShardsIt(); + case SEARCH_REPLICA: + return indexShard.searchReplicaActiveInitializingShardIt(); case ONLY_LOCAL: return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId); case ONLY_NODES: diff --git a/server/src/main/java/org/opensearch/cluster/routing/Preference.java b/server/src/main/java/org/opensearch/cluster/routing/Preference.java index a1ea01afa118f..093e3d5fd45f8 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/Preference.java +++ b/server/src/main/java/org/opensearch/cluster/routing/Preference.java @@ -73,6 +73,11 @@ public enum Preference { */ REPLICA_FIRST("_replica_first"), + /** + * Route to search replica shards + */ + SEARCH_REPLICA("_search_replica"), + /** * Route to the local shard only */ @@ -127,6 +132,8 @@ public static Preference parse(String preference) { return ONLY_LOCAL; case "_only_nodes": return ONLY_NODES; + case "_search_replica": + return SEARCH_REPLICA; default: throw new IllegalArgumentException("no Preference for [" + preferenceType + "]"); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java index ad8b48d56c417..aaeeb52ab5709 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java @@ -1118,6 +1118,82 @@ public void testPartialIndexPrimaryDefault() throws Exception { } } + public void testSearchReplicaDefaultRouting() throws 
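To summarize the routing change in OperationRouting above: the _search_replica preference is applied as the default only when the reader/writer split feature flag is enabled, the request carries no explicit preference, and the target index actually has search-only replicas configured. A condensed sketch of that decision, with plain strings and ints standing in for Preference and IndexMetadata:

```java
public final class PreferenceResolutionSketch {
    /** Returns the effective preference string for a search request. */
    static String resolvePreference(String requestPreference, boolean readerWriterSplitEnabled, int searchOnlyReplicas) {
        if (readerWriterSplitEnabled && (requestPreference == null || requestPreference.isEmpty()) && searchOnlyReplicas > 0) {
            return "_search_replica"; // route to search replicas by default
        }
        return requestPreference; // explicit preference, or null meaning the generic default path
    }

    public static void main(String[] args) {
        System.out.println(resolvePreference(null, true, 2));     // _search_replica
        System.out.println(resolvePreference("_local", true, 2)); // _local (an explicit preference wins)
        System.out.println(resolvePreference(null, false, 2));    // null (feature flag off)
    }
}
```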
Exception { + final int numShards = 1; + final int numReplicas = 2; + final int numSearchReplicas = 2; + final String indexName = "test"; + final String[] indexNames = new String[] { indexName }; + + ClusterService clusterService = null; + ThreadPool threadPool = null; + + try { + OperationRouting opRouting = new OperationRouting( + Settings.builder().put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, "true").build(), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + + ClusterState state = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas( + indexNames, + numShards, + numReplicas, + numSearchReplicas + ); + IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().index(indexName).getShards().get(0); + ShardId shardId = indexShardRoutingTable.searchOnlyReplicas().get(0).shardId(); + + threadPool = new TestThreadPool("testSearchReplicaDefaultRouting"); + clusterService = ClusterServiceUtils.createClusterService(threadPool); + + // add a search replica in initializing state: + DiscoveryNode node = new DiscoveryNode( + "node_initializing", + OpenSearchTestCase.buildNewFakeTransportAddress(), + Collections.emptyMap(), + new HashSet<>(DiscoveryNodeRole.BUILT_IN_ROLES), + Version.CURRENT + ); + + IndexMetadata indexMetadata = IndexMetadata.builder(indexName) + .settings(Settings.builder().put(state.metadata().index(indexName).getSettings()).build()) + .numberOfSearchReplicas(3) + .numberOfReplicas(2) + .build(); + Metadata.Builder metadataBuilder = Metadata.builder(state.metadata()).put(indexMetadata, false).generateClusterUuidIfNeeded(); + IndexRoutingTable.Builder indexShardRoutingBuilder = IndexRoutingTable.builder(indexMetadata.getIndex()); + indexShardRoutingBuilder.addIndexShard(indexShardRoutingTable); + indexShardRoutingBuilder.addShard( + TestShardRouting.newShardRouting(shardId, node.getId(), null, false, true, ShardRoutingState.INITIALIZING, null) + ); + state = ClusterState.builder(state) + .routingTable(RoutingTable.builder().add(indexShardRoutingBuilder).build()) + .metadata(metadataBuilder.build()) + .build(); + + // Verify that search replicas are selected by default when no explicit preference is set + GroupShardsIterator<ShardIterator> groupIterator = opRouting.searchShards(state, indexNames, null, null); + assertThat("one group per shard", groupIterator.size(), equalTo(numShards)); + for (ShardIterator shardIterator : groupIterator) { + assertEquals("We should have 3 shards returned", 3, shardIterator.size()); + int i = 0; + for (ShardRouting shardRouting : shardIterator) { + assertTrue( + "Only search replicas should exist with preference SEARCH_REPLICA", + shardRouting.isSearchOnly() + ); + i++; + if (i == shardIterator.size()) { + assertTrue("Initializing shard should appear last", shardRouting.initializing()); + assertFalse("Initializing shard should appear last", shardRouting.active()); + } + } + } + } finally { + IOUtils.close(clusterService); + terminate(threadPool); + } + } + private DiscoveryNode[] setupNodes() { // Sets up two data nodes in zone-a and one data node in zone-b List<String> zones = Arrays.asList("a", "a", "b"); diff --git a/test/framework/src/main/java/org/opensearch/action/support/replication/ClusterStateCreationUtils.java b/test/framework/src/main/java/org/opensearch/action/support/replication/ClusterStateCreationUtils.java index 8650500df8e95..0c4e871b1330c 100644 --- a/test/framework/src/main/java/org/opensearch/action/support/replication/ClusterStateCreationUtils.java +++ 
b/test/framework/src/main/java/org/opensearch/action/support/replication/ClusterStateCreationUtils.java @@ -63,6 +63,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED; import static org.opensearch.test.OpenSearchTestCase.randomFrom; @@ -325,7 +326,18 @@ public static ClusterState stateWithAssignedPrimariesAndOneReplica(String index, * Creates cluster state with several indexes, shards and replicas and all shards STARTED. */ public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indices, int numberOfShards, int numberOfReplicas) { + return stateWithAssignedPrimariesAndReplicas(indices, numberOfShards, numberOfReplicas, 0); + } + /** + * Creates cluster state with several indexes, shards and replicas and all shards STARTED. + */ + public static ClusterState stateWithAssignedPrimariesAndReplicas( + String[] indices, + int numberOfShards, + int numberOfReplicas, + int numberOfSearchReplicas + ) { int numberOfDataNodes = numberOfReplicas + 1; DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); for (int i = 0; i < numberOfDataNodes + 1; i++) { @@ -347,6 +359,7 @@ public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indice .put(SETTING_VERSION_CREATED, Version.CURRENT) .put(SETTING_NUMBER_OF_SHARDS, numberOfShards) .put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) + .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, numberOfSearchReplicas) .put(SETTING_CREATION_DATE, System.currentTimeMillis()) ) .build(); @@ -363,6 +376,19 @@ public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indice TestShardRouting.newShardRouting(index, i, newNode(replica + 1).getId(), null, false, ShardRoutingState.STARTED) ); } + for (int replica = numberOfReplicas; replica < numberOfSearchReplicas + numberOfReplicas; replica++) { + indexShardRoutingBuilder.addShard( + TestShardRouting.newShardRouting( + new ShardId(index, IndexMetadata.INDEX_UUID_NA_VALUE, i), + newNode(replica + 1).getId(), + null, + false, + true, + ShardRoutingState.STARTED, + null + ) + ); + } indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build()); } routingTableBuilder.add(indexRoutingTableBuilder.build()); From 7b0846ecb1ec3707cf996b81e74559bff1442dd5 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Thu, 5 Sep 2024 00:55:14 +0530 Subject: [PATCH 3/3] Integrate translog cleanup with snapshot deletion and fix primary term deletion logic (#15657) --------- Signed-off-by: Sachin Kale Co-authored-by: Sachin Kale --- ...pshotITV2.java => DeleteSnapshotV2IT.java} | 11 +- .../store/RemoteSegmentStoreDirectory.java | 2 +- .../RemoteSegmentStoreDirectoryFactory.java | 4 + .../RemoteFsTimestampAwareTranslog.java | 172 ++++++++++++++++-- .../transfer/TranslogTransferManager.java | 8 + .../transfer/TranslogTransferMetadata.java | 50 ++++- .../blobstore/BlobStoreRepository.java | 46 ++++- .../index/remote/RemoteStoreUtilsTests.java | 4 +- .../RemoteSegmentStoreDirectoryTests.java | 8 +- .../RemoteFsTimestampAwareTranslogTests.java | 160 ++++++++++++++-- .../TranslogTransferManagerTests.java | 2 +- 11 files changed, 422 insertions(+), 45 deletions(-) rename 
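A usage note on the ClusterStateCreationUtils change above: the existing three-argument overload now delegates to the new four-argument one with zero search replicas, so current callers keep working. A hypothetical test using the new overload might look like the following; the index name and counts are arbitrary, and the assertion mirrors the accessor chain used elsewhere in this series:

```java
// Hypothetical test sketch; assumes the OpenSearch test framework and the usual imports are on the classpath.
public class SearchReplicaStateTests extends OpenSearchTestCase {
    public void testStateWithSearchReplicas() {
        ClusterState state = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(
            new String[] { "test-index" }, // indices
            1, // shards per index
            1, // writer replicas per shard
            2  // search replicas per shard
        );
        // 1 primary + 1 writer replica + 2 search replicas, all STARTED
        assertEquals(4, state.getRoutingTable().index("test-index").getShards().get(0).size());
    }
}
```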
server/src/internalClusterTest/java/org/opensearch/snapshots/{DeleteSnapshotITV2.java => DeleteSnapshotV2IT.java} (96%) diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotITV2.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java similarity index 96% rename from server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotITV2.java rename to server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java index 02b6ea47172c7..44d5c0a28cd9a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotITV2.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java @@ -32,7 +32,7 @@ import static org.hamcrest.Matchers.lessThan; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class DeleteSnapshotITV2 extends AbstractSnapshotIntegTestCase { +public class DeleteSnapshotV2IT extends AbstractSnapshotIntegTestCase { private static final String REMOTE_REPO_NAME = "remote-store-repo-name"; @@ -276,9 +276,11 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID); Path shardPath = Path.of(String.valueOf(indexPath), "0"); Path segmentsPath = Path.of(String.valueOf(shardPath), "segments"); + Path translogPath = Path.of(String.valueOf(shardPath), "translog"); // Get total segments remote store directory file count for deleted index and shard 0 int segmentFilesCountBeforeDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath); + int translogFilesCountBeforeDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(translogPath); RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); @@ -312,6 +314,13 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio assertThat(RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath), lessThan(segmentFilesCountAfterDeletingSnapshot1)); } catch (Exception e) {} }, 60, TimeUnit.SECONDS); + + assertBusy(() -> { + try { + assertThat(RemoteStoreBaseIntegTestCase.getFileCount(translogPath), lessThan(translogFilesCountBeforeDeletingSnapshot1)); + } catch (Exception e) {} + }, 60, TimeUnit.SECONDS); + } private Settings snapshotV2Settings(Path remoteStoreRepoPath) { diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 53b43bbfb3bba..72bf07d4b03b2 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -412,7 +412,7 @@ static long getGeneration(String[] filenameTokens) { public static long getTimestamp(String filename) { String[] filenameTokens = filename.split(SEPARATOR); - return RemoteStoreUtils.invertLong(filenameTokens[6]); + return RemoteStoreUtils.invertLong(filenameTokens[filenameTokens.length - 2]); } public static Tuple getNodeIdByPrimaryTermAndGen(String filename) { diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index ea7e2506deec3..233665e65aed9 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ 
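The one-line RemoteSegmentStoreDirectory.getTimestamp fix in the hunk above is easy to read past: the timestamp token is now located relative to the end of the split filename rather than at a fixed index, which keeps parsing correct when newer filename formats add tokens in front of it. A standalone sketch; the separator and token layout here are illustrative, not the exact segment-metadata format:

```java
public final class FilenameTimestampSketch {
    private static final String SEPARATOR = "__";

    /** Fragile: breaks as soon as a token is added or removed before index 6. */
    static String timestampTokenAtFixedIndex(String filename) {
        return filename.split(SEPARATOR)[6];
    }

    /** Robust: the timestamp stays the second-to-last token regardless of what precedes it. */
    static String timestampTokenFromEnd(String filename) {
        String[] tokens = filename.split(SEPARATOR);
        return tokens[tokens.length - 2];
    }

    public static void main(String[] args) {
        // Hypothetical filename ending in ...__<timestamp>__<version>
        String name = "metadata__term__gen__seq__uuid__node__1234567890__1";
        System.out.println(timestampTokenFromEnd(name)); // 1234567890
    }
}
```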
b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -116,4 +116,8 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s } } + public Supplier getRepositoriesService() { + return this.repositoriesService; + } + } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java index 0b134b3bddbec..27d34ec0d05af 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java @@ -8,6 +8,7 @@ package org.opensearch.index.translog; +import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.blobstore.BlobMetadata; @@ -33,6 +34,7 @@ import java.util.Optional; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; import java.util.function.LongSupplier; @@ -52,10 +54,13 @@ */ public class RemoteFsTimestampAwareTranslog extends RemoteFsTranslog { + private static Logger staticLogger = LogManager.getLogger(RemoteFsTimestampAwareTranslog.class); private final Logger logger; private final Map metadataFilePinnedTimestampMap; // For metadata files, with no min generation in the name, we cache generation data to avoid multiple reads. private final Map> oldFormatMetadataFileGenerationMap; + private final Map> oldFormatMetadataFilePrimaryTermMap; + private final AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE); public RemoteFsTimestampAwareTranslog( TranslogConfig config, @@ -86,6 +91,7 @@ public RemoteFsTimestampAwareTranslog( logger = Loggers.getLogger(getClass(), shardId); this.metadataFilePinnedTimestampMap = new HashMap<>(); this.oldFormatMetadataFileGenerationMap = new HashMap<>(); + this.oldFormatMetadataFilePrimaryTermMap = new HashMap<>(); } @Override @@ -165,7 +171,11 @@ public void onResponse(List blobMetadata) { return; } - List metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles); + List metadataFilesToBeDeleted = getMetadataFilesToBeDeleted( + metadataFiles, + metadataFilePinnedTimestampMap, + logger + ); // If index is not deleted, make sure to keep latest metadata file if (indexDeleted == false) { @@ -209,7 +219,7 @@ public void onResponse(List blobMetadata) { oldFormatMetadataFileGenerationMap.keySet().retainAll(metadataFilesNotToBeDeleted); // Delete stale primary terms - deleteStaleRemotePrimaryTerms(metadataFiles); + deleteStaleRemotePrimaryTerms(metadataFilesNotToBeDeleted); } else { remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); } @@ -259,8 +269,16 @@ protected Set getGenerationsToBeDeleted( return generationsToBeDeleted; } - // Visible for testing protected List getMetadataFilesToBeDeleted(List metadataFiles) { + return getMetadataFilesToBeDeleted(metadataFiles, metadataFilePinnedTimestampMap, logger); + } + + // Visible for testing + protected static List getMetadataFilesToBeDeleted( + List metadataFiles, + Map metadataFilePinnedTimestampMap, + Logger logger + ) { Tuple> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps(); // Keep files since last successful run of scheduler @@ -351,27 +369,153 @@ protected Tuple 
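A structural note on the RemoteFsTimestampAwareTranslog refactor that follows: helpers such as getMetadataFilesToBeDeleted were instance methods reading shard-local caches, and the patch converts them to static methods that take the caches and logger as parameters, keeping thin instance wrappers for the existing call sites. That is what lets the new static cleanup(TranslogTransferManager) entry point, used from the snapshot-deletion path, reuse the same logic with throwaway state. Schematically, with stand-in names:

```java
import java.util.HashMap;
import java.util.Map;

public final class StaticHelperSketch {
    private final Map<String, Long> perShardCache = new HashMap<>();

    // Thin instance wrapper keeps existing call sites unchanged...
    long lookup(String key) {
        return lookup(key, perShardCache);
    }

    // ...while the static form can be called from a path that has no translog instance,
    // passing a throwaway cache instead of shard-local state.
    static long lookup(String key, Map<String, Long> cache) {
        return cache.computeIfAbsent(key, k -> expensiveRead(k));
    }

    private static long expensiveRead(String key) {
        return key.hashCode(); // stand-in for reading remote metadata
    }
}
```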
getMinMaxTranslogGenerationFromMetadataFile( } } + private void deleteStaleRemotePrimaryTerms(List metadataFiles) { + deleteStaleRemotePrimaryTerms( + metadataFiles, + translogTransferManager, + oldFormatMetadataFilePrimaryTermMap, + minPrimaryTermInRemote, + logger + ); + } + /** * This method must be called only after there are valid generations to delete in trimUnreferencedReaders as it ensures * implicitly that minimum primary term in latest translog metadata in remote store is the current primary term. *
* This will also delete all stale translog metadata files from remote except the latest basis the metadata file comparator. */ - private void deleteStaleRemotePrimaryTerms(List metadataFiles) { + protected static void deleteStaleRemotePrimaryTerms( + List metadataFiles, + TranslogTransferManager translogTransferManager, + Map> oldFormatMetadataFilePrimaryTermMap, + AtomicLong minPrimaryTermInRemoteAtomicLong, + Logger logger + ) { // The deletion of older translog files in remote store is on best-effort basis, there is a possibility that there // are older files that are no longer needed and should be cleaned up. In here, we delete all files that are part // of older primary term. - if (olderPrimaryCleaned.trySet(Boolean.TRUE)) { - if (metadataFiles.isEmpty()) { - logger.trace("No metadata is uploaded yet, returning from deleteStaleRemotePrimaryTerms"); - return; + if (metadataFiles.isEmpty()) { + logger.trace("No metadata is uploaded yet, returning from deleteStaleRemotePrimaryTerms"); + return; + } + Optional minPrimaryTermFromMetadataFiles = metadataFiles.stream().map(file -> { + try { + return getMinMaxPrimaryTermFromMetadataFile(file, translogTransferManager, oldFormatMetadataFilePrimaryTermMap).v1(); + } catch (IOException e) { + return Long.MAX_VALUE; + } + }).min(Long::compareTo); + // First we delete all stale primary terms folders from remote store + Long minPrimaryTermInRemote = getMinPrimaryTermInRemote(minPrimaryTermInRemoteAtomicLong, translogTransferManager, logger); + if (minPrimaryTermFromMetadataFiles.get() > minPrimaryTermInRemote) { + translogTransferManager.deletePrimaryTermsAsync(minPrimaryTermFromMetadataFiles.get()); + minPrimaryTermInRemoteAtomicLong.set(minPrimaryTermFromMetadataFiles.get()); + } else { + logger.debug( + "Skipping primary term cleanup. 
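The rewritten primary-term cleanup below boils down to: compute the smallest primary term still referenced by any surviving metadata file, lazily discover the smallest term folder actually present in the remote store (cached in an AtomicLong so the listing happens at most once), and issue deletes only when the referenced minimum has moved past the stored minimum. A compact sketch of that decision, with System.out standing in for deletePrimaryTermsAsync:

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

public final class PrimaryTermCleanupSketch {
    /** Delete every primary-term folder strictly below the smallest referenced term. */
    static void cleanupStaleTerms(List<Long> minTermPerMetadataFile, Set<Long> termsInRemote, AtomicLong cachedMinInRemote) {
        long minReferenced = minTermPerMetadataFile.stream().min(Long::compareTo).orElse(Long.MAX_VALUE);
        if (cachedMinInRemote.get() == Long.MAX_VALUE) {
            // Lazily seed the cache with a single remote listing.
            termsInRemote.stream().min(Long::compareTo).ifPresent(cachedMinInRemote::set);
        }
        if (minReferenced > cachedMinInRemote.get()) {
            System.out.println("delete term folders below " + minReferenced); // stand-in for the async delete
            cachedMinInRemote.set(minReferenced);
        } else {
            System.out.println("nothing stale; min referenced = " + minReferenced);
        }
    }

    public static void main(String[] args) {
        cleanupStaleTerms(List.of(5L, 7L), Set.of(3L, 5L, 7L), new AtomicLong(Long.MAX_VALUE));
        // -> delete term folders below 5
    }
}
```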
minimumReferencedPrimaryTerm = {}, minPrimaryTermInRemote = {}", + minPrimaryTermFromMetadataFiles.get(), + minPrimaryTermInRemote + ); + } + } + + private static Long getMinPrimaryTermInRemote( + AtomicLong minPrimaryTermInRemote, + TranslogTransferManager translogTransferManager, + Logger logger + ) { + if (minPrimaryTermInRemote.get() == Long.MAX_VALUE) { + try { + Set primaryTermsInRemote = translogTransferManager.listPrimaryTermsInRemote(); + if (primaryTermsInRemote.isEmpty() == false) { + Optional minPrimaryTerm = primaryTermsInRemote.stream().min(Long::compareTo); + minPrimaryTerm.ifPresent(minPrimaryTermInRemote::set); + } + } catch (IOException e) { + logger.error("Exception while listing primary terms in remote translog", e); + } + } + return minPrimaryTermInRemote.get(); + } + + protected static Tuple getMinMaxPrimaryTermFromMetadataFile( + String metadataFile, + TranslogTransferManager translogTransferManager, + Map> oldFormatMetadataFilePrimaryTermMap + ) throws IOException { + Tuple minMaxPrimaryTermFromFileName = TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(metadataFile); + if (minMaxPrimaryTermFromFileName != null) { + return minMaxPrimaryTermFromFileName; + } else { + if (oldFormatMetadataFilePrimaryTermMap.containsKey(metadataFile)) { + return oldFormatMetadataFilePrimaryTermMap.get(metadataFile); + } else { + TranslogTransferMetadata metadata = translogTransferManager.readMetadata(metadataFile); + long maxPrimaryTem = TranslogTransferMetadata.getPrimaryTermFromFileName(metadataFile); + long minPrimaryTem = -1; + if (metadata.getGenerationToPrimaryTermMapper() != null + && metadata.getGenerationToPrimaryTermMapper().values().isEmpty() == false) { + Optional primaryTerm = metadata.getGenerationToPrimaryTermMapper() + .values() + .stream() + .map(s -> Long.parseLong(s)) + .min(Long::compareTo); + if (primaryTerm.isPresent()) { + minPrimaryTem = primaryTerm.get(); + } + } + Tuple minMaxPrimaryTermTuple = new Tuple<>(minPrimaryTem, maxPrimaryTem); + oldFormatMetadataFilePrimaryTermMap.put(metadataFile, minMaxPrimaryTermTuple); + return minMaxPrimaryTermTuple; } - Optional minPrimaryTerm = metadataFiles.stream() - .map(file -> RemoteStoreUtils.invertLong(file.split(METADATA_SEPARATOR)[1])) - .min(Long::compareTo); - // First we delete all stale primary terms folders from remote store - long minimumReferencedPrimaryTerm = minPrimaryTerm.get() - 1; - translogTransferManager.deletePrimaryTermsAsync(minimumReferencedPrimaryTerm); } } + + public static void cleanup(TranslogTransferManager translogTransferManager) throws IOException { + ActionListener> listMetadataFilesListener = new ActionListener<>() { + @Override + public void onResponse(List blobMetadata) { + List metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList()); + + try { + if (metadataFiles.isEmpty()) { + staticLogger.debug("No stale translog metadata files found"); + return; + } + List metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles, new HashMap<>(), staticLogger); + if (metadataFilesToBeDeleted.isEmpty()) { + staticLogger.debug("No metadata files to delete"); + return; + } + staticLogger.debug(() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted); + + // For all the files that we are keeping, fetch min and max generations + List metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles); + metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted); + staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + 
metadataFilesNotToBeDeleted); + + // Delete stale metadata files + translogTransferManager.deleteMetadataFilesAsync(metadataFilesToBeDeleted, () -> {}); + + // Delete stale primary terms + deleteStaleRemotePrimaryTerms( + metadataFilesNotToBeDeleted, + translogTransferManager, + new HashMap<>(), + new AtomicLong(Long.MAX_VALUE), + staticLogger + ); + } catch (Exception e) { + staticLogger.error("Exception while cleaning up metadata and primary terms", e); + } + } + + @Override + public void onFailure(Exception e) { + staticLogger.error("Exception while cleaning up metadata and primary terms", e); + } + }; + translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener); + } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 56a9aa6447dec..291218ea47499 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -545,6 +545,14 @@ public void onFailure(Exception e) { }); } + public Set listPrimaryTermsInRemote() throws IOException { + Set primaryTermsStr = transferService.listFolders(remoteDataTransferPath); + if (primaryTermsStr != null) { + return primaryTermsStr.stream().map(Long::parseLong).collect(Collectors.toSet()); + } + return new HashSet<>(); + } + /** * Handles deletion of all translog files associated with a primary term. * diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java index 745fa9a8a219a..3b8885055e8f7 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -19,6 +19,7 @@ import java.util.Arrays; import java.util.Map; import java.util.Objects; +import java.util.Optional; /** * The metadata associated with every transfer {@link TransferSnapshot}. 
The metadata is uploaded at the end of the @@ -108,11 +109,28 @@ public String getFileName() { RemoteStoreUtils.invertLong(createdAt), String.valueOf(Objects.hash(nodeId)), RemoteStoreUtils.invertLong(minTranslogGeneration), + String.valueOf(getMinPrimaryTermReferred()), String.valueOf(CURRENT_VERSION) ) ); } + private long getMinPrimaryTermReferred() { + if (generationToPrimaryTermMapper.get() == null || generationToPrimaryTermMapper.get().values().isEmpty()) { + return -1; + } + Optional minPrimaryTerm = generationToPrimaryTermMapper.get() + .values() + .stream() + .map(s -> Long.parseLong(s)) + .min(Long::compareTo); + if (minPrimaryTerm.isPresent()) { + return minPrimaryTerm.get(); + } else { + return -1; + } + } + public static Tuple, String> getNodeIdByPrimaryTermAndGeneration(String filename) { String[] tokens = filename.split(METADATA_SEPARATOR); if (tokens.length < 6) { @@ -143,15 +161,43 @@ public static Tuple getMinMaxTranslogGenerationFromFilename(String f assert Version.CURRENT.onOrAfter(Version.V_2_17_0); try { // instead of direct index, we go backwards to avoid running into same separator in nodeId - String minGeneration = tokens[tokens.length - 2]; + String minGeneration = tokens[tokens.length - 3]; String maxGeneration = tokens[2]; return new Tuple<>(RemoteStoreUtils.invertLong(minGeneration), RemoteStoreUtils.invertLong(maxGeneration)); - } catch (NumberFormatException e) { + } catch (Exception e) { logger.error(() -> new ParameterizedMessage("Exception while getting min and max translog generation from: {}", filename), e); return null; } } + public static Tuple getMinMaxPrimaryTermFromFilename(String filename) { + String[] tokens = filename.split(METADATA_SEPARATOR); + if (tokens.length < 7) { + // For versions < 2.17, we don't have min primary term. 
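Context for the filename parsing completed below: the metadata filename now appends the minimum referenced primary term as the second-to-last token (just before the version), while the maximum primary term stays in the second position; files written before 2.17 simply have fewer tokens, which is how the parser detects the old format and returns null so callers fall back to reading the metadata blob. A sketch of the encode/decode pair, using plain longs instead of the real code's inverted-long encoding for readability:

```java
public final class TranslogMetadataNameSketch {
    private static final String SEP = "__";

    /** Encode: metadata__<maxPrimaryTerm>__<generation>__...__<minPrimaryTerm>__<version> (layout illustrative). */
    static String fileName(long maxPrimaryTerm, long generation, String nodeHash, long minGeneration, long minPrimaryTerm) {
        return String.join(SEP, "metadata", Long.toString(maxPrimaryTerm), Long.toString(generation),
            "createdAt", nodeHash, Long.toString(minGeneration), Long.toString(minPrimaryTerm), "2");
    }

    /** Decode from the end so extra middle tokens (e.g. a node id containing the separator) don't shift indices. */
    static long[] minMaxPrimaryTerm(String fileName) {
        String[] tokens = fileName.split(SEP);
        if (tokens.length < 7) {
            return null; // pre-2.17 format: min primary term absent, caller must read the metadata blob instead
        }
        return new long[] { Long.parseLong(tokens[tokens.length - 2]), Long.parseLong(tokens[1]) };
    }

    public static void main(String[] args) {
        String name = fileName(9, 42, "abc123", 40, 7);
        long[] minMax = minMaxPrimaryTerm(name);
        System.out.println(minMax[0] + " " + minMax[1]); // 7 9
    }
}
```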
+            return null;
+        }
+        assert Version.CURRENT.onOrAfter(Version.V_2_17_0);
+        try {
+            // instead of direct index, we go backwards to avoid running into same separator in nodeId
+            String minPrimaryTerm = tokens[tokens.length - 2];
+            String maxPrimaryTerm = tokens[1];
+            return new Tuple<>(Long.parseLong(minPrimaryTerm), RemoteStoreUtils.invertLong(maxPrimaryTerm));
+        } catch (Exception e) {
+            logger.error(() -> new ParameterizedMessage("Exception while getting min and max primary term from: {}", filename), e);
+            return null;
+        }
+    }
+
+    public static long getPrimaryTermFromFileName(String filename) {
+        String[] tokens = filename.split(METADATA_SEPARATOR);
+        try {
+            return RemoteStoreUtils.invertLong(tokens[1]);
+        } catch (Exception e) {
+            logger.error(() -> new ParameterizedMessage("Exception while getting max primary term from: {}", filename), e);
+            return -1;
+        }
+    }
+
     @Override
     public int hashCode() {
         return Objects.hash(primaryTerm, generation);
diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
index 9ce9c0823429a..01d924aa17839 100644
--- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
@@ -116,6 +116,7 @@
 import org.opensearch.index.remote.RemoteStorePathStrategy.PathInput;
 import org.opensearch.index.remote.RemoteStorePathStrategy.SnapshotShardPathInput;
 import org.opensearch.index.remote.RemoteStoreUtils;
+import org.opensearch.index.remote.RemoteTranslogTransferTracker;
 import org.opensearch.index.snapshots.IndexShardRestoreFailedException;
 import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
 import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
@@ -132,6 +133,10 @@
 import org.opensearch.index.store.lockmanager.FileLockInfo;
 import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
 import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
+import org.opensearch.index.translog.RemoteFsTimestampAwareTranslog;
+import org.opensearch.index.translog.RemoteFsTranslog;
+import org.opensearch.index.translog.transfer.FileTransferTracker;
+import org.opensearch.index.translog.transfer.TranslogTransferManager;
 import org.opensearch.indices.RemoteStoreSettings;
 import org.opensearch.indices.recovery.RecoverySettings;
 import org.opensearch.indices.recovery.RecoveryState;
@@ -2143,25 +2148,35 @@ private void cleanRemoteStoreDirectoryIfNeeded(
             }
             IndexMetadata prevIndexMetadata = this.getSnapshotIndexMetaData(oldRepoData, snapshotId, indexId);
             if (prevIndexMetadata != null && !isIndexPresent(clusterService, prevIndexMetadata.getIndexUUID())) {
-                String remoteStoreRepository = IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.get(
+                String remoteStoreRepository = IndexMetadata.INDEX_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING.get(
                     prevIndexMetadata.getSettings()
                 );
                 assert (remoteStoreRepository != null);
 
+                String remoteTranslogRepositoryName = IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.get(
+                    prevIndexMetadata.getSettings()
+                );
+                assert (remoteTranslogRepositoryName != null);
+                Repository remoteTranslogRepository = remoteSegmentStoreDirectoryFactory.getRepositoriesService()
+                    .get()
+                    .repository(remoteTranslogRepositoryName);
+
                 RemoteStorePathStrategy remoteStorePathStrategy = RemoteStoreUtils.determineRemoteStorePathStrategy(
                     prevIndexMetadata
                 );
 
                 for (int shardId = 0; shardId < prevIndexMetadata.getNumberOfShards(); shardId++) {
+                    ShardId shard = new ShardId(Index.UNKNOWN_INDEX_NAME, prevIndexMetadata.getIndexUUID(), shardId);
                     remoteDirectoryCleanupAsync(
                         remoteSegmentStoreDirectoryFactory,
                         threadPool,
                         remoteStoreRepository,
                         prevIndexMetadata.getIndexUUID(),
-                        new ShardId(Index.UNKNOWN_INDEX_NAME, prevIndexMetadata.getIndexUUID(), shardId),
+                        shard,
                         ThreadPool.Names.REMOTE_PURGE,
                         remoteStorePathStrategy
                     );
+                    remoteTranslogCleanupAsync(remoteTranslogRepository, shard, remoteStorePathStrategy, prevIndexMetadata);
                 }
             }
         } catch (Exception e) {
@@ -2181,6 +2196,33 @@ private void cleanRemoteStoreDirectoryIfNeeded(
         }
     }
 
+    private void remoteTranslogCleanupAsync(
+        Repository remoteTranslogRepository,
+        ShardId shardId,
+        RemoteStorePathStrategy remoteStorePathStrategy,
+        IndexMetadata prevIndexMetadata
+    ) {
+        assert remoteTranslogRepository instanceof BlobStoreRepository;
+        boolean indexMetadataEnabled = RemoteStoreUtils.determineTranslogMetadataEnabled(prevIndexMetadata);
+        RemoteTranslogTransferTracker remoteTranslogTransferTracker = new RemoteTranslogTransferTracker(shardId, 1000);
+        FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker);
+        TranslogTransferManager translogTransferManager = RemoteFsTranslog.buildTranslogTransferManager(
+            (BlobStoreRepository) remoteTranslogRepository,
+            threadPool,
+            shardId,
+            fileTransferTracker,
+            remoteTranslogTransferTracker,
+            remoteStorePathStrategy,
+            remoteStoreSettings,
+            indexMetadataEnabled
+        );
+        try {
+            RemoteFsTimestampAwareTranslog.cleanup(translogTransferManager);
+        } catch (IOException e) {
+            logger.error("Exception while cleaning up remote translog for shard: " + shardId, e);
+        }
+    }
+
     /**
      * Finds and returns a list of shard paths that match the given index ID.
      *
diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java
index 2a34dd3580948..be30de97ee830 100644
--- a/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java
+++ b/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java
@@ -637,7 +637,7 @@ private Tuple<Map<Long, String>, Set<Long>> testGetPinnedTimestampLockedFilesW
         String metadataPrefix = "metadata__1__2__3__4__5__";
         Map<Long, String> metadataFiles = new HashMap<>();
         for (Long metadataFileTimestamp : metadataFileTimestamps) {
-            metadataFiles.put(metadataFileTimestamp, metadataPrefix + RemoteStoreUtils.invertLong(metadataFileTimestamp));
+            metadataFiles.put(metadataFileTimestamp, metadataPrefix + RemoteStoreUtils.invertLong(metadataFileTimestamp) + "__1");
         }
         return new Tuple<>(
             metadataFiles,
@@ -662,7 +662,7 @@ private Tuple<Map<Long, String>, Set<Long>> testGetPinnedTimestampLockedFilesW
             String primaryTerm = RemoteStoreUtils.invertLong(metadataFileTimestampPrimaryTerm.getValue());
             String metadataPrefix = "metadata__" + primaryTerm + "__2__3__4__5__";
             long metadataFileTimestamp = metadataFileTimestampPrimaryTerm.getKey();
-            metadataFiles.put(metadataFileTimestamp, metadataPrefix + RemoteStoreUtils.invertLong(metadataFileTimestamp));
+            metadataFiles.put(metadataFileTimestamp, metadataPrefix + RemoteStoreUtils.invertLong(metadataFileTimestamp) + "__1");
         }
         return new Tuple<>(
             metadataFiles,
diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java
index 336d4bafd4b66..ecd6620dbea15 100644
--- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java
+++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java
@@ -1170,9 +1170,9 @@ public void testInitializeToSpecificTimestampNoMdMatchingTimestamp() throws IOEx
     public void testInitializeToSpecificTimestampMatchingMdFile() throws IOException {
         String metadataPrefix = "metadata__1__2__3__4__5__";
         List<String> metadataFiles = new ArrayList<>();
-        metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(1000));
-        metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(2000));
-        metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(3000));
+        metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(1000) + "__1");
+        metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(2000) + "__1");
+        metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(3000) + "__1");
 
         Map<String, String> metadata = new HashMap<>();
         metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::512::" + Version.LATEST.major);
@@ -1184,7 +1184,7 @@ public void testInitializeToSpecificTimestampMatchingMdFile() throws IOException
                 Integer.MAX_VALUE
             )
         ).thenReturn(metadataFiles);
-        when(remoteMetadataDirectory.getBlobStream(metadataPrefix + RemoteStoreUtils.invertLong(1000))).thenReturn(
+        when(remoteMetadataDirectory.getBlobStream(metadataPrefix + RemoteStoreUtils.invertLong(1000) + "__1")).thenReturn(
             createMetadataFileBytes(metadata, indexShard.getLatestReplicationCheckpoint(), segmentInfos)
         );
diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java
index 1f82dd9d7e641..c510a6475147d 100644
--- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java
+++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java
@@ -8,6 +8,8 @@
 package org.opensearch.index.translog;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.lucene.tests.util.LuceneTestCase;
 import org.opensearch.action.LatchedActionListener;
 import org.opensearch.cluster.metadata.RepositoryMetadata;
@@ -53,6 +55,7 @@
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
@@ -63,7 +66,9 @@
 import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_SEPARATOR;
 import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -211,16 +216,44 @@ public void onFailure(Exception e) {
         // Node id containing separator
         String nodeIdWithSeparator =
-            "metadata__9223372036438563903__9223372036854774799__9223370311919910393__node__1__9223372036438563958__1";
+            "metadata__9223372036438563903__9223372036854774799__9223370311919910393__node__1__9223372036438563958__2__1";
         Tuple<Long, Long> minMaxGen = TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(nodeIdWithSeparator);
         Long minGen = Long.MAX_VALUE - 9223372036438563958L;
         assertEquals(minGen, minMaxGen.v1());
 
         // Malformed md filename
-        String malformedMdFileName = "metadata__9223372036438563903__9223372036854774799__9223370311919910393__node1__xyz__1";
+        String malformedMdFileName = "metadata__9223372036438563903__9223372036854774799__9223370311919910393__node1__xyz__3__1";
         assertNull(TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(malformedMdFileName));
     }
 
+    public void testGetMinMaxPrimaryTermFromFilename() throws Exception {
+        // New format metadata file
+        String newFormatMetadataFile =
+            "metadata__9223372036854775800__9223372036854774799__9223370311919910393__node1__9223372036438563958__2__1";
+        Tuple<Long, Long> minMaxPrimaryterm = TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(newFormatMetadataFile);
+        Long minPrimaryTerm = 2L;
+        Long maxPrimaryTerm = 7L;
+        assertEquals(minPrimaryTerm, minMaxPrimaryterm.v1());
+        assertEquals(maxPrimaryTerm, minMaxPrimaryterm.v2());
+
+        // Old format metadata file
+        String oldFormatMdFilename = "metadata__9223372036438563903__9223372036854774799__9223370311919910393__31__1";
+        assertNull(TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(oldFormatMdFilename));
+
+        // Node id containing separator
+        String nodeIdWithSeparator =
+            "metadata__9223372036854775800__9223372036854774799__9223370311919910393__node__1__9223372036438563958__2__1";
+        minMaxPrimaryterm = TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(nodeIdWithSeparator);
+        minPrimaryTerm = 2L;
+        maxPrimaryTerm = 7L;
+        assertEquals(minPrimaryTerm, minMaxPrimaryterm.v1());
+        assertEquals(maxPrimaryTerm, minMaxPrimaryterm.v2());
+
+        // Malformed md filename
+        String malformedMdFileName = "metadata__9223372036854775800__9223372036854774799__9223370311919910393__node1__xyz__3qwe__1";
+        assertNull(TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(malformedMdFileName));
+    }
+
     public void testIndexDeletionWithNoPinnedTimestampNoRecentMdFiles() throws Exception {
         RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
         ArrayList<Translog.Operation> ops = new ArrayList<>();
@@ -604,11 +637,11 @@ public void testGetGenerationsToBeDeletedEmptyMetadataFilesNotToBeDeleted() thro
         List<String> metadataFilesNotToBeDeleted = new ArrayList<>();
         List<String> metadataFilesToBeDeleted = List.of(
             // 4 to 7
-            "metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__9223372036854775803__1",
+            "metadata__9223372036854775806__9223372036854775800__9223370311919910398__31__9223372036854775803__1__1",
             // 17 to 37
-            "metadata__9223372036438563903__9223372036854775770__9223370311919910398__31__9223372036854775790__1",
+            "metadata__9223372036854775806__9223372036854775770__9223370311919910398__31__9223372036854775790__1__1",
             // 27 to 42
-            "metadata__9223372036438563903__9223372036854775765__9223370311919910403__31__9223372036854775780__1"
+            "metadata__9223372036854775806__9223372036854775765__9223370311919910403__31__9223372036854775780__1__1"
         );
         Set<Long> generations = ((RemoteFsTimestampAwareTranslog) translog).getGenerationsToBeDeleted(
             metadataFilesNotToBeDeleted,
@@ -618,6 +651,7 @@ public void testGetGenerationsToBeDeletedEmptyMetadataFilesNotToBeDeleted() thro
         Set<Long> md1Generations = LongStream.rangeClosed(4, 7).boxed().collect(Collectors.toSet());
         Set<Long> md2Generations = LongStream.rangeClosed(17, 37).boxed().collect(Collectors.toSet());
         Set<Long> md3Generations = LongStream.rangeClosed(27, 42).boxed().collect(Collectors.toSet());
+
         assertTrue(generations.containsAll(md1Generations));
         assertTrue(generations.containsAll(md2Generations));
         assertTrue(generations.containsAll(md3Generations));
@@ -631,19 +665,19 @@ public void testGetGenerationsToBeDeleted() throws IOException {
         List<String> metadataFilesNotToBeDeleted = List.of(
             // 1 to 4
-            "metadata__9223372036438563903__9223372036854775803__9223370311919910398__31__9223372036854775806__1",
+            "metadata__9223372036854775806__9223372036854775803__9223370311919910398__31__9223372036854775806__1__1",
             // 26 to 30
-            "metadata__9223372036438563903__9223372036854775777__9223370311919910398__31__9223372036854775781__1",
+            "metadata__9223372036854775806__9223372036854775777__9223370311919910398__31__9223372036854775781__1__1",
             // 42 to 100
-            "metadata__9223372036438563903__9223372036854775707__9223370311919910403__31__9223372036854775765__1"
+            "metadata__9223372036854775806__9223372036854775707__9223370311919910403__31__9223372036854775765__1__1"
         );
         List<String> metadataFilesToBeDeleted = List.of(
             // 4 to 7
-            "metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__9223372036854775803__1",
+            "metadata__9223372036854775806__9223372036854775800__9223370311919910398__31__9223372036854775803__1__1",
             // 17 to 37
-            "metadata__9223372036438563903__9223372036854775770__9223370311919910398__31__9223372036854775790__1",
+            "metadata__9223372036854775806__9223372036854775770__9223370311919910398__31__9223372036854775790__1__1",
             // 27 to 42
-            "metadata__9223372036438563903__9223372036854775765__9223370311919910403__31__9223372036854775780__1"
+            "metadata__9223372036854775806__9223372036854775765__9223370311919910403__31__9223372036854775780__1__1"
         );
         Set<Long> generations = ((RemoteFsTimestampAwareTranslog) translog).getGenerationsToBeDeleted(
             metadataFilesNotToBeDeleted,
@@ -653,6 +687,7 @@ public void testGetGenerationsToBeDeleted() throws IOException {
         Set<Long> md1Generations = LongStream.rangeClosed(5, 7).boxed().collect(Collectors.toSet());
         Set<Long> md2Generations = LongStream.rangeClosed(17, 25).boxed().collect(Collectors.toSet());
         Set<Long> md3Generations = LongStream.rangeClosed(31, 41).boxed().collect(Collectors.toSet());
+
         assertTrue(generations.containsAll(md1Generations));
         assertTrue(generations.containsAll(md2Generations));
         assertTrue(generations.containsAll(md3Generations));
@@ -783,49 +818,49 @@ public void testGetMinMaxTranslogGenerationFromMetadataFile() throws IOException
         assertEquals(
             new Tuple<>(701L, 1008L),
             translog.getMinMaxTranslogGenerationFromMetadataFile(
-                "metadata__9223372036438563903__9223372036854774799__9223370311919910393__31__9223372036854775106__1",
+                "metadata__9223372036438563903__9223372036854774799__9223370311919910393__31__9223372036854775106__1__1",
                 translogTransferManager
             )
         );
         assertEquals(
             new Tuple<>(4L, 7L),
             translog.getMinMaxTranslogGenerationFromMetadataFile(
-                "metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__9223372036854775803__1",
+                "metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__9223372036854775803__2__1",
                 translogTransferManager
             )
         );
         assertEquals(
             new Tuple<>(106L, 106L),
             translog.getMinMaxTranslogGenerationFromMetadataFile(
-                "metadata__9223372036438563903__9223372036854775701__9223370311919910403__31__9223372036854775701__1",
+                "metadata__9223372036438563903__9223372036854775701__9223370311919910403__31__9223372036854775701__3__1",
                 translogTransferManager
             )
         );
         assertEquals(
             new Tuple<>(4573L, 99964L),
             translog.getMinMaxTranslogGenerationFromMetadataFile(
-                "metadata__9223372036438563903__9223372036854675843__9223370311919910408__31__9223372036854771234__1",
+                "metadata__9223372036438563903__9223372036854675843__9223370311919910408__31__9223372036854771234__4__1",
                 translogTransferManager
             )
         );
         assertEquals(
             new Tuple<>(1L, 4L),
             translog.getMinMaxTranslogGenerationFromMetadataFile(
-                "metadata__9223372036438563903__9223372036854775803__9223370311919910413__31__9223372036854775806__1",
+                "metadata__9223372036438563903__9223372036854775803__9223370311919910413__31__9223372036854775806__5__1",
                 translogTransferManager
             )
         );
         assertEquals(
             new Tuple<>(2474L, 3462L),
             translog.getMinMaxTranslogGenerationFromMetadataFile(
-                "metadata__9223372036438563903__9223372036854772345__9223370311919910429__31__9223372036854773333__1",
+                "metadata__9223372036438563903__9223372036854772345__9223370311919910429__31__9223372036854773333__6__1",
                 translogTransferManager
             )
         );
         assertEquals(
             new Tuple<>(5807L, 7917L),
             translog.getMinMaxTranslogGenerationFromMetadataFile(
-                "metadata__9223372036438563903__9223372036854767890__9223370311919910434__31__9223372036854770000__1",
+                "metadata__9223372036438563903__9223372036854767890__9223370311919910434__31__9223372036854770000__7__1",
                 translogTransferManager
             )
         );
@@ -859,4 +894,93 @@ public void testGetMinMaxTranslogGenerationFromMetadataFile() throws IOException
         verify(translogTransferManager).readMetadata("metadata__9223372036438563903__9223372036854774799__9223370311919910393__31__1");
         verify(translogTransferManager).readMetadata("metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__1");
     }
+
+    public void testDeleteStaleRemotePrimaryTerms() throws IOException {
+        TranslogTransferManager translogTransferManager = mock(TranslogTransferManager.class);
+
+        List<String> metadataFiles = List.of(
+            // PT 4 to 9
"metadata__9223372036854775798__9223372036854774799__9223370311919910393__node1__9223372036438563958__4__1", + // PT 2 to 7 + "metadata__9223372036854775800__9223372036854774799__9223370311919910393__node1__9223372036438563958__2__1", + // PT 2 to 6 + "metadata__9223372036854775801__9223372036854774799__9223370311919910393__node1__9223372036438563958__2__1" + ); + + Logger staticLogger = LogManager.getLogger(RemoteFsTimestampAwareTranslogTests.class); + when(translogTransferManager.listPrimaryTermsInRemote()).thenReturn(Set.of(1L, 2L, 3L, 4L)); + AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE); + RemoteFsTimestampAwareTranslog.deleteStaleRemotePrimaryTerms( + metadataFiles, + translogTransferManager, + new HashMap<>(), + minPrimaryTermInRemote, + staticLogger + ); + verify(translogTransferManager).deletePrimaryTermsAsync(2L); + assertEquals(2, minPrimaryTermInRemote.get()); + + RemoteFsTimestampAwareTranslog.deleteStaleRemotePrimaryTerms( + metadataFiles, + translogTransferManager, + new HashMap<>(), + minPrimaryTermInRemote, + staticLogger + ); + // This means there are no new invocations of deletePrimaryTermAsync + verify(translogTransferManager, times(1)).deletePrimaryTermsAsync(anyLong()); + } + + public void testDeleteStaleRemotePrimaryTermsNoPrimaryTermInRemote() throws IOException { + TranslogTransferManager translogTransferManager = mock(TranslogTransferManager.class); + + List metadataFiles = List.of( + // PT 4 to 9 + "metadata__9223372036854775798__9223372036854774799__9223370311919910393__node1__9223372036438563958__4__1", + // PT 2 to 7 + "metadata__9223372036854775800__9223372036854774799__9223370311919910393__node1__9223372036438563958__2__1", + // PT 2 to 6 + "metadata__9223372036854775801__9223372036854774799__9223370311919910393__node1__9223372036438563958__2__1" + ); + + Logger staticLogger = LogManager.getLogger(RemoteFsTimestampAwareTranslogTests.class); + when(translogTransferManager.listPrimaryTermsInRemote()).thenReturn(Set.of()); + AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE); + RemoteFsTimestampAwareTranslog.deleteStaleRemotePrimaryTerms( + metadataFiles, + translogTransferManager, + new HashMap<>(), + minPrimaryTermInRemote, + staticLogger + ); + verify(translogTransferManager, times(0)).deletePrimaryTermsAsync(anyLong()); + assertEquals(Long.MAX_VALUE, minPrimaryTermInRemote.get()); + } + + public void testDeleteStaleRemotePrimaryTermsPrimaryTermInRemoteIsBigger() throws IOException { + TranslogTransferManager translogTransferManager = mock(TranslogTransferManager.class); + + List metadataFiles = List.of( + // PT 4 to 9 + "metadata__9223372036854775798__9223372036854774799__9223370311919910393__node1__9223372036438563958__4__1", + // PT 2 to 7 + "metadata__9223372036854775800__9223372036854774799__9223370311919910393__node1__9223372036438563958__2__1", + // PT 2 to 6 + "metadata__9223372036854775801__9223372036854774799__9223370311919910393__node1__9223372036438563958__2__1" + ); + + Logger staticLogger = LogManager.getLogger(RemoteFsTimestampAwareTranslogTests.class); + when(translogTransferManager.listPrimaryTermsInRemote()).thenReturn(Set.of(2L, 3L, 4L)); + AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE); + RemoteFsTimestampAwareTranslog.deleteStaleRemotePrimaryTerms( + metadataFiles, + translogTransferManager, + new HashMap<>(), + minPrimaryTermInRemote, + staticLogger + ); + verify(translogTransferManager, times(0)).deletePrimaryTermsAsync(anyLong()); + assertEquals(2, minPrimaryTermInRemote.get()); 
+    }
+
 }
diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java
index 8605043ddd5b5..ed0d6b7d50706 100644
--- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java
+++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java
@@ -628,7 +628,7 @@ public void testMetadataConflict() throws InterruptedException {
         String mdFilename = tm.getFileName();
         long count = mdFilename.chars().filter(ch -> ch == METADATA_SEPARATOR.charAt(0)).count();
         // There should not be any `_` in the mdFile name as it is used as a separator.
-        assertEquals(12, count);
+        assertEquals(14, count);
         Thread.sleep(1);
         TranslogTransferMetadata tm2 = new TranslogTransferMetadata(1, 1, 1, 2, "node--2");
         String mdFilename2 = tm2.getFileName();
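To make the filename changes above easier to follow, here is a small, self-contained Java sketch of the token layout and of recovering the primary-term range from a metadata filename. It is illustrative only and not code from this patch: the class and method names are hypothetical, and invertLong is assumed to behave like RemoteStoreUtils.invertLong (Long.MAX_VALUE minus the value, zero-padded to 19 digits, so that newer values sort first lexicographically).

import java.util.Locale;

public class MetadataFilenameSketch {
    static final String SEP = "__";

    // Assumed behaviour of RemoteStoreUtils.invertLong(long): invert and zero-pad
    // so that lexicographic order of filenames matches reverse numeric order.
    static String invertLong(long num) {
        return String.format(Locale.ROOT, "%019d", Long.MAX_VALUE - num);
    }

    static long invertLong(String str) {
        return Long.MAX_VALUE - Long.parseLong(str);
    }

    // Token layout after this change:
    // metadata__<inv(primaryTerm)>__<inv(generation)>__<inv(createdAt)>__<nodeIdHash>__<inv(minGeneration)>__<minPrimaryTerm>__<version>
    static String fileName(long primaryTerm, long generation, long createdAt,
                           String nodeIdHash, long minGeneration, long minPrimaryTerm, int version) {
        return String.join(SEP, "metadata", invertLong(primaryTerm), invertLong(generation),
            invertLong(createdAt), nodeIdHash, invertLong(minGeneration),
            String.valueOf(minPrimaryTerm), String.valueOf(version));
    }

    // Index from the end of the token array, mirroring the patch's approach,
    // because a node id token may itself contain the "__" separator.
    static long[] minMaxPrimaryTerm(String filename) {
        String[] tokens = filename.split(SEP);
        if (tokens.length < 7) {
            return null; // pre-2.17 filename: the min primary term token is absent
        }
        long min = Long.parseLong(tokens[tokens.length - 2]); // stored as-is
        long max = invertLong(tokens[1]);                     // stored inverted
        return new long[] { min, max };
    }

    public static void main(String[] args) {
        String name = fileName(7, 42, 1_700_000_000_000L, "31", 40, 2, 1);
        long[] range = minMaxPrimaryTerm(name);
        System.out.println(name);
        System.out.println("min=" + range[0] + ", max=" + range[1]); // min=2, max=7
    }
}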