Seastar: FPC(1)
Preface
主要讲的是 future&promise 一些基本概念以及设计原理,但是因为 Seastar 中的实现1非常复杂(毕竟是工业级产品,且经过了 LLVM 大牛的大量优化,多了许多与核心实现其实并没有关系的逻辑),所以我并不打算直接硬上(其实之前尝试过好几次,但是看完了之后还是迷迷糊糊一知半解,所以打算换一种方法),而是打算借助 CppCon 2015 上的一个 presentation2 来介绍它的基本原理和简易实现;BTW,这个 presentation 的确讲的非常好,力荐!
Future and Promise
标题中的 FPC 指的是 Future、Promise 和 Continuation 这三个概念,这是并发编程中非常常见的一种编程模型;首先不管 Continuation 是什么,先了解 Future 和 Promise;Wikipedia 是这样解释3的:
In computer science, future, promise, delay, and deferred refer to constructs used for synchronizing program execution in some concurrent programming languages. They describe an object that acts as a proxy for a result that is initially unknown, usually because the computation of its value is not yet complete.
The terms future, promise, delay, and deferred are often used interchangeably, although some differences in usage between future and promise are treated below. Specifically, when usage is distinguished, a future is a read-only placeholder view of a variable, while a promise is a writable, single assignment container which sets the value of the future.
Wikipedia 更加侧重于二者的定义,而 Scala 官方文档4则对二者的功能有着更加详细的解释:
A future is a placeholder object for a result that does not yet exist. A promise is a writable, single-assignment container, which completes a future. Promises can complete the future with a result to indicate success, or with an exception to indicate failure.
Seastar 对这两个概念的解释5则比较简洁:
A future is a data structure that represents some yet-undetermined result. A promise is the provider of this result.
综合以上定义,我们可以这样理解
future 是一个结果的容器,它是只读的;为了得到这个结果我们需要付出一些努力,所以它其中的值可能当下并不存在/并不确定(所以也可以把 future 当做是一个占位符);而 promise 则是该结果的提供方,我们给 future 承诺了一个结果,最后我们付出了努力并得到了结果,需要通过 promise 将其写入,此时可以通过 future 获取到一个确定的值:它可能是一个成功的结果,说明努力终有回报;也可能是一个异常,表示尽管努力了也没有得到好结果,毕竟:
一个人的命运啊,当然要靠自我奋斗,也要考虑到历史的行程。
Future 的中文翻译为期值,即期望的值,我觉得是一个非常好的翻译;也就是说,这个值可能当下因为各种各样的原因导致承诺(Promise)不能马上兑现(unavailable),但是没有关系,我们知道它最终会兑现(available),不管它用什么样的方法;而 Future 的作用,就像我们从文件/网络中读取数据需要持有一个 fd 一样:有了 Future,那么在承诺(Promise)兑现时,我们就知道该从哪里(Future)获取到承诺的数据,其实它相当于一个句柄,或者说一个值的占位符(placeholder)。
或者我们可以将 Promise&Future pair 视为一个先进先出队列,该队列只能使用一次。Promise 是队列的生产端,而 Future 是消费端。与 FIFO 一样,Future 和 Promise 用于解耦数据生产者和数据消费者。
C++11 也为并发编程提供了 future&promise 模型,但是相比于 Seastar 中的 future&promise 功能则简陋了很多,但是没有关系,这篇博客就是实现 C++11 中提供的 future&promise 机制。
信箱模型
前面已经提到过 future&promise 模型也是生产者消费者模型中的一种,这里我们将其具现化,考虑 mailman 送信,recipient 收信的流程:
- recipient 一开始在信箱旁边睡觉
- mailman 将信件丢进邮箱,并标记该邮箱中有信件,最后摇铃叫醒 recipient
- recipient 被唤醒,检查信箱标记,从而取出信件
有几点注意:
- recipient 并不会一直去检查邮箱,而是让 mailman 唤醒它;毕竟轮询是低效的
- 信箱在嘈杂的马路旁边,所以 recipient 可能会被吵醒,因此信箱有一个标记,用于表示其中是否有信件,这样 recipient 醒来时不用真正打开邮箱,直接检查标记即可判断信箱内是否有信件。
- 目前引入信箱模型是为了引入一些概念,越到后面越会慢慢弱化信箱模型
代码实现
下面是信箱模型的代码实现:
|
|
其中 SysScheduler()
返回一个全局的线程池对象,其 schedule()
方法接收一个 lambda task,它会将该 task 调度至线程池中的某个线程中执行;需要注意,future&promise 模型并不是非得用在异步编程中,同步编程(利用线程池直接执行任务也是同步的,毕竟它会阻塞线程)中同样可以使用,只不过是 Seastar 将其用在了异步场景,不要先入为主了;考虑到同步异步并不是这里的主要探究对象,所以还是以更容易理解的同步编程作为例子,而且也没有给出具体实现代码。
此外,由于 mailbox 的送信流程与 recipient 的收信流程在不同线程中执行,所以需要采用某种线程同步机制。
上面代码中并没有关于 Future 和 Promise 的具体类型定义,但是它们在代码逻辑中体现了:
SysCheduler().schedule()
函数的 lambda 执行的是 mailman 的工作,它在线程池中的线程上执行,所以不会阻塞当前线程- recipient 在条件变量上睡眠,需要注意使用循环避免虚假唤醒
- 当 mailman 送信完毕,会通过条件变量唤醒 recipient
SharedState
上面的代码比较 high level 地实现了 future&promise,但是有许多不足:
- mailbox/mailman/recipient 目前还只是一些代码片段,很难复用,我们需要将其进行抽象
- 目前 mailman 和 recipient 之所以能够使用 mailbox,还是因为他们和 mailbox(
oa
,mtx_a
,cv_a
,ready_a
)处于同一层 C++ 作用域之中(mailman 通过 lambda 的 capture by reference 引用 mailbox);倘若它们不在一个作用域中,比如 recipient 被移动到其他地方(毕竟现实生活中也存在这种情况),那么再让它们按照这种方式维持关系就难了;所以我们需要一种显式地链接 mailbox 和 mailman/recipient 关系的机制 - 线程同步机制属于 future&promise 的内部实现细节,不应该暴露给用户
mailbox/mailman/recipient 需要被良好的封装,这里我们将其通过 C++ class/struct 呈现;mailbox 的生命周期不应该局限于 C++ 作用域,所以应该是在堆上;mailbox 是 mailman 和 recipient 之间的纽带,所以二者都持有 mailbox 的地址;这里我们将 mailbox 抽象为 SharedState
:
|
|
Future、Promise
通常首先创建一个 Promise,然后通过其 get_future
方法获取其关联的 future,这样二者的共享同一个 ShareState
,倘若此时 state 中已经有值了,那么获取的就是一个 ready future。
Promise
至少需要提供一下 3 个方法:
- 设置结果
- 设置异常
- 获取关联的 future
|
|
设置异常的逻辑和设置结果基本一直,就不赘述;有几点需要注意:
Promise
是 single assignment 的,也就是说它只能设置一次值(包括异常),因为我们需要保证 future 得到的是一个确定的值Promise
不能被拷贝(其拷贝构造函数以及拷贝赋值运算符是delete
的),而只允许移动,我们不希望有多个Promise
可以操纵同一个SharedState
,这样Future
的结果也无法保证- 最多只允许从 promise 中获取一个关联的 future(TODO: 这点是为什么呢?)
- 这里直到第 10 行我们才加锁,而不是在第 8 行检测
state_.ready_
就加锁;因为只有Promise
才会写state_.ready_
字段,而同时被多个线程读则是安全的;而且Promise
是不可拷贝的,所以只会存在一个使用该SharedState
的Promise
;此外,在后面的Future::get_value()
函数中也可以看到,只有在ready_ = true
的情况下才会去读取value_
,所以在Promise::set_value()
中,我们只需要保证value_
的写入是在更新ready_
之前,就也不用对该操作加锁。
Future
的定义如下:
|
|
在 get_value
方法中,我们将 state_
移动到一个局部变量,当函数结束后,state_
和 sp
都变成了 nullptr,后续便再也不能通过 Future 操纵 state_
了,也就是说 future 中的值被读取之后,这个 future 就失效了。此外,我们还可以添加 is_ready
, is_valid
等方法,用于判断 future 中是否有值了,以及 future 中的 state_
是否还有效。
最终,Future
, Promise
和 SharedState
三者之间的关系如下:
Promise 析构
考虑这样一种情况:promise 在 set_value
/set_exception
之前就被析构了,比如在之前代码的基础上随机地调用 set_value
:
|
|
这种情况下需要特别注意,因为很有可能有 future 在等待,如果什么都不做,那就很有可能导致 future 一直处于等待中,所以在 Promise 销毁的时候,我们需要检查并处理这种情况:
|
|
- 如果状态已经被销毁,那么可能是因为这个 Promise 被移动了,此时我们什么都不用做
- 如果没有从这个 Promise 中获取过 Future,那么肯定没有 Future 在等待,也什么都不用做,让 state_ 自行析构即可
- 如果这个 Promise 已经 ready 了,那么肯定已经通知过 Future 了,什么都不用做
- 只有移动拷贝时才需要丢弃已有的内容,而移动构造时内部还没有东西所以就不用调用
abandon_state()
方法;而拷贝构造和拷贝赋值则早已被声明为delete
,也无需处理
否则的话,我们走 set_exception()
的流程设置异常并通知等待方即可。
异常处理
现在我们可以这样使用这一套机制:
|
|
可以看到,future&promise 的内部逻辑已经被良好地隐藏了,而且二者也不再依靠 C++ 的作用域来保持联系,即使二者都被 std::move
,也没有丝毫问题。
但是 schedule()
可能存在问题,如果 set_value
抛出了异常,怎么办呢?当然是捕获它:
|
|
的确可以解决问题,但是这又是一份样板代码——毕竟在每次 set_value
时都需要写一遍;而且我们还需要手动 set_value
/set_exception
,是否可以将这一切自动化呢?
PackagedTask
C++11 提供了 packaged_task
,它和 std::function
类似,但是表示的是**延迟动作(delayed action)**的语义,在这里我们提供自己的版本,用以消除之前的异常处理样板代码:
|
|
- 很多模板相关的东西,但是只需要记住,这里的
f
其实就是之前的expensive_task
,是我们为了兑现承诺而需要付出的努力 task_
是一个UniqueFunction
,这是一种可以捕获不可拷贝对象(比如Promise
)的对象task_
的operator()
接收的参数就是f
接收的参数,但是它并不直接返回f
返回的结果(其返回类型为void
),而是通过将其设置进future
来返回结果- 整个
PackagedTask
其实就是对之前Promise
、expensive_task
以及执行完expensive_task
之后设置 value 并处理异常等逻辑的封装 PackagedTask
并没有调度f
执行(SysScheduler().schedule
),它只是将要被调度执行的操作做了一层封装(通过实现operator()
),这样后续可以直接调度执行它
其中第四点在下面可以看到:
|
|
有了以上 PackagedTask
的实现,C++11 提供的 std::async
函数可以这样实现:
|
|
此时线程池执行的对象不再是 func
,而是 task
,也就是 func
的一个更安全的 wrapper:在 func
执行过程中的任何异常,都会被捕获并通过 future
传递出去。
至此,我们将 future&promise 的实现细节(包括 set_value
和 set_exception
) 通过 PackagedTask
全部封装了起来,如果是简单的使用直接用 async
函数即可。
真正的 std::async
上面 async
函数实现使用上有一个问题,如果要执行的函数需要参数,我们怎么将其传递给该函数呢,毕竟目前的 async
函数只接收一个参数;也许我们可以这样:
|
|
每次都需要将函数包裹成一个没有参数的 lambda,并且将该函数的实参捕获进来,又是样板代码;是否可以像 std::thread
一样,直接将函数所需要的参数传递给 async
,由 async
内部转发给实际需要的函数呢?像下面这样:
|
|
这个比较简单,只需要将传给 async
的额外参数打包成一个 std::tuple
,然后在通过这个 tuple
调用 async
的第一个函数参数即可:
|
|
用到了 std::apply
6,这是一个非常有用的函数,用于在一个 tuple 上调用一个可调用对象,后面可以专门讲一讲。
Future 的串联
许多语言/框架提供的 future&promise 都提供了 then()
方法,用于在 future 被 fulfil/resolve 之后执行一些操作;这个方法的代码应该是整个 future&promise 实现中最难理解的部分了,所以先不放代码,而是去看看它应该怎么用:
|
|
也即使说,then
接收的函数,需要接收一个 then
的调用者 Future
一样类型的(实际调用时就是传入调用方 Future
),然后返回另一个 Future
(类型可以和 then
的调用方 Future
相同或者不同),由此又可以进一步串联其他操作,这样,每次遇到无法立即完成的操作,我们都将其放在线程池中去执行,而在本地只执行那些不会阻塞本线程的操作;在 future 被 fulfil 的过程中,我们可以去执行其他一些操作(又来到了异步?)。
future 被 fulfil/resolve 之后执行一些操作,实际上是 future 的延续,所以被称为 continuation:
|
|
then
方法可以这样实现:
|
|
然后在 set_value()
中,我们需要做相应的改动(set_exception()
类似的,不赘述):
|
|
首先将 continuation 包装成一个 PackagedTask
,这个 task 的 operator()
不接受任何参数,但是返回一个 continuation 返回类型的 Future
;而实际该 continuation
接收的是调用 then
的调用方 Future
,而这之所以没有体现在 task 的 operator()
中,是因为我们可以直接引用 *this
,而不需要显式加上这个参数。
有可能在调用 then()
时,调用方 Future
已经 ready,那么直接调度 PackageTask
(由 continuation 而来)执行即可;否则需要将其存入当前 Future
(即 then
的调用方) 的 state 中,最终调用 Promise
的 set_value
/set_exception
时会调度执行该 task;最后,从从 continuation 创建而来的 task 中获取一个 future 作为 then()
方法的返回结果返回。
注意,在 then
的实现构造 task
时(第七行),对 this
采用了 capture by move,也就是说,对于 h = f.then(g)
这个调用,在 then()
调用之后,f
就失效了,我们不能再使用它(但是 continuation 中还可以使用),而应该使用 then
返回的 h
——一个新的 Future
,之所以要这样,是因为既然我们选择了对 f
调用 then
方法,那么就已经认定了 f
的结果将会给 g
使用,而倘若我们再持有 f
,那么就可以继续通过 get_value
获取其结果,而 future
的结果只能获取一次,这将会导致 g
中的对 f.get_value()
必然失败
(假设用户自行调用 f.get_value()
在前);为了避免类似的情况,以我们直接将 f
置为失效。