diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index c2532047..8f8a9696 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -6,9 +6,9 @@ add_library(arcae SHARED arcae/configuration.cc arcae/descriptor.cc arcae/data_partition.cc - arcae/group_sort.cc arcae/isolated_table_proxy.cc arcae/new_table_proxy.cc + arcae/partition_sort.cc arcae/read_impl.cc arcae/write_impl.cc arcae/result_shape.cc diff --git a/cpp/arcae/group_sort.cc b/cpp/arcae/partition_sort.cc similarity index 90% rename from cpp/arcae/group_sort.cc rename to cpp/arcae/partition_sort.cc index 6cfc4ff3..1e56471b 100644 --- a/cpp/arcae/group_sort.cc +++ b/cpp/arcae/partition_sort.cc @@ -1,4 +1,4 @@ -#include "group_sort.h" +#include "arcae/partition_sort.h" #include #include @@ -34,13 +34,13 @@ namespace arcae { namespace { -static constexpr char kArrayIsNull[] = "GroupSortData array is null"; -static constexpr char kLengthMismatch[] = "GroupSortData length mismatch"; -static constexpr char kHasNulls[] = "GroupSortData has nulls"; +static constexpr char kArrayIsNull[] = "PartitionSortData array is null"; +static constexpr char kLengthMismatch[] = "PartitionSortData length mismatch"; +static constexpr char kHasNulls[] = "PartitionSortData has nulls"; } // namespace -Result> GroupSortData::Make( +Result> PartitionSortData::Make( const std::vector>& groups, const std::shared_ptr& time, const std::shared_ptr& ant1, const std::shared_ptr& ant2, const std::shared_ptr& rows) { @@ -73,14 +73,14 @@ Result> GroupSortData::Make( groups_int32.push_back(std::dynamic_pointer_cast(group)); } - return std::make_shared>( + return std::make_shared>( std::move(groups_int32), std::dynamic_pointer_cast(time), std::dynamic_pointer_cast(ant1), std::dynamic_pointer_cast(ant2), std::dynamic_pointer_cast(rows)); } -Result> GroupSortData::Sort() const { +Result> PartitionSortData::Sort() const { std::vector groups; groups.reserve(groups_.size()); for (const auto& g : groups_) groups.push_back(g->raw_values()); @@ 
-138,7 +138,7 @@ Result> GroupSortData::Sort() const { DoCopy(ant2_span, ant2); DoCopy(rows_span, rows); - return std::make_shared>( + return std::make_shared>( std::move(group_arrays), std::make_shared(nrow, std::move(time_buffer)), std::make_shared(nrow, std::move(ant1_buffer)), @@ -146,7 +146,7 @@ Result> GroupSortData::Sort() const { std::make_shared(nrow, std::move(rows_buffer))); } -std::shared_ptr GroupSortData::ToTable() const { +std::shared_ptr
PartitionSortData::ToTable() const { std::vector> arrays; std::vector> fields; @@ -172,14 +172,14 @@ std::shared_ptr
GroupSortData::ToTable() const { return Table::Make(schema(std::move(fields)), std::move(arrays)); } -Result> MergeGroups( - const std::vector>& group_data) { +Result> MergePartitions( + const std::vector>& group_data) { if (group_data.empty()) - return std::make_shared>( - GroupSortData::GroupsType{}, nullptr, nullptr, nullptr, nullptr); + return std::make_shared>( + PartitionSortData::GroupsType{}, nullptr, nullptr, nullptr, nullptr); struct MergeData { - GroupSortData* group_; + PartitionSortData* group_; std::int64_t r; inline std::int32_t group(std::size_t g, std::int64_t r) const { @@ -263,7 +263,7 @@ Result> MergeGroups( } } - return std::make_shared>( + return std::make_shared>( std::move(group_arrays), std::make_shared(nrows, std::move(time_buffer)), std::make_shared(nrows, std::move(ant1_buffer)), diff --git a/cpp/arcae/group_sort.h b/cpp/arcae/partition_sort.h similarity index 59% rename from cpp/arcae/group_sort.h rename to cpp/arcae/partition_sort.h index 8c514e48..901b2a06 100644 --- a/cpp/arcae/group_sort.h +++ b/cpp/arcae/partition_sort.h @@ -11,7 +11,15 @@ namespace arcae { -struct GroupSortData { +// Structure for storing data to sort the rows of a Measurement Set +// partition by a number of grouping columns (e.g. DATA_DESC_ID, FIELD_ID), +// as well as TIME, ANTENNA1 and ANTENNA2. +// +// The sorting functionality could be provided by an arrow Table. +// However, arrow's Table merging functionality is not publicly exposed, +// so PartitionSortData is used to provide a structure that can be +// used in a k-way merge. 
+struct PartitionSortData { using GroupsType = std::vector>; GroupsType groups_; std::shared_ptr time_; @@ -27,8 +35,8 @@ struct GroupSortData { inline std::int32_t ant2(std::size_t row) const { return ant2_->raw_values()[row]; } inline std::int64_t rows(std::size_t row) const { return rows_->raw_values()[row]; } - // Create the GroupSortData from grouping and sorting arrays - static arrow::Result> Make( + // Create the PartitionSortData from grouping and sorting arrays + static arrow::Result> Make( const std::vector>& groups, const std::shared_ptr& time, const std::shared_ptr& ant1, @@ -44,12 +52,18 @@ struct GroupSortData { // Number of rows in the group std::int64_t nRows() const { return rows_->length(); } - // Sort the Group - arrow::Result> Sort() const; + // Sort the Group in the following order (ascending) + // 1. Each GROUP column + // 2. TIME + // 3. ANTENNA1 + // 4. ANTENNA2 + arrow::Result> Sort() const; }; -arrow::Result> MergeGroups( - const std::vector>& group_data); +// Do a k-way merge of the given partitions +// which should have been sorted by a call to PartitionSortData::Sort() +arrow::Result> MergePartitions( + const std::vector>& partitions); } // namespace arcae diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 4f53c8be..0d5afe8c 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -31,9 +31,9 @@ add_executable(dev_transpose_test dev_transpose_test.cc) target_link_libraries(dev_transpose_test PRIVATE GTest::gtest_main arcae absl::time absl::str_format test_utils) add_test(dev_transpose_test dev_transpose_test) -add_executable(group_sort_test group_sort_test.cc) -target_link_libraries(group_sort_test PRIVATE GTest::gtest_main arcae test_utils) -add_test(group_sort_test group_sort_test) +add_executable(partition_sort_test partition_sort_test.cc) +target_link_libraries(partition_sort_test PRIVATE GTest::gtest_main arcae test_utils) +add_test(partition_sort_test partition_sort_test) 
add_executable(new_table_proxy_test new_table_proxy_test.cc) target_link_libraries(new_table_proxy_test PRIVATE GTest::gtest_main arcae test_utils) @@ -42,7 +42,7 @@ add_test(new_table_proxy_test new_table_proxy_test) set_tests_properties(result_shape_test data_partition_test dev_transpose_test - group_sort_test + partition_sort_test isolated_table_proxy_test new_table_proxy_test parallel_write_test diff --git a/cpp/tests/group_sort_test.cc b/cpp/tests/partition_sort_test.cc similarity index 79% rename from cpp/tests/group_sort_test.cc rename to cpp/tests/partition_sort_test.cc index 43c2650d..be342828 100644 --- a/cpp/tests/group_sort_test.cc +++ b/cpp/tests/partition_sort_test.cc @@ -1,4 +1,4 @@ -#include "arcae/group_sort.h" +#include "arcae/partition_sort.h" #include @@ -12,8 +12,8 @@ using ::arrow::ipc::internal::json::ArrayFromJSON; -using ::arcae::GroupSortData; -using ::arcae::MergeGroups; +using ::arcae::MergePartitions; +using ::arcae::PartitionSortData; namespace { @@ -35,9 +35,10 @@ TEST(GroupSortTest, TestSort) { ASSERT_OK_AND_ASSIGN(auto rows, ArrayFromJSON(arrow::int64(), "[1, 2, 3, 4, 5, 6, 7, 8, 9]")); - ASSERT_OK_AND_ASSIGN(auto base, GroupSortData::Make(groups, time, ant1, ant2, rows)); + ASSERT_OK_AND_ASSIGN(auto base, + PartitionSortData::Make(groups, time, ant1, ant2, rows)); ASSERT_OK_AND_ASSIGN(auto sorted, base->Sort()); - ASSERT_OK_AND_ASSIGN(auto merged, MergeGroups({sorted, sorted, sorted, sorted})); + ASSERT_OK_AND_ASSIGN(auto merged, MergePartitions({sorted, sorted, sorted, sorted})); } } // namespace diff --git a/src/arcae/lib/arrow_tables.pxd b/src/arcae/lib/arrow_tables.pxd index e3dae4bb..b7bbbf5a 100644 --- a/src/arcae/lib/arrow_tables.pxd +++ b/src/arcae/lib/arrow_tables.pxd @@ -51,23 +51,23 @@ cdef extern from "arcae/descriptor.h" namespace "arcae" nogil: cdef CResult[string] CMSDescriptor" arcae::MSDescriptor"( const string & table, bool complete) -cdef extern from "arcae/group_sort.h" namespace "arcae" nogil: - cdef cppclass 
CGroupSortData" arcae::GroupSortData": +cdef extern from "arcae/partition_sort.h" namespace "arcae" nogil: + cdef cppclass CPartitionSortData" arcae::PartitionSortData": @staticmethod - CResult[shared_ptr[CGroupSortData]] Make" GroupSortData::Make"( + CResult[shared_ptr[CPartitionSortData]] Make" PartitionSortData::Make"( const vector[shared_ptr[CArray]] & columns, const shared_ptr[CArray] & time, const shared_ptr[CArray] & ant1, const shared_ptr[CArray] & ant2, const shared_ptr[CArray] & rows) - size_t nGroups" GroupSortData::nGroups"() - size_t nRows" GroupSortData::nRows"() - CResult[shared_ptr[CGroupSortData]] Sort" GroupSortData::Sort"() - shared_ptr[CTable] ToTable" GroupSortData::ToTable"() + size_t nGroups" PartitionSortData::nGroups"() + size_t nRows" PartitionSortData::nRows"() + CResult[shared_ptr[CPartitionSortData]] Sort" PartitionSortData::Sort"() + shared_ptr[CTable] ToTable" PartitionSortData::ToTable"() - CResult[shared_ptr[CGroupSortData]] MergeGroups" MergeGroups"( - const vector[shared_ptr[CGroupSortData]] & group_data) + CResult[shared_ptr[CPartitionSortData]] MergePartitions" MergePartitions"( + const vector[shared_ptr[CPartitionSortData]] & partitions) cdef extern from "arcae/new_table_proxy.h" namespace "arcae" nogil: diff --git a/src/arcae/lib/arrow_tables.pyx b/src/arcae/lib/arrow_tables.pyx index 0935dd19..3d7ad6ca 100644 --- a/src/arcae/lib/arrow_tables.pyx +++ b/src/arcae/lib/arrow_tables.pyx @@ -29,7 +29,7 @@ from pyarrow.lib import (tobytes, frombytes) from arcae.lib.arrow_tables cimport ( CCasaTable, CConfiguration, - CGroupSortData, + CPartitionSortData, CMSDescriptor, CServiceLocator, COpenTable, @@ -37,7 +37,7 @@ from arcae.lib.arrow_tables cimport ( CSelection, CSelectionBuilder, CTaql, - MergeGroups, + MergePartitions, IndexType) @@ -435,8 +435,8 @@ class Configuration(MutableMapping): return config.Size() -cdef class GroupSortData: - cdef shared_ptr[CGroupSortData] c_data +cdef class PartitionSortData: + cdef 
shared_ptr[CPartitionSortData] c_data def __init__( self, @@ -458,19 +458,19 @@ cdef class GroupSortData: with nogil: self.c_data = GetResultValue( - CGroupSortData.Make(c_groups, c_time, c_ant1, c_ant2, c_rows) + CPartitionSortData.Make(c_groups, c_time, c_ant1, c_ant2, c_rows) ) - def sort(self) -> GroupSortData: - cdef shared_ptr[CGroupSortData] c_gsd - cdef GroupSortData gsd + def sort(self) -> PartitionSortData: + cdef shared_ptr[CPartitionSortData] c_psd + cdef PartitionSortData psd with nogil: - c_gsd = GetResultValue(self.c_data.get().Sort()) + c_psd = GetResultValue(self.c_data.get().Sort()) - gsd = GroupSortData.__new__(GroupSortData) - gsd.c_data = c_gsd - return gsd + psd = PartitionSortData.__new__(PartitionSortData) + psd.c_data = c_psd + return psd def to_arrow(self) -> pa.Table: cdef shared_ptr[CTable] table @@ -481,17 +481,17 @@ cdef class GroupSortData: return pyarrow_wrap_table(table) -def merge_groups(groups: Sequence[GroupSortData]) -> GroupSortData: - cdef vector[shared_ptr[CGroupSortData]] c_groups - cdef shared_ptr[CGroupSortData] c_merged - cdef GroupSortData gsd +def merge_partitions(partitions: Sequence[PartitionSortData]) -> PartitionSortData: + cdef vector[shared_ptr[CPartitionSortData]] c_partitions + cdef shared_ptr[CPartitionSortData] c_merged + cdef PartitionSortData psd - for g in groups: - c_groups.push_back(( g).c_data) + for o in partitions: + c_partitions.push_back(( o).c_data) with nogil: - c_merged = GetResultValue(MergeGroups(c_groups)) + c_merged = GetResultValue(MergePartitions(c_partitions)) - gsd = GroupSortData.__new__(GroupSortData) - gsd.c_data = c_merged - return gsd + psd = PartitionSortData.__new__(PartitionSortData) + psd.c_data = c_merged + return psd diff --git a/src/arcae/tests/test_group_sort.py b/src/arcae/tests/test_partition_sort.py similarity index 89% rename from src/arcae/tests/test_group_sort.py rename to src/arcae/tests/test_partition_sort.py index 917ca9d0..a4b13fdc 100644 --- 
a/src/arcae/tests/test_group_sort.py +++ b/src/arcae/tests/test_partition_sort.py @@ -2,7 +2,7 @@ import pyarrow as pa import pytest -from arcae.lib.arrow_tables import GroupSortData, merge_groups +from arcae.lib.arrow_tables import PartitionSortData, merge_partitions SORT_KEYS = [ ("GROUP_0", "ascending"), @@ -26,7 +26,7 @@ def test_sorting(): } ) - gsd = GroupSortData( + gsd = PartitionSortData( [ data["GROUP_0"].combine_chunks(), data["GROUP_1"].combine_chunks(), @@ -64,10 +64,10 @@ def test_merging(n, chunks, seed): gsds = [] - # Split test data into GroupSortData and sort + # Split test data into PartitionSortData and sort for start in range(0, n, chunks): batch = data.slice(start, chunks) - gsd = GroupSortData( + gsd = PartitionSortData( [ batch["GROUP_0"].combine_chunks(), batch["GROUP_1"].combine_chunks(), @@ -81,4 +81,4 @@ def test_merging(n, chunks, seed): gsds.append(gsd.sort()) # Test that merging matches sorted data - assert merge_groups(gsds).to_arrow().equals(data.sort_by(SORT_KEYS)) + assert merge_partitions(gsds).to_arrow().equals(data.sort_by(SORT_KEYS))