From 222ad3b2371c1709c1b07d8e29e43506463ff25f Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Mon, 8 Jul 2024 16:03:45 -0700 Subject: [PATCH 1/5] introduce circuit breaker in InternalHistogram Signed-off-by: bowenlan-amzn --- .../gradle/testclusters/OpenSearchNode.java | 2 +- .../aggregations/bucket/BucketsAggregator.java | 2 +- .../histogram/AbstractHistogramAggregator.java | 5 +++-- .../bucket/histogram/InternalHistogram.java | 13 +++++++++---- .../bucket/histogram/InternalHistogramTests.java | 4 ++-- 5 files changed, 16 insertions(+), 10 deletions(-) diff --git a/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchNode.java b/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchNode.java index 268de50340cbf..a8f382cfc5e0f 100644 --- a/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchNode.java +++ b/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchNode.java @@ -1179,7 +1179,7 @@ private void createConfiguration() { // Temporarily disable the real memory usage circuit breaker. It depends on real memory usage which we have no full control // over and the REST client will not retry on circuit breaking exceptions yet (see #31986 for details). Once the REST client // can retry on circuit breaking exceptions, we can revert again to the default configuration. - baseConfig.put("indices.breaker.total.use_real_memory", "false"); + baseConfig.put("indices.breaker.total.use_real_memory", "true"); // Don't wait for state, just start up quickly. This will also allow new and old nodes in the BWC case to become the master baseConfig.put("discovery.initial_state_timeout", "0s"); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/BucketsAggregator.java index eef427754f535..9227380299b34 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/BucketsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/BucketsAggregator.java @@ -70,7 +70,7 @@ public abstract class BucketsAggregator extends AggregatorBase { private final BigArrays bigArrays; - private final IntConsumer multiBucketConsumer; + protected final IntConsumer multiBucketConsumer; private LongArray docCounts; protected final DocCountProvider docCountProvider; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AbstractHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AbstractHistogramAggregator.java index d3a4a51e5b6f2..27e877ff0f64e 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AbstractHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AbstractHistogramAggregator.java @@ -40,6 +40,7 @@ import org.opensearch.search.aggregations.BucketOrder; import org.opensearch.search.aggregations.CardinalityUpperBound; import org.opensearch.search.aggregations.InternalAggregation; +import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.bucket.BucketsAggregator; import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; import org.opensearch.search.internal.SearchContext; @@ -121,7 +122,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I buildEmptySubAggregations() ); } - return new InternalHistogram(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed, metadata()); + return new InternalHistogram(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed, metadata(), (MultiBucketConsumerService.MultiBucketConsumer) multiBucketConsumer); }); } @@ -137,7 +138,7 @@ public InternalAggregation buildEmptyAggregation() { buildEmptySubAggregations() ); } - return new InternalHistogram(name, Collections.emptyList(), order, minDocCount, emptyBucketInfo, formatter, keyed, metadata()); + return new InternalHistogram(name, Collections.emptyList(), order, minDocCount, emptyBucketInfo, formatter, keyed, metadata(), (MultiBucketConsumerService.MultiBucketConsumer) multiBucketConsumer); } @Override diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/InternalHistogram.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/InternalHistogram.java index a27c689127ac9..ca55ad3638c02 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -44,6 +44,7 @@ import org.opensearch.search.aggregations.InternalMultiBucketAggregation; import org.opensearch.search.aggregations.InternalOrder; import org.opensearch.search.aggregations.KeyComparable; +import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.bucket.IteratorAndCurrent; import org.opensearch.search.aggregations.bucket.MultiBucketsAggregation; @@ -226,6 +227,7 @@ public int hashCode() { private final boolean keyed; private final long minDocCount; final EmptyBucketInfo emptyBucketInfo; + private MultiBucketConsumerService.MultiBucketConsumer multiBucketConsumer; public InternalHistogram( String name, @@ -235,7 +237,8 @@ public InternalHistogram( EmptyBucketInfo emptyBucketInfo, DocValueFormat formatter, boolean keyed, - Map metadata + Map metadata, + MultiBucketConsumerService.MultiBucketConsumer multiBucketConsumer ) { super(name, metadata); this.buckets = buckets; @@ -245,6 +248,7 @@ public InternalHistogram( this.emptyBucketInfo = emptyBucketInfo; this.format = formatter; this.keyed = keyed; + this.multiBucketConsumer = multiBucketConsumer; } /** @@ -296,7 +300,7 @@ BucketOrder getOrder() { @Override public InternalHistogram create(List buckets) { - return new InternalHistogram(name, buckets, order, minDocCount, emptyBucketInfo, format, keyed, metadata); + return new InternalHistogram(name, buckets, order, minDocCount, emptyBucketInfo, format, keyed, metadata, multiBucketConsumer); } @Override @@ -414,6 +418,7 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { double key = nextKey(lastBucket.key); while (key < nextBucket.key) { iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs)); + multiBucketConsumer.accept(0); key = nextKey(key); } assert key == nextBucket.key || Double.isNaN(nextBucket.key) : "key: " + key + ", nextBucket.key: " + nextBucket.key; @@ -448,7 +453,7 @@ public InternalAggregation reduce(List aggregations, Reduce } } reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size()); - return new InternalHistogram(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, format, keyed, getMetadata()); + return new InternalHistogram(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, format, keyed, getMetadata(), multiBucketConsumer); } @Override @@ -489,7 +494,7 @@ public InternalAggregation createAggregation(List } } BucketOrder order = BucketOrder.key(randomBoolean()); - return new InternalHistogram(name, buckets, order, minDocCount, emptyBucketInfo, format, keyed, metadata); + return new InternalHistogram(name, buckets, order, minDocCount, emptyBucketInfo, format, keyed, metadata, ); } // issue 26787 @@ -211,6 +211,6 @@ protected InternalHistogram mutateInstance(InternalHistogram instance) { default: throw new AssertionError("Illegal randomisation branch"); } - return new InternalHistogram(name, buckets, order, minDocCount, emptyBucketInfo, format, keyed, metadata); + return new InternalHistogram(name, buckets, order, minDocCount, emptyBucketInfo, format, keyed, metadata, ); } } From b83fe2d8773f9f700f19f33a7d462b9edd2bb9f8 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Mon, 15 Jul 2024 08:48:45 -0700 Subject: [PATCH 2/5] use circuit breaker from reduce context Signed-off-by: bowenlan-amzn --- .../aggregations/bucket/BucketsAggregator.java | 2 +- .../histogram/AbstractHistogramAggregator.java | 5 ++--- .../bucket/histogram/InternalHistogram.java | 14 +++++--------- .../bucket/histogram/InternalHistogramTests.java | 4 ++-- 4 files changed, 10 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/BucketsAggregator.java index 9227380299b34..eef427754f535 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/BucketsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/BucketsAggregator.java @@ -70,7 +70,7 @@ public abstract class BucketsAggregator extends AggregatorBase { private final BigArrays bigArrays; - protected final IntConsumer multiBucketConsumer; + private final IntConsumer multiBucketConsumer; private LongArray docCounts; protected final DocCountProvider docCountProvider; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AbstractHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AbstractHistogramAggregator.java index 27e877ff0f64e..d3a4a51e5b6f2 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AbstractHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AbstractHistogramAggregator.java @@ -40,7 +40,6 @@ import org.opensearch.search.aggregations.BucketOrder; import org.opensearch.search.aggregations.CardinalityUpperBound; import org.opensearch.search.aggregations.InternalAggregation; -import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.bucket.BucketsAggregator; import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; import org.opensearch.search.internal.SearchContext; @@ -122,7 +121,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I buildEmptySubAggregations() ); } - return new InternalHistogram(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed, metadata(), (MultiBucketConsumerService.MultiBucketConsumer) multiBucketConsumer); + return new InternalHistogram(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed, metadata()); }); } @@ -138,7 +137,7 @@ public InternalAggregation buildEmptyAggregation() { buildEmptySubAggregations() ); } - return new InternalHistogram(name, Collections.emptyList(), order, minDocCount, emptyBucketInfo, formatter, keyed, metadata(), (MultiBucketConsumerService.MultiBucketConsumer) multiBucketConsumer); + return new InternalHistogram(name, Collections.emptyList(), order, minDocCount, emptyBucketInfo, formatter, keyed, metadata()); } @Override diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/InternalHistogram.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/InternalHistogram.java index ca55ad3638c02..58f403f062c5c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -44,7 +44,6 @@ import org.opensearch.search.aggregations.InternalMultiBucketAggregation; import org.opensearch.search.aggregations.InternalOrder; import org.opensearch.search.aggregations.KeyComparable; -import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.bucket.IteratorAndCurrent; import org.opensearch.search.aggregations.bucket.MultiBucketsAggregation; @@ -227,7 +226,6 @@ public int hashCode() { private final boolean keyed; private final long minDocCount; final EmptyBucketInfo emptyBucketInfo; - private MultiBucketConsumerService.MultiBucketConsumer multiBucketConsumer; public InternalHistogram( String name, @@ -237,8 +235,7 @@ public InternalHistogram( EmptyBucketInfo emptyBucketInfo, DocValueFormat formatter, boolean keyed, - Map metadata, - MultiBucketConsumerService.MultiBucketConsumer multiBucketConsumer + Map metadata ) { super(name, metadata); this.buckets = buckets; @@ -248,7 +245,6 @@ public InternalHistogram( this.emptyBucketInfo = emptyBucketInfo; this.format = formatter; this.keyed = keyed; - this.multiBucketConsumer = multiBucketConsumer; } /** @@ -300,7 +296,7 @@ BucketOrder getOrder() { @Override public InternalHistogram create(List buckets) { - return new InternalHistogram(name, buckets, order, minDocCount, emptyBucketInfo, format, keyed, metadata, multiBucketConsumer); + return new InternalHistogram(name, buckets, order, minDocCount, emptyBucketInfo, format, keyed, metadata); } @Override @@ -418,7 +414,7 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { double key = nextKey(lastBucket.key); while (key < nextBucket.key) { iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs)); - multiBucketConsumer.accept(0); + reduceContext.consumeBucketsAndMaybeBreak(0); key = nextKey(key); } assert key == nextBucket.key || Double.isNaN(nextBucket.key) : "key: " + key + ", nextBucket.key: " + nextBucket.key; @@ -453,7 +449,7 @@ public InternalAggregation reduce(List aggregations, Reduce } } reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size()); - return new InternalHistogram(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, format, keyed, getMetadata(), multiBucketConsumer); + return new InternalHistogram(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, format, keyed, getMetadata()); } @Override @@ -494,7 +490,7 @@ public InternalAggregation createAggregation(List } } BucketOrder order = BucketOrder.key(randomBoolean()); - return new InternalHistogram(name, buckets, order, minDocCount, emptyBucketInfo, format, keyed, metadata, ); + return new InternalHistogram(name, buckets, order, minDocCount, emptyBucketInfo, format, keyed, metadata); } // issue 26787 @@ -211,6 +211,6 @@ protected InternalHistogram mutateInstance(InternalHistogram instance) { default: throw new AssertionError("Illegal randomisation branch"); } - return new InternalHistogram(name, buckets, order, minDocCount, emptyBucketInfo, format, keyed, metadata, ); + return new InternalHistogram(name, buckets, order, minDocCount, emptyBucketInfo, format, keyed, metadata); } } From 9fea950263a1faae9a3cb54360b94c3aae862674 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Mon, 15 Jul 2024 10:35:46 -0700 Subject: [PATCH 3/5] add test Signed-off-by: bowenlan-amzn --- .../bucket/histogram/InternalHistogram.java | 5 ++- .../histogram/InternalHistogramTests.java | 43 +++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/InternalHistogram.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/InternalHistogram.java index 58f403f062c5c..a988b911de5a3 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -395,6 +395,7 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { // fill with empty buckets for (double key = round(emptyBucketInfo.minBound); key <= emptyBucketInfo.maxBound; key = nextKey(key)) { iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs)); + reduceContext.consumeBucketsAndMaybeBreak(0); } } else { Bucket first = list.get(iter.nextIndex()); @@ -402,11 +403,12 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { // fill with empty buckets until the first key for (double key = round(emptyBucketInfo.minBound); key < first.key; key = nextKey(key)) { iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs)); + reduceContext.consumeBucketsAndMaybeBreak(0); } } // now adding the empty buckets within the actual data, - // e.g. if the data series is [1,2,3,7] there're 3 empty buckets that will be created for 4,5,6 + // e.g. if the data series is [1,2,3,7] there are 3 empty buckets that will be created for 4,5,6 Bucket lastBucket = null; do { Bucket nextBucket = list.get(iter.nextIndex()); @@ -425,6 +427,7 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { // finally, adding the empty buckets *after* the actual data (based on the extended_bounds.max requested by the user) for (double key = nextKey(lastBucket.key); key <= emptyBucketInfo.maxBound; key = nextKey(key)) { iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs)); + reduceContext.consumeBucketsAndMaybeBreak(0); } } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/InternalHistogramTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/InternalHistogramTests.java index 288b22ccfcc92..98c6ac2b3de45 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/InternalHistogramTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/InternalHistogramTests.java @@ -33,10 +33,15 @@ package org.opensearch.search.aggregations.bucket.histogram; import org.apache.lucene.tests.util.TestUtil; +import org.opensearch.core.common.breaker.CircuitBreaker; +import org.opensearch.core.common.breaker.CircuitBreakingException; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.BucketOrder; +import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.InternalAggregations; +import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.ParsedMultiBucketAggregation; +import org.opensearch.search.aggregations.pipeline.PipelineAggregator; import org.opensearch.test.InternalAggregationTestCase; import org.opensearch.test.InternalMultiBucketAggregationTestCase; @@ -47,6 +52,8 @@ import java.util.Map; import java.util.TreeMap; +import org.mockito.Mockito; + public class InternalHistogramTests extends InternalMultiBucketAggregationTestCase { private boolean keyed; @@ -123,6 +130,42 @@ public void testHandlesNaN() { ); } + public void testCircuitBreakerWhenAddEmptyBuckets() { + String name = randomAlphaOfLength(5); + double interval = 1; + double lowerBound = 1; + double upperBound = 1026; + List bucket1 = List.of( + new InternalHistogram.Bucket(lowerBound, 1, false, format, InternalAggregations.EMPTY) + ); + List bucket2 = List.of( + new InternalHistogram.Bucket(upperBound, 1, false, format, InternalAggregations.EMPTY) + ); + BucketOrder order = BucketOrder.key(true); + InternalHistogram.EmptyBucketInfo emptyBucketInfo = new InternalHistogram.EmptyBucketInfo( + interval, + 0, + lowerBound, + upperBound, + InternalAggregations.EMPTY + ); + InternalHistogram histogram1 = new InternalHistogram(name, bucket1, order, 0, emptyBucketInfo, format, false, null); + InternalHistogram histogram2 = new InternalHistogram(name, bucket2, order, 0, emptyBucketInfo, format, false, null); + + CircuitBreaker breaker = Mockito.mock(CircuitBreaker.class); + Mockito.when(breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets")).thenThrow(CircuitBreakingException.class); + + MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer(0, breaker); + InternalAggregation.ReduceContext reduceContext = InternalAggregation.ReduceContext.forFinalReduction( + null, + null, + bucketConsumer, + PipelineAggregator.PipelineTree.EMPTY + ); + expectThrows(CircuitBreakingException.class, () -> histogram1.reduce(List.of(histogram1, histogram2), reduceContext)); + Mockito.verify(breaker, Mockito.times(1)).addEstimateBytesAndMaybeBreak(0, "allocated_buckets"); + } + @Override protected void assertReduced(InternalHistogram reduced, List inputs) { TreeMap expectedCounts = new TreeMap<>(); From 4a0f60f6ee62e2f4d35531c54fdf262005dce4f7 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Mon, 15 Jul 2024 10:43:12 -0700 Subject: [PATCH 4/5] revert use_real_memory change in OpenSearchNode Signed-off-by: bowenlan-amzn --- .../java/org/opensearch/gradle/testclusters/OpenSearchNode.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchNode.java b/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchNode.java index a8f382cfc5e0f..268de50340cbf 100644 --- a/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchNode.java +++ b/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchNode.java @@ -1179,7 +1179,7 @@ private void createConfiguration() { // Temporarily disable the real memory usage circuit breaker. It depends on real memory usage which we have no full control // over and the REST client will not retry on circuit breaking exceptions yet (see #31986 for details). Once the REST client // can retry on circuit breaking exceptions, we can revert again to the default configuration. - baseConfig.put("indices.breaker.total.use_real_memory", "true"); + baseConfig.put("indices.breaker.total.use_real_memory", "false"); // Don't wait for state, just start up quickly. This will also allow new and old nodes in the BWC case to become the master baseConfig.put("discovery.initial_state_timeout", "0s"); From 124ab46ace5aeb6af49ea48003c6c72c5f62ffde Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Mon, 15 Jul 2024 10:46:16 -0700 Subject: [PATCH 5/5] add change log Signed-off-by: bowenlan-amzn --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c260f8be9ca3..5f3bb5f4ebe35 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -68,6 +68,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Refactoring FilterPath.parse by using an iterative approach ([#14200](https://github.com/opensearch-project/OpenSearch/pull/14200)) - Refactoring Grok.validatePatternBank by using an iterative approach ([#14206](https://github.com/opensearch-project/OpenSearch/pull/14206)) - Update help output for _cat ([#14722](https://github.com/opensearch-project/OpenSearch/pull/14722)) +- Use circuit breaker in InternalHistogram when adding empty buckets ([#14754](https://github.com/opensearch-project/OpenSearch/pull/14754)) ### Security