2021年大数据Flink(四十四):扩展阅读 End-to-End Exactly-Once
目錄
擴(kuò)展閱讀 End-to-End Exactly-Once
流處理的數(shù)據(jù)處理語義
At-most-once-最多一次
At-least-once-至少一次
?Exactly-once-精確一次
End-to-End Exactly-Once-端到端的精確一次
注意:精確一次? 有效一次!
補(bǔ)充:流計(jì)算系統(tǒng)如何支持一致性語義
???????End-to-End Exactly-Once的實(shí)現(xiàn)
???????Source
???????Transformation
???????Sink
???????Flink+Kafka的End-to-End Exactly-Once
???????版本說明
???????兩階段提交-API
???????兩階段提交-簡單流程
???????兩階段提交-詳細(xì)流程
代碼示例
Flink+Kafka實(shí)現(xiàn)End-to-End Exactly-Once
???????Flink+MySQL實(shí)現(xiàn)End-to-End Exactly-Once
深度總結(jié)
Exactly-Once
End-to-End Exactly-Once
Flink如何支持End-to-End Exactly-Once的?
兩階段事務(wù)提交協(xié)議
擴(kuò)展閱讀 End-to-End Exactly-Once
Flink 在1.4.0 版本引入『exactly-once』并號稱支持『End-to-End Exactly-Once』“端到端的精確一次”語義。
流處理的數(shù)據(jù)處理語義
對于批處理,fault-tolerant(容錯(cuò)性)很容易做,失敗只需要replay,就可以完美做到容錯(cuò)。
對于流處理,數(shù)據(jù)流本身是動態(tài),沒有所謂的開始或結(jié)束,雖然可以replay buffer的部分?jǐn)?shù)據(jù),但fault-tolerant做起來會復(fù)雜的多
流處理(有時(shí)稱為事件處理)可以簡單地描述為是對無界數(shù)據(jù)或事件的連續(xù)處理。流或事件處理應(yīng)用程序可以或多或少地被描述為有向圖,并且通常被描述為有向無環(huán)圖(DAG)。在這樣的圖中,每個(gè)邊表示數(shù)據(jù)或事件流,每個(gè)頂點(diǎn)表示運(yùn)算符,會使用程序中定義的邏輯處理來自相鄰邊的數(shù)據(jù)或事件。有兩種特殊類型的頂點(diǎn),通常稱為 sources 和 sinks。sources讀取外部數(shù)據(jù)/事件到應(yīng)用程序中,而 sinks 通常會收集應(yīng)用程序生成的結(jié)果。下圖是流式應(yīng)用程序的示例。有如下特點(diǎn):
分布式情況下是由多個(gè)Source(讀取數(shù)據(jù))節(jié)點(diǎn)、多個(gè)Operator(數(shù)據(jù)處理)節(jié)點(diǎn)、多個(gè)Sink(輸出)節(jié)點(diǎn)構(gòu)成
每個(gè)節(jié)點(diǎn)的并行數(shù)可以有差異,且每個(gè)節(jié)點(diǎn)都有可能發(fā)生故障
對于數(shù)據(jù)正確性最重要的一點(diǎn),就是當(dāng)發(fā)生故障時(shí),是怎樣容錯(cuò)與恢復(fù)的。
?
流處理引擎通常為應(yīng)用程序提供了三種數(shù)據(jù)處理語義:最多一次、至少一次和精確一次。
如下是對這些不同處理語義的寬松定義(一致性由弱到強(qiáng)):
At most noce < At least once < Exactly once < End to End Exactly once
At-most-once-最多一次
有可能會有數(shù)據(jù)丟失
這本質(zhì)上是簡單的恢復(fù)方式,也就是直接從失敗處的下個(gè)數(shù)據(jù)開始恢復(fù)程序,之前的失敗數(shù)據(jù)處理就不管了。可以保證數(shù)據(jù)或事件最多由應(yīng)用程序中的所有算子處理一次。 這意味著如果數(shù)據(jù)在被流應(yīng)用程序完全處理之前發(fā)生丟失,則不會進(jìn)行其他重試或者重新發(fā)送。
?
?
?
?
At-least-once-至少一次
有可能重復(fù)處理數(shù)據(jù)
應(yīng)用程序中的所有算子都保證數(shù)據(jù)或事件至少被處理一次。這通常意味著如果事件在流應(yīng)用程序完全處理之前丟失,則將從源頭重放或重新傳輸事件。然而,由于事件是可以被重傳的,因此一個(gè)事件有時(shí)會被處理多次(至少一次),至于有沒有重復(fù)數(shù)據(jù),不會關(guān)心,所以這種場景需要人工干預(yù)自己處理重復(fù)數(shù)據(jù)
?Exactly-once-精確一次
?
Exactly-Once 是 Flink、Spark 等流處理系統(tǒng)的核心特性之一,這種語義會保證每一條消息只被流處理系統(tǒng)處理一次。即使是在各種故障的情況下,流應(yīng)用程序中的所有算子都保證事件只會被『精確一次』的處理。(也有文章將 Exactly-once 翻譯為:完全一次,恰好一次)
Flink實(shí)現(xiàn)『精確一次』的分布式快照/狀態(tài)檢查點(diǎn)方法受到 Chandy-Lamport 分布式快照算法的啟發(fā)。通過這種機(jī)制,流應(yīng)用程序中每個(gè)算子的所有狀態(tài)都會定期做 checkpoint。如果是在系統(tǒng)中的任何地方發(fā)生失敗,每個(gè)算子的所有狀態(tài)都回滾到最新的全局一致 checkpoint 點(diǎn)。在回滾期間,將暫停所有處理。源也會重置為與最近 checkpoint 相對應(yīng)的正確偏移量。整個(gè)流應(yīng)用程序基本上是回到最近一次的一致狀態(tài),然后程序可以從該狀態(tài)重新啟動。
End-to-End Exactly-Once-端到端的精確一次
Flink 在1.4.0 版本引入『exactly-once』并號稱支持『End-to-End Exactly-Once』“端到端的精確一次”語義。
它指的是 Flink 應(yīng)用從 Source 端開始到 Sink 端結(jié)束,數(shù)據(jù)必須經(jīng)過的起始點(diǎn)和結(jié)束點(diǎn)。
注意:
『exactly-once』和『End-to-End Exactly-Once』的區(qū)別:
?
?
?
?
???????注意:精確一次? 有效一次!
有些人可能認(rèn)為『精確一次』描述了事件處理的保證,其中流中的每個(gè)事件只被處理一次。實(shí)際上,沒有引擎能夠保證正好只處理一次。在面對任意故障時(shí),不可能保證每個(gè)算子中的用戶定義邏輯在每個(gè)事件中只執(zhí)行一次,因?yàn)橛脩舸a被部分執(zhí)行的可能性是永遠(yuǎn)存在的。
那么,當(dāng)引擎聲明『精確一次』處理語義時(shí),它們能保證什么呢?如果不能保證用戶邏輯只執(zhí)行一次,那么什么邏輯只執(zhí)行一次?當(dāng)引擎聲明『精確一次』處理語義時(shí),它們實(shí)際上是在說,它們可以保證引擎管理的狀態(tài)更新只提交一次到持久的后端存儲。
事件的處理可以發(fā)生多次,但是該處理的效果只在持久后端狀態(tài)存儲中反映一次。因此,我們認(rèn)為有效地描述這些處理語義最好的術(shù)語是『有效一次』(effectively once)
?
???????補(bǔ)充:流計(jì)算系統(tǒng)如何支持一致性語義
?
?
?
?
?
?
?
???????End-to-End Exactly-Once的實(shí)現(xiàn)
通過前面的學(xué)習(xí),我們了解到,Flink內(nèi)部借助分布式快照Checkpoint已經(jīng)實(shí)現(xiàn)了內(nèi)部的Exactly-Once,但是Flink 自身是無法保證外部其他系統(tǒng)“精確一次”語義的,所以 Flink 若要實(shí)現(xiàn)所謂“端到端(End to End)的精確一次”的要求,那么外部系統(tǒng)必須支持“精確一次”語義;然后借助一些其他手段才能實(shí)現(xiàn)。如下:
???????Source
發(fā)生故障時(shí)需要支持重設(shè)數(shù)據(jù)的讀取位置,如Kafka可以通過offset來實(shí)現(xiàn)(其他的沒有offset系統(tǒng),我們可以自己實(shí)現(xiàn)累加器計(jì)數(shù))
???????Transformation
也就是Flink內(nèi)部,已經(jīng)通過Checkpoint保證了,如果發(fā)生故障或出錯(cuò)時(shí),Flink應(yīng)用重啟后會從最新成功完成的checkpoint中恢復(fù)——重置應(yīng)用狀態(tài)并回滾狀態(tài)到checkpoint中輸入流的正確位置,之后再開始執(zhí)行數(shù)據(jù)處理,就好像該故障或崩潰從未發(fā)生過一般。
- 分布式快照機(jī)制
我們在之前的課程中講解過 Flink 的容錯(cuò)機(jī)制,Flink?提供了失敗恢復(fù)的容錯(cuò)機(jī)制,而這個(gè)容錯(cuò)機(jī)制的核心就是持續(xù)創(chuàng)建分布式數(shù)據(jù)流的快照來實(shí)現(xiàn)。
?
同 Spark 相比,Spark 僅僅是針對 Driver 的故障恢復(fù) Checkpoint。而 Flink 的快照可以到算子級別,并且對全局?jǐn)?shù)據(jù)也可以做快照。Flink 的分布式快照受到 ?Chandy-Lamport?分布式快照算法啟發(fā),同時(shí)進(jìn)行了量身定做。
?
- Barrier
Flink 分布式快照的核心元素之一是 Barrier(數(shù)據(jù)柵欄),我們也可以把 Barrier 簡單地理解成一個(gè)標(biāo)記,該標(biāo)記是嚴(yán)格有序的,并且隨著數(shù)據(jù)流往下流動。每個(gè) Barrier 都帶有自己的 ID,Barrier 極其輕量,并不會干擾正常的數(shù)據(jù)處理。
?
如上圖所示,假如我們有一個(gè)從左向右流動的數(shù)據(jù)流,Flink 會依次生成 snapshot 1、 snapshot 2、snapshot 3……Flink 中有一個(gè)專門的“協(xié)調(diào)者”負(fù)責(zé)收集每個(gè) snapshot 的位置信息,這個(gè)“協(xié)調(diào)者”也是高可用的。
?
Barrier 會隨著正常數(shù)據(jù)繼續(xù)往下流動,每當(dāng)遇到一個(gè)算子,算子會插入一個(gè)標(biāo)識,這個(gè)標(biāo)識的插入時(shí)間是上游所有的輸入流都接收到 snapshot n。與此同時(shí),當(dāng)我們的 sink 算子接收到所有上游流發(fā)送的 Barrier 時(shí),那么就表明這一批數(shù)據(jù)處理完畢,Flink 會向“協(xié)調(diào)者”發(fā)送確認(rèn)消息,表明當(dāng)前的 snapshot n 完成了。當(dāng)所有的 sink 算子都確認(rèn)這批數(shù)據(jù)成功處理后,那么本次的 snapshot 被標(biāo)識為完成。
?
這里就會有一個(gè)問題,因?yàn)?Flink 運(yùn)行在分布式環(huán)境中,一個(gè) operator 的上游會有很多流,每個(gè)流的 barrier n 到達(dá)的時(shí)間不一致怎么辦?這里 Flink 采取的措施是:快流等慢流。
拿上圖的 barrier n 來說,其中一個(gè)流到的早,其他的流到的比較晚。當(dāng)?shù)谝粋€(gè) barrier n到來后,當(dāng)前的 operator 會繼續(xù)等待其他流的 barrier n。直到所有的barrier n 到來后,operator 才會把所有的數(shù)據(jù)向下發(fā)送。
- 異步和增量
按照上面我們介紹的機(jī)制,每次在把快照存儲到我們的狀態(tài)后端時(shí),如果是同步進(jìn)行就會阻塞正常任務(wù),從而引入延遲。因此 Flink 在做快照存儲時(shí),可采用異步方式。
此外,由于 checkpoint 是一個(gè)全局狀態(tài),用戶保存的狀態(tài)可能非常大,多數(shù)達(dá) G 或者 T 級別。在這種情況下,checkpoint 的創(chuàng)建會非常慢,而且執(zhí)行時(shí)占用的資源也比較多,因此 Flink 提出了增量快照的概念。也就是說,每次都是進(jìn)行的全量 checkpoint,是基于上次進(jìn)行更新的。
?
???????Sink
需要支持冪等寫入或事務(wù)寫入(Flink的兩階段提交需要事務(wù)支持)
?
???????冪等寫入(Idempotent Writes)
冪等寫操作是指:任意多次向一個(gè)系統(tǒng)寫入數(shù)據(jù),只對目標(biāo)系統(tǒng)產(chǎn)生一次結(jié)果影響。
例如,重復(fù)向一個(gè)HashMap里插入同一個(gè)Key-Value二元對,第一次插入時(shí)這個(gè)HashMap發(fā)生變化,后續(xù)的插入操作不會改變HashMap的結(jié)果,這就是一個(gè)冪等寫操作。
HBase、Redis和Cassandra這樣的KV數(shù)據(jù)庫一般經(jīng)常用來作為Sink,用以實(shí)現(xiàn)端到端的Exactly-Once。
需要注意的是,并不是說一個(gè)KV數(shù)據(jù)庫就百分百支持冪等寫。冪等寫對KV對有要求,那就是Key-Value必須是可確定性(Deterministic)計(jì)算的。假如我們設(shè)計(jì)的Key是:name + curTimestamp,每次執(zhí)行數(shù)據(jù)重發(fā)時(shí),生成的Key都不相同,會產(chǎn)生多次結(jié)果,整個(gè)操作不是冪等的。因此,為了追求端到端的Exactly-Once,我們設(shè)計(jì)業(yè)務(wù)邏輯時(shí)要盡量使用確定性的計(jì)算邏輯和數(shù)據(jù)模型。
?
???????事務(wù)寫入(Transactional Writes)
Flink借鑒了數(shù)據(jù)庫中的事務(wù)處理技術(shù),同時(shí)結(jié)合自身的Checkpoint機(jī)制來保證Sink只對外部輸出產(chǎn)生一次影響。大致的流程如下:
Flink先將待輸出的數(shù)據(jù)保存下來暫時(shí)不向外部系統(tǒng)提交,等到Checkpoint結(jié)束時(shí),Flink上下游所有算子的數(shù)據(jù)都是一致的時(shí)候,Flink將之前保存的數(shù)據(jù)全部提交(Commit)到外部系統(tǒng)。換句話說,只有經(jīng)過Checkpoint確認(rèn)的數(shù)據(jù)才向外部系統(tǒng)寫入。
如下圖所示,如果使用事務(wù)寫,那只把時(shí)間戳3之前的輸出提交到外部系統(tǒng),時(shí)間戳3以后的數(shù)據(jù)(例如時(shí)間戳5和8生成的數(shù)據(jù))暫時(shí)保存下來,等待下次Checkpoint時(shí)一起寫入到外部系統(tǒng)。這就避免了時(shí)間戳5這個(gè)數(shù)據(jù)產(chǎn)生多次結(jié)果,多次寫入到外部系統(tǒng)。
?
在事務(wù)寫的具體實(shí)現(xiàn)上,Flink目前提供了兩種方式:
1.預(yù)寫日志(Write-Ahead-Log,WAL)
2.兩階段提交(Two-Phase-Commit,2PC)
這兩種方式區(qū)別主要在于:
1.WAL方式通用性更強(qiáng),適合幾乎所有外部系統(tǒng),但也不能提供百分百端到端的Exactly-Once,因?yàn)閃AL預(yù)習(xí)日志會先寫內(nèi)存,而內(nèi)存是易失介質(zhì)。
2.如果外部系統(tǒng)自身就支持事務(wù)(比如MySQL、Kafka),可以使用2PC方式,可以提供百分百端到端的Exactly-Once。
事務(wù)寫的方式能提供端到端的Exactly-Once一致性,它的代價(jià)也是非常明顯的,就是犧牲了延遲。輸出數(shù)據(jù)不再是實(shí)時(shí)寫入到外部系統(tǒng),而是分批次地提交。目前來說,沒有完美的故障恢復(fù)和Exactly-Once保障機(jī)制,對于開發(fā)者來說,需要在不同需求之間權(quán)衡。
?
???????Flink+Kafka的End-to-End Exactly-Once
在上一小節(jié)我們了解到Flink的 End-to-End Exactly-Once需要Checkpoint+事務(wù)的提交/回滾操作,在分布式系統(tǒng)中協(xié)調(diào)提交和回滾的一個(gè)常見方法就是使用兩階段提交協(xié)議。接下來我們了解下Flink的TwoPhaseCommitSinkFunction是如何支持End-to-End Exactly-Once的
?
???????版本說明
Flink 1.4版本之前,支持Exactly Once語義,僅限于應(yīng)用內(nèi)部。
Flink 1.4版本之后,通過兩階段提交(TwoPhaseCommitSinkFunction)支持End-To-End Exactly Once,而且要求Kafka 0.11+。
利用TwoPhaseCommitSinkFunction是通用的管理方案,只要實(shí)現(xiàn)對應(yīng)的接口,而且Sink的存儲支持變亂提交,即可實(shí)現(xiàn)端到端的劃一性語義。
?
???????兩階段提交-API
在 Flink 中的Two-Phase-Commit-2PC兩階段提交的實(shí)現(xiàn)方法被封裝到了 TwoPhaseCommitSinkFunction 這個(gè)抽象類中,只需要實(shí)現(xiàn)其中的beginTransaction、preCommit、commit、abort 四個(gè)方法就可以實(shí)現(xiàn)“精確一次”的處理語義,如FlinkKafkaProducer就實(shí)現(xiàn)了該類并實(shí)現(xiàn)了這些方法
?
1.beginTransaction,在開啟事務(wù)之前,我們在目標(biāo)文件系統(tǒng)的臨時(shí)目錄中創(chuàng)建一個(gè)臨時(shí)文件,后面在處理數(shù)據(jù)時(shí)將數(shù)據(jù)寫入此文件;
2.preCommit,在預(yù)提交階段,刷寫(flush)文件,然后關(guān)閉文件,之后就不能寫入到文件了,我們還將為屬于下一個(gè)檢查點(diǎn)的任何后續(xù)寫入啟動新事務(wù);
3.commit,在提交階段,我們將預(yù)提交的文件原子性移動到真正的目標(biāo)目錄中,請注意,這會增加輸出數(shù)據(jù)可見性的延遲;
4.abort,在中止階段,我們刪除臨時(shí)文件。
?
???????兩階段提交-簡單流程
?
整個(gè)過程可以總結(jié)為下面四個(gè)階段:
1.一旦 Flink 開始做 checkpoint 操作,那么就會進(jìn)入 pre-commit “預(yù)提交”階段,同時(shí)JobManager的Coordinator?會將?Barrier 注入數(shù)據(jù)流中 ;
2.當(dāng)所有的 barrier 在算子中成功進(jìn)行一遍傳遞(就是Checkpoint完成),并完成快照后,則“預(yù)提交”階段完成;
3.等所有的算子完成“預(yù)提交”,就會發(fā)起一個(gè)commit“提交”動作,但是任何一個(gè)“預(yù)提交”失敗都會導(dǎo)致 Flink 回滾到最近的 checkpoint;
?
???????兩階段提交-詳細(xì)流程
- 需求
接下來將介紹兩階段提交協(xié)議,以及它如何在一個(gè)讀寫Kafka的Flink程序中實(shí)現(xiàn)端到端的Exactly-Once語義。Kafka經(jīng)常與Flink一起使用,且Kafka在最近的0.11版本中添加了對事務(wù)的支持。這意味著現(xiàn)在通過Flink讀寫Kafaka,并提供端到端的Exactly-Once語義有了必要的支持。
?
在上圖中,我們有:
– 從Kafka讀取的數(shù)據(jù)源(Flink內(nèi)置的KafkaConsumer)
– 窗口聚合
– 將數(shù)據(jù)寫回Kafka的數(shù)據(jù)輸出端(Flink內(nèi)置的KafkaProducer)
要使數(shù)據(jù)輸出端提供Exactly-Once保證,它必須將所有數(shù)據(jù)通過一個(gè)事務(wù)提交給Kafka。提交捆綁了兩個(gè)checkpoint之間的所有要寫入的數(shù)據(jù)。這可確保在發(fā)生故障時(shí)能回滾寫入的數(shù)據(jù)。
但是在分布式系統(tǒng)中,通常會有多個(gè)并發(fā)運(yùn)行的寫入任務(wù)的,簡單的提交或回滾是不夠的,因?yàn)樗薪M件必須在提交或回滾時(shí)“一致”才能確保一致的結(jié)果。
Flink使用兩階段提交協(xié)議及預(yù)提交階段來解決這個(gè)問題。
?
- 預(yù)提交-內(nèi)部狀態(tài)
在checkpoint開始的時(shí)候,即兩階段提交協(xié)議的“預(yù)提交”階段。當(dāng)checkpoint開始時(shí),Flink的JobManager會將checkpoint barrier(將數(shù)據(jù)流中的記錄分為進(jìn)入當(dāng)前checkpoint與進(jìn)入下一個(gè)checkpoint)注入數(shù)據(jù)流。
brarrier在operator之間傳遞。對于每一個(gè)operator,它觸發(fā)operator的狀態(tài)快照寫入到state backend。
?
數(shù)據(jù)源保存了消費(fèi)Kafka的偏移量(offset),之后將checkpoint barrier傳遞給下一個(gè)operator。
這種方式僅適用于operator具有『內(nèi)部』狀態(tài)。所謂內(nèi)部狀態(tài),是指Flink state backend保存和管理的 -例如,第二個(gè)operator中window聚合算出來的sum值。當(dāng)一個(gè)進(jìn)程有它的內(nèi)部狀態(tài)的時(shí)候,除了在checkpoint之前需要將數(shù)據(jù)變更寫入到state backend,不需要在預(yù)提交階段執(zhí)行任何其他操作。Flink負(fù)責(zé)在checkpoint成功的情況下正確提交這些寫入,或者在出現(xiàn)故障時(shí)中止這些寫入。
?
- 預(yù)提交-外部狀態(tài)
但是,當(dāng)進(jìn)程具有『外部』狀態(tài)時(shí),需要作些額外的處理。外部狀態(tài)通常以寫入外部系統(tǒng)(如Kafka)的形式出現(xiàn)。在這種情況下,為了提供Exactly-Once保證,外部系統(tǒng)必須支持事務(wù),這樣才能和兩階段提交協(xié)議集成。
在該示例中的數(shù)據(jù)需要寫入Kafka,因此數(shù)據(jù)輸出端(Data Sink)有外部狀態(tài)。在這種情況下,在預(yù)提交階段,除了將其狀態(tài)寫入state backend之外,數(shù)據(jù)輸出端還必須預(yù)先提交其外部事務(wù)。
?
當(dāng)checkpoint barrier在所有operator都傳遞了一遍,并且觸發(fā)的checkpoint回調(diào)成功完成時(shí),預(yù)提交階段就結(jié)束了。所有觸發(fā)的狀態(tài)快照都被視為該checkpoint的一部分。checkpoint是整個(gè)應(yīng)用程序狀態(tài)的快照,包括預(yù)先提交的外部狀態(tài)。如果發(fā)生故障,我們可以回滾到上次成功完成快照的時(shí)間點(diǎn)。
- 提交階段
下一步是通知所有operator,checkpoint已經(jīng)成功了。這是兩階段提交協(xié)議的提交階段,JobManager為應(yīng)用程序中的每個(gè)operator發(fā)出checkpoint已完成的回調(diào)。
數(shù)據(jù)源和widnow operator沒有外部狀態(tài),因此在提交階段,這些operator不必執(zhí)行任何操作。但是,數(shù)據(jù)輸出端(Data Sink)擁有外部狀態(tài),此時(shí)應(yīng)該提交外部事務(wù)。
?
- 總結(jié)
我們對上述知識點(diǎn)總結(jié)下:
1.一旦所有operator完成預(yù)提交,就提交一個(gè)commit。
2.如果只要有一個(gè)預(yù)提交失敗,則所有其他提交都將中止,我們將回滾到上一個(gè)成功完成的checkpoint。
3.在預(yù)提交成功之后,提交的commit需要保證最終成功 – operator和外部系統(tǒng)都需要保障這點(diǎn)。如果commit失敗(例如,由于間歇性網(wǎng)絡(luò)問題),整個(gè)Flink應(yīng)用程序?qū)⑹?#xff0c;應(yīng)用程序?qū)⒏鶕?jù)用戶的重啟策略重新啟動,還會嘗試再提交。這個(gè)過程至關(guān)重要,因?yàn)槿绻鹀ommit最終沒有成功,將會導(dǎo)致數(shù)據(jù)丟失。
4.完整的實(shí)現(xiàn)兩階段提交協(xié)議可能有點(diǎn)復(fù)雜,這就是為什么Flink將它的通用邏輯提取到抽象類TwoPhaseCommitSinkFunction中的原因。
?
代碼示例
Flink+Kafka實(shí)現(xiàn)End-to-End Exactly-Once
https://ververica.cn/developers/flink-kafka-end-to-end-exactly-once-analysis/
package cn.lanson.extend;import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;/*** Author lanson* Desc* Kafka --> Flink-->Kafka ?的End-To-End-Exactly-once* 直接使用* FlinkKafkaConsumer ?+ ?Flink的Checkpoint ?+ ?FlinkKafkaProducer*/
public class Kafka_Flink_Kafka_EndToEnd_ExactlyOnce {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//===========Checkpoint參數(shù)設(shè)置====//===========類型1:必須參數(shù)=============//設(shè)置Checkpoint的時(shí)間間隔為1000ms做一次Checkpoint/其實(shí)就是每隔1000ms發(fā)一次Barrier!env.enableCheckpointing(1000);//設(shè)置State狀態(tài)存儲介質(zhì)if (SystemUtils.IS_OS_WINDOWS) {env.setStateBackend(new FsStateBackend("file:///D:/ckp"));} else {env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink-checkpoint/checkpoint"));}//===========類型2:建議參數(shù)===========//設(shè)置兩個(gè)Checkpoint 之間最少等待時(shí)間,如設(shè)置Checkpoint之間最少是要等?500ms(為了避免每隔1000ms做一次Checkpoint的時(shí)候,前一次太慢和后一次重疊到一起去了)//如:高速公路上,每隔1s關(guān)口放行一輛車,但是規(guī)定了兩車之前的最小車距為500menv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//默認(rèn)是0//設(shè)置如果在做Checkpoint過程中出現(xiàn)錯(cuò)誤,是否讓整體任務(wù)失敗:true是??false不是//env.getCheckpointConfig().setFailOnCheckpointingErrors(false);//默認(rèn)是trueenv.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);//默認(rèn)值為0,表示不容忍任何檢查點(diǎn)失敗//設(shè)置是否清理檢查點(diǎn),表示?Cancel 時(shí)是否需要保留當(dāng)前的?Checkpoint,默認(rèn)?Checkpoint會在作業(yè)被Cancel時(shí)被刪除//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:true,當(dāng)作業(yè)被取消時(shí),刪除外部的checkpoint(默認(rèn)值)//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:false,當(dāng)作業(yè)被取消時(shí),保留外部的checkpointenv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//===========類型3:直接使用默認(rèn)的即可===============//設(shè)置checkpoint的執(zhí)行模式為EXACTLY_ONCE(默認(rèn))env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//設(shè)置checkpoint的超時(shí)時(shí)間,如果?Checkpoint在?60s內(nèi)尚未完成說明該次Checkpoint失敗,則丟棄。env.getCheckpointConfig().setCheckpointTimeout(60000);//默認(rèn)10分鐘//設(shè)置同一時(shí)間有多少個(gè)checkpoint可以同時(shí)執(zhí)行env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//默認(rèn)為1//=============重啟策略===========env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));//2.SourceProperties props_source = new Properties();props_source.setProperty("bootstrap.servers", "node1:9092");props_source.setProperty("group.id", "flink");props_source.setProperty("auto.offset.reset", "latest");props_source.setProperty("flink.partition-discovery.interval-millis", "5000");//會開啟一個(gè)后臺線程每隔5s檢測一下Kafka的分區(qū)情況//props_source.setProperty("enable.auto.commit", "true");//沒有Checkpoint的時(shí)候使用自動提交偏移量到默認(rèn)主題:__consumer_offsets中//props_source.setProperty("auto.commit.interval.ms", "2000");//kafkaSource就是KafkaConsumerFlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("flink_kafka", new SimpleStringSchema(), props_source);kafkaSource.setStartFromLatest();//kafkaSource.setStartFromGroupOffsets();//設(shè)置從記錄的offset開始消費(fèi),如果沒有記錄從auto.offset.reset配置開始消費(fèi)//kafkaSource.setStartFromEarliest();//設(shè)置直接從Earliest消費(fèi),和auto.offset.reset配置無關(guān)kafkaSource.setCommitOffsetsOnCheckpoints(true);//執(zhí)行Checkpoint的時(shí)候提交offset到Checkpoint(Flink用),并且提交一份到默認(rèn)主題:__consumer_offsets(外部其他系統(tǒng)想用的話也可以獲取到)DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);//3.Transformation//3.1切割出每個(gè)單詞并直接記為1SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {//value就是每一行String[] words = value.split(" ");for (String word : words) {Random random = new Random();int i = random.nextInt(5);if (i > 3) {System.out.println("出bug了...");throw new RuntimeException("出bug了...");}out.collect(Tuple2.of(word, 1));}}});//3.2分組//注意:批處理的分組是groupBy,流處理的分組是keyByKeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);//3.3聚合SingleOutputStreamOperator<Tuple2<String, Integer>> aggResult = groupedDS.sum(1);//3.4將聚合結(jié)果轉(zhuǎn)為自定義的字符串格式SingleOutputStreamOperator<String> result = (SingleOutputStreamOperator<String>) aggResult.map(new RichMapFunction<Tuple2<String, Integer>, String>() {@Overridepublic String map(Tuple2<String, Integer> value) throws Exception {return value.f0 + ":::" + value.f1;}});//4.sink//result.print();Properties props_sink = new Properties();props_sink.setProperty("bootstrap.servers", "node1:9092");props_sink.setProperty("transaction.timeout.ms", 1000 * 5 + "");//設(shè)置事務(wù)超時(shí)時(shí)間,也可在kafka配置中設(shè)置/*FlinkKafkaProducer<String> kafkaSink0 = new FlinkKafkaProducer<>("flink_kafka",new SimpleStringSchema(),props_sink);*/FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka2",new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()),props_sink,FlinkKafkaProducer.Semantic.EXACTLY_ONCE);result.addSink(kafkaSink);//5.executeenv.execute();//測試://1.創(chuàng)建主題?/export/server/kafka/bin/kafka-topics.sh --zookeeper node1:2181 --create --replication-factor 2 --partitions 3 --topic flink_kafka2//2.開啟控制臺生產(chǎn)者?/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka//3.開啟控制臺消費(fèi)者?/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka2}
}
?
???????Flink+MySQL實(shí)現(xiàn)End-to-End Exactly-Once
https://www.jianshu.com/p/5bdd9a0d7d02
- 需求
1.checkpoint每10s進(jìn)行一次,此時(shí)用FlinkKafkaConsumer實(shí)時(shí)消費(fèi)kafka中的消息
2.消費(fèi)并處理完消息后,進(jìn)行一次預(yù)提交數(shù)據(jù)庫的操作
3.如果預(yù)提交沒有問題,10s后進(jìn)行真正的插入數(shù)據(jù)庫操作,如果插入成功,進(jìn)行一次checkpoint,flink會自動記錄消費(fèi)的offset,可以將checkpoint保存的數(shù)據(jù)放到hdfs中
4.如果預(yù)提交出錯(cuò),比如在5s的時(shí)候出錯(cuò)了,此時(shí)Flink程序就會進(jìn)入不斷的重啟中,重啟的策略可以在配置中設(shè)置,checkpoint記錄的還是上一次成功消費(fèi)的offset,因?yàn)楸敬蜗M(fèi)的數(shù)據(jù)在checkpoint期間,消費(fèi)成功,但是預(yù)提交過程中失敗了
5.注意此時(shí)數(shù)據(jù)并沒有真正的執(zhí)行插入操作,因?yàn)轭A(yù)提交(preCommit)失敗,提交(commit)過程也不會發(fā)生。等將異常數(shù)據(jù)處理完成之后,再重新啟動這個(gè)Flink程序,它會自動從上一次成功的checkpoint中繼續(xù)消費(fèi)數(shù)據(jù),以此來達(dá)到Kafka到Mysql的Exactly-Once。
?
- 代碼1
package cn.lanson.extend;import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.base.VoidSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema; import org.apache.kafka.clients.CommonClientConfigs;import java.sql.*; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Properties;public class Kafka_Flink_MySQL_EndToEnd_ExactlyOnce {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//方便測試env.enableCheckpointing(10000);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);//env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.setStateBackend(new FsStateBackend("file:///D:/ckp"));//2.SourceString topic = "flink_kafka";Properties props = new Properties();props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,"node1:9092");props.setProperty("group.id","flink");props.setProperty("auto.offset.reset","latest");//如果有記錄偏移量從記錄的位置開始消費(fèi),如果沒有從最新的數(shù)據(jù)開始消費(fèi)props.setProperty("flink.partition-discovery.interval-millis","5000");//開一個(gè)后臺線程每隔5s檢查Kafka的分區(qū)狀態(tài)FlinkKafkaConsumer<ObjectNode> kafkaSource = new FlinkKafkaConsumer<>("topic_in", new JSONKeyValueDeserializationSchema(true), props);kafkaSource.setStartFromGroupOffsets();//從group offset記錄的位置位置開始消費(fèi),如果kafka broker 端沒有該group信息,會根據(jù)"auto.offset.reset"的設(shè)置來決定從哪開始消費(fèi)kafkaSource.setCommitOffsetsOnCheckpoints(true);//Flink執(zhí)行Checkpoint的時(shí)候提交偏移量(一份在Checkpoint中,一份在Kafka的默認(rèn)主題中__comsumer_offsets(方便外部監(jiān)控工具去看))DataStreamSource<ObjectNode> kafkaDS = env.addSource(kafkaSource);//3.transformation//4.SinkkafkaDS.addSink(new MySqlTwoPhaseCommitSink()).name("MySqlTwoPhaseCommitSink");//5.executeenv.execute();} }/**自定義kafka to mysql,繼承TwoPhaseCommitSinkFunction,實(shí)現(xiàn)兩階段提交。功能:保證kafak to mysql 的Exactly-OnceCREATE TABLE `t_test` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`value` varchar(255) DEFAULT NULL,`insert_time` datetime DEFAULT NULL,PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4*/ class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<ObjectNode, Connection, Void> {public MySqlTwoPhaseCommitSink() {super(new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE);}/*** 執(zhí)行數(shù)據(jù)入庫操作*/@Overrideprotected void invoke(Connection connection, ObjectNode objectNode, Context context) throws Exception {System.err.println("start invoke.......");String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());System.err.println("===>date:" + date + " " + objectNode);String value = objectNode.get("value").toString();String sql = "insert into `t_test` (`value`,`insert_time`) values (?,?)";PreparedStatement ps = connection.prepareStatement(sql);ps.setString(1, value);ps.setTimestamp(2, new Timestamp(System.currentTimeMillis()));//執(zhí)行insert語句ps.execute();//手動制造異常if(Integer.parseInt(value) == 15) System.out.println(1/0);}/*** 獲取連接,開啟手動提交事務(wù)(getConnection方法中)*/@Overrideprotected Connection beginTransaction() throws Exception {String url = "jdbc:mysql://localhost:3306/bigdata?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true";Connection connection = DBConnectUtil.getConnection(url, "root", "root");System.err.println("start beginTransaction......."+connection);return connection;}/*** 預(yù)提交,這里預(yù)提交的邏輯在invoke方法中*/@Overrideprotected void preCommit(Connection connection) throws Exception {System.err.println("start preCommit......."+connection);}/*** 如果invoke執(zhí)行正常則提交事務(wù)*/@Overrideprotected void commit(Connection connection) {System.err.println("start commit......."+connection);DBConnectUtil.commit(connection);}@Overrideprotected void recoverAndCommit(Connection connection) {System.err.println("start recoverAndCommit......."+connection);}@Overrideprotected void recoverAndAbort(Connection connection) {System.err.println("start abort recoverAndAbort......."+connection);}/*** 如果invoke執(zhí)行異常則回滾事務(wù),下一次的checkpoint操作也不會執(zhí)行*/@Overrideprotected void abort(Connection connection) {System.err.println("start abort rollback......."+connection);DBConnectUtil.rollback(connection);} }class DBConnectUtil {/*** 獲取連接*/public static Connection getConnection(String url, String user, String password) throws SQLException {Connection conn = null;conn = DriverManager.getConnection(url, user, password);//設(shè)置手動提交conn.setAutoCommit(false);return conn;}/*** 提交事務(wù)*/public static void commit(Connection conn) {if (conn != null) {try {conn.commit();} catch (SQLException e) {e.printStackTrace();} finally {close(conn);}}}/*** 事務(wù)回滾*/public static void rollback(Connection conn) {if (conn != null) {try {conn.rollback();} catch (SQLException e) {e.printStackTrace();} finally {close(conn);}}}/*** 關(guān)閉連接*/public static void close(Connection conn) {if (conn != null) {try {conn.close();} catch (SQLException e) {e.printStackTrace();}}} }?
?
- 代碼2
package cn.lanson.extend;import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class DataProducer {public static void main(String[] args) throws InterruptedException {Properties props = new Properties();props.put("bootstrap.servers", "node1:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);try {for (int i = 1; i <= 20; i++) {DataBean data = new DataBean(String.valueOf(i));ProducerRecord record = new ProducerRecord<String, String>("flink_kafka", null, null, JSON.toJSONString(data));producer.send(record);System.out.println("發(fā)送數(shù)據(jù): " + JSON.toJSONString(data));Thread.sleep(1000);}}catch (Exception e){System.out.println(e);}producer.flush();}
}@Data
@NoArgsConstructor
@AllArgsConstructor
class DataBean {private String value;
}
深度總結(jié)
Exactly-Once
流數(shù)據(jù)處理的容錯(cuò)語義:
At most once --最多一次, 也就是說數(shù)據(jù)最多被處理一次,有可能會丟失
At least once --至少一次, 也就是說數(shù)據(jù)至少會被處理一次,有可能會重復(fù)
Exactly-Once --精準(zhǔn)一次, 也就是說數(shù)據(jù)只會被處理一次,不會丟也不會重復(fù),注意: ==更準(zhǔn)確的理解應(yīng)該是只會被正確處理一次而不是僅一次==
End-to-End Exactly-Once
Flink不僅僅支持Exactly-Once,而且還支持End-to-End Exactly-Once
End-to-End Exactly-Once : 端到端的Exactly-Once, 也就是說, Flink不光光內(nèi)部處理數(shù)據(jù)的時(shí)候支持Exactly-Once, 在從Source消費(fèi), 到Transformation處理, 再到Sink,整個(gè)流數(shù)據(jù)處理,從一端到另一端 整個(gè)流程都支持Exactly-Once !
Flink如何支持End-to-End Exactly-Once的?
-
Source: offset+Checkpoint: 需要數(shù)據(jù)源支持offset維護(hù),如Kafka(offset) + Flink(使用Checkpoint維護(hù)offset) 就是支持
FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase FlinkKafkaConsumerBase<T> implements CheckpointedFunction 源碼中就記錄了主題分區(qū)和offset信息 ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates initializeState方法和snapshotState方法 -
Transformation:Checkpoint
Flink官方介紹說的就是支持?jǐn)?shù)據(jù)流上的有狀態(tài)計(jì)算! Flink中的有狀態(tài)的Transformation-API都是自動維護(hù)狀態(tài)到的(到Checkpoint中),如sum/reduce/maxBy..... -
Sink:兩階段事務(wù)提交+Checkpoint
Flink+Kafka, Kafka是支持事務(wù)的,所以可以使用兩階段事務(wù)提交來實(shí)現(xiàn) FlinkKafkaProducer<IN> extends TwoPhaseCommitSinkFunction beginTransaction preCommit commit abort
兩階段事務(wù)提交協(xié)議
?
整個(gè)過程可以總結(jié)為下面階段:
1.一旦 Flink 開始做 checkpoint 操作,那么就beginTransaction開啟事務(wù)會進(jìn)入 pre-commit “預(yù)提交”階段,同時(shí)JobManager的Coordinator 會將 Barrier 注入數(shù)據(jù)流中 ;
2.當(dāng)所有的 barrier 在算子中成功進(jìn)行一遍傳遞(就是Checkpoint完成),并完成快照后,則“預(yù)提交”階段完成;
3.等所有的算子完成“預(yù)提交”,就會發(fā)起一個(gè)commit“提交”動作,但是任何一個(gè)“預(yù)提交”失敗都會導(dǎo)致 Flink 回滾到最近的 checkpoint(abort終止事務(wù));
注意: 兩階段事務(wù)提交本身的實(shí)現(xiàn)流程較為固定(主要就是4個(gè)方法的實(shí)現(xiàn)beginTransaction/preCommit/commit/abort), 但是代碼實(shí)現(xiàn)細(xì)節(jié)較為復(fù)雜,所以Flink提供了abstract class TwoPhaseCommitSinkFunction抽象類,并提供了Sink到Kafka的具體實(shí)現(xiàn):FlinkKafkaProducer extends TwoPhaseCommitSinkFunction里面已經(jīng)實(shí)現(xiàn)好了相應(yīng)的方法
注意: 小技巧:
面試時(shí)被問到. 需要說出幾個(gè)核心名詞: FlinkKafkaProducer 兩階段事務(wù)提交/TwoPhaseCommitSinkFunction/beginTransaction/preCommit/commit/abort
總結(jié)
以上是生活随笔為你收集整理的2021年大数据Flink(四十四):扩展阅读 End-to-End Exactly-Once的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据常用语言Scala(三十
- 下一篇: 2021年大数据Flink(四十五):