日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

canal介绍和使用docker安装canal

發(fā)布時(shí)間:2024/9/30 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 canal介绍和使用docker安装canal 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

文章目錄

  • 概述
    • 1.1 背景
    • 1.2 工作原理
    • 1.4 HA機(jī)制設(shè)計(jì)
    • 1.5 docker上安裝canal
    • 1.6 簡(jiǎn)單使用

概述

1.1 背景

早期阿里巴巴因?yàn)楹贾莺兔绹p機(jī)房部署,存在跨機(jī)房同步的業(yè)務(wù)需求,實(shí)現(xiàn)方式主要是基于業(yè)務(wù) trigger 獲取增量變更。從 2010 年開始,業(yè)務(wù)逐步嘗試數(shù)據(jù)庫日志解析獲取增量變更進(jìn)行同步,由此衍生出了大量的數(shù)據(jù)庫增量訂閱和消費(fèi)業(yè)務(wù)。

基于日志增量訂閱和消費(fèi)的業(yè)務(wù)包括

數(shù)據(jù)庫鏡像
數(shù)據(jù)庫實(shí)時(shí)備份
索引構(gòu)建和實(shí)時(shí)維護(hù)(拆分異構(gòu)索引、倒排索引等)
業(yè)務(wù) cache 刷新
帶業(yè)務(wù)邏輯的增量數(shù)據(jù)處理
當(dāng)前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

1.2 工作原理

MySQL主從復(fù)制原理

MySQL master 將數(shù)據(jù)變更寫入二進(jìn)制日志( binary log, 其中記錄叫做二進(jìn)制日志事件binary log events)

MySQL slave 將 master 的 binary log events 拷貝到它的中繼日志(relay log)

MySQL slave 重放 relay log 中事件,將數(shù)據(jù)變更反映它自己的數(shù)據(jù)

canal 工作原理

canal 模擬 MySQL slave 的交互協(xié)議,偽裝自己為 MySQL slave ,向 MySQL master 發(fā)送dump 協(xié)議
MySQL master 收到 dump 請(qǐng)求,開始推送 binary log 給 slave (即 canal )
canal 解析 binary log 對(duì)象(原始為 byte 流)

架構(gòu)

server 代表一個(gè)canal服務(wù),管理多個(gè)instance
instance 偽裝成一個(gè)slave,從mysql dump數(shù)據(jù)
instance模塊:

eventParser (數(shù)據(jù)源接入,模擬slave協(xié)議和master進(jìn)行交互,協(xié)議解析)
eventSink (Parser和Store鏈接器,進(jìn)行數(shù)據(jù)過濾,加工,分發(fā)的工作)
eventStore (數(shù)據(jù)存儲(chǔ))
metaManager (增量訂閱&消費(fèi)信息管理器)

1.4 HA機(jī)制設(shè)計(jì)

canal的高可用HA(High Availability)

為了減少對(duì)mysql dump的請(qǐng)求,要求同一時(shí)間只能有一個(gè)處于running,其他的處于standby狀態(tài)
如下圖所示:

大致步驟:

canal server要啟動(dòng)某個(gè)canal instance時(shí)都先向zookeeper進(jìn)行一次嘗試啟動(dòng)判斷 (實(shí)現(xiàn):創(chuàng)建短暫的節(jié)點(diǎn),誰創(chuàng)建成功就允許誰啟動(dòng))
創(chuàng)建zookeeper節(jié)點(diǎn)成功后,對(duì)應(yīng)的canal server就啟動(dòng)對(duì)應(yīng)的canal instance,沒有創(chuàng)建成功的canal instance就會(huì)處于備用狀態(tài)
一旦zookeeper發(fā)現(xiàn)canal server A創(chuàng)建的節(jié)點(diǎn)消失后,立即通知其他的canal server再次進(jìn)行步驟1的操作,重新選出一個(gè)canal server啟動(dòng)instance.
canal client每次進(jìn)行連接時(shí),會(huì)首先向zookeeper詢問當(dāng)前是誰啟動(dòng)了canal instance,然后和其建立鏈接,一旦鏈接不可用,會(huì)重新嘗試connect

1.5 docker上安裝canal

創(chuàng)建mysql容器

docker run -id --name canal_mysql -v /mnt/canal_mysql:/var/lib/mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 mysql:5.7

安裝vim
需要在MySQL容器中修改配置文件,但是容器中默認(rèn)沒有vim命令,需要進(jìn)行安裝。

直接執(zhí)行命令安裝vim速度會(huì)很慢,因?yàn)槭褂玫氖菄獾脑?#xff0c;需要更新Debian源以提高速度。

#在宿主機(jī)創(chuàng)建sources.list配置文件
vi sources.list
#內(nèi)容為:

deb http://mirrors.tuna.tsinghua.edu.cn/debian/ buster main deb http://mirrors.tuna.tsinghua.edu.cn/debian-security buster/updates main deb http://mirrors.tuna.tsinghua.edu.cn/debian/ buster-updates main

?
#復(fù)制宿主機(jī)的配置到MySQL容器中

docker cp sources.list canal_mysql:/etc/apt/

?
#進(jìn)入MySQL容器

docker exec -it canal_mysql /bin/bash

?
#執(zhí)行安裝命令

apt-get update && apt-get install vim -y

修改MySQL配置
需要讓canal偽裝成salve并正確獲取mysql中的binary log,首先要開啟 Binlog 寫入功能,配置 binlog-format 為 ROW 模式,命令如下:

#修改MySQL配置文件

vim /etc/mysql/mysql.conf.d/mysqld.cnf

#添加的內(nèi)容如下:

log-bin=mysql-bin binlog-format=ROW server_id=1

#開啟binlog 選擇ROW模式
#server_id不要和canal的slaveId重復(fù)
重啟MySQL

docker restart canal_mysql

遠(yuǎn)程登錄MySQL,查看配置狀態(tài),執(zhí)行以下sql:

show variables like 'log_bin'; show variables like 'binlog_format'; show master status;

創(chuàng)建Canal賬號(hào)
創(chuàng)建連接MySQL的賬號(hào)canal并授予作為 MySQL slave 的權(quán)限,執(zhí)行以下sql:

創(chuàng)建賬號(hào)

CREATE USER canal IDENTIFIED BY 'canal';

授予權(quán)限

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

刷新并應(yīng)用

FLUSH PRIVILEGES

創(chuàng)建canal-server容器

docker run -d --name canal-server -p 11111:11111 canal/canal-server:v1.1.4

配置canal-server
#進(jìn)入canal-server容器

docker exec -it canal-server /bin/bash

#編輯canal-server的配置

vi canal-server/conf/example/instance.properties

重啟canal-server

修改完成后重啟canal-server,并查看日志:

#按ctrl+D退出容器,并重啟容器

docker restart canal-server

可以看到現(xiàn)在已經(jīng)有兩個(gè)鏡像在啟動(dòng):

#重啟成功后進(jìn)入容器

docker exec -it canal-server /bin/bash

#查看日志

tail -100f canal-server/logs/example/example.log

1.6 簡(jiǎn)單使用

在數(shù)據(jù)庫服務(wù)中創(chuàng)建數(shù)據(jù)庫canal_test并創(chuàng)建表:

CREATE TABLE `student` (`id` varchar(20) NOT NULL,`name` varchar(50) DEFAULT NULL,`age` int(11) DEFAULT NULL,`sex` varchar(5) DEFAULT NULL,`city` varchar(20) DEFAULT NULL,PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

創(chuàng)建Maven工程canal-demo,在pom.xml中添加依賴:

<dependencies><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency> </dependencies>

編寫代碼獲取canal數(shù)據(jù):

import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.CanalEntry.RowData; import com.alibaba.otter.canal.protocol.Message; import java.net.InetSocketAddress; import java.util.List; import java.util.concurrent.TimeUnit;public class CanalTest {public static void main(String[] args) {String ip = "自己ip地址";String destination = "example";//創(chuàng)建連接對(duì)象CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, 11111), destination, "", "");//進(jìn)行連接canalConnector.connect();//進(jìn)行訂閱canalConnector.subscribe();int batchSize = 5 * 1024;//使用死循環(huán)不斷的獲取canal信息while (true) {//獲取Message對(duì)象Message message = canalConnector.getWithoutAck(batchSize);long id = message.getId();int size = message.getEntries().size();System.out.println("當(dāng)前監(jiān)控到的binLog消息數(shù)量是:" + size);//判斷是否有數(shù)據(jù)if (id == -1 || size == 0) {//如果沒有數(shù)據(jù),等待1秒try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}} else {//如果有數(shù)據(jù),進(jìn)行數(shù)據(jù)解析List<Entry> entries = message.getEntries();//遍歷獲取到的Entry集合for (Entry entry : entries) {System.out.println("----------------------------------------");System.out.println("當(dāng)前的二進(jìn)制日志的條目(entry)類型是:" + entry.getEntryType());//如果屬于原始數(shù)據(jù)ROWDATA,進(jìn)行打印內(nèi)容if (entry.getEntryType() == EntryType.ROWDATA) {try {//獲取存儲(chǔ)的內(nèi)容RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());//打印事件的類型,增刪改查哪種 eventTypeSystem.out.println("事件類型是:" + rowChange.getEventType());//打印改變的內(nèi)容(增量數(shù)據(jù))for (RowData rowData : rowChange.getRowDatasList()) {System.out.println("改變前的數(shù)據(jù):" + rowData.getBeforeColumnsList());System.out.println("改變后的數(shù)據(jù):" + rowData.getAfterColumnsList());}} catch (Exception e) {e.printStackTrace();}}}//消息確認(rèn)已經(jīng)處理了canalConnector.ack(id);}}} }

總結(jié)

以上是生活随笔為你收集整理的canal介绍和使用docker安装canal的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。