日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Java代码中,如何监控Mysql的binlog?

發布時間:2025/3/16 数据库 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java代码中,如何监控Mysql的binlog? 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

最近在工作中,遇到了這樣一個業務場景,我們需要關注一個業務系統數據庫中某幾張表的數據,當數據發生新增或修改時,將它同步到另一個業務系統數據庫中的表中。

一提到數據庫的同步,估計大家第一時間想到的就是基于binlog的主從復制了,但是放在我們的場景中,還有幾個問題:

  • 第一,并不是需要復制所有表的數據,復制對象只有少量的幾張表

  • 第二,也是比較麻煩的,兩個業務系統數據庫表結構可能不一致。例如,要同步數據庫1的A表中的某些字段到數據庫2的B表中,在這一過程中,A表和B表的字段并不是完全相同

這樣的話,我們只能通過代碼的方式,首先獲取到數據庫1表中數據的變動,再通過手動映射的方式,插入到數據庫2的表中。但是,獲取變動數據的這一過程,還是離不開binlog,因此我們就需要在代碼中對binlog進行一下監控。

先說結論,我們最終使用了一個開源工具mysql-binlog-connector-java,用來監控binlog變化并獲取數據,獲取數據后再手動插入到另一個庫的表中,基于它來實現了數據的同步。這個工具的git項目地址如下:

https://github.com/shyiko/mysql-binlog-connector-java

在正式開始前,還是先簡單介紹一下mysql的binlog,binlog是一個二進制文件,它保存在磁盤中,是用來記錄數據庫表結構變更、表數據修改的二進制日志。其實除了數據復制外,它還可以實現數據恢復、增量備份等功能。

啟動項目前,首先需要確保mysql服務已經啟用了binlog:

show?variables?like?'log_bin';

如果為值為OFF,表示沒有啟用,那么需要首先啟用binlog,修改配置文件:

log_bin=mysql-bin binlog-format=ROW server-id=1

對參數做一個簡要說明:

  • 在配置文件中加入了log_bin配置項后,表示啟用了binlog

  • binlog-format是binlog的日志格式,支持三種類型,分別是STATEMENT、ROW、MIXED,我們在這里使用ROW模式

  • server-id用于標識一個sql語句是從哪一個server寫入的,這里一定要進行設置,否則我們在后面的代碼中會無法正常監聽到事件

在更改完配置文件后,重啟mysql服務。再次查看是否啟用binlog,返回為ON,表示已經開啟成功。

在Java項目中,首先引入maven坐標:

<dependency><groupId>com.github.shyiko</groupId><artifactId>mysql-binlog-connector-java</artifactId><version>0.21.0</version> </dependency>

寫一段簡單的示例,看看它的具體使用方式:

public?static?void?main(String[]?args)?{BinaryLogClient?client?=?new?BinaryLogClient("127.0.0.1",?3306,?"hydra",?"123456");client.setServerId(2);client.registerEventListener(event?->?{EventData?data?=?event.getData();if?(data?instanceof?TableMapEventData)?{System.out.println("Table:");TableMapEventData?tableMapEventData?=?(TableMapEventData)?data;System.out.println(tableMapEventData.getTableId()+":?["+tableMapEventData.getDatabase()?+?"-"?+?tableMapEventData.getTable()+"]");}if?(data?instanceof?UpdateRowsEventData)?{System.out.println("Update:");System.out.println(data.toString());}?else?if?(data?instanceof?WriteRowsEventData)?{System.out.println("Insert:");System.out.println(data.toString());}?else?if?(data?instanceof?DeleteRowsEventData)?{System.out.println("Delete:");System.out.println(data.toString());}});try?{client.connect();}?catch?(IOException?e)?{e.printStackTrace();} }

首先,創建一個BinaryLogClient客戶端對象,初始化時需要傳入mysql的連接信息,創建完成后,給客戶端注冊一個監聽器,來實現它對binlog的監聽和解析。在監聽器中,我們暫時只對4種類型的事件數據進行了處理,除了WriteRowsEventData、DeleteRowsEventData、UpdateRowsEventData對應增刪改操作類型的事件數據外,還有一個TableMapEventData類型的數據,包含了表的對應關系,在后面的例子中再具體說明。

在這里,客戶端監聽到的是數據庫級別的所有事件,并且可以監聽到表的DML語句和DDL語句,所以我們只需要處理我們關心的事件數據就行,否則會收到大量的冗余數據。

啟動程序,控制臺輸出:

com.github.shyiko.mysql.binlog.BinaryLogClient openChannelToBinaryLogStream 信息: Connected to 127.0.0.1:3306 at mysql-bin.000002/1046 (sid:2, cid:10)

連接mysql的binlog成功,接下來,我們在數據庫中插入一條數據,這里操作的數據庫名字是tenant,表是dept:

insert?into?dept?VALUES(8,"人力","","1");

這時,控制臺就會打印監聽到事件的數據:

Table: 108: [tenant-dept] Insert: WriteRowsEventData{tableId=108, includedColumns={0, 1, 2, 3}, rows=[[8, 人力, , 1] ]}

我們監聽到的事件類型數據有兩類,第一類是TableMapEventData,通過它可以獲取操作的數據庫名稱、表名稱以及表的id。之所以我們要監聽這個事件,是因為之后監聽的實際操作中返回數據中包含了表的id,而沒有表名等信息,所以如果我們想知道具體的操作是在哪一張表的話,就要先維護一個id與表的對應關系。

第二個打印出來的監聽事件數據是WriteRowsEventData,其中記錄了insert語句作用的表,插入涉及到的列,以及實際插入的數據。另外,如果我們只需要對特定的一張或幾張表進行處理的話,也可以提前設置表的名單,在這里根據表id到表名的映射關系,實現數據的過濾,

接下來,我們再執行一條update語句:

update?dept?set?tenant_id=3?where?id=8?or?id=9

控制臺輸出:

Table: 108: [tenant-dept] Update: UpdateRowsEventData{tableId=108, includedColumnsBeforeUpdate={0, 1, 2, 3}, includedColumns={0, 1, 2, 3}, rows=[{before=[8, 人力, , 1], after=[8, 人力, , 3]},{before=[9, 人力, , 1], after=[9, 人力, , 3]} ]}

在執行update語句時,可能會作用于多條數據,因此在實際修改的數據中,可能包含多行記錄,這一點體現在上面的rows中,包含了id為8和9的兩條數據。

最后,再執行一條delete語句:

delete?from?dept?where?tenant_id=3

控制臺打印如下,rows中同樣返回了生效的兩條數據:

Table: 108: [tenant-dept] Delete: DeleteRowsEventData{tableId=108, includedColumns={0, 1, 2, 3}, rows=[[8, 人力, , 3],[9, 人力, , 3] ]}

簡單的使用原理介紹完成后,再回到我們原先的需求上,需要將一張表中新增或修改的數據同步到另一張表中,問題還有一個,就是如何將返回的數據對應到所在的列上。這時應該怎么實現呢?

以update操作為例,我們要對提取的數據后進行一下處理,更改上面例子中的方法:

if?(data?instanceof?UpdateRowsEventData)?{System.out.println("Update:");UpdateRowsEventData?updateRowsEventData?=?(UpdateRowsEventData)?data;for?(Map.Entry<Serializable[],?Serializable[]>?row?:?updateRowsEventData.getRows())?{List<Serializable>?entries?=?Arrays.asList(row.getValue());System.out.println(entries);JSONObject?dataObject?=?getDataObject(entries);System.out.println(dataObject);} }

在將data強制轉換為UpdateRowsEventData后,可以使用getRows方法獲取到更新的行數據,并且能夠取到每一列的值。

之后,調用了一個自己實現的getDataObject方法,用它來實現數據到列的綁定過程:

private?static?JSONObject?getDataObject(List?message)?{JSONObject?resultObject?=?new?JSONObject();String?format?=?"{\"id\":\"0\",\"dept_name\":\"1\",\"comment\":\"2\",\"tenant_id\":\"3\"}";JSONObject?json?=?JSON.parseObject(format);for?(String?key?:?json.keySet())?{resultObject.put(key,?message.get(json.getInteger(key)));}return?resultObject; }

在format字符串中,提前維護了一個數據庫表的字段順序的字符串,標識了每個字段位于順序中的第幾個位置。通過上面這個函數,能夠實現數據到列的填裝過程,我們再執行一條update語句來查看一下結果:

update?dept?set?tenant_id=3,comment="1"?where?id=8

控制臺打印結果如下:

Table: 108: [tenant-dept] Update: [8, 人力, 1, 3] {"tenant_id":3,"dept_name":"人力","comment":"1","id":8}

可以看到,將修改后的這一條記錄中的屬性填裝到了它對應的列中,之后我們再根據具體的業務邏輯,就可以根據字段名取出數據,將數據同步到其他的表了。

有道無術,術可成;有術無道,止于術

歡迎大家關注Java之道公眾號

好文章,我在看??

總結

以上是生活随笔為你收集整理的Java代码中,如何监控Mysql的binlog?的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。