日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > php >内容正文

php

php阿里的同步工具canal,基于阿里的Canal实现数据同步

發布時間:2025/3/20 php 49 豆豆
生活随笔 收集整理的這篇文章主要介紹了 php阿里的同步工具canal,基于阿里的Canal实现数据同步 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、開啟同步數據庫的binlog功能

(1)開啟同步數據端的數據庫服務(比如我的將一號虛擬機上的mysql數據庫作為同步操作數據庫)

systemctl start mysql.service

mysql -h192.168.137.100 -uroot -p123

(2)檢查mysql的binlog功能是否開啟(可見是OFF,關閉的)

MySQL [(none)]> show variables like 'log_bin';

+---------------+-------+

| Variable_name | Value |

+---------------+-------+

| log_bin | OFF |

+---------------+-------+

(3)開啟binlog功能,修改my.cnf配置文件:vim /etc/my.cnf

在文件中添加如下內容后,重啟mysql服務,在查看mysql的binlog功能。可見已經開啟。

log-bin=mysql-bin

binlog_format=ROW

server_id=1

MySQL [(none)]> show variables like 'log_bin';

+---------------+-------+

| Variable_name | Value |

+---------------+-------+

| log_bin | ON |

+---------------+-------+

二、安裝canal工具

(1)下載地址:https://github.com/alibaba/canal/releases

(2)下載文件:canal.deployer-1.1.4.tar.gz

(3)文件傳到 /usr/local/canal文件夾下,解壓

tar zxvf canal.deployer-1.1.4.tar.gz

(4)修改配置文件vi conf/example/instance.properties

修改如下內容為數據庫連接:

canal.instance.master.address=192.168.137.100:3306

canal.instance.dbUsername=root

canal.instance.dbPassword=123

(5)啟動canal數據同步工具 cd bin

[root@localhost bin]# ./startup.sh

三、測試項目準備

(1)新建項目,引入如下依賴

org.springframework.boot

spring-boot-starter-web

mysql

mysql-connector-java

commons-dbutils

commons-dbutils

org.springframework.boot

spring-boot-starter-jdbc

com.alibaba.otter

canal.client

(2)項目配置文件配置本地同步數據庫

# 服務端口

server.port=1000

# 服務名

spring.application.name=canal-client

# 環境設置:dev、test、prod

spring.profiles.active=dev

# mysql數據庫連接

spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

spring.datasource.url=jdbc:mysql://192.168.137.130:3306/TeachOnLine?useUnicode=true&useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai

spring.datasource.username=root

spring.datasource.password=123

(3)創建如下啟動類

@SpringBootApplication

public class CanalApplication implements CommandLineRunner {

@Resource

private CanalClient canalClient;

// canalClient為數據庫同步操作執行類

public static void main(String[] args) {

SpringApplication.run(CanalApplication.class, args);

}

@Override

public void run(String... strings) throws Exception {

//項目啟動,執行canal客戶端監聽

canalClient.run();

}

}

(4)創建CanalClient數據庫同步操作執行類內容如下(記得保證192.168.137.100服務器的11111端口可通)

@Component

public class CanalClient {

//sql隊列

private Queue SQL_QUEUE = new ConcurrentLinkedQueue<>();

@Resource

private DataSource dataSource;

/**

* canal入庫方法

*/

public void run() {

// 創建遠程數據庫連接對象

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.137.100",

11111), "example", "", "");

int batchSize = 1000;

try {

connector.connect();// 嘗試連接

connector.subscribe(".*\\..*");

connector.rollback();

try {

while (true) {

//嘗試從master那邊拉去數據batchSize條記錄,有多少取多少

Message message = connector.getWithoutAck(batchSize);

long batchId = message.getId();

int size = message.getEntries().size();

if (batchId == -1 || size == 0) {

Thread.sleep(1000);

} else {

dataHandle(message.getEntries());

}

connector.ack(batchId);

//當隊列里面堆積的sql大于一定數值的時候就模擬執行

if (SQL_QUEUE.size() >= 1) {

executeQueueSql();

}

}

} catch (InterruptedException e) {

e.printStackTrace();

} catch (InvalidProtocolBufferException e) {

e.printStackTrace();

}

} finally {

connector.disconnect();

}

}

/**

* 模擬執行隊列里面的sql語句

*/

public void executeQueueSql() {

int size = SQL_QUEUE.size();

for (int i = 0; i < size; i++) {

String sql = SQL_QUEUE.poll();

System.out.println("[sql]----> " + sql);

this.execute(sql.toString());

}

}

/**

* 數據處理

*

* @param entrys

*/

private void dataHandle(List entrys) throws InvalidProtocolBufferException {

for (Entry entry : entrys) {

if (EntryType.ROWDATA == entry.getEntryType()) {

RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());

EventType eventType = rowChange.getEventType();

if (eventType == EventType.DELETE) {

saveDeleteSql(entry);

} else if (eventType == EventType.UPDATE) {

saveUpdateSql(entry);

} else if (eventType == EventType.INSERT) {

saveInsertSql(entry);

}

}

}

}

/**

* 保存更新語句

*

* @param entry

*/

private void saveUpdateSql(Entry entry) {

try {

RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());

List rowDatasList = rowChange.getRowDatasList();

for (RowData rowData : rowDatasList) {

List newColumnList = rowData.getAfterColumnsList();

StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");

for (int i = 0; i < newColumnList.size(); i++) {

sql.append(" " + newColumnList.get(i).getName()

+ " = '" + newColumnList.get(i).getValue() + "'");

if (i != newColumnList.size() - 1) {

sql.append(",");

}

}

sql.append(" where ");

List oldColumnList = rowData.getBeforeColumnsList();

for (Column column : oldColumnList) {

if (column.getIsKey()) {

//暫時只支持單一主鍵

sql.append(column.getName() + "=" + column.getValue());

break;

}

}

SQL_QUEUE.add(sql.toString());

}

} catch (InvalidProtocolBufferException e) {

e.printStackTrace();

}

}

/**

* 保存刪除語句

*

* @param entry

*/

private void saveDeleteSql(Entry entry) {

try {

RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());

List rowDatasList = rowChange.getRowDatasList();

for (RowData rowData : rowDatasList) {

List columnList = rowData.getBeforeColumnsList();

StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");

for (Column column : columnList) {

if (column.getIsKey()) {

//暫時只支持單一主鍵

sql.append(column.getName() + "=" + column.getValue());

break;

}

}

SQL_QUEUE.add(sql.toString());

}

} catch (InvalidProtocolBufferException e) {

e.printStackTrace();

}

}

/**

* 保存插入語句

*

* @param entry

*/

private void saveInsertSql(Entry entry) {

try {

RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());

List rowDatasList = rowChange.getRowDatasList();

for (RowData rowData : rowDatasList) {

List columnList = rowData.getAfterColumnsList();

StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");

for (int i = 0; i < columnList.size(); i++) {

sql.append(columnList.get(i).getName());

if (i != columnList.size() - 1) {

sql.append(",");

}

}

sql.append(") VALUES (");

for (int i = 0; i < columnList.size(); i++) {

sql.append("'" + columnList.get(i).getValue() + "'");

if (i != columnList.size() - 1) {

sql.append(",");

}

}

sql.append(")");

SQL_QUEUE.add(sql.toString());

}

} catch (InvalidProtocolBufferException e) {

e.printStackTrace();

}

}

/**

* 入庫

* @param sql

*/

public void execute(String sql) {

Connection con = null;

try {

if(null == sql) return;

con = dataSource.getConnection();

QueryRunner qr = new QueryRunner();

int row = qr.execute(con, sql);// 語句最終執行!

System.out.println("update: "+ row);

} catch (SQLException e) {

e.printStackTrace();

} finally {

DbUtils.closeQuietly(con);

}

}

}

四、測試

(1)啟動類開啟,在192.168.137.100(遠程)數據庫TeachOnLine的表members中插入數據

insert into members values(2,‘candy’,22);

(2)查看控制臺打印信息

[sql]----> insert into members (id,username,age) VALUES ('2','candy','22')

update: 1

(3)在192.168.137.130(本地)數據庫TeachOnLine的表members中出現相同數據。

注:在此例中本地數據庫會同步遠程數據庫的變化,反過來修改本地數據庫內容,遠程數據庫不會同步本地數據庫的加粗樣式變化

標簽:Canal,canal,同步,數據庫,mysql,阿里,sql,entry,size

來源: https://blog.csdn.net/weixin_44187615/article/details/111726524

《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀

總結

以上是生活随笔為你收集整理的php阿里的同步工具canal,基于阿里的Canal实现数据同步的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。