blob: 11247bd79162d2c5a6454b0d20da3814c3c794e1 [file] [log] [blame]
Emeric Vigier2f625822012-08-06 11:09:52 -04001// Copyright (C) 2000,2001,2004,2005,2006 Federico Montesino Pouzols <fedemp@altern.org>
2//
3// This program is free software; you can redistribute it and/or modify
4// it under the terms of the GNU General Public License as published by
5// the Free Software Foundation; either version 2 of the License, or
6// (at your option) any later version.
7//
8// This program is distributed in the hope that it will be useful,
9// but WITHOUT ANY WARRANTY; without even the implied warranty of
10// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11// GNU General Public License for more details.
12//
13// You should have received a copy of the GNU General Public License
14// along with this program; if not, write to the Free Software
15// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
16//
17// As a special exception, you may use this file as part of a free software
18// library without restriction. Specifically, if other files instantiate
19// templates or use macros or inline functions from this file, or you compile
20// this file and link it with other files to produce an executable, this
21// file does not by itself cause the resulting executable to be covered by
22// the GNU General Public License. This exception does not however
23// invalidate any other reasons why the executable file might be covered by
24// the GNU General Public License.
25//
26// This exception applies only to the code released under the name GNU
27// ccRTP. If you copy code from other releases into a copy of GNU
28// ccRTP, as the General Public License permits, the exception does
29// not apply to the code that you add in this way. To avoid misleading
30// anyone as to the status of such modified files, you must delete
31// this exception notice from them.
32//
33// If you write modifications of your own for GNU ccRTP, it is your choice
34// whether to permit this exception to apply to your modifications.
35// If you do not wish that, delete this exception notice.
36//
37
38#include "private.h"
39#include <ccrtp/pool.h>
40
41#include <algorithm>
42
43#ifdef CCXX_NAMESPACES
44namespace ost {
45using std::list;
46#endif
47
48RTPSessionPool::RTPSessionPool()
49{
50#ifndef WIN32
51 highestSocket = 0;
52 setPoolTimeout(0,3000);
53 FD_ZERO(&recvSocketSet);
54#endif
55}
56
57bool
58RTPSessionPool::addSession(RTPSessionBase& session)
59{
60#ifndef WIN32
61 bool result = false;
62 poolLock.writeLock();
63 // insert in list.
64 PredEquals predEquals(&session);
65 if ( sessionList.end() == std::find_if(sessionList.begin(),sessionList.end(),predEquals) ) {
66 result = true;
67 sessionList.push_back(new SessionListElement(&session));
68 } else {
69 result = false;
70 }
71 poolLock.unlock();
72 return result;
73#else
74 return false;
75#endif
76}
77
78bool
79RTPSessionPool::removeSession(RTPSessionBase& session)
80{
81#ifndef WIN32
82 bool result = false;
83 poolLock.writeLock();
84 // remove from list.
85 PredEquals predEquals(&session);
86 PoolIterator i;
87 if ( sessionList.end() != (i = find_if(sessionList.begin(),sessionList.end(),predEquals)) ) {
88 (*i)->clear();
89 result = true;
90 } else {
91 result = false;
92 }
93 poolLock.unlock();
94 return result;
95#else
96 return false;
97#endif
98}
99
100size_t
101RTPSessionPool::getPoolLength() const
102{
103#ifndef WIN32
104 size_t result;
105 poolLock.readLock();
106 result = sessionList.size();
107 poolLock.unlock();
108 return result;
109#else
110 return 0;
111#endif
112}
113
114void
115SingleRTPSessionPool::run()
116{
117#ifndef WIN32
118 SOCKET so;
119 microtimeout_t packetTimeout(0);
120 while ( isActive() ) {
121 poolLock.readLock();
122 // Make a copy of the list so that add and remove does
123 // not affect the list during this loop iteration
124 list<SessionListElement*> sessions(sessionList);
125 poolLock.unlock();
126
127 PoolIterator i = sessions.begin();
128 while ( i != sessions.end() ) {
129 poolLock.readLock();
130 if (!(*i)->isCleared()) {
131 RTPSessionBase* session((*i)->get());
132 controlReceptionService(*session);
133 controlTransmissionService(*session);
134 }
135 poolLock.unlock();
136 i++;
137 }
138 timeval timeout = getPoolTimeout();
139
140 // Reinitializa fd set
141 FD_ZERO(&recvSocketSet);
142 poolLock.readLock();
143 highestSocket = 0;
144 for (PoolIterator j = sessions.begin(); j != sessions.end (); j++) {
145 if (!(*j)->isCleared()) {
146 RTPSessionBase* session((*j)->get());
147 SOCKET s = getDataRecvSocket(*session);
148 FD_SET(s,&recvSocketSet);
149 if ( s > highestSocket + 1 )
150 highestSocket = s + 1;
151 }
152 }
153 poolLock.unlock();
154
155
156 int n = select(highestSocket,&recvSocketSet,NULL,NULL,
157 &timeout);
158
159 i = sessions.begin();
160 while ( (i != sessions.end()) ) {
161 poolLock.readLock();
162 if (!(*i)->isCleared()) {
163 RTPSessionBase* session((*i)->get());
164 so = getDataRecvSocket(*session);
165 if ( FD_ISSET(so,&recvSocketSet) && (n-- > 0) ) {
166 takeInDataPacket(*session);
167 }
168
169 // schedule by timestamp, as in
170 // SingleThreadRTPSession (by Joergen
171 // Terner)
172 if (packetTimeout < 1000) {
173 packetTimeout = getSchedulingTimeout(*session);
174 }
175 microtimeout_t maxWait =
176 timeval2microtimeout(getRTCPCheckInterval(*session));
177 // make sure the scheduling timeout is
178 // <= the check interval for RTCP
179 // packets
180 packetTimeout = (packetTimeout > maxWait)? maxWait : packetTimeout;
181 if ( packetTimeout < 1000 ) { // !(packetTimeout/1000)
182 setCancel(cancelDeferred);
183 dispatchDataPacket(*session);
184 setCancel(cancelImmediate);
185 //timerTick();
186 } else {
187 packetTimeout = 0;
188 }
189 }
190 poolLock.unlock();
191 i++;
192 }
193
194 // Purge elements for removed sessions.
195 poolLock.writeLock();
196 i = sessionList.begin();
197 while (i != sessionList.end()) {
198 if ((*i)->isCleared()) {
199 SessionListElement* element(*i);
200 i = sessionList.erase(i);
201 delete element;
202 }
203 else {
204 ++i;
205 }
206 }
207 poolLock.unlock();
208
209 //GF we added that to allow the kernel scheduler to
210 // give other tasks some time as if we have lots of
211 // active sessions the thread cann take all the CPU if we
212 // don't pause at all. We haven't found the best way to
213 // do that yet.
214 // usleep (10);
215 yield();
216 }
217#endif // ndef WIN32
218}
219
220#if defined(_MSC_VER) && _MSC_VER >= 1300
221SingleThreadRTPSession<DualRTPUDPIPv4Channel,DualRTPUDPIPv4Channel,AVPQueue>::SingleThreadRTPSession<DualRTPUDPIPv4Channel,DualRTPUDPIPv4Channel,AVPQueue>(
222const InetHostAddress& ia, tpport_t dataPort, tpport_t controlPort, int pri,
223uint32 memberssize, RTPApplication& app) :
224Thread(pri), TRTPSessionBase<RTPDataChannel,RTCPChannel,ServiceQueue>
225(ia,dataPort,controlPort,memberssize,app)
226{}
227
228SingleThreadRTPSession<DualRTPUDPIPv4Channel,DualRTPUDPIPv4Channel,AVPQueue>::SingleThreadRTPSession<DualRTPUDPIPv4Channel,DualRTPUDPIPv4Channel,AVPQueue>(
229const InetMcastAddress& ia, tpport_t dataPort, tpport_t controlPort, int pri,
230uint32 memberssize, RTPApplication& app, uint32 iface) :
231Thread(pri), TRTPSessionBase<RTPDataChannel,RTCPChannel,ServiceQueue>
232(ia,dataPort,controlPort,memberssize,app,iface)
233{}
234
235void SingleThreadRTPSession<DualRTPUDPIPv4Channel,DualRTPUDPIPv4Channel,AVPQueue>::startRunning()
236{
237 enableStack();
238 Thread::start();
239}
240
241bool SingleThreadRTPSession<DualRTPUDPIPv4Channel,DualRTPUDPIPv4Channel,AVPQueue>::isPendingData(microtimeout_t timeout)
242{
243 return TRTPSessionBase<RTPDataChannel,RTCPChannel,ServiceQueue>::isPendingData(timeout);
244}
245
246void SingleThreadRTPSession<DualRTPUDPIPv4Channel,DualRTPUDPIPv4Channel,AVPQueue>::timerTick(void)
247{}
248
249void SingleThreadRTPSession<DualRTPUDPIPv4Channel,DualRTPUDPIPv4Channel,AVPQueue>::run(void)
250{
251 microtimeout_t timeout = 0;
252 while ( ServiceQueue::isActive() ) {
253 if ( timeout < 1000 ){ // !(timeout/1000)
254 timeout = getSchedulingTimeout();
255 }
256 setCancel(cancelDeferred);
257 controlReceptionService();
258 controlTransmissionService();
259 setCancel(cancelImmediate);
260 microtimeout_t maxWait =
261 timeval2microtimeout(getRTCPCheckInterval());
262 // make sure the scheduling timeout is
263 // <= the check interval for RTCP
264 // packets
265 timeout = (timeout > maxWait)? maxWait : timeout;
266 if ( timeout < 1000 ) { // !(timeout/1000)
267 setCancel(cancelDeferred);
268 dispatchDataPacket();
269 setCancel(cancelImmediate);
270 timerTick();
271 } else {
272 if ( isPendingData(timeout/1000) ) {
273 setCancel(cancelDeferred);
274 takeInDataPacket();
275 setCancel(cancelImmediate);
276 }
277 timeout = 0;
278 }
279 }
280 dispatchBYE("GNU ccRTP stack finishing.");
281 Thread::exit();
282}
283
284
285#ifdef CCXX_IPV6
286
287SingleThreadRTPSessionIPV6<DualRTPUDPIPv6Channel,DualRTPUDPIPv6Channel,AVPQueue>::SingleThreadRTPSession<DualRTPUDPIPv4Channel,DualRTPUDPIPv4Channel,AVPQueue>(
288const IPV6Host& ia, tpport_t dataPort, tpport_t controlPort, int pri,
289uint32 memberssize, RTPApplication& app) :
290Thread(pri), TRTPSessionBaseIPV6<RTPDataChannel,RTCPChannel,ServiceQueue>
291(ia,dataPort,controlPort,memberssize,app)
292{}
293
294SingleThreadRTPSessionIPV6<DualRTPUDPIPv6Channel,DualRTPUDPIPv6Channel,AVPQueue>::SingleThreadRTPSession<DualRTPUDPIPv4Channel,DualRTPUDPIPv4Channel,AVPQueue>(
295const IPV6Multicast& ia, tpport_t dataPort, tpport_t controlPort, int pri,
296uint32 memberssize, RTPApplication& app, uint32 iface) :
297Thread(pri), TRTPSessionBaseIPV6<RTPDataChannel,RTCPChannel,ServiceQueue>
298(ia,dataPort,controlPort,memberssize,app,iface)
299{}
300
301void SingleThreadRTPSessionIPV6<DualRTPUDPIPv6Channel,DualRTPUDPIPv6Channel,AVPQueue>::startRunning()
302{
303 enableStack();
304 Thread::start();
305}
306
307bool SingleThreadRTPSessionIPV6<DualRTPUDPIPv6Channel,DualRTPUDPIPv6Channel,AVPQueue>::isPendingData(microtimeout_t timeout)
308{
309 return TRTPSessionBaseIPV6<RTPDataChannel,RTCPChannel,ServiceQueue>::isPendingData(timeout);
310}
311
312void SingleThreadRTPSessionIPV6<DualRTPUDPIPv6Channel,DualRTPUDPIPv6Channel,AVPQueue>::timerTick(void)
313{}
314
315void SingleThreadRTPSessionIPV6<DualRTPUDPIPv6Channel,DualRTPUDPIPv6Channel,AVPQueue>::run(void)
316{
317 microtimeout_t timeout = 0;
318 while ( ServiceQueue::isActive() ) {
319 if ( timeout < 1000 ){ // !(timeout/1000)
320 timeout = getSchedulingTimeout();
321 }
322 setCancel(cancelDeferred);
323 controlReceptionService();
324 controlTransmissionService();
325 setCancel(cancelImmediate);
326 microtimeout_t maxWait =
327 timeval2microtimeout(getRTCPCheckInterval());
328 // make sure the scheduling timeout is
329 // <= the check interval for RTCP
330 // packets
331 timeout = (timeout > maxWait)? maxWait : timeout;
332 if ( timeout < 1000 ) { // !(timeout/1000)
333 setCancel(cancelDeferred);
334 dispatchDataPacket();
335 setCancel(cancelImmediate);
336 timerTick();
337 } else {
338 if ( isPendingData(timeout/1000) ) {
339 setCancel(cancelDeferred);
340 takeInDataPacket();
341 setCancel(cancelImmediate);
342 }
343 timeout = 0;
344 }
345 }
346 dispatchBYE("GNU ccRTP stack finishing.");
347 Thread::exit();
348}
349
350
351#endif
352
353
354#endif
355
356#ifdef CCXX_NAMESPACES
357}
358#endif
359
360/** EMACS **
361 * Local variables:
362 * mode: c++
363 * c-basic-offset: 4
364 * End:
365 */