UDT 最新源码分析(二) -- 开始与终止
UDT 最新源碼分析 -- 開始與終止
- UDT 開始與終止
- 開始流程
- 終止流程
UDT 開始與終止
開始流程
UDT:: startup -> CUDT::startup -> CUDTUnited::startup
初始化UDT庫,多次調(diào)用時(shí),調(diào)用計(jì)數(shù)增加,但實(shí)際上僅僅初始化一次。對(duì)于windows下,還需要初始化網(wǎng)絡(luò)庫WSAStartup。建立 garbageCollect 線程,用于清理失效socket,并刪除分發(fā)器。
int CUDTUnited::startup() {CGuard gcinit(m_InitLock); //獲取初始鎖if (m_iInstanceCount++ > 0) //實(shí)例被初始化的計(jì)數(shù)器,僅初始化一次return 0;if (m_bGCStatus) //garbageCollect狀態(tài),保證線程不會(huì)被建立多個(gè)return true;m_bClosing = false; //CUDTUnited狀態(tài), 此狀態(tài)下 garbageCollect內(nèi) while循環(huán)不退出pthread_mutex_init(&m_GCStopLock, NULL);pthread_cond_init(&m_GCStopCond, NULL);pthread_create(&m_GCThread, NULL, garbageCollect, this); //garbageCollect 線程創(chuàng)建m_bGCStatus = true;return 0; }再來看 garbageCollect:
void* CUDTUnited::garbageCollect(void* p) {CUDTUnited* self = (CUDTUnited*)p; //獲取CUDTUnited實(shí)例,類型轉(zhuǎn)化CGuard gcguard(self->m_GCStopLock);//資源清理的鎖while (!self->m_bClosing) //初始 m_bClosing = false,無限循環(huán){//當(dāng)UDT協(xié)議判斷某一個(gè)UDT SOCKET的狀態(tài)不正確時(shí),會(huì)將其狀態(tài)設(shè)置為BROKEN,并在這個(gè)函數(shù)中進(jìn)行處理self->checkBrokenSockets();//睡眠等待,等待下一次可以清理出現(xiàn)BROKEN狀態(tài)的UDT SOCKET。睡眠時(shí)間 timeout: 1spthread_cond_timedwait(&self->m_GCStopCond, &self->m_GCStopLock, &timeout);}//remove all sockets and multiplexers. m_bClosing = true, 清理目前依舊殘余的資源 CGuard::enterCS(self->m_ControlLock);//遍歷 m_Sockets 中的 UDT socket,設(shè)置為關(guān)閉,存入 m_ClosedSockets map,下一步處理連接記錄。//查找 Listener, 將 該socket 在Listener中的 m_pQueuedSockets 與 m_pAcceptSockets中記錄刪除for (map<UDTSOCKET, CUDTSocket*>::iterator i = self->m_Sockets.begin(); i != self->m_Sockets.end(); ++ i){i->second->m_pUDT->m_bBroken = true; //將CUDT* 的狀態(tài)設(shè)置為 BROKEN,后續(xù)進(jìn)行處理i->second->m_pUDT->close(); //關(guān)閉i->second->m_Status = CLOSED;//設(shè)置狀態(tài)為關(guān)閉i->second->m_TimeStamp = CTimer::getTime(); //調(diào)整最后一次操作 UDT socket 的時(shí)間self->m_ClosedSockets[i->first] = i->second;//將當(dāng)前描述連接的CUDT*保存至 m_ClosedSockets// remove from listener's queuemap<UDTSOCKET, CUDTSocket*>::iterator ls = self->m_Sockets.find(i->second->m_ListenSocket);if (ls == self->m_Sockets.end()) //如果沒有找到Listener, m_ClosedSockets中繼續(xù)查找{ls = self->m_ClosedSockets.find(i->second->m_ListenSocket);if (ls == self->m_ClosedSockets.end()) //如果沒有找到,就不再處理continue;}CGuard::enterCS(ls->second->m_AcceptLock);//獲取Listener中的鎖ls->second->m_pQueuedSockets->erase(i->second->m_SocketID);//清理接收連接但還未接受的排隊(duì) UDT socketls->second->m_pAcceptSockets->erase(i->second->m_SocketID);//清理已完成連接的隊(duì)列中UDT socketCGuard::leaveCS(ls->second->m_AcceptLock);}self->m_Sockets.clear();//最后刪除m_Sockets中的所有sockets//最后再次遍歷待關(guān)閉的socket隊(duì)列for (map<UDTSOCKET, CUDTSocket*>::iterator j = self->m_ClosedSockets.begin(); j != self->m_ClosedSockets.end(); ++ j){j->second->m_TimeStamp = 0;//設(shè)置狀態(tài)為 0}CGuard::leaveCS(self->m_ControlLock);while (true){//之前只是將即將清理的UDT socket 狀態(tài)設(shè)置為BROKEN,此時(shí)對(duì)BROKEN狀態(tài)的socket進(jìn)行清理self->checkBrokenSockets();CGuard::enterCS(self->m_ControlLock);bool empty = self->m_ClosedSockets.empty(); //判斷待關(guān)閉socket是否已經(jīng)全部清理CGuard::leaveCS(self->m_ControlLock);if (empty)//如果為empty,就可以直接退出break;CTimer::sleep();//不行的話,再歇一會(huì),再次進(jìn)行處理}return NULL; }上面代碼中出現(xiàn)了兩次 checkBrokenSockets,該方法用于清理處于BROKEN狀態(tài)的 UDT socket。
- 檢查所有的 UDT socket
- 如果不是處于broken狀態(tài),查找下一個(gè),如果處于broken狀態(tài):
- 如果 socket 處于 LISTENING 狀態(tài),須再等待3s,以防止有客戶端正在連接
- 如果 recvbuffer 存在數(shù)據(jù),且 brokencount 計(jì)數(shù)器仍大于0,繼續(xù)等待更長(zhǎng)的時(shí)間
- 否則:
- 設(shè)置socket為CLOSED狀態(tài),更新 m_TimeStamp 為當(dāng)前時(shí)間, 開啟移除定時(shí)器。
- 將當(dāng)前socket加入待關(guān)閉 tbc vector, socket加入臨時(shí)存儲(chǔ)closed socket的 m_ClosedSockets map.
- 查找 Listener, 從m_pQueuedSockets 和 m_pAcceptSockets 移除接收到的socket 連接 socket。
- 如果不是處于broken狀態(tài),查找下一個(gè),如果處于broken狀態(tài):
- 對(duì)于移入臨時(shí)存儲(chǔ)待關(guān)閉的 m_ClosedSockets map,檢查每個(gè)socket,
- 如果 m_ullLingerExpiration > 0,表示在發(fā)送緩沖中存在數(shù)據(jù)時(shí),GC設(shè)置了延遲關(guān)閉的時(shí)間
- 如果發(fā)送緩沖不存在,或?yàn)榭?#xff0c;或設(shè)置關(guān)閉時(shí)間已經(jīng)超時(shí):
- 將m_ullLingerExpiration設(shè)置為0;設(shè)置UDT socket為關(guān)閉狀態(tài),在下一次GC中將被回收。更新關(guān)閉時(shí)間為當(dāng)前時(shí)間。
- 如果發(fā)送緩沖不存在,或?yàn)榭?#xff0c;或設(shè)置關(guān)閉時(shí)間已經(jīng)超時(shí):
- 否則:
- 如果標(biāo)記關(guān)閉時(shí)間已經(jīng)超過1s,且socket對(duì)應(yīng)接收隊(duì)列信息節(jié)點(diǎn)已經(jīng)被刪除,或者節(jié)點(diǎn)不存在list中
- 將UDT socket加入tbr 移除隊(duì)列
- 如果 m_ullLingerExpiration > 0,表示在發(fā)送緩沖中存在數(shù)據(jù)時(shí),GC設(shè)置了延遲關(guān)閉的時(shí)間
- 遍歷tbc,從 m_Sockets中刪除。遍歷tbr, 將這些超時(shí)的 socket 移除。
如果 m_ClosedSockets 中UDT socket 已經(jīng)超過超過1s, 就會(huì)放入 tbr。遍歷 tbr, 移除所有socket,更新相關(guān)資源。這里用到了removeSocket 方法。
- 如果UDT socket 是一個(gè) Listener,將所有收到的但未接受的socket 關(guān)閉,設(shè)置為 BROKEN 狀態(tài),等待移除。
- 刪除 m_PeerRec 中記錄的連接。
- 刪除 UDT socket, 檢查對(duì)應(yīng)的復(fù)用器,計(jì)數(shù)器減一,如果已經(jīng)為0, 則關(guān)閉channel,清理復(fù)用器內(nèi)的資源。
終止流程
- UDT::cleanup -> CUDT::cleanup -> CUDTUnited::cleanup
終止流程與開始流程相反,在計(jì)數(shù)器清零以后才會(huì)真正退出,每次調(diào)用,計(jì)數(shù)減一。修改 m_bClosing 與 m_bGCStatus 狀態(tài),終止垃圾回收線程。
int CUDTUnited::cleanup() {CGuard gcinit(m_InitLock);if (--m_iInstanceCount > 0) //計(jì)數(shù)器減一return 0;if (!m_bGCStatus) return 0;m_bClosing = true;pthread_cond_signal(&m_GCStopCond);pthread_join(m_GCThread, NULL);pthread_mutex_destroy(&m_GCStopLock);pthread_cond_destroy(&m_GCStopCond);m_bGCStatus = false;return 0; }對(duì)UDT庫的初始化與終止流程分析結(jié)束,在例程中,基本上都是顯示調(diào)用這兩個(gè)接口,但是在 appclient.cpp 中,通過構(gòu)造函數(shù)與析構(gòu)函數(shù)隱式完成。兩種方式均可,可根據(jù)需要自選。
struct UDTUpDown{UDTUpDown(){// use this function to initialize the UDT libraryUDT::startup();}~UDTUpDown(){// use this function to release the UDT libraryUDT::cleanup();} };總結(jié)
以上是生活随笔為你收集整理的UDT 最新源码分析(二) -- 开始与终止的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java phpwind_GitHub
- 下一篇: 强迫症才需要看,新装电脑 Win10 硬