基于zbus的MySQL透明代理(100行)
項(xiàng)目地址 https://git.oschina.net/rushmore/zbus
我們上次講到zbus網(wǎng)絡(luò)通訊的核心API:
Dispatcher -- 負(fù)責(zé)-NIO網(wǎng)絡(luò)事件Selector引擎的管理,對Selector引擎負(fù)載均衡
IoAdaptor -- 網(wǎng)絡(luò)事件的處理,服務(wù)器與客戶端共用,負(fù)責(zé)讀寫,消息分包組包等
Session -- 代表網(wǎng)絡(luò)鏈接,可以讀寫消息
實(shí)際的應(yīng)用,我們幾乎只需要做IoAdaptor的個性化實(shí)現(xiàn)就能完成高效的網(wǎng)絡(luò)通訊服務(wù),今天我們將舉例說明如何個性化這個IoAdaptor。
我們今天要完成的目標(biāo)是:實(shí)現(xiàn)MySQL服務(wù)器的透明代理。效果是,你訪問代理服務(wù)器跟訪問目標(biāo)MySQL無差異。
我們在測試環(huán)境10.17.2.30:3306 這臺機(jī)器上提供了MySql,在我們本地機(jī)器上跑起來我們今天基于zbus.NET實(shí)現(xiàn)的一個代理程序,就能達(dá)到下面的效果。
完成大概不到100 行的代碼, Cool?Let’s roll!
首先,我們思考透明TCP代理到底在干啥,透明的TCP代理的業(yè)務(wù)邏輯其實(shí)非常簡單,可以描述為,將來自代理上游(發(fā)起請求到代理)的數(shù)據(jù)轉(zhuǎn)發(fā)到目標(biāo)TCP服務(wù)器,把目標(biāo)服務(wù)器回來的數(shù)據(jù)原路返回代理上游客戶端。 注意這個原路,如何做到原路返回成為關(guān)鍵點(diǎn)。這個示例其實(shí)跟MySQL沒有任何關(guān)系,原則上任何TCP層面的服務(wù)都應(yīng)該適配。基于zbus.NET怎么來將上面的邏輯在體現(xiàn)出來,也就是如何個性化IoAdaptor?直觀的講,我們要處理的幾個事件應(yīng)該包括:1)從上游客戶端發(fā)起的鏈接請求--代理服務(wù)器的Accept事件,2)代理服務(wù)器連接目標(biāo)服務(wù)器的Connect事件,3)上下游的數(shù)據(jù)事件onMessage。
zbus.NET的IoAdaptor提供的個性化事件如下
基本包括一個鏈接(客戶端或者服務(wù)端)的生命周期,與消息的編解碼。
我們的代理IoAdaptor就是逐一個性化處理。
第一步,編解碼: 透明代理對消息內(nèi)容不做理解,所以不需要編解碼。
// 透傳不需要編解碼,簡單返回ByteBuffer數(shù)據(jù)public IoBuffer encode(Object msg) {if (msg instanceof IoBuffer) {IoBuffer buff = (IoBuffer) msg;return buff;} else {throw new RuntimeException("Message Not Support");}}// 透傳不需要編解碼,簡單返回ByteBuffer數(shù)據(jù)public Object decode(IoBuffer buff) {if (buff.remaining() > 0) {byte[] data = new byte[buff.remaining()];buff.readBytes(data);return IoBuffer.wrap(data);} else {return null;}}第二步,代理服務(wù)接入:
@Overrideprotected void onSessionAccepted(Session sess) throws IOException {Session target = null;Dispatcher dispatcher = sess.getDispatcher();try {target = dispatcher.createClientSession(targetAddress, this);} catch (Exception e) {sess.asyncClose();return;}sess.chain = target;target.chain = sess;dispatcher.registerSession(SelectionKey.OP_CONNECT, target);}這里的邏輯思路是,代理服務(wù)器每接受到一個請求--通過onSessionAccepted表達(dá),我們將同時創(chuàng)建一個到目標(biāo)服務(wù)器的鏈接,今天的例子是目標(biāo)MySQL服務(wù)器,注意上面的處理中把創(chuàng)建目標(biāo)服務(wù)器Session過程與真正鏈接到目標(biāo)服務(wù)分開(Dispatcher也提供合并二者的工具方法),是為了能在沒有發(fā)生鏈接之前綁定上好上下游關(guān)系,通過Session的chain變量來表達(dá),也就是當(dāng)前Session的關(guān)聯(lián)Session,關(guān)聯(lián)好之后啟動感興趣Connect事件,邏輯處理完畢。
第三步,鏈接成功事件(第二步中需要鏈接到目標(biāo)服務(wù)器)
@Overridepublic void onSessionConnected(Session sess) throws IOException { Session chain = sess.chain;if(chain == null){ sess.asyncClose();return; } if(sess.isActive() && chain.isActive()){ sess.register(SelectionKey.OP_READ);chain.register(SelectionKey.OP_READ);}}這里的一個核心是當(dāng)上下游都處于鏈接正常態(tài),上下游Session都啟動感興趣消息讀事件(寫事件是在讀取處理中自動觸發(fā)),為什么在這里做的原因是一定要等上下游都正常態(tài)后才啟動雙方消息處理,不然會出現(xiàn)字節(jié)丟失。
第四步,處理上下游數(shù)據(jù)事件
@Overrideprotected void onMessage(Object msg, Session sess) throws IOException { Session chain = sess.chain;if(chain == null){sess.asyncClose(); return;} chain.write(msg); }是不是非常簡單,類似pipeline,從一端的數(shù)據(jù)寫到另外一端。
原則上面4步結(jié)束,整個透明代理就完成了,但是為了處理鏈接異常清理,我們增加了Session清理處理,如下
@Overridepublic void onSessionToDestroy(Session sess) throws IOException { try {sess.close();} catch (IOException e) { //ignore} if (sess.chain == null) return; try { sess.chain.close(); sess.chain.chain = null;sess.chain = null;} catch (IOException e) { }}工作就是解決上下游鏈接清理鏈接。
至此為止我們的IoAdaptor個性化就完成了,是不是非常簡單,現(xiàn)在我們要跑起來測試了,下面的代碼就是上一次講到重復(fù)的設(shè)置,沒有新意。
public static void main(String[] args) throws Exception { Dispatcher dispatcher = new Dispatcher(); IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306"); final Server server = new Server(dispatcher, ioAdaptor, 3306); server.start();}騷年,包括渣渣import和少許注釋加起來折騰了不到100行,該跑一跑了,還是那句話,不是HelloWorld,你可以規(guī)模壓力測。看看你是否在本地代理出來了你的目標(biāo)服務(wù)MySQL,gl,hf, gogogo.
完整代碼可運(yùn)行代碼如下,也可直接到zbus示例代碼庫中找到
https://git.oschina.net/rushmore/zbus/blob/master/src/test/java/org/zbus/net/TcpProxyAdaptor.java?dir=0&filepath=src%2Ftest%2Fjava%2Forg%2Fzbus%2Fnet%2FTcpProxyAdaptor.java&oid=08abff381d93519485e1c0ee2c35f1d4f8d1814c&sha=a29272ed99a8f21ec19a14b403ebee53a385e9a4
package org.zbus.net; import java.io.IOException; import java.nio.channels.SelectionKey; import org.zbus.net.core.Dispatcher; import org.zbus.net.core.IoAdaptor; import org.zbus.net.core.IoBuffer; import org.zbus.net.core.Session; public class TcpProxyAdaptor extends IoAdaptor {private String targetAddress;public TcpProxyAdaptor(String targetAddress) {this.targetAddress = targetAddress;}// 透傳不需要編解碼,簡單返回ByteBuffer數(shù)據(jù)public IoBuffer encode(Object msg) {if (msg instanceof IoBuffer) {IoBuffer buff = (IoBuffer) msg;return buff;} else {throw new RuntimeException("Message Not Support");}}// 透傳不需要編解碼,簡單返回ByteBuffer數(shù)據(jù)public Object decode(IoBuffer buff) {if (buff.remaining() > 0) {byte[] data = new byte[buff.remaining()];buff.readBytes(data);return IoBuffer.wrap(data);} else {return null;}}@Overrideprotected void onSessionAccepted(Session sess) throws IOException {Session target = null;Dispatcher dispatcher = sess.getDispatcher();try {target = dispatcher.createClientSession(targetAddress, this);} catch (Exception e) {sess.asyncClose();return;}sess.chain = target;target.chain = sess;dispatcher.registerSession(SelectionKey.OP_CONNECT, target);}@Overridepublic void onSessionConnected(Session sess) throws IOException { Session chain = sess.chain;if(chain == null){ sess.asyncClose();return; } if(sess.isActive() && chain.isActive()){ sess.register(SelectionKey.OP_READ);chain.register(SelectionKey.OP_READ);}}@Overrideprotected void onMessage(Object msg, Session sess) throws IOException { Session chain = sess.chain;if(chain == null){sess.asyncClose(); return;} chain.write(msg); }@Overridepublic void onSessionToDestroy(Session sess) throws IOException { try {sess.close();} catch (IOException e) { //ignore} if (sess.chain == null) return; try { sess.chain.close(); sess.chain.chain = null;sess.chain = null;} catch (IOException e) { }}@SuppressWarnings("resource")public static void main(String[] args) throws Exception { Dispatcher dispatcher = new Dispatcher(); IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306"); final Server server = new Server(dispatcher, ioAdaptor, 3306);server.setServerName("TcpProxyServer");server.start();} }文章轉(zhuǎn)載自 開源中國社區(qū)[https://www.oschina.net]
總結(jié)
以上是生活随笔為你收集整理的基于zbus的MySQL透明代理(100行)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 梦到别人要抓我是什么预兆
- 下一篇: Redis 它是什么?它用来做什么?它的