blob: 3a7c14e370f6b9a8d9c0653fd36df16c026b5d43 [file] [log] [blame]
/* $Id$ */
/*
 * Copyright (C) 2008-2009 Teluu Inc. (http://www.teluu.com)
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
20
/*
 * sock_select.c
 *
 * This is the implementation of IOQueue using pj_sock_select().
 * It runs wherever pj_sock_select() is available (currently
 * Win32, Linux, Linux kernel, etc.).
 */
28
29#include <pj/ioqueue.h>
30#include <pj/os.h>
31#include <pj/lock.h>
32#include <pj/log.h>
33#include <pj/list.h>
34#include <pj/pool.h>
35#include <pj/string.h>
36#include <pj/assert.h>
37#include <pj/sock.h>
38#include <pj/compat/socket.h>
39#include <pj/sock_select.h>
Sauw Mingbe3771a2010-08-27 06:46:29 +000040#include <pj/sock_qos.h>
Benny Prijono9033e312005-11-21 02:08:39 +000041#include <pj/errno.h>
42
/* With the OS's <sys/select> definitions now in scope, verify once more
 * that PJ_IOQUEUE_MAX_HANDLES does not exceed FD_SETSIZE.
 */
#if FD_SETSIZE < PJ_IOQUEUE_MAX_HANDLES
#   error "PJ_IOQUEUE_MAX_HANDLES cannot be greater than FD_SETSIZE"
#endif
49
50
Benny Prijono9033e312005-11-21 02:08:39 +000051/*
52 * Include declaration from common abstraction.
53 */
54#include "ioqueue_common_abs.h"
55
/*
 * ISSUES with ioqueue_select()
 *
 * EAGAIN/EWOULDBLOCK error in recv():
 *  - when multiple threads are working with the ioqueue, the application
 *    may receive EAGAIN or EWOULDBLOCK in the receive callback.
 *    This error happens because more than one thread is watching
 *    the same descriptor set, so when all of them call recv() or recvfrom()
 *    simultaneously, only one will succeed and the rest will get the error.
 *
 */
67#define THIS_FILE "ioq_select"
68
69/*
70 * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
71 * the correct error code.
72 */
73#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
74# error "Error reporting must be enabled for this function to work!"
75#endif
76
/*
 * VALIDATE_FD_SET controls whether the fd_sets are checked for
 * consistency before each poll (see validate_sets()).
 *
 * The switch used to follow the debugging build setting, as shown below,
 * but it is currently forced off:
 *
 *   #if defined(PJ_DEBUG) && PJ_DEBUG != 0
 *   #  define VALIDATE_FD_SET 1
 *   #else
 *   #  define VALIDATE_FD_SET 0
 *   #endif
 */
#define VALIDATE_FD_SET     0
89
/* Tracing helper for this file. It expands to nothing unless the
 * condition below is changed to 1, in which case the arguments are
 * forwarded to PJ_LOG() at level 3.
 */
#if 0
#   define TRACE__(args)    PJ_LOG(3,args)
#else
#   define TRACE__(args)
#endif
95
Benny Prijono9033e312005-11-21 02:08:39 +000096/*
97 * This describes each key.
98 */
99struct pj_ioqueue_key_t
100{
101 DECLARE_COMMON_KEY
102};
103
104/*
105 * This describes the I/O queue itself.
106 */
107struct pj_ioqueue_t
108{
109 DECLARE_COMMON_IOQUEUE
110
Benny Prijono42c5b9e2006-05-10 19:24:40 +0000111 unsigned max, count; /* Max and current key count */
112 int nfds; /* The largest fd value (for select)*/
113 pj_ioqueue_key_t active_list; /* List of active keys. */
Benny Prijono9033e312005-11-21 02:08:39 +0000114 pj_fd_set_t rfdset;
115 pj_fd_set_t wfdset;
116#if PJ_HAS_TCP
117 pj_fd_set_t xfdset;
118#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000119
120#if PJ_IOQUEUE_HAS_SAFE_UNREG
121 pj_mutex_t *ref_cnt_mutex;
122 pj_ioqueue_key_t closing_list;
123 pj_ioqueue_key_t free_list;
124#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000125};
126
Sauw Mingbe3771a2010-08-27 06:46:29 +0000127/* Proto */
128static pj_status_t replace_udp_sock(pj_ioqueue_key_t *h);
129
Benny Prijono9033e312005-11-21 02:08:39 +0000130/* Include implementation for common abstraction after we declare
131 * pj_ioqueue_key_t and pj_ioqueue_t.
132 */
133#include "ioqueue_common_abs.c"
134
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Forward declaration: moves keys whose free_time has passed from the
 * closing list back to the free list.
 */
static void scan_closing_keys(pj_ioqueue_t *ioqueue);
#endif
139
Benny Prijono9033e312005-11-21 02:08:39 +0000140/*
141 * pj_ioqueue_name()
142 */
143PJ_DEF(const char*) pj_ioqueue_name(void)
144{
145 return "select";
146}
147
Benny Prijono42c5b9e2006-05-10 19:24:40 +0000148/*
149 * Scan the socket descriptor sets for the largest descriptor.
150 * This value is needed by select().
151 */
152#if defined(PJ_SELECT_NEEDS_NFDS) && PJ_SELECT_NEEDS_NFDS!=0
153static void rescan_fdset(pj_ioqueue_t *ioqueue)
154{
155 pj_ioqueue_key_t *key = ioqueue->active_list.next;
156 int max = 0;
157
158 while (key != &ioqueue->active_list) {
159 if (key->fd > max)
160 max = key->fd;
161 key = key->next;
162 }
163
164 ioqueue->nfds = max;
165}
166#else
167static void rescan_fdset(pj_ioqueue_t *ioqueue)
168{
169 ioqueue->nfds = FD_SETSIZE-1;
170}
171#endif
172
173
Benny Prijono9033e312005-11-21 02:08:39 +0000174/*
175 * pj_ioqueue_create()
176 *
177 * Create select ioqueue.
178 */
179PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
180 pj_size_t max_fd,
181 pj_ioqueue_t **p_ioqueue)
182{
183 pj_ioqueue_t *ioqueue;
184 pj_lock_t *lock;
Benny Prijono5accbd02006-03-30 16:32:18 +0000185 unsigned i;
Benny Prijono9033e312005-11-21 02:08:39 +0000186 pj_status_t rc;
187
188 /* Check that arguments are valid. */
189 PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
190 max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES,
191 PJ_EINVAL);
192
193 /* Check that size of pj_ioqueue_op_key_t is sufficient */
194 PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
195 sizeof(union operation_key), PJ_EBUG);
196
Benny Prijono5accbd02006-03-30 16:32:18 +0000197 /* Create and init common ioqueue stuffs */
Benny Prijonoa1e69682007-05-11 15:14:34 +0000198 ioqueue = PJ_POOL_ALLOC_T(pool, pj_ioqueue_t);
Benny Prijono9033e312005-11-21 02:08:39 +0000199 ioqueue_init(ioqueue);
200
201 ioqueue->max = max_fd;
202 ioqueue->count = 0;
203 PJ_FD_ZERO(&ioqueue->rfdset);
204 PJ_FD_ZERO(&ioqueue->wfdset);
205#if PJ_HAS_TCP
206 PJ_FD_ZERO(&ioqueue->xfdset);
207#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000208 pj_list_init(&ioqueue->active_list);
Benny Prijono9033e312005-11-21 02:08:39 +0000209
Benny Prijono42c5b9e2006-05-10 19:24:40 +0000210 rescan_fdset(ioqueue);
211
Benny Prijono5accbd02006-03-30 16:32:18 +0000212#if PJ_IOQUEUE_HAS_SAFE_UNREG
213 /* When safe unregistration is used (the default), we pre-create
214 * all keys and put them in the free list.
215 */
216
217 /* Mutex to protect key's reference counter
218 * We don't want to use key's mutex or ioqueue's mutex because
219 * that would create deadlock situation in some cases.
220 */
221 rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex);
222 if (rc != PJ_SUCCESS)
223 return rc;
224
225
226 /* Init key list */
227 pj_list_init(&ioqueue->free_list);
228 pj_list_init(&ioqueue->closing_list);
229
230
231 /* Pre-create all keys according to max_fd */
232 for (i=0; i<max_fd; ++i) {
233 pj_ioqueue_key_t *key;
234
Benny Prijonoa1e69682007-05-11 15:14:34 +0000235 key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t);
Benny Prijono5accbd02006-03-30 16:32:18 +0000236 key->ref_count = 0;
237 rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
238 if (rc != PJ_SUCCESS) {
239 key = ioqueue->free_list.next;
240 while (key != &ioqueue->free_list) {
241 pj_mutex_destroy(key->mutex);
242 key = key->next;
243 }
244 pj_mutex_destroy(ioqueue->ref_cnt_mutex);
245 return rc;
246 }
247
248 pj_list_push_back(&ioqueue->free_list, key);
249 }
250#endif
251
252 /* Create and init ioqueue mutex */
Benny Prijono9033e312005-11-21 02:08:39 +0000253 rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
254 if (rc != PJ_SUCCESS)
255 return rc;
256
257 rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
258 if (rc != PJ_SUCCESS)
259 return rc;
260
261 PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue));
262
263 *p_ioqueue = ioqueue;
264 return PJ_SUCCESS;
265}
266
267/*
268 * pj_ioqueue_destroy()
269 *
270 * Destroy ioqueue.
271 */
272PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
273{
Benny Prijono5accbd02006-03-30 16:32:18 +0000274 pj_ioqueue_key_t *key;
275
Benny Prijono9033e312005-11-21 02:08:39 +0000276 PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
277
278 pj_lock_acquire(ioqueue->lock);
Benny Prijono5accbd02006-03-30 16:32:18 +0000279
280#if PJ_IOQUEUE_HAS_SAFE_UNREG
281 /* Destroy reference counters */
282 key = ioqueue->active_list.next;
283 while (key != &ioqueue->active_list) {
284 pj_mutex_destroy(key->mutex);
285 key = key->next;
286 }
287
288 key = ioqueue->closing_list.next;
289 while (key != &ioqueue->closing_list) {
290 pj_mutex_destroy(key->mutex);
291 key = key->next;
292 }
293
294 key = ioqueue->free_list.next;
295 while (key != &ioqueue->free_list) {
296 pj_mutex_destroy(key->mutex);
297 key = key->next;
298 }
299
300 pj_mutex_destroy(ioqueue->ref_cnt_mutex);
301#endif
302
Benny Prijono9033e312005-11-21 02:08:39 +0000303 return ioqueue_destroy(ioqueue);
304}
305
306
307/*
308 * pj_ioqueue_register_sock()
309 *
Benny Prijono5accbd02006-03-30 16:32:18 +0000310 * Register socket handle to ioqueue.
Benny Prijono9033e312005-11-21 02:08:39 +0000311 */
312PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
313 pj_ioqueue_t *ioqueue,
314 pj_sock_t sock,
315 void *user_data,
316 const pj_ioqueue_callback *cb,
317 pj_ioqueue_key_t **p_key)
318{
319 pj_ioqueue_key_t *key = NULL;
Benny Prijonofc24e692007-01-27 18:31:51 +0000320#if defined(PJ_WIN32) && PJ_WIN32!=0 || \
321 defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0
322 u_long value;
323#else
Benny Prijono9033e312005-11-21 02:08:39 +0000324 pj_uint32_t value;
Benny Prijonofc24e692007-01-27 18:31:51 +0000325#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000326 pj_status_t rc = PJ_SUCCESS;
327
328 PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
329 cb && p_key, PJ_EINVAL);
330
331 pj_lock_acquire(ioqueue->lock);
332
333 if (ioqueue->count >= ioqueue->max) {
334 rc = PJ_ETOOMANY;
335 goto on_return;
336 }
337
Benny Prijono5accbd02006-03-30 16:32:18 +0000338 /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get
339 * the key from the free list. Otherwise allocate a new one.
340 */
341#if PJ_IOQUEUE_HAS_SAFE_UNREG
Benny Prijonocde71422007-09-19 12:03:28 +0000342
343 /* Scan closing_keys first to let them come back to free_list */
344 scan_closing_keys(ioqueue);
345
Benny Prijono5accbd02006-03-30 16:32:18 +0000346 pj_assert(!pj_list_empty(&ioqueue->free_list));
347 if (pj_list_empty(&ioqueue->free_list)) {
348 rc = PJ_ETOOMANY;
349 goto on_return;
350 }
351
352 key = ioqueue->free_list.next;
353 pj_list_erase(key);
354#else
355 key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
356#endif
357
358 rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
359 if (rc != PJ_SUCCESS) {
360 key = NULL;
361 goto on_return;
362 }
363
Benny Prijono9033e312005-11-21 02:08:39 +0000364 /* Set socket to nonblocking. */
365 value = 1;
Benny Prijono9cf138e2006-01-19 03:58:29 +0000366#if defined(PJ_WIN32) && PJ_WIN32!=0 || \
367 defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0
Benny Prijonofc24e692007-01-27 18:31:51 +0000368 if (ioctlsocket(sock, FIONBIO, &value)) {
Benny Prijono9033e312005-11-21 02:08:39 +0000369#else
370 if (ioctl(sock, FIONBIO, &value)) {
371#endif
372 rc = pj_get_netos_error();
373 goto on_return;
374 }
375
Benny Prijono9033e312005-11-21 02:08:39 +0000376
Benny Prijono5accbd02006-03-30 16:32:18 +0000377 /* Put in active list. */
378 pj_list_insert_before(&ioqueue->active_list, key);
Benny Prijono9033e312005-11-21 02:08:39 +0000379 ++ioqueue->count;
380
Benny Prijono42c5b9e2006-05-10 19:24:40 +0000381 /* Rescan fdset to get max descriptor */
382 rescan_fdset(ioqueue);
383
Benny Prijono9033e312005-11-21 02:08:39 +0000384on_return:
385 /* On error, socket may be left in non-blocking mode. */
386 *p_key = key;
387 pj_lock_release(ioqueue->lock);
388
389 return rc;
390}
391
Benny Prijono5accbd02006-03-30 16:32:18 +0000392#if PJ_IOQUEUE_HAS_SAFE_UNREG
393/* Increment key's reference counter */
394static void increment_counter(pj_ioqueue_key_t *key)
395{
396 pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
397 ++key->ref_count;
398 pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
399}
400
401/* Decrement the key's reference counter, and when the counter reach zero,
402 * destroy the key.
403 *
404 * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK.
405 */
406static void decrement_counter(pj_ioqueue_key_t *key)
407{
Benny Prijono324409e2007-10-31 07:53:17 +0000408 pj_lock_acquire(key->ioqueue->lock);
Benny Prijono5accbd02006-03-30 16:32:18 +0000409 pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
410 --key->ref_count;
411 if (key->ref_count == 0) {
412
413 pj_assert(key->closing == 1);
414 pj_gettimeofday(&key->free_time);
415 key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
416 pj_time_val_normalize(&key->free_time);
417
Benny Prijono5accbd02006-03-30 16:32:18 +0000418 pj_list_erase(key);
419 pj_list_push_back(&key->ioqueue->closing_list, key);
Benny Prijono42c5b9e2006-05-10 19:24:40 +0000420 /* Rescan fdset to get max descriptor */
421 rescan_fdset(key->ioqueue);
Benny Prijono5accbd02006-03-30 16:32:18 +0000422 }
423 pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
Benny Prijono324409e2007-10-31 07:53:17 +0000424 pj_lock_release(key->ioqueue->lock);
Benny Prijono5accbd02006-03-30 16:32:18 +0000425}
426#endif
427
428
Benny Prijono9033e312005-11-21 02:08:39 +0000429/*
430 * pj_ioqueue_unregister()
431 *
432 * Unregister handle from ioqueue.
433 */
434PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
435{
436 pj_ioqueue_t *ioqueue;
437
438 PJ_ASSERT_RETURN(key, PJ_EINVAL);
439
440 ioqueue = key->ioqueue;
441
Benny Prijono5accbd02006-03-30 16:32:18 +0000442 /* Lock the key to make sure no callback is simultaneously modifying
443 * the key. We need to lock the key before ioqueue here to prevent
444 * deadlock.
445 */
446 pj_mutex_lock(key->mutex);
447
448 /* Also lock ioqueue */
Benny Prijono9033e312005-11-21 02:08:39 +0000449 pj_lock_acquire(ioqueue->lock);
450
451 pj_assert(ioqueue->count > 0);
452 --ioqueue->count;
Benny Prijono9969d182008-04-02 18:36:35 +0000453#if !PJ_IOQUEUE_HAS_SAFE_UNREG
454 /* Ticket #520, key will be erased more than once */
Benny Prijono9033e312005-11-21 02:08:39 +0000455 pj_list_erase(key);
Benny Prijono9969d182008-04-02 18:36:35 +0000456#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000457 PJ_FD_CLR(key->fd, &ioqueue->rfdset);
458 PJ_FD_CLR(key->fd, &ioqueue->wfdset);
459#if PJ_HAS_TCP
460 PJ_FD_CLR(key->fd, &ioqueue->xfdset);
461#endif
462
Benny Prijono5accbd02006-03-30 16:32:18 +0000463 /* Close socket. */
464 pj_sock_close(key->fd);
465
466 /* Clear callback */
467 key->cb.on_accept_complete = NULL;
468 key->cb.on_connect_complete = NULL;
469 key->cb.on_read_complete = NULL;
470 key->cb.on_write_complete = NULL;
471
472 /* Must release ioqueue lock first before decrementing counter, to
473 * prevent deadlock.
Benny Prijono9033e312005-11-21 02:08:39 +0000474 */
475 pj_lock_release(ioqueue->lock);
476
Benny Prijono5accbd02006-03-30 16:32:18 +0000477#if PJ_IOQUEUE_HAS_SAFE_UNREG
478 /* Mark key is closing. */
479 key->closing = 1;
480
481 /* Decrement counter. */
482 decrement_counter(key);
483
484 /* Done. */
485 pj_mutex_unlock(key->mutex);
486#else
487 pj_mutex_destroy(key->mutex);
488#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000489
490 return PJ_SUCCESS;
491}
492
493
/* Debugging aid, compiled only when VALIDATE_FD_SET is non-zero: it is
 * supposed to check that the given fd_set values are consistent with the
 * operations currently pending in each active key.
 */
#if VALIDATE_FD_SET
static void validate_sets(const pj_ioqueue_t *ioqueue,
                          const pj_fd_set_t *rfdset,
                          const pj_fd_set_t *wfdset,
                          const pj_fd_set_t *xfdset)
{
    pj_ioqueue_key_t *key;

    /*
     * This basically would not work anymore.
     * We need to lock the key before performing the check, but we can't
     * do so because we're holding the ioqueue mutex. If we acquire the
     * key's mutex now, that will cause deadlock.
     */
    pj_assert(0);

    for (key = ioqueue->active_list.next;
         key != &ioqueue->active_list;
         key = key->next)
    {
        int want_read, want_write;

        /* Readable set: pending reads (and accepts, with TCP). */
        want_read = !pj_list_empty(&key->read_list);
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
        want_read = want_read || !pj_list_empty(&key->accept_list);
#endif
        if (want_read) {
            pj_assert(PJ_FD_ISSET(key->fd, rfdset));
        } else {
            pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0);
        }

        /* Writable set: pending writes (and connects, with TCP). */
        want_write = !pj_list_empty(&key->write_list);
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
        want_write = want_write || key->connecting;
#endif
        if (want_write) {
            pj_assert(PJ_FD_ISSET(key->fd, wfdset));
        } else {
            pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0);
        }

#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
        /* Exception set: only while a connect is in progress. */
        if (key->connecting) {
            pj_assert(PJ_FD_ISSET(key->fd, xfdset));
        } else {
            pj_assert(PJ_FD_ISSET(key->fd, xfdset) == 0);
        }
#endif /* PJ_HAS_TCP */
    }
}
#endif  /* VALIDATE_FD_SET */
551
552
553/* ioqueue_remove_from_set()
554 * This function is called from ioqueue_dispatch_event() to instruct
555 * the ioqueue to remove the specified descriptor from ioqueue's descriptor
556 * set for the specified event.
557 */
558static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
Benny Prijono63ab3562006-07-08 19:46:43 +0000559 pj_ioqueue_key_t *key,
Benny Prijono9033e312005-11-21 02:08:39 +0000560 enum ioqueue_event_type event_type)
561{
562 pj_lock_acquire(ioqueue->lock);
563
564 if (event_type == READABLE_EVENT)
Benny Prijono63ab3562006-07-08 19:46:43 +0000565 PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->rfdset);
Benny Prijono9033e312005-11-21 02:08:39 +0000566 else if (event_type == WRITEABLE_EVENT)
Benny Prijono63ab3562006-07-08 19:46:43 +0000567 PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->wfdset);
Benny Prijono3569c0d2007-04-06 10:29:20 +0000568#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
Benny Prijono9033e312005-11-21 02:08:39 +0000569 else if (event_type == EXCEPTION_EVENT)
Benny Prijono63ab3562006-07-08 19:46:43 +0000570 PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->xfdset);
Benny Prijono3569c0d2007-04-06 10:29:20 +0000571#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000572 else
573 pj_assert(0);
574
575 pj_lock_release(ioqueue->lock);
576}
577
578/*
579 * ioqueue_add_to_set()
580 * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
581 * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
582 * set for the specified event.
583 */
584static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
Benny Prijono63ab3562006-07-08 19:46:43 +0000585 pj_ioqueue_key_t *key,
Benny Prijono9033e312005-11-21 02:08:39 +0000586 enum ioqueue_event_type event_type )
587{
588 pj_lock_acquire(ioqueue->lock);
589
590 if (event_type == READABLE_EVENT)
Benny Prijono63ab3562006-07-08 19:46:43 +0000591 PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->rfdset);
Benny Prijono9033e312005-11-21 02:08:39 +0000592 else if (event_type == WRITEABLE_EVENT)
Benny Prijono63ab3562006-07-08 19:46:43 +0000593 PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->wfdset);
Benny Prijono3569c0d2007-04-06 10:29:20 +0000594#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
Benny Prijono9033e312005-11-21 02:08:39 +0000595 else if (event_type == EXCEPTION_EVENT)
Benny Prijono63ab3562006-07-08 19:46:43 +0000596 PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->xfdset);
Benny Prijono3569c0d2007-04-06 10:29:20 +0000597#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000598 else
599 pj_assert(0);
600
601 pj_lock_release(ioqueue->lock);
602}
603
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Walk the closing list and move every key whose free_time has been
 * reached (compared against the current time) to the free list.
 */
static void scan_closing_keys(pj_ioqueue_t *ioqueue)
{
    pj_time_val now;
    pj_ioqueue_key_t *cur, *following;

    pj_gettimeofday(&now);

    for (cur = ioqueue->closing_list.next;
         cur != &ioqueue->closing_list;
         cur = following)
    {
        /* Remember the successor first; cur may leave this list below. */
        following = cur->next;

        pj_assert(cur->closing != 0);

        if (PJ_TIME_VAL_GTE(now, cur->free_time)) {
            pj_list_erase(cur);
            pj_list_push_back(&ioqueue->free_list, cur);
        }
    }
}
#endif
626
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
/*
 * Replace the UDP socket of a key with a freshly created one.
 *
 * The new socket is bound to the old socket's local address and, when the
 * old socket reported them, inherits its QoS parameters and peer address.
 * It is put in non-blocking mode and takes the old descriptor's place in
 * the ioqueue's fd sets and in h->fd. The whole operation runs with the
 * ioqueue lock held.
 *
 * Returns PJ_SUCCESS, or the status of the step that failed. When the
 * failure happens after the old socket has been closed, the dead
 * descriptor is removed from the ioqueue's fd sets so that it is no
 * longer passed to select().
 *
 * NOTE(review): in that failure case h->fd still holds the closed
 * descriptor value; callers presumably have to treat the key as dead -
 * confirm against the call sites.
 */
static pj_status_t replace_udp_sock(pj_ioqueue_key_t *h)
{
    enum flags {
        HAS_PEER_ADDR = 1,
        HAS_QOS = 2
    };
    pj_sock_t old_sock, new_sock = PJ_INVALID_SOCKET;
    pj_sockaddr local_addr, rem_addr;
    int val, addr_len;
    pj_fd_set_t *fds[3];
    unsigned i, fds_cnt, flags=0;
    pj_qos_params qos_params;
    unsigned msec;
    pj_bool_t old_closed = PJ_FALSE;
    pj_status_t status;

    pj_lock_acquire(h->ioqueue->lock);

    old_sock = h->fd;

    /* The fd sets that may reference the socket; needed by both the
     * success and the error path.
     */
    fds_cnt = 0;
    fds[fds_cnt++] = &h->ioqueue->rfdset;
    fds[fds_cnt++] = &h->ioqueue->wfdset;
#if PJ_HAS_TCP
    fds[fds_cnt++] = &h->ioqueue->xfdset;
#endif

    /* Can only replace UDP socket */
    pj_assert(h->fd_type == pj_SOCK_DGRAM());

    /* Cast so that the argument always matches the %d conversion. */
    PJ_LOG(4,(THIS_FILE, "Attempting to replace UDP socket %d",
              (int)old_sock));

    /* Investigate the old socket */
    addr_len = sizeof(local_addr);
    status = pj_sock_getsockname(old_sock, &local_addr, &addr_len);
    if (status != PJ_SUCCESS)
        goto on_error;

    addr_len = sizeof(rem_addr);
    status = pj_sock_getpeername(old_sock, &rem_addr, &addr_len);
    if (status == PJ_SUCCESS)
        flags |= HAS_PEER_ADDR;

    status = pj_sock_get_qos_params(old_sock, &qos_params);
    if (status == PJ_SUCCESS)
        flags |= HAS_QOS;

    /* We're done with the old socket, close it otherwise we'll get
     * error in bind()
     */
    pj_sock_close(old_sock);
    old_closed = PJ_TRUE;

    /* Prepare the new socket */
    status = pj_sock_socket(local_addr.addr.sa_family, PJ_SOCK_DGRAM, 0,
                            &new_sock);
    if (status != PJ_SUCCESS)
        goto on_error;

    /* Even after the socket is closed, we'll still get "Address in use"
     * errors, so force it with SO_REUSEADDR
     */
    val = 1;
    status = pj_sock_setsockopt(new_sock, SOL_SOCKET, SO_REUSEADDR,
                                &val, sizeof(val));
    if (status != PJ_SUCCESS)
        goto on_error;

    /* The loop is silly, but what else can we do? Keep retrying for as
     * long as the address is reported as in use, doubling the delay
     * while it is below one second.
     */
    addr_len = pj_sockaddr_get_len(&local_addr);
    msec = 20;
    for (;;) {
        status = pj_sock_bind(new_sock, &local_addr, addr_len);
        if (status != PJ_STATUS_FROM_OS(EADDRINUSE))
            break;
        PJ_LOG(4,(THIS_FILE, "Address is still in use, retrying.."));
        pj_thread_sleep(msec);
        if (msec < 1000)
            msec = msec * 2;
    }

    if (status != PJ_SUCCESS)
        goto on_error;

    if (flags & HAS_QOS) {
        status = pj_sock_set_qos_params(new_sock, &qos_params);
        if (status != PJ_SUCCESS)
            goto on_error;
    }

    if (flags & HAS_PEER_ADDR) {
        status = pj_sock_connect(new_sock, &rem_addr, addr_len);
        if (status != PJ_SUCCESS)
            goto on_error;
    }

    /* Set socket to nonblocking. */
    val = 1;
#if defined(PJ_WIN32) && PJ_WIN32!=0 || \
    defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0
    if (ioctlsocket(new_sock, FIONBIO, &val))
#else
    if (ioctl(new_sock, FIONBIO, &val))
#endif
    {
        status = pj_get_netos_error();
        goto on_error;
    }

    /* Replace the occurrence of old socket with new socket in the
     * fd sets.
     */
    for (i=0; i<fds_cnt; ++i) {
        if (PJ_FD_ISSET(old_sock, fds[i])) {
            PJ_FD_CLR(old_sock, fds[i]);
            PJ_FD_SET(new_sock, fds[i]);
        }
    }

    /* And finally replace the fd in the key */
    h->fd = new_sock;

    /* The new descriptor may be larger than any seen so far. */
    rescan_fdset(h->ioqueue);

    PJ_LOG(4,(THIS_FILE, "UDP has been replaced successfully!"));

    pj_lock_release(h->ioqueue->lock);

    return PJ_SUCCESS;

on_error:
    if (new_sock != PJ_INVALID_SOCKET)
        pj_sock_close(new_sock);

    /* Once the old socket is closed its descriptor must not stay in the
     * fd sets: select() would either fail on it or, if the value gets
     * reused, report events of an unrelated socket.
     */
    if (old_closed) {
        for (i=0; i<fds_cnt; ++i) {
            if (PJ_FD_ISSET(old_sock, fds[i]))
                PJ_FD_CLR(old_sock, fds[i]);
        }
    }

    PJ_PERROR(1,(THIS_FILE, status, "Error replacing socket"));
    pj_lock_release(h->ioqueue->lock);
    return status;
}
#endif
759
Benny Prijono5accbd02006-03-30 16:32:18 +0000760
Benny Prijono9033e312005-11-21 02:08:39 +0000761/*
762 * pj_ioqueue_poll()
763 *
764 * Few things worth written:
765 *
766 * - we used to do only one callback called per poll, but it didn't go
767 * very well. The reason is because on some situation, the write
768 * callback gets called all the time, thus doesn't give the read
769 * callback to get called. This happens, for example, when user
770 * submit write operation inside the write callback.
771 * As the result, we changed the behaviour so that now multiple
772 * callbacks are called in a single poll. It should be fast too,
773 * just that we need to be carefull with the ioqueue data structs.
774 *
775 * - to guarantee preemptiveness etc, the poll function must strictly
776 * work on fd_set copy of the ioqueue (not the original one).
777 */
778PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
779{
780 pj_fd_set_t rfdset, wfdset, xfdset;
781 int count, counter;
782 pj_ioqueue_key_t *h;
783 struct event
784 {
785 pj_ioqueue_key_t *key;
786 enum ioqueue_event_type event_type;
787 } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
788
Benny Prijono37e8d332006-01-20 21:03:36 +0000789 PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
Benny Prijono9033e312005-11-21 02:08:39 +0000790
791 /* Lock ioqueue before making fd_set copies */
792 pj_lock_acquire(ioqueue->lock);
793
794 /* We will only do select() when there are sockets to be polled.
795 * Otherwise select() will return error.
796 */
797 if (PJ_FD_COUNT(&ioqueue->rfdset)==0 &&
Benny Prijono3569c0d2007-04-06 10:29:20 +0000798 PJ_FD_COUNT(&ioqueue->wfdset)==0
799#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
800 && PJ_FD_COUNT(&ioqueue->xfdset)==0
801#endif
802 )
Benny Prijono9033e312005-11-21 02:08:39 +0000803 {
Benny Prijono5accbd02006-03-30 16:32:18 +0000804#if PJ_IOQUEUE_HAS_SAFE_UNREG
805 scan_closing_keys(ioqueue);
806#endif
807 pj_lock_release(ioqueue->lock);
Benny Prijono42c5b9e2006-05-10 19:24:40 +0000808 TRACE__((THIS_FILE, " poll: no fd is set"));
Benny Prijono9033e312005-11-21 02:08:39 +0000809 if (timeout)
810 pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout));
811 return 0;
812 }
813
814 /* Copy ioqueue's pj_fd_set_t to local variables. */
815 pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t));
816 pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t));
817#if PJ_HAS_TCP
818 pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t));
819#else
820 PJ_FD_ZERO(&xfdset);
821#endif
822
823#if VALIDATE_FD_SET
824 validate_sets(ioqueue, &rfdset, &wfdset, &xfdset);
825#endif
826
827 /* Unlock ioqueue before select(). */
828 pj_lock_release(ioqueue->lock);
829
Benny Prijono42c5b9e2006-05-10 19:24:40 +0000830 count = pj_sock_select(ioqueue->nfds+1, &rfdset, &wfdset, &xfdset,
831 timeout);
Benny Prijono9033e312005-11-21 02:08:39 +0000832
Benny Prijono34b00742008-03-13 21:51:51 +0000833 if (count == 0)
834 return 0;
835 else if (count < 0)
Benny Prijono37e8d332006-01-20 21:03:36 +0000836 return -pj_get_netos_error();
Benny Prijono9033e312005-11-21 02:08:39 +0000837 else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)
838 count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;
839
840 /* Scan descriptor sets for event and add the events in the event
841 * array to be processed later in this function. We do this so that
842 * events can be processed in parallel without holding ioqueue lock.
843 */
844 pj_lock_acquire(ioqueue->lock);
845
846 counter = 0;
847
848 /* Scan for writable sockets first to handle piggy-back data
849 * coming with accept().
850 */
Benny Prijono5accbd02006-03-30 16:32:18 +0000851 h = ioqueue->active_list.next;
852 for ( ; h!=&ioqueue->active_list && counter<count; h = h->next) {
853
Benny Prijono9033e312005-11-21 02:08:39 +0000854 if ( (key_has_pending_write(h) || key_has_pending_connect(h))
Benny Prijono3059eb62006-10-04 20:46:27 +0000855 && PJ_FD_ISSET(h->fd, &wfdset) && !IS_CLOSING(h))
Benny Prijono9033e312005-11-21 02:08:39 +0000856 {
Benny Prijono5accbd02006-03-30 16:32:18 +0000857#if PJ_IOQUEUE_HAS_SAFE_UNREG
858 increment_counter(h);
859#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000860 event[counter].key = h;
861 event[counter].event_type = WRITEABLE_EVENT;
862 ++counter;
863 }
864
865 /* Scan for readable socket. */
866 if ((key_has_pending_read(h) || key_has_pending_accept(h))
Benny Prijonoccf3e242009-03-26 11:16:06 +0000867 && PJ_FD_ISSET(h->fd, &rfdset) && !IS_CLOSING(h) &&
868 counter<count)
Benny Prijono9033e312005-11-21 02:08:39 +0000869 {
Benny Prijono5accbd02006-03-30 16:32:18 +0000870#if PJ_IOQUEUE_HAS_SAFE_UNREG
871 increment_counter(h);
872#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000873 event[counter].key = h;
874 event[counter].event_type = READABLE_EVENT;
875 ++counter;
876 }
877
878#if PJ_HAS_TCP
Benny Prijono5accbd02006-03-30 16:32:18 +0000879 if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset) &&
Benny Prijonoccf3e242009-03-26 11:16:06 +0000880 !IS_CLOSING(h) && counter<count)
Benny Prijono5accbd02006-03-30 16:32:18 +0000881 {
882#if PJ_IOQUEUE_HAS_SAFE_UNREG
883 increment_counter(h);
884#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000885 event[counter].key = h;
886 event[counter].event_type = EXCEPTION_EVENT;
887 ++counter;
888 }
889#endif
890 }
891
892 pj_lock_release(ioqueue->lock);
893
894 count = counter;
895
896 /* Now process all events. The dispatch functions will take care
897 * of locking in each of the key
898 */
899 for (counter=0; counter<count; ++counter) {
900 switch (event[counter].event_type) {
901 case READABLE_EVENT:
902 ioqueue_dispatch_read_event(ioqueue, event[counter].key);
903 break;
904 case WRITEABLE_EVENT:
905 ioqueue_dispatch_write_event(ioqueue, event[counter].key);
906 break;
907 case EXCEPTION_EVENT:
908 ioqueue_dispatch_exception_event(ioqueue, event[counter].key);
909 break;
910 case NO_EVENT:
911 pj_assert(!"Invalid event!");
912 break;
913 }
Benny Prijono5accbd02006-03-30 16:32:18 +0000914
915#if PJ_IOQUEUE_HAS_SAFE_UNREG
916 decrement_counter(event[counter].key);
917#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000918 }
919
Benny Prijono5accbd02006-03-30 16:32:18 +0000920
Benny Prijono9033e312005-11-21 02:08:39 +0000921 return count;
922}
923