blob: 6afe3c3fd812d5f5a9c92553f658eb95cdcebcb7 [file] [log] [blame]
Benny Prijono7cd16222007-03-05 00:58:24 +00001/* $Id$ */
2/*
3 * Copyright (C) 2003-2005 Benny Prijono <benny@prijono.org>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 */
19#include "server.h"
20
21struct worker
22{
23 pj_ioqueue_op_key_t read_key;
24 unsigned index;
25 pj_uint8_t readbuf[4000];
26 pj_sockaddr src_addr;
27 int src_addr_len;
28};
29
30struct pj_stun_usage
31{
32 pj_pool_t *pool;
33 pj_mutex_t *mutex;
34 pj_stun_usage_cb cb;
35 int type;
36 pj_sock_t sock;
37 pj_ioqueue_key_t *key;
38 unsigned worker_cnt;
39 struct worker *worker;
40
41 pj_ioqueue_op_key_t *send_key;
42 unsigned send_count, send_index;
43
44 pj_bool_t quitting;
45 void *user_data;
46};
47
48
49static void on_read_complete(pj_ioqueue_key_t *key,
50 pj_ioqueue_op_key_t *op_key,
51 pj_ssize_t bytes_read);
52
53/*
54 * Create STUN usage.
55 */
56PJ_DEF(pj_status_t) pj_stun_usage_create( pj_stun_server *srv,
57 const char *name,
58 const pj_stun_usage_cb *cb,
59 int family,
60 int type,
61 int protocol,
62 const pj_sockaddr_t *local_addr,
63 int addr_len,
64 pj_stun_usage **p_usage)
65{
66 pj_stun_server_info *si;
67 pj_pool_t *pool;
68 pj_stun_usage *usage;
69 pj_ioqueue_callback ioqueue_cb;
70 unsigned i;
71 pj_status_t status;
72
73 si = pj_stun_server_get_info(srv);
74
75 pool = pj_pool_create(si->pf, name, 4000, 4000, NULL);
76 usage = PJ_POOL_ZALLOC_T(pool, pj_stun_usage);
77 usage->pool = pool;
78
79 status = pj_mutex_create_simple(pool, name, &usage->mutex);
80 if (status != PJ_SUCCESS)
81 goto on_error;
82
83 pj_memcpy(&usage->cb, cb, sizeof(*cb));
84
85 usage->type = type;
86 status = pj_sock_socket(family, type, protocol, &usage->sock);
87 if (status != PJ_SUCCESS)
88 goto on_error;
89
90 status = pj_sock_bind(usage->sock, local_addr, addr_len);
91 if (status != PJ_SUCCESS)
92 goto on_error;
93
94 pj_bzero(&ioqueue_cb, sizeof(ioqueue_cb));
95 ioqueue_cb.on_read_complete = &on_read_complete;
96 status = pj_ioqueue_register_sock(usage->pool, si->ioqueue, usage->sock,
97 usage, &ioqueue_cb, &usage->key);
98 if (status != PJ_SUCCESS)
99 goto on_error;
100
101 usage->worker_cnt = si->thread_cnt;
102 usage->worker = pj_pool_calloc(pool, si->thread_cnt,
103 sizeof(struct worker));
104 for (i=0; i<si->thread_cnt; ++i) {
105 pj_ioqueue_op_key_init(&usage->worker[i].read_key,
106 sizeof(usage->worker[i].read_key));
107 usage->worker[i].index = i;
108 }
109
110 usage->send_count = usage->worker_cnt * 2;
111 usage->send_key = pj_pool_calloc(pool, usage->send_count,
112 sizeof(pj_ioqueue_op_key_t));
113 for (i=0; i<usage->send_count; ++i) {
114 pj_ioqueue_op_key_init(&usage->send_key[i],
115 sizeof(usage->send_key[i]));
116 }
117
118 for (i=0; i<si->thread_cnt; ++i) {
119 pj_ssize_t size;
120
121 size = sizeof(usage->worker[i].readbuf);
122 usage->worker[i].src_addr_len = sizeof(usage->worker[i].src_addr);
123 status = pj_ioqueue_recvfrom(usage->key, &usage->worker[i].read_key,
124 usage->worker[i].readbuf, &size,
125 PJ_IOQUEUE_ALWAYS_ASYNC,
126 &usage->worker[i].src_addr,
127 &usage->worker[i].src_addr_len);
128 if (status != PJ_EPENDING)
129 goto on_error;
130 }
131
132 *p_usage = usage;
133 return PJ_SUCCESS;
134
135on_error:
136 pj_stun_usage_destroy(usage);
137 return status;
138}
139
140
141/**
142 * Destroy usage.
143 */
144PJ_DEF(pj_status_t) pj_stun_usage_destroy(pj_stun_usage *usage)
145{
146 if (usage->key) {
147 pj_ioqueue_unregister(usage->key);
148 usage->key = NULL;
149 usage->sock = PJ_INVALID_SOCKET;
150 } else if (usage->sock != 0 && usage->sock != PJ_INVALID_SOCKET) {
151 pj_sock_close(usage->sock);
152 usage->sock = PJ_INVALID_SOCKET;
153 }
154
155 if (usage->mutex) {
156 pj_mutex_destroy(usage->mutex);
157 usage->mutex = NULL;
158 }
159
160 if (usage->pool) {
161 pj_pool_t *pool = usage->pool;
162 usage->pool = NULL;
163 pj_pool_release(pool);
164 }
165
166 return PJ_SUCCESS;
167}
168
169
170/**
171 * Set user data.
172 */
173PJ_DEF(pj_status_t) pj_stun_usage_set_user_data( pj_stun_usage *usage,
174 void *user_data)
175{
176 usage->user_data = user_data;
177 return PJ_SUCCESS;
178}
179
180/**
181 * Get user data.
182 */
183PJ_DEF(void*) pj_stun_usage_get_user_data(pj_stun_usage *usage)
184{
185 return usage->user_data;
186}
187
188
189/**
190 * Send with the usage.
191 */
192PJ_DEF(pj_status_t) pj_stun_usage_sendto( pj_stun_usage *usage,
193 const void *pkt,
194 pj_size_t pkt_size,
195 unsigned flags,
196 const pj_sockaddr_t *dst_addr,
197 unsigned addr_len)
198{
199 pj_ssize_t size = pkt_size;
200 unsigned i, count = usage->send_count, index;
201
202 pj_mutex_lock(usage->mutex);
203 for (i=0, ++usage->send_index; i<count; ++i, ++usage->send_index) {
204 if (usage->send_index >= usage->send_count)
205 usage->send_index = 0;
206
207 if (pj_ioqueue_is_pending(usage->key, &usage->send_key[usage->send_index])==0) {
208 break;
209 }
210 }
211
212 if (i==count) {
213 pj_mutex_unlock(usage->mutex);
214 return PJ_EBUSY;
215 }
216
217 index = usage->send_index;
218 pj_mutex_unlock(usage->mutex);
219
220 return pj_ioqueue_sendto(usage->key, &usage->send_key[index],
221 pkt, &size, flags,
222 dst_addr, addr_len);
223}
224
225
226static void on_read_complete(pj_ioqueue_key_t *key,
227 pj_ioqueue_op_key_t *op_key,
228 pj_ssize_t bytes_read)
229{
230 enum { MAX_LOOP = 10 };
231 pj_stun_usage *usage = pj_ioqueue_get_user_data(key);
232 struct worker *worker = (struct worker*) op_key;
233 unsigned count;
234 pj_status_t status;
235
236 for (count=0; !usage->quitting; ++count) {
237 unsigned flags;
238
239 if (bytes_read > 0) {
240 (*usage->cb.on_rx_data)(usage, worker->readbuf, bytes_read,
241 &worker->src_addr, worker->src_addr_len);
242 } else if (bytes_read < 0) {
243 pj_stun_perror(usage->pool->obj_name, "recv() error", -bytes_read);
244 }
245
246 if (usage->quitting)
247 break;
248
249 bytes_read = sizeof(worker->readbuf);
250 flags = (count >= MAX_LOOP) ? PJ_IOQUEUE_ALWAYS_ASYNC : 0;
251 worker->src_addr_len = sizeof(worker->src_addr);
252 status = pj_ioqueue_recvfrom(usage->key, &worker->read_key,
253 worker->readbuf, &bytes_read, flags,
254 &worker->src_addr, &worker->src_addr_len);
255 if (status == PJ_EPENDING)
256 break;
257 }
258}
259