blob: 796ed471b727ff970ddd40080737569daee95d78 [file] [log] [blame]
Tristan Matthews0a329cc2013-07-17 13:20:14 -04001/* $Id$ */
2/*
3 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 */
20#include "turn.h"
21#include <pj/compat/socket.h>
22
23#if PJ_HAS_TCP
24
25struct accept_op
26{
27 pj_ioqueue_op_key_t op_key;
28 pj_sock_t sock;
29 pj_sockaddr src_addr;
30 int src_addr_len;
31};
32
33struct tcp_listener
34{
35 pj_turn_listener base;
36 pj_ioqueue_key_t *key;
37 unsigned accept_cnt;
38 struct accept_op *accept_op; /* Array of accept_op's */
39};
40
41
42static void lis_on_accept_complete(pj_ioqueue_key_t *key,
43 pj_ioqueue_op_key_t *op_key,
44 pj_sock_t sock,
45 pj_status_t status);
46static pj_status_t lis_destroy(pj_turn_listener *listener);
47static void transport_create(pj_sock_t sock, pj_turn_listener *lis,
48 pj_sockaddr_t *src_addr, int src_addr_len);
49
50static void show_err(const char *sender, const char *title,
51 pj_status_t status)
52{
53 char errmsg[PJ_ERR_MSG_SIZE];
54
55 pj_strerror(status, errmsg, sizeof(errmsg));
56 PJ_LOG(4,(sender, "%s: %s", title, errmsg));
57}
58
59
60/*
61 * Create a new listener on the specified port.
62 */
63PJ_DEF(pj_status_t) pj_turn_listener_create_tcp(pj_turn_srv *srv,
64 int af,
65 const pj_str_t *bound_addr,
66 unsigned port,
67 unsigned concurrency_cnt,
68 unsigned flags,
69 pj_turn_listener **p_listener)
70{
71 pj_pool_t *pool;
72 struct tcp_listener *tcp_lis;
73 pj_ioqueue_callback ioqueue_cb;
74 unsigned i;
75 pj_status_t status;
76
77 /* Create structure */
78 pool = pj_pool_create(srv->core.pf, "tcpl%p", 1000, 1000, NULL);
79 tcp_lis = PJ_POOL_ZALLOC_T(pool, struct tcp_listener);
80 tcp_lis->base.pool = pool;
81 tcp_lis->base.obj_name = pool->obj_name;
82 tcp_lis->base.server = srv;
83 tcp_lis->base.tp_type = PJ_TURN_TP_TCP;
84 tcp_lis->base.sock = PJ_INVALID_SOCKET;
85 //tcp_lis->base.sendto = &tcp_sendto;
86 tcp_lis->base.destroy = &lis_destroy;
87 tcp_lis->accept_cnt = concurrency_cnt;
88 tcp_lis->base.flags = flags;
89
90 /* Create socket */
91 status = pj_sock_socket(af, pj_SOCK_STREAM(), 0, &tcp_lis->base.sock);
92 if (status != PJ_SUCCESS)
93 goto on_error;
94
95 /* Init bind address */
96 status = pj_sockaddr_init(af, &tcp_lis->base.addr, bound_addr,
97 (pj_uint16_t)port);
98 if (status != PJ_SUCCESS)
99 goto on_error;
100
101 /* Create info */
102 pj_ansi_strcpy(tcp_lis->base.info, "TCP:");
103 pj_sockaddr_print(&tcp_lis->base.addr, tcp_lis->base.info+4,
104 sizeof(tcp_lis->base.info)-4, 3);
105
106 /* Bind socket */
107 status = pj_sock_bind(tcp_lis->base.sock, &tcp_lis->base.addr,
108 pj_sockaddr_get_len(&tcp_lis->base.addr));
109 if (status != PJ_SUCCESS)
110 goto on_error;
111
112 /* Listen() */
113 status = pj_sock_listen(tcp_lis->base.sock, 5);
114 if (status != PJ_SUCCESS)
115 goto on_error;
116
117 /* Register to ioqueue */
118 pj_bzero(&ioqueue_cb, sizeof(ioqueue_cb));
119 ioqueue_cb.on_accept_complete = &lis_on_accept_complete;
120 status = pj_ioqueue_register_sock(pool, srv->core.ioqueue, tcp_lis->base.sock,
121 tcp_lis, &ioqueue_cb, &tcp_lis->key);
122
123 /* Create op keys */
124 tcp_lis->accept_op = (struct accept_op*)pj_pool_calloc(pool, concurrency_cnt,
125 sizeof(struct accept_op));
126
127 /* Create each accept_op and kick off read operation */
128 for (i=0; i<concurrency_cnt; ++i) {
129 lis_on_accept_complete(tcp_lis->key, &tcp_lis->accept_op[i].op_key,
130 PJ_INVALID_SOCKET, PJ_EPENDING);
131 }
132
133 /* Done */
134 PJ_LOG(4,(tcp_lis->base.obj_name, "Listener %s created",
135 tcp_lis->base.info));
136
137 *p_listener = &tcp_lis->base;
138 return PJ_SUCCESS;
139
140
141on_error:
142 lis_destroy(&tcp_lis->base);
143 return status;
144}
145
146
147/*
148 * Destroy listener.
149 */
150static pj_status_t lis_destroy(pj_turn_listener *listener)
151{
152 struct tcp_listener *tcp_lis = (struct tcp_listener *)listener;
153 unsigned i;
154
155 if (tcp_lis->key) {
156 pj_ioqueue_unregister(tcp_lis->key);
157 tcp_lis->key = NULL;
158 tcp_lis->base.sock = PJ_INVALID_SOCKET;
159 } else if (tcp_lis->base.sock != PJ_INVALID_SOCKET) {
160 pj_sock_close(tcp_lis->base.sock);
161 tcp_lis->base.sock = PJ_INVALID_SOCKET;
162 }
163
164 for (i=0; i<tcp_lis->accept_cnt; ++i) {
165 /* Nothing to do */
166 }
167
168 if (tcp_lis->base.pool) {
169 pj_pool_t *pool = tcp_lis->base.pool;
170
171 PJ_LOG(4,(tcp_lis->base.obj_name, "Listener %s destroyed",
172 tcp_lis->base.info));
173
174 tcp_lis->base.pool = NULL;
175 pj_pool_release(pool);
176 }
177 return PJ_SUCCESS;
178}
179
180
181/*
182 * Callback on new TCP connection.
183 */
184static void lis_on_accept_complete(pj_ioqueue_key_t *key,
185 pj_ioqueue_op_key_t *op_key,
186 pj_sock_t sock,
187 pj_status_t status)
188{
189 struct tcp_listener *tcp_lis;
190 struct accept_op *accept_op = (struct accept_op*) op_key;
191
192 tcp_lis = (struct tcp_listener*) pj_ioqueue_get_user_data(key);
193
194 PJ_UNUSED_ARG(sock);
195
196 do {
197 /* Report new connection. */
198 if (status == PJ_SUCCESS) {
199 char addr[PJ_INET6_ADDRSTRLEN+8];
200 PJ_LOG(5,(tcp_lis->base.obj_name, "Incoming TCP from %s",
201 pj_sockaddr_print(&accept_op->src_addr, addr,
202 sizeof(addr), 3)));
203 transport_create(accept_op->sock, &tcp_lis->base,
204 &accept_op->src_addr, accept_op->src_addr_len);
205 } else if (status != PJ_EPENDING) {
206 show_err(tcp_lis->base.obj_name, "accept()", status);
207 }
208
209 /* Prepare next accept() */
210 accept_op->src_addr_len = sizeof(accept_op->src_addr);
211 status = pj_ioqueue_accept(key, op_key, &accept_op->sock,
212 NULL,
213 &accept_op->src_addr,
214 &accept_op->src_addr_len);
215
216 } while (status != PJ_EPENDING && status != PJ_ECANCELLED &&
217 status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL));
218}
219
220
221/****************************************************************************/
222/*
223 * Transport
224 */
225enum
226{
227 TIMER_NONE,
228 TIMER_DESTROY
229};
230
231/* The delay in seconds to be applied before TCP transport is destroyed when
232 * no allocation is referencing it. This also means the initial time to wait
233 * after the initial TCP connection establishment to receive a valid STUN
234 * message in the transport.
235 */
236#define SHUTDOWN_DELAY 10
237
238struct recv_op
239{
240 pj_ioqueue_op_key_t op_key;
241 pj_turn_pkt pkt;
242};
243
244struct tcp_transport
245{
246 pj_turn_transport base;
247 pj_pool_t *pool;
248 pj_timer_entry timer;
249
250 pj_turn_allocation *alloc;
251 int ref_cnt;
252
253 pj_sock_t sock;
254 pj_ioqueue_key_t *key;
255 struct recv_op recv_op;
256 pj_ioqueue_op_key_t send_op;
257};
258
259
260static void tcp_on_read_complete(pj_ioqueue_key_t *key,
261 pj_ioqueue_op_key_t *op_key,
262 pj_ssize_t bytes_read);
263
264static pj_status_t tcp_sendto(pj_turn_transport *tp,
265 const void *packet,
266 pj_size_t size,
267 unsigned flag,
268 const pj_sockaddr_t *addr,
269 int addr_len);
270static void tcp_destroy(struct tcp_transport *tcp);
271static void tcp_add_ref(pj_turn_transport *tp,
272 pj_turn_allocation *alloc);
273static void tcp_dec_ref(pj_turn_transport *tp,
274 pj_turn_allocation *alloc);
275static void timer_callback(pj_timer_heap_t *timer_heap,
276 pj_timer_entry *entry);
277
278static void transport_create(pj_sock_t sock, pj_turn_listener *lis,
279 pj_sockaddr_t *src_addr, int src_addr_len)
280{
281 pj_pool_t *pool;
282 struct tcp_transport *tcp;
283 pj_ioqueue_callback cb;
284 pj_status_t status;
285
286 pool = pj_pool_create(lis->server->core.pf, "tcp%p", 1000, 1000, NULL);
287
288 tcp = PJ_POOL_ZALLOC_T(pool, struct tcp_transport);
289 tcp->base.obj_name = pool->obj_name;
290 tcp->base.listener = lis;
291 tcp->base.info = lis->info;
292 tcp->base.sendto = &tcp_sendto;
293 tcp->base.add_ref = &tcp_add_ref;
294 tcp->base.dec_ref = &tcp_dec_ref;
295 tcp->pool = pool;
296 tcp->sock = sock;
297
298 pj_timer_entry_init(&tcp->timer, TIMER_NONE, tcp, &timer_callback);
299
300 /* Register to ioqueue */
301 pj_bzero(&cb, sizeof(cb));
302 cb.on_read_complete = &tcp_on_read_complete;
303 status = pj_ioqueue_register_sock(pool, lis->server->core.ioqueue, sock,
304 tcp, &cb, &tcp->key);
305 if (status != PJ_SUCCESS) {
306 tcp_destroy(tcp);
307 return;
308 }
309
310 /* Init pkt */
311 tcp->recv_op.pkt.pool = pj_pool_create(lis->server->core.pf, "tcpkt%p",
312 1000, 1000, NULL);
313 tcp->recv_op.pkt.transport = &tcp->base;
314 tcp->recv_op.pkt.src.tp_type = PJ_TURN_TP_TCP;
315 tcp->recv_op.pkt.src_addr_len = src_addr_len;
316 pj_memcpy(&tcp->recv_op.pkt.src.clt_addr, src_addr, src_addr_len);
317
318 tcp_on_read_complete(tcp->key, &tcp->recv_op.op_key, -PJ_EPENDING);
319 /* Should not access transport from now, it may have been destroyed */
320}
321
322
323static void tcp_destroy(struct tcp_transport *tcp)
324{
325 if (tcp->key) {
326 pj_ioqueue_unregister(tcp->key);
327 tcp->key = NULL;
328 tcp->sock = 0;
329 } else if (tcp->sock) {
330 pj_sock_close(tcp->sock);
331 tcp->sock = 0;
332 }
333
334 if (tcp->pool) {
335 pj_pool_release(tcp->pool);
336 }
337}
338
339
340static void timer_callback(pj_timer_heap_t *timer_heap,
341 pj_timer_entry *entry)
342{
343 struct tcp_transport *tcp = (struct tcp_transport*) entry->user_data;
344
345 PJ_UNUSED_ARG(timer_heap);
346
347 tcp_destroy(tcp);
348}
349
350
351static void tcp_on_read_complete(pj_ioqueue_key_t *key,
352 pj_ioqueue_op_key_t *op_key,
353 pj_ssize_t bytes_read)
354{
355 struct tcp_transport *tcp;
356 struct recv_op *recv_op = (struct recv_op*) op_key;
357 pj_status_t status;
358
359 tcp = (struct tcp_transport*) pj_ioqueue_get_user_data(key);
360
361 do {
362 /* Report to server or allocation, if we have allocation */
363 if (bytes_read > 0) {
364
365 recv_op->pkt.len = bytes_read;
366 pj_gettimeofday(&recv_op->pkt.rx_time);
367
368 tcp_add_ref(&tcp->base, NULL);
369
370 if (tcp->alloc) {
371 pj_turn_allocation_on_rx_client_pkt(tcp->alloc, &recv_op->pkt);
372 } else {
373 pj_turn_srv_on_rx_pkt(tcp->base.listener->server, &recv_op->pkt);
374 }
375
376 pj_assert(tcp->ref_cnt > 0);
377 tcp_dec_ref(&tcp->base, NULL);
378
379 } else if (bytes_read != -PJ_EPENDING) {
380 /* TCP connection closed/error. Notify client and then destroy
381 * ourselves.
382 * Note: the -PJ_EPENDING is the value passed during init.
383 */
384 ++tcp->ref_cnt;
385
386 if (tcp->alloc) {
387 if (bytes_read != 0) {
388 show_err(tcp->base.obj_name, "TCP socket error",
389 -bytes_read);
390 } else {
391 PJ_LOG(5,(tcp->base.obj_name, "TCP socket closed"));
392 }
393 pj_turn_allocation_on_transport_closed(tcp->alloc, &tcp->base);
394 tcp->alloc = NULL;
395 }
396
397 pj_assert(tcp->ref_cnt > 0);
398 if (--tcp->ref_cnt == 0) {
399 tcp_destroy(tcp);
400 return;
401 }
402 }
403
404 /* Reset pool */
405 pj_pool_reset(recv_op->pkt.pool);
406
407 /* If packet is full discard it */
408 if (recv_op->pkt.len == sizeof(recv_op->pkt.pkt)) {
409 PJ_LOG(4,(tcp->base.obj_name, "Buffer discarded"));
410 recv_op->pkt.len = 0;
411 }
412
413 /* Read next packet */
414 bytes_read = sizeof(recv_op->pkt.pkt) - recv_op->pkt.len;
415 status = pj_ioqueue_recv(tcp->key, op_key,
416 recv_op->pkt.pkt + recv_op->pkt.len,
417 &bytes_read, 0);
418
419 if (status != PJ_EPENDING && status != PJ_SUCCESS)
420 bytes_read = -status;
421
422 } while (status != PJ_EPENDING && status != PJ_ECANCELLED &&
423 status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL));
424
425}
426
427
428static pj_status_t tcp_sendto(pj_turn_transport *tp,
429 const void *packet,
430 pj_size_t size,
431 unsigned flag,
432 const pj_sockaddr_t *addr,
433 int addr_len)
434{
435 struct tcp_transport *tcp = (struct tcp_transport*) tp;
436 pj_ssize_t length = size;
437
438 PJ_UNUSED_ARG(addr);
439 PJ_UNUSED_ARG(addr_len);
440
441 return pj_ioqueue_send(tcp->key, &tcp->send_op, packet, &length, flag);
442}
443
444
445static void tcp_add_ref(pj_turn_transport *tp,
446 pj_turn_allocation *alloc)
447{
448 struct tcp_transport *tcp = (struct tcp_transport*) tp;
449
450 ++tcp->ref_cnt;
451
452 if (tcp->alloc == NULL && alloc) {
453 tcp->alloc = alloc;
454 }
455
456 /* Cancel shutdown timer if it's running */
457 if (tcp->timer.id != TIMER_NONE) {
458 pj_timer_heap_cancel(tcp->base.listener->server->core.timer_heap,
459 &tcp->timer);
460 tcp->timer.id = TIMER_NONE;
461 }
462}
463
464
465static void tcp_dec_ref(pj_turn_transport *tp,
466 pj_turn_allocation *alloc)
467{
468 struct tcp_transport *tcp = (struct tcp_transport*) tp;
469
470 --tcp->ref_cnt;
471
472 if (alloc && alloc == tcp->alloc) {
473 tcp->alloc = NULL;
474 }
475
476 if (tcp->ref_cnt == 0 && tcp->timer.id == TIMER_NONE) {
477 pj_time_val delay = { SHUTDOWN_DELAY, 0 };
478 tcp->timer.id = TIMER_DESTROY;
479 pj_timer_heap_schedule(tcp->base.listener->server->core.timer_heap,
480 &tcp->timer, &delay);
481 }
482}
483
484#else /* PJ_HAS_TCP */
485
486/* To avoid empty translation unit warning */
487int listener_tcp_dummy = 0;
488
489#endif /* PJ_HAS_TCP */
490