上一篇文章中我们最后实现了一个 Generic Generator 样例,并且展示了多种用法包括:for loop / send 等,作为本系列的最后一篇我们将实现一个 “稍微” 复杂一些的 Coroutine 功能 —— 一个异步调度框架的原型。
我们稍微回顾一下本系列第一篇文章,印象深刻的朋友一定会问:全文开头画的饼没有实现呀,具体来说下面这个东西,好像和你写的东西不太一样呀?
socket.listen()
conn = await socket.accept()
data = await conn.read()
await conn.write(buffer)
await conn.close()
我在之前的三篇文章中核心是介绍C++ coroutine的用法,而要实现上面类似的功能我们实际上需要一个“异步运行时框架”,当然还有很多其它名字比如:调度器、并行框架等等,其实大部分情况下都是指的一个事情。接下来我们将实现一个非常简单的调度器,展示如何在实际场景中应用Coroutine的能力。
在展示代码之前首先希望大家能思考一个问题:
对于一个Coroutine调度器,其核心是解决什么问题?
我们仔细想想:中断函数执行、保存函数现场、恢复函数执行、异常处理、返回值处理,C++ coroutine机制都已经实现了,我们不需要做任何事情编译器会解决一切,那么还没有解决的是什么?回忆一下在第一篇文章中的几个例子,main函数除了调用 counter 函数、输出值到cout之外还需要做什么?
其实答案是非常明显的,调度器的核心就是:何时、在哪里,恢复 coroutine 的执行?
诚然一个工业级的调度器要考虑非常多细节但是核心还是这个,我们接下来的代码就会展示这一点:
#include
#include
#include
#include
#include
#include
#include
#include
#include
using namespace std::chrono_literals;
using spawn_function = std::function)>;
template
class awaitable {
public:
//
// Promise type
//
class promise_type {
public:
awaitable get_return_object() {
// Create a new awaitable object. It's awaitable's responsible to destroy handle
return awaitable(std::coroutine_handle::from_promise(*this));
}
std::suspend_always initial_suspend() noexcept { return {}; }
std::suspend_always final_suspend() noexcept {
if (spawn_ && caller_handle_) {
// The callee is completed, and we should schedule the await_resume of caller. (by calling caller_handle())
spawn_(caller_handle_);
}
return {};
}
void unhandled_exception() {
// Store exception
exception_ = std::current_exception();
}
template U>
void return_value(U&& value) {
// Store return value
value_ = std::forward(value);
}
void set_caller(std::coroutine_handle<> handle) {
// Store the caller of current coroutine.
// This function may be called multiple times (one time per co_await from caller)
caller_handle_ = handle;
}
//
// Get & set spawn function. The handle only by ran when spawn function is set.
// The spawn function may be changed at any time current coroutine is suspended.
// That means the current coroutine or the caller's coroutine may resume at different thread.
//
spawn_function get_spawn() { return spawn_; }
void set_spawn(spawn_function f) {
spawn_ = f;
if (f && !init_spawned_) {
init_spawned_ = true;
// Schedule current coroutine to continue from initial_suspend
f(std::coroutine_handle::from_promise(*this));
}
}
private:
friend awaitable;
// Check if current coroutine has been resumed after initial suspend.
bool init_spawned_ = false;
// The spawn function
spawn_function spawn_;
// Store the return value & exception
std::optional value_;
std::exception_ptr exception_;
// The caller coroutine handle
std::coroutine_handle<> caller_handle_;
};
//
// Awaitable
//
~awaitable() noexcept {
// Destroy the handle
handle_.destroy();
}
awaitable(const awaitable&) = delete; // Cannot copy awaitable
awaitable(awaitable&& other) noexcept : handle_(std::exchange(other.handle_, {})) {
// NOTE: This is tricky. A moved awaitable is not available any more but I didn't handle the case.
}
constexpr bool await_ready() const noexcept { return false; }
void await_suspend(std::coroutine_handle h) {
// Progragate spawn function from caller to callee and set caller. We can then call spawn_(caller_handle) to resume
// the caller later.
// NOTE:
// [handle_] is the [callee]'s coroutine_handle
// [h] is the [caller]'s coroutin_handle
auto& promise = handle_.promise();
promise.set_caller(h);
promise.set_spawn(h.promise().get_spawn());
}
T& await_resume() noexcept { return value(); }
bool done() noexcept { return handle_.done(); }
T& value() noexcept {
auto& promise = handle_.promise();
if (promise.exception_) {
std::rethrow_exception(promise.exception_);
}
return *promise.value_;
}
spawn_function get_spawn() { return handle_.promise().get_spawn(); }
void set_spawn(spawn_function f) { return handle_.promise().set_spawn(f); }
private:
friend promise_type;
explicit awaitable(std::coroutine_handle handle) noexcept : handle_(handle) {}
// The callee corouting handle
std::coroutine_handle handle_;
};
template
class await_callback {
public:
await_callback() {}
constexpr bool await_ready() const noexcept { return false; }
bool await_suspend(std::coroutine_handle::promise_type> h) {
handle_ = h;
return false;
}
auto await_resume() noexcept {
return [h = this->handle_] {
// Add the handle to scheduler to continue the coroutine.
h.promise().get_spawn()(h);
};
}
private:
std::coroutine_handle::promise_type> handle_;
};
awaitable mock_heavy_func(int x) {
std::cout << "[mock_heavy_func] Run\n";
auto callback = co_await await_callback(); // Will not suspend
// Schedule a thread and sleep for sometime.
std::thread thread([x, callback] {
std::cout << "[mock_heavy_func] Wait in thread\n";
std::this_thread::sleep_for(x * 1ms);
std::cout << "[mock_heavy_func] Awake in thread\n";
callback(); // Tell current coroutine to continue
});
thread.detach();
co_await std::suspend_always{};
// Will reach here after callback is called
co_return x;
}
awaitable simple_func(int x) {
std::cout << "[simple_func] Run\n";
auto value = co_await mock_heavy_func(x);
std::cout << "[simple_func] Complete\n";
co_return value + 1;
}
awaitable complex_func() {
std::cout << "[complex_func] Run\n";
auto await1 = simple_func(100);
auto await2 = simple_func(500);
auto await3 = simple_func(1000);
auto await4 = simple_func(2000);
std::cout << "[complex_func] Wait\n";
auto value = co_await await1 + co_await await2 + co_await await3 + co_await await4;
std::cout << "[complex_func] Done\n";
co_return value;
}
//
// A simple scheduler
//
template
T spawn(awaitable&& task) {
//
// Handle queue and spawn function
//
std::mutex m;
std::counting_semaphore queue_size{0};
std::queue> h_queue;
spawn_function spawn = [&m, &queue_size, &h_queue](std::coroutine_handle<> h) {
{
std::lock_guard lock(m);
h_queue.emplace(h);
}
queue_size.release();
};
// Set spawn function (And will actually run the function)
task.set_spawn(spawn);
//
// Run handles (Yes, we can implement it in a multi-thread way, it's quite easy to do that)
//
// For an industrial implementation, we must consider the following things:
//
// 1. Cancellation
// 2. Effective way to enqueue / dequeue
// 3. Dead lock detection
// 4. ...
//
// Remember: this is just a very simple prototype.
//
while (!task.done()) {
// When all coroutines of current scheduler are suspend. The scheduler should wait for any coroutine becoming ready
// to continue (e.g. data received from a socket, user input a string, ...).
queue_size.acquire();
// Run handles
std::coroutine_handle<> handle;
{
std::lock_guard lock(m);
handle = h_queue.front();
h_queue.pop();
}
handle();
}
return task.value();
}
int main() {
// Spawn the complex function and wait for it
auto result = spawn(complex_func());
std::cout << "[main] result:" << result << std::endl;
return 0;
}
/*
Outputs:
[complex_func] Run
[complex_func] Wait
[simple_func] Run
[mock_heavy_func] Run
[mock_heavy_func] Wait in thread
[mock_heavy_func] Awake in thread
[simple_func] Complete
[simple_func] Run
[mock_heavy_func] Run
[mock_heavy_func] Wait in thread
[mock_heavy_func] Awake in thread
[simple_func] Complete
[simple_func] Run
[mock_heavy_func] Run
[mock_heavy_func] Wait in thread
[mock_heavy_func] Awake in thread
[simple_func] Complete
[simple_func] Run
[mock_heavy_func] Run
[mock_heavy_func] Wait in thread
[mock_heavy_func] Awake in thread
[simple_func] Complete
[complex_func] Done
[main] result:3604
*/
(注释写英文纯粹是因为习惯,早年被utf-8 / gbk搞麻了……)
上面就是包含了调度器本身和测试函数的全部代码,真的是非常非常短吧?去掉注释可能只有100行这样子。
基于前文的叙述分析这段代码,我们首先注意这两个:
- 谁负责确定一个coroutine要继续执行?
- 谁负责真正继续执行一个coroutine?
我们看下面这个代码片段:
awaitable simple_func(int x) {
auto value = co_await mock_heavy_func(x);
co_return value + 1;
}
当 mock_heavy_func 这个 coroutine function 执行完毕之后,simple_func需要从 co_await 的位置恢复执行。这件事件不会自动发生,直到 simple_func 的 coroutine_handle 被调用。
这里有些朋友往往会有一个疑问:为什么之前的例子中的main函数在counter suspend后都会自动执行?实际上这两个现象之间没什么关系,但是因为都是同样的“调用了一个coroutine函数”造成了一些幻觉:
- main 函数自身不是coroutine function,因此不具备暂停的能力(没有coroutine handle,没有编译器保存的现成),因此也无法谈论“恢复执行”
- 其次,coroutine function 会 suspend 在一个 co_await / co_yield / co_return 的位置(相应的参数指明需要暂停时)。只有 coroutine function 才能使用这些关键字,main函数不能使用也自然不会有这个问题
那么我们考虑下面的例子,方便大家终极理解这个问题:
// 三个coroutine function: co1/co2/co3,全部initial_suspend返回suspend_always
awaitable co1() {...}
awaitable co2() { co_await co1(); }
awaitable co3() { co_await co2(); }
int main() {
co3();
// 这一行代码什么时候执行?
std::cout << "hahaha";
return 0;
}
main调用了co3,co3调用co2,co2调用co1,那么字符串“hahaha”是在co3执行完后输出的吗?还是在其他什么别的函数执行完输出的?
答案是:main调用co3后,co3在真正执行前就会 suspend,然后线程返回main继续执行,输出hahaha,也就是说:第4行还没有开始执行,而第3,2行根本还没来得及调用。
这段代码没有恢复任何 coroutine 的执行,因此直到退出时co1也没有机会被运行到的。这一段理解了,那么我想大家就会想到:那我们就像第一篇中的例子一样把 co3 的 coroutine_handle 传递给 main 函数,main 调用一次就可以恢复 co3 的执行了。
说的对,确实可以这样,但是 co3 开始执行 co_await co2() 后,co2又 suspend 住了?谁来恢复 co2 的执行呢?诚然我们可以用很多trick把 co2 的 coroutine_handle 传递给 main 函数,但是可以想象,如果调用栈极为复杂甚至不可预期,那么这种trick很快就无法保持下去了。
聪明的朋友也许就想到了:我们可以搞个全局的变量,一个vector,任何 coroutine 函数把自己的 coroutine_handle 写到这个 vector 里,然后 main 函数不断轮训这个 vector 调用 coroutine_handle 就行了!
事实上就是这样,虽然不优雅但是原理是对的,调度器的各类优化(绑定不同的 vector / queue或者其他什么数据结构、用多个线程同时从这个数据结构里取出 handle 来加快执行等等)其原理其实就是这样的。
当然我的例子里不会涉及这么多优化(实际上并不难大家可以自己写写),只优化了一个点:不使用一个全局变量,而是提供一个入口函数 spawn,这个不是 coroutine function 的入口函数负责维护所有由这个函数调用产生的 coroutine_handle 的队列以及在必要时调用 handle。而 coroutine 函数之间的调用会自动传递 spawn 初始给定的队列对象。
下面我们详细分析几段代码:
第206 - 215行,这是 spawn 函数初始化 coroutine_handle 队列数据结构的地方,创建了一个队列、一个信号量和一个互斥体,熟悉多线程编程的朋友对这些很熟悉了,实际上就是一个 blockable producer / consumer queue 的玩意儿。然后创建一个 spawn lambda function,用来封装一下 enqueue 操作。
第234行,把封装好的 spawn function 传入task —— 一个初始化的task没有绑定 spawn 函数,因此并不会真正执行,如同上面co1 / co2 / co3那个例子中的情况,会 suspend 在函数的入口处。我们给 task 设置好了 spawn function 的同时 task 就会调用 spawn 来将自己的 coroutine_handle 写入到队列中。而后 task 调用的任何其他 coroutine function 都会自动传递这个 spawn function。
第232 - 218行,一个判断task执行执行完毕的 while 循环,如果没有执行完毕就一直等待新的 coroutine_handle 被写入队列,这是一个标准的 consumer 实现,没什么好解释的了。
这里多说一句,如果你想让更多线程同时执行 coroutine_handle 来提高性能,可以把这一块儿代码修改为多线程消费 consumer。
上面主要是 spawn 函数干的事情,接下来我们看 awaitable 的实现,主要看两件事:
- awaitable 是如何在 coroutine 之间传递 spawn function 的?
- awaitable 何时调用 spawn function 把 coroutine_handle 写入队列的?
第103 - 112行,当 awaitable 对象被 co_await suspend 时,我们需要确保当被调用的 coroutine function (Aka. callee )执行完毕时恢复调用 co_await 的函数(Aka. caller)的执行。为了做到这一点,callee需要 spawn function 以及 caller 的 coroutine_handle ,当 callee 执行到 final_suspend 时,把 caller 的 coroutine_handle 通过调用 spawn 写入到队列中。
有点儿忘记 caller / callee 概念的朋友,回到上面 co1 / co2 / co3 的例子,对于:awaitable co3() { co_await co2(); } 这行代码里的 co_await 来说,其 awaitable::await_suspend 里 callee就是co2,caller就是co3。caller是调用者,callee是被调用者。
这一段代码就是把 caller 的 spawn function 和 coroutine_handle 写入到 callee 的 promise 中。
第31 - 37行,当 coroutine function 执行完毕后,会尝试调用 spawn 将 caller 的 coroutine_handle 写入队列。
这里还有一个需要注意的地方,就是第63 - 70行,在设置 spawn function 时,需要额外判断一次要不要立即开始执行当前函数,因为:
- 所有 coroutine function 会 suspend 在函数的起始位置,必须将其 coroutine_handle 加入队列才能继续执行函数
- 对于入口 coroutine function complex_func,其是通过手动调用 set_spawn 来设置 spawn_function 的
- 任何 coroutine function 如果还没有在 initial suspend 中恢复,应当立即恢复执行,而非等待 caller coroutine function 执行完毕。
基于以上可以容易理解这段代码的含义,并理解为什么第234行能让这个异步框架“开始运行”。
以上就是本文要介绍的全部内容了,其他代码都是比较常规、在之前的文章中都有涉及。大家如果还有什么疑惑的地方,可以留言评论。
至此,C++ coroutine 系列全部4篇文章就结束了,相信大家对如何利用好 C++20 带来的强大的 Coroutine 机制有了较好的了解。我非常建议大家后续读一读 boost::asio 的 awaitable 实现,具有非常好的示范意义。当然其极端的模板用法确实令人有些望而却步,需要很多耐心。
喜欢就点个赞、收个藏、关个注吧!