Skip to content

Commit

Permalink
refactor syncAwait
Browse files Browse the repository at this point in the history
  • Loading branch information
chloro-pn committed Dec 25, 2024
1 parent 35f68b5 commit 5c0f87f
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 12 deletions.
3 changes: 3 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,8 @@ cc_library(
"@platforms//os:windows": [],
"//conditions:default": ["-lpthread"],
}),
deps = [
":simple_executors",
],
visibility = ["//visibility:public"],
)
1 change: 1 addition & 0 deletions async_simple/coro/Lazy.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ class LazyBase {
using promise_type = detail::LazyPromise<T>;
using Handle = CoroHandle<promise_type>;
using ValueType = T;
static constexpr bool isReschedule = reschedule;

struct AwaiterBase : public detail::LazyAwaiterBase<T> {
using Base = detail::LazyAwaiterBase<T>;
Expand Down
45 changes: 43 additions & 2 deletions async_simple/coro/SyncAwait.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@
#ifndef ASYNC_SIMPLE_CORO_SYNC_AWAIT_H
#define ASYNC_SIMPLE_CORO_SYNC_AWAIT_H

#include <condition_variable>
#include <mutex>
#include <type_traits>
#include <utility>
#include "async_simple/coro/Lazy.h"
#ifndef ASYNC_SIMPLE_USE_MODULES
#include "async_simple/Common.h"
#include "async_simple/Executor.h"
#include "async_simple/Try.h"
#include "async_simple/executors/LocalExecutor.h"
#include "async_simple/util/Condition.h"

#endif // ASYNC_SIMPLE_USE_MODULES
Expand All @@ -31,14 +37,15 @@ namespace coro {
// be returned. Do not syncAwait in the same executor with Lazy, this may lead
// to a deadlock.
template <typename LazyType>
inline auto syncAwait(LazyType &&lazy) {
requires std::remove_cvref_t<LazyType>::isReschedule inline auto syncAwait(
LazyType &&lazy) {
auto executor = lazy.getExecutor();
if (executor)
logicAssert(!executor->currentThreadInExecutor(),
"do not sync await in the same executor with Lazy");

util::Condition cond;
using ValueType = typename std::decay_t<LazyType>::ValueType;
using ValueType = typename std::remove_cvref_t<LazyType>::ValueType;

Try<ValueType> value;
std::move(std::forward<LazyType>(lazy))
Expand All @@ -50,6 +57,40 @@ inline auto syncAwait(LazyType &&lazy) {
return std::move(value).value();
}

/*
* note:
* 有问题,因为用户可能自定义awaiter,将resume交给其他执行流执行,因此不能确定loop退出后还有无协程调度,这种情况下会导致死锁。
* 并且调度器需要保证线程安全,因为可能resume的时候将会在其他线程schedule。
* 解决办法:1.
* cv+mutex+quit_flag,这种情况对性能有损伤(不过现代架构下锁争抢是性能衰退的主要原因,无冲突的加解锁可以接受)。
* 2.
* 由用户保证awaiter不会在其他线程/执行流resume和schedule,这种情况没法显式约束。
*/
/*
* update:
* 目前是cv+mutex_quit_flag的实现。
*/
template <typename LazyType>
requires(!std::remove_cvref_t<LazyType>::isReschedule) inline auto syncAwait(
LazyType &&lazy) {
std::condition_variable cv;
std::mutex mut;
bool quit_flag{false};
executors::LocalExecutor local_ex(cv, mut, quit_flag);
using ValueType = typename std::remove_cvref_t<LazyType>::ValueType;
Try<ValueType> value;
std::move(std::forward<LazyType>(lazy))
.via(&local_ex)
.start([&cv, &mut, &quit_flag, &value](Try<ValueType> result) {
value = std::move(result);
std::unique_lock<std::mutex> lock(mut);
quit_flag = true;
cv.notify_all();
});
local_ex.Loop();
return std::move(value).value();
}

// A simple wrapper to ease the use.
template <typename LazyType>
inline auto syncAwait(LazyType &&lazy, Executor *ex) {
Expand Down
7 changes: 2 additions & 5 deletions async_simple/coro/test/LocalExecutorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include <coroutine>
#include "async_simple/Try.h"
#include "async_simple/coro/Lazy.h"
#include "async_simple/executors/LocalExecutor.h"
#include "async_simple/coro/SyncAwait.h"

#include "async_simple/test/unittest.h"

Expand Down Expand Up @@ -64,10 +64,7 @@ Lazy<int> coro_compute() {
}

TEST(LocalExecutorTest, testStackOverflow) {
executors::LocalExecutor ex;
int result{0};
coro_compute().via(&ex).start([&](Try<int> i) { result = i.value(); });
ex.Loop();
int result = syncAwait(coro_compute());
EXPECT_EQ(result, 49950000);
}

Expand Down
27 changes: 22 additions & 5 deletions async_simple/executors/LocalExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
#ifndef ASYNC_LOCAL_EXECUTOR_H
#define ASYNC_LOCAL_EXECUTOR_H

#include <condition_variable>
#include <deque>
#include <mutex>
#include "async_simple/Executor.h"

namespace async_simple {
Expand All @@ -26,26 +28,41 @@ namespace executors {

class LocalExecutor : public Executor {
public:
LocalExecutor() : Executor("local executor") {}
LocalExecutor(std::condition_variable& cv, std::mutex& mut,
const bool& quit)
: Executor("local executor"), cv_(cv), mut_(mut), quit_(quit) {}

bool schedule(Func func) override {
std::unique_lock<std::mutex> lock(mut_);
ready_queue_.push_back(std::move(func));
cv_.notify_all();
return true;
}

void Loop() {
while (true) {
if (ready_queue_.empty()) {
std::unique_lock<std::mutex> lock(mut_);
cv_.wait(lock, [this]() {
return quit_ == true || !ready_queue_.empty();
});
if (!ready_queue_.empty()) {
Func func = std::move(ready_queue_.front());
ready_queue_.pop_front();
lock.unlock();
func();
continue;
}
if (quit_ == true) {
return;
}
Func func = std::move(ready_queue_.front());
ready_queue_.pop_front();
func();
}
}

private:
std::deque<Func> ready_queue_;
std::condition_variable& cv_;
std::mutex& mut_;
const bool& quit_;
};

} // namespace executors
Expand Down

0 comments on commit 5c0f87f

Please sign in to comment.