日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

阿里mysql 二进制_Mysql binlog 之阿里canal

發布時間:2023/12/10 数据库 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 阿里mysql 二进制_Mysql binlog 之阿里canal 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1、What is Canal?

canal [k?'n?l],中文翻譯為 水道/管道/溝渠/運河,主要用途是用于 MySQL 數據庫增量日志數據的訂閱、消費和解析,是阿里巴巴開發并開源的,采用Java語言開發;

歷史背景是早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房數據同步的業務需求,實現方式主要是基于業務 trigger(觸發器) 獲取增量變更。從2010年開始,阿里巴巴逐步嘗試采用解析數據庫日志獲取增量變更進行同步,由此衍生出了canal項目;

Github:https://github.com/alibaba/canal

2、工作原理

傳統MySQL主從復制工作原理

從上層來看,復制分成三步:

MySQL的主從復制將經過如下步驟:

1、當 master 主服務器上的數據發生改變時,則將其改變寫入二進制事件日志文件中;

2、salve 從服務器會在一定時間間隔內對 master 主服務器上的二進制日志進行探測,探測其是否發生過改變,如果探測到 master 主服務器的二進制事件日志發生了改變,則開始一個 I/O Thread 請求 master 二進制事件日志;

3、同時 master 主服務器為每個 I/O Thread 啟動一個dump ?Thread,用于向其發送二進制事件日志;

4、slave 從服務器將接收到的二進制事件日志保存至自己本地的中繼日志文件中;

5、salve 從服務器將啟動 SQL Thread 從中繼日志中讀取二進制日志,在本地重放,使得其數據和主服務器保持一致;

6、最后 I/O Thread 和 SQL Thread 將進入睡眠狀態,等待下一次被喚醒;

canal 工作原理

1、canal 模擬 MySQL slave 的交互協議,把自己偽裝為 MySQL slave,向 MySQL master 發送dump 協議;

2、MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即canal );

3、canal 解析 binary log 對象 (原始數據為byte流)

3、Canal使用場景

Canal是基于MySQL變更日志增量訂閱和消費的組件,可以使用在如下一些一些應用場景:

數據庫實時備份

業務cache刷新

search build

價格變化等重要業務消息

帶業務邏輯的增量數據處理

跨數據庫的數據備份(異構數據同步),

例如mysql => oracle,mysql=>mongo,mysql =>redis,

mysql => elasticsearch等;

當前canal 主要是支持源端 MySQL(也支持mariaDB),版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x;Canal搭建環境

1、準備好MySQL運行環境;

2、開啟 MySQL的binlog寫入功能,配置 binlog-format 為 ROW 模式,my.cnf中配置如下:

[mysqld]

log-bin=mysql-bin #開啟 binlog

binlog-format=ROW #選擇 ROW 模式

server_id=1 #配置MySQL replaction需要定義,不要和canal的 slaveId重復

3、授權canal連接MySQL賬號具有作為MySQL slave的權限, 如果已有賬戶可直接 grant授權:

啟動MySQL服務器;

登錄mysql:./mysql -uroot -p -h127.0.0.1 -P3306

CREATE USER canal IDENTIFIED BY 'canal';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;

4、下載 canal部署程序

Wget?https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

tar -zxvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal.deployer-1.1.4

5、配置修改

vim?conf/example/instance.properties

主要是修改配置文件中與自己的數據庫配置相關的信息;

6、啟動Canal

./startup.sh

7、查看進程:

ps -ef | grep canal

8、查看 server 日志

cat logs/canal/canal.log

9、查看 instance 的日志

vi logs/example/example.log

10、關閉Canal

./stop.sh

canal server的默認端口號為:11111,如果需要調整的話,可以去到\conf目錄底下的canal.properties文件中進行修改;

相關命令:

#是否啟用了日志

show variables like 'log_bin';

#怎樣知道當前的日志

show master status;

#查看mysql binlog模式

show variables like 'binlog_format';

#獲取binlog文件列表

show binary logs;

#查看當前正在寫入的binlog文件

show master status\G

#查看指定binlog文件的內容

show binlog events in 'mysql-bin.000002';

注意binlog日志格式要求為row格式;

Binlog的三種基本類型分別為:

ROW模式 除了記錄sql語句之外,還會記錄每個字段的變化情況,能夠清楚的記錄每行數據的變化歷史,但是會占用較多的空間,需要使用mysqlbinlog工具進行查看;

STATEMENT模式只記錄了sql語句,但是沒有記錄上下文信息,在進行數據恢復的時候可能會導致數據的丟失情況;

MIX模式比較靈活的記錄,例如說當遇到了表結構變更的時候,就會記錄為statement模式。當遇到了數據更新或者刪除情況下就會變為row模式;

啟動了canal的server之后,便是基于java的客戶端搭建了;

代碼集成方式:

com.alibaba.otter

canal.client

1.1.4

package?com.unwulian.search.engine.suggestion.service;

import?com.alibaba.otter.canal.client.CanalConnector;

import?com.alibaba.otter.canal.client.CanalConnectors;

import?com.alibaba.otter.canal.protocol.CanalEntry.*;

import?com.alibaba.otter.canal.protocol.Message;

import?java.net.InetSocketAddress;

import?java.util.List;

/**

*?canal測試

*

*?@author?shiye

*?@create?2020-11-30?14:22

*/

public?class?CanalTest?{

public?static?void?main(String[]?args)?{

String?ip?=?"192.168.2.165";

int?port?=?11111;

String?destination?=?"example";

String?username?=?"";

String?password?=?"";

CanalConnector?connector?=?CanalConnectors.newSingleConnector(new?InetSocketAddress(ip,?port),?destination,?username,?password);

try?{

connector.connect();

connector.subscribe(".*\\..*");

//跳轉到上次進行讀取日志的地方

connector.rollback();

while?(true)?{

//獲取指定數量的數據

Message?message?=?connector.getWithoutAck(1);

long?id?=?message.getId();

int?size?=?message.getEntries().size();

if?(id?==?-1?||?size?==?0)?{

//如果沒有獲取到數據就睡眠疫苗

Thread.sleep(1000);

}?else?{

System.out.println("messge?id:"?+?id);

printEntry(message.getEntries());

}

//提交確認

connector.ack(id);

//?connector.rollback(batchId);?//?處理失敗,?回滾數據

}

}?catch?(Exception?e)?{

e.printStackTrace();

}?finally?{

connector.disconnect();

}

}

private?static?void?printEntry(List?entrys)?{

for?(Entry?entry?:?entrys)?{

if?(entry.getEntryType()?==?EntryType.TRANSACTIONBEGIN?||?entry.getEntryType()?==?EntryType.TRANSACTIONEND)?{

continue;

}

RowChange?rowChage?=?null;

try?{

rowChage?=?RowChange.parseFrom(entry.getStoreValue());

}?catch?(Exception?e)?{

throw?new?RuntimeException("ERROR?##?parser?of?eromanga-event?has?an?error?,?data:"?+?entry.toString(),?e);

}

EventType?eventType?=?rowChage.getEventType();

System.out.println(String.format("================>?binlog日志偏移量[%s:%s]?,?庫名,表名[%s,%s]?,?操作類型?:?%s",

entry.getHeader().getLogfileName(),?entry.getHeader().getLogfileOffset(),

entry.getHeader().getSchemaName(),?entry.getHeader().getTableName(),

eventType));

for?(RowData?rowData?:?rowChage.getRowDatasList())?{

if?(eventType?==?EventType.DELETE)?{

printColumn(rowData.getBeforeColumnsList());

}?else?if?(eventType?==?EventType.INSERT)?{

printColumn(rowData.getAfterColumnsList());

}?else?{

System.out.println("------->?before");

printColumn(rowData.getBeforeColumnsList());

System.out.println("------->?after");

printColumn(rowData.getAfterColumnsList());

}

}

}

}

private?static?void?printColumn(List?columns)?{

for?(Column?column?:?columns)?{

System.out.println(column.getName()?+?"?:?"?+?column.getValue()?+?"????update="?+?column.getUpdated());

}

}

}

springboo集成canal#?阿里binlog?canal配置

canal:

ip:?192.168.2.13???#192.168.2.165

subscribe:?undev.t_bas_xxx1,undev.t_bas_xxx2#配置你要監聽的表

port:?11111

destination:?dev

username:

password:

package?com.unwulian.search.engine.suggestion.config;

import?org.springframework.boot.context.properties.ConfigurationProperties;

import?org.springframework.context.annotation.Configuration;

import?java.io.Serializable;

/**

*?binlog?canal的配置

*

*?@author?shiye

*?@create?2020-07-17?19:30

*/

@Configuration

@ConfigurationProperties(prefix?=?"canal")

public?class?CanalConfig?implements?Serializable?{

/**

*?ip

*/

private?String?ip;

/**

*?mq監聽表

*/

private?String?subscribe;

/**

*?端口

*/

private?int?port;

/**

*?目的地

*/

private?String?destination;

/**

*?用戶名

*/

private?String?username?=?"";

/**

*?密碼

*/

private?String?password;

public?String?getSubscribe()?{

return?subscribe;

}

public?void?setSubscribe(String?subscribe)?{

this.subscribe?=?subscribe;

}

public?String?getIp()?{

return?ip;

}

public?void?setIp(String?ip)?{

this.ip?=?ip;

}

public?int?getPort()?{

return?port;

}

public?void?setPort(int?port)?{

this.port?=?port;

}

public?String?getDestination()?{

return?destination;

}

public?void?setDestination(String?destination)?{

this.destination?=?destination;

}

public?String?getUsername()?{

return?username;

}

public?void?setUsername(String?username)?{

this.username?=?username;

}

public?String?getPassword()?{

return?password;

}

public?void?setPassword(String?password)?{

this.password?=?password;

}

}

package?com.unwulian.search.engine.suggestion.schedule;

import?com.alibaba.otter.canal.client.CanalConnector;

import?com.alibaba.otter.canal.client.CanalConnectors;

import?com.alibaba.otter.canal.protocol.CanalEntry;

import?com.alibaba.otter.canal.protocol.CanalEntry.EntryType;

import?com.alibaba.otter.canal.protocol.CanalEntry.RowChange;

import?com.alibaba.otter.canal.protocol.Message;

import?com.github.structlog4j.ILogger;

import?com.github.structlog4j.SLoggerFactory;

import?com.unwulian.search.engine.suggestion.config.CanalConfig;

import?com.unwulian.search.engine.suggestion.service.CardService;

import?com.unwulian.search.engine.suggestion.service.CommunityStructService;

import?com.unwulian.search.engine.suggestion.service.HouseService;

import?com.unwulian.search.engine.suggestion.service.RoomService;

import?org.springframework.beans.factory.InitializingBean;

import?org.springframework.beans.factory.annotation.Autowired;

import?org.springframework.stereotype.Component;

import?java.net.InetSocketAddress;

import?java.util.List;

/**

*?項目啟動的時候就初始化canal啟動一個線程去監聽canal?server

*

*?@author?shiye

*?@create?2020-11-30?15:11

*/

@Component

public?class?CanalTask?implements?InitializingBean?{

private?static?final?ILogger?logger?=?SLoggerFactory.getLogger(CanalTask.class);

@Autowired

private?CanalConfig?canalConfig;

@Override

public?void?afterPropertiesSet()?throws?Exception?{

/**

*?啟動一下線程一直監聽canal?server

*/

new?Thread(()?->?{

logger.info("start?Thread?to?listent?canal?success....");

CanalConnector?connector?=?CanalConnectors.newSingleConnector(new?InetSocketAddress(canalConfig.getIp(),?canalConfig.getPort()),

canalConfig.getDestination(),

canalConfig.getUsername(),

canalConfig.getPassword());

connector.connect();

connector.subscribe(canalConfig.getSubscribe());

//跳轉到上次進行讀取日志的地方

connector.rollback();

try?{

while?(true)?{

//獲取指定數量的數據

Message?message?=?connector.getWithoutAck(1);

long?id?=?message.getId();

int?size?=?message.getEntries().size();

if?(id?==?-1?||?size?==?0)?{

//如果沒有獲取到數據就睡眠1s

try?{

Thread.sleep(1000);

}?catch?(InterruptedException?e)?{

logger.error("sleep?1000ms?error...."?+?e.getMessage());

}

}?else?{

//處理消息

//logger.info("messge?id:"?+?id);

handlerEntry(message.getEntries());

}

//提交確認

connector.ack(id);

//?connector.rollback(batchId);?//?處理失敗,?回滾數據

}

}?finally?{

//關閉

connector.disconnect();

}

}).start();

logger.info("start?Thread?to?listent?canal?....");

}

/**

*?處理消息

*

*?@param?entrys

*/

private?void?handlerEntry(List?entrys)?{

for?(CanalEntry.Entry?entry?:?entrys)?{

if?(entry.getEntryType()?==?EntryType.TRANSACTIONBEGIN?||?entry.getEntryType()?==?EntryType.TRANSACTIONEND)?{

//類型是事務開始事務結束不做處理

continue;

}

//庫名

//String?databaseName?=?entry.getHeader().getSchemaName();

//表名

String?tableName?=?entry.getHeader().getTableName();

RowChange?rowChage?=?null;

try?{

rowChage?=?RowChange.parseFrom(entry.getStoreValue());

}?catch?(Exception?e)?{

logger.error("ERROR?數據轉換異常,?data:"?+?entry.toString(),?e);

}

switch?(tableName)?{

case?"t_bas_xxx1":

//進行你的業務處理

break;

case?"t_bas_xxx2":

//進行你的業務處理

break;

default:

return;

}

}

}

private?static?void?printColumn(List?columns)?{

for?(CanalEntry.Column?column?:?columns)?{

System.out.println(column.getName()?+?"?:?"?+?column.getValue()?+?"????不做處理="?+?column.getUpdated());

}

}

}

總結

以上是生活随笔為你收集整理的阿里mysql 二进制_Mysql binlog 之阿里canal的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。