汽车之家基于 Flink 的数据传输平台的设计与实践
數(shù)據(jù)接入與傳輸作為打通數(shù)據(jù)系統(tǒng)與業(yè)務(wù)系統(tǒng)的一道橋梁,是數(shù)據(jù)系統(tǒng)與架構(gòu)中不可或缺的一個(gè)重要部分。數(shù)據(jù)傳輸系統(tǒng)穩(wěn)定性和準(zhǔn)確性,直接影響整個(gè)數(shù)據(jù)系統(tǒng)服務(wù)的 SLA 和質(zhì)量。此外如何提升系統(tǒng)的易用性,保證監(jiān)控服務(wù)并降低系統(tǒng)維護(hù)成本,優(yōu)雅應(yīng)對(duì)災(zāi)難等問題也十分重要。
本文介紹了汽車之家實(shí)時(shí)計(jì)算團(tuán)隊(duì)利用 Flink 和 Flink 實(shí)時(shí)平臺(tái)構(gòu)建數(shù)據(jù)傳輸 SDK 和傳輸平臺(tái)并不斷完善的實(shí)踐經(jīng)驗(yàn)與總結(jié)。內(nèi)容包括:
一、背景與需求
汽車之家(下稱之家)作為一家數(shù)據(jù)智能驅(qū)動(dòng)的公司,天然存在著對(duì)數(shù)據(jù)的各種復(fù)雜需求,之家的數(shù)據(jù)系統(tǒng)負(fù)責(zé)支撐這些業(yè)務(wù)需求的開展。數(shù)據(jù)傳輸系統(tǒng),作為其中一環(huán),承擔(dān)了各類數(shù)據(jù)導(dǎo)入分發(fā)的需求,支持用戶訂閱數(shù)據(jù)變更。隨著支撐的業(yè)務(wù)擴(kuò)增與需求的增加。原來的接入系統(tǒng)暴露出了一定的問題和不足:
- 缺乏有效的任務(wù)與信息管理機(jī)制,依賴人工進(jìn)行任務(wù)的管理和運(yùn)維,信息的統(tǒng)計(jì)
- 接入程序資源使用浪費(fèi),缺乏彈性
- 針對(duì) DDL 變更問題,不能很好的處理,必要時(shí)需要人工介入
- 傳輸系統(tǒng)依賴的組件比較多,比如 Zookeeper,Redis 等
- 代碼的技術(shù)債累積,代碼維護(hù)成本變高
針對(duì)上述問題,我們決定開發(fā)一套新的數(shù)據(jù)傳輸和分發(fā)系統(tǒng),一舉解決上述問題。
二、技術(shù)選型與設(shè)計(jì) —— Why Flink?
在開展新系統(tǒng)的開發(fā)工作之前,我們分析的可選的方案思路大體分三種:
我們規(guī)約出以下主要設(shè)計(jì)使用目標(biāo):
- 架構(gòu)設(shè)計(jì)上要運(yùn)維管理是友好的,提供高可用以及故障恢復(fù)策略,支持異地多活
- 架構(gòu)設(shè)計(jì)上要提供強(qiáng)數(shù)據(jù)準(zhǔn)確性,至少承諾 at-least-once 語義
- 架構(gòu)設(shè)計(jì)上要對(duì)擴(kuò)縮容是友好的,可以按需分配資源
- 功能設(shè)計(jì)上要全面的監(jiān)控覆蓋和完善的報(bào)警機(jī)制,支持元數(shù)據(jù)信息管理
- 功能設(shè)計(jì)上要對(duì)實(shí)時(shí)計(jì)算是友好的(1)
- 功能設(shè)計(jì)上要能完全防御 DDL 變更帶來的問題
此外,在性能指標(biāo)上,接入系統(tǒng)的延時(shí)和吞吐至少要滿足所有業(yè)務(wù)常規(guī)狀態(tài)下的需求。
(1) 指與實(shí)時(shí)計(jì)算平臺(tái)整合的能力方案設(shè)計(jì)與對(duì)比
依照設(shè)計(jì)思路和目標(biāo),我們整理了方案主要功能的對(duì)比表格:
(1)Flink 自帶高可用和故障恢復(fù),實(shí)時(shí)計(jì)算平臺(tái)在此基礎(chǔ)上提供更強(qiáng)的高可用服務(wù)(2)良好的編碼 + flink 機(jī)制即可實(shí)現(xiàn) Exactly-Once
(3)實(shí)時(shí)計(jì)算平臺(tái)自帶任務(wù)部署管理能力
(4)實(shí)時(shí)計(jì)算平臺(tái)自帶完備的監(jiān)控和管理
經(jīng)過討論,大家一致決定基于Flink進(jìn)行新的傳輸平臺(tái)的開發(fā):
我們的 MVP 版本開發(fā)完成大約只花費(fèi)了不到 3 周的時(shí)間,POC 的結(jié)果完全符合預(yù)期的性能要求和功能要求。
三、數(shù)據(jù)傳輸系統(tǒng)的設(shè)計(jì)架構(gòu)
從邏輯層面來看,之家的實(shí)時(shí)數(shù)據(jù)傳輸平臺(tái)分為 3 部分:
- 數(shù)據(jù)傳輸程序
- 接入任務(wù)信息管理模塊
- 任務(wù)執(zhí)行 Runtime 模塊
在實(shí)現(xiàn)上:
- 數(shù)據(jù)傳輸程序是由固定的 Flink Jar 和 Flink SQL Codegen Service 生成的SQL Task 組成
- 管理模塊作為一個(gè)微服務(wù),負(fù)責(zé)與 Flink 平臺(tái)組件通信,完成必要的任務(wù)管理和信息管理
- 執(zhí)行層直接依賴 Flink 平臺(tái)和 Flink 平臺(tái)的集群
組件架構(gòu)與交互邏輯
傳輸系統(tǒng)涉及到的組件和交互如圖所示:
AutoDTS 即為傳輸系統(tǒng)的任務(wù)信息管理模塊,AutoStream Core 為 Flink 實(shí)時(shí)平臺(tái)核心系統(tǒng),Jar Service 是 Flink 相關(guān) SDK Jar 儲(chǔ)存管理服務(wù),Metastore 為 Flink 平臺(tái)的元數(shù)據(jù)管理系統(tǒng),Flink Client 是我們自己封裝的 Submit Client,支持以 Restful 方式向 YARN/K8S 上提交作業(yè)。
AutoDTS 前端直接與用戶進(jìn)行交互,完成用戶對(duì)任務(wù)信息的修改和任務(wù)生命周期的操作。AutoDTS 將任務(wù)信息處理后與 Flink 平臺(tái)交互,每一個(gè)數(shù)據(jù)傳輸任務(wù)對(duì)應(yīng)Flink平臺(tái)唯一一個(gè)任務(wù),同時(shí),部分任務(wù)信息被 AutoDTS 處理,會(huì)直接在 Metastore 上完成對(duì)應(yīng)流表的創(chuàng)建。用戶直接申請(qǐng)并使用該 Flink 流表,進(jìn)行 SQL 任務(wù)的開發(fā)。
針對(duì)不同的傳輸任務(wù),AutoDTS 會(huì)委托 Core System 組織任務(wù)參數(shù)和 SQL 邏輯,并從 Jar Service 加載不同的 SDK Jar 提交到 Client 去執(zhí)行,對(duì)于基于 SQL Codegen 的傳輸任務(wù),Flink SQL Codegen Service 會(huì)將任務(wù)參數(shù)組織整合翻譯成可執(zhí)行的 Flink SQL 任務(wù),通過 SQL 任務(wù),我們可以直接復(fù)用平臺(tái) SQL SDKs,執(zhí)行 SQL 作業(yè)。
正如前文提到的,我們最大限度復(fù)用已有組件和服務(wù),大大降低了開發(fā)的周期。
傳輸任務(wù)類型與構(gòu)成
之家的數(shù)據(jù)傳輸任務(wù)分為兩種類型,接入任務(wù)與分發(fā)任務(wù)。
- 接入任務(wù),負(fù)責(zé)從數(shù)據(jù)源實(shí)時(shí)接入 Changelog Stream 并處理成統(tǒng)一的格式寫入 Kafka 中,每個(gè)表只會(huì)對(duì)用唯一個(gè)接入程序,作為公共數(shù)據(jù)資產(chǎn),被下游程序進(jìn)行使用和消費(fèi)
- 分發(fā)任務(wù), 負(fù)責(zé)讀取公共的 Kafka 數(shù)據(jù),并將數(shù)據(jù)寫入指定的存儲(chǔ)中,用戶根據(jù)自己的需求去使用,擁有分發(fā)任務(wù)的所有權(quán)
如圖所示,接入的數(shù)據(jù)源主要有 3 種,除了 Mysql 和 SqlServer,我們還支持了 TiDB 的 Changelog(TiCDC)接入 Java Client 相關(guān)邏輯,并將我們的代碼貢獻(xiàn)到了 TiDB 社區(qū) [1];對(duì)于分發(fā)端,通過解析用戶的任務(wù)配置,從而進(jìn)行 SQL codegen 生成 Flink SQL 代碼執(zhí)行。
四、基于 Flink 的 Binlog 接入 SDK
在這些接入和分發(fā) SDK 中,Binlog 接入 SDK 是比較有難度的一個(gè),下面我們以 Binlog 接入 SDK 為例,剖析接入 SDK 的主體設(shè)計(jì)思路和開發(fā)過程。
Stage 拆解
依照 Flink 經(jīng)典的 Source->Transformation->Sink,Binlog 接入任務(wù)也拆分為這三個(gè)Stage:
Binlog Source
Binlog Source 的樸素開發(fā)思路:創(chuàng)建一個(gè) BinaryLogClient 并持續(xù) fetchBinlogEvent 并進(jìn)行簡(jiǎn)單的轉(zhuǎn)換處理后發(fā)送到下游。在既定的設(shè)計(jì)目標(biāo)中,以下問題需要認(rèn)真思考:
對(duì)于問題1,考慮到 Binlog Stream 的特殊性,我們要求 Source 的并行度為且僅能為1。且在絕大部分情況下,從 BinaryLogClient fetch BinlogEvent 不會(huì)是性能瓶頸。我們只要保證 BinaryLogClient 與 BinlogSourceFunction 的生命周期一致,二者通過有界的阻塞隊(duì)列鏈接,分別充當(dāng)生產(chǎn)者和消費(fèi)者,同時(shí) BinlogSourceFunction 對(duì) BinlogEvent 盡可能少的進(jìn)行邏處理,讓 BinlogSourceFunction 的負(fù)擔(dān)盡量減輕,從而提升 Source 階段的性能即可。
而對(duì)于問題 2、3,則需要從 Binlog 的特性和格式來分析。眾所周知,BinlogEvent 攜帶了唯一的 BinlogPosition。BinlogPosition 是全序的,我們可以在 trigger Checkpoint 的時(shí)候,對(duì)當(dāng)前的 BinlogPosition 進(jìn)行記錄。但是僅僅是記錄這個(gè)是不夠的,如果記錄了數(shù)據(jù)位置,那么下次從 Checkpoint 恢復(fù)的時(shí)候,是從當(dāng)條記錄開始還是當(dāng)條記錄的下一條記錄開始呢?另一方面,我們希望發(fā)送的按照一個(gè)完整的 transaction 去發(fā)送數(shù)據(jù)給下游而非從事務(wù)中間截?cái)喟l(fā)送。這里,我們就要用到 BinlogEvent 的一種特定事件——TransactionEnd 事件。
我們這里先來解決問題 2,我們要求 BinlogSourceFunction 只使用 TransactionEnd 事件的 BinlogPosition 來更新位點(diǎn)保存到狀態(tài)中,由于 TransactionEnd 事件不是 DML 事件,不會(huì)導(dǎo)致下游生成數(shù)據(jù),所以就不需要考慮之前提到的問題。
而問題3的解決需要和 Flink的Checkpoint 機(jī)制進(jìn)行聯(lián)動(dòng)。我們當(dāng)時(shí)使用的 Flink 版本是 1.9.x。在 Source 端,需要通過 CheckpointLock 來讓 Source 和 Checkpoint trigger 進(jìn)行配合。雖然在理解和使用上有一定的壁壘,但是 CheckppointLock 機(jī)制恰恰幫助我們達(dá)成了問題 3 的目標(biāo)。我們保證了 Source 只有拿到 lock 才發(fā)送數(shù)據(jù)給下游,只有在完成一次 transaction 的數(shù)據(jù)發(fā)送后才 unlock,這樣就保證了 2 個(gè) checkpoint 之間必定是完整的 𝒳( 𝒳 ∈ N )次 transaction 的數(shù)據(jù)。另一方面,我們減小了 checkpoint trigger 的間隔(200ms~500ms),減少了 checkpoint 間的數(shù)據(jù) transaction 的數(shù)量,加快數(shù)據(jù) commit 的速度。
UnifiedFormatTransform
就如名字描述的,UnifiedFormatTransform 的作用是將數(shù)據(jù)轉(zhuǎn)換為統(tǒng)一制定的數(shù)據(jù)格式。
相較于 Binlog Source 階段,UnifiedFormatTransform 階段不用太過擔(dān)心性能問題,良好的編碼和水平垂直擴(kuò)容能力可以應(yīng)付絕大部分性能需求。但是有一個(gè)重要的問題亟待解決,就是前面提到的功能設(shè)計(jì)目標(biāo):完全防御 DDL 帶來的問題。
DDL 問題在數(shù)據(jù)同步/傳輸中一直是一個(gè)比較棘手的問題,帶來的麻煩包括不限于,數(shù)據(jù)解析失敗/錯(cuò)誤,程序失敗/重啟,且恢復(fù)的成本往往很高。而其實(shí)解決這個(gè)問題的核心思路也很簡(jiǎn)單,就是在程序中就地解析 DDL 并處理 Schema 變化。為了實(shí)現(xiàn)這個(gè)功能,我們需要完成以下幾個(gè)步驟:
- 內(nèi)嵌 Parser,用于解析 DDL SQL
- 解析出現(xiàn)的所有 DDL,根據(jù)解析的 DDL 內(nèi)容更新內(nèi)置的 Schema,并更新到 Flink 狀態(tài)中
- 生成 DDL 對(duì)應(yīng)的數(shù)據(jù)發(fā)送到下游
我們這實(shí)現(xiàn)上參考了 Maxwell [2] 的做法,內(nèi)嵌了 Antlr4 的 Mysql 文法的 g4 文件,然后自定義 listener 來完成對(duì) Schema 的更新和 DDL 數(shù)據(jù)的生成,然后 Schema 會(huì)在 Checkpoint 觸發(fā)時(shí)被保存到狀態(tài)中。
完成了就地解決 DDL 的功能后,不論是簡(jiǎn)單的 Alter Table,還是復(fù)雜的 Online DDL,接入程序都可以順利解決,利用狀態(tài)從斷點(diǎn)恢復(fù),也不會(huì)出現(xiàn) Schema 異常的問題。
Kafka Sink
Kafka Sink 階段主要是將轉(zhuǎn)換好的數(shù)據(jù)寫入 Kafka中。Flink 原生為 Kafka Sink 賦予了 Exactly-Once 的能力,而我們也將這個(gè)功能利用起來,和 Source 一起,提供了開箱即用的端到端 Exactly-Once 解決方案。我們保證了 Source 按照完整的 Mysql Transaction 發(fā)送數(shù)據(jù),同時(shí) Sink 按照完整的 Mysql Transaction 將數(shù)據(jù)寫入Kafka,對(duì)于 Transaction 敏感的場(chǎng)景,我們可以開啟 Transactional 消費(fèi)模式,來完成強(qiáng) transaction 語義(而非最終一致性)的數(shù)據(jù)處理。
其他優(yōu)化
此外我們還做了一些優(yōu)化功能:
- gtid 支持與一鍵主從切換
- 程序運(yùn)行信息定期備份到外部存儲(chǔ)
- Binlog 同步任務(wù)相關(guān)的監(jiān)控指標(biāo)覆蓋
五、平臺(tái)使用
用戶在傳輸平臺(tái),只需要完成必要配置的設(shè)定,即可完成傳輸任務(wù)的創(chuàng)建和數(shù)據(jù)的使用,比較簡(jiǎn)單。
接入任務(wù)
對(duì)于接入任務(wù),正如我們前文提到的, 接入任務(wù)產(chǎn)生的數(shù)據(jù)會(huì)被作為公共資產(chǎn)。所以用戶只需要查詢需求的表的數(shù)據(jù)是否已經(jīng)接入,如果已經(jīng)接入,則可以直接申請(qǐng)使用,否則發(fā)起一次表接入申請(qǐng),審批通過后會(huì)由系統(tǒng)自動(dòng)進(jìn)行操作。
分發(fā)任務(wù)
對(duì)于分發(fā)作業(yè),需要用戶進(jìn)行創(chuàng)建,以 Iceberg 分發(fā)任務(wù)為例:
■ 字段篩選
選擇出分發(fā)作業(yè)使用的已經(jīng)接入到平臺(tái)的數(shù)據(jù)源表字段
在選擇一些任務(wù)的運(yùn)行配置(如資源,運(yùn)行環(huán)境)后,就可以創(chuàng)建并運(yùn)行一個(gè)分發(fā)任務(wù),我們可以看到對(duì)應(yīng)唯一一個(gè) Flink 平臺(tái)任務(wù) ID:
此外,我們還提供了豐富的監(jiān)控查詢,元數(shù)據(jù)信息查詢等功能,充分利用了實(shí)時(shí)計(jì)算平臺(tái)的已有組件,實(shí)現(xiàn)了傳輸系統(tǒng)與實(shí)時(shí)計(jì)算系統(tǒng)的緊密結(jié)合。
六、總結(jié)與展望
實(shí)踐證明,我們選擇基于 Flink 進(jìn)行輸出傳輸系統(tǒng)的開發(fā),是個(gè)明智且正確的決定。在最小的開發(fā)成本下,從功能和效率及可維護(hù)性上,完全解決了之前遺留的問題,全面提升了之家接入/分發(fā)/數(shù)據(jù)訂閱的效率和用戶體驗(yàn),也提升了我們?cè)跀?shù)據(jù)傳輸方面的技術(shù)能力。
最近我們?cè)跀?shù)據(jù)湖方向投入了較多的精力,傳輸系統(tǒng)目前也已經(jīng)初步支持?jǐn)?shù)據(jù)接入數(shù)據(jù)湖,未來希望可以不斷完善相關(guān)功能,大幅提升數(shù)據(jù)湖數(shù)據(jù)接入的能力,支持用戶一鍵入湖,加強(qiáng)整個(gè)數(shù)據(jù)體系的整合。
另一方面,我們看到 Flink 新版本提供了許多新功能新工具。例如 FLIP-27 Source 和 OperatorCoordinator,我們希望可以借由這兩個(gè)全新的機(jī)制和工具,繼續(xù)優(yōu)化我們的代碼,拓展相關(guān)功能。對(duì)于新推出 Upsert-Kafka,我們已經(jīng)開始嘗試在 Flink 計(jì)算平臺(tái)上進(jìn)行初步的開發(fā)和整合,希望之后將 Upsert-Kafka 與傳輸系統(tǒng)打通,繼續(xù)擴(kuò)展與豐富實(shí)時(shí)計(jì)算和傳輸?shù)臉I(yè)務(wù)場(chǎng)景!
參考資料:
[1] https://github.com/pingcap/ticdc/pull/804
[2] https://github.com/zendesk/maxwell
作者介紹:
劉首維,本科畢業(yè)于大連理工大學(xué),Apache Flink Contributor,Scala/Akka 重度愛好者,19年加入汽車之家負(fù)責(zé)實(shí)時(shí)計(jì)算平臺(tái)和數(shù)據(jù)傳輸平臺(tái)數(shù)據(jù)的開發(fā)和維護(hù)。
原文鏈接:https://developer.aliyun.com/article/783578?
版權(quán)聲明:本文內(nèi)容由阿里云實(shí)名注冊(cè)用戶自發(fā)貢獻(xiàn),版權(quán)歸原作者所有,阿里云開發(fā)者社區(qū)不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。具體規(guī)則請(qǐng)查看《阿里云開發(fā)者社區(qū)用戶服務(wù)協(xié)議》和《阿里云開發(fā)者社區(qū)知識(shí)產(chǎn)權(quán)保護(hù)指引》。如果您發(fā)現(xiàn)本社區(qū)中有涉嫌抄襲的內(nèi)容,填寫侵權(quán)投訴表單進(jìn)行舉報(bào),一經(jīng)查實(shí),本社區(qū)將立刻刪除涉嫌侵權(quán)內(nèi)容。 與50位技術(shù)專家面對(duì)面20年技術(shù)見證,附贈(zèng)技術(shù)全景圖總結(jié)
以上是生活随笔為你收集整理的汽车之家基于 Flink 的数据传输平台的设计与实践的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 饿了么EMonitor演进史
- 下一篇: 融合趋势下基于 Flink Kylin