00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041 #include <cstring>
00042 #include <cmath>
00043 #include "buffer.h"
00044
00045 using namespace std;
00046
00047 CSndBuffer::CSndBuffer(int size, int mss):
00048 m_BufLock(),
00049 m_pBlock(NULL),
00050 m_pFirstBlock(NULL),
00051 m_pCurrBlock(NULL),
00052 m_pLastBlock(NULL),
00053 m_pBuffer(NULL),
00054 m_iNextMsgNo(1),
00055 m_iSize(size),
00056 m_iMSS(mss),
00057 m_iCount(0)
00058 {
00059
00060 m_pBuffer = new Buffer;
00061 m_pBuffer->m_pcData = new char [m_iSize * m_iMSS];
00062 m_pBuffer->m_iSize = m_iSize;
00063 m_pBuffer->m_pNext = NULL;
00064
00065
00066 m_pBlock = new Block;
00067 Block* pb = m_pBlock;
00068 for (int i = 1; i < m_iSize; ++ i)
00069 {
00070 pb->m_pNext = new Block;
00071 pb->m_iMsgNo = 0;
00072 pb = pb->m_pNext;
00073 }
00074 pb->m_pNext = m_pBlock;
00075
00076 pb = m_pBlock;
00077 char* pc = m_pBuffer->m_pcData;
00078 for (int i = 0; i < m_iSize; ++ i)
00079 {
00080 pb->m_pcData = pc;
00081 pb = pb->m_pNext;
00082 pc += m_iMSS;
00083 }
00084
00085 m_pFirstBlock = m_pCurrBlock = m_pLastBlock = m_pBlock;
00086
00087 #ifndef WIN32
00088 pthread_mutex_init(&m_BufLock, NULL);
00089 #else
00090 m_BufLock = CreateMutex(NULL, false, NULL);
00091 #endif
00092 }
00093
00094 CSndBuffer::~CSndBuffer()
00095 {
00096 Block* pb = m_pBlock->m_pNext;
00097 while (pb != m_pBlock)
00098 {
00099 Block* temp = pb;
00100 pb = pb->m_pNext;
00101 delete temp;
00102 }
00103 delete m_pBlock;
00104
00105 while (m_pBuffer != NULL)
00106 {
00107 Buffer* temp = m_pBuffer;
00108 m_pBuffer = m_pBuffer->m_pNext;
00109 delete [] temp->m_pcData;
00110 delete temp;
00111 }
00112
00113 #ifndef WIN32
00114 pthread_mutex_destroy(&m_BufLock);
00115 #else
00116 CloseHandle(m_BufLock);
00117 #endif
00118 }
00119
00120 void CSndBuffer::addBuffer(const char* data, int len, int ttl, bool order)
00121 {
00122 int size = len / m_iMSS;
00123 if ((len % m_iMSS) != 0)
00124 size ++;
00125
00126
00127 while (size + m_iCount >= m_iSize)
00128 increase();
00129
00130 uint64_t time = CTimer::getTime();
00131 int32_t inorder = order;
00132 inorder <<= 29;
00133
00134 Block* s = m_pLastBlock;
00135 for (int i = 0; i < size; ++ i)
00136 {
00137 int pktlen = len - i * m_iMSS;
00138 if (pktlen > m_iMSS)
00139 pktlen = m_iMSS;
00140
00141 memcpy(s->m_pcData, data + i * m_iMSS, pktlen);
00142 s->m_iLength = pktlen;
00143
00144 s->m_iMsgNo = m_iNextMsgNo | inorder;
00145 if (i == 0)
00146 s->m_iMsgNo |= 0x80000000;
00147 if (i == size - 1)
00148 s->m_iMsgNo |= 0x40000000;
00149
00150 s->m_OriginTime = time;
00151 s->m_iTTL = ttl;
00152
00153 s = s->m_pNext;
00154 }
00155 m_pLastBlock = s;
00156
00157 CGuard::enterCS(m_BufLock);
00158 m_iCount += size;
00159 CGuard::leaveCS(m_BufLock);
00160
00161 m_iNextMsgNo ++;
00162 if (m_iNextMsgNo == CMsgNo::m_iMaxMsgNo)
00163 m_iNextMsgNo = 1;
00164 }
00165
00166 int CSndBuffer::addBufferFromFile(fstream& ifs, int len)
00167 {
00168 int size = len / m_iMSS;
00169 if ((len % m_iMSS) != 0)
00170 size ++;
00171
00172
00173 while (size + m_iCount >= m_iSize)
00174 increase();
00175
00176 Block* s = m_pLastBlock;
00177 int total = 0;
00178 for (int i = 0; i < size; ++ i)
00179 {
00180 if (ifs.bad() || ifs.fail() || ifs.eof())
00181 break;
00182
00183 int pktlen = len - i * m_iMSS;
00184 if (pktlen > m_iMSS)
00185 pktlen = m_iMSS;
00186
00187 ifs.read(s->m_pcData, pktlen);
00188 if ((pktlen = ifs.gcount()) <= 0)
00189 break;
00190
00191
00192 s->m_iMsgNo = m_iNextMsgNo | 0x20000000;
00193 if (i == 0)
00194 s->m_iMsgNo |= 0x80000000;
00195 if (i == size - 1)
00196 s->m_iMsgNo |= 0x40000000;
00197
00198 s->m_iLength = pktlen;
00199 s->m_iTTL = -1;
00200 s = s->m_pNext;
00201
00202 total += pktlen;
00203 }
00204 m_pLastBlock = s;
00205
00206 CGuard::enterCS(m_BufLock);
00207 m_iCount += size;
00208 CGuard::leaveCS(m_BufLock);
00209
00210 m_iNextMsgNo ++;
00211 if (m_iNextMsgNo == CMsgNo::m_iMaxMsgNo)
00212 m_iNextMsgNo = 1;
00213
00214 return total;
00215 }
00216
00217 int CSndBuffer::readData(char** data, int32_t& msgno)
00218 {
00219
00220 if (m_pCurrBlock == m_pLastBlock)
00221 return 0;
00222
00223 *data = m_pCurrBlock->m_pcData;
00224 int readlen = m_pCurrBlock->m_iLength;
00225 msgno = m_pCurrBlock->m_iMsgNo;
00226
00227 m_pCurrBlock = m_pCurrBlock->m_pNext;
00228
00229 return readlen;
00230 }
00231
00232 int CSndBuffer::readData(char** data, const int offset, int32_t& msgno, int& msglen)
00233 {
00234 CGuard bufferguard(m_BufLock);
00235
00236 Block* p = m_pFirstBlock;
00237
00238 for (int i = 0; i < offset; ++ i)
00239 p = p->m_pNext;
00240
00241 if ((p->m_iTTL >= 0) && ((CTimer::getTime() - p->m_OriginTime) / 1000 > (uint64_t)p->m_iTTL))
00242 {
00243 msgno = p->m_iMsgNo & 0x1FFFFFFF;
00244
00245 msglen = 1;
00246 p = p->m_pNext;
00247 bool move = false;
00248 while (msgno == (p->m_iMsgNo & 0x1FFFFFFF))
00249 {
00250 if (p == m_pCurrBlock)
00251 move = true;
00252 p = p->m_pNext;
00253 if (move)
00254 m_pCurrBlock = p;
00255 msglen ++;
00256 }
00257
00258 return -1;
00259 }
00260
00261 *data = p->m_pcData;
00262 int readlen = p->m_iLength;
00263 msgno = p->m_iMsgNo;
00264
00265 return readlen;
00266 }
00267
00268 void CSndBuffer::ackData(int offset)
00269 {
00270 CGuard bufferguard(m_BufLock);
00271
00272 for (int i = 0; i < offset; ++ i)
00273 m_pFirstBlock = m_pFirstBlock->m_pNext;
00274
00275 m_iCount -= offset;
00276
00277 CTimer::triggerEvent();
00278 }
00279
00280 int CSndBuffer::getCurrBufSize() const
00281 {
00282 return m_iCount;
00283 }
00284
00285 void CSndBuffer::increase()
00286 {
00287 int unitsize = m_pBuffer->m_iSize;
00288
00289
00290 Buffer* nbuf = NULL;
00291 try
00292 {
00293 nbuf = new Buffer;
00294 nbuf->m_pcData = new char [unitsize * m_iMSS];
00295 }
00296 catch (...)
00297 {
00298 delete nbuf;
00299 throw CUDTException(3, 2, 0);
00300 }
00301 nbuf->m_iSize = unitsize;
00302 nbuf->m_pNext = NULL;
00303
00304
00305 Buffer* p = m_pBuffer;
00306 while (NULL != p->m_pNext)
00307 p = p->m_pNext;
00308 p->m_pNext = nbuf;
00309
00310
00311 Block* nblk = NULL;
00312 try
00313 {
00314 nblk = new Block;
00315 }
00316 catch (...)
00317 {
00318 delete nblk;
00319 throw CUDTException(3, 2, 0);
00320 }
00321 Block* pb = nblk;
00322 for (int i = 1; i < unitsize; ++ i)
00323 {
00324 pb->m_pNext = new Block;
00325 pb = pb->m_pNext;
00326 }
00327
00328
00329 pb->m_pNext = m_pLastBlock->m_pNext;
00330 m_pLastBlock->m_pNext = nblk;
00331
00332 pb = nblk;
00333 char* pc = nbuf->m_pcData;
00334 for (int i = 0; i < unitsize; ++ i)
00335 {
00336 pb->m_pcData = pc;
00337 pb = pb->m_pNext;
00338 pc += m_iMSS;
00339 }
00340
00341 m_iSize += unitsize;
00342 }
00343
00345
00346 CRcvBuffer::CRcvBuffer(CUnitQueue* queue, int bufsize):
00347 m_pUnit(NULL),
00348 m_iSize(bufsize),
00349 m_pUnitQueue(queue),
00350 m_iStartPos(0),
00351 m_iLastAckPos(0),
00352 m_iMaxPos(0),
00353 m_iNotch(0)
00354 {
00355 m_pUnit = new CUnit* [m_iSize];
00356 for (int i = 0; i < m_iSize; ++ i)
00357 m_pUnit[i] = NULL;
00358 }
00359
00360 CRcvBuffer::~CRcvBuffer()
00361 {
00362 for (int i = 0; i < m_iSize; ++ i)
00363 {
00364 if (NULL != m_pUnit[i])
00365 {
00366 m_pUnit[i]->m_iFlag = 0;
00367 -- m_pUnitQueue->m_iCount;
00368 }
00369 }
00370
00371 delete [] m_pUnit;
00372 }
00373
00374 int CRcvBuffer::addData(CUnit* unit, int offset)
00375 {
00376 int pos = (m_iLastAckPos + offset) % m_iSize;
00377 if (offset > m_iMaxPos)
00378 m_iMaxPos = offset;
00379
00380 if (NULL != m_pUnit[pos])
00381 return -1;
00382
00383 m_pUnit[pos] = unit;
00384
00385 unit->m_iFlag = 1;
00386 ++ m_pUnitQueue->m_iCount;
00387
00388 return 0;
00389 }
00390
00391 int CRcvBuffer::readBuffer(char* data, int len)
00392 {
00393 int p = m_iStartPos;
00394 int lastack = m_iLastAckPos;
00395 int rs = len;
00396
00397 while ((p != lastack) && (rs > 0))
00398 {
00399 int unitsize = m_pUnit[p]->m_Packet.getLength() - m_iNotch;
00400 if (unitsize > rs)
00401 unitsize = rs;
00402
00403 memcpy(data, m_pUnit[p]->m_Packet.m_pcData + m_iNotch, unitsize);
00404 data += unitsize;
00405
00406 if ((rs > unitsize) || (rs == m_pUnit[p]->m_Packet.getLength() - m_iNotch))
00407 {
00408 CUnit* tmp = m_pUnit[p];
00409 m_pUnit[p] = NULL;
00410 tmp->m_iFlag = 0;
00411 -- m_pUnitQueue->m_iCount;
00412
00413 if (++ p == m_iSize)
00414 p = 0;
00415
00416 m_iNotch = 0;
00417 }
00418 else
00419 m_iNotch += rs;
00420
00421 rs -= unitsize;
00422 }
00423
00424 m_iStartPos = p;
00425 return len - rs;
00426 }
00427
00428 int CRcvBuffer::readBufferToFile(fstream& ofs, int len)
00429 {
00430 int p = m_iStartPos;
00431 int lastack = m_iLastAckPos;
00432 int rs = len;
00433
00434 while ((p != lastack) && (rs > 0))
00435 {
00436 int unitsize = m_pUnit[p]->m_Packet.getLength() - m_iNotch;
00437 if (unitsize > rs)
00438 unitsize = rs;
00439
00440 ofs.write(m_pUnit[p]->m_Packet.m_pcData + m_iNotch, unitsize);
00441 if (ofs.fail())
00442 break;
00443
00444 if ((rs > unitsize) || (rs == m_pUnit[p]->m_Packet.getLength() - m_iNotch))
00445 {
00446 CUnit* tmp = m_pUnit[p];
00447 m_pUnit[p] = NULL;
00448 tmp->m_iFlag = 0;
00449 -- m_pUnitQueue->m_iCount;
00450
00451 if (++ p == m_iSize)
00452 p = 0;
00453
00454 m_iNotch = 0;
00455 }
00456 else
00457 m_iNotch += rs;
00458
00459 rs -= unitsize;
00460 }
00461
00462 m_iStartPos = p;
00463
00464 return len - rs;
00465 }
00466
00467 void CRcvBuffer::ackData(int len)
00468 {
00469 m_iLastAckPos = (m_iLastAckPos + len) % m_iSize;
00470 m_iMaxPos -= len;
00471 if (m_iMaxPos < 0)
00472 m_iMaxPos = 0;
00473
00474 CTimer::triggerEvent();
00475 }
00476
00477 int CRcvBuffer::getAvailBufSize() const
00478 {
00479
00480 return m_iSize - getRcvDataSize() - 1;
00481 }
00482
00483 int CRcvBuffer::getRcvDataSize() const
00484 {
00485 if (m_iLastAckPos >= m_iStartPos)
00486 return m_iLastAckPos - m_iStartPos;
00487
00488 return m_iSize + m_iLastAckPos - m_iStartPos;
00489 }
00490
00491 void CRcvBuffer::dropMsg(int32_t msgno)
00492 {
00493 for (int i = m_iStartPos, n = (m_iLastAckPos + m_iMaxPos) % m_iSize; i != n; i = (i + 1) % m_iSize)
00494 if ((NULL != m_pUnit[i]) && (msgno == m_pUnit[i]->m_Packet.m_iMsgNo))
00495 m_pUnit[i]->m_iFlag = 3;
00496 }
00497
00498 int CRcvBuffer::readMsg(char* data, int len)
00499 {
00500 int p, q;
00501 bool passack;
00502 if (!scanMsg(p, q, passack))
00503 return 0;
00504
00505 int rs = len;
00506 while (p != (q + 1) % m_iSize)
00507 {
00508 int unitsize = m_pUnit[p]->m_Packet.getLength();
00509 if ((rs >= 0) && (unitsize > rs))
00510 unitsize = rs;
00511
00512 if (unitsize > 0)
00513 {
00514 memcpy(data, m_pUnit[p]->m_Packet.m_pcData, unitsize);
00515 data += unitsize;
00516 rs -= unitsize;
00517 }
00518
00519 if (!passack)
00520 {
00521 CUnit* tmp = m_pUnit[p];
00522 m_pUnit[p] = NULL;
00523 tmp->m_iFlag = 0;
00524 -- m_pUnitQueue->m_iCount;
00525 }
00526 else
00527 m_pUnit[p]->m_iFlag = 2;
00528
00529 if (++ p == m_iSize)
00530 p = 0;
00531 }
00532
00533 if (!passack)
00534 m_iStartPos = (q + 1) % m_iSize;
00535
00536 return len - rs;
00537 }
00538
00539 int CRcvBuffer::getRcvMsgNum()
00540 {
00541 int p, q;
00542 bool passack;
00543 return scanMsg(p, q, passack) ? 1 : 0;
00544 }
00545
00546 bool CRcvBuffer::scanMsg(int& p, int& q, bool& passack)
00547 {
00548
00549 if ((m_iStartPos == m_iLastAckPos) && (m_iMaxPos <= 0))
00550 return false;
00551
00552
00553 while (m_iStartPos != m_iLastAckPos)
00554 {
00555 if (NULL == m_pUnit[m_iStartPos])
00556 {
00557 if (++ m_iStartPos == m_iSize)
00558 m_iStartPos = 0;
00559 continue;
00560 }
00561
00562 if ((1 == m_pUnit[m_iStartPos]->m_iFlag) && (m_pUnit[m_iStartPos]->m_Packet.getMsgBoundary() > 1))
00563 {
00564 bool good = true;
00565
00566
00567 for (int i = m_iStartPos; i != m_iLastAckPos;)
00568 {
00569 if ((NULL == m_pUnit[i]) || (1 != m_pUnit[i]->m_iFlag))
00570 {
00571 good = false;
00572 break;
00573 }
00574
00575 if ((m_pUnit[i]->m_Packet.getMsgBoundary() == 1) || (m_pUnit[i]->m_Packet.getMsgBoundary() == 3))
00576 break;
00577
00578 if (++ i == m_iSize)
00579 i = 0;
00580 }
00581
00582 if (good)
00583 break;
00584 }
00585
00586 CUnit* tmp = m_pUnit[m_iStartPos];
00587 m_pUnit[m_iStartPos] = NULL;
00588 tmp->m_iFlag = 0;
00589 -- m_pUnitQueue->m_iCount;
00590
00591 if (++ m_iStartPos == m_iSize)
00592 m_iStartPos = 0;
00593 }
00594
00595 p = -1;
00596 q = m_iStartPos;
00597 passack = m_iStartPos == m_iLastAckPos;
00598 bool found = false;
00599
00600
00601 for (int i = 0, n = m_iMaxPos + getRcvDataSize(); i <= n; ++ i)
00602 {
00603 if ((NULL != m_pUnit[q]) && (1 == m_pUnit[q]->m_iFlag))
00604 {
00605 switch (m_pUnit[q]->m_Packet.getMsgBoundary())
00606 {
00607 case 3:
00608 p = q;
00609 found = true;
00610 break;
00611
00612 case 2:
00613 p = q;
00614 break;
00615
00616 case 1:
00617 if (p != -1)
00618 found = true;
00619 }
00620 }
00621 else
00622 {
00623
00624 p = -1;
00625 }
00626
00627 if (found)
00628 {
00629
00630 if (!passack || !m_pUnit[q]->m_Packet.getMsgOrderFlag())
00631 break;
00632
00633 found = false;
00634 }
00635
00636 if (++ q == m_iSize)
00637 q = 0;
00638
00639 if (q == m_iLastAckPos)
00640 passack = true;
00641 }
00642
00643
00644 if (!found)
00645 {
00646
00647 if ((p != -1) && ((q + 1) % m_iSize == p))
00648 found = true;
00649 }
00650
00651 return found;
00652 }