epoll.cpp

00001 /*****************************************************************************
00002 Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois.
00003 All rights reserved.
00004 
00005 Redistribution and use in source and binary forms, with or without
00006 modification, are permitted provided that the following conditions are
00007 met:
00008 
00009 * Redistributions of source code must retain the above
00010   copyright notice, this list of conditions and the
00011   following disclaimer.
00012 
00013 * Redistributions in binary form must reproduce the
00014   above copyright notice, this list of conditions
00015   and the following disclaimer in the documentation
00016   and/or other materials provided with the distribution.
00017 
00018 * Neither the name of the University of Illinois
00019   nor the names of its contributors may be used to
00020   endorse or promote products derived from this
00021   software without specific prior written permission.
00022 
00023 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00024 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
00025 THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
00026 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
00027 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00028 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00029 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00030 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00031 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00032 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00033 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00034 *****************************************************************************/
00035 
00036 /*****************************************************************************
00037 written by
00038    Yunhong Gu, last updated 01/01/2011
00039 *****************************************************************************/
00040 
00041 #ifdef LINUX
00042    #include <sys/epoll.h>
00043    #include <unistd.h>
00044 #endif
00045 #include <algorithm>
00046 #include <cerrno>
00047 #include <cstring>
00048 #include <iterator>
00049 
00050 #include "common.h"
00051 #include "epoll.h"
00052 #include "udt.h"
00053 
00054 using namespace std;
00055 
00056 CEPoll::CEPoll():
00057 m_iIDSeed(0)
00058 {
00059    CGuard::createMutex(m_EPollLock);
00060 }
00061 
00062 CEPoll::~CEPoll()
00063 {
00064    CGuard::releaseMutex(m_EPollLock);
00065 }
00066 
00067 int CEPoll::create()
00068 {
00069    CGuard pg(m_EPollLock);
00070 
00071    int localid = 0;
00072 
00073    #ifdef LINUX
00074    localid = epoll_create(1024);
00075    if (localid < 0)
00076       throw CUDTException(-1, 0, errno);
00077    #else
00078    // on BSD, use kqueue
00079    // on Solaris, use /dev/poll
00080    // on Windows, select
00081    #endif
00082 
00083    if (++ m_iIDSeed >= 0x7FFFFFFF)
00084       m_iIDSeed = 0;
00085 
00086    CEPollDesc desc;
00087    desc.m_iID = m_iIDSeed;
00088    desc.m_iLocalID = localid;
00089    m_mPolls[desc.m_iID] = desc;
00090 
00091    return desc.m_iID;
00092 }
00093 
00094 int CEPoll::add_usock(const int eid, const UDTSOCKET& u, const int* events)
00095 {
00096    CGuard pg(m_EPollLock);
00097 
00098    map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
00099    if (p == m_mPolls.end())
00100       throw CUDTException(5, 13);
00101 
00102    if (!events || (*events & UDT_EPOLL_IN))
00103       p->second.m_sUDTSocksIn.insert(u);
00104    if (!events || (*events & UDT_EPOLL_OUT))
00105       p->second.m_sUDTSocksOut.insert(u);
00106 
00107    return 0;
00108 }
00109 
00110 // BARCHART
00111 int CEPoll::update_usock(const int eid, const UDTSOCKET& u, const int* events)
00112 {
00113    CGuard pg(m_EPollLock);
00114 
00115    map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
00116    if (p == m_mPolls.end()){
00117               throw CUDTException(5, 13);
00118    }
00119 
00120    if(events){
00121            if (*events & UDT_EPOLL_IN){
00122                    p->second.m_sUDTSocksIn.insert(u);
00123            }else{
00124                    p->second.m_sUDTSocksIn.erase(u);
00125            }
00126            if (*events & UDT_EPOLL_OUT){
00127                    p->second.m_sUDTSocksOut.insert(u);
00128            } else{
00129                    p->second.m_sUDTSocksOut.erase(u);
00130            }
00131    }
00132 
00133    return 0;
00134 }
00135 
00136 
00137 // BARCHART
00138 int CEPoll::verify_usock(const int eid, const UDTSOCKET& u, int* events)
00139 {
00140 
00141    CGuard pg(m_EPollLock);
00142 
00143    map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
00144    if (p == m_mPolls.end()){
00145               throw CUDTException(5, 13);
00146    }
00147 
00148    if(events){
00149            if(p->second.m_sUDTSocksIn.find(u) != p->second.m_sUDTSocksIn.end()){
00150                    *events |= UDT_EPOLL_IN;
00151            }
00152            if(p->second.m_sUDTSocksOut.find(u) != p->second.m_sUDTSocksOut.end()){
00153                    *events |= UDT_EPOLL_OUT;
00154            }
00155    }
00156 
00157    return 0;
00158 
00159 }
00160 
00161 
00162 int CEPoll::add_ssock(const int eid, const SYSSOCKET& s, const int* events)
00163 {
00164    CGuard pg(m_EPollLock);
00165 
00166    map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
00167    if (p == m_mPolls.end())
00168       throw CUDTException(5, 13);
00169 
00170 #ifdef LINUX
00171    epoll_event ev;
00172 
00173    if (NULL == events)
00174       ev.events = EPOLLIN | EPOLLOUT | EPOLLERR;
00175    else
00176    {
00177       ev.events = 0;
00178       if (*events & UDT_EPOLL_IN)
00179          ev.events |= EPOLLIN;
00180       if (*events & UDT_EPOLL_OUT)
00181          ev.events |= EPOLLOUT;
00182       if (*events & UDT_EPOLL_ERR)
00183          ev.events |= EPOLLERR;
00184    }
00185 
00186    ev.data.fd = s;
00187    if (::epoll_ctl(p->second.m_iLocalID, EPOLL_CTL_ADD, s, &ev) < 0)
00188       throw CUDTException();
00189 #endif
00190 
00191    p->second.m_sLocals.insert(s);
00192 
00193    return 0;
00194 }
00195 
00196 int CEPoll::remove_usock(const int eid, const UDTSOCKET& u)
00197 {
00198    CGuard pg(m_EPollLock);
00199 
00200    map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
00201    if (p == m_mPolls.end())
00202       throw CUDTException(5, 13);
00203 
00204    p->second.m_sUDTSocksIn.erase(u);
00205    p->second.m_sUDTSocksOut.erase(u);
00206 
00207    // when the socket is removed from a monitoring, it is not available anymore for any IO notification
00208    p->second.m_sUDTReads.erase(u);
00209    p->second.m_sUDTWrites.erase(u);
00210 
00211    return 0;
00212 }
00213 
00214 int CEPoll::remove_ssock(const int eid, const SYSSOCKET& s)
00215 {
00216    CGuard pg(m_EPollLock);
00217 
00218    map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
00219    if (p == m_mPolls.end())
00220       throw CUDTException(5, 13);
00221 
00222 #ifdef LINUX
00223    epoll_event ev;  // ev is ignored, for compatibility with old Linux kernel only.
00224    if (::epoll_ctl(p->second.m_iLocalID, EPOLL_CTL_DEL, s, &ev) < 0)
00225       throw CUDTException();
00226 #endif
00227 
00228    p->second.m_sLocals.erase(s);
00229 
00230    return 0;
00231 }
00232 
00233 int CEPoll::wait(const int eid, set<UDTSOCKET>* readfds, set<UDTSOCKET>* writefds, int64_t msTimeOut, set<SYSSOCKET>* lrfds, set<SYSSOCKET>* lwfds)
00234 {
00235    // if all fields is NULL and waiting time is infinite, then this would be a deadlock
00236    if (!readfds && !writefds && !lrfds && lwfds && (msTimeOut < 0))
00237       throw CUDTException(5, 3, 0);
00238 
00239    // Clear these sets in case the app forget to do it.
00240    if (readfds) readfds->clear();
00241    if (writefds) writefds->clear();
00242    if (lrfds) lrfds->clear();
00243    if (lwfds) lwfds->clear();
00244 
00245    int total = 0;
00246 
00247    int64_t entertime = CTimer::getTime();
00248    while (true)
00249    {
00250       CGuard::enterCS(m_EPollLock);
00251 
00252       map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
00253       if (p == m_mPolls.end())
00254       {
00255          CGuard::leaveCS(m_EPollLock);
00256          throw CUDTException(5, 13);
00257       }
00258 
00259       if (p->second.m_sUDTSocksIn.empty() && p->second.m_sUDTSocksOut.empty() && p->second.m_sLocals.empty() && (msTimeOut < 0))
00260       {
00261          // no socket is being monitored, this may be a deadlock
00262          CGuard::leaveCS(m_EPollLock);
00263          throw CUDTException(5, 3);
00264       }
00265 
00266       // Sockets with exceptions are returned to both read and write sets.
00267       if ((NULL != readfds) && (!p->second.m_sUDTReads.empty() || !p->second.m_sUDTExcepts.empty()))
00268       {
00269          *readfds = p->second.m_sUDTReads;
00270          for (set<UDTSOCKET>::const_iterator i = p->second.m_sUDTExcepts.begin(); i != p->second.m_sUDTExcepts.end(); ++ i)
00271             readfds->insert(*i);
00272          total += p->second.m_sUDTReads.size() + p->second.m_sUDTExcepts.size();
00273       }
00274       if ((NULL != writefds) && (!p->second.m_sUDTWrites.empty() || !p->second.m_sUDTExcepts.empty()))
00275       {
00276          *writefds = p->second.m_sUDTWrites;
00277          for (set<UDTSOCKET>::const_iterator i = p->second.m_sUDTExcepts.begin(); i != p->second.m_sUDTExcepts.end(); ++ i)
00278             writefds->insert(*i);
00279          total += p->second.m_sUDTWrites.size() + p->second.m_sUDTExcepts.size();
00280       }
00281 
00282       if (lrfds || lwfds)
00283       {
00284          #ifdef LINUX
00285          const int max_events = p->second.m_sLocals.size();
00286          epoll_event ev[max_events];
00287          int nfds = ::epoll_wait(p->second.m_iLocalID, ev, max_events, 0);
00288 
00289          for (int i = 0; i < nfds; ++ i)
00290          {
00291             if ((NULL != lrfds) && (ev[i].events & EPOLLIN))
00292            {
00293                lrfds->insert(ev[i].data.fd);
00294                ++ total;
00295             }
00296             if ((NULL != lwfds) && (ev[i].events & EPOLLOUT))
00297             {
00298                lwfds->insert(ev[i].data.fd);
00299                ++ total;
00300             }
00301          }
00302          #else
00303          //currently "select" is used for all non-Linux platforms.
00304          //faster approaches can be applied for specific systems in the future.
00305 
00306          //"select" has a limitation on the number of sockets
00307 
00308          fd_set readfds;
00309          fd_set writefds;
00310          FD_ZERO(&readfds);
00311          FD_ZERO(&writefds);
00312 
00313          for (set<SYSSOCKET>::const_iterator i = p->second.m_sLocals.begin(); i != p->second.m_sLocals.end(); ++ i)
00314          {
00315             if (lrfds)
00316                FD_SET(*i, &readfds);
00317             if (lwfds)
00318                FD_SET(*i, &writefds);
00319          }
00320 
00321          timeval tv;
00322          tv.tv_sec = 0;
00323          tv.tv_usec = 0;
00324          if (::select(0, &readfds, &writefds, NULL, &tv) > 0)
00325          {
00326             for (set<SYSSOCKET>::const_iterator i = p->second.m_sLocals.begin(); i != p->second.m_sLocals.end(); ++ i)
00327             {
00328                if (lrfds && FD_ISSET(*i, &readfds))
00329                {
00330                   lrfds->insert(*i);
00331                   ++ total;
00332                }
00333                if (lwfds && FD_ISSET(*i, &writefds))
00334                {
00335                   lwfds->insert(*i);
00336                   ++ total;
00337                }
00338             }
00339          }
00340          #endif
00341       }
00342 
00343       CGuard::leaveCS(m_EPollLock);
00344 
00345       if (total > 0)
00346          return total;
00347 
00348       if ((msTimeOut >= 0) && (int64_t(CTimer::getTime() - entertime) >= msTimeOut * 1000LL))
00349          throw CUDTException(6, 3, 0);
00350 
00351       CTimer::waitForEvent();
00352    }
00353 
00354    return 0;
00355 }
00356 
00357 int CEPoll::release(const int eid)
00358 {
00359    CGuard pg(m_EPollLock);
00360 
00361    map<int, CEPollDesc>::iterator i = m_mPolls.find(eid);
00362    if (i == m_mPolls.end())
00363       throw CUDTException(5, 13);
00364 
00365    #ifdef LINUX
00366    // release local/system epoll descriptor
00367    ::close(i->second.m_iLocalID);
00368    #endif
00369 
00370    m_mPolls.erase(i);
00371 
00372    return 0;
00373 }
00374 
00375 namespace
00376 {
00377 
00378 void update_epoll_sets(const UDTSOCKET& uid, const set<UDTSOCKET>& watch, set<UDTSOCKET>& result, bool enable)
00379 {
00380    if (enable && (watch.find(uid) != watch.end()))
00381    {
00382       result.insert(uid);
00383    }
00384    else if (!enable)
00385    {
00386       result.erase(uid);
00387    }
00388 }
00389 
00390 }  // namespace
00391 
00392 int CEPoll::update_events(const UDTSOCKET& uid, std::set<int>& eids, int events, bool enable)
00393 {
00394    CGuard pg(m_EPollLock);
00395 
00396    map<int, CEPollDesc>::iterator p;
00397 
00398    vector<int> lost;
00399    for (set<int>::iterator i = eids.begin(); i != eids.end(); ++ i)
00400    {
00401       p = m_mPolls.find(*i);
00402       if (p == m_mPolls.end())
00403       {
00404          lost.push_back(*i);
00405       }
00406       else
00407       {
00408          if ((events & UDT_EPOLL_IN) != 0)
00409             update_epoll_sets(uid, p->second.m_sUDTSocksIn, p->second.m_sUDTReads, enable);
00410          if ((events & UDT_EPOLL_OUT) != 0)
00411             update_epoll_sets(uid, p->second.m_sUDTSocksOut, p->second.m_sUDTWrites, enable);
00412          if ((events & UDT_EPOLL_ERR) != 0)
00413             update_epoll_sets(uid, p->second.m_sUDTSocksEx, p->second.m_sUDTExcepts, enable);
00414       }
00415    }
00416 
00417    for (vector<int>::iterator i = lost.begin(); i != lost.end(); ++ i)
00418       eids.erase(*i);
00419 
00420    return 0;
00421 }

Generated on 9 Feb 2013 for barchart-udt-core-2.2.2 by  doxygen 1.6.1