blob: 223598584014ddf07436fa4a5a553f6e5dc2fc9d [file] [log] [blame]
Tristan Matthews0a329cc2013-07-17 13:20:14 -04001/* $Id$ */
2/*
3 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 */
20#include <pjsip/sip_transport_tcp.h>
21#include <pjsip/sip_endpoint.h>
22#include <pjsip/sip_errno.h>
23#include <pj/compat/socket.h>
24#include <pj/addr_resolv.h>
25#include <pj/activesock.h>
26#include <pj/assert.h>
27#include <pj/lock.h>
28#include <pj/log.h>
29#include <pj/os.h>
30#include <pj/pool.h>
31#include <pj/string.h>
32
33/* Only declare the API if PJ_HAS_TCP is true */
34#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
35
36
/* Name of this source file. */
#define THIS_FILE       "sip_transport_tcp.c"

/* Upper bound applied to the listener's number of parallel accept()s. */
#define MAX_ASYNC_CNT   16

/* Initial size and increment of the listener's memory pool. */
#define POOL_LIS_INIT   512
#define POOL_LIS_INC    512

/* Initial size and increment of each TCP transport's memory pool. */
#define POOL_TP_INIT    512
#define POOL_TP_INC     512

/* Forward declarations; both structures are defined further below. */
struct tcp_listener;
struct tcp_transport;
47
48
49/*
50 * This is the TCP listener, which is a "descendant" of pjsip_tpfactory (the
51 * SIP transport factory).
52 */
53struct tcp_listener
54{
55 pjsip_tpfactory factory;
56 pj_bool_t is_registered;
57 pjsip_endpoint *endpt;
58 pjsip_tpmgr *tpmgr;
59 pj_activesock_t *asock;
60 pj_sockaddr bound_addr;
61 pj_qos_type qos_type;
62 pj_qos_params qos_params;
63};
64
65
66/*
67 * This structure is used to keep delayed transmit operation in a list.
68 * A delayed transmission occurs when application sends tx_data when
69 * the TCP connect/establishment is still in progress. These delayed
70 * transmission will be "flushed" once the socket is connected (either
71 * successfully or with errors).
72 */
73struct delayed_tdata
74{
75 PJ_DECL_LIST_MEMBER(struct delayed_tdata);
76 pjsip_tx_data_op_key *tdata_op_key;
77 pj_time_val timeout;
78};
79
80
81/*
82 * This structure describes the TCP transport, and it's descendant of
83 * pjsip_transport.
84 */
85struct tcp_transport
86{
87 pjsip_transport base;
88 pj_bool_t is_server;
89
90 /* Do not save listener instance in the transport, because
91 * listener might be destroyed during transport's lifetime.
92 * See http://trac.pjsip.org/repos/ticket/491
93 struct tcp_listener *listener;
94 */
95
96 pj_bool_t is_registered;
97 pj_bool_t is_closing;
98 pj_status_t close_reason;
99 pj_sock_t sock;
100 pj_activesock_t *asock;
101 pj_bool_t has_pending_connect;
102
103 /* Keep-alive timer. */
104 pj_timer_entry ka_timer;
105 pj_time_val last_activity;
106 pjsip_tx_data_op_key ka_op_key;
107 pj_str_t ka_pkt;
108
109 /* TCP transport can only have one rdata!
110 * Otherwise chunks of incoming PDU may be received on different
111 * buffer.
112 */
113 pjsip_rx_data rdata;
114
115 /* Pending transmission list. */
116 struct delayed_tdata delayed_list;
117};
118
119
120/****************************************************************************
121 * PROTOTYPES
122 */
123
124/* This callback is called when pending accept() operation completes. */
125static pj_bool_t on_accept_complete(pj_activesock_t *asock,
126 pj_sock_t newsock,
127 const pj_sockaddr_t *src_addr,
128 int src_addr_len);
129
130/* This callback is called by transport manager to destroy listener */
131static pj_status_t lis_destroy(pjsip_tpfactory *factory);
132
133/* This callback is called by transport manager to create transport */
134static pj_status_t lis_create_transport(pjsip_tpfactory *factory,
135 pjsip_tpmgr *mgr,
136 pjsip_endpoint *endpt,
137 const pj_sockaddr *rem_addr,
138 int addr_len,
139 pjsip_transport **transport);
140
141/* Common function to create and initialize transport */
142static pj_status_t tcp_create(struct tcp_listener *listener,
143 pj_pool_t *pool,
144 pj_sock_t sock, pj_bool_t is_server,
145 const pj_sockaddr *local,
146 const pj_sockaddr *remote,
147 struct tcp_transport **p_tcp);
148
149
150static void tcp_perror(const char *sender, const char *title,
151 pj_status_t status)
152{
153 char errmsg[PJ_ERR_MSG_SIZE];
154
155 pj_strerror(status, errmsg, sizeof(errmsg));
156
157 PJ_LOG(1,(sender, "%s: %s [code=%d]", title, errmsg, status));
158}
159
160
161static void sockaddr_to_host_port( pj_pool_t *pool,
162 pjsip_host_port *host_port,
163 const pj_sockaddr *addr )
164{
165 host_port->host.ptr = (char*) pj_pool_alloc(pool, PJ_INET6_ADDRSTRLEN+4);
166 pj_sockaddr_print(addr, host_port->host.ptr, PJ_INET6_ADDRSTRLEN+4, 0);
167 host_port->host.slen = pj_ansi_strlen(host_port->host.ptr);
168 host_port->port = pj_sockaddr_get_port(addr);
169}
170
171
172static void tcp_init_shutdown(struct tcp_transport *tcp, pj_status_t status)
173{
174 pjsip_tp_state_callback state_cb;
175
176 if (tcp->close_reason == PJ_SUCCESS)
177 tcp->close_reason = status;
178
179 if (tcp->base.is_shutdown)
180 return;
181
182 /* Prevent immediate transport destroy by application, as transport
183 * state notification callback may be stacked and transport instance
184 * must remain valid at any point in the callback.
185 */
186 pjsip_transport_add_ref(&tcp->base);
187
188 /* Notify application of transport disconnected state */
189 state_cb = pjsip_tpmgr_get_state_cb(tcp->base.tpmgr);
190 if (state_cb) {
191 pjsip_transport_state_info state_info;
192
193 pj_bzero(&state_info, sizeof(state_info));
194 state_info.status = tcp->close_reason;
195 (*state_cb)(&tcp->base, PJSIP_TP_STATE_DISCONNECTED, &state_info);
196 }
197
198 /* We can not destroy the transport since high level objects may
199 * still keep reference to this transport. So we can only
200 * instruct transport manager to gracefully start the shutdown
201 * procedure for this transport.
202 */
203 pjsip_transport_shutdown(&tcp->base);
204
205 /* Now, it is ok to destroy the transport. */
206 pjsip_transport_dec_ref(&tcp->base);
207}
208
209
210/*
211 * Initialize pjsip_tcp_transport_cfg structure with default values.
212 */
213PJ_DEF(void) pjsip_tcp_transport_cfg_default(pjsip_tcp_transport_cfg *cfg,
214 int af)
215{
216 pj_bzero(cfg, sizeof(*cfg));
217 cfg->af = af;
218 pj_sockaddr_init(cfg->af, &cfg->bind_addr, NULL, 0);
219 cfg->async_cnt = 1;
220 cfg->reuse_addr = PJSIP_TCP_TRANSPORT_REUSEADDR;
221}
222
223
224/****************************************************************************
225 * The TCP listener/transport factory.
226 */
227
228/*
229 * This is the public API to create, initialize, register, and start the
230 * TCP listener.
231 */
232PJ_DEF(pj_status_t) pjsip_tcp_transport_start3(
233 pjsip_endpoint *endpt,
234 const pjsip_tcp_transport_cfg *cfg,
235 pjsip_tpfactory **p_factory
236 )
237{
238 pj_pool_t *pool;
239 pj_sock_t sock = PJ_INVALID_SOCKET;
240 struct tcp_listener *listener;
241 pj_activesock_cfg asock_cfg;
242 pj_activesock_cb listener_cb;
243 pj_sockaddr *listener_addr;
244 int addr_len;
245 pj_status_t status;
246
247 /* Sanity check */
248 PJ_ASSERT_RETURN(endpt && cfg->async_cnt, PJ_EINVAL);
249
250 /* Verify that address given in a_name (if any) is valid */
251 if (cfg->addr_name.host.slen) {
252 pj_sockaddr tmp;
253
254 status = pj_sockaddr_init(cfg->af, &tmp, &cfg->addr_name.host,
255 (pj_uint16_t)cfg->addr_name.port);
256 if (status != PJ_SUCCESS || !pj_sockaddr_has_addr(&tmp) ||
257 (cfg->af==pj_AF_INET() &&
258 tmp.ipv4.sin_addr.s_addr==PJ_INADDR_NONE))
259 {
260 /* Invalid address */
261 return PJ_EINVAL;
262 }
263 }
264
265 pool = pjsip_endpt_create_pool(endpt, "tcplis", POOL_LIS_INIT,
266 POOL_LIS_INC);
267 PJ_ASSERT_RETURN(pool, PJ_ENOMEM);
268
269
270 listener = PJ_POOL_ZALLOC_T(pool, struct tcp_listener);
271 listener->factory.pool = pool;
272 listener->factory.type = cfg->af==pj_AF_INET() ? PJSIP_TRANSPORT_TCP :
273 PJSIP_TRANSPORT_TCP6;
274 listener->factory.type_name = (char*)
275 pjsip_transport_get_type_name(listener->factory.type);
276 listener->factory.flag =
277 pjsip_transport_get_flag_from_type(listener->factory.type);
278 listener->qos_type = cfg->qos_type;
279 pj_memcpy(&listener->qos_params, &cfg->qos_params,
280 sizeof(cfg->qos_params));
281
282 pj_ansi_strcpy(listener->factory.obj_name, "tcplis");
283 if (listener->factory.type==PJSIP_TRANSPORT_TCP6)
284 pj_ansi_strcat(listener->factory.obj_name, "6");
285
286 status = pj_lock_create_recursive_mutex(pool, listener->factory.obj_name,
287 &listener->factory.lock);
288 if (status != PJ_SUCCESS)
289 goto on_error;
290
291
292 /* Create socket */
293 status = pj_sock_socket(cfg->af, pj_SOCK_STREAM(), 0, &sock);
294 if (status != PJ_SUCCESS)
295 goto on_error;
296
297 /* Apply QoS, if specified */
298 status = pj_sock_apply_qos2(sock, cfg->qos_type, &cfg->qos_params,
299 2, listener->factory.obj_name,
300 "SIP TCP listener socket");
301
302 /* Apply SO_REUSEADDR */
303 if (cfg->reuse_addr) {
304 int enabled = 1;
305 status = pj_sock_setsockopt(sock, pj_SOL_SOCKET(), pj_SO_REUSEADDR(),
306 &enabled, sizeof(enabled));
307 if (status != PJ_SUCCESS) {
308 PJ_PERROR(4,(listener->factory.obj_name, status,
309 "Warning: error applying SO_REUSEADDR"));
310 }
311 }
312
313 /* Bind address may be different than factory.local_addr because
314 * factory.local_addr will be resolved below.
315 */
316 pj_sockaddr_cp(&listener->bound_addr, &cfg->bind_addr);
317
318 /* Bind socket */
319 listener_addr = &listener->factory.local_addr;
320 pj_sockaddr_cp(listener_addr, &cfg->bind_addr);
321
322 status = pj_sock_bind(sock, listener_addr,
323 pj_sockaddr_get_len(listener_addr));
324 if (status != PJ_SUCCESS)
325 goto on_error;
326
327 /* Retrieve the bound address */
328 addr_len = pj_sockaddr_get_len(listener_addr);
329 status = pj_sock_getsockname(sock, listener_addr, &addr_len);
330 if (status != PJ_SUCCESS)
331 goto on_error;
332
333 /* If published host/IP is specified, then use that address as the
334 * listener advertised address.
335 */
336 if (cfg->addr_name.host.slen) {
337 /* Copy the address */
338 listener->factory.addr_name = cfg->addr_name;
339 pj_strdup(listener->factory.pool, &listener->factory.addr_name.host,
340 &cfg->addr_name.host);
341 listener->factory.addr_name.port = cfg->addr_name.port;
342
343 } else {
344 /* No published address is given, use the bound address */
345
346 /* If the address returns 0.0.0.0, use the default
347 * interface address as the transport's address.
348 */
349 if (!pj_sockaddr_has_addr(listener_addr)) {
350 pj_sockaddr hostip;
351
352 status = pj_gethostip(listener->bound_addr.addr.sa_family,
353 &hostip);
354 if (status != PJ_SUCCESS)
355 goto on_error;
356
357 pj_sockaddr_copy_addr(listener_addr, &hostip);
358 }
359
360 /* Save the address name */
361 sockaddr_to_host_port(listener->factory.pool,
362 &listener->factory.addr_name,
363 listener_addr);
364 }
365
366 /* If port is zero, get the bound port */
367 if (listener->factory.addr_name.port == 0) {
368 listener->factory.addr_name.port = pj_sockaddr_get_port(listener_addr);
369 }
370
371 pj_ansi_snprintf(listener->factory.obj_name,
372 sizeof(listener->factory.obj_name),
373 "tcplis:%d", listener->factory.addr_name.port);
374
375
376 /* Start listening to the address */
377 status = pj_sock_listen(sock, PJSIP_TCP_TRANSPORT_BACKLOG);
378 if (status != PJ_SUCCESS)
379 goto on_error;
380
381
382 /* Create active socket */
383 pj_activesock_cfg_default(&asock_cfg);
384 if (cfg->async_cnt > MAX_ASYNC_CNT)
385 asock_cfg.async_cnt = MAX_ASYNC_CNT;
386 else
387 asock_cfg.async_cnt = cfg->async_cnt;
388
389 pj_bzero(&listener_cb, sizeof(listener_cb));
390 listener_cb.on_accept_complete = &on_accept_complete;
391 status = pj_activesock_create(pool, sock, pj_SOCK_STREAM(), &asock_cfg,
392 pjsip_endpt_get_ioqueue(endpt),
393 &listener_cb, listener,
394 &listener->asock);
395
396 /* Register to transport manager */
397 listener->endpt = endpt;
398 listener->tpmgr = pjsip_endpt_get_tpmgr(endpt);
399 listener->factory.create_transport = lis_create_transport;
400 listener->factory.destroy = lis_destroy;
401 listener->is_registered = PJ_TRUE;
402 status = pjsip_tpmgr_register_tpfactory(listener->tpmgr,
403 &listener->factory);
404 if (status != PJ_SUCCESS) {
405 listener->is_registered = PJ_FALSE;
406 goto on_error;
407 }
408
409 /* Start pending accept() operations */
410 status = pj_activesock_start_accept(listener->asock, pool);
411 if (status != PJ_SUCCESS)
412 goto on_error;
413
414 PJ_LOG(4,(listener->factory.obj_name,
415 "SIP TCP listener ready for incoming connections at %.*s:%d",
416 (int)listener->factory.addr_name.host.slen,
417 listener->factory.addr_name.host.ptr,
418 listener->factory.addr_name.port));
419
420 /* Return the pointer to user */
421 if (p_factory) *p_factory = &listener->factory;
422
423 return PJ_SUCCESS;
424
425on_error:
426 if (listener->asock==NULL && sock!=PJ_INVALID_SOCKET)
427 pj_sock_close(sock);
428 lis_destroy(&listener->factory);
429 return status;
430}
431
432
433/*
434 * This is the public API to create, initialize, register, and start the
435 * TCP listener.
436 */
437PJ_DEF(pj_status_t) pjsip_tcp_transport_start2(pjsip_endpoint *endpt,
438 const pj_sockaddr_in *local,
439 const pjsip_host_port *a_name,
440 unsigned async_cnt,
441 pjsip_tpfactory **p_factory)
442{
443 pjsip_tcp_transport_cfg cfg;
444
445 pjsip_tcp_transport_cfg_default(&cfg, pj_AF_INET());
446
447 if (local)
448 pj_sockaddr_cp(&cfg.bind_addr, local);
449 else
450 pj_sockaddr_init(cfg.af, &cfg.bind_addr, NULL, 0);
451
452 if (a_name)
453 pj_memcpy(&cfg.addr_name, a_name, sizeof(*a_name));
454
455 if (async_cnt)
456 cfg.async_cnt = async_cnt;
457
458 return pjsip_tcp_transport_start3(endpt, &cfg, p_factory);
459}
460
461
462/*
463 * This is the public API to create, initialize, register, and start the
464 * TCP listener.
465 */
466PJ_DEF(pj_status_t) pjsip_tcp_transport_start( pjsip_endpoint *endpt,
467 const pj_sockaddr_in *local,
468 unsigned async_cnt,
469 pjsip_tpfactory **p_factory)
470{
471 return pjsip_tcp_transport_start2(endpt, local, NULL, async_cnt, p_factory);
472}
473
474
475/* This callback is called by transport manager to destroy listener */
476static pj_status_t lis_destroy(pjsip_tpfactory *factory)
477{
478 struct tcp_listener *listener = (struct tcp_listener *)factory;
479
480 if (listener->is_registered) {
481 pjsip_tpmgr_unregister_tpfactory(listener->tpmgr, &listener->factory);
482 listener->is_registered = PJ_FALSE;
483 }
484
485 if (listener->asock) {
486 pj_activesock_close(listener->asock);
487 listener->asock = NULL;
488 }
489
490 if (listener->factory.lock) {
491 pj_lock_destroy(listener->factory.lock);
492 listener->factory.lock = NULL;
493 }
494
495 if (listener->factory.pool) {
496 pj_pool_t *pool = listener->factory.pool;
497
498 PJ_LOG(4,(listener->factory.obj_name, "SIP TCP listener destroyed"));
499
500 listener->factory.pool = NULL;
501 pj_pool_release(pool);
502 }
503
504 return PJ_SUCCESS;
505}
506
507
508/***************************************************************************/
509/*
510 * TCP Transport
511 */
512
513/*
514 * Prototypes.
515 */
516/* Called by transport manager to send message */
517static pj_status_t tcp_send_msg(pjsip_transport *transport,
518 pjsip_tx_data *tdata,
519 const pj_sockaddr_t *rem_addr,
520 int addr_len,
521 void *token,
522 pjsip_transport_callback callback);
523
524/* Called by transport manager to shutdown */
525static pj_status_t tcp_shutdown(pjsip_transport *transport);
526
527/* Called by transport manager to destroy transport */
528static pj_status_t tcp_destroy_transport(pjsip_transport *transport);
529
530/* Utility to destroy transport */
531static pj_status_t tcp_destroy(pjsip_transport *transport,
532 pj_status_t reason);
533
534/* Callback on incoming data */
535static pj_bool_t on_data_read(pj_activesock_t *asock,
536 void *data,
537 pj_size_t size,
538 pj_status_t status,
539 pj_size_t *remainder);
540
541/* Callback when packet is sent */
542static pj_bool_t on_data_sent(pj_activesock_t *asock,
543 pj_ioqueue_op_key_t *send_key,
544 pj_ssize_t sent);
545
546/* Callback when connect completes */
547static pj_bool_t on_connect_complete(pj_activesock_t *asock,
548 pj_status_t status);
549
550/* TCP keep-alive timer callback */
551static void tcp_keep_alive_timer(pj_timer_heap_t *th, pj_timer_entry *e);
552
553/*
554 * Common function to create TCP transport, called when pending accept() and
555 * pending connect() complete.
556 */
557static pj_status_t tcp_create( struct tcp_listener *listener,
558 pj_pool_t *pool,
559 pj_sock_t sock, pj_bool_t is_server,
560 const pj_sockaddr *local,
561 const pj_sockaddr *remote,
562 struct tcp_transport **p_tcp)
563{
564 struct tcp_transport *tcp;
565 pj_ioqueue_t *ioqueue;
566 pj_activesock_cfg asock_cfg;
567 pj_activesock_cb tcp_callback;
568 const pj_str_t ka_pkt = PJSIP_TCP_KEEP_ALIVE_DATA;
569 char print_addr[PJ_INET6_ADDRSTRLEN+10];
570 pj_status_t status;
571
572
573 PJ_ASSERT_RETURN(sock != PJ_INVALID_SOCKET, PJ_EINVAL);
574
575
576 if (pool == NULL) {
577 pool = pjsip_endpt_create_pool(listener->endpt, "tcp",
578 POOL_TP_INIT, POOL_TP_INC);
579 PJ_ASSERT_RETURN(pool != NULL, PJ_ENOMEM);
580 }
581
582 /*
583 * Create and initialize basic transport structure.
584 */
585 tcp = PJ_POOL_ZALLOC_T(pool, struct tcp_transport);
586 tcp->is_server = is_server;
587 tcp->sock = sock;
588 /*tcp->listener = listener;*/
589 pj_list_init(&tcp->delayed_list);
590 tcp->base.pool = pool;
591
592 pj_ansi_snprintf(tcp->base.obj_name, PJ_MAX_OBJ_NAME,
593 (is_server ? "tcps%p" :"tcpc%p"), tcp);
594
595 status = pj_atomic_create(pool, 0, &tcp->base.ref_cnt);
596 if (status != PJ_SUCCESS) {
597 goto on_error;
598 }
599
600 status = pj_lock_create_recursive_mutex(pool, "tcp", &tcp->base.lock);
601 if (status != PJ_SUCCESS) {
602 goto on_error;
603 }
604
605 tcp->base.key.type = listener->factory.type;
606 pj_sockaddr_cp(&tcp->base.key.rem_addr, remote);
607 tcp->base.type_name = (char*)pjsip_transport_get_type_name(
608 (pjsip_transport_type_e)tcp->base.key.type);
609 tcp->base.flag = pjsip_transport_get_flag_from_type(
610 (pjsip_transport_type_e)tcp->base.key.type);
611
612 tcp->base.info = (char*) pj_pool_alloc(pool, 64);
613 pj_ansi_snprintf(tcp->base.info, 64, "%s to %s",
614 tcp->base.type_name,
615 pj_sockaddr_print(remote, print_addr,
616 sizeof(print_addr), 3));
617
618 tcp->base.addr_len = pj_sockaddr_get_len(remote);
619 pj_sockaddr_cp(&tcp->base.local_addr, local);
620 sockaddr_to_host_port(pool, &tcp->base.local_name, local);
621 sockaddr_to_host_port(pool, &tcp->base.remote_name, remote);
622 tcp->base.dir = is_server? PJSIP_TP_DIR_INCOMING : PJSIP_TP_DIR_OUTGOING;
623
624 tcp->base.endpt = listener->endpt;
625 tcp->base.tpmgr = listener->tpmgr;
626 tcp->base.send_msg = &tcp_send_msg;
627 tcp->base.do_shutdown = &tcp_shutdown;
628 tcp->base.destroy = &tcp_destroy_transport;
629
630 /* Create active socket */
631 pj_activesock_cfg_default(&asock_cfg);
632 asock_cfg.async_cnt = 1;
633
634 pj_bzero(&tcp_callback, sizeof(tcp_callback));
635 tcp_callback.on_data_read = &on_data_read;
636 tcp_callback.on_data_sent = &on_data_sent;
637 tcp_callback.on_connect_complete = &on_connect_complete;
638
639 ioqueue = pjsip_endpt_get_ioqueue(listener->endpt);
640 status = pj_activesock_create(pool, sock, pj_SOCK_STREAM(), &asock_cfg,
641 ioqueue, &tcp_callback, tcp, &tcp->asock);
642 if (status != PJ_SUCCESS) {
643 goto on_error;
644 }
645
646 /* Register transport to transport manager */
647 status = pjsip_transport_register(listener->tpmgr, &tcp->base);
648 if (status != PJ_SUCCESS) {
649 goto on_error;
650 }
651
652 tcp->is_registered = PJ_TRUE;
653
654 /* Initialize keep-alive timer */
655 tcp->ka_timer.user_data = (void*)tcp;
656 tcp->ka_timer.cb = &tcp_keep_alive_timer;
657 pj_ioqueue_op_key_init(&tcp->ka_op_key.key, sizeof(pj_ioqueue_op_key_t));
658 pj_strdup(tcp->base.pool, &tcp->ka_pkt, &ka_pkt);
659
660 /* Done setting up basic transport. */
661 *p_tcp = tcp;
662
663 PJ_LOG(4,(tcp->base.obj_name, "TCP %s transport created",
664 (tcp->is_server ? "server" : "client")));
665
666 return PJ_SUCCESS;
667
668on_error:
669 tcp_destroy(&tcp->base, status);
670 return status;
671}
672
673
674/* Flush all delayed transmision once the socket is connected. */
675static void tcp_flush_pending_tx(struct tcp_transport *tcp)
676{
677 pj_time_val now;
678
679 pj_gettickcount(&now);
680 pj_lock_acquire(tcp->base.lock);
681 while (!pj_list_empty(&tcp->delayed_list)) {
682 struct delayed_tdata *pending_tx;
683 pjsip_tx_data *tdata;
684 pj_ioqueue_op_key_t *op_key;
685 pj_ssize_t size;
686 pj_status_t status;
687
688 pending_tx = tcp->delayed_list.next;
689 pj_list_erase(pending_tx);
690
691 tdata = pending_tx->tdata_op_key->tdata;
692 op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;
693
694 if (pending_tx->timeout.sec > 0 &&
695 PJ_TIME_VAL_GT(now, pending_tx->timeout))
696 {
697 continue;
698 }
699
700 /* send! */
701 size = tdata->buf.cur - tdata->buf.start;
702 status = pj_activesock_send(tcp->asock, op_key, tdata->buf.start,
703 &size, 0);
704 if (status != PJ_EPENDING) {
705 pj_lock_release(tcp->base.lock);
706 on_data_sent(tcp->asock, op_key, size);
707 pj_lock_acquire(tcp->base.lock);
708 }
709
710 }
711 pj_lock_release(tcp->base.lock);
712}
713
714
715/* Called by transport manager to destroy transport */
716static pj_status_t tcp_destroy_transport(pjsip_transport *transport)
717{
718 struct tcp_transport *tcp = (struct tcp_transport*)transport;
719
720 /* Transport would have been unregistered by now since this callback
721 * is called by transport manager.
722 */
723 tcp->is_registered = PJ_FALSE;
724
725 return tcp_destroy(transport, tcp->close_reason);
726}
727
728
729/* Destroy TCP transport */
730static pj_status_t tcp_destroy(pjsip_transport *transport,
731 pj_status_t reason)
732{
733 struct tcp_transport *tcp = (struct tcp_transport*)transport;
734
735 if (tcp->close_reason == 0)
736 tcp->close_reason = reason;
737
738 if (tcp->is_registered) {
739 tcp->is_registered = PJ_FALSE;
740 pjsip_transport_destroy(transport);
741
742 /* pjsip_transport_destroy will recursively call this function
743 * again.
744 */
745 return PJ_SUCCESS;
746 }
747
748 /* Mark transport as closing */
749 tcp->is_closing = PJ_TRUE;
750
751 /* Stop keep-alive timer. */
752 if (tcp->ka_timer.id) {
753 pjsip_endpt_cancel_timer(tcp->base.endpt, &tcp->ka_timer);
754 tcp->ka_timer.id = PJ_FALSE;
755 }
756
757 /* Cancel all delayed transmits */
758 while (!pj_list_empty(&tcp->delayed_list)) {
759 struct delayed_tdata *pending_tx;
760 pj_ioqueue_op_key_t *op_key;
761
762 pending_tx = tcp->delayed_list.next;
763 pj_list_erase(pending_tx);
764
765 op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;
766
767 on_data_sent(tcp->asock, op_key, -reason);
768 }
769
770 if (tcp->rdata.tp_info.pool) {
771 pj_pool_release(tcp->rdata.tp_info.pool);
772 tcp->rdata.tp_info.pool = NULL;
773 }
774
775 if (tcp->asock) {
776 pj_activesock_close(tcp->asock);
777 tcp->asock = NULL;
778 tcp->sock = PJ_INVALID_SOCKET;
779 } else if (tcp->sock != PJ_INVALID_SOCKET) {
780 pj_sock_close(tcp->sock);
781 tcp->sock = PJ_INVALID_SOCKET;
782 }
783
784 if (tcp->base.lock) {
785 pj_lock_destroy(tcp->base.lock);
786 tcp->base.lock = NULL;
787 }
788
789 if (tcp->base.ref_cnt) {
790 pj_atomic_destroy(tcp->base.ref_cnt);
791 tcp->base.ref_cnt = NULL;
792 }
793
794 if (tcp->base.pool) {
795 pj_pool_t *pool;
796
797 if (reason != PJ_SUCCESS) {
798 char errmsg[PJ_ERR_MSG_SIZE];
799
800 pj_strerror(reason, errmsg, sizeof(errmsg));
801 PJ_LOG(4,(tcp->base.obj_name,
802 "TCP transport destroyed with reason %d: %s",
803 reason, errmsg));
804
805 } else {
806
807 PJ_LOG(4,(tcp->base.obj_name,
808 "TCP transport destroyed normally"));
809
810 }
811
812 pool = tcp->base.pool;
813 tcp->base.pool = NULL;
814 pj_pool_release(pool);
815 }
816
817 return PJ_SUCCESS;
818}
819
820
821/*
822 * This utility function creates receive data buffers and start
823 * asynchronous recv() operations from the socket. It is called after
824 * accept() or connect() operation complete.
825 */
826static pj_status_t tcp_start_read(struct tcp_transport *tcp)
827{
828 pj_pool_t *pool;
829 pj_uint32_t size;
830 pj_sockaddr *rem_addr;
831 void *readbuf[1];
832 pj_status_t status;
833
834 /* Init rdata */
835 pool = pjsip_endpt_create_pool(tcp->base.endpt,
836 "rtd%p",
837 PJSIP_POOL_RDATA_LEN,
838 PJSIP_POOL_RDATA_INC);
839 if (!pool) {
840 tcp_perror(tcp->base.obj_name, "Unable to create pool", PJ_ENOMEM);
841 return PJ_ENOMEM;
842 }
843
844 tcp->rdata.tp_info.pool = pool;
845
846 tcp->rdata.tp_info.transport = &tcp->base;
847 tcp->rdata.tp_info.tp_data = tcp;
848 tcp->rdata.tp_info.op_key.rdata = &tcp->rdata;
849 pj_ioqueue_op_key_init(&tcp->rdata.tp_info.op_key.op_key,
850 sizeof(pj_ioqueue_op_key_t));
851
852 tcp->rdata.pkt_info.src_addr = tcp->base.key.rem_addr;
853 tcp->rdata.pkt_info.src_addr_len = sizeof(tcp->rdata.pkt_info.src_addr);
854 rem_addr = &tcp->base.key.rem_addr;
855 pj_sockaddr_print(rem_addr, tcp->rdata.pkt_info.src_name,
856 sizeof(tcp->rdata.pkt_info.src_name), 0);
857 tcp->rdata.pkt_info.src_port = pj_sockaddr_get_port(rem_addr);
858
859 size = sizeof(tcp->rdata.pkt_info.packet);
860 readbuf[0] = tcp->rdata.pkt_info.packet;
861 status = pj_activesock_start_read2(tcp->asock, tcp->base.pool, size,
862 readbuf, 0);
863 if (status != PJ_SUCCESS && status != PJ_EPENDING) {
864 PJ_LOG(4, (tcp->base.obj_name,
865 "pj_activesock_start_read() error, status=%d",
866 status));
867 return status;
868 }
869
870 return PJ_SUCCESS;
871}
872
873
874/* This callback is called by transport manager for the TCP factory
875 * to create outgoing transport to the specified destination.
876 */
877static pj_status_t lis_create_transport(pjsip_tpfactory *factory,
878 pjsip_tpmgr *mgr,
879 pjsip_endpoint *endpt,
880 const pj_sockaddr *rem_addr,
881 int addr_len,
882 pjsip_transport **p_transport)
883{
884 struct tcp_listener *listener;
885 struct tcp_transport *tcp;
886 pj_sock_t sock;
887 pj_sockaddr local_addr;
888 pj_status_t status;
889
890 /* Sanity checks */
891 PJ_ASSERT_RETURN(factory && mgr && endpt && rem_addr &&
892 addr_len && p_transport, PJ_EINVAL);
893
894 /* Check that address is a sockaddr_in or sockaddr_in6*/
895 PJ_ASSERT_RETURN((rem_addr->addr.sa_family == pj_AF_INET() &&
896 addr_len == sizeof(pj_sockaddr_in)) ||
897 (rem_addr->addr.sa_family == pj_AF_INET6() &&
898 addr_len == sizeof(pj_sockaddr_in6)), PJ_EINVAL);
899
900
901 listener = (struct tcp_listener*)factory;
902
903 /* Create socket */
904 status = pj_sock_socket(rem_addr->addr.sa_family, pj_SOCK_STREAM(),
905 0, &sock);
906 if (status != PJ_SUCCESS)
907 return status;
908
909 /* Apply QoS, if specified */
910 status = pj_sock_apply_qos2(sock, listener->qos_type,
911 &listener->qos_params,
912 2, listener->factory.obj_name,
913 "outgoing SIP TCP socket");
914
915 /* Bind to listener's address and any port */
916 pj_bzero(&local_addr, sizeof(local_addr));
917 pj_sockaddr_cp(&local_addr, &listener->bound_addr);
918 pj_sockaddr_set_port(&local_addr, 0);
919
920 status = pj_sock_bind(sock, &local_addr,
921 pj_sockaddr_get_len(&local_addr));
922 if (status != PJ_SUCCESS) {
923 pj_sock_close(sock);
924 return status;
925 }
926
927 /* Get the local port */
928 addr_len = sizeof(local_addr);
929 status = pj_sock_getsockname(sock, &local_addr, &addr_len);
930 if (status != PJ_SUCCESS) {
931 pj_sock_close(sock);
932 return status;
933 }
934
935 /* Initially set the address from the listener's address */
936 if (!pj_sockaddr_has_addr(&local_addr)) {
937 pj_sockaddr_copy_addr(&local_addr, &listener->factory.local_addr);
938 }
939
940 /* Create the transport descriptor */
941 status = tcp_create(listener, NULL, sock, PJ_FALSE, &local_addr,
942 rem_addr, &tcp);
943 if (status != PJ_SUCCESS)
944 return status;
945
946
947 /* Start asynchronous connect() operation */
948 tcp->has_pending_connect = PJ_TRUE;
949 status = pj_activesock_start_connect(tcp->asock, tcp->base.pool, rem_addr,
950 addr_len);
951 if (status == PJ_SUCCESS) {
952 on_connect_complete(tcp->asock, PJ_SUCCESS);
953 } else if (status != PJ_EPENDING) {
954 tcp_destroy(&tcp->base, status);
955 return status;
956 }
957
958 if (tcp->has_pending_connect) {
959 /* Update (again) local address, just in case local address currently
960 * set is different now that asynchronous connect() is started.
961 */
962 addr_len = sizeof(local_addr);
963 if (pj_sock_getsockname(sock, &local_addr, &addr_len)==PJ_SUCCESS) {
964 pj_sockaddr *tp_addr = &tcp->base.local_addr;
965
966 /* Some systems (like old Win32 perhaps) may not set local address
967 * properly before socket is fully connected.
968 */
969 if (pj_sockaddr_cmp(tp_addr, &local_addr) &&
970 pj_sockaddr_has_addr(&local_addr) &&
971 pj_sockaddr_get_port(&local_addr) != 0)
972 {
973 pj_sockaddr_cp(tp_addr, &local_addr);
974 sockaddr_to_host_port(tcp->base.pool, &tcp->base.local_name,
975 &local_addr);
976 }
977 }
978
979 PJ_LOG(4,(tcp->base.obj_name,
980 "TCP transport %.*s:%d is connecting to %.*s:%d...",
981 (int)tcp->base.local_name.host.slen,
982 tcp->base.local_name.host.ptr,
983 tcp->base.local_name.port,
984 (int)tcp->base.remote_name.host.slen,
985 tcp->base.remote_name.host.ptr,
986 tcp->base.remote_name.port));
987 }
988
989 /* Done */
990 *p_transport = &tcp->base;
991
992 return PJ_SUCCESS;
993}
994
995
996/*
997 * This callback is called by active socket when pending accept() operation
998 * has completed.
999 */
1000static pj_bool_t on_accept_complete(pj_activesock_t *asock,
1001 pj_sock_t sock,
1002 const pj_sockaddr_t *src_addr,
1003 int src_addr_len)
1004{
1005 struct tcp_listener *listener;
1006 struct tcp_transport *tcp;
1007 char addr[PJ_INET6_ADDRSTRLEN+10];
1008 pjsip_tp_state_callback state_cb;
1009 pj_sockaddr tmp_src_addr;
1010 pj_status_t status;
1011
1012 PJ_UNUSED_ARG(src_addr_len);
1013
1014 listener = (struct tcp_listener*) pj_activesock_get_user_data(asock);
1015
1016 PJ_ASSERT_RETURN(sock != PJ_INVALID_SOCKET, PJ_TRUE);
1017
1018 PJ_LOG(4,(listener->factory.obj_name,
1019 "TCP listener %.*s:%d: got incoming TCP connection "
1020 "from %s, sock=%d",
1021 (int)listener->factory.addr_name.host.slen,
1022 listener->factory.addr_name.host.ptr,
1023 listener->factory.addr_name.port,
1024 pj_sockaddr_print(src_addr, addr, sizeof(addr), 3),
1025 sock));
1026
1027 /* Apply QoS, if specified */
1028 status = pj_sock_apply_qos2(sock, listener->qos_type,
1029 &listener->qos_params,
1030 2, listener->factory.obj_name,
1031 "incoming SIP TCP socket");
1032
1033 /* tcp_create() expect pj_sockaddr, so copy src_addr to temporary var,
1034 * just in case.
1035 */
1036 pj_bzero(&tmp_src_addr, sizeof(tmp_src_addr));
1037 pj_sockaddr_cp(&tmp_src_addr, src_addr);
1038
1039 /*
1040 * Incoming connection!
1041 * Create TCP transport for the new socket.
1042 */
1043 status = tcp_create( listener, NULL, sock, PJ_TRUE,
1044 &listener->factory.local_addr,
1045 &tmp_src_addr, &tcp);
1046 if (status == PJ_SUCCESS) {
1047 status = tcp_start_read(tcp);
1048 if (status != PJ_SUCCESS) {
1049 PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled"));
1050 tcp_destroy(&tcp->base, status);
1051 } else {
1052 /* Start keep-alive timer */
1053 if (PJSIP_TCP_KEEP_ALIVE_INTERVAL) {
1054 pj_time_val delay = {PJSIP_TCP_KEEP_ALIVE_INTERVAL, 0};
1055 pjsip_endpt_schedule_timer(listener->endpt,
1056 &tcp->ka_timer,
1057 &delay);
1058 tcp->ka_timer.id = PJ_TRUE;
1059 pj_gettimeofday(&tcp->last_activity);
1060 }
1061
1062 /* Notify application of transport state accepted */
1063 state_cb = pjsip_tpmgr_get_state_cb(tcp->base.tpmgr);
1064 if (state_cb) {
1065 pjsip_transport_state_info state_info;
1066
1067 pj_bzero(&state_info, sizeof(state_info));
1068 (*state_cb)(&tcp->base, PJSIP_TP_STATE_CONNECTED, &state_info);
1069 }
1070 }
1071 }
1072
1073 return PJ_TRUE;
1074}
1075
1076
1077/*
1078 * Callback from ioqueue when packet is sent.
1079 */
1080static pj_bool_t on_data_sent(pj_activesock_t *asock,
1081 pj_ioqueue_op_key_t *op_key,
1082 pj_ssize_t bytes_sent)
1083{
1084 struct tcp_transport *tcp = (struct tcp_transport*)
1085 pj_activesock_get_user_data(asock);
1086 pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key;
1087
1088 /* Note that op_key may be the op_key from keep-alive, thus
1089 * it will not have tdata etc.
1090 */
1091
1092 tdata_op_key->tdata = NULL;
1093
1094 if (tdata_op_key->callback) {
1095 /*
1096 * Notify sip_transport.c that packet has been sent.
1097 */
1098 if (bytes_sent == 0)
1099 bytes_sent = -PJ_RETURN_OS_ERROR(OSERR_ENOTCONN);
1100
1101 tdata_op_key->callback(&tcp->base, tdata_op_key->token, bytes_sent);
1102
1103 /* Mark last activity time */
1104 pj_gettimeofday(&tcp->last_activity);
1105
1106 }
1107
1108 /* Check for error/closure */
1109 if (bytes_sent <= 0) {
1110 pj_status_t status;
1111
1112 PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d",
1113 bytes_sent));
1114
1115 status = (bytes_sent == 0) ? PJ_RETURN_OS_ERROR(OSERR_ENOTCONN) :
1116 (pj_status_t)-bytes_sent;
1117
1118 tcp_init_shutdown(tcp, status);
1119
1120 return PJ_FALSE;
1121 }
1122
1123 return PJ_TRUE;
1124}
1125
1126
1127/*
1128 * This callback is called by transport manager to send SIP message
1129 */
1130static pj_status_t tcp_send_msg(pjsip_transport *transport,
1131 pjsip_tx_data *tdata,
1132 const pj_sockaddr_t *rem_addr,
1133 int addr_len,
1134 void *token,
1135 pjsip_transport_callback callback)
1136{
1137 struct tcp_transport *tcp = (struct tcp_transport*)transport;
1138 pj_ssize_t size;
1139 pj_bool_t delayed = PJ_FALSE;
1140 pj_status_t status = PJ_SUCCESS;
1141
1142 /* Sanity check */
1143 PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL);
1144
1145 /* Check that there's no pending operation associated with the tdata */
1146 PJ_ASSERT_RETURN(tdata->op_key.tdata == NULL, PJSIP_EPENDINGTX);
1147
1148 /* Check the address is supported */
1149 PJ_ASSERT_RETURN(rem_addr && (addr_len==sizeof(pj_sockaddr_in) ||
1150 addr_len==sizeof(pj_sockaddr_in6)),
1151 PJ_EINVAL);
1152
1153 /* Init op key. */
1154 tdata->op_key.tdata = tdata;
1155 tdata->op_key.token = token;
1156 tdata->op_key.callback = callback;
1157
1158 /* If asynchronous connect() has not completed yet, just put the
1159 * transmit data in the pending transmission list since we can not
1160 * use the socket yet.
1161 */
1162 if (tcp->has_pending_connect) {
1163
1164 /*
1165 * Looks like connect() is still in progress. Check again (this time
1166 * with holding the lock) to be sure.
1167 */
1168 pj_lock_acquire(tcp->base.lock);
1169
1170 if (tcp->has_pending_connect) {
1171 struct delayed_tdata *delayed_tdata;
1172
1173 /*
1174 * connect() is still in progress. Put the transmit data to
1175 * the delayed list.
1176 * Starting from #1583 (https://trac.pjsip.org/repos/ticket/1583),
1177 * we also add timeout value for the transmit data. When the
1178 * connect() is completed, the timeout value will be checked to
1179 * determine whether the transmit data needs to be sent.
1180 */
1181 delayed_tdata = PJ_POOL_ZALLOC_T(tdata->pool,
1182 struct delayed_tdata);
1183 delayed_tdata->tdata_op_key = &tdata->op_key;
1184 if (tdata->msg && tdata->msg->type == PJSIP_REQUEST_MSG) {
1185 pj_gettickcount(&delayed_tdata->timeout);
1186 delayed_tdata->timeout.msec += pjsip_cfg()->tsx.td;
1187 pj_time_val_normalize(&delayed_tdata->timeout);
1188 }
1189
1190 pj_list_push_back(&tcp->delayed_list, delayed_tdata);
1191 status = PJ_EPENDING;
1192
1193 /* Prevent pj_ioqueue_send() to be called below */
1194 delayed = PJ_TRUE;
1195 }
1196
1197 pj_lock_release(tcp->base.lock);
1198 }
1199
1200 if (!delayed) {
1201 /*
1202 * Transport is ready to go. Send the packet to ioqueue to be
1203 * sent asynchronously.
1204 */
1205 size = tdata->buf.cur - tdata->buf.start;
1206 status = pj_activesock_send(tcp->asock,
1207 (pj_ioqueue_op_key_t*)&tdata->op_key,
1208 tdata->buf.start, &size, 0);
1209
1210 if (status != PJ_EPENDING) {
1211 /* Not pending (could be immediate success or error) */
1212 tdata->op_key.tdata = NULL;
1213
1214 /* Shutdown transport on closure/errors */
1215 if (size <= 0) {
1216
1217 PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d",
1218 size));
1219
1220 if (status == PJ_SUCCESS)
1221 status = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN);
1222
1223 tcp_init_shutdown(tcp, status);
1224 }
1225 }
1226 }
1227
1228 return status;
1229}
1230
1231
1232/*
1233 * This callback is called by transport manager to shutdown transport.
1234 */
1235static pj_status_t tcp_shutdown(pjsip_transport *transport)
1236{
1237 struct tcp_transport *tcp = (struct tcp_transport*)transport;
1238
1239 /* Stop keep-alive timer. */
1240 if (tcp->ka_timer.id) {
1241 pjsip_endpt_cancel_timer(tcp->base.endpt, &tcp->ka_timer);
1242 tcp->ka_timer.id = PJ_FALSE;
1243 }
1244
1245 return PJ_SUCCESS;
1246}
1247
1248
1249/*
1250 * Callback from ioqueue that an incoming data is received from the socket.
1251 */
1252static pj_bool_t on_data_read(pj_activesock_t *asock,
1253 void *data,
1254 pj_size_t size,
1255 pj_status_t status,
1256 pj_size_t *remainder)
1257{
1258 enum { MAX_IMMEDIATE_PACKET = 10 };
1259 struct tcp_transport *tcp;
1260 pjsip_rx_data *rdata;
1261
1262 PJ_UNUSED_ARG(data);
1263
1264 tcp = (struct tcp_transport*) pj_activesock_get_user_data(asock);
1265 rdata = &tcp->rdata;
1266
1267 /* Don't do anything if transport is closing. */
1268 if (tcp->is_closing) {
1269 tcp->is_closing++;
1270 return PJ_FALSE;
1271 }
1272
1273 /* Houston, we have packet! Report the packet to transport manager
1274 * to be parsed.
1275 */
1276 if (status == PJ_SUCCESS) {
1277 pj_size_t size_eaten;
1278
1279 /* Mark this as an activity */
1280 pj_gettimeofday(&tcp->last_activity);
1281
1282 pj_assert((void*)rdata->pkt_info.packet == data);
1283
1284 /* Init pkt_info part. */
1285 rdata->pkt_info.len = size;
1286 rdata->pkt_info.zero = 0;
1287 pj_gettimeofday(&rdata->pkt_info.timestamp);
1288
1289 /* Report to transport manager.
1290 * The transport manager will tell us how many bytes of the packet
1291 * have been processed (as valid SIP message).
1292 */
1293 size_eaten =
1294 pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr,
1295 rdata);
1296
1297 pj_assert(size_eaten <= (pj_size_t)rdata->pkt_info.len);
1298
1299 /* Move unprocessed data to the front of the buffer */
1300 *remainder = size - size_eaten;
1301 if (*remainder > 0 && *remainder != size) {
1302 pj_memmove(rdata->pkt_info.packet,
1303 rdata->pkt_info.packet + size_eaten,
1304 *remainder);
1305 }
1306
1307 } else {
1308
1309 /* Transport is closed */
1310 PJ_LOG(4,(tcp->base.obj_name, "TCP connection closed"));
1311
1312 tcp_init_shutdown(tcp, status);
1313
1314 return PJ_FALSE;
1315
1316 }
1317
1318 /* Reset pool. */
1319 pj_pool_reset(rdata->tp_info.pool);
1320
1321 return PJ_TRUE;
1322}
1323
1324
1325/*
1326 * Callback from ioqueue when asynchronous connect() operation completes.
1327 */
1328static pj_bool_t on_connect_complete(pj_activesock_t *asock,
1329 pj_status_t status)
1330{
1331 struct tcp_transport *tcp;
1332 pj_sockaddr addr;
1333 int addrlen;
1334 pjsip_tp_state_callback state_cb;
1335
1336 tcp = (struct tcp_transport*) pj_activesock_get_user_data(asock);
1337
1338 /* Mark that pending connect() operation has completed. */
1339 tcp->has_pending_connect = PJ_FALSE;
1340
1341 /* Check connect() status */
1342 if (status != PJ_SUCCESS) {
1343
1344 tcp_perror(tcp->base.obj_name, "TCP connect() error", status);
1345
1346 /* Cancel all delayed transmits */
1347 while (!pj_list_empty(&tcp->delayed_list)) {
1348 struct delayed_tdata *pending_tx;
1349 pj_ioqueue_op_key_t *op_key;
1350
1351 pending_tx = tcp->delayed_list.next;
1352 pj_list_erase(pending_tx);
1353
1354 op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;
1355
1356 on_data_sent(tcp->asock, op_key, -status);
1357 }
1358
1359 tcp_init_shutdown(tcp, status);
1360 return PJ_FALSE;
1361 }
1362
1363 PJ_LOG(4,(tcp->base.obj_name,
1364 "TCP transport %.*s:%d is connected to %.*s:%d",
1365 (int)tcp->base.local_name.host.slen,
1366 tcp->base.local_name.host.ptr,
1367 tcp->base.local_name.port,
1368 (int)tcp->base.remote_name.host.slen,
1369 tcp->base.remote_name.host.ptr,
1370 tcp->base.remote_name.port));
1371
1372
1373 /* Update (again) local address, just in case local address currently
1374 * set is different now that the socket is connected (could happen
1375 * on some systems, like old Win32 probably?).
1376 */
1377 addrlen = sizeof(addr);
1378 if (pj_sock_getsockname(tcp->sock, &addr, &addrlen)==PJ_SUCCESS) {
1379 pj_sockaddr *tp_addr = &tcp->base.local_addr;
1380
1381 if (pj_sockaddr_has_addr(&addr) &&
1382 pj_sockaddr_cmp(&addr, tp_addr) != 0)
1383 {
1384 pj_sockaddr_cp(tp_addr, &addr);
1385 sockaddr_to_host_port(tcp->base.pool, &tcp->base.local_name,
1386 tp_addr);
1387 }
1388 }
1389
1390 /* Start pending read */
1391 status = tcp_start_read(tcp);
1392 if (status != PJ_SUCCESS) {
1393 tcp_init_shutdown(tcp, status);
1394 return PJ_FALSE;
1395 }
1396
1397 /* Notify application of transport state connected */
1398 state_cb = pjsip_tpmgr_get_state_cb(tcp->base.tpmgr);
1399 if (state_cb) {
1400 pjsip_transport_state_info state_info;
1401
1402 pj_bzero(&state_info, sizeof(state_info));
1403 (*state_cb)(&tcp->base, PJSIP_TP_STATE_CONNECTED, &state_info);
1404 }
1405
1406 /* Flush all pending send operations */
1407 tcp_flush_pending_tx(tcp);
1408
1409 /* Start keep-alive timer */
1410 if (PJSIP_TCP_KEEP_ALIVE_INTERVAL) {
1411 pj_time_val delay = { PJSIP_TCP_KEEP_ALIVE_INTERVAL, 0 };
1412 pjsip_endpt_schedule_timer(tcp->base.endpt, &tcp->ka_timer,
1413 &delay);
1414 tcp->ka_timer.id = PJ_TRUE;
1415 pj_gettimeofday(&tcp->last_activity);
1416 }
1417
1418 return PJ_TRUE;
1419}
1420
1421/* Transport keep-alive timer callback */
1422static void tcp_keep_alive_timer(pj_timer_heap_t *th, pj_timer_entry *e)
1423{
1424 struct tcp_transport *tcp = (struct tcp_transport*) e->user_data;
1425 pj_time_val delay;
1426 pj_time_val now;
1427 pj_ssize_t size;
1428 pj_status_t status;
1429
1430 PJ_UNUSED_ARG(th);
1431
1432 tcp->ka_timer.id = PJ_TRUE;
1433
1434 pj_gettimeofday(&now);
1435 PJ_TIME_VAL_SUB(now, tcp->last_activity);
1436
1437 if (now.sec > 0 && now.sec < PJSIP_TCP_KEEP_ALIVE_INTERVAL) {
1438 /* There has been activity, so don't send keep-alive */
1439 delay.sec = PJSIP_TCP_KEEP_ALIVE_INTERVAL - now.sec;
1440 delay.msec = 0;
1441
1442 pjsip_endpt_schedule_timer(tcp->base.endpt, &tcp->ka_timer,
1443 &delay);
1444 tcp->ka_timer.id = PJ_TRUE;
1445 return;
1446 }
1447
1448 PJ_LOG(5,(tcp->base.obj_name, "Sending %d byte(s) keep-alive to %.*s:%d",
1449 (int)tcp->ka_pkt.slen, (int)tcp->base.remote_name.host.slen,
1450 tcp->base.remote_name.host.ptr,
1451 tcp->base.remote_name.port));
1452
1453 /* Send the data */
1454 size = tcp->ka_pkt.slen;
1455 status = pj_activesock_send(tcp->asock, &tcp->ka_op_key.key,
1456 tcp->ka_pkt.ptr, &size, 0);
1457
1458 if (status != PJ_SUCCESS && status != PJ_EPENDING) {
1459 tcp_perror(tcp->base.obj_name,
1460 "Error sending keep-alive packet", status);
1461 tcp_init_shutdown(tcp, status);
1462 return;
1463 }
1464
1465 /* Register next keep-alive */
1466 delay.sec = PJSIP_TCP_KEEP_ALIVE_INTERVAL;
1467 delay.msec = 0;
1468
1469 pjsip_endpt_schedule_timer(tcp->base.endpt, &tcp->ka_timer,
1470 &delay);
1471 tcp->ka_timer.id = PJ_TRUE;
1472}
1473
1474
1475#endif /* PJ_HAS_TCP */
1476