00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041 #ifdef WIN32
00042 #include <winsock2.h>
00043 #include <ws2tcpip.h>
00044 #ifdef LEGACY_WIN32
00045 #include <wspiapi.h>
00046 #endif
00047 #endif
00048 #include <cstring>
00049
00050 #include "common.h"
00051 #include "core.h"
00052 #include "queue.h"
00053
00054 using namespace std;
00055
00056 CUnitQueue::CUnitQueue():
00057 m_pQEntry(NULL),
00058 m_pCurrQueue(NULL),
00059 m_pLastQueue(NULL),
00060 m_iSize(0),
00061 m_iCount(0),
00062 m_iMSS(),
00063 m_iIPversion()
00064 {
00065 }
00066
00067 CUnitQueue::~CUnitQueue()
00068 {
00069 CQEntry* p = m_pQEntry;
00070
00071 while (p != NULL)
00072 {
00073 delete [] p->m_pUnit;
00074 delete [] p->m_pBuffer;
00075
00076 CQEntry* q = p;
00077 if (p == m_pLastQueue)
00078 p = NULL;
00079 else
00080 p = p->m_pNext;
00081 delete q;
00082 }
00083 }
00084
00085 int CUnitQueue::init(int size, int mss, int version)
00086 {
00087 CQEntry* tempq = NULL;
00088 CUnit* tempu = NULL;
00089 char* tempb = NULL;
00090
00091 try
00092 {
00093 tempq = new CQEntry;
00094 tempu = new CUnit [size];
00095 tempb = new char [size * mss];
00096 }
00097 catch (...)
00098 {
00099 delete tempq;
00100 delete [] tempu;
00101 delete [] tempb;
00102
00103 return -1;
00104 }
00105
00106 for (int i = 0; i < size; ++ i)
00107 {
00108 tempu[i].m_iFlag = 0;
00109 tempu[i].m_Packet.m_pcData = tempb + i * mss;
00110 }
00111 tempq->m_pUnit = tempu;
00112 tempq->m_pBuffer = tempb;
00113 tempq->m_iSize = size;
00114
00115 m_pQEntry = m_pCurrQueue = m_pLastQueue = tempq;
00116 m_pQEntry->m_pNext = m_pQEntry;
00117
00118 m_pAvailUnit = m_pCurrQueue->m_pUnit;
00119
00120 m_iSize = size;
00121 m_iMSS = mss;
00122 m_iIPversion = version;
00123
00124 return 0;
00125 }
00126
00127 int CUnitQueue::increase()
00128 {
00129
00130 int real_count = 0;
00131 CQEntry* p = m_pQEntry;
00132 while (p != NULL)
00133 {
00134 CUnit* u = p->m_pUnit;
00135 for (CUnit* end = u + p->m_iSize; u != end; ++ u)
00136 if (u->m_iFlag != 0)
00137 ++ real_count;
00138
00139 if (p == m_pLastQueue)
00140 p = NULL;
00141 else
00142 p = p->m_pNext;
00143 }
00144 m_iCount = real_count;
00145 if (double(m_iCount) / m_iSize < 0.9)
00146 return -1;
00147
00148 CQEntry* tempq = NULL;
00149 CUnit* tempu = NULL;
00150 char* tempb = NULL;
00151
00152
00153 int size = m_pQEntry->m_iSize;
00154
00155 try
00156 {
00157 tempq = new CQEntry;
00158 tempu = new CUnit [size];
00159 tempb = new char [size * m_iMSS];
00160 }
00161 catch (...)
00162 {
00163 delete tempq;
00164 delete [] tempu;
00165 delete [] tempb;
00166
00167 return -1;
00168 }
00169
00170 for (int i = 0; i < size; ++ i)
00171 {
00172 tempu[i].m_iFlag = 0;
00173 tempu[i].m_Packet.m_pcData = tempb + i * m_iMSS;
00174 }
00175 tempq->m_pUnit = tempu;
00176 tempq->m_pBuffer = tempb;
00177 tempq->m_iSize = size;
00178
00179 m_pLastQueue->m_pNext = tempq;
00180 m_pLastQueue = tempq;
00181 m_pLastQueue->m_pNext = m_pQEntry;
00182
00183 m_iSize += size;
00184
00185 return 0;
00186 }
00187
00188 int CUnitQueue::shrink()
00189 {
00190
00191 return -1;
00192 }
00193
00194 CUnit* CUnitQueue::getNextAvailUnit()
00195 {
00196 if (m_iCount * 10 > m_iSize * 9)
00197 increase();
00198
00199 if (m_iCount >= m_iSize)
00200 return NULL;
00201
00202 CQEntry* entrance = m_pCurrQueue;
00203
00204 do
00205 {
00206 for (CUnit* sentinel = m_pCurrQueue->m_pUnit + m_pCurrQueue->m_iSize - 1; m_pAvailUnit != sentinel; ++ m_pAvailUnit)
00207 if (m_pAvailUnit->m_iFlag == 0)
00208 return m_pAvailUnit;
00209
00210 if (m_pCurrQueue->m_pUnit->m_iFlag == 0)
00211 {
00212 m_pAvailUnit = m_pCurrQueue->m_pUnit;
00213 return m_pAvailUnit;
00214 }
00215
00216 m_pCurrQueue = m_pCurrQueue->m_pNext;
00217 m_pAvailUnit = m_pCurrQueue->m_pUnit;
00218 } while (m_pCurrQueue != entrance);
00219
00220 increase();
00221
00222 return NULL;
00223 }
00224
00225
00226 CSndUList::CSndUList():
00227 m_pHeap(NULL),
00228 m_iArrayLength(4096),
00229 m_iLastEntry(-1),
00230 m_ListLock(),
00231 m_pWindowLock(NULL),
00232 m_pWindowCond(NULL),
00233 m_pTimer(NULL)
00234 {
00235 m_pHeap = new CSNode*[m_iArrayLength];
00236
00237 #ifndef WIN32
00238 pthread_mutex_init(&m_ListLock, NULL);
00239 #else
00240 m_ListLock = CreateMutex(NULL, false, NULL);
00241 #endif
00242 }
00243
00244 CSndUList::~CSndUList()
00245 {
00246 delete [] m_pHeap;
00247
00248 #ifndef WIN32
00249 pthread_mutex_destroy(&m_ListLock);
00250 #else
00251 CloseHandle(m_ListLock);
00252 #endif
00253 }
00254
00255 void CSndUList::insert(int64_t ts, const CUDT* u)
00256 {
00257 CGuard listguard(m_ListLock);
00258
00259
00260 if (m_iLastEntry == m_iArrayLength - 1)
00261 {
00262 CSNode** temp = NULL;
00263
00264 try
00265 {
00266 temp = new CSNode*[m_iArrayLength * 2];
00267 }
00268 catch(...)
00269 {
00270 return;
00271 }
00272
00273 memcpy(temp, m_pHeap, sizeof(CSNode*) * m_iArrayLength);
00274 m_iArrayLength *= 2;
00275 delete [] m_pHeap;
00276 m_pHeap = temp;
00277 }
00278
00279 insert_(ts, u);
00280 }
00281
00282 void CSndUList::update(const CUDT* u, bool reschedule)
00283 {
00284 CGuard listguard(m_ListLock);
00285
00286 CSNode* n = u->m_pSNode;
00287
00288 if (n->m_iHeapLoc >= 0)
00289 {
00290 if (!reschedule)
00291 return;
00292
00293 if (n->m_iHeapLoc == 0)
00294 {
00295 n->m_llTimeStamp = 1;
00296 m_pTimer->interrupt();
00297 return;
00298 }
00299
00300 remove_(u);
00301 }
00302
00303 insert_(1, u);
00304 }
00305
00306 int CSndUList::pop(sockaddr*& addr, CPacket& pkt)
00307 {
00308 CGuard listguard(m_ListLock);
00309
00310 if (-1 == m_iLastEntry)
00311 return -1;
00312
00313
00314 uint64_t ts;
00315 CTimer::rdtsc(ts);
00316 if (ts < m_pHeap[0]->m_llTimeStamp)
00317 return -1;
00318
00319 CUDT* u = m_pHeap[0]->m_pUDT;
00320 remove_(u);
00321
00322 if (!u->m_bConnected || u->m_bBroken)
00323 return -1;
00324
00325
00326 if (u->packData(pkt, ts) <= 0)
00327 return -1;
00328
00329 addr = u->m_pPeerAddr;
00330
00331
00332 if (ts > 0)
00333 insert_(ts, u);
00334
00335 return 1;
00336 }
00337
00338 void CSndUList::remove(const CUDT* u)
00339 {
00340 CGuard listguard(m_ListLock);
00341
00342 remove_(u);
00343 }
00344
00345 uint64_t CSndUList::getNextProcTime()
00346 {
00347 CGuard listguard(m_ListLock);
00348
00349 if (-1 == m_iLastEntry)
00350 return 0;
00351
00352 return m_pHeap[0]->m_llTimeStamp;
00353 }
00354
00355 void CSndUList::insert_(int64_t ts, const CUDT* u)
00356 {
00357 CSNode* n = u->m_pSNode;
00358
00359
00360 if (n->m_iHeapLoc >= 0)
00361 return;
00362
00363 m_iLastEntry ++;
00364 m_pHeap[m_iLastEntry] = n;
00365 n->m_llTimeStamp = ts;
00366
00367 int q = m_iLastEntry;
00368 int p = q;
00369 while (p != 0)
00370 {
00371 p = (q - 1) >> 1;
00372 if (m_pHeap[p]->m_llTimeStamp > m_pHeap[q]->m_llTimeStamp)
00373 {
00374 CSNode* t = m_pHeap[p];
00375 m_pHeap[p] = m_pHeap[q];
00376 m_pHeap[q] = t;
00377 t->m_iHeapLoc = q;
00378 q = p;
00379 }
00380 else
00381 break;
00382 }
00383
00384 n->m_iHeapLoc = q;
00385
00386
00387 if (n->m_iHeapLoc == 0)
00388 m_pTimer->interrupt();
00389
00390
00391 if (0 == m_iLastEntry)
00392 {
00393 #ifndef WIN32
00394 pthread_mutex_lock(m_pWindowLock);
00395 pthread_cond_signal(m_pWindowCond);
00396 pthread_mutex_unlock(m_pWindowLock);
00397 #else
00398 SetEvent(*m_pWindowCond);
00399 #endif
00400 }
00401 }
00402
00403 void CSndUList::remove_(const CUDT* u)
00404 {
00405 CSNode* n = u->m_pSNode;
00406
00407 if (n->m_iHeapLoc >= 0)
00408 {
00409
00410 m_pHeap[n->m_iHeapLoc] = m_pHeap[m_iLastEntry];
00411 m_iLastEntry --;
00412 m_pHeap[n->m_iHeapLoc]->m_iHeapLoc = n->m_iHeapLoc;
00413
00414 int q = n->m_iHeapLoc;
00415 int p = q * 2 + 1;
00416 while (p <= m_iLastEntry)
00417 {
00418 if ((p + 1 <= m_iLastEntry) && (m_pHeap[p]->m_llTimeStamp > m_pHeap[p + 1]->m_llTimeStamp))
00419 p ++;
00420
00421 if (m_pHeap[q]->m_llTimeStamp > m_pHeap[p]->m_llTimeStamp)
00422 {
00423 CSNode* t = m_pHeap[p];
00424 m_pHeap[p] = m_pHeap[q];
00425 m_pHeap[p]->m_iHeapLoc = p;
00426 m_pHeap[q] = t;
00427 m_pHeap[q]->m_iHeapLoc = q;
00428
00429 q = p;
00430 p = q * 2 + 1;
00431 }
00432 else
00433 break;
00434 }
00435
00436 n->m_iHeapLoc = -1;
00437 }
00438
00439
00440 if (0 == m_iLastEntry)
00441 m_pTimer->interrupt();
00442 }
00443
00444
00445 CSndQueue::CSndQueue():
00446 m_WorkerThread(),
00447 m_pSndUList(NULL),
00448 m_pChannel(NULL),
00449 m_pTimer(NULL),
00450 m_WindowLock(),
00451 m_WindowCond(),
00452 m_bClosing(false),
00453 m_ExitCond()
00454 {
00455 #ifndef WIN32
00456 pthread_cond_init(&m_WindowCond, NULL);
00457 pthread_mutex_init(&m_WindowLock, NULL);
00458 #else
00459 m_WindowLock = CreateMutex(NULL, false, NULL);
00460 m_WindowCond = CreateEvent(NULL, false, false, NULL);
00461 m_ExitCond = CreateEvent(NULL, false, false, NULL);
00462 #endif
00463 }
00464
00465 CSndQueue::~CSndQueue()
00466 {
00467 m_bClosing = true;
00468
00469 #ifndef WIN32
00470 pthread_mutex_lock(&m_WindowLock);
00471 pthread_cond_signal(&m_WindowCond);
00472 pthread_mutex_unlock(&m_WindowLock);
00473 if (0 != m_WorkerThread)
00474 pthread_join(m_WorkerThread, NULL);
00475 pthread_cond_destroy(&m_WindowCond);
00476 pthread_mutex_destroy(&m_WindowLock);
00477 #else
00478 SetEvent(m_WindowCond);
00479 if (NULL != m_WorkerThread)
00480 WaitForSingleObject(m_ExitCond, INFINITE);
00481 CloseHandle(m_WorkerThread);
00482 CloseHandle(m_WindowLock);
00483 CloseHandle(m_WindowCond);
00484 CloseHandle(m_ExitCond);
00485 #endif
00486
00487 delete m_pSndUList;
00488 }
00489
00490 void CSndQueue::init(CChannel* c, CTimer* t)
00491 {
00492 m_pChannel = c;
00493 m_pTimer = t;
00494 m_pSndUList = new CSndUList;
00495 m_pSndUList->m_pWindowLock = &m_WindowLock;
00496 m_pSndUList->m_pWindowCond = &m_WindowCond;
00497 m_pSndUList->m_pTimer = m_pTimer;
00498
00499 #ifndef WIN32
00500 if (0 != pthread_create(&m_WorkerThread, NULL, CSndQueue::worker, this))
00501 {
00502 m_WorkerThread = 0;
00503 throw CUDTException(3, 1);
00504 }
00505 #else
00506 DWORD threadID;
00507 m_WorkerThread = CreateThread(NULL, 0, CSndQueue::worker, this, 0, &threadID);
00508 if (NULL == m_WorkerThread)
00509 throw CUDTException(3, 1);
00510 #endif
00511 }
00512
00513 #ifndef WIN32
00514 void* CSndQueue::worker(void* param)
00515 #else
00516 DWORD WINAPI CSndQueue::worker(LPVOID param)
00517 #endif
00518 {
00519 CSndQueue* self = (CSndQueue*)param;
00520
00521 while (!self->m_bClosing)
00522 {
00523 uint64_t ts = self->m_pSndUList->getNextProcTime();
00524
00525 if (ts > 0)
00526 {
00527
00528 uint64_t currtime;
00529 CTimer::rdtsc(currtime);
00530 if (currtime < ts)
00531 self->m_pTimer->sleepto(ts);
00532
00533
00534 sockaddr* addr;
00535 CPacket pkt;
00536 if (self->m_pSndUList->pop(addr, pkt) < 0)
00537 continue;
00538
00539 self->m_pChannel->sendto(addr, pkt);
00540 }
00541 else
00542 {
00543
00544 #ifndef WIN32
00545 pthread_mutex_lock(&self->m_WindowLock);
00546 if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0))
00547 pthread_cond_wait(&self->m_WindowCond, &self->m_WindowLock);
00548 pthread_mutex_unlock(&self->m_WindowLock);
00549 #else
00550 WaitForSingleObject(self->m_WindowCond, INFINITE);
00551 #endif
00552 }
00553 }
00554
00555 #ifndef WIN32
00556 return NULL;
00557 #else
00558 SetEvent(self->m_ExitCond);
00559 return 0;
00560 #endif
00561 }
00562
00563 int CSndQueue::sendto(const sockaddr* addr, CPacket& packet)
00564 {
00565
00566 m_pChannel->sendto(addr, packet);
00567 return packet.getLength();
00568 }
00569
00570
00571
00572 CRcvUList::CRcvUList():
00573 m_pUList(NULL),
00574 m_pLast(NULL)
00575 {
00576 }
00577
00578 CRcvUList::~CRcvUList()
00579 {
00580 }
00581
00582 void CRcvUList::insert(const CUDT* u)
00583 {
00584 CRNode* n = u->m_pRNode;
00585 CTimer::rdtsc(n->m_llTimeStamp);
00586
00587 if (NULL == m_pUList)
00588 {
00589
00590 n->m_pPrev = n->m_pNext = NULL;
00591 m_pLast = m_pUList = n;
00592
00593 return;
00594 }
00595
00596
00597 n->m_pPrev = m_pLast;
00598 n->m_pNext = NULL;
00599 m_pLast->m_pNext = n;
00600 m_pLast = n;
00601 }
00602
00603 void CRcvUList::remove(const CUDT* u)
00604 {
00605 CRNode* n = u->m_pRNode;
00606
00607 if (!n->m_bOnList)
00608 return;
00609
00610 if (NULL == n->m_pPrev)
00611 {
00612
00613 m_pUList = n->m_pNext;
00614 if (NULL == m_pUList)
00615 m_pLast = NULL;
00616 else
00617 m_pUList->m_pPrev = NULL;
00618 }
00619 else
00620 {
00621 n->m_pPrev->m_pNext = n->m_pNext;
00622 if (NULL == n->m_pNext)
00623 {
00624
00625 m_pLast = n->m_pPrev;
00626 }
00627 else
00628 n->m_pNext->m_pPrev = n->m_pPrev;
00629 }
00630
00631 n->m_pNext = n->m_pPrev = NULL;
00632 }
00633
00634 void CRcvUList::update(const CUDT* u)
00635 {
00636 CRNode* n = u->m_pRNode;
00637
00638 if (!n->m_bOnList)
00639 return;
00640
00641 CTimer::rdtsc(n->m_llTimeStamp);
00642
00643
00644 if (NULL == n->m_pNext)
00645 return;
00646
00647 if (NULL == n->m_pPrev)
00648 {
00649 m_pUList = n->m_pNext;
00650 m_pUList->m_pPrev = NULL;
00651 }
00652 else
00653 {
00654 n->m_pPrev->m_pNext = n->m_pNext;
00655 n->m_pNext->m_pPrev = n->m_pPrev;
00656 }
00657
00658 n->m_pPrev = m_pLast;
00659 n->m_pNext = NULL;
00660 m_pLast->m_pNext = n;
00661 m_pLast = n;
00662 }
00663
00664
00665 CHash::CHash():
00666 m_pBucket(NULL),
00667 m_iHashSize(0)
00668 {
00669 }
00670
00671 CHash::~CHash()
00672 {
00673 for (int i = 0; i < m_iHashSize; ++ i)
00674 {
00675 CBucket* b = m_pBucket[i];
00676 while (NULL != b)
00677 {
00678 CBucket* n = b->m_pNext;
00679 delete b;
00680 b = n;
00681 }
00682 }
00683
00684 delete [] m_pBucket;
00685 }
00686
00687 void CHash::init(int size)
00688 {
00689 m_pBucket = new CBucket* [size];
00690
00691 for (int i = 0; i < size; ++ i)
00692 m_pBucket[i] = NULL;
00693
00694 m_iHashSize = size;
00695 }
00696
00697 CUDT* CHash::lookup(int32_t id)
00698 {
00699
00700 CBucket* b = m_pBucket[id % m_iHashSize];
00701
00702 while (NULL != b)
00703 {
00704 if (id == b->m_iID)
00705 return b->m_pUDT;
00706 b = b->m_pNext;
00707 }
00708
00709 return NULL;
00710 }
00711
00712 void CHash::insert(int32_t id, CUDT* u)
00713 {
00714 CBucket* b = m_pBucket[id % m_iHashSize];
00715
00716 CBucket* n = new CBucket;
00717 n->m_iID = id;
00718 n->m_pUDT = u;
00719 n->m_pNext = b;
00720
00721 m_pBucket[id % m_iHashSize] = n;
00722 }
00723
00724 void CHash::remove(int32_t id)
00725 {
00726 CBucket* b = m_pBucket[id % m_iHashSize];
00727 CBucket* p = NULL;
00728
00729 while (NULL != b)
00730 {
00731 if (id == b->m_iID)
00732 {
00733 if (NULL == p)
00734 m_pBucket[id % m_iHashSize] = b->m_pNext;
00735 else
00736 p->m_pNext = b->m_pNext;
00737
00738 delete b;
00739
00740 return;
00741 }
00742
00743 p = b;
00744 b = b->m_pNext;
00745 }
00746 }
00747
00748
00749
00750 CRendezvousQueue::CRendezvousQueue():
00751 m_lRendezvousID(),
00752 m_RIDVectorLock()
00753 {
00754 #ifndef WIN32
00755 pthread_mutex_init(&m_RIDVectorLock, NULL);
00756 #else
00757 m_RIDVectorLock = CreateMutex(NULL, false, NULL);
00758 #endif
00759 }
00760
00761 CRendezvousQueue::~CRendezvousQueue()
00762 {
00763 #ifndef WIN32
00764 pthread_mutex_destroy(&m_RIDVectorLock);
00765 #else
00766 CloseHandle(m_RIDVectorLock);
00767 #endif
00768
00769 for (list<CRL>::iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++ i)
00770 {
00771 if (AF_INET == i->m_iIPversion)
00772 delete (sockaddr_in*)i->m_pPeerAddr;
00773 else
00774 delete (sockaddr_in6*)i->m_pPeerAddr;
00775 }
00776
00777 m_lRendezvousID.clear();
00778 }
00779
00780 void CRendezvousQueue::insert(const UDTSOCKET& id, CUDT* u, int ipv, const sockaddr* addr, uint64_t ttl)
00781 {
00782 CGuard vg(m_RIDVectorLock);
00783
00784 CRL r;
00785 r.m_iID = id;
00786 r.m_pUDT = u;
00787 r.m_iIPversion = ipv;
00788 r.m_pPeerAddr = (AF_INET == ipv) ? (sockaddr*)new sockaddr_in : (sockaddr*)new sockaddr_in6;
00789 memcpy(r.m_pPeerAddr, addr, (AF_INET == ipv) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6));
00790 r.m_ullTTL = ttl;
00791
00792 m_lRendezvousID.push_back(r);
00793 }
00794
00795 void CRendezvousQueue::remove(const UDTSOCKET& id)
00796 {
00797 CGuard vg(m_RIDVectorLock);
00798
00799 for (list<CRL>::iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++ i)
00800 {
00801 if (i->m_iID == id)
00802 {
00803 if (AF_INET == i->m_iIPversion)
00804 delete (sockaddr_in*)i->m_pPeerAddr;
00805 else
00806 delete (sockaddr_in6*)i->m_pPeerAddr;
00807
00808 m_lRendezvousID.erase(i);
00809
00810 return;
00811 }
00812 }
00813 }
00814
00815 CUDT* CRendezvousQueue::retrieve(const sockaddr* addr, UDTSOCKET& id)
00816 {
00817 CGuard vg(m_RIDVectorLock);
00818
00819
00820 for (list<CRL>::iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++ i)
00821 {
00822 if (CIPAddress::ipcmp(addr, i->m_pPeerAddr, i->m_iIPversion) && ((0 == id) || (id == i->m_iID)))
00823 {
00824 id = i->m_iID;
00825 return i->m_pUDT;
00826 }
00827 }
00828
00829 return NULL;
00830 }
00831
00832 void CRendezvousQueue::updateConnStatus()
00833 {
00834 if (m_lRendezvousID.empty())
00835 return;
00836
00837 CGuard vg(m_RIDVectorLock);
00838
00839 for (list<CRL>::iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++ i)
00840 {
00841
00842 if (CTimer::getTime() - i->m_pUDT->m_llLastReqTime > 250000)
00843 {
00844 if (CTimer::getTime() >= i->m_ullTTL)
00845 {
00846
00847 i->m_pUDT->m_bConnecting = false;
00848 CUDT::s_UDTUnited.m_EPoll.update_events(i->m_iID, i->m_pUDT->m_sPollID, UDT_EPOLL_ERR, true);
00849 continue;
00850 }
00851
00852 CPacket request;
00853 char* reqdata = new char [i->m_pUDT->m_iPayloadSize];
00854 request.pack(0, NULL, reqdata, i->m_pUDT->m_iPayloadSize);
00855
00856 request.m_iID = !i->m_pUDT->m_bRendezvous ? 0 : i->m_pUDT->m_ConnRes.m_iID;
00857 int hs_size = i->m_pUDT->m_iPayloadSize;
00858 i->m_pUDT->m_ConnReq.serialize(reqdata, hs_size);
00859 request.setLength(hs_size);
00860 i->m_pUDT->m_pSndQueue->sendto(i->m_pPeerAddr, request);
00861 i->m_pUDT->m_llLastReqTime = CTimer::getTime();
00862 delete [] reqdata;
00863 }
00864 }
00865 }
00866
00867
00868 CRcvQueue::CRcvQueue():
00869 m_WorkerThread(),
00870 m_UnitQueue(),
00871 m_pRcvUList(NULL),
00872 m_pHash(NULL),
00873 m_pChannel(NULL),
00874 m_pTimer(NULL),
00875 m_iPayloadSize(),
00876 m_bClosing(false),
00877 m_ExitCond(),
00878 m_LSLock(),
00879 m_pListener(NULL),
00880 m_pRendezvousQueue(NULL),
00881 m_vNewEntry(),
00882 m_IDLock(),
00883 m_mBuffer(),
00884 m_PassLock(),
00885 m_PassCond()
00886 {
00887 #ifndef WIN32
00888 pthread_mutex_init(&m_PassLock, NULL);
00889 pthread_cond_init(&m_PassCond, NULL);
00890 pthread_mutex_init(&m_LSLock, NULL);
00891 pthread_mutex_init(&m_IDLock, NULL);
00892 #else
00893 m_PassLock = CreateMutex(NULL, false, NULL);
00894 m_PassCond = CreateEvent(NULL, false, false, NULL);
00895 m_LSLock = CreateMutex(NULL, false, NULL);
00896 m_IDLock = CreateMutex(NULL, false, NULL);
00897 m_ExitCond = CreateEvent(NULL, false, false, NULL);
00898 #endif
00899 }
00900
00901 CRcvQueue::~CRcvQueue()
00902 {
00903 m_bClosing = true;
00904
00905 #ifndef WIN32
00906 if (0 != m_WorkerThread)
00907 pthread_join(m_WorkerThread, NULL);
00908 pthread_mutex_destroy(&m_PassLock);
00909 pthread_cond_destroy(&m_PassCond);
00910 pthread_mutex_destroy(&m_LSLock);
00911 pthread_mutex_destroy(&m_IDLock);
00912 #else
00913 if (NULL != m_WorkerThread)
00914 WaitForSingleObject(m_ExitCond, INFINITE);
00915 CloseHandle(m_WorkerThread);
00916 CloseHandle(m_PassLock);
00917 CloseHandle(m_PassCond);
00918 CloseHandle(m_LSLock);
00919 CloseHandle(m_IDLock);
00920 CloseHandle(m_ExitCond);
00921 #endif
00922
00923 delete m_pRcvUList;
00924 delete m_pHash;
00925 delete m_pRendezvousQueue;
00926
00927
00928 for (map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.begin(); i != m_mBuffer.end(); ++ i)
00929 {
00930 while (!i->second.empty())
00931 {
00932 CPacket* pkt = i->second.front();
00933 delete [] pkt->m_pcData;
00934 delete pkt;
00935 i->second.pop();
00936 }
00937 }
00938 }
00939
00940 void CRcvQueue::init(int qsize, int payload, int version, int hsize, CChannel* cc, CTimer* t)
00941 {
00942 m_iPayloadSize = payload;
00943
00944 m_UnitQueue.init(qsize, payload, version);
00945
00946 m_pHash = new CHash;
00947 m_pHash->init(hsize);
00948
00949 m_pChannel = cc;
00950 m_pTimer = t;
00951
00952 m_pRcvUList = new CRcvUList;
00953 m_pRendezvousQueue = new CRendezvousQueue;
00954
00955 #ifndef WIN32
00956 if (0 != pthread_create(&m_WorkerThread, NULL, CRcvQueue::worker, this))
00957 {
00958 m_WorkerThread = 0;
00959 throw CUDTException(3, 1);
00960 }
00961 #else
00962 DWORD threadID;
00963 m_WorkerThread = CreateThread(NULL, 0, CRcvQueue::worker, this, 0, &threadID);
00964 if (NULL == m_WorkerThread)
00965 throw CUDTException(3, 1);
00966 #endif
00967 }
00968
00969 #ifndef WIN32
00970 void* CRcvQueue::worker(void* param)
00971 #else
00972 DWORD WINAPI CRcvQueue::worker(LPVOID param)
00973 #endif
00974 {
00975 CRcvQueue* self = (CRcvQueue*)param;
00976
00977 sockaddr* addr = (AF_INET == self->m_UnitQueue.m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;
00978 CUDT* u = NULL;
00979 int32_t id;
00980
00981 while (!self->m_bClosing)
00982 {
00983 #ifdef NO_BUSY_WAITING
00984 self->m_pTimer->tick();
00985 #endif
00986
00987
00988 while (self->ifNewEntry())
00989 {
00990 CUDT* ne = self->getNewEntry();
00991 if (NULL != ne)
00992 {
00993 self->m_pRcvUList->insert(ne);
00994 self->m_pHash->insert(ne->m_SocketID, ne);
00995 }
00996 }
00997
00998
00999 CUnit* unit = self->m_UnitQueue.getNextAvailUnit();
01000 if (NULL == unit)
01001 {
01002
01003 CPacket temp;
01004 temp.m_pcData = new char[self->m_iPayloadSize];
01005 temp.setLength(self->m_iPayloadSize);
01006 self->m_pChannel->recvfrom(addr, temp);
01007 delete [] temp.m_pcData;
01008 goto TIMER_CHECK;
01009 }
01010
01011 unit->m_Packet.setLength(self->m_iPayloadSize);
01012
01013
01014 if (self->m_pChannel->recvfrom(addr, unit->m_Packet) < 0)
01015 goto TIMER_CHECK;
01016
01017 id = unit->m_Packet.m_iID;
01018
01019
01020 if (0 == id)
01021 {
01022 if (NULL != self->m_pListener)
01023 self->m_pListener->listen(addr, unit->m_Packet);
01024 else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id)))
01025 {
01026
01027
01028 if (!u->m_bSynRecving)
01029 u->connect(unit->m_Packet);
01030 else
01031 self->storePkt(id, unit->m_Packet.clone());
01032 }
01033 }
01034 else if (id > 0)
01035 {
01036 if (NULL != (u = self->m_pHash->lookup(id)))
01037 {
01038 if (CIPAddress::ipcmp(addr, u->m_pPeerAddr, u->m_iIPversion))
01039 {
01040 if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing)
01041 {
01042 if (0 == unit->m_Packet.getFlag())
01043 u->processData(unit);
01044 else
01045 u->processCtrl(unit->m_Packet);
01046
01047 u->checkTimers();
01048 self->m_pRcvUList->update(u);
01049 }
01050 }
01051 }
01052 else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id)))
01053 {
01054 if (!u->m_bSynRecving)
01055 u->connect(unit->m_Packet);
01056 else
01057 self->storePkt(id, unit->m_Packet.clone());
01058 }
01059 }
01060
01061 TIMER_CHECK:
01062
01063
01064 uint64_t currtime;
01065 CTimer::rdtsc(currtime);
01066
01067 CRNode* ul = self->m_pRcvUList->m_pUList;
01068 uint64_t ctime = currtime - 100000 * CTimer::getCPUFrequency();
01069 while ((NULL != ul) && (ul->m_llTimeStamp < ctime))
01070 {
01071 CUDT* u = ul->m_pUDT;
01072
01073 if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing)
01074 {
01075 u->checkTimers();
01076 self->m_pRcvUList->update(u);
01077 }
01078 else
01079 {
01080
01081 self->m_pHash->remove(u->m_SocketID);
01082 self->m_pRcvUList->remove(u);
01083 u->m_pRNode->m_bOnList = false;
01084 }
01085
01086 ul = self->m_pRcvUList->m_pUList;
01087 }
01088
01089
01090 self->m_pRendezvousQueue->updateConnStatus();
01091 }
01092
01093 if (AF_INET == self->m_UnitQueue.m_iIPversion)
01094 delete (sockaddr_in*)addr;
01095 else
01096 delete (sockaddr_in6*)addr;
01097
01098 #ifndef WIN32
01099 return NULL;
01100 #else
01101 SetEvent(self->m_ExitCond);
01102 return 0;
01103 #endif
01104 }
01105
01106 int CRcvQueue::recvfrom(int32_t id, CPacket& packet)
01107 {
01108 CGuard bufferlock(m_PassLock);
01109
01110 map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);
01111
01112 if (i == m_mBuffer.end())
01113 {
01114 #ifndef WIN32
01115 uint64_t now = CTimer::getTime();
01116 timespec timeout;
01117
01118 timeout.tv_sec = now / 1000000 + 1;
01119 timeout.tv_nsec = (now % 1000000) * 1000;
01120
01121 pthread_cond_timedwait(&m_PassCond, &m_PassLock, &timeout);
01122 #else
01123 ReleaseMutex(m_PassLock);
01124 WaitForSingleObject(m_PassCond, 1000);
01125 WaitForSingleObject(m_PassLock, INFINITE);
01126 #endif
01127
01128 i = m_mBuffer.find(id);
01129 if (i == m_mBuffer.end())
01130 {
01131 packet.setLength(-1);
01132 return -1;
01133 }
01134 }
01135
01136
01137 CPacket* newpkt = i->second.front();
01138
01139 if (packet.getLength() < newpkt->getLength())
01140 {
01141 packet.setLength(-1);
01142 return -1;
01143 }
01144
01145
01146 memcpy(packet.m_nHeader, newpkt->m_nHeader, CPacket::m_iPktHdrSize);
01147 memcpy(packet.m_pcData, newpkt->m_pcData, newpkt->getLength());
01148 packet.setLength(newpkt->getLength());
01149
01150 delete [] newpkt->m_pcData;
01151 delete newpkt;
01152
01153
01154
01155 i->second.pop();
01156 if (i->second.empty())
01157 m_mBuffer.erase(i);
01158
01159 return packet.getLength();
01160 }
01161
01162 int CRcvQueue::setListener(CUDT* u)
01163 {
01164 CGuard lslock(m_LSLock);
01165
01166 if (NULL != m_pListener)
01167 return -1;
01168
01169 m_pListener = u;
01170 return 0;
01171 }
01172
01173 void CRcvQueue::removeListener(const CUDT* u)
01174 {
01175 CGuard lslock(m_LSLock);
01176
01177 if (u == m_pListener)
01178 m_pListener = NULL;
01179 }
01180
01181 void CRcvQueue::registerConnector(const UDTSOCKET& id, CUDT* u, int ipv, const sockaddr* addr, uint64_t ttl)
01182 {
01183 m_pRendezvousQueue->insert(id, u, ipv, addr, ttl);
01184 }
01185
01186 void CRcvQueue::removeConnector(const UDTSOCKET& id)
01187 {
01188 m_pRendezvousQueue->remove(id);
01189
01190 CGuard bufferlock(m_PassLock);
01191
01192 map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);
01193 if (i != m_mBuffer.end())
01194 {
01195 while (!i->second.empty())
01196 {
01197 delete [] i->second.front()->m_pcData;
01198 delete i->second.front();
01199 i->second.pop();
01200 }
01201 m_mBuffer.erase(i);
01202 }
01203 }
01204
01205 void CRcvQueue::setNewEntry(CUDT* u)
01206 {
01207 CGuard listguard(m_IDLock);
01208 m_vNewEntry.push_back(u);
01209 }
01210
01211 bool CRcvQueue::ifNewEntry()
01212 {
01213 return !(m_vNewEntry.empty());
01214 }
01215
01216 CUDT* CRcvQueue::getNewEntry()
01217 {
01218 CGuard listguard(m_IDLock);
01219
01220 if (m_vNewEntry.empty())
01221 return NULL;
01222
01223 CUDT* u = (CUDT*)*(m_vNewEntry.begin());
01224 m_vNewEntry.erase(m_vNewEntry.begin());
01225
01226 return u;
01227 }
01228
01229 void CRcvQueue::storePkt(int32_t id, CPacket* pkt)
01230 {
01231 CGuard bufferlock(m_PassLock);
01232
01233 map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);
01234
01235 if (i == m_mBuffer.end())
01236 {
01237 m_mBuffer[id].push(pkt);
01238
01239 #ifndef WIN32
01240 pthread_cond_signal(&m_PassCond);
01241 #else
01242 SetEvent(m_PassCond);
01243 #endif
01244 }
01245 else
01246 {
01247
01248 if (i->second.size() > 16)
01249 return;
01250
01251 i->second.push(pkt);
01252 }
01253 }