理解C++20的革命特性——协程支持2:编写简单的协程调度器 - 实践

news/2025/10/21 15:59:02/文章来源:https://www.cnblogs.com/ljbguanli/p/19155645

理解C++20的革命特性——协程支持2:编写简单的协程调度器 - 实践

理解C++20的革命特性——协程支持2:编写简单的协程调度器

仓库:https://github.com/Charliechen114514/co_practices

前言

​ 在上一篇博客中,我们已经理解了C++20中最为简单的协程调度接口(尽管这一点也不简单)。显然,在这篇博客之前,我们的协程之间还是在使用单协程的调度器进行调度。看起来协程好鸡肋。啥也干不了。但是别着急,为了我们可以进一步的发挥协程的威力。笔者需要你动手完成这个简单的小任务,这个小任务并不困难:

​ 如果你看上面的题目一头雾水,觉得不知道我在说什么的话——您可以先阅读下面的调用代码,然后返回我的上一篇博客琢磨一下该怎么写。(你怎么知道我找到这个练习的时候自己也是一头雾水的)

Task<int> co_add(int a, int b) {simple_log_with_func_name(std::format("Get a: {} and b: {}, ""expected a + b = {}",a, b, a + b));co_return a + b;}Task<void> examples(int a, int b) {simple_log("About to call co_add");int result = co_await co_add(a, b);simple_log(std::format("Get the result: {}", result));co_return;}int main() {simple_log_with_func_name();examples(1, 2);simple_log("Done!");}

​ 你所需要做的,就是上面的代码跑起来,跑起来的办法就是实现Task<T>。如果您做好了,请您参考下面的代码对比一下实现。我们后面会复用Task<T>来完成我们本篇博客的主题——带有返回值支持的调度器。

​ 这里是笔者的代码。"helpers.h"已经在上一篇博客被给出了,没有任何变化,放心使用。

#include "helpers.h"
#include <coroutine>#include <format>template <typename T>class Task {public:struct promise_type;using coro_handle = std::coroutine_handle<promise_type>;Task(coro_handle h): coroutine_handle(h) {simple_log_with_func_name();}~Task() {simple_log_with_func_name();if (coroutine_handle) {coroutine_handle.destroy();}}Task(Task&& o): coroutine_handle(o.coroutine_handle) {o.coroutine_handle = nullptr;}Task& operator=(Task&& o) {coroutine_handle = std::move(o.coroutine_handle);o.coroutine_handle = nullptr;return *this;}// concept requiresstruct promise_type {T cached_value;Task get_return_object() {simple_log_with_func_name();return { coro_handle::from_promise(*this) };}// we dont need suspend when first suspendstd::suspend_never initial_suspend() {simple_log_with_func_name();return {};}// suspend always for the Task clean upsstd::suspend_always final_suspend() noexcept {simple_log_with_func_name();return {};}void return_value(T value) {simple_log_with_func_name(std::format("value T {} is received!", value));cached_value = std::move(value);}void unhandled_exception() {// process notings}};bool await_ready() {simple_log_with_func_name();return false; // always need suspend}void await_suspend(std::coroutine_handle<> h) {simple_log_with_func_name(); // Should never be hereh.resume(); // resume these always}T await_resume() {simple_log_with_func_name();return coroutine_handle.promise().cached_value;}private:coro_handle coroutine_handle;private:Task(const Task&) = delete;Task& operator=(const Task&) = delete;};template <>class Task<void> {public:struct promise_type;using coro_handle = std::coroutine_handle<promise_type>;Task(coro_handle h): coroutine_handle(h) {simple_log_with_func_name();}~Task() {simple_log_with_func_name();if (coroutine_handle) {coroutine_handle.destroy();}}Task(Task&& o): coroutine_handle(o.coroutine_handle) {o.coroutine_handle = nullptr;}Task& operator=(Task&& o) {coroutine_handle = std::move(o.coroutine_handle);o.coroutine_handle = nullptr;return *this;}// concept requiresstruct promise_type {Task get_return_object() {simple_log_with_func_name();return { coro_handle::from_promise(*this) };}// we dont need suspend when first suspendstd::suspend_never initial_suspend() {simple_log_with_func_name();return {};}// suspend always for the Task clean upsstd::suspend_always final_suspend() noexcept {simple_log_with_func_name();return {};}void return_void() { simple_log_with_func_name(); }void unhandled_exception() {// process notings}};private:coro_handle coroutine_handle;private:Task(const Task&) = delete;Task& operator=(const Task&) = delete;};Task<int> co_add(int a, int b) {simple_log_with_func_name(std::format("Get a: {} and b: {}, ""expected a + b = {}",a, b, a + b));co_return a + b;}Task<void> examples(int a, int b) {simple_log("About to call co_add");int result = co_await co_add(a, b);simple_log(std::format("Get the result: {}", result));co_return;}int main() {simple_log_with_func_name();examples(1, 2);simple_log("Done!");}

​ 如果你没有看懂发生了什么,请继续看下面的内容,如果实现跟你的差不多,您可以再翻上去继续编写调度器了。

实现一个最简单的调度器

​ 我们下面马上就来实现一个最简单的调度器了。下面是我们的要求

  • 写一个单例的单线程调度器(event loop),能调度多个 Task。(建议编写一个单例模板玩,另外,Task的基本代码由上一个任务已经做完了)
  • 实现 sleep(ms) awaiter
  • 检测能不能用——写 3 个协程并发运行:打印 “A”、“B”、“C”,交替输出。
第一步——实现一个单例的模板

​ 笔者决定实现一个简单的单例模板,方便我们其他项目的复用。关于单例模式的探讨,尽管依赖注入(DI)是更加合适的,但是我们还是编写基于static的单例模板(协程是C++20才有的,C++11以上已经保证了static的初始化是线程安全的)

single_instance.hpp

#pragma once
template <typename SingleInstanceType>class SingleInstance {public:static SingleInstanceType& instance() {static SingleInstanceType instance;return instance;}protected:SingleInstance() = default;virtual ~SingleInstance() = default;private:SingleInstance(const SingleInstance&) = delete;SingleInstance& operator=(const SingleInstance&) = delete;SingleInstance(SingleInstance&&) = delete;SingleInstance& operator=(SingleInstance&&) = delete;};

​ 很显然,我们禁用了任何形式的拷贝和构造,而且为了后续的使用方便,咱们要采用安全的虚析构函数。SingleInstance()要放到保护域下,咱们的单例子类要访问这个保证我们在语法上回避第二个实例的创建。使用上,咱们只需要这样书写:

class Schedular : public SingleInstance<Schedular>{Schedular() = default; // 还是藏起来我们的构造函数public:friend class SingleInstance<Schedular>;}

好巧不巧,笔者写过单例模式的探讨,实现也是C++20的,参考博客:

  • CSDN: 精读C++20设计模式——创造型设计模式:单例模式-CSDN博客
  • charliechen114514.tech: 精读C++20设计模式——创造型设计模式:单例模式
第二步:初步修改一下我们的Task,让调度器有机会接管我们的协程

​ 很显然——我们现在决定利用调度器来调度我们的协程了——那么,任何的挂起操作都需要我们来控制而不是返回结构体自行裁决,为此,我们的初始化也需要立马被挂起:

// we need suspend when first suspend
std::suspend_always initial_suspend() {
// simple_log_with_func_name();
return {};
}

​ 不管是哪一个泛型实现还是偏特化实现亦是如此。

第三步:思考调度器支持的接口

​ 我们下面准备思考调度器的接口了,好在我们的协程不是抢占式调度,代码写起来非常的容易(但是容易不太可能),只需要遵循无让出时的FIFO调度就好了。

​ 首先,调度器需要支持Sleep调用,也就是让当前的协程睡大觉(有其他协程任务就做其他的,没有咱们说明当前的线程是需要空闲的,调用std::this_thread::sleep_*接口就好了)

​ 所以,我们需要让调度器知道哪一些协程是需要睡大觉的——调度器需要有一个容器管理谁需要睡觉,和一个推送需要睡觉的指定协程。

​ 需要知道一个事情——标准库为了方便,是存在一个叫做sleep_until的接口的,所以,为了方便管理和复用标准库接口,我们设计一个调度器的sleep_until接口——它表明我们要休眠到指定的时间点就可以准备被调度(再次强调,我们需要注意协程的调度是协同调度,我们只能保证休眠的下限事件)。

void Schedular::sleep_until(std::coroutine_handle<> which, // 谁需要休眠?std::chrono::steady_clock::time_point until_when);

​ 另外,咱们还要有一个推送接口:spawn接口,用来接受协程函数返回结构体。这个结构体的所有调度都要被调度器接管。所以,别忘记在Task处声明调度器类为友元。

template <typename T>void Schedular::spawn(Task<T>&& task); // Task只可以被移动,所以放这个接口进来

​ 最后还有一个调度接口——run接口

void Schedular::run();

​ 他将会开始我们的协程调度。就三个!

第四步:实现上面的接口
实现spawn接口,托管协程函数返回的协程返回结构体

​ 我们先从调度本身开始,首先,我们需要缓存预备列的协程接口(注意不是Task本身,我们在调度协程而不是协程的返回结构体),上面提到了咱们的调度策略是FIFO,因此,先来先到就要求我们采用队列来处理我们的存储。

std::queue<std::coroutine_handle<>> ready_coroutines; // 一个简单的队列即可

​ 所以,咱们的spawn接口变得非常好实现——

void Schedular::internal_spawn(std::coroutine_handle<> h) {// private实现,用户不应该直接随意的触碰调度队列ready_coroutines.push(h);	// 加入调度队列}// spawn是一个桥接的接口,我们会取出来Task内托管的coroutine_handle协程句柄,交给我们的// 调度器来管理template <typename T>inline void Schedular::spawn(Task<T>&& task) {internal_spawn(task.coroutine_handle);task.coroutine_handle = nullptr; // 让Task不再托管coroutine_handle本身}
实现睡眠机制

睡眠需要登记我们睡多久,谁睡觉,而且还要按照一定的优先级排序(你想,如果有三个睡眠请求100ms,200ms,300ms的睡眠,肯定是优先睡眠100ms的,再睡200ms,再睡300ms,反过来的话,前两个黄瓜菜都凉了),显然我们立马想到了优先级队列。但是优先级队列需要提供比较方法从而产生小/大根堆。所以我们需要抽象SleepItem结构体——它登记我们的根是睡眠事件最小的。或者说离当前事件点最近的。

struct SleepItem {
SleepItem(std::coroutine_handle<> h,std::chrono::steady_clock::time_point tp): coro_handle(h), sleep(tp) {}std::chrono::steady_clock::time_point sleep;std::coroutine_handle<> coro_handle;bool operator<(const SleepItem& other) const {return sleep > other.sleep;}};std::priority_queue<SleepItem> sleepys;

​ 但是我们还没有实现用户侧代码,用户期盼我们可以这样睡眠:

co_await sleep(300ms);

​ 欸,怎么说的?看到co_await就要条件反射实现awaitable接口。所以——

struct AwaitableSleep {
AwaitableSleep(std::chrono::milliseconds how_long)
: duration(how_long)
, wake_time(std::chrono::steady_clock::now() + how_long) { }
/**
* @brief await_ready always lets the sessions sleep!
*
*/
bool await_ready() { return false; } // 总是我们接管剩下的流程
void await_suspend(std::coroutine_handle<> h) {// 执行推送,然后后面我们自己的调度器会取出来这个句柄扔到就绪队列中Schedular::instance().sleep_until(h, wake_time);}// 什么都不做void await_resume() { }private:std::chrono::milliseconds duration; // 方便获取接口或者调试,性能优先下可以踢掉这个std::chrono::steady_clock::time_point wake_time;};inline AwaitableSleep sleep(std::chrono::milliseconds s) {return { s };}
实现调度逻辑

​ 首先,睡眠是没活干才做,实现上的优先级很明显了——优先处理活跃的协程!

void run() {
// if there is any corotines ready or sleepy unfinished
while (!ready_coroutines.empty() || !sleepys.empty()) {
// 进来这个逻辑,就表明我们现在是有事情做的——不管是睡大觉还是拉起一个协程。
while (!ready_coroutines.empty()) {
auto front_one = ready_coroutines.front();
ready_coroutines.pop();
front_one.resume(); // OK, hang this on!
}
...
}
}

​ 如果我们任何活跃代码都执行完毕了,我们才会去检查睡眠队列中有没有待唤醒的家伙——

auto now = current(); // current返回std::chrono::steady_clock::now()
while (!sleepys.empty() && sleepys.top().sleep <= now) {
ready_coroutines.push(sleepys.top().coro_handle);
sleepys.pop();
}

​ 非常好,如果我们现在的事件越过了指定睡眠唤醒的时间点(也就是sleepys.top().sleep),咱们就要放所有越过了时间点的协程送到咱们的就绪队列中。

​ 下一步,如果我们还有协程需要睡觉,且没有新的就绪队列到来,我们立马就对本线程进行睡眠

void run() {
// if there is any corotines ready or sleepy unfinished
while (!ready_coroutines.empty() || !sleepys.empty()) {
while (!ready_coroutines.empty()) {
auto front_one = ready_coroutines.front();
ready_coroutines.pop();
front_one.resume(); // OK, hang this on!
}
auto now = current();
while (!sleepys.empty() && sleepys.top().sleep <= now) {
ready_coroutines.push(sleepys.top().coro_handle);
sleepys.pop();
}
if (ready_coroutines.empty() && !sleepys.empty()) {
// OK, we can sleep
std::this_thread::sleep_until(sleepys.top().sleep);
}
}
}
继续修改Task的接口

​ 现在任务需要直接向队列里推送了,我们需要思考这些问题。我们使用调度器会这样使用:

Task<int> co_add(int a, int b) {co_await sleep(300ms);co_return a + b;}Task<void> worker(const char* name, int a, int b) {int result = co_await co_add(a, b);std::println("{}: {} + {} = {}", name, a, b, result);}Task<void> main_task() {co_await worker("TaskA", 1, 2);co_await worker("TaskB", 3, 4);co_await worker("TaskC", 5, 6);}

​ 所有的父协程会放下自身的运行,按照C++20无栈协程的逻辑——我们要自己保存协程的句柄。所以我们很容易想到——Task本身要存储父协程的句柄,方便我们子协程恢复的时候恢复父协程的运行,才能继续代码。

​ 可能太跳跃了,我们一个一个慢慢来——我们的父协程里写下代码——co_await worker("TaskA", 1, 2);的时候,父协程就要放弃自己的运行,等待worker的结果。这个时候,我们回忆第一篇博客我们协程框架的运行逻辑:走await_ready查看是否挂起——我们显然返回了否,要自己接管逻辑。所以下一步的执行流被转发到了await_suspend中,这一步就是我们要的——父协程要被挂起,所以子协程要被推送!

// 在创建的子协程的协程返回体中
void await_suspend(std::coroutine_handle<> h) {// simple_log_with_func_name(); // Should never be heresimple_log("Current Routine will be suspend!");coroutine_handle.promise().parent_coroutine = h;simple_log("Child Routine will be called resume!");Schedular::instance().internal_spawn(coroutine_handle);}

coroutine_handle.promise().parent_coroutine = h;设置了子协程的父协程为当前线程,然后将子协程放到就绪队列里去。没啥毛病!(注意这个代码是子协程返回结构体)

​ 现在,我们的子协程就被送到就绪队列中,而且令人兴奋的是——它会被送到就绪的处理逻辑里,当我们的调度器执行就绪的协程队列代码的时候,我们就会执行这个逻辑——

while (!ready_coroutines.empty()) {
auto front_one = ready_coroutines.front();
ready_coroutines.pop();
front_one.resume(); // OK, hang this on!
}

​ 子协程在这里被resume了,执行的就是worker的代码——子协程这下就被挂起。当worker执行结束之后,我们仍然按照流程——调用的是final_suspend,还记得我们存储的parent_coroutine嘛?这里发力了——子协程的结束要求父协程放下执行代码.所以事情变得非常的容易:

std::suspend_always final_suspend() noexcept {
// simple_log_with_func_name();
if (parent_coroutine) {
simple_log("parent_coroutine will be wake up");
// 父协程拉起来执行代码
Schedular::instance().internal_spawn(parent_coroutine);
}
return {};	// 子协程由Task结构体托管,这个逻辑不会发生改变
}

​ 走到这里,我们所有的代码都完成了.我们编译运行一下:

[charliechen@Charliechen coroutines]$ build/schedular/schedular
10:36:12 :Current Routine will be suspend!
10:36:12 :Child Routine will be called resume!
10:36:12 :Current Routine will be suspend!
10:36:12 :Child Routine will be called resume!
10:36:13 :parent_coroutine will be wake up
TaskA: 1 + 2 = 3
10:36:13 :Current Routine will be suspend!
10:36:13 :Child Routine will be called resume!
10:36:13 :Current Routine will be suspend!
10:36:13 :Child Routine will be called resume!
10:36:13 :parent_coroutine will be wake up
TaskB: 3 + 4 = 7
10:36:13 :Current Routine will be suspend!
10:36:13 :Child Routine will be called resume!
10:36:13 :Current Routine will be suspend!
10:36:13 :Child Routine will be called resume!
10:36:13 :parent_coroutine will be wake up
TaskC: 5 + 6 = 11

​ 代码工作的非常完美.

上面的日志如何产生的?答案如下:

[charliechen@Charliechen coroutines]$ build/schedular/schedular
10:36:12 :Current Routine will be suspend! // main_task准备被挂起
10:36:12 :Child Routine will be called resume! // worker("TaskA", 1, 2);准备干活
10:36:12 :Current Routine will be suspend! // worker("TaskA", 1, 2)准备被挂起
10:36:12 :Child Routine will be called resume! // co_add准备干活
10:36:13 :parent_coroutine will be wake up // co_add作为叶子协程,准备结束自己,拉起父协程worker干活
TaskA: 1 + 2 = 3 // worker被拉起,执行打印逻辑
// 如下的逻辑是类似的
10:36:13 :Current Routine will be suspend!
10:36:13 :Child Routine will be called resume!
10:36:13 :Current Routine will be suspend!
10:36:13 :Child Routine will be called resume!
10:36:13 :parent_coroutine will be wake up
TaskB: 3 + 4 = 7
10:36:13 :Current Routine will be suspend!
10:36:13 :Child Routine will be called resume!
10:36:13 :Current Routine will be suspend!
10:36:13 :Child Routine will be called resume!
10:36:13 :parent_coroutine will be wake up
TaskC: 5 + 6 = 11

附录:实现协程的加法函数co_add

​ 为了防止你来回翻阅,笔者仍然直接把代码CV一份放在这里。

#include "helpers.h"
#include <coroutine>#include <format>template <typename T>class Task {public:struct promise_type;using coro_handle = std::coroutine_handle<promise_type>;Task(coro_handle h): coroutine_handle(h) {simple_log_with_func_name();}~Task() {simple_log_with_func_name();if (coroutine_handle) {coroutine_handle.destroy();}}Task(Task&& o): coroutine_handle(o.coroutine_handle) {o.coroutine_handle = nullptr;}Task& operator=(Task&& o) {coroutine_handle = std::move(o.coroutine_handle);o.coroutine_handle = nullptr;return *this;}// concept requiresstruct promise_type {T cached_value;Task get_return_object() {simple_log_with_func_name();return { coro_handle::from_promise(*this) };}// we dont need suspend when first suspendstd::suspend_never initial_suspend() {simple_log_with_func_name();return {};}// suspend always for the Task clean upsstd::suspend_always final_suspend() noexcept {simple_log_with_func_name();return {};}void return_value(T value) {simple_log_with_func_name(std::format("value T {} is received!", value));cached_value = std::move(value);}void unhandled_exception() {// process notings}};bool await_ready() {simple_log_with_func_name();return false; // always need suspend}void await_suspend(std::coroutine_handle<> h) {simple_log_with_func_name(); // Should never be hereh.resume(); // resume these always}T await_resume() {simple_log_with_func_name();return coroutine_handle.promise().cached_value;}private:coro_handle coroutine_handle;private:Task(const Task&) = delete;Task& operator=(const Task&) = delete;};template <>class Task<void> {public:struct promise_type;using coro_handle = std::coroutine_handle<promise_type>;Task(coro_handle h): coroutine_handle(h) {simple_log_with_func_name();}~Task() {simple_log_with_func_name();if (coroutine_handle) {coroutine_handle.destroy();}}Task(Task&& o): coroutine_handle(o.coroutine_handle) {o.coroutine_handle = nullptr;}Task& operator=(Task&& o) {coroutine_handle = std::move(o.coroutine_handle);o.coroutine_handle = nullptr;return *this;}// concept requiresstruct promise_type {Task get_return_object() {simple_log_with_func_name();return { coro_handle::from_promise(*this) };}// we dont need suspend when first suspendstd::suspend_never initial_suspend() {simple_log_with_func_name();return {};}// suspend always for the Task clean upsstd::suspend_always final_suspend() noexcept {simple_log_with_func_name();return {};}void return_void() { simple_log_with_func_name(); }void unhandled_exception() {// process notings}};private:coro_handle coroutine_handle;private:Task(const Task&) = delete;Task& operator=(const Task&) = delete;};

​ 首先,上一篇博客咱们已经提到了——任何跑在协程的函数必须返回协程返回类型,这个事情要求你不容商量的内嵌一个结构体struct promise_type,而且要求你必须实现接口——

struct promise_type {
T cached_value;
Task get_return_object() {
simple_log_with_func_name();
return { coro_handle::from_promise(*this) };
}
// we dont need suspend when first suspend
std::suspend_never initial_suspend() {
simple_log_with_func_name();
return {};
}
// suspend always for the Task clean ups
std::suspend_always final_suspend() noexcept {
simple_log_with_func_name();
return {};
}
void return_value(T value) {
simple_log_with_func_name(std::format("value T {} is received!", value));
cached_value = std::move(value);
}
void unhandled_exception() {
// process notings
}
};

​ 本篇样例中,我们不难理解的是——co_add并不需要创建协程即挂起,所以咱们只需要返回std::suspend_never即可,让我们立马执行返回的结果co_return a + b上,a + b被计算好后,会被送到return_value当中去,需要注意的是——上一篇博客我们已经讨论了生命周期上返回类型和协程句柄本身谁要更长了,这也是为什么我们选择挂起,这样的话让更上一层的Task来负责析构协程对象,而不是它自己解决它自己。这个结构你不会感到陌生的,上一篇博客已经说明了这个结构到底在干啥。

​ co_await要求等待Task<int>,所以任何非空的Task还要实现Awaitable接口(注意,不是说带有PromiseType接口的返回结构体都要实现Awaitable接口,而是我们需要co_await这个接口的时候才需要实现Awaitable接口,请各位搞清楚逻辑关系。)

bool await_ready() {
simple_log_with_func_name();
return false; // always need suspend
}
void await_suspend(std::coroutine_handle<> h) {simple_log_with_func_name(); // Should never be hereh.resume(); // resume these always, call await_resume then}T await_resume() {simple_log_with_func_name();return coroutine_handle.promise().cached_value;}

​ 尽管逻辑上,咱们实际上不需要挂起接口,但是我们的结果存储在coroutine_handle的promise_type里了,这个时候——我们需要接管等待的逻辑,所以还是要挂起来

await_ready实际上也可以表达为——我们需要接管等待的逻辑做我们自己的处理

第一篇博客在:

  • CSDN链接:CSDN
  • 我自己博客的链接:charliechen114514.tech

附录2:调度器的代码

schedular.cpp: example的主代码

#include "schedular.hpp"
#include <print>using namespace std::chrono_literals;Task<int> co_add(int a, int b) {co_await sleep(300ms);co_return a + b;}Task<void> worker(const char* name, int a, int b) {int result = co_await co_add(a, b);std::println("{}: {} + {} = {}", name, a, b, result);}Task<void> main_task() {co_await worker("TaskA", 1, 2);co_await worker("TaskB", 3, 4);co_await worker("TaskC", 5, 6);}int main() {Schedular::instance().spawn(main_task());Schedular::instance().run();}

schedular.hpp:调度器代码

#pragma once
#include "single_instance.hpp"
#include <chrono>#include <coroutine>#include <queue>#include <thread>template <typename T>class Task;struct AwaitableSleep;class Schedular : public SingleInstance<Schedular> {struct SleepItem {SleepItem(std::coroutine_handle<> h,std::chrono::steady_clock::time_point tp): coro_handle(h), sleep(tp) {}std::chrono::steady_clock::time_point sleep;std::coroutine_handle<> coro_handle;bool operator<(const SleepItem& other) const {return sleep > other.sleep;}};std::queue<std::coroutine_handle<>> ready_coroutines;std::priority_queue<SleepItem> sleepys;private:Schedular() = default;~Schedular() override {run();}friend class AwaitableSleep;template <typename T>friend class Task;static std::chrono::steady_clock::time_pointcurrent() {return std::chrono::steady_clock::now();}void sleep_until(std::coroutine_handle<> which,std::chrono::steady_clock::time_point until_when) {sleepys.emplace(which, until_when);}void internal_spawn(std::coroutine_handle<> h) {ready_coroutines.push(h);}public:friend class SingleInstance<Schedular>;template <typename T>void spawn(Task<T>&& task);void run() {// if there is any corotines ready or sleepy unfinishedwhile (!ready_coroutines.empty() || !sleepys.empty()) {while (!ready_coroutines.empty()) {auto front_one = ready_coroutines.front();ready_coroutines.pop();front_one.resume(); // OK, hang this on!}auto now = current();while (!sleepys.empty() && sleepys.top().sleep <= now) {ready_coroutines.push(sleepys.top().coro_handle);sleepys.pop();}if (ready_coroutines.empty() && !sleepys.empty()) {// OK, we can sleepstd::this_thread::sleep_until(sleepys.top().sleep);}}}};struct AwaitableSleep {AwaitableSleep(std::chrono::milliseconds how_long): duration(how_long), wake_time(std::chrono::steady_clock::now() + how_long) { }/*** @brief await_ready always lets the sessions sleep!**/bool await_ready() { return false; }void await_suspend(std::coroutine_handle<> h) {Schedular::instance().sleep_until(h, wake_time);}void await_resume() { }private:std::chrono::milliseconds duration;std::chrono::steady_clock::time_point wake_time;};inline AwaitableSleep sleep(std::chrono::milliseconds s) {return { s };}#include "task.hpp"template <typename T>inline void Schedular::spawn(Task<T>&& task) {internal_spawn(task.coroutine_handle);task.coroutine_handle = nullptr;}

task.hpp: Task的最终抽象

#pragma once
#include "helpers.h"
#include "schedular.hpp"
#include <coroutine>#include <utility>template <typename T>class Task {public:friend class Schedular;struct promise_type;using coro_handle = std::coroutine_handle<promise_type>;Task(coro_handle h): coroutine_handle(h) {// simple_log_with_func_name();}~Task() {// simple_log_with_func_name();if (coroutine_handle) {coroutine_handle.destroy();}}Task(Task&& o): coroutine_handle(o.coroutine_handle) {o.coroutine_handle = nullptr;}Task& operator=(Task&& o) {coroutine_handle = std::move(o.coroutine_handle);o.coroutine_handle = nullptr;return *this;}// concept requiresstruct promise_type {T cached_value;std::coroutine_handle<> parent_coroutine;Task get_return_object() {// simple_log_with_func_name();return { coro_handle::from_promise(*this) };}// we dont need suspend when first suspendstd::suspend_always initial_suspend() {// simple_log_with_func_name();return {};}// suspend always for the Task clean upsstd::suspend_always final_suspend() noexcept {// simple_log_with_func_name();if (parent_coroutine) {simple_log("parent_coroutine will be wake up");Schedular::instance().internal_spawn(parent_coroutine);}return {};}void return_value(T value) {// simple_log_with_func_name(std::format("value T {} is received!", value));cached_value = std::move(value);}void unhandled_exception() {// process notings}};bool await_ready() {// simple_log_with_func_name();return false; // always need suspend}void await_suspend(std::coroutine_handle<> h) {// simple_log_with_func_name(); // Should never be heresimple_log("Current Routine will be suspend!");coroutine_handle.promise().parent_coroutine = h;simple_log("Child Routine will be called resume!");Schedular::instance().internal_spawn(coroutine_handle);}T await_resume() {// simple_log_with_func_name();return coroutine_handle.promise().cached_value;}private:coro_handle coroutine_handle;private:Task(const Task&) = delete;Task& operator=(const Task&) = delete;};template <>class Task<void> {public:friend class Schedular;struct promise_type;using coro_handle = std::coroutine_handle<promise_type>;Task(coro_handle h): coroutine_handle(h) {// simple_log_with_func_name();}~Task() {// simple_log_with_func_name();if (coroutine_handle) {coroutine_handle.destroy();}}Task(Task&& o): coroutine_handle(o.coroutine_handle) {o.coroutine_handle = nullptr;}Task& operator=(Task&& o) {coroutine_handle = std::move(o.coroutine_handle);o.coroutine_handle = nullptr;return *this;}bool await_ready() {// simple_log_with_func_name();return false; // always need suspend}void await_suspend(std::coroutine_handle<> h) {// simple_log_with_func_name(); // Should never be heresimple_log("Current Routine will be suspend!");coroutine_handle.promise().parent_coroutine = h;simple_log("Child Routine will be called resume!");Schedular::instance().internal_spawn(coroutine_handle);}void await_resume() {// simple_log_with_func_name();}// concept requiresstruct promise_type {std::coroutine_handle<> parent_coroutine;Task get_return_object() {// simple_log_with_func_name();return { coro_handle::from_promise(*this) };}// we need suspend when first suspendstd::suspend_always initial_suspend() {// simple_log_with_func_name();return {};}// suspend always for the Task clean upsstd::suspend_always final_suspend() noexcept {// simple_log_with_func_name();if (parent_coroutine) {Schedular::instance().internal_spawn(parent_coroutine);}return {};}void return_void() {// simple_log_with_func_name();}void unhandled_exception() {// process notings}};private:coro_handle coroutine_handle;private:Task(const Task&) = delete;Task& operator=(const Task&) = delete;};

剩下的helpers.h/helpers.cpp和single_instance.hpp,笔者已经在正文给出了。不再重复。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/942365.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

九种UML常见图 -2025.10.19

UML(统一建模语言)的九种常见图可以分为两大类:结构图 和行为图。 结构图 用于展示系统静态的、不变的部分,就像建筑的蓝图,描述有什么构件。 行为图 用于展示系统动态的、变化的部分,就像建筑的使用手册,描述构…

阿里云 CDN部署

阿里云 CDN部署视图预览 阿里云 CDN 每秒访问次数,下行流量,边缘带宽,响应时间,回源带宽,状态码等版本支持 操作系统支持:Linux 前置条件服务器 <安装 Datakit> 服务器 <安装 Func 携带版> 阿里云…

阿里、字节、腾讯等大厂都在用的 12 大主流 AI 前端组件库

阿里、字节、腾讯等大厂都在用的 12 大主流 AI 前端组件库 2025-06-241,720阅读2分钟 Ant Design Xgithub.com/ant-design/…Ant Group 团队打造的面向 AI 时代的全新企业级前端组件库,融合 Copilot、Agent、AI 驱动…

分箱效果评估:IV值和卡方

1.IV值(信息价值):分箱后变量的预测能力 IV值计算过程: 1.计算每个分箱的WOE(证据权重) 2.WOE和分箱的样本占比计算IV2.卡方值(Chi-Square):衡量分箱的“统计显著性”​ 卡方分箱(风控建模) 范围 取值范围:[0, +…

2025 年电缆桥架生产厂家最新推荐排行榜:聚焦北方 / 河北区域及瓦楞 / 防火 / 模压 / 镀锌桥架优质品牌深度解析

引言在电力传输、建筑基建等关键领域,电缆桥架作为线路支撑核心设备,其品质直接决定工程安全与运维稳定性。当前市场中,部分厂家存在工艺粗糙、防腐性能不足、承重不达标等问题,导致桥架寿命缩短、线路故障频发,增…

洒水清洁,音乐相伴,洒水车声音-兰花草音乐芯片详细资料

**兰花草音乐芯片AC1501**是一种广泛应用于电子设备(如贺卡、洒水车、倒车喇叭等)的音乐播放芯片,其核心功能是通过内置解码电路和放大器实现音乐播放。主要应用场景 1、贺卡与玩具:用于制作带有简单音乐播放功能的…

JavaScript 开发代码规范指南

1. 代码格式 1.1 缩进使用 2 个空格进行缩进 不要混用空格和制表符// ✅ 正确 function calculateTotal(items) {let total = 0;items.forEach(item => {total += item.price * item.quantity;});return total; }//…

04.Python百行代码制作查询工具

04.Python百行代码制作查询工具 ------------------------------------------------ 执行后——————————————————————————————————————————————————————————…

通过一台服务器采集所有阿里云账单费用数据

通过一台服务器采集所有阿里云账单费用数据费用 视图预览安装部署说明: 示例 Linux 版本为:CentOS Linux release 7.8.2003 (Core) 通过一台服务器采集所有阿里云账单费用数据前置条件服务器 <安装 Datakit> 服…

2025 油烟机厂家最新推荐榜:五大实力厂商技术与服务口碑评测权威发布滑轨/易清洁/免清洗/智能油烟机厂家推荐

引言 油烟机作为守护厨房健康的核心电器,其性能直接决定烹饪体验与家居环境质量。但当前市场乱象突出:2025 年省级抽查显示多批次产品存在噪声超标、吸力不足等问题,部分产品清洁困难、设计反人类,让消费者陷入选择…

VUE---打印功能

在开发系统的时候,经常会需要将系统里面的报表进行打印,主要是要实现局部打印的功能: 插件:npm install vue-print-nb --save具体示例代码:<template><div><button v-print="#printArea"…

编程语言变量的引用共享问题

编程语言变量的引用共享问题//基本数据类型赋值是开辟新的内存空间。 //修改后面的变量的值不会修改原始变量的值。 var a =1 var b=a b=3 console.log(b) // 3 //引用数据类型赋值时,后面的变量的值修改会改变原始变…

分析一下url的格式和windows与Linux共享文件的格式

分析一下url的格式和windows与Linux共享文件的格式 讲解这些URL和连接格式: ## 1. MSYS2 网址格式分析 `https://www.msys2.org/` 分解: ```https:// - 协议类型(加密的HTTP)www.msys2.org - 完整域名 …

高效管理超多传感器?SHxxx 集线器实现精准切换与零混淆 告别通道混乱,内置校验

高效管理超多传感器?SHxxx 集线器实现精准切换与零混淆 告别通道混乱,内置校验SHxxx 传感器集线器是一款专为高密度传感场景设计的智能切换设备。它支持将最多200路传感器轮转切换至单一接口,彻底解决测试现场因传感…

[ACTF2020 新生赛]Include 1 文件包含

题目界面查看源码以及点击链接从题目include推断是文件包含题目 需要使用php伪协议或者hackbar中模板 ?file=php://filter/read=convet.base64-encode/resource=flag.php将flag.php代码编码成base64编码,然后输出使用…

鸿蒙NEXT网络管理:从“能用”到“智能”的架构演进 - 指南

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

PostgreSQL可观测性完整方案

PostgreSQL可观测性完整方案软件简述 PostgreSQL 是一种开源的关系型数据库管理系统 (RDBMS),它提供了许多可观测性选项,以确保数据库的稳定性和可靠性。 Observability 可观测性是指对数据库状态和操作进行监控和记…

2025 年通风天窗源头厂家最新推荐:品牌定制能力、售后体系及综合实力深度测评榜单

引言 在工业与民用建筑通风系统中,通风天窗的品质直接决定室内空气质量、能耗成本与安全保障,然而当前市场厂家良莠不齐,部分产品存在防水性能差、通风效率低等问题,且定制服务缺失、售后响应滞后,给企业与建筑方…

2025年唐卡装饰权威深度解析:家装行业新格局与品质承诺

引言 本文聚焦“核心业务与服务优势”这一核心维度,结合公开资料与第三方调研,为正在筛选装修服务商的消费者提供一份可验证、可落地的客观参考。 背景与概况 重庆唐卡装饰2009年成立,历时16年得到100万精英用户的口…