一致性 Hash 算法原理总结
一致性 Hash 算法是解決分布式緩存等問題的一種算法,本文介紹了一致性 Hash 算法的原理,并給出了一種實現和實際運用的案例;
一致性 Hash 算法背景
考慮這么一種場景:
我們有三臺緩存服務器編號node0、node1、node2,現在有 3000 萬個key,希望可以將這些個 key 均勻的緩存到三臺機器上,你會想到什么方案呢?
我們可能首先想到的方案是:取模算法hash(key)% N,即:對 key 進行 hash 運算后取模,N 是機器的數量;
這樣,對 key 進行 hash 后的結果對 3 取模,得到的結果一定是 0、1 或者 2,正好對應服務器node0、node1、node2,存取數據直接找對應的服務器即可,簡單粗暴,完全可以解決上述的問題;
取模算法雖然使用簡單,但對機器數量取模,在集群擴容和收縮時卻有一定的局限性:因為在生產環境中根據業務量的大小,調整服務器數量是常有的事;
而服務器數量 N 發生變化后hash(key)% N計算的結果也會隨之變化!
比如:一個服務器節點掛了,計算公式從hash(key)% 3變成了hash(key)% 2,結果會發生變化,此時想要訪問一個 key,這個 key 的緩存位置大概率會發生改變,那么之前緩存 key 的數據也會失去作用與意義;
大量緩存在同一時間失效,造成緩存的雪崩,進而導致整個緩存系統的不可用,這基本上是不能接受的;
為了解決優化上述情況,一致性 hash 算法應運而生~
一致性 Hash 算法詳述
算法原理
一致性哈希算法在 1997 年由麻省理工學院提出,是一種特殊的哈希算法,在移除或者添加一個服務器時,能夠盡可能小地改變已存在的服務請求與處理請求服務器之間的映射關系;
一致性哈希解決了簡單哈希算法在分布式哈希表(Distributed Hash Table,DHT)中存在的動態伸縮等問題;
一致性 hash 算法本質上也是一種取模算法;
不過,不同于上邊按服務器數量取模,一致性 hash 是對固定值 2^32 取模;
IPv4 的地址是 4 組 8 位 2 進制數組成,所以用 2^32 可以保證每個 IP 地址會有唯一的映射;
① hash 環
我們可以將這2^32個值抽象成一個圓環 ??,圓環的正上方的點代表 0,順時針排列,以此類推:1、2、3…直到2^32-1,而這個由 2 的 32 次方個點組成的圓環統稱為hash環;
② 服務器映射到 hash 環
在對服務器進行映射時,使用hash(服務器ip)% 2^32,即:
使用服務器 IP 地址進行 hash 計算,用哈希后的結果對2^32取模,結果一定是一個 0 到2^32-1之間的整數;
而這個整數映射在 hash 環上的位置代表了一個服務器,依次將node0、node1、node2三個緩存服務器映射到 hash 環上;
③ 對象 key 映射到服務器
在對對應的 Key 映射到具體的服務器時,需要首先計算 Key 的 Hash 值:hash(key)% 2^32;
注:此處的 Hash 函數可以和之前計算服務器映射至 Hash 環的函數不同,只要保證取值范圍和 Hash 環的范圍相同即可(即:2^32);
將 Key 映射至服務器遵循下面的邏輯:
從緩存對象 key 的位置開始,沿順時針方向遇到的第一個服務器,便是當前對象將要緩存到的服務器;
假設我們有 "semlinker"、"kakuqo"、"lolo"、"fer" 四個對象,分別簡寫為 o1、o2、o3 和 o4;
首先,使用哈希函數計算這個對象的 hash 值,值的范圍是 [0, 2^32-1]:
圖中對象的映射關系如下:
hash(o1) = k1; hash(o2) = k2; hash(o3) = k3; hash(o4) = k4;同時 3 臺緩存服務器,分別為 CS1、CS2 和 CS3:
則可知,各對象和服務器的映射關系如下:
K1?=>?CS1 K4?=>?CS3 K2?=>?CS2 K3?=>?CS1即:
以上便是一致性 Hash 的工作原理;
可以看到,一致性 Hash 就是:將原本單個點的 Hash 映射,轉變為了在一個環上的某個片段上的映射!
下面我們來看幾種服務器擴縮容的場景;
服務器擴縮容場景
① 服務器減少
假設 CS3 服務器出現故障導致服務下線,這時原本存儲于 CS3 服務器的對象 o4,需要被重新分配至 CS2 服務器,其它對象仍存儲在原有的機器上:
此時受影響的數據只有 CS2 和 CS3 服務器之間的部分數據!
② 服務器增加
假如業務量激增,我們需要增加一臺服務器 CS4,經過同樣的 hash 運算,該服務器最終落于 t1 和 t2 服務器之間,具體如下圖所示:
此時,只有 t1 和 t2 服務器之間的部分對象需要重新分配;
在以上示例中只有 o3 對象需要重新分配,即它被重新到 CS4 服務器;
在前面我們已經說過:如果使用簡單的取模方法,當新添加服務器時可能會導致大部分緩存失效,而使用一致性哈希算法后,這種情況得到了較大的改善,因為只有少部分對象需要重新分配!
數據偏斜&服務器性能平衡問題
引出問題
在上面給出的例子中,各個服務器幾乎是平均被均攤到 Hash 環上;
但是在實際場景中很難選取到一個 Hash 函數這么完美的將各個服務器散列到 Hash 環上;
此時,在服務器節點數量太少的情況下,很容易因為節點分布不均勻而造成數據傾斜問題;
如下圖被緩存的對象大部分緩存在node-4服務器上,導致其他節點資源浪費,系統壓力大部分集中在node-4節點上,這樣的集群是非常不健康的:
同時,還有另一個問題:
在上面新增服務器 CS4 時,CS4 只分擔了 CS1 服務器的負載,服務器 CS2 和 CS3 并沒有因為 CS4 服務器的加入而減少負載壓力;如果 CS4 服務器的性能與原有服務器的性能一致甚至可能更高,那么這種結果并不是我們所期望的;
虛擬節點
針對上面的問題,我們可以通過:引入虛擬節點來解決負載不均衡的問題:
即將每臺物理服務器虛擬為一組虛擬服務器,將虛擬服務器放置到哈希環上,如果要確定對象的服務器,需先確定對象的虛擬服務器,再由虛擬服務器確定物理服務器;
如下圖所示:
在圖中:o1 和 o2 表示對象,v1 ~ v6 表示虛擬服務器,s1 ~ s3 表示實際的物理服務器;
虛擬節點的計算
虛擬節點的 hash 計算通常可以采用:對應節點的 IP 地址加數字編號后綴 hash(10.24.23.227#1) 的方式;
舉個例子,node-1 節點 IP 為 10.24.23.227,正常計算node-1的 hash 值:
hash(10.24.23.227#1)% 2^32
假設我們給 node-1 設置三個虛擬節點,node-1#1、node-1#2、node-1#3,對它們進行 hash 后取模:
hash(10.24.23.227#1)% 2^32
hash(10.24.23.227#2)% 2^32
hash(10.24.23.227#3)% 2^32
注意:
分配的虛擬節點個數越多,映射在 hash 環上才會越趨于均勻,節點太少的話很難看出效果;
引入虛擬節點的同時也增加了新的問題,要做虛擬節點和真實節點間的映射,對象key->虛擬節點->實際節點之間的轉換;
使用場景
一致性 hash 在分布式系統中應該是實現負載均衡的首選算法,它的實現比較靈活,既可以在客戶端實現,也可以在中間件上實現,比如日常使用較多的緩存中間件memcached和redis集群都有用到它;
memcached 的集群比較特殊,嚴格來說它只能算是偽集群,因為它的服務器之間不能通信,請求的分發路由完全靠客戶端來的計算出緩存對象應該落在哪個服務器上,而它的路由算法用的就是一致性 hash;
還有 redis 集群中 hash 槽的概念,雖然實現不盡相同,但思想萬變不離其宗,看完本篇的一致性 hash,你再去理解 redis 槽位就輕松多了;
其它的應用場景還有很多:
RPC框架Dubbo用來選擇服務提供者
分布式關系數據庫分庫分表:數據與節點的映射關系
LVS負載均衡調度器
……
一致性 Hash 算法實現
下面我們根據上面的講述,使用 Golang 實現一個一致性 Hash 算法,這個算法具有一些下面的功能特性:
一致性 Hash 核心算法;
支持自定義 Hash 算法;
支持自定義虛擬節點個數;
具體源代碼見:
https://github.com/JasonkayZK/consistent-hashing-demo
下面開始實現吧!
結構體、錯誤以及常量定義
① 結構體定義
首先定義每一臺緩存服務器的數據結構:
core/host.go
type?Host?struct?{//?the?host?id:?ip:portName?string//?the?load?bound?of?the?hostLoadBound?int64 }其中:
Name:緩存服務器的 Ip 地址 + 端口,如:127.0.0.1:8000
LoadBound:緩存服務器當前處理的“請求”緩存數,這個字段在后文含有負載邊界值的一致性 Hash中會用到;
其次,定義一致性 Hash 的結構:
core/algorithm.go
//?Consistent?is?an?implementation?of?consistent-hashing-algorithm type?Consistent?struct?{//?the?number?of?replicasreplicaNum?int//?the?total?loads?of?all?replicastotalLoad?int64//?the?hash?function?for?keyshashFunc?func(key?string)?uint64//?the?map?of?virtual?nodes?to?hostshostMap?map[string]*Host//?the?map?of?hashed?virtual?nodes?to?host?namereplicaHostMap?map[uint64]string//?the?hash?ringsortedHostsHashSet?[]uint64//?the?hash?ring?locksync.RWMutex }其中:
replicaNum:表示每個真實的緩存服務器在 Hash 環中存在的虛擬節點數;
totalLoad:所有物理服務器對應的總緩存“請求”數(這個字段在后文含有負載邊界值的一致性 Hash中會用到);
hashFunc:計算 Hash 環映射以及 Key 映射的散列函數;
hostMap:物理服務器名稱對應的 Host 結構體映射;
replicaHostMap:Hash 環中虛擬節點對應真實緩存服務器名稱的映射;
sortedHostsHashSet:Hash 環;
sync.RWMutex:操作 Hash 環時用到的讀寫鎖;
大概的結構如上所示,下面我們來看一些常量和錯誤的定義;
② 常量和錯誤定義
常量的定義如下:
core/algorithm.go
const?(//?The?format?of?the?host?replica?namehostReplicaFormat?=?`%s%d` )var?(//?the?default?number?of?replicasdefaultReplicaNum?=?10//?the?load?bound?factor//?ref:?https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.htmlloadBoundFactor?=?0.25//?the?default?Hash?function?for?keysdefaultHashFunc?=?func(key?string)?uint64?{out?:=?sha512.Sum512([]byte(key))return?binary.LittleEndian.Uint64(out[:])} )分別表示:
defaultReplicaNum:默認情況下,每個真實的物理服務器在 Hash 環中虛擬節點的個數;
loadBoundFactor:負載邊界因數(這個字段在后文含有負載邊界值的一致性 Hash中會用到);
defaultHashFunc:默認的散列函數,這里用到的是 SHA512 算法,并取的是unsigned int64,這一點和上面介紹的0~2^32-1有所區別!
hostReplicaFormat:虛擬節點名稱格式,這里的虛擬節點的格式為:%s%d,和上文提到的10.24.23.227#1的格式有所區別,但是道理是一樣的!
還有一些錯誤的定義:
core/error.go
var?(ErrHostAlreadyExists?=?errors.New("host?already?exists")ErrHostNotFound?=?errors.New("host?not?found") )分別表示服務器已經注冊,以及緩存服務器未找到;
下面來看具體的方法實現!
注冊/注銷緩存服務器
① 注冊緩存服務器
注冊緩存服務器的代碼如下:
core/algorithm.go
func?(c?*Consistent)?RegisterHost(hostName?string)?error?{c.Lock()defer?c.Unlock()if?_,?ok?:=?c.hostMap[hostName];?ok?{return?ErrHostAlreadyExists}c.hostMap[hostName]?=?&Host{Name:??????hostName,LoadBound:?0,}for?i?:=?0;?i?<?c.replicaNum;?i++?{hashedIdx?:=?c.hashFunc(fmt.Sprintf(hostReplicaFormat,?hostName,?i))c.replicaHostMap[hashedIdx]?=?hostNamec.sortedHostsHashSet?=?append(c.sortedHostsHashSet,?hashedIdx)}//?sort?hashes?in?ascending?ordersort.Slice(c.sortedHostsHashSet,?func(i?int,?j?int)?bool?{if?c.sortedHostsHashSet[i]?<?c.sortedHostsHashSet[j]?{return?true}return?false})return?nil }代碼比較簡單,簡單說一下;
首先,檢查服務器是否已經注冊,如果已經注冊,則直接返回已經注冊的錯誤;
隨后,創建一個 Host 對象,并且在 for 循環中創建多個虛擬節點:
根據 hashFunc 計算服務器散列值【注:此處計算的散列值可能和之前的值存在沖突,本實現中暫不考慮這種場景】;
將散列值加入 replicaHostMap 中;
將散列值加入 sortedHostsHashSet 中;
最后,對 Hash 環進行排序;
這里使用數組作為 Hash 環只是為了便于說明,在實際實現中建議選用其他數據結構進行實現,以獲取更好的性能;
當緩存服務器信息寫入 replicaHostMap 映射以及 Hash 環后,即完成了緩存服務器的注冊;
② 注銷緩存服務器
注銷緩存服務器的代碼如下:
core/algorithm.go
func?(c?*Consistent)?UnregisterHost(hostName?string)?error?{c.Lock()defer?c.Unlock()if?_,?ok?:=?c.hostMap[hostName];?!ok?{return?ErrHostNotFound}delete(c.hostMap,?hostName)for?i?:=?0;?i?<?c.replicaNum;?i++?{hashedIdx?:=?c.hashFunc(fmt.Sprintf(hostReplicaFormat,?hostName,?i))delete(c.replicaHostMap,?hashedIdx)c.delHashIndex(hashedIdx)}return?nil }//?Remove?hashed?host?index?from?the?hash?ring func?(c?*Consistent)?delHashIndex(val?uint64)?{idx?:=?-1l?:=?0r?:=?len(c.sortedHostsHashSet)?-?1for?l?<=?r?{m?:=?(l?+?r)?/?2if?c.sortedHostsHashSet[m]?==?val?{idx?=?mbreak}?else?if?c.sortedHostsHashSet[m]?<?val?{l?=?m?+?1}?else?if?c.sortedHostsHashSet[m]?>?val?{r?=?m?-?1}}if?idx?!=?-1?{c.sortedHostsHashSet?=?append(c.sortedHostsHashSet[:idx],?c.sortedHostsHashSet[idx+1:]...)} }和注冊緩存服務器相反,將服務器在 Map 映射以及 Hash 環中去除即完成了注銷;
這里的邏輯和上面注冊的邏輯極為類似,這里不再贅述!
查詢 Key(核心)
查詢 Key 是整個一致性 Hash 算法的核心,但是實現起來也并不復雜;
代碼如下:
core/algorithm.go
func?(c?*Consistent)?GetKey(key?string)?(string,?error)?{hashedKey?:=?c.hashFunc(key)idx?:=?c.searchKey(hashedKey)return?c.replicaHostMap[c.sortedHostsHashSet[idx]],?nil }func?(c?*Consistent)?searchKey(key?uint64)?int?{idx?:=?sort.Search(len(c.sortedHostsHashSet),?func(i?int)?bool?{return?c.sortedHostsHashSet[i]?>=?key})if?idx?>=?len(c.sortedHostsHashSet)?{//?make?search?as?a?ringidx?=?0}return?idx }代碼首先計算 key 的散列值;
隨后,在 Hash 環上“順時針”尋找可以緩存的第一臺緩存服務器:
idx?:=?sort.Search(len(c.sortedHostsHashSet),?func(i?int)?bool?{return?c.sortedHostsHashSet[i]?>=?key })注意到,如果 key 比當前 Hash 環中最大的虛擬節點的 hash 值還大,則選擇當前 Hash 環 中 hash 值最小的一個節點(即“環形”的邏輯):
if?idx?>=?len(c.sortedHostsHashSet)?{//?make?search?as?a?ringidx?=?0 }searchKey 返回了虛擬節點在 Hash 環數組中的 index;
隨后,我們使用 map 返回 index 對應的緩存服務器的名稱即可;
至此,一致性 Hash 算法基本實現,接下來我們來驗證一下。
一致性 Hash 算法實踐與檢驗
算法驗證前準備
① 緩存服務器準備
在驗證算法之前,我們還需要準備幾臺緩存服務器;
為了簡單起見,這里使用了 HTTP 服務器作為緩存服務器,具體代碼如下所示:
server/main.go
package?mainimport?("flag""fmt""net/http""sync""time" )type?CachedMap?struct?{KvMap?sync.MapLock??sync.RWMutex }var?(cache?=?CachedMap{KvMap:?sync.Map{}}port?=?flag.String("p",?"8080",?"port")regHost?=?"http://localhost:18888"expireTime?=?10 )func?main()?{flag.Parse()stopChan?:=?make(chan?interface{})startServer(*port)<-stopChan }func?startServer(port?string)?{hostName?:=?fmt.Sprintf("localhost:%s",?port)fmt.Printf("start?server:?%s\n",?port)err?:=?registerHost(hostName)if?err?!=?nil?{panic(err)}http.HandleFunc("/",?kvHandle)err?=?http.ListenAndServe(":"+port,?nil)if?err?!=?nil?{err?=?unregisterHost(hostName)if?err?!=?nil?{panic(err)}panic(err)} }func?kvHandle(w?http.ResponseWriter,?r?*http.Request)?{_?=?r.ParseForm()if?_,?ok?:=?cache.KvMap.Load(r.Form["key"][0]);?!ok?{val?:=?fmt.Sprintf("hello:?%s",?r.Form["key"][0])cache.KvMap.Store(r.Form["key"][0],?val)fmt.Printf("cached?key:?{%s:?%s}\n",?r.Form["key"][0],?val)time.AfterFunc(time.Duration(expireTime)*time.Second,?func()?{cache.KvMap.Delete(r.Form["key"][0])fmt.Printf("removed?cached?key?after?3s:?{%s:?%s}\n",?r.Form["key"][0],?val)})}val,?_?:=?cache.KvMap.Load(r.Form["key"][0])_,?err?:=?fmt.Fprintf(w,?val.(string))if?err?!=?nil?{panic(err)} }func?registerHost(host?string)?error?{resp,?err?:=?http.Get(fmt.Sprintf("%s/register?host=%s",?regHost,?host))if?err?!=?nil?{return?err}defer?resp.Body.Close()return?nil }func?unregisterHost(host?string)?error?{resp,?err?:=?http.Get(fmt.Sprintf("%s/unregister?host=%s",?regHost,?host))if?err?!=?nil?{return?err}defer?resp.Body.Close()return?nil }代碼接受由命令行指定的 -p 參數指定服務器端口號;
代碼執行后,會調用 startServer 函數啟動一個 http 服務器;
在 startServer 函數中,首先調用 registerHost 在代理服務器上進行注冊(下文會講),并監聽 / 路徑,具體代碼如下:
func?startServer(port?string)?{hostName?:=?fmt.Sprintf("localhost:%s",?port)fmt.Printf("start?server:?%s\n",?port)err?:=?registerHost(hostName)if?err?!=?nil?{panic(err)}http.HandleFunc("/",?kvHandle)err?=?http.ListenAndServe(":"+port,?nil)if?err?!=?nil?{err?=?unregisterHost(hostName)if?err?!=?nil?{panic(err)}panic(err)} }kvHandle 函數對請求進行處理:
func?kvHandle(w?http.ResponseWriter,?r?*http.Request)?{_?=?r.ParseForm()if?_,?ok?:=?cache.KvMap.Load(r.Form["key"][0]);?!ok?{val?:=?fmt.Sprintf("hello:?%s",?r.Form["key"][0])cache.KvMap.Store(r.Form["key"][0],?val)fmt.Printf("cached?key:?{%s:?%s}\n",?r.Form["key"][0],?val)time.AfterFunc(time.Duration(expireTime)*time.Second,?func()?{cache.KvMap.Delete(r.Form["key"][0])fmt.Printf("removed?cached?key?after?3s:?{%s:?%s}\n",?r.Form["key"][0],?val)})}val,?_?:=?cache.KvMap.Load(r.Form["key"][0])_,?err?:=?fmt.Fprintf(w,?val.(string))if?err?!=?nil?{panic(err)} }首先,解析來自路徑的參數:?key=xxx;
隨后,查詢服務器中的緩存(為了簡單起見,這里使用 sync.Map 來模擬緩存):
如果緩存不存在,則寫入緩存,并通過 time.AfterFunc 設置緩存過期時間(expireTime);
最后,返回緩存;
② 緩存代理服務器準備
有了緩存服務器之后,我們還需要一個代理服務器來選擇具體選擇哪個緩存服務器來請求;
代碼如下:
proxy/proxy.go
package?proxyimport?("fmt""github.com/jasonkayzk/consistent-hashing-demo/core""io/ioutil""net/http""time" )type?Proxy?struct?{consistent?*core.Consistent }//?NewProxy?creates?a?new?Proxy func?NewProxy(consistent?*core.Consistent)?*Proxy?{proxy?:=?&Proxy{consistent:?consistent,}return?proxy }func?(p?*Proxy)?GetKey(key?string)?(string,?error)?{host,?err?:=?p.consistent.GetKey(key)if?err?!=?nil?{return?"",?err}resp,?err?:=?http.Get(fmt.Sprintf("http://%s?key=%s",?host,?key))if?err?!=?nil?{return?"",?err}defer?resp.Body.Close()body,?_?:=?ioutil.ReadAll(resp.Body)fmt.Printf("Response?from?host?%s:?%s\n",?host,?string(body))return?string(body),?nil }func?(p?*Proxy)?RegisterHost(host?string)?error?{err?:=?p.consistent.RegisterHost(host)if?err?!=?nil?{return?err}fmt.Println(fmt.Sprintf("register?host:?%s?success",?host))return?nil }func?(p?*Proxy)?UnregisterHost(host?string)?error?{err?:=?p.consistent.UnregisterHost(host)if?err?!=?nil?{return?err}fmt.Println(fmt.Sprintf("unregister?host:?%s?success",?host))return?nil }代理服務器的邏輯很簡單,就是創建一個一致性 Hash 結構:Consistent,把 Consistent 和請求緩存服務器的邏輯進行了一層封裝;
算法驗證
啟動代理服務器
啟動代理服務器的代碼如下:
package?mainimport?("fmt""github.com/jasonkayzk/consistent-hashing-demo/core""github.com/jasonkayzk/consistent-hashing-demo/proxy""net/http" )var?(port?=?"18888"p?=?proxy.NewProxy(core.NewConsistent(10,?nil)) )func?main()?{stopChan?:=?make(chan?interface{})startServer(port)<-stopChan }func?startServer(port?string)?{http.HandleFunc("/register",?registerHost)http.HandleFunc("/unregister",?unregisterHost)http.HandleFunc("/key",?getKey)fmt.Printf("start?proxy?server:?%s\n",?port)err?:=?http.ListenAndServe(":"+port,?nil)if?err?!=?nil?{panic(err)} }func?registerHost(w?http.ResponseWriter,?r?*http.Request)?{_?=?r.ParseForm()err?:=?p.RegisterHost(r.Form["host"][0])if?err?!=?nil?{w.WriteHeader(http.StatusInternalServerError)_,?_?=?fmt.Fprintf(w,?err.Error())return}_,?_?=?fmt.Fprintf(w,?fmt.Sprintf("register?host:?%s?success",?r.Form["host"][0])) }func?unregisterHost(w?http.ResponseWriter,?r?*http.Request)?{_?=?r.ParseForm()err?:=?p.UnregisterHost(r.Form["host"][0])if?err?!=?nil?{w.WriteHeader(http.StatusInternalServerError)_,?_?=?fmt.Fprintf(w,?err.Error())return}_,?_?=?fmt.Fprintf(w,?fmt.Sprintf("unregister?host:?%s?success",?r.Form["host"][0])) }func?getKey(w?http.ResponseWriter,?r?*http.Request)?{_?=?r.ParseForm()val,?err?:=?p.GetKey(r.Form["key"][0])if?err?!=?nil?{w.WriteHeader(http.StatusInternalServerError)_,?_?=?fmt.Fprintf(w,?err.Error())return}_,?_?=?fmt.Fprintf(w,?fmt.Sprintf("key:?%s,?val:?%s",?r.Form["key"][0],?val)) }和緩存服務器類似,這里采用 HTTP 服務器來模擬;
代理服務器監聽 18888 端口的幾個路由:
/register:注冊緩存服務器;
/unregister:注銷緩存服務器;
/key:查詢緩存 Key;
這里為了簡單起見,使用了這種方式進行服務注冊,實際使用時請使用其他組件進行實現!
接下來啟動緩存服務器:
start?proxy?server:?18888啟動緩存服務器
分別啟動三個緩存服務器:
$?go?run?server/main.go?-p?8080 start?server:?8080$?go?run?server/main.go?-p?8081 start?server:?8081$?go?run?server/main.go?-p?8082 start?server:?8082同時,代理服務器輸出:
register?host:?localhost:8080?success register?host:?localhost:8081?success register?host:?localhost:8082?success可以看到緩存服務器已經成功注冊;
請求代理服務器獲取 Key
可以使用 curl 命令請求代理服務器獲取緩存 key:
$?curl?localhost:18888/key?key=123 key:?123,?val:?hello:?123此時,代理服務器輸出:
Response?from?host?localhost:8080:?hello:?123同時,8000 端口的緩存服務器輸出:
cached?key:?{123:?hello:?123} removed?cached?key?after?10s:?{123:?hello:?123}可以看到,8000 端口的服務器對 key 值進行了緩存,并在 10 秒后清除了緩存;
嘗試多次獲取 Key
嘗試獲取多個 Key:
Response?from?host?localhost:8082:?hello:?45363456 Response?from?host?localhost:8080:?hello:?4 Response?from?host?localhost:8082:?hello:?1 Response?from?host?localhost:8080:?hello:?2 Response?from?host?localhost:8082:?hello:?3 Response?from?host?localhost:8080:?hello:?4 Response?from?host?localhost:8082:?hello:?5 Response?from?host?localhost:8080:?hello:?6 Response?from?host?localhost:8082:?hello:?sdkbnfoerwtnbre Response?from?host?localhost:8082:?hello:?sd45555254tg423i5gvj4v5 Response?from?host?localhost:8081:?hello:?0 Response?from?host?localhost:8082:?hello:?032452345可以看到不同的 key 被散列到了不同的緩存服務器;
接下來我們通過 debug 查看具體的變量來一探究竟;
通過 Debug 查看注冊和 Hash 環
開啟 debug,并注冊單個緩存服務器后,查看 Consistent 中的值:
注冊三個緩存服務器后,查看 Consistent 中的值:
從 debug 中的變量,我們就可以很清楚的看到注冊不同數量的服務器時,一致性 Hash 上服務器的動態變化!
以上就是基本的一致性 Hash 算法的實現了!
但是很多時候,我們的緩存服務器需要同時處理大量的緩存請求,而通過上面的算法,我們總是會去同一臺緩存服務器去獲取緩存數據;
如果很多的熱點數據都落在了同一臺緩存服務器上,則可能會出現性能瓶頸;
Google 在 2017 年提出了:含有負載邊界值的一致性 Hash 算法;
下面我們在基本的一致性 Hash 算法的基礎上,實現含有負載邊界值的一致性 Hash!
含有負載邊界值的一致性 Hash
算法描述
17 年時,Google 提出了含有負載邊界值的一致性 Hash 算法,此算法主要應用于在實現一致性的同時,實現負載的平均性;
此算法最初由 Vimeo 的 Andrew Rodland 在 haproxy 中實現并開源;
參考:
https://ai.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
arvix 論文地址:
https://arxiv.org/abs/1608.01350
這個算法將緩存服務器視為一個含有一定容量的桶(可以簡單理解為 Hash 桶),將客戶端視為球,則平均性目標表示為:所有約等于平均密度(球的數量除以桶的數量):
實際使用時,可以設定一個平均密度的參數 ε,將每個桶的容量設置為平均加載時間的 下上限 (1+ε);
具體的計算過程如下:
首先,計算 key 的 Hash 值;
隨后,沿著 Hash 環順時針尋找第一臺滿足條件(平均容量限制)的服務器;
獲取緩存;
例如下面的圖:
使用哈希函數將 6 個球和 3 個桶分配給 Hash 環 上的隨機位置,假設每個桶的容量設置為 2,按 ID 值的遞增順序分配球;
1 號球順時針移動,進入 C 桶;
2 號球進入 A 桶;
3 號和 4 號球進入 B 桶;
5 號球進入 C 桶;
然后 6 號球順時針移動,首先擊中 B 桶;但是桶 B 的容量為 2,并且已經包含球 3 和 4,所以球 6 繼續移動到達桶 C,但該桶也已滿;最后,球 6 最終進入具有備用插槽的桶 A;
算法實現
在上面基本一致性 Hash 算法實現的基礎上,我們繼續實現含有負載邊界值的一致性 Hash 算法;
在核心算法中添加根據負載情況查詢 Key 的函數,以及增加/釋放負載值的函數;
根據負載情況查詢 Key 的函數:
core/algorithm.go
func?(c?*Consistent)?GetKeyLeast(key?string)?(string,?error)?{c.RLock()defer?c.RUnlock()if?len(c.replicaHostMap)?==?0?{return?"",?ErrHostNotFound}hashedKey?:=?c.hashFunc(key)idx?:=?c.searchKey(hashedKey)?//?Find?the?first?host?that?may?serve?the?keyi?:=?idxfor?{host?:=?c.replicaHostMap[c.sortedHostsHashSet[i]]loadChecked,?err?:=?c.checkLoadCapacity(host)if?err?!=?nil?{return?"",?err}if?loadChecked?{return?host,?nil}i++//?if?idx?goes?to?the?end?of?the?ring,?start?from?the?beginningif?i?>=?len(c.replicaHostMap)?{i?=?0}} }func?(c?*Consistent)?checkLoadCapacity(host?string)?(bool,?error)?{//?a?safety?check?if?someone?performed?c.Done?more?than?neededif?c.totalLoad?<?0?{c.totalLoad?=?0}var?avgLoadPerNode?float64avgLoadPerNode?=?float64((c.totalLoad?+?1)?/?int64(len(c.hostMap)))if?avgLoadPerNode?==?0?{avgLoadPerNode?=?1}avgLoadPerNode?=?math.Ceil(avgLoadPerNode?*?(1?+?loadBoundFactor))candidateHost,?ok?:=?c.hostMap[host]if?!ok?{return?false,?ErrHostNotFound}if?float64(candidateHost.LoadBound)+1?<=?avgLoadPerNode?{return?true,?nil}return?false,?nil }在 GetKeyLeast 函數中,首先根據 searchKey 函數,順時針獲取可能滿足條件的第一個虛擬節點;
隨后調用 checkLoadCapacity 校驗當前緩存服務器的負載數是否滿足條件:
candidateHost.LoadBound+1 <= (c.totalLoad + 1) / len(hosts) * (1 + loadBoundFactor)
如果不滿足條件,則沿著 Hash 環走到下一個虛擬節點,繼續判斷是否滿足條件,直到滿足條件;
這里使用的是無條件的 for 循環,因為一定存在低于 平均負載*(1 + loadBoundFactor) 的虛擬節點!
增加/釋放負載值的函數:
core/algorithm.go
func?(c?*Consistent)?Inc(hostName?string)?{c.Lock()defer?c.Unlock()atomic.AddInt64(&c.hostMap[hostName].LoadBound,?1)atomic.AddInt64(&c.totalLoad,?1) }func?(c?*Consistent)?Done(host?string)?{c.Lock()defer?c.Unlock()if?_,?ok?:=?c.hostMap[host];?!ok?{return}atomic.AddInt64(&c.hostMap[host].LoadBound,?-1)atomic.AddInt64(&c.totalLoad,?-1) }邏輯比較簡單,就是原子的對對應緩存服務器進行負載加減一操作;
算法測試
修改代理服務器代碼
在代理服務器中增加路由:
proxy/proxy.go
func?(p?*Proxy)?GetKeyLeast(key?string)?(string,?error)?{host,?err?:=?p.consistent.GetKeyLeast(key)if?err?!=?nil?{return?"",?err}p.consistent.Inc(host)time.AfterFunc(time.Second*10,?func()?{?//?drop?the?host?after?10?seconds(for?testing)!fmt.Printf("dropping?host:?%s?after?10?second\n",?host)p.consistent.Done(host)})resp,?err?:=?http.Get(fmt.Sprintf("http://%s?key=%s",?host,?key))if?err?!=?nil?{return?"",?err}defer?resp.Body.Close()body,?_?:=?ioutil.ReadAll(resp.Body)fmt.Printf("Response?from?host?%s:?%s\n",?host,?string(body))return?string(body),?nil }注意:這里模擬的是單個 key 請求可能會持續 10s 鐘;
啟動代理服務器時增加路由:
main.go
func?startServer(port?string)?{//?......http.HandleFunc("/key_least",?getKeyLeast)//?...... }func?getKeyLeast(w?http.ResponseWriter,?r?*http.Request)?{_?=?r.ParseForm()val,?err?:=?p.GetKeyLeast(r.Form["key"][0])if?err?!=?nil?{w.WriteHeader(http.StatusInternalServerError)_,?_?=?fmt.Fprintf(w,?err.Error())return}_,?_?=?fmt.Fprintf(w,?fmt.Sprintf("key:?%s,?val:?%s",?r.Form["key"][0],?val)) }測試
啟動代理服務器,并開啟三臺緩存服務器;
通過下面的命令獲取含有負載邊界的 Key:
$?curl?localhost:18888/key_least?key=123 key:?123,?val:?hello:?123多次請求后的結果如下:
``` start?proxy?server:?18888 register?host:?localhost:8080?success register?host:?localhost:8081?success register?host:?localhost:8082?successResponse?from?host?localhost:8080:?hello:?123 Response?from?host?localhost:8080:?hello:?123 Response?from?host?localhost:8082:?hello:?123 Response?from?host?localhost:8082:?hello:?123 Response?from?host?localhost:8081:?hello:?123 Response?from?host?localhost:8080:?hello:?123 Response?from?host?localhost:8082:?hello:?123 Response?from?host?localhost:8081:?hello:?123 Response?from?host?localhost:8080:?hello:?123 Response?from?host?localhost:8082:?hello:?123 Response?from?host?localhost:8081:?hello:?123 Response?from?host?localhost:8080:?hello:?123 Response?from?host?localhost:8082:?hello:?123 Response?from?host?localhost:8081:?hello:?123 Response?from?host?localhost:8080:?hello:?123 Response?from?host?localhost:8080:?hello:?123 Response?from?host?localhost:8082:?hello:?123 Response?from?host?localhost:8080:?hello:?123 Response?from?host?localhost:8082:?hello:?123 Response?from?host?localhost:8082:?hello:?123 ```可以看到,緩存被均攤到了其他服務器(這是由于一個緩存請求會持續 10s 導致的)!
總結
本文拋磚引玉的講解了一致性 Hash 算法的原理,并提供了 Go 的實現;
在此基礎之上,根據 Google 的論文實現了帶有負載邊界的一致性 Hash 算法;
當然上面的代碼在實際生產環境下仍然需要部分改進,如:
服務注冊;
緩存服務器實現;
心跳檢測;
……
大家在實際使用時,可以根據需要,搭配實際的組件!
附錄
源代碼:
https://github.com/JasonkayZK/consistent-hashing-demo
文章參考:
https://segmentfault.com/a/1190000041268497
https://segmentfault.com/a/1190000021199728
https://zhuanlan.zhihu.com/p/98030096
https://zh.wikipedia.org/wiki/%E4%B8%80%E8%87%B4%E5%93%88%E5%B8%8C
https://ai.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
https://pkg.go.dev/crypto/sha512#Sum512
加后臺開發學習交流群, 一起進步!
掃碼加我微信,進群和大佬們零距離離
需要深入交流同學,可以加入極客星球,和幾百個同學一起快速成長:
大廠求職核心原理1v1指導(職位,簡歷,面試,策略等一條龍優化)
技術問題分析解答(有專屬VIP群)
大廠技術路線
后臺開發進階
開源項目學習
直播分享(已經分享了7期,加入星球可以看回放)
技術視野
按需提供經典資料,節約你
實戰技能分享
等,極客星球希望成為最有技術價值星球,盡最大努力為星球的同學提供面試,跳槽,技術成長幫助!詳情查看點擊->極客星球
- END -
看完一鍵三連在看,轉發,點贊
是對文章最大的贊賞,極客重生感謝你
推薦閱讀
定個目標|建立自己的技術知識體系
大廠后臺開發基本功修煉路線和經典資料
個人學習方法分享
你好,這里是極客重生,我是阿榮,大家都叫我榮哥,從華為->外企->到互聯網大廠,目前是大廠資深工程師,多次獲得五星員工,多年職場經驗,技術扎實,專業后端開發和后臺架構設計,熱愛底層技術,豐富的實戰經驗,分享技術的本質原理,希望幫助更多人蛻變重生,拿BAT大廠offer,培養高級工程師能力,成為技術專家,實現高薪夢想,期待你的關注!點擊藍字查看我的成長之路。
校招/社招/簡歷/面試技巧/大廠技術棧分析/后端開發進階/優秀開源項目/直播分享/技術視野/實戰高手等,?極客星球希望成為最有技術價值星球,盡最大努力為星球的同學提供面試,跳槽,技術成長幫助!詳情查看->極客星球
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 求點贊,在看,分享三連
總結
以上是生活随笔為你收集整理的一致性 Hash 算法原理总结的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年,腾讯研发人员增长41%,Go
- 下一篇: 每周进步一点