不止八股:阿里内部语雀一些有趣的并发编程笔试题2——手写限流器
系列文章目錄和關(guān)于我
0丶引入
筆者社招一年半經(jīng)驗(yàn)跳槽加入阿里約1年時(shí)間,無意間發(fā)現(xiàn)一些阿里語雀上的一些面試題題庫,出于學(xué)習(xí)目的在此進(jìn)行記錄。
- 這一篇主要寫一些有趣的筆試題(非leetcode),這些有的考驗(yàn)并發(fā)編程,有的考驗(yàn)設(shè)計(jì)能力。
- 筆者不是什么技術(shù)大牛,此處筆試題充滿主觀思考,并不一定是滿分答案,歡迎評(píng)論區(qū)一起探討。
- 不止八股:面試題之外,筆者會(huì)更多的思考下底層原理,不只是簡單的背誦。
下面這個(gè)題目也是筆者面試阿里筆試做過的一道筆試題,現(xiàn)在回想自己那時(shí)候?qū)懙囊彩且慧?/p>
1.題目-限流組件設(shè)計(jì)
網(wǎng)站或者API服務(wù)有可能被惡意訪問導(dǎo)致不可用,為了防止流量過大,通常會(huì)有限流設(shè)計(jì)。
請(qǐng)實(shí)現(xiàn)一個(gè) RateLimiter 類,包含 isAllow 方法。每個(gè)請(qǐng)求包含一個(gè) resource 資源,如果resource 在 1 秒鐘內(nèi)有超過 N 次請(qǐng)求,就拒絕響應(yīng)。
public interface IRateLimiter{
boolean isAllow(String resource);
}
2.筆者的題解
筆者在面試的時(shí)候,其實(shí)沒看過,沒使用過sentinel(《Sentinel基本使用與源碼分析》),也沒看過Guava的RateLimiter,筆者上一份工作是一個(gè)銀行內(nèi)部的工具,用戶就是100-的銀行經(jīng)歷,限流是不可能限流。
因此第一反應(yīng)是使用一個(gè)變量記錄當(dāng)前是第幾秒,另外一個(gè)變量記錄當(dāng)前當(dāng)前通過了多少請(qǐng)求,這種算法也叫計(jì)數(shù)器算法。所以這里引入了兩個(gè)問題:
- 需要根據(jù)resource映射到一個(gè)對(duì)象,對(duì)象具備兩個(gè)字段——記錄第幾秒和通過請(qǐng)求數(shù)
- 如何保證 【兩個(gè)字段——記錄第幾秒和通過請(qǐng)求數(shù)】更新的線程安全
2.1 使用鎖實(shí)現(xiàn)計(jì)數(shù)器算法
2.1.1 回家等通知的寫法
如上,我們抽象出CountFlowChecker負(fù)責(zé)這一個(gè)資源的限流控制,checkerMap中key是資源名稱,value是CountFlowChecker。然后使用synchronized修飾checkerMap來實(shí)現(xiàn)checkerMap初始化的線程安全。這一段代碼有哪些問題?
- checkerMap讀是沒有競爭的,不需要加鎖
- 鎖的粒度太大了——鎖定整個(gè)checkerMap,讓所有調(diào)用CountRateLimiter1#isAllow都是串行的!
2.1.2 解決[checkerMap讀是沒有競爭的,不需要加鎖]的問題
如上讀不加鎖,只有發(fā)現(xiàn)沒有初始化,需要寫的是才進(jìn)入綠色部分代碼進(jìn)行初始化,但是綠色部分部分存在bug
如圖紅色,藍(lán)色代表兩個(gè)并發(fā)的請(qǐng)求,二者訪問的時(shí)候CountFlowChecker都沒有初始化,so都來到綠色部分,假設(shè)紅色請(qǐng)求先拿到鎖并成功初始化了CountFlowChecker然后釋放了鎖,這時(shí)候藍(lán)色請(qǐng)求處理線程被喚醒,將覆蓋紅色請(qǐng)求處理線程寫入的CountFlowChecker。如何解決昵?
其實(shí)和單例模式中的雙重if異曲同工之妙,這里獲取到鎖后再次讀取,只有為null才進(jìn)行初始化,解決了上面紅藍(lán)線程覆蓋的情況!這種寫法在Spring的Bean初始化中也有使用。
但是即使你這樣寫了,也是要回家等通知的!因?yàn)?鎖的粒度太大了——鎖定整個(gè)checkerMap,讓所有調(diào)用CountRateLimiter1#isAllow都是串行的!
2.1.3 減小鎖粒度
如上圖,鎖整個(gè)checkerMap相當(dāng)于把checkerMap把checkerMap中每一個(gè)數(shù)組都鎖了,但是不同的數(shù)組槽之間是沒有線程安全問題的,比如數(shù)組下標(biāo)0對(duì)應(yīng)了資源A,數(shù)組下標(biāo)2對(duì)應(yīng)了資源B,A和B是可以并行做數(shù)據(jù)變更操作的!
這其實(shí)就對(duì)應(yīng)了ConcurrentHashMap中減小鎖粒度思想!因此可以這樣優(yōu)化:
這里我們使用了ConcurrentHashMap#computeIfAbsent,保證了假設(shè)resouce對(duì)應(yīng)數(shù)組槽沒有元素的時(shí)候,會(huì)串行的將new 出來的CountFlowChecker塞到checkerMap中。ConcurrentHashMap中是對(duì)每一個(gè)數(shù)組槽使用cas+synchronized進(jìn)行初始化,原理可見:《JUC源碼學(xué)習(xí)筆記8——ConcurrentHashMap源碼分析》。
至此我們解決了resouce和CountFlowChecker的關(guān)聯(lián),接下來就是CountFlowChecker#isAllow的實(shí)現(xiàn)了,也是限流算法真正核心的部分!
下面是使用synchronized實(shí)現(xiàn)的方式:
@Data
static class CountFlowChecker {
// 當(dāng)前是第幾秒
private long seconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
// 當(dāng)前seconds這一秒請(qǐng)求了多少次
private int count = 0;
// qps = 10
private int max = 10;
public synchronized boolean isAllow(int acquire) {
if (acquire > max) {
return false;
}
long current = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
if (current == seconds) {
boolean flag = count + acquire <= max;
count += acquire;
return flag;
}
count = acquire;
return true;
}
}
如上我們使用synchronized保證seconds和count更新的原子性和可見性。
雖然synchronized具備輕量級(jí)這種樂觀鎖的優(yōu)化策略,但是在并發(fā)比較高的情況下會(huì)升級(jí)為重量鎖,最后會(huì)導(dǎo)致更多的系統(tǒng)調(diào)用和線程上下文切換,所以這時(shí)候通常需要考慮使用cas進(jìn)行優(yōu)化。(guava的RateLimiter就是使用的synchronized,但是面試官大概率會(huì)想看你對(duì)cas的理解如何)
2.2 使用cas實(shí)現(xiàn)計(jì)數(shù)器算法
推薦閱讀:《JUC源碼學(xué)習(xí)筆記4——原子類源碼分析,CAS,Volatile內(nèi)存屏障,緩存?zhèn)喂蚕砼cUnSafe相關(guān)方法》
這里主要是怎么用cas完成seconds和count兩個(gè)變量的更新
-
AtomicReference
我們自定義一個(gè)類,里面包含當(dāng)前是第一秒和這一秒通過了多次請(qǐng)求
-
AtomicStampedReference
本質(zhì)是解決cas中的ABA問題,但是在這里我們可以使用
stamp 表示當(dāng)前是第幾秒
如下是使用AtomicStampedReference的實(shí)現(xiàn)方式
public class CountFlowChecker1 {
/**
* Integer 表示這一秒內(nèi)通過的請(qǐng)求,
* stamp 表示當(dāng)前是第幾秒
*/
private AtomicStampedReference<Integer> flowCountHelper;
private int max;
public CountFlowChecker1(int max) {
this.max = max;
flowCountHelper = new AtomicStampedReference<>(0, currentSeconds());
}
public boolean isAllow(int acquire) {
if (acquire > max) {
return false;
}
while (true) {
// 當(dāng)前是第幾秒
int currentSeconds = currentSeconds();
// 上一次統(tǒng)計(jì)是第幾秒
int preSeconds = flowCountHelper.getStamp();
// 上一次的數(shù)量
Integer preCount = flowCountHelper.getReference();
// 是同一秒,超過了閾值,那么false
if (preSeconds == currentSeconds & preCount + acquire > max) {
return false;
}
// 不是同一秒,或者是同一秒沒超過閾值,那么cas更新
if (flowCountHelper.compareAndSet(preCount, preCount + acquire, preSeconds, currentSeconds)) {
return true;
}
// 更新失敗 繼續(xù)自旋
}
}
private static int currentSeconds() {
return (int) (TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) % Integer.MAX_VALUE);
}
}
主要是isAllow方法,如上我們可以看到核心思想是在一個(gè)自旋中使用cas保證第幾秒和請(qǐng)求數(shù)量的更新原子性。
但是這里引入一個(gè)問題:為什么AtomicStampedReference#compareAndSet可以保證可見性?線程A cas成功,那么線程B cas會(huì)失敗繼續(xù)自旋,重新獲取flowCountHelper.getStamp()和flowCountHelper.getReference(),為什么getStamp,getReference可以保證線程B立馬可見?
原因就在AtomicStampedReference使用Pair保證stamp和reference,Pair是使用volatile修飾的,對(duì)于volatile變量的寫操作,會(huì)在其后插入一個(gè)內(nèi)存屏障。在Java中,volatile變量的寫操作后通常會(huì)插入一個(gè)"store-store"屏障(Store Barrier),以及一個(gè)"store-load"屏障。這些內(nèi)存屏障確保了以下幾點(diǎn):
- Store-Store Barrier:這個(gè)屏障防止volatile寫與之前普通寫的指令重排序。也就是說,對(duì)volatile變量的寫操作之前的所有普通寫操作都必須在volatile寫之前完成,確保了volatile寫之前的修改對(duì)于其他線程是可見的。
- Store-Load Barrier:這個(gè)屏障防止volatile寫與之后的讀寫操作重排序。它確保了volatile變量寫之后的讀取操作不會(huì)在volatile寫之前發(fā)生。這保證了volatile寫操作的結(jié)果對(duì)后續(xù)操作是可見的。
ok,我們繼續(xù)回到這種寫法,由于其使用了cas保證原子性,如果一瞬間有1000線程過來,那么1個(gè)線程成功,那么999個(gè)線程就要繼續(xù)自旋,導(dǎo)致浪費(fèi)了很多cpu資源,有沒有辦法優(yōu)化一下昵?
2.2.1 使用Thread#yield降低cpu資源浪費(fèi)
既然太多線程自旋了,那么可以在自旋失敗后使用Thread#yield降低這種cpu資源的競爭
但是這種方法也不是非常的優(yōu)秀,因?yàn)樗鼤?huì)導(dǎo)致請(qǐng)求處理的rt變高,但是是一種優(yōu)化思路,咱沒辦法做到既要有要。
2.2.2 借鑒LongAdder的思想,減少熱點(diǎn)數(shù)據(jù)競爭
如上面的寫法,所有線程都在cas競爭修改flowCountHelper中記錄數(shù)量,這個(gè)數(shù)量是一個(gè)熱點(diǎn)數(shù)據(jù),我們可以學(xué)習(xí)LongAdder的做法進(jìn)行優(yōu)化
LongAdder 內(nèi)部有base用于在沒有競爭的情況下,進(jìn)行CAS更新,其中還有Cell數(shù)組在沖突的時(shí)候根據(jù)線程唯一標(biāo)識(shí)對(duì)Cell數(shù)組長度進(jìn)行取模,讓線程去更新Cell數(shù)組中的內(nèi)容。這樣最后的值就是 base+Cell數(shù)組之和,LongAdder自然只能保證最終一致性,如果邊更新邊獲取總和不能保證總和正確
如下是借鑒后寫的代碼
2.2.2.1 基本屬性
可以看到我們改變了使用一個(gè)Integer記錄這一秒請(qǐng)求總數(shù)的方式,轉(zhuǎn)而使用一個(gè)AtomicIntegerArray數(shù)組記錄,數(shù)組之和才是這一秒通過的總數(shù)。
而且還使用了ThreadLocal記錄當(dāng)前線程分配到的位置,一個(gè)線程對(duì)應(yīng)AtomicIntegerArray數(shù)組中的一個(gè)位置,從而實(shí)現(xiàn)熱點(diǎn)數(shù)據(jù)分離!!!
但是這也帶來了一些弊端,后面代碼會(huì)有所體現(xiàn)。
2.2.2.2 限流邏輯
public boolean isAllow(int acquire) {
if (acquire > max) {
return false;
}
while (true) {
// 當(dāng)前是第幾秒
int currentSeconds = currentSeconds();
// 上一次統(tǒng)計(jì)是第幾秒
int preSeconds = flowCountHelper.getStamp();
int currentThreadRandomIndex = THREA_RAMDON_INDEX.get();
// 不是同一秒 嘗試new 一個(gè)全新的數(shù)組!
if (currentSeconds != preSeconds) {
AtomicIntegerArray newCountArray = new AtomicIntegerArray(100);
newCountArray.set(currentThreadRandomIndex, acquire);
if (flowCountHelper.compareAndSet(flowCountHelper.getReference(), newCountArray, preSeconds, currentSeconds)) {
return true;
}
}
// 是同一秒 or cas 失敗 如果是cas失敗,那么說明存在另外一個(gè)線程new了一個(gè)權(quán)限數(shù)組
// 統(tǒng)計(jì)這一秒有多少請(qǐng)求量
// 細(xì)節(jié)1 重新使用flowCountHelper.getReference(),因?yàn)槿绻巧厦鎐as失敗,那么這里的flowCountHelper.getReference()對(duì)應(yīng)的AtomicIntegerArray被替換成新的了
AtomicIntegerArray countArray = flowCountHelper.getReference();
int countArrayLength = countArray.length();
// 統(tǒng)計(jì)總數(shù)
long preCount = 0;
for (int i = 0; i < countArrayLength; i++) {
preCount = countArray.get(i);
}
// 理論上 上面的for不會(huì)消耗太多時(shí)間
// 不夠需要的,那么false
if (preCount + acquire > max) {
return false;
}
// 在currentThreadRandomIndex的原值
int sourceValue = countArray.get(currentThreadRandomIndex);
// 細(xì)節(jié)2:使用的是【細(xì)節(jié)1】拿到的array 這時(shí)候不能重新flowCountHelper.getReference(),因?yàn)槿绻厦娴膄or統(tǒng)計(jì)超過了一秒,那么這一次的請(qǐng)求會(huì)加到下一秒
if (countArray.compareAndSet(currentThreadRandomIndex, sourceValue, sourceValue + acquire)) {
// 弊端,這里true 不一定成功的限制了qps,因?yàn)樯厦娴那蠛?與 這里的cas 不具備一致性,存在其他線程修改了的情況
return true;
}
// 理論沖突的概率降低了,不需要 yield 吧
}
}
可以看到大體思路差不多,其中有兩處細(xì)節(jié),大家可以品一品
-
細(xì)節(jié)1:
-
細(xì)節(jié)2:
這里使用的是統(tǒng)計(jì)前獲取AtomicIntegerArray,為什么不
flowCountHelper.getReference()?因?yàn)榇嬖诹硗饩€程發(fā)現(xiàn)不是同一秒然后更新了flowCountHelper中AtomicIntegerArray引用的指向,如果重新flowCountHelper.getReference()可能讓上一秒的請(qǐng)求加到下一秒,當(dāng)然這也不是不可以,這也相當(dāng)于上一秒借用了下一秒-
弊端:求和和cas不具備一致性
-
問題:為什么AtomicIntegerArray可以保證數(shù)組元素的可見性?
同樣是因?yàn)槭褂昧藘?nèi)存屏障!
另外筆者這里的AtomicIntegerArray是沒法擴(kuò)容的,默認(rèn)100個(gè)。LongAdder的設(shè)計(jì)則更為巧妙,LongAdder中存在一個(gè)volatile long base值,LongAdder會(huì)優(yōu)先case更新base,如果存在多線程導(dǎo)致case失敗,才使用數(shù)組進(jìn)行規(guī)避,而且還具備擴(kuò)容的能力,感興趣的話可以看看筆者寫的:JUC源碼學(xué)習(xí)筆記4——原子類源碼分析,CAS,Volatile內(nèi)存屏障,緩存?zhèn)喂蚕砼cUnSafe相關(guān)方法 - Cuzzz - 博客園 (cnblogs.com)
2.3 計(jì)數(shù)器算法的不足
臨界值問題:假設(shè)我們qps最大為10,如果在第一秒的前900ms沒有請(qǐng)求,但是后100ms通過了10個(gè)請(qǐng)求,然后來到下一秒,下一秒的100ms也通過了100ms,那么在第一秒后100ms和下一秒的前100ms一共通過了20個(gè)請(qǐng)求,這一段時(shí)間內(nèi)是超出了qps 10的!
為此有了下面的滑動(dòng)窗口算法
2.4 滑動(dòng)窗口算法
為了避免計(jì)數(shù)器中的臨界問題,讓限制更加平滑,將固定窗口中分割出多個(gè)小時(shí)間窗口,分別在每個(gè)小的時(shí)間窗口中記錄訪問次數(shù),然后根據(jù)時(shí)間將窗口往前滑動(dòng)并刪除過期的小時(shí)間窗口。
計(jì)數(shù)器算法,可以看做只有兩個(gè)窗口,因此在兩個(gè)窗口邊界的時(shí)候會(huì)出現(xiàn)臨界問題。而滑動(dòng)窗口統(tǒng)計(jì)當(dāng)前時(shí)間處于的1s內(nèi)產(chǎn)生了多少次請(qǐng)求,避免了臨界問題
- 優(yōu)點(diǎn):實(shí)現(xiàn)相對(duì)簡單,且沒有計(jì)數(shù)器算法的臨界問題
- 缺點(diǎn):無法應(yīng)對(duì)短時(shí)間高并發(fā)(突刺現(xiàn)象),比如我在間歇性高流量請(qǐng)求,每一批次的請(qǐng)求,處于不同的窗口(圖中的虛線窗口)
接下來我們將手寫滑動(dòng)窗口算法(sentinel也是使用的滑動(dòng)窗口算法:Sentinel基本使用與源碼分析)
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.atomic.AtomicStampedReference;
/**
* 假設(shè) 把一秒分割為 5 份
* |-----|-----|-----|-----|------|
* 0 200ms 400ms 600ms 800ms 1000ms
* 假設(shè)當(dāng)前是10s余500ms 那么應(yīng)該落到 400ms——600ms之間
* 算法就是 10s余500ms%1s = 500ms,500ms/(1s/5) = 2 ==> 對(duì)應(yīng)400ms——600ms
* 這里需要注意 如果不是同一秒的值,那么不要統(tǒng)計(jì)進(jìn)去,
* 比如9s的時(shí)候400ms——600ms值是10,現(xiàn)在時(shí)間到10s余500ms了
* 統(tǒng)計(jì)的時(shí)候不是10+1,而是1,因?yàn)槎疾煌幻肓?因此滑動(dòng)窗口的元素需要記下當(dāng)前是第一秒的值
*/
public class CountFlowChecker3 {
// AtomicReferenceArray 就是上面的滑動(dòng)窗口,本質(zhì)是一個(gè)數(shù)組
// AtomicStampedReference中stamp記錄是第幾秒的值,Integer記錄數(shù)量
private AtomicReferenceArray<AtomicStampedReference<Integer>> slideWindow;
// 最大qps
private int max;
// 把一秒分為多少份!
private int arrayLength;
// 一份是多少ms
private int intervalDuration;
public CountFlowChecker3(int max, int arrayLength) {
this.max = max;
this.arrayLength = arrayLength;
this.slideWindow = new AtomicReferenceArray<>(arrayLength);
// 這里可能存在沒辦法整除的情況,不是本文的主題,暫不做考慮
this.intervalDuration = 1000 / arrayLength;
}
public boolean isAllow(int acquire) {
if (acquire > max) {
return false;
}
while (true) {
// 當(dāng)前時(shí)間
long currentMilliSeconds = currentMilliSeconds();
int currentSeconds = (int) (TimeUnit.MILLISECONDS.toSeconds(currentMilliSeconds) % Integer.MAX_VALUE);
// 對(duì)應(yīng)在滑動(dòng)窗口的位置
int index = (int) (currentMilliSeconds%1000 / this.intervalDuration);
// 求和
long preSum = sum(currentSeconds);
// 超出限流
if (preSum + acquire >= max) {
return false;
}
// 獲取當(dāng)前位置的引用
AtomicStampedReference<Integer> element = slideWindow.get(index);
// 如果沒有初始化
if (Objects.isNull(element)) {
if (slideWindow.compareAndSet(index, null, new AtomicStampedReference<>(acquire, currentSeconds))) {
return true;
}
}
// 刷新一下,因?yàn)檫@時(shí)候maybe被其他線程初始化了
element = slideWindow.get(index);
// 是同一秒,那么+,如果不是那么覆蓋
int sourceSeconds = element.getStamp();
int updateValue = sourceSeconds == currentSeconds ? element.getReference() + acquire : acquire;
if (element.compareAndSet(element.getReference(),updateValue,sourceSeconds,currentSeconds)) {
return true;
}
}
}
private long sum(int currentSeconds) {
int sum = 0;
for (int i = 0; i < slideWindow.length(); i++) {
AtomicStampedReference<Integer> element = slideWindow.get(i);
// 是同一秒的值才統(tǒng)計(jì)!
if (Objects.nonNull(element) && element.getStamp() == currentSeconds
&& Objects.nonNull(element.getReference())) {
sum = element.getReference();
}
}
return sum;
}
private static long currentMilliSeconds() {
return System.currentTimeMillis();
}
}
可以看到滑動(dòng)窗口統(tǒng)計(jì)完多個(gè)窗口值后,如果判斷可以繼續(xù)通過那么也是進(jìn)行cas更新,統(tǒng)計(jì)sum和后面的cas也不具備一致性
并且同樣可以使用LongAdder優(yōu)化熱點(diǎn)數(shù)據(jù)競爭的問題,比如下優(yōu)化,代碼類似于2.2.2
2.5 令牌桶算法
請(qǐng)求執(zhí)行作為消費(fèi)者,每個(gè)請(qǐng)求都需要去桶中拿取一個(gè)令牌,取到令牌則繼續(xù)執(zhí)行;如果桶中無令牌可取,就觸發(fā)拒絕策略,可以是超時(shí)等待,也可以是直接拒絕本次請(qǐng)求,由此達(dá)到限流目的。當(dāng)桶中令牌數(shù)大于最大數(shù)量的時(shí)候,將不再添加。它可以適應(yīng)流量突發(fā),N 個(gè)請(qǐng)求到來只需要從桶中獲取 N 個(gè)令牌就可以繼續(xù)處理。
import com.google.common.util.concurrent.AtomicDouble;
public class CountFlowChecker4 {
// 最大qps
private final double maxTokens;
// 上一次可用的tokens
// com.google.common.util.concurrent.AtomicDouble;
// 使用doubleToRawLongBits和longBitsToDouble進(jìn)行double的轉(zhuǎn)換
private final AtomicDouble availableTokens;
// 上一次填充的間隔
private volatile long lastRefillTimeStamp;
public CountFlowChecker4(double maxTokens) {
this.maxTokens = maxTokens;
this.availableTokens = new AtomicDouble(maxTokens);
this.lastRefillTimeStamp = System.currentTimeMillis();
}
public boolean isAllow(int acquire) {
if (acquire > maxTokens) {
return false;
}
long now = System.currentTimeMillis();
// 嘗試根據(jù)時(shí)間重新填充令牌
refill(now);
double currentTokens = availableTokens.get();
// 如果沒有足夠的令牌,則立即返回false,不阻塞
if (currentTokens < acquire) {
return false;
}
// 如果令牌數(shù)量足夠,則使用CAS減少一個(gè)令牌
return availableTokens.compareAndSet(currentTokens, currentTokens - acquire);
}
private void refill(long now) {
double tokensToAdd = (((double) (now - lastRefillTimeStamp)) / 1000 * maxTokens);
double preCount = availableTokens.get();
double newTokenCount = Math.min(maxTokens, preCount + tokensToAdd);
// 使用CAS更新令牌數(shù)量,如果失敗則忽略(其他線程可能已經(jīng)更新了)
if (tokensToAdd > 0) {
if (availableTokens.compareAndSet(preCount, newTokenCount)) {
// 這里不需要糾結(jié)lastRefillTimeStamp 和 availableTokens更新的原子性
// 因?yàn)閘astRefillTimeStamp 記錄的是上一次更新時(shí)間
// 如果當(dāng)前線程成功,那么就更新吧
lastRefillTimeStamp = now;
}
}
}
}
令牌桶算法實(shí)現(xiàn)也并沒有太復(fù)雜,而且這里使用的是動(dòng)態(tài)計(jì)算令牌數(shù)據(jù),可以看出適應(yīng)流量突發(fā),一瞬間可用給出全部的令牌,甚至還可以積攢令牌應(yīng)對(duì)并發(fā),但是這種允許突發(fā)流量對(duì)于下游是不是不太友好。
2.6 漏桶算法
漏桶限流算法的核心就是, 不管上面的水流速度有多塊, 漏桶水滴的流出速度始終保持不變,不論進(jìn)入的流量有多么不規(guī)則,流量的離開速率卻始終保持恒定
在漏桶算法中,有一個(gè)固定容量的桶,請(qǐng)求(類似水)以任意速率流入桶內(nèi),而桶以恒定速率往外“漏”出請(qǐng)求。如果桶滿了,進(jìn)來的新請(qǐng)求會(huì)丟棄或排隊(duì)等待。
通常實(shí)際應(yīng)用是通過定時(shí)器任務(wù)實(shí)現(xiàn)漏桶的“漏水”操作,即定時(shí)任務(wù)線程定時(shí)從桶中獲取任務(wù)進(jìn)行執(zhí)行,理論上使用一個(gè)阻塞隊(duì)列+調(diào)度線程池可進(jìn)行實(shí)現(xiàn)。
漏桶算法在需要及時(shí)響應(yīng)的場景下不是很友好,任務(wù)如果被提交到桶,調(diào)用方卻超時(shí)了那么任務(wù)處理也沒啥意義了,和本地場景不是很符合。
3.筆者的思考
3.1 限流器算法比較
-
計(jì)數(shù)器(Fixed Window Counter)
最簡單的限流算法,基于一個(gè)固定的時(shí)間窗口(比如每秒),統(tǒng)計(jì)請(qǐng)求的數(shù)量,當(dāng)請(qǐng)求數(shù)量超出閾值時(shí),新的請(qǐng)求將被拒絕或者排隊(duì)。
- 優(yōu)點(diǎn):實(shí)現(xiàn)簡單,容易理解。
- 缺點(diǎn):在時(shí)間窗口邊界處存在突發(fā)請(qǐng)求量的問題,即窗口重置時(shí)可能會(huì)突然允許大量請(qǐng)求通過,從而導(dǎo)致短時(shí)間的高流量。
-
滑動(dòng)窗口(Sliding Window Log)
滑動(dòng)窗口算法是對(duì)計(jì)數(shù)器算法的一種改進(jìn),它考慮了時(shí)間窗口中的每個(gè)小間隔。這些間隔可以是過去幾秒、幾分的N個(gè)桶,算法根據(jù)請(qǐng)求到達(dá)的時(shí)間進(jìn)行統(tǒng)計(jì),使得限流更加平滑。
- 優(yōu)點(diǎn):相比固定窗口算法,滑動(dòng)窗口可以減少時(shí)間窗口邊緣的突發(fā)流量問題。
- 缺點(diǎn):比固定窗口算法復(fù)雜,實(shí)現(xiàn)和維護(hù)成本更高。
-
漏桶(Leaky Bucket)
漏桶算法使用一個(gè)固定容量的桶來表示令牌或請(qǐng)求。請(qǐng)求按照固定的速率進(jìn)入桶內(nèi),而桶按照固定的速率向外漏水(處理請(qǐng)求),當(dāng)桶滿時(shí),多余的水(請(qǐng)求)會(huì)溢出(被拒絕或排隊(duì))。
- 優(yōu)點(diǎn):能夠以恒定的速率處理請(qǐng)求,避免了突發(fā)流量影響。
- 缺點(diǎn):即使網(wǎng)絡(luò)狀況良好,桶的出水速率也是恒定的,這可能會(huì)導(dǎo)致一定程度上的資源浪費(fèi)。
-
令牌桶(Token Bucket)
令牌桶算法維護(hù)一個(gè)令牌桶,桶內(nèi)有一個(gè)固定容量的令牌。系統(tǒng)以恒定速率生成令牌到桶中,當(dāng)請(qǐng)求到來時(shí),如果桶內(nèi)有令牌,則允許該請(qǐng)求通過,并消耗一個(gè)令牌;如果沒有令牌,則請(qǐng)求被拒絕或排隊(duì)。
- 優(yōu)點(diǎn):能夠允許一定程度的突發(fā)流量,因?yàn)榭梢岳鄯e令牌;處理請(qǐng)求的速率可以動(dòng)態(tài)調(diào)整。
- 缺點(diǎn):實(shí)現(xiàn)比固定窗口計(jì)數(shù)器和漏桶算法復(fù)雜。
3.2 上述編碼中的思想
-
【2.1.2解決
[checkerMap讀是沒有競爭的,不需要加鎖]的問題】讀寫分離的思想,如mysql中讀取一般是不加鎖的,我們?cè)趯?shí)際業(yè)務(wù)開發(fā)中讀取數(shù)據(jù)也一般是不會(huì)加鎖的!
-
【2.1.3 減小鎖粒度】
如mysql中的行鎖和表鎖,行鎖提高了更高的并發(fā)度,這也是innodb優(yōu)于其myisam的點(diǎn)
-
【2.2.2 借鑒LongAdder的思想,減少熱點(diǎn)數(shù)據(jù)競爭】
熱點(diǎn)數(shù)據(jù)分離,比如redis中的熱key釋放可拆分為key1,key2進(jìn)行熱點(diǎn)數(shù)據(jù)分離。
比如大賣的商家,其訂單流水分在多個(gè)表,分散熱點(diǎn)避免單表性能瓶頸
-
【2.6 漏桶算法】
類似消息的隊(duì)列的削峰填谷,將請(qǐng)求放到消息隊(duì)列,讓消費(fèi)者以合適的速率進(jìn)行消費(fèi)
3.3 分布式限流
如果機(jī)器有500臺(tái),限流100qps怎么辦?
- Sentinel提供了一種分布式限流,核心是選取一臺(tái)機(jī)器作為leader,其他機(jī)器調(diào)用的時(shí)候需要發(fā)送請(qǐng)求申請(qǐng)令牌,leader負(fù)責(zé)進(jìn)行統(tǒng)計(jì)。但是一般建議使用,因?yàn)榫邆鋯吸c(diǎn)故障的問題,而且也不夠去中心化。
- redis實(shí)現(xiàn),每一臺(tái)機(jī)器都需要訪問redis進(jìn)行讀寫操作,熱key問題,并且徒增rt,不太劃算
我們?cè)陔p11大促的時(shí)候,會(huì)進(jìn)行流量評(píng)估,一般不建議單機(jī)qps不能小于5,小于5的限流容易出現(xiàn)誤殺,不太具備現(xiàn)實(shí)意義。
因此如果流量負(fù)載均衡,那么建議優(yōu)化為單機(jī)限流,使用sentinel or guava的RateLimiter
但是筆者有一個(gè)項(xiàng)目,算法提供AIGC服務(wù),機(jī)器只有100臺(tái),且每一臺(tái)并發(fā)度為1,服務(wù)端有500臺(tái),但是筆者是離線定時(shí)任務(wù)調(diào)用AIGC服務(wù),所以我使用了分布式調(diào)度map-reduce,使用調(diào)度機(jī)器分配任務(wù)到100臺(tái)服務(wù)端機(jī)器上,服務(wù)端機(jī)器單機(jī)串行從而控制qps!
總結(jié)
以上是生活随笔為你收集整理的不止八股:阿里内部语雀一些有趣的并发编程笔试题2——手写限流器的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JVM学习-程序编译与优化
- 下一篇: 【scikit-learn基础】--『监