日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

使用 Binlog 和 Canal 从 MySQL 抽取数据

發布時間:2025/3/21 数据库 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用 Binlog 和 Canal 从 MySQL 抽取数据 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

數據抽取是 ETL 流程的第一步。我們會將數據從 RDBMS 或日志服務器等外部系統抽取至數據倉庫,進行清洗、轉換、聚合等操作。在現代網站技術棧中,MySQL 是最常見的數據庫管理系統,我們會從多個不同的 MySQL 實例中抽取數據,存入一個中心節點,或直接進入 Hive。市面上已有多種成熟的、基于 SQL 查詢的抽取軟件,如著名的開源項目?Apache Sqoop,然而這些工具并不支持實時的數據抽取。MySQL Binlog 則是一種實時的數據流,用于主從節點之間的數據復制,我們可以利用它來進行數據抽取。借助阿里巴巴開源的?Canal?項目,我們能夠非常便捷地將 MySQL 中的數據抽取到任意目標存儲中。

Canal 的組成部分

簡單來說,Canal 會將自己偽裝成 MySQL 從節點(Slave),并從主節點(Master)獲取 Binlog,解析和貯存后供下游消費端使用。Canal 包含兩個組成部分:服務端和客戶端。服務端負責連接至不同的 MySQL 實例,并為每個實例維護一個事件消息隊列;客戶端則可以訂閱這些隊列中的數據變更事件,處理并存儲到數據倉庫中。下面我們來看如何快速搭建起一個 Canal 服務。

配置 MySQL 主節點

MySQL 默認沒有開啟 Binlog,因此我們需要對?my.cnf?文件做以下修改:

server-id = 1 log_bin = /path/to/mysql-bin.log binlog_format = ROW
  • 1
  • 2
  • 3

注意?binlog_format?必須設置為?ROW, 因為在?STATEMENT?或?MIXED?模式下, Binlog 只會記錄和傳輸 SQL 語句(以減少日志大小),而不包含具體數據,我們也就無法保存了。

從節點通過一個專門的賬號連接主節點,這個賬號需要擁有全局的?REPLICATION?權限。我們可以使用?GRANT?命令創建這樣的賬號:

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
  • 1
  • 2

啟動 Canal 服務端

從 GitHub 項目發布頁中下載 Canal 服務端代碼(鏈接),配置文件在?conf?文件夾下,有以下目錄結構:

canal.deployer/conf/canal.properties canal.deployer/conf/instanceA/instance.properties canal.deployer/conf/instanceB/instance.properties
  • 1
  • 2
  • 3

conf/canal.properties?是主配置文件,如其中的?canal.port?用以指定服務端監聽的端口。instanceA/instance.properties?則是各個實例的配置文件,主要的配置項有:

# slaveId 不能與 my.cnf 中的 server-id 項重復 canal.instance.mysql.slaveId = 1234 canal.instance.master.address = 127.0.0.1:3306 canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.connectionCharset = UTF-8 # 訂閱實例中所有的數據庫和表 canal.instance.filter.regex = .*\\..*
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

執行?sh bin/startup.sh?命令開啟服務端,在日志文件?logs/example/example.log?中可以看到以下輸出:

Loading properties file from class path resource [canal.properties] Loading properties file from class path resource [example/instance.properties] start CannalInstance for 1-example [destination = example , address = /127.0.0.1:3306 , EventParser] prepare to find start position just show master status
  • 1
  • 2
  • 3
  • 4

編寫 Canal 客戶端

從服務端消費變更消息時,我們需要創建一個 Canal 客戶端,指定需要訂閱的數據庫和表,并開啟輪詢。

首先,在項目中添加?com.alibaba.otter:canal.client?依賴項,構建?CanalConnector?實例:

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");connector.connect(); connector.subscribe(".*\\..*");while (true) {Message message = connector.getWithoutAck(100);long batchId = message.getId();if (batchId == -1 || message.getEntries().isEmpty()) {Thread.sleep(3000);} else {printEntries(message.getEntries());connector.ack(batchId);} }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

這段代碼和連接消息系統很相似。變更事件會批量發送過來,待處理完畢后我們可以 ACK 這一批次,從而避免消息丟失。

// printEntries RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); for (RowData rowData : rowChange.getRowDatasList()) {if (rowChange.getEventType() == EventType.INSERT) {printColumns(rowData.getAfterCollumnList());} }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

每一個?Entry?代表一組具有相同變更類型的數據列表,如 INSERT 類型、UPDATE、DELETE 等。每一行數據我們都可以獲取到各個字段的信息:

// printColumns String line = columns.stream().map(column -> column.getName() + "=" + column.getValue()).collect(Collectors.joining(",")); System.out.println(line);
  • 1
  • 2
  • 3
  • 4
  • 5

完整代碼可以在 GitHub 中找到(鏈接)。

加載至數據倉庫

關系型數據庫與批量更新

若數據倉庫是基于關系型數據庫的,我們可以直接使用?REPLACE?語句將數據變更寫入目標表。其中需要注意的是寫入性能,在更新較頻繁的場景下,我們通常會緩存一段時間的數據,并批量更新至數據庫,如:

REPLACE INTO `user` (`id`, `name`, `age`, `updated`) VALUES (1, 'Jerry', 30, '2017-08-12 16:00:00'), (2, 'Mary', 28, '2017-08-12 17:00:00'), (3, 'Tom', 36, '2017-08-12 18:00:00');
  • 1
  • 2
  • 3
  • 4

另一種方式是將數據變更寫入按分隔符分割的文本文件,并用?LOAD DATA?語句載入數據庫。這些文件也可以用在需要寫入 Hive 的場景中。不管使用哪一種方法,請一定注意要對字符串類型的字段進行轉義,避免導入時出錯。

基于 Hive 的數據倉庫

Hive 表保存在 HDFS 上,該文件系統不支持修改,因此我們需要一些額外工作來寫入數據變更。常用的方式包括:JOIN、Hive 事務、或改用 HBase。

數據可以歸類成基礎數據和增量數據。如昨日的?user?表是基礎數據,今日變更的行是增量數據。通過?FULL OUTER JOIN,我們可以將基礎和增量數據合并成一張最新的數據表,并作為明天的基礎數據:

SELECTCOALESCE(b.`id`, a.`id`) AS `id`,COALESCE(b.`name`, a.`name`) AS `name`,COALESCE(b.`age`, a.`age`) AS `age`,COALESCE(b.`updated`, a.`updated`) AS `updated` FROM dw_stage.`user` a FULL OUTER JOIN (-- 增量數據會包含重復數據,因此需要選擇最新的那一條SELECT `id`, `name`, `age`, `updated`FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `updated` DESC) AS `n`FROM dw_stage.`user_delta`) bWHERE `n` = 1 ) b ON a.`id` = b.`id`;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

Hive 0.13 引入了事務和 ACID 表,0.14 開始支持?INSERT、UPDATE、DELETE?語句,Hive 2.0.0 則又新增了?Streaming Mutation API,用以通過編程的方式批量更新 Hive 表中的記錄。目前,ACID 表必須使用 ORC 文件格式進行存儲,且須按主鍵進行分桶(Bucket)。Hive 會將變更記錄保存在增量文件中,當?OrcInputFormat?讀取數據時會自動定位到最新的那條記錄。官方案例可以在這個鏈接中查看。

最后,我們可以使用 HBase 來實現表數據的更新,它是一種 KV 存儲系統,同樣基于 HDFS。HBase 的數據可以直接為 MapReduce 腳本使用,且 Hive 中可以創建外部映射表指向 HBase。更多信息請查看官方網站。

初始化數據

數據抽取通常是按需進行的,在新增一張表時,數據源中可能已經有大量原始記錄了。常見的做法是手工將這批數據全量導入至目標表中,但我們也可以復用 Canal 這套機制來實現歷史數據的抽取。

首先,我們在數據源庫中創建一張輔助表:

CREATE TABLE `retl_buffer` (id BIGINT AUTO_INCREMENT PRIMARY KEY,table_name VARCHAR(255),pk_value VARCHAR(255) );
  • 1
  • 2
  • 3
  • 4
  • 5

當需要全量抽取?user?表時,我們執行以下語句,將所有?user.id?寫入輔助表中:

INSERT INTO `retl_buffer` (`table_name`, `pk_value`) SELECT 'user', `id` FROM `user`;
  • 1
  • 2

Canal 客戶端在處理到?retl_buffer?表的數據變更時,可以從中解析出表名和主鍵的值,直接反查數據源,將數據寫入目標表:

if ("retl_buffer".equals(entry.getHeader().getTableName())) {String tableName = rowData.getAfterColumns(1).getValue();String pkValue = rowData.getAfterColumns(2).getValue();System.out.println("SELECT * FROM " + tableName + " WHERE id = " + pkValue); }
  • 1
  • 2
  • 3
  • 4
  • 5

這一方法在阿里巴巴的另一個開源軟件?Otter?中使用。

Canal 高可用

  • Canal 服務端中的實例可以配置一個備用 MySQL,從而能夠在雙 Master 場景下自動選擇正在工作的數據源。注意兩臺主庫都需要打開?log_slave_updates?選項。Canal 會使用自己的心跳機制(定期更新輔助表的記錄)來檢測主庫的存活。
  • Canal 自身也有 HA 配置,配合 Zookeeper,我們可以開啟多個 Canal 服務端,當某臺服務器宕機時,客戶端可以從 ZK 中獲取新的服務端地址,繼續進行消費。更多信息可以參考?Canal AdminGuide。

參考資料

  • https://github.com/alibaba/canal/wiki
  • https://github.com/alibaba/otter/wiki
  • https://www.phdata.io/4-strategies-for-updating-hive-tables/
  • https://hortonworks.com/blog/four-step-strategy-incremental-updates-hive/
  • https://cwiki.apache.org/confluence/display/Hive/Hive+Transactions
from:?http://blog.csdn.net/zjerryj/article/details/77152226

總結

以上是生活随笔為你收集整理的使用 Binlog 和 Canal 从 MySQL 抽取数据的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 在线观看视频福利 | 九九九九九热 | 成人免费在线网站 | 伊人色网 | 国产资源在线看 | 一本到免费视频 | 色呦呦国产精品 | 久久色网 | 亚洲一区二区三区三州 | 五月婷婷一区二区 | 少妇xxxxxx| 国产又爽又黄的视频 | 羞羞羞网站 | 日本嫩草影院 | 欧美黄色免费大片 | 久久久久久久麻豆 | 成人免费毛片东京热 | 91综合网 | 欧美一区二区三区久久久 | av免费一区 | 国产精品夫妻自拍 | 久久国产小视频 | 最新天堂中文在线 | 色综合视频网 | 中文天堂在线观看 | 乌克兰做爰xxxⅹ性视频 | 日韩视频福利 | 日韩精品免费一区二区夜夜嗨 | 日韩av片在线免费观看 | 91香蕉在线视频 | 美女扒开尿口给男人看 | 琪琪色18 | 欧美精品一区二区三区久久久 | 成人在线免费电影 | 日韩美女黄色 | 1区2区视频| 无码久久精品国产亚洲av影片 | 国产成人99久久亚洲综合精品 | 国产免费一区二区三区视频 | xxxxx亚洲 | 三级视频网站 | 国产成人短视频在线观看 | 欧美一区亚洲一区 | 向日葵视频在线 | 日韩精品一区二区三区四区五区 | 久久久久9999 | 黄色一级二级 | 激情综合一区二区三区 | 北岛玲在线 | 成人h动漫精品一区二 | 怡红院av亚洲一区二区三区h | 懂色中文一区二区在线播放 | 国产三级av在线播放 | 国产精品91在线 | 99re8在线精品视频免费播放 | 亚洲欧洲免费无码 | 日本国产视频 | 欧美福利网 | 久久视奸 | 中文字幕一区二区三区久久久 | 青青草综合 | 黄瓜视频在线播放 | 好吊妞视频在线 | 国产永久免费视频 | 国产一区二区三区福利 | 亚洲自拍色图 | 成人里番精品一区二区 | 少妇人妻偷人精品无码视频新浪 | 中文字幕一区二区不卡 | 亚洲国产一区二区三区四区 | 打屁屁日本xxxxx变态 | 久久在线免费视频 | 久久综合久久88 | 91免费看大片 | 污污的视频软件 | 国产xxxxx| 亚洲精品久久久久久久蜜桃臀 | 97在线视频观看 | 日韩视频在线视频 | 蜜桃成人在线视频 | 六月丁香激情综合 | 久久青青热| www.av88| 露脸啪啪清纯大学生美女 | 精品一区二区亚洲 | 成人日韩在线观看 | 亚洲视频高清 | 国产91丝袜在线播放九色 | 黄色a级在线观看 | 毛片网站免费 | 韩国三级av | 五月天综合视频 | 欧美色图19p | 麻豆福利在线 | 成人性做爰aaa片免费 | 黄色高清网站 | 欧美色涩在线第一页 | 亚洲麻豆av| 成人av免费 |