mysql 透明代理_透明代理MySQL_基于zbus的MySQL透明代理(100行)-云栖社区
我們上次講到zbus網(wǎng)絡(luò)通訊的核心API:
Dispatcher -- 負責(zé)-NIO網(wǎng)絡(luò)事件Selector引擎的管理,對Selector引擎負載均衡
IoAdaptor -- 網(wǎng)絡(luò)事件的處理,服務(wù)器與客戶端共用,負責(zé)讀寫,消息分包組包等
Session -- 代表網(wǎng)絡(luò)鏈接,可以讀寫消息
實際的應(yīng)用,我們幾乎只需要做IoAdaptor的個性化實現(xiàn)就能完成高效的網(wǎng)絡(luò)通訊服務(wù),今天我們將舉例說明如何個性化這個IoAdaptor。
我們今天要完成的目標是:實現(xiàn)MySQL服務(wù)器的透明代理。效果是,你訪問代理服務(wù)器跟訪問目標MySQL無差異。
我們在測試環(huán)境10.17.2.30:3306 這臺機器上提供了MySql,在我們本地機器上跑起來我們今天基于zbus.NET實現(xiàn)的一個代理程序,就能達到下面的效果。
完成大概不到100 行的代碼, Cool?Let’s roll!
首先,我們思考透明TCP代理到底在干啥,透明的TCP代理的業(yè)務(wù)邏輯其實非常簡單,可以描述為,將來自代理上游(發(fā)起請求到代理)的數(shù)據(jù)轉(zhuǎn)發(fā)到目標TCP服務(wù)器,把目標服務(wù)器回來的數(shù)據(jù)原路返回代理上游客戶端。 注意這個原路,如何做到原路返回成為關(guān)鍵點。這個示例其實跟MySQL沒有任何關(guān)系,原則上任何TCP層面的服務(wù)都應(yīng)該適配。
基于zbus.NET怎么來將上面的邏輯在體現(xiàn)出來,也就是如何個性化IoAdaptor?直觀的講,我們要處理的幾個事件應(yīng)該包括:1)從上游客戶端發(fā)起的鏈接請求--代理服務(wù)器的Accept事件,2)代理服務(wù)器連接目標服務(wù)器的Connect事件,3)上下游的數(shù)據(jù)事件onMessage。
zbus.NET的IoAdaptor提供的個性化事件如下
基本包括一個鏈接(客戶端或者服務(wù)端)的生命周期,與消息的編解碼。
我們的代理IoAdaptor就是逐一個性化處理。
第一步,編解碼: 透明代理對消息內(nèi)容不做理解,所以不需要編解碼。
// 透傳不需要編解碼,簡單返回ByteBuffer數(shù)據(jù)
public IoBuffer encode(Object msg) {
if (msg instanceof IoBuffer) {
IoBuffer buff = (IoBuffer) msg;
return buff;
} else {
throw new RuntimeException("Message Not Support");
}
}
// 透傳不需要編解碼,簡單返回ByteBuffer數(shù)據(jù)
public Object decode(IoBuffer buff) {
if (buff.remaining() > 0) {
byte[] data = new byte[buff.remaining()];
buff.readBytes(data);
return IoBuffer.wrap(data);
} else {
return null;
}
}
第二步,代理服務(wù)接入:
@Override
protected void onSessionAccepted(Session sess) throws IOException {
Session target = null;
Dispatcher dispatcher = sess.getDispatcher();
try {
target = dispatcher.createClientSession(targetAddress, this);
} catch (Exception e) {
sess.asyncClose();
return;
}
sess.chain = target;
target.chain = sess;
dispatcher.registerSession(SelectionKey.OP_CONNECT, target);
}
這里的邏輯思路是,代理服務(wù)器每接受到一個請求--通過onSessionAccepted表達,我們將同時創(chuàng)建一個到目標服務(wù)器的鏈接,今天的例子是目標MySQL服務(wù)器,注意上面的處理中把創(chuàng)建目標服務(wù)器Session過程與真正鏈接到目標服務(wù)分開(Dispatcher也提供合并二者的工具方法),是為了能在沒有發(fā)生鏈接之前綁定上好上下游關(guān)系,通過Session的chain變量來表達,也就是當前Session的關(guān)聯(lián)Session,關(guān)聯(lián)好之后啟動感興趣Connect事件,邏輯處理完畢。
第三步,鏈接成功事件(第二步中需要鏈接到目標服務(wù)器)
@Override
public void onSessionConnected(Session sess) throws IOException {
Session chain = sess.chain;
if(chain == null){
sess.asyncClose();
return;
}
if(sess.isActive() && chain.isActive()){
sess.register(SelectionKey.OP_READ);
chain.register(SelectionKey.OP_READ);
}
}
這里的一個核心是當上下游都處于鏈接正常態(tài),上下游Session都啟動感興趣消息讀事件(寫事件是在讀取處理中自動觸發(fā)),為什么在這里做的原因是一定要等上下游都正常態(tài)后才啟動雙方消息處理,不然會出現(xiàn)字節(jié)丟失。
第四步,處理上下游數(shù)據(jù)事件
@Override
protected void onMessage(Object msg, Session sess) throws IOException {
Session chain = sess.chain;
if(chain == null){
sess.asyncClose();
return;
}
chain.write(msg);
}
是不是非常簡單,類似pipeline,從一端的數(shù)據(jù)寫到另外一端。
原則上面4步結(jié)束,整個透明代理就完成了,但是為了處理鏈接異常清理,我們增加了Session清理處理,如下
@Override
public void onSessionToDestroy(Session sess) throws IOException {
try {
sess.close();
} catch (IOException e) { //ignore
}
if (sess.chain == null) return;
try {
sess.chain.close();
sess.chain.chain = null;
sess.chain = null;
} catch (IOException e) {
}
}
工作就是解決上下游鏈接清理鏈接。
至此為止我們的IoAdaptor個性化就完成了,是不是非常簡單,現(xiàn)在我們要跑起來測試了,下面的代碼就是上一次講到重復(fù)的設(shè)置,沒有新意。
public static void main(String[] args) throws Exception {
Dispatcher dispatcher = new Dispatcher();
IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306");
final Server server = new Server(dispatcher, ioAdaptor, 3306);
server.start();
}
騷年,包括渣渣import和少許注釋加起來折騰了不到100行,該跑一跑了,還是那句話,不是HelloWorld,你可以規(guī)模壓力測??纯茨闶欠裨诒镜卮沓鰜砹四愕哪繕朔?wù)MySQL,gl,hf, gogogo.
完整代碼可運行代碼如下,也可直接到zbus示例代碼庫中找到
package org.zbus.net;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import org.zbus.net.core.Dispatcher;
import org.zbus.net.core.IoAdaptor;
import org.zbus.net.core.IoBuffer;
import org.zbus.net.core.Session;
public class TcpProxyAdaptor extends IoAdaptor {
private String targetAddress;
public TcpProxyAdaptor(String targetAddress) {
this.targetAddress = targetAddress;
}
// 透傳不需要編解碼,簡單返回ByteBuffer數(shù)據(jù)
public IoBuffer encode(Object msg) {
if (msg instanceof IoBuffer) {
IoBuffer buff = (IoBuffer) msg;
return buff;
} else {
throw new RuntimeException("Message Not Support");
}
}
// 透傳不需要編解碼,簡單返回ByteBuffer數(shù)據(jù)
public Object decode(IoBuffer buff) {
if (buff.remaining() > 0) {
byte[] data = new byte[buff.remaining()];
buff.readBytes(data);
return IoBuffer.wrap(data);
} else {
return null;
}
}
@Override
protected void onSessionAccepted(Session sess) throws IOException {
Session target = null;
Dispatcher dispatcher = sess.getDispatcher();
try {
target = dispatcher.createClientSession(targetAddress, this);
} catch (Exception e) {
sess.asyncClose();
return;
}
sess.chain = target;
target.chain = sess;
dispatcher.registerSession(SelectionKey.OP_CONNECT, target);
}
@Override
public void onSessionConnected(Session sess) throws IOException {
Session chain = sess.chain;
if(chain == null){
sess.asyncClose();
return;
}
if(sess.isActive() && chain.isActive()){
sess.register(SelectionKey.OP_READ);
chain.register(SelectionKey.OP_READ);
}
}
@Override
protected void onMessage(Object msg, Session sess) throws IOException {
Session chain = sess.chain;
if(chain == null){
sess.asyncClose();
return;
}
chain.write(msg);
}
@Override
public void onSessionToDestroy(Session sess) throws IOException {
try {
sess.close();
} catch (IOException e) { //ignore
}
if (sess.chain == null) return;
try {
sess.chain.close();
sess.chain.chain = null;
sess.chain = null;
} catch (IOException e) {
}
}
@SuppressWarnings("resource")
public static void main(String[] args) throws Exception {
Dispatcher dispatcher = new Dispatcher();
IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306");
final Server server = new Server(dispatcher, ioAdaptor, 3306);
server.setServerName("TcpProxyServer");
server.start();
}
}
總結(jié)
以上是生活随笔為你收集整理的mysql 透明代理_透明代理MySQL_基于zbus的MySQL透明代理(100行)-云栖社区的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linux解压mysql文件命令行_li
- 下一篇: mysql安装教程8.0.21安装_my