Skip to content

Commit

Permalink
Rework
Browse files Browse the repository at this point in the history
  • Loading branch information
sjperkins committed Sep 30, 2024
1 parent f95ec10 commit 63ff00a
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 68 deletions.
2 changes: 1 addition & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ add_library(arcae SHARED
arcae/configuration.cc
arcae/descriptor.cc
arcae/data_partition.cc
arcae/group_sort.cc
arcae/isolated_table_proxy.cc
arcae/new_table_proxy.cc
arcae/partition_sort.cc
arcae/read_impl.cc
arcae/write_impl.cc
arcae/result_shape.cc
Expand Down
30 changes: 15 additions & 15 deletions cpp/arcae/group_sort.cc → cpp/arcae/partition_sort.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include "group_sort.h"
#include "arcae/partition_sort.h"

#include <algorithm>
#include <cstdint>
Expand Down Expand Up @@ -34,13 +34,13 @@ namespace arcae {

namespace {

static constexpr char kArrayIsNull[] = "GroupSortData array is null";
static constexpr char kLengthMismatch[] = "GroupSortData length mismatch";
static constexpr char kHasNulls[] = "GroupSortData has nulls";
static constexpr char kArrayIsNull[] = "PartitionSortData array is null";
static constexpr char kLengthMismatch[] = "PartitionSortData length mismatch";
static constexpr char kHasNulls[] = "PartitionSortData has nulls";

} // namespace

Result<std::shared_ptr<GroupSortData>> GroupSortData::Make(
Result<std::shared_ptr<PartitionSortData>> PartitionSortData::Make(
const std::vector<std::shared_ptr<Array>>& groups, const std::shared_ptr<Array>& time,
const std::shared_ptr<Array>& ant1, const std::shared_ptr<Array>& ant2,
const std::shared_ptr<Array>& rows) {
Expand Down Expand Up @@ -73,14 +73,14 @@ Result<std::shared_ptr<GroupSortData>> GroupSortData::Make(
groups_int32.push_back(std::dynamic_pointer_cast<arrow::Int32Array>(group));
}

return std::make_shared<AggregateAdapter<GroupSortData>>(
return std::make_shared<AggregateAdapter<PartitionSortData>>(
std::move(groups_int32), std::dynamic_pointer_cast<DoubleArray>(time),
std::dynamic_pointer_cast<Int32Array>(ant1),
std::dynamic_pointer_cast<Int32Array>(ant2),
std::dynamic_pointer_cast<Int64Array>(rows));
}

Result<std::shared_ptr<GroupSortData>> GroupSortData::Sort() const {
Result<std::shared_ptr<PartitionSortData>> PartitionSortData::Sort() const {
std::vector<const int*> groups;
groups.reserve(groups_.size());
for (const auto& g : groups_) groups.push_back(g->raw_values());
Expand Down Expand Up @@ -138,15 +138,15 @@ Result<std::shared_ptr<GroupSortData>> GroupSortData::Sort() const {
DoCopy(ant2_span, ant2);
DoCopy(rows_span, rows);

return std::make_shared<AggregateAdapter<GroupSortData>>(
return std::make_shared<AggregateAdapter<PartitionSortData>>(
std::move(group_arrays),
std::make_shared<DoubleArray>(nrow, std::move(time_buffer)),
std::make_shared<Int32Array>(nrow, std::move(ant1_buffer)),
std::make_shared<Int32Array>(nrow, std::move(ant2_buffer)),
std::make_shared<Int64Array>(nrow, std::move(rows_buffer)));
}

std::shared_ptr<Table> GroupSortData::ToTable() const {
std::shared_ptr<Table> PartitionSortData::ToTable() const {
std::vector<std::shared_ptr<Array>> arrays;
std::vector<std::shared_ptr<Field>> fields;

Expand All @@ -172,14 +172,14 @@ std::shared_ptr<Table> GroupSortData::ToTable() const {
return Table::Make(schema(std::move(fields)), std::move(arrays));
}

Result<std::shared_ptr<GroupSortData>> MergeGroups(
const std::vector<std::shared_ptr<GroupSortData>>& group_data) {
Result<std::shared_ptr<PartitionSortData>> MergePartitions(
const std::vector<std::shared_ptr<PartitionSortData>>& group_data) {
if (group_data.empty())
return std::make_shared<AggregateAdapter<GroupSortData>>(
GroupSortData::GroupsType{}, nullptr, nullptr, nullptr, nullptr);
return std::make_shared<AggregateAdapter<PartitionSortData>>(
PartitionSortData::GroupsType{}, nullptr, nullptr, nullptr, nullptr);

struct MergeData {
GroupSortData* group_;
PartitionSortData* group_;
std::int64_t r;

inline std::int32_t group(std::size_t g, std::int64_t r) const {
Expand Down Expand Up @@ -263,7 +263,7 @@ Result<std::shared_ptr<GroupSortData>> MergeGroups(
}
}

return std::make_shared<AggregateAdapter<GroupSortData>>(
return std::make_shared<AggregateAdapter<PartitionSortData>>(
std::move(group_arrays),
std::make_shared<DoubleArray>(nrows, std::move(time_buffer)),
std::make_shared<Int32Array>(nrows, std::move(ant1_buffer)),
Expand Down
28 changes: 21 additions & 7 deletions cpp/arcae/group_sort.h → cpp/arcae/partition_sort.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,15 @@

namespace arcae {

struct GroupSortData {
// Structure for storing data to sort the rows of a Measurement Set
// partition by a number of grouping columns (e.g. DATA_DESC_ID, FIELD_ID),
// as well as TIME, ANTENNA1 and ANTENNA2.
//
// The sorting functionality could be provided by an arrow Table.
// However, arrow's Table merging functionality is not publicly exposed,
// so PartitionSortData is used to provide a structure that can be
// used in a k-way merge.
struct PartitionSortData {
using GroupsType = std::vector<std::shared_ptr<arrow::Int32Array>>;
GroupsType groups_;
std::shared_ptr<arrow::DoubleArray> time_;
Expand All @@ -27,8 +35,8 @@ struct GroupSortData {
inline std::int32_t ant2(std::size_t row) const { return ant2_->raw_values()[row]; }
inline std::int64_t rows(std::size_t row) const { return rows_->raw_values()[row]; }

// Create the GroupSortData from grouping and sorting arrays
static arrow::Result<std::shared_ptr<GroupSortData>> Make(
// Create the PartitionSortData from grouping and sorting arrays
static arrow::Result<std::shared_ptr<PartitionSortData>> Make(
const std::vector<std::shared_ptr<arrow::Array>>& groups,
const std::shared_ptr<arrow::Array>& time,
const std::shared_ptr<arrow::Array>& ant1,
Expand All @@ -44,12 +52,18 @@ struct GroupSortData {
// Number of rows in the group
std::int64_t nRows() const { return rows_->length(); }

// Sort the Group
arrow::Result<std::shared_ptr<GroupSortData>> Sort() const;
// Sort the Group in the following order (ascending)
// 1. Each GROUP column
// 2. TIME
// 3. ANTENNA1
// 4. ANTENNA2
arrow::Result<std::shared_ptr<PartitionSortData>> Sort() const;
};

arrow::Result<std::shared_ptr<GroupSortData>> MergeGroups(
const std::vector<std::shared_ptr<GroupSortData>>& group_data);
// Do a k-way merge of the given partitions
// which should have been sorted by a call to PartitionSortData::Sort()
arrow::Result<std::shared_ptr<PartitionSortData>> MergePartitions(
const std::vector<std::shared_ptr<PartitionSortData>>& partitions);

} // namespace arcae

Expand Down
8 changes: 4 additions & 4 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ add_executable(dev_transpose_test dev_transpose_test.cc)
target_link_libraries(dev_transpose_test PRIVATE GTest::gtest_main arcae absl::time absl::str_format test_utils)
add_test(dev_transpose_test dev_transpose_test)

add_executable(group_sort_test group_sort_test.cc)
target_link_libraries(group_sort_test PRIVATE GTest::gtest_main arcae test_utils)
add_test(group_sort_test group_sort_test)
add_executable(partition_sort_test partition_sort_test.cc)
target_link_libraries(partition_sort_test PRIVATE GTest::gtest_main arcae test_utils)
add_test(partition_sort_test partition_sort_test)

add_executable(new_table_proxy_test new_table_proxy_test.cc)
target_link_libraries(new_table_proxy_test PRIVATE GTest::gtest_main arcae test_utils)
Expand All @@ -42,7 +42,7 @@ add_test(new_table_proxy_test new_table_proxy_test)
set_tests_properties(result_shape_test
data_partition_test
dev_transpose_test
group_sort_test
partition_sort_test
isolated_table_proxy_test
new_table_proxy_test
parallel_write_test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include "arcae/group_sort.h"
#include "arcae/partition_sort.h"

#include <memory>

Expand All @@ -12,8 +12,8 @@

using ::arrow::ipc::internal::json::ArrayFromJSON;

using ::arcae::GroupSortData;
using ::arcae::MergeGroups;
using ::arcae::MergePartitions;
using ::arcae::PartitionSortData;

namespace {

Expand All @@ -35,9 +35,10 @@ TEST(GroupSortTest, TestSort) {
ASSERT_OK_AND_ASSIGN(auto rows,
ArrayFromJSON(arrow::int64(), "[1, 2, 3, 4, 5, 6, 7, 8, 9]"));

ASSERT_OK_AND_ASSIGN(auto base, GroupSortData::Make(groups, time, ant1, ant2, rows));
ASSERT_OK_AND_ASSIGN(auto base,
PartitionSortData::Make(groups, time, ant1, ant2, rows));
ASSERT_OK_AND_ASSIGN(auto sorted, base->Sort());
ASSERT_OK_AND_ASSIGN(auto merged, MergeGroups({sorted, sorted, sorted, sorted}));
ASSERT_OK_AND_ASSIGN(auto merged, MergePartitions({sorted, sorted, sorted, sorted}));
}

} // namespace
18 changes: 9 additions & 9 deletions src/arcae/lib/arrow_tables.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -51,23 +51,23 @@ cdef extern from "arcae/descriptor.h" namespace "arcae" nogil:
cdef CResult[string] CMSDescriptor" arcae::MSDescriptor"(
const string & table, bool complete)

cdef extern from "arcae/group_sort.h" namespace "arcae" nogil:
cdef cppclass CGroupSortData" arcae::GroupSortData":
cdef extern from "arcae/partition_sort.h" namespace "arcae" nogil:
cdef cppclass CPartitionSortData" arcae::PartitionSortData":
@staticmethod
CResult[shared_ptr[CGroupSortData]] Make" GroupSortData::Make"(
CResult[shared_ptr[CPartitionSortData]] Make" PartitionSortData::Make"(
const vector[shared_ptr[CArray]] & columns,
const shared_ptr[CArray] & time,
const shared_ptr[CArray] & ant1,
const shared_ptr[CArray] & ant2,
const shared_ptr[CArray] & rows)

size_t nGroups" GroupSortData::nGroups"()
size_t nRows" GroupSortData::nRows"()
CResult[shared_ptr[CGroupSortData]] Sort" GroupSortData::Sort"()
shared_ptr[CTable] ToTable" GroupSortData::ToTable"()
size_t nGroups" PartitionSortData::nGroups"()
size_t nRows" PartitionSortData::nRows"()
CResult[shared_ptr[CPartitionSortData]] Sort" PartitionSortData::Sort"()
shared_ptr[CTable] ToTable" PartitionSortData::ToTable"()

CResult[shared_ptr[CGroupSortData]] MergeGroups" MergeGroups"(
const vector[shared_ptr[CGroupSortData]] & group_data)
CResult[shared_ptr[CPartitionSortData]] MergePartitions" MergePartitions"(
const vector[shared_ptr[CPartitionSortData]] & partitions)


cdef extern from "arcae/new_table_proxy.h" namespace "arcae" nogil:
Expand Down
44 changes: 22 additions & 22 deletions src/arcae/lib/arrow_tables.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ from pyarrow.lib import (tobytes, frombytes)
from arcae.lib.arrow_tables cimport (
CCasaTable,
CConfiguration,
CGroupSortData,
CPartitionSortData,
CMSDescriptor,
CServiceLocator,
COpenTable,
CDefaultMS,
CSelection,
CSelectionBuilder,
CTaql,
MergeGroups,
MergePartitions,
IndexType)


Expand Down Expand Up @@ -435,8 +435,8 @@ class Configuration(MutableMapping):
return config.Size()


cdef class GroupSortData:
cdef shared_ptr[CGroupSortData] c_data
cdef class PartitionSortData:
cdef shared_ptr[CPartitionSortData] c_data

def __init__(
self,
Expand All @@ -458,19 +458,19 @@ cdef class GroupSortData:

with nogil:
self.c_data = GetResultValue(
CGroupSortData.Make(c_groups, c_time, c_ant1, c_ant2, c_rows)
CPartitionSortData.Make(c_groups, c_time, c_ant1, c_ant2, c_rows)
)

def sort(self) -> GroupSortData:
cdef shared_ptr[CGroupSortData] c_gsd
cdef GroupSortData gsd
def sort(self) -> PartitionSortData:
cdef shared_ptr[CPartitionSortData] c_psd
cdef PartitionSortData psd

with nogil:
c_gsd = GetResultValue(self.c_data.get().Sort())
c_psd = GetResultValue(self.c_data.get().Sort())

gsd = GroupSortData.__new__(GroupSortData)
gsd.c_data = c_gsd
return gsd
psd = PartitionSortData.__new__(PartitionSortData)
psd.c_data = c_psd
return psd

def to_arrow(self) -> pa.Table:
cdef shared_ptr[CTable] table
Expand All @@ -481,17 +481,17 @@ cdef class GroupSortData:
return pyarrow_wrap_table(table)


def merge_groups(groups: Sequence[GroupSortData]) -> GroupSortData:
cdef vector[shared_ptr[CGroupSortData]] c_groups
cdef shared_ptr[CGroupSortData] c_merged
cdef GroupSortData gsd
def merge_partitions(partitions: Sequence[PartitionSortData]) -> PartitionSortData:
cdef vector[shared_ptr[CPartitionSortData]] c_partitions
cdef shared_ptr[CPartitionSortData] c_merged
cdef PartitionSortData psd

for g in groups:
c_groups.push_back((<GroupSortData?> g).c_data)
for o in partitions:
c_partitions.push_back((<PartitionSortData?> o).c_data)

with nogil:
c_merged = GetResultValue(MergeGroups(c_groups))
c_merged = GetResultValue(MergePartitions(c_partitions))

gsd = GroupSortData.__new__(GroupSortData)
gsd.c_data = c_merged
return gsd
psd = PartitionSortData.__new__(PartitionSortData)
psd.c_data = c_merged
return psd
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import pyarrow as pa
import pytest

from arcae.lib.arrow_tables import GroupSortData, merge_groups
from arcae.lib.arrow_tables import PartitionSortData, merge_partitions

SORT_KEYS = [
("GROUP_0", "ascending"),
Expand All @@ -26,7 +26,7 @@ def test_sorting():
}
)

gsd = GroupSortData(
gsd = PartitionSortData(
[
data["GROUP_0"].combine_chunks(),
data["GROUP_1"].combine_chunks(),
Expand Down Expand Up @@ -64,10 +64,10 @@ def test_merging(n, chunks, seed):

gsds = []

# Split test data into GroupSortData and sort
# Split test data into PartitionSortData and sort
for start in range(0, n, chunks):
batch = data.slice(start, chunks)
gsd = GroupSortData(
gsd = PartitionSortData(
[
batch["GROUP_0"].combine_chunks(),
batch["GROUP_1"].combine_chunks(),
Expand All @@ -81,4 +81,4 @@ def test_merging(n, chunks, seed):
gsds.append(gsd.sort())

# Test that merging matches sorted data
assert merge_groups(gsds).to_arrow().equals(data.sort_by(SORT_KEYS))
assert merge_partitions(gsds).to_arrow().equals(data.sort_by(SORT_KEYS))

0 comments on commit 63ff00a

Please sign in to comment.