/* $Id$ */
/*
 * Copyright (C)2003-2006 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */

/*
 * sock_select.c
 *
 * This is the implementation of IOQueue using pj_sock_select().
 * It runs anywhere where pj_sock_select() is available (currently
 * Win32, Linux, Linux kernel, etc.).
 */

#include <pj/ioqueue.h>
#include <pj/os.h>
#include <pj/lock.h>
#include <pj/log.h>
#include <pj/list.h>
#include <pj/pool.h>
#include <pj/string.h>
#include <pj/assert.h>
#include <pj/sock.h>
#include <pj/compat/socket.h>
#include <pj/sock_select.h>
#include <pj/errno.h>

/*
 * Include declaration from common abstraction.
 */
#include "ioqueue_common_abs.h"

/*
 * ISSUES with ioqueue_select()
 *
 * EAGAIN/EWOULDBLOCK error in recv():
 *  - when multiple threads are working with the ioqueue, the application
 *    may receive EAGAIN or EWOULDBLOCK in the receive callback.
 *    This error happens because more than one thread is watching the
 *    same descriptor set, so when all of them call recv() or recvfrom()
 *    simultaneously, only one will succeed and the rest will get the error.
 *
 */
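
/*
 * Illustrative sketch only (not part of this implementation): one way an
 * application's read callback could tolerate the spurious EAGAIN/EWOULDBLOCK
 * described above. The callback signature follows pj_ioqueue_callback;
 * whether OSERR_EWOULDBLOCK is the right constant to compare against here
 * is an assumption and may differ per platform/pjlib version.
 */
#if 0
static void on_read_complete(pj_ioqueue_key_t *key,
                             pj_ioqueue_op_key_t *op_key,
                             pj_ssize_t bytes_read)
{
    if (bytes_read < 0) {
        pj_status_t status = (pj_status_t)-bytes_read;
        if (status == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
            /* Another polling thread has already consumed the data;
             * ignore and let the next pj_ioqueue_recv() re-arm the read.
             */
            return;
        }
    }
    /* ... normal processing of the received data ... */
}
#endif
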
#define THIS_FILE   "ioq_select"

/*
 * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
 * the correct error code.
 */
#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
# error "Error reporting must be enabled for this function to work!"
#endif

/**
 * Get the number of descriptors in the set. This is defined in sock_select.c.
 * This function will only return the number of sockets set by the PJ_FD_SET
 * operation. When the set is modified by other means (such as by select()),
 * the count will not be reflected here.
 *
 * That's why we don't export this function in the header file, to avoid
 * misunderstanding.
 *
 * @param fdsetp    The descriptor set.
 *
 * @return          Number of descriptors in the set.
 */
PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);


/*
 * During debugging build, VALIDATE_FD_SET is set.
 * This will check the validity of the fd_sets.
 */
/*
#if defined(PJ_DEBUG) && PJ_DEBUG != 0
#  define VALIDATE_FD_SET   1
#else
#  define VALIDATE_FD_SET   0
#endif
*/
#define VALIDATE_FD_SET     0

/*
 * This describes each key.
 */
struct pj_ioqueue_key_t
{
    DECLARE_COMMON_KEY
};

/*
 * This describes the I/O queue itself.
 */
struct pj_ioqueue_t
{
    DECLARE_COMMON_IOQUEUE

    unsigned            max, count;
    pj_ioqueue_key_t    key_list;
    pj_fd_set_t         rfdset;
    pj_fd_set_t         wfdset;
#if PJ_HAS_TCP
    pj_fd_set_t         xfdset;
#endif
};

/* Include implementation for common abstraction after we declare
 * pj_ioqueue_key_t and pj_ioqueue_t.
 */
#include "ioqueue_common_abs.c"

/*
 * pj_ioqueue_name()
 */
PJ_DEF(const char*) pj_ioqueue_name(void)
{
    return "select";
}

/*
 * pj_ioqueue_create()
 *
 * Create select ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
                                       pj_size_t max_fd,
                                       pj_ioqueue_t **p_ioqueue)
{
    pj_ioqueue_t *ioqueue;
    pj_lock_t *lock;
    pj_status_t rc;

    /* Check that arguments are valid. */
    PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
                     max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES,
                     PJ_EINVAL);

    /* Check that size of pj_ioqueue_op_key_t is sufficient */
    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
                     sizeof(union operation_key), PJ_EBUG);

    ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));

    ioqueue_init(ioqueue);

    ioqueue->max = max_fd;
    ioqueue->count = 0;
    PJ_FD_ZERO(&ioqueue->rfdset);
    PJ_FD_ZERO(&ioqueue->wfdset);
#if PJ_HAS_TCP
    PJ_FD_ZERO(&ioqueue->xfdset);
#endif
    pj_list_init(&ioqueue->key_list);

    rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
    if (rc != PJ_SUCCESS)
        return rc;

    rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
    if (rc != PJ_SUCCESS)
        return rc;

    PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue));

    *p_ioqueue = ioqueue;
    return PJ_SUCCESS;
}

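/*
 * Illustrative sketch only: creating a select ioqueue from an application
 * pool. The pool factory "pf" and the pool sizes are assumptions supplied
 * by the caller; any pool with enough room for pj_ioqueue_t will do.
 */
#if 0
static pj_status_t app_create_ioqueue(pj_pool_factory *pf,
                                      pj_ioqueue_t **p_ioq)
{
    pj_pool_t *pool;

    pool = pj_pool_create(pf, "ioqapp", 512, 512, NULL);
    if (!pool)
        return PJ_ENOMEM;

    /* Let the ioqueue handle up to PJ_IOQUEUE_MAX_HANDLES descriptors. */
    return pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, p_ioq);
}
#endif
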
/*
 * pj_ioqueue_destroy()
 *
 * Destroy ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);

    pj_lock_acquire(ioqueue->lock);
    return ioqueue_destroy(ioqueue);
}


/*
 * pj_ioqueue_register_sock()
 *
 * Register a handle to ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
                                              pj_ioqueue_t *ioqueue,
                                              pj_sock_t sock,
                                              void *user_data,
                                              const pj_ioqueue_callback *cb,
                                              pj_ioqueue_key_t **p_key)
{
    pj_ioqueue_key_t *key = NULL;
    pj_uint32_t value;
    pj_status_t rc = PJ_SUCCESS;

    PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
                     cb && p_key, PJ_EINVAL);

    pj_lock_acquire(ioqueue->lock);

    if (ioqueue->count >= ioqueue->max) {
        rc = PJ_ETOOMANY;
        goto on_return;
    }

    /* Set socket to nonblocking. */
    value = 1;
#ifdef PJ_WIN32
    if (ioctlsocket(sock, FIONBIO, (u_long*)&value)) {
#else
    if (ioctl(sock, FIONBIO, &value)) {
#endif
        rc = pj_get_netos_error();
        goto on_return;
    }

    /* Create key. */
    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
    rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
    if (rc != PJ_SUCCESS) {
        key = NULL;
        goto on_return;
    }

    /* Register */
    pj_list_insert_before(&ioqueue->key_list, key);
    ++ioqueue->count;

on_return:
    /* On error, socket may be left in non-blocking mode. */
    *p_key = key;
    pj_lock_release(ioqueue->lock);

    return rc;
}

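/*
 * Illustrative sketch only: registering an already-created socket with the
 * ioqueue. The handlers my_on_read_complete and my_on_write_complete are
 * hypothetical application callbacks, not part of this file.
 */
#if 0
static pj_status_t app_register_socket(pj_pool_t *pool,
                                       pj_ioqueue_t *ioq,
                                       pj_sock_t sock,
                                       void *user_data,
                                       pj_ioqueue_key_t **p_key)
{
    pj_ioqueue_callback cb;

    /* Unused callbacks must be cleared to NULL. */
    pj_memset(&cb, 0, sizeof(cb));
    cb.on_read_complete  = &my_on_read_complete;
    cb.on_write_complete = &my_on_write_complete;

    return pj_ioqueue_register_sock(pool, ioq, sock, user_data, &cb, p_key);
}
#endif
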
/*
 * pj_ioqueue_unregister()
 *
 * Unregister handle from ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
{
    pj_ioqueue_t *ioqueue;

    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    ioqueue = key->ioqueue;

    pj_lock_acquire(ioqueue->lock);

    pj_assert(ioqueue->count > 0);
    --ioqueue->count;
    pj_list_erase(key);
    PJ_FD_CLR(key->fd, &ioqueue->rfdset);
    PJ_FD_CLR(key->fd, &ioqueue->wfdset);
#if PJ_HAS_TCP
    PJ_FD_CLR(key->fd, &ioqueue->xfdset);
#endif

    /* ioqueue_destroy_key() may try to acquire the key's mutex.
     * Since normally the order of locking is to lock the key's mutex first
     * and then the ioqueue's mutex, it may deadlock unless we
     * release the ioqueue's mutex first.
     */
    pj_lock_release(ioqueue->lock);

    /* Destroy the key. */
    ioqueue_destroy_key(key);

    return PJ_SUCCESS;
}


/* This is supposed to check whether the fd_set values are consistent
 * with the operations currently set in each key.
 */
#if VALIDATE_FD_SET
static void validate_sets(const pj_ioqueue_t *ioqueue,
                          const pj_fd_set_t *rfdset,
                          const pj_fd_set_t *wfdset,
                          const pj_fd_set_t *xfdset)
{
    pj_ioqueue_key_t *key;

    /*
     * This basically would not work anymore.
     * We need to lock the key before performing the check, but we can't do
     * so because we're holding the ioqueue mutex. If we acquire the key's
     * mutex now, it will cause a deadlock.
     */
    pj_assert(0);

    key = ioqueue->key_list.next;
    while (key != &ioqueue->key_list) {
        if (!pj_list_empty(&key->read_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
            || !pj_list_empty(&key->accept_list)
#endif
            )
        {
            pj_assert(PJ_FD_ISSET(key->fd, rfdset));
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0);
        }
        if (!pj_list_empty(&key->write_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
            || key->connecting
#endif
            )
        {
            pj_assert(PJ_FD_ISSET(key->fd, wfdset));
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0);
        }
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
        if (key->connecting)
        {
            pj_assert(PJ_FD_ISSET(key->fd, xfdset));
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, xfdset) == 0);
        }
#endif /* PJ_HAS_TCP */

        key = key->next;
    }
}
#endif /* VALIDATE_FD_SET */


/* ioqueue_remove_from_set()
 * This function is called from ioqueue_dispatch_event() to instruct
 * the ioqueue to remove the specified descriptor from the ioqueue's
 * descriptor set for the specified event.
 */
static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
                                     pj_sock_t fd,
                                     enum ioqueue_event_type event_type)
{
    pj_lock_acquire(ioqueue->lock);

    if (event_type == READABLE_EVENT)
        PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset);
    else if (event_type == WRITEABLE_EVENT)
        PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset);
    else if (event_type == EXCEPTION_EVENT)
        PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset);
    else
        pj_assert(0);

    pj_lock_release(ioqueue->lock);
}

/*
 * ioqueue_add_to_set()
 * This function is called from pj_ioqueue_recv(), pj_ioqueue_send(), etc.
 * to instruct the ioqueue to add the specified handle to the ioqueue's
 * descriptor set for the specified event.
 */
static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
                                pj_sock_t fd,
                                enum ioqueue_event_type event_type )
{
    pj_lock_acquire(ioqueue->lock);

    if (event_type == READABLE_EVENT)
        PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset);
    else if (event_type == WRITEABLE_EVENT)
        PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset);
    else if (event_type == EXCEPTION_EVENT)
        PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset);
    else
        pj_assert(0);

    pj_lock_release(ioqueue->lock);
}

/*
 * pj_ioqueue_poll()
 *
 * A few things worth noting:
 *
 * - we used to call only one callback per poll, but that didn't work
 *   very well. The reason is that in some situations the write
 *   callback gets called all the time, so the read callback never
 *   gets a chance to run. This happens, for example, when the user
 *   submits a write operation from inside the write callback.
 *   As a result, we changed the behaviour so that multiple
 *   callbacks are now called in a single poll. It should be fast too,
 *   we just need to be careful with the ioqueue data structures.
 *
 * - to guarantee preemptiveness etc., the poll function must strictly
 *   work on a copy of the ioqueue's fd_sets (not the originals).
 */
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
    pj_fd_set_t rfdset, wfdset, xfdset;
    int count, counter;
    pj_ioqueue_key_t *h;
    struct event
    {
        pj_ioqueue_key_t        *key;
        enum ioqueue_event_type  event_type;
    } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];

    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);

    /* Lock ioqueue before making fd_set copies */
    pj_lock_acquire(ioqueue->lock);

    /* We will only do select() when there are sockets to be polled.
     * Otherwise select() will return an error.
     */
    if (PJ_FD_COUNT(&ioqueue->rfdset)==0 &&
        PJ_FD_COUNT(&ioqueue->wfdset)==0 &&
        PJ_FD_COUNT(&ioqueue->xfdset)==0)
    {
        pj_lock_release(ioqueue->lock);
        if (timeout)
            pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout));
        return 0;
    }

    /* Copy ioqueue's pj_fd_set_t to local variables. */
    pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t));
    pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t));
#if PJ_HAS_TCP
    pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t));
#else
    PJ_FD_ZERO(&xfdset);
#endif

#if VALIDATE_FD_SET
    validate_sets(ioqueue, &rfdset, &wfdset, &xfdset);
#endif

    /* Unlock ioqueue before select(). */
    pj_lock_release(ioqueue->lock);

    count = pj_sock_select(FD_SETSIZE, &rfdset, &wfdset, &xfdset, timeout);

    if (count <= 0)
        return count;
    else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)
        count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;

    /* Scan descriptor sets for events and add them to the event
     * array to be processed later in this function. We do this so that
     * events can be processed in parallel without holding the ioqueue lock.
     */
    pj_lock_acquire(ioqueue->lock);

    counter = 0;

    /* Scan for writable sockets first to handle piggy-back data
     * coming with accept().
     */
    h = ioqueue->key_list.next;
    for ( ; h!=&ioqueue->key_list && counter<count; h = h->next) {
        if ( (key_has_pending_write(h) || key_has_pending_connect(h))
             && PJ_FD_ISSET(h->fd, &wfdset))
        {
            event[counter].key = h;
            event[counter].event_type = WRITEABLE_EVENT;
            ++counter;
        }

        /* Scan for readable socket. */
        if ((key_has_pending_read(h) || key_has_pending_accept(h))
            && PJ_FD_ISSET(h->fd, &rfdset))
        {
            event[counter].key = h;
            event[counter].event_type = READABLE_EVENT;
            ++counter;
        }

#if PJ_HAS_TCP
        if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) {
            event[counter].key = h;
            event[counter].event_type = EXCEPTION_EVENT;
            ++counter;
        }
#endif
    }

    pj_lock_release(ioqueue->lock);

    count = counter;

    /* Now process all events. The dispatch functions will take care
     * of locking each key.
     */
    for (counter=0; counter<count; ++counter) {
        switch (event[counter].event_type) {
        case READABLE_EVENT:
            ioqueue_dispatch_read_event(ioqueue, event[counter].key);
            break;
        case WRITEABLE_EVENT:
            ioqueue_dispatch_write_event(ioqueue, event[counter].key);
            break;
        case EXCEPTION_EVENT:
            ioqueue_dispatch_exception_event(ioqueue, event[counter].key);
            break;
        case NO_EVENT:
            pj_assert(!"Invalid event!");
            break;
        }
    }

    return count;
}

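/*
 * Illustrative sketch only: a worker-thread loop that drives this ioqueue.
 * The quit_flag variable and the 10 ms timeout are assumptions made for the
 * example; real applications pick their own termination and timeout policy.
 */
#if 0
static int worker_thread(void *arg)
{
    pj_ioqueue_t *ioq = (pj_ioqueue_t*)arg;

    while (!quit_flag) {
        pj_time_val timeout;

        timeout.sec = 0;
        timeout.msec = 10;

        /* Dispatches zero or more callbacks, or sleeps up to the timeout. */
        pj_ioqueue_poll(ioq, &timeout);
    }
    return 0;
}
#endif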