Seastar: 系统调用线程

balus

2022-04-18

Preface

在 Seastar 中,文件相关的操作并不直接由 reactor 线程(工作线程)执行,而是被 offload 到一个专门的线程中去执行,为此 Seastar 在 reactor 中提供了诸多文件操作相关的方法;比如重命名一个文件就可以使用 rename_file 方法:

static ss::future<> f() {
    return ss::engine()
        .rename_file("hello.txt", "world.txt")
        .then_wrapped([](ss::future<> &&fut) {
            if (fut.failed()) {
                fmt::print("rename file failed: {}", fut.get_exception());
            }
            return ss::make_ready_future<>();
        });
}

所有这些文件操作相关的函数都返回 future

源码剖析

每个 reactor 中都有一个类型为 thread_pool 的变量,但是我觉得有点名不副实,因为它并不是一个 pool,而只是一个单独的线程——仅仅用于执行文件相关的 syscall,所以叫 syscall thread 或许会更好一些;不过这些都是细枝末节,先看看 rename_file 的实现:

future<>
reactor::rename_file(std::string_view old_pathname, std::string_view new_pathname) noexcept {
    // Allocating memory for a sstring can throw, hence the futurize_invoke
    return futurize_invoke([old_pathname, new_pathname] {
        return engine()._thread_pool->submit<syscall_result<int>>([old_pathname = sstring(old_pathname), new_pathname = sstring(new_pathname)] {
            return wrap_syscall<int>(::rename(old_pathname.c_str(), new_pathname.c_str()));
        }).then([old_pathname = sstring(old_pathname), new_pathname = sstring(new_pathname)] (syscall_result<int> sr) {
            sr.throw_fs_exception_if_error("rename failed",  old_pathname, new_pathname);
            return make_ready_future<>();
        });
    });
}

逻辑非常简单,就是往 syscall thread 中提交一个任务,该任务也不过是简单地调用了一下 rename1 这个系统调用;如果该调用出错,则抛出异常——当然被 futurize_invoke 捕获并转换为一个 exceptional future 返回给调用方。

现在还没有看到 thread_pool 的实现,不过在此之前我们可以先想象一下它的大致实现——依靠我们对 Seastar 核间通信机制2的了解,毕竟他俩都是线程之间的通信:

class thread_pool {
    reactor* _reactor;
    uint64_t _aio_threaded_fallbacks = 0;
    syscall_work_queue inter_thread_wq;
    posix_thread _worker_thread;
    std::atomic<bool> _stopped = { false };
    std::atomic<bool> _main_thread_idle = { false };
};

目前我们只需要理解 inter_thread_wq_worker_thread 这俩成员,前者自然是任务队列,后者则是一个线程实体

class syscall_work_queue {
    static constexpr size_t queue_length = 128;
    struct work_item;
    using lf_queue = boost::lockfree::spsc_queue<work_item*,
                            boost::lockfree::capacity<queue_length>>;
    lf_queue _pending;
    lf_queue _completed;
    writeable_eventfd _start_eventfd;
    semaphore _queue_has_room = { queue_length };
};

_pending_completed 分别是提交队列与完成队列,_queue_has_room 是一个 seastar::semaphore,主要是限制向 syscall thread 提交任务的数目,毕竟线程的执行能力是有限的;最后一个是 _start_eventfd,是一个 eventfd3,用于通知 syscall thread 有任务到来

提交任务

整个任务的执行分为三部分:

其中第一步和第三步都是由 reactor 发起;这里不打算讲

void syscall_work_queue::submit_item(std::unique_ptr<syscall_work_queue::work_item> item) {
    (void)_queue_has_room.wait().then_wrapped([this, item = std::move(item)] (future<> f) mutable {
        // propagate wait failure via work_item
        if (f.failed()) {
            item->set_exception(f.get_exception());
            return;
        }
        _pending.push(item.release());
        _start_eventfd.signal(1);
    });
}

非常简单,首先根据 reactor 提交的 callable object 构建一个 work_item 插入到 task queue 的末尾;和前面提到的一样,这里使用了 semaphore 来限制提交的数目;然后往 eventfd 中写入 1 通知 syscall thread

工作循环

在构建 posix_thread 对象时需要传入一个函数传给 pthread_craete 创建线程;对于 syscall thread,这个函数就是它的工作循环:

void thread_pool::work(sstring name) {
    pthread_setname_np(pthread_self(), name.c_str());
    sigset_t mask;
    sigfillset(&mask);
    auto r = ::pthread_sigmask(SIG_BLOCK, &mask, NULL);
    throw_pthread_error(r);
    std::array<syscall_work_queue::work_item*, syscall_work_queue::queue_length> tmp_buf;
    while (true) {
        uint64_t count;
        auto r = ::read(inter_thread_wq._start_eventfd.get_read_fd(), &count, sizeof(count));
        assert(r == sizeof(count));
        if (_stopped.load(std::memory_order_relaxed)) {
            break;
        }
        auto end = tmp_buf.data();
        inter_thread_wq._pending.consume_all([&] (syscall_work_queue::work_item* wi) {
            *end++ = wi;
        });
        for (auto p = tmp_buf.data(); p != end; ++p) {
            auto wi = *p;
            wi->process();
            inter_thread_wq._completed.push(wi);
        }
        if (_main_thread_idle.load(std::memory_order_seq_cst)) {
            uint64_t one = 1;
            ::write(_reactor->_notify_eventfd.get(), &one, 8);
        }
    }
}

首先是为这个线程改名字,便于用命令行工具查看;然后屏蔽所有的信号,避免被误伤;最后开始工作循环,里面主要干了 4 件事情:

  1. 首先读取 _start_eventfd:如果 reactor 中没有任务,那么该线程会被阻塞在这个 read 操作上;否则通过读取到的数值可以确定任务队列中任务的个数(应该是至少有那么多个?)
  2. 检查 _stopped,如果为 true 则跳出循环——外部会通过该标志位让 syscall thread 退出
  3. 从 task queue 中取出任务并执行
  4. 处理 reactor 休眠的情况

收割任务

提交队列中的任务被 syscall thread 执行完后其结果会被放在完成队列中,reactor 中为此注册了 syscall poller 收割这些执行结果:

class reactor::syscall_pollfn final : public reactor::pollfn {
public:
    virtual bool poll() final override {
        return _r._thread_pool->complete();
    }
};

unsigned syscall_work_queue::complete() {
    std::array<work_item*, queue_length> tmp_buf;
    auto end = tmp_buf.data();
    auto nr = _completed.consume_all([&] (work_item* wi) {
        *end++ = wi;
    });
    for (auto p = tmp_buf.data(); p != end; ++p) {
        auto wi = *p;
        wi->complete();
        delete wi;
    }
    _queue_has_room.signal(nr);
    return nr;
}

线程休眠

由于 Seastar 以轮询代替了中断(比如各种 poller),所以当没有任务需要执行时,Seastar 会进入休眠避免无谓的 CPU 消耗,当有外部事件发生时再唤醒它继续执行(相当于从轮询模式转变至中断模式);所以每个 poller 都需要实现 try_enter_interrupt_modeexit_interrupt_mode 两个方法,当 reactor 进入休眠时会调用 enter 方法通知 poller 自己已经进入了休眠,此时 poller 不能再期待 reactor 会主动轮询它,而应该在有事件完成时唤醒 reactor;reactor 退出休眠状态时则会调用 exit 方法通知 poller 状态变更:

void enter_interrupt_mode() { _main_thread_idle.store(true, std::memory_order_seq_cst); }
void exit_interrupt_mode() { _main_thread_idle.store(false, std::memory_order_relaxed); }

当 reactor 进入休眠时,syscall thread 会记录下该状态,并且在其 run loop 中检查 _main_thread_idle 标志位,这个标志位如果设置了,说明 reactor 进入了休眠,而由于 syscall thread 只有执行了任务时才会检查该状态,说明此时有结果等待着 reactor 去收割,也就是说它有活干了,所以通过 reactor::_notify_eventfd 通知 reactor 停止休眠:通过写该 fd 产生一个可读事件,从而被 worker 的 epoll 感知到并唤醒线程继续执行其 main run loop

线程退出

当 reactor 结束时,syscall thread 也要退出;一般的方式时向它投递一个 pthread_exit() 的任务让它执行;这里也是类似的,不过并没有实际投递任务,而是将 _stopped 标记位设置为 true 并通知 syscall thread:

thread_pool::~thread_pool() {
    _stopped.store(true, std::memory_order_relaxed);
    inter_thread_wq._start_eventfd.signal(1);
    _worker_thread.join();
}

syscall thread 在其 run loop 中发现有任务之后,首先会检查 _stopped 标记并决定是否要退出,这样也达到了目的

Reference


  1. https://man7.org/linux/man-pages/man2/rename.2.html↩︎

  2. https://chenjianyong.com/blog/2022/04/seastar_inter_core_communication.html↩︎

  3. https://man7.org/linux/man-pages/man2/eventfd.2.html↩︎