日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

netty实现mysql协议_基于Netty模拟解析Binlog

發(fā)布時間:2024/9/19 数据库 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 netty实现mysql协议_基于Netty模拟解析Binlog 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

前言

最近一段時間一直再看mysql binlog相關(guān)的內(nèi)容,也整理了幾篇相關(guān)的文章,對mysql的事件以及通訊協(xié)議在理論上有了一個大概的了解,但是缺少實戰(zhàn);本文的目的就是從實戰(zhàn)出發(fā),了解binlog解析的整個過程。

解析思路

把binlog的解析過程大致分為以下幾個步驟:

1.服務(wù)器啟動首先獲取上一次解析成功的位置(實例中存儲在本地文件中);

2.和mysql服務(wù)器建立連接;

3.接受mysql發(fā)送來的binlog事件;

4.對不同的binlog事件進行解析;

5.將數(shù)據(jù)進行存儲(實例中僅在日志中打印);

6.存儲成功后,定時記錄Binaly Log位置。

關(guān)于binlog相關(guān)的配置可以參考系列文章,里面有詳解的介紹,下面對步驟進行詳細的介紹;

1.服務(wù)器啟動首先獲取上一次解析成功的位置(實例中存儲在本地文件中)

binlog的位置信息存儲在文件namePosition,有更新也同樣更新到namePosition中,部分代碼如下:

public class NamePositionStore {

private static Logger log = LoggerFactory.getLogger(NamePositionStore.class);

public static final String BINLOG_NAME = "binlogName";

public static final String BINLOG_POSITIION = "binlogPosition";

private static Map binlogMap = new HashMap();

private static String lineSeparator = (String) System.getProperties().get("line.separator");

private static String localStoreUrl = "namePosition";

static {

loadNamePosition();

}

public static synchronized Map loadNamePosition() {

binlogMap = load();

return binlogMap;

}

public static synchronized Map getNamePosition() {

return binlogMap;

}

public static synchronized void putNamePosition(String binlogName, long binlogPosition) {

binlogMap.put(BINLOG_NAME, binlogName);

binlogMap.put(BINLOG_POSITIION, binlogPosition + "");

store(binlogMap);

}

public static synchronized void putNamePosition(long binlogPosition) {

binlogMap.put(BINLOG_POSITIION, binlogPosition + "");

store(binlogMap);

}

...以下代碼省略,可參考碼云完整代碼...

}

namePosition中存儲了兩個字段分別是:binlogName和binlogPosition,這兩個字段會在客戶端請求mysql binlog的時候需要的參數(shù);

2.和mysql服務(wù)器建立連接

在文章Mysql通訊協(xié)議分析中可以看到和mysql服務(wù)器建立連接的步驟:mysql發(fā)送握手包,客戶端發(fā)送認(rèn)證包,mysql發(fā)送認(rèn)證的結(jié)果;

public class HandshakeHandler extends SimpleChannelInboundHandler {

private Logger logger = LoggerFactory.getLogger(HandshakeHandler.class);

@Override

protected void channelRead0(ChannelHandlerContext ctx, DataPackage pk) throws Exception {

logger.info("Handshake start");

if (null == pk) {

return;

}

ByteBuf msg = (ByteBuf) pk.getContent();

int protocolVersion = msg.readByte();

String serverVersion = ByteUtil.NullTerminatedString(msg);

int threadId = ByteUtil.readInt(msg, 4);

logger.info("protocolVersion = " + protocolVersion + ",serverVersion = " + serverVersion + ",threadId = "

+ threadId);

String randomNumber1 = ByteUtil.NullTerminatedString(msg);

msg.readBytes(2);

byte encode = msg.readByte();

msg.readBytes(2);

msg.readBytes(13);

String randomNumber2 = ByteUtil.NullTerminatedString(msg);

logger.info("Handshake end");

AuthenticateDataBean dataBean = new AuthenticateDataBean(encode, randomNumber1 + randomNumber2,

Constants.userName, Constants.password);

ctx.channel().writeAndFlush(new DataPackage(1, dataBean));

ctx.pipeline().remove(this);

}

}

接受mysql發(fā)送的握手包,進行相關(guān)的解析工作,其中比較重要的是兩個挑戰(zhàn)隨機數(shù),客戶端在認(rèn)證的時候需要使用隨機數(shù)對密碼加密;解析完之后客戶端發(fā)送認(rèn)證數(shù)據(jù)包(封裝在AuthenticateDataBean),具體類信息如下:

public class AuthenticateDataBean implements IDataBean {

/** 認(rèn)證需要的用戶名密碼 **/

private String userName;

private String password;

/** 編碼和挑戰(zhàn)隨機數(shù) **/

private byte encode;

private String randomNumber;

...以下代碼省略,可參考碼云完整代碼...

@Override

public byte[] toByteArray() throws Exception {

int clientPower = PowerType.CLIENT_LONG_FLAG | PowerType.CLIENT_PROTOCOL_41

| PowerType.CLIENT_SECURE_CONNECTION;

byte clientPowerBytes[] = ByteUtil.writeInt(clientPower, 4);

int maxLen = 0;

byte maxLenBytes[] = ByteUtil.writeInt(maxLen, 4);

byte encodeBytes[] = ByteUtil.writeInt(encode, 1);

byte zeroBytes[] = ByteUtil.writeInt(0, 23);

byte[] userNameBytes = (userName + "\0").getBytes();

byte[] passwordBytes = "".equals(password) ? new byte[0]

: ByteUtil.passwordCompatibleWithMySQL411(password, randomNumber);

ByteBuf byteBuf = Unpooled.buffer();

byteBuf.writeBytes(clientPowerBytes);

byteBuf.writeBytes(maxLenBytes);

byteBuf.writeBytes(encodeBytes);

byteBuf.writeBytes(zeroBytes);

byteBuf.writeBytes(userNameBytes);

byteBuf.writeByte((byte) passwordBytes.length);

byteBuf.writeBytes(passwordBytes);

return byteBuf.array();

}

}

發(fā)送的認(rèn)證包到服務(wù)器之后,客戶端會收到認(rèn)證的結(jié)果,具體處理在AuthenticateResultHandler中:

public class AuthenticateResultHandler extends SimpleChannelInboundHandler {

private Logger logger = LoggerFactory.getLogger(AuthenticateResultHandler.class);

@Override

protected void channelRead0(ChannelHandlerContext ctx, DataPackage dataPackage) throws Exception {

ByteBuf msg = (ByteBuf) dataPackage.getContent();

int mark = msg.readByte();

if (mark == 0) {

Map binlongMap = NamePositionStore.getNamePosition();

RequestBinlogDumpDataBean dataBean = new RequestBinlogDumpDataBean(Constants.serverId,

binlongMap.get(NamePositionStore.BINLOG_NAME),

Long.valueOf(binlongMap.get(NamePositionStore.BINLOG_POSITIION)));

ctx.channel().writeAndFlush(new DataPackage(0, dataBean));

logger.info("Authenticate success:" + ByteUtil.bytesToHexString(msg.array()));

} else {

logger.info("Authenticate fail:" + ByteUtil.bytesToHexString(msg.array()));

}

ctx.pipeline().remove(this);

}

}

如果認(rèn)證成功,這時候客戶端需要發(fā)送請求接受binlog的請求,這里面包含兩個重要的參數(shù)就是binlogName和binlogPosition,具體信息在RequestBinlogDumpDataBean類中,結(jié)構(gòu)類似AuthenticateDataBean,此處省略。

3.接受mysql發(fā)送來的binlog事件

服務(wù)器收到客戶端的binlog請求,這時服務(wù)器如果產(chǎn)生了binlog日志,會發(fā)送給客戶端,客戶端需要一個接受binlog事件的類:

public class BinlogEventParseHandler extends SimpleChannelInboundHandler {

private Logger logger = LoggerFactory.getLogger(BinlogEventParseHandler.class);

@Override

protected void channelRead0(ChannelHandlerContext ctx, DataPackage datePackage) throws Exception {

ByteBuf contentBuf = (ByteBuf) datePackage.getContent();

contentBuf.skipBytes(1);

EventHeader header = new EventHeader();

header.setTimestamp(ByteUtil.readInt(contentBuf, 4));

header.setTypeCode((byte) ByteUtil.readInt(contentBuf, 1));

header.setServerId(ByteUtil.readInt(contentBuf, 4));

header.setEventLen(ByteUtil.readInt(contentBuf, 4));

header.setNextPosition(ByteUtil.readInt(contentBuf, 4));

header.setFlags(ByteUtil.readInt(contentBuf, 2));

logger.info(header.toString());

IEventParser parser = EventParserFactory.getEventParser(header.getTypeCode());

if (parser == null) {

logger.error("不支持的binlog事件類型解析;typeCode = " + header.getTypeCode());

}

parser.parse(contentBuf, header);

if (header.getTypeCode() != EventType.ROTATE_EVENT

&& header.getTypeCode() != EventType.FORMAT_DESCRIPTION_EVENT) {

NamePositionStore.putNamePosition(header.getNextPosition());

}

}

}

首先解析事件頭包括:eventType,eventLen,nextPosition等信息,然后根據(jù)事件類型,調(diào)用不同的解析器進行解析;

4.對不同的binlog事件進行解析

步驟3中通過不同的事件類型,獲取對應(yīng)的解析器,這些解析器都在EventParserFactory中,下面以FormatDescriptionEventParser為例

public class FormatDescriptionEventParser implements IEventParser {

private Logger logger = LoggerFactory.getLogger(FormatDescriptionEventParser.class);

@Override

public void parse(ByteBuf msg, EventHeader eventHeader) {

long binlogVersion = ByteUtil.readInt(msg, 2);

String serverVersion = ByteUtil.readFixedLenString(msg, 50);

long timestamp = ByteUtil.readInt(msg, 4);

byte headerLength = msg.readByte();

StringBuffer eventTypeFixDataLen = new StringBuffer();

for (int i = 0; i < 27; i++) {

eventTypeFixDataLen.append(msg.readByte() + ",");

}

logger.info("binlogVersion = " + binlogVersion + ",serverVersion = " + serverVersion + ",timestamp = "

+ timestamp + ",headerLength = " + headerLength + ",eventTypeStr = " + eventTypeFixDataLen);

}

}

根據(jù)FormatDescriptionEvent的格式讀取ByteBuf里面的數(shù)據(jù)包括:binlog版本,服務(wù)器版本,時間戳,事件頭長度以及每個Event的fixed part lengths,本次實戰(zhàn)中僅僅將解析后的數(shù)據(jù)打印到日志中,沒有做其他處理。

5.將數(shù)據(jù)進行存儲(實例中僅在日志中打印)

本次使用的binlog模式是:STATEMENT,所有所有的sql語句都會發(fā)送給客戶端,對應(yīng)的事件是QueryEvent,包括創(chuàng)建表,增刪改等操作:

public class QueryEventParser implements IEventParser {

private Logger logger = LoggerFactory.getLogger(QueryEventParser.class);

private static final int QUERY_EVENT_FIX_LEN = 13;

@Override

@SuppressWarnings("unused")

public void parse(ByteBuf msg, EventHeader eventHeader) {

long threadId = ByteUtil.readInt(msg, 4);

long time = ByteUtil.readInt(msg, 4);

int dbNameLen = msg.readByte();

int errorCode = ByteUtil.readInt(msg, 2);

int variableLen = ByteUtil.readInt(msg, 2);

msg.skipBytes(variableLen);

String dbName = ByteUtil.NullTerminatedString(msg);

String sql = ByteUtil.readFixedLenString(msg, (int) (eventHeader.getEventLen() - variableLen

- EventHeader.EVENT_HEADER_LEN - QUERY_EVENT_FIX_LEN - dbName.getBytes().length - 1));

logger.info("dbName = " + dbName + ",sql = " + sql);

}

}

以上的QueryEventParser解析執(zhí)行的更新語句,記錄了數(shù)據(jù)庫名稱和相關(guān)的更新sql語句。

6.存儲成功后,定時記錄Binaly Log位置

在步驟三中的BinlogEventParseHandler類中,我們在解析玩之后,存儲了nextPosition信息到文件中,方便下次啟動讀取,同時binlog還有一個切換binlog文件的事件,同樣也需要記錄;

public class RotateEventParser implements IEventParser {

private Logger logger = LoggerFactory.getLogger(RotateEventParser.class);

@Override

public void parse(ByteBuf msg, EventHeader eventHeader) {

long binlogPosition = ByteUtil.readLong(msg, 8);

int variablePartLen = (int) (eventHeader.getEventLen() - EventHeader.EVENT_HEADER_LEN - 8);

byte variablePart[] = new byte[variablePartLen];

msg.readBytes(variablePart);

String binlogName = new String(variablePart);

logger.info("binlogPosition = " + binlogPosition + ",binlogName = " + binlogName);

NamePositionStore.putNamePosition(binlogName, binlogPosition);

}

}

對應(yīng)的事件是RotateEvent,因為切換成新的binlongName,所有需要同時記錄binlongName和binlogPosition。

總結(jié)

本文旨在讓大家更加了解binlog同步的大致過程,所以本文提供的項目沒有經(jīng)過大量的測試,僅供大家學(xué)習(xí)使用;本項目中參考了一些優(yōu)秀的開源軟件:mysql-binlog-connector-java和MySQL-Binlog

總結(jié)

以上是生活随笔為你收集整理的netty实现mysql协议_基于Netty模拟解析Binlog的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。