Emeric Vigier | 2f62582 | 2012-08-06 11:09:52 -0400 | [diff] [blame] | 1 | // Copyright (C) 2001,2002,2004 Federico Montesino Pouzols <fedemp@altern.org> |
| 2 | // |
| 3 | // This program is free software; you can redistribute it and/or modify |
| 4 | // it under the terms of the GNU General Public License as published by |
| 5 | // the Free Software Foundation; either version 2 of the License, or |
| 6 | // (at your option) any later version. |
| 7 | // |
| 8 | // This program is distributed in the hope that it will be useful, |
| 9 | // but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 10 | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 11 | // GNU General Public License for more details. |
| 12 | // |
| 13 | // You should have received a copy of the GNU General Public License |
| 14 | // along with this program; if not, write to the Free Software |
| 15 | // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. |
| 16 | // |
| 17 | // As a special exception, you may use this file as part of a free software |
| 18 | // library without restriction. Specifically, if other files instantiate |
| 19 | // templates or use macros or inline functions from this file, or you compile |
| 20 | // this file and link it with other files to produce an executable, this |
| 21 | // file does not by itself cause the resulting executable to be covered by |
| 22 | // the GNU General Public License. This exception does not however |
| 23 | // invalidate any other reasons why the executable file might be covered by |
| 24 | // the GNU General Public License. |
| 25 | // |
| 26 | // This exception applies only to the code released under the name GNU |
| 27 | // ccRTP. If you copy code from other releases into a copy of GNU |
| 28 | // ccRTP, as the General Public License permits, the exception does |
| 29 | // not apply to the code that you add in this way. To avoid misleading |
| 30 | // anyone as to the status of such modified files, you must delete |
| 31 | // this exception notice from them. |
| 32 | // |
| 33 | // If you write modifications of your own for GNU ccRTP, it is your choice |
| 34 | // whether to permit this exception to apply to your modifications. |
| 35 | // If you do not wish that, delete this exception notice. |
| 36 | // |
| 37 | |
| 38 | #include "private.h" |
| 39 | #include <ccrtp/iqueue.h> |
| 40 | |
Alexandre Lision | ddd731e | 2014-01-31 11:50:08 -0500 | [diff] [blame] | 41 | NAMESPACE_COMMONCPP |
Emeric Vigier | 2f62582 | 2012-08-06 11:09:52 -0400 | [diff] [blame] | 42 | |
// Default value for the maximum size of a received packet.
// NOTE(review): presumably the initial value behind getMaxRecvPacketSize(),
// which takeInDataPacket() uses to reject oversized reads -- confirm in the
// header.
const size_t IncomingDataQueueBase::defaultMaxRecvPacketSize = 65534;
| 44 | |
| 45 | ConflictHandler::ConflictingTransportAddress:: |
| 46 | ConflictingTransportAddress(InetAddress na,tpport_t dtp, tpport_t ctp): |
| 47 | networkAddress(na), dataTransportPort(dtp), |
| 48 | controlTransportPort(ctp), next(NULL) |
| 49 | { |
Alexandre Lision | ddd731e | 2014-01-31 11:50:08 -0500 | [diff] [blame] | 50 | SysTime::gettimeofday(&lastPacketTime,NULL); |
Emeric Vigier | 2f62582 | 2012-08-06 11:09:52 -0400 | [diff] [blame] | 51 | } |
| 52 | |
| 53 | ConflictHandler::ConflictingTransportAddress* |
| 54 | ConflictHandler::searchDataConflict(InetAddress na, tpport_t dtp) |
| 55 | { |
| 56 | ConflictingTransportAddress* result = firstConflict; |
| 57 | while ( result->networkAddress != na || |
| 58 | result->dataTransportPort != dtp) |
| 59 | result = result->next; |
| 60 | return result; |
| 61 | } |
| 62 | |
| 63 | ConflictHandler::ConflictingTransportAddress* |
| 64 | ConflictHandler::searchControlConflict(InetAddress na, tpport_t ctp) |
| 65 | { |
| 66 | ConflictingTransportAddress* result = firstConflict; |
| 67 | while ( result && |
| 68 | (result->networkAddress != na || |
| 69 | result->controlTransportPort != ctp) ) |
| 70 | result = result->next; |
| 71 | return result; |
| 72 | } |
| 73 | |
| 74 | void |
| 75 | ConflictHandler::addConflict(const InetAddress& na, tpport_t dtp, tpport_t ctp) |
| 76 | { |
| 77 | ConflictingTransportAddress* nc = |
| 78 | new ConflictingTransportAddress(na,dtp,ctp); |
| 79 | |
| 80 | if ( lastConflict ) { |
| 81 | lastConflict->setNext(nc); |
| 82 | lastConflict = nc; |
| 83 | } else { |
| 84 | firstConflict = lastConflict = nc; |
| 85 | } |
| 86 | } |
| 87 | |
// Default tuning values for source validation and sequence number checks.
// NOTE(review): presumably these are what the getDefault*() accessors
// called from the IncomingDataQueue constructor return -- confirm in the
// header.
const uint8 IncomingDataQueue::defaultMinValidPacketSequence = 0;
const uint16 IncomingDataQueue::defaultMaxPacketMisorder = 0;
const uint16 IncomingDataQueue::defaultMaxPacketDropout = 3000;
// The default members size reuses MembershipBookkeeping's default hash size.
const size_t IncomingDataQueue::defaultMembersSize =
    MembershipBookkeeping::defaultMembersHashSize;
| 93 | |
| 94 | IncomingDataQueue::IncomingDataQueue(uint32 size) : |
| 95 | IncomingDataQueueBase(), MembershipBookkeeping(size) |
| 96 | { |
| 97 | recvFirst = recvLast = NULL; |
| 98 | sourceExpirationPeriod = 5; // 5 RTCP report intervals |
| 99 | minValidPacketSequence = getDefaultMinValidPacketSequence(); |
| 100 | maxPacketDropout = getDefaultMaxPacketDropout(); |
| 101 | maxPacketMisorder = getDefaultMaxPacketMisorder(); |
| 102 | } |
| 103 | |
| 104 | void |
| 105 | IncomingDataQueue::purgeIncomingQueue() |
| 106 | { |
| 107 | IncomingRTPPktLink* recvnext; |
| 108 | // flush the reception queue (incoming packets not yet |
| 109 | // retrieved) |
| 110 | recvLock.writeLock(); |
| 111 | while( recvFirst ) |
| 112 | { |
| 113 | recvnext = recvFirst->getNext(); |
| 114 | |
| 115 | // nullify source specific packet list |
| 116 | SyncSourceLink *s = recvFirst->getSourceLink(); |
| 117 | s->setFirst(NULL); |
| 118 | s->setLast(NULL); |
| 119 | |
| 120 | delete recvFirst->getPacket(); |
| 121 | delete recvFirst; |
| 122 | recvFirst = recvnext; |
| 123 | } |
| 124 | recvLock.unlock(); |
| 125 | } |
| 126 | |
| 127 | void |
| 128 | IncomingDataQueue::renewLocalSSRC() |
| 129 | { |
| 130 | const uint32 MAXTRIES = 20; |
| 131 | uint32 newssrc; |
| 132 | uint16 tries = 0; |
| 133 | do { |
| 134 | newssrc = random32(); |
| 135 | tries++; |
| 136 | } while ( (tries < MAXTRIES) && isRegistered(newssrc) ); |
| 137 | |
| 138 | if ( MAXTRIES == tries ) { |
| 139 | // TODO we are in real trouble. |
| 140 | } |
| 141 | } |
| 142 | |
| 143 | bool |
| 144 | IncomingDataQueue::isWaiting(const SyncSource* src) const |
| 145 | { |
| 146 | bool w; |
| 147 | recvLock.readLock(); |
| 148 | if ( NULL == src ) |
| 149 | w = ( NULL != recvFirst); |
| 150 | else |
| 151 | w = isMine(*src) && ( NULL != getLink(*src)->getFirst() ); |
| 152 | |
| 153 | recvLock.unlock(); |
| 154 | return w; |
| 155 | } |
| 156 | |
| 157 | uint32 |
| 158 | IncomingDataQueue::getFirstTimestamp(const SyncSource* src) const |
| 159 | { |
| 160 | recvLock.readLock(); |
| 161 | |
| 162 | // get the first packet |
| 163 | IncomingRTPPktLink* packetLink; |
| 164 | if ( NULL == src ) |
| 165 | packetLink = recvFirst; |
| 166 | else |
| 167 | packetLink = isMine(*src) ? getLink(*src)->getFirst() : NULL; |
| 168 | |
| 169 | // get the timestamp of the first packet |
| 170 | uint32 ts; |
| 171 | if ( packetLink ) |
| 172 | ts = packetLink->getTimestamp(); |
| 173 | else |
| 174 | ts = 0l; |
| 175 | |
| 176 | recvLock.unlock(); |
| 177 | return ts; |
| 178 | } |
| 179 | |
| 180 | size_t |
| 181 | IncomingDataQueue::takeInDataPacket(void) |
| 182 | { |
| 183 | InetHostAddress network_address; |
| 184 | tpport_t transport_port; |
| 185 | |
| 186 | uint32 nextSize = (uint32)getNextDataPacketSize(); |
| 187 | unsigned char* buffer = new unsigned char[nextSize]; |
| 188 | int32 rtn = (int32)recvData(buffer,nextSize,network_address,transport_port); |
| 189 | if ( (rtn < 0) || ((uint32)rtn > getMaxRecvPacketSize()) ){ |
| 190 | delete buffer; |
| 191 | return 0; |
| 192 | } |
| 193 | |
| 194 | // get time of arrival |
| 195 | struct timeval recvtime; |
Alexandre Lision | ddd731e | 2014-01-31 11:50:08 -0500 | [diff] [blame] | 196 | SysTime::gettimeofday(&recvtime,NULL); |
Emeric Vigier | 2f62582 | 2012-08-06 11:09:52 -0400 | [diff] [blame] | 197 | |
| 198 | // Special handling of padding to take care of encrypted content. |
| 199 | // In case of SRTP the padding length field is also encrypted, thus |
| 200 | // it gives a wrong length. Check and clear padding bit before |
| 201 | // creating the RTPPacket. Will be set and re-computed after a possible |
| 202 | // SRTP decryption. |
| 203 | uint8 padSet = (*buffer & 0x20); |
| 204 | if (padSet) { |
| 205 | *buffer = *buffer & ~0x20; // clear padding bit |
Alexandre Lision | ddd731e | 2014-01-31 11:50:08 -0500 | [diff] [blame] | 206 | } |
Emeric Vigier | 2f62582 | 2012-08-06 11:09:52 -0400 | [diff] [blame] | 207 | // build a packet. It will link itself to its source |
| 208 | IncomingRTPPkt* packet = |
| 209 | new IncomingRTPPkt(buffer,rtn); |
| 210 | |
| 211 | // Generic header validity check. |
| 212 | if ( !packet->isHeaderValid() ) { |
| 213 | delete packet; |
| 214 | return 0; |
| 215 | } |
| 216 | |
| 217 | CryptoContext* pcc = getInQueueCryptoContext( packet->getSSRC()); |
| 218 | if (pcc == NULL) { |
| 219 | pcc = getInQueueCryptoContext(0); |
| 220 | if (pcc != NULL) { |
| 221 | pcc = pcc->newCryptoContextForSSRC(packet->getSSRC(), 0, 0L); |
| 222 | if (pcc != NULL) { |
| 223 | pcc->deriveSrtpKeys(0); |
| 224 | setInQueueCryptoContext(pcc); |
| 225 | } |
| 226 | } |
| 227 | } |
| 228 | if (pcc != NULL) { |
| 229 | int32 ret = packet->unprotect(pcc); |
| 230 | if (ret < 0) { |
| 231 | if (!onSRTPPacketError(*packet, ret)) { |
| 232 | delete packet; |
| 233 | return 0; |
| 234 | } |
| 235 | } |
| 236 | } |
| 237 | if (padSet) { |
| 238 | packet->reComputePayLength(true); |
| 239 | } |
| 240 | // virtual for profile-specific validation and processing. |
| 241 | if ( !onRTPPacketRecv(*packet) ) { |
| 242 | delete packet; |
| 243 | return 0; |
| 244 | } |
| 245 | |
| 246 | bool source_created; |
| 247 | SyncSourceLink* sourceLink = |
| 248 | getSourceBySSRC(packet->getSSRC(),source_created); |
| 249 | SyncSource* s = sourceLink->getSource(); |
| 250 | if ( source_created ) { |
| 251 | // Set data transport address. |
| 252 | setDataTransportPort(*s,transport_port); |
| 253 | // Network address is assumed to be the same as the control one |
| 254 | setNetworkAddress(*s,network_address); |
| 255 | sourceLink->initStats(); |
| 256 | // First packet arrival time. |
| 257 | sourceLink->setInitialDataTime(recvtime); |
| 258 | sourceLink->setProbation(getMinValidPacketSequence()); |
| 259 | if ( sourceLink->getHello() ) |
| 260 | onNewSyncSource(*s); |
| 261 | } else if ( 0 == s->getDataTransportPort() ) { |
| 262 | // Test if RTCP packets had been received but this is the |
| 263 | // first data packet from this source. |
| 264 | setDataTransportPort(*s,transport_port); |
| 265 | } |
| 266 | |
| 267 | // Before inserting in the queue, |
| 268 | // 1) check for collisions and loops. If the packet cannot be |
| 269 | // assigned to a source, it will be rejected. |
| 270 | // 2) check the source is a sufficiently well known source |
| 271 | // TODO: also check CSRC identifiers. |
| 272 | if ( checkSSRCInIncomingRTPPkt(*sourceLink,source_created, |
| 273 | network_address,transport_port) && |
| 274 | recordReception(*sourceLink,*packet,recvtime) ) { |
| 275 | // now the packet link is linked in the queues |
| 276 | IncomingRTPPktLink* packetLink = |
| 277 | new IncomingRTPPktLink(packet, |
| 278 | sourceLink, |
| 279 | recvtime, |
| 280 | packet->getTimestamp() - |
| 281 | sourceLink->getInitialDataTimestamp(), |
| 282 | NULL,NULL,NULL,NULL); |
| 283 | insertRecvPacket(packetLink); |
| 284 | } else { |
| 285 | // must be discarded due to collision or loop or |
| 286 | // invalid source |
| 287 | delete packet; |
| 288 | } |
| 289 | |
| 290 | // ccRTP keeps packets from the new source, but avoids |
| 291 | // flip-flopping. This allows losing less packets and for |
| 292 | // mobile telephony applications or other apps that may change |
| 293 | // the source transport address during the session. |
| 294 | return rtn; |
| 295 | } |
| 296 | |
| 297 | bool IncomingDataQueue::checkSSRCInIncomingRTPPkt(SyncSourceLink& sourceLink, |
| 298 | bool is_new, InetAddress& network_address, tpport_t transport_port) |
| 299 | { |
| 300 | bool result = true; |
| 301 | |
| 302 | // Test if the source is new and it is not the local one. |
| 303 | if ( is_new && |
| 304 | sourceLink.getSource()->getID() != getLocalSSRC() ) |
| 305 | return result; |
| 306 | |
| 307 | SyncSource *s = sourceLink.getSource(); |
| 308 | |
| 309 | if ( s->getDataTransportPort() != transport_port || |
| 310 | s->getNetworkAddress() != network_address ) { |
| 311 | // SSRC collision or a loop has happened |
| 312 | if ( s->getID() != getLocalSSRC() ) { |
| 313 | // TODO: Optional error counter. |
| 314 | |
| 315 | // Note this differs from the default in the RFC. |
| 316 | // Discard packet only when the collision is |
| 317 | // repeating (to avoid flip-flopping) |
| 318 | if ( sourceLink.getPrevConflict() && |
| 319 | ( |
| 320 | (network_address == |
| 321 | sourceLink.getPrevConflict()->networkAddress) |
| 322 | && |
| 323 | (transport_port == |
| 324 | sourceLink.getPrevConflict()->dataTransportPort) |
| 325 | ) ) { |
| 326 | // discard packet and do not flip-flop |
| 327 | result = false; |
| 328 | } else { |
| 329 | // Record who has collided so that in |
| 330 | // the future we can how if the |
| 331 | // collision repeats. |
| 332 | sourceLink.setPrevConflict(network_address, |
| 333 | transport_port,0); |
| 334 | // Change sync source transport address |
| 335 | setDataTransportPort(*s,transport_port); |
| 336 | setNetworkAddress(*s,network_address); |
| 337 | } |
| 338 | |
| 339 | } else { |
| 340 | // Collision or loop of own packets. |
| 341 | ConflictingTransportAddress* conflicting = |
| 342 | searchDataConflict(network_address, |
| 343 | transport_port); |
| 344 | if ( conflicting ) { |
| 345 | // Optional error counter. |
| 346 | updateConflict(*conflicting); |
| 347 | result = false; |
| 348 | } else { |
| 349 | // New collision |
| 350 | addConflict(s->getNetworkAddress(), |
| 351 | s->getDataTransportPort(), |
| 352 | s->getControlTransportPort()); |
| 353 | dispatchBYE("SSRC collision detected when receiving data packet."); |
| 354 | renewLocalSSRC(); |
| 355 | setNetworkAddress(*s,network_address); |
| 356 | setDataTransportPort(*s,transport_port); |
| 357 | setControlTransportPort(*s,0); |
| 358 | sourceLink.initStats(); |
| 359 | sourceLink.setProbation(getMinValidPacketSequence()); |
| 360 | } |
| 361 | } |
| 362 | } |
| 363 | return result; |
| 364 | } |
| 365 | |
| 366 | bool |
| 367 | IncomingDataQueue::insertRecvPacket(IncomingRTPPktLink* packetLink) |
| 368 | { |
| 369 | SyncSourceLink *srcLink = packetLink->getSourceLink(); |
| 370 | unsigned short seq = packetLink->getPacket()->getSeqNum(); |
| 371 | recvLock.writeLock(); |
| 372 | IncomingRTPPktLink* plink = srcLink->getLast(); |
| 373 | if ( plink && (seq < plink->getPacket()->getSeqNum()) ) { |
| 374 | // a disordered packet, so look for its place |
| 375 | while ( plink && (seq < plink->getPacket()->getSeqNum()) ){ |
| 376 | // the packet is a duplicate |
| 377 | if ( seq == plink->getPacket()->getSeqNum() ) { |
| 378 | recvLock.unlock(); |
| 379 | VDL(("Duplicated disordered packet: seqnum %d, SSRC:", |
| 380 | seq,srcLink->getSource()->getID())); |
| 381 | delete packetLink->getPacket(); |
| 382 | delete packetLink; |
| 383 | return false; |
| 384 | } |
| 385 | plink = plink->getSrcPrev(); |
| 386 | } |
| 387 | if ( !plink ) { |
| 388 | // we have scanned the whole (and non empty) |
| 389 | // list, so this must be the older (first) |
| 390 | // packet from this source. |
| 391 | |
| 392 | // insert into the source specific queue |
| 393 | IncomingRTPPktLink* srcFirst = srcLink->getFirst(); |
| 394 | srcFirst->setSrcPrev(packetLink); |
| 395 | packetLink->setSrcNext(srcFirst); |
| 396 | // insert into the global queue |
| 397 | IncomingRTPPktLink* prevFirst = srcFirst->getPrev(); |
| 398 | if ( prevFirst ){ |
| 399 | prevFirst->setNext(packetLink); |
| 400 | packetLink->setPrev(prevFirst); |
| 401 | } |
| 402 | srcFirst->setPrev(packetLink); |
| 403 | packetLink->setNext(srcFirst); |
| 404 | srcLink->setFirst(packetLink); |
| 405 | } else { |
| 406 | // (we are in the middle of the source list) |
| 407 | // insert into the source specific queue |
| 408 | plink->getSrcNext()->setSrcPrev(packetLink); |
| 409 | packetLink->setSrcNext(plink->getSrcNext()); |
| 410 | // -- insert into the global queue, with the |
| 411 | // minimum priority compared to packets from |
| 412 | // other sources |
| 413 | plink->getSrcNext()->getPrev()->setNext(packetLink); |
| 414 | packetLink->setPrev(plink->getSrcNext()->getPrev()); |
| 415 | plink->getSrcNext()->setPrev(packetLink); |
| 416 | packetLink->setNext(plink->getSrcNext()); |
| 417 | // ------ |
| 418 | plink->setSrcNext(packetLink); |
| 419 | packetLink->setSrcPrev(plink); |
| 420 | // insert into the global queue (giving |
| 421 | // priority compared to packets from other sources) |
| 422 | //list->getNext->setPrev(packetLink); |
| 423 | //packetLink->setNext(list->getNext); |
| 424 | //list->setNext(packet); |
| 425 | //packet->setPrev(list); |
| 426 | } |
| 427 | } else { |
| 428 | // An ordered packet |
| 429 | if ( !plink ) { |
| 430 | // the only packet in the source specific queue |
| 431 | srcLink->setLast(packetLink); |
| 432 | srcLink->setFirst(packetLink); |
| 433 | // the last packet in the global queue |
| 434 | if ( recvLast ) { |
| 435 | recvLast->setNext(packetLink); |
| 436 | packetLink->setPrev(recvLast); |
| 437 | } |
| 438 | recvLast = packetLink; |
| 439 | if ( !recvFirst ) |
| 440 | recvFirst = packetLink; |
| 441 | } else { |
| 442 | // there are already more packets from this source. |
| 443 | // this ignores duplicate packets |
| 444 | if ( plink && (seq == plink->getPacket()->getSeqNum()) ) { |
| 445 | VDL(("Duplicated packet: seqnum %d, SSRC:", |
| 446 | seq,srcLink->getSource->getID())); |
| 447 | recvLock.unlock(); |
| 448 | delete packetLink->getPacket(); |
| 449 | delete packetLink; |
| 450 | return false; |
| 451 | } |
| 452 | // the last packet in the source specific queue |
| 453 | srcLink->getLast()->setSrcNext(packetLink); |
| 454 | packetLink->setSrcPrev(srcLink->getLast()); |
| 455 | srcLink->setLast(packetLink); |
| 456 | // the last packet in the global queue |
| 457 | recvLast->setNext(packetLink); |
| 458 | packetLink->setPrev(recvLast); |
| 459 | recvLast = packetLink; |
| 460 | } |
| 461 | } |
| 462 | // account the insertion of this packet into the queue |
| 463 | srcLink->recordInsertion(*packetLink); |
| 464 | recvLock.unlock(); |
| 465 | // packet successfully inserted |
| 466 | return true; |
| 467 | } |
| 468 | |
| 469 | const AppDataUnit* |
| 470 | IncomingDataQueue::getData(uint32 stamp, const SyncSource* src) |
| 471 | { |
| 472 | IncomingRTPPktLink* pl; |
| 473 | // unsigned count = 0; |
| 474 | AppDataUnit* result; |
| 475 | |
| 476 | if ( NULL != (pl = getWaiting(stamp,src)) ) { |
| 477 | IncomingRTPPkt* packet = pl->getPacket(); |
| 478 | // size_t len = packet->getPayloadSize(); |
| 479 | |
| 480 | SyncSource &src = *(pl->getSourceLink()->getSource()); |
| 481 | result = new AppDataUnit(*packet,src); |
| 482 | |
| 483 | // delete the packet link, but not the packet |
| 484 | delete pl; |
| 485 | // count += len; |
| 486 | } else { |
| 487 | result = NULL; |
| 488 | } |
| 489 | return result; |
| 490 | } |
| 491 | |
| 492 | // FIX: try to merge and organize |
| 493 | IncomingDataQueue::IncomingRTPPktLink* |
| 494 | IncomingDataQueue::getWaiting(uint32 timestamp, const SyncSource* src) |
| 495 | { |
| 496 | if ( src && !isMine(*src) ) |
| 497 | return NULL; |
| 498 | |
| 499 | IncomingRTPPktLink *result; |
| 500 | recvLock.writeLock(); |
| 501 | if ( src != NULL ) { |
| 502 | // process source specific queries: |
| 503 | // we will modify the queue of this source |
| 504 | SyncSourceLink* srcm = getLink(*src); |
| 505 | |
| 506 | // first, delete all older packets. The while loop |
| 507 | // down here counts how many older packets are there; |
| 508 | // then the for loop deletes them and advances l till |
| 509 | // the first non older packet. |
| 510 | int nold = 0; |
| 511 | IncomingRTPPktLink* l = srcm->getFirst(); |
| 512 | if ( !l ) { |
| 513 | result = NULL; |
| 514 | recvLock.unlock(); |
| 515 | return result; |
| 516 | } |
| 517 | while ( l && ((l->getTimestamp() < timestamp) || |
| 518 | end2EndDelayed(*l))) { |
| 519 | nold++; |
| 520 | l = l->getSrcNext(); |
| 521 | } |
| 522 | // to know whether the global queue gets empty |
| 523 | bool nonempty = false; |
| 524 | for ( int i = 0; i < nold; i++) { |
| 525 | l = srcm->getFirst(); |
| 526 | srcm->setFirst(srcm->getFirst()->getSrcNext());; |
| 527 | // unlink from the global queue |
| 528 | nonempty = false; |
| 529 | if ( l->getPrev() ){ |
| 530 | nonempty = true; |
| 531 | l->getPrev()->setNext(l->getNext()); |
| 532 | } if ( l->getNext() ) { |
| 533 | nonempty = true; |
| 534 | l->getNext()->setPrev(l->getPrev()); |
| 535 | } |
| 536 | // now, delete it |
| 537 | onExpireRecv(*(l->getPacket()));// notify packet discard |
| 538 | delete l->getPacket(); |
| 539 | delete l; |
| 540 | } |
| 541 | // return the packet, if found |
| 542 | if ( !srcm->getFirst() ) { |
| 543 | // threre are no more packets from this source |
| 544 | srcm->setLast(NULL); |
| 545 | if ( !nonempty ) |
| 546 | recvFirst = recvLast = NULL; |
| 547 | result = NULL; |
| 548 | } else if ( srcm->getFirst()->getTimestamp() > timestamp ) { |
| 549 | // threre are only newer packets from this source |
| 550 | srcm->getFirst()->setSrcPrev(NULL); |
| 551 | result = NULL; |
| 552 | } else { |
| 553 | // (src->getFirst()->getTimestamp() == stamp) is true |
| 554 | result = srcm->getFirst(); |
| 555 | // unlink the selected packet from the global queue |
| 556 | if ( result->getPrev() ) |
| 557 | result->getPrev()->setNext(result->getNext()); |
| 558 | else |
| 559 | recvFirst = result->getNext(); |
| 560 | if ( result->getNext() ) |
| 561 | result->getNext()->setPrev(result->getPrev()); |
| 562 | else |
| 563 | recvLast = result->getPrev(); |
| 564 | // unlink the selected packet from the source queue |
| 565 | srcm->setFirst(result->getSrcNext()); |
| 566 | if ( srcm->getFirst() ) |
| 567 | srcm->getFirst()->setPrev(NULL); |
| 568 | else |
| 569 | srcm->setLast(NULL); |
| 570 | } |
| 571 | } else { |
| 572 | // process source unspecific queries |
| 573 | int nold = 0; |
| 574 | IncomingRTPPktLink* l = recvFirst; |
| 575 | while ( l && (l->getTimestamp() < timestamp || |
| 576 | end2EndDelayed(*l) ) ){ |
| 577 | nold++; |
| 578 | l = l->getNext(); |
| 579 | } |
| 580 | for (int i = 0; i < nold; i++) { |
| 581 | IncomingRTPPktLink* l = recvFirst; |
| 582 | recvFirst = recvFirst->getNext(); |
| 583 | // unlink the packet from the queue of its source |
| 584 | SyncSourceLink* src = l->getSourceLink(); |
| 585 | src->setFirst(l->getSrcNext()); |
| 586 | if ( l->getSrcNext() ) |
| 587 | l->getSrcNext()->setSrcPrev(NULL); |
| 588 | else |
| 589 | src->setLast(NULL); |
| 590 | // now, delete it |
| 591 | onExpireRecv(*(l->getPacket()));// notify packet discard |
| 592 | delete l->getPacket(); |
| 593 | delete l; |
| 594 | } |
| 595 | |
| 596 | // return the packet, if found |
| 597 | if ( !recvFirst ) { |
| 598 | // there are no more packets in the queue |
| 599 | recvLast = NULL; |
| 600 | result = NULL; |
| 601 | } else if ( recvFirst->getTimestamp() > timestamp ) { |
| 602 | // there are only newer packets in the queue |
| 603 | l->setPrev(NULL); |
| 604 | result = NULL; |
| 605 | } else { |
| 606 | // (recvFirst->getTimestamp() == stamp) is true |
| 607 | result = recvFirst; |
| 608 | // unlink the selected packet from the global queue |
| 609 | recvFirst = recvFirst->getNext(); |
| 610 | if ( recvFirst ) |
| 611 | recvFirst->setPrev(NULL); |
| 612 | else |
| 613 | recvLast = NULL; |
| 614 | // unlink the selected packet from the queue |
| 615 | // of its source |
| 616 | SyncSourceLink* src = result->getSourceLink(); |
| 617 | src->setFirst(result->getSrcNext()); |
| 618 | if ( src->getFirst() ) |
| 619 | src->getFirst()->setSrcPrev(NULL); |
| 620 | else |
| 621 | src->setLast(NULL); |
| 622 | } |
| 623 | } |
| 624 | recvLock.unlock(); |
| 625 | return result; |
| 626 | } |
| 627 | |
| 628 | bool |
| 629 | IncomingDataQueue::recordReception(SyncSourceLink& srcLink, |
| 630 | const IncomingRTPPkt& pkt, const timeval recvtime) |
| 631 | { |
| 632 | bool result = true; |
| 633 | |
| 634 | // Source validation. |
| 635 | SyncSource* src = srcLink.getSource(); |
| 636 | if ( !(srcLink.isValid()) ) { |
| 637 | // source is not yet valid. |
| 638 | if ( pkt.getSeqNum() == srcLink.getMaxSeqNum() + 1 ) { |
| 639 | // packet in sequence. |
| 640 | srcLink.decProbation(); |
| 641 | if ( srcLink.isValid() ) { |
| 642 | // source has become valid. |
| 643 | // TODO: avoid this the first time. |
| 644 | srcLink.initSequence(pkt.getSeqNum()); |
| 645 | } else { |
| 646 | result = false; |
| 647 | } |
| 648 | } else { |
| 649 | // packet not in sequence. |
| 650 | srcLink.probation = getMinValidPacketSequence() - 1; |
| 651 | result = false; |
| 652 | } |
| 653 | srcLink.setMaxSeqNum(pkt.getSeqNum()); |
| 654 | } else { |
| 655 | // source was already valid. |
| 656 | uint16 step = pkt.getSeqNum() - srcLink.getMaxSeqNum(); |
| 657 | if ( step < getMaxPacketDropout() ) { |
| 658 | // Ordered, with not too high step. |
| 659 | if ( pkt.getSeqNum() < srcLink.getMaxSeqNum() ) { |
| 660 | // sequene number wrapped. |
| 661 | srcLink.incSeqNumAccum(); |
| 662 | } |
| 663 | srcLink.setMaxSeqNum(pkt.getSeqNum()); |
| 664 | } else if ( step <= (SEQNUMMOD - getMaxPacketMisorder()) ) { |
| 665 | // too high step of the sequence number. |
| 666 | if ( pkt.getSeqNum() == srcLink.getBadSeqNum() ) { |
| 667 | srcLink.initSequence(pkt.getSeqNum()); |
| 668 | } else { |
| 669 | srcLink.setBadSeqNum((pkt.getSeqNum() + 1) & |
| 670 | (SEQNUMMOD - 1) ); |
| 671 | //This additional check avoids that |
| 672 | //the very first packet from a source |
| 673 | //be discarded. |
| 674 | if ( 0 < srcLink.getObservedPacketCount() ) { |
| 675 | result = false; |
| 676 | } else { |
| 677 | srcLink.setMaxSeqNum(pkt.getSeqNum()); |
| 678 | } |
| 679 | } |
| 680 | } else { |
| 681 | // duplicate or reordered packet |
| 682 | } |
| 683 | } |
| 684 | |
| 685 | if ( result ) { |
| 686 | // the packet is considered valid. |
| 687 | srcLink.incObservedPacketCount(); |
| 688 | srcLink.incObservedOctetCount(pkt.getPayloadSize()); |
| 689 | srcLink.lastPacketTime = recvtime; |
| 690 | if ( srcLink.getObservedPacketCount() == 1 ) { |
| 691 | // ooops, it's the first packet from this source |
| 692 | setSender(*src,true); |
| 693 | srcLink.setInitialDataTimestamp(pkt.getTimestamp()); |
| 694 | } |
| 695 | // we record the last time a packet from this source |
| 696 | // was received, this has statistical interest and is |
| 697 | // needed to time out old senders that are no sending |
| 698 | // any longer. |
| 699 | |
| 700 | // compute the interarrival jitter estimation. |
| 701 | timeval tarrival; |
| 702 | timeval lastT = srcLink.getLastPacketTime(); |
| 703 | timeval initial = srcLink.getInitialDataTime(); |
| 704 | timersub(&lastT,&initial,&tarrival); |
| 705 | uint32 arrival = timeval2microtimeout(tarrival) |
| 706 | * getCurrentRTPClockRate(); |
| 707 | uint32 transitTime = arrival - pkt.getTimestamp(); |
| 708 | int32 delta = transitTime - |
| 709 | srcLink.getLastPacketTransitTime(); |
| 710 | srcLink.setLastPacketTransitTime(transitTime); |
| 711 | if ( delta < 0 ) |
| 712 | delta = -delta; |
| 713 | srcLink.setJitter( srcLink.getJitter() + |
| 714 | (1.0f / 16.0f) * |
| 715 | (static_cast<float>(delta) - |
| 716 | srcLink.getJitter())); |
| 717 | } |
| 718 | return result; |
| 719 | } |
| 720 | |
| 721 | void |
| 722 | IncomingDataQueue::recordExtraction(const IncomingRTPPkt&) |
| 723 | {} |
| 724 | |
| 725 | void |
| 726 | IncomingDataQueue::setInQueueCryptoContext(CryptoContext* cc) |
| 727 | { |
| 728 | std::list<CryptoContext *>::iterator i; |
| 729 | |
| 730 | MutexLock lock(cryptoMutex); |
| 731 | // check if a CryptoContext for a SSRC already exists. If yes |
| 732 | // remove it from list before inserting the new one. |
| 733 | for( i = cryptoContexts.begin(); i!= cryptoContexts.end(); i++ ) { |
| 734 | if( (*i)->getSsrc() == cc->getSsrc() ) { |
| 735 | CryptoContext* tmp = *i; |
| 736 | cryptoContexts.erase(i); |
| 737 | delete tmp; |
| 738 | break; |
| 739 | } |
| 740 | } |
| 741 | cryptoContexts.push_back(cc); |
| 742 | } |
| 743 | |
| 744 | void |
| 745 | IncomingDataQueue::removeInQueueCryptoContext(CryptoContext* cc) |
| 746 | { |
| 747 | std::list<CryptoContext *>::iterator i; |
| 748 | |
| 749 | MutexLock lock(cryptoMutex); |
| 750 | if (cc == NULL) { // Remove any incoming crypto contexts |
| 751 | for (i = cryptoContexts.begin(); i != cryptoContexts.end(); ) { |
| 752 | CryptoContext* tmp = *i; |
| 753 | i = cryptoContexts.erase(i); |
| 754 | delete tmp; |
| 755 | } |
| 756 | } |
| 757 | else { |
| 758 | for( i = cryptoContexts.begin(); i!= cryptoContexts.end(); i++ ){ |
| 759 | if( (*i)->getSsrc() == cc->getSsrc() ) { |
| 760 | CryptoContext* tmp = *i; |
| 761 | cryptoContexts.erase(i); |
| 762 | delete tmp; |
| 763 | return; |
| 764 | } |
| 765 | } |
| 766 | } |
| 767 | } |
| 768 | |
| 769 | CryptoContext* |
| 770 | IncomingDataQueue::getInQueueCryptoContext(uint32 ssrc) |
| 771 | { |
| 772 | std::list<CryptoContext *>::iterator i; |
| 773 | |
| 774 | MutexLock lock(cryptoMutex); |
| 775 | for( i = cryptoContexts.begin(); i!= cryptoContexts.end(); i++ ){ |
| 776 | if( (*i)->getSsrc() == ssrc) { |
| 777 | return (*i); |
| 778 | } |
| 779 | } |
| 780 | return NULL; |
| 781 | } |
| 782 | |
Alexandre Lision | ddd731e | 2014-01-31 11:50:08 -0500 | [diff] [blame] | 783 | END_NAMESPACE |
Emeric Vigier | 2f62582 | 2012-08-06 11:09:52 -0400 | [diff] [blame] | 784 | |
| 785 | /** EMACS ** |
| 786 | * Local variables: |
| 787 | * mode: c++ |
| 788 | * c-basic-offset: 4 |
| 789 | * End: |
| 790 | */ |