/*****************************************************************************
Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois.
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

* Redistributions of source code must retain the above
  copyright notice, this list of conditions and the
  following disclaimer.

* Redistributions in binary form must reproduce the
  above copyright notice, this list of conditions
  and the following disclaimer in the documentation
  and/or other materials provided with the distribution.

* Neither the name of the University of Illinois
  nor the names of its contributors may be used to
  endorse or promote products derived from this
  software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00034 *****************************************************************************/ 00035 00036 /***************************************************************************** 00037 written by 00038 Yunhong Gu, last updated 02/28/2012 00039 *****************************************************************************/ 00040 00041 #ifndef __UDT_CORE_H__ 00042 #define __UDT_CORE_H__ 00043 00044 00045 #include "udt.h" 00046 #include "common.h" 00047 #include "list.h" 00048 #include "buffer.h" 00049 #include "window.h" 00050 #include "packet.h" 00051 #include "channel.h" 00052 #include "api.h" 00053 #include "ccc.h" 00054 #include "cache.h" 00055 #include "queue.h" 00056 00057 enum UDTSockType {UDT_STREAM = 1, UDT_DGRAM}; 00058 00059 class CUDT 00060 { 00061 friend class CUDTSocket; 00062 friend class CUDTUnited; 00063 friend class CCC; 00064 friend struct CUDTComp; 00065 friend class CCache<CInfoBlock>; 00066 friend class CRendezvousQueue; 00067 friend class CSndQueue; 00068 friend class CRcvQueue; 00069 friend class CSndUList; 00070 friend class CRcvUList; 00071 00072 private: // constructor and desctructor 00073 CUDT(); 00074 CUDT(const CUDT& ancestor); 00075 const CUDT& operator=(const CUDT&) {return *this;} 00076 ~CUDT(); 00077 00078 public: //API 00079 static int startup(); 00080 static int cleanup(); 00081 static UDTSOCKET socket(int af, int type = SOCK_STREAM, int protocol = 0); 00082 static int bind(UDTSOCKET u, const sockaddr* name, int namelen); 00083 static int bind(UDTSOCKET u, UDPSOCKET udpsock); 00084 static int listen(UDTSOCKET u, int backlog); 00085 static UDTSOCKET accept(UDTSOCKET u, sockaddr* addr, int* addrlen); 00086 static int connect(UDTSOCKET u, const sockaddr* name, int namelen); 00087 static int close(UDTSOCKET u); 00088 static int getpeername(UDTSOCKET u, sockaddr* name, int* namelen); 00089 static int getsockname(UDTSOCKET u, sockaddr* name, int* namelen); 00090 static int getsockopt(UDTSOCKET u, int level, UDTOpt optname, void* optval, 
int* optlen); 00091 static int setsockopt(UDTSOCKET u, int level, UDTOpt optname, const void* optval, int optlen); 00092 static int send(UDTSOCKET u, const char* buf, int len, int flags); 00093 static int recv(UDTSOCKET u, char* buf, int len, int flags); 00094 static int sendmsg(UDTSOCKET u, const char* buf, int len, int ttl = -1, bool inorder = false); 00095 static int recvmsg(UDTSOCKET u, char* buf, int len); 00096 static int64_t sendfile(UDTSOCKET u, std::fstream& ifs, int64_t& offset, int64_t size, int block = 364000); 00097 static int64_t recvfile(UDTSOCKET u, std::fstream& ofs, int64_t& offset, int64_t size, int block = 7280000); 00098 static int select(int nfds, ud_set* readfds, ud_set* writefds, ud_set* exceptfds, const timeval* timeout); 00099 static int selectEx(const std::vector<UDTSOCKET>& fds, std::vector<UDTSOCKET>* readfds, std::vector<UDTSOCKET>* writefds, std::vector<UDTSOCKET>* exceptfds, int64_t msTimeOut); 00100 static int epoll_create(); 00101 static int epoll_add_usock(const int eid, const UDTSOCKET u, const int* events = NULL); 00102 static int epoll_add_ssock(const int eid, const SYSSOCKET s, const int* events = NULL); 00103 static int epoll_remove_usock(const int eid, const UDTSOCKET u); 00104 static int epoll_remove_ssock(const int eid, const SYSSOCKET s); 00105 static int epoll_wait(const int eid, std::set<UDTSOCKET>* readfds, std::set<UDTSOCKET>* writefds, int64_t msTimeOut, std::set<SYSSOCKET>* lrfds = NULL, std::set<SYSSOCKET>* wrfds = NULL); 00106 static int epoll_release(const int eid); 00107 static CUDTException& getlasterror(); 00108 static int perfmon(UDTSOCKET u, CPerfMon* perf, bool clear = true); 00109 static UDTSTATUS getsockstate(UDTSOCKET u); 00110 00111 // BARCHART 00112 static int epoll_update_usock(const int eid, const UDTSOCKET u, const int* events = NULL); 00113 // BARCHART 00114 static int epoll_verify_usock(const int eid, const UDTSOCKET u, int* events); 00115 00116 00117 public: // internal API 00118 static CUDT* 
getUDTHandle(UDTSOCKET u); 00119 00120 private: 00121 // Functionality: 00122 // initialize a UDT entity and bind to a local address. 00123 // Parameters: 00124 // None. 00125 // Returned value: 00126 // None. 00127 00128 void open(); 00129 00130 // Functionality: 00131 // Start listening to any connection request. 00132 // Parameters: 00133 // None. 00134 // Returned value: 00135 // None. 00136 00137 void listen(); 00138 00139 // Functionality: 00140 // Connect to a UDT entity listening at address "peer". 00141 // Parameters: 00142 // 0) [in] peer: The address of the listening UDT entity. 00143 // Returned value: 00144 // None. 00145 00146 void connect(const sockaddr* peer); 00147 00148 // Functionality: 00149 // Process the response handshake packet. 00150 // Parameters: 00151 // 0) [in] pkt: handshake packet. 00152 // Returned value: 00153 // Return 0 if connected, positive value if connection is in progress, otherwise error code. 00154 00155 int connect(const CPacket& pkt) throw (); 00156 00157 // Functionality: 00158 // Connect to a UDT entity listening at address "peer", which has sent "hs" request. 00159 // Parameters: 00160 // 0) [in] peer: The address of the listening UDT entity. 00161 // 1) [in/out] hs: The handshake information sent by the peer side (in), negotiated value (out). 00162 // Returned value: 00163 // None. 00164 00165 void connect(const sockaddr* peer, CHandShake* hs); 00166 00167 // Functionality: 00168 // Close the opened UDT entity. 00169 // Parameters: 00170 // None. 00171 // Returned value: 00172 // None. 00173 00174 void close(); 00175 00176 // Functionality: 00177 // Request UDT to send out a data block "data" with size of "len". 00178 // Parameters: 00179 // 0) [in] data: The address of the application data to be sent. 00180 // 1) [in] len: The size of the data block. 00181 // Returned value: 00182 // Actual size of data sent. 
00183 00184 int send(const char* data, int len); 00185 00186 // Functionality: 00187 // Request UDT to receive data to a memory block "data" with size of "len". 00188 // Parameters: 00189 // 0) [out] data: data received. 00190 // 1) [in] len: The desired size of data to be received. 00191 // Returned value: 00192 // Actual size of data received. 00193 00194 int recv(char* data, int len); 00195 00196 // Functionality: 00197 // send a message of a memory block "data" with size of "len". 00198 // Parameters: 00199 // 0) [out] data: data received. 00200 // 1) [in] len: The desired size of data to be received. 00201 // 2) [in] ttl: the time-to-live of the message. 00202 // 3) [in] inorder: if the message should be delivered in order. 00203 // Returned value: 00204 // Actual size of data sent. 00205 00206 int sendmsg(const char* data, int len, int ttl, bool inorder); 00207 00208 // Functionality: 00209 // Receive a message to buffer "data". 00210 // Parameters: 00211 // 0) [out] data: data received. 00212 // 1) [in] len: size of the buffer. 00213 // Returned value: 00214 // Actual size of data received. 00215 00216 int recvmsg(char* data, int len); 00217 00218 // Functionality: 00219 // Request UDT to send out a file described as "fd", starting from "offset", with size of "size". 00220 // Parameters: 00221 // 0) [in] ifs: The input file stream. 00222 // 1) [in, out] offset: From where to read and send data; output is the new offset when the call returns. 00223 // 2) [in] size: How many data to be sent. 00224 // 3) [in] block: size of block per read from disk 00225 // Returned value: 00226 // Actual size of data sent. 00227 00228 int64_t sendfile(std::fstream& ifs, int64_t& offset, int64_t size, int block = 366000); 00229 00230 // Functionality: 00231 // Request UDT to receive data into a file described as "fd", starting from "offset", with expected size of "size". 00232 // Parameters: 00233 // 0) [out] ofs: The output file stream. 
00234 // 1) [in, out] offset: From where to write data; output is the new offset when the call returns. 00235 // 2) [in] size: How many data to be received. 00236 // 3) [in] block: size of block per write to disk 00237 // Returned value: 00238 // Actual size of data received. 00239 00240 int64_t recvfile(std::fstream& ofs, int64_t& offset, int64_t size, int block = 7320000); 00241 00242 // Functionality: 00243 // Configure UDT options. 00244 // Parameters: 00245 // 0) [in] optName: The enum name of a UDT option. 00246 // 1) [in] optval: The value to be set. 00247 // 2) [in] optlen: size of "optval". 00248 // Returned value: 00249 // None. 00250 00251 void setOpt(UDTOpt optName, const void* optval, int optlen); 00252 00253 // Functionality: 00254 // Read UDT options. 00255 // Parameters: 00256 // 0) [in] optName: The enum name of a UDT option. 00257 // 1) [in] optval: The value to be returned. 00258 // 2) [out] optlen: size of "optval". 00259 // Returned value: 00260 // None. 00261 00262 void getOpt(UDTOpt optName, void* optval, int& optlen); 00263 00264 // Functionality: 00265 // read the performance data since last sample() call. 00266 // Parameters: 00267 // 0) [in, out] perf: pointer to a CPerfMon structure to record the performance data. 00268 // 1) [in] clear: flag to decide if the local performance trace should be cleared. 00269 // Returned value: 00270 // None. 
00271 00272 void sample(CPerfMon* perf, bool clear = true); 00273 00274 private: 00275 static CUDTUnited s_UDTUnited; // UDT global management base 00276 00277 public: 00278 static const UDTSOCKET INVALID_SOCK; // invalid socket descriptor 00279 static const int ERROR; // socket api error returned value 00280 00281 private: // Identification 00282 UDTSOCKET m_SocketID; // UDT socket number 00283 UDTSockType m_iSockType; // Type of the UDT connection (SOCK_STREAM or SOCK_DGRAM) 00284 UDTSOCKET m_PeerID; // peer id, for multiplexer 00285 static const int m_iVersion; // UDT version, for compatibility use 00286 00287 private: // Packet sizes 00288 int m_iPktSize; // Maximum/regular packet size, in bytes 00289 int m_iPayloadSize; // Maximum/regular payload size, in bytes 00290 00291 private: // Options 00292 int m_iMSS; // Maximum Segment Size, in bytes 00293 bool m_bSynSending; // Sending syncronization mode 00294 bool m_bSynRecving; // Receiving syncronization mode 00295 int m_iFlightFlagSize; // Maximum number of packets in flight from the peer side 00296 int m_iSndBufSize; // Maximum UDT sender buffer size 00297 int m_iRcvBufSize; // Maximum UDT receiver buffer size 00298 linger m_Linger; // Linger information on close 00299 int m_iUDPSndBufSize; // UDP sending buffer size 00300 int m_iUDPRcvBufSize; // UDP receiving buffer size 00301 int m_iIPversion; // IP version 00302 bool m_bRendezvous; // Rendezvous connection mode 00303 int m_iSndTimeOut; // sending timeout in milliseconds 00304 int m_iRcvTimeOut; // receiving timeout in milliseconds 00305 bool m_bReuseAddr; // reuse an exiting port or not, for UDP multiplexer 00306 int64_t m_llMaxBW; // maximum data transfer rate (threshold) 00307 00308 private: // congestion control 00309 CCCVirtualFactory* m_pCCFactory; // Factory class to create a specific CC instance 00310 CCC* m_pCC; // congestion control class 00311 CCache<CInfoBlock>* m_pCache; // network information cache 00312 00313 private: // Status 00314 volatile 
bool m_bListening; // If the UDT entit is listening to connection 00315 volatile bool m_bConnecting; // The short phase when connect() is called but not yet completed 00316 volatile bool m_bConnected; // Whether the connection is on or off 00317 volatile bool m_bClosing; // If the UDT entity is closing 00318 volatile bool m_bShutdown; // If the peer side has shutdown the connection 00319 volatile bool m_bBroken; // If the connection has been broken 00320 volatile bool m_bPeerHealth; // If the peer status is normal 00321 bool m_bOpened; // If the UDT entity has been opened 00322 int m_iBrokenCounter; // a counter (number of GC checks) to let the GC tag this socket as disconnected 00323 00324 int m_iEXPCount; // Expiration counter 00325 int m_iBandwidth; // Estimated bandwidth, number of packets per second 00326 int m_iRTT; // RTT, in microseconds 00327 int m_iRTTVar; // RTT variance 00328 int m_iDeliveryRate; // Packet arrival rate at the receiver side 00329 00330 uint64_t m_ullLingerExpiration; // Linger expiration time (for GC to close a socket with data in sending buffer) 00331 00332 CHandShake m_ConnReq; // connection request 00333 CHandShake m_ConnRes; // connection response 00334 int64_t m_llLastReqTime; // last time when a connection request is sent 00335 00336 private: // Sending related data 00337 CSndBuffer* m_pSndBuffer; // Sender buffer 00338 CSndLossList* m_pSndLossList; // Sender loss list 00339 CPktTimeWindow* m_pSndTimeWindow; // Packet sending time window 00340 00341 volatile uint64_t m_ullInterval; // Inter-packet time, in CPU clock cycles 00342 uint64_t m_ullTimeDiff; // aggregate difference in inter-packet time 00343 00344 volatile int m_iFlowWindowSize; // Flow control window size 00345 volatile double m_dCongestionWindow; // congestion window size 00346 00347 volatile int32_t m_iSndLastAck; // Last ACK received 00348 volatile int32_t m_iSndLastDataAck; // The real last ACK that updates the sender buffer and loss list 00349 volatile int32_t 
m_iSndCurrSeqNo; // The largest sequence number that has been sent 00350 int32_t m_iLastDecSeq; // Sequence number sent last decrease occurs 00351 int32_t m_iSndLastAck2; // Last ACK2 sent back 00352 uint64_t m_ullSndLastAck2Time; // The time when last ACK2 was sent back 00353 00354 int32_t m_iISN; // Initial Sequence Number 00355 00356 void CCUpdate(); 00357 00358 private: // Receiving related data 00359 CRcvBuffer* m_pRcvBuffer; // Receiver buffer 00360 CRcvLossList* m_pRcvLossList; // Receiver loss list 00361 CACKWindow* m_pACKWindow; // ACK history window 00362 CPktTimeWindow* m_pRcvTimeWindow; // Packet arrival time window 00363 00364 int32_t m_iRcvLastAck; // Last sent ACK 00365 uint64_t m_ullLastAckTime; // Timestamp of last ACK 00366 int32_t m_iRcvLastAckAck; // Last sent ACK that has been acknowledged 00367 int32_t m_iAckSeqNo; // Last ACK sequence number 00368 int32_t m_iRcvCurrSeqNo; // Largest received sequence number 00369 00370 uint64_t m_ullLastWarningTime; // Last time that a warning message is sent 00371 00372 int32_t m_iPeerISN; // Initial Sequence Number of the peer side 00373 00374 private: // synchronization: mutexes and conditions 00375 pthread_mutex_t m_ConnectionLock; // used to synchronize connection operation 00376 00377 pthread_cond_t m_SendBlockCond; // used to block "send" call 00378 pthread_mutex_t m_SendBlockLock; // lock associated to m_SendBlockCond 00379 00380 pthread_mutex_t m_AckLock; // used to protected sender's loss list when processing ACK 00381 00382 pthread_cond_t m_RecvDataCond; // used to block "recv" when there is no data 00383 pthread_mutex_t m_RecvDataLock; // lock associated to m_RecvDataCond 00384 00385 pthread_mutex_t m_SendLock; // used to synchronize "send" call 00386 pthread_mutex_t m_RecvLock; // used to synchronize "recv" call 00387 00388 void initSynch(); 00389 void destroySynch(); 00390 void releaseSynch(); 00391 00392 private: // Generation and processing of packets 00393 void sendCtrl(int pkttype, void* 
lparam = NULL, void* rparam = NULL, int size = 0); 00394 void processCtrl(CPacket& ctrlpkt); 00395 int packData(CPacket& packet, uint64_t& ts); 00396 int processData(CUnit* unit); 00397 int listen(sockaddr* addr, CPacket& packet); 00398 00399 private: // Trace 00400 uint64_t m_StartTime; // timestamp when the UDT entity is started 00401 int64_t m_llSentTotal; // total number of sent data packets, including retransmissions 00402 int64_t m_llRecvTotal; // total number of received packets 00403 int m_iSndLossTotal; // total number of lost packets (sender side) 00404 int m_iRcvLossTotal; // total number of lost packets (receiver side) 00405 int m_iRetransTotal; // total number of retransmitted packets 00406 int m_iSentACKTotal; // total number of sent ACK packets 00407 int m_iRecvACKTotal; // total number of received ACK packets 00408 int m_iSentNAKTotal; // total number of sent NAK packets 00409 int m_iRecvNAKTotal; // total number of received NAK packets 00410 int64_t m_llSndDurationTotal; // total real time for sending 00411 00412 uint64_t m_LastSampleTime; // last performance sample time 00413 int64_t m_llTraceSent; // number of pakctes sent in the last trace interval 00414 int64_t m_llTraceRecv; // number of pakctes received in the last trace interval 00415 int m_iTraceSndLoss; // number of lost packets in the last trace interval (sender side) 00416 int m_iTraceRcvLoss; // number of lost packets in the last trace interval (receiver side) 00417 int m_iTraceRetrans; // number of retransmitted packets in the last trace interval 00418 int m_iSentACK; // number of ACKs sent in the last trace interval 00419 int m_iRecvACK; // number of ACKs received in the last trace interval 00420 int m_iSentNAK; // number of NAKs sent in the last trace interval 00421 int m_iRecvNAK; // number of NAKs received in the last trace interval 00422 int64_t m_llSndDuration; // real time for sending 00423 int64_t m_llSndDurationCounter; // timers to record the sending duration 00424 00425 
private: // Timers 00426 uint64_t m_ullCPUFrequency; // CPU clock frequency, used for Timer, ticks per microsecond 00427 00428 static const int m_iSYNInterval; // Periodical Rate Control Interval, 10000 microsecond 00429 static const int m_iSelfClockInterval; // ACK interval for self-clocking 00430 00431 uint64_t m_ullNextACKTime; // Next ACK time, in CPU clock cycles, same below 00432 uint64_t m_ullNextNAKTime; // Next NAK time 00433 00434 volatile uint64_t m_ullSYNInt; // SYN interval 00435 volatile uint64_t m_ullACKInt; // ACK interval 00436 volatile uint64_t m_ullNAKInt; // NAK interval 00437 volatile uint64_t m_ullLastRspTime; // time stamp of last response from the peer 00438 00439 uint64_t m_ullMinNakInt; // NAK timeout lower bound; too small value can cause unnecessary retransmission 00440 uint64_t m_ullMinExpInt; // timeout lower bound threshold: too small timeout can cause problem 00441 00442 int m_iPktCount; // packet counter for ACK 00443 int m_iLightACKCount; // light ACK counter 00444 00445 uint64_t m_ullTargetTime; // scheduled time of next packet sending 00446 00447 void checkTimers(); 00448 00449 private: // for UDP multiplexer 00450 CSndQueue* m_pSndQueue; // packet sending queue 00451 CRcvQueue* m_pRcvQueue; // packet receiving queue 00452 sockaddr* m_pPeerAddr; // peer address 00453 uint32_t m_piSelfIP[4]; // local UDP IP address 00454 CSNode* m_pSNode; // node information for UDT list used in snd queue 00455 CRNode* m_pRNode; // node information for UDT list used in rcv queue 00456 00457 private: // for epoll 00458 std::set<int> m_sPollID; // set of epoll ID to trigger 00459 void addEPoll(const int eid); 00460 void removeEPoll(const int eid); 00461 }; 00462 00463 00464 #endif