Seastar: semaphore

balus

2022-05-04

Preface

最近在实现 fetch 接口时, 需要对单个 JS 脚本的单次执行过程中可以发起的 fetch 操作数进行限制(和 CloudFlare Workers 类似):

第一点很简单, 用一个计数器即可解决; 关键在于第二个, 要将超出了并发限制的 fetch 操作阻塞住, 直到有其他 fetch 操作结束(即返回的 promise 被 resolve)再唤醒它们执行.

这个需求听起来似乎很熟悉?可以换一种说法,将「同一个时刻可运行的 fetch 请求」视为一种资源,这个资源是可以重复利用的(这一点是并发限制和总量限制的主要区别):发起一个 fetch 请求相当于拿走一份资源,结束一个 fetch 请求相当于归还一份资源;且这个资源并非无限,所以当资源被拿完之后再想拿就等着直到有人归还资源。

说到这里就很清晰了,这个问题就是信号量(semaphore)1要解决的:

In computer science, a semaphore is a variable or abstract data type used to control access to a common resource by multiple threads and avoid critical section problems in a concurrent system such as a multitasking operating system.

使用场景

上面虽然提到了多线程, 但是其实信号量的本质是一种计数型资源 —— 资源不仅仅在多线程下才会产生争抢, 考虑如下场景:

static int data = 0;

future<> modify_data(int d) {
  return do_something().then([d]() {
    data = d;
  });
}

(void) modify_data(3);
(void) modify_data(7);

do_something 是一个异步操作, 该异步操作结束后中会修改一个全局变量 data; 然后连续调用了两次 modify_data, 注意这两次调用并没有使用 then 操作串联起来,所以他俩是独立的两个 fiber, 并没有先后依赖关系,因为我们不知道哪个异步操作会先完成, 所以上二者会交织(interleave)执行, 导致最终 data 的值不确定; 其实线程也是类似的, 不加锁的话线程之间同样会交织执行——只是和 fiber 相比, 两者执行流交织的粒度有所不同: fiber 之间的交织是以 continuation 为最小单元, 而 thread 之间的交织则是以 CPU 指令为最小单元

Seastar 为单线程的异步场景设计了 seastar::semaphore 信号量机制用于控制用户对资源的并发访问, 并提供了 future-style API; 下面用它来限制同一时刻只有一个操作可以访问 data

static int data = 0;
static seastar::semaphore sema{1};

future<> modify_data(int d) {
    return seastar::with_semaphore(sema, 1, [d]() {
        return do_something().then([d]() {
            data = d;
        });
    });
}

(void) modify_data(3);
(void) modify_data(7);

这里我们创建了一个初始大小为 1 的 semaphore,这表示它所管理的资源数为 1;通过 seastar::with_semaphore 执行原来的 fiber,这个函数会从 sema 中取出一个资源,结束时释放;所以当调用 modify_data(7) 时如果前一个 fiber 没有结束,此时 sema 持有的资源数为 0,那么 with_semaphore() 就会等待;这样就保证了同时只有一个 modify_data fiber 在执行;以上代码的结果也是确定的:最终 data 的值为 7

以上代码中 seastar::semaphore 用于限制同时可以执行的 fiber 的实例个数;除此之外,由于 semaphore 可以管理任意多个资源(实际上是 int64_t),所以我们还可以用它来限制其他资源。

比如我们有一个函数 using_lots_of_memory 会使用较多内存,所以肯定不希望它无限制地执行;下面用 seastar::semaphore 来限制这个函数最多只能使用 1MB 内存,超过了再调用它也不会立即执行,而是等待该函数的其他调用实例结束并释放资源:

seastar::future<> using_lots_of_memory(size_t bytes) {
    static thread_local seastar::semaphore limit(1000000); // limit to 1MB
    return seastar::with_semaphore(limit, bytes, [bytes] {
        // do something allocating 'bytes' bytes of memory
    });
}

Semaphore Internals

seastar::semaphore 的实现非常简洁:

    ssize_t _count;
    std::exception_ptr _ex;
    
    struct entry {
        promise<> pr;
        size_t nr;
        std::optional<abort_on_expiry<clock>> timer;
    };
    internal::abortable_fifo<entry, expiry_handler> _wait_list;

是的,就是这么简单;其中 _count 表示该 semaphore 所管理的资源总数,_ex 表示该 semaphore broken 时的异常,_wait_list 则保存着所有的 waiter。

最开始的使用实例中我们已经看到了,seastar::semaphore 的使用很简单:

seastar::semaphore sema{7};

这样就创建了一个可用资源数为 7 的 semaphore;其实 seastar::semaphore 只是一个类型别名,真正的类型是 basic_semaphore

template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
class basic_semaphore : private ExceptionFactory;

其中 ExceptionFactory 用于自定义异常,Clock 用于自定义时间精度——没错,seastar::semaphore 也支持指定最长等待时间;除了常用的 seastar::semaphore 之外,还有另外一个预定义的 semaphose:

using named_semaphore = basic_semaphore<named_semaphore_exception_factory>;

这是一个具名信号量,这种信号量在等待超时、或者 semaphore broken 时抛出来的异常会带有一个名字用于指明是哪一个信号量。这个套路和 std::string 以及 baisc_string 相同,我们一般用默认的 std::string 就足够了,但是这种做法无疑给我们提供了更多扩展能力

Semaphore 最初被 E.W.Dijkstra 提出时它包含两个操作:P 和 V2,其中 P 表示取出资源,而 V 表示放回资源;同样 seastar::semaphore 也提供了这两个操作,分别命名为 waitsignal,比如前面 modify_data 的例子用 waitsignal 我们可以改成这样:

sema.wait(1).then([d]() {
    return do_something().then([d]() {
        data = d; 
    }).finally([]() {
        sema.signal(1);
    });
})

获取资源

seastar::semaphorewait 来表示 P 操作,即获取资源;

    future<> wait(time_point timeout, size_t nr = 1) noexcept {
        if (may_proceed(nr)) {
            _count -= nr;
            return make_ready_future<>();
        }
        ...

nr 表示本次请求需要的资源数目——是的,不一定一次只能哪一个资源(回顾之前的内存的例子);首先看看当前剩余的资源是否可以满足该请求,如果可以的话,直接返回一个 ready future

        ...
        if (_ex) {
            return make_exception_future(_ex);
        }
        try {
            entry& e = _wait_list.emplace_back(promise<>(), nr);
            auto f = e.pr.get_future();
            if (timeout != time_point::max()) {
                e.timer.emplace(timeout);
                abort_source& as = e.timer->abort_source();
                _wait_list.make_back_abortable(as);
            }
            return f;
        } catch (...) {
            return make_exception_future(std::current_exception());
        }
    }

否则的话,则先检查该 semaphore 是否有异常,seastar::semaphore 中的异常通常用于关闭场景,比如在 Web Server 中我们可以用它来控制最多可以 accept 的连接数,当 Server 关闭时我们可以调用该 semaphore 的 broken 方法产生一个异常;所以这样的异常表示该 semaphore 不再可用,这种情况下也直接返回一个 exceptional future;

否则的话,我们就需要等待,这里又是常见的套路:创建一个 promise 保存起来,返回该 promise 关联的 future,并用该 future 串联后续要执行的操作,这样只有该 future 被 resolve,后续的操作才会真正发起;

但是稍微有一点不太一样,seastar::semaphore 支持 timed-wait,调用方可以指定最长愿意等待多长时间,超过这个时间就就不能继续等了;所以如果传入的 timeout 是一个有效值,说明用户的确指定了超时值,此时需要该 waiter entry 关联一个 timer,暂时不用管 abortable_fifoexpiry_handler,现在只需要知道,当该 timer 超时,就会通过 entry 中的 promise 往关联的 future 中写入一个 timedout exception 从而激活它。

此外,seastar::semaphore 还提供了非阻塞版本的 try_wait:

    bool try_wait(size_t nr = 1) noexcept {
        if (may_proceed(nr)) {
            _count -= nr;
            return true;
        } else {
            return false;
        }
    }

只是简单地检查一下剩余资源是否可以立即满足请求,并不会在资源不足时创建 waiter——而是返回 -1 告诉调用方资源不足。

释放资源

wait 正好相反,signal 用于将资源归还给 semaphore:

    void signal(size_t nr = 1) noexcept {
        if (_ex) {
            return;
        }
        _count += nr;
        while (!_wait_list.empty() && has_available_units(_wait_list.front().nr)) {
            auto& x = _wait_list.front();
            _count -= x.nr;
            x.pr.set_value();
            _wait_list.pop_front();
        }
    }

首先看看该 semaphore 是否还有效,无效的话说明也不用做;否则的话递增计数器,然后处理等待队列;队列中的 waiter 是按 FIFO 顺序去处理的,这一点很重要,并且在 wait 中也对这一点做了保障——就在 may_proceed 方法中:

    bool has_available_units(size_t nr) const noexcept {
        return _count >= 0 && (static_cast<size_t>(_count) >= nr);
    }
    bool may_proceed(size_t nr) const noexcept {
        return has_available_units(nr) && _wait_list.empty();
    }

它不仅仅检查剩余资源数目是否足够,还会检查等待队列上是否有其他 waiter 已经在等待;少了第二点,FIFO 的顺序就无法保证:有的请求需要的资源数目多,那么它就更有可能被其他需要资源数目少的请求给抢占,从而得不到执行机会以至于被饿死(starve)。

关闭 Semaphore

前面已经提到过,seastar::semaphore 是可以被关闭的——通过 broken 方法:

template<typename ExceptionFactory, typename Clock>
inline void
basic_semaphore<ExceptionFactory, Clock>::broken(std::exception_ptr xp) noexcept {
    static_assert(std::is_nothrow_copy_constructible_v<std::exception_ptr>);
    _ex = xp;
    _count = 0;
    while (!_wait_list.empty()) {
        auto& x = _wait_list.front();
        x.pr.set_exception(xp);
        _wait_list.pop_front();
    }
}

传入一个异常表示关闭原因,然后 borken 会将可用资源数设置为 0 并以异常的方式激活等待队列中所有的 waiter——所以我们可以在 signal 时发现 semaphore 已经 broken 时什么都不做,而不用担心 waiter 得不到调度执行。

Semaphore 与 RAII

通过前面的例子可以发现,使用 seastar::semaphore 我们需要搭配 waitsignal,毫无疑问这会给我们带来一些心智负担,一不小心可能就忘记了调用 signal——可能直接就是忘记了调用,也可能是调用了但是没有得到执行;比如:

为了异常安全,在 C++ 中一般不建议将资源的获取与释放分开在两个函数中进行,而是应该使用 RAII 机制,seastar 基于此提供了 with_semaphore 这个 utility:

template <typename ExceptionFactory, typename Clock, typename Func>
inline
futurize_t<std::invoke_result_t<Func>>
with_semaphore(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, typename basic_semaphore<ExceptionFactory, Clock>::duration timeout, Func&& func) noexcept {
    return get_units(sem, units, timeout).then([func = std::forward<Func>(func)] (auto units) mutable {
        return futurize_invoke(std::forward<Func>(func)).finally([units = std::move(units)] {});
    });
}

这个函数接受一个 seastar::semaphore,一个要执行的函数,以及 unites 表示执行该函数需要消耗的资源数,他会自动从 semaphore 中获取资源(可能等待),然后调用函数并保证会释放资源

首先通过 get_units 获取资源(可能等待),之后创建一个 semaphore_unites,也就是说这个 semaphore_units 持有这份资源,然后 futurize_invoke 执行该函数——并处理其中可能抛出的异常(捕获异常将其转换为一个 exceptional future 返回给调用方);最后在 finally 中释放 semaphore_units 从而将资源归还给 seastar::semaphore

Semaphore Units

with_semaphore 中已经看到了 semaphore_unites 的使用,它是资源的持有者:

template<typename ExceptionFactory = semaphore_default_exception_factory, typename Clock = typename timer<>::clock>
class semaphore_units {
    basic_semaphore<ExceptionFactory, Clock>* _sem;
    size_t _n;

其中 _n 表示它所持有的资源个数,_sem 则是对该资源所属的 semaphore 的引用;创建这样一个对象并不会导致从 semaphore 中取出资源——我们必须先取出资源,然后才能创建这样一个对象,就像 get_units 函数所做的那样:

template<typename ExceptionFactory, typename Clock>
future<semaphore_units<ExceptionFactory, Clock>>
get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, typename basic_semaphore<ExceptionFactory, Clock>::duration timeout) noexcept {
    return sem.wait(timeout, units).then([&sem, units] {
        return semaphore_units<ExceptionFactory, Clock>{ sem, units };
    });
}

所以我们一般不会直接创建 semaphore_units 对象,而是通过 get_units,该对象析构时,其所持有的资源将会被归还:

    ~semaphore_units() noexcept {
        return_all();
    }
    void return_all() noexcept {
        if (_n) {
            _sem->signal(_n);
            _n = 0;
        }
    }

就这两点来看,一般情况下不会直接和 semaphore_units 打交道,甚至也感受不到它的存在:通过 get_units 获取资源并创建 semaphore_unites,通过其析构函数自动释放资源——而这也被 with_semaphore 给封装好了。

但是 semaphore_units 还提供了其他方法,其中一个比较有用的可能是 split

    semaphore_units split(size_t units) {
        if (units > _n) {
            throw std::invalid_argument("Cannot take more units than those protected by the semaphore");
        }
        _n -= units;
        return semaphore_units(_sem, units);
    }

将一个 units 一分为二,这一点在预分配资源的情况下应该比较有用。

Reference


  1. https://www.wikiwand.com/en/Semaphore_(programming)↩︎

  2. https://cs.nyu.edu/~yap/classes/os/resources/origin_of_PV.html↩︎