2022-05-04
最近在实现 fetch 接口时, 需要对单个 JS 脚本的单次执行过程中可以发起的 fetch 操作数进行限制(和 CloudFlare Workers 类似):
第一点很简单, 用一个计数器即可解决; 关键在于第二个, 要将超出了并发限制的 fetch 操作阻塞住, 直到有其他 fetch 操作结束(即返回的 promise 被 resolve)再唤醒它们执行.
这个需求听起来似乎很熟悉?可以换一种说法,将「同一个时刻可运行的 fetch 请求」视为一种资源,这个资源是可以重复利用的(这一点是并发限制和总量限制的主要区别):发起一个 fetch 请求相当于拿走一份资源,结束一个 fetch 请求相当于归还一份资源;且这个资源并非无限,所以当资源被拿完之后再想拿就等着直到有人归还资源。
说到这里就很清晰了,这个问题就是信号量(semaphore)1要解决的:
In computer science, a semaphore is a variable or abstract data type used to control access to a common resource by multiple threads and avoid critical section problems in a concurrent system such as a multitasking operating system.
上面虽然提到了多线程, 但是其实信号量的本质是一种计数型资源 —— 资源不仅仅在多线程下才会产生争抢, 考虑如下场景:
static int data = 0;
<> modify_data(int d) {
futurereturn do_something().then([d]() {
= d;
data });
}
(void) modify_data(3);
(void) modify_data(7);
do_something
是一个异步操作,
该异步操作结束后中会修改一个全局变量 data
;
然后连续调用了两次 modify_data
, 注意这两次调用并没有使用
then
操作串联起来,所以他俩是独立的两个 fiber,
并没有先后依赖关系,因为我们不知道哪个异步操作会先完成,
所以上二者会交织(interleave)执行, 导致最终 data
的值不确定;
其实线程也是类似的, 不加锁的话线程之间同样会交织执行——只是和 fiber 相比,
两者执行流交织的粒度有所不同: fiber 之间的交织是以 continuation
为最小单元, 而 thread 之间的交织则是以 CPU 指令为最小单元
Seastar 为单线程的异步场景设计了 seastar::semaphore
信号量机制用于控制用户对资源的并发访问, 并提供了 future-style API;
下面用它来限制同一时刻只有一个操作可以访问 data
:
static int data = 0;
static seastar::semaphore sema{1};
<> modify_data(int d) {
futurereturn seastar::with_semaphore(sema, 1, [d]() {
return do_something().then([d]() {
= d;
data });
});
}
(void) modify_data(3);
(void) modify_data(7);
这里我们创建了一个初始大小为 1 的 semaphore,这表示它所管理的资源数为
1;通过 seastar::with_semaphore
执行原来的
fiber,这个函数会从 sema
中取出一个资源,结束时释放;所以当调用 modify_data(7)
时如果前一个 fiber 没有结束,此时 sema
持有的资源数为
0,那么 with_semaphore()
就会等待;这样就保证了同时只有一个
modify_data
fiber 在执行;以上代码的结果也是确定的:最终
data
的值为 7
以上代码中 seastar::semaphore
用于限制同时可以执行的
fiber 的实例个数;除此之外,由于 semaphore 可以管理任意多个资源(实际上是
int64_t
),所以我们还可以用它来限制其他资源。
比如我们有一个函数 using_lots_of_memory
会使用较多内存,所以肯定不希望它无限制地执行;下面用
seastar::semaphore
来限制这个函数最多只能使用 1MB
内存,超过了再调用它也不会立即执行,而是等待该函数的其他调用实例结束并释放资源:
::future<> using_lots_of_memory(size_t bytes) {
seastarstatic thread_local seastar::semaphore limit(1000000); // limit to 1MB
return seastar::with_semaphore(limit, bytes, [bytes] {
// do something allocating 'bytes' bytes of memory
});
}
seastar::semaphore
的实现非常简洁:
ssize_t _count;
std::exception_ptr _ex;
struct entry {
<> pr;
promisesize_t nr;
std::optional<abort_on_expiry<clock>> timer;
};
::abortable_fifo<entry, expiry_handler> _wait_list; internal
是的,就是这么简单;其中 _count
表示该 semaphore
所管理的资源总数,_ex
表示该 semaphore broken
时的异常,_wait_list
则保存着所有的 waiter。
最开始的使用实例中我们已经看到了,seastar::semaphore
的使用很简单:
::semaphore sema{7}; seastar
这样就创建了一个可用资源数为 7 的 semaphore;其实
seastar::semaphore
只是一个类型别名,真正的类型是
basic_semaphore
:
template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
class basic_semaphore : private ExceptionFactory;
其中 ExceptionFactory
用于自定义异常,Clock
用于自定义时间精度——没错,seastar::semaphore
也支持指定最长等待时间;除了常用的 seastar::semaphore
之外,还有另外一个预定义的 semaphose:
using named_semaphore = basic_semaphore<named_semaphore_exception_factory>;
这是一个具名信号量,这种信号量在等待超时、或者 semaphore broken
时抛出来的异常会带有一个名字用于指明是哪一个信号量。这个套路和
std::string
以及 baisc_string
相同,我们一般用默认的 std::string
就足够了,但是这种做法无疑给我们提供了更多扩展能力
Semaphore 最初被 E.W.Dijkstra 提出时它包含两个操作:P 和 V2,其中 P 表示取出资源,而 V
表示放回资源;同样 seastar::semaphore
也提供了这两个操作,分别命名为 wait
和
signal
,比如前面 modify_data
的例子用
wait
和 signal
我们可以改成这样:
.wait(1).then([d]() {
semareturn do_something().then([d]() {
= d;
data }).finally([]() {
.signal(1);
sema});
})
seastar::semaphore
用 wait
来表示
P
操作,即获取资源;
<> wait(time_point timeout, size_t nr = 1) noexcept {
futureif (may_proceed(nr)) {
-= nr;
_count return make_ready_future<>();
}
...
nr
表示本次请求需要的资源数目——是的,不一定一次只能哪一个资源(回顾之前的内存的例子);首先看看当前剩余的资源是否可以满足该请求,如果可以的话,直接返回一个
ready future
...
if (_ex) {
return make_exception_future(_ex);
}
try {
& e = _wait_list.emplace_back(promise<>(), nr);
entryauto f = e.pr.get_future();
if (timeout != time_point::max()) {
.timer.emplace(timeout);
e& as = e.timer->abort_source();
abort_source.make_back_abortable(as);
_wait_list}
return f;
} catch (...) {
return make_exception_future(std::current_exception());
}
}
否则的话,则先检查该 semaphore
是否有异常,seastar::semaphore
中的异常通常用于关闭场景,比如在 Web Server 中我们可以用它来控制最多可以
accept 的连接数,当 Server 关闭时我们可以调用该 semaphore 的
broken
方法产生一个异常;所以这样的异常表示该 semaphore
不再可用,这种情况下也直接返回一个 exceptional future;
否则的话,我们就需要等待,这里又是常见的套路:创建一个 promise 保存起来,返回该 promise 关联的 future,并用该 future 串联后续要执行的操作,这样只有该 future 被 resolve,后续的操作才会真正发起;
但是稍微有一点不太一样,seastar::semaphore
支持
timed-wait,调用方可以指定最长愿意等待多长时间,超过这个时间就就不能继续等了;所以如果传入的
timeout
是一个有效值,说明用户的确指定了超时值,此时需要该
waiter entry 关联一个 timer,暂时不用管 abortable_fifo
和
expiry_handler
,现在只需要知道,当该 timer 超时,就会通过
entry
中的 promise 往关联的 future 中写入一个 timedout
exception 从而激活它。
此外,seastar::semaphore
还提供了非阻塞版本的
try_wait
:
bool try_wait(size_t nr = 1) noexcept {
if (may_proceed(nr)) {
-= nr;
_count return true;
} else {
return false;
}
}
只是简单地检查一下剩余资源是否可以立即满足请求,并不会在资源不足时创建 waiter——而是返回 -1 告诉调用方资源不足。
和 wait
正好相反,signal
用于将资源归还给
semaphore:
void signal(size_t nr = 1) noexcept {
if (_ex) {
return;
}
+= nr;
_count while (!_wait_list.empty() && has_available_units(_wait_list.front().nr)) {
auto& x = _wait_list.front();
-= x.nr;
_count .pr.set_value();
x.pop_front();
_wait_list}
}
首先看看该 semaphore
是否还有效,无效的话说明也不用做;否则的话递增计数器,然后处理等待队列;队列中的
waiter 是按 FIFO 顺序去处理的,这一点很重要,并且在 wait
中也对这一点做了保障——就在 may_proceed
方法中:
bool has_available_units(size_t nr) const noexcept {
return _count >= 0 && (static_cast<size_t>(_count) >= nr);
}
bool may_proceed(size_t nr) const noexcept {
return has_available_units(nr) && _wait_list.empty();
}
它不仅仅检查剩余资源数目是否足够,还会检查等待队列上是否有其他 waiter 已经在等待;少了第二点,FIFO 的顺序就无法保证:有的请求需要的资源数目多,那么它就更有可能被其他需要资源数目少的请求给抢占,从而得不到执行机会以至于被饿死(starve)。
前面已经提到过,seastar::semaphore
是可以被关闭的——通过
broken
方法:
template<typename ExceptionFactory, typename Clock>
inline void
<ExceptionFactory, Clock>::broken(std::exception_ptr xp) noexcept {
basic_semaphorestatic_assert(std::is_nothrow_copy_constructible_v<std::exception_ptr>);
= xp;
_ex = 0;
_count while (!_wait_list.empty()) {
auto& x = _wait_list.front();
.pr.set_exception(xp);
x.pop_front();
_wait_list}
}
传入一个异常表示关闭原因,然后 borken
会将可用资源数设置为 0 并以异常的方式激活等待队列中所有的
waiter——所以我们可以在 signal
时发现 semaphore 已经 broken
时什么都不做,而不用担心 waiter 得不到调度执行。
通过前面的例子可以发现,使用
seastar::semaphore
我们需要搭配 wait
和
signal
,毫无疑问这会给我们带来一些心智负担,一不小心可能就忘记了调用
signal
——可能直接就是忘记了调用,也可能是调用了但是没有得到执行;比如:
为了异常安全,在 C++
中一般不建议将资源的获取与释放分开在两个函数中进行,而是应该使用 RAII
机制,seastar
基于此提供了 with_semaphore
这个
utility:
template <typename ExceptionFactory, typename Clock, typename Func>
inline
futurize_t<std::invoke_result_t<Func>>
(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, typename basic_semaphore<ExceptionFactory, Clock>::duration timeout, Func&& func) noexcept {
with_semaphorereturn get_units(sem, units, timeout).then([func = std::forward<Func>(func)] (auto units) mutable {
return futurize_invoke(std::forward<Func>(func)).finally([units = std::move(units)] {});
});
}
这个函数接受一个
seastar::semaphore
,一个要执行的函数,以及
unites
表示执行该函数需要消耗的资源数,他会自动从 semaphore
中获取资源(可能等待),然后调用函数并保证会释放资源
首先通过 get_units
获取资源(可能等待),之后创建一个
semaphore_unites
,也就是说这个 semaphore_units
持有这份资源,然后 futurize_invoke
执行该函数——并处理其中可能抛出的异常(捕获异常将其转换为一个 exceptional
future 返回给调用方);最后在 finally
中释放
semaphore_units
从而将资源归还给
seastar::semaphore
在 with_semaphore
中已经看到了
semaphore_unites
的使用,它是资源的持有者:
template<typename ExceptionFactory = semaphore_default_exception_factory, typename Clock = typename timer<>::clock>
class semaphore_units {
<ExceptionFactory, Clock>* _sem;
basic_semaphoresize_t _n;
其中 _n
表示它所持有的资源个数,_sem
则是对该资源所属的 semaphore 的引用;创建这样一个对象并不会导致从
semaphore 中取出资源——我们必须先取出资源,然后才能创建这样一个对象,就像
get_units
函数所做的那样:
template<typename ExceptionFactory, typename Clock>
<semaphore_units<ExceptionFactory, Clock>>
future(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, typename basic_semaphore<ExceptionFactory, Clock>::duration timeout) noexcept {
get_unitsreturn sem.wait(timeout, units).then([&sem, units] {
return semaphore_units<ExceptionFactory, Clock>{ sem, units };
});
}
所以我们一般不会直接创建 semaphore_units
对象,而是通过
get_units
,该对象析构时,其所持有的资源将会被归还:
~semaphore_units() noexcept {
();
return_all}
void return_all() noexcept {
if (_n) {
->signal(_n);
_sem= 0;
_n }
}
就这两点来看,一般情况下不会直接和 semaphore_units
打交道,甚至也感受不到它的存在:通过 get_units
获取资源并创建
semaphore_unites
,通过其析构函数自动释放资源——而这也被
with_semaphore
给封装好了。
但是 semaphore_units
还提供了其他方法,其中一个比较有用的可能是 split
:
(size_t units) {
semaphore_units splitif (units > _n) {
throw std::invalid_argument("Cannot take more units than those protected by the semaphore");
}
-= units;
_n return semaphore_units(_sem, units);
}
将一个 units 一分为二,这一点在预分配资源的情况下应该比较有用。