Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#610: SampleArray util method replaced by using arrow::compute::Take … #612

Merged
merged 13 commits into from
Sep 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 46 additions & 69 deletions cpp/src/cylon/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@
#include <cylon/util/arrow_utils.hpp>
#include <cylon/util/macros.hpp>
#include <cylon/util/to_string.hpp>
#include <cylon/util/arrow_utils.hpp>
#include <cylon/repartition.hpp>
#include <cylon/scalar.hpp>
#include <cylon/serialize/table_serialize.hpp>

namespace cylon {

Expand Down Expand Up @@ -103,9 +105,9 @@ static inline Status all_to_all_arrow_tables(const std::shared_ptr<CylonContext>

// entries from each RANK are separated
static inline Status all_to_all_arrow_tables_separated_arrow_table(const std::shared_ptr<CylonContext> &ctx,
const std::shared_ptr<arrow::Schema> &schema,
const std::vector<std::shared_ptr<arrow::Table>> &partitioned_tables,
std::vector<std::shared_ptr<arrow::Table>> &received_tables) {
const std::shared_ptr<arrow::Schema> &schema,
const std::vector<std::shared_ptr<arrow::Table>> &partitioned_tables,
std::vector<std::shared_ptr<arrow::Table>> &received_tables) {
const auto &neighbours = ctx->GetNeighbours(true);
received_tables.resize(ctx->GetWorldSize());

Expand Down Expand Up @@ -154,14 +156,14 @@ static inline Status all_to_all_arrow_tables_separated_arrow_table(const std::sh
}

static inline Status all_to_all_arrow_tables_separated_cylon_table(const std::shared_ptr<CylonContext> &ctx,
const std::shared_ptr<arrow::Schema> &schema,
const std::vector<std::shared_ptr<arrow::Table>> &partitioned_tables,
std::vector<std::shared_ptr<Table>> &table_out) {
const std::shared_ptr<arrow::Schema> &schema,
const std::vector<std::shared_ptr<arrow::Table>> &partitioned_tables,
std::vector<std::shared_ptr<Table>> &table_out) {
std::vector<std::shared_ptr<arrow::Table>> received_tables;
all_to_all_arrow_tables_separated_arrow_table(ctx, schema, partitioned_tables, received_tables);

table_out.reserve(received_tables.size() - 1);
for (auto &received_table: received_tables) {
for (auto &received_table : received_tables) {
if (received_table->num_rows() > 0) {
CYLON_ASSIGN_OR_RAISE(auto arrow_tb, received_table->CombineChunks(cylon::ToArrowPool(ctx)));
auto temp = std::make_shared<Table>(ctx, std::move(arrow_tb));
Expand Down Expand Up @@ -235,9 +237,9 @@ static inline Status shuffle_two_tables_by_hashing(const std::shared_ptr<cylon::
Status FromCSV(const std::shared_ptr<CylonContext> &ctx, const std::string &path,
std::shared_ptr<Table> &tableOut, const cylon::io::config::CSVReadOptions &options) {
arrow::Result<std::shared_ptr<arrow::Table>> result = cylon::io::read_csv(ctx, path, options);

LOG(INFO) << "Reading Inside FromCSV";

if (result.ok()) {
LOG(INFO) << "CSV file reading is OK";
std::shared_ptr<arrow::Table> &table = result.ValueOrDie();
Expand Down Expand Up @@ -291,7 +293,7 @@ Status Table::FromColumns(const std::shared_ptr<CylonContext> &ctx,
arrow::SchemaBuilder schema_builder;
arrow::ArrayVector arrays;

if (columns.size() != column_names.size()){
if (columns.size() != column_names.size()) {
return {Code::Invalid, "number of columns != number of column names"};
}

Expand Down Expand Up @@ -341,7 +343,7 @@ Status Merge(const std::vector<std::shared_ptr<cylon::Table>> &ctables,
if (!ctables.empty()) {
std::vector<std::shared_ptr<arrow::Table>> tables;
tables.reserve(ctables.size());
for (const auto &t: ctables) {
for (const auto &t : ctables) {
if (!t->Empty()) {
tables.push_back(t->get_table());
}
Expand Down Expand Up @@ -413,48 +415,13 @@ Status Sort(const std::shared_ptr<Table> &table, const std::vector<int32_t> &sor
return Table::FromArrowTable(ctx, sorted_table, out);
}

/**
 * Builds a table of rows picked at evenly spaced positions from `local_sorted`,
 * restricted to the columns listed in `sort_columns`.
 *
 * The spacing is rows / (num_samples + 1); the i-th picked row index is the
 * running sum of that spacing truncated to an integer.
 *
 * @param local_sorted   table to sample from
 * @param num_samples    number of rows to pick
 * @param sort_columns   indices of the columns kept in the result
 * @param sample_result  [out] receives the sampled table; it is an empty table
 *                       with the selected columns' schema when `local_sorted`
 *                       has no rows or `num_samples` is 0
 * @param ctx            context used to obtain the Arrow memory pool and to
 *                       construct the output table
 * @return Status::OK() on success, otherwise the failure raised by the
 *         underlying Arrow call
 */
Status SampleTableUniform(const std::shared_ptr<Table> &local_sorted,
                          int num_samples, std::vector<int32_t> sort_columns,
                          std::shared_ptr<Table> &sample_result,
                          const std::shared_ptr<CylonContext> &ctx) {
  auto memory_pool = cylon::ToArrowPool(ctx);

  // Project onto the requested columns first; both exit paths need its schema/data.
  CYLON_ASSIGN_OR_RAISE(auto projected, local_sorted->get_table()->SelectColumns(sort_columns));

  const bool has_work = !(local_sorted->Rows() == 0) && !(num_samples == 0);
  if (!has_work) {
    // Nothing to pick: hand back an empty table carrying the projected schema.
    std::shared_ptr<arrow::Table> empty_table;
    RETURN_CYLON_STATUS_IF_ARROW_FAILED(
        util::CreateEmptyTable(projected->schema(), &empty_table, memory_pool));
    sample_result = std::make_shared<Table>(ctx, std::move(empty_table));
    return Status::OK();
  }

  // Dividing by (num_samples + 1) keeps every truncated position below Rows().
  const double stride = local_sorted->Rows() / (num_samples + 1.0);

  arrow::Int64Builder index_builder(memory_pool);
  RETURN_CYLON_STATUS_IF_ARROW_FAILED(index_builder.Reserve(num_samples));

  // Capacity was reserved above, so the unchecked append is used.
  double position = stride;
  for (int picked = 0; picked < num_samples; ++picked, position += stride) {
    index_builder.UnsafeAppend(static_cast<int64_t>(position));
  }

  CYLON_ASSIGN_OR_RAISE(auto row_indices, index_builder.Finish());
  CYLON_ASSIGN_OR_RAISE(
      auto taken,
      (arrow::compute::Take(projected, row_indices)));

  sample_result = std::make_shared<Table>(ctx, taken.table());
  return Status::OK();
}

template <typename T>
template<typename T>
static int CompareRows(const std::vector<std::unique_ptr<T>> &comparators,
int64_t idx_a,
int64_t idx_b) {
int sz = comparators.size();
if (std::is_same<T, cylon::DualArrayIndexComparator>::value) {
idx_b |= (int64_t)1 << 63;
idx_b |= (int64_t) 1 << 63;
}
for (int i = 0; i < sz; i++) {
int result = comparators[i]->compare(idx_a, idx_b);
Expand Down Expand Up @@ -538,9 +505,14 @@ Status DetermineSplitPoints(
gathered_tables_include_root, sort_columns, sort_orders, merged_table));

int num_split_points =
std::min(merged_table->Rows(), (int64_t)ctx->GetWorldSize() - 1);
std::min(merged_table->Rows(), (int64_t) ctx->GetWorldSize() - 1);

std::shared_ptr<arrow::Table> atable;
RETURN_CYLON_STATUS_IF_ARROW_FAILED(util::SampleTableUniform(merged_table->get_table(),
num_split_points, sort_columns, atable, ctx));

return Table::FromArrowTable(ctx, std::move(atable), split_points);

return SampleTableUniform(merged_table, num_split_points, sort_columns, split_points, ctx);
}

Status GetSplitPoints(std::shared_ptr<Table> &sample_result,
Expand All @@ -564,9 +536,9 @@ Status GetSplitPoints(std::shared_ptr<Table> &sample_result,

// return (index of) first element that is not less than the target element
int64_t tableBinarySearch(
const std::shared_ptr<Table> &sorted_table,
std::unique_ptr<DualTableRowIndexEqualTo>& equal_to,
int64_t split_point_idx, int64_t l) {
const std::shared_ptr<Table> &sorted_table,
std::unique_ptr<DualTableRowIndexEqualTo> &equal_to,
int64_t split_point_idx, int64_t l) {
int64_t r = sorted_table->Rows() - 1;
int L = l;

Expand Down Expand Up @@ -676,18 +648,22 @@ Status DistributedSortRegularSampling(const std::shared_ptr<Table> &table,

// sample the sorted table with sort columns and create a table
int sample_count = ctx->GetWorldSize() * SAMPLING_RATIO;
sample_count = std::min((int64_t)sample_count, table->Rows());
sample_count = std::min((int64_t) sample_count, table->Rows());

// sample_result only contains sorted columns
std::shared_ptr<Table> sample_result;

RETURN_CYLON_STATUS_IF_FAILED(
SampleTableUniform(local_sorted, sample_count, sort_columns, sample_result, ctx));
std::shared_ptr<arrow::Table> sample_result;

RETURN_CYLON_STATUS_IF_ARROW_FAILED(
util::SampleTableUniform(local_sorted->get_table(), sample_count, sort_columns, sample_result, ctx));
// determine split point, split_points only contains sorted columns
std::shared_ptr<Table> split_points;

std::shared_ptr<Table> cylonTable;

Table::FromArrowTable(ctx, std::move(sample_result), cylonTable);

RETURN_CYLON_STATUS_IF_FAILED(GetSplitPoints(
sample_result, sort_direction, split_points));
cylonTable, sort_direction, split_points));

// construct target_partition, partition_hist
RETURN_CYLON_STATUS_IF_FAILED(
Expand All @@ -712,14 +688,16 @@ Status DistributedSortRegularSampling(const std::shared_ptr<Table> &table,
}

Status DistributedSortInitialSampling(const std::shared_ptr<Table> &table,
const std::vector<int> &sort_columns,
std::shared_ptr<Table> &output,
const std::vector<bool> &sort_direction,
SortOptions sort_options) {
const std::vector<int> &sort_columns,
std::shared_ptr<Table> &output,
const std::vector<bool> &sort_direction,
SortOptions sort_options) {
const auto &ctx = table->GetContext();
int world_sz = ctx->GetWorldSize();

std::shared_ptr<arrow::Table> arrow_table, sorted_table;


// first do distributed sort partitioning
if (world_sz == 1) {
arrow_table = table->get_table();
Expand Down Expand Up @@ -766,6 +744,7 @@ Status DistributedSortInitialSampling(const std::shared_ptr<Table> &table,
}

return Table::FromArrowTable(ctx, sorted_table, output);

}

Status DistributedSort(const std::shared_ptr<Table> &table,
Expand All @@ -782,7 +761,7 @@ Status DistributedSort(const std::shared_ptr<Table> &table,
std::shared_ptr<Table> &output,
const std::vector<bool> &sort_direction,
SortOptions sort_options) {
if(sort_options.sort_method == sort_options.INITIAL_SAMPLE) {
if (sort_options.sort_method == sort_options.INITIAL_SAMPLE) {
return DistributedSortInitialSampling(table, sort_columns, output, sort_direction, sort_options);
} else {
return DistributedSortRegularSampling(table, sort_columns, sort_direction, output, sort_options);
Expand Down Expand Up @@ -1210,7 +1189,7 @@ Status FromCSV(const std::shared_ptr<CylonContext> &ctx, const std::vector<std::
read_promise));
}
bool all_passed = true;
for (auto &future: futures) {
for (auto &future : futures) {
auto status = future.first.get();
all_passed &= status.is_ok();
future.second.join();
Expand Down Expand Up @@ -1238,7 +1217,7 @@ Status Project(const std::shared_ptr<Table> &table, const std::vector<int32_t> &
auto table_ = table->get_table();
const auto &ctx = table->GetContext();

for (auto const &col_index: project_columns) {
for (auto const &col_index : project_columns) {
schema_vector.push_back(table_->field(col_index));
column_arrays.push_back(table_->column(col_index));
}
Expand Down Expand Up @@ -1518,7 +1497,6 @@ Status Repartition(const std::shared_ptr<cylon::Table> &table,

auto num_row_scalar = std::make_shared<Scalar>(arrow::MakeScalar(num_row));


RETURN_CYLON_STATUS_IF_FAILED(
table->GetContext()->GetCommunicator()->Allgather(num_row_scalar,
&sizes_cols));
Expand Down Expand Up @@ -1695,7 +1673,7 @@ Status FromParquet(const std::shared_ptr<CylonContext> &ctx, const std::vector<s
read_promise));
}
bool all_passed = true;
for (auto &future: futures) {
for (auto &future : futures) {
auto status = future.first.get();
all_passed &= status.is_ok();
future.second.join();
Expand Down Expand Up @@ -1725,6 +1703,5 @@ Status WriteParquet(const std::shared_ptr<cylon::CylonContext> &ctx_,
return Status(Code::OK);
}


#endif
} // namespace cylon
Loading