core.cpp

00001 /*****************************************************************************
00002 Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois.
00003 All rights reserved.
00004 
00005 Redistribution and use in source and binary forms, with or without
00006 modification, are permitted provided that the following conditions are
00007 met:
00008 
00009 * Redistributions of source code must retain the above
00010   copyright notice, this list of conditions and the
00011   following disclaimer.
00012 
00013 * Redistributions in binary form must reproduce the
00014   above copyright notice, this list of conditions
00015   and the following disclaimer in the documentation
00016   and/or other materials provided with the distribution.
00017 
00018 * Neither the name of the University of Illinois
00019   nor the names of its contributors may be used to
00020   endorse or promote products derived from this
00021   software without specific prior written permission.
00022 
00023 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00024 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
00025 THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
00026 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
00027 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00028 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00029 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00030 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00031 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00032 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00033 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00034 *****************************************************************************/
00035 
00036 /*****************************************************************************
00037 written by
00038    Yunhong Gu, last updated 02/28/2012
00039 *****************************************************************************/
00040 
00041 #ifndef WIN32
00042    #include <unistd.h>
00043    #include <netdb.h>
00044    #include <arpa/inet.h>
00045    #include <cerrno>
00046    #include <cstring>
00047    #include <cstdlib>
00048 #else
00049    #include <winsock2.h>
00050    #include <ws2tcpip.h>
00051    #ifdef LEGACY_WIN32
00052       #include <wspiapi.h>
00053    #endif
00054 #endif
00055 #include <cmath>
00056 #include <sstream>
00057 #include "queue.h"
00058 #include "core.h"
00059 
00060 using namespace std;
00061 
00062 
00063 CUDTUnited CUDT::s_UDTUnited;   // definition of the CUDTUnited object shared by all CUDT instances; this file uses it for getStatus(), m_EPoll and connect_complete()
00064 
00065 const UDTSOCKET CUDT::INVALID_SOCK = -1;   // presumably the value reported for "no valid socket" -- confirm against the API layer
00066 const int CUDT::ERROR = -1;                // presumably the generic failure return value -- confirm against the API layer
00067 
00068 const UDTSOCKET UDT::INVALID_SOCK = CUDT::INVALID_SOCK;   // UDT:: counterpart, kept equal to the CUDT value
00069 const int UDT::ERROR = CUDT::ERROR;                       // UDT:: counterpart, kept equal to the CUDT value
00070 
00071 const int32_t CSeqNo::m_iSeqNoTH = 0x3FFFFFFF;       // equals m_iMaxSeqNo >> 1; presumably the threshold used for wrap-around comparison
00072 const int32_t CSeqNo::m_iMaxSeqNo = 0x7FFFFFFF;      // largest sequence number (31 bits); connect() scales rand() into [0, m_iMaxSeqNo] for the ISN
00073 const int32_t CAckNo::m_iMaxAckSeqNo = 0x7FFFFFFF;   // largest ACK sequence number (31 bits)
00074 const int32_t CMsgNo::m_iMsgNoTH = 0xFFFFFFF;        // equals m_iMaxMsgNo >> 1; presumably the wrap-around comparison threshold
00075 const int32_t CMsgNo::m_iMaxMsgNo = 0x1FFFFFFF;      // largest message number (29 bits)
00076 
00077 const int CUDT::m_iVersion = 4;                // copied into m_ConnReq.m_iVersion when a connection request is built
00078 const int CUDT::m_iSYNInterval = 10000;        // open() derives m_ullSYNInt and the initial RTT (10x) from this; presumably microseconds -- TODO confirm
00079 const int CUDT::m_iSelfClockInterval = 64;     // not referenced in the visible part of this file; NOTE(review): meaning to be confirmed
00080 
00081 
00082 CUDT::CUDT()
00083 {
00084    m_pSndBuffer = NULL;
00085    m_pRcvBuffer = NULL;
00086    m_pSndLossList = NULL;
00087    m_pRcvLossList = NULL;
00088    m_pACKWindow = NULL;
00089    m_pSndTimeWindow = NULL;
00090    m_pRcvTimeWindow = NULL;
00091 
00092    m_pSndQueue = NULL;
00093    m_pRcvQueue = NULL;
00094    m_pPeerAddr = NULL;
00095    m_pSNode = NULL;
00096    m_pRNode = NULL;
00097 
00098    // Initilize mutex and condition variables
00099    initSynch();
00100 
00101    // Default UDT configurations
00102    m_iMSS = 1500;
00103    m_bSynSending = true;
00104    m_bSynRecving = true;
00105    m_iFlightFlagSize = 25600;
00106    m_iSndBufSize = 8192;
00107    m_iRcvBufSize = 8192; //Rcv buffer MUST NOT be bigger than Flight Flag size
00108    m_Linger.l_onoff = 1;
00109    m_Linger.l_linger = 180;
00110    m_iUDPSndBufSize = 65536;
00111    m_iUDPRcvBufSize = m_iRcvBufSize * m_iMSS;
00112    m_iSockType = UDT_STREAM;
00113    m_iIPversion = AF_INET;
00114    m_bRendezvous = false;
00115    m_iSndTimeOut = -1;
00116    m_iRcvTimeOut = -1;
00117    m_bReuseAddr = true;
00118    m_llMaxBW = -1;
00119 
00120    m_pCCFactory = new CCCFactory<CUDTCC>;
00121    m_pCC = NULL;
00122    m_pCache = NULL;
00123 
00124    // Initial status
00125    m_bOpened = false;
00126    m_bListening = false;
00127    m_bConnecting = false;
00128    m_bConnected = false;
00129    m_bClosing = false;
00130    m_bShutdown = false;
00131    m_bBroken = false;
00132    m_bPeerHealth = true;
00133    m_ullLingerExpiration = 0;
00134 }
00135 
00136 CUDT::CUDT(const CUDT& ancestor)
00137 {
00138    m_pSndBuffer = NULL;
00139    m_pRcvBuffer = NULL;
00140    m_pSndLossList = NULL;
00141    m_pRcvLossList = NULL;
00142    m_pACKWindow = NULL;
00143    m_pSndTimeWindow = NULL;
00144    m_pRcvTimeWindow = NULL;
00145 
00146    m_pSndQueue = NULL;
00147    m_pRcvQueue = NULL;
00148    m_pPeerAddr = NULL;
00149    m_pSNode = NULL;
00150    m_pRNode = NULL;
00151 
00152    // Initilize mutex and condition variables
00153    initSynch();
00154 
00155    // Default UDT configurations
00156    m_iMSS = ancestor.m_iMSS;
00157    m_bSynSending = ancestor.m_bSynSending;
00158    m_bSynRecving = ancestor.m_bSynRecving;
00159    m_iFlightFlagSize = ancestor.m_iFlightFlagSize;
00160    m_iSndBufSize = ancestor.m_iSndBufSize;
00161    m_iRcvBufSize = ancestor.m_iRcvBufSize;
00162    m_Linger = ancestor.m_Linger;
00163    m_iUDPSndBufSize = ancestor.m_iUDPSndBufSize;
00164    m_iUDPRcvBufSize = ancestor.m_iUDPRcvBufSize;
00165    m_iSockType = ancestor.m_iSockType;
00166    m_iIPversion = ancestor.m_iIPversion;
00167    m_bRendezvous = ancestor.m_bRendezvous;
00168    m_iSndTimeOut = ancestor.m_iSndTimeOut;
00169    m_iRcvTimeOut = ancestor.m_iRcvTimeOut;
00170    m_bReuseAddr = true; // this must be true, because all accepted sockets shared the same port with the listener
00171    m_llMaxBW = ancestor.m_llMaxBW;
00172 
00173    m_pCCFactory = ancestor.m_pCCFactory->clone();
00174    m_pCC = NULL;
00175    m_pCache = ancestor.m_pCache;
00176 
00177    // Initial status
00178    m_bOpened = false;
00179    m_bListening = false;
00180    m_bConnecting = false;
00181    m_bConnected = false;
00182    m_bClosing = false;
00183    m_bShutdown = false;
00184    m_bBroken = false;
00185    m_bPeerHealth = true;
00186    m_ullLingerExpiration = 0;
00187 }
00188 
00189 CUDT::~CUDT()
00190 {
00191    // release mutex/condtion variables
00192    destroySynch();
00193 
00194    // destroy the data structures
00195    delete m_pSndBuffer;
00196    delete m_pRcvBuffer;
00197    delete m_pSndLossList;
00198    delete m_pRcvLossList;
00199    delete m_pACKWindow;
00200    delete m_pSndTimeWindow;
00201    delete m_pRcvTimeWindow;
00202    delete m_pCCFactory;
00203    delete m_pCC;
00204    delete m_pPeerAddr;
00205    delete m_pSNode;
00206    delete m_pRNode;
00207 }
00208 
00209 void CUDT::setOpt(UDTOpt optName, const void* optval, int)
00210 {
00211    if (m_bBroken || m_bClosing)
00212       throw CUDTException(2, 1, 0);
00213 
00214    CGuard cg(m_ConnectionLock);
00215    CGuard sendguard(m_SendLock);
00216    CGuard recvguard(m_RecvLock);
00217 
00218    switch (optName)
00219    {
00220    case UDT_MSS:
00221       if (m_bOpened)
00222          throw CUDTException(5, 1, 0);
00223 
00224       if (*(int*)optval < int(28 + CHandShake::m_iContentSize))
00225          throw CUDTException(5, 3, 0);
00226 
00227       m_iMSS = *(int*)optval;
00228 
00229       // Packet size cannot be greater than UDP buffer size
00230       if (m_iMSS > m_iUDPSndBufSize)
00231          m_iMSS = m_iUDPSndBufSize;
00232       if (m_iMSS > m_iUDPRcvBufSize)
00233          m_iMSS = m_iUDPRcvBufSize;
00234 
00235       break;
00236 
00237    case UDT_SNDSYN:
00238       m_bSynSending = *(bool *)optval;
00239       break;
00240 
00241    case UDT_RCVSYN:
00242       m_bSynRecving = *(bool *)optval;
00243       break;
00244 
00245    case UDT_CC:
00246       if (m_bConnecting || m_bConnected)
00247          throw CUDTException(5, 1, 0);
00248       if (NULL != m_pCCFactory)
00249          delete m_pCCFactory;
00250       m_pCCFactory = ((CCCVirtualFactory *)optval)->clone();
00251 
00252       break;
00253 
00254    case UDT_FC:
00255       if (m_bConnecting || m_bConnected)
00256          throw CUDTException(5, 2, 0);
00257 
00258       if (*(int*)optval < 1)
00259          throw CUDTException(5, 3);
00260 
00261       // Mimimum recv flight flag size is 32 packets
00262       if (*(int*)optval > 32)
00263          m_iFlightFlagSize = *(int*)optval;
00264       else
00265          m_iFlightFlagSize = 32;
00266 
00267       break;
00268 
00269    case UDT_SNDBUF:
00270       if (m_bOpened)
00271          throw CUDTException(5, 1, 0);
00272 
00273       if (*(int*)optval <= 0)
00274          throw CUDTException(5, 3, 0);
00275 
00276       m_iSndBufSize = *(int*)optval / (m_iMSS - 28);
00277 
00278       break;
00279 
00280    case UDT_RCVBUF:
00281       if (m_bOpened)
00282          throw CUDTException(5, 1, 0);
00283 
00284       if (*(int*)optval <= 0)
00285          throw CUDTException(5, 3, 0);
00286 
00287       // Mimimum recv buffer size is 32 packets
00288       if (*(int*)optval > (m_iMSS - 28) * 32)
00289          m_iRcvBufSize = *(int*)optval / (m_iMSS - 28);
00290       else
00291          m_iRcvBufSize = 32;
00292 
00293       // recv buffer MUST not be greater than FC size
00294       if (m_iRcvBufSize > m_iFlightFlagSize)
00295          m_iRcvBufSize = m_iFlightFlagSize;
00296 
00297       break;
00298 
00299    case UDT_LINGER:
00300       m_Linger = *(linger*)optval;
00301       break;
00302 
00303    case UDP_SNDBUF:
00304       if (m_bOpened)
00305          throw CUDTException(5, 1, 0);
00306 
00307       m_iUDPSndBufSize = *(int*)optval;
00308 
00309       if (m_iUDPSndBufSize < m_iMSS)
00310          m_iUDPSndBufSize = m_iMSS;
00311 
00312       break;
00313 
00314    case UDP_RCVBUF:
00315       if (m_bOpened)
00316          throw CUDTException(5, 1, 0);
00317 
00318       m_iUDPRcvBufSize = *(int*)optval;
00319 
00320       if (m_iUDPRcvBufSize < m_iMSS)
00321          m_iUDPRcvBufSize = m_iMSS;
00322 
00323       break;
00324 
00325    case UDT_RENDEZVOUS:
00326       if (m_bConnecting || m_bConnected)
00327          throw CUDTException(5, 1, 0);
00328       m_bRendezvous = *(bool *)optval;
00329       break;
00330 
00331    case UDT_SNDTIMEO: 
00332       m_iSndTimeOut = *(int*)optval; 
00333       break; 
00334     
00335    case UDT_RCVTIMEO: 
00336       m_iRcvTimeOut = *(int*)optval; 
00337       break; 
00338 
00339    case UDT_REUSEADDR:
00340       if (m_bOpened)
00341          throw CUDTException(5, 1, 0);
00342       m_bReuseAddr = *(bool*)optval;
00343       break;
00344 
00345    case UDT_MAXBW:
00346       m_llMaxBW = *(int64_t*)optval;
00347       break;
00348     
00349    default:
00350       throw CUDTException(5, 0, 0);
00351    }
00352 }
00353 
00354 void CUDT::getOpt(UDTOpt optName, void* optval, int& optlen)
00355 {
00356    CGuard cg(m_ConnectionLock);
00357 
00358    switch (optName)
00359    {
00360    case UDT_MSS:
00361       *(int*)optval = m_iMSS;
00362       optlen = sizeof(int);
00363       break;
00364 
00365    case UDT_SNDSYN:
00366       *(bool*)optval = m_bSynSending;
00367       optlen = sizeof(bool);
00368       break;
00369 
00370    case UDT_RCVSYN:
00371       *(bool*)optval = m_bSynRecving;
00372       optlen = sizeof(bool);
00373       break;
00374 
00375    case UDT_CC:
00376       if (!m_bOpened)
00377          throw CUDTException(5, 5, 0);
00378       *(CCC**)optval = m_pCC;
00379       optlen = sizeof(CCC*);
00380 
00381       break;
00382 
00383    case UDT_FC:
00384       *(int*)optval = m_iFlightFlagSize;
00385       optlen = sizeof(int);
00386       break;
00387 
00388    case UDT_SNDBUF:
00389       *(int*)optval = m_iSndBufSize * (m_iMSS - 28);
00390       optlen = sizeof(int);
00391       break;
00392 
00393    case UDT_RCVBUF:
00394       *(int*)optval = m_iRcvBufSize * (m_iMSS - 28);
00395       optlen = sizeof(int);
00396       break;
00397 
00398    case UDT_LINGER:
00399       if (optlen < (int)(sizeof(linger)))
00400          throw CUDTException(5, 3, 0);
00401 
00402       *(linger*)optval = m_Linger;
00403       optlen = sizeof(linger);
00404       break;
00405 
00406    case UDP_SNDBUF:
00407       *(int*)optval = m_iUDPSndBufSize;
00408       optlen = sizeof(int);
00409       break;
00410 
00411    case UDP_RCVBUF:
00412       *(int*)optval = m_iUDPRcvBufSize;
00413       optlen = sizeof(int);
00414       break;
00415 
00416    case UDT_RENDEZVOUS:
00417       *(bool *)optval = m_bRendezvous;
00418       optlen = sizeof(bool);
00419       break;
00420 
00421    case UDT_SNDTIMEO: 
00422       *(int*)optval = m_iSndTimeOut; 
00423       optlen = sizeof(int); 
00424       break; 
00425     
00426    case UDT_RCVTIMEO: 
00427       *(int*)optval = m_iRcvTimeOut; 
00428       optlen = sizeof(int); 
00429       break; 
00430 
00431    case UDT_REUSEADDR:
00432       *(bool *)optval = m_bReuseAddr;
00433       optlen = sizeof(bool);
00434       break;
00435 
00436    case UDT_MAXBW:
00437       *(int64_t*)optval = m_llMaxBW;
00438       optlen = sizeof(int64_t);
00439       break;
00440 
00441    case UDT_STATE:
00442       *(int32_t*)optval = s_UDTUnited.getStatus(m_SocketID);
00443       optlen = sizeof(int32_t);
00444       break;
00445 
00446    case UDT_EVENT:
00447    {
00448       int32_t event = 0;
00449       if (m_bBroken)
00450          event |= UDT_EPOLL_ERR;
00451       else
00452       {
00453          if (m_pRcvBuffer && (m_pRcvBuffer->getRcvDataSize() > 0))
00454             event |= UDT_EPOLL_IN;
00455          if (m_pSndBuffer && (m_iSndBufSize > m_pSndBuffer->getCurrBufSize()))
00456             event |= UDT_EPOLL_OUT;
00457       }
00458       *(int32_t*)optval = event;
00459       optlen = sizeof(int32_t);
00460       break;
00461    }
00462 
00463    case UDT_SNDDATA:
00464       if (m_pSndBuffer)
00465          *(int32_t*)optval = m_pSndBuffer->getCurrBufSize();
00466       else
00467          *(int32_t*)optval = 0;
00468       optlen = sizeof(int32_t);
00469       break;
00470 
00471    case UDT_RCVDATA:
00472       if (m_pRcvBuffer)
00473          *(int32_t*)optval = m_pRcvBuffer->getRcvDataSize();
00474       else
00475          *(int32_t*)optval = 0;
00476       optlen = sizeof(int32_t);
00477       break;
00478 
00479    default:
00480       throw CUDTException(5, 0, 0);
00481    }
00482 }
00483 
00484 void CUDT::open()
00485 {
00486    CGuard cg(m_ConnectionLock);
00487 
00488    // Initial sequence number, loss, acknowledgement, etc.
00489    m_iPktSize = m_iMSS - 28;
00490    m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;
00491 
00492    m_iEXPCount = 1;
00493    m_iBandwidth = 1;
00494    m_iDeliveryRate = 16;
00495    m_iAckSeqNo = 0;
00496    m_ullLastAckTime = 0;
00497 
00498    // trace information
00499    m_StartTime = CTimer::getTime();
00500    m_llSentTotal = m_llRecvTotal = m_iSndLossTotal = m_iRcvLossTotal = m_iRetransTotal = m_iSentACKTotal = m_iRecvACKTotal = m_iSentNAKTotal = m_iRecvNAKTotal = 0;
00501    m_LastSampleTime = CTimer::getTime();
00502    m_llTraceSent = m_llTraceRecv = m_iTraceSndLoss = m_iTraceRcvLoss = m_iTraceRetrans = m_iSentACK = m_iRecvACK = m_iSentNAK = m_iRecvNAK = 0;
00503    m_llSndDuration = m_llSndDurationTotal = 0;
00504 
00505    // structures for queue
00506    if (NULL == m_pSNode)
00507       m_pSNode = new CSNode;
00508    m_pSNode->m_pUDT = this;
00509    m_pSNode->m_llTimeStamp = 1;
00510    m_pSNode->m_iHeapLoc = -1;
00511 
00512    if (NULL == m_pRNode)
00513       m_pRNode = new CRNode;
00514    m_pRNode->m_pUDT = this;
00515    m_pRNode->m_llTimeStamp = 1;
00516    m_pRNode->m_pPrev = m_pRNode->m_pNext = NULL;
00517    m_pRNode->m_bOnList = false;
00518 
00519    m_iRTT = 10 * m_iSYNInterval;
00520    m_iRTTVar = m_iRTT >> 1;
00521    m_ullCPUFrequency = CTimer::getCPUFrequency();
00522 
00523    // set up the timers
00524    m_ullSYNInt = m_iSYNInterval * m_ullCPUFrequency;
00525   
00526    // set minimum NAK and EXP timeout to 100ms
00527    m_ullMinNakInt = 300000 * m_ullCPUFrequency;
00528    m_ullMinExpInt = 300000 * m_ullCPUFrequency;
00529 
00530    m_ullACKInt = m_ullSYNInt;
00531    m_ullNAKInt = m_ullMinNakInt;
00532 
00533    uint64_t currtime;
00534    CTimer::rdtsc(currtime);
00535    m_ullLastRspTime = currtime;
00536    m_ullNextACKTime = currtime + m_ullSYNInt;
00537    m_ullNextNAKTime = currtime + m_ullNAKInt;
00538 
00539    m_iPktCount = 0;
00540    m_iLightACKCount = 1;
00541 
00542    m_ullTargetTime = 0;
00543    m_ullTimeDiff = 0;
00544 
00545    // Now UDT is opened.
00546    m_bOpened = true;
00547 }
00548 
00549 void CUDT::listen()
00550 {
00551    CGuard cg(m_ConnectionLock);
00552 
00553    if (!m_bOpened)
00554       throw CUDTException(5, 0, 0);
00555 
00556    if (m_bConnecting || m_bConnected)
00557       throw CUDTException(5, 2, 0);
00558 
00559    // listen can be called more than once
00560    if (m_bListening)
00561       return;
00562 
00563    // if there is already another socket listening on the same port
00564    if (m_pRcvQueue->setListener(this) < 0)
00565       throw CUDTException(5, 11, 0);
00566 
00567    m_bListening = true;
00568 }
00569 
00570 void CUDT::connect(const sockaddr* serv_addr)
00571 {
00572    CGuard cg(m_ConnectionLock);
00573 
00574    if (!m_bOpened)
00575       throw CUDTException(5, 0, 0);
00576 
00577    if (m_bListening)
00578       throw CUDTException(5, 2, 0);
00579 
00580    if (m_bConnecting || m_bConnected)
00581       throw CUDTException(5, 2, 0);
00582 
00583    // record peer/server address
00584    delete m_pPeerAddr;
00585    m_pPeerAddr = (AF_INET == m_iIPversion) ? (sockaddr*)new sockaddr_in : (sockaddr*)new sockaddr_in6;
00586    memcpy(m_pPeerAddr, serv_addr, (AF_INET == m_iIPversion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6));
00587 
00588    // register this socket in the rendezvous queue
00589    // RendezevousQueue is used to temporarily store incoming handshake, non-rendezvous connections also require this function
00590    uint64_t ttl = 3000000;
00591    if (m_bRendezvous)
00592       ttl *= 10;
00593    ttl += CTimer::getTime();
00594    m_pRcvQueue->registerConnector(m_SocketID, this, m_iIPversion, serv_addr, ttl);
00595 
00596    // This is my current configurations
00597    m_ConnReq.m_iVersion = m_iVersion;
00598    m_ConnReq.m_iType = m_iSockType;
00599    m_ConnReq.m_iMSS = m_iMSS;
00600    m_ConnReq.m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize)? m_iRcvBufSize : m_iFlightFlagSize;
00601    m_ConnReq.m_iReqType = (!m_bRendezvous) ? 1 : 0;
00602    m_ConnReq.m_iID = m_SocketID;
00603    CIPAddress::ntop(serv_addr, m_ConnReq.m_piPeerIP, m_iIPversion);
00604 
00605    // Random Initial Sequence Number
00606    srand((unsigned int)CTimer::getTime());
00607    m_iISN = m_ConnReq.m_iISN = (int32_t)(CSeqNo::m_iMaxSeqNo * (double(rand()) / RAND_MAX));
00608 
00609    m_iLastDecSeq = m_iISN - 1;
00610    m_iSndLastAck = m_iISN;
00611    m_iSndLastDataAck = m_iISN;
00612    m_iSndCurrSeqNo = m_iISN - 1;
00613    m_iSndLastAck2 = m_iISN;
00614    m_ullSndLastAck2Time = CTimer::getTime();
00615 
00616    // Inform the server my configurations.
00617    CPacket request;
00618    char* reqdata = new char [m_iPayloadSize];
00619    request.pack(0, NULL, reqdata, m_iPayloadSize);
00620    // ID = 0, connection request
00621    request.m_iID = 0;
00622 
00623    int hs_size = m_iPayloadSize;
00624    m_ConnReq.serialize(reqdata, hs_size);
00625    request.setLength(hs_size);
00626    m_pSndQueue->sendto(serv_addr, request);
00627    m_llLastReqTime = CTimer::getTime();
00628 
00629    m_bConnecting = true;
00630 
00631    // asynchronous connect, return immediately
00632    if (!m_bSynRecving)
00633    {
00634       delete [] reqdata;
00635       return;
00636    }
00637 
00638    // Wait for the negotiated configurations from the peer side.
00639    CPacket response;
00640    char* resdata = new char [m_iPayloadSize];
00641    response.pack(0, NULL, resdata, m_iPayloadSize);
00642 
00643    CUDTException e(0, 0);
00644 
00645    while (!m_bClosing)
00646    {
00647       // avoid sending too many requests, at most 1 request per 250ms
00648       if (CTimer::getTime() - m_llLastReqTime > 250000)
00649       {
00650          m_ConnReq.serialize(reqdata, hs_size);
00651          request.setLength(hs_size);
00652          if (m_bRendezvous)
00653             request.m_iID = m_ConnRes.m_iID;
00654          m_pSndQueue->sendto(serv_addr, request);
00655          m_llLastReqTime = CTimer::getTime();
00656       }
00657 
00658       response.setLength(m_iPayloadSize);
00659       if (m_pRcvQueue->recvfrom(m_SocketID, response) > 0)
00660       {
00661          if (connect(response) <= 0)
00662             break;
00663 
00664          // new request/response should be sent out immediately on receving a response
00665          m_llLastReqTime = 0;
00666       }
00667 
00668       if (CTimer::getTime() > ttl)
00669       {
00670          // timeout
00671          e = CUDTException(1, 1, 0);
00672          break;
00673       }
00674    }
00675 
00676    delete [] reqdata;
00677    delete [] resdata;
00678 
00679    if (e.getErrorCode() == 0)
00680    {
00681       if (m_bClosing)                                                 // if the socket is closed before connection...
00682          e = CUDTException(1);
00683       else if (1002 == m_ConnRes.m_iReqType)                          // connection request rejected
00684          e = CUDTException(1, 2, 0);
00685       else if ((!m_bRendezvous) && (m_iISN != m_ConnRes.m_iISN))      // secuity check
00686          e = CUDTException(1, 4, 0);
00687    }
00688 
00689    if (e.getErrorCode() != 0)
00690       throw e;
00691 }
00692 
00693 int CUDT::connect(const CPacket& response) throw ()
00694 {
00695    // this is the 2nd half of a connection request. If the connection is setup successfully this returns 0.
00696    // returning -1 means there is an error.
00697    // returning 1 or 2 means the connection is in process and needs more handshake
00698 
00699    if (!m_bConnecting)
00700       return -1;
00701 
00702    if (m_bRendezvous && ((0 == response.getFlag()) || (1 == response.getType())) && (0 != m_ConnRes.m_iType))
00703    {
00704       //a data packet or a keep-alive packet comes, which means the peer side is already connected
00705       // in this situation, the previously recorded response will be used
00706       goto POST_CONNECT;
00707    }
00708 
00709    if ((1 != response.getFlag()) || (0 != response.getType()))
00710       return -1;
00711 
00712    m_ConnRes.deserialize(response.m_pcData, response.getLength());
00713 
00714    if (m_bRendezvous)
00715    {
00716       // regular connect should NOT communicate with rendezvous connect
00717       // rendezvous connect require 3-way handshake
00718       if (1 == m_ConnRes.m_iReqType)
00719          return -1;
00720 
00721       if ((0 == m_ConnReq.m_iReqType) || (0 == m_ConnRes.m_iReqType))
00722       {
00723          m_ConnReq.m_iReqType = -1;
00724          // the request time must be updated so that the next handshake can be sent out immediately.
00725          m_llLastReqTime = 0;
00726          return 1;
00727       }
00728    }
00729    else
00730    {
00731       // set cookie
00732       if (1 == m_ConnRes.m_iReqType)
00733       {
00734          m_ConnReq.m_iReqType = -1;
00735          m_ConnReq.m_iCookie = m_ConnRes.m_iCookie;
00736          m_llLastReqTime = 0;
00737          return 1;
00738       }
00739    }
00740 
00741 POST_CONNECT:
00742    // Remove from rendezvous queue
00743    m_pRcvQueue->removeConnector(m_SocketID);
00744 
00745    // Re-configure according to the negotiated values.
00746    m_iMSS = m_ConnRes.m_iMSS;
00747    m_iFlowWindowSize = m_ConnRes.m_iFlightFlagSize;
00748    m_iPktSize = m_iMSS - 28;
00749    m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;
00750    m_iPeerISN = m_ConnRes.m_iISN;
00751    m_iRcvLastAck = m_ConnRes.m_iISN;
00752    m_iRcvLastAckAck = m_ConnRes.m_iISN;
00753    m_iRcvCurrSeqNo = m_ConnRes.m_iISN - 1;
00754    m_PeerID = m_ConnRes.m_iID;
00755    memcpy(m_piSelfIP, m_ConnRes.m_piPeerIP, 16);
00756 
00757    // Prepare all data structures
00758    try
00759    {
00760       m_pSndBuffer = new CSndBuffer(32, m_iPayloadSize);
00761       m_pRcvBuffer = new CRcvBuffer(&(m_pRcvQueue->m_UnitQueue), m_iRcvBufSize);
00762       // after introducing lite ACK, the sndlosslist may not be cleared in time, so it requires twice space.
00763       m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2);
00764       m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize);
00765       m_pACKWindow = new CACKWindow(1024);
00766       m_pRcvTimeWindow = new CPktTimeWindow(16, 64);
00767       m_pSndTimeWindow = new CPktTimeWindow();
00768    }
00769    catch (...)
00770    {
00771       throw CUDTException(3, 2, 0);
00772    }
00773 
00774    CInfoBlock ib;
00775    ib.m_iIPversion = m_iIPversion;
00776    CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP);
00777    if (m_pCache->lookup(&ib) >= 0)
00778    {
00779       m_iRTT = ib.m_iRTT;
00780       m_iBandwidth = ib.m_iBandwidth;
00781    }
00782 
00783    m_pCC = m_pCCFactory->create();
00784    m_pCC->m_UDT = m_SocketID;
00785    m_pCC->setMSS(m_iMSS);
00786    m_pCC->setMaxCWndSize(m_iFlowWindowSize);
00787    m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo);
00788    m_pCC->setRcvRate(m_iDeliveryRate);
00789    m_pCC->setRTT(m_iRTT);
00790    m_pCC->setBandwidth(m_iBandwidth);
00791    m_pCC->init();
00792 
00793    m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
00794    m_dCongestionWindow = m_pCC->m_dCWndSize;
00795 
00796    // And, I am connected too.
00797    m_bConnecting = false;
00798    m_bConnected = true;
00799 
00800    // register this socket for receiving data packets
00801    m_pRNode->m_bOnList = true;
00802    m_pRcvQueue->setNewEntry(this);
00803 
00804    // acknowledde any waiting epolls to write
00805    s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);
00806 
00807    // acknowledge the management module.
00808    s_UDTUnited.connect_complete(m_SocketID);
00809 
00810    return 0;
00811 }
00812 
00813 void CUDT::connect(const sockaddr* peer, CHandShake* hs)
00814 {
00815    CGuard cg(m_ConnectionLock);
00816 
00817    // Uses the smaller MSS between the peers        
00818    if (hs->m_iMSS > m_iMSS)
00819       hs->m_iMSS = m_iMSS;
00820    else
00821       m_iMSS = hs->m_iMSS;
00822 
00823    // exchange info for maximum flow window size
00824    m_iFlowWindowSize = hs->m_iFlightFlagSize;
00825    hs->m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize)? m_iRcvBufSize : m_iFlightFlagSize;
00826 
00827    m_iPeerISN = hs->m_iISN;
00828 
00829    m_iRcvLastAck = hs->m_iISN;
00830    m_iRcvLastAckAck = hs->m_iISN;
00831    m_iRcvCurrSeqNo = hs->m_iISN - 1;
00832 
00833    m_PeerID = hs->m_iID;
00834    hs->m_iID = m_SocketID;
00835 
00836    // use peer's ISN and send it back for security check
00837    m_iISN = hs->m_iISN;
00838 
00839    m_iLastDecSeq = m_iISN - 1;
00840    m_iSndLastAck = m_iISN;
00841    m_iSndLastDataAck = m_iISN;
00842    m_iSndCurrSeqNo = m_iISN - 1;
00843    m_iSndLastAck2 = m_iISN;
00844    m_ullSndLastAck2Time = CTimer::getTime();
00845 
00846    // this is a reponse handshake
00847    hs->m_iReqType = -1;
00848 
00849    // get local IP address and send the peer its IP address (because UDP cannot get local IP address)
00850    memcpy(m_piSelfIP, hs->m_piPeerIP, 16);
00851    CIPAddress::ntop(peer, hs->m_piPeerIP, m_iIPversion);
00852   
00853    m_iPktSize = m_iMSS - 28;
00854    m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;
00855 
00856    // Prepare all structures
00857    try
00858    {
00859       m_pSndBuffer = new CSndBuffer(32, m_iPayloadSize);
00860       m_pRcvBuffer = new CRcvBuffer(&(m_pRcvQueue->m_UnitQueue), m_iRcvBufSize);
00861       m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2);
00862       m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize);
00863       m_pACKWindow = new CACKWindow(1024);
00864       m_pRcvTimeWindow = new CPktTimeWindow(16, 64);
00865       m_pSndTimeWindow = new CPktTimeWindow();
00866    }
00867    catch (...)
00868    {
00869       throw CUDTException(3, 2, 0);
00870    }
00871 
00872    CInfoBlock ib;
00873    ib.m_iIPversion = m_iIPversion;
00874    CInfoBlock::convert(peer, m_iIPversion, ib.m_piIP);
00875    if (m_pCache->lookup(&ib) >= 0)
00876    {
00877       m_iRTT = ib.m_iRTT;
00878       m_iBandwidth = ib.m_iBandwidth;
00879    }
00880 
00881    m_pCC = m_pCCFactory->create();
00882    m_pCC->m_UDT = m_SocketID;
00883    m_pCC->setMSS(m_iMSS);
00884    m_pCC->setMaxCWndSize(m_iFlowWindowSize);
00885    m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo);
00886    m_pCC->setRcvRate(m_iDeliveryRate);
00887    m_pCC->setRTT(m_iRTT);
00888    m_pCC->setBandwidth(m_iBandwidth);
00889    m_pCC->init();
00890 
00891    m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
00892    m_dCongestionWindow = m_pCC->m_dCWndSize;
00893 
00894    m_pPeerAddr = (AF_INET == m_iIPversion) ? (sockaddr*)new sockaddr_in : (sockaddr*)new sockaddr_in6;
00895    memcpy(m_pPeerAddr, peer, (AF_INET == m_iIPversion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6));
00896 
00897    // And of course, it is connected.
00898    m_bConnected = true;
00899 
00900    // register this socket for receiving data packets
00901    m_pRNode->m_bOnList = true;
00902    m_pRcvQueue->setNewEntry(this);
00903 
00904    //send the response to the peer, see listen() for more discussions about this
00905    CPacket response;
00906    int size = CHandShake::m_iContentSize;
00907    char* buffer = new char[size];
00908    hs->serialize(buffer, size);
00909    response.pack(0, NULL, buffer, size);
00910    response.m_iID = m_PeerID;
00911    m_pSndQueue->sendto(peer, response);
00912    delete [] buffer;
00913 }
00914 
00915 void CUDT::close()
00916 {
00917    if (!m_bOpened)
00918       return;
00919 
00920    if (0 != m_Linger.l_onoff)
00921    {
00922       uint64_t entertime = CTimer::getTime();
00923 
00924       while (!m_bBroken && m_bConnected && (m_pSndBuffer->getCurrBufSize() > 0) && (CTimer::getTime() - entertime < m_Linger.l_linger * 1000000ULL))
00925       {
00926          // linger has been checked by previous close() call and has expired
00927          if (m_ullLingerExpiration >= entertime)
00928             break;
00929 
00930          if (!m_bSynSending)
00931          {
00932             // if this socket enables asynchronous sending, return immediately and let GC to close it later
00933             if (0 == m_ullLingerExpiration)
00934                m_ullLingerExpiration = entertime + m_Linger.l_linger * 1000000ULL;
00935 
00936             return;
00937          }
00938 
00939          #ifndef WIN32
00940             timespec ts;
00941             ts.tv_sec = 0;
00942             ts.tv_nsec = 1000000;
00943             nanosleep(&ts, NULL);
00944          #else
00945             Sleep(1);
00946          #endif
00947       }
00948    }
00949 
00950    // remove this socket from the snd queue
00951    if (m_bConnected)
00952       m_pSndQueue->m_pSndUList->remove(this);
00953 
00954    // remove itself from all epoll monitoring
00955    try
00956    {
00957       for (set<int>::iterator i = m_sPollID.begin(); i != m_sPollID.end(); ++ i)
00958          s_UDTUnited.m_EPoll.remove_usock(*i, m_SocketID);
00959    }
00960    catch (...)
00961    {
00962    }
00963 
00964    if (!m_bOpened)
00965       return;
00966 
00967    // Inform the threads handler to stop.
00968    m_bClosing = true;
00969 
00970    CGuard cg(m_ConnectionLock);
00971 
00972    // Signal the sender and recver if they are waiting for data.
00973    releaseSynch();
00974 
00975    if (m_bListening)
00976    {
00977       m_bListening = false;
00978       m_pRcvQueue->removeListener(this);
00979    }
00980    else if (m_bConnecting)
00981    {
00982       m_pRcvQueue->removeConnector(m_SocketID);
00983    }
00984 
00985    if (m_bConnected)
00986    {
00987       if (!m_bShutdown)
00988          sendCtrl(5);
00989 
00990       m_pCC->close();
00991 
00992       // Store current connection information.
00993       CInfoBlock ib;
00994       ib.m_iIPversion = m_iIPversion;
00995       CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP);
00996       ib.m_iRTT = m_iRTT;
00997       ib.m_iBandwidth = m_iBandwidth;
00998       m_pCache->update(&ib);
00999 
01000       m_bConnected = false;
01001    }
01002 
01003    // waiting all send and recv calls to stop
01004    CGuard sendguard(m_SendLock);
01005    CGuard recvguard(m_RecvLock);
01006 
01007    // CLOSED.
01008    m_bOpened = false;
01009 }
01010 
01011 int CUDT::send(const char* data, int len)
01012 {
01013    if (UDT_DGRAM == m_iSockType)
01014       throw CUDTException(5, 10, 0);
01015 
01016    // throw an exception if not connected
01017    if (m_bBroken || m_bClosing)
01018       throw CUDTException(2, 1, 0);
01019    else if (!m_bConnected)
01020       throw CUDTException(2, 2, 0);
01021 
01022    if (len <= 0)
01023       return 0;
01024 
01025    CGuard sendguard(m_SendLock);
01026 
01027    if (m_pSndBuffer->getCurrBufSize() == 0)
01028    {
01029       // delay the EXP timer to avoid mis-fired timeout
01030       uint64_t currtime;
01031       CTimer::rdtsc(currtime);
01032       m_ullLastRspTime = currtime;
01033    }
01034 
01035    if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize())
01036    {
01037       if (!m_bSynSending)
01038          throw CUDTException(6, 1, 0);
01039       else
01040       {
01041          // wait here during a blocking sending
01042          #ifndef WIN32
01043             pthread_mutex_lock(&m_SendBlockLock);
01044             if (m_iSndTimeOut < 0) 
01045             { 
01046                while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth)
01047                   pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock);
01048             }
01049             else
01050             {
01051                uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL;
01052                timespec locktime; 
01053     
01054                locktime.tv_sec = exptime / 1000000;
01055                locktime.tv_nsec = (exptime % 1000000) * 1000;
01056 
01057                while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth && (CTimer::getTime() < exptime))
01058                   pthread_cond_timedwait(&m_SendBlockCond, &m_SendBlockLock, &locktime);
01059             }
01060             pthread_mutex_unlock(&m_SendBlockLock);
01061          #else
01062             if (m_iSndTimeOut < 0)
01063             {
01064                while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth)
01065                   WaitForSingleObject(m_SendBlockCond, INFINITE);
01066             }
01067             else 
01068             {
01069                uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL;
01070 
01071                while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth && (CTimer::getTime() < exptime))
01072                   WaitForSingleObject(m_SendBlockCond, DWORD((exptime - CTimer::getTime()) / 1000)); 
01073             }
01074          #endif
01075 
01076          // check the connection status
01077          if (m_bBroken || m_bClosing)
01078             throw CUDTException(2, 1, 0);
01079          else if (!m_bConnected)
01080             throw CUDTException(2, 2, 0);
01081          else if (!m_bPeerHealth)
01082          {
01083             m_bPeerHealth = true;
01084             throw CUDTException(7);
01085          }
01086       }
01087    }
01088 
01089    if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize())
01090    {
01091       if (m_iSndTimeOut >= 0)
01092          throw CUDTException(6, 3, 0); 
01093 
01094       return 0;
01095    }
01096 
01097    int size = (m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize;
01098    if (size > len)
01099       size = len;
01100 
01101    // record total time used for sending
01102    if (0 == m_pSndBuffer->getCurrBufSize())
01103       m_llSndDurationCounter = CTimer::getTime();
01104 
01105    // insert the user buffer into the sening list
01106    m_pSndBuffer->addBuffer(data, size);
01107 
01108    // insert this socket to snd list if it is not on the list yet
01109    m_pSndQueue->m_pSndUList->update(this, false);
01110 
01111    if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize())
01112    {
01113       // write is not available any more
01114       s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false);
01115    }
01116 
01117    return size;
01118 }
01119 
01120 int CUDT::recv(char* data, int len)
01121 {
01122    if (UDT_DGRAM == m_iSockType)
01123       throw CUDTException(5, 10, 0);
01124 
01125    // throw an exception if not connected
01126    if (!m_bConnected)
01127       throw CUDTException(2, 2, 0);
01128    else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize()))
01129       throw CUDTException(2, 1, 0);
01130 
01131    if (len <= 0)
01132       return 0;
01133 
01134    CGuard recvguard(m_RecvLock);
01135 
01136    if (0 == m_pRcvBuffer->getRcvDataSize())
01137    {
01138       if (!m_bSynRecving)
01139          throw CUDTException(6, 2, 0);
01140       else
01141       {
01142          #ifndef WIN32
01143             pthread_mutex_lock(&m_RecvDataLock);
01144             if (m_iRcvTimeOut < 0) 
01145             { 
01146                while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize()))
01147                   pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock);
01148             }
01149             else
01150             {
01151                uint64_t exptime = CTimer::getTime() + m_iRcvTimeOut * 1000ULL; 
01152                timespec locktime; 
01153     
01154                locktime.tv_sec = exptime / 1000000;
01155                locktime.tv_nsec = (exptime % 1000000) * 1000;
01156 
01157                while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize()))
01158                {
01159                   pthread_cond_timedwait(&m_RecvDataCond, &m_RecvDataLock, &locktime); 
01160                   if (CTimer::getTime() >= exptime)
01161                      break;
01162                }
01163             }
01164             pthread_mutex_unlock(&m_RecvDataLock);
01165          #else
01166             if (m_iRcvTimeOut < 0)
01167             {
01168                while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize()))
01169                   WaitForSingleObject(m_RecvDataCond, INFINITE);
01170             }
01171             else
01172             {
01173                uint64_t enter_time = CTimer::getTime();
01174 
01175                while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize()))
01176                {
01177                   int diff = int(CTimer::getTime() - enter_time) / 1000;
01178                   if (diff >= m_iRcvTimeOut)
01179                       break;
01180                   WaitForSingleObject(m_RecvDataCond, DWORD(m_iRcvTimeOut - diff ));
01181                }
01182             }
01183          #endif
01184       }
01185    }
01186 
01187    // throw an exception if not connected
01188    if (!m_bConnected)
01189       throw CUDTException(2, 2, 0);
01190    else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize()))
01191       throw CUDTException(2, 1, 0);
01192 
01193    int res = m_pRcvBuffer->readBuffer(data, len);
01194 
01195    if (m_pRcvBuffer->getRcvDataSize() <= 0)
01196    {
01197       // read is not available any more
01198       s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
01199    }
01200 
01201    if ((res <= 0) && (m_iRcvTimeOut >= 0))
01202       throw CUDTException(6, 3, 0);
01203 
01204    return res;
01205 }
01206 
01207 int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder)
01208 {
01209    if (UDT_STREAM == m_iSockType)
01210       throw CUDTException(5, 9, 0);
01211 
01212    // throw an exception if not connected
01213    if (m_bBroken || m_bClosing)
01214       throw CUDTException(2, 1, 0);
01215    else if (!m_bConnected)
01216       throw CUDTException(2, 2, 0);
01217 
01218    if (len <= 0)
01219       return 0;
01220 
01221    if (len > m_iSndBufSize * m_iPayloadSize)
01222       throw CUDTException(5, 12, 0);
01223 
01224    CGuard sendguard(m_SendLock);
01225 
01226    if (m_pSndBuffer->getCurrBufSize() == 0)
01227    {
01228       // delay the EXP timer to avoid mis-fired timeout
01229       uint64_t currtime;
01230       CTimer::rdtsc(currtime);
01231       m_ullLastRspTime = currtime;
01232    }
01233 
01234    if ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len)
01235    {
01236       if (!m_bSynSending)
01237          throw CUDTException(6, 1, 0);
01238       else
01239       {
01240          // wait here during a blocking sending
01241          #ifndef WIN32
01242             pthread_mutex_lock(&m_SendBlockLock);
01243             if (m_iSndTimeOut < 0)
01244             {
01245                while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len))
01246                   pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock);
01247             }
01248             else
01249             {
01250                uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL;
01251                timespec locktime;
01252 
01253                locktime.tv_sec = exptime / 1000000;
01254                locktime.tv_nsec = (exptime % 1000000) * 1000;
01255 
01256                while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len) && (CTimer::getTime() < exptime))
01257                   pthread_cond_timedwait(&m_SendBlockCond, &m_SendBlockLock, &locktime);
01258             }
01259             pthread_mutex_unlock(&m_SendBlockLock);
01260          #else
01261             if (m_iSndTimeOut < 0)
01262             {
01263                while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len))
01264                   WaitForSingleObject(m_SendBlockCond, INFINITE);
01265             }
01266             else
01267             {
01268                uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL;
01269 
01270                while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len) && (CTimer::getTime() < exptime))
01271                   WaitForSingleObject(m_SendBlockCond, DWORD((exptime - CTimer::getTime()) / 1000));
01272             }
01273          #endif
01274 
01275          // check the connection status
01276          if (m_bBroken || m_bClosing)
01277             throw CUDTException(2, 1, 0);
01278          else if (!m_bConnected)
01279             throw CUDTException(2, 2, 0);
01280       }
01281    }
01282 
01283    if ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len)
01284    {
01285       if (m_iSndTimeOut >= 0)
01286          throw CUDTException(6, 3, 0);
01287 
01288       return 0;
01289    }
01290 
01291    // record total time used for sending
01292    if (0 == m_pSndBuffer->getCurrBufSize())
01293       m_llSndDurationCounter = CTimer::getTime();
01294 
01295    // insert the user buffer into the sening list
01296    m_pSndBuffer->addBuffer(data, len, msttl, inorder);
01297 
01298    // insert this socket to the snd list if it is not on the list yet
01299    m_pSndQueue->m_pSndUList->update(this, false);
01300 
01301    if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize())
01302    {
01303       // write is not available any more
01304       s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false);
01305    }
01306 
01307    return len;   
01308 }
01309 
01310 int CUDT::recvmsg(char* data, int len)
01311 {
01312    if (UDT_STREAM == m_iSockType)
01313       throw CUDTException(5, 9, 0);
01314 
01315    // throw an exception if not connected
01316    if (!m_bConnected)
01317       throw CUDTException(2, 2, 0);
01318 
01319    if (len <= 0)
01320       return 0;
01321 
01322    CGuard recvguard(m_RecvLock);
01323 
01324    if (m_bBroken || m_bClosing)
01325    {
01326       int res = m_pRcvBuffer->readMsg(data, len);
01327 
01328       if (m_pRcvBuffer->getRcvMsgNum() <= 0)
01329       {
01330          // read is not available any more
01331          s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
01332       }
01333 
01334       if (0 == res)
01335          throw CUDTException(2, 1, 0);
01336       else
01337          return res;
01338    }
01339 
01340    if (!m_bSynRecving)
01341    {
01342       int res = m_pRcvBuffer->readMsg(data, len);
01343       if (0 == res)
01344          throw CUDTException(6, 2, 0);
01345       else
01346          return res;
01347    }
01348 
01349    int res = 0;
01350    bool timeout = false;
01351 
01352    do
01353    {
01354       #ifndef WIN32
01355          pthread_mutex_lock(&m_RecvDataLock);
01356 
01357          if (m_iRcvTimeOut < 0)
01358          {
01359             while (!m_bBroken && m_bConnected && !m_bClosing && (0 == (res = m_pRcvBuffer->readMsg(data, len))))
01360                pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock);
01361          }
01362          else
01363          {
01364             uint64_t exptime = CTimer::getTime() + m_iRcvTimeOut * 1000ULL;
01365             timespec locktime;
01366 
01367             locktime.tv_sec = exptime / 1000000;
01368             locktime.tv_nsec = (exptime % 1000000) * 1000;
01369 
01370             if (pthread_cond_timedwait(&m_RecvDataCond, &m_RecvDataLock, &locktime) == ETIMEDOUT)
01371                timeout = true;
01372 
01373             res = m_pRcvBuffer->readMsg(data, len);           
01374          }
01375          pthread_mutex_unlock(&m_RecvDataLock);
01376       #else
01377          if (m_iRcvTimeOut < 0)
01378          {
01379             while (!m_bBroken && m_bConnected && !m_bClosing && (0 == (res = m_pRcvBuffer->readMsg(data, len))))
01380                WaitForSingleObject(m_RecvDataCond, INFINITE);
01381          }
01382          else
01383          {
01384             if (WaitForSingleObject(m_RecvDataCond, DWORD(m_iRcvTimeOut)) == WAIT_TIMEOUT)
01385                timeout = true;
01386 
01387             res = m_pRcvBuffer->readMsg(data, len);
01388          }
01389       #endif
01390 
01391       if (m_bBroken || m_bClosing)
01392          throw CUDTException(2, 1, 0);
01393       else if (!m_bConnected)
01394          throw CUDTException(2, 2, 0);
01395    } while ((0 == res) && !timeout);
01396 
01397    if (m_pRcvBuffer->getRcvMsgNum() <= 0)
01398    {
01399       // read is not available any more
01400       s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
01401    }
01402 
01403    if ((res <= 0) && (m_iRcvTimeOut >= 0))
01404       throw CUDTException(6, 3, 0);
01405 
01406    return res;
01407 }
01408 
01409 int64_t CUDT::sendfile(fstream& ifs, int64_t& offset, int64_t size, int block)
01410 {
01411    if (UDT_DGRAM == m_iSockType)
01412       throw CUDTException(5, 10, 0);
01413 
01414    if (m_bBroken || m_bClosing)
01415       throw CUDTException(2, 1, 0);
01416    else if (!m_bConnected)
01417       throw CUDTException(2, 2, 0);
01418 
01419    if (size <= 0)
01420       return 0;
01421 
01422    CGuard sendguard(m_SendLock);
01423 
01424    if (m_pSndBuffer->getCurrBufSize() == 0)
01425    {
01426       // delay the EXP timer to avoid mis-fired timeout
01427       uint64_t currtime;
01428       CTimer::rdtsc(currtime);
01429       m_ullLastRspTime = currtime;
01430    }
01431 
01432    int64_t tosend = size;
01433    int unitsize;
01434 
01435    // positioning...
01436    try
01437    {
01438       ifs.seekg((streamoff)offset);
01439    }
01440    catch (...)
01441    {
01442       throw CUDTException(4, 1);
01443    }
01444 
01445    // sending block by block
01446    while (tosend > 0)
01447    {
01448       if (ifs.fail())
01449          throw CUDTException(4, 4);
01450 
01451       if (ifs.eof())
01452          break;
01453 
01454       unitsize = int((tosend >= block) ? block : tosend);
01455 
01456       #ifndef WIN32
01457          pthread_mutex_lock(&m_SendBlockLock);
01458          while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth)
01459             pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock);
01460          pthread_mutex_unlock(&m_SendBlockLock);
01461       #else
01462          while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth)
01463             WaitForSingleObject(m_SendBlockCond, INFINITE);
01464       #endif
01465 
01466       if (m_bBroken || m_bClosing)
01467          throw CUDTException(2, 1, 0);
01468       else if (!m_bConnected)
01469          throw CUDTException(2, 2, 0);
01470       else if (!m_bPeerHealth)
01471       {
01472          // reset peer health status, once this error returns, the app should handle the situation at the peer side
01473          m_bPeerHealth = true;
01474          throw CUDTException(7);
01475       }
01476 
01477       // record total time used for sending
01478       if (0 == m_pSndBuffer->getCurrBufSize())
01479          m_llSndDurationCounter = CTimer::getTime();
01480 
01481       int64_t sentsize = m_pSndBuffer->addBufferFromFile(ifs, unitsize);
01482 
01483       if (sentsize > 0)
01484       {
01485          tosend -= sentsize;
01486          offset += sentsize;
01487       }
01488 
01489       // insert this socket to snd list if it is not on the list yet
01490       m_pSndQueue->m_pSndUList->update(this, false);
01491    }
01492 
01493    if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize())
01494    {
01495       // write is not available any more
01496       s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false);
01497    }
01498 
01499    return size - tosend;
01500 }
01501 
01502 int64_t CUDT::recvfile(fstream& ofs, int64_t& offset, int64_t size, int block)
01503 {
01504    if (UDT_DGRAM == m_iSockType)
01505       throw CUDTException(5, 10, 0);
01506 
01507    if (!m_bConnected)
01508       throw CUDTException(2, 2, 0);
01509    else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize()))
01510       throw CUDTException(2, 1, 0);
01511 
01512    if (size <= 0)
01513       return 0;
01514 
01515    CGuard recvguard(m_RecvLock);
01516 
01517    int64_t torecv = size;
01518    int unitsize = block;
01519    int recvsize;
01520 
01521    // positioning...
01522    try
01523    {
01524       ofs.seekp((streamoff)offset);
01525    }
01526    catch (...)
01527    {
01528       throw CUDTException(4, 3);
01529    }
01530 
01531    // receiving... "recvfile" is always blocking
01532    while (torecv > 0)
01533    {
01534       if (ofs.fail())
01535       {
01536          // send the sender a signal so it will not be blocked forever
01537          int32_t err_code = CUDTException::EFILE;
01538          sendCtrl(8, &err_code);
01539 
01540          throw CUDTException(4, 4);
01541       }
01542 
01543       #ifndef WIN32
01544          pthread_mutex_lock(&m_RecvDataLock);
01545          while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize()))
01546             pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock);
01547          pthread_mutex_unlock(&m_RecvDataLock);
01548       #else
01549          while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize()))
01550             WaitForSingleObject(m_RecvDataCond, INFINITE);
01551       #endif
01552 
01553       if (!m_bConnected)
01554          throw CUDTException(2, 2, 0);
01555       else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize()))
01556          throw CUDTException(2, 1, 0);
01557 
01558       unitsize = int((torecv >= block) ? block : torecv);
01559       recvsize = m_pRcvBuffer->readBufferToFile(ofs, unitsize);
01560 
01561       if (recvsize > 0)
01562       {
01563          torecv -= recvsize;
01564          offset += recvsize;
01565       }
01566    }
01567 
01568    if (m_pRcvBuffer->getRcvDataSize() <= 0)
01569    {
01570       // read is not available any more
01571       s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
01572    }
01573 
01574    return size - torecv;
01575 }
01576 
01577 void CUDT::sample(CPerfMon* perf, bool clear)
01578 {
01579    if (!m_bConnected)
01580       throw CUDTException(2, 2, 0);
01581    if (m_bBroken || m_bClosing)
01582       throw CUDTException(2, 1, 0);
01583 
01584    uint64_t currtime = CTimer::getTime();
01585    perf->msTimeStamp = (currtime - m_StartTime) / 1000;
01586 
01587    perf->pktSent = m_llTraceSent;
01588    perf->pktRecv = m_llTraceRecv;
01589    perf->pktSndLoss = m_iTraceSndLoss;
01590    perf->pktRcvLoss = m_iTraceRcvLoss;
01591    perf->pktRetrans = m_iTraceRetrans;
01592    perf->pktSentACK = m_iSentACK;
01593    perf->pktRecvACK = m_iRecvACK;
01594    perf->pktSentNAK = m_iSentNAK;
01595    perf->pktRecvNAK = m_iRecvNAK;
01596    perf->usSndDuration = m_llSndDuration;
01597 
01598    perf->pktSentTotal = m_llSentTotal;
01599    perf->pktRecvTotal = m_llRecvTotal;
01600    perf->pktSndLossTotal = m_iSndLossTotal;
01601    perf->pktRcvLossTotal = m_iRcvLossTotal;
01602    perf->pktRetransTotal = m_iRetransTotal;
01603    perf->pktSentACKTotal = m_iSentACKTotal;
01604    perf->pktRecvACKTotal = m_iRecvACKTotal;
01605    perf->pktSentNAKTotal = m_iSentNAKTotal;
01606    perf->pktRecvNAKTotal = m_iRecvNAKTotal;
01607    perf->usSndDurationTotal = m_llSndDurationTotal;
01608 
01609    double interval = double(currtime - m_LastSampleTime);
01610 
01611    perf->mbpsSendRate = double(m_llTraceSent) * m_iPayloadSize * 8.0 / interval;
01612    perf->mbpsRecvRate = double(m_llTraceRecv) * m_iPayloadSize * 8.0 / interval;
01613 
01614    perf->usPktSndPeriod = m_ullInterval / double(m_ullCPUFrequency);
01615    perf->pktFlowWindow = m_iFlowWindowSize;
01616    perf->pktCongestionWindow = (int)m_dCongestionWindow;
01617    perf->pktFlightSize = CSeqNo::seqlen(m_iSndLastAck, CSeqNo::incseq(m_iSndCurrSeqNo)) - 1;
01618    perf->msRTT = m_iRTT/1000.0;
01619    perf->mbpsBandwidth = m_iBandwidth * m_iPayloadSize * 8.0 / 1000000.0;
01620 
01621    #ifndef WIN32
01622       if (0 == pthread_mutex_trylock(&m_ConnectionLock))
01623    #else
01624       if (WAIT_OBJECT_0 == WaitForSingleObject(m_ConnectionLock, 0))
01625    #endif
01626    {
01627       perf->byteAvailSndBuf = (NULL == m_pSndBuffer) ? 0 : (m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iMSS;
01628       perf->byteAvailRcvBuf = (NULL == m_pRcvBuffer) ? 0 : m_pRcvBuffer->getAvailBufSize() * m_iMSS;
01629 
01630       #ifndef WIN32
01631          pthread_mutex_unlock(&m_ConnectionLock);
01632       #else
01633          ReleaseMutex(m_ConnectionLock);
01634       #endif
01635    }
01636    else
01637    {
01638       perf->byteAvailSndBuf = 0;
01639       perf->byteAvailRcvBuf = 0;
01640    }
01641 
01642    if (clear)
01643    {
01644       m_llTraceSent = m_llTraceRecv = m_iTraceSndLoss = m_iTraceRcvLoss = m_iTraceRetrans = m_iSentACK = m_iRecvACK = m_iSentNAK = m_iRecvNAK = 0;
01645       m_llSndDuration = 0;
01646       m_LastSampleTime = currtime;
01647    }
01648 }
01649 
01650 void CUDT::CCUpdate()
01651 {
01652    m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
01653    m_dCongestionWindow = m_pCC->m_dCWndSize;
01654 
01655    if (m_llMaxBW <= 0)
01656       return;
01657    const double minSP = 1000000.0 / (double(m_llMaxBW) / m_iMSS) * m_ullCPUFrequency;
01658    if (m_ullInterval < minSP)
01659        m_ullInterval = minSP;
01660 }
01661 
01662 void CUDT::initSynch()
01663 {
01664    #ifndef WIN32
01665       pthread_mutex_init(&m_SendBlockLock, NULL);
01666       pthread_cond_init(&m_SendBlockCond, NULL);
01667       pthread_mutex_init(&m_RecvDataLock, NULL);
01668       pthread_cond_init(&m_RecvDataCond, NULL);
01669       pthread_mutex_init(&m_SendLock, NULL);
01670       pthread_mutex_init(&m_RecvLock, NULL);
01671       pthread_mutex_init(&m_AckLock, NULL);
01672       pthread_mutex_init(&m_ConnectionLock, NULL);
01673    #else
01674       m_SendBlockLock = CreateMutex(NULL, false, NULL);
01675       m_SendBlockCond = CreateEvent(NULL, false, false, NULL);
01676       m_RecvDataLock = CreateMutex(NULL, false, NULL);
01677       m_RecvDataCond = CreateEvent(NULL, false, false, NULL);
01678       m_SendLock = CreateMutex(NULL, false, NULL);
01679       m_RecvLock = CreateMutex(NULL, false, NULL);
01680       m_AckLock = CreateMutex(NULL, false, NULL);
01681       m_ConnectionLock = CreateMutex(NULL, false, NULL);
01682    #endif
01683 }
01684 
01685 void CUDT::destroySynch()
01686 {
01687    #ifndef WIN32
01688       pthread_mutex_destroy(&m_SendBlockLock);
01689       pthread_cond_destroy(&m_SendBlockCond);
01690       pthread_mutex_destroy(&m_RecvDataLock);
01691       pthread_cond_destroy(&m_RecvDataCond);
01692       pthread_mutex_destroy(&m_SendLock);
01693       pthread_mutex_destroy(&m_RecvLock);
01694       pthread_mutex_destroy(&m_AckLock);
01695       pthread_mutex_destroy(&m_ConnectionLock);
01696    #else
01697       CloseHandle(m_SendBlockLock);
01698       CloseHandle(m_SendBlockCond);
01699       CloseHandle(m_RecvDataLock);
01700       CloseHandle(m_RecvDataCond);
01701       CloseHandle(m_SendLock);
01702       CloseHandle(m_RecvLock);
01703       CloseHandle(m_AckLock);
01704       CloseHandle(m_ConnectionLock);
01705    #endif
01706 }
01707 
01708 void CUDT::releaseSynch()
01709 {
01710    #ifndef WIN32
01711       // wake up user calls
01712       pthread_mutex_lock(&m_SendBlockLock);
01713       pthread_cond_signal(&m_SendBlockCond);
01714       pthread_mutex_unlock(&m_SendBlockLock);
01715 
01716       pthread_mutex_lock(&m_SendLock);
01717       pthread_mutex_unlock(&m_SendLock);
01718 
01719       pthread_mutex_lock(&m_RecvDataLock);
01720       pthread_cond_signal(&m_RecvDataCond);
01721       pthread_mutex_unlock(&m_RecvDataLock);
01722 
01723       pthread_mutex_lock(&m_RecvLock);
01724       pthread_mutex_unlock(&m_RecvLock);
01725    #else
01726       SetEvent(m_SendBlockCond);
01727       WaitForSingleObject(m_SendLock, INFINITE);
01728       ReleaseMutex(m_SendLock);
01729       SetEvent(m_RecvDataCond);
01730       WaitForSingleObject(m_RecvLock, INFINITE);
01731       ReleaseMutex(m_RecvLock);
01732    #endif
01733 }
01734 
01735 void CUDT::sendCtrl(int pkttype, void* lparam, void* rparam, int size)
01736 {
01737    CPacket ctrlpkt;
01738 
01739    switch (pkttype)
01740    {
01741    case 2: //010 - Acknowledgement
01742       {
01743       int32_t ack;
01744 
01745       // If there is no loss, the ACK is the current largest sequence number plus 1;
01746       // Otherwise it is the smallest sequence number in the receiver loss list.
01747       if (0 == m_pRcvLossList->getLossLength())
01748          ack = CSeqNo::incseq(m_iRcvCurrSeqNo);
01749       else
01750          ack = m_pRcvLossList->getFirstLostSeq();
01751 
01752       if (ack == m_iRcvLastAckAck)
01753          break;
01754 
01755       // send out a lite ACK
01756       // to save time on buffer processing and bandwidth/AS measurement, a lite ACK only feeds back an ACK number
01757       if (4 == size)
01758       {
01759          ctrlpkt.pack(pkttype, NULL, &ack, size);
01760          ctrlpkt.m_iID = m_PeerID;
01761          m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
01762 
01763          break;
01764       }
01765 
01766       uint64_t currtime;
01767       CTimer::rdtsc(currtime);
01768 
01769       // There are new received packets to acknowledge, update related information.
01770       if (CSeqNo::seqcmp(ack, m_iRcvLastAck) > 0)
01771       {
01772          int acksize = CSeqNo::seqoff(m_iRcvLastAck, ack);
01773 
01774          m_iRcvLastAck = ack;
01775 
01776          m_pRcvBuffer->ackData(acksize);
01777 
01778          // signal a waiting "recv" call if there is any data available
01779          #ifndef WIN32
01780             pthread_mutex_lock(&m_RecvDataLock);
01781             if (m_bSynRecving)
01782                pthread_cond_signal(&m_RecvDataCond);
01783             pthread_mutex_unlock(&m_RecvDataLock);
01784          #else
01785             if (m_bSynRecving)
01786                SetEvent(m_RecvDataCond);
01787          #endif
01788 
01789          // acknowledge any waiting epolls to read
01790          s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, true);
01791       }
01792       else if (ack == m_iRcvLastAck)
01793       {
01794          if ((currtime - m_ullLastAckTime) < ((m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency))
01795             break;
01796       }
01797       else
01798          break;
01799 
01800       // Send out the ACK only if has not been received by the sender before
01801       if (CSeqNo::seqcmp(m_iRcvLastAck, m_iRcvLastAckAck) > 0)
01802       {
01803          int32_t data[6];
01804 
01805          m_iAckSeqNo = CAckNo::incack(m_iAckSeqNo);
01806          data[0] = m_iRcvLastAck;
01807          data[1] = m_iRTT;
01808          data[2] = m_iRTTVar;
01809          data[3] = m_pRcvBuffer->getAvailBufSize();
01810          // a minimum flow window of 2 is used, even if buffer is full, to break potential deadlock
01811          if (data[3] < 2)
01812             data[3] = 2;
01813 
01814          if (currtime - m_ullLastAckTime > m_ullSYNInt)
01815          {
01816             data[4] = m_pRcvTimeWindow->getPktRcvSpeed();
01817             data[5] = m_pRcvTimeWindow->getBandwidth();
01818             ctrlpkt.pack(pkttype, &m_iAckSeqNo, data, 24);
01819 
01820             CTimer::rdtsc(m_ullLastAckTime);
01821          }
01822          else
01823          {
01824             ctrlpkt.pack(pkttype, &m_iAckSeqNo, data, 16);
01825          }
01826 
01827          ctrlpkt.m_iID = m_PeerID;
01828          m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
01829 
01830          m_pACKWindow->store(m_iAckSeqNo, m_iRcvLastAck);
01831 
01832          ++ m_iSentACK;
01833          ++ m_iSentACKTotal;
01834       }
01835 
01836       break;
01837       }
01838 
01839    case 6: //110 - Acknowledgement of Acknowledgement
01840       ctrlpkt.pack(pkttype, lparam);
01841       ctrlpkt.m_iID = m_PeerID;
01842       m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
01843 
01844       break;
01845 
01846    case 3: //011 - Loss Report
01847       {
01848       if (NULL != rparam)
01849       {
01850          if (1 == size)
01851          {
01852             // only 1 loss packet
01853             ctrlpkt.pack(pkttype, NULL, (int32_t *)rparam + 1, 4);
01854          }
01855          else
01856          {
01857             // more than 1 loss packets
01858             ctrlpkt.pack(pkttype, NULL, rparam, 8);
01859          }
01860 
01861          ctrlpkt.m_iID = m_PeerID;
01862          m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
01863 
01864          ++ m_iSentNAK;
01865          ++ m_iSentNAKTotal;
01866       }
01867       else if (m_pRcvLossList->getLossLength() > 0)
01868       {
01869          // this is periodically NAK report; make sure NAK cannot be sent back too often
01870 
01871          // read loss list from the local receiver loss list
01872          int32_t* data = new int32_t[m_iPayloadSize / 4];
01873          int losslen;
01874          m_pRcvLossList->getLossArray(data, losslen, m_iPayloadSize / 4);
01875 
01876          if (0 < losslen)
01877          {
01878             ctrlpkt.pack(pkttype, NULL, data, losslen * 4);
01879             ctrlpkt.m_iID = m_PeerID;
01880             m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
01881 
01882             ++ m_iSentNAK;
01883             ++ m_iSentNAKTotal;
01884          }
01885 
01886          delete [] data;
01887       }
01888 
01889       // update next NAK time, which should wait enough time for the retansmission, but not too long
01890       m_ullNAKInt = (m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency;
01891       int rcv_speed = m_pRcvTimeWindow->getPktRcvSpeed();
01892       if (rcv_speed > 0)
01893          m_ullNAKInt += (m_pRcvLossList->getLossLength() * 1000000ULL / rcv_speed) * m_ullCPUFrequency;
01894       if (m_ullNAKInt < m_ullMinNakInt)
01895          m_ullNAKInt = m_ullMinNakInt;
01896 
01897       break;
01898       }
01899 
01900    case 4: //100 - Congestion Warning
01901       ctrlpkt.pack(pkttype);
01902       ctrlpkt.m_iID = m_PeerID;
01903       m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
01904 
01905       CTimer::rdtsc(m_ullLastWarningTime);
01906 
01907       break;
01908 
01909    case 1: //001 - Keep-alive
01910       ctrlpkt.pack(pkttype);
01911       ctrlpkt.m_iID = m_PeerID;
01912       m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
01913  
01914       break;
01915 
01916    case 0: //000 - Handshake
01917       ctrlpkt.pack(pkttype, NULL, rparam, sizeof(CHandShake));
01918       ctrlpkt.m_iID = m_PeerID;
01919       m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
01920 
01921       break;
01922 
01923    case 5: //101 - Shutdown
01924       ctrlpkt.pack(pkttype);
01925       ctrlpkt.m_iID = m_PeerID;
01926       m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
01927 
01928       break;
01929 
01930    case 7: //111 - Msg drop request
01931       ctrlpkt.pack(pkttype, lparam, rparam, 8);
01932       ctrlpkt.m_iID = m_PeerID;
01933       m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
01934 
01935       break;
01936 
01937    case 8: //1000 - acknowledge the peer side a special error
01938       ctrlpkt.pack(pkttype, lparam);
01939       ctrlpkt.m_iID = m_PeerID;
01940       m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
01941 
01942       break;
01943 
01944    case 32767: //0x7FFF - Resevered for future use
01945       break;
01946 
01947    default:
01948       break;
01949    }
01950 }
01951 
01952 void CUDT::processCtrl(CPacket& ctrlpkt)
01953 {
01954    // Just heard from the peer, reset the expiration count.
01955    m_iEXPCount = 1;
01956    uint64_t currtime;
01957    CTimer::rdtsc(currtime);
01958    m_ullLastRspTime = currtime;
01959 
01960    switch (ctrlpkt.getType())
01961    {
01962    case 2: //010 - Acknowledgement
01963       {
01964       int32_t ack;
01965 
01966       // process a lite ACK
01967       if (4 == ctrlpkt.getLength())
01968       {
01969          ack = *(int32_t *)ctrlpkt.m_pcData;
01970          if (CSeqNo::seqcmp(ack, m_iSndLastAck) >= 0)
01971          {
01972             m_iFlowWindowSize -= CSeqNo::seqoff(m_iSndLastAck, ack);
01973             m_iSndLastAck = ack;
01974          }
01975 
01976          break;
01977       }
01978 
01979        // read ACK seq. no.
01980       ack = ctrlpkt.getAckSeqNo();
01981 
01982       // send ACK acknowledgement
01983       // number of ACK2 can be much less than number of ACK
01984       uint64_t now = CTimer::getTime();
01985       if ((currtime - m_ullSndLastAck2Time > (uint64_t)m_iSYNInterval) || (ack == m_iSndLastAck2))
01986       {
01987          sendCtrl(6, &ack);
01988          m_iSndLastAck2 = ack;
01989          m_ullSndLastAck2Time = now;
01990       }
01991 
01992       // Got data ACK
01993       ack = *(int32_t *)ctrlpkt.m_pcData;
01994 
01995       // check the validation of the ack
01996       if (CSeqNo::seqcmp(ack, CSeqNo::incseq(m_iSndCurrSeqNo)) > 0)
01997       {
01998          //this should not happen: attack or bug
01999          m_bBroken = true;
02000          m_iBrokenCounter = 0;
02001          break;
02002       }
02003 
02004       if (CSeqNo::seqcmp(ack, m_iSndLastAck) >= 0)
02005       {
02006          // Update Flow Window Size, must update before and together with m_iSndLastAck
02007          m_iFlowWindowSize = *((int32_t *)ctrlpkt.m_pcData + 3);
02008          m_iSndLastAck = ack;
02009       }
02010 
02011       // protect packet retransmission
02012       CGuard::enterCS(m_AckLock);
02013 
02014       int offset = CSeqNo::seqoff(m_iSndLastDataAck, ack);
02015       if (offset <= 0)
02016       {
02017          // discard it if it is a repeated ACK
02018          CGuard::leaveCS(m_AckLock);
02019          break;
02020       }
02021 
02022       // acknowledge the sending buffer
02023       m_pSndBuffer->ackData(offset);
02024 
02025       // record total time used for sending
02026       m_llSndDuration += currtime - m_llSndDurationCounter;
02027       m_llSndDurationTotal += currtime - m_llSndDurationCounter;
02028       m_llSndDurationCounter = currtime;
02029 
02030       // update sending variables
02031       m_iSndLastDataAck = ack;
02032       m_pSndLossList->remove(CSeqNo::decseq(m_iSndLastDataAck));
02033 
02034       CGuard::leaveCS(m_AckLock);
02035 
02036       #ifndef WIN32
02037          pthread_mutex_lock(&m_SendBlockLock);
02038          if (m_bSynSending)
02039             pthread_cond_signal(&m_SendBlockCond);
02040          pthread_mutex_unlock(&m_SendBlockLock);
02041       #else
02042          if (m_bSynSending)
02043             SetEvent(m_SendBlockCond);
02044       #endif
02045 
02046       // acknowledde any waiting epolls to write
02047       s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);
02048 
02049       // insert this socket to snd list if it is not on the list yet
02050       m_pSndQueue->m_pSndUList->update(this, false);
02051 
02052       // Update RTT
02053       //m_iRTT = *((int32_t *)ctrlpkt.m_pcData + 1);
02054       //m_iRTTVar = *((int32_t *)ctrlpkt.m_pcData + 2);
02055       int rtt = *((int32_t *)ctrlpkt.m_pcData + 1);
02056       m_iRTTVar = (m_iRTTVar * 3 + abs(rtt - m_iRTT)) >> 2;
02057       m_iRTT = (m_iRTT * 7 + rtt) >> 3;
02058 
02059       m_pCC->setRTT(m_iRTT);
02060 
02061       if (ctrlpkt.getLength() > 16)
02062       {
02063          // Update Estimated Bandwidth and packet delivery rate
02064          if (*((int32_t *)ctrlpkt.m_pcData + 4) > 0)
02065             m_iDeliveryRate = (m_iDeliveryRate * 7 + *((int32_t *)ctrlpkt.m_pcData + 4)) >> 3;
02066 
02067          if (*((int32_t *)ctrlpkt.m_pcData + 5) > 0)
02068             m_iBandwidth = (m_iBandwidth * 7 + *((int32_t *)ctrlpkt.m_pcData + 5)) >> 3;
02069 
02070          m_pCC->setRcvRate(m_iDeliveryRate);
02071          m_pCC->setBandwidth(m_iBandwidth);
02072       }
02073 
02074       m_pCC->onACK(ack);
02075       CCUpdate();
02076 
02077       ++ m_iRecvACK;
02078       ++ m_iRecvACKTotal;
02079 
02080       break;
02081       }
02082 
02083    case 6: //110 - Acknowledgement of Acknowledgement
02084       {
02085       int32_t ack;
02086       int rtt = -1;
02087 
02088       // update RTT
02089       rtt = m_pACKWindow->acknowledge(ctrlpkt.getAckSeqNo(), ack);
02090       if (rtt <= 0)
02091          break;
02092 
02093       //if increasing delay detected...
02094       //   sendCtrl(4);
02095 
02096       // RTT EWMA
02097       m_iRTTVar = (m_iRTTVar * 3 + abs(rtt - m_iRTT)) >> 2;
02098       m_iRTT = (m_iRTT * 7 + rtt) >> 3;
02099 
02100       m_pCC->setRTT(m_iRTT);
02101 
02102       // update last ACK that has been received by the sender
02103       if (CSeqNo::seqcmp(ack, m_iRcvLastAckAck) > 0)
02104          m_iRcvLastAckAck = ack;
02105 
02106       break;
02107       }
02108 
02109    case 3: //011 - Loss Report
02110       {
02111       int32_t* losslist = (int32_t *)(ctrlpkt.m_pcData);
02112 
02113       m_pCC->onLoss(losslist, ctrlpkt.getLength() / 4);
02114       CCUpdate();
02115 
02116       bool secure = true;
02117 
02118       // decode loss list message and insert loss into the sender loss list
02119       for (int i = 0, n = (int)(ctrlpkt.getLength() / 4); i < n; ++ i)
02120       {
02121          if (0 != (losslist[i] & 0x80000000))
02122          {
02123             if ((CSeqNo::seqcmp(losslist[i] & 0x7FFFFFFF, losslist[i + 1]) > 0) || (CSeqNo::seqcmp(losslist[i + 1], m_iSndCurrSeqNo) > 0))
02124             {
02125                // seq_a must not be greater than seq_b; seq_b must not be greater than the most recent sent seq
02126                secure = false;
02127                break;
02128             }
02129 
02130             int num = 0;
02131             if (CSeqNo::seqcmp(losslist[i] & 0x7FFFFFFF, m_iSndLastAck) >= 0)
02132                num = m_pSndLossList->insert(losslist[i] & 0x7FFFFFFF, losslist[i + 1]);
02133             else if (CSeqNo::seqcmp(losslist[i + 1], m_iSndLastAck) >= 0)
02134                num = m_pSndLossList->insert(m_iSndLastAck, losslist[i + 1]);
02135 
02136             m_iTraceSndLoss += num;
02137             m_iSndLossTotal += num;
02138 
02139             ++ i;
02140          }
02141          else if (CSeqNo::seqcmp(losslist[i], m_iSndLastAck) >= 0)
02142          {
02143             if (CSeqNo::seqcmp(losslist[i], m_iSndCurrSeqNo) > 0)
02144             {
02145                //seq_a must not be greater than the most recent sent seq
02146                secure = false;
02147                break;
02148             }
02149 
02150             int num = m_pSndLossList->insert(losslist[i], losslist[i]);
02151 
02152             m_iTraceSndLoss += num;
02153             m_iSndLossTotal += num;
02154          }
02155       }
02156 
02157       if (!secure)
02158       {
02159          //this should not happen: attack or bug
02160          m_bBroken = true;
02161          m_iBrokenCounter = 0;
02162          break;
02163       }
02164 
02165       // the lost packet (retransmission) should be sent out immediately
02166       m_pSndQueue->m_pSndUList->update(this);
02167 
02168       ++ m_iRecvNAK;
02169       ++ m_iRecvNAKTotal;
02170 
02171       break;
02172       }
02173 
02174    case 4: //100 - Delay Warning
02175       // One way packet delay is increasing, so decrease the sending rate
02176       m_ullInterval = (uint64_t)ceil(m_ullInterval * 1.125);
02177       m_iLastDecSeq = m_iSndCurrSeqNo;
02178 
02179       break;
02180 
02181    case 1: //001 - Keep-alive
02182       // The only purpose of keep-alive packet is to tell that the peer is still alive
02183       // nothing needs to be done.
02184 
02185       break;
02186 
02187    case 0: //000 - Handshake
02188       {
02189       CHandShake req;
02190       req.deserialize(ctrlpkt.m_pcData, ctrlpkt.getLength());
02191       if ((req.m_iReqType > 0) || (m_bRendezvous && (req.m_iReqType != -2)))
02192       {
02193          // The peer side has not received the handshake message, so it keeps querying
02194          // resend the handshake packet
02195 
02196          CHandShake initdata;
02197          initdata.m_iISN = m_iISN;
02198          initdata.m_iMSS = m_iMSS;
02199          initdata.m_iFlightFlagSize = m_iFlightFlagSize;
02200          initdata.m_iReqType = (!m_bRendezvous) ? -1 : -2;
02201          initdata.m_iID = m_SocketID;
02202 
02203          char* hs = new char [m_iPayloadSize];
02204          int hs_size = m_iPayloadSize;
02205          initdata.serialize(hs, hs_size);
02206          sendCtrl(0, NULL, hs, hs_size);
02207          delete [] hs;
02208       }
02209 
02210       break;
02211       }
02212 
02213    case 5: //101 - Shutdown
02214       m_bShutdown = true;
02215       m_bClosing = true;
02216       m_bBroken = true;
02217       m_iBrokenCounter = 60;
02218 
02219       // Signal the sender and recver if they are waiting for data.
02220       releaseSynch();
02221 
02222       CTimer::triggerEvent();
02223 
02224       break;
02225 
02226    case 7: //111 - Msg drop request
02227       m_pRcvBuffer->dropMsg(ctrlpkt.getMsgSeq());
02228       m_pRcvLossList->remove(*(int32_t*)ctrlpkt.m_pcData, *(int32_t*)(ctrlpkt.m_pcData + 4));
02229 
02230       // move forward with current recv seq no.
02231       if ((CSeqNo::seqcmp(*(int32_t*)ctrlpkt.m_pcData, CSeqNo::incseq(m_iRcvCurrSeqNo)) <= 0)
02232          && (CSeqNo::seqcmp(*(int32_t*)(ctrlpkt.m_pcData + 4), m_iRcvCurrSeqNo) > 0))
02233       {
02234          m_iRcvCurrSeqNo = *(int32_t*)(ctrlpkt.m_pcData + 4);
02235       }
02236 
02237       break;
02238 
02239    case 8: // 1000 - An error has happened to the peer side
02240       //int err_type = packet.getAddInfo();
02241 
02242       // currently only this error is signalled from the peer side
02243       // if recvfile() failes (e.g., due to disk fail), blcoked sendfile/send should return immediately
02244       // giving the app a chance to fix the issue
02245 
02246       m_bPeerHealth = false;
02247 
02248       break;
02249 
02250    case 32767: //0x7FFF - reserved and user defined messages
02251       m_pCC->processCustomMsg(&ctrlpkt);
02252       CCUpdate();
02253 
02254       break;
02255 
02256    default:
02257       break;
02258    }
02259 }
02260 
02261 int CUDT::packData(CPacket& packet, uint64_t& ts)
02262 {
02263    int payload = 0;
02264    bool probe = false;
02265 
02266    uint64_t entertime;
02267    CTimer::rdtsc(entertime);
02268 
02269    if ((0 != m_ullTargetTime) && (entertime > m_ullTargetTime))
02270       m_ullTimeDiff += entertime - m_ullTargetTime;
02271 
02272    // Loss retransmission always has higher priority.
02273    if ((packet.m_iSeqNo = m_pSndLossList->getLostSeq()) >= 0)
02274    {
02275       // protect m_iSndLastDataAck from updating by ACK processing
02276       CGuard ackguard(m_AckLock);
02277 
02278       int offset = CSeqNo::seqoff(m_iSndLastDataAck, packet.m_iSeqNo);
02279       if (offset < 0)
02280          return 0;
02281 
02282       int msglen;
02283 
02284       payload = m_pSndBuffer->readData(&(packet.m_pcData), offset, packet.m_iMsgNo, msglen);
02285 
02286       if (-1 == payload)
02287       {
02288          int32_t seqpair[2];
02289          seqpair[0] = packet.m_iSeqNo;
02290          seqpair[1] = CSeqNo::incseq(seqpair[0], msglen);
02291          sendCtrl(7, &packet.m_iMsgNo, seqpair, 8);
02292 
02293          // only one msg drop request is necessary
02294          m_pSndLossList->remove(seqpair[1]);
02295 
02296          // skip all dropped packets
02297          if (CSeqNo::seqcmp(m_iSndCurrSeqNo, CSeqNo::incseq(seqpair[1])) < 0)
02298              m_iSndCurrSeqNo = CSeqNo::incseq(seqpair[1]);
02299 
02300          return 0;
02301       }
02302       else if (0 == payload)
02303          return 0;
02304 
02305       ++ m_iTraceRetrans;
02306       ++ m_iRetransTotal;
02307    }
02308    else
02309    {
02310       // If no loss, pack a new packet.
02311 
02312       // check congestion/flow window limit
02313       int cwnd = (m_iFlowWindowSize < (int)m_dCongestionWindow) ? m_iFlowWindowSize : (int)m_dCongestionWindow;
02314       if (cwnd >= CSeqNo::seqlen(m_iSndLastAck, CSeqNo::incseq(m_iSndCurrSeqNo)))
02315       {
02316          if (0 != (payload = m_pSndBuffer->readData(&(packet.m_pcData), packet.m_iMsgNo)))
02317          {
02318             m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo);
02319             m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo);
02320 
02321             packet.m_iSeqNo = m_iSndCurrSeqNo;
02322 
02323             // every 16 (0xF) packets, a packet pair is sent
02324             if (0 == (packet.m_iSeqNo & 0xF))
02325                probe = true;
02326          }
02327          else
02328          {
02329             m_ullTargetTime = 0;
02330             m_ullTimeDiff = 0;
02331             ts = 0;
02332             return 0;
02333          }
02334       }
02335       else
02336       {
02337          m_ullTargetTime = 0;
02338          m_ullTimeDiff = 0;
02339          ts = 0;
02340          return 0;
02341       }
02342    }
02343 
02344    packet.m_iTimeStamp = int(CTimer::getTime() - m_StartTime);
02345    packet.m_iID = m_PeerID;
02346    packet.setLength(payload);
02347 
02348    m_pCC->onPktSent(&packet);
02349    //m_pSndTimeWindow->onPktSent(packet.m_iTimeStamp);
02350 
02351    ++ m_llTraceSent;
02352    ++ m_llSentTotal;
02353 
02354    if (probe)
02355    {
02356       // sends out probing packet pair
02357       ts = entertime;
02358       probe = false;
02359    }
02360    else
02361    {
02362       #ifndef NO_BUSY_WAITING
02363          ts = entertime + m_ullInterval;
02364       #else
02365          if (m_ullTimeDiff >= m_ullInterval)
02366          {
02367             ts = entertime;
02368             m_ullTimeDiff -= m_ullInterval;
02369          }
02370          else
02371          {
02372             ts = entertime + m_ullInterval - m_ullTimeDiff;
02373             m_ullTimeDiff = 0;
02374          }
02375       #endif
02376    }
02377 
02378    m_ullTargetTime = ts;
02379 
02380    return payload;
02381 }
02382 
02383 int CUDT::processData(CUnit* unit)
02384 {
02385    CPacket& packet = unit->m_Packet;
02386 
02387    // Just heard from the peer, reset the expiration count.
02388    m_iEXPCount = 1;
02389    uint64_t currtime;
02390    CTimer::rdtsc(currtime);
02391    m_ullLastRspTime = currtime;
02392 
02393    m_pCC->onPktReceived(&packet);
02394    ++ m_iPktCount;
02395    // update time information
02396    m_pRcvTimeWindow->onPktArrival();
02397 
02398    // check if it is probing packet pair
02399    if (0 == (packet.m_iSeqNo & 0xF))
02400       m_pRcvTimeWindow->probe1Arrival();
02401    else if (1 == (packet.m_iSeqNo & 0xF))
02402       m_pRcvTimeWindow->probe2Arrival();
02403 
02404    ++ m_llTraceRecv;
02405    ++ m_llRecvTotal;
02406 
02407    int32_t offset = CSeqNo::seqoff(m_iRcvLastAck, packet.m_iSeqNo);
02408    if ((offset < 0) || (offset >= m_pRcvBuffer->getAvailBufSize()))
02409       return -1;
02410 
02411    if (m_pRcvBuffer->addData(unit, offset) < 0)
02412       return -1;
02413 
02414    // Loss detection.
02415    if (CSeqNo::seqcmp(packet.m_iSeqNo, CSeqNo::incseq(m_iRcvCurrSeqNo)) > 0)
02416    {
02417       // If loss found, insert them to the receiver loss list
02418       m_pRcvLossList->insert(CSeqNo::incseq(m_iRcvCurrSeqNo), CSeqNo::decseq(packet.m_iSeqNo));
02419 
02420       // pack loss list for NAK
02421       int32_t lossdata[2];
02422       lossdata[0] = CSeqNo::incseq(m_iRcvCurrSeqNo) | 0x80000000;
02423       lossdata[1] = CSeqNo::decseq(packet.m_iSeqNo);
02424 
02425       // Generate loss report immediately.
02426       sendCtrl(3, NULL, lossdata, (CSeqNo::incseq(m_iRcvCurrSeqNo) == CSeqNo::decseq(packet.m_iSeqNo)) ? 1 : 2);
02427 
02428       int loss = CSeqNo::seqlen(m_iRcvCurrSeqNo, packet.m_iSeqNo) - 2;
02429       m_iTraceRcvLoss += loss;
02430       m_iRcvLossTotal += loss;
02431    }
02432 
02433    // This is not a regular fixed size packet...   
02434    //an irregular sized packet usually indicates the end of a message, so send an ACK immediately   
02435    if (packet.getLength() != m_iPayloadSize)   
02436       CTimer::rdtsc(m_ullNextACKTime); 
02437 
02438    // Update the current largest sequence number that has been received.
02439    // Or it is a retransmitted packet, remove it from receiver loss list.
02440    if (CSeqNo::seqcmp(packet.m_iSeqNo, m_iRcvCurrSeqNo) > 0)
02441       m_iRcvCurrSeqNo = packet.m_iSeqNo;
02442    else
02443       m_pRcvLossList->remove(packet.m_iSeqNo);
02444 
02445    return 0;
02446 }
02447 
02448 int CUDT::listen(sockaddr* addr, CPacket& packet)
02449 {
02450    if (m_bClosing)
02451       return 1002;
02452 
02453    if (packet.getLength() != CHandShake::m_iContentSize)
02454       return 1004;
02455 
02456    CHandShake hs;
02457    hs.deserialize(packet.m_pcData, packet.getLength());
02458 
02459    // SYN cookie
02460    char clienthost[NI_MAXHOST];
02461    char clientport[NI_MAXSERV];
02462    getnameinfo(addr, (AF_INET == m_iVersion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6), clienthost, sizeof(clienthost), clientport, sizeof(clientport), NI_NUMERICHOST|NI_NUMERICSERV);
02463    int64_t timestamp = (CTimer::getTime() - m_StartTime) / 60000000; // secret changes every one minute
02464    stringstream cookiestr;
02465    cookiestr << clienthost << ":" << clientport << ":" << timestamp;
02466    unsigned char cookie[16];
02467    CMD5::compute(cookiestr.str().c_str(), cookie);
02468 
02469    if (1 == hs.m_iReqType)
02470    {
02471       hs.m_iCookie = *(int*)cookie;
02472       packet.m_iID = hs.m_iID;
02473       int size = packet.getLength();
02474       hs.serialize(packet.m_pcData, size);
02475       m_pSndQueue->sendto(addr, packet);
02476       return 0;
02477    }
02478    else
02479    {
02480       if (hs.m_iCookie != *(int*)cookie)
02481       {
02482          timestamp --;
02483          cookiestr << clienthost << ":" << clientport << ":" << timestamp;
02484          CMD5::compute(cookiestr.str().c_str(), cookie);
02485 
02486          if (hs.m_iCookie != *(int*)cookie)
02487             return -1;
02488       }
02489    }
02490 
02491    int32_t id = hs.m_iID;
02492 
02493    // When a peer side connects in...
02494    if ((1 == packet.getFlag()) && (0 == packet.getType()))
02495    {
02496       if ((hs.m_iVersion != m_iVersion) || (hs.m_iType != m_iSockType))
02497       {
02498          // mismatch, reject the request
02499          hs.m_iReqType = 1002;
02500          int size = CHandShake::m_iContentSize;
02501          hs.serialize(packet.m_pcData, size);
02502          packet.m_iID = id;
02503          m_pSndQueue->sendto(addr, packet);
02504       }
02505       else
02506       {
02507          int result = s_UDTUnited.newConnection(m_SocketID, addr, &hs);
02508          if (result == -1)
02509             hs.m_iReqType = 1002;
02510 
02511          // send back a response if connection failed or connection already existed
02512          // new connection response should be sent in connect()
02513          if (result != 1)
02514          {
02515             int size = CHandShake::m_iContentSize;
02516             hs.serialize(packet.m_pcData, size);
02517             packet.m_iID = id;
02518             m_pSndQueue->sendto(addr, packet);
02519          }
02520          else
02521          {
02522             // a new connection has been created, enable epoll for write 
02523             s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);
02524          }
02525       }
02526    }
02527 
02528    return hs.m_iReqType;
02529 }
02530 
02531 void CUDT::checkTimers()
02532 {
02533    // update CC parameters
02534    CCUpdate();
02535    //uint64_t minint = (uint64_t)(m_ullCPUFrequency * m_pSndTimeWindow->getMinPktSndInt() * 0.9);
02536    //if (m_ullInterval < minint)
02537    //   m_ullInterval = minint;
02538 
02539    uint64_t currtime;
02540    CTimer::rdtsc(currtime);
02541 
02542    if ((currtime > m_ullNextACKTime) || ((m_pCC->m_iACKInterval > 0) && (m_pCC->m_iACKInterval <= m_iPktCount)))
02543    {
02544       // ACK timer expired or ACK interval is reached
02545 
02546       sendCtrl(2);
02547       CTimer::rdtsc(currtime);
02548       if (m_pCC->m_iACKPeriod > 0)
02549          m_ullNextACKTime = currtime + m_pCC->m_iACKPeriod * m_ullCPUFrequency;
02550       else
02551          m_ullNextACKTime = currtime + m_ullACKInt;
02552 
02553       m_iPktCount = 0;
02554       m_iLightACKCount = 1;
02555    }
02556    else if (m_iSelfClockInterval * m_iLightACKCount <= m_iPktCount)
02557    {
02558       //send a "light" ACK
02559       sendCtrl(2, NULL, NULL, 4);
02560       ++ m_iLightACKCount;
02561    }
02562 
02563    // we are not sending back repeated NAK anymore and rely on the sender's EXP for retransmission
02564    //if ((m_pRcvLossList->getLossLength() > 0) && (currtime > m_ullNextNAKTime))
02565    //{
02566    //   // NAK timer expired, and there is loss to be reported.
02567    //   sendCtrl(3);
02568    //
02569    //   CTimer::rdtsc(currtime);
02570    //   m_ullNextNAKTime = currtime + m_ullNAKInt;
02571    //}
02572 
02573    uint64_t next_exp_time;
02574    if (m_pCC->m_bUserDefinedRTO)
02575       next_exp_time = m_ullLastRspTime + m_pCC->m_iRTO * m_ullCPUFrequency;
02576    else
02577    {
02578       uint64_t exp_int = (m_iEXPCount * (m_iRTT + 4 * m_iRTTVar) + m_iSYNInterval) * m_ullCPUFrequency;
02579       if (exp_int < m_iEXPCount * m_ullMinExpInt)
02580          exp_int = m_iEXPCount * m_ullMinExpInt;
02581       next_exp_time = m_ullLastRspTime + exp_int;
02582    }
02583 
02584    if (currtime > next_exp_time)
02585    {
02586       // Haven't receive any information from the peer, is it dead?!
02587       // timeout: at least 16 expirations and must be greater than 10 seconds
02588       if ((m_iEXPCount > 16) && (currtime - m_ullLastRspTime > 5000000 * m_ullCPUFrequency))
02589       {
02590          //
02591          // Connection is broken. 
02592          // UDT does not signal any information about this instead of to stop quietly.
02593          // Application will detect this when it calls any UDT methods next time.
02594          //
02595          m_bClosing = true;
02596          m_bBroken = true;
02597          m_iBrokenCounter = 30;
02598 
02599          // update snd U list to remove this socket
02600          m_pSndQueue->m_pSndUList->update(this);
02601 
02602          releaseSynch();
02603 
02604          // app can call any UDT API to learn the connection_broken error
02605          s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN | UDT_EPOLL_OUT | UDT_EPOLL_ERR, true);
02606 
02607          CTimer::triggerEvent();
02608 
02609          return;
02610       }
02611 
02612       // sender: Insert all the packets sent after last received acknowledgement into the sender loss list.
02613       // recver: Send out a keep-alive packet
02614       if (m_pSndBuffer->getCurrBufSize() > 0)
02615       {
02616          if ((CSeqNo::incseq(m_iSndCurrSeqNo) != m_iSndLastAck) && (m_pSndLossList->getLossLength() == 0))
02617          {
02618             // resend all unacknowledged packets on timeout, but only if there is no packet in the loss list
02619             int32_t csn = m_iSndCurrSeqNo;
02620             int num = m_pSndLossList->insert(m_iSndLastAck, csn);
02621             m_iTraceSndLoss += num;
02622             m_iSndLossTotal += num;
02623          }
02624 
02625          m_pCC->onTimeout();
02626          CCUpdate();
02627 
02628          // immediately restart transmission
02629          m_pSndQueue->m_pSndUList->update(this);
02630       }
02631       else
02632       {
02633          sendCtrl(1);
02634       }
02635 
02636       ++ m_iEXPCount;
02637       // Reset last response time since we just sent a heart-beat.
02638       m_ullLastRspTime = currtime;
02639    }
02640 }
02641 
02642 void CUDT::addEPoll(const int eid)
02643 {
02644    CGuard::enterCS(s_UDTUnited.m_EPoll.m_EPollLock);
02645    m_sPollID.insert(eid);
02646    CGuard::leaveCS(s_UDTUnited.m_EPoll.m_EPollLock);
02647 
02648    if (!m_bConnected || m_bBroken || m_bClosing)
02649       return;
02650 
02651    if (((UDT_STREAM == m_iSockType) && (m_pRcvBuffer->getRcvDataSize() > 0)) ||
02652       ((UDT_DGRAM == m_iSockType) && (m_pRcvBuffer->getRcvMsgNum() > 0)))
02653    {
02654       s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, true);
02655    }
02656    if (m_iSndBufSize > m_pSndBuffer->getCurrBufSize())
02657    {
02658       s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);
02659    }
02660 }
02661 
02662 void CUDT::removeEPoll(const int eid)
02663 {
02664    CGuard::enterCS(s_UDTUnited.m_EPoll.m_EPollLock);
02665    m_sPollID.erase(eid);
02666    CGuard::leaveCS(s_UDTUnited.m_EPoll.m_EPollLock);
02667 
02668    // clear IO events notifications;
02669    // since this happens after the epoll ID has been removed, they cannot be set again
02670    s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN | UDT_EPOLL_OUT, false);
02671 }

Generated on 9 Feb 2013 for barchart-udt-core-2.2.2 by  doxygen 1.6.1