/* $Id$ */
/*
 * Copyright (C) 2003-2006 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
19
20/*
21 * sock_select.c
22 *
23 * This is the implementation of IOQueue using pj_sock_select().
24 * It runs anywhere where pj_sock_select() is available (currently
25 * Win32, Linux, Linux kernel, etc.).
26 */
27
28#include <pj/ioqueue.h>
29#include <pj/os.h>
30#include <pj/lock.h>
31#include <pj/log.h>
32#include <pj/list.h>
33#include <pj/pool.h>
34#include <pj/string.h>
35#include <pj/assert.h>
36#include <pj/sock.h>
37#include <pj/compat/socket.h>
38#include <pj/sock_select.h>
39#include <pj/errno.h>
40
41/*
42 * Include declaration from common abstraction.
43 */
44#include "ioqueue_common_abs.h"
45
46/*
47 * ISSUES with ioqueue_select()
48 *
49 * EAGAIN/EWOULDBLOCK error in recv():
50 * - when multiple threads are working with the ioqueue, application
51 * may receive EAGAIN or EWOULDBLOCK in the receive callback.
52 * This error happens because more than one thread is watching for
53 * the same descriptor set, so when all of them call recv() or recvfrom()
54 * simultaneously, only one will succeed and the rest will get the error.
55 *
56 */
57#define THIS_FILE "ioq_select"
58
59/*
60 * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
61 * the correct error code.
62 */
63#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
64# error "Error reporting must be enabled for this function to work!"
65#endif
66
/**
 * Get the number of descriptors in the set. This is defined in sock_select.c
 * This function will only return the number of sockets set from PJ_FD_SET
 * operation. When the set is modified by other means (such as by select()),
 * the count will not be reflected here.
 *
 * That's why don't export this function in the header file, to avoid
 * misunderstanding.
 *
 * @param fdsetp    The descriptor set.
 *
 * @return          Number of descriptors in the set.
 */
PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);
81
82
/*
 * During debugging build, VALIDATE_FD_SET is set.
 * This will check the validity of the fd_sets.
 */
/*
#if defined(PJ_DEBUG) && PJ_DEBUG != 0
# define VALIDATE_FD_SET 1
#else
# define VALIDATE_FD_SET 0
#endif
*/
/* Validation is hard-disabled: validate_sets() below cannot run safely
 * anymore (see the pj_assert(0) in its body), so the PJ_DEBUG-based
 * definition above is kept only for reference.
 */
#define VALIDATE_FD_SET     0

/* Flip the "#if 0" to "#if 1" to get verbose poll tracing via PJ_LOG. */
#if 0
# define TRACE__(args) PJ_LOG(3,args)
#else
# define TRACE__(args)
#endif
101
/*
 * This describes each key.
 *
 * All fields (fd, user data, callbacks, pending operation lists, and the
 * safe-unreg fields such as ref_count/closing/free_time used elsewhere in
 * this file) come from the common abstraction macro declared in
 * ioqueue_common_abs.h.
 */
struct pj_ioqueue_key_t
{
    DECLARE_COMMON_KEY
};
109
/*
 * This describes the I/O queue itself.
 */
struct pj_ioqueue_t
{
    DECLARE_COMMON_IOQUEUE          /* Common fields (incl. "lock") from
                                     * ioqueue_common_abs.h.             */

    unsigned max, count;            /* Max and current key count         */
    int nfds;                       /* The largest fd value (for select) */
    pj_ioqueue_key_t active_list;   /* List of active keys.              */
    pj_fd_set_t rfdset;             /* Master read descriptor set.       */
    pj_fd_set_t wfdset;             /* Master write descriptor set.      */
#if PJ_HAS_TCP
    pj_fd_set_t xfdset;             /* Exception set, used to detect
                                     * connect() completion/failure.     */
#endif

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Dedicated mutex for the keys' reference counters; deliberately
     * separate from the ioqueue lock and the per-key mutexes to avoid
     * deadlock (see decrement_counter()).
     */
    pj_mutex_t *ref_cnt_mutex;
    pj_ioqueue_key_t closing_list;  /* Unregistered keys waiting out the
                                     * PJ_IOQUEUE_KEY_FREE_DELAY grace
                                     * period.                           */
    pj_ioqueue_key_t free_list;     /* Pre-allocated keys ready for
                                     * registration.                     */
#endif
};
132
133/* Include implementation for common abstraction after we declare
134 * pj_ioqueue_key_t and pj_ioqueue_t.
135 */
136#include "ioqueue_common_abs.c"
137
138/*
139 * pj_ioqueue_name()
140 */
141PJ_DEF(const char*) pj_ioqueue_name(void)
142{
143 return "select";
144}
145
Benny Prijono42c5b9e2006-05-10 19:24:40 +0000146/*
147 * Scan the socket descriptor sets for the largest descriptor.
148 * This value is needed by select().
149 */
150#if defined(PJ_SELECT_NEEDS_NFDS) && PJ_SELECT_NEEDS_NFDS!=0
151static void rescan_fdset(pj_ioqueue_t *ioqueue)
152{
153 pj_ioqueue_key_t *key = ioqueue->active_list.next;
154 int max = 0;
155
156 while (key != &ioqueue->active_list) {
157 if (key->fd > max)
158 max = key->fd;
159 key = key->next;
160 }
161
162 ioqueue->nfds = max;
163}
164#else
165static void rescan_fdset(pj_ioqueue_t *ioqueue)
166{
167 ioqueue->nfds = FD_SETSIZE-1;
168}
169#endif
170
171
Benny Prijono9033e312005-11-21 02:08:39 +0000172/*
173 * pj_ioqueue_create()
174 *
175 * Create select ioqueue.
176 */
177PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
178 pj_size_t max_fd,
179 pj_ioqueue_t **p_ioqueue)
180{
181 pj_ioqueue_t *ioqueue;
182 pj_lock_t *lock;
Benny Prijono5accbd02006-03-30 16:32:18 +0000183 unsigned i;
Benny Prijono9033e312005-11-21 02:08:39 +0000184 pj_status_t rc;
185
186 /* Check that arguments are valid. */
187 PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
188 max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES,
189 PJ_EINVAL);
190
191 /* Check that size of pj_ioqueue_op_key_t is sufficient */
192 PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
193 sizeof(union operation_key), PJ_EBUG);
194
Benny Prijono5accbd02006-03-30 16:32:18 +0000195 /* Create and init common ioqueue stuffs */
Benny Prijono9033e312005-11-21 02:08:39 +0000196 ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
Benny Prijono9033e312005-11-21 02:08:39 +0000197 ioqueue_init(ioqueue);
198
199 ioqueue->max = max_fd;
200 ioqueue->count = 0;
201 PJ_FD_ZERO(&ioqueue->rfdset);
202 PJ_FD_ZERO(&ioqueue->wfdset);
203#if PJ_HAS_TCP
204 PJ_FD_ZERO(&ioqueue->xfdset);
205#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000206 pj_list_init(&ioqueue->active_list);
Benny Prijono9033e312005-11-21 02:08:39 +0000207
Benny Prijono42c5b9e2006-05-10 19:24:40 +0000208 rescan_fdset(ioqueue);
209
Benny Prijono5accbd02006-03-30 16:32:18 +0000210#if PJ_IOQUEUE_HAS_SAFE_UNREG
211 /* When safe unregistration is used (the default), we pre-create
212 * all keys and put them in the free list.
213 */
214
215 /* Mutex to protect key's reference counter
216 * We don't want to use key's mutex or ioqueue's mutex because
217 * that would create deadlock situation in some cases.
218 */
219 rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex);
220 if (rc != PJ_SUCCESS)
221 return rc;
222
223
224 /* Init key list */
225 pj_list_init(&ioqueue->free_list);
226 pj_list_init(&ioqueue->closing_list);
227
228
229 /* Pre-create all keys according to max_fd */
230 for (i=0; i<max_fd; ++i) {
231 pj_ioqueue_key_t *key;
232
233 key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t));
234 key->ref_count = 0;
235 rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
236 if (rc != PJ_SUCCESS) {
237 key = ioqueue->free_list.next;
238 while (key != &ioqueue->free_list) {
239 pj_mutex_destroy(key->mutex);
240 key = key->next;
241 }
242 pj_mutex_destroy(ioqueue->ref_cnt_mutex);
243 return rc;
244 }
245
246 pj_list_push_back(&ioqueue->free_list, key);
247 }
248#endif
249
250 /* Create and init ioqueue mutex */
Benny Prijono9033e312005-11-21 02:08:39 +0000251 rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
252 if (rc != PJ_SUCCESS)
253 return rc;
254
255 rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
256 if (rc != PJ_SUCCESS)
257 return rc;
258
259 PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue));
260
261 *p_ioqueue = ioqueue;
262 return PJ_SUCCESS;
263}
264
/*
 * pj_ioqueue_destroy()
 *
 * Destroy ioqueue. Destroys the per-key mutexes of every key on the
 * active, closing, and free lists, then the ref-count mutex, and finally
 * delegates to the common ioqueue_destroy().
 *
 * NOTE(review): the ioqueue lock is acquired here and not released in
 * this function; presumably ioqueue_destroy() (ioqueue_common_abs.c)
 * releases/destroys it -- confirm against the common abstraction.
 */
PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
    pj_ioqueue_key_t *key;

    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);

    /* Hold the lock so no poll/register runs during teardown. */
    pj_lock_acquire(ioqueue->lock);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Destroy reference counters */
    key = ioqueue->active_list.next;
    while (key != &ioqueue->active_list) {
        pj_mutex_destroy(key->mutex);
        key = key->next;
    }

    key = ioqueue->closing_list.next;
    while (key != &ioqueue->closing_list) {
        pj_mutex_destroy(key->mutex);
        key = key->next;
    }

    key = ioqueue->free_list.next;
    while (key != &ioqueue->free_list) {
        pj_mutex_destroy(key->mutex);
        key = key->next;
    }

    pj_mutex_destroy(ioqueue->ref_cnt_mutex);
#endif

    return ioqueue_destroy(ioqueue);
}
303
304
305/*
306 * pj_ioqueue_register_sock()
307 *
Benny Prijono5accbd02006-03-30 16:32:18 +0000308 * Register socket handle to ioqueue.
Benny Prijono9033e312005-11-21 02:08:39 +0000309 */
310PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
311 pj_ioqueue_t *ioqueue,
312 pj_sock_t sock,
313 void *user_data,
314 const pj_ioqueue_callback *cb,
315 pj_ioqueue_key_t **p_key)
316{
317 pj_ioqueue_key_t *key = NULL;
318 pj_uint32_t value;
319 pj_status_t rc = PJ_SUCCESS;
320
321 PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
322 cb && p_key, PJ_EINVAL);
323
324 pj_lock_acquire(ioqueue->lock);
325
326 if (ioqueue->count >= ioqueue->max) {
327 rc = PJ_ETOOMANY;
328 goto on_return;
329 }
330
Benny Prijono5accbd02006-03-30 16:32:18 +0000331 /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get
332 * the key from the free list. Otherwise allocate a new one.
333 */
334#if PJ_IOQUEUE_HAS_SAFE_UNREG
335 pj_assert(!pj_list_empty(&ioqueue->free_list));
336 if (pj_list_empty(&ioqueue->free_list)) {
337 rc = PJ_ETOOMANY;
338 goto on_return;
339 }
340
341 key = ioqueue->free_list.next;
342 pj_list_erase(key);
343#else
344 key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
345#endif
346
347 rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
348 if (rc != PJ_SUCCESS) {
349 key = NULL;
350 goto on_return;
351 }
352
Benny Prijono9033e312005-11-21 02:08:39 +0000353 /* Set socket to nonblocking. */
354 value = 1;
Benny Prijono9cf138e2006-01-19 03:58:29 +0000355#if defined(PJ_WIN32) && PJ_WIN32!=0 || \
356 defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0
Benny Prijono9033e312005-11-21 02:08:39 +0000357 if (ioctlsocket(sock, FIONBIO, (u_long*)&value)) {
358#else
359 if (ioctl(sock, FIONBIO, &value)) {
360#endif
361 rc = pj_get_netos_error();
362 goto on_return;
363 }
364
Benny Prijono9033e312005-11-21 02:08:39 +0000365
Benny Prijono5accbd02006-03-30 16:32:18 +0000366 /* Put in active list. */
367 pj_list_insert_before(&ioqueue->active_list, key);
Benny Prijono9033e312005-11-21 02:08:39 +0000368 ++ioqueue->count;
369
Benny Prijono42c5b9e2006-05-10 19:24:40 +0000370 /* Rescan fdset to get max descriptor */
371 rescan_fdset(ioqueue);
372
Benny Prijono9033e312005-11-21 02:08:39 +0000373on_return:
374 /* On error, socket may be left in non-blocking mode. */
375 *p_key = key;
376 pj_lock_release(ioqueue->lock);
377
378 return rc;
379}
380
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Increment key's reference counter */
static void increment_counter(pj_ioqueue_key_t *key)
{
    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
    ++key->ref_count;
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
}

/* Decrement the key's reference counter, and when the counter reach zero,
 * destroy the key.
 *
 * When the count hits zero the key is not destroyed immediately: it is
 * moved from the active list to the closing list with a free_time of
 * now + PJ_IOQUEUE_KEY_FREE_DELAY; scan_closing_keys() later recycles it
 * onto the free list once the grace period has elapsed.
 *
 * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK.
 * (It acquires ref_cnt_mutex first and the ioqueue lock second; callers
 * already holding the ioqueue lock would invert that order.)
 */
static void decrement_counter(pj_ioqueue_key_t *key)
{
    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
    --key->ref_count;
    if (key->ref_count == 0) {

        /* Only pj_ioqueue_unregister() drops the count to zero, and it
         * sets key->closing beforehand.
         */
        pj_assert(key->closing == 1);
        pj_gettimeofday(&key->free_time);
        key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
        pj_time_val_normalize(&key->free_time);

        pj_lock_acquire(key->ioqueue->lock);
        pj_list_erase(key);
        pj_list_push_back(&key->ioqueue->closing_list, key);
        /* Rescan fdset to get max descriptor */
        rescan_fdset(key->ioqueue);
        pj_lock_release(key->ioqueue->lock);
    }
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
}
#endif
416
417
/*
 * pj_ioqueue_unregister()
 *
 * Unregister handle from ioqueue: remove the key from the active list
 * and all fd sets, close its socket, and clear its callbacks. With
 * PJ_IOQUEUE_HAS_SAFE_UNREG the key is then handed to the delayed-free
 * machinery via decrement_counter(); otherwise its mutex is destroyed
 * immediately.
 *
 * Returns PJ_SUCCESS, or PJ_EINVAL if key is NULL.
 */
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
{
    pj_ioqueue_t *ioqueue;

    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    ioqueue = key->ioqueue;

    /* Lock the key to make sure no callback is simultaneously modifying
     * the key. We need to lock the key before ioqueue here to prevent
     * deadlock.
     */
    pj_mutex_lock(key->mutex);

    /* Also lock ioqueue */
    pj_lock_acquire(ioqueue->lock);

    pj_assert(ioqueue->count > 0);
    --ioqueue->count;
    pj_list_erase(key);
    PJ_FD_CLR(key->fd, &ioqueue->rfdset);
    PJ_FD_CLR(key->fd, &ioqueue->wfdset);
#if PJ_HAS_TCP
    PJ_FD_CLR(key->fd, &ioqueue->xfdset);
#endif

    /* Close socket. */
    pj_sock_close(key->fd);

    /* Clear callback */
    key->cb.on_accept_complete = NULL;
    key->cb.on_connect_complete = NULL;
    key->cb.on_read_complete = NULL;
    key->cb.on_write_complete = NULL;

    /* Must release ioqueue lock first before decrementing counter, to
     * prevent deadlock.
     */
    pj_lock_release(ioqueue->lock);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Mark key is closing. */
    key->closing = 1;

    /* Decrement counter. */
    decrement_counter(key);

    /* Done. */
    pj_mutex_unlock(key->mutex);
#else
    /* NOTE(review): the key's mutex is destroyed while still held by
     * this thread -- whether that is permitted depends on pj_mutex
     * implementation; confirm if the non-safe-unreg build is used.
     */
    pj_mutex_destroy(key->mutex);
#endif

    return PJ_SUCCESS;
}
478
479
/* This supposed to check whether the fd_set values are consistent
 * with the operation currently set in each key.
 *
 * NOTE(review): this is effectively dead code -- VALIDATE_FD_SET is
 * hard-wired to 0 above, and even if enabled the function immediately
 * trips pj_assert(0) because the keys can no longer be inspected safely
 * while holding the ioqueue mutex (locking a key here would deadlock).
 */
#if VALIDATE_FD_SET
static void validate_sets(const pj_ioqueue_t *ioqueue,
                          const pj_fd_set_t *rfdset,
                          const pj_fd_set_t *wfdset,
                          const pj_fd_set_t *xfdset)
{
    pj_ioqueue_key_t *key;

    /*
     * This basicly would not work anymore.
     * We need to lock key before performing the check, but we can't do
     * so because we're holding ioqueue mutex. If we acquire key's mutex
     * now, the will cause deadlock.
     */
    pj_assert(0);

    key = ioqueue->active_list.next;
    while (key != &ioqueue->active_list) {
        /* A key with pending read (or accept) must be in the read set. */
        if (!pj_list_empty(&key->read_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
            || !pj_list_empty(&key->accept_list)
#endif
            )
        {
            pj_assert(PJ_FD_ISSET(key->fd, rfdset));
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0);
        }
        /* A key with pending write (or connect) must be in the write set. */
        if (!pj_list_empty(&key->write_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
            || key->connecting
#endif
            )
        {
            pj_assert(PJ_FD_ISSET(key->fd, wfdset));
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0);
        }
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
        /* A connecting key must be in the exception set (connect failure). */
        if (key->connecting)
        {
            pj_assert(PJ_FD_ISSET(key->fd, xfdset));
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, xfdset) == 0);
        }
#endif /* PJ_HAS_TCP */

        key = key->next;
    }
}
#endif	/* VALIDATE_FD_SET */
537
538
539/* ioqueue_remove_from_set()
540 * This function is called from ioqueue_dispatch_event() to instruct
541 * the ioqueue to remove the specified descriptor from ioqueue's descriptor
542 * set for the specified event.
543 */
544static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
545 pj_sock_t fd,
546 enum ioqueue_event_type event_type)
547{
548 pj_lock_acquire(ioqueue->lock);
549
550 if (event_type == READABLE_EVENT)
551 PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset);
552 else if (event_type == WRITEABLE_EVENT)
553 PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset);
554 else if (event_type == EXCEPTION_EVENT)
555 PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset);
556 else
557 pj_assert(0);
558
559 pj_lock_release(ioqueue->lock);
560}
561
562/*
563 * ioqueue_add_to_set()
564 * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
565 * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
566 * set for the specified event.
567 */
568static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
569 pj_sock_t fd,
570 enum ioqueue_event_type event_type )
571{
572 pj_lock_acquire(ioqueue->lock);
573
574 if (event_type == READABLE_EVENT)
575 PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset);
576 else if (event_type == WRITEABLE_EVENT)
577 PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset);
578 else if (event_type == EXCEPTION_EVENT)
579 PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset);
580 else
581 pj_assert(0);
582
583 pj_lock_release(ioqueue->lock);
584}
585
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Move closing keys whose grace period has elapsed back to the free
 * list so they can be reused by pj_ioqueue_register_sock().
 * Caller must hold the ioqueue lock.
 */
static void scan_closing_keys(pj_ioqueue_t *ioqueue)
{
    pj_time_val now;
    pj_ioqueue_key_t *it, *next;

    pj_gettimeofday(&now);
    for (it = ioqueue->closing_list.next; it != &ioqueue->closing_list;
         it = next)
    {
        /* Save successor first: the node may be moved to another list. */
        next = it->next;

        pj_assert(it->closing != 0);

        if (PJ_TIME_VAL_GTE(now, it->free_time)) {
            pj_list_erase(it);
            pj_list_push_back(&ioqueue->free_list, it);
        }
    }
}
#endif
608
609
/*
 * pj_ioqueue_poll()
 *
 * Few things worth written:
 *
 * - we used to do only one callback called per poll, but it didn't go
 *   very well. The reason is because on some situation, the write
 *   callback gets called all the time, thus doesn't give the read
 *   callback to get called. This happens, for example, when user
 *   submit write operation inside the write callback.
 *   As the result, we changed the behaviour so that now multiple
 *   callbacks are called in a single poll. It should be fast too,
 *   just that we need to be carefull with the ioqueue data structs.
 *
 * - to guarantee preemptiveness etc, the poll function must strictly
 *   work on fd_set copy of the ioqueue (not the original one).
 *
 * Returns the number of events dispatched, 0 when nothing was polled,
 * or a negated pj_status_t on error.
 */
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
    pj_fd_set_t rfdset, wfdset, xfdset;
    int count, counter;
    pj_ioqueue_key_t *h;
    /* Events harvested under the lock, dispatched after it is released. */
    struct event
    {
        pj_ioqueue_key_t *key;
        enum ioqueue_event_type event_type;
    } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];

    PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);

    /* Lock ioqueue before making fd_set copies */
    pj_lock_acquire(ioqueue->lock);

    /* We will only do select() when there are sockets to be polled.
     * Otherwise select() will return error.
     */
    if (PJ_FD_COUNT(&ioqueue->rfdset)==0 &&
        PJ_FD_COUNT(&ioqueue->wfdset)==0 &&
        PJ_FD_COUNT(&ioqueue->xfdset)==0)
    {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
        /* Idle moment: recycle keys whose close grace period elapsed. */
        scan_closing_keys(ioqueue);
#endif
        pj_lock_release(ioqueue->lock);
        TRACE__((THIS_FILE, "  poll: no fd is set"));
        /* Emulate select()'s blocking behaviour so callers' poll loops
         * don't spin when no socket is registered.
         */
        if (timeout)
            pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout));
        return 0;
    }

    /* Copy ioqueue's pj_fd_set_t to local variables. */
    pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t));
    pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t));
#if PJ_HAS_TCP
    pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t));
#else
    PJ_FD_ZERO(&xfdset);
#endif

#if VALIDATE_FD_SET
    validate_sets(ioqueue, &rfdset, &wfdset, &xfdset);
#endif

    /* Unlock ioqueue before select(). */
    pj_lock_release(ioqueue->lock);

    count = pj_sock_select(ioqueue->nfds+1, &rfdset, &wfdset, &xfdset,
                           timeout);

    /* NOTE(review): count==0 (plain timeout) also takes this branch and
     * returns the negated last OS error -- confirm callers treat that
     * as "no event" rather than a hard failure.
     */
    if (count <= 0)
        return -pj_get_netos_error();
    else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)
        count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;

    /* Scan descriptor sets for event and add the events in the event
     * array to be processed later in this function. We do this so that
     * events can be processed in parallel without holding ioqueue lock.
     */
    pj_lock_acquire(ioqueue->lock);

    counter = 0;

    /* Scan for writable sockets first to handle piggy-back data
     * coming with accept().
     *
     * NOTE(review): "counter<count" is only checked at the top of the
     * loop, yet up to three events may be appended per key, so counter
     * can exceed count (and, when count was clamped to the array size,
     * potentially overrun event[]) -- verify bounds if MAX_EVENTS can
     * be reached.
     */
    h = ioqueue->active_list.next;
    for ( ; h!=&ioqueue->active_list && counter<count; h = h->next) {

        if ( (key_has_pending_write(h) || key_has_pending_connect(h))
             && PJ_FD_ISSET(h->fd, &wfdset) && !h->closing)
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
            /* Keep the key alive until it is dispatched below. */
            increment_counter(h);
#endif
            event[counter].key = h;
            event[counter].event_type = WRITEABLE_EVENT;
            ++counter;
        }

        /* Scan for readable socket. */
        if ((key_has_pending_read(h) || key_has_pending_accept(h))
            && PJ_FD_ISSET(h->fd, &rfdset) && !h->closing)
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            event[counter].key = h;
            event[counter].event_type = READABLE_EVENT;
            ++counter;
        }

#if PJ_HAS_TCP
        /* Exception on a connecting socket signals connect() failure. */
        if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset) &&
            !h->closing)
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            event[counter].key = h;
            event[counter].event_type = EXCEPTION_EVENT;
            ++counter;
        }
#endif
    }

    pj_lock_release(ioqueue->lock);

    count = counter;

    /* Now process all events. The dispatch functions will take care
     * of locking in each of the key
     */
    for (counter=0; counter<count; ++counter) {
        switch (event[counter].event_type) {
        case READABLE_EVENT:
            ioqueue_dispatch_read_event(ioqueue, event[counter].key);
            break;
        case WRITEABLE_EVENT:
            ioqueue_dispatch_write_event(ioqueue, event[counter].key);
            break;
        case EXCEPTION_EVENT:
            ioqueue_dispatch_exception_event(ioqueue, event[counter].key);
            break;
        case NO_EVENT:
            pj_assert(!"Invalid event!");
            break;
        }

#if PJ_IOQUEUE_HAS_SAFE_UNREG
        /* Release the reference taken during the scan; may trigger the
         * delayed-destroy path for keys unregistered from a callback.
         */
        decrement_counter(event[counter].key);
#endif
    }


    return count;
}
766