00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041 #ifdef LINUX
00042 #include <sys/epoll.h>
00043 #include <unistd.h>
00044 #endif
00045 #include <algorithm>
00046 #include <cerrno>
00047 #include <cstring>
00048 #include <iterator>
00049
00050 #include "common.h"
00051 #include "epoll.h"
00052 #include "udt.h"
00053
00054 using namespace std;
00055
00056 CEPoll::CEPoll():
00057 m_iIDSeed(0)
00058 {
00059 CGuard::createMutex(m_EPollLock);
00060 }
00061
00062 CEPoll::~CEPoll()
00063 {
00064 CGuard::releaseMutex(m_EPollLock);
00065 }
00066
00067 int CEPoll::create()
00068 {
00069 CGuard pg(m_EPollLock);
00070
00071 int localid = 0;
00072
00073 #ifdef LINUX
00074 localid = epoll_create(1024);
00075 if (localid < 0)
00076 throw CUDTException(-1, 0, errno);
00077 #else
00078
00079
00080
00081 #endif
00082
00083 if (++ m_iIDSeed >= 0x7FFFFFFF)
00084 m_iIDSeed = 0;
00085
00086 CEPollDesc desc;
00087 desc.m_iID = m_iIDSeed;
00088 desc.m_iLocalID = localid;
00089 m_mPolls[desc.m_iID] = desc;
00090
00091 return desc.m_iID;
00092 }
00093
00094 int CEPoll::add_usock(const int eid, const UDTSOCKET& u, const int* events)
00095 {
00096 CGuard pg(m_EPollLock);
00097
00098 map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
00099 if (p == m_mPolls.end())
00100 throw CUDTException(5, 13);
00101
00102 if (!events || (*events & UDT_EPOLL_IN))
00103 p->second.m_sUDTSocksIn.insert(u);
00104 if (!events || (*events & UDT_EPOLL_OUT))
00105 p->second.m_sUDTSocksOut.insert(u);
00106
00107 return 0;
00108 }
00109
00110
00111 int CEPoll::update_usock(const int eid, const UDTSOCKET& u, const int* events)
00112 {
00113 CGuard pg(m_EPollLock);
00114
00115 map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
00116 if (p == m_mPolls.end()){
00117 throw CUDTException(5, 13);
00118 }
00119
00120 if(events){
00121 if (*events & UDT_EPOLL_IN){
00122 p->second.m_sUDTSocksIn.insert(u);
00123 }else{
00124 p->second.m_sUDTSocksIn.erase(u);
00125 }
00126 if (*events & UDT_EPOLL_OUT){
00127 p->second.m_sUDTSocksOut.insert(u);
00128 } else{
00129 p->second.m_sUDTSocksOut.erase(u);
00130 }
00131 }
00132
00133 return 0;
00134 }
00135
00136
00137
00138 int CEPoll::verify_usock(const int eid, const UDTSOCKET& u, int* events)
00139 {
00140
00141 CGuard pg(m_EPollLock);
00142
00143 map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
00144 if (p == m_mPolls.end()){
00145 throw CUDTException(5, 13);
00146 }
00147
00148 if(events){
00149 if(p->second.m_sUDTSocksIn.find(u) != p->second.m_sUDTSocksIn.end()){
00150 *events |= UDT_EPOLL_IN;
00151 }
00152 if(p->second.m_sUDTSocksOut.find(u) != p->second.m_sUDTSocksOut.end()){
00153 *events |= UDT_EPOLL_OUT;
00154 }
00155 }
00156
00157 return 0;
00158
00159 }
00160
00161
00162 int CEPoll::add_ssock(const int eid, const SYSSOCKET& s, const int* events)
00163 {
00164 CGuard pg(m_EPollLock);
00165
00166 map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
00167 if (p == m_mPolls.end())
00168 throw CUDTException(5, 13);
00169
00170 #ifdef LINUX
00171 epoll_event ev;
00172
00173 if (NULL == events)
00174 ev.events = EPOLLIN | EPOLLOUT | EPOLLERR;
00175 else
00176 {
00177 ev.events = 0;
00178 if (*events & UDT_EPOLL_IN)
00179 ev.events |= EPOLLIN;
00180 if (*events & UDT_EPOLL_OUT)
00181 ev.events |= EPOLLOUT;
00182 if (*events & UDT_EPOLL_ERR)
00183 ev.events |= EPOLLERR;
00184 }
00185
00186 ev.data.fd = s;
00187 if (::epoll_ctl(p->second.m_iLocalID, EPOLL_CTL_ADD, s, &ev) < 0)
00188 throw CUDTException();
00189 #endif
00190
00191 p->second.m_sLocals.insert(s);
00192
00193 return 0;
00194 }
00195
00196 int CEPoll::remove_usock(const int eid, const UDTSOCKET& u)
00197 {
00198 CGuard pg(m_EPollLock);
00199
00200 map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
00201 if (p == m_mPolls.end())
00202 throw CUDTException(5, 13);
00203
00204 p->second.m_sUDTSocksIn.erase(u);
00205 p->second.m_sUDTSocksOut.erase(u);
00206
00207
00208 p->second.m_sUDTReads.erase(u);
00209 p->second.m_sUDTWrites.erase(u);
00210
00211 return 0;
00212 }
00213
00214 int CEPoll::remove_ssock(const int eid, const SYSSOCKET& s)
00215 {
00216 CGuard pg(m_EPollLock);
00217
00218 map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
00219 if (p == m_mPolls.end())
00220 throw CUDTException(5, 13);
00221
00222 #ifdef LINUX
00223 epoll_event ev;
00224 if (::epoll_ctl(p->second.m_iLocalID, EPOLL_CTL_DEL, s, &ev) < 0)
00225 throw CUDTException();
00226 #endif
00227
00228 p->second.m_sLocals.erase(s);
00229
00230 return 0;
00231 }
00232
00233 int CEPoll::wait(const int eid, set<UDTSOCKET>* readfds, set<UDTSOCKET>* writefds, int64_t msTimeOut, set<SYSSOCKET>* lrfds, set<SYSSOCKET>* lwfds)
00234 {
00235
00236 if (!readfds && !writefds && !lrfds && lwfds && (msTimeOut < 0))
00237 throw CUDTException(5, 3, 0);
00238
00239
00240 if (readfds) readfds->clear();
00241 if (writefds) writefds->clear();
00242 if (lrfds) lrfds->clear();
00243 if (lwfds) lwfds->clear();
00244
00245 int total = 0;
00246
00247 int64_t entertime = CTimer::getTime();
00248 while (true)
00249 {
00250 CGuard::enterCS(m_EPollLock);
00251
00252 map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
00253 if (p == m_mPolls.end())
00254 {
00255 CGuard::leaveCS(m_EPollLock);
00256 throw CUDTException(5, 13);
00257 }
00258
00259 if (p->second.m_sUDTSocksIn.empty() && p->second.m_sUDTSocksOut.empty() && p->second.m_sLocals.empty() && (msTimeOut < 0))
00260 {
00261
00262 CGuard::leaveCS(m_EPollLock);
00263 throw CUDTException(5, 3);
00264 }
00265
00266
00267 if ((NULL != readfds) && (!p->second.m_sUDTReads.empty() || !p->second.m_sUDTExcepts.empty()))
00268 {
00269 *readfds = p->second.m_sUDTReads;
00270 for (set<UDTSOCKET>::const_iterator i = p->second.m_sUDTExcepts.begin(); i != p->second.m_sUDTExcepts.end(); ++ i)
00271 readfds->insert(*i);
00272 total += p->second.m_sUDTReads.size() + p->second.m_sUDTExcepts.size();
00273 }
00274 if ((NULL != writefds) && (!p->second.m_sUDTWrites.empty() || !p->second.m_sUDTExcepts.empty()))
00275 {
00276 *writefds = p->second.m_sUDTWrites;
00277 for (set<UDTSOCKET>::const_iterator i = p->second.m_sUDTExcepts.begin(); i != p->second.m_sUDTExcepts.end(); ++ i)
00278 writefds->insert(*i);
00279 total += p->second.m_sUDTWrites.size() + p->second.m_sUDTExcepts.size();
00280 }
00281
00282 if (lrfds || lwfds)
00283 {
00284 #ifdef LINUX
00285 const int max_events = p->second.m_sLocals.size();
00286 epoll_event ev[max_events];
00287 int nfds = ::epoll_wait(p->second.m_iLocalID, ev, max_events, 0);
00288
00289 for (int i = 0; i < nfds; ++ i)
00290 {
00291 if ((NULL != lrfds) && (ev[i].events & EPOLLIN))
00292 {
00293 lrfds->insert(ev[i].data.fd);
00294 ++ total;
00295 }
00296 if ((NULL != lwfds) && (ev[i].events & EPOLLOUT))
00297 {
00298 lwfds->insert(ev[i].data.fd);
00299 ++ total;
00300 }
00301 }
00302 #else
00303
00304
00305
00306
00307
00308 fd_set readfds;
00309 fd_set writefds;
00310 FD_ZERO(&readfds);
00311 FD_ZERO(&writefds);
00312
00313 for (set<SYSSOCKET>::const_iterator i = p->second.m_sLocals.begin(); i != p->second.m_sLocals.end(); ++ i)
00314 {
00315 if (lrfds)
00316 FD_SET(*i, &readfds);
00317 if (lwfds)
00318 FD_SET(*i, &writefds);
00319 }
00320
00321 timeval tv;
00322 tv.tv_sec = 0;
00323 tv.tv_usec = 0;
00324 if (::select(0, &readfds, &writefds, NULL, &tv) > 0)
00325 {
00326 for (set<SYSSOCKET>::const_iterator i = p->second.m_sLocals.begin(); i != p->second.m_sLocals.end(); ++ i)
00327 {
00328 if (lrfds && FD_ISSET(*i, &readfds))
00329 {
00330 lrfds->insert(*i);
00331 ++ total;
00332 }
00333 if (lwfds && FD_ISSET(*i, &writefds))
00334 {
00335 lwfds->insert(*i);
00336 ++ total;
00337 }
00338 }
00339 }
00340 #endif
00341 }
00342
00343 CGuard::leaveCS(m_EPollLock);
00344
00345 if (total > 0)
00346 return total;
00347
00348 if ((msTimeOut >= 0) && (int64_t(CTimer::getTime() - entertime) >= msTimeOut * 1000LL))
00349 throw CUDTException(6, 3, 0);
00350
00351 CTimer::waitForEvent();
00352 }
00353
00354 return 0;
00355 }
00356
00357 int CEPoll::release(const int eid)
00358 {
00359 CGuard pg(m_EPollLock);
00360
00361 map<int, CEPollDesc>::iterator i = m_mPolls.find(eid);
00362 if (i == m_mPolls.end())
00363 throw CUDTException(5, 13);
00364
00365 #ifdef LINUX
00366
00367 ::close(i->second.m_iLocalID);
00368 #endif
00369
00370 m_mPolls.erase(i);
00371
00372 return 0;
00373 }
00374
00375 namespace
00376 {
00377
00378 void update_epoll_sets(const UDTSOCKET& uid, const set<UDTSOCKET>& watch, set<UDTSOCKET>& result, bool enable)
00379 {
00380 if (enable && (watch.find(uid) != watch.end()))
00381 {
00382 result.insert(uid);
00383 }
00384 else if (!enable)
00385 {
00386 result.erase(uid);
00387 }
00388 }
00389
00390 }
00391
00392 int CEPoll::update_events(const UDTSOCKET& uid, std::set<int>& eids, int events, bool enable)
00393 {
00394 CGuard pg(m_EPollLock);
00395
00396 map<int, CEPollDesc>::iterator p;
00397
00398 vector<int> lost;
00399 for (set<int>::iterator i = eids.begin(); i != eids.end(); ++ i)
00400 {
00401 p = m_mPolls.find(*i);
00402 if (p == m_mPolls.end())
00403 {
00404 lost.push_back(*i);
00405 }
00406 else
00407 {
00408 if ((events & UDT_EPOLL_IN) != 0)
00409 update_epoll_sets(uid, p->second.m_sUDTSocksIn, p->second.m_sUDTReads, enable);
00410 if ((events & UDT_EPOLL_OUT) != 0)
00411 update_epoll_sets(uid, p->second.m_sUDTSocksOut, p->second.m_sUDTWrites, enable);
00412 if ((events & UDT_EPOLL_ERR) != 0)
00413 update_epoll_sets(uid, p->second.m_sUDTSocksEx, p->second.m_sUDTExcepts, enable);
00414 }
00415 }
00416
00417 for (vector<int>::iterator i = lost.begin(); i != lost.end(); ++ i)
00418 eids.erase(*i);
00419
00420 return 0;
00421 }