Set svn:eol-style for all files

git-svn-id: https://svn.pjsip.org/repos/pjproject/trunk@66 74dad513-b988-da41-8d7b-12977e46ad98
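svn:eol-style is the Subversion property that controls line-ending normalization; with a value such as "native", the repository stores LF endings and each working copy checks files out with the client platform's native endings. Because this change only sets that property (and thus renormalizes line endings), the diff below shows every line of ioqueue_select.c removed and re-added even though the code text itself is unchanged.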
diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c
index b313c44..df08d0c 100644
--- a/pjlib/src/pj/ioqueue_select.c
+++ b/pjlib/src/pj/ioqueue_select.c
@@ -1,531 +1,531 @@
-/* $Id$ */
-/* 
- * Copyright (C)2003-2006 Benny Prijono <benny@prijono.org>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
- */
-
-/*
- * sock_select.c
- *
- * This is the implementation of IOQueue using pj_sock_select().
- * It runs anywhere where pj_sock_select() is available (currently
- * Win32, Linux, Linux kernel, etc.).
- */
-
-#include <pj/ioqueue.h>
-#include <pj/os.h>
-#include <pj/lock.h>
-#include <pj/log.h>
-#include <pj/list.h>
-#include <pj/pool.h>
-#include <pj/string.h>
-#include <pj/assert.h>
-#include <pj/sock.h>
-#include <pj/compat/socket.h>
-#include <pj/sock_select.h>
-#include <pj/errno.h>
-
-/*
- * Include declaration from common abstraction.
- */
-#include "ioqueue_common_abs.h"
-
-/*
- * ISSUES with ioqueue_select()
- *
- * EAGAIN/EWOULDBLOCK error in recv():
- *  - when multiple threads are working with the ioqueue, application
- *    may receive EAGAIN or EWOULDBLOCK in the receive callback.
- *    This error happens because more than one thread is watching for
- *    the same descriptor set, so when all of them call recv() or recvfrom()
- *    simultaneously, only one will succeed and the rest will get the error.
- *
- */
-#define THIS_FILE   "ioq_select"
-
-/*
- * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
- * the correct error code.
- */
-#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
-#   error "Error reporting must be enabled for this function to work!"
-#endif
-
-/**
- * Get the number of descriptors in the set. This is defined in sock_select.c
- * This function will only return the number of sockets set from PJ_FD_SET
- * operation. When the set is modified by other means (such as by select()),
- * the count will not be reflected here.
- *
- * That's why don't export this function in the header file, to avoid
- * misunderstanding.
- *
- * @param fdsetp    The descriptor set.
- *
- * @return          Number of descriptors in the set.
- */
-PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);
-
-
-/*
- * During debugging build, VALIDATE_FD_SET is set.
- * This will check the validity of the fd_sets.
- */
-/*
-#if defined(PJ_DEBUG) && PJ_DEBUG != 0
-#  define VALIDATE_FD_SET		1
-#else
-#  define VALIDATE_FD_SET		0
-#endif
-*/
-#define VALIDATE_FD_SET     0
-
-/*
- * This describes each key.
- */
-struct pj_ioqueue_key_t
-{
-    DECLARE_COMMON_KEY
-};
-
-/*
- * This describes the I/O queue itself.
- */
-struct pj_ioqueue_t
-{
-    DECLARE_COMMON_IOQUEUE
-
-    unsigned		max, count;
-    pj_ioqueue_key_t	key_list;
-    pj_fd_set_t		rfdset;
-    pj_fd_set_t		wfdset;
-#if PJ_HAS_TCP
-    pj_fd_set_t		xfdset;
-#endif
-};
-
-/* Include implementation for common abstraction after we declare
- * pj_ioqueue_key_t and pj_ioqueue_t.
- */
-#include "ioqueue_common_abs.c"
-
-/*
- * pj_ioqueue_name()
- */
-PJ_DEF(const char*) pj_ioqueue_name(void)
-{
-    return "select";
-}
-
-/*
- * pj_ioqueue_create()
- *
- * Create select ioqueue.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, 
-                                       pj_size_t max_fd,
-                                       pj_ioqueue_t **p_ioqueue)
-{
-    pj_ioqueue_t *ioqueue;
-    pj_lock_t *lock;
-    pj_status_t rc;
-
-    /* Check that arguments are valid. */
-    PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && 
-                     max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES, 
-                     PJ_EINVAL);
-
-    /* Check that size of pj_ioqueue_op_key_t is sufficient */
-    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
-                     sizeof(union operation_key), PJ_EBUG);
-
-    ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
-
-    ioqueue_init(ioqueue);
-
-    ioqueue->max = max_fd;
-    ioqueue->count = 0;
-    PJ_FD_ZERO(&ioqueue->rfdset);
-    PJ_FD_ZERO(&ioqueue->wfdset);
-#if PJ_HAS_TCP
-    PJ_FD_ZERO(&ioqueue->xfdset);
-#endif
-    pj_list_init(&ioqueue->key_list);
-
-    rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
-    if (rc != PJ_SUCCESS)
-	return rc;
-
-    rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
-    if (rc != PJ_SUCCESS)
-        return rc;
-
-    PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue));
-
-    *p_ioqueue = ioqueue;
-    return PJ_SUCCESS;
-}
-
-/*
- * pj_ioqueue_destroy()
- *
- * Destroy ioqueue.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
-{
-    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
-
-    pj_lock_acquire(ioqueue->lock);
-    return ioqueue_destroy(ioqueue);
-}
-
-
-/*
- * pj_ioqueue_register_sock()
- *
- * Register a handle to ioqueue.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
-					      pj_ioqueue_t *ioqueue,
-					      pj_sock_t sock,
-					      void *user_data,
-					      const pj_ioqueue_callback *cb,
-                                              pj_ioqueue_key_t **p_key)
-{
-    pj_ioqueue_key_t *key = NULL;
-    pj_uint32_t value;
-    pj_status_t rc = PJ_SUCCESS;
-    
-    PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
-                     cb && p_key, PJ_EINVAL);
-
-    pj_lock_acquire(ioqueue->lock);
-
-    if (ioqueue->count >= ioqueue->max) {
-        rc = PJ_ETOOMANY;
-	goto on_return;
-    }
-
-    /* Set socket to nonblocking. */
-    value = 1;
-#ifdef PJ_WIN32
-    if (ioctlsocket(sock, FIONBIO, (u_long*)&value)) {
-#else
-    if (ioctl(sock, FIONBIO, &value)) {
-#endif
-        rc = pj_get_netos_error();
-	goto on_return;
-    }
-
-    /* Create key. */
-    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
-    rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
-    if (rc != PJ_SUCCESS) {
-	key = NULL;
-	goto on_return;
-    }
-
-    /* Register */
-    pj_list_insert_before(&ioqueue->key_list, key);
-    ++ioqueue->count;
-
-on_return:
-    /* On error, socket may be left in non-blocking mode. */
-    *p_key = key;
-    pj_lock_release(ioqueue->lock);
-    
-    return rc;
-}
-
-/*
- * pj_ioqueue_unregister()
- *
- * Unregister handle from ioqueue.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
-{
-    pj_ioqueue_t *ioqueue;
-
-    PJ_ASSERT_RETURN(key, PJ_EINVAL);
-
-    ioqueue = key->ioqueue;
-
-    pj_lock_acquire(ioqueue->lock);
-
-    pj_assert(ioqueue->count > 0);
-    --ioqueue->count;
-    pj_list_erase(key);
-    PJ_FD_CLR(key->fd, &ioqueue->rfdset);
-    PJ_FD_CLR(key->fd, &ioqueue->wfdset);
-#if PJ_HAS_TCP
-    PJ_FD_CLR(key->fd, &ioqueue->xfdset);
-#endif
-
-    /* ioqueue_destroy may try to acquire key's mutex.
-     * Since normally the order of locking is to lock key's mutex first
-     * then ioqueue's mutex, ioqueue_destroy may deadlock unless we
-     * release ioqueue's mutex first.
-     */
-    pj_lock_release(ioqueue->lock);
-
-    /* Destroy the key. */
-    ioqueue_destroy_key(key);
-
-    return PJ_SUCCESS;
-}
-
-
-/* This supposed to check whether the fd_set values are consistent
- * with the operation currently set in each key.
- */
-#if VALIDATE_FD_SET
-static void validate_sets(const pj_ioqueue_t *ioqueue,
-			  const pj_fd_set_t *rfdset,
-			  const pj_fd_set_t *wfdset,
-			  const pj_fd_set_t *xfdset)
-{
-    pj_ioqueue_key_t *key;
-
-    /*
-     * This basicly would not work anymore.
-     * We need to lock key before performing the check, but we can't do
-     * so because we're holding ioqueue mutex. If we acquire key's mutex
-     * now, the will cause deadlock.
-     */
-    pj_assert(0);
-
-    key = ioqueue->key_list.next;
-    while (key != &ioqueue->key_list) {
-	if (!pj_list_empty(&key->read_list)
-#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
-	    || !pj_list_empty(&key->accept_list)
-#endif
-	    ) 
-	{
-	    pj_assert(PJ_FD_ISSET(key->fd, rfdset));
-	} 
-	else {
-	    pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0);
-	}
-	if (!pj_list_empty(&key->write_list)
-#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
-	    || key->connecting
-#endif
-	   )
-	{
-	    pj_assert(PJ_FD_ISSET(key->fd, wfdset));
-	}
-	else {
-	    pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0);
-	}
-#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
-	if (key->connecting)
-	{
-	    pj_assert(PJ_FD_ISSET(key->fd, xfdset));
-	}
-	else {
-	    pj_assert(PJ_FD_ISSET(key->fd, xfdset) == 0);
-	}
-#endif /* PJ_HAS_TCP */
-
-	key = key->next;
-    }
-}
-#endif	/* VALIDATE_FD_SET */
-
-
-/* ioqueue_remove_from_set()
- * This function is called from ioqueue_dispatch_event() to instruct
- * the ioqueue to remove the specified descriptor from ioqueue's descriptor
- * set for the specified event.
- */
-static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
-                                     pj_sock_t fd, 
-                                     enum ioqueue_event_type event_type)
-{
-    pj_lock_acquire(ioqueue->lock);
-
-    if (event_type == READABLE_EVENT)
-        PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset);
-    else if (event_type == WRITEABLE_EVENT)
-        PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset);
-    else if (event_type == EXCEPTION_EVENT)
-        PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset);
-    else
-        pj_assert(0);
-
-    pj_lock_release(ioqueue->lock);
-}
-
-/*
- * ioqueue_add_to_set()
- * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
- * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
- * set for the specified event.
- */
-static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
-                                pj_sock_t fd,
-                                enum ioqueue_event_type event_type )
-{
-    pj_lock_acquire(ioqueue->lock);
-
-    if (event_type == READABLE_EVENT)
-        PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset);
-    else if (event_type == WRITEABLE_EVENT)
-        PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset);
-    else if (event_type == EXCEPTION_EVENT)
-        PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset);
-    else
-        pj_assert(0);
-
-    pj_lock_release(ioqueue->lock);
-}
-
-/*
- * pj_ioqueue_poll()
- *
- * Few things worth written:
- *
- *  - we used to do only one callback called per poll, but it didn't go
- *    very well. The reason is because on some situation, the write 
- *    callback gets called all the time, thus doesn't give the read
- *    callback to get called. This happens, for example, when user
- *    submit write operation inside the write callback.
- *    As the result, we changed the behaviour so that now multiple
- *    callbacks are called in a single poll. It should be fast too,
- *    just that we need to be carefull with the ioqueue data structs.
- *
- *  - to guarantee preemptiveness etc, the poll function must strictly
- *    work on fd_set copy of the ioqueue (not the original one).
- */
-PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
-{
-    pj_fd_set_t rfdset, wfdset, xfdset;
-    int count, counter;
-    pj_ioqueue_key_t *h;
-    struct event
-    {
-        pj_ioqueue_key_t	*key;
-        enum ioqueue_event_type  event_type;
-    } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
-
-    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
-
-    /* Lock ioqueue before making fd_set copies */
-    pj_lock_acquire(ioqueue->lock);
-
-    /* We will only do select() when there are sockets to be polled.
-     * Otherwise select() will return error.
-     */
-    if (PJ_FD_COUNT(&ioqueue->rfdset)==0 &&
-        PJ_FD_COUNT(&ioqueue->wfdset)==0 &&
-        PJ_FD_COUNT(&ioqueue->xfdset)==0)
-    {
-        pj_lock_release(ioqueue->lock);
-        if (timeout)
-            pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout));
-        return 0;
-    }
-
-    /* Copy ioqueue's pj_fd_set_t to local variables. */
-    pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t));
-    pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t));
-#if PJ_HAS_TCP
-    pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t));
-#else
-    PJ_FD_ZERO(&xfdset);
-#endif
-
-#if VALIDATE_FD_SET
-    validate_sets(ioqueue, &rfdset, &wfdset, &xfdset);
-#endif
-
-    /* Unlock ioqueue before select(). */
-    pj_lock_release(ioqueue->lock);
-
-    count = pj_sock_select(FD_SETSIZE, &rfdset, &wfdset, &xfdset, timeout);
-    
-    if (count <= 0)
-	return count;
-    else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)
-        count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;
-
-    /* Scan descriptor sets for event and add the events in the event
-     * array to be processed later in this function. We do this so that
-     * events can be processed in parallel without holding ioqueue lock.
-     */
-    pj_lock_acquire(ioqueue->lock);
-
-    counter = 0;
-
-    /* Scan for writable sockets first to handle piggy-back data
-     * coming with accept().
-     */
-    h = ioqueue->key_list.next;
-    for ( ; h!=&ioqueue->key_list && counter<count; h = h->next) {
-	if ( (key_has_pending_write(h) || key_has_pending_connect(h))
-	     && PJ_FD_ISSET(h->fd, &wfdset))
-        {
-            event[counter].key = h;
-            event[counter].event_type = WRITEABLE_EVENT;
-            ++counter;
-        }
-
-        /* Scan for readable socket. */
-	if ((key_has_pending_read(h) || key_has_pending_accept(h))
-            && PJ_FD_ISSET(h->fd, &rfdset))
-        {
-            event[counter].key = h;
-            event[counter].event_type = READABLE_EVENT;
-            ++counter;
-	}
-
-#if PJ_HAS_TCP
-        if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) {
-            event[counter].key = h;
-            event[counter].event_type = EXCEPTION_EVENT;
-            ++counter;
-        }
-#endif
-    }
-
-    pj_lock_release(ioqueue->lock);
-
-    count = counter;
-
-    /* Now process all events. The dispatch functions will take care
-     * of locking in each of the key
-     */
-    for (counter=0; counter<count; ++counter) {
-        switch (event[counter].event_type) {
-        case READABLE_EVENT:
-            ioqueue_dispatch_read_event(ioqueue, event[counter].key);
-            break;
-        case WRITEABLE_EVENT:
-            ioqueue_dispatch_write_event(ioqueue, event[counter].key);
-            break;
-        case EXCEPTION_EVENT:
-            ioqueue_dispatch_exception_event(ioqueue, event[counter].key);
-            break;
-        case NO_EVENT:
-            pj_assert(!"Invalid event!");
-            break;
-        }
-    }
-
-    return count;
-}
-
+/* $Id$ */
+/* 
+ * Copyright (C)2003-2006 Benny Prijono <benny@prijono.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
+ */
+
+/*
+ * sock_select.c
+ *
+ * This is the implementation of IOQueue using pj_sock_select().
+ * It runs anywhere where pj_sock_select() is available (currently
+ * Win32, Linux, Linux kernel, etc.).
+ */
+
+#include <pj/ioqueue.h>
+#include <pj/os.h>
+#include <pj/lock.h>
+#include <pj/log.h>
+#include <pj/list.h>
+#include <pj/pool.h>
+#include <pj/string.h>
+#include <pj/assert.h>
+#include <pj/sock.h>
+#include <pj/compat/socket.h>
+#include <pj/sock_select.h>
+#include <pj/errno.h>
+
+/*
+ * Include declaration from common abstraction.
+ */
+#include "ioqueue_common_abs.h"
+
+/*
+ * ISSUES with ioqueue_select()
+ *
+ * EAGAIN/EWOULDBLOCK error in recv():
+ *  - when multiple threads are working with the ioqueue, the application
+ *    may receive EAGAIN or EWOULDBLOCK in the receive callback.
+ *    This error happens because more than one thread is watching for
+ *    the same descriptor set, so when all of them call recv() or recvfrom()
+ *    simultaneously, only one will succeed and the rest will get the error.
+ *
+ */
+#define THIS_FILE   "ioq_select"
+
+/*
+ * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
+ * the correct error code.
+ */
+#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
+#   error "Error reporting must be enabled for this function to work!"
+#endif
+
+/**
+ * Get the number of descriptors in the set. This is defined in sock_select.c
+ * This function will only return the number of sockets set by the PJ_FD_SET
+ * operation. When the set is modified by other means (such as by select()),
+ * the count will not be reflected here.
+ *
+ * That's why we don't export this function in the header file, to avoid
+ * misunderstanding.
+ *
+ * @param fdsetp    The descriptor set.
+ *
+ * @return          Number of descriptors in the set.
+ */
+PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);
+
+
+/*
+ * In a debug build, VALIDATE_FD_SET is set.
+ * This will check the validity of the fd_sets.
+ */
+/*
+#if defined(PJ_DEBUG) && PJ_DEBUG != 0
+#  define VALIDATE_FD_SET		1
+#else
+#  define VALIDATE_FD_SET		0
+#endif
+*/
+#define VALIDATE_FD_SET     0
+
+/*
+ * This describes each key.
+ */
+struct pj_ioqueue_key_t
+{
+    DECLARE_COMMON_KEY
+};
+
+/*
+ * This describes the I/O queue itself.
+ */
+struct pj_ioqueue_t
+{
+    DECLARE_COMMON_IOQUEUE
+
+    unsigned		max, count;
+    pj_ioqueue_key_t	key_list;
+    pj_fd_set_t		rfdset;
+    pj_fd_set_t		wfdset;
+#if PJ_HAS_TCP
+    pj_fd_set_t		xfdset;
+#endif
+};
+
+/* Include implementation for common abstraction after we declare
+ * pj_ioqueue_key_t and pj_ioqueue_t.
+ */
+#include "ioqueue_common_abs.c"
+
+/*
+ * pj_ioqueue_name()
+ */
+PJ_DEF(const char*) pj_ioqueue_name(void)
+{
+    return "select";
+}
+
+/*
+ * pj_ioqueue_create()
+ *
+ * Create select ioqueue.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, 
+                                       pj_size_t max_fd,
+                                       pj_ioqueue_t **p_ioqueue)
+{
+    pj_ioqueue_t *ioqueue;
+    pj_lock_t *lock;
+    pj_status_t rc;
+
+    /* Check that arguments are valid. */
+    PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && 
+                     max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES, 
+                     PJ_EINVAL);
+
+    /* Check that size of pj_ioqueue_op_key_t is sufficient */
+    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
+                     sizeof(union operation_key), PJ_EBUG);
+
+    ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
+
+    ioqueue_init(ioqueue);
+
+    ioqueue->max = max_fd;
+    ioqueue->count = 0;
+    PJ_FD_ZERO(&ioqueue->rfdset);
+    PJ_FD_ZERO(&ioqueue->wfdset);
+#if PJ_HAS_TCP
+    PJ_FD_ZERO(&ioqueue->xfdset);
+#endif
+    pj_list_init(&ioqueue->key_list);
+
+    rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
+    if (rc != PJ_SUCCESS)
+	return rc;
+
+    rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
+    if (rc != PJ_SUCCESS)
+        return rc;
+
+    PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue));
+
+    *p_ioqueue = ioqueue;
+    return PJ_SUCCESS;
+}
+
+/*
+ * pj_ioqueue_destroy()
+ *
+ * Destroy ioqueue.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
+{
+    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
+
+    pj_lock_acquire(ioqueue->lock);
+    return ioqueue_destroy(ioqueue);
+}
+
+
+/*
+ * pj_ioqueue_register_sock()
+ *
+ * Register a handle to ioqueue.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
+					      pj_ioqueue_t *ioqueue,
+					      pj_sock_t sock,
+					      void *user_data,
+					      const pj_ioqueue_callback *cb,
+                                              pj_ioqueue_key_t **p_key)
+{
+    pj_ioqueue_key_t *key = NULL;
+    pj_uint32_t value;
+    pj_status_t rc = PJ_SUCCESS;
+    
+    PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
+                     cb && p_key, PJ_EINVAL);
+
+    pj_lock_acquire(ioqueue->lock);
+
+    if (ioqueue->count >= ioqueue->max) {
+        rc = PJ_ETOOMANY;
+	goto on_return;
+    }
+
+    /* Set socket to nonblocking. */
+    value = 1;
+#ifdef PJ_WIN32
+    if (ioctlsocket(sock, FIONBIO, (u_long*)&value)) {
+#else
+    if (ioctl(sock, FIONBIO, &value)) {
+#endif
+        rc = pj_get_netos_error();
+	goto on_return;
+    }
+
+    /* Create key. */
+    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
+    rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
+    if (rc != PJ_SUCCESS) {
+	key = NULL;
+	goto on_return;
+    }
+
+    /* Register */
+    pj_list_insert_before(&ioqueue->key_list, key);
+    ++ioqueue->count;
+
+on_return:
+    /* On error, socket may be left in non-blocking mode. */
+    *p_key = key;
+    pj_lock_release(ioqueue->lock);
+    
+    return rc;
+}
+
+/*
+ * pj_ioqueue_unregister()
+ *
+ * Unregister handle from ioqueue.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
+{
+    pj_ioqueue_t *ioqueue;
+
+    PJ_ASSERT_RETURN(key, PJ_EINVAL);
+
+    ioqueue = key->ioqueue;
+
+    pj_lock_acquire(ioqueue->lock);
+
+    pj_assert(ioqueue->count > 0);
+    --ioqueue->count;
+    pj_list_erase(key);
+    PJ_FD_CLR(key->fd, &ioqueue->rfdset);
+    PJ_FD_CLR(key->fd, &ioqueue->wfdset);
+#if PJ_HAS_TCP
+    PJ_FD_CLR(key->fd, &ioqueue->xfdset);
+#endif
+
+    /* ioqueue_destroy_key() may try to acquire the key's mutex.
+     * Since the normal locking order is to lock the key's mutex first
+     * and then the ioqueue's mutex, ioqueue_destroy_key() may deadlock
+     * unless we release the ioqueue's mutex first.
+     */
+    pj_lock_release(ioqueue->lock);
+
+    /* Destroy the key. */
+    ioqueue_destroy_key(key);
+
+    return PJ_SUCCESS;
+}
+
+
+/* This is supposed to check whether the fd_set values are consistent
+ * with the operation currently set in each key.
+ */
+#if VALIDATE_FD_SET
+static void validate_sets(const pj_ioqueue_t *ioqueue,
+			  const pj_fd_set_t *rfdset,
+			  const pj_fd_set_t *wfdset,
+			  const pj_fd_set_t *xfdset)
+{
+    pj_ioqueue_key_t *key;
+
+    /*
+     * This basically would not work anymore.
+     * We need to lock the key before performing the check, but we can't
+     * do so because we're holding the ioqueue mutex. If we acquire the
+     * key's mutex now, this will cause a deadlock.
+     */
+    pj_assert(0);
+
+    key = ioqueue->key_list.next;
+    while (key != &ioqueue->key_list) {
+	if (!pj_list_empty(&key->read_list)
+#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
+	    || !pj_list_empty(&key->accept_list)
+#endif
+	    ) 
+	{
+	    pj_assert(PJ_FD_ISSET(key->fd, rfdset));
+	} 
+	else {
+	    pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0);
+	}
+	if (!pj_list_empty(&key->write_list)
+#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
+	    || key->connecting
+#endif
+	   )
+	{
+	    pj_assert(PJ_FD_ISSET(key->fd, wfdset));
+	}
+	else {
+	    pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0);
+	}
+#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
+	if (key->connecting)
+	{
+	    pj_assert(PJ_FD_ISSET(key->fd, xfdset));
+	}
+	else {
+	    pj_assert(PJ_FD_ISSET(key->fd, xfdset) == 0);
+	}
+#endif /* PJ_HAS_TCP */
+
+	key = key->next;
+    }
+}
+#endif	/* VALIDATE_FD_SET */
+
+
+/* ioqueue_remove_from_set()
+ * This function is called from ioqueue_dispatch_event() to instruct
+ * the ioqueue to remove the specified descriptor from ioqueue's descriptor
+ * set for the specified event.
+ */
+static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
+                                     pj_sock_t fd, 
+                                     enum ioqueue_event_type event_type)
+{
+    pj_lock_acquire(ioqueue->lock);
+
+    if (event_type == READABLE_EVENT)
+        PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset);
+    else if (event_type == WRITEABLE_EVENT)
+        PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset);
+    else if (event_type == EXCEPTION_EVENT)
+        PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset);
+    else
+        pj_assert(0);
+
+    pj_lock_release(ioqueue->lock);
+}
+
+/*
+ * ioqueue_add_to_set()
+ * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
+ * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
+ * set for the specified event.
+ */
+static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
+                                pj_sock_t fd,
+                                enum ioqueue_event_type event_type )
+{
+    pj_lock_acquire(ioqueue->lock);
+
+    if (event_type == READABLE_EVENT)
+        PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset);
+    else if (event_type == WRITEABLE_EVENT)
+        PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset);
+    else if (event_type == EXCEPTION_EVENT)
+        PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset);
+    else
+        pj_assert(0);
+
+    pj_lock_release(ioqueue->lock);
+}
+
+/*
+ * pj_ioqueue_poll()
+ *
+ * A few things worth noting:
+ *
+ *  - we used to invoke only one callback per poll, but that didn't work
+ *    very well. In some situations the write callback gets called all
+ *    the time, so the read callback never gets a chance to run. This
+ *    happens, for example, when the user submits a write operation from
+ *    inside the write callback.
+ *    As a result, we changed the behaviour so that multiple callbacks
+ *    can now be called in a single poll. It should still be fast; we
+ *    just need to be careful with the ioqueue data structs.
+ *
+ *  - to guarantee preemptiveness etc., the poll function must work
+ *    strictly on a copy of the ioqueue's fd_sets (not the originals).
+ */
+PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
+{
+    pj_fd_set_t rfdset, wfdset, xfdset;
+    int count, counter;
+    pj_ioqueue_key_t *h;
+    struct event
+    {
+        pj_ioqueue_key_t	*key;
+        enum ioqueue_event_type  event_type;
+    } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
+
+    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
+
+    /* Lock ioqueue before making fd_set copies */
+    pj_lock_acquire(ioqueue->lock);
+
+    /* We will only do select() when there are sockets to be polled.
+     * Otherwise select() will return an error.
+     */
+    if (PJ_FD_COUNT(&ioqueue->rfdset)==0 &&
+        PJ_FD_COUNT(&ioqueue->wfdset)==0 &&
+        PJ_FD_COUNT(&ioqueue->xfdset)==0)
+    {
+        pj_lock_release(ioqueue->lock);
+        if (timeout)
+            pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout));
+        return 0;
+    }
+
+    /* Copy ioqueue's pj_fd_set_t to local variables. */
+    pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t));
+    pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t));
+#if PJ_HAS_TCP
+    pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t));
+#else
+    PJ_FD_ZERO(&xfdset);
+#endif
+
+#if VALIDATE_FD_SET
+    validate_sets(ioqueue, &rfdset, &wfdset, &xfdset);
+#endif
+
+    /* Unlock ioqueue before select(). */
+    pj_lock_release(ioqueue->lock);
+
+    count = pj_sock_select(FD_SETSIZE, &rfdset, &wfdset, &xfdset, timeout);
+    
+    if (count <= 0)
+	return count;
+    else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)
+        count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;
+
+    /* Scan the descriptor sets for events and add them to the event
+     * array, to be processed later in this function. We do this so that
+     * events can be processed in parallel without holding ioqueue lock.
+     */
+    pj_lock_acquire(ioqueue->lock);
+
+    counter = 0;
+
+    /* Scan for writable sockets first to handle piggy-back data
+     * coming with accept().
+     */
+    h = ioqueue->key_list.next;
+    for ( ; h!=&ioqueue->key_list && counter<count; h = h->next) {
+	if ( (key_has_pending_write(h) || key_has_pending_connect(h))
+	     && PJ_FD_ISSET(h->fd, &wfdset))
+        {
+            event[counter].key = h;
+            event[counter].event_type = WRITEABLE_EVENT;
+            ++counter;
+        }
+
+        /* Scan for readable sockets. */
+	if ((key_has_pending_read(h) || key_has_pending_accept(h))
+            && PJ_FD_ISSET(h->fd, &rfdset))
+        {
+            event[counter].key = h;
+            event[counter].event_type = READABLE_EVENT;
+            ++counter;
+	}
+
+#if PJ_HAS_TCP
+        if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) {
+            event[counter].key = h;
+            event[counter].event_type = EXCEPTION_EVENT;
+            ++counter;
+        }
+#endif
+    }
+
+    pj_lock_release(ioqueue->lock);
+
+    count = counter;
+
+    /* Now process all events. The dispatch functions will take care
+     * of locking for each key.
+     */
+    for (counter=0; counter<count; ++counter) {
+        switch (event[counter].event_type) {
+        case READABLE_EVENT:
+            ioqueue_dispatch_read_event(ioqueue, event[counter].key);
+            break;
+        case WRITEABLE_EVENT:
+            ioqueue_dispatch_write_event(ioqueue, event[counter].key);
+            break;
+        case EXCEPTION_EVENT:
+            ioqueue_dispatch_exception_event(ioqueue, event[counter].key);
+            break;
+        case NO_EVENT:
+            pj_assert(!"Invalid event!");
+            break;
+        }
+    }
+
+    return count;
+}
+
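
For reference, below is a minimal usage sketch of the ioqueue API implemented in this file. The signatures of pj_ioqueue_create(), pj_ioqueue_register_sock(), pj_ioqueue_poll(), pj_ioqueue_unregister() and pj_ioqueue_destroy() are taken from the diff above; the pj_ioqueue_callback member name (on_read_complete) and helpers such as PJ_UNUSED_ARG and pj_memset are assumptions based on the pjlib headers of this era, not something introduced by this change.

#include <pj/ioqueue.h>
#include <pj/pool.h>
#include <pj/sock.h>
#include <pj/string.h>
#include <pj/errno.h>

/* Hypothetical read-completion handler. The callback member name and
 * signature are assumptions based on pjlib's <pj/ioqueue.h>, not part
 * of this commit.
 */
static void on_read_complete(pj_ioqueue_key_t *key,
                             pj_ioqueue_op_key_t *op_key,
                             pj_ssize_t bytes_read)
{
    PJ_UNUSED_ARG(key);
    PJ_UNUSED_ARG(op_key);
    PJ_UNUSED_ARG(bytes_read);
}

/* Sketch: drive one already-created socket through the select ioqueue. */
static pj_status_t ioqueue_sketch(pj_pool_t *pool, pj_sock_t sock)
{
    pj_ioqueue_t *ioqueue;
    pj_ioqueue_key_t *key;
    pj_ioqueue_callback cb;
    pj_time_val timeout;
    pj_status_t rc;

    /* Create the ioqueue; "select" is the backend implemented above. */
    rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioqueue);
    if (rc != PJ_SUCCESS)
        return rc;

    /* Register the socket; as shown above, this also puts the socket
     * in non-blocking mode.
     */
    pj_memset(&cb, 0, sizeof(cb));
    cb.on_read_complete = &on_read_complete;
    rc = pj_ioqueue_register_sock(pool, ioqueue, sock, NULL, &cb, &key);
    if (rc != PJ_SUCCESS) {
        pj_ioqueue_destroy(ioqueue);
        return rc;
    }

    /* Poll once with a 10 ms timeout; a real application would loop,
     * typically from one or more worker threads.
     */
    timeout.sec = 0;
    timeout.msec = 10;
    pj_ioqueue_poll(ioqueue, &timeout);

    pj_ioqueue_unregister(key);
    return pj_ioqueue_destroy(ioqueue);
}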