日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

mysql 透明代理_透明代理MySQL_基于zbus的MySQL透明代理(100行)-云栖社区

發(fā)布時間:2025/3/19 数据库 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 mysql 透明代理_透明代理MySQL_基于zbus的MySQL透明代理(100行)-云栖社区 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

我們上次講到zbus網(wǎng)絡(luò)通訊的核心API:

Dispatcher -- 負責(zé)-NIO網(wǎng)絡(luò)事件Selector引擎的管理,對Selector引擎負載均衡

IoAdaptor -- 網(wǎng)絡(luò)事件的處理,服務(wù)器與客戶端共用,負責(zé)讀寫,消息分包組包等

Session -- 代表網(wǎng)絡(luò)鏈接,可以讀寫消息

實際的應(yīng)用,我們幾乎只需要做IoAdaptor的個性化實現(xiàn)就能完成高效的網(wǎng)絡(luò)通訊服務(wù),今天我們將舉例說明如何個性化這個IoAdaptor。

我們今天要完成的目標是:實現(xiàn)MySQL服務(wù)器的透明代理。效果是,你訪問代理服務(wù)器跟訪問目標MySQL無差異。

我們在測試環(huán)境10.17.2.30:3306 這臺機器上提供了MySql,在我們本地機器上跑起來我們今天基于zbus.NET實現(xiàn)的一個代理程序,就能達到下面的效果。

完成大概不到100 行的代碼, Cool?Let’s roll!

首先,我們思考透明TCP代理到底在干啥,透明的TCP代理的業(yè)務(wù)邏輯其實非常簡單,可以描述為,將來自代理上游(發(fā)起請求到代理)的數(shù)據(jù)轉(zhuǎn)發(fā)到目標TCP服務(wù)器,把目標服務(wù)器回來的數(shù)據(jù)原路返回代理上游客戶端。 注意這個原路,如何做到原路返回成為關(guān)鍵點。這個示例其實跟MySQL沒有任何關(guān)系,原則上任何TCP層面的服務(wù)都應(yīng)該適配。

基于zbus.NET怎么來將上面的邏輯在體現(xiàn)出來,也就是如何個性化IoAdaptor?直觀的講,我們要處理的幾個事件應(yīng)該包括:1)從上游客戶端發(fā)起的鏈接請求--代理服務(wù)器的Accept事件,2)代理服務(wù)器連接目標服務(wù)器的Connect事件,3)上下游的數(shù)據(jù)事件onMessage。

zbus.NET的IoAdaptor提供的個性化事件如下

基本包括一個鏈接(客戶端或者服務(wù)端)的生命周期,與消息的編解碼。

我們的代理IoAdaptor就是逐一個性化處理。

第一步,編解碼: 透明代理對消息內(nèi)容不做理解,所以不需要編解碼。

// 透傳不需要編解碼,簡單返回ByteBuffer數(shù)據(jù)

public IoBuffer encode(Object msg) {

if (msg instanceof IoBuffer) {

IoBuffer buff = (IoBuffer) msg;

return buff;

} else {

throw new RuntimeException("Message Not Support");

}

}

// 透傳不需要編解碼,簡單返回ByteBuffer數(shù)據(jù)

public Object decode(IoBuffer buff) {

if (buff.remaining() > 0) {

byte[] data = new byte[buff.remaining()];

buff.readBytes(data);

return IoBuffer.wrap(data);

} else {

return null;

}

}

第二步,代理服務(wù)接入:

@Override

protected void onSessionAccepted(Session sess) throws IOException {

Session target = null;

Dispatcher dispatcher = sess.getDispatcher();

try {

target = dispatcher.createClientSession(targetAddress, this);

} catch (Exception e) {

sess.asyncClose();

return;

}

sess.chain = target;

target.chain = sess;

dispatcher.registerSession(SelectionKey.OP_CONNECT, target);

}

這里的邏輯思路是,代理服務(wù)器每接受到一個請求--通過onSessionAccepted表達,我們將同時創(chuàng)建一個到目標服務(wù)器的鏈接,今天的例子是目標MySQL服務(wù)器,注意上面的處理中把創(chuàng)建目標服務(wù)器Session過程與真正鏈接到目標服務(wù)分開(Dispatcher也提供合并二者的工具方法),是為了能在沒有發(fā)生鏈接之前綁定上好上下游關(guān)系,通過Session的chain變量來表達,也就是當前Session的關(guān)聯(lián)Session,關(guān)聯(lián)好之后啟動感興趣Connect事件,邏輯處理完畢。

第三步,鏈接成功事件(第二步中需要鏈接到目標服務(wù)器)

@Override

public void onSessionConnected(Session sess) throws IOException {

Session chain = sess.chain;

if(chain == null){

sess.asyncClose();

return;

}

if(sess.isActive() && chain.isActive()){

sess.register(SelectionKey.OP_READ);

chain.register(SelectionKey.OP_READ);

}

}

這里的一個核心是當上下游都處于鏈接正常態(tài),上下游Session都啟動感興趣消息讀事件(寫事件是在讀取處理中自動觸發(fā)),為什么在這里做的原因是一定要等上下游都正常態(tài)后才啟動雙方消息處理,不然會出現(xiàn)字節(jié)丟失。

第四步,處理上下游數(shù)據(jù)事件

@Override

protected void onMessage(Object msg, Session sess) throws IOException {

Session chain = sess.chain;

if(chain == null){

sess.asyncClose();

return;

}

chain.write(msg);

}

是不是非常簡單,類似pipeline,從一端的數(shù)據(jù)寫到另外一端。

原則上面4步結(jié)束,整個透明代理就完成了,但是為了處理鏈接異常清理,我們增加了Session清理處理,如下

@Override

public void onSessionToDestroy(Session sess) throws IOException {

try {

sess.close();

} catch (IOException e) { //ignore

}

if (sess.chain == null) return;

try {

sess.chain.close();

sess.chain.chain = null;

sess.chain = null;

} catch (IOException e) {

}

}

工作就是解決上下游鏈接清理鏈接。

至此為止我們的IoAdaptor個性化就完成了,是不是非常簡單,現(xiàn)在我們要跑起來測試了,下面的代碼就是上一次講到重復(fù)的設(shè)置,沒有新意。

public static void main(String[] args) throws Exception {

Dispatcher dispatcher = new Dispatcher();

IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306");

final Server server = new Server(dispatcher, ioAdaptor, 3306);

server.start();

}

騷年,包括渣渣import和少許注釋加起來折騰了不到100行,該跑一跑了,還是那句話,不是HelloWorld,你可以規(guī)模壓力測??纯茨闶欠裨诒镜卮沓鰜砹四愕哪繕朔?wù)MySQL,gl,hf, gogogo.

完整代碼可運行代碼如下,也可直接到zbus示例代碼庫中找到

package org.zbus.net;

import java.io.IOException;

import java.nio.channels.SelectionKey;

import org.zbus.net.core.Dispatcher;

import org.zbus.net.core.IoAdaptor;

import org.zbus.net.core.IoBuffer;

import org.zbus.net.core.Session;

public class TcpProxyAdaptor extends IoAdaptor {

private String targetAddress;

public TcpProxyAdaptor(String targetAddress) {

this.targetAddress = targetAddress;

}

// 透傳不需要編解碼,簡單返回ByteBuffer數(shù)據(jù)

public IoBuffer encode(Object msg) {

if (msg instanceof IoBuffer) {

IoBuffer buff = (IoBuffer) msg;

return buff;

} else {

throw new RuntimeException("Message Not Support");

}

}

// 透傳不需要編解碼,簡單返回ByteBuffer數(shù)據(jù)

public Object decode(IoBuffer buff) {

if (buff.remaining() > 0) {

byte[] data = new byte[buff.remaining()];

buff.readBytes(data);

return IoBuffer.wrap(data);

} else {

return null;

}

}

@Override

protected void onSessionAccepted(Session sess) throws IOException {

Session target = null;

Dispatcher dispatcher = sess.getDispatcher();

try {

target = dispatcher.createClientSession(targetAddress, this);

} catch (Exception e) {

sess.asyncClose();

return;

}

sess.chain = target;

target.chain = sess;

dispatcher.registerSession(SelectionKey.OP_CONNECT, target);

}

@Override

public void onSessionConnected(Session sess) throws IOException {

Session chain = sess.chain;

if(chain == null){

sess.asyncClose();

return;

}

if(sess.isActive() && chain.isActive()){

sess.register(SelectionKey.OP_READ);

chain.register(SelectionKey.OP_READ);

}

}

@Override

protected void onMessage(Object msg, Session sess) throws IOException {

Session chain = sess.chain;

if(chain == null){

sess.asyncClose();

return;

}

chain.write(msg);

}

@Override

public void onSessionToDestroy(Session sess) throws IOException {

try {

sess.close();

} catch (IOException e) { //ignore

}

if (sess.chain == null) return;

try {

sess.chain.close();

sess.chain.chain = null;

sess.chain = null;

} catch (IOException e) {

}

}

@SuppressWarnings("resource")

public static void main(String[] args) throws Exception {

Dispatcher dispatcher = new Dispatcher();

IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306");

final Server server = new Server(dispatcher, ioAdaptor, 3306);

server.setServerName("TcpProxyServer");

server.start();

}

}

總結(jié)

以上是生活随笔為你收集整理的mysql 透明代理_透明代理MySQL_基于zbus的MySQL透明代理(100行)-云栖社区的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。