欢迎访问 生活随笔!

生活随笔

当前位置: 首页 >

Redis学习(二):redis集群之cluster模式下的跨节点的批量操作 I

发布时间:2023/12/18 39 豆豆
生活随笔 收集整理的这篇文章主要介绍了 Redis学习(二):redis集群之cluster模式下的跨节点的批量操作 I 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

说明

通过之前的博文《Redis学习(一):redis集群之哨兵模式下的负载均衡》,对redis哨兵模式下的读负载进行学习研究。在本篇博文中,将对redis cluster模式下的跨节点集合操作进行研究学习。通过本篇博文,我们将了解redis cluster模式的基本原理及在Jedis客户端中如何对redis cluster集群进行批量操作。

正文

Redis Cluster

redis cluster是redis官方推荐的高可用分布式解决方案。它的设计目标主要是:

  • 高性能和线性扩展
  • 一定程度的写操作安全性
  • 可用性

但是它同时也引入了一些缺陷:所有的操作都只能在同一个节点(key的slot相同)进行,只能使用0号数据库而不能使用其他数据库,无法跨节点使用事务等等。

在对redis cluster操作前需要了解下槽点的概念:
redis cluster模式将存储空间在逻辑上分为了16384个槽点,每个节点负责一部分槽点。在对数据进行操作时,需要根据key计算出槽点,进而找到对应的节点进行操作。槽点的算法为:HASH_SLOT = CRC16(KEY) mod 16384

为了保证集群的可用性,官方建议最少使用三个节点,三个从节点,以一主一从的形式。

关于redis集群的搭建可以参考《深入剖析Redis系列(三) - Redis集群模式搭建与原理详解》这篇文章。

关于redis集群更多的详细内容,详见 https://redis.io/topics/cluster-spec


JedisCluster

JedisCluster是Jedis客户端中对cluster模式实现的操作类,通过探究学习其源码来了解Jedis如何对集群进行操作。

通过上图可以看到,JedisCluster继承了BinaryJedisCluster类,实现了JedisClusterCommands, MultiKeyJedisClusterCommands, JedisClusterScriptingCommands接口。

接着再以JedisCluster的构造函数为入口,探究该对象如何创建初始化。

public JedisCluster(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig) {this((Set)nodes, 2000, 5, poolConfig); }public JedisCluster(Set<HostAndPort> jedisClusterNode, int timeout, int maxAttempts, GenericObjectPoolConfig poolConfig) {super(jedisClusterNode, timeout, maxAttempts, poolConfig); }

通过源码可以发现它最终调用了父类的构造方法:

public BinaryJedisCluster(Set<HostAndPort> jedisClusterNode, int timeout, int maxAttempts, GenericObjectPoolConfig poolConfig) {this.connectionHandler = new JedisSlotBasedConnectionHandler(jedisClusterNode, poolConfig, timeout);this.maxAttempts = maxAttempts; }

在BinaryJedisCluster类的构造方法中,创建了JedisClusterConnectionHandler对象,可以看到这里具体创建的是JedisSlotBasedConnectionHandler对象。

通过上图可以看到,JedisSlotBasedConnectionHandler继承了JedisClusterConnectionHandler这个抽象类,该类的构造方法也是调用的父类的构造方法。

在抽象类JedisClusterConnectionHandler的构造函数中,创建了JedisClusterInfoCache对象,并进行了槽点初始化。在该类中有唯一一个属性------JedisClusterInfoCache cache;

public JedisClusterConnectionHandler(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, String clientName) {this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password, clientName);this.initializeSlotsCache(nodes, poolConfig, connectionTimeout, soTimeout, password, clientName); }

至此,我们可以看到与JedisCluster类密切相关的两个类,JedisClusterConnectionHandler和JedisClusterInfoCache。在JedisClusterInfoCache类中有两个关键属性 Map<String, JedisPool> nodes 和 Map<Integer, JedisPool> slots, 通过阅读源码来了解这两个属性的作用。

在以上代码中可以看到,先创建JedisClusterInfoCache对象,再初始化槽点信息。

public JedisClusterInfoCache(GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, String clientName) {this.nodes = new HashMap();this.slots = new HashMap();this.rwl = new ReentrantReadWriteLock();this.r = this.rwl.readLock();this.w = this.rwl.writeLock();this.poolConfig = poolConfig;this.connectionTimeout = connectionTimeout;this.soTimeout = soTimeout;this.password = password;this.clientName = clientName; }

初始化槽点信息调用了initializeSlotsCache方法

private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, String clientName) {Iterator var7 = startNodes.iterator();while(var7.hasNext()) {HostAndPort hostAndPort = (HostAndPort)var7.next();Jedis jedis = null;try {jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout, soTimeout);if (password != null) {jedis.auth(password);}if (clientName != null) {jedis.clientSetname(clientName);}this.cache.discoverClusterNodesAndSlots(jedis);break;} catch (JedisConnectionException var14) {} finally {if (jedis != null) {jedis.close();}}}}

在该方法中,可以看到通过配置的节点信息,循环调用JedisClusterInfoCache对象的discoverClusterNodesAndSlots方法。注意,这里只要初始化成功就立即终止循环。

public void discoverClusterNodesAndSlots(Jedis jedis) {this.w.lock();try {this.reset();List<Object> slots = jedis.clusterSlots();Iterator var3 = slots.iterator();while(true) {List slotInfo;do {if (!var3.hasNext()) {return;}Object slotInfoObj = var3.next();slotInfo = (List)slotInfoObj;} while(slotInfo.size() <= 2);List<Integer> slotNums = this.getAssignedSlotArray(slotInfo);int size = slotInfo.size();for(int i = 2; i < size; ++i) {List<Object> hostInfos = (List)slotInfo.get(i);if (hostInfos.size() > 0) {HostAndPort targetNode = this.generateHostAndPort(hostInfos);this.setupNodeIfNotExist(targetNode);if (i == 2) {this.assignSlotsToNode(slotNums, targetNode);}}}}} finally {this.w.unlock();}}

在该方法中,通过jedis.clusterSlots()获取集群的槽点信息。可以看到该方法返回了一个List<Object>对象,在list中每个元素是单独的slot信息,这也是一个list集合。该集合的基本信息为[long, long, List, List], 第一,二个元素是该节点负责槽点的起始位置,第三个元素是主节点信息,第四个元素为主节点对应的从节点信息。该list的基本信息为[string,int,string],第一个为host信息,第二个为port信息,第三个为唯一id。

在获取有关节点的槽点信息后,调用getAssignedSlotArray(slotinfo)来获取所有的槽点值。

private List<Integer> getAssignedSlotArray(List<Object> slotInfo) {List<Integer> slotNums = new ArrayList();for(int slot = ((Long)slotInfo.get(0)).intValue(); slot <= ((Long)slotInfo.get(1)).intValue(); ++slot) {slotNums.add(slot);}return slotNums; }

再获取主节点的地址信息,调用generateHostAndPort(hostInfo)方法。

private HostAndPort generateHostAndPort(List<Object> hostInfos) {return new HostAndPort(SafeEncoder.encode((byte[])((byte[])hostInfos.get(0))), ((Long)hostInfos.get(1)).intValue()); }

再根据节点地址信息来设置节点对应的JedisPool,即设置Map<String, JedisPool> nodes的值。

public JedisPool setupNodeIfNotExist(HostAndPort node) {this.w.lock();JedisPool nodePool;try {String nodeKey = getNodeKey(node);JedisPool existingPool = (JedisPool)this.nodes.get(nodeKey);if (existingPool == null) {nodePool = new JedisPool(this.poolConfig, node.getHost(), node.getPort(), this.connectionTimeout, this.soTimeout, this.password, 0, this.clientName, false, (SSLSocketFactory)null, (SSLParameters)null, (HostnameVerifier)null);this.nodes.put(nodeKey, nodePool);JedisPool var5 = nodePool;return var5;}nodePool = existingPool;} finally {this.w.unlock();}return nodePool; }

接下来判断若此时节点信息为主节点信息时,则调用assignSlotsToNodes方法,设置每个槽点值对应的连接池,即设置Map<Integer, JedisPool> slots的值。

public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {this.w.lock();try {JedisPool targetPool = this.setupNodeIfNotExist(targetNode);Iterator var4 = targetSlots.iterator();while(var4.hasNext()) {Integer slot = (Integer)var4.next();this.slots.put(slot, targetPool);}} finally {this.w.unlock();}}

至此,JedisCluster对象的初始化完成。这里主要是通过JedisClusterInfoCache对象来保存节点信息及对应槽点信息。


mget操作

在上部分内容中,简单介绍了JedisCluster类的初始化过程。在之前提到redis cluster 只能实现在一个节点的集合操作,即要求所有的key都有相同的slot,这里我们通过源码,了解JedisCluster的有限的批量操作。

public List<String> mget(final String... keys) {return (List)(new JedisClusterCommand<List<String>>(this.connectionHandler, this.maxAttempts) {public List<String> execute(Jedis connection) {return connection.mget(keys);}}).run(keys.length, keys); }

在mget方法中,创建了JedisClusterCommand匿名对象,调用其run()方法来完成操作。这里从run()方法开始探究。

public T run(int keyCount, String... keys) {if (keys != null && keys.length != 0) {int slot = JedisClusterCRC16.getSlot(keys[0]);if (keys.length > 1) {for(int i = 1; i < keyCount; ++i) {int nextSlot = JedisClusterCRC16.getSlot(keys[i]);if (slot != nextSlot) {throw new JedisClusterOperationException("No way to dispatch this command to Redis Cluster because keys have different slots.");}}}return this.runWithRetries(slot, this.maxAttempts, false, (JedisRedirectionException)null);} else {throw new JedisClusterOperationException("No way to dispatch this command to Redis Cluster.");} }

通过该方法可以看到,针对所有的key,都通过JedisClusterCRC16.getSlot(key)方法计算出其对应的槽点值,再循环判断所有的槽点值是否相等,若存在不等则抛出异常: No way to dispatch this command to Redis Cluster because keys have different slots.

校验完成后,调用了runWithRetries方法,具体执行命令,通过该方法名称可以看出,该方法可以失败重试。重试次数为配置时的参数,若没有指定,则JedisCluster默认值为5。

private T runWithRetries(int slot, int attempts, boolean tryRandomNode, JedisRedirectionException redirect) {if (attempts <= 0) {throw new JedisClusterMaxAttemptsException("No more cluster attempts left.");} else {Jedis connection = null;Object var7;try {if (redirect != null) {connection = this.connectionHandler.getConnectionFromNode(redirect.getTargetNode());if (redirect instanceof JedisAskDataException) {connection.asking();}} else if (tryRandomNode) {connection = this.connectionHandler.getConnection();} else {connection = this.connectionHandler.getConnectionFromSlot(slot);}Object var6 = this.execute(connection);return var6;} catch (JedisNoReachableClusterNodeException var13) {throw var13;} catch (JedisConnectionException var14) {this.releaseConnection(connection);connection = null;if (attempts <= 1) {this.connectionHandler.renewSlotCache();}var7 = this.runWithRetries(slot, attempts - 1, tryRandomNode, redirect);return var7;} catch (JedisRedirectionException var15) {if (var15 instanceof JedisMovedDataException) {this.connectionHandler.renewSlotCache(connection);}this.releaseConnection(connection);connection = null;var7 = this.runWithRetries(slot, attempts - 1, false, var15);} finally {this.releaseConnection(connection);}return var7;}}

在方法中,首先通过JedisClusterConnectionHandler的getConnectionFromSlot(slot)方法获取对应槽点的连接jedis对象。

public Jedis getConnectionFromSlot(int slot) {JedisPool connectionPool = this.cache.getSlotPool(slot);if (connectionPool != null) {return connectionPool.getResource();} else {this.renewSlotCache();connectionPool = this.cache.getSlotPool(slot);return connectionPool != null ? connectionPool.getResource() : this.getConnection();} }

在获取槽点对应连接时,是通过JedisClusterInfoCache的getSlotPool(slot)方法。若获取的JedisPool为null,则会进行重新初始化槽点的信息。在重新初始化后若值仍为null,则随机获取一个Jedis对象。

在获取到jedis后,调用execute()方法执行命令,这个方法在创建匿名对象时,该方法被实现。

上面提到,该方法可以失败重试。通过源码得知,在异常为JedisConnectionException或JedisRedirectException时,才进行重试。在重试过程中,也会进行重新初始化槽点信息信息,直到成功执行或重试次数耗尽。

至此,JedisCluster的有限集合操作mget源码分析结束。这里可以看出,JedisCluster只能进行有限的批量操作,必须要求所有key的slot值相等。这样的方式,对我们的使用造成很多不便,虽然官方建议可以通过key的hash_tag来保证slot值一致,实现批量操作。

在下篇博文中,我将会根据阅读源码获取的基本知识来打破这个约束,实现跨节点的批量操作。

总结

以上是生活随笔为你收集整理的Redis学习(二):redis集群之cluster模式下的跨节点的批量操作 I的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。