blob: 4edb34d12260fd9574b21f2f8c4b38720d3bac01 [file] [log] [blame]
Emeric Vigier2f625822012-08-06 11:09:52 -04001// Copyright (C) 2001,2002,2004,2005 Federico Montesino Pouzols <fedemp@altern.org>
2//
3// This program is free software; you can redistribute it and/or modify
4// it under the terms of the GNU General Public License as published by
5// the Free Software Foundation; either version 2 of the License, or
6// (at your option) any later version.
7//
8// This program is distributed in the hope that it will be useful,
9// but WITHOUT ANY WARRANTY; without even the implied warranty of
10// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11// GNU General Public License for more details.
12//
13// You should have received a copy of the GNU General Public License
14// along with this program; if not, write to the Free Software
15// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
16//
17// As a special exception, you may use this file as part of a free software
18// library without restriction. Specifically, if other files instantiate
19// templates or use macros or inline functions from this file, or you compile
20// this file and link it with other files to produce an executable, this
21// file does not by itself cause the resulting executable to be covered by
22// the GNU General Public License. This exception does not however
23// invalidate any other reasons why the executable file might be covered by
24// the GNU General Public License.
25//
26// This exception applies only to the code released under the name GNU
27// ccRTP. If you copy code from other releases into a copy of GNU
28// ccRTP, as the General Public License permits, the exception does
29// not apply to the code that you add in this way. To avoid misleading
30// anyone as to the status of such modified files, you must delete
31// this exception notice from them.
32//
33// If you write modifications of your own for GNU ccRTP, it is your choice
34// whether to permit this exception to apply to your modifications.
35// If you do not wish that, delete this exception notice.
36//
37
38#include "private.h"
39#include <ccrtp/oqueue.h>
40
41#ifdef CCXX_NAMESPACES
42namespace ost {
43#endif
44
45const size_t OutgoingDataQueueBase::defaultMaxSendSegmentSize = 65536;
46
47OutgoingDataQueueBase::OutgoingDataQueueBase()
48{
49 // segment data in packets of no more than 65536 octets.
50 setMaxSendSegmentSize(getDefaultMaxSendSegmentSize());
51}
52
53DestinationListHandler::DestinationListHandler() :
54destList(), destinationLock()
55{}
56
57DestinationListHandler::~DestinationListHandler()
58{
59 TransportAddress* tmp = NULL;
60 writeLockDestinationList();
61 for (std::list<TransportAddress*>::iterator i = destList.begin();
62 destList.end() != i; i++) {
63 tmp = *i;
64#ifdef CCXX_EXCEPTIONS
65 try {
66#endif
67 delete tmp;
68#ifdef CCXX_EXCEPTIONS
69 } catch (...) {}
70#endif
71 }
72 unlockDestinationList();
73}
74
75bool
76DestinationListHandler::addDestinationToList(const InetAddress& ia,
77tpport_t data, tpport_t control)
78{
79 TransportAddress* addr = new TransportAddress(ia,data,control);
80 writeLockDestinationList();
81 destList.push_back(addr);
82 unlockDestinationList();
83 return true;
84}
85
86bool
87DestinationListHandler::removeDestinationFromList(const InetAddress& ia,
88tpport_t dataPort, tpport_t controlPort)
89{
90 bool result = false;
91 writeLockDestinationList();
92 TransportAddress* tmp;
93 for (std::list<TransportAddress*>::iterator i = destList.begin();
94 destList.end() != i && !result; ) {
95 tmp = *i;
96 if ( ia == tmp->getNetworkAddress() &&
97 dataPort == tmp->getDataTransportPort() &&
98 controlPort == tmp->getControlTransportPort() ) {
99 // matches. -> remove it.
100 result = true;
101 destList.erase(i);
102 delete tmp;
103 } else{
104 i++;
105 }
106 }
107 unlockDestinationList();
108 return result;
109}
110
111#ifdef CCXX_IPV6
112
113DestinationListHandlerIPV6::DestinationListHandlerIPV6() :
114destListIPV6(), destinationLock()
115{}
116
117DestinationListHandlerIPV6::~DestinationListHandlerIPV6()
118{
119 TransportAddressIPV6* tmp = NULL;
120 writeLockDestinationListIPV6();
121 for (std::list<TransportAddressIPV6*>::iterator i = destListIPV6.begin();
122 destListIPV6.end() != i; i++) {
123 tmp = *i;
124#ifdef CCXX_EXCEPTIONS
125 try {
126#endif
127 delete tmp;
128#ifdef CCXX_EXCEPTIONS
129 } catch (...) {}
130#endif
131 }
132 unlockDestinationListIPV6();
133}
134
135bool
136DestinationListHandlerIPV6::addDestinationToListIPV6(const IPV6Address& ia,
137tpport_t data, tpport_t control)
138{
139 TransportAddressIPV6* addr = new TransportAddressIPV6(ia,data,control);
140 writeLockDestinationListIPV6();
141 destListIPV6.push_back(addr);
142 unlockDestinationListIPV6();
143 return true;
144}
145
146bool
147DestinationListHandlerIPV6::removeDestinationFromListIPV6(const IPV6Address& ia,
148tpport_t dataPort, tpport_t controlPort)
149{
150 bool result = false;
151 writeLockDestinationListIPV6();
152 TransportAddressIPV6* tmp;
153 for (std::list<TransportAddressIPV6*>::iterator i = destListIPV6.begin();
154 destListIPV6.end() != i && !result; ) {
155 tmp = *i;
156 if ( ia == tmp->getNetworkAddress() &&
157 dataPort == tmp->getDataTransportPort() &&
158 controlPort == tmp->getControlTransportPort() ) {
159 // matches. -> remove it.
160 result = true;
161 destListIPV6.erase(i);
162 delete tmp;
163 } else {
164 i++;
165 }
166 }
167 unlockDestinationListIPV6();
168 return result;
169}
170
171
172#endif
173
174/// Schedule at 8 ms.
175const microtimeout_t OutgoingDataQueue::defaultSchedulingTimeout = 8000;
176/// Packets unsent will expire after 40 ms.
177const microtimeout_t OutgoingDataQueue::defaultExpireTimeout = 40000;
178
179OutgoingDataQueue::OutgoingDataQueue() :
180OutgoingDataQueueBase(),
181#ifdef CCXX_IPV6
182DestinationListHandlerIPV6(),
183#endif
184DestinationListHandler(), sendLock(), sendFirst(NULL), sendLast(NULL)
185{
186 setInitialTimestamp(random32());
187 setSchedulingTimeout(getDefaultSchedulingTimeout());
188 setExpireTimeout(getDefaultExpireTimeout());
189
190 sendInfo.packetCount = 0;
191 sendInfo.octetCount = 0;
192 sendInfo.sendSeq = random16(); // random initial sequence number
193 sendInfo.sendCC = 0; // initially, 0 CSRC identifiers follow the fixed heade
194 sendInfo.paddinglen = 0; // do not add padding bits.
195 sendInfo.marked = false;
196 sendInfo.complete = true;
197 // the local source is the first contributing source
198 sendInfo.sendSources[0] = getLocalSSRC();
199 // this will be an accumulator for the successive cycles of timestamp
200 sendInfo.overflowTime.tv_sec = getInitialTime().tv_sec;
201 sendInfo.overflowTime.tv_usec = getInitialTime().tv_usec;
202}
203
204void
205OutgoingDataQueue::purgeOutgoingQueue()
206{
207 OutgoingRTPPktLink* sendnext;
208 // flush the sending queue (delete outgoing packets
209 // unsent so far)
210 sendLock.writeLock();
211 while ( sendFirst ) {
212 sendnext = sendFirst->getNext();
213 delete sendFirst;
214 sendFirst = sendnext;
215 }
216 sendLast = NULL;
217 sendLock.unlock();
218}
219
220bool
221OutgoingDataQueue::addDestination(const InetHostAddress& ia,
222tpport_t dataPort, tpport_t controlPort)
223{
224 if ( 0 == controlPort )
225 controlPort = dataPort + 1;
226 bool result = addDestinationToList(ia,dataPort,controlPort);
227 if ( result && isSingleDestination() ) {
228 setDataPeer(ia,dataPort);
229 setControlPeer(ia,controlPort);
230 }
231 return result;
232}
233
234bool
235OutgoingDataQueue::addDestination(const InetMcastAddress& ia,
236tpport_t dataPort, tpport_t controlPort)
237{
238 if ( 0 == controlPort )
239 controlPort = dataPort + 1;
240 bool result = addDestinationToList(ia,dataPort,controlPort);
241 if ( result && isSingleDestination() ) {
242 setDataPeer(ia,dataPort);
243 setControlPeer(ia,controlPort);
244 }
245 return result;
246}
247
248bool
249OutgoingDataQueue::forgetDestination(const InetHostAddress& ia,
250tpport_t dataPort, tpport_t controlPort)
251{
252 if ( 0 == controlPort )
253 controlPort = dataPort + 1;
254 return DestinationListHandler::
255 removeDestinationFromList(ia,dataPort,controlPort);
256}
257
258bool
259OutgoingDataQueue::forgetDestination(const InetMcastAddress& ia,
260tpport_t dataPort, tpport_t controlPort)
261{
262 if ( 0 == controlPort )
263 controlPort = dataPort + 1;
264 return DestinationListHandler::
265 removeDestinationFromList(ia,dataPort,controlPort);
266}
267
268#ifdef CCXX_IPV6
269bool
270OutgoingDataQueue::addDestination(const IPV6Address& ia,
271tpport_t dataPort, tpport_t controlPort)
272{
273 if ( 0 == controlPort )
274 controlPort = dataPort + 1;
275 bool result = addDestinationToListIPV6(ia,dataPort,controlPort);
276 if ( result && isSingleDestinationIPV6() ) {
277 setDataPeerIPV6(ia,dataPort);
278 setControlPeerIPV6(ia,controlPort);
279 }
280 return result;
281}
282
283bool
284OutgoingDataQueue::forgetDestination(const IPV6Address& ia,
285tpport_t dataPort, tpport_t controlPort)
286{
287 if ( 0 == controlPort )
288 controlPort = dataPort + 1;
289 return DestinationListHandlerIPV6::
290 removeDestinationFromListIPV6(ia,dataPort,controlPort);
291}
292
293#endif
294
295bool
296OutgoingDataQueue::isSending(void) const
297{
298 if(sendFirst)
299 return true;
300
301 return false;
302}
303
304microtimeout_t
305OutgoingDataQueue::getSchedulingTimeout(void)
306{
307 struct timeval send, now;
308 uint32 rate;
309 uint32 rem;
310
311 for(;;) {
312 // if there is no packet to send, use the default scheduling
313 // timeout
314 if( !sendFirst )
315 return schedulingTimeout;
316
317 uint32 stamp = sendFirst->getPacket()->getTimestamp();
318 stamp -= getInitialTimestamp();
319 rate = getCurrentRTPClockRate();
320
321 // now we want to get in <code>send</code> _when_ the
322 // packet is scheduled to be sent.
323
324 // translate timestamp to timeval
325 send.tv_sec = stamp / rate;
326 rem = stamp % rate;
327 send.tv_usec = (1000ul*rem) / (rate/1000ul); // 10^6 * rem/rate
328
329 // add timevals. Overflow holds the inital time
330 // plus the time accumulated through successive
331 // overflows of timestamp. See below.
332 timeradd(&send,&(sendInfo.overflowTime),&send);
333 gettimeofday(&now, NULL);
334
335 // Problem: when timestamp overflows, time goes back.
336 // We MUST ensure that _send_ is not too lower than
337 // _now_, otherwise, we MUST keep how many time was
338 // lost because of overflow. We assume that _send_
339 // 5000 seconds lower than now suggests timestamp
340 // overflow. (Remember than the 32 bits of the
341 // timestamp field are 47722 seconds under a sampling
342 // clock of 90000 hz.) This is not a perfect
343 // solution. Disorderedly timestamped packets coming
344 // after an overflowed one will be wrongly
345 // corrected. Nevertheless, this may only corrupt a
346 // handful of those packets every more than 13 hours
347 // (if timestamp started from 0).
348 if ( now.tv_sec - send.tv_sec > 5000) {
349 timeval overflow;
350 overflow.tv_sec =(~static_cast<uint32>(0)) / rate;
351 overflow.tv_usec = (~static_cast<uint32>(0)) % rate *
352 1000000ul / rate;
353 do {
354 timeradd(&send,&overflow,&send);
355 timeradd(&(sendInfo.overflowTime),&overflow,
356 &(sendInfo.overflowTime));
357 } while ( now.tv_sec - send.tv_sec > 5000 );
358 }
359
360 // This tries to solve the aforementioned problem
361 // about disordered packets coming after an overflowed
362 // one. Now we apply the reverse idea.
363 if ( send.tv_sec - now.tv_sec > 20000 ) {
364 timeval overflow;
365 overflow.tv_sec = (~static_cast<uint32>(0)) / rate;
366 overflow.tv_usec = (~static_cast<uint32>(0)) % rate *
367 1000000ul / rate;
368 timersub(&send,&overflow,&send);
369 }
370
371 // A: This sets a maximum timeout of 1 hour.
372 if ( send.tv_sec - now.tv_sec > 3600 ) {
373 return 3600000000ul;
374 }
375 int32 diff =
376 ((send.tv_sec - now.tv_sec) * 1000000ul) +
377 send.tv_usec - now.tv_usec;
378 // B: wait <code>diff</code> usecs more before sending
379 if ( diff >= 0 ) {
380 return static_cast<microtimeout_t>(diff);
381 }
382
383 // C: the packet must be sent right now
384 if ( (diff < 0) &&
385 static_cast<microtimeout_t>(-diff) <= getExpireTimeout() ) {
386 return 0;
387 }
388
389 // D: the packet has expired -> delete it.
390 sendLock.writeLock();
391 OutgoingRTPPktLink* packet = sendFirst;
392 sendFirst = sendFirst->getNext();
393 onExpireSend(*(packet->getPacket())); // new virtual to notify
394 delete packet;
395 if ( sendFirst )
396 sendFirst->setPrev(NULL);
397 else
398 sendLast = NULL;
399 sendLock.unlock();
400 }
401 I( false );
402 return 0;
403}
404
405void
406OutgoingDataQueue::putData(uint32 stamp, const unsigned char *data, size_t datalen)
407{
408 if ( !data || !datalen )
409 return;
410
411 size_t step = 0, offset = 0;
412 while ( offset < datalen ) {
413 // remainder and step take care of segmentation
414 // according to getMaxSendSegmentSize()
415 size_t remainder = datalen - offset;
416 step = ( remainder > getMaxSendSegmentSize() ) ?
417 getMaxSendSegmentSize() : remainder;
418
419 CryptoContext* pcc = getOutQueueCryptoContext(getLocalSSRC());
420 if (pcc == NULL) {
421 pcc = getOutQueueCryptoContext(0);
422 if (pcc != NULL) {
423 pcc = pcc->newCryptoContextForSSRC(getLocalSSRC(), 0, 0L);
424 if (pcc != NULL) {
425 pcc->deriveSrtpKeys(0);
426 setOutQueueCryptoContext(pcc);
427 }
428 }
429 }
430 OutgoingRTPPkt* packet;
431 if ( sendInfo.sendCC )
432 packet = new OutgoingRTPPkt(sendInfo.sendSources,15,data + offset,step, sendInfo.paddinglen, pcc);
433 else
434 packet = new OutgoingRTPPkt(data + offset,step,sendInfo.paddinglen, pcc);
435
436 packet->setPayloadType(getCurrentPayloadType());
437 packet->setSeqNum(sendInfo.sendSeq++);
438 packet->setTimestamp(stamp + getInitialTimestamp());
439
440 packet->setSSRCNetwork(getLocalSSRCNetwork());
441 if ( (0 == offset) && getMark() ) {
442 packet->setMarker(true);
443 setMark(false);
444 } else {
445 packet->setMarker(false);
446 }
447 if (pcc != NULL) {
448 packet->protect(getLocalSSRC(), pcc);
449 }
450 // insert the packet into the "tail" of the sending queue
451 sendLock.writeLock();
452 OutgoingRTPPktLink *link =
453 new OutgoingRTPPktLink(packet,sendLast,NULL);
454 if (sendLast)
455 sendLast->setNext(link);
456 else
457 sendFirst = link;
458 sendLast = link;
459 sendLock.unlock();
460
461 offset += step;
462 }
463}
464
465void
466OutgoingDataQueue::sendImmediate(uint32 stamp, const unsigned char *data, size_t datalen)
467{
468 if ( !data || !datalen )
469 return;
470
471 size_t step = 0, offset = 0;
472 while ( offset < datalen ) {
473 // remainder and step take care of segmentation
474 // according to getMaxSendSegmentSize()
475 size_t remainder = datalen - offset;
476 step = ( remainder > getMaxSendSegmentSize() ) ?
477 getMaxSendSegmentSize() : remainder;
478
479 CryptoContext* pcc = getOutQueueCryptoContext(getLocalSSRC());
480
481 OutgoingRTPPkt* packet;
482 if ( sendInfo.sendCC )
483 packet = new OutgoingRTPPkt(sendInfo.sendSources,15,data + offset,step,sendInfo.paddinglen, pcc);
484 else
485 packet = new OutgoingRTPPkt(data + offset,step,sendInfo.paddinglen, pcc);
486
487 packet->setPayloadType(getCurrentPayloadType());
488 packet->setSeqNum(sendInfo.sendSeq++);
489 packet->setTimestamp(stamp + getInitialTimestamp());
490 packet->setSSRCNetwork(getLocalSSRCNetwork());
491
492 if ( (0 == offset) && getMark() ) {
493 packet->setMarker(true);
494 setMark(false);
495 } else {
496 packet->setMarker(false);
497 }
498 if (pcc != NULL) {
499 packet->protect(getLocalSSRC(), pcc);
500 }
501 dispatchImmediate(packet);
502 delete packet;
503 offset += step;
504 }
505}
506
507void OutgoingDataQueue::dispatchImmediate(OutgoingRTPPkt *packet)
508{
509 lockDestinationList();
510 if ( isSingleDestination() ) {
511 TransportAddress* tmp = destList.front();
512 // if going from multi destinations to single destinations.
513 setDataPeer(tmp->getNetworkAddress(), tmp->getDataTransportPort());
514
515 sendData(packet->getRawPacket(), packet->getRawPacketSizeSrtp());
516 } else {
517 // when no destination has been added, NULL == dest.
518 for (std::list<TransportAddress*>::iterator i = destList.begin(); destList.end() != i; i++) {
519 TransportAddress* dest = *i;
520 setDataPeer(dest->getNetworkAddress(), dest->getDataTransportPort());
521 sendData(packet->getRawPacket(), packet->getRawPacketSizeSrtp());
522 }
523 }
524 unlockDestinationList();
525
526#ifdef CCXX_IPV6
527 lockDestinationListIPV6();
528 if ( isSingleDestinationIPV6() ) {
529 TransportAddressIPV6* tmp6 = destListIPV6.front();
530 // if going from multi destinations to single destinations.
531 setDataPeerIPV6(tmp6->getNetworkAddress(),
532 tmp6->getDataTransportPort());
533
534 sendDataIPV6(packet->getRawPacket(),
535 packet->getRawPacketSizeSrtp());
536 } else {
537 // when no destination has been added, NULL == dest.
538 for (std::list<TransportAddressIPV6*>::iterator i6 = destListIPV6.begin(); destListIPV6.end() != i6; i6++) {
539 TransportAddressIPV6* dest6 = *i6;
540 setDataPeerIPV6(dest6->getNetworkAddress(),
541 dest6->getDataTransportPort());
542 sendDataIPV6(packet->getRawPacket(),
543 packet->getRawPacketSizeSrtp());
544 }
545 }
546 unlockDestinationListIPV6();
547#endif
548}
549
550size_t
551OutgoingDataQueue::dispatchDataPacket(void)
552{
553 sendLock.writeLock();
554 OutgoingRTPPktLink* packetLink = sendFirst;
555
556 if ( !packetLink ){
557 sendLock.unlock();
558 return 0;
559 }
560
561 OutgoingRTPPkt* packet = packetLink->getPacket();
562 uint32 rtn = packet->getPayloadSize();
563 dispatchImmediate(packet);
564
565 // unlink the sent packet from the queue and destroy it. Also
566 // record the sending.
567 sendFirst = sendFirst->getNext();
568 if ( sendFirst ) {
569 sendFirst->setPrev(NULL);
570 } else {
571 sendLast = NULL;
572 }
573 // for general accounting and RTCP SR statistics
574 sendInfo.packetCount++;
575 sendInfo.octetCount += packet->getPayloadSize();
576 delete packetLink;
577
578 sendLock.unlock();
579 return rtn;
580}
581
582size_t
583OutgoingDataQueue::setPartial(uint32 stamp, unsigned char *data,
584size_t offset, size_t max)
585{
586 sendLock.writeLock();
587 OutgoingRTPPktLink* packetLink = sendFirst;
588 while ( packetLink )
589 {
590 uint32 pstamp = packetLink->getPacket()->getTimestamp();
591 if ( pstamp > stamp )
592 packetLink = NULL;
593 if ( pstamp >= stamp )
594 break;
595
596 packetLink = packetLink->getNext();
597 }
598 if ( !packetLink ) {
599 sendLock.unlock();
600 return 0;
601 }
602
603 OutgoingRTPPkt* packet = packetLink->getPacket();
604 if ( offset >= packet->getPayloadSize() )
605 return 0;
606
607 if ( max > packet->getPayloadSize() - offset )
608 max = packet->getPayloadSize() - offset;
609
610 memcpy((unsigned char*)(packet->getPayload()) + offset,
611 data, max);
612 sendLock.unlock();
613 return max;
614}
615
616void
617OutgoingDataQueue::setOutQueueCryptoContext(CryptoContext* cc)
618{
619 std::list<CryptoContext *>::iterator i;
620
621 MutexLock lock(cryptoMutex);
622 // check if a CryptoContext for a SSRC already exists. If yes
623 // remove it from list before inserting the new one.
624 for( i = cryptoContexts.begin(); i!= cryptoContexts.end(); i++ ) {
625 if( (*i)->getSsrc() == cc->getSsrc() ) {
626 CryptoContext* tmp = *i;
627 cryptoContexts.erase(i);
628 delete tmp;
629 break;
630 }
631 }
632 cryptoContexts.push_back(cc);
633}
634
635void
636OutgoingDataQueue::removeOutQueueCryptoContext(CryptoContext* cc)
637{
638 std::list<CryptoContext *>::iterator i;
639
640 MutexLock lock(cryptoMutex);
641 if (cc == NULL) { // Remove any incoming crypto contexts
642 for (i = cryptoContexts.begin(); i != cryptoContexts.end(); ) {
643 CryptoContext* tmp = *i;
644 i = cryptoContexts.erase(i);
645 delete tmp;
646 }
647 }
648 else {
649 for( i = cryptoContexts.begin(); i != cryptoContexts.end(); i++ ) {
650 if( (*i)->getSsrc() == cc->getSsrc() ) {
651 CryptoContext* tmp = *i;
652 cryptoContexts.erase(i);
653 delete tmp;
654 return;
655 }
656 }
657 }
658}
659
660CryptoContext*
661OutgoingDataQueue::getOutQueueCryptoContext(uint32 ssrc)
662{
663 std::list<CryptoContext *>::iterator i;
664
665 MutexLock lock(cryptoMutex);
666 for( i = cryptoContexts.begin(); i != cryptoContexts.end(); i++ ){
667 if( (*i)->getSsrc() == ssrc) {
668 return (*i);
669 }
670 }
671 return NULL;
672}
673
674#ifdef CCXX_NAMESPACES
675}
676#endif
677
678/** EMACS **
679 * Local variables:
680 * mode: c++
681 * c-basic-offset: 4
682 * End:
683 */