Seastar: semaphore
Preface
最近在实现 fetch 接口时, 需要对单个 JS 脚本的单次执行过程中可以发起的 fetch 操作数进行限制(和 CloudFlare Workers 类似):
- 总量限制:超过了则后续的 fetch 操作都将直接失败
- 并发限制:超过了则后续的 fetch 请求将会被延迟(postpone)执行,直到有其他执行着的 fetch 请求结束
第一点很简单, 用一个计数器即可解决; 关键在于第二个, 要将超出了并发限制的 fetch 操作阻塞住, 直到有其他 fetch 操作结束(即返回的 promise 被 resolve)再唤醒它们执行.
这个需求听起来似乎很熟悉?可以换一种说法,将「同一个时刻可运行的 fetch 请求」视为一种资源,这个资源是可以重复利用的(这一点是并发限制和总量限制的主要区别):发起一个 fetch 请求相当于拿走一份资源,结束一个 fetch 请求相当于归还一份资源;且这个资源并非无限,所以当资源被拿完之后再想拿就等着直到有人归还资源。
说到这里就很清晰了,这个问题就是信号量(semaphore)1要解决的:
In computer science, a semaphore is a variable or abstract data type used to control access to a common resource by multiple threads and avoid critical section problems in a concurrent system such as a multitasking operating system.
使用场景
上面虽然提到了多线程, 但是其实信号量的本质是一种计数型资源 —— 资源不仅仅在多线程下才会产生争抢, 考虑如下场景:
|
|
do_something
是一个异步操作, 该异步操作结束后中会修改一个全局变量 data
; 然后连续调用了两次 modify_data
, 注意这两次调用并没有使用 then
操作串联起来,所以他俩是独立的两个 fiber, 并没有先后依赖关系,因为我们不知道哪个异步操作会先完成, 所以上二者会交织(interleave)执行, 导致最终 data
的值不确定; 其实线程也是类似的, 不加锁的话线程之间同样会交织执行——只是和 fiber 相比, 两者执行流交织的粒度有所不同: fiber 之间的交织是以 continuation 为最小单元, 而 thread 之间的交织则是以 CPU 指令为最小单元
Seastar 为单线程的异步场景设计了 seastar::semaphore
信号量机制用于控制用户对资源的并发访问, 并提供了 future-style API; 下面用它来限制同一时刻只有一个操作可以访问 data
:
|
|
这里我们创建了一个初始大小为 1 的 semaphore,这表示它所管理的资源数为 1;通过 seastar::with_semaphore
执行原来的 fiber,这个函数会从 sema
中取出一个资源,结束时释放;所以当调用 modify_data(7)
时如果前一个 fiber 没有结束,此时 sema
持有的资源数为 0,那么 with_semaphore()
就会等待;这样就保证了同时只有一个 modify_data
fiber 在执行;以上代码的结果也是确定的:最终 data
的值为 7
以上代码中 seastar::semaphore
用于限制同时可以执行的 fiber 的实例个数;除此之外,由于 semaphore 可以管理任意多个资源(实际上是 int64_t
),所以我们还可以用它来限制其他资源。
比如我们有一个函数 using_lots_of_memory
会使用较多内存,所以肯定不希望它无限制地执行;下面用 seastar::semaphore
来限制这个函数最多只能使用 1MB 内存,超过了再调用它也不会立即执行,而是等待该函数的其他调用实例结束并释放资源:
|
|
Semaphore Internals
seastar::semaphore
的实现非常简洁:
|
|
是的,就是这么简单;其中 _count
表示该 semaphore 所管理的资源总数,_ex
表示该 semaphore broken 时的异常,_wait_list
则保存着所有的 waiter。
最开始的使用实例中我们已经看到了,seastar::semaphore
的使用很简单:
|
|
这样就创建了一个可用资源数为 7 的 semaphore;其实 seastar::semaphore
只是一个类型别名,真正的类型是 basic_semaphore
:
|
|
其中 ExceptionFactory
用于自定义异常,Clock
用于自定义时间精度——没错,seastar::semaphore
也支持指定最长等待时间;除了常用的 seastar::semaphore
之外,还有另外一个预定义的 semaphose:
|
|
这是一个具名信号量,这种信号量在等待超时、或者 semaphore broken 时抛出来的异常会带有一个名字用于指明是哪一个信号量。这个套路和 std::string
以及 baisc_string
相同,我们一般用默认的 std::string
就足够了,但是这种做法无疑给我们提供了更多扩展能力
Semaphore 最初被 E.W.Dijkstra 提出时它包含两个操作:P 和 V2,其中 P 表示取出资源,而 V 表示放回资源;同样 seastar::semaphore
也提供了这两个操作,分别命名为 wait
和 signal
,比如前面 modify_data
的例子用 wait
和 signal
我们可以改成这样:
|
|
获取资源
seastar::semaphore
用 wait
来表示 P
操作,即获取资源;
|
|
nr
表示本次请求需要的资源数目——是的,不一定一次只能哪一个资源(回顾之前的内存的例子);首先看看当前剩余的资源是否可以满足该请求,如果可以的话,直接返回一个 ready future
|
|
否则的话,则先检查该 semaphore
是否有异常,seastar::semaphore
中的异常通常用于关闭场景,比如在 Web Server 中我们可以用它来控制最多可以 accept 的连接数,当 Server 关闭时我们可以调用该 semaphore 的 broken
方法产生一个异常;所以这样的异常表示该 semaphore 不再可用,这种情况下也直接返回一个 exceptional future;
否则的话,我们就需要等待,这里又是常见的套路:创建一个 promise 保存起来,返回该 promise 关联的 future,并用该 future 串联后续要执行的操作,这样只有该 future 被 resolve,后续的操作才会真正发起;
但是稍微有一点不太一样,seastar::semaphore
支持 timed-wait,调用方可以指定最长愿意等待多长时间,超过这个时间就就不能继续等了;所以如果传入的 timeout
是一个有效值,说明用户的确指定了超时值,此时需要该 waiter entry 关联一个 timer,暂时不用管 abortable_fifo
和 expiry_handler
,现在只需要知道,当该 timer 超时,就会通过 entry
中的 promise 往关联的 future 中写入一个 timedout exception 从而激活它。
此外,seastar::semaphore
还提供了非阻塞版本的 try_wait
:
|
|
只是简单地检查一下剩余资源是否可以立即满足请求,并不会在资源不足时创建 waiter——而是返回 -1 告诉调用方资源不足。
释放资源
和 wait
正好相反,signal
用于将资源归还给 semaphore:
|
|
首先看看该 semaphore 是否还有效,无效的话说明也不用做;否则的话递增计数器,然后处理等待队列;队列中的 waiter 是按 FIFO 顺序去处理的,这一点很重要,并且在 wait
中也对这一点做了保障——就在 may_proceed
方法中:
|
|
它不仅仅检查剩余资源数目是否足够,还会检查等待队列上是否有其他 waiter 已经在等待;少了第二点,FIFO 的顺序就无法保证:有的请求需要的资源数目多,那么它就更有可能被其他需要资源数目少的请求给抢占,从而得不到执行机会以至于被饿死(starve)。
关闭 Semaphore
前面已经提到过,seastar::semaphore
是可以被关闭的——通过 broken
方法:
|
|
传入一个异常表示关闭原因,然后 borken
会将可用资源数设置为 0 并以异常的方式激活等待队列中所有的 waiter——所以我们可以在 signal
时发现 semaphore 已经 broken 时什么都不做,而不用担心 waiter 得不到调度执行。
Semaphore 与 RAII
通过前面的例子可以发现,使用 seastar::semaphore
我们需要搭配 wait
和 signal
,毫无疑问这会给我们带来一些心智负担,一不小心可能就忘记了调用 signal
——可能直接就是忘记了调用,也可能是调用了但是没有得到执行;比如:
为了异常安全,在 C++ 中一般不建议将资源的获取与释放分开在两个函数中进行,而是应该使用 RAII 机制,seastar
基于此提供了 with_semaphore
这个 utility:
|
|
这个函数接受一个 seastar::semaphore
,一个要执行的函数,以及 unites
表示执行该函数需要消耗的资源数,他会自动从 semaphore 中获取资源(可能等待),然后调用函数并保证会释放资源
首先通过 get_units
获取资源(可能等待),之后创建一个 semaphore_unites
,也就是说这个 semaphore_units
持有这份资源,然后 futurize_invoke
执行该函数——并处理其中可能抛出的异常(捕获异常将其转换为一个 exceptional future 返回给调用方);最后在 finally
中释放 semaphore_units
从而将资源归还给 seastar::semaphore
Semaphore Units
在 with_semaphore
中已经看到了 semaphore_unites
的使用,它是资源的持有者:
|
|
其中 _n
表示它所持有的资源个数,_sem
则是对该资源所属的 semaphore 的引用;创建这样一个对象并不会导致从 semaphore 中取出资源——我们必须先取出资源,然后才能创建这样一个对象,就像 get_units
函数所做的那样:
|
|
所以我们一般不会直接创建 semaphore_units
对象,而是通过 get_units
,该对象析构时,其所持有的资源将会被归还:
|
|
就这两点来看,一般情况下不会直接和 semaphore_units
打交道,甚至也感受不到它的存在:通过 get_units
获取资源并创建 semaphore_unites
,通过其析构函数自动释放资源——而这也被 with_semaphore
给封装好了。
但是 semaphore_units
还提供了其他方法,其中一个比较有用的可能是 split
:
|
|
将一个 units 一分为二,这一点在预分配资源的情况下应该比较有用。