Skip to content

Commit

Permalink
cylondata#610: Implements requests change to address SIG Faults. Also…
Browse files Browse the repository at this point in the history
… applied google formatter per PR request. Validated tests complete successfully.
  • Loading branch information
mstaylor committed Sep 27, 2022
1 parent aef19ea commit 44f6935
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 101 deletions.
75 changes: 39 additions & 36 deletions cpp/src/cylon/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ static inline Status all_to_all_arrow_tables(const std::shared_ptr<CylonContext>

// entries from each RANK are separated
static inline Status all_to_all_arrow_tables_separated_arrow_table(const std::shared_ptr<CylonContext> &ctx,
const std::shared_ptr<arrow::Schema> &schema,
const std::vector<std::shared_ptr<arrow::Table>> &partitioned_tables,
std::vector<std::shared_ptr<arrow::Table>> &received_tables) {
const std::shared_ptr<arrow::Schema> &schema,
const std::vector<std::shared_ptr<arrow::Table>> &partitioned_tables,
std::vector<std::shared_ptr<arrow::Table>> &received_tables) {
const auto &neighbours = ctx->GetNeighbours(true);
received_tables.resize(ctx->GetWorldSize());

Expand Down Expand Up @@ -156,14 +156,14 @@ static inline Status all_to_all_arrow_tables_separated_arrow_table(const std::sh
}

static inline Status all_to_all_arrow_tables_separated_cylon_table(const std::shared_ptr<CylonContext> &ctx,
const std::shared_ptr<arrow::Schema> &schema,
const std::vector<std::shared_ptr<arrow::Table>> &partitioned_tables,
std::vector<std::shared_ptr<Table>> &table_out) {
const std::shared_ptr<arrow::Schema> &schema,
const std::vector<std::shared_ptr<arrow::Table>> &partitioned_tables,
std::vector<std::shared_ptr<Table>> &table_out) {
std::vector<std::shared_ptr<arrow::Table>> received_tables;
all_to_all_arrow_tables_separated_arrow_table(ctx, schema, partitioned_tables, received_tables);

table_out.reserve(received_tables.size() - 1);
for (auto &received_table: received_tables) {
for (auto &received_table : received_tables) {
if (received_table->num_rows() > 0) {
CYLON_ASSIGN_OR_RAISE(auto arrow_tb, received_table->CombineChunks(cylon::ToArrowPool(ctx)));
auto temp = std::make_shared<Table>(ctx, std::move(arrow_tb));
Expand Down Expand Up @@ -237,9 +237,9 @@ static inline Status shuffle_two_tables_by_hashing(const std::shared_ptr<cylon::
Status FromCSV(const std::shared_ptr<CylonContext> &ctx, const std::string &path,
std::shared_ptr<Table> &tableOut, const cylon::io::config::CSVReadOptions &options) {
arrow::Result<std::shared_ptr<arrow::Table>> result = cylon::io::read_csv(ctx, path, options);

LOG(INFO) << "Reading Inside FromCSV";

if (result.ok()) {
LOG(INFO) << "CSV file reading is OK";
std::shared_ptr<arrow::Table> &table = result.ValueOrDie();
Expand Down Expand Up @@ -293,7 +293,7 @@ Status Table::FromColumns(const std::shared_ptr<CylonContext> &ctx,
arrow::SchemaBuilder schema_builder;
arrow::ArrayVector arrays;

if (columns.size() != column_names.size()){
if (columns.size() != column_names.size()) {
return {Code::Invalid, "number of columns != number of column names"};
}

Expand Down Expand Up @@ -343,7 +343,7 @@ Status Merge(const std::vector<std::shared_ptr<cylon::Table>> &ctables,
if (!ctables.empty()) {
std::vector<std::shared_ptr<arrow::Table>> tables;
tables.reserve(ctables.size());
for (const auto &t: ctables) {
for (const auto &t : ctables) {
if (!t->Empty()) {
tables.push_back(t->get_table());
}
Expand Down Expand Up @@ -415,13 +415,13 @@ Status Sort(const std::shared_ptr<Table> &table, const std::vector<int32_t> &sor
return Table::FromArrowTable(ctx, sorted_table, out);
}

template <typename T>
template<typename T>
static int CompareRows(const std::vector<std::unique_ptr<T>> &comparators,
int64_t idx_a,
int64_t idx_b) {
int sz = comparators.size();
if (std::is_same<T, cylon::DualArrayIndexComparator>::value) {
idx_b |= (int64_t)1 << 63;
idx_b |= (int64_t) 1 << 63;
}
for (int i = 0; i < sz; i++) {
int result = comparators[i]->compare(idx_a, idx_b);
Expand Down Expand Up @@ -505,14 +505,13 @@ Status DetermineSplitPoints(
gathered_tables_include_root, sort_columns, sort_orders, merged_table));

int num_split_points =
std::min(merged_table->Rows(), (int64_t)ctx->GetWorldSize() - 1);


std::min(merged_table->Rows(), (int64_t) ctx->GetWorldSize() - 1);

auto status = util::SampleTableUniform(merged_table->get_table(), num_split_points, sort_columns,
const_cast<std::shared_ptr<arrow::Table> &>(split_points->get_table()), ctx);
std::shared_ptr<arrow::Table> atable;
RETURN_CYLON_STATUS_IF_ARROW_FAILED(util::SampleTableUniform(merged_table->get_table(),
num_split_points, sort_columns, atable, ctx));

return cylon::Status(static_cast<int>(status.code()), status.message());
return Table::FromArrowTable(ctx, std::move(atable), split_points);

}

Expand All @@ -537,9 +536,9 @@ Status GetSplitPoints(std::shared_ptr<Table> &sample_result,

// return (index of) first element that is not less than the target element
int64_t tableBinarySearch(
const std::shared_ptr<Table> &sorted_table,
std::unique_ptr<DualTableRowIndexEqualTo>& equal_to,
int64_t split_point_idx, int64_t l) {
const std::shared_ptr<Table> &sorted_table,
std::unique_ptr<DualTableRowIndexEqualTo> &equal_to,
int64_t split_point_idx, int64_t l) {
int64_t r = sorted_table->Rows() - 1;
int L = l;

Expand Down Expand Up @@ -649,19 +648,22 @@ Status DistributedSortRegularSampling(const std::shared_ptr<Table> &table,

// sample the sorted table with sort columns and create a table
int sample_count = ctx->GetWorldSize() * SAMPLING_RATIO;
sample_count = std::min((int64_t)sample_count, table->Rows());
sample_count = std::min((int64_t) sample_count, table->Rows());

// sample_result only contains sorted columns
std::shared_ptr<arrow::Table> sample_result;


RETURN_CYLON_STATUS_IF_ARROW_FAILED(
util::SampleTableUniform(local_sorted->get_table(), sample_count, sort_columns, sample_result, ctx));

// determine split point, split_points only contains sorted columns
std::shared_ptr<Table> split_points;

std::shared_ptr<Table> cylonTable;

Table::FromArrowTable(ctx, std::move(sample_result), cylonTable);

RETURN_CYLON_STATUS_IF_FAILED(GetSplitPoints(
cylon_sample_result, sort_direction, split_points));
cylonTable, sort_direction, split_points));

// construct target_partition, partition_hist
RETURN_CYLON_STATUS_IF_FAILED(
Expand All @@ -686,14 +688,16 @@ Status DistributedSortRegularSampling(const std::shared_ptr<Table> &table,
}

Status DistributedSortInitialSampling(const std::shared_ptr<Table> &table,
const std::vector<int> &sort_columns,
std::shared_ptr<Table> &output,
const std::vector<bool> &sort_direction,
SortOptions sort_options) {
const std::vector<int> &sort_columns,
std::shared_ptr<Table> &output,
const std::vector<bool> &sort_direction,
SortOptions sort_options) {
const auto &ctx = table->GetContext();
int world_sz = ctx->GetWorldSize();

std::shared_ptr<arrow::Table> arrow_table, sorted_table;


// first do distributed sort partitioning
if (world_sz == 1) {
arrow_table = table->get_table();
Expand Down Expand Up @@ -740,6 +744,7 @@ Status DistributedSortInitialSampling(const std::shared_ptr<Table> &table,
}

return Table::FromArrowTable(ctx, sorted_table, output);

}

Status DistributedSort(const std::shared_ptr<Table> &table,
Expand All @@ -756,7 +761,7 @@ Status DistributedSort(const std::shared_ptr<Table> &table,
std::shared_ptr<Table> &output,
const std::vector<bool> &sort_direction,
SortOptions sort_options) {
if(sort_options.sort_method == sort_options.INITIAL_SAMPLE) {
if (sort_options.sort_method == sort_options.INITIAL_SAMPLE) {
return DistributedSortInitialSampling(table, sort_columns, output, sort_direction, sort_options);
} else {
return DistributedSortRegularSampling(table, sort_columns, sort_direction, output, sort_options);
Expand Down Expand Up @@ -1184,7 +1189,7 @@ Status FromCSV(const std::shared_ptr<CylonContext> &ctx, const std::vector<std::
read_promise));
}
bool all_passed = true;
for (auto &future: futures) {
for (auto &future : futures) {
auto status = future.first.get();
all_passed &= status.is_ok();
future.second.join();
Expand Down Expand Up @@ -1212,7 +1217,7 @@ Status Project(const std::shared_ptr<Table> &table, const std::vector<int32_t> &
auto table_ = table->get_table();
const auto &ctx = table->GetContext();

for (auto const &col_index: project_columns) {
for (auto const &col_index : project_columns) {
schema_vector.push_back(table_->field(col_index));
column_arrays.push_back(table_->column(col_index));
}
Expand Down Expand Up @@ -1492,7 +1497,6 @@ Status Repartition(const std::shared_ptr<cylon::Table> &table,

auto num_row_scalar = std::make_shared<Scalar>(arrow::MakeScalar(num_row));


RETURN_CYLON_STATUS_IF_FAILED(
table->GetContext()->GetCommunicator()->Allgather(num_row_scalar,
&sizes_cols));
Expand Down Expand Up @@ -1669,7 +1673,7 @@ Status FromParquet(const std::shared_ptr<CylonContext> &ctx, const std::vector<s
read_promise));
}
bool all_passed = true;
for (auto &future: futures) {
for (auto &future : futures) {
auto status = future.first.get();
all_passed &= status.is_ok();
future.second.join();
Expand Down Expand Up @@ -1699,6 +1703,5 @@ Status WriteParquet(const std::shared_ptr<cylon::CylonContext> &ctx_,
return Status(Code::OK);
}


#endif
} // namespace cylon
Loading

0 comments on commit 44f6935

Please sign in to comment.