日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

UDT协议实现分析——bind、listen与accept

發布時間:2024/4/11 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 UDT协议实现分析——bind、listen与accept 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

UDT Server啟動之后,基于UDT協議的UDP數據可靠傳輸才成為可能,因而接下來分析與UDT Server有關的幾個主要API的實現,來了解下UDT Server是如何listening在特定UDP端口上的。主要有UDT::bind(),UDT::listen()和UDT::accept()等幾個函數。

bind過程

通常UDT Server在創建UDT Socket之后,首先就要調用UDT::bind(),與一個特定的本地UDP端口地址進行綁定,以便可以在希望的端口上監聽。這里來看一下UDT::bind()的實現:

int CUDTUnited::bind(const UDTSOCKET u, const sockaddr* name, int namelen) {CUDTSocket* s = locate(u);if (NULL == s)throw CUDTException(5, 4, 0);CGuard cg(s->m_ControlLock);// cannot bind a socket more than onceif (INIT != s->m_Status)throw CUDTException(5, 0, 0);// check the size of SOCKADDR structureif (AF_INET == s->m_iIPversion) {if (namelen != sizeof(sockaddr_in))throw CUDTException(5, 3, 0);} else {if (namelen != sizeof(sockaddr_in6))throw CUDTException(5, 3, 0);}s->m_pUDT->open();updateMux(s, name);s->m_Status = OPENED;// copy address information of local nodes->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(s->m_pSelfAddr);return 0; }int CUDTUnited::bind(UDTSOCKET u, UDPSOCKET udpsock) {CUDTSocket* s = locate(u);if (NULL == s)throw CUDTException(5, 4, 0);CGuard cg(s->m_ControlLock);// cannot bind a socket more than onceif (INIT != s->m_Status)throw CUDTException(5, 0, 0);sockaddr_in name4;sockaddr_in6 name6;sockaddr* name;socklen_t namelen;if (AF_INET == s->m_iIPversion) {namelen = sizeof(sockaddr_in);name = (sockaddr*) &name4;} else {namelen = sizeof(sockaddr_in6);name = (sockaddr*) &name6;}if (-1 == ::getsockname(udpsock, name, &namelen))throw CUDTException(5, 3);s->m_pUDT->open();updateMux(s, name, &udpsock);s->m_Status = OPENED;// copy address information of local nodes->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(s->m_pSelfAddr);return 0; }int CUDT::bind(UDTSOCKET u, const sockaddr* name, int namelen) {try {return s_UDTUnited.bind(u, name, namelen);} catch (CUDTException& e) {s_UDTUnited.setError(new CUDTException(e));return ERROR;} catch (bad_alloc&) {s_UDTUnited.setError(new CUDTException(3, 2, 0));return ERROR;} catch (...) {s_UDTUnited.setError(new CUDTException(-1, 0, 0));return ERROR;} }int CUDT::bind(UDTSOCKET u, UDPSOCKET udpsock) {try {return s_UDTUnited.bind(u, udpsock);} catch (CUDTException& e) {s_UDTUnited.setError(new CUDTException(e));return ERROR;} catch (bad_alloc&) {s_UDTUnited.setError(new CUDTException(3, 2, 0));return ERROR;} catch (...) {s_UDTUnited.setError(new CUDTException(-1, 0, 0));return ERROR;} }int bind(UDTSOCKET u, const struct sockaddr* name, int namelen) {return CUDT::bind(u, name, namelen); }int bind2(UDTSOCKET u, UDPSOCKET udpsock) {return CUDT::bind(u, udpsock); }

UDT主要提供了兩個bind接口,分別是UDT::bind()和,UDT::bind2()。UDT::bind()將一個UDT Socket與一個struct sockaddr對象描述的地址進行綁定,這需要UDT自己先創建相應的系統UDP socket,并將該系統UDP socket綁定到地址,然后把UDT Socket綁定到該系統UDP socket;UDT::bind2()則將一個UDT Socket直接與一個已經創建好的系統UDP socket進行綁定。

這兩個API的實現結構與UDT::socket()的實現結構基本一致,一樣是分為3層:UDT命名空間中提供了給應用程序調用的接口,可稱為UDT API或User API;User API調用CUDT API,這一層主要用來做錯誤處理,也就是捕獲動作實際執行過程中拋出的異常并保存起來,然后給應用程序使用;CUDT API調用CUDTUnited中API的實現。

這里主要來看CUDTUnited中bind()函數的實現。先來看CUDTUnited::bind(const UDTSOCKET u, const sockaddr* name, int namelen)函數的實現:

  • 調用CUDTUnited::locate(),根據SocketID,也就是UDT Socket handle在CUDTUnited的std::map<UDTSOCKET, CUDTSocket*> m_Sockets中找到對應的CUDTSocket結構(src/api.cpp):

    CUDTSocket* CUDTUnited::locate(const UDTSOCKET u) {CGuard cg(m_ControlLock);map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.find(u);if ((i == m_Sockets.end()) || (i->second->m_Status == CLOSED))return NULL;return i->second; }

    若找不到,則直接返回;否則,繼續執行。

  • 檢查CUDTSocket對象的狀態,如果當前的狀態不為INIT,直接拋異常退出;否則,繼續執行。

  • 根據本地IP地址的版本,檢查綁定到的目標地址的長度的有效性。IP版本是在UDT Socket創建時指定的。如果無效,則直接拋異常退出;否則,繼續執行。

  • 執行相應的CUDT的open()操作(src/core.cpp):

    void CUDT::open() {CGuard cg(m_ConnectionLock);// Initial sequence number, loss, acknowledgement, etc.m_iPktSize = m_iMSS - 28;m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;m_iEXPCount = 1;m_iBandwidth = 1;m_iDeliveryRate = 16;m_iAckSeqNo = 0;m_ullLastAckTime = 0;// trace informationm_StartTime = CTimer::getTime();m_llSentTotal = m_llRecvTotal = m_iSndLossTotal = m_iRcvLossTotal = m_iRetransTotal = m_iSentACKTotal =m_iRecvACKTotal = m_iSentNAKTotal = m_iRecvNAKTotal = 0;m_LastSampleTime = CTimer::getTime();m_llTraceSent = m_llTraceRecv = m_iTraceSndLoss = m_iTraceRcvLoss = m_iTraceRetrans = m_iSentACK = m_iRecvACK =m_iSentNAK = m_iRecvNAK = 0;m_llSndDuration = m_llSndDurationTotal = 0;// structures for queueif (NULL == m_pSNode)m_pSNode = new CSNode;m_pSNode->m_pUDT = this;m_pSNode->m_llTimeStamp = 1;m_pSNode->m_iHeapLoc = -1;if (NULL == m_pRNode)m_pRNode = new CRNode;m_pRNode->m_pUDT = this;m_pRNode->m_llTimeStamp = 1;m_pRNode->m_pPrev = m_pRNode->m_pNext = NULL;m_pRNode->m_bOnList = false;m_iRTT = 10 * m_iSYNInterval;m_iRTTVar = m_iRTT >> 1;m_ullCPUFrequency = CTimer::getCPUFrequency();// set up the timersm_ullSYNInt = m_iSYNInterval * m_ullCPUFrequency;// set minimum NAK and EXP timeout to 100msm_ullMinNakInt = 300000 * m_ullCPUFrequency;m_ullMinExpInt = 300000 * m_ullCPUFrequency;m_ullACKInt = m_ullSYNInt;m_ullNAKInt = m_ullMinNakInt;uint64_t currtime;CTimer::rdtsc(currtime);m_ullLastRspTime = currtime;m_ullNextACKTime = currtime + m_ullSYNInt;m_ullNextNAKTime = currtime + m_ullNAKInt;m_iPktCount = 0;m_iLightACKCount = 1;m_ullTargetTime = 0;m_ullTimeDiff = 0;// Now UDT is opened.m_bOpened = true; }

    在這個函數中,主要還是對變量的初始化,后面會再結合UDT可靠傳輸的具體機制,來說明這些變量的具體含義。

  • 執行updateMux()函數更新UDT Socket的多路復用器的相關信息,后面我們會再來詳細了解這個更新操作。

  • 將CUDTSocket對象的狀態更新為OPENED。

  • 將發送隊列的Channel的地址信息拷貝到本節點的s->m_pSelfAddr,m_pSelfAddrde對象的內存空間是在創建UDT Socket的CUDTUnited::newSocket()函數中分配的。
    后面會再來解釋UDT中Channel和多路復用器Multipexer的含義。

  • 返回0給調用者表示成功結束。

  • 再來看CUDTUnited::bind(UDTSOCKET u, UDPSOCKET udpsock)函數將UDT Socket綁定到一個已經創建好的系統UDP socket的過程:

  • 調用CUDTUnited::locate(),根據SocketID,也就是UDT Socket handle在CUDTUnited的std::map<UDTSOCKET, CUDTSocket*> m_Sockets中找到對應的CUDTSocket結構。若找不到,則直接返回;否則,繼續執行。

  • 檢查CUDTSocket對象的狀態,如果當前的狀態不為INIT,直接拋異常退出;否則,繼續執行。

  • 獲取系統UDP socket的網絡地址(含端口信息)。若獲取失敗則拋異常推出;否則,繼續執行。

  • 執行相應的CUDT的open()操作對一些變量進行初始化。

  • 執行updateMux()函數更新UDT Socket的多路復用器的相關信息,后面我們會再來詳細了解這個更新操作。

  • 將CUDTSocket對象的狀態更新為OPENED。

  • 將發送隊列的Channel的地址信息拷貝到本節點的s->m_pSelfAddr,m_pSelfAddrde對象的內存空間是在創建UDT Socket的CUDTUnited::newSocket()函數中分配的。
    后面會再來解釋UDT中Channel和多路復用器Multipexer的含義。

  • 返回0給調用者表示成功結束。m_MultiplexerLock

  • 總體來說,bind操作使的UDT Socket狀態機的狀態由INIT狀態,轉換到了OPENED狀態。

    CUDTUnited的這兩個bind()函數有如此多的重復邏輯,總讓人覺得,是有方法做進一步的抽象,以消除重復的邏輯,并使這兩個函數的實現都更加精簡的。

    UDT Socket與多路復用器的關聯

    bind()操作所做的最最重要的事大概就是將UDT Socket與多路復用器關聯,也就是CUDTUnited::updateMux()函數的執行了。為了后面能夠更清晰地說明更新多路復用器的操作過程,這里先說明一下UDT的多路復用器CMultiplexer、通道CChannel、發送隊列CSndQueue和接收隊列CRcvQueue的含義。

    UDT中的通道CChannel是系統UDP socket的一個封裝,它主要封裝了系統UDP socket handle,IP版本號,socket地址的長度,發送緩沖區的大小及接收緩沖區的大小等信息,并提供了用于操作 系統UDP socket進行數據收發或屬性設置等動作的函數。我們可以看一下這個class的定義(src/channel.h):

    class CChannel {public:CChannel();CChannel(int version);~CChannel();// Functionality:// Open a UDP channel.// Parameters:// 0) [in] addr: The local address that UDP will use.// Returned value:// None.void open(const sockaddr* addr = NULL);// Functionality:// Open a UDP channel based on an existing UDP socket.// Parameters:// 0) [in] udpsock: UDP socket descriptor.// Returned value:// None.void open(UDPSOCKET udpsock);// Functionality:// Disconnect and close the UDP entity.// Parameters:// None.// Returned value:// None.void close() const;// Functionality:// Get the UDP sending buffer size.// Parameters:// None.// Returned value:// Current UDP sending buffer size.int getSndBufSize();// Functionality:// Get the UDP receiving buffer size.// Parameters:// None.// Returned value:// Current UDP receiving buffer size.int getRcvBufSize();// Functionality:// Set the UDP sending buffer size.// Parameters:// 0) [in] size: expected UDP sending buffer size.// Returned value:// None.void setSndBufSize(int size);// Functionality:// Set the UDP receiving buffer size.// Parameters:// 0) [in] size: expected UDP receiving buffer size.// Returned value:// None.void setRcvBufSize(int size);// Functionality:// Query the socket address that the channel is using.// Parameters:// 0) [out] addr: pointer to store the returned socket address.// Returned value:// None.void getSockAddr(sockaddr* addr) const;// Functionality:// Send a packet to the given address.// Parameters:// 0) [in] addr: pointer to the destination address.// 1) [in] packet: reference to a CPacket entity.// Returned value:// Actual size of data sent.int sendto(const sockaddr* addr, CPacket& packet) const;// Functionality:// Receive a packet from the channel and record the source address.// Parameters:// 0) [in] addr: pointer to the source address.// 1) [in] packet: reference to a CPacket entity.// Returned value:// Actual size of data received.int recvfrom(sockaddr* addr, CPacket& packet) const;private:void setUDPSockOpt();private:int m_iIPversion; // IP versionint m_iSockAddrSize; // socket address structure size (pre-defined to avoid run-time test)UDPSOCKET m_iSocket; // socket descriptorint m_iSndBufSize; // UDP sending buffer sizeint m_iRcvBufSize; // UDP receiving buffer size };

    接收隊列CRcvQueue在初始化時會起一個線程,該線程在被停掉前,會不斷地由CChannel接收其它節點發送過來的UDP消息,可以將這個線程看做是listening在系統UDP 端口上的一個UDP Server。在接收到消息之后,該線程會根據消息的類型及目標 SocketID,把消息dispatch給不同的UDT Socket的CUDT對象。比如對于Handshake類型的消息就會dispatch給listening的UDT Socket的CUDT對象。后面我們研究具體的消息收發的時候再來仔細看這個類的設計。

    發送隊列CSndQueue,主要用于同步地向特定的目標發送一個UDT的Packet,或者在適當的時機異步地發送一些消息,它同樣會在初始化是起一個線程,用來執行異步地發送任務。這個class是UDT做可靠傳輸的一個比較關鍵的class,后面我們研究具體的消息收發的時候再來仔細看這個類的設計。

    UDT的多路復用器結構CMultiplexer將所有這些與特定的系統UDP socket相關聯的CChannel,CRcvQueue,CSndQueue包在一起,并描述了這個系統UDP socket收發的數據的一些公有屬性,有UDP 端口號,IP版本號,最大的包大小,引用計數,是否可復用,及用做哈希索引的ID等。可以看一下這個class的定義:

    struct CMultiplexer {CSndQueue* m_pSndQueue; // The sending queueCRcvQueue* m_pRcvQueue; // The receiving queueCChannel* m_pChannel; // The UDP channel for sending and receivingCTimer* m_pTimer; // The timerint m_iPort; // The UDP port number of this multiplexerint m_iIPversion; // IP versionint m_iMSS; // Maximum Segment Sizeint m_iRefCount; // number of UDT instances that are associated with this multiplexerbool m_bReusable; // if this one can be shared with othersint m_iID; // multiplexer IDCMultiplexer(): m_pSndQueue(NULL),m_pRcvQueue(NULL),m_pChannel(NULL),m_pTimer(NULL),m_iPort(0),m_iIPversion(0),m_iMSS(0),m_iRefCount(0),m_bReusable(true),m_iID(0) {} };

    接著來看CUDTUnited::updateMux()函數的定義(src/api.cpp):

    void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock) {CGuard cg(m_ControlLock);CMultiplexer m;if ((s->m_pUDT->m_bReuseAddr) && (NULL != addr)) {int port = (AF_INET == s->m_pUDT->m_iIPversion) ?ntohs(((sockaddr_in*) addr)->sin_port) : ntohs(((sockaddr_in6*) addr)->sin6_port);// find a reusable addressfor (map<int, CMultiplexer>::iterator i = m_mMultiplexer.begin(); i != m_mMultiplexer.end(); ++i) {if ((i->second.m_iIPversion == s->m_pUDT->m_iIPversion) && (i->second.m_iMSS == s->m_pUDT->m_iMSS)&& i->second.m_bReusable) {if (i->second.m_iPort == port) {// reuse the existing multiplexerm = i->second;break;}}}}// a new multiplexer is neededif (m.m_iID == 0) {m.m_iMSS = s->m_pUDT->m_iMSS;m.m_iIPversion = s->m_pUDT->m_iIPversion;m.m_bReusable = s->m_pUDT->m_bReuseAddr;m.m_iID = s->m_SocketID;m.m_pChannel = new CChannel(s->m_pUDT->m_iIPversion);m.m_pChannel->setSndBufSize(s->m_pUDT->m_iUDPSndBufSize);m.m_pChannel->setRcvBufSize(s->m_pUDT->m_iUDPRcvBufSize);try {if (NULL != udpsock)m.m_pChannel->open(*udpsock);elsem.m_pChannel->open(addr);} catch (CUDTException& e) {m.m_pChannel->close();delete m.m_pChannel;throw e;}sockaddr* sa =(AF_INET == s->m_pUDT->m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;m.m_pChannel->getSockAddr(sa);m.m_iPort = (AF_INET == s->m_pUDT->m_iIPversion) ?ntohs(((sockaddr_in*) sa)->sin_port) : ntohs(((sockaddr_in6*) sa)->sin6_port);if (AF_INET == s->m_pUDT->m_iIPversion)delete (sockaddr_in*) sa;elsedelete (sockaddr_in6*) sa;m.m_pTimer = new CTimer;m.m_pSndQueue = new CSndQueue;m.m_pSndQueue->init(m.m_pChannel, m.m_pTimer);m.m_pRcvQueue = new CRcvQueue;m.m_pRcvQueue->init(32, s->m_pUDT->m_iPayloadSize, m.m_iIPversion, 1024, m.m_pChannel, m.m_pTimer);m_mMultiplexer[m.m_iID] = m;}++m.m_iRefCount;s->m_pUDT->m_pSndQueue = m.m_pSndQueue;s->m_pUDT->m_pRcvQueue = m.m_pRcvQueue;s->m_iMuxID = m.m_iID; }
  • 這個函數首先會在已經創建的多路復用器的map中查找,看看是否存在 要與多路復用器關聯的UDT Socket可用的多路復用器存在。對于一個UDT Socket來說,UDT Socket本身網絡地址可復用,且某個多路復用器同時滿足它的CChannel的UDP端口號與UDT Socket要bind的目標UDP端口號匹配,它的CChannel的IP地址版本及MSS與UDT Socket的IP地址版本及MSS匹配,它本身可復用,則該多路復用器就是該UDT Socket可用的多路復用器。

  • 若在前面的步驟中,沒有找到可用的多路復用器,則創建一個。
    根據UDT Socket的MSS值,IP版本號,及地址的可復用性來初始化CMultiplexer的對應值。設置CMultiplexer的ID為UDT Socket的SocketID。也就是說,某個CMultiplexer的ID就是與它關聯的首個UDT Socket的SocketID。
    創建CChannel,設置系統UDP socket發送緩沖區及接收緩沖區的大小。并執行CChannel的open()操作。在CChannel::open()中如果不是綁定的已經創建好的系統UDP socket的話,它會自行創建系統UDP socket,并綁定到目標端口上。
    獲取CChannel實際綁定的UDP端口號,賦值給m.m_iPort。
    創建CTimer。
    創建并初始化CSndQueue。
    創建并初始化CRcvQueue。
    將新建的CMultiplexer放進std::map<int, CMultiplexer> m_mMultiplexer中。
    在CUDTUnited類定義中可以看到如下幾行:

    private:std::map<int, CMultiplexer> m_mMultiplexer; // UDP multiplexerpthread_mutex_t m_MultiplexerLock;

    原本設計似乎是要用m_MultiplexerLock來保證對m_mMultiplexer多線程的互斥訪問的,但卻沒有一個地方有用到這個m_MultiplexerLock。不知是發現保護全無必要,還是有所遺漏?

  • 將UDT Socket與多路復用器關聯起來,不管是找到的現成可用的,還是完全新創建的。這里可以看到所謂的將UDT Socket與多路復用器關聯的含義,即是讓CUDTSocket的CUDT對象m_pUDT的發送隊列和接收隊列指向CMultiplexer的發送隊列和接收隊列,設置CUDTSocket的多路復用器ID為CMultiplexer的ID m_iID,這樣后面CUDTSocket和CUDT就可以使用發送隊列CSndQueue和接收隊列CRcvQueue進行數據的收發,并可在需要的時候找到相關的CMultiplexer對象了。
    自此之后,CUDTSocket就有了可以用來收發數據的設施了。

  • 總結一下UDT bind的主要過程。UDT bind過程中,做的最主要的事情就是,根據一個已經創建好的UDT Socket的一些信息及要綁定的本地UDP端口,找到或創建一個多路復用器CMultiplexer,將UDT Socket與該CMultiplexer關聯,即設置CUDTSocket的多路復用器ID m_iMuxID為該CMultiplexer的ID,UDT Socket的發送隊列指針和接收隊列指針指向該CMultiplexer的發送隊列和接收隊列。后續UDT Socket就可以通過發送隊列/接收隊列及它們的CChannel進行數據的收發了。在這個過程中,UDT Socket狀態機完成了狀態由INIT到OPENED的轉變。

    listen過程

    在UDT Server端,對UDT Socket執行了bind操作之后,就可以執行listen來等待其它節點的連接了。這里來看下UDT listen的過程(src/api.cpp):

    int CUDTUnited::listen(const UDTSOCKET u, int backlog) {CUDTSocket* s = locate(u);if (NULL == s)throw CUDTException(5, 4, 0);CGuard cg(s->m_ControlLock);// do nothing if the socket is already listeningif (LISTENING == s->m_Status)return 0;// a socket can listen only if is in OPENED statusif (OPENED != s->m_Status)throw CUDTException(5, 5, 0);// listen is not supported in rendezvous connection setupif (s->m_pUDT->m_bRendezvous)throw CUDTException(5, 7, 0);if (backlog <= 0)throw CUDTException(5, 3, 0);s->m_uiBackLog = backlog;try {s->m_pQueuedSockets = new set<UDTSOCKET>;s->m_pAcceptSockets = new set<UDTSOCKET>;} catch (...) {delete s->m_pQueuedSockets;delete s->m_pAcceptSockets;throw CUDTException(3, 2, 0);}s->m_pUDT->listen();s->m_Status = LISTENING;return 0; }int CUDT::listen(UDTSOCKET u, int backlog) {try {return s_UDTUnited.listen(u, backlog);} catch (CUDTException& e) {s_UDTUnited.setError(new CUDTException(e));return ERROR;} catch (bad_alloc&) {s_UDTUnited.setError(new CUDTException(3, 2, 0));return ERROR;} catch (...) {s_UDTUnited.setError(new CUDTException(-1, 0, 0));return ERROR;} }int listen(UDTSOCKET u, int backlog) {return CUDT::listen(u, backlog); }

    這個API的實現同樣分為3層,UDT命名空間提供的直接給應用程序調用User API層,CUDT API層用于做異常處理,CUDTUnited具體實現API的功能。這里直接來分析CUDTUnited::listen()函數:

  • 調用CUDTUnited::locate(),查找UDT Socket對應的CUDTSocket結構。若找不到,則拋出異常直接返回;否則,繼續執行。

  • 檢查CUDTSocket對象的狀態,如果當前的狀態為LISTENING,則說明UDT Socket已經處于監聽狀態了,直接返回;若當前狀態不為OPENED,直接拋異常退出,否則,繼續執行。這就限制了只有經過了bind操作的UDT Socket才能監聽,也就是UDT Socket的狀態只能由OPENED轉為LISTENING。

  • 檢查是否是rendezvous的UDT Socket,若是則拋出異常推出。這確保在監聽的UDT Socket不能為rendezvous的。

  • 檢查傳入的backlog參數并進行設置。backlog參數用于指定Listening的UDT Socket同一時刻能夠處理的最大的等待連接的請求數。Listening的UDT Socket在收到連接請求的Handshake消息后,經過幾次來回確認,會創建新的UDT Socket以便于通過UDT::accept()函數返回給應用程序,用于與請求連接的發起方進行通信。backlog值用于限定,還沒有通過accept()返回的新創建的UDT Socket的個數。

  • 創建兩個UDTSOCKET的集合m_pQueuedSockets和m_pAcceptSockets,前者為Listening的UDT Socket的連接已經成功建立但還未通過UDT::accept()返回給應用程序的UDT Socket的集合;而后者則是已經通過UDT::accept()返回給應用程序的UDT Socket的集合。

  • 執行UDTSocket的CUDT的listen()操作,可以看一下CUDT listen動作的具體含義(src/core.cpp):

    void CUDT::listen() {CGuard cg(m_ConnectionLock);if (!m_bOpened)throw CUDTException(5, 0, 0);if (m_bConnecting || m_bConnected)throw CUDTException(5, 2, 0);// listen can be called more than onceif (m_bListening)return;// if there is already another socket listening on the same portif (m_pRcvQueue->setListener(this) < 0)throw CUDTException(5, 11, 0);m_bListening = true; }

    先是進行狀態的合法性檢查。
    然后執行m_pRcvQueue->setListener(this),將本CUDT設置為接收隊列的listener。
    最后設置CUDT的狀態m_bListening為true。
    這里可以看出CUDTSocket與CUDT是表示UDT Socket的兩層狀態機,它們的狀態之間有關聯,但又有各自的描述方法。這樣似乎大大增加了這個UDT Socket狀態管理的復雜度了。
    再來看一下CRcvQueue::setListener()(src/queue.cpp):

    int CRcvQueue::setListener(CUDT* u) {CGuard lslock(m_LSLock);if (NULL != m_pListener)return -1;m_pListener = u;return 0; }

    設置接收隊列CRcvQueue的Listener。

  • 設置CUDTSocket的狀態為LISTENING并返回。
    可以看到對于UDT::listen()的調用,促使UDT Socket的狀態由OPENED轉換為了LISTENING。UDT::listen()主要的作用就是 為與UDT Socket關聯的特定端口上的多路復用器CMultiplexer的接收隊列CRcvQueue設置listener,這個動作最主要的意義在于消息的dispatch。我們知道CMultiplexer的接收隊列CRcvQueue在創建、初始化時會起一個線程,不斷地試圖從網絡接收UDP消息,在收到消息之后,將消息dispatch給不同的UDT Socket處理,其中的Handshake等消息,就會被dispatch給listener CUDT處理。后面在具體研究消息的收發時會再來詳細研究這個過程。

  • accept過程

    UDT Server端在對listening執行了UDT::listen()操作之后,就可以執行UDT::accept()操作來等待其它節點連接自己了。來看一下UDT::accept()的執行過程(src/api.cpp):

    UDTSOCKET CUDTUnited::accept(const UDTSOCKET listen, sockaddr* addr, int* addrlen) {if ((NULL != addr) && (NULL == addrlen))throw CUDTException(5, 3, 0);CUDTSocket* ls = locate(listen);if (ls == NULL)throw CUDTException(5, 4, 0);// the "listen" socket must be in LISTENING statusif (LISTENING != ls->m_Status)throw CUDTException(5, 6, 0);// no "accept" in rendezvous connection setupif (ls->m_pUDT->m_bRendezvous)throw CUDTException(5, 7, 0);UDTSOCKET u = CUDT::INVALID_SOCK;bool accepted = false;// !!only one conection can be set up each time!! #ifndef WIN32while (!accepted) {pthread_mutex_lock(&(ls->m_AcceptLock));if ((LISTENING != ls->m_Status) || ls->m_pUDT->m_bBroken) {// This socket has been closed.accepted = true;} else if (ls->m_pQueuedSockets->size() > 0) {u = *(ls->m_pQueuedSockets->begin());ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u);ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin());accepted = true;} else if (!ls->m_pUDT->m_bSynRecving) {accepted = true;}if (!accepted && (LISTENING == ls->m_Status))pthread_cond_wait(&(ls->m_AcceptCond), &(ls->m_AcceptLock));if (ls->m_pQueuedSockets->empty())m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, false);pthread_mutex_unlock(&(ls->m_AcceptLock));} #elsewhile (!accepted){WaitForSingleObject(ls->m_AcceptLock, INFINITE);if (ls->m_pQueuedSockets->size() > 0){u = *(ls->m_pQueuedSockets->begin());ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u);ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin());accepted = true;}else if (!ls->m_pUDT->m_bSynRecving)accepted = true;ReleaseMutex(ls->m_AcceptLock);if (!accepted & (LISTENING == ls->m_Status))WaitForSingleObject(ls->m_AcceptCond, INFINITE);if ((LISTENING != ls->m_Status) || ls->m_pUDT->m_bBroken){// Send signal to other threads that are waiting to accept.SetEvent(ls->m_AcceptCond);accepted = true;}if (ls->m_pQueuedSockets->empty())m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, false);} #endifif (u == CUDT::INVALID_SOCK) {// non-blocking receiving, no connection availableif (!ls->m_pUDT->m_bSynRecving)throw CUDTException(6, 2, 0);// listening socket is closedthrow CUDTException(5, 6, 0);}if ((addr != NULL) && (addrlen != NULL)) {if (AF_INET == locate(u)->m_iIPversion)*addrlen = sizeof(sockaddr_in);else*addrlen = sizeof(sockaddr_in6);// copy address information of peer nodememcpy(addr, locate(u)->m_pPeerAddr, *addrlen);}return u; }UDTSOCKET CUDT::accept(UDTSOCKET u, sockaddr* addr, int* addrlen) {try {return s_UDTUnited.accept(u, addr, addrlen);} catch (CUDTException& e) {s_UDTUnited.setError(new CUDTException(e));return INVALID_SOCK;} catch (...) {s_UDTUnited.setError(new CUDTException(-1, 0, 0));return INVALID_SOCK;} }UDTSOCKET accept(UDTSOCKET u, struct sockaddr* addr, int* addrlen) {return CUDT::accept(u, addr, addrlen); }

    這個API實現的3層結構與UDT::bind(),UDT::listen()一樣,不再贅述。來看CUDTUnited::accept()的實現:

  • 調用CUDTUnited::locate(),查找UDT Socket對應的CUDTSocket結構。若找不到,則拋出異常直接返回;否則,繼續執行。

  • 檢查CUDTSocket對象的狀態。可見CUDTUnited::accept()操作要求相應的UDT Socket必須處于LISTENING狀態,且不能為Rendezvous模式。這個地方對于ls->m_pUDT->m_bRendezvous的檢查似乎有些多余了,在CUDTUnited::listen()中可以看到,如果UDT Socket處于Rendezvous模式的話,根本就不可能完成狀態由OPENED到LISTENING的轉換,因而對于UDT Socket LISTENING狀態的檢查已經足夠了。

  • 通過一個循環來等待其它節點的連接。這指的是,等待ls->m_pQueuedSockets中被放入為新的連接創建的UDT Socket。有新的連接時,CUDTUnited::accept()線程被喚醒,它會將UDT Socket從ls->m_pQueuedSockets中移到ls->m_pAcceptSockets,并準備將UDT Socket返回給調用者。當然CUDTUnited::accept()的等待過程結束的條件不只是有新連接進來,在Listening的UDT Socket被closed掉時,ls->m_pUDT->m_bBroken會被設置,UDT Socket的狀態也可能會發生變化,此時等待過程會結束;或者UDT Socket處于同步接收狀態,則無論是否有新連接,等待過程都會盡快結束。

  • 等待連接的過程意外退出,也就是在沒有等到新連接進來的情況下等待過程就退出了的情況下,拋出異常退出。如果UDT Socket處于同步接收狀態,拋出某個類型的異常,否則拋出另外一種類型的異常來表示UDT Socket被關閉了。這個地方的邏輯,向調用者展示的異常信息可能具有誤導性,比如一個同步接收的UDT Socket被關閉了,向調用者展示的信息似乎仍然表明,UDT Socket是由于同步接收的問題而沒有等到新連接進來才退出的。

  • 等到了新連接進來的情況下,將發起端的網絡地址拷貝給調用者。

  • 將新UDT Socket的SocketID返回給調用者。
    可以看到,UDT::accept()這個地方是一個典型的生產者-消費者模型。UDT::accept()是消費者,消費的對象是ls->m_pQueuedSockets中的UDT Socket。我們分析UDT::accept()函數的實現,只能看到這個關于生產-消費的故事的一半,另一半關于生產的故事則需要通過更仔細地分析CRcvQueue::worker()的執行來了解了。

  • 總結一下這幾個操作與Listening Socket狀態變化之間的關系,如下圖所示:


    Done.

    總結

    以上是生活随笔為你收集整理的UDT协议实现分析——bind、listen与accept的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。