blob: a3347d650e3d756f7a71c4bc923d80fb4ed80d04 [file] [log] [blame]
/* $Id$ */
/*
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
20#include <pjsip/sip_transport_loop.h>
21#include <pjsip/sip_endpoint.h>
22#include <pjsip/sip_errno.h>
23#include <pj/pool.h>
24#include <pj/os.h>
25#include <pj/string.h>
26#include <pj/lock.h>
27#include <pj/assert.h>
28#include <pj/compat/socket.h>
29
30
/* Name written into src_name of packets looped back on a transport whose
 * key type is PJSIP_TRANSPORT_LOOP.
 */
#define ADDR_LOOP           "128.0.0.1"

/* Name used for every other loop transport type; also published as the
 * local host name of the transport created by pjsip_loop_start().
 */
#define ADDR_LOOP_DGRAM     "129.0.0.1"
33
34
35/** This structure describes incoming packet. */
36struct recv_list
37{
38 PJ_DECL_LIST_MEMBER(struct recv_list);
39 pjsip_rx_data rdata;
40};
41
42/** This structure is used to keep delayed send failure. */
43struct send_list
44{
45 PJ_DECL_LIST_MEMBER(struct send_list);
46 pj_time_val sent_time;
47 pj_ssize_t sent;
48 pjsip_tx_data *tdata;
49 void *token;
50 void (*callback)(pjsip_transport*, void*, pj_ssize_t);
51};
52
53/** This structure describes the loop transport. */
54struct loop_transport
55{
56 pjsip_transport base;
57 pj_pool_t *pool;
58 pj_thread_t *thread;
59 pj_bool_t thread_quit_flag;
60 pj_bool_t discard;
61 int fail_mode;
62 unsigned recv_delay;
63 unsigned send_delay;
64 struct recv_list recv_list;
65 struct send_list send_list;
66};
67
68
69/* Helper function to create "incoming" packet */
70struct recv_list *create_incoming_packet( struct loop_transport *loop,
71 pjsip_tx_data *tdata )
72{
73 pj_pool_t *pool;
74 struct recv_list *pkt;
75
76 pool = pjsip_endpt_create_pool(loop->base.endpt, "rdata",
77 PJSIP_POOL_RDATA_LEN,
78 PJSIP_POOL_RDATA_INC+5);
79 if (!pool)
80 return NULL;
81
82 pkt = PJ_POOL_ZALLOC_T(pool, struct recv_list);
83
84 /* Initialize rdata. */
85 pkt->rdata.tp_info.pool = pool;
86 pkt->rdata.tp_info.transport = &loop->base;
87
88 /* Copy the packet. */
89 pj_memcpy(pkt->rdata.pkt_info.packet, tdata->buf.start,
90 tdata->buf.cur - tdata->buf.start);
91 pkt->rdata.pkt_info.len = tdata->buf.cur - tdata->buf.start;
92
93 /* the source address */
94 pkt->rdata.pkt_info.src_addr.addr.sa_family = pj_AF_INET();
95
96 /* "Source address" info. */
97 pkt->rdata.pkt_info.src_addr_len = sizeof(pj_sockaddr_in);
98 if (loop->base.key.type == PJSIP_TRANSPORT_LOOP) {
99 pj_ansi_strcpy(pkt->rdata.pkt_info.src_name, ADDR_LOOP);
100 } else {
101 pj_ansi_strcpy(pkt->rdata.pkt_info.src_name, ADDR_LOOP_DGRAM);
102 }
103 pkt->rdata.pkt_info.src_port = loop->base.local_name.port;
104
105 /* When do we need to "deliver" this packet. */
106 pj_gettimeofday(&pkt->rdata.pkt_info.timestamp);
107 pkt->rdata.pkt_info.timestamp.msec += loop->recv_delay;
108 pj_time_val_normalize(&pkt->rdata.pkt_info.timestamp);
109
110 /* Done. */
111
112 return pkt;
113}
114
115
116/* Helper function to add pending notification callback. */
117static pj_status_t add_notification( struct loop_transport *loop,
118 pjsip_tx_data *tdata,
119 pj_ssize_t sent,
120 void *token,
121 void (*callback)(pjsip_transport*,
122 void*, pj_ssize_t))
123{
124 struct send_list *sent_status;
125
126 pjsip_tx_data_add_ref(tdata);
127 pj_lock_acquire(tdata->lock);
128 sent_status = PJ_POOL_ALLOC_T(tdata->pool, struct send_list);
129 pj_lock_release(tdata->lock);
130
131 sent_status->sent = sent;
132 sent_status->tdata = tdata;
133 sent_status->token = token;
134 sent_status->callback = callback;
135
136 pj_gettimeofday(&sent_status->sent_time);
137 sent_status->sent_time.msec += loop->send_delay;
138 pj_time_val_normalize(&sent_status->sent_time);
139
140 pj_lock_acquire(loop->base.lock);
141 pj_list_push_back(&loop->send_list, sent_status);
142 pj_lock_release(loop->base.lock);
143
144 return PJ_SUCCESS;
145}
146
147/* Handler for sending outgoing message; called by transport manager. */
148static pj_status_t loop_send_msg( pjsip_transport *tp,
149 pjsip_tx_data *tdata,
150 const pj_sockaddr_t *rem_addr,
151 int addr_len,
152 void *token,
153 pjsip_transport_callback cb)
154{
155 struct loop_transport *loop = (struct loop_transport*)tp;
156 struct recv_list *recv_pkt;
157
158 PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||
159 tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);
160
161 PJ_UNUSED_ARG(rem_addr);
162 PJ_UNUSED_ARG(addr_len);
163
164
165 /* Need to send failure? */
166 if (loop->fail_mode) {
167 if (loop->send_delay == 0) {
168 return PJ_STATUS_FROM_OS(OSERR_ECONNRESET);
169 } else {
170 add_notification(loop, tdata, -PJ_STATUS_FROM_OS(OSERR_ECONNRESET),
171 token, cb);
172
173 return PJ_EPENDING;
174 }
175 }
176
177 /* Discard any packets? */
178 if (loop->discard)
179 return PJ_SUCCESS;
180
181 /* Create rdata for the "incoming" packet. */
182 recv_pkt = create_incoming_packet(loop, tdata);
183 if (!recv_pkt)
184 return PJ_ENOMEM;
185
186 /* If delay is not configured, deliver this packet now! */
187 if (loop->recv_delay == 0) {
188 pj_ssize_t size_eaten;
189
190 size_eaten = pjsip_tpmgr_receive_packet( loop->base.tpmgr,
191 &recv_pkt->rdata);
192 pj_assert(size_eaten == recv_pkt->rdata.pkt_info.len);
193
194 pjsip_endpt_release_pool(loop->base.endpt,
195 recv_pkt->rdata.tp_info.pool);
196
197 } else {
198 /* Otherwise if delay is configured, add the "packet" to the
199 * receive list to be processed by worker thread.
200 */
201 pj_lock_acquire(loop->base.lock);
202 pj_list_push_back(&loop->recv_list, recv_pkt);
203 pj_lock_release(loop->base.lock);
204 }
205
206 if (loop->send_delay != 0) {
207 add_notification(loop, tdata, tdata->buf.cur - tdata->buf.start,
208 token, cb);
209 return PJ_EPENDING;
210 } else {
211 return PJ_SUCCESS;
212 }
213}
214
215/* Handler to destroy the transport; called by transport manager */
216static pj_status_t loop_destroy(pjsip_transport *tp)
217{
218 struct loop_transport *loop = (struct loop_transport*)tp;
219
220 PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||
221 tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);
222
223 loop->thread_quit_flag = 1;
224 /* Unlock transport mutex before joining thread. */
225 pj_lock_release(tp->lock);
226 pj_thread_join(loop->thread);
227 pj_thread_destroy(loop->thread);
228
229 /* Clear pending send notifications. */
230 while (!pj_list_empty(&loop->send_list)) {
231 struct send_list *node = loop->send_list.next;
232 /* Notify callback. */
233 if (node->callback) {
234 (*node->callback)(&loop->base, node->token, -PJSIP_ESHUTDOWN);
235 }
236 pj_list_erase(node);
237 pjsip_tx_data_dec_ref(node->tdata);
238 }
239
240 /* Clear "incoming" packets in the queue. */
241 while (!pj_list_empty(&loop->recv_list)) {
242 struct recv_list *node = loop->recv_list.next;
243 pj_list_erase(node);
244 pjsip_endpt_release_pool(loop->base.endpt,
245 node->rdata.tp_info.pool);
246 }
247
248 /* Self destruct.. heheh.. */
249 pj_lock_destroy(loop->base.lock);
250 pj_atomic_destroy(loop->base.ref_cnt);
251 pjsip_endpt_release_pool(loop->base.endpt, loop->base.pool);
252
253 return PJ_SUCCESS;
254}
255
256/* Worker thread for loop transport. */
257static int loop_transport_worker_thread(void *arg)
258{
259 struct loop_transport *loop = (struct loop_transport*) arg;
260 struct recv_list r;
261 struct send_list s;
262
263 pj_list_init(&r);
264 pj_list_init(&s);
265
266 while (!loop->thread_quit_flag) {
267 pj_time_val now;
268
269 pj_thread_sleep(1);
270 pj_gettimeofday(&now);
271
272 pj_lock_acquire(loop->base.lock);
273
274 /* Move expired send notification to local list. */
275 while (!pj_list_empty(&loop->send_list)) {
276 struct send_list *node = loop->send_list.next;
277
278 /* Break when next node time is greater than now. */
279 if (PJ_TIME_VAL_GTE(node->sent_time, now))
280 break;
281
282 /* Delete this from the list. */
283 pj_list_erase(node);
284
285 /* Add to local list. */
286 pj_list_push_back(&s, node);
287 }
288
289 /* Move expired "incoming" packet to local list. */
290 while (!pj_list_empty(&loop->recv_list)) {
291 struct recv_list *node = loop->recv_list.next;
292
293 /* Break when next node time is greater than now. */
294 if (PJ_TIME_VAL_GTE(node->rdata.pkt_info.timestamp, now))
295 break;
296
297 /* Delete this from the list. */
298 pj_list_erase(node);
299
300 /* Add to local list. */
301 pj_list_push_back(&r, node);
302
303 }
304
305 pj_lock_release(loop->base.lock);
306
307 /* Process send notification and incoming packet notification
308 * without holding down the loop's mutex.
309 */
310 while (!pj_list_empty(&s)) {
311 struct send_list *node = s.next;
312
313 pj_list_erase(node);
314
315 /* Notify callback. */
316 if (node->callback) {
317 (*node->callback)(&loop->base, node->token, node->sent);
318 }
319
320 /* Decrement tdata reference counter. */
321 pjsip_tx_data_dec_ref(node->tdata);
322 }
323
324 /* Process "incoming" packet. */
325 while (!pj_list_empty(&r)) {
326 struct recv_list *node = r.next;
327 pj_ssize_t size_eaten;
328
329 pj_list_erase(node);
330
331 /* Notify transport manager about the "incoming packet" */
332 size_eaten = pjsip_tpmgr_receive_packet(loop->base.tpmgr,
333 &node->rdata);
334
335 /* Must "eat" all the packets. */
336 pj_assert(size_eaten == node->rdata.pkt_info.len);
337
338 /* Done. */
339 pjsip_endpt_release_pool(loop->base.endpt,
340 node->rdata.tp_info.pool);
341 }
342 }
343
344 return 0;
345}
346
347
348/* Start loop transport. */
349PJ_DEF(pj_status_t) pjsip_loop_start( pjsip_endpoint *endpt,
350 pjsip_transport **transport)
351{
352 pj_pool_t *pool;
353 struct loop_transport *loop;
354 pj_status_t status;
355
356 /* Create pool. */
357 pool = pjsip_endpt_create_pool(endpt, "loop", 4000, 4000);
358 if (!pool)
359 return PJ_ENOMEM;
360
361 /* Create the loop structure. */
362 loop = PJ_POOL_ZALLOC_T(pool, struct loop_transport);
363
364 /* Initialize transport properties. */
365 pj_ansi_snprintf(loop->base.obj_name, sizeof(loop->base.obj_name),
366 "loop%p", loop);
367 loop->base.pool = pool;
368 status = pj_atomic_create(pool, 0, &loop->base.ref_cnt);
369 if (status != PJ_SUCCESS)
370 goto on_error;
371 status = pj_lock_create_recursive_mutex(pool, "loop", &loop->base.lock);
372 if (status != PJ_SUCCESS)
373 goto on_error;
374 loop->base.key.type = PJSIP_TRANSPORT_LOOP_DGRAM;
375 //loop->base.key.rem_addr.sa_family = pj_AF_INET();
376 loop->base.type_name = "LOOP-DGRAM";
377 loop->base.info = "LOOP-DGRAM";
378 loop->base.flag = PJSIP_TRANSPORT_DATAGRAM;
379 loop->base.local_name.host = pj_str(ADDR_LOOP_DGRAM);
380 loop->base.local_name.port =
381 pjsip_transport_get_default_port_for_type((pjsip_transport_type_e)
382 loop->base.key.type);
383 loop->base.addr_len = sizeof(pj_sockaddr_in);
384 loop->base.dir = PJSIP_TP_DIR_NONE;
385 loop->base.endpt = endpt;
386 loop->base.tpmgr = pjsip_endpt_get_tpmgr(endpt);
387 loop->base.send_msg = &loop_send_msg;
388 loop->base.destroy = &loop_destroy;
389
390 pj_list_init(&loop->recv_list);
391 pj_list_init(&loop->send_list);
392
393 /* Create worker thread. */
394 status = pj_thread_create(pool, "loop",
395 &loop_transport_worker_thread, loop, 0,
396 PJ_THREAD_SUSPENDED, &loop->thread);
397 if (status != PJ_SUCCESS)
398 goto on_error;
399
400 /* Register to transport manager. */
401 status = pjsip_transport_register( loop->base.tpmgr, &loop->base);
402 if (status != PJ_SUCCESS)
403 goto on_error;
404
405 /* Start the thread. */
406 status = pj_thread_resume(loop->thread);
407 if (status != PJ_SUCCESS)
408 goto on_error;
409
410 /*
411 * Done.
412 */
413
414 if (transport)
415 *transport = &loop->base;
416
417 return PJ_SUCCESS;
418
419on_error:
420 if (loop->base.lock)
421 pj_lock_destroy(loop->base.lock);
422 if (loop->thread)
423 pj_thread_destroy(loop->thread);
424 if (loop->base.ref_cnt)
425 pj_atomic_destroy(loop->base.ref_cnt);
426 pjsip_endpt_release_pool(endpt, loop->pool);
427 return status;
428}
429
430
431PJ_DEF(pj_status_t) pjsip_loop_set_discard( pjsip_transport *tp,
432 pj_bool_t discard,
433 pj_bool_t *prev_value )
434{
435 struct loop_transport *loop = (struct loop_transport*)tp;
436
437 PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||
438 tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);
439
440 if (prev_value)
441 *prev_value = loop->discard;
442 loop->discard = discard;
443
444 return PJ_SUCCESS;
445}
446
447
448PJ_DEF(pj_status_t) pjsip_loop_set_failure( pjsip_transport *tp,
449 int fail_flag,
450 int *prev_value )
451{
452 struct loop_transport *loop = (struct loop_transport*)tp;
453
454 PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||
455 tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);
456
457 if (prev_value)
458 *prev_value = loop->fail_mode;
459 loop->fail_mode = fail_flag;
460
461 return PJ_SUCCESS;
462}
463
464
465PJ_DEF(pj_status_t) pjsip_loop_set_recv_delay( pjsip_transport *tp,
466 unsigned delay,
467 unsigned *prev_value)
468{
469 struct loop_transport *loop = (struct loop_transport*)tp;
470
471 PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||
472 tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);
473
474 if (prev_value)
475 *prev_value = loop->recv_delay;
476 loop->recv_delay = delay;
477
478 return PJ_SUCCESS;
479}
480
481PJ_DEF(pj_status_t) pjsip_loop_set_send_callback_delay( pjsip_transport *tp,
482 unsigned delay,
483 unsigned *prev_value)
484{
485 struct loop_transport *loop = (struct loop_transport*)tp;
486
487 PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||
488 tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);
489
490 if (prev_value)
491 *prev_value = loop->send_delay;
492 loop->send_delay = delay;
493
494 return PJ_SUCCESS;
495}
496
497PJ_DEF(pj_status_t) pjsip_loop_set_delay( pjsip_transport *tp, unsigned delay )
498{
499 struct loop_transport *loop = (struct loop_transport*)tp;
500
501 PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||
502 tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);
503
504 loop->recv_delay = delay;
505 loop->send_delay = delay;
506
507 return PJ_SUCCESS;
508}
509