日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flink 源码 | 自定义 Format 消费 Maxwell CDC 数据

發布時間:2024/8/23 编程问答 44 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink 源码 | 自定义 Format 消费 Maxwell CDC 数据 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Flink 1.11 最重要的 Feature —— Hive Streaming 之前已經和大家分享過了,今天就和大家來聊一聊另一個特別重要的功能 —— CDC。

CDC概述

何為CDC?Change Data Capture,將數據庫中的’增’、’改’、’刪’操作記錄下來。在很早之前是通過觸發器來完成記錄,現在通過 binlog+同步中間件來實現。常用的 binlog 同步中間件有很多,比如 Alibaba 開源的 canal[1],Red Hat 開源的debezium[2],Zendesk 開源的 Maxwell[3] 等等。

這些中間件會負責 binlog 的解析,并同步到消息中間件中,我們只需要消費對應的 Topic 即可。

回到 Flink 上,CDC 似乎和我們沒有太大的關聯?其實不然,讓我們更加抽象地來看這個世界。

當我們用 Flink 去消費數據比如 Kafka 時,我們就仿佛在讀一張表,什么表?一張不斷有記錄被插入的表,我們將每一條被插入的數據取出來,完成我們的邏輯。

當插入的每條數據都沒有問題時,一切都很美好。關聯、聚合、輸出。

但當我們發現,某條已經被計算過的數據有問題時,麻煩大了。我們直接改最后的輸出值其實是沒有用的,這次改了,當再來數據觸發計算時,結果還是會被錯誤的數據覆蓋,因為中間計算結果沒有被修改,它仍然是一個錯誤的值。怎么辦?撤回流似乎能解決這個問題,這也確實是解決這個問題的手段,但是問題來了,撤回流怎么確定讀取的數據是要被撤回的?另外,怎么去觸發一次撤回?

CDC 解決了這些:將消息中間件的數據反序列化后,根據 Type 來識別數據是 Insert 還是 Delete;另外,如果大家看過 Flink 源碼,會發現反序列化后的數據類型變了,從 Row 升級為 RowData,RowData 能夠將數據標記為撤回還是插入,這就意味著每個算子能夠判斷出數據到底是需要下發還是撤回。

CDC 的重要性就先說這么多,之后有機會的話,出一篇實時 DQC 的視頻,告訴大家 CDC 的出現,對于實時 DQC 的幫助有多大。下面讓我們回到正題。

既然有那么多 CDC 同步中間件,那么一定會有各種各樣的格式存放在消息中間件中,我們必然需要去解析它們。于是 Flink 1.11 提供了 canal-json 和 debezium-json,但我們用的是 Maxwell 怎么辦?只能等官方出或者說是等有人向社區貢獻嗎?那如果我們用的是自研的同步中間件怎么辦?

所以就有了今天的分享:如何去自定義實現一個 Maxwell format。大家也可以基于此文的思路去實現其他 CDC format,比如 OGG, 或是自研 CDC 工具產生的數據格式。

如何實現

當我們提交任務之后,Flink 會通過 SPI 機制將 classpath 下注冊的所有工廠類加載進來,包括 DynamicTableFactory、DeserializationFormatFactory 等等。而對于 Format 來說,到底使用哪個 DeserializationFormatFactory,是根據 DDL 語句中的 Format 來決定的。通過將 Format 的值與工廠類的 factoryIdentifier() 方法的返回值進行匹配 來確定。

再通過 DeserializationFormatFactory 中的 createDecodingFormat(...) 方法,將反序列化對象提供給 DynamicTableSource。

通過圖來了解整個過程(僅從反序列化數據并消費的角度來看):

想要實現 CDC Format 去解析某種 CDC 工具產生的數據其實很簡單,核心組件其實就三個:

  • 工廠類(DeserializationFormatFactory):負責編譯時根據 ‘format’ = ‘maxwell-json’創建對應的反序列化器。即 MaxwellJsonFormatFactory。
  • 反序列化類(DeserializationSchema):負責運行時的解析,根據固定格式將 CDC 數據轉換成 Flink 系統能認識的 INSERT/DELETE/UPDATE 消息,如 RowData。即 MaxwellJsonDeserializationSchema。
  • Service 注冊文件:需要添加 Service 文件 META-INF/services/org.apache.flink.table.factories.Factory ,并在其中增加一行我們實現的 MaxwellJsonFormatFactory 類路徑。

再通過代碼,來看看反序列化中的細節:

public void deserialize(byte[] message, Collectorout) throws IOException {try {RowData row = jsonDeserializer.deserialize(message);String type = row.getString(2).toString(); // "type" fieldif (OP_INSERT.equals(type)) {RowData insert = row.getRow(0, fieldCount);insert.setRowKind(RowKind.INSERT);out.collect(insert);} else if (OP_UPDATE.equals(type)) {GenericRowData after = (GenericRowData) row.getRow(0, fieldCount); // "data" fieldGenericRowData before = (GenericRowData) row.getRow(1, fieldCount); // "old" fieldfor (int f = 0; f < fieldCount; f++) {if (before.isNullAt(f)) {before.setField(f, after.getField(f));}}before.setRowKind(RowKind.UPDATE_BEFORE);after.setRowKind(RowKind.UPDATE_AFTER);out.collect(before);out.collect(after);} else if (OP_DELETE.equals(type)) {RowData delete = row.getRow(0, fieldCount);delete.setRowKind(RowKind.DELETE);out.collect(delete);} else {if (!ignoreParseErrors) {throw new IOException(format("Unknown \"type\" value \"%s\". The Maxwell JSON message is '%s'", type, new String(message)));}}} catch (Throwable t) {if (!ignoreParseErrors) {throw new IOException(format("Corrupt Maxwell JSON message '%s'.", new String(message)), t);}}}

其實并不復雜:先通過 jsonDeserializer 將字節數組根據 [data: ROW, old: ROW, type: String] 的 schema 反序列化成 RowData,然后根據 “type” 列的值來判斷數據是什么類型:增、改、刪;再根據數據類型取出 “data” 或者 “old” 區的數據,來組裝成 Flink 認識的 INSERT/DELETE/UPDATE 數據并下發。

對象 jsonDeserializer 即 JSON 格式的反序列化器,它可以通過指定的 RowType 類型,讀取 JSON 的字節數組中指定的字段并反序列化成 RowData。在我們的場景中,我們需要去讀取如下 Maxwell 數據的 “data”, “old” 和 “type” 部分的數據。

{"database":"test","table":"product","type":"update","ts":1596684928,"xid":7291,"commit":true,"data":{"id":102,"name":"car battery","description":"12V car battery","weight":5.17},"old":{"weight":8.1}}

因此 MaxwellJsonDeserializationSchema 中定義的 JSON 的 RowType 如下所示。

private RowType createJsonRowType(DataType databaseSchema) {// Maxwell JSON contains other information, e.g. "database", "ts"// but we don't need themreturn (RowType) DataTypes.ROW(DataTypes.FIELD("data", databaseSchema),DataTypes.FIELD("old", databaseSchema),DataTypes.FIELD("type", DataTypes.STRING())).getLogicalType();}

databaseSchema 是用戶通過 DDL 定義的 schema 信息,也對應著數據庫中表的 schema。結合上面的 JSON 和代碼,我們能夠得知 jsonDeserializer 只會取走 byte[] 中 data、old、type 這三個字段對應的值,其中 data 和old 還是個嵌套JSON,它們的 schema 信息和 databaseSchema 一致。由于 Maxwell 在同步數據時,“old”區不包含未被更新的字段,所以 jsonDeserializer 返回后,我們會通過 “data” 區的 RowData 將 old 區的缺失字段補齊。

得到 RowData 之后,會取出 type 字段,然后根據對應的值,會有三種分支:

  • insert:取出 data 中的值,也就是我們通過DDL定義的字段對應的值,再將其標記為 RowKind.INSERT 類型數據,最后下發。
  • update:分別取出 data 和 old 的值,然后循環 old 中每個字段,字段值如果為空說明是未修改的字段,那就用 data 中對應位置字段的值替代;之后將 old 標記為 RowKind.UPDATE_BEFORE 也就意味著 Flink 引擎需要將之前對應的值撤回,data 標記為 RowKind.UPDATE_AFTER 正常下發。
  • delete:取出 data 中的值,標記為 RowKind.DELETE,代表需要撤回。

處理的過程中,如果拋出異常,會根據 DDL 中maxwell-json.ignore-parse-errors的值來確定是忽視這條數據繼續處理下一條數據,還是讓任務報錯。

筆者在 maxwell-json 反序列化功能的基礎之上,還實現了序列化的功能,即能將 Flink 產生的 changelog 以 Maxwell 的 JSON 格式輸出到外部系統中。其實現思路與反序列化器的思路正好相反,更多細節可以參考 Pull Request 中的實現。

PR 實現詳情鏈接:?
https://github.com/apache/flink/pull/13090

功能演示

給大家演示一下從 Kafka 中讀取 Maxwell 推送來的 maxwell json 格式數據,并將聚合后的數據再次寫入 Kafka 后,重新讀出來驗證數據是否正確。

Kafka 數據源表

CREATE TABLE topic_products (-- schema is totally the same to the MySQL "products" tableid BIGINT,name STRING,description STRING,weight DECIMAL(10, 2) ) WITH ( 'connector' = 'kafka', 'topic' = 'maxwell', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'maxwell-json');

Kafka 數據結果表&數據源表

CREATE TABLE topic_sink (name STRING,sum_weight DECIMAL(10, 2) ) WITH ( 'connector' = 'kafka', 'topic' = 'maxwell-sink', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'maxwell-json' );

MySQL 表

-- 注意,這部分 SQL 在 MySQL 中執行,不是 Flink 中的表 CREATE TABLE product ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255), description VARCHAR(512), weight FLOAT ); truncate product ; ALTER TABLE product AUTO_INCREMENT = 101; INSERT INTO product VALUES (default,"scooter","Small 2-wheel scooter",3.14),(default,"car battery","12V car battery",8.1),(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8),(default,"hammer","12oz carpenter's hammer",0.75),(default,"hammer","14oz carpenter's hammer",0.875),(default,"hammer","16oz carpenter's hammer",1.0),(default,"rocks","box of assorted rocks",5.3),(default,"jacket","water resistent black wind breaker",0.1),(default,"spare tire","24 inch spare tire",22.2); UPDATE product SET description='18oz carpenter hammer' WHERE id=106; UPDATE product SET weight='5.1' WHERE id=107; INSERT INTO product VALUES (default,"jacket","water resistent white wind breaker",0.2); INSERT INTO product VALUES (default,"scooter","Big 2-wheel scooter ",5.18); UPDATE product SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110; UPDATE product SET weight='5.17' WHERE id=111; DELETE FROM product WHERE id=111; UPDATE product SET weight='5.17' WHERE id=102 or id = 101; DELETE FROM product WHERE id=102 or id = 103;

先看看能不能正常讀取 Kafka 中的 maxwell json 數據。

select * from topic_products;

可以看到,所有字段值都變成了 Update 之后的值,同時,被 Delete 的數據也沒有出現。

接著讓我們再將聚合數據寫入 Kafka。

insert into topic_sink select name,sum(weight) as sum_weight from topic_products group by name;

在 Flink 集群的 Web 頁面也能夠看到任務正確提交,接下來再讓我們把聚合數據查出來。

select * from topic_sink

最后,讓我們查詢一下 MySQL 中的表,來驗證數據是否一致;因為在 Flink 中,我們將 weight 字段定義成 Decimal(10,2),所以我們在查詢 MySQL 的時候,需要將 weight 字段進行類型轉換。

沒有問題,我們的 maxwell json 解析很成功。

寫在最后

根據筆者實現 maxwell-json format 的經驗,Flink 對于接口的定義、對于模塊職責的劃分還是很清晰的,所以實現一個自定義 CDC format 非常簡單(核心代碼只有200多行)。因此,如果你是用的 OGG,或是自研的同步中間件,可以通過本文的思路快速實現一個 CDC format,一起解放你的 CDC 數據!

?

?

原文鏈接
本文為阿里云原創內容,未經允許不得轉載。

總結

以上是生活随笔為你收集整理的Flink 源码 | 自定义 Format 消费 Maxwell CDC 数据的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 狂野欧美性猛交blacked | 毛片一级在线观看 | 亚洲成人aaa | 亚洲女人天堂成人av在线 | 亚洲香蕉av | 麻豆欧美 | 欧美黄色a级大片 | 国产一区二区三区免费在线观看 | 日批在线观看 | 久久九九综合 | 免费性爱视频 | 久视频在线观看 | 国产精品毛片一区 | 伊人网综合网 | 国内精品视频一区二区三区 | 成年人看的毛片 | 亚洲综合五区 | 日批免费观看视频 | 成人自拍视频网 | 伊人久久大香线蕉av一区 | 免费看黄色片网站 | 国产男女猛烈无遮挡免费视频 | 性av网站 | 中文不卡av | 成人亚洲在线 | 影音先锋精品 | 乱子伦一区 | 美女激情av | 国产女人在线观看 | 红桃视频网站 | 天天综合网久久综合网 | 激情综合丁香五月 | 成人动漫在线播放 | 亚洲一二三精品 | 深夜视频一区二区 | 国产偷v| 黄色美女大片 | 色婷婷狠 | 亚洲精品国产精华液 | 亚洲视频1 | 日韩一区在线视频 | 一区二区三区午夜 | 色偷偷综合 | 谁有av网址 | 久久综合丁香 | 亚洲国产精品网站 | 夜夜嗨av一区二区三区四区 | 久久99精品国产麻豆婷婷 | 亚洲国产99| 午夜影视在线观看 | 国产情侣啪啪 | 观看免费av| 国产高清久久 | 成片在线观看 | 亚洲成av人片在线观看无 | 最新av网址在线观看 | 不卡视频在线播放 | av最新| 日本视频中文字幕 | 亚洲一区二区视频网站 | 久久久国产精品久久久 | 精品自拍第一页 | 国产按摩一区二区三区 | 国产九九在线 | 欧美1区2区3区| 一区二区三区日韩在线 | 最近中文字幕在线免费观看 | 日韩欧美精品免费 | 欧洲免费av| 这里只有精品9 | 国内精品视频一区 | 日韩av一区二区在线观看 | 日日夜夜精品 | 色欲国产精品一区二区 | 女性毛片 | avtt中文字幕 | 免费av在线网址 | 九九综合视频 | 麻豆性视频 | 亚洲欧洲成人在线 | 深夜成人福利 | 国产成人精品一区二区在线小狼 | 日本久久精品 | 两根大肉大捧一进一出好爽视频 | 亚洲欧美日韩另类 | 嫩草在线视频 | 国产精品青青草 | 激情小说在线观看 | 亚洲成人播放器 | 亚洲精华国产精华精华液网站 | 男女互操在线观看 | 欧美极品一区二区 | 91成人精品国产刺激国语对白 | 人妻少妇偷人精品久久性色 | 色欧美88888久久久久久影院 | 欧美成人黄色 | 久久久毛片 | 一区二区三区 欧美 | 999久久久久久 |