第一篇里面介绍了一下
seastar::sharded
的基本概念和用法,
一般情况下我们用这些就足够了, 但是 Seastar
还为其他一些特殊的使用场景提供了支持, 当程序功能越来越复杂,
碰到这些场景的概率肯定会更大, 所以从完备的层面考虑, 还是得介绍一些他们;
其实也并不困难, Seastar 是秉持实用主义的,
也就是说在里面的组件一定是有现实中的使用场景的,
用到他们只是时间的问题(了解了这些组件之后可以在 redpanda 和 scylladb 的 codebase
里搜索一下具体的使用场景)
前面并没有特别说明 seastar::sharded
的
start()
以及 invoke_on_*
系列函数允许传递哪些参数,
笼统地说只要是支持拷贝构造的类型(包括内置数据类型)都支持,
但是可以更具体地分为三类:
sharded<>
values - these are transformed
before passing to the functor; the local instance is extracted and
passed第一类是最简单也是也是最常用的,
所有除了第二类和第三类说明的类型(sharded
和
sharded_parameter
)外的其他类型都属于第一类, 比如
int
、std::string
以及自定义类型
class Widget
等, 他们会被拷贝到各个 shard
第二类则是考虑到多个 sharded
之间的协作, 比如某个
Service-A 构建时需要用到 Service-B, 此时 A 自然要用的是自己所在 shard
上的 B 实例, 但是在 start
的时候并没有办法指定需要哪个
shard 上的实例(将 lambda 分发到 shard 去执行这个逻辑隐藏在
start()
方法内部), 所以 Seastar
在框架层面上为这种需求添加了支持:
struct ServeiceOne { /* ... */ };
struct ServiceTwo {
void Say(const std::string &msg, ServiceOne &so);
};
{
::sharded<ServiceOne> so;
ss::sharded<ServiceTwo> st;
ss
.start().get0();
so.start().get0();
st
std::string msg = "Hello";
.invoke_on_all<void (ServiceTwo::*)(const std::string &, ServiceOne &)>(
st&ServiceTwo::say, msg, std::ref(so));
}
这里要在所有 shard 上执行 ServiceTwo::Say
方法,
该方法接受一个 ServiceOne &
——这也是一个
sharded
服务; 使用的时候我们传入一个
sharded<ServiceOne>
(因为 sharded
不支持拷贝, 所以需要用 std::ref
包装成一个 reference
wrapper); Seastar 在调用该方法之前, 对于 sharded
类型的参数会拿出其在当前 shard 的实例作为实际调用参数
但是这还不够灵活, 有时候我们需要传递一些依赖于其所在 shard 的内容,
比如内存分配(Seastar 中每个 shard 都有自己专属的 memory), 为此 Seastar
提供了 sharded_parameter
, 它是一个 lambda 以及该 lambda
所需参数的 wrapper, 通过将该 lambda 在对应 shard
上求值达到前面所说的效果, 我们可以这样使用:
struct Service {
void Say(const std::string &msg, std::unique_ptr<int> &&ptr);
};
{
::sharded<Service> s;
ss...
std::string msg = "Hello";
.invoke_on_all<void (ServiceThree::*)(const std::string &,
sstd::unique_ptr<int> &&) const>(
&ServiceThree::Say2, msg,
::sharded_parameter([]() { return std::make_unique<int>(0); }))
ss.get0();
}
实现方面则比较简单, 主要靠的是模板偏特化, 在调用 functor 之前, 会用
unwrap_sharded_arg
来解析参数, 通过对
std::reference_wraper<sharded<Service>>
和
sharded_parameter<...>
定制特殊的 unwrap
参数逻辑从而统一这些参数的传递方式:
template <typename T>
inline T&&
(T&& arg) {
unwrap_sharded_argreturn std::forward<T>(arg);
}
template <typename Service>
<Service>
either_sharded_or_local(std::reference_wrapper<sharded<Service>> arg) {
unwrap_sharded_argreturn either_sharded_or_local<Service>(arg);
}
template <typename Func, typename... Param>
auto
(sharded_parameter<Func, Param...> sp) {
unwrap_sharded_argreturn sp.evaluate();
}
考虑下面这种情况: Service
发起了一个异步操作并在该操作中引用自己, 直接这样做的话很有可能出问题,
因为我们无法确定该异步操作会在 stop()
前完成;
一般情况是借助 ss:gate
跟踪这些异步操作, 并在
Service
自己的 stop()
方法中等待
gate
关闭, 即异步操作完成, 之后才真正释放资源
ss::sharded
在框架层面也提供了机制以解决该问题,
但是并不是使用 gate
, 而是通过让 Service
继承
ss::async_sharded_service
(以 CRTP
的方式), 该类型继承自
ss::enable_shared_from_this<Service
,
然后在异步操作中通过 shared_from_this()
增加引用计数,
ss::sharded::stop()
会确保在所有 shard 上的
Service
实例的引用计数都递减为 0 之后才真正释放资源
下面是一个简单的 🌰:
struct Service : public ss::async_sharded_service<Service> {
() = default;
Service~Service() {
.info("destruct service on shard-{}", ss::this_shard_id());
logger}
void DoSth() {
using namespace std::chrono_literals;
(void)ss::sleep(1s).then([_ = shared_from_this(), this]() {
.info("sleep() done on shard-{}", ss::this_shard_id());
logger/* use this.. */
});
}
};
{
::sharded<Service> s;
ss.start().get0();
s.invoke_on<void (Service::*)()>(1, &Service::DoSth).get0();
s.stop().get0();
s}
DoSth
方法中发起一个异步操作, 并且引用了
Service
自己(capture-by-this), 因为这里用的都是
get0
在 ss:async()
中同步等待异步操作完成,
所以我在 DoSth
中发起的是一个后台操作, 那么在
s.stop()
被调用时, 该异步操作还没有完成,不过由于使用了
ss::async_sharded_service
, 所以并不用担心空指针的问题
观察日志可以发现, shard-1 上的实例是在异步操作完成后才被释放的:
INFO 2022-12-04 08:24:21,182 [shard 0] sharded_demo_3 - destruct service on shard-0
INFO 2022-12-04 08:24:21,183 [shard 2] sharded_demo_3 - destruct service on shard-2
INFO 2022-12-04 08:24:21,183 [shard 3] sharded_demo_3 - destruct service on shard-3
INFO 2022-12-04 08:24:21,183 [shard 7] sharded_demo_3 - destruct service on shard-7
INFO 2022-12-04 08:24:21,183 [shard 5] sharded_demo_3 - destruct service on shard-5
INFO 2022-12-04 08:24:21,183 [shard 8] sharded_demo_3 - destruct service on shard-8
INFO 2022-12-04 08:24:21,183 [shard 9] sharded_demo_3 - destruct service on shard-9
INFO 2022-12-04 08:24:21,183 [shard 11] sharded_demo_3 - destruct service on shard-11
INFO 2022-12-04 08:24:21,183 [shard 4] sharded_demo_3 - destruct service on shard-4
INFO 2022-12-04 08:24:21,183 [shard 6] sharded_demo_3 - destruct service on shard-6
INFO 2022-12-04 08:24:21,183 [shard 15] sharded_demo_3 - destruct service on shard-15
INFO 2022-12-04 08:24:21,183 [shard 10] sharded_demo_3 - destruct service on shard-10
INFO 2022-12-04 08:24:21,183 [shard 12] sharded_demo_3 - destruct service on shard-12
INFO 2022-12-04 08:24:21,183 [shard 13] sharded_demo_3 - destruct service on shard-13
INFO 2022-12-04 08:24:21,183 [shard 14] sharded_demo_3 - destruct service on shard-14
INFO 2022-12-04 08:24:22,182 [shard 1] sharded_demo_3 - sleep() done on shard-1 INFO 2022-12-04 08:24:22,182 [shard 1] sharded_demo_3 - destruct service on shard-1
实现上其实也很简单, 之前在介绍 shard 上实例的创建的时候提到过, 每个
shard 上的 entry
有两个数据成员:
<Service> service;
shared_ptr<> freed; promise
service
自不必说, freed
就是给
async_sharded_service
用的; 回到
sharded::stop()
方法, 其中有两轮
sharded_parallel_for_each
第一轮是调用所有实例的自己的
stop()
方法, 第二轮是将将 entry::service
置为
nullptr
递减引用计数, 然后等待 freed
这个
promise
被 resolve
如果发现 Service
并没有继承
ss::async_sharded_service<Service>
, 那么这个 promise
会在创建 Service
实例的时候被 resolve:
void service_deleted() noexcept {
[this_shard_id()].freed.set_value();
_instances}
void track_deletion(shared_ptr<Service>&, std::false_type) noexcept {
// do not wait for instance to be deleted since it is not going to notify us
();
service_deleted}
void track_deletion(shared_ptr<Service>& s, std::true_type) {
->_delete_cb = std::bind(std::mem_fn(&sharded<Service>::service_deleted), this);
s}
template <typename... Args>
<Service> create_local_service(Args&&... args) {
shared_ptrauto s = ::seastar::make_shared<Service>(std::forward<Args>(args)...);
(*s);
set_container(s, std::is_base_of<async_sharded_service<Service>, Service>());
track_deletionreturn s;
}
否则则会在 ss::async_sharded_service<Service>
的析构函数中被 resolve:
class async_sharded_service : public enable_shared_from_this<T> {
protected:
std::function<void()> _delete_cb;
() noexcept = default;
async_sharded_servicevirtual ~async_sharded_service() {
if (_delete_cb) {
();
_delete_cb}
}
};
只有当引用计数递减为 0 的时候, 才会执行
async_sharded_service
的析构函数,
这也保证了在异步操作中引用 Service
实例的安全性,
不过需要注意的就是要手动 shared_from_this
在前面的 ss::sharded
中, 各个 shard 上的 Service
只能感知到自己, 而有时候我们通常需要多个 shard 上的 Service
协同合作——通过参数传递 sharded
是不行的, 它最终会被解释为
shard 上的实例; Seastar 为这种多同一个 sharded
的实例之间相互协作的场景也提供了支持, 即
peering_sharded_service
template <typename Service>
class peering_sharded_service {
<Service>* _container = nullptr;
shardedprivate:
void set_container(sharded<Service>* container) noexcept { _container = container; }
public:
<Service>& container() noexcept { return *_container; }
shardedconst sharded<Service>& container() const noexcept { return *_container; }
};
和 async_sharded_service
的使用方法类似, 让
Service
继承它(CRTP, 同样的), 就可以让 Service
使用 container()
方法获取其所属的 sharded
,
从而可以用来向其他 shard 上的实例发起函数调用; 比如下面这个 🌰:
::future<bool> conn_quota::do_get(ss::net::inet_address addr) {
ssauto home_shard = addr_to_shard(addr);
if (home_shard == ss::this_shard_id()) {
// Fast path: we are the home shard for this address, can
// probably get a token locally (unless exhausted)
return home_get_units(addr);
} else {
return container().invoke_on(home_shard, [addr](conn_quota& cq) {
return cq.home_get_units(addr);
});
}
}
这段代码来自于 redpanda(有删减),
conn_quota
类用于限制连接数, 分为全局限制和 per-IP 限制;
它并不是一个集中式的服务, 而是分散在各个 shard 上, 每个 shard 都有独立的
quota 计数; 当某个 shard 接收到 ip 发起的连接,
在真正服务之前先要检查这个 ip 是否还有 quota; 这个检查逻辑分为 fast path
和 slow path: fast path 就是在当前 shard 就还有该 ip 的 quota,
此时可以在当前 shard 上得到服务, 否则就要去检查其他 shard(有点
work-stealing 的感觉); 具体做法就是将对端 ip 哈希取模得到它所在的 shard,
如果这个 shard 就是当前 shard, 那么就是 fast path, 否则就是 slow
path
slow path 需要向其他 shard 上的 conn_quota
实例发起请求,
所以 conn_quota
继承了
ss::peering_sharded_service