cdc工具 postgresql_基于 Flink SQL CDC 的实时数据同步方案
作者:伍翀 (云邪)
整理:陳政羽(Flink 社區志愿者)
Flink 1.11 引入了 Flink SQL CDC,CDC 能給我們數據和業務間能帶來什么變化?本文由 Apache Flink PMC,阿里巴巴技術專家伍翀 (云邪)分享,內容將從傳統的數據同步方案,基于 Flink CDC 同步的解決方案以及更多的應用場景和 CDC 未來開發規劃等方面進行介紹和演示。
1、傳統數據同步方案
2、基于 Flink SQL CDC 的數據同步方案(Demo)
3、Flink SQL CDC 的更多應用場景
4、Flink SQL CDC 的未來規劃
直播回顧:
https://www.bilibili.com/video/BV1zt4y1D7kt/
傳統的數據同步方案與 Flink SQL CDC 解決方案
業務系統經常會遇到需要更新數據到多個存儲的需求。例如:一個訂單系統剛剛開始只需要寫入數據庫即可完成業務使用。某天 BI 團隊期望對數據庫做全文索引,于是我們同時要寫多一份數據到 ES 中,改造后一段時間,又有需求需要寫入到 Redis 緩存中。
很明顯這種模式是不可持續發展的,這種雙寫到各個數據存儲系統中可能導致不可維護和擴展,數據一致性問題等,需要引入分布式事務,成本和復雜度也隨之增加。我們可以通過 CDC(Change Data Capture)工具進行解除耦合,同步到下游需要同步的存儲系統。通過這種方式提高系統的穩健性,也方便后續的維護。
Flink SQL CDC 數據同步與原理解析
CDC 全稱是 Change Data Capture ,它是一個比較廣義的概念,只要能捕獲變更的數據,我們都可以稱為 CDC 。業界主要有基于查詢的 CDC 和基于日志的 CDC ,可以從下面表格對比他們功能和差異點。
經過以上對比,我們可以發現基于日志 CDC 有以下這幾種優勢:
· 能夠捕獲所有數據的變化,捕獲完整的變更記錄。在異地容災,數據備份等場景中得到廣泛應用,如果是基于查詢的 CDC 有可能導致兩次查詢的中間一部分數據丟失
· 每次 DML 操作均有記錄無需像查詢 CDC 這樣發起全表掃描進行過濾,擁有更高的效率和性能,具有低延遲,不增加數據庫負載的優勢
· 無需入侵業務,業務解耦,無需更改業務模型
· 捕獲刪除事件和捕獲舊記錄的狀態,在查詢 CDC 中,周期的查詢無法感知中間數據是否刪除
基于日志的 CDC 方案介紹
從 ETL 的角度進行分析,一般采集的都是業務庫數據,這里使用 MySQL 作為需要采集的數據庫,通過 Debezium 把 MySQL Binlog 進行采集后發送至 Kafka 消息隊列,然后對接一些實時計算引擎或者 APP 進行消費后把數據傳輸入 OLAP 系統或者其他存儲介質。
Flink 希望打通更多數據源,發揮完整的計算能力。我們生產中主要來源于業務日志和數據庫日志,Flink 在業務日志的支持上已經非常完善,但是在數據庫日志支持方面在 Flink 1.11 前還屬于一片空白,這就是為什么要集成 CDC 的原因之一。
Flink SQL 內部支持了完整的 changelog 機制,所以 Flink 對接 CDC 數據只需要把CDC 數據轉換成 Flink 認識的數據,所以在 Flink 1.11 里面重構了 TableSource 接口,以便更好支持和集成 CDC。
重構后的 TableSource 輸出的都是 RowData 數據結構,代表了一行的數據。在RowData 上面會有一個元數據的信息,我們稱為 RowKind 。RowKind 里面包括了插入、更新前、更新后、刪除,這樣和數據庫里面的 binlog 概念十分類似。通過 Debezium 采集的 JSON 格式,包含了舊數據和新數據行以及原數據信息,op 的 u表示是 update 更新操作標識符,ts_ms 表示同步的時間戳。因此,對接 Debezium JSON 的數據,其實就是將這種原始的 JSON 數據轉換成 Flink 認識的 RowData。
選擇 Flink 作為 ETL 工具
當選擇 Flink 作為 ETL 工具時,在數據同步場景,如下圖同步結構:
通過 Debezium 訂閱業務庫 MySQL 的 Binlog 傳輸至 Kafka ,Flink 通過創建 Kafka 表指定 format 格式為 debezium-json ,然后通過 Flink 進行計算后或者直接插入到其他外部數據存儲系統,例如圖中的 Elasticsearch 和 PostgreSQL。
但是這個架構有個缺點,我們可以看到采集端組件過多導致維護繁雜,這時候就會想是否可以用 Flink SQL 直接對接 MySQL 的 binlog 數據呢,有沒可以替代的方案呢?
答案是有的!經過改進后結構如下圖:
社區開發了 flink-cdc-connectors 組件,這是一個可以直接從 MySQL、PostgreSQL 等數據庫直接讀取全量數據和增量變更數據的 source 組件。目前也已開源,開源地址:
flink-cdc-connectors 可以用來替換 Debezium+Kafka 的數據采集模塊,從而實現 Flink SQL 采集+計算+傳輸(ETL)一體化,這樣做的優點有以下:
· 開箱即用,簡單易上手
· 減少維護的組件,簡化實時鏈路,減輕部署成本
· 減小端到端延遲
· Flink 自身支持 Exactly Once 的讀取和計算
· 數據不落地,減少存儲成本
· 支持全量和增量流式讀取
· binlog 采集位點可回溯*
基于 Flink SQL CDC 的數據同步方案實踐
下面給大家帶來 3 個關于 Flink SQL + CDC 在實際場景中使用較多的案例。在完成實驗時候,你需要 Docker、MySQL、Elasticsearch 等組件,具體請參考每個案例參考文檔。
案例 1 : Flink SQL CDC + JDBC Connector
這個案例通過訂閱我們訂單表(事實表)數據,通過 Debezium 將 MySQL Binlog 發送至 Kafka,通過維表 Join 和 ETL 操作把結果輸出至下游的 PG 數據庫。具體可以參考 Flink 公眾號文章:《Flink JDBC Connector:Flink 與數據庫集成最佳實踐》案例進行實踐操作。
案例 2 : CDC Streaming ETL
模擬電商公司的訂單表和物流表,需要對訂單數據進行統計分析,對于不同的信息需要進行關聯后續形成訂單的大寬表后,交給下游的業務方使用 ES 做數據分析,這個案例演示了如何只依賴 Flink 不依賴其他組件,借助 Flink 強大的計算能力實時把 Binlog 的數據流關聯一次并同步至 ES 。
例如如下的這段 Flink SQL 代碼就能完成實時同步 MySQL 中 orders 表的全量+增量數據的目的。
CREATE TABLE orders ( order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = 'mydb', 'table-name' = 'orders' ); SELECT * FROM orders
為了讓讀者更好地上手和理解,我們還提供了 docker-compose 的測試環境,更詳細的案例教程請參考下文的視頻鏈接和文檔鏈接。
案例 3 : Streaming Changes to Kafka
下面案例就是對 GMV 進行天級別的全站統計。包含插入/更新/刪除,只有付款的訂單才能計算進入 GMV ,觀察 GMV 值的變化。
Flink SQL CDC 的更多應用場景
Flink SQL CDC 不僅可以靈活地應用于實時數據同步場景中,還可以打通更多的場景提供給用戶選擇。
Flink 在數據同步場景中的靈活定位
· 如果你已經有 Debezium/Canal + Kafka 的采集層 (E),可以使用 Flink 作為計算層 (T) 和傳輸層 (L)
· 也可以用 Flink 替代 Debezium/Canal ,由 Flink 直接同步變更數據到 Kafka,Flink 統一 ETL 流程
· 如果不需要 Kafka 數據緩存,可以由 Flink 直接同步變更數據到目的地,Flink 統一 ETL 流程
Flink SQL CDC : 打通更多場景
· 實時數據同步,數據備份,數據遷移,數倉構建
優勢:豐富的上下游(E & L),強大的計算(T),易用的 API(SQL),流式計算低延遲
· 數據庫之上的實時物化視圖、流式數據分析
· 索引構建和實時維護
· 業務 cache 刷新
· 審計跟蹤
· 微服務的解耦,讀寫分離
· 基于 CDC 的維表關聯
下面介紹一下為何用 CDC 的維表關聯會比基于查詢的維表查詢快。
■ 基于查詢的維表關聯
目前維表查詢的方式主要是通過 Join 的方式,數據從消息隊列進來后通過向數據庫發起 IO 的請求,由數據庫把結果返回后合并再輸出到下游,但是這個過程無可避免的產生了 IO 和網絡通信的消耗,導致吞吐量無法進一步提升,就算使用一些緩存機制,但是因為緩存更新不及時可能會導致精確性也沒那么高。
■ 基于 CDC 的維表關聯
我們可以通過 CDC 把維表的數據導入到維表 Join 的狀態里面,在這個 State 里面因為它是一個分布式的 State ,里面保存了 Database 里面實時的數據庫維表鏡像,當消息隊列數據過來時候無需再次查詢遠程的數據庫了,直接查詢本地磁盤的 State ,避免了 IO 操作,實現了低延遲、高吞吐,更精準。
Tips:目前此功能在 1.12 版本的規劃中,具體進度請關注 FLIP-132 。
未來規劃
· FLIP-132 :Temporal Table DDL(基于 CDC 的維表關聯)
· Upsert 數據輸出到 Kafka
· 更多的 CDC formats 支持(debezium-avro, OGG, Maxwell)
· 批模式支持處理 CDC 數據
· flink-cdc-connectors 支持更多數據庫
總結
本文通過對比傳統的數據同步方案與 Flink SQL CDC 方案分享了 Flink CDC 的優勢,與此同時介紹了 CDC 分為日志型和查詢型各自的實現原理。后續案例也演示了關于 Debezium 訂閱 MySQL Binlog 的場景介紹,以及如何通過 flink-cdc-connectors 實現技術整合替代訂閱組件。除此之外,還詳細講解了 Flink CDC 在數據同步、物化視圖、多機房備份等的場景,并重點講解了社區未來規劃的基于 CDC 維表關聯對比傳統維表關聯的優勢以及 CDC 組件工作。
希望通過這次分享,大家對 Flink SQL CDC 能有全新的認識和了解,在未來實際生產開發中,期望 Flink CDC 能帶來更多開發的便捷和更豐富的使用場景。
Q & A
1、GROUP BY 結果如何寫到 Kafka ?
因為 group by 的結果是一個更新的結果,目前無法寫入 append only 的消息隊列中里面去。更新的結果寫入 Kafka 中將在 1.12 版本中原生地支持。在 1.11 版本中,可以通過 flink-cdc-connectors 項目提供的 changelog-json format 來實現該功能,具體見文檔。
2、CDC 是否需要保證順序化消費?
是的,數據同步到 kafka ,首先需要 kafka 在分區中保證有序,同一個 key 的變更數據需要打入到同一個 kafka 的分區里面。這樣 flink 讀取的時候才能保證順序。
總結
以上是生活随笔為你收集整理的cdc工具 postgresql_基于 Flink SQL CDC 的实时数据同步方案的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 39.组合键(复合主键)
- 下一篇: mysql中复合主键指什么作用_MySQ