blob: c00e1977750cf8d5a1ef678e62238e5f646999b6 [file] [log] [blame]
Emeric Vigier2f625822012-08-06 11:09:52 -04001// Copyright (C) 2001,2002,2004 Federico Montesino Pouzols <fedemp@altern.org>
2//
3// This program is free software; you can redistribute it and/or modify
4// it under the terms of the GNU General Public License as published by
5// the Free Software Foundation; either version 2 of the License, or
6// (at your option) any later version.
7//
8// This program is distributed in the hope that it will be useful,
9// but WITHOUT ANY WARRANTY; without even the implied warranty of
10// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11// GNU General Public License for more details.
12//
13// You should have received a copy of the GNU General Public License
14// along with this program; if not, write to the Free Software
15// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
16//
17// As a special exception, you may use this file as part of a free software
18// library without restriction. Specifically, if other files instantiate
19// templates or use macros or inline functions from this file, or you compile
20// this file and link it with other files to produce an executable, this
21// file does not by itself cause the resulting executable to be covered by
22// the GNU General Public License. This exception does not however
23// invalidate any other reasons why the executable file might be covered by
24// the GNU General Public License.
25//
26// This exception applies only to the code released under the name GNU
27// ccRTP. If you copy code from other releases into a copy of GNU
28// ccRTP, as the General Public License permits, the exception does
29// not apply to the code that you add in this way. To avoid misleading
30// anyone as to the status of such modified files, you must delete
31// this exception notice from them.
32//
33// If you write modifications of your own for GNU ccRTP, it is your choice
34// whether to permit this exception to apply to your modifications.
35// If you do not wish that, delete this exception notice.
36//
37
38#include "private.h"
39#include <ccrtp/iqueue.h>
40
41#ifdef CCXX_NAMESPACES
42namespace ost {
43#endif
44
45const size_t IncomingDataQueueBase::defaultMaxRecvPacketSize = 65534;
46
47ConflictHandler::ConflictingTransportAddress::
48ConflictingTransportAddress(InetAddress na,tpport_t dtp, tpport_t ctp):
49networkAddress(na), dataTransportPort(dtp),
50controlTransportPort(ctp), next(NULL)
51{
52 gettimeofday(&lastPacketTime,NULL);
53}
54
55ConflictHandler::ConflictingTransportAddress*
56ConflictHandler::searchDataConflict(InetAddress na, tpport_t dtp)
57{
58 ConflictingTransportAddress* result = firstConflict;
59 while ( result->networkAddress != na ||
60 result->dataTransportPort != dtp)
61 result = result->next;
62 return result;
63}
64
65ConflictHandler::ConflictingTransportAddress*
66ConflictHandler::searchControlConflict(InetAddress na, tpport_t ctp)
67{
68 ConflictingTransportAddress* result = firstConflict;
69 while ( result &&
70 (result->networkAddress != na ||
71 result->controlTransportPort != ctp) )
72 result = result->next;
73 return result;
74}
75
76void
77ConflictHandler::addConflict(const InetAddress& na, tpport_t dtp, tpport_t ctp)
78{
79 ConflictingTransportAddress* nc =
80 new ConflictingTransportAddress(na,dtp,ctp);
81
82 if ( lastConflict ) {
83 lastConflict->setNext(nc);
84 lastConflict = nc;
85 } else {
86 firstConflict = lastConflict = nc;
87 }
88}
89
90const uint8 IncomingDataQueue::defaultMinValidPacketSequence = 0;
91const uint16 IncomingDataQueue::defaultMaxPacketMisorder = 0;
92const uint16 IncomingDataQueue::defaultMaxPacketDropout = 3000;
93const size_t IncomingDataQueue::defaultMembersSize =
94MembershipBookkeeping::defaultMembersHashSize;
95
96IncomingDataQueue::IncomingDataQueue(uint32 size) :
97IncomingDataQueueBase(), MembershipBookkeeping(size)
98{
99 recvFirst = recvLast = NULL;
100 sourceExpirationPeriod = 5; // 5 RTCP report intervals
101 minValidPacketSequence = getDefaultMinValidPacketSequence();
102 maxPacketDropout = getDefaultMaxPacketDropout();
103 maxPacketMisorder = getDefaultMaxPacketMisorder();
104}
105
106void
107IncomingDataQueue::purgeIncomingQueue()
108{
109 IncomingRTPPktLink* recvnext;
110 // flush the reception queue (incoming packets not yet
111 // retrieved)
112 recvLock.writeLock();
113 while( recvFirst )
114 {
115 recvnext = recvFirst->getNext();
116
117 // nullify source specific packet list
118 SyncSourceLink *s = recvFirst->getSourceLink();
119 s->setFirst(NULL);
120 s->setLast(NULL);
121
122 delete recvFirst->getPacket();
123 delete recvFirst;
124 recvFirst = recvnext;
125 }
126 recvLock.unlock();
127}
128
129void
130IncomingDataQueue::renewLocalSSRC()
131{
132 const uint32 MAXTRIES = 20;
133 uint32 newssrc;
134 uint16 tries = 0;
135 do {
136 newssrc = random32();
137 tries++;
138 } while ( (tries < MAXTRIES) && isRegistered(newssrc) );
139
140 if ( MAXTRIES == tries ) {
141 // TODO we are in real trouble.
142 }
143}
144
145bool
146IncomingDataQueue::isWaiting(const SyncSource* src) const
147{
148 bool w;
149 recvLock.readLock();
150 if ( NULL == src )
151 w = ( NULL != recvFirst);
152 else
153 w = isMine(*src) && ( NULL != getLink(*src)->getFirst() );
154
155 recvLock.unlock();
156 return w;
157}
158
159uint32
160IncomingDataQueue::getFirstTimestamp(const SyncSource* src) const
161{
162 recvLock.readLock();
163
164 // get the first packet
165 IncomingRTPPktLink* packetLink;
166 if ( NULL == src )
167 packetLink = recvFirst;
168 else
169 packetLink = isMine(*src) ? getLink(*src)->getFirst() : NULL;
170
171 // get the timestamp of the first packet
172 uint32 ts;
173 if ( packetLink )
174 ts = packetLink->getTimestamp();
175 else
176 ts = 0l;
177
178 recvLock.unlock();
179 return ts;
180}
181
182size_t
183IncomingDataQueue::takeInDataPacket(void)
184{
185 InetHostAddress network_address;
186 tpport_t transport_port;
187
188 uint32 nextSize = (uint32)getNextDataPacketSize();
189 unsigned char* buffer = new unsigned char[nextSize];
190 int32 rtn = (int32)recvData(buffer,nextSize,network_address,transport_port);
191 if ( (rtn < 0) || ((uint32)rtn > getMaxRecvPacketSize()) ){
192 delete buffer;
193 return 0;
194 }
195
196 // get time of arrival
197 struct timeval recvtime;
198 gettimeofday(&recvtime,NULL);
199
200 // Special handling of padding to take care of encrypted content.
201 // In case of SRTP the padding length field is also encrypted, thus
202 // it gives a wrong length. Check and clear padding bit before
203 // creating the RTPPacket. Will be set and re-computed after a possible
204 // SRTP decryption.
205 uint8 padSet = (*buffer & 0x20);
206 if (padSet) {
207 *buffer = *buffer & ~0x20; // clear padding bit
208 }
209 // build a packet. It will link itself to its source
210 IncomingRTPPkt* packet =
211 new IncomingRTPPkt(buffer,rtn);
212
213 // Generic header validity check.
214 if ( !packet->isHeaderValid() ) {
215 delete packet;
216 return 0;
217 }
218
219 CryptoContext* pcc = getInQueueCryptoContext( packet->getSSRC());
220 if (pcc == NULL) {
221 pcc = getInQueueCryptoContext(0);
222 if (pcc != NULL) {
223 pcc = pcc->newCryptoContextForSSRC(packet->getSSRC(), 0, 0L);
224 if (pcc != NULL) {
225 pcc->deriveSrtpKeys(0);
226 setInQueueCryptoContext(pcc);
227 }
228 }
229 }
230 if (pcc != NULL) {
231 int32 ret = packet->unprotect(pcc);
232 if (ret < 0) {
233 if (!onSRTPPacketError(*packet, ret)) {
234 delete packet;
235 return 0;
236 }
237 }
238 }
239 if (padSet) {
240 packet->reComputePayLength(true);
241 }
242 // virtual for profile-specific validation and processing.
243 if ( !onRTPPacketRecv(*packet) ) {
244 delete packet;
245 return 0;
246 }
247
248 bool source_created;
249 SyncSourceLink* sourceLink =
250 getSourceBySSRC(packet->getSSRC(),source_created);
251 SyncSource* s = sourceLink->getSource();
252 if ( source_created ) {
253 // Set data transport address.
254 setDataTransportPort(*s,transport_port);
255 // Network address is assumed to be the same as the control one
256 setNetworkAddress(*s,network_address);
257 sourceLink->initStats();
258 // First packet arrival time.
259 sourceLink->setInitialDataTime(recvtime);
260 sourceLink->setProbation(getMinValidPacketSequence());
261 if ( sourceLink->getHello() )
262 onNewSyncSource(*s);
263 } else if ( 0 == s->getDataTransportPort() ) {
264 // Test if RTCP packets had been received but this is the
265 // first data packet from this source.
266 setDataTransportPort(*s,transport_port);
267 }
268
269 // Before inserting in the queue,
270 // 1) check for collisions and loops. If the packet cannot be
271 // assigned to a source, it will be rejected.
272 // 2) check the source is a sufficiently well known source
273 // TODO: also check CSRC identifiers.
274 if ( checkSSRCInIncomingRTPPkt(*sourceLink,source_created,
275 network_address,transport_port) &&
276 recordReception(*sourceLink,*packet,recvtime) ) {
277 // now the packet link is linked in the queues
278 IncomingRTPPktLink* packetLink =
279 new IncomingRTPPktLink(packet,
280 sourceLink,
281 recvtime,
282 packet->getTimestamp() -
283 sourceLink->getInitialDataTimestamp(),
284 NULL,NULL,NULL,NULL);
285 insertRecvPacket(packetLink);
286 } else {
287 // must be discarded due to collision or loop or
288 // invalid source
289 delete packet;
290 }
291
292 // ccRTP keeps packets from the new source, but avoids
293 // flip-flopping. This allows losing less packets and for
294 // mobile telephony applications or other apps that may change
295 // the source transport address during the session.
296 return rtn;
297}
298
299bool IncomingDataQueue::checkSSRCInIncomingRTPPkt(SyncSourceLink& sourceLink,
300bool is_new, InetAddress& network_address, tpport_t transport_port)
301{
302 bool result = true;
303
304 // Test if the source is new and it is not the local one.
305 if ( is_new &&
306 sourceLink.getSource()->getID() != getLocalSSRC() )
307 return result;
308
309 SyncSource *s = sourceLink.getSource();
310
311 if ( s->getDataTransportPort() != transport_port ||
312 s->getNetworkAddress() != network_address ) {
313 // SSRC collision or a loop has happened
314 if ( s->getID() != getLocalSSRC() ) {
315 // TODO: Optional error counter.
316
317 // Note this differs from the default in the RFC.
318 // Discard packet only when the collision is
319 // repeating (to avoid flip-flopping)
320 if ( sourceLink.getPrevConflict() &&
321 (
322 (network_address ==
323 sourceLink.getPrevConflict()->networkAddress)
324 &&
325 (transport_port ==
326 sourceLink.getPrevConflict()->dataTransportPort)
327 ) ) {
328 // discard packet and do not flip-flop
329 result = false;
330 } else {
331 // Record who has collided so that in
332 // the future we can how if the
333 // collision repeats.
334 sourceLink.setPrevConflict(network_address,
335 transport_port,0);
336 // Change sync source transport address
337 setDataTransportPort(*s,transport_port);
338 setNetworkAddress(*s,network_address);
339 }
340
341 } else {
342 // Collision or loop of own packets.
343 ConflictingTransportAddress* conflicting =
344 searchDataConflict(network_address,
345 transport_port);
346 if ( conflicting ) {
347 // Optional error counter.
348 updateConflict(*conflicting);
349 result = false;
350 } else {
351 // New collision
352 addConflict(s->getNetworkAddress(),
353 s->getDataTransportPort(),
354 s->getControlTransportPort());
355 dispatchBYE("SSRC collision detected when receiving data packet.");
356 renewLocalSSRC();
357 setNetworkAddress(*s,network_address);
358 setDataTransportPort(*s,transport_port);
359 setControlTransportPort(*s,0);
360 sourceLink.initStats();
361 sourceLink.setProbation(getMinValidPacketSequence());
362 }
363 }
364 }
365 return result;
366}
367
368bool
369IncomingDataQueue::insertRecvPacket(IncomingRTPPktLink* packetLink)
370{
371 SyncSourceLink *srcLink = packetLink->getSourceLink();
372 unsigned short seq = packetLink->getPacket()->getSeqNum();
373 recvLock.writeLock();
374 IncomingRTPPktLink* plink = srcLink->getLast();
375 if ( plink && (seq < plink->getPacket()->getSeqNum()) ) {
376 // a disordered packet, so look for its place
377 while ( plink && (seq < plink->getPacket()->getSeqNum()) ){
378 // the packet is a duplicate
379 if ( seq == plink->getPacket()->getSeqNum() ) {
380 recvLock.unlock();
381 VDL(("Duplicated disordered packet: seqnum %d, SSRC:",
382 seq,srcLink->getSource()->getID()));
383 delete packetLink->getPacket();
384 delete packetLink;
385 return false;
386 }
387 plink = plink->getSrcPrev();
388 }
389 if ( !plink ) {
390 // we have scanned the whole (and non empty)
391 // list, so this must be the older (first)
392 // packet from this source.
393
394 // insert into the source specific queue
395 IncomingRTPPktLink* srcFirst = srcLink->getFirst();
396 srcFirst->setSrcPrev(packetLink);
397 packetLink->setSrcNext(srcFirst);
398 // insert into the global queue
399 IncomingRTPPktLink* prevFirst = srcFirst->getPrev();
400 if ( prevFirst ){
401 prevFirst->setNext(packetLink);
402 packetLink->setPrev(prevFirst);
403 }
404 srcFirst->setPrev(packetLink);
405 packetLink->setNext(srcFirst);
406 srcLink->setFirst(packetLink);
407 } else {
408 // (we are in the middle of the source list)
409 // insert into the source specific queue
410 plink->getSrcNext()->setSrcPrev(packetLink);
411 packetLink->setSrcNext(plink->getSrcNext());
412 // -- insert into the global queue, with the
413 // minimum priority compared to packets from
414 // other sources
415 plink->getSrcNext()->getPrev()->setNext(packetLink);
416 packetLink->setPrev(plink->getSrcNext()->getPrev());
417 plink->getSrcNext()->setPrev(packetLink);
418 packetLink->setNext(plink->getSrcNext());
419 // ------
420 plink->setSrcNext(packetLink);
421 packetLink->setSrcPrev(plink);
422 // insert into the global queue (giving
423 // priority compared to packets from other sources)
424 //list->getNext->setPrev(packetLink);
425 //packetLink->setNext(list->getNext);
426 //list->setNext(packet);
427 //packet->setPrev(list);
428 }
429 } else {
430 // An ordered packet
431 if ( !plink ) {
432 // the only packet in the source specific queue
433 srcLink->setLast(packetLink);
434 srcLink->setFirst(packetLink);
435 // the last packet in the global queue
436 if ( recvLast ) {
437 recvLast->setNext(packetLink);
438 packetLink->setPrev(recvLast);
439 }
440 recvLast = packetLink;
441 if ( !recvFirst )
442 recvFirst = packetLink;
443 } else {
444 // there are already more packets from this source.
445 // this ignores duplicate packets
446 if ( plink && (seq == plink->getPacket()->getSeqNum()) ) {
447 VDL(("Duplicated packet: seqnum %d, SSRC:",
448 seq,srcLink->getSource->getID()));
449 recvLock.unlock();
450 delete packetLink->getPacket();
451 delete packetLink;
452 return false;
453 }
454 // the last packet in the source specific queue
455 srcLink->getLast()->setSrcNext(packetLink);
456 packetLink->setSrcPrev(srcLink->getLast());
457 srcLink->setLast(packetLink);
458 // the last packet in the global queue
459 recvLast->setNext(packetLink);
460 packetLink->setPrev(recvLast);
461 recvLast = packetLink;
462 }
463 }
464 // account the insertion of this packet into the queue
465 srcLink->recordInsertion(*packetLink);
466 recvLock.unlock();
467 // packet successfully inserted
468 return true;
469}
470
471const AppDataUnit*
472IncomingDataQueue::getData(uint32 stamp, const SyncSource* src)
473{
474 IncomingRTPPktLink* pl;
475// unsigned count = 0;
476 AppDataUnit* result;
477
478 if ( NULL != (pl = getWaiting(stamp,src)) ) {
479 IncomingRTPPkt* packet = pl->getPacket();
480// size_t len = packet->getPayloadSize();
481
482 SyncSource &src = *(pl->getSourceLink()->getSource());
483 result = new AppDataUnit(*packet,src);
484
485 // delete the packet link, but not the packet
486 delete pl;
487// count += len;
488 } else {
489 result = NULL;
490 }
491 return result;
492}
493
494// FIX: try to merge and organize
495IncomingDataQueue::IncomingRTPPktLink*
496IncomingDataQueue::getWaiting(uint32 timestamp, const SyncSource* src)
497{
498 if ( src && !isMine(*src) )
499 return NULL;
500
501 IncomingRTPPktLink *result;
502 recvLock.writeLock();
503 if ( src != NULL ) {
504 // process source specific queries:
505 // we will modify the queue of this source
506 SyncSourceLink* srcm = getLink(*src);
507
508 // first, delete all older packets. The while loop
509 // down here counts how many older packets are there;
510 // then the for loop deletes them and advances l till
511 // the first non older packet.
512 int nold = 0;
513 IncomingRTPPktLink* l = srcm->getFirst();
514 if ( !l ) {
515 result = NULL;
516 recvLock.unlock();
517 return result;
518 }
519 while ( l && ((l->getTimestamp() < timestamp) ||
520 end2EndDelayed(*l))) {
521 nold++;
522 l = l->getSrcNext();
523 }
524 // to know whether the global queue gets empty
525 bool nonempty = false;
526 for ( int i = 0; i < nold; i++) {
527 l = srcm->getFirst();
528 srcm->setFirst(srcm->getFirst()->getSrcNext());;
529 // unlink from the global queue
530 nonempty = false;
531 if ( l->getPrev() ){
532 nonempty = true;
533 l->getPrev()->setNext(l->getNext());
534 } if ( l->getNext() ) {
535 nonempty = true;
536 l->getNext()->setPrev(l->getPrev());
537 }
538 // now, delete it
539 onExpireRecv(*(l->getPacket()));// notify packet discard
540 delete l->getPacket();
541 delete l;
542 }
543 // return the packet, if found
544 if ( !srcm->getFirst() ) {
545 // threre are no more packets from this source
546 srcm->setLast(NULL);
547 if ( !nonempty )
548 recvFirst = recvLast = NULL;
549 result = NULL;
550 } else if ( srcm->getFirst()->getTimestamp() > timestamp ) {
551 // threre are only newer packets from this source
552 srcm->getFirst()->setSrcPrev(NULL);
553 result = NULL;
554 } else {
555 // (src->getFirst()->getTimestamp() == stamp) is true
556 result = srcm->getFirst();
557 // unlink the selected packet from the global queue
558 if ( result->getPrev() )
559 result->getPrev()->setNext(result->getNext());
560 else
561 recvFirst = result->getNext();
562 if ( result->getNext() )
563 result->getNext()->setPrev(result->getPrev());
564 else
565 recvLast = result->getPrev();
566 // unlink the selected packet from the source queue
567 srcm->setFirst(result->getSrcNext());
568 if ( srcm->getFirst() )
569 srcm->getFirst()->setPrev(NULL);
570 else
571 srcm->setLast(NULL);
572 }
573 } else {
574 // process source unspecific queries
575 int nold = 0;
576 IncomingRTPPktLink* l = recvFirst;
577 while ( l && (l->getTimestamp() < timestamp ||
578 end2EndDelayed(*l) ) ){
579 nold++;
580 l = l->getNext();
581 }
582 for (int i = 0; i < nold; i++) {
583 IncomingRTPPktLink* l = recvFirst;
584 recvFirst = recvFirst->getNext();
585 // unlink the packet from the queue of its source
586 SyncSourceLink* src = l->getSourceLink();
587 src->setFirst(l->getSrcNext());
588 if ( l->getSrcNext() )
589 l->getSrcNext()->setSrcPrev(NULL);
590 else
591 src->setLast(NULL);
592 // now, delete it
593 onExpireRecv(*(l->getPacket()));// notify packet discard
594 delete l->getPacket();
595 delete l;
596 }
597
598 // return the packet, if found
599 if ( !recvFirst ) {
600 // there are no more packets in the queue
601 recvLast = NULL;
602 result = NULL;
603 } else if ( recvFirst->getTimestamp() > timestamp ) {
604 // there are only newer packets in the queue
605 l->setPrev(NULL);
606 result = NULL;
607 } else {
608 // (recvFirst->getTimestamp() == stamp) is true
609 result = recvFirst;
610 // unlink the selected packet from the global queue
611 recvFirst = recvFirst->getNext();
612 if ( recvFirst )
613 recvFirst->setPrev(NULL);
614 else
615 recvLast = NULL;
616 // unlink the selected packet from the queue
617 // of its source
618 SyncSourceLink* src = result->getSourceLink();
619 src->setFirst(result->getSrcNext());
620 if ( src->getFirst() )
621 src->getFirst()->setSrcPrev(NULL);
622 else
623 src->setLast(NULL);
624 }
625 }
626 recvLock.unlock();
627 return result;
628}
629
630bool
631IncomingDataQueue::recordReception(SyncSourceLink& srcLink,
632const IncomingRTPPkt& pkt, const timeval recvtime)
633{
634 bool result = true;
635
636 // Source validation.
637 SyncSource* src = srcLink.getSource();
638 if ( !(srcLink.isValid()) ) {
639 // source is not yet valid.
640 if ( pkt.getSeqNum() == srcLink.getMaxSeqNum() + 1 ) {
641 // packet in sequence.
642 srcLink.decProbation();
643 if ( srcLink.isValid() ) {
644 // source has become valid.
645 // TODO: avoid this the first time.
646 srcLink.initSequence(pkt.getSeqNum());
647 } else {
648 result = false;
649 }
650 } else {
651 // packet not in sequence.
652 srcLink.probation = getMinValidPacketSequence() - 1;
653 result = false;
654 }
655 srcLink.setMaxSeqNum(pkt.getSeqNum());
656 } else {
657 // source was already valid.
658 uint16 step = pkt.getSeqNum() - srcLink.getMaxSeqNum();
659 if ( step < getMaxPacketDropout() ) {
660 // Ordered, with not too high step.
661 if ( pkt.getSeqNum() < srcLink.getMaxSeqNum() ) {
662 // sequene number wrapped.
663 srcLink.incSeqNumAccum();
664 }
665 srcLink.setMaxSeqNum(pkt.getSeqNum());
666 } else if ( step <= (SEQNUMMOD - getMaxPacketMisorder()) ) {
667 // too high step of the sequence number.
668 if ( pkt.getSeqNum() == srcLink.getBadSeqNum() ) {
669 srcLink.initSequence(pkt.getSeqNum());
670 } else {
671 srcLink.setBadSeqNum((pkt.getSeqNum() + 1) &
672 (SEQNUMMOD - 1) );
673 //This additional check avoids that
674 //the very first packet from a source
675 //be discarded.
676 if ( 0 < srcLink.getObservedPacketCount() ) {
677 result = false;
678 } else {
679 srcLink.setMaxSeqNum(pkt.getSeqNum());
680 }
681 }
682 } else {
683 // duplicate or reordered packet
684 }
685 }
686
687 if ( result ) {
688 // the packet is considered valid.
689 srcLink.incObservedPacketCount();
690 srcLink.incObservedOctetCount(pkt.getPayloadSize());
691 srcLink.lastPacketTime = recvtime;
692 if ( srcLink.getObservedPacketCount() == 1 ) {
693 // ooops, it's the first packet from this source
694 setSender(*src,true);
695 srcLink.setInitialDataTimestamp(pkt.getTimestamp());
696 }
697 // we record the last time a packet from this source
698 // was received, this has statistical interest and is
699 // needed to time out old senders that are no sending
700 // any longer.
701
702 // compute the interarrival jitter estimation.
703 timeval tarrival;
704 timeval lastT = srcLink.getLastPacketTime();
705 timeval initial = srcLink.getInitialDataTime();
706 timersub(&lastT,&initial,&tarrival);
707 uint32 arrival = timeval2microtimeout(tarrival)
708 * getCurrentRTPClockRate();
709 uint32 transitTime = arrival - pkt.getTimestamp();
710 int32 delta = transitTime -
711 srcLink.getLastPacketTransitTime();
712 srcLink.setLastPacketTransitTime(transitTime);
713 if ( delta < 0 )
714 delta = -delta;
715 srcLink.setJitter( srcLink.getJitter() +
716 (1.0f / 16.0f) *
717 (static_cast<float>(delta) -
718 srcLink.getJitter()));
719 }
720 return result;
721}
722
723void
724IncomingDataQueue::recordExtraction(const IncomingRTPPkt&)
725{}
726
727void
728IncomingDataQueue::setInQueueCryptoContext(CryptoContext* cc)
729{
730 std::list<CryptoContext *>::iterator i;
731
732 MutexLock lock(cryptoMutex);
733 // check if a CryptoContext for a SSRC already exists. If yes
734 // remove it from list before inserting the new one.
735 for( i = cryptoContexts.begin(); i!= cryptoContexts.end(); i++ ) {
736 if( (*i)->getSsrc() == cc->getSsrc() ) {
737 CryptoContext* tmp = *i;
738 cryptoContexts.erase(i);
739 delete tmp;
740 break;
741 }
742 }
743 cryptoContexts.push_back(cc);
744}
745
746void
747IncomingDataQueue::removeInQueueCryptoContext(CryptoContext* cc)
748{
749 std::list<CryptoContext *>::iterator i;
750
751 MutexLock lock(cryptoMutex);
752 if (cc == NULL) { // Remove any incoming crypto contexts
753 for (i = cryptoContexts.begin(); i != cryptoContexts.end(); ) {
754 CryptoContext* tmp = *i;
755 i = cryptoContexts.erase(i);
756 delete tmp;
757 }
758 }
759 else {
760 for( i = cryptoContexts.begin(); i!= cryptoContexts.end(); i++ ){
761 if( (*i)->getSsrc() == cc->getSsrc() ) {
762 CryptoContext* tmp = *i;
763 cryptoContexts.erase(i);
764 delete tmp;
765 return;
766 }
767 }
768 }
769}
770
771CryptoContext*
772IncomingDataQueue::getInQueueCryptoContext(uint32 ssrc)
773{
774 std::list<CryptoContext *>::iterator i;
775
776 MutexLock lock(cryptoMutex);
777 for( i = cryptoContexts.begin(); i!= cryptoContexts.end(); i++ ){
778 if( (*i)->getSsrc() == ssrc) {
779 return (*i);
780 }
781 }
782 return NULL;
783}
784
785#ifdef CCXX_NAMESPACES
786}
787#endif
788
789/** EMACS **
790 * Local variables:
791 * mode: c++
792 * c-basic-offset: 4
793 * End:
794 */