Seastar: FPC(1)

balus

2022-03-06

Preface

主要讲的是 future&promise 一些基本概念以及设计原理,但是因为 Seastar 中的实现1非常复杂(毕竟是工业级产品,且经过了 LLVM 大牛的大量优化,多了许多与核心实现其实并没有关系的逻辑),所以我并不打算直接硬上(其实之前尝试过好几次,但是看完了之后还是迷迷糊糊一知半解,所以打算换一种方法),而是打算借助 CppCon 2015 上的一个 presentation2 来介绍它的基本原理和简易实现;BTW,这个 presentation 的确讲的非常好,力荐!

Future and Promise

标题中的 FPC 指的是 FuturePromiseContinuation 这三个概念,这是并发编程中非常常见的一种编程模型;首先不管 Continuation 是什么,先了解 Future 和 Promise;Wikipedia 是这样解释3的:

In computer science, future, promise, delay, and deferred refer to constructs used for synchronizing program execution in some concurrent programming languages. They describe an object that acts as a proxy for a result that is initially unknown, usually because the computation of its value is not yet complete.

The terms future, promise, delay, and deferred are often used interchangeably, although some differences in usage between future and promise are treated below. Specifically, when usage is distinguished, a future is a read-only placeholder view of a variable, while a promise is a writable, single assignment container which sets the value of the future.

Wikipedia 更加侧重于二者的定义,而 Scala 官方文档4则对二者的功能有着更加详细的解释:

A future is a placeholder object for a result that does not yet exist. A promise is a writable, single-assignment container, which completes a future. Promises can complete the future with a result to indicate success, or with an exception to indicate failure.

Seastar 对这两个概念的解释5则比较简洁:

A future is a data structure that represents some yet-undetermined result. A promise is the provider of this result.

综合以上定义,我们可以这样理解

future 是一个结果的容器,它是只读的;为了得到这个结果我们需要付出一些努力,所以它其中的值可能当下并不存在/并不确定(所以也可以把 future 当做是一个占位符);而 promise 则是该结果的提供方,我们给 future 承诺了一个结果,最后我们付出了努力并得到了结果,需要通过 promise 将其写入,此时可以通过 future 获取到一个确定的值:它可能是一个成功的结果,说明努力终有回报;也可能是一个异常,表示尽管努力了也没有得到好结果,毕竟:

一个人的命运啊,当然要靠自我奋斗,也要考虑到历史的行程。

Future 的中文翻译为期值,即期望的值,我觉得是一个非常好的翻译;也就是说,这个值可能当下因为各种各样的原因导致承诺(Promise)不能马上兑现(unavailable),但是没有关系,我们知道它最终会兑现(available),不管它用什么样的方法;而 Future 的作用,就像我们从文件/网络中读取数据需要持有一个 fd 一样:有了 Future,那么在承诺(Promise)兑现时,我们就知道该从哪里(Future)获取到承诺的数据,其实它相当于一个句柄,或者说一个值的占位符(placeholder)。

或者我们可以将 Promise&Future pair 视为一个先进先出队列,该队列只能使用一次。Promise 是队列的生产端,而 Future 是消费端。与 FIFO 一样,Future 和 Promise 用于解耦数据生产者和数据消费者。

C++11 也为并发编程提供了 future&promise 模型,但是相比于 Seastar 中的 future&promise 功能则简陋了很多,但是没有关系,这篇博客就是实现 C++11 中提供的 future&promise 机制。

信箱模型

前面已经提到过 future&promise 模型也是生产者消费者模型中的一种,这里我们将其具现化,考虑 mailman 送信,recipient 收信的流程:

  1. recipient 一开始在信箱旁边睡觉
  2. mailman 将信件丢进邮箱,并标记该邮箱中有信件,最后摇铃叫醒 recipient
  3. recipient 被唤醒,检查信箱标记,从而取出信件

有几点注意:

代码实现

下面是信箱模型的代码实现:

Out oa;
std::mutex mtx_a;
std::condition_variable cv_a;
bool ready_a = false;

// mailman: won't block the main thread, job(mail delivery) will be done in another thread
SysScheduler().schedule([&]() {
  oa = expensive_computation(); // put a letter in the box
  std::lock_guard<std::mutex> lock(mtx_a);
  ready_a = true; // raise a flag
  cv_a.notify_all(); // clash the cymbals
})

// recipient
std::unique_lock<std::mutex> lock(mtx_a);
while (!ready_a) cv_a.wait(lock); // sleep by the mailbox
std::cout << oa << std::endl; // look in the mailbox and get the mail

其中 SysScheduler() 返回一个全局的线程池对象,其 schedule() 方法接收一个 lambda task,它会将该 task 调度至线程池中的某个线程中执行;需要注意,future&promise 模型并不是非得用在异步编程中,同步编程(利用线程池直接执行任务也是同步的,毕竟它会阻塞线程)中同样可以使用,只不过是 Seastar 将其用在了异步场景,不要先入为主了;考虑到同步异步并不是这里的主要探究对象,所以还是以更容易理解的同步编程作为例子,而且也没有给出具体实现代码。

此外,由于 mailbox 的送信流程与 recipient 的收信流程在不同线程中执行,所以需要采用某种线程同步机制。

上面代码中并没有关于 Future 和 Promise 的具体类型定义,但是它们在代码逻辑中体现了:

SharedState

上面的代码比较 high level 地实现了 future&promise,但是有许多不足:

mailbox/mailman/recipient 需要被良好的封装,这里我们将其通过 C++ class/struct 呈现;mailbox 的生命周期不应该局限于 C++ 作用域,所以应该是在堆上;mailbox 是 mailman 和 recipient 之间的纽带,所以二者都持有 mailbox 的地址;这里我们将 mailbox 抽象为 SharedState

template<typename R> Future;
template<typename R> Promise;

// mailbox class
template<typename R>
struct SharedState {
  R value_;
  std::exception_ptr exception_;
  bool ready_ = false; // flags
  std::mutex mtx_;
  std::condition_variable cv_; // cymbals
};

Future、Promise

通常首先创建一个 Promise,然后通过其 get_future 方法获取其关联的 future,这样二者的共享同一个 ShareState,倘若此时 state 中已经有值了,那么获取的就是一个 ready future。

Promise 至少需要提供一下 3 个方法:

template<typename R>
struct Promise {
  std::shared_ptr<SharedState> state_;    
  bool future_already_retrieved;
  
  void set_value(R r) {
    if (!state_) throw "no state";
    if (state_.ready_) throw "promise already satisfied";
    state_->value = std::move(r);
    std::lock_guard lock(state_->mtx_);
    state_->ready = true;
    state_->cv_.notify_all();
  }
    
  Future<R> get_future() {
    if (!state_) throw "no state";
    if (future_already_retrieved_) throw "future already retrieved";
    furture_already_retrieved_ = true;
    return Future<R>(state_);
  }    
};

设置异常的逻辑和设置结果基本一直,就不赘述;有几点需要注意:

Future 的定义如下:

template<typename R>
struct Future {
  std::shared_ptr<SharedState> state_;    
  void wait() {
    if (!state_) throw "no state";
    std::unique_lock<std::mutex> lock(state_->mtx_);
    while (!state_->ready_) state_->cv_.wait(lock);
  }
  
  R get_value() {
    wait();
    auto sp = std::move(state_);
    return std::move(sp->value_);
  }
};

get_value 方法中,我们将 state_ 移动到一个局部变量,当函数结束后,state_sp 都变成了 nullptr,后续便再也不能通过 Future 操纵 state_ 了,也就是说 future 中的值被读取之后,这个 future 就失效了。此外,我们还可以添加 is_ready, is_valid 等方法,用于判断 future 中是否有值了,以及 future 中的 state_ 是否还有效。

最终,Future, PromiseSharedState 三者之间的关系如下:

future-promise-state

Promise 析构

考虑这样一种情况:promise 在 set_value/set_exception 之前就被析构了,比如在之前代码的基础上随机地调用 set_value

  s.schedule([pa = std::move(pa)]() {
    if (random()) pa.set_value(expensive_computation(a));
  });

这种情况下需要特别注意,因为很有可能有 future 在等待,如果什么都不做,那就很有可能导致 future 一直处于等待中,所以在 Promise 销毁的时候,我们需要检查并处理这种情况:

template<typename R>
struct Promise {
  void abandon_state() {
    if (!state_ || !future_already_retrieved_) {
      return;
    }    
    std::lock_guard<std::mutex> lock(state_->mtx_); 
    if (state_->ready_) return;
    state_->exception_ = std::make_exception_ptr("broken promise");
    state_->ready_ = true;
    state_->cv_.notify_all();
  }  
    
  ~Promise() { abandon_state(); }    
  Promise& operator=(Promise &&rhs) { abandon_state(); } // only movable
};

否则的话,我们走 set_exception() 的流程设置异常并通知等待方即可。

异常处理

现在我们可以这样使用这一套机制:

Out compute_expensive_sum(In a, In b) {
  Promise<Out> pa, pb;
  Future<Out> fa = pa.get_future(), fb = pb.get_future();

  SysScheduler().schedule([pa = std::move(pa)]() {
    pa.set_value(expensive_computation(a));
  });
  SysSheduler().schedule([pb = std::move(pb)]() {
    pb.set_value(expensive_computation(b));
  });
  
  return fa.get() + fb.get();
}

可以看到,future&promise 的内部逻辑已经被良好地隐藏了,而且二者也不再依靠 C++ 的作用域来保持联系,即使二者都被 std::move,也没有丝毫问题。

但是 schedule() 可能存在问题,如果 set_value 抛出了异常,怎么办呢?当然是捕获它:

  SysScheduler().schedule([pa = std::move(pa)]() {
    try {
      pa.set_value(expensive_computation(a));
    } catch(...) {
      pa.set_exception(std::current_exception());
    }
  });

的确可以解决问题,但是这又是一份样板代码——毕竟在每次 set_value 时都需要写一遍;而且我们还需要手动 set_value/set_exception,是否可以将这一切自动化呢?

PackagedTask

C++11 提供了 packaged_task,它和 std::function 类似,但是表示的是延迟动作(delayed action)的语义,在这里我们提供自己的版本,用以消除之前的异常处理样板代码:

template<typename Signature> struct PackagedTask;
template<typename R, typename ...A> struct PackagedTask<R(A...)> {
  UniqueFunction<void(A...)> task_; // can capture things that are not copyable(i.e. promise)
  Future<R> future_;
  bool promise_already_satisfied_ = false;
  
  template<typename F> PackagedTask(F &&f) {
    Promise<R> pa;
    future_ = pa.get_future();
    task_ = [pa = std::move(pa), f = std::forwared<F>(f)] (A ...args) {
      try {
        pa.set_value(f(std::forwarded<A>(args)...));
      } catch(...) {
        pa.set_exception(std::current_exception());
      }
    };
  }
};

其中第四点在下面可以看到:

bool valid() { return task_ != nullptr; }

Future<R> get_future() {
  if (!task_) throw "no state";
  if (!future_.valid()) throw "future already retrieved";
  return std::move(future_);
}

void operator()(A ...args) {
  if (!task_) throw "no state";    
  if (promise_already_satisfied_) throw "promise already satisfied";
  promise_already_satisfied_ = true;
  task_(std::forward<A>(args)...);
}

有了以上 PackagedTask 的实现,C++11 提供的 std::async 函数可以这样实现:

template<typename Func, typename R = decltype(std::declval<Func>()())>
Future<R> async(Func func) {
  PackagedTask<R()> task(std::move(func));
  Future<R> future = task.get_future();
  SysScheduler().schedule(std::move(task));
  return future();
}

此时线程池执行的对象不再是 func,而是 task,也就是 func 的一个更安全的 wrapper:在 func 执行过程中的任何异常,都会被捕获并通过 future 传递出去。

至此,我们将 future&promise 的实现细节(包括 set_valueset_exception) 通过 PackagedTask 全部封装了起来,如果是简单的使用直接用 async 函数即可。

真正的 std::async

上面 async 函数实现使用上有一个问题,如果要执行的函数需要参数,我们怎么将其传递给该函数呢,毕竟目前的 async 函数只接收一个参数;也许我们可以这样:

In in;
Future<int> fut = async([in]() {
    return expensive_computation(std::move(in));
})

每次都需要将函数包裹成一个没有参数的 lambda,并且将该函数的实参捕获进来,又是样板代码;是否可以像 std::thread 一样,直接将函数所需要的参数传递给 async,由 async 内部转发给实际需要的函数呢?像下面这样:

In in;
Future<int> fut = async(expensive_computation, in);

这个比较简单,只需要将传给 async 的额外参数打包成一个 std::tuple,然后在通过这个 tuple 调用 async 的第一个函数参数即可:

template<typename Func, typename R = decltype(std::declval<Func>()())>
auto async(Func func) -> Future<decltype(func(std::move(args)...))> {
  using R = std::decltype(func(std::move(args)...));
  PackagedTask<R(Args ...)> task(std::move(func));
  Future<R> result = task.get_future();
  
  auto bound = ()[task = std::move(task), argstuple = std::make_tuple(std::move(args)...)] {
    std::apply(task, std::move(argstuple));
  };
  SysScheduler().schedule(std::move(task));
  return future();
}

用到了 std::apply6,这是一个非常有用的函数,用于在一个 tuple 上调用一个可调用对象,后面可以专门讲一讲。

Future 的串联

许多语言/框架提供的 future&promise 都提供了 then() 方法,用于在 future 被 fulfil/resolve 之后执行一些操作;这个方法的代码应该是整个 future&promise 实现中最难理解的部分了,所以先不放代码,而是去看看它应该怎么用:

In in;
Future<int> fut1 = async(expensive_computation, in);
    
Future<double> fut2 = fut1.then([](Future<int> fut) -> double {
  int res1 = fut.get_value()
  ...
  return another_expensive_computation(...);
})

也即使说,then 接收的函数,需要接收一个 then 的调用者 Future 一样类型的(实际调用时就是传入调用方 Future),然后返回另一个 Future(类型可以和 then 的调用方 Future 相同或者不同),由此又可以进一步串联其他操作,这样,每次遇到无法立即完成的操作,我们都将其放在线程池中去执行,而在本地只执行那些不会阻塞本线程的操作;在 future 被 fulfil 的过程中,我们可以去执行其他一些操作(又来到了异步?)。

future 被 fulfil/resolve 之后执行一些操作,实际上是 future 的延续,所以被称为 continuation:

template<typename R>
struct SharedState {
  ...
  UniqueFunction<void()> continuation_;
};

then 方法可以这样实现:

template<typename F>
template<typename R>
auto Future<R>::then(F func) {
  if (!state_) throw "no state";
  auto sp = state_;
  using R2 = decltype(func(std::move(*this)));
  PackagedTask<R2()> task([func = std::move(func), arg = std::move(*this)]() mutable {
    return func(std::move(arg));
  })
  Future<R2> result = task.get_future();
  std::lock_guard<std::mutex> lock(sp->mtx_);
  if (!sp->ready_) {
      sp->continuation_ = std::move(task);
  } else {
      SysScheduler().schedule(std::move(task));
  }
  return result;
}

然后在 set_value() 中,我们需要做相应的改动(set_exception() 类似的,不赘述):

void set_value(R r) {
  if (!state_) throw "no state";
  if (state_.ready_) throw "promise already satisfied";
  state_->value = std::move(r);
  std::lock_guard lock(state_->mtx_);
  state_->ready = true;
  if (state_.continuation_) {
      SysCheduler().schedule(std::move(state_.continuation_));
      state_.continuation_ = nullptr;
  }
  state_->cv_.notify_all();
}

首先将 continuation 包装成一个 PackagedTask,这个 task 的 operator() 不接受任何参数,但是返回一个 continuation 返回类型的 Future;而实际该 continuation 接收的是调用 then 的调用方 Future,而这之所以没有体现在 task 的 operator() 中,是因为我们可以直接引用 *this,而不需要显式加上这个参数。

有可能在调用 then() 时,调用方 Future 已经 ready,那么直接调度 PackageTask(由 continuation 而来)执行即可;否则需要将其存入当前 Future(即 then 的调用方) 的 state 中,最终调用 Promiseset_value/set_exception 时会调度执行该 task;最后,从从 continuation 创建而来的 task 中获取一个 future 作为 then() 方法的返回结果返回。

注意,在 then实现构造 task 时(第七行),对 this 采用了 capture by move,也就是说,对于 h = f.then(g) 这个调用,在 then() 调用之后,f 就失效了,我们不能再使用它(但是 continuation 中还可以使用),而应该使用 then 返回的 h——一个新的 Future,之所以要这样,是因为既然我们选择了对 f 调用 then 方法,那么就已经认定了 f 的结果将会给 g 使用,而倘若我们再持有 f,那么就可以继续通过 get_value 获取其结果,而 future 的结果只能获取一次,这将会导致 g 中的对 f.get_value() 必然失败 (假设用户自行调用 f.get_value() 在前);为了避免类似的情况,以我们直接将 f 置为失效。

Reference


  1. https://github.com/scylladb/seastar/blob/master/include/seastar/core/future.hh↩︎

  2. https://www.youtube.com/watch?v=jfDRgnxDe7o↩︎

  3. https://www.wikiwand.com/en/Futures_and_promises↩︎

  4. https://docs.scala-lang.org/overviews/core/futures.html↩︎

  5. http://seastar.io/futures-promises/↩︎

  6. https://en.cppreference.com/w/cpp/utility/apply↩︎