blob: bdd0416130ef241fdc9f701d59e9b18268c9a932 [file] [log] [blame]
Tristan Matthews0a329cc2013-07-17 13:20:14 -04001/* $Id$ */
2/*
3 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 */
20#include <pjsip/sip_transport_udp.h>
21#include <pjsip/sip_endpoint.h>
22#include <pjsip/sip_errno.h>
23#include <pj/addr_resolv.h>
24#include <pj/assert.h>
25#include <pj/lock.h>
26#include <pj/log.h>
27#include <pj/os.h>
28#include <pj/pool.h>
29#include <pj/sock.h>
30#include <pj/compat/socket.h>
31#include <pj/string.h>
32
33
34#define THIS_FILE "sip_transport_udp.c"
35
36/**
37 * These are the target values for socket send and receive buffer sizes,
38 * respectively. They will be applied to UDP socket with setsockopt().
39 * When transport failed to set these size, it will decrease it until
40 * sufficiently large number has been successfully set.
41 *
42 * The buffer size is important, especially in WinXP/2000 machines.
43 * Basicly the lower the size, the more packets will be lost (dropped?)
44 * when we're sending (receiving?) packets in large volumes.
45 *
46 * The figure here is taken based on my experiment on WinXP/2000 machine,
47 * and with this value, the rate of dropped packet is about 8% when
48 * sending 1800 requests simultaneously (percentage taken as average
49 * after 50K requests or so).
50 *
51 * More experiments are needed probably.
52 */
53/* 2010/01/14
54 * Too many people complained about seeing "Error setting SNDBUF" log,
55 * so lets just remove this. People who want to have SNDBUF set can
56 * still do so by declaring these two macros in config_site.h
57 */
58#ifndef PJSIP_UDP_SO_SNDBUF_SIZE
59/*# define PJSIP_UDP_SO_SNDBUF_SIZE (24*1024*1024)*/
60# define PJSIP_UDP_SO_SNDBUF_SIZE 0
61#endif
62
63#ifndef PJSIP_UDP_SO_RCVBUF_SIZE
64/*# define PJSIP_UDP_SO_RCVBUF_SIZE (24*1024*1024)*/
65# define PJSIP_UDP_SO_RCVBUF_SIZE 0
66#endif
67
68
69/* Struct udp_transport "inherits" struct pjsip_transport */
70struct udp_transport
71{
72 pjsip_transport base;
73 pj_sock_t sock;
74 pj_ioqueue_key_t *key;
75 int rdata_cnt;
76 pjsip_rx_data **rdata;
77 int is_closing;
78 pj_bool_t is_paused;
79};
80
81
82/*
83 * Initialize transport's receive buffer from the specified pool.
84 */
85static void init_rdata(struct udp_transport *tp, unsigned rdata_index,
86 pj_pool_t *pool, pjsip_rx_data **p_rdata)
87{
88 pjsip_rx_data *rdata;
89
90 /* Reset pool. */
91 //note: already done by caller
92 //pj_pool_reset(pool);
93
94 rdata = PJ_POOL_ZALLOC_T(pool, pjsip_rx_data);
95
96 /* Init tp_info part. */
97 rdata->tp_info.pool = pool;
98 rdata->tp_info.transport = &tp->base;
99 rdata->tp_info.tp_data = (void*)(pj_ssize_t)rdata_index;
100 rdata->tp_info.op_key.rdata = rdata;
101 pj_ioqueue_op_key_init(&rdata->tp_info.op_key.op_key,
102 sizeof(pj_ioqueue_op_key_t));
103
104 tp->rdata[rdata_index] = rdata;
105
106 if (p_rdata)
107 *p_rdata = rdata;
108}
109
110
111/*
112 * udp_on_read_complete()
113 *
114 * This is callback notification from ioqueue that a pending recvfrom()
115 * operation has completed.
116 */
117static void udp_on_read_complete( pj_ioqueue_key_t *key,
118 pj_ioqueue_op_key_t *op_key,
119 pj_ssize_t bytes_read)
120{
121 /* See https://trac.pjsip.org/repos/ticket/1197 */
122 enum { MAX_IMMEDIATE_PACKET = 50 };
123 pjsip_rx_data_op_key *rdata_op_key = (pjsip_rx_data_op_key*) op_key;
124 pjsip_rx_data *rdata = rdata_op_key->rdata;
125 struct udp_transport *tp = (struct udp_transport*)rdata->tp_info.transport;
126 int i;
127 pj_status_t status;
128
129 /* Don't do anything if transport is closing. */
130 if (tp->is_closing) {
131 tp->is_closing++;
132 return;
133 }
134
135 /* Don't do anything if transport is being paused. */
136 if (tp->is_paused)
137 return;
138
139 /*
140 * The idea of the loop is to process immediate data received by
141 * pj_ioqueue_recvfrom(), as long as i < MAX_IMMEDIATE_PACKET. When
142 * i is >= MAX_IMMEDIATE_PACKET, we force the recvfrom() operation to
143 * complete asynchronously, to allow other sockets to get their data.
144 */
145 for (i=0;; ++i) {
146 enum { MIN_SIZE = 32 };
147 pj_uint32_t flags;
148
149 /* Report the packet to transport manager. Only do so if packet size
150 * is relatively big enough for a SIP packet.
151 */
152 if (bytes_read > MIN_SIZE) {
153 pj_size_t size_eaten;
154 const pj_sockaddr *src_addr = &rdata->pkt_info.src_addr;
155
156 /* Init pkt_info part. */
157 rdata->pkt_info.len = bytes_read;
158 rdata->pkt_info.zero = 0;
159 pj_gettimeofday(&rdata->pkt_info.timestamp);
160 if (src_addr->addr.sa_family == pj_AF_INET()) {
161 pj_ansi_strcpy(rdata->pkt_info.src_name,
162 pj_inet_ntoa(src_addr->ipv4.sin_addr));
163 rdata->pkt_info.src_port = pj_ntohs(src_addr->ipv4.sin_port);
164 } else {
165 pj_inet_ntop(pj_AF_INET6(),
166 pj_sockaddr_get_addr(&rdata->pkt_info.src_addr),
167 rdata->pkt_info.src_name,
168 sizeof(rdata->pkt_info.src_name));
169 rdata->pkt_info.src_port = pj_ntohs(src_addr->ipv6.sin6_port);
170 }
171
172 size_eaten =
173 pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr,
174 rdata);
175
176 if (size_eaten < 0) {
177 pj_assert(!"It shouldn't happen!");
178 size_eaten = rdata->pkt_info.len;
179 }
180
181 /* Since this is UDP, the whole buffer is the message. */
182 rdata->pkt_info.len = 0;
183
184 } else if (bytes_read <= MIN_SIZE) {
185
186 /* TODO: */
187
188 } else if (-bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&
189 -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) &&
190 -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET))
191 {
192
193 /* Report error to endpoint. */
194 PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt,
195 rdata->tp_info.transport->obj_name,
196 (pj_status_t)-bytes_read,
197 "Warning: pj_ioqueue_recvfrom()"
198 " callback error"));
199 }
200
201 if (i >= MAX_IMMEDIATE_PACKET) {
202 /* Force ioqueue_recvfrom() to return PJ_EPENDING */
203 flags = PJ_IOQUEUE_ALWAYS_ASYNC;
204 } else {
205 flags = 0;
206 }
207
208 /* Reset pool.
209 * Need to copy rdata fields to temp variable because they will
210 * be invalid after pj_pool_reset().
211 */
212 {
213 pj_pool_t *rdata_pool = rdata->tp_info.pool;
214 struct udp_transport *rdata_tp ;
215 unsigned rdata_index;
216
217 rdata_tp = (struct udp_transport*)rdata->tp_info.transport;
218 rdata_index = (unsigned)(unsigned long)(pj_ssize_t)
219 rdata->tp_info.tp_data;
220
221 pj_pool_reset(rdata_pool);
222 init_rdata(rdata_tp, rdata_index, rdata_pool, &rdata);
223
224 /* Change some vars to point to new location after
225 * pool reset.
226 */
227 op_key = &rdata->tp_info.op_key.op_key;
228 }
229
230 /* Only read next packet if transport is not being paused. This
231 * check handles the case where transport is paused while endpoint
232 * is still processing a SIP message.
233 */
234 if (tp->is_paused)
235 return;
236
237 /* Read next packet. */
238 bytes_read = sizeof(rdata->pkt_info.packet);
239 rdata->pkt_info.src_addr_len = sizeof(rdata->pkt_info.src_addr);
240 status = pj_ioqueue_recvfrom(key, op_key,
241 rdata->pkt_info.packet,
242 &bytes_read, flags,
243 &rdata->pkt_info.src_addr,
244 &rdata->pkt_info.src_addr_len);
245
246 if (status == PJ_SUCCESS) {
247 /* Continue loop. */
248 pj_assert(i < MAX_IMMEDIATE_PACKET);
249
250 } else if (status == PJ_EPENDING) {
251 break;
252
253 } else {
254
255 if (i < MAX_IMMEDIATE_PACKET) {
256
257 /* Report error to endpoint if this is not EWOULDBLOCK error.*/
258 if (status != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&
259 status != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) &&
260 status != PJ_STATUS_FROM_OS(OSERR_ECONNRESET))
261 {
262
263 PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt,
264 rdata->tp_info.transport->obj_name,
265 status,
266 "Warning: pj_ioqueue_recvfrom"));
267 }
268
269 /* Continue loop. */
270 bytes_read = 0;
271 } else {
272 /* This is fatal error.
273 * Ioqueue operation will stop for this transport!
274 */
275 PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt,
276 rdata->tp_info.transport->obj_name,
277 status,
278 "FATAL: pj_ioqueue_recvfrom() error, "
279 "UDP transport stopping! Error"));
280 break;
281 }
282 }
283 }
284}
285
286/*
287 * udp_on_write_complete()
288 *
289 * This is callback notification from ioqueue that a pending sendto()
290 * operation has completed.
291 */
292static void udp_on_write_complete( pj_ioqueue_key_t *key,
293 pj_ioqueue_op_key_t *op_key,
294 pj_ssize_t bytes_sent)
295{
296 struct udp_transport *tp = (struct udp_transport*)
297 pj_ioqueue_get_user_data(key);
298 pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key;
299
300 tdata_op_key->tdata = NULL;
301
302 if (tdata_op_key->callback) {
303 tdata_op_key->callback(&tp->base, tdata_op_key->token, bytes_sent);
304 }
305}
306
307/*
308 * udp_send_msg()
309 *
310 * This function is called by transport manager (by transport->send_msg())
311 * to send outgoing message.
312 */
313static pj_status_t udp_send_msg( pjsip_transport *transport,
314 pjsip_tx_data *tdata,
315 const pj_sockaddr_t *rem_addr,
316 int addr_len,
317 void *token,
318 pjsip_transport_callback callback)
319{
320 struct udp_transport *tp = (struct udp_transport*)transport;
321 pj_ssize_t size;
322 pj_status_t status;
323
324 PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL);
325 PJ_ASSERT_RETURN(tdata->op_key.tdata == NULL, PJSIP_EPENDINGTX);
326
327 /* Return error if transport is paused */
328 if (tp->is_paused)
329 return PJSIP_ETPNOTAVAIL;
330
331 /* Init op key. */
332 tdata->op_key.tdata = tdata;
333 tdata->op_key.token = token;
334 tdata->op_key.callback = callback;
335
336 /* Send to ioqueue! */
337 size = tdata->buf.cur - tdata->buf.start;
338 status = pj_ioqueue_sendto(tp->key, (pj_ioqueue_op_key_t*)&tdata->op_key,
339 tdata->buf.start, &size, 0,
340 rem_addr, addr_len);
341
342 if (status != PJ_EPENDING)
343 tdata->op_key.tdata = NULL;
344
345 return status;
346}
347
348/*
349 * udp_destroy()
350 *
351 * This function is called by transport manager (by transport->destroy()).
352 */
353static pj_status_t udp_destroy( pjsip_transport *transport )
354{
355 struct udp_transport *tp = (struct udp_transport*)transport;
356 int i;
357
358 /* Mark this transport as closing. */
359 tp->is_closing = 1;
360
361 /* Cancel all pending operations. */
362 /* blp: NO NO NO...
363 * No need to post queued completion as we poll the ioqueue until
364 * we've got events anyway. Posting completion will only cause
365 * callback to be called twice with IOCP: one for the post completion
366 * and another one for closing the socket.
367 *
368 for (i=0; i<tp->rdata_cnt; ++i) {
369 pj_ioqueue_post_completion(tp->key,
370 &tp->rdata[i]->tp_info.op_key.op_key, -1);
371 }
372 */
373
374 /* Unregister from ioqueue. */
375 if (tp->key) {
376 pj_ioqueue_unregister(tp->key);
377 tp->key = NULL;
378 } else {
379 /* Close socket. */
380 if (tp->sock && tp->sock != PJ_INVALID_SOCKET) {
381 pj_sock_close(tp->sock);
382 tp->sock = PJ_INVALID_SOCKET;
383 }
384 }
385
386 /* Must poll ioqueue because IOCP calls the callback when socket
387 * is closed. We poll the ioqueue until all pending callbacks
388 * have been called.
389 */
390 for (i=0; i<50 && tp->is_closing < 1+tp->rdata_cnt; ++i) {
391 int cnt;
392 pj_time_val timeout = {0, 1};
393
394 cnt = pj_ioqueue_poll(pjsip_endpt_get_ioqueue(transport->endpt),
395 &timeout);
396 if (cnt == 0)
397 break;
398 }
399
400 /* Destroy rdata */
401 for (i=0; i<tp->rdata_cnt; ++i) {
402 pj_pool_release(tp->rdata[i]->tp_info.pool);
403 }
404
405 /* Destroy reference counter. */
406 if (tp->base.ref_cnt)
407 pj_atomic_destroy(tp->base.ref_cnt);
408
409 /* Destroy lock */
410 if (tp->base.lock)
411 pj_lock_destroy(tp->base.lock);
412
413 /* Destroy pool. */
414 pjsip_endpt_release_pool(tp->base.endpt, tp->base.pool);
415
416 return PJ_SUCCESS;
417}
418
419
420/*
421 * udp_shutdown()
422 *
423 * Start graceful UDP shutdown.
424 */
425static pj_status_t udp_shutdown(pjsip_transport *transport)
426{
427 return pjsip_transport_dec_ref(transport);
428}
429
430
431/* Create socket */
432static pj_status_t create_socket(int af, const pj_sockaddr_t *local_a,
433 int addr_len, pj_sock_t *p_sock)
434{
435 pj_sock_t sock;
436 pj_sockaddr_in tmp_addr;
437 pj_sockaddr_in6 tmp_addr6;
438 pj_status_t status;
439
440 status = pj_sock_socket(af, pj_SOCK_DGRAM(), 0, &sock);
441 if (status != PJ_SUCCESS)
442 return status;
443
444 if (local_a == NULL) {
445 if (af == pj_AF_INET6()) {
446 pj_bzero(&tmp_addr6, sizeof(tmp_addr6));
447 tmp_addr6.sin6_family = (pj_uint16_t)af;
448 local_a = &tmp_addr6;
449 addr_len = sizeof(tmp_addr6);
450 } else {
451 pj_sockaddr_in_init(&tmp_addr, NULL, 0);
452 local_a = &tmp_addr;
453 addr_len = sizeof(tmp_addr);
454 }
455 }
456
457 status = pj_sock_bind(sock, local_a, addr_len);
458 if (status != PJ_SUCCESS) {
459 pj_sock_close(sock);
460 return status;
461 }
462
463 *p_sock = sock;
464 return PJ_SUCCESS;
465}
466
467
468/* Generate transport's published address */
469static pj_status_t get_published_name(pj_sock_t sock,
470 char hostbuf[],
471 int hostbufsz,
472 pjsip_host_port *bound_name)
473{
474 pj_sockaddr tmp_addr;
475 int addr_len;
476 pj_status_t status;
477
478 addr_len = sizeof(tmp_addr);
479 status = pj_sock_getsockname(sock, &tmp_addr, &addr_len);
480 if (status != PJ_SUCCESS)
481 return status;
482
483 bound_name->host.ptr = hostbuf;
484 if (tmp_addr.addr.sa_family == pj_AF_INET()) {
485 bound_name->port = pj_ntohs(tmp_addr.ipv4.sin_port);
486
487 /* If bound address specifies "0.0.0.0", get the IP address
488 * of local hostname.
489 */
490 if (tmp_addr.ipv4.sin_addr.s_addr == PJ_INADDR_ANY) {
491 pj_sockaddr hostip;
492
493 status = pj_gethostip(pj_AF_INET(), &hostip);
494 if (status != PJ_SUCCESS)
495 return status;
496
497 pj_strcpy2(&bound_name->host, pj_inet_ntoa(hostip.ipv4.sin_addr));
498 } else {
499 /* Otherwise use bound address. */
500 pj_strcpy2(&bound_name->host,
501 pj_inet_ntoa(tmp_addr.ipv4.sin_addr));
502 status = PJ_SUCCESS;
503 }
504
505 } else {
506 /* If bound address specifies "INADDR_ANY" (IPv6), get the
507 * IP address of local hostname
508 */
509 pj_uint32_t loop6[4] = { 0, 0, 0, 0};
510
511 bound_name->port = pj_ntohs(tmp_addr.ipv6.sin6_port);
512
513 if (pj_memcmp(&tmp_addr.ipv6.sin6_addr, loop6, sizeof(loop6))==0) {
514 status = pj_gethostip(tmp_addr.addr.sa_family, &tmp_addr);
515 if (status != PJ_SUCCESS)
516 return status;
517 }
518
519 status = pj_inet_ntop(tmp_addr.addr.sa_family,
520 pj_sockaddr_get_addr(&tmp_addr),
521 hostbuf, hostbufsz);
522 if (status == PJ_SUCCESS) {
523 bound_name->host.slen = pj_ansi_strlen(hostbuf);
524 }
525 }
526
527
528 return status;
529}
530
531/* Set the published address of the transport */
532static void udp_set_pub_name(struct udp_transport *tp,
533 const pjsip_host_port *a_name)
534{
535 enum { INFO_LEN = 80 };
536 char local_addr[PJ_INET6_ADDRSTRLEN+10];
537
538 pj_assert(a_name->host.slen != 0);
539 pj_strdup_with_null(tp->base.pool, &tp->base.local_name.host,
540 &a_name->host);
541 tp->base.local_name.port = a_name->port;
542
543 /* Update transport info. */
544 if (tp->base.info == NULL) {
545 tp->base.info = (char*) pj_pool_alloc(tp->base.pool, INFO_LEN);
546 }
547
548 pj_sockaddr_print(&tp->base.local_addr, local_addr, sizeof(local_addr), 3);
549
550 pj_ansi_snprintf(
551 tp->base.info, INFO_LEN, "udp %s [published as %s:%d]",
552 local_addr,
553 tp->base.local_name.host.ptr,
554 tp->base.local_name.port);
555}
556
557/* Set the socket handle of the transport */
558static void udp_set_socket(struct udp_transport *tp,
559 pj_sock_t sock,
560 const pjsip_host_port *a_name)
561{
562#if PJSIP_UDP_SO_RCVBUF_SIZE || PJSIP_UDP_SO_SNDBUF_SIZE
563 long sobuf_size;
564 pj_status_t status;
565#endif
566
567 /* Adjust socket rcvbuf size */
568#if PJSIP_UDP_SO_RCVBUF_SIZE
569 sobuf_size = PJSIP_UDP_SO_RCVBUF_SIZE;
570 status = pj_sock_setsockopt(sock, pj_SOL_SOCKET(), pj_SO_RCVBUF(),
571 &sobuf_size, sizeof(sobuf_size));
572 if (status != PJ_SUCCESS) {
573 char errmsg[PJ_ERR_MSG_SIZE];
574 pj_strerror(status, errmsg, sizeof(errmsg));
575 PJ_LOG(4,(THIS_FILE, "Error setting SO_RCVBUF: %s [%d]", errmsg,
576 status));
577 }
578#endif
579
580 /* Adjust socket sndbuf size */
581#if PJSIP_UDP_SO_SNDBUF_SIZE
582 sobuf_size = PJSIP_UDP_SO_SNDBUF_SIZE;
583 status = pj_sock_setsockopt(sock, pj_SOL_SOCKET(), pj_SO_SNDBUF(),
584 &sobuf_size, sizeof(sobuf_size));
585 if (status != PJ_SUCCESS) {
586 char errmsg[PJ_ERR_MSG_SIZE];
587 pj_strerror(status, errmsg, sizeof(errmsg));
588 PJ_LOG(4,(THIS_FILE, "Error setting SO_SNDBUF: %s [%d]", errmsg,
589 status));
590 }
591#endif
592
593 /* Set the socket. */
594 tp->sock = sock;
595
596 /* Init address name (published address) */
597 udp_set_pub_name(tp, a_name);
598}
599
600/* Register socket to ioqueue */
601static pj_status_t register_to_ioqueue(struct udp_transport *tp)
602{
603 pj_ioqueue_t *ioqueue;
604 pj_ioqueue_callback ioqueue_cb;
605
606 /* Ignore if already registered */
607 if (tp->key != NULL)
608 return PJ_SUCCESS;
609
610 /* Register to ioqueue. */
611 ioqueue = pjsip_endpt_get_ioqueue(tp->base.endpt);
612 pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb));
613 ioqueue_cb.on_read_complete = &udp_on_read_complete;
614 ioqueue_cb.on_write_complete = &udp_on_write_complete;
615
616 return pj_ioqueue_register_sock(tp->base.pool, ioqueue, tp->sock, tp,
617 &ioqueue_cb, &tp->key);
618}
619
620/* Start ioqueue asynchronous reading to all rdata */
621static pj_status_t start_async_read(struct udp_transport *tp)
622{
623 pj_ioqueue_t *ioqueue;
624 int i;
625 pj_status_t status;
626
627 ioqueue = pjsip_endpt_get_ioqueue(tp->base.endpt);
628
629 /* Start reading the ioqueue. */
630 for (i=0; i<tp->rdata_cnt; ++i) {
631 pj_ssize_t size;
632
633 size = sizeof(tp->rdata[i]->pkt_info.packet);
634 tp->rdata[i]->pkt_info.src_addr_len = sizeof(tp->rdata[i]->pkt_info.src_addr);
635 status = pj_ioqueue_recvfrom(tp->key,
636 &tp->rdata[i]->tp_info.op_key.op_key,
637 tp->rdata[i]->pkt_info.packet,
638 &size, PJ_IOQUEUE_ALWAYS_ASYNC,
639 &tp->rdata[i]->pkt_info.src_addr,
640 &tp->rdata[i]->pkt_info.src_addr_len);
641 if (status == PJ_SUCCESS) {
642 pj_assert(!"Shouldn't happen because PJ_IOQUEUE_ALWAYS_ASYNC!");
643 udp_on_read_complete(tp->key, &tp->rdata[i]->tp_info.op_key.op_key,
644 size);
645 } else if (status != PJ_EPENDING) {
646 /* Error! */
647 return status;
648 }
649 }
650
651 return PJ_SUCCESS;
652}
653
654
655/*
656 * pjsip_udp_transport_attach()
657 *
658 * Attach UDP socket and start transport.
659 */
660static pj_status_t transport_attach( pjsip_endpoint *endpt,
661 pjsip_transport_type_e type,
662 pj_sock_t sock,
663 const pjsip_host_port *a_name,
664 unsigned async_cnt,
665 pjsip_transport **p_transport)
666{
667 pj_pool_t *pool;
668 struct udp_transport *tp;
669 const char *format, *ipv6_quoteb, *ipv6_quotee;
670 unsigned i;
671 pj_status_t status;
672
673 PJ_ASSERT_RETURN(endpt && sock!=PJ_INVALID_SOCKET && a_name && async_cnt>0,
674 PJ_EINVAL);
675
676 /* Object name. */
677 if (type & PJSIP_TRANSPORT_IPV6) {
678 format = "udpv6%p";
679 ipv6_quoteb = "[";
680 ipv6_quotee = "]";
681 } else {
682 format = "udp%p";
683 ipv6_quoteb = ipv6_quotee = "";
684 }
685
686 /* Create pool. */
687 pool = pjsip_endpt_create_pool(endpt, format, PJSIP_POOL_LEN_TRANSPORT,
688 PJSIP_POOL_INC_TRANSPORT);
689 if (!pool)
690 return PJ_ENOMEM;
691
692 /* Create the UDP transport object. */
693 tp = PJ_POOL_ZALLOC_T(pool, struct udp_transport);
694
695 /* Save pool. */
696 tp->base.pool = pool;
697
698 pj_memcpy(tp->base.obj_name, pool->obj_name, PJ_MAX_OBJ_NAME);
699
700 /* Init reference counter. */
701 status = pj_atomic_create(pool, 0, &tp->base.ref_cnt);
702 if (status != PJ_SUCCESS)
703 goto on_error;
704
705 /* Init lock. */
706 status = pj_lock_create_recursive_mutex(pool, pool->obj_name,
707 &tp->base.lock);
708 if (status != PJ_SUCCESS)
709 goto on_error;
710
711 /* Set type. */
712 tp->base.key.type = type;
713
714 /* Remote address is left zero (except the family) */
715 tp->base.key.rem_addr.addr.sa_family = (pj_uint16_t)
716 ((type & PJSIP_TRANSPORT_IPV6) ? pj_AF_INET6() : pj_AF_INET());
717
718 /* Type name. */
719 tp->base.type_name = "UDP";
720
721 /* Transport flag */
722 tp->base.flag = pjsip_transport_get_flag_from_type(type);
723
724
725 /* Length of addressess. */
726 tp->base.addr_len = sizeof(tp->base.local_addr);
727
728 /* Init local address. */
729 status = pj_sock_getsockname(sock, &tp->base.local_addr,
730 &tp->base.addr_len);
731 if (status != PJ_SUCCESS)
732 goto on_error;
733
734 /* Init remote name. */
735 if (type == PJSIP_TRANSPORT_UDP)
736 tp->base.remote_name.host = pj_str("0.0.0.0");
737 else
738 tp->base.remote_name.host = pj_str("::0");
739 tp->base.remote_name.port = 0;
740
741 /* Init direction */
742 tp->base.dir = PJSIP_TP_DIR_NONE;
743
744 /* Set endpoint. */
745 tp->base.endpt = endpt;
746
747 /* Transport manager and timer will be initialized by tpmgr */
748
749 /* Attach socket and assign name. */
750 udp_set_socket(tp, sock, a_name);
751
752 /* Register to ioqueue */
753 status = register_to_ioqueue(tp);
754 if (status != PJ_SUCCESS)
755 goto on_error;
756
757 /* Set functions. */
758 tp->base.send_msg = &udp_send_msg;
759 tp->base.do_shutdown = &udp_shutdown;
760 tp->base.destroy = &udp_destroy;
761
762 /* This is a permanent transport, so we initialize the ref count
763 * to one so that transport manager don't destroy this transport
764 * when there's no user!
765 */
766 pj_atomic_inc(tp->base.ref_cnt);
767
768 /* Register to transport manager. */
769 tp->base.tpmgr = pjsip_endpt_get_tpmgr(endpt);
770 status = pjsip_transport_register( tp->base.tpmgr, (pjsip_transport*)tp);
771 if (status != PJ_SUCCESS)
772 goto on_error;
773
774
775 /* Create rdata and put it in the array. */
776 tp->rdata_cnt = 0;
777 tp->rdata = (pjsip_rx_data**)
778 pj_pool_calloc(tp->base.pool, async_cnt,
779 sizeof(pjsip_rx_data*));
780 for (i=0; i<async_cnt; ++i) {
781 pj_pool_t *rdata_pool = pjsip_endpt_create_pool(endpt, "rtd%p",
782 PJSIP_POOL_RDATA_LEN,
783 PJSIP_POOL_RDATA_INC);
784 if (!rdata_pool) {
785 pj_atomic_set(tp->base.ref_cnt, 0);
786 pjsip_transport_destroy(&tp->base);
787 return PJ_ENOMEM;
788 }
789
790 init_rdata(tp, i, rdata_pool, NULL);
791 tp->rdata_cnt++;
792 }
793
794 /* Start reading the ioqueue. */
795 status = start_async_read(tp);
796 if (status != PJ_SUCCESS) {
797 pjsip_transport_destroy(&tp->base);
798 return status;
799 }
800
801 /* Done. */
802 if (p_transport)
803 *p_transport = &tp->base;
804
805 PJ_LOG(4,(tp->base.obj_name,
806 "SIP %s started, published address is %s%.*s%s:%d",
807 pjsip_transport_get_type_desc((pjsip_transport_type_e)tp->base.key.type),
808 ipv6_quoteb,
809 (int)tp->base.local_name.host.slen,
810 tp->base.local_name.host.ptr,
811 ipv6_quotee,
812 tp->base.local_name.port));
813
814 return PJ_SUCCESS;
815
816on_error:
817 udp_destroy((pjsip_transport*)tp);
818 return status;
819}
820
821
822PJ_DEF(pj_status_t) pjsip_udp_transport_attach( pjsip_endpoint *endpt,
823 pj_sock_t sock,
824 const pjsip_host_port *a_name,
825 unsigned async_cnt,
826 pjsip_transport **p_transport)
827{
828 return transport_attach(endpt, PJSIP_TRANSPORT_UDP, sock, a_name,
829 async_cnt, p_transport);
830}
831
832PJ_DEF(pj_status_t) pjsip_udp_transport_attach2( pjsip_endpoint *endpt,
833 pjsip_transport_type_e type,
834 pj_sock_t sock,
835 const pjsip_host_port *a_name,
836 unsigned async_cnt,
837 pjsip_transport **p_transport)
838{
839 return transport_attach(endpt, type, sock, a_name,
840 async_cnt, p_transport);
841}
842
843/*
844 * pjsip_udp_transport_start()
845 *
846 * Create a UDP socket in the specified address and start a transport.
847 */
848PJ_DEF(pj_status_t) pjsip_udp_transport_start( pjsip_endpoint *endpt,
849 const pj_sockaddr_in *local_a,
850 const pjsip_host_port *a_name,
851 unsigned async_cnt,
852 pjsip_transport **p_transport)
853{
854 pj_sock_t sock;
855 pj_status_t status;
856 char addr_buf[PJ_INET6_ADDRSTRLEN];
857 pjsip_host_port bound_name;
858
859 PJ_ASSERT_RETURN(endpt && async_cnt, PJ_EINVAL);
860
861 status = create_socket(pj_AF_INET(), local_a, sizeof(pj_sockaddr_in),
862 &sock);
863 if (status != PJ_SUCCESS)
864 return status;
865
866 if (a_name == NULL) {
867 /* Address name is not specified.
868 * Build a name based on bound address.
869 */
870 status = get_published_name(sock, addr_buf, sizeof(addr_buf),
871 &bound_name);
872 if (status != PJ_SUCCESS) {
873 pj_sock_close(sock);
874 return status;
875 }
876
877 a_name = &bound_name;
878 }
879
880 return pjsip_udp_transport_attach( endpt, sock, a_name, async_cnt,
881 p_transport );
882}
883
884
885/*
886 * pjsip_udp_transport_start()
887 *
888 * Create a UDP socket in the specified address and start a transport.
889 */
890PJ_DEF(pj_status_t) pjsip_udp_transport_start6(pjsip_endpoint *endpt,
891 const pj_sockaddr_in6 *local_a,
892 const pjsip_host_port *a_name,
893 unsigned async_cnt,
894 pjsip_transport **p_transport)
895{
896 pj_sock_t sock;
897 pj_status_t status;
898 char addr_buf[PJ_INET6_ADDRSTRLEN];
899 pjsip_host_port bound_name;
900
901 PJ_ASSERT_RETURN(endpt && async_cnt, PJ_EINVAL);
902
903 status = create_socket(pj_AF_INET6(), local_a, sizeof(pj_sockaddr_in6),
904 &sock);
905 if (status != PJ_SUCCESS)
906 return status;
907
908 if (a_name == NULL) {
909 /* Address name is not specified.
910 * Build a name based on bound address.
911 */
912 status = get_published_name(sock, addr_buf, sizeof(addr_buf),
913 &bound_name);
914 if (status != PJ_SUCCESS) {
915 pj_sock_close(sock);
916 return status;
917 }
918
919 a_name = &bound_name;
920 }
921
922 return pjsip_udp_transport_attach2(endpt, PJSIP_TRANSPORT_UDP6,
923 sock, a_name, async_cnt, p_transport);
924}
925
926/*
927 * Retrieve the internal socket handle used by the UDP transport.
928 */
929PJ_DEF(pj_sock_t) pjsip_udp_transport_get_socket(pjsip_transport *transport)
930{
931 struct udp_transport *tp;
932
933 PJ_ASSERT_RETURN(transport != NULL, PJ_INVALID_SOCKET);
934
935 tp = (struct udp_transport*) transport;
936
937 return tp->sock;
938}
939
940
941/*
942 * Temporarily pause or shutdown the transport.
943 */
944PJ_DEF(pj_status_t) pjsip_udp_transport_pause(pjsip_transport *transport,
945 unsigned option)
946{
947 struct udp_transport *tp;
948 unsigned i;
949
950 PJ_ASSERT_RETURN(transport != NULL, PJ_EINVAL);
951
952 /* Flag must be specified */
953 PJ_ASSERT_RETURN((option & 0x03) != 0, PJ_EINVAL);
954
955 tp = (struct udp_transport*) transport;
956
957 /* Transport must not have been paused */
958 PJ_ASSERT_RETURN(tp->is_paused==0, PJ_EINVALIDOP);
959
960 /* Set transport to paused first, so that when the read callback is
961 * called by pj_ioqueue_post_completion() it will not try to
962 * re-register the rdata.
963 */
964 tp->is_paused = PJ_TRUE;
965
966 /* Cancel the ioqueue operation. */
967 for (i=0; i<(unsigned)tp->rdata_cnt; ++i) {
968 pj_ioqueue_post_completion(tp->key,
969 &tp->rdata[i]->tp_info.op_key.op_key, -1);
970 }
971
972 /* Destroy the socket? */
973 if (option & PJSIP_UDP_TRANSPORT_DESTROY_SOCKET) {
974 if (tp->key) {
975 /* This implicitly closes the socket */
976 pj_ioqueue_unregister(tp->key);
977 tp->key = NULL;
978 } else {
979 /* Close socket. */
980 if (tp->sock && tp->sock != PJ_INVALID_SOCKET) {
981 pj_sock_close(tp->sock);
982 tp->sock = PJ_INVALID_SOCKET;
983 }
984 }
985 tp->sock = PJ_INVALID_SOCKET;
986
987 }
988
989 PJ_LOG(4,(tp->base.obj_name, "SIP UDP transport paused"));
990
991 return PJ_SUCCESS;
992}
993
994
995/*
996 * Restart transport.
997 *
998 * If option is KEEP_SOCKET, just re-activate ioqueue operation.
999 *
1000 * If option is DESTROY_SOCKET:
1001 * - if socket is specified, replace.
1002 * - if socket is not specified, create and replace.
1003 */
1004PJ_DEF(pj_status_t) pjsip_udp_transport_restart(pjsip_transport *transport,
1005 unsigned option,
1006 pj_sock_t sock,
1007 const pj_sockaddr_in *local,
1008 const pjsip_host_port *a_name)
1009{
1010 struct udp_transport *tp;
1011 pj_status_t status;
1012
1013 PJ_ASSERT_RETURN(transport != NULL, PJ_EINVAL);
1014 /* Flag must be specified */
1015 PJ_ASSERT_RETURN((option & 0x03) != 0, PJ_EINVAL);
1016
1017 tp = (struct udp_transport*) transport;
1018
1019 if (option & PJSIP_UDP_TRANSPORT_DESTROY_SOCKET) {
1020 char addr_buf[PJ_INET6_ADDRSTRLEN];
1021 pjsip_host_port bound_name;
1022
1023 /* Request to recreate transport */
1024
1025 /* Destroy existing socket, if any. */
1026 if (tp->key) {
1027 /* This implicitly closes the socket */
1028 pj_ioqueue_unregister(tp->key);
1029 tp->key = NULL;
1030 } else {
1031 /* Close socket. */
1032 if (tp->sock && tp->sock != PJ_INVALID_SOCKET) {
1033 pj_sock_close(tp->sock);
1034 tp->sock = PJ_INVALID_SOCKET;
1035 }
1036 }
1037 tp->sock = PJ_INVALID_SOCKET;
1038
1039 /* Create the socket if it's not specified */
1040 if (sock == PJ_INVALID_SOCKET) {
1041 status = create_socket(pj_AF_INET(), local,
1042 sizeof(pj_sockaddr_in), &sock);
1043 if (status != PJ_SUCCESS)
1044 return status;
1045 }
1046
1047 /* If transport published name is not specified, calculate it
1048 * from the bound address.
1049 */
1050 if (a_name == NULL) {
1051 status = get_published_name(sock, addr_buf, sizeof(addr_buf),
1052 &bound_name);
1053 if (status != PJ_SUCCESS) {
1054 pj_sock_close(sock);
1055 return status;
1056 }
1057
1058 a_name = &bound_name;
1059 }
1060
1061 /* Init local address. */
1062 status = pj_sock_getsockname(sock, &tp->base.local_addr,
1063 &tp->base.addr_len);
1064 if (status != PJ_SUCCESS)
1065 return status;
1066
1067 /* Assign the socket and published address to transport. */
1068 udp_set_socket(tp, sock, a_name);
1069
1070 } else {
1071
1072 /* For KEEP_SOCKET, transport must have been paused before */
1073 PJ_ASSERT_RETURN(tp->is_paused, PJ_EINVALIDOP);
1074
1075 /* If address name is specified, update it */
1076 if (a_name != NULL)
1077 udp_set_pub_name(tp, a_name);
1078 }
1079
1080 /* Re-register new or existing socket to ioqueue. */
1081 status = register_to_ioqueue(tp);
1082 if (status != PJ_SUCCESS) {
1083 return status;
1084 }
1085
1086 /* Restart async read operation. */
1087 status = start_async_read(tp);
1088 if (status != PJ_SUCCESS)
1089 return status;
1090
1091 /* Everything has been set up */
1092 tp->is_paused = PJ_FALSE;
1093
1094 PJ_LOG(4,(tp->base.obj_name,
1095 "SIP UDP transport restarted, published address is %.*s:%d",
1096 (int)tp->base.local_name.host.slen,
1097 tp->base.local_name.host.ptr,
1098 tp->base.local_name.port));
1099
1100 return PJ_SUCCESS;
1101}
1102