Seastar: 用户线程
系列: Seastar · 第 7 篇 / 共 11 篇
Preface
通常我们在 seastar 中写代码是不能阻塞的,如果我们希望在一段异步代码之后执行完毕之后开始执行另外一个逻辑,那么我们就需要通过 continuation 的方式将其串联起来,比如:
seastar::sleep(2s).then([]() {
/* another code piece */
})而不能直接调用 future::get() 或者 future::wait():
seastar::sleep(2s).wait();
/* another code piece */以上代码在 wait() 的过程中就会 core dump。
但是 seastar 提供了 seastar::thread 这个工具,通过它,我们可以正常执行可能导致阻塞(即 future::wait())的代码:
ss::thread th([]() {
using namespace std::chrono_literals;
ss::sleep(2s).wait();
std::cout << "Hello, seastar thread!" << std::endl;
});
return ss::do_with(std::move(th), [](seastar::thread &th) {
return th.join();
});实际上其内部并没有阻塞,但是却产生了以同步方式写异步代码的效果(通过 ucontext),有点 async/await 的感觉;这个工具在单测中用的非常普遍,而且像 scylla 和 redpanda 的 main 函数都是通过它启动的。
seastar::async
虽然我们可以像上面一样直接使用 seastar::thread,但是却要处理等待 thread 结束(join())、生命周期管理等问题,所以 seastar 更推荐我们使用 seastar::async 这个函数,比如之前的代码通过该函数可以改写成这样:
return ss::async([]() {
using namespace std::chrono_literals;
ss::sleep(2s).wait();
std::cout << "Hello, seastar!" << std::endl;
});简洁明了,看看 seastar::async 的具体实现:
template <typename Func, typename... Args>
inline
futurize_t<std::invoke_result_t<Func, Args...>>
async(thread_attributes attr, Func&& func, Args&&... args) noexcept {
using return_type = std::invoke_result_t<Func, Args...>;
struct work {
thread_attributes attr;
Func func;
std::tuple<Args...> args;
promise<return_type> pr;
thread th;
};
try {
auto wp = std::make_unique<work>(work{std::move(attr), std::forward<Func>(func), std::forward_as_tuple(std::forward<Args>(args)...)});
auto& w = *wp;
auto ret = w.pr.get_future();
w.th = thread(std::move(w.attr), [&w] {
futurize<return_type>::apply(std::move(w.func), std::move(w.args)).forward_to(std::move(w.pr));
});
return w.th.join().then([ret = std::move(ret)] () mutable {
return std::move(ret);
}).finally([wp = std::move(wp)] {});
} catch (...) {
return futurize<return_type>::make_exception_future(std::current_exception());
}
}其中 work 结构是 seastar::thread 运行过程中使用到的上下文,各个字段的具体含义为:
attr:创建seastar::thread时可以自定义的一些属性,和pthread_attr_t类似的func和args:用户传给seastar::async的函数以及参数(称为用户函数)pr:最终用户函数执行的结果就通过这个 promise(关联的 future)获取th:即seastar::thread结构
首先创建一个 work 实例然后初始化其结构,在初始化创建 thread 时需要传入欲执行的函数,但是并没有直接将用户提供的函数传进去,而是通过 futurize_apply 包了一层,从而可以捕获其可能产生的异常并且将该函数的返回值(以及可能产生的异常)转化为一个 future;更为重要的是,还通过 forward_to 将该 future 包含的数据转发到 work->pr 中去,而 work->pr 关联的 future 将会被作为整个 seastar::async 函数的返回值,这样我们就可以通过 seastar::async 获取到用户函数的返回值:当用户函数结束时,seastar::async 调用也会随之 resolve
和 seastar 中其他地方一样,这里也使用了 unique_ptr + finally + std::move 的方式来保证 wp 会在 async 函数结束之后才被释放(以及会被释放)。
thread && thread context
thread 结构非常简单:
std::unique_ptr<thread_context> _context;
static thread_local thread* _current;类变量 _current 表示当前线程中正在执行的 seastar thread,而 thread 的所有状态数据都被放在了 thread_context 中:
class thread_context final : private task {
stack_holder _stack;
noncopyable_function<void ()> _func;
jmp_buf_link _context;
promise<> _done;
bool _joined = false;
boost::intrusive::list_member_hook<> _all_link;
using all_thread_list = boost::intrusive::list<thread_context,
boost::intrusive::member_hook<thread_context, boost::intrusive::list_member_hook<>,
&thread_context::_all_link>,
boost::intrusive::constant_time_size<false>>;
static thread_local all_thread_list _all_threads;其中 _all_threads 链表用于组织所有的 thread,主要用于调试1;_stack 就是该 thread 使用的栈,暂且不用理会 stack_holder 是什么,知道栈其实就是一个字符数组而已就行;_func 则是用户传入的函数——外加一层 futurize_apply 的包装
在 seastar::async 函数中,可以注意到其实一共用到了两个 future:其中一个是 w.th.join() 产生的,另一个则是用来将用户函数的执行结果传递给 seastar::async() 的调用方的 ret。后者的用途很容易理解,就是用来串联 continuation 并将用户函数的执行结果进行传递,那么 thread::join() 是作何用的呢?
大致可以和 std::thread 进行类比,std::thread 在析构时,要么处于 detach 的状态,要么是 join 的状态,否则会执行 std::terminate 终止整个程序,seastar::thread 也是类似的:
thread::~thread() { assert(!_context || _context->_joined); }而 join() 其实就是等待 seastar::thread 执行完毕。
启动 seastar thread
启动一个 seastar thread 很简单: 只需要创建一个 thread_context,其主要逻辑又集中在 setup 方法中:
void
thread_context::setup(size_t stack_size) {
ucontext_t initial_context;
auto q = uint64_t(reinterpret_cast<uintptr_t>(this));
auto main = reinterpret_cast<void (*)()>(&thread_context::s_main);
auto r = getcontext(&initial_context);
throw_system_error_on(r == -1);
initial_context.uc_stack.ss_sp = _stack.get();
initial_context.uc_stack.ss_size = stack_size;
initial_context.uc_link = nullptr;
makecontext(&initial_context, main, 2, int(q), int(q >> 32));
_context.thread = this;
_context.initial_switch_in(&initial_context, _stack.get(), stack_size);
}其实就是标准的 ucontext 初始化流程,最终 initial_switch_in 就会跳进去执行用户函数——经过两步:第一步 thread_context::s_main 解析 int 参数,然后调用 thread_context::main() 函数:
void
thread_context::main() {
_context.initial_switch_in_completed();
if (group() != current_scheduling_group()) {
yield();
}
try {
_func();
_done.set_value();
} catch (...) {
_done.set_exception(std::current_exception());
}
_context.final_switch_out();
}先不看 scheduling group 相关逻辑,剩余的代码也很简单,如果用户函数中没有执行阻塞操作(future::wait()),那么就一路执行,直到最后 final_switch_out 跳回去,这里会跳回调用 initial_switch_in 的地方,此时 initial_switch_in 调用返回并继续执行后续的逻辑,对于 seastar::async() 函数,也就是从 thread 的创建之后的逻辑开始继续执行。
但是我们知道 seastar::async 最大的用途就是执行阻塞式用户代码,那么如果在 _func() 中执行了等待 future 的操作,seastar 是怎么处理的呢?
ucontext 与长跳转
在了解 seastar 是如何处理等待 future 的逻辑之前,先要知道它其实用的还是 ucontext 这一套东西,在 thread_context::main() 中已经见识到了一些(initial_switch_in 和 final_switch_out),但是还是有必要来完整地看看 seastar 是如何使用 ucontext 进行上下文切换的。
seastar 将所有上下文切换的逻辑都封装在了 jmp_buf_link 类中,不过可能与我们想象中的有所不同,seastar 中提供了两套上下文切换的方案2:
- 单纯使用 ucontext API,比较慢,但是和 AddressSanitizer 配合的更好
- 混合使用了 ucontext API 以及长跳转两种方式的控制流转移能力,更快,但是 AddressSanitizer 没法合长跳转配合使用
ASan 是一个内存错误检测器。它主要用于检测一些常见的内存错误,不如 use-after-free、memory-leak…,但是在上下文切换时它可能产生误报,不过它也提供了机制让使用者在进行上下文切换时通知它3,从而减少误报的概率;但是目前它只支持和 ucontext 上下文切换使用,而不支持 longjmp。
所以如果使用了 ASan,那么就只使用 ucontext API 进行上下文切换;否则的话则混合使用 ucontext + longjmp:第一次进入 uthread 的时候使用 setcontext 切换进去(因为只有 setcontext 调用才能为该 uthread 设置其栈),后续切换进去或者切换出来都使用长跳转,因为 ucontext API 涉及到系统调用,而长跳转则只是单纯的库函数,所以性能上会好很多4。
由于不熟悉 ASan,所以目前只看第二种方案;将不必要的字段移除, jmp_buf_link 结构也就很简单了:
struct jmp_buf_link {
jmp_buf jmpbuf;
jmp_buf_link* link;
thread_context* thread;
};jmp_buf是长跳转本身所需要的link用于记录是从哪一个 ucontext 跳过来的,便于跳回去
thread_local jmp_buf_link g_unthreaded_context;
thread_local jmp_buf_link* g_current_context;
inline void jmp_buf_link::initial_switch_in(ucontext_t* initial_context, const void*, size_t) {
auto prev = std::exchange(g_current_context, this);
link = prev;
if (setjmp(prev->jmpbuf) == 0) {
setcontext(initial_context);
}
}
inline void jmp_buf_link::switch_in() {
auto prev = std::exchange(g_current_context, this);
link = prev;
if (setjmp(prev->jmpbuf) == 0) {
longjmp(jmpbuf, 1);
}
}
inline void jmp_buf_link::switch_out() {
g_current_context = link;
if (setjmp(jmpbuf) == 0) {
longjmp(g_current_context->jmpbuf, 1);
}
}
inline void jmp_buf_link::initial_switch_in_completed(){ /* void */ }
inline void jmp_buf_link::final_switch_out() {
g_current_context = link;
longjmp(g_current_context->jmpbuf, 1);
}这里有俩 thread_local 的全局变量,其中 g_current_context 始终指向的是当前正在执行着的 seastar thread jmp_buf_link,g_unthreaded_context 是一个占位符,主要是为了方便 g_current_context 的操作:一开始它就指向 g_unthreaded_context,这样这个指针就始终不为空,也就不用对它做特殊处理了
在看这些方法之前,首先需要去理解 switch in 和 switch out 两个操作的具体含义:
- switch in 操作是从当前正在执行的 ucontext(
g_current_context)跳转至调用switch_in的jmp_buf_link去 - switch out 操作是从调用
switch_in的jmp_buf_link(也肯定是g_current_context),跳转至link指向的 ucontext(也就是其来源)
然后要区分第一次 switch in/最后一次 switch out 以及中间所有的 switch in/switch out 这几种情况要做的事情的异同:
- 第一次 switch in 时,先在当前 ucontext 中设置好返回点(
jmpbuf),然后setcontext跳进去执行 - 最后一次 switch out 时,直接
longjmp跳转到link指向的 ucontext 中设置的返回点去执行,而不用在当前 ucontext 中设置检查点,因为我们知道不会再跳回来了 - 中间的 switch in/switch out,都需要在当前 ucontext 中设置好返回点然后
longjmp initial_swith_in和final_switch_out是配合使用的,同理swith_in和switch_out也是如此
在 seastar thread 中 wait future
了解了 seastar 是如何组织 ucontext 以及如何进行上下文切换之后,我们可以来看看它是如何处理等待 future 的,比如下面这段代码:
ss::async([]() {
using namespace std::chrono_literals;
ss::sleep(2s).wait();
... /* C1 */
});这里直接调用 wait(),由于 ss::sleep(2s) 的结果肯定是一个 unavailable future,所以 wait 必定会走到 do_wait 方法:
void internal::future_base::do_wait() noexcept {
auto thread = thread_impl::get();
assert(thread);
thread_wake_task wake_task{thread};
wake_task.make_backtrace();
_promise->_task = &wake_task;
thread_impl::switch_out(thread);
... /* C2 */
}首先 thread_impl::get() 取出 g_current_context,这个变量对应着现时正在执行着的 thread,后面的 assert 也说明了 wait() 方法只能在 seastar::thread 环境中调用,否则 assert 直接就会 coredump。
然后创建了一个 thread_wait_task 作为当前 future(即 ss::sleep(2s) 创建出来的 future) 的 continuation,这里直接创建了一个栈变量,然后 switch_out,但是不用担心其生命周期,它使用的是 seastar::async 创建的 ucontext 中的栈,所以只要 seastar::async 没有结束,就不会被释放(seastar::async 自然会在其内部操作都完成之后才结束)
switch_out 切换到最近一次执行 switch_in 的地方,对于上面这段代码,它是通过 initial_switch_in 切换进来的,所以肯定也是回到该处;随后该函数返回,整个 seastar::thread 的构造函数也结束并返回,此时 thread_context::main() 还没有执行完,_done 还是 unavailable 的,所以在 seastar::async 的剩余的逻辑中 w.th.join() 调用返回的 future 也是 unavailable 的。
那么什么时候会继续执行用户函数剩余的逻辑呢?当 sleep(2s) 完成,会执行其 continuation,也就是在 do_wait() 中设置的 thread_wait_task,在其中会继续跳入该 thread 继续执行:
class thread_wake_task final : public task {
thread_context* _thread;
public:
thread_wake_task(thread_context* thread) noexcept : _thread(thread) {}
virtual void run_and_dispose() noexcept override {
thread_impl::switch_in(_thread);
}
virtual task* waiting_task() noexcept override {
return _thread->waiting_task();
}
};所以 switch_in() 首先会跳入 do_wait() 函数,此时的效果就相当于 thread_impl::switch_out(thread) 调用返回,然后执行后面的语句(也就是 C2),不过这里实际上是空的,所以 do_wait() 返回,整个 ss::sleep(2s).wait() 结束,继续执行其后的语句(也就是 C1)
当整个用户函数执行完毕,控制流会走向哪里呢?这一点我之前很迷惑,毕竟后面似乎没有再调用过 switch_out 了,但是随后发现发现我忘了还有 final_switch_out() 这个调用没有执行!是的,由于用户函数是通过 thread_context::main() 调用进来的,所以 _func() 结束后它肯定会返回 thread_context::main() 继续往下执行,而后最终执行到 final_switch_out,这个时候切换回去的也是最近的一次 swtich_in 的地方,也就是 thread::wake_task::run_and_dispose()
至此, run_and_dispose() 方法结束,而且先前 thread_context::main() 中也 resolve 了 _done 关联的 future,从而会将其 continuation 放入 reactor 的 task queue 中接下来执行… 最终整个 seastar::async() 调用结束
一个 🌰
确实 seastar thread 中程序控制流的切换还挺复杂的,而且上面只有一次 wait()(当然实际上一次和多次其实区别并不大),而且光看文字也比较硬,所以还是举个栗子,首先我在 jmp_buf_link 的所有上下文切换函数、以及 future::do_wait() 函数中的开头和结尾都加一句打印,然后执行下面这段函数:
return ss::async([]() {
std::cout << "Hi, seastar thread!" << std::endl;
for (int i = 0; i < 2; i++) {
using namespace std::chrono_literals;
ss::sleep(2s).wait();
std::cout << i << "-th wakeup, now: "
<< std::chrono::steady_clock::now().time_since_epoch() /
std::chrono::seconds(1)
<< std::endl;
}
});这段函数里面执行了两次 wait(),最终执行得到的打印输出:
initial_switch_in() begin
Hi, seastar thread!
future do_wait() begin
switch_out() begin
initial_switch_in() done
construct thread done...
switch_in() begin
switch_out() done
future do_wait() end
0-th wakeup, now: 2406298
future do_wait() begin
switch_out() begin
switch_in() done
switch_in() begin
switch_out() done
future do_wait() end
1-th wakeup, now: 2406300
final_switch_out() begin
switch_in() done
thread join() done...这样就可以很清晰地看出程序控制流是如何转移的了。