Seastar: FPC(2)

balus

2022-03-09

Preface

这里不打算完整地解释 Seastar 中 future&promise 的实现,里面旁枝末节实在太多,比如:

所以还是希望抓住其中的一些重难点以及做的一些优化进行分析。

这个系列的第一篇文章1中已经 future&promise 的大致实现流程进行了解释,Seastar 整体思想与之类似,但是还是有一些不同2

类层次结构

首先看看 FPC 的类结构,这里直接放出其 UML 图:

seastar-fpc-uml

非常复杂的层次结构,关于图片有几点注意:

从之前的 SharedState 我们可以总结出,futureproimise 的共享状态需要包含 future 的状态,异常值、数据;在 Seastar 中这一角色由 future_state 承担:其中数据从 uninitialize_wrapper 中继承,状态、异常值则从 future_state_base 中继承。

实现重点

主要介绍 fpc 中几个重要的操作。

future 的串联:

也就是 then 方法的实现,它会在 future ready 时调度一段代码执行,由于这个方法返回的也是一个 future,所以可以实现 future 的串联;由于 then() 涉及到两队 future&promise 以及好几个函数,所以在看代码之前我们有必要约定几个叫法:

此外,对于 future 中的值从待定变为确定的过程(实际上是future_state_base::any::st的变化),我们称之为 fulfil/resolve/ready。

好,来看看代码:

template <typename Func, typename Result = futurize_t<typename call_then_impl::template result_type<Func>>>
SEASTAR_CONCEPT( requires std::invocable<Func, T SEASTAR_ELLIPSIS> || internal::CanInvokeWhenAllSucceed<Func, T SEASTAR_ELLIPSIS>)
Result
then(Func&& func) noexcept {
#ifndef SEASTAR_TYPE_ERASE_MORE
    return call_then_impl::run(*this, std::move(func));
#else
    using func_type = typename call_then_impl::template func_type<Func>;
    noncopyable_function<func_type> ncf;
    {
        memory::scoped_critical_alloc_section _;
        ncf = noncopyable_function<func_type>([func = std::forward<Func>(func)](auto&&... args) mutable {
            return futurize_invoke(func, std::forward<decltype(args)>(args)...);
        });
    }
    return call_then_impl::run(*this, std::move(ncf));
#endif    
}

先不管函数签名里面的各种 typenameXXX::template 以及 SEASTAR_CONCEPT;直接看函数体,其中会根据 SEASTAR_TYPE_ERASE_MORE 宏进行条件编译,但不管是哪种情况,最终都会调用 call_then_impl::run() 函数:

template <typename SEASTAR_ELLIPSIS T>
class SEASTAR_NODISCARD future : private internal::future_base {
    using call_then_impl = internal::call_then_impl<future>;
    ...
};

这个类根据调用 thenfuture 的类型(更准确地说是 future 中包含的值的类型)进行了偏特化,但是我们只关注 generic case,发现最终还是走到了 future::then_impl() 方法:

template <typename Func, typename Result = futurize_t<internal::future_result_t<Func, T SEASTAR_ELLIPSIS>>>
Result
then_impl(Func&& func) noexcept {
    using futurator = futurize<internal::future_result_t<Func, T SEASTAR_ELLIPSIS>>;
    if (failed()) {
        return futurator::make_exception_future(static_cast<future_state_base&&>(get_available_state_ref()));
    } else if (available()) {
        return futurator::invoke(std::forward<Func>(func), get_available_state_ref().take_value());
    }
    return then_impl_nrvo<Func, Result>(std::forward<Func>(func));
}

把里面的宏都去掉了,可以发现分为两段逻辑:

template <typename Func, typename Result>
Result then_impl_nrvo(Func&& func) noexcept {
    using futurator = futurize<internal::future_result_t<Func, T SEASTAR_ELLIPSIS>>;
    typename futurator::type fut(future_for_get_promise_marker{});
    using pr_type = decltype(fut.get_promise());
    schedule(fut.get_promise(), std::move(func), [](pr_type&& pr, Func& func, future_state&& state) {
        ...
    });
    return fut;
}

从名字就可以看出来,这个函数主要是为了利用 C++ 的 Named Return Value Optimization 特性3,但是这不是我们今天的重点;首先 futurator 是一个 helper 类,会根据 func 以及「调用方 future」存储的值的类型萃取出许多有用的类型,比如func 返回的 future 类型(也就是后面的 futurator::type),future 关联的 promise 的类型…,,然后创建一个 future 作为整个 then 的返回值,最终将「返回方 promise」,以及「用户函数」func 包装成一个 lambda,这里先不用管这个 lambda 的签名,也不管它做了什么,只知道它被调度了:

template <typename Pr, typename Func, typename Wrapper>
void schedule(Pr&& pr, Func&& func, Wrapper&& wrapper) noexcept {
    memory::scoped_critical_alloc_section _;
    auto tws = new continuation<Pr, Func, Wrapper, T SEASTAR_ELLIPSIS>(std::move(pr), std::move(func), std::move(wrapper));
    schedule(tws);
    _state._u.st = future_state_base::state::invalid;
}
void schedule(continuation_base<T SEASTAR_ELLIPSIS>* tws) noexcept {
    future_base::schedule(tws, &tws->_state);
}

对于这个 schedule 方法,我们时刻记住:

[](pr_type&& pr, Func& func, future_state&& state) {
    if (state.failed()) {
        pr.set_exception(static_cast<future_state_base&&>(std::move(state)));
    } else {
        futurator::satisfy_with_result_of(std::move(pr), [&func, &state] {
            return internal::future_invoke(func, std::move(state).get_value());
        });
    }
}

第一个参数是「返回方 promise」,第二个参数是「用户函数」,第三个参数 state 指向的是创建的 continuation 中的本地状态(continuation::_state);首先通过这个状态我们检查「调用方 future」被 resolve 时的状态:如果它是成功的,那么就执行 func 并通过 pr.set_value()(相关逻辑在 futurator::satisfy_with_result_of 中) 以正常的方式激活「返回方 future」;否则如果它失败了,那么就不执行 func,而是直接 pr.set_exception 以异常的方式激活「返回方 future」从而将异常传播出去。

所以我们知道为什么除了 func 之外,还需要一个 wrapper:因为 func 只有在特定条件下(future 无异常)才会被执行;并且当「调用方 future」resolve 之后,除了调用 func,我们还得激活「返回方 future」,这些逻辑都是 wrapper 需要做的。

那么现在问题来了,为什么我们可以通过 continuation 中的本地状态就可以得到调用方 future 的结果呢? 那得继续看 future_base::schedule

void schedule(task* tws, future_state_base* state) noexcept {
    promise_base* p = detach_promise();
    p->_state = state;
    p->_task = tws;
}
promise_base* detach_promise() noexcept {
    _promise->_state = nullptr;
    _promise->_future = nullptr;
    return std::exchange(_promise, nullptr);
}

可以看到里面干了两件事:

前面4提到过,在调用 then 之后,「调用方 future」理应要失效;由于 ready 的 「调用方 future」早已在 then_impl() 方法中就原地执行了「用户函数」,所以这里它肯定是 unavailable 的,而此处取消了它和「调用方 promise」之间的联系,那么「调用方 promise」就再也不会激活了,他也就失效了。

而我们也知道了,「调用方 future」的值必定是要喂给「用户函数」的,而现在「调用方 future」还没有 ready,所以它的值肯定得等 「调用方 promise」设置进去之后才会有,所以这里直接让「调用方 promise」中的状态指针指向 continuation::_state,这样就可以直接将值设置进来从而喂给「用户函数」了。

这里可以说是「调用方 future」失效了,但是也可以这样理解:「调用方 future」没有失效,但是由用户所持有的那个 future 不能再使用了,它其实是被转移进了 continuation(虽然 continuation 中没有直接存储 future,但是「调用方 promise」的状态指针指向了 continuation 的本地状态,所以也可以这样理解),所以说 continuation 中包含有「调用方 future」以及「返回方 promise

按照这个思路,那么 future 的串联在物理结构上应该是这样的:

seastar-fpc

还有最后一个问题,这个 continuation(或者说 func/wrapper)什么时候会执行呢? 我们可以想到,必然是在「调用方 future」 被 resolve之后,也就是「调用方 promise」调用了 set_value 或者 set_exception

// future.hh
template <typename... A>
void set_value(A&&... a) noexcept {
    if (auto *s = get_state()) {
        s->set(std::forward<A>(a)...);
        make_ready<urgent::no>();
    }
}

// future.cc
template <promise_base::urgent Urgent>
void promise_base::make_ready() noexcept {
    if (_task) {
        if (Urgent == urgent::yes) {
            ::seastar::schedule_urgent(std::exchange(_task, nullptr));
        } else {
            ::seastar::schedule(std::exchange(_task, nullptr));
        }
    }
}

// reactor.cc
void schedule(task* t) noexcept {
    engine().add_task(t);
}

最终调用的是 seastar::schedule,这个很名字已经很直观,就不贴内部实现细节了;其实就是把 continuation 添加到当前线程的 reactor 的任务队列里面去,在 reactor 的工作循环中会从队列中取出任务并执行,并调用其 run_and_dispose 方法(对 task 基类虚函数的重写):

virtual void run_and_dispose() noexcept override {
    try {
        _wrapper(std::move(this->_pr), _func, std::move(this->_state));
    } catch (...) {
        this->_pr.set_to_current_exception();
    }
    delete this;
}

这里和前一篇文章5是类似的,都会捕获在执行「用户函数」以及激活「返回方 future」过程中抛出的异常,并设置到「返回方 future」中以异常的方式激活它;注意 continuationnew 出来的,所以在执行完毕之后需要将其销毁(delete this,又是我从来没有用过的骚操作)。

总结一下 then 方法的流程:

  1. 创建「返回方 future」与「返回方 promise」,前者作为 then() 方法的返回值返回给调用者,后者用于在 continuation 中激活「返回方 future」
  2. 将「用户函数」封装成一个 continuation 结构,并保存「返回方 promise
  3. 将「调用方 future」的状态指针指向 continuation 的本地状态
  4. 当异步函数成功返回,「调用方 promise」会激活 continuation——将其放入当前线程的 reactor 的任务队列中等待执行
  5. continuation 得到执行后,会通过保存的「返回方 promise」激活「返回方 future

实现细节

前面主要讲了 Seastar 中 futurethen() 方法的实现,这只是整个 FPC 中的一角,还有一些其他重要细节也值得去了解。

future 的状态

前面说过,Seastar 中 FPC 的共享状态数据来自两部分,其中 future 的状态(和共享状态不是一回事)和异常值从 future_state_base 继承而来:

struct future_state_base {
    enum class state : uintptr_t {
        invalid = 0,
        future = 1,
        result_unavailable = 2,
        result = 3,
        exception_min = 4,
    };
    
    union any {
        state st;
        std::exception_ptr ex;
    } _u;
}

其中 future 状态很好理解,就是发起了异步操作但是还没有返回时的状态;result 状态就是异步返回而且成功,而所有 >= exception_min 的状态,都是异常状态,即异步操作返回了但是却失败了。

invalid 的状态,表示这个 future 不能再使用了,比如一个 future 执行了 then 操作之后它就是 invalid 的,被转移后源也是 invalid

根据 Seastar 中的相关 commit6 中我们可以找到 result_unavailable 这个状态的作用,它其实是为了解耦「禁止从 future 中多次获取结果」以及「释放 future 包裹的对象」这两个问题的处理流程。如果一个 future 曾经存储过值,那么释放它的时候需要做一些特殊处理,所以如果调用 get 之后直接将其设置为 invalid,那么就不知道它是否存储过值也就无法进行特殊处理了,所以加上这个状态用来表示这个 future「曾经存储过值,但是后面被取走了」。

以下是 union any 提供的几个用于判断当前共享状态的状态的方法:

// union any
bool valid() const noexcept { return st != state::invalid && st != state::result_unavailable; }
bool available() const noexcept { return st == state::result || st >= state::exception_min; }
bool failed() const noexcept { return __builtin_expect(st >= state::exception_min, false); }
bool has_result() const noexcept { return st == state::result || st == state::result_unavailable; }

从使用者的角度来看,valid 肯定是共享状态中存有值,而 result_unavailable 只是表示「之前有过值,但是现在没有」,所以肯定不是 valid 的;而 avaiable 表示异步操作已经完成,可能是成功,也可能是失败(异常)。

以下是 future 提供的几个用于判断状态的 public 函数,从 future 使用者的角度来看只有两种状态:

// class future
bool available() const noexcept { return _state.available(); }
bool failed() const noexcept { return _state.failed(); }

即「异步操作是否完成」以及「异步操作是否失败」,没有提供「异步操作是否成功」的接口,因为异步操作完成了不是失败就是成功;二者都是在 union any 提供的方法上做的封装,没啥好说的。

tagged pointer

另外一个值得讲一讲的是为什么 future_state 需要从两个地方分别继承它所需要的数据,为什么要将状态和异常值放一起?

之所以要将状态和异常值放在一个 union 中,是因为这样可以利用 tagged pointer7 技术节省一个指针大小的内存。FPC 是 Seastar 中异步编程的基石,在整个项目中被大量使用,所以可以在这样一个基础功能上节省内存带来的收益将是巨大的。

关于 tagged pointer,大多数架构都是字节可寻址的,但某些类型的数据通常会与数据的大小对齐,比如说指针,在 64 位架构上,指针为 8 字节,所以指针的值总是 8 的倍数,也就意味着其最低 3 位总是为 0,我们可以用它来存储一些信息;相当于给指针加了一个 tag,所以叫做 tagged pointer。

我们再来重新审视一下 union any 的定义:

enum class state : uintptr_t {
    invalid = 0,
    future = 1,
    result_unavailable = 2,
    result = 3,
    exception_min = 4,
};

union any {
    state st;
    std::exception_ptr ex;
} _u;

union any 可以被作为一个 state 解释,也可以作为一个 std::exception_ptr 解释。

state 是一个 uintptr_t 即一个指针大小,但是其中 0/1/2/3 状态只使用了 uintptr_t 的最低两位;std::exception_ptr 其实就是一个指针,所以当 union any 作为一个 std::exception_ptr 解释时,其最低两位是用不上的,而这正好可以给 state 使用;而 std::exception_ptr 只要有值,其值必然是 >=8 的,所以从 state 角度来看,只要它 >=4,说明里面存储的肯定是一个 std::exception_ptr,直接取出 ex 即可。

但是这里没有直接将二者作为一个字段使用,而是采用了 union 的方式使用两个字段(同一块内存,两种解释),这样会更加直观。

而为什么需要从两个地方分别继承它所需要的数据,我想是为了让各自的职责分明。

共享状态

在前一篇文章8中解释过,promise 要想激活 future,那么必须有一个共享的状态关联二者,之前使用的是 std::shared_ptr<SharedState>,是一个堆上的对象,futurepromise 都有一个指向该对象的指针从而实现状态的共享;但是 Seastar 中的 future&promise 为了极致的性能,并不希望动态分配内存,从而采用了另外一种复杂但是更加高效的方案。

Seastar 采用的方案是在 future 中存储一个本地状态,promise 有一个状态指针指向它,从而在调用 set_value/set_exception 时可以直接更新 future 的状态。通常我们先创建一个 promise,然后获取其关联的 future 返回给异步调用的发起者,此时 promise::_state 指向 future::_state;这是最核心的逻辑,但是还有一些问题需要考虑。

promise 中的本地状态

倘若 future&promise 的使用模式,都是先创建 promise,再获取 future,最后再使 future ready,那么以上策略就可以了;但是倘若我们在 get_future 之前就调用了 set_value/set_exception,那么数据该写到哪里呢,毕竟 promise::_state 只是一个指针?所以 promise 中也有一个本地状态用于处理这种情况:

Promise<int> pr;
pr.set_value(13);

倘若在这之后再调用 get_future,那么获取的就是一个 ready future:

template <typename SEASTAR_ELLIPSIS T>
inline
future<T SEASTAR_ELLIPSIS>
promise<T SEASTAR_ELLIPSIS>::get_future() noexcept {
    assert(!this->_future && this->_state && !this->_task);
    return future<T SEASTAR_ELLIPSIS>(this);
}

class future {
private:    
    future(promise<T SEASTAR_ELLIPSIS>* pr) noexcept : future_base(pr, &_state), _state(std::move(pr->_local_state)) { }
};

class future_base {
protected:
    future_base(promise_base* promise, future_state_base* state) noexcept : _promise(promise) {
        _promise->_future = this;
        _promise->_state = state;
    }    
};

可以看到做了三件事情:

如果 promise 中已经有值了,那么第三步会使得返回的 future 是 ready 的。

continuation 中的本地状态

这一点我们已经在 then() 方法实现中已经提到过了;continuation 作为「调用方 promise」和「返回方 future」之间的桥梁;then 调用之后,「调用方 promise」需要激活的是 continuation,再由 continuation 激活「返回方 future」;所以此时 continuation 必须有一个状态供「调用方 promise」设置,所以它也有一个本地状态。

注意「调用方 promise」不能将数据直接写入「返回方 future」,因为「返回方 future」接收的是「用户函数」在「调用方 promise」产生数据上的调用结果。

总结

futurepromise 的关联

以上 future&promise 模型并没有解决 future 或者 promise 移动的问题,倘若 future 被移动,promise 指向的状态不就失效了吗?因此 futurepromise 中各有一个指向对方的指针,这两个指针将二者关联起来,从而在被 std::move 之后可以更新对状态的引用:

// future.cc
void promise_base::move_it(promise_base&& x) noexcept {
    // Don't use std::exchange to make sure x's values are nulled even
    // if &x == this.
    _task = x._task;
    x._task = nullptr;
    _state = x._state;
    x._state = nullptr;
    _future = x._future;
    if (auto* fut = _future) {
        fut->detach_promise();
        fut->_promise = this;
    }
}

// future.hh
void future_base::move_it(future_base&& x, future_state_base* state) noexcept {
    _promise = x._promise;
    if (auto* p = _promise) {
        x.detach_promise();
        p->_future = this;
        p->_state = state;
    }
}

那么有一个问题,为什么 future 调用 then 之后,「调用方 future」没有和 continuation 建立这样的相互联系呢?毕竟「调用方 promise」的状态指针指向了 continuation 的本地状态。这是因为 continuation 是通过 new 动态分配出来的,所以它不会(也不需要)被移动,也就不存在更新状态指针的问题了。

ready future

通常发起一个异步请求后得到的 future 并不是 ready 的,

future<Response> async_get_response(Connection *conn) {
    future<> handshake = make_ready_future<>();
    if (!conn->reused) {
        handshake = conn->handshake();
    }
    return handshake.then([conn]() {
        /* TODO: send header/body, recv header/body... */
    });
}

上面代码模拟的是带有连接复用的 HTTP 请求流程,如果这个连接是一个新的连接,就需要先进行 TCP 三次握手,这也是一个异步流程,所以它返回一个 future,通过这个 future 可以串联后面的其他操作(发送头部发送 body,接收头部,接收 body);而如果这个连接是复用已有连接,那么就不用再执行 TCP 握手流程,直接发送数据接收数据即可;为了这两种情况的后续流程得以统一(使用 future::then 串联操作),我们需要为连接复用的流程创建一个 ready 的 future,Seastar 为此提供了 make_ready_future 这个 helper。

从前面 then 的实现中我们已经看到,ready 的 future 在执行 then() 方法时,会原地执行 「用户函数」。同样道理,Seastar 还提供了 make_exception_future 这个 ready future 的异常版本。

不存储值的 future

future 并不非得存储数据(而且也不一定只能存储一个值——即使是,那也是一个 std::tuple,其中又可以存储任意个异构类型的值),也就是 future<>,不存储数据的值相当于是一个信号,我们通过 ready 方法判断异步请求是否完成。

Seastar 中使用 future_stored_type来表示 future 中存储的数据类型(同时还有一个 future_stored_type_t 别名):

struct monostate {};

template<typename T...>
struct future_stored_type;

template<>
struct future_stored_type<> {
    using type = monostate;
};

template<typename T>
struct future_stored_type<T> {
    using type = std::conditional_t<std::is_void_t<T>, monostate, T>;
};

template<typename ...T>
using future_stored_type_t = typename future_stored_type<T...>::type;

对于存了数据和没有存数据的 future,Seastar 使用模板特化来对不同的情况设置不同的数据类型:

但是这里有个问题,为什么要引入monostate这个类型,直接引入void不行么?

这是是因为void虽然表示空,但是它并不是一种类型(尽管它还可以放在函数返回类型的位置上),所以如果直接使用void作为future_stored_type,就表示这是一种类型,然后事实并不是这样,毕竟我们无法使用 void 来声明一个变量(void*倒是可以):

template<>
struct future_stored_type<> {
    using type = void; // ok
}

using ft = future_stored_type_t<>;
ft v1; // error: Variable has incomplete type 'ft' (aka 'void')

而类型的作用就是让我们创建变量,所以直接使用void是不可取的,因此使用了monostate这个类型。而实际上 C++17 标准库也提供了std::monostate类型,但是 Seastar 考虑到这只是一个非常简单的类型,没有必要为此引入 <variant> 头文件。

随后在future_stored_type之外,Seastar 还定义了future_tuple_type_t

template<typename T>
using future_tuple_type_t = std::conditional_t<std::is_same_v<T, monostate>, tuple<>, tuple<T>;

这个就是根据future_stored_type,将包装成 tuple 类型。

Reference


  1. https://chenjianyong.com/blog/2022/03/seastar_fpc_1.html↩︎

  2. http://seastar.io/futures-promises/↩︎

  3. https://stackoverflow.com/questions/12953127/what-are-copy-elision-and-return-value-optimization↩︎

  4. https://chenjianyong.com/blog/2022/03/seastar_fpc_1.html↩︎

  5. https://chenjianyong.com/blog/2022/03/seastar_fpc_1.html↩︎

  6. https://github.com/scylladb/seastar/commit/589b24b3535714d24db875f6d7bb3a6207f09ef7↩︎

  7. https://www.wikiwand.com/en/Tagged_pointer↩︎

  8. https://chenjianyong.com/blog/2022/03/seastar_fpc_1.html↩︎