From 9a0a69fb530732e49834a2f2ddc3011cca82b26c Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Wed, 7 Feb 2024 10:20:00 -0800 Subject: [PATCH 1/5] Apply fast date histogram optimization at the segment level (#12073) --------- Signed-off-by: bowenlan-amzn --- .../aggregations/bucket/DateHistogramIT.java | 4 +- .../bucket/FastFilterRewriteHelper.java | 360 ++++++++++++------ .../bucket/composite/CompositeAggregator.java | 61 ++- .../AutoDateHistogramAggregator.java | 61 +-- .../histogram/DateHistogramAggregator.java | 33 +- .../composite/CompositeAggregatorTests.java | 69 ++++ .../DateHistogramAggregatorTests.java | 208 +++++++++- .../BaseCompositeAggregatorTestCase.java | 8 + 8 files changed, 624 insertions(+), 180 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/DateHistogramIT.java b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/DateHistogramIT.java index 64c9c792b866a..6a15490cbfe63 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/DateHistogramIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/DateHistogramIT.java @@ -177,9 +177,9 @@ public void setupSuiteScopeCluster() throws Exception { indexDoc(2, 15, 3), // date: Feb 15, dates: Feb 15, Mar 16 indexDoc(3, 2, 4), // date: Mar 2, dates: Mar 2, Apr 3 indexDoc(3, 15, 5), // date: Mar 15, dates: Mar 15, Apr 16 - indexDoc(3, 23, 6) + indexDoc(3, 23, 6) // date: Mar 23, dates: Mar 23, Apr 24 ) - ); // date: Mar 23, dates: Mar 23, Apr 24 + ); indexRandom(true, builders); ensureSearchable(); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index f377287d0b3bd..6f1cc901e2d82 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -8,9 +8,15 @@ package org.opensearch.search.aggregations.bucket; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.index.DocValues; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.PointValues; import org.apache.lucene.search.ConstantScoreQuery; +import org.apache.lucene.search.FieldExistsQuery; import org.apache.lucene.search.IndexOrDocValuesQuery; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.PointRangeQuery; @@ -18,16 +24,15 @@ import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Weight; import org.apache.lucene.util.NumericUtils; -import org.opensearch.common.CheckedFunction; import org.opensearch.common.Rounding; import org.opensearch.common.lucene.search.function.FunctionScoreQuery; import org.opensearch.index.mapper.DateFieldMapper; +import org.opensearch.index.mapper.DocCountFieldMapper; import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.index.query.DateRangeIncludingNowQuery; -import org.opensearch.search.DocValueFormat; -import org.opensearch.search.aggregations.bucket.composite.CompositeKey; import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceConfig; import org.opensearch.search.aggregations.bucket.composite.RoundingValuesSource; +import 
org.opensearch.search.aggregations.bucket.histogram.LongBounds; import org.opensearch.search.internal.SearchContext; import java.io.IOException; @@ -37,7 +42,8 @@ import java.util.OptionalLong; import java.util.function.BiConsumer; import java.util.function.Function; -import java.util.function.Supplier; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; /** * Utility class to help rewrite aggregations into filters. @@ -55,6 +61,8 @@ public final class FastFilterRewriteHelper { private FastFilterRewriteHelper() {} + private static final Logger logger = LogManager.getLogger(FastFilterRewriteHelper.class); + private static final int MAX_NUM_FILTER_BUCKETS = 1024; private static final Map, Function> queryWrappers; @@ -80,13 +88,13 @@ private static Query unwrapIntoConcreteQuery(Query query) { } /** - * Finds the min and max bounds of field values for the shard + * Finds the global min and max bounds of the field for the shard across all segments + * + * @return null if the field is empty or not indexed */ - private static long[] getIndexBounds(final SearchContext context, final String fieldName) throws IOException { + private static long[] getShardBounds(final SearchContext context, final String fieldName) throws IOException { final List leaves = context.searcher().getIndexReader().leaves(); long min = Long.MAX_VALUE, max = Long.MIN_VALUE; - // Since the query does not specify bounds for aggregation, we can - // build the global min/max from local min/max within each segment for (LeafReaderContext leaf : leaves) { final PointValues values = leaf.reader().getPointValues(fieldName); if (values != null) { @@ -95,51 +103,80 @@ private static long[] getIndexBounds(final SearchContext context, final String f } } - if (min == Long.MAX_VALUE || max == Long.MIN_VALUE) return null; + if (min == Long.MAX_VALUE || max == Long.MIN_VALUE) { + return null; + } + return new long[] { min, max }; + } + /** + * Finds the min and max bounds of the field for the segment + * + * @return null if the field is empty or not indexed + */ + private static long[] getSegmentBounds(final LeafReaderContext context, final String fieldName) throws IOException { + long min = Long.MAX_VALUE, max = Long.MIN_VALUE; + final PointValues values = context.reader().getPointValues(fieldName); + if (values != null) { + min = Math.min(min, NumericUtils.sortableBytesToLong(values.getMinPackedValue(), 0)); + max = Math.max(max, NumericUtils.sortableBytesToLong(values.getMaxPackedValue(), 0)); + } + + if (min == Long.MAX_VALUE || max == Long.MIN_VALUE) { + return null; + } return new long[] { min, max }; } /** - * This method also acts as a pre-condition check for the optimization, - * returns null if the optimization cannot be applied + * This method also acts as a pre-condition check for the optimization + * + * @return null if the processed query not as expected */ - public static long[] getAggregationBounds(final SearchContext context, final String fieldName) throws IOException { + public static long[] getDateHistoAggBounds(final SearchContext context, final String fieldName) throws IOException { final Query cq = unwrapIntoConcreteQuery(context.query()); - final long[] indexBounds = getIndexBounds(context, fieldName); if (cq instanceof PointRangeQuery) { final PointRangeQuery prq = (PointRangeQuery) cq; - // Ensure that the query and aggregation are on the same field - if (prq.getField().equals(fieldName)) { - return new long[] { - // Minimum bound for aggregation is the max between query and global - 
Math.max(NumericUtils.sortableBytesToLong(prq.getLowerPoint(), 0), indexBounds[0]), - // Maximum bound for aggregation is the min between query and global - Math.min(NumericUtils.sortableBytesToLong(prq.getUpperPoint(), 0), indexBounds[1]) }; - } + final long[] indexBounds = getShardBounds(context, fieldName); + if (indexBounds == null) return null; + return getBoundsWithRangeQuery(prq, fieldName, indexBounds); } else if (cq instanceof MatchAllDocsQuery) { - return indexBounds; + return getShardBounds(context, fieldName); + } else if (cq instanceof FieldExistsQuery) { + // when a range query covers all values of a shard, it will be rewrite field exists query + if (((FieldExistsQuery) cq).getField().equals(fieldName)) { + return getShardBounds(context, fieldName); + } } - // Check if the top-level query (which may be a PRQ on another field) is functionally match-all - Weight weight = context.searcher().createWeight(context.query(), ScoreMode.COMPLETE_NO_SCORES, 1f); - for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) { - if (weight.count(ctx) != ctx.reader().numDocs()) { + + return null; + } + + private static long[] getBoundsWithRangeQuery(PointRangeQuery prq, String fieldName, long[] indexBounds) { + // Ensure that the query and aggregation are on the same field + if (prq.getField().equals(fieldName)) { + // Minimum bound for aggregation is the max between query and global + long lower = Math.max(NumericUtils.sortableBytesToLong(prq.getLowerPoint(), 0), indexBounds[0]); + // Maximum bound for aggregation is the min between query and global + long upper = Math.min(NumericUtils.sortableBytesToLong(prq.getUpperPoint(), 0), indexBounds[1]); + if (lower > upper) { return null; } + return new long[] { lower, upper }; } - return indexBounds; + + return null; } /** * Creates the date range filters for aggregations using the interval, min/max - * bounds and the rounding values + * bounds and prepared rounding */ private static Weight[] createFilterForAggregations( final SearchContext context, + final DateFieldMapper.DateFieldType fieldType, final long interval, final Rounding.Prepared preparedRounding, - final String field, - final DateFieldMapper.DateFieldType fieldType, long low, final long high ) throws IOException { @@ -149,7 +186,10 @@ private static Weight[] createFilterForAggregations( int bucketCount = 0; while (roundedLow <= fieldType.convertNanosToMillis(high)) { bucketCount++; - if (bucketCount > MAX_NUM_FILTER_BUCKETS) return null; + if (bucketCount > MAX_NUM_FILTER_BUCKETS) { + logger.debug("Max number of filters reached [{}], skip the fast filter optimization", MAX_NUM_FILTER_BUCKETS); + return null; + } // Below rounding is needed as the interval could return in // non-rounded values for something like calendar month roundedLow = preparedRounding.round(roundedLow + interval); @@ -176,10 +216,10 @@ private static Weight[] createFilterForAggregations( // is included in the next bucket fieldType.convertRoundedMillisToNanos(roundedLow) - 1, upper, 0); - filters[i++] = context.searcher().createWeight(new PointRangeQuery(field, lower, upper, 1) { + filters[i++] = context.searcher().createWeight(new PointRangeQuery(fieldType.name(), lower, upper, 1) { @Override protected String toString(int dimension, byte[] value) { - return null; + return Long.toString(LongPoint.decodeDimension(value, 0)); } }, ScoreMode.COMPLETE_NO_SCORES, 1); } @@ -189,16 +229,24 @@ protected String toString(int dimension, byte[] value) { } /** - * Context object to do fast filter optimization 
+ * Context object for fast filter optimization + * <p>
+ * Usage: first set aggregation type, then check isRewriteable, then buildFastFilter */ public static class FastFilterContext { + private boolean rewriteable = false; private Weight[] filters = null; - public AggregationType aggregationType; + private boolean filtersBuiltAtShardLevel = false; - public FastFilterContext() {} + private AggregationType aggregationType; + private final SearchContext context; - private void setFilters(Weight[] filters) { - this.filters = filters; + public FastFilterContext(SearchContext context) { + this.context = context; + } + + public AggregationType getAggregationType() { + return aggregationType; } public void setAggregationType(AggregationType aggregationType) { @@ -206,119 +254,145 @@ public void setAggregationType(AggregationType aggregationType) { } public boolean isRewriteable(final Object parent, final int subAggLength) { - return aggregationType.isRewriteable(parent, subAggLength); + boolean rewriteable = aggregationType.isRewriteable(parent, subAggLength); + logger.debug("Fast filter rewriteable: {} for shard {}", rewriteable, context.indexShard().shardId()); + this.rewriteable = rewriteable; + return rewriteable; } - /** - * This filter build method is for date histogram aggregation type - * - * @param computeBounds get the lower and upper bound of the field in a shard search - * @param roundingFunction produce Rounding that contains interval of date range. - * Rounding is computed dynamically using the bounds in AutoDateHistogram - * @param preparedRoundingSupplier produce PreparedRounding to round values at call-time - */ - public void buildFastFilter( - SearchContext context, - CheckedFunction computeBounds, - Function roundingFunction, - Supplier preparedRoundingSupplier - ) throws IOException { - assert this.aggregationType instanceof DateHistogramAggregationType; - DateHistogramAggregationType aggregationType = (DateHistogramAggregationType) this.aggregationType; - DateFieldMapper.DateFieldType fieldType = aggregationType.getFieldType(); - final long[] bounds = computeBounds.apply(aggregationType); - if (bounds == null) return; - - final Rounding rounding = roundingFunction.apply(bounds); - final OptionalLong intervalOpt = Rounding.getInterval(rounding); - if (intervalOpt.isEmpty()) return; - final long interval = intervalOpt.getAsLong(); - - // afterKey is the last bucket key in previous response, while the bucket key - // is the start of the bucket values, so add the interval - if (aggregationType instanceof CompositeAggregationType && ((CompositeAggregationType) aggregationType).afterKey != -1) { - bounds[0] = ((CompositeAggregationType) aggregationType).afterKey + interval; + public void buildFastFilter() throws IOException { + assert filters == null : "Filters should only be built once, but they are already built"; + this.filters = this.aggregationType.buildFastFilter(context); + if (filters != null) { + logger.debug("Fast filter built for shard {}", context.indexShard().shardId()); + filtersBuiltAtShardLevel = true; } + } - final Weight[] filters = FastFilterRewriteHelper.createFilterForAggregations( - context, - interval, - preparedRoundingSupplier.get(), - fieldType.name(), - fieldType, - bounds[0], - bounds[1] - ); - this.setFilters(filters); + public void buildFastFilter(LeafReaderContext leaf) throws IOException { + assert filters == null : "Filters should only be built once, but they are already built"; + this.filters = this.aggregationType.buildFastFilter(leaf, context); + if (filters != null) { + logger.debug("Fast filter 
built for shard {} segment {}", context.indexShard().shardId(), leaf.ord); + } } } /** * Different types have different pre-conditions, filter building logic, etc. */ - public interface AggregationType { + interface AggregationType { + boolean isRewriteable(Object parent, int subAggLength); + + Weight[] buildFastFilter(SearchContext ctx) throws IOException; + + Weight[] buildFastFilter(LeafReaderContext leaf, SearchContext ctx) throws IOException; + + default int getSize() { + return Integer.MAX_VALUE; + } } /** * For date histogram aggregation */ - public static class DateHistogramAggregationType implements AggregationType { + public static abstract class AbstractDateHistogramAggregationType implements AggregationType { private final MappedFieldType fieldType; private final boolean missing; private final boolean hasScript; + private LongBounds hardBounds; - public DateHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript) { + public AbstractDateHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript) { this.fieldType = fieldType; this.missing = missing; this.hasScript = hasScript; } + public AbstractDateHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript, LongBounds hardBounds) { + this(fieldType, missing, hasScript); + this.hardBounds = hardBounds; + } + @Override public boolean isRewriteable(Object parent, int subAggLength) { if (parent == null && subAggLength == 0 && !missing && !hasScript) { - return fieldType != null && fieldType instanceof DateFieldMapper.DateFieldType; + if (fieldType != null && fieldType instanceof DateFieldMapper.DateFieldType) { + return fieldType.isSearchable(); + } } return false; } - public DateFieldMapper.DateFieldType getFieldType() { - assert fieldType instanceof DateFieldMapper.DateFieldType; - return (DateFieldMapper.DateFieldType) fieldType; + @Override + public Weight[] buildFastFilter(SearchContext context) throws IOException { + long[] bounds = getDateHistoAggBounds(context, fieldType.name()); + logger.debug("Bounds are {} for shard {}", bounds, context.indexShard().shardId()); + return buildFastFilter(context, bounds); } - } - /** - * For composite aggregation with date histogram as a source - */ - public static class CompositeAggregationType extends DateHistogramAggregationType { - private final RoundingValuesSource valuesSource; - private long afterKey = -1L; - private final int size; - - public CompositeAggregationType( - CompositeValuesSourceConfig[] sourceConfigs, - CompositeKey rawAfterKey, - List formats, - int size - ) { - super(sourceConfigs[0].fieldType(), sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript()); - this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource(); - this.size = size; - if (rawAfterKey != null) { - assert rawAfterKey.size() == 1 && formats.size() == 1; - this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> { - throw new IllegalArgumentException("now() is not supported in [after] key"); - }); + @Override + public Weight[] buildFastFilter(LeafReaderContext leaf, SearchContext context) throws IOException { + long[] bounds = getSegmentBounds(leaf, fieldType.name()); + logger.debug("Bounds are {} for shard {} segment {}", bounds, context.indexShard().shardId(), leaf.ord); + return buildFastFilter(context, bounds); + } + + private Weight[] buildFastFilter(SearchContext context, long[] bounds) throws IOException { + bounds = processHardBounds(bounds); + 
logger.debug("Bounds are {} for shard {} with hard bound", bounds, context.indexShard().shardId()); + if (bounds == null) { + return null; + } + assert bounds[0] <= bounds[1] : "Low bound should be less than high bound"; + + final Rounding rounding = getRounding(bounds[0], bounds[1]); + final OptionalLong intervalOpt = Rounding.getInterval(rounding); + if (intervalOpt.isEmpty()) { + return null; } + final long interval = intervalOpt.getAsLong(); + + // process the after key of composite agg + processAfterKey(bounds, interval); + + return FastFilterRewriteHelper.createFilterForAggregations( + context, + (DateFieldMapper.DateFieldType) fieldType, + interval, + getRoundingPrepared(), + bounds[0], + bounds[1] + ); } - public Rounding getRounding() { - return valuesSource.getRounding(); + protected abstract Rounding getRounding(final long low, final long high); + + protected abstract Rounding.Prepared getRoundingPrepared(); + + protected void processAfterKey(long[] bound, long interval) {} + + protected long[] processHardBounds(long[] bounds) { + if (bounds != null) { + // Update min/max limit if user specified any hard bounds + if (hardBounds != null) { + if (hardBounds.getMin() > bounds[0]) { + bounds[0] = hardBounds.getMin(); + } + if (hardBounds.getMax() - 1 < bounds[1]) { + bounds[1] = hardBounds.getMax() - 1; // hard bounds max is exclusive + } + if (bounds[0] > bounds[1]) { + return null; + } + } + } + return bounds; } - public Rounding.Prepared getRoundingPreparer() { - return valuesSource.getPreparedRounding(); + public DateFieldMapper.DateFieldType getFieldType() { + assert fieldType instanceof DateFieldMapper.DateFieldType; + return (DateFieldMapper.DateFieldType) fieldType; } } @@ -335,7 +409,9 @@ public static long getBucketOrd(long bucketOrd) { } /** - * This is executed for each segment by passing the leaf reader context + * Try to get the bucket doc counts from the fast filters for the aggregation + *
<p>
+ * Usage: invoked at segment level — in getLeafCollector of aggregator * * @param incrementDocCount takes in the bucket key value and the bucket count */ @@ -345,9 +421,39 @@ public static boolean tryFastFilterAggregation( final BiConsumer incrementDocCount ) throws IOException { if (fastFilterContext == null) return false; - if (fastFilterContext.filters == null) return false; + if (!fastFilterContext.rewriteable) { + return false; + } + + NumericDocValues docCountValues = DocValues.getNumeric(ctx.reader(), DocCountFieldMapper.NAME); + if (docCountValues.nextDoc() != NO_MORE_DOCS) { + logger.debug( + "Shard {} segment {} has at least one document with _doc_count field, skip fast filter optimization", + fastFilterContext.context.indexShard().shardId(), + ctx.ord + ); + return false; + } + + // if no filters built at shard level (see getDateHistoAggBounds method for possible reasons) + // check if the query is functionally match-all at segment level + if (!fastFilterContext.filtersBuiltAtShardLevel && !segmentMatchAll(fastFilterContext.context, ctx)) { + return false; + } + Weight[] filters = fastFilterContext.filters; + if (filters == null) { + logger.debug( + "Shard {} segment {} functionally match all documents. Build the fast filter", + fastFilterContext.context.indexShard().shardId(), + ctx.ord + ); + fastFilterContext.buildFastFilter(ctx); + filters = fastFilterContext.filters; + if (filters == null) { + return false; + } + } - final Weight[] filters = fastFilterContext.filters; final int[] counts = new int[filters.length]; int i; for (i = 0; i < filters.length; i++) { @@ -360,26 +466,34 @@ public static boolean tryFastFilterAggregation( } int s = 0; - int size = Integer.MAX_VALUE; + int size = fastFilterContext.aggregationType.getSize(); for (i = 0; i < filters.length; i++) { if (counts[i] > 0) { long bucketKey = i; // the index of filters is the key for filters aggregation - if (fastFilterContext.aggregationType instanceof DateHistogramAggregationType) { - final DateFieldMapper.DateFieldType fieldType = ((DateHistogramAggregationType) fastFilterContext.aggregationType) - .getFieldType(); + if (fastFilterContext.aggregationType instanceof AbstractDateHistogramAggregationType) { + final DateFieldMapper.DateFieldType fieldType = + ((AbstractDateHistogramAggregationType) fastFilterContext.aggregationType).getFieldType(); bucketKey = fieldType.convertNanosToMillis( NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0) ); - if (fastFilterContext.aggregationType instanceof CompositeAggregationType) { - size = ((CompositeAggregationType) fastFilterContext.aggregationType).size; - } } incrementDocCount.accept(bucketKey, counts[i]); s++; - if (s > size) return true; + if (s > size) { + logger.debug("Fast filter optimization applied to composite aggregation with size {}", size); + return true; + } } } + logger.debug("Fast filter optimization applied"); return true; } + + private static boolean segmentMatchAll(SearchContext ctx, LeafReaderContext leafCtx) throws IOException { + Weight weight = ctx.searcher().createWeight(ctx.query(), ScoreMode.COMPLETE_NO_SCORES, 1f); + assert weight != null; + int count = weight.count(leafCtx); + return count > 0 && count == leafCtx.reader().numDocs(); + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index 822b8a6c4b118..b97c814cdf645 100644 --- 
a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -164,24 +164,55 @@ final class CompositeAggregator extends BucketsAggregator { this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey); this.rawAfterKey = rawAfterKey; - fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(); + fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context); if (!FastFilterRewriteHelper.isCompositeAggRewriteable(sourceConfigs)) return; - fastFilterContext.setAggregationType( - new FastFilterRewriteHelper.CompositeAggregationType(sourceConfigs, rawAfterKey, formats, size) - ); + fastFilterContext.setAggregationType(new CompositeAggregationType()); if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { - // bucketOrds is the data structure for saving date histogram results + // bucketOrds is used for saving date histogram results bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE); - // Currently the filter rewrite is only supported for date histograms - FastFilterRewriteHelper.CompositeAggregationType aggregationType = - (FastFilterRewriteHelper.CompositeAggregationType) fastFilterContext.aggregationType; - preparedRounding = aggregationType.getRoundingPreparer(); - fastFilterContext.buildFastFilter( - context, - fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()), - x -> aggregationType.getRounding(), - () -> preparedRounding - ); + preparedRounding = ((CompositeAggregationType) fastFilterContext.getAggregationType()).getRoundingPrepared(); + fastFilterContext.buildFastFilter(); + } + } + + /** + * Currently the filter rewrite is only supported for date histograms + */ + private class CompositeAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType { + private final RoundingValuesSource valuesSource; + private long afterKey = -1L; + + public CompositeAggregationType() { + super(sourceConfigs[0].fieldType(), sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript()); + this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource(); + if (rawAfterKey != null) { + assert rawAfterKey.size() == 1 && formats.size() == 1; + this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> { + throw new IllegalArgumentException("now() is not supported in [after] key"); + }); + } + } + + public Rounding getRounding(final long low, final long high) { + return valuesSource.getRounding(); + } + + public Rounding.Prepared getRoundingPrepared() { + return valuesSource.getPreparedRounding(); + } + + @Override + protected void processAfterKey(long[] bound, long interval) { + // afterKey is the last bucket key in previous response, and the bucket key + // is the minimum of all values in the bucket, so need to add the interval + if (afterKey != -1L) { + bound[0] = afterKey + interval; + } + } + + @Override + public int getSize() { + return size; } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index 0ea820abbedf4..12aefc540e75c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ 
b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -42,6 +42,7 @@ import org.opensearch.common.util.IntArray; import org.opensearch.common.util.LongArray; import org.opensearch.core.common.util.ByteArray; +import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; @@ -156,45 +157,53 @@ private AutoDateHistogramAggregator( this.roundingPreparer = roundingPreparer; this.preparedRounding = prepareRounding(0); - fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(); + fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context); fastFilterContext.setAggregationType( - new FastFilterRewriteHelper.DateHistogramAggregationType( + new AutoHistogramAggregationType( valuesSourceConfig.fieldType(), valuesSourceConfig.missing() != null, valuesSourceConfig.script() != null ) ); if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { - fastFilterContext.buildFastFilter( - context, - fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()), - b -> getMinimumRounding(b[0], b[1]), - // Passing prepared rounding as supplier to ensure the correct prepared - // rounding is set as it is done during getMinimumRounding - () -> preparedRounding - ); + fastFilterContext.buildFastFilter(); } } - private Rounding getMinimumRounding(final long low, final long high) { - // max - min / targetBuckets = bestDuration - // find the right innerInterval this bestDuration belongs to - // since we cannot exceed targetBuckets, bestDuration should go up, - // so the right innerInterval should be an upper bound - long bestDuration = (high - low) / targetBuckets; - while (roundingIdx < roundingInfos.length - 1) { - final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx]; - final int temp = curRoundingInfo.innerIntervals[curRoundingInfo.innerIntervals.length - 1]; - // If the interval duration is covered by the maximum inner interval, - // we can start with this outer interval for creating the buckets - if (bestDuration <= temp * curRoundingInfo.roughEstimateDurationMillis) { - break; + private class AutoHistogramAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType { + + public AutoHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript) { + super(fieldType, missing, hasScript); + } + + @Override + protected Rounding getRounding(final long low, final long high) { + // max - min / targetBuckets = bestDuration + // find the right innerInterval this bestDuration belongs to + // since we cannot exceed targetBuckets, bestDuration should go up, + // so the right innerInterval should be an upper bound + long bestDuration = (high - low) / targetBuckets; + // reset so this function is idempotent + roundingIdx = 0; + while (roundingIdx < roundingInfos.length - 1) { + final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx]; + final int temp = curRoundingInfo.innerIntervals[curRoundingInfo.innerIntervals.length - 1]; + // If the interval duration is covered by the maximum inner interval, + // we can start with this outer interval for creating the buckets + if (bestDuration <= temp * curRoundingInfo.roughEstimateDurationMillis) { + break; + } + roundingIdx++; } - roundingIdx++; + + preparedRounding = prepareRounding(roundingIdx); + return roundingInfos[roundingIdx].rounding; } - 
preparedRounding = prepareRounding(roundingIdx); - return roundingInfos[roundingIdx].rounding; + @Override + protected Prepared getRoundingPrepared() { + return preparedRounding; + } } protected abstract LongKeyedBucketOrds getBucketOrds(); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index b95bd093b82a6..0e830106c8284 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -39,6 +39,7 @@ import org.opensearch.common.Nullable; import org.opensearch.common.Rounding; import org.opensearch.common.lease.Releasables; +import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; @@ -115,29 +116,35 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), cardinality); - fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(); + fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context); fastFilterContext.setAggregationType( - new FastFilterRewriteHelper.DateHistogramAggregationType( + new DateHistogramAggregationType( valuesSourceConfig.fieldType(), valuesSourceConfig.missing() != null, - valuesSourceConfig.script() != null + valuesSourceConfig.script() != null, + hardBounds ) ); if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { - fastFilterContext.buildFastFilter(context, this::computeBounds, x -> rounding, () -> preparedRounding); + fastFilterContext.buildFastFilter(); } } - private long[] computeBounds(final FastFilterRewriteHelper.DateHistogramAggregationType fieldContext) throws IOException { - final long[] bounds = FastFilterRewriteHelper.getAggregationBounds(context, fieldContext.getFieldType().name()); - if (bounds != null) { - // Update min/max limit if user specified any hard bounds - if (hardBounds != null) { - bounds[0] = Math.max(bounds[0], hardBounds.getMin()); - bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive - } + private class DateHistogramAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType { + + public DateHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript, LongBounds hardBounds) { + super(fieldType, missing, hasScript, hardBounds); + } + + @Override + protected Rounding getRounding(long low, long high) { + return rounding; + } + + @Override + protected Rounding.Prepared getRoundingPrepared() { + return preparedRounding; } - return bounds; } @Override diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java index bbe27eb573b64..13a3d8145743b 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java @@ -35,6 +35,7 @@ import org.apache.lucene.document.LongPoint; import org.apache.lucene.index.Term; import 
org.apache.lucene.search.DocValuesFieldExistsQuery; +import org.apache.lucene.search.FieldExistsQuery; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.TermQuery; import org.opensearch.OpenSearchParseException; @@ -1256,6 +1257,74 @@ public void testMultiValuedWithKeywordLongAndDouble() throws Exception { ); } + public void testDateHistogramSourceWithSize() throws IOException { + final List>> dataset = new ArrayList<>( + Arrays.asList( + createDocument("date", asLong("2017-10-20T03:08:45")), + createDocument("date", asLong("2016-09-20T09:00:34")), + createDocument("date", asLong("2016-09-20T11:34:00")), + createDocument("date", asLong("2017-10-20T06:09:24")), + createDocument("date", asLong("2017-10-19T06:09:24")), + createDocument("long", 4L) + ) + ); + testSearchCase( + Arrays.asList( + new MatchAllDocsQuery(), + new FieldExistsQuery("date"), + LongPoint.newRangeQuery("date", asLong("2016-09-20T09:00:34"), asLong("2017-10-20T06:09:24")) + ), + dataset, + () -> { + DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date").field("date") + .calendarInterval(DateHistogramInterval.days(1)); + return new CompositeAggregationBuilder("name", Collections.singletonList(histo)).size(1); + }, + (result) -> { + assertEquals(1, result.getBuckets().size()); + assertEquals("{date=1474329600000}", result.afterKey().toString()); // 2017-10-20T00:00:00 + assertEquals("{date=1474329600000}", result.getBuckets().get(0).getKeyAsString()); + assertEquals(2L, result.getBuckets().get(0).getDocCount()); + } + ); + } + + public void testDateHistogramSourceWithDocCountField() throws IOException { + final List>> dataset = new ArrayList<>( + Arrays.asList( + createDocument("date", asLong("2017-10-20T03:08:45"), "_doc_count", 5), + createDocument("date", asLong("2016-09-20T09:00:34")), + createDocument("date", asLong("2016-09-20T11:34:00"), "_doc_count", 2), + createDocument("date", asLong("2017-10-20T06:09:24")), + createDocument("date", asLong("2017-10-19T06:09:24"), "_doc_count", 3), + createDocument("long", 4L) + ) + ); + testSearchCase( + Arrays.asList( + new MatchAllDocsQuery(), + new FieldExistsQuery("date"), + LongPoint.newRangeQuery("date", asLong("2016-09-20T09:00:34"), asLong("2017-10-20T06:09:24")) + ), + dataset, + () -> { + DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date").field("date") + .calendarInterval(DateHistogramInterval.days(1)); + return new CompositeAggregationBuilder("name", Collections.singletonList(histo)); + }, + (result) -> { + assertEquals(3, result.getBuckets().size()); + assertEquals("{date=1508457600000}", result.afterKey().toString()); + assertEquals("{date=1474329600000}", result.getBuckets().get(0).getKeyAsString()); + assertEquals(3L, result.getBuckets().get(0).getDocCount()); + assertEquals("{date=1508371200000}", result.getBuckets().get(1).getKeyAsString()); + assertEquals(3L, result.getBuckets().get(1).getDocCount()); + assertEquals("{date=1508457600000}", result.getBuckets().get(2).getKeyAsString()); + assertEquals(6L, result.getBuckets().get(2).getDocCount()); + } + ); + } + public void testWithDateHistogram() throws IOException { final List>> dataset = new ArrayList<>(); dataset.addAll( diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java index bca6623e66104..2a4fbca7a8541 100644 --- 
a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java @@ -34,6 +34,7 @@ import org.apache.lucene.document.Document; import org.apache.lucene.document.LongPoint; +import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; @@ -45,6 +46,7 @@ import org.apache.lucene.tests.index.RandomIndexWriter; import org.opensearch.common.time.DateFormatters; import org.opensearch.index.mapper.DateFieldMapper; +import org.opensearch.index.mapper.DocCountFieldMapper; import org.opensearch.search.aggregations.AggregationBuilder; import org.opensearch.search.aggregations.BucketOrder; import org.opensearch.search.aggregations.bucket.terms.StringTerms; @@ -1178,6 +1180,181 @@ public void testOverlappingBounds() { ); } + public void testHardBoundsNotOverlapping() throws IOException { + testSearchCase( + new MatchAllDocsQuery(), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY) + .hardBounds(new LongBounds("2018-01-01", "2020-01-01")) + .field(AGGREGABLE_DATE), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(0, buckets.size()); + }, + false + ); + + testSearchCase( + new MatchAllDocsQuery(), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY) + .hardBounds(new LongBounds("2016-01-01", "2017-01-01")) + .field(AGGREGABLE_DATE), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(0, buckets.size()); + }, + false + ); + + testSearchCase( + new MatchAllDocsQuery(), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY) + .hardBounds(new LongBounds("2016-01-01", "2017-02-03")) + .field(AGGREGABLE_DATE), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(2, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-02T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + }, + false + ); + + testSearchCase( + new MatchAllDocsQuery(), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY) + .hardBounds(new LongBounds("2017-02-03", "2020-01-01")) + .field(AGGREGABLE_DATE), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(3, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-03T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(3, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-04T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-05T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + }, + false + ); + } + + 
public void testFilterRewriteOptimizationWithRangeQuery() throws IOException { + testSearchCase( + LongPoint.newRangeQuery(AGGREGABLE_DATE, asLong("2018-01-01"), asLong("2020-01-01")), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY).field(AGGREGABLE_DATE), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(0, buckets.size()); + }, + 10000, + false, + false, + true // force AGGREGABLE_DATE field to be searchable to test the filter rewrite optimization path + ); + + testSearchCase( + LongPoint.newRangeQuery(AGGREGABLE_DATE, asLong("2016-01-01"), asLong("2017-01-01")), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY).field(AGGREGABLE_DATE), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(0, buckets.size()); + }, + 10000, + false, + false, + true + ); + + testSearchCase( + LongPoint.newRangeQuery(AGGREGABLE_DATE, asLong("2016-01-01"), asLong("2017-02-02")), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY).field(AGGREGABLE_DATE), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(2, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-02T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + }, + 10000, + false, + false, + true + ); + + testSearchCase( + LongPoint.newRangeQuery(AGGREGABLE_DATE, asLong("2017-02-03"), asLong("2020-01-01")), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY).field(AGGREGABLE_DATE), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(3, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-03T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(3, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-04T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-05T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + }, + 10000, + false, + false, + true + ); + } + + public void testDocCountField() throws IOException { + testSearchCase( + new MatchAllDocsQuery(), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY).field(AGGREGABLE_DATE), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(2, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(5, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-02T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + }, + 10000, + false, + true + ); + } + public void testIllegalInterval() throws IOException { IllegalArgumentException e = expectThrows( IllegalArgumentException.class, @@ 
-1211,13 +1388,42 @@ private void testSearchCase( int maxBucket, boolean useNanosecondResolution ) throws IOException { - boolean aggregableDateIsSearchable = randomBoolean(); + testSearchCase(query, dataset, configure, verify, maxBucket, useNanosecondResolution, false); + } + + private void testSearchCase( + Query query, + List dataset, + Consumer configure, + Consumer verify, + int maxBucket, + boolean useNanosecondResolution, + boolean useDocCountField + ) throws IOException { + testSearchCase(query, dataset, configure, verify, maxBucket, useNanosecondResolution, useDocCountField, randomBoolean()); + } + + private void testSearchCase( + Query query, + List dataset, + Consumer configure, + Consumer verify, + int maxBucket, + boolean useNanosecondResolution, + boolean useDocCountField, + boolean aggregableDateIsSearchable + ) throws IOException { + logger.debug("Aggregable date is searchable {}", aggregableDateIsSearchable); DateFieldMapper.DateFieldType fieldType = aggregableDateFieldType(useNanosecondResolution, aggregableDateIsSearchable); try (Directory directory = newDirectory()) { try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { Document document = new Document(); + if (useDocCountField) { + // add the doc count field to the first document + document.add(new NumericDocValuesField(DocCountFieldMapper.NAME, 5)); + } for (String date : dataset) { long instant = asLong(date, fieldType); document.add(new SortedNumericDocValuesField(AGGREGABLE_DATE, instant)); diff --git a/test/framework/src/main/java/org/opensearch/search/aggregations/composite/BaseCompositeAggregatorTestCase.java b/test/framework/src/main/java/org/opensearch/search/aggregations/composite/BaseCompositeAggregatorTestCase.java index 16abf2e255b5d..466e4d1bf1742 100644 --- a/test/framework/src/main/java/org/opensearch/search/aggregations/composite/BaseCompositeAggregatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/search/aggregations/composite/BaseCompositeAggregatorTestCase.java @@ -14,6 +14,7 @@ import org.apache.lucene.document.InetAddressPoint; import org.apache.lucene.document.IntPoint; import org.apache.lucene.document.LongPoint; +import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.document.SortedSetDocValuesField; import org.apache.lucene.document.StringField; @@ -40,6 +41,7 @@ import org.opensearch.core.index.Index; import org.opensearch.index.IndexSettings; import org.opensearch.index.mapper.DateFieldMapper; +import org.opensearch.index.mapper.DocCountFieldMapper; import org.opensearch.index.mapper.DocumentMapper; import org.opensearch.index.mapper.IpFieldMapper; import org.opensearch.index.mapper.KeywordFieldMapper; @@ -204,6 +206,12 @@ protected void addToDocument(int id, Document doc, Map> key doc.add(new StringField("id", Integer.toString(id), Field.Store.NO)); for (Map.Entry> entry : keys.entrySet()) { final String name = entry.getKey(); + if (name.equals(DocCountFieldMapper.NAME)) { + doc.add(new IntPoint(name, (int) entry.getValue().get(0))); + // doc count field should be DocValuesType.NUMERIC + doc.add(new NumericDocValuesField(name, (int) entry.getValue().get(0))); + continue; + } for (Object value : entry.getValue()) { if (value instanceof Integer) { doc.add(new SortedNumericDocValuesField(name, (int) value)); From 26cec18e8b44394a2e50142fcfc7f7ad19dd703f Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Wed, 7 Feb 2024 13:40:48 -0500 Subject: [PATCH 2/5] Move 
2.x version to 2.13.0 (#12227) (#12230) Signed-off-by: Andriy Redko --- .ci/bwcVersions | 1 + libs/core/src/main/java/org/opensearch/Version.java | 1 + 2 files changed, 2 insertions(+) diff --git a/.ci/bwcVersions b/.ci/bwcVersions index 6a5db93053e3b..793f9cc7125ee 100644 --- a/.ci/bwcVersions +++ b/.ci/bwcVersions @@ -28,3 +28,4 @@ BWC_VERSION: - "2.11.1" - "2.11.2" - "2.12.0" + - "2.13.0" diff --git a/libs/core/src/main/java/org/opensearch/Version.java b/libs/core/src/main/java/org/opensearch/Version.java index 307da89c18d48..5038bb90cc18d 100644 --- a/libs/core/src/main/java/org/opensearch/Version.java +++ b/libs/core/src/main/java/org/opensearch/Version.java @@ -99,6 +99,7 @@ public class Version implements Comparable, ToXContentFragment { public static final Version V_2_11_1 = new Version(2110199, org.apache.lucene.util.Version.LUCENE_9_7_0); public static final Version V_2_11_2 = new Version(2110299, org.apache.lucene.util.Version.LUCENE_9_7_0); public static final Version V_2_12_0 = new Version(2120099, org.apache.lucene.util.Version.LUCENE_9_9_2); + public static final Version V_2_13_0 = new Version(2130099, org.apache.lucene.util.Version.LUCENE_9_9_2); public static final Version V_3_0_0 = new Version(3000099, org.apache.lucene.util.Version.LUCENE_9_9_2); public static final Version CURRENT = V_3_0_0; From 930ecf0f527d95b8bab86f5dc7c1092b3643e996 Mon Sep 17 00:00:00 2001 From: CEHENKLE Date: Wed, 7 Feb 2024 12:36:08 -0800 Subject: [PATCH 3/5] Adding a "meta issue" template (#12148) Signed-off-by: CEHENKLE Co-authored-by: Andrew Ross --- .github/ISSUE_TEMPLATE/meta.yml | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 .github/ISSUE_TEMPLATE/meta.yml diff --git a/.github/ISSUE_TEMPLATE/meta.yml b/.github/ISSUE_TEMPLATE/meta.yml new file mode 100644 index 0000000000000..0ef42688474c3 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/meta.yml @@ -0,0 +1,28 @@ +name: ✨ Meta Issue +description: An issue that collects other issues together to describe a larger project or activity. +title: '[META] ' +labels: ['Meta, untriaged'] +body: + - type: textarea + attributes: + label: Please describe the end goal of this project + description: A clear and concise description of this project/endeavor. This should be understandable to someone with no context. + placeholder: Ex. Views is a way to project indices in OpenSearch, these views act as a focal point for describing the underlying data and how the data is accessed. It allows for restricting the scope and filtering the response consistently. + validations: + required: true + - type: textarea + attributes: + label: Supporting References + description: Please provide links (and descriptions!) to RFCs, design docs, etc + validations: + required: true + - type: textarea + attributes: + label: Issues + description: Please create a list of issues that should be tracked by this meta issue, including a short description. The purpose is to provide everyone on the project with an "at a glance" update of the state us the work being tracked. If you use the format "- [ ]" it will put your list into a checklist. + placeholder: Ex. 
- [ ] https://github.com/opensearch-project/security/issues/3888 Add views to the cluster metadata schema + validations: + required: true + + + From dc1682f57400a79b5c721cdd303dae43efb1d55f Mon Sep 17 00:00:00 2001 From: Andriy Redko <andriy.redko@aiven.io> Date: Wed, 7 Feb 2024 15:38:48 -0500 Subject: [PATCH 4/5] Fix 350_matched_queries.yml tests while awaiting for backport (#12232) Signed-off-by: Andriy Redko <andriy.redko@aiven.io> --- .../rest-api-spec/test/search/350_matched_queries.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search/350_matched_queries.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search/350_matched_queries.yml index 25de51a316bd4..450e9c0a0a7bb 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search/350_matched_queries.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search/350_matched_queries.yml @@ -1,7 +1,7 @@ setup: - skip: - version: " - 2.12.0" - reason: "implemented for versions post 2.12.0" + version: " - 2.99.99" + reason: "implemented for versions 3.0.0 and above" --- "matched queries": From a0b519826b2dab19f022e16e613f630a0bce253e Mon Sep 17 00:00:00 2001 From: gaobinlong <gbinlong@amazon.com> Date: Thu, 8 Feb 2024 05:07:45 +0800 Subject: [PATCH 5/5] Add community_id ingest processor (#12121) * Add community id ingest processor Signed-off-by: Gao Binlong <gbinlong@amazon.com> * Modify change log Signed-off-by: Gao Binlong <gbinlong@amazon.com> * Optimize the code Signed-off-by: Gao Binlong <gbinlong@amazon.com> --------- Signed-off-by: Gao Binlong <gbinlong@amazon.com> --- CHANGELOG.md | 1 + .../common/network/InetAddresses.java | 2 +- .../ingest/common/CommunityIdProcessor.java | 647 +++++++++++++ .../common/IngestCommonModulePlugin.java | 1 + .../CommunityIdProcessorFactoryTests.java | 117 +++ .../common/CommunityIdProcessorTests.java | 910 ++++++++++++++++++ .../rest-api-spec/test/ingest/10_basic.yml | 16 + .../ingest/320_community_id_processor.yml | 370 +++++++ 8 files changed, 2063 insertions(+), 1 deletion(-) create mode 100644 modules/ingest-common/src/main/java/org/opensearch/ingest/common/CommunityIdProcessor.java create mode 100644 modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorFactoryTests.java create mode 100644 modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorTests.java create mode 100644 modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/320_community_id_processor.yml diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ad0932a86a07..581f5dd74617d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -137,6 +137,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Support index level allocation filtering for searchable snapshot index ([#11563](https://github.com/opensearch-project/OpenSearch/pull/11563)) - Add `org.opensearch.rest.MethodHandlers` and `RestController#getAllHandlers` ([11876](https://github.com/opensearch-project/OpenSearch/pull/11876)) - New DateTime format for RFC3339 compatible date fields ([#11465](https://github.com/opensearch-project/OpenSearch/pull/11465)) +- Add community_id ingest processor ([#12121](https://github.com/opensearch-project/OpenSearch/pull/12121)) - Add support for Google Application Default Credentials in repository-gcs ([#8394](https://github.com/opensearch-project/OpenSearch/pull/8394)) - Remove concurrent segment search feature flag for GA launch 
([#12074](https://github.com/opensearch-project/OpenSearch/pull/12074)) - Enable Fuzzy codec for doc id fields using a bloom filter ([#11022](https://github.com/opensearch-project/OpenSearch/pull/11022)) diff --git a/libs/common/src/main/java/org/opensearch/common/network/InetAddresses.java b/libs/common/src/main/java/org/opensearch/common/network/InetAddresses.java index 0f289c09bbae2..60c0717a28f05 100644 --- a/libs/common/src/main/java/org/opensearch/common/network/InetAddresses.java +++ b/libs/common/src/main/java/org/opensearch/common/network/InetAddresses.java @@ -52,7 +52,7 @@ public static boolean isInetAddress(String ipString) { return ipStringToBytes(ipString) != null; } - private static byte[] ipStringToBytes(String ipString) { + public static byte[] ipStringToBytes(String ipString) { // Make a first pass to categorize the characters in this string. boolean hasColon = false; boolean hasDot = false; diff --git a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/CommunityIdProcessor.java b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/CommunityIdProcessor.java new file mode 100644 index 0000000000000..c968fb2f6c2da --- /dev/null +++ b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/CommunityIdProcessor.java @@ -0,0 +1,647 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ingest.common; + +import org.opensearch.common.hash.MessageDigests; +import org.opensearch.common.network.InetAddresses; +import org.opensearch.core.common.Strings; +import org.opensearch.ingest.AbstractProcessor; +import org.opensearch.ingest.ConfigurationUtils; +import org.opensearch.ingest.IngestDocument; +import org.opensearch.ingest.Processor; + +import java.security.MessageDigest; +import java.util.Arrays; +import java.util.Base64; +import java.util.Locale; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException; + +/** + * Processor that generating community id flow hash for the network flow tuples, the algorithm is defined in + * <a href="https://github.com/corelight/community-id-spec">Community ID Flow Hashing</a>. 
+ */
+public class CommunityIdProcessor extends AbstractProcessor {
+    public static final String TYPE = "community_id";
+    // the version of the community id flow hashing algorithm
+    private static final String COMMUNITY_ID_HASH_VERSION = "1";
+    // 0 byte for padding
+    private static final byte PADDING_BYTE = 0;
+    // the maximum code number for network protocol, ICMP message type and code as defined by IANA
+    private static final int IANA_COMMON_MAX_NUMBER = 255;
+    // the minimum code number for network protocol, ICMP message type and code as defined by IANA
+    private static final int IANA_COMMON_MIN_NUMBER = 0;
+    // the minimum seed for generating hash
+    private static final int MIN_SEED = 0;
+    // the maximum seed for generating hash
+    private static final int MAX_SEED = 65535;
+    // the minimum port number in transport layer
+    private static final int MIN_PORT = 0;
+    // the maximum port number in transport layer
+    private static final int MAX_PORT = 65535;
+    private static final String ICMP_MESSAGE_TYPE = "type";
+    private static final String ICMP_MESSAGE_CODE = "code";
+    private final String sourceIPField;
+    private final String sourcePortField;
+    private final String destinationIPField;
+    private final String destinationPortField;
+    private final String ianaProtocolNumberField;
+    private final String protocolField;
+    private final String icmpTypeField;
+    private final String icmpCodeField;
+    private final int seed;
+    private final String targetField;
+    private final boolean ignoreMissing;
+
+    CommunityIdProcessor(
+        String tag,
+        String description,
+        String sourceIPField,
+        String sourcePortField,
+        String destinationIPField,
+        String destinationPortField,
+        String ianaProtocolNumberField,
+        String protocolField,
+        String icmpTypeField,
+        String icmpCodeField,
+        int seed,
+        String targetField,
+        boolean ignoreMissing
+    ) {
+        super(tag, description);
+        this.sourceIPField = sourceIPField;
+        this.sourcePortField = sourcePortField;
+        this.destinationIPField = destinationIPField;
+        this.destinationPortField = destinationPortField;
+        this.ianaProtocolNumberField = ianaProtocolNumberField;
+        this.protocolField = protocolField;
+        this.icmpTypeField = icmpTypeField;
+        this.icmpCodeField = icmpCodeField;
+        this.seed = seed;
+        this.targetField = targetField;
+        this.ignoreMissing = ignoreMissing;
+    }
+
+    public String getSourceIPField() {
+        return sourceIPField;
+    }
+
+    public String getSourcePortField() {
+        return sourcePortField;
+    }
+
+    public String getDestinationIPField() {
+        return destinationIPField;
+    }
+
+    public String getDestinationPortField() {
+        return destinationPortField;
+    }
+
+    public String getIANAProtocolNumberField() {
+        return ianaProtocolNumberField;
+    }
+
+    public String getProtocolField() {
+        return protocolField;
+    }
+
+    public String getIcmpTypeField() {
+        return icmpTypeField;
+    }
+
+    public String getIcmpCodeField() {
+        return icmpCodeField;
+    }
+
+    public int getSeed() {
+        return seed;
+    }
+
+    public String getTargetField() {
+        return targetField;
+    }
+
+    public boolean isIgnoreMissing() {
+        return ignoreMissing;
+    }
+
+    @Override
+    public IngestDocument execute(IngestDocument document) {
+        // resolve the protocol first
+        Protocol protocol = resolveProtocol(document);
+        // exit quietly if protocol cannot be resolved and ignore_missing is true
+        if (protocol == null) {
+            return document;
+        }
+
+        // resolve the ips next; exit quietly if either source ip or destination ip cannot be resolved and ignore_missing is true
+        byte[] sourceIPByteArray = resolveIP(document, sourceIPField);
+        if (sourceIPByteArray == null) {
+            return document;
+        }
+        byte[] destIPByteArray = resolveIP(document, destinationIPField);
+        if (destIPByteArray == null) {
+            return document;
+        }
+        // source ip and destination ip must have same format, either ipv4 or ipv6
+        if (sourceIPByteArray.length != destIPByteArray.length) {
+            throw new IllegalArgumentException("source ip and destination ip must have same format");
+        }
+
+        // resolve source port and destination port for transport protocols,
+        // exit quietly if either source port or destination port is null or empty
+        Integer sourcePort = null;
+        Integer destinationPort = null;
+        if (protocol.isTransportProtocol()) {
+            sourcePort = resolvePort(document, sourcePortField);
+            if (sourcePort == null) {
+                return document;
+            }
+
+            destinationPort = resolvePort(document, destinationPortField);
+            if (destinationPort == null) {
+                return document;
+            }
+        }
+
+        // resolve ICMP message type and code, support both ipv4 and ipv6
+        // set source port to icmp type, and set dest port to icmp code, so that we can have a generic way to handle
+        // all protocols
+        boolean isOneway = true;
+        final boolean isICMPProtocol = Protocol.ICMP == protocol || Protocol.ICMP_V6 == protocol;
+        if (isICMPProtocol) {
+            Integer icmpType = resolveICMP(document, icmpTypeField, ICMP_MESSAGE_TYPE);
+            if (icmpType == null) {
+                return document;
+            } else {
+                sourcePort = icmpType;
+            }
+
+            // for the message types which don't have a code, fetch the equivalent code from the pre-defined mapper,
+            // and they can be considered a two-way flow
+            Byte equivalentCode = Protocol.ICMP.getProtocolCode() == protocol.getProtocolCode()
+                ? ICMPType.getEquivalentCode(icmpType.byteValue())
+                : ICMPv6Type.getEquivalentCode(icmpType.byteValue());
+            if (equivalentCode != null) {
+                isOneway = false;
+                // for IPv6-ICMP, the pre-defined code is a negative byte,
+                // we need to convert it to a positive integer for later comparison
+                destinationPort = Protocol.ICMP.getProtocolCode() == protocol.getProtocolCode()
+                    ?
Integer.valueOf(equivalentCode) + : Byte.toUnsignedInt(equivalentCode); + } else { + // get icmp code from the document if we cannot get equivalent code from the pre-defined mapper + Integer icmpCode = resolveICMP(document, icmpCodeField, ICMP_MESSAGE_CODE); + if (icmpCode == null) { + return document; + } else { + destinationPort = icmpCode; + } + } + } + + assert (sourcePort != null && destinationPort != null); + boolean isLess = compareIPAndPort(sourceIPByteArray, sourcePort, destIPByteArray, destinationPort); + // swap ip and port to remove directionality in the flow tuple, smaller ip:port tuple comes first + // but for ICMP and IPv6-ICMP, if it's a one-way flow, the flow tuple is considered to be ordered + if (!isLess && (!isICMPProtocol || !isOneway)) { + byte[] byteArray = sourceIPByteArray; + sourceIPByteArray = destIPByteArray; + destIPByteArray = byteArray; + + int tempPort = sourcePort; + sourcePort = destinationPort; + destinationPort = tempPort; + } + + // generate flow hash + String digest = generateCommunityIDHash( + protocol.getProtocolCode(), + sourceIPByteArray, + destIPByteArray, + sourcePort, + destinationPort, + seed + ); + document.setFieldValue(targetField, digest); + return document; + } + + @Override + public String getType() { + return TYPE; + } + + /** + * Resolve network protocol + * @param document the ingesting document + * @return the resolved protocol, null if the resolved protocol is null and ignore_missing is true + * @throws IllegalArgumentException only if ignoreMissing is false and the field is null, empty, invalid, + * or if the field that is found at the provided path is not of the expected type. + */ + private Protocol resolveProtocol(IngestDocument document) { + Protocol protocol = null; + Integer ianaProtocolNumber = null; + String protocolName = null; + if (!Strings.isNullOrEmpty(ianaProtocolNumberField)) { + ianaProtocolNumber = document.getFieldValue(ianaProtocolNumberField, Integer.class, true); + } + if (!Strings.isNullOrEmpty(protocolField)) { + protocolName = document.getFieldValue(protocolField, String.class, true); + } + // if iana protocol number is not specified, then resolve protocol name + if (ianaProtocolNumber != null) { + if (ianaProtocolNumber >= IANA_COMMON_MIN_NUMBER + && ianaProtocolNumber <= IANA_COMMON_MAX_NUMBER + && Protocol.protocolCodeMap.containsKey(ianaProtocolNumber.byteValue())) { + protocol = Protocol.protocolCodeMap.get(ianaProtocolNumber.byteValue()); + } else { + throw new IllegalArgumentException("unsupported iana protocol number [" + ianaProtocolNumber + "]"); + } + } else if (protocolName != null) { + Protocol protocolFromName = Protocol.fromProtocolName(protocolName); + if (protocolFromName != null) { + protocol = protocolFromName; + } else { + throw new IllegalArgumentException("unsupported protocol [" + protocolName + "]"); + } + } + + // return null if protocol cannot be resolved and ignore_missing is true + if (protocol == null) { + if (ignoreMissing) { + return null; + } else { + throw new IllegalArgumentException( + "cannot resolve protocol by neither iana protocol number field [" + + ianaProtocolNumberField + + "] nor protocol name field [" + + protocolField + + "]" + ); + } + } + return protocol; + } + + /** + * Resolve ip address + * @param document the ingesting document + * @param fieldName the ip field to be resolved + * @return the byte array of the resolved ip + * @throws IllegalArgumentException only if ignoreMissing is false and the field is null, empty, invalid, + * or if the field that is 
found at the provided path is not of the expected type. + */ + private byte[] resolveIP(IngestDocument document, String fieldName) { + if (Strings.isNullOrEmpty(fieldName)) { + if (ignoreMissing) { + return null; + } else { + throw new IllegalArgumentException("both source ip field path and destination ip field path cannot be null nor empty"); + } + } + + String ipAddress = document.getFieldValue(fieldName, String.class, true); + if (Strings.isNullOrEmpty(ipAddress)) { + if (ignoreMissing) { + return null; + } else { + throw new IllegalArgumentException("ip address in the field [" + fieldName + "] is null or empty"); + } + } + + byte[] byteArray = InetAddresses.ipStringToBytes(ipAddress); + if (byteArray == null) { + throw new IllegalArgumentException( + "ip address [" + ipAddress + "] in the field [" + fieldName + "] is not a valid ipv4/ipv6 address" + ); + } else { + return byteArray; + } + } + + /** + * Resolve port for transport protocols + * @param document the ingesting document + * @param fieldName the port field to be resolved + * @return the resolved port number, null if the resolved port is null and ignoreMissing is true + * @throws IllegalArgumentException only if ignoreMissing is false and the field is null, empty, invalid, + * or if the field that is found at the provided path is not of the expected type. + */ + private Integer resolvePort(IngestDocument document, String fieldName) { + Integer port; + if (Strings.isNullOrEmpty(fieldName)) { + if (ignoreMissing) { + return null; + } else { + throw new IllegalArgumentException("both source port and destination port field path cannot be null nor empty"); + } + } else { + port = document.getFieldValue(fieldName, Integer.class, true); + } + + if (port == null) { + if (ignoreMissing) { + return null; + } else { + throw new IllegalArgumentException( + "both source port and destination port cannot be null, but port in the field path [" + fieldName + "] is null" + ); + } + } else if (port < MIN_PORT || port > MAX_PORT) { + throw new IllegalArgumentException( + "both source port and destination port must be between 0 and 65535, but port in the field path [" + + fieldName + + "] is [" + + port + + "]" + ); + } + return port; + } + + /** + * Resolve ICMP's message type and code field + * @param document the ingesting document + * @param fieldName name of the type or the code field + * @param fieldType type or code + * @return the resolved value of the specified field, return null if ignore_missing if true and the field doesn't exist or is null, + * @throws IllegalArgumentException only if ignoreMissing is false and the field is null, empty, invalid, + * or if the field that is found at the provided path is not of the expected type. 
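+ * <p>
+ * For example (a clarifying note, based on the ICMPType mapping defined later in this class): an ICMP echo request
+ * resolves to message type 8, which execute() uses as the source-port-like value; its pre-defined equivalent code 0
+ * is then used as the destination-port-like value, so the request and its reply order into the same flow tuple and
+ * produce the same community id.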
+ */ + private Integer resolveICMP(IngestDocument document, String fieldName, String fieldType) { + if (Strings.isNullOrEmpty(fieldName)) { + if (ignoreMissing) { + return null; + } else { + throw new IllegalArgumentException("icmp message " + fieldType + " field path cannot be null nor empty"); + } + } + Integer fieldValue = document.getFieldValue(fieldName, Integer.class, true); + if (fieldValue == null) { + if (ignoreMissing) { + return null; + } else { + throw new IllegalArgumentException("icmp message " + fieldType + " cannot be null"); + } + } else if (fieldValue < IANA_COMMON_MIN_NUMBER || fieldValue > IANA_COMMON_MAX_NUMBER) { + throw new IllegalArgumentException("invalid icmp message " + fieldType + " [" + fieldValue + "]"); + } else { + return fieldValue; + } + } + + /** + * + * @param protocolCode byte of the protocol number + * @param sourceIPByteArray bytes of the source ip in the network flow tuple + * @param destIPByteArray bytes of the destination ip in the network flow tuple + * @param sourcePort source port in the network flow tuple + * @param destinationPort destination port in the network flow tuple + * @param seed seed for generating hash + * @return the generated hash value, use SHA-1 + */ + private String generateCommunityIDHash( + byte protocolCode, + byte[] sourceIPByteArray, + byte[] destIPByteArray, + Integer sourcePort, + Integer destinationPort, + int seed + ) { + MessageDigest messageDigest = MessageDigests.sha1(); + messageDigest.update(intToTwoByteArray(seed)); + messageDigest.update(sourceIPByteArray); + messageDigest.update(destIPByteArray); + messageDigest.update(protocolCode); + messageDigest.update(PADDING_BYTE); + messageDigest.update(intToTwoByteArray(sourcePort)); + messageDigest.update(intToTwoByteArray(destinationPort)); + + return COMMUNITY_ID_HASH_VERSION + ":" + Base64.getEncoder().encodeToString(messageDigest.digest()); + } + + /** + * Convert an integer to two byte array + * @param val the integer which will be consumed to produce a two byte array + * @return the two byte array + */ + private byte[] intToTwoByteArray(Integer val) { + byte[] byteArray = new byte[2]; + byteArray[0] = Integer.valueOf(val >>> 8).byteValue(); + byteArray[1] = val.byteValue(); + return byteArray; + } + + /** + * Compare the ip and port, return true if the flow tuple is ordered + * @param sourceIPByteArray bytes of the source ip in the network flow tuple + * @param destIPByteArray bytes of the destination ip in the network flow tuple + * @param sourcePort source port in the network flow tuple + * @param destinationPort destination port in the network flow tuple + * @return true if sourceIP is less than destinationIP or sourceIP equals to destinationIP + * but sourcePort is less than destinationPort + */ + private boolean compareIPAndPort(byte[] sourceIPByteArray, int sourcePort, byte[] destIPByteArray, int destinationPort) { + int compareResult = compareByteArray(sourceIPByteArray, destIPByteArray); + return compareResult < 0 || compareResult == 0 && sourcePort < destinationPort; + } + + /** + * Compare two byte array which have same length + * @param byteArray1 the first byte array to compare + * @param byteArray2 the second byte array to compare + * @return 0 if each byte in both two arrays are same, a value less than 0 if byte in the first array is less than + * the byte at the same index, a value greater than 0 if byte in the first array is greater than the byte at the same index + */ + private int compareByteArray(byte[] byteArray1, byte[] byteArray2) { + assert 
(byteArray1.length == byteArray2.length); + int i = 0; + int j = 0; + while (i < byteArray1.length && j < byteArray2.length) { + int isLess = Byte.compareUnsigned(byteArray1[i], byteArray2[j]); + if (isLess == 0) { + i++; + j++; + } else { + return isLess; + } + } + return 0; + } + + /** + * Mapping ICMP's message type and code into a port-like notion for ordering the request or response + */ + enum ICMPType { + ECHO_REPLY((byte) 0, (byte) 8), + ECHO((byte) 8, (byte) 0), + RTR_ADVERT((byte) 9, (byte) 10), + RTR_SOLICIT((byte) 10, (byte) 9), + TSTAMP((byte) 13, (byte) 14), + TSTAMP_REPLY((byte) 14, (byte) 13), + INFO((byte) 15, (byte) 16), + INFO_REPLY((byte) 16, (byte) 15), + MASK((byte) 17, (byte) 18), + MASK_REPLY((byte) 18, (byte) 17); + + private final byte type; + private final byte code; + + ICMPType(byte type, byte code) { + this.type = type; + this.code = code; + } + + private static final Map<Byte, Byte> ICMPTypeMapper = Arrays.stream(values()).collect(Collectors.toMap(t -> t.type, t -> t.code)); + + /** + * Takes the message type of ICMP and derives equivalent message code + * @param type the message type of ICMP + * @return the equivalent message code + */ + public static Byte getEquivalentCode(int type) { + return ICMPTypeMapper.get(Integer.valueOf(type).byteValue()); + } + } + + /** + * Mapping IPv6-ICMP's message type and code into a port-like notion for ordering the request or response + */ + enum ICMPv6Type { + ECHO_REQUEST((byte) 128, (byte) 129), + ECHO_REPLY((byte) 129, (byte) 128), + MLD_LISTENER_QUERY((byte) 130, (byte) 131), + MLD_LISTENER_REPORT((byte) 131, (byte) 130), + ND_ROUTER_SOLICIT((byte) 133, (byte) 134), + ND_ROUTER_ADVERT((byte) 134, (byte) 133), + ND_NEIGHBOR_SOLICIT((byte) 135, (byte) 136), + ND_NEIGHBOR_ADVERT((byte) 136, (byte) 135), + WRU_REQUEST((byte) 139, (byte) 140), + WRU_REPLY((byte) 140, (byte) 139), + HAAD_REQUEST((byte) 144, (byte) 145), + HAAD_REPLY((byte) 145, (byte) 144); + + private final byte type; + private final byte code; + + ICMPv6Type(byte type, byte code) { + this.type = type; + this.code = code; + } + + private static final Map<Byte, Byte> ICMPTypeMapper = Arrays.stream(values()).collect(Collectors.toMap(t -> t.type, t -> t.code)); + + /** + * Takes the message type of IPv6-ICMP and derives equivalent message code + * @param type the message type of IPv6-ICMP + * @return the equivalent message code + */ + public static Byte getEquivalentCode(int type) { + return ICMPTypeMapper.get(Integer.valueOf(type).byteValue()); + } + } + + /** + * An enumeration of the supported network protocols + */ + enum Protocol { + ICMP((byte) 1, false), + TCP((byte) 6, true), + UDP((byte) 17, true), + ICMP_V6((byte) 58, false), + SCTP((byte) 132, true); + + private final byte protocolCode; + private final boolean isTransportProtocol; + + Protocol(int ianaNumber, boolean isTransportProtocol) { + this.protocolCode = Integer.valueOf(ianaNumber).byteValue(); + this.isTransportProtocol = isTransportProtocol; + } + + public static final Map<Byte, Protocol> protocolCodeMap = Arrays.stream(values()) + .collect(Collectors.toMap(Protocol::getProtocolCode, p -> p)); + + public static Protocol fromProtocolName(String protocolName) { + String name = protocolName.toUpperCase(Locale.ROOT); + if (name.equals("IPV6-ICMP")) { + return Protocol.ICMP_V6; + } + try { + return valueOf(name); + } catch (IllegalArgumentException e) { + return null; + } + } + + public byte getProtocolCode() { + return this.protocolCode; + } + + public boolean isTransportProtocol() { + return 
this.isTransportProtocol; + } + } + + public static class Factory implements Processor.Factory { + @Override + public CommunityIdProcessor create( + Map<String, Processor.Factory> registry, + String processorTag, + String description, + Map<String, Object> config + ) throws Exception { + String sourceIPField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "source_ip_field"); + String sourcePortField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "source_port_field"); + String destinationIPField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "destination_ip_field"); + String destinationPortField = ConfigurationUtils.readOptionalStringProperty( + TYPE, + processorTag, + config, + "destination_port_field" + ); + String ianaProtocolNumberField = ConfigurationUtils.readOptionalStringProperty( + TYPE, + processorTag, + config, + "iana_protocol_number_field" + ); + String protocolField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "protocol_field"); + String icmpTypeField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "icmp_type_field"); + String icmpCodeField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "icmp_code_field"); + int seed = ConfigurationUtils.readIntProperty(TYPE, processorTag, config, "seed", 0); + if (seed < MIN_SEED || seed > MAX_SEED) { + throw newConfigurationException(TYPE, processorTag, "seed", "seed must be between 0 and 65535"); + } + + String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", "community_id"); + boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false); + + return new CommunityIdProcessor( + processorTag, + description, + sourceIPField, + sourcePortField, + destinationIPField, + destinationPortField, + ianaProtocolNumberField, + protocolField, + icmpTypeField, + icmpCodeField, + seed, + targetField, + ignoreMissing + ); + } + } +} diff --git a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java index ff6a322ede38f..0f8b248fd5af8 100644 --- a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java +++ b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java @@ -108,6 +108,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet processors.put(CsvProcessor.TYPE, new CsvProcessor.Factory()); processors.put(CopyProcessor.TYPE, new CopyProcessor.Factory(parameters.scriptService)); processors.put(RemoveByPatternProcessor.TYPE, new RemoveByPatternProcessor.Factory()); + processors.put(CommunityIdProcessor.TYPE, new CommunityIdProcessor.Factory()); return Collections.unmodifiableMap(processors); } diff --git a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorFactoryTests.java new file mode 100644 index 0000000000000..5edb44b8c64f2 --- /dev/null +++ b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorFactoryTests.java @@ -0,0 +1,117 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 
license or a + * compatible open source license. + */ + +package org.opensearch.ingest.common; + +import org.opensearch.OpenSearchException; +import org.opensearch.OpenSearchParseException; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; + +public class CommunityIdProcessorFactoryTests extends OpenSearchTestCase { + private CommunityIdProcessor.Factory factory; + + @Before + public void init() { + factory = new CommunityIdProcessor.Factory(); + } + + public void testCreate() throws Exception { + boolean ignoreMissing = randomBoolean(); + int seed = randomIntBetween(0, 65535); + Map<String, Object> config = new HashMap<>(); + config.put("source_ip_field", "source_ip"); + config.put("source_port_field", "source_port"); + config.put("destination_ip_field", "destination_ip"); + config.put("destination_port_field", "destination_port"); + config.put("iana_protocol_number_field", "iana_protocol_number"); + config.put("protocol_field", "protocol"); + config.put("icmp_type_field", "icmp_type"); + config.put("icmp_code_field", "icmp_code"); + config.put("seed", seed); + config.put("target_field", "community_id_hash"); + config.put("ignore_missing", ignoreMissing); + String processorTag = randomAlphaOfLength(10); + CommunityIdProcessor communityIDProcessor = factory.create(null, processorTag, null, config); + assertThat(communityIDProcessor.getTag(), equalTo(processorTag)); + assertThat(communityIDProcessor.getSourceIPField(), equalTo("source_ip")); + assertThat(communityIDProcessor.getSourcePortField(), equalTo("source_port")); + assertThat(communityIDProcessor.getDestinationIPField(), equalTo("destination_ip")); + assertThat(communityIDProcessor.getDestinationPortField(), equalTo("destination_port")); + assertThat(communityIDProcessor.getIANAProtocolNumberField(), equalTo("iana_protocol_number")); + assertThat(communityIDProcessor.getProtocolField(), equalTo("protocol")); + assertThat(communityIDProcessor.getIcmpTypeField(), equalTo("icmp_type")); + assertThat(communityIDProcessor.getIcmpCodeField(), equalTo("icmp_code")); + assertThat(communityIDProcessor.getSeed(), equalTo(seed)); + assertThat(communityIDProcessor.getTargetField(), equalTo("community_id_hash")); + assertThat(communityIDProcessor.isIgnoreMissing(), equalTo(ignoreMissing)); + } + + public void testCreateWithSourceIPField() throws Exception { + Map<String, Object> config = new HashMap<>(); + try { + factory.create(null, null, null, config); + fail("factory create should have failed"); + } catch (OpenSearchParseException e) { + assertThat(e.getMessage(), equalTo("[source_ip_field] required property is missing")); + } + + config.put("source_ip_field", null); + try { + factory.create(null, null, null, config); + fail("factory create should have failed"); + } catch (OpenSearchParseException e) { + assertThat(e.getMessage(), equalTo("[source_ip_field] required property is missing")); + } + } + + public void testCreateWithDestinationIPField() throws Exception { + Map<String, Object> config = new HashMap<>(); + config.put("source_ip_field", "source_ip"); + try { + factory.create(null, null, null, config); + fail("factory create should have failed"); + } catch (OpenSearchParseException e) { + assertThat(e.getMessage(), equalTo("[destination_ip_field] required property is missing")); + } + + config.put("source_ip_field", "source_ip"); + config.put("destination_ip_field", null); + try { + factory.create(null, null, 
null, config); + fail("factory create should have failed"); + } catch (OpenSearchParseException e) { + assertThat(e.getMessage(), equalTo("[destination_ip_field] required property is missing")); + } + } + + public void testInvalidSeed() throws Exception { + Map<String, Object> config = new HashMap<>(); + int seed; + if (randomBoolean()) { + seed = -1; + } else { + seed = 65536; + } + config.put("source_ip_field", "source_ip"); + config.put("destination_ip_field", "destination_ip"); + config.put("seed", seed); + try { + factory.create(null, null, null, config); + fail("factory create should have failed"); + } catch (OpenSearchException e) { + assertThat(e.getMessage(), equalTo("[seed] seed must be between 0 and 65535")); + } + } + +} diff --git a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorTests.java b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorTests.java new file mode 100644 index 0000000000000..2bda9db80dbcc --- /dev/null +++ b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorTests.java @@ -0,0 +1,910 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ingest.common; + +import org.opensearch.ingest.IngestDocument; +import org.opensearch.ingest.Processor; +import org.opensearch.ingest.RandomDocumentPicks; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +public class CommunityIdProcessorTests extends OpenSearchTestCase { + + public void testResolveProtocol() throws Exception { + Map<String, Object> source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + source.put("source_port", 1000); + source.put("destination_port", 2000); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), source); + + String targetFieldName = randomAlphaOfLength(100); + boolean ignore_missing = randomBoolean(); + Processor processor = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + null, + "protocol", + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + ignore_missing + ); + if (ignore_missing) { + processor.execute(ingestDocument); + assertThat(ingestDocument.hasField(targetFieldName), equalTo(false)); + } else { + assertThrows( + "cannot resolve protocol by neither iana protocol number field [iana_protocol_number] nor protocol name field [protocol]", + IllegalArgumentException.class, + () -> processor.execute(ingestDocument) + ); + } + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + source.put("source_port", 1000); + source.put("destination_port", 2000); + String protocol = randomAlphaOfLength(10); + source.put("protocol", protocol); + IngestDocument ingestDocumentWithProtocol = RandomDocumentPicks.randomIngestDocument(random(), source); + Processor processorWithProtocol = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_protocol_number", + "protocol", + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + randomBoolean() + ); + assertThrows( + "unsupported protocol [" + protocol + "]", + IllegalArgumentException.class, + () -> 
processorWithProtocol.execute(ingestDocumentWithProtocol) + ); + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + source.put("source_port", 1000); + source.put("destination_port", 2000); + int ianaProtocolNumber = randomIntBetween(1000, 10000); + source.put("iana_protocol_number", ianaProtocolNumber); + IngestDocument ingestDocumentWithProtocolNumber = RandomDocumentPicks.randomIngestDocument(random(), source); + + Processor processorWithProtocolNumber = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_protocol_number", + null, + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + randomBoolean() + ); + assertThrows( + "unsupported iana protocol number [" + ianaProtocolNumber + "]", + IllegalArgumentException.class, + () -> processorWithProtocolNumber.execute(ingestDocumentWithProtocolNumber) + ); + } + + public void testResolveIPAndPort() throws Exception { + Map<String, Object> source = new HashMap<>(); + source.put("source_ip", ""); + source.put("destination_ip", "2.2.2.2"); + source.put("source_port", 1000); + source.put("destination_port", 2000); + source.put("protocol", "tcp"); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), source); + + String targetFieldName = randomAlphaOfLength(100); + boolean ignore_missing = randomBoolean(); + Processor processor = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + null, + "protocol", + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + ignore_missing + ); + if (ignore_missing) { + processor.execute(ingestDocument); + assertThat(ingestDocument.hasField(targetFieldName), equalTo(false)); + } else { + assertThrows( + "ip address in the field [source_ip] is null or empty", + IllegalArgumentException.class, + () -> processor.execute(ingestDocument) + ); + } + + source = new HashMap<>(); + source.put("source_ip", "1.1.1"); + source.put("destination_ip", "2.2.2.2"); + source.put("source_port", 1000); + source.put("destination_port", 2000); + source.put("protocol", "tcp"); + IngestDocument ingestDocumentWithInvalidSourceIP = RandomDocumentPicks.randomIngestDocument(random(), source); + Processor processorWithInvalidSourceIP = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_protocol_number", + "protocol", + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + randomBoolean() + ); + + assertThrows( + "ip address in the field [source_ip] is not a valid ipv4/ipv6 address", + IllegalArgumentException.class, + () -> processorWithInvalidSourceIP.execute(ingestDocumentWithInvalidSourceIP) + ); + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", ""); + source.put("source_port", 1000); + source.put("destination_port", 2000); + source.put("protocol", "tcp"); + ignore_missing = randomBoolean(); + IngestDocument ingestDocumentWithEmptyDestIP = RandomDocumentPicks.randomIngestDocument(random(), source); + Processor processorWithEmptyDestIP = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_protocol_number", + "protocol", + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + ignore_missing + ); + if (ignore_missing) { + processorWithEmptyDestIP.execute(ingestDocumentWithEmptyDestIP); + 
assertThat(ingestDocumentWithEmptyDestIP.hasField(targetFieldName), equalTo(false)); + } else { + assertThrows( + "ip address in the field [destination_ip] is null or empty", + IllegalArgumentException.class, + () -> processorWithEmptyDestIP.execute(ingestDocumentWithEmptyDestIP) + ); + } + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2"); + source.put("source_port", 1000); + source.put("destination_port", 2000); + source.put("protocol", "tcp"); + IngestDocument ingestDocumentWithInvalidDestIP = RandomDocumentPicks.randomIngestDocument(random(), source); + Processor processorWithInvalidDestIP = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_protocol_number", + "protocol", + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + randomBoolean() + ); + assertThrows( + "ip address in the field [destination_ip] is not a valid ipv4/ipv6 address", + IllegalArgumentException.class, + () -> processorWithInvalidDestIP.execute(ingestDocumentWithInvalidDestIP) + ); + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + source.put("source_port", 1000); + source.put("destination_port", 2000); + source.put("protocol", "tcp"); + ignore_missing = randomBoolean(); + IngestDocument normalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), source); + Processor processorWithEmptySourceIPFieldPath = createCommunityIdProcessor( + "", + "source_port", + "destination_ip", + "destination_port", + "iana_protocol_number", + "protocol", + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + ignore_missing + ); + if (ignore_missing) { + processorWithEmptySourceIPFieldPath.execute(normalIngestDocument); + assertThat(normalIngestDocument.hasField(targetFieldName), equalTo(false)); + } else { + assertThrows( + "both source ip field path and destination ip field path cannot be null nor empty", + IllegalArgumentException.class, + () -> processorWithEmptySourceIPFieldPath.execute(normalIngestDocument) + ); + } + ignore_missing = randomBoolean(); + Processor processorWithEmptyDestIPFieldPath = createCommunityIdProcessor( + "source_ip", + "source_port", + "", + "destination_port", + "iana_protocol_number", + "protocol", + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + ignore_missing + ); + if (ignore_missing) { + processorWithEmptyDestIPFieldPath.execute(normalIngestDocument); + assertThat(normalIngestDocument.hasField(targetFieldName), equalTo(false)); + } else { + assertThrows( + "both source ip field path and destination ip field path cannot be null nor empty", + IllegalArgumentException.class, + () -> processorWithEmptyDestIPFieldPath.execute(normalIngestDocument) + ); + } + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + source.put("source_port", null); + source.put("destination_port", 2000); + source.put("protocol", "tcp"); + ignore_missing = randomBoolean(); + IngestDocument ingestDocumentWithEmptySourcePort = RandomDocumentPicks.randomIngestDocument(random(), source); + Processor processorWithEmptySourcePort = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_protocol_number", + "protocol", + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + ignore_missing + ); + if (ignore_missing) { + 
processorWithEmptySourcePort.execute(ingestDocumentWithEmptySourcePort); + assertThat(ingestDocumentWithEmptySourcePort.hasField(targetFieldName), equalTo(false)); + } else { + assertThrows( + "both source port and destination port field path cannot be null nor empty", + IllegalArgumentException.class, + () -> processorWithEmptySourcePort.execute(ingestDocumentWithEmptySourcePort) + ); + } + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + source.put("source_port", 65536); + source.put("destination_port", 2000); + source.put("protocol", "tcp"); + IngestDocument ingestDocumentWithInvalidSourcePort = RandomDocumentPicks.randomIngestDocument(random(), source); + Processor processorWithInvalidSourcePort = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_protocol_number", + "protocol", + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + randomBoolean() + ); + assertThrows( + "both source port and destination port must be between 0 and 65535, but port in the field path [source_port] is [65536]", + IllegalArgumentException.class, + () -> processorWithInvalidSourcePort.execute(ingestDocumentWithInvalidSourcePort) + ); + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + source.put("source_port", 1000); + source.put("destination_port", null); + source.put("protocol", "tcp"); + ignore_missing = randomBoolean(); + IngestDocument ingestDocumentWithEmptyDestPort = RandomDocumentPicks.randomIngestDocument(random(), source); + Processor processorWithEmptyDestPort = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_protocol_number", + "protocol", + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + ignore_missing + ); + if (ignore_missing) { + processorWithEmptyDestPort.execute(ingestDocumentWithEmptyDestPort); + assertThat(ingestDocumentWithEmptyDestPort.hasField(targetFieldName), equalTo(false)); + } else { + assertThrows( + "both source port and destination port cannot be null, but port in the field path [destination_port] is null", + IllegalArgumentException.class, + () -> processorWithEmptyDestPort.execute(ingestDocumentWithEmptyDestPort) + ); + } + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + source.put("source_port", 1000); + source.put("destination_port", -1); + source.put("protocol", "tcp"); + IngestDocument ingestDocumentWithInvalidDestPort = RandomDocumentPicks.randomIngestDocument(random(), source); + Processor processorWithInvalidDestPort = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_protocol_number", + "protocol", + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + randomBoolean() + ); + assertThrows( + "both source port and destination port cannot be null, but port in the field path [destination_port] is [-1]", + IllegalArgumentException.class, + () -> processorWithInvalidDestPort.execute(ingestDocumentWithInvalidDestPort) + ); + } + + public void testResolveICMPTypeAndCode() throws Exception { + Map<String, Object> source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + int protocolNumber = randomFrom(1, 58); + source.put("iana_protocol_number", protocolNumber); + IngestDocument ingestDocument = 
RandomDocumentPicks.randomIngestDocument(random(), source); + String targetFieldName = randomAlphaOfLength(100); + boolean ignoreMissing = randomBoolean(); + Processor processor = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_protocol_number", + null, + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + ignoreMissing + ); + if (ignoreMissing) { + processor.execute(ingestDocument); + assertThat(ingestDocument.hasField(targetFieldName), equalTo(false)); + } else { + assertThrows( + "icmp message type field path cannot be null nor empty", + IllegalArgumentException.class, + () -> processor.execute(ingestDocument) + ); + } + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + protocolNumber = randomFrom(1, 58); + source.put("iana_protocol_number", protocolNumber); + source.put("icmp_type", null); + IngestDocument ingestDocumentWithNullType = RandomDocumentPicks.randomIngestDocument(random(), source); + ignoreMissing = randomBoolean(); + Processor processorWithNullType = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_protocol_number", + "protocol", + "icmp_type", + null, + randomIntBetween(0, 65535), + targetFieldName, + ignoreMissing + ); + if (ignoreMissing) { + processorWithNullType.execute(ingestDocumentWithNullType); + assertThat(ingestDocumentWithNullType.hasField(targetFieldName), equalTo(false)); + } else { + assertThrows( + "icmp message type cannot be null nor empty", + IllegalArgumentException.class, + () -> processorWithNullType.execute(ingestDocumentWithNullType) + ); + } + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + protocolNumber = randomFrom(1, 58); + source.put("iana_protocol_number", protocolNumber); + int icmpType; + if (randomBoolean()) { + icmpType = randomIntBetween(256, 1000); + } else { + icmpType = randomIntBetween(-100, -1); + } + source.put("icmp_type", icmpType); + IngestDocument ingestDocumentWithInvalidICMPType = RandomDocumentPicks.randomIngestDocument(random(), source); + Processor processorWithInvalidICMPType = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_protocol_number", + "protocol", + "icmp_type", + null, + randomIntBetween(0, 65535), + targetFieldName, + false + ); + assertThrows( + "invalid icmp message type [" + icmpType + "]", + IllegalArgumentException.class, + () -> processorWithInvalidICMPType.execute(ingestDocumentWithInvalidICMPType) + ); + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + protocolNumber = randomFrom(1, 58); + source.put("iana_protocol_number", protocolNumber); + if (protocolNumber == 1) { + icmpType = randomIntBetween(3, 6); + } else { + icmpType = randomIntBetween(146, 161); + } + source.put("icmp_type", icmpType); + IngestDocument ingestDocumentWithNoCode = RandomDocumentPicks.randomIngestDocument(random(), source); + ignoreMissing = randomBoolean(); + Processor processorWithNoCode = createCommunityIdProcessor( + "source_ip", + null, + "destination_ip", + null, + "iana_protocol_number", + "protocol", + "icmp_type", + null, + randomIntBetween(0, 65535), + targetFieldName, + ignoreMissing + ); + if (ignoreMissing) { + processorWithNoCode.execute(ingestDocumentWithNoCode); + assertThat(ingestDocumentWithNoCode.hasField(targetFieldName), 
equalTo(false)); + } else { + assertThrows( + "icmp message code field path cannot be null nor empty", + IllegalArgumentException.class, + () -> processorWithNoCode.execute(ingestDocumentWithNoCode) + ); + } + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + protocolNumber = randomFrom(1, 58); + source.put("iana_protocol_number", protocolNumber); + if (protocolNumber == 1) { + icmpType = randomIntBetween(3, 6); + } else { + icmpType = randomIntBetween(146, 161); + } + source.put("icmp_type", icmpType); + source.put("icmp_code", null); + IngestDocument ingestDocumentWithNullCode = RandomDocumentPicks.randomIngestDocument(random(), source); + ignoreMissing = randomBoolean(); + Processor processorWithNullCode = createCommunityIdProcessor( + "source_ip", + null, + "destination_ip", + null, + "iana_protocol_number", + "protocol", + "icmp_type", + "icmp_code", + randomIntBetween(0, 65535), + targetFieldName, + ignoreMissing + ); + if (ignoreMissing) { + processorWithNullCode.execute(ingestDocumentWithNullCode); + assertThat(ingestDocumentWithNullCode.hasField(targetFieldName), equalTo(false)); + } else { + assertThrows( + "icmp message code cannot be null nor empty", + IllegalArgumentException.class, + () -> processorWithNullCode.execute(ingestDocumentWithNullCode) + ); + } + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + protocolNumber = randomFrom(1, 58); + source.put("iana_protocol_number", protocolNumber); + if (protocolNumber == 1) { + icmpType = randomIntBetween(3, 6); + } else { + icmpType = randomIntBetween(146, 161); + } + source.put("icmp_type", icmpType); + int icmpCode; + if (randomBoolean()) { + icmpCode = randomIntBetween(256, 1000); + } else { + icmpCode = randomIntBetween(-100, -1); + } + source.put("icmp_code", icmpCode); + IngestDocument ingestDocumentWithInvalidCode = RandomDocumentPicks.randomIngestDocument(random(), source); + Processor processorWithInvalidCode = createCommunityIdProcessor( + "source_ip", + null, + "destination_ip", + null, + "iana_protocol_number", + null, + "icmp_type", + "icmp_code", + randomIntBetween(0, 65535), + targetFieldName, + randomBoolean() + ); + assertThrows( + "invalid icmp message code [" + icmpCode + "]", + IllegalArgumentException.class, + () -> processorWithInvalidCode.execute(ingestDocumentWithInvalidCode) + ); + } + + public void testTransportProtocols() throws Exception { + Map<String, Object> source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + source.put("source_port", 1000); + source.put("destination_port", 2000); + boolean isProtocolNameSpecified = randomBoolean(); + if (isProtocolNameSpecified) { + source.put("protocol", randomFrom("tcp", "udp", "sctp")); + } else { + source.put("iana_number", randomFrom(6, 17, 132)); + } + + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), source); + + String targetFieldName = randomAlphaOfLength(100); + Processor processor; + if (isProtocolNameSpecified) { + processor = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + null, + "protocol", + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + randomBoolean() + ); + } else { + processor = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_number", + null, + null, + null, + randomIntBetween(0, 65535), + 
targetFieldName, + randomBoolean() + ); + } + + processor.execute(ingestDocument); + assertThat(ingestDocument.hasField(targetFieldName), equalTo(true)); + String communityIDHash = ingestDocument.getFieldValue(targetFieldName, String.class); + assertThat(communityIDHash.startsWith("1:"), equalTo(true)); + } + + public void testICMP() throws Exception { + Map<String, Object> source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + boolean isICMP = randomBoolean(); + if (isICMP) { + source.put("protocol", "icmp"); + source.put("type", randomFrom(0, 8, 9, 10, 13, 15, 17, 18)); + } else { + source.put("protocol", "ipv6-icmp"); + source.put("type", randomFrom(128, 129, 130, 131, 133, 134, 135, 136, 139, 140, 144, 145)); + } + + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), source); + + String targetFieldName = randomAlphaOfLength(100); + Processor processor = createCommunityIdProcessor( + "source_ip", + null, + "destination_ip", + null, + null, + "protocol", + "type", + null, + randomIntBetween(0, 65535), + targetFieldName, + randomBoolean() + ); + + processor.execute(ingestDocument); + assertThat(ingestDocument.hasField(targetFieldName), equalTo(true)); + assertThat(ingestDocument.getFieldValue(targetFieldName, String.class).startsWith("1:"), equalTo(true)); + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + isICMP = randomBoolean(); + if (isICMP) { + source.put("protocol", "icmp"); + // see https://www.iana.org/assignments/icmp-parameters/icmp-parameters.xhtml#icmp-parameters-codes-5 + source.put("type", randomIntBetween(3, 6)); + source.put("code", 0); + } else { + source.put("protocol", "ipv6-icmp"); + // see https://www.iana.org/assignments/icmpv6-parameters/icmpv6-parameters.xhtml#icmpv6-parameters-codes-23 + source.put("type", randomIntBetween(146, 161)); + source.put("code", 0); + } + + IngestDocument ingestDocumentWithOnewayFlow = RandomDocumentPicks.randomIngestDocument(random(), source); + + targetFieldName = randomAlphaOfLength(100); + Processor processorWithOnewayFlow = createCommunityIdProcessor( + "source_ip", + null, + "destination_ip", + null, + null, + "protocol", + "type", + "code", + randomIntBetween(0, 65535), + targetFieldName, + randomBoolean() + ); + + processorWithOnewayFlow.execute(ingestDocumentWithOnewayFlow); + assertThat(ingestDocumentWithOnewayFlow.hasField(targetFieldName), equalTo(true)); + assertThat(ingestDocumentWithOnewayFlow.getFieldValue(targetFieldName, String.class).startsWith("1:"), equalTo(true)); + } + + // test that the hash result is consistent with the known value + public void testHashResult() throws Exception { + int index = randomIntBetween(0, CommunityIdHashInstance.values().length - 1); + CommunityIdHashInstance instance = CommunityIdHashInstance.values()[index]; + final boolean isTransportProtocol = instance.name().equals("TCP") + || instance.name().equals("UDP") + || instance.name().equals("SCTP"); + Map<String, Object> source = new HashMap<>(); + source.put("source_ip", instance.getSourceIp()); + source.put("destination_ip", instance.getDestIP()); + if (isTransportProtocol) { + source.put("source_port", instance.getSourcePort()); + source.put("destination_port", instance.getDestPort()); + source.put("iana_number", instance.getProtocolNumber()); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), source); + + String targetFieldName = randomAlphaOfLength(100); + 
boolean ignore_missing = randomBoolean();
+            Processor processor = createCommunityIdProcessor(
+                "source_ip",
+                "source_port",
+                "destination_ip",
+                "destination_port",
+                "iana_number",
+                null,
+                null,
+                null,
+                0,
+                targetFieldName,
+                ignore_missing
+            );
+
+            processor.execute(ingestDocument);
+            assertThat(ingestDocument.hasField(targetFieldName), equalTo(true));
+            assertThat(ingestDocument.getFieldValue(targetFieldName, String.class), equalTo(instance.getHash()));
+
+            // test the flow tuple in the reversed direction, the hash result should be the same value
+            source = new HashMap<>();
+            source.put("source_ip", instance.getDestIP());
+            source.put("destination_ip", instance.getSourceIp());
+            source.put("source_port", instance.getDestPort());
+            source.put("destination_port", instance.getSourcePort());
+            source.put("iana_number", instance.getProtocolNumber());
+            IngestDocument ingestDocumentWithReversedDirection = RandomDocumentPicks.randomIngestDocument(random(), source);
+
+            targetFieldName = randomAlphaOfLength(100);
+            Processor processorWithReversedDirection = createCommunityIdProcessor(
+                "source_ip",
+                "source_port",
+                "destination_ip",
+                "destination_port",
+                "iana_number",
+                null,
+                null,
+                null,
+                0,
+                targetFieldName,
+                randomBoolean()
+            );
+
+            processorWithReversedDirection.execute(ingestDocumentWithReversedDirection);
+            assertThat(ingestDocumentWithReversedDirection.hasField(targetFieldName), equalTo(true));
+            assertThat(ingestDocumentWithReversedDirection.getFieldValue(targetFieldName, String.class), equalTo(instance.getHash()));
+        } else {
+            source.put("type", instance.getSourcePort());
+            source.put("code", instance.getDestPort());
+            source.put("iana_number", instance.getProtocolNumber());
+            IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), source);
+
+            String targetFieldName = randomAlphaOfLength(100);
+            boolean ignore_missing = randomBoolean();
+            Processor processor = createCommunityIdProcessor(
+                "source_ip",
+                null,
+                "destination_ip",
+                null,
+                "iana_number",
+                null,
+                "type",
+                "code",
+                0,
+                targetFieldName,
+                ignore_missing
+            );
+
+            processor.execute(ingestDocument);
+            assertThat(ingestDocument.hasField(targetFieldName), equalTo(true));
+            assertThat(ingestDocument.getFieldValue(targetFieldName, String.class), equalTo(instance.getHash()));
+        }
+    }
+
+    private enum CommunityIdHashInstance {
+        TCP("66.35.250.204", "128.232.110.120", 6, 80, 34855, "1:LQU9qZlK+B5F3KDmev6m5PMibrg="),
+        UDP("8.8.8.8", "192.168.1.52", 17, 53, 54585, "1:d/FP5EW3wiY1vCndhwleRRKHowQ="),
+        SCTP("192.168.170.8", "192.168.170.56", 132, 7, 7, "1:MP2EtRCAUIZvTw6MxJHLV7N7JDs="),
+        ICMP("192.168.0.89", "192.168.0.1", 1, 8, 0, "1:X0snYXpgwiv9TZtqg64sgzUn6Dk="),
+        ICMP_V6("fe80::260:97ff:fe07:69ea", "ff02::1", 58, 134, 0, "1:pkvHqCL88/tg1k4cPigmZXUtL00=");
+
+        private final String sourceIp;
+        private final String destIP;
+        private final int protocolNumber;
+        private final int sourcePort;
+        private final int destPort;
+        private final String hash;
+
+        CommunityIdHashInstance(String sourceIp, String destIP, int protocolNumber, int sourcePort, int destPort, String hash) {
+            this.sourceIp = sourceIp;
+            this.destIP = destIP;
+            this.protocolNumber = protocolNumber;
+            this.sourcePort = sourcePort;
+            this.destPort = destPort;
+            this.hash = hash;
+        }
+
+        private String getSourceIp() {
+            return this.sourceIp;
+        }
+
+        private String getDestIP() {
+            return this.destIP;
+        }
+
+        private int getProtocolNumber() {
+            return this.protocolNumber;
+        }
+
+        private int getSourcePort() {
+            return this.sourcePort;
+        }
+
+        private int getDestPort() {
+            return this.destPort;
+        }
+
+        private String getHash() {
+            return this.hash;
+        }
+    }
+
+    private static Processor createCommunityIdProcessor(
+        String sourceIPField,
+        String sourcePortField,
+        String destinationIPField,
+        String destinationPortField,
+        String ianaProtocolNumberField,
+        String protocolField,
+        String icmpTypeField,
+        String icmpCodeField,
+        int seed,
+        String targetField,
+        boolean ignoreMissing
+    ) {
+        return new CommunityIdProcessor(
+            randomAlphaOfLength(10),
+            null,
+            sourceIPField,
+            sourcePortField,
+            destinationIPField,
+            destinationPortField,
+            ianaProtocolNumberField,
+            protocolField,
+            icmpTypeField,
+            icmpCodeField,
+            seed,
+            targetField,
+            ignoreMissing
+        );
+    }
+}
diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml
index 6717b3e0ebd99..2a816f0386667 100644
--- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml
+++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml
@@ -70,3 +70,19 @@
       nodes.info: {}
   - contains: { nodes.$cluster_manager.ingest.processors: { type: remove_by_pattern } }
+
+---
+"Community_id processor exists":
+  - skip:
+      version: " - 2.12.99"
+      features: contains
+      reason: "community_id processor was introduced in 2.13.0 and contains is a newly added assertion"
+  - do:
+      cluster.state: {}
+
+  # Get cluster-manager node id
+  - set: { cluster_manager_node: cluster_manager }
+
+  - do:
+      nodes.info: {}
+  - contains: { nodes.$cluster_manager.ingest.processors: { type: community_id } }
diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/320_community_id_processor.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/320_community_id_processor.yml
new file mode 100644
index 0000000000000..6de5371bb49f7
--- /dev/null
+++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/320_community_id_processor.yml
@@ -0,0 +1,370 @@
+---
+teardown:
+  - do:
+      ingest.delete_pipeline:
+        id: "1"
+        ignore: 404
+
+---
+"Test create community_id processor":
+  - skip:
+      version: " - 2.12.99"
+      reason: "introduced in 2.13"
+  - do:
+      catch: /\[source\_ip\_field\] required property is missing/
+      ingest.put_pipeline:
+        id: "1"
+        body: >
+          {
+            "processors": [
+              {
+                "community_id" : {
+                  "destination_ip_field" : "dest"
+                }
+              }
+            ]
+          }
+  - do:
+      catch: /\[destination\_ip\_field\] required property is missing/
+      ingest.put_pipeline:
+        id: "1"
+        body: >
+          {
+            "processors": [
+              {
+                "community_id" : {
+                  "source_ip_field" : "src"
+                }
+              }
+            ]
+          }
+  - do:
+      ingest.put_pipeline:
+        id: "1"
+        body: >
+          {
+            "processors": [
+              {
+                "community_id" : {
+                  "source_ip_field" : "source",
+                  "destination_ip_field" : "dest",
+                  "source_port_field" : "srcPort",
+                  "destination_port_field" : "destPort",
+                  "iana_protocol_number_field" : "iana_number",
+                  "protocol_field" : "protocol",
+                  "icmp_type_field" : "icmp",
+                  "icmp_code_field" : "code",
+                  "seed" : 0,
+                  "target_field" : "community_id",
+                  "ignore_missing" : false
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+---
+"Test community_id processor with ignore_missing":
+  - skip:
+      version: " - 2.12.99"
+      reason: "introduced in 2.13"
+  - do:
+      ingest.put_pipeline:
+        id: "1"
+        body: >
+          {
+            "processors": [
+              {
+                "community_id" : {
+                  "source_ip_field" : "source",
+                  "destination_ip_field" : "dest",
+                  "source_port_field" : "srcPort",
+                  "destination_port_field" : "destPort",
+                  "protocol_field" : "protocol"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      catch: /ip address in the field \[source\] is null or empty/
+      index:
+        index: test
+        id: 1
+        pipeline: "1"
+        body: {
+          dest: "1.1.1.1",
+          protocol: "tcp"
+        }
+
+  - do:
+      catch: /ip address in the field \[dest\] is null or empty/
+      index:
+        index: test
+        id: 1
+        pipeline: "1"
+        body: {
+          source: "1.1.1.1",
+          protocol: "tcp"
+        }
+
+  - do:
+      ingest.put_pipeline:
+        id: "1"
+        body: >
+          {
+            "processors": [
+              {
+                "community_id" : {
+                  "source_ip_field" : "source",
+                  "destination_ip_field" : "dest",
+                  "source_port_field" : "srcPort",
+                  "destination_port_field" : "destPort",
+                  "protocol_field" : "protocol",
+                  "ignore_missing" : true
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      index:
+        index: test
+        id: 1
+        pipeline: "1"
+        body: {
+          source: "1.1.1.1",
+          protocol: "tcp"
+        }
+  - do:
+      get:
+        index: test
+        id: 1
+  - match: { _source: { source: "1.1.1.1", protocol: "tcp" } }
+
+  - do:
+      index:
+        index: test
+        id: 1
+        pipeline: "1"
+        body: {
+          dest: "2.2.2.2",
+          protocol: "tcp"
+        }
+  - do:
+      get:
+        index: test
+        id: 1
+  - match: { _source: { dest: "2.2.2.2", protocol: "tcp" } }
+
+---
+"Test community_id processor for tcp":
+  - skip:
+      version: " - 2.12.99"
+      reason: "introduced in 2.13"
+  - do:
+      ingest.put_pipeline:
+        id: "1"
+        body: >
+          {
+            "processors": [
+              {
+                "community_id" : {
+                  "source_ip_field" : "source",
+                  "destination_ip_field" : "dest",
+                  "source_port_field" : "srcPort",
+                  "destination_port_field" : "destPort",
+                  "protocol_field" : "protocol"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      index:
+        index: test
+        id: 1
+        pipeline: "1"
+        body: {
+          source: "66.35.250.204",
+          dest: "128.232.110.120",
+          protocol: "tcp",
+          srcPort: 80,
+          destPort: 34855
+        }
+  - do:
+      get:
+        index: test
+        id: 1
+  - match: { _source.community_id: "1:LQU9qZlK+B5F3KDmev6m5PMibrg=" }
+
+---
+"Test community_id processor for udp":
+  - skip:
+      version: " - 2.12.99"
+      reason: "introduced in 2.13"
+  - do:
+      ingest.put_pipeline:
+        id: "1"
+        body: >
+          {
+            "processors": [
+              {
+                "community_id" : {
+                  "source_ip_field" : "source",
+                  "destination_ip_field" : "dest",
+                  "source_port_field" : "srcPort",
+                  "destination_port_field" : "destPort",
+                  "protocol_field" : "protocol"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      index:
+        index: test
+        id: 1
+        pipeline: "1"
+        body: {
+          source: "8.8.8.8",
+          dest: "192.168.1.52",
+          protocol: "udp",
+          srcPort: 53,
+          destPort: 54585
+        }
+  - do:
+      get:
+        index: test
+        id: 1
+  - match: { _source.community_id: "1:d/FP5EW3wiY1vCndhwleRRKHowQ=" }
+
+---
+"Test community_id processor for sctp":
+  - skip:
+      version: " - 2.12.99"
+      reason: "introduced in 2.13"
+  - do:
+      ingest.put_pipeline:
+        id: "1"
+        body: >
+          {
+            "processors": [
+              {
+                "community_id" : {
+                  "source_ip_field" : "source",
+                  "destination_ip_field" : "dest",
+                  "source_port_field" : "srcPort",
+                  "destination_port_field" : "destPort",
+                  "protocol_field" : "protocol"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      index:
+        index: test
+        id: 1
+        pipeline: "1"
+        body: {
+          source: "192.168.170.8",
+          dest: "192.168.170.56",
+          protocol: "sctp",
+          srcPort: 7,
+          destPort: 7
+        }
+  - do:
+      get:
+        index: test
+        id: 1
+  - match: { _source.community_id: "1:MP2EtRCAUIZvTw6MxJHLV7N7JDs=" }
+
+---
+"Test community_id processor for icmp":
+  - skip:
+      version: " - 2.12.99"
+      reason: "introduced in 2.13"
+  - do:
+      ingest.put_pipeline:
+        id: "1"
+        body: >
+          {
+            "processors": [
+              {
+                "community_id" : {
+                  "source_ip_field" : "source",
+                  "destination_ip_field" : "dest",
+                  "icmp_type_field" : "type",
+                  "icmp_code_field" : "code",
+                  "protocol_field" : "protocol"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      index:
+        index: test
+        id: 1
+        pipeline: "1"
+        body: {
+          source: "192.168.0.89",
+          dest: "192.168.0.1",
+          protocol: "icmp",
+          type: 8,
+          code: 0
+        }
+  - do:
+      get:
+        index: test
+        id: 1
+  - match: { _source.community_id: "1:X0snYXpgwiv9TZtqg64sgzUn6Dk=" }
+
+---
+"Test community_id processor for icmp-v6":
+  - skip:
+      version: " - 2.12.99"
+      reason: "introduced in 2.13"
+  - do:
+      ingest.put_pipeline:
+        id: "1"
+        body: >
+          {
+            "processors": [
+              {
+                "community_id" : {
+                  "source_ip_field" : "source",
+                  "destination_ip_field" : "dest",
+                  "icmp_type_field" : "type",
+                  "icmp_code_field" : "code",
+                  "protocol_field" : "protocol"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      index:
+        index: test
+        id: 1
+        pipeline: "1"
+        body: {
+          source: "fe80::260:97ff:fe07:69ea",
+          dest: "ff02::1",
+          protocol: "ipv6-icmp",
+          type: 134,
+          code: 0
+        }
+  - do:
+      get:
+        index: test
+        id: 1
+  - match: { _source.community_id: "1:pkvHqCL88/tg1k4cPigmZXUtL00=" }
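
The expected values in CommunityIdHashInstance and in the YAML tests above all come from the Community ID v1 specification: the flow endpoints are put into a canonical order, packed together with the seed and the IANA protocol number, hashed with SHA-1, and base64-encoded with a "1:" prefix. The following is a minimal, self-contained Java sketch of that computation for port-based protocols (TCP/UDP/SCTP) so the asserted hashes can be reproduced locally; the class and method names are illustrative only, this is not the processor's implementation, and the ICMP type/code mapping as well as the ignore_missing handling that the real CommunityIdProcessor supports are deliberately omitted.

import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.Base64;

public final class CommunityIdV1Sketch {

    public static String compute(String srcIp, int srcPort, String dstIp, int dstPort, int protocol, int seed)
        throws Exception {
        byte[] src = InetAddress.getByName(srcIp).getAddress();
        byte[] dst = InetAddress.getByName(dstIp).getAddress();

        // Canonical ordering: the smaller (address, port) endpoint goes first,
        // so both directions of a flow hash to the same value.
        int order = compareUnsigned(src, dst);
        if (order > 0 || (order == 0 && srcPort > dstPort)) {
            byte[] tmpIp = src;
            src = dst;
            dst = tmpIp;
            int tmpPort = srcPort;
            srcPort = dstPort;
            dstPort = tmpPort;
        }

        ByteBuffer buf = ByteBuffer.allocate(2 + src.length + dst.length + 2 + 4);
        buf.putShort((short) seed);    // 2-byte big-endian seed
        buf.put(src).put(dst);         // source then destination address bytes
        buf.put((byte) protocol);      // IANA protocol number
        buf.put((byte) 0);             // padding byte
        buf.putShort((short) srcPort); // 2-byte big-endian ports
        buf.putShort((short) dstPort);

        byte[] digest = MessageDigest.getInstance("SHA-1").digest(buf.array());
        return "1:" + Base64.getEncoder().encodeToString(digest);
    }

    // Lexicographic comparison of address bytes, treating each byte as unsigned.
    private static int compareUnsigned(byte[] a, byte[] b) {
        if (a.length != b.length) {
            return Integer.compare(a.length, b.length);
        }
        for (int i = 0; i < a.length; i++) {
            int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // The canonical TCP example asserted above; this should print
        // "1:LQU9qZlK+B5F3KDmev6m5PMibrg=".
        System.out.println(compute("66.35.250.204", 80, "128.232.110.120", 34855, 6, 0));
    }
}

The canonical endpoint ordering in this sketch is also what the "reversed direction" branch of the unit test relies on: swapping source and destination fields must not change the resulting community ID.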