blob: 48612d22aac49fd2c6cfaa9e1332461bbaea2478 [file] [log] [blame]
Benny Prijono9033e312005-11-21 02:08:39 +00001/* $Id$ */
2/*
Benny Prijonoa771a512007-02-19 01:13:53 +00003 * Copyright (C)2003-2007 Benny Prijono <benny@prijono.org>
Benny Prijono9033e312005-11-21 02:08:39 +00004 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 */
19
20/*
21 * sock_select.c
22 *
23 * This is the implementation of IOQueue using pj_sock_select().
24 * It runs anywhere where pj_sock_select() is available (currently
25 * Win32, Linux, Linux kernel, etc.).
26 */
27
28#include <pj/ioqueue.h>
29#include <pj/os.h>
30#include <pj/lock.h>
31#include <pj/log.h>
32#include <pj/list.h>
33#include <pj/pool.h>
34#include <pj/string.h>
35#include <pj/assert.h>
36#include <pj/sock.h>
37#include <pj/compat/socket.h>
38#include <pj/sock_select.h>
39#include <pj/errno.h>
40
41/*
42 * Include declaration from common abstraction.
43 */
44#include "ioqueue_common_abs.h"
45
46/*
47 * ISSUES with ioqueue_select()
48 *
49 * EAGAIN/EWOULDBLOCK error in recv():
50 * - when multiple threads are working with the ioqueue, application
51 * may receive EAGAIN or EWOULDBLOCK in the receive callback.
52 * This error happens because more than one thread is watching for
53 * the same descriptor set, so when all of them call recv() or recvfrom()
54 * simultaneously, only one will succeed and the rest will get the error.
55 *
56 */
57#define THIS_FILE "ioq_select"
58
59/*
60 * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
61 * the correct error code.
62 */
63#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
64# error "Error reporting must be enabled for this function to work!"
65#endif
66
Benny Prijono9033e312005-11-21 02:08:39 +000067/*
68 * During debugging build, VALIDATE_FD_SET is set.
69 * This will check the validity of the fd_sets.
70 */
71/*
72#if defined(PJ_DEBUG) && PJ_DEBUG != 0
73# define VALIDATE_FD_SET 1
74#else
75# define VALIDATE_FD_SET 0
76#endif
77*/
78#define VALIDATE_FD_SET 0
79
Benny Prijono42c5b9e2006-05-10 19:24:40 +000080#if 0
81# define TRACE__(args) PJ_LOG(3,args)
82#else
83# define TRACE__(args)
84#endif
85
/*
 * This describes each key.
 *
 * All actual per-key fields come from the common abstraction layer
 * (DECLARE_COMMON_KEY in ioqueue_common_abs.h); this select() backend
 * adds no extra per-key state of its own.
 */
struct pj_ioqueue_key_t
{
    DECLARE_COMMON_KEY
};
93
/*
 * This describes the I/O queue itself.
 */
struct pj_ioqueue_t
{
    DECLARE_COMMON_IOQUEUE

    unsigned		max, count;	/* Max and current key count	    */
    int			nfds;		/* The largest fd value (for select)*/
    pj_ioqueue_key_t	active_list;	/* List of active keys.		    */
    pj_fd_set_t		rfdset;		/* Master read descriptor set.	    */
    pj_fd_set_t		wfdset;		/* Master write descriptor set.	    */
#if PJ_HAS_TCP
    pj_fd_set_t		xfdset;		/* Master exception descriptor set
					 * (used for connect() completion). */
#endif

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Dedicated mutex protecting every key's ref_count; a separate
     * mutex is used (rather than the key's or the ioqueue's) to avoid
     * deadlock in some lock-ordering scenarios.
     */
    pj_mutex_t	       *ref_cnt_mutex;
    pj_ioqueue_key_t	closing_list;	/* Unregistered keys waiting out
					 * their delayed-free grace period. */
    pj_ioqueue_key_t	free_list;	/* Pre-created keys ready for reuse.*/
#endif
};
116
117/* Include implementation for common abstraction after we declare
118 * pj_ioqueue_key_t and pj_ioqueue_t.
119 */
120#include "ioqueue_common_abs.c"
121
122/*
123 * pj_ioqueue_name()
124 */
125PJ_DEF(const char*) pj_ioqueue_name(void)
126{
127 return "select";
128}
129
/*
 * Scan the socket descriptor sets for the largest descriptor.
 * This value is needed by select().
 */
#if defined(PJ_SELECT_NEEDS_NFDS) && PJ_SELECT_NEEDS_NFDS!=0
static void rescan_fdset(pj_ioqueue_t *ioqueue)
{
    /* Walk the active key list to find the highest fd value.
     * NOTE(review): callers appear to invoke this while holding the
     * ioqueue lock — confirm before calling from new code paths.
     */
    pj_ioqueue_key_t *key = ioqueue->active_list.next;
    int max = 0;

    while (key != &ioqueue->active_list) {
	if (key->fd > max)
	    max = key->fd;
	key = key->next;
    }

    ioqueue->nfds = max;
}
#else
static void rescan_fdset(pj_ioqueue_t *ioqueue)
{
    /* On platforms where select() ignores nfds (e.g. Winsock), simply
     * use the largest permissible value.
     */
    ioqueue->nfds = FD_SETSIZE-1;
}
#endif
154
155
Benny Prijono9033e312005-11-21 02:08:39 +0000156/*
157 * pj_ioqueue_create()
158 *
159 * Create select ioqueue.
160 */
161PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
162 pj_size_t max_fd,
163 pj_ioqueue_t **p_ioqueue)
164{
165 pj_ioqueue_t *ioqueue;
166 pj_lock_t *lock;
Benny Prijono5accbd02006-03-30 16:32:18 +0000167 unsigned i;
Benny Prijono9033e312005-11-21 02:08:39 +0000168 pj_status_t rc;
169
170 /* Check that arguments are valid. */
171 PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
172 max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES,
173 PJ_EINVAL);
174
175 /* Check that size of pj_ioqueue_op_key_t is sufficient */
176 PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
177 sizeof(union operation_key), PJ_EBUG);
178
Benny Prijono5accbd02006-03-30 16:32:18 +0000179 /* Create and init common ioqueue stuffs */
Benny Prijonoa1e69682007-05-11 15:14:34 +0000180 ioqueue = PJ_POOL_ALLOC_T(pool, pj_ioqueue_t);
Benny Prijono9033e312005-11-21 02:08:39 +0000181 ioqueue_init(ioqueue);
182
183 ioqueue->max = max_fd;
184 ioqueue->count = 0;
185 PJ_FD_ZERO(&ioqueue->rfdset);
186 PJ_FD_ZERO(&ioqueue->wfdset);
187#if PJ_HAS_TCP
188 PJ_FD_ZERO(&ioqueue->xfdset);
189#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000190 pj_list_init(&ioqueue->active_list);
Benny Prijono9033e312005-11-21 02:08:39 +0000191
Benny Prijono42c5b9e2006-05-10 19:24:40 +0000192 rescan_fdset(ioqueue);
193
Benny Prijono5accbd02006-03-30 16:32:18 +0000194#if PJ_IOQUEUE_HAS_SAFE_UNREG
195 /* When safe unregistration is used (the default), we pre-create
196 * all keys and put them in the free list.
197 */
198
199 /* Mutex to protect key's reference counter
200 * We don't want to use key's mutex or ioqueue's mutex because
201 * that would create deadlock situation in some cases.
202 */
203 rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex);
204 if (rc != PJ_SUCCESS)
205 return rc;
206
207
208 /* Init key list */
209 pj_list_init(&ioqueue->free_list);
210 pj_list_init(&ioqueue->closing_list);
211
212
213 /* Pre-create all keys according to max_fd */
214 for (i=0; i<max_fd; ++i) {
215 pj_ioqueue_key_t *key;
216
Benny Prijonoa1e69682007-05-11 15:14:34 +0000217 key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t);
Benny Prijono5accbd02006-03-30 16:32:18 +0000218 key->ref_count = 0;
219 rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
220 if (rc != PJ_SUCCESS) {
221 key = ioqueue->free_list.next;
222 while (key != &ioqueue->free_list) {
223 pj_mutex_destroy(key->mutex);
224 key = key->next;
225 }
226 pj_mutex_destroy(ioqueue->ref_cnt_mutex);
227 return rc;
228 }
229
230 pj_list_push_back(&ioqueue->free_list, key);
231 }
232#endif
233
234 /* Create and init ioqueue mutex */
Benny Prijono9033e312005-11-21 02:08:39 +0000235 rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
236 if (rc != PJ_SUCCESS)
237 return rc;
238
239 rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
240 if (rc != PJ_SUCCESS)
241 return rc;
242
243 PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue));
244
245 *p_ioqueue = ioqueue;
246 return PJ_SUCCESS;
247}
248
/*
 * pj_ioqueue_destroy()
 *
 * Destroy ioqueue.
 *
 * Destroys all key mutexes (active, closing and free lists) and the
 * reference-counter mutex, then delegates to the common layer's
 * ioqueue_destroy().
 */
PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
    pj_ioqueue_key_t *key;

    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);

    /* Hold the ioqueue lock while tearing down.
     * NOTE(review): the lock is not released here; presumably
     * ioqueue_destroy() in the common layer releases/destroys it —
     * confirm against ioqueue_common_abs.c.
     */
    pj_lock_acquire(ioqueue->lock);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Destroy reference counters */
    key = ioqueue->active_list.next;
    while (key != &ioqueue->active_list) {
	pj_mutex_destroy(key->mutex);
	key = key->next;
    }

    key = ioqueue->closing_list.next;
    while (key != &ioqueue->closing_list) {
	pj_mutex_destroy(key->mutex);
	key = key->next;
    }

    key = ioqueue->free_list.next;
    while (key != &ioqueue->free_list) {
	pj_mutex_destroy(key->mutex);
	key = key->next;
    }

    pj_mutex_destroy(ioqueue->ref_cnt_mutex);
#endif

    return ioqueue_destroy(ioqueue);
}
287
288
/*
 * pj_ioqueue_register_sock()
 *
 * Register socket handle to ioqueue.
 *
 * Obtains a key (from the pre-created free list when
 * PJ_IOQUEUE_HAS_SAFE_UNREG is enabled, otherwise zero-allocated from
 * the pool), initializes it with the user data and callbacks, switches
 * the socket to non-blocking mode, and inserts the key into the active
 * list. Returns PJ_SUCCESS, PJ_ETOOMANY when the queue is full, or a
 * network OS error if the socket cannot be made non-blocking.
 */
PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
					      pj_ioqueue_t *ioqueue,
					      pj_sock_t sock,
					      void *user_data,
					      const pj_ioqueue_callback *cb,
                                              pj_ioqueue_key_t **p_key)
{
    pj_ioqueue_key_t *key = NULL;
    /* FIONBIO argument type differs: ioctlsocket() on WinXX takes
     * u_long*, ioctl() elsewhere takes a 32-bit integer.
     */
#if defined(PJ_WIN32) && PJ_WIN32!=0 || \
    defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0
    u_long value;
#else
    pj_uint32_t value;
#endif
    pj_status_t rc = PJ_SUCCESS;
    
    PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
                     cb && p_key, PJ_EINVAL);

    pj_lock_acquire(ioqueue->lock);

    if (ioqueue->count >= ioqueue->max) {
        rc = PJ_ETOOMANY;
	goto on_return;
    }

    /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get
     * the key from the free list. Otherwise allocate a new one. 
     */
#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* count < max above should guarantee a free key exists. */
    pj_assert(!pj_list_empty(&ioqueue->free_list));
    if (pj_list_empty(&ioqueue->free_list)) {
	rc = PJ_ETOOMANY;
	goto on_return;
    }

    key = ioqueue->free_list.next;
    pj_list_erase(key);
#else
    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
#endif

    rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
    if (rc != PJ_SUCCESS) {
	key = NULL;
	goto on_return;
    }

    /* Set socket to nonblocking. */
    value = 1;
#if defined(PJ_WIN32) && PJ_WIN32!=0 || \
    defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0
    if (ioctlsocket(sock, FIONBIO, &value)) {
#else
    if (ioctl(sock, FIONBIO, &value)) {
#endif
        rc = pj_get_netos_error();
	goto on_return;
    }


    /* Put in active list. */
    pj_list_insert_before(&ioqueue->active_list, key);
    ++ioqueue->count;

    /* Rescan fdset to get max descriptor */
    rescan_fdset(ioqueue);
    
on_return:
    /* On error, socket may be left in non-blocking mode. */
    *p_key = key;
    pj_lock_release(ioqueue->lock);
    
    return rc;
}
369
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Increment key's reference counter */
static void increment_counter(pj_ioqueue_key_t *key)
{
    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
    ++key->ref_count;
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
}

/* Decrement the key's reference counter, and when the counter reach zero,
 * destroy the key.
 *
 * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK.
 */
static void decrement_counter(pj_ioqueue_key_t *key)
{
    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
    --key->ref_count;
    if (key->ref_count == 0) {

	/* Only an unregistered key can drop to zero. Instead of being
	 * freed immediately, the key is scheduled for reuse after
	 * PJ_IOQUEUE_KEY_FREE_DELAY msec, giving in-flight callbacks in
	 * other threads time to finish with it.
	 */
	pj_assert(key->closing == 1);
	pj_gettimeofday(&key->free_time);
	key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
	pj_time_val_normalize(&key->free_time);

	/* Move the key to the closing list (scan_closing_keys() will
	 * later return it to the free list) under the ioqueue lock.
	 */
	pj_lock_acquire(key->ioqueue->lock);
	pj_list_erase(key);
	pj_list_push_back(&key->ioqueue->closing_list, key);
	/* Rescan fdset to get max descriptor */
	rescan_fdset(key->ioqueue);
	pj_lock_release(key->ioqueue->lock);
    }
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
}
#endif
405
406
/*
 * pj_ioqueue_unregister()
 *
 * Unregister handle from ioqueue.
 *
 * Removes the key from the active list and all master fd_sets, closes
 * the socket, and clears the callbacks. With safe unregistration the
 * key is reference-counted and only recycled after a grace period;
 * otherwise its mutex is destroyed immediately.
 */
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
{
    pj_ioqueue_t *ioqueue;

    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    ioqueue = key->ioqueue;

    /* Lock the key to make sure no callback is simultaneously modifying
     * the key. We need to lock the key before ioqueue here to prevent
     * deadlock.
     */
    pj_mutex_lock(key->mutex);

    /* Also lock ioqueue */
    pj_lock_acquire(ioqueue->lock);

    pj_assert(ioqueue->count > 0);
    --ioqueue->count;
    pj_list_erase(key);
    /* Remove the descriptor from every master set so that select()
     * will no longer watch it.
     */
    PJ_FD_CLR(key->fd, &ioqueue->rfdset);
    PJ_FD_CLR(key->fd, &ioqueue->wfdset);
#if PJ_HAS_TCP
    PJ_FD_CLR(key->fd, &ioqueue->xfdset);
#endif

    /* Close socket. */
    pj_sock_close(key->fd);

    /* Clear callback */
    key->cb.on_accept_complete = NULL;
    key->cb.on_connect_complete = NULL;
    key->cb.on_read_complete = NULL;
    key->cb.on_write_complete = NULL;

    /* Must release ioqueue lock first before decrementing counter, to
     * prevent deadlock.
     */
    pj_lock_release(ioqueue->lock);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Mark key is closing. */
    key->closing = 1;

    /* Decrement counter. */
    decrement_counter(key);

    /* Done. */
    pj_mutex_unlock(key->mutex);
#else
    /* NOTE(review): the key's mutex is destroyed while still held by
     * this thread; whether that is legal depends on the pj_mutex
     * implementation — confirm.
     */
    pj_mutex_destroy(key->mutex);
#endif

    return PJ_SUCCESS;
}
467
468
/* This supposed to check whether the fd_set values are consistent
 * with the operation currently set in each key.
 *
 * Note: currently dead code — VALIDATE_FD_SET is hard-wired to 0 above,
 * and the unconditional pj_assert(0) below documents that the check is
 * no longer sound under the current locking scheme.
 */
#if VALIDATE_FD_SET
static void validate_sets(const pj_ioqueue_t *ioqueue,
			  const pj_fd_set_t *rfdset,
			  const pj_fd_set_t *wfdset,
			  const pj_fd_set_t *xfdset)
{
    pj_ioqueue_key_t *key;

    /*
     * This basicly would not work anymore.
     * We need to lock key before performing the check, but we can't do
     * so because we're holding ioqueue mutex. If we acquire key's mutex
     * now, the will cause deadlock.
     */
    pj_assert(0);

    /* For each active key, verify that its fd is in a set if and only
     * if it has a matching pending operation.
     */
    key = ioqueue->active_list.next;
    while (key != &ioqueue->active_list) {
	if (!pj_list_empty(&key->read_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
	    || !pj_list_empty(&key->accept_list)
#endif
	    ) 
	{
	    pj_assert(PJ_FD_ISSET(key->fd, rfdset));
	} 
	else {
	    pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0);
	}
	if (!pj_list_empty(&key->write_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
	    || key->connecting
#endif
	   )
	{
	    pj_assert(PJ_FD_ISSET(key->fd, wfdset));
	}
	else {
	    pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0);
	}
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
	if (key->connecting)
	{
	    pj_assert(PJ_FD_ISSET(key->fd, xfdset));
	}
	else {
	    pj_assert(PJ_FD_ISSET(key->fd, xfdset) == 0);
	}
#endif /* PJ_HAS_TCP */
	
	key = key->next;
    }
}
#endif	/* VALIDATE_FD_SET */
526
527
528/* ioqueue_remove_from_set()
529 * This function is called from ioqueue_dispatch_event() to instruct
530 * the ioqueue to remove the specified descriptor from ioqueue's descriptor
531 * set for the specified event.
532 */
533static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
Benny Prijono63ab3562006-07-08 19:46:43 +0000534 pj_ioqueue_key_t *key,
Benny Prijono9033e312005-11-21 02:08:39 +0000535 enum ioqueue_event_type event_type)
536{
537 pj_lock_acquire(ioqueue->lock);
538
539 if (event_type == READABLE_EVENT)
Benny Prijono63ab3562006-07-08 19:46:43 +0000540 PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->rfdset);
Benny Prijono9033e312005-11-21 02:08:39 +0000541 else if (event_type == WRITEABLE_EVENT)
Benny Prijono63ab3562006-07-08 19:46:43 +0000542 PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->wfdset);
Benny Prijono3569c0d2007-04-06 10:29:20 +0000543#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
Benny Prijono9033e312005-11-21 02:08:39 +0000544 else if (event_type == EXCEPTION_EVENT)
Benny Prijono63ab3562006-07-08 19:46:43 +0000545 PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->xfdset);
Benny Prijono3569c0d2007-04-06 10:29:20 +0000546#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000547 else
548 pj_assert(0);
549
550 pj_lock_release(ioqueue->lock);
551}
552
553/*
554 * ioqueue_add_to_set()
555 * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
556 * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
557 * set for the specified event.
558 */
559static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
Benny Prijono63ab3562006-07-08 19:46:43 +0000560 pj_ioqueue_key_t *key,
Benny Prijono9033e312005-11-21 02:08:39 +0000561 enum ioqueue_event_type event_type )
562{
563 pj_lock_acquire(ioqueue->lock);
564
565 if (event_type == READABLE_EVENT)
Benny Prijono63ab3562006-07-08 19:46:43 +0000566 PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->rfdset);
Benny Prijono9033e312005-11-21 02:08:39 +0000567 else if (event_type == WRITEABLE_EVENT)
Benny Prijono63ab3562006-07-08 19:46:43 +0000568 PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->wfdset);
Benny Prijono3569c0d2007-04-06 10:29:20 +0000569#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
Benny Prijono9033e312005-11-21 02:08:39 +0000570 else if (event_type == EXCEPTION_EVENT)
Benny Prijono63ab3562006-07-08 19:46:43 +0000571 PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->xfdset);
Benny Prijono3569c0d2007-04-06 10:29:20 +0000572#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000573 else
574 pj_assert(0);
575
576 pj_lock_release(ioqueue->lock);
577}
578
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Move keys in the closing list whose delayed-free grace period has
 * expired back into the free list, making them available for reuse by
 * new registrations.
 */
static void scan_closing_keys(pj_ioqueue_t *ioqueue)
{
    pj_ioqueue_key_t *key, *next_key;
    pj_time_val now;

    pj_gettimeofday(&now);

    for (key = ioqueue->closing_list.next;
	 key != &ioqueue->closing_list;
	 key = next_key)
    {
	/* Save the successor first; the node may be relocated below. */
	next_key = key->next;

	pj_assert(key->closing != 0);

	if (PJ_TIME_VAL_GTE(now, key->free_time)) {
	    pj_list_erase(key);
	    pj_list_push_back(&ioqueue->free_list, key);
	}
    }
}
#endif
601
602
Benny Prijono9033e312005-11-21 02:08:39 +0000603/*
604 * pj_ioqueue_poll()
605 *
606 * Few things worth written:
607 *
608 * - we used to do only one callback called per poll, but it didn't go
609 * very well. The reason is because on some situation, the write
610 * callback gets called all the time, thus doesn't give the read
611 * callback to get called. This happens, for example, when user
612 * submit write operation inside the write callback.
613 * As the result, we changed the behaviour so that now multiple
614 * callbacks are called in a single poll. It should be fast too,
615 * just that we need to be carefull with the ioqueue data structs.
616 *
617 * - to guarantee preemptiveness etc, the poll function must strictly
618 * work on fd_set copy of the ioqueue (not the original one).
619 */
620PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
621{
622 pj_fd_set_t rfdset, wfdset, xfdset;
623 int count, counter;
624 pj_ioqueue_key_t *h;
625 struct event
626 {
627 pj_ioqueue_key_t *key;
628 enum ioqueue_event_type event_type;
629 } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
630
Benny Prijono37e8d332006-01-20 21:03:36 +0000631 PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
Benny Prijono9033e312005-11-21 02:08:39 +0000632
633 /* Lock ioqueue before making fd_set copies */
634 pj_lock_acquire(ioqueue->lock);
635
636 /* We will only do select() when there are sockets to be polled.
637 * Otherwise select() will return error.
638 */
639 if (PJ_FD_COUNT(&ioqueue->rfdset)==0 &&
Benny Prijono3569c0d2007-04-06 10:29:20 +0000640 PJ_FD_COUNT(&ioqueue->wfdset)==0
641#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
642 && PJ_FD_COUNT(&ioqueue->xfdset)==0
643#endif
644 )
Benny Prijono9033e312005-11-21 02:08:39 +0000645 {
Benny Prijono5accbd02006-03-30 16:32:18 +0000646#if PJ_IOQUEUE_HAS_SAFE_UNREG
647 scan_closing_keys(ioqueue);
648#endif
649 pj_lock_release(ioqueue->lock);
Benny Prijono42c5b9e2006-05-10 19:24:40 +0000650 TRACE__((THIS_FILE, " poll: no fd is set"));
Benny Prijono9033e312005-11-21 02:08:39 +0000651 if (timeout)
652 pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout));
653 return 0;
654 }
655
656 /* Copy ioqueue's pj_fd_set_t to local variables. */
657 pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t));
658 pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t));
659#if PJ_HAS_TCP
660 pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t));
661#else
662 PJ_FD_ZERO(&xfdset);
663#endif
664
665#if VALIDATE_FD_SET
666 validate_sets(ioqueue, &rfdset, &wfdset, &xfdset);
667#endif
668
669 /* Unlock ioqueue before select(). */
670 pj_lock_release(ioqueue->lock);
671
Benny Prijono42c5b9e2006-05-10 19:24:40 +0000672 count = pj_sock_select(ioqueue->nfds+1, &rfdset, &wfdset, &xfdset,
673 timeout);
Benny Prijono9033e312005-11-21 02:08:39 +0000674
675 if (count <= 0)
Benny Prijono37e8d332006-01-20 21:03:36 +0000676 return -pj_get_netos_error();
Benny Prijono9033e312005-11-21 02:08:39 +0000677 else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)
678 count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;
679
680 /* Scan descriptor sets for event and add the events in the event
681 * array to be processed later in this function. We do this so that
682 * events can be processed in parallel without holding ioqueue lock.
683 */
684 pj_lock_acquire(ioqueue->lock);
685
686 counter = 0;
687
688 /* Scan for writable sockets first to handle piggy-back data
689 * coming with accept().
690 */
Benny Prijono5accbd02006-03-30 16:32:18 +0000691 h = ioqueue->active_list.next;
692 for ( ; h!=&ioqueue->active_list && counter<count; h = h->next) {
693
Benny Prijono9033e312005-11-21 02:08:39 +0000694 if ( (key_has_pending_write(h) || key_has_pending_connect(h))
Benny Prijono3059eb62006-10-04 20:46:27 +0000695 && PJ_FD_ISSET(h->fd, &wfdset) && !IS_CLOSING(h))
Benny Prijono9033e312005-11-21 02:08:39 +0000696 {
Benny Prijono5accbd02006-03-30 16:32:18 +0000697#if PJ_IOQUEUE_HAS_SAFE_UNREG
698 increment_counter(h);
699#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000700 event[counter].key = h;
701 event[counter].event_type = WRITEABLE_EVENT;
702 ++counter;
703 }
704
705 /* Scan for readable socket. */
706 if ((key_has_pending_read(h) || key_has_pending_accept(h))
Benny Prijono3059eb62006-10-04 20:46:27 +0000707 && PJ_FD_ISSET(h->fd, &rfdset) && !IS_CLOSING(h))
Benny Prijono9033e312005-11-21 02:08:39 +0000708 {
Benny Prijono5accbd02006-03-30 16:32:18 +0000709#if PJ_IOQUEUE_HAS_SAFE_UNREG
710 increment_counter(h);
711#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000712 event[counter].key = h;
713 event[counter].event_type = READABLE_EVENT;
714 ++counter;
715 }
716
717#if PJ_HAS_TCP
Benny Prijono5accbd02006-03-30 16:32:18 +0000718 if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset) &&
Benny Prijono3059eb62006-10-04 20:46:27 +0000719 !IS_CLOSING(h))
Benny Prijono5accbd02006-03-30 16:32:18 +0000720 {
721#if PJ_IOQUEUE_HAS_SAFE_UNREG
722 increment_counter(h);
723#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000724 event[counter].key = h;
725 event[counter].event_type = EXCEPTION_EVENT;
726 ++counter;
727 }
728#endif
729 }
730
731 pj_lock_release(ioqueue->lock);
732
733 count = counter;
734
735 /* Now process all events. The dispatch functions will take care
736 * of locking in each of the key
737 */
738 for (counter=0; counter<count; ++counter) {
739 switch (event[counter].event_type) {
740 case READABLE_EVENT:
741 ioqueue_dispatch_read_event(ioqueue, event[counter].key);
742 break;
743 case WRITEABLE_EVENT:
744 ioqueue_dispatch_write_event(ioqueue, event[counter].key);
745 break;
746 case EXCEPTION_EVENT:
747 ioqueue_dispatch_exception_event(ioqueue, event[counter].key);
748 break;
749 case NO_EVENT:
750 pj_assert(!"Invalid event!");
751 break;
752 }
Benny Prijono5accbd02006-03-30 16:32:18 +0000753
754#if PJ_IOQUEUE_HAS_SAFE_UNREG
755 decrement_counter(event[counter].key);
756#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000757 }
758
Benny Prijono5accbd02006-03-30 16:32:18 +0000759
Benny Prijono9033e312005-11-21 02:08:39 +0000760 return count;
761}
762