日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

UDT协议实现分析——连接的建立

發布時間:2024/4/11 编程问答 43 豆豆
生活随笔 收集整理的這篇文章主要介紹了 UDT协议实现分析——连接的建立 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

UDT Server在執行UDT::listen()之后,就可以接受其它節點的連接請求了。這里我們研究一下UDT連接建立的過程。

連接的發起

來看連接的發起方。如前面我們看到的那樣,UDT Client創建一個Socket,可以將該Socket綁定到某個端口,也可以不綁定,然后就可以調用UDT::connect()將這個Socket連接到UDT Server了。來看UDT::connect()的定義(src/api.cpp):

int CUDTUnited::connect(const UDTSOCKET u, const sockaddr* name, int namelen) {CUDTSocket* s = locate(u);if (NULL == s)throw CUDTException(5, 4, 0);CGuard cg(s->m_ControlLock);// check the size of SOCKADDR structureif (AF_INET == s->m_iIPversion) {if (namelen != sizeof(sockaddr_in))throw CUDTException(5, 3, 0);} else {if (namelen != sizeof(sockaddr_in6))throw CUDTException(5, 3, 0);}// a socket can "connect" only if it is in INIT or OPENED statusif (INIT == s->m_Status) {if (!s->m_pUDT->m_bRendezvous) {s->m_pUDT->open();updateMux(s);s->m_Status = OPENED;} elsethrow CUDTException(5, 8, 0);} else if (OPENED != s->m_Status)throw CUDTException(5, 2, 0);// connect_complete() may be called before connect() returns.// So we need to update the status before connect() is called,// otherwise the status may be overwritten with wrong value (CONNECTED vs. CONNECTING).s->m_Status = CONNECTING;try {s->m_pUDT->connect(name);} catch (CUDTException &e) {s->m_Status = OPENED;throw e;}// record peer addressdelete s->m_pPeerAddr;if (AF_INET == s->m_iIPversion) {s->m_pPeerAddr = (sockaddr*) (new sockaddr_in);memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in));} else {s->m_pPeerAddr = (sockaddr*) (new sockaddr_in6);memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in6));}return 0; }int CUDT::connect(UDTSOCKET u, const sockaddr* name, int namelen) {try {return s_UDTUnited.connect(u, name, namelen);} catch (CUDTException &e) {s_UDTUnited.setError(new CUDTException(e));return ERROR;} catch (bad_alloc&) {s_UDTUnited.setError(new CUDTException(3, 2, 0));return ERROR;} catch (...) {s_UDTUnited.setError(new CUDTException(-1, 0, 0));return ERROR;} }int connect(UDTSOCKET u, const struct sockaddr* name, int namelen) {return CUDT::connect(u, name, namelen); }

UDT::connect() API實現的結構跟其它的API沒有太大的區別,不再贅述,直接來分析CUDTUnited::connect():

  • 調用CUDTUnited::locate(),查找UDT Socket對應的CUDTSocket結構。若找不到,則拋出異常直接返回;否則,繼續執行。

  • 根據UDT Socket的IP版本,檢查目標地址的有效性。若無效,則退出,否則繼續執行。

  • 檢查UDT Socket的狀態。確保只有處于INIT或OPENED狀態的UDT Socket才可以執行connect()操作。新創建的UDT Socket處于INIT狀態,bind之后UDT Socket處于OPENED狀態。如果UDT Socket處于INIT狀態,且不是Rendezvous模式,還會執行s->m_pUDT->open(),將UDT Socket與多路復用器CMultiplexer,然后將狀態置為OPENED。
    前面我們在bind的執行過程中有看到將UDT Socket與多路復用器CMultiplexer關聯的過程CUDTUnited::updateMux()。但這里執行的updateMux()的不同之處在于,它既沒有傳遞有效的系統UDP socket,也沒有傳遞有效的本地端口地址。回想updateMux()的實現,這兩個參數主要決定了CMultiplexer的CChannel將與哪個端口關聯。來看兩個CChannel::open()的實現(src/channel.cpp):

  • void CChannel::open(const sockaddr* addr) {// construct an socketm_iSocket = ::socket(m_iIPversion, SOCK_DGRAM, 0);#ifdef WIN32if (INVALID_SOCKET == m_iSocket) #elseif (m_iSocket < 0) #endifthrow CUDTException(1, 0, NET_ERROR);if (NULL != addr) {socklen_t namelen = m_iSockAddrSize;if (0 != ::bind(m_iSocket, addr, namelen))throw CUDTException(1, 3, NET_ERROR);} else {//sendto or WSASendTo will also automatically bind the socketaddrinfo hints;addrinfo* res;memset(&hints, 0, sizeof(struct addrinfo));hints.ai_flags = AI_PASSIVE;hints.ai_family = m_iIPversion;hints.ai_socktype = SOCK_DGRAM;if (0 != ::getaddrinfo(NULL, "0", &hints, &res))throw CUDTException(1, 3, NET_ERROR);if (0 != ::bind(m_iSocket, res->ai_addr, res->ai_addrlen))throw CUDTException(1, 3, NET_ERROR);::freeaddrinfo(res);}setUDPSockOpt(); }void CChannel::open(UDPSOCKET udpsock) {m_iSocket = udpsock;setUDPSockOpt(); }void CChannel::setUDPSockOpt() { #if defined(BSD) || defined(OSX)// BSD system will fail setsockopt if the requested buffer size exceeds system maximum valueint maxsize = 64000;if (0 != ::setsockopt(m_iSocket, SOL_SOCKET, SO_RCVBUF, (char*)&m_iRcvBufSize, sizeof(int)))::setsockopt(m_iSocket, SOL_SOCKET, SO_RCVBUF, (char*)&maxsize, sizeof(int));if (0 != ::setsockopt(m_iSocket, SOL_SOCKET, SO_SNDBUF, (char*)&m_iSndBufSize, sizeof(int)))::setsockopt(m_iSocket, SOL_SOCKET, SO_SNDBUF, (char*)&maxsize, sizeof(int)); #else// for other systems, if requested is greated than maximum, the maximum value will be automactally usedif ((0 != ::setsockopt(m_iSocket, SOL_SOCKET, SO_RCVBUF, (char*) &m_iRcvBufSize, sizeof(int)))|| (0 != ::setsockopt(m_iSocket, SOL_SOCKET, SO_SNDBUF, (char*) &m_iSndBufSize, sizeof(int))))throw CUDTException(1, 3, NET_ERROR); #endiftimeval tv;tv.tv_sec = 0; #if defined (BSD) || defined (OSX)// Known BSD bug as the day I wrote this code.// A small time out value will cause the socket to block forever.tv.tv_usec = 10000; #elsetv.tv_usec = 100; #endif#ifdef UNIX// Set non-blocking I/O// UNIX does not support SO_RCVTIMEOint opts = ::fcntl(m_iSocket, F_GETFL);if (-1 == ::fcntl(m_iSocket, F_SETFL, opts | O_NONBLOCK))throw CUDTException(1, 3, NET_ERROR); #elif WIN32DWORD ot = 1; //millisecondsif (0 != ::setsockopt(m_iSocket, SOL_SOCKET, SO_RCVTIMEO, (char *)&ot, sizeof(DWORD)))throw CUDTException(1, 3, NET_ERROR); #else// Set receiving time-out valueif (0 != ::setsockopt(m_iSocket, SOL_SOCKET, SO_RCVTIMEO, (char *) &tv, sizeof(timeval)))throw CUDTException(1, 3, NET_ERROR); #endif }

    可以看到CChannel::open()主要是把UDT的CChannel與一個系統的UDP socket關聯起來,它們總共處理了3中情況,一是調用者已經創建并綁定到了目標端口的系統UDP socket,這種最簡單,直接將傳遞進來的UDPSOCKET賦值給CChannel的m_iSocket,然后設置系統UDP socket的選項;二是傳遞進來了一個有效的本地端口地址,此時CChannel會自己先創建一個系統UDP socket,并將該socket綁定到傳進來的目標端口地址,一、二兩種情況正是UDT的兩個bind API的情況;三是既沒有有效的系統UDP socket,又沒有有效的本地端口地址傳進來,則會在創建了系統UDP socket之后,先再找一個可用的端口地址,然后將該socket綁定到找到的端口地址,這也就是UDT Socket沒有bind,直接connect的情況。

  • 將UDT Socket的狀態置為CONNECTING。

  • 執行s->m_pUDT->connect(name),連接UDT Server。如果連接失敗,有異常拋出,UDT Socket的狀態會退回到OPENED狀態,然后返回。在這個函數中會完成建立連接整個的網絡消息交互過程。

  • 將連接的目標地址復制到UDT Socket的Peer Address。然后返回0表示成功結束。

  • 在仔細地分析連接建立過程中的數據包交互之前,可以先粗略地看一下這個過程收發了幾個包,及各個包收發的順序。我們知道在UDT中,所有數據包的收發都是通過CChannel完成的,我們可以在CChannel::sendto()和CChannel::recvfrom()中加log來track這一過程。通過UDT提供的demo程序appserver和appclient(在app/目錄下)來研究。先在一個終端下執行appserver:

    xxxxxx@ThundeRobot:/media/data/downloads/hudt/app$ ./appserver server is ready at port: 9000

    改造appclient,使得它只發送一個比較小的數據包就結束,編譯后在另一個終端下執行,可以看到有如下的logs吐出來:

    xxxxxx@ThundeRobot:/media/data/downloads/hudt/app$ ./appclient 127.0.0.1 9000 To connect CRcvQueue::registerConnector Send packet 0Receive packet 364855723 unit->m_Packet.m_iID 364855723 Send packet 0Receive packet 364855723 unit->m_Packet.m_iID 364855723To send data. send 10 bytes Send packet 1020108693Receive packet 364855723 unit->m_Packet.m_iID 364855723 Send packet 1020108693Receive packet 364855723 unit->m_Packet.m_iID 364855723 Send packet 1020108693Receive packet 364855723 unit->m_Packet.m_iID 364855723 Send packet 1020108693

    在appclient運行的這段時間,在運行appserver的終端下的可以看到有如下的logs輸出:

    xxxxxx@ThundeRobot:/media/data/downloads/hudt/app$ ./appserver server is ready at port: 9000Receive packet 0 unit->m_Packet.m_iID 0 Send packet 364855723Receive packet 0 unit->m_Packet.m_iID 0 new CUDTSocket SocketID is 1020108693 PeerID 364855723 Send packet 364855723new connection: 127.0.0.1:59847Receive packet 1020108693 unit->m_Packet.m_iID 1020108693 Send packet 364855723Send packet 364855723Send packet 364855723Receive packet 1020108693 unit->m_Packet.m_iID 1020108693Receive packet 1020108693 unit->m_Packet.m_iID 1020108693Receive packet 1020108693 unit->m_Packet.m_iID 1020108693 recv:Connection was broken.

    可以看到,UDT Client端先發送了一個消息MSG1給UDT Server;UDT Server端收到消息MSG1之后,回了一個消息MSG2給UDT Client;UDT Client收到消息MSG2,又回了一個消息MSG3給UDT Server;UDT Server收到消息MSG3后又回了一個消息MSG4給UDT Client,然后從UDT::accept()返回,自此UDT Server認為一個連接已經成功建立;UDT Client則在收到消息MSG4后,從UDT::connect()返回,并自此認為連接已成功建立,可以進行數據的收發了。用一幅圖來描述這個過程:


    150954_myfS_919237.png

    至于MSG1、2、3、4的具體格式及內容,則留待我們后面來具體分析了。

    接著來看連接建立過程消息交互具體的實現,也就是CUDT::connect()函數:

    void CUDT::connect(const sockaddr* serv_addr) {CGuard cg(m_ConnectionLock);if (!m_bOpened)throw CUDTException(5, 0, 0);if (m_bListening)throw CUDTException(5, 2, 0);if (m_bConnecting || m_bConnected)throw CUDTException(5, 2, 0);// record peer/server addressdelete m_pPeerAddr;m_pPeerAddr = (AF_INET == m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;memcpy(m_pPeerAddr, serv_addr, (AF_INET == m_iIPversion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6));// register this socket in the rendezvous queue// RendezevousQueue is used to temporarily store incoming handshake, non-rendezvous connections also require this functionuint64_t ttl = 3000000;if (m_bRendezvous)ttl *= 10;ttl += CTimer::getTime();m_pRcvQueue->registerConnector(m_SocketID, this, m_iIPversion, serv_addr, ttl);// This is my current configurationsm_ConnReq.m_iVersion = m_iVersion;m_ConnReq.m_iType = m_iSockType;m_ConnReq.m_iMSS = m_iMSS;m_ConnReq.m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize) ? m_iRcvBufSize : m_iFlightFlagSize;m_ConnReq.m_iReqType = (!m_bRendezvous) ? 1 : 0;m_ConnReq.m_iID = m_SocketID;CIPAddress::ntop(serv_addr, m_ConnReq.m_piPeerIP, m_iIPversion);// Random Initial Sequence Numbersrand((unsigned int) CTimer::getTime());m_iISN = m_ConnReq.m_iISN = (int32_t) (CSeqNo::m_iMaxSeqNo * (double(rand()) / RAND_MAX));m_iLastDecSeq = m_iISN - 1;m_iSndLastAck = m_iISN;m_iSndLastDataAck = m_iISN;m_iSndCurrSeqNo = m_iISN - 1;m_iSndLastAck2 = m_iISN;m_ullSndLastAck2Time = CTimer::getTime();// Inform the server my configurations.CPacket request;char* reqdata = new char[m_iPayloadSize];request.pack(0, NULL, reqdata, m_iPayloadSize);// ID = 0, connection requestrequest.m_iID = 0;int hs_size = m_iPayloadSize;m_ConnReq.serialize(reqdata, hs_size);request.setLength(hs_size);m_pSndQueue->sendto(serv_addr, request);m_llLastReqTime = CTimer::getTime();m_bConnecting = true;// asynchronous connect, return immediatelyif (!m_bSynRecving) {delete[] reqdata;return;}// Wait for the negotiated configurations from the peer side.CPacket response;char* resdata = new char[m_iPayloadSize];response.pack(0, NULL, resdata, m_iPayloadSize);CUDTException e(0, 0);while (!m_bClosing) {// avoid sending too many requests, at most 1 request per 250msif (CTimer::getTime() - m_llLastReqTime > 250000) {m_ConnReq.serialize(reqdata, hs_size);request.setLength(hs_size);if (m_bRendezvous)request.m_iID = m_ConnRes.m_iID;m_pSndQueue->sendto(serv_addr, request);m_llLastReqTime = CTimer::getTime();}response.setLength(m_iPayloadSize);if (m_pRcvQueue->recvfrom(m_SocketID, response) > 0) {if (connect(response) <= 0)break;// new request/response should be sent out immediately on receving a responsem_llLastReqTime = 0;}if (CTimer::getTime() > ttl) {// timeoute = CUDTException(1, 1, 0);break;}}delete[] reqdata;delete[] resdata;if (e.getErrorCode() == 0) {if (m_bClosing) // if the socket is closed before connection...e = CUDTException(1);else if (1002 == m_ConnRes.m_iReqType) // connection request rejectede = CUDTException(1, 2, 0);else if ((!m_bRendezvous) && (m_iISN != m_ConnRes.m_iISN)) // secuity checke = CUDTException(1, 4, 0);}if (e.getErrorCode() != 0)throw e; }

    可以看到,在這個函數中主要完成了如下的這樣一些事情:

  • 檢查CUDT的狀態。確保只有已經與多路復用器關聯,即處于OPENED狀態的UDT Socket才能執行CUDT::connect()操作。如前面看到的,bind操作可以使UDT Socket進入OPENED狀態。對于沒有進行過bind的UDT Socket,CUDTUnited::connect()會做這樣的保證。

  • 拷貝目標網絡地址為UDT Socket的PeerAddr。

  • 執行m_pRcvQueue->registerConnector()向接收隊列注冊Connector。來看這個函數的執行過程(src/queue.cpp):

  • void CRendezvousQueue::insert(const UDTSOCKET& id, CUDT* u, int ipv, const sockaddr* addr, uint64_t ttl) {CGuard vg(m_RIDVectorLock);CRL r;r.m_iID = id;r.m_pUDT = u;r.m_iIPversion = ipv;r.m_pPeerAddr = (AF_INET == ipv) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;memcpy(r.m_pPeerAddr, addr, (AF_INET == ipv) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6));r.m_ullTTL = ttl;m_lRendezvousID.push_back(r); }void CRcvQueue::registerConnector(const UDTSOCKET& id, CUDT* u, int ipv, const sockaddr* addr, uint64_t ttl) {m_pRendezvousQueue->insert(id, u, ipv, addr, ttl); }

    可以看到,在這個函數中,主要是向接收隊列CRcvQueue的CRendezvousQueue m_pRendezvousQueue中插入了一個CRL結構。那CRendezvousQueue又是個什么東西呢?來看它的定義(src/queue.h):

    class CRendezvousQueue {public:CRendezvousQueue();~CRendezvousQueue();public:void insert(const UDTSOCKET& id, CUDT* u, int ipv, const sockaddr* addr, uint64_t ttl);void remove(const UDTSOCKET& id);CUDT* retrieve(const sockaddr* addr, UDTSOCKET& id);void updateConnStatus();private:struct CRL {UDTSOCKET m_iID; // UDT socket ID (self)CUDT* m_pUDT; // UDT instanceint m_iIPversion; // IP versionsockaddr* m_pPeerAddr; // UDT sonnection peer addressuint64_t m_ullTTL; // the time that this request expires};std::list<CRL> m_lRendezvousID; // The sockets currently in rendezvous modepthread_mutex_t m_RIDVectorLock; };

    可以看到,它就是一個簡單的容器,提供的操作也是常規的插入、移除及檢索等操作:

    void CRendezvousQueue::remove(const UDTSOCKET& id) {CGuard vg(m_RIDVectorLock);for (list<CRL>::iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++i) {if (i->m_iID == id) {if (AF_INET == i->m_iIPversion)delete (sockaddr_in*) i->m_pPeerAddr;elsedelete (sockaddr_in6*) i->m_pPeerAddr;m_lRendezvousID.erase(i);return;}} }CUDT* CRendezvousQueue::retrieve(const sockaddr* addr, UDTSOCKET& id) {CGuard vg(m_RIDVectorLock);// TODO: optimize searchfor (list<CRL>::iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++i) {if (CIPAddress::ipcmp(addr, i->m_pPeerAddr, i->m_iIPversion) && ((0 == id) || (id == i->m_iID))) {id = i->m_iID;return i->m_pUDT;}}return NULL; }

    那接收隊列CRcvQueue是用這個隊列來做什么的呢?這主要與接收隊列CRcvQueue的消息dispatch機制有關。在接收隊列CRcvQueue的worker線程中,接收到一條消息之后,它會根據消息的目標SocketID,及發送端的地址等信息,將消息以不同的方式進行dispatch,m_pRendezvousQueue中的CUDT是其中的一類dispatch目標。后面我們在研究消息接收時,會再來仔細研究接收隊列CRcvQueue的worker線程及m_pRendezvousQueue。

  • 構造 連接請求 消息CHandShake m_ConnReq。可以看一下CHandShake的定義(src/packet.h):
  • class CHandShake {public:CHandShake();int serialize(char* buf, int& size);int deserialize(const char* buf, int size);public:static const int m_iContentSize; // Size of hand shake datapublic:int32_t m_iVersion; // UDT versionint32_t m_iType; // UDT socket typeint32_t m_iISN; // random initial sequence numberint32_t m_iMSS; // maximum segment sizeint32_t m_iFlightFlagSize; // flow control window sizeint32_t m_iReqType; // connection request type: 1: regular connection request, 0: rendezvous connection request, -1/-2: responseint32_t m_iID; // socket IDint32_t m_iCookie; // cookieuint32_t m_piPeerIP[4]; // The IP address that the peer's UDP port is bound to };

    CHandShake的m_iID為發起端UDT Socket的SocketID,請求類型m_iReqType將被設置為了1,還設置了m_iMSS用于協商MSS值。CHandShake的構造函數會初始化所有的字段(src/packet.cpp):

    CHandShake::CHandShake(): m_iVersion(0),m_iType(0),m_iISN(0),m_iMSS(0),m_iFlightFlagSize(0),m_iReqType(0),m_iID(0),m_iCookie_iCookie(0) {for (int i = 0; i < 4; ++i)m_piPeerIP[i] = 0; }

    可以看到m_iCookie被初始化為了0。但注意在這里,CHandShake m_ConnReq的構造過程中,m_iCookie并沒有被賦予新值。

  • 隨機初始化序列號Sequence Number。

  • 創建一個CPacket結構request,為它創建大小為m_iPayloadSize的緩沖區,將該緩沖區pack進CPacket結構,并專門把request.m_iID,也就是這個包發送的目的UDT SocketID,設置為0。

  • m_iPayloadSize的值根據UDT Socket創建者的不同,在不同的地方設置。由應用程序創建的UDT Socket在CUDT::open()中設置,比如Listening的UDT Socket在bind時會執行CUDT::open(),或者連接UDT Server但沒有執行過bind操作的UDT Socket會在CUDTUnited::connect()中執行CUDT::open();UDT Server中由Listening的UDT Socket收到連接請求時創建的UDT Socket,在CUDT::connect(const sockaddr peer, CHandShake hs)中初設置;發起連接的UDT Socket還會在CUDT::connect(const CPacket& response)中再次更新這個值。但這個值總是被設置為m_iPktSize - CPacket::m_iPktHdrSize,CPacket::m_iPktHdrSize為固定的UDT Packet Header大小16。

    m_iPktSize總是與m_iPayloadSize在相同的地方設置,被設置為m_iMSS - 28。m_iMSS,MSS(Maximum Segment Size,最大報文長度),這里是UDT協議定義的一個選項,用于在UDT連接建立時,收發雙方協商通信時每一個報文段所能承載的最大數據長度。在CUDT對象創建時被初始化為1500,但可以通過UDT::setsockopt()進行設置。

    這里先來看一下CPacket的結構(src/packet.h):

    class CPacket {friend class CChannel;friend class CSndQueue;friend class CRcvQueue;public:int32_t& m_iSeqNo; // alias: sequence numberint32_t& m_iMsgNo; // alias: message numberint32_t& m_iTimeStamp; // alias: timestampint32_t& m_iID; // alias: socket IDchar*& m_pcData; // alias: data/control informationstatic const int m_iPktHdrSize; // packet header sizepublic:CPacket();~CPacket();// Functionality:// Get the payload or the control information field length.// Parameters:// None.// Returned value:// the payload or the control information field length.int getLength() const;// Functionality:// Set the payload or the control information field length.// Parameters:// 0) [in] len: the payload or the control information field length.// Returned value:// None.void setLength(int len);// Functionality:// Pack a Control packet.// Parameters:// 0) [in] pkttype: packet type filed.// 1) [in] lparam: pointer to the first data structure, explained by the packet type.// 2) [in] rparam: pointer to the second data structure, explained by the packet type.// 3) [in] size: size of rparam, in number of bytes;// Returned value:// None.void pack(int pkttype, void* lparam = NULL, void* rparam = NULL, int size = 0);// Functionality:// Read the packet vector.// Parameters:// None.// Returned value:// Pointer to the packet vector.iovec* getPacketVector();// Functionality:// Read the packet flag.// Parameters:// None.// Returned value:// packet flag (0 or 1).int getFlag() const;// Functionality:// Read the packet type.// Parameters:// None.// Returned value:// packet type filed (000 ~ 111).int getType() const;// Functionality:// Read the extended packet type.// Parameters:// None.// Returned value:// extended packet type filed (0x000 ~ 0xFFF).int getExtendedType() const;// Functionality:// Read the ACK-2 seq. no.// Parameters:// None.// Returned value:// packet header field (bit 16~31).int32_t getAckSeqNo() const;// Functionality:// Read the message boundary flag bit.// Parameters:// None.// Returned value:// packet header field [1] (bit 0~1).int getMsgBoundary() const;// Functionality:// Read the message inorder delivery flag bit.// Parameters:// None.// Returned value:// packet header field [1] (bit 2).bool getMsgOrderFlag() const;// Functionality:// Read the message sequence number.// Parameters:// None.// Returned value:// packet header field [1] (bit 3~31).int32_t getMsgSeq() const;// Functionality:// Clone this packet.// Parameters:// None.// Returned value:// Pointer to the new packet.CPacket* clone() const;protected:uint32_t m_nHeader[4]; // The 128-bit header fieldiovec m_PacketVector[2]; // The 2-demension vector of UDT packet [header, data]int32_t __pad;protected:CPacket& operator=(const CPacket&); };

    它的數據成員是有4個uint32_t元素的數組m_nHeader,描述UDT Packet的Header,和有兩個元素的iovec數組m_PacketVector。另外的幾個引用則主要是為了方便對這些數據成員的訪問,看下CPacket的構造函數就一目了然了(src/packet.cpp):

    // Set up the aliases in the constructure CPacket::CPacket(): m_iSeqNo((int32_t&) (m_nHeader[0])),m_iMsgNo((int32_t&) (m_nHeader[1])),m_iTimeStamp((int32_t&) (m_nHeader[2])),m_iID((int32_t&) (m_nHeader[3])),m_pcData((char*&) (m_PacketVector[1].iov_base)),__pad() {for (int i = 0; i < 4; ++i)m_nHeader[i] = 0;m_PacketVector[0].iov_base = (char *) m_nHeader;m_PacketVector[0].iov_len = CPacket::m_iPktHdrSize;m_PacketVector[1].iov_base = NULL;m_PacketVector[1].iov_len = 0; }

    注意m_PacketVector的第一個元素指向了m_nHeader。

    在CPacket::pack()中:

    void CPacket::pack(int pkttype, void* lparam, void* rparam, int size) {// Set (bit-0 = 1) and (bit-1~15 = type)m_nHeader[0] = 0x80000000 | (pkttype << 16);// Set additional information and control information fieldswitch (pkttype) {case 2: //0010 - Acknowledgement (ACK)// ACK packet seq. no.if (NULL != lparam)m_nHeader[1] = *(int32_t *) lparam;// data ACK seq. no.// optional: RTT (microsends), RTT variance (microseconds) advertised flow window size (packets), and estimated link capacity (packets per second)m_PacketVector[1].iov_base = (char *) rparam;m_PacketVector[1].iov_len = size;break;case 6: //0110 - Acknowledgement of Acknowledgement (ACK-2)// ACK packet seq. no.m_nHeader[1] = *(int32_t *) lparam;// control info field should be none// but "writev" does not allow thism_PacketVector[1].iov_base = (char *) &__pad; //NULL;m_PacketVector[1].iov_len = 4; //0;break;case 3: //0011 - Loss Report (NAK)// loss listm_PacketVector[1].iov_base = (char *) rparam;m_PacketVector[1].iov_len = size;break;case 4: //0100 - Congestion Warning// control info field should be none// but "writev" does not allow thism_PacketVector[1].iov_base = (char *) &__pad; //NULL;m_PacketVector[1].iov_len = 4; //0;break;case 1: //0001 - Keep-alive// control info field should be none// but "writev" does not allow thism_PacketVector[1].iov_base = (char *) &__pad; //NULL;m_PacketVector[1].iov_len = 4; //0;break;case 0: //0000 - Handshake// control info filed is handshake infom_PacketVector[1].iov_base = (char *) rparam;m_PacketVector[1].iov_len = size; //sizeof(CHandShake);break;case 5: //0101 - Shutdown// control info field should be none// but "writev" does not allow thism_PacketVector[1].iov_base = (char *) &__pad; //NULL;m_PacketVector[1].iov_len = 4; //0;break;case 7: //0111 - Message Drop Request// msg idm_nHeader[1] = *(int32_t *) lparam;//first seq no, last seq nom_PacketVector[1].iov_base = (char *) rparam;m_PacketVector[1].iov_len = size;break;case 8: //1000 - Error Signal from the Peer Side// Error typem_nHeader[1] = *(int32_t *) lparam;// control info field should be none// but "writev" does not allow thism_PacketVector[1].iov_base = (char *) &__pad; //NULL;m_PacketVector[1].iov_len = 4; //0;break;case 32767: //0x7FFF - Reserved for user defined control packets// for extended control packet// "lparam" contains the extended type information for bit 16 - 31// "rparam" is the control informationm_nHeader[0] |= *(int32_t *) lparam;if (NULL != rparam) {m_PacketVector[1].iov_base = (char *) rparam;m_PacketVector[1].iov_len = size;} else {m_PacketVector[1].iov_base = (char *) &__pad;m_PacketVector[1].iov_len = 4;}break;default:break;} }

    在CPacket::pack()中,首先將m_nHeader[0],也就是m_iSeqNo的bit-0設為1表示這是一個控制包,將bit-1~15設置為消息的類型,然后根據消息的不同類型進行不同的處理。對于Handshake消息,其pkttype為0,這里主要關注pkttype為0的case。可見它就是讓m_PacketVector[1]指向前面創建的緩沖區。

  • 將Handshake消息m_ConnReq序列化進前面創建的緩沖區,并正確地設置CPacket request的長度:
  • void CPacket::setLength(int len) {m_PacketVector[1].iov_len = len; }int CHandShake::serialize(char* buf, int& size) {if (size < m_iContentSize)return -1;int32_t* p = (int32_t*) buf;*p++ = m_iVersion;*p++ = m_iType;*p++ = m_iISN;*p++ = m_iMSS;*p++ = m_iFlightFlagSize;*p++ = m_iReqType;*p++ = m_iID;*p++ = m_iCookie;for (int i = 0; i < 4; ++i)*p++ = m_piPeerIP[i];size = m_iContentSize;return 0; }

    序列化時,會將Handshake消息m_ConnReq全部的內容拷貝進緩沖區。略感奇怪,這個地方竟然完全沒有顧及字節序的問題。

  • 調用發送隊列的sendto()函數,向目標地址發送消息:
  • int CSndQueue::sendto(const sockaddr* addr, CPacket& packet) {// send out the packet immediately (high priority), this is a control packetm_pChannel->sendto(addr, packet);return packet.getLength(); }

    CSndQueue的sendto()函數直接調用了CChannel::sendto():

    int CChannel::sendto(const sockaddr* addr, CPacket& packet) const {cout << "CChannel send packet " << packet.m_iID << endl << endl;// convert control information into network orderif (packet.getFlag())for (int i = 0, n = packet.getLength() / 4; i < n; ++i)*((uint32_t *) packet.m_pcData + i) = htonl(*((uint32_t *) packet.m_pcData + i));// convert packet header into network order//for (int j = 0; j < 4; ++ j)// packet.m_nHeader[j] = htonl(packet.m_nHeader[j]);uint32_t* p = packet.m_nHeader;for (int j = 0; j < 4; ++j) {*p = htonl(*p);++p;}#ifndef WIN32msghdr mh;mh.msg_name = (sockaddr*) addr;mh.msg_namelen = m_iSockAddrSize;mh.msg_iov = (iovec*) packet.m_PacketVector;mh.msg_iovlen = 2;mh.msg_control = NULL;mh.msg_controllen = 0;mh.msg_flags = 0;int res = ::sendmsg(m_iSocket, &mh, 0); #elseDWORD size = CPacket::m_iPktHdrSize + packet.getLength();int addrsize = m_iSockAddrSize;int res = ::WSASendTo(m_iSocket, (LPWSABUF)packet.m_PacketVector, 2, &size, 0, addr, addrsize, NULL, NULL);res = (0 == res) ? size : -1; #endif// convert back into local host order//for (int k = 0; k < 4; ++ k)// packet.m_nHeader[k] = ntohl(packet.m_nHeader[k]);p = packet.m_nHeader;for (int k = 0; k < 4; ++k) {*p = ntohl(*p);++p;}if (packet.getFlag()) {for (int l = 0, n = packet.getLength() / 4; l < n; ++l)*((uint32_t *) packet.m_pcData + l) = ntohl(*((uint32_t *) packet.m_pcData + l));}return res; }

    在CChannel::sendto()中會處理Header的字節序問題。

    這里總結一下,UDT Client向UDT Server發送的連接建立請求消息的內容:消息主要分為兩個部分一個是消息的Header,一個是消息的Content。Header為4個uint32_t類型變量,從前到后這4個變量的含義分別為sequence number,message number,timestamp和目標SocketID。就Handshake而言,sequence number的最高位,也就是bit-0為1,表示這是一個控制消息,bit-1~15為pkttype 0,其它位為0;message number及timestamp均為0,目標SocketID為0。

    Content部分,總共48個字節,主要用于進行連接的協商,如MSS等,具體可以看CHandShake。

  • 檢查是否是同步接收模式。如果不是的話,則delete掉前面為request CPacket的CHandShake創建的緩沖區并退出。后面與UDT Server端進一步的消息交互會有接收隊列等幫忙異步地推動。否則繼續執行。值得一提的是,CUDT在其構造函數中,會將m_bSynRecving置為true,但在拷貝構造函數中,則會繼承傳入的值。但這個值如同MSS值一樣,也可以通過UDT::setOpt()設置。也就是說由應用程序創建的UDT Socket默認處于同步接收模式,比如Listening的UDT Socket和發起連接的UDT Socket,但可以自行設置,由Listening的UDT Socket在接收到連接建立請求時創建的UDT Socket,則會繼承Listening UDT Socket的對應值。
  • 我們暫時先看SynRecving模式,也就是默認模式下的UDT Socket的行為。

  • 創建一個CPacket response,同樣為它創建一個大小為m_iPayloadSize的緩沖區以存放數據,并將緩沖區pack進response中。這個CPacket response會被用來存放從UDT Server發回的相應的信息。

  • 進入一個循環執行后續的握手動作,及消息的超時重傳等動作。可以將這個循環看做由3個部分組成。

  • 循環開始的地方是一段發送消息的代碼,在這段代碼中,其實做了兩個事情,或者說可能會發送兩種類型的消息,一是第一個握手消息的超時重傳,二是第二個握手消息的發送及超時重傳。看上去發送的都是CHandShake m_ConnReq,但在接收到第一個握手消息的響應之后,這個結構的某些成員會根據響應而被修改。注意,發送第一個握手消息之后,首次進入循環,將會跳過這個部分。

    之后的第二部分,主要用于接收響應,第一個握手消息的響應及第二個握手消息的響應。來看CRcvQueue::recvfrom()(src/queue.cpp):

    int CRcvQueue::recvfrom(int32_t id, CPacket& packet) {CGuard bufferlock(m_PassLock);map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);if (i == m_mBuffer.end()) { #ifndef WIN32uint64_t now = CTimer::getTime();timespec timeout;timeout.tv_sec = now / 1000000 + 1;timeout.tv_nsec = (now % 1000000) * 1000;pthread_cond_timedwait(&m_PassCond, &m_PassLock, &timeout); #elseReleaseMutex(m_PassLock);WaitForSingleObject(m_PassCond, 1000);WaitForSingleObject(m_PassLock, INFINITE); #endifi = m_mBuffer.find(id);if (i == m_mBuffer.end()) {packet.setLength(-1);return -1;}}// retrieve the earliest packetCPacket* newpkt = i->second.front();if (packet.getLength() < newpkt->getLength()) {packet.setLength(-1);return -1;}// copy packet contentmemcpy(packet.m_nHeader, newpkt->m_nHeader, CPacket::m_iPktHdrSize);memcpy(packet.m_pcData, newpkt->m_pcData, newpkt->getLength());packet.setLength(newpkt->getLength());delete[] newpkt->m_pcData;delete newpkt;// remove this message from queue,// if no more messages left for this socket, release its data structurei->second.pop();if (i->second.empty())m_mBuffer.erase(i);return packet.getLength(); }

    這也是一個生產者-消費者模型,在這里就如同listen的過程一樣,也只能看到這個生產與消費的故事的一半,即消費的那一半。生產者也是RcvQueue的worker線程。這個地方會等待著消息的到來,但也不會無限制的等待,可以看到,這里接收消息的等待時間大概為1s。這里是在等待一個CPacket隊列的出現,也就是m_mBuffer中目標UDT Socket的CPacket隊列。這里會從這個隊列中取出第一個packet返回給調用者。如果隊列被取空了,會直接將這個隊列從m_mBuffer中移除出去。

    循環的第三部分是整個連接建立消息交互過程的超時處理,可以看到,非Rendezvous模式下超時時間為3s,Rendezvous模式下,超時時間則會延長十倍。

    CUDT::connect()執行到接收第一個握手消息的相應時,連接建立請求的發起也算是基本完成了。下面來看UDT Server端收到這個消息時是如何處理的。

    UDT Server對首個Handshake消息的處理

    來看UDT Server端收到這個消息時是如何處理的。如我們前面在 UDT協議實現分析——bind、listen與accept 一文中了解到的,Listening的UDT Socket會在UDT::accept()中等待連接請求進來,那是一個生產者與消費者的故事,UDT::accept()是生產者,接收隊列RcvQueue的worker線程是消費者。

    我們這就來仔細地看一下RcvQueue的worker線程,當然重點會關注對于Handshake消息,也就是目標SocketID為0,pkttype為0的packet的處理(src/queue.cpp):

    #ifndef WIN32 void* CRcvQueue::worker(void* param) #elseDWORD WINAPI CRcvQueue::worker(LPVOID param) #endif{CRcvQueue* self = (CRcvQueue*) param;sockaddr* addr =(AF_INET == self->m_UnitQueue.m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;CUDT* u = NULL;int32_t id;while (!self->m_bClosing) { #ifdef NO_BUSY_WAITINGself->m_pTimer->tick(); #endif// check waiting list, if new socket, insert it to the listwhile (self->ifNewEntry()) {CUDT* ne = self->getNewEntry();if (NULL != ne) {self->m_pRcvUList->insert(ne);self->m_pHash->insert(ne->m_SocketID, ne);}}// find next available slot for incoming packetCUnit* unit = self->m_UnitQueue.getNextAvailUnit();if (NULL == unit) {// no space, skip this packetCPacket temp;temp.m_pcData = new char[self->m_iPayloadSize];temp.setLength(self->m_iPayloadSize);self->m_pChannel->recvfrom(addr, temp);delete[] temp.m_pcData;goto TIMER_CHECK;}unit->m_Packet.setLength(self->m_iPayloadSize);// reading next incoming packet, recvfrom returns -1 is nothing has been receivedif (self->m_pChannel->recvfrom(addr, unit->m_Packet) < 0)goto TIMER_CHECK;id = unit->m_Packet.m_iID;// ID 0 is for connection request, which should be passed to the listening socket or rendezvous socketsif (0 == id) {if (NULL != self->m_pListener)self->m_pListener->listen(addr, unit->m_Packet);else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id))) {// asynchronous connect: call connect here// otherwise wait for the UDT socket to retrieve this packetif (!u->m_bSynRecving)u->connect(unit->m_Packet);elseself->storePkt(id, unit->m_Packet.clone());}} else if (id > 0) {if (NULL != (u = self->m_pHash->lookup(id))) {if (CIPAddress::ipcmp(addr, u->m_pPeerAddr, u->m_iIPversion)) {if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing) {if (0 == unit->m_Packet.getFlag())u->processData(unit);elseu->processCtrl(unit->m_Packet);u->checkTimers();self->m_pRcvUList->update(u);}}} else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id))) {if (!u->m_bSynRecving)u->connect(unit->m_Packet);elseself->storePkt(id, unit->m_Packet.clone());}}TIMER_CHECK:// take care of the timing event for all UDT socketsuint64_t currtime;CTimer::rdtsc(currtime);CRNode* ul = self->m_pRcvUList->m_pUList;uint64_t ctime = currtime - 100000 * CTimer::getCPUFrequency();while ((NULL != ul) && (ul->m_llTimeStamp < ctime)) {CUDT* u = ul->m_pUDT;if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing) {u->checkTimers();self->m_pRcvUList->update(u);} else {// the socket must be removed from Hash table first, then RcvUListself->m_pHash->remove(u->m_SocketID);self->m_pRcvUList->remove(u);u->m_pRNode->m_bOnList = false;}ul = self->m_pRcvUList->m_pUList;}// Check connection requests status for all sockets in the RendezvousQueue.self->m_pRendezvousQueue->updateConnStatus();}if (AF_INET == self->m_UnitQueue.m_iIPversion)delete (sockaddr_in*) addr;elsedelete (sockaddr_in6*) addr;#ifndef WIN32return NULL; #elseSetEvent(self->m_ExitCond);return 0; #endif }

    這個函數,首先創建了一個sockaddr,用于保存發送端的地址。

    然后就進入了一個循環,不斷地接收UDP消息。

    循環內的第一行是執行Timer的tick(),這個是UDT自己的定時器Timer機制的一部分。

    接下來的這個子循環也主要與RcvQueue的worker線程中消息的dispatch機制有關。

    然后是取一個CUnit,用來接收其它端點發送過來的消息。如果取不到,則接收UDP包并丟棄。然后跳過后面消息dispatch的過程。這個地方的m_UnitQueue用來做緩存,也用來防止收到過多的包消耗過多的資源。完整的CUnitQueue機制暫時先不去仔細分析。

    然后就是取到了CUnit的情況,則先通過CChannel接收一個包,并根據包的內容進行包的dispatch。不能跑偏了,這里主要關注目標SocketID為0,pkttype為0的包的dispatch。可以看到,在Listener存在的情況下,是dispatch給了listener,也就是Listening的UDT Socket的CUDT的listen()函數,否則會dispatch給通道上處于Rendezvous模式的UDT Socket。(在 UDT協議實現分析——bind、listen與accept 一文中關于listen的部分有具體理過這個listener的設置過程。)可以看到,對于相同的通道CChannel,也就是同一個端口上,Rendezvous模式下的UDT Socket和Listening的UDT Socket不能共存,或者說同時存在時,Rendezvous的行為可能不是預期的,但多個處于Rendezvous模式下的UDT Socket可以共存。

    接收隊列CRcvQueue的worker()線程做的其它事情,暫時先不去仔細看。這里先來理一下Listening的UDT Socket在接收到Handshake消息的處理過程,也就是CUDT::listen(sockaddr* addr, CPacket& packet)(src/core.cpp):

    int CUDT::listen(sockaddr* addr, CPacket& packet) {if (m_bClosing)return 1002;if (packet.getLength() != CHandShake::m_iContentSize)return 1004;CHandShake hs;hs.deserialize(packet.m_pcData, packet.getLength());// SYN cookiechar clienthost[NI_MAXHOST];char clientport[NI_MAXSERV];getnameinfo(addr, (AF_INET == m_iVersion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6), clienthost,sizeof(clienthost), clientport, sizeof(clientport), NI_NUMERICHOST | NI_NUMERICSERV);int64_t timestamp = (CTimer::getTime() - m_StartTime) / 60000000; // secret changes every one minutestringstream cookiestr;cookiestr << clienthost << ":" << clientport << ":" << timestamp;unsigned char cookie[16];CMD5::compute(cookiestr.str().c_str(), cookie);if (1 == hs.m_iReqType) {hs.m_iCookie = *(int*) cookie;packet.m_iID = hs.m_iID;int size = packet.getLength();hs.serialize(packet.m_pcData, size);m_pSndQueue->sendto(addr, packet);return 0;} else {if (hs.m_iCookie != *(int*) cookie) {timestamp--;cookiestr << clienthost << ":" << clientport << ":" << timestamp;CMD5::compute(cookiestr.str().c_str(), cookie);if (hs.m_iCookie != *(int*) cookie)return -1;}}int32_t id = hs.m_iID;// When a peer side connects in...if ((1 == packet.getFlag()) && (0 == packet.getType())) {if ((hs.m_iVersion != m_iVersion) || (hs.m_iType != m_iSockType)) {// mismatch, reject the requesths.m_iReqType = 1002;int size = CHandShake::m_iContentSize;hs.serialize(packet.m_pcData, size);packet.m_iID = id;m_pSndQueue->sendto(addr, packet);} else {int result = s_UDTUnited.newConnection(m_SocketID, addr, &hs);if (result == -1)hs.m_iReqType = 1002;// send back a response if connection failed or connection already existed// new connection response should be sent in connect()if (result != 1) {int size = CHandShake::m_iContentSize;hs.serialize(packet.m_pcData, size);packet.m_iID = id;m_pSndQueue->sendto(addr, packet);} else {// a new connection has been created, enable epoll for writes_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);}}}return hs.m_iReqType; }

    在這個函數中主要做了這樣的一些事情:

  • 檢查UDT Socket的狀態,如果處于Closing狀態下,就返回,否則繼續執行。

  • 檢查包的數據部分長度。若長度不為CHandShake::m_iContentSize 48字節,則說明這不是一個有效的Handshake,則返回,否則繼續執行。

  • 創建一個CHandShake hs,并將傳入的packet的數據部分反序列化進這個CHandShake。這里來掃一眼這個CHandShake::deserialize()(src/packet.cpp):

    int CHandShake::deserialize(const char* buf, int size) {if (size < m_iContentSize)return -1;int32_t* p = (int32_t*) buf;m_iVersion = *p++;m_iType = *p++;m_iISN = *p++;m_iMSS = *p++;m_iFlightFlagSize = *p++;m_iReqType = *p++;m_iID = *p++;m_iCookie = *p++;for (int i = 0; i < 4; ++i)m_piPeerIP[i] = *p++;return 0; }
  • 這個函數如同它的反函數serialize()一樣沒有處理字節序的問題。

  • 計算cookie值。所謂cookie值,即由連接發起端的網絡地址(包括IP地址與端口號)及時間戳組成的字符串計算出來的16個字節長度的MD5值。時間戳精確到分鐘值。用于計算MD5值的字符串類似127.0.0.1:49033:0。

  • 計算出來cookie值之后的部分,應該被分成兩個部分。一部分處理連接發起端發送的地一個握手包,也就是hs.m_iReqType == 1的block,在CUDT::connect()中構造m_ConnReq的部分我們有看到這個值要被設為1的;另一部分則處理連接發起端發送的第二個握手消息。這里我們先來看hs.m_iReqType == 1的block。

  • 它取前一步計算的cookie的前4個字節,直接將其強轉為一個int值,賦給前面反序列化的CHandShake的m_iCookie。這個地方竟然顧及字節序的問題,也沒有顧及不同平臺的差異,即int類型的長度在不同的機器上可能不同,這個地方用int32_t似乎要更安全一點。將CHandShake的m_iID,如我們在CUDT::connect()中構造m_ConnReq的部分我們有看到的,為連接發起端UDT Socket的SocketID,設置給packet的m_iID,也就是包的目標SocketID。再將hs重新序列化進packet。通過發送隊列SndQueue發送經過了這一番修改的packet。然后返回。

    總結一下UDT Server中Listening的UDT Socket接收到第一個HandShake包時,對于這個包的處理過程:

    計算一個cookie值,設置給接收到的HandShake的cookie字段,修改包的目標SocketID字段為發起連接的UDT Socket的SocketID,包的其它部分原封不動,最后將這個包重新發回給連接發起端。

    UDT Client發送第二個HandShake消息

    UDT Server接收到第一個HandShake消息,回給UDT Client一個HandShake消息。這樣球就又被踢回給了UDT Client端。接著來看在UDT Client端接收到首個HandShake包的響應后會做什么樣的處理。

    我們知道在CUDT::connect(const sockaddr* serv_addr)中,發送首個HandShake包之后,會調用CRcvQueue::recvfrom()來等著接收UDT Server的響應,消費者焦急地等待著食物的到來。在消息到來時,CUDT::connect()會被生產者,也就是CRcvQueue的worker線程喚醒。這里就來具體看一下這個生產與消費的故事的另一半,生產的故事,也就是CRcvQueue的worker線程的消息dispatch。

    在CRcvQueue::worker()中包dispatch的部分可以看到:

    } else if (id > 0) {if (NULL != (u = self->m_pHash->lookup(id))) {if (CIPAddress::ipcmp(addr, u->m_pPeerAddr, u->m_iIPversion)) {cout << "Receive packet by m_pHash table" << endl;if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing) {if (0 == unit->m_Packet.getFlag())u->processData(unit);elseu->processCtrl(unit->m_Packet);u->checkTimers();self->m_pRcvUList->update(u);}}} else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id))) {cout << "Receive packet by m_pRendezvousQueue, u->m_bSynRecving " << u->m_bSynRecving << endl;if (!u->m_bSynRecving)u->connect(unit->m_Packet);elseself->storePkt(id, unit->m_Packet.clone());}}

    我們知道UDT Server回復的消息中是設置了目標SocketID了的。因而會走id > 0的block。

    在CUDT::connect( const sockaddr* serv_addr )中有看到調用m_pRcvQueue->registerConnector()將CUDT添加進RcvQueue的m_pRendezvousQueue中,因而這里會執行id > 0 block中下面的那個block。

    如果前面對于m_bSynRecving的分析,默認情況為true。因而這個地方會執行CRcvQueue::storePkt()來存儲包。來看這個函數的實現:

    void CRcvQueue::storePkt(int32_t id, CPacket* pkt) {CGuard bufferlock(m_PassLock);map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);if (i == m_mBuffer.end()) {m_mBuffer[id].push(pkt);#ifndef WIN32pthread_cond_signal(&m_PassCond); #elseSetEvent(m_PassCond); #endif} else {//avoid storing too many packets, in case of malfunction or attackif (i->second.size() > 16)return;i->second.push(pkt);} }

    在這個函數中會保存接收到的packet,并在必要的時候喚醒等待接收消息的線程。(對應CRcvQueue::recvfrom()的邏輯來看。)

    然后來看CUDT::connect(const sockaddr* serv_addr)在收到第一個HandShake消息的響應之后會做什么樣的處理,也就是CUDT::connect(const CPacket& response)(src/core.cpp):

    int CUDT::connect(const CPacket& response) throw () {// this is the 2nd half of a connection request. If the connection is setup successfully this returns 0.// returning -1 means there is an error.// returning 1 or 2 means the connection is in process and needs more handshakeif (!m_bConnecting)return -1;if (m_bRendezvous && ((0 == response.getFlag()) || (1 == response.getType())) && (0 != m_ConnRes.m_iType)) {//a data packet or a keep-alive packet comes, which means the peer side is already connected// in this situation, the previously recorded response will be usedgoto POST_CONNECT;}if ((1 != response.getFlag()) || (0 != response.getType()))return -1;m_ConnRes.deserialize(response.m_pcData, response.getLength());if (m_bRendezvous) {// regular connect should NOT communicate with rendezvous connect// rendezvous connect require 3-way handshakeif (1 == m_ConnRes.m_iReqType)return -1;if ((0 == m_ConnReq.m_iReqType) || (0 == m_ConnRes.m_iReqType)) {m_ConnReq.m_iReqType = -1;// the request time must be updated so that the next handshake can be sent out immediately.m_llLastReqTime = 0;return 1;}} else {// set cookieif (1 == m_ConnRes.m_iReqType) {m_ConnReq.m_iReqType = -1;m_ConnReq.m_iCookie = m_ConnRes.m_iCookie;m_llLastReqTime = 0;return 1;}}

    這個函數會處理第一個HandShake的響應,也會處理第二個HandShake的響應,這里先來關注第一個HandShake的響應的處理,因而只列出它的一部分的代碼。

    這個函數先是檢查了CUDT的狀態,檢查了packet的有效性,然后就是將接收到的包的數據部分反序列化至CHandShake m_ConnRes中。我們不關注對于Rendezvous模式的處理。

    接著會檢查m_ConnRes的m_iReqType,若為1,則設置m_ConnReq.m_iReqType為-1,設置m_ConnReq.m_iCookie為m_ConnRes.m_iCookie用以標識m_ConnReq為一個合法的第二個HandShake packet;同時設置m_llLastReqTime為0,如我們前面對CUDT::connect(const sockaddr* serv_addr)的分析,以便于此刻保存于m_ConnReq中的第二個HandShake能夠被發送出去as soon as possible。

    這第二個HandShake,與第一個HandShake的差異僅僅在于有了有效的Cookie值,且請求類型ReqType為-1。其它則完全一樣。

    UDT Server對第二個HandShake的處理

    UDT Client對于m_ConnReq的改變并不足以改變接收隊列中worker線程對這個包的dispatch規則,因而直接來看CUDT::listen(sockaddr* addr, CPacket& packet)中對于這第二個HandShake消息的處理。

    接著前面對于這個函數的分析,接前面的第4步。

  • 對于這第二個HandShake,它的ReqType自然不再是1了,而是-1。因而在計算完了cookie值之后,它會先驗證一下HandShake包中的cookie值是否是有效的,如果無效,則直接返回。根據這個地方的邏輯,可以看到cookie的有效時間最長為2分鐘。

  • 檢查包的Flag和Type,如果不是HandShake包,則直接返回,否則繼續執行。

  • 檢查連接發起端IP的版本及Socket類型SockType與本地Listen的UDT Socket是否匹配。若不匹配,則將錯誤碼1002放在發過來的HandShanke的ReqType字段中,設置packet的目標SocketID為發起連接的SocketID,然后將這個包重新發回給UDT Client。

  • 檢查之后,發現完全匹配的情況。調用CUDTUnited::newConnection()創建一個新的UDT Socket。若創建過程執行失敗,則將錯誤碼1002放在發過來的HandShanke的ReqType字段中。若創建成功,會設置發過來的packet的目標SocketID為適當的值,然后將同一個包再發送回UDT Client。CUDTUnited::newConnection()會適當地修改HandShake packet的一些字段。若失敗在執行s_UDTUnited.m_EPoll.update_events()。

  • 返回hs.m_iReqType。

  • 然后來看在CUDTUnited::newConnection()中是如何新建Socket的:

    int CUDTUnited::newConnection(const UDTSOCKET listen, const sockaddr* peer, CHandShake* hs) {CUDTSocket* ns = NULL;CUDTSocket* ls = locate(listen);if (NULL == ls)return -1;// if this connection has already been processedif (NULL != (ns = locate(peer, hs->m_iID, hs->m_iISN))) {if (ns->m_pUDT->m_bBroken) {// last connection from the "peer" address has been brokenns->m_Status = CLOSED;ns->m_TimeStamp = CTimer::getTime();CGuard::enterCS(ls->m_AcceptLock);ls->m_pQueuedSockets->erase(ns->m_SocketID);ls->m_pAcceptSockets->erase(ns->m_SocketID);CGuard::leaveCS(ls->m_AcceptLock);} else {// connection already exist, this is a repeated connection request// respond with existing HS informationhs->m_iISN = ns->m_pUDT->m_iISN;hs->m_iMSS = ns->m_pUDT->m_iMSS;hs->m_iFlightFlagSize = ns->m_pUDT->m_iFlightFlagSize;hs->m_iReqType = -1;hs->m_iID = ns->m_SocketID;return 0;//except for this situation a new connection should be started}}// exceeding backlog, refuse the connection requestif (ls->m_pQueuedSockets->size() >= ls->m_uiBackLog)return -1;try {ns = new CUDTSocket;ns->m_pUDT = new CUDT(*(ls->m_pUDT));if (AF_INET == ls->m_iIPversion) {ns->m_pSelfAddr = (sockaddr*) (new sockaddr_in);((sockaddr_in*) (ns->m_pSelfAddr))->sin_port = 0;ns->m_pPeerAddr = (sockaddr*) (new sockaddr_in);memcpy(ns->m_pPeerAddr, peer, sizeof(sockaddr_in));} else {ns->m_pSelfAddr = (sockaddr*) (new sockaddr_in6);((sockaddr_in6*) (ns->m_pSelfAddr))->sin6_port = 0;ns->m_pPeerAddr = (sockaddr*) (new sockaddr_in6);memcpy(ns->m_pPeerAddr, peer, sizeof(sockaddr_in6));}} catch (...) {delete ns;return -1;}CGuard::enterCS(m_IDLock);ns->m_SocketID = --m_SocketID;cout << "new CUDTSocket SocketID is " << ns->m_SocketID << " PeerID " << hs->m_iID << endl;CGuard::leaveCS(m_IDLock);ns->m_ListenSocket = listen;ns->m_iIPversion = ls->m_iIPversion;ns->m_pUDT->m_SocketID = ns->m_SocketID;ns->m_PeerID = hs->m_iID;ns->m_iISN = hs->m_iISN;int error = 0;try {// bind to the same addr of listening socketns->m_pUDT->open();updateMux(ns, ls);ns->m_pUDT->connect(peer, hs);} catch (...) {error = 1;goto ERR_ROLLBACK;}ns->m_Status = CONNECTED;// copy address information of local nodens->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(ns->m_pSelfAddr);CIPAddress::pton(ns->m_pSelfAddr, ns->m_pUDT->m_piSelfIP, ns->m_iIPversion);// protect the m_Sockets structure.CGuard::enterCS(m_ControlLock);try {m_Sockets[ns->m_SocketID] = ns;m_PeerRec[(ns->m_PeerID << 30) + ns->m_iISN].insert(ns->m_SocketID);} catch (...) {error = 2;}CGuard::leaveCS(m_ControlLock);CGuard::enterCS(ls->m_AcceptLock);try {ls->m_pQueuedSockets->insert(ns->m_SocketID);} catch (...) {error = 3;}CGuard::leaveCS(ls->m_AcceptLock);// acknowledge users waiting for new connections on the listening socketm_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, true);CTimer::triggerEvent();ERR_ROLLBACK: if (error > 0) {ns->m_pUDT->close();ns->m_Status = CLOSED;ns->m_TimeStamp = CTimer::getTime();return -1;}// wake up a waiting accept() call #ifndef WIN32pthread_mutex_lock(&(ls->m_AcceptLock));pthread_cond_signal(&(ls->m_AcceptCond));pthread_mutex_unlock(&(ls->m_AcceptLock)); #elseSetEvent(ls->m_AcceptCond); #endifreturn 1; }

    在這個函數中做了如下這樣的一些事情:

  • 找到listening的UDT Socket的CUDTSocket結構,若找不到則直接返回-1。否則繼續執行。

  • 檢查相同的連接請求是否已經處理過了。在CUDTUnited有一個專門的緩沖區m_PeerRec,用來存放由Listening的Socket創建的UDT Socket,這里主要是通過在這個緩沖區中查找是否已經有connection請求對應的socket來判斷:

  • CUDTSocket* CUDTUnited::locate(const sockaddr* peer, const UDTSOCKET id, int32_t isn) {CGuard cg(m_ControlLock);map<int64_t, set<UDTSOCKET> >::iterator i = m_PeerRec.find((id << 30) + isn);if (i == m_PeerRec.end())return NULL;for (set<UDTSOCKET>::iterator j = i->second.begin(); j != i->second.end(); ++j) {map<UDTSOCKET, CUDTSocket*>::iterator k = m_Sockets.find(*j);// this socket might have been closed and moved m_ClosedSocketsif (k == m_Sockets.end())continue;if (CIPAddress::ipcmp(peer, k->second->m_pPeerAddr, k->second->m_iIPversion))return k->second;}return NULL; }

    如果已經為這個connection請求創建了UDT Socket,又分為兩種情況:

    (1). 為connection請求創建的UDT Socket還是好的,可用的,則根據之前創建的UDT Socket的一些字段設置接收到的HandShake,m_iReqType會被設置為-1,m_iID會被設置為UDT Socket的SocketID。然后返回0。如我們前面在CUDTUnited::newConnection()中看到的,這樣返回之后,CUDTUnited::newConnection()會發送一個響應消息給UDT Client。

    (2). 為connection請求創建的UDT Socket已經爛掉了,不可用了,此時則主要會將其狀態設置為CLOSED,設置時間戳,將其從m_pQueuedSockets和m_pAcceptSockets中移除出去。然后執行后續的新建UDT Socket的流程。

    但對于一個由Listening Socket創建的UDT Socket而言,又會是什么原因導致它處于broken狀態呢?此處這樣的檢查是否真有必要呢?后面會再來研究。

  • 檢查m_pQueuedSockets的大小是否超出了為Listening的UDT Socket設置的backlog大小,若超出,則返回-1,否則繼續執行。

  • 創建一個CUDTSocket對象。創建一個CUDT對象,這里創建的CUDT對象會繼承Listening的UDT Socket的許多屬性(src/api.cpp):

  • CUDT::CUDT(const CUDT& ancestor) {m_pSndBuffer = NULL;m_pRcvBuffer = NULL;m_pSndLossList = NULL;m_pRcvLossList = NULL;m_pACKWindow = NULL;m_pSndTimeWindow = NULL;m_pRcvTimeWindow = NULL;m_pSndQueue = NULL;m_pRcvQueue = NULL;m_pPeerAddr = NULL;m_pSNode = NULL;m_pRNode = NULL;// Initilize mutex and condition variablesinitSynch();// Default UDT configurationsm_iMSS = ancestor.m_iMSS;m_bSynSending = ancestor.m_bSynSending;m_bSynRecving = ancestor.m_bSynRecving;m_iFlightFlagSize = ancestor.m_iFlightFlagSize;m_iSndBufSize = ancestor.m_iSndBufSize;m_iRcvBufSize = ancestor.m_iRcvBufSize;m_Linger = ancestor.m_Linger;m_iUDPSndBufSize = ancestor.m_iUDPSndBufSize;m_iUDPRcvBufSize = ancestor.m_iUDPRcvBufSize;m_iSockType = ancestor.m_iSockType;m_iIPversion = ancestor.m_iIPversion;m_bRendezvous = ancestor.m_bRendezvous;m_iSndTimeOut = ancestor.m_iSndTimeOut;m_iRcvTimeOut = ancestor.m_iRcvTimeOut;m_bReuseAddr = true; // this must be true, because all accepted sockets shared the same port with the listenerm_llMaxBW = ancestor.m_llMaxBW;m_pCCFactory = ancestor.m_pCCFactory->clone();m_pCC = NULL;m_pCache = ancestor.m_pCache;// Initial statusm_bOpened = false;m_bListening = false;m_bConnecting = false;m_bConnected = false;m_bClosing = false;m_bShutdown = false;m_bBroken = false;m_bPeerHealth = true;m_ullLingerExpiration = 0; }

    為SelfAddr分配內存。

    為PeerAddr分配內存。

    拷貝發送端地址到PeerAddr。

    設置SocketID。等等。

  • 執行ns->m_pUDT->open()完成打開動作。然后執行updateMux(ns, ls),將新建的這個UDT Socket綁定到Listening的UDT Socket所綁定的多路復用器:
  • void CUDTUnited::updateMux(CUDTSocket* s, const CUDTSocket* ls) {CGuard cg(m_ControlLock);int port = (AF_INET == ls->m_iIPversion) ?ntohs(((sockaddr_in*) ls->m_pSelfAddr)->sin_port) :ntohs(((sockaddr_in6*) ls->m_pSelfAddr)->sin6_port);// find the listener's addressfor (map<int, CMultiplexer>::iterator i = m_mMultiplexer.begin(); i != m_mMultiplexer.end(); ++i) {if (i->second.m_iPort == port) {// reuse the existing multiplexer++i->second.m_iRefCount;s->m_pUDT->m_pSndQueue = i->second.m_pSndQueue;s->m_pUDT->m_pRcvQueue = i->second.m_pRcvQueue;s->m_iMuxID = i->second.m_iID;return;}} }
  • 執行 ns->m_pUDT->connect(peer, hs):
  • void CUDT::connect(const sockaddr* peer, CHandShake* hs) {CGuard cg(m_ConnectionLock);// Uses the smaller MSS between the peersif (hs->m_iMSS > m_iMSS)hs->m_iMSS = m_iMSS;elsem_iMSS = hs->m_iMSS;// exchange info for maximum flow window sizem_iFlowWindowSize = hs->m_iFlightFlagSize;hs->m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize) ? m_iRcvBufSize : m_iFlightFlagSize;m_iPeerISN = hs->m_iISN;m_iRcvLastAck = hs->m_iISN;m_iRcvLastAckAck = hs->m_iISN;m_iRcvCurrSeqNo = hs->m_iISN - 1;m_PeerID = hs->m_iID;hs->m_iID = m_SocketID;// use peer's ISN and send it back for security checkm_iISN = hs->m_iISN;m_iLastDecSeq = m_iISN - 1;m_iSndLastAck = m_iISN;m_iSndLastDataAck = m_iISN;m_iSndCurrSeqNo = m_iISN - 1;m_iSndLastAck2 = m_iISN;m_ullSndLastAck2Time = CTimer::getTime();// this is a reponse handshakehs->m_iReqType = -1;// get local IP address and send the peer its IP address (because UDP cannot get local IP address)memcpy(m_piSelfIP, hs->m_piPeerIP, 16);CIPAddress::ntop(peer, hs->m_piPeerIP, m_iIPversion);m_iPktSize = m_iMSS - 28;m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;// Prepare all structurestry {m_pSndBuffer = new CSndBuffer(32, m_iPayloadSize);m_pRcvBuffer = new CRcvBuffer(&(m_pRcvQueue->m_UnitQueue), m_iRcvBufSize);m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2);m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize);m_pACKWindow = new CACKWindow(1024);m_pRcvTimeWindow = new CPktTimeWindow(16, 64);m_pSndTimeWindow = new CPktTimeWindow();} catch (...) {throw CUDTException(3, 2, 0);}CInfoBlock ib;ib.m_iIPversion = m_iIPversion;CInfoBlock::convert(peer, m_iIPversion, ib.m_piIP);if (m_pCache->lookup(&ib) >= 0) {m_iRTT = ib.m_iRTT;m_iBandwidth = ib.m_iBandwidth;}m_pCC = m_pCCFactory->create();m_pCC->m_UDT = m_SocketID;m_pCC->setMSS(m_iMSS);m_pCC->setMaxCWndSize(m_iFlowWindowSize);m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo);m_pCC->setRcvRate(m_iDeliveryRate);m_pCC->setRTT(m_iRTT);m_pCC->setBandwidth(m_iBandwidth);m_pCC->init();m_ullInterval = (uint64_t) (m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);m_dCongestionWindow = m_pCC->m_dCWndSize;m_pPeerAddr = (AF_INET == m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;memcpy(m_pPeerAddr, peer, (AF_INET == m_iIPversion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6));// And of course, it is connected.m_bConnected = true;// register this socket for receiving data packetsm_pRNode->m_bOnList = true;m_pRcvQueue->setNewEntry(this);//send the response to the peer, see listen() for more discussions about thisCPacket response;int size = CHandShake::m_iContentSize;char* buffer = new char[size];hs->serialize(buffer, size);response.pack(0, NULL, buffer, size);response.m_iID = m_PeerID;m_pSndQueue->sendto(peer, response);delete[] buffer; }

    這個函數里會根據HandShake包設置非常多的成員。但主要來關注m_pRcvQueue->setNewEntry(this),這個調用也是與RcvQueue的worker線程的消息dispatch機制有關。后面我們會再來仔細地了解這個函數。

    這個函數會在最后發送響應給UDT Client。

  • 將UDT Socket的狀態置為CONNECTED。拷貝Channel的地址到PeerAddr。

  • 將創建的CUDTSocket放進m_Sockets中,同時放進m_PeerRec中。

  • 將創建的UDT Socket放進m_pQueuedSockets中。這正是Listening UDT Socket accept那個生產-消費故事的另一半,這里是生產者。

  • 將等待在accept()的線程喚醒。至此在UDT Server端,accept()返回一個UDT Socket,UDT Server認為一個連接成功建立。

  • UDT Client從UDT::connect()返回

    如我們前面看到的,CUDT::connect(const sockaddr* serv_addr)在發送了第二個Handshake消息之后,它就會開是等待UDT Server的第二次響應。UDT Server發送第二個Handshake消息的相應之后,UDT Client端將會返回并處理它。這個消息的dispatch過程與第一個HandShake的響應消息的處理過程一致,這里不再贅述。這里來看這第二個HandShake的響應消息的處理,同樣是在CUDT::connect(const CPacket& response)中:

    } else {// set cookieif (1 == m_ConnRes.m_iReqType) {m_ConnReq.m_iReqType = -1;m_ConnReq.m_iCookie = m_ConnRes.m_iCookie;m_llLastReqTime = 0;return 1;}}POST_CONNECT:// Remove from rendezvous queuem_pRcvQueue->removeConnector(m_SocketID);// Re-configure according to the negotiated values.m_iMSS = m_ConnRes.m_iMSS;m_iFlowWindowSize = m_ConnRes.m_iFlightFlagSize;m_iPktSize = m_iMSS - 28;m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;m_iPeerISN = m_ConnRes.m_iISN;m_iRcvLastAck = m_ConnRes.m_iISN;m_iRcvLastAckAck = m_ConnRes.m_iISN;m_iRcvCurrSeqNo = m_ConnRes.m_iISN - 1;m_PeerID = m_ConnRes.m_iID;memcpy(m_piSelfIP, m_ConnRes.m_piPeerIP, 16);// Prepare all data structurestry {m_pSndBuffer = new CSndBuffer(32, m_iPayloadSize);m_pRcvBuffer = new CRcvBuffer(&(m_pRcvQueue->m_UnitQueue), m_iRcvBufSize);// after introducing lite ACK, the sndlosslist may not be cleared in time, so it requires twice space.m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2);m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize);m_pACKWindow = new CACKWindow(1024);m_pRcvTimeWindow = new CPktTimeWindow(16, 64);m_pSndTimeWindow = new CPktTimeWindow();} catch (...) {throw CUDTException(3, 2, 0);}CInfoBlock ib;ib.m_iIPversion = m_iIPversion;CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP);if (m_pCache->lookup(&ib) >= 0) {m_iRTT = ib.m_iRTT;m_iBandwidth = ib.m_iBandwidth;}m_pCC = m_pCCFactory->create();m_pCC->m_UDT = m_SocketID;m_pCC->setMSS(m_iMSS);m_pCC->setMaxCWndSize(m_iFlowWindowSize);m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo);m_pCC->setRcvRate(m_iDeliveryRate);m_pCC->setRTT(m_iRTT);m_pCC->setBandwidth(m_iBandwidth);m_pCC->init();m_ullInterval = (uint64_t) (m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);m_dCongestionWindow = m_pCC->m_dCWndSize;// And, I am connected too.m_bConnecting = false;m_bConnected = true;// register this socket for receiving data packetsm_pRNode->m_bOnList = true;m_pRcvQueue->setNewEntry(this);// acknowledge the management module.s_UDTUnited.connect_complete(m_SocketID);// acknowledde any waiting epolls to writes_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);return 0; }
  • 這里做的第一件事就是調用m_pRcvQueue->removeConnector(m_SocketID)將自己從RevQueue的RendezvousQueue中移除,以表示自己將不再接收Rendezvous消息(src/queue.cpp):

    void CRcvQueue::removeConnector(const UDTSOCKET& id) {m_pRendezvousQueue->remove(id);CGuard bufferlock(m_PassLock);map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);if (i != m_mBuffer.end()) {while (!i->second.empty()) {delete[] i->second.front()->m_pcData;delete i->second.front();i->second.pop();}m_mBuffer.erase(i);} }
  • 這個函數執行完之后,RcvQueue暫時將無法向UDT Socket dispatch包。

  • 根據協商的值重新做配置。這里我們可以再來看一下UDT的協商指的是什么。縱覽連接建立的整個過程,我們并沒有看到針對這些需要協商的值UDT本身有什么特殊的算法來計算,因而所謂的協商則主要是UDT Client端和UDT Server端,針對這些選項,不同應用程序層不同設置的同步協調。

  • 準備所有的數據緩沖區。

  • 設置CUDT的狀態,m_bConnecting為false,m_bConnected為true。

  • 執行m_pRcvQueue->setNewEntry(this),注冊socket來接收數據包。這里來看一下CRcvQueue::setNewEntry(CUDT* u):

  • void CRcvQueue::setNewEntry(CUDT* u) {CGuard listguard(m_IDLock);m_vNewEntry.push_back(u); }

    這個操作本身非常簡單。但把CUDT結構放進CRcvQueue之后,又會發生什么呢?回憶我們前面看到的CRcvQueue::worker(void* param)函數中循環開始部分的這段代碼:

    // check waiting list, if new socket, insert it to the listwhile (self->ifNewEntry()) {CUDT* ne = self->getNewEntry();if (NULL != ne) {self->m_pRcvUList->insert(ne);self->m_pHash->insert(ne->m_SocketID, ne);}}

    對照這段代碼中用到的幾個函數的實現:

    bool CRcvQueue::ifNewEntry() {return !(m_vNewEntry.empty()); }CUDT* CRcvQueue::getNewEntry() {CGuard listguard(m_IDLock);if (m_vNewEntry.empty())return NULL;CUDT* u = (CUDT*) *(m_vNewEntry.begin());m_vNewEntry.erase(m_vNewEntry.begin());return u; }

    可以了解到,在 執行m_pRcvQueue->setNewEntry(this),注冊socket之后,CRcvQueue的worker線程會將這個CUDT結構從它的m_vNewEntry中移到另外的兩個容器m_pRcvUList和m_pHash中。那然后呢?在CRcvQueue::worker(void* param)中不是還有下面這段嗎:

    if (NULL != (u = self->m_pHash->lookup(id))) {if (CIPAddress::ipcmp(addr, u->m_pPeerAddr, u->m_iIPversion)) {cout << "Receive packet by m_pHash table" << endl;if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing) {if (0 == unit->m_Packet.getFlag())u->processData(unit);elseu->processCtrl(unit->m_Packet);u->checkTimers();self->m_pRcvUList->update(u);}}} else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id))) {

    就是這樣,可以說,在CUDT::connect(const CPacket& response)中是完成了一次UDT Socket消息接收方式的轉變。

  • 執行s_UDTUnited.connect_complete(m_SocketID)結束整個的connect()過程:
  • void CUDTUnited::connect_complete(const UDTSOCKET u) {CUDTSocket* s = locate(u);if (NULL == s)throw CUDTException(5, 4, 0);// copy address information of local node// the local port must be correctly assigned BEFORE CUDT::connect(),// otherwise if connect() fails, the multiplexer cannot be located by garbage collection and will cause leaks->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(s->m_pSelfAddr);CIPAddress::pton(s->m_pSelfAddr, s->m_pUDT->m_piSelfIP, s->m_iIPversion);s->m_Status = CONNECTED; }

    UDT Socket至此進入CONNECTED狀態。

    Done。

    總結

    以上是生活随笔為你收集整理的UDT协议实现分析——连接的建立的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    久久久久免费网站 | 久久人人添人人爽添人人88v | 韩日在线一区 | 久久婷婷色综合 | 久久国产热 | 久久久精品久久 | 黄色精品国产 | 成人小视频在线观看免费 | 久久综合九色综合久久久精品综合 | 日日干网址| 国产中年夫妇高潮精品视频 | 亚洲精品欧美视频 | 国产999精品久久久久久麻豆 | 99国产精品一区二区 | 国产精品久久久久久久久久久不卡 | 日韩欧美精品一区 | 美州a亚洲一视本频v色道 | 国产精品久久久毛片 | 人人揉人人揉人人揉人人揉97 | 中文字幕精品视频 | 国产专区欧美专区 | 黄网站色成年免费观看 | 狠狠地操 | 国产精品久久久久久久久免费 | 91福利视频久久久久 | 韩国一区在线 | 欧美日韩色婷婷 | 久操视频在线免费看 | 日韩欧美综合在线视频 | 成人观看视频 | 黄视频网站大全 | 成人午夜电影久久影院 | 久久久电影 | 亚洲成人xxx | 伊色综合久久之综合久久 | 中文字幕观看av | 超碰97中文 | 国产精品爽爽久久久久久蜜臀 | 粉嫩av一区二区三区免费 | 天天爽天天搞 | 中文字幕在线播放日韩 | 精品特级毛片 | 日韩精品一区二区免费视频 | 人人狠狠综合久久亚洲 | 西西大胆免费视频 | 波多野结衣日韩 | 91中文字幕在线观看 | 久久国产精品免费看 | 美女视频又黄又免费 | 99久久99久久精品 | 伊人久久婷婷 | 日韩中文字幕免费电影 | 亚洲狠狠婷婷 | 丁香婷婷色综合亚洲电影 | 免费在线a | 探花视频在线观看免费 | 国产一区二区在线精品 | 久久久久高清 | 久久a视频 | 国产精品久久久久高潮 | 精品亚洲免费视频 | 精品亚洲男同gayvideo网站 | 国产成人免费av电影 | 色资源在线观看 | 国产日韩在线观看一区 | 97超碰人人爱 | 99视频在线免费观看 | 欧美日韩免费观看一区二区三区 | 一级性生活片 | 伊人伊成久久人综合网站 | 色中色综合 | 嫩模bbw搡bbbb搡bbbb | 成人免费视频网址 | 亚洲另类视频在线观看 | 丁香花中文在线免费观看 | 日韩午夜一级片 | 91激情视频在线观看 | 日韩精品播放 | 极品久久久久久久 | 日本电影黄色 | 欧美精品一区二区三区一线天视频 | 99精品99| 麻豆国产视频下载 | 99av国产精品欲麻豆 | 国产91精品一区二区绿帽 | 久久久久久久电影 | 成人免费观看网站 | 久久r精品 | 欧美一级免费黄色片 | 成人免费在线视频 | 国产一区二区视频在线播放 | 国产精品久久久久久久久久久久午 | 久久福利精品 | 久久视讯 | 手机看片中文字幕 | 麻豆久久一区二区 | 色a4yy| 国产精品99视频 | 国产精品永久久久久久久www | 国产1区在线观看 | 99精品亚洲 | 国产色综合天天综合网 | 国产精品女教师 | 99精品在线视频观看 | 欧美激情综合五月色丁香小说 | 久久久亚洲麻豆日韩精品一区三区 | 免费av的网站 | 97福利在线 | 在线看v片 | 国产在线小视频 | 国产理伦在线 | 天天干天天摸 | 亚洲精品玖玖玖av在线看 | 最新av在线免费观看 | 国产黄色免费看 | 久草在在线| 久久久久久久久精 | 欧美精品一区二区三区四区在线 | 久久视了 | 国产视频在线观看一区二区 | 天天插夜夜操 | 99产精品成人啪免费网站 | 国产精品一区二区果冻传媒 | 草久中文字幕 | 成人蜜桃 | 中文字幕一区二区三区四区 | 欧美激情视频一二三区 | 夜色资源站国产www在线视频 | 中文字幕一区二区三区视频 | 色香网| 91九色在线观看视频 | 久久久久久国产一区二区三区 | 中文字幕av免费在线观看 | 精品一区二区三区久久 | 成人免费一级 | 久久久国产精品成人免费 | 九九视频这里只有精品 | 99视频精品视频高清免费 | 在线观看黄色免费视频 | 午夜a区 | 一区二区三区在线播放 | 九九久久婷婷 | 午夜性色 | 日日添夜夜添 | 国产高清在线a视频大全 | 日韩极品视频在线观看 | 91av手机在线| 青青草国产精品视频 | 波多野结衣在线播放视频 | 国产麻豆视频网站 | 成人av资源网 | 99热最新在线| 成人av地址 | 亚洲激情综合 | 亚洲狠狠操 | 日本精品一区二区 | 国内揄拍国产精品 | 五月婷婷色丁香 | 国产成人99av超碰超爽 | 国产成人精品电影久久久 | 色综合狠狠干 | 国产高清专区 | 久色伊人 | 69国产精品视频免费观看 | 永久免费看av | 久久九九视频 | 婷婷六月天丁香 | 欧美日韩一二三四区 | 日韩免费一级电影 | 五月激情视频 | 欧美精品免费一区二区 | 国产伦精品一区二区三区无广告 | 中文字幕在线观看一区二区三区 | 521色香蕉网站在线观看 | 91av视频 | 国产精品嫩草影视久久久 | 久久国产网站 | 美女黄久久 | 国产做aⅴ在线视频播放 | 久久精品欧美一 | 超碰人人91 | 手机在线小视频 | 激情在线网 | 视频一区二区国产 | 欧美成人播放 | 激情丁香在线 | 国产99久久久国产精品免费二区 | 在线91色| 亚洲极色 | 色综合天天狠狠 | 美女网色| 免费观看特级毛片 | 黄色片网站 | 亚洲综合色视频在线观看 | 色婷婷综合久久久久 | 精品久久美女 | 久久99视频精品 | 天天亚洲| 免费色视频 | 亚洲欧洲视频 | 美女在线免费观看视频 | 六月丁香伊人 | 欧美夫妻性生活电影 | 国产精品欧美久久久久三级 | 中文字幕一区二区三区在线播放 | 中文字幕在线成人 | 丁香九月婷婷 | 国产精品美女免费 | 特级a老妇做爰全过程 | 成片视频在线观看 | 久久久久久美女 | 小草av在线播放 | 在线观看视频国产一区 | 日av免费| 免费在线观看av网站 | 99久久精品午夜一区二区小说 | 五月天婷婷免费视频 | 美女黄网久久 | 久久综合偷偷噜噜噜色 | 亚洲免费观看在线视频 | 国产高清在线不卡 | 国产精品av在线免费观看 | 国产区精品 | 亚洲国产中文字幕在线视频综合 | 欧美巨乳网 | 国产成人精品一区二区三区免费 | 日日操夜夜操狠狠操 | 久久精品免视看 | 久久精品一区二区三区中文字幕 | 三上悠亚一区二区在线观看 | 中文字幕 成人 | 91精品国自产在线观看欧美 | 99精品视频在线观看 | 日韩电影在线一区二区 | 中文字幕乱码亚洲精品一区 | 久久精精品 | 91免费高清视频 | 国产麻豆精品一区二区 | 日韩视频免费 | 精品在线观看一区二区三区 | 色综合久久88色综合天天 | 欧美日韩免费观看一区=区三区 | 国产精品女人久久久 | 亚洲aaa毛片| 日本在线视频网址 | 视频成人永久免费视频 | 国产精品网在线观看 | 国产精品一区二区久久精品爱微奶 | 成人a毛片| 成人av中文字幕 | 99久久99精品 | 日韩精品不卡 | 91视频免费网址 | 欧美一级黄色片 | 特黄色大片 | 午夜久久久精品 | 久久综合久色欧美综合狠狠 | 视频一区亚洲 | 一区二区观看 | 91麻豆.com | 97夜夜澡人人双人人人喊 | 黄色三级网站 | 黄色精品国产 | 超碰97.com| 奇米影视999| 中文字幕成人av | 91成人精品视频 | 国产精品久久久久免费a∨ 欧美一级性生活片 | 国产无套视频 | 91精品婷婷国产综合久久蝌蚪 | 国产亚洲精品久久久网站好莱 | 中文字幕国内精品 | 91亚洲精品乱码久久久久久蜜桃 | 三级av片| 日韩精品一区二区免费 | 亚洲综合色丁香婷婷六月图片 | 91福利视频在线 | 六月丁香婷婷网 | 久久激情片 | 日韩精品视频在线观看免费 | www.亚洲精品视频 | 国产在线永久 | 在线视频久 | 亚洲视屏在线播放 | 久久精品视频国产 | 亚洲一区二区观看 | 国产视频在线观看一区二区 | 美女国产 | 蜜臀av性久久久久蜜臀av | 中文字幕在线观看第三页 | 精品国产91亚洲一区二区三区www | 91中文字幕网 | 精品久久久久久久久久久久 | 色婷婷免费 | 在线观看黄色的网站 | 亚洲午夜久久久综合37日本 | 亚洲天天综合 | 日韩在线观看中文字幕 | 欧美十八| 九色激情网 | 久久亚洲精品国产亚洲老地址 | 91超碰在线播放 | 日日干天天插 | 青青啪| 丁香久久综合 | 免费三级黄色 | 国产无遮挡又黄又爽在线观看 | 亚洲精品18p | 久久久久久久久影视 | 麻豆免费精品视频 | 69视频国产 | 欧美激情va永久在线播放 | 久久 亚洲视频 | 国产九九在线 | 2022久久国产露脸精品国产 | 久久久久久久影院 | 久久躁日日躁aaaaxxxx | av在线免费观看不卡 | 亚洲欧美日韩国产一区二区三区 | 久久久亚洲麻豆日韩精品一区三区 | 伊甸园永久入口www 99热 精品在线 | 久久久国产精品视频 | 日日摸日日 | www.人人草 | 99亚洲国产精品 | 久草在线电影网 | 日韩av男人的天堂 | 毛片网站观看 | www欧美色 | 91在线公开视频 | 中文字幕麻豆 | ww视频在线观看 | 日韩精品中文字幕在线播放 | 亚洲欧美日韩精品一区二区 | 香蕉视频亚洲 | 蜜臀av性久久久久av蜜臀妖精 | 色黄www小说| 精品国产理论 | 97精品超碰一区二区三区 | 深夜成人av | 日韩在线观看av | 国产精品美女 | 久草在线资源视频 | 麻豆精品在线视频 | 中文字幕日本电影 | av先锋中文字幕 | 久久草在线免费 | 日本女人逼 | 国产精品久久久久久久久久久免费看 | 国产精品自产拍在线观看蜜 | 精品国产观看 | 色婷婷久久 | 亚洲欧美视频在线播放 | 91色在线观看 | 国产美女被啪进深处喷白浆视频 | 免费看特级毛片 | 免费麻豆网站 | 密桃av在线 | 久久视频免费在线观看 | 国产韩国日本高清视频 | 国内精品久久久久影院男同志 | 精品国产伦一区二区三区观看体验 | 欧美最爽乱淫视频播放 | 99在线国产 | 激情欧美丁香 | 99免在线观看免费视频高清 | 亚洲精品成人在线 | 国产精品久久久久久久久久免费 | 日本久久精品 | 国产精品一级视频 | 欧美日韩不卡一区二区 | 中文字幕一区二区三 | 久久男人影院 | 日韩av手机在线观看 | 久久精品久久久久久久 | 99精品久久99久久久久 | 国产亚洲成av片在线观看 | 欧美日韩免费网站 | 97天堂网| 99久久久国产精品免费99 | 成人综合婷婷国产精品久久免费 | 日本黄色免费播放 | 在线导航福利 | 香蕉成人在线视频 | 欧美日韩1区 | 日韩在线首页 | 99热在线国产精品 | 成人小视频免费在线观看 | 国产福利在线不卡 | 国产在线观看91 | 日韩在线观看的 | 日韩中文字幕网站 | 网站免费黄 | 久久精品国产免费 | 国产亚洲一区 | 亚洲日韩欧美视频 | 久久天天躁 | 色偷偷88888欧美精品久久 | 国产专区一 | 国产高清av在线播放 | 国产91精品一区二区麻豆网站 | 国产精品成人一区二区三区吃奶 | 欧美日韩一级久久久久久免费看 | 亚洲天堂精品 | 亚洲人片在线观看 | 中文字幕丝袜美腿 | 免费看黄的视频 | 亚洲视频在线看 | 久久精品123| 亚洲国产中文字幕在线观看 | 在线亚洲人成电影网站色www | 久久久免费精品国产一区二区 | 日韩精品免费在线视频 | 国产精品大片免费观看 | 日日添夜夜添 | 中文字幕91视频 | 亚洲1区 在线| 国产一级片在线播放 | 一级一片免费看 | 国产精品中文在线 | 日韩xxxxxxxxx | 日韩精品视频久久 | 久草网站在线观看 | 亚洲成人资源在线观看 | 成全免费观看视频 | 久久视频精品 | 色噜噜在线观看 | 婷婷色 亚洲 | 精品人人人 | 国产a国产 | 免费看的黄色小视频 | 国产精品一区二区三区久久久 | 黄网站www | 色综合人人 | 中文字幕第一页在线视频 | 成人在线播放网站 | 黄色精品国产 | 在线成人国产 | 久久久人人爽 | 久久av免费观看 | 亚洲夜夜综合 | 国产高清精 | 99精品视频精品精品视频 | 日韩在线视 | 91麻豆精品国产91久久久无需广告 | 免费福利视频网站 | 免费网站黄色 | 亚洲影视九九影院在线观看 | 色综合天天综合网国产成人网 | 波多野结衣久久资源 | 国产精品综合久久久久 | 久草视频在线资源 | 久久久精品99 | 丁香九月激情综合 | 欧美在线视频一区二区三区 | 成人久久久久 | 精品国产aⅴ麻豆 | 99九九99九九九视频精品 | 日韩色高清 | 国产成人精品综合久久久 | 亚洲国产精品免费 | 午夜av剧场| 精品在线99 | 日韩在线不卡视频 | 天天搞天天 | 色六月婷婷 | 在线免费观看国产 | 日韩精品一区二区免费 | 亚洲日本欧美在线 | 在线播放国产一区二区三区 | 欧美一级久久 | 伊人婷婷网 | 亚洲成av人片在线观看 | 又粗又长又大又爽又黄少妇毛片 | 在线看黄色的网站 | 狠狠干夜夜爱 | 一级理论片在线观看 | 天堂网在线视频 | 久久99精品热在线观看 | 九九热精品视频在线观看 | 国产成人综合图片 | 黄色片视频免费 | 日本天天操 | 亚洲国产中文字幕在线视频综合 | 亚洲片在线资源 | 五月激情五月激情 | 国产黄色理论片 | 午夜精品久久久久久久爽 | 狠狠ri | 欧美福利片在线观看 | 九九热国产 | 日韩精品视频免费专区在线播放 | 狠狠干婷婷| 四虎永久免费 | 国产麻豆精品一区 | 中文有码在线 | 91 在线视频 | 五月婷婷丁香网 | av在线免费观看黄 | 国内精品小视频 | 国产丝袜 | 欧美一级久久久久 | 91最新网址在线观看 | 国产99色| 久久久久中文字幕 | 27xxoo无遮挡动态视频 | 草久中文字幕 | 8x成人免费视频 | 国产91精品高清一区二区三区 | 99 精品 在线| 青青草在久久免费久久免费 | 国产无区一区二区三麻豆 | 99精品免费在线观看 | 国产黄色播放 | 五月婷婷视频在线 | 久久综合色影院 | 欧美国产日韩在线视频 | 夜夜摸夜夜爽 | 成人国产精品久久久春色 | 日韩高清网站 | 亚洲区色| 手机av电影在线 | 国产精品免费看久久久8精臀av | 91亚色视频在线观看 | 国产精品99免视看9 国产精品毛片一区视频 | 亚洲欧美国产日韩在线观看 | 97视频久久久 | 久草视频在线免费播放 | 亚洲黄色大片 | 国产成人精品久久二区二区 | 亚洲精选在线 | 欧美亚洲国产日韩 | 免费av 在线 | 涩涩伊人 | 国产123区在线观看 国产精品麻豆91 | 国产精品久久久久影院日本 | 91精品视频在线看 | 91在线视频精品 | 中文字幕在线免费看 | 亚州天堂 | 国产精品亚洲精品 | 精品久久网 | 91亚洲免费 | 丰满少妇在线 | 在线观看911视频 | 成人午夜久久 | 免费网站在线观看成人 | 国产高清在线精品 | 国产麻豆电影 | 91视频a| 久久久久久久久久福利 | 9在线观看免费高清完整 | 欧美在线视频a | 免费成人短视频 | 在线亚洲小视频 | 婷婷久久综合九色综合 | 国产视频第二页 | 少妇啪啪av入口 | 黄色三级在线观看 | 久久成人综合 | 美国av大片| 久久99国产精品久久 | 亚洲天堂网视频在线观看 | 久久精品这里都是精品 | 色av男人的天堂免费在线 | 日韩av午夜在线观看 | 国产午夜免费视频 | 国产精品久久久久久久久蜜臀 | 狠狠狠综合 | 免费观看黄色av | 亚洲国产av精品毛片鲁大师 | 正在播放亚洲精品 | www.久久色| 黄色av观看 | 天天干天天搞天天射 | 久久手机精品视频 | 免费黄色网止 | 黄色片免费看 | 亚洲激情一区二区三区 | 在线黄色av电影 | 天天爱天天射天天干天天 | 欧美午夜久久 | 久久超级碰视频 | 日韩免费一二三区 | 日韩在线视频二区 | 久草在线视频看看 | 成人在线视频一区 | 91亚洲精品久久久久图片蜜桃 | 最新日韩在线观看 | 免费视频一区 | 91人人澡人人爽人人精品 | 超碰免费在线公开 | 国产午夜视频在线观看 | 丁香高清视频在线看看 | 日韩在线大片 | 国产黑丝一区二区三区 | 狠狠色噜噜狠狠狠合久 | 国产在线精品视频 | 99久久精品一区二区成人 | 日韩二区在线播放 | 久久免费视频4 | 国产99中文字幕 | 成年人黄色免费视频 | 久久99免费观看 | 欧美性黑人 | 999成人精品| 久久五月婷婷丁香 | 99精品国产成人一区二区 | 综合色播| 午夜在线观看影院 | 国产精品久久久 | 国内精品久久久久影院一蜜桃 | 天天草天天| 国产精品美女 | 免费看的视频 | 一级电影免费在线观看 | 国产精品毛片一区二区 | 国产一区二区三区四区在线 | 国产一区av在线 | 久热av | 国产网红在线 | 婷婷中文字幕在线观看 | 99夜色 | 亚洲精品在线二区 | 中文字幕传媒 | 国产亚洲欧美在线视频 | 亚洲自拍偷拍色图 | 91视频久久 | 国产精品网红直播 | 91久久精品一区二区二区 | 欧美一级电影片 | 国产96精品 | 视频国产在线 | 亚洲欧美日本一区二区三区 | 正在播放五月婷婷狠狠干 | av电影免费看 | 最新成人av | av软件在线观看 | 国产一级精品绿帽视频 | 天天干,天天射,天天操,天天摸 | 国产尤物在线视频 | 久久99精品久久久久久三级 | 欧美成人久久 | 久久久久成人精品免费播放动漫 | 欧美日韩中文国产一区发布 | 99国产成+人+综合+亚洲 欧美 | 欧美 日韩 久久 | 欧美大片第1页 | 在线观看免费日韩 | 欧美激情精品久久久久久 | 国产亚洲精品久久久久秋 | 成人在线观看你懂的 | 欧美日韩一区二区三区不卡 | 日日夜夜91 | 黄色毛片网站在线观看 | 亚洲一级片免费观看 | 午夜视频在线网站 | 黄污视频大全 | 热久久免费国产视频 | 精品在线99| 中文字幕在线观看第三页 | 日韩av手机在线观看 | 国产午夜在线观看视频 | 国产在线探花 | 51久久成人国产精品麻豆 | 国产最新视频在线观看 | 国产精品久久久免费 | 欧美日韩调教 | 国产欧美在线一区二区三区 | 久久久国产在线视频 | 免费黄色av电影 | 天天搞天天 | 日韩av高清在线观看 | 中日韩在线视频 | 一区二区不卡 | 亚洲精品字幕在线 | a级国产乱理论片在线观看 特级毛片在线观看 | 国产精品一区久久久久 | 久草网在线视频 | 国产精品美女久久久久久久久 | 天堂av最新网址 | 亚洲精品午夜国产va久久成人 | 日韩免费一区二区在线观看 | 91亚洲精品国偷拍 | 国产精品涩涩屋www在线观看 | 久久免费在线 | 国产精品原创av片国产免费 | 久操操| 国产精品久久嫩一区二区免费 | 亚洲资源在线观看 | 中文字幕免费观看全部电影 | 国产69精品久久久久久久久久 | 五月婷婷香蕉 | 欧美极品少妇xbxb性爽爽视频 | 欧美在线不卡一区 | 91大神免费在线观看 | 青春草视频在线播放 | 九九一级片 | 久久黄色精品视频 | 日韩成人黄色 | 99视屏 | 亚洲播播| 欧美一二三在线 | 91精品色 | 精品在线观看视频 | 久久网站最新地址 | 日韩欧美有码在线 | 91污视频在线 | 日韩一二区在线观看 | 欧美激情视频一二区 | 日韩黄色大片在线观看 | 97视频一区 | 精品久久电影 | 91探花在线视频 | 欧洲精品视频一区二区 | 亚洲视频在线免费观看 | 久久久久成人精品 | 久久精品久久精品久久精品 | 国产视频精品在线 | 日韩高清av | 国产精品久久久久久999 | 欧美一级免费片 | 国产精品丝袜 | 国产99久久久久久免费看 | 久久国产精品偷 | 天天躁天天操 | 免费观看www小视频的软件 | 免费av网站在线 | 久草电影在线观看 | 日韩午夜大片 | 免费观看一级特黄欧美大片 | 欧美三级高清 | 在线观看视频日韩 | 少妇视频一区 | 亚洲狠狠操 | 麻豆久久久久久久 | 色综合久久中文字幕综合网 | 久久精品亚洲国产 | 这里只有精品视频在线 | 亚洲人在线 | 国产精品精品国产色婷婷 | 少妇视频一区 | 99色在线播放 | 激情网在线视频 | 久久国产精品一国产精品 | 久久免费精品视频 | 成人黄色小说视频 | 色偷偷888欧美精品久久久 | 色婷婷久久久 | 婷婷在线精品视频 | 久久久亚洲国产精品麻豆综合天堂 | 久久人视频| 日本中文字幕久久 | 色福利网站 | 精品字幕 | 不卡av在线免费观看 | 国产精品久久久久免费观看 | 国产精品久久久久久久久久久久久久 | 成年人免费电影在线观看 | 天天摸天天操天天舔 | 国产一区在线视频 | 91福利视频一区 | 精品人妖videos欧美人妖 | 亚洲一区二区视频在线 | 久久少妇免费视频 | 免费在线中文字幕 | 天堂va在线观看 | 日日射天天射 | 精品一二三四五区 | 亚洲日本一区二区在线 | 久久久久网站 | 91视频啪| 日本久久精 | 亚洲精品视频国产 | 色橹橹欧美在线观看视频高清 | 国产在线免费观看 | 色网站黄| 亚洲综合在线五月 | 二区中文字幕 | 国产亚洲成人精品 | 亚洲精品国产电影 | a视频免费在线观看 | 国产xxxx| 在线免费观看黄色大片 | 国产99一区视频免费 | 91成人破解版| 国产一区二区三区免费观看视频 | 亚洲91中文字幕无线码三区 | 日本精品中文字幕在线观看 | 中文av网站 | 色综合天天综合 | 成人久久免费 | 日av免费| 国产日韩精品在线观看 | 一级性视频 | 天天视频色版 | av网站在线免费观看 | 久久综合精品一区 | 亚洲天堂毛片 | 综合激情av| 在线观看中文字幕第一页 | 国产a高清| 婷婷中文字幕综合 | 亚洲精品一区二区三区新线路 | 69精品人人人人 | 日韩视频在线不卡 | 色鬼综合网 | 欧美精品亚洲精品日韩精品 | 日韩a级黄色 | 成人资源在线播放 | 国产精久久久久久妇女av | 久久综合导航 | 国产精品毛片一区二区 | 亚洲精品乱码久久久久久高潮 | 99精品观看 | 夜夜视频欧洲 | 懂色av一区二区在线播放 | 亚洲高清色综合 | 日韩免费视频一区二区 | 亚洲精品国偷拍自产在线观看蜜桃 | 国产又粗又猛又色又黄视频 | 一区二区激情 | 五月婷婷黄色 | 91av中文字幕 | 永久免费毛片 | 久久国产精品视频 | 免费色视频网站 | 中文资源在线官网 | 不卡视频在线看 | 青青河边草观看完整版高清 | 日韩欧美久久 | 国产亚洲成人精品 | 国产免费影院 | 91桃色国产在线播放 | 欧美一级小视频 | 顶级bbw搡bbbb搡bbbb | 五月综合婷| 国产96在线 | 久草在线在线视频 | 91av欧美| 亚洲精品乱码久久久久久久久久 | 精品视频在线观看 | 国产香蕉久久精品综合网 | 91激情视频在线播放 | 国产成人在线一区 | 日韩美一区二区三区 | 97在线资源| 天天综合网在线观看 | 97香蕉久久超级碰碰高清版 | 操操操人人 | 黄污污网站 | 99精品欧美一区二区三区黑人哦 | 激情片av| 最近日本韩国中文字幕 | 91麻豆精品国产91久久久更新时间 | 国产在线观看,日本 | 中文字幕在线免费看线人 | 福利视频网站 | 久日精品 | 麻豆成人精品视频 | 午夜视频色 | 在线你懂 | 九九国产精品视频 | 亚洲高清视频一区二区三区 | freejavvideo日本免费| 国产成人中文字幕 | 亚洲一区天堂 | 青青河边草手机免费 | 亚洲理论影院 | 久久影视精品 | 久久精品久久99 | 亚洲精品美女视频 | 91精品国产成 | 色吊丝在线永久观看最新版本 | 亚洲一区二区观看 | 91亚洲精品在线观看 | 日本黄色免费在线观看 | 一区二区视频电影在线观看 | 久久综合爱 | 国产成人精品免费在线观看 | 亚洲最新视频在线播放 | 韩国一区二区三区视频 | 四虎成人免费观看 | 狠狠狠的干 | 中文字幕一区二区在线观看 | 亚洲国产一二三 | 亚洲黄色在线观看 | 91av网址 | 99热在线国产精品 | 不卡中文字幕av | 91av在线不卡 | 深爱激情开心 | 亚洲精品国产精品国自产观看 | 深爱激情五月网 | 欧美 日韩 国产 成人 在线 | 91免费国产在线观看 | 国产精品伦一区二区三区视频 | 日本精品一区二区 | 久久久久久美女 | 91最新在线观看 | 一区二区三区影院 | 国产一区观看 | 在线观看亚洲精品 | 超碰99在线| 亚洲 欧美 日韩 综合 | 成年人电影免费看 | 欧美日韩一区二区三区在线观看视频 | 韩国av一区二区三区 | 欧美孕交vivoestv另类 | 久久桃花网 | 69夜色精品国产69乱 | 国内三级在线观看 | 久久久亚洲影院 | 国产一区二区三精品久久久无广告 | 日韩有码中文字幕在线 | 日韩视频二区 | 99 视频 高清 | www.狠狠操.com | 五月婷在线| 日韩理论电影网 | 亚洲乱码国产乱码精品天美传媒 | 六月激情久久 | 国产精品一区二区三区观看 | 99精品视频在线播放免费 | 亚洲影院色 | 欧美性猛片, | 日韩偷拍精品 | 国产不卡毛片 | 91精品国产99久久久久 | 久久99热国产 | 午夜丰满寂寞少妇精品 | www.夜夜爽 | 免费视频91| 国产精品97 | 天天色播 | 亚洲国产成人精品在线 | 国产精品乱码久久久久久1区2区 | 精品亚洲在线 | 亚洲理论视频 | 国产美女永久免费 | 91热这里只有精品 | 在线视频久久 | 亚洲丁香日韩 | 一区二区不卡高清 | 色婷婷免费 | aaawww| 欧美一二三视频 | 欧美aaa级片| 中文字幕在线乱 | 欧美极品裸体 | 日韩在线观看网站 | www国产亚洲精品久久网站 | 国产一区二区高清视频 | 香蕉视频在线观看免费 | 精品国产色 | 国产高清黄色 | 一级黄网 | 911久久| 天天操天天操天天爽 | 中字幕视频在线永久在线观看免费 | 99亚洲精品 | 欧美日韩高清一区二区 | 国产99久久久精品 | 韩日精品在线观看 | 欧美成人影音 | 最新国产精品拍自在线播放 | 丝袜一区在线 | 免费观看黄 | 在线观看a视频 | 日批视频在线观看免费 | 久久深爱网| 中文字幕色站 | 91精品久久久久久粉嫩 | 欧美一区在线观看视频 | 成人97人人超碰人人99 | 免费看av片网站 | 97精品国产97久久久久久粉红 | 亚洲成av人片在线观看香蕉 | 一级黄视频 | 亚洲激情五月 | 91精品国产综合久久福利 | 亚洲狠狠婷婷综合久久久 | 久久精国产| 国产又粗又猛又爽又黄的视频免费 | 国产又黄又爽又猛视频日本 | 天天做天天爱夜夜爽 | 欧美人操人 | 天堂av网址 | 久久久久国产a免费观看rela | 久久久香蕉视频 | 日韩精品一区在线播放 | 久久精品香蕉 | 欧美另类人妖 | 欧美a级成人淫片免费看 | 国产69精品久久久久久久久久 | 国产精品女同一区二区三区久久夜 | 日本激情视频中文字幕 | 亚洲欧美日本一区二区三区 |