日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

数据库路由中间件MyCat - 源代码篇(7)

發布時間:2025/4/16 数据库 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 数据库路由中间件MyCat - 源代码篇(7) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

此文已由作者張鎬薪授權網易云社區發布。

歡迎訪問網易云社區,了解更多網易技術產品運營經驗。

3. 連接模塊


3.4 FrontendConnection前端連接

構造方法:

public?FrontendConnection(NetworkChannel?channel)?throws?IOException?{?????super(channel);InetSocketAddress?localAddr?=?(InetSocketAddress)?channel.getLocalAddress();InetSocketAddress?remoteAddr?=?null;?????if?(channel?instanceof?SocketChannel)?{remoteAddr?=?(InetSocketAddress)?((SocketChannel)?channel).getRemoteAddress();????}?else?if?(channel?instanceof?AsynchronousSocketChannel)?{remoteAddr?=?(InetSocketAddress)?((AsynchronousSocketChannel)?channel).getRemoteAddress();}?????this.host?=?remoteAddr.getHostString();?????this.port?=?localAddr.getPort();?????this.localPort?=?remoteAddr.getPort();?????this.handler?=?new?FrontendAuthenticator(this);}

FrontendConnection是對前端連接channel的封裝,接受NetworkChannel作為參數構造。前端連接建立,需要先驗證其權限,所以,handler首先設置為FrontendAuthenticator 等到驗證成功,handler會被設置成FrontendCommandHandler。 下面來看和FrontendConnection相關的Handler: ?FrontendCommandHandler會先解析請求類型,之后調用不同的方法處理不同類型的請求。例如,FrontendQueryHandler會解析query類型的sql請求語句:

?@Overridepublic?void?handle(byte[]?data){????????if(source.getLoadDataInfileHandler()!=null&&source.getLoadDataInfileHandler().isStartLoadData()){MySQLMessage?mm?=?new?MySQLMessage(data);????????????int??packetLength?=?mm.readUB3();????????????if(packetLength+4==data.length){source.loadDataInfileData(data);}????????????return;}????????switch?(data[4]){????????????case?MySQLPacket.COM_INIT_DB:commands.doInitDB();source.initDB(data);????????????????break;????????????case?MySQLPacket.COM_QUERY:commands.doQuery();source.query(data);????????????????break;????????????case?MySQLPacket.COM_PING:commands.doPing();source.ping();????????????????break;????????????case?MySQLPacket.COM_QUIT:commands.doQuit();source.close("quit?cmd");????????????????break;????????????case?MySQLPacket.COM_PROCESS_KILL:commands.doKill();source.kill(data);????????????????break;????????????case?MySQLPacket.COM_STMT_PREPARE:commands.doStmtPrepare();source.stmtPrepare(data);????????????????break;????????????case?MySQLPacket.COM_STMT_EXECUTE:commands.doStmtExecute();source.stmtExecute(data);????????????????break;????????????case?MySQLPacket.COM_STMT_CLOSE:commands.doStmtClose();source.stmtClose(data);????????????????break;????????????case?MySQLPacket.COM_HEARTBEAT:commands.doHeartbeat();source.heartbeat(data);????????????????break;????????????default:commands.doOther();source.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR,?????????????????????????????"Unknown?command");}}

FrontendCommandHandler會調用FrontendConnection合適的方法解析處理不同的請求,例如它的initDB(byte[] data)方法:

????public?void?initDB(byte[]?data)?{MySQLMessage?mm?=?new?MySQLMessage(data);mm.position(5);String?db?=?mm.readString();????????//?檢查schema的有效性if?(db?==?null?||?!privileges.schemaExists(db))?{writeErrMessage(ErrorCode.ER_BAD_DB_ERROR,?"Unknown?database?'"?+?db?+?"'");????????????return;}????????if?(!privileges.userExists(user,?host))?{writeErrMessage(ErrorCode.ER_ACCESS_DENIED_ERROR,?"Access?denied?for?user?'"?+?user?+?"'");????????????return;}Set<String>?schemas?=?privileges.getUserSchemas(user);????????if?(schemas?==?null?||?schemas.size()?==?0?||?schemas.contains(db))?{????????????this.schema?=?db;write(writeToBuffer(OkPacket.OK,?allocate()));}?else?{String?s?=?"Access?denied?for?user?'"?+?user?+?"'?to?database?'"?+?db?+?"'";writeErrMessage(ErrorCode.ER_DBACCESS_DENIED_ERROR,?s);}}

方法調用: 通過查看可以發現,在command packet被解析出是initDB類型的請求時(其實就是用戶發送的查詢語句為“use XXX”),會調用此方法進行處理,同時,這些方法都是被RW線程執行的。 此方法從FrontedPrivilege中驗證用戶是否有權限訪問這個邏輯庫,如果有就把當前連接的邏輯庫設為用戶請求的邏輯庫。 其他方法與handler也是相似的關系,可以看出,FrontendConnection組合了多種封裝的handler來處理不同的請求的不同階段。至于各種handler,會在之后sql解析,sql路由,協議實現等模塊詳細介紹。

3.4.1 ServerConnection服務端連接

前端連接包括ServerConnection(服務端連接)和ManagerConnection(管理端連接)。前端鏈接不會直接創建,而是通過工廠創建: 工廠方法:

@Overrideprotected?FrontendConnection?getConnection(NetworkChannel?channel)?throws?IOException?{SystemConfig?sys?=?MycatServer.getInstance().getConfig().getSystem();ServerConnection?c?=?new?ServerConnection(channel);MycatServer.getInstance().getConfig().setSocketParams(c,?true);c.setPrivileges(MycatPrivileges.instance());c.setQueryHandler(new?ServerQueryHandler(c));c.setLoadDataInfileHandler(new?ServerLoadDataInfileHandler(c));????????//?c.setPrepareHandler(new?ServerPrepareHandler(c));c.setTxIsolation(sys.getTxIsolation());c.setSession2(new?NonBlockingSession(c));????????return?c;}

可以看出,每個新的ServerConnection都會綁定一個新的ServerQueryHandler負責處理sql指令,一個ServerLoadDataInfileHandler負責處理文件載入命令,一個session負責處理事務 下面是相關的類圖 ?這里的所有獨立的handler里面都是static方法,可供其他類直接調用。每個ServerConnection都會有一個NonBlockingSession來處理。 這里說下連接、會話、邏輯庫、MyCat實例的關系(與MySQL里面的連接、會話、數據庫、MySQL實例的關系不太一樣);首先每個MyCat實例都管理多個數據庫。連接是針對MyCat實例建立的,并且,MyCat的連接(AbstractConnection)是不可復用的,在close方法會關閉連接并清理使用的資源。但是緩存資源(buffer)是可以復用的。比如,在一個前端連接長時間空閑時或者出現異常時,會被清理掉。每個連接會擁有一個session來處理事務,保存會話信息。 這里,每個連接擁有一個會話。每個連接中的方法,被RW線程執行,相當于與RW線程綁定。RW線程是可以復用的,這里相當于MySQL中的連接是可以復用的(連接池)。 Session.java:

public?interface?Session?{????/***?取得源端連接*/FrontendConnection?getSource();????/***?取得當前目標端數量*/int?getTargetCount();????/***?開啟一個會話執行*/void?execute(RouteResultset?rrs,?int?type);????/***?提交一個會話執行*/void?commit();????/***?回滾一個會話執行*/void?rollback();????/***?取消一個正在執行中的會話*?*?@param?sponsor*????????????如果發起者為null,則表示由自己發起。*/void?cancel(FrontendConnection?sponsor);????/***?終止會話,必須在關閉源端連接后執行該方法。*/void?terminate();}

下面我們著重研究它的實現類NonBlockingSession: 首先,取得源端連接方法FrontendConnection getSource();,其實就是NonBlockingSession在創建時就已綁定一個連接,誰會調用這個方法取得源端鏈接呢? ?可以發現,主要有各種查詢的handler還有SQLengine會去調用。因為處理無論返回什么結果,都需要返回給源端。 int getTargetCount();取得當前目標端數量。根據目標端的數量不同會用不同的handler處理轉發SQL和合并結果。

@Overridepublic?void?execute(RouteResultset?rrs,?int?type)?{????????//?清理之前處理用的資源clearHandlesResources();????????if?(LOGGER.isDebugEnabled())?{StringBuilder?s?=?new?StringBuilder();LOGGER.debug(s.append(source).append(rrs).toString()?+?"?rrs?");}????????//?檢查路由結果是否為空RouteResultsetNode[]?nodes?=?rrs.getNodes();????????if?(nodes?==?null?||?nodes.length?==?0?||?nodes[0].getName()?==?null||?nodes[0].getName().equals(""))?{????????????//如果為空,則表名有誤,提示客戶端source.writeErrMessage(ErrorCode.ER_NO_DB_ERROR,????????????????????"No?dataNode?found?,please?check?tables?defined?in?schema:"+?source.getSchema());????????????return;}????????//如果路由結果個數為1,則為單點查詢或事務if?(nodes.length?==?1)?{????????????//使用SingleNodeHandler處理單點查詢或事務singleNodeHandler?=?new?SingleNodeHandler(rrs,?this);????????????try?{singleNodeHandler.execute();}?catch?(Exception?e)?{LOGGER.warn(new?StringBuilder().append(source).append(rrs),?e);source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA,?e.toString());}}?else?{????????????//如果路由結果>1,則為多點查詢或事務boolean?autocommit?=?source.isAutocommit();SystemConfig?sysConfig?=?MycatServer.getInstance().getConfig().getSystem();????????????//mutiNodeLimitType沒有用。。。int?mutiNodeLimitType?=?sysConfig.getMutiNodeLimitType();????????????//使用multiNodeHandler處理多點查詢或事務multiNodeHandler?=?new?MultiNodeQueryHandler(type,?rrs,?autocommit,????????????????????this);????????????try?{multiNodeHandler.execute();}?catch?(Exception?e)?{LOGGER.warn(new?StringBuilder().append(source).append(rrs),?e);source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA,?e.toString());}}}

每次一個Session執行SQL時,會先清理handler使用的資源。SingleNodeHandler與multiNodeHandler之后會講。這里的handler我們之后會在每個模塊去講,Session之后也還會提到,敬請期待


免費體驗云安全(易盾)內容安全、驗證碼等服務

更多網易技術、產品、運營經驗分享請點擊。




相關文章:
【推薦】?Android TV 開發(5)
【推薦】?分布式存儲系統可靠性系列三:設計模式

轉載于:https://www.cnblogs.com/zyfd/p/9894735.html

《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀

總結

以上是生活随笔為你收集整理的数据库路由中间件MyCat - 源代码篇(7)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。