Skip to content

Commit

Permalink
Implement Presto aggregate function map_union (#1827)
Browse files Browse the repository at this point in the history
Summary: Pull Request resolved: #1827

Test Plan:
## Unit Test
```
[~/fbsource/fbcode]$ buck test velox/functions/prestosql/aggregates/tests:test
```

## End-to-end Test
```
[~/fbsource/fbcode/presto_cpp/public_tld/java/presto-native-tests]$ JAVA_HOME=/usr/local/fb-jdk-8 mvn -Dmaven.compiler.fork=true -Dmaven.compiler.executable=/usr/local/fb-jdk-8/bin/javac -Djava.net.preferIPv6Addresses=true -DPRESTO_SERVER=$HOME/fbsource/fbcode/buck-out/gen/presto_cpp/main/presto_server -DDATA_DIR=/tmp -Duser.timezone=America/Bahia_Banderas -Dtest=TestHiveAggregationQueries test
```

Reviewed By: kagamiori

Differential Revision: D37088301

Pulled By: benitakbritto

fbshipit-source-id: 209ee7b1ad2e41608226215f3bcb788d687af8e4
  • Loading branch information
benitakbritto authored and facebook-github-bot committed Jun 24, 2022
1 parent 9d10897 commit 2002953
Show file tree
Hide file tree
Showing 9 changed files with 574 additions and 216 deletions.
6 changes: 6 additions & 0 deletions velox/docs/functions/aggregate.rst
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ Map Aggregate Functions

Returns a map created from the input ``key`` / ``value`` pairs.

.. function:: map_union(map(K,V)) -> map(K,V)

Returns the union of all the input ``maps``.
If a ``key`` is found in multiple input ``maps``,
that ``key’s`` ``value`` in the resulting ``map`` comes from an arbitrary input ``map``.

Approximate Aggregate Functions
-------------------------------

Expand Down
1 change: 1 addition & 0 deletions velox/functions/prestosql/aggregates/AggregateNames.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const char* const kCovarSamp = "covar_samp";
const char* const kEvery = "every";
const char* const kHistogram = "histogram";
const char* const kMapAgg = "map_agg";
const char* const kMapUnion = "map_union";
const char* const kMax = "max";
const char* const kMaxBy = "max_by";
const char* const kMerge = "merge";
Expand Down
3 changes: 3 additions & 0 deletions velox/functions/prestosql/aggregates/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ add_library(
ChecksumAggregate.cpp
HistogramAggregate.cpp
MapAggAggregate.cpp
MapAggregateBase.h
MapAggregateBase.cpp
MapUnionAggregate.cpp
MinMaxAggregates.cpp
MinMaxByAggregates.cpp
CountAggregate.cpp
Expand Down
219 changes: 3 additions & 216 deletions velox/functions/prestosql/aggregates/MapAggAggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,95 +13,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "velox/exec/ContainerRowSerde.h"
#include "velox/expression/FunctionSignature.h"
#include "velox/functions/prestosql/aggregates/AggregateNames.h"
#include "velox/functions/prestosql/aggregates/ValueList.h"
#include "velox/functions/prestosql/aggregates/MapAggregateBase.h"

namespace facebook::velox::aggregate {
namespace {

struct MapAccumulator {
ValueList keys;
ValueList values;
};

// See documentation at
// https://prestodb.io/docs/current/functions/aggregate.html
class MapAggAggregate : public exec::Aggregate {
class MapAggAggregate : public aggregate::MapAggregateBase {
public:
explicit MapAggAggregate(TypePtr resultType) : Aggregate(resultType) {}

int32_t accumulatorFixedWidthSize() const override {
return sizeof(MapAccumulator);
}

bool isFixedSize() const override {
return false;
}

void initializeNewGroups(
char** groups,
folly::Range<const vector_size_t*> indices) override {
for (auto index : indices) {
new (groups[index] + offset_) MapAccumulator();
}
}

void finalize(char** groups, int32_t numGroups) override {
for (auto i = 0; i < numGroups; i++) {
value<MapAccumulator>(groups[i])->keys.finalize(allocator_);
value<MapAccumulator>(groups[i])->values.finalize(allocator_);
}
}

void extractValues(char** groups, int32_t numGroups, VectorPtr* result)
override {
auto mapVector = (*result)->as<MapVector>();
VELOX_CHECK(mapVector);
mapVector->resize(numGroups);

auto mapKeys = mapVector->mapKeys();
auto mapValues = mapVector->mapValues();

auto numElements = countElements(groups, numGroups);
mapKeys->resize(numElements);
mapValues->resize(numElements);

auto* rawNulls = getRawNulls(mapVector);
vector_size_t offset = 0;
for (int32_t i = 0; i < numGroups; ++i) {
char* group = groups[i];
clearNull(rawNulls, i);

auto accumulator = value<MapAccumulator>(group);
auto mapSize = accumulator->keys.size();
if (mapSize) {
ValueListReader keysReader(accumulator->keys);
ValueListReader valuesReader(accumulator->values);
for (auto index = 0; index < mapSize; ++index) {
keysReader.next(*mapKeys, offset + index);
valuesReader.next(*mapValues, offset + index);
}
mapVector->setOffsetAndSize(i, offset, mapSize);
offset += mapSize;
} else {
mapVector->setOffsetAndSize(i, offset, 0);
}
}

// canonicalize requires a singly referenced MapVector. std::move
// inside the cast does not clear *result, so we clear this
// manually.
auto mapVectorPtr = std::static_pointer_cast<MapVector>(std::move(*result));
*result = nullptr;
*result = removeDuplicates(mapVectorPtr);
}

void extractAccumulators(char** groups, int32_t numGroups, VectorPtr* result)
override {
extractValues(groups, numGroups, result);
}
explicit MapAggAggregate(TypePtr resultType) : MapAggregateBase(resultType) {}

void addRawInput(
char** groups,
Expand All @@ -123,29 +43,6 @@ class MapAggAggregate : public exec::Aggregate {
});
}

void addIntermediateResults(
char** groups,
const SelectivityVector& rows,
const std::vector<VectorPtr>& args,
bool /*mayPushdown*/) override {
decodedIntermediate_.decode(*args[0], rows);

auto mapVector = decodedIntermediate_.base()->as<MapVector>();
auto& mapKeys = mapVector->mapKeys();
auto& mapValues = mapVector->mapValues();
rows.applyToSelected([&](vector_size_t row) {
auto group = groups[row];
auto accumulator = value<MapAccumulator>(group);

auto decodedRow = decodedIntermediate_.index(row);
auto offset = mapVector->offsetAt(decodedRow);
auto size = mapVector->sizeAt(decodedRow);
auto tracker = trackRowSize(group);
accumulator->keys.appendRange(mapKeys, offset, size, allocator_);
accumulator->values.appendRange(mapValues, offset, size, allocator_);
});
}

void addSingleGroupRawInput(
char* group,
const SelectivityVector& rows,
Expand All @@ -166,116 +63,6 @@ class MapAggAggregate : public exec::Aggregate {
}
});
}

void addSingleGroupIntermediateResults(
char* group,
const SelectivityVector& rows,
const std::vector<VectorPtr>& args,
bool /* mayPushdown */) override {
decodedIntermediate_.decode(*args[0], rows);

auto accumulator = value<MapAccumulator>(group);
auto mapVector = decodedIntermediate_.base()->as<MapVector>();
auto& keys = accumulator->keys;
auto& values = accumulator->values;

auto& mapKeys = mapVector->mapKeys();
auto& mapValues = mapVector->mapValues();
rows.applyToSelected([&](vector_size_t row) {
auto decodedRow = decodedIntermediate_.index(row);
auto offset = mapVector->offsetAt(decodedRow);
auto size = mapVector->sizeAt(decodedRow);
keys.appendRange(mapKeys, offset, size, allocator_);
values.appendRange(mapValues, offset, size, allocator_);
});
}

void destroy(folly::Range<char**> groups) override {
for (auto group : groups) {
auto accumulator = value<MapAccumulator>(group);
accumulator->keys.free(allocator_);
accumulator->values.free(allocator_);
}
}

private:
vector_size_t countElements(char** groups, int32_t numGroups) const {
vector_size_t size = 0;
for (int32_t i = 0; i < numGroups; ++i) {
size += value<MapAccumulator>(groups[i])->keys.size();
}
return size;
}

VectorPtr removeDuplicates(MapVectorPtr& mapVector) const {
MapVector::canonicalize(mapVector);

auto offsets = mapVector->rawOffsets();
auto sizes = mapVector->rawSizes();
auto mapKeys = mapVector->mapKeys();

auto numRows = mapVector->size();
auto numElements = mapKeys->size();

BufferPtr newSizes;
vector_size_t* rawNewSizes = nullptr;

BufferPtr elementIndices;
vector_size_t* rawElementIndices = nullptr;

// Check for duplicate keys
for (vector_size_t row = 0; row < numRows; row++) {
auto offset = offsets[row];
auto size = sizes[row];
auto duplicateCnt = 0;
for (vector_size_t i = 1; i < size; i++) {
if (mapKeys->equalValueAt(mapKeys.get(), offset + i, offset + i - 1)) {
// duplicate key
duplicateCnt++;
if (!rawNewSizes) {
newSizes =
allocateSizes(mapVector->mapKeys()->size(), mapVector->pool());
rawNewSizes = newSizes->asMutable<vector_size_t>();

elementIndices = allocateIndices(
mapVector->mapKeys()->size(), mapVector->pool());
rawElementIndices = elementIndices->asMutable<vector_size_t>();

memcpy(rawNewSizes, sizes, row * sizeof(vector_size_t));
std::iota(rawElementIndices, rawElementIndices + offset + i, 0);
}
} else if (rawNewSizes) {
rawElementIndices[offset + i - duplicateCnt] = offset + i;
}
}
if (rawNewSizes) {
rawNewSizes[row] = size - duplicateCnt;
}
};

if (rawNewSizes) {
return std::make_shared<MapVector>(
mapVector->pool(),
mapVector->type(),
mapVector->nulls(),
mapVector->size(),
mapVector->offsets(),
newSizes,
BaseVector::wrapInDictionary(
BufferPtr(nullptr), elementIndices, numElements, mapKeys),
BaseVector::wrapInDictionary(
BufferPtr(nullptr),
elementIndices,
numElements,
mapVector->mapValues()));
} else {
return mapVector;
}
}

DecodedVector decodedKeys_;
DecodedVector decodedValues_;
DecodedVector decodedIntermediate_;
};

bool registerMapAggAggregate(const std::string& name) {
Expand Down
Loading

0 comments on commit 2002953

Please sign in to comment.