Skip to content

Commit

Permalink
Introduce execution hint for Cardinality aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
maitreya2954 committed Sep 5, 2024
1 parent 6e70191 commit b7560d6
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public final class CardinalityAggregationBuilder extends ValuesSourceAggregation

private static final ParseField REHASH = new ParseField("rehash").withAllDeprecated("no replacement - values will always be rehashed");
public static final ParseField PRECISION_THRESHOLD_FIELD = new ParseField("precision_threshold");
public static final ParseField EXECUTION_HINT_FIELD = new ParseField(("execution_hint"));

public static final ObjectParser<CardinalityAggregationBuilder, String> PARSER = ObjectParser.fromBuilder(
NAME,
Expand All @@ -76,6 +77,7 @@ public final class CardinalityAggregationBuilder extends ValuesSourceAggregation
static {
ValuesSourceAggregationBuilder.declareFields(PARSER, true, false, false);
PARSER.declareLong(CardinalityAggregationBuilder::precisionThreshold, CardinalityAggregationBuilder.PRECISION_THRESHOLD_FIELD);
PARSER.declareString(CardinalityAggregationBuilder::executionHint, CardinalityAggregationBuilder.EXECUTION_HINT_FIELD);
PARSER.declareLong((b, v) -> {/*ignore*/}, REHASH);
}

Expand All @@ -85,6 +87,8 @@ public static void registerAggregators(ValuesSourceRegistry.Builder builder) {

private Long precisionThreshold = null;

private String executionHint = null;

public CardinalityAggregationBuilder(String name) {
super(name);
}
Expand All @@ -96,6 +100,7 @@ public CardinalityAggregationBuilder(
) {
super(clone, factoriesBuilder, metadata);
this.precisionThreshold = clone.precisionThreshold;
this.executionHint = clone.executionHint;
}

@Override
Expand All @@ -111,6 +116,7 @@ public CardinalityAggregationBuilder(StreamInput in) throws IOException {
if (in.readBoolean()) {
precisionThreshold = in.readLong();
}
executionHint = in.readOptionalString();
}

@Override
Expand All @@ -125,6 +131,7 @@ protected void innerWriteTo(StreamOutput out) throws IOException {
if (hasPrecisionThreshold) {
out.writeLong(precisionThreshold);
}
out.writeOptionalString(executionHint);
}

@Override
Expand Down Expand Up @@ -155,27 +162,37 @@ public Long precisionThreshold() {
return precisionThreshold;
}

public CardinalityAggregationBuilder executionHint(String executionHint) {
this.executionHint = executionHint;
return this;
}

public String executionHint() { return executionHint; }

@Override
protected CardinalityAggregatorFactory innerBuild(
QueryShardContext queryShardContext,
ValuesSourceConfig config,
AggregatorFactory parent,
AggregatorFactories.Builder subFactoriesBuilder
) throws IOException {
return new CardinalityAggregatorFactory(name, config, precisionThreshold, queryShardContext, parent, subFactoriesBuilder, metadata);
return new CardinalityAggregatorFactory(name, config, precisionThreshold, executionHint, queryShardContext, parent, subFactoriesBuilder, metadata);
}

@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
if (precisionThreshold != null) {
builder.field(PRECISION_THRESHOLD_FIELD.getPreferredName(), precisionThreshold);
}
if (executionHint != null) {
builder.field(EXECUTION_HINT_FIELD.getPreferredName(), executionHint);
}
return builder;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), precisionThreshold);
return Objects.hash(super.hashCode(), precisionThreshold, executionHint);
}

@Override
Expand All @@ -184,7 +201,8 @@ public boolean equals(Object obj) {
if (obj == null || getClass() != obj.getClass()) return false;
if (super.equals(obj) == false) return false;
CardinalityAggregationBuilder other = (CardinalityAggregationBuilder) obj;
return Objects.equals(precisionThreshold, other.precisionThreshold);
return Objects.equals(precisionThreshold, other.precisionThreshold)
&& Objects.equals(executionHint, other.executionHint);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue

private static final Logger logger = LogManager.getLogger(CardinalityAggregator.class);

private final CardinalityAggregatorFactory.ExecutionMode executionMode;
private final int precision;
private final ValuesSource valuesSource;

Expand All @@ -111,6 +112,7 @@ public CardinalityAggregator(
String name,
ValuesSourceConfig valuesSourceConfig,
int precision,
CardinalityAggregatorFactory.ExecutionMode executionMode,
SearchContext context,
Aggregator parent,
Map<String, Object> metadata
Expand All @@ -121,6 +123,7 @@ public CardinalityAggregator(
this.precision = precision;
this.counts = valuesSource == null ? null : new HyperLogLogPlusPlus(precision, context.bigArrays(), 1);
this.valuesSourceConfig = valuesSourceConfig;
this.executionMode = executionMode;
}

@Override
Expand All @@ -129,6 +132,7 @@ public ScoreMode scoreMode() {
}

private Collector pickCollector(LeafReaderContext ctx) throws IOException {
logger.info("ValuesSource Type: " + valuesSource);
if (valuesSource == null) {
emptyCollectorsUsed++;
return new EmptyCollector();
Expand All @@ -151,6 +155,9 @@ private Collector pickCollector(LeafReaderContext ctx) throws IOException {
if (maxOrd == 0) {
emptyCollectorsUsed++;
return new EmptyCollector();
} else if (executionMode == CardinalityAggregatorFactory.ExecutionMode.ORDINAL) { // Force OrdinalsCollector
ordinalsCollectorsUsed++;
collector = new OrdinalsCollector(counts, ordinalValues, context.bigArrays());
} else {
final long ordinalsMemoryUsage = OrdinalsCollector.memoryOverhead(maxOrd);
final long countsMemoryUsage = HyperLogLogPlusPlus.memoryUsage(precision);
Expand Down Expand Up @@ -261,6 +268,7 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc
postCollectLastCollector();

collector = pickCollector(ctx);
logger.info("Collector chosen: " + collector);
return collector;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,53 @@
*/
class CardinalityAggregatorFactory extends ValuesSourceAggregatorFactory {

public static enum ExecutionMode {

UNSET(null),
DIRECT("direct"),
ORDINAL("ordinal");

private final String hintString;

ExecutionMode(String hintString) {
this.hintString = hintString;
}

public static ExecutionMode fromString(String value) {
if (value == null) {
return UNSET;
}
switch(value) {
case "direct": return DIRECT;
case "ordinal": return ORDINAL;
default:
throw new IllegalArgumentException("Unknown `execution_hint`: [" + value + "], expected any of [direct, ordinals]");
}
}

@Override
public String toString() {
return hintString;
}
}

private final ExecutionMode executionMode;

private final Long precisionThreshold;

CardinalityAggregatorFactory(
String name,
ValuesSourceConfig config,
Long precisionThreshold,
String executionHint,
QueryShardContext queryShardContext,
AggregatorFactory parent,
AggregatorFactories.Builder subFactoriesBuilder,
Map<String, Object> metadata
) throws IOException {
super(name, config, queryShardContext, parent, subFactoriesBuilder, metadata);
this.precisionThreshold = precisionThreshold;
this.executionMode = ExecutionMode.fromString(executionHint);
}

public static void registerAggregators(ValuesSourceRegistry.Builder builder) {
Expand All @@ -74,7 +108,7 @@ public static void registerAggregators(ValuesSourceRegistry.Builder builder) {

@Override
protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map<String, Object> metadata) throws IOException {
return new CardinalityAggregator(name, config, precision(), searchContext, parent, metadata);
return new CardinalityAggregator(name, config, precision(), executionMode, searchContext, parent, metadata);
}

@Override
Expand All @@ -86,7 +120,7 @@ protected Aggregator doCreateInternal(
) throws IOException {
return queryShardContext.getValuesSourceRegistry()
.getAggregator(CardinalityAggregationBuilder.REGISTRY_KEY, config)
.build(name, config, precision(), searchContext, parent, metadata);
.build(name, config, precision(), executionMode, searchContext, parent, metadata);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Aggregator build(
String name,
ValuesSourceConfig valuesSourceConfig,
int precision,
CardinalityAggregatorFactory.ExecutionMode executionMode,
SearchContext context,
Aggregator parent,
Map<String, Object> metadata
Expand Down

0 comments on commit b7560d6

Please sign in to comment.