令牌桶 限速_Go 限流器实战系列(2) Token Bucket 令牌桶
上一篇說到 Leaky Bucket 能限制客戶端的訪問速率, 但是無法應對突發流量, 本質原因就是漏斗桶只是為了保證固定時間內通過的流量是一樣的. 面對這種情況, 本篇文章繼續介紹另外一種限流器: Token Bucket -- 令牌桶
什么是 Token Bucket
漏斗桶的桶空間就那么大, 其只能保證桶里的請求是勻速流出, 并不關心流入的速度, 只要桶溢出了就服務拒絕, 這可能并不符合互聯網行業的使用場景.
試想這樣的場景, 盡管服務器的 QPS 已經達到限速閾值了, 但是并不想將所有的流量都拒之門外, 仍然讓部分流量能夠正常通過限流器. 這樣我們的服務器在面對突發流量時能夠有一定的伸縮空間, 而不是一直處于不可用狀態.
基于上面的場景需求, 令牌桶采用跟漏斗桶截然不同的做法.
令牌桶令牌桶也有自己的固定大小, 我們設置 QPS 為 100, 在初始化令牌桶的時候, 就會立即生成 100 個令牌放到桶里面, 同時還按照一定的速率, 每隔一定的時間產生固定數量的令牌放入到桶中. 如果桶溢出了, 則舍棄生成的令牌.
只要有請求能夠拿到令牌, 就能保證其通過限流器. 當然拿不到令牌的請求只能被無情拒絕了(或者等待令牌產生), 這個請求的命不好~
面對突然爆發的流量, 可能大部分流量都被限流器給擋住了, 但是也有部分流量剛好拿到了剛剛生成的 Token, 就這樣在千軍萬馬中通過了限流器. 相對于漏斗桶來說, 令牌桶更適合現在互聯網行業的需要, 是被廣泛使用的限流算法
如何設置令牌桶的大小和產生令牌的速率?
答: 多進行生產環境的壓測, 根據集群的實際承受能力設置相應桶的大小和產生令牌的速率. 血的經驗告訴我, 周期性的線上壓測是一件很重要的事情(使用local cache的程序, 壓測的時候一定要記得先臨時關閉它)
juju/ratelimit
juju/ratelimit 是大部分項目都在使用的 golang 令牌桶的實現方案. 下面會結合其用法, 源碼剖析令牌桶的實現的方案.
gin 中間件
package?mainimport?(
?"fmt"
?"log"
?"net/http"
?"time"
?"github.com/gin-gonic/gin"
?"github.com/juju/ratelimit"
)
var?limiter?=?ratelimit.NewBucketWithQuantum(time.Second,?10,?10)
func?tokenRateLimiter()?gin.HandlerFunc?{
?fmt.Println("token?create?rate:",?limiter.Rate())
?fmt.Println("available?token?:",?limiter.Available())
?return?func(context?*gin.Context)?{
??if?limiter.TakeAvailable(1)?==?0?{
???log.Printf("available?token?:%d",?limiter.Available())
???context.AbortWithStatusJSON(http.StatusTooManyRequests,?"Too?Many?Request")
??}?else?{
???context.Writer.Header().Set("X-RateLimit-Remaining",?fmt.Sprintf("%d",?limiter.Available()))
???context.Writer.Header().Set("X-RateLimit-Limit",?fmt.Sprintf("%d",?limiter.Capacity()))
???context.Next()
??}
?}
}
func?main()?{
?e?:=?gin.Default()
?e.GET("/test",?tokenRateLimiter(),?func(context?*gin.Context)?{
??context.JSON(200,?true)
?})
?e.Run(":9091")
}
Output:
token?create?rate:?100available?token?:?100
[GIN]?2020/07/03?-?17:34:37?|?200?|?????157.505μs?|???????127.0.0.1?|?GET??????/test
[GIN]?2020/07/03?-?17:34:37?|?200?|?????310.898μs?|???????127.0.0.1?|?GET??????/test
[GIN]?2020/07/03?-?17:34:37?|?200?|???????61.64μs?|???????127.0.0.1?|?GET??????/test
[GIN]?2020/07/03?-?17:34:37?|?200?|???????8.677μs?|???????127.0.0.1?|?GET??????/test
[GIN]?2020/07/03?-?17:34:37?|?200?|???????6.145μs?|???????127.0.0.1?|?GET??????/test
[GIN]?2020/07/03?-?17:34:37?|?200?|??????23.576μs?|???????127.0.0.1?|?GET??????/test
[GIN]?2020/07/03?-?17:34:37?|?200?|???????5.617μs?|???????127.0.0.1?|?GET??????/test
.....
[GIN]?2020/07/03?-?17:35:03?|?429?|????6.193792ms?|???????127.0.0.1?|?GET??????/test
[GIN]?2020/07/03?-?17:35:03?|?200?|???????8.509μs?|???????127.0.0.1?|?GET??????/test?[GIN]?2020/07/03?-?17:35:03?|?429?|??????10.324μs?|???????127.0.0.1?|?GET??????/test
....
有下面特點:
源碼分析
初始化
建議使用初始化函數有下面三種:
- NewBucket(fillInterval time.Duration, capacity int64): 默認的初始化函數, 每一個周期內生成 1 個令牌, 默認 quantum = 1
- NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) : 跟 NewBucket 類似, 每個周期內生成 quantum 個令牌
- NewBucketWithRate(rate float64, capacity int64): 每秒產生 rate 速率的令牌
其實初始化函數還有很多種, 但基本上大同小異, 最后都是調用 NewBucketWithQuantumAndClock.
func?NewBucketWithQuantumAndClock(fillInterval?time.Duration,?capacity,?quantum?int64,?clock?Clock)?*Bucket?{?//?....
?return?&Bucket{
??clock:???????????clock,
??startTime:???????clock.Now(),
??latestTick:??????0,????????????//?上一次產生token的記錄點
??fillInterval:????fillInterval,?//?產生token的間隔
??capacity:????????capacity,?????//?桶的大小
??quantum:?????????quantum,??????//?每秒產生token的速率
??availableTokens:?capacity,?????//?桶內可用的令牌的個數
?}
}
Rate
func?(tb?*Bucket)?Rate()?float64?{?return?1e9?*?float64(tb.quantum)?/?float64(tb.fillInterval)
}
time.Duration 實際的是以 nanosecond 試試呈現的, 就是 1e9, 1e9 / float64(tb.fillInterval) 的結果就是 1/tb.fillInterval 秒.
于是令牌桶產生令牌的速率是: 每秒內產生 float64(tb.quantum) / float64(tb.fillInterval)
TakeAvailable
func?(tb?*Bucket)?currentTick(now?time.Time)?int64?{?//?由于?tb.quantum?是每秒產生的token的數量.?于是計算從bucket初始化的startTime到現在now的時間間隔?t,
?//?t/tb.fillInterval?*?tb.quantum?計算的是從開始到現在應該產生的?token?數量
?return?int64(now.Sub(tb.startTime)?/?tb.fillInterval)
}
func?(tb?*Bucket)?adjustavailableTokens(tick?int64)?{
?if?tb.availableTokens?>=?tb.capacity?{?//?如果令牌的可用數量已經達到桶的容量,?直接返回
??return
?}
?
?//?tick?*?tb.quantum?是從bucket初始化到本次請求應該產生的?token的數量
?//?tb.latestTick?是從bucket初始化到上次請求應該產生的?token的數量
?//?tick?*?tb.quantum?-?tb.latestTick?計算出兩次請求間應該產生的token數量
?//?tb.availableTokens?+=?(tick?-?tb.latestTick)?*?tb.quantum:?桶內剩余的token數量?+?新產生的token數量
?tb.availableTokens?+=?(tick?-?tb.latestTick)?*?tb.quantum
?if?tb.availableTokens?>?tb.capacity?{
??tb.availableTokens?=?tb.capacity?//?如果產生的令牌數量超過了桶的容量,?則桶內剩余的令牌數量等于桶的size
?}
?tb.latestTick?=?tick
?return
}
func?(tb?*Bucket)?takeAvailable(now?time.Time,?count?int64)?int64?{
?if?count?<=?0?{
??return?0
?}
?tb.adjustavailableTokens(tb.currentTick(now))
?if?tb.availableTokens?<=?0?{?//?如果桶內剩余token數量小于等于0,?直接返回0
??return?0
?}
?if?count?>?tb.availableTokens?{
??count?=?tb.availableTokens
?}
?tb.availableTokens?-=?count
?return?count
}
//?如果返回值是0,?代表桶內已經沒有令牌了
func?(tb?*Bucket)?TakeAvailable(count?int64)?int64?{
?tb.mu.Lock()
?defer?tb.mu.Unlock()
?return?tb.takeAvailable(tb.clock.Now(),?count)
}
TakeAvailable 是 Token Bucket的核心函數. 從這個實現我們能看到
Token Bucket的缺陷
令牌桶算法能滿足絕大部分服務器限流的需要, 是被廣泛使用的限流算法, 不過其也有一些缺點:
下一篇我們看alibaba/Sentinel, kratos 的 bbr 算法是如何做到系統自適應限流
參考
[1] 分布式服務限流實戰,已經為你排好坑了 https://www.infoq.cn/article/Qg2tX8fyw5Vt-f3HH673)
[2] 維基百科--Token_bucket https://en.wikipedia.org/wiki/Token_bucket
[3] juju/ratelimit https://github.com/juju/ratelimit
[4] B 站在微服務治理中的探索與實踐 https://www.infoq.cn/article/zRuGHM_SsQ0lk7gWyBgG
[5] 限流器系列(1) -- Leaky Bucket 漏斗桶 https://www.haohongfan.com/post/2020-06-27-leaky-bucket/
推薦閱讀
Go 限流器實戰系列(1) -- Leaky Bucket 漏斗桶
喜歡本文的朋友,歡迎關注“Go語言中文網”:
Go語言中文網啟用微信學習交流群,歡迎加微信:274768166,投稿亦歡迎
總結
以上是生活随笔為你收集整理的令牌桶 限速_Go 限流器实战系列(2) Token Bucket 令牌桶的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mariadb用户群体mysql_MyS
- 下一篇: linux中dhcp如何配置两个子网,l