/* $Id: ioqueue_epoll.c 4528 2013-05-30 07:01:11Z ming $ */
/*
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
/*
 * ioqueue_epoll.c
 *
 * This is the implementation of the IOQueue framework using the /dev/epoll
 * API, in _both_ Linux user-mode and kernel-mode.
 */
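
/* For orientation, this is roughly the standard user-mode epoll pattern that
 * the code below wraps (illustrative sketch only, not part of the build):
 *
 *     int epfd = epoll_create(max_fd);
 *
 *     struct epoll_event ev;
 *     ev.events = EPOLLIN | EPOLLERR;
 *     ev.data.ptr = key;                            // map events back to key
 *     epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
 *
 *     int n = epoll_wait(epfd, events, max_events, timeout_msec);
 *
 * The os_* macros and wrappers below hide the differences between the
 * user-mode and kernel-mode variants of these calls.
 */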

#include <pj/ioqueue.h>
#include <pj/os.h>
#include <pj/lock.h>
#include <pj/log.h>
#include <pj/list.h>
#include <pj/pool.h>
#include <pj/string.h>
#include <pj/assert.h>
#include <pj/errno.h>
#include <pj/sock.h>
#include <pj/compat/socket.h>
#include <pj/rand.h>

#if !defined(PJ_LINUX_KERNEL) || PJ_LINUX_KERNEL==0
    /*
     * Linux user mode
     */
#   include <sys/epoll.h>
#   include <errno.h>
#   include <unistd.h>

#   define epoll_data           data.ptr
#   define epoll_data_type      void*
#   define ioctl_val_type       unsigned long
#   define getsockopt_val_ptr   int*
#   define os_getsockopt        getsockopt
#   define os_ioctl             ioctl
#   define os_read              read
#   define os_close             close
#   define os_epoll_create      epoll_create
#   define os_epoll_ctl         epoll_ctl
#   define os_epoll_wait        epoll_wait
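
    /* Note: throughout this file, 'ev.epoll_data = (epoll_data_type)key'
     * therefore expands to 'ev.data.ptr = (void*)key' in user mode, and to
     * 'ev.data = (__u32)key' in the kernel-mode branch below, so the same
     * statements compile in both modes.
     */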
#else
    /*
     * Linux kernel mode.
     */
#   include <linux/config.h>
#   include <linux/version.h>
#   if defined(MODVERSIONS)
#       include <linux/modversions.h>
#   endif
#   include <linux/kernel.h>
#   include <linux/poll.h>
#   include <linux/eventpoll.h>
#   include <linux/syscalls.h>
#   include <linux/errno.h>
#   include <linux/unistd.h>
#   include <asm/ioctls.h>
    enum EPOLL_EVENTS
    {
        EPOLLIN  = 0x001,
        EPOLLOUT = 0x004,
        EPOLLERR = 0x008,
    };
#   define os_epoll_create      sys_epoll_create
    static int os_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
    {
        long rc;
        mm_segment_t oldfs = get_fs();
        set_fs(KERNEL_DS);
        rc = sys_epoll_ctl(epfd, op, fd, event);
        set_fs(oldfs);
        if (rc) {
            errno = -rc;
            return -1;
        } else {
            return 0;
        }
    }
    static int os_epoll_wait(int epfd, struct epoll_event *events,
                             int maxevents, int timeout)
    {
        int count;
        mm_segment_t oldfs = get_fs();
        set_fs(KERNEL_DS);
        count = sys_epoll_wait(epfd, events, maxevents, timeout);
        set_fs(oldfs);
        return count;
    }
#   define os_close             sys_close
#   define os_getsockopt        pj_sock_getsockopt
    static int os_read(int fd, void *buf, size_t len)
    {
        long rc;
        mm_segment_t oldfs = get_fs();
        set_fs(KERNEL_DS);
        rc = sys_read(fd, buf, len);
        set_fs(oldfs);
        if (rc < 0) {
            /* sys_read() returns a negative errno value on failure. */
            errno = -rc;
            return -1;
        }
        /* On success, return the number of bytes actually read. */
        return rc;
    }
#   define socklen_t            unsigned
#   define ioctl_val_type       unsigned long
    int ioctl(int fd, int opt, ioctl_val_type value);
    static int os_ioctl(int fd, int opt, ioctl_val_type value)
    {
        int rc;
        mm_segment_t oldfs = get_fs();
        set_fs(KERNEL_DS);
        rc = ioctl(fd, opt, value);
        set_fs(oldfs);
        if (rc < 0) {
            errno = -rc;
            return rc;
        } else
            return rc;
    }
#   define getsockopt_val_ptr   char*

#   define epoll_data           data
#   define epoll_data_type      __u32
#endif
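
/* In the kernel-mode wrappers above, the get_fs()/set_fs(KERNEL_DS) pairs
 * temporarily lift the address-space limit so that sys_epoll_ctl(),
 * sys_epoll_wait(), sys_read() and ioctl() accept buffers that live in
 * kernel space rather than user space, and then restore the old limit.
 */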

#define THIS_FILE   "ioq_epoll"

//#define TRACE_(expr) PJ_LOG(3,expr)
#define TRACE_(expr)

/*
 * Include common ioqueue abstraction.
 */
#include "ioqueue_common_abs.h"

/*
 * This describes each key.
 */
struct pj_ioqueue_key_t
{
    DECLARE_COMMON_KEY
};

struct queue
{
    pj_ioqueue_key_t        *key;
    enum ioqueue_event_type  event_type;
};
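
/* During pj_ioqueue_poll(), events reported by epoll are first collected
 * into an array of 'struct queue' entries while the ioqueue lock is held,
 * and the corresponding callbacks are dispatched only after the lock has
 * been released (see pj_ioqueue_poll() below).
 */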

/*
 * This describes the I/O queue.
 */
struct pj_ioqueue_t
{
    DECLARE_COMMON_IOQUEUE

    unsigned            max, count;
    //pj_ioqueue_key_t  hlist;
    pj_ioqueue_key_t    active_list;
    int                 epfd;
    //struct epoll_event *events;
    //struct queue *queue;

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    pj_mutex_t         *ref_cnt_mutex;
    pj_ioqueue_key_t    closing_list;
    pj_ioqueue_key_t    free_list;
#endif
};
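
/* When PJ_IOQUEUE_HAS_SAFE_UNREG is enabled (the default), every key lives
 * on exactly one of three lists: 'active_list' while it is registered,
 * 'closing_list' for a short grace period after unregistration, and
 * 'free_list' once it is available for reuse by the next registration.
 */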

/* Include implementation for common abstraction after we declare
 * pj_ioqueue_key_t and pj_ioqueue_t.
 */
#include "ioqueue_common_abs.c"

#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Scan closing keys to be put to free list again */
static void scan_closing_keys(pj_ioqueue_t *ioqueue);
#endif

/*
 * pj_ioqueue_name()
 */
PJ_DEF(const char*) pj_ioqueue_name(void)
{
#if defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL!=0
    return "epoll-kernel";
#else
    return "epoll";
#endif
}

/*
 * pj_ioqueue_create()
 *
 * Create epoll ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
                                       pj_size_t max_fd,
                                       pj_ioqueue_t **p_ioqueue)
{
    pj_ioqueue_t *ioqueue;
    pj_status_t rc;
    pj_lock_t *lock;
    int i;

    /* Check that arguments are valid. */
    PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
                     max_fd > 0, PJ_EINVAL);

    /* Check that size of pj_ioqueue_op_key_t is sufficient */
    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
                     sizeof(union operation_key), PJ_EBUG);

    ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));

    ioqueue_init(ioqueue);

    ioqueue->max = max_fd;
    ioqueue->count = 0;
    pj_list_init(&ioqueue->active_list);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* When safe unregistration is used (the default), we pre-create
     * all keys and put them in the free list.
     */

    /* Mutex to protect key's reference counter.
     * We don't want to use key's mutex or ioqueue's mutex because
     * that would create deadlock situation in some cases.
     */
    rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex);
    if (rc != PJ_SUCCESS)
        return rc;


    /* Init key list */
    pj_list_init(&ioqueue->free_list);
    pj_list_init(&ioqueue->closing_list);


    /* Pre-create all keys according to max_fd */
    for (i=0; i<max_fd; ++i) {
        pj_ioqueue_key_t *key;

        key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t);
        key->ref_count = 0;
        rc = pj_lock_create_recursive_mutex(pool, NULL, &key->lock);
        if (rc != PJ_SUCCESS) {
            key = ioqueue->free_list.next;
            while (key != &ioqueue->free_list) {
                pj_lock_destroy(key->lock);
                key = key->next;
            }
            pj_mutex_destroy(ioqueue->ref_cnt_mutex);
            return rc;
        }

        pj_list_push_back(&ioqueue->free_list, key);
    }
#endif

    rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
    if (rc != PJ_SUCCESS)
        return rc;

    rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
    if (rc != PJ_SUCCESS)
        return rc;

    ioqueue->epfd = os_epoll_create(max_fd);
    if (ioqueue->epfd < 0) {
        ioqueue_destroy(ioqueue);
        return PJ_RETURN_OS_ERROR(pj_get_native_os_error());
    }

    /*ioqueue->events = pj_pool_calloc(pool, max_fd, sizeof(struct epoll_event));
    PJ_ASSERT_RETURN(ioqueue->events != NULL, PJ_ENOMEM);

    ioqueue->queue = pj_pool_calloc(pool, max_fd, sizeof(struct queue));
    PJ_ASSERT_RETURN(ioqueue->queue != NULL, PJ_ENOMEM);
    */
    PJ_LOG(4, ("pjlib", "epoll I/O Queue created (%p)", ioqueue));

    *p_ioqueue = ioqueue;
    return PJ_SUCCESS;
}
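
/* A minimal usage sketch from the application's point of view (illustrative
 * only; 'sock', 'user_data' and 'on_my_read_complete' are hypothetical
 * application objects, and error checking is omitted):
 *
 *     pj_ioqueue_t *ioq;
 *     pj_ioqueue_key_t *key;
 *     pj_ioqueue_callback cb;
 *
 *     pj_bzero(&cb, sizeof(cb));
 *     cb.on_read_complete = &on_my_read_complete;
 *
 *     pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioq);
 *     pj_ioqueue_register_sock(pool, ioq, sock, user_data, &cb, &key);
 *
 *     for (;;) {
 *         pj_time_val timeout = { 0, 10 };
 *         pj_ioqueue_poll(ioq, &timeout);
 *     }
 */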

/*
 * pj_ioqueue_destroy()
 *
 * Destroy ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
    pj_ioqueue_key_t *key;

    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
    PJ_ASSERT_RETURN(ioqueue->epfd > 0, PJ_EINVALIDOP);

    pj_lock_acquire(ioqueue->lock);
    os_close(ioqueue->epfd);
    ioqueue->epfd = 0;

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Destroy reference counters */
    key = ioqueue->active_list.next;
    while (key != &ioqueue->active_list) {
        pj_lock_destroy(key->lock);
        key = key->next;
    }

    key = ioqueue->closing_list.next;
    while (key != &ioqueue->closing_list) {
        pj_lock_destroy(key->lock);
        key = key->next;
    }

    key = ioqueue->free_list.next;
    while (key != &ioqueue->free_list) {
        pj_lock_destroy(key->lock);
        key = key->next;
    }

    pj_mutex_destroy(ioqueue->ref_cnt_mutex);
#endif
    return ioqueue_destroy(ioqueue);
}

/*
 * pj_ioqueue_register_sock()
 *
 * Register a socket to ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool,
                                              pj_ioqueue_t *ioqueue,
                                              pj_sock_t sock,
                                              pj_grp_lock_t *grp_lock,
                                              void *user_data,
                                              const pj_ioqueue_callback *cb,
                                              pj_ioqueue_key_t **p_key)
{
    pj_ioqueue_key_t *key = NULL;
    pj_uint32_t value;
    struct epoll_event ev;
    int status;
    pj_status_t rc = PJ_SUCCESS;

    PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
                     cb && p_key, PJ_EINVAL);

    pj_lock_acquire(ioqueue->lock);

    if (ioqueue->count >= ioqueue->max) {
        rc = PJ_ETOOMANY;
        TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: too many files"));
        goto on_return;
    }

    /* Set socket to nonblocking. */
    value = 1;
    if ((rc=os_ioctl(sock, FIONBIO, (ioctl_val_type)&value))) {
        TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: ioctl rc=%d",
                rc));
        rc = pj_get_netos_error();
        goto on_return;
    }

    /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get
     * the key from the free list. Otherwise allocate a new one.
     */
#if PJ_IOQUEUE_HAS_SAFE_UNREG

    /* Scan closing_keys first to let them come back to free_list */
    scan_closing_keys(ioqueue);

    pj_assert(!pj_list_empty(&ioqueue->free_list));
    if (pj_list_empty(&ioqueue->free_list)) {
        rc = PJ_ETOOMANY;
        goto on_return;
    }

    key = ioqueue->free_list.next;
    pj_list_erase(key);
#else
    /* Create key. */
    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
#endif

    rc = ioqueue_init_key(pool, ioqueue, key, sock, grp_lock, user_data, cb);
    if (rc != PJ_SUCCESS) {
        key = NULL;
        goto on_return;
    }

    /* Create key's mutex */
 /* rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
    if (rc != PJ_SUCCESS) {
        key = NULL;
        goto on_return;
    }
*/
    /* os_epoll_ctl. */
    ev.events = EPOLLIN | EPOLLERR;
    ev.epoll_data = (epoll_data_type)key;
    status = os_epoll_ctl(ioqueue->epfd, EPOLL_CTL_ADD, sock, &ev);
    if (status < 0) {
        rc = pj_get_os_error();
        pj_lock_destroy(key->lock);
        key = NULL;
        TRACE_((THIS_FILE,
                "pj_ioqueue_register_sock error: os_epoll_ctl rc=%d",
                status));
        goto on_return;
    }

    /* Register */
    pj_list_insert_before(&ioqueue->active_list, key);
    ++ioqueue->count;

    //TRACE_((THIS_FILE, "socket registered, count=%d", ioqueue->count));

on_return:
    if (rc != PJ_SUCCESS) {
        if (key && key->grp_lock)
            pj_grp_lock_dec_ref_dbg(key->grp_lock, "ioqueue", 0);
    }
    *p_key = key;
    pj_lock_release(ioqueue->lock);

    return rc;
}
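
/* Registration therefore involves: switching the socket to non-blocking
 * mode, taking a key from 'free_list' (or allocating one from the pool when
 * safe unregistration is disabled), initializing it through
 * ioqueue_init_key(), and adding the descriptor to the epoll set with the
 * key pointer stored in the event's epoll_data, so that pj_ioqueue_poll()
 * can map reported events back to their key.
 */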

PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
                                              pj_ioqueue_t *ioqueue,
                                              pj_sock_t sock,
                                              void *user_data,
                                              const pj_ioqueue_callback *cb,
                                              pj_ioqueue_key_t **p_key)
{
    return pj_ioqueue_register_sock2(pool, ioqueue, sock, NULL, user_data,
                                     cb, p_key);
}

#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Increment key's reference counter */
static void increment_counter(pj_ioqueue_key_t *key)
{
    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
    ++key->ref_count;
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
}

/* Decrement the key's reference counter, and when the counter reaches zero,
 * move the key to the closing list so that it can eventually be reused.
 *
 * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK.
 */
static void decrement_counter(pj_ioqueue_key_t *key)
{
    pj_lock_acquire(key->ioqueue->lock);
    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
    --key->ref_count;
    if (key->ref_count == 0) {

        pj_assert(key->closing == 1);
        pj_gettickcount(&key->free_time);
        key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
        pj_time_val_normalize(&key->free_time);

        pj_list_erase(key);
        pj_list_push_back(&key->ioqueue->closing_list, key);

    }
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
    pj_lock_release(key->ioqueue->lock);
}
#endif
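
/* With safe unregistration, a key's ref_count is incremented for every
 * event that pj_ioqueue_poll() is about to dispatch and decremented once
 * the callback has been dispatched. When the last reference of an
 * unregistered key is dropped, the key is parked on 'closing_list' and only
 * returned to 'free_list' by scan_closing_keys() after
 * PJ_IOQUEUE_KEY_FREE_DELAY msec, so an in-flight callback never observes
 * a recycled key.
 */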

/*
 * pj_ioqueue_unregister()
 *
 * Unregister handle from ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
{
    pj_ioqueue_t *ioqueue;
    struct epoll_event ev;
    int status;

    PJ_ASSERT_RETURN(key != NULL, PJ_EINVAL);

    ioqueue = key->ioqueue;

    /* Lock the key to make sure no callback is simultaneously modifying
     * the key. We need to lock the key before ioqueue here to prevent
     * deadlock.
     */
    pj_ioqueue_lock_key(key);

    /* Also lock ioqueue */
    pj_lock_acquire(ioqueue->lock);

    pj_assert(ioqueue->count > 0);
    --ioqueue->count;
#if !PJ_IOQUEUE_HAS_SAFE_UNREG
    pj_list_erase(key);
#endif

    ev.events = 0;
    ev.epoll_data = (epoll_data_type)key;
    status = os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_DEL, key->fd, &ev);
    if (status != 0) {
        pj_status_t rc = pj_get_os_error();
        pj_lock_release(ioqueue->lock);
        /* Release the key lock acquired above before bailing out. */
        pj_ioqueue_unlock_key(key);
        return rc;
    }

    /* Destroy the key. */
    pj_sock_close(key->fd);

    pj_lock_release(ioqueue->lock);


#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Mark key as closing. */
    key->closing = 1;

    /* Decrement counter. */
    decrement_counter(key);

    /* Done. */
    if (key->grp_lock) {
        /* just dec_ref and unlock. we will set grp_lock to NULL
         * elsewhere */
        pj_grp_lock_t *grp_lock = key->grp_lock;
        // Don't set grp_lock to NULL otherwise the other thread
        // will crash. Just leave it as dangling pointer, but this
        // should be safe
        //key->grp_lock = NULL;
        pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0);
        pj_grp_lock_release(grp_lock);
    } else {
        pj_ioqueue_unlock_key(key);
    }
#else
    if (key->grp_lock) {
        /* set grp_lock to NULL and unlock */
        pj_grp_lock_t *grp_lock = key->grp_lock;
        // Don't set grp_lock to NULL otherwise the other thread
        // will crash. Just leave it as dangling pointer, but this
        // should be safe
        //key->grp_lock = NULL;
        pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0);
        pj_grp_lock_release(grp_lock);
    } else {
        pj_ioqueue_unlock_key(key);
    }

    pj_lock_destroy(key->lock);
#endif

    return PJ_SUCCESS;
}

/* ioqueue_remove_from_set()
 * This function is called from ioqueue_dispatch_event() to instruct
 * the ioqueue to remove the specified descriptor from ioqueue's descriptor
 * set for the specified event.
 */
static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
                                     pj_ioqueue_key_t *key,
                                     enum ioqueue_event_type event_type)
{
    if (event_type == WRITEABLE_EVENT) {
        struct epoll_event ev;

        ev.events = EPOLLIN | EPOLLERR;
        ev.epoll_data = (epoll_data_type)key;
        os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_MOD, key->fd, &ev);
    }
}

/*
 * ioqueue_add_to_set()
 * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
 * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
 * set for the specified event.
 */
static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
                                pj_ioqueue_key_t *key,
                                enum ioqueue_event_type event_type )
{
    if (event_type == WRITEABLE_EVENT) {
        struct epoll_event ev;

        ev.events = EPOLLIN | EPOLLOUT | EPOLLERR;
        ev.epoll_data = (epoll_data_type)key;
        os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_MOD, key->fd, &ev);
    }
}
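
/* Both hooks above are invoked from ioqueue_common_abs.c. Only
 * WRITEABLE_EVENT needs any epoll manipulation here: EPOLLIN and EPOLLERR
 * stay armed for the key's whole lifetime, while EPOLLOUT is enabled only
 * while a write/connect operation is pending and disabled again afterwards,
 * so that level-triggered epoll does not keep waking the poll loop for a
 * writable socket that has nothing to send.
 */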

#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Scan closing keys to be put to free list again */
static void scan_closing_keys(pj_ioqueue_t *ioqueue)
{
    pj_time_val now;
    pj_ioqueue_key_t *h;

    pj_gettickcount(&now);
    h = ioqueue->closing_list.next;
    while (h != &ioqueue->closing_list) {
        pj_ioqueue_key_t *next = h->next;

        pj_assert(h->closing != 0);

        if (PJ_TIME_VAL_GTE(now, h->free_time)) {
            pj_list_erase(h);
            // Don't set grp_lock to NULL otherwise the other thread
            // will crash. Just leave it as dangling pointer, but this
            // should be safe
            //h->grp_lock = NULL;
            pj_list_push_back(&ioqueue->free_list, h);
        }
        h = next;
    }
}
#endif
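
/* The polling function below works in two phases. First, the results of
 * os_epoll_wait() are scanned while holding the ioqueue lock: each event
 * that still has a matching pending operation is recorded in the local
 * queue[] array, with the key's reference counter and group lock reference
 * incremented so the key cannot be destroyed in between. Second, the lock
 * is released and the recorded events are dispatched to the
 * ioqueue_dispatch_*_event() handlers, after which the references taken in
 * the first phase are dropped.
 */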

/*
 * pj_ioqueue_poll()
 *
 */
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
    int i, count, processed;
    int msec;
    //struct epoll_event *events = ioqueue->events;
    //struct queue *queue = ioqueue->queue;
    struct epoll_event events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
    struct queue queue[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
    pj_timestamp t1, t2;

    PJ_CHECK_STACK();

    msec = timeout ? PJ_TIME_VAL_MSEC(*timeout) : 9000;

    TRACE_((THIS_FILE, "start os_epoll_wait, msec=%d", msec));
    pj_get_timestamp(&t1);

    //count = os_epoll_wait( ioqueue->epfd, events, ioqueue->max, msec);
    count = os_epoll_wait( ioqueue->epfd, events,
                           PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL, msec);
    if (count == 0) {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
        /* Check the closing keys only when there's no activity and when
         * there are pending closing keys.
         */
        if (count == 0 && !pj_list_empty(&ioqueue->closing_list)) {
            pj_lock_acquire(ioqueue->lock);
            scan_closing_keys(ioqueue);
            pj_lock_release(ioqueue->lock);
        }
#endif
        TRACE_((THIS_FILE, "os_epoll_wait timed out"));
        return count;
    }
    else if (count < 0) {
        TRACE_((THIS_FILE, "os_epoll_wait error"));
        return -pj_get_netos_error();
    }

    pj_get_timestamp(&t2);
    TRACE_((THIS_FILE, "os_epoll_wait returns %d, time=%d usec",
            count, pj_elapsed_usec(&t1, &t2)));

    /* Lock ioqueue. */
    pj_lock_acquire(ioqueue->lock);

    for (processed=0, i=0; i<count; ++i) {
        pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type)
                              events[i].epoll_data;

        TRACE_((THIS_FILE, "event %d: events=%d", i, events[i].events));

        /*
         * Check readability.
         */
        if ((events[i].events & EPOLLIN) &&
            (key_has_pending_read(h) || key_has_pending_accept(h)) &&
            !IS_CLOSING(h))
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            queue[processed].key = h;
            queue[processed].event_type = READABLE_EVENT;
            ++processed;
            continue;
        }

        /*
         * Check for writeability.
         */
        if ((events[i].events & EPOLLOUT) && key_has_pending_write(h) &&
            !IS_CLOSING(h))
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            queue[processed].key = h;
            queue[processed].event_type = WRITEABLE_EVENT;
            ++processed;
            continue;
        }

#if PJ_HAS_TCP
        /*
         * Check for completion of connect() operation.
         */
        if ((events[i].events & EPOLLOUT) && (h->connecting) &&
            !IS_CLOSING(h))
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            queue[processed].key = h;
            queue[processed].event_type = WRITEABLE_EVENT;
            ++processed;
            continue;
        }
#endif /* PJ_HAS_TCP */

        /*
         * Check for error condition.
         */
        if ((events[i].events & EPOLLERR) && !IS_CLOSING(h)) {
            /*
             * We need to handle this exception event. If it's related to us
             * connecting, report it as such. If not, just report it as a
             * read event and the higher layers will handle it.
             */
            if (h->connecting) {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
                increment_counter(h);
#endif
                queue[processed].key = h;
                queue[processed].event_type = EXCEPTION_EVENT;
                ++processed;
            } else if (key_has_pending_read(h) || key_has_pending_accept(h)) {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
                increment_counter(h);
#endif
                queue[processed].key = h;
                queue[processed].event_type = READABLE_EVENT;
                ++processed;
            }
            continue;
        }
    }
    for (i=0; i<processed; ++i) {
        if (queue[i].key->grp_lock)
            pj_grp_lock_add_ref_dbg(queue[i].key->grp_lock, "ioqueue", 0);
    }

    PJ_RACE_ME(5);

    pj_lock_release(ioqueue->lock);

    PJ_RACE_ME(5);

    /* Now process the events. */
    for (i=0; i<processed; ++i) {
        switch (queue[i].event_type) {
        case READABLE_EVENT:
            ioqueue_dispatch_read_event(ioqueue, queue[i].key);
            break;
        case WRITEABLE_EVENT:
            ioqueue_dispatch_write_event(ioqueue, queue[i].key);
            break;
        case EXCEPTION_EVENT:
            ioqueue_dispatch_exception_event(ioqueue, queue[i].key);
            break;
        case NO_EVENT:
            pj_assert(!"Invalid event!");
            break;
        }

#if PJ_IOQUEUE_HAS_SAFE_UNREG
        decrement_counter(queue[i].key);
#endif

        if (queue[i].key->grp_lock)
            pj_grp_lock_dec_ref_dbg(queue[i].key->grp_lock,
                                    "ioqueue", 0);
    }

    /* Special case:
     * When epoll returns > 0 but no descriptors are actually set!
     */
    if (count > 0 && !processed && msec > 0) {
        pj_thread_sleep(msec);
    }

    pj_get_timestamp(&t1);
    TRACE_((THIS_FILE, "ioqueue_poll() returns %d, time=%d usec",
            processed, pj_elapsed_usec(&t2, &t1)));

    return processed;
}