日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

opentrace在mysql中使用_采用OpenReplicator解析MySQL binlog

發布時間:2023/12/4 数据库 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 opentrace在mysql中使用_采用OpenReplicator解析MySQL binlog 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Open Replicator是一個用Java編寫的MySQL binlog分析程序。Open Replicator 首先連接到MySQL(就像一個普通的MySQL Slave一樣),然后接收和分析binlog,最終將分析得出的binlog events以回調的方式通知應用。Open Replicator可以被應用到MySQL數據變化的實時推送,多Master到單Slave的數據同步等多種應用場景。Open Replicator目前只支持MySQL5.0及以上版本。

Open Replicator項目地址:https://github.com/whitesock/open-replicator

binlog事件分析結構圖

在閱讀下面的內容時,首先需要對binlog有一定的了解,可以 參考《MySQL Binlog解析》。

這里通過open-replicator解析binlog日志事件(binlog-format = row)。binlog日志事件里存在兩種操作:DDL和DML,當DDL時輸出一條sql,當DML時輸出相關行信息。可以參考下面:

DDL(CREATE, ALTER, DROP, TRUNCATE,主要用在定義或改變表的結構):

{

"eventId": 1,

"databaseName": "canal_test",

"tableName": "`company`",

"eventType": 2,

"timestamp": 1477033198000,

"timestampReceipt": 1477033248780,

"binlogName": "mysql-bin.000006",

"position": 353,

"nextPostion": 468,

"serverId": 2,

"before": null,

"after": null,

"isDdl": true,

"sql": "DROP TABLE `company` /* generated by server */"}

DML(SELECT, UPDATE, INSERT, DELETE,對數據庫里的數據進行操作):

{

"eventId": 0,

"databaseName": "canal_test",

"tableName": "person",

"eventType": 24,

"timestamp": 1477030734000,

"timestampReceipt": 1477032161988,

"binlogName": "mysql-bin.000006",

"position": 242,

"nextPostion": 326,

"serverId": 2,

"before": {

"id": "3",

"sex": "f",

"address": "shanghai",

"age": "23",

"name": "zzh3"},

"after": {

"id": "3",

"sex": "m",

"address": "shanghai",

"age": "23",

"name": "zzh3"},

"isDdl": false,

"sql": null}

相關的類文件如下:

CDCEvent.java

package or;

import java.util.Map;

import java.util.concurrent.atomic.AtomicLong;

import com.google.code.or.binlog.BinlogEventV4;

import com.google.code.or.binlog.BinlogEventV4Header;

import com.google.code.or.binlog.impl.event.AbstractBinlogEventV4;

public class CDCEvent {

private long eventId = 0;//事件唯一標識

private String databaseName = null;

private String tableName = null;

private int eventType = 0;//事件類型

private long timestamp = 0;//事件發生的時間戳[MySQL服務器的時間]

private long timestampReceipt = 0;//Open-replicator接收到的時間戳[CDC執行的時間戳]

private String binlogName = null;// binlog file name

private long position = 0;

private long nextPostion = 0;

private long serverId = 0;

private Map before = null;

private Map after = null;

private Boolean isDdl= null;

private String sql = null;

private static AtomicLong uuid = new AtomicLong(0);

public CDCEvent(){}

public CDCEvent(final AbstractBinlogEventV4 are, String databaseName, String tableName){

this.init(are);

this.databaseName = databaseName;

this.tableName = tableName;

}

private void init(final BinlogEventV4 be){

this.eventId = uuid.getAndAdd(1);

BinlogEventV4Header header = be.getHeader();

this.timestamp = header.getTimestamp();

this.eventType = header.getEventType();

this.serverId = header.getServerId();

this.timestampReceipt = header.getTimestampOfReceipt();

this.position = header.getPosition();

this.nextPostion = header.getNextPosition();

this.binlogName = header.getBinlogFileName();

}

@Override

public String toString(){

StringBuilder builder = new StringBuilder();

builder.append("{ eventId:").append(eventId);

builder.append(",databaseName:").append(databaseName);

builder.append(",tableName:").append(tableName);

builder.append(",eventType:").append(eventType);

builder.append(",timestamp:").append(timestamp);

builder.append(",timestampReceipt:").append(timestampReceipt);

builder.append(",binlogName:").append(binlogName);

builder.append(",position:").append(position);

builder.append(",nextPostion:").append(nextPostion);

builder.append(",serverId:").append(serverId);

builder.append(",isDdl:").append(isDdl);

builder.append(",sql:").append(sql);

builder.append(",before:").append(before);

builder.append(",after:").append(after).append("}");

return builder.toString();

}

// 省略Getter和Setter方法

}

open-replicator的解析主要是通過注冊Listener的形式實現的,整個過程最重要的步驟在下面:

InstanceListener.java

package or;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import or.keeper.TableInfoKeeper;

import or.manager.CDCEventManager;

import or.model.ColumnInfo;

import or.model.TableInfo;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.google.code.or.binlog.BinlogEventListener;

import com.google.code.or.binlog.BinlogEventV4;

import com.google.code.or.binlog.impl.event.DeleteRowsEvent;

import com.google.code.or.binlog.impl.event.FormatDescriptionEvent;

import com.google.code.or.binlog.impl.event.QueryEvent;

import com.google.code.or.binlog.impl.event.TableMapEvent;

import com.google.code.or.binlog.impl.event.UpdateRowsEvent;

import com.google.code.or.binlog.impl.event.WriteRowsEvent;

import com.google.code.or.binlog.impl.event.XidEvent;

import com.google.code.or.common.glossary.Column;

import com.google.code.or.common.glossary.Pair;

import com.google.code.or.common.glossary.Row;

import com.google.code.or.common.util.MySQLConstants;

public class InstanceListener implements BinlogEventListener{

private static final Logger logger = LoggerFactory.getLogger(InstanceListener.class);

@Override

public void onEvents(BinlogEventV4 be) {

if(be == null){

logger.error("binlog event is null");

return;

}

int eventType = be.getHeader().getEventType();

switch(eventType){

case MySQLConstants.FORMAT_DESCRIPTION_EVENT:

{

logger.trace("FORMAT_DESCRIPTION_EVENT");

break;

}

case MySQLConstants.TABLE_MAP_EVENT://每次ROW_EVENT前都伴隨一個TABLE_MAP_EVENT事件,保存一些表信息,如tableId, tableName, databaseName, 而ROW_EVENT只有tableId

{

TableMapEvent tme = (TableMapEvent)be;

TableInfoKeeper.saveTableIdMap(tme);

logger.trace("TABLE_MAP_EVENT:tableId:{}",tme.getTableId());

break;

}

case MySQLConstants.DELETE_ROWS_EVENT:

{

DeleteRowsEvent dre = (DeleteRowsEvent) be;

long tableId = dre.getTableId();

logger.trace("DELETE_ROW_EVENT:tableId:{}",tableId);

TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);

String databaseName = tableInfo.getDatabaseName();

String tableName = tableInfo.getTableName();

List rows = dre.getRows();

for(Row row:rows){

List before = row.getColumns();

Map beforeMap = getMap(before,databaseName,tableName);

if(beforeMap !=null && beforeMap.size()>0){

CDCEvent cdcEvent = new CDCEvent(dre,databaseName,tableName);

cdcEvent.setIsDdl(false);

cdcEvent.setSql(null);

cdcEvent.setBefore(beforeMap);

CDCEventManager.queue.addLast(cdcEvent);

logger.info("cdcEvent:{}",cdcEvent);

}

}

break;

}

case MySQLConstants.UPDATE_ROWS_EVENT:

{

UpdateRowsEvent upe = (UpdateRowsEvent)be;

long tableId = upe.getTableId();

logger.info("UPDATE_ROWS_EVENT:tableId:{}",tableId);

TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);

String databaseName = tableInfo.getDatabaseName();

String tableName = tableInfo.getTableName();

List> rows = upe.getRows();

for(Pair p:rows){

List colsBefore = p.getBefore().getColumns();

List colsAfter = p.getAfter().getColumns();

Map beforeMap = getMap(colsBefore,databaseName,tableName);

Map afterMap = getMap(colsAfter,databaseName,tableName);

if(beforeMap!=null && afterMap!=null && beforeMap.size()>0 && afterMap.size()>0){

CDCEvent cdcEvent = new CDCEvent(upe,databaseName,tableName);

cdcEvent.setIsDdl(false);

cdcEvent.setSql(null);

cdcEvent.setBefore(beforeMap);

cdcEvent.setAfter(afterMap);

CDCEventManager.queue.addLast(cdcEvent);

logger.info("cdcEvent:{}",cdcEvent);

}

}

break;

}

case MySQLConstants.WRITE_ROWS_EVENT:

{

WriteRowsEvent wre = (WriteRowsEvent)be;

long tableId = wre.getTableId();

logger.trace("WRITE_ROWS_EVENT:tableId:{}",tableId);

TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);

String databaseName = tableInfo.getDatabaseName();

String tableName = tableInfo.getTableName();

List rows = wre.getRows();

for(Row row: rows){

List after = row.getColumns();

Map afterMap = getMap(after,databaseName,tableName);

if(afterMap!=null && afterMap.size()>0){

CDCEvent cdcEvent = new CDCEvent(wre,databaseName,tableName);

cdcEvent.setIsDdl(false);

cdcEvent.setSql(null);

cdcEvent.setAfter(afterMap);

CDCEventManager.queue.addLast(cdcEvent);

logger.info("cdcEvent:{}",cdcEvent);

}

}

break;

}

case MySQLConstants.QUERY_EVENT:

{

QueryEvent qe = (QueryEvent)be;

TableInfo tableInfo = createTableInfo(qe);

if(tableInfo == null)

break;

String databaseName = tableInfo.getDatabaseName();

String tableName = tableInfo.getTableName();

logger.trace("QUERY_EVENT:databaseName:{},tableName:{}",databaseName,tableName);

CDCEvent cdcEvent = new CDCEvent(qe,databaseName,tableName);

cdcEvent.setIsDdl(true);

cdcEvent.setSql(qe.getSql().toString());

CDCEventManager.queue.addLast(cdcEvent);

logger.info("cdcEvent:{}",cdcEvent);

break;

}

case MySQLConstants.XID_EVENT:{

XidEvent xe = (XidEvent)be;

logger.trace("XID_EVENT: xid:{}",xe.getXid());

break;

}

default:

{

logger.trace("DEFAULT:{}",eventType);

break;

}

}

}

/**

* ROW_EVENT中是沒有Column信息的,需要通過MysqlConnection(下面會講到)的方式讀取列名信息,

* 然后跟取回的List進行映射。

*

* @param cols

* @param databaseName

* @param tableName

* @return

*/

private Map getMap(List cols, String databaseName, String tableName){

Map map = new HashMap<>();

if(cols == null || cols.size()==0){

return null;

}

String fullName = databaseName+"."+tableName;

List columnInfoList = TableInfoKeeper.getColumns(fullName);

if(columnInfoList == null)

return null;

if(columnInfoList.size() != cols.size()){

TableInfoKeeper.refreshColumnsMap();

if(columnInfoList.size() != cols.size())

{

logger.warn("columnInfoList.size is not equal to cols.");

return null;

}

}

for(int i=0;i

if(cols.get(i).getValue()==null)

map.put(columnInfoList.get(i).getName(),"");

else

map.put(columnInfoList.get(i).getName(), cols.get(i).toString());

}

return map;

}

/**

* 從sql中提取Table信息,因為QUERY_EVENT是對應DATABASE這一級別的,不像ROW_EVENT是對應TABLE這一級別的,

* 所以需要通過從sql中提取TABLE信息,封裝到TableInfo對象中

*

* @param qe

* @return

*/

private TableInfo createTableInfo(QueryEvent qe){

String sql = qe.getSql().toString().toLowerCase();

TableInfo ti = new TableInfo();

String databaseName = qe.getDatabaseName().toString();

String tableName = null;

if(checkFlag(sql,"table")){

tableName = getTableName(sql,"table");

} else if(checkFlag(sql,"truncate")){

tableName = getTableName(sql,"truncate");

} else{

logger.warn("can not find table name from sql:{}",sql);

return null;

}

ti.setDatabaseName(databaseName);

ti.setTableName(tableName);

ti.setFullName(databaseName+"."+tableName);

return ti;

}

private boolean checkFlag(String sql, String flag){

String[] ss = sql.split(" ");

for(String s:ss){

if(s.equals(flag)){

return true;

}

}

return false;

}

private String getTableName(String sql, String flag){

String[] ss = sql.split("\\.");

String tName = null;

if (ss.length > 1) {

String[] strs = ss[1].split(" ");

tName = strs[0];

} else {

String[] strs = sql.split(" ");

boolean start = false;

for (String s : strs) {

if (s.indexOf(flag) >= 0) {

start = true;

continue;

}

if (start && !s.isEmpty()) {

tName = s;

break;

}

}

}

tName.replaceAll("`", "").replaceAll(";", "");

//del "("[create table person(....]

int index = tName.indexOf('(');

if(index>0){

tName = tName.substring(0, index);

}

return tName;

}

}

上面所涉及到的TableInfo .java如下:

package or.model;

public class TableInfo {

private String databaseName;

private String tableName;

private String fullName;

// 省略Getter和Setter

@Override

public boolean equals(Object o){

if(this == o)

return true;

if(o == null || this.getClass()!=o.getClass())

return false;

TableInfo tableInfo = (TableInfo)o;

if(!this.databaseName.equals(tableInfo.getDatabaseName()))

return false;

if(!this.tableName.equals(tableInfo.getTableName()))

return false;

if(!this.fullName.equals(tableInfo.getFullName()))

return false;

return true;

}

@Override

public int hashCode(){

int result = this.tableName.hashCode();

result = 31*result+this.databaseName.hashCode();

result = 31*result+this.fullName.hashCode();

return result;

}

}

接著需要有個地方保存從TABLE_MAP_EVENT中提取到的信息,TableInfoKeeper .java

package or.keeper;

import java.util.List;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

import or.MysqlConnection;

import or.model.ColumnInfo;

import or.model.TableInfo;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.google.code.or.binlog.impl.event.TableMapEvent;

public class TableInfoKeeper {

private static final Logger logger = LoggerFactory.getLogger(TableInfoKeeper.class);

private static Map tabledIdMap = new ConcurrentHashMap<>();

private static Map> columnsMap = new ConcurrentHashMap<>();

static{

columnsMap = MysqlConnection.getColumns();

}

public static void saveTableIdMap(TableMapEvent tme){

long tableId = tme.getTableId();

tabledIdMap.remove(tableId);

TableInfo table = new TableInfo();

table.setDatabaseName(tme.getDatabaseName().toString());

table.setTableName(tme.getTableName().toString());

table.setFullName(tme.getDatabaseName()+"."+tme.getTableName());

tabledIdMap.put(tableId, table);

}

public static synchronized void refreshColumnsMap(){

Map> map = MysqlConnection.getColumns();

if(map.size()>0){

// logger.warn("refresh and clear cols.");

columnsMap = map;

// logger.warn("refresh and switch cols:{}",map);

}

else

{

logger.error("refresh columnsMap error.");

}

}

public static TableInfo getTableInfo(long tableId){

return tabledIdMap.get(tableId);

}

public static List getColumns(String fullName){

return columnsMap.get(fullName);

}

}

正如上面InstanceListener中提到的,有些信息需要直接從MySQL中讀取,比如數據庫表的列信息,相關的類MysqlConnection如下:

package or;

import java.sql.Connection;

import java.sql.DatabaseMetaData;

import java.sql.DriverManager;

import java.sql.ResultSet;

import java.sql.SQLException;

import java.sql.Statement;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import or.model.BinlogInfo;

import or.model.BinlogMasterStatus;

import or.model.ColumnInfo;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

public class MysqlConnection {

private static final Logger logger = LoggerFactory.getLogger(MysqlConnection.class);

private static Connection conn;

private static String host;

private static int port;

private static String user;

private static String password;

public static void setConnection(String hostArg, int portArg, String userArg, String passwordArg){

try {

if(conn == null || conn.isClosed()){

Class.forName("com.mysql.jdbc.Driver");

host = hostArg;

port = portArg;

user = userArg;

password = passwordArg;

conn = DriverManager.getConnection("jdbc:mysql://"+host+":"+port+"/",user,password);

logger.info("connected to mysql:{} : {}",user,password);

}

} catch (ClassNotFoundException e) {

logger.error(e.getMessage(),e);

} catch (SQLException e) {

logger.error(e.getMessage(),e);

}

}

public static Connection getConnection(){

try {

if(conn == null || conn.isClosed()){

setConnection(host,port,user,password);

}

} catch (SQLException e) {

logger.error(e.getMessage(),e);

}

return conn;

}

/**

* 獲取Column信息

*

*@return

*/

public static Map> getColumns(){

Map> cols = new HashMap<>();

Connection conn = getConnection();

try {

DatabaseMetaData metaData = conn.getMetaData();

ResultSet r = metaData.getCatalogs();

String tableType[] = {"TABLE"};

while(r.next()){

String databaseName = r.getString("TABLE_CAT");

ResultSet result = metaData.getTables(databaseName, null, null, tableType);

while(result.next()){

String tableName = result.getString("TABLE_NAME");

// System.out.println(result.getInt("TABLE_ID"));

String key = databaseName +"."+tableName;

ResultSet colSet = metaData.getColumns(databaseName, null, tableName, null);

cols.put(key, new ArrayList());

while(colSet.next()){

ColumnInfo columnInfo = new ColumnInfo(colSet.getString("COLUMN_NAME"),colSet.getString("TYPE_NAME"));

cols.get(key).add(columnInfo);

}

}

}

} catch (SQLException e) {

logger.error(e.getMessage(),e);

}

return cols;

}

/**

* 參考

* mysql> show binary logs

* +------------------+-----------+

* | Log_name | File_size |

* +------------------+-----------+

* | mysql-bin.000001 | 126 |

* | mysql-bin.000002 | 126 |

* | mysql-bin.000003 | 6819 |

* | mysql-bin.000004 | 1868 |

* +------------------+-----------+

*/

public static List getBinlogInfo(){

List binlogList = new ArrayList<>();

Connection conn = null;

Statement statement = null;

ResultSet resultSet = null;

try {

conn = getConnection();

statement = conn.createStatement();

resultSet = statement.executeQuery("show binary logs");

while(resultSet.next()){

BinlogInfo binlogInfo = new BinlogInfo(resultSet.getString("Log_name"),resultSet.getLong("File_size"));

binlogList.add(binlogInfo);

}

} catch (Exception e) {

logger.error(e.getMessage(),e);

} finally{

try {

if(resultSet != null)

resultSet.close();

if(statement != null)

statement.close();

if(conn != null)

conn.close();

} catch (SQLException e) {

logger.error(e.getMessage(),e);

}

}

return binlogList;

}

/**

* 參考:

* mysql> show master status;

* +------------------+----------+--------------+------------------+

* | File | Position | Binlog_Do_DB | Binlog_Ignore_DB |

* +------------------+----------+--------------+------------------+

* | mysql-bin.000004 | 1868 | | |

* +------------------+----------+--------------+------------------+

*@return

*/

public static BinlogMasterStatus getBinlogMasterStatus(){

BinlogMasterStatus binlogMasterStatus = new BinlogMasterStatus();

Connection conn = null;

Statement statement = null;

ResultSet resultSet = null;

try {

conn = getConnection();

statement = conn.createStatement();

resultSet = statement.executeQuery("show master status");

while(resultSet.next()){

binlogMasterStatus.setBinlogName(resultSet.getString("File"));

binlogMasterStatus.setPosition(resultSet.getLong("Position"));

}

} catch (Exception e) {

logger.error(e.getMessage(),e);

} finally{

try {

if(resultSet != null)

resultSet.close();

if(statement != null)

statement.close();

if(conn != null)

conn.close();

} catch (SQLException e) {

logger.error(e.getMessage(),e);

}

}

return binlogMasterStatus;

}

/**

* 獲取open-replicator所連接的mysql服務器的serverid信息

*@return

*/

public static int getServerId(){

int serverId=6789;

Connection conn = null;

Statement statement = null;

ResultSet resultSet = null;

try {

conn = getConnection();

statement = conn.createStatement();

resultSet = statement.executeQuery("show variables like 'server_id'");

while(resultSet.next()){

serverId = resultSet.getInt("Value");

}

} catch (Exception e) {

logger.error(e.getMessage(),e);

} finally{

try {

if(resultSet != null)

resultSet.close();

if(statement != null)

statement.close();

if(conn != null)

conn.close();

} catch (SQLException e) {

logger.error(e.getMessage(),e);

}

}

return serverId;

}

}

上面代碼設計的附加類(BinlogInfo.java; BinlogMasterStatus.java; ColumnInfo.java)

package or.model;

public class BinlogInfo {

private String binlogName;

private Long fileSize;

// 省略Getter和Setter

}

package or.model;

public class BinlogMasterStatus {

private String binlogName;

private long position;

// 省略Getter和Setter

}

package or.model;

public class ColumnInfo {

private String name;

private String type;

// 省略Getter和Setter

}

最后還要有個地方存儲解析之后的事件信息,這里簡要設計下,采用一個ConcurrentLinkedDeque好了(CDCEventManager.java)

package or.manager;

import java.util.concurrent.ConcurrentLinkedDeque;

import or.CDCEvent;

public class CDCEventManager {

public static final ConcurrentLinkedDeque queue = new ConcurrentLinkedDeque<>();

}

所有的準備工作都完成了,下面可以解析binlog日志了:

package or.test;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import or.CDCEvent;

import or.InstanceListener;

import or.MysqlConnection;

import or.OpenReplicatorPlus;

import or.manager.CDCEventManager;

import or.model.BinlogMasterStatus;

import com.google.code.or.OpenReplicator;

import com.google.gson.Gson;

import com.google.gson.GsonBuilder;

import com.google.gson.JsonElement;

import com.google.gson.JsonParser;

public class OpenReplicatorTest {

private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorTest.class);

private static final String host = "10.198.197.60";

private static final int port = 3306;

private static final String user = "****";

private static final String password = "****";

public static void main(String[] args){

OpenReplicator or = new OpenReplicator ();

or.setUser(user);

or.setPassword(password);

or.setHost(host);

or.setPort(port);

MysqlConnection.setConnection(host, port, user, password);

// or.setServerId(MysqlConnection.getServerId());

//配置里的serverId是open-replicator(作為一個slave)的id,不是master的serverId

BinlogMasterStatus bms = MysqlConnection.getBinlogMasterStatus();

or.setBinlogFileName(bms.getBinlogName());

// or.setBinlogFileName("mysql-bin.000004");

or.setBinlogPosition(4);

or.setBinlogEventListener(new InstanceListener());

try {

or.start();

} catch (Exception e) {

logger.error(e.getMessage(),e);

}

Thread thread = new Thread(new PrintCDCEvent());

thread.start();

}

public static class PrintCDCEvent implements Runnable{

@Override

public void run() {

while(true){

if(CDCEventManager.queue.isEmpty() == false)

{

CDCEvent ce = CDCEventManager.queue.pollFirst();

Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create();

String prettyStr1 = gson.toJson(ce);

System.out.println(prettyStr1);

}

else{

try {

TimeUnit.SECONDS.sleep(1);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

}

}

時間運行舊了會遇到這樣一個問題:

16-10-21 10:41:49.365 ERROR[binlog-parser-1 AbstractBinlogParser.run:247] failed to parse binlog

java.io.EOFException: null

at com.google.code.or.io.util.ActiveBufferedInputStream.read(ActiveBufferedInputStream.java:169) ~[open-replicator-1.0.7.jar:na]

at com.google.code.or.io.impl.XInputStreamImpl.doFill(XInputStreamImpl.java:236) ~[open-replicator-1.0.7.jar:na]

at com.google.code.or.io.impl.XInputStreamImpl.read(XInputStreamImpl.java:213) ~[open-replicator-1.0.7.jar:na]

at com.google.code.or.io.impl.XInputStreamImpl.readInt(XInputStreamImpl.java:141) ~[open-replicator-1.0.7.jar:na]

at com.google.code.or.io.impl.XInputStreamImpl.readInt(XInputStreamImpl.java:61) ~[open-replicator-1.0.7.jar:na]

at com.google.code.or.binlog.impl.ReplicationBasedBinlogParser.doParse(ReplicationBasedBinlogParser.java:91) ~[open-replicator-1.0.7.jar:na]

at com.google.code.or.binlog.impl.AbstractBinlogParser$Task.run(AbstractBinlogParser.java:244) ~[open-replicator-1.0.7.jar:na]

at java.lang.Thread.run(Unknown Source) [na:1.7.0_80]

16-10-21 10:41:49.371 INFO [binlog-parser-1 TransportImpl.disconnect:121] disconnected from 10.198.197.60:3306

初步解決方案(extends OpenReplicator然后添加重試機制):

package or;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.google.code.or.OpenReplicator;

public class OpenReplicatorPlus extends OpenReplicator{

private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorPlus.class);

private volatile boolean autoRestart = true;

@Override

public void stopQuietly(long timeout, TimeUnit unit){

super.stopQuietly(timeout, unit);

if(autoRestart){

try {

TimeUnit.SECONDS.sleep(10);

logger.error("Restart OpenReplicator");

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

最后只需將OpenReplicatorTest.java中的OpenReplicator or = new OpenReplicator ();改為OpenReplicator or = new OpenReplicatorPlus ();即可。

大功告成~~

總結

以上是生活随笔為你收集整理的opentrace在mysql中使用_采用OpenReplicator解析MySQL binlog的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。