/* $Id$ */
/*
 * Copyright (C)2003-2006 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */

/*
 * sock_select.c
 *
 * This is the implementation of IOQueue using pj_sock_select().
 * It runs anywhere pj_sock_select() is available (currently
 * Win32, Linux, Linux kernel, etc.).
 */

#include <pj/ioqueue.h>
#include <pj/os.h>
#include <pj/lock.h>
#include <pj/log.h>
#include <pj/list.h>
#include <pj/pool.h>
#include <pj/string.h>
#include <pj/assert.h>
#include <pj/sock.h>
#include <pj/compat/socket.h>
#include <pj/sock_select.h>
#include <pj/errno.h>

/*
 * Include declaration from common abstraction.
 */
#include "ioqueue_common_abs.h"

/*
 * ISSUES with ioqueue_select()
 *
 * EAGAIN/EWOULDBLOCK error in recv():
 *  - when multiple threads are working with the ioqueue, the application
 *    may receive EAGAIN or EWOULDBLOCK in the receive callback.
 *    This happens because more than one thread is watching the same
 *    descriptor set, so when all of them call recv() or recvfrom()
 *    simultaneously, only one will succeed and the rest will get the error.
 *
 */
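
/* Illustrative sketch (hypothetical application code, not part of this
 * file): one way for a callback to cope with the spurious error described
 * above is to treat it as benign and simply resubmit the read. This
 * assumes the ioqueue convention that a negative bytes_read value carries
 * the negated pj_status_t error code:
 *
 *   static void on_read_complete(pj_ioqueue_key_t *key,
 *                                pj_ioqueue_op_key_t *op_key,
 *                                pj_ssize_t bytes_read)
 *   {
 *       if (bytes_read < 0 &&
 *           -bytes_read == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK))
 *       {
 *           resubmit_recv(key, op_key);
 *           return;
 *       }
 *       ...
 *   }
 *
 * where resubmit_recv() is a hypothetical helper that calls
 * pj_ioqueue_recv() or pj_ioqueue_recvfrom() again.
 */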

#define THIS_FILE   "ioq_select"

/*
 * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
 * the correct error code.
 */
#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
#   error "Error reporting must be enabled for this function to work!"
#endif

/**
 * Get the number of descriptors in the set. This is defined in sock_select.c.
 * This function will only return the number of sockets set from PJ_FD_SET
 * operations. When the set is modified by other means (such as by select()),
 * the count will not be reflected here.
 *
 * That's why we don't export this function in the header file, to avoid
 * misunderstanding.
 *
 * @param fdsetp    The descriptor set.
 *
 * @return          Number of descriptors in the set.
 */
PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);

/*
 * During a debugging build, VALIDATE_FD_SET may be set to check the
 * validity of the fd_sets. It is currently hard-coded to 0 because the
 * check no longer works (see validate_sets() below).
 */
/*
#if defined(PJ_DEBUG) && PJ_DEBUG != 0
#  define VALIDATE_FD_SET	1
#else
#  define VALIDATE_FD_SET	0
#endif
*/
#define VALIDATE_FD_SET     0

/*
 * This describes each key.
 */
struct pj_ioqueue_key_t
{
    DECLARE_COMMON_KEY
};

/*
 * This describes the I/O queue itself.
 */
struct pj_ioqueue_t
{
    DECLARE_COMMON_IOQUEUE

    unsigned            max, count;
    pj_ioqueue_key_t    active_list;
    pj_fd_set_t         rfdset;
    pj_fd_set_t         wfdset;
#if PJ_HAS_TCP
    pj_fd_set_t         xfdset;
#endif

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    pj_mutex_t         *ref_cnt_mutex;
    pj_ioqueue_key_t    closing_list;
    pj_ioqueue_key_t    free_list;
#endif
};

/* Include implementation for common abstraction after we declare
 * pj_ioqueue_key_t and pj_ioqueue_t.
 */
#include "ioqueue_common_abs.c"

/*
 * pj_ioqueue_name()
 */
PJ_DEF(const char*) pj_ioqueue_name(void)
{
    return "select";
}

/*
 * pj_ioqueue_create()
 *
 * Create select ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
                                       pj_size_t max_fd,
                                       pj_ioqueue_t **p_ioqueue)
{
    pj_ioqueue_t *ioqueue;
    pj_lock_t *lock;
    unsigned i;
    pj_status_t rc;

    /* Check that arguments are valid. */
    PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
                     max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES,
                     PJ_EINVAL);

    /* Check that size of pj_ioqueue_op_key_t is sufficient */
    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
                     sizeof(union operation_key), PJ_EBUG);

    /* Create and init common ioqueue stuff */
    ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
    ioqueue_init(ioqueue);

    ioqueue->max = max_fd;
    ioqueue->count = 0;
    PJ_FD_ZERO(&ioqueue->rfdset);
    PJ_FD_ZERO(&ioqueue->wfdset);
#if PJ_HAS_TCP
    PJ_FD_ZERO(&ioqueue->xfdset);
#endif
    pj_list_init(&ioqueue->active_list);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* When safe unregistration is used (the default), we pre-create
     * all keys and put them in the free list.
     */
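
    /* Key lifecycle under safe unregistration (a summary of the logic in
     * this file): a key starts on free_list; pj_ioqueue_register_sock()
     * moves it to active_list; pj_ioqueue_unregister() marks it as closing
     * and, once its reference counter drops to zero, decrement_counter()
     * moves it to closing_list; after PJ_IOQUEUE_KEY_FREE_DELAY msec have
     * elapsed, scan_closing_keys() returns it to free_list for reuse.
     */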

    /* Mutex to protect the keys' reference counters.
     * We don't want to use the key's mutex or the ioqueue's mutex because
     * that would create a deadlock situation in some cases.
     */
    rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex);
    if (rc != PJ_SUCCESS)
        return rc;


    /* Init key lists */
    pj_list_init(&ioqueue->free_list);
    pj_list_init(&ioqueue->closing_list);


    /* Pre-create all keys according to max_fd */
    for (i=0; i<max_fd; ++i) {
        pj_ioqueue_key_t *key;

        key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t));
        key->ref_count = 0;
        rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
        if (rc != PJ_SUCCESS) {
            key = ioqueue->free_list.next;
            while (key != &ioqueue->free_list) {
                pj_mutex_destroy(key->mutex);
                key = key->next;
            }
            pj_mutex_destroy(ioqueue->ref_cnt_mutex);
            return rc;
        }

        pj_list_push_back(&ioqueue->free_list, key);
    }
#endif

    /* Create and init ioqueue mutex */
    rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
    if (rc != PJ_SUCCESS)
        return rc;

    rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
    if (rc != PJ_SUCCESS)
        return rc;

    PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue));

    *p_ioqueue = ioqueue;
    return PJ_SUCCESS;
}
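
/* Illustrative usage sketch (hypothetical application code, not part of
 * this file), assuming a memory pool "pool" has already been created:
 *
 *   pj_ioqueue_t *ioq;
 *   pj_status_t status;
 *
 *   status = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioq);
 *   if (status != PJ_SUCCESS)
 *       return status;
 */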

/*
 * pj_ioqueue_destroy()
 *
 * Destroy ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
    pj_ioqueue_key_t *key;

    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);

    pj_lock_acquire(ioqueue->lock);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Destroy the keys' mutexes and the reference counter mutex */
    key = ioqueue->active_list.next;
    while (key != &ioqueue->active_list) {
        pj_mutex_destroy(key->mutex);
        key = key->next;
    }

    key = ioqueue->closing_list.next;
    while (key != &ioqueue->closing_list) {
        pj_mutex_destroy(key->mutex);
        key = key->next;
    }

    key = ioqueue->free_list.next;
    while (key != &ioqueue->free_list) {
        pj_mutex_destroy(key->mutex);
        key = key->next;
    }

    pj_mutex_destroy(ioqueue->ref_cnt_mutex);
#endif

    return ioqueue_destroy(ioqueue);
}


/*
 * pj_ioqueue_register_sock()
 *
 * Register socket handle to ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
                                              pj_ioqueue_t *ioqueue,
                                              pj_sock_t sock,
                                              void *user_data,
                                              const pj_ioqueue_callback *cb,
                                              pj_ioqueue_key_t **p_key)
{
    pj_ioqueue_key_t *key = NULL;
    pj_uint32_t value;
    pj_status_t rc = PJ_SUCCESS;

    PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
                     cb && p_key, PJ_EINVAL);

    pj_lock_acquire(ioqueue->lock);

    if (ioqueue->count >= ioqueue->max) {
        rc = PJ_ETOOMANY;
        goto on_return;
    }

    /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get
     * the key from the free list. Otherwise allocate a new one.
     */
#if PJ_IOQUEUE_HAS_SAFE_UNREG
    pj_assert(!pj_list_empty(&ioqueue->free_list));
    if (pj_list_empty(&ioqueue->free_list)) {
        rc = PJ_ETOOMANY;
        goto on_return;
    }

    key = ioqueue->free_list.next;
    pj_list_erase(key);
#else
    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
#endif

    rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
    if (rc != PJ_SUCCESS) {
        key = NULL;
        goto on_return;
    }

    /* Set socket to nonblocking. */
    value = 1;
#if defined(PJ_WIN32) && PJ_WIN32!=0 || \
    defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0
    if (ioctlsocket(sock, FIONBIO, (u_long*)&value)) {
#else
    if (ioctl(sock, FIONBIO, &value)) {
#endif
        rc = pj_get_netos_error();
        goto on_return;
    }


    /* Put the key in the active list. */
    pj_list_insert_before(&ioqueue->active_list, key);
    ++ioqueue->count;

on_return:
    /* On error, the socket may be left in non-blocking mode. */
    *p_key = key;
    pj_lock_release(ioqueue->lock);

    return rc;
}

#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Increment the key's reference counter */
static void increment_counter(pj_ioqueue_key_t *key)
{
    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
    ++key->ref_count;
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
}

/* Decrement the key's reference counter, and when the counter reaches
 * zero, move the key to the closing list so that it can eventually be
 * reclaimed by scan_closing_keys().
 *
 * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK.
 */
static void decrement_counter(pj_ioqueue_key_t *key)
{
    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
    --key->ref_count;
    if (key->ref_count == 0) {

        pj_assert(key->closing == 1);
        pj_gettimeofday(&key->free_time);
        key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
        pj_time_val_normalize(&key->free_time);

        pj_lock_acquire(key->ioqueue->lock);
        pj_list_erase(key);
        pj_list_push_back(&key->ioqueue->closing_list, key);
        pj_lock_release(key->ioqueue->lock);
    }
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
}
#endif


/*
 * pj_ioqueue_unregister()
 *
 * Unregister handle from ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
{
    pj_ioqueue_t *ioqueue;

    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    ioqueue = key->ioqueue;

    /* Lock the key to make sure no callback is simultaneously modifying
     * the key. We need to lock the key before the ioqueue here to prevent
     * deadlock.
     */
    pj_mutex_lock(key->mutex);

    /* Also lock ioqueue */
    pj_lock_acquire(ioqueue->lock);

    pj_assert(ioqueue->count > 0);
    --ioqueue->count;
    pj_list_erase(key);
    PJ_FD_CLR(key->fd, &ioqueue->rfdset);
    PJ_FD_CLR(key->fd, &ioqueue->wfdset);
#if PJ_HAS_TCP
    PJ_FD_CLR(key->fd, &ioqueue->xfdset);
#endif

    /* Close socket. */
    pj_sock_close(key->fd);

    /* Clear callback */
    key->cb.on_accept_complete = NULL;
    key->cb.on_connect_complete = NULL;
    key->cb.on_read_complete = NULL;
    key->cb.on_write_complete = NULL;

    /* Must release ioqueue lock first before decrementing counter, to
     * prevent deadlock.
     */
    pj_lock_release(ioqueue->lock);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Mark the key as closing. */
    key->closing = 1;

    /* Decrement counter. */
    decrement_counter(key);

    /* Done. */
    pj_mutex_unlock(key->mutex);
#else
    pj_mutex_destroy(key->mutex);
#endif

    return PJ_SUCCESS;
}
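
/* Note that, as shown above, pj_ioqueue_unregister() closes the socket
 * itself, so the application must not close the handle again after
 * unregistering it.
 */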

/* This is supposed to check whether the fd_set values are consistent
 * with the operation currently set in each key.
 */
#if VALIDATE_FD_SET
static void validate_sets(const pj_ioqueue_t *ioqueue,
                          const pj_fd_set_t *rfdset,
                          const pj_fd_set_t *wfdset,
                          const pj_fd_set_t *xfdset)
{
    pj_ioqueue_key_t *key;

    /*
     * This basically would not work anymore.
     * We need to lock the key before performing the check, but we can't do
     * so because we're holding the ioqueue mutex. If we acquire the key's
     * mutex now, that will cause a deadlock.
     */
    pj_assert(0);

    key = ioqueue->active_list.next;
    while (key != &ioqueue->active_list) {
        if (!pj_list_empty(&key->read_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
            || !pj_list_empty(&key->accept_list)
#endif
            )
        {
            pj_assert(PJ_FD_ISSET(key->fd, rfdset));
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0);
        }
        if (!pj_list_empty(&key->write_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
            || key->connecting
#endif
            )
        {
            pj_assert(PJ_FD_ISSET(key->fd, wfdset));
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0);
        }
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
        if (key->connecting)
        {
            pj_assert(PJ_FD_ISSET(key->fd, xfdset));
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, xfdset) == 0);
        }
#endif /* PJ_HAS_TCP */

        key = key->next;
    }
}
#endif /* VALIDATE_FD_SET */


/* ioqueue_remove_from_set()
 * This function is called from ioqueue_dispatch_event() to instruct
 * the ioqueue to remove the specified descriptor from the ioqueue's
 * descriptor set for the specified event.
 */
static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
                                     pj_sock_t fd,
                                     enum ioqueue_event_type event_type)
{
    pj_lock_acquire(ioqueue->lock);

    if (event_type == READABLE_EVENT)
        PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset);
    else if (event_type == WRITEABLE_EVENT)
        PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset);
    else if (event_type == EXCEPTION_EVENT)
        PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset);
    else
        pj_assert(0);

    pj_lock_release(ioqueue->lock);
}

/*
 * ioqueue_add_to_set()
 * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc.
 * to instruct the ioqueue to add the specified handle to the ioqueue's
 * descriptor set for the specified event.
 */
static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
                                pj_sock_t fd,
                                enum ioqueue_event_type event_type )
{
    pj_lock_acquire(ioqueue->lock);

    if (event_type == READABLE_EVENT)
        PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset);
    else if (event_type == WRITEABLE_EVENT)
        PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset);
    else if (event_type == EXCEPTION_EVENT)
        PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset);
    else
        pj_assert(0);

    pj_lock_release(ioqueue->lock);
}

#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Scan the closing keys and move those whose delay has elapsed back
 * to the free list.
 */
static void scan_closing_keys(pj_ioqueue_t *ioqueue)
{
    pj_time_val now;
    pj_ioqueue_key_t *h;

    pj_gettimeofday(&now);
    h = ioqueue->closing_list.next;
    while (h != &ioqueue->closing_list) {
        pj_ioqueue_key_t *next = h->next;

        pj_assert(h->closing != 0);

        if (PJ_TIME_VAL_GTE(now, h->free_time)) {
            pj_list_erase(h);
            pj_list_push_back(&ioqueue->free_list, h);
        }
        h = next;
    }
}
#endif


/*
 * pj_ioqueue_poll()
 *
 * A few things worth noting:
 *
 * - we used to call only one callback per poll, but that didn't work
 *   very well. In some situations the write callback gets called all
 *   the time and the read callback never gets a chance to run. This
 *   happens, for example, when the user submits a write operation from
 *   inside the write callback.
 *   As a result, we changed the behaviour so that multiple callbacks
 *   are now called in a single poll. It should be fast too; we just
 *   need to be careful with the ioqueue data structures.
 *
 * - to guarantee preemptiveness etc., the poll function must strictly
 *   work on a copy of the ioqueue's fd_sets (not the originals).
 */
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
    pj_fd_set_t rfdset, wfdset, xfdset;
    int count, counter;
    pj_ioqueue_key_t *h;
    struct event
    {
        pj_ioqueue_key_t        *key;
        enum ioqueue_event_type  event_type;
    } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];

    PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);

    /* Lock ioqueue before making fd_set copies */
    pj_lock_acquire(ioqueue->lock);

    /* We will only do select() when there are sockets to be polled.
     * Otherwise select() will return an error.
     */
    if (PJ_FD_COUNT(&ioqueue->rfdset)==0 &&
        PJ_FD_COUNT(&ioqueue->wfdset)==0 &&
        PJ_FD_COUNT(&ioqueue->xfdset)==0)
    {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
        scan_closing_keys(ioqueue);
#endif
        pj_lock_release(ioqueue->lock);
        if (timeout)
            pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout));
        return 0;
    }

    /* Copy the ioqueue's pj_fd_set_t to local variables. */
    pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t));
    pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t));
#if PJ_HAS_TCP
    pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t));
#else
    PJ_FD_ZERO(&xfdset);
#endif

#if VALIDATE_FD_SET
    validate_sets(ioqueue, &rfdset, &wfdset, &xfdset);
#endif

    /* Unlock ioqueue before select(). */
    pj_lock_release(ioqueue->lock);

    count = pj_sock_select(FD_SETSIZE, &rfdset, &wfdset, &xfdset, timeout);

    if (count <= 0)
        return -pj_get_netos_error();
    else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)
        count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;

    /* Scan the descriptor sets for events and add the events to the event
     * array to be processed later in this function. We do this so that
     * events can be processed in parallel without holding the ioqueue lock.
     */
    pj_lock_acquire(ioqueue->lock);

    counter = 0;

    /* Scan for writable sockets first to handle piggy-back data
     * coming with accept().
     */
    h = ioqueue->active_list.next;
    for ( ; h!=&ioqueue->active_list && counter<count; h = h->next) {

        if ( (key_has_pending_write(h) || key_has_pending_connect(h))
             && PJ_FD_ISSET(h->fd, &wfdset) && !h->closing)
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            event[counter].key = h;
            event[counter].event_type = WRITEABLE_EVENT;
            ++counter;
        }

        /* Scan for readable socket. */
        if ((key_has_pending_read(h) || key_has_pending_accept(h))
            && PJ_FD_ISSET(h->fd, &rfdset) && !h->closing)
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            event[counter].key = h;
            event[counter].event_type = READABLE_EVENT;
            ++counter;
        }

#if PJ_HAS_TCP
        if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset) &&
            !h->closing)
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            event[counter].key = h;
            event[counter].event_type = EXCEPTION_EVENT;
            ++counter;
        }
#endif
    }

    pj_lock_release(ioqueue->lock);

    count = counter;

    /* Now process all events. The dispatch functions will take care
     * of locking in each of the keys.
     */
    for (counter=0; counter<count; ++counter) {
        switch (event[counter].event_type) {
        case READABLE_EVENT:
            ioqueue_dispatch_read_event(ioqueue, event[counter].key);
            break;
        case WRITEABLE_EVENT:
            ioqueue_dispatch_write_event(ioqueue, event[counter].key);
            break;
        case EXCEPTION_EVENT:
            ioqueue_dispatch_exception_event(ioqueue, event[counter].key);
            break;
        case NO_EVENT:
            pj_assert(!"Invalid event!");
            break;
        }

#if PJ_IOQUEUE_HAS_SAFE_UNREG
        decrement_counter(event[counter].key);
#endif
    }


    return count;
}
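
/* Illustrative event-loop sketch (hypothetical application code, not part
 * of this file): poll with a small timeout from one or more worker threads.
 * A negative return value is the negated pj_status_t error from
 * pj_sock_select():
 *
 *   for (;;) {
 *       pj_time_val timeout = { 0, 10 };
 *       int n = pj_ioqueue_poll(ioq, &timeout);
 *       if (n < 0)
 *           pj_thread_sleep(PJ_TIME_VAL_MSEC(timeout));
 *   }
 */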