00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041 #ifndef WIN32
00042 #include <unistd.h>
00043 #include <netdb.h>
00044 #include <arpa/inet.h>
00045 #include <cerrno>
00046 #include <cstring>
00047 #include <cstdlib>
00048 #else
00049 #include <winsock2.h>
00050 #include <ws2tcpip.h>
00051 #ifdef LEGACY_WIN32
00052 #include <wspiapi.h>
00053 #endif
00054 #endif
00055 #include <cmath>
00056 #include <sstream>
00057 #include "queue.h"
00058 #include "core.h"
00059
00060 using namespace std;
00061
00062
00063 CUDTUnited CUDT::s_UDTUnited;
00064
00065 const UDTSOCKET CUDT::INVALID_SOCK = -1;
00066 const int CUDT::ERROR = -1;
00067
00068 const UDTSOCKET UDT::INVALID_SOCK = CUDT::INVALID_SOCK;
00069 const int UDT::ERROR = CUDT::ERROR;
00070
00071 const int32_t CSeqNo::m_iSeqNoTH = 0x3FFFFFFF;
00072 const int32_t CSeqNo::m_iMaxSeqNo = 0x7FFFFFFF;
00073 const int32_t CAckNo::m_iMaxAckSeqNo = 0x7FFFFFFF;
00074 const int32_t CMsgNo::m_iMsgNoTH = 0xFFFFFFF;
00075 const int32_t CMsgNo::m_iMaxMsgNo = 0x1FFFFFFF;
00076
00077 const int CUDT::m_iVersion = 4;
00078 const int CUDT::m_iSYNInterval = 10000;
00079 const int CUDT::m_iSelfClockInterval = 64;
00080
00081
00082 CUDT::CUDT()
00083 {
00084 m_pSndBuffer = NULL;
00085 m_pRcvBuffer = NULL;
00086 m_pSndLossList = NULL;
00087 m_pRcvLossList = NULL;
00088 m_pACKWindow = NULL;
00089 m_pSndTimeWindow = NULL;
00090 m_pRcvTimeWindow = NULL;
00091
00092 m_pSndQueue = NULL;
00093 m_pRcvQueue = NULL;
00094 m_pPeerAddr = NULL;
00095 m_pSNode = NULL;
00096 m_pRNode = NULL;
00097
00098
00099 initSynch();
00100
00101
00102 m_iMSS = 1500;
00103 m_bSynSending = true;
00104 m_bSynRecving = true;
00105 m_iFlightFlagSize = 25600;
00106 m_iSndBufSize = 8192;
00107 m_iRcvBufSize = 8192;
00108 m_Linger.l_onoff = 1;
00109 m_Linger.l_linger = 180;
00110 m_iUDPSndBufSize = 65536;
00111 m_iUDPRcvBufSize = m_iRcvBufSize * m_iMSS;
00112 m_iSockType = UDT_STREAM;
00113 m_iIPversion = AF_INET;
00114 m_bRendezvous = false;
00115 m_iSndTimeOut = -1;
00116 m_iRcvTimeOut = -1;
00117 m_bReuseAddr = true;
00118 m_llMaxBW = -1;
00119
00120 m_pCCFactory = new CCCFactory<CUDTCC>;
00121 m_pCC = NULL;
00122 m_pCache = NULL;
00123
00124
00125 m_bOpened = false;
00126 m_bListening = false;
00127 m_bConnecting = false;
00128 m_bConnected = false;
00129 m_bClosing = false;
00130 m_bShutdown = false;
00131 m_bBroken = false;
00132 m_bPeerHealth = true;
00133 m_ullLingerExpiration = 0;
00134 }
00135
00136 CUDT::CUDT(const CUDT& ancestor)
00137 {
00138 m_pSndBuffer = NULL;
00139 m_pRcvBuffer = NULL;
00140 m_pSndLossList = NULL;
00141 m_pRcvLossList = NULL;
00142 m_pACKWindow = NULL;
00143 m_pSndTimeWindow = NULL;
00144 m_pRcvTimeWindow = NULL;
00145
00146 m_pSndQueue = NULL;
00147 m_pRcvQueue = NULL;
00148 m_pPeerAddr = NULL;
00149 m_pSNode = NULL;
00150 m_pRNode = NULL;
00151
00152
00153 initSynch();
00154
00155
00156 m_iMSS = ancestor.m_iMSS;
00157 m_bSynSending = ancestor.m_bSynSending;
00158 m_bSynRecving = ancestor.m_bSynRecving;
00159 m_iFlightFlagSize = ancestor.m_iFlightFlagSize;
00160 m_iSndBufSize = ancestor.m_iSndBufSize;
00161 m_iRcvBufSize = ancestor.m_iRcvBufSize;
00162 m_Linger = ancestor.m_Linger;
00163 m_iUDPSndBufSize = ancestor.m_iUDPSndBufSize;
00164 m_iUDPRcvBufSize = ancestor.m_iUDPRcvBufSize;
00165 m_iSockType = ancestor.m_iSockType;
00166 m_iIPversion = ancestor.m_iIPversion;
00167 m_bRendezvous = ancestor.m_bRendezvous;
00168 m_iSndTimeOut = ancestor.m_iSndTimeOut;
00169 m_iRcvTimeOut = ancestor.m_iRcvTimeOut;
00170 m_bReuseAddr = true;
00171 m_llMaxBW = ancestor.m_llMaxBW;
00172
00173 m_pCCFactory = ancestor.m_pCCFactory->clone();
00174 m_pCC = NULL;
00175 m_pCache = ancestor.m_pCache;
00176
00177
00178 m_bOpened = false;
00179 m_bListening = false;
00180 m_bConnecting = false;
00181 m_bConnected = false;
00182 m_bClosing = false;
00183 m_bShutdown = false;
00184 m_bBroken = false;
00185 m_bPeerHealth = true;
00186 m_ullLingerExpiration = 0;
00187 }
00188
00189 CUDT::~CUDT()
00190 {
00191
00192 destroySynch();
00193
00194
00195 delete m_pSndBuffer;
00196 delete m_pRcvBuffer;
00197 delete m_pSndLossList;
00198 delete m_pRcvLossList;
00199 delete m_pACKWindow;
00200 delete m_pSndTimeWindow;
00201 delete m_pRcvTimeWindow;
00202 delete m_pCCFactory;
00203 delete m_pCC;
00204 delete m_pPeerAddr;
00205 delete m_pSNode;
00206 delete m_pRNode;
00207 }
00208
00209 void CUDT::setOpt(UDTOpt optName, const void* optval, int)
00210 {
00211 if (m_bBroken || m_bClosing)
00212 throw CUDTException(2, 1, 0);
00213
00214 CGuard cg(m_ConnectionLock);
00215 CGuard sendguard(m_SendLock);
00216 CGuard recvguard(m_RecvLock);
00217
00218 switch (optName)
00219 {
00220 case UDT_MSS:
00221 if (m_bOpened)
00222 throw CUDTException(5, 1, 0);
00223
00224 if (*(int*)optval < int(28 + CHandShake::m_iContentSize))
00225 throw CUDTException(5, 3, 0);
00226
00227 m_iMSS = *(int*)optval;
00228
00229
00230 if (m_iMSS > m_iUDPSndBufSize)
00231 m_iMSS = m_iUDPSndBufSize;
00232 if (m_iMSS > m_iUDPRcvBufSize)
00233 m_iMSS = m_iUDPRcvBufSize;
00234
00235 break;
00236
00237 case UDT_SNDSYN:
00238 m_bSynSending = *(bool *)optval;
00239 break;
00240
00241 case UDT_RCVSYN:
00242 m_bSynRecving = *(bool *)optval;
00243 break;
00244
00245 case UDT_CC:
00246 if (m_bConnecting || m_bConnected)
00247 throw CUDTException(5, 1, 0);
00248 if (NULL != m_pCCFactory)
00249 delete m_pCCFactory;
00250 m_pCCFactory = ((CCCVirtualFactory *)optval)->clone();
00251
00252 break;
00253
00254 case UDT_FC:
00255 if (m_bConnecting || m_bConnected)
00256 throw CUDTException(5, 2, 0);
00257
00258 if (*(int*)optval < 1)
00259 throw CUDTException(5, 3);
00260
00261
00262 if (*(int*)optval > 32)
00263 m_iFlightFlagSize = *(int*)optval;
00264 else
00265 m_iFlightFlagSize = 32;
00266
00267 break;
00268
00269 case UDT_SNDBUF:
00270 if (m_bOpened)
00271 throw CUDTException(5, 1, 0);
00272
00273 if (*(int*)optval <= 0)
00274 throw CUDTException(5, 3, 0);
00275
00276 m_iSndBufSize = *(int*)optval / (m_iMSS - 28);
00277
00278 break;
00279
00280 case UDT_RCVBUF:
00281 if (m_bOpened)
00282 throw CUDTException(5, 1, 0);
00283
00284 if (*(int*)optval <= 0)
00285 throw CUDTException(5, 3, 0);
00286
00287
00288 if (*(int*)optval > (m_iMSS - 28) * 32)
00289 m_iRcvBufSize = *(int*)optval / (m_iMSS - 28);
00290 else
00291 m_iRcvBufSize = 32;
00292
00293
00294 if (m_iRcvBufSize > m_iFlightFlagSize)
00295 m_iRcvBufSize = m_iFlightFlagSize;
00296
00297 break;
00298
00299 case UDT_LINGER:
00300 m_Linger = *(linger*)optval;
00301 break;
00302
00303 case UDP_SNDBUF:
00304 if (m_bOpened)
00305 throw CUDTException(5, 1, 0);
00306
00307 m_iUDPSndBufSize = *(int*)optval;
00308
00309 if (m_iUDPSndBufSize < m_iMSS)
00310 m_iUDPSndBufSize = m_iMSS;
00311
00312 break;
00313
00314 case UDP_RCVBUF:
00315 if (m_bOpened)
00316 throw CUDTException(5, 1, 0);
00317
00318 m_iUDPRcvBufSize = *(int*)optval;
00319
00320 if (m_iUDPRcvBufSize < m_iMSS)
00321 m_iUDPRcvBufSize = m_iMSS;
00322
00323 break;
00324
00325 case UDT_RENDEZVOUS:
00326 if (m_bConnecting || m_bConnected)
00327 throw CUDTException(5, 1, 0);
00328 m_bRendezvous = *(bool *)optval;
00329 break;
00330
00331 case UDT_SNDTIMEO:
00332 m_iSndTimeOut = *(int*)optval;
00333 break;
00334
00335 case UDT_RCVTIMEO:
00336 m_iRcvTimeOut = *(int*)optval;
00337 break;
00338
00339 case UDT_REUSEADDR:
00340 if (m_bOpened)
00341 throw CUDTException(5, 1, 0);
00342 m_bReuseAddr = *(bool*)optval;
00343 break;
00344
00345 case UDT_MAXBW:
00346 m_llMaxBW = *(int64_t*)optval;
00347 break;
00348
00349 default:
00350 throw CUDTException(5, 0, 0);
00351 }
00352 }
00353
00354 void CUDT::getOpt(UDTOpt optName, void* optval, int& optlen)
00355 {
00356 CGuard cg(m_ConnectionLock);
00357
00358 switch (optName)
00359 {
00360 case UDT_MSS:
00361 *(int*)optval = m_iMSS;
00362 optlen = sizeof(int);
00363 break;
00364
00365 case UDT_SNDSYN:
00366 *(bool*)optval = m_bSynSending;
00367 optlen = sizeof(bool);
00368 break;
00369
00370 case UDT_RCVSYN:
00371 *(bool*)optval = m_bSynRecving;
00372 optlen = sizeof(bool);
00373 break;
00374
00375 case UDT_CC:
00376 if (!m_bOpened)
00377 throw CUDTException(5, 5, 0);
00378 *(CCC**)optval = m_pCC;
00379 optlen = sizeof(CCC*);
00380
00381 break;
00382
00383 case UDT_FC:
00384 *(int*)optval = m_iFlightFlagSize;
00385 optlen = sizeof(int);
00386 break;
00387
00388 case UDT_SNDBUF:
00389 *(int*)optval = m_iSndBufSize * (m_iMSS - 28);
00390 optlen = sizeof(int);
00391 break;
00392
00393 case UDT_RCVBUF:
00394 *(int*)optval = m_iRcvBufSize * (m_iMSS - 28);
00395 optlen = sizeof(int);
00396 break;
00397
00398 case UDT_LINGER:
00399 if (optlen < (int)(sizeof(linger)))
00400 throw CUDTException(5, 3, 0);
00401
00402 *(linger*)optval = m_Linger;
00403 optlen = sizeof(linger);
00404 break;
00405
00406 case UDP_SNDBUF:
00407 *(int*)optval = m_iUDPSndBufSize;
00408 optlen = sizeof(int);
00409 break;
00410
00411 case UDP_RCVBUF:
00412 *(int*)optval = m_iUDPRcvBufSize;
00413 optlen = sizeof(int);
00414 break;
00415
00416 case UDT_RENDEZVOUS:
00417 *(bool *)optval = m_bRendezvous;
00418 optlen = sizeof(bool);
00419 break;
00420
00421 case UDT_SNDTIMEO:
00422 *(int*)optval = m_iSndTimeOut;
00423 optlen = sizeof(int);
00424 break;
00425
00426 case UDT_RCVTIMEO:
00427 *(int*)optval = m_iRcvTimeOut;
00428 optlen = sizeof(int);
00429 break;
00430
00431 case UDT_REUSEADDR:
00432 *(bool *)optval = m_bReuseAddr;
00433 optlen = sizeof(bool);
00434 break;
00435
00436 case UDT_MAXBW:
00437 *(int64_t*)optval = m_llMaxBW;
00438 optlen = sizeof(int64_t);
00439 break;
00440
00441 case UDT_STATE:
00442 *(int32_t*)optval = s_UDTUnited.getStatus(m_SocketID);
00443 optlen = sizeof(int32_t);
00444 break;
00445
00446 case UDT_EVENT:
00447 {
00448 int32_t event = 0;
00449 if (m_bBroken)
00450 event |= UDT_EPOLL_ERR;
00451 else
00452 {
00453 if (m_pRcvBuffer && (m_pRcvBuffer->getRcvDataSize() > 0))
00454 event |= UDT_EPOLL_IN;
00455 if (m_pSndBuffer && (m_iSndBufSize > m_pSndBuffer->getCurrBufSize()))
00456 event |= UDT_EPOLL_OUT;
00457 }
00458 *(int32_t*)optval = event;
00459 optlen = sizeof(int32_t);
00460 break;
00461 }
00462
00463 case UDT_SNDDATA:
00464 if (m_pSndBuffer)
00465 *(int32_t*)optval = m_pSndBuffer->getCurrBufSize();
00466 else
00467 *(int32_t*)optval = 0;
00468 optlen = sizeof(int32_t);
00469 break;
00470
00471 case UDT_RCVDATA:
00472 if (m_pRcvBuffer)
00473 *(int32_t*)optval = m_pRcvBuffer->getRcvDataSize();
00474 else
00475 *(int32_t*)optval = 0;
00476 optlen = sizeof(int32_t);
00477 break;
00478
00479 default:
00480 throw CUDTException(5, 0, 0);
00481 }
00482 }
00483
00484 void CUDT::open()
00485 {
00486 CGuard cg(m_ConnectionLock);
00487
00488
00489 m_iPktSize = m_iMSS - 28;
00490 m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;
00491
00492 m_iEXPCount = 1;
00493 m_iBandwidth = 1;
00494 m_iDeliveryRate = 16;
00495 m_iAckSeqNo = 0;
00496 m_ullLastAckTime = 0;
00497
00498
00499 m_StartTime = CTimer::getTime();
00500 m_llSentTotal = m_llRecvTotal = m_iSndLossTotal = m_iRcvLossTotal = m_iRetransTotal = m_iSentACKTotal = m_iRecvACKTotal = m_iSentNAKTotal = m_iRecvNAKTotal = 0;
00501 m_LastSampleTime = CTimer::getTime();
00502 m_llTraceSent = m_llTraceRecv = m_iTraceSndLoss = m_iTraceRcvLoss = m_iTraceRetrans = m_iSentACK = m_iRecvACK = m_iSentNAK = m_iRecvNAK = 0;
00503 m_llSndDuration = m_llSndDurationTotal = 0;
00504
00505
00506 if (NULL == m_pSNode)
00507 m_pSNode = new CSNode;
00508 m_pSNode->m_pUDT = this;
00509 m_pSNode->m_llTimeStamp = 1;
00510 m_pSNode->m_iHeapLoc = -1;
00511
00512 if (NULL == m_pRNode)
00513 m_pRNode = new CRNode;
00514 m_pRNode->m_pUDT = this;
00515 m_pRNode->m_llTimeStamp = 1;
00516 m_pRNode->m_pPrev = m_pRNode->m_pNext = NULL;
00517 m_pRNode->m_bOnList = false;
00518
00519 m_iRTT = 10 * m_iSYNInterval;
00520 m_iRTTVar = m_iRTT >> 1;
00521 m_ullCPUFrequency = CTimer::getCPUFrequency();
00522
00523
00524 m_ullSYNInt = m_iSYNInterval * m_ullCPUFrequency;
00525
00526
00527 m_ullMinNakInt = 300000 * m_ullCPUFrequency;
00528 m_ullMinExpInt = 300000 * m_ullCPUFrequency;
00529
00530 m_ullACKInt = m_ullSYNInt;
00531 m_ullNAKInt = m_ullMinNakInt;
00532
00533 uint64_t currtime;
00534 CTimer::rdtsc(currtime);
00535 m_ullLastRspTime = currtime;
00536 m_ullNextACKTime = currtime + m_ullSYNInt;
00537 m_ullNextNAKTime = currtime + m_ullNAKInt;
00538
00539 m_iPktCount = 0;
00540 m_iLightACKCount = 1;
00541
00542 m_ullTargetTime = 0;
00543 m_ullTimeDiff = 0;
00544
00545
00546 m_bOpened = true;
00547 }
00548
00549 void CUDT::listen()
00550 {
00551 CGuard cg(m_ConnectionLock);
00552
00553 if (!m_bOpened)
00554 throw CUDTException(5, 0, 0);
00555
00556 if (m_bConnecting || m_bConnected)
00557 throw CUDTException(5, 2, 0);
00558
00559
00560 if (m_bListening)
00561 return;
00562
00563
00564 if (m_pRcvQueue->setListener(this) < 0)
00565 throw CUDTException(5, 11, 0);
00566
00567 m_bListening = true;
00568 }
00569
00570 void CUDT::connect(const sockaddr* serv_addr)
00571 {
00572 CGuard cg(m_ConnectionLock);
00573
00574 if (!m_bOpened)
00575 throw CUDTException(5, 0, 0);
00576
00577 if (m_bListening)
00578 throw CUDTException(5, 2, 0);
00579
00580 if (m_bConnecting || m_bConnected)
00581 throw CUDTException(5, 2, 0);
00582
00583
00584 delete m_pPeerAddr;
00585 m_pPeerAddr = (AF_INET == m_iIPversion) ? (sockaddr*)new sockaddr_in : (sockaddr*)new sockaddr_in6;
00586 memcpy(m_pPeerAddr, serv_addr, (AF_INET == m_iIPversion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6));
00587
00588
00589
00590 uint64_t ttl = 3000000;
00591 if (m_bRendezvous)
00592 ttl *= 10;
00593 ttl += CTimer::getTime();
00594 m_pRcvQueue->registerConnector(m_SocketID, this, m_iIPversion, serv_addr, ttl);
00595
00596
00597 m_ConnReq.m_iVersion = m_iVersion;
00598 m_ConnReq.m_iType = m_iSockType;
00599 m_ConnReq.m_iMSS = m_iMSS;
00600 m_ConnReq.m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize)? m_iRcvBufSize : m_iFlightFlagSize;
00601 m_ConnReq.m_iReqType = (!m_bRendezvous) ? 1 : 0;
00602 m_ConnReq.m_iID = m_SocketID;
00603 CIPAddress::ntop(serv_addr, m_ConnReq.m_piPeerIP, m_iIPversion);
00604
00605
00606 srand((unsigned int)CTimer::getTime());
00607 m_iISN = m_ConnReq.m_iISN = (int32_t)(CSeqNo::m_iMaxSeqNo * (double(rand()) / RAND_MAX));
00608
00609 m_iLastDecSeq = m_iISN - 1;
00610 m_iSndLastAck = m_iISN;
00611 m_iSndLastDataAck = m_iISN;
00612 m_iSndCurrSeqNo = m_iISN - 1;
00613 m_iSndLastAck2 = m_iISN;
00614 m_ullSndLastAck2Time = CTimer::getTime();
00615
00616
00617 CPacket request;
00618 char* reqdata = new char [m_iPayloadSize];
00619 request.pack(0, NULL, reqdata, m_iPayloadSize);
00620
00621 request.m_iID = 0;
00622
00623 int hs_size = m_iPayloadSize;
00624 m_ConnReq.serialize(reqdata, hs_size);
00625 request.setLength(hs_size);
00626 m_pSndQueue->sendto(serv_addr, request);
00627 m_llLastReqTime = CTimer::getTime();
00628
00629 m_bConnecting = true;
00630
00631
00632 if (!m_bSynRecving)
00633 {
00634 delete [] reqdata;
00635 return;
00636 }
00637
00638
00639 CPacket response;
00640 char* resdata = new char [m_iPayloadSize];
00641 response.pack(0, NULL, resdata, m_iPayloadSize);
00642
00643 CUDTException e(0, 0);
00644
00645 while (!m_bClosing)
00646 {
00647
00648 if (CTimer::getTime() - m_llLastReqTime > 250000)
00649 {
00650 m_ConnReq.serialize(reqdata, hs_size);
00651 request.setLength(hs_size);
00652 if (m_bRendezvous)
00653 request.m_iID = m_ConnRes.m_iID;
00654 m_pSndQueue->sendto(serv_addr, request);
00655 m_llLastReqTime = CTimer::getTime();
00656 }
00657
00658 response.setLength(m_iPayloadSize);
00659 if (m_pRcvQueue->recvfrom(m_SocketID, response) > 0)
00660 {
00661 if (connect(response) <= 0)
00662 break;
00663
00664
00665 m_llLastReqTime = 0;
00666 }
00667
00668 if (CTimer::getTime() > ttl)
00669 {
00670
00671 e = CUDTException(1, 1, 0);
00672 break;
00673 }
00674 }
00675
00676 delete [] reqdata;
00677 delete [] resdata;
00678
00679 if (e.getErrorCode() == 0)
00680 {
00681 if (m_bClosing)
00682 e = CUDTException(1);
00683 else if (1002 == m_ConnRes.m_iReqType)
00684 e = CUDTException(1, 2, 0);
00685 else if ((!m_bRendezvous) && (m_iISN != m_ConnRes.m_iISN))
00686 e = CUDTException(1, 4, 0);
00687 }
00688
00689 if (e.getErrorCode() != 0)
00690 throw e;
00691 }
00692
00693 int CUDT::connect(const CPacket& response) throw ()
00694 {
00695
00696
00697
00698
00699 if (!m_bConnecting)
00700 return -1;
00701
00702 if (m_bRendezvous && ((0 == response.getFlag()) || (1 == response.getType())) && (0 != m_ConnRes.m_iType))
00703 {
00704
00705
00706 goto POST_CONNECT;
00707 }
00708
00709 if ((1 != response.getFlag()) || (0 != response.getType()))
00710 return -1;
00711
00712 m_ConnRes.deserialize(response.m_pcData, response.getLength());
00713
00714 if (m_bRendezvous)
00715 {
00716
00717
00718 if (1 == m_ConnRes.m_iReqType)
00719 return -1;
00720
00721 if ((0 == m_ConnReq.m_iReqType) || (0 == m_ConnRes.m_iReqType))
00722 {
00723 m_ConnReq.m_iReqType = -1;
00724
00725 m_llLastReqTime = 0;
00726 return 1;
00727 }
00728 }
00729 else
00730 {
00731
00732 if (1 == m_ConnRes.m_iReqType)
00733 {
00734 m_ConnReq.m_iReqType = -1;
00735 m_ConnReq.m_iCookie = m_ConnRes.m_iCookie;
00736 m_llLastReqTime = 0;
00737 return 1;
00738 }
00739 }
00740
00741 POST_CONNECT:
00742
00743 m_pRcvQueue->removeConnector(m_SocketID);
00744
00745
00746 m_iMSS = m_ConnRes.m_iMSS;
00747 m_iFlowWindowSize = m_ConnRes.m_iFlightFlagSize;
00748 m_iPktSize = m_iMSS - 28;
00749 m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;
00750 m_iPeerISN = m_ConnRes.m_iISN;
00751 m_iRcvLastAck = m_ConnRes.m_iISN;
00752 m_iRcvLastAckAck = m_ConnRes.m_iISN;
00753 m_iRcvCurrSeqNo = m_ConnRes.m_iISN - 1;
00754 m_PeerID = m_ConnRes.m_iID;
00755 memcpy(m_piSelfIP, m_ConnRes.m_piPeerIP, 16);
00756
00757
00758 try
00759 {
00760 m_pSndBuffer = new CSndBuffer(32, m_iPayloadSize);
00761 m_pRcvBuffer = new CRcvBuffer(&(m_pRcvQueue->m_UnitQueue), m_iRcvBufSize);
00762
00763 m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2);
00764 m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize);
00765 m_pACKWindow = new CACKWindow(1024);
00766 m_pRcvTimeWindow = new CPktTimeWindow(16, 64);
00767 m_pSndTimeWindow = new CPktTimeWindow();
00768 }
00769 catch (...)
00770 {
00771 throw CUDTException(3, 2, 0);
00772 }
00773
00774 CInfoBlock ib;
00775 ib.m_iIPversion = m_iIPversion;
00776 CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP);
00777 if (m_pCache->lookup(&ib) >= 0)
00778 {
00779 m_iRTT = ib.m_iRTT;
00780 m_iBandwidth = ib.m_iBandwidth;
00781 }
00782
00783 m_pCC = m_pCCFactory->create();
00784 m_pCC->m_UDT = m_SocketID;
00785 m_pCC->setMSS(m_iMSS);
00786 m_pCC->setMaxCWndSize(m_iFlowWindowSize);
00787 m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo);
00788 m_pCC->setRcvRate(m_iDeliveryRate);
00789 m_pCC->setRTT(m_iRTT);
00790 m_pCC->setBandwidth(m_iBandwidth);
00791 m_pCC->init();
00792
00793 m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
00794 m_dCongestionWindow = m_pCC->m_dCWndSize;
00795
00796
00797 m_bConnecting = false;
00798 m_bConnected = true;
00799
00800
00801 m_pRNode->m_bOnList = true;
00802 m_pRcvQueue->setNewEntry(this);
00803
00804
00805 s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);
00806
00807
00808 s_UDTUnited.connect_complete(m_SocketID);
00809
00810 return 0;
00811 }
00812
00813 void CUDT::connect(const sockaddr* peer, CHandShake* hs)
00814 {
00815 CGuard cg(m_ConnectionLock);
00816
00817
00818 if (hs->m_iMSS > m_iMSS)
00819 hs->m_iMSS = m_iMSS;
00820 else
00821 m_iMSS = hs->m_iMSS;
00822
00823
00824 m_iFlowWindowSize = hs->m_iFlightFlagSize;
00825 hs->m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize)? m_iRcvBufSize : m_iFlightFlagSize;
00826
00827 m_iPeerISN = hs->m_iISN;
00828
00829 m_iRcvLastAck = hs->m_iISN;
00830 m_iRcvLastAckAck = hs->m_iISN;
00831 m_iRcvCurrSeqNo = hs->m_iISN - 1;
00832
00833 m_PeerID = hs->m_iID;
00834 hs->m_iID = m_SocketID;
00835
00836
00837 m_iISN = hs->m_iISN;
00838
00839 m_iLastDecSeq = m_iISN - 1;
00840 m_iSndLastAck = m_iISN;
00841 m_iSndLastDataAck = m_iISN;
00842 m_iSndCurrSeqNo = m_iISN - 1;
00843 m_iSndLastAck2 = m_iISN;
00844 m_ullSndLastAck2Time = CTimer::getTime();
00845
00846
00847 hs->m_iReqType = -1;
00848
00849
00850 memcpy(m_piSelfIP, hs->m_piPeerIP, 16);
00851 CIPAddress::ntop(peer, hs->m_piPeerIP, m_iIPversion);
00852
00853 m_iPktSize = m_iMSS - 28;
00854 m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;
00855
00856
00857 try
00858 {
00859 m_pSndBuffer = new CSndBuffer(32, m_iPayloadSize);
00860 m_pRcvBuffer = new CRcvBuffer(&(m_pRcvQueue->m_UnitQueue), m_iRcvBufSize);
00861 m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2);
00862 m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize);
00863 m_pACKWindow = new CACKWindow(1024);
00864 m_pRcvTimeWindow = new CPktTimeWindow(16, 64);
00865 m_pSndTimeWindow = new CPktTimeWindow();
00866 }
00867 catch (...)
00868 {
00869 throw CUDTException(3, 2, 0);
00870 }
00871
00872 CInfoBlock ib;
00873 ib.m_iIPversion = m_iIPversion;
00874 CInfoBlock::convert(peer, m_iIPversion, ib.m_piIP);
00875 if (m_pCache->lookup(&ib) >= 0)
00876 {
00877 m_iRTT = ib.m_iRTT;
00878 m_iBandwidth = ib.m_iBandwidth;
00879 }
00880
00881 m_pCC = m_pCCFactory->create();
00882 m_pCC->m_UDT = m_SocketID;
00883 m_pCC->setMSS(m_iMSS);
00884 m_pCC->setMaxCWndSize(m_iFlowWindowSize);
00885 m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo);
00886 m_pCC->setRcvRate(m_iDeliveryRate);
00887 m_pCC->setRTT(m_iRTT);
00888 m_pCC->setBandwidth(m_iBandwidth);
00889 m_pCC->init();
00890
00891 m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
00892 m_dCongestionWindow = m_pCC->m_dCWndSize;
00893
00894 m_pPeerAddr = (AF_INET == m_iIPversion) ? (sockaddr*)new sockaddr_in : (sockaddr*)new sockaddr_in6;
00895 memcpy(m_pPeerAddr, peer, (AF_INET == m_iIPversion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6));
00896
00897
00898 m_bConnected = true;
00899
00900
00901 m_pRNode->m_bOnList = true;
00902 m_pRcvQueue->setNewEntry(this);
00903
00904
00905 CPacket response;
00906 int size = CHandShake::m_iContentSize;
00907 char* buffer = new char[size];
00908 hs->serialize(buffer, size);
00909 response.pack(0, NULL, buffer, size);
00910 response.m_iID = m_PeerID;
00911 m_pSndQueue->sendto(peer, response);
00912 delete [] buffer;
00913 }
00914
00915 void CUDT::close()
00916 {
00917 if (!m_bOpened)
00918 return;
00919
00920 if (0 != m_Linger.l_onoff)
00921 {
00922 uint64_t entertime = CTimer::getTime();
00923
00924 while (!m_bBroken && m_bConnected && (m_pSndBuffer->getCurrBufSize() > 0) && (CTimer::getTime() - entertime < m_Linger.l_linger * 1000000ULL))
00925 {
00926
00927 if (m_ullLingerExpiration >= entertime)
00928 break;
00929
00930 if (!m_bSynSending)
00931 {
00932
00933 if (0 == m_ullLingerExpiration)
00934 m_ullLingerExpiration = entertime + m_Linger.l_linger * 1000000ULL;
00935
00936 return;
00937 }
00938
00939 #ifndef WIN32
00940 timespec ts;
00941 ts.tv_sec = 0;
00942 ts.tv_nsec = 1000000;
00943 nanosleep(&ts, NULL);
00944 #else
00945 Sleep(1);
00946 #endif
00947 }
00948 }
00949
00950
00951 if (m_bConnected)
00952 m_pSndQueue->m_pSndUList->remove(this);
00953
00954
00955 try
00956 {
00957 for (set<int>::iterator i = m_sPollID.begin(); i != m_sPollID.end(); ++ i)
00958 s_UDTUnited.m_EPoll.remove_usock(*i, m_SocketID);
00959 }
00960 catch (...)
00961 {
00962 }
00963
00964 if (!m_bOpened)
00965 return;
00966
00967
00968 m_bClosing = true;
00969
00970 CGuard cg(m_ConnectionLock);
00971
00972
00973 releaseSynch();
00974
00975 if (m_bListening)
00976 {
00977 m_bListening = false;
00978 m_pRcvQueue->removeListener(this);
00979 }
00980 else if (m_bConnecting)
00981 {
00982 m_pRcvQueue->removeConnector(m_SocketID);
00983 }
00984
00985 if (m_bConnected)
00986 {
00987 if (!m_bShutdown)
00988 sendCtrl(5);
00989
00990 m_pCC->close();
00991
00992
00993 CInfoBlock ib;
00994 ib.m_iIPversion = m_iIPversion;
00995 CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP);
00996 ib.m_iRTT = m_iRTT;
00997 ib.m_iBandwidth = m_iBandwidth;
00998 m_pCache->update(&ib);
00999
01000 m_bConnected = false;
01001 }
01002
01003
01004 CGuard sendguard(m_SendLock);
01005 CGuard recvguard(m_RecvLock);
01006
01007
01008 m_bOpened = false;
01009 }
01010
01011 int CUDT::send(const char* data, int len)
01012 {
01013 if (UDT_DGRAM == m_iSockType)
01014 throw CUDTException(5, 10, 0);
01015
01016
01017 if (m_bBroken || m_bClosing)
01018 throw CUDTException(2, 1, 0);
01019 else if (!m_bConnected)
01020 throw CUDTException(2, 2, 0);
01021
01022 if (len <= 0)
01023 return 0;
01024
01025 CGuard sendguard(m_SendLock);
01026
01027 if (m_pSndBuffer->getCurrBufSize() == 0)
01028 {
01029
01030 uint64_t currtime;
01031 CTimer::rdtsc(currtime);
01032 m_ullLastRspTime = currtime;
01033 }
01034
01035 if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize())
01036 {
01037 if (!m_bSynSending)
01038 throw CUDTException(6, 1, 0);
01039 else
01040 {
01041
01042 #ifndef WIN32
01043 pthread_mutex_lock(&m_SendBlockLock);
01044 if (m_iSndTimeOut < 0)
01045 {
01046 while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth)
01047 pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock);
01048 }
01049 else
01050 {
01051 uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL;
01052 timespec locktime;
01053
01054 locktime.tv_sec = exptime / 1000000;
01055 locktime.tv_nsec = (exptime % 1000000) * 1000;
01056
01057 while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth && (CTimer::getTime() < exptime))
01058 pthread_cond_timedwait(&m_SendBlockCond, &m_SendBlockLock, &locktime);
01059 }
01060 pthread_mutex_unlock(&m_SendBlockLock);
01061 #else
01062 if (m_iSndTimeOut < 0)
01063 {
01064 while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth)
01065 WaitForSingleObject(m_SendBlockCond, INFINITE);
01066 }
01067 else
01068 {
01069 uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL;
01070
01071 while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth && (CTimer::getTime() < exptime))
01072 WaitForSingleObject(m_SendBlockCond, DWORD((exptime - CTimer::getTime()) / 1000));
01073 }
01074 #endif
01075
01076
01077 if (m_bBroken || m_bClosing)
01078 throw CUDTException(2, 1, 0);
01079 else if (!m_bConnected)
01080 throw CUDTException(2, 2, 0);
01081 else if (!m_bPeerHealth)
01082 {
01083 m_bPeerHealth = true;
01084 throw CUDTException(7);
01085 }
01086 }
01087 }
01088
01089 if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize())
01090 {
01091 if (m_iSndTimeOut >= 0)
01092 throw CUDTException(6, 3, 0);
01093
01094 return 0;
01095 }
01096
01097 int size = (m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize;
01098 if (size > len)
01099 size = len;
01100
01101
01102 if (0 == m_pSndBuffer->getCurrBufSize())
01103 m_llSndDurationCounter = CTimer::getTime();
01104
01105
01106 m_pSndBuffer->addBuffer(data, size);
01107
01108
01109 m_pSndQueue->m_pSndUList->update(this, false);
01110
01111 if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize())
01112 {
01113
01114 s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false);
01115 }
01116
01117 return size;
01118 }
01119
01120 int CUDT::recv(char* data, int len)
01121 {
01122 if (UDT_DGRAM == m_iSockType)
01123 throw CUDTException(5, 10, 0);
01124
01125
01126 if (!m_bConnected)
01127 throw CUDTException(2, 2, 0);
01128 else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize()))
01129 throw CUDTException(2, 1, 0);
01130
01131 if (len <= 0)
01132 return 0;
01133
01134 CGuard recvguard(m_RecvLock);
01135
01136 if (0 == m_pRcvBuffer->getRcvDataSize())
01137 {
01138 if (!m_bSynRecving)
01139 throw CUDTException(6, 2, 0);
01140 else
01141 {
01142 #ifndef WIN32
01143 pthread_mutex_lock(&m_RecvDataLock);
01144 if (m_iRcvTimeOut < 0)
01145 {
01146 while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize()))
01147 pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock);
01148 }
01149 else
01150 {
01151 uint64_t exptime = CTimer::getTime() + m_iRcvTimeOut * 1000ULL;
01152 timespec locktime;
01153
01154 locktime.tv_sec = exptime / 1000000;
01155 locktime.tv_nsec = (exptime % 1000000) * 1000;
01156
01157 while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize()))
01158 {
01159 pthread_cond_timedwait(&m_RecvDataCond, &m_RecvDataLock, &locktime);
01160 if (CTimer::getTime() >= exptime)
01161 break;
01162 }
01163 }
01164 pthread_mutex_unlock(&m_RecvDataLock);
01165 #else
01166 if (m_iRcvTimeOut < 0)
01167 {
01168 while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize()))
01169 WaitForSingleObject(m_RecvDataCond, INFINITE);
01170 }
01171 else
01172 {
01173 uint64_t enter_time = CTimer::getTime();
01174
01175 while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize()))
01176 {
01177 int diff = int(CTimer::getTime() - enter_time) / 1000;
01178 if (diff >= m_iRcvTimeOut)
01179 break;
01180 WaitForSingleObject(m_RecvDataCond, DWORD(m_iRcvTimeOut - diff ));
01181 }
01182 }
01183 #endif
01184 }
01185 }
01186
01187
01188 if (!m_bConnected)
01189 throw CUDTException(2, 2, 0);
01190 else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize()))
01191 throw CUDTException(2, 1, 0);
01192
01193 int res = m_pRcvBuffer->readBuffer(data, len);
01194
01195 if (m_pRcvBuffer->getRcvDataSize() <= 0)
01196 {
01197
01198 s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
01199 }
01200
01201 if ((res <= 0) && (m_iRcvTimeOut >= 0))
01202 throw CUDTException(6, 3, 0);
01203
01204 return res;
01205 }
01206
01207 int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder)
01208 {
01209 if (UDT_STREAM == m_iSockType)
01210 throw CUDTException(5, 9, 0);
01211
01212
01213 if (m_bBroken || m_bClosing)
01214 throw CUDTException(2, 1, 0);
01215 else if (!m_bConnected)
01216 throw CUDTException(2, 2, 0);
01217
01218 if (len <= 0)
01219 return 0;
01220
01221 if (len > m_iSndBufSize * m_iPayloadSize)
01222 throw CUDTException(5, 12, 0);
01223
01224 CGuard sendguard(m_SendLock);
01225
01226 if (m_pSndBuffer->getCurrBufSize() == 0)
01227 {
01228
01229 uint64_t currtime;
01230 CTimer::rdtsc(currtime);
01231 m_ullLastRspTime = currtime;
01232 }
01233
01234 if ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len)
01235 {
01236 if (!m_bSynSending)
01237 throw CUDTException(6, 1, 0);
01238 else
01239 {
01240
01241 #ifndef WIN32
01242 pthread_mutex_lock(&m_SendBlockLock);
01243 if (m_iSndTimeOut < 0)
01244 {
01245 while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len))
01246 pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock);
01247 }
01248 else
01249 {
01250 uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL;
01251 timespec locktime;
01252
01253 locktime.tv_sec = exptime / 1000000;
01254 locktime.tv_nsec = (exptime % 1000000) * 1000;
01255
01256 while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len) && (CTimer::getTime() < exptime))
01257 pthread_cond_timedwait(&m_SendBlockCond, &m_SendBlockLock, &locktime);
01258 }
01259 pthread_mutex_unlock(&m_SendBlockLock);
01260 #else
01261 if (m_iSndTimeOut < 0)
01262 {
01263 while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len))
01264 WaitForSingleObject(m_SendBlockCond, INFINITE);
01265 }
01266 else
01267 {
01268 uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL;
01269
01270 while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len) && (CTimer::getTime() < exptime))
01271 WaitForSingleObject(m_SendBlockCond, DWORD((exptime - CTimer::getTime()) / 1000));
01272 }
01273 #endif
01274
01275
01276 if (m_bBroken || m_bClosing)
01277 throw CUDTException(2, 1, 0);
01278 else if (!m_bConnected)
01279 throw CUDTException(2, 2, 0);
01280 }
01281 }
01282
01283 if ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len)
01284 {
01285 if (m_iSndTimeOut >= 0)
01286 throw CUDTException(6, 3, 0);
01287
01288 return 0;
01289 }
01290
01291
01292 if (0 == m_pSndBuffer->getCurrBufSize())
01293 m_llSndDurationCounter = CTimer::getTime();
01294
01295
01296 m_pSndBuffer->addBuffer(data, len, msttl, inorder);
01297
01298
01299 m_pSndQueue->m_pSndUList->update(this, false);
01300
01301 if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize())
01302 {
01303
01304 s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false);
01305 }
01306
01307 return len;
01308 }
01309
01310 int CUDT::recvmsg(char* data, int len)
01311 {
01312 if (UDT_STREAM == m_iSockType)
01313 throw CUDTException(5, 9, 0);
01314
01315
01316 if (!m_bConnected)
01317 throw CUDTException(2, 2, 0);
01318
01319 if (len <= 0)
01320 return 0;
01321
01322 CGuard recvguard(m_RecvLock);
01323
01324 if (m_bBroken || m_bClosing)
01325 {
01326 int res = m_pRcvBuffer->readMsg(data, len);
01327
01328 if (m_pRcvBuffer->getRcvMsgNum() <= 0)
01329 {
01330
01331 s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
01332 }
01333
01334 if (0 == res)
01335 throw CUDTException(2, 1, 0);
01336 else
01337 return res;
01338 }
01339
01340 if (!m_bSynRecving)
01341 {
01342 int res = m_pRcvBuffer->readMsg(data, len);
01343 if (0 == res)
01344 throw CUDTException(6, 2, 0);
01345 else
01346 return res;
01347 }
01348
01349 int res = 0;
01350 bool timeout = false;
01351
01352 do
01353 {
01354 #ifndef WIN32
01355 pthread_mutex_lock(&m_RecvDataLock);
01356
01357 if (m_iRcvTimeOut < 0)
01358 {
01359 while (!m_bBroken && m_bConnected && !m_bClosing && (0 == (res = m_pRcvBuffer->readMsg(data, len))))
01360 pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock);
01361 }
01362 else
01363 {
01364 uint64_t exptime = CTimer::getTime() + m_iRcvTimeOut * 1000ULL;
01365 timespec locktime;
01366
01367 locktime.tv_sec = exptime / 1000000;
01368 locktime.tv_nsec = (exptime % 1000000) * 1000;
01369
01370 if (pthread_cond_timedwait(&m_RecvDataCond, &m_RecvDataLock, &locktime) == ETIMEDOUT)
01371 timeout = true;
01372
01373 res = m_pRcvBuffer->readMsg(data, len);
01374 }
01375 pthread_mutex_unlock(&m_RecvDataLock);
01376 #else
01377 if (m_iRcvTimeOut < 0)
01378 {
01379 while (!m_bBroken && m_bConnected && !m_bClosing && (0 == (res = m_pRcvBuffer->readMsg(data, len))))
01380 WaitForSingleObject(m_RecvDataCond, INFINITE);
01381 }
01382 else
01383 {
01384 if (WaitForSingleObject(m_RecvDataCond, DWORD(m_iRcvTimeOut)) == WAIT_TIMEOUT)
01385 timeout = true;
01386
01387 res = m_pRcvBuffer->readMsg(data, len);
01388 }
01389 #endif
01390
01391 if (m_bBroken || m_bClosing)
01392 throw CUDTException(2, 1, 0);
01393 else if (!m_bConnected)
01394 throw CUDTException(2, 2, 0);
01395 } while ((0 == res) && !timeout);
01396
01397 if (m_pRcvBuffer->getRcvMsgNum() <= 0)
01398 {
01399
01400 s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
01401 }
01402
01403 if ((res <= 0) && (m_iRcvTimeOut >= 0))
01404 throw CUDTException(6, 3, 0);
01405
01406 return res;
01407 }
01408
01409 int64_t CUDT::sendfile(fstream& ifs, int64_t& offset, int64_t size, int block)
01410 {
01411 if (UDT_DGRAM == m_iSockType)
01412 throw CUDTException(5, 10, 0);
01413
01414 if (m_bBroken || m_bClosing)
01415 throw CUDTException(2, 1, 0);
01416 else if (!m_bConnected)
01417 throw CUDTException(2, 2, 0);
01418
01419 if (size <= 0)
01420 return 0;
01421
01422 CGuard sendguard(m_SendLock);
01423
01424 if (m_pSndBuffer->getCurrBufSize() == 0)
01425 {
01426
01427 uint64_t currtime;
01428 CTimer::rdtsc(currtime);
01429 m_ullLastRspTime = currtime;
01430 }
01431
01432 int64_t tosend = size;
01433 int unitsize;
01434
01435
01436 try
01437 {
01438 ifs.seekg((streamoff)offset);
01439 }
01440 catch (...)
01441 {
01442 throw CUDTException(4, 1);
01443 }
01444
01445
01446 while (tosend > 0)
01447 {
01448 if (ifs.fail())
01449 throw CUDTException(4, 4);
01450
01451 if (ifs.eof())
01452 break;
01453
01454 unitsize = int((tosend >= block) ? block : tosend);
01455
01456 #ifndef WIN32
01457 pthread_mutex_lock(&m_SendBlockLock);
01458 while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth)
01459 pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock);
01460 pthread_mutex_unlock(&m_SendBlockLock);
01461 #else
01462 while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth)
01463 WaitForSingleObject(m_SendBlockCond, INFINITE);
01464 #endif
01465
01466 if (m_bBroken || m_bClosing)
01467 throw CUDTException(2, 1, 0);
01468 else if (!m_bConnected)
01469 throw CUDTException(2, 2, 0);
01470 else if (!m_bPeerHealth)
01471 {
01472
01473 m_bPeerHealth = true;
01474 throw CUDTException(7);
01475 }
01476
01477
01478 if (0 == m_pSndBuffer->getCurrBufSize())
01479 m_llSndDurationCounter = CTimer::getTime();
01480
01481 int64_t sentsize = m_pSndBuffer->addBufferFromFile(ifs, unitsize);
01482
01483 if (sentsize > 0)
01484 {
01485 tosend -= sentsize;
01486 offset += sentsize;
01487 }
01488
01489
01490 m_pSndQueue->m_pSndUList->update(this, false);
01491 }
01492
01493 if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize())
01494 {
01495
01496 s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false);
01497 }
01498
01499 return size - tosend;
01500 }
01501
01502 int64_t CUDT::recvfile(fstream& ofs, int64_t& offset, int64_t size, int block)
01503 {
01504 if (UDT_DGRAM == m_iSockType)
01505 throw CUDTException(5, 10, 0);
01506
01507 if (!m_bConnected)
01508 throw CUDTException(2, 2, 0);
01509 else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize()))
01510 throw CUDTException(2, 1, 0);
01511
01512 if (size <= 0)
01513 return 0;
01514
01515 CGuard recvguard(m_RecvLock);
01516
01517 int64_t torecv = size;
01518 int unitsize = block;
01519 int recvsize;
01520
01521
01522 try
01523 {
01524 ofs.seekp((streamoff)offset);
01525 }
01526 catch (...)
01527 {
01528 throw CUDTException(4, 3);
01529 }
01530
01531
01532 while (torecv > 0)
01533 {
01534 if (ofs.fail())
01535 {
01536
01537 int32_t err_code = CUDTException::EFILE;
01538 sendCtrl(8, &err_code);
01539
01540 throw CUDTException(4, 4);
01541 }
01542
01543 #ifndef WIN32
01544 pthread_mutex_lock(&m_RecvDataLock);
01545 while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize()))
01546 pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock);
01547 pthread_mutex_unlock(&m_RecvDataLock);
01548 #else
01549 while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize()))
01550 WaitForSingleObject(m_RecvDataCond, INFINITE);
01551 #endif
01552
01553 if (!m_bConnected)
01554 throw CUDTException(2, 2, 0);
01555 else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize()))
01556 throw CUDTException(2, 1, 0);
01557
01558 unitsize = int((torecv >= block) ? block : torecv);
01559 recvsize = m_pRcvBuffer->readBufferToFile(ofs, unitsize);
01560
01561 if (recvsize > 0)
01562 {
01563 torecv -= recvsize;
01564 offset += recvsize;
01565 }
01566 }
01567
01568 if (m_pRcvBuffer->getRcvDataSize() <= 0)
01569 {
01570
01571 s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
01572 }
01573
01574 return size - torecv;
01575 }
01576
01577 void CUDT::sample(CPerfMon* perf, bool clear)
01578 {
01579 if (!m_bConnected)
01580 throw CUDTException(2, 2, 0);
01581 if (m_bBroken || m_bClosing)
01582 throw CUDTException(2, 1, 0);
01583
01584 uint64_t currtime = CTimer::getTime();
01585 perf->msTimeStamp = (currtime - m_StartTime) / 1000;
01586
01587 perf->pktSent = m_llTraceSent;
01588 perf->pktRecv = m_llTraceRecv;
01589 perf->pktSndLoss = m_iTraceSndLoss;
01590 perf->pktRcvLoss = m_iTraceRcvLoss;
01591 perf->pktRetrans = m_iTraceRetrans;
01592 perf->pktSentACK = m_iSentACK;
01593 perf->pktRecvACK = m_iRecvACK;
01594 perf->pktSentNAK = m_iSentNAK;
01595 perf->pktRecvNAK = m_iRecvNAK;
01596 perf->usSndDuration = m_llSndDuration;
01597
01598 perf->pktSentTotal = m_llSentTotal;
01599 perf->pktRecvTotal = m_llRecvTotal;
01600 perf->pktSndLossTotal = m_iSndLossTotal;
01601 perf->pktRcvLossTotal = m_iRcvLossTotal;
01602 perf->pktRetransTotal = m_iRetransTotal;
01603 perf->pktSentACKTotal = m_iSentACKTotal;
01604 perf->pktRecvACKTotal = m_iRecvACKTotal;
01605 perf->pktSentNAKTotal = m_iSentNAKTotal;
01606 perf->pktRecvNAKTotal = m_iRecvNAKTotal;
01607 perf->usSndDurationTotal = m_llSndDurationTotal;
01608
01609 double interval = double(currtime - m_LastSampleTime);
01610
01611 perf->mbpsSendRate = double(m_llTraceSent) * m_iPayloadSize * 8.0 / interval;
01612 perf->mbpsRecvRate = double(m_llTraceRecv) * m_iPayloadSize * 8.0 / interval;
01613
01614 perf->usPktSndPeriod = m_ullInterval / double(m_ullCPUFrequency);
01615 perf->pktFlowWindow = m_iFlowWindowSize;
01616 perf->pktCongestionWindow = (int)m_dCongestionWindow;
01617 perf->pktFlightSize = CSeqNo::seqlen(m_iSndLastAck, CSeqNo::incseq(m_iSndCurrSeqNo)) - 1;
01618 perf->msRTT = m_iRTT/1000.0;
01619 perf->mbpsBandwidth = m_iBandwidth * m_iPayloadSize * 8.0 / 1000000.0;
01620
01621 #ifndef WIN32
01622 if (0 == pthread_mutex_trylock(&m_ConnectionLock))
01623 #else
01624 if (WAIT_OBJECT_0 == WaitForSingleObject(m_ConnectionLock, 0))
01625 #endif
01626 {
01627 perf->byteAvailSndBuf = (NULL == m_pSndBuffer) ? 0 : (m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iMSS;
01628 perf->byteAvailRcvBuf = (NULL == m_pRcvBuffer) ? 0 : m_pRcvBuffer->getAvailBufSize() * m_iMSS;
01629
01630 #ifndef WIN32
01631 pthread_mutex_unlock(&m_ConnectionLock);
01632 #else
01633 ReleaseMutex(m_ConnectionLock);
01634 #endif
01635 }
01636 else
01637 {
01638 perf->byteAvailSndBuf = 0;
01639 perf->byteAvailRcvBuf = 0;
01640 }
01641
01642 if (clear)
01643 {
01644 m_llTraceSent = m_llTraceRecv = m_iTraceSndLoss = m_iTraceRcvLoss = m_iTraceRetrans = m_iSentACK = m_iRecvACK = m_iSentNAK = m_iRecvNAK = 0;
01645 m_llSndDuration = 0;
01646 m_LastSampleTime = currtime;
01647 }
01648 }
01649
01650 void CUDT::CCUpdate()
01651 {
01652 m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
01653 m_dCongestionWindow = m_pCC->m_dCWndSize;
01654
01655 if (m_llMaxBW <= 0)
01656 return;
01657 const double minSP = 1000000.0 / (double(m_llMaxBW) / m_iMSS) * m_ullCPUFrequency;
01658 if (m_ullInterval < minSP)
01659 m_ullInterval = minSP;
01660 }
01661
01662 void CUDT::initSynch()
01663 {
01664 #ifndef WIN32
01665 pthread_mutex_init(&m_SendBlockLock, NULL);
01666 pthread_cond_init(&m_SendBlockCond, NULL);
01667 pthread_mutex_init(&m_RecvDataLock, NULL);
01668 pthread_cond_init(&m_RecvDataCond, NULL);
01669 pthread_mutex_init(&m_SendLock, NULL);
01670 pthread_mutex_init(&m_RecvLock, NULL);
01671 pthread_mutex_init(&m_AckLock, NULL);
01672 pthread_mutex_init(&m_ConnectionLock, NULL);
01673 #else
01674 m_SendBlockLock = CreateMutex(NULL, false, NULL);
01675 m_SendBlockCond = CreateEvent(NULL, false, false, NULL);
01676 m_RecvDataLock = CreateMutex(NULL, false, NULL);
01677 m_RecvDataCond = CreateEvent(NULL, false, false, NULL);
01678 m_SendLock = CreateMutex(NULL, false, NULL);
01679 m_RecvLock = CreateMutex(NULL, false, NULL);
01680 m_AckLock = CreateMutex(NULL, false, NULL);
01681 m_ConnectionLock = CreateMutex(NULL, false, NULL);
01682 #endif
01683 }
01684
01685 void CUDT::destroySynch()
01686 {
01687 #ifndef WIN32
01688 pthread_mutex_destroy(&m_SendBlockLock);
01689 pthread_cond_destroy(&m_SendBlockCond);
01690 pthread_mutex_destroy(&m_RecvDataLock);
01691 pthread_cond_destroy(&m_RecvDataCond);
01692 pthread_mutex_destroy(&m_SendLock);
01693 pthread_mutex_destroy(&m_RecvLock);
01694 pthread_mutex_destroy(&m_AckLock);
01695 pthread_mutex_destroy(&m_ConnectionLock);
01696 #else
01697 CloseHandle(m_SendBlockLock);
01698 CloseHandle(m_SendBlockCond);
01699 CloseHandle(m_RecvDataLock);
01700 CloseHandle(m_RecvDataCond);
01701 CloseHandle(m_SendLock);
01702 CloseHandle(m_RecvLock);
01703 CloseHandle(m_AckLock);
01704 CloseHandle(m_ConnectionLock);
01705 #endif
01706 }
01707
01708 void CUDT::releaseSynch()
01709 {
01710 #ifndef WIN32
01711
01712 pthread_mutex_lock(&m_SendBlockLock);
01713 pthread_cond_signal(&m_SendBlockCond);
01714 pthread_mutex_unlock(&m_SendBlockLock);
01715
01716 pthread_mutex_lock(&m_SendLock);
01717 pthread_mutex_unlock(&m_SendLock);
01718
01719 pthread_mutex_lock(&m_RecvDataLock);
01720 pthread_cond_signal(&m_RecvDataCond);
01721 pthread_mutex_unlock(&m_RecvDataLock);
01722
01723 pthread_mutex_lock(&m_RecvLock);
01724 pthread_mutex_unlock(&m_RecvLock);
01725 #else
01726 SetEvent(m_SendBlockCond);
01727 WaitForSingleObject(m_SendLock, INFINITE);
01728 ReleaseMutex(m_SendLock);
01729 SetEvent(m_RecvDataCond);
01730 WaitForSingleObject(m_RecvLock, INFINITE);
01731 ReleaseMutex(m_RecvLock);
01732 #endif
01733 }
01734
01735 void CUDT::sendCtrl(int pkttype, void* lparam, void* rparam, int size)
01736 {
01737 CPacket ctrlpkt;
01738
01739 switch (pkttype)
01740 {
01741 case 2:
01742 {
01743 int32_t ack;
01744
01745
01746
01747 if (0 == m_pRcvLossList->getLossLength())
01748 ack = CSeqNo::incseq(m_iRcvCurrSeqNo);
01749 else
01750 ack = m_pRcvLossList->getFirstLostSeq();
01751
01752 if (ack == m_iRcvLastAckAck)
01753 break;
01754
01755
01756
01757 if (4 == size)
01758 {
01759 ctrlpkt.pack(pkttype, NULL, &ack, size);
01760 ctrlpkt.m_iID = m_PeerID;
01761 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
01762
01763 break;
01764 }
01765
01766 uint64_t currtime;
01767 CTimer::rdtsc(currtime);
01768
01769
01770 if (CSeqNo::seqcmp(ack, m_iRcvLastAck) > 0)
01771 {
01772 int acksize = CSeqNo::seqoff(m_iRcvLastAck, ack);
01773
01774 m_iRcvLastAck = ack;
01775
01776 m_pRcvBuffer->ackData(acksize);
01777
01778
01779 #ifndef WIN32
01780 pthread_mutex_lock(&m_RecvDataLock);
01781 if (m_bSynRecving)
01782 pthread_cond_signal(&m_RecvDataCond);
01783 pthread_mutex_unlock(&m_RecvDataLock);
01784 #else
01785 if (m_bSynRecving)
01786 SetEvent(m_RecvDataCond);
01787 #endif
01788
01789
01790 s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, true);
01791 }
01792 else if (ack == m_iRcvLastAck)
01793 {
01794 if ((currtime - m_ullLastAckTime) < ((m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency))
01795 break;
01796 }
01797 else
01798 break;
01799
01800
01801 if (CSeqNo::seqcmp(m_iRcvLastAck, m_iRcvLastAckAck) > 0)
01802 {
01803 int32_t data[6];
01804
01805 m_iAckSeqNo = CAckNo::incack(m_iAckSeqNo);
01806 data[0] = m_iRcvLastAck;
01807 data[1] = m_iRTT;
01808 data[2] = m_iRTTVar;
01809 data[3] = m_pRcvBuffer->getAvailBufSize();
01810
01811 if (data[3] < 2)
01812 data[3] = 2;
01813
01814 if (currtime - m_ullLastAckTime > m_ullSYNInt)
01815 {
01816 data[4] = m_pRcvTimeWindow->getPktRcvSpeed();
01817 data[5] = m_pRcvTimeWindow->getBandwidth();
01818 ctrlpkt.pack(pkttype, &m_iAckSeqNo, data, 24);
01819
01820 CTimer::rdtsc(m_ullLastAckTime);
01821 }
01822 else
01823 {
01824 ctrlpkt.pack(pkttype, &m_iAckSeqNo, data, 16);
01825 }
01826
01827 ctrlpkt.m_iID = m_PeerID;
01828 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
01829
01830 m_pACKWindow->store(m_iAckSeqNo, m_iRcvLastAck);
01831
01832 ++ m_iSentACK;
01833 ++ m_iSentACKTotal;
01834 }
01835
01836 break;
01837 }
01838
01839 case 6:
01840 ctrlpkt.pack(pkttype, lparam);
01841 ctrlpkt.m_iID = m_PeerID;
01842 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
01843
01844 break;
01845
01846 case 3:
01847 {
01848 if (NULL != rparam)
01849 {
01850 if (1 == size)
01851 {
01852
01853 ctrlpkt.pack(pkttype, NULL, (int32_t *)rparam + 1, 4);
01854 }
01855 else
01856 {
01857
01858 ctrlpkt.pack(pkttype, NULL, rparam, 8);
01859 }
01860
01861 ctrlpkt.m_iID = m_PeerID;
01862 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
01863
01864 ++ m_iSentNAK;
01865 ++ m_iSentNAKTotal;
01866 }
01867 else if (m_pRcvLossList->getLossLength() > 0)
01868 {
01869
01870
01871
01872 int32_t* data = new int32_t[m_iPayloadSize / 4];
01873 int losslen;
01874 m_pRcvLossList->getLossArray(data, losslen, m_iPayloadSize / 4);
01875
01876 if (0 < losslen)
01877 {
01878 ctrlpkt.pack(pkttype, NULL, data, losslen * 4);
01879 ctrlpkt.m_iID = m_PeerID;
01880 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
01881
01882 ++ m_iSentNAK;
01883 ++ m_iSentNAKTotal;
01884 }
01885
01886 delete [] data;
01887 }
01888
01889
01890 m_ullNAKInt = (m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency;
01891 int rcv_speed = m_pRcvTimeWindow->getPktRcvSpeed();
01892 if (rcv_speed > 0)
01893 m_ullNAKInt += (m_pRcvLossList->getLossLength() * 1000000ULL / rcv_speed) * m_ullCPUFrequency;
01894 if (m_ullNAKInt < m_ullMinNakInt)
01895 m_ullNAKInt = m_ullMinNakInt;
01896
01897 break;
01898 }
01899
01900 case 4:
01901 ctrlpkt.pack(pkttype);
01902 ctrlpkt.m_iID = m_PeerID;
01903 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
01904
01905 CTimer::rdtsc(m_ullLastWarningTime);
01906
01907 break;
01908
01909 case 1:
01910 ctrlpkt.pack(pkttype);
01911 ctrlpkt.m_iID = m_PeerID;
01912 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
01913
01914 break;
01915
01916 case 0:
01917 ctrlpkt.pack(pkttype, NULL, rparam, sizeof(CHandShake));
01918 ctrlpkt.m_iID = m_PeerID;
01919 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
01920
01921 break;
01922
01923 case 5:
01924 ctrlpkt.pack(pkttype);
01925 ctrlpkt.m_iID = m_PeerID;
01926 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
01927
01928 break;
01929
01930 case 7:
01931 ctrlpkt.pack(pkttype, lparam, rparam, 8);
01932 ctrlpkt.m_iID = m_PeerID;
01933 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
01934
01935 break;
01936
01937 case 8:
01938 ctrlpkt.pack(pkttype, lparam);
01939 ctrlpkt.m_iID = m_PeerID;
01940 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
01941
01942 break;
01943
01944 case 32767:
01945 break;
01946
01947 default:
01948 break;
01949 }
01950 }
01951
01952 void CUDT::processCtrl(CPacket& ctrlpkt)
01953 {
01954
01955 m_iEXPCount = 1;
01956 uint64_t currtime;
01957 CTimer::rdtsc(currtime);
01958 m_ullLastRspTime = currtime;
01959
01960 switch (ctrlpkt.getType())
01961 {
01962 case 2:
01963 {
01964 int32_t ack;
01965
01966
01967 if (4 == ctrlpkt.getLength())
01968 {
01969 ack = *(int32_t *)ctrlpkt.m_pcData;
01970 if (CSeqNo::seqcmp(ack, m_iSndLastAck) >= 0)
01971 {
01972 m_iFlowWindowSize -= CSeqNo::seqoff(m_iSndLastAck, ack);
01973 m_iSndLastAck = ack;
01974 }
01975
01976 break;
01977 }
01978
01979
01980 ack = ctrlpkt.getAckSeqNo();
01981
01982
01983
01984 uint64_t now = CTimer::getTime();
01985 if ((currtime - m_ullSndLastAck2Time > (uint64_t)m_iSYNInterval) || (ack == m_iSndLastAck2))
01986 {
01987 sendCtrl(6, &ack);
01988 m_iSndLastAck2 = ack;
01989 m_ullSndLastAck2Time = now;
01990 }
01991
01992
01993 ack = *(int32_t *)ctrlpkt.m_pcData;
01994
01995
01996 if (CSeqNo::seqcmp(ack, CSeqNo::incseq(m_iSndCurrSeqNo)) > 0)
01997 {
01998
01999 m_bBroken = true;
02000 m_iBrokenCounter = 0;
02001 break;
02002 }
02003
02004 if (CSeqNo::seqcmp(ack, m_iSndLastAck) >= 0)
02005 {
02006
02007 m_iFlowWindowSize = *((int32_t *)ctrlpkt.m_pcData + 3);
02008 m_iSndLastAck = ack;
02009 }
02010
02011
02012 CGuard::enterCS(m_AckLock);
02013
02014 int offset = CSeqNo::seqoff(m_iSndLastDataAck, ack);
02015 if (offset <= 0)
02016 {
02017
02018 CGuard::leaveCS(m_AckLock);
02019 break;
02020 }
02021
02022
02023 m_pSndBuffer->ackData(offset);
02024
02025
02026 m_llSndDuration += currtime - m_llSndDurationCounter;
02027 m_llSndDurationTotal += currtime - m_llSndDurationCounter;
02028 m_llSndDurationCounter = currtime;
02029
02030
02031 m_iSndLastDataAck = ack;
02032 m_pSndLossList->remove(CSeqNo::decseq(m_iSndLastDataAck));
02033
02034 CGuard::leaveCS(m_AckLock);
02035
02036 #ifndef WIN32
02037 pthread_mutex_lock(&m_SendBlockLock);
02038 if (m_bSynSending)
02039 pthread_cond_signal(&m_SendBlockCond);
02040 pthread_mutex_unlock(&m_SendBlockLock);
02041 #else
02042 if (m_bSynSending)
02043 SetEvent(m_SendBlockCond);
02044 #endif
02045
02046
02047 s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);
02048
02049
02050 m_pSndQueue->m_pSndUList->update(this, false);
02051
02052
02053
02054
02055 int rtt = *((int32_t *)ctrlpkt.m_pcData + 1);
02056 m_iRTTVar = (m_iRTTVar * 3 + abs(rtt - m_iRTT)) >> 2;
02057 m_iRTT = (m_iRTT * 7 + rtt) >> 3;
02058
02059 m_pCC->setRTT(m_iRTT);
02060
02061 if (ctrlpkt.getLength() > 16)
02062 {
02063
02064 if (*((int32_t *)ctrlpkt.m_pcData + 4) > 0)
02065 m_iDeliveryRate = (m_iDeliveryRate * 7 + *((int32_t *)ctrlpkt.m_pcData + 4)) >> 3;
02066
02067 if (*((int32_t *)ctrlpkt.m_pcData + 5) > 0)
02068 m_iBandwidth = (m_iBandwidth * 7 + *((int32_t *)ctrlpkt.m_pcData + 5)) >> 3;
02069
02070 m_pCC->setRcvRate(m_iDeliveryRate);
02071 m_pCC->setBandwidth(m_iBandwidth);
02072 }
02073
02074 m_pCC->onACK(ack);
02075 CCUpdate();
02076
02077 ++ m_iRecvACK;
02078 ++ m_iRecvACKTotal;
02079
02080 break;
02081 }
02082
02083 case 6:
02084 {
02085 int32_t ack;
02086 int rtt = -1;
02087
02088
02089 rtt = m_pACKWindow->acknowledge(ctrlpkt.getAckSeqNo(), ack);
02090 if (rtt <= 0)
02091 break;
02092
02093
02094
02095
02096
02097 m_iRTTVar = (m_iRTTVar * 3 + abs(rtt - m_iRTT)) >> 2;
02098 m_iRTT = (m_iRTT * 7 + rtt) >> 3;
02099
02100 m_pCC->setRTT(m_iRTT);
02101
02102
02103 if (CSeqNo::seqcmp(ack, m_iRcvLastAckAck) > 0)
02104 m_iRcvLastAckAck = ack;
02105
02106 break;
02107 }
02108
02109 case 3:
02110 {
02111 int32_t* losslist = (int32_t *)(ctrlpkt.m_pcData);
02112
02113 m_pCC->onLoss(losslist, ctrlpkt.getLength() / 4);
02114 CCUpdate();
02115
02116 bool secure = true;
02117
02118
02119 for (int i = 0, n = (int)(ctrlpkt.getLength() / 4); i < n; ++ i)
02120 {
02121 if (0 != (losslist[i] & 0x80000000))
02122 {
02123 if ((CSeqNo::seqcmp(losslist[i] & 0x7FFFFFFF, losslist[i + 1]) > 0) || (CSeqNo::seqcmp(losslist[i + 1], m_iSndCurrSeqNo) > 0))
02124 {
02125
02126 secure = false;
02127 break;
02128 }
02129
02130 int num = 0;
02131 if (CSeqNo::seqcmp(losslist[i] & 0x7FFFFFFF, m_iSndLastAck) >= 0)
02132 num = m_pSndLossList->insert(losslist[i] & 0x7FFFFFFF, losslist[i + 1]);
02133 else if (CSeqNo::seqcmp(losslist[i + 1], m_iSndLastAck) >= 0)
02134 num = m_pSndLossList->insert(m_iSndLastAck, losslist[i + 1]);
02135
02136 m_iTraceSndLoss += num;
02137 m_iSndLossTotal += num;
02138
02139 ++ i;
02140 }
02141 else if (CSeqNo::seqcmp(losslist[i], m_iSndLastAck) >= 0)
02142 {
02143 if (CSeqNo::seqcmp(losslist[i], m_iSndCurrSeqNo) > 0)
02144 {
02145
02146 secure = false;
02147 break;
02148 }
02149
02150 int num = m_pSndLossList->insert(losslist[i], losslist[i]);
02151
02152 m_iTraceSndLoss += num;
02153 m_iSndLossTotal += num;
02154 }
02155 }
02156
02157 if (!secure)
02158 {
02159
02160 m_bBroken = true;
02161 m_iBrokenCounter = 0;
02162 break;
02163 }
02164
02165
02166 m_pSndQueue->m_pSndUList->update(this);
02167
02168 ++ m_iRecvNAK;
02169 ++ m_iRecvNAKTotal;
02170
02171 break;
02172 }
02173
02174 case 4:
02175
02176 m_ullInterval = (uint64_t)ceil(m_ullInterval * 1.125);
02177 m_iLastDecSeq = m_iSndCurrSeqNo;
02178
02179 break;
02180
02181 case 1:
02182
02183
02184
02185 break;
02186
02187 case 0:
02188 {
02189 CHandShake req;
02190 req.deserialize(ctrlpkt.m_pcData, ctrlpkt.getLength());
02191 if ((req.m_iReqType > 0) || (m_bRendezvous && (req.m_iReqType != -2)))
02192 {
02193
02194
02195
02196 CHandShake initdata;
02197 initdata.m_iISN = m_iISN;
02198 initdata.m_iMSS = m_iMSS;
02199 initdata.m_iFlightFlagSize = m_iFlightFlagSize;
02200 initdata.m_iReqType = (!m_bRendezvous) ? -1 : -2;
02201 initdata.m_iID = m_SocketID;
02202
02203 char* hs = new char [m_iPayloadSize];
02204 int hs_size = m_iPayloadSize;
02205 initdata.serialize(hs, hs_size);
02206 sendCtrl(0, NULL, hs, hs_size);
02207 delete [] hs;
02208 }
02209
02210 break;
02211 }
02212
02213 case 5:
02214 m_bShutdown = true;
02215 m_bClosing = true;
02216 m_bBroken = true;
02217 m_iBrokenCounter = 60;
02218
02219
02220 releaseSynch();
02221
02222 CTimer::triggerEvent();
02223
02224 break;
02225
02226 case 7:
02227 m_pRcvBuffer->dropMsg(ctrlpkt.getMsgSeq());
02228 m_pRcvLossList->remove(*(int32_t*)ctrlpkt.m_pcData, *(int32_t*)(ctrlpkt.m_pcData + 4));
02229
02230
02231 if ((CSeqNo::seqcmp(*(int32_t*)ctrlpkt.m_pcData, CSeqNo::incseq(m_iRcvCurrSeqNo)) <= 0)
02232 && (CSeqNo::seqcmp(*(int32_t*)(ctrlpkt.m_pcData + 4), m_iRcvCurrSeqNo) > 0))
02233 {
02234 m_iRcvCurrSeqNo = *(int32_t*)(ctrlpkt.m_pcData + 4);
02235 }
02236
02237 break;
02238
02239 case 8:
02240
02241
02242
02243
02244
02245
02246 m_bPeerHealth = false;
02247
02248 break;
02249
02250 case 32767:
02251 m_pCC->processCustomMsg(&ctrlpkt);
02252 CCUpdate();
02253
02254 break;
02255
02256 default:
02257 break;
02258 }
02259 }
02260
02261 int CUDT::packData(CPacket& packet, uint64_t& ts)
02262 {
02263 int payload = 0;
02264 bool probe = false;
02265
02266 uint64_t entertime;
02267 CTimer::rdtsc(entertime);
02268
02269 if ((0 != m_ullTargetTime) && (entertime > m_ullTargetTime))
02270 m_ullTimeDiff += entertime - m_ullTargetTime;
02271
02272
02273 if ((packet.m_iSeqNo = m_pSndLossList->getLostSeq()) >= 0)
02274 {
02275
02276 CGuard ackguard(m_AckLock);
02277
02278 int offset = CSeqNo::seqoff(m_iSndLastDataAck, packet.m_iSeqNo);
02279 if (offset < 0)
02280 return 0;
02281
02282 int msglen;
02283
02284 payload = m_pSndBuffer->readData(&(packet.m_pcData), offset, packet.m_iMsgNo, msglen);
02285
02286 if (-1 == payload)
02287 {
02288 int32_t seqpair[2];
02289 seqpair[0] = packet.m_iSeqNo;
02290 seqpair[1] = CSeqNo::incseq(seqpair[0], msglen);
02291 sendCtrl(7, &packet.m_iMsgNo, seqpair, 8);
02292
02293
02294 m_pSndLossList->remove(seqpair[1]);
02295
02296
02297 if (CSeqNo::seqcmp(m_iSndCurrSeqNo, CSeqNo::incseq(seqpair[1])) < 0)
02298 m_iSndCurrSeqNo = CSeqNo::incseq(seqpair[1]);
02299
02300 return 0;
02301 }
02302 else if (0 == payload)
02303 return 0;
02304
02305 ++ m_iTraceRetrans;
02306 ++ m_iRetransTotal;
02307 }
02308 else
02309 {
02310
02311
02312
02313 int cwnd = (m_iFlowWindowSize < (int)m_dCongestionWindow) ? m_iFlowWindowSize : (int)m_dCongestionWindow;
02314 if (cwnd >= CSeqNo::seqlen(m_iSndLastAck, CSeqNo::incseq(m_iSndCurrSeqNo)))
02315 {
02316 if (0 != (payload = m_pSndBuffer->readData(&(packet.m_pcData), packet.m_iMsgNo)))
02317 {
02318 m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo);
02319 m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo);
02320
02321 packet.m_iSeqNo = m_iSndCurrSeqNo;
02322
02323
02324 if (0 == (packet.m_iSeqNo & 0xF))
02325 probe = true;
02326 }
02327 else
02328 {
02329 m_ullTargetTime = 0;
02330 m_ullTimeDiff = 0;
02331 ts = 0;
02332 return 0;
02333 }
02334 }
02335 else
02336 {
02337 m_ullTargetTime = 0;
02338 m_ullTimeDiff = 0;
02339 ts = 0;
02340 return 0;
02341 }
02342 }
02343
02344 packet.m_iTimeStamp = int(CTimer::getTime() - m_StartTime);
02345 packet.m_iID = m_PeerID;
02346 packet.setLength(payload);
02347
02348 m_pCC->onPktSent(&packet);
02349
02350
02351 ++ m_llTraceSent;
02352 ++ m_llSentTotal;
02353
02354 if (probe)
02355 {
02356
02357 ts = entertime;
02358 probe = false;
02359 }
02360 else
02361 {
02362 #ifndef NO_BUSY_WAITING
02363 ts = entertime + m_ullInterval;
02364 #else
02365 if (m_ullTimeDiff >= m_ullInterval)
02366 {
02367 ts = entertime;
02368 m_ullTimeDiff -= m_ullInterval;
02369 }
02370 else
02371 {
02372 ts = entertime + m_ullInterval - m_ullTimeDiff;
02373 m_ullTimeDiff = 0;
02374 }
02375 #endif
02376 }
02377
02378 m_ullTargetTime = ts;
02379
02380 return payload;
02381 }
02382
02383 int CUDT::processData(CUnit* unit)
02384 {
02385 CPacket& packet = unit->m_Packet;
02386
02387
02388 m_iEXPCount = 1;
02389 uint64_t currtime;
02390 CTimer::rdtsc(currtime);
02391 m_ullLastRspTime = currtime;
02392
02393 m_pCC->onPktReceived(&packet);
02394 ++ m_iPktCount;
02395
02396 m_pRcvTimeWindow->onPktArrival();
02397
02398
02399 if (0 == (packet.m_iSeqNo & 0xF))
02400 m_pRcvTimeWindow->probe1Arrival();
02401 else if (1 == (packet.m_iSeqNo & 0xF))
02402 m_pRcvTimeWindow->probe2Arrival();
02403
02404 ++ m_llTraceRecv;
02405 ++ m_llRecvTotal;
02406
02407 int32_t offset = CSeqNo::seqoff(m_iRcvLastAck, packet.m_iSeqNo);
02408 if ((offset < 0) || (offset >= m_pRcvBuffer->getAvailBufSize()))
02409 return -1;
02410
02411 if (m_pRcvBuffer->addData(unit, offset) < 0)
02412 return -1;
02413
02414
02415 if (CSeqNo::seqcmp(packet.m_iSeqNo, CSeqNo::incseq(m_iRcvCurrSeqNo)) > 0)
02416 {
02417
02418 m_pRcvLossList->insert(CSeqNo::incseq(m_iRcvCurrSeqNo), CSeqNo::decseq(packet.m_iSeqNo));
02419
02420
02421 int32_t lossdata[2];
02422 lossdata[0] = CSeqNo::incseq(m_iRcvCurrSeqNo) | 0x80000000;
02423 lossdata[1] = CSeqNo::decseq(packet.m_iSeqNo);
02424
02425
02426 sendCtrl(3, NULL, lossdata, (CSeqNo::incseq(m_iRcvCurrSeqNo) == CSeqNo::decseq(packet.m_iSeqNo)) ? 1 : 2);
02427
02428 int loss = CSeqNo::seqlen(m_iRcvCurrSeqNo, packet.m_iSeqNo) - 2;
02429 m_iTraceRcvLoss += loss;
02430 m_iRcvLossTotal += loss;
02431 }
02432
02433
02434
02435 if (packet.getLength() != m_iPayloadSize)
02436 CTimer::rdtsc(m_ullNextACKTime);
02437
02438
02439
02440 if (CSeqNo::seqcmp(packet.m_iSeqNo, m_iRcvCurrSeqNo) > 0)
02441 m_iRcvCurrSeqNo = packet.m_iSeqNo;
02442 else
02443 m_pRcvLossList->remove(packet.m_iSeqNo);
02444
02445 return 0;
02446 }
02447
02448 int CUDT::listen(sockaddr* addr, CPacket& packet)
02449 {
02450 if (m_bClosing)
02451 return 1002;
02452
02453 if (packet.getLength() != CHandShake::m_iContentSize)
02454 return 1004;
02455
02456 CHandShake hs;
02457 hs.deserialize(packet.m_pcData, packet.getLength());
02458
02459
02460 char clienthost[NI_MAXHOST];
02461 char clientport[NI_MAXSERV];
02462 getnameinfo(addr, (AF_INET == m_iVersion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6), clienthost, sizeof(clienthost), clientport, sizeof(clientport), NI_NUMERICHOST|NI_NUMERICSERV);
02463 int64_t timestamp = (CTimer::getTime() - m_StartTime) / 60000000;
02464 stringstream cookiestr;
02465 cookiestr << clienthost << ":" << clientport << ":" << timestamp;
02466 unsigned char cookie[16];
02467 CMD5::compute(cookiestr.str().c_str(), cookie);
02468
02469 if (1 == hs.m_iReqType)
02470 {
02471 hs.m_iCookie = *(int*)cookie;
02472 packet.m_iID = hs.m_iID;
02473 int size = packet.getLength();
02474 hs.serialize(packet.m_pcData, size);
02475 m_pSndQueue->sendto(addr, packet);
02476 return 0;
02477 }
02478 else
02479 {
02480 if (hs.m_iCookie != *(int*)cookie)
02481 {
02482 timestamp --;
02483 cookiestr << clienthost << ":" << clientport << ":" << timestamp;
02484 CMD5::compute(cookiestr.str().c_str(), cookie);
02485
02486 if (hs.m_iCookie != *(int*)cookie)
02487 return -1;
02488 }
02489 }
02490
02491 int32_t id = hs.m_iID;
02492
02493
02494 if ((1 == packet.getFlag()) && (0 == packet.getType()))
02495 {
02496 if ((hs.m_iVersion != m_iVersion) || (hs.m_iType != m_iSockType))
02497 {
02498
02499 hs.m_iReqType = 1002;
02500 int size = CHandShake::m_iContentSize;
02501 hs.serialize(packet.m_pcData, size);
02502 packet.m_iID = id;
02503 m_pSndQueue->sendto(addr, packet);
02504 }
02505 else
02506 {
02507 int result = s_UDTUnited.newConnection(m_SocketID, addr, &hs);
02508 if (result == -1)
02509 hs.m_iReqType = 1002;
02510
02511
02512
02513 if (result != 1)
02514 {
02515 int size = CHandShake::m_iContentSize;
02516 hs.serialize(packet.m_pcData, size);
02517 packet.m_iID = id;
02518 m_pSndQueue->sendto(addr, packet);
02519 }
02520 else
02521 {
02522
02523 s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);
02524 }
02525 }
02526 }
02527
02528 return hs.m_iReqType;
02529 }
02530
02531 void CUDT::checkTimers()
02532 {
02533
02534 CCUpdate();
02535
02536
02537
02538
02539 uint64_t currtime;
02540 CTimer::rdtsc(currtime);
02541
02542 if ((currtime > m_ullNextACKTime) || ((m_pCC->m_iACKInterval > 0) && (m_pCC->m_iACKInterval <= m_iPktCount)))
02543 {
02544
02545
02546 sendCtrl(2);
02547 CTimer::rdtsc(currtime);
02548 if (m_pCC->m_iACKPeriod > 0)
02549 m_ullNextACKTime = currtime + m_pCC->m_iACKPeriod * m_ullCPUFrequency;
02550 else
02551 m_ullNextACKTime = currtime + m_ullACKInt;
02552
02553 m_iPktCount = 0;
02554 m_iLightACKCount = 1;
02555 }
02556 else if (m_iSelfClockInterval * m_iLightACKCount <= m_iPktCount)
02557 {
02558
02559 sendCtrl(2, NULL, NULL, 4);
02560 ++ m_iLightACKCount;
02561 }
02562
02563
02564
02565
02566
02567
02568
02569
02570
02571
02572
02573 uint64_t next_exp_time;
02574 if (m_pCC->m_bUserDefinedRTO)
02575 next_exp_time = m_ullLastRspTime + m_pCC->m_iRTO * m_ullCPUFrequency;
02576 else
02577 {
02578 uint64_t exp_int = (m_iEXPCount * (m_iRTT + 4 * m_iRTTVar) + m_iSYNInterval) * m_ullCPUFrequency;
02579 if (exp_int < m_iEXPCount * m_ullMinExpInt)
02580 exp_int = m_iEXPCount * m_ullMinExpInt;
02581 next_exp_time = m_ullLastRspTime + exp_int;
02582 }
02583
02584 if (currtime > next_exp_time)
02585 {
02586
02587
02588 if ((m_iEXPCount > 16) && (currtime - m_ullLastRspTime > 5000000 * m_ullCPUFrequency))
02589 {
02590
02591
02592
02593
02594
02595 m_bClosing = true;
02596 m_bBroken = true;
02597 m_iBrokenCounter = 30;
02598
02599
02600 m_pSndQueue->m_pSndUList->update(this);
02601
02602 releaseSynch();
02603
02604
02605 s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN | UDT_EPOLL_OUT | UDT_EPOLL_ERR, true);
02606
02607 CTimer::triggerEvent();
02608
02609 return;
02610 }
02611
02612
02613
02614 if (m_pSndBuffer->getCurrBufSize() > 0)
02615 {
02616 if ((CSeqNo::incseq(m_iSndCurrSeqNo) != m_iSndLastAck) && (m_pSndLossList->getLossLength() == 0))
02617 {
02618
02619 int32_t csn = m_iSndCurrSeqNo;
02620 int num = m_pSndLossList->insert(m_iSndLastAck, csn);
02621 m_iTraceSndLoss += num;
02622 m_iSndLossTotal += num;
02623 }
02624
02625 m_pCC->onTimeout();
02626 CCUpdate();
02627
02628
02629 m_pSndQueue->m_pSndUList->update(this);
02630 }
02631 else
02632 {
02633 sendCtrl(1);
02634 }
02635
02636 ++ m_iEXPCount;
02637
02638 m_ullLastRspTime = currtime;
02639 }
02640 }
02641
02642 void CUDT::addEPoll(const int eid)
02643 {
02644 CGuard::enterCS(s_UDTUnited.m_EPoll.m_EPollLock);
02645 m_sPollID.insert(eid);
02646 CGuard::leaveCS(s_UDTUnited.m_EPoll.m_EPollLock);
02647
02648 if (!m_bConnected || m_bBroken || m_bClosing)
02649 return;
02650
02651 if (((UDT_STREAM == m_iSockType) && (m_pRcvBuffer->getRcvDataSize() > 0)) ||
02652 ((UDT_DGRAM == m_iSockType) && (m_pRcvBuffer->getRcvMsgNum() > 0)))
02653 {
02654 s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, true);
02655 }
02656 if (m_iSndBufSize > m_pSndBuffer->getCurrBufSize())
02657 {
02658 s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);
02659 }
02660 }
02661
02662 void CUDT::removeEPoll(const int eid)
02663 {
02664 CGuard::enterCS(s_UDTUnited.m_EPoll.m_EPollLock);
02665 m_sPollID.erase(eid);
02666 CGuard::leaveCS(s_UDTUnited.m_EPoll.m_EPollLock);
02667
02668
02669
02670 s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN | UDT_EPOLL_OUT, false);
02671 }