/* $Id$ */
/*
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
20#include <pjsip/sip_transport_tcp.h>
21#include <pjsip/sip_endpoint.h>
22#include <pjsip/sip_errno.h>
23#include <pj/compat/socket.h>
24#include <pj/addr_resolv.h>
25#include <pj/activesock.h>
26#include <pj/assert.h>
27#include <pj/lock.h>
28#include <pj/log.h>
29#include <pj/os.h>
30#include <pj/pool.h>
31#include <pj/string.h>
32
33/* Only declare the API if PJ_HAS_TCP is true */
34#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
35
36
/* Name of this source file. */
#define THIS_FILE       "sip_transport_tcp.c"

/* Upper bound for the async_cnt value that the listener passes to its
 * active socket; larger values requested in the configuration are clamped
 * to this limit.
 */
#define MAX_ASYNC_CNT   16

/* Initial size and increment of the listener's memory pool. */
#define POOL_LIS_INIT   512
#define POOL_LIS_INC    512

/* Initial size and increment of each transport's memory pool. */
#define POOL_TP_INIT    512
#define POOL_TP_INC     512

/* Forward declarations of the two objects implemented in this file. */
struct tcp_listener;
struct tcp_transport;
47
48
49/*
50 * This is the TCP listener, which is a "descendant" of pjsip_tpfactory (the
51 * SIP transport factory).
52 */
53struct tcp_listener
54{
55 pjsip_tpfactory factory;
56 pj_bool_t is_registered;
57 pjsip_endpoint *endpt;
58 pjsip_tpmgr *tpmgr;
59 pj_activesock_t *asock;
60 pj_sockaddr bound_addr;
61 pj_qos_type qos_type;
62 pj_qos_params qos_params;
63};
64
65
66/*
67 * This structure is used to keep delayed transmit operation in a list.
68 * A delayed transmission occurs when application sends tx_data when
69 * the TCP connect/establishment is still in progress. These delayed
70 * transmission will be "flushed" once the socket is connected (either
71 * successfully or with errors).
72 */
73struct delayed_tdata
74{
75 PJ_DECL_LIST_MEMBER(struct delayed_tdata);
76 pjsip_tx_data_op_key *tdata_op_key;
77 pj_time_val timeout;
78};
79
80
81/*
82 * This structure describes the TCP transport, and it's descendant of
83 * pjsip_transport.
84 */
85struct tcp_transport
86{
87 pjsip_transport base;
88 pj_bool_t is_server;
89
90 /* Do not save listener instance in the transport, because
91 * listener might be destroyed during transport's lifetime.
92 * See http://trac.pjsip.org/repos/ticket/491
93 struct tcp_listener *listener;
94 */
95
96 pj_bool_t is_registered;
97 pj_bool_t is_closing;
98 pj_status_t close_reason;
99 pj_sock_t sock;
100 pj_activesock_t *asock;
101 pj_bool_t has_pending_connect;
102
103 /* Keep-alive timer. */
104 pj_timer_entry ka_timer;
105 pj_time_val last_activity;
106 pjsip_tx_data_op_key ka_op_key;
107 pj_str_t ka_pkt;
108
109 /* TCP transport can only have one rdata!
110 * Otherwise chunks of incoming PDU may be received on different
111 * buffer.
112 */
113 pjsip_rx_data rdata;
114
115 /* Pending transmission list. */
116 struct delayed_tdata delayed_list;
117};
118
119
120/****************************************************************************
121 * PROTOTYPES
122 */
123
124/* This callback is called when pending accept() operation completes. */
125static pj_bool_t on_accept_complete(pj_activesock_t *asock,
126 pj_sock_t newsock,
127 const pj_sockaddr_t *src_addr,
128 int src_addr_len);
129
130/* This callback is called by transport manager to destroy listener */
131static pj_status_t lis_destroy(pjsip_tpfactory *factory);
132
133/* This callback is called by transport manager to create transport */
134static pj_status_t lis_create_transport(pjsip_tpfactory *factory,
135 pjsip_tpmgr *mgr,
136 pjsip_endpoint *endpt,
137 const pj_sockaddr *rem_addr,
138 int addr_len,
139 pjsip_transport **transport);
140
141/* Common function to create and initialize transport */
142static pj_status_t tcp_create(struct tcp_listener *listener,
143 pj_pool_t *pool,
144 pj_sock_t sock, pj_bool_t is_server,
145 const pj_sockaddr *local,
146 const pj_sockaddr *remote,
147 struct tcp_transport **p_tcp);
148
149
150static void tcp_perror(const char *sender, const char *title,
151 pj_status_t status)
152{
153 char errmsg[PJ_ERR_MSG_SIZE];
154
155 pj_strerror(status, errmsg, sizeof(errmsg));
156
157 PJ_LOG(1,(sender, "%s: %s [code=%d]", title, errmsg, status));
158}
159
160
161static void sockaddr_to_host_port( pj_pool_t *pool,
162 pjsip_host_port *host_port,
163 const pj_sockaddr *addr )
164{
165 host_port->host.ptr = (char*) pj_pool_alloc(pool, PJ_INET6_ADDRSTRLEN+4);
166 pj_sockaddr_print(addr, host_port->host.ptr, PJ_INET6_ADDRSTRLEN+4, 0);
167 host_port->host.slen = pj_ansi_strlen(host_port->host.ptr);
168 host_port->port = pj_sockaddr_get_port(addr);
169}
170
171
172static void tcp_init_shutdown(struct tcp_transport *tcp, pj_status_t status)
173{
174 pjsip_tp_state_callback state_cb;
175
176 if (tcp->close_reason == PJ_SUCCESS)
177 tcp->close_reason = status;
178
179 if (tcp->base.is_shutdown || tcp->base.is_destroying)
180 return;
181
182 /* Prevent immediate transport destroy by application, as transport
183 * state notification callback may be stacked and transport instance
184 * must remain valid at any point in the callback.
185 */
186 pjsip_transport_add_ref(&tcp->base);
187
188 /* Notify application of transport disconnected state */
189 state_cb = pjsip_tpmgr_get_state_cb(tcp->base.tpmgr);
190 if (state_cb) {
191 pjsip_transport_state_info state_info;
192
193 pj_bzero(&state_info, sizeof(state_info));
194 state_info.status = tcp->close_reason;
195 (*state_cb)(&tcp->base, PJSIP_TP_STATE_DISCONNECTED, &state_info);
196 }
197
198 /* check again */
199 if (tcp->base.is_shutdown || tcp->base.is_destroying)
200 return;
201
202 /* We can not destroy the transport since high level objects may
203 * still keep reference to this transport. So we can only
204 * instruct transport manager to gracefully start the shutdown
205 * procedure for this transport.
206 */
207 pjsip_transport_shutdown(&tcp->base);
208
209 /* Now, it is ok to destroy the transport. */
210 pjsip_transport_dec_ref(&tcp->base);
211}
212
213
214/*
215 * Initialize pjsip_tcp_transport_cfg structure with default values.
216 */
217PJ_DEF(void) pjsip_tcp_transport_cfg_default(pjsip_tcp_transport_cfg *cfg,
218 int af)
219{
220 pj_bzero(cfg, sizeof(*cfg));
221 cfg->af = af;
222 pj_sockaddr_init(cfg->af, &cfg->bind_addr, NULL, 0);
223 cfg->async_cnt = 1;
224 cfg->reuse_addr = PJSIP_TCP_TRANSPORT_REUSEADDR;
225}
226
227
228/****************************************************************************
229 * The TCP listener/transport factory.
230 */
231
232/*
233 * This is the public API to create, initialize, register, and start the
234 * TCP listener.
235 */
236PJ_DEF(pj_status_t) pjsip_tcp_transport_start3(
237 pjsip_endpoint *endpt,
238 const pjsip_tcp_transport_cfg *cfg,
239 pjsip_tpfactory **p_factory
240 )
241{
242 pj_pool_t *pool;
243 pj_sock_t sock = PJ_INVALID_SOCKET;
244 struct tcp_listener *listener;
245 pj_activesock_cfg asock_cfg;
246 pj_activesock_cb listener_cb;
247 pj_sockaddr *listener_addr;
248 int addr_len;
249 pj_status_t status;
250
251 /* Sanity check */
252 PJ_ASSERT_RETURN(endpt && cfg->async_cnt, PJ_EINVAL);
253
254 /* Verify that address given in a_name (if any) is valid */
255 if (cfg->addr_name.host.slen) {
256 pj_sockaddr tmp;
257
258 status = pj_sockaddr_init(cfg->af, &tmp, &cfg->addr_name.host,
259 (pj_uint16_t)cfg->addr_name.port);
260 if (status != PJ_SUCCESS || !pj_sockaddr_has_addr(&tmp) ||
261 (cfg->af==pj_AF_INET() &&
262 tmp.ipv4.sin_addr.s_addr==PJ_INADDR_NONE))
263 {
264 /* Invalid address */
265 return PJ_EINVAL;
266 }
267 }
268
269 pool = pjsip_endpt_create_pool(endpt, "tcplis", POOL_LIS_INIT,
270 POOL_LIS_INC);
271 PJ_ASSERT_RETURN(pool, PJ_ENOMEM);
272
273
274 listener = PJ_POOL_ZALLOC_T(pool, struct tcp_listener);
275 listener->factory.pool = pool;
276 listener->factory.type = cfg->af==pj_AF_INET() ? PJSIP_TRANSPORT_TCP :
277 PJSIP_TRANSPORT_TCP6;
278 listener->factory.type_name = (char*)
279 pjsip_transport_get_type_name(listener->factory.type);
280 listener->factory.flag =
281 pjsip_transport_get_flag_from_type(listener->factory.type);
282 listener->qos_type = cfg->qos_type;
283 pj_memcpy(&listener->qos_params, &cfg->qos_params,
284 sizeof(cfg->qos_params));
285
286 pj_ansi_strcpy(listener->factory.obj_name, "tcplis");
287 if (listener->factory.type==PJSIP_TRANSPORT_TCP6)
288 pj_ansi_strcat(listener->factory.obj_name, "6");
289
290 status = pj_lock_create_recursive_mutex(pool, listener->factory.obj_name,
291 &listener->factory.lock);
292 if (status != PJ_SUCCESS)
293 goto on_error;
294
295
296 /* Create socket */
297 status = pj_sock_socket(cfg->af, pj_SOCK_STREAM(), 0, &sock);
298 if (status != PJ_SUCCESS)
299 goto on_error;
300
301 /* Apply QoS, if specified */
302 status = pj_sock_apply_qos2(sock, cfg->qos_type, &cfg->qos_params,
303 2, listener->factory.obj_name,
304 "SIP TCP listener socket");
305
306 /* Apply SO_REUSEADDR */
307 if (cfg->reuse_addr) {
308 int enabled = 1;
309 status = pj_sock_setsockopt(sock, pj_SOL_SOCKET(), pj_SO_REUSEADDR(),
310 &enabled, sizeof(enabled));
311 if (status != PJ_SUCCESS) {
312 PJ_PERROR(4,(listener->factory.obj_name, status,
313 "Warning: error applying SO_REUSEADDR"));
314 }
315 }
316
317 /* Bind address may be different than factory.local_addr because
318 * factory.local_addr will be resolved below.
319 */
320 pj_sockaddr_cp(&listener->bound_addr, &cfg->bind_addr);
321
322 /* Bind socket */
323 listener_addr = &listener->factory.local_addr;
324 pj_sockaddr_cp(listener_addr, &cfg->bind_addr);
325
326 status = pj_sock_bind(sock, listener_addr,
327 pj_sockaddr_get_len(listener_addr));
328 if (status != PJ_SUCCESS)
329 goto on_error;
330
331 /* Retrieve the bound address */
332 addr_len = pj_sockaddr_get_len(listener_addr);
333 status = pj_sock_getsockname(sock, listener_addr, &addr_len);
334 if (status != PJ_SUCCESS)
335 goto on_error;
336
337 /* If published host/IP is specified, then use that address as the
338 * listener advertised address.
339 */
340 if (cfg->addr_name.host.slen) {
341 /* Copy the address */
342 listener->factory.addr_name = cfg->addr_name;
343 pj_strdup(listener->factory.pool, &listener->factory.addr_name.host,
344 &cfg->addr_name.host);
345 listener->factory.addr_name.port = cfg->addr_name.port;
346
347 } else {
348 /* No published address is given, use the bound address */
349
350 /* If the address returns 0.0.0.0, use the default
351 * interface address as the transport's address.
352 */
353 if (!pj_sockaddr_has_addr(listener_addr)) {
354 pj_sockaddr hostip;
355
356 status = pj_gethostip(listener->bound_addr.addr.sa_family,
357 &hostip);
358 if (status != PJ_SUCCESS)
359 goto on_error;
360
361 pj_sockaddr_copy_addr(listener_addr, &hostip);
362 }
363
364 /* Save the address name */
365 sockaddr_to_host_port(listener->factory.pool,
366 &listener->factory.addr_name,
367 listener_addr);
368 }
369
370 /* If port is zero, get the bound port */
371 if (listener->factory.addr_name.port == 0) {
372 listener->factory.addr_name.port = pj_sockaddr_get_port(listener_addr);
373 }
374
375 pj_ansi_snprintf(listener->factory.obj_name,
376 sizeof(listener->factory.obj_name),
377 "tcplis:%d", listener->factory.addr_name.port);
378
379
380 /* Start listening to the address */
381 status = pj_sock_listen(sock, PJSIP_TCP_TRANSPORT_BACKLOG);
382 if (status != PJ_SUCCESS)
383 goto on_error;
384
385
386 /* Create active socket */
387 pj_activesock_cfg_default(&asock_cfg);
388 if (cfg->async_cnt > MAX_ASYNC_CNT)
389 asock_cfg.async_cnt = MAX_ASYNC_CNT;
390 else
391 asock_cfg.async_cnt = cfg->async_cnt;
392
393 pj_bzero(&listener_cb, sizeof(listener_cb));
394 listener_cb.on_accept_complete = &on_accept_complete;
395 status = pj_activesock_create(pool, sock, pj_SOCK_STREAM(), &asock_cfg,
396 pjsip_endpt_get_ioqueue(endpt),
397 &listener_cb, listener,
398 &listener->asock);
399
400 /* Register to transport manager */
401 listener->endpt = endpt;
402 listener->tpmgr = pjsip_endpt_get_tpmgr(endpt);
403 listener->factory.create_transport = lis_create_transport;
404 listener->factory.destroy = lis_destroy;
405 listener->is_registered = PJ_TRUE;
406 status = pjsip_tpmgr_register_tpfactory(listener->tpmgr,
407 &listener->factory);
408 if (status != PJ_SUCCESS) {
409 listener->is_registered = PJ_FALSE;
410 goto on_error;
411 }
412
413 /* Start pending accept() operations */
414 status = pj_activesock_start_accept(listener->asock, pool);
415 if (status != PJ_SUCCESS)
416 goto on_error;
417
418 PJ_LOG(4,(listener->factory.obj_name,
419 "SIP TCP listener ready for incoming connections at %.*s:%d",
420 (int)listener->factory.addr_name.host.slen,
421 listener->factory.addr_name.host.ptr,
422 listener->factory.addr_name.port));
423
424 /* Return the pointer to user */
425 if (p_factory) *p_factory = &listener->factory;
426
427 return PJ_SUCCESS;
428
429on_error:
430 if (listener->asock==NULL && sock!=PJ_INVALID_SOCKET)
431 pj_sock_close(sock);
432 lis_destroy(&listener->factory);
433 return status;
434}
435
436
437/*
438 * This is the public API to create, initialize, register, and start the
439 * TCP listener.
440 */
441PJ_DEF(pj_status_t) pjsip_tcp_transport_start2(pjsip_endpoint *endpt,
442 const pj_sockaddr_in *local,
443 const pjsip_host_port *a_name,
444 unsigned async_cnt,
445 pjsip_tpfactory **p_factory)
446{
447 pjsip_tcp_transport_cfg cfg;
448
449 pjsip_tcp_transport_cfg_default(&cfg, pj_AF_INET());
450
451 if (local)
452 pj_sockaddr_cp(&cfg.bind_addr, local);
453 else
454 pj_sockaddr_init(cfg.af, &cfg.bind_addr, NULL, 0);
455
456 if (a_name)
457 pj_memcpy(&cfg.addr_name, a_name, sizeof(*a_name));
458
459 if (async_cnt)
460 cfg.async_cnt = async_cnt;
461
462 return pjsip_tcp_transport_start3(endpt, &cfg, p_factory);
463}
464
465
466/*
467 * This is the public API to create, initialize, register, and start the
468 * TCP listener.
469 */
470PJ_DEF(pj_status_t) pjsip_tcp_transport_start( pjsip_endpoint *endpt,
471 const pj_sockaddr_in *local,
472 unsigned async_cnt,
473 pjsip_tpfactory **p_factory)
474{
475 return pjsip_tcp_transport_start2(endpt, local, NULL, async_cnt, p_factory);
476}
477
478
479/* This callback is called by transport manager to destroy listener */
480static pj_status_t lis_destroy(pjsip_tpfactory *factory)
481{
482 struct tcp_listener *listener = (struct tcp_listener *)factory;
483
484 if (listener->is_registered) {
485 pjsip_tpmgr_unregister_tpfactory(listener->tpmgr, &listener->factory);
486 listener->is_registered = PJ_FALSE;
487 }
488
489 if (listener->asock) {
490 pj_activesock_close(listener->asock);
491 listener->asock = NULL;
492 }
493
494 if (listener->factory.lock) {
495 pj_lock_destroy(listener->factory.lock);
496 listener->factory.lock = NULL;
497 }
498
499 if (listener->factory.pool) {
500 pj_pool_t *pool = listener->factory.pool;
501
502 PJ_LOG(4,(listener->factory.obj_name, "SIP TCP listener destroyed"));
503
504 listener->factory.pool = NULL;
505 pj_pool_release(pool);
506 }
507
508 return PJ_SUCCESS;
509}
510
511
512/***************************************************************************/
513/*
514 * TCP Transport
515 */
516
517/*
518 * Prototypes.
519 */
520/* Called by transport manager to send message */
521static pj_status_t tcp_send_msg(pjsip_transport *transport,
522 pjsip_tx_data *tdata,
523 const pj_sockaddr_t *rem_addr,
524 int addr_len,
525 void *token,
526 pjsip_transport_callback callback);
527
528/* Called by transport manager to shutdown */
529static pj_status_t tcp_shutdown(pjsip_transport *transport);
530
531/* Called by transport manager to destroy transport */
532static pj_status_t tcp_destroy_transport(pjsip_transport *transport);
533
534/* Utility to destroy transport */
535static pj_status_t tcp_destroy(pjsip_transport *transport,
536 pj_status_t reason);
537
538/* Callback on incoming data */
539static pj_bool_t on_data_read(pj_activesock_t *asock,
540 void *data,
541 pj_size_t size,
542 pj_status_t status,
543 pj_size_t *remainder);
544
545/* Callback when packet is sent */
546static pj_bool_t on_data_sent(pj_activesock_t *asock,
547 pj_ioqueue_op_key_t *send_key,
548 pj_ssize_t sent);
549
550/* Callback when connect completes */
551static pj_bool_t on_connect_complete(pj_activesock_t *asock,
552 pj_status_t status);
553
554/* TCP keep-alive timer callback */
555static void tcp_keep_alive_timer(pj_timer_heap_t *th, pj_timer_entry *e);
556
557/*
558 * Common function to create TCP transport, called when pending accept() and
559 * pending connect() complete.
560 */
561static pj_status_t tcp_create( struct tcp_listener *listener,
562 pj_pool_t *pool,
563 pj_sock_t sock, pj_bool_t is_server,
564 const pj_sockaddr *local,
565 const pj_sockaddr *remote,
566 struct tcp_transport **p_tcp)
567{
568 struct tcp_transport *tcp;
569 pj_ioqueue_t *ioqueue;
570 pj_activesock_cfg asock_cfg;
571 pj_activesock_cb tcp_callback;
572 const pj_str_t ka_pkt = PJSIP_TCP_KEEP_ALIVE_DATA;
573 char print_addr[PJ_INET6_ADDRSTRLEN+10];
574 pj_status_t status;
575
576
577 PJ_ASSERT_RETURN(sock != PJ_INVALID_SOCKET, PJ_EINVAL);
578
579
580 if (pool == NULL) {
581 pool = pjsip_endpt_create_pool(listener->endpt, "tcp",
582 POOL_TP_INIT, POOL_TP_INC);
583 PJ_ASSERT_RETURN(pool != NULL, PJ_ENOMEM);
584 }
585
586 /*
587 * Create and initialize basic transport structure.
588 */
589 tcp = PJ_POOL_ZALLOC_T(pool, struct tcp_transport);
590 tcp->is_server = is_server;
591 tcp->sock = sock;
592 /*tcp->listener = listener;*/
593 pj_list_init(&tcp->delayed_list);
594 tcp->base.pool = pool;
595
596 pj_ansi_snprintf(tcp->base.obj_name, PJ_MAX_OBJ_NAME,
597 (is_server ? "tcps%p" :"tcpc%p"), tcp);
598
599 status = pj_atomic_create(pool, 0, &tcp->base.ref_cnt);
600 if (status != PJ_SUCCESS) {
601 goto on_error;
602 }
603
604 status = pj_lock_create_recursive_mutex(pool, "tcp", &tcp->base.lock);
605 if (status != PJ_SUCCESS) {
606 goto on_error;
607 }
608
609 tcp->base.key.type = listener->factory.type;
610 pj_sockaddr_cp(&tcp->base.key.rem_addr, remote);
611 tcp->base.type_name = (char*)pjsip_transport_get_type_name(
612 (pjsip_transport_type_e)tcp->base.key.type);
613 tcp->base.flag = pjsip_transport_get_flag_from_type(
614 (pjsip_transport_type_e)tcp->base.key.type);
615
616 tcp->base.info = (char*) pj_pool_alloc(pool, 64);
617 pj_ansi_snprintf(tcp->base.info, 64, "%s to %s",
618 tcp->base.type_name,
619 pj_sockaddr_print(remote, print_addr,
620 sizeof(print_addr), 3));
621
622 tcp->base.addr_len = pj_sockaddr_get_len(remote);
623 pj_sockaddr_cp(&tcp->base.local_addr, local);
624 sockaddr_to_host_port(pool, &tcp->base.local_name, local);
625 sockaddr_to_host_port(pool, &tcp->base.remote_name, remote);
626 tcp->base.dir = is_server? PJSIP_TP_DIR_INCOMING : PJSIP_TP_DIR_OUTGOING;
627
628 tcp->base.endpt = listener->endpt;
629 tcp->base.tpmgr = listener->tpmgr;
630 tcp->base.send_msg = &tcp_send_msg;
631 tcp->base.do_shutdown = &tcp_shutdown;
632 tcp->base.destroy = &tcp_destroy_transport;
633
634 /* Create active socket */
635 pj_activesock_cfg_default(&asock_cfg);
636 asock_cfg.async_cnt = 1;
637
638 pj_bzero(&tcp_callback, sizeof(tcp_callback));
639 tcp_callback.on_data_read = &on_data_read;
640 tcp_callback.on_data_sent = &on_data_sent;
641 tcp_callback.on_connect_complete = &on_connect_complete;
642
643 ioqueue = pjsip_endpt_get_ioqueue(listener->endpt);
644 status = pj_activesock_create(pool, sock, pj_SOCK_STREAM(), &asock_cfg,
645 ioqueue, &tcp_callback, tcp, &tcp->asock);
646 if (status != PJ_SUCCESS) {
647 goto on_error;
648 }
649
650 /* Register transport to transport manager */
651 status = pjsip_transport_register(listener->tpmgr, &tcp->base);
652 if (status != PJ_SUCCESS) {
653 goto on_error;
654 }
655
656 tcp->is_registered = PJ_TRUE;
657
658 /* Initialize keep-alive timer */
659 tcp->ka_timer.user_data = (void*)tcp;
660 tcp->ka_timer.cb = &tcp_keep_alive_timer;
661 pj_ioqueue_op_key_init(&tcp->ka_op_key.key, sizeof(pj_ioqueue_op_key_t));
662 pj_strdup(tcp->base.pool, &tcp->ka_pkt, &ka_pkt);
663
664 /* Done setting up basic transport. */
665 *p_tcp = tcp;
666
667 PJ_LOG(4,(tcp->base.obj_name, "TCP %s transport created",
668 (tcp->is_server ? "server" : "client")));
669
670 return PJ_SUCCESS;
671
672on_error:
673 tcp_destroy(&tcp->base, status);
674 return status;
675}
676
677
678/* Flush all delayed transmision once the socket is connected. */
679static void tcp_flush_pending_tx(struct tcp_transport *tcp)
680{
681 pj_time_val now;
682
683 pj_gettickcount(&now);
684 pj_lock_acquire(tcp->base.lock);
685 while (!pj_list_empty(&tcp->delayed_list)) {
686 struct delayed_tdata *pending_tx;
687 pjsip_tx_data *tdata;
688 pj_ioqueue_op_key_t *op_key;
689 pj_ssize_t size;
690 pj_status_t status;
691
692 pending_tx = tcp->delayed_list.next;
693 pj_list_erase(pending_tx);
694
695 tdata = pending_tx->tdata_op_key->tdata;
696 op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;
697
698 if (pending_tx->timeout.sec > 0 &&
699 PJ_TIME_VAL_GT(now, pending_tx->timeout))
700 {
701 continue;
702 }
703
704 /* send! */
705 size = tdata->buf.cur - tdata->buf.start;
706 status = pj_activesock_send(tcp->asock, op_key, tdata->buf.start,
707 &size, 0);
708 if (status != PJ_EPENDING) {
709 pj_lock_release(tcp->base.lock);
710 on_data_sent(tcp->asock, op_key, size);
711 pj_lock_acquire(tcp->base.lock);
712 }
713
714 }
715 pj_lock_release(tcp->base.lock);
716}
717
718
719/* Called by transport manager to destroy transport */
720static pj_status_t tcp_destroy_transport(pjsip_transport *transport)
721{
722 struct tcp_transport *tcp = (struct tcp_transport*)transport;
723
724 /* Transport would have been unregistered by now since this callback
725 * is called by transport manager.
726 */
727 tcp->is_registered = PJ_FALSE;
728
729 return tcp_destroy(transport, tcp->close_reason);
730}
731
732
733/* Destroy TCP transport */
734static pj_status_t tcp_destroy(pjsip_transport *transport,
735 pj_status_t reason)
736{
737 struct tcp_transport *tcp = (struct tcp_transport*)transport;
738
739 if (tcp->close_reason == 0)
740 tcp->close_reason = reason;
741
742 if (tcp->is_registered) {
743 tcp->is_registered = PJ_FALSE;
744 pjsip_transport_destroy(transport);
745
746 /* pjsip_transport_destroy will recursively call this function
747 * again.
748 */
749 return PJ_SUCCESS;
750 }
751
752 /* Mark transport as closing */
753 tcp->is_closing = PJ_TRUE;
754
755 /* Stop keep-alive timer. */
756 if (tcp->ka_timer.id) {
757 pjsip_endpt_cancel_timer(tcp->base.endpt, &tcp->ka_timer);
758 tcp->ka_timer.id = PJ_FALSE;
759 }
760
761 /* Cancel all delayed transmits */
762 while (!pj_list_empty(&tcp->delayed_list)) {
763 struct delayed_tdata *pending_tx;
764 pj_ioqueue_op_key_t *op_key;
765
766 pending_tx = tcp->delayed_list.next;
767 pj_list_erase(pending_tx);
768
769 op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;
770
771 on_data_sent(tcp->asock, op_key, -reason);
772 }
773
774 if (tcp->rdata.tp_info.pool) {
775 pj_pool_release(tcp->rdata.tp_info.pool);
776 tcp->rdata.tp_info.pool = NULL;
777 }
778
779 if (tcp->asock) {
780 pj_activesock_close(tcp->asock);
781 tcp->asock = NULL;
782 tcp->sock = PJ_INVALID_SOCKET;
783 } else if (tcp->sock != PJ_INVALID_SOCKET) {
784 pj_sock_close(tcp->sock);
785 tcp->sock = PJ_INVALID_SOCKET;
786 }
787
788 if (tcp->base.lock) {
789 pj_lock_destroy(tcp->base.lock);
790 tcp->base.lock = NULL;
791 }
792
793 if (tcp->base.ref_cnt) {
794 pj_atomic_destroy(tcp->base.ref_cnt);
795 tcp->base.ref_cnt = NULL;
796 }
797
798 if (tcp->base.pool) {
799 pj_pool_t *pool;
800
801 if (reason != PJ_SUCCESS) {
802 char errmsg[PJ_ERR_MSG_SIZE];
803
804 pj_strerror(reason, errmsg, sizeof(errmsg));
805 PJ_LOG(4,(tcp->base.obj_name,
806 "TCP transport destroyed with reason %d: %s",
807 reason, errmsg));
808
809 } else {
810
811 PJ_LOG(4,(tcp->base.obj_name,
812 "TCP transport destroyed normally"));
813
814 }
815
816 pool = tcp->base.pool;
817 tcp->base.pool = NULL;
818 pj_pool_release(pool);
819 }
820
821 return PJ_SUCCESS;
822}
823
824
825/*
826 * This utility function creates receive data buffers and start
827 * asynchronous recv() operations from the socket. It is called after
828 * accept() or connect() operation complete.
829 */
830static pj_status_t tcp_start_read(struct tcp_transport *tcp)
831{
832 pj_pool_t *pool;
833 pj_uint32_t size;
834 pj_sockaddr *rem_addr;
835 void *readbuf[1];
836 pj_status_t status;
837
838 /* Init rdata */
839 pool = pjsip_endpt_create_pool(tcp->base.endpt,
840 "rtd%p",
841 PJSIP_POOL_RDATA_LEN,
842 PJSIP_POOL_RDATA_INC);
843 if (!pool) {
844 tcp_perror(tcp->base.obj_name, "Unable to create pool", PJ_ENOMEM);
845 return PJ_ENOMEM;
846 }
847
848 tcp->rdata.tp_info.pool = pool;
849
850 tcp->rdata.tp_info.transport = &tcp->base;
851 tcp->rdata.tp_info.tp_data = tcp;
852 tcp->rdata.tp_info.op_key.rdata = &tcp->rdata;
853 pj_ioqueue_op_key_init(&tcp->rdata.tp_info.op_key.op_key,
854 sizeof(pj_ioqueue_op_key_t));
855
856 tcp->rdata.pkt_info.src_addr = tcp->base.key.rem_addr;
857 tcp->rdata.pkt_info.src_addr_len = sizeof(tcp->rdata.pkt_info.src_addr);
858 rem_addr = &tcp->base.key.rem_addr;
859 pj_sockaddr_print(rem_addr, tcp->rdata.pkt_info.src_name,
860 sizeof(tcp->rdata.pkt_info.src_name), 0);
861 tcp->rdata.pkt_info.src_port = pj_sockaddr_get_port(rem_addr);
862
863 size = sizeof(tcp->rdata.pkt_info.packet);
864 readbuf[0] = tcp->rdata.pkt_info.packet;
865 status = pj_activesock_start_read2(tcp->asock, tcp->base.pool, size,
866 readbuf, 0);
867 if (status != PJ_SUCCESS && status != PJ_EPENDING) {
868 PJ_LOG(4, (tcp->base.obj_name,
869 "pj_activesock_start_read() error, status=%d",
870 status));
871 return status;
872 }
873
874 return PJ_SUCCESS;
875}
876
877
878/* This callback is called by transport manager for the TCP factory
879 * to create outgoing transport to the specified destination.
880 */
881static pj_status_t lis_create_transport(pjsip_tpfactory *factory,
882 pjsip_tpmgr *mgr,
883 pjsip_endpoint *endpt,
884 const pj_sockaddr *rem_addr,
885 int addr_len,
886 pjsip_transport **p_transport)
887{
888 struct tcp_listener *listener;
889 struct tcp_transport *tcp;
890 pj_sock_t sock;
891 pj_sockaddr local_addr;
892 pj_status_t status;
893
894 /* Sanity checks */
895 PJ_ASSERT_RETURN(factory && mgr && endpt && rem_addr &&
896 addr_len && p_transport, PJ_EINVAL);
897
898 /* Check that address is a sockaddr_in or sockaddr_in6*/
899 PJ_ASSERT_RETURN((rem_addr->addr.sa_family == pj_AF_INET() &&
900 addr_len == sizeof(pj_sockaddr_in)) ||
901 (rem_addr->addr.sa_family == pj_AF_INET6() &&
902 addr_len == sizeof(pj_sockaddr_in6)), PJ_EINVAL);
903
904
905 listener = (struct tcp_listener*)factory;
906
907 /* Create socket */
908 status = pj_sock_socket(rem_addr->addr.sa_family, pj_SOCK_STREAM(),
909 0, &sock);
910 if (status != PJ_SUCCESS)
911 return status;
912
913 /* Apply QoS, if specified */
914 status = pj_sock_apply_qos2(sock, listener->qos_type,
915 &listener->qos_params,
916 2, listener->factory.obj_name,
917 "outgoing SIP TCP socket");
918
919 /* Bind to listener's address and any port */
920 pj_bzero(&local_addr, sizeof(local_addr));
921 pj_sockaddr_cp(&local_addr, &listener->bound_addr);
922 pj_sockaddr_set_port(&local_addr, 0);
923
924 status = pj_sock_bind(sock, &local_addr,
925 pj_sockaddr_get_len(&local_addr));
926 if (status != PJ_SUCCESS) {
927 pj_sock_close(sock);
928 return status;
929 }
930
931 /* Get the local port */
932 addr_len = sizeof(local_addr);
933 status = pj_sock_getsockname(sock, &local_addr, &addr_len);
934 if (status != PJ_SUCCESS) {
935 pj_sock_close(sock);
936 return status;
937 }
938
939 /* Initially set the address from the listener's address */
940 if (!pj_sockaddr_has_addr(&local_addr)) {
941 pj_sockaddr_copy_addr(&local_addr, &listener->factory.local_addr);
942 }
943
944 /* Create the transport descriptor */
945 status = tcp_create(listener, NULL, sock, PJ_FALSE, &local_addr,
946 rem_addr, &tcp);
947 if (status != PJ_SUCCESS)
948 return status;
949
950
951 /* Start asynchronous connect() operation */
952 tcp->has_pending_connect = PJ_TRUE;
953 status = pj_activesock_start_connect(tcp->asock, tcp->base.pool, rem_addr,
954 addr_len);
955 if (status == PJ_SUCCESS) {
956 on_connect_complete(tcp->asock, PJ_SUCCESS);
957 } else if (status != PJ_EPENDING) {
958 tcp_destroy(&tcp->base, status);
959 return status;
960 }
961
962 if (tcp->has_pending_connect) {
963 /* Update (again) local address, just in case local address currently
964 * set is different now that asynchronous connect() is started.
965 */
966 addr_len = sizeof(local_addr);
967 if (pj_sock_getsockname(sock, &local_addr, &addr_len)==PJ_SUCCESS) {
968 pj_sockaddr *tp_addr = &tcp->base.local_addr;
969
970 /* Some systems (like old Win32 perhaps) may not set local address
971 * properly before socket is fully connected.
972 */
973 if (pj_sockaddr_cmp(tp_addr, &local_addr) &&
974 pj_sockaddr_has_addr(&local_addr) &&
975 pj_sockaddr_get_port(&local_addr) != 0)
976 {
977 pj_sockaddr_cp(tp_addr, &local_addr);
978 sockaddr_to_host_port(tcp->base.pool, &tcp->base.local_name,
979 &local_addr);
980 }
981 }
982
983 PJ_LOG(4,(tcp->base.obj_name,
984 "TCP transport %.*s:%d is connecting to %.*s:%d...",
985 (int)tcp->base.local_name.host.slen,
986 tcp->base.local_name.host.ptr,
987 tcp->base.local_name.port,
988 (int)tcp->base.remote_name.host.slen,
989 tcp->base.remote_name.host.ptr,
990 tcp->base.remote_name.port));
991 }
992
993 /* Done */
994 *p_transport = &tcp->base;
995
996 return PJ_SUCCESS;
997}
998
999
1000/*
1001 * This callback is called by active socket when pending accept() operation
1002 * has completed.
1003 */
1004static pj_bool_t on_accept_complete(pj_activesock_t *asock,
1005 pj_sock_t sock,
1006 const pj_sockaddr_t *src_addr,
1007 int src_addr_len)
1008{
1009 struct tcp_listener *listener;
1010 struct tcp_transport *tcp;
1011 char addr[PJ_INET6_ADDRSTRLEN+10];
1012 pjsip_tp_state_callback state_cb;
1013 pj_sockaddr tmp_src_addr;
1014 pj_status_t status;
1015
1016 PJ_UNUSED_ARG(src_addr_len);
1017
1018 listener = (struct tcp_listener*) pj_activesock_get_user_data(asock);
1019
1020 PJ_ASSERT_RETURN(sock != PJ_INVALID_SOCKET, PJ_TRUE);
1021
1022 PJ_LOG(4,(listener->factory.obj_name,
1023 "TCP listener %.*s:%d: got incoming TCP connection "
1024 "from %s, sock=%d",
1025 (int)listener->factory.addr_name.host.slen,
1026 listener->factory.addr_name.host.ptr,
1027 listener->factory.addr_name.port,
1028 pj_sockaddr_print(src_addr, addr, sizeof(addr), 3),
1029 sock));
1030
1031 /* Apply QoS, if specified */
1032 status = pj_sock_apply_qos2(sock, listener->qos_type,
1033 &listener->qos_params,
1034 2, listener->factory.obj_name,
1035 "incoming SIP TCP socket");
1036
1037 /* tcp_create() expect pj_sockaddr, so copy src_addr to temporary var,
1038 * just in case.
1039 */
1040 pj_bzero(&tmp_src_addr, sizeof(tmp_src_addr));
1041 pj_sockaddr_cp(&tmp_src_addr, src_addr);
1042
1043 /*
1044 * Incoming connection!
1045 * Create TCP transport for the new socket.
1046 */
1047 status = tcp_create( listener, NULL, sock, PJ_TRUE,
1048 &listener->factory.local_addr,
1049 &tmp_src_addr, &tcp);
1050 if (status == PJ_SUCCESS) {
1051 status = tcp_start_read(tcp);
1052 if (status != PJ_SUCCESS) {
1053 PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled"));
1054 tcp_destroy(&tcp->base, status);
1055 } else {
1056 /* Start keep-alive timer */
1057 if (PJSIP_TCP_KEEP_ALIVE_INTERVAL) {
1058 pj_time_val delay = {PJSIP_TCP_KEEP_ALIVE_INTERVAL, 0};
1059 pjsip_endpt_schedule_timer(listener->endpt,
1060 &tcp->ka_timer,
1061 &delay);
1062 tcp->ka_timer.id = PJ_TRUE;
1063 pj_gettimeofday(&tcp->last_activity);
1064 }
1065
1066 /* Notify application of transport state accepted */
1067 state_cb = pjsip_tpmgr_get_state_cb(tcp->base.tpmgr);
1068 if (state_cb) {
1069 pjsip_transport_state_info state_info;
1070
1071 pj_bzero(&state_info, sizeof(state_info));
1072 (*state_cb)(&tcp->base, PJSIP_TP_STATE_CONNECTED, &state_info);
1073 }
1074 }
1075 }
1076
1077 return PJ_TRUE;
1078}
1079
1080
1081/*
1082 * Callback from ioqueue when packet is sent.
1083 */
1084static pj_bool_t on_data_sent(pj_activesock_t *asock,
1085 pj_ioqueue_op_key_t *op_key,
1086 pj_ssize_t bytes_sent)
1087{
1088 struct tcp_transport *tcp = (struct tcp_transport*)
1089 pj_activesock_get_user_data(asock);
1090 pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key;
1091
1092 /* Note that op_key may be the op_key from keep-alive, thus
1093 * it will not have tdata etc.
1094 */
1095
1096 tdata_op_key->tdata = NULL;
1097
1098 if (tdata_op_key->callback) {
1099 /*
1100 * Notify sip_transport.c that packet has been sent.
1101 */
1102 if (bytes_sent == 0)
1103 bytes_sent = -PJ_RETURN_OS_ERROR(OSERR_ENOTCONN);
1104
1105 tdata_op_key->callback(&tcp->base, tdata_op_key->token, bytes_sent);
1106
1107 /* Mark last activity time */
1108 pj_gettimeofday(&tcp->last_activity);
1109
1110 }
1111
1112 /* Check for error/closure */
1113 if (bytes_sent <= 0) {
1114 pj_status_t status;
1115
1116 PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d",
1117 bytes_sent));
1118
1119 status = (bytes_sent == 0) ? PJ_RETURN_OS_ERROR(OSERR_ENOTCONN) :
1120 (pj_status_t)-bytes_sent;
1121
1122 tcp_init_shutdown(tcp, status);
1123
1124 return PJ_FALSE;
1125 }
1126
1127 return PJ_TRUE;
1128}
1129
1130
1131/*
1132 * This callback is called by transport manager to send SIP message
1133 */
1134static pj_status_t tcp_send_msg(pjsip_transport *transport,
1135 pjsip_tx_data *tdata,
1136 const pj_sockaddr_t *rem_addr,
1137 int addr_len,
1138 void *token,
1139 pjsip_transport_callback callback)
1140{
1141 struct tcp_transport *tcp = (struct tcp_transport*)transport;
1142 pj_ssize_t size;
1143 pj_bool_t delayed = PJ_FALSE;
1144 pj_status_t status = PJ_SUCCESS;
1145
1146 /* Sanity check */
1147 PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL);
1148
1149 /* Check that there's no pending operation associated with the tdata */
1150 PJ_ASSERT_RETURN(tdata->op_key.tdata == NULL, PJSIP_EPENDINGTX);
1151
1152 /* Check the address is supported */
1153 PJ_ASSERT_RETURN(rem_addr && (addr_len==sizeof(pj_sockaddr_in) ||
1154 addr_len==sizeof(pj_sockaddr_in6)),
1155 PJ_EINVAL);
1156
1157 /* Init op key. */
1158 tdata->op_key.tdata = tdata;
1159 tdata->op_key.token = token;
1160 tdata->op_key.callback = callback;
1161
1162 /* If asynchronous connect() has not completed yet, just put the
1163 * transmit data in the pending transmission list since we can not
1164 * use the socket yet.
1165 */
1166 if (tcp->has_pending_connect) {
1167
1168 /*
1169 * Looks like connect() is still in progress. Check again (this time
1170 * with holding the lock) to be sure.
1171 */
1172 pj_lock_acquire(tcp->base.lock);
1173
1174 if (tcp->has_pending_connect) {
1175 struct delayed_tdata *delayed_tdata;
1176
1177 /*
1178 * connect() is still in progress. Put the transmit data to
1179 * the delayed list.
1180 * Starting from #1583 (https://trac.pjsip.org/repos/ticket/1583),
1181 * we also add timeout value for the transmit data. When the
1182 * connect() is completed, the timeout value will be checked to
1183 * determine whether the transmit data needs to be sent.
1184 */
1185 delayed_tdata = PJ_POOL_ZALLOC_T(tdata->pool,
1186 struct delayed_tdata);
1187 delayed_tdata->tdata_op_key = &tdata->op_key;
1188 if (tdata->msg && tdata->msg->type == PJSIP_REQUEST_MSG) {
1189 pj_gettickcount(&delayed_tdata->timeout);
1190 delayed_tdata->timeout.msec += pjsip_cfg()->tsx.td;
1191 pj_time_val_normalize(&delayed_tdata->timeout);
1192 }
1193
1194 pj_list_push_back(&tcp->delayed_list, delayed_tdata);
1195 status = PJ_EPENDING;
1196
1197 /* Prevent pj_ioqueue_send() to be called below */
1198 delayed = PJ_TRUE;
1199 }
1200
1201 pj_lock_release(tcp->base.lock);
1202 }
1203
1204 if (!delayed) {
1205 /*
1206 * Transport is ready to go. Send the packet to ioqueue to be
1207 * sent asynchronously.
1208 */
1209 size = tdata->buf.cur - tdata->buf.start;
1210 status = pj_activesock_send(tcp->asock,
1211 (pj_ioqueue_op_key_t*)&tdata->op_key,
1212 tdata->buf.start, &size, 0);
1213
1214 if (status != PJ_EPENDING) {
1215 /* Not pending (could be immediate success or error) */
1216 tdata->op_key.tdata = NULL;
1217
1218 /* Shutdown transport on closure/errors */
1219 if (size <= 0) {
1220
1221 PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d",
1222 size));
1223
1224 if (status == PJ_SUCCESS)
1225 status = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN);
1226
1227 tcp_init_shutdown(tcp, status);
1228 }
1229 }
1230 }
1231
1232 return status;
1233}
1234
1235
1236/*
1237 * This callback is called by transport manager to shutdown transport.
1238 */
1239static pj_status_t tcp_shutdown(pjsip_transport *transport)
1240{
1241 struct tcp_transport *tcp = (struct tcp_transport*)transport;
1242
1243 /* Stop keep-alive timer. */
1244 if (tcp->ka_timer.id) {
1245 pjsip_endpt_cancel_timer(tcp->base.endpt, &tcp->ka_timer);
1246 tcp->ka_timer.id = PJ_FALSE;
1247 }
1248
1249 return PJ_SUCCESS;
1250}
1251
1252
1253/*
1254 * Callback from ioqueue that an incoming data is received from the socket.
1255 */
1256static pj_bool_t on_data_read(pj_activesock_t *asock,
1257 void *data,
1258 pj_size_t size,
1259 pj_status_t status,
1260 pj_size_t *remainder)
1261{
1262 enum { MAX_IMMEDIATE_PACKET = 10 };
1263 struct tcp_transport *tcp;
1264 pjsip_rx_data *rdata;
1265
1266 PJ_UNUSED_ARG(data);
1267
1268 tcp = (struct tcp_transport*) pj_activesock_get_user_data(asock);
1269 rdata = &tcp->rdata;
1270
1271 /* Don't do anything if transport is closing. */
1272 if (tcp->is_closing) {
1273 tcp->is_closing++;
1274 return PJ_FALSE;
1275 }
1276
1277 /* Houston, we have packet! Report the packet to transport manager
1278 * to be parsed.
1279 */
1280 if (status == PJ_SUCCESS) {
1281 pj_size_t size_eaten;
1282
1283 /* Mark this as an activity */
1284 pj_gettimeofday(&tcp->last_activity);
1285
1286 pj_assert((void*)rdata->pkt_info.packet == data);
1287
1288 /* Init pkt_info part. */
1289 rdata->pkt_info.len = size;
1290 rdata->pkt_info.zero = 0;
1291 pj_gettimeofday(&rdata->pkt_info.timestamp);
1292
1293 /* Report to transport manager.
1294 * The transport manager will tell us how many bytes of the packet
1295 * have been processed (as valid SIP message).
1296 */
1297 size_eaten =
1298 pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr,
1299 rdata);
1300
1301 pj_assert(size_eaten <= (pj_size_t)rdata->pkt_info.len);
1302
1303 /* Move unprocessed data to the front of the buffer */
1304 *remainder = size - size_eaten;
1305 if (*remainder > 0 && *remainder != size) {
1306 pj_memmove(rdata->pkt_info.packet,
1307 rdata->pkt_info.packet + size_eaten,
1308 *remainder);
1309 }
1310
1311 } else {
1312
1313 /* Transport is closed */
1314 PJ_LOG(4,(tcp->base.obj_name, "TCP connection closed"));
1315
1316 tcp_init_shutdown(tcp, status);
1317
1318 return PJ_FALSE;
1319
1320 }
1321
1322 /* Reset pool. */
1323 pj_pool_reset(rdata->tp_info.pool);
1324
1325 return PJ_TRUE;
1326}
1327
1328
1329/*
1330 * Callback from ioqueue when asynchronous connect() operation completes.
1331 */
1332static pj_bool_t on_connect_complete(pj_activesock_t *asock,
1333 pj_status_t status)
1334{
1335 struct tcp_transport *tcp;
1336 pj_sockaddr addr;
1337 int addrlen;
1338 pjsip_tp_state_callback state_cb;
1339
1340 tcp = (struct tcp_transport*) pj_activesock_get_user_data(asock);
1341
1342 /* Mark that pending connect() operation has completed. */
1343 tcp->has_pending_connect = PJ_FALSE;
1344
1345 /* Check connect() status */
1346 if (status != PJ_SUCCESS) {
1347
1348 tcp_perror(tcp->base.obj_name, "TCP connect() error", status);
1349
1350 /* Cancel all delayed transmits */
1351 while (!pj_list_empty(&tcp->delayed_list)) {
1352 struct delayed_tdata *pending_tx;
1353 pj_ioqueue_op_key_t *op_key;
1354
1355 pending_tx = tcp->delayed_list.next;
1356 pj_list_erase(pending_tx);
1357
1358 op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;
1359
1360 on_data_sent(tcp->asock, op_key, -status);
1361 }
1362
1363 tcp_init_shutdown(tcp, status);
1364 return PJ_FALSE;
1365 }
1366
1367 PJ_LOG(4,(tcp->base.obj_name,
1368 "TCP transport %.*s:%d is connected to %.*s:%d",
1369 (int)tcp->base.local_name.host.slen,
1370 tcp->base.local_name.host.ptr,
1371 tcp->base.local_name.port,
1372 (int)tcp->base.remote_name.host.slen,
1373 tcp->base.remote_name.host.ptr,
1374 tcp->base.remote_name.port));
1375
1376
1377 /* Update (again) local address, just in case local address currently
1378 * set is different now that the socket is connected (could happen
1379 * on some systems, like old Win32 probably?).
1380 */
1381 addrlen = sizeof(addr);
1382 if (pj_sock_getsockname(tcp->sock, &addr, &addrlen)==PJ_SUCCESS) {
1383 pj_sockaddr *tp_addr = &tcp->base.local_addr;
1384
1385 if (pj_sockaddr_has_addr(&addr) &&
1386 pj_sockaddr_cmp(&addr, tp_addr) != 0)
1387 {
1388 pj_sockaddr_cp(tp_addr, &addr);
1389 sockaddr_to_host_port(tcp->base.pool, &tcp->base.local_name,
1390 tp_addr);
1391 }
1392 }
1393
1394 /* Start pending read */
1395 status = tcp_start_read(tcp);
1396 if (status != PJ_SUCCESS) {
1397 tcp_init_shutdown(tcp, status);
1398 return PJ_FALSE;
1399 }
1400
1401 /* Notify application of transport state connected */
1402 state_cb = pjsip_tpmgr_get_state_cb(tcp->base.tpmgr);
1403 if (state_cb) {
1404 pjsip_transport_state_info state_info;
1405
1406 pj_bzero(&state_info, sizeof(state_info));
1407 (*state_cb)(&tcp->base, PJSIP_TP_STATE_CONNECTED, &state_info);
1408 }
1409
1410 /* Flush all pending send operations */
1411 tcp_flush_pending_tx(tcp);
1412
1413 /* Start keep-alive timer */
1414 if (PJSIP_TCP_KEEP_ALIVE_INTERVAL) {
1415 pj_time_val delay = { PJSIP_TCP_KEEP_ALIVE_INTERVAL, 0 };
1416 pjsip_endpt_schedule_timer(tcp->base.endpt, &tcp->ka_timer,
1417 &delay);
1418 tcp->ka_timer.id = PJ_TRUE;
1419 pj_gettimeofday(&tcp->last_activity);
1420 }
1421
1422 return PJ_TRUE;
1423}
1424
1425/* Transport keep-alive timer callback */
1426static void tcp_keep_alive_timer(pj_timer_heap_t *th, pj_timer_entry *e)
1427{
1428 struct tcp_transport *tcp = (struct tcp_transport*) e->user_data;
1429 pj_time_val delay;
1430 pj_time_val now;
1431 pj_ssize_t size;
1432 pj_status_t status;
1433
1434 PJ_UNUSED_ARG(th);
1435
1436 tcp->ka_timer.id = PJ_TRUE;
1437
1438 pj_gettimeofday(&now);
1439 PJ_TIME_VAL_SUB(now, tcp->last_activity);
1440
1441 if (now.sec > 0 && now.sec < PJSIP_TCP_KEEP_ALIVE_INTERVAL) {
1442 /* There has been activity, so don't send keep-alive */
1443 delay.sec = PJSIP_TCP_KEEP_ALIVE_INTERVAL - now.sec;
1444 delay.msec = 0;
1445
1446 pjsip_endpt_schedule_timer(tcp->base.endpt, &tcp->ka_timer,
1447 &delay);
1448 tcp->ka_timer.id = PJ_TRUE;
1449 return;
1450 }
1451
1452 PJ_LOG(5,(tcp->base.obj_name, "Sending %d byte(s) keep-alive to %.*s:%d",
1453 (int)tcp->ka_pkt.slen, (int)tcp->base.remote_name.host.slen,
1454 tcp->base.remote_name.host.ptr,
1455 tcp->base.remote_name.port));
1456
1457 /* Send the data */
1458 size = tcp->ka_pkt.slen;
1459 status = pj_activesock_send(tcp->asock, &tcp->ka_op_key.key,
1460 tcp->ka_pkt.ptr, &size, 0);
1461
1462 if (status != PJ_SUCCESS && status != PJ_EPENDING) {
1463 tcp_perror(tcp->base.obj_name,
1464 "Error sending keep-alive packet", status);
1465 tcp_init_shutdown(tcp, status);
1466 return;
1467 }
1468
1469 /* Register next keep-alive */
1470 delay.sec = PJSIP_TCP_KEEP_ALIVE_INTERVAL;
1471 delay.msec = 0;
1472
1473 pjsip_endpt_schedule_timer(tcp->base.endpt, &tcp->ka_timer,
1474 &delay);
1475 tcp->ka_timer.id = PJ_TRUE;
1476}
1477
1478
1479#endif /* PJ_HAS_TCP */
1480