queue.cpp

00001 /*****************************************************************************
00002 Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois.
00003 All rights reserved.
00004 
00005 Redistribution and use in source and binary forms, with or without
00006 modification, are permitted provided that the following conditions are
00007 met:
00008 
00009 * Redistributions of source code must retain the above
00010   copyright notice, this list of conditions and the
00011   following disclaimer.
00012 
00013 * Redistributions in binary form must reproduce the
00014   above copyright notice, this list of conditions
00015   and the following disclaimer in the documentation
00016   and/or other materials provided with the distribution.
00017 
00018 * Neither the name of the University of Illinois
00019   nor the names of its contributors may be used to
00020   endorse or promote products derived from this
00021   software without specific prior written permission.
00022 
00023 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00024 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
00025 THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
00026 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
00027 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00028 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00029 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00030 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00031 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00032 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00033 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00034 *****************************************************************************/
00035 
00036 /*****************************************************************************
00037 written by
00038    Yunhong Gu, last updated 05/05/2011
00039 *****************************************************************************/
00040 
00041 #ifdef WIN32
00042    #include <winsock2.h>
00043    #include <ws2tcpip.h>
00044    #ifdef LEGACY_WIN32
00045       #include <wspiapi.h>
00046    #endif
00047 #endif
00048 #include <cstring>
00049 
00050 #include "common.h"
00051 #include "core.h"
00052 #include "queue.h"
00053 
00054 using namespace std;
00055 
00056 CUnitQueue::CUnitQueue():
00057 m_pQEntry(NULL),
00058 m_pCurrQueue(NULL),
00059 m_pLastQueue(NULL),
00060 m_iSize(0),
00061 m_iCount(0),
00062 m_iMSS(),
00063 m_iIPversion()
00064 {
00065 }
00066 
00067 CUnitQueue::~CUnitQueue()
00068 {
00069    CQEntry* p = m_pQEntry;
00070 
00071    while (p != NULL)
00072    {
00073       delete [] p->m_pUnit;
00074       delete [] p->m_pBuffer;
00075 
00076       CQEntry* q = p;
00077       if (p == m_pLastQueue)
00078          p = NULL;
00079       else
00080          p = p->m_pNext;
00081       delete q;
00082    }
00083 }
00084 
00085 int CUnitQueue::init(int size, int mss, int version)
00086 {
00087    CQEntry* tempq = NULL;
00088    CUnit* tempu = NULL;
00089    char* tempb = NULL;
00090 
00091    try
00092    {
00093       tempq = new CQEntry;
00094       tempu = new CUnit [size];
00095       tempb = new char [size * mss];
00096    }
00097    catch (...)
00098    {
00099       delete tempq;
00100       delete [] tempu;
00101       delete [] tempb;
00102 
00103       return -1;
00104    }
00105 
00106    for (int i = 0; i < size; ++ i)
00107    {
00108       tempu[i].m_iFlag = 0;
00109       tempu[i].m_Packet.m_pcData = tempb + i * mss;
00110    }
00111    tempq->m_pUnit = tempu;
00112    tempq->m_pBuffer = tempb;
00113    tempq->m_iSize = size;
00114 
00115    m_pQEntry = m_pCurrQueue = m_pLastQueue = tempq;
00116    m_pQEntry->m_pNext = m_pQEntry;
00117 
00118    m_pAvailUnit = m_pCurrQueue->m_pUnit;
00119 
00120    m_iSize = size;
00121    m_iMSS = mss;
00122    m_iIPversion = version;
00123 
00124    return 0;
00125 }
00126 
00127 int CUnitQueue::increase()
00128 {
00129    // adjust/correct m_iCount
00130    int real_count = 0;
00131    CQEntry* p = m_pQEntry;
00132    while (p != NULL)
00133    {
00134       CUnit* u = p->m_pUnit;
00135       for (CUnit* end = u + p->m_iSize; u != end; ++ u)
00136          if (u->m_iFlag != 0)
00137             ++ real_count;
00138 
00139       if (p == m_pLastQueue)
00140          p = NULL;
00141       else
00142          p = p->m_pNext;
00143    }
00144    m_iCount = real_count;
00145    if (double(m_iCount) / m_iSize < 0.9)
00146       return -1;
00147 
00148    CQEntry* tempq = NULL;
00149    CUnit* tempu = NULL;
00150    char* tempb = NULL;
00151 
00152    // all queues have the same size
00153    int size = m_pQEntry->m_iSize;
00154 
00155    try
00156    {
00157       tempq = new CQEntry;
00158       tempu = new CUnit [size];
00159       tempb = new char [size * m_iMSS];
00160    }
00161    catch (...)
00162    {
00163       delete tempq;
00164       delete [] tempu;
00165       delete [] tempb;
00166 
00167       return -1;
00168    }
00169 
00170    for (int i = 0; i < size; ++ i)
00171    {
00172       tempu[i].m_iFlag = 0;
00173       tempu[i].m_Packet.m_pcData = tempb + i * m_iMSS;
00174    }
00175    tempq->m_pUnit = tempu;
00176    tempq->m_pBuffer = tempb;
00177    tempq->m_iSize = size;
00178 
00179    m_pLastQueue->m_pNext = tempq;
00180    m_pLastQueue = tempq;
00181    m_pLastQueue->m_pNext = m_pQEntry;
00182 
00183    m_iSize += size;
00184 
00185    return 0;
00186 }
00187 
00188 int CUnitQueue::shrink()
00189 {
00190    // currently queue cannot be shrunk.
00191    return -1;
00192 }
00193 
00194 CUnit* CUnitQueue::getNextAvailUnit()
00195 {
00196    if (m_iCount * 10 > m_iSize * 9)
00197       increase();
00198 
00199    if (m_iCount >= m_iSize)
00200       return NULL;
00201 
00202    CQEntry* entrance = m_pCurrQueue;
00203 
00204    do
00205    {
00206       for (CUnit* sentinel = m_pCurrQueue->m_pUnit + m_pCurrQueue->m_iSize - 1; m_pAvailUnit != sentinel; ++ m_pAvailUnit)
00207          if (m_pAvailUnit->m_iFlag == 0)
00208             return m_pAvailUnit;
00209 
00210       if (m_pCurrQueue->m_pUnit->m_iFlag == 0)
00211       {
00212          m_pAvailUnit = m_pCurrQueue->m_pUnit;
00213          return m_pAvailUnit;
00214       }
00215 
00216       m_pCurrQueue = m_pCurrQueue->m_pNext;
00217       m_pAvailUnit = m_pCurrQueue->m_pUnit;
00218    } while (m_pCurrQueue != entrance);
00219 
00220    increase();
00221 
00222    return NULL;
00223 }
00224 
00225 
00226 CSndUList::CSndUList():
00227 m_pHeap(NULL),
00228 m_iArrayLength(4096),
00229 m_iLastEntry(-1),
00230 m_ListLock(),
00231 m_pWindowLock(NULL),
00232 m_pWindowCond(NULL),
00233 m_pTimer(NULL)
00234 {
00235    m_pHeap = new CSNode*[m_iArrayLength];
00236 
00237    #ifndef WIN32
00238       pthread_mutex_init(&m_ListLock, NULL);
00239    #else
00240       m_ListLock = CreateMutex(NULL, false, NULL);
00241    #endif
00242 }
00243 
00244 CSndUList::~CSndUList()
00245 {
00246    delete [] m_pHeap;
00247 
00248    #ifndef WIN32
00249       pthread_mutex_destroy(&m_ListLock);
00250    #else
00251       CloseHandle(m_ListLock);
00252    #endif
00253 }
00254 
00255 void CSndUList::insert(int64_t ts, const CUDT* u)
00256 {
00257    CGuard listguard(m_ListLock);
00258 
00259    // increase the heap array size if necessary
00260    if (m_iLastEntry == m_iArrayLength - 1)
00261    {
00262       CSNode** temp = NULL;
00263 
00264       try
00265       {
00266          temp = new CSNode*[m_iArrayLength * 2];
00267       }
00268       catch(...)
00269       {
00270          return;
00271       }
00272 
00273       memcpy(temp, m_pHeap, sizeof(CSNode*) * m_iArrayLength);
00274       m_iArrayLength *= 2;
00275       delete [] m_pHeap;
00276       m_pHeap = temp;
00277    }
00278 
00279    insert_(ts, u);
00280 }
00281 
00282 void CSndUList::update(const CUDT* u, bool reschedule)
00283 {
00284    CGuard listguard(m_ListLock);
00285 
00286    CSNode* n = u->m_pSNode;
00287 
00288    if (n->m_iHeapLoc >= 0)
00289    {
00290       if (!reschedule)
00291          return;
00292 
00293       if (n->m_iHeapLoc == 0)
00294       {
00295          n->m_llTimeStamp = 1;
00296          m_pTimer->interrupt();
00297          return;
00298       }
00299 
00300       remove_(u);
00301    }
00302 
00303    insert_(1, u);
00304 }
00305 
00306 int CSndUList::pop(sockaddr*& addr, CPacket& pkt)
00307 {
00308    CGuard listguard(m_ListLock);
00309 
00310    if (-1 == m_iLastEntry)
00311       return -1;
00312 
00313    // no pop until the next schedulled time
00314    uint64_t ts;
00315    CTimer::rdtsc(ts);
00316    if (ts < m_pHeap[0]->m_llTimeStamp)
00317       return -1;
00318 
00319    CUDT* u = m_pHeap[0]->m_pUDT;
00320    remove_(u);
00321 
00322    if (!u->m_bConnected || u->m_bBroken)
00323       return -1;
00324 
00325    // pack a packet from the socket
00326    if (u->packData(pkt, ts) <= 0)
00327       return -1;
00328 
00329    addr = u->m_pPeerAddr;
00330 
00331    // insert a new entry, ts is the next processing time
00332    if (ts > 0)
00333       insert_(ts, u);
00334 
00335    return 1;
00336 }
00337 
00338 void CSndUList::remove(const CUDT* u)
00339 {
00340    CGuard listguard(m_ListLock);
00341 
00342    remove_(u);
00343 }
00344 
00345 uint64_t CSndUList::getNextProcTime()
00346 {
00347    CGuard listguard(m_ListLock);
00348 
00349    if (-1 == m_iLastEntry)
00350       return 0;
00351 
00352    return m_pHeap[0]->m_llTimeStamp;
00353 }
00354 
00355 void CSndUList::insert_(int64_t ts, const CUDT* u)
00356 {
00357    CSNode* n = u->m_pSNode;
00358 
00359    // do not insert repeated node
00360    if (n->m_iHeapLoc >= 0)
00361       return;
00362 
00363    m_iLastEntry ++;
00364    m_pHeap[m_iLastEntry] = n;
00365    n->m_llTimeStamp = ts;
00366 
00367    int q = m_iLastEntry;
00368    int p = q;
00369    while (p != 0)
00370    {
00371       p = (q - 1) >> 1;
00372       if (m_pHeap[p]->m_llTimeStamp > m_pHeap[q]->m_llTimeStamp)
00373       {
00374          CSNode* t = m_pHeap[p];
00375          m_pHeap[p] = m_pHeap[q];
00376          m_pHeap[q] = t;
00377          t->m_iHeapLoc = q;
00378          q = p;
00379       }
00380       else
00381          break;
00382    }
00383 
00384    n->m_iHeapLoc = q;
00385 
00386    // an earlier event has been inserted, wake up sending worker
00387    if (n->m_iHeapLoc == 0)
00388       m_pTimer->interrupt();
00389 
00390    // first entry, activate the sending queue
00391    if (0 == m_iLastEntry)
00392    {
00393       #ifndef WIN32
00394          pthread_mutex_lock(m_pWindowLock);
00395          pthread_cond_signal(m_pWindowCond);
00396          pthread_mutex_unlock(m_pWindowLock);
00397       #else
00398          SetEvent(*m_pWindowCond);
00399       #endif
00400    }
00401 }
00402 
00403 void CSndUList::remove_(const CUDT* u)
00404 {
00405    CSNode* n = u->m_pSNode;
00406 
00407    if (n->m_iHeapLoc >= 0)
00408    {
00409       // remove the node from heap
00410       m_pHeap[n->m_iHeapLoc] = m_pHeap[m_iLastEntry];
00411       m_iLastEntry --;
00412       m_pHeap[n->m_iHeapLoc]->m_iHeapLoc = n->m_iHeapLoc;
00413 
00414       int q = n->m_iHeapLoc;
00415       int p = q * 2 + 1;
00416       while (p <= m_iLastEntry)
00417       {
00418          if ((p + 1 <= m_iLastEntry) && (m_pHeap[p]->m_llTimeStamp > m_pHeap[p + 1]->m_llTimeStamp))
00419             p ++;
00420 
00421          if (m_pHeap[q]->m_llTimeStamp > m_pHeap[p]->m_llTimeStamp)
00422          {
00423             CSNode* t = m_pHeap[p];
00424             m_pHeap[p] = m_pHeap[q];
00425             m_pHeap[p]->m_iHeapLoc = p;
00426             m_pHeap[q] = t;
00427             m_pHeap[q]->m_iHeapLoc = q;
00428 
00429             q = p;
00430             p = q * 2 + 1;
00431          }
00432          else
00433             break;
00434       }
00435 
00436       n->m_iHeapLoc = -1;
00437    }
00438 
00439    // the only event has been deleted, wake up immediately
00440    if (0 == m_iLastEntry)
00441       m_pTimer->interrupt();
00442 }
00443 
00444 //
00445 CSndQueue::CSndQueue():
00446 m_WorkerThread(),
00447 m_pSndUList(NULL),
00448 m_pChannel(NULL),
00449 m_pTimer(NULL),
00450 m_WindowLock(),
00451 m_WindowCond(),
00452 m_bClosing(false),
00453 m_ExitCond()
00454 {
00455    #ifndef WIN32
00456       pthread_cond_init(&m_WindowCond, NULL);
00457       pthread_mutex_init(&m_WindowLock, NULL);
00458    #else
00459       m_WindowLock = CreateMutex(NULL, false, NULL);
00460       m_WindowCond = CreateEvent(NULL, false, false, NULL);
00461       m_ExitCond = CreateEvent(NULL, false, false, NULL);
00462    #endif
00463 }
00464 
00465 CSndQueue::~CSndQueue()
00466 {
00467    m_bClosing = true;
00468 
00469    #ifndef WIN32
00470       pthread_mutex_lock(&m_WindowLock);
00471       pthread_cond_signal(&m_WindowCond);
00472       pthread_mutex_unlock(&m_WindowLock);
00473       if (0 != m_WorkerThread)
00474          pthread_join(m_WorkerThread, NULL);
00475       pthread_cond_destroy(&m_WindowCond);
00476       pthread_mutex_destroy(&m_WindowLock);
00477    #else
00478       SetEvent(m_WindowCond);
00479       if (NULL != m_WorkerThread)
00480          WaitForSingleObject(m_ExitCond, INFINITE);
00481       CloseHandle(m_WorkerThread);
00482       CloseHandle(m_WindowLock);
00483       CloseHandle(m_WindowCond);
00484       CloseHandle(m_ExitCond);
00485    #endif
00486 
00487    delete m_pSndUList;
00488 }
00489 
00490 void CSndQueue::init(CChannel* c, CTimer* t)
00491 {
00492    m_pChannel = c;
00493    m_pTimer = t;
00494    m_pSndUList = new CSndUList;
00495    m_pSndUList->m_pWindowLock = &m_WindowLock;
00496    m_pSndUList->m_pWindowCond = &m_WindowCond;
00497    m_pSndUList->m_pTimer = m_pTimer;
00498 
00499    #ifndef WIN32
00500       if (0 != pthread_create(&m_WorkerThread, NULL, CSndQueue::worker, this))
00501       {
00502          m_WorkerThread = 0;
00503          throw CUDTException(3, 1);
00504       }
00505    #else
00506       DWORD threadID;
00507       m_WorkerThread = CreateThread(NULL, 0, CSndQueue::worker, this, 0, &threadID);
00508       if (NULL == m_WorkerThread)
00509          throw CUDTException(3, 1);
00510    #endif
00511 }
00512 
00513 #ifndef WIN32
00514    void* CSndQueue::worker(void* param)
00515 #else
00516    DWORD WINAPI CSndQueue::worker(LPVOID param)
00517 #endif
00518 {
00519    CSndQueue* self = (CSndQueue*)param;
00520 
00521    while (!self->m_bClosing)
00522    {
00523       uint64_t ts = self->m_pSndUList->getNextProcTime();
00524 
00525       if (ts > 0)
00526       {
00527          // wait until next processing time of the first socket on the list
00528          uint64_t currtime;
00529          CTimer::rdtsc(currtime);
00530          if (currtime < ts)
00531             self->m_pTimer->sleepto(ts);
00532 
00533          // it is time to send the next pkt
00534          sockaddr* addr;
00535          CPacket pkt;
00536          if (self->m_pSndUList->pop(addr, pkt) < 0)
00537             continue;
00538 
00539          self->m_pChannel->sendto(addr, pkt);
00540       }
00541       else
00542       {
00543          // wait here if there is no sockets with data to be sent
00544          #ifndef WIN32
00545             pthread_mutex_lock(&self->m_WindowLock);
00546             if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0))
00547                pthread_cond_wait(&self->m_WindowCond, &self->m_WindowLock);
00548             pthread_mutex_unlock(&self->m_WindowLock);
00549          #else
00550             WaitForSingleObject(self->m_WindowCond, INFINITE);
00551          #endif
00552       }
00553    }
00554 
00555    #ifndef WIN32
00556       return NULL;
00557    #else
00558       SetEvent(self->m_ExitCond);
00559       return 0;
00560    #endif
00561 }
00562 
00563 int CSndQueue::sendto(const sockaddr* addr, CPacket& packet)
00564 {
00565    // send out the packet immediately (high priority), this is a control packet
00566    m_pChannel->sendto(addr, packet);
00567    return packet.getLength();
00568 }
00569 
00570 
00571 //
00572 CRcvUList::CRcvUList():
00573 m_pUList(NULL),
00574 m_pLast(NULL)
00575 {
00576 }
00577 
00578 CRcvUList::~CRcvUList()
00579 {
00580 }
00581 
00582 void CRcvUList::insert(const CUDT* u)
00583 {
00584    CRNode* n = u->m_pRNode;
00585    CTimer::rdtsc(n->m_llTimeStamp);
00586 
00587    if (NULL == m_pUList)
00588    {
00589       // empty list, insert as the single node
00590       n->m_pPrev = n->m_pNext = NULL;
00591       m_pLast = m_pUList = n;
00592 
00593       return;
00594    }
00595 
00596    // always insert at the end for RcvUList
00597    n->m_pPrev = m_pLast;
00598    n->m_pNext = NULL;
00599    m_pLast->m_pNext = n;
00600    m_pLast = n;
00601 }
00602 
00603 void CRcvUList::remove(const CUDT* u)
00604 {
00605    CRNode* n = u->m_pRNode;
00606 
00607    if (!n->m_bOnList)
00608       return;
00609 
00610    if (NULL == n->m_pPrev)
00611    {
00612       // n is the first node
00613       m_pUList = n->m_pNext;
00614       if (NULL == m_pUList)
00615          m_pLast = NULL;
00616       else
00617          m_pUList->m_pPrev = NULL;
00618    }
00619    else
00620    {
00621       n->m_pPrev->m_pNext = n->m_pNext;
00622       if (NULL == n->m_pNext)
00623       {
00624          // n is the last node
00625          m_pLast = n->m_pPrev;
00626       }
00627       else
00628          n->m_pNext->m_pPrev = n->m_pPrev;
00629    }
00630 
00631    n->m_pNext = n->m_pPrev = NULL;
00632 }
00633 
00634 void CRcvUList::update(const CUDT* u)
00635 {
00636    CRNode* n = u->m_pRNode;
00637 
00638    if (!n->m_bOnList)
00639       return;
00640 
00641    CTimer::rdtsc(n->m_llTimeStamp);
00642 
00643    // if n is the last node, do not need to change
00644    if (NULL == n->m_pNext)
00645       return;
00646 
00647    if (NULL == n->m_pPrev)
00648    {
00649       m_pUList = n->m_pNext;
00650       m_pUList->m_pPrev = NULL;
00651    }
00652    else
00653    {
00654       n->m_pPrev->m_pNext = n->m_pNext;
00655       n->m_pNext->m_pPrev = n->m_pPrev;
00656    }
00657 
00658    n->m_pPrev = m_pLast;
00659    n->m_pNext = NULL;
00660    m_pLast->m_pNext = n;
00661    m_pLast = n;
00662 }
00663 
00664 //
00665 CHash::CHash():
00666 m_pBucket(NULL),
00667 m_iHashSize(0)
00668 {
00669 }
00670 
00671 CHash::~CHash()
00672 {
00673    for (int i = 0; i < m_iHashSize; ++ i)
00674    {
00675       CBucket* b = m_pBucket[i];
00676       while (NULL != b)
00677       {
00678          CBucket* n = b->m_pNext;
00679          delete b;
00680          b = n;
00681       }
00682    }
00683 
00684    delete [] m_pBucket;
00685 }
00686 
00687 void CHash::init(int size)
00688 {
00689    m_pBucket = new CBucket* [size];
00690 
00691    for (int i = 0; i < size; ++ i)
00692       m_pBucket[i] = NULL;
00693 
00694    m_iHashSize = size;
00695 }
00696 
00697 CUDT* CHash::lookup(int32_t id)
00698 {
00699    // simple hash function (% hash table size); suitable for socket descriptors
00700    CBucket* b = m_pBucket[id % m_iHashSize];
00701 
00702    while (NULL != b)
00703    {
00704       if (id == b->m_iID)
00705          return b->m_pUDT;
00706       b = b->m_pNext;
00707    }
00708 
00709    return NULL;
00710 }
00711 
00712 void CHash::insert(int32_t id, CUDT* u)
00713 {
00714    CBucket* b = m_pBucket[id % m_iHashSize];
00715 
00716    CBucket* n = new CBucket;
00717    n->m_iID = id;
00718    n->m_pUDT = u;
00719    n->m_pNext = b;
00720 
00721    m_pBucket[id % m_iHashSize] = n;
00722 }
00723 
00724 void CHash::remove(int32_t id)
00725 {
00726    CBucket* b = m_pBucket[id % m_iHashSize];
00727    CBucket* p = NULL;
00728 
00729    while (NULL != b)
00730    {
00731       if (id == b->m_iID)
00732       {
00733          if (NULL == p)
00734             m_pBucket[id % m_iHashSize] = b->m_pNext;
00735          else
00736             p->m_pNext = b->m_pNext;
00737 
00738          delete b;
00739 
00740          return;
00741       }
00742 
00743       p = b;
00744       b = b->m_pNext;
00745    }
00746 }
00747 
00748 
00749 //
00750 CRendezvousQueue::CRendezvousQueue():
00751 m_lRendezvousID(),
00752 m_RIDVectorLock()
00753 {
00754    #ifndef WIN32
00755       pthread_mutex_init(&m_RIDVectorLock, NULL);
00756    #else
00757       m_RIDVectorLock = CreateMutex(NULL, false, NULL);
00758    #endif
00759 }
00760 
00761 CRendezvousQueue::~CRendezvousQueue()
00762 {
00763    #ifndef WIN32
00764       pthread_mutex_destroy(&m_RIDVectorLock);
00765    #else
00766       CloseHandle(m_RIDVectorLock);
00767    #endif
00768 
00769    for (list<CRL>::iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++ i)
00770    {
00771       if (AF_INET == i->m_iIPversion)
00772          delete (sockaddr_in*)i->m_pPeerAddr;
00773       else
00774          delete (sockaddr_in6*)i->m_pPeerAddr;
00775    }
00776 
00777    m_lRendezvousID.clear();
00778 }
00779 
00780 void CRendezvousQueue::insert(const UDTSOCKET& id, CUDT* u, int ipv, const sockaddr* addr, uint64_t ttl)
00781 {
00782    CGuard vg(m_RIDVectorLock);
00783 
00784    CRL r;
00785    r.m_iID = id;
00786    r.m_pUDT = u;
00787    r.m_iIPversion = ipv;
00788    r.m_pPeerAddr = (AF_INET == ipv) ? (sockaddr*)new sockaddr_in : (sockaddr*)new sockaddr_in6;
00789    memcpy(r.m_pPeerAddr, addr, (AF_INET == ipv) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6));
00790    r.m_ullTTL = ttl;
00791 
00792    m_lRendezvousID.push_back(r);
00793 }
00794 
00795 void CRendezvousQueue::remove(const UDTSOCKET& id)
00796 {
00797    CGuard vg(m_RIDVectorLock);
00798 
00799    for (list<CRL>::iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++ i)
00800    {
00801       if (i->m_iID == id)
00802       {
00803          if (AF_INET == i->m_iIPversion)
00804             delete (sockaddr_in*)i->m_pPeerAddr;
00805          else
00806             delete (sockaddr_in6*)i->m_pPeerAddr;
00807 
00808          m_lRendezvousID.erase(i);
00809 
00810          return;
00811       }
00812    }
00813 }
00814 
00815 CUDT* CRendezvousQueue::retrieve(const sockaddr* addr, UDTSOCKET& id)
00816 {
00817    CGuard vg(m_RIDVectorLock);
00818 
00819    // TODO: optimize search
00820    for (list<CRL>::iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++ i)
00821    {
00822       if (CIPAddress::ipcmp(addr, i->m_pPeerAddr, i->m_iIPversion) && ((0 == id) || (id == i->m_iID)))
00823       {
00824          id = i->m_iID;
00825          return i->m_pUDT;
00826       }
00827    }
00828 
00829    return NULL;
00830 }
00831 
00832 void CRendezvousQueue::updateConnStatus()
00833 {
00834    if (m_lRendezvousID.empty())
00835       return;
00836 
00837    CGuard vg(m_RIDVectorLock);
00838 
00839    for (list<CRL>::iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++ i)
00840    {
00841       // avoid sending too many requests, at most 1 request per 250ms
00842       if (CTimer::getTime() - i->m_pUDT->m_llLastReqTime > 250000)
00843       {
00844          if (CTimer::getTime() >= i->m_ullTTL)
00845          {
00846             // connection timer expired, acknowledge app via epoll
00847             i->m_pUDT->m_bConnecting = false;
00848             CUDT::s_UDTUnited.m_EPoll.update_events(i->m_iID, i->m_pUDT->m_sPollID, UDT_EPOLL_ERR, true);
00849             continue;
00850          }
00851 
00852          CPacket request;
00853          char* reqdata = new char [i->m_pUDT->m_iPayloadSize];
00854          request.pack(0, NULL, reqdata, i->m_pUDT->m_iPayloadSize);
00855          // ID = 0, connection request
00856          request.m_iID = !i->m_pUDT->m_bRendezvous ? 0 : i->m_pUDT->m_ConnRes.m_iID;
00857          int hs_size = i->m_pUDT->m_iPayloadSize;
00858          i->m_pUDT->m_ConnReq.serialize(reqdata, hs_size);
00859          request.setLength(hs_size);
00860          i->m_pUDT->m_pSndQueue->sendto(i->m_pPeerAddr, request);
00861          i->m_pUDT->m_llLastReqTime = CTimer::getTime();
00862          delete [] reqdata;
00863       }
00864    }
00865 }
00866 
00867 //
00868 CRcvQueue::CRcvQueue():
00869 m_WorkerThread(),
00870 m_UnitQueue(),
00871 m_pRcvUList(NULL),
00872 m_pHash(NULL),
00873 m_pChannel(NULL),
00874 m_pTimer(NULL),
00875 m_iPayloadSize(),
00876 m_bClosing(false),
00877 m_ExitCond(),
00878 m_LSLock(),
00879 m_pListener(NULL),
00880 m_pRendezvousQueue(NULL),
00881 m_vNewEntry(),
00882 m_IDLock(),
00883 m_mBuffer(),
00884 m_PassLock(),
00885 m_PassCond()
00886 {
00887    #ifndef WIN32
00888       pthread_mutex_init(&m_PassLock, NULL);
00889       pthread_cond_init(&m_PassCond, NULL);
00890       pthread_mutex_init(&m_LSLock, NULL);
00891       pthread_mutex_init(&m_IDLock, NULL);
00892    #else
00893       m_PassLock = CreateMutex(NULL, false, NULL);
00894       m_PassCond = CreateEvent(NULL, false, false, NULL);
00895       m_LSLock = CreateMutex(NULL, false, NULL);
00896       m_IDLock = CreateMutex(NULL, false, NULL);
00897       m_ExitCond = CreateEvent(NULL, false, false, NULL);
00898    #endif
00899 }
00900 
00901 CRcvQueue::~CRcvQueue()
00902 {
00903    m_bClosing = true;
00904 
00905    #ifndef WIN32
00906       if (0 != m_WorkerThread)
00907          pthread_join(m_WorkerThread, NULL);
00908       pthread_mutex_destroy(&m_PassLock);
00909       pthread_cond_destroy(&m_PassCond);
00910       pthread_mutex_destroy(&m_LSLock);
00911       pthread_mutex_destroy(&m_IDLock);
00912    #else
00913       if (NULL != m_WorkerThread)
00914          WaitForSingleObject(m_ExitCond, INFINITE);
00915       CloseHandle(m_WorkerThread);
00916       CloseHandle(m_PassLock);
00917       CloseHandle(m_PassCond);
00918       CloseHandle(m_LSLock);
00919       CloseHandle(m_IDLock);
00920       CloseHandle(m_ExitCond);
00921    #endif
00922 
00923    delete m_pRcvUList;
00924    delete m_pHash;
00925    delete m_pRendezvousQueue;
00926 
00927    // remove all queued messages
00928    for (map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.begin(); i != m_mBuffer.end(); ++ i)
00929    {
00930       while (!i->second.empty())
00931       {
00932          CPacket* pkt = i->second.front();
00933          delete [] pkt->m_pcData;
00934          delete pkt;
00935          i->second.pop();
00936       }
00937    }
00938 }
00939 
00940 void CRcvQueue::init(int qsize, int payload, int version, int hsize, CChannel* cc, CTimer* t)
00941 {
00942    m_iPayloadSize = payload;
00943 
00944    m_UnitQueue.init(qsize, payload, version);
00945 
00946    m_pHash = new CHash;
00947    m_pHash->init(hsize);
00948 
00949    m_pChannel = cc;
00950    m_pTimer = t;
00951 
00952    m_pRcvUList = new CRcvUList;
00953    m_pRendezvousQueue = new CRendezvousQueue;
00954 
00955    #ifndef WIN32
00956       if (0 != pthread_create(&m_WorkerThread, NULL, CRcvQueue::worker, this))
00957       {
00958          m_WorkerThread = 0;
00959          throw CUDTException(3, 1);
00960       }
00961    #else
00962       DWORD threadID;
00963       m_WorkerThread = CreateThread(NULL, 0, CRcvQueue::worker, this, 0, &threadID);
00964       if (NULL == m_WorkerThread)
00965          throw CUDTException(3, 1);
00966    #endif
00967 }
00968 
00969 #ifndef WIN32
00970    void* CRcvQueue::worker(void* param)
00971 #else
00972    DWORD WINAPI CRcvQueue::worker(LPVOID param)
00973 #endif
00974 {
00975    CRcvQueue* self = (CRcvQueue*)param;
00976 
00977    sockaddr* addr = (AF_INET == self->m_UnitQueue.m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;
00978    CUDT* u = NULL;
00979    int32_t id;
00980 
00981    while (!self->m_bClosing)
00982    {
00983       #ifdef NO_BUSY_WAITING
00984          self->m_pTimer->tick();
00985       #endif
00986 
00987       // check waiting list, if new socket, insert it to the list
00988       while (self->ifNewEntry())
00989       {
00990          CUDT* ne = self->getNewEntry();
00991          if (NULL != ne)
00992          {
00993             self->m_pRcvUList->insert(ne);
00994             self->m_pHash->insert(ne->m_SocketID, ne);
00995          }
00996       }
00997 
00998       // find next available slot for incoming packet
00999       CUnit* unit = self->m_UnitQueue.getNextAvailUnit();
01000       if (NULL == unit)
01001       {
01002          // no space, skip this packet
01003          CPacket temp;
01004          temp.m_pcData = new char[self->m_iPayloadSize];
01005          temp.setLength(self->m_iPayloadSize);
01006          self->m_pChannel->recvfrom(addr, temp);
01007          delete [] temp.m_pcData;
01008          goto TIMER_CHECK;
01009       }
01010 
01011       unit->m_Packet.setLength(self->m_iPayloadSize);
01012 
01013       // reading next incoming packet, recvfrom returns -1 is nothing has been received
01014       if (self->m_pChannel->recvfrom(addr, unit->m_Packet) < 0)
01015          goto TIMER_CHECK;
01016 
01017       id = unit->m_Packet.m_iID;
01018 
01019       // ID 0 is for connection request, which should be passed to the listening socket or rendezvous sockets
01020       if (0 == id)
01021       {
01022          if (NULL != self->m_pListener)
01023             self->m_pListener->listen(addr, unit->m_Packet);
01024          else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id)))
01025          {
01026             // asynchronous connect: call connect here
01027             // otherwise wait for the UDT socket to retrieve this packet
01028             if (!u->m_bSynRecving)
01029                u->connect(unit->m_Packet);
01030             else
01031                self->storePkt(id, unit->m_Packet.clone());
01032          }
01033       }
01034       else if (id > 0)
01035       {
01036          if (NULL != (u = self->m_pHash->lookup(id)))
01037          {
01038             if (CIPAddress::ipcmp(addr, u->m_pPeerAddr, u->m_iIPversion))
01039             {
01040                if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing)
01041                {
01042                   if (0 == unit->m_Packet.getFlag())
01043                      u->processData(unit);
01044                   else
01045                      u->processCtrl(unit->m_Packet);
01046 
01047                   u->checkTimers();
01048                   self->m_pRcvUList->update(u);
01049                }
01050             }
01051          }
01052          else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id)))
01053          {
01054             if (!u->m_bSynRecving)
01055                u->connect(unit->m_Packet);
01056             else
01057                self->storePkt(id, unit->m_Packet.clone());
01058          }
01059       }
01060 
01061 TIMER_CHECK:
01062       // take care of the timing event for all UDT sockets
01063 
01064       uint64_t currtime;
01065       CTimer::rdtsc(currtime);
01066 
01067       CRNode* ul = self->m_pRcvUList->m_pUList;
01068       uint64_t ctime = currtime - 100000 * CTimer::getCPUFrequency();
01069       while ((NULL != ul) && (ul->m_llTimeStamp < ctime))
01070       {
01071          CUDT* u = ul->m_pUDT;
01072 
01073          if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing)
01074          {
01075             u->checkTimers();
01076             self->m_pRcvUList->update(u);
01077          }
01078          else
01079          {
01080             // the socket must be removed from Hash table first, then RcvUList
01081             self->m_pHash->remove(u->m_SocketID);
01082             self->m_pRcvUList->remove(u);
01083             u->m_pRNode->m_bOnList = false;
01084          }
01085 
01086          ul = self->m_pRcvUList->m_pUList;
01087       }
01088 
01089       // Check connection requests status for all sockets in the RendezvousQueue.
01090       self->m_pRendezvousQueue->updateConnStatus();
01091    }
01092 
01093    if (AF_INET == self->m_UnitQueue.m_iIPversion)
01094       delete (sockaddr_in*)addr;
01095    else
01096       delete (sockaddr_in6*)addr;
01097 
01098    #ifndef WIN32
01099       return NULL;
01100    #else
01101       SetEvent(self->m_ExitCond);
01102       return 0;
01103    #endif
01104 }
01105 
01106 int CRcvQueue::recvfrom(int32_t id, CPacket& packet)
01107 {
01108    CGuard bufferlock(m_PassLock);
01109 
01110    map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);
01111 
01112    if (i == m_mBuffer.end())
01113    {
01114       #ifndef WIN32
01115          uint64_t now = CTimer::getTime();
01116          timespec timeout;
01117 
01118          timeout.tv_sec = now / 1000000 + 1;
01119          timeout.tv_nsec = (now % 1000000) * 1000;
01120 
01121          pthread_cond_timedwait(&m_PassCond, &m_PassLock, &timeout);
01122       #else
01123          ReleaseMutex(m_PassLock);
01124          WaitForSingleObject(m_PassCond, 1000);
01125          WaitForSingleObject(m_PassLock, INFINITE);
01126       #endif
01127 
01128       i = m_mBuffer.find(id);
01129       if (i == m_mBuffer.end())
01130       {
01131          packet.setLength(-1);
01132          return -1;
01133       }
01134    }
01135 
01136    // retrieve the earliest packet
01137    CPacket* newpkt = i->second.front();
01138 
01139    if (packet.getLength() < newpkt->getLength())
01140    {
01141       packet.setLength(-1);
01142       return -1;
01143    }
01144 
01145    // copy packet content
01146    memcpy(packet.m_nHeader, newpkt->m_nHeader, CPacket::m_iPktHdrSize);
01147    memcpy(packet.m_pcData, newpkt->m_pcData, newpkt->getLength());
01148    packet.setLength(newpkt->getLength());
01149 
01150    delete [] newpkt->m_pcData;
01151    delete newpkt;
01152 
01153    // remove this message from queue, 
01154    // if no more messages left for this socket, release its data structure
01155    i->second.pop();
01156    if (i->second.empty())
01157       m_mBuffer.erase(i);
01158 
01159    return packet.getLength();
01160 }
01161 
01162 int CRcvQueue::setListener(CUDT* u)
01163 {
01164    CGuard lslock(m_LSLock);
01165 
01166    if (NULL != m_pListener)
01167       return -1;
01168 
01169    m_pListener = u;
01170    return 0;
01171 }
01172 
01173 void CRcvQueue::removeListener(const CUDT* u)
01174 {
01175    CGuard lslock(m_LSLock);
01176 
01177    if (u == m_pListener)
01178       m_pListener = NULL;
01179 }
01180 
01181 void CRcvQueue::registerConnector(const UDTSOCKET& id, CUDT* u, int ipv, const sockaddr* addr, uint64_t ttl)
01182 {
01183    m_pRendezvousQueue->insert(id, u, ipv, addr, ttl);
01184 }
01185 
01186 void CRcvQueue::removeConnector(const UDTSOCKET& id)
01187 {
01188    m_pRendezvousQueue->remove(id);
01189 
01190    CGuard bufferlock(m_PassLock);
01191 
01192    map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);
01193    if (i != m_mBuffer.end())
01194    {
01195       while (!i->second.empty())
01196       {
01197          delete [] i->second.front()->m_pcData;
01198          delete i->second.front();
01199          i->second.pop();
01200       }
01201       m_mBuffer.erase(i);
01202    }
01203 }
01204 
01205 void CRcvQueue::setNewEntry(CUDT* u)
01206 {
01207    CGuard listguard(m_IDLock);
01208    m_vNewEntry.push_back(u);
01209 }
01210 
01211 bool CRcvQueue::ifNewEntry()
01212 {
01213    return !(m_vNewEntry.empty());
01214 }
01215 
01216 CUDT* CRcvQueue::getNewEntry()
01217 {
01218    CGuard listguard(m_IDLock);
01219 
01220    if (m_vNewEntry.empty())
01221       return NULL;
01222 
01223    CUDT* u = (CUDT*)*(m_vNewEntry.begin());
01224    m_vNewEntry.erase(m_vNewEntry.begin());
01225 
01226    return u;
01227 }
01228 
01229 void CRcvQueue::storePkt(int32_t id, CPacket* pkt)
01230 {
01231    CGuard bufferlock(m_PassLock);   
01232 
01233    map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);
01234 
01235    if (i == m_mBuffer.end())
01236    {
01237       m_mBuffer[id].push(pkt);
01238 
01239       #ifndef WIN32
01240          pthread_cond_signal(&m_PassCond);
01241       #else
01242          SetEvent(m_PassCond);
01243       #endif
01244    }
01245    else
01246    {
01247       //avoid storing too many packets, in case of malfunction or attack
01248       if (i->second.size() > 16)
01249          return;
01250 
01251       i->second.push(pkt);
01252    }
01253 }

Generated on 9 Feb 2013 for barchart-udt-core-2.2.2 by  doxygen 1.6.1