日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 运维知识 > 数据库 >内容正文

数据库

基于zbus的MySQL透明代理(100行)

發(fā)布時(shí)間:2023/11/29 数据库 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 基于zbus的MySQL透明代理(100行) 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

項(xiàng)目地址 https://git.oschina.net/rushmore/zbus

我們上次講到zbus網(wǎng)絡(luò)通訊的核心API:

Dispatcher -- 負(fù)責(zé)-NIO網(wǎng)絡(luò)事件Selector引擎的管理,對(duì)Selector引擎負(fù)載均衡

IoAdaptor -- 網(wǎng)絡(luò)事件的處理,服務(wù)器與客戶端共用,負(fù)責(zé)讀寫,消息分包組包等

Session -- 代表網(wǎng)絡(luò)鏈接,可以讀寫消息

實(shí)際的應(yīng)用,我們幾乎只需要做IoAdaptor的個(gè)性化實(shí)現(xiàn)就能完成高效的網(wǎng)絡(luò)通訊服務(wù),今天我們將舉例說(shuō)明如何個(gè)性化這個(gè)IoAdaptor。

我們今天要完成的目標(biāo)是:實(shí)現(xiàn)MySQL服務(wù)器的透明代理。效果是,你訪問(wèn)代理服務(wù)器跟訪問(wèn)目標(biāo)MySQL無(wú)差異。

我們?cè)跍y(cè)試環(huán)境10.17.2.30:3306 這臺(tái)機(jī)器上提供了MySql,在我們本地機(jī)器上跑起來(lái)我們今天基于zbus.NET實(shí)現(xiàn)的一個(gè)代理程序,就能達(dá)到下面的效果。


完成大概不到100 行的代碼, Cool?Let’s roll!

首先,我們思考透明TCP代理到底在干啥,透明的TCP代理的業(yè)務(wù)邏輯其實(shí)非常簡(jiǎn)單,可以描述為,將來(lái)自代理上游(發(fā)起請(qǐng)求到代理)的數(shù)據(jù)轉(zhuǎn)發(fā)到目標(biāo)TCP服務(wù)器,把目標(biāo)服務(wù)器回來(lái)的數(shù)據(jù)原路返回代理上游客戶端。 注意這個(gè)原路,如何做到原路返回成為關(guān)鍵點(diǎn)。這個(gè)示例其實(shí)跟MySQL沒有任何關(guān)系,原則上任何TCP層面的服務(wù)都應(yīng)該適配。

基于zbus.NET怎么來(lái)將上面的邏輯在體現(xiàn)出來(lái),也就是如何個(gè)性化IoAdaptor?直觀的講,我們要處理的幾個(gè)事件應(yīng)該包括:1)從上游客戶端發(fā)起的鏈接請(qǐng)求--代理服務(wù)器的Accept事件,2)代理服務(wù)器連接目標(biāo)服務(wù)器的Connect事件,3)上下游的數(shù)據(jù)事件onMessage。

zbus.NET的IoAdaptor提供的個(gè)性化事件如下

基本包括一個(gè)鏈接(客戶端或者服務(wù)端)的生命周期,與消息的編解碼。

我們的代理IoAdaptor就是逐一個(gè)性化處理。

第一步,編解碼: 透明代理對(duì)消息內(nèi)容不做理解,所以不需要編解碼。

// 透?jìng)鞑恍枰幗獯a,簡(jiǎn)單返回ByteBuffer數(shù)據(jù)public IoBuffer encode(Object msg) {if (msg instanceof IoBuffer) {IoBuffer buff = (IoBuffer) msg;return buff;} else {throw new RuntimeException("Message Not Support");}}// 透?jìng)鞑恍枰幗獯a,簡(jiǎn)單返回ByteBuffer數(shù)據(jù)public Object decode(IoBuffer buff) {if (buff.remaining() > 0) {byte[] data = new byte[buff.remaining()];buff.readBytes(data);return IoBuffer.wrap(data);} else {return null;}}

第二步,代理服務(wù)接入:

@Overrideprotected void onSessionAccepted(Session sess) throws IOException {Session target = null;Dispatcher dispatcher = sess.getDispatcher();try {target = dispatcher.createClientSession(targetAddress, this);} catch (Exception e) {sess.asyncClose();return;}sess.chain = target;target.chain = sess;dispatcher.registerSession(SelectionKey.OP_CONNECT, target);}

這里的邏輯思路是,代理服務(wù)器每接受到一個(gè)請(qǐng)求--通過(guò)onSessionAccepted表達(dá),我們將同時(shí)創(chuàng)建一個(gè)到目標(biāo)服務(wù)器的鏈接,今天的例子是目標(biāo)MySQL服務(wù)器,注意上面的處理中把創(chuàng)建目標(biāo)服務(wù)器Session過(guò)程與真正鏈接到目標(biāo)服務(wù)分開(Dispatcher也提供合并二者的工具方法),是為了能在沒有發(fā)生鏈接之前綁定上好上下游關(guān)系,通過(guò)Session的chain變量來(lái)表達(dá),也就是當(dāng)前Session的關(guān)聯(lián)Session,關(guān)聯(lián)好之后啟動(dòng)感興趣Connect事件,邏輯處理完畢。

第三步,鏈接成功事件(第二步中需要鏈接到目標(biāo)服務(wù)器)

@Overridepublic void onSessionConnected(Session sess) throws IOException { Session chain = sess.chain;if(chain == null){ sess.asyncClose();return; } if(sess.isActive() && chain.isActive()){ sess.register(SelectionKey.OP_READ);chain.register(SelectionKey.OP_READ);}}

這里的一個(gè)核心是當(dāng)上下游都處于鏈接正常態(tài),上下游Session都啟動(dòng)感興趣消息讀事件(寫事件是在讀取處理中自動(dòng)觸發(fā)),為什么在這里做的原因是一定要等上下游都正常態(tài)后才啟動(dòng)雙方消息處理,不然會(huì)出現(xiàn)字節(jié)丟失。

第四步,處理上下游數(shù)據(jù)事件

@Overrideprotected void onMessage(Object msg, Session sess) throws IOException { Session chain = sess.chain;if(chain == null){sess.asyncClose(); return;} chain.write(msg); }

是不是非常簡(jiǎn)單,類似pipeline,從一端的數(shù)據(jù)寫到另外一端。

原則上面4步結(jié)束,整個(gè)透明代理就完成了,但是為了處理鏈接異常清理,我們?cè)黾恿薙ession清理處理,如下

@Overridepublic void onSessionToDestroy(Session sess) throws IOException { try {sess.close();} catch (IOException e) { //ignore} if (sess.chain == null) return; try { sess.chain.close(); sess.chain.chain = null;sess.chain = null;} catch (IOException e) { }}

工作就是解決上下游鏈接清理鏈接。

至此為止我們的IoAdaptor個(gè)性化就完成了,是不是非常簡(jiǎn)單,現(xiàn)在我們要跑起來(lái)測(cè)試了,下面的代碼就是上一次講到重復(fù)的設(shè)置,沒有新意。

public static void main(String[] args) throws Exception { Dispatcher dispatcher = new Dispatcher(); IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306"); final Server server = new Server(dispatcher, ioAdaptor, 3306); server.start();}

騷年,包括渣渣import和少許注釋加起來(lái)折騰了不到100行,該跑一跑了,還是那句話,不是HelloWorld,你可以規(guī)模壓力測(cè)。看看你是否在本地代理出來(lái)了你的目標(biāo)服務(wù)MySQL,gl,hf, gogogo.

完整代碼可運(yùn)行代碼如下,也可直接到zbus示例代碼庫(kù)中找到

https://git.oschina.net/rushmore/zbus/blob/master/src/test/java/org/zbus/net/TcpProxyAdaptor.java?dir=0&filepath=src%2Ftest%2Fjava%2Forg%2Fzbus%2Fnet%2FTcpProxyAdaptor.java&oid=08abff381d93519485e1c0ee2c35f1d4f8d1814c&sha=a29272ed99a8f21ec19a14b403ebee53a385e9a4

package org.zbus.net; import java.io.IOException; import java.nio.channels.SelectionKey; import org.zbus.net.core.Dispatcher; import org.zbus.net.core.IoAdaptor; import org.zbus.net.core.IoBuffer; import org.zbus.net.core.Session; public class TcpProxyAdaptor extends IoAdaptor {private String targetAddress;public TcpProxyAdaptor(String targetAddress) {this.targetAddress = targetAddress;}// 透?jìng)鞑恍枰幗獯a,簡(jiǎn)單返回ByteBuffer數(shù)據(jù)public IoBuffer encode(Object msg) {if (msg instanceof IoBuffer) {IoBuffer buff = (IoBuffer) msg;return buff;} else {throw new RuntimeException("Message Not Support");}}// 透?jìng)鞑恍枰幗獯a,簡(jiǎn)單返回ByteBuffer數(shù)據(jù)public Object decode(IoBuffer buff) {if (buff.remaining() > 0) {byte[] data = new byte[buff.remaining()];buff.readBytes(data);return IoBuffer.wrap(data);} else {return null;}}@Overrideprotected void onSessionAccepted(Session sess) throws IOException {Session target = null;Dispatcher dispatcher = sess.getDispatcher();try {target = dispatcher.createClientSession(targetAddress, this);} catch (Exception e) {sess.asyncClose();return;}sess.chain = target;target.chain = sess;dispatcher.registerSession(SelectionKey.OP_CONNECT, target);}@Overridepublic void onSessionConnected(Session sess) throws IOException { Session chain = sess.chain;if(chain == null){ sess.asyncClose();return; } if(sess.isActive() && chain.isActive()){ sess.register(SelectionKey.OP_READ);chain.register(SelectionKey.OP_READ);}}@Overrideprotected void onMessage(Object msg, Session sess) throws IOException { Session chain = sess.chain;if(chain == null){sess.asyncClose(); return;} chain.write(msg); }@Overridepublic void onSessionToDestroy(Session sess) throws IOException { try {sess.close();} catch (IOException e) { //ignore} if (sess.chain == null) return; try { sess.chain.close(); sess.chain.chain = null;sess.chain = null;} catch (IOException e) { }}@SuppressWarnings("resource")public static void main(String[] args) throws Exception { Dispatcher dispatcher = new Dispatcher(); IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306"); final Server server = new Server(dispatcher, ioAdaptor, 3306);server.setServerName("TcpProxyServer");server.start();} }

文章轉(zhuǎn)載自 開源中國(guó)社區(qū)[https://www.oschina.net]

總結(jié)

以上是生活随笔為你收集整理的基于zbus的MySQL透明代理(100行)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。