Skip to content

Commit

Permalink
feat: add priority
Browse files Browse the repository at this point in the history
  • Loading branch information
chloro-pn committed Jul 10, 2023
1 parent 6252e7a commit 04ad1c5
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 6 deletions.
5 changes: 5 additions & 0 deletions async_simple/Executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ class Executor {
// func will not be executed. In case schedule return true, the executor
// should guarantee that the func would be executed.
virtual bool schedule(Func func) = 0;

virtual bool scheduleWithPriority(Func func, int8_t priority) {
return schedule(std::move(func));
}

// Return true if caller runs in the executor.
virtual bool currentThreadInExecutor() const {
throw std::logic_error("Not implemented");
Expand Down
9 changes: 7 additions & 2 deletions async_simple/coro/Collect.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ struct CollectAllAwaiter {
auto executor = promise_type._executor;
for (size_t i = 0; i < _input.size(); ++i) {
auto& exec = _input[i]._coro.promise()._executor;
int8_t priority = _input[i]._coro.promise()._priority;
if (exec == nullptr) {
exec = executor;
}
Expand All @@ -293,7 +294,7 @@ struct CollectAllAwaiter {
if (Para == true && _input.size() > 1) {
if (exec != nullptr)
AS_LIKELY {
exec->schedule(func);
exec->scheduleWithPriority(std::move(func), priority);
continue;
}
}
Expand Down Expand Up @@ -420,6 +421,7 @@ struct CollectAllVariadicAwaiter {
(
[executor, this](auto& lazy, auto& result) {
auto& exec = lazy._coro.promise()._executor;
int8_t priority = lazy._coro.promise()._priority;
if (exec == nullptr) {
exec = executor;
}
Expand All @@ -434,7 +436,10 @@ struct CollectAllVariadicAwaiter {

if constexpr (Para == true && sizeof...(Ts) > 1) {
if (exec != nullptr)
AS_LIKELY { exec->schedule(std::move(func)); }
AS_LIKELY {
exec->scheduleWithPriority(std::move(func),
priority);
}
else
AS_UNLIKELY { func(); }
} else {
Expand Down
3 changes: 2 additions & 1 deletion async_simple/coro/Dispatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ class DispatchAwaiter {
}
Executor* old_ex = h.promise()._executor;
ChangeLaziessExecutorTo(h, _ex);
bool succ = _ex->schedule(std::move(h));
int8_t priority = h.promise()._priority;
bool succ = _ex->scheduleWithPriority(std::move(h), priority);
// cannot access *this after schedule.
// If the scheduling fails, we must change the executor back to its
// original value, as the user may catch exceptions and handle them
Expand Down
23 changes: 20 additions & 3 deletions async_simple/coro/Lazy.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ struct Yield {};
template <typename T = void>
struct LazyLocals {};

struct Priority {};

namespace detail {
template <class, typename OAlloc, bool Para>
struct CollectAllAwaiter;
Expand Down Expand Up @@ -98,7 +100,8 @@ class LazyPromiseBase {

logicAssert(_executor,
"Yielding is only meaningful with an executor!");
_executor->schedule(std::move(handle));
int8_t priority = handle.promise()._priority;
_executor->scheduleWithPriority(std::move(handle), priority);
}
void await_resume() noexcept {}

Expand All @@ -107,7 +110,8 @@ class LazyPromiseBase {
};

public:
LazyPromiseBase() : _executor(nullptr), _lazy_local(nullptr) {}
LazyPromiseBase()
: _executor(nullptr), _lazy_local(nullptr), _priority(0) {}
// Lazily started, coroutine will not execute until first resume() is called
std::suspend_always initial_suspend() noexcept { return {}; }
FinalAwaiter final_suspend() noexcept { return {}; }
Expand All @@ -127,13 +131,16 @@ class LazyPromiseBase {
return ReadyAwaiter<T*>(static_cast<T*>(_lazy_local));
}

auto await_transform(Priority) { return ReadyAwaiter<int8_t>(_priority); }

auto await_transform(Yield) { return YieldAwaiter(_executor); }

/// IMPORTANT: _continuation should be the first member due to the
/// requirement of dbg script.
std::coroutine_handle<> _continuation;
Executor* _executor;
void* _lazy_local;
int8_t _priority;
};

template <typename T>
Expand Down Expand Up @@ -290,6 +297,8 @@ class LazyBase {
PromiseType>::value) {
this->_handle.promise()._lazy_local =
continuation.promise()._lazy_local;
this->_handle.promise()._priority =
continuation.promise()._priority;
}
return awaitSuspendImpl();
}
Expand All @@ -300,7 +309,8 @@ class LazyBase {
// executor schedule performed
auto& pr = this->_handle.promise();
logicAssert(pr._executor, "RescheduleLazy need executor");
pr._executor->schedule(this->_handle);
int8_t priority = pr._priority;
pr._executor->scheduleWithPriority(this->_handle, priority);
} else {
return this->_handle;
}
Expand Down Expand Up @@ -532,6 +542,13 @@ class [[nodiscard]] RescheduleLazy
return RescheduleLazy<T>(std::exchange(this->_coro, nullptr));
}

RescheduleLazy<T> setPriority(int8_t priority) && {
logicAssert(this->_coro.operator bool(),
"Lazy do not have a coroutine_handle");
this->_coro.promise()._priority = priority;
return RescheduleLazy<T>(std::exchange(this->_coro, nullptr));
}

[[deprecated(
"RescheduleLazy should be only allowed in DetachedCoroutine")]] auto
operator co_await() {
Expand Down
10 changes: 10 additions & 0 deletions async_simple/coro/test/LazyTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,16 @@ TEST_F(LazyTest, testYield) {
m2.unlock();
}

TEST_F(LazyTest, testPriority) {
int8_t priority = 0;
auto test = [&]() -> Lazy<void> {
priority = co_await Priority{};
co_return;
};
syncAwait(test().via(&_executor).setPriority(100));
EXPECT_EQ(priority, 100);
}

TEST_F(LazyTest, testVoid) {
std::atomic<int> value = 0;
auto test = [this, &value]() -> Lazy<> {
Expand Down

0 comments on commit 04ad1c5

Please sign in to comment.