Skip to content

Commit

Permalink
Adding distributed scalar aggregates (#570)
Browse files Browse the repository at this point in the history
* init

* incomplete

* changing column API

* refactoring column

* adding allreduce column

* adding min, max, count

* adding mean

* removing ctx from communicator methods

* adding variance and stddev

* minor refactor

* minor fix

* adding table aggregations

* more tests

* adding cython bindings and tests

* Update aggregate_test.cpp

* incorporating suggestions from the review
  • Loading branch information
nirandaperera committed Mar 4, 2022
1 parent 9c2fdc4 commit b2c0820
Show file tree
Hide file tree
Showing 40 changed files with 1,663 additions and 300 deletions.
3 changes: 2 additions & 1 deletion cpp/src/cylon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ add_library(cylon SHARED
compute/aggregate_utils.hpp
compute/aggregates.cpp
compute/aggregates.hpp
compute/scalar_aggregate.cpp
ctx/arrow_memory_pool_utils.cpp
ctx/arrow_memory_pool_utils.hpp
ctx/cylon_context.cpp
Expand Down Expand Up @@ -180,7 +181,7 @@ add_library(cylon SHARED
util/sort.hpp
util/to_string.hpp
util/uuid.cpp
util/uuid.hpp)
util/uuid.hpp scalar.cpp scalar.hpp)

IF(NOT MSVC)
if(APPLE)
Expand Down
41 changes: 22 additions & 19 deletions cpp/src/cylon/column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,32 @@
* limitations under the License.
*/

#include <cylon/column.hpp>
#include "cylon/column.hpp"
#include "cylon/arrow/arrow_types.hpp"
#include "cylon/util/macros.hpp"
#include "cylon/ctx/arrow_memory_pool_utils.hpp"

namespace cylon {

std::shared_ptr<arrow::ChunkedArray> Column::GetColumnData() const{
return this->data_array;
};

std::string Column::GetID() const{
return this->id;
}
std::shared_ptr<DataType> Column::GetDataType() const{
return this->type;
std::shared_ptr<Column> Column::Make(std::shared_ptr<arrow::Array> data_) {
return std::make_shared<Column>(std::move(data_));
}
std::shared_ptr<Column> Column::Make(const std::string &id,
const std::shared_ptr<DataType> &type,
const std::shared_ptr<arrow::ChunkedArray> &data_) {
return std::make_shared<Column>(id, type, data_);
}
std::shared_ptr<Column> Column::Make(const std::string &id,
const std::shared_ptr<DataType> &type,
const std::shared_ptr<arrow::Array> &data_) {
return std::make_shared<Column>(id, type, data_);

Status Column::Make(const std::shared_ptr<CylonContext> &ctx,
const std::shared_ptr<arrow::ChunkedArray> &data_,
std::shared_ptr<Column> *output) {
CYLON_ASSIGN_OR_RAISE(auto arr, arrow::Concatenate(data_->chunks(), ToArrowPool(ctx)))
*output = Column::Make(std::move(arr));
return Status::OK();
}

Column::Column(std::shared_ptr<arrow::Array> data)
: type_(tarrow::ToCylonType(data->type())), data_(std::move(data)) {}

const std::shared_ptr<arrow::Array> &Column::data() const { return data_; }

const std::shared_ptr<DataType> &Column::type() const { return type_; }

int64_t Column::length() const { return data_->length(); }

} // namespace cylon
51 changes: 14 additions & 37 deletions cpp/src/cylon/column.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,77 +16,54 @@
#define CYLON_SRC_IO_COLUMN_H_

#include <string>
#include <utility>
#include <memory>
#include <arrow/api.h>
#include <arrow/table.h>

#include "cylon/data_types.hpp"
#include "cylon/ctx/arrow_memory_pool_utils.hpp"
#include "cylon/arrow/arrow_types.hpp"
#include "cylon/ctx/cylon_context.hpp"

namespace cylon {

class Column {
public:
Column(std::string id, std::shared_ptr<DataType> type,
std::shared_ptr<arrow::ChunkedArray> data_)
: id(std::move(id)), type(std::move(type)), data_array(std::move(data_)) {
}

Column(std::string id, std::shared_ptr<DataType> type,
std::shared_ptr<arrow::Array> data_)
: id(std::move(id)), type(std::move(type)),
data_array(std::make_shared<arrow::ChunkedArray>(std::move(data_))) {
}
explicit Column(std::shared_ptr<arrow::Array> data);

/**
* Return the data wrapped by column
* @return arrow chunked array
*/
std::shared_ptr<arrow::ChunkedArray> GetColumnData() const;

/**
* Return the unique id of the array
* @return
*/
std::string GetID() const;
const std::shared_ptr<arrow::Array> &data() const;

/**
* Return the data type of the column
* @return
*/
std::shared_ptr<DataType> GetDataType() const;
const std::shared_ptr<DataType> &type() const;

static std::shared_ptr<Column> Make(const std::string &id, const std::shared_ptr<DataType> &type,
const std::shared_ptr<arrow::ChunkedArray> &data_);
int64_t length() const;

static std::shared_ptr<Column> Make(const std::string &id, const std::shared_ptr<DataType> &type,
const std::shared_ptr<arrow::Array> &data_);
static std::shared_ptr<Column> Make(std::shared_ptr<arrow::Array> data_);
static Status Make(const std::shared_ptr<CylonContext> &ctx,
const std::shared_ptr<arrow::ChunkedArray> &data_,
std::shared_ptr<Column> *output);

template<typename T, typename = typename std::enable_if<std::is_arithmetic<T>::value, T>::type>
static Status FromVector(const std::shared_ptr<CylonContext> &ctx,
const std::string &id,
const std::shared_ptr<DataType> &type,
const std::vector<T> &data_vector,
std::shared_ptr<Column> &output) {
static Status FromVector(const std::vector<T> &data_vector, std::shared_ptr<Column> &output) {
using ArrowT = typename arrow::CTypeTraits<T>::ArrowType;
using BuilderT = typename arrow::TypeTraits<ArrowT>::BuilderType;

// copy data to a buffer
BuilderT builder(ToArrowPool(ctx));
BuilderT builder;
RETURN_CYLON_STATUS_IF_ARROW_FAILED(builder.AppendValues(data_vector));

std::shared_ptr<arrow::Array> arr;
RETURN_CYLON_STATUS_IF_ARROW_FAILED(builder.Finish(&arr));
output = std::make_shared<Column>(id, type, std::move(arr));
output = std::make_shared<Column>(std::move(arr));
return Status::OK();
}

private:
std::string id; // The id of the column
std::shared_ptr<DataType> type; // The datatype of the column
std::shared_ptr<arrow::ChunkedArray> data_array; // pointer to the data array
std::shared_ptr<DataType> type_; // The datatype of the column
std::shared_ptr<arrow::Array> data_; // pointer to the data array
};

} // namespace cylon
Expand Down
15 changes: 10 additions & 5 deletions cpp/src/cylon/compute/aggregate_kernels.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,22 @@ struct KernelOptions {
virtual ~KernelOptions() = default;
};

struct BasicOptions : public KernelOptions {
explicit BasicOptions(bool skip_nulls = true) : skip_nulls_(skip_nulls) {}
bool skip_nulls_;
};

/**
* Variance kernel options
*/
struct VarKernelOptions : public KernelOptions {
struct VarKernelOptions : public BasicOptions {
/**
* @param ddof delta degree of freedom
*/
explicit VarKernelOptions(int ddof) : ddof(ddof) {}
VarKernelOptions() : VarKernelOptions(1) {}
explicit VarKernelOptions(int ddof = 0, bool skip_nulls = true)
: BasicOptions(skip_nulls), ddof_(ddof) {}

int ddof;
int ddof_;
};

struct QuantileKernelOptions : public KernelOptions {
Expand Down Expand Up @@ -399,7 +404,7 @@ class VarianceKernel : public TypedAggregationKernel<VarianceKernel<T>, VAR, T>
explicit VarianceKernel(bool do_std = false) : ddof(0), do_std(do_std) {}

void KernelSetup(VarKernelOptions *options) {
ddof = options->ddof;
ddof = options->ddof_;
}

inline void KernelInitializeState(std::tuple<T, T, int64_t> *state) const {
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/cylon/compute/aggregates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
#include <cylon/net/comm_operations.hpp>
#include <cylon/util/macros.hpp>

#include <cylon/compute/aggregates.hpp>
#include <cylon/compute/aggregate_utils.hpp>
#include "cylon/compute/aggregates.hpp"
#include "cylon/compute/aggregate_utils.hpp"
#include "cylon/arrow/arrow_types.hpp"

namespace cylon {
namespace compute {
Expand Down
Loading

0 comments on commit b2c0820

Please sign in to comment.