2022-03-06
主要讲的是 future&promise 一些基本概念以及设计原理,但是因为 Seastar 中的实现1非常复杂(毕竟是工业级产品,且经过了 LLVM 大牛的大量优化,多了许多与核心实现其实并没有关系的逻辑),所以我并不打算直接硬上(其实之前尝试过好几次,但是看完了之后还是迷迷糊糊一知半解,所以打算换一种方法),而是打算借助 CppCon 2015 上的一个 presentation2 来介绍它的基本原理和简易实现;BTW,这个 presentation 的确讲的非常好,力荐!
标题中的 FPC 指的是 Future、Promise 和 Continuation 这三个概念,这是并发编程中非常常见的一种编程模型;首先不管 Continuation 是什么,先了解 Future 和 Promise;Wikipedia 是这样解释3的:
In computer science, future, promise, delay, and deferred refer to constructs used for synchronizing program execution in some concurrent programming languages. They describe an object that acts as a proxy for a result that is initially unknown, usually because the computation of its value is not yet complete.
The terms future, promise, delay, and deferred are often used interchangeably, although some differences in usage between future and promise are treated below. Specifically, when usage is distinguished, a future is a read-only placeholder view of a variable, while a promise is a writable, single assignment container which sets the value of the future.
Wikipedia 更加侧重于二者的定义,而 Scala 官方文档4则对二者的功能有着更加详细的解释:
A future is a placeholder object for a result that does not yet exist. A promise is a writable, single-assignment container, which completes a future. Promises can complete the future with a result to indicate success, or with an exception to indicate failure.
Seastar 对这两个概念的解释5则比较简洁:
A future is a data structure that represents some yet-undetermined result. A promise is the provider of this result.
综合以上定义,我们可以这样理解
future 是一个结果的容器,它是只读的;为了得到这个结果我们需要付出一些努力,所以它其中的值可能当下并不存在/并不确定(所以也可以把 future 当做是一个占位符);而 promise 则是该结果的提供方,我们给 future 承诺了一个结果,最后我们付出了努力并得到了结果,需要通过 promise 将其写入,此时可以通过 future 获取到一个确定的值:它可能是一个成功的结果,说明努力终有回报;也可能是一个异常,表示尽管努力了也没有得到好结果,毕竟:
一个人的命运啊,当然要靠自我奋斗,也要考虑到历史的行程。
Future 的中文翻译为期值,即期望的值,我觉得是一个非常好的翻译;也就是说,这个值可能当下因为各种各样的原因导致承诺(Promise)不能马上兑现(unavailable),但是没有关系,我们知道它最终会兑现(available),不管它用什么样的方法;而 Future 的作用,就像我们从文件/网络中读取数据需要持有一个 fd 一样:有了 Future,那么在承诺(Promise)兑现时,我们就知道该从哪里(Future)获取到承诺的数据,其实它相当于一个句柄,或者说一个值的占位符(placeholder)。
或者我们可以将 Promise&Future pair 视为一个先进先出队列,该队列只能使用一次。Promise 是队列的生产端,而 Future 是消费端。与 FIFO 一样,Future 和 Promise 用于解耦数据生产者和数据消费者。
C++11 也为并发编程提供了 future&promise 模型,但是相比于 Seastar 中的 future&promise 功能则简陋了很多,但是没有关系,这篇博客就是实现 C++11 中提供的 future&promise 机制。
前面已经提到过 future&promise 模型也是生产者消费者模型中的一种,这里我们将其具现化,考虑 mailman 送信,recipient 收信的流程:
有几点注意:
下面是信箱模型的代码实现:
;
Out oastd::mutex mtx_a;
std::condition_variable cv_a;
bool ready_a = false;
// mailman: won't block the main thread, job(mail delivery) will be done in another thread
().schedule([&]() {
SysScheduler= expensive_computation(); // put a letter in the box
oa std::lock_guard<std::mutex> lock(mtx_a);
= true; // raise a flag
ready_a .notify_all(); // clash the cymbals
cv_a})
// recipient
std::unique_lock<std::mutex> lock(mtx_a);
while (!ready_a) cv_a.wait(lock); // sleep by the mailbox
std::cout << oa << std::endl; // look in the mailbox and get the mail
其中 SysScheduler()
返回一个全局的线程池对象,其
schedule()
方法接收一个 lambda task,它会将该 task
调度至线程池中的某个线程中执行;需要注意,future&promise
模型并不是非得用在异步编程中,同步编程(利用线程池直接执行任务也是同步的,毕竟它会阻塞线程)中同样可以使用,只不过是
Seastar
将其用在了异步场景,不要先入为主了;考虑到同步异步并不是这里的主要探究对象,所以还是以更容易理解的同步编程作为例子,而且也没有给出具体实现代码。
此外,由于 mailbox 的送信流程与 recipient 的收信流程在不同线程中执行,所以需要采用某种线程同步机制。
上面代码中并没有关于 Future 和 Promise 的具体类型定义,但是它们在代码逻辑中体现了:
SysCheduler().schedule()
函数的 lambda 执行的是 mailman
的工作,它在线程池中的线程上执行,所以不会阻塞当前线程SharedState
上面的代码比较 high level 地实现了 future&promise,但是有许多不足:
oa
,mtx_a
,cv_a
,ready_a
)处于同一层
C++ 作用域之中(mailman 通过 lambda 的 capture by reference 引用
mailbox);倘若它们不在一个作用域中,比如 recipient
被移动到其他地方(毕竟现实生活中也存在这种情况),那么再让它们按照这种方式维持关系就难了;所以我们需要一种显式地链接
mailbox 和 mailman/recipient 关系的机制mailbox/mailman/recipient 需要被良好的封装,这里我们将其通过 C++
class/struct 呈现;mailbox 的生命周期不应该局限于 C++
作用域,所以应该是在堆上;mailbox 是 mailman 和 recipient
之间的纽带,所以二者都持有 mailbox 的地址;这里我们将 mailbox 抽象为
SharedState
:
template<typename R> Future;
template<typename R> Promise;
// mailbox class
template<typename R>
struct SharedState {
value_;
R std::exception_ptr exception_;
bool ready_ = false; // flags
std::mutex mtx_;
std::condition_variable cv_; // cymbals
};
通常首先创建一个 Promise,然后通过其 get_future
方法获取其关联的 future,这样二者的共享同一个
ShareState
,倘若此时 state 中已经有值了,那么获取的就是一个
ready future。
Promise
至少需要提供一下 3 个方法:
template<typename R>
struct Promise {
std::shared_ptr<SharedState> state_;
bool future_already_retrieved;
void set_value(R r) {
if (!state_) throw "no state";
if (state_.ready_) throw "promise already satisfied";
state_->value = std::move(r);
std::lock_guard lock(state_->mtx_);
state_->ready = true;
state_->cv_.notify_all();
}
Future<R> get_future() {
if (!state_) throw "no state";
if (future_already_retrieved_) throw "future already retrieved";
furture_already_retrieved_ = true;
return Future<R>(state_);
}
};
设置异常的逻辑和设置结果基本一直,就不赘述;有几点需要注意:
Promise
是 single assignment
的,也就是说它只能设置一次值(包括异常),因为我们需要保证 future
得到的是一个确定的值Promise
不能被拷贝(其拷贝构造函数以及拷贝赋值运算符是
delete
的),而只允许移动,我们不希望有多个
Promise
可以操纵同一个 SharedState
,这样
Future
的结果也无法保证state_.ready_
就加锁;因为只有 Promise
才会写
state_.ready_
字段,而同时被多个线程读则是安全的;而且
Promise
是不可拷贝的,所以只会存在一个使用该
SharedState
的 Promise
;此外,在后面的
Future::get_value()
函数中也可以看到,只有在
ready_ = true
的情况下才会去读取
value_
,所以在 Promise::set_value()
中,我们只需要保证 value_
的写入是在更新
ready_
之前,就也不用对该操作加锁。Future
的定义如下:
template<typename R>
struct Future {
std::shared_ptr<SharedState> state_;
void wait() {
if (!state_) throw "no state";
std::unique_lock<std::mutex> lock(state_->mtx_);
while (!state_->ready_) state_->cv_.wait(lock);
}
R get_value() {
wait();
auto sp = std::move(state_);
return std::move(sp->value_);
}
};
在 get_value
方法中,我们将 state_
移动到一个局部变量,当函数结束后,state_
和 sp
都变成了 nullptr,后续便再也不能通过 Future 操纵 state_
了,也就是说 future 中的值被读取之后,这个 future
就失效了。此外,我们还可以添加 is_ready
,
is_valid
等方法,用于判断 future 中是否有值了,以及 future
中的 state_
是否还有效。
最终,Future
, Promise
和
SharedState
三者之间的关系如下:
考虑这样一种情况:promise 在
set_value
/set_exception
之前就被析构了,比如在之前代码的基础上随机地调用
set_value
:
.schedule([pa = std::move(pa)]() {
sif (random()) pa.set_value(expensive_computation(a));
});
这种情况下需要特别注意,因为很有可能有 future 在等待,如果什么都不做,那就很有可能导致 future 一直处于等待中,所以在 Promise 销毁的时候,我们需要检查并处理这种情况:
template<typename R>
struct Promise {
void abandon_state() {
if (!state_ || !future_already_retrieved_) {
return;
}
std::lock_guard<std::mutex> lock(state_->mtx_);
if (state_->ready_) return;
state_->exception_ = std::make_exception_ptr("broken promise");
state_->ready_ = true;
state_->cv_.notify_all();
}
~Promise() { abandon_state(); }
& operator=(Promise &&rhs) { abandon_state(); } // only movable
Promise};
abandon_state()
方法;而拷贝构造和拷贝赋值则早已被声明为
delete
,也无需处理否则的话,我们走 set_exception()
的流程设置异常并通知等待方即可。
现在我们可以这样使用这一套机制:
(In a, In b) {
Out compute_expensive_sum<Out> pa, pb;
Promise<Out> fa = pa.get_future(), fb = pb.get_future();
Future
().schedule([pa = std::move(pa)]() {
SysScheduler.set_value(expensive_computation(a));
pa});
().schedule([pb = std::move(pb)]() {
SysSheduler.set_value(expensive_computation(b));
pb});
return fa.get() + fb.get();
}
可以看到,future&promise
的内部逻辑已经被良好地隐藏了,而且二者也不再依靠 C++
的作用域来保持联系,即使二者都被
std::move
,也没有丝毫问题。
但是 schedule()
可能存在问题,如果
set_value
抛出了异常,怎么办呢?当然是捕获它:
().schedule([pa = std::move(pa)]() {
SysSchedulertry {
.set_value(expensive_computation(a));
pa} catch(...) {
.set_exception(std::current_exception());
pa}
});
的确可以解决问题,但是这又是一份样板代码——毕竟在每次
set_value
时都需要写一遍;而且我们还需要手动
set_value
/set_exception
,是否可以将这一切自动化呢?
PackagedTask
C++11 提供了 packaged_task
,它和
std::function
类似,但是表示的是延迟动作(delayed
action)的语义,在这里我们提供自己的版本,用以消除之前的异常处理样板代码:
template<typename Signature> struct PackagedTask;
template<typename R, typename ...A> struct PackagedTask<R(A...)> {
<void(A...)> task_; // can capture things that are not copyable(i.e. promise)
UniqueFunction<R> future_;
Futurebool promise_already_satisfied_ = false;
template<typename F> PackagedTask(F &&f) {
<R> pa;
Promisefuture_ = pa.get_future();
task_ = [pa = std::move(pa), f = std::forwared<F>(f)] (A ...args) {
try {
.set_value(f(std::forwarded<A>(args)...));
pa} catch(...) {
.set_exception(std::current_exception());
pa}
};
}
};
f
其实就是之前的
expensive_task
,是我们为了兑现承诺而需要付出的努力task_
是一个
UniqueFunction
,这是一种可以捕获不可拷贝对象(比如
Promise
)的对象task_
的 operator()
接收的参数就是
f
接收的参数,但是它并不直接返回 f
返回的结果(其返回类型为 void
),而是通过将其设置进
future
来返回结果PackagedTask
其实就是对之前
Promise
、expensive_task
以及执行完
expensive_task
之后设置 value 并处理异常等逻辑的封装PackagedTask
并没有调度 f
执行(SysScheduler().schedule
),它只是将要被调度执行的操作做了一层封装(通过实现
operator()
),这样后续可以直接调度执行它其中第四点在下面可以看到:
bool valid() { return task_ != nullptr; }
<R> get_future() {
Futureif (!task_) throw "no state";
if (!future_.valid()) throw "future already retrieved";
return std::move(future_);
}
void operator()(A ...args) {
if (!task_) throw "no state";
if (promise_already_satisfied_) throw "promise already satisfied";
promise_already_satisfied_ = true;
task_(std::forward<A>(args)...);
}
有了以上 PackagedTask
的实现,C++11 提供的
std::async
函数可以这样实现:
template<typename Func, typename R = decltype(std::declval<Func>()())>
<R> async(Func func) {
Future<R()> task(std::move(func));
PackagedTask<R> future = task.get_future();
Future().schedule(std::move(task));
SysSchedulerreturn future();
}
此时线程池执行的对象不再是 func
,而是
task
,也就是 func
的一个更安全的 wrapper:在
func
执行过程中的任何异常,都会被捕获并通过
future
传递出去。
至此,我们将 future&promise 的实现细节(包括
set_value
和 set_exception
) 通过
PackagedTask
全部封装了起来,如果是简单的使用直接用
async
函数即可。
std::async
上面 async
函数实现使用上有一个问题,如果要执行的函数需要参数,我们怎么将其传递给该函数呢,毕竟目前的
async
函数只接收一个参数;也许我们可以这样:
;
In in<int> fut = async([in]() {
Futurereturn expensive_computation(std::move(in));
})
每次都需要将函数包裹成一个没有参数的
lambda,并且将该函数的实参捕获进来,又是样板代码;是否可以像
std::thread
一样,直接将函数所需要的参数传递给
async
,由 async
内部转发给实际需要的函数呢?像下面这样:
;
In in<int> fut = async(expensive_computation, in); Future
这个比较简单,只需要将传给 async
的额外参数打包成一个
std::tuple
,然后在通过这个 tuple
调用
async
的第一个函数参数即可:
template<typename Func, typename R = decltype(std::declval<Func>()())>
auto async(Func func) -> Future<decltype(func(std::move(args)...))> {
using R = std::decltype(func(std::move(args)...));
<R(Args ...)> task(std::move(func));
PackagedTask<R> result = task.get_future();
Future
auto bound = ()[task = std::move(task), argstuple = std::make_tuple(std::move(args)...)] {
std::apply(task, std::move(argstuple));
};
().schedule(std::move(task));
SysSchedulerreturn future();
}
用到了 std::apply
6,这是一个非常有用的函数,用于在一个
tuple 上调用一个可调用对象,后面可以专门讲一讲。
许多语言/框架提供的 future&promise 都提供了 then()
方法,用于在 future 被 fulfil/resolve
之后执行一些操作;这个方法的代码应该是整个 future&promise
实现中最难理解的部分了,所以先不放代码,而是去看看它应该怎么用:
;
In in<int> fut1 = async(expensive_computation, in);
Future
<double> fut2 = fut1.then([](Future<int> fut) -> double {
Futureint res1 = fut.get_value()
...
return another_expensive_computation(...);
})
也即使说,then
接收的函数,需要接收一个
then
的调用者 Future
一样类型的(实际调用时就是传入调用方 Future
),然后返回另一个
Future
(类型可以和 then
的调用方
Future
相同或者不同),由此又可以进一步串联其他操作,这样,每次遇到无法立即完成的操作,我们都将其放在线程池中去执行,而在本地只执行那些不会阻塞本线程的操作;在
future 被 fulfil
的过程中,我们可以去执行其他一些操作(又来到了异步?)。
future 被 fulfil/resolve 之后执行一些操作,实际上是 future 的延续,所以被称为 continuation:
template<typename R>
struct SharedState {
...
<void()> continuation_;
UniqueFunction};
then
方法可以这样实现:
template<typename F>
template<typename R>
auto Future<R>::then(F func) {
if (!state_) throw "no state";
auto sp = state_;
using R2 = decltype(func(std::move(*this)));
PackagedTask<R2()> task([func = std::move(func), arg = std::move(*this)]() mutable {
return func(std::move(arg));
})
Future<R2> result = task.get_future();
std::lock_guard<std::mutex> lock(sp->mtx_);
if (!sp->ready_) {
sp->continuation_ = std::move(task);
} else {
SysScheduler().schedule(std::move(task));
}
return result;
}
然后在 set_value()
中,我们需要做相应的改动(set_exception()
类似的,不赘述):
void set_value(R r) {
if (!state_) throw "no state";
if (state_.ready_) throw "promise already satisfied";
state_->value = std::move(r);
std::lock_guard lock(state_->mtx_);
state_->ready = true;
if (state_.continuation_) {
().schedule(std::move(state_.continuation_));
SysChedulerstate_.continuation_ = nullptr;
}
state_->cv_.notify_all();
}
首先将 continuation 包装成一个 PackagedTask
,这个 task
的 operator()
不接受任何参数,但是返回一个 continuation
返回类型的 Future
;而实际该 continuation
接收的是调用 then
的调用方
Future
,而这之所以没有体现在 task 的
operator()
中,是因为我们可以直接引用
*this
,而不需要显式加上这个参数。
有可能在调用 then()
时,调用方 Future
已经
ready,那么直接调度 PackageTask
(由 continuation
而来)执行即可;否则需要将其存入当前 Future
(即
then
的调用方) 的 state 中,最终调用 Promise
的 set_value
/set_exception
时会调度执行该
task;最后,从从 continuation 创建而来的 task 中获取一个 future 作为
then()
方法的返回结果返回。
注意,在 then
的实现构造
task
时(第七行),对 this
采用了 capture by
move,也就是说,对于 h = f.then(g)
这个调用,在
then()
调用之后,f
就失效了,我们不能再使用它(但是 continuation 中还可以使用),而应该使用
then
返回的 h
——一个新的
Future
,之所以要这样,是因为既然我们选择了对 f
调用 then
方法,那么就已经认定了 f
的结果将会给 g
使用,而倘若我们再持有
f
,那么就可以继续通过 get_value
获取其结果,而
future
的结果只能获取一次,这将会导致 g
中的对
f.get_value()
必然失败 (假设用户自行调用
f.get_value()
在前);为了避免类似的情况,以我们直接将
f
置为失效。
https://github.com/scylladb/seastar/blob/master/include/seastar/core/future.hh↩︎
https://www.youtube.com/watch?v=jfDRgnxDe7o↩︎
https://www.wikiwand.com/en/Futures_and_promises↩︎
https://docs.scala-lang.org/overviews/core/futures.html↩︎
http://seastar.io/futures-promises/↩︎
https://en.cppreference.com/w/cpp/utility/apply↩︎