当我们要操作一批key时,可以通过 redis pipline 再执行完后一次性读取所有结果来较少网络传输的消耗; 很明显,这有个限制条件 => 这批key的执行必须在同一个连接上
当部署的redis为 standalone 或 master-slave 结构的时候还好,可以从 pool 取出来的连接都是一个 master 节点的, 那要是 redis cluster 的时候怎么办? 这批key 可能在同一个 redis node 也可能分散在多个 redis nodes 这样就是多个连接了
redis cluster 虽然自动对 key 进行了分片,但是它对 client 的要求比较高,需要客户端连接所有 cluster 内的节点(这个和 db client方案类似)并缓存 slots分配信息,然后在客户端采用同样的算法进行hash后定位 key 的 slot 进而定位 slot 所属的 redis 节点,然后获取对应节点的连接发送命令
cluster pipeline 实现思路
java 常用的客户端 jedis,虽然提供了 redis-cluster 功能,但是并没有提供 cluster 下的 pipeline 能力,我们借助它封装好的 JedisClusterCRC16 工具去计算 slot 定位对应 redis node 的连接,按照 redis node 将这批 key 进行分组 ,那么每组 key 就能分别进行 pipeline 逻辑了
伪代码
static List<Integer, HostAndPort> slot2NodeMap; // 可以通过主动调用Jedis.clusterNodes获取slot映射关系,并缓存在本地
List<Object> clusterPipeline(List keys) {
Map<HostAndPort, List<String>> node2Keys= new HashMap<>(); // 节点对应keys分组
for(String key : keys) {
// 计算key对应的slot
int slot = JedisClusterCRC16.getSlot(key);
// 根据slot获取对应的节点信息,将同一节点的key收在一组
node2Keys.get(slot2NodeMap.get(slot)).add(key);
}
List<Object> results = new ArrayList();
// 分组执行
for (Map.Entry<HostAndPort, List<String>> group : node2Keys) {
Jedis jedis = JedisClusterConnectionHandler.getConnectionFromNode(group.key);
PipeLine pipeline = jedis.pipelined();
// 执行本组keys
result.addAll(jedis.syncAndReturnAll());
}
return results;
}
注意:在 cluster 上执行 pipeline 可能会由于 redis 节点扩缩容 中途 redirection 切换连接导致结果丢失; 可以把 attempts 重试次数设为0 不允许自动切换连接 以感知到异常,然后业务主动进行重试
jedis 官方支持?
github 上其实2017年就有人提交了 cluster pipeline 的pr,维护人员也很乐意 merge 但是~~ 后续跟进比较慢,然后19年 merge review的时候有些异常,提交人也没再跟进,导致一直没有合并成功;
https://github.com/redis/jedis/pull/1455
实现 cluster pipeline 也可以参考这个pr 的提交代码