数据库路由中间件MyCat - 源代码篇(7)
此文已由作者張鎬薪授權(quán)網(wǎng)易云社區(qū)發(fā)布。
歡迎訪問網(wǎng)易云社區(qū),了解更多網(wǎng)易技術(shù)產(chǎn)品運營經(jīng)驗。
3. 連接模塊
3.4 FrontendConnection前端連接
構(gòu)造方法:
public?FrontendConnection(NetworkChannel?channel)?throws?IOException?{?????super(channel);InetSocketAddress?localAddr?=?(InetSocketAddress)?channel.getLocalAddress();InetSocketAddress?remoteAddr?=?null;?????if?(channel?instanceof?SocketChannel)?{remoteAddr?=?(InetSocketAddress)?((SocketChannel)?channel).getRemoteAddress();????}?else?if?(channel?instanceof?AsynchronousSocketChannel)?{remoteAddr?=?(InetSocketAddress)?((AsynchronousSocketChannel)?channel).getRemoteAddress();}?????this.host?=?remoteAddr.getHostString();?????this.port?=?localAddr.getPort();?????this.localPort?=?remoteAddr.getPort();?????this.handler?=?new?FrontendAuthenticator(this);}FrontendConnection是對前端連接channel的封裝,接受NetworkChannel作為參數(shù)構(gòu)造。前端連接建立,需要先驗證其權(quán)限,所以,handler首先設(shè)置為FrontendAuthenticator 等到驗證成功,handler會被設(shè)置成FrontendCommandHandler。 下面來看和FrontendConnection相關(guān)的Handler: ?FrontendCommandHandler會先解析請求類型,之后調(diào)用不同的方法處理不同類型的請求。例如,FrontendQueryHandler會解析query類型的sql請求語句:
?@Overridepublic?void?handle(byte[]?data){????????if(source.getLoadDataInfileHandler()!=null&&source.getLoadDataInfileHandler().isStartLoadData()){MySQLMessage?mm?=?new?MySQLMessage(data);????????????int??packetLength?=?mm.readUB3();????????????if(packetLength+4==data.length){source.loadDataInfileData(data);}????????????return;}????????switch?(data[4]){????????????case?MySQLPacket.COM_INIT_DB:commands.doInitDB();source.initDB(data);????????????????break;????????????case?MySQLPacket.COM_QUERY:commands.doQuery();source.query(data);????????????????break;????????????case?MySQLPacket.COM_PING:commands.doPing();source.ping();????????????????break;????????????case?MySQLPacket.COM_QUIT:commands.doQuit();source.close("quit?cmd");????????????????break;????????????case?MySQLPacket.COM_PROCESS_KILL:commands.doKill();source.kill(data);????????????????break;????????????case?MySQLPacket.COM_STMT_PREPARE:commands.doStmtPrepare();source.stmtPrepare(data);????????????????break;????????????case?MySQLPacket.COM_STMT_EXECUTE:commands.doStmtExecute();source.stmtExecute(data);????????????????break;????????????case?MySQLPacket.COM_STMT_CLOSE:commands.doStmtClose();source.stmtClose(data);????????????????break;????????????case?MySQLPacket.COM_HEARTBEAT:commands.doHeartbeat();source.heartbeat(data);????????????????break;????????????default:commands.doOther();source.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR,?????????????????????????????"Unknown?command");}}FrontendCommandHandler會調(diào)用FrontendConnection合適的方法解析處理不同的請求,例如它的initDB(byte[] data)方法:
????public?void?initDB(byte[]?data)?{MySQLMessage?mm?=?new?MySQLMessage(data);mm.position(5);String?db?=?mm.readString();????????//?檢查schema的有效性if?(db?==?null?||?!privileges.schemaExists(db))?{writeErrMessage(ErrorCode.ER_BAD_DB_ERROR,?"Unknown?database?'"?+?db?+?"'");????????????return;}????????if?(!privileges.userExists(user,?host))?{writeErrMessage(ErrorCode.ER_ACCESS_DENIED_ERROR,?"Access?denied?for?user?'"?+?user?+?"'");????????????return;}Set<String>?schemas?=?privileges.getUserSchemas(user);????????if?(schemas?==?null?||?schemas.size()?==?0?||?schemas.contains(db))?{????????????this.schema?=?db;write(writeToBuffer(OkPacket.OK,?allocate()));}?else?{String?s?=?"Access?denied?for?user?'"?+?user?+?"'?to?database?'"?+?db?+?"'";writeErrMessage(ErrorCode.ER_DBACCESS_DENIED_ERROR,?s);}}方法調(diào)用: 通過查看可以發(fā)現(xiàn),在command packet被解析出是initDB類型的請求時(其實就是用戶發(fā)送的查詢語句為“use XXX”),會調(diào)用此方法進行處理,同時,這些方法都是被RW線程執(zhí)行的。 此方法從FrontedPrivilege中驗證用戶是否有權(quán)限訪問這個邏輯庫,如果有就把當(dāng)前連接的邏輯庫設(shè)為用戶請求的邏輯庫。 其他方法與handler也是相似的關(guān)系,可以看出,FrontendConnection組合了多種封裝的handler來處理不同的請求的不同階段。至于各種handler,會在之后sql解析,sql路由,協(xié)議實現(xiàn)等模塊詳細(xì)介紹。
3.4.1 ServerConnection服務(wù)端連接
前端連接包括ServerConnection(服務(wù)端連接)和ManagerConnection(管理端連接)。前端鏈接不會直接創(chuàng)建,而是通過工廠創(chuàng)建: 工廠方法:
@Overrideprotected?FrontendConnection?getConnection(NetworkChannel?channel)?throws?IOException?{SystemConfig?sys?=?MycatServer.getInstance().getConfig().getSystem();ServerConnection?c?=?new?ServerConnection(channel);MycatServer.getInstance().getConfig().setSocketParams(c,?true);c.setPrivileges(MycatPrivileges.instance());c.setQueryHandler(new?ServerQueryHandler(c));c.setLoadDataInfileHandler(new?ServerLoadDataInfileHandler(c));????????//?c.setPrepareHandler(new?ServerPrepareHandler(c));c.setTxIsolation(sys.getTxIsolation());c.setSession2(new?NonBlockingSession(c));????????return?c;}可以看出,每個新的ServerConnection都會綁定一個新的ServerQueryHandler負(fù)責(zé)處理sql指令,一個ServerLoadDataInfileHandler負(fù)責(zé)處理文件載入命令,一個session負(fù)責(zé)處理事務(wù) 下面是相關(guān)的類圖 ?這里的所有獨立的handler里面都是static方法,可供其他類直接調(diào)用。每個ServerConnection都會有一個NonBlockingSession來處理。 這里說下連接、會話、邏輯庫、MyCat實例的關(guān)系(與MySQL里面的連接、會話、數(shù)據(jù)庫、MySQL實例的關(guān)系不太一樣);首先每個MyCat實例都管理多個數(shù)據(jù)庫。連接是針對MyCat實例建立的,并且,MyCat的連接(AbstractConnection)是不可復(fù)用的,在close方法會關(guān)閉連接并清理使用的資源。但是緩存資源(buffer)是可以復(fù)用的。比如,在一個前端連接長時間空閑時或者出現(xiàn)異常時,會被清理掉。每個連接會擁有一個session來處理事務(wù),保存會話信息。 這里,每個連接擁有一個會話。每個連接中的方法,被RW線程執(zhí)行,相當(dāng)于與RW線程綁定。RW線程是可以復(fù)用的,這里相當(dāng)于MySQL中的連接是可以復(fù)用的(連接池)。 Session.java:
public?interface?Session?{????/***?取得源端連接*/FrontendConnection?getSource();????/***?取得當(dāng)前目標(biāo)端數(shù)量*/int?getTargetCount();????/***?開啟一個會話執(zhí)行*/void?execute(RouteResultset?rrs,?int?type);????/***?提交一個會話執(zhí)行*/void?commit();????/***?回滾一個會話執(zhí)行*/void?rollback();????/***?取消一個正在執(zhí)行中的會話*?*?@param?sponsor*????????????如果發(fā)起者為null,則表示由自己發(fā)起。*/void?cancel(FrontendConnection?sponsor);????/***?終止會話,必須在關(guān)閉源端連接后執(zhí)行該方法。*/void?terminate();}下面我們著重研究它的實現(xiàn)類NonBlockingSession: 首先,取得源端連接方法FrontendConnection getSource();,其實就是NonBlockingSession在創(chuàng)建時就已綁定一個連接,誰會調(diào)用這個方法取得源端鏈接呢? ?可以發(fā)現(xiàn),主要有各種查詢的handler還有SQLengine會去調(diào)用。因為處理無論返回什么結(jié)果,都需要返回給源端。 int getTargetCount();取得當(dāng)前目標(biāo)端數(shù)量。根據(jù)目標(biāo)端的數(shù)量不同會用不同的handler處理轉(zhuǎn)發(fā)SQL和合并結(jié)果。
@Overridepublic?void?execute(RouteResultset?rrs,?int?type)?{????????//?清理之前處理用的資源clearHandlesResources();????????if?(LOGGER.isDebugEnabled())?{StringBuilder?s?=?new?StringBuilder();LOGGER.debug(s.append(source).append(rrs).toString()?+?"?rrs?");}????????//?檢查路由結(jié)果是否為空RouteResultsetNode[]?nodes?=?rrs.getNodes();????????if?(nodes?==?null?||?nodes.length?==?0?||?nodes[0].getName()?==?null||?nodes[0].getName().equals(""))?{????????????//如果為空,則表名有誤,提示客戶端source.writeErrMessage(ErrorCode.ER_NO_DB_ERROR,????????????????????"No?dataNode?found?,please?check?tables?defined?in?schema:"+?source.getSchema());????????????return;}????????//如果路由結(jié)果個數(shù)為1,則為單點查詢或事務(wù)if?(nodes.length?==?1)?{????????????//使用SingleNodeHandler處理單點查詢或事務(wù)singleNodeHandler?=?new?SingleNodeHandler(rrs,?this);????????????try?{singleNodeHandler.execute();}?catch?(Exception?e)?{LOGGER.warn(new?StringBuilder().append(source).append(rrs),?e);source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA,?e.toString());}}?else?{????????????//如果路由結(jié)果>1,則為多點查詢或事務(wù)boolean?autocommit?=?source.isAutocommit();SystemConfig?sysConfig?=?MycatServer.getInstance().getConfig().getSystem();????????????//mutiNodeLimitType沒有用。。。int?mutiNodeLimitType?=?sysConfig.getMutiNodeLimitType();????????????//使用multiNodeHandler處理多點查詢或事務(wù)multiNodeHandler?=?new?MultiNodeQueryHandler(type,?rrs,?autocommit,????????????????????this);????????????try?{multiNodeHandler.execute();}?catch?(Exception?e)?{LOGGER.warn(new?StringBuilder().append(source).append(rrs),?e);source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA,?e.toString());}}}每次一個Session執(zhí)行SQL時,會先清理handler使用的資源。SingleNodeHandler與multiNodeHandler之后會講。這里的handler我們之后會在每個模塊去講,Session之后也還會提到,敬請期待
免費體驗云安全(易盾)內(nèi)容安全、驗證碼等服務(wù)
更多網(wǎng)易技術(shù)、產(chǎn)品、運營經(jīng)驗分享請點擊。
相關(guān)文章:
【推薦】?Android TV 開發(fā)(5)
【推薦】?分布式存儲系統(tǒng)可靠性系列三:設(shè)計模式
轉(zhuǎn)載于:https://www.cnblogs.com/zyfd/p/9894735.html
《新程序員》:云原生和全面數(shù)字化實踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀總結(jié)
以上是生活随笔為你收集整理的数据库路由中间件MyCat - 源代码篇(7)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 洛谷 P3332 [ZJOI2013]K
- 下一篇: CMD查询Mysql中文乱码的解决方法