api.cpp

00001 /*****************************************************************************
00002 Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois.
00003 All rights reserved.
00004 
00005 Redistribution and use in source and binary forms, with or without
00006 modification, are permitted provided that the following conditions are
00007 met:
00008 
00009 * Redistributions of source code must retain the above
00010   copyright notice, this list of conditions and the
00011   following disclaimer.
00012 
00013 * Redistributions in binary form must reproduce the
00014   above copyright notice, this list of conditions
00015   and the following disclaimer in the documentation
00016   and/or other materials provided with the distribution.
00017 
00018 * Neither the name of the University of Illinois
00019   nor the names of its contributors may be used to
00020   endorse or promote products derived from this
00021   software without specific prior written permission.
00022 
00023 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00024 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
00025 THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
00026 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
00027 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00028 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00029 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00030 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00031 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00032 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00033 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00034 *****************************************************************************/
00035 
00036 /*****************************************************************************
00037 written by
00038    Yunhong Gu, last updated 07/09/2011
00039 *****************************************************************************/
00040 
00041 #ifdef WIN32
00042    #include <winsock2.h>
00043    #include <ws2tcpip.h>
00044    #ifdef LEGACY_WIN32
00045       #include <wspiapi.h>
00046    #endif
00047 #else
00048    #include <unistd.h>
00049 #endif
00050 #include <cstring>
00051 #include "api.h"
00052 #include "core.h"
00053 
00054 using namespace std;
00055 
00056 CUDTSocket::CUDTSocket():
00057 m_Status(INIT),
00058 m_TimeStamp(0),
00059 m_iIPversion(0),
00060 m_pSelfAddr(NULL),
00061 m_pPeerAddr(NULL),
00062 m_SocketID(0),
00063 m_ListenSocket(0),
00064 m_PeerID(0),
00065 m_iISN(0),
00066 m_pUDT(NULL),
00067 m_pQueuedSockets(NULL),
00068 m_pAcceptSockets(NULL),
00069 m_AcceptCond(),
00070 m_AcceptLock(),
00071 m_uiBackLog(0),
00072 m_iMuxID(-1)
00073 {
00074    #ifndef WIN32
00075       pthread_mutex_init(&m_AcceptLock, NULL);
00076       pthread_cond_init(&m_AcceptCond, NULL);
00077       pthread_mutex_init(&m_ControlLock, NULL);
00078    #else
00079       m_AcceptLock = CreateMutex(NULL, false, NULL);
00080       m_AcceptCond = CreateEvent(NULL, false, false, NULL);
00081       m_ControlLock = CreateMutex(NULL, false, NULL);
00082    #endif
00083 }
00084 
00085 CUDTSocket::~CUDTSocket()
00086 {
00087    if (AF_INET == m_iIPversion)
00088    {
00089       delete (sockaddr_in*)m_pSelfAddr;
00090       delete (sockaddr_in*)m_pPeerAddr;
00091    }
00092    else
00093    {
00094       delete (sockaddr_in6*)m_pSelfAddr;
00095       delete (sockaddr_in6*)m_pPeerAddr;
00096    }
00097 
00098    delete m_pUDT;
00099    m_pUDT = NULL;
00100 
00101    delete m_pQueuedSockets;
00102    delete m_pAcceptSockets;
00103 
00104    #ifndef WIN32
00105       pthread_mutex_destroy(&m_AcceptLock);
00106       pthread_cond_destroy(&m_AcceptCond);
00107       pthread_mutex_destroy(&m_ControlLock);
00108    #else
00109       CloseHandle(m_AcceptLock);
00110       CloseHandle(m_AcceptCond);
00111       CloseHandle(m_ControlLock);
00112    #endif
00113 }
00114 
00116 
00117 CUDTUnited::CUDTUnited():
00118 m_Sockets(),
00119 m_ControlLock(),
00120 m_IDLock(),
00121 m_SocketID(0),
00122 m_TLSError(),
00123 m_mMultiplexer(),
00124 m_MultiplexerLock(),
00125 m_pCache(NULL),
00126 m_bClosing(false),
00127 m_GCStopLock(),
00128 m_GCStopCond(),
00129 m_InitLock(),
00130 m_iInstanceCount(0),
00131 m_bGCStatus(false),
00132 m_GCThread(),
00133 m_ClosedSockets()
00134 {
00135    // Socket ID MUST start from a random value
00136    srand((unsigned int)CTimer::getTime());
00137    m_SocketID = 1 + (int)((1 << 30) * (double(rand()) / RAND_MAX));
00138 
00139    #ifndef WIN32
00140       pthread_mutex_init(&m_ControlLock, NULL);
00141       pthread_mutex_init(&m_IDLock, NULL);
00142       pthread_mutex_init(&m_InitLock, NULL);
00143    #else
00144       m_ControlLock = CreateMutex(NULL, false, NULL);
00145       m_IDLock = CreateMutex(NULL, false, NULL);
00146       m_InitLock = CreateMutex(NULL, false, NULL);
00147    #endif
00148 
00149    #ifndef WIN32
00150       pthread_key_create(&m_TLSError, TLSDestroy);
00151    #else
00152       m_TLSError = TlsAlloc();
00153       m_TLSLock = CreateMutex(NULL, false, NULL);
00154    #endif
00155 
00156    m_pCache = new CCache<CInfoBlock>;
00157 }
00158 
00159 CUDTUnited::~CUDTUnited()
00160 {
00161    #ifndef WIN32
00162       pthread_mutex_destroy(&m_ControlLock);
00163       pthread_mutex_destroy(&m_IDLock);
00164       pthread_mutex_destroy(&m_InitLock);
00165    #else
00166       CloseHandle(m_ControlLock);
00167       CloseHandle(m_IDLock);
00168       CloseHandle(m_InitLock);
00169    #endif
00170 
00171    #ifndef WIN32
00172       pthread_key_delete(m_TLSError);
00173    #else
00174       TlsFree(m_TLSError);
00175       CloseHandle(m_TLSLock);
00176    #endif
00177 
00178    delete m_pCache;
00179 }
00180 
00181 int CUDTUnited::startup()
00182 {
00183    CGuard gcinit(m_InitLock);
00184 
00185    if (m_iInstanceCount++ > 0)
00186       return 0;
00187 
00188    // Global initialization code
00189    #ifdef WIN32
00190       WORD wVersionRequested;
00191       WSADATA wsaData;
00192       wVersionRequested = MAKEWORD(2, 2);
00193 
00194       if (0 != WSAStartup(wVersionRequested, &wsaData))
00195          throw CUDTException(1, 0,  WSAGetLastError());
00196    #endif
00197 
00198    //init CTimer::EventLock
00199 
00200    if (m_bGCStatus)
00201       return true;
00202 
00203    m_bClosing = false;
00204    #ifndef WIN32
00205       pthread_mutex_init(&m_GCStopLock, NULL);
00206       pthread_cond_init(&m_GCStopCond, NULL);
00207       pthread_create(&m_GCThread, NULL, garbageCollect, this);
00208    #else
00209       m_GCStopLock = CreateMutex(NULL, false, NULL);
00210       m_GCStopCond = CreateEvent(NULL, false, false, NULL);
00211       DWORD ThreadID;
00212       m_GCThread = CreateThread(NULL, 0, garbageCollect, this, 0, &ThreadID);
00213    #endif
00214 
00215    m_bGCStatus = true;
00216 
00217    return 0;
00218 }
00219 
00220 int CUDTUnited::cleanup()
00221 {
00222    CGuard gcinit(m_InitLock);
00223 
00224    if (--m_iInstanceCount > 0)
00225       return 0;
00226 
00227    //destroy CTimer::EventLock
00228 
00229    if (!m_bGCStatus)
00230       return 0;
00231 
00232    m_bClosing = true;
00233    #ifndef WIN32
00234       pthread_cond_signal(&m_GCStopCond);
00235       pthread_join(m_GCThread, NULL);
00236       pthread_mutex_destroy(&m_GCStopLock);
00237       pthread_cond_destroy(&m_GCStopCond);
00238    #else
00239       SetEvent(m_GCStopCond);
00240       WaitForSingleObject(m_GCThread, INFINITE);
00241       CloseHandle(m_GCThread);
00242       CloseHandle(m_GCStopLock);
00243       CloseHandle(m_GCStopCond);
00244    #endif
00245 
00246    m_bGCStatus = false;
00247 
00248    // Global destruction code
00249    #ifdef WIN32
00250       WSACleanup();
00251    #endif
00252 
00253    return 0;
00254 }
00255 
00256 UDTSOCKET CUDTUnited::newSocket(int af, int type)
00257 {
00258    if ((type != SOCK_STREAM) && (type != SOCK_DGRAM))
00259       throw CUDTException(5, 3, 0);
00260 
00261    CUDTSocket* ns = NULL;
00262 
00263    try
00264    {
00265       ns = new CUDTSocket;
00266       ns->m_pUDT = new CUDT;
00267       if (AF_INET == af)
00268       {
00269          ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in);
00270          ((sockaddr_in*)(ns->m_pSelfAddr))->sin_port = 0;
00271       }
00272       else
00273       {
00274          ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in6);
00275          ((sockaddr_in6*)(ns->m_pSelfAddr))->sin6_port = 0;
00276       }
00277    }
00278    catch (...)
00279    {
00280       delete ns;
00281       throw CUDTException(3, 2, 0);
00282    }
00283 
00284    CGuard::enterCS(m_IDLock);
00285    ns->m_SocketID = -- m_SocketID;
00286    CGuard::leaveCS(m_IDLock);
00287 
00288    ns->m_Status = INIT;
00289    ns->m_ListenSocket = 0;
00290    ns->m_pUDT->m_SocketID = ns->m_SocketID;
00291    ns->m_pUDT->m_iSockType = (SOCK_STREAM == type) ? UDT_STREAM : UDT_DGRAM;
00292    ns->m_pUDT->m_iIPversion = ns->m_iIPversion = af;
00293    ns->m_pUDT->m_pCache = m_pCache;
00294 
00295    // protect the m_Sockets structure.
00296    CGuard::enterCS(m_ControlLock);
00297    try
00298    {
00299       m_Sockets[ns->m_SocketID] = ns;
00300    }
00301    catch (...)
00302    {
00303       //failure and rollback
00304       CGuard::leaveCS(m_ControlLock);
00305       delete ns;
00306       ns = NULL;
00307    }
00308    CGuard::leaveCS(m_ControlLock);
00309 
00310    if (NULL == ns)
00311       throw CUDTException(3, 2, 0);
00312 
00313    return ns->m_SocketID;
00314 }
00315 
00316 int CUDTUnited::newConnection(const UDTSOCKET listen, const sockaddr* peer, CHandShake* hs)
00317 {
00318    CUDTSocket* ns = NULL;
00319    CUDTSocket* ls = locate(listen);
00320 
00321    if (NULL == ls)
00322       return -1;
00323 
00324    // if this connection has already been processed
00325    if (NULL != (ns = locate(peer, hs->m_iID, hs->m_iISN)))
00326    {
00327       if (ns->m_pUDT->m_bBroken)
00328       {
00329          // last connection from the "peer" address has been broken
00330          ns->m_Status = CLOSED;
00331          ns->m_TimeStamp = CTimer::getTime();
00332 
00333          CGuard::enterCS(ls->m_AcceptLock);
00334          ls->m_pQueuedSockets->erase(ns->m_SocketID);
00335          ls->m_pAcceptSockets->erase(ns->m_SocketID);
00336          CGuard::leaveCS(ls->m_AcceptLock);
00337       }
00338       else
00339       {
00340          // connection already exist, this is a repeated connection request
00341          // respond with existing HS information
00342 
00343          hs->m_iISN = ns->m_pUDT->m_iISN;
00344          hs->m_iMSS = ns->m_pUDT->m_iMSS;
00345          hs->m_iFlightFlagSize = ns->m_pUDT->m_iFlightFlagSize;
00346          hs->m_iReqType = -1;
00347          hs->m_iID = ns->m_SocketID;
00348 
00349          return 0;
00350 
00351          //except for this situation a new connection should be started
00352       }
00353    }
00354 
00355    // exceeding backlog, refuse the connection request
00356    if (ls->m_pQueuedSockets->size() >= ls->m_uiBackLog)
00357       return -1;
00358 
00359    try
00360    {
00361       ns = new CUDTSocket;
00362       ns->m_pUDT = new CUDT(*(ls->m_pUDT));
00363       if (AF_INET == ls->m_iIPversion)
00364       {
00365          ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in);
00366          ((sockaddr_in*)(ns->m_pSelfAddr))->sin_port = 0;
00367          ns->m_pPeerAddr = (sockaddr*)(new sockaddr_in);
00368          memcpy(ns->m_pPeerAddr, peer, sizeof(sockaddr_in));
00369       }
00370       else
00371       {
00372          ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in6);
00373          ((sockaddr_in6*)(ns->m_pSelfAddr))->sin6_port = 0;
00374          ns->m_pPeerAddr = (sockaddr*)(new sockaddr_in6);
00375          memcpy(ns->m_pPeerAddr, peer, sizeof(sockaddr_in6));
00376       }
00377    }
00378    catch (...)
00379    {
00380       delete ns;
00381       return -1;
00382    }
00383 
00384    CGuard::enterCS(m_IDLock);
00385    ns->m_SocketID = -- m_SocketID;
00386    CGuard::leaveCS(m_IDLock);
00387 
00388    ns->m_ListenSocket = listen;
00389    ns->m_iIPversion = ls->m_iIPversion;
00390    ns->m_pUDT->m_SocketID = ns->m_SocketID;
00391    ns->m_PeerID = hs->m_iID;
00392    ns->m_iISN = hs->m_iISN;
00393 
00394    int error = 0;
00395 
00396    try
00397    {
00398       // bind to the same addr of listening socket
00399       ns->m_pUDT->open();
00400       updateMux(ns, ls);
00401       ns->m_pUDT->connect(peer, hs);
00402    }
00403    catch (...)
00404    {
00405       error = 1;
00406       goto ERR_ROLLBACK;
00407    }
00408 
00409    ns->m_Status = CONNECTED;
00410 
00411    // copy address information of local node
00412    ns->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(ns->m_pSelfAddr);
00413    CIPAddress::pton(ns->m_pSelfAddr, ns->m_pUDT->m_piSelfIP, ns->m_iIPversion);
00414 
00415    // protect the m_Sockets structure.
00416    CGuard::enterCS(m_ControlLock);
00417    try
00418    {
00419       m_Sockets[ns->m_SocketID] = ns;
00420       m_PeerRec[(ns->m_PeerID << 30) + ns->m_iISN].insert(ns->m_SocketID);
00421    }
00422    catch (...)
00423    {
00424       error = 2;
00425    }
00426    CGuard::leaveCS(m_ControlLock);
00427 
00428    CGuard::enterCS(ls->m_AcceptLock);
00429    try
00430    {
00431       ls->m_pQueuedSockets->insert(ns->m_SocketID);
00432    }
00433    catch (...)
00434    {
00435       error = 3;
00436    }
00437    CGuard::leaveCS(ls->m_AcceptLock);
00438 
00439    // acknowledge users waiting for new connections on the listening socket
00440    m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, true);
00441 
00442    CTimer::triggerEvent();
00443 
00444    ERR_ROLLBACK:
00445    if (error > 0)
00446    {
00447       ns->m_pUDT->close();
00448       ns->m_Status = CLOSED;
00449       ns->m_TimeStamp = CTimer::getTime();
00450 
00451       return -1;
00452    }
00453 
00454    // wake up a waiting accept() call
00455    #ifndef WIN32
00456       pthread_mutex_lock(&(ls->m_AcceptLock));
00457       pthread_cond_signal(&(ls->m_AcceptCond));
00458       pthread_mutex_unlock(&(ls->m_AcceptLock));
00459    #else
00460       SetEvent(ls->m_AcceptCond);
00461    #endif
00462 
00463    return 1;
00464 }
00465 
00466 CUDT* CUDTUnited::lookup(const UDTSOCKET u)
00467 {
00468    // protects the m_Sockets structure
00469    CGuard cg(m_ControlLock);
00470 
00471    map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.find(u);
00472 
00473    if ((i == m_Sockets.end()) || (i->second->m_Status == CLOSED))
00474       throw CUDTException(5, 4, 0);
00475 
00476    return i->second->m_pUDT;
00477 }
00478 
00479 UDTSTATUS CUDTUnited::getStatus(const UDTSOCKET u)
00480 {
00481    // protects the m_Sockets structure
00482    CGuard cg(m_ControlLock);
00483 
00484    map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.find(u);
00485 
00486    if (i == m_Sockets.end())
00487    {
00488       if (m_ClosedSockets.find(u) != m_ClosedSockets.end())
00489          return CLOSED;
00490 
00491       return NONEXIST;
00492    }
00493 
00494    if (i->second->m_pUDT->m_bBroken)
00495       return BROKEN;
00496 
00497    return i->second->m_Status;   
00498 }
00499 
00500 int CUDTUnited::bind(const UDTSOCKET u, const sockaddr* name, int namelen)
00501 {
00502    CUDTSocket* s = locate(u);
00503    if (NULL == s)
00504       throw CUDTException(5, 4, 0);
00505 
00506    CGuard cg(s->m_ControlLock);
00507 
00508    // cannot bind a socket more than once
00509    if (INIT != s->m_Status)
00510       throw CUDTException(5, 0, 0);
00511 
00512    // check the size of SOCKADDR structure
00513    if (AF_INET == s->m_iIPversion)
00514    {
00515       if (namelen != sizeof(sockaddr_in))
00516          throw CUDTException(5, 3, 0);
00517    }
00518    else
00519    {
00520       if (namelen != sizeof(sockaddr_in6))
00521          throw CUDTException(5, 3, 0);
00522    }
00523 
00524    s->m_pUDT->open();
00525    updateMux(s, name);
00526    s->m_Status = OPENED;
00527 
00528    // copy address information of local node
00529    s->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(s->m_pSelfAddr);
00530 
00531    return 0;
00532 }
00533 
00534 int CUDTUnited::bind(UDTSOCKET u, UDPSOCKET udpsock)
00535 {
00536    CUDTSocket* s = locate(u);
00537    if (NULL == s)
00538       throw CUDTException(5, 4, 0);
00539 
00540    CGuard cg(s->m_ControlLock);
00541 
00542    // cannot bind a socket more than once
00543    if (INIT != s->m_Status)
00544       throw CUDTException(5, 0, 0);
00545 
00546    sockaddr_in name4;
00547    sockaddr_in6 name6;
00548    sockaddr* name;
00549    socklen_t namelen;
00550 
00551    if (AF_INET == s->m_iIPversion)
00552    {
00553       namelen = sizeof(sockaddr_in);
00554       name = (sockaddr*)&name4;
00555    }
00556    else
00557    {
00558       namelen = sizeof(sockaddr_in6);
00559       name = (sockaddr*)&name6;
00560    }
00561 
00562    if (-1 == ::getsockname(udpsock, name, &namelen))
00563       throw CUDTException(5, 3);
00564 
00565    s->m_pUDT->open();
00566    updateMux(s, name, &udpsock);
00567    s->m_Status = OPENED;
00568 
00569    // copy address information of local node
00570    s->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(s->m_pSelfAddr);
00571 
00572    return 0;
00573 }
00574 
00575 int CUDTUnited::listen(const UDTSOCKET u, int backlog)
00576 {
00577    CUDTSocket* s = locate(u);
00578    if (NULL == s)
00579       throw CUDTException(5, 4, 0);
00580 
00581    CGuard cg(s->m_ControlLock);
00582 
00583    // do nothing if the socket is already listening
00584    if (LISTENING == s->m_Status)
00585       return 0;
00586 
00587    // a socket can listen only if is in OPENED status
00588    if (OPENED != s->m_Status)
00589       throw CUDTException(5, 5, 0);
00590 
00591    // listen is not supported in rendezvous connection setup
00592    if (s->m_pUDT->m_bRendezvous)
00593       throw CUDTException(5, 7, 0);
00594 
00595    if (backlog <= 0)
00596       throw CUDTException(5, 3, 0);
00597 
00598    s->m_uiBackLog = backlog;
00599 
00600    try
00601    {
00602       s->m_pQueuedSockets = new set<UDTSOCKET>;
00603       s->m_pAcceptSockets = new set<UDTSOCKET>;
00604    }
00605    catch (...)
00606    {
00607       delete s->m_pQueuedSockets;
00608       delete s->m_pAcceptSockets;
00609       throw CUDTException(3, 2, 0);
00610    }
00611 
00612    s->m_pUDT->listen();
00613 
00614    s->m_Status = LISTENING;
00615 
00616    return 0;
00617 }
00618 
00619 UDTSOCKET CUDTUnited::accept(const UDTSOCKET listen, sockaddr* addr, int* addrlen)
00620 {
00621    if ((NULL != addr) && (NULL == addrlen))
00622       throw CUDTException(5, 3, 0);
00623 
00624    CUDTSocket* ls = locate(listen);
00625 
00626    if (ls == NULL)
00627       throw CUDTException(5, 4, 0);
00628 
00629    // the "listen" socket must be in LISTENING status
00630    if (LISTENING != ls->m_Status)
00631       throw CUDTException(5, 6, 0);
00632 
00633    // no "accept" in rendezvous connection setup
00634    if (ls->m_pUDT->m_bRendezvous)
00635       throw CUDTException(5, 7, 0);
00636 
00637    UDTSOCKET u = CUDT::INVALID_SOCK;
00638    bool accepted = false;
00639 
00640    // !!only one conection can be set up each time!!
00641    #ifndef WIN32
00642       while (!accepted)
00643       {
00644          pthread_mutex_lock(&(ls->m_AcceptLock));
00645 
00646          if ((LISTENING != ls->m_Status) || ls->m_pUDT->m_bBroken)
00647          {
00648             // This socket has been closed.
00649             accepted = true;
00650          }
00651          else if (ls->m_pQueuedSockets->size() > 0)
00652          {
00653             u = *(ls->m_pQueuedSockets->begin());
00654             ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u);
00655             ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin());
00656             accepted = true;
00657          }
00658          else if (!ls->m_pUDT->m_bSynRecving)
00659          {
00660             accepted = true;
00661          }
00662 
00663          if (!accepted && (LISTENING == ls->m_Status))
00664             pthread_cond_wait(&(ls->m_AcceptCond), &(ls->m_AcceptLock));
00665 
00666          if (ls->m_pQueuedSockets->empty())
00667             m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, false);
00668 
00669          pthread_mutex_unlock(&(ls->m_AcceptLock));
00670       }
00671    #else
00672       while (!accepted)
00673       {
00674          WaitForSingleObject(ls->m_AcceptLock, INFINITE);
00675 
00676          if (ls->m_pQueuedSockets->size() > 0)
00677          {
00678             u = *(ls->m_pQueuedSockets->begin());
00679             ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u);
00680             ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin());
00681 
00682             accepted = true;
00683          }
00684          else if (!ls->m_pUDT->m_bSynRecving)
00685             accepted = true;
00686 
00687          ReleaseMutex(ls->m_AcceptLock);
00688 
00689          if  (!accepted & (LISTENING == ls->m_Status))
00690             WaitForSingleObject(ls->m_AcceptCond, INFINITE);
00691 
00692          if ((LISTENING != ls->m_Status) || ls->m_pUDT->m_bBroken)
00693          {
00694             // Send signal to other threads that are waiting to accept.
00695             SetEvent(ls->m_AcceptCond);
00696             accepted = true;
00697          }
00698 
00699          if (ls->m_pQueuedSockets->empty())
00700             m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, false);
00701       }
00702    #endif
00703 
00704    if (u == CUDT::INVALID_SOCK)
00705    {
00706       // non-blocking receiving, no connection available
00707       if (!ls->m_pUDT->m_bSynRecving)
00708          throw CUDTException(6, 2, 0);
00709 
00710       // listening socket is closed
00711       throw CUDTException(5, 6, 0);
00712    }
00713 
00714    if ((addr != NULL) && (addrlen != NULL))
00715    {
00716       if (AF_INET == locate(u)->m_iIPversion)
00717          *addrlen = sizeof(sockaddr_in);
00718       else
00719          *addrlen = sizeof(sockaddr_in6);
00720 
00721       // copy address information of peer node
00722       memcpy(addr, locate(u)->m_pPeerAddr, *addrlen);
00723    }
00724 
00725    return u;
00726 }
00727 
00728 int CUDTUnited::connect(const UDTSOCKET u, const sockaddr* name, int namelen)
00729 {
00730    CUDTSocket* s = locate(u);
00731    if (NULL == s)
00732       throw CUDTException(5, 4, 0);
00733 
00734    CGuard cg(s->m_ControlLock);
00735 
00736    // check the size of SOCKADDR structure
00737    if (AF_INET == s->m_iIPversion)
00738    {
00739       if (namelen != sizeof(sockaddr_in))
00740          throw CUDTException(5, 3, 0);
00741    }
00742    else
00743    {
00744       if (namelen != sizeof(sockaddr_in6))
00745          throw CUDTException(5, 3, 0);
00746    }
00747 
00748    // a socket can "connect" only if it is in INIT or OPENED status
00749    if (INIT == s->m_Status)
00750    {
00751       if (!s->m_pUDT->m_bRendezvous)
00752       {
00753          s->m_pUDT->open();
00754          updateMux(s);
00755          s->m_Status = OPENED;
00756       }
00757       else
00758          throw CUDTException(5, 8, 0);
00759    }
00760    else if (OPENED != s->m_Status)
00761       throw CUDTException(5, 2, 0);
00762 
00763    // connect_complete() may be called before connect() returns.
00764    // So we need to update the status before connect() is called,
00765    // otherwise the status may be overwritten with wrong value (CONNECTED vs. CONNECTING).
00766    s->m_Status = CONNECTING;
00767    try
00768    {
00769       s->m_pUDT->connect(name);
00770    }
00771    catch (CUDTException e)
00772    {
00773       s->m_Status = OPENED;
00774       throw e;
00775    }
00776 
00777    // record peer address
00778    delete s->m_pPeerAddr;
00779    if (AF_INET == s->m_iIPversion)
00780    {
00781       s->m_pPeerAddr = (sockaddr*)(new sockaddr_in);
00782       memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in));
00783    }
00784    else
00785    {
00786       s->m_pPeerAddr = (sockaddr*)(new sockaddr_in6);
00787       memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in6));
00788    }
00789 
00790    return 0;
00791 }
00792 
00793 void CUDTUnited::connect_complete(const UDTSOCKET u)
00794 {
00795    CUDTSocket* s = locate(u);
00796    if (NULL == s)
00797       throw CUDTException(5, 4, 0);
00798 
00799    // copy address information of local node
00800    // the local port must be correctly assigned BEFORE CUDT::connect(),
00801    // otherwise if connect() fails, the multiplexer cannot be located by garbage collection and will cause leak
00802    s->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(s->m_pSelfAddr);
00803    CIPAddress::pton(s->m_pSelfAddr, s->m_pUDT->m_piSelfIP, s->m_iIPversion);
00804 
00805    s->m_Status = CONNECTED;
00806 }
00807 
00808 int CUDTUnited::close(const UDTSOCKET u)
00809 {
00810    CUDTSocket* s = locate(u);
00811    if (NULL == s)
00812       throw CUDTException(5, 4, 0);
00813 
00814    CGuard socket_cg(s->m_ControlLock);
00815 
00816    if (s->m_Status == LISTENING)
00817    {
00818       if (s->m_pUDT->m_bBroken)
00819          return 0;
00820 
00821       s->m_TimeStamp = CTimer::getTime();
00822       s->m_pUDT->m_bBroken = true;
00823 
00824       // broadcast all "accept" waiting
00825       #ifndef WIN32
00826          pthread_mutex_lock(&(s->m_AcceptLock));
00827          pthread_cond_broadcast(&(s->m_AcceptCond));
00828          pthread_mutex_unlock(&(s->m_AcceptLock));
00829       #else
00830          SetEvent(s->m_AcceptCond);
00831       #endif
00832 
00833       return 0;
00834    }
00835 
00836    s->m_pUDT->close();
00837 
00838    // synchronize with garbage collection.
00839    CGuard manager_cg(m_ControlLock);
00840 
00841    // since "s" is located before m_ControlLock, locate it again in case it became invalid
00842    map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.find(u);
00843    if ((i == m_Sockets.end()) || (i->second->m_Status == CLOSED))
00844       return 0;
00845    s = i->second;
00846 
00847    s->m_Status = CLOSED;
00848 
00849    // a socket will not be immediated removed when it is closed
00850    // in order to prevent other methods from accessing invalid address
00851    // a timer is started and the socket will be removed after approximately 1 second
00852    s->m_TimeStamp = CTimer::getTime();
00853 
00854    m_Sockets.erase(s->m_SocketID);
00855    m_ClosedSockets.insert(pair<UDTSOCKET, CUDTSocket*>(s->m_SocketID, s));
00856 
00857    CTimer::triggerEvent();
00858 
00859    return 0;
00860 }
00861 
00862 int CUDTUnited::getpeername(const UDTSOCKET u, sockaddr* name, int* namelen)
00863 {
00864    if (CONNECTED != getStatus(u))
00865       throw CUDTException(2, 2, 0);
00866 
00867    CUDTSocket* s = locate(u);
00868 
00869    if (NULL == s)
00870       throw CUDTException(5, 4, 0);
00871 
00872    if (!s->m_pUDT->m_bConnected || s->m_pUDT->m_bBroken)
00873       throw CUDTException(2, 2, 0);
00874 
00875    if (AF_INET == s->m_iIPversion)
00876       *namelen = sizeof(sockaddr_in);
00877    else
00878       *namelen = sizeof(sockaddr_in6);
00879 
00880    // copy address information of peer node
00881    memcpy(name, s->m_pPeerAddr, *namelen);
00882 
00883    return 0;
00884 }
00885 
00886 int CUDTUnited::getsockname(const UDTSOCKET u, sockaddr* name, int* namelen)
00887 {
00888    CUDTSocket* s = locate(u);
00889 
00890    if (NULL == s)
00891       throw CUDTException(5, 4, 0);
00892 
00893    if (s->m_pUDT->m_bBroken)
00894       throw CUDTException(5, 4, 0);
00895 
00896    if (INIT == s->m_Status)
00897       throw CUDTException(2, 2, 0);
00898 
00899    if (AF_INET == s->m_iIPversion)
00900       *namelen = sizeof(sockaddr_in);
00901    else
00902       *namelen = sizeof(sockaddr_in6);
00903 
00904    // copy address information of local node
00905    memcpy(name, s->m_pSelfAddr, *namelen);
00906 
00907    return 0;
00908 }
00909 
00910 int CUDTUnited::select(ud_set* readfds, ud_set* writefds, ud_set* exceptfds, const timeval* timeout)
00911 {
00912    uint64_t entertime = CTimer::getTime();
00913 
00914    uint64_t to;
00915    if (NULL == timeout)
00916       to = 0xFFFFFFFFFFFFFFFFULL;
00917    else
00918       to = timeout->tv_sec * 1000000 + timeout->tv_usec;
00919 
00920    // initialize results
00921    int count = 0;
00922    set<UDTSOCKET> rs, ws, es;
00923 
00924    // retrieve related UDT sockets
00925    vector<CUDTSocket*> ru, wu, eu;
00926    CUDTSocket* s;
00927    if (NULL != readfds)
00928       for (set<UDTSOCKET>::iterator i1 = readfds->begin(); i1 != readfds->end(); ++ i1)
00929       {
00930          if (BROKEN == getStatus(*i1))
00931          {
00932             rs.insert(*i1);
00933             ++ count;
00934          }
00935          else if (NULL == (s = locate(*i1)))
00936             throw CUDTException(5, 4, 0);
00937          else
00938             ru.push_back(s);
00939       }
00940    if (NULL != writefds)
00941       for (set<UDTSOCKET>::iterator i2 = writefds->begin(); i2 != writefds->end(); ++ i2)
00942       {
00943          if (BROKEN == getStatus(*i2))
00944          {
00945             ws.insert(*i2);
00946             ++ count;
00947          }
00948          else if (NULL == (s = locate(*i2)))
00949             throw CUDTException(5, 4, 0);
00950          else
00951             wu.push_back(s);
00952       }
00953    if (NULL != exceptfds)
00954       for (set<UDTSOCKET>::iterator i3 = exceptfds->begin(); i3 != exceptfds->end(); ++ i3)
00955       {
00956          if (BROKEN == getStatus(*i3))
00957          {
00958             es.insert(*i3);
00959             ++ count;
00960          }
00961          else if (NULL == (s = locate(*i3)))
00962             throw CUDTException(5, 4, 0);
00963          else
00964             eu.push_back(s);
00965       }
00966 
00967    do
00968    {
00969       // query read sockets
00970       for (vector<CUDTSocket*>::iterator j1 = ru.begin(); j1 != ru.end(); ++ j1)
00971       {
00972          s = *j1;
00973 
00974          if ((s->m_pUDT->m_bConnected && (s->m_pUDT->m_pRcvBuffer->getRcvDataSize() > 0) && ((s->m_pUDT->m_iSockType == UDT_STREAM) || (s->m_pUDT->m_pRcvBuffer->getRcvMsgNum() > 0)))
00975             || (!s->m_pUDT->m_bListening && (s->m_pUDT->m_bBroken || !s->m_pUDT->m_bConnected))
00976             || (s->m_pUDT->m_bListening && (s->m_pQueuedSockets->size() > 0))
00977             || (s->m_Status == CLOSED))
00978          {
00979             rs.insert(s->m_SocketID);
00980             ++ count;
00981          }
00982       }
00983 
00984       // query write sockets
00985       for (vector<CUDTSocket*>::iterator j2 = wu.begin(); j2 != wu.end(); ++ j2)
00986       {
00987          s = *j2;
00988 
00989          if ((s->m_pUDT->m_bConnected && (s->m_pUDT->m_pSndBuffer->getCurrBufSize() < s->m_pUDT->m_iSndBufSize))
00990             || s->m_pUDT->m_bBroken || !s->m_pUDT->m_bConnected || (s->m_Status == CLOSED))
00991          {
00992             ws.insert(s->m_SocketID);
00993             ++ count;
00994          }
00995       }
00996 
00997       // query exceptions on sockets
00998       for (vector<CUDTSocket*>::iterator j3 = eu.begin(); j3 != eu.end(); ++ j3)
00999       {
01000          // check connection request status, not supported now
01001       }
01002 
01003       if (0 < count)
01004          break;
01005 
01006       CTimer::waitForEvent();
01007    } while (to > CTimer::getTime() - entertime);
01008 
01009    if (NULL != readfds)
01010       *readfds = rs;
01011 
01012    if (NULL != writefds)
01013       *writefds = ws;
01014 
01015    if (NULL != exceptfds)
01016       *exceptfds = es;
01017 
01018    return count;
01019 }
01020 
01021 int CUDTUnited::selectEx(const vector<UDTSOCKET>& fds, vector<UDTSOCKET>* readfds, vector<UDTSOCKET>* writefds, vector<UDTSOCKET>* exceptfds, int64_t msTimeOut)
01022 {
01023    uint64_t entertime = CTimer::getTime();
01024 
01025    uint64_t to;
01026    if (msTimeOut >= 0)
01027       to = msTimeOut * 1000;
01028    else
01029       to = 0xFFFFFFFFFFFFFFFFULL;
01030 
01031    // initialize results
01032    int count = 0;
01033    if (NULL != readfds)
01034       readfds->clear();
01035    if (NULL != writefds)
01036       writefds->clear();
01037    if (NULL != exceptfds)
01038       exceptfds->clear();
01039 
01040    do
01041    {
01042       for (vector<UDTSOCKET>::const_iterator i = fds.begin(); i != fds.end(); ++ i)
01043       {
01044          CUDTSocket* s = locate(*i);
01045 
01046          if ((NULL == s) || s->m_pUDT->m_bBroken || (s->m_Status == CLOSED))
01047          {
01048             if (NULL != exceptfds)
01049             {
01050                exceptfds->push_back(*i);
01051                ++ count;
01052             }
01053             continue;
01054          }
01055 
01056          if (NULL != readfds)
01057          {
01058             if ((s->m_pUDT->m_bConnected && (s->m_pUDT->m_pRcvBuffer->getRcvDataSize() > 0) && ((s->m_pUDT->m_iSockType == UDT_STREAM) || (s->m_pUDT->m_pRcvBuffer->getRcvMsgNum() > 0)))
01059                || (s->m_pUDT->m_bListening && (s->m_pQueuedSockets->size() > 0)))
01060             {
01061                readfds->push_back(s->m_SocketID);
01062                ++ count;
01063             }
01064          }
01065 
01066          if (NULL != writefds)
01067          {
01068             if (s->m_pUDT->m_bConnected && (s->m_pUDT->m_pSndBuffer->getCurrBufSize() < s->m_pUDT->m_iSndBufSize))
01069             {
01070                writefds->push_back(s->m_SocketID);
01071                ++ count;
01072             }
01073          }
01074       }
01075 
01076       if (count > 0)
01077          break;
01078 
01079       CTimer::waitForEvent();
01080    } while (to > CTimer::getTime() - entertime);
01081 
01082    return count;
01083 }
01084 
01085 int CUDTUnited::epoll_create()
01086 {
01087    return m_EPoll.create();
01088 }
01089 
01090 int CUDTUnited::epoll_add_usock(const int eid, const UDTSOCKET u, const int* events)
01091 {
01092    CUDTSocket* s = locate(u);
01093    int ret = -1;
01094    if (NULL != s)
01095    {
01096       ret = m_EPoll.add_usock(eid, u, events);
01097       s->m_pUDT->addEPoll(eid);
01098    }
01099    else
01100    {
01101       throw CUDTException(5, 4);
01102    }
01103 
01104    return ret;
01105 }
01106 
01107 // BARCHART
01108 int CUDTUnited::epoll_update_usock(const int eid, const UDTSOCKET u, const int* events)
01109 {
01110    CUDTSocket* s = locate(u);
01111    int ret = -1;
01112    if (NULL != s)  {
01113       ret = m_EPoll.update_usock(eid, u, events);
01114    } else {
01115       throw CUDTException(5, 4);
01116    }
01117    return ret;
01118 }
01119 
01120 // BARCHART
01121 int CUDTUnited::epoll_verify_usock(const int eid, const UDTSOCKET u, int* events)
01122 {
01123    CUDTSocket* s = locate(u);
01124    int ret = -1;
01125    if (NULL != s)  {
01126       ret = m_EPoll.verify_usock(eid, u, events);
01127    } else {
01128       throw CUDTException(5, 4);
01129    }
01130    return ret;
01131 }
01132 
01133 int CUDTUnited::epoll_add_ssock(const int eid, const SYSSOCKET s, const int* events)
01134 {
01135    return m_EPoll.add_ssock(eid, s, events);
01136 }
01137 
01138 int CUDTUnited::epoll_remove_usock(const int eid, const UDTSOCKET u)
01139 {
01140    CUDTSocket* s = locate(u);
01141    if (NULL != s)
01142    {
01143       s->m_pUDT->removeEPoll(eid);
01144    }
01145    //else
01146    //{
01147    //   throw CUDTException(5, 4);
01148    //}
01149 
01150    return m_EPoll.remove_usock(eid, u);
01151 }
01152 
01153 int CUDTUnited::epoll_remove_ssock(const int eid, const SYSSOCKET s)
01154 {
01155    return m_EPoll.remove_ssock(eid, s);
01156 }
01157 
01158 int CUDTUnited::epoll_wait(const int eid, set<UDTSOCKET>* readfds, set<UDTSOCKET>* writefds, int64_t msTimeOut, set<SYSSOCKET>* lrfds, set<SYSSOCKET>* lwfds)
01159 {
01160    return m_EPoll.wait(eid, readfds, writefds, msTimeOut, lrfds, lwfds);
01161 }
01162 
01163 int CUDTUnited::epoll_release(const int eid)
01164 {
01165    return m_EPoll.release(eid);
01166 }
01167 
01168 CUDTSocket* CUDTUnited::locate(const UDTSOCKET u)
01169 {
01170    CGuard cg(m_ControlLock);
01171 
01172    map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.find(u);
01173 
01174    if ((i == m_Sockets.end()) || (i->second->m_Status == CLOSED))
01175       return NULL;
01176 
01177    return i->second;
01178 }
01179 
01180 CUDTSocket* CUDTUnited::locate(const sockaddr* peer, const UDTSOCKET id, int32_t isn)
01181 {
01182    CGuard cg(m_ControlLock);
01183 
01184    map<int64_t, set<UDTSOCKET> >::iterator i = m_PeerRec.find((id << 30) + isn);
01185    if (i == m_PeerRec.end())
01186       return NULL;
01187 
01188    for (set<UDTSOCKET>::iterator j = i->second.begin(); j != i->second.end(); ++ j)
01189    {
01190       map<UDTSOCKET, CUDTSocket*>::iterator k = m_Sockets.find(*j);
01191       // this socket might have been closed and moved m_ClosedSockets
01192       if (k == m_Sockets.end())
01193          continue;
01194 
01195       if (CIPAddress::ipcmp(peer, k->second->m_pPeerAddr, k->second->m_iIPversion))
01196          return k->second;
01197    }
01198 
01199    return NULL;
01200 }
01201 
01202 void CUDTUnited::checkBrokenSockets()
01203 {
01204    CGuard cg(m_ControlLock);
01205 
01206    // set of sockets To Be Closed and To Be Removed
01207    vector<UDTSOCKET> tbc;
01208    vector<UDTSOCKET> tbr;
01209 
01210    for (map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.begin(); i != m_Sockets.end(); ++ i)
01211    {
01212       // check broken connection
01213       if (i->second->m_pUDT->m_bBroken)
01214       {
01215          if (i->second->m_Status == LISTENING)
01216          {
01217             // for a listening socket, it should wait an extra 3 seconds in case a client is connecting
01218             if (CTimer::getTime() - i->second->m_TimeStamp < 3000000)
01219                continue;
01220          }
01221          else if ((i->second->m_pUDT->m_pRcvBuffer != NULL) && (i->second->m_pUDT->m_pRcvBuffer->getRcvDataSize() > 0) && (i->second->m_pUDT->m_iBrokenCounter -- > 0))
01222          {
01223             // if there is still data in the receiver buffer, wait longer
01224             continue;
01225          }
01226 
01227          //close broken connections and start removal timer
01228          i->second->m_Status = CLOSED;
01229          i->second->m_TimeStamp = CTimer::getTime();
01230          tbc.push_back(i->first);
01231          m_ClosedSockets[i->first] = i->second;
01232 
01233          // remove from listener's queue
01234          map<UDTSOCKET, CUDTSocket*>::iterator ls = m_Sockets.find(i->second->m_ListenSocket);
01235          if (ls == m_Sockets.end())
01236          {
01237             ls = m_ClosedSockets.find(i->second->m_ListenSocket);
01238             if (ls == m_ClosedSockets.end())
01239                continue;
01240          }
01241 
01242          CGuard::enterCS(ls->second->m_AcceptLock);
01243          ls->second->m_pQueuedSockets->erase(i->second->m_SocketID);
01244          ls->second->m_pAcceptSockets->erase(i->second->m_SocketID);
01245          CGuard::leaveCS(ls->second->m_AcceptLock);
01246       }
01247    }
01248 
01249    for (map<UDTSOCKET, CUDTSocket*>::iterator j = m_ClosedSockets.begin(); j != m_ClosedSockets.end(); ++ j)
01250    {
01251       if (j->second->m_pUDT->m_ullLingerExpiration > 0)
01252       {
01253          // asynchronous close: 
01254          if ((NULL == j->second->m_pUDT->m_pSndBuffer) || (0 == j->second->m_pUDT->m_pSndBuffer->getCurrBufSize()) || (j->second->m_pUDT->m_ullLingerExpiration <= CTimer::getTime()))
01255          {
01256             j->second->m_pUDT->m_ullLingerExpiration = 0;
01257             j->second->m_pUDT->m_bClosing = true;
01258             j->second->m_TimeStamp = CTimer::getTime();
01259          }
01260       }
01261 
01262       // timeout 1 second to destroy a socket AND it has been removed from RcvUList
01263       if ((CTimer::getTime() - j->second->m_TimeStamp > 1000000) && ((NULL == j->second->m_pUDT->m_pRNode) || !j->second->m_pUDT->m_pRNode->m_bOnList))
01264       {
01265          tbr.push_back(j->first);
01266       }
01267    }
01268 
01269    // move closed sockets to the ClosedSockets structure
01270    for (vector<UDTSOCKET>::iterator k = tbc.begin(); k != tbc.end(); ++ k)
01271       m_Sockets.erase(*k);
01272 
01273    // remove those timeout sockets
01274    for (vector<UDTSOCKET>::iterator l = tbr.begin(); l != tbr.end(); ++ l)
01275       removeSocket(*l);
01276 }
01277 
01278 void CUDTUnited::removeSocket(const UDTSOCKET u)
01279 {
01280    map<UDTSOCKET, CUDTSocket*>::iterator i = m_ClosedSockets.find(u);
01281 
01282    // invalid socket ID
01283    if (i == m_ClosedSockets.end())
01284       return;
01285 
01286    // decrease multiplexer reference count, and remove it if necessary
01287    const int mid = i->second->m_iMuxID;
01288 
01289    if (NULL != i->second->m_pQueuedSockets)
01290    {
01291       CGuard::enterCS(i->second->m_AcceptLock);
01292 
01293       // if it is a listener, close all un-accepted sockets in its queue and remove them later
01294       for (set<UDTSOCKET>::iterator q = i->second->m_pQueuedSockets->begin(); q != i->second->m_pQueuedSockets->end(); ++ q)
01295       {
01296          m_Sockets[*q]->m_pUDT->m_bBroken = true;
01297          m_Sockets[*q]->m_pUDT->close();
01298          m_Sockets[*q]->m_TimeStamp = CTimer::getTime();
01299          m_Sockets[*q]->m_Status = CLOSED;
01300          m_ClosedSockets[*q] = m_Sockets[*q];
01301          m_Sockets.erase(*q);
01302       }
01303 
01304       CGuard::leaveCS(i->second->m_AcceptLock);
01305    }
01306 
01307    // remove from peer rec
01308    map<int64_t, set<UDTSOCKET> >::iterator j = m_PeerRec.find((i->second->m_PeerID << 30) + i->second->m_iISN);
01309    if (j != m_PeerRec.end())
01310    {
01311       j->second.erase(u);
01312       if (j->second.empty())
01313          m_PeerRec.erase(j);
01314    }
01315 
01316    // delete this one
01317    i->second->m_pUDT->close();
01318    delete i->second;
01319    m_ClosedSockets.erase(i);
01320 
01321    map<int, CMultiplexer>::iterator m;
01322    m = m_mMultiplexer.find(mid);
01323    if (m == m_mMultiplexer.end())
01324    {
01325       //something is wrong!!!
01326       return;
01327    }
01328 
01329    m->second.m_iRefCount --;
01330    if (0 == m->second.m_iRefCount)
01331    {
01332       m->second.m_pChannel->close();
01333       delete m->second.m_pSndQueue;
01334       delete m->second.m_pRcvQueue;
01335       delete m->second.m_pTimer;
01336       delete m->second.m_pChannel;
01337       m_mMultiplexer.erase(m);
01338    }
01339 }
01340 
01341 void CUDTUnited::setError(CUDTException* e)
01342 {
01343    #ifndef WIN32
01344       delete (CUDTException*)pthread_getspecific(m_TLSError);
01345       pthread_setspecific(m_TLSError, e);
01346    #else
01347       CGuard tg(m_TLSLock);
01348       delete (CUDTException*)TlsGetValue(m_TLSError);
01349       TlsSetValue(m_TLSError, e);
01350       m_mTLSRecord[GetCurrentThreadId()] = e;
01351    #endif
01352 }
01353 
01354 CUDTException* CUDTUnited::getError()
01355 {
01356    #ifndef WIN32
01357       if(NULL == pthread_getspecific(m_TLSError))
01358          pthread_setspecific(m_TLSError, new CUDTException);
01359       return (CUDTException*)pthread_getspecific(m_TLSError);
01360    #else
01361       CGuard tg(m_TLSLock);
01362       if(NULL == TlsGetValue(m_TLSError))
01363       {
01364          CUDTException* e = new CUDTException;
01365          TlsSetValue(m_TLSError, e);
01366          m_mTLSRecord[GetCurrentThreadId()] = e;
01367       }
01368       return (CUDTException*)TlsGetValue(m_TLSError);
01369    #endif
01370 }
01371 
01372 #ifdef WIN32
01373 void CUDTUnited::checkTLSValue()
01374 {
01375    CGuard tg(m_TLSLock);
01376 
01377    vector<DWORD> tbr;
01378    for (map<DWORD, CUDTException*>::iterator i = m_mTLSRecord.begin(); i != m_mTLSRecord.end(); ++ i)
01379    {
01380       HANDLE h = OpenThread(THREAD_QUERY_INFORMATION, FALSE, i->first);
01381       if (NULL == h)
01382       {
01383          tbr.push_back(i->first);
01384          break;
01385       }
01386       if (WAIT_OBJECT_0 == WaitForSingleObject(h, 0))
01387       {
01388          delete i->second;
01389          tbr.push_back(i->first);
01390       }
01391       CloseHandle(h);
01392    }
01393    for (vector<DWORD>::iterator j = tbr.begin(); j != tbr.end(); ++ j)
01394       m_mTLSRecord.erase(*j);
01395 }
01396 #endif
01397 
01398 void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock)
01399 {
01400    CGuard cg(m_ControlLock);
01401 
01402    if ((s->m_pUDT->m_bReuseAddr) && (NULL != addr))
01403    {
01404       int port = (AF_INET == s->m_pUDT->m_iIPversion) ? ntohs(((sockaddr_in*)addr)->sin_port) : ntohs(((sockaddr_in6*)addr)->sin6_port);
01405 
01406       // find a reusable address
01407       for (map<int, CMultiplexer>::iterator i = m_mMultiplexer.begin(); i != m_mMultiplexer.end(); ++ i)
01408       {
01409          if ((i->second.m_iIPversion == s->m_pUDT->m_iIPversion) && (i->second.m_iMSS == s->m_pUDT->m_iMSS) && i->second.m_bReusable)
01410          {
01411             if (i->second.m_iPort == port)
01412             {
01413                // reuse the existing multiplexer
01414                ++ i->second.m_iRefCount;
01415                s->m_pUDT->m_pSndQueue = i->second.m_pSndQueue;
01416                s->m_pUDT->m_pRcvQueue = i->second.m_pRcvQueue;
01417                s->m_iMuxID = i->second.m_iID;
01418                return;
01419             }
01420          }
01421       }
01422    }
01423 
01424    // a new multiplexer is needed
01425    CMultiplexer m;
01426    m.m_iMSS = s->m_pUDT->m_iMSS;
01427    m.m_iIPversion = s->m_pUDT->m_iIPversion;
01428    m.m_iRefCount = 1;
01429    m.m_bReusable = s->m_pUDT->m_bReuseAddr;
01430    m.m_iID = s->m_SocketID;
01431 
01432    m.m_pChannel = new CChannel(s->m_pUDT->m_iIPversion);
01433    m.m_pChannel->setSndBufSize(s->m_pUDT->m_iUDPSndBufSize);
01434    m.m_pChannel->setRcvBufSize(s->m_pUDT->m_iUDPRcvBufSize);
01435 
01436    try
01437    {
01438       if (NULL != udpsock)
01439          m.m_pChannel->open(*udpsock);
01440       else
01441          m.m_pChannel->open(addr);
01442    }
01443    catch (CUDTException& e)
01444    {
01445       m.m_pChannel->close();
01446       delete m.m_pChannel;
01447       throw e;
01448    }
01449 
01450    sockaddr* sa = (AF_INET == s->m_pUDT->m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;
01451    m.m_pChannel->getSockAddr(sa);
01452    m.m_iPort = (AF_INET == s->m_pUDT->m_iIPversion) ? ntohs(((sockaddr_in*)sa)->sin_port) : ntohs(((sockaddr_in6*)sa)->sin6_port);
01453    if (AF_INET == s->m_pUDT->m_iIPversion) delete (sockaddr_in*)sa; else delete (sockaddr_in6*)sa;
01454 
01455    m.m_pTimer = new CTimer;
01456 
01457    m.m_pSndQueue = new CSndQueue;
01458    m.m_pSndQueue->init(m.m_pChannel, m.m_pTimer);
01459    m.m_pRcvQueue = new CRcvQueue;
01460    m.m_pRcvQueue->init(32, s->m_pUDT->m_iPayloadSize, m.m_iIPversion, 1024, m.m_pChannel, m.m_pTimer);
01461 
01462    m_mMultiplexer[m.m_iID] = m;
01463 
01464    s->m_pUDT->m_pSndQueue = m.m_pSndQueue;
01465    s->m_pUDT->m_pRcvQueue = m.m_pRcvQueue;
01466    s->m_iMuxID = m.m_iID;
01467 }
01468 
01469 void CUDTUnited::updateMux(CUDTSocket* s, const CUDTSocket* ls)
01470 {
01471    CGuard cg(m_ControlLock);
01472 
01473    int port = (AF_INET == ls->m_iIPversion) ? ntohs(((sockaddr_in*)ls->m_pSelfAddr)->sin_port) : ntohs(((sockaddr_in6*)ls->m_pSelfAddr)->sin6_port);
01474 
01475    // find the listener's address
01476    for (map<int, CMultiplexer>::iterator i = m_mMultiplexer.begin(); i != m_mMultiplexer.end(); ++ i)
01477    {
01478       if (i->second.m_iPort == port)
01479       {
01480          // reuse the existing multiplexer
01481          ++ i->second.m_iRefCount;
01482          s->m_pUDT->m_pSndQueue = i->second.m_pSndQueue;
01483          s->m_pUDT->m_pRcvQueue = i->second.m_pRcvQueue;
01484          s->m_iMuxID = i->second.m_iID;
01485          return;
01486       }
01487    }
01488 }
01489 
01490 #ifndef WIN32
01491    void* CUDTUnited::garbageCollect(void* p)
01492 #else
01493    DWORD WINAPI CUDTUnited::garbageCollect(LPVOID p)
01494 #endif
01495 {
01496    CUDTUnited* self = (CUDTUnited*)p;
01497 
01498    CGuard gcguard(self->m_GCStopLock);
01499 
01500    while (!self->m_bClosing)
01501    {
01502       self->checkBrokenSockets();
01503 
01504       #ifdef WIN32
01505          self->checkTLSValue();
01506       #endif
01507 
01508       #ifndef WIN32
01509          timeval now;
01510          timespec timeout;
01511          gettimeofday(&now, 0);
01512          timeout.tv_sec = now.tv_sec + 1;
01513          timeout.tv_nsec = now.tv_usec * 1000;
01514 
01515          pthread_cond_timedwait(&self->m_GCStopCond, &self->m_GCStopLock, &timeout);
01516       #else
01517          WaitForSingleObject(self->m_GCStopCond, 1000);
01518       #endif
01519    }
01520 
01521    // remove all sockets and multiplexers
01522    CGuard::enterCS(self->m_ControlLock);
01523    for (map<UDTSOCKET, CUDTSocket*>::iterator i = self->m_Sockets.begin(); i != self->m_Sockets.end(); ++ i)
01524    {
01525       i->second->m_pUDT->m_bBroken = true;
01526       i->second->m_pUDT->close();
01527       i->second->m_Status = CLOSED;
01528       i->second->m_TimeStamp = CTimer::getTime();
01529       self->m_ClosedSockets[i->first] = i->second;
01530 
01531       // remove from listener's queue
01532       map<UDTSOCKET, CUDTSocket*>::iterator ls = self->m_Sockets.find(i->second->m_ListenSocket);
01533       if (ls == self->m_Sockets.end())
01534       {
01535          ls = self->m_ClosedSockets.find(i->second->m_ListenSocket);
01536          if (ls == self->m_ClosedSockets.end())
01537             continue;
01538       }
01539 
01540       CGuard::enterCS(ls->second->m_AcceptLock);
01541       ls->second->m_pQueuedSockets->erase(i->second->m_SocketID);
01542       ls->second->m_pAcceptSockets->erase(i->second->m_SocketID);
01543       CGuard::leaveCS(ls->second->m_AcceptLock);
01544    }
01545    self->m_Sockets.clear();
01546 
01547    for (map<UDTSOCKET, CUDTSocket*>::iterator j = self->m_ClosedSockets.begin(); j != self->m_ClosedSockets.end(); ++ j)
01548    {
01549       j->second->m_TimeStamp = 0;
01550    }
01551    CGuard::leaveCS(self->m_ControlLock);
01552 
01553    while (true)
01554    {
01555       self->checkBrokenSockets();
01556 
01557       CGuard::enterCS(self->m_ControlLock);
01558       bool empty = self->m_ClosedSockets.empty();
01559       CGuard::leaveCS(self->m_ControlLock);
01560 
01561       if (empty)
01562          break;
01563 
01564       CTimer::sleep();
01565    }
01566 
01567    #ifndef WIN32
01568       return NULL;
01569    #else
01570       return 0;
01571    #endif
01572 }
01573 
01575 
01576 int CUDT::startup()
01577 {
01578    return s_UDTUnited.startup();
01579 }
01580 
01581 int CUDT::cleanup()
01582 {
01583    return s_UDTUnited.cleanup();
01584 }
01585 
01586 UDTSOCKET CUDT::socket(int af, int type, int)
01587 {
01588    if (!s_UDTUnited.m_bGCStatus)
01589       s_UDTUnited.startup();
01590 
01591    try
01592    {
01593       return s_UDTUnited.newSocket(af, type);
01594    }
01595    catch (CUDTException& e)
01596    {
01597       s_UDTUnited.setError(new CUDTException(e));
01598       return INVALID_SOCK;
01599    }
01600    catch (bad_alloc&)
01601    {
01602       s_UDTUnited.setError(new CUDTException(3, 2, 0));
01603       return INVALID_SOCK;
01604    }
01605    catch (...)
01606    {
01607       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
01608       return INVALID_SOCK;
01609    }
01610 }
01611 
01612 int CUDT::bind(UDTSOCKET u, const sockaddr* name, int namelen)
01613 {
01614    try
01615    {
01616       return s_UDTUnited.bind(u, name, namelen);
01617    }
01618    catch (CUDTException& e)
01619    {
01620       s_UDTUnited.setError(new CUDTException(e));
01621       return ERROR;
01622    }
01623    catch (bad_alloc&)
01624    {
01625       s_UDTUnited.setError(new CUDTException(3, 2, 0));
01626       return ERROR;
01627    }
01628    catch (...)
01629    {
01630       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
01631       return ERROR;
01632    }
01633 }
01634 
01635 int CUDT::bind(UDTSOCKET u, UDPSOCKET udpsock)
01636 {
01637    try
01638    {
01639       return s_UDTUnited.bind(u, udpsock);
01640    }
01641    catch (CUDTException& e)
01642    {
01643       s_UDTUnited.setError(new CUDTException(e));
01644       return ERROR;
01645    }
01646    catch (bad_alloc&)
01647    {
01648       s_UDTUnited.setError(new CUDTException(3, 2, 0));
01649       return ERROR;
01650    }
01651    catch (...)
01652    {
01653       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
01654       return ERROR;
01655    }
01656 }
01657 
01658 int CUDT::listen(UDTSOCKET u, int backlog)
01659 {
01660    try
01661    {
01662       return s_UDTUnited.listen(u, backlog);
01663    }
01664    catch (CUDTException& e)
01665    {
01666       s_UDTUnited.setError(new CUDTException(e));
01667       return ERROR;
01668    }
01669    catch (bad_alloc&)
01670    {
01671       s_UDTUnited.setError(new CUDTException(3, 2, 0));
01672       return ERROR;
01673    }
01674    catch (...)
01675    {
01676       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
01677       return ERROR;
01678    }
01679 }
01680 
01681 UDTSOCKET CUDT::accept(UDTSOCKET u, sockaddr* addr, int* addrlen)
01682 {
01683    try
01684    {
01685       return s_UDTUnited.accept(u, addr, addrlen);
01686    }
01687    catch (CUDTException& e)
01688    {
01689       s_UDTUnited.setError(new CUDTException(e));
01690       return INVALID_SOCK;
01691    }
01692    catch (...)
01693    {
01694       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
01695       return INVALID_SOCK;
01696    }
01697 }
01698 
01699 int CUDT::connect(UDTSOCKET u, const sockaddr* name, int namelen)
01700 {
01701    try
01702    {
01703       return s_UDTUnited.connect(u, name, namelen);
01704    }
01705    catch (CUDTException e)
01706    {
01707       s_UDTUnited.setError(new CUDTException(e));
01708       return ERROR;
01709    }
01710    catch (bad_alloc&)
01711    {
01712       s_UDTUnited.setError(new CUDTException(3, 2, 0));
01713       return ERROR;
01714    }
01715    catch (...)
01716    {
01717       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
01718       return ERROR;
01719    }
01720 }
01721 
01722 int CUDT::close(UDTSOCKET u)
01723 {
01724    try
01725    {
01726       return s_UDTUnited.close(u);
01727    }
01728    catch (CUDTException e)
01729    {
01730       s_UDTUnited.setError(new CUDTException(e));
01731       return ERROR;
01732    }
01733    catch (...)
01734    {
01735       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
01736       return ERROR;
01737    }
01738 }
01739 
01740 int CUDT::getpeername(UDTSOCKET u, sockaddr* name, int* namelen)
01741 {
01742    try
01743    {
01744       return s_UDTUnited.getpeername(u, name, namelen);
01745    }
01746    catch (CUDTException e)
01747    {
01748       s_UDTUnited.setError(new CUDTException(e));
01749       return ERROR;
01750    }
01751    catch (...)
01752    {
01753       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
01754       return ERROR;
01755    }
01756 }
01757 
01758 int CUDT::getsockname(UDTSOCKET u, sockaddr* name, int* namelen)
01759 {
01760    try
01761    {
01762       return s_UDTUnited.getsockname(u, name, namelen);;
01763    }
01764    catch (CUDTException e)
01765    {
01766       s_UDTUnited.setError(new CUDTException(e));
01767       return ERROR;
01768    }
01769    catch (...)
01770    {
01771       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
01772       return ERROR;
01773    }
01774 }
01775 
01776 int CUDT::getsockopt(UDTSOCKET u, int, UDTOpt optname, void* optval, int* optlen)
01777 {
01778    try
01779    {
01780       CUDT* udt = s_UDTUnited.lookup(u);
01781       udt->getOpt(optname, optval, *optlen);
01782       return 0;
01783    }
01784    catch (CUDTException e)
01785    {
01786       s_UDTUnited.setError(new CUDTException(e));
01787       return ERROR;
01788    }
01789    catch (...)
01790    {
01791       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
01792       return ERROR;
01793    }
01794 }
01795 
01796 int CUDT::setsockopt(UDTSOCKET u, int, UDTOpt optname, const void* optval, int optlen)
01797 {
01798    try
01799    {
01800       CUDT* udt = s_UDTUnited.lookup(u);
01801       udt->setOpt(optname, optval, optlen);
01802       return 0;
01803    }
01804    catch (CUDTException e)
01805    {
01806       s_UDTUnited.setError(new CUDTException(e));
01807       return ERROR;
01808    }
01809    catch (...)
01810    {
01811       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
01812       return ERROR;
01813    }
01814 }
01815 
01816 int CUDT::send(UDTSOCKET u, const char* buf, int len, int)
01817 {
01818    try
01819    {
01820       CUDT* udt = s_UDTUnited.lookup(u);
01821       return udt->send(buf, len);
01822    }
01823    catch (CUDTException e)
01824    {
01825       s_UDTUnited.setError(new CUDTException(e));
01826       return ERROR;
01827    }
01828    catch (bad_alloc&)
01829    {
01830       s_UDTUnited.setError(new CUDTException(3, 2, 0));
01831       return ERROR;
01832    }
01833    catch (...)
01834    {
01835       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
01836       return ERROR;
01837    }
01838 }
01839 
01840 int CUDT::recv(UDTSOCKET u, char* buf, int len, int)
01841 {
01842    try
01843    {
01844       CUDT* udt = s_UDTUnited.lookup(u);
01845       return udt->recv(buf, len);
01846    }
01847    catch (CUDTException e)
01848    {
01849       s_UDTUnited.setError(new CUDTException(e));
01850       return ERROR;
01851    }
01852    catch (...)
01853    {
01854       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
01855       return ERROR;
01856    }
01857 }
01858 
01859 int CUDT::sendmsg(UDTSOCKET u, const char* buf, int len, int ttl, bool inorder)
01860 {
01861    try
01862    {
01863       CUDT* udt = s_UDTUnited.lookup(u);
01864       return udt->sendmsg(buf, len, ttl, inorder);
01865    }
01866    catch (CUDTException e)
01867    {
01868       s_UDTUnited.setError(new CUDTException(e));
01869       return ERROR;
01870    }
01871    catch (bad_alloc&)
01872    {
01873       s_UDTUnited.setError(new CUDTException(3, 2, 0));
01874       return ERROR;
01875    }
01876    catch (...)
01877    {
01878       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
01879       return ERROR;
01880    }
01881 }
01882 
01883 int CUDT::recvmsg(UDTSOCKET u, char* buf, int len)
01884 {
01885    try
01886    {
01887       CUDT* udt = s_UDTUnited.lookup(u);
01888       return udt->recvmsg(buf, len);
01889    }
01890    catch (CUDTException e)
01891    {
01892       s_UDTUnited.setError(new CUDTException(e));
01893       return ERROR;
01894    }
01895    catch (...)
01896    {
01897       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
01898       return ERROR;
01899    }
01900 }
01901 
01902 int64_t CUDT::sendfile(UDTSOCKET u, fstream& ifs, int64_t& offset, int64_t size, int block)
01903 {
01904    try
01905    {
01906       CUDT* udt = s_UDTUnited.lookup(u);
01907       return udt->sendfile(ifs, offset, size, block);
01908    }
01909    catch (CUDTException e)
01910    {
01911       s_UDTUnited.setError(new CUDTException(e));
01912       return ERROR;
01913    }
01914    catch (bad_alloc&)
01915    {
01916       s_UDTUnited.setError(new CUDTException(3, 2, 0));
01917       return ERROR;
01918    }
01919    catch (...)
01920    {
01921       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
01922       return ERROR;
01923    }
01924 }
01925 
01926 int64_t CUDT::recvfile(UDTSOCKET u, fstream& ofs, int64_t& offset, int64_t size, int block)
01927 {
01928    try
01929    {
01930       CUDT* udt = s_UDTUnited.lookup(u);
01931       return udt->recvfile(ofs, offset, size, block);
01932    }
01933    catch (CUDTException e)
01934    {
01935       s_UDTUnited.setError(new CUDTException(e));
01936       return ERROR;
01937    }
01938    catch (...)
01939    {
01940       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
01941       return ERROR;
01942    }
01943 }
01944 
01945 int CUDT::select(int, ud_set* readfds, ud_set* writefds, ud_set* exceptfds, const timeval* timeout)
01946 {
01947    if ((NULL == readfds) && (NULL == writefds) && (NULL == exceptfds))
01948    {
01949       s_UDTUnited.setError(new CUDTException(5, 3, 0));
01950       return ERROR;
01951    }
01952 
01953    try
01954    {
01955       return s_UDTUnited.select(readfds, writefds, exceptfds, timeout);
01956    }
01957    catch (CUDTException e)
01958    {
01959       s_UDTUnited.setError(new CUDTException(e));
01960       return ERROR;
01961    }
01962    catch (bad_alloc&)
01963    {
01964       s_UDTUnited.setError(new CUDTException(3, 2, 0));
01965       return ERROR;
01966    }
01967    catch (...)
01968    {
01969       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
01970       return ERROR;
01971    }
01972 }
01973 
01974 int CUDT::selectEx(const vector<UDTSOCKET>& fds, vector<UDTSOCKET>* readfds, vector<UDTSOCKET>* writefds, vector<UDTSOCKET>* exceptfds, int64_t msTimeOut)
01975 {
01976    if ((NULL == readfds) && (NULL == writefds) && (NULL == exceptfds))
01977    {
01978       s_UDTUnited.setError(new CUDTException(5, 3, 0));
01979       return ERROR;
01980    }
01981 
01982    try
01983    {
01984       return s_UDTUnited.selectEx(fds, readfds, writefds, exceptfds, msTimeOut);
01985    }
01986    catch (CUDTException e)
01987    {
01988       s_UDTUnited.setError(new CUDTException(e));
01989       return ERROR;
01990    }
01991    catch (bad_alloc&)
01992    {
01993       s_UDTUnited.setError(new CUDTException(3, 2, 0));
01994       return ERROR;
01995    }
01996    catch (...)
01997    {
01998       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
01999       return ERROR;
02000    }
02001 }
02002 
02003 int CUDT::epoll_create()
02004 {
02005    try
02006    {
02007       return s_UDTUnited.epoll_create();
02008    }
02009    catch (CUDTException e)
02010    {
02011       s_UDTUnited.setError(new CUDTException(e));
02012       return ERROR;
02013    }
02014    catch (...)
02015    {
02016       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
02017       return ERROR;
02018    }
02019 }
02020 
02021 int CUDT::epoll_add_usock(const int eid, const UDTSOCKET u, const int* events)
02022 {
02023    try
02024    {
02025       return s_UDTUnited.epoll_add_usock(eid, u, events);
02026    }
02027    catch (CUDTException e)
02028    {
02029       s_UDTUnited.setError(new CUDTException(e));
02030       return ERROR;
02031    }
02032    catch (...)
02033    {
02034       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
02035       return ERROR;
02036    }
02037 }
02038 
02039 // BARCHART
02040 int CUDT::epoll_update_usock(const int eid, const UDTSOCKET u, const int* events)
02041 {
02042    try
02043    {
02044       return s_UDTUnited.epoll_update_usock(eid, u, events);
02045    }
02046    catch (CUDTException e)
02047    {
02048       s_UDTUnited.setError(new CUDTException(e));
02049       return ERROR;
02050    }
02051    catch (...)
02052    {
02053       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
02054       return ERROR;
02055    }
02056 }
02057 
02058 // BARCHART
02059 int CUDT::epoll_verify_usock(const int eid, const UDTSOCKET u, int* events)
02060 {
02061    try
02062    {
02063       return s_UDTUnited.epoll_verify_usock(eid, u, events);
02064    }
02065    catch (CUDTException e)
02066    {
02067       s_UDTUnited.setError(new CUDTException(e));
02068       return ERROR;
02069    }
02070    catch (...)
02071    {
02072       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
02073       return ERROR;
02074    }
02075 }
02076 
02077 int CUDT::epoll_add_ssock(const int eid, const SYSSOCKET s, const int* events)
02078 {
02079    try
02080    {
02081       return s_UDTUnited.epoll_add_ssock(eid, s, events);
02082    }
02083    catch (CUDTException e)
02084    {
02085       s_UDTUnited.setError(new CUDTException(e));
02086       return ERROR;
02087    }
02088    catch (...)
02089    {
02090       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
02091       return ERROR;
02092    }
02093 }
02094 
02095 int CUDT::epoll_remove_usock(const int eid, const UDTSOCKET u)
02096 {
02097    try
02098    {
02099       return s_UDTUnited.epoll_remove_usock(eid, u);
02100    }
02101    catch (CUDTException e)
02102    {
02103       s_UDTUnited.setError(new CUDTException(e));
02104       return ERROR;
02105    }
02106    catch (...)
02107    {
02108       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
02109       return ERROR;
02110    }
02111 }
02112 
02113 int CUDT::epoll_remove_ssock(const int eid, const SYSSOCKET s)
02114 {
02115    try
02116    {
02117       return s_UDTUnited.epoll_remove_ssock(eid, s);
02118    }
02119    catch (CUDTException e)
02120    {
02121       s_UDTUnited.setError(new CUDTException(e));
02122       return ERROR;
02123    }
02124    catch (...)
02125    {
02126       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
02127       return ERROR;
02128    }
02129 }
02130 
02131 int CUDT::epoll_wait(const int eid, set<UDTSOCKET>* readfds, set<UDTSOCKET>* writefds, int64_t msTimeOut, set<SYSSOCKET>* lrfds, set<SYSSOCKET>* lwfds)
02132 {
02133    try
02134    {
02135       return s_UDTUnited.epoll_wait(eid, readfds, writefds, msTimeOut, lrfds, lwfds);
02136    }
02137    catch (CUDTException e)
02138    {
02139       s_UDTUnited.setError(new CUDTException(e));
02140       return ERROR;
02141    }
02142    catch (...)
02143    {
02144       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
02145       return ERROR;
02146    }
02147 }
02148 
02149 int CUDT::epoll_release(const int eid)
02150 {
02151    try
02152    {
02153       return s_UDTUnited.epoll_release(eid);
02154    }
02155    catch (CUDTException e)
02156    {
02157       s_UDTUnited.setError(new CUDTException(e));
02158       return ERROR;
02159    }
02160    catch (...)
02161    {
02162       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
02163       return ERROR;
02164    }
02165 }
02166 
02167 CUDTException& CUDT::getlasterror()
02168 {
02169    return *s_UDTUnited.getError();
02170 }
02171 
02172 int CUDT::perfmon(UDTSOCKET u, CPerfMon* perf, bool clear)
02173 {
02174    try
02175    {
02176       CUDT* udt = s_UDTUnited.lookup(u);
02177       udt->sample(perf, clear);
02178       return 0;
02179    }
02180    catch (CUDTException e)
02181    {
02182       s_UDTUnited.setError(new CUDTException(e));
02183       return ERROR;
02184    }
02185    catch (...)
02186    {
02187       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
02188       return ERROR;
02189    }
02190 }
02191 
02192 CUDT* CUDT::getUDTHandle(UDTSOCKET u)
02193 {
02194    try
02195    {
02196       return s_UDTUnited.lookup(u);
02197    }
02198    catch (...)
02199    {
02200       return NULL;
02201    }
02202 }
02203 
02204 UDTSTATUS CUDT::getsockstate(UDTSOCKET u)
02205 {
02206    try
02207    {
02208       return s_UDTUnited.getStatus(u);
02209    }
02210    catch (...)
02211    {
02212       s_UDTUnited.setError(new CUDTException(-1, 0, 0));
02213       return NONEXIST;
02214    }
02215 }
02216 
02217 
02219 
02220 namespace UDT
02221 {
02222 
02223 int startup()
02224 {
02225    return CUDT::startup();
02226 }
02227 
02228 int cleanup()
02229 {
02230    return CUDT::cleanup();
02231 }
02232 
02233 UDTSOCKET socket(int af, int type, int protocol)
02234 {
02235    return CUDT::socket(af, type, protocol);
02236 }
02237 
02238 int bind(UDTSOCKET u, const struct sockaddr* name, int namelen)
02239 {
02240    return CUDT::bind(u, name, namelen);
02241 }
02242 
02243 int bind2(UDTSOCKET u, UDPSOCKET udpsock)
02244 {
02245    return CUDT::bind(u, udpsock);
02246 }
02247 
02248 int listen(UDTSOCKET u, int backlog)
02249 {
02250    return CUDT::listen(u, backlog);
02251 }
02252 
02253 UDTSOCKET accept(UDTSOCKET u, struct sockaddr* addr, int* addrlen)
02254 {
02255    return CUDT::accept(u, addr, addrlen);
02256 }
02257 
02258 int connect(UDTSOCKET u, const struct sockaddr* name, int namelen)
02259 {
02260    return CUDT::connect(u, name, namelen);
02261 }
02262 
02263 int close(UDTSOCKET u)
02264 {
02265    return CUDT::close(u);
02266 }
02267 
02268 int getpeername(UDTSOCKET u, struct sockaddr* name, int* namelen)
02269 {
02270    return CUDT::getpeername(u, name, namelen);
02271 }
02272 
02273 int getsockname(UDTSOCKET u, struct sockaddr* name, int* namelen)
02274 {
02275    return CUDT::getsockname(u, name, namelen);
02276 }
02277 
02278 int getsockopt(UDTSOCKET u, int level, SOCKOPT optname, void* optval, int* optlen)
02279 {
02280    return CUDT::getsockopt(u, level, optname, optval, optlen);
02281 }
02282 
02283 int setsockopt(UDTSOCKET u, int level, SOCKOPT optname, const void* optval, int optlen)
02284 {
02285    return CUDT::setsockopt(u, level, optname, optval, optlen);
02286 }
02287 
02288 int send(UDTSOCKET u, const char* buf, int len, int flags)
02289 {
02290    return CUDT::send(u, buf, len, flags);
02291 }
02292 
02293 int recv(UDTSOCKET u, char* buf, int len, int flags)
02294 {
02295    return CUDT::recv(u, buf, len, flags);
02296 }
02297 
02298 int sendmsg(UDTSOCKET u, const char* buf, int len, int ttl, bool inorder)
02299 {
02300    return CUDT::sendmsg(u, buf, len, ttl, inorder);
02301 }
02302 
02303 int recvmsg(UDTSOCKET u, char* buf, int len)
02304 {
02305    return CUDT::recvmsg(u, buf, len);
02306 }
02307 
02308 int64_t sendfile(UDTSOCKET u, fstream& ifs, int64_t& offset, int64_t size, int block)
02309 {
02310    return CUDT::sendfile(u, ifs, offset, size, block);
02311 }
02312 
02313 int64_t recvfile(UDTSOCKET u, fstream& ofs, int64_t& offset, int64_t size, int block)
02314 {
02315    return CUDT::recvfile(u, ofs, offset, size, block);
02316 }
02317 
02318 int64_t sendfile2(UDTSOCKET u, const char* path, int64_t* offset, int64_t size, int block)
02319 {
02320    fstream ifs(path, ios::binary | ios::in);
02321    int64_t ret = CUDT::sendfile(u, ifs, *offset, size, block);
02322    ifs.close();
02323    return ret;
02324 }
02325 
02326 int64_t recvfile2(UDTSOCKET u, const char* path, int64_t* offset, int64_t size, int block)
02327 {
02328    fstream ofs(path, ios::binary | ios::out);
02329    int64_t ret = CUDT::recvfile(u, ofs, *offset, size, block);
02330    ofs.close();
02331    return ret;
02332 }
02333 
02334 int select(int nfds, UDSET* readfds, UDSET* writefds, UDSET* exceptfds, const struct timeval* timeout)
02335 {
02336    return CUDT::select(nfds, readfds, writefds, exceptfds, timeout);
02337 }
02338 
02339 int selectEx(const vector<UDTSOCKET>& fds, vector<UDTSOCKET>* readfds, vector<UDTSOCKET>* writefds, vector<UDTSOCKET>* exceptfds, int64_t msTimeOut)
02340 {
02341    return CUDT::selectEx(fds, readfds, writefds, exceptfds, msTimeOut);
02342 }
02343 
02344 int epoll_create()
02345 {
02346    return CUDT::epoll_create();
02347 }
02348 
02349 int epoll_add_usock(int eid, UDTSOCKET u, const int* events)
02350 {
02351    return CUDT::epoll_add_usock(eid, u, events);
02352 }
02353 
02354 // BARCHART
02355 int epoll_update_usock(int eid, UDTSOCKET u, const int* events)
02356 {
02357    return CUDT::epoll_update_usock(eid, u, events);
02358 }
02359 
02360 // BARCHART
02361 int epoll_verify_usock(int eid, UDTSOCKET u, int* events)
02362 {
02363    return CUDT::epoll_verify_usock(eid, u, events);
02364 }
02365 
02366 int epoll_add_ssock(int eid, SYSSOCKET s, const int* events)
02367 {
02368    return CUDT::epoll_add_ssock(eid, s, events);
02369 }
02370 
02371 int epoll_remove_usock(int eid, UDTSOCKET u)
02372 {
02373    return CUDT::epoll_remove_usock(eid, u);
02374 }
02375 
02376 int epoll_remove_ssock(int eid, SYSSOCKET s)
02377 {
02378    return CUDT::epoll_remove_ssock(eid, s);
02379 }
02380 
02381 int epoll_wait(int eid, set<UDTSOCKET>* readfds, set<UDTSOCKET>* writefds, int64_t msTimeOut, set<SYSSOCKET>* lrfds, set<SYSSOCKET>* lwfds)
02382 {
02383    return CUDT::epoll_wait(eid, readfds, writefds, msTimeOut, lrfds, lwfds);
02384 }
02385 
02386 #define SET_RESULT(val, num, fds, it) \
02387    if ((val != NULL) && !val->empty()) \
02388    { \
02389       if (*num > static_cast<int>(val->size())) \
02390          *num = val->size(); \
02391       int count = 0; \
02392       for (it = val->begin(); it != val->end(); ++ it) \
02393       { \
02394          if (count >= *num) \
02395             break; \
02396          fds[count ++] = *it; \
02397       } \
02398    }
02399 int epoll_wait2(int eid, UDTSOCKET* readfds, int* rnum, UDTSOCKET* writefds, int* wnum, int64_t msTimeOut,
02400                 SYSSOCKET* lrfds, int* lrnum, SYSSOCKET* lwfds, int* lwnum)
02401 {
02402    // This API is an alternative format for epoll_wait, created for compatability with other languages.
02403    // Users need to pass in an array for holding the returned sockets, with the maximum array length
02404    // stored in *rnum, etc., which will be updated with returned number of sockets.
02405 
02406    set<UDTSOCKET> readset;
02407    set<UDTSOCKET> writeset;
02408    set<SYSSOCKET> lrset;
02409    set<SYSSOCKET> lwset;
02410    set<UDTSOCKET>* rval = NULL;
02411    set<UDTSOCKET>* wval = NULL;
02412    set<SYSSOCKET>* lrval = NULL;
02413    set<SYSSOCKET>* lwval = NULL;
02414    if ((readfds != NULL) && (rnum != NULL))
02415       rval = &readset;
02416    if ((writefds != NULL) && (wnum != NULL))
02417       wval = &writeset;
02418    if ((lrfds != NULL) && (lrnum != NULL))
02419       lrval = &lrset;
02420    if ((lwfds != NULL) && (lwnum != NULL))
02421       lwval = &lwset;
02422 
02423    int ret = CUDT::epoll_wait(eid, rval, wval, msTimeOut, lrval, lwval);
02424    if (ret > 0)
02425    {
02426       set<UDTSOCKET>::const_iterator i;
02427       SET_RESULT(rval, rnum, readfds, i);
02428       SET_RESULT(wval, wnum, writefds, i);
02429       set<SYSSOCKET>::const_iterator j;
02430       SET_RESULT(lrval, lrnum, lrfds, j);
02431       SET_RESULT(lwval, lwnum, lwfds, j);
02432    }
02433    return ret;
02434 }
02435 
02436 int epoll_release(int eid)
02437 {
02438    return CUDT::epoll_release(eid);
02439 }
02440 
02441 ERRORINFO& getlasterror()
02442 {
02443    return CUDT::getlasterror();
02444 }
02445 
02446 int getlasterror_code()
02447 {
02448    return CUDT::getlasterror().getErrorCode();
02449 }
02450 
02451 const char* getlasterror_desc()
02452 {
02453    return CUDT::getlasterror().getErrorMessage();
02454 }
02455 
02456 int perfmon(UDTSOCKET u, TRACEINFO* perf, bool clear)
02457 {
02458    return CUDT::perfmon(u, perf, clear);
02459 }
02460 
02461 UDTSTATUS getsockstate(UDTSOCKET u)
02462 {
02463    return CUDT::getsockstate(u);
02464 }
02465 
02466 }  // namespace UDT

Generated on 9 Feb 2013 for barchart-udt-core-2.2.2 by  doxygen 1.6.1