/* $Id$ */
/*
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */

/*
 * sock_select.c
 *
 * This is the implementation of IOQueue using pj_sock_select().
 * It runs anywhere pj_sock_select() is available (currently
 * Win32, Linux, Linux kernel, etc.).
 */

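/*
 * Illustrative usage sketch (not part of this implementation): a typical
 * application creates the ioqueue from a pool, registers a socket together
 * with a callback structure, and then polls. The pool, sock, user_data and
 * my_on_read_complete names below are assumptions for illustration only.
 *
 *   pj_ioqueue_t *ioq;
 *   pj_ioqueue_key_t *key;
 *   pj_ioqueue_callback cb;
 *   pj_status_t status;
 *
 *   status = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioq);
 *   // ...check status...
 *
 *   pj_bzero(&cb, sizeof(cb));
 *   cb.on_read_complete = &my_on_read_complete;
 *   status = pj_ioqueue_register_sock(pool, ioq, sock, user_data, &cb, &key);
 *   // ...check status, submit pj_ioqueue_recv()/pj_ioqueue_recvfrom()
 *   //    operations, then call pj_ioqueue_poll() (see the sketch at the
 *   //    end of this file)...
 */
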
#include <pj/ioqueue.h>
#include <pj/os.h>
#include <pj/lock.h>
#include <pj/log.h>
#include <pj/list.h>
#include <pj/pool.h>
#include <pj/string.h>
#include <pj/assert.h>
#include <pj/sock.h>
#include <pj/compat/socket.h>
#include <pj/sock_select.h>
#include <pj/errno.h>

/* Now that we have access to the OS's <sys/select.h>, let's check again
 * that PJ_IOQUEUE_MAX_HANDLES is not greater than FD_SETSIZE.
 */
#if PJ_IOQUEUE_MAX_HANDLES > FD_SETSIZE
#   error "PJ_IOQUEUE_MAX_HANDLES cannot be greater than FD_SETSIZE"
#endif


/*
 * Include declaration from common abstraction.
 */
#include "ioqueue_common_abs.h"

/*
 * ISSUES with ioqueue_select()
 *
 * EAGAIN/EWOULDBLOCK error in recv():
 *  - when multiple threads are working with the ioqueue, the application
 *    may receive EAGAIN or EWOULDBLOCK in the receive callback.
 *    This error happens because more than one thread is watching the
 *    same descriptor set, so when all of them call recv() or recvfrom()
 *    simultaneously, only one will succeed and the rest will get the error.
 *
 */
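/*
 * A sketch (illustration only, not part of this file) of one way an
 * application's on_read_complete callback might tolerate the spurious
 * EAGAIN/EWOULDBLOCK described above: treat it as "no data" and simply
 * re-post the read. The callback name is an assumption, and
 * OSERR_EWOULDBLOCK is assumed to be the portable errno constant from
 * <pj/compat/socket.h>.
 *
 *   static void my_on_read_complete(pj_ioqueue_key_t *key,
 *                                   pj_ioqueue_op_key_t *op_key,
 *                                   pj_ssize_t bytes_read)
 *   {
 *       if (bytes_read < 0) {
 *           pj_status_t status = (pj_status_t)-bytes_read;
 *           if (status == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
 *               // Benign under multi-threaded polling; just repost the read.
 *           } else {
 *               // Real error: log/handle it.
 *           }
 *       } else {
 *           // Process bytes_read bytes of data.
 *       }
 *       // Repost the next read with pj_ioqueue_recv()/pj_ioqueue_recvfrom().
 *   }
 */
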
#define THIS_FILE   "ioq_select"

/*
 * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
 * the correct error code.
 */
#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
#   error "Error reporting must be enabled for this function to work!"
#endif

/*
 * In debug builds, VALIDATE_FD_SET may be set to check the validity
 * of the fd_sets.
 */
/*
#if defined(PJ_DEBUG) && PJ_DEBUG != 0
#  define VALIDATE_FD_SET   1
#else
#  define VALIDATE_FD_SET   0
#endif
*/
#define VALIDATE_FD_SET     0

#if 0
#  define TRACE__(args) PJ_LOG(3,args)
#else
#  define TRACE__(args)
#endif

/*
 * This describes each key.
 */
struct pj_ioqueue_key_t
{
    DECLARE_COMMON_KEY
};

/*
 * This describes the I/O queue itself.
 */
struct pj_ioqueue_t
{
    DECLARE_COMMON_IOQUEUE

    unsigned            max, count;     /* Max and current key count        */
    int                 nfds;           /* The largest fd value (for select)*/
    pj_ioqueue_key_t    active_list;    /* List of active keys.             */
    pj_fd_set_t         rfdset;
    pj_fd_set_t         wfdset;
#if PJ_HAS_TCP
    pj_fd_set_t         xfdset;
#endif

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    pj_mutex_t         *ref_cnt_mutex;
    pj_ioqueue_key_t    closing_list;
    pj_ioqueue_key_t    free_list;
#endif
};

/* Include implementation for common abstraction after we declare
 * pj_ioqueue_key_t and pj_ioqueue_t.
 */
#include "ioqueue_common_abs.c"

#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Scan closing keys to be put to free list again */
static void scan_closing_keys(pj_ioqueue_t *ioqueue);
#endif

/*
 * pj_ioqueue_name()
 */
PJ_DEF(const char*) pj_ioqueue_name(void)
{
    return "select";
}

/*
 * Scan the socket descriptor sets for the largest descriptor.
 * This value is needed by select().
 */
#if defined(PJ_SELECT_NEEDS_NFDS) && PJ_SELECT_NEEDS_NFDS!=0
static void rescan_fdset(pj_ioqueue_t *ioqueue)
{
    pj_ioqueue_key_t *key = ioqueue->active_list.next;
    int max = 0;

    while (key != &ioqueue->active_list) {
        if (key->fd > max)
            max = key->fd;
        key = key->next;
    }

    ioqueue->nfds = max;
}
#else
static void rescan_fdset(pj_ioqueue_t *ioqueue)
{
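    /* PJ_SELECT_NEEDS_NFDS is not set here, so the backend's select() is
     * assumed not to require an accurate nfds value (WinSock, for example,
     * ignores it); a conservative FD_SETSIZE-1 is used instead of scanning.
     */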
    ioqueue->nfds = FD_SETSIZE-1;
}
#endif


/*
 * pj_ioqueue_create()
 *
 * Create select ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
                                       pj_size_t max_fd,
                                       pj_ioqueue_t **p_ioqueue)
{
    pj_ioqueue_t *ioqueue;
    pj_lock_t *lock;
    unsigned i;
    pj_status_t rc;

    /* Check that arguments are valid. */
    PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
                     max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES,
                     PJ_EINVAL);

    /* Check that size of pj_ioqueue_op_key_t is sufficient */
    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
                     sizeof(union operation_key), PJ_EBUG);

    /* Create and init common ioqueue stuff */
    ioqueue = PJ_POOL_ALLOC_T(pool, pj_ioqueue_t);
    ioqueue_init(ioqueue);

    ioqueue->max = max_fd;
    ioqueue->count = 0;
    PJ_FD_ZERO(&ioqueue->rfdset);
    PJ_FD_ZERO(&ioqueue->wfdset);
#if PJ_HAS_TCP
    PJ_FD_ZERO(&ioqueue->xfdset);
#endif
    pj_list_init(&ioqueue->active_list);

    rescan_fdset(ioqueue);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* When safe unregistration is used (the default), we pre-create
     * all keys and put them in the free list.
     */

    /* Mutex to protect the key's reference counter.
     * We don't want to use the key's mutex or the ioqueue's mutex because
     * that would create a deadlock situation in some cases.
     */
    rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex);
    if (rc != PJ_SUCCESS)
        return rc;


    /* Init key lists */
    pj_list_init(&ioqueue->free_list);
    pj_list_init(&ioqueue->closing_list);


    /* Pre-create all keys according to max_fd */
    for (i=0; i<max_fd; ++i) {
        pj_ioqueue_key_t *key;

        key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t);
        key->ref_count = 0;
        rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
        if (rc != PJ_SUCCESS) {
            key = ioqueue->free_list.next;
            while (key != &ioqueue->free_list) {
                pj_mutex_destroy(key->mutex);
                key = key->next;
            }
            pj_mutex_destroy(ioqueue->ref_cnt_mutex);
            return rc;
        }

        pj_list_push_back(&ioqueue->free_list, key);
    }
#endif

    /* Create and init ioqueue mutex */
    rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
    if (rc != PJ_SUCCESS)
        return rc;

    rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
    if (rc != PJ_SUCCESS)
        return rc;

    PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue));

    *p_ioqueue = ioqueue;
    return PJ_SUCCESS;
}

/*
 * pj_ioqueue_destroy()
 *
 * Destroy ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
    pj_ioqueue_key_t *key;

    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);

    pj_lock_acquire(ioqueue->lock);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Destroy reference counters */
    key = ioqueue->active_list.next;
    while (key != &ioqueue->active_list) {
        pj_mutex_destroy(key->mutex);
        key = key->next;
    }

    key = ioqueue->closing_list.next;
    while (key != &ioqueue->closing_list) {
        pj_mutex_destroy(key->mutex);
        key = key->next;
    }

    key = ioqueue->free_list.next;
    while (key != &ioqueue->free_list) {
        pj_mutex_destroy(key->mutex);
        key = key->next;
    }

    pj_mutex_destroy(ioqueue->ref_cnt_mutex);
#endif

    return ioqueue_destroy(ioqueue);
}


/*
 * pj_ioqueue_register_sock()
 *
 * Register socket handle to ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
                                              pj_ioqueue_t *ioqueue,
                                              pj_sock_t sock,
                                              void *user_data,
                                              const pj_ioqueue_callback *cb,
                                              pj_ioqueue_key_t **p_key)
{
    pj_ioqueue_key_t *key = NULL;
#if defined(PJ_WIN32) && PJ_WIN32!=0 || \
    defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0
    u_long value;
#else
    pj_uint32_t value;
#endif
    pj_status_t rc = PJ_SUCCESS;

    PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
                     cb && p_key, PJ_EINVAL);

    pj_lock_acquire(ioqueue->lock);

    if (ioqueue->count >= ioqueue->max) {
        rc = PJ_ETOOMANY;
        goto on_return;
    }

    /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get
     * the key from the free list. Otherwise allocate a new one.
     */
#if PJ_IOQUEUE_HAS_SAFE_UNREG

    /* Scan closing_keys first to let them come back to free_list */
    scan_closing_keys(ioqueue);

    pj_assert(!pj_list_empty(&ioqueue->free_list));
    if (pj_list_empty(&ioqueue->free_list)) {
        rc = PJ_ETOOMANY;
        goto on_return;
    }

    key = ioqueue->free_list.next;
    pj_list_erase(key);
#else
    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
#endif

    rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
    if (rc != PJ_SUCCESS) {
        key = NULL;
        goto on_return;
    }

    /* Set socket to nonblocking. */
    value = 1;
#if defined(PJ_WIN32) && PJ_WIN32!=0 || \
    defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0
    if (ioctlsocket(sock, FIONBIO, &value)) {
#else
    if (ioctl(sock, FIONBIO, &value)) {
#endif
        rc = pj_get_netos_error();
        goto on_return;
    }


    /* Put in active list. */
    pj_list_insert_before(&ioqueue->active_list, key);
    ++ioqueue->count;

    /* Rescan fdset to get max descriptor */
    rescan_fdset(ioqueue);

on_return:
    /* On error, socket may be left in non-blocking mode. */
    *p_key = key;
    pj_lock_release(ioqueue->lock);

    return rc;
}

#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Increment key's reference counter */
static void increment_counter(pj_ioqueue_key_t *key)
{
    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
    ++key->ref_count;
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
}

/* Decrement the key's reference counter, and when the counter reaches zero,
 * move the key to the closing list so it can later be freed/reused.
 *
 * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK.
 */
static void decrement_counter(pj_ioqueue_key_t *key)
{
    pj_lock_acquire(key->ioqueue->lock);
    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
    --key->ref_count;
    if (key->ref_count == 0) {

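        /* The key is not recycled immediately: it is stamped with a
         * free_time PJ_IOQUEUE_KEY_FREE_DELAY msec in the future and parked
         * on closing_list. scan_closing_keys() moves it back to free_list
         * only after that time has elapsed, giving code that may still hold
         * the key pointer some slack before the key is reused.
         */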
        pj_assert(key->closing == 1);
        pj_gettimeofday(&key->free_time);
        key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
        pj_time_val_normalize(&key->free_time);

        pj_list_erase(key);
        pj_list_push_back(&key->ioqueue->closing_list, key);
        /* Rescan fdset to get max descriptor */
        rescan_fdset(key->ioqueue);
    }
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
    pj_lock_release(key->ioqueue->lock);
}
#endif


/*
 * pj_ioqueue_unregister()
 *
 * Unregister handle from ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
{
    pj_ioqueue_t *ioqueue;

    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    ioqueue = key->ioqueue;

    /* Lock the key to make sure no callback is simultaneously modifying
     * the key. We need to lock the key before the ioqueue here to prevent
     * deadlock.
     */
    pj_mutex_lock(key->mutex);

    /* Also lock ioqueue */
    pj_lock_acquire(ioqueue->lock);

    pj_assert(ioqueue->count > 0);
    --ioqueue->count;
#if !PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Ticket #520: don't erase the key here when safe unregistration is
     * used, otherwise the key would be erased more than once. */
    pj_list_erase(key);
#endif
    PJ_FD_CLR(key->fd, &ioqueue->rfdset);
    PJ_FD_CLR(key->fd, &ioqueue->wfdset);
#if PJ_HAS_TCP
    PJ_FD_CLR(key->fd, &ioqueue->xfdset);
#endif

    /* Close socket. */
    pj_sock_close(key->fd);

    /* Clear callbacks */
    key->cb.on_accept_complete = NULL;
    key->cb.on_connect_complete = NULL;
    key->cb.on_read_complete = NULL;
    key->cb.on_write_complete = NULL;

    /* Must release ioqueue lock first before decrementing counter, to
     * prevent deadlock.
     */
    pj_lock_release(ioqueue->lock);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Mark key as closing. */
    key->closing = 1;

    /* Decrement counter. */
    decrement_counter(key);

    /* Done. */
    pj_mutex_unlock(key->mutex);
#else
    pj_mutex_destroy(key->mutex);
#endif

    return PJ_SUCCESS;
}


/* This is supposed to check whether the fd_set values are consistent
 * with the operation currently set in each key.
 */
#if VALIDATE_FD_SET
static void validate_sets(const pj_ioqueue_t *ioqueue,
                          const pj_fd_set_t *rfdset,
                          const pj_fd_set_t *wfdset,
                          const pj_fd_set_t *xfdset)
{
    pj_ioqueue_key_t *key;

    /*
     * This basically would not work anymore.
     * We need to lock the key before performing the check, but we can't do
     * so because we're holding the ioqueue mutex. If we acquire the key's
     * mutex now, that will cause a deadlock.
     */
    pj_assert(0);

    key = ioqueue->active_list.next;
    while (key != &ioqueue->active_list) {
        if (!pj_list_empty(&key->read_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
            || !pj_list_empty(&key->accept_list)
#endif
            )
        {
            pj_assert(PJ_FD_ISSET(key->fd, rfdset));
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0);
        }
        if (!pj_list_empty(&key->write_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
            || key->connecting
#endif
            )
        {
            pj_assert(PJ_FD_ISSET(key->fd, wfdset));
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0);
        }
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
        if (key->connecting)
        {
            pj_assert(PJ_FD_ISSET(key->fd, xfdset));
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, xfdset) == 0);
        }
#endif /* PJ_HAS_TCP */

        key = key->next;
    }
}
#endif /* VALIDATE_FD_SET */


/* ioqueue_remove_from_set()
 * This function is called from ioqueue_dispatch_event() to instruct
 * the ioqueue to remove the specified descriptor from ioqueue's descriptor
 * set for the specified event.
 */
static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
                                     pj_ioqueue_key_t *key,
                                     enum ioqueue_event_type event_type)
{
    pj_lock_acquire(ioqueue->lock);

    if (event_type == READABLE_EVENT)
        PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->rfdset);
    else if (event_type == WRITEABLE_EVENT)
        PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->wfdset);
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
    else if (event_type == EXCEPTION_EVENT)
        PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->xfdset);
#endif
    else
        pj_assert(0);

    pj_lock_release(ioqueue->lock);
}

/*
 * ioqueue_add_to_set()
 * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
 * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
 * set for the specified event.
 */
static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
                                pj_ioqueue_key_t *key,
                                enum ioqueue_event_type event_type )
{
    pj_lock_acquire(ioqueue->lock);

    if (event_type == READABLE_EVENT)
        PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->rfdset);
    else if (event_type == WRITEABLE_EVENT)
        PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->wfdset);
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
    else if (event_type == EXCEPTION_EVENT)
        PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->xfdset);
#endif
    else
        pj_assert(0);

    pj_lock_release(ioqueue->lock);
}

#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Scan closing keys to be put to free list again */
static void scan_closing_keys(pj_ioqueue_t *ioqueue)
{
    pj_time_val now;
    pj_ioqueue_key_t *h;

    pj_gettimeofday(&now);
    h = ioqueue->closing_list.next;
    while (h != &ioqueue->closing_list) {
        pj_ioqueue_key_t *next = h->next;

        pj_assert(h->closing != 0);

        if (PJ_TIME_VAL_GTE(now, h->free_time)) {
            pj_list_erase(h);
            pj_list_push_back(&ioqueue->free_list, h);
        }
        h = next;
    }
}
#endif


/*
 * pj_ioqueue_poll()
 *
 * A few things worth noting:
 *
 * - we used to call only one callback per poll, but it didn't work
 *   very well. The reason is that in some situations the write
 *   callback gets called all the time and never gives the read
 *   callback a chance to run. This happens, for example, when the user
 *   submits a write operation inside the write callback.
 *   As a result, we changed the behaviour so that now multiple
 *   callbacks are called in a single poll. It should be fast too,
 *   we just need to be careful with the ioqueue data structs.
 *
 * - to guarantee preemptiveness etc., the poll function must strictly
 *   work on a copy of the ioqueue's fd_sets (not the originals).
 */
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
    pj_fd_set_t rfdset, wfdset, xfdset;
    int count, counter;
    pj_ioqueue_key_t *h;
    struct event
    {
        pj_ioqueue_key_t *key;
        enum ioqueue_event_type event_type;
    } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];

    PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);

    /* Lock ioqueue before making fd_set copies */
    pj_lock_acquire(ioqueue->lock);

    /* We will only do select() when there are sockets to be polled.
     * Otherwise select() will return an error.
     */
    if (PJ_FD_COUNT(&ioqueue->rfdset)==0 &&
        PJ_FD_COUNT(&ioqueue->wfdset)==0
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
        && PJ_FD_COUNT(&ioqueue->xfdset)==0
#endif
        )
    {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
        scan_closing_keys(ioqueue);
#endif
        pj_lock_release(ioqueue->lock);
        TRACE__((THIS_FILE, " poll: no fd is set"));
        if (timeout)
            pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout));
        return 0;
    }

    /* Copy ioqueue's pj_fd_set_t to local variables. */
    pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t));
    pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t));
#if PJ_HAS_TCP
    pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t));
#else
    PJ_FD_ZERO(&xfdset);
#endif

#if VALIDATE_FD_SET
    validate_sets(ioqueue, &rfdset, &wfdset, &xfdset);
#endif

    /* Unlock ioqueue before select(). */
    pj_lock_release(ioqueue->lock);

    count = pj_sock_select(ioqueue->nfds+1, &rfdset, &wfdset, &xfdset,
                           timeout);

    if (count == 0)
        return 0;
    else if (count < 0)
        return -pj_get_netos_error();
    else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)
        count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;

    /* Scan descriptor sets for events and add the events to the event
     * array to be processed later in this function. We do this so that
     * events can be processed in parallel without holding the ioqueue lock.
     */
    pj_lock_acquire(ioqueue->lock);

    counter = 0;

    /* Scan for writable sockets first to handle piggy-back data
     * coming with accept().
     */
    h = ioqueue->active_list.next;
    for ( ; h!=&ioqueue->active_list && counter<count; h = h->next) {

        if ( (key_has_pending_write(h) || key_has_pending_connect(h))
             && PJ_FD_ISSET(h->fd, &wfdset) && !IS_CLOSING(h))
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            event[counter].key = h;
            event[counter].event_type = WRITEABLE_EVENT;
            ++counter;
        }

        /* Scan for readable sockets. */
        if ((key_has_pending_read(h) || key_has_pending_accept(h))
            && PJ_FD_ISSET(h->fd, &rfdset) && !IS_CLOSING(h))
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            event[counter].key = h;
            event[counter].event_type = READABLE_EVENT;
            ++counter;
        }

#if PJ_HAS_TCP
        if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset) &&
            !IS_CLOSING(h))
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            event[counter].key = h;
            event[counter].event_type = EXCEPTION_EVENT;
            ++counter;
        }
#endif
    }

    pj_lock_release(ioqueue->lock);

    count = counter;

    /* Now process all events. The dispatch functions will take care
     * of locking each key.
     */
    for (counter=0; counter<count; ++counter) {
        switch (event[counter].event_type) {
        case READABLE_EVENT:
            ioqueue_dispatch_read_event(ioqueue, event[counter].key);
            break;
        case WRITEABLE_EVENT:
            ioqueue_dispatch_write_event(ioqueue, event[counter].key);
            break;
        case EXCEPTION_EVENT:
            ioqueue_dispatch_exception_event(ioqueue, event[counter].key);
            break;
        case NO_EVENT:
            pj_assert(!"Invalid event!");
            break;
        }

#if PJ_IOQUEUE_HAS_SAFE_UNREG
        decrement_counter(event[counter].key);
#endif
    }


    return count;
}

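/*
 * Illustrative polling loop (not part of this implementation): a typical
 * worker thread repeatedly calls pj_ioqueue_poll() above with a small
 * timeout. The quit_flag name and the 10 msec timeout are assumptions for
 * illustration only.
 *
 *   static int worker_thread(void *arg)
 *   {
 *       pj_ioqueue_t *ioq = (pj_ioqueue_t*)arg;
 *
 *       while (!quit_flag) {
 *           pj_time_val timeout = { 0, 10 };
 *           int n = pj_ioqueue_poll(ioq, &timeout);
 *           if (n < 0) {
 *               // -n is the pjlib error status from pj_sock_select().
 *           }
 *           // n == 0 means timeout; n > 0 is the number of events dispatched.
 *       }
 *       return 0;
 *   }
 */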