日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RocketMQ Consumer 负载均衡算法源码学习 -- AllocateMessageQueueConsistentHash

發布時間:2025/3/21 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RocketMQ Consumer 负载均衡算法源码学习 -- AllocateMessageQueueConsistentHash 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

RocketMQ 提供了一致性hash 算法來做Consumer 和 MessageQueue的負載均衡。 源碼中一致性hash 環的實現是很優秀的,我們一步一步分析。

一個Hash環包含多個節點, 我們用 MyNode 去封裝節點, 方法 getKey() 封裝獲取節點的key。我們可以實現MyNode 去描述一個物理節點或虛擬節點。MyVirtualNode 實現 MyNode, 表示一個虛擬節點。這里注意:一個虛擬節點是依賴于一個物理節點,所以MyVirtualNode 中封裝了 一個 泛型 T physicalNode。物理節點MyClientNode也是實現了這個MyNode接口,很好的設計。代碼加注釋如下:

?/*** 表示hash環的一個節點*/public interface MyNode {/*** @return 節點的key*/String getKey();}/*** 虛擬節點*/ public class MyVirtualNode<T extends MyNode> implements MyNode {final T physicalNode; ?// 主節點final int replicaIndex; ?// 虛節點下標public MyVirtualNode(T physicalNode, int replicaIndex) {this.physicalNode = physicalNode;this.replicaIndex = replicaIndex;}@Overridepublic String getKey() {return physicalNode.getKey() + "-" + replicaIndex;}/*** thisMyVirtualNode 是否是pNode 的 虛節點*/public boolean isVirtualNodeOf(T pNode) {return physicalNode.getKey().equals(pNode.getKey());}public T getPhysicalNode() {return physicalNode;}}private static class MyClientNode implements MyNode {private final String clientID;public MyClientNode(String clientID) {this.clientID = clientID;}@Overridepublic String getKey() {return clientID;} }

上面實現了節點, 一致性hash 下一個問題是怎么封裝hash算法呢?RocketMQ 使用 MyHashFunction 接口定義hash算法。使用MD5 + bit 位hash的方式實現hash算法。我們完全可以自己實現hash算法,具體見我的“常見的一些hash函數”文章。MyMD5Hash 算法代碼的如下:

?// MD5 hash 算法, 這里hash算法可以用常用的 hash 算法替換。private static class MyMD5Hash implements MyHashFunction {MessageDigest instance;public MyMD5Hash() {try {instance = MessageDigest.getInstance("MD5");} catch (NoSuchAlgorithmException e) {}}@Overridepublic long hash(String key) {instance.reset();instance.update(key.getBytes());byte[] digest = instance.digest();long h = 0;for (int i = 0; i < 4; i++) {h <<= 8;h |= ((int)digest[i]) & 0xFF;}return h;}}

現在,hash環的節點有了, hash算法也有了,最重要的是描述一個一致性hash 環。 想一想,這個環可以由N 個物理節點, 每個物理節點對應m個虛擬節點,節點位置用hash算法值描述。每個物理節點就是每個Consumer, 每個Consumer 的 id 就是 物理節點的key。 每個MessageQueue 的toString() 值 hash 后,用來找環上對應的最近的下一個物理節點。源碼如下,這里展示主要的代碼,其中最巧妙地是routeNode 方法, addNode 方法 注意我的注釋:

public class MyConsistentHashRouter<T extends MyNode> {private final SortedMap<Long, MyVirtualNode<T>> ring = new TreeMap<>(); // key是虛節點key的哈希值, value 是虛節點 private final MyHashFunction myHashFunction; /*** @param pNodes 物理節點集合* @param vNodeCount 每個物理節點對應的虛節點數量* @param hashFunction hash 函數 用于 hash 各個節點*/ public MyConsistentHashRouter(Collection<T> pNodes, int vNodeCount, MyHashFunction hashFunction) {if (hashFunction == null) {throw new NullPointerException("Hash Function is null");}this.myHashFunction = hashFunction;if (pNodes != null) {for (T pNode : pNodes) {this.addNode(pNode, vNodeCount);}} } /*** 添加物理節點和它的虛節點到hash環。* @param pNode 物理節點* @param vNodeCount 虛節點數量。*/ public void addNode(T pNode, int vNodeCount) {if (vNodeCount < 0) {throw new IllegalArgumentException("ill virtual node counts :" + vNodeCount);}int existingReplicas = this.getExistingReplicas(pNode);for (int i = 0; i < vNodeCount; i++) {MyVirtualNode<T> vNode = new MyVirtualNode<T>(pNode, i + existingReplicas); // 創建一個新的虛節點,位置是 i+existingReplicasring.put(this.myHashFunction.hash(vNode.getKey()), vNode); // 將新的虛節點放到hash環中} } /*** 根據一個給定的key 在 hash環中 找到離這個key最近的下一個物理節點* @param key 一個key, 用于找這個key 在環上最近的節點*/ public T routeNode(String key) {if (ring.isEmpty()) {return null;}Long hashVal = this.myHashFunction.hash(key);SortedMap<Long, MyVirtualNode<T>> tailMap = ring.tailMap(hashVal);Long nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey();return ring.get(nodeHashVal).getPhysicalNode(); }/*** @param pNode 物理節點* @return 當前這個物理節點對應的虛節點的個數*/ public int getExistingReplicas(T pNode) {int replicas = 0;for (MyVirtualNode<T> vNode : ring.values()) {if (vNode.isVirtualNodeOf(pNode)) {replicas++;}}return replicas; }

現在一致性hash 環有了, 剩下的就是 和rocketmq 的 consumer, mq 構成負載均衡策略了。比較簡單, 代碼如下:

??? ??? ??? ?/*** 基于一致性性hash環的Consumer負載均衡.*/?? ??public class MyAllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy {// 每個物理節點對應的虛節點的個數private final int virtualNodeCnt;private final MyHashFunction customHashFunction;public MyAllocateMessageQueueConsistentHash() {this(10); ? // 默認10個虛擬節點}public MyAllocateMessageQueueConsistentHash(int virtualNodeCnt) {this(virtualNodeCnt, null);}public MyAllocateMessageQueueConsistentHash(int virtualNodeCnt, MyHashFunction customHashFunction) {if (virtualNodeCnt < 0) {throw new IllegalArgumentException("illegal virtualNodeCnt : " + virtualNodeCnt);}this.virtualNodeCnt = virtualNodeCnt;this.customHashFunction = customHashFunction;}@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {// 省去一系列非空校驗Collection<MyClientNode> cidNodes = new ArrayList<>();for (String cid : cidAll) {cidNodes.add(new MyClientNode(cid));}final MyConsistentHashRouter<MyClientNode> router;if (this.customHashFunction != null) {router = new MyConsistentHashRouter<MyClientNode>(cidNodes, virtualNodeCnt, customHashFunction);}else {router = new MyConsistentHashRouter<MyClientNode>(cidNodes, virtualNodeCnt);}List<MessageQueue> results = new ArrayList<MessageQueue>(); ?// 當前 currentCID 對應的 mq// 將每個mq 根據一致性hash 算法找到對應的物理節點(Consumer)for (MessageQueue mq : mqAll) {MyClientNode clientNode = router.routeNode(mq.toString()); ? // 根據 mq toString() 方法做hash 和環上節點比較if (clientNode != null && currentCID.equals(clientNode.getKey())) {results.add(mq);}}return results;}@Overridepublic String getName() {return "CONSISTENT_HASH";}private static class MyClientNode implements MyNode {private final String clientID;public MyClientNode(String clientID) {this.clientID = clientID;}@Overridepublic String getKey() {return clientID;}}}


————————————————
版權聲明:本文為CSDN博主「昊haohao」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/ZHANGYONGHAO604/article/details/82426373

總結

以上是生活随笔為你收集整理的RocketMQ Consumer 负载均衡算法源码学习 -- AllocateMessageQueueConsistentHash的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。