rxjava背压_如何形象地描述RxJava中的背压和流控机制?
之前我在知乎上受邀回答過(guò)一個(gè)關(guān)于RxJava背壓(Backpressure)機(jī)制的問(wèn)題,今天我把它整理出來(lái),希望對(duì)更多的人能有幫助。
RxJava的官方文檔中對(duì)于背壓(Backpressure)機(jī)制比較系統(tǒng)的描述是下面這個(gè):
但本文的題目既然是要“形象地”描述各個(gè)機(jī)制,自然會(huì)力求表達(dá)簡(jiǎn)潔,讓人一看就懂。所以,下面我會(huì)盡量拋開(kāi)一些抽象的描述,主要采用打比方的方式來(lái)闡明我對(duì)于這些機(jī)制的理解。
首先,從大的方面說(shuō),上面這篇文檔的題目,雖然叫“Backpressure”(背壓),但卻是在講述一個(gè)更大的話題——“Flow Control”(流控)。Backpressure只是Flow Control的其中一個(gè)方案。
在RxJava中,可以通過(guò)對(duì)Observable連續(xù)調(diào)用多個(gè)Operator組成一個(gè)調(diào)用鏈,其中數(shù)據(jù)從上游向下游傳遞。當(dāng)上游發(fā)送數(shù)據(jù)的速度大于下游處理數(shù)據(jù)的速度時(shí),就需要進(jìn)行Flow Control了。
這就像小學(xué)做的那道數(shù)學(xué)題:一個(gè)水池,有一個(gè)進(jìn)水管和一個(gè)出水管。如果進(jìn)水管水流更大,過(guò)一段時(shí)間水池就會(huì)滿(溢出)。這就是沒(méi)有Flow Control導(dǎo)致的結(jié)果。
Flow Control有哪些思路呢?大概是有四種:(1) 背壓(Backpressure)。
(2) 節(jié)流(Throttling)。
(3) 打包處理。
(4) 調(diào)用棧阻塞(Callstack blocking)。
下面分別詳細(xì)介紹。
注意:目前RxJava的1.x和2.x兩個(gè)版本序列同時(shí)并存,2.x相對(duì)于1.x在接口上有很大變動(dòng),其中也包括Backpressure的部分。但是,這里要討論的Flow Control機(jī)制中的相關(guān)概念,卻都是適用的。
Flow Control的幾種思路
背壓(Backpressure)
Backpressure,也稱為Reactive Pull,就是下游需要多少(具體是通過(guò)下游的request請(qǐng)求指定需要多少),上游就發(fā)送多少。這有點(diǎn)類似于TCP里的流量控制,接收方根據(jù)自己的接收窗口的情況來(lái)控制接收速率,并通過(guò)反向的ACK包來(lái)控制發(fā)送方的發(fā)送速率。
這種方案只對(duì)于所謂的cold Observable有效。cold Observable指的是那些允許降低速率的發(fā)送源,比如兩臺(tái)機(jī)器傳一個(gè)文件,速率可大可小,即使降低到每秒幾個(gè)字節(jié),只要時(shí)間足夠長(zhǎng),還是能夠完成的。相反的例子是音視頻直播,數(shù)據(jù)速率低于某個(gè)值整個(gè)功能就沒(méi)法用了(這種就屬于hot Observable了)。
節(jié)流(Throttling)
節(jié)流(Throttling),說(shuō)白了就是丟棄。消費(fèi)不過(guò)來(lái),就處理其中一部分,剩下的丟棄。還是舉音視頻直播的例子,在下游處理不過(guò)來(lái)的時(shí)候,就需要丟棄數(shù)據(jù)包。
而至于處理哪些和丟棄哪些數(shù)據(jù),就有不同的策略。主要有三種策略:sample (也叫throttleLast)
throttleFirst
debounce (也叫throttleWithTimeout)
從細(xì)的方面分別解釋一下。
sample,采樣。類比一下音頻采樣,8kHz的音頻就是每125微秒采一個(gè)值。sample可以配置成,比如每100毫秒采樣一個(gè)值,但100毫秒內(nèi)上游可能過(guò)來(lái)很多值,選哪個(gè)值呢,就是選最后那個(gè)值。所以它也叫throttleLast。
throttleFirst跟sample類似,比如還是每100毫秒采樣一個(gè)值,但選這100毫秒內(nèi)的第一個(gè)值。在Android開(kāi)發(fā)中有時(shí)候可以把throttleFirst用作點(diǎn)擊事件的防抖動(dòng)處理,就是因?yàn)樗梢栽谥付ǖ囊欢螘r(shí)間內(nèi)處理第一個(gè)點(diǎn)擊事件(即采樣第一個(gè)值),但丟棄后面的點(diǎn)擊事件。
debounce,也叫throttleWithTimeout,名字里就包含一個(gè)例子。比如,一個(gè)網(wǎng)絡(luò)程序維護(hù)一個(gè)TCP連接,不停地收發(fā)數(shù)據(jù),但中間沒(méi)數(shù)據(jù)可以收發(fā)的時(shí)候,就有間歇。這段間歇的時(shí)間,可以稱為idle time。當(dāng)idle time超過(guò)一個(gè)預(yù)設(shè)值的時(shí)候,就算超時(shí)了(time out),這個(gè)時(shí)候可能就需要把連接斷開(kāi)了。實(shí)際上一些做server端的網(wǎng)絡(luò)程序就是這么工作的。每收發(fā)一個(gè)數(shù)據(jù)包之后,啟動(dòng)一個(gè)計(jì)時(shí)器,等待一個(gè)idle time。如果計(jì)時(shí)器到時(shí)之前,又有收發(fā)數(shù)據(jù)包的行為,那么計(jì)時(shí)器重置,等待一個(gè)新的idle time;而如果計(jì)時(shí)器時(shí)間到了,就超時(shí)了(time out),這個(gè)連接就可以關(guān)閉了。debounce的行為,跟這個(gè)非常類似,可以用它來(lái)找到那些連續(xù)的收發(fā)事件之后的idle time超時(shí)事件。換句話說(shuō),debounce可以把連續(xù)發(fā)生的事件之間的較大的間歇找出來(lái)。
打包處理
打包就是把上游來(lái)的小包裹打成大包裹,分發(fā)到下游。這樣下游需要處理的包裹的個(gè)數(shù)就減少了。RxJava中提供了兩類這樣的機(jī)制:buffer和window。
buffer和window的功能基本一樣,只是輸出格式不太一樣:buffer打包后的包裹用一個(gè)List表示,而window打包后的包裹又是一個(gè)Observable。
調(diào)用棧阻塞(Callstack blocking)
這是一種特殊情況,阻塞住整個(gè)調(diào)用棧(Callstack blocking)。之所以說(shuō)這是一種特殊情況,是因?yàn)檫@種方式只適用于整個(gè)調(diào)用鏈都在一個(gè)線程上同步執(zhí)行的情況,這要求中間的各個(gè)operator都不能啟動(dòng)新的線程。在平常使用中這種應(yīng)該是比較少見(jiàn)的,因?yàn)槲覀兘?jīng)常使用subscribeOn或observeOn來(lái)切換執(zhí)行線程,而且有些復(fù)雜的operator本身也會(huì)在內(nèi)部啟動(dòng)新的線程來(lái)處理。另外,如果真的出現(xiàn)了完全同步的調(diào)用鏈,前面的另外三種Flow Control思路仍然可能是適用的,只不過(guò)這種阻塞的方式更簡(jiǎn)單,不需要額外的支持。
這里舉個(gè)例子把調(diào)用棧阻塞和前面的Backpressure比較一下?!罢{(diào)用棧阻塞”相當(dāng)于很多車行駛在盤山公路上,而公路只有一條車道。那么排在最前面的第一輛車就擋住了整條路,后面的車也只能排在后面。而“Backpressure”相當(dāng)于銀行辦業(yè)務(wù)時(shí)的窗口叫號(hào),窗口主動(dòng)叫某個(gè)號(hào)過(guò)去(相當(dāng)于請(qǐng)求),那個(gè)人才過(guò)去辦理。
如何讓Observable支持Backpressure?
在RxJava 1.x中,有些Observable是支持Backpressure的,而有些不支持。但不支持Backpressure的Observable可以通過(guò)一些operator來(lái)轉(zhuǎn)化成支持Backpressure的Observable。這些operator包括:onBackpressureBuffer
onBackpressureDrop
onBackpressureLatest
onBackpressureBlock(已過(guò)期)
它們轉(zhuǎn)化成的Observable分別具有不同的Backpressure策略。
而在RxJava 2.x中,Observable不再支持Backpressure,而是改用Flowable來(lái)專門支持Backpressure。上面提到的四種operator的前三種分別對(duì)應(yīng)Flowable的三種Backpressure策略:BackpressureStrategy.BUFFER
BackpressureStrategy.DROP
BackpressureStrategy.LATEST
onBackpressureBuffer是不丟棄數(shù)據(jù)的處理方式。把上游收到的全部緩存下來(lái),等下游來(lái)請(qǐng)求再發(fā)給下游。相當(dāng)于一個(gè)水庫(kù)。但上游太快,水庫(kù)(buffer)就會(huì)溢出。
onBackpressureDrop和onBackpressureLatest比較類似,都會(huì)丟棄數(shù)據(jù)。這兩種策略相當(dāng)于一種令牌機(jī)制(或者配額機(jī)制),下游通過(guò)request請(qǐng)求產(chǎn)生令牌(配額)給上游,上游接到多少令牌,就給下游發(fā)送多少數(shù)據(jù)。當(dāng)令牌數(shù)消耗到0的時(shí)候,上游開(kāi)始丟棄數(shù)據(jù)。但這兩種策略在令牌數(shù)為0的時(shí)候有一點(diǎn)微妙的區(qū)別:onBackpressureDrop直接丟棄數(shù)據(jù),不緩存任何數(shù)據(jù);而onBackpressureLatest則緩存最新的一條數(shù)據(jù),這樣當(dāng)上游接到新令牌的時(shí)候,它就先把緩存的上一條“最新”數(shù)據(jù)發(fā)送給下游??梢越Y(jié)合下面兩幅圖來(lái)理解。
onBackpressureBlock是看下游有沒(méi)有需求,有需求就發(fā)給下游,下游沒(méi)有需求,不丟棄,但試圖堵住上游的入口(能不能真堵得住還得看上游的情況了),自己并不緩存。這種策略已經(jīng)廢棄不用。
本文重點(diǎn)在于以宏觀的角度來(lái)描述和對(duì)比RxJava中的Flow Control機(jī)制和Backpressure的各種機(jī)制,很多細(xì)節(jié)沒(méi)有涉及。比如,buffer和window除了能把一段時(shí)間內(nèi)收到的數(shù)據(jù)打包,還能把固定數(shù)量的數(shù)據(jù)進(jìn)行打包。再比如,onBackpressureDrop和onBackpressureLatest在一次收到下游多條數(shù)據(jù)的請(qǐng)求時(shí)分別會(huì)如何表現(xiàn),本文沒(méi)有詳細(xì)說(shuō)明。大家可以查閱相應(yīng)的API Reference來(lái)獲得答案,也歡迎留言與我一起討論。
總結(jié)
以上是生活随笔為你收集整理的rxjava背压_如何形象地描述RxJava中的背压和流控机制?的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 可怕的乖孩子_小说《可怕的乖孩子》讲了一
- 下一篇: java 带宽控制_如何使用Java n