不止八股:阿里内部语雀一些有趣的并发编程笔试题2——手写限流器
系列文章目錄和關于我
0丶引入
筆者社招一年半經驗跳槽加入阿里約1年時間,無意間發現一些阿里語雀上的一些面試題題庫,出于學習目的在此進行記錄。
- 這一篇主要寫一些有趣的筆試題(非leetcode),這些有的考驗并發編程,有的考驗設計能力。
- 筆者不是什么技術大牛,此處筆試題充滿主觀思考,并不一定是滿分答案,歡迎評論區一起探討。
- 不止八股:面試題之外,筆者會更多的思考下底層原理,不只是簡單的背誦。
下面這個題目也是筆者面試阿里筆試做過的一道筆試題,現在回想自己那時候寫的也是一坨
1.題目-限流組件設計
網站或者API服務有可能被惡意訪問導致不可用,為了防止流量過大,通常會有限流設計。
請實現一個 RateLimiter 類,包含 isAllow 方法。每個請求包含一個 resource 資源,如果resource 在 1 秒鐘內有超過 N 次請求,就拒絕響應。
public interface IRateLimiter{
boolean isAllow(String resource);
}
2.筆者的題解
筆者在面試的時候,其實沒看過,沒使用過sentinel(《Sentinel基本使用與源碼分析》),也沒看過Guava的RateLimiter,筆者上一份工作是一個銀行內部的工具,用戶就是100-的銀行經歷,限流是不可能限流。
因此第一反應是使用一個變量記錄當前是第幾秒,另外一個變量記錄當前當前通過了多少請求,這種算法也叫計數器算法。所以這里引入了兩個問題:
- 需要根據resource映射到一個對象,對象具備兩個字段——記錄第幾秒和通過請求數
- 如何保證 【兩個字段——記錄第幾秒和通過請求數】更新的線程安全
2.1 使用鎖實現計數器算法
2.1.1 回家等通知的寫法
如上,我們抽象出CountFlowChecker負責這一個資源的限流控制,checkerMap中key是資源名稱,value是CountFlowChecker。然后使用synchronized修飾checkerMap來實現checkerMap初始化的線程安全。這一段代碼有哪些問題?
- checkerMap讀是沒有競爭的,不需要加鎖
- 鎖的粒度太大了——鎖定整個checkerMap,讓所有調用CountRateLimiter1#isAllow都是串行的!
2.1.2 解決[checkerMap讀是沒有競爭的,不需要加鎖]的問題
如上讀不加鎖,只有發現沒有初始化,需要寫的是才進入綠色部分代碼進行初始化,但是綠色部分部分存在bug
如圖紅色,藍色代表兩個并發的請求,二者訪問的時候CountFlowChecker都沒有初始化,so都來到綠色部分,假設紅色請求先拿到鎖并成功初始化了CountFlowChecker然后釋放了鎖,這時候藍色請求處理線程被喚醒,將覆蓋紅色請求處理線程寫入的CountFlowChecker。如何解決昵?
其實和單例模式中的雙重if異曲同工之妙,這里獲取到鎖后再次讀取,只有為null才進行初始化,解決了上面紅藍線程覆蓋的情況!這種寫法在Spring的Bean初始化中也有使用。
但是即使你這樣寫了,也是要回家等通知的!因為 鎖的粒度太大了——鎖定整個checkerMap,讓所有調用CountRateLimiter1#isAllow都是串行的!
2.1.3 減小鎖粒度
如上圖,鎖整個checkerMap相當于把checkerMap把checkerMap中每一個數組都鎖了,但是不同的數組槽之間是沒有線程安全問題的,比如數組下標0對應了資源A,數組下標2對應了資源B,A和B是可以并行做數據變更操作的!
這其實就對應了ConcurrentHashMap中減小鎖粒度思想!因此可以這樣優化:
這里我們使用了ConcurrentHashMap#computeIfAbsent,保證了假設resouce對應數組槽沒有元素的時候,會串行的將new 出來的CountFlowChecker塞到checkerMap中。ConcurrentHashMap中是對每一個數組槽使用cas+synchronized進行初始化,原理可見:《JUC源碼學習筆記8——ConcurrentHashMap源碼分析》。
至此我們解決了resouce和CountFlowChecker的關聯,接下來就是CountFlowChecker#isAllow的實現了,也是限流算法真正核心的部分!
下面是使用synchronized實現的方式:
@Data
static class CountFlowChecker {
// 當前是第幾秒
private long seconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
// 當前seconds這一秒請求了多少次
private int count = 0;
// qps = 10
private int max = 10;
public synchronized boolean isAllow(int acquire) {
if (acquire > max) {
return false;
}
long current = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
if (current == seconds) {
boolean flag = count + acquire <= max;
count += acquire;
return flag;
}
count = acquire;
return true;
}
}
如上我們使用synchronized保證seconds和count更新的原子性和可見性。
雖然synchronized具備輕量級這種樂觀鎖的優化策略,但是在并發比較高的情況下會升級為重量鎖,最后會導致更多的系統調用和線程上下文切換,所以這時候通常需要考慮使用cas進行優化。(guava的RateLimiter就是使用的synchronized,但是面試官大概率會想看你對cas的理解如何)
2.2 使用cas實現計數器算法
推薦閱讀:《JUC源碼學習筆記4——原子類源碼分析,CAS,Volatile內存屏障,緩存偽共享與UnSafe相關方法》
這里主要是怎么用cas完成seconds和count兩個變量的更新
-
AtomicReference
我們自定義一個類,里面包含當前是第一秒和這一秒通過了多次請求
-
AtomicStampedReference
本質是解決cas中的ABA問題,但是在這里我們可以使用
stamp 表示當前是第幾秒
如下是使用AtomicStampedReference的實現方式
public class CountFlowChecker1 {
/**
* Integer 表示這一秒內通過的請求,
* stamp 表示當前是第幾秒
*/
private AtomicStampedReference<Integer> flowCountHelper;
private int max;
public CountFlowChecker1(int max) {
this.max = max;
flowCountHelper = new AtomicStampedReference<>(0, currentSeconds());
}
public boolean isAllow(int acquire) {
if (acquire > max) {
return false;
}
while (true) {
// 當前是第幾秒
int currentSeconds = currentSeconds();
// 上一次統計是第幾秒
int preSeconds = flowCountHelper.getStamp();
// 上一次的數量
Integer preCount = flowCountHelper.getReference();
// 是同一秒,超過了閾值,那么false
if (preSeconds == currentSeconds & preCount + acquire > max) {
return false;
}
// 不是同一秒,或者是同一秒沒超過閾值,那么cas更新
if (flowCountHelper.compareAndSet(preCount, preCount + acquire, preSeconds, currentSeconds)) {
return true;
}
// 更新失敗 繼續自旋
}
}
private static int currentSeconds() {
return (int) (TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) % Integer.MAX_VALUE);
}
}
主要是isAllow方法,如上我們可以看到核心思想是在一個自旋中使用cas保證第幾秒和請求數量的更新原子性。
但是這里引入一個問題:為什么AtomicStampedReference#compareAndSet可以保證可見性?線程A cas成功,那么線程B cas會失敗繼續自旋,重新獲取flowCountHelper.getStamp()和flowCountHelper.getReference(),為什么getStamp,getReference可以保證線程B立馬可見?
原因就在AtomicStampedReference使用Pair保證stamp和reference,Pair是使用volatile修飾的,對于volatile變量的寫操作,會在其后插入一個內存屏障。在Java中,volatile變量的寫操作后通常會插入一個"store-store"屏障(Store Barrier),以及一個"store-load"屏障。這些內存屏障確保了以下幾點:
- Store-Store Barrier:這個屏障防止volatile寫與之前普通寫的指令重排序。也就是說,對volatile變量的寫操作之前的所有普通寫操作都必須在volatile寫之前完成,確保了volatile寫之前的修改對于其他線程是可見的。
- Store-Load Barrier:這個屏障防止volatile寫與之后的讀寫操作重排序。它確保了volatile變量寫之后的讀取操作不會在volatile寫之前發生。這保證了volatile寫操作的結果對后續操作是可見的。
ok,我們繼續回到這種寫法,由于其使用了cas保證原子性,如果一瞬間有1000線程過來,那么1個線程成功,那么999個線程就要繼續自旋,導致浪費了很多cpu資源,有沒有辦法優化一下昵?
2.2.1 使用Thread#yield降低cpu資源浪費
既然太多線程自旋了,那么可以在自旋失敗后使用Thread#yield降低這種cpu資源的競爭
但是這種方法也不是非常的優秀,因為它會導致請求處理的rt變高,但是是一種優化思路,咱沒辦法做到既要有要。
2.2.2 借鑒LongAdder的思想,減少熱點數據競爭
如上面的寫法,所有線程都在cas競爭修改flowCountHelper中記錄數量,這個數量是一個熱點數據,我們可以學習LongAdder的做法進行優化
LongAdder 內部有base用于在沒有競爭的情況下,進行CAS更新,其中還有Cell數組在沖突的時候根據線程唯一標識對Cell數組長度進行取模,讓線程去更新Cell數組中的內容。這樣最后的值就是 base+Cell數組之和,LongAdder自然只能保證最終一致性,如果邊更新邊獲取總和不能保證總和正確
如下是借鑒后寫的代碼
2.2.2.1 基本屬性
可以看到我們改變了使用一個Integer記錄這一秒請求總數的方式,轉而使用一個AtomicIntegerArray數組記錄,數組之和才是這一秒通過的總數。
而且還使用了ThreadLocal記錄當前線程分配到的位置,一個線程對應AtomicIntegerArray數組中的一個位置,從而實現熱點數據分離!!!
但是這也帶來了一些弊端,后面代碼會有所體現。
2.2.2.2 限流邏輯
public boolean isAllow(int acquire) {
if (acquire > max) {
return false;
}
while (true) {
// 當前是第幾秒
int currentSeconds = currentSeconds();
// 上一次統計是第幾秒
int preSeconds = flowCountHelper.getStamp();
int currentThreadRandomIndex = THREA_RAMDON_INDEX.get();
// 不是同一秒 嘗試new 一個全新的數組!
if (currentSeconds != preSeconds) {
AtomicIntegerArray newCountArray = new AtomicIntegerArray(100);
newCountArray.set(currentThreadRandomIndex, acquire);
if (flowCountHelper.compareAndSet(flowCountHelper.getReference(), newCountArray, preSeconds, currentSeconds)) {
return true;
}
}
// 是同一秒 or cas 失敗 如果是cas失敗,那么說明存在另外一個線程new了一個權限數組
// 統計這一秒有多少請求量
// 細節1 重新使用flowCountHelper.getReference(),因為如果是上面cas失敗,那么這里的flowCountHelper.getReference()對應的AtomicIntegerArray被替換成新的了
AtomicIntegerArray countArray = flowCountHelper.getReference();
int countArrayLength = countArray.length();
// 統計總數
long preCount = 0;
for (int i = 0; i < countArrayLength; i++) {
preCount = countArray.get(i);
}
// 理論上 上面的for不會消耗太多時間
// 不夠需要的,那么false
if (preCount + acquire > max) {
return false;
}
// 在currentThreadRandomIndex的原值
int sourceValue = countArray.get(currentThreadRandomIndex);
// 細節2:使用的是【細節1】拿到的array 這時候不能重新flowCountHelper.getReference(),因為如果上面的for統計超過了一秒,那么這一次的請求會加到下一秒
if (countArray.compareAndSet(currentThreadRandomIndex, sourceValue, sourceValue + acquire)) {
// 弊端,這里true 不一定成功的限制了qps,因為上面的求和 與 這里的cas 不具備一致性,存在其他線程修改了的情況
return true;
}
// 理論沖突的概率降低了,不需要 yield 吧
}
}
可以看到大體思路差不多,其中有兩處細節,大家可以品一品
-
細節1:
-
細節2:
這里使用的是統計前獲取AtomicIntegerArray,為什么不
flowCountHelper.getReference()?因為存在另外線程發現不是同一秒然后更新了flowCountHelper中AtomicIntegerArray引用的指向,如果重新flowCountHelper.getReference()可能讓上一秒的請求加到下一秒,當然這也不是不可以,這也相當于上一秒借用了下一秒-
弊端:求和和cas不具備一致性
-
問題:為什么AtomicIntegerArray可以保證數組元素的可見性?
同樣是因為使用了內存屏障!
另外筆者這里的AtomicIntegerArray是沒法擴容的,默認100個。LongAdder的設計則更為巧妙,LongAdder中存在一個volatile long base值,LongAdder會優先case更新base,如果存在多線程導致case失敗,才使用數組進行規避,而且還具備擴容的能力,感興趣的話可以看看筆者寫的:JUC源碼學習筆記4——原子類源碼分析,CAS,Volatile內存屏障,緩存偽共享與UnSafe相關方法 - Cuzzz - 博客園 (cnblogs.com)
2.3 計數器算法的不足
臨界值問題:假設我們qps最大為10,如果在第一秒的前900ms沒有請求,但是后100ms通過了10個請求,然后來到下一秒,下一秒的100ms也通過了100ms,那么在第一秒后100ms和下一秒的前100ms一共通過了20個請求,這一段時間內是超出了qps 10的!
為此有了下面的滑動窗口算法
2.4 滑動窗口算法
為了避免計數器中的臨界問題,讓限制更加平滑,將固定窗口中分割出多個小時間窗口,分別在每個小的時間窗口中記錄訪問次數,然后根據時間將窗口往前滑動并刪除過期的小時間窗口。
計數器算法,可以看做只有兩個窗口,因此在兩個窗口邊界的時候會出現臨界問題。而滑動窗口統計當前時間處于的1s內產生了多少次請求,避免了臨界問題
- 優點:實現相對簡單,且沒有計數器算法的臨界問題
- 缺點:無法應對短時間高并發(突刺現象),比如我在間歇性高流量請求,每一批次的請求,處于不同的窗口(圖中的虛線窗口)
接下來我們將手寫滑動窗口算法(sentinel也是使用的滑動窗口算法:Sentinel基本使用與源碼分析)
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.atomic.AtomicStampedReference;
/**
* 假設 把一秒分割為 5 份
* |-----|-----|-----|-----|------|
* 0 200ms 400ms 600ms 800ms 1000ms
* 假設當前是10s余500ms 那么應該落到 400ms——600ms之間
* 算法就是 10s余500ms%1s = 500ms,500ms/(1s/5) = 2 ==> 對應400ms——600ms
* 這里需要注意 如果不是同一秒的值,那么不要統計進去,
* 比如9s的時候400ms——600ms值是10,現在時間到10s余500ms了
* 統計的時候不是10+1,而是1,因為都不同一秒了,因此滑動窗口的元素需要記下當前是第一秒的值
*/
public class CountFlowChecker3 {
// AtomicReferenceArray 就是上面的滑動窗口,本質是一個數組
// AtomicStampedReference中stamp記錄是第幾秒的值,Integer記錄數量
private AtomicReferenceArray<AtomicStampedReference<Integer>> slideWindow;
// 最大qps
private int max;
// 把一秒分為多少份!
private int arrayLength;
// 一份是多少ms
private int intervalDuration;
public CountFlowChecker3(int max, int arrayLength) {
this.max = max;
this.arrayLength = arrayLength;
this.slideWindow = new AtomicReferenceArray<>(arrayLength);
// 這里可能存在沒辦法整除的情況,不是本文的主題,暫不做考慮
this.intervalDuration = 1000 / arrayLength;
}
public boolean isAllow(int acquire) {
if (acquire > max) {
return false;
}
while (true) {
// 當前時間
long currentMilliSeconds = currentMilliSeconds();
int currentSeconds = (int) (TimeUnit.MILLISECONDS.toSeconds(currentMilliSeconds) % Integer.MAX_VALUE);
// 對應在滑動窗口的位置
int index = (int) (currentMilliSeconds%1000 / this.intervalDuration);
// 求和
long preSum = sum(currentSeconds);
// 超出限流
if (preSum + acquire >= max) {
return false;
}
// 獲取當前位置的引用
AtomicStampedReference<Integer> element = slideWindow.get(index);
// 如果沒有初始化
if (Objects.isNull(element)) {
if (slideWindow.compareAndSet(index, null, new AtomicStampedReference<>(acquire, currentSeconds))) {
return true;
}
}
// 刷新一下,因為這時候maybe被其他線程初始化了
element = slideWindow.get(index);
// 是同一秒,那么+,如果不是那么覆蓋
int sourceSeconds = element.getStamp();
int updateValue = sourceSeconds == currentSeconds ? element.getReference() + acquire : acquire;
if (element.compareAndSet(element.getReference(),updateValue,sourceSeconds,currentSeconds)) {
return true;
}
}
}
private long sum(int currentSeconds) {
int sum = 0;
for (int i = 0; i < slideWindow.length(); i++) {
AtomicStampedReference<Integer> element = slideWindow.get(i);
// 是同一秒的值才統計!
if (Objects.nonNull(element) && element.getStamp() == currentSeconds
&& Objects.nonNull(element.getReference())) {
sum = element.getReference();
}
}
return sum;
}
private static long currentMilliSeconds() {
return System.currentTimeMillis();
}
}
可以看到滑動窗口統計完多個窗口值后,如果判斷可以繼續通過那么也是進行cas更新,統計sum和后面的cas也不具備一致性
并且同樣可以使用LongAdder優化熱點數據競爭的問題,比如下優化,代碼類似于2.2.2
2.5 令牌桶算法
請求執行作為消費者,每個請求都需要去桶中拿取一個令牌,取到令牌則繼續執行;如果桶中無令牌可取,就觸發拒絕策略,可以是超時等待,也可以是直接拒絕本次請求,由此達到限流目的。當桶中令牌數大于最大數量的時候,將不再添加。它可以適應流量突發,N 個請求到來只需要從桶中獲取 N 個令牌就可以繼續處理。
import com.google.common.util.concurrent.AtomicDouble;
public class CountFlowChecker4 {
// 最大qps
private final double maxTokens;
// 上一次可用的tokens
// com.google.common.util.concurrent.AtomicDouble;
// 使用doubleToRawLongBits和longBitsToDouble進行double的轉換
private final AtomicDouble availableTokens;
// 上一次填充的間隔
private volatile long lastRefillTimeStamp;
public CountFlowChecker4(double maxTokens) {
this.maxTokens = maxTokens;
this.availableTokens = new AtomicDouble(maxTokens);
this.lastRefillTimeStamp = System.currentTimeMillis();
}
public boolean isAllow(int acquire) {
if (acquire > maxTokens) {
return false;
}
long now = System.currentTimeMillis();
// 嘗試根據時間重新填充令牌
refill(now);
double currentTokens = availableTokens.get();
// 如果沒有足夠的令牌,則立即返回false,不阻塞
if (currentTokens < acquire) {
return false;
}
// 如果令牌數量足夠,則使用CAS減少一個令牌
return availableTokens.compareAndSet(currentTokens, currentTokens - acquire);
}
private void refill(long now) {
double tokensToAdd = (((double) (now - lastRefillTimeStamp)) / 1000 * maxTokens);
double preCount = availableTokens.get();
double newTokenCount = Math.min(maxTokens, preCount + tokensToAdd);
// 使用CAS更新令牌數量,如果失敗則忽略(其他線程可能已經更新了)
if (tokensToAdd > 0) {
if (availableTokens.compareAndSet(preCount, newTokenCount)) {
// 這里不需要糾結lastRefillTimeStamp 和 availableTokens更新的原子性
// 因為lastRefillTimeStamp 記錄的是上一次更新時間
// 如果當前線程成功,那么就更新吧
lastRefillTimeStamp = now;
}
}
}
}
令牌桶算法實現也并沒有太復雜,而且這里使用的是動態計算令牌數據,可以看出適應流量突發,一瞬間可用給出全部的令牌,甚至還可以積攢令牌應對并發,但是這種允許突發流量對于下游是不是不太友好。
2.6 漏桶算法
漏桶限流算法的核心就是, 不管上面的水流速度有多塊, 漏桶水滴的流出速度始終保持不變,不論進入的流量有多么不規則,流量的離開速率卻始終保持恒定
在漏桶算法中,有一個固定容量的桶,請求(類似水)以任意速率流入桶內,而桶以恒定速率往外“漏”出請求。如果桶滿了,進來的新請求會丟棄或排隊等待。
通常實際應用是通過定時器任務實現漏桶的“漏水”操作,即定時任務線程定時從桶中獲取任務進行執行,理論上使用一個阻塞隊列+調度線程池可進行實現。
漏桶算法在需要及時響應的場景下不是很友好,任務如果被提交到桶,調用方卻超時了那么任務處理也沒啥意義了,和本地場景不是很符合。
3.筆者的思考
3.1 限流器算法比較
-
計數器(Fixed Window Counter)
最簡單的限流算法,基于一個固定的時間窗口(比如每秒),統計請求的數量,當請求數量超出閾值時,新的請求將被拒絕或者排隊。
- 優點:實現簡單,容易理解。
- 缺點:在時間窗口邊界處存在突發請求量的問題,即窗口重置時可能會突然允許大量請求通過,從而導致短時間的高流量。
-
滑動窗口(Sliding Window Log)
滑動窗口算法是對計數器算法的一種改進,它考慮了時間窗口中的每個小間隔。這些間隔可以是過去幾秒、幾分的N個桶,算法根據請求到達的時間進行統計,使得限流更加平滑。
- 優點:相比固定窗口算法,滑動窗口可以減少時間窗口邊緣的突發流量問題。
- 缺點:比固定窗口算法復雜,實現和維護成本更高。
-
漏桶(Leaky Bucket)
漏桶算法使用一個固定容量的桶來表示令牌或請求。請求按照固定的速率進入桶內,而桶按照固定的速率向外漏水(處理請求),當桶滿時,多余的水(請求)會溢出(被拒絕或排隊)。
- 優點:能夠以恒定的速率處理請求,避免了突發流量影響。
- 缺點:即使網絡狀況良好,桶的出水速率也是恒定的,這可能會導致一定程度上的資源浪費。
-
令牌桶(Token Bucket)
令牌桶算法維護一個令牌桶,桶內有一個固定容量的令牌。系統以恒定速率生成令牌到桶中,當請求到來時,如果桶內有令牌,則允許該請求通過,并消耗一個令牌;如果沒有令牌,則請求被拒絕或排隊。
- 優點:能夠允許一定程度的突發流量,因為可以累積令牌;處理請求的速率可以動態調整。
- 缺點:實現比固定窗口計數器和漏桶算法復雜。
3.2 上述編碼中的思想
-
【2.1.2解決
[checkerMap讀是沒有競爭的,不需要加鎖]的問題】讀寫分離的思想,如mysql中讀取一般是不加鎖的,我們在實際業務開發中讀取數據也一般是不會加鎖的!
-
【2.1.3 減小鎖粒度】
如mysql中的行鎖和表鎖,行鎖提高了更高的并發度,這也是innodb優于其myisam的點
-
【2.2.2 借鑒LongAdder的思想,減少熱點數據競爭】
熱點數據分離,比如redis中的熱key釋放可拆分為key1,key2進行熱點數據分離。
比如大賣的商家,其訂單流水分在多個表,分散熱點避免單表性能瓶頸
-
【2.6 漏桶算法】
類似消息的隊列的削峰填谷,將請求放到消息隊列,讓消費者以合適的速率進行消費
3.3 分布式限流
如果機器有500臺,限流100qps怎么辦?
- Sentinel提供了一種分布式限流,核心是選取一臺機器作為leader,其他機器調用的時候需要發送請求申請令牌,leader負責進行統計。但是一般建議使用,因為具備單點故障的問題,而且也不夠去中心化。
- redis實現,每一臺機器都需要訪問redis進行讀寫操作,熱key問題,并且徒增rt,不太劃算
我們在雙11大促的時候,會進行流量評估,一般不建議單機qps不能小于5,小于5的限流容易出現誤殺,不太具備現實意義。
因此如果流量負載均衡,那么建議優化為單機限流,使用sentinel or guava的RateLimiter
但是筆者有一個項目,算法提供AIGC服務,機器只有100臺,且每一臺并發度為1,服務端有500臺,但是筆者是離線定時任務調用AIGC服務,所以我使用了分布式調度map-reduce,使用調度機器分配任務到100臺服務端機器上,服務端機器單機串行從而控制qps!
總結
以上是生活随笔為你收集整理的不止八股:阿里内部语雀一些有趣的并发编程笔试题2——手写限流器的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JVM学习-程序编译与优化
- 下一篇: 【scikit-learn基础】--『监