queue.h

00001 /*****************************************************************************
00002 Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois.
00003 All rights reserved.
00004 
00005 Redistribution and use in source and binary forms, with or without
00006 modification, are permitted provided that the following conditions are
00007 met:
00008 
00009 * Redistributions of source code must retain the above
00010   copyright notice, this list of conditions and the
00011   following disclaimer.
00012 
00013 * Redistributions in binary form must reproduce the
00014   above copyright notice, this list of conditions
00015   and the following disclaimer in the documentation
00016   and/or other materials provided with the distribution.
00017 
00018 * Neither the name of the University of Illinois
00019   nor the names of its contributors may be used to
00020   endorse or promote products derived from this
00021   software without specific prior written permission.
00022 
00023 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00024 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
00025 THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
00026 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
00027 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00028 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00029 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00030 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00031 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00032 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00033 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00034 *****************************************************************************/
00035 
00036 /*****************************************************************************
00037 written by
00038    Yunhong Gu, last updated 01/12/2011
00039 *****************************************************************************/
00040 
00041 
00042 #ifndef __UDT_QUEUE_H__
00043 #define __UDT_QUEUE_H__
00044 
00045 #include "channel.h"
00046 #include "common.h"
00047 #include "packet.h"
00048 #include <list>
00049 #include <map>
00050 #include <queue>
00051 #include <vector>
00052 
00053 class CUDT;
00054 
00055 struct CUnit
00056 {
00057    CPacket m_Packet;            // packet
00058    int m_iFlag;                 // 0: free, 1: occupied, 2: msg read but not freed (out-of-order), 3: msg dropped
00059 };
00060 
00061 class CUnitQueue
00062 {
00063 friend class CRcvQueue;
00064 friend class CRcvBuffer;
00065 
00066 public:
00067    CUnitQueue();
00068    ~CUnitQueue();
00069 
00070 public:
00071 
00072       // Functionality:
00073       //    Initialize the unit queue.
00074       // Parameters:
00075       //    1) [in] size: queue size
00076       //    2) [in] mss: maximum segament size
00077       //    3) [in] version: IP version
00078       // Returned value:
00079       //    0: success, -1: failure.
00080 
00081    int init(int size, int mss, int version);
00082 
00083       // Functionality:
00084       //    Increase (double) the unit queue size.
00085       // Parameters:
00086       //    None.
00087       // Returned value:
00088       //    0: success, -1: failure.
00089 
00090    int increase();
00091 
00092       // Functionality:
00093       //    Decrease (halve) the unit queue size.
00094       // Parameters:
00095       //    None.
00096       // Returned value:
00097       //    0: success, -1: failure.
00098 
00099    int shrink();
00100 
00101       // Functionality:
00102       //    find an available unit for incoming packet.
00103       // Parameters:
00104       //    None.
00105       // Returned value:
00106       //    Pointer to the available unit, NULL if not found.
00107 
00108    CUnit* getNextAvailUnit();
00109 
00110 private:
00111    struct CQEntry
00112    {
00113       CUnit* m_pUnit;           // unit queue
00114       char* m_pBuffer;          // data buffer
00115       int m_iSize;              // size of each queue
00116 
00117       CQEntry* m_pNext;
00118    }
00119    *m_pQEntry,                  // pointer to the first unit queue
00120    *m_pCurrQueue,               // pointer to the current available queue
00121    *m_pLastQueue;               // pointer to the last unit queue
00122 
00123    CUnit* m_pAvailUnit;         // recent available unit
00124 
00125    int m_iSize;                 // total size of the unit queue, in number of packets
00126    int m_iCount;                // total number of valid packets in the queue
00127 
00128    int m_iMSS;                  // unit buffer size
00129    int m_iIPversion;            // IP version
00130 
00131 private:
00132    CUnitQueue(const CUnitQueue&);
00133    CUnitQueue& operator=(const CUnitQueue&);
00134 };
00135 
00136 struct CSNode
00137 {
00138    CUDT* m_pUDT;                // Pointer to the instance of CUDT socket
00139    uint64_t m_llTimeStamp;      // Time Stamp
00140 
00141    int m_iHeapLoc;              // location on the heap, -1 means not on the heap
00142 };
00143 
00144 class CSndUList
00145 {
00146 friend class CSndQueue;
00147 
00148 public:
00149    CSndUList();
00150    ~CSndUList();
00151 
00152 public:
00153 
00154       // Functionality:
00155       //    Insert a new UDT instance into the list.
00156       // Parameters:
00157       //    1) [in] ts: time stamp: next processing time
00158       //    2) [in] u: pointer to the UDT instance
00159       // Returned value:
00160       //    None.
00161 
00162    void insert(int64_t ts, const CUDT* u);
00163 
00164       // Functionality:
00165       //    Update the timestamp of the UDT instance on the list.
00166       // Parameters:
00167       //    1) [in] u: pointer to the UDT instance
00168       //    2) [in] resechedule: if the timestampe shoudl be rescheduled
00169       // Returned value:
00170       //    None.
00171 
00172    void update(const CUDT* u, bool reschedule = true);
00173 
00174       // Functionality:
00175       //    Retrieve the next packet and peer address from the first entry, and reschedule it in the queue.
00176       // Parameters:
00177       //    0) [out] addr: destination address of the next packet
00178       //    1) [out] pkt: the next packet to be sent
00179       // Returned value:
00180       //    1 if successfully retrieved, -1 if no packet found.
00181 
00182    int pop(sockaddr*& addr, CPacket& pkt);
00183 
00184       // Functionality:
00185       //    Remove UDT instance from the list.
00186       // Parameters:
00187       //    1) [in] u: pointer to the UDT instance
00188       // Returned value:
00189       //    None.
00190 
00191    void remove(const CUDT* u);
00192 
00193       // Functionality:
00194       //    Retrieve the next scheduled processing time.
00195       // Parameters:
00196       //    None.
00197       // Returned value:
00198       //    Scheduled processing time of the first UDT socket in the list.
00199 
00200    uint64_t getNextProcTime();
00201 
00202 private:
00203    void insert_(int64_t ts, const CUDT* u);
00204    void remove_(const CUDT* u);
00205 
00206 private:
00207    CSNode** m_pHeap;                    // The heap array
00208    int m_iArrayLength;                  // physical length of the array
00209    int m_iLastEntry;                    // position of last entry on the heap array
00210 
00211    pthread_mutex_t m_ListLock;
00212 
00213    pthread_mutex_t* m_pWindowLock;
00214    pthread_cond_t* m_pWindowCond;
00215 
00216    CTimer* m_pTimer;
00217 
00218 private:
00219    CSndUList(const CSndUList&);
00220    CSndUList& operator=(const CSndUList&);
00221 };
00222 
00223 struct CRNode
00224 {
00225    CUDT* m_pUDT;                // Pointer to the instance of CUDT socket
00226    uint64_t m_llTimeStamp;      // Time Stamp
00227 
00228    CRNode* m_pPrev;             // previous link
00229    CRNode* m_pNext;             // next link
00230 
00231    bool m_bOnList;              // if the node is already on the list
00232 };
00233 
00234 class CRcvUList
00235 {
00236 public:
00237    CRcvUList();
00238    ~CRcvUList();
00239 
00240 public:
00241 
00242       // Functionality:
00243       //    Insert a new UDT instance to the list.
00244       // Parameters:
00245       //    1) [in] u: pointer to the UDT instance
00246       // Returned value:
00247       //    None.
00248 
00249    void insert(const CUDT* u);
00250 
00251       // Functionality:
00252       //    Remove the UDT instance from the list.
00253       // Parameters:
00254       //    1) [in] u: pointer to the UDT instance
00255       // Returned value:
00256       //    None.
00257 
00258    void remove(const CUDT* u);
00259 
00260       // Functionality:
00261       //    Move the UDT instance to the end of the list, if it already exists; otherwise, do nothing.
00262       // Parameters:
00263       //    1) [in] u: pointer to the UDT instance
00264       // Returned value:
00265       //    None.
00266 
00267    void update(const CUDT* u);
00268 
00269 public:
00270    CRNode* m_pUList;            // the head node
00271 
00272 private:
00273    CRNode* m_pLast;             // the last node
00274 
00275 private:
00276    CRcvUList(const CRcvUList&);
00277    CRcvUList& operator=(const CRcvUList&);
00278 };
00279 
00280 class CHash
00281 {
00282 public:
00283    CHash();
00284    ~CHash();
00285 
00286 public:
00287 
00288       // Functionality:
00289       //    Initialize the hash table.
00290       // Parameters:
00291       //    1) [in] size: hash table size
00292       // Returned value:
00293       //    None.
00294 
00295    void init(int size);
00296 
00297       // Functionality:
00298       //    Look for a UDT instance from the hash table.
00299       // Parameters:
00300       //    1) [in] id: socket ID
00301       // Returned value:
00302       //    Pointer to a UDT instance, or NULL if not found.
00303 
00304    CUDT* lookup(int32_t id);
00305 
00306       // Functionality:
00307       //    Insert an entry to the hash table.
00308       // Parameters:
00309       //    1) [in] id: socket ID
00310       //    2) [in] u: pointer to the UDT instance
00311       // Returned value:
00312       //    None.
00313 
00314    void insert(int32_t id, CUDT* u);
00315 
00316       // Functionality:
00317       //    Remove an entry from the hash table.
00318       // Parameters:
00319       //    1) [in] id: socket ID
00320       // Returned value:
00321       //    None.
00322 
00323    void remove(int32_t id);
00324 
00325 private:
00326    struct CBucket
00327    {
00328       int32_t m_iID;            // Socket ID
00329       CUDT* m_pUDT;             // Socket instance
00330 
00331       CBucket* m_pNext;         // next bucket
00332    } **m_pBucket;               // list of buckets (the hash table)
00333 
00334    int m_iHashSize;             // size of hash table
00335 
00336 private:
00337    CHash(const CHash&);
00338    CHash& operator=(const CHash&);
00339 };
00340 
00341 class CRendezvousQueue
00342 {
00343 public:
00344    CRendezvousQueue();
00345    ~CRendezvousQueue();
00346 
00347 public:
00348    void insert(const UDTSOCKET& id, CUDT* u, int ipv, const sockaddr* addr, uint64_t ttl);
00349    void remove(const UDTSOCKET& id);
00350    CUDT* retrieve(const sockaddr* addr, UDTSOCKET& id);
00351 
00352    void updateConnStatus();
00353 
00354 private:
00355    struct CRL
00356    {
00357       UDTSOCKET m_iID;                  // UDT socket ID (self)
00358       CUDT* m_pUDT;                     // UDT instance
00359       int m_iIPversion;                 // IP version
00360       sockaddr* m_pPeerAddr;            // UDT sonnection peer address
00361       uint64_t m_ullTTL;                        // the time that this request expires
00362    };
00363    std::list<CRL> m_lRendezvousID;      // The sockets currently in rendezvous mode
00364 
00365    pthread_mutex_t m_RIDVectorLock;
00366 };
00367 
00368 class CSndQueue
00369 {
00370 friend class CUDT;
00371 friend class CUDTUnited;
00372 
00373 public:
00374    CSndQueue();
00375    ~CSndQueue();
00376 
00377 public:
00378 
00379       // Functionality:
00380       //    Initialize the sending queue.
00381       // Parameters:
00382       //    1) [in] c: UDP channel to be associated to the queue
00383       //    2) [in] t: Timer
00384       // Returned value:
00385       //    None.
00386 
00387    void init(CChannel* c, CTimer* t);
00388 
00389       // Functionality:
00390       //    Send out a packet to a given address.
00391       // Parameters:
00392       //    1) [in] addr: destination address
00393       //    2) [in] packet: packet to be sent out
00394       // Returned value:
00395       //    Size of data sent out.
00396 
00397    int sendto(const sockaddr* addr, CPacket& packet);
00398 
00399 private:
00400 #ifndef WIN32
00401    static void* worker(void* param);
00402 #else
00403    static DWORD WINAPI worker(LPVOID param);
00404 #endif
00405 
00406    pthread_t m_WorkerThread;
00407 
00408 private:
00409    CSndUList* m_pSndUList;              // List of UDT instances for data sending
00410    CChannel* m_pChannel;                // The UDP channel for data sending
00411    CTimer* m_pTimer;                    // Timing facility
00412 
00413    pthread_mutex_t m_WindowLock;
00414    pthread_cond_t m_WindowCond;
00415 
00416    volatile bool m_bClosing;            // closing the worker
00417    pthread_cond_t m_ExitCond;
00418 
00419 private:
00420    CSndQueue(const CSndQueue&);
00421    CSndQueue& operator=(const CSndQueue&);
00422 };
00423 
00424 class CRcvQueue
00425 {
00426 friend class CUDT;
00427 friend class CUDTUnited;
00428 
00429 public:
00430    CRcvQueue();
00431    ~CRcvQueue();
00432 
00433 public:
00434 
00435       // Functionality:
00436       //    Initialize the receiving queue.
00437       // Parameters:
00438       //    1) [in] size: queue size
00439       //    2) [in] mss: maximum packet size
00440       //    3) [in] version: IP version
00441       //    4) [in] hsize: hash table size
00442       //    5) [in] c: UDP channel to be associated to the queue
00443       //    6) [in] t: timer
00444       // Returned value:
00445       //    None.
00446 
00447    void init(int size, int payload, int version, int hsize, CChannel* c, CTimer* t);
00448 
00449       // Functionality:
00450       //    Read a packet for a specific UDT socket id.
00451       // Parameters:
00452       //    1) [in] id: Socket ID
00453       //    2) [out] packet: received packet
00454       // Returned value:
00455       //    Data size of the packet
00456 
00457    int recvfrom(int32_t id, CPacket& packet);
00458 
00459 private:
00460 #ifndef WIN32
00461    static void* worker(void* param);
00462 #else
00463    static DWORD WINAPI worker(LPVOID param);
00464 #endif
00465 
00466    pthread_t m_WorkerThread;
00467 
00468 private:
00469    CUnitQueue m_UnitQueue;              // The received packet queue
00470 
00471    CRcvUList* m_pRcvUList;              // List of UDT instances that will read packets from the queue
00472    CHash* m_pHash;                      // Hash table for UDT socket looking up
00473    CChannel* m_pChannel;                // UDP channel for receving packets
00474    CTimer* m_pTimer;                    // shared timer with the snd queue
00475 
00476    int m_iPayloadSize;                  // packet payload size
00477 
00478    volatile bool m_bClosing;            // closing the workder
00479    pthread_cond_t m_ExitCond;
00480 
00481 private:
00482    int setListener(CUDT* u);
00483    void removeListener(const CUDT* u);
00484 
00485    void registerConnector(const UDTSOCKET& id, CUDT* u, int ipv, const sockaddr* addr, uint64_t ttl);
00486    void removeConnector(const UDTSOCKET& id);
00487 
00488    void setNewEntry(CUDT* u);
00489    bool ifNewEntry();
00490    CUDT* getNewEntry();
00491 
00492    void storePkt(int32_t id, CPacket* pkt);
00493 
00494 private:
00495    pthread_mutex_t m_LSLock;
00496    CUDT* m_pListener;                                   // pointer to the (unique, if any) listening UDT entity
00497    CRendezvousQueue* m_pRendezvousQueue;                // The list of sockets in rendezvous mode
00498 
00499    std::vector<CUDT*> m_vNewEntry;                      // newly added entries, to be inserted
00500    pthread_mutex_t m_IDLock;
00501 
00502    std::map<int32_t, std::queue<CPacket*> > m_mBuffer;  // temporary buffer for rendezvous connection request
00503    pthread_mutex_t m_PassLock;
00504    pthread_cond_t m_PassCond;
00505 
00506 private:
00507    CRcvQueue(const CRcvQueue&);
00508    CRcvQueue& operator=(const CRcvQueue&);
00509 };
00510 
00511 struct CMultiplexer
00512 {
00513    CSndQueue* m_pSndQueue;      // The sending queue
00514    CRcvQueue* m_pRcvQueue;      // The receiving queue
00515    CChannel* m_pChannel;        // The UDP channel for sending and receiving
00516    CTimer* m_pTimer;            // The timer
00517 
00518    int m_iPort;                 // The UDP port number of this multiplexer
00519    int m_iIPversion;            // IP version
00520    int m_iMSS;                  // Maximum Segment Size
00521    int m_iRefCount;             // number of UDT instances that are associated with this multiplexer
00522    bool m_bReusable;            // if this one can be shared with others
00523 
00524    int m_iID;                   // multiplexer ID
00525 };
00526 
00527 #endif

Generated on 9 Feb 2013 for barchart-udt-core-2.2.2 by  doxygen 1.6.1