Redis学习(二):redis集群之cluster模式下的跨节点的批量操作 I
說明
通過之前的博文《Redis學(xué)習(xí)(一):redis集群之哨兵模式下的負(fù)載均衡》,對(duì)redis哨兵模式下的讀負(fù)載進(jìn)行學(xué)習(xí)研究。在本篇博文中,將對(duì)redis cluster模式下的跨節(jié)點(diǎn)集合操作進(jìn)行研究學(xué)習(xí)。通過本篇博文,我們將了解redis cluster模式的基本原理及在Jedis客戶端中如何對(duì)redis cluster集群進(jìn)行批量操作。
正文
Redis Cluster
redis cluster是redis官方推薦的高可用分布式解決方案。它的設(shè)計(jì)目標(biāo)主要是:
- 高性能和線性擴(kuò)展
- 一定程度的寫操作安全性
- 可用性
但是它同時(shí)也引入了一些缺陷:所有的操作都只能在同一個(gè)節(jié)點(diǎn)(key的slot相同)進(jìn)行,只能使用0號(hào)數(shù)據(jù)庫而不能使用其他數(shù)據(jù)庫,無法跨節(jié)點(diǎn)使用事務(wù)等等。
在對(duì)redis cluster操作前需要了解下槽點(diǎn)的概念:
redis cluster模式將存儲(chǔ)空間在邏輯上分為了16384個(gè)槽點(diǎn),每個(gè)節(jié)點(diǎn)負(fù)責(zé)一部分槽點(diǎn)。在對(duì)數(shù)據(jù)進(jìn)行操作時(shí),需要根據(jù)key計(jì)算出槽點(diǎn),進(jìn)而找到對(duì)應(yīng)的節(jié)點(diǎn)進(jìn)行操作。槽點(diǎn)的算法為:HASH_SLOT = CRC16(KEY) mod 16384
為了保證集群的可用性,官方建議最少使用三個(gè)節(jié)點(diǎn),三個(gè)從節(jié)點(diǎn),以一主一從的形式。
關(guān)于redis集群的搭建可以參考《深入剖析Redis系列(三) - Redis集群模式搭建與原理詳解》這篇文章。
關(guān)于redis集群更多的詳細(xì)內(nèi)容,詳見 https://redis.io/topics/cluster-spec
JedisCluster
JedisCluster是Jedis客戶端中對(duì)cluster模式實(shí)現(xiàn)的操作類,通過探究學(xué)習(xí)其源碼來了解Jedis如何對(duì)集群進(jìn)行操作。
通過上圖可以看到,JedisCluster繼承了BinaryJedisCluster類,實(shí)現(xiàn)了JedisClusterCommands, MultiKeyJedisClusterCommands, JedisClusterScriptingCommands接口。
接著再以JedisCluster的構(gòu)造函數(shù)為入口,探究該對(duì)象如何創(chuàng)建初始化。
public JedisCluster(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig) {this((Set)nodes, 2000, 5, poolConfig); }public JedisCluster(Set<HostAndPort> jedisClusterNode, int timeout, int maxAttempts, GenericObjectPoolConfig poolConfig) {super(jedisClusterNode, timeout, maxAttempts, poolConfig); }通過源碼可以發(fā)現(xiàn)它最終調(diào)用了父類的構(gòu)造方法:
public BinaryJedisCluster(Set<HostAndPort> jedisClusterNode, int timeout, int maxAttempts, GenericObjectPoolConfig poolConfig) {this.connectionHandler = new JedisSlotBasedConnectionHandler(jedisClusterNode, poolConfig, timeout);this.maxAttempts = maxAttempts; }在BinaryJedisCluster類的構(gòu)造方法中,創(chuàng)建了JedisClusterConnectionHandler對(duì)象,可以看到這里具體創(chuàng)建的是JedisSlotBasedConnectionHandler對(duì)象。
通過上圖可以看到,JedisSlotBasedConnectionHandler繼承了JedisClusterConnectionHandler這個(gè)抽象類,該類的構(gòu)造方法也是調(diào)用的父類的構(gòu)造方法。
在抽象類JedisClusterConnectionHandler的構(gòu)造函數(shù)中,創(chuàng)建了JedisClusterInfoCache對(duì)象,并進(jìn)行了槽點(diǎn)初始化。在該類中有唯一一個(gè)屬性------JedisClusterInfoCache cache;
public JedisClusterConnectionHandler(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, String clientName) {this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password, clientName);this.initializeSlotsCache(nodes, poolConfig, connectionTimeout, soTimeout, password, clientName); }至此,我們可以看到與JedisCluster類密切相關(guān)的兩個(gè)類,JedisClusterConnectionHandler和JedisClusterInfoCache。在JedisClusterInfoCache類中有兩個(gè)關(guān)鍵屬性 Map<String, JedisPool> nodes 和 Map<Integer, JedisPool> slots, 通過閱讀源碼來了解這兩個(gè)屬性的作用。
在以上代碼中可以看到,先創(chuàng)建JedisClusterInfoCache對(duì)象,再初始化槽點(diǎn)信息。
public JedisClusterInfoCache(GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, String clientName) {this.nodes = new HashMap();this.slots = new HashMap();this.rwl = new ReentrantReadWriteLock();this.r = this.rwl.readLock();this.w = this.rwl.writeLock();this.poolConfig = poolConfig;this.connectionTimeout = connectionTimeout;this.soTimeout = soTimeout;this.password = password;this.clientName = clientName; }初始化槽點(diǎn)信息調(diào)用了initializeSlotsCache方法
private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, String clientName) {Iterator var7 = startNodes.iterator();while(var7.hasNext()) {HostAndPort hostAndPort = (HostAndPort)var7.next();Jedis jedis = null;try {jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout, soTimeout);if (password != null) {jedis.auth(password);}if (clientName != null) {jedis.clientSetname(clientName);}this.cache.discoverClusterNodesAndSlots(jedis);break;} catch (JedisConnectionException var14) {} finally {if (jedis != null) {jedis.close();}}}}在該方法中,可以看到通過配置的節(jié)點(diǎn)信息,循環(huán)調(diào)用JedisClusterInfoCache對(duì)象的discoverClusterNodesAndSlots方法。注意,這里只要初始化成功就立即終止循環(huán)。
public void discoverClusterNodesAndSlots(Jedis jedis) {this.w.lock();try {this.reset();List<Object> slots = jedis.clusterSlots();Iterator var3 = slots.iterator();while(true) {List slotInfo;do {if (!var3.hasNext()) {return;}Object slotInfoObj = var3.next();slotInfo = (List)slotInfoObj;} while(slotInfo.size() <= 2);List<Integer> slotNums = this.getAssignedSlotArray(slotInfo);int size = slotInfo.size();for(int i = 2; i < size; ++i) {List<Object> hostInfos = (List)slotInfo.get(i);if (hostInfos.size() > 0) {HostAndPort targetNode = this.generateHostAndPort(hostInfos);this.setupNodeIfNotExist(targetNode);if (i == 2) {this.assignSlotsToNode(slotNums, targetNode);}}}}} finally {this.w.unlock();}}在該方法中,通過jedis.clusterSlots()獲取集群的槽點(diǎn)信息??梢钥吹皆摲椒ǚ祷亓艘粋€(gè)List<Object>對(duì)象,在list中每個(gè)元素是單獨(dú)的slot信息,這也是一個(gè)list集合。該集合的基本信息為[long, long, List, List], 第一,二個(gè)元素是該節(jié)點(diǎn)負(fù)責(zé)槽點(diǎn)的起始位置,第三個(gè)元素是主節(jié)點(diǎn)信息,第四個(gè)元素為主節(jié)點(diǎn)對(duì)應(yīng)的從節(jié)點(diǎn)信息。該list的基本信息為[string,int,string],第一個(gè)為host信息,第二個(gè)為port信息,第三個(gè)為唯一id。
在獲取有關(guān)節(jié)點(diǎn)的槽點(diǎn)信息后,調(diào)用getAssignedSlotArray(slotinfo)來獲取所有的槽點(diǎn)值。
private List<Integer> getAssignedSlotArray(List<Object> slotInfo) {List<Integer> slotNums = new ArrayList();for(int slot = ((Long)slotInfo.get(0)).intValue(); slot <= ((Long)slotInfo.get(1)).intValue(); ++slot) {slotNums.add(slot);}return slotNums; }再獲取主節(jié)點(diǎn)的地址信息,調(diào)用generateHostAndPort(hostInfo)方法。
private HostAndPort generateHostAndPort(List<Object> hostInfos) {return new HostAndPort(SafeEncoder.encode((byte[])((byte[])hostInfos.get(0))), ((Long)hostInfos.get(1)).intValue()); }再根據(jù)節(jié)點(diǎn)地址信息來設(shè)置節(jié)點(diǎn)對(duì)應(yīng)的JedisPool,即設(shè)置Map<String, JedisPool> nodes的值。
public JedisPool setupNodeIfNotExist(HostAndPort node) {this.w.lock();JedisPool nodePool;try {String nodeKey = getNodeKey(node);JedisPool existingPool = (JedisPool)this.nodes.get(nodeKey);if (existingPool == null) {nodePool = new JedisPool(this.poolConfig, node.getHost(), node.getPort(), this.connectionTimeout, this.soTimeout, this.password, 0, this.clientName, false, (SSLSocketFactory)null, (SSLParameters)null, (HostnameVerifier)null);this.nodes.put(nodeKey, nodePool);JedisPool var5 = nodePool;return var5;}nodePool = existingPool;} finally {this.w.unlock();}return nodePool; }接下來判斷若此時(shí)節(jié)點(diǎn)信息為主節(jié)點(diǎn)信息時(shí),則調(diào)用assignSlotsToNodes方法,設(shè)置每個(gè)槽點(diǎn)值對(duì)應(yīng)的連接池,即設(shè)置Map<Integer, JedisPool> slots的值。
public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {this.w.lock();try {JedisPool targetPool = this.setupNodeIfNotExist(targetNode);Iterator var4 = targetSlots.iterator();while(var4.hasNext()) {Integer slot = (Integer)var4.next();this.slots.put(slot, targetPool);}} finally {this.w.unlock();}}至此,JedisCluster對(duì)象的初始化完成。這里主要是通過JedisClusterInfoCache對(duì)象來保存節(jié)點(diǎn)信息及對(duì)應(yīng)槽點(diǎn)信息。
mget操作
在上部分內(nèi)容中,簡單介紹了JedisCluster類的初始化過程。在之前提到redis cluster 只能實(shí)現(xiàn)在一個(gè)節(jié)點(diǎn)的集合操作,即要求所有的key都有相同的slot,這里我們通過源碼,了解JedisCluster的有限的批量操作。
public List<String> mget(final String... keys) {return (List)(new JedisClusterCommand<List<String>>(this.connectionHandler, this.maxAttempts) {public List<String> execute(Jedis connection) {return connection.mget(keys);}}).run(keys.length, keys); }在mget方法中,創(chuàng)建了JedisClusterCommand匿名對(duì)象,調(diào)用其run()方法來完成操作。這里從run()方法開始探究。
public T run(int keyCount, String... keys) {if (keys != null && keys.length != 0) {int slot = JedisClusterCRC16.getSlot(keys[0]);if (keys.length > 1) {for(int i = 1; i < keyCount; ++i) {int nextSlot = JedisClusterCRC16.getSlot(keys[i]);if (slot != nextSlot) {throw new JedisClusterOperationException("No way to dispatch this command to Redis Cluster because keys have different slots.");}}}return this.runWithRetries(slot, this.maxAttempts, false, (JedisRedirectionException)null);} else {throw new JedisClusterOperationException("No way to dispatch this command to Redis Cluster.");} }通過該方法可以看到,針對(duì)所有的key,都通過JedisClusterCRC16.getSlot(key)方法計(jì)算出其對(duì)應(yīng)的槽點(diǎn)值,再循環(huán)判斷所有的槽點(diǎn)值是否相等,若存在不等則拋出異常: No way to dispatch this command to Redis Cluster because keys have different slots.
校驗(yàn)完成后,調(diào)用了runWithRetries方法,具體執(zhí)行命令,通過該方法名稱可以看出,該方法可以失敗重試。重試次數(shù)為配置時(shí)的參數(shù),若沒有指定,則JedisCluster默認(rèn)值為5。
private T runWithRetries(int slot, int attempts, boolean tryRandomNode, JedisRedirectionException redirect) {if (attempts <= 0) {throw new JedisClusterMaxAttemptsException("No more cluster attempts left.");} else {Jedis connection = null;Object var7;try {if (redirect != null) {connection = this.connectionHandler.getConnectionFromNode(redirect.getTargetNode());if (redirect instanceof JedisAskDataException) {connection.asking();}} else if (tryRandomNode) {connection = this.connectionHandler.getConnection();} else {connection = this.connectionHandler.getConnectionFromSlot(slot);}Object var6 = this.execute(connection);return var6;} catch (JedisNoReachableClusterNodeException var13) {throw var13;} catch (JedisConnectionException var14) {this.releaseConnection(connection);connection = null;if (attempts <= 1) {this.connectionHandler.renewSlotCache();}var7 = this.runWithRetries(slot, attempts - 1, tryRandomNode, redirect);return var7;} catch (JedisRedirectionException var15) {if (var15 instanceof JedisMovedDataException) {this.connectionHandler.renewSlotCache(connection);}this.releaseConnection(connection);connection = null;var7 = this.runWithRetries(slot, attempts - 1, false, var15);} finally {this.releaseConnection(connection);}return var7;}}在方法中,首先通過JedisClusterConnectionHandler的getConnectionFromSlot(slot)方法獲取對(duì)應(yīng)槽點(diǎn)的連接jedis對(duì)象。
public Jedis getConnectionFromSlot(int slot) {JedisPool connectionPool = this.cache.getSlotPool(slot);if (connectionPool != null) {return connectionPool.getResource();} else {this.renewSlotCache();connectionPool = this.cache.getSlotPool(slot);return connectionPool != null ? connectionPool.getResource() : this.getConnection();} }在獲取槽點(diǎn)對(duì)應(yīng)連接時(shí),是通過JedisClusterInfoCache的getSlotPool(slot)方法。若獲取的JedisPool為null,則會(huì)進(jìn)行重新初始化槽點(diǎn)的信息。在重新初始化后若值仍為null,則隨機(jī)獲取一個(gè)Jedis對(duì)象。
在獲取到j(luò)edis后,調(diào)用execute()方法執(zhí)行命令,這個(gè)方法在創(chuàng)建匿名對(duì)象時(shí),該方法被實(shí)現(xiàn)。
上面提到,該方法可以失敗重試。通過源碼得知,在異常為JedisConnectionException或JedisRedirectException時(shí),才進(jìn)行重試。在重試過程中,也會(huì)進(jìn)行重新初始化槽點(diǎn)信息信息,直到成功執(zhí)行或重試次數(shù)耗盡。
至此,JedisCluster的有限集合操作mget源碼分析結(jié)束。這里可以看出,JedisCluster只能進(jìn)行有限的批量操作,必須要求所有key的slot值相等。這樣的方式,對(duì)我們的使用造成很多不便,雖然官方建議可以通過key的hash_tag來保證slot值一致,實(shí)現(xiàn)批量操作。
在下篇博文中,我將會(huì)根據(jù)閱讀源碼獲取的基本知識(shí)來打破這個(gè)約束,實(shí)現(xiàn)跨節(jié)點(diǎn)的批量操作。
總結(jié)
以上是生活随笔為你收集整理的Redis学习(二):redis集群之cluster模式下的跨节点的批量操作 I的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Citrix ADC中SNIP的三种配置
- 下一篇: mysql错误代码1064_如何解决my