Seastar: sharded service(2)

Jianyong Chen

Preface

第一篇里面介绍了一下 seastar::sharded 的基本概念和用法, 一般情况下我们用这些就足够了, 但是 Seastar 还为其他一些特殊的使用场景提供了支持, 当程序功能越来越复杂, 碰到这些场景的概率肯定会更大, 所以从完备的层面考虑, 还是得介绍一些他们; 其实也并不困难, Seastar 是秉持实用主义的, 也就是说在里面的组件一定是有现实中的使用场景的, 用到他们只是时间的问题(了解了这些组件之后可以在 redpandascylladb 的 codebase 里搜索一下具体的使用场景)

sharded parameter

前面并没有特别说明 seastar::shardedstart() 以及 invoke_on_* 系列函数允许传递哪些参数, 笼统地说只要是支持拷贝构造的类型(包括内置数据类型)都支持, 但是可以更具体地分为三类:

第一类是最简单也是也是最常用的, 所有除了第二类和第三类说明的类型(shardedsharded_parameter)外的其他类型都属于第一类, 比如 intstd::string 以及自定义类型 class Widget 等, 他们会被拷贝到各个 shard

第二类则是考虑到多个 sharded 之间的协作, 比如某个 Service-A 构建时需要用到 Service-B, 此时 A 自然要用的是自己所在 shard 上的 B 实例, 但是在 start 的时候并没有办法指定需要哪个 shard 上的实例(将 lambda 分发到 shard 去执行这个逻辑隐藏在 start() 方法内部), 所以 Seastar 在框架层面上为这种需求添加了支持:

struct ServeiceOne { /* ... */ };

struct ServiceTwo {
  void Say(const std::string &msg, ServiceOne &so);
};

{
  ss::sharded<ServiceOne> so;
  ss::sharded<ServiceTwo> st;
  
  so.start().get0();
  st.start().get0();
  
  std::string msg = "Hello";
  st.invoke_on_all<void (ServiceTwo::*)(const std::string &, ServiceOne &)>(
      &ServiceTwo::say, msg, std::ref(so));
}

这里要在所有 shard 上执行 ServiceTwo::Say 方法, 该方法接受一个 ServiceOne &——这也是一个 sharded 服务; 使用的时候我们传入一个 sharded<ServiceOne>(因为 sharded 不支持拷贝, 所以需要用 std::ref 包装成一个 reference wrapper); Seastar 在调用该方法之前, 对于 sharded 类型的参数会拿出其在当前 shard 的实例作为实际调用参数

但是这还不够灵活, 有时候我们需要传递一些依赖于其所在 shard 的内容, 比如内存分配(Seastar 中每个 shard 都有自己专属的 memory), 为此 Seastar 提供了 sharded_parameter, 它是一个 lambda 以及该 lambda 所需参数的 wrapper, 通过将该 lambda 在对应 shard 上求值达到前面所说的效果, 我们可以这样使用:

struct Service {
  void Say(const std::string &msg, std::unique_ptr<int> &&ptr);
};

{
  ss::sharded<Service> s;
  ...
  std::string msg = "Hello";
  s.invoke_on_all<void (ServiceThree::*)(const std::string &,
                                         std::unique_ptr<int> &&) const>(
       &ServiceThree::Say2, msg,
       ss::sharded_parameter([]() { return std::make_unique<int>(0); }))
      .get0();
}

实现方面则比较简单, 主要靠的是模板偏特化, 在调用 functor 之前, 会用 unwrap_sharded_arg 来解析参数, 通过对 std::reference_wraper<sharded<Service>>sharded_parameter<...> 定制特殊的 unwrap 参数逻辑从而统一这些参数的传递方式:

template <typename T>
inline T&&
unwrap_sharded_arg(T&& arg) {
    return std::forward<T>(arg);
}

template <typename Service>
either_sharded_or_local<Service>
unwrap_sharded_arg(std::reference_wrapper<sharded<Service>> arg) {
    return either_sharded_or_local<Service>(arg);
}

template <typename Func, typename... Param>
auto
unwrap_sharded_arg(sharded_parameter<Func, Param...> sp) {
    return sp.evaluate();
}

async_sharded_service

考虑下面这种情况: Service 发起了一个异步操作并在该操作中引用自己, 直接这样做的话很有可能出问题, 因为我们无法确定该异步操作会在 stop() 前完成; 一般情况是借助 ss:gate 跟踪这些异步操作, 并在 Service 自己的 stop() 方法中等待 gate 关闭, 即异步操作完成, 之后才真正释放资源

ss::sharded 在框架层面也提供了机制以解决该问题, 但是并不是使用 gate, 而是通过让 Service 继承 ss::async_sharded_service(以 CRTP 的方式), 该类型继承自 ss::enable_shared_from_this<Service, 然后在异步操作中通过 shared_from_this() 增加引用计数, ss::sharded::stop() 会确保在所有 shard 上的 Service 实例的引用计数都递减为 0 之后才真正释放资源

下面是一个简单的 🌰:

struct Service : public ss::async_sharded_service<Service> {
  Service() = default;
  ~Service() {
    logger.info("destruct service on shard-{}", ss::this_shard_id());
  }

  void DoSth() {
    using namespace std::chrono_literals;
    (void)ss::sleep(1s).then([_ = shared_from_this(), this]() {
      logger.info("sleep() done on shard-{}", ss::this_shard_id());
      /* use this.. */
    });
  }
};

{
  ss::sharded<Service> s;
  s.start().get0();
  s.invoke_on<void (Service::*)()>(1, &Service::DoSth).get0();
  s.stop().get0(); 
}

DoSth 方法中发起一个异步操作, 并且引用了 Service 自己(capture-by-this), 因为这里用的都是 get0ss:async() 中同步等待异步操作完成, 所以我在 DoSth中发起的是一个后台操作, 那么在 s.stop() 被调用时, 该异步操作还没有完成,不过由于使用了 ss::async_sharded_service, 所以并不用担心空指针的问题

观察日志可以发现, shard-1 上的实例是在异步操作完成后才被释放的:

INFO  2022-12-04 08:24:21,182 [shard  0] sharded_demo_3 - destruct service on shard-0
INFO  2022-12-04 08:24:21,183 [shard  2] sharded_demo_3 - destruct service on shard-2
INFO  2022-12-04 08:24:21,183 [shard  3] sharded_demo_3 - destruct service on shard-3
INFO  2022-12-04 08:24:21,183 [shard  7] sharded_demo_3 - destruct service on shard-7
INFO  2022-12-04 08:24:21,183 [shard  5] sharded_demo_3 - destruct service on shard-5
INFO  2022-12-04 08:24:21,183 [shard  8] sharded_demo_3 - destruct service on shard-8
INFO  2022-12-04 08:24:21,183 [shard  9] sharded_demo_3 - destruct service on shard-9
INFO  2022-12-04 08:24:21,183 [shard 11] sharded_demo_3 - destruct service on shard-11
INFO  2022-12-04 08:24:21,183 [shard  4] sharded_demo_3 - destruct service on shard-4
INFO  2022-12-04 08:24:21,183 [shard  6] sharded_demo_3 - destruct service on shard-6
INFO  2022-12-04 08:24:21,183 [shard 15] sharded_demo_3 - destruct service on shard-15
INFO  2022-12-04 08:24:21,183 [shard 10] sharded_demo_3 - destruct service on shard-10
INFO  2022-12-04 08:24:21,183 [shard 12] sharded_demo_3 - destruct service on shard-12
INFO  2022-12-04 08:24:21,183 [shard 13] sharded_demo_3 - destruct service on shard-13
INFO  2022-12-04 08:24:21,183 [shard 14] sharded_demo_3 - destruct service on shard-14
INFO  2022-12-04 08:24:22,182 [shard  1] sharded_demo_3 - sleep() done on shard-1
INFO  2022-12-04 08:24:22,182 [shard  1] sharded_demo_3 - destruct service on shard-1

实现上其实也很简单, 之前在介绍 shard 上实例的创建的时候提到过, 每个 shard 上的 entry 有两个数据成员:

shared_ptr<Service> service;
promise<> freed;

service 自不必说, freed 就是给 async_sharded_service 用的; 回到 sharded::stop() 方法, 其中有两轮 sharded_parallel_for_each 第一轮是调用所有实例的自己的 stop() 方法, 第二轮是将将 entry::service 置为 nullptr 递减引用计数, 然后等待 freed 这个 promise 被 resolve

如果发现 Service 并没有继承 ss::async_sharded_service<Service>, 那么这个 promise 会在创建 Service 实例的时候被 resolve:

void service_deleted() noexcept {
    _instances[this_shard_id()].freed.set_value();
}

void track_deletion(shared_ptr<Service>&, std::false_type) noexcept {
    // do not wait for instance to be deleted since it is not going to notify us
    service_deleted();
}

void track_deletion(shared_ptr<Service>& s, std::true_type) {
    s->_delete_cb = std::bind(std::mem_fn(&sharded<Service>::service_deleted), this);
}

template <typename... Args>
shared_ptr<Service> create_local_service(Args&&... args) {
    auto s = ::seastar::make_shared<Service>(std::forward<Args>(args)...);
    set_container(*s);
    track_deletion(s, std::is_base_of<async_sharded_service<Service>, Service>());
    return s;
}

否则则会在 ss::async_sharded_service<Service> 的析构函数中被 resolve:

class async_sharded_service : public enable_shared_from_this<T> {
protected:
    std::function<void()> _delete_cb;
    async_sharded_service() noexcept = default;
    virtual ~async_sharded_service() {
        if (_delete_cb) {
            _delete_cb();
        }
    }
};

只有当引用计数递减为 0 的时候, 才会执行 async_sharded_service 的析构函数, 这也保证了在异步操作中引用 Service 实例的安全性, 不过需要注意的就是要手动 shared_from_this

peering_sharded_service

在前面的 ss::sharded 中, 各个 shard 上的 Service 只能感知到自己, 而有时候我们通常需要多个 shard 上的 Service 协同合作——通过参数传递 sharded 是不行的, 它最终会被解释为 shard 上的实例; Seastar 为这种多同一个 sharded 的实例之间相互协作的场景也提供了支持, 即 peering_sharded_service

template <typename Service>
class peering_sharded_service {
    sharded<Service>* _container = nullptr;
private:
    void set_container(sharded<Service>* container) noexcept { _container = container; }
public:
    sharded<Service>& container() noexcept { return *_container; }
    const sharded<Service>& container() const noexcept { return *_container; }
};

async_sharded_service 的使用方法类似, 让 Service 继承它(CRTP, 同样的), 就可以让 Service 使用 container() 方法获取其所属的 sharded, 从而可以用来向其他 shard 上的实例发起函数调用; 比如下面这个 🌰:

ss::future<bool> conn_quota::do_get(ss::net::inet_address addr) {
    auto home_shard = addr_to_shard(addr);
    if (home_shard == ss::this_shard_id()) {
        // Fast path: we are the home shard for this address, can
        // probably get a token locally (unless exhausted)
        return home_get_units(addr);
    } else {
        return container().invoke_on(home_shard, [addr](conn_quota& cq) {
            return cq.home_get_units(addr);
        });
    }
}

这段代码来自于 redpanda(有删减), conn_quota 类用于限制连接数, 分为全局限制和 per-IP 限制; 它并不是一个集中式的服务, 而是分散在各个 shard 上, 每个 shard 都有独立的 quota 计数; 当某个 shard 接收到 ip 发起的连接, 在真正服务之前先要检查这个 ip 是否还有 quota; 这个检查逻辑分为 fast path 和 slow path: fast path 就是在当前 shard 就还有该 ip 的 quota, 此时可以在当前 shard 上得到服务, 否则就要去检查其他 shard(有点 work-stealing 的感觉); 具体做法就是将对端 ip 哈希取模得到它所在的 shard, 如果这个 shard 就是当前 shard, 那么就是 fast path, 否则就是 slow path

slow path 需要向其他 shard 上的 conn_quota 实例发起请求, 所以 conn_quota 继承了 ss::peering_sharded_service

Reference