blob: 60feaedf4e20b5b761338784e3ed27642f849ed0 [file] [log] [blame]
// Copyright (C) 2001,2002,2004,2005 Federico Montesino Pouzols <fedemp@altern.org>
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
//
// As a special exception, you may use this file as part of a free software
// library without restriction. Specifically, if other files instantiate
// templates or use macros or inline functions from this file, or you compile
// this file and link it with other files to produce an executable, this
// file does not by itself cause the resulting executable to be covered by
// the GNU General Public License. This exception does not however
// invalidate any other reasons why the executable file might be covered by
// the GNU General Public License.
//
// This exception applies only to the code released under the name GNU
// ccRTP. If you copy code from other releases into a copy of GNU
// ccRTP, as the General Public License permits, the exception does
// not apply to the code that you add in this way. To avoid misleading
// anyone as to the status of such modified files, you must delete
// this exception notice from them.
//
// If you write modifications of your own for GNU ccRTP, it is your choice
// whether to permit this exception to apply to your modifications.
// If you do not wish that, delete this exception notice.
//
37
38#include "private.h"
39#include <ccrtp/oqueue.h>
40
Alexandre Lisionddd731e2014-01-31 11:50:08 -050041NAMESPACE_COMMONCPP
Emeric Vigier2f625822012-08-06 11:09:52 -040042
43const size_t OutgoingDataQueueBase::defaultMaxSendSegmentSize = 65536;
44
45OutgoingDataQueueBase::OutgoingDataQueueBase()
46{
47 // segment data in packets of no more than 65536 octets.
48 setMaxSendSegmentSize(getDefaultMaxSendSegmentSize());
49}
50
51DestinationListHandler::DestinationListHandler() :
52destList(), destinationLock()
53{}
54
55DestinationListHandler::~DestinationListHandler()
56{
57 TransportAddress* tmp = NULL;
58 writeLockDestinationList();
59 for (std::list<TransportAddress*>::iterator i = destList.begin();
60 destList.end() != i; i++) {
61 tmp = *i;
62#ifdef CCXX_EXCEPTIONS
63 try {
64#endif
65 delete tmp;
66#ifdef CCXX_EXCEPTIONS
67 } catch (...) {}
68#endif
69 }
70 unlockDestinationList();
71}
72
73bool
74DestinationListHandler::addDestinationToList(const InetAddress& ia,
75tpport_t data, tpport_t control)
76{
77 TransportAddress* addr = new TransportAddress(ia,data,control);
78 writeLockDestinationList();
79 destList.push_back(addr);
80 unlockDestinationList();
81 return true;
82}
83
84bool
85DestinationListHandler::removeDestinationFromList(const InetAddress& ia,
86tpport_t dataPort, tpport_t controlPort)
87{
88 bool result = false;
89 writeLockDestinationList();
90 TransportAddress* tmp;
91 for (std::list<TransportAddress*>::iterator i = destList.begin();
92 destList.end() != i && !result; ) {
93 tmp = *i;
94 if ( ia == tmp->getNetworkAddress() &&
95 dataPort == tmp->getDataTransportPort() &&
96 controlPort == tmp->getControlTransportPort() ) {
97 // matches. -> remove it.
98 result = true;
99 destList.erase(i);
100 delete tmp;
101 } else{
102 i++;
103 }
104 }
105 unlockDestinationList();
106 return result;
107}
108
109#ifdef CCXX_IPV6
110
111DestinationListHandlerIPV6::DestinationListHandlerIPV6() :
112destListIPV6(), destinationLock()
113{}
114
115DestinationListHandlerIPV6::~DestinationListHandlerIPV6()
116{
117 TransportAddressIPV6* tmp = NULL;
118 writeLockDestinationListIPV6();
119 for (std::list<TransportAddressIPV6*>::iterator i = destListIPV6.begin();
120 destListIPV6.end() != i; i++) {
121 tmp = *i;
122#ifdef CCXX_EXCEPTIONS
123 try {
124#endif
125 delete tmp;
126#ifdef CCXX_EXCEPTIONS
127 } catch (...) {}
128#endif
129 }
130 unlockDestinationListIPV6();
131}
132
133bool
134DestinationListHandlerIPV6::addDestinationToListIPV6(const IPV6Address& ia,
135tpport_t data, tpport_t control)
136{
137 TransportAddressIPV6* addr = new TransportAddressIPV6(ia,data,control);
138 writeLockDestinationListIPV6();
139 destListIPV6.push_back(addr);
140 unlockDestinationListIPV6();
141 return true;
142}
143
144bool
145DestinationListHandlerIPV6::removeDestinationFromListIPV6(const IPV6Address& ia,
146tpport_t dataPort, tpport_t controlPort)
147{
148 bool result = false;
149 writeLockDestinationListIPV6();
150 TransportAddressIPV6* tmp;
151 for (std::list<TransportAddressIPV6*>::iterator i = destListIPV6.begin();
152 destListIPV6.end() != i && !result; ) {
153 tmp = *i;
154 if ( ia == tmp->getNetworkAddress() &&
155 dataPort == tmp->getDataTransportPort() &&
156 controlPort == tmp->getControlTransportPort() ) {
157 // matches. -> remove it.
158 result = true;
159 destListIPV6.erase(i);
160 delete tmp;
161 } else {
162 i++;
163 }
164 }
165 unlockDestinationListIPV6();
166 return result;
167}
168
169
170#endif
171
172/// Schedule at 8 ms.
173const microtimeout_t OutgoingDataQueue::defaultSchedulingTimeout = 8000;
174/// Packets unsent will expire after 40 ms.
175const microtimeout_t OutgoingDataQueue::defaultExpireTimeout = 40000;
176
177OutgoingDataQueue::OutgoingDataQueue() :
178OutgoingDataQueueBase(),
179#ifdef CCXX_IPV6
180DestinationListHandlerIPV6(),
181#endif
182DestinationListHandler(), sendLock(), sendFirst(NULL), sendLast(NULL)
183{
184 setInitialTimestamp(random32());
185 setSchedulingTimeout(getDefaultSchedulingTimeout());
186 setExpireTimeout(getDefaultExpireTimeout());
187
188 sendInfo.packetCount = 0;
189 sendInfo.octetCount = 0;
190 sendInfo.sendSeq = random16(); // random initial sequence number
191 sendInfo.sendCC = 0; // initially, 0 CSRC identifiers follow the fixed heade
192 sendInfo.paddinglen = 0; // do not add padding bits.
193 sendInfo.marked = false;
194 sendInfo.complete = true;
195 // the local source is the first contributing source
196 sendInfo.sendSources[0] = getLocalSSRC();
197 // this will be an accumulator for the successive cycles of timestamp
198 sendInfo.overflowTime.tv_sec = getInitialTime().tv_sec;
199 sendInfo.overflowTime.tv_usec = getInitialTime().tv_usec;
200}
201
202void
203OutgoingDataQueue::purgeOutgoingQueue()
204{
205 OutgoingRTPPktLink* sendnext;
206 // flush the sending queue (delete outgoing packets
207 // unsent so far)
208 sendLock.writeLock();
209 while ( sendFirst ) {
210 sendnext = sendFirst->getNext();
211 delete sendFirst;
212 sendFirst = sendnext;
213 }
214 sendLast = NULL;
215 sendLock.unlock();
216}
217
218bool
219OutgoingDataQueue::addDestination(const InetHostAddress& ia,
220tpport_t dataPort, tpport_t controlPort)
221{
222 if ( 0 == controlPort )
223 controlPort = dataPort + 1;
224 bool result = addDestinationToList(ia,dataPort,controlPort);
225 if ( result && isSingleDestination() ) {
226 setDataPeer(ia,dataPort);
227 setControlPeer(ia,controlPort);
228 }
229 return result;
230}
231
232bool
233OutgoingDataQueue::addDestination(const InetMcastAddress& ia,
234tpport_t dataPort, tpport_t controlPort)
235{
236 if ( 0 == controlPort )
237 controlPort = dataPort + 1;
238 bool result = addDestinationToList(ia,dataPort,controlPort);
239 if ( result && isSingleDestination() ) {
240 setDataPeer(ia,dataPort);
241 setControlPeer(ia,controlPort);
242 }
243 return result;
244}
245
246bool
247OutgoingDataQueue::forgetDestination(const InetHostAddress& ia,
248tpport_t dataPort, tpport_t controlPort)
249{
250 if ( 0 == controlPort )
251 controlPort = dataPort + 1;
252 return DestinationListHandler::
253 removeDestinationFromList(ia,dataPort,controlPort);
254}
255
256bool
257OutgoingDataQueue::forgetDestination(const InetMcastAddress& ia,
258tpport_t dataPort, tpport_t controlPort)
259{
260 if ( 0 == controlPort )
261 controlPort = dataPort + 1;
262 return DestinationListHandler::
263 removeDestinationFromList(ia,dataPort,controlPort);
264}
265
266#ifdef CCXX_IPV6
267bool
268OutgoingDataQueue::addDestination(const IPV6Address& ia,
269tpport_t dataPort, tpport_t controlPort)
270{
271 if ( 0 == controlPort )
272 controlPort = dataPort + 1;
273 bool result = addDestinationToListIPV6(ia,dataPort,controlPort);
274 if ( result && isSingleDestinationIPV6() ) {
275 setDataPeerIPV6(ia,dataPort);
276 setControlPeerIPV6(ia,controlPort);
277 }
278 return result;
279}
280
281bool
282OutgoingDataQueue::forgetDestination(const IPV6Address& ia,
283tpport_t dataPort, tpport_t controlPort)
284{
285 if ( 0 == controlPort )
286 controlPort = dataPort + 1;
287 return DestinationListHandlerIPV6::
288 removeDestinationFromListIPV6(ia,dataPort,controlPort);
289}
290
291#endif
292
293bool
294OutgoingDataQueue::isSending(void) const
295{
296 if(sendFirst)
297 return true;
298
299 return false;
300}
301
302microtimeout_t
303OutgoingDataQueue::getSchedulingTimeout(void)
304{
305 struct timeval send, now;
306 uint32 rate;
307 uint32 rem;
308
309 for(;;) {
310 // if there is no packet to send, use the default scheduling
311 // timeout
312 if( !sendFirst )
313 return schedulingTimeout;
314
315 uint32 stamp = sendFirst->getPacket()->getTimestamp();
316 stamp -= getInitialTimestamp();
317 rate = getCurrentRTPClockRate();
318
319 // now we want to get in <code>send</code> _when_ the
320 // packet is scheduled to be sent.
321
322 // translate timestamp to timeval
323 send.tv_sec = stamp / rate;
324 rem = stamp % rate;
325 send.tv_usec = (1000ul*rem) / (rate/1000ul); // 10^6 * rem/rate
326
327 // add timevals. Overflow holds the inital time
328 // plus the time accumulated through successive
329 // overflows of timestamp. See below.
330 timeradd(&send,&(sendInfo.overflowTime),&send);
Alexandre Lisionddd731e2014-01-31 11:50:08 -0500331 SysTime::gettimeofday(&now, NULL);
Emeric Vigier2f625822012-08-06 11:09:52 -0400332
333 // Problem: when timestamp overflows, time goes back.
334 // We MUST ensure that _send_ is not too lower than
335 // _now_, otherwise, we MUST keep how many time was
336 // lost because of overflow. We assume that _send_
337 // 5000 seconds lower than now suggests timestamp
338 // overflow. (Remember than the 32 bits of the
339 // timestamp field are 47722 seconds under a sampling
340 // clock of 90000 hz.) This is not a perfect
341 // solution. Disorderedly timestamped packets coming
342 // after an overflowed one will be wrongly
343 // corrected. Nevertheless, this may only corrupt a
344 // handful of those packets every more than 13 hours
345 // (if timestamp started from 0).
346 if ( now.tv_sec - send.tv_sec > 5000) {
347 timeval overflow;
348 overflow.tv_sec =(~static_cast<uint32>(0)) / rate;
349 overflow.tv_usec = (~static_cast<uint32>(0)) % rate *
350 1000000ul / rate;
351 do {
352 timeradd(&send,&overflow,&send);
353 timeradd(&(sendInfo.overflowTime),&overflow,
354 &(sendInfo.overflowTime));
355 } while ( now.tv_sec - send.tv_sec > 5000 );
356 }
357
358 // This tries to solve the aforementioned problem
359 // about disordered packets coming after an overflowed
360 // one. Now we apply the reverse idea.
361 if ( send.tv_sec - now.tv_sec > 20000 ) {
362 timeval overflow;
363 overflow.tv_sec = (~static_cast<uint32>(0)) / rate;
364 overflow.tv_usec = (~static_cast<uint32>(0)) % rate *
365 1000000ul / rate;
366 timersub(&send,&overflow,&send);
367 }
368
369 // A: This sets a maximum timeout of 1 hour.
370 if ( send.tv_sec - now.tv_sec > 3600 ) {
371 return 3600000000ul;
372 }
373 int32 diff =
374 ((send.tv_sec - now.tv_sec) * 1000000ul) +
375 send.tv_usec - now.tv_usec;
376 // B: wait <code>diff</code> usecs more before sending
377 if ( diff >= 0 ) {
378 return static_cast<microtimeout_t>(diff);
379 }
380
381 // C: the packet must be sent right now
382 if ( (diff < 0) &&
383 static_cast<microtimeout_t>(-diff) <= getExpireTimeout() ) {
384 return 0;
385 }
386
387 // D: the packet has expired -> delete it.
388 sendLock.writeLock();
389 OutgoingRTPPktLink* packet = sendFirst;
390 sendFirst = sendFirst->getNext();
391 onExpireSend(*(packet->getPacket())); // new virtual to notify
392 delete packet;
393 if ( sendFirst )
394 sendFirst->setPrev(NULL);
395 else
396 sendLast = NULL;
397 sendLock.unlock();
398 }
399 I( false );
400 return 0;
401}
402
403void
404OutgoingDataQueue::putData(uint32 stamp, const unsigned char *data, size_t datalen)
405{
406 if ( !data || !datalen )
407 return;
408
409 size_t step = 0, offset = 0;
410 while ( offset < datalen ) {
411 // remainder and step take care of segmentation
412 // according to getMaxSendSegmentSize()
413 size_t remainder = datalen - offset;
414 step = ( remainder > getMaxSendSegmentSize() ) ?
415 getMaxSendSegmentSize() : remainder;
416
417 CryptoContext* pcc = getOutQueueCryptoContext(getLocalSSRC());
418 if (pcc == NULL) {
419 pcc = getOutQueueCryptoContext(0);
420 if (pcc != NULL) {
421 pcc = pcc->newCryptoContextForSSRC(getLocalSSRC(), 0, 0L);
422 if (pcc != NULL) {
423 pcc->deriveSrtpKeys(0);
424 setOutQueueCryptoContext(pcc);
425 }
426 }
427 }
428 OutgoingRTPPkt* packet;
429 if ( sendInfo.sendCC )
430 packet = new OutgoingRTPPkt(sendInfo.sendSources,15,data + offset,step, sendInfo.paddinglen, pcc);
431 else
432 packet = new OutgoingRTPPkt(data + offset,step,sendInfo.paddinglen, pcc);
433
434 packet->setPayloadType(getCurrentPayloadType());
435 packet->setSeqNum(sendInfo.sendSeq++);
436 packet->setTimestamp(stamp + getInitialTimestamp());
437
438 packet->setSSRCNetwork(getLocalSSRCNetwork());
439 if ( (0 == offset) && getMark() ) {
440 packet->setMarker(true);
441 setMark(false);
442 } else {
443 packet->setMarker(false);
444 }
445 if (pcc != NULL) {
446 packet->protect(getLocalSSRC(), pcc);
447 }
448 // insert the packet into the "tail" of the sending queue
449 sendLock.writeLock();
450 OutgoingRTPPktLink *link =
451 new OutgoingRTPPktLink(packet,sendLast,NULL);
452 if (sendLast)
453 sendLast->setNext(link);
454 else
455 sendFirst = link;
456 sendLast = link;
457 sendLock.unlock();
458
459 offset += step;
460 }
461}
462
463void
464OutgoingDataQueue::sendImmediate(uint32 stamp, const unsigned char *data, size_t datalen)
465{
466 if ( !data || !datalen )
467 return;
468
469 size_t step = 0, offset = 0;
470 while ( offset < datalen ) {
471 // remainder and step take care of segmentation
472 // according to getMaxSendSegmentSize()
473 size_t remainder = datalen - offset;
474 step = ( remainder > getMaxSendSegmentSize() ) ?
475 getMaxSendSegmentSize() : remainder;
476
477 CryptoContext* pcc = getOutQueueCryptoContext(getLocalSSRC());
478
479 OutgoingRTPPkt* packet;
480 if ( sendInfo.sendCC )
481 packet = new OutgoingRTPPkt(sendInfo.sendSources,15,data + offset,step,sendInfo.paddinglen, pcc);
482 else
483 packet = new OutgoingRTPPkt(data + offset,step,sendInfo.paddinglen, pcc);
484
485 packet->setPayloadType(getCurrentPayloadType());
486 packet->setSeqNum(sendInfo.sendSeq++);
487 packet->setTimestamp(stamp + getInitialTimestamp());
488 packet->setSSRCNetwork(getLocalSSRCNetwork());
489
490 if ( (0 == offset) && getMark() ) {
491 packet->setMarker(true);
492 setMark(false);
493 } else {
494 packet->setMarker(false);
495 }
496 if (pcc != NULL) {
497 packet->protect(getLocalSSRC(), pcc);
498 }
499 dispatchImmediate(packet);
500 delete packet;
501 offset += step;
502 }
503}
504
505void OutgoingDataQueue::dispatchImmediate(OutgoingRTPPkt *packet)
506{
507 lockDestinationList();
508 if ( isSingleDestination() ) {
509 TransportAddress* tmp = destList.front();
510 // if going from multi destinations to single destinations.
511 setDataPeer(tmp->getNetworkAddress(), tmp->getDataTransportPort());
512
513 sendData(packet->getRawPacket(), packet->getRawPacketSizeSrtp());
514 } else {
515 // when no destination has been added, NULL == dest.
516 for (std::list<TransportAddress*>::iterator i = destList.begin(); destList.end() != i; i++) {
517 TransportAddress* dest = *i;
518 setDataPeer(dest->getNetworkAddress(), dest->getDataTransportPort());
519 sendData(packet->getRawPacket(), packet->getRawPacketSizeSrtp());
520 }
521 }
522 unlockDestinationList();
523
524#ifdef CCXX_IPV6
525 lockDestinationListIPV6();
526 if ( isSingleDestinationIPV6() ) {
527 TransportAddressIPV6* tmp6 = destListIPV6.front();
528 // if going from multi destinations to single destinations.
529 setDataPeerIPV6(tmp6->getNetworkAddress(),
530 tmp6->getDataTransportPort());
531
532 sendDataIPV6(packet->getRawPacket(),
533 packet->getRawPacketSizeSrtp());
534 } else {
535 // when no destination has been added, NULL == dest.
536 for (std::list<TransportAddressIPV6*>::iterator i6 = destListIPV6.begin(); destListIPV6.end() != i6; i6++) {
537 TransportAddressIPV6* dest6 = *i6;
538 setDataPeerIPV6(dest6->getNetworkAddress(),
539 dest6->getDataTransportPort());
540 sendDataIPV6(packet->getRawPacket(),
541 packet->getRawPacketSizeSrtp());
542 }
543 }
544 unlockDestinationListIPV6();
545#endif
546}
547
548size_t
549OutgoingDataQueue::dispatchDataPacket(void)
550{
551 sendLock.writeLock();
552 OutgoingRTPPktLink* packetLink = sendFirst;
553
554 if ( !packetLink ){
555 sendLock.unlock();
556 return 0;
557 }
558
559 OutgoingRTPPkt* packet = packetLink->getPacket();
560 uint32 rtn = packet->getPayloadSize();
561 dispatchImmediate(packet);
562
563 // unlink the sent packet from the queue and destroy it. Also
564 // record the sending.
565 sendFirst = sendFirst->getNext();
566 if ( sendFirst ) {
567 sendFirst->setPrev(NULL);
568 } else {
569 sendLast = NULL;
570 }
571 // for general accounting and RTCP SR statistics
572 sendInfo.packetCount++;
573 sendInfo.octetCount += packet->getPayloadSize();
574 delete packetLink;
575
576 sendLock.unlock();
577 return rtn;
578}
579
580size_t
581OutgoingDataQueue::setPartial(uint32 stamp, unsigned char *data,
582size_t offset, size_t max)
583{
584 sendLock.writeLock();
585 OutgoingRTPPktLink* packetLink = sendFirst;
586 while ( packetLink )
587 {
588 uint32 pstamp = packetLink->getPacket()->getTimestamp();
Alexandre Lisione41ecd42014-02-27 15:51:10 -0500589 if ( pstamp > stamp ) {
Emeric Vigier2f625822012-08-06 11:09:52 -0400590 packetLink = NULL;
Emeric Vigier2f625822012-08-06 11:09:52 -0400591 break;
Alexandre Lisione41ecd42014-02-27 15:51:10 -0500592 } else if ( pstamp == stamp ) {
593 break;
594 }
Emeric Vigier2f625822012-08-06 11:09:52 -0400595
596 packetLink = packetLink->getNext();
597 }
598 if ( !packetLink ) {
599 sendLock.unlock();
600 return 0;
601 }
602
603 OutgoingRTPPkt* packet = packetLink->getPacket();
604 if ( offset >= packet->getPayloadSize() )
605 return 0;
606
607 if ( max > packet->getPayloadSize() - offset )
608 max = packet->getPayloadSize() - offset;
609
610 memcpy((unsigned char*)(packet->getPayload()) + offset,
611 data, max);
612 sendLock.unlock();
613 return max;
614}
615
616void
617OutgoingDataQueue::setOutQueueCryptoContext(CryptoContext* cc)
618{
619 std::list<CryptoContext *>::iterator i;
620
621 MutexLock lock(cryptoMutex);
622 // check if a CryptoContext for a SSRC already exists. If yes
623 // remove it from list before inserting the new one.
624 for( i = cryptoContexts.begin(); i!= cryptoContexts.end(); i++ ) {
625 if( (*i)->getSsrc() == cc->getSsrc() ) {
626 CryptoContext* tmp = *i;
627 cryptoContexts.erase(i);
628 delete tmp;
629 break;
630 }
631 }
632 cryptoContexts.push_back(cc);
633}
634
635void
636OutgoingDataQueue::removeOutQueueCryptoContext(CryptoContext* cc)
637{
638 std::list<CryptoContext *>::iterator i;
639
640 MutexLock lock(cryptoMutex);
641 if (cc == NULL) { // Remove any incoming crypto contexts
642 for (i = cryptoContexts.begin(); i != cryptoContexts.end(); ) {
643 CryptoContext* tmp = *i;
644 i = cryptoContexts.erase(i);
645 delete tmp;
646 }
647 }
648 else {
649 for( i = cryptoContexts.begin(); i != cryptoContexts.end(); i++ ) {
650 if( (*i)->getSsrc() == cc->getSsrc() ) {
651 CryptoContext* tmp = *i;
652 cryptoContexts.erase(i);
653 delete tmp;
654 return;
655 }
656 }
657 }
658}
659
660CryptoContext*
661OutgoingDataQueue::getOutQueueCryptoContext(uint32 ssrc)
662{
663 std::list<CryptoContext *>::iterator i;
664
665 MutexLock lock(cryptoMutex);
666 for( i = cryptoContexts.begin(); i != cryptoContexts.end(); i++ ){
667 if( (*i)->getSsrc() == ssrc) {
668 return (*i);
669 }
670 }
671 return NULL;
672}
673
Alexandre Lisionddd731e2014-01-31 11:50:08 -0500674END_NAMESPACE
Emeric Vigier2f625822012-08-06 11:09:52 -0400675
/** EMACS **
 * Local variables:
 * mode: c++
 * c-basic-offset: 4
 * End:
 */