2022-05-22
通常我们在 seastar 中写代码是不能阻塞的,如果我们希望在一段异步代码之后执行完毕之后开始执行另外一个逻辑,那么我们就需要通过 continuation 的方式将其串联起来,比如:
::sleep(2s).then([]() {
seastar/* another code piece */
})
而不能直接调用 future::get()
或者
future::wait()
:
::sleep(2s).wait();
seastar
/* another code piece */
以上代码在 wait()
的过程中就会 core dump。
但是 seastar 提供了 seastar::thread
这个工具,通过它,我们可以正常执行可能导致阻塞(即
future::wait()
)的代码:
::thread th([]() {
ssusing namespace std::chrono_literals;
::sleep(2s).wait();
ssstd::cout << "Hello, seastar thread!" << std::endl;
});
return ss::do_with(std::move(th), [](seastar::thread &th) {
return th.join();
});
实际上其内部并没有阻塞,但是却产生了以同步方式写异步代码的效果(通过
ucontext
),有点 async/await
的感觉;这个工具在单测中用的非常普遍,而且像 scylla 和 redpanda 的
main
函数都是通过它启动的。
seastar::async
虽然我们可以像上面一样直接使用
seastar::thread
,但是却要处理等待 thread
结束(join()
)、生命周期管理等问题,所以 seastar
更推荐我们使用 seastar::async
这个函数,比如之前的代码通过该函数可以改写成这样:
return ss::async([]() {
using namespace std::chrono_literals;
::sleep(2s).wait();
ssstd::cout << "Hello, seastar!" << std::endl;
});
简洁明了,看看 seastar::async
的具体实现:
template <typename Func, typename... Args>
inline
futurize_t<std::invoke_result_t<Func, Args...>>
async(thread_attributes attr, Func&& func, Args&&... args) noexcept {
using return_type = std::invoke_result_t<Func, Args...>;
struct work {
thread_attributes attr;
Func func;
std::tuple<Args...> args;
promise<return_type> pr;
thread th;
};
try {
auto wp = std::make_unique<work>(work{std::move(attr), std::forward<Func>(func), std::forward_as_tuple(std::forward<Args>(args)...)});
auto& w = *wp;
auto ret = w.pr.get_future();
w.th = thread(std::move(w.attr), [&w] {
futurize<return_type>::apply(std::move(w.func), std::move(w.args)).forward_to(std::move(w.pr));
});
return w.th.join().then([ret = std::move(ret)] () mutable {
return std::move(ret);
}).finally([wp = std::move(wp)] {});
} catch (...) {
return futurize<return_type>::make_exception_future(std::current_exception());
}
}
其中 work
结构是 seastar::thread
运行过程中使用到的上下文,各个字段的具体含义为:
attr
:创建 seastar::thread
时可以自定义的一些属性,和 pthread_attr_t
类似的func
和 args
:用户传给
seastar::async
的函数以及参数(称为用户函数)pr
:最终用户函数执行的结果就通过这个 promise(关联的
future)获取th
:即 seastar::thread
结构首先创建一个 work
实例然后初始化其结构,在初始化创建
thread
时需要传入欲执行的函数,但是并没有直接将用户提供的函数传进去,而是通过
futurize_apply
包了一层,从而可以捕获其可能产生的异常并且将该函数的返回值(以及可能产生的异常)转化为一个
future;更为重要的是,还通过 forward_to
将该 future
包含的数据转发到 work->pr
中去,而
work->pr
关联的 future 将会被作为整个
seastar::async
函数的返回值,这样我们就可以通过
seastar::async
获取到用户函数的返回值:当用户函数结束时,seastar::async
调用也会随之 resolve
和 seastar 中其他地方一样,这里也使用了 unique_ptr
+
finally
+ std::move
的方式来保证
wp
会在 async
函数结束之后才被释放(以及会被释放)。
thread
结构非常简单:
std::unique_ptr<thread_context> _context;
static thread_local thread* _current;
类变量 _current
表示当前线程中正在执行的 seastar
thread,而 thread
的所有状态数据都被放在了
thread_context
中:
class thread_context final : private task {
;
stack_holder _stack<void ()> _func;
noncopyable_function;
jmp_buf_link _context<> _done;
promisebool _joined = false;
boost::intrusive::list_member_hook<> _all_link;
using all_thread_list = boost::intrusive::list<thread_context,
boost::intrusive::member_hook<thread_context, boost::intrusive::list_member_hook<>,
&thread_context::_all_link>,
boost::intrusive::constant_time_size<false>>;
static thread_local all_thread_list _all_threads;
其中 _all_threads
链表用于组织所有的
thread,主要用于调试1;_stack
就是该 thread
使用的栈,暂且不用理会 stack_holder
是什么,知道栈其实就是一个字符数组而已就行;_func
则是用户传入的函数——外加一层 futurize_apply
的包装
在 seastar::async
函数中,可以注意到其实一共用到了两个
future
:其中一个是 w.th.join()
产生的,另一个则是用来将用户函数的执行结果传递给
seastar::async()
的调用方的
ret
。后者的用途很容易理解,就是用来串联 continuation
并将用户函数的执行结果进行传递,那么 thread::join()
是作何用的呢?
大致可以和 std::thread
进行类比,std::thread
在析构时,要么处于 detach
的状态,要么是 join 的状态,否则会执行 std::terminate
终止整个程序,seastar::thread
也是类似的:
::~thread() { assert(!_context || _context->_joined); } thread
而 join()
其实就是等待 seastar::thread
执行完毕。
启动一个 seastar thread 很简单: 只需要创建一个
thread_context
,其主要逻辑又集中在 setup
方法中:
void
::setup(size_t stack_size) {
thread_contextucontext_t initial_context;
auto q = uint64_t(reinterpret_cast<uintptr_t>(this));
auto main = reinterpret_cast<void (*)()>(&thread_context::s_main);
auto r = getcontext(&initial_context);
(r == -1);
throw_system_error_on.uc_stack.ss_sp = _stack.get();
initial_context.uc_stack.ss_size = stack_size;
initial_context.uc_link = nullptr;
initial_context(&initial_context, main, 2, int(q), int(q >> 32));
makecontext.thread = this;
_context.initial_switch_in(&initial_context, _stack.get(), stack_size);
_context}
其实就是标准的 ucontext
初始化流程,最终
initial_switch_in
就会跳进去执行用户函数——经过两步:第一步
thread_context::s_main
解析 int
参数,然后调用
thread_context::main()
函数:
void
thread_context::main() {
_context.initial_switch_in_completed();
if (group() != current_scheduling_group()) {
yield();
}
try {
_func();
_done.set_value();
} catch (...) {
_done.set_exception(std::current_exception());
}
_context.final_switch_out();
}
先不看 scheduling group
相关逻辑,剩余的代码也很简单,如果用户函数中没有执行阻塞操作(future::wait()
),那么就一路执行,直到最后
final_switch_out
跳回去,这里会跳回调用
initial_switch_in
的地方,此时
initial_switch_in
调用返回并继续执行后续的逻辑,对于
seastar::async()
函数,也就是从 thread
的创建之后的逻辑开始继续执行。
但是我们知道 seastar::async
最大的用途就是执行阻塞式用户代码,那么如果在 _func()
中执行了等待 future 的操作,seastar 是怎么处理的呢?
在了解 seastar 是如何处理等待 future
的逻辑之前,先要知道它其实用的还是 ucontext 这一套东西,在
thread_context::main()
中已经见识到了一些(initial_switch_in
和
final_switch_out
),但是还是有必要来完整地看看 seastar
是如何使用 ucontext 进行上下文切换的。
seastar 将所有上下文切换的逻辑都封装在了 jmp_buf_link
类中,不过可能与我们想象中的有所不同,seastar
中提供了两套上下文切换的方案2:
ASan 是一个内存错误检测器。它主要用于检测一些常见的内存错误,不如 use-after-free、memory-leak…,但是在上下文切换时它可能产生误报,不过它也提供了机制让使用者在进行上下文切换时通知它3,从而减少误报的概率;但是目前它只支持和 ucontext 上下文切换使用,而不支持 longjmp。
所以如果使用了 ASan,那么就只使用 ucontext API
进行上下文切换;否则的话则混合使用 ucontext + longjmp:第一次进入
uthread 的时候使用 setcontext
切换进去(因为只有
setcontext
调用才能为该 uthread
设置其栈),后续切换进去或者切换出来都使用长跳转,因为 ucontext API
涉及到系统调用,而长跳转则只是单纯的库函数,所以性能上会好很多4。
由于不熟悉 ASan,所以目前只看第二种方案;将不必要的字段移除,
jmp_buf_link
结构也就很简单了:
struct jmp_buf_link {
;
jmp_buf jmpbuf* link;
jmp_buf_link* thread;
thread_context};
jmp_buf
是长跳转本身所需要的link
用于记录是从哪一个 ucontext
跳过来的,便于跳回去thread_local jmp_buf_link g_unthreaded_context;
thread_local jmp_buf_link* g_current_context;
inline void jmp_buf_link::initial_switch_in(ucontext_t* initial_context, const void*, size_t) {
auto prev = std::exchange(g_current_context, this);
= prev;
link if (setjmp(prev->jmpbuf) == 0) {
(initial_context);
setcontext}
}
inline void jmp_buf_link::switch_in() {
auto prev = std::exchange(g_current_context, this);
= prev;
link if (setjmp(prev->jmpbuf) == 0) {
(jmpbuf, 1);
longjmp}
}
inline void jmp_buf_link::switch_out() {
g_current_context = link;
if (setjmp(jmpbuf) == 0) {
(g_current_context->jmpbuf, 1);
longjmp}
}
inline void jmp_buf_link::initial_switch_in_completed(){ /* void */ }
inline void jmp_buf_link::final_switch_out() {
g_current_context = link;
(g_current_context->jmpbuf, 1);
longjmp}
这里有俩 thread_local
的全局变量,其中
g_current_context
始终指向的是当前正在执行着的 seastar
thread jmp_buf_link
,g_unthreaded_context
是一个占位符,主要是为了方便 g_current_context
的操作:一开始它就指向
g_unthreaded_context
,这样这个指针就始终不为空,也就不用对它做特殊处理了
在看这些方法之前,首先需要去理解 switch in 和 switch out 两个操作的具体含义:
g_current_context
)跳转至调用
switch_in
的 jmp_buf_link
去switch_in
的
jmp_buf_link
(也肯定是
g_current_context
),跳转至 link
指向的
ucontext(也就是其来源)然后要区分第一次 switch in/最后一次 switch out 以及中间所有的 switch in/switch out 这几种情况要做的事情的异同:
jmpbuf
),然后 setcontext
跳进去执行longjmp
跳转到
link
指向的 ucontext 中设置的返回点去执行,而不用在当前
ucontext 中设置检查点,因为我们知道不会再跳回来了longjmp
initial_swith_in
和 final_switch_out
是配合使用的,同理 swith_in
和 switch_out
也是如此了解了 seastar 是如何组织 ucontext 以及如何进行上下文切换之后,我们可以来看看它是如何处理等待 future 的,比如下面这段代码:
::async([]() {
ssusing namespace std::chrono_literals;
::sleep(2s).wait();
ss... /* C1 */
});
这里直接调用 wait()
,由于 ss::sleep(2s)
的结果肯定是一个 unavailable future,所以 wait
必定会走到
do_wait
方法:
void internal::future_base::do_wait() noexcept {
auto thread = thread_impl::get();
assert(thread);
thread_wake_task wake_task{thread};
wake_task.make_backtrace();
_promise->_task = &wake_task;
thread_impl::switch_out(thread);
... /* C2 */
}
首先 thread_impl::get()
取出
g_current_context
,这个变量对应着现时正在执行着的
thread,后面的 assert
也说明了 wait()
方法只能在 seastar::thread
环境中调用,否则
assert
直接就会 coredump。
然后创建了一个 thread_wait_task
作为当前 future(即
ss::sleep(2s)
创建出来的 future) 的
continuation,这里直接创建了一个栈变量,然后
switch_out
,但是不用担心其生命周期,它使用的是
seastar::async
创建的 ucontext
中的栈,所以只要 seastar::async
没有结束,就不会被释放(seastar::async
自然会在其内部操作都完成之后才结束)
switch_out
切换到最近一次执行 switch_in
的地方,对于上面这段代码,它是通过 initial_switch_in
切换进来的,所以肯定也是回到该处;随后该函数返回,整个
seastar::thread
的构造函数也结束并返回,此时
thread_context::main()
还没有执行完,_done
还是 unavailable 的,所以在 seastar::async
的剩余的逻辑中 w.th.join()
调用返回的 future 也是 unavailable 的。
那么什么时候会继续执行用户函数剩余的逻辑呢?当 sleep(2s)
完成,会执行其 continuation,也就是在 do_wait()
中设置的
thread_wait_task
,在其中会继续跳入该 thread 继续执行:
class thread_wake_task final : public task {
* _thread;
thread_contextpublic:
(thread_context* thread) noexcept : _thread(thread) {}
thread_wake_taskvirtual void run_and_dispose() noexcept override {
::switch_in(_thread);
thread_impl}
virtual task* waiting_task() noexcept override {
return _thread->waiting_task();
}
};
所以 switch_in()
首先会跳入 do_wait()
函数,此时的效果就相当于 thread_impl::switch_out(thread)
调用返回,然后执行后面的语句(也就是
C2
),不过这里实际上是空的,所以 do_wait()
返回,整个 ss::sleep(2s).wait()
结束,继续执行其后的语句(也就是 C1
)
当整个用户函数执行完毕,控制流会走向哪里呢?这一点我之前很迷惑,毕竟后面似乎没有再调用过
switch_out
了,但是随后发现发现我忘了还有
final_switch_out()
这个调用没有执行!是的,由于用户函数是通过
thread_context::main()
调用进来的,所以
_func()
结束后它肯定会返回
thread_context::main()
继续往下执行,而后最终执行到
final_switch_out
,这个时候切换回去的也是最近的一次
swtich_in
的地方,也就是
thread::wake_task::run_and_dispose()
至此, run_and_dispose()
方法结束,而且先前
thread_context::main()
中也 resolve 了 _done
关联的 future,从而会将其 continuation 放入 reactor 的 task queue
中接下来执行… 最终整个 seastar::async()
调用结束
确实 seastar thread 中程序控制流的切换还挺复杂的,而且上面只有一次
wait()
(当然实际上一次和多次其实区别并不大),而且光看文字也比较硬,所以还是举个栗子,首先我在
jmp_buf_link
的所有上下文切换函数、以及
future::do_wait()
函数中的开头和结尾都加一句打印,然后执行下面这段函数:
return ss::async([]() {
std::cout << "Hi, seastar thread!" << std::endl;
for (int i = 0; i < 2; i++) {
using namespace std::chrono_literals;
::sleep(2s).wait();
ssstd::cout << i << "-th wakeup, now: "
<< std::chrono::steady_clock::now().time_since_epoch() /
std::chrono::seconds(1)
<< std::endl;
}
});
这段函数里面执行了两次
wait()
,最终执行得到的打印输出:
initial_switch_in() begin
Hi, seastar thread!
future do_wait() begin
switch_out() begin
initial_switch_in() done
construct thread done...
switch_in() begin
switch_out() done
future do_wait() end
0-th wakeup, now: 2406298
future do_wait() begin
switch_out() begin
switch_in() done
switch_in() begin
switch_out() done
future do_wait() end
1-th wakeup, now: 2406300
final_switch_out() begin
switch_in() done thread join() done...
这样就可以很清晰地看出程序控制流是如何转移的了。