blob: cd8219461f1159e324ac950807eff98ee9a06cb3 [file] [log] [blame]
Emeric Vigier2f625822012-08-06 11:09:52 -04001// Copyright (C) 2001,2002,2004 Federico Montesino Pouzols <fedemp@altern.org>
2//
3// This program is free software; you can redistribute it and/or modify
4// it under the terms of the GNU General Public License as published by
5// the Free Software Foundation; either version 2 of the License, or
6// (at your option) any later version.
7//
8// This program is distributed in the hope that it will be useful,
9// but WITHOUT ANY WARRANTY; without even the implied warranty of
10// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11// GNU General Public License for more details.
12//
13// You should have received a copy of the GNU General Public License
14// along with this program; if not, write to the Free Software
15// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
16//
17// As a special exception, you may use this file as part of a free software
18// library without restriction. Specifically, if other files instantiate
19// templates or use macros or inline functions from this file, or you compile
20// this file and link it with other files to produce an executable, this
21// file does not by itself cause the resulting executable to be covered by
22// the GNU General Public License. This exception does not however
23// invalidate any other reasons why the executable file might be covered by
24// the GNU General Public License.
25//
26// This exception applies only to the code released under the name GNU
27// ccRTP. If you copy code from other releases into a copy of GNU
28// ccRTP, as the General Public License permits, the exception does
29// not apply to the code that you add in this way. To avoid misleading
30// anyone as to the status of such modified files, you must delete
31// this exception notice from them.
32//
33// If you write modifications of your own for GNU ccRTP, it is your choice
34// whether to permit this exception to apply to your modifications.
35// If you do not wish that, delete this exception notice.
36//
37
38#include "private.h"
39#include <ccrtp/iqueue.h>
40
Alexandre Lisionddd731e2014-01-31 11:50:08 -050041NAMESPACE_COMMONCPP
Emeric Vigier2f625822012-08-06 11:09:52 -040042
43const size_t IncomingDataQueueBase::defaultMaxRecvPacketSize = 65534;
44
45ConflictHandler::ConflictingTransportAddress::
46ConflictingTransportAddress(InetAddress na,tpport_t dtp, tpport_t ctp):
47networkAddress(na), dataTransportPort(dtp),
48controlTransportPort(ctp), next(NULL)
49{
Alexandre Lisionddd731e2014-01-31 11:50:08 -050050 SysTime::gettimeofday(&lastPacketTime,NULL);
Emeric Vigier2f625822012-08-06 11:09:52 -040051}
52
53ConflictHandler::ConflictingTransportAddress*
54ConflictHandler::searchDataConflict(InetAddress na, tpport_t dtp)
55{
56 ConflictingTransportAddress* result = firstConflict;
57 while ( result->networkAddress != na ||
58 result->dataTransportPort != dtp)
59 result = result->next;
60 return result;
61}
62
63ConflictHandler::ConflictingTransportAddress*
64ConflictHandler::searchControlConflict(InetAddress na, tpport_t ctp)
65{
66 ConflictingTransportAddress* result = firstConflict;
67 while ( result &&
68 (result->networkAddress != na ||
69 result->controlTransportPort != ctp) )
70 result = result->next;
71 return result;
72}
73
74void
75ConflictHandler::addConflict(const InetAddress& na, tpport_t dtp, tpport_t ctp)
76{
77 ConflictingTransportAddress* nc =
78 new ConflictingTransportAddress(na,dtp,ctp);
79
80 if ( lastConflict ) {
81 lastConflict->setNext(nc);
82 lastConflict = nc;
83 } else {
84 firstConflict = lastConflict = nc;
85 }
86}
87
88const uint8 IncomingDataQueue::defaultMinValidPacketSequence = 0;
89const uint16 IncomingDataQueue::defaultMaxPacketMisorder = 0;
90const uint16 IncomingDataQueue::defaultMaxPacketDropout = 3000;
91const size_t IncomingDataQueue::defaultMembersSize =
92MembershipBookkeeping::defaultMembersHashSize;
93
94IncomingDataQueue::IncomingDataQueue(uint32 size) :
95IncomingDataQueueBase(), MembershipBookkeeping(size)
96{
97 recvFirst = recvLast = NULL;
98 sourceExpirationPeriod = 5; // 5 RTCP report intervals
99 minValidPacketSequence = getDefaultMinValidPacketSequence();
100 maxPacketDropout = getDefaultMaxPacketDropout();
101 maxPacketMisorder = getDefaultMaxPacketMisorder();
102}
103
104void
105IncomingDataQueue::purgeIncomingQueue()
106{
107 IncomingRTPPktLink* recvnext;
108 // flush the reception queue (incoming packets not yet
109 // retrieved)
110 recvLock.writeLock();
111 while( recvFirst )
112 {
113 recvnext = recvFirst->getNext();
114
115 // nullify source specific packet list
116 SyncSourceLink *s = recvFirst->getSourceLink();
117 s->setFirst(NULL);
118 s->setLast(NULL);
119
120 delete recvFirst->getPacket();
121 delete recvFirst;
122 recvFirst = recvnext;
123 }
124 recvLock.unlock();
125}
126
127void
128IncomingDataQueue::renewLocalSSRC()
129{
130 const uint32 MAXTRIES = 20;
131 uint32 newssrc;
132 uint16 tries = 0;
133 do {
134 newssrc = random32();
135 tries++;
136 } while ( (tries < MAXTRIES) && isRegistered(newssrc) );
137
138 if ( MAXTRIES == tries ) {
139 // TODO we are in real trouble.
140 }
141}
142
143bool
144IncomingDataQueue::isWaiting(const SyncSource* src) const
145{
146 bool w;
147 recvLock.readLock();
148 if ( NULL == src )
149 w = ( NULL != recvFirst);
150 else
151 w = isMine(*src) && ( NULL != getLink(*src)->getFirst() );
152
153 recvLock.unlock();
154 return w;
155}
156
157uint32
158IncomingDataQueue::getFirstTimestamp(const SyncSource* src) const
159{
160 recvLock.readLock();
161
162 // get the first packet
163 IncomingRTPPktLink* packetLink;
164 if ( NULL == src )
165 packetLink = recvFirst;
166 else
167 packetLink = isMine(*src) ? getLink(*src)->getFirst() : NULL;
168
169 // get the timestamp of the first packet
170 uint32 ts;
171 if ( packetLink )
172 ts = packetLink->getTimestamp();
173 else
174 ts = 0l;
175
176 recvLock.unlock();
177 return ts;
178}
179
180size_t
181IncomingDataQueue::takeInDataPacket(void)
182{
183 InetHostAddress network_address;
184 tpport_t transport_port;
185
186 uint32 nextSize = (uint32)getNextDataPacketSize();
187 unsigned char* buffer = new unsigned char[nextSize];
188 int32 rtn = (int32)recvData(buffer,nextSize,network_address,transport_port);
189 if ( (rtn < 0) || ((uint32)rtn > getMaxRecvPacketSize()) ){
190 delete buffer;
191 return 0;
192 }
193
194 // get time of arrival
195 struct timeval recvtime;
Alexandre Lisionddd731e2014-01-31 11:50:08 -0500196 SysTime::gettimeofday(&recvtime,NULL);
Emeric Vigier2f625822012-08-06 11:09:52 -0400197
198 // Special handling of padding to take care of encrypted content.
199 // In case of SRTP the padding length field is also encrypted, thus
200 // it gives a wrong length. Check and clear padding bit before
201 // creating the RTPPacket. Will be set and re-computed after a possible
202 // SRTP decryption.
203 uint8 padSet = (*buffer & 0x20);
204 if (padSet) {
205 *buffer = *buffer & ~0x20; // clear padding bit
Alexandre Lisionddd731e2014-01-31 11:50:08 -0500206 }
Emeric Vigier2f625822012-08-06 11:09:52 -0400207 // build a packet. It will link itself to its source
208 IncomingRTPPkt* packet =
209 new IncomingRTPPkt(buffer,rtn);
210
211 // Generic header validity check.
212 if ( !packet->isHeaderValid() ) {
213 delete packet;
214 return 0;
215 }
216
217 CryptoContext* pcc = getInQueueCryptoContext( packet->getSSRC());
218 if (pcc == NULL) {
219 pcc = getInQueueCryptoContext(0);
220 if (pcc != NULL) {
221 pcc = pcc->newCryptoContextForSSRC(packet->getSSRC(), 0, 0L);
222 if (pcc != NULL) {
223 pcc->deriveSrtpKeys(0);
224 setInQueueCryptoContext(pcc);
225 }
226 }
227 }
228 if (pcc != NULL) {
229 int32 ret = packet->unprotect(pcc);
230 if (ret < 0) {
231 if (!onSRTPPacketError(*packet, ret)) {
232 delete packet;
233 return 0;
234 }
235 }
236 }
237 if (padSet) {
238 packet->reComputePayLength(true);
239 }
240 // virtual for profile-specific validation and processing.
241 if ( !onRTPPacketRecv(*packet) ) {
242 delete packet;
243 return 0;
244 }
245
246 bool source_created;
247 SyncSourceLink* sourceLink =
248 getSourceBySSRC(packet->getSSRC(),source_created);
249 SyncSource* s = sourceLink->getSource();
250 if ( source_created ) {
251 // Set data transport address.
252 setDataTransportPort(*s,transport_port);
253 // Network address is assumed to be the same as the control one
254 setNetworkAddress(*s,network_address);
255 sourceLink->initStats();
256 // First packet arrival time.
257 sourceLink->setInitialDataTime(recvtime);
258 sourceLink->setProbation(getMinValidPacketSequence());
259 if ( sourceLink->getHello() )
260 onNewSyncSource(*s);
261 } else if ( 0 == s->getDataTransportPort() ) {
262 // Test if RTCP packets had been received but this is the
263 // first data packet from this source.
264 setDataTransportPort(*s,transport_port);
265 }
266
267 // Before inserting in the queue,
268 // 1) check for collisions and loops. If the packet cannot be
269 // assigned to a source, it will be rejected.
270 // 2) check the source is a sufficiently well known source
271 // TODO: also check CSRC identifiers.
272 if ( checkSSRCInIncomingRTPPkt(*sourceLink,source_created,
273 network_address,transport_port) &&
274 recordReception(*sourceLink,*packet,recvtime) ) {
275 // now the packet link is linked in the queues
276 IncomingRTPPktLink* packetLink =
277 new IncomingRTPPktLink(packet,
278 sourceLink,
279 recvtime,
280 packet->getTimestamp() -
281 sourceLink->getInitialDataTimestamp(),
282 NULL,NULL,NULL,NULL);
283 insertRecvPacket(packetLink);
284 } else {
285 // must be discarded due to collision or loop or
286 // invalid source
287 delete packet;
288 }
289
290 // ccRTP keeps packets from the new source, but avoids
291 // flip-flopping. This allows losing less packets and for
292 // mobile telephony applications or other apps that may change
293 // the source transport address during the session.
294 return rtn;
295}
296
297bool IncomingDataQueue::checkSSRCInIncomingRTPPkt(SyncSourceLink& sourceLink,
298bool is_new, InetAddress& network_address, tpport_t transport_port)
299{
300 bool result = true;
301
302 // Test if the source is new and it is not the local one.
303 if ( is_new &&
304 sourceLink.getSource()->getID() != getLocalSSRC() )
305 return result;
306
307 SyncSource *s = sourceLink.getSource();
308
309 if ( s->getDataTransportPort() != transport_port ||
310 s->getNetworkAddress() != network_address ) {
311 // SSRC collision or a loop has happened
312 if ( s->getID() != getLocalSSRC() ) {
313 // TODO: Optional error counter.
314
315 // Note this differs from the default in the RFC.
316 // Discard packet only when the collision is
317 // repeating (to avoid flip-flopping)
318 if ( sourceLink.getPrevConflict() &&
319 (
320 (network_address ==
321 sourceLink.getPrevConflict()->networkAddress)
322 &&
323 (transport_port ==
324 sourceLink.getPrevConflict()->dataTransportPort)
325 ) ) {
326 // discard packet and do not flip-flop
327 result = false;
328 } else {
329 // Record who has collided so that in
330 // the future we can how if the
331 // collision repeats.
332 sourceLink.setPrevConflict(network_address,
333 transport_port,0);
334 // Change sync source transport address
335 setDataTransportPort(*s,transport_port);
336 setNetworkAddress(*s,network_address);
337 }
338
339 } else {
340 // Collision or loop of own packets.
341 ConflictingTransportAddress* conflicting =
342 searchDataConflict(network_address,
343 transport_port);
344 if ( conflicting ) {
345 // Optional error counter.
346 updateConflict(*conflicting);
347 result = false;
348 } else {
349 // New collision
350 addConflict(s->getNetworkAddress(),
351 s->getDataTransportPort(),
352 s->getControlTransportPort());
353 dispatchBYE("SSRC collision detected when receiving data packet.");
354 renewLocalSSRC();
355 setNetworkAddress(*s,network_address);
356 setDataTransportPort(*s,transport_port);
357 setControlTransportPort(*s,0);
358 sourceLink.initStats();
359 sourceLink.setProbation(getMinValidPacketSequence());
360 }
361 }
362 }
363 return result;
364}
365
366bool
367IncomingDataQueue::insertRecvPacket(IncomingRTPPktLink* packetLink)
368{
369 SyncSourceLink *srcLink = packetLink->getSourceLink();
370 unsigned short seq = packetLink->getPacket()->getSeqNum();
371 recvLock.writeLock();
372 IncomingRTPPktLink* plink = srcLink->getLast();
373 if ( plink && (seq < plink->getPacket()->getSeqNum()) ) {
374 // a disordered packet, so look for its place
375 while ( plink && (seq < plink->getPacket()->getSeqNum()) ){
376 // the packet is a duplicate
377 if ( seq == plink->getPacket()->getSeqNum() ) {
378 recvLock.unlock();
379 VDL(("Duplicated disordered packet: seqnum %d, SSRC:",
380 seq,srcLink->getSource()->getID()));
381 delete packetLink->getPacket();
382 delete packetLink;
383 return false;
384 }
385 plink = plink->getSrcPrev();
386 }
387 if ( !plink ) {
388 // we have scanned the whole (and non empty)
389 // list, so this must be the older (first)
390 // packet from this source.
391
392 // insert into the source specific queue
393 IncomingRTPPktLink* srcFirst = srcLink->getFirst();
394 srcFirst->setSrcPrev(packetLink);
395 packetLink->setSrcNext(srcFirst);
396 // insert into the global queue
397 IncomingRTPPktLink* prevFirst = srcFirst->getPrev();
398 if ( prevFirst ){
399 prevFirst->setNext(packetLink);
400 packetLink->setPrev(prevFirst);
401 }
402 srcFirst->setPrev(packetLink);
403 packetLink->setNext(srcFirst);
404 srcLink->setFirst(packetLink);
405 } else {
406 // (we are in the middle of the source list)
407 // insert into the source specific queue
408 plink->getSrcNext()->setSrcPrev(packetLink);
409 packetLink->setSrcNext(plink->getSrcNext());
410 // -- insert into the global queue, with the
411 // minimum priority compared to packets from
412 // other sources
413 plink->getSrcNext()->getPrev()->setNext(packetLink);
414 packetLink->setPrev(plink->getSrcNext()->getPrev());
415 plink->getSrcNext()->setPrev(packetLink);
416 packetLink->setNext(plink->getSrcNext());
417 // ------
418 plink->setSrcNext(packetLink);
419 packetLink->setSrcPrev(plink);
420 // insert into the global queue (giving
421 // priority compared to packets from other sources)
422 //list->getNext->setPrev(packetLink);
423 //packetLink->setNext(list->getNext);
424 //list->setNext(packet);
425 //packet->setPrev(list);
426 }
427 } else {
428 // An ordered packet
429 if ( !plink ) {
430 // the only packet in the source specific queue
431 srcLink->setLast(packetLink);
432 srcLink->setFirst(packetLink);
433 // the last packet in the global queue
434 if ( recvLast ) {
435 recvLast->setNext(packetLink);
436 packetLink->setPrev(recvLast);
437 }
438 recvLast = packetLink;
439 if ( !recvFirst )
440 recvFirst = packetLink;
441 } else {
442 // there are already more packets from this source.
443 // this ignores duplicate packets
444 if ( plink && (seq == plink->getPacket()->getSeqNum()) ) {
445 VDL(("Duplicated packet: seqnum %d, SSRC:",
446 seq,srcLink->getSource->getID()));
447 recvLock.unlock();
448 delete packetLink->getPacket();
449 delete packetLink;
450 return false;
451 }
452 // the last packet in the source specific queue
453 srcLink->getLast()->setSrcNext(packetLink);
454 packetLink->setSrcPrev(srcLink->getLast());
455 srcLink->setLast(packetLink);
456 // the last packet in the global queue
457 recvLast->setNext(packetLink);
458 packetLink->setPrev(recvLast);
459 recvLast = packetLink;
460 }
461 }
462 // account the insertion of this packet into the queue
463 srcLink->recordInsertion(*packetLink);
464 recvLock.unlock();
465 // packet successfully inserted
466 return true;
467}
468
469const AppDataUnit*
470IncomingDataQueue::getData(uint32 stamp, const SyncSource* src)
471{
472 IncomingRTPPktLink* pl;
473// unsigned count = 0;
474 AppDataUnit* result;
475
476 if ( NULL != (pl = getWaiting(stamp,src)) ) {
477 IncomingRTPPkt* packet = pl->getPacket();
478// size_t len = packet->getPayloadSize();
479
480 SyncSource &src = *(pl->getSourceLink()->getSource());
481 result = new AppDataUnit(*packet,src);
482
483 // delete the packet link, but not the packet
484 delete pl;
485// count += len;
486 } else {
487 result = NULL;
488 }
489 return result;
490}
491
492// FIX: try to merge and organize
493IncomingDataQueue::IncomingRTPPktLink*
494IncomingDataQueue::getWaiting(uint32 timestamp, const SyncSource* src)
495{
496 if ( src && !isMine(*src) )
497 return NULL;
498
499 IncomingRTPPktLink *result;
500 recvLock.writeLock();
501 if ( src != NULL ) {
502 // process source specific queries:
503 // we will modify the queue of this source
504 SyncSourceLink* srcm = getLink(*src);
505
506 // first, delete all older packets. The while loop
507 // down here counts how many older packets are there;
508 // then the for loop deletes them and advances l till
509 // the first non older packet.
510 int nold = 0;
511 IncomingRTPPktLink* l = srcm->getFirst();
512 if ( !l ) {
513 result = NULL;
514 recvLock.unlock();
515 return result;
516 }
517 while ( l && ((l->getTimestamp() < timestamp) ||
518 end2EndDelayed(*l))) {
519 nold++;
520 l = l->getSrcNext();
521 }
522 // to know whether the global queue gets empty
523 bool nonempty = false;
524 for ( int i = 0; i < nold; i++) {
525 l = srcm->getFirst();
526 srcm->setFirst(srcm->getFirst()->getSrcNext());;
527 // unlink from the global queue
528 nonempty = false;
529 if ( l->getPrev() ){
530 nonempty = true;
531 l->getPrev()->setNext(l->getNext());
532 } if ( l->getNext() ) {
533 nonempty = true;
534 l->getNext()->setPrev(l->getPrev());
535 }
536 // now, delete it
537 onExpireRecv(*(l->getPacket()));// notify packet discard
538 delete l->getPacket();
539 delete l;
540 }
541 // return the packet, if found
542 if ( !srcm->getFirst() ) {
543 // threre are no more packets from this source
544 srcm->setLast(NULL);
545 if ( !nonempty )
546 recvFirst = recvLast = NULL;
547 result = NULL;
548 } else if ( srcm->getFirst()->getTimestamp() > timestamp ) {
549 // threre are only newer packets from this source
550 srcm->getFirst()->setSrcPrev(NULL);
551 result = NULL;
552 } else {
553 // (src->getFirst()->getTimestamp() == stamp) is true
554 result = srcm->getFirst();
555 // unlink the selected packet from the global queue
556 if ( result->getPrev() )
557 result->getPrev()->setNext(result->getNext());
558 else
559 recvFirst = result->getNext();
560 if ( result->getNext() )
561 result->getNext()->setPrev(result->getPrev());
562 else
563 recvLast = result->getPrev();
564 // unlink the selected packet from the source queue
565 srcm->setFirst(result->getSrcNext());
566 if ( srcm->getFirst() )
567 srcm->getFirst()->setPrev(NULL);
568 else
569 srcm->setLast(NULL);
570 }
571 } else {
572 // process source unspecific queries
573 int nold = 0;
574 IncomingRTPPktLink* l = recvFirst;
575 while ( l && (l->getTimestamp() < timestamp ||
576 end2EndDelayed(*l) ) ){
577 nold++;
578 l = l->getNext();
579 }
580 for (int i = 0; i < nold; i++) {
581 IncomingRTPPktLink* l = recvFirst;
582 recvFirst = recvFirst->getNext();
583 // unlink the packet from the queue of its source
584 SyncSourceLink* src = l->getSourceLink();
585 src->setFirst(l->getSrcNext());
586 if ( l->getSrcNext() )
587 l->getSrcNext()->setSrcPrev(NULL);
588 else
589 src->setLast(NULL);
590 // now, delete it
591 onExpireRecv(*(l->getPacket()));// notify packet discard
592 delete l->getPacket();
593 delete l;
594 }
595
596 // return the packet, if found
597 if ( !recvFirst ) {
598 // there are no more packets in the queue
599 recvLast = NULL;
600 result = NULL;
601 } else if ( recvFirst->getTimestamp() > timestamp ) {
602 // there are only newer packets in the queue
603 l->setPrev(NULL);
604 result = NULL;
605 } else {
606 // (recvFirst->getTimestamp() == stamp) is true
607 result = recvFirst;
608 // unlink the selected packet from the global queue
609 recvFirst = recvFirst->getNext();
610 if ( recvFirst )
611 recvFirst->setPrev(NULL);
612 else
613 recvLast = NULL;
614 // unlink the selected packet from the queue
615 // of its source
616 SyncSourceLink* src = result->getSourceLink();
617 src->setFirst(result->getSrcNext());
618 if ( src->getFirst() )
619 src->getFirst()->setSrcPrev(NULL);
620 else
621 src->setLast(NULL);
622 }
623 }
624 recvLock.unlock();
625 return result;
626}
627
628bool
629IncomingDataQueue::recordReception(SyncSourceLink& srcLink,
630const IncomingRTPPkt& pkt, const timeval recvtime)
631{
632 bool result = true;
633
634 // Source validation.
635 SyncSource* src = srcLink.getSource();
636 if ( !(srcLink.isValid()) ) {
637 // source is not yet valid.
638 if ( pkt.getSeqNum() == srcLink.getMaxSeqNum() + 1 ) {
639 // packet in sequence.
640 srcLink.decProbation();
641 if ( srcLink.isValid() ) {
642 // source has become valid.
643 // TODO: avoid this the first time.
644 srcLink.initSequence(pkt.getSeqNum());
645 } else {
646 result = false;
647 }
648 } else {
649 // packet not in sequence.
650 srcLink.probation = getMinValidPacketSequence() - 1;
651 result = false;
652 }
653 srcLink.setMaxSeqNum(pkt.getSeqNum());
654 } else {
655 // source was already valid.
656 uint16 step = pkt.getSeqNum() - srcLink.getMaxSeqNum();
657 if ( step < getMaxPacketDropout() ) {
658 // Ordered, with not too high step.
659 if ( pkt.getSeqNum() < srcLink.getMaxSeqNum() ) {
660 // sequene number wrapped.
661 srcLink.incSeqNumAccum();
662 }
663 srcLink.setMaxSeqNum(pkt.getSeqNum());
664 } else if ( step <= (SEQNUMMOD - getMaxPacketMisorder()) ) {
665 // too high step of the sequence number.
666 if ( pkt.getSeqNum() == srcLink.getBadSeqNum() ) {
667 srcLink.initSequence(pkt.getSeqNum());
668 } else {
669 srcLink.setBadSeqNum((pkt.getSeqNum() + 1) &
670 (SEQNUMMOD - 1) );
671 //This additional check avoids that
672 //the very first packet from a source
673 //be discarded.
674 if ( 0 < srcLink.getObservedPacketCount() ) {
675 result = false;
676 } else {
677 srcLink.setMaxSeqNum(pkt.getSeqNum());
678 }
679 }
680 } else {
681 // duplicate or reordered packet
682 }
683 }
684
685 if ( result ) {
686 // the packet is considered valid.
687 srcLink.incObservedPacketCount();
688 srcLink.incObservedOctetCount(pkt.getPayloadSize());
689 srcLink.lastPacketTime = recvtime;
690 if ( srcLink.getObservedPacketCount() == 1 ) {
691 // ooops, it's the first packet from this source
692 setSender(*src,true);
693 srcLink.setInitialDataTimestamp(pkt.getTimestamp());
694 }
695 // we record the last time a packet from this source
696 // was received, this has statistical interest and is
697 // needed to time out old senders that are no sending
698 // any longer.
699
700 // compute the interarrival jitter estimation.
701 timeval tarrival;
702 timeval lastT = srcLink.getLastPacketTime();
703 timeval initial = srcLink.getInitialDataTime();
704 timersub(&lastT,&initial,&tarrival);
705 uint32 arrival = timeval2microtimeout(tarrival)
706 * getCurrentRTPClockRate();
707 uint32 transitTime = arrival - pkt.getTimestamp();
708 int32 delta = transitTime -
709 srcLink.getLastPacketTransitTime();
710 srcLink.setLastPacketTransitTime(transitTime);
711 if ( delta < 0 )
712 delta = -delta;
713 srcLink.setJitter( srcLink.getJitter() +
714 (1.0f / 16.0f) *
715 (static_cast<float>(delta) -
716 srcLink.getJitter()));
717 }
718 return result;
719}
720
721void
722IncomingDataQueue::recordExtraction(const IncomingRTPPkt&)
723{}
724
725void
726IncomingDataQueue::setInQueueCryptoContext(CryptoContext* cc)
727{
728 std::list<CryptoContext *>::iterator i;
729
730 MutexLock lock(cryptoMutex);
731 // check if a CryptoContext for a SSRC already exists. If yes
732 // remove it from list before inserting the new one.
733 for( i = cryptoContexts.begin(); i!= cryptoContexts.end(); i++ ) {
734 if( (*i)->getSsrc() == cc->getSsrc() ) {
735 CryptoContext* tmp = *i;
736 cryptoContexts.erase(i);
737 delete tmp;
738 break;
739 }
740 }
741 cryptoContexts.push_back(cc);
742}
743
744void
745IncomingDataQueue::removeInQueueCryptoContext(CryptoContext* cc)
746{
747 std::list<CryptoContext *>::iterator i;
748
749 MutexLock lock(cryptoMutex);
750 if (cc == NULL) { // Remove any incoming crypto contexts
751 for (i = cryptoContexts.begin(); i != cryptoContexts.end(); ) {
752 CryptoContext* tmp = *i;
753 i = cryptoContexts.erase(i);
754 delete tmp;
755 }
756 }
757 else {
758 for( i = cryptoContexts.begin(); i!= cryptoContexts.end(); i++ ){
759 if( (*i)->getSsrc() == cc->getSsrc() ) {
760 CryptoContext* tmp = *i;
761 cryptoContexts.erase(i);
762 delete tmp;
763 return;
764 }
765 }
766 }
767}
768
769CryptoContext*
770IncomingDataQueue::getInQueueCryptoContext(uint32 ssrc)
771{
772 std::list<CryptoContext *>::iterator i;
773
774 MutexLock lock(cryptoMutex);
775 for( i = cryptoContexts.begin(); i!= cryptoContexts.end(); i++ ){
776 if( (*i)->getSsrc() == ssrc) {
777 return (*i);
778 }
779 }
780 return NULL;
781}
782
Alexandre Lisionddd731e2014-01-31 11:50:08 -0500783END_NAMESPACE
Emeric Vigier2f625822012-08-06 11:09:52 -0400784
785/** EMACS **
786 * Local variables:
787 * mode: c++
788 * c-basic-offset: 4
789 * End:
790 */