Skip to content

Commit

Permalink
added new version of wait() for eventuals and added non-blocking bulk…
Browse files Browse the repository at this point in the history
… transfers
  • Loading branch information
mdorier committed Dec 31, 2024
1 parent b3be1e7 commit ff6581f
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 3 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# -DCMAKE_PREFIX_PATH='/dir1;/dir2;/dir3'
#

cmake_minimum_required (VERSION 3.7)
cmake_minimum_required (VERSION 3.10)
project (thallium C CXX)
enable_testing ()

Expand Down
129 changes: 129 additions & 0 deletions include/thallium/bulk.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,67 @@ class endpoint;
class remote_bulk;
class bulk_segment;


/**
* @brief The async_bulk_op class is returned by the push_to and pull_from
* methods and track a non-blocking RDMA operation.
*/
class async_bulk_op {

friend class remote_bulk;

public:

bool test() const {
if(m_request == MARGO_REQUEST_NULL)
throw exception{"Calling async_bulk_op::test() on a null request"};
int flag;
int ret = margo_test(m_request, &flag);
MARGO_ASSERT((hg_return_t)ret, margo_test);
return flag != 0;
}

std::size_t wait() {
if(m_request == MARGO_REQUEST_NULL)
throw exception{"Calling async_bulk_op::wait() on a null request"};
hg_return_t ret = margo_wait(m_request);
MARGO_ASSERT(ret, margo_wait);
return m_tranferred_size;
}

async_bulk_op(const async_bulk_op&) = delete;

async_bulk_op(async_bulk_op&& other)
: m_request{std::exchange(other.m_request, MARGO_REQUEST_NULL)}
{}

async_bulk_op& operator=(const async_bulk_op&) = delete;

async_bulk_op& operator=(async_bulk_op&& other) {
if(&other == this || m_request == other.m_request) return *this;
if(m_request != MARGO_REQUEST_NULL)
wait();
m_request = std::exchange(other.m_request, MARGO_REQUEST_NULL);
return *this;
}

~async_bulk_op() {
if(m_request != MARGO_REQUEST_NULL) {
wait();
}
}

private:

async_bulk_op(std::size_t size, margo_request req)
: m_tranferred_size{size}
, m_request{req}
{}

std::size_t m_tranferred_size;
margo_request m_request;
};

/**
* @brief bulk objects represent abstractions of memory
* segments exposed by a process for RDMA operations. A bulk
Expand Down Expand Up @@ -194,6 +255,19 @@ class bulk {
*/
std::size_t operator>>(const remote_bulk& b) const;

/**
* @brief Pushes data from the left operand (entire bulk object)
* to the right operand (remote_bulk). If the size of the
* segments don't match, the smallest size is used.
* This operation is non-blocking, returning an async_bulk_op
* object that the caller can wait on.
*
* @param b remote_bulk object towards which to push data.
*
* @return an async_bulk_op object.
*/
async_bulk_op push_to(const remote_bulk& b) const;

/**
* @brief Pulls data from the right operand (remote_bulk)
* to the left operand (bulk). If the size of the
Expand All @@ -205,6 +279,19 @@ class bulk {
*/
std::size_t operator<<(const remote_bulk& b) const;

/**
* @brief Pulls data from the right operand (remote_bulk)
* to the left operand (bulk). If the size of the
* segments don't match, the smallest size is used.
* This operation is non-blocking, returning an async_bulk_op
* object that the caller can wait on.
*
* @param b remote_bulk object from which to pull data.
*
* @return an async_bulk_op object.
*/
async_bulk_op pull_from(const remote_bulk& b) const;

/**
* @brief Returns the underlying hg_bulk_t handle.
* If copy == false, the returned handle is a reference to the internal
Expand Down Expand Up @@ -320,6 +407,19 @@ class bulk_segment {
*/
std::size_t operator>>(const remote_bulk& b) const;

/**
* @brief Pushes data from the left operand (bulk_segment)
* to the right operand (remote_bulk). If the size of the
* segments don't match, the smallest size is used.
* This operation is non-blocking, returning an async_bulk_op
* object that the caller can wait on.
*
* @param b remote_bulk object towards which to push data.
*
* @return an async_bulk_op object.
*/
async_bulk_op push_to(const remote_bulk& b) const;

/**
* @brief Pulls data from the right operand (remote_bulk)
* to the right operand (bulk_segment). If the size of the
Expand All @@ -331,6 +431,19 @@ class bulk_segment {
*/
std::size_t operator<<(const remote_bulk& b) const;

/**
* @brief Pulls data from the right operand (remote_bulk)
* to the left operand (bulk_segment). If the size of the
* segments don't match, the smallest size is used.
* This operation is non-blocking, returning an async_bulk_op
* object that the caller can wait on.
*
* @param b remote_bulk object from which to pull data.
*
* @return an async_bulk_op object.
*/
async_bulk_op pull_from(const remote_bulk& b) const;

/**
* @brief Selects a subsegment from this segment. If the size is too
* large, the maximum possible size is chosen.
Expand Down Expand Up @@ -390,18 +503,34 @@ inline std::size_t bulk_segment::operator>>(const remote_bulk& b) const {
return b << *this;
}

inline async_bulk_op bulk_segment::push_to(const remote_bulk& b) const {
return b.push_from(*this);
}

inline std::size_t bulk_segment::operator<<(const remote_bulk& b) const {
return b >> *this;
}

inline async_bulk_op bulk_segment::pull_from(const remote_bulk& b) const {
return b.pull_to(*this);
}

inline std::size_t bulk::operator>>(const remote_bulk& b) const {
return b << (this->select(0, size()));
}

inline async_bulk_op bulk::push_to(const remote_bulk& b) const {
return b.push_from(*this);
}

inline std::size_t bulk::operator<<(const remote_bulk& b) const {
return b >> (this->select(0, size()));
}

inline async_bulk_op bulk::pull_from(const remote_bulk& b) const {
return b.pull_to(*this);
}

} // namespace thallium

#endif
17 changes: 15 additions & 2 deletions include/thallium/eventual.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,21 @@ template <typename T> class eventual {
*
* @return The value stored in the eventual.
*/
value_type wait() {
value_type wait() const & {
TL_EVENTUAL_ASSERT(ABT_eventual_wait(m_eventual, nullptr));
return m_value;
}

/**
* @brief Wait on the eventual.
*
* @return The value stored in the eventual.
*/
value_type&& wait() const && {
TL_EVENTUAL_ASSERT(ABT_eventual_wait(m_eventual, nullptr));
return std::move(m_value);
}

/**
* @brief Test the eventual.
*/
Expand All @@ -152,7 +162,10 @@ template <typename T> class eventual {
/**
* @brief Reset the eventual.
*/
void reset() { TL_EVENTUAL_ASSERT(ABT_eventual_reset(m_eventual)); }
void reset() {
m_value = value_type{};
TL_EVENTUAL_ASSERT(ABT_eventual_reset(m_eventual));
}
};

/**
Expand Down
72 changes: 72 additions & 0 deletions include/thallium/remote_bulk.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,19 @@ class remote_bulk {
*/
std::size_t operator>>(const bulk_segment& dest) const;

/**
* @brief Pulls data from the left operand (remote_bulk)
* to the right operand (bulk_segment). If the size of the
* segments don't match, the smallest size is used.
* This operation is non-blocking, returning an async_bulk_op
* object that the caller can wait on.
*
* @param dest bulk_segment object towards which to pull data.
*
* @return an async_bulk_op object.
*/
async_bulk_op pull_to(const bulk_segment& dest) const;

/**
* @brief Performs a push operation from the source bulk
* (right operand) to the remote_bulk (left operand).
Expand All @@ -85,6 +98,19 @@ class remote_bulk {
*/
std::size_t operator<<(const bulk_segment& src) const;

/**
* @brief Pushes data from the right operand (bulk_segment)
* to the left operand (remote_bulk). If the size of the
* segments don't match, the smallest size is used.
* This operation is non-blocking, returning an async_bulk_op
* object that the caller can wait on.
*
* @param src Local bulk segment from which to push data.
*
* @return an async_bulk_op object.
*/
async_bulk_op push_from(const bulk_segment& src) const;

/**
* @brief Creates a bulk_segment object by selecting a given portion
* of the bulk object given an offset and a size.
Expand Down Expand Up @@ -134,6 +160,29 @@ inline std::size_t remote_bulk::operator>>(const bulk_segment& dest) const {
return size;
}

inline async_bulk_op remote_bulk::pull_to(const bulk_segment& dest) const {

margo_instance_id mid = m_endpoint.m_mid;
hg_bulk_op_t op = HG_BULK_PULL;
hg_addr_t origin_addr = m_endpoint.m_addr;
hg_bulk_t origin_handle = m_segment.m_bulk.m_bulk;
size_t origin_offset = m_segment.m_offset;
hg_bulk_t local_handle = dest.m_bulk.m_bulk;
size_t local_offset = dest.m_offset;
size_t size = dest.m_size;
margo_request req = MARGO_REQUEST_NULL;

if(size > m_segment.m_size)
size = m_segment.m_size;

hg_return_t ret =
margo_bulk_itransfer(mid, op, origin_addr, origin_handle, origin_offset,
local_handle, local_offset, size, &req);
MARGO_ASSERT(ret, margo_bulk_itransfer);

return async_bulk_op{size, req};
}

inline std::size_t remote_bulk::operator<<(const bulk_segment& src) const {

margo_instance_id mid = m_endpoint.m_mid;
Expand All @@ -156,6 +205,29 @@ inline std::size_t remote_bulk::operator<<(const bulk_segment& src) const {
return size;
}

inline async_bulk_op remote_bulk::push_from(const bulk_segment& src) const {

margo_instance_id mid = m_endpoint.m_mid;
hg_bulk_op_t op = HG_BULK_PUSH;
hg_addr_t origin_addr = m_endpoint.m_addr;
hg_bulk_t origin_handle = m_segment.m_bulk.m_bulk;
size_t origin_offset = m_segment.m_offset;
hg_bulk_t local_handle = src.m_bulk.m_bulk;
size_t local_offset = src.m_offset;
size_t size = src.m_size;
margo_request req = MARGO_REQUEST_NULL;

if(size > m_segment.m_size)
size = m_segment.m_size;

hg_return_t ret =
margo_bulk_itransfer(mid, op, origin_addr, origin_handle, origin_offset,
local_handle, local_offset, size, &req);
MARGO_ASSERT(ret, margo_bulk_itransfer);

return async_bulk_op{size, req};
}

} // namespace thallium

#endif

0 comments on commit ff6581f

Please sign in to comment.