From 4acd3fb474d3c477b9ecf52096dd3eda119929ce Mon Sep 17 00:00:00 2001 From: feihongmeilian Date: Thu, 6 Jun 2024 01:35:22 +0800 Subject: [PATCH] add AsyncRedisCluster.for_each function --- src/sw/redis++/async_redis_cluster.h | 14 ++++++++++++++ src/sw/redis++/async_shards_pool.cpp | 12 ++++++++++++ src/sw/redis++/async_shards_pool.h | 2 ++ 3 files changed, 28 insertions(+) diff --git a/src/sw/redis++/async_redis_cluster.h b/src/sw/redis++/async_redis_cluster.h index 650d986..0afd1f7 100644 --- a/src/sw/redis++/async_redis_cluster.h +++ b/src/sw/redis++/async_redis_cluster.h @@ -55,6 +55,20 @@ class AsyncRedisCluster { AsyncSubscriber subscriber(const StringView &hash_tag); + template + void for_each(Callback &&cb) { + assert(_pool); + _pool->update(); + + auto pools = _pool->pools(); + for (auto &pool : pools) { + auto connection = std::make_shared(pool); + + auto ar = AsyncRedis(connection); + cb(ar); + } + } + template auto command(const StringView &cmd_name, const StringView &key, Args &&...args) -> typename std::enable_if::type, diff --git a/src/sw/redis++/async_shards_pool.cpp b/src/sw/redis++/async_shards_pool.cpp index 257ed16..b4d4ff7 100644 --- a/src/sw/redis++/async_shards_pool.cpp +++ b/src/sw/redis++/async_shards_pool.cpp @@ -114,6 +114,18 @@ ConnectionOptions AsyncShardsPool::connection_options() { return _connection_options(slot); } +std::vector AsyncShardsPool::pools() { + std::lock_guard lock(_mutex); + + std::vector nodes; + nodes.reserve(_pools.size()); + for (const auto &pool : _pools) { + nodes.push_back(pool.second); + } + + return nodes; +} + ConnectionOptions AsyncShardsPool::_connection_options(Slot slot) { std::lock_guard lock(_mutex); diff --git a/src/sw/redis++/async_shards_pool.h b/src/sw/redis++/async_shards_pool.h index 98c6419..e98eac0 100644 --- a/src/sw/redis++/async_shards_pool.h +++ b/src/sw/redis++/async_shards_pool.h @@ -59,6 +59,8 @@ class AsyncShardsPool { ConnectionOptions connection_options(); + std::vector pools(); + private: struct RedeliverEvent { std::string key;