Tested new ioqueue framework on Linux with select and epoll

git-svn-id: https://svn.pjsip.org/repos/pjproject/main@14 74dad513-b988-da41-8d7b-12977e46ad98
diff --git a/pjlib/build/Makefile b/pjlib/build/Makefile
index 21b2d38..e6a1bd9 100644
--- a/pjlib/build/Makefile
+++ b/pjlib/build/Makefile
@@ -60,7 +60,7 @@
 ###############################################################################
 # Gather all flags.
 #
-export _CFLAGS 	:= -O2 $(CC_CFLAGS) $(OS_CFLAGS) $(HOST_CFLAGS) $(M_CFLAGS) \
+export _CFLAGS 	:= -O2 -g $(CC_CFLAGS) $(OS_CFLAGS) $(HOST_CFLAGS) $(M_CFLAGS) \
 		   $(CFLAGS) $(CC_INC)../include
 export _CXXFLAGS:= $(_CFLAGS) $(CC_CXXFLAGS) $(OS_CXXFLAGS) $(M_CXXFLAGS) \
 		   $(HOST_CXXFLAGS) $(CXXFLAGS)
@@ -98,7 +98,6 @@
 
 	
 export CC_OUT CC AR RANLIB HOST_MV HOST_RM HOST_RMDIR HOST_MKDIR OBJEXT LD LDOUT 
-
 ###############################################################################
 # Main entry
 #
@@ -124,9 +123,12 @@
 
 dep: depend
 
-pjlib:
+pjlib: ../include/pj/config_site.h
 	$(MAKE) -f $(RULES_MAK) APP=PJLIB app=pjlib $(PJLIB_LIB)
 
+../include/pj/config_site.h:
+	touch ../include/pj/config_site.h
+	
 pjlib-test: 
 	$(MAKE) -f $(RULES_MAK) APP=TEST app=pjlib-test $(TEST_EXE)
 
diff --git a/pjlib/build/os-linux.mak b/pjlib/build/os-linux.mak
index fd9fc30..ab22041 100644
--- a/pjlib/build/os-linux.mak
+++ b/pjlib/build/os-linux.mak
@@ -14,8 +14,8 @@
 			os_time_ansi.o \
 			pool_policy_malloc.o sock_bsd.o sock_select.o
 
-export PJLIB_OBJS += ioqueue_select.o 
-#export PJLIB_OBJS += ioqueue_epoll.o
+#export PJLIB_OBJS += ioqueue_select.o 
+export PJLIB_OBJS += ioqueue_epoll.o
 
 #
 # TEST_OBJS are operating system specific object files to be included in
diff --git a/pjlib/include/pj/compat/os_linux.h b/pjlib/include/pj/compat/os_linux.h
index a2f97f6..efb661a 100644
--- a/pjlib/include/pj/compat/os_linux.h
+++ b/pjlib/include/pj/compat/os_linux.h
@@ -54,22 +54,22 @@
 #define PJ_HAS_WINSOCK_H	    0
 #define PJ_HAS_WINSOCK2_H	    0
 
-#define PJ_SOCK_HAS_INET_ATON	    1

-

-/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return

- * the status of non-blocking connect() operation.

+#define PJ_SOCK_HAS_INET_ATON	    1
+
+/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return
+ * the status of non-blocking connect() operation.
  */
-#define PJ_HAS_SO_ERROR             1

-

-/* This value specifies the value set in errno by the OS when a non-blocking

- * socket recv() can not return immediate daata.

- */

-#define PJ_BLOCKING_ERROR_VAL       EAGAIN

-

-/* This value specifies the value set in errno by the OS when a non-blocking

- * socket connect() can not get connected immediately.

- */

-#define PJ_BLOCKING_CONNECT_ERROR_VAL   EINPROGRESS

+#define PJ_HAS_SO_ERROR             1
+
+/* This value specifies the value set in errno by the OS when a non-blocking
+ * socket recv() can not return immediate daata.
+ */
+#define PJ_BLOCKING_ERROR_VAL       EAGAIN
+
+/* This value specifies the value set in errno by the OS when a non-blocking
+ * socket connect() can not get connected immediately.
+ */
+#define PJ_BLOCKING_CONNECT_ERROR_VAL   EINPROGRESS
 
 /* Default threading is enabled, unless it's overridden. */
 #ifndef PJ_HAS_THREADS
diff --git a/pjlib/include/pj/compat/os_linux_kernel.h b/pjlib/include/pj/compat/os_linux_kernel.h
index 0d44ef0..ccae341 100644
--- a/pjlib/include/pj/compat/os_linux_kernel.h
+++ b/pjlib/include/pj/compat/os_linux_kernel.h
@@ -52,21 +52,21 @@
 #define PJ_HAS_WINSOCK2_H	    0
 
 #define PJ_SOCK_HAS_INET_ATON	    0
-

-/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return

- * the status of non-blocking connect() operation.

- */

-#define PJ_HAS_SO_ERROR             1

-

-/* This value specifies the value set in errno by the OS when a non-blocking

- * socket recv() can not return immediate daata.

- */

-#define PJ_BLOCKING_ERROR_VAL       EAGAIN

-

-/* This value specifies the value set in errno by the OS when a non-blocking

- * socket connect() can not get connected immediately.

- */

-#define PJ_BLOCKING_CONNECT_ERROR_VAL   EINPROGRESS

+
+/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return
+ * the status of non-blocking connect() operation.
+ */
+#define PJ_HAS_SO_ERROR             1
+
+/* This value specifies the value set in errno by the OS when a non-blocking
+ * socket recv() can not return immediate daata.
+ */
+#define PJ_BLOCKING_ERROR_VAL       EAGAIN
+
+/* This value specifies the value set in errno by the OS when a non-blocking
+ * socket connect() can not get connected immediately.
+ */
+#define PJ_BLOCKING_CONNECT_ERROR_VAL   EINPROGRESS
 
 #ifndef PJ_HAS_THREADS
 #  define PJ_HAS_THREADS	    (1)
diff --git a/pjlib/include/pj/compat/os_sunos.h b/pjlib/include/pj/compat/os_sunos.h
index 990fb57..87c408a 100644
--- a/pjlib/include/pj/compat/os_sunos.h
+++ b/pjlib/include/pj/compat/os_sunos.h
@@ -39,21 +39,21 @@
 #define PJ_HAS_WINSOCK2_H	    0
 
 #define PJ_SOCK_HAS_INET_ATON	    0
-

-/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return

- * the status of non-blocking connect() operation.

- */

-#define PJ_HAS_SO_ERROR             0

-

-/* This value specifies the value set in errno by the OS when a non-blocking

- * socket recv() can not return immediate daata.

- */

-#define PJ_BLOCKING_ERROR_VAL       EWOULDBLOCK

-

-/* This value specifies the value set in errno by the OS when a non-blocking

- * socket connect() can not get connected immediately.

- */

-#define PJ_BLOCKING_CONNECT_ERROR_VAL   EINPROGRESS

+
+/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return
+ * the status of non-blocking connect() operation.
+ */
+#define PJ_HAS_SO_ERROR             0
+
+/* This value specifies the value set in errno by the OS when a non-blocking
+ * socket recv() can not return immediate daata.
+ */
+#define PJ_BLOCKING_ERROR_VAL       EWOULDBLOCK
+
+/* This value specifies the value set in errno by the OS when a non-blocking
+ * socket connect() can not get connected immediately.
+ */
+#define PJ_BLOCKING_CONNECT_ERROR_VAL   EINPROGRESS
 
 /* Default threading is enabled, unless it's overridden. */
 #ifndef PJ_HAS_THREADS
diff --git a/pjlib/include/pj/compat/os_win32.h b/pjlib/include/pj/compat/os_win32.h
index 87ff752..e8391b9 100644
--- a/pjlib/include/pj/compat/os_win32.h
+++ b/pjlib/include/pj/compat/os_win32.h
@@ -59,21 +59,21 @@
 #define PJ_HAS_WINSOCK2_H	    1
 
 #define PJ_SOCK_HAS_INET_ATON	    0
-

-/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return

- * the status of non-blocking connect() operation.

- */

-#define PJ_HAS_SO_ERROR             0

-

-/* This value specifies the value set in errno by the OS when a non-blocking

- * socket recv() or send() can not return immediately.

- */

-#define PJ_BLOCKING_ERROR_VAL       WSAEWOULDBLOCK

-

-/* This value specifies the value set in errno by the OS when a non-blocking

- * socket connect() can not get connected immediately.

- */

-#define PJ_BLOCKING_CONNECT_ERROR_VAL   WSAEWOULDBLOCK

+
+/* When this macro is set, getsockopt(SOL_SOCKET, SO_ERROR) will return
+ * the status of non-blocking connect() operation.
+ */
+#define PJ_HAS_SO_ERROR             0
+
+/* This value specifies the value set in errno by the OS when a non-blocking
+ * socket recv() or send() can not return immediately.
+ */
+#define PJ_BLOCKING_ERROR_VAL       WSAEWOULDBLOCK
+
+/* This value specifies the value set in errno by the OS when a non-blocking
+ * socket connect() can not get connected immediately.
+ */
+#define PJ_BLOCKING_CONNECT_ERROR_VAL   WSAEWOULDBLOCK
 
 /* Default threading is enabled, unless it's overridden. */
 #ifndef PJ_HAS_THREADS
diff --git a/pjlib/include/pj/ioqueue.h b/pjlib/include/pj/ioqueue.h
index 2e084fd..ce30c9f 100644
--- a/pjlib/include/pj/ioqueue.h
+++ b/pjlib/include/pj/ioqueue.h
@@ -47,53 +47,53 @@
  * @ingroup PJ_IO
  * @{
  *
- * I/O Queue provides API for performing asynchronous I/O operations. It

- * conforms to proactor pattern, which allows application to submit an

- * asynchronous operation and to be notified later when the operation has

+ * I/O Queue provides API for performing asynchronous I/O operations. It
+ * conforms to proactor pattern, which allows application to submit an
+ * asynchronous operation and to be notified later when the operation has
  * completed.
- *

- * The framework works natively in platforms where asynchronous operation API

- * exists, such as in Windows NT with IoCompletionPort/IOCP. In other 

- * platforms, the I/O queue abstracts the operating system's event poll API

- * to provide semantics similar to IoCompletionPort with minimal penalties

- * (i.e. per ioqueue and per handle mutex protection).

- *

- * The I/O queue provides more than just unified abstraction. It also:

- *  - makes sure that the operation uses the most effective way to utilize

- *    the underlying mechanism, to provide the maximum theoritical

- *    throughput possible on a given platform.

- *  - choose the most efficient mechanism for event polling on a given

- *    platform.

+ *
+ * The framework works natively in platforms where asynchronous operation API
+ * exists, such as in Windows NT with IoCompletionPort/IOCP. In other 
+ * platforms, the I/O queue abstracts the operating system's event poll API
+ * to provide semantics similar to IoCompletionPort with minimal penalties
+ * (i.e. per ioqueue and per handle mutex protection).
+ *
+ * The I/O queue provides more than just unified abstraction. It also:
+ *  - makes sure that the operation uses the most effective way to utilize
+ *    the underlying mechanism, to provide the maximum theoritical
+ *    throughput possible on a given platform.
+ *  - choose the most efficient mechanism for event polling on a given
+ *    platform.
  *
  * Currently, the I/O Queue is implemented using:
- *  - <tt><b>select()</b></tt>, as the common denominator, but the least 

- *    efficient. Also the number of descriptor is limited to 

- *    \c PJ_IOQUEUE_MAX_HANDLES (which by default is 64).

- *  - <tt><b>/dev/epoll</b></tt> on Linux (user mode and kernel mode), 

- *    a much faster replacement for select() on Linux (and more importantly

+ *  - <tt><b>select()</b></tt>, as the common denominator, but the least 
+ *    efficient. Also the number of descriptor is limited to 
+ *    \c PJ_IOQUEUE_MAX_HANDLES (which by default is 64).
+ *  - <tt><b>/dev/epoll</b></tt> on Linux (user mode and kernel mode), 
+ *    a much faster replacement for select() on Linux (and more importantly
  *    doesn't have limitation on number of descriptors).
- *  - <b>I/O Completion ports</b> on Windows NT/2000/XP, which is the most 

- *    efficient way to dispatch events in Windows NT based OSes, and most 

- *    importantly, it doesn't have the limit on how many handles to monitor.

+ *  - <b>I/O Completion ports</b> on Windows NT/2000/XP, which is the most 
+ *    efficient way to dispatch events in Windows NT based OSes, and most 
+ *    importantly, it doesn't have the limit on how many handles to monitor.
  *    And it works with files (not only sockets) as well.
- *

- *

- * \section pj_ioqueue_concurrency_sec Concurrency Rules

- *

- * The items below describe rules that must be obeyed when using the I/O 

- * queue, with regard to concurrency:

- *  - simultaneous operations (by different threads) to different key is safe.

- *  - simultaneous operations to the same key is also safe, except

- *    <b>unregistration</b>, which is described below.

- *  - <b>care must be taken when unregistering a key</b> from the

- *    ioqueue. Application must take care that when one thread is issuing

- *    an unregistration, other thread is not simultaneously invoking an

- *    operation <b>to the same key</b>.

- *\n

- *    This happens because the ioqueue functions are working with a pointer

- *    to the key, and there is a possible race condition where the pointer

- *    has been rendered invalid by other threads before the ioqueue has a

- *    chance to acquire mutex on it.

+ *
+ *
+ * \section pj_ioqueue_concurrency_sec Concurrency Rules
+ *
+ * The items below describe rules that must be obeyed when using the I/O 
+ * queue, with regard to concurrency:
+ *  - simultaneous operations (by different threads) to different key is safe.
+ *  - simultaneous operations to the same key is also safe, except
+ *    <b>unregistration</b>, which is described below.
+ *  - <b>care must be taken when unregistering a key</b> from the
+ *    ioqueue. Application must take care that when one thread is issuing
+ *    an unregistration, other thread is not simultaneously invoking an
+ *    operation <b>to the same key</b>.
+ *\n
+ *    This happens because the ioqueue functions are working with a pointer
+ *    to the key, and there is a possible race condition where the pointer
+ *    has been rendered invalid by other threads before the ioqueue has a
+ *    chance to acquire mutex on it.
  *
  * \section pj_ioqeuue_examples_sec Examples
  *
@@ -103,26 +103,26 @@
  *  - \ref page_pjlib_ioqueue_udp_test
  *  - \ref page_pjlib_ioqueue_perf_test
  */
-

-

-

-/**

- * This structure describes operation specific key to be submitted to

- * I/O Queue when performing the asynchronous operation. This key will

- * be returned to the application when completion callback is called.

- *

- * Application normally wants to attach it's specific data in the

- * \c user_data field so that it can keep track of which operation has

- * completed when the callback is called. Alternatively, application can

- * also extend this struct to include its data, because the pointer that

- * is returned in the completion callback will be exactly the same as

- * the pointer supplied when the asynchronous function is called.

- */

-typedef struct pj_ioqueue_op_key_t

-{ 

-    void *internal__[32];           /**< Internal I/O Queue data.   */

-    void *user_data;                /**< Application data.          */

-} pj_ioqueue_op_key_t;

+
+
+
+/**
+ * This structure describes operation specific key to be submitted to
+ * I/O Queue when performing the asynchronous operation. This key will
+ * be returned to the application when completion callback is called.
+ *
+ * Application normally wants to attach it's specific data in the
+ * \c user_data field so that it can keep track of which operation has
+ * completed when the callback is called. Alternatively, application can
+ * also extend this struct to include its data, because the pointer that
+ * is returned in the completion callback will be exactly the same as
+ * the pointer supplied when the asynchronous function is called.
+ */
+typedef struct pj_ioqueue_op_key_t
+{ 
+    void *internal__[32];           /**< Internal I/O Queue data.   */
+    void *user_data;                /**< Application data.          */
+} pj_ioqueue_op_key_t;
 
 /**
  * This structure describes the callbacks to be called when I/O operation
@@ -134,58 +134,58 @@
      * This callback is called when #pj_ioqueue_recv or #pj_ioqueue_recvfrom
      * completes.
      *
-     * @param key	    The key.

+     * @param key	    The key.
      * @param op_key        Operation key.
-     * @param bytes_read    >= 0 to indicate the amount of data read, 

-     *                      otherwise negative value containing the error

-     *                      code. To obtain the pj_status_t error code, use

+     * @param bytes_read    >= 0 to indicate the amount of data read, 
+     *                      otherwise negative value containing the error
+     *                      code. To obtain the pj_status_t error code, use
      *                      (pj_status_t code = -bytes_read).
      */
-    void (*on_read_complete)(pj_ioqueue_key_t *key, 

-                             pj_ioqueue_op_key_t *op_key, 

+    void (*on_read_complete)(pj_ioqueue_key_t *key, 
+                             pj_ioqueue_op_key_t *op_key, 
                              pj_ssize_t bytes_read);
 
     /**
      * This callback is called when #pj_ioqueue_write or #pj_ioqueue_sendto
      * completes.
      *
-     * @param key	    The key.

+     * @param key	    The key.
      * @param op_key        Operation key.
-     * @param bytes_sent    >= 0 to indicate the amount of data written, 

-     *                      otherwise negative value containing the error

-     *                      code. To obtain the pj_status_t error code, use

-     *                      (pj_status_t code = -bytes_sent).

+     * @param bytes_sent    >= 0 to indicate the amount of data written, 
+     *                      otherwise negative value containing the error
+     *                      code. To obtain the pj_status_t error code, use
+     *                      (pj_status_t code = -bytes_sent).
      */
-    void (*on_write_complete)(pj_ioqueue_key_t *key, 

-                              pj_ioqueue_op_key_t *op_key, 

+    void (*on_write_complete)(pj_ioqueue_key_t *key, 
+                              pj_ioqueue_op_key_t *op_key, 
                               pj_ssize_t bytes_sent);
 
     /**
      * This callback is called when #pj_ioqueue_accept completes.
      *
-     * @param key	    The key.

+     * @param key	    The key.
      * @param op_key        Operation key.
      * @param sock          Newly connected socket.
      * @param status	    Zero if the operation completes successfully.
      */
-    void (*on_accept_complete)(pj_ioqueue_key_t *key, 

-                               pj_ioqueue_op_key_t *op_key, 

+    void (*on_accept_complete)(pj_ioqueue_key_t *key, 
+                               pj_ioqueue_op_key_t *op_key, 
                                pj_sock_t sock, 
                                pj_status_t status);
 
     /**
      * This callback is called when #pj_ioqueue_connect completes.
      *
-     * @param key	    The key.

+     * @param key	    The key.
      * @param status	    PJ_SUCCESS if the operation completes successfully.
      */
-    void (*on_connect_complete)(pj_ioqueue_key_t *key, 

+    void (*on_connect_complete)(pj_ioqueue_key_t *key, 
                                 pj_status_t status);
 } pj_ioqueue_callback;
 
 
 /**
- * Types of pending I/O Queue operation. This enumeration is only used

+ * Types of pending I/O Queue operation. This enumeration is only used
  * internally within the ioqueue.
  */
 typedef enum pj_ioqueue_operation_e
@@ -204,17 +204,17 @@
 } pj_ioqueue_operation_e;
 
 
-/**

- * This macro specifies the maximum number of events that can be

- * processed by the ioqueue on a single poll cycle, on implementation

- * that supports it. The value is only meaningfull when specified

- * during PJLIB build.

- */

-#ifndef PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL

-#   define PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL     (16)

-#endif

+/**
+ * This macro specifies the maximum number of events that can be
+ * processed by the ioqueue on a single poll cycle, on implementation
+ * that supports it. The value is only meaningfull when specified
+ * during PJLIB build.
+ */
+#ifndef PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL
+#   define PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL     (16)
+#endif
 
-

+
 /**
  * Create a new I/O Queue framework.
  *
@@ -270,9 +270,9 @@
  * @param sock	    The socket.
  * @param user_data User data to be associated with the key, which can be
  *		    retrieved later.
- * @param cb	    Callback to be called when I/O operation completes. 

- * @param key       Pointer to receive the key to be associated with this

- *                  socket. Subsequent I/O queue operation will need this

+ * @param cb	    Callback to be called when I/O operation completes. 
+ * @param key       Pointer to receive the key to be associated with this
+ *                  socket. Subsequent I/O queue operation will need this
  *                  key.
  *
  * @return	    PJ_SUCCESS on success, or the error code.
@@ -281,17 +281,17 @@
 					       pj_ioqueue_t *ioque,
 					       pj_sock_t sock,
 					       void *user_data,
-					       const pj_ioqueue_callback *cb,

+					       const pj_ioqueue_callback *cb,
                                                pj_ioqueue_key_t **key );
 
 /**
- * Unregister from the I/O Queue framework. Caller must make sure that

- * the key doesn't have any pending operation before calling this function,

- * or otherwise the behaviour is undefined (either callback will be called

- * later when the data is sent/received, or the callback will not be called,

+ * Unregister from the I/O Queue framework. Caller must make sure that
+ * the key doesn't have any pending operation before calling this function,
+ * or otherwise the behaviour is undefined (either callback will be called
+ * later when the data is sent/received, or the callback will not be called,
  * or even something else).
  *
- * @param key	    The key that was previously obtained from registration.

+ * @param key	    The key that was previously obtained from registration.
  *
  * @return          PJ_SUCCESS on success or the error code.
  */
@@ -300,44 +300,44 @@
 
 /**
  * Get user data associated with an ioqueue key.
- *

- * @param key	    The key that was previously obtained from registration.

  *
- * @return          The user data associated with the descriptor, or NULL 

+ * @param key	    The key that was previously obtained from registration.
+ *
+ * @return          The user data associated with the descriptor, or NULL 
  *                  on error or if no data is associated with the key during
  *                  registration.
  */
 PJ_DECL(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key );
-

-/**

- * Set or change the user data to be associated with the file descriptor or

- * handle or socket descriptor.

- *

- * @param key	    The key that was previously obtained from registration.

- * @param user_data User data to be associated with the descriptor.

- * @param old_data  Optional parameter to retrieve the old user data.

- *

- * @return          PJ_SUCCESS on success or the error code.

- */

-PJ_DECL(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,

-                                               void *user_data,

-                                               void **old_data);

+
+/**
+ * Set or change the user data to be associated with the file descriptor or
+ * handle or socket descriptor.
+ *
+ * @param key	    The key that was previously obtained from registration.
+ * @param user_data User data to be associated with the descriptor.
+ * @param old_data  Optional parameter to retrieve the old user data.
+ *
+ * @return          PJ_SUCCESS on success or the error code.
+ */
+PJ_DECL(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
+                                               void *user_data,
+                                               void **old_data);
 
 
 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
 /**
  * Instruct I/O Queue to accept incoming connection on the specified 
- * listening socket. This function will return immediately (i.e. non-blocking)

- * regardless whether a connection is immediately available. If the function

- * can't complete immediately, the caller will be notified about the incoming

- * connection when it calls pj_ioqueue_poll(). If a new connection is

- * immediately available, the function returns PJ_SUCCESS with the new

+ * listening socket. This function will return immediately (i.e. non-blocking)
+ * regardless whether a connection is immediately available. If the function
+ * can't complete immediately, the caller will be notified about the incoming
+ * connection when it calls pj_ioqueue_poll(). If a new connection is
+ * immediately available, the function returns PJ_SUCCESS with the new
  * connection; in this case, the callback WILL NOT be called.
  *
- * @param key	    The key which registered to the server socket.

- * @param op_key    An operation specific key to be associated with the

- *                  pending operation, so that application can keep track of

- *                  which operation has been completed when the callback is

+ * @param key	    The key which registered to the server socket.
+ * @param op_key    An operation specific key to be associated with the
+ *                  pending operation, so that application can keep track of
+ *                  which operation has been completed when the callback is
  *                  called.
  * @param new_sock  Argument which contain pointer to receive the new socket
  *                  for the incoming connection.
@@ -349,15 +349,15 @@
  *		    address, and on output, contains the actual length of the
  *		    address. This argument is optional.
  * @return
- *  - PJ_SUCCESS    When connection is available immediately, and the 

- *                  parameters will be updated to contain information about 

- *                  the new connection. In this case, a completion callback

+ *  - PJ_SUCCESS    When connection is available immediately, and the 
+ *                  parameters will be updated to contain information about 
+ *                  the new connection. In this case, a completion callback
  *                  WILL NOT be called.
- *  - PJ_EPENDING   If no connection is available immediately. When a new

+ *  - PJ_EPENDING   If no connection is available immediately. When a new
  *                  connection arrives, the callback will be called.
  *  - non-zero      which indicates the appropriate error code.
  */
-PJ_DECL(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,

+PJ_DECL(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key,
 					pj_sock_t *sock,
 					pj_sockaddr_t *local,
@@ -366,9 +366,9 @@
 
 /**
  * Initiate non-blocking socket connect. If the socket can NOT be connected
- * immediately, asynchronous connect() will be scheduled and caller will be

- * notified via completion callback when it calls pj_ioqueue_poll(). If

- * socket is connected immediately, the function returns PJ_SUCCESS and

+ * immediately, asynchronous connect() will be scheduled and caller will be
+ * notified via completion callback when it calls pj_ioqueue_poll(). If
+ * socket is connected immediately, the function returns PJ_SUCCESS and
  * completion callback WILL NOT be called.
  *
  * @param key	    The key associated with TCP socket
@@ -376,7 +376,7 @@
  * @param addrlen   The remote address length.
  *
  * @return
- *  - PJ_SUCCESS    If socket is connected immediately. In this case, the

+ *  - PJ_SUCCESS    If socket is connected immediately. In this case, the
  *                  completion callback WILL NOT be called.
  *  - PJ_EPENDING   If operation is queued, or 
  *  - non-zero      Indicates the error code.
@@ -404,40 +404,40 @@
 
 
 /**
- * Instruct the I/O Queue to read from the specified handle. This function

- * returns immediately (i.e. non-blocking) regardless whether some data has 

- * been transfered. If the operation can't complete immediately, caller will 

- * be notified about the completion when it calls pj_ioqueue_poll(). If data

- * is immediately available, the function will return PJ_SUCCESS and the

- * callback WILL NOT be called.

+ * Instruct the I/O Queue to read from the specified handle. This function
+ * returns immediately (i.e. non-blocking) regardless whether some data has 
+ * been transfered. If the operation can't complete immediately, caller will 
+ * be notified about the completion when it calls pj_ioqueue_poll(). If data
+ * is immediately available, the function will return PJ_SUCCESS and the
+ * callback WILL NOT be called.
  *
  * @param key	    The key that uniquely identifies the handle.
- * @param op_key    An operation specific key to be associated with the

- *                  pending operation, so that application can keep track of

- *                  which operation has been completed when the callback is

- *                  called. Caller must make sure that this key remains 

- *                  valid until the function completes.

+ * @param op_key    An operation specific key to be associated with the
+ *                  pending operation, so that application can keep track of
+ *                  which operation has been completed when the callback is
+ *                  called. Caller must make sure that this key remains 
+ *                  valid until the function completes.
  * @param buffer    The buffer to hold the read data. The caller MUST make sure
  *		    that this buffer remain valid until the framework completes
  *		    reading the handle.
- * @param length    On input, it specifies the size of the buffer. If data is

- *                  available to be read immediately, the function returns

- *                  PJ_SUCCESS and this argument will be filled with the

- *                  amount of data read. If the function is pending, caller

- *                  will be notified about the amount of data read in the

- *                  callback. This parameter can point to local variable in

- *                  caller's stack and doesn't have to remain valid for the

+ * @param length    On input, it specifies the size of the buffer. If data is
+ *                  available to be read immediately, the function returns
+ *                  PJ_SUCCESS and this argument will be filled with the
+ *                  amount of data read. If the function is pending, caller
+ *                  will be notified about the amount of data read in the
+ *                  callback. This parameter can point to local variable in
+ *                  caller's stack and doesn't have to remain valid for the
  *                  duration of pending operation.
  * @param flags     Recv flag.
  *
  * @return
- *  - PJ_SUCCESS    If immediate data has been received in the buffer. In this

+ *  - PJ_SUCCESS    If immediate data has been received in the buffer. In this
  *                  case, the callback WILL NOT be called.
- *  - PJ_EPENDING   If the operation has been queued, and the callback will be

+ *  - PJ_EPENDING   If the operation has been queued, and the callback will be
  *                  called when data has been received.
  *  - non-zero      The return value indicates the error code.
  */
-PJ_DECL(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,

+PJ_DECL(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
 				      void *buffer,
 				      pj_ssize_t *length,
@@ -450,26 +450,26 @@
  * remain valid until the framework completes reading the data.
  *
  * @param key	    The key that uniquely identifies the handle.
- * @param op_key    An operation specific key to be associated with the

- *                  pending operation, so that application can keep track of

- *                  which operation has been completed when the callback is

- *                  called.

+ * @param op_key    An operation specific key to be associated with the
+ *                  pending operation, so that application can keep track of
+ *                  which operation has been completed when the callback is
+ *                  called.
  * @param buffer    The buffer to hold the read data. The caller MUST make sure
  *		    that this buffer remain valid until the framework completes
  *		    reading the handle.
- * @param length    On input, it specifies the size of the buffer. If data is

- *                  available to be read immediately, the function returns

- *                  PJ_SUCCESS and this argument will be filled with the

- *                  amount of data read. If the function is pending, caller

- *                  will be notified about the amount of data read in the

- *                  callback. This parameter can point to local variable in

- *                  caller's stack and doesn't have to remain valid for the

- *                  duration of pending operation.

+ * @param length    On input, it specifies the size of the buffer. If data is
+ *                  available to be read immediately, the function returns
+ *                  PJ_SUCCESS and this argument will be filled with the
+ *                  amount of data read. If the function is pending, caller
+ *                  will be notified about the amount of data read in the
+ *                  callback. This parameter can point to local variable in
+ *                  caller's stack and doesn't have to remain valid for the
+ *                  duration of pending operation.
  * @param flags     Recv flag.
  * @param addr      Optional Pointer to buffer to receive the address.
  * @param addrlen   On input, specifies the length of the address buffer.
  *                  On output, it will be filled with the actual length of
- *                  the address. This argument can be NULL if \c addr is not

+ *                  the address. This argument can be NULL if \c addr is not
  *                  specified.
  *
  * @return
@@ -479,7 +479,7 @@
  *  - PJ_EPENDING   If the operation has been queued.
  *  - non-zero      The return value indicates the error code.
  */
-PJ_DECL(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,

+PJ_DECL(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
                                           pj_ioqueue_op_key_t *op_key,
 					  void *buffer,
 					  pj_ssize_t *length,
@@ -489,39 +489,39 @@
 
 
 /**
- * Instruct the I/O Queue to write to the handle. This function will return

- * immediately (i.e. non-blocking) regardless whether some data has been 

- * transfered. If the function can't complete immediately, the caller will

- * be notified about the completion when it calls pj_ioqueue_poll(). If 

- * operation completes immediately and data has been transfered, the function

- * returns PJ_SUCCESS and the callback will NOT be called.

- *

+ * Instruct the I/O Queue to write to the handle. This function will return
+ * immediately (i.e. non-blocking) regardless whether some data has been 
+ * transfered. If the function can't complete immediately, the caller will
+ * be notified about the completion when it calls pj_ioqueue_poll(). If 
+ * operation completes immediately and data has been transfered, the function
+ * returns PJ_SUCCESS and the callback will NOT be called.
+ *
  * @param key	    The key that identifies the handle.
- * @param op_key    An operation specific key to be associated with the

- *                  pending operation, so that application can keep track of

- *                  which operation has been completed when the callback is

- *                  called.

+ * @param op_key    An operation specific key to be associated with the
+ *                  pending operation, so that application can keep track of
+ *                  which operation has been completed when the callback is
+ *                  called.
  * @param data	    The data to send. Caller MUST make sure that this buffer 
  *		    remains valid until the write operation completes.
- * @param length    On input, it specifies the length of data to send. When

- *                  data was sent immediately, this function returns PJ_SUCCESS

- *                  and this parameter contains the length of data sent. If

- *                  data can not be sent immediately, an asynchronous operation

- *                  is scheduled and caller will be notified via callback the

- *                  number of bytes sent. This parameter can point to local 

- *                  variable on caller's stack and doesn't have to remain 

+ * @param length    On input, it specifies the length of data to send. When
+ *                  data was sent immediately, this function returns PJ_SUCCESS
+ *                  and this parameter contains the length of data sent. If
+ *                  data can not be sent immediately, an asynchronous operation
+ *                  is scheduled and caller will be notified via callback the
+ *                  number of bytes sent. This parameter can point to local 
+ *                  variable on caller's stack and doesn't have to remain 
  *                  valid until the operation has completed.
  * @param flags     Send flags.
  *
  * @return
- *  - PJ_SUCCESS    If data was immediately transfered. In this case, no

- *                  pending operation has been scheduled and the callback

+ *  - PJ_SUCCESS    If data was immediately transfered. In this case, no
+ *                  pending operation has been scheduled and the callback
  *                  WILL NOT be called.
- *  - PJ_EPENDING   If the operation has been queued. Once data base been

+ *  - PJ_EPENDING   If the operation has been queued. Once data base been
  *                  transfered, the callback will be called.
  *  - non-zero      The return value indicates the error code.
  */
-PJ_DECL(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,

+PJ_DECL(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
 				      const void *data,
 				      pj_ssize_t *length,
@@ -533,20 +533,20 @@
  * pj_sock_sendto() (or equivalent) will be called to send the data.
  *
  * @param key	    the key that identifies the handle.
- * @param op_key    An operation specific key to be associated with the

- *                  pending operation, so that application can keep track of

- *                  which operation has been completed when the callback is

- *                  called.

+ * @param op_key    An operation specific key to be associated with the
+ *                  pending operation, so that application can keep track of
+ *                  which operation has been completed when the callback is
+ *                  called.
  * @param data	    the data to send. Caller MUST make sure that this buffer 
  *		    remains valid until the write operation completes.
- * @param length    On input, it specifies the length of data to send. When

- *                  data was sent immediately, this function returns PJ_SUCCESS

- *                  and this parameter contains the length of data sent. If

- *                  data can not be sent immediately, an asynchronous operation

- *                  is scheduled and caller will be notified via callback the

- *                  number of bytes sent. This parameter can point to local 

- *                  variable on caller's stack and doesn't have to remain 

- *                  valid until the operation has completed.

+ * @param length    On input, it specifies the length of data to send. When
+ *                  data was sent immediately, this function returns PJ_SUCCESS
+ *                  and this parameter contains the length of data sent. If
+ *                  data can not be sent immediately, an asynchronous operation
+ *                  is scheduled and caller will be notified via callback the
+ *                  number of bytes sent. This parameter can point to local 
+ *                  variable on caller's stack and doesn't have to remain 
+ *                  valid until the operation has completed.
  * @param flags     send flags.
  * @param addr      Optional remote address.
  * @param addrlen   Remote address length, \c addr is specified.
@@ -556,7 +556,7 @@
  *  - PJ_EPENDING   If the operation has been queued.
  *  - non-zero      The return value indicates the error code.
  */
-PJ_DECL(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,

+PJ_DECL(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key,
 					const void *data,
 					pj_ssize_t *length,
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c
index b5599d9..774d53e 100644
--- a/pjlib/src/pj/ioqueue_common_abs.c
+++ b/pjlib/src/pj/ioqueue_common_abs.c
@@ -1,813 +1,813 @@
-/* $Id$ */

-

-#include <pj/ioqueue.h>

-#include <pj/errno.h>

-#include <pj/list.h>

-#include <pj/sock.h>

-#include <pj/lock.h>

-#include <pj/assert.h>

-#include <pj/string.h>

-

-

-static void ioqueue_init( pj_ioqueue_t *ioqueue )

-{

-    ioqueue->lock = NULL;

-    ioqueue->auto_delete_lock = 0;

-}

-

-static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)

-{

-    if (ioqueue->auto_delete_lock && ioqueue->lock )

-        return pj_lock_destroy(ioqueue->lock);

-    else

-        return PJ_SUCCESS;

-}

-

-/*

- * pj_ioqueue_set_lock()

- */

-PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue, 

-					 pj_lock_t *lock,

-					 pj_bool_t auto_delete )

-{

-    PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);

-

-    if (ioqueue->auto_delete_lock && ioqueue->lock) {

-        pj_lock_destroy(ioqueue->lock);

-    }

-

-    ioqueue->lock = lock;

-    ioqueue->auto_delete_lock = auto_delete;

-

-    return PJ_SUCCESS;

-}

-

-static pj_status_t ioqueue_init_key( pj_pool_t *pool,

-                                     pj_ioqueue_t *ioqueue,

-                                     pj_ioqueue_key_t *key,

-                                     pj_sock_t sock,

-                                     void *user_data,

-                                     const pj_ioqueue_callback *cb)

-{

-    pj_status_t rc;

-    int optlen;

-

-    key->ioqueue = ioqueue;

-    key->fd = sock;

-    key->user_data = user_data;

-    pj_list_init(&key->read_list);

-    pj_list_init(&key->write_list);

-#if PJ_HAS_TCP

-    pj_list_init(&key->accept_list);

-#endif

-

-    /* Save callback. */

-    pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));

-

-    /* Get socket type. When socket type is datagram, some optimization

-     * will be performed during send to allow parallel send operations.

-     */

-    optlen = sizeof(key->fd_type);

-    rc = pj_sock_getsockopt(sock, PJ_SOL_SOCKET, PJ_SO_TYPE,

-                            &key->fd_type, &optlen);

-    if (rc != PJ_SUCCESS)

-        key->fd_type = PJ_SOCK_STREAM;

-

-    /* Create mutex for the key. */

-    rc = pj_mutex_create_simple(pool, NULL, &key->mutex);

-    

-    return rc;

-}

-

-static void ioqueue_destroy_key( pj_ioqueue_key_t *key )

-{

-    pj_mutex_destroy(key->mutex);

-}

-

-/*

- * pj_ioqueue_get_user_data()

- *

- * Obtain value associated with a key.

- */

-PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )

-{

-    PJ_ASSERT_RETURN(key != NULL, NULL);

-    return key->user_data;

-}

-

-/*

- * pj_ioqueue_set_user_data()

- */

-PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,

-                                              void *user_data,

-                                              void **old_data)

-{

-    PJ_ASSERT_RETURN(key, PJ_EINVAL);

-

-    if (old_data)

-        *old_data = key->user_data;

-    key->user_data = user_data;

-

-    return PJ_SUCCESS;

-}

-

-PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)

-{

-    return !pj_list_empty(&key->write_list);

-}

-

-PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)

-{

-    return !pj_list_empty(&key->read_list);

-}

-

-PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)

-{

-#if PJ_HAS_TCP

-    return !pj_list_empty(&key->accept_list);

-#else

-    return 0;

-#endif

-}

-

-PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)

-{

-    return key->connecting;

-}

-

-

-/*

- * ioqueue_dispatch_event()

- *

- * Report occurence of an event in the key to be processed by the

- * framework.

- */

-void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)

-{

-    /* Lock the key. */

-    pj_mutex_lock(h->mutex);

-

-#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0

-    if (h->connecting) {

-	/* Completion of connect() operation */

-	pj_ssize_t bytes_transfered;

-

-	/* Clear operation. */

-	h->connecting = 0;

-

-        ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);

-        ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);

-

-        /* Unlock; from this point we don't need to hold key's mutex. */

-        pj_mutex_unlock(h->mutex);

-

-#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)

-	/* from connect(2): 

-	 * On Linux, use getsockopt to read the SO_ERROR option at

-	 * level SOL_SOCKET to determine whether connect() completed

-	 * successfully (if SO_ERROR is zero).

-	 */

-	int value;

-	socklen_t vallen = sizeof(value);

-	int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR, 

-                               &value, &vallen);

-	if (gs_rc != 0) {

-	    /* Argh!! What to do now??? 

-	     * Just indicate that the socket is connected. The

-	     * application will get error as soon as it tries to use

-	     * the socket to send/receive.

-	     */

-	    bytes_transfered = 0;

-	} else {

-            bytes_transfered = value;

-	}

-#elif defined(PJ_WIN32) && PJ_WIN32!=0

-	bytes_transfered = 0; /* success */

-#else

-	/* Excellent information in D.J. Bernstein page:

-	 * http://cr.yp.to/docs/connect.html

-	 *

-	 * Seems like the most portable way of detecting connect()

-	 * failure is to call getpeername(). If socket is connected,

-	 * getpeername() will return 0. If the socket is not connected,

-	 * it will return ENOTCONN, and read(fd, &ch, 1) will produce

-	 * the right errno through error slippage. This is a combination

-	 * of suggestions from Douglas C. Schmidt and Ken Keys.

-	 */

-	int gp_rc;

-	struct sockaddr_in addr;

-	socklen_t addrlen = sizeof(addr);

-

-	gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);

-	bytes_transfered = gp_rc;

-#endif

-

-	/* Call callback. */

-        if (h->cb.on_connect_complete)

-	    (*h->cb.on_connect_complete)(h, bytes_transfered);

-

-        /* Done. */

-

-    } else 

-#endif /* PJ_HAS_TCP */

-    if (key_has_pending_write(h)) {

-	/* Socket is writable. */

-        struct write_operation *write_op;

-        pj_ssize_t sent;

-        pj_status_t send_rc;

-

-        /* Get the first in the queue. */

-        write_op = h->write_list.next;

-

-        /* For datagrams, we can remove the write_op from the list

-         * so that send() can work in parallel.

-         */

-        if (h->fd_type == PJ_SOCK_DGRAM) {

-            pj_list_erase(write_op);

-            if (pj_list_empty(&h->write_list))

-                ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);

-

-            pj_mutex_unlock(h->mutex);

-        }

-

-        /* Send the data. 

-         * Unfortunately we must do this while holding key's mutex, thus

-         * preventing parallel write on a single key.. :-((

-         */

-        sent = write_op->size - write_op->written;

-        if (write_op->op == PJ_IOQUEUE_OP_SEND) {

-            send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,

-                                   &sent, write_op->flags);

-        } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {

-            send_rc = pj_sock_sendto(h->fd, 

-                                     write_op->buf+write_op->written,

-                                     &sent, write_op->flags,

-                                     &write_op->rmt_addr, 

-                                     write_op->rmt_addrlen);

-        } else {

-            pj_assert(!"Invalid operation type!");

-            send_rc = PJ_EBUG;

-        }

-

-        if (send_rc == PJ_SUCCESS) {

-            write_op->written += sent;

-        } else {

-            pj_assert(send_rc > 0);

-            write_op->written = -send_rc;

-        }

-

-        /* Are we finished with this buffer? */

-        if (send_rc!=PJ_SUCCESS || 

-            write_op->written == (pj_ssize_t)write_op->size ||

-            h->fd_type == PJ_SOCK_DGRAM) 

-        {

-            if (h->fd_type != PJ_SOCK_DGRAM) {

-                /* Write completion of the whole stream. */

-                pj_list_erase(write_op);

-

-                /* Clear operation if there's no more data to send. */

-                if (pj_list_empty(&h->write_list))

-                    ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);

-

-                /* No need to hold mutex anymore */

-                pj_mutex_unlock(h->mutex);

-            }

-

-	    /* Call callback. */

-            if (h->cb.on_write_complete) {

-	        (*h->cb.on_write_complete)(h, 

-                                           (pj_ioqueue_op_key_t*)write_op,

-                                           write_op->written);

-            }

-

-        } else {

-            pj_mutex_unlock(h->mutex);

-        }

-

-        /* Done. */

-    } else {

-        pj_assert(!"Descriptor is signaled but key "

-                   "has no pending operation!");

-

-        pj_mutex_unlock(h->mutex);

-    }

-}

-

-void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )

-{

-    pj_status_t rc;

-

-    /* Lock the key. */

-    pj_mutex_lock(h->mutex);

-

-#   if PJ_HAS_TCP

-    if (!pj_list_empty(&h->accept_list)) {

-

-        struct accept_operation *accept_op;

-	

-        /* Get one accept operation from the list. */

-	accept_op = h->accept_list.next;

-        pj_list_erase(accept_op);

-

-	/* Clear bit in fdset if there is no more pending accept */

-        if (pj_list_empty(&h->accept_list))

-            ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);

-

-        /* Unlock; from this point we don't need to hold key's mutex. */

-        pj_mutex_unlock(h->mutex);

-

-	rc=pj_sock_accept(h->fd, accept_op->accept_fd, 

-                          accept_op->rmt_addr, accept_op->addrlen);

-	if (rc==PJ_SUCCESS && accept_op->local_addr) {

-	    rc = pj_sock_getsockname(*accept_op->accept_fd, 

-                                     accept_op->local_addr,

-				     accept_op->addrlen);

-	}

-

-	/* Call callback. */

-        if (h->cb.on_accept_complete)

-	    (*h->cb.on_accept_complete)(h, 

-                                        (pj_ioqueue_op_key_t*)accept_op,

-                                        *accept_op->accept_fd, rc);

-

-    }

-    else

-#   endif

-    if (key_has_pending_read(h)) {

-        struct read_operation *read_op;

-        pj_ssize_t bytes_read;

-

-        pj_assert(!pj_list_empty(&h->read_list));

-

-        /* Get one pending read operation from the list. */

-        read_op = h->read_list.next;

-        pj_list_erase(read_op);

-

-        /* Clear fdset if there is no pending read. */

-        if (pj_list_empty(&h->read_list))

-            ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);

-

-        /* Unlock; from this point we don't need to hold key's mutex. */

-        pj_mutex_unlock(h->mutex);

-

-        bytes_read = read_op->size;

-

-	if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {

-	    rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0,

-				  read_op->rmt_addr, 

-                                  read_op->rmt_addrlen);

-	} else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) {

-	    rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);

-        } else {

-            pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);

-            /*

-             * User has specified pj_ioqueue_read().

-             * On Win32, we should do ReadFile(). But because we got

-             * here because of select() anyway, user must have put a

-             * socket descriptor on h->fd, which in this case we can

-             * just call pj_sock_recv() instead of ReadFile().

-             * On Unix, user may put a file in h->fd, so we'll have

-             * to call read() here.

-             * This may not compile on systems which doesn't have 

-             * read(). That's why we only specify PJ_LINUX here so

-             * that error is easier to catch.

-             */

-#	    if defined(PJ_WIN32) && PJ_WIN32 != 0

-                rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);

-                //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,

-                //              &bytes_read, NULL);

-#           elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)

-                bytes_read = read(h->fd, h->rd_buf, bytes_read);

-                rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();

-#	    elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0

-                bytes_read = sys_read(h->fd, h->rd_buf, bytes_read);

-                rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;

-#           else

-#               error "Implement read() for this platform!"

-#           endif

-        }

-	

-	if (rc != PJ_SUCCESS) {

-#	    if defined(PJ_WIN32) && PJ_WIN32 != 0

-	    /* On Win32, for UDP, WSAECONNRESET on the receive side 

-	     * indicates that previous sending has triggered ICMP Port 

-	     * Unreachable message.

-	     * But we wouldn't know at this point which one of previous 

-	     * key that has triggered the error, since UDP socket can

-	     * be shared!

-	     * So we'll just ignore it!

-	     */

-

-	    if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {

-		//PJ_LOG(4,(THIS_FILE, 

-                //          "Ignored ICMP port unreach. on key=%p", h));

-	    }

-#	    endif

-

-            /* In any case we would report this to caller. */

-            bytes_read = -rc;

-	}

-

-	/* Call callback. */

-        if (h->cb.on_read_complete) {

-	    (*h->cb.on_read_complete)(h, 

-                                      (pj_ioqueue_op_key_t*)read_op,

-                                      bytes_read);

-        }

-

-    } else {

-        pj_mutex_unlock(h->mutex);

-    }

-}

-

-

-void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, 

-                                       pj_ioqueue_key_t *h )

-{

-    pj_mutex_lock(h->mutex);

-

-    if (!h->connecting) {

-        /* It is possible that more than one thread was woken up, thus

-         * the remaining thread will see h->connecting as zero because

-         * it has been processed by other thread.

-         */

-        pj_mutex_unlock(h->mutex);

-        return;

-    }

-

-    /* Clear operation. */

-    h->connecting = 0;

-

-    pj_mutex_unlock(h->mutex);

-

-    ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);

-    ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);

-

-    /* Call callback. */

-    if (h->cb.on_connect_complete)

-	(*h->cb.on_connect_complete)(h, -1);

-}

-

-/*

- * pj_ioqueue_recv()

- *

- * Start asynchronous recv() from the socket.

- */

-PJ_DEF(pj_status_t) pj_ioqueue_recv(  pj_ioqueue_key_t *key,

-                                      pj_ioqueue_op_key_t *op_key,

-				      void *buffer,

-				      pj_ssize_t *length,

-				      unsigned flags )

-{

-    pj_status_t status;

-    pj_ssize_t size;

-    struct read_operation *read_op;

-

-    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);

-    PJ_CHECK_STACK();

-

-    /* Try to see if there's data immediately available. 

-     */

-    size = *length;

-    status = pj_sock_recv(key->fd, buffer, &size, flags);

-    if (status == PJ_SUCCESS) {

-        /* Yes! Data is available! */

-        *length = size;

-        return PJ_SUCCESS;

-    } else {

-        /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report

-         * the error to caller.

-         */

-        if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))

-            return status;

-    }

-

-    /*

-     * No data is immediately available.

-     * Must schedule asynchronous operation to the ioqueue.

-     */

-    read_op = (struct read_operation*)op_key;

-

-    read_op->op = PJ_IOQUEUE_OP_RECV;

-    read_op->buf = buffer;

-    read_op->size = *length;

-    read_op->flags = flags;

-

-    pj_mutex_lock(key->mutex);

-    pj_list_insert_before(&key->read_list, read_op);

-    ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);

-    pj_mutex_unlock(key->mutex);

-

-    return PJ_EPENDING;

-}

-

-/*

- * pj_ioqueue_recvfrom()

- *

- * Start asynchronous recvfrom() from the socket.

- */

-PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,

-                                         pj_ioqueue_op_key_t *op_key,

-				         void *buffer,

-				         pj_ssize_t *length,

-                                         unsigned flags,

-				         pj_sockaddr_t *addr,

-				         int *addrlen)

-{

-    pj_status_t status;

-    pj_ssize_t size;

-    struct read_operation *read_op;

-

-    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);

-    PJ_CHECK_STACK();

-

-    /* Try to see if there's data immediately available. 

-     */

-    size = *length;

-    status = pj_sock_recvfrom(key->fd, buffer, &size, flags,

-                              addr, addrlen);

-    if (status == PJ_SUCCESS) {

-        /* Yes! Data is available! */

-        *length = size;

-        return PJ_SUCCESS;

-    } else {

-        /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report

-         * the error to caller.

-         */

-        if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))

-            return status;

-    }

-

-    /*

-     * No data is immediately available.

-     * Must schedule asynchronous operation to the ioqueue.

-     */

-    read_op = (struct read_operation*)op_key;

-

-    read_op->op = PJ_IOQUEUE_OP_RECV_FROM;

-    read_op->buf = buffer;

-    read_op->size = *length;

-    read_op->flags = flags;

-    read_op->rmt_addr = addr;

-    read_op->rmt_addrlen = addrlen;

-

-    pj_mutex_lock(key->mutex);

-    pj_list_insert_before(&key->read_list, read_op);

-    ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);

-    pj_mutex_unlock(key->mutex);

-

-    return PJ_EPENDING;

-}

-

-/*

- * pj_ioqueue_send()

- *

- * Start asynchronous send() to the descriptor.

- */

-PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,

-                                     pj_ioqueue_op_key_t *op_key,

-			             const void *data,

-			             pj_ssize_t *length,

-                                     unsigned flags)

-{

-    struct write_operation *write_op;

-    pj_status_t status;

-    pj_ssize_t sent;

-

-    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);

-    PJ_CHECK_STACK();

-

-    /* Fast track:

-     *   Try to send data immediately, only if there's no pending write!

-     * Note:

-     *  We are speculating that the list is empty here without properly

-     *  acquiring ioqueue's mutex first. This is intentional, to maximize

-     *  performance via parallelism.

-     *

-     *  This should be safe, because:

-     *      - by convention, we require caller to make sure that the

-     *        key is not unregistered while other threads are invoking

-     *        an operation on the same key.

-     *      - pj_list_empty() is safe to be invoked by multiple threads,

-     *        even when other threads are modifying the list.

-     */

-    if (pj_list_empty(&key->write_list)) {

-        /*

-         * See if data can be sent immediately.

-         */

-        sent = *length;

-        status = pj_sock_send(key->fd, data, &sent, flags);

-        if (status == PJ_SUCCESS) {

-            /* Success! */

-            *length = sent;

-            return PJ_SUCCESS;

-        } else {

-            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report

-             * the error to caller.

-             */

-            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {

-                return status;

-            }

-        }

-    }

-

-    /*

-     * Schedule asynchronous send.

-     */

-    write_op = (struct write_operation*)op_key;

-    write_op->op = PJ_IOQUEUE_OP_SEND;

-    write_op->buf = NULL;

-    write_op->size = *length;

-    write_op->written = 0;

-    write_op->flags = flags;

-    

-    pj_mutex_lock(key->mutex);

-    pj_list_insert_before(&key->write_list, write_op);

-    ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);

-    pj_mutex_unlock(key->mutex);

-

-    return PJ_EPENDING;

-}

-

-

-/*

- * pj_ioqueue_sendto()

- *

- * Start asynchronous write() to the descriptor.

- */

-PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,

-                                       pj_ioqueue_op_key_t *op_key,

-			               const void *data,

-			               pj_ssize_t *length,

-                                       unsigned flags,

-			               const pj_sockaddr_t *addr,

-			               int addrlen)

-{

-    struct write_operation *write_op;

-    pj_status_t status;

-    pj_ssize_t sent;

-

-    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);

-    PJ_CHECK_STACK();

-

-    /* Fast track:

-     *   Try to send data immediately, only if there's no pending write!

-     * Note:

-     *  We are speculating that the list is empty here without properly

-     *  acquiring ioqueue's mutex first. This is intentional, to maximize

-     *  performance via parallelism.

-     *

-     *  This should be safe, because:

-     *      - by convention, we require caller to make sure that the

-     *        key is not unregistered while other threads are invoking

-     *        an operation on the same key.

-     *      - pj_list_empty() is safe to be invoked by multiple threads,

-     *        even when other threads are modifying the list.

-     */

-    if (pj_list_empty(&key->write_list)) {

-        /*

-         * See if data can be sent immediately.

-         */

-        sent = *length;

-        status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);

-        if (status == PJ_SUCCESS) {

-            /* Success! */

-            *length = sent;

-            return PJ_SUCCESS;

-        } else {

-            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report

-             * the error to caller.

-             */

-            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {

-                return status;

-            }

-        }

-    }

-

-    /*

-     * Check that address storage can hold the address parameter.

-     */

-    PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG);

-

-    /*

-     * Schedule asynchronous send.

-     */

-    write_op = (struct write_operation*)op_key;

-    write_op->op = PJ_IOQUEUE_OP_SEND_TO;

-    write_op->buf = NULL;

-    write_op->size = *length;

-    write_op->written = 0;

-    write_op->flags = flags;

-    pj_memcpy(&write_op->rmt_addr, addr, addrlen);

-    write_op->rmt_addrlen = addrlen;

-    

-    pj_mutex_lock(key->mutex);

-    pj_list_insert_before(&key->write_list, write_op);

-    ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);

-    pj_mutex_unlock(key->mutex);

-

-    return PJ_EPENDING;

-}

-

-#if PJ_HAS_TCP

-/*

- * Initiate overlapped accept() operation.

- */

-PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,

-                                       pj_ioqueue_op_key_t *op_key,

-			               pj_sock_t *new_sock,

-			               pj_sockaddr_t *local,

-			               pj_sockaddr_t *remote,

-			               int *addrlen)

-{

-    struct accept_operation *accept_op;

-    pj_status_t status;

-

-    /* check parameters. All must be specified! */

-    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);

-

-    /* Fast track:

-     *  See if there's new connection available immediately.

-     */

-    if (pj_list_empty(&key->accept_list)) {

-        status = pj_sock_accept(key->fd, new_sock, remote, addrlen);

-        if (status == PJ_SUCCESS) {

-            /* Yes! New connection is available! */

-            if (local && addrlen) {

-                status = pj_sock_getsockname(*new_sock, local, addrlen);

-                if (status != PJ_SUCCESS) {

-                    pj_sock_close(*new_sock);

-                    *new_sock = PJ_INVALID_SOCKET;

-                    return status;

-                }

-            }

-            return PJ_SUCCESS;

-        } else {

-            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report

-             * the error to caller.

-             */

-            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {

-                return status;

-            }

-        }

-    }

-

-    /*

-     * No connection is available immediately.

-     * Schedule accept() operation to be completed when there is incoming

-     * connection available.

-     */

-    accept_op = (struct accept_operation*)op_key;

-

-    accept_op->op = PJ_IOQUEUE_OP_ACCEPT;

-    accept_op->accept_fd = new_sock;

-    accept_op->rmt_addr = remote;

-    accept_op->addrlen= addrlen;

-    accept_op->local_addr = local;

-

-    pj_mutex_lock(key->mutex);

-    pj_list_insert_before(&key->accept_list, accept_op);

-    ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);

-    pj_mutex_unlock(key->mutex);

-

-    return PJ_EPENDING;

-}

-

-/*

- * Initiate overlapped connect() operation (well, it's non-blocking actually,

- * since there's no overlapped version of connect()).

- */

-PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,

-					const pj_sockaddr_t *addr,

-					int addrlen )

-{

-    pj_status_t status;

-    

-    /* check parameters. All must be specified! */

-    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);

-

-    /* Check if socket has not been marked for connecting */

-    if (key->connecting != 0)

-        return PJ_EPENDING;

-    

-    status = pj_sock_connect(key->fd, addr, addrlen);

-    if (status == PJ_SUCCESS) {

-	/* Connected! */

-	return PJ_SUCCESS;

-    } else {

-	if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {

-	    /* Pending! */

-            pj_mutex_lock(key->mutex);

-	    key->connecting = PJ_TRUE;

-            ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);

-            ioqueue_add_to_set(key->ioqueue, key->fd, EXCEPTION_EVENT);

-            pj_mutex_unlock(key->mutex);

-	    return PJ_EPENDING;

-	} else {

-	    /* Error! */

-	    return status;

-	}

-    }

-}

-#endif	/* PJ_HAS_TCP */

-

+/* $Id$ */
+
+/*
+ * ioqueue_common_abs.c
+ *
+ * This contains common functionalities to emulate proactor pattern with
+ * various event dispatching mechanisms (e.g. select, epoll).
+ *
+ * This file will be included by the appropriate ioqueue implementation.
+ * This file is NOT supposed to be compiled as stand-alone source.
+ */
+
+static void ioqueue_init( pj_ioqueue_t *ioqueue )
+{
+    ioqueue->lock = NULL;
+    ioqueue->auto_delete_lock = 0;
+}
+
+static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
+{
+    if (ioqueue->auto_delete_lock && ioqueue->lock )
+        return pj_lock_destroy(ioqueue->lock);
+    else
+        return PJ_SUCCESS;
+}
+
+/*
+ * pj_ioqueue_set_lock()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue, 
+					 pj_lock_t *lock,
+					 pj_bool_t auto_delete )
+{
+    PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
+
+    if (ioqueue->auto_delete_lock && ioqueue->lock) {
+        pj_lock_destroy(ioqueue->lock);
+    }
+
+    ioqueue->lock = lock;
+    ioqueue->auto_delete_lock = auto_delete;
+
+    return PJ_SUCCESS;
+}
+
+static pj_status_t ioqueue_init_key( pj_pool_t *pool,
+                                     pj_ioqueue_t *ioqueue,
+                                     pj_ioqueue_key_t *key,
+                                     pj_sock_t sock,
+                                     void *user_data,
+                                     const pj_ioqueue_callback *cb)
+{
+    pj_status_t rc;
+    int optlen;
+
+    key->ioqueue = ioqueue;
+    key->fd = sock;
+    key->user_data = user_data;
+    pj_list_init(&key->read_list);
+    pj_list_init(&key->write_list);
+#if PJ_HAS_TCP
+    pj_list_init(&key->accept_list);
+#endif
+
+    /* Save callback. */
+    pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
+
+    /* Get socket type. When socket type is datagram, some optimization
+     * will be performed during send to allow parallel send operations.
+     */
+    optlen = sizeof(key->fd_type);
+    rc = pj_sock_getsockopt(sock, PJ_SOL_SOCKET, PJ_SO_TYPE,
+                            &key->fd_type, &optlen);
+    if (rc != PJ_SUCCESS)
+        key->fd_type = PJ_SOCK_STREAM;
+
+    /* Create mutex for the key. */
+    rc = pj_mutex_create_simple(pool, NULL, &key->mutex);
+    
+    return rc;
+}
+
+static void ioqueue_destroy_key( pj_ioqueue_key_t *key )
+{
+    pj_mutex_destroy(key->mutex);
+}
+
+/*
+ * pj_ioqueue_get_user_data()
+ *
+ * Obtain value associated with a key.
+ */
+PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
+{
+    PJ_ASSERT_RETURN(key != NULL, NULL);
+    return key->user_data;
+}
+
+/*
+ * pj_ioqueue_set_user_data()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
+                                              void *user_data,
+                                              void **old_data)
+{
+    PJ_ASSERT_RETURN(key, PJ_EINVAL);
+
+    if (old_data)
+        *old_data = key->user_data;
+    key->user_data = user_data;
+
+    return PJ_SUCCESS;
+}
+
+PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
+{
+    return !pj_list_empty(&key->write_list);
+}
+
+PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
+{
+    return !pj_list_empty(&key->read_list);
+}
+
+PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
+{
+#if PJ_HAS_TCP
+    return !pj_list_empty(&key->accept_list);
+#else
+    return 0;
+#endif
+}
+
+PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
+{
+    return key->connecting;
+}
+
+
+/*
+ * ioqueue_dispatch_event()
+ *
+ * Report occurence of an event in the key to be processed by the
+ * framework.
+ */
+void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
+{
+    /* Lock the key. */
+    pj_mutex_lock(h->mutex);
+
+#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
+    if (h->connecting) {
+	/* Completion of connect() operation */
+	pj_ssize_t bytes_transfered;
+
+	/* Clear operation. */
+	h->connecting = 0;
+
+        ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
+        ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
+
+        /* Unlock; from this point we don't need to hold key's mutex. */
+        pj_mutex_unlock(h->mutex);
+
+#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
+	/* from connect(2): 
+	 * On Linux, use getsockopt to read the SO_ERROR option at
+	 * level SOL_SOCKET to determine whether connect() completed
+	 * successfully (if SO_ERROR is zero).
+	 */
+	int value;
+	socklen_t vallen = sizeof(value);
+	int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR, 
+                               &value, &vallen);
+	if (gs_rc != 0) {
+	    /* Argh!! What to do now??? 
+	     * Just indicate that the socket is connected. The
+	     * application will get error as soon as it tries to use
+	     * the socket to send/receive.
+	     */
+	    bytes_transfered = 0;
+	} else {
+            bytes_transfered = value;
+	}
+#elif defined(PJ_WIN32) && PJ_WIN32!=0
+	bytes_transfered = 0; /* success */
+#else
+	/* Excellent information in D.J. Bernstein page:
+	 * http://cr.yp.to/docs/connect.html
+	 *
+	 * Seems like the most portable way of detecting connect()
+	 * failure is to call getpeername(). If socket is connected,
+	 * getpeername() will return 0. If the socket is not connected,
+	 * it will return ENOTCONN, and read(fd, &ch, 1) will produce
+	 * the right errno through error slippage. This is a combination
+	 * of suggestions from Douglas C. Schmidt and Ken Keys.
+	 */
+	int gp_rc;
+	struct sockaddr_in addr;
+	socklen_t addrlen = sizeof(addr);
+
+	gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
+	bytes_transfered = gp_rc;
+#endif
+
+	/* Call callback. */
+        if (h->cb.on_connect_complete)
+	    (*h->cb.on_connect_complete)(h, bytes_transfered);
+
+        /* Done. */
+
+    } else 
+#endif /* PJ_HAS_TCP */
+    if (key_has_pending_write(h)) {
+	/* Socket is writable. */
+        struct write_operation *write_op;
+        pj_ssize_t sent;
+        pj_status_t send_rc;
+
+        /* Get the first in the queue. */
+        write_op = h->write_list.next;
+
+        /* For datagrams, we can remove the write_op from the list
+         * so that send() can work in parallel.
+         */
+        if (h->fd_type == PJ_SOCK_DGRAM) {
+            pj_list_erase(write_op);
+            if (pj_list_empty(&h->write_list))
+                ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
+
+            pj_mutex_unlock(h->mutex);
+        }
+
+        /* Send the data. 
+         * Unfortunately we must do this while holding key's mutex, thus
+         * preventing parallel write on a single key.. :-((
+         */
+        sent = write_op->size - write_op->written;
+        if (write_op->op == PJ_IOQUEUE_OP_SEND) {
+            send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
+                                   &sent, write_op->flags);
+        } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
+            send_rc = pj_sock_sendto(h->fd, 
+                                     write_op->buf+write_op->written,
+                                     &sent, write_op->flags,
+                                     &write_op->rmt_addr, 
+                                     write_op->rmt_addrlen);
+        } else {
+            pj_assert(!"Invalid operation type!");
+            send_rc = PJ_EBUG;
+        }
+
+        if (send_rc == PJ_SUCCESS) {
+            write_op->written += sent;
+        } else {
+            pj_assert(send_rc > 0);
+            write_op->written = -send_rc;
+        }
+
+        /* Are we finished with this buffer? */
+        if (send_rc!=PJ_SUCCESS || 
+            write_op->written == (pj_ssize_t)write_op->size ||
+            h->fd_type == PJ_SOCK_DGRAM) 
+        {
+            if (h->fd_type != PJ_SOCK_DGRAM) {
+                /* Write completion of the whole stream. */
+                pj_list_erase(write_op);
+
+                /* Clear operation if there's no more data to send. */
+                if (pj_list_empty(&h->write_list))
+                    ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
+
+                /* No need to hold mutex anymore */
+                pj_mutex_unlock(h->mutex);
+            }
+
+	    /* Call callback. */
+            if (h->cb.on_write_complete) {
+	        (*h->cb.on_write_complete)(h, 
+                                           (pj_ioqueue_op_key_t*)write_op,
+                                           write_op->written);
+            }
+
+        } else {
+            pj_mutex_unlock(h->mutex);
+        }
+
+        /* Done. */
+    } else {
+        pj_assert(!"Descriptor is signaled but key "
+                   "has no pending operation!");
+
+        pj_mutex_unlock(h->mutex);
+    }
+}
+
+void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
+{
+    pj_status_t rc;
+
+    /* Lock the key. */
+    pj_mutex_lock(h->mutex);
+
+#   if PJ_HAS_TCP
+    if (!pj_list_empty(&h->accept_list)) {
+
+        struct accept_operation *accept_op;
+	
+        /* Get one accept operation from the list. */
+	accept_op = h->accept_list.next;
+        pj_list_erase(accept_op);
+
+	/* Clear bit in fdset if there is no more pending accept */
+        if (pj_list_empty(&h->accept_list))
+            ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
+
+        /* Unlock; from this point we don't need to hold key's mutex. */
+        pj_mutex_unlock(h->mutex);
+
+	rc=pj_sock_accept(h->fd, accept_op->accept_fd, 
+                          accept_op->rmt_addr, accept_op->addrlen);
+	if (rc==PJ_SUCCESS && accept_op->local_addr) {
+	    rc = pj_sock_getsockname(*accept_op->accept_fd, 
+                                     accept_op->local_addr,
+				     accept_op->addrlen);
+	}
+
+	/* Call callback. */
+        if (h->cb.on_accept_complete) {
+	    (*h->cb.on_accept_complete)(h, 
+                                        (pj_ioqueue_op_key_t*)accept_op,
+                                        *accept_op->accept_fd, rc);
+	}
+
+    }
+    else
+#   endif
+    if (key_has_pending_read(h)) {
+        struct read_operation *read_op;
+        pj_ssize_t bytes_read;
+
+        /* Get one pending read operation from the list. */
+        read_op = h->read_list.next;
+        pj_list_erase(read_op);
+
+        /* Clear fdset if there is no pending read. */
+        if (pj_list_empty(&h->read_list))
+            ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
+
+        /* Unlock; from this point we don't need to hold key's mutex. */
+        pj_mutex_unlock(h->mutex);
+
+        bytes_read = read_op->size;
+
+	if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {
+	    rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0,
+				  read_op->rmt_addr, 
+                                  read_op->rmt_addrlen);
+	} else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) {
+	    rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
+        } else {
+            pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
+            /*
+             * User has specified pj_ioqueue_read().
+             * On Win32, we should do ReadFile(). But because we got
+             * here because of select() anyway, user must have put a
+             * socket descriptor on h->fd, which in this case we can
+             * just call pj_sock_recv() instead of ReadFile().
+             * On Unix, user may put a file in h->fd, so we'll have
+             * to call read() here.
+             * This may not compile on systems which doesn't have 
+             * read(). That's why we only specify PJ_LINUX here so
+             * that error is easier to catch.
+             */
+#	    if defined(PJ_WIN32) && PJ_WIN32 != 0
+                rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
+                //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
+                //              &bytes_read, NULL);
+#           elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
+                bytes_read = read(h->fd, read_op->buf, bytes_read);
+                rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
+#	    elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
+                bytes_read = sys_read(h->fd, read_op->buf, bytes_read);
+                rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
+#           else
+#               error "Implement read() for this platform!"
+#           endif
+        }
+	
+	if (rc != PJ_SUCCESS) {
+#	    if defined(PJ_WIN32) && PJ_WIN32 != 0
+	    /* On Win32, for UDP, WSAECONNRESET on the receive side 
+	     * indicates that previous sending has triggered ICMP Port 
+	     * Unreachable message.
+	     * But we wouldn't know at this point which one of previous 
+	     * key that has triggered the error, since UDP socket can
+	     * be shared!
+	     * So we'll just ignore it!
+	     */
+
+	    if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
+		//PJ_LOG(4,(THIS_FILE, 
+                //          "Ignored ICMP port unreach. on key=%p", h));
+	    }
+#	    endif
+
+            /* In any case we would report this to caller. */
+            bytes_read = -rc;
+	}
+
+	/* Call callback. */
+        if (h->cb.on_read_complete) {
+	    (*h->cb.on_read_complete)(h, 
+                                      (pj_ioqueue_op_key_t*)read_op,
+                                      bytes_read);
+        }
+
+    } else {
+        pj_mutex_unlock(h->mutex);
+    }
+}
+
+
+void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, 
+                                       pj_ioqueue_key_t *h )
+{
+    pj_mutex_lock(h->mutex);
+
+    if (!h->connecting) {
+        /* It is possible that more than one thread was woken up, thus
+         * the remaining thread will see h->connecting as zero because
+         * it has been processed by other thread.
+         */
+        pj_mutex_unlock(h->mutex);
+        return;
+    }
+
+    /* Clear operation. */
+    h->connecting = 0;
+
+    pj_mutex_unlock(h->mutex);
+
+    ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
+    ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
+
+    /* Call callback. */
+    if (h->cb.on_connect_complete)
+	(*h->cb.on_connect_complete)(h, -1);
+}
+
+/*
+ * pj_ioqueue_recv()
+ *
+ * Start asynchronous recv() from the socket.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_recv(  pj_ioqueue_key_t *key,
+                                      pj_ioqueue_op_key_t *op_key,
+				      void *buffer,
+				      pj_ssize_t *length,
+				      unsigned flags )
+{
+    pj_status_t status;
+    pj_ssize_t size;
+    struct read_operation *read_op;
+
+    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
+    PJ_CHECK_STACK();
+
+    /* Try to see if there's data immediately available. 
+     */
+    size = *length;
+    status = pj_sock_recv(key->fd, buffer, &size, flags);
+    if (status == PJ_SUCCESS) {
+        /* Yes! Data is available! */
+        *length = size;
+        return PJ_SUCCESS;
+    } else {
+        /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+         * the error to caller.
+         */
+        if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
+            return status;
+    }
+
+    /*
+     * No data is immediately available.
+     * Must schedule asynchronous operation to the ioqueue.
+     */
+    read_op = (struct read_operation*)op_key;
+
+    read_op->op = PJ_IOQUEUE_OP_RECV;
+    read_op->buf = buffer;
+    read_op->size = *length;
+    read_op->flags = flags;
+
+    pj_mutex_lock(key->mutex);
+    pj_list_insert_before(&key->read_list, read_op);
+    ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
+    pj_mutex_unlock(key->mutex);
+
+    return PJ_EPENDING;
+}
+
+/*
+ * pj_ioqueue_recvfrom()
+ *
+ * Start asynchronous recvfrom() from the socket.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
+                                         pj_ioqueue_op_key_t *op_key,
+				         void *buffer,
+				         pj_ssize_t *length,
+                                         unsigned flags,
+				         pj_sockaddr_t *addr,
+				         int *addrlen)
+{
+    pj_status_t status;
+    pj_ssize_t size;
+    struct read_operation *read_op;
+
+    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
+    PJ_CHECK_STACK();
+
+    /* Try to see if there's data immediately available. 
+     */
+    size = *length;
+    status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
+                              addr, addrlen);
+    if (status == PJ_SUCCESS) {
+        /* Yes! Data is available! */
+        *length = size;
+        return PJ_SUCCESS;
+    } else {
+        /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+         * the error to caller.
+         */
+        if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
+            return status;
+    }
+
+    /*
+     * No data is immediately available.
+     * Must schedule asynchronous operation to the ioqueue.
+     */
+    read_op = (struct read_operation*)op_key;
+
+    read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
+    read_op->buf = buffer;
+    read_op->size = *length;
+    read_op->flags = flags;
+    read_op->rmt_addr = addr;
+    read_op->rmt_addrlen = addrlen;
+
+    pj_mutex_lock(key->mutex);
+    pj_list_insert_before(&key->read_list, read_op);
+    ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
+    pj_mutex_unlock(key->mutex);
+
+    return PJ_EPENDING;
+}
+
+/*
+ * pj_ioqueue_send()
+ *
+ * Start asynchronous send() to the descriptor.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
+                                     pj_ioqueue_op_key_t *op_key,
+			             const void *data,
+			             pj_ssize_t *length,
+                                     unsigned flags)
+{
+    struct write_operation *write_op;
+    pj_status_t status;
+    pj_ssize_t sent;
+
+    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
+    PJ_CHECK_STACK();
+
+    /* Fast track:
+     *   Try to send data immediately, only if there's no pending write!
+     * Note:
+     *  We are speculating that the list is empty here without properly
+     *  acquiring ioqueue's mutex first. This is intentional, to maximize
+     *  performance via parallelism.
+     *
+     *  This should be safe, because:
+     *      - by convention, we require caller to make sure that the
+     *        key is not unregistered while other threads are invoking
+     *        an operation on the same key.
+     *      - pj_list_empty() is safe to be invoked by multiple threads,
+     *        even when other threads are modifying the list.
+     */
+    if (pj_list_empty(&key->write_list)) {
+        /*
+         * See if data can be sent immediately.
+         */
+        sent = *length;
+        status = pj_sock_send(key->fd, data, &sent, flags);
+        if (status == PJ_SUCCESS) {
+            /* Success! */
+            *length = sent;
+            return PJ_SUCCESS;
+        } else {
+            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+             * the error to caller.
+             */
+            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
+                return status;
+            }
+        }
+    }
+
+    /*
+     * Schedule asynchronous send.
+     */
+    write_op = (struct write_operation*)op_key;
+    write_op->op = PJ_IOQUEUE_OP_SEND;
+    write_op->buf = NULL;
+    write_op->size = *length;
+    write_op->written = 0;
+    write_op->flags = flags;
+    
+    pj_mutex_lock(key->mutex);
+    pj_list_insert_before(&key->write_list, write_op);
+    ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
+    pj_mutex_unlock(key->mutex);
+
+    return PJ_EPENDING;
+}
+
+
+/*
+ * pj_ioqueue_sendto()
+ *
+ * Start asynchronous write() to the descriptor.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
+                                       pj_ioqueue_op_key_t *op_key,
+			               const void *data,
+			               pj_ssize_t *length,
+                                       unsigned flags,
+			               const pj_sockaddr_t *addr,
+			               int addrlen)
+{
+    struct write_operation *write_op;
+    pj_status_t status;
+    pj_ssize_t sent;
+
+    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
+    PJ_CHECK_STACK();
+
+    /* Fast track:
+     *   Try to send data immediately, only if there's no pending write!
+     * Note:
+     *  We are speculating that the list is empty here without properly
+     *  acquiring ioqueue's mutex first. This is intentional, to maximize
+     *  performance via parallelism.
+     *
+     *  This should be safe, because:
+     *      - by convention, we require caller to make sure that the
+     *        key is not unregistered while other threads are invoking
+     *        an operation on the same key.
+     *      - pj_list_empty() is safe to be invoked by multiple threads,
+     *        even when other threads are modifying the list.
+     */
+    if (pj_list_empty(&key->write_list)) {
+        /*
+         * See if data can be sent immediately.
+         */
+        sent = *length;
+        status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
+        if (status == PJ_SUCCESS) {
+            /* Success! */
+            *length = sent;
+            return PJ_SUCCESS;
+        } else {
+            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+             * the error to caller.
+             */
+            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
+                return status;
+            }
+        }
+    }
+
+    /*
+     * Check that address storage can hold the address parameter.
+     */
+    PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG);
+
+    /*
+     * Schedule asynchronous send.
+     */
+    write_op = (struct write_operation*)op_key;
+    write_op->op = PJ_IOQUEUE_OP_SEND_TO;
+    write_op->buf = NULL;
+    write_op->size = *length;
+    write_op->written = 0;
+    write_op->flags = flags;
+    pj_memcpy(&write_op->rmt_addr, addr, addrlen);
+    write_op->rmt_addrlen = addrlen;
+    
+    pj_mutex_lock(key->mutex);
+    pj_list_insert_before(&key->write_list, write_op);
+    ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
+    pj_mutex_unlock(key->mutex);
+
+    return PJ_EPENDING;
+}
+
+#if PJ_HAS_TCP
+/*
+ * Initiate overlapped accept() operation.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
+                                       pj_ioqueue_op_key_t *op_key,
+			               pj_sock_t *new_sock,
+			               pj_sockaddr_t *local,
+			               pj_sockaddr_t *remote,
+			               int *addrlen)
+{
+    struct accept_operation *accept_op;
+    pj_status_t status;
+
+    /* check parameters. All must be specified! */
+    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
+
+    /* Fast track:
+     *  See if there's new connection available immediately.
+     */
+    if (pj_list_empty(&key->accept_list)) {
+        status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
+        if (status == PJ_SUCCESS) {
+            /* Yes! New connection is available! */
+            if (local && addrlen) {
+                status = pj_sock_getsockname(*new_sock, local, addrlen);
+                if (status != PJ_SUCCESS) {
+                    pj_sock_close(*new_sock);
+                    *new_sock = PJ_INVALID_SOCKET;
+                    return status;
+                }
+            }
+            return PJ_SUCCESS;
+        } else {
+            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+             * the error to caller.
+             */
+            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
+                return status;
+            }
+        }
+    }
+
+    /*
+     * No connection is available immediately.
+     * Schedule accept() operation to be completed when there is incoming
+     * connection available.
+     */
+    accept_op = (struct accept_operation*)op_key;
+
+    accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
+    accept_op->accept_fd = new_sock;
+    accept_op->rmt_addr = remote;
+    accept_op->addrlen= addrlen;
+    accept_op->local_addr = local;
+
+    pj_mutex_lock(key->mutex);
+    pj_list_insert_before(&key->accept_list, accept_op);
+    ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
+    pj_mutex_unlock(key->mutex);
+
+    return PJ_EPENDING;
+}
+
+/*
+ * Initiate overlapped connect() operation (well, it's non-blocking actually,
+ * since there's no overlapped version of connect()).
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
+					const pj_sockaddr_t *addr,
+					int addrlen )
+{
+    pj_status_t status;
+    
+    /* check parameters. All must be specified! */
+    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
+
+    /* Check if socket has not been marked for connecting */
+    if (key->connecting != 0)
+        return PJ_EPENDING;
+    
+    status = pj_sock_connect(key->fd, addr, addrlen);
+    if (status == PJ_SUCCESS) {
+	/* Connected! */
+	return PJ_SUCCESS;
+    } else {
+	if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
+	    /* Pending! */
+            pj_mutex_lock(key->mutex);
+	    key->connecting = PJ_TRUE;
+            ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
+            ioqueue_add_to_set(key->ioqueue, key->fd, EXCEPTION_EVENT);
+            pj_mutex_unlock(key->mutex);
+	    return PJ_EPENDING;
+	} else {
+	    /* Error! */
+	    return status;
+	}
+    }
+}
+#endif	/* PJ_HAS_TCP */
+
diff --git a/pjlib/src/pj/ioqueue_common_abs.h b/pjlib/src/pj/ioqueue_common_abs.h
index c6fc1ff..1902ff4 100644
--- a/pjlib/src/pj/ioqueue_common_abs.h
+++ b/pjlib/src/pj/ioqueue_common_abs.h
@@ -1,107 +1,108 @@
-/* $Id */

-

-/* ioqueue_common_abs.h

- *

- * This file contains private declarations for abstracting various 

- * event polling/dispatching mechanisms (e.g. select, poll, epoll) 

- * to the ioqueue. 

- */

-

-#include <pj/list.h>

-

-/*

- * The select ioqueue relies on socket functions (pj_sock_xxx()) to return

- * the correct error code.

- */

-#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)

-#   error "Error reporting must be enabled for this function to work!"

-#endif

-

-

-struct generic_operation

-{

-    PJ_DECL_LIST_MEMBER(struct generic_operation);

-    pj_ioqueue_operation_e  op;

-};

-

-struct read_operation

-{

-    PJ_DECL_LIST_MEMBER(struct read_operation);

-    pj_ioqueue_operation_e  op;

-

-    void		   *buf;

-    pj_size_t		    size;

-    unsigned                flags;

-    pj_sockaddr_t	   *rmt_addr;

-    int			   *rmt_addrlen;

-};

-

-struct write_operation

-{

-    PJ_DECL_LIST_MEMBER(struct write_operation);

-    pj_ioqueue_operation_e  op;

-

-    char		   *buf;

-    pj_size_t		    size;

-    pj_ssize_t              written;

-    unsigned                flags;

-    pj_sockaddr_in	    rmt_addr;

-    int			    rmt_addrlen;

-};

-

-#if PJ_HAS_TCP

-struct accept_operation

-{

-    PJ_DECL_LIST_MEMBER(struct accept_operation);

-    pj_ioqueue_operation_e  op;

-

-    pj_sock_t              *accept_fd;

-    pj_sockaddr_t	   *local_addr;

-    pj_sockaddr_t	   *rmt_addr;

-    int			   *addrlen;

-};

-#endif

-

-union operation_key

-{

-    struct generic_operation generic;

-    struct read_operation    read;

-    struct write_operation   write;

-#if PJ_HAS_TCP

-    struct accept_operation  accept;

-#endif

-};

-

-#define DECLARE_COMMON_KEY                          \

-    PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);   \

-    pj_ioqueue_t           *ioqueue;                \

-    pj_mutex_t             *mutex;                  \

-    pj_sock_t		    fd;                     \

-    int                     fd_type;                \

-    void		   *user_data;              \

-    pj_ioqueue_callback	    cb;                     \

-    int                     connecting;             \

-    struct read_operation   read_list;              \

-    struct write_operation  write_list;             \

-    struct accept_operation accept_list;

-

-

-#define DECLARE_COMMON_IOQUEUE                      \

-    pj_lock_t          *lock;                       \

-    pj_bool_t           auto_delete_lock;

-

-

-enum ioqueue_event_type

-{

-    NO_EVENT,

-    READABLE_EVENT,

-    WRITEABLE_EVENT,

-    EXCEPTION_EVENT,

-};

-

-static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,

-                                pj_sock_t fd,

-                                enum ioqueue_event_type event_type );

-static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,

-                                     pj_sock_t fd, 

-                                     enum ioqueue_event_type event_type);

+/* $Id */
+
+/* ioqueue_common_abs.h
+ *
+ * This file contains private declarations for abstracting various 
+ * event polling/dispatching mechanisms (e.g. select, poll, epoll) 
+ * to the ioqueue. 
+ */
+
+#include <pj/list.h>
+
+/*
+ * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
+ * the correct error code.
+ */
+#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
+#   error "Proper error reporting must be enabled for ioqueue to work!"
+#endif
+
+
+struct generic_operation
+{
+    PJ_DECL_LIST_MEMBER(struct generic_operation);
+    pj_ioqueue_operation_e  op;
+};
+
+struct read_operation
+{
+    PJ_DECL_LIST_MEMBER(struct read_operation);
+    pj_ioqueue_operation_e  op;
+
+    void		   *buf;
+    pj_size_t		    size;
+    unsigned                flags;
+    pj_sockaddr_t	   *rmt_addr;
+    int			   *rmt_addrlen;
+};
+
+struct write_operation
+{
+    PJ_DECL_LIST_MEMBER(struct write_operation);
+    pj_ioqueue_operation_e  op;
+
+    char		   *buf;
+    pj_size_t		    size;
+    pj_ssize_t              written;
+    unsigned                flags;
+    pj_sockaddr_in	    rmt_addr;
+    int			    rmt_addrlen;
+};
+
+#if PJ_HAS_TCP
+struct accept_operation
+{
+    PJ_DECL_LIST_MEMBER(struct accept_operation);
+    pj_ioqueue_operation_e  op;
+
+    pj_sock_t              *accept_fd;
+    pj_sockaddr_t	   *local_addr;
+    pj_sockaddr_t	   *rmt_addr;
+    int			   *addrlen;
+};
+#endif
+
+union operation_key
+{
+    struct generic_operation generic;
+    struct read_operation    read;
+    struct write_operation   write;
+#if PJ_HAS_TCP
+    struct accept_operation  accept;
+#endif
+};
+
+#define DECLARE_COMMON_KEY                          \
+    PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);   \
+    pj_ioqueue_t           *ioqueue;                \
+    pj_mutex_t             *mutex;                  \
+    pj_sock_t		    fd;                     \
+    int                     fd_type;                \
+    void		   *user_data;              \
+    pj_ioqueue_callback	    cb;                     \
+    int                     connecting;             \
+    struct read_operation   read_list;              \
+    struct write_operation  write_list;             \
+    struct accept_operation accept_list;
+
+
+#define DECLARE_COMMON_IOQUEUE                      \
+    pj_lock_t          *lock;                       \
+    pj_bool_t           auto_delete_lock;
+
+
+enum ioqueue_event_type
+{
+    NO_EVENT,
+    READABLE_EVENT,
+    WRITEABLE_EVENT,
+    EXCEPTION_EVENT,
+};
+
+static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
+                                pj_sock_t fd,
+                                enum ioqueue_event_type event_type );
+static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
+                                     pj_sock_t fd, 
+                                     enum ioqueue_event_type event_type);
+
diff --git a/pjlib/src/pj/ioqueue_epoll.c b/pjlib/src/pj/ioqueue_epoll.c
index 24f9bfb..aa01253 100644
--- a/pjlib/src/pj/ioqueue_epoll.c
+++ b/pjlib/src/pj/ioqueue_epoll.c
@@ -1,5 +1,4 @@
 /* $Id$
- *
  */
 /*
  * ioqueue_epoll.c
@@ -30,7 +29,7 @@
 
 #   define epoll_data		data.ptr
 #   define epoll_data_type	void*
-#   define ioctl_val_type	unsigned long*
+#   define ioctl_val_type	unsigned long
 #   define getsockopt_val_ptr	int*
 #   define os_getsockopt	getsockopt
 #   define os_ioctl		ioctl
@@ -126,51 +125,20 @@
 
 #define THIS_FILE   "ioq_epoll"
 
-#define PJ_IOQUEUE_IS_READ_OP(op)   ((op & PJ_IOQUEUE_OP_READ) || \
-                                     (op & PJ_IOQUEUE_OP_RECV) || \
-                                     (op & PJ_IOQUEUE_OP_RECV_FROM))
-#define PJ_IOQUEUE_IS_WRITE_OP(op)  ((op & PJ_IOQUEUE_OP_WRITE) || \
-                                     (op & PJ_IOQUEUE_OP_SEND) || \
-                                     (op & PJ_IOQUEUE_OP_SEND_TO))
-
-
-#if PJ_HAS_TCP
-#  define PJ_IOQUEUE_IS_ACCEPT_OP(op)	(op & PJ_IOQUEUE_OP_ACCEPT)
-#  define PJ_IOQUEUE_IS_CONNECT_OP(op)	(op & PJ_IOQUEUE_OP_CONNECT)
-#else
-#  define PJ_IOQUEUE_IS_ACCEPT_OP(op)	0
-#  define PJ_IOQUEUE_IS_CONNECT_OP(op)	0
-#endif
-
-
 //#define TRACE_(expr) PJ_LOG(3,expr)
 #define TRACE_(expr)
 
+/*
+ * Include common ioqueue abstraction.
+ */
+#include "ioqueue_common_abs.h"
 
 /*
  * This describes each key.
  */
 struct pj_ioqueue_key_t
 {
-    PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t)
-    pj_sock_t		    fd;
-    pj_ioqueue_operation_e  op;
-    void		   *user_data;
-    pj_ioqueue_callback	    cb;
-
-    void		   *rd_buf;
-    unsigned                rd_flags;
-    pj_size_t		    rd_buflen;
-    void		   *wr_buf;
-    pj_size_t		    wr_buflen;
-
-    pj_sockaddr_t	   *rmt_addr;
-    int			   *rmt_addrlen;
-
-    pj_sockaddr_t	   *local_addr;
-    int			   *local_addrlen;
-
-    pj_sock_t		   *accept_fd;
+    DECLARE_COMMON_KEY
 };
 
 /*
@@ -178,13 +146,18 @@
  */
 struct pj_ioqueue_t
 {
-    pj_lock_t          *lock;
-    pj_bool_t           auto_delete_lock;
+    DECLARE_COMMON_IOQUEUE
+
     unsigned		max, count;
     pj_ioqueue_key_t	hlist;
     int			epfd;
 };
 
+/* Include implementation for common abstraction after we declare
+ * pj_ioqueue_key_t and pj_ioqueue_t.
+ */
+#include "ioqueue_common_abs.c"
+
 /*
  * pj_ioqueue_create()
  *
@@ -192,37 +165,45 @@
  */
 PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, 
                                        pj_size_t max_fd,
-                                       int max_threads,
                                        pj_ioqueue_t **p_ioqueue)
 {
-    pj_ioqueue_t *ioque;
+    pj_ioqueue_t *ioqueue;
     pj_status_t rc;
+    pj_lock_t *lock;
 
-    PJ_UNUSED_ARG(max_threads);
+    /* Check that arguments are valid. */
+    PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && 
+                     max_fd > 0, PJ_EINVAL);
 
-    if (max_fd > PJ_IOQUEUE_MAX_HANDLES) {
-        pj_assert(!"max_fd too large");
-	return PJ_EINVAL;
-    }
+    /* Check that size of pj_ioqueue_op_key_t is sufficient */
+    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
+                     sizeof(union operation_key), PJ_EBUG);
 
-    ioque = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
-    ioque->max = max_fd;
-    ioque->count = 0;
-    pj_list_init(&ioque->hlist);
+    ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
 
-    rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioque->lock);
+    ioqueue_init(ioqueue);
+
+    ioqueue->max = max_fd;
+    ioqueue->count = 0;
+    pj_list_init(&ioqueue->hlist);
+
+    rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
     if (rc != PJ_SUCCESS)
 	return rc;
 
-    ioque->auto_delete_lock = PJ_TRUE;
-    ioque->epfd = os_epoll_create(max_fd);
-    if (ioque->epfd < 0) {
+    rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
+    if (rc != PJ_SUCCESS)
+        return rc;
+
+    ioqueue->epfd = os_epoll_create(max_fd);
+    if (ioqueue->epfd < 0) {
+	ioqueue_destroy(ioqueue);
 	return PJ_RETURN_OS_ERROR(pj_get_native_os_error());
     }
     
-    PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioque));
+    PJ_LOG(4, ("pjlib", "epoll I/O Queue created (%p)", ioqueue));
 
-    *p_ioqueue = ioque;
+    *p_ioqueue = ioqueue;
     return PJ_SUCCESS;
 }
 
@@ -231,47 +212,24 @@
  *
  * Destroy ioqueue.
  */
-PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioque)
+PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
 {
-    PJ_ASSERT_RETURN(ioque, PJ_EINVAL);
-    PJ_ASSERT_RETURN(ioque->epfd > 0, PJ_EINVALIDOP);
+    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
+    PJ_ASSERT_RETURN(ioqueue->epfd > 0, PJ_EINVALIDOP);
 
-    pj_lock_acquire(ioque->lock);
-    os_close(ioque->epfd);
-    ioque->epfd = 0;
-    if (ioque->auto_delete_lock)
-        pj_lock_destroy(ioque->lock);
-    
-    return PJ_SUCCESS;
+    pj_lock_acquire(ioqueue->lock);
+    os_close(ioqueue->epfd);
+    ioqueue->epfd = 0;
+    return ioqueue_destroy(ioqueue);
 }
 
 /*
- * pj_ioqueue_set_lock()
- */
-PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque, 
-					 pj_lock_t *lock,
-					 pj_bool_t auto_delete )
-{
-    PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL);
-
-    if (ioque->auto_delete_lock) {
-        pj_lock_destroy(ioque->lock);
-    }
-
-    ioque->lock = lock;
-    ioque->auto_delete_lock = auto_delete;
-
-    return PJ_SUCCESS;
-}
-
-
-/*
  * pj_ioqueue_register_sock()
  *
  * Register a socket to ioqueue.
  */
 PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
-					      pj_ioqueue_t *ioque,
+					      pj_ioqueue_t *ioqueue,
 					      pj_sock_t sock,
 					      void *user_data,
 					      const pj_ioqueue_callback *cb,
@@ -283,12 +241,12 @@
     int status;
     pj_status_t rc = PJ_SUCCESS;
     
-    PJ_ASSERT_RETURN(pool && ioque && sock != PJ_INVALID_SOCKET &&
+    PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
                      cb && p_key, PJ_EINVAL);
 
-    pj_lock_acquire(ioque->lock);
+    pj_lock_acquire(ioqueue->lock);
 
-    if (ioque->count >= ioque->max) {
+    if (ioqueue->count >= ioqueue->max) {
         rc = PJ_ETOOMANY;
 	TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: too many files"));
 	goto on_return;
@@ -305,16 +263,19 @@
 
     /* Create key. */
     key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
-    key->fd = sock;
-    key->user_data = user_data;
-    pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
+    rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
+    if (rc != PJ_SUCCESS) {
+	key = NULL;
+	goto on_return;
+    }
 
     /* os_epoll_ctl. */
     ev.events = EPOLLIN | EPOLLOUT | EPOLLERR;
     ev.epoll_data = (epoll_data_type)key;
-    status = os_epoll_ctl(ioque->epfd, EPOLL_CTL_ADD, sock, &ev);
+    status = os_epoll_ctl(ioqueue->epfd, EPOLL_CTL_ADD, sock, &ev);
     if (status < 0) {
 	rc = pj_get_os_error();
+	key = NULL;
 	TRACE_((THIS_FILE, 
                 "pj_ioqueue_register_sock error: os_epoll_ctl rc=%d", 
                 status));
@@ -322,12 +283,12 @@
     }
     
     /* Register */
-    pj_list_insert_before(&ioque->hlist, key);
-    ++ioque->count;
+    pj_list_insert_before(&ioqueue->hlist, key);
+    ++ioqueue->count;
 
 on_return:
     *p_key = key;
-    pj_lock_release(ioque->lock);
+    pj_lock_release(ioqueue->lock);
     
     return rc;
 }
@@ -337,179 +298,116 @@
  *
  * Unregister handle from ioqueue.
  */
-PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque,
-					   pj_ioqueue_key_t *key)
+PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
 {
+    pj_ioqueue_t *ioqueue;
     struct epoll_event ev;
     int status;
     
-    PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL);
+    PJ_ASSERT_RETURN(key != NULL, PJ_EINVAL);
 
-    pj_lock_acquire(ioque->lock);
+    ioqueue = key->ioqueue;
+    pj_lock_acquire(ioqueue->lock);
 
-    pj_assert(ioque->count > 0);
-    --ioque->count;
+    pj_assert(ioqueue->count > 0);
+    --ioqueue->count;
     pj_list_erase(key);
 
     ev.events = 0;
     ev.epoll_data = (epoll_data_type)key;
-    status = os_epoll_ctl( ioque->epfd, EPOLL_CTL_DEL, key->fd, &ev);
+    status = os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_DEL, key->fd, &ev);
     if (status != 0) {
 	pj_status_t rc = pj_get_os_error();
-	pj_lock_release(ioque->lock);
+	pj_lock_release(ioqueue->lock);
 	return rc;
     }
 
-    pj_lock_release(ioque->lock);
+    pj_lock_release(ioqueue->lock);
+
+    /* Destroy the key. */
+    ioqueue_destroy_key(key);
+
     return PJ_SUCCESS;
 }
 
-/*
- * pj_ioqueue_get_user_data()
- *
- * Obtain value associated with a key.
+/* ioqueue_remove_from_set()
+ * This function is called from ioqueue_dispatch_event() to instruct
+ * the ioqueue to remove the specified descriptor from ioqueue's descriptor
+ * set for the specified event.
  */
-PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
+static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
+                                     pj_sock_t fd, 
+                                     enum ioqueue_event_type event_type)
 {
-    PJ_ASSERT_RETURN(key != NULL, NULL);
-    return key->user_data;
 }
 
+/*
+ * ioqueue_add_to_set()
+ * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
+ * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
+ * set for the specified event.
+ */
+static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
+                                pj_sock_t fd,
+                                enum ioqueue_event_type event_type )
+{
+}
 
 /*
  * pj_ioqueue_poll()
  *
  */
-PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout)
+PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
 {
     int i, count, processed;
-    struct epoll_event events[16];
+    struct epoll_event events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
     int msec;
+    struct queue {
+	pj_ioqueue_key_t	*key;
+	enum ioqueue_event_type	 event_type;
+    } queue[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
     
     PJ_CHECK_STACK();
 
     msec = timeout ? PJ_TIME_VAL_MSEC(*timeout) : 9000;
     
-    count = os_epoll_wait( ioque->epfd, events, PJ_ARRAY_SIZE(events), msec);
+    count = os_epoll_wait( ioqueue->epfd, events, PJ_ARRAY_SIZE(events), msec);
     if (count <= 0)
 	return count;
 
     /* Lock ioqueue. */
-    pj_lock_acquire(ioque->lock);
+    pj_lock_acquire(ioqueue->lock);
 
-    processed = 0;
-
-    for (i=0; i<count; ++i) {
+    for (processed=0, i=0; i<count; ++i) {
 	pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type)
 				events[i].epoll_data;
-	pj_status_t rc;
 
 	/*
-	 * Check for completion of read operations.
+	 * Check readability.
 	 */
-	if ((events[i].events & EPOLLIN) && (PJ_IOQUEUE_IS_READ_OP(h->op))) {
-	    pj_ssize_t bytes_read = h->rd_buflen;
-
-	    if ((h->op & PJ_IOQUEUE_OP_RECV_FROM)) {
-	        rc = pj_sock_recvfrom( h->fd, h->rd_buf, &bytes_read, 0,
-				       h->rmt_addr, h->rmt_addrlen);
-	    } else if ((h->op & PJ_IOQUEUE_OP_RECV)) {
-	        rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0);
-	    } else {
-		bytes_read = os_read( h->fd, h->rd_buf, bytes_read);
-		rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
-	    }
-	    
-	    if (rc != PJ_SUCCESS) {
-	        bytes_read = -rc;
-	    }
-
-	    h->op &= ~(PJ_IOQUEUE_OP_READ | PJ_IOQUEUE_OP_RECV | 
-		       PJ_IOQUEUE_OP_RECV_FROM);
-
-	    /* Call callback. */
-	    (*h->cb.on_read_complete)(h, bytes_read);
-
-	    ++processed;
-	}
-	/*
-	 * Check for completion of accept() operation.
-	 */
-	else if ((events[i].events & EPOLLIN) &&
-		 (h->op & PJ_IOQUEUE_OP_ACCEPT)) 
-	{
-	    /* accept() must be the only operation specified on 
-	     * server socket 
-	     */
-	    pj_assert( h->op == PJ_IOQUEUE_OP_ACCEPT);
-
-	    rc = pj_sock_accept( h->fd, h->accept_fd, 
-			         h->rmt_addr, h->rmt_addrlen);
-	    if (rc==PJ_SUCCESS && h->local_addr) {
-		rc = pj_sock_getsockname(*h->accept_fd, h->local_addr, 
-				          h->local_addrlen);
-	    }
-
-	    h->op &= ~(PJ_IOQUEUE_OP_ACCEPT);
-
-	    /* Call callback. */
-	    (*h->cb.on_accept_complete)(h, *h->accept_fd, rc);
-	    
+	if ((events[i].events & EPOLLIN) && 
+	    (key_has_pending_read(h) || key_has_pending_accept(h))) {
+	    queue[processed].key = h;
+	    queue[processed].event_type = READABLE_EVENT;
 	    ++processed;
 	}
 
 	/*
-	 * Check for completion of write operations.
+	 * Check for writeability.
 	 */
-	if ((events[i].events & EPOLLOUT) && PJ_IOQUEUE_IS_WRITE_OP(h->op)) {
-	    /* Completion of write(), send(), or sendto() operation. */
-
-	    /* Clear operation. */
-	    h->op &= ~(PJ_IOQUEUE_OP_WRITE | PJ_IOQUEUE_OP_SEND | 
-                       PJ_IOQUEUE_OP_SEND_TO);
-
-	    /* Call callback. */
-	    /* All data must have been sent? */
-	    (*h->cb.on_write_complete)(h, h->wr_buflen);
-
+	if ((events[i].events & EPOLLOUT) && key_has_pending_write(h)) {
+	    queue[processed].key = h;
+	    queue[processed].event_type = WRITEABLE_EVENT;
 	    ++processed;
 	}
+
 #if PJ_HAS_TCP
 	/*
 	 * Check for completion of connect() operation.
 	 */
-	else if ((events[i].events & EPOLLOUT) && 
-		 (h->op & PJ_IOQUEUE_OP_CONNECT)) 
-	{
-	    /* Completion of connect() operation */
-	    pj_ssize_t bytes_transfered;
-
-	    /* from connect(2): 
-		* On Linux, use getsockopt to read the SO_ERROR option at
-		* level SOL_SOCKET to determine whether connect() completed
-		* successfully (if SO_ERROR is zero).
-		*/
-	    int value;
-	    socklen_t vallen = sizeof(value);
-	    int gs_rc = os_getsockopt(h->fd, SOL_SOCKET, SO_ERROR, 
-                                      (getsockopt_val_ptr)&value, &vallen);
-	    if (gs_rc != 0) {
-		/* Argh!! What to do now??? 
-		 * Just indicate that the socket is connected. The
-		 * application will get error as soon as it tries to use
-		 * the socket to send/receive.
-		 */
-		bytes_transfered = 0;
-	    } else {
-                bytes_transfered = value;
-	    }
-
-	    /* Clear operation. */
-	    h->op &= (~PJ_IOQUEUE_OP_CONNECT);
-
-	    /* Call callback. */
-	    (*h->cb.on_connect_complete)(h, bytes_transfered);
-
+	if ((events[i].events & EPOLLOUT) && (h->connecting)) {
+	    queue[processed].key = h;
+	    queue[processed].event_type = WRITEABLE_EVENT;
 	    ++processed;
 	}
 #endif /* PJ_HAS_TCP */
@@ -517,321 +415,32 @@
 	/*
 	 * Check for error condition.
 	 */
-	if (events[i].events & EPOLLERR) {
-	    if (h->op & PJ_IOQUEUE_OP_CONNECT) {
-		h->op &= ~PJ_IOQUEUE_OP_CONNECT;
-
-		/* Call callback. */
-		(*h->cb.on_connect_complete)(h, -1);
-
-		++processed;
-	    }
+	if (events[i].events & EPOLLERR && (h->connecting)) {
+	    queue[processed].key = h;
+	    queue[processed].event_type = EXCEPTION_EVENT;
+	    ++processed;
 	}
     }
-    
-    pj_lock_release(ioque->lock);
+    pj_lock_release(ioqueue->lock);
+
+    /* Now process the events. */
+    for (i=0; i<processed; ++i) {
+	switch (queue[i].event_type) {
+        case READABLE_EVENT:
+            ioqueue_dispatch_read_event(ioqueue, queue[i].key);
+            break;
+        case WRITEABLE_EVENT:
+            ioqueue_dispatch_write_event(ioqueue, queue[i].key);
+            break;
+        case EXCEPTION_EVENT:
+            ioqueue_dispatch_exception_event(ioqueue, queue[i].key);
+            break;
+        case NO_EVENT:
+            pj_assert(!"Invalid event!");
+            break;
+        }
+    }
 
     return processed;
 }
 
-/*
- * pj_ioqueue_read()
- *
- * Start asynchronous read from the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque,
-			             pj_ioqueue_key_t *key,
-			             void *buffer,
-			             pj_size_t buflen)
-{
-    PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
-    PJ_CHECK_STACK();
-
-    /* For consistency with other ioqueue implementation, we would reject 
-     * if descriptor has already been submitted for reading before.
-     */
-    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
-                     PJ_EBUSY);
-
-    pj_lock_acquire(ioque->lock);
-
-    key->op |= PJ_IOQUEUE_OP_READ;
-    key->rd_flags = 0;
-    key->rd_buf = buffer;
-    key->rd_buflen = buflen;
-
-    pj_lock_release(ioque->lock);
-    return PJ_EPENDING;
-}
-
-
-/*
- * pj_ioqueue_recv()
- *
- * Start asynchronous recv() from the socket.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_recv(  pj_ioqueue_t *ioque,
-				      pj_ioqueue_key_t *key,
-				      void *buffer,
-				      pj_size_t buflen,
-				      unsigned flags )
-{
-    PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
-    PJ_CHECK_STACK();
-
-    /* For consistency with other ioqueue implementation, we would reject 
-     * if descriptor has already been submitted for reading before.
-     */
-    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
-                     PJ_EBUSY);
-
-    pj_lock_acquire(ioque->lock);
-
-    key->op |= PJ_IOQUEUE_OP_RECV;
-    key->rd_buf = buffer;
-    key->rd_buflen = buflen;
-    key->rd_flags = flags;
-
-    pj_lock_release(ioque->lock);
-    return PJ_EPENDING;
-}
-
-/*
- * pj_ioqueue_recvfrom()
- *
- * Start asynchronous recvfrom() from the socket.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque,
-				         pj_ioqueue_key_t *key,
-				         void *buffer,
-				         pj_size_t buflen,
-                                         unsigned flags,
-				         pj_sockaddr_t *addr,
-				         int *addrlen)
-{
-    PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
-    PJ_CHECK_STACK();
-
-    /* For consistency with other ioqueue implementation, we would reject 
-     * if descriptor has already been submitted for reading before.
-     */
-    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
-                     PJ_EBUSY);
-
-    pj_lock_acquire(ioque->lock);
-
-    key->op |= PJ_IOQUEUE_OP_RECV_FROM;
-    key->rd_buf = buffer;
-    key->rd_buflen = buflen;
-    key->rd_flags = flags;
-    key->rmt_addr = addr;
-    key->rmt_addrlen = addrlen;
-
-    pj_lock_release(ioque->lock);
-    return PJ_EPENDING;
-}
-
-/*
- * pj_ioqueue_write()
- *
- * Start asynchronous write() to the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque,
-			              pj_ioqueue_key_t *key,
-			              const void *data,
-			              pj_size_t datalen)
-{
-    pj_status_t rc;
-    pj_ssize_t sent;
-
-    PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
-    PJ_CHECK_STACK();
-
-    /* For consistency with other ioqueue implementation, we would reject 
-     * if descriptor has already been submitted for writing before.
-     */
-    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
-                     PJ_EBUSY);
-
-    sent = datalen;
-    /* sent would be -1 after pj_sock_send() if it returns error. */
-    rc = pj_sock_send(key->fd, data, &sent, 0);
-    if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
-        return rc;
-    }
-
-    pj_lock_acquire(ioque->lock);
-
-    key->op |= PJ_IOQUEUE_OP_WRITE;
-    key->wr_buf = NULL;
-    key->wr_buflen = datalen;
-
-    pj_lock_release(ioque->lock);
-
-    return PJ_EPENDING;
-}
-
-/*
- * pj_ioqueue_send()
- *
- * Start asynchronous send() to the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque,
-			             pj_ioqueue_key_t *key,
-			             const void *data,
-			             pj_size_t datalen,
-                                     unsigned flags)
-{
-    pj_status_t rc;
-    pj_ssize_t sent;
-
-    PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
-    PJ_CHECK_STACK();
-
-    /* For consistency with other ioqueue implementation, we would reject 
-     * if descriptor has already been submitted for writing before.
-     */
-    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
-                     PJ_EBUSY);
-
-    sent = datalen;
-    /* sent would be -1 after pj_sock_send() if it returns error. */
-    rc = pj_sock_send(key->fd, data, &sent, flags);
-    if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
-        return rc;
-    }
-
-    pj_lock_acquire(ioque->lock);
-
-    key->op |= PJ_IOQUEUE_OP_SEND;
-    key->wr_buf = NULL;
-    key->wr_buflen = datalen;
-
-    pj_lock_release(ioque->lock);
-
-    return PJ_EPENDING;
-}
-
-
-/*
- * pj_ioqueue_sendto()
- *
- * Start asynchronous write() to the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque,
-			               pj_ioqueue_key_t *key,
-			               const void *data,
-			               pj_size_t datalen,
-                                       unsigned flags,
-			               const pj_sockaddr_t *addr,
-			               int addrlen)
-{
-    pj_status_t rc;
-    pj_ssize_t sent;
-
-    PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
-    PJ_CHECK_STACK();
-
-    /* For consistency with other ioqueue implementation, we would reject 
-     * if descriptor has already been submitted for writing before.
-     */
-    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
-                     PJ_EBUSY);
-
-    sent = datalen;
-    /* sent would be -1 after pj_sock_sendto() if it returns error. */
-    rc = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
-    if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK))  {
-        return rc;
-    }
-
-    pj_lock_acquire(ioque->lock);
-
-    key->op |= PJ_IOQUEUE_OP_SEND_TO;
-    key->wr_buf = NULL;
-    key->wr_buflen = datalen;
-
-    pj_lock_release(ioque->lock);
-    return PJ_EPENDING;
-}
-
-#if PJ_HAS_TCP
-/*
- * Initiate overlapped accept() operation.
- */
-PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue,
-			       pj_ioqueue_key_t *key,
-			       pj_sock_t *new_sock,
-			       pj_sockaddr_t *local,
-			       pj_sockaddr_t *remote,
-			       int *addrlen)
-{
-    /* check parameters. All must be specified! */
-    pj_assert(ioqueue && key && new_sock);
-
-    /* Server socket must have no other operation! */
-    pj_assert(key->op == 0);
-    
-    pj_lock_acquire(ioqueue->lock);
-
-    key->op = PJ_IOQUEUE_OP_ACCEPT;
-    key->accept_fd = new_sock;
-    key->rmt_addr = remote;
-    key->rmt_addrlen = addrlen;
-    key->local_addr = local;
-    key->local_addrlen = addrlen;   /* use same addr. as rmt_addrlen */
-
-    pj_lock_release(ioqueue->lock);
-    return PJ_EPENDING;
-}
-
-/*
- * Initiate overlapped connect() operation (well, it's non-blocking actually,
- * since there's no overlapped version of connect()).
- */
-PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue,
-					pj_ioqueue_key_t *key,
-					const pj_sockaddr_t *addr,
-					int addrlen )
-{
-    pj_status_t rc;
-    
-    /* check parameters. All must be specified! */
-    PJ_ASSERT_RETURN(ioqueue && key && addr && addrlen, PJ_EINVAL);
-
-    /* Connecting socket must have no other operation! */
-    PJ_ASSERT_RETURN(key->op == 0, PJ_EBUSY);
-    
-    rc = pj_sock_connect(key->fd, addr, addrlen);
-    if (rc == PJ_SUCCESS) {
-	/* Connected! */
-	return PJ_SUCCESS;
-    } else {
-	if (rc == PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) || 
-            rc == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) 
-        {
-	    /* Pending! */
-	    pj_lock_acquire(ioqueue->lock);
-	    key->op = PJ_IOQUEUE_OP_CONNECT;
-	    pj_lock_release(ioqueue->lock);
-	    return PJ_EPENDING;
-	} else {
-	    /* Error! */
-	    return rc;
-	}
-    }
-}
-#endif	/* PJ_HAS_TCP */
-
diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c
index 24e6856..c205168 100644
--- a/pjlib/src/pj/ioqueue_select.c
+++ b/pjlib/src/pj/ioqueue_select.c
@@ -20,11 +20,11 @@
 #include <pj/compat/socket.h>
 #include <pj/sock_select.h>
 #include <pj/errno.h>
-

-/*

- * Include declaration from common abstraction.

- */

-#include "ioqueue_common_abs.h"

+
+/*
+ * Include declaration from common abstraction.
+ */
+#include "ioqueue_common_abs.h"
 
 /*
  * ISSUES with ioqueue_select()
@@ -38,30 +38,30 @@
  *
  */
 #define THIS_FILE   "ioq_select"
-

-/*

- * The select ioqueue relies on socket functions (pj_sock_xxx()) to return

- * the correct error code.

- */

-#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)

-#   error "Error reporting must be enabled for this function to work!"

-#endif

-

-/**

- * Get the number of descriptors in the set. This is defined in sock_select.c

- * This function will only return the number of sockets set from PJ_FD_SET

- * operation. When the set is modified by other means (such as by select()),

- * the count will not be reflected here.

- *

- * That's why don't export this function in the header file, to avoid

- * misunderstanding.

- *

- * @param fdsetp    The descriptor set.

- *

- * @return          Number of descriptors in the set.

- */

-PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);

-

+
+/*
+ * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
+ * the correct error code.
+ */
+#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
+#   error "Error reporting must be enabled for this function to work!"
+#endif
+
+/**
+ * Get the number of descriptors in the set. This is defined in sock_select.c
+ * This function will only return the number of sockets set from PJ_FD_SET
+ * operation. When the set is modified by other means (such as by select()),
+ * the count will not be reflected here.
+ *
+ * That's why don't export this function in the header file, to avoid
+ * misunderstanding.
+ *
+ * @param fdsetp    The descriptor set.
+ *
+ * @return          Number of descriptors in the set.
+ */
+PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);
+
 
 /*
  * During debugging build, VALIDATE_FD_SET is set.
@@ -72,12 +72,12 @@
 #else
 #  define VALIDATE_FD_SET		0
 #endif
-

+
 /*
  * This describes each key.
  */
 struct pj_ioqueue_key_t
-{

+{
     DECLARE_COMMON_KEY
 };
 
@@ -86,7 +86,7 @@
  */
 struct pj_ioqueue_t
 {
-    DECLARE_COMMON_IOQUEUE

+    DECLARE_COMMON_IOQUEUE
 
     unsigned		max, count;
     pj_ioqueue_key_t	key_list;
@@ -96,11 +96,11 @@
     pj_fd_set_t		xfdset;
 #endif
 };
-

-/* Include implementation for common abstraction after we declare

- * pj_ioqueue_key_t and pj_ioqueue_t.

- */

-#include "ioqueue_common_abs.c"

+
+/* Include implementation for common abstraction after we declare
+ * pj_ioqueue_key_t and pj_ioqueue_t.
+ */
+#include "ioqueue_common_abs.c"
 
 /*
  * pj_ioqueue_create()
@@ -111,22 +111,22 @@
                                        pj_size_t max_fd,
                                        pj_ioqueue_t **p_ioqueue)
 {
-    pj_ioqueue_t *ioqueue;

+    pj_ioqueue_t *ioqueue;
     pj_lock_t *lock;
     pj_status_t rc;
-

-    /* Check that arguments are valid. */

-    PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && 

-                     max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES, 

-                     PJ_EINVAL);

-

-    /* Check that size of pj_ioqueue_op_key_t is sufficient */

-    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=

-                     sizeof(union operation_key), PJ_EBUG);

-

-    ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));

-

-    ioqueue_init(ioqueue);

+
+    /* Check that arguments are valid. */
+    PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && 
+                     max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES, 
+                     PJ_EINVAL);
+
+    /* Check that size of pj_ioqueue_op_key_t is sufficient */
+    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
+                     sizeof(union operation_key), PJ_EBUG);
+
+    ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
+
+    ioqueue_init(ioqueue);
 
     ioqueue->max = max_fd;
     ioqueue->count = 0;
@@ -141,8 +141,8 @@
     if (rc != PJ_SUCCESS)
 	return rc;
 
-    rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);

-    if (rc != PJ_SUCCESS)

+    rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
+    if (rc != PJ_SUCCESS)
         return rc;
 
     PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue));
@@ -159,8 +159,8 @@
 PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
 {
     PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
-

-    pj_lock_acquire(ioqueue->lock);

+
+    pj_lock_acquire(ioqueue->lock);
     return ioqueue_destroy(ioqueue);
 }
 
@@ -203,16 +203,18 @@
     }
 
     /* Create key. */
-    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));

-    rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);

-    if (rc != PJ_SUCCESS)

-        return rc;

+    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
+    rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
+    if (rc != PJ_SUCCESS) {
+	key = NULL;
+	goto on_return;
+    }
 
     /* Register */
     pj_list_insert_before(&ioqueue->key_list, key);
     ++ioqueue->count;
 
-on_return:

+on_return:
     /* On error, socket may be left in non-blocking mode. */
     *p_key = key;
     pj_lock_release(ioqueue->lock);
@@ -226,13 +228,13 @@
  * Unregister handle from ioqueue.
  */
 PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
-{

-    pj_ioqueue_t *ioqueue;

+{
+    pj_ioqueue_t *ioqueue;
 
     PJ_ASSERT_RETURN(key, PJ_EINVAL);
-

-    ioqueue = key->ioqueue;

-

+
+    ioqueue = key->ioqueue;
+
     pj_lock_acquire(ioqueue->lock);
 
     pj_assert(ioqueue->count > 0);
@@ -243,21 +245,21 @@
 #if PJ_HAS_TCP
     PJ_FD_CLR(key->fd, &ioqueue->xfdset);
 #endif
-

-    /* ioqueue_destroy may try to acquire key's mutex.

-     * Since normally the order of locking is to lock key's mutex first

-     * then ioqueue's mutex, ioqueue_destroy may deadlock unless we

-     * release ioqueue's mutex first.

-     */

-    pj_lock_release(ioqueue->lock);

-

-    /* Destroy the key. */

-    ioqueue_destroy_key(key);

+
+    /* ioqueue_destroy may try to acquire key's mutex.
+     * Since normally the order of locking is to lock key's mutex first
+     * then ioqueue's mutex, ioqueue_destroy may deadlock unless we
+     * release ioqueue's mutex first.
+     */
+    pj_lock_release(ioqueue->lock);
+
+    /* Destroy the key. */
+    ioqueue_destroy_key(key);
 
     return PJ_SUCCESS;
 }
 
-

+
 /* This supposed to check whether the fd_set values are consistent
  * with the operation currently set in each key.
  */
@@ -307,54 +309,54 @@
     }
 }
 #endif	/* VALIDATE_FD_SET */
-

 
-/* ioqueue_remove_from_set()

- * This function is called from ioqueue_dispatch_event() to instruct

- * the ioqueue to remove the specified descriptor from ioqueue's descriptor

- * set for the specified event.

- */

-static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,

-                                     pj_sock_t fd, 

-                                     enum ioqueue_event_type event_type)

-{

-    pj_lock_acquire(ioqueue->lock);

-

-    if (event_type == READABLE_EVENT)

-        PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset);

-    else if (event_type == WRITEABLE_EVENT)

-        PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset);

-    else if (event_type == EXCEPTION_EVENT)

-        PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset);

-    else

-        pj_assert(0);

-

-    pj_lock_release(ioqueue->lock);

-}

-

-/*

- * ioqueue_add_to_set()

- * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc

- * to instruct the ioqueue to add the specified handle to ioqueue's descriptor

- * set for the specified event.

- */

-static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,

-                                pj_sock_t fd,

-                                enum ioqueue_event_type event_type )

-{

-    pj_lock_acquire(ioqueue->lock);

-

-    if (event_type == READABLE_EVENT)

-        PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset);

-    else if (event_type == WRITEABLE_EVENT)

-        PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset);

-    else if (event_type == EXCEPTION_EVENT)

-        PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset);

-    else

-        pj_assert(0);

-

-    pj_lock_release(ioqueue->lock);

-}

+
+/* ioqueue_remove_from_set()
+ * This function is called from ioqueue_dispatch_event() to instruct
+ * the ioqueue to remove the specified descriptor from ioqueue's descriptor
+ * set for the specified event.
+ */
+static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
+                                     pj_sock_t fd, 
+                                     enum ioqueue_event_type event_type)
+{
+    pj_lock_acquire(ioqueue->lock);
+
+    if (event_type == READABLE_EVENT)
+        PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset);
+    else if (event_type == WRITEABLE_EVENT)
+        PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset);
+    else if (event_type == EXCEPTION_EVENT)
+        PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset);
+    else
+        pj_assert(0);
+
+    pj_lock_release(ioqueue->lock);
+}
+
+/*
+ * ioqueue_add_to_set()
+ * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
+ * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
+ * set for the specified event.
+ */
+static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
+                                pj_sock_t fd,
+                                enum ioqueue_event_type event_type )
+{
+    pj_lock_acquire(ioqueue->lock);
+
+    if (event_type == READABLE_EVENT)
+        PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset);
+    else if (event_type == WRITEABLE_EVENT)
+        PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset);
+    else if (event_type == EXCEPTION_EVENT)
+        PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset);
+    else
+        pj_assert(0);
+
+    pj_lock_release(ioqueue->lock);
+}
 
 /*
  * pj_ioqueue_poll()
@@ -378,19 +380,19 @@
     pj_fd_set_t rfdset, wfdset, xfdset;
     int count, counter;
     pj_ioqueue_key_t *h;
-    struct event

-    {

-        pj_ioqueue_key_t    *key;

-        enum event_type      event_type;

-    } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];

-

-    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);

+    struct event
+    {
+        pj_ioqueue_key_t	*key;
+        enum ioqueue_event_type  event_type;
+    } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
+
+    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
 
     /* Lock ioqueue before making fd_set copies */
     pj_lock_acquire(ioqueue->lock);
-

-    /* We will only do select() when there are sockets to be polled.

-     * Otherwise select() will return error.

+
+    /* We will only do select() when there are sockets to be polled.
+     * Otherwise select() will return error.
      */
     if (PJ_FD_COUNT(&ioqueue->rfdset)==0 &&
         PJ_FD_COUNT(&ioqueue->wfdset)==0 &&
@@ -422,71 +424,71 @@
     
     if (count <= 0)
 	return count;
-    else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)

-        count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;

+    else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)
+        count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;
 
-    /* Scan descriptor sets for event and add the events in the event

-     * array to be processed later in this function. We do this so that

-     * events can be processed in parallel without holding ioqueue lock.

+    /* Scan descriptor sets for event and add the events in the event
+     * array to be processed later in this function. We do this so that
+     * events can be processed in parallel without holding ioqueue lock.
      */
     pj_lock_acquire(ioqueue->lock);
-

-    counter = 0;

 
-    /* Scan for writable sockets first to handle piggy-back data

-     * coming with accept().

-     */

-    h = ioqueue->key_list.next;

-    for ( ; h!=&ioqueue->key_list && counter<count; h = h->next) {

-	if ( (key_has_pending_write(h) || key_has_pending_connect(h))

-	     && PJ_FD_ISSET(h->fd, &wfdset))

-        {

-            event[counter].key = h;

-            event[counter].event_type = WRITEABLE_EVENT;

-            ++counter;

-        }

-

-        /* Scan for readable socket. */

-	if ((key_has_pending_read(h) || key_has_pending_accept(h))

-            && PJ_FD_ISSET(h->fd, &rfdset))

-        {

-            event[counter].key = h;

-            event[counter].event_type = READABLE_EVENT;

-            ++counter;        }

-

-#if PJ_HAS_TCP

-        if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) {

-            event[counter].key = h;

-            event[counter].event_type = EXCEPTION_EVENT;

-            ++counter;

-        }

-#endif

-    }

-

-    pj_lock_release(ioqueue->lock);

-

-    count = counter;

-

-    /* Now process all events. The dispatch functions will take care

-     * of locking in each of the key

-     */

-    for (counter=0; counter<count; ++counter) {

-        switch (event[counter].event_type) {

-        case READABLE_EVENT:

-            ioqueue_dispatch_read_event(ioqueue, event[counter].key);

-            break;

-        case WRITEABLE_EVENT:

-            ioqueue_dispatch_write_event(ioqueue, event[counter].key);

-            break;

-        case EXCEPTION_EVENT:

-            ioqueue_dispatch_exception_event(ioqueue, event[counter].key);

-            break;

-        case NO_EVENT:

-        default:

-            pj_assert(!"Invalid event!");

-            break;

-        }

-    }

+    counter = 0;
+
+    /* Scan for writable sockets first to handle piggy-back data
+     * coming with accept().
+     */
+    h = ioqueue->key_list.next;
+    for ( ; h!=&ioqueue->key_list && counter<count; h = h->next) {
+	if ( (key_has_pending_write(h) || key_has_pending_connect(h))
+	     && PJ_FD_ISSET(h->fd, &wfdset))
+        {
+            event[counter].key = h;
+            event[counter].event_type = WRITEABLE_EVENT;
+            ++counter;
+        }
+
+        /* Scan for readable socket. */
+	if ((key_has_pending_read(h) || key_has_pending_accept(h))
+            && PJ_FD_ISSET(h->fd, &rfdset))
+        {
+            event[counter].key = h;
+            event[counter].event_type = READABLE_EVENT;
+            ++counter;
+	}
+
+#if PJ_HAS_TCP
+        if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) {
+            event[counter].key = h;
+            event[counter].event_type = EXCEPTION_EVENT;
+            ++counter;
+        }
+#endif
+    }
+
+    pj_lock_release(ioqueue->lock);
+
+    count = counter;
+
+    /* Now process all events. The dispatch functions will take care
+     * of locking in each of the key
+     */
+    for (counter=0; counter<count; ++counter) {
+        switch (event[counter].event_type) {
+        case READABLE_EVENT:
+            ioqueue_dispatch_read_event(ioqueue, event[counter].key);
+            break;
+        case WRITEABLE_EVENT:
+            ioqueue_dispatch_write_event(ioqueue, event[counter].key);
+            break;
+        case EXCEPTION_EVENT:
+            ioqueue_dispatch_exception_event(ioqueue, event[counter].key);
+            break;
+        case NO_EVENT:
+            pj_assert(!"Invalid event!");
+            break;
+        }
+    }
 
     return count;
 }
diff --git a/pjlib/src/pj/os_core_unix.c b/pjlib/src/pj/os_core_unix.c
index 18f4ded..0c951e9 100644
--- a/pjlib/src/pj/os_core_unix.c
+++ b/pjlib/src/pj/os_core_unix.c
@@ -715,12 +715,14 @@
 
     if (type == PJ_MUTEX_SIMPLE) {
 #if defined(PJ_LINUX) && PJ_LINUX!=0
+	extern int pthread_mutexattr_settype(pthread_mutexattr_t*,int);
 	rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_FAST_NP);
 #else
 	rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL);
 #endif
     } else {
 #if defined(PJ_LINUX) && PJ_LINUX!=0
+	extern int pthread_mutexattr_settype(pthread_mutexattr_t*,int);
 	rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE_NP);
 #else
 	rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
diff --git a/pjlib/src/pj/sock_bsd.c b/pjlib/src/pj/sock_bsd.c
index 92495df..21c10d1 100644
--- a/pjlib/src/pj/sock_bsd.c
+++ b/pjlib/src/pj/sock_bsd.c
@@ -104,7 +104,9 @@
  */
 PJ_DEF(char*) pj_inet_ntoa(pj_in_addr inaddr)
 {
-    return inet_ntoa(*(struct in_addr*)&inaddr);
+    struct in_addr addr;
+    addr.s_addr = inaddr.s_addr;
+    return inet_ntoa(addr);
 }
 
 /*
diff --git a/pjlib/src/pjlib-test/ioq_perf.c b/pjlib/src/pjlib-test/ioq_perf.c
index 3305fb6..a4ee005 100644
--- a/pjlib/src/pjlib-test/ioq_perf.c
+++ b/pjlib/src/pjlib-test/ioq_perf.c
@@ -38,9 +38,9 @@
                          client_fd;
     pj_ioqueue_t        *ioqueue;
     pj_ioqueue_key_t    *server_key,
-                        *client_key;

-    pj_ioqueue_op_key_t  recv_op,

-                         send_op;

+                        *client_key;
+    pj_ioqueue_op_key_t  recv_op,
+                         send_op;
     int                  has_pending_send;
     pj_size_t            buffer_size;
     char                *outgoing_buffer;
@@ -52,16 +52,16 @@
 /* Callback when data has been read.
  * Increment item->bytes_recv and ready to read the next data.
  */
-static void on_read_complete(pj_ioqueue_key_t *key, 

-                             pj_ioqueue_op_key_t *op_key,

+static void on_read_complete(pj_ioqueue_key_t *key, 
+                             pj_ioqueue_op_key_t *op_key,
                              pj_ssize_t bytes_read)
 {
     test_item *item = pj_ioqueue_get_user_data(key);
-    pj_status_t rc;

+    pj_status_t rc;
     int data_is_available = 1;
 
     //TRACE_((THIS_FILE, "     read complete, bytes_read=%d", bytes_read));
-

+
     do {
         if (thread_quit_flag)
             return;
@@ -76,7 +76,7 @@
 	        PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)", 
 		          bytes_read, errmsg));
 	        PJ_LOG(3,(THIS_FILE, 
-		          ".....additional info: total read=%u, total written=%u",
+		          ".....additional info: total read=%u, total sent=%u",
 		          item->bytes_recv, item->bytes_sent));
 	    } else {
 	        last_error_counter++;
@@ -94,44 +94,44 @@
          */
         if (item->bytes_recv > item->buffer_size * 10000) 
 	    thread_quit_flag = 1;
-

+
         bytes_read = item->buffer_size;
         rc = pj_ioqueue_recv( key, op_key,
                               item->incoming_buffer, &bytes_read, 0 );
 
-        if (rc == PJ_SUCCESS) {

-            data_is_available = 1;

-        } else if (rc == PJ_EPENDING) {

-            data_is_available = 0;

-        } else {

+        if (rc == PJ_SUCCESS) {
+            data_is_available = 1;
+        } else if (rc == PJ_EPENDING) {
+            data_is_available = 0;
+        } else {
             data_is_available = 0;
 	    if (rc != last_error) {
 	        last_error = rc;
-	        app_perror("...error: read error", rc);
+	        app_perror("...error: read error(1)", rc);
 	    } else {
 	        last_error_counter++;
 	    }
-        }

-

-        if (!item->has_pending_send) {

-            pj_ssize_t sent = item->buffer_size;

-            rc = pj_ioqueue_send(item->client_key, &item->send_op,

-                                 item->outgoing_buffer, &sent, 0);

-            if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {

-                app_perror("...error: write error", rc);

-            }

-

-            item->has_pending_send = (rc==PJ_EPENDING);

-        }

-

+        }
+
+        if (!item->has_pending_send) {
+            pj_ssize_t sent = item->buffer_size;
+            rc = pj_ioqueue_send(item->client_key, &item->send_op,
+                                 item->outgoing_buffer, &sent, 0);
+            if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
+                app_perror("...error: write error", rc);
+            }
+
+            item->has_pending_send = (rc==PJ_EPENDING);
+        }
+
     } while (data_is_available);
 }
 
 /* Callback when data has been written.
  * Increment item->bytes_sent and write the next data.
  */
-static void on_write_complete(pj_ioqueue_key_t *key, 

-                              pj_ioqueue_op_key_t *op_key,

+static void on_write_complete(pj_ioqueue_key_t *key, 
+                              pj_ioqueue_op_key_t *op_key,
                               pj_ssize_t bytes_sent)
 {
     test_item *item = pj_ioqueue_get_user_data(key);
@@ -140,7 +140,7 @@
 
     if (thread_quit_flag)
         return;
-

+
     item->has_pending_send = 0;
     item->bytes_sent += bytes_sent;
 
@@ -150,14 +150,14 @@
     } 
     else {
         pj_status_t rc;
-

+
         bytes_sent = item->buffer_size;
         rc = pj_ioqueue_send( item->client_key, op_key,
                               item->outgoing_buffer, &bytes_sent, 0);
         if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
             app_perror("...error: write error", rc);
-        }

-

+        }
+
         item->has_pending_send = (rc==PJ_EPENDING);
     }
 }
@@ -231,7 +231,7 @@
 
     /* Initialize each producer-consumer pair. */
     for (i=0; i<sockpair_cnt; ++i) {
-        pj_ssize_t bytes;

+        pj_ssize_t bytes;
 
         items[i].ioqueue = ioqueue;
         items[i].buffer_size = buffer_size;
@@ -274,7 +274,7 @@
         }
 
         /* Start reading. */
-	TRACE_((THIS_FILE, "      pj_ioqueue_recv.."));

+	TRACE_((THIS_FILE, "      pj_ioqueue_recv.."));
         bytes = items[i].buffer_size;
         rc = pj_ioqueue_recv(items[i].server_key, &items[i].recv_op,
                              items[i].incoming_buffer, &bytes,
@@ -285,7 +285,7 @@
         }
 
         /* Start writing. */
-	TRACE_((THIS_FILE, "      pj_ioqueue_write.."));

+	TRACE_((THIS_FILE, "      pj_ioqueue_write.."));
         bytes = items[i].buffer_size;
         rc = pj_ioqueue_send(items[i].client_key, &items[i].recv_op,
                              items[i].outgoing_buffer, &bytes, 0);
@@ -293,7 +293,7 @@
             app_perror("...error: pj_ioqueue_write", rc);
             return -76;
         }
-

+
         items[i].has_pending_send = (rc==PJ_EPENDING);
     }