日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

UDT协议实现分析——UDT Socket的创建

發(fā)布時間:2024/4/11 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 UDT协议实现分析——UDT Socket的创建 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

UDT API的用法

在分析 連接的建立過程 之前,先來看一下UDT API的用法。在UDT網(wǎng)絡中,通常要有一個UDT Server監(jiān)聽在某臺機器的某個UDP端口上,等待客戶端的連接;有一個或多個客戶端連接UDT Server;UDT Server接收到來自客戶端的連接請求后,創(chuàng)建另外一個單獨的UDT Socket用于與該客戶端進行通信。

先來看一下UDT Server的簡單的實現(xiàn),UDT的開發(fā)者已經(jīng)提供了一些demo程序可供參考,位于app/目錄下。

#include <unistd.h> #include <cstdlib> #include <cstring> #include <netdb.h>#include <iostream> #include <udt.h>using namespace std;void* recvdata(void*);struct UDTUpDown {UDTUpDown() {// use this function to initialize the UDT libraryUDT::startup();}~UDTUpDown() {// use this function to release the UDT libraryUDT::cleanup();} };int main(int argc, char* argv[]) {if ((1 != argc) && ((2 != argc) || (0 == atoi(argv[1])))) {cout << "usage: appserver [server_port]" << endl;return 0;}// Automatically start up and clean up UDT module.UDTUpDown _udt_;addrinfo hints;addrinfo* res;memset(&hints, 0, sizeof(struct addrinfo));hints.ai_flags = AI_PASSIVE;hints.ai_family = AF_INET;hints.ai_socktype = SOCK_STREAM;//hints.ai_socktype = SOCK_DGRAM;string service("9000");if (2 == argc)service = argv[1];if (0 != getaddrinfo(NULL, service.c_str(), &hints, &res)) {cout << "illegal port number or port is busy.\n" << endl;return 0;}UDTSOCKET serv = UDT::socket(res->ai_family, res->ai_socktype, res->ai_protocol);// UDT Options//UDT::setsockopt(serv, 0, UDT_CC, new CCCFactory<CUDPBlast>, sizeof(CCCFactory<CUDPBlast>));//UDT::setsockopt(serv, 0, UDT_MSS, new int(9000), sizeof(int));//UDT::setsockopt(serv, 0, UDT_RCVBUF, new int(10000000), sizeof(int));//UDT::setsockopt(serv, 0, UDP_RCVBUF, new int(10000000), sizeof(int));if (UDT::ERROR == UDT::bind(serv, res->ai_addr, res->ai_addrlen)) {cout << "bind: " << UDT::getlasterror().getErrorMessage() << endl;return 0;}freeaddrinfo(res);cout << "server is ready at port: " << service << endl;if (UDT::ERROR == UDT::listen(serv, 10)) {cout << "listen: " << UDT::getlasterror().getErrorMessage() << endl;return 0;}sockaddr_storage clientaddr;int addrlen = sizeof(clientaddr);UDTSOCKET recver;while (true) {if (UDT::INVALID_SOCK == (recver = UDT::accept(serv, (sockaddr*) &clientaddr, &addrlen))) {cout << "accept: " << UDT::getlasterror().getErrorMessage() << endl;return 0;}char clienthost[NI_MAXHOST];char clientservice[NI_MAXSERV];getnameinfo((sockaddr *) &clientaddr, addrlen, clienthost, sizeof(clienthost), clientservice,sizeof(clientservice), NI_NUMERICHOST | NI_NUMERICSERV);cout << "new connection: " << clienthost << ":" << clientservice << endl;pthread_t rcvthread;pthread_create(&rcvthread, NULL, recvdata, new UDTSOCKET(recver));pthread_detach(rcvthread);}UDT::close(serv);return 0; }void* recvdata(void* usocket) {UDTSOCKET recver = *(UDTSOCKET*) usocket;delete (UDTSOCKET*) usocket;char* data;int size = 100000;data = new char[size];while (true) {int rsize = 0;int rs;while (rsize < size) {int rcv_size;int var_size = sizeof(int);UDT::getsockopt(recver, 0, UDT_RCVDATA, &rcv_size, &var_size);if (UDT::ERROR == (rs = UDT::recv(recver, data + rsize, size - rsize, 0))) {cout << "recv:" << UDT::getlasterror().getErrorMessage() << endl;break;}rsize += rs;}if (rsize < size)break;}delete[] data;UDT::close(recver);return NULL; }
  • 如在 UDT協(xié)議實現(xiàn)分析——UDT初始化和銷毀 一文中提到的, 在調(diào)用任何UDT API之前,需要首先調(diào)用UDT::startup()來對庫做一個初始化,并在結(jié)束之后執(zhí)行UDT::cleanup()做最后的清理。在這個示例中,是創(chuàng)建了一個helper類UDTUpDown來幫助做這些事情的。利用編程語言本身提供的構(gòu)造-析夠機制來對UDT進行初始化和銷毀要比手動調(diào)用這些函數(shù)要可靠得多。
  • 獲得本地UDP端口的網(wǎng)絡地址。
  • 調(diào)用UDT::socket()函數(shù)創(chuàng)建一個UDT Socket。這個Socket是UDT抽象出來的一個邏輯的Socket,我們并不能利用這個Socket本身來收發(fā)數(shù)據(jù)。
  • 調(diào)用UDT::bind()將創(chuàng)建的UDT Socket綁定到本地端口的網(wǎng)絡地址。這一步將會把UDT的邏輯Socket與能夠進行數(shù)據(jù)收發(fā)的系統(tǒng)UDP socket進行關(guān)聯(lián),自此我們就可以利用UDT Socket進行收發(fā)數(shù)據(jù)了。
  • 調(diào)用UDT::listen()告訴UDT,把這個UDT Socket做為這個端口上的Listening Socket。一個UDP端口可以被多個UDT Socket復用,在調(diào)用UDT::listen()之后,UDT就知道,在有其它節(jié)點連接這個UDP端口時,需要把相關(guān)的連接請求消息發(fā)送到哪個UDT Socket的接收緩沖區(qū)了。
    對綁定到相同UDP端口的多個不同UDT Socket調(diào)用UDT::listen()時,UDT是如何處理的?我們知道,一個UDP端口上最多只能有一個listening Socket。
  • 調(diào)用UDT::accept()函數(shù)等待其它節(jié)點的連接。其它節(jié)點連接時,這個函數(shù)返回另外一個單獨的UDT Socket,以用于與發(fā)起連接的節(jié)點進行通信。
    UDT::accept()函數(shù)返回的UDT Socket會被綁定到另外的一個不同的UDP端口,還是會被綁定到listening Socket所綁定的UDP端口?
  • 使用UDT::accept()返回的UDT Socket,利用UDT::recv()與UDT::send()等函數(shù)同發(fā)起連接的節(jié)點進行數(shù)據(jù)傳輸。
  • 在不再需要UDT Socket時,調(diào)用UDT::close()函數(shù)關(guān)掉它,以釋放資源。
    然后再來看下UDT Client的實現(xiàn):
  • #include <unistd.h> #include <cstdlib> #include <cstring> #include <netdb.h>#include <iostream> #include <udt.h>using namespace std;void* monitor(void*);struct UDTUpDown {UDTUpDown() {// use this function to initialize the UDT libraryUDT::startup();}~UDTUpDown() {// use this function to release the UDT libraryUDT::cleanup();} };int main(int argc, char* argv[]) {if ((3 != argc) || (0 == atoi(argv[2]))) {cout << "usage: appclient server_ip server_port" << endl;return 0;}// Automatically start up and clean up UDT module.UDTUpDown _udt_;struct addrinfo hints, *local, *peer;memset(&hints, 0, sizeof(struct addrinfo));hints.ai_flags = AI_PASSIVE;hints.ai_family = AF_INET;hints.ai_socktype = SOCK_STREAM;//hints.ai_socktype = SOCK_DGRAM;if (0 != getaddrinfo(NULL, "9000", &hints, &local)) {cout << "incorrect network address.\n" << endl;return 0;}UDTSOCKET client = UDT::socket(local->ai_family, local->ai_socktype, local->ai_protocol);// UDT Options//UDT::setsockopt(client, 0, UDT_CC, new CCCFactory<CUDPBlast>, sizeof(CCCFactory<CUDPBlast>));//UDT::setsockopt(client, 0, UDT_MSS, new int(9000), sizeof(int));//UDT::setsockopt(client, 0, UDT_SNDBUF, new int(10000000), sizeof(int));//UDT::setsockopt(client, 0, UDP_SNDBUF, new int(10000000), sizeof(int));//UDT::setsockopt(client, 0, UDT_MAXBW, new int64_t(12500000), sizeof(int));// for rendezvous connection, enable the code below/*UDT::setsockopt(client, 0, UDT_RENDEZVOUS, new bool(true), sizeof(bool));if (UDT::ERROR == UDT::bind(client, local->ai_addr, local->ai_addrlen)){cout << "bind: " << UDT::getlasterror().getErrorMessage() << endl;return 0;}*/freeaddrinfo(local);if (0 != getaddrinfo(argv[1], argv[2], &hints, &peer)) {cout << "incorrect server/peer address. " << argv[1] << ":" << argv[2] << endl;return 0;}// connect to the server, implict bindif (UDT::ERROR == UDT::connect(client, peer->ai_addr, peer->ai_addrlen)) {cout << "connect: " << UDT::getlasterror().getErrorMessage() << endl;return 0;}freeaddrinfo(peer);int size = 100000;char* data = new char[size];pthread_create(new pthread_t, NULL, monitor, &client);for (int i = 0; i < 1000000; i++) {int ssize = 0;int ss;while (ssize < size) {if (UDT::ERROR == (ss = UDT::send(client, data + ssize, size - ssize, 0))) {cout << "send:" << UDT::getlasterror().getErrorMessage() << endl;break;}ssize += ss;}if (ssize < size)break;}UDT::close(client);delete[] data;return 0; }void* monitor(void* s) {UDTSOCKET u = *(UDTSOCKET*) s;UDT::TRACEINFO perf;cout << "SendRate(Mb/s)\tRTT(ms)\tCWnd\tPktSndPeriod(us)\tRecvACK\tRecvNAK" << endl;while (true) {sleep(1);if (UDT::ERROR == UDT::perfmon(u, &perf)) {cout << "perfmon: " << UDT::getlasterror().getErrorMessage() << endl;break;}cout << perf.mbpsSendRate << "\t\t" << perf.msRTT << "\t" << perf.pktCongestionWindow << "\t"<< perf.usPktSndPeriod << "\t\t\t" << perf.pktRecvACK << "\t" << perf.pktRecvNAK << endl;}return NULL; }

    可以看到UDT Client連接UDT Server并發(fā)送數(shù)據(jù)的過程大體如下:

  • 同樣需要在調(diào)用任何UDT API之前,先調(diào)用UDT::startup()來對庫做初始化,并在結(jié)束之后執(zhí)行UDT::cleanup()做最后的清理。這里同樣使用helper類UDTUpDown來幫助做這些事情。

  • 獲得UDT Server的網(wǎng)絡地址。

  • 調(diào)用UDT::socket()函數(shù)創(chuàng)建一個UDT Socket。這個Socket可以與特定的本地地址綁定,也可以不綁定。如果綁定,則發(fā)送數(shù)據(jù)時的出口地址就是該端口,如果不綁定,出口地址則是一個不確定的值。

  • 調(diào)用UDT::connect()連接UDT Server。

  • 使用UDT Socket,利用UDT::recv()與UDT::send()等函數(shù)同UDT Server進行數(shù)據(jù)傳輸。

  • 在不再需要UDT Socket時,調(diào)用UDT::close()函數(shù)關(guān)掉它,以釋放資源。

  • UDT基本的收發(fā)數(shù)據(jù)的API的用法大體如上面所示。接著我們來看,這些函數(shù)是如何實現(xiàn)的。

    UDT Socket的創(chuàng)建

    無論是UDT Server要listening,還是UDT client要連接UDT Server,在調(diào)用UDT::startup()初始化UDT之后,首先要做的事情都是調(diào)用UDT::socket()創(chuàng)建UDT Socket了。我們就來看一下創(chuàng)建UDT Socket的過程:

    UDTSOCKET CUDTUnited::newSocket(int af, int type) {if ((type != SOCK_STREAM) && (type != SOCK_DGRAM))throw CUDTException(5, 3, 0);CUDTSocket* ns = NULL;try {ns = new CUDTSocket;ns->m_pUDT = new CUDT;if (AF_INET == af) {ns->m_pSelfAddr = (sockaddr*) (new sockaddr_in);((sockaddr_in*) (ns->m_pSelfAddr))->sin_port = 0;} else {ns->m_pSelfAddr = (sockaddr*) (new sockaddr_in6);((sockaddr_in6*) (ns->m_pSelfAddr))->sin6_port = 0;}} catch (...) {delete ns;throw CUDTException(3, 2, 0);}CGuard::enterCS(m_IDLock);ns->m_SocketID = --m_SocketID;CGuard::leaveCS(m_IDLock);ns->m_Status = INIT;ns->m_ListenSocket = 0;ns->m_pUDT->m_SocketID = ns->m_SocketID;ns->m_pUDT->m_iSockType = (SOCK_STREAM == type) ? UDT_STREAM : UDT_DGRAM;ns->m_pUDT->m_iIPversion = ns->m_iIPversion = af;ns->m_pUDT->m_pCache = m_pCache;// protect the m_Sockets structure.CGuard::enterCS(m_ControlLock);try {m_Sockets[ns->m_SocketID] = ns;} catch (...) {//failure and rollbackCGuard::leaveCS(m_ControlLock);delete ns;ns = NULL;}CGuard::leaveCS(m_ControlLock);if (NULL == ns)throw CUDTException(3, 2, 0);return ns->m_SocketID; }UDTSOCKET CUDT::socket(int af, int type, int) {if (!s_UDTUnited.m_bGCStatus)s_UDTUnited.startup();try {return s_UDTUnited.newSocket(af, type);} catch (CUDTException& e) {s_UDTUnited.setError(new CUDTException(e));return INVALID_SOCK;} catch (bad_alloc&) {s_UDTUnited.setError(new CUDTException(3, 2, 0));return INVALID_SOCK;} catch (...) {s_UDTUnited.setError(new CUDTException(-1, 0, 0));return INVALID_SOCK;} }UDTSOCKET socket(int af, int type, int protocol) {return CUDT::socket(af, type, protocol); }

    調(diào)用流程為,UDT::socket() -> CUDT::socket() -> CUDTUnited::newSocket()。

    在CUDT::socket()中,我們看到,它會首先檢查s_UDTUnited.m_bGCStatus,若發(fā)現(xiàn)UDT還沒有初始化完成的話,則會調(diào)用s_UDTUnited.startup()進行初始化。這個地方對UDT狀態(tài)s_UDTUnited.m_bGCStatus的檢查沒有問題,但在發(fā)現(xiàn)UDT沒有初始化完成時調(diào)用s_UDTUnited.startup()似乎并不恰當。這個地方調(diào)用了s_UDTUnited.startup(),那與這次調(diào)用相對應的cleanup()又在哪調(diào)用了呢?顯然在UDT內(nèi)部是沒有。若是調(diào)用cleanup()的職責總是在UDT的使用者,那倒不如在這個地方返回錯誤給調(diào)用者,完全讓調(diào)用者來管理UDT的生命周期,以盡可能地避免資源泄漏。

    創(chuàng)建UDT Socket的工作實際都在CUDTUnited::newSocket()函數(shù)中完成。可以看到,在這個函數(shù)中主要做了如下的這樣一些事情:

  • 創(chuàng)建一個CUDTSocket對象ns,并創(chuàng)建一個CUDT對象被ns->m_pUDT引用。初始化ns對象的Self網(wǎng)絡地址,端口會被設置為0。在UDT中使用CUDTSocket和CUDT共同來描述一個Socket。每個UDT Socket都會有其相應的CUDTSocket對象和CUDT對象。
  • 可以看一下CUDTSocket類的定義,來了解它都描述了UDT Socket的哪些屬性(src/api.h):

    class CUDTSocket {public:CUDTSocket();~CUDTSocket();UDTSTATUS m_Status; // current socket stateuint64_t m_TimeStamp; // time when the socket is closedint m_iIPversion; // IP versionsockaddr* m_pSelfAddr; // pointer to the local address of the socketsockaddr* m_pPeerAddr; // pointer to the peer address of the socketUDTSOCKET m_SocketID; // socket IDUDTSOCKET m_ListenSocket; // ID of the listener socket; 0 means this is an independent socketUDTSOCKET m_PeerID; // peer socket IDint32_t m_iISN; // initial sequence number, used to tell different connection from same IP:portCUDT* m_pUDT; // pointer to the UDT entitystd::set<UDTSOCKET>* m_pQueuedSockets; // set of connections waiting for accept()std::set<UDTSOCKET>* m_pAcceptSockets; // set of accept()ed connectionspthread_cond_t m_AcceptCond; // used to block "accept" callpthread_mutex_t m_AcceptLock; // mutex associated to m_AcceptCondunsigned int m_uiBackLog; // maximum number of connections in queueint m_iMuxID; // multiplexer IDpthread_mutex_t m_ControlLock; // lock this socket exclusively for control APIs: bind/listen/connectprivate:CUDTSocket(const CUDTSocket&);CUDTSocket& operator=(const CUDTSocket&); };

    這個類只提供了構(gòu)造和析構(gòu)兩個成員函數(shù)。還聲明了私有的copy構(gòu)造函數(shù)和賦值操作符函數(shù),但沒有定義它們,以避免類對象的復制。

    可以再來看一下CUDTSocket的構(gòu)造函數(shù)實現(xiàn)(src/api.h):

    CUDTSocket::CUDTSocket(): m_Status(INIT),m_TimeStamp(0),m_iIPversion(0),m_pSelfAddr(NULL),m_pPeerAddr(NULL),m_SocketID(0),m_ListenSocket(0),m_PeerID(0),m_iISN(0),m_pUDT(NULL),m_pQueuedSockets(NULL),m_pAcceptSockets(NULL),m_AcceptCond(),m_AcceptLock(),m_uiBackLog(0),m_iMuxID(-1) { #ifndef WIN32pthread_mutex_init(&m_AcceptLock, NULL);pthread_cond_init(&m_AcceptCond, NULL);pthread_mutex_init(&m_ControlLock, NULL); #elsem_AcceptLock = CreateMutex(NULL, false, NULL);m_AcceptCond = CreateEvent(NULL, false, false, NULL);m_ControlLock = CreateMutex(NULL, false, NULL); #endif }

    特別注意m_Status的初始化,該值被初始化為了INIT。從狀態(tài)機的角度來看CUDTSocket,在它剛被new出來時,它處于INIT狀態(tài)。

    CUDT類有兩個主要的職責,一是描述UDT Socket,包括所有的非靜態(tài)成員變量和非靜態(tài)成員函數(shù),定義UDT Socket的大部分屬性和所能提供操作;二是提供API,包括絕大部分的static成員函數(shù),這些函數(shù)將調(diào)用者與UDT內(nèi)部的實現(xiàn)連接起來。CUDT類這樣的設計,明顯違背了OO的SRP單一職責原則,這多少還是給代碼的閱讀帶來了一定的障礙。再來看一下CUDT的構(gòu)造函數(shù)實現(xiàn)(src/core.cpp):

    CUDT::CUDT() {m_pSndBuffer = NULL;m_pRcvBuffer = NULL;m_pSndLossList = NULL;m_pRcvLossList = NULL;m_pACKWindow = NULL;m_pSndTimeWindow = NULL;m_pRcvTimeWindow = NULL;m_pSndQueue = NULL;m_pRcvQueue = NULL;m_pPeerAddr = NULL;m_pSNode = NULL;m_pRNode = NULL;// Initilize mutex and condition variablesinitSynch();// Default UDT configurationsm_iMSS = 1500;m_bSynSending = true;m_bSynRecving = true;m_iFlightFlagSize = 25600;m_iSndBufSize = 8192;m_iRcvBufSize = 8192; //Rcv buffer MUST NOT be bigger than Flight Flag sizem_Linger.l_onoff = 1;m_Linger.l_linger = 180;m_iUDPSndBufSize = 65536;m_iUDPRcvBufSize = m_iRcvBufSize * m_iMSS;m_iSockType = UDT_STREAM;m_iIPversion = AF_INET;m_bRendezvous = false;m_iSndTimeOut = -1;m_iRcvTimeOut = -1;m_bReuseAddr = true;m_llMaxBW = -1;m_pCCFactory = new CCCFactory<CUDTCC>;m_pCC = NULL;m_pCache = NULL;// Initial statusm_bOpened = false;m_bListening = false;m_bConnecting = false;m_bConnected = false;m_bClosing = false;m_bShutdown = false;m_bBroken = false;m_bPeerHealth = true;m_ullLingerExpiration = 0; }void CUDT::initSynch() { #ifndef WIN32pthread_mutex_init(&m_SendBlockLock, NULL);pthread_cond_init(&m_SendBlockCond, NULL);pthread_mutex_init(&m_RecvDataLock, NULL);pthread_cond_init(&m_RecvDataCond, NULL);pthread_mutex_init(&m_SendLock, NULL);pthread_mutex_init(&m_RecvLock, NULL);pthread_mutex_init(&m_AckLock, NULL);pthread_mutex_init(&m_ConnectionLock, NULL); #elsem_SendBlockLock = CreateMutex(NULL, false, NULL);m_SendBlockCond = CreateEvent(NULL, false, false, NULL);m_RecvDataLock = CreateMutex(NULL, false, NULL);m_RecvDataCond = CreateEvent(NULL, false, false, NULL);m_SendLock = CreateMutex(NULL, false, NULL);m_RecvLock = CreateMutex(NULL, false, NULL);m_AckLock = CreateMutex(NULL, false, NULL);m_ConnectionLock = CreateMutex(NULL, false, NULL); #endif }

    都是成員變量的初始化,后續(xù)再來詳細了解這些成員變量的作用。

  • 為Socket分配SocketID,其值為CUDTUnited的m_SocketID遞減的結(jié)果。m_SocketID在CUDTUnited的構(gòu)造函數(shù)中初始化:
  • CUDTUnited::CUDTUnited(): m_Sockets(),m_ControlLock(),m_IDLock(),m_SocketID(0),m_TLSError(),m_mMultiplexer(),m_MultiplexerLock(),m_pCache(NULL),m_bClosing(false),m_GCStopLock(),m_GCStopCond(),m_InitLock(),m_iInstanceCount(0),m_bGCStatus(false),m_GCThread(),m_ClosedSockets() {// Socket ID MUST start from a random valuesrand((unsigned int) CTimer::getTime());m_SocketID = 1 + (int) ((1 << 30) * (double(rand()) / RAND_MAX));#ifndef WIN32pthread_mutex_init(&m_ControlLock, NULL);pthread_mutex_init(&m_IDLock, NULL);pthread_mutex_init(&m_InitLock, NULL); #elsem_ControlLock = CreateMutex(NULL, false, NULL);m_IDLock = CreateMutex(NULL, false, NULL);m_InitLock = CreateMutex(NULL, false, NULL); #endif#ifndef WIN32pthread_key_create(&m_TLSError, TLSDestroy); #elsem_TLSError = TlsAlloc();m_TLSLock = CreateMutex(NULL, false, NULL); #endifm_pCache = new CCache<CInfoBlock>; }

    m_SocketID的初始值是一個隨機數(shù)。

  • 初始化ns及它的CUDT對象的一些成員變量。特別注意ns->m_Status的賦值,這里該值被賦為了INIT。從狀態(tài)機的角度來看待CUDTSocket,在執(zhí)行UDT Socket創(chuàng)建結(jié)束執(zhí)行時,它處于INIT狀態(tài)下。

  • 將ns放在std::map<UDTSOCKET, CUDTSocket*> m_Sockets中。

  • 將UDT socket的SocketID返回給調(diào)用者。

  • 這個接口直接返回一個表示UDT Socket的類對象,而不是一個handle,以便于調(diào)用者在調(diào)用UDT API時無需每次都輸入UDT Socket handle參數(shù),這樣的API設計相對于UDT的這種API設計,有什么樣的優(yōu)缺點?

    UDT的錯誤處理機制

    在前面UDT Server和UDT Client的demo程序中,我們有看到,所有的UDT API都會在出錯時返回一個錯誤碼,比如UDT::bind()、UDT::bind2()、UDT::listen()和UDT::connect()等返回值類型為int的函數(shù),在出錯時返回UDT::ERROR,UDT::socket()和UDT::accept()等返回值類型為UDTSOCKET的函數(shù)在出錯時返回UDT::INVALID_SOCK。

    UDT API的調(diào)用者在檢測到它們返回了錯誤值時,通過調(diào)用UDT::getlasterror()函數(shù)來獲取關(guān)于異常更加詳細的信息。這里我們就來看一下這套機制的實現(xiàn)。

    注意看CUDT::socket()函數(shù)的實現(xiàn)。在這個函數(shù)中,會將實際創(chuàng)建UDT Socket的任務委托給s_UDTUnited.newSocket(),但這個調(diào)用會被包在一個try-catch塊中。在s_UDTUnited.newSocket()函數(shù)執(zhí)行的過程中發(fā)生任何的異常,都會先被CUDT::socket()捕獲。CUDT::socket()函數(shù)在捕獲這些異常之后,會根據(jù)捕獲的異常的類型創(chuàng)建不同的CUDTException對象,并通過調(diào)用CUDTUnited::setError()函數(shù)將該CUDTException對象設置給s_UDTUnited。

    我們來看一下CUDTUnited::setError()函數(shù)的實現(xiàn)(src/api.cpp):

    void CUDTUnited::setError(CUDTException* e) { #ifndef WIN32delete (CUDTException*) pthread_getspecific(m_TLSError);pthread_setspecific(m_TLSError, e); #elseCGuard tg(m_TLSLock);delete (CUDTException*)TlsGetValue(m_TLSError);TlsSetValue(m_TLSError, e);m_mTLSRecord[GetCurrentThreadId()] = e; #endif }

    在這個函數(shù)中,會首先取出線程局部存儲變量m_TLSError中保存的本線程上一次創(chuàng)建的 CUDTException對象,將其delete掉,并將本次創(chuàng)建的CUDTException對象設置進去。CUDTUnited類定義中m_TLSError的聲明(src/api.h):

    private:pthread_key_t m_TLSError; // thread local error record (last error) #ifndef WIN32static void TLSDestroy(void* e) {if (NULL != e)delete (CUDTException*) e;} #elsestd::map<DWORD, CUDTException*> m_mTLSRecord;void checkTLSValue();pthread_mutex_t m_TLSLock; #endif

    在CUDTUnited構(gòu)造函數(shù)中也可以看到對這個對象的初始化。 CUDTUnited::TLSDestroy()函數(shù)是m_TLSError的析構(gòu)函數(shù),在m_TLSError最后被銷毀時,這個函數(shù)被調(diào)用,以便于釋放搜有還未釋放的資源。

    再來看UDT API的調(diào)用者獲取上一次發(fā)生的異常的函數(shù)UDT::getlasterror():

    CUDTException* CUDTUnited::getError() { #ifndef WIN32if (NULL == pthread_getspecific(m_TLSError))pthread_setspecific(m_TLSError, new CUDTException);return (CUDTException*) pthread_getspecific(m_TLSError); #elseCGuard tg(m_TLSLock);if(NULL == TlsGetValue(m_TLSError)){CUDTException* e = new CUDTException;TlsSetValue(m_TLSError, e);m_mTLSRecord[GetCurrentThreadId()] = e;}return (CUDTException*)TlsGetValue(m_TLSError); #endif }CUDTException& CUDT::getlasterror() {return *s_UDTUnited.getError(); }ERRORINFO& getlasterror() {return CUDT::getlasterror(); }

    調(diào)用過程為UDT::getlasterror() -> CUDT::getlasterror() -> CUDTUnited::getError()。

    主要就是從s_UDTUnited的線程局部存儲變量m_TLSError中取出前面設置的本線程上次創(chuàng)建的CUDTException對象返回給調(diào)用者。

    總結(jié)一下,UDT的使用者在調(diào)用UDT API時,UDT API會直接調(diào)用CUDT類對應的static API函數(shù),在CUDT類的這些static API函數(shù)中會將做實際事情的工作委托給s_UDTUnited的相應函數(shù),但這個委托調(diào)用會被包在一個try-catch block中。s_UDTUnited的函數(shù)在遇到異常情況時拋出異常,CUDT類的static API函數(shù)捕獲異常,根據(jù)捕獲到的異常的具體類型,創(chuàng)建不同的CUDTException對象設置給s_UDTUnited的線程局部存儲變量m_TLSError中并向UDT API調(diào)用者返回錯誤碼,UDT API的調(diào)用者檢測到錯誤碼后,通過UDT::getlasterror()獲取存儲在m_TLSError中的異常。

    此處可以看到,CUDT提供的這一層API,一個比較重要的作用大概就是做異常處理了。

    在UDT中,使用CUDTException來描述所有出現(xiàn)的異常。可以看一下這個類的定義(src/udt.h):

    class UDT_API CUDTException {public:CUDTException(int major = 0, int minor = 0, int err = -1);CUDTException(const CUDTException& e);virtual ~CUDTException();// Functionality:// Get the description of the exception.// Parameters:// None.// Returned value:// Text message for the exception description.virtual const char* getErrorMessage();// Functionality:// Get the system errno for the exception.// Parameters:// None.// Returned value:// errno.virtual int getErrorCode() const;// Functionality:// Clear the error code.// Parameters:// None.// Returned value:// None.virtual void clear();private:int m_iMajor; // major exception categories// 0: correct condition // 1: network setup exception // 2: network connection broken // 3: memory exception // 4: file exception // 5: method not supported // 6+: undefined errorint m_iMinor; // for specific error reasonsint m_iErrno; // errno returned by the system if there is anystd::string m_strMsg; // text error messagestd::string m_strAPI; // the name of UDT function that returns the errorstd::string m_strDebug; // debug information, set to the original place that causes the errorpublic:// Error Codestatic const int SUCCESS;static const int ECONNSETUP;static const int ENOSERVER;static const int ECONNREJ;static const int ESOCKFAIL;static const int ESECFAIL;static const int ECONNFAIL;static const int ECONNLOST;static const int ENOCONN;static const int ERESOURCE;static const int ETHREAD;static const int ENOBUF;static const int EFILE;static const int EINVRDOFF;static const int ERDPERM;static const int EINVWROFF;static const int EWRPERM;static const int EINVOP;static const int EBOUNDSOCK;static const int ECONNSOCK;static const int EINVPARAM;static const int EINVSOCK;static const int EUNBOUNDSOCK;static const int ENOLISTEN;static const int ERDVNOSERV;static const int ERDVUNBOUND;static const int ESTREAMILL;static const int EDGRAMILL;static const int EDUPLISTEN;static const int ELARGEMSG;static const int EINVPOLLID;static const int EASYNCFAIL;static const int EASYNCSND;static const int EASYNCRCV;static const int ETIMEOUT;static const int EPEERERR;static const int EUNKNOWN; };

    這個class主要通過Major錯誤碼和Minor錯誤碼來描述異常情況,如果是調(diào)用系統(tǒng)調(diào)用出錯了,還會用Errno值。

    具體看一下這個class的實現(xiàn),特別是CUDTException::getErrorMessage()函數(shù),來了解每一個錯誤碼所代表的含義:

    CUDTException::CUDTException(int major, int minor, int err): m_iMajor(major),m_iMinor(minor) {if (-1 == err) #ifndef WIN32m_iErrno = errno; #elsem_iErrno = GetLastError(); #endifelsem_iErrno = err; }CUDTException::CUDTException(const CUDTException& e): m_iMajor(e.m_iMajor),m_iMinor(e.m_iMinor),m_iErrno(e.m_iErrno),m_strMsg() { }CUDTException::~CUDTException() { }const char* CUDTException::getErrorMessage() {// translate "Major:Minor" code into text message.switch (m_iMajor) {case 0:m_strMsg = "Success";break;case 1:m_strMsg = "Connection setup failure";switch (m_iMinor) {case 1:m_strMsg += ": connection time out";break;case 2:m_strMsg += ": connection rejected";break;case 3:m_strMsg += ": unable to create/configure UDP socket";break;case 4:m_strMsg += ": abort for security reasons";break;default:break;}break;case 2:switch (m_iMinor) {case 1:m_strMsg = "Connection was broken";break;case 2:m_strMsg = "Connection does not exist";break;default:break;}break;case 3:m_strMsg = "System resource failure";switch (m_iMinor) {case 1:m_strMsg += ": unable to create new threads";break;case 2:m_strMsg += ": unable to allocate buffers";break;default:break;}break;case 4:m_strMsg = "File system failure";switch (m_iMinor) {case 1:m_strMsg += ": cannot seek read position";break;case 2:m_strMsg += ": failure in read";break;case 3:m_strMsg += ": cannot seek write position";break;case 4:m_strMsg += ": failure in write";break;default:break;}break;case 5:m_strMsg = "Operation not supported";switch (m_iMinor) {case 1:m_strMsg += ": Cannot do this operation on a BOUND socket";break;case 2:m_strMsg += ": Cannot do this operation on a CONNECTED socket";break;case 3:m_strMsg += ": Bad parameters";break;case 4:m_strMsg += ": Invalid socket ID";break;case 5:m_strMsg += ": Cannot do this operation on an UNBOUND socket";break;case 6:m_strMsg += ": Socket is not in listening state";break;case 7:m_strMsg += ": Listen/accept is not supported in rendezous connection setup";break;case 8:m_strMsg += ": Cannot call connect on UNBOUND socket in rendezvous connection setup";break;case 9:m_strMsg += ": This operation is not supported in SOCK_STREAM mode";break;case 10:m_strMsg += ": This operation is not supported in SOCK_DGRAM mode";break;case 11:m_strMsg += ": Another socket is already listening on the same port";break;case 12:m_strMsg += ": Message is too large to send (it must be less than the UDT send buffer size)";break;case 13:m_strMsg += ": Invalid epoll ID";break;default:break;}break;case 6:m_strMsg = "Non-blocking call failure";switch (m_iMinor) {case 1:m_strMsg += ": no buffer available for sending";break;case 2:m_strMsg += ": no data available for reading";break;default:break;}break;case 7:m_strMsg = "The peer side has signalled an error";break;default:m_strMsg = "Unknown error";}// Adding "errno" informationif ((0 != m_iMajor) && (0 < m_iErrno)) {m_strMsg += ": "; #ifndef WIN32char errmsg[1024];if (strerror_r(m_iErrno, errmsg, 1024) == 0)m_strMsg += errmsg; #elseLPVOID lpMsgBuf;FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, m_iErrno, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), (LPTSTR)&lpMsgBuf, 0, NULL);m_strMsg += (char*)lpMsgBuf;LocalFree(lpMsgBuf); #endif}// period #ifndef WIN32m_strMsg += "."; #endifreturn m_strMsg.c_str(); }int CUDTException::getErrorCode() const {return m_iMajor * 1000 + m_iMinor; }void CUDTException::clear() {m_iMajor = 0;m_iMinor = 0;m_iErrno = 0; }const int CUDTException::SUCCESS = 0; const int CUDTException::ECONNSETUP = 1000; const int CUDTException::ENOSERVER = 1001; const int CUDTException::ECONNREJ = 1002; const int CUDTException::ESOCKFAIL = 1003; const int CUDTException::ESECFAIL = 1004; const int CUDTException::ECONNFAIL = 2000; const int CUDTException::ECONNLOST = 2001; const int CUDTException::ENOCONN = 2002; const int CUDTException::ERESOURCE = 3000; const int CUDTException::ETHREAD = 3001; const int CUDTException::ENOBUF = 3002; const int CUDTException::EFILE = 4000; const int CUDTException::EINVRDOFF = 4001; const int CUDTException::ERDPERM = 4002; const int CUDTException::EINVWROFF = 4003; const int CUDTException::EWRPERM = 4004; const int CUDTException::EINVOP = 5000; const int CUDTException::EBOUNDSOCK = 5001; const int CUDTException::ECONNSOCK = 5002; const int CUDTException::EINVPARAM = 5003; const int CUDTException::EINVSOCK = 5004; const int CUDTException::EUNBOUNDSOCK = 5005; const int CUDTException::ENOLISTEN = 5006; const int CUDTException::ERDVNOSERV = 5007; const int CUDTException::ERDVUNBOUND = 5008; const int CUDTException::ESTREAMILL = 5009; const int CUDTException::EDGRAMILL = 5010; const int CUDTException::EDUPLISTEN = 5011; const int CUDTException::ELARGEMSG = 5012; const int CUDTException::EINVPOLLID = 5013; const int CUDTException::EASYNCFAIL = 6000; const int CUDTException::EASYNCSND = 6001; const int CUDTException::EASYNCRCV = 6002; const int CUDTException::ETIMEOUT = 6003; const int CUDTException::EPEERERR = 7000; const int CUDTException::EUNKNOWN = -1;

    Done。

    總結(jié)

    以上是生活随笔為你收集整理的UDT协议实现分析——UDT Socket的创建的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。