压箱底总结:流系统端到端一致性对比
點擊上方“朱小廝的博客”,選擇“設為星標”
回復”666“獲取公眾號專屬資料
分布式最難的2個問題:
1、Exactly Once Message processing。
2、保證消息處理順序。
我們今天著重來討論一下:
為什么很難;
怎么解。
前言
這篇文章可以說是作者壓箱底兒的知識總結(之一,畢竟作者學的東西很雜 ╮( ̄▽ ̄"")╭ )了,斷斷續續寫了將近三個月, 耗費了大量的精力。
本來的目的只是想對比一下各個state of art的流系統有什么不同, 但是寫出來之后只是亂糟糟的羅列數據和資料,像這樣列數據一樣,“介紹下這個framework這樣實現的,所以有這樣的特性”,“那個是那樣的”.... blablabla... 使得文章只停留于表層,這樣寫并不是一個好的筆記。
我想記錄的是更本質和更精髓的一些東西:我想更深入的探討一個分布式系統的設計是被什么現實和本質問題所逼迫的結果, 2個不同的設計到底是在哪個本質問題上分道揚鑣才造成了系統設計如此的不同。
追尋問題和解法的前因后果,讓未來的自己一眼掃過去之后能夠自然而然的回憶和理解出: "嗯,對,在這種現實限制下,要達到這種效果就必須這么做", 我想從亂糟糟的觀察到的混亂表象中,抽象出流系統所面對的問題和解決方案的本質,這就是本文的目的,和文章中除開引用文獻之外,作者所貢獻的一些自己的思考。
就作者學習流系統的感受來看, 流系統有2個難點:
第一是end to end consistency,或者說exactly once msg processing;
第二是event time based window操作。
本來作者想用一篇文章同時概括和比較這2點,無奈第一點寫完, 文章已經長度爆炸。于是分開2篇,此為上篇, 著重于從分布式系統的本質問題出發, 從最底層的各種"不可能", 和它們的解(比如:consensus協議)開始, 一層一層的遞進到高層的流系統中, 如何實現容錯場景下的end to end consistency,或者說exactly once msg processing。
目錄
流系統的具體對比在“9.流系統的EOMP”這一節, 前邊都是準備知識... -_-
1、一些術語
2、圣光(廣告)不會告訴你的事
3、幾個事實
4、Liveness和Safety的取舍
5、絕望中的曙光
6、Zombie Fencing
7、三節點間的EOMP
8、加入節點狀態的三節點間的EOMP
9、流系統的EOMP
10、異步增量checkpointing
11、系統內與系統外
12、Latency, 冪等和non-deterministic
13、REFERENCE
一些術語
1、端到端一致性end to end Consistency
一致性其實就是業務正確性, 在不同的業務場景有不同的意思, 在"流系統中間件"這個業務領域, 端到端的一致性就代表Exact once msg processing, 一個消息只被處理一次,造成一次效果。
注意: 這里的"一個消息"代表"邏輯上的一個", 即application對中間件的期待就是把此消息作為一個來處理, 而不是指消息本身的值相等。比如要求計數+1的一個消息, 消息本身的內容可能一模一樣, 但是application發來2次相同消息的"本意"就是要計數兩次, 那么中間件就應該處理兩次, 如果application由于超時重發了本意只想讓中間件處理一次的+1操作, 那么中間件就應該處理一次。
中間件怎么能區分application的"本意"來決策到底處理一次還是多次, 是end to end consistency的關鍵。
2、EOMP
由于Exactly once msg processing太經常出現, 我們用EOMP來代替簡寫一下。
3、容錯failure tolerance
為了方便討論,后邊談到failure, 我們指的都是crash failure, 你可以想象是任何可以造成“把機器砸了然后任何本地狀態丟失(比如硬盤損壞)一樣效果的情況出現"。
在今天的虛擬云時代,這其實很常見,比如container或者虛擬機被resource manager突然kill掉回收了, 那么即使物理機其實沒有問題, 你的application的邏輯節點也是被完全銷毀的樣子。
容錯在end to end Consistency的語義下,是指在機器掛了,網絡鏈接斷開...等情況下,系統的運算結果和沒有任何failure發生時是一摸一樣的。
3、Effective once msg processing(應該翻成有效一次性處理?)
后邊我們可以看到, 保證字面上的Exact once msg processing(即整個系統在物理意義上真的只對消息處理一次), 這在需要考慮容錯的情況下是不可能做到的。
Effective once msg processing是一個更恰當的形容,而所有號稱可以做到EOMP的系統,其實都只是能做到Effective once msg processing。即:中間件, 或者說流處理framework可能在failure發生的情況下處理了多次同一個消息,但是最終的系統計算結果和沒有任何failure時, 一個消息真的只處理了一次時計算的結果相等。這和冪等息息相關。
4、冪等Idempotent
一個相同的操作, 無論重復多少次, 造成的效果都和只操作一次相等。比如更新一個keyValue, 無論你update多少次, 只要key和value不變,那么效果是一樣的。再比如更新計數器處理一次消息就計數器+1, 這個操作本身不冪等, 同一個消息被中間件重"發+收"兩次就會造成計數器統計兩次。
而如果我們的消息有id, 那么更新計數器的邏輯修改為, 把處理過的消息的id全記錄起來, 接到消息先查重, 然后才更新計數器, 那么這個"更新計數器的邏輯"就變成冪等操作了。
把本不冪等的操作轉化為冪等操作是end to end consistency的關鍵之一。
5、確定性計算deterministic
和冪等有些類似, 不過是針對一個計算,相同的input必得到相同的output, 則是一個確定性(deterministic)。
比如從一個msg里計算出一個key和一個value, 如果對同一個消息運算無數次得到的key和value都相同, 那么這個計算就是deterministic的;而如果key里加上一個當前的時鐘的字符串表示, 那么這個計算就不是確定性的, 因為如果重新計算一次這個msg, 得到的是完全不同的key。
注意1: 非確定性計算一般會導致不冪等的操作, 比如我們如果要把上邊例子里的keyvalue存在數據庫里, 重復處理多少次同一個msg, 我們就會重復的插入多少條數據(因為key里的時間戳字符串不同)。
注意2: 非確定性計算并非必然導致不冪等的操作,比如這個時間戳沒有添加在key里而是添加在value里, 且key總是相同的, 那么這個計算還是"非確定性"計算。但是當我們存數據的時候先查重才存keyvalue, 那么無論我們重復處理多少次同一個msg, 我們也只會成功存入第一個keyValue, 之后的keyValue都會被過濾掉。
支持非確定業務計算的同時, 還能在容錯的情況下達成端到端一致性, 是流系統的大難題, 甚至我們今天會提到的幾個state of art的流系統都未必完全支持。(好吧Spark說的就是你)
圣光(廣告)不會告訴你的事
分布式系統最tricky的問題就是, 問題看起來很普通很簡單。一些問題總是看起來有簡單直接的解法,而一個"簡單解"被人查出問題時,也總是看起來可以很簡單的就可以把這個挑出的edge case很簡單的解決掉。
然而我們會立刻發現解決這個edge case而引入的新步驟會引發新的問題... 如此循環, 直到"簡單"疊加到"無法解決的復雜"。
由于人們對這些問題的"預期是簡單的", 所以很多書, online doc, 都大大簡化了對問題的描述和對問題的分析。最普遍的是對failure recovery的介紹, 一般只會簡單的寫"failure發生時, 系統會怎么recovery", 但是完全不提怎么檢測failure和“根本不可能完美檢測到failure”這個分布式系統的基本事實, 從而給了讀者“failure可以完美檢測”的錯覺。
這是因為一來說清楚各種edge case會大大增加文檔的復雜性, 另外一點是寫了讀者可能也看不明白, 還有就是廣告效應, 比如真正字面意義的exactly once msg processing是不存在的, 但是所有其他做到effective once msg processing的系統都說自己可以支持exactly once, 那自己也得打這個廣告不是。
還有就是語焉不詳, 比如某stream系統說自己可以實現exactly once msg delivery, 別看delivery和processing好像差不多, 這里邊的用詞藝術就有意思了,delivery是指消息只在stream里出現一次, 但是在stream里只出現一次的消息卻無法保證只被consume一次確根本不提。
再比如某serverless產品處理某stream的消息, 描述是保證舊的消息沒有處理之前不會處理新消息, 你會想, 簡單描述成保證消息按順序處理不是一樣么? 其實差大了去了, 前者并沒有屏蔽掉舊消息突然replay, 覆蓋掉新消息的處理結果的edge case。而這個事實甚至顛覆了很多使用這個服務的Sr. SDE的對其的認知。
沒有理解分布式系統的幾個簡單的本質問題之前, 你讀文檔的理解很有可能和文檔真正精準定義的事實不符。且讀者對“系統保證”的理解, 往往會由于文檔"藝術"定義的誤導, 而過多的假設系統保證的"強", 直到被坑了去尋根問底, 才會收到"你誤讀了文檔的哪里的詳細解釋"。這是分布式系統"最難的地方在最普通的地方"的直接結果之一。
個人認為最好的辦法就是去理解分布式系統軟件算法所能達到的上限=>關于各種impossibility的結論的論文,然后去學習克服他們的方法的論文。
這樣, 我們才能從各種簡化了的 tutorials里, 從API中, 從各種云服務, 框架的廣告詞背后, 發現“圣光不會告訴你的事", 和"這個世界的真相";(從廣告和online doc天花亂墜的描述中看到分布式系統設計真正的取舍, 這是區分API調包俠和分布式系統專家的分水嶺之一)。
而不是“簡單的信了它們的邪”。而下邊,就是學習分布式系統,你所需要了解的最重要事實中, 和end to end consistency相關的幾個。
幾個事實
1、不存在完美的failure detector
很多關于分布式系統的書上都會說,當failure發生時系統應該怎么做來容錯, 就好像可以準確的檢測到failure一樣。然而事實是, 在目前互聯網的物理實現上(share nothing architecture, 只靠網絡互聯,不直接共享其他比如內存物理硬盤等),我們無法準確的檢測到failure。
簡單來說,就是當我們發現一個node無反應的時候,比如ping它,給它發消息、request、查詢,都沒有反應,我們無法知道這到底是對方已經停止工作了,還是只是處理的很慢而已。
無法制造完美的failure detector,即使在今天也是分布式系統的基礎事實。本文無意在基礎事實上多費唇舌, 無法接受此事實者可以去翻相關論文。╮( ̄▽ ̄"")╭
Essentially, the impossibility results for Consensus and Atomic Broadcast stem from the inherent difficulty of determining whether a process has actually crashed or is>The fundamental reason why Consensus cannot be solved in completely asynchronous systems is the fact that, in such systems, it is impossible to reliably distinguish a process that has crashed from>A fundamental problem in distributed systems is that network partitions (split brain scenarios) and machine crashes are indistinguishable for the observer, i.e. a node can observe that there is a problem with another node, but it cannot tell if it has crashed and will never be available again or if there is a network issue that might or might not heal again after a while. Temporary and permanent failures are indistinguishable because decisions must be made in finite time, and there always exists a temporary failure that lasts longer than the time limit for the decision…
A third type of problem is if a process is unresponsive, e.g. because of overload, CPU starvation or long garbage collection pauses. This is also indistinguishable from network partitions and crashes. The only signal we have for decision is “no reply in given time for heartbeats” and this means that phenomena causing delays or lost heartbeats are indistinguishable from each other and must be handled in the same way.[29]
2、不存在完美的failure detector, 所導致的幾個顛覆你認知的問題
1)分布式共識問題"Consensus"在"不存在完美的Failure Detector的情況下"不可解, 這又叫做FLP impossibility[36]。
可以說是上世紀,奠定分布式系統研究基石方向的一篇論文。即: 在理論上, 在分布式環境里(更準確說應該是異步環境里), 在最多可能出現一個crash failure的強假設下, 不存在任何一個算法可以保證系統里的所有的"正常"節點對某一信息有共識。
對于"共識"你可以理解為一個數據一摸一樣的備份在多個節點上。(那么paxos, raft這種consensus協議是怎么回事呢? 稍后會解釋)
2)在分布式環境, 連分配只增序列號這件事都很難(即不同的進程去向一個系統申請序列號, 從0開始不斷增加, 保證process得到的序列號不能重復)。
因為本質上這是一個consensus問題, 后邊可以看到, 能夠分配高可用性的global序列號(epoch id), 是解決zombie leader/master/processor的問題時的一大助力。
3)在保證liveness的情況下(即檢測到失敗就在另外的機器重啟邏輯節點), 無法保證系統中的Singleton角色“在同一時間點”只有一個。
比如在有leader概念的分布式系統里, 要求任意時間點只有一個leader做決定, 比如HBase需要只能有一個Region Server負責某region的寫操作; 再比如kafka或者Kinesis[22]里需要只能有一個partition processor接受一個stream partition的信息并且采取行動。
而事實是, 任何云服務和現有實現, 都無法在物理上保證“在同一時間點”, 真的只有一個這樣的邏輯角色存在于機群中; 這就牽涉到一個概念=> Zombie Process。
4)Zombie process
由于沒有完美的failure detector, 所以即使幾率再低, 只要時間夠長, 需要failure detection的用例夠多, 系統不可避免會錯誤的判斷把一個并沒有真正crash掉的process當作死掉了。
而如果系統需要保持高可用性,需要在檢測到crash的時候,在新的機器上啟動此process繼續處理,那么當failure detector出錯,則會發生新老process共同工作的問題,此時,這個老的process就是zombie process。
嚴重注意,在分布式系統里,我們需要單一責任的一個節點/processor/role來做決策或者處理信息時,我們要么不保護系統的高可用性(機器掛了就停止服務),要么解決zombie process會帶來的問題。
高可用性的系統中, zombie無法消除。這關系到分布式系統設計里的一個核心問題:liveness和safety的取舍。
Liveness和Safety的取舍
1、在缺乏完美的failure detector的情況下, 對方遲遲不回信息(ping它也不回), 不發heartbeat, 那么本機只有2個選擇:?
認為對方還沒有crash, 持續等待;
認為其crash掉了, 進行failover處理。
選擇1傷害系統的liveness, 因為如果對方真的掛了,我們會無限等待下去, 系統或者計算就無法進行下去。選擇2傷害系統的safety, 因為如果對方其實沒有crash, 那我們就需要處理可能出現的重發去重, 或者zombie問題, 即系統的邏輯節點的“角色唯一性“就會被破壞掉了。
越好的liveness要求越快的響應速度, 而“100%的safety“的意義, 則因系統的具體功能的不同而不同, 但一般都要求系統做決定要小心謹慎, 不能放過一個edge case, 窮盡所有必要的檢查來保證"系統不允許出現的行為絕對不會發生"。在consensus的語義下來說, safety就是絕對不能向外發出不一致的決定(比如向A說決定是X, 后來向B說決定是Y)。
可以看到, 系統的edge case越多, safety越難保證, 而edge cases的全集只是可能發生的情況的集合, 而某一次運行只會發生一種情況(且大概率是正常情況)。
如果系統不檢查最難分辨最耗時的幾種小概率發生的edge case, 那么系統大概率(甚至極大概率)也可以完美運轉毫無問題幾個月, 運氣好甚至幾年。這樣降低了系統的safety(不再是100%), 但是提高了系統的響應速度(由于是概率上會出問題, 所以即使降低了safety保證, 也不是說就一定會出問題, 只是你把系統的正確性交給了運氣和命運)。
而如果系統保證檢查所有的edge case, 但是系統99.9999%的概率都不會進入一些edge cases, 那么這些檢查就會阻礙正常情況的運算速度。Liveness和Safety, 這是分布式系統設計的最基本取舍之一。
而FLP則干脆說: 在分布式consensus這個問題里, 如果你想要獲得100%的系統safety, 那么你絕對無法保證系統liveness, 即:系統總是存在活鎖的可能性, 算法設計只能減小這個可能性, 而無法絕對消除它。
2、更多的safety VS. liveness 取舍的例子:
Kubernetes StatefulSet, 簡單說是可以給容器(pod/container)指定一個名字的, 且保證全cluster總是只有一個容器可以有這個名字, 這樣application就可以通過這個保證來指定機群中的邏輯角色, 且用這個邏輯容器中保存一些狀態。(一般的replicaSet會load balance連接或請求到背后不同的節點, 你的一個請求要求在server本地存一些狀態, 下一個請求未必還會到同一個server)
When a stateful pod instance dies (or the node it’s running on fails), the pod instance needs to be resurrected on another node, but the new instance needs to get the same name, network identity, and state as the one it’s replacing. This is what happens when the pods are managed through a StatefulSet. [37]
Kubernetes StatefulSet在liveness和safety里選擇了safety, 當statefulSet所在的的物理節點"掛了"之后, kubernetes默認不會重啟這個pod到其他節點去, 因為它無法確定這個物理節點到底死沒死, 為了保證safety它選擇放棄了liveness, 即系統無法自愈, StatefulSet提供所提供的服務不可用, 直到靠人干預來解決問題。
([38] P305: 10.5. Understanding how StatefulSets deal with node failures)?
Node fail cause daemon of Kubelet could not tell state of pod on the node….StatefulSet guarantees that there will never be two pods running with the same identity and storage...
Akka Cluster也做了相同的選擇, 在cluster membership管理中,有一個auto-downing的配置, 如果你打開它, 那么cluster就會完全相信Akka的failure detection而自動把unreachable的機器從cluster中刪去, 這意味著一些在這個unreachable節點上的Actor會自動在其他節點重啟。
Akka Cluster的文檔中, auto-downing是強烈不推薦使用的[38], 這是由于Akka Cluster提供的很多feature要求角色的絕對單一性, 比如singleton role這個功能, 在保證“cluster里只有這一個節點扮演這個actor"(safety), 和保證"cluster里總要有一個節點扮演這個actor"(liveness) 中, 選擇了safety, 即保證at most one actor存在于cluster中, 一旦次actor的節點變成unreachable(比如機器真的掛了), 那么Akka也無能為力, 只能傻等這個節點回來或者人來干預決策:
The default setting in Akka Cluster is to not remove unreachable nodes automatically and the recommendation is that the decision of what to do should be taken by a human operator or an external monitoring system. [29]
一個商用的Akka拓展(Akka Split Brain Resolver)提供了一些智能點的解決方案(基于quorum), 有興趣的同學可以看引用文檔[29]。
This is a feature of the Lightbend Reactive Platform. that is exclusively available for Lightbend customers.[29]
3、為什么Kubernetes和Akka不能同時保證safety和liveness呢?
這是因為這兩個作為比較底層的平臺, 他們需要對上層提供非常大的自由性, 而不能限制上層的活動。比如kubernetes沒有規定用戶不能在pod上跑某種程序, Akka也沒有規定用戶不能寫某種actor的code。
這樣, 在不限制自己處理能力的同時要保證任何行為都看起來exactly happen once(因為語義上singleton節點只有一個, 那么就不能讓用戶寫的任意單線程程序出現多節點平行執行的外部效果), 而這對中間件來說是不可能的, 這就引出了另外一篇論文: end to end argument[27], 作者已經寫過一篇文章詳細介紹end to end argument(阿萊克西斯:End to End Argument(可能是最重要的系統設計論文)), 這里不在贅述。
后邊我們可以看到Flink, Spark等流系統為了保證exactly once msg processing需要怎樣和end to end argument 搏斗。
4、可以同時保證safety和liveness么?
取決于具體情況下對safety和liveness的具體要求, 在流處理的情況下, 至少本文提到的4種流系統都給出了自己的解。請耐心往下閱讀。
絕望中的曙光
1、可解也不可解的分布式consensus
由于異步環境下, 釘死了我們不可能有一個完美不犯錯的failure detector。這篇著名的論文Unreliable Failure Detectors for Reliable Distributed Systems[30] 詳細描述了即使我們用一個不準確的failure detector, 也可以解決consensus的方法。
但是它并沒有推翻FLP impossibility的結論:Consensus還是并非絕對可解。但是, 如果我們對需要consensus的計算加一個限制,則Consensus可解。
這個限制是: 計算和通訊只需要在"安全時間"內完成即可, 對[30]提供的算法來講, 提供consensus的系統需要在這段時間內"正確識別crash"即可,也就是說(1)識別出真正掛掉的node, 和(2)不要懷疑正確的node。
怎么理解呢, 這兩個看似對立的概念:?
(1)consensus的有解(比如paxos協議)是對的;
(2)consensus的無解證明:FLP impossibility也是對的。
要準確且簡單的解釋為什么它們都是對的有點難, 推薦還是看論文。但是用比喻來解釋的話, 根據[30], Consensus算法可以看作這樣一個東西, 當系統出現crash, failure detector判斷錯誤,或者網絡突然延遲...等時候, 算法會進入某種循環而不會輕易作出決定。
for example, there is a crash that no process ever detects, and all correct processes are repeatedly (and forever) falsely suspected — the application may lose liveness but not safety [31]
而只要滿足必要的條件時(計算和通訊只需要在"安全時間"內完成), 系統則可以跳出循環讓機群達成一致[30,31]。
(1) There is a time after which every process that crashes is always suspected by some correct process.?
(2) There is a time after which some correct process is never suspected by any correct process.?
The two properties of <>W0 state that eventually something must hold forever; this may appear too strong a requirement to implement in practice. However, when solving a problem that “terminates,” such as Consensus, it is not really required that the properties hold forever, but merely that they hold for a sufficiently long time, that is, long enough for the algorithm that uses the failure detector to achieve its goal.
而FLP impossibility則可以理解為挑刺兒的說, 那這個條件永遠無法出現呢? 你的算法就活鎖了呀(丟失liveness)。
幸運的是, 在現實世界, 我們總可以對消息傳遞和處理來估計一個上限, 你可以理解為,只要消息處理總是在這個上限之內完成,那么consensus總是可以實現, 而消息處理的時間即使偶爾超過了這個上限, 我們的consensus協議也會進入安全循環自我保護, 從而不會破壞系統的safety, 而系統總是可以再次回歸平穩(處理時間回歸上限之內)。
而FLP則是像說: 你無法證明系統總是可以回歸平穩 (確實無法證明, 因為FLP的前提是異步模型, 而我們的真實世界更像是介于異步和同步模型之間的半同步模型, 我們只能說極大概率系統可以"回歸平穩", 而無法證明它的絕對保證; =>可以絕對保證"上限"的模型一般稱為同步模型)。
其實用paxos來模擬出FLP的活鎖的例子也很簡單, 你把節點間對leader的heartbeat timeout時間設為0.001ms, 那么所有的節點都會忙著說服別的其他節點自己才是leader(因為太短的保活時間, 除了自己, 節點總是會認為其他的任意節點是leader時, leader死了), 那么系統就會進入活鎖, 永遠無法前進達成cluster內的consensus, 系統喪失liveness。
Zombie Fencing
即使consensus問題解決了, zombie節點也還是大問題, kubernetes和Akka可以選擇避開zombie, 損失liveness。
然而對于絕大多數分布式系統來說, 是必須直面zombie節點這個問題的,比如各種分布式系統的master節點, 如果master掛了整個系統不在另外的機器重啟master,整個系統就可能變為不可用。
再比如kafka和Kinesis的單一partition只能有一個consumer, 如果這個msg consumer掛了不自動重啟, 對消息的處理就會完全停止。
zombie是最容易被忽視的問題, 比如, 即使我們有了paxos, raft, zookeeper這種consensus工具可以幫我們做leader election, 也不要以為你的系統中不會同時有2個leader做決策了。
這是因為先一代的leader可能突然失去任何對外通信,或者cpu資源被其他進程吃光, 或者各種edge case影響, 使得其他節點無法和其通信, 新的leader被選出, 而老的leader其實還沒死, 如果老的leader在失去cpu之前的最后一件事是去寫只有leader才能寫的數據庫, 那么當它突然獲得cpu時間且網絡恢復正常, 那么這個以為自己還是leader的zombie leader就會出乎意料的去寫數據庫。
這曾經是HBase的一個重大bug[39, Leader Election and External Resources P105]。
Apache HBase, one of the early adopters of ZooKeeper, ran into this problem in the field. HBase has region servers that manage regions of a database table. The data is stored on a distributed file system, HDFS, and a region server has exclusive access to the files that correspond to its region. HBase ensures that only one region server is active at a time for a particular region by using leader election through ZooKeeper...?
The region server is written in Java and has a large memory footprint. When available memory starts getting low, Java starts periodically running garbage collection to find memory no longer in use and free it for future allocations. Unfortunately, when collecting lots of memory, a long stop-the-world garbage collection cycle will occasionally run, pausing the process for extended periods of time. The HBase community found that sometimes this stop-the-world time period would be tens of seconds, which would cause ZooKeeper to consider the region server as dead. When the garbage collection finished and the region server continued processing, sometimes the first thing it would do would be to make changes to the distributed file system. This would end up corrupting data being managed by the new region server that had replaced the region server that had been given up for dead.
(解釋不動了, 大家看英文吧...)
其實對付zombie已經是分布式系統的共識了,也有很多標準的解法,以至于各個論文都不會太仔細的去描述, 這里簡單介紹幾種方法:
zombie fencing設計的關鍵點在于如何阻止已經“成為zombie的自己”搞亂正常的“下一代的自己”的狀態。畢竟無論是zombie還是新的要取代可能死掉的上一代的"下一代", 大家跑的都是相同的代碼邏輯,也就是說這同一段代碼, zombie來跑就"不能過:"(比如不能對系統的狀態造成影響), 但是"下一代"來跑, 就可以正常工作。這一般需要滿足以下幾點:
1)如何正確區分“正常的下一代”(由于懷疑當前的節點已經死掉了,所以重新創建和啟動的新一代邏輯節點)和“zombie”(懷疑錯誤,當前節點并沒有死掉,但是新一代節點已經創建并啟動,當前節點成為大家都以為死掉但是還活著的zombie)一般需要一個多機復制且穩定自增的epoch number來確定新老邏輯節點。
這個epoch number要在分布式環境中穩定自增,一般只能通過consensus協議來實現。否則要分配新一代epoch number時,由于管理epoch number的機群的failover造成分配了一個老的epoch number給新啟動的“下一代”,那么zombie反而會有更大的epoch number,這就會造成整個系統的狀態混亂。
怎樣的混亂在介紹完zombie fencing之后就顯而易見了(因為所有其他節點都以為zombie死掉了, 把所有的最新操作和狀態發給新節點,但是新節點卻有一個比zombie還小的epoch number, 從而被zombie fence掉, 而不是自己可以fence zombie)
2)會被zombie影響的系統需要特殊設計使得:當“新一代”注冊后就拒絕“老一代的任何請求”,特別是寫入請求。也就是具體的利用epoch number的zombie fencing的實現; 一般需要具體情況具體分析。
如果被影響的系統是自己的一個microservice,那么可以隨意設計協議來驗證一個請求所攜帶的epoch number是不是最新的。而當這個被影響的系統是一個外部系統, 比如是業務系統需要用到的一個數據庫,由于你沒法改數據庫的代碼和數據庫client與server之間的協議, 那么就要利數據庫所提供的功能或者說它的協議來設計application層級的zombie fencing協議。
比如對提供test and set,compare and swap的kv數據庫來說,application設計自己的業務表時,要求所有的表都必須有一個epoch字段,而所有的寫入都必須用test and set操作來檢測當前epoch字段是否比要寫入的請求的epoch字段大或相等, 否則拒絕寫入。這樣, 只有"下一代"可以更改zombie寫入的數據, 而zombie永遠無法更改"下一代"插入或者更新過的數據。
另一方面,很多時候"下一代"需要讀取上一代的信息,繼承上一代的數據,然后繼續上一代的工作。那么如果剛讀取完數據,zombie就改變了數據,那么"下一代"對于當前系統狀態的判斷就會出差錯。
一個general的解決的方法也很簡單,要讀先寫,“下一代”開始工作前, 如果要先讀入數據了解“系統的當前狀態”,必須先改變數據的epoch number為自己的epoch number(當然要遵從只增更改原則test and set,如果發現當前數據的epoch number已經比自己的epoch number還大了,則說明自己也已經是zombie了,更新的"下下一代"已經開始工作), 更改數據的epoch number成功之后,再讀入數據,就可以保證比自己老的zombie絕對不可能再更改這個數據,而現在讀入的數據可以體現系統的最新狀態,從而完成對"老一代"數據的繼承。
而在增加epoch number之前所有被寫入的數據。這里即使是"新一代"啟動之后, 讀取系統狀態之前被zombie寫入的數據, 都可以看做老一代的合法數據,只要被新一代開始工作前繼承讀入即可。我們所要避免的是"新一代" 所讀取的事實被zombie所更改。而不是在物理時間的意義上在"新一代"啟動時就立刻阻止zombie的所有系統改動。
zombie fencing的設計取決于分布式系統的具體情況,比如業務邏輯可能更改的數據范圍可能是幾百萬幾千萬的數據記錄,那么這也意味著zombie可能會修改的數據范圍非常大,那么要求"下一代"在開始工作前更改所有數據的epoch number就很不現實。
對于zombie的影響的耐受性也會影響zombie fencing的設計,比如如果"下一代"只需要自己所接觸的有限數據在特定時刻之后不被zombie影響就能正確工作, 那么只要在"下一代"需要接觸特定數據時才更改此數據的epoch number來屏蔽zombie即可,那么即使業務可能修改的數據范圍很大,簡單的更改數據的epoch number也還是可以接受的解決方案。
最糟糕的情況,如果"zombie"可能會插入新的數據, 而"下一代"的正常工作需要不能有非法的新數據插入(比如下一代開始工作前先統計所有資源的個數,然后開始基于這個事實和"只有自己才能更改資源"的假設,作出各種決策。
而此時zombie突然插入了一條新資源記錄或者資源使用記錄...),如果"新一代"完全無法預測zombie會插入什么記錄,要阻止zombie隨意插入數據,“新一代”就只能在利用predicate lock來防止新紀錄插入,且不說很多數據庫根本不支持“鎖住不存在的數據”的predicate lock,就算支持此功能的數據庫也很有可能是使用表級鎖來鎖住整張表。
如果數據表設計成了需要共享給多個節點使用(比如一張資源表,不同的singleton worker負責維護不同的資源范圍),那么表級鎖就會妨礙其他worker的工作。
zombie fencing的設計在于如何引入簡單的fencing點, 對"新一代"暢通無阻,但是卻可以阻止zombie的異常活動, 如果協議設計使得"新一代"可以很容易制造這個fencing點, 則"新一代"在啟動或者需要的時候加入fencing點即可。
比如前邊說的任何數據都要附帶一個epoch值,任何數據寫入都要用test and set來對比數據的當前epoch值和請求的epoch值。
對于上文的隨機插入的業務需求, 可以要求業務邏輯插入任何數據之前,先在一個注冊表的屬于自己epoch的一行里記錄自己要寫的數據的id, 且在記錄的時候用test and set來檢測自己這一行數據的active值是否被更改為disable了。
這樣就相當于引入了一個更簡單的fencing點,因為"下一代"只要在注冊表里把所有上一代的記錄寫為disable, 就可以阻止zombie的未來任何活動,但是此時無法阻止zombie的最后一個注冊的數據插入, 但是"下一代"可以簡單的讀注冊表得知這個數據的id, 從而對這個"最后的zombie寫入"采取相應的策略(繼承,刪除, 甚至fencing, 比如這個id并不存在,那么無法得知是zombie真的在寫之前死了所以永遠不會插入這個記錄了,還是zombie只是卡了, 那么"下一代"可以用自己的epoch和zombie注冊的id先插入一條記錄來占位,這樣無論zombie是真的死了還是卡了,都無法再寫入這個數據了)。這樣,我們就引入了一個連數據插入都可以fencing的fencing點。
Zombie fencing一般都是以上這些套路, 用consensus協議確定epoch number區分"下一代"還是zombie,這個epoch number一般也可以稱為fencing token, 通過把fencing token分發給需要拒絕zombie的service,把fencing token和需要保護的數據(防止被zombie修改)存在一起。
所以一般論文[7, 26]里只會簡單的提到epoch或者sequencer等概念,基本都是zombie fencing的fencing token。
三節點間的EOMP
三點為 (上游/input提供端)=> (當前計算節點/計算結果發送端) =>(下游/計算接收端)
如果我們考慮必須保證系統的高可用性,即檢測到任意process的failure,都會由一個(絕對不死)的高可用的JobManager或者MasterNode,來重啟(可能在另外的node)這個process, 所以我們定義這種即使所在的host掛掉, 也會不斷重新在其他host上重啟的process為邏輯process。這時我們要面臨幾種可能造成inconsistent的情況:
"計算接受端"沒有成功ack"計算結果發送端"的消息,一般表現為發送端的等待ack 超時。根據之前的討論,接收端有可能把消息處理完畢了(ack的消息丟失,或者剛處理完消息還沒發ack就掛了…等情況),也可能沒有處理完畢(沒接到或剛接到消息就掛了…等情況)。
這種情況發送端可以重發信息, 而發送端是需要“上游input提供端”提供某種數據然后進行某種計算后產生的這個消息/計算結果(設為outputA), 那么"計算結果發送端"有兩個策略:
策略1: 利用存儲計算結果來盡量避免重算
要實現上下游exact once processing,需要實現4個條件:
結果高可用;
下游去重;
上游可以replay;
記錄上游進度。
1)要求結果高可用, 應對timeout時, “下游計算接收端”其實并沒有成功處理"計算結果發送端"的計算結果的情況(比如下游掛了), 這時"計算結果發送端"可以把計算的結果存儲在高可用的DataStore里(比如DynamoDB,Spanner…或者自己維護的多備份數據庫)。
這樣超時只要重發這個計算結果即可, 自己甚至可以開始去做別的事情, 比如處理和計算下一個來自“上游/input提供端“的event, 而已經被“下游計算接收端”ack的"計算結果"則可以清理,一般由異步的garbage collection清理掉。
注意, 由于存在存儲失敗的可能性, 或者剛計算完結果還沒來得及存儲就掛掉重啟的可能,我們無法真的保證避免重算,詳見:無法避免的重算的例子。
2)下游去重,應對timeout時下游其實已經處理完畢消息的情況
①一般的解決方案:當邏輯接收端不固定, 比如發送端要根據計算出來的outputA的某key字段把不同的key發送給負責不同key range(也就是partition)的多個"下游計算接收端"。
只需要一個sequenceId就可以實現接收端的消息去重。接收端和發送端各維護一個partition level的sequenceId即可。這樣當發送端收到當前message sequenceId(假設為n)的Ack才發下一個sequenceId為n+1的信息,否則就無限重試。而接收端則根據收到的消息的id是不是已經處理過的最大id+1來判斷是這是不是下一個message。
②Google MillWheel的特例:Google MillWheel做出了一個很有意思的選擇,發送端完全不維護sequenceId,而是為每一個發出的message生成一個全局唯一的id,下游則需要記住"所有"見過的id來去重,但是這樣會造成大量查詢io和存儲cost,所以需要另外的方案來解決性能和下游沒有無限的存儲所以"不可能記住所有id"的問題。這個例子比較特殊,有興趣的同學可以查閱[4,7]
③要求觸發本次計算的“上游input提供端”可以replay input event,否則剛接到event還沒計算就掛掉重啟, 則event丟失。
無法避免的重算:任何時候計算沒完成,或者計算完成后但是成功儲存前(a.結果高可用的需求), 計算節點fail掉重啟, 我們都需要replay上次計算過的input event,所以由于計算結果都還沒存成功,所以從物理上講, 此時我們還是重算了的; 所以即使我們采用把計算結果記錄下來的策略, 我們無法從物理意義上真正避免重算, 我們避免的是有多個"重復的"成功計算結果提交給下游。
而當計算不是deterministic的, 這多個“重復的”計算結果可能是不同的值發送給不同的下游((比如按照計算結果的key發送給下游不同的partition)。那么下游就會處理同一個event所產生的本應只有一個的計算結果兩次,且由于非確定性計算的原因,這兩個計算結果不一樣。這就會造成event不是EOMP的問題。(不僅在物理上計算了2次, 在效果上也影響了2次下游的計算, 打破的effective process once的要求)
④要求記錄event處理的進度, 并保證儲存計算結果不出現重復。記錄event處理的進度, 使得trigger本次計算的"event"可以被屏蔽(比如, ack“上游input提供端”, 告知其input event處理完畢, 可以發下一個了), 來避免計算的re-trigger; 這要求以下策略2選一:
記錄event處理的進度, 和把計算結果存在高可用存儲里的操作是一個原子操作, 要么一起成功, 要么一起失敗; 這種策略可以保證當計算結果儲存下來, 此計算不會replay了;
或者存儲計算結果是一個冪等操作,那么可以先存計算結果,再記錄event處理進度,一旦計算計算結果成功但是記錄event處理進度失敗,重新計算上游的同一個event并儲存計算結果也不會引起問題。
否則要么計算沒存event就被屏蔽掉了, 要么多次計算結果存儲在DataStore里造成下游的重復信息。注意, 此時下游是無法分辨這是重復信息的, 因為這是datastore里的"2條的記錄", 將會獲得不同的message id。
冪等和end2end argument: 所以實現原子操作就不需要冪等了么? 是也不是, 在業務層是的, 比如要實現業務層的冪等,我們可以在存計算結果到datastore里的時候把一個與觸發本次計算的event的唯一id記錄在一起,這樣我們每次存的時候就可以使用樂觀鎖的方式test-and-set, 來保證如果這個id在數據庫里沒有才插入。(取決于業務,我們也可以用這個id當主key來,那么即使多次寫入同樣的內容也沒關系=>要求計算是deterministic的)?
如果我們保證觸發計算的event的"屏蔽"和計算結果的儲存是一個原子操作,那么我們就不需要上邊這種復雜的存儲策略,因為一旦計算結果存儲成功,觸發計算的event必定被"屏蔽"掉了, 而如果沒存儲成功, 則event一定會replay來重試。
然而在傳輸層卻不是的,比如儲存數據庫的tcp有可能丟包重發,依靠tcp的傳輸層id自動去重,實現tcp的冪等。
策略2: 完全依賴重算。
高可靠重發的問題是,所有信息都必須先記錄在高可用性的DataStore里, 相對于重新計算,重發需要的網絡IO, 存儲,狀態管理的cost是很高的。
而如果觸發計算的event可以replay的話(其實不管重算還是不重算,為了防止“剛接到event, 計算節點就掛掉的情況”, 我們都要求event可以replay), 我們就可以選擇重算然后重發來代替存儲計算結果的重發;重算需要2個條件:
計算需要是 deterministic 的,用完全一樣的數據,必須算出完全相同的結果,否則,當計算結果所需要發送的邏輯下游是由計算結果所決定的情況下(比如按照計算結果的key發送給下游不同的partition) 那么non-deterministic的重算有可能把計算結果發給不同的下游,這樣如果重算發生時,下游(假設是節點A)其實已經成功處理完畢重算前上游發送的信息, 只是ACK丟失, 那么重算的結果卻發送給了另外一個(節點B), 那么就會造成一個event造成了2個下游effect的結果, 引起一個event造成2次下游影響的結果, 違反EOMP的原則;
重算之前, 狀態需要rollback到沒有計算之前, 否則會影響需要狀態的計算的結果正確性,如果狀態更新非冪等,本次計算所做的狀態更新也會更新多次;詳見"加入節點狀態的三節點間的EOMP"。
(在多節點流計算里,要求上游可以重發,意味著上游把計算結果存下來了,或者上游可以重算,如果上游需要重算,那么上游需要上游的上游重發,那么上游的上游可以用儲存的結果重發或者重算。。。以此類推)
(2種策略其實都有可能造成重算,也都對event replay有需求。為什么還要浪費資源去存儲計算結果呢?這里邊的重要區別是,當儲存結束時,對觸發本次計算的上游event的依賴結束了,而不穩定的下游不會造成額外的重算, 和對上游, 上游的上游....計算的"鏈式反應", 詳見流的EOMP中的討論)
加入節點狀態的三節點間的EOMP
帶狀態的計算, 比如流計算的某中間節點需要統計總共都收到多少信息了, 每次從上游收到新信息, 都把自己統計的當前歷史信息總數更新并發往下游節點, 那么這個"系統的歷史信息"就是這個"統計消息總數"的邏輯節點的狀態。
由于狀態也需要高保活,所以它也一定需要儲存在遠端dataStore里, 這樣儲存狀態的遠端datastore就相當于一個特殊的下游。不同點在于, 當采用策略2:重算, 而不存儲中間計算結果的話, 重算時則需要datastore可以把它所記錄的狀態rollback到最初剛開始處理此event的那個點。
這里我們只能rollback, 而不能只是依靠冪等來保證“狀態的更新是exactly once”的原因是, 節點在處理任意消息時的狀態也和當前信息的數據一樣是本次計算的input, 而更新后的狀態則是本次消息處理的output, 如果重算時不rollback節點的狀態, 那么我們就會用一個被本消息"影響過"的狀態來進行計算, 而這是會違反exactly once msg processing語義的。
比如節點的本地狀態是上次收到的信息的數據上記錄的時間戳, 節點的運算是計算2個event數據之間的時間戳差距。假設eventA發生在時刻0, eventB發生在時刻10, 那么eventB引發的計算應往下游發送10, 并把節點的本地狀態更新為10, 此時如果eventB的這個計算需要重算, 但是我們不rollback狀態10回到0的話, eventB重算所得的結果就會變成0。
注意: 由于state更新也是處理event的"下游", 那么計算過程中的所有狀態更新都可以算作“計算結果”的一部分, 所以當我們需要儲存計算結果時,則需要把:
狀態更新儲存回高可靠的statestore里;
記錄event處理進度;
把計算結果存在高可用存儲里。
這3個操作作為一個原子操作(以后我們稱之為"原子完成"來省略篇幅); 而任何時候需要重算的話, 狀態必須恢復到處理event之前的樣子。
加入state,我們需要把(d. 要求記錄event處理的進度, 并保證儲存計算結果不出現重復, 更改為 (d+. 要求記錄event處理的進度, 并保證儲存計算結果和state的更新不出現重復。
并加入要求(e. state需要在replay 上游event的時候rollback到處理event之前時的狀態。
這些要求稍有抽象,讓我們看一下流系統一般怎么達成這些要求。
流系統的EOMP
考慮一個多節點的流系統,如果我們把上游所發來的計算結果當成前邊所說的“觸發計算的event”,而自己的發給下游的計算結果msg作為觸發下游計算的event。那么我們就可以用上邊的模型保證兩兩節點之間的exact once msg processing,從而最終實現端到端的exact once msg processing; 這就是Google MillWheel(DataFlow) 和Kafka Stream的解決方案。
他們都選擇把每個節點的計算結果儲存起來,并保證即使non-deterministic的計算, 也只有一次的計算會起作用, 而不會出現(策略2-1中提到的non-deterministic造成的不一致)。他們的區別是:
如何實現state和;
如何實現接收端去重;
如何實現“原子完成”
1、Google MillWheel(DataFlow)
1)每個節點維護一個用來去重的"已處理msgId"集, 從上游收到新msg之后, 檢查去重 (b.下游去重)
2)開始計算, 所有的狀態更新寫在本地, 由于一個狀態只有一個更新者(本計算), 所以可以在本地維護一個狀態的view, 所有的更新只更新本地的view而暫時不commit到"remote的高可用DataStore", MillWheel用的BigTable。
3)計算完畢后, (1).所有的要發送的計算結果,(有一些可能是在計算過程中產生并要求發送的, 都會cache起來), (2)所有的state的所有更新, (3) 引發計算的msgId, 會用一個atomic write寫在BigTable里。(a.要求結果高可用, d+.要求記錄event處理的進度, 并保證儲存計算結果和state的更新不出現重復)
4)當commit成功之后, ACK上游, 而由于上游也采用commit計算結果到BigTable里的策略,且只有當自己(這里)發出的消息ACK之后, 才會允許 garbage collection回收計算結果占用的存儲, 所以在收到ACK之前, 上游的計算結果, 也就是當前計算所需要的msg, 都可以重發,直至本節點計算成功且commit結果 (c. 要求觸發本次計算的event可以replay)
5)一旦計算過程中failure發生(比如機器掛了), 會在另外的host上重啟本process節點,從BigTable恢復本地state和"用來去重的已處理msgId集", 由于上次計算的結果還沒有commit, 所以滿足(e. state需要在replay event的時候rollback到處理event之前時的狀態)
5)新啟動的運算節點在load本地狀態之前先用自己的sequencer廢掉現存的sequencer, 這樣BigTable就可以block zombie計算節點的寫。
2、Kafka Stream
Kafka Stream是建立在kafka分布式隊列功能上個一個library, 所以在介紹kafka Stream之前, 我們先來講一下Kafka
3、簡單介紹Kafka Topic
Kafka的topic可以看作一個partition的queue, 通過發給topic時指定partition(或者用一個partitioner 比如按key做hash來指定使用那個partition), 不同的key的消息可以發送到不同的partition, 相同key的message則可以保證發送到同一個partition去, topic里的信息可以靠一個確定的index來訪問, 就好像一個數據庫一樣,所以只要在data retention到期之前,consumer都可以用同一個index來訪問之前已經訪問過的數據。
4、Kafka Transactional Messaging
前邊說過, Kafka Stream是建立在kafka分布式隊列功能上個一個library, 主要依靠kafka的Transactional Messaging來實現end2end exactly once msg processing。
Transactional Messaging是指用戶可以通過類似以下code來定義哪些對kafka topic的寫屬于一個transaction, 并進一步保證tx的atomic和Isolation。
producer.initTransactions();
?try {?
? ? // called? right before sending any records?
? ? producer.beginTransaction();
? ? //sending some messages...
? ? // when done? sending, commit the transaction?
? ? producer.commitTransaction();
} catch? (Exception e) {
? ? ?producer.close();
} catch? (KafkaException e) {
? ? producer.abortTransaction();??
}?
Kafka transaction保證了, beginTransactions之后的, 所有往不同Kafka topic里發送的消息, 要么在commitTransaction之后才能被read-committed的consumer看到, 要么由于close或者failure而全部作廢, 從而不為read-committed的consumer所見。
而kafka stream通過用kafka本身的分布式queue的功能來實現了state和記錄處理event進度的功能,因為:
所有的要發送的計算結果(由于可以允許計算發不同消息給多個下游,所以可能發給不同的topic和partition);
記錄input event stream的消費進度;
所有的state的所有更新。
這3點, Kafka Stream都是用寫消息到kafka topic里實現的。
1)自不必說,本來就是往topic里寫數據。
2)其實是寫當前consume position的topic;。(注意Kafka Stream的上下游消息傳遞考的是一個中間隱藏的Kafka topic, 上游往這個topic寫, 下游從這個topic讀, 上游不需要下游的ack,只要往topic里寫成功即可, 也不需要管下游已經處理到那里了。
而下游則需要維護自己"處理到那里了"這個信息,儲存在consume position的topic, 這樣比如機器掛掉需要在另外的host上重啟計算節點,則計算節點可以從記錄consume position的topic里讀出自己處理到那里然后從失敗的地方重洗開始。
3)其實是寫一個內部隱藏的state的change log的topic,和一個本地key value表(也就是本計算節點的state)。failover的時候, 之前的"本地"表丟失沒關系, 可以沖change log里恢復出失敗前確定commit的所有state。
(1)(2)(3)的topic都只是普通的Kafka topic。只不過(2)(3)由Kafka Stream自動創建和維護(一部分用來支持高層API的(1)也是自動創建)
開始計算時, 在從上游的topic里拿msg之前, Kafka Stream會啟動一個tx, 然后開始才開始計算, 此時tx coordinator會分配一個新的epoch id給這個producer并且以后跟tx coordinator通訊都要附帶這個epochId;
Kafka Stream的計算節點的上游信息都來自kafka topic的分布式partition queue, 且只接收commit之后的record, 在queue里的record都有確定的某種sequenceId, 所以只要計算節點記錄好自己當前處理的sequenceId, 處理完一個信息就更新自己的sequenceId到下一個record, 且commit到可靠dataStore里, 就絕對不會重復處理上游event, 而只要沒有commit這個位置則可以無數次replay當前的record; (b.下游去重, c. 要求觸發本次計算的event可以replay);
在tx內部,每從上游topic里讀一條信息就寫一條信息到記錄consume position的topic里, 每一個state的更改都會更新到本地的state(是一張表)里,且同時寫在隱藏的changelog里; 計算過程中需要往下游發信息則寫與下游聯系的topic;
計算結束后, commit本次的tx, 由Kafka Transactional Messaging來保證本次tx里發生的所有(1)往下游發的消息, (2) 記錄input event stream的消費進度,(3)所有的state的所有更新是一個原子操作, 由于結果都成功寫入kafka topic,所以達到計算結果的高可用性 (a.要求結果高可用, d+.要求記錄event處理的進度, 并保證儲存計算結果和state的更新不出現重復);
計算過程中出現failure(比如機器掛了), 那么當計算重啟,會重新運行initTransactions來注冊tx, 此時tx coordinator會分配一個新的epoch id給此producer, 并且從此以后拒絕老的epoch id的任何commit信息來防止zombie的計算節點; tx coordinator同時roll back(如果上一個tx已經在prepare_commit狀態, 繼續完成transaction, 具體看下邊Transactional Messaging這個章節); 如果rollback,那么input的處理進度, 狀態的更改和往下游發送的信息都會rollback, 那么計算可以重新開始,就好像沒有上次fail的失敗一樣; 如果上一個tx已經prepare_commit, 那么完成所有信息的commit; 此時當initTransactions返回,當前計算會接著上一個tx完成的進度繼續計算;(e. state需要在replay event的時候rollback到處理event之前時的狀態)
Idempotent producer
冪等producer主要解決這么一個問題: Kafka的消息producer, 也就是往Kafka發消息的client 如果不冪等, 那么因為Kafka的接受消息的broker和producer之間在什么是“重復信息”上沒有共識的話,則broker無法分辨兩個前后一模一樣的消息, 到底是producer的本意就是要發兩次,還是由于producer的重發(比如:producer在收到broker的"接受成功"的ack之前就掛了,所以不知道之前的消息有沒有成功被broker接收, 因此重啟后重發了此信息)。此時broker只能選擇接受消息,這就造成了同一個消息的多次接受。
同時我們也要解決zombie producer的問題: 如果我們保證producer高可用, 重啟我們認為fail掉的producer, 那么其實沒死的zombie producer的信息則會造成,重復且亂序的發布消息。(由于zombie的存在, 會有2個producer同時發布我們以為只有一個producer會按順序發布的消息,這樣就無法保證順序: 比如zombie在發送A, B, C...的時候, 新啟動的producer也開始發送A, B, C... )
Kafka的解法:
10用一個producer指定的固定不變的transactional.id(非自增id,叫producerName可能更好)來識別可能會在不同機器上重啟的同一個logical producer; 相當于給producer起了一個logical name。
2)注冊transaction.id來開始session, 而在session里此tx發來的消息都可以通過維護一個sequenceid來dedup。
3)非正常結束tx的話, 比如機器掛了, producer重啟, 那么就會再次注冊自己的transaction.id, 則標志前一個session失效, 而所有屬于上一個session的信息全部作廢(具體看下一節Atomic and Isolation), 這樣就可以做到producer的zombie fencing
(Further, since each new instance of a producer is assigned a new, unique, PID, we can only guarantee idempotent production within a single producer session ---- Idempotent Producer Guarantees [26])
Atomic and Isolation
1)Producer Zombie fencing: 注冊transaction.id會申請高可靠epoch id, broker和tx coordinator可以依此fencing zombie的任何寫操作 (e.g. tx coordinator關閉tx);. Zombie fencing in https://www.confluent.io/blog/transactions-apache-kafka/
2)多個Tx coordinator跑在kafka broker里, 寫是按照tx.id hash給不同的Tx coordinator, 每一個tx coordintor負責subset的transactionlog的partition。
這樣保證同一個logic produce啟動的tx必定連接同一個tx coordinator。tx coordinator保證所有的狀態都在的高可用高一致性的寫在tx log里。(且用queue zombie fencing來保護自己的狀態一致性, Discussion on Coordinator Fencing in [26]) (Discussion on Transaction-based Fencing, => 如果zombie不跟coordinator再聯系,那么可以一直跟broker發垃圾信息... P39in [26])
3)Producer注冊新的tx之后,在給任意topic的任意partition發消息之前,先跟tx coordinator注冊這個partition。
4)當寫完畢,producer給tx coordinator發commit,tx coordinator執行2PC,在transaction log里寫prepare_commit, 這樣就一定會commit了,因為producer 通知commit就代表所有的寫已經寫成功了, 這一步其實只是把決定記下來。
5)Tx coordinator聯系所有的注冊過的topic的partition,寫一個commit marker進去。
6)當所有的marker寫完,在transaction log里記錄commit complete。
7)注意:當在第一步tx coordinator在發現新的重復transaction.id來注冊時,會檢查有沒有相同的transaction.id下未關閉的tx,有的話發起rollback,先在transaction log里記下rollback的決定,然后聯系所有的注冊過的topic的partition, 寫入一個ABORT marker。
而如果此tx的狀態已經時prepare_commit了,那么有可能tx coordinator在下邊第6步聯系所有partition來commit中間掛掉了,那么要接著完成這個commit過程;即roll forward而不是roll back。
8)Read_commit等級的consumer需要等待transaction有結果,consumer library讀到任何與Transactional Messaging相關的信息,就開始進入cache階段,并不會運行任何consumer端的計算,只有當讀到commit mark,則把cache住的record依次交給consumer端的計算,而當讀到ABORT mark,則把相關tx的record全部filter掉。注意: pending的tx會block所有Read_commit等級的consumer對topic的讀。
在保證兩兩節點之間的EOMP來實現整個流的EOMP的模型里,如果我們某一個或多個節點的狀態和計算結果根本不記錄在高可用DataStore里,我們還是可以實現EOMP, 我們只需要(1)replay這個節點的上游來重算這個節點的狀態和發給下游的計算結果, (2)下游去重。
如果上游也沒計算結果記, 那么replay上游的上游即可, 如果上游的上游也沒記....一直追溯到記錄了計算結果的上游節點即可。
如果一直都沒有failure,那么比起Dataflow和Kafka Stream那種記錄所有計算結果的模型 我們少記錄一些額外的計算結果和狀態就減少了很多系統負載; 這就是重算與記錄計算結果模型的結合。
重算與記錄計算結果的結合
考慮 A,B,C, D 4個節點, A的計算結果傳給B, 而B則把一部分計算結果給C一部分給D, 如果B沒有記錄自己的output, 則Cfail掉之后需要replay上游的input,這就需要B的一些重算來重新制造C所需要的input, 即使B的input(即A)記錄了所有的計算結果, 我們還需要"恰巧可以產生這些歷史計算結果的"B的歷史狀態,才能重算出C所需要的input。(所以B必須保存歷史狀態或者用某種方法重建自己的歷史狀態才能保證可以重算C所需要的input)
如果C的狀態也丟失了, 那么對上游的負擔則更重些, B需要重新計算來提供所有的歷史計算結果(即C的所有歷史input)來讓C重建自己的歷史狀態。
可以看到, 任意一個節點的某狀態S(n+1)是:
上一個歷史狀態S(n), 和;
從歷史狀態S0建立開始所接收到的信息M(n)。
同時作為輸入而得到的輸出; 而這個過程中又會向下游發出一些計算結果O(n+1)。
所以M(n) + S(n) => S(n+1) + O(n+1), 當下游crash重啟需要O(n+1)時, 我們則有2種選擇:
1、記錄O(n+1);
2、記錄O(n+1)但是記住, O(n+1)是根據什么數據生成的。
1是記錄計算結果, 2是重算。兩者并用的好處在于, 1可以異步batch進行而不需要節點必須等待O(n+1)記錄成功才往下游發送O(n+1)。而2保證了當1還沒有完成時, 系統也有足夠的信息可以重建O(n+1)。
這是一個鏈式反應, 當重算需要M(n)和S(n)時, 而如果M(n)并沒有存則需要上游重算M(n), 上游還沒存這些重算M(n)的信息則需要replay上游的上游來重算這些信息,這就是所謂的鏈式反應...。最極端的情況是什么都沒存,那么需要從頭開始跑我們的stream程序。
可以看到, 如果沒有存中間計算結果或者狀態, 那么當這個數據被下游重算需要的時候, 需要我們重算這個數據, 這就會產生對上游的計算結果或者狀態的需求, 這就要求我們如果不存下這些數據, 我們就需要記住計算這個數據的數據依賴圖, 所以要么把"中間"數據和狀態存起來待用, 要么記住他們的數據依賴圖。
而這些記錄的中間結果只有當對其的所有依賴從計算圖中消失時, 我們才可以垃圾回收/刪除這些數據(比如所有基于某狀態的計算結果都已經存下來了, 那么這個狀態的數據就可以刪除, 再比如某計算結果所引發的下游計算結果和狀態都已經存下來了, 那么此計算結果的數據就可以刪除了),從而不會造成儲存數據爆炸。
這, 也就是Spark Streaming的解法。
Spark
Spark有三種Stream...
(1)快要被deprecate掉的DStreaming [10, 14]
(2)新一代為了彌補和Flink之間差距的, 支持event time的Structural Streaming(可惜還是有很多不足, 具體的不同和哪里有不足, 要留到對比各個系統對event time和windows操作的支持的對比, 也就是下篇來詳細描述了) [12,13]
(3)實驗中的Continuous Streaming(Spark Continuous Processing) [11, 20]
(3)還在實驗狀態, 基本上是把底層都改掉來使用了和Flink相同的Chamdy-Lamport算法[20], 但是貌似還有很多問題需要解決所以目前不支持EOMP, 這里不多聊了。
根據Structural Streaming的論文[12], (2)和(1)使用了相似的方法來保證EOMP, 但是其實作者發現(2)比起(1)還是有一些性能上的改進[21],但是總體原則還是和(1)類似的利用一個重算關系圖lineage來維護各個狀態計算結果的依賴關系, 通過異步的checkpoint來截斷lineage也就是各個節點狀態和計算結果復雜的關系(比如一個數據如果已經checkpoint了, 那么它所依賴的所有狀態和計算結果都可以在關系圖里刪去, 因為replay如果依賴于這個數據, 那么使用它的checkpoint即可, 而不需要知道這個數據是怎么算出來的, 如果還沒checkpoint成功, 則需要根據數據依賴圖來重算這個數據), 像這樣利用checkpoint, 就可以防止lineage無限增長。
但是維護關系圖需要利用micro-batch來平衡"關系維護"造成的cost, 否則每一條信息的process都產生一個新狀態和新計算結果的話, 關系圖會爆炸式增長(用micro-batch, 可能1000條信息會積累起來當作"一個信息"發給下游, 只需要在關系圖里記錄一個batch-id即可, 而不是1000個msg id, 對與狀態來說也是這樣,處理1000個msg之前的狀態分配一個id, 處理這1000個信息之后的狀態一個id, 而不需要記錄1000個狀態id, 同時他們之間的聯系線也從1000條降低為1條。這樣就大大減小了關系圖維護的負擔。
但這樣造成的結果是micro-batch會造成很高的端到端處理的latency, 因為micro-batch里的第一條信息要等待micro-batch里的最后一條信息來了之后才能一起傳給下游。
而這個等待是疊加的,當stream的層數越深,每一層的micro-batch的第一條信息都需要等待最后一條信息被處理完畢,相比在每一層都毫不等待,micro-batch造成的額外latency就會疊加式的增高。
注意, Spark Structured Stream提供了一種continuous mode[11,12,13,20]來替代micro-batch,解決了latency的問題,但是目前支持的operator很少,且不能做到exact once msg processing, 這里不多加討論了(不過將來有望做成和flink一樣的模式, 畢竟也用的Chandy-Lamport Distributed Snapshot algorithm) : Note that any time you switch to continuous mode, you will get at-least-once fault-tolerance guarantees.[13]
spark的micro-batch會造成嚴重的latency問題, 而Dataflow和Kafka Stream的方案要求記錄每一個計算結果, 則會在大大增加系統負擔的同時也會有不小的latency附加。那么有沒有一種方法可以不記錄所有中間計算結果, 并且也不使用micro-batch呢?
我們來看看flink的藝術;
Flink
如果我們不儲存流系統中間節點的計算結果在高可用DataStore里, 也不想維護復雜的數據依賴圖(需要micro-batch的根源), 那么當一個節點fail掉需要replay上游的input的時候,上游就必定需要replay自己的上游,且自己的狀態要rollback到沒有接收這些要replay的消息之前的狀態;對上游的上游就有相同的要求,那么最終所有節點的上游最終會歸向數據源節點,并要求"重新replay"。總而言之2個要求:
數據源節點可以replay, 并產生層層的蝴蝶效應的"每個節點對上游要求的replay";
所有的計算節點的狀態,要恢復到沒有接收到"上游所replay的消息"之前的樣子(所以replay后可以回到現在的狀態,且重新生成下游所需要的input, 即當前節點在處理這些replay消息時產生的計算結果集)。
全局一致點和全局一致狀態集
為了方便討論,我們定義2里所提到的global的狀態為一個全局穩定點; 顯然,如果我們一條消息一條消息的處理,數據源節點等待直到所有流節點處理完這條消息所產生的蝴蝶效應信息之后,才發出下一個消息B0,那么在消息B要發出但是沒發出之前,所有的節點的狀態就滿足我們對全局穩定點的需要。
比如當我們持續處理B1,B2,B3...B100, 這時一個節點fail掉了,那么我們只要流系統的所有節點rollback他們的狀態到發出B0前的"全局穩定點", 整個系統的計算和狀態就會干凈的回道任何節點都不曾被任何B0-B100所影響的狀態, 那么此時從數據源節點replay B0, B1, B2... B100 成功, 這些消息就"exactly once process"掉了。所以,我們找到了第一個不需要micro-batch, 也不需要記錄中間節點計算結果,就能實現EOMP的方法:
每n條信息, 或者每一段時間, 數據源節點(或者流系統的第一個入口節點)停止向下游發送任何信息, 直到所有節點報告說有關這條信息的所有派生信息(由于這條信息引起的第一個計算節點的計算結果會發送給它的下游, 下游的計算結果又會發送給它的下游...等等這些都是派生信息)都已經處理完畢, 此時把所有節點的狀態checkpointing在高可用DataStore里, 建立一個全局穩定狀態集(由流系統中每個計算節點各自的全局穩定狀態所組成), 數據源才開始繼續發送信息...這樣, 任意的節點fail掉, 我們只要在別的機器上重啟這個計算節點并download之前checkpoint的狀態,流系統的所有節點也rollback到上一個全局穩定狀態即可, 由于數據源發送數據的進度也屬于全局穩定狀態集中的一員, 所以當數據源rollback自己的狀態,則可以開始replay 全局穩定點checkpoint之后才發送的信息,而此時所有節點都已經rollback到一個"從沒見過這些信息和它們的派生信息"的狀態了,整個系統就好像從來沒有見過這些信息一樣, 從而實現即使failure發生,我們的系統也可以實現EOMP。
更進一步, 我們來看如何不停住數據源的信息接收,我們所需要處理的問題。
1)任意時間點的全局狀態,都不是全局穩定點: 如果所有節點都不等待后續節點有沒有處理完信息, 那么任意時間點, 在流的中間節點建立全局穩定狀態的時候 ,流上游的節點已經開始處理新的信息, 它們的全局穩定狀態早已被新的信息所影響了, 而下游可能還沒收到建立全局穩定狀態所需要的信息。
2)隨意指定的全局穩定狀態集可能根本不存在, 比如數據源連續給A和B發出x,y兩條信息, 而A和B則需要把計算結果都發送給C,如果我們想定義全局穩定狀態為所有節點"處理完x相關的消息之后", 但是"處理完y相關的信息之前"的狀態。
那么考慮這樣一個運行順序: A處理完x向C發出x-A, B處理完x, y后向C發出x-B, y-B, 然而由于網絡和處理速度的因素, C在還沒有收到x-A的情況下就處理完了x-B, y-B, 所以C的一個"干凈"的從未被y信息影響的狀態,但包含了所有需要的x信息的穩定狀態, 在C的狀態變遷過程中是從來不存在的 (即, 處理完x-A和x-B,但是沒有處理y-B時的狀態)。
問題1意味著我們不能用物理時間來建立全局一致狀態集, 那么既然流的不同節點接收到數據源任意消息x的派生消息的時間不同, 那么只要我們能讓所有節點分清哪些是x的消息和派生消息, 哪些是x之后的消息和派生消息, 所有節點就可以在處理完x的派生消息之后把本地狀態復制一份儲存在高可用DataStore里, 作為全局一致狀態集的一員。
問題2意味著即使允許計算節點連續處理input而不必等待所有下游建立好全局一致狀態才發下一個計算結果, 計算節點也不能盲目的不加考慮的處理上游信息, 我們要使得計算節點的狀態變遷過程中, 至少全局一致狀態是可以出現的。
Flink的解法就是由一個高可用的coordinator連續發出不同的stage barrier(比如先給所有src發1,然后1分鐘后發2,2分鐘后發3..... 如此增長), 夾雜在發給數據源發出的數據流里, 所有的節點都必須忠實的轉發這個stage barrier, 這樣所有的節點的:
input都分為了接收到某barrier(設為barrier-a)之前的信息和收到barrier-a之后的信息,;
所有的發給下游的計算結果也分為自己發出barrier-a之前的信息和發出barrier-a之后的信息;
所有的狀態變遷也分為,用所有接收到barrier-a之前的信息, 所建立的狀態, 和收到barrier-a之后被新的信息影響了的狀態。
那么如果所有節點都遵循2個原則:
只用"接收到barrier-a之前的所有信息", 來建立自己的本地狀態,并備份在高可用DataStore里;
只使用"接收到barrier-a之前的所有信息"來計算結果并發送給下游之后, 才轉發barrier-a; 然后才開始處理"接收到barrier-a之后的信息"; 這樣就保證了自己在往下游發送barrier-a之前所發的所有計算結果, 都沒有被自己所收到的barrier-a之后的新消息所影響(自己發送的barrier-a之前的計算結果只和自己接收的barrier-a前的input集合相關)。
而當所有的節點都保證"自己發送的barrier-a之前的計算結果只和自己接收的barrier-a前的input集合相關", barrier-a就成了系統系統的分隔點,而所有節點遵循原則-1所建立的本地狀態備份, 也絕對沒有被數據源發出的在barrier-a之后的信息和它們的派生信息所影響; 而這些所有本地狀態備份的全集,則組成了全局一致狀態集。
一個細節, 當一個節點只有一個input channel的時候, 只要按順序處理input信息即可; 而當一個節點有多于一個input channel的時候, 一個input channel的barrier-a已經接收到, 但是其他channel的barrier-a還沒有收到怎么辦呢?
從收到barrier-a的channel接收新的信息并處理可行么? 顯然不行, 這樣違反了原則-1, 因為"barrier-a之前的信息全集"還沒有湊齊(其他channel的barrier-a還沒有收到), 此時如果處理了任何屬于barrier-a后的"新"信息, 我們就再也無法在狀態變遷中得到一個"干凈"不受barrier-a后的"新"信息所影響的狀態了, 這意味著我們必須block 這個已經收到barrier-a的channel;
我們可以向下游轉發barrier-a么? 顯然也不行, 這樣違反了原則-2, 理由相同, 我們還沒有收到"barrier-a之前的信息全集", 而從其他channel收到barrier-a之前還收到其他信息的話, 它們所產生的計算結果也必須在轉發barrier-a之前發送。
由1,2就很清楚可以推理出flink的算法了:
收到任意input channel 的barrier-a之后, block此channel;
收到所有input channel的barrier-a之后, 把當前狀態checkpoint并備份到高可用的DataStore里; (這里可以做到異步checkpoint并不會影響latency, 詳細介紹看后邊的異步checkpointing這一節);
收到所有input channel的barrier-a之后, 并且處理完所有此前收到的信息并向下游發送計算結果完畢后, 向所有和自己相連的下游轉發barrier-a;
當所有節點都備份完成,我們就得到了一個全局一致狀態集, 或者說全局一致狀態快照; 系統的穩定點就進步到了barrier-a, 如果下一個barrier是barrier-b, 那么在得到barrier-b的全局一致狀態集之前, 如果系統出現failure, 我們就可以通過重啟所有計算節點的方式, 讓所有節點reload barrier-a所記錄的狀態集, 從而實現把所有節點的狀態rollback到"上一個全局一致"的狀態, 使得流系統可以重置到好像根本沒有看到過任何barrier-a到barrier-b之間的信息的一樣, 然后重跑這段信息;
通過干凈的rollback了可能造成的重復處理的痕跡, 使得所有信息的效果都只發生了一次, 所以我們得到了一個端到端的EOMP系統。
異步checkpoint可以使得, checkpoint本身不會block流本身的計算,增量checkpoint避免了,每次一點小變動都需要checkpoint全部的state,可以節省計算機資源(比如網絡壓力)
flink和spark這種需要checkpoint的系統都可以做到異步增量checkpoint, 且這個技術也很成熟了, 本文只選flink的方法[35]來簡單說明一下 , Spark的可以看[21]
1、Flink的異步增量checkpointing
Flink使用RocksDB 作為本地狀態儲存, RocksDB本質上就是一個LSM tree, 對狀態的寫會寫在內存的memtable, 一般是一個linked hashmap, 寫到一定大小就存到硬盤里變成sstable(sorted-string-table), 不再更改。
此后會開一個新的memtable來接受新的寫。這樣會按歷史時間來生成很多小文件, 讀的時候先讀memtable,如果里邊有想要的key對應的value,必定是最新的,否則按歷史時間順來查sstable(sstable有自己的cache, 所以未必需要讀硬盤)。
對于flink來說, 當需要checkpoint的時候, 只需要把當時的memtable寫在硬盤里即可, 這是唯一一個需要block住當前計算的操作, 此后也只需要把從上個checkpoint開始, 新生成的sstable異步發送到高可用的遠程文件系統即可(比如S3, HDFS)。這樣就做到了異步(發送到高可用datastore是異步執行的),和增量(只發送新增文件)。
注意, 由于太多的小文件的sstable會造成讀的性能問題, 所以RocksDB需要異步的compact這些小文件到一個大文件, 對此flink也需要做出一些應對, 詳見[35], 例子給的非常清楚,這里不再贅述。
系統內與系統外以上的討論都是關于中間件內部如何實現EOMP, 但是由于end to end argument的影響, 中間件提供的保證再多, 沒有source的支持, 它也無法區分source(流系統的event來源)發來的2個內容一樣的event, 到底是"同一個"信息的重發, 還是"本意"就是想要中間件處理兩次的兩個"不同"event; 對sink(流系統計算結果的去處)來說,由于failure造成的重算,zombie的存在, 則需要sink能夠"融入"到流系統的EOMP體系中去。
對于source的要求基本就是重發和對消息提供能區分到底是不是一個event的eventId,一般就是Kafka那樣就OK, 比較簡單就不多討論了; 這里著重聊一下sink; Sink主要有兩種手段來配合流系統中間件的EOMP, 冪等和2階段提交(2PC)
1、冪等Sink
最簡單的來配合流系統EOMP的策略就是冪等, 由于是外部系統, 所以重用我們的"兩節點EOMP模型"基本不可能, 因為基本不可能用一個tx來把要寫外部系統的操作和記錄已經處理過這個操作用一個原子tx來commit, 這也是流系統為什么要支持2PC的原因。
由于冪等保證對同一個計算結果寫多次和寫一次一樣, 所以無論是什么流系統, 無論系統是重算型, 還是記錄計算結果來避免重算型, 冪等的sink都可以很好的支持; 所以Dataflow/Spark/Kafka Stream都是靠冪等的sink來實現EOMP。
冪等的問題在于無法應對需要重算, 且計算可以是non-deterministic的情況, 詳見: 后邊(Latency, 冪等和non-deterministic)一節的討論; 這也是Spark Streaming, 使用冪等sink的Flink無法支持non-deterministic計算的本質原因。
相比之下, dataflow總是記錄計算結果來避免重算(即使重算也只會有一次重算的結果會影響下游), Kafka Stream支持tx可以保證只有一次計算結果可以被commit到Kafka Stream里, 如果sink也只讀committed上游kafka stream, 則可以保證即使計算是non-deterministic的, 也只會有唯一commit的計算結果被讀到(其他的計算結果沒有commit marker而被Kafka data comsume API忽略)從而影響sink的外部系統。
而Flink的2PC sink也做到了重算會直接導致sink的外部系統可以配合flink的global rollback, 所以只會有一次的計算結果被外部系統接受(commit)。
所以Spark Stream在4個流系統里, 是唯一一個完全無法支持non-deterministic計算的流系統。
2、Flink獨特的2PC Sink
2PC對很多熟悉數據庫的人來說應該是臭名昭著了, 這是很復雜和很容易造成問題而需要極力避免的東西、但是時代在變化, 2PC在新時代也有了彌補自己問題的很多解法了,這里簡單介紹一下。
2PC協議由一個coordinator,和很多參與2PC的異構系統組成,發起2PC的時候 coodinator要求所有人pre-commit,這是2PC的第一個P(phase),如果所有tx參與者都可以pre-commit并告知coordinator,則coordinator告訴所有人commit,否則告訴所有人abort,這是2PC的第二個P(phrase)
2PC最大的問題是它是一個blocking協議,blocking的點在于當coordinator和某一個2PC的參與者A掛了,其他參與者無法作出任何決定,只能等待coordinator或者死掉的那個參與者A上線,因為這時所有其他參與者都無法判斷以下兩種情況到底那種發生了,從而無法決定到底是commit還是abort。
coodinator已經收到了所有人的pre-commit并告知參與者A commit,A commit后就掛了;
A并不能pre-commit,但是coodinator在告訴所有人需要abort之前就掛了。
在情況1. 所有其他參與者都應該commit,在情況2,所有其他參與者都應該abort;由于無法辨別到底是情況1. 還是2. 所有其他參與者必須block等待,這對很多數據庫來說意味著為此tx加的鎖都不能放掉,從而影響數據庫的其他不參與2PC的操作,甚至鎖死整個數據庫。而如果coordinator或者參與者A無法再上線或者狀態丟失,則需要非常復雜的人工操作來解決其他參與者應該如何決策的問題。
雖然2PC有各種問題, 但是在consensus協議早已經成功分布式系統的基石, 各種開源和標準實現可以被輕松獲得的今天, 用consensus協議來彌補2PC的問題已經成為一個"已經解決的問題", 如[25]4.2 The Paxos Commit Algorithm 中所說:
We could make that fault-tolerant by simply using a consensus algorithm to choose the committed/aborted decision, letting the TM be the client that proposes the consensus value…
解決2PC問題的關鍵在于保持coordinator狀態的高可用性, 那么只要coordinator保證把commit或者abort的決定記錄在一個consensus cluster里即可,比如etcd或者zookeeper,這樣coordintor死了,重啟從consensus cluster里恢復狀態重新告知所有參與者到底應該commit還是abort即可; 這也是為什么各種流行的分布式系統實現分布式tx都是用2PC的原因, 比如dynamoDB, Spanner, Flink, Kafka...
3、Flink的2PC Sink
2PC的第一個P的關鍵在于所有tx參與者在不知道其他參與者狀態的情況下,承諾未來一定可以前進commit成功或者干凈的回退abort。當前的tx參與者準備好了,且同意commit,2PC的第二個P的關鍵點在于整體系統的”唯一決定”統一的推進或者回退各個參與者的狀態。
而Flink的global state其實可以看做一個2PC,當一個節點收到所有的上游的barrier-n時,這個“契機”可以看做收到了coordinator的可不可以precommit的問詢,而當localstate已經在remote 存好之后,當前節點就可以告訴coordinator它準備好了,這可以看做回復precommit(如果此節點在發給precommit)。
而當所有的節點都通知coordinator“準備好了”之后,coordinator就可以記錄下barrier-n的global state完整checkpoint的這個事實,這相當于一個不需要發給tx參與者的commit。
這是由于當failover的時候,是由coordinator告訴所有節點應該從哪個checkpoint點來恢復本地狀態,所以各個節點的localstate到底是commit了還是rollback了,可以完全由“有沒有記錄下barrier-n的global state完整checkpoint成功”這個metadata推算出來,所以也就不需要單獨給各個節點發commit/abort信息來讓各個節點commit或者abort了。
當系統狀態只涉及到flink的內部狀態時(flink提供的stateApi所提供的statestore), 如果一個某節點X在回復precommit之后掛了,coordinator還是可以選擇commit,因為組成global state的節點X的local state已經完整的存儲在remote的datastore里了。
但是如果涉及到外部狀態,比如sink需要把計算結果存儲到一個非flink控制的數據庫中去時,flink的sink節點就相當于這個外部數據庫的client,需要連接外部數據庫并把數據存入外部數據庫;要使得外部數據庫的狀態和flink的狀態保持一致,則需要sink把外部數據庫的狀態引入到flink global state的2PC里,而coordinator在決定commit或者abort的時候,必須通知sink來執行外部狀態的commit或者abort,因為coordinator是不知道外部狀態到底是什么,也無法簡單的用通知sink從不同的globalstate點恢復來代替2PC的commit/abort通知。
同時sink收到barrier-n時,sink要保證外部數據庫里與barrier-(n-1)到barrier-n之間信息相關的數據更改,處于一種“在任何情況下都一定可以commit成功,但是還沒有真的commit,所以外部數據庫的消費者不可見這些狀態,且可以rollback的,可進可退的狀態”。[40]給出了如何用文件實現的一個例子;我這里給出一個如何使用支持transaction的數據庫的例子。
首先為了避免產生歧義, 我們定義:
1)flink-precommit ack為 barrier-n流到各個節點(包括sink), 各個節點完成local snapshot checkpoint后發給coordinator的ack, sink則是完成“某個操作”后發給coordinator的ack, 這個操作需要把外部系統(比如數據庫)置于一種, 保證任何情況下都可以服從coordinator的最終決定的狀態, 一個既可以commit(如果coordinator最終決定commit), 又可以rollback(如果coordinator決定abort)的狀態, 且數據不為外部系統的consumer所見。
2)定義flink-commit為coordinator收到所有人的pre-commit ack后的的最終commit決定。
3)定義db-commit就是普通的外部數據庫的commit。
①當程序開始,sink立刻開一個外部數據庫的transaction,當sink收到上游的所有的barrier-1,則立刻db-commit當前transaction然后回復coordinator flink-precommit成功(flink-precommit ack),因為此時如果不db-commit,一旦回復coordinator flink-precommit之后,這個sink掛了,那么外部數據庫一般就會自動rollback;此時就算sink在其他機器上重啟,我們也丟失了所有要最終flink-commit的數據; 而如果這個sink的crash是發生在coordinator收到所有節點的flink-precommit ack并最終決定flink-commit之后, 所有其他節點(比如另外一個sink)的狀態可能都commit了(所以無法簡單rollback); 而只有此sink的所有數據都無法恢復, 這就破壞了global consistency。
②但是上邊我們在flink-precommit階段就db-commit了外部數據庫的transaction; 這時會有兩個問題:?
第一, 我們暴露了只是應該precommit的數據(這些數據不應被數據庫的外部consumer所見);
第二, 如果有一個其他節點不同意commit而發給coordinator abort的決定, 那么coordinator則會決定abort, 所以我們的sink則需要服從rollback的決定, 但是我們已經db-commit了的數據, 而一般數據庫都不支持rollback已經commit的數據, 這就造成了問題。
為了解決這兩個問題, 這時我們需要設計一個和外部數據庫的數據消費者的數據“屏蔽協議”。比如利用一個字段來表示當前數據只是“precommit”,所有的外部數據庫的讀寫者都應該忽略這些數據(而只有當這個字段是committed才能讀寫)。
這樣當flink的coordinator通知flink-commit時,我們用另外一個外部數據庫的tx來把所有涉及到的precommit的數據的這個字段改為committed即可, 這就解決了第一個問題。對于第二個問題來說, 如果最終flink coordinator決定abort, 我們把此字段設為abort并利用一個異步垃圾回收的程序把所有標記為abort的數據清理掉即可。
③這樣設計的關鍵是, 即使sink precommit ack之后掛了, 我們要flink-commit的數據也不會丟。所以其實flink-precommit ack時, sink把數據寫在任何其他可以保證數據高可用的地方都行(只要sink fail掉重啟之后還能找到它), 未必需要是同一個數據庫的同一個表。如果采取這種策略, 那么在flink-commit時則需要重新把要db-commit的數據從存的地方讀出來, 然后重新寫入到真正要寫的數據庫并db-commit。
④flink提供了一個TwoPhaseCommitSinkFunction,[40]里有詳細描述如何簡單的extends這個interface來實現一個可以和flink的global consistency配合的sink節點的邏輯,本文不再贅述。
需要注意的一點是,當sink收到coordinator的flink-commit指令之后,運行sink的db-commit邏輯,在外部數據庫的db-commit更改完畢(比如把要commit的數據的status的值從precommit改為committed)后,但是flink記住sink已經完成commit之前(flink在跑完sink的commit函數后會記住這個sinki已經commit了, 所以不再重復call sink的commit, 否則flink就會一直重試commit), 此時,一旦sink掛了,那么在另外的機器重啟的sink,flink無法得知外部數據庫已經commit成功了,所以flink會再次重試commit函數來嘗試commit。從而造成重復commit,這也是[40]中提到的commit必須設計為冪等操作的原因。
注意1: 可以使用2PC作為sink的關鍵是, 你的sink可以保證在ack pre-commit之后, 保證無論任何情況都可以成功commit; 這不是說你的sink所連接的外部系統支持tx就可以的, 需要application設計者根據情況具體設計。[1]的P213頁, 就描述了sink是用kafka transaction記錄計算結果到kafka,但是即使用了transaction也可能丟數據的一種edge case。而[41] Kafka 0.11 and newer=>Caveats 里也有提到。
丟失數據的原因就在于, kafka sink的默認實現:FlinkKafkaProducer011, 在precommit的時候沒有真的commit數據, 因此當kafka sink fail掉沒有及時重啟, 一旦kafka tx超時, 所有tx里的數據都會丟失, 而此時如果coordinator已經決定commit就絕不會再重發數據(source也已經commit發出的消息的index),從而kafka sink的此次tx的所有數據永久丟失。
這里提供的DB版本的sink實現思路, 在precommit階段就commit數據, 來保證“無論如何數據都不會丟”, 但是用app level的flag屏蔽外部可見; 這樣做的原因就是為了克服類似kafka sink的這種缺陷。
注意2: 使用2PC Sink的Flink應該是可以應對non-deterministic計算的, 因為一旦failure發生, 所有之前的狀態和對sink的寫入都會被rollback; 但是這樣的話, Flink在sink端就變成了micro-batch模型, batch大小取決于發barrier的頻率; 但是即使這樣, 由于只有sink需要聚集一個batch才能做一次2PC, 但是中間節點往下游發送計算結果還是即算即發的, 所以比起Spark這種所有中間計算都是micro-batch,micro-batch造成的額外latency會疊加式的增高的模型, 端到端的latency應該還是會要小一些。
Latency, 冪等和non-deterministic利用冪等的sink可以做到實時記錄計算結果, 達到最小的end to end latency。因為sink根本不需要等待barrier, 來一條計算結果就向外部系統commit一條記錄就好, 而由冪等保證了就算整個系統開始重算, 在sink端也會表現出每個source端的event只產生了一次效果的結果。
但是冪等是很難克服non-deterministic計算的。因為non-deterministic計算使得同一個source發出的event引起千變萬化的"蝴蝶效應" (比如第一次計算event生成的Key是A, 第二次重算生成的Key是B, 如果下一個節點是partitionByKey, 那么這里的2次計算結果就會發送給了完全不同的下游節點, 考慮幾百次不確定計算引起的不同蝴蝶效應, 等計算結果到達各個sink節時, 計算的key和value甚至結果的個數和在sink節點的distribution都完全不同了, 那么sink也就完全無法利用冪等來屏蔽掉同一個event replay所造成的"蝴蝶效應"了)
相比之下, 如果整個流系統的計算都是確定性的, 那么無論在source端replay多少次同一個event, 它所產生的"蝴蝶效應"在sink端也必定相同, 則application設計者則可以很容易設計出冪等操作來屏蔽掉重復的計算結果。
如果業務里無法去除non-determnistic的計算, 那么你只能選擇Google Dataflow, KafkaStream,或者Flink+2PCSink; 而只支持冪等的Spark和利用冪等sink的Flink無法支持non-determnistic的業務計算。
REFERENCEStream Processing with Apache Flink: Fundamentals, Implementation, and Operation of Streaming Applications
Designing Data-Intensive Applications: The Big Ideas Behind Reliable, Scalable, and Maintainable Systems
Lightweight Asynchronous Snapshots for Distributed Dataflows
Streaming Systems: The What, Where, When, and How of Large-Scale Data Processing
Kafka Streams in Action: Real-time apps and microservices with the Kafka Streams API
The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing
MillWheel: Fault-Tolerant Stream Processing at Internet Scale
Distributed Snapshots: Determining Global States of Distributed Systems (Chandy-Lamport)
State Management in Apache Flink R Consistent Stateful Distributed Stream Processing
Discretized Streams: Fault-Tolerant Streaming Computation at Scale
Continuous Processing in Structured Streaming Design Sketch
Structured Streaming: A Declarative API for Real-Time Applications in Apache Spark
Structured Streaming Programming Guide 2.4.3
Spark Streaming Programming Guide2.4.3
Watermarks, Tables, Event Time, and the Dataflow Model
Kafka Streams’ Take on Watermarks and Triggers
Streams Architecture Kafka
Enabling Exactly Once in Kafka Streams
Transactions in Apache Kafka
Introducing Low-latency Continuous Processing Mode in Structured Streaming in Apache Spark 2.3
State Management in Spark Structured Streaming
Process Large DynamoDB Streams Using Multiple Amazon Kinesis Client Library (KCL) Workers
Big Data: Principles and best practices of scalable realtime data systems
Making Sense of Stream Processing
Consensus on Transaction Commit
Exactly Once Delivery and Transactional Messaging in Kafka=>docs.google.com/documen
End-to-End Arguments in System Design
Transactional Messaging in Kafka
Akka Split Brain Resolver
Unreliable Failure Detectors for Reliable Distributed Systems
The Weakest Failure Detector for Solving Consensus
Exactly once Semantics are Possible: Here’s How Kafka Does it
24/7 Spark Streaming on YARN in Production
Monitoring Back Pressure (flink)
Managing Large State in Apache Flink: An Intro to Incremental Checkpointing
Impossibility of Distributed Consensus with One Faulty Process (AKA, FLP impossibility)
Kubernetes in Action
Akka:Auto-Downing(DO NOT USE)
ZooKeeper: Distributed Process Coordination
An Overview of End-to-End Exactly-Once Processing in Apache Flink
Kafka producers and fault tolerance
想知道更多?掃描下面的二維碼關注我
加技術群入口(備注:Tech):
免費星球入口:
免費資料入口:后臺回復“666”
朕已閱?
總結
以上是生活随笔為你收集整理的压箱底总结:流系统端到端一致性对比的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 下次遇到嚣张的候选人就先这么问:系统变慢
- 下一篇: 多图 | 操作系统中,进程与线程怎么设计