buffer.cpp

00001 /*****************************************************************************
00002 Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois.
00003 All rights reserved.
00004 
00005 Redistribution and use in source and binary forms, with or without
00006 modification, are permitted provided that the following conditions are
00007 met:
00008 
00009 * Redistributions of source code must retain the above
00010   copyright notice, this list of conditions and the
00011   following disclaimer.
00012 
00013 * Redistributions in binary form must reproduce the
00014   above copyright notice, this list of conditions
00015   and the following disclaimer in the documentation
00016   and/or other materials provided with the distribution.
00017 
00018 * Neither the name of the University of Illinois
00019   nor the names of its contributors may be used to
00020   endorse or promote products derived from this
00021   software without specific prior written permission.
00022 
00023 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00024 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
00025 THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
00026 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
00027 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00028 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00029 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00030 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00031 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00032 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00033 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00034 *****************************************************************************/
00035 
00036 /*****************************************************************************
00037 written by
00038    Yunhong Gu, last updated 03/12/2011
00039 *****************************************************************************/
00040 
00041 #include <cstring>
00042 #include <cmath>
00043 #include "buffer.h"
00044 
00045 using namespace std;
00046 
00047 CSndBuffer::CSndBuffer(int size, int mss):
00048 m_BufLock(),
00049 m_pBlock(NULL),
00050 m_pFirstBlock(NULL),
00051 m_pCurrBlock(NULL),
00052 m_pLastBlock(NULL),
00053 m_pBuffer(NULL),
00054 m_iNextMsgNo(1),
00055 m_iSize(size),
00056 m_iMSS(mss),
00057 m_iCount(0)
00058 {
00059    // initial physical buffer of "size"
00060    m_pBuffer = new Buffer;
00061    m_pBuffer->m_pcData = new char [m_iSize * m_iMSS];
00062    m_pBuffer->m_iSize = m_iSize;
00063    m_pBuffer->m_pNext = NULL;
00064 
00065    // circular linked list for out bound packets
00066    m_pBlock = new Block;
00067    Block* pb = m_pBlock;
00068    for (int i = 1; i < m_iSize; ++ i)
00069    {
00070       pb->m_pNext = new Block;
00071       pb->m_iMsgNo = 0;
00072       pb = pb->m_pNext;
00073    }
00074    pb->m_pNext = m_pBlock;
00075 
00076    pb = m_pBlock;
00077    char* pc = m_pBuffer->m_pcData;
00078    for (int i = 0; i < m_iSize; ++ i)
00079    {
00080       pb->m_pcData = pc;
00081       pb = pb->m_pNext;
00082       pc += m_iMSS;
00083    }
00084 
00085    m_pFirstBlock = m_pCurrBlock = m_pLastBlock = m_pBlock;
00086 
00087    #ifndef WIN32
00088       pthread_mutex_init(&m_BufLock, NULL);
00089    #else
00090       m_BufLock = CreateMutex(NULL, false, NULL);
00091    #endif
00092 }
00093 
00094 CSndBuffer::~CSndBuffer()
00095 {
00096    Block* pb = m_pBlock->m_pNext;
00097    while (pb != m_pBlock)
00098    {
00099       Block* temp = pb;
00100       pb = pb->m_pNext;
00101       delete temp;
00102    }
00103    delete m_pBlock;
00104 
00105    while (m_pBuffer != NULL)
00106    {
00107       Buffer* temp = m_pBuffer;
00108       m_pBuffer = m_pBuffer->m_pNext;
00109       delete [] temp->m_pcData;
00110       delete temp;
00111    }
00112 
00113    #ifndef WIN32
00114       pthread_mutex_destroy(&m_BufLock);
00115    #else
00116       CloseHandle(m_BufLock);
00117    #endif
00118 }
00119 
00120 void CSndBuffer::addBuffer(const char* data, int len, int ttl, bool order)
00121 {
00122    int size = len / m_iMSS;
00123    if ((len % m_iMSS) != 0)
00124       size ++;
00125 
00126    // dynamically increase sender buffer
00127    while (size + m_iCount >= m_iSize)
00128       increase();
00129 
00130    uint64_t time = CTimer::getTime();
00131    int32_t inorder = order;
00132    inorder <<= 29;
00133 
00134    Block* s = m_pLastBlock;
00135    for (int i = 0; i < size; ++ i)
00136    {
00137       int pktlen = len - i * m_iMSS;
00138       if (pktlen > m_iMSS)
00139          pktlen = m_iMSS;
00140 
00141       memcpy(s->m_pcData, data + i * m_iMSS, pktlen);
00142       s->m_iLength = pktlen;
00143 
00144       s->m_iMsgNo = m_iNextMsgNo | inorder;
00145       if (i == 0)
00146          s->m_iMsgNo |= 0x80000000;
00147       if (i == size - 1)
00148          s->m_iMsgNo |= 0x40000000;
00149 
00150       s->m_OriginTime = time;
00151       s->m_iTTL = ttl;
00152 
00153       s = s->m_pNext;
00154    }
00155    m_pLastBlock = s;
00156 
00157    CGuard::enterCS(m_BufLock);
00158    m_iCount += size;
00159    CGuard::leaveCS(m_BufLock);
00160 
00161    m_iNextMsgNo ++;
00162    if (m_iNextMsgNo == CMsgNo::m_iMaxMsgNo)
00163       m_iNextMsgNo = 1;
00164 }
00165 
00166 int CSndBuffer::addBufferFromFile(fstream& ifs, int len)
00167 {
00168    int size = len / m_iMSS;
00169    if ((len % m_iMSS) != 0)
00170       size ++;
00171 
00172    // dynamically increase sender buffer
00173    while (size + m_iCount >= m_iSize)
00174       increase();
00175 
00176    Block* s = m_pLastBlock;
00177    int total = 0;
00178    for (int i = 0; i < size; ++ i)
00179    {
00180       if (ifs.bad() || ifs.fail() || ifs.eof())
00181          break;
00182 
00183       int pktlen = len - i * m_iMSS;
00184       if (pktlen > m_iMSS)
00185          pktlen = m_iMSS;
00186 
00187       ifs.read(s->m_pcData, pktlen);
00188       if ((pktlen = ifs.gcount()) <= 0)
00189          break;
00190 
00191       // currently file transfer is only available in streaming mode, message is always in order, ttl = infinite
00192       s->m_iMsgNo = m_iNextMsgNo | 0x20000000;
00193       if (i == 0)
00194          s->m_iMsgNo |= 0x80000000;
00195       if (i == size - 1)
00196          s->m_iMsgNo |= 0x40000000;
00197 
00198       s->m_iLength = pktlen;
00199       s->m_iTTL = -1;
00200       s = s->m_pNext;
00201 
00202       total += pktlen;
00203    }
00204    m_pLastBlock = s;
00205 
00206    CGuard::enterCS(m_BufLock);
00207    m_iCount += size;
00208    CGuard::leaveCS(m_BufLock);
00209 
00210    m_iNextMsgNo ++;
00211    if (m_iNextMsgNo == CMsgNo::m_iMaxMsgNo)
00212       m_iNextMsgNo = 1;
00213 
00214    return total;
00215 }
00216 
00217 int CSndBuffer::readData(char** data, int32_t& msgno)
00218 {
00219    // No data to read
00220    if (m_pCurrBlock == m_pLastBlock)
00221       return 0;
00222 
00223    *data = m_pCurrBlock->m_pcData;
00224    int readlen = m_pCurrBlock->m_iLength;
00225    msgno = m_pCurrBlock->m_iMsgNo;
00226 
00227    m_pCurrBlock = m_pCurrBlock->m_pNext;
00228 
00229    return readlen;
00230 }
00231 
00232 int CSndBuffer::readData(char** data, const int offset, int32_t& msgno, int& msglen)
00233 {
00234    CGuard bufferguard(m_BufLock);
00235 
00236    Block* p = m_pFirstBlock;
00237 
00238    for (int i = 0; i < offset; ++ i)
00239       p = p->m_pNext;
00240 
00241    if ((p->m_iTTL >= 0) && ((CTimer::getTime() - p->m_OriginTime) / 1000 > (uint64_t)p->m_iTTL))
00242    {
00243       msgno = p->m_iMsgNo & 0x1FFFFFFF;
00244 
00245       msglen = 1;
00246       p = p->m_pNext;
00247       bool move = false;
00248       while (msgno == (p->m_iMsgNo & 0x1FFFFFFF))
00249       {
00250          if (p == m_pCurrBlock)
00251             move = true;
00252          p = p->m_pNext;
00253          if (move)
00254             m_pCurrBlock = p;
00255          msglen ++;
00256       }
00257 
00258       return -1;
00259    }
00260 
00261    *data = p->m_pcData;
00262    int readlen = p->m_iLength;
00263    msgno = p->m_iMsgNo;
00264 
00265    return readlen;
00266 }
00267 
00268 void CSndBuffer::ackData(int offset)
00269 {
00270    CGuard bufferguard(m_BufLock);
00271 
00272    for (int i = 0; i < offset; ++ i)
00273       m_pFirstBlock = m_pFirstBlock->m_pNext;
00274 
00275    m_iCount -= offset;
00276 
00277    CTimer::triggerEvent();
00278 }
00279 
00280 int CSndBuffer::getCurrBufSize() const
00281 {
00282    return m_iCount;
00283 }
00284 
00285 void CSndBuffer::increase()
00286 {
00287    int unitsize = m_pBuffer->m_iSize;
00288 
00289    // new physical buffer
00290    Buffer* nbuf = NULL;
00291    try
00292    {
00293       nbuf  = new Buffer;
00294       nbuf->m_pcData = new char [unitsize * m_iMSS];
00295    }
00296    catch (...)
00297    {
00298       delete nbuf;
00299       throw CUDTException(3, 2, 0);
00300    }
00301    nbuf->m_iSize = unitsize;
00302    nbuf->m_pNext = NULL;
00303 
00304    // insert the buffer at the end of the buffer list
00305    Buffer* p = m_pBuffer;
00306    while (NULL != p->m_pNext)
00307       p = p->m_pNext;
00308    p->m_pNext = nbuf;
00309 
00310    // new packet blocks
00311    Block* nblk = NULL;
00312    try
00313    {
00314       nblk = new Block;
00315    }
00316    catch (...)
00317    {
00318       delete nblk;
00319       throw CUDTException(3, 2, 0);
00320    }
00321    Block* pb = nblk;
00322    for (int i = 1; i < unitsize; ++ i)
00323    {
00324       pb->m_pNext = new Block;
00325       pb = pb->m_pNext;
00326    }
00327 
00328    // insert the new blocks onto the existing one
00329    pb->m_pNext = m_pLastBlock->m_pNext;
00330    m_pLastBlock->m_pNext = nblk;
00331 
00332    pb = nblk;
00333    char* pc = nbuf->m_pcData;
00334    for (int i = 0; i < unitsize; ++ i)
00335    {
00336       pb->m_pcData = pc;
00337       pb = pb->m_pNext;
00338       pc += m_iMSS;
00339    }
00340 
00341    m_iSize += unitsize;
00342 }
00343 
00345 
00346 CRcvBuffer::CRcvBuffer(CUnitQueue* queue, int bufsize):
00347 m_pUnit(NULL),
00348 m_iSize(bufsize),
00349 m_pUnitQueue(queue),
00350 m_iStartPos(0),
00351 m_iLastAckPos(0),
00352 m_iMaxPos(0),
00353 m_iNotch(0)
00354 {
00355    m_pUnit = new CUnit* [m_iSize];
00356    for (int i = 0; i < m_iSize; ++ i)
00357       m_pUnit[i] = NULL;
00358 }
00359 
00360 CRcvBuffer::~CRcvBuffer()
00361 {
00362    for (int i = 0; i < m_iSize; ++ i)
00363    {
00364       if (NULL != m_pUnit[i])
00365       {
00366          m_pUnit[i]->m_iFlag = 0;
00367          -- m_pUnitQueue->m_iCount;
00368       }
00369    }
00370 
00371    delete [] m_pUnit;
00372 }
00373 
00374 int CRcvBuffer::addData(CUnit* unit, int offset)
00375 {
00376    int pos = (m_iLastAckPos + offset) % m_iSize;
00377    if (offset > m_iMaxPos)
00378       m_iMaxPos = offset;
00379 
00380    if (NULL != m_pUnit[pos])
00381       return -1;
00382    
00383    m_pUnit[pos] = unit;
00384 
00385    unit->m_iFlag = 1;
00386    ++ m_pUnitQueue->m_iCount;
00387 
00388    return 0;
00389 }
00390 
00391 int CRcvBuffer::readBuffer(char* data, int len)
00392 {
00393    int p = m_iStartPos;
00394    int lastack = m_iLastAckPos;
00395    int rs = len;
00396 
00397    while ((p != lastack) && (rs > 0))
00398    {
00399       int unitsize = m_pUnit[p]->m_Packet.getLength() - m_iNotch;
00400       if (unitsize > rs)
00401          unitsize = rs;
00402 
00403       memcpy(data, m_pUnit[p]->m_Packet.m_pcData + m_iNotch, unitsize);
00404       data += unitsize;
00405 
00406       if ((rs > unitsize) || (rs == m_pUnit[p]->m_Packet.getLength() - m_iNotch))
00407       {
00408          CUnit* tmp = m_pUnit[p];
00409          m_pUnit[p] = NULL;
00410          tmp->m_iFlag = 0;
00411          -- m_pUnitQueue->m_iCount;
00412 
00413          if (++ p == m_iSize)
00414             p = 0;
00415 
00416          m_iNotch = 0;
00417       }
00418       else
00419          m_iNotch += rs;
00420 
00421       rs -= unitsize;
00422    }
00423 
00424    m_iStartPos = p;
00425    return len - rs;
00426 }
00427 
00428 int CRcvBuffer::readBufferToFile(fstream& ofs, int len)
00429 {
00430    int p = m_iStartPos;
00431    int lastack = m_iLastAckPos;
00432    int rs = len;
00433 
00434    while ((p != lastack) && (rs > 0))
00435    {
00436       int unitsize = m_pUnit[p]->m_Packet.getLength() - m_iNotch;
00437       if (unitsize > rs)
00438          unitsize = rs;
00439 
00440       ofs.write(m_pUnit[p]->m_Packet.m_pcData + m_iNotch, unitsize);
00441       if (ofs.fail())
00442          break;
00443 
00444       if ((rs > unitsize) || (rs == m_pUnit[p]->m_Packet.getLength() - m_iNotch))
00445       {
00446          CUnit* tmp = m_pUnit[p];
00447          m_pUnit[p] = NULL;
00448          tmp->m_iFlag = 0;
00449          -- m_pUnitQueue->m_iCount;
00450 
00451          if (++ p == m_iSize)
00452             p = 0;
00453 
00454          m_iNotch = 0;
00455       }
00456       else
00457          m_iNotch += rs;
00458 
00459       rs -= unitsize;
00460    }
00461 
00462    m_iStartPos = p;
00463 
00464    return len - rs;
00465 }
00466 
00467 void CRcvBuffer::ackData(int len)
00468 {
00469    m_iLastAckPos = (m_iLastAckPos + len) % m_iSize;
00470    m_iMaxPos -= len;
00471    if (m_iMaxPos < 0)
00472       m_iMaxPos = 0;
00473 
00474    CTimer::triggerEvent();
00475 }
00476 
00477 int CRcvBuffer::getAvailBufSize() const
00478 {
00479    // One slot must be empty in order to tell the difference between "empty buffer" and "full buffer"
00480    return m_iSize - getRcvDataSize() - 1;
00481 }
00482 
00483 int CRcvBuffer::getRcvDataSize() const
00484 {
00485    if (m_iLastAckPos >= m_iStartPos)
00486       return m_iLastAckPos - m_iStartPos;
00487 
00488    return m_iSize + m_iLastAckPos - m_iStartPos;
00489 }
00490 
00491 void CRcvBuffer::dropMsg(int32_t msgno)
00492 {
00493    for (int i = m_iStartPos, n = (m_iLastAckPos + m_iMaxPos) % m_iSize; i != n; i = (i + 1) % m_iSize)
00494       if ((NULL != m_pUnit[i]) && (msgno == m_pUnit[i]->m_Packet.m_iMsgNo))
00495          m_pUnit[i]->m_iFlag = 3;
00496 }
00497 
00498 int CRcvBuffer::readMsg(char* data, int len)
00499 {
00500    int p, q;
00501    bool passack;
00502    if (!scanMsg(p, q, passack))
00503       return 0;
00504 
00505    int rs = len;
00506    while (p != (q + 1) % m_iSize)
00507    {
00508       int unitsize = m_pUnit[p]->m_Packet.getLength();
00509       if ((rs >= 0) && (unitsize > rs))
00510          unitsize = rs;
00511 
00512       if (unitsize > 0)
00513       {
00514          memcpy(data, m_pUnit[p]->m_Packet.m_pcData, unitsize);
00515          data += unitsize;
00516          rs -= unitsize;
00517       }
00518 
00519       if (!passack)
00520       {
00521          CUnit* tmp = m_pUnit[p];
00522          m_pUnit[p] = NULL;
00523          tmp->m_iFlag = 0;
00524          -- m_pUnitQueue->m_iCount;
00525       }
00526       else
00527          m_pUnit[p]->m_iFlag = 2;
00528 
00529       if (++ p == m_iSize)
00530          p = 0;
00531    }
00532 
00533    if (!passack)
00534       m_iStartPos = (q + 1) % m_iSize;
00535 
00536    return len - rs;
00537 }
00538 
00539 int CRcvBuffer::getRcvMsgNum()
00540 {
00541    int p, q;
00542    bool passack;
00543    return scanMsg(p, q, passack) ? 1 : 0;
00544 }
00545 
00546 bool CRcvBuffer::scanMsg(int& p, int& q, bool& passack)
00547 {
00548    // empty buffer
00549    if ((m_iStartPos == m_iLastAckPos) && (m_iMaxPos <= 0))
00550       return false;
00551 
00552    //skip all bad msgs at the beginning
00553    while (m_iStartPos != m_iLastAckPos)
00554    {
00555       if (NULL == m_pUnit[m_iStartPos])
00556       {
00557          if (++ m_iStartPos == m_iSize)
00558             m_iStartPos = 0;
00559          continue;
00560       }
00561 
00562       if ((1 == m_pUnit[m_iStartPos]->m_iFlag) && (m_pUnit[m_iStartPos]->m_Packet.getMsgBoundary() > 1))
00563       {
00564          bool good = true;
00565 
00566          // look ahead for the whole message
00567          for (int i = m_iStartPos; i != m_iLastAckPos;)
00568          {
00569             if ((NULL == m_pUnit[i]) || (1 != m_pUnit[i]->m_iFlag))
00570             {
00571                good = false;
00572                break;
00573             }
00574 
00575             if ((m_pUnit[i]->m_Packet.getMsgBoundary() == 1) || (m_pUnit[i]->m_Packet.getMsgBoundary() == 3))
00576                break;
00577 
00578             if (++ i == m_iSize)
00579                i = 0;
00580          }
00581 
00582          if (good)
00583             break;
00584       }
00585 
00586       CUnit* tmp = m_pUnit[m_iStartPos];
00587       m_pUnit[m_iStartPos] = NULL;
00588       tmp->m_iFlag = 0;
00589       -- m_pUnitQueue->m_iCount;
00590 
00591       if (++ m_iStartPos == m_iSize)
00592          m_iStartPos = 0;
00593    }
00594 
00595    p = -1;                  // message head
00596    q = m_iStartPos;         // message tail
00597    passack = m_iStartPos == m_iLastAckPos;
00598    bool found = false;
00599 
00600    // looking for the first message
00601    for (int i = 0, n = m_iMaxPos + getRcvDataSize(); i <= n; ++ i)
00602    {
00603       if ((NULL != m_pUnit[q]) && (1 == m_pUnit[q]->m_iFlag))
00604       {
00605          switch (m_pUnit[q]->m_Packet.getMsgBoundary())
00606          {
00607          case 3: // 11
00608             p = q;
00609             found = true;
00610             break;
00611 
00612          case 2: // 10
00613             p = q;
00614             break;
00615 
00616          case 1: // 01
00617             if (p != -1)
00618                found = true;
00619          }
00620       }
00621       else
00622       {
00623          // a hole in this message, not valid, restart search
00624          p = -1;
00625       }
00626 
00627       if (found)
00628       {
00629          // the msg has to be ack'ed or it is allowed to read out of order, and was not read before
00630          if (!passack || !m_pUnit[q]->m_Packet.getMsgOrderFlag())
00631             break;
00632 
00633          found = false;
00634       }
00635 
00636       if (++ q == m_iSize)
00637          q = 0;
00638 
00639       if (q == m_iLastAckPos)
00640          passack = true;
00641    }
00642 
00643    // no msg found
00644    if (!found)
00645    {
00646       // if the message is larger than the receiver buffer, return part of the message
00647       if ((p != -1) && ((q + 1) % m_iSize == p))
00648          found = true;
00649    }
00650 
00651    return found;
00652 }

Generated on 9 Feb 2013 for barchart-udt-core-2.2.2 by  doxygen 1.6.1