初探 C++20 Coroutine
前言
近段时间研究了一下 C++20 的协程(Coroutine),大概了解了其中的工作原理,做一下记录。
初次接触 Coroutine 时,给我的感觉是一脸懵逼的。和其他语言简单的 async、await 不同,想要使用 C++20 的 Coroutine,它要求你定义一个包含 promise_type
的类型,其中 promise_type
又需要至少包含 get_return_object
, initial_suspend
, final_suspend
, return_void
和 unhandled_exception
函数;还没完,co_await
表达式还要你实现一个 awaitable
类型,这个 awaitable
类型至少需要实现 await_ready
, await_suspend
和 await_resume
。这一大片东西呼过来,相信很少有人能不晕。让我们一个一个来看它们究竟是什么。
co_await
与 awaitable
对象
在程序中,我们很容易遇到阻塞的情况,例如,等待 socket 数据包,等待数据库返回查询结果等等,通常为了避免这些阻塞的操作影响主线程,我们会单独开一个新的线程去做这些操作。
我们简单模拟一个阻塞操作:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <chrono>
#include <functional>
#include <thread>
class AddOne {
public:
// 注意构造函数本身不阻塞主线程
AddOne(int x, std::function<void(int)> result_ready_cb)
: _thread{[=, result_ready_cb = std::move(result_ready_cb)]() mutable {
// 假装阻塞了 5s 才得到结果
std::this_thread::sleep_for(std::chrono::seconds(5));
result_ready_cb(x + 1);
}} {}
~AddOne() {
// 如果 AddOne 析构时线程还没有完成,我们 detach 这个线程
if (_thread.joinable()) _thread.detach();
}
// 但是 wait_for_result 有可能阻塞主线程
void wait_for_result() { _thread.join(); }
private:
std::thread _thread;
};
需要注意的是:我们在 AddOne 中使用 std::thread
是为了模拟一个阻塞操作。
为了避免这个阻塞操作影响我们的主线程,所以我们开一个单独的线程去执行它:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include <chrono>
#include <functional>
#include <iostream>
#include <thread>
class AddOne {
public:
// 注意构造函数本身不阻塞主线程
AddOne(int x, std::function<void(int)> result_ready_cb)
: _thread{[=, result_ready_cb = std::move(result_ready_cb)]() mutable {
// 假装阻塞了 5s 才得到结果
std::this_thread::sleep_for(std::chrono::seconds(5));
result_ready_cb(x + 1);
}} {}
~AddOne() {
// 如果 AddOne 析构时线程还没有完成,我们 detach 这个线程
if (_thread.joinable()) _thread.detach();
}
// 但是 wait_for_result 有可能阻塞主线程
void wait_for_result() { _thread.join(); }
private:
std::thread _thread;
};
int main(int argc, char* argv[]) {
int value = 1;
int result1 = 0;
bool result1_ready = false;
int result2 = 0;
bool result2_ready = false;
std::function<void(int)> result_handle1 = [&](int _result) mutable {
result1 = _result + 1;
result1_ready = true;
};
std::function<void(int)> result_handle2 = [&](int _result) mutable {
result2 = _result * _result;
result2_ready = true;
};
// 为了避免 add_one.wait_for_result
// 阻塞主线程,所以我们将它放到一个新线程中执行
std::thread thread([&]() {
AddOne add_one(value, result_handle1);
add_one.wait_for_result();
});
// 如果还需要等待另一个阻塞数据,那就需要再开一个线程
std::thread thread2([&]() {
AddOne add_one(value, result_handle2);
add_one.wait_for_result();
});
// std::thread 不会阻塞主线程,我们继续做其他事情...
// ...
}
注意到 AddOne
有一个特点,它可以注册一个回调函数用来通知我们是否已经准备好了数据。在线程模型中,我们除了在线程中干等它结束别无他法,但是在协程模型中,我们可以在阻塞时暂停这个任务,让执行这个任务的线程先去执行其他的任务,等到 AddOne
调用回调函数通知我们时,我们再恢复这个任务。
执行“暂停”这一操作的运算符是 co_await
,它需要一个 awaitable
对象,当调用 co_await awaitable
时,它会暂停该任务,让当前线程去处理其他任务(准确地说,是将控制返回给当前协程的调用方),直到被暂停的任务被恢复(通过协程句柄的 resume()
函数)——需要注意的是,被恢复时可能在另一个线程上,这是协程的特点之一,可以在被暂停时自由地在线程之间传递。
为此,我们需要定义一个 awaitable
类型,该类型主要需要实现下面的函数:
-
await_ready()
:在被co_await
时是否已经准备好。如果返回false
,那么co_await awaitable
就会立即暂停该协程,然后调用awaitable.await_suspend()
。如果返回true
,说明数据已经准备好了,那么co_await
就没必要暂停协程了。因为我们的AddOne
必然阻塞 5s,所以这里我们直接返回false
就行了。 -
await_suspend()
:如上所述,如果协程被暂停,该函数会被调用,我们需要在这个函数中注册回调函数,并且在回调函数中调用协程句柄的resume()
函数以表示数据已经准备好,恢复协程。该函数返回void
或true
时,协程暂停,将控制返还给调用者;返回false
时,立即恢复该协程。如果返回其他协程的协程句柄,则立即恢复那个协程。 -
await_resume()
:当协程句柄的resume()
函数被调用时,该函数被调用。该函数的返回值就是co_await awaitable
的返回值。
整合上述信息,我们为 AddOne
实现一个 AddOneAwaitable
类型:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include <chrono>
#include <functional>
#include <iostream>
#include <thread>
#include <coroutine>
class AddOne {
...
};
class AddOneAwaitable {
public:
AddOneAwaitable(int x) : _x(x) {}
bool await_ready() const { return false; }
void await_suspend(std::coroutine_handle<> handle) {
AddOne add_one(_x, [=, this](int result) {
// 保存结果,用在 await_resume 中
_result = result;
// 在回调函数中,我们恢复当前协程
handle.resume();
});
}
int await_resume() const {
// 返回结果
return _result;
}
private:
int _x;
int _result;
};
协程对象与承诺对象
除了 awaitable
对象以外,我们还需要定义一个协程对象(我们用 task
作为它的类名)。
一个最简单的协程对象可以什么都没有,只包含一个 promise_type
类型:
1
2
3
4
class task {
public:
class promise_type { ... };
};
每一个协程对象都与一个承诺(Promise)对象关联。简单来说,承诺对象由协程操作,协程通过承诺对象提交结果或执行异常处理;协程对象则由调用者操作。
承诺对象需要至少实现下面的五个函数:
-
get_return_object()
:获得协程对象。我们可以在此处通过std::coroutine_handle::from_promise()
获取关联的协程句柄,然后将协程句柄传递给协程对象。 -
initial_suspend()
:该函数返回一个awaitable
对象,协程在初始化时将会co_await
它。对于惰性启动的协程,可以返回std::suspend_always
,对于立即启动的协程,可以返回std::suspend_never
。这两个类型的实现非常简单,它们的await_suspend()
和await_resume()
都是空的,而await_ready()
分别返回false
和true
。我们知道,当await_ready()
返回true
的时候,co_await
不会暂停协程,因此通过返回std::suspend_always
或std::suspend_never
可以决定协程是否在初始化时暂停。 -
final_suspend()
:和上面类似,只不过是在协程结束时co_await
它返回的awaitable
对象。需要注意的是,如果该函数返回std::suspend_never
,即协程不在结束时暂停,那么协程完成final_suspend()
之后会释放相关资源;而如果返回std::suspend_always
,即协程在结束时暂停,那么调用者仍然可以从协程对象中获取相关的资源。注意,不管暂不暂停,之后再恢复该协程都是未定义行为。另外,该函数必须是noexcept
的,也就是说它不能引发异常。 -
unhandled_exception()
:当协程因异常而结束时,调用该函数处理异常。 -
return_void()
/return_value()
:该函数与co_return
有关,我们暂且按下不表。
用上面的信息,我们来实现协程和承诺类型:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class task {
public:
class promise_type {
public:
// 获得协程对象
task get_return_object() {
// 把协程句柄交给协程对象的构造函数
return {std::coroutine_handle<promise_type>::from_promise(*this)};
}
// 没有必要在初始化时暂停,所以返回 std::suspend_never
std::suspend_never initial_suspend() { return {}; }
// 我们还需要获取协程的相关信息,因此让协程在结束时暂停,见 task::done 函数
std::suspend_always final_suspend() noexcept { return {}; }
// 异常处理,我们目前不关心它,留空
void unhandled_exception() {}
// 和 co_return 有关,暂且按下不表
void return_void() {}
};
// 保存一下协程句柄
task(std::coroutine_handle<promise_type> handle) : _handle(handle) {}
// 协程对象的调用者可以通过该函数获取协程是否执行完成
bool done() {
// 需要注意,协程句柄的 done() 函数要求协程在结束时暂停,
// 也就是说,承诺类型的 final_suspend() 要返回 std::suspend_always
return _handle.done();
}
private:
std::coroutine_handle<promise_type> _handle;
};
我们写一个协程函数,对应在线程模型中传递给线程执行的函数:
1
2
3
4
5
6
7
8
9
// 返回值 task 将会自动生成,可以充当返回值的类型 T 一定要有 T::promise_type
task add_one_coroutine(int x, std::function<void(int)> result_handle) {
// co_await 一个 AddOneAwaitable,
// 协程会在此处暂停,然后将控制交还给 add_one_coroutine 的调用者。
// 整个表达式的返回值是 AddOneAwaitable::await_resume() 的返回值。
int result = co_await AddOneAwaitable(x);
// 协程句柄的 resume() 被 AddOneAwaitable 注册给 AddOne 的回调函数调用,协程函数恢复执行,可以处理数据了
result_handle(result);
}
然后,我们将线程模型替换为协程模型:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
int main(int argc, char* argv[]) {
int value = 1;
int result1 = 0;
int result2 = 0;
std::function<void(int)> result_handle1 = [&](int _result) mutable {
result1 = _result + 1;
};
std::function<void(int)> result_handle2 = [&](int _result) mutable {
result2 = _result * _result;
};
// 把线程模型改成协程模型:
task task1 = add_one_coroutine(value, result_handle1);
task task2 = add_one_coroutine(value, result_handle2);
// 由于上面两个任务都会阻塞暂停协程,所以 main 函数还可以继续做其他事情
std::cout << "hello world" << std::endl;
// main 需要做的事情已经做完了,等待协程完成
while (true) {
if (task1.done() && task2.done()) {
// 处理结果
std::cout << result1 << result2 << std::endl;
break;
}
else
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
return 0;
}
完整的代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#include <chrono>
#include <coroutine>
#include <functional>
#include <iostream>
#include <thread>
class AddOne {
public:
// 注意构造函数本身不阻塞主线程
AddOne(int x, std::function<void(int)> result_ready_cb)
: _thread{[=, result_ready_cb = std::move(result_ready_cb)]() mutable {
// 假装阻塞了 5s 才得到结果
std::this_thread::sleep_for(std::chrono::seconds(5));
result_ready_cb(x + 1);
}} {}
~AddOne() {
// 如果 AddOne 析构时线程还没有完成,我们 detach 这个线程
if (_thread.joinable()) _thread.detach();
}
// 但是 wait_for_result 有可能阻塞主线程
void wait_for_result() { _thread.join(); }
private:
std::thread _thread;
};
class AddOneAwaitable {
public:
AddOneAwaitable(int x) : _x(x) {}
bool await_ready() const { return false; }
void await_suspend(std::coroutine_handle<> handle) {
AddOne add_one(_x, [=, this](int result) {
// 保存结果,用在 await_resume 中
_result = result;
// 在回调函数中,我们恢复当前协程
handle.resume();
});
}
int await_resume() const {
// 返回结果
return _result;
}
private:
int _x;
int _result;
};
class task {
public:
class promise_type {
public:
// 获得协程对象
task get_return_object() {
// 把协程句柄交给协程对象的构造函数
return {std::coroutine_handle<promise_type>::from_promise(*this)};
}
// 没有必要在初始化时暂停,所以返回 std::suspend_never
std::suspend_never initial_suspend() { return {}; }
// 我们还需要获取协程的相关信息,因此让协程在结束时暂停,见 task::done 函数
std::suspend_always final_suspend() noexcept { return {}; }
// 异常处理,我们目前不关心它,留空
void unhandled_exception() {}
// 和 co_return 有关,暂且按下不表
void return_void() {}
};
// 保存一下协程句柄
task(std::coroutine_handle<promise_type> handle) : _handle(handle) {}
// 协程对象的调用者可以通过该函数获取协程是否执行完成
bool done() {
// 需要注意,协程句柄的 done() 函数要求协程在结束时暂停,
// 也就是说,承诺类型的 final_suspend() 要返回 std::suspend_always
return _handle.done();
}
private:
std::coroutine_handle<promise_type> _handle;
};
// 返回值 task 将会自动生成,可以充当返回值的类型 T 一定要有 T::promise_type
task add_one_coroutine(int x, std::function<void(int)> result_handle) {
// co_await 一个 AddOneAwaitable,
// 协程会在此处暂停,然后将控制交还给 add_one_coroutine 的调用者。
// 整个表达式的返回值是 AddOneAwaitable::await_resume() 的返回值。
int result = co_await AddOneAwaitable(x);
// 协程句柄的 resume() 被 AddOneAwaitable 注册给 AddOne 的回调函数调用,协程函数恢复执行,可以处理数据了
result_handle(result);
}
int main(int argc, char* argv[]) {
int value = 1;
int result1 = 0;
int result2 = 0;
std::function<void(int)> result_handle1 = [&](int _result) mutable {
result1 = _result + 1;
};
std::function<void(int)> result_handle2 = [&](int _result) mutable {
result2 = _result * _result;
};
// 把线程模型改成协程模型:
task task1 = add_one_coroutine(value, result_handle1);
task task2 = add_one_coroutine(value, result_handle2);
// 由于上面两个任务都会阻塞暂停协程,所以 main 函数还可以继续做其他事情
std::cout << "hello world" << std::endl;
// main 需要做的事情已经做完了,等待协程完成
while (true) {
if (task1.done() && task2.done()) {
// 处理结果
std::cout << result1 << result2 << std::endl;
break;
}
else
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
return 0;
}
我们来分析一下协程的执行顺序:
- 进入
main
函数 -
main
函数调用add_one_coroutine()
函数 - 创建一个
task::promise_type
类型的承诺对象,假设为promise1
- 调用
promise1.get_return_object()
,构造task
类型的协程对象,获得task1
-
promise1.initial_suspend()
被调用,由于返回std::suspend_never
,因此协程继续执行 - 进入函数
add_one_coroutine()
内部,执行co_await AddOneAwaitable(x)
- 构造
AddOneAwaitable
对象,假设为awaitable1
- 调用
awaitable1.await_ready()
,由于我们返回false
,因此协程立即暂停 - 调用
awaitable1.await_suspend()
,之后协程将控制交还给 main 函数 -
main
函数再次调用add_one_coroutine()
函数 - 同 3~9,协程对象
task2
也做一样的事情,最后将控制交还给 main 函数 - main 函数继续做自己的事情
-
AddOne
阻塞结束,调用task1
的协程句柄的resume()
函数,task1
协程恢复。需要注意的是,这里task1
会被转移到AddOne
创建的线程中恢复执行,这一点我们后面再说 - 调用
awaitable1.await_resume()
,并将该函数的返回值作为co_await AddOneAwaitable(x)
的返回值 - 继续执行
add_one_coroutine
剩下的内容 - 函数执行结束,调用
promise1.return_void()
,该函数与co_return
有关,我们后面再讲 - 调用
promise1.final_suspend()
,协程执行结束 - 等待
task2
的阻塞结束,然后类似 12~16 步骤完成task2
协程
有几点是上述例子中没有体现出来的,需要注意:
- 同一个协程内可以出现多个
co_await
表达式 - 同一个
awaitable
对象可以被多次co_await
- 协程对象
task
也可以实现await_ready()
,await_suspend()
和await_resume()
然后被其他协程co_await
。
另一方面,我们注意到协程并不是在主线程恢复的,而是在 AddOne
创建的线程中恢复执行的。这是因为哪个线程调用协程句柄的 resume()
,哪个线程就会接手该协程的执行。
我们当然也可以在主线程中调用 resume()
,让所有协程都在同一个线程中进行。例如,我们写一个简单的计数器协程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <coroutine>
#include <iostream>
class task {
public:
class promise_type {
public:
task get_return_object() {
return {std::coroutine_handle<promise_type>::from_promise(*this)};
}
std::suspend_never initial_suspend() { return {}; }
// 这个例子中不需要在协程结束后保留相关资源,因此直接返回 std::suspend_never
std::suspend_never final_suspend() noexcept { return {}; }
void unhandled_exception() {}
void return_void() {}
};
task(std::coroutine_handle<promise_type> handle) : _handle(handle) {}
// 通过调用协程对象的 resume() 主动恢复协程
void resume() { _handle.resume(); }
private:
std::coroutine_handle<promise_type> _handle;
};
// 一个简单的计数器,功能就是在每次恢复时计数一次
task counter() {
std::suspend_always awaitable;
for (size_t count = 0;; ++count) {
// 同一个 awaitable 可以多次 co_await
// 每次 co_await,都会将控制交还给调用者,也就是 main 函数
co_await awaitable;
std::cout << "current count: " << count << std::endl;
}
}
int main(int argc, char* argv[]) {
task counter_task = counter();
for (int i = 0; i < 3; ++i) {
std::cout << "resume counter task" << std::endl;
// 主动调用 resume 以恢复协程
counter_task.resume();
}
return 0;
}
协程计时器
根据上面的信息,我们实现协程计时器有两个思路:
- 在
awaitable
的await_suspend
函数中开辟新的线程,在新线程中计时,并注册唤醒协程的回调函数。该方案的优点是精确,能确保协程在指定时间被唤醒。 - 不开辟新线程,而是在主线程中主动调用
handle.resume()
来推动计时器。该方案的优点是不需要开辟新的线程,但是缺点是只有在主线程主动调用handle.resume()
时,才能知道计时器究竟有没有计时结束,因此这种计时器的计时不够精准,其精确程度取决于主线程调用handle.resume()
的频率。
第一种方案和 AddOneAwaitable
的实现大同小异,我们就不再写一遍了。我们主要来说说第二种计时器。
首先 task
类型没啥特殊的地方,照着以前的用就行:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <coroutine>
class task {
public:
class promise_type {
public:
task get_return_object() {
return {std::coroutine_handle<promise_type>::from_promise(*this)};
}
std::suspend_never initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void unhandled_exception() {}
void return_void() {}
};
task(std::coroutine_handle<promise_type> handle) : _handle(handle) {}
bool done() { return _handle.done(); }
void resume() { _handle.resume(); }
private:
std::coroutine_handle<promise_type> _handle;
};
然后是 awaitable
类型,由于我们不打算开辟新线程来计时,因此我们不能使用形如 std::this_thread::wait_for()
的方法来计时,否则会直接阻塞主线程。
那么我们要做的其实就很简单了,只在 awaitable
里做超时判断,返回 true
/false
给外部:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <chrono>
#include <coroutine>
class SleepFor {
public:
template <typename DurationT>
SleepFor(DurationT duration) {
_time_wait_for = std::chrono::steady_clock::now() + duration;
}
bool await_ready() const { return _timeout(); }
bool await_suspend(std::coroutine_handle<>) const { return !_timeout(); }
// 这里是关键,将当前是否超时返回给 co_await 的调用者
bool await_resume() const { return _timeout(); }
private:
inline bool _timeout() const {
return std::chrono::steady_clock::now() >= _time_wait_for;
}
std::chrono::time_point<std::chrono::steady_clock> _time_wait_for;
};
那么,我们可以这样来实现协程计时器函数:
1
2
3
4
5
6
7
template <typename DurationT>
task sleep_for(DurationT duration, std::function<void(void)> callback) {
SleepFor sleep_awaitable{duration};
while (!co_await sleep_awaitable)
;
callback();
}
由于是单线程,我们在 main
函数中需要不断地推动计时器,因此最好的实现方式就是事件循环:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int main(int argc, char* argv[]) {
task t =
sleep_for(5s, []() { std::cout << "5 seconds passed" << std::endl; });
for (;;) {
// 做主线程该做的事情。在这里,我们使用 sleep 100ms 的方法来代表主线程做了
// 100ms 的其他事情
std::this_thread::sleep_for(100ms);
// 我们在 resume() 的时候才知道计时器有没有跑完,因此这个计时器是不准确的,
// 其精确程度取决于每一轮事件循环中,主线程需要做多久其他的事情。
// 但这种方法并不是没有优点,考虑数据竞争问题,这种方案可以有效地避免主线程
// 在处理数据的过程中,协程被唤醒并且导致数据竞争。
if (!t.done()) {
t.resume();
}
}
}
我们整合一下代码,可以把 SleepFor
藏在 sleep_for
函数的内部以隐藏其实现细节:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#include <chrono>
#include <coroutine>
#include <functional>
#include <iostream>
#include <thread>
using namespace std::chrono_literals;
class task {
public:
class promise_type {
public:
task get_return_object() {
return {std::coroutine_handle<promise_type>::from_promise(*this)};
}
std::suspend_never initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void unhandled_exception() {}
void return_void() {}
};
task(std::coroutine_handle<promise_type> handle) : _handle(handle) {}
bool done() { return _handle.done(); }
void resume() { _handle.resume(); }
private:
std::coroutine_handle<promise_type> _handle;
};
template <typename DurationT>
task sleep_for(DurationT duration, std::function<void(void)> callback) {
class SleepFor {
public:
SleepFor(DurationT duration) {
_time_wait_for = std::chrono::steady_clock::now() + duration;
}
bool await_ready() const { return _timeout(); }
bool await_suspend(std::coroutine_handle<>) const { return !_timeout(); }
bool await_resume() const { return _timeout(); }
private:
inline bool _timeout() const {
return std::chrono::steady_clock::now() >= _time_wait_for;
}
std::chrono::time_point<std::chrono::steady_clock> _time_wait_for;
};
SleepFor sleep_awaitable{duration};
while (!co_await sleep_awaitable)
;
callback();
}
int main(int argc, char* argv[]) {
task t =
sleep_for(5s, []() { std::cout << "5 seconds passed" << std::endl; });
for (;;) {
// 做主线程该做的事情。在这里,我们使用 sleep 100ms 的方法来代表主线程做了
// 100ms 的其他事情
std::this_thread::sleep_for(100ms);
if (!t.done()) {
t.resume();
}
}
}
co_return
我们知道,协程函数的返回值是一个协程对象,因此我们没法简单地通过 return value;
将返回值从协程函数内传递给协程调用者。
因此,C++20 给了我们 co_return
关键字,类似普通函数中的 return
,它可以结束协程,并将值从协程函数内传递给协程调用者。
co_return
表达式的核心就是我们之前刻意忽略掉的承诺类型的 return_void()
和 return_value()
函数。当 co_return;
不携带返回值调用时,promise.return_void()
被调用,类似普通函数,在协程函数末尾会隐含一个 co_return;
;当 co_return expr
被调用时,promise.return_value(expr)
被调用。
举个简单的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <coroutine>
#include <iostream>
class task {
public:
class promise_type {
public:
task get_return_object() {
return {std::coroutine_handle<promise_type>::from_promise(*this)};
}
std::suspend_never initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void unhandled_exception() {}
void return_value(int value) {
// 把返回值存起来
_value = value;
}
int _value;
};
task(std::coroutine_handle<promise_type> handle) : _handle(handle) {}
bool done() { return _handle.done(); }
void resume() { _handle.resume(); }
// 包装一个函数供 main 函数获取返回值
int get_value() { return _handle.promise()._value; }
private:
std::coroutine_handle<promise_type> _handle;
};
// 一个简单协程,仅仅只是返回 1
task just_get_1() { co_return 1; }
int main(int argc, char* argv[]) {
task get_1_task = just_get_1();
// 确保协程已经执行完
if (!get_1_task.done()) get_1_task.resume();
// 获取协程的返回值
std::cout << "value is " << get_1_task.get_value() << std::endl;
return 0;
}
co_yield
co_yield expr
实质是 co_await promise.yield_value(expr)
的语法糖,很容易看出,想要使用 co_yield
,我们需要给承诺类型实现一个 yield_value()
函数,并且该函数返回一个 awaitable
对象。这个语法通常用来实现惰性生成器,例如,我们做一个斐波那契数列的生成器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#include <coroutine>
#include <iostream>
class task {
public:
class promise_type {
public:
task get_return_object() {
return {std::coroutine_handle<promise_type>::from_promise(*this)};
}
// 由于是惰性生成器,我们在协程初始化时就暂停,所以返回 std::suspend_always
std::suspend_always initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void unhandled_exception() {}
void return_void() {}
// 每次生成值后都暂停,所以我们返回 std::suspend_always
std::suspend_always yield_value(size_t value) {
// 类似 co_return,我们把每次 yield 的值都保存起来
_value = value;
return {};
}
size_t _value;
};
task(std::coroutine_handle<promise_type> handle) : _handle(handle) {}
// 我们重载一个 operator(),当然重载一个其他函数名也是一样的
size_t operator()() {
// 恢复协程的执行
_handle.resume();
// 将本次 yield 的值返回给调用者
return _handle.promise()._value;
}
private:
std::coroutine_handle<promise_type> _handle;
};
task fibonacci() {
// yield 斐波那契数列的第一项给调用者
co_yield 1;
// yield 斐波那契数列的第二项给调用者
co_yield 1;
size_t n1 = 1, n2 = 1;
while (true) {
// 计算斐波那契数列
size_t value = n1 + n2;
// yield 给调用者
co_yield value;
// 为下一次计算做准备
n1 = n2;
n2 = value;
}
}
int main(int argc, char* argv[]) {
task fib = fibonacci();
for (int i = 0; i < 10; ++i)
// 调用一次 task::operator(),恢复协程的运行并获得一个 yield 出来的值
std::cout << "fibonacci[" << i << "] is " << fib() << std::endl;
return 0;
}
让 task
也可 co_await
基于上面的 AddOneAwaitable
的例子,我们试着将 task
也改造成 awaitable
类型。
不过我们先简化一下上面的例子,不再用回调函数了,而是用 co_return
返回结果:
1
2
3
4
task add_one_coroutine(int x) {
int result = co_await AddOneAwaitable(x);
co_return result;
}
我们的目标是,让 task
也可以被 co_await
,这意味着我们可以写:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
task add_one_coroutine(int x) {
int result = co_await AddOneAwaitable(x);
co_return result;
}
task add_two_coroutine(int x) {
int result = co_await add_one_coroutine(x);
result = co_await add_one_coroutine(result);
co_return result;
}
task add_three_coroutine(int x) {
int result = co_await add_two_coroutine(x);
result = co_await add_one_coroutine(result);
co_return result;
}
首先我们先把 task::promise_type
的 return_void()
换成 return_value()
以满足 co_return
的需求:
1
2
3
4
5
6
7
8
9
10
11
class task {
public:
class promise_type {
public:
...
void return_value(int value) { _value = value; }
int _value;
};
...
int get_value() const { return _handle.promise()._value; }
};
然后我们为 task
实现 awaitable
类型所需的三个函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class task {
public:
class promise_type {
public:
...
std::suspend_always final_suspend() noexcept {
// 这里是关键,我们需要在协程函数结束之后唤醒父协程
if (_parent_handle) _parent_handle.resume();
return {};
}
// 父协程的句柄
std::coroutine_handle<promise_type> _parent_handle;
};
...
bool await_ready() const { return false; }
void await_suspend(std::coroutine_handle<promise_type> handle) {
// 把父协程的句柄传递给 promise,这样在当前协程执行结束之后可以调用它的
// resume() 来恢复父协程
_handle.promise()._parent_handle = handle;
}
int await_resume() const { return get_value(); }
private:
std::coroutine_handle<promise_type> _handle;
};
完整代码如下,该例子在启动 15 秒后输出 4:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#include <chrono>
#include <coroutine>
#include <functional>
#include <iostream>
#include <thread>
class AddOne {
public:
AddOne(int x, std::function<void(int)> result_ready_cb)
: _thread{[=, result_ready_cb = std::move(result_ready_cb)]() mutable {
std::this_thread::sleep_for(std::chrono::seconds(5));
result_ready_cb(x + 1);
}} {}
~AddOne() {
if (_thread.joinable()) _thread.detach();
}
void wait_for_result() { _thread.join(); }
private:
std::thread _thread;
};
class AddOneAwaitable {
public:
AddOneAwaitable(int x) : _x(x) {}
bool await_ready() const { return false; }
void await_suspend(std::coroutine_handle<> handle) {
AddOne add_one(_x, [=, this](int result) {
_result = result;
handle.resume();
});
}
int await_resume() const { return _result; }
private:
int _x;
int _result;
};
class task {
public:
class promise_type {
public:
task get_return_object() {
return {std::coroutine_handle<promise_type>::from_promise(*this)};
}
std::suspend_never initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept {
// 这里是关键,我们需要在协程函数结束之后唤醒父协程
if (_parent_handle) _parent_handle.resume();
return {};
}
void unhandled_exception() {}
void return_value(int value) { _value = value; }
int _value;
// 父协程的句柄
std::coroutine_handle<promise_type> _parent_handle;
};
task(std::coroutine_handle<promise_type> handle) : _handle(handle) {}
bool done() { return _handle.done(); }
int get_value() const { return _handle.promise()._value; }
bool await_ready() const { return false; }
void await_suspend(std::coroutine_handle<promise_type> handle) {
// 把父协程的句柄传递给 promise,这样在当前协程执行结束之后可以调用它的
// resume() 来恢复父协程
_handle.promise()._parent_handle = handle;
}
int await_resume() const { return get_value(); }
private:
std::coroutine_handle<promise_type> _handle;
};
task add_one_coroutine(int x) {
int result = co_await AddOneAwaitable(x);
co_return result;
}
task add_two_coroutine(int x) {
int result = co_await add_one_coroutine(x);
result = co_await add_one_coroutine(result);
co_return result;
}
task add_three_coroutine(int x) {
int result = co_await add_two_coroutine(x);
result = co_await add_one_coroutine(result);
co_return result;
}
int main(int argc, char* argv[]) {
int value = 1;
task task = add_three_coroutine(value);
// 由于上面两个任务都会阻塞暂停协程,所以 main 函数还可以继续做其他事情
std::cout << "hello world" << std::endl;
// main 需要做的事情已经做完了,等待协程完成
while (true) {
if (task.done()) {
// 处理结果
std::cout << task.get_value() << std::endl;
break;
} else
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
return 0;
}
展望 C++23
从上面的例子不难看出,在 C++ 中想要写一份协程代码非常的麻烦,需要定义非常多的东西。而 C++23 就在着手解决这个问题,标准库将会提供一些通用的协程类型,让我们可以更简单地上手操作协程。例如,上述斐波那契数列生成器的例子,在 C++23 中可以这样写:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <generator>
#include <ranges>
#include <iostream>
std::generator<size_t> fibonacci() {
co_yield 1;
co_yield 1;
size_t n1 = 1, n2 = 1;
while (true) {
size_t value = n1 + n2;
co_yield value;
n1 = n2;
n2 = value;
}
}
int main(int argc, char* argv[]) {
for (auto const [i, item] : fibonacci() | std::views::enumerate | std::views::take(10))
std::cout << "fibonacci[" << i << "] is " << item << std::endl;
return 0;
}