百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分析 > 正文

聊聊C++20最大的变革之一 —— Coroutine,看不懂你打我(四)

liebian365 2025-02-06 15:36 16 浏览 0 评论

上一篇文章中我们最后实现了一个 Generic Generator 样例,并且展示了多种用法包括:for loop / send 等,作为本系列的最后一篇我们将实现一个 “稍微” 复杂一些的 Coroutine 功能 —— 一个异步调度框架的原型。

我们稍微回顾一下本系列第一篇文章,印象深刻的朋友一定会问:全文开头画的饼没有实现呀,具体来说下面这个东西,好像和你写的东西不太一样呀?

socket.listen()
conn = await socket.accept()
data = await conn.read()
await conn.write(buffer)
await conn.close()

我在之前的三篇文章中核心是介绍C++ coroutine的用法,而要实现上面类似的功能我们实际上需要一个“异步运行时框架”,当然还有很多其它名字比如:调度器、并行框架等等,其实大部分情况下都是指的一个事情。接下来我们将实现一个非常简单的调度器,展示如何在实际场景中应用Coroutine的能力。

在展示代码之前首先希望大家能思考一个问题:

对于一个Coroutine调度器,其核心是解决什么问题?

我们仔细想想:中断函数执行、保存函数现场、恢复函数执行、异常处理、返回值处理,C++ coroutine机制都已经实现了,我们不需要做任何事情编译器会解决一切,那么还没有解决的是什么?回忆一下在第一篇文章中的几个例子,main函数除了调用 counter 函数、输出值到cout之外还需要做什么?

其实答案是非常明显的,调度器的核心就是:何时、在哪里,恢复 coroutine 的执行?

诚然一个工业级的调度器要考虑非常多细节但是核心还是这个,我们接下来的代码就会展示这一点:

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

using namespace std::chrono_literals;

using spawn_function = std::function)>;

template 
class awaitable {
 public:
  //
  // Promise type
  //

  class promise_type {
   public:
    awaitable get_return_object() {
      // Create a new awaitable object. It's awaitable's responsible to destroy handle
      return awaitable(std::coroutine_handle::from_promise(*this));
    }

    std::suspend_always initial_suspend() noexcept { return {}; }

    std::suspend_always final_suspend() noexcept {
      if (spawn_ && caller_handle_) {
        // The callee is completed, and we should schedule the await_resume of caller. (by calling caller_handle())
        spawn_(caller_handle_);
      }
      return {};
    }

    void unhandled_exception() {
      // Store exception
      exception_ = std::current_exception();
    }

    template  U>
    void return_value(U&& value) {
      // Store return value
      value_ = std::forward(value);
    }

    void set_caller(std::coroutine_handle<> handle) {
      // Store the caller of current coroutine.
      // This function may be called multiple times (one time per co_await from caller)
      caller_handle_ = handle;
    }

    //
    // Get & set spawn function. The handle only by ran when spawn function is set.
    // The spawn function may be changed at any time current coroutine is suspended.
    // That means the current coroutine or the caller's coroutine may resume at different thread.
    //
    spawn_function get_spawn() { return spawn_; }

    void set_spawn(spawn_function f) {
      spawn_ = f;
      if (f && !init_spawned_) {
        init_spawned_ = true;
        // Schedule current coroutine to continue from initial_suspend
        f(std::coroutine_handle::from_promise(*this));
      }
    }

   private:
    friend awaitable;

    // Check if current coroutine has been resumed after initial suspend.
    bool init_spawned_ = false;
    // The spawn function
    spawn_function spawn_;
    // Store the return value & exception
    std::optional value_;
    std::exception_ptr exception_;
    // The caller coroutine handle
    std::coroutine_handle<> caller_handle_;
  };

  //
  // Awaitable
  //

  ~awaitable() noexcept {
    // Destroy the handle
    handle_.destroy();
  }

  awaitable(const awaitable&) = delete;  // Cannot copy awaitable

  awaitable(awaitable&& other) noexcept : handle_(std::exchange(other.handle_, {})) {
    // NOTE: This is tricky. A moved awaitable is not available any more but I didn't handle the case.
  }

  constexpr bool await_ready() const noexcept { return false; }

  void await_suspend(std::coroutine_handle h) {
    // Progragate spawn function from caller to callee and set caller. We can then call spawn_(caller_handle) to resume
    // the caller later.
    // NOTE:
    //  [handle_] is the [callee]'s coroutine_handle
    //  [h] is the [caller]'s coroutin_handle
    auto& promise = handle_.promise();
    promise.set_caller(h);
    promise.set_spawn(h.promise().get_spawn());
  }

  T& await_resume() noexcept { return value(); }

  bool done() noexcept { return handle_.done(); }

  T& value() noexcept {
    auto& promise = handle_.promise();
    if (promise.exception_) {
      std::rethrow_exception(promise.exception_);
    }
    return *promise.value_;
  }

  spawn_function get_spawn() { return handle_.promise().get_spawn(); }

  void set_spawn(spawn_function f) { return handle_.promise().set_spawn(f); }

 private:
  friend promise_type;

  explicit awaitable(std::coroutine_handle handle) noexcept : handle_(handle) {}

  // The callee corouting handle
  std::coroutine_handle handle_;
};

template 
class await_callback {
 public:
  await_callback() {}

  constexpr bool await_ready() const noexcept { return false; }

  bool await_suspend(std::coroutine_handle::promise_type> h) {
    handle_ = h;
    return false;
  }

  auto await_resume() noexcept {
    return [h = this->handle_] {
      // Add the handle to scheduler to continue the coroutine.
      h.promise().get_spawn()(h);
    };
  }

 private:
  std::coroutine_handle::promise_type> handle_;
};

awaitable mock_heavy_func(int x) {
  std::cout << "[mock_heavy_func] Run\n";
  auto callback = co_await await_callback();  // Will not suspend
  // Schedule a thread and sleep for sometime.
  std::thread thread([x, callback] {
    std::cout << "[mock_heavy_func] Wait in thread\n";
    std::this_thread::sleep_for(x * 1ms);
    std::cout << "[mock_heavy_func] Awake in thread\n";
    callback();  // Tell current coroutine to continue
  });
  thread.detach();
  co_await std::suspend_always{};
  // Will reach here after callback is called
  co_return x;
}

awaitable simple_func(int x) {
  std::cout << "[simple_func] Run\n";
  auto value = co_await mock_heavy_func(x);
  std::cout << "[simple_func] Complete\n";
  co_return value + 1;
}

awaitable complex_func() {
  std::cout << "[complex_func] Run\n";
  auto await1 = simple_func(100);
  auto await2 = simple_func(500);
  auto await3 = simple_func(1000);
  auto await4 = simple_func(2000);
  std::cout << "[complex_func] Wait\n";
  auto value = co_await await1 + co_await await2 + co_await await3 + co_await await4;
  std::cout << "[complex_func] Done\n";
  co_return value;
}

//
// A simple scheduler
//

template 
T spawn(awaitable&& task) {
  //
  // Handle queue and spawn function
  //
  std::mutex m;
  std::counting_semaphore queue_size{0};
  std::queue> h_queue;
  spawn_function spawn = [&m, &queue_size, &h_queue](std::coroutine_handle<> h) {
    {
      std::lock_guard lock(m);
      h_queue.emplace(h);
    }
    queue_size.release();
  };

  // Set spawn function (And will actually run the function)
  task.set_spawn(spawn);

  //
  // Run handles (Yes, we can implement it in a multi-thread way, it's quite easy to do that)
  //
  //  For an industrial implementation, we must consider the following things:
  //
  //    1. Cancellation
  //    2. Effective way to enqueue / dequeue
  //    3. Dead lock detection
  //    4. ...
  //
  //  Remember: this is just a very simple prototype.
  //
  while (!task.done()) {
    // When all coroutines of current scheduler are suspend. The scheduler should wait for any coroutine becoming ready
    // to continue (e.g. data received from a socket, user input a string, ...).
    queue_size.acquire();
    // Run handles
    std::coroutine_handle<> handle;
    {
      std::lock_guard lock(m);
      handle = h_queue.front();
      h_queue.pop();
    }
    handle();
  }

  return task.value();
}

int main() {
  // Spawn the complex function and wait for it
  auto result = spawn(complex_func());
  std::cout << "[main] result:" << result << std::endl;
  return 0;
}

/*
Outputs:
[complex_func] Run
[complex_func] Wait
[simple_func] Run
[mock_heavy_func] Run
[mock_heavy_func] Wait in thread
[mock_heavy_func] Awake in thread
[simple_func] Complete
[simple_func] Run
[mock_heavy_func] Run
[mock_heavy_func] Wait in thread
[mock_heavy_func] Awake in thread
[simple_func] Complete
[simple_func] Run
[mock_heavy_func] Run
[mock_heavy_func] Wait in thread
[mock_heavy_func] Awake in thread
[simple_func] Complete
[simple_func] Run
[mock_heavy_func] Run
[mock_heavy_func] Wait in thread
[mock_heavy_func] Awake in thread
[simple_func] Complete
[complex_func] Done
[main] result:3604
*/

(注释写英文纯粹是因为习惯,早年被utf-8 / gbk搞麻了……)

上面就是包含了调度器本身和测试函数的全部代码,真的是非常非常短吧?去掉注释可能只有100行这样子。

基于前文的叙述分析这段代码,我们首先注意这两个:

  1. 谁负责确定一个coroutine要继续执行?
  2. 谁负责真正继续执行一个coroutine?

我们看下面这个代码片段:

awaitable simple_func(int x) {
	auto value = co_await mock_heavy_func(x);
  co_return value + 1;
}

当 mock_heavy_func 这个 coroutine function 执行完毕之后,simple_func需要从 co_await 的位置恢复执行。这件事件不会自动发生,直到 simple_func 的 coroutine_handle 被调用。

这里有些朋友往往会有一个疑问:为什么之前的例子中的main函数在counter suspend后都会自动执行?实际上这两个现象之间没什么关系,但是因为都是同样的“调用了一个coroutine函数”造成了一些幻觉:

  1. main 函数自身不是coroutine function,因此不具备暂停的能力(没有coroutine handle,没有编译器保存的现成),因此也无法谈论“恢复执行”
  2. 其次,coroutine function 会 suspend 在一个 co_await / co_yield / co_return 的位置(相应的参数指明需要暂停时)。只有 coroutine function 才能使用这些关键字,main函数不能使用也自然不会有这个问题

那么我们考虑下面的例子,方便大家终极理解这个问题:

// 三个coroutine function: co1/co2/co3,全部initial_suspend返回suspend_always
awaitable co1() {...}
awaitable co2() { co_await co1(); }
awaitable co3() { co_await co2(); }
int main() {
  co3();
  // 这一行代码什么时候执行?
  std::cout << "hahaha";
  return 0;
}

main调用了co3,co3调用co2,co2调用co1,那么字符串“hahaha”是在co3执行完后输出的吗?还是在其他什么别的函数执行完输出的?

答案是:main调用co3后,co3在真正执行前就会 suspend,然后线程返回main继续执行,输出hahaha,也就是说:第4行还没有开始执行,而第3,2行根本还没来得及调用。

这段代码没有恢复任何 coroutine 的执行,因此直到退出时co1也没有机会被运行到的。这一段理解了,那么我想大家就会想到:那我们就像第一篇中的例子一样把 co3 的 coroutine_handle 传递给 main 函数,main 调用一次就可以恢复 co3 的执行了。

说的对,确实可以这样,但是 co3 开始执行 co_await co2() 后,co2又 suspend 住了?谁来恢复 co2 的执行呢?诚然我们可以用很多trick把 co2 的 coroutine_handle 传递给 main 函数,但是可以想象,如果调用栈极为复杂甚至不可预期,那么这种trick很快就无法保持下去了。

聪明的朋友也许就想到了:我们可以搞个全局的变量,一个vector,任何 coroutine 函数把自己的 coroutine_handle 写到这个 vector 里,然后 main 函数不断轮训这个 vector 调用 coroutine_handle 就行了!

事实上就是这样,虽然不优雅但是原理是对的,调度器的各类优化(绑定不同的 vector / queue或者其他什么数据结构、用多个线程同时从这个数据结构里取出 handle 来加快执行等等)其原理其实就是这样的。

当然我的例子里不会涉及这么多优化(实际上并不难大家可以自己写写),只优化了一个点:不使用一个全局变量,而是提供一个入口函数 spawn,这个不是 coroutine function 的入口函数负责维护所有由这个函数调用产生的 coroutine_handle 的队列以及在必要时调用 handle。而 coroutine 函数之间的调用会自动传递 spawn 初始给定的队列对象。

下面我们详细分析几段代码:

第206 - 215行,这是 spawn 函数初始化 coroutine_handle 队列数据结构的地方,创建了一个队列、一个信号量和一个互斥体,熟悉多线程编程的朋友对这些很熟悉了,实际上就是一个 blockable producer / consumer queue 的玩意儿。然后创建一个 spawn lambda function,用来封装一下 enqueue 操作。

第234行,把封装好的 spawn function 传入task —— 一个初始化的task没有绑定 spawn 函数,因此并不会真正执行,如同上面co1 / co2 / co3那个例子中的情况,会 suspend 在函数的入口处。我们给 task 设置好了 spawn function 的同时 task 就会调用 spawn 来将自己的 coroutine_handle 写入到队列中。而后 task 调用的任何其他 coroutine function 都会自动传递这个 spawn function。

第232 - 218行,一个判断task执行执行完毕的 while 循环,如果没有执行完毕就一直等待新的 coroutine_handle 被写入队列,这是一个标准的 consumer 实现,没什么好解释的了。

这里多说一句,如果你想让更多线程同时执行 coroutine_handle 来提高性能,可以把这一块儿代码修改为多线程消费 consumer。

上面主要是 spawn 函数干的事情,接下来我们看 awaitable 的实现,主要看两件事:

  • awaitable 是如何在 coroutine 之间传递 spawn function 的?
  • awaitable 何时调用 spawn function 把 coroutine_handle 写入队列的?

第103 - 112行,当 awaitable 对象被 co_await suspend 时,我们需要确保当被调用的 coroutine function (Aka. callee )执行完毕时恢复调用 co_await 的函数(Aka. caller)的执行。为了做到这一点,callee需要 spawn function 以及 caller 的 coroutine_handle ,当 callee 执行到 final_suspend 时,把 caller 的 coroutine_handle 通过调用 spawn 写入到队列中。

有点儿忘记 caller / callee 概念的朋友,回到上面 co1 / co2 / co3 的例子,对于:awaitable co3() { co_await co2(); } 这行代码里的 co_await 来说,其 awaitable::await_suspend 里 callee就是co2,caller就是co3。caller是调用者,callee是被调用者。

这一段代码就是把 caller 的 spawn function 和 coroutine_handle 写入到 callee 的 promise 中。

第31 - 37行,当 coroutine function 执行完毕后,会尝试调用 spawn 将 caller 的 coroutine_handle 写入队列。

这里还有一个需要注意的地方,就是第63 - 70行,在设置 spawn function 时,需要额外判断一次要不要立即开始执行当前函数,因为:

  1. 所有 coroutine function 会 suspend 在函数的起始位置,必须将其 coroutine_handle 加入队列才能继续执行函数
  2. 对于入口 coroutine function complex_func,其是通过手动调用 set_spawn 来设置 spawn_function 的
  3. 任何 coroutine function 如果还没有在 initial suspend 中恢复,应当立即恢复执行,而非等待 caller coroutine function 执行完毕。

基于以上可以容易理解这段代码的含义,并理解为什么第234行能让这个异步框架“开始运行”。

以上就是本文要介绍的全部内容了,其他代码都是比较常规、在之前的文章中都有涉及。大家如果还有什么疑惑的地方,可以留言评论。

至此,C++ coroutine 系列全部4篇文章就结束了,相信大家对如何利用好 C++20 带来的强大的 Coroutine 机制有了较好的了解。我非常建议大家后续读一读 boost::asio 的 awaitable 实现,具有非常好的示范意义。当然其极端的模板用法确实令人有些望而却步,需要很多耐心。

喜欢就点个赞、收个藏、关个注吧!

相关推荐

4万多吨豪华游轮遇险 竟是因为这个原因……

(观察者网讯)4.7万吨豪华游轮搁浅,竟是因为油量太低?据观察者网此前报道,挪威游轮“维京天空”号上周六(23日)在挪威近海发生引擎故障搁浅。船上载有1300多人,其中28人受伤住院。经过数天的调...

“菜鸟黑客”必用兵器之“渗透测试篇二”

"菜鸟黑客"必用兵器之"渗透测试篇二"上篇文章主要针对伙伴们对"渗透测试"应该如何学习?"渗透测试"的基本流程?本篇文章继续上次的分享,接着介绍一下黑客们常用的渗透测试工具有哪些?以及用实验环境让大家...

科幻春晚丨《震动羽翼说“Hello”》两万年星间飞行,探测器对地球的最终告白

作者|藤井太洋译者|祝力新【编者按】2021年科幻春晚的最后一篇小说,来自大家喜爱的日本科幻作家藤井太洋。小说将视角放在一颗太空探测器上,延续了他一贯的浪漫风格。...

麦子陪你做作业(二):KEGG通路数据库的正确打开姿势

作者:麦子KEGG是通路数据库中最庞大的,涵盖基因组网络信息,主要注释基因的功能和调控关系。当我们选到了合适的候选分子,单变量研究也已做完,接着研究机制的时便可使用到它。你需要了解你的分子目前已有哪些...

知存科技王绍迪:突破存储墙瓶颈,详解存算一体架构优势

智东西(公众号:zhidxcom)编辑|韦世玮智东西6月5日消息,近日,在落幕不久的GTIC2021嵌入式AI创新峰会上,知存科技CEO王绍迪博士以《存算一体AI芯片:AIoT设备的算力新选择》...

每日新闻播报(September 14)_每日新闻播报英文

AnOscarstatuestandscoveredwithplasticduringpreparationsleadinguptothe87thAcademyAward...

香港新巴城巴开放实时到站数据 供科技界研发使用

中新网3月22日电据香港《明报》报道,香港特区政府致力推动智慧城市,鼓励公私营机构开放数据,以便科技界研发使用。香港运输署21日与新巴及城巴(两巴)公司签署谅解备忘录,两巴将于2019年第3季度,开...

5款不容错过的APP: Red Bull Alert,Flipagram,WifiMapper

本周有不少非常出色的app推出,鸵鸟电台做了一个小合集。亮相本周榜单的有WifiMapper's安卓版的app,其中包含了RedBull的一款新型闹钟,还有一款可爱的怪物主题益智游戏。一起来看看我...

Qt动画效果展示_qt显示图片

今天在这篇博文中,主要实践Qt动画,做一个实例来讲解Qt动画使用,其界面如下图所示(由于没有录制为gif动画图片,所以请各位下载查看效果):该程序使用应用程序单窗口,主窗口继承于QMainWindow...

如何从0到1设计实现一门自己的脚本语言

作者:dong...

三年级语文上册 仿写句子 需要的直接下载打印吧

描写秋天的好句好段1.秋天来了,山野变成了美丽的图画。苹果露出红红的脸庞,梨树挂起金黄的灯笼,高粱举起了燃烧的火把。大雁在天空一会儿写“人”字,一会儿写“一”字。2.花园里,菊花争奇斗艳,红的似火,粉...

C++|那些一看就很简洁、优雅、经典的小代码段

目录0等概率随机洗牌:1大小写转换2字符串复制...

二年级上册语文必考句子仿写,家长打印,孩子照着练

二年级上册语文必考句子仿写,家长打印,孩子照着练。具体如下:...

一年级语文上 句子专项练习(可打印)

...

亲自上阵!C++ 大佬深度“剧透”:C++26 将如何在代码生成上对抗 Rust?

...

取消回复欢迎 发表评论: