阿里mysql 二进制_Mysql binlog 之阿里canal
1、What is Canal?
canal [k?'n?l],中文翻譯為 水道/管道/溝渠/運河,主要用途是用于 MySQL 數據庫增量日志數據的訂閱、消費和解析,是阿里巴巴開發并開源的,采用Java語言開發;
歷史背景是早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房數據同步的業務需求,實現方式主要是基于業務 trigger(觸發器) 獲取增量變更。從2010年開始,阿里巴巴逐步嘗試采用解析數據庫日志獲取增量變更進行同步,由此衍生出了canal項目;
Github:https://github.com/alibaba/canal
2、工作原理
傳統MySQL主從復制工作原理
從上層來看,復制分成三步:
MySQL的主從復制將經過如下步驟:
1、當 master 主服務器上的數據發生改變時,則將其改變寫入二進制事件日志文件中;
2、salve 從服務器會在一定時間間隔內對 master 主服務器上的二進制日志進行探測,探測其是否發生過改變,如果探測到 master 主服務器的二進制事件日志發生了改變,則開始一個 I/O Thread 請求 master 二進制事件日志;
3、同時 master 主服務器為每個 I/O Thread 啟動一個dump ?Thread,用于向其發送二進制事件日志;
4、slave 從服務器將接收到的二進制事件日志保存至自己本地的中繼日志文件中;
5、salve 從服務器將啟動 SQL Thread 從中繼日志中讀取二進制日志,在本地重放,使得其數據和主服務器保持一致;
6、最后 I/O Thread 和 SQL Thread 將進入睡眠狀態,等待下一次被喚醒;
canal 工作原理
1、canal 模擬 MySQL slave 的交互協議,把自己偽裝為 MySQL slave,向 MySQL master 發送dump 協議;
2、MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即canal );
3、canal 解析 binary log 對象 (原始數據為byte流)
3、Canal使用場景
Canal是基于MySQL變更日志增量訂閱和消費的組件,可以使用在如下一些一些應用場景:
數據庫實時備份
業務cache刷新
search build
價格變化等重要業務消息
帶業務邏輯的增量數據處理
跨數據庫的數據備份(異構數據同步),
例如mysql => oracle,mysql=>mongo,mysql =>redis,
mysql => elasticsearch等;
當前canal 主要是支持源端 MySQL(也支持mariaDB),版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x;Canal搭建環境
1、準備好MySQL運行環境;
2、開啟 MySQL的binlog寫入功能,配置 binlog-format 為 ROW 模式,my.cnf中配置如下:
[mysqld]
log-bin=mysql-bin #開啟 binlog
binlog-format=ROW #選擇 ROW 模式
server_id=1 #配置MySQL replaction需要定義,不要和canal的 slaveId重復
3、授權canal連接MySQL賬號具有作為MySQL slave的權限, 如果已有賬戶可直接 grant授權:
啟動MySQL服務器;
登錄mysql:./mysql -uroot -p -h127.0.0.1 -P3306
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
4、下載 canal部署程序
Wget?https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
tar -zxvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal.deployer-1.1.4
5、配置修改
vim?conf/example/instance.properties
主要是修改配置文件中與自己的數據庫配置相關的信息;
6、啟動Canal
./startup.sh
7、查看進程:
ps -ef | grep canal
8、查看 server 日志
cat logs/canal/canal.log
9、查看 instance 的日志
vi logs/example/example.log
10、關閉Canal
./stop.sh
canal server的默認端口號為:11111,如果需要調整的話,可以去到\conf目錄底下的canal.properties文件中進行修改;
相關命令:
#是否啟用了日志
show variables like 'log_bin';
#怎樣知道當前的日志
show master status;
#查看mysql binlog模式
show variables like 'binlog_format';
#獲取binlog文件列表
show binary logs;
#查看當前正在寫入的binlog文件
show master status\G
#查看指定binlog文件的內容
show binlog events in 'mysql-bin.000002';
注意binlog日志格式要求為row格式;
Binlog的三種基本類型分別為:
ROW模式 除了記錄sql語句之外,還會記錄每個字段的變化情況,能夠清楚的記錄每行數據的變化歷史,但是會占用較多的空間,需要使用mysqlbinlog工具進行查看;
STATEMENT模式只記錄了sql語句,但是沒有記錄上下文信息,在進行數據恢復的時候可能會導致數據的丟失情況;
MIX模式比較靈活的記錄,例如說當遇到了表結構變更的時候,就會記錄為statement模式。當遇到了數據更新或者刪除情況下就會變為row模式;
啟動了canal的server之后,便是基于java的客戶端搭建了;
代碼集成方式:
com.alibaba.otter
canal.client
1.1.4
package?com.unwulian.search.engine.suggestion.service;
import?com.alibaba.otter.canal.client.CanalConnector;
import?com.alibaba.otter.canal.client.CanalConnectors;
import?com.alibaba.otter.canal.protocol.CanalEntry.*;
import?com.alibaba.otter.canal.protocol.Message;
import?java.net.InetSocketAddress;
import?java.util.List;
/**
*?canal測試
*
*?@author?shiye
*?@create?2020-11-30?14:22
*/
public?class?CanalTest?{
public?static?void?main(String[]?args)?{
String?ip?=?"192.168.2.165";
int?port?=?11111;
String?destination?=?"example";
String?username?=?"";
String?password?=?"";
CanalConnector?connector?=?CanalConnectors.newSingleConnector(new?InetSocketAddress(ip,?port),?destination,?username,?password);
try?{
connector.connect();
connector.subscribe(".*\\..*");
//跳轉到上次進行讀取日志的地方
connector.rollback();
while?(true)?{
//獲取指定數量的數據
Message?message?=?connector.getWithoutAck(1);
long?id?=?message.getId();
int?size?=?message.getEntries().size();
if?(id?==?-1?||?size?==?0)?{
//如果沒有獲取到數據就睡眠疫苗
Thread.sleep(1000);
}?else?{
System.out.println("messge?id:"?+?id);
printEntry(message.getEntries());
}
//提交確認
connector.ack(id);
//?connector.rollback(batchId);?//?處理失敗,?回滾數據
}
}?catch?(Exception?e)?{
e.printStackTrace();
}?finally?{
connector.disconnect();
}
}
private?static?void?printEntry(List?entrys)?{
for?(Entry?entry?:?entrys)?{
if?(entry.getEntryType()?==?EntryType.TRANSACTIONBEGIN?||?entry.getEntryType()?==?EntryType.TRANSACTIONEND)?{
continue;
}
RowChange?rowChage?=?null;
try?{
rowChage?=?RowChange.parseFrom(entry.getStoreValue());
}?catch?(Exception?e)?{
throw?new?RuntimeException("ERROR?##?parser?of?eromanga-event?has?an?error?,?data:"?+?entry.toString(),?e);
}
EventType?eventType?=?rowChage.getEventType();
System.out.println(String.format("================>?binlog日志偏移量[%s:%s]?,?庫名,表名[%s,%s]?,?操作類型?:?%s",
entry.getHeader().getLogfileName(),?entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(),?entry.getHeader().getTableName(),
eventType));
for?(RowData?rowData?:?rowChage.getRowDatasList())?{
if?(eventType?==?EventType.DELETE)?{
printColumn(rowData.getBeforeColumnsList());
}?else?if?(eventType?==?EventType.INSERT)?{
printColumn(rowData.getAfterColumnsList());
}?else?{
System.out.println("------->?before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("------->?after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private?static?void?printColumn(List?columns)?{
for?(Column?column?:?columns)?{
System.out.println(column.getName()?+?"?:?"?+?column.getValue()?+?"????update="?+?column.getUpdated());
}
}
}
springboo集成canal#?阿里binlog?canal配置
canal:
ip:?192.168.2.13???#192.168.2.165
subscribe:?undev.t_bas_xxx1,undev.t_bas_xxx2#配置你要監聽的表
port:?11111
destination:?dev
username:
password:
package?com.unwulian.search.engine.suggestion.config;
import?org.springframework.boot.context.properties.ConfigurationProperties;
import?org.springframework.context.annotation.Configuration;
import?java.io.Serializable;
/**
*?binlog?canal的配置
*
*?@author?shiye
*?@create?2020-07-17?19:30
*/
@Configuration
@ConfigurationProperties(prefix?=?"canal")
public?class?CanalConfig?implements?Serializable?{
/**
*?ip
*/
private?String?ip;
/**
*?mq監聽表
*/
private?String?subscribe;
/**
*?端口
*/
private?int?port;
/**
*?目的地
*/
private?String?destination;
/**
*?用戶名
*/
private?String?username?=?"";
/**
*?密碼
*/
private?String?password;
public?String?getSubscribe()?{
return?subscribe;
}
public?void?setSubscribe(String?subscribe)?{
this.subscribe?=?subscribe;
}
public?String?getIp()?{
return?ip;
}
public?void?setIp(String?ip)?{
this.ip?=?ip;
}
public?int?getPort()?{
return?port;
}
public?void?setPort(int?port)?{
this.port?=?port;
}
public?String?getDestination()?{
return?destination;
}
public?void?setDestination(String?destination)?{
this.destination?=?destination;
}
public?String?getUsername()?{
return?username;
}
public?void?setUsername(String?username)?{
this.username?=?username;
}
public?String?getPassword()?{
return?password;
}
public?void?setPassword(String?password)?{
this.password?=?password;
}
}
package?com.unwulian.search.engine.suggestion.schedule;
import?com.alibaba.otter.canal.client.CanalConnector;
import?com.alibaba.otter.canal.client.CanalConnectors;
import?com.alibaba.otter.canal.protocol.CanalEntry;
import?com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import?com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import?com.alibaba.otter.canal.protocol.Message;
import?com.github.structlog4j.ILogger;
import?com.github.structlog4j.SLoggerFactory;
import?com.unwulian.search.engine.suggestion.config.CanalConfig;
import?com.unwulian.search.engine.suggestion.service.CardService;
import?com.unwulian.search.engine.suggestion.service.CommunityStructService;
import?com.unwulian.search.engine.suggestion.service.HouseService;
import?com.unwulian.search.engine.suggestion.service.RoomService;
import?org.springframework.beans.factory.InitializingBean;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.stereotype.Component;
import?java.net.InetSocketAddress;
import?java.util.List;
/**
*?項目啟動的時候就初始化canal啟動一個線程去監聽canal?server
*
*?@author?shiye
*?@create?2020-11-30?15:11
*/
@Component
public?class?CanalTask?implements?InitializingBean?{
private?static?final?ILogger?logger?=?SLoggerFactory.getLogger(CanalTask.class);
@Autowired
private?CanalConfig?canalConfig;
@Override
public?void?afterPropertiesSet()?throws?Exception?{
/**
*?啟動一下線程一直監聽canal?server
*/
new?Thread(()?->?{
logger.info("start?Thread?to?listent?canal?success....");
CanalConnector?connector?=?CanalConnectors.newSingleConnector(new?InetSocketAddress(canalConfig.getIp(),?canalConfig.getPort()),
canalConfig.getDestination(),
canalConfig.getUsername(),
canalConfig.getPassword());
connector.connect();
connector.subscribe(canalConfig.getSubscribe());
//跳轉到上次進行讀取日志的地方
connector.rollback();
try?{
while?(true)?{
//獲取指定數量的數據
Message?message?=?connector.getWithoutAck(1);
long?id?=?message.getId();
int?size?=?message.getEntries().size();
if?(id?==?-1?||?size?==?0)?{
//如果沒有獲取到數據就睡眠1s
try?{
Thread.sleep(1000);
}?catch?(InterruptedException?e)?{
logger.error("sleep?1000ms?error...."?+?e.getMessage());
}
}?else?{
//處理消息
//logger.info("messge?id:"?+?id);
handlerEntry(message.getEntries());
}
//提交確認
connector.ack(id);
//?connector.rollback(batchId);?//?處理失敗,?回滾數據
}
}?finally?{
//關閉
connector.disconnect();
}
}).start();
logger.info("start?Thread?to?listent?canal?....");
}
/**
*?處理消息
*
*?@param?entrys
*/
private?void?handlerEntry(List?entrys)?{
for?(CanalEntry.Entry?entry?:?entrys)?{
if?(entry.getEntryType()?==?EntryType.TRANSACTIONBEGIN?||?entry.getEntryType()?==?EntryType.TRANSACTIONEND)?{
//類型是事務開始事務結束不做處理
continue;
}
//庫名
//String?databaseName?=?entry.getHeader().getSchemaName();
//表名
String?tableName?=?entry.getHeader().getTableName();
RowChange?rowChage?=?null;
try?{
rowChage?=?RowChange.parseFrom(entry.getStoreValue());
}?catch?(Exception?e)?{
logger.error("ERROR?數據轉換異常,?data:"?+?entry.toString(),?e);
}
switch?(tableName)?{
case?"t_bas_xxx1":
//進行你的業務處理
break;
case?"t_bas_xxx2":
//進行你的業務處理
break;
default:
return;
}
}
}
private?static?void?printColumn(List?columns)?{
for?(CanalEntry.Column?column?:?columns)?{
System.out.println(column.getName()?+?"?:?"?+?column.getValue()?+?"????不做處理="?+?column.getUpdated());
}
}
}
總結
以上是生活随笔為你收集整理的阿里mysql 二进制_Mysql binlog 之阿里canal的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 骁龙8+新旗舰 Moto X30 Pro
- 下一篇: 星纪时代战略投资魅族:双方保持品牌独立