Skip to content

Commit

Permalink
Merge pull request #174 from Flipkart/unique-over-histogram
Browse files Browse the repository at this point in the history
Added distinct support over a field for histogram
  • Loading branch information
santanusinha committed Jul 27, 2016
2 parents fb88514 + f0d134f commit 1643334
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class HistogramRequest extends ActionRequest {

private String field = "_timestamp";

private String uniqueCountOn;

private Period period;

public HistogramRequest() {
Expand All @@ -53,6 +55,14 @@ public void setPeriod(Period period) {
this.period = period;
}

/**
* Returns the name of the field configured for unique counting on this request.
*
* @return the current {@code uniqueCountOn} value; {@code null} when it has not been set
*/
public String getUniqueCountOn() {
return uniqueCountOn;
}

/**
* Sets the name of the field configured for unique counting on this request.
*
* @param uniqueCountOn the new value, stored as-is without validation
*/
public void setUniqueCountOn(String uniqueCountOn) {
this.uniqueCountOn = uniqueCountOn;
}

/**
* Returns the field this histogram request operates on.
*
* @return the current {@code field} value
*/
public String getField() {
return field;
}
Expand All @@ -64,9 +74,10 @@ public void setField(String field) {
/**
 * Renders this request as a string made up of the superclass representation
 * followed by the table, filters, field, uniqueCountOn and period values.
 *
 * @return the string produced by the underlying {@code ToStringBuilder}
 */
@Override
public String toString() {
    final ToStringBuilder description = new ToStringBuilder(this);
    description.appendSuper(super.toString());
    description.append("table", table);
    description.append("filters", getFilters());
    description.append("field", field);
    description.append("uniqueCountOn", uniqueCountOn);
    description.append("period", period);
    return description.toString();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2014 Flipkart Internet Pvt. Ltd.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -26,8 +26,6 @@
* Time: 12:14 AM
*/
public class HistogramResponse implements ActionResponse {
private long from;
private long to;

public static class Count {
private Number period;
Expand Down Expand Up @@ -102,26 +100,9 @@ public void setCounts(List<Count> counts) {
this.counts = counts;
}


/**
* Returns the {@code from} value held by this response.
*
* @return the current {@code from} value
*/
public long getFrom() {
return from;
}

/**
* Sets the {@code from} value held by this response.
*
* @param from the new value, stored as-is
*/
public void setFrom(long from) {
this.from = from;
}

/**
* Returns the {@code to} value held by this response.
*
* @return the current {@code to} value
*/
public long getTo() {
return to;
}

/**
* Sets the {@code to} value held by this response.
*
* @param to the new value, stored as-is
*/
public void setTo(long to) {
this.to = to;
}

/**
* Dispatches this response to the given visitor by calling {@code visitor.visit(this)}.
*
* @param visitor the visitor that receives this response
*/
@Override
public void accept(ResponseVisitor visitor) {
visitor.visit(this);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.metrics.cardinality.Cardinality;

Expand Down Expand Up @@ -103,10 +102,7 @@ public ActionResponse execute(CountRequest parameter) throws FoxtrotException {
.setSearchType(SearchType.COUNT)
.setQuery(new ElasticSearchQueryGenerator(FilterCombinerType.and)
.genFilter(parameter.getFilters()))
.addAggregation(AggregationBuilders
.cardinality(Utils.sanitizeFieldForAggregation(parameter.getField()))
.field(parameter.getField())
);
.addAggregation(Utils.buildCardinalityAggregation(parameter.getField()));
} catch (Exception e) {
throw FoxtrotExceptions.queryCreationException(parameter, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramBuilder;
import org.elasticsearch.search.aggregations.metrics.cardinality.Cardinality;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -102,6 +105,10 @@ public void validateImpl(HistogramRequest parameter) throws MalformedQueryExcept
validationErrors.add("time period cannot be null");
}

if (parameter.getUniqueCountOn() != null && parameter.getUniqueCountOn().isEmpty()) {
validationErrors.add("distinct field cannot be empty (can be null)");
}

if (!CollectionUtils.isNullOrEmpty(validationErrors)) {
throw FoxtrotExceptions.createMalformedQueryException(parameter, validationErrors);
}
Expand All @@ -110,8 +117,8 @@ public void validateImpl(HistogramRequest parameter) throws MalformedQueryExcept
@Override
public ActionResponse execute(HistogramRequest parameter) throws FoxtrotException {
SearchRequestBuilder searchRequestBuilder;
DateHistogram.Interval interval = Utils.getHistogramInterval(parameter.getPeriod());
String dateHistogramKey = Utils.getDateHistogramKey(parameter.getField());

AbstractAggregationBuilder aggregationBuilder = buildAggregation();
try {
searchRequestBuilder = getConnection().getClient().prepareSearch(
ElasticsearchUtils.getIndices(parameter.getTable(), parameter))
Expand All @@ -121,31 +128,49 @@ public ActionResponse execute(HistogramRequest parameter) throws FoxtrotExceptio
.genFilter(parameter.getFilters()))
.setSize(0)
.setSearchType(SearchType.COUNT)
.addAggregation(Utils.buildDateHistogramAggregation(parameter.getField(), interval));
.addAggregation(aggregationBuilder);
} catch (Exception e) {
throw FoxtrotExceptions.queryCreationException(parameter, e);
}

try {
SearchResponse response = searchRequestBuilder.execute().actionGet();
Aggregations aggregations = response.getAggregations();
if (aggregations == null) {
return new HistogramResponse(Collections.<HistogramResponse.Count>emptyList());
}
DateHistogram dateHistogram = aggregations.get(dateHistogramKey);
Collection<? extends DateHistogram.Bucket> buckets = dateHistogram.getBuckets();
List<HistogramResponse.Count> counts = new ArrayList<>(buckets.size());
for (DateHistogram.Bucket bucket : buckets) {
HistogramResponse.Count count = new HistogramResponse.Count(
bucket.getKeyAsNumber(), bucket.getDocCount());
counts.add(count);
}
return new HistogramResponse(counts);
return buildResponse(aggregations);
} catch (ElasticsearchException e) {
throw FoxtrotExceptions.createQueryExecutionException(parameter, e);
}
}

/**
 * Converts the aggregations of a search response into a {@link HistogramResponse}.
 *
 * <p>One {@code Count} is produced per date-histogram bucket. When the request
 * has a non-empty {@code uniqueCountOn} field, the count is the value of the
 * cardinality sub-aggregation of the bucket; otherwise it is the document count
 * of the bucket.
 *
 * @param aggregations aggregations taken from the search response; may be {@code null}
 * @return a response with one entry per bucket, or an empty response when
 *         {@code aggregations} is {@code null}
 */
private HistogramResponse buildResponse(Aggregations aggregations) {
    if (aggregations == null) {
        return new HistogramResponse(Collections.<HistogramResponse.Count>emptyList());
    }

    String dateHistogramKey = Utils.getDateHistogramKey(getParameter().getField());
    DateHistogram dateHistogram = aggregations.get(dateHistogramKey);
    Collection<? extends DateHistogram.Bucket> buckets = dateHistogram.getBuckets();

    // The request does not change while iterating, so decide once whether distinct
    // counts are wanted and compute the sub-aggregation key once, not per bucket.
    String uniqueCountOn = getParameter().getUniqueCountOn();
    boolean countDistinct = !CollectionUtils.isNullOrEmpty(uniqueCountOn);
    String cardinalityKey = countDistinct ? Utils.sanitizeFieldForAggregation(uniqueCountOn) : null;

    List<HistogramResponse.Count> counts = new ArrayList<>(buckets.size());
    for (DateHistogram.Bucket bucket : buckets) {
        final long value;
        if (countDistinct) {
            Cardinality cardinality = bucket.getAggregations().get(cardinalityKey);
            value = cardinality.getValue();
        } else {
            value = bucket.getDocCount();
        }
        counts.add(new HistogramResponse.Count(bucket.getKeyAsNumber(), value));
    }
    return new HistogramResponse(counts);
}

/**
 * Builds the aggregation sent with the histogram query: a date histogram over
 * the request field at the interval derived from the request period, with a
 * cardinality sub-aggregation attached when {@code uniqueCountOn} is non-empty.
 *
 * @return the date-histogram aggregation builder
 */
private AbstractAggregationBuilder buildAggregation() {
    final DateHistogram.Interval bucketInterval = Utils.getHistogramInterval(getParameter().getPeriod());
    final DateHistogramBuilder dateHistogram =
            Utils.buildDateHistogramAggregation(getParameter().getField(), bucketInterval);

    // Plain document counts need no sub-aggregation.
    if (CollectionUtils.isNullOrEmpty(getParameter().getUniqueCountOn())) {
        return dateHistogram;
    }

    dateHistogram.subAggregation(Utils.buildCardinalityAggregation(getParameter().getUniqueCountOn()));
    return dateHistogram;
}

@Override
protected Filter getDefaultTimeSpan() {
LastFilter lastFilter = new LastFilter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramBuilder;
import org.elasticsearch.search.aggregations.metrics.cardinality.CardinalityBuilder;
import org.elasticsearch.search.aggregations.metrics.percentiles.InternalPercentiles;
import org.elasticsearch.search.aggregations.metrics.percentiles.Percentile;
import org.elasticsearch.search.aggregations.metrics.stats.extended.InternalExtendedStats;
Expand Down Expand Up @@ -36,6 +37,16 @@ public static DateHistogramBuilder buildDateHistogramAggregation(String field, D
.interval(interval);
}

/**
 * Creates a cardinality aggregation over {@code field}. The aggregation is
 * named with the sanitized form of the field name.
 *
 * @param field the field to aggregate on
 * @return the configured cardinality aggregation builder
 */
public static CardinalityBuilder buildCardinalityAggregation(String field) {
    final String aggregationName = Utils.sanitizeFieldForAggregation(field);
    final CardinalityBuilder cardinality = AggregationBuilders.cardinality(aggregationName);
    return cardinality.field(field);
}

/**
* Replaces every match of {@code Constants.FIELD_REPLACEMENT_REGEX} in the given
* field name with {@code Constants.FIELD_REPLACEMENT_VALUE}.
*
* @param field the field name to sanitize
* @return the field name after replacement
*/
public static String sanitizeFieldForAggregation(String field) {
return field.replaceAll(Constants.FIELD_REPLACEMENT_REGEX, Constants.FIELD_REPLACEMENT_VALUE);
}

public static DateHistogram.Interval getHistogramInterval(Period period) {
DateHistogram.Interval interval;
switch (period) {
Expand All @@ -58,10 +69,6 @@ public static DateHistogram.Interval getHistogramInterval(Period period) {
return interval;
}

/**
* Replaces every match of {@code Constants.FIELD_REPLACEMENT_REGEX} in the given
* field name with {@code Constants.FIELD_REPLACEMENT_VALUE}.
*
* @param field the field name to sanitize
* @return the field name after replacement
*/
public static String sanitizeFieldForAggregation(String field) {
return field.replaceAll(Constants.FIELD_REPLACEMENT_REGEX, Constants.FIELD_REPLACEMENT_VALUE);
}

/**
 * Builds the key of the extended-stats aggregation for a field: the sanitized
 * field name followed by {@code "_extended_stats"}.
 *
 * @param field the field the aggregation is computed on
 * @return the aggregation key
 */
public static String getExtendedStatsAggregationKey(String field) {
    final String sanitizedField = sanitizeFieldForAggregation(field);
    return sanitizedField + "_extended_stats";
}
Expand Down

0 comments on commit 1643334

Please sign in to comment.