blob: f5e369044ffbcd93c3015a5bf5ef8bcf6d9b4231 [file] [log] [blame]
Benny Prijono9033e312005-11-21 02:08:39 +00001/* $Id$ */
2/*
Benny Prijonoa771a512007-02-19 01:13:53 +00003 * Copyright (C)2003-2007 Benny Prijono <benny@prijono.org>
Benny Prijono9033e312005-11-21 02:08:39 +00004 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 */
19
20/*
21 * sock_select.c
22 *
23 * This is the implementation of IOQueue using pj_sock_select().
24 * It runs anywhere where pj_sock_select() is available (currently
25 * Win32, Linux, Linux kernel, etc.).
26 */
27
28#include <pj/ioqueue.h>
29#include <pj/os.h>
30#include <pj/lock.h>
31#include <pj/log.h>
32#include <pj/list.h>
33#include <pj/pool.h>
34#include <pj/string.h>
35#include <pj/assert.h>
36#include <pj/sock.h>
37#include <pj/compat/socket.h>
38#include <pj/sock_select.h>
39#include <pj/errno.h>
40
41/*
42 * Include declaration from common abstraction.
43 */
44#include "ioqueue_common_abs.h"
45
46/*
47 * ISSUES with ioqueue_select()
48 *
49 * EAGAIN/EWOULDBLOCK error in recv():
50 * - when multiple threads are working with the ioqueue, application
51 * may receive EAGAIN or EWOULDBLOCK in the receive callback.
52 * This error happens because more than one thread is watching for
53 * the same descriptor set, so when all of them call recv() or recvfrom()
54 * simultaneously, only one will succeed and the rest will get the error.
55 *
56 */
57#define THIS_FILE "ioq_select"
58
59/*
60 * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
61 * the correct error code.
62 */
63#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
64# error "Error reporting must be enabled for this function to work!"
65#endif
66
67/**
68 * Get the number of descriptors in the set. This is defined in sock_select.c
69 * This function will only return the number of sockets set from PJ_FD_SET
70 * operation. When the set is modified by other means (such as by select()),
71 * the count will not be reflected here.
72 *
 * That's why we don't export this function in the header file, to avoid
74 * misunderstanding.
75 *
76 * @param fdsetp The descriptor set.
77 *
78 * @return Number of descriptors in the set.
79 */
80PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);
81
82
83/*
84 * During debugging build, VALIDATE_FD_SET is set.
85 * This will check the validity of the fd_sets.
86 */
87/*
88#if defined(PJ_DEBUG) && PJ_DEBUG != 0
89# define VALIDATE_FD_SET 1
90#else
91# define VALIDATE_FD_SET 0
92#endif
93*/
94#define VALIDATE_FD_SET 0
95
Benny Prijono42c5b9e2006-05-10 19:24:40 +000096#if 0
97# define TRACE__(args) PJ_LOG(3,args)
98#else
99# define TRACE__(args)
100#endif
101
/*
 * This describes each key.
 *
 * A key represents one registered socket handle. All of its fields
 * (descriptor, user data, callbacks, pending operation lists, etc.) are
 * supplied by the DECLARE_COMMON_KEY macro from ioqueue_common_abs.h,
 * which is shared by all ioqueue backends.
 */
struct pj_ioqueue_key_t
{
    DECLARE_COMMON_KEY
};
109
/*
 * This describes the I/O queue itself.
 *
 * The common part (lock, etc.) comes from DECLARE_COMMON_IOQUEUE in
 * ioqueue_common_abs.h; the fields below are specific to the select()
 * backend.
 */
struct pj_ioqueue_t
{
    DECLARE_COMMON_IOQUEUE

    unsigned		max, count;	/* Max and current key count	    */
    int			nfds;		/* The largest fd value (for select)*/
    pj_ioqueue_key_t	active_list;	/* List of active keys.		    */
    pj_fd_set_t		rfdset;		/* Master read descriptor set.	    */
    pj_fd_set_t		wfdset;		/* Master write descriptor set.	    */
#if PJ_HAS_TCP
    pj_fd_set_t		xfdset;		/* Exception set (used to detect
					   TCP connect() completion).	    */
#endif

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Mutex protecting every key's ref_count. A separate mutex is used
     * (rather than the key's or the ioqueue's) to avoid deadlock; see
     * the comment in pj_ioqueue_create().
     */
    pj_mutex_t	       *ref_cnt_mutex;
    pj_ioqueue_key_t	closing_list;	/* Unregistered keys waiting out the
					   PJ_IOQUEUE_KEY_FREE_DELAY period. */
    pj_ioqueue_key_t	free_list;	/* Pre-created keys available for
					   registration.		    */
#endif
};
132
133/* Include implementation for common abstraction after we declare
134 * pj_ioqueue_key_t and pj_ioqueue_t.
135 */
136#include "ioqueue_common_abs.c"
137
138/*
139 * pj_ioqueue_name()
140 */
141PJ_DEF(const char*) pj_ioqueue_name(void)
142{
143 return "select";
144}
145
Benny Prijono42c5b9e2006-05-10 19:24:40 +0000146/*
147 * Scan the socket descriptor sets for the largest descriptor.
148 * This value is needed by select().
149 */
150#if defined(PJ_SELECT_NEEDS_NFDS) && PJ_SELECT_NEEDS_NFDS!=0
151static void rescan_fdset(pj_ioqueue_t *ioqueue)
152{
153 pj_ioqueue_key_t *key = ioqueue->active_list.next;
154 int max = 0;
155
156 while (key != &ioqueue->active_list) {
157 if (key->fd > max)
158 max = key->fd;
159 key = key->next;
160 }
161
162 ioqueue->nfds = max;
163}
164#else
165static void rescan_fdset(pj_ioqueue_t *ioqueue)
166{
167 ioqueue->nfds = FD_SETSIZE-1;
168}
169#endif
170
171
Benny Prijono9033e312005-11-21 02:08:39 +0000172/*
173 * pj_ioqueue_create()
174 *
175 * Create select ioqueue.
176 */
177PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
178 pj_size_t max_fd,
179 pj_ioqueue_t **p_ioqueue)
180{
181 pj_ioqueue_t *ioqueue;
182 pj_lock_t *lock;
Benny Prijono5accbd02006-03-30 16:32:18 +0000183 unsigned i;
Benny Prijono9033e312005-11-21 02:08:39 +0000184 pj_status_t rc;
185
186 /* Check that arguments are valid. */
187 PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
188 max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES,
189 PJ_EINVAL);
190
191 /* Check that size of pj_ioqueue_op_key_t is sufficient */
192 PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
193 sizeof(union operation_key), PJ_EBUG);
194
Benny Prijono5accbd02006-03-30 16:32:18 +0000195 /* Create and init common ioqueue stuffs */
Benny Prijono9033e312005-11-21 02:08:39 +0000196 ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
Benny Prijono9033e312005-11-21 02:08:39 +0000197 ioqueue_init(ioqueue);
198
199 ioqueue->max = max_fd;
200 ioqueue->count = 0;
201 PJ_FD_ZERO(&ioqueue->rfdset);
202 PJ_FD_ZERO(&ioqueue->wfdset);
203#if PJ_HAS_TCP
204 PJ_FD_ZERO(&ioqueue->xfdset);
205#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000206 pj_list_init(&ioqueue->active_list);
Benny Prijono9033e312005-11-21 02:08:39 +0000207
Benny Prijono42c5b9e2006-05-10 19:24:40 +0000208 rescan_fdset(ioqueue);
209
Benny Prijono5accbd02006-03-30 16:32:18 +0000210#if PJ_IOQUEUE_HAS_SAFE_UNREG
211 /* When safe unregistration is used (the default), we pre-create
212 * all keys and put them in the free list.
213 */
214
215 /* Mutex to protect key's reference counter
216 * We don't want to use key's mutex or ioqueue's mutex because
217 * that would create deadlock situation in some cases.
218 */
219 rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex);
220 if (rc != PJ_SUCCESS)
221 return rc;
222
223
224 /* Init key list */
225 pj_list_init(&ioqueue->free_list);
226 pj_list_init(&ioqueue->closing_list);
227
228
229 /* Pre-create all keys according to max_fd */
230 for (i=0; i<max_fd; ++i) {
231 pj_ioqueue_key_t *key;
232
233 key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t));
234 key->ref_count = 0;
235 rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
236 if (rc != PJ_SUCCESS) {
237 key = ioqueue->free_list.next;
238 while (key != &ioqueue->free_list) {
239 pj_mutex_destroy(key->mutex);
240 key = key->next;
241 }
242 pj_mutex_destroy(ioqueue->ref_cnt_mutex);
243 return rc;
244 }
245
246 pj_list_push_back(&ioqueue->free_list, key);
247 }
248#endif
249
250 /* Create and init ioqueue mutex */
Benny Prijono9033e312005-11-21 02:08:39 +0000251 rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
252 if (rc != PJ_SUCCESS)
253 return rc;
254
255 rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
256 if (rc != PJ_SUCCESS)
257 return rc;
258
259 PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue));
260
261 *p_ioqueue = ioqueue;
262 return PJ_SUCCESS;
263}
264
/*
 * pj_ioqueue_destroy()
 *
 * Destroy ioqueue.
 *
 * Destroys all key mutexes pre-created by pj_ioqueue_create() (active,
 * closing, and free lists) plus the reference-counter mutex, then hands
 * off to the common ioqueue_destroy(). Note that the ioqueue lock is
 * still held when ioqueue_destroy() is called; the common abstraction is
 * presumably responsible for releasing/destroying it -- verify against
 * ioqueue_common_abs.c.
 */
PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
    pj_ioqueue_key_t *key;

    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);

    /* Hold the lock so no other thread can poll/register while the
     * queue is being torn down.
     */
    pj_lock_acquire(ioqueue->lock);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Destroy reference counters */
    key = ioqueue->active_list.next;
    while (key != &ioqueue->active_list) {
	pj_mutex_destroy(key->mutex);
	key = key->next;
    }

    key = ioqueue->closing_list.next;
    while (key != &ioqueue->closing_list) {
	pj_mutex_destroy(key->mutex);
	key = key->next;
    }

    key = ioqueue->free_list.next;
    while (key != &ioqueue->free_list) {
	pj_mutex_destroy(key->mutex);
	key = key->next;
    }

    pj_mutex_destroy(ioqueue->ref_cnt_mutex);
#endif

    return ioqueue_destroy(ioqueue);
}
303
304
305/*
306 * pj_ioqueue_register_sock()
307 *
Benny Prijono5accbd02006-03-30 16:32:18 +0000308 * Register socket handle to ioqueue.
Benny Prijono9033e312005-11-21 02:08:39 +0000309 */
310PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
311 pj_ioqueue_t *ioqueue,
312 pj_sock_t sock,
313 void *user_data,
314 const pj_ioqueue_callback *cb,
315 pj_ioqueue_key_t **p_key)
316{
317 pj_ioqueue_key_t *key = NULL;
Benny Prijonofc24e692007-01-27 18:31:51 +0000318#if defined(PJ_WIN32) && PJ_WIN32!=0 || \
319 defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0
320 u_long value;
321#else
Benny Prijono9033e312005-11-21 02:08:39 +0000322 pj_uint32_t value;
Benny Prijonofc24e692007-01-27 18:31:51 +0000323#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000324 pj_status_t rc = PJ_SUCCESS;
325
326 PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
327 cb && p_key, PJ_EINVAL);
328
329 pj_lock_acquire(ioqueue->lock);
330
331 if (ioqueue->count >= ioqueue->max) {
332 rc = PJ_ETOOMANY;
333 goto on_return;
334 }
335
Benny Prijono5accbd02006-03-30 16:32:18 +0000336 /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get
337 * the key from the free list. Otherwise allocate a new one.
338 */
339#if PJ_IOQUEUE_HAS_SAFE_UNREG
340 pj_assert(!pj_list_empty(&ioqueue->free_list));
341 if (pj_list_empty(&ioqueue->free_list)) {
342 rc = PJ_ETOOMANY;
343 goto on_return;
344 }
345
346 key = ioqueue->free_list.next;
347 pj_list_erase(key);
348#else
349 key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
350#endif
351
352 rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
353 if (rc != PJ_SUCCESS) {
354 key = NULL;
355 goto on_return;
356 }
357
Benny Prijono9033e312005-11-21 02:08:39 +0000358 /* Set socket to nonblocking. */
359 value = 1;
Benny Prijono9cf138e2006-01-19 03:58:29 +0000360#if defined(PJ_WIN32) && PJ_WIN32!=0 || \
361 defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0
Benny Prijonofc24e692007-01-27 18:31:51 +0000362 if (ioctlsocket(sock, FIONBIO, &value)) {
Benny Prijono9033e312005-11-21 02:08:39 +0000363#else
364 if (ioctl(sock, FIONBIO, &value)) {
365#endif
366 rc = pj_get_netos_error();
367 goto on_return;
368 }
369
Benny Prijono9033e312005-11-21 02:08:39 +0000370
Benny Prijono5accbd02006-03-30 16:32:18 +0000371 /* Put in active list. */
372 pj_list_insert_before(&ioqueue->active_list, key);
Benny Prijono9033e312005-11-21 02:08:39 +0000373 ++ioqueue->count;
374
Benny Prijono42c5b9e2006-05-10 19:24:40 +0000375 /* Rescan fdset to get max descriptor */
376 rescan_fdset(ioqueue);
377
Benny Prijono9033e312005-11-21 02:08:39 +0000378on_return:
379 /* On error, socket may be left in non-blocking mode. */
380 *p_key = key;
381 pj_lock_release(ioqueue->lock);
382
383 return rc;
384}
385
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Increment key's reference counter.
 * Called while an event on the key is queued for dispatch, so the key
 * cannot be recycled while a callback may still run on it.
 */
static void increment_counter(pj_ioqueue_key_t *key)
{
    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
    ++key->ref_count;
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
}

/* Decrement the key's reference counter, and when the counter reach zero,
 * destroy the key.
 *
 * "Destroy" here means: stamp the key with a free time
 * (now + PJ_IOQUEUE_KEY_FREE_DELAY msec) and move it from whatever list
 * it is on to the closing list; scan_closing_keys() later recycles it
 * into the free list once the delay has elapsed.
 *
 * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK.
 * (The ioqueue lock is acquired below after ref_cnt_mutex; callers
 * holding it already would invert the lock order.)
 */
static void decrement_counter(pj_ioqueue_key_t *key)
{
    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
    --key->ref_count;
    if (key->ref_count == 0) {

	/* Only unregistered keys may reach refcount zero. */
	pj_assert(key->closing == 1);
	pj_gettimeofday(&key->free_time);
	key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
	pj_time_val_normalize(&key->free_time);

	pj_lock_acquire(key->ioqueue->lock);
	pj_list_erase(key);
	pj_list_push_back(&key->ioqueue->closing_list, key);
	/* Rescan fdset to get max descriptor */
	rescan_fdset(key->ioqueue);
	pj_lock_release(key->ioqueue->lock);
    }
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
}
#endif
421
422
Benny Prijono9033e312005-11-21 02:08:39 +0000423/*
424 * pj_ioqueue_unregister()
425 *
426 * Unregister handle from ioqueue.
427 */
428PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
429{
430 pj_ioqueue_t *ioqueue;
431
432 PJ_ASSERT_RETURN(key, PJ_EINVAL);
433
434 ioqueue = key->ioqueue;
435
Benny Prijono5accbd02006-03-30 16:32:18 +0000436 /* Lock the key to make sure no callback is simultaneously modifying
437 * the key. We need to lock the key before ioqueue here to prevent
438 * deadlock.
439 */
440 pj_mutex_lock(key->mutex);
441
442 /* Also lock ioqueue */
Benny Prijono9033e312005-11-21 02:08:39 +0000443 pj_lock_acquire(ioqueue->lock);
444
445 pj_assert(ioqueue->count > 0);
446 --ioqueue->count;
447 pj_list_erase(key);
448 PJ_FD_CLR(key->fd, &ioqueue->rfdset);
449 PJ_FD_CLR(key->fd, &ioqueue->wfdset);
450#if PJ_HAS_TCP
451 PJ_FD_CLR(key->fd, &ioqueue->xfdset);
452#endif
453
Benny Prijono5accbd02006-03-30 16:32:18 +0000454 /* Close socket. */
455 pj_sock_close(key->fd);
456
457 /* Clear callback */
458 key->cb.on_accept_complete = NULL;
459 key->cb.on_connect_complete = NULL;
460 key->cb.on_read_complete = NULL;
461 key->cb.on_write_complete = NULL;
462
463 /* Must release ioqueue lock first before decrementing counter, to
464 * prevent deadlock.
Benny Prijono9033e312005-11-21 02:08:39 +0000465 */
466 pj_lock_release(ioqueue->lock);
467
Benny Prijono5accbd02006-03-30 16:32:18 +0000468#if PJ_IOQUEUE_HAS_SAFE_UNREG
469 /* Mark key is closing. */
470 key->closing = 1;
471
472 /* Decrement counter. */
473 decrement_counter(key);
474
475 /* Done. */
476 pj_mutex_unlock(key->mutex);
477#else
478 pj_mutex_destroy(key->mutex);
479#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000480
481 return PJ_SUCCESS;
482}
483
484
/* This supposed to check whether the fd_set values are consistent
 * with the operation currently set in each key: an fd must be present
 * in a set if and only if the key has a matching pending operation.
 *
 * NOTE: this is debug-only code and is effectively disabled -- see the
 * unconditional pj_assert(0) below and the VALIDATE_FD_SET definition
 * near the top of this file.
 */
#if VALIDATE_FD_SET
static void validate_sets(const pj_ioqueue_t *ioqueue,
			  const pj_fd_set_t *rfdset,
			  const pj_fd_set_t *wfdset,
			  const pj_fd_set_t *xfdset)
{
    pj_ioqueue_key_t *key;

    /*
     * This basically would not work anymore.
     * We need to lock key before performing the check, but we can't do
     * so because we're holding ioqueue mutex. If we acquire key's mutex
     * now, it will cause deadlock.
     */
    pj_assert(0);

    key = ioqueue->active_list.next;
    while (key != &ioqueue->active_list) {
	/* Read set must reflect pending read (or accept) operations. */
	if (!pj_list_empty(&key->read_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
	    || !pj_list_empty(&key->accept_list)
#endif
	    )
	{
	    pj_assert(PJ_FD_ISSET(key->fd, rfdset));
	}
	else {
	    pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0);
	}
	/* Write set must reflect pending write (or connect) operations. */
	if (!pj_list_empty(&key->write_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
	    || key->connecting
#endif
	    )
	{
	    pj_assert(PJ_FD_ISSET(key->fd, wfdset));
	}
	else {
	    pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0);
	}
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
	/* Exception set is used only for pending connect(). */
	if (key->connecting)
	{
	    pj_assert(PJ_FD_ISSET(key->fd, xfdset));
	}
	else {
	    pj_assert(PJ_FD_ISSET(key->fd, xfdset) == 0);
	}
#endif /* PJ_HAS_TCP */

	key = key->next;
    }
}
#endif	/* VALIDATE_FD_SET */
542
543
544/* ioqueue_remove_from_set()
545 * This function is called from ioqueue_dispatch_event() to instruct
546 * the ioqueue to remove the specified descriptor from ioqueue's descriptor
547 * set for the specified event.
548 */
549static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
Benny Prijono63ab3562006-07-08 19:46:43 +0000550 pj_ioqueue_key_t *key,
Benny Prijono9033e312005-11-21 02:08:39 +0000551 enum ioqueue_event_type event_type)
552{
553 pj_lock_acquire(ioqueue->lock);
554
555 if (event_type == READABLE_EVENT)
Benny Prijono63ab3562006-07-08 19:46:43 +0000556 PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->rfdset);
Benny Prijono9033e312005-11-21 02:08:39 +0000557 else if (event_type == WRITEABLE_EVENT)
Benny Prijono63ab3562006-07-08 19:46:43 +0000558 PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->wfdset);
Benny Prijono3569c0d2007-04-06 10:29:20 +0000559#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
Benny Prijono9033e312005-11-21 02:08:39 +0000560 else if (event_type == EXCEPTION_EVENT)
Benny Prijono63ab3562006-07-08 19:46:43 +0000561 PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->xfdset);
Benny Prijono3569c0d2007-04-06 10:29:20 +0000562#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000563 else
564 pj_assert(0);
565
566 pj_lock_release(ioqueue->lock);
567}
568
569/*
570 * ioqueue_add_to_set()
571 * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
572 * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
573 * set for the specified event.
574 */
575static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
Benny Prijono63ab3562006-07-08 19:46:43 +0000576 pj_ioqueue_key_t *key,
Benny Prijono9033e312005-11-21 02:08:39 +0000577 enum ioqueue_event_type event_type )
578{
579 pj_lock_acquire(ioqueue->lock);
580
581 if (event_type == READABLE_EVENT)
Benny Prijono63ab3562006-07-08 19:46:43 +0000582 PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->rfdset);
Benny Prijono9033e312005-11-21 02:08:39 +0000583 else if (event_type == WRITEABLE_EVENT)
Benny Prijono63ab3562006-07-08 19:46:43 +0000584 PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->wfdset);
Benny Prijono3569c0d2007-04-06 10:29:20 +0000585#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
Benny Prijono9033e312005-11-21 02:08:39 +0000586 else if (event_type == EXCEPTION_EVENT)
Benny Prijono63ab3562006-07-08 19:46:43 +0000587 PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->xfdset);
Benny Prijono3569c0d2007-04-06 10:29:20 +0000588#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000589 else
590 pj_assert(0);
591
592 pj_lock_release(ioqueue->lock);
593}
594
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Scan closing keys to be put to free list again.
 * A key whose free_time has passed is recycled from the closing list
 * back into the free list, ready for re-registration.
 * Caller must hold the ioqueue lock.
 */
static void scan_closing_keys(pj_ioqueue_t *ioqueue)
{
    pj_time_val now;
    pj_ioqueue_key_t *it, *next;

    pj_gettimeofday(&now);

    for (it = ioqueue->closing_list.next; it != &ioqueue->closing_list;
	 it = next)
    {
	/* Save successor first; "it" may be moved to the free list. */
	next = it->next;

	pj_assert(it->closing != 0);

	if (PJ_TIME_VAL_GTE(now, it->free_time)) {
	    pj_list_erase(it);
	    pj_list_push_back(&ioqueue->free_list, it);
	}
    }
}
#endif
617
618
Benny Prijono9033e312005-11-21 02:08:39 +0000619/*
620 * pj_ioqueue_poll()
621 *
622 * Few things worth written:
623 *
624 * - we used to do only one callback called per poll, but it didn't go
625 * very well. The reason is because on some situation, the write
626 * callback gets called all the time, thus doesn't give the read
627 * callback to get called. This happens, for example, when user
628 * submit write operation inside the write callback.
629 * As the result, we changed the behaviour so that now multiple
630 * callbacks are called in a single poll. It should be fast too,
631 * just that we need to be carefull with the ioqueue data structs.
632 *
633 * - to guarantee preemptiveness etc, the poll function must strictly
634 * work on fd_set copy of the ioqueue (not the original one).
635 */
636PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
637{
638 pj_fd_set_t rfdset, wfdset, xfdset;
639 int count, counter;
640 pj_ioqueue_key_t *h;
641 struct event
642 {
643 pj_ioqueue_key_t *key;
644 enum ioqueue_event_type event_type;
645 } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
646
Benny Prijono37e8d332006-01-20 21:03:36 +0000647 PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
Benny Prijono9033e312005-11-21 02:08:39 +0000648
649 /* Lock ioqueue before making fd_set copies */
650 pj_lock_acquire(ioqueue->lock);
651
652 /* We will only do select() when there are sockets to be polled.
653 * Otherwise select() will return error.
654 */
655 if (PJ_FD_COUNT(&ioqueue->rfdset)==0 &&
Benny Prijono3569c0d2007-04-06 10:29:20 +0000656 PJ_FD_COUNT(&ioqueue->wfdset)==0
657#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
658 && PJ_FD_COUNT(&ioqueue->xfdset)==0
659#endif
660 )
Benny Prijono9033e312005-11-21 02:08:39 +0000661 {
Benny Prijono5accbd02006-03-30 16:32:18 +0000662#if PJ_IOQUEUE_HAS_SAFE_UNREG
663 scan_closing_keys(ioqueue);
664#endif
665 pj_lock_release(ioqueue->lock);
Benny Prijono42c5b9e2006-05-10 19:24:40 +0000666 TRACE__((THIS_FILE, " poll: no fd is set"));
Benny Prijono9033e312005-11-21 02:08:39 +0000667 if (timeout)
668 pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout));
669 return 0;
670 }
671
672 /* Copy ioqueue's pj_fd_set_t to local variables. */
673 pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t));
674 pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t));
675#if PJ_HAS_TCP
676 pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t));
677#else
678 PJ_FD_ZERO(&xfdset);
679#endif
680
681#if VALIDATE_FD_SET
682 validate_sets(ioqueue, &rfdset, &wfdset, &xfdset);
683#endif
684
685 /* Unlock ioqueue before select(). */
686 pj_lock_release(ioqueue->lock);
687
Benny Prijono42c5b9e2006-05-10 19:24:40 +0000688 count = pj_sock_select(ioqueue->nfds+1, &rfdset, &wfdset, &xfdset,
689 timeout);
Benny Prijono9033e312005-11-21 02:08:39 +0000690
691 if (count <= 0)
Benny Prijono37e8d332006-01-20 21:03:36 +0000692 return -pj_get_netos_error();
Benny Prijono9033e312005-11-21 02:08:39 +0000693 else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)
694 count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;
695
696 /* Scan descriptor sets for event and add the events in the event
697 * array to be processed later in this function. We do this so that
698 * events can be processed in parallel without holding ioqueue lock.
699 */
700 pj_lock_acquire(ioqueue->lock);
701
702 counter = 0;
703
704 /* Scan for writable sockets first to handle piggy-back data
705 * coming with accept().
706 */
Benny Prijono5accbd02006-03-30 16:32:18 +0000707 h = ioqueue->active_list.next;
708 for ( ; h!=&ioqueue->active_list && counter<count; h = h->next) {
709
Benny Prijono9033e312005-11-21 02:08:39 +0000710 if ( (key_has_pending_write(h) || key_has_pending_connect(h))
Benny Prijono3059eb62006-10-04 20:46:27 +0000711 && PJ_FD_ISSET(h->fd, &wfdset) && !IS_CLOSING(h))
Benny Prijono9033e312005-11-21 02:08:39 +0000712 {
Benny Prijono5accbd02006-03-30 16:32:18 +0000713#if PJ_IOQUEUE_HAS_SAFE_UNREG
714 increment_counter(h);
715#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000716 event[counter].key = h;
717 event[counter].event_type = WRITEABLE_EVENT;
718 ++counter;
719 }
720
721 /* Scan for readable socket. */
722 if ((key_has_pending_read(h) || key_has_pending_accept(h))
Benny Prijono3059eb62006-10-04 20:46:27 +0000723 && PJ_FD_ISSET(h->fd, &rfdset) && !IS_CLOSING(h))
Benny Prijono9033e312005-11-21 02:08:39 +0000724 {
Benny Prijono5accbd02006-03-30 16:32:18 +0000725#if PJ_IOQUEUE_HAS_SAFE_UNREG
726 increment_counter(h);
727#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000728 event[counter].key = h;
729 event[counter].event_type = READABLE_EVENT;
730 ++counter;
731 }
732
733#if PJ_HAS_TCP
Benny Prijono5accbd02006-03-30 16:32:18 +0000734 if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset) &&
Benny Prijono3059eb62006-10-04 20:46:27 +0000735 !IS_CLOSING(h))
Benny Prijono5accbd02006-03-30 16:32:18 +0000736 {
737#if PJ_IOQUEUE_HAS_SAFE_UNREG
738 increment_counter(h);
739#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000740 event[counter].key = h;
741 event[counter].event_type = EXCEPTION_EVENT;
742 ++counter;
743 }
744#endif
745 }
746
747 pj_lock_release(ioqueue->lock);
748
749 count = counter;
750
751 /* Now process all events. The dispatch functions will take care
752 * of locking in each of the key
753 */
754 for (counter=0; counter<count; ++counter) {
755 switch (event[counter].event_type) {
756 case READABLE_EVENT:
757 ioqueue_dispatch_read_event(ioqueue, event[counter].key);
758 break;
759 case WRITEABLE_EVENT:
760 ioqueue_dispatch_write_event(ioqueue, event[counter].key);
761 break;
762 case EXCEPTION_EVENT:
763 ioqueue_dispatch_exception_event(ioqueue, event[counter].key);
764 break;
765 case NO_EVENT:
766 pj_assert(!"Invalid event!");
767 break;
768 }
Benny Prijono5accbd02006-03-30 16:32:18 +0000769
770#if PJ_IOQUEUE_HAS_SAFE_UNREG
771 decrement_counter(event[counter].key);
772#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000773 }
774
Benny Prijono5accbd02006-03-30 16:32:18 +0000775
Benny Prijono9033e312005-11-21 02:08:39 +0000776 return count;
777}
778