Emeric Vigier | 2f62582 | 2012-08-06 11:09:52 -0400 | [diff] [blame] | 1 | // Copyright (C) 2001,2002,2004 Federico Montesino Pouzols <fedemp@altern.org> |
| 2 | // |
| 3 | // This program is free software; you can redistribute it and/or modify |
| 4 | // it under the terms of the GNU General Public License as published by |
| 5 | // the Free Software Foundation; either version 2 of the License, or |
| 6 | // (at your option) any later version. |
| 7 | // |
| 8 | // This program is distributed in the hope that it will be useful, |
| 9 | // but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 10 | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 11 | // GNU General Public License for more details. |
| 12 | // |
| 13 | // You should have received a copy of the GNU General Public License |
| 14 | // along with this program; if not, write to the Free Software |
| 15 | // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. |
| 16 | // |
| 17 | // As a special exception, you may use this file as part of a free software |
| 18 | // library without restriction. Specifically, if other files instantiate |
| 19 | // templates or use macros or inline functions from this file, or you compile |
| 20 | // this file and link it with other files to produce an executable, this |
| 21 | // file does not by itself cause the resulting executable to be covered by |
| 22 | // the GNU General Public License. This exception does not however |
| 23 | // invalidate any other reasons why the executable file might be covered by |
| 24 | // the GNU General Public License. |
| 25 | // |
| 26 | // This exception applies only to the code released under the name GNU |
| 27 | // ccRTP. If you copy code from other releases into a copy of GNU |
| 28 | // ccRTP, as the General Public License permits, the exception does |
| 29 | // not apply to the code that you add in this way. To avoid misleading |
| 30 | // anyone as to the status of such modified files, you must delete |
| 31 | // this exception notice from them. |
| 32 | // |
| 33 | // If you write modifications of your own for GNU ccRTP, it is your choice |
| 34 | // whether to permit this exception to apply to your modifications. |
| 35 | // If you do not wish that, delete this exception notice. |
| 36 | // |
| 37 | |
| 38 | #include "private.h" |
| 39 | #include <ccrtp/iqueue.h> |
| 40 | |
| 41 | #ifdef CCXX_NAMESPACES |
| 42 | namespace ost { |
| 43 | #endif |
| 44 | |
| 45 | const size_t IncomingDataQueueBase::defaultMaxRecvPacketSize = 65534; |
| 46 | |
| 47 | ConflictHandler::ConflictingTransportAddress:: |
| 48 | ConflictingTransportAddress(InetAddress na,tpport_t dtp, tpport_t ctp): |
| 49 | networkAddress(na), dataTransportPort(dtp), |
| 50 | controlTransportPort(ctp), next(NULL) |
| 51 | { |
| 52 | gettimeofday(&lastPacketTime,NULL); |
| 53 | } |
| 54 | |
| 55 | ConflictHandler::ConflictingTransportAddress* |
| 56 | ConflictHandler::searchDataConflict(InetAddress na, tpport_t dtp) |
| 57 | { |
| 58 | ConflictingTransportAddress* result = firstConflict; |
| 59 | while ( result->networkAddress != na || |
| 60 | result->dataTransportPort != dtp) |
| 61 | result = result->next; |
| 62 | return result; |
| 63 | } |
| 64 | |
| 65 | ConflictHandler::ConflictingTransportAddress* |
| 66 | ConflictHandler::searchControlConflict(InetAddress na, tpport_t ctp) |
| 67 | { |
| 68 | ConflictingTransportAddress* result = firstConflict; |
| 69 | while ( result && |
| 70 | (result->networkAddress != na || |
| 71 | result->controlTransportPort != ctp) ) |
| 72 | result = result->next; |
| 73 | return result; |
| 74 | } |
| 75 | |
| 76 | void |
| 77 | ConflictHandler::addConflict(const InetAddress& na, tpport_t dtp, tpport_t ctp) |
| 78 | { |
| 79 | ConflictingTransportAddress* nc = |
| 80 | new ConflictingTransportAddress(na,dtp,ctp); |
| 81 | |
| 82 | if ( lastConflict ) { |
| 83 | lastConflict->setNext(nc); |
| 84 | lastConflict = nc; |
| 85 | } else { |
| 86 | firstConflict = lastConflict = nc; |
| 87 | } |
| 88 | } |
| 89 | |
| 90 | const uint8 IncomingDataQueue::defaultMinValidPacketSequence = 0; |
| 91 | const uint16 IncomingDataQueue::defaultMaxPacketMisorder = 0; |
| 92 | const uint16 IncomingDataQueue::defaultMaxPacketDropout = 3000; |
| 93 | const size_t IncomingDataQueue::defaultMembersSize = |
| 94 | MembershipBookkeeping::defaultMembersHashSize; |
| 95 | |
| 96 | IncomingDataQueue::IncomingDataQueue(uint32 size) : |
| 97 | IncomingDataQueueBase(), MembershipBookkeeping(size) |
| 98 | { |
| 99 | recvFirst = recvLast = NULL; |
| 100 | sourceExpirationPeriod = 5; // 5 RTCP report intervals |
| 101 | minValidPacketSequence = getDefaultMinValidPacketSequence(); |
| 102 | maxPacketDropout = getDefaultMaxPacketDropout(); |
| 103 | maxPacketMisorder = getDefaultMaxPacketMisorder(); |
| 104 | } |
| 105 | |
| 106 | void |
| 107 | IncomingDataQueue::purgeIncomingQueue() |
| 108 | { |
| 109 | IncomingRTPPktLink* recvnext; |
| 110 | // flush the reception queue (incoming packets not yet |
| 111 | // retrieved) |
| 112 | recvLock.writeLock(); |
| 113 | while( recvFirst ) |
| 114 | { |
| 115 | recvnext = recvFirst->getNext(); |
| 116 | |
| 117 | // nullify source specific packet list |
| 118 | SyncSourceLink *s = recvFirst->getSourceLink(); |
| 119 | s->setFirst(NULL); |
| 120 | s->setLast(NULL); |
| 121 | |
| 122 | delete recvFirst->getPacket(); |
| 123 | delete recvFirst; |
| 124 | recvFirst = recvnext; |
| 125 | } |
| 126 | recvLock.unlock(); |
| 127 | } |
| 128 | |
| 129 | void |
| 130 | IncomingDataQueue::renewLocalSSRC() |
| 131 | { |
| 132 | const uint32 MAXTRIES = 20; |
| 133 | uint32 newssrc; |
| 134 | uint16 tries = 0; |
| 135 | do { |
| 136 | newssrc = random32(); |
| 137 | tries++; |
| 138 | } while ( (tries < MAXTRIES) && isRegistered(newssrc) ); |
| 139 | |
| 140 | if ( MAXTRIES == tries ) { |
| 141 | // TODO we are in real trouble. |
| 142 | } |
| 143 | } |
| 144 | |
| 145 | bool |
| 146 | IncomingDataQueue::isWaiting(const SyncSource* src) const |
| 147 | { |
| 148 | bool w; |
| 149 | recvLock.readLock(); |
| 150 | if ( NULL == src ) |
| 151 | w = ( NULL != recvFirst); |
| 152 | else |
| 153 | w = isMine(*src) && ( NULL != getLink(*src)->getFirst() ); |
| 154 | |
| 155 | recvLock.unlock(); |
| 156 | return w; |
| 157 | } |
| 158 | |
| 159 | uint32 |
| 160 | IncomingDataQueue::getFirstTimestamp(const SyncSource* src) const |
| 161 | { |
| 162 | recvLock.readLock(); |
| 163 | |
| 164 | // get the first packet |
| 165 | IncomingRTPPktLink* packetLink; |
| 166 | if ( NULL == src ) |
| 167 | packetLink = recvFirst; |
| 168 | else |
| 169 | packetLink = isMine(*src) ? getLink(*src)->getFirst() : NULL; |
| 170 | |
| 171 | // get the timestamp of the first packet |
| 172 | uint32 ts; |
| 173 | if ( packetLink ) |
| 174 | ts = packetLink->getTimestamp(); |
| 175 | else |
| 176 | ts = 0l; |
| 177 | |
| 178 | recvLock.unlock(); |
| 179 | return ts; |
| 180 | } |
| 181 | |
| 182 | size_t |
| 183 | IncomingDataQueue::takeInDataPacket(void) |
| 184 | { |
| 185 | InetHostAddress network_address; |
| 186 | tpport_t transport_port; |
| 187 | |
| 188 | uint32 nextSize = (uint32)getNextDataPacketSize(); |
| 189 | unsigned char* buffer = new unsigned char[nextSize]; |
| 190 | int32 rtn = (int32)recvData(buffer,nextSize,network_address,transport_port); |
| 191 | if ( (rtn < 0) || ((uint32)rtn > getMaxRecvPacketSize()) ){ |
| 192 | delete buffer; |
| 193 | return 0; |
| 194 | } |
| 195 | |
| 196 | // get time of arrival |
| 197 | struct timeval recvtime; |
| 198 | gettimeofday(&recvtime,NULL); |
| 199 | |
| 200 | // Special handling of padding to take care of encrypted content. |
| 201 | // In case of SRTP the padding length field is also encrypted, thus |
| 202 | // it gives a wrong length. Check and clear padding bit before |
| 203 | // creating the RTPPacket. Will be set and re-computed after a possible |
| 204 | // SRTP decryption. |
| 205 | uint8 padSet = (*buffer & 0x20); |
| 206 | if (padSet) { |
| 207 | *buffer = *buffer & ~0x20; // clear padding bit |
| 208 | } |
| 209 | // build a packet. It will link itself to its source |
| 210 | IncomingRTPPkt* packet = |
| 211 | new IncomingRTPPkt(buffer,rtn); |
| 212 | |
| 213 | // Generic header validity check. |
| 214 | if ( !packet->isHeaderValid() ) { |
| 215 | delete packet; |
| 216 | return 0; |
| 217 | } |
| 218 | |
| 219 | CryptoContext* pcc = getInQueueCryptoContext( packet->getSSRC()); |
| 220 | if (pcc == NULL) { |
| 221 | pcc = getInQueueCryptoContext(0); |
| 222 | if (pcc != NULL) { |
| 223 | pcc = pcc->newCryptoContextForSSRC(packet->getSSRC(), 0, 0L); |
| 224 | if (pcc != NULL) { |
| 225 | pcc->deriveSrtpKeys(0); |
| 226 | setInQueueCryptoContext(pcc); |
| 227 | } |
| 228 | } |
| 229 | } |
| 230 | if (pcc != NULL) { |
| 231 | int32 ret = packet->unprotect(pcc); |
| 232 | if (ret < 0) { |
| 233 | if (!onSRTPPacketError(*packet, ret)) { |
| 234 | delete packet; |
| 235 | return 0; |
| 236 | } |
| 237 | } |
| 238 | } |
| 239 | if (padSet) { |
| 240 | packet->reComputePayLength(true); |
| 241 | } |
| 242 | // virtual for profile-specific validation and processing. |
| 243 | if ( !onRTPPacketRecv(*packet) ) { |
| 244 | delete packet; |
| 245 | return 0; |
| 246 | } |
| 247 | |
| 248 | bool source_created; |
| 249 | SyncSourceLink* sourceLink = |
| 250 | getSourceBySSRC(packet->getSSRC(),source_created); |
| 251 | SyncSource* s = sourceLink->getSource(); |
| 252 | if ( source_created ) { |
| 253 | // Set data transport address. |
| 254 | setDataTransportPort(*s,transport_port); |
| 255 | // Network address is assumed to be the same as the control one |
| 256 | setNetworkAddress(*s,network_address); |
| 257 | sourceLink->initStats(); |
| 258 | // First packet arrival time. |
| 259 | sourceLink->setInitialDataTime(recvtime); |
| 260 | sourceLink->setProbation(getMinValidPacketSequence()); |
| 261 | if ( sourceLink->getHello() ) |
| 262 | onNewSyncSource(*s); |
| 263 | } else if ( 0 == s->getDataTransportPort() ) { |
| 264 | // Test if RTCP packets had been received but this is the |
| 265 | // first data packet from this source. |
| 266 | setDataTransportPort(*s,transport_port); |
| 267 | } |
| 268 | |
| 269 | // Before inserting in the queue, |
| 270 | // 1) check for collisions and loops. If the packet cannot be |
| 271 | // assigned to a source, it will be rejected. |
| 272 | // 2) check the source is a sufficiently well known source |
| 273 | // TODO: also check CSRC identifiers. |
| 274 | if ( checkSSRCInIncomingRTPPkt(*sourceLink,source_created, |
| 275 | network_address,transport_port) && |
| 276 | recordReception(*sourceLink,*packet,recvtime) ) { |
| 277 | // now the packet link is linked in the queues |
| 278 | IncomingRTPPktLink* packetLink = |
| 279 | new IncomingRTPPktLink(packet, |
| 280 | sourceLink, |
| 281 | recvtime, |
| 282 | packet->getTimestamp() - |
| 283 | sourceLink->getInitialDataTimestamp(), |
| 284 | NULL,NULL,NULL,NULL); |
| 285 | insertRecvPacket(packetLink); |
| 286 | } else { |
| 287 | // must be discarded due to collision or loop or |
| 288 | // invalid source |
| 289 | delete packet; |
| 290 | } |
| 291 | |
| 292 | // ccRTP keeps packets from the new source, but avoids |
| 293 | // flip-flopping. This allows losing less packets and for |
| 294 | // mobile telephony applications or other apps that may change |
| 295 | // the source transport address during the session. |
| 296 | return rtn; |
| 297 | } |
| 298 | |
| 299 | bool IncomingDataQueue::checkSSRCInIncomingRTPPkt(SyncSourceLink& sourceLink, |
| 300 | bool is_new, InetAddress& network_address, tpport_t transport_port) |
| 301 | { |
| 302 | bool result = true; |
| 303 | |
| 304 | // Test if the source is new and it is not the local one. |
| 305 | if ( is_new && |
| 306 | sourceLink.getSource()->getID() != getLocalSSRC() ) |
| 307 | return result; |
| 308 | |
| 309 | SyncSource *s = sourceLink.getSource(); |
| 310 | |
| 311 | if ( s->getDataTransportPort() != transport_port || |
| 312 | s->getNetworkAddress() != network_address ) { |
| 313 | // SSRC collision or a loop has happened |
| 314 | if ( s->getID() != getLocalSSRC() ) { |
| 315 | // TODO: Optional error counter. |
| 316 | |
| 317 | // Note this differs from the default in the RFC. |
| 318 | // Discard packet only when the collision is |
| 319 | // repeating (to avoid flip-flopping) |
| 320 | if ( sourceLink.getPrevConflict() && |
| 321 | ( |
| 322 | (network_address == |
| 323 | sourceLink.getPrevConflict()->networkAddress) |
| 324 | && |
| 325 | (transport_port == |
| 326 | sourceLink.getPrevConflict()->dataTransportPort) |
| 327 | ) ) { |
| 328 | // discard packet and do not flip-flop |
| 329 | result = false; |
| 330 | } else { |
| 331 | // Record who has collided so that in |
| 332 | // the future we can how if the |
| 333 | // collision repeats. |
| 334 | sourceLink.setPrevConflict(network_address, |
| 335 | transport_port,0); |
| 336 | // Change sync source transport address |
| 337 | setDataTransportPort(*s,transport_port); |
| 338 | setNetworkAddress(*s,network_address); |
| 339 | } |
| 340 | |
| 341 | } else { |
| 342 | // Collision or loop of own packets. |
| 343 | ConflictingTransportAddress* conflicting = |
| 344 | searchDataConflict(network_address, |
| 345 | transport_port); |
| 346 | if ( conflicting ) { |
| 347 | // Optional error counter. |
| 348 | updateConflict(*conflicting); |
| 349 | result = false; |
| 350 | } else { |
| 351 | // New collision |
| 352 | addConflict(s->getNetworkAddress(), |
| 353 | s->getDataTransportPort(), |
| 354 | s->getControlTransportPort()); |
| 355 | dispatchBYE("SSRC collision detected when receiving data packet."); |
| 356 | renewLocalSSRC(); |
| 357 | setNetworkAddress(*s,network_address); |
| 358 | setDataTransportPort(*s,transport_port); |
| 359 | setControlTransportPort(*s,0); |
| 360 | sourceLink.initStats(); |
| 361 | sourceLink.setProbation(getMinValidPacketSequence()); |
| 362 | } |
| 363 | } |
| 364 | } |
| 365 | return result; |
| 366 | } |
| 367 | |
| 368 | bool |
| 369 | IncomingDataQueue::insertRecvPacket(IncomingRTPPktLink* packetLink) |
| 370 | { |
| 371 | SyncSourceLink *srcLink = packetLink->getSourceLink(); |
| 372 | unsigned short seq = packetLink->getPacket()->getSeqNum(); |
| 373 | recvLock.writeLock(); |
| 374 | IncomingRTPPktLink* plink = srcLink->getLast(); |
| 375 | if ( plink && (seq < plink->getPacket()->getSeqNum()) ) { |
| 376 | // a disordered packet, so look for its place |
| 377 | while ( plink && (seq < plink->getPacket()->getSeqNum()) ){ |
| 378 | // the packet is a duplicate |
| 379 | if ( seq == plink->getPacket()->getSeqNum() ) { |
| 380 | recvLock.unlock(); |
| 381 | VDL(("Duplicated disordered packet: seqnum %d, SSRC:", |
| 382 | seq,srcLink->getSource()->getID())); |
| 383 | delete packetLink->getPacket(); |
| 384 | delete packetLink; |
| 385 | return false; |
| 386 | } |
| 387 | plink = plink->getSrcPrev(); |
| 388 | } |
| 389 | if ( !plink ) { |
| 390 | // we have scanned the whole (and non empty) |
| 391 | // list, so this must be the older (first) |
| 392 | // packet from this source. |
| 393 | |
| 394 | // insert into the source specific queue |
| 395 | IncomingRTPPktLink* srcFirst = srcLink->getFirst(); |
| 396 | srcFirst->setSrcPrev(packetLink); |
| 397 | packetLink->setSrcNext(srcFirst); |
| 398 | // insert into the global queue |
| 399 | IncomingRTPPktLink* prevFirst = srcFirst->getPrev(); |
| 400 | if ( prevFirst ){ |
| 401 | prevFirst->setNext(packetLink); |
| 402 | packetLink->setPrev(prevFirst); |
| 403 | } |
| 404 | srcFirst->setPrev(packetLink); |
| 405 | packetLink->setNext(srcFirst); |
| 406 | srcLink->setFirst(packetLink); |
| 407 | } else { |
| 408 | // (we are in the middle of the source list) |
| 409 | // insert into the source specific queue |
| 410 | plink->getSrcNext()->setSrcPrev(packetLink); |
| 411 | packetLink->setSrcNext(plink->getSrcNext()); |
| 412 | // -- insert into the global queue, with the |
| 413 | // minimum priority compared to packets from |
| 414 | // other sources |
| 415 | plink->getSrcNext()->getPrev()->setNext(packetLink); |
| 416 | packetLink->setPrev(plink->getSrcNext()->getPrev()); |
| 417 | plink->getSrcNext()->setPrev(packetLink); |
| 418 | packetLink->setNext(plink->getSrcNext()); |
| 419 | // ------ |
| 420 | plink->setSrcNext(packetLink); |
| 421 | packetLink->setSrcPrev(plink); |
| 422 | // insert into the global queue (giving |
| 423 | // priority compared to packets from other sources) |
| 424 | //list->getNext->setPrev(packetLink); |
| 425 | //packetLink->setNext(list->getNext); |
| 426 | //list->setNext(packet); |
| 427 | //packet->setPrev(list); |
| 428 | } |
| 429 | } else { |
| 430 | // An ordered packet |
| 431 | if ( !plink ) { |
| 432 | // the only packet in the source specific queue |
| 433 | srcLink->setLast(packetLink); |
| 434 | srcLink->setFirst(packetLink); |
| 435 | // the last packet in the global queue |
| 436 | if ( recvLast ) { |
| 437 | recvLast->setNext(packetLink); |
| 438 | packetLink->setPrev(recvLast); |
| 439 | } |
| 440 | recvLast = packetLink; |
| 441 | if ( !recvFirst ) |
| 442 | recvFirst = packetLink; |
| 443 | } else { |
| 444 | // there are already more packets from this source. |
| 445 | // this ignores duplicate packets |
| 446 | if ( plink && (seq == plink->getPacket()->getSeqNum()) ) { |
| 447 | VDL(("Duplicated packet: seqnum %d, SSRC:", |
| 448 | seq,srcLink->getSource->getID())); |
| 449 | recvLock.unlock(); |
| 450 | delete packetLink->getPacket(); |
| 451 | delete packetLink; |
| 452 | return false; |
| 453 | } |
| 454 | // the last packet in the source specific queue |
| 455 | srcLink->getLast()->setSrcNext(packetLink); |
| 456 | packetLink->setSrcPrev(srcLink->getLast()); |
| 457 | srcLink->setLast(packetLink); |
| 458 | // the last packet in the global queue |
| 459 | recvLast->setNext(packetLink); |
| 460 | packetLink->setPrev(recvLast); |
| 461 | recvLast = packetLink; |
| 462 | } |
| 463 | } |
| 464 | // account the insertion of this packet into the queue |
| 465 | srcLink->recordInsertion(*packetLink); |
| 466 | recvLock.unlock(); |
| 467 | // packet successfully inserted |
| 468 | return true; |
| 469 | } |
| 470 | |
| 471 | const AppDataUnit* |
| 472 | IncomingDataQueue::getData(uint32 stamp, const SyncSource* src) |
| 473 | { |
| 474 | IncomingRTPPktLink* pl; |
| 475 | // unsigned count = 0; |
| 476 | AppDataUnit* result; |
| 477 | |
| 478 | if ( NULL != (pl = getWaiting(stamp,src)) ) { |
| 479 | IncomingRTPPkt* packet = pl->getPacket(); |
| 480 | // size_t len = packet->getPayloadSize(); |
| 481 | |
| 482 | SyncSource &src = *(pl->getSourceLink()->getSource()); |
| 483 | result = new AppDataUnit(*packet,src); |
| 484 | |
| 485 | // delete the packet link, but not the packet |
| 486 | delete pl; |
| 487 | // count += len; |
| 488 | } else { |
| 489 | result = NULL; |
| 490 | } |
| 491 | return result; |
| 492 | } |
| 493 | |
| 494 | // FIX: try to merge and organize |
| 495 | IncomingDataQueue::IncomingRTPPktLink* |
| 496 | IncomingDataQueue::getWaiting(uint32 timestamp, const SyncSource* src) |
| 497 | { |
| 498 | if ( src && !isMine(*src) ) |
| 499 | return NULL; |
| 500 | |
| 501 | IncomingRTPPktLink *result; |
| 502 | recvLock.writeLock(); |
| 503 | if ( src != NULL ) { |
| 504 | // process source specific queries: |
| 505 | // we will modify the queue of this source |
| 506 | SyncSourceLink* srcm = getLink(*src); |
| 507 | |
| 508 | // first, delete all older packets. The while loop |
| 509 | // down here counts how many older packets are there; |
| 510 | // then the for loop deletes them and advances l till |
| 511 | // the first non older packet. |
| 512 | int nold = 0; |
| 513 | IncomingRTPPktLink* l = srcm->getFirst(); |
| 514 | if ( !l ) { |
| 515 | result = NULL; |
| 516 | recvLock.unlock(); |
| 517 | return result; |
| 518 | } |
| 519 | while ( l && ((l->getTimestamp() < timestamp) || |
| 520 | end2EndDelayed(*l))) { |
| 521 | nold++; |
| 522 | l = l->getSrcNext(); |
| 523 | } |
| 524 | // to know whether the global queue gets empty |
| 525 | bool nonempty = false; |
| 526 | for ( int i = 0; i < nold; i++) { |
| 527 | l = srcm->getFirst(); |
| 528 | srcm->setFirst(srcm->getFirst()->getSrcNext());; |
| 529 | // unlink from the global queue |
| 530 | nonempty = false; |
| 531 | if ( l->getPrev() ){ |
| 532 | nonempty = true; |
| 533 | l->getPrev()->setNext(l->getNext()); |
| 534 | } if ( l->getNext() ) { |
| 535 | nonempty = true; |
| 536 | l->getNext()->setPrev(l->getPrev()); |
| 537 | } |
| 538 | // now, delete it |
| 539 | onExpireRecv(*(l->getPacket()));// notify packet discard |
| 540 | delete l->getPacket(); |
| 541 | delete l; |
| 542 | } |
| 543 | // return the packet, if found |
| 544 | if ( !srcm->getFirst() ) { |
| 545 | // threre are no more packets from this source |
| 546 | srcm->setLast(NULL); |
| 547 | if ( !nonempty ) |
| 548 | recvFirst = recvLast = NULL; |
| 549 | result = NULL; |
| 550 | } else if ( srcm->getFirst()->getTimestamp() > timestamp ) { |
| 551 | // threre are only newer packets from this source |
| 552 | srcm->getFirst()->setSrcPrev(NULL); |
| 553 | result = NULL; |
| 554 | } else { |
| 555 | // (src->getFirst()->getTimestamp() == stamp) is true |
| 556 | result = srcm->getFirst(); |
| 557 | // unlink the selected packet from the global queue |
| 558 | if ( result->getPrev() ) |
| 559 | result->getPrev()->setNext(result->getNext()); |
| 560 | else |
| 561 | recvFirst = result->getNext(); |
| 562 | if ( result->getNext() ) |
| 563 | result->getNext()->setPrev(result->getPrev()); |
| 564 | else |
| 565 | recvLast = result->getPrev(); |
| 566 | // unlink the selected packet from the source queue |
| 567 | srcm->setFirst(result->getSrcNext()); |
| 568 | if ( srcm->getFirst() ) |
| 569 | srcm->getFirst()->setPrev(NULL); |
| 570 | else |
| 571 | srcm->setLast(NULL); |
| 572 | } |
| 573 | } else { |
| 574 | // process source unspecific queries |
| 575 | int nold = 0; |
| 576 | IncomingRTPPktLink* l = recvFirst; |
| 577 | while ( l && (l->getTimestamp() < timestamp || |
| 578 | end2EndDelayed(*l) ) ){ |
| 579 | nold++; |
| 580 | l = l->getNext(); |
| 581 | } |
| 582 | for (int i = 0; i < nold; i++) { |
| 583 | IncomingRTPPktLink* l = recvFirst; |
| 584 | recvFirst = recvFirst->getNext(); |
| 585 | // unlink the packet from the queue of its source |
| 586 | SyncSourceLink* src = l->getSourceLink(); |
| 587 | src->setFirst(l->getSrcNext()); |
| 588 | if ( l->getSrcNext() ) |
| 589 | l->getSrcNext()->setSrcPrev(NULL); |
| 590 | else |
| 591 | src->setLast(NULL); |
| 592 | // now, delete it |
| 593 | onExpireRecv(*(l->getPacket()));// notify packet discard |
| 594 | delete l->getPacket(); |
| 595 | delete l; |
| 596 | } |
| 597 | |
| 598 | // return the packet, if found |
| 599 | if ( !recvFirst ) { |
| 600 | // there are no more packets in the queue |
| 601 | recvLast = NULL; |
| 602 | result = NULL; |
| 603 | } else if ( recvFirst->getTimestamp() > timestamp ) { |
| 604 | // there are only newer packets in the queue |
| 605 | l->setPrev(NULL); |
| 606 | result = NULL; |
| 607 | } else { |
| 608 | // (recvFirst->getTimestamp() == stamp) is true |
| 609 | result = recvFirst; |
| 610 | // unlink the selected packet from the global queue |
| 611 | recvFirst = recvFirst->getNext(); |
| 612 | if ( recvFirst ) |
| 613 | recvFirst->setPrev(NULL); |
| 614 | else |
| 615 | recvLast = NULL; |
| 616 | // unlink the selected packet from the queue |
| 617 | // of its source |
| 618 | SyncSourceLink* src = result->getSourceLink(); |
| 619 | src->setFirst(result->getSrcNext()); |
| 620 | if ( src->getFirst() ) |
| 621 | src->getFirst()->setSrcPrev(NULL); |
| 622 | else |
| 623 | src->setLast(NULL); |
| 624 | } |
| 625 | } |
| 626 | recvLock.unlock(); |
| 627 | return result; |
| 628 | } |
| 629 | |
| 630 | bool |
| 631 | IncomingDataQueue::recordReception(SyncSourceLink& srcLink, |
| 632 | const IncomingRTPPkt& pkt, const timeval recvtime) |
| 633 | { |
| 634 | bool result = true; |
| 635 | |
| 636 | // Source validation. |
| 637 | SyncSource* src = srcLink.getSource(); |
| 638 | if ( !(srcLink.isValid()) ) { |
| 639 | // source is not yet valid. |
| 640 | if ( pkt.getSeqNum() == srcLink.getMaxSeqNum() + 1 ) { |
| 641 | // packet in sequence. |
| 642 | srcLink.decProbation(); |
| 643 | if ( srcLink.isValid() ) { |
| 644 | // source has become valid. |
| 645 | // TODO: avoid this the first time. |
| 646 | srcLink.initSequence(pkt.getSeqNum()); |
| 647 | } else { |
| 648 | result = false; |
| 649 | } |
| 650 | } else { |
| 651 | // packet not in sequence. |
| 652 | srcLink.probation = getMinValidPacketSequence() - 1; |
| 653 | result = false; |
| 654 | } |
| 655 | srcLink.setMaxSeqNum(pkt.getSeqNum()); |
| 656 | } else { |
| 657 | // source was already valid. |
| 658 | uint16 step = pkt.getSeqNum() - srcLink.getMaxSeqNum(); |
| 659 | if ( step < getMaxPacketDropout() ) { |
| 660 | // Ordered, with not too high step. |
| 661 | if ( pkt.getSeqNum() < srcLink.getMaxSeqNum() ) { |
| 662 | // sequene number wrapped. |
| 663 | srcLink.incSeqNumAccum(); |
| 664 | } |
| 665 | srcLink.setMaxSeqNum(pkt.getSeqNum()); |
| 666 | } else if ( step <= (SEQNUMMOD - getMaxPacketMisorder()) ) { |
| 667 | // too high step of the sequence number. |
| 668 | if ( pkt.getSeqNum() == srcLink.getBadSeqNum() ) { |
| 669 | srcLink.initSequence(pkt.getSeqNum()); |
| 670 | } else { |
| 671 | srcLink.setBadSeqNum((pkt.getSeqNum() + 1) & |
| 672 | (SEQNUMMOD - 1) ); |
| 673 | //This additional check avoids that |
| 674 | //the very first packet from a source |
| 675 | //be discarded. |
| 676 | if ( 0 < srcLink.getObservedPacketCount() ) { |
| 677 | result = false; |
| 678 | } else { |
| 679 | srcLink.setMaxSeqNum(pkt.getSeqNum()); |
| 680 | } |
| 681 | } |
| 682 | } else { |
| 683 | // duplicate or reordered packet |
| 684 | } |
| 685 | } |
| 686 | |
| 687 | if ( result ) { |
| 688 | // the packet is considered valid. |
| 689 | srcLink.incObservedPacketCount(); |
| 690 | srcLink.incObservedOctetCount(pkt.getPayloadSize()); |
| 691 | srcLink.lastPacketTime = recvtime; |
| 692 | if ( srcLink.getObservedPacketCount() == 1 ) { |
| 693 | // ooops, it's the first packet from this source |
| 694 | setSender(*src,true); |
| 695 | srcLink.setInitialDataTimestamp(pkt.getTimestamp()); |
| 696 | } |
| 697 | // we record the last time a packet from this source |
| 698 | // was received, this has statistical interest and is |
| 699 | // needed to time out old senders that are no sending |
| 700 | // any longer. |
| 701 | |
| 702 | // compute the interarrival jitter estimation. |
| 703 | timeval tarrival; |
| 704 | timeval lastT = srcLink.getLastPacketTime(); |
| 705 | timeval initial = srcLink.getInitialDataTime(); |
| 706 | timersub(&lastT,&initial,&tarrival); |
| 707 | uint32 arrival = timeval2microtimeout(tarrival) |
| 708 | * getCurrentRTPClockRate(); |
| 709 | uint32 transitTime = arrival - pkt.getTimestamp(); |
| 710 | int32 delta = transitTime - |
| 711 | srcLink.getLastPacketTransitTime(); |
| 712 | srcLink.setLastPacketTransitTime(transitTime); |
| 713 | if ( delta < 0 ) |
| 714 | delta = -delta; |
| 715 | srcLink.setJitter( srcLink.getJitter() + |
| 716 | (1.0f / 16.0f) * |
| 717 | (static_cast<float>(delta) - |
| 718 | srcLink.getJitter())); |
| 719 | } |
| 720 | return result; |
| 721 | } |
| 722 | |
| 723 | void |
| 724 | IncomingDataQueue::recordExtraction(const IncomingRTPPkt&) |
| 725 | {} |
| 726 | |
| 727 | void |
| 728 | IncomingDataQueue::setInQueueCryptoContext(CryptoContext* cc) |
| 729 | { |
| 730 | std::list<CryptoContext *>::iterator i; |
| 731 | |
| 732 | MutexLock lock(cryptoMutex); |
| 733 | // check if a CryptoContext for a SSRC already exists. If yes |
| 734 | // remove it from list before inserting the new one. |
| 735 | for( i = cryptoContexts.begin(); i!= cryptoContexts.end(); i++ ) { |
| 736 | if( (*i)->getSsrc() == cc->getSsrc() ) { |
| 737 | CryptoContext* tmp = *i; |
| 738 | cryptoContexts.erase(i); |
| 739 | delete tmp; |
| 740 | break; |
| 741 | } |
| 742 | } |
| 743 | cryptoContexts.push_back(cc); |
| 744 | } |
| 745 | |
| 746 | void |
| 747 | IncomingDataQueue::removeInQueueCryptoContext(CryptoContext* cc) |
| 748 | { |
| 749 | std::list<CryptoContext *>::iterator i; |
| 750 | |
| 751 | MutexLock lock(cryptoMutex); |
| 752 | if (cc == NULL) { // Remove any incoming crypto contexts |
| 753 | for (i = cryptoContexts.begin(); i != cryptoContexts.end(); ) { |
| 754 | CryptoContext* tmp = *i; |
| 755 | i = cryptoContexts.erase(i); |
| 756 | delete tmp; |
| 757 | } |
| 758 | } |
| 759 | else { |
| 760 | for( i = cryptoContexts.begin(); i!= cryptoContexts.end(); i++ ){ |
| 761 | if( (*i)->getSsrc() == cc->getSsrc() ) { |
| 762 | CryptoContext* tmp = *i; |
| 763 | cryptoContexts.erase(i); |
| 764 | delete tmp; |
| 765 | return; |
| 766 | } |
| 767 | } |
| 768 | } |
| 769 | } |
| 770 | |
| 771 | CryptoContext* |
| 772 | IncomingDataQueue::getInQueueCryptoContext(uint32 ssrc) |
| 773 | { |
| 774 | std::list<CryptoContext *>::iterator i; |
| 775 | |
| 776 | MutexLock lock(cryptoMutex); |
| 777 | for( i = cryptoContexts.begin(); i!= cryptoContexts.end(); i++ ){ |
| 778 | if( (*i)->getSsrc() == ssrc) { |
| 779 | return (*i); |
| 780 | } |
| 781 | } |
| 782 | return NULL; |
| 783 | } |
| 784 | |
| 785 | #ifdef CCXX_NAMESPACES |
| 786 | } |
| 787 | #endif |
| 788 | |
| 789 | /** EMACS ** |
| 790 | * Local variables: |
| 791 | * mode: c++ |
| 792 | * c-basic-offset: 4 |
| 793 | * End: |
| 794 | */ |