blob: 0fbd14d42553cc4ff259c19ccb2c125857895453 [file] [log] [blame]
Emeric Vigier2f625822012-08-06 11:09:52 -04001// Copyright (C) 2001,2002,2004 Federico Montesino Pouzols <fedemp@altern.org>.
2//
3// This program is free software; you can redistribute it and/or modify
4// it under the terms of the GNU General Public License as published by
5// the Free Software Foundation; either version 2 of the License, or
6// (at your option) any later version.
7//
8// This program is distributed in the hope that it will be useful,
9// but WITHOUT ANY WARRANTY; without even the implied warranty of
10// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11// GNU General Public License for more details.
12//
13// You should have received a copy of the GNU General Public License
14// along with this program; if not, write to the Free Software
15// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
16//
17// As a special exception, you may use this file as part of a free software
18// library without restriction. Specifically, if other files instantiate
19// templates or use macros or inline functions from this file, or you compile
20// this file and link it with other files to produce an executable, this
21// file does not by itself cause the resulting executable to be covered by
22// the GNU General Public License. This exception does not however
23// invalidate any other reasons why the executable file might be covered by
24// the GNU General Public License.
25//
26// This exception applies only to the code released under the name GNU
27// ccRTP. If you copy code from other releases into a copy of GNU
28// ccRTP, as the General Public License permits, the exception does
29// not apply to the code that you add in this way. To avoid misleading
30// anyone as to the status of such modified files, you must delete
31// this exception notice from them.
32//
33// If you write modifications of your own for GNU ccRTP, it is your choice
34// whether to permit this exception to apply to your modifications.
35// If you do not wish that, delete this exception notice.
36//
37
38/**
39 * @file iqueue.h
40 *
41 * @short Generic RTP input queues.
42 **/
43
44#ifndef CCXX_RTP_IQUEUE_H_
45#define CCXX_RTP_IQUEUE_H_
46
47#include <ccrtp/queuebase.h>
48#include <ccrtp/CryptoContext.h>
49
50#include <list>
51
52#ifdef CCXX_NAMESPACES
53namespace ost {
54#endif
55
56/**
57 * @defgroup iqueue Generic RTP input queues.
58 * @{
59 **/
60
61/**
62 * @class Members rtp.h
63 * @short members and senders accounting
64 *
65 * Records the number of members as well as active senders. For now,
66 * it is too simple.
67 *
68 * @author Federico Montesino Pouzols <fedemp@altern.org>
69 **/
70class __EXPORT Members
71{
72public:
73 inline void
74 setMembersCount(uint32 n)
75 { members = n; }
76
77 inline void
78 increaseMembersCount()
79 { members++; }
80
81 inline void
82 decreaseMembersCount()
83 { members--; }
84
85 inline uint32
86 getMembersCount() const
87 { return members; }
88
89 inline void
90 setSendersCount(uint32 n)
91 { activeSenders = n; }
92
93 inline void
94 increaseSendersCount()
95 { activeSenders++; }
96
97 inline void
98 decreaseSendersCount()
99 { activeSenders--; }
100
101 inline uint32
102 getSendersCount() const
103 { return activeSenders; }
104
105protected:
106 Members() :
107 members(0),
108 activeSenders(0)
109 { }
110
111 inline virtual ~Members()
112 { }
113
114private:
115 /// number of identified members
116 uint32 members;
117 /// number of identified members that currently are active senders
118 uint32 activeSenders;
119};
120
121/**
122 * @class SyncSourceHandler
123 * @short SyncSource objects modification methods.
124 *
125 * @author Federico Montesino Pouzols <fedemp@altern.org>
126 **/
127class __EXPORT SyncSourceHandler
128{
129public:
130 /**
131 * This requires SyncSource - SyncSourceHandler friendship.
132 *
133 * Get the SyncSourceLink corresponding to a SyncSource
134 * object.
135 **/
136 inline void*
137 getLink(const SyncSource& source) const
138 { return source.getLink(); }
139
140 inline void
141 setLink(SyncSource& source, void* link)
142 { source.setLink(link); }
143
144 inline void
145 setParticipant(SyncSource& source, Participant& p)
146 { source.setParticipant(p); }
147
148 inline void
149 setState(SyncSource& source, SyncSource::State ns)
150 { source.setState(ns); }
151
152 inline void
153 setSender(SyncSource& source, bool active)
154 { source.setSender(active); }
155
156 inline void
157 setDataTransportPort(SyncSource& source, tpport_t p)
158 { source.setDataTransportPort(p); }
159
160 inline void
161 setControlTransportPort(SyncSource& source, tpport_t p)
162 { source.setControlTransportPort(p); }
163
164 inline void
165 setNetworkAddress(SyncSource& source, InetAddress addr)
166 { source.setNetworkAddress(addr); }
167
168protected:
169 SyncSourceHandler()
170 { }
171
172 inline virtual ~SyncSourceHandler()
173 { }
174};
175
176/**
177 * @class ParticipantHandler
178 * @short Participant objects modification methods.
179 *
180 * @author Federico Montesino Pouzols <fedemp@altern.org>
181 **/
182class __EXPORT ParticipantHandler
183{
184public:
185 inline void
186 setSDESItem(Participant* part, SDESItemType item,
187 const std::string& val)
188 { part->setSDESItem(item,val); }
189
190 inline void
191 setPRIVPrefix(Participant* part, const std::string val)
192 { part->setPRIVPrefix(val); }
193
194protected:
195 ParticipantHandler()
196 { }
197
198 inline virtual ~ParticipantHandler()
199 { }
200};
201
202/**
203 * @class ApplicationHandler
204 * @short Application objects modification methods.
205 *
206 * @author Federico Montesino Pouzols <fedemp@altern.org>
207 **/
208class __EXPORT ApplicationHandler
209{
210public:
211 inline void
212 addParticipant(RTPApplication& app, Participant& part)
213 { app.addParticipant(part); }
214
215 inline void
216 removeParticipant(RTPApplication& app,
217 RTPApplication::ParticipantLink* pl)
218 { app.removeParticipant(pl); }
219
220protected:
221 ApplicationHandler()
222 { }
223
224 inline virtual ~ApplicationHandler()
225 { }
226};
227
228/**
229 * @class ConflictHandler
230 * @short To track addresses of sources conflicting with the local
231 * one.
232 *
233 * @author Federico Montesino Pouzols <fedemp@altern.org>
234 **/
235class __EXPORT ConflictHandler
236{
237public:
238 struct ConflictingTransportAddress
239 {
240 ConflictingTransportAddress(InetAddress na,
241 tpport_t dtp, tpport_t ctp);
242
243 void setNext(ConflictingTransportAddress* nc)
244 { next = nc; }
245
246 inline const InetAddress& getNetworkAddress( ) const
247 { return networkAddress; }
248
249 inline tpport_t getDataTransportPort() const
250 { return dataTransportPort; }
251
252 inline tpport_t getControlTransportPort() const
253 { return controlTransportPort; }
254
255 InetAddress networkAddress;
256 tpport_t dataTransportPort;
257 tpport_t controlTransportPort;
258 ConflictingTransportAddress* next;
259 // arrival time of last data or control packet.
260 timeval lastPacketTime;
261 };
262
263 /**
264 * @param na Inet network address.
265 * @param dtp Data transport port.
266 **/
267 ConflictingTransportAddress* searchDataConflict(InetAddress na,
268 tpport_t dtp);
269 /**
270 * @param na Inet network address.
271 * @param ctp Data transport port.
272 **/
273 ConflictingTransportAddress* searchControlConflict(InetAddress na,
274 tpport_t ctp);
275
276 void updateConflict(ConflictingTransportAddress& ca)
277 { gettimeofday(&(ca.lastPacketTime),NULL); }
278
279 void addConflict(const InetAddress& na, tpport_t dtp, tpport_t ctp);
280
281protected:
282 ConflictHandler()
283 { firstConflict = lastConflict = NULL; }
284
285 inline virtual ~ConflictHandler()
286 { }
287
288 ConflictingTransportAddress* firstConflict, * lastConflict;
289};
290
291/**
292 * @class MembershipBookkeeping
293 * @short Controls the group membership in the current session.
294 *
295 * For now, this class implements only a hash table of members, but
296 * its design and relation with other classes is intented to support
297 * group membership sampling in case scalability problems arise.
298 *
299 * @author Federico Montesino Pouzols <fedemp@altern.org>
300 */
301class __EXPORT MembershipBookkeeping :
302 public SyncSourceHandler,
303 public ParticipantHandler,
304 public ApplicationHandler,
305 public ConflictHandler,
306 private Members
307{
308public:
309 inline size_t getDefaultMembersHashSize()
310 { return defaultMembersHashSize; }
311
312protected:
313
314 /**
315 * @short The initial size is a hint to allocate the resources
316 * needed in order to keep the members' identifiers and
317 * associated information.
318 *
319 * Although ccRTP will reallocate resources when it becomes
320 * necessary, a good hint may save a lot of unpredictable time
321 * penalties.
322 *
323 * @param initialSize an estimation of how many participants
324 * the session will consist of.
325 *
326 */
327 MembershipBookkeeping(uint32 initialSize = defaultMembersHashSize);
328
329 /**
330 * Purges all RTPSource structures created during the session,
331 * as well as the hash table and the list of sources.
332 **/
333 inline virtual
334 ~MembershipBookkeeping()
335 { endMembers(); }
336
337 struct SyncSourceLink;
338
339 inline SyncSourceLink* getLink(const SyncSource& source) const
340 { return static_cast<SyncSourceLink*>(SyncSourceHandler::getLink(source)); }
341 /**
342 * Get whether a synchronization source is recorded in this
343 * membership controller.
344 **/
345 inline bool isMine(const SyncSource& source) const
346 { return getLink(source)->getMembership() == this; }
347
348 /**
349 * @struct IncomingRTPPktLink
350 *
351 * @short Incoming RTP data packets control structure within
352 * the incoming packet queue class.
353 **/
354 struct IncomingRTPPktLink
355 {
356 IncomingRTPPktLink(IncomingRTPPkt* pkt, SyncSourceLink* sLink,
357 struct timeval& recv_ts,
358 uint32 shifted_ts,
359 IncomingRTPPktLink* sp,
360 IncomingRTPPktLink* sn,
361 IncomingRTPPktLink* p,
362 IncomingRTPPktLink* n) :
363 packet(pkt),
364 sourceLink(sLink),
365 prev(p), next(n),
366 srcPrev(sp), srcNext(sn),
367 receptionTime(recv_ts),
368 shiftedTimestamp(shifted_ts)
369 { }
370
371 ~IncomingRTPPktLink()
372 { }
373
374 inline SyncSourceLink* getSourceLink() const
375 { return sourceLink; }
376
377 inline void setSourceLink(SyncSourceLink* src)
378 { sourceLink = src; }
379
380 inline IncomingRTPPktLink* getNext() const
381 { return next; }
382
383 inline void setNext(IncomingRTPPktLink* nl)
384 { next = nl; }
385
386 inline IncomingRTPPktLink* getPrev() const
387 { return prev; }
388
389 inline void setPrev(IncomingRTPPktLink* pl)
390 { prev = pl; }
391
392 inline IncomingRTPPktLink* getSrcNext() const
393 { return srcNext; }
394
395 inline void setSrcNext(IncomingRTPPktLink* sn)
396 { srcNext = sn; }
397
398 inline IncomingRTPPktLink* getSrcPrev() const
399 { return srcPrev; }
400
401 inline void setSrcPrev(IncomingRTPPktLink* sp)
402 { srcPrev = sp; }
403
404 inline IncomingRTPPkt* getPacket() const
405 { return packet; }
406
407 inline void setPacket(IncomingRTPPkt* pkt)
408 { packet = pkt; }
409
410 /**
411 * Set the time this packet was received at.
412 *
413 * @param t time of reception.
414 * @note this has almost nothing to do with the 32-bit
415 * timestamp contained in the packet header.
416 **/
417 inline void setRecvTime(const timeval &t)
418 { receptionTime = t; }
419
420 /**
421 * Get the time this packet was received at.
422 **/
423 inline timeval getRecvTime() const
424 { return receptionTime; }
425
426 /**
427 * Get timestamp of this packet. The timestamp of
428 * incoming packets is filtered so that the timestamp
429 * this method provides for the first packet received
430 * from every source starts from 0.
431 *
432 * @return 32 bit timestamp starting from 0 for each source.
433 */
434 inline uint32 getTimestamp() const
435 { return shiftedTimestamp; }
436
437 inline void setTimestamp(uint32 ts)
438 { shiftedTimestamp = ts;}
439
440 // the packet this link refers to.
441 IncomingRTPPkt* packet;
442 // the synchronization source this packet comes from.
443 SyncSourceLink* sourceLink;
444 // global incoming packet queue links.
445 IncomingRTPPktLink* prev, * next;
446 // source specific incoming packet queue links.
447 IncomingRTPPktLink* srcPrev, * srcNext;
448 // time this packet was received at
449 struct timeval receptionTime;
450 // timestamp of the packet in host order and after
451 // substracting the initial timestamp for its source
452 // (it is an increment from the initial timestamp).
453 uint32 shiftedTimestamp;
454 };
455
456 /**
457 * @struct SyncSourceLink
458 *
459 * @short Synchronization Source internal handler within the
460 * incoming packets queue.
461 *
462 * Incoming packets queue objects hold a hash table and a
463 * linked list of synchronization sources. For each of these
464 * sources, there is also a linked list of incoming rtp
465 * packets, which are linked in an "all incoming packets" list
466 * as well. SyncSourceLink objects hold the necessary data to
467 * maintain these data estructures, as well as source specific
468 * information and statistics for RTCP,
469 *
470 * @author Federico Montesino Pouzols <fedemp@altern.org>
471 **/
472 struct SyncSourceLink
473 {
474 // 2^16
475 static const uint32 SEQNUMMOD;
476
477 SyncSourceLink(MembershipBookkeeping* m,
478 SyncSource* s,
479 IncomingRTPPktLink* fp = NULL,
480 IncomingRTPPktLink* lp = NULL,
481 SyncSourceLink* ps = NULL,
482 SyncSourceLink* ns = NULL,
483 SyncSourceLink* ncollis = NULL) :
484 membership(m), source(s), first(fp), last(lp),
485 prev(ps), next(ns), nextCollis(ncollis),
486 prevConflict(NULL)
487 { m->setLink(*s,this); // record that the source is associated
488 initStats(); // to this link.
489 }
490
491 /**
492 * Note it deletes the source.
493 **/
494 ~SyncSourceLink();
495
496 inline MembershipBookkeeping* getMembership()
497 { return membership; }
498
499 /**
500 * Get the synchronization source object this link
501 * objet holds information for.
502 **/
503 inline SyncSource* getSource() { return source; }
504
505 /**
506 * Get first RTP (data) packet in the queue of packets
507 * received from this socket.
508 **/
509 inline IncomingRTPPktLink* getFirst()
510 { return first; }
511
512 inline void setFirst(IncomingRTPPktLink* fp)
513 { first = fp; }
514
515 /**
516 * Get last RTP (data) packet in the queue of packets
517 * received from this socket.
518 **/
519 inline IncomingRTPPktLink* getLast()
520 { return last; }
521
522 inline void setLast(IncomingRTPPktLink* lp)
523 { last = lp; }
524
525 /**
526 * Get the link object for the previous RTP source.
527 **/
528 inline SyncSourceLink* getPrev()
529 { return prev; }
530
531 inline void setPrev(SyncSourceLink* ps)
532 { prev = ps; }
533
534 /**
535 * Get the link object for the next RTP source.
536 **/
537 inline SyncSourceLink* getNext()
538 { return next; }
539
540 inline void setNext(SyncSourceLink *ns)
541 { next = ns; }
542
543 /**
544 * Get the link object for the next RTP source in the
545 * hash table entry collision list. Note that
546 * collision does not refer to SSRC collision, but
547 * hash table collision.
548 **/
549 inline SyncSourceLink* getNextCollis()
550 { return nextCollis; }
551
552 inline void setNextCollis(SyncSourceLink* ns)
553 { nextCollis = ns; }
554
555 inline ConflictingTransportAddress* getPrevConflict() const
556 { return prevConflict; }
557
558 /**
559 * Get conflicting address.
560 **/
561 void setPrevConflict(InetAddress& addr, tpport_t dataPort,
562 tpport_t controlPort);
563
564 unsigned char* getSenderInfo()
565 { return senderInfo; }
566
567 void setSenderInfo(unsigned char* si);
568
569 unsigned char* getReceiverInfo()
570 { return receiverInfo; }
571
572 void setReceiverInfo(unsigned char* ri);
573
574 inline timeval getLastPacketTime() const
575 { return lastPacketTime; }
576
577 inline timeval getLastRTCPPacketTime() const
578 { return lastRTCPPacketTime; }
579
580 inline timeval getLastRTCPSRTime() const
581 { return lastRTCPSRTime; }
582
583 /**
584 * Get the total number of RTP packets received from this
585 * source.
586 */
587 inline uint32 getObservedPacketCount() const
588 { return obsPacketCount; }
589
590 inline void incObservedPacketCount()
591 { obsPacketCount++; }
592
593 /**
594 * Get the total number of payload octets received from this
595 * source.
596 **/
597 inline uint32 getObservedOctetCount() const
598 { return obsOctetCount; }
599
600 inline void incObservedOctetCount(uint32 n)
601 { obsOctetCount += n; }
602
603 /**
604 * Get the highest valid sequence number received.
605 **/
606 uint16
607 getMaxSeqNum() const
608 { return maxSeqNum; }
609
610 /**
611 * Set the highest valid sequence number recived.
612 * @param max Sequence number.
613 **/
614 void
615 setMaxSeqNum(uint16 max)
616 { maxSeqNum = max; }
617
618 inline uint32
619 getExtendedMaxSeqNum() const
620 { return extendedMaxSeqNum; }
621
622 inline void
623 setExtendedMaxSeqNum(uint32 seq)
624 { extendedMaxSeqNum = seq; }
625
626 inline uint32 getCumulativePacketLost() const
627 { return cumulativePacketLost; }
628
629 inline void setCumulativePacketLost(uint32 pl)
630 { cumulativePacketLost = pl; }
631
632 inline uint8 getFractionLost() const
633 { return fractionLost; }
634
635 inline void setFractionLost(uint8 fl)
636 { fractionLost = fl; }
637
638 inline uint32 getLastPacketTransitTime()
639 { return lastPacketTransitTime; }
640
641 inline void setLastPacketTransitTime(uint32 time)
642 { lastPacketTransitTime = time; }
643
644 inline float getJitter() const
645 { return jitter; }
646
647 inline void setJitter(float j)
648 { jitter = j; }
649
650 inline uint32 getInitialDataTimestamp() const
651 { return initialDataTimestamp; }
652
653 inline void setInitialDataTimestamp(uint32 ts)
654 { initialDataTimestamp = ts; }
655
656 inline timeval getInitialDataTime() const
657 { return initialDataTime; }
658
659 inline void setInitialDataTime(timeval it)
660 { initialDataTime = it; }
661
662 /**
663 * Mark this source as having sent a BYE control packet.
664 *
665 * @return whether some packet from this source had
666 * been received before (getHello() has been called at
667 * least once)
668 **/
669 bool getGoodbye()
670 {
671 if(!flag)
672 return false;
673 flag = false;
674 return true;
675 }
676
677 /**
678 * Mark this source as having sent some packet.
679 *
680 * @return whether no packet from this source had been
681 * received before
682 **/
683 bool getHello() {
684 if(flag)
685 return false;
686 flag = true;
687 return true;
688 }
689
690 inline uint32 getBadSeqNum() const
691 { return badSeqNum; }
692
693 inline void setBadSeqNum(uint32 seq)
694 { badSeqNum = seq; }
695
696 uint8 getProbation() const
697 { return probation; }
698
699 inline void setProbation(uint8 p)
700 { probation = p; }
701
702 inline void decProbation()
703 { --probation; }
704
705 bool isValid() const
706 { return 0 == probation; }
707
708 inline uint16 getBaseSeqNum() const
709 { return baseSeqNum; }
710
711 inline uint32 getSeqNumAccum() const
712 { return seqNumAccum; }
713
714 inline void incSeqNumAccum()
715 { seqNumAccum += SEQNUMMOD; }
716
717 /**
718 * Start a new sequence of received packets.
719 **/
720 inline void initSequence(uint16 seqnum)
721 { maxSeqNum = seqNumAccum = seqnum; }
722
723 /**
724 * Record the insertion of an RTP packet from this
725 * source into the scheduled reception queue. All
726 * received packets should be registered with
727 * recordReception(), but only those actually inserted
728 * into the queue should be registered via this
729 * method.
730 *
731 * @param pl Link structure for packet inserted into the queue.
732 **/
733 void recordInsertion(const IncomingRTPPktLink& pl);
734
735 void initStats();
736
737 /**
738 * Compute cumulative packet lost and fraction of
739 * packets lost during the last reporting interval.
740 **/
741 void computeStats();
742
743 MembershipBookkeeping* membership;
744 // The source this link object refers to.
745 SyncSource* source;
746 // first/last packets from this source in the queue.
747 IncomingRTPPktLink* first, * last;
748 // Links for synchronization sources located before
749 // and after this one in the list of sources.
750 SyncSourceLink* prev, * next;
751 // Prev and next inside the hash table collision list.
752 SyncSourceLink* nextCollis;
753 ConflictingTransportAddress* prevConflict;
754 unsigned char* senderInfo;
755 unsigned char* receiverInfo;
756 // time the last RTP packet from this source was
757 // received at.
758 timeval lastPacketTime;
759 // time the last RTCP packet was received.
760 timeval lastRTCPPacketTime;
761 // time the lasrt RTCP SR was received. Required for
762 // DLSR computation.
763 timeval lastRTCPSRTime;
764
765 // for outgoing RR reports.
766 // number of packets received from this source.
767 uint32 obsPacketCount;
768 // number of octets received from this source.
769 uint32 obsOctetCount;
770 // the higher sequence number seen from this source
771 uint16 maxSeqNum;
772 uint32 extendedMaxSeqNum;
773 uint32 cumulativePacketLost;
774 uint8 fractionLost;
775 // for interarrivel jitter computation
776 uint32 lastPacketTransitTime;
777 // interarrival jitter of packets from this source.
778 float jitter;
779 uint32 initialDataTimestamp;
780 timeval initialDataTime;
781
782 // this flag assures we only call one gotHello and one
783 // gotGoodbye for this src.
784 bool flag;
785
786 // for source validation:
787 uint32 badSeqNum;
788 uint8 probation; // packets in sequence before valid.
789 uint16 baseSeqNum;
790 uint32 expectedPrior;
791 uint32 receivedPrior;
792 uint32 seqNumAccum;
793 };
794
795 /**
796 * Returns whether there is already a synchronizacion source
797 * with "ssrc" SSRC identifier.
798 **/
799 bool
800 isRegistered(uint32 ssrc);
801
802 /**
803 * Get the description of a source by its <code>ssrc</code> identifier.
804 *
805 * @param ssrc SSRC identifier, in host order.
806 * @param created whether a new source has been created.
807 * @return Pointer to the SyncSource object identified by
808 * <code>ssrc</code>.
809 */
810 SyncSourceLink*
811 getSourceBySSRC(uint32 ssrc, bool& created);
812
813 /**
814 * Mark the source identified by <code>ssrc</code> as having
815 * sent a BYE packet. It is not deleted until a timeout
816 * expires, so that in case some packets from this source
817 * arrive a bit later the source is not inserted again in the
818 * table of known sources.
819 *
820 * @return true if the source had been previously identified.
821 * false if it was not in the table of known sources.
822 **/
823 bool
824 BYESource(uint32 ssrc);
825
826 /**
827 * Remove the description of the source identified by
828 * <code>ssrc</code>
829 *
830 * @return whether the source has been actually removed or it
831 * did not exist.
832 */
833 bool
834 removeSource(uint32 ssrc);
835
836 inline SyncSourceLink* getFirst()
837 { return first; }
838
839 inline SyncSourceLink* getLast()
840 { return last; }
841
842 inline uint32
843 getMembersCount()
844 { return Members::getMembersCount(); }
845
846 inline void
847 setMembersCount(uint32 n)
848 { Members::setMembersCount(n); }
849
850 inline uint32
851 getSendersCount()
852 { return Members::getSendersCount(); }
853
854 static const size_t defaultMembersHashSize;
855 static const uint32 SEQNUMMOD;
856
857private:
858 MembershipBookkeeping(const MembershipBookkeeping &o);
859
860 MembershipBookkeeping&
861 operator=(const MembershipBookkeeping &o);
862
863 /**
864 * Purge all RTPSource structures, the hash table and the list
865 * of sources.
866 **/
867 void
868 endMembers();
869
870 // Hash table with sources of RTP and RTCP packets
871 uint32 sourceBucketsNum;
872 SyncSourceLink** sourceLinks;
873 // List of sources, ordered from older to newer
874 SyncSourceLink* first, * last;
875};
876
877/**
878 * @class IncomingDataQueue
879 * @short Queue for incoming RTP data packets in an RTP session.
880 *
881 * @author Federico Montesino Pouzols <fedemp@altern.org>
882 **/
883class __EXPORT IncomingDataQueue: public IncomingDataQueueBase,
884 protected MembershipBookkeeping
885{
886public:
887 /**
888 * @class SyncSourcesIterator
889 * @short iterator through the list of synchronizations
890 * sources in this session
891 **/
892 class SyncSourcesIterator
893 {
894 public:
895 typedef std::forward_iterator_tag iterator_category;
896 typedef SyncSource value_type;
897 typedef ptrdiff_t difference_type;
898 typedef const SyncSource* pointer;
899 typedef const SyncSource& reference;
900
901 SyncSourcesIterator(SyncSourceLink* l = NULL) :
902 link(l)
903 { }
904
905 SyncSourcesIterator(const SyncSourcesIterator& si) :
906 link(si.link)
907 { }
908
909 reference operator*() const
910 { return *(link->getSource()); }
911
912 pointer operator->() const
913 { return link->getSource(); }
914
915 SyncSourcesIterator& operator++() {
916 link = link->getNext();
917 return *this;
918 }
919
920 SyncSourcesIterator operator++(int) {
921 SyncSourcesIterator result(*this);
922 ++(*this);
923 return result;
924 }
925
926 friend bool operator==(const SyncSourcesIterator& l,
927 const SyncSourcesIterator& r)
928 { return l.link == r.link; }
929
930 friend bool operator!=(const SyncSourcesIterator& l,
931 const SyncSourcesIterator& r)
932 { return l.link != r.link; }
933
934 private:
935 SyncSourceLink *link;
936 };
937
938 SyncSourcesIterator begin()
939 { return SyncSourcesIterator(MembershipBookkeeping::getFirst()); }
940
941 SyncSourcesIterator end()
942 { return SyncSourcesIterator(NULL); }
943
944 /**
945 * Retreive data from a specific timestamped packet if such a
946 * packet is currently available in the receive buffer.
947 *
948 * @param stamp Data unit timestamp.
949 * @param src Optional synchronization source selector.
950 * @return data retrieved from the reception buffer.
951 * @retval null pointer if no packet with such timestamp is available.
952 **/
953 const AppDataUnit*
954 getData(uint32 stamp, const SyncSource* src = NULL);
955
956
957 /**
958 * Determine if packets are waiting in the reception queue.
959 *
960 * @param src Optional synchronization source selector.
961 * @return True if packets are waiting.
962 */
963 bool
964 isWaiting(const SyncSource* src = NULL) const;
965
966 /**
967 * Get timestamp of first packet waiting in the queue.
968 *
969 * @param src optional source selector.
970 * @return timestamp of first arrival packet.
971 **/
972 uint32
973 getFirstTimestamp(const SyncSource* src = NULL) const;
974
975 /**
976 * When receiving packets from a new source, it may be
977 * convenient to reject a first few packets before we are
978 * really sure the source is valid. This method sets how many
979 * data packets must be received in sequence before the source
980 * is considered valid and the stack starts to accept its
981 * packets.
982 *
983 * @note the default (see defaultMinValidPacketSequence())
984 * value for this parameter is 0, so that no packets are
985 * rejected (data packets are accepted from the first one).
986 *
987 * @note this validation is performed after the generic header
988 * validation and the additional validation done in
989 * onRTPPacketRecv().
990 *
991 * @note if any valid RTCP packet is received from this
992 * source, it will be immediatly considered valid regardless
993 * of the number of sequential data packets received.
994 *
995 * @param packets number of sequential packet required
996 **/
997 void
998 setMinValidPacketSequence(uint8 packets)
999 { minValidPacketSequence = packets; }
1000
1001 uint8
1002 getDefaultMinValidPacketSequence() const
1003 { return defaultMinValidPacketSequence; }
1004
1005 /**
1006 * Get the minimun number of consecutive packets that must be
1007 * received from a source before accepting its data packets.
1008 **/
1009 uint8
1010 getMinValidPacketSequence() const
1011 { return minValidPacketSequence; }
1012
1013 void
1014 setMaxPacketMisorder(uint16 packets)
1015 { maxPacketMisorder = packets; }
1016
1017 uint16
1018 getDefaultMaxPacketMisorder() const
1019 { return defaultMaxPacketMisorder; }
1020
1021 uint16
1022 getMaxPacketMisorder() const
1023 { return maxPacketMisorder; }
1024
1025 /**
1026 *
1027 * It also prevents packets sent after a restart of the source
1028 * being immediately accepted.
1029 **/
1030 void
1031 setMaxPacketDropout(uint16 packets) // default: 3000.
1032 { maxPacketDropout = packets; }
1033
1034 uint16
1035 getDefaultMaxPacketDropout() const
1036 { return defaultMaxPacketDropout; }
1037
1038 uint16
1039 getMaxPacketDropout() const
1040 { return maxPacketDropout; }
1041
1042 // default value for constructors that allow to specify
1043 // members table s\ize
1044 inline static size_t
1045 getDefaultMembersSize()
1046 { return defaultMembersSize; }
1047
1048 /**
1049 * Set input queue CryptoContext.
1050 *
1051 * The endQueue method (provided by RTPQueue) deletes all
1052 * registered CryptoContexts.
1053 *
1054 * @param cc Pointer to initialized CryptoContext.
1055 */
1056 void
1057 setInQueueCryptoContext(CryptoContext* cc);
1058
1059 /**
1060 * Remove input queue CryptoContext.
1061 *
1062 * The endQueue method (provided by RTPQueue) also deletes all
1063 * registered CryptoContexts.
1064 *
1065 * @param cc
1066 * Pointer to initialized CryptoContext to remove. If pointer
1067 * if <code>NULL</code> then delete the whole queue
1068 */
1069 void
1070 removeInQueueCryptoContext(CryptoContext* cc);
1071
1072 /**
1073 * Get an input queue CryptoContext identified by SSRC
1074 *
1075 * @param ssrc Request CryptoContext for this incoming SSRC
1076 * @return Pointer to CryptoContext of the SSRC of NULL if no context
1077 * available for this SSRC.
1078 */
1079 CryptoContext*
1080 getInQueueCryptoContext(uint32 ssrc);
1081
1082protected:
1083 /**
1084 * @param size initial size of the membership table.
1085 **/
1086 IncomingDataQueue(uint32 size);
1087
1088 virtual ~IncomingDataQueue()
1089 { }
1090
1091 /**
1092 * Apply collision and loop detection and correction algorithm
1093 * when receiving RTP data packets. Follows section 8.2 in
1094 * draft-ietf-avt-rtp-new.
1095 *
1096 * @param sourceLink link to the source object.
1097 * @param is_new whether the source has been just recorded.
1098 * @param na data packet network address.
1099 * @param tp data packet source transport port.
1100 *
1101 * @return whether the packet must not be discarded.
1102 **/
1103 bool checkSSRCInIncomingRTPPkt(SyncSourceLink& sourceLink,
1104 bool is_new, InetAddress& na,
1105 tpport_t tp);
1106
1107 /**
1108 * Set the number of RTCP intervals that the stack will wait
1109 * to change the state of a source from stateActive to
1110 * stateInactive, or to delete the source after being in
1111 * stateInactive.
1112 *
1113 * Note that this value should be uniform accross all
1114 * participants and SHOULD be fixed for a particular profile.
1115 *
1116 * @param intervals number of RTCP report intervals
1117 *
1118 * @note If RTCP is not being used, the RTCP interval is
1119 * assumed to be the default: 5 seconds.
1120 * @note The default for this value is, as RECOMMENDED, 5.
1121 **/
1122 void setSourceExpirationPeriod(uint8 intervals)
1123 { sourceExpirationPeriod = intervals; }
1124
1125 /**
1126 * This function is used by the service thread to process
1127 * the next incoming packet and place it in the receive list.
1128 *
1129 * @return number of payload bytes received. <0 if error.
1130 */
1131 virtual size_t
1132 takeInDataPacket();
1133
1134 void renewLocalSSRC();
1135
1136 /**
1137 * This is used to fetch a packet in the receive queue and to
1138 * expire packets older than the current timestamp.
1139 *
1140 * @return packet buffer object for current timestamp if found.
1141 * @param timestamp timestamp requested.
1142 * @param src optional source selector
1143 * @note if found, the packet is removed from the reception queue
1144 **/
1145 IncomingDataQueue::IncomingRTPPktLink*
1146 getWaiting(uint32 timestamp, const SyncSource *src = NULL);
1147
1148 /**
1149 * Log reception of a new RTP packet from this source. Usually
1150 * updates data such as the packet counter, the expected
1151 * sequence number for the next packet and the time the last
1152 * packet was received at.
1153 *
1154 * @param srcLink Link structure for the synchronization
1155 * source of this packet.
1156 * @param pkt Packet just created and to be logged.
1157 * @param recvtime Reception time.
1158 *
1159 * @return whether, according to the source state and
1160 * statistics, the packet is considered valid and must be
1161 * inserted in the incoming packets queue.
1162 **/
1163 bool
1164 recordReception(SyncSourceLink& srcLink, const IncomingRTPPkt& pkt,
1165 const timeval recvtime);
1166
1167 /**
1168 * Log extraction of a packet from this source from the
1169 * scheduled reception queue.
1170 *
1171 * @param pkt Packet extracted from the queue.
1172 **/
1173 void
1174 recordExtraction(const IncomingRTPPkt& pkt);
1175
1176 void purgeIncomingQueue();
1177
1178 /**
1179 * Virtual called when a new synchronization source has joined
1180 * the session.
1181 *
1182 * @param - new synchronization source
1183 **/
1184 inline virtual void
1185 onNewSyncSource(const SyncSource&)
1186 { }
1187
1188protected:
1189 /**
1190 * A virtual function to support parsing of arriving packets
1191 * to determine if they should be kept in the queue and to
1192 * dispatch events.
1193 *
1194 * A generic header validity check (as specified in RFC 1889)
1195 * is performed on every incoming packet. If the generic check
1196 * completes succesfully, this method is called before the
1197 * packet is actually inserted into the reception queue.
1198 *
1199 * May be used to perform additional validity checks or to do
1200 * some application specific processing.
1201 *
1202 * @param - packet just received.
1203 * @return true if packet is kept in the incoming packets queue.
1204 **/
1205 inline virtual bool
1206 onRTPPacketRecv(IncomingRTPPkt&)
1207 { return true; }
1208
1209 /**
1210 * A hook to filter packets in the receive queue that are being
1211 * expired. This hook may be used to do some application
1212 * specific processing on expired packets before they are
1213 * deleted.
1214 *
1215 * @param - packet expired from the recv queue.
1216 **/
1217 inline virtual void onExpireRecv(IncomingRTPPkt&)
1218 { return; }
1219
1220 /**
1221 * A hook that gets called if the decoding of an incoming SRTP was erroneous
1222 *
1223 * @param pkt
1224 * The SRTP packet with error.
1225 * @param errorCode
1226 * The error code: -1 - SRTP authentication failure, -2 - replay
1227 * check failed
1228 * @return
1229 * True: put the packet in incoming queue for further processing
1230 * by the applications; false: dismiss packet. The default
1231 * implementation returns false.
1232 **/
1233 inline virtual bool
1234 onSRTPPacketError(IncomingRTPPkt& pkt, int32 errorCode)
1235 { return false; }
1236
1237 inline virtual bool
1238 end2EndDelayed(IncomingRTPPktLink&)
1239 { return false; }
1240
1241 /**
1242 * Insert a just received packet in the queue (both general
1243 * and source specific queues). If the packet was already in
1244 * the queue (same SSRC and sequence number), it is not
1245 * inserted but deleted.
1246 *
1247 * @param packetLink link to a packet just received and
1248 * generally validated and processed by onRTPPacketRecv.
1249 *
1250 * @return whether the packet was successfully inserted.
1251 * @retval false when the packet is duplicated (there is
1252 * already a packet from the same source with the same
1253 * timestamp).
1254 * @retval true when the packet is not duplicated.
1255 **/
1256 bool
1257 insertRecvPacket(IncomingRTPPktLink* packetLink);
1258
1259 /**
1260 * This function performs the physical I/O for reading a
1261 * packet from the source. It is a virtual that is
1262 * overriden in the derived class.
1263 *
1264 * @return number of bytes read.
1265 * @param buffer of read packet.
1266 * @param length of data to read.
1267 * @param host address of source.
1268 * @param port number of source.
1269 **/
1270 virtual size_t
1271 recvData(unsigned char* buffer, size_t length,
1272 InetHostAddress& host, tpport_t& port) = 0;
1273
1274 virtual size_t
1275 getNextDataPacketSize() const = 0;
1276
1277 mutable ThreadLock recvLock;
1278 // reception queue
1279 IncomingRTPPktLink* recvFirst, * recvLast;
1280 // values for packet validation.
1281 static const uint8 defaultMinValidPacketSequence;
1282 static const uint16 defaultMaxPacketMisorder;
1283 static const uint16 defaultMaxPacketDropout;
1284 uint8 minValidPacketSequence;
1285 uint16 maxPacketMisorder;
1286 uint16 maxPacketDropout;
1287 static const size_t defaultMembersSize;
1288 uint8 sourceExpirationPeriod;
1289 mutable Mutex cryptoMutex;
1290 std::list<CryptoContext *> cryptoContexts;
1291};
1292
1293/** @}*/ // iqueue
1294
1295#ifdef CCXX_NAMESPACES
1296}
1297#endif
1298
1299#endif //CCXX_RTP_IQUEUE_H_
1300
1301/** EMACS **
1302 * Local variables:
1303 * mode: c++
1304 * c-basic-offset: 8
1305 * End:
1306 */