Seastar 是一个多线程异步库, 基于它的 App 通常都有在多个 shard(也称
logic core) 上对称部署服务的诉求; 为此我们需要在多个 shard
上创建服务的实例, 并且让这些实例执行某些操作; 这些涉及到 shard
之间的通信, Seastar 提倡使用显式的消息传递(message
passing)而不是传统多线程编程常用的共享内存&加锁的方式进行通信,
为此提供了 smp
工具; 所以我们其实可以这样写:
std::vector<Service> instances;
.resive(smp::count);
instances
::invoke_on_all([&instances]() {
smp[this_shard_id()] = new Service();
instance});
使用 smp
中的工具, 我们可以实现这些操作,
但是如果有很多这样的服务,
那么就需要重复大量类似的代码(以上代码只是该场景下很小的一部分),
所以我们应该将其沉淀为一个通用的组件, 而 Seastar 也是这样做的, 这就是
seastar::sharded
.
一开始它并不叫这个名字, 而是 seastar::distributed
,
即分布式服务, 后面考虑到 distributed 经常用于跨机器的场景,
而这里实际上是同机器跨核, 所以改名为 seastar::sharded
,
不过考虑到向前兼容还是把 seastar::distributed
作为
seastar::sharded
的一个别名保留, 只是不再推荐使用
在了解它的实现之前, 先看看它的一些用法, Seastar 提供了一个简易的 HTTP
server, 以此为例, 看看如何使用 sharded
在多个 core
上部署它; 为了简化逻辑, 使用 seastar::async
以串行的方式写:
return ss::async([]() {
::sharded<ss::httpd::http_server> server;
ssauto deferred = ss::defer([&server]() noexcept {
.stop().get0();
server});
::sstring name = "sharded httpd";
ss.start(std::move(name)).get0();
server.invoke_on_all([](ss::httpd::http_server &s) {}).get0();
server
uint16_t port = 12345;
::socket_address sa(port);
ss
server.invoke_on_all<ss::future<> (ss::httpd::http_server::*)(
::socket_address)>(&ss::httpd::http_server::listen, sa)
ss.get0();
auto handler = new my_handler();
server.invoke_on_all([handler](ss::httpd::http_server &s) {
._routes.add_default_handler(handler);
s})
.get0();
return 0;
});
有几个比较重要的步骤:
ss::sharded
的 start()
方法,
该方法负责在所有 shard 上创建 Service
(此处即
ss::httpd::http_server
)实例, 该方法可接收变长参数,
会拷贝转发给 Service
的构造函数invoke_on_all
方法在所有 shard
的实例上执行某个函数, 这里的函数可以是自由函数(更准确地说是 functor),
也可以是 Service
的类方法; 这是非常常用的操作,
ss::sharded
为此重载了许多变体ss::sharded
的 stop()
方法释放/清理实例, 这一步是必需的(这里是作为一个 deferred action 通过
RAII 来确保释放)template<typename Service>
class sharded {
struct entry {
<Service> service;
shared_ptr<> freed;
promise};
std::vector<entry> _instances;
};
很简单的一个结构, _instance
则保存着所有 shard 上的实例,
以 ss::this_shard_id()
作为数组索引, 每个 shard
只访问属于自己的实例, 所以不会有并发读写的问题; 暂时可以忽略
entry
中的 freed
成员,
它比较特殊就留到后面再介绍.
前面我们讲过, 使用 ss::sharded
首先要调用其
start()
方法在各个 shard 上创建实例:
template <typename Service>
template <typename... Args>
<> sharded<Service>::start(Args &&...args) noexcept {
futurereturn sharded_parallel_for_each(
[this, args = std::make_tuple(std::forward<Args>(args)...)](
unsigned c) mutable {
return smp::submit_to(c, [this, args]() mutable {
[this_shard_id()].service = std::apply(
_instances[this](Args... args) {
return create_local_service(std::forward<Args>(args)...);
},
);
args});
});
}
逻辑非常简单直观, 其实就是借助 smp::submit_to
将一个创建
Service
的任务分发到各个 shard 去执行,
不过有几个地方需要注意:
start()
函数可以传递用于构造 Service
所需的参数, 但是由于是 \(1:n\),
所以这些参数需要支持拷贝, 这样才可以分发到多个 shard 去start()
以及后面的 invoke_on_*
系列方法支持两类特殊参数, 这会在第二篇中介绍这是最常用的操作, 为此 Seastar 在三个维度上提供了许多
invoke_on_*
变体:
invoke_on()
用于指定在某个 shard
上执行, invoke_on_all()
指定在所有 shard 上执行,
invoke_on_others()
指定在除当前 shard 之外的所有 shard
上执行smp_submit_options
: invoke_on_*
函数底层也是用的 smp::submit_to
(并发地),
考虑到如果不加限制地并发调用 invoke_on_*
函数, 很有可能导致
OOM(核数越多越有可能), 所以提供这个参数传递给底层的
smp::submit_to
用于控制并发度, 默认用的是
default_service_group()
Service
类的成员方法这些 invoke_on_*
变体大都大同小异,
并且是以一种统一、通用的方式实现; 比如对于需要在所有 shard
上执行函数的场景, Seastar 提供了以下两个 invoke_on_all
变体(以及另外两个带 smp_submit_options
的变体):
<> invoke_on_all(std::function<future<> (Service&)> func) noexcept;
future
template <typename Func, typename... Args>
<> invoke_on_all(Func func, Args... args) noexcept; future
看起来第一个其实没有必要——它也可以通过第二个模板函数实现:
该模板可以接受自由函数, 也可以接受 Service
类中的方法,
并且允许函数接受除 Service&
之外的其余参数;
但是考虑到编译速度, 模板实例化导致的代码膨胀, Seastar
还是提供了第一个方法, 该方法中通过 std::function
从而避免了模板, 所以 Seastar 称之为 type-erased 的版本
模板版本其实也是也是将要执行的函数包装为非模板版本所需要的函数类型, 从而转发给他去执行:
template <typename Service>
template <typename Func, typename... Args>
(requires std::invocable<Func, Service &,
SEASTAR_CONCEPT::sharded_unwrap_t<Args>...>)
internalinline future<> sharded<Service>::invoke_on_all(smp_submit_to_options options,
,
Func func... args) noexcept
Argsreturn invoke_on_all(
, invoke_on_all_func_type([func = std::move(func),
options= std::tuple(std::move(args)...)](
args &service) mutable {
Service return std::apply(
[&service, &func](Args &&...args) mutable {
return futurize_apply(
, std::tuple_cat(std::forward_as_tuple(service),
funcstd::tuple(internal::unwrap_sharded_arg(
std::forward<Args>(args))...)));
},
std::move(args));
}));
}
这里的 func
既可以是自由函数, 也可以是
Service
类的方法; 如果是自由函数, 那么它的第一个参数一定是
Service &
, 而如果是 Service
类方法,
则没有此限定(通常也不会是 Service
, 毕竟可以直接通过
this
方法), 不过由于 futurize_apply
底层使用
std::apply
去执行 functor, 所以对于 Service
类方法的指针, 需要额外传入一个 Service
实例作为第一个参数,
这样就统一了二者
至于非模板的版本, 其内部逻辑和创建实例的逻辑类似, 用
parall_for_each
+ smp::submit_to
即可实现
调用 stop()
即可清理服务, 这个方法不接受任何参数:
return sharded_parallel_for_each([this] (unsigned c) mutable {
return smp::submit_to(c, [this] () mutable {
auto inst = _instances[this_shard_id()].service;
if (!inst) {
return make_ready_future<>();
}
return internal::stop_sharded_instance(*inst);
});
}).then_wrapped([this] (future<> fut) {
return sharded_parallel_for_each([this] (unsigned c) {
return smp::submit_to(c, [this] {
if (_instances[this_shard_id()].service == nullptr) {
return make_ready_future<>();
}
[this_shard_id()].service = nullptr;
_instancesreturn _instances[this_shard_id()].freed.get_future();
});
}).finally([this, fut = std::move(fut)] () mutable {
.clear();
_instances= std::vector<sharded<Service>::entry>();
_instances return std::move(fut);
});
});
可以看到这是一个二段式的清理逻辑:
sharded_parallel_for_each
通过
internal::stop_sharded_instance()
调用 Service
自己的 stop()
方法(如果有的话)并等待它们 resolvesharded_parallel_for_each
则是释放实例(递减引用计数), 并等待其 freed
promise 被
resolve有几个点需要注意:
Service
本身有 future<> stop()
函数, 那么它会在清理的时候被调用,
有需要的话我们可以在这里做清理逻辑Service
的
stop()
可能要访问其他 shard 上的实例(这是可以做到的, 第二篇会介绍),
所以在所有实例的 stop()
函数都结束后再统一清理freed
promise 被 resolve,
这个逻辑和 async_sharded_service
相关, 也会在第二篇中介绍