/* $Id$ */
/*
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
20#include "turn.h"
21#include <pj/compat/socket.h>
22
23struct read_op
24{
25 pj_ioqueue_op_key_t op_key;
26 pj_turn_pkt pkt;
27};
28
29struct udp_listener
30{
31 pj_turn_listener base;
32
33 pj_ioqueue_key_t *key;
34 unsigned read_cnt;
35 struct read_op **read_op; /* Array of read_op's */
36
37 pj_turn_transport tp; /* Transport instance */
38};
39
40
41static pj_status_t udp_destroy(pj_turn_listener *udp);
42static void on_read_complete(pj_ioqueue_key_t *key,
43 pj_ioqueue_op_key_t *op_key,
44 pj_ssize_t bytes_read);
45
46static pj_status_t udp_sendto(pj_turn_transport *tp,
47 const void *packet,
48 pj_size_t size,
49 unsigned flag,
50 const pj_sockaddr_t *addr,
51 int addr_len);
52static void udp_add_ref(pj_turn_transport *tp,
53 pj_turn_allocation *alloc);
54static void udp_dec_ref(pj_turn_transport *tp,
55 pj_turn_allocation *alloc);
56
57
58/*
59 * Create a new listener on the specified port.
60 */
61PJ_DEF(pj_status_t) pj_turn_listener_create_udp( pj_turn_srv *srv,
62 int af,
63 const pj_str_t *bound_addr,
64 unsigned port,
65 unsigned concurrency_cnt,
66 unsigned flags,
67 pj_turn_listener **p_listener)
68{
69 pj_pool_t *pool;
70 struct udp_listener *udp;
71 pj_ioqueue_callback ioqueue_cb;
72 unsigned i;
73 pj_status_t status;
74
75 /* Create structure */
76 pool = pj_pool_create(srv->core.pf, "udp%p", 1000, 1000, NULL);
77 udp = PJ_POOL_ZALLOC_T(pool, struct udp_listener);
78 udp->base.pool = pool;
79 udp->base.obj_name = pool->obj_name;
80 udp->base.server = srv;
81 udp->base.tp_type = PJ_TURN_TP_UDP;
82 udp->base.sock = PJ_INVALID_SOCKET;
83 udp->base.destroy = &udp_destroy;
84 udp->read_cnt = concurrency_cnt;
85 udp->base.flags = flags;
86
87 udp->tp.obj_name = udp->base.obj_name;
88 udp->tp.info = udp->base.info;
89 udp->tp.listener = &udp->base;
90 udp->tp.sendto = &udp_sendto;
91 udp->tp.add_ref = &udp_add_ref;
92 udp->tp.dec_ref = &udp_dec_ref;
93
94 /* Create socket */
95 status = pj_sock_socket(af, pj_SOCK_DGRAM(), 0, &udp->base.sock);
96 if (status != PJ_SUCCESS)
97 goto on_error;
98
99 /* Init bind address */
100 status = pj_sockaddr_init(af, &udp->base.addr, bound_addr,
101 (pj_uint16_t)port);
102 if (status != PJ_SUCCESS)
103 goto on_error;
104
105 /* Create info */
106 pj_ansi_strcpy(udp->base.info, "UDP:");
107 pj_sockaddr_print(&udp->base.addr, udp->base.info+4,
108 sizeof(udp->base.info)-4, 3);
109
110 /* Bind socket */
111 status = pj_sock_bind(udp->base.sock, &udp->base.addr,
112 pj_sockaddr_get_len(&udp->base.addr));
113 if (status != PJ_SUCCESS)
114 goto on_error;
115
116 /* Register to ioqueue */
117 pj_bzero(&ioqueue_cb, sizeof(ioqueue_cb));
118 ioqueue_cb.on_read_complete = on_read_complete;
119 status = pj_ioqueue_register_sock(pool, srv->core.ioqueue, udp->base.sock,
120 udp, &ioqueue_cb, &udp->key);
121
122 /* Create op keys */
123 udp->read_op = (struct read_op**)pj_pool_calloc(pool, concurrency_cnt,
124 sizeof(struct read_op*));
125
126 /* Create each read_op and kick off read operation */
127 for (i=0; i<concurrency_cnt; ++i) {
128 pj_pool_t *rpool = pj_pool_create(srv->core.pf, "rop%p",
129 1000, 1000, NULL);
130
131 udp->read_op[i] = PJ_POOL_ZALLOC_T(pool, struct read_op);
132 udp->read_op[i]->pkt.pool = rpool;
133
134 on_read_complete(udp->key, &udp->read_op[i]->op_key, 0);
135 }
136
137 /* Done */
138 PJ_LOG(4,(udp->base.obj_name, "Listener %s created", udp->base.info));
139
140 *p_listener = &udp->base;
141 return PJ_SUCCESS;
142
143
144on_error:
145 udp_destroy(&udp->base);
146 return status;
147}
148
149
150/*
151 * Destroy listener.
152 */
153static pj_status_t udp_destroy(pj_turn_listener *listener)
154{
155 struct udp_listener *udp = (struct udp_listener *)listener;
156 unsigned i;
157
158 if (udp->key) {
159 pj_ioqueue_unregister(udp->key);
160 udp->key = NULL;
161 udp->base.sock = PJ_INVALID_SOCKET;
162 } else if (udp->base.sock != PJ_INVALID_SOCKET) {
163 pj_sock_close(udp->base.sock);
164 udp->base.sock = PJ_INVALID_SOCKET;
165 }
166
167 for (i=0; i<udp->read_cnt; ++i) {
168 if (udp->read_op[i]->pkt.pool) {
169 pj_pool_t *rpool = udp->read_op[i]->pkt.pool;
170 udp->read_op[i]->pkt.pool = NULL;
171 pj_pool_release(rpool);
172 }
173 }
174
175 if (udp->base.pool) {
176 pj_pool_t *pool = udp->base.pool;
177
178 PJ_LOG(4,(udp->base.obj_name, "Listener %s destroyed",
179 udp->base.info));
180
181 udp->base.pool = NULL;
182 pj_pool_release(pool);
183 }
184 return PJ_SUCCESS;
185}
186
187/*
188 * Callback to send packet.
189 */
190static pj_status_t udp_sendto(pj_turn_transport *tp,
191 const void *packet,
192 pj_size_t size,
193 unsigned flag,
194 const pj_sockaddr_t *addr,
195 int addr_len)
196{
197 pj_ssize_t len = size;
198 return pj_sock_sendto(tp->listener->sock, packet, &len, flag, addr, addr_len);
199}
200
201
202static void udp_add_ref(pj_turn_transport *tp,
203 pj_turn_allocation *alloc)
204{
205 /* Do nothing */
206 PJ_UNUSED_ARG(tp);
207 PJ_UNUSED_ARG(alloc);
208}
209
210static void udp_dec_ref(pj_turn_transport *tp,
211 pj_turn_allocation *alloc)
212{
213 /* Do nothing */
214 PJ_UNUSED_ARG(tp);
215 PJ_UNUSED_ARG(alloc);
216}
217
218
219/*
220 * Callback on received packet.
221 */
222static void on_read_complete(pj_ioqueue_key_t *key,
223 pj_ioqueue_op_key_t *op_key,
224 pj_ssize_t bytes_read)
225{
226 struct udp_listener *udp;
227 struct read_op *read_op = (struct read_op*) op_key;
228 pj_status_t status;
229
230 udp = (struct udp_listener*) pj_ioqueue_get_user_data(key);
231
232 do {
233 pj_pool_t *rpool;
234
235 /* Report to server */
236 if (bytes_read > 0) {
237 read_op->pkt.len = bytes_read;
238 pj_gettimeofday(&read_op->pkt.rx_time);
239
240 pj_turn_srv_on_rx_pkt(udp->base.server, &read_op->pkt);
241 }
242
243 /* Reset pool */
244 rpool = read_op->pkt.pool;
245 pj_pool_reset(rpool);
246 read_op->pkt.pool = rpool;
247 read_op->pkt.transport = &udp->tp;
248 read_op->pkt.src.tp_type = udp->base.tp_type;
249
250 /* Read next packet */
251 bytes_read = sizeof(read_op->pkt.pkt);
252 read_op->pkt.src_addr_len = sizeof(read_op->pkt.src.clt_addr);
253 pj_bzero(&read_op->pkt.src.clt_addr, sizeof(read_op->pkt.src.clt_addr));
254
255 status = pj_ioqueue_recvfrom(udp->key, op_key,
256 read_op->pkt.pkt, &bytes_read, 0,
257 &read_op->pkt.src.clt_addr,
258 &read_op->pkt.src_addr_len);
259
260 if (status != PJ_EPENDING && status != PJ_SUCCESS)
261 bytes_read = -status;
262
263 } while (status != PJ_EPENDING && status != PJ_ECANCELLED &&
264 status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL));
265}
266