Changed ioqueue to allow simultaneous operations on the same key
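
Each pending operation now carries its own pj_ioqueue_op_key_t, so several
reads, writes and (on TCP) accepts can be outstanding on a single key at the
same time. A rough usage sketch of the new calling convention, based on the
signatures in this patch (registration and error handling omitted; callback
body, buffer and variable names are illustrative only):

    /* Two receives pending on the same key at once. */
    static pj_ioqueue_op_key_t op1, op2;
    static char buf1[512], buf2[512];

    static void on_read_complete(pj_ioqueue_key_t *key,
                                 pj_ioqueue_op_key_t *op_key,
                                 pj_ssize_t bytes_read)
    {
        /* op_key identifies which of the pending operations completed. */
    }

    pj_ssize_t len1 = sizeof(buf1), len2 = sizeof(buf2);
    pj_status_t rc;

    /* PJ_SUCCESS means data was read immediately (len1 is updated);
     * PJ_EPENDING means the operation was queued on the key.
     */
    rc = pj_ioqueue_recv(key, &op1, buf1, &len1, 0);
    rc = pj_ioqueue_recv(key, &op2, buf2, &len2, 0);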

git-svn-id: https://svn.pjsip.org/repos/pjproject/main@11 74dad513-b988-da41-8d7b-12977e46ad98
diff --git a/pjlib/src/pj/config.c b/pjlib/src/pj/config.c
index 962fbdc..3354f13 100644
--- a/pjlib/src/pj/config.c
+++ b/pjlib/src/pj/config.c
@@ -4,7 +4,7 @@
 #include <pj/log.h>
 
 static const char *id = "config.c";
-const char *PJ_VERSION = "0.3.0-pre1";
+const char *PJ_VERSION = "0.3.0-pre4";
 
 PJ_DEF(void) pj_dump_config(void)
 {
diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c
index 37d6f66..367ffb5 100644
--- a/pjlib/src/pj/ioqueue_select.c
+++ b/pjlib/src/pj/ioqueue_select.c
@@ -33,22 +33,32 @@
  *
  */
 #define THIS_FILE   "ioq_select"
-
-#define PJ_IOQUEUE_IS_READ_OP(op)   ((op & PJ_IOQUEUE_OP_READ) || \
-                                     (op & PJ_IOQUEUE_OP_RECV) || \
-                                     (op & PJ_IOQUEUE_OP_RECV_FROM))
-#define PJ_IOQUEUE_IS_WRITE_OP(op)  ((op & PJ_IOQUEUE_OP_WRITE) || \
-                                     (op & PJ_IOQUEUE_OP_SEND) || \
-                                     (op & PJ_IOQUEUE_OP_SEND_TO))
+
+/*
+ * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
+ * the correct error code.
+ */
+#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
+#   error "Error reporting must be enabled for this function to work!"
+#endif
+
+/**
+ * Get the number of descriptors in the set. This is defined in sock_select.c.
+ * This function will only return the number of sockets set from PJ_FD_SET
+ * operation. When the set is modified by other means (such as by select()),
+ * the count will not be reflected here.
+ *
+ * That's why we don't export this function in the header file, to avoid
+ * misunderstanding.
+ *
+ * @param fdsetp    The descriptor set.
+ *
+ * @return          Number of descriptors in the set.
+ */
+PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);
+
 
 
-#if PJ_HAS_TCP
-#  define PJ_IOQUEUE_IS_ACCEPT_OP(op)	(op & PJ_IOQUEUE_OP_ACCEPT)
-#  define PJ_IOQUEUE_IS_CONNECT_OP(op)	(op & PJ_IOQUEUE_OP_CONNECT)
-#else
-#  define PJ_IOQUEUE_IS_ACCEPT_OP(op)	0
-#  define PJ_IOQUEUE_IS_CONNECT_OP(op)	0
-#endif
 
 /*
  * During debugging build, VALIDATE_FD_SET is set.
@@ -59,31 +69,77 @@
 #else
 #  define VALIDATE_FD_SET		0
 #endif
+
+struct generic_operation
+{
+    PJ_DECL_LIST_MEMBER(struct generic_operation);
+    pj_ioqueue_operation_e  op;
+};
+
+struct read_operation
+{
+    PJ_DECL_LIST_MEMBER(struct read_operation);
+    pj_ioqueue_operation_e  op;
+
+    void		   *buf;
+    pj_size_t		    size;
+    unsigned                flags;
+    pj_sockaddr_t	   *rmt_addr;
+    int			   *rmt_addrlen;
+};
+
+struct write_operation
+{
+    PJ_DECL_LIST_MEMBER(struct write_operation);
+    pj_ioqueue_operation_e  op;
+
+    char		   *buf;
+    pj_size_t		    size;
+    pj_ssize_t              written;
+    unsigned                flags;
+    pj_sockaddr_in	    rmt_addr;
+    int			    rmt_addrlen;
+};
+
+#if PJ_HAS_TCP
+struct accept_operation
+{
+    PJ_DECL_LIST_MEMBER(struct accept_operation);
+    pj_ioqueue_operation_e  op;
+
+    pj_sock_t              *accept_fd;
+    pj_sockaddr_t	   *local_addr;
+    pj_sockaddr_t	   *rmt_addr;
+    int			   *addrlen;
+};
+#endif
+
+union operation_key
+{
+    struct generic_operation generic;
+    struct read_operation    read;
+    struct write_operation   write;
+#if PJ_HAS_TCP
+    struct accept_operation  accept;
+#endif
+};
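+
+/* Each asynchronous operation submitted by the application is recorded in
+ * one of the structures above: the caller-supplied pj_ioqueue_op_key_t is
+ * cast to the appropriate operation structure and queued on the key, which
+ * is what allows several operations to be pending on the same key at the
+ * same time. pj_ioqueue_create() below asserts that pj_ioqueue_op_key_t is
+ * large enough to hold this union.
+ */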
 
 /*
  * This describes each key.
  */
 struct pj_ioqueue_key_t
 {
-    PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t)
+    PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);
+    pj_ioqueue_t           *ioqueue;
     pj_sock_t		    fd;
-    pj_ioqueue_operation_e  op;
     void		   *user_data;
-    pj_ioqueue_callback	    cb;
-
-    void		   *rd_buf;
-    unsigned                rd_flags;
-    pj_size_t		    rd_buflen;
-    void		   *wr_buf;
-    pj_size_t		    wr_buflen;
-
-    pj_sockaddr_t	   *rmt_addr;
-    int			   *rmt_addrlen;
-
-    pj_sockaddr_t	   *local_addr;
-    int			   *local_addrlen;
-
-    pj_sock_t		   *accept_fd;
+    pj_ioqueue_callback	    cb;
+    int                     connecting;
+    struct read_operation   read_list;
+    struct write_operation  write_list;
+#if PJ_HAS_TCP
+    struct accept_operation accept_list;
+#endif
 };
 
 /*
@@ -94,7 +150,7 @@
     pj_lock_t          *lock;
     pj_bool_t           auto_delete_lock;
     unsigned		max, count;
-    pj_ioqueue_key_t	hlist;
+    pj_ioqueue_key_t	key_list;
     pj_fd_set_t		rfdset;
     pj_fd_set_t		wfdset;
 #if PJ_HAS_TCP
@@ -109,38 +165,39 @@
  */
 PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, 
                                        pj_size_t max_fd,
-                                       int max_threads,
                                        pj_ioqueue_t **p_ioqueue)
 {
-    pj_ioqueue_t *ioque;
+    pj_ioqueue_t *ioqueue;
     pj_status_t rc;
-
-    PJ_UNUSED_ARG(max_threads);
-
-    if (max_fd > PJ_IOQUEUE_MAX_HANDLES) {
-        pj_assert(!"max_fd too large");
-	return PJ_EINVAL;
-    }
-
-    ioque = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
-    ioque->max = max_fd;
-    ioque->count = 0;
-    PJ_FD_ZERO(&ioque->rfdset);
-    PJ_FD_ZERO(&ioque->wfdset);
+
+    /* Check that arguments are valid. */
+    PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && 
+                     max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES, 
+                     PJ_EINVAL);
+
+    /* Check that size of pj_ioqueue_op_key_t is sufficient */
+    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
+                     sizeof(union operation_key), PJ_EBUG);
+
+    ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
+    ioqueue->max = max_fd;
+    ioqueue->count = 0;
+    PJ_FD_ZERO(&ioqueue->rfdset);
+    PJ_FD_ZERO(&ioqueue->wfdset);
 #if PJ_HAS_TCP
-    PJ_FD_ZERO(&ioque->xfdset);
+    PJ_FD_ZERO(&ioqueue->xfdset);
 #endif
-    pj_list_init(&ioque->hlist);
+    pj_list_init(&ioqueue->key_list);
 
-    rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioque->lock);
+    rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioqueue->lock);
     if (rc != PJ_SUCCESS)
 	return rc;
 
-    ioque->auto_delete_lock = PJ_TRUE;
+    ioqueue->auto_delete_lock = PJ_TRUE;
 
-    PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioque));
+    PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue));
 
-    *p_ioqueue = ioque;
+    *p_ioqueue = ioqueue;
     return PJ_SUCCESS;
 }
 
@@ -149,46 +206,28 @@
  *
  * Destroy ioqueue.
  */
-PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioque)
+PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
 {
     pj_status_t rc = PJ_SUCCESS;
 
-    PJ_ASSERT_RETURN(ioque, PJ_EINVAL);
+    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
+
+    pj_lock_acquire(ioqueue->lock);
 
-    if (ioque->auto_delete_lock)
-        rc = pj_lock_destroy(ioque->lock);
+    if (ioqueue->auto_delete_lock)
+        rc = pj_lock_destroy(ioqueue->lock);
 
     return rc;
 }
 
 
 /*
- * pj_ioqueue_set_lock()
- */
-PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque, 
-					 pj_lock_t *lock,
-					 pj_bool_t auto_delete )
-{
-    PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL);
-
-    if (ioque->auto_delete_lock) {
-        pj_lock_destroy(ioque->lock);
-    }
-
-    ioque->lock = lock;
-    ioque->auto_delete_lock = auto_delete;
-
-    return PJ_SUCCESS;
-}
-
-
-/*
  * pj_ioqueue_register_sock()
  *
  * Register a handle to ioqueue.
  */
 PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
-					      pj_ioqueue_t *ioque,
+					      pj_ioqueue_t *ioqueue,
 					      pj_sock_t sock,
 					      void *user_data,
 					      const pj_ioqueue_callback *cb,
@@ -198,12 +237,12 @@
     pj_uint32_t value;
     pj_status_t rc = PJ_SUCCESS;
     
-    PJ_ASSERT_RETURN(pool && ioque && sock != PJ_INVALID_SOCKET &&
+    PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
                      cb && p_key, PJ_EINVAL);
 
-    pj_lock_acquire(ioque->lock);
+    pj_lock_acquire(ioqueue->lock);
 
-    if (ioque->count >= ioque->max) {
+    if (ioqueue->count >= ioqueue->max) {
         rc = PJ_ETOOMANY;
 	goto on_return;
     }
@@ -211,7 +250,7 @@
     /* Set socket to nonblocking. */
     value = 1;
 #ifdef PJ_WIN32
-    if (ioctlsocket(sock, FIONBIO, (unsigned long*)&value)) {
+    if (ioctlsocket(sock, FIONBIO, (u_long*)&value)) {
 #else
     if (ioctl(sock, FIONBIO, &value)) {
 #endif
@@ -220,20 +259,27 @@
     }
 
     /* Create key. */
-    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
+    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
+    key->ioqueue = ioqueue;
     key->fd = sock;
-    key->user_data = user_data;
-
+    key->user_data = user_data;
+    pj_list_init(&key->read_list);
+    pj_list_init(&key->write_list);
+#if PJ_HAS_TCP
+    pj_list_init(&key->accept_list);
+#endif
+
     /* Save callback. */
     pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
 
     /* Register */
-    pj_list_insert_before(&ioque->hlist, key);
-    ++ioque->count;
+    pj_list_insert_before(&ioqueue->key_list, key);
+    ++ioqueue->count;
 
-on_return:
+on_return:
+    /* On error, socket may be left in non-blocking mode. */
     *p_key = key;
-    pj_lock_release(ioque->lock);
+    pj_lock_release(ioqueue->lock);
     
     return rc;
 }
@@ -243,23 +289,26 @@
  *
  * Unregister handle from ioqueue.
  */
-PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque,
-					   pj_ioqueue_key_t *key)
-{
-    PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL);
+PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
+{

+    pj_ioqueue_t *ioqueue;

 
-    pj_lock_acquire(ioque->lock);
+    PJ_ASSERT_RETURN(key, PJ_EINVAL);
+

+    ioqueue = key->ioqueue;

 
-    pj_assert(ioque->count > 0);
-    --ioque->count;
+    pj_lock_acquire(ioqueue->lock);
+
+    pj_assert(ioqueue->count > 0);
+    --ioqueue->count;
     pj_list_erase(key);
-    PJ_FD_CLR(key->fd, &ioque->rfdset);
-    PJ_FD_CLR(key->fd, &ioque->wfdset);
+    PJ_FD_CLR(key->fd, &ioqueue->rfdset);
+    PJ_FD_CLR(key->fd, &ioqueue->wfdset);
 #if PJ_HAS_TCP
-    PJ_FD_CLR(key->fd, &ioque->xfdset);
+    PJ_FD_CLR(key->fd, &ioqueue->xfdset);
 #endif
-
-    pj_lock_release(ioque->lock);
+
+    pj_lock_release(ioqueue->lock);
     return PJ_SUCCESS;
 }
 
@@ -274,25 +323,40 @@
     return key->user_data;
 }
 
+
+/*
+ * pj_ioqueue_set_user_data()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
+                                              void *user_data,
+                                              void **old_data)
+{
+    PJ_ASSERT_RETURN(key, PJ_EINVAL);
+
+    if (old_data)
+        *old_data = key->user_data;
+    key->user_data = user_data;
+
+    return PJ_SUCCESS;
+}
+
 
 /* This supposed to check whether the fd_set values are consistent
  * with the operation currently set in each key.
  */
 #if VALIDATE_FD_SET
-static void validate_sets(const pj_ioqueue_t *ioque,
+static void validate_sets(const pj_ioqueue_t *ioqueue,
 			  const pj_fd_set_t *rfdset,
 			  const pj_fd_set_t *wfdset,
 			  const pj_fd_set_t *xfdset)
 {
     pj_ioqueue_key_t *key;
 
-    key = ioque->hlist.next;
-    while (key != &ioque->hlist) {
-	if ((key->op & PJ_IOQUEUE_OP_READ) 
-	    || (key->op & PJ_IOQUEUE_OP_RECV)
-	    || (key->op & PJ_IOQUEUE_OP_RECV_FROM)
+    key = ioqueue->key_list.next;
+    while (key != &ioqueue->key_list) {
+	if (!pj_list_empty(&key->read_list)
 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
-	    || (key->op & PJ_IOQUEUE_OP_ACCEPT)
+	    || !pj_list_empty(&key->accept_list)
 #endif
 	    ) 
 	{
@@ -301,11 +365,9 @@
 	else {
 	    pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0);
 	}
-	if ((key->op & PJ_IOQUEUE_OP_WRITE)
-	    || (key->op & PJ_IOQUEUE_OP_SEND)
-	    || (key->op & PJ_IOQUEUE_OP_SEND_TO)
+	if (!pj_list_empty(&key->write_list)
 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
-	    || (key->op & PJ_IOQUEUE_OP_CONNECT)
+	    || key->connecting
 #endif
 	   )
 	{
@@ -315,7 +377,7 @@
 	    pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0);
 	}
 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
-	if (key->op & PJ_IOQUEUE_OP_CONNECT)
+	if (key->connecting)
 	{
 	    pj_assert(PJ_FD_ISSET(key->fd, xfdset));
 	}
@@ -347,124 +409,263 @@
  *  - to guarantee preemptiveness etc, the poll function must strictly
  *    work on fd_set copy of the ioqueue (not the original one).
  */
-PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout)
+PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
 {
     pj_fd_set_t rfdset, wfdset, xfdset;
     int count;
     pj_ioqueue_key_t *h;
+
+    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
 
     /* Lock ioqueue before making fd_set copies */
-    pj_lock_acquire(ioque->lock);
-
-    if (PJ_FD_COUNT(&ioque->rfdset)==0 &&
-        PJ_FD_COUNT(&ioque->wfdset)==0 &&
-        PJ_FD_COUNT(&ioque->xfdset)==0)
+    pj_lock_acquire(ioqueue->lock);
+
+    /* We will only do select() when there are sockets to be polled.
+     * Otherwise select() will return error.
+     */
+    if (PJ_FD_COUNT(&ioqueue->rfdset)==0 &&
+        PJ_FD_COUNT(&ioqueue->wfdset)==0 &&
+        PJ_FD_COUNT(&ioqueue->xfdset)==0)
     {
-        pj_lock_release(ioque->lock);
+        pj_lock_release(ioqueue->lock);
         if (timeout)
             pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout));
         return 0;
     }
 
     /* Copy ioqueue's pj_fd_set_t to local variables. */
-    pj_memcpy(&rfdset, &ioque->rfdset, sizeof(pj_fd_set_t));
-    pj_memcpy(&wfdset, &ioque->wfdset, sizeof(pj_fd_set_t));
+    pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t));
+    pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t));
 #if PJ_HAS_TCP
-    pj_memcpy(&xfdset, &ioque->xfdset, sizeof(pj_fd_set_t));
+    pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t));
 #else
     PJ_FD_ZERO(&xfdset);
 #endif
 
 #if VALIDATE_FD_SET
-    validate_sets(ioque, &rfdset, &wfdset, &xfdset);
+    validate_sets(ioqueue, &rfdset, &wfdset, &xfdset);
 #endif
 
     /* Unlock ioqueue before select(). */
-    pj_lock_release(ioque->lock);
+    pj_lock_release(ioqueue->lock);
 
     count = pj_sock_select(FD_SETSIZE, &rfdset, &wfdset, &xfdset, timeout);
     
     if (count <= 0)
 	return count;
 
-    /* Lock ioqueue again before scanning for signalled sockets. */
-    pj_lock_acquire(ioque->lock);
+    /* Lock ioqueue again before scanning for signalled sockets. 
+     * We must strictly use recursive mutex since application may invoke
+     * the ioqueue again inside the callback.
+     */
+    pj_lock_acquire(ioqueue->lock);
 
-#if PJ_HAS_TCP
-    /* Scan for exception socket */
-    h = ioque->hlist.next;
-do_except_scan:
-    for ( ; h!=&ioque->hlist; h = h->next) {
-	if ((h->op & PJ_IOQUEUE_OP_CONNECT) && PJ_FD_ISSET(h->fd, &xfdset))
-	    break;
-    }
-    if (h != &ioque->hlist) {
-	/* 'connect()' should be the only operation. */
-	pj_assert((h->op == PJ_IOQUEUE_OP_CONNECT));
-
-	/* Clear operation. */
-	h->op &= ~(PJ_IOQUEUE_OP_CONNECT);
-	PJ_FD_CLR(h->fd, &ioque->wfdset);
-	PJ_FD_CLR(h->fd, &ioque->xfdset);
-        PJ_FD_CLR(h->fd, &wfdset);
-        PJ_FD_CLR(h->fd, &xfdset);
-
-	/* Call callback. */
-        if (h->cb.on_connect_complete)
-	    (*h->cb.on_connect_complete)(h, -1);
-
-        /* Re-scan exception list. */
-        goto do_except_scan;
-    }
-#endif	/* PJ_HAS_TCP */
-
+    /* Scan for writable sockets first to handle piggy-back data
+     * coming with accept().
+     */
+    h = ioqueue->key_list.next;
+do_writable_scan:
+    for ( ; h!=&ioqueue->key_list; h = h->next) {
+	if ( (!pj_list_empty(&h->write_list) || h->connecting)
+	     && PJ_FD_ISSET(h->fd, &wfdset))
+        {
+	    break;
+        }
+    }
+    if (h != &ioqueue->key_list) {
+	pj_assert(!pj_list_empty(&h->write_list) || h->connecting);
+
+#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
+	if (h->connecting) {
+	    /* Completion of connect() operation */
+	    pj_ssize_t bytes_transfered;
+
+#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
+	    /* from connect(2): 
+	     * On Linux, use getsockopt to read the SO_ERROR option at
+	     * level SOL_SOCKET to determine whether connect() completed
+	     * successfully (if SO_ERROR is zero).
+	     */
+	    int value;
+	    socklen_t vallen = sizeof(value);
+	    int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR, 
+                                   &value, &vallen);
+	    if (gs_rc != 0) {
+		/* Argh!! What to do now??? 
+		 * Just indicate that the socket is connected. The
+		 * application will get an error as soon as it tries to use
+		 * the socket to send/receive.
+		 */
+		bytes_transfered = 0;
+	    } else {
+                bytes_transfered = value;
+	    }
+#elif defined(PJ_WIN32) && PJ_WIN32!=0
+	    bytes_transfered = 0; /* success */
+#else
+	    /* Excellent information in D.J. Bernstein page:
+	     * http://cr.yp.to/docs/connect.html
+	     *
+	     * Seems like the most portable way of detecting connect()
+	     * failure is to call getpeername(). If socket is connected,
+	     * getpeername() will return 0. If the socket is not connected,
+	     * it will return ENOTCONN, and read(fd, &ch, 1) will produce
+	     * the right errno through error slippage. This is a combination
+	     * of suggestions from Douglas C. Schmidt and Ken Keys.
+	     */
+	    int gp_rc;
+	    struct sockaddr_in addr;
+	    socklen_t addrlen = sizeof(addr);
+
+	    gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
+	    bytes_transfered = gp_rc;
+#endif
+
+	    /* Clear operation. */
+	    h->connecting = 0;
+	    PJ_FD_CLR(h->fd, &ioqueue->wfdset);
+	    PJ_FD_CLR(h->fd, &ioqueue->xfdset);
+
+	    /* Call callback. */
+            if (h->cb.on_connect_complete)
+	        (*h->cb.on_connect_complete)(h, bytes_transfered);
+
+            /* Re-scan writable sockets. */
+            goto do_writable_scan;
+
+	} else 
+#endif /* PJ_HAS_TCP */
+	{
+	    /* Socket is writable. */
+            struct write_operation *write_op;
+            pj_ssize_t sent;
+            pj_status_t send_rc;
+
+            /* Get the first in the queue. */
+            write_op = h->write_list.next;
+
+            /* Send the data. */
+            sent = write_op->size - write_op->written;
+            if (write_op->op == PJ_IOQUEUE_OP_SEND) {
+                send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
+                                       &sent, write_op->flags);
+            } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
+                send_rc = pj_sock_sendto(h->fd, 
+                                         write_op->buf+write_op->written,
+                                         &sent, write_op->flags,
+                                         &write_op->rmt_addr, 
+                                         write_op->rmt_addrlen);
+            } else {
+                pj_assert(!"Invalid operation type!");
+                send_rc = PJ_EBUG;
+            }
+
+            if (send_rc == PJ_SUCCESS) {
+                write_op->written += sent;
+            } else {
+                pj_assert(send_rc > 0);
+                write_op->written = -send_rc;
+            }
+
+            /* In any case we don't need to process this descriptor again. */
+            PJ_FD_CLR(h->fd, &wfdset);
+
+            /* Are we finished with this buffer? */
+            if (send_rc!=PJ_SUCCESS || 
+                write_op->written == (pj_ssize_t)write_op->size) 
+            {
+                pj_list_erase(write_op);
+
+                /* Clear operation if there's no more data to send. */
+                if (pj_list_empty(&h->write_list))
+                    PJ_FD_CLR(h->fd, &ioqueue->wfdset);
+
+	        /* Call callback. */
+                if (h->cb.on_write_complete) {
+	            (*h->cb.on_write_complete)(h, 
+                                               (pj_ioqueue_op_key_t*)write_op,
+                                               write_op->written);
+                }
+            }
+
+            /* Re-scan writable sockets. */
+            goto do_writable_scan;
+	}
+    }
+
     /* Scan for readable socket. */
-    h = ioque->hlist.next;
+    h = ioqueue->key_list.next;
 do_readable_scan:
-    for ( ; h!=&ioque->hlist; h = h->next) {
-	if ((PJ_IOQUEUE_IS_READ_OP(h->op) || PJ_IOQUEUE_IS_ACCEPT_OP(h->op)) && 
-	    PJ_FD_ISSET(h->fd, &rfdset))
+    for ( ; h!=&ioqueue->key_list; h = h->next) {
+	if ((!pj_list_empty(&h->read_list) 
+#if PJ_HAS_TCP
+             || !pj_list_empty(&h->accept_list)
+#endif
+            ) && PJ_FD_ISSET(h->fd, &rfdset))
         {
 	    break;
         }
     }
-    if (h != &ioque->hlist) {
+    if (h != &ioqueue->key_list) {
         pj_status_t rc;
 
-	pj_assert(PJ_IOQUEUE_IS_READ_OP(h->op) ||
-		  PJ_IOQUEUE_IS_ACCEPT_OP(h->op));
+#if PJ_HAS_TCP
+	pj_assert(!pj_list_empty(&h->read_list) || 
+                  !pj_list_empty(&h->accept_list));
+#else
+        pj_assert(!pj_list_empty(&h->read_list));
+#endif
 	
 #	if PJ_HAS_TCP
-	if ((h->op & PJ_IOQUEUE_OP_ACCEPT)) {
-	    /* accept() must be the only operation specified on server socket */
-	    pj_assert(h->op == PJ_IOQUEUE_OP_ACCEPT);
+	if (!pj_list_empty(&h->accept_list)) {
+
+            struct accept_operation *accept_op;
+
+            /* Get one accept operation from the list. */
+	    accept_op = h->accept_list.next;
+            pj_list_erase(accept_op);
 
-	    rc=pj_sock_accept(h->fd, h->accept_fd, h->rmt_addr, h->rmt_addrlen);
-	    if (rc==0 && h->local_addr) {
-		rc = pj_sock_getsockname(*h->accept_fd, h->local_addr, 
-					 h->local_addrlen);
+	    rc=pj_sock_accept(h->fd, accept_op->accept_fd, 
+                              accept_op->rmt_addr, accept_op->addrlen);
+	    if (rc==PJ_SUCCESS && accept_op->local_addr) {
+		rc = pj_sock_getsockname(*accept_op->accept_fd, 
+                                         accept_op->local_addr,
+					 accept_op->addrlen);
 	    }
 
-	    h->op &= ~(PJ_IOQUEUE_OP_ACCEPT);
-	    PJ_FD_CLR(h->fd, &ioque->rfdset);
+	    /* Clear bit in fdset if there is no more pending accept */
+            if (pj_list_empty(&h->accept_list))
+	        PJ_FD_CLR(h->fd, &ioqueue->rfdset);
 
 	    /* Call callback. */
             if (h->cb.on_accept_complete)
-	        (*h->cb.on_accept_complete)(h, *h->accept_fd, rc);
+	        (*h->cb.on_accept_complete)(h, (pj_ioqueue_op_key_t*)accept_op,
+                                            *accept_op->accept_fd, rc);
 
             /* Re-scan readable sockets. */
             goto do_readable_scan;
-        } 
+        }
         else {
-#	endif
-            pj_ssize_t bytes_read = h->rd_buflen;
+#	endif
+            struct read_operation *read_op;
+            pj_ssize_t bytes_read;
+
+            pj_assert(!pj_list_empty(&h->read_list));
+
+            /* Get one pending read operation from the list. */
+            read_op = h->read_list.next;
+            pj_list_erase(read_op);
 
-	    if ((h->op & PJ_IOQUEUE_OP_RECV_FROM)) {
-	        rc = pj_sock_recvfrom(h->fd, h->rd_buf, &bytes_read, 0,
-				      h->rmt_addr, h->rmt_addrlen);
-	    } else if ((h->op & PJ_IOQUEUE_OP_RECV)) {
-	        rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0);
-            } else {
+            bytes_read = read_op->size;
+
+	    if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {
+	        rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0,
+				      read_op->rmt_addr, 
+                                      read_op->rmt_addrlen);
+	    } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) {
+	        rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
+            } else {
+                pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
                 /*
                  * User has specified pj_ioqueue_read().
                  * On Win32, we should do ReadFile(). But because we got
@@ -478,9 +679,10 @@
                  * that error is easier to catch.
                  */
 #	        if defined(PJ_WIN32) && PJ_WIN32 != 0
-                rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0);
-#               elif (defined(PJ_LINUX) && PJ_LINUX != 0) || \
-		     (defined(PJ_SUNOS) && PJ_SUNOS != 0)
+                rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
+                //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
+                //              &bytes_read, NULL);
+#               elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
                 bytes_read = read(h->fd, h->rd_buf, bytes_read);
                 rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
 #		elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
@@ -503,124 +705,61 @@
 	         */
 
 	        if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
-		    PJ_LOG(4,(THIS_FILE, 
-                              "Ignored ICMP port unreach. on key=%p", h));
+		    //PJ_LOG(4,(THIS_FILE, 
+                    //          "Ignored ICMP port unreach. on key=%p", h));
 	        }
 #	        endif
 
                 /* In any case we would report this to caller. */
                 bytes_read = -rc;
 	    }
-
-	    h->op &= ~(PJ_IOQUEUE_OP_READ | PJ_IOQUEUE_OP_RECV | 
-                       PJ_IOQUEUE_OP_RECV_FROM);
-	    PJ_FD_CLR(h->fd, &ioque->rfdset);
+
+            /* Clear fdset if there is no pending read. */
+            if (pj_list_empty(&h->read_list))
+	        PJ_FD_CLR(h->fd, &ioqueue->rfdset);
+
+            /* In any case clear from temporary set. */
 
 	    /* Call callback. */
             if (h->cb.on_read_complete)
-	        (*h->cb.on_read_complete)(h, bytes_read);
+	        (*h->cb.on_read_complete)(h, (pj_ioqueue_op_key_t*)read_op,
+                                          bytes_read);
 
             /* Re-scan readable sockets. */
             goto do_readable_scan;
 
         }
     }
-
-    /* Scan for writable socket  */
-    h = ioque->hlist.next;
-do_writable_scan:
-    for ( ; h!=&ioque->hlist; h = h->next) {
-	if ((PJ_IOQUEUE_IS_WRITE_OP(h->op) || PJ_IOQUEUE_IS_CONNECT_OP(h->op)) 
-	    && PJ_FD_ISSET(h->fd, &wfdset))
-        {
-	    break;
-        }
-    }
-    if (h != &ioque->hlist) {
-	pj_assert(PJ_IOQUEUE_IS_WRITE_OP(h->op) || 
-		  PJ_IOQUEUE_IS_CONNECT_OP(h->op));
-
-#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
-	if ((h->op & PJ_IOQUEUE_OP_CONNECT)) {
-	    /* Completion of connect() operation */
-	    pj_ssize_t bytes_transfered;
-
-#if (defined(PJ_LINUX) && PJ_LINUX!=0) || \
-    (defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL!=0)
-	    /* from connect(2): 
-		* On Linux, use getsockopt to read the SO_ERROR option at
-		* level SOL_SOCKET to determine whether connect() completed
-		* successfully (if SO_ERROR is zero).
-		*/
-	    int value;
-	    socklen_t vallen = sizeof(value);
-	    int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR, 
-                                   &value, &vallen);
-	    if (gs_rc != 0) {
-		/* Argh!! What to do now??? 
-		 * Just indicate that the socket is connected. The
-		 * application will get error as soon as it tries to use
-		 * the socket to send/receive.
-		 */
-		bytes_transfered = 0;
-	    } else {
-                bytes_transfered = value;
-	    }
-#elif defined(PJ_WIN32) && PJ_WIN32!=0
-	    bytes_transfered = 0; /* success */
-#else
-	    /* Excellent information in D.J. Bernstein page:
-	     * http://cr.yp.to/docs/connect.html
-	     *
-	     * Seems like the most portable way of detecting connect()
-	     * failure is to call getpeername(). If socket is connected,
-	     * getpeername() will return 0. If the socket is not connected,
-	     * it will return ENOTCONN, and read(fd, &ch, 1) will produce
-	     * the right errno through error slippage. This is a combination
-	     * of suggestions from Douglas C. Schmidt and Ken Keys.
-	     */
-	    int gp_rc;
-	    struct sockaddr_in addr;
-	    socklen_t addrlen = sizeof(addr);
-
-	    gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
-	    bytes_transfered = gp_rc;
-#endif
-
-	    /* Clear operation. */
-	    h->op &= (~PJ_IOQUEUE_OP_CONNECT);
-	    PJ_FD_CLR(h->fd, &ioque->wfdset);
-	    PJ_FD_CLR(h->fd, &ioque->xfdset);
-
-	    /* Call callback. */
-            if (h->cb.on_connect_complete)
-	        (*h->cb.on_connect_complete)(h, bytes_transfered);
-
-            /* Re-scan writable sockets. */
-            goto do_writable_scan;
-
-	} else 
-#endif /* PJ_HAS_TCP */
-	{
-	    /* Completion of write(), send(), or sendto() operation. */
-
-	    /* Clear operation. */
-	    h->op &= ~(PJ_IOQUEUE_OP_WRITE | PJ_IOQUEUE_OP_SEND | 
-                       PJ_IOQUEUE_OP_SEND_TO);
-	    PJ_FD_CLR(h->fd, &ioque->wfdset);
-            PJ_FD_CLR(h->fd, &wfdset);
-
-	    /* Call callback. */
-	    /* All data must have been sent? */
-            if (h->cb.on_write_complete)
-	        (*h->cb.on_write_complete)(h, h->wr_buflen);
-
-            /* Re-scan writable sockets. */
-            goto do_writable_scan;
-	}
-    }
-
+
+#if PJ_HAS_TCP
+    /* Scan for exception socket for TCP connection error. */
+    h = ioqueue->key_list.next;
+do_except_scan:
+    for ( ; h!=&ioqueue->key_list; h = h->next) {
+	if (h->connecting && PJ_FD_ISSET(h->fd, &xfdset))
+	    break;
+    }
+    if (h != &ioqueue->key_list) {
+
+	pj_assert(h->connecting);
+
+	/* Clear operation. */
+	h->connecting = 0;
+	PJ_FD_CLR(h->fd, &ioqueue->wfdset);
+	PJ_FD_CLR(h->fd, &ioqueue->xfdset);
+        PJ_FD_CLR(h->fd, &wfdset);
+        PJ_FD_CLR(h->fd, &xfdset);
+
+	/* Call callback. */
+        if (h->cb.on_connect_complete)
+	    (*h->cb.on_connect_complete)(h, -1);
+
+        /* Re-scan exception list. */
+        goto do_except_scan;
+    }
+#endif	/* PJ_HAS_TCP */
+
     /* Shouldn't happen. */
     /* For strange reason on WinXP select() can return 1 while there is no
      * pj_fd_set_t signaled. */
@@ -628,75 +767,63 @@
 
     //count = 0;
 
-    pj_lock_release(ioque->lock);
+    pj_lock_release(ioqueue->lock);
     return count;
 }
 
 /*
- * pj_ioqueue_read()
- *
- * Start asynchronous read from the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque,
-			             pj_ioqueue_key_t *key,
-			             void *buffer,
-			             pj_size_t buflen)
-{
-    PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
-    PJ_CHECK_STACK();
-
-    /* For consistency with other ioqueue implementation, we would reject 
-     * if descriptor has already been submitted for reading before.
-     */
-    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
-                     PJ_EBUSY);
-
-    pj_lock_acquire(ioque->lock);
-
-    key->op |= PJ_IOQUEUE_OP_READ;
-    key->rd_flags = 0;
-    key->rd_buf = buffer;
-    key->rd_buflen = buflen;
-    PJ_FD_SET(key->fd, &ioque->rfdset);
-
-    pj_lock_release(ioque->lock);
-    return PJ_EPENDING;
-}
-
-
-/*
  * pj_ioqueue_recv()
  *
  * Start asynchronous recv() from the socket.
  */
-PJ_DEF(pj_status_t) pj_ioqueue_recv(  pj_ioqueue_t *ioque,
-				      pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_recv(  pj_ioqueue_key_t *key,
+                                      pj_ioqueue_op_key_t *op_key,
 				      void *buffer,
-				      pj_size_t buflen,
+				      pj_ssize_t *length,
 				      unsigned flags )
-{
-    PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
+{
+    pj_status_t status;
+    pj_ssize_t size;
+    struct read_operation *read_op;
+    pj_ioqueue_t *ioqueue;
+
+    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
     PJ_CHECK_STACK();
+
+    /* Try to see if there's data immediately available. 
+     */
+    size = *length;
+    status = pj_sock_recv(key->fd, buffer, &size, flags);
+    if (status == PJ_SUCCESS) {
+        /* Yes! Data is available! */
+        *length = size;
+        return PJ_SUCCESS;
+    } else {
+        /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+         * the error to caller.
+         */
+        if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
+            return status;
+    }
+
+    /*
+     * No data is immediately available.
+     * Must schedule asynchronous operation to the ioqueue.
+     */
+    ioqueue = key->ioqueue;
+    pj_lock_acquire(ioqueue->lock);
+
+    read_op = (struct read_operation*)op_key;
+
-    /* For consistency with other ioqueue implementation, we would reject 
-     * if descriptor has already been submitted for reading before.
-     */
-    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
-                     PJ_EBUSY);
+    read_op->op = PJ_IOQUEUE_OP_RECV;
+    read_op->buf = buffer;
+    read_op->size = *length;
+    read_op->flags = flags;
+
+    pj_list_insert_before(&key->read_list, read_op);
+    PJ_FD_SET(key->fd, &ioqueue->rfdset);
 
-    pj_lock_acquire(ioque->lock);
-
-    key->op |= PJ_IOQUEUE_OP_RECV;
-    key->rd_buf = buffer;
-    key->rd_buflen = buflen;
-    key->rd_flags = flags;
-    PJ_FD_SET(key->fd, &ioque->rfdset);
-
-    pj_lock_release(ioque->lock);
+    pj_lock_release(ioqueue->lock);
     return PJ_EPENDING;
 }
 
@@ -705,80 +832,60 @@
  *
  * Start asynchronous recvfrom() from the socket.
  */
-PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque,
-				         pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
+                                         pj_ioqueue_op_key_t *op_key,
 				         void *buffer,
-				         pj_size_t buflen,
+				         pj_ssize_t *length,
                                          unsigned flags,
 				         pj_sockaddr_t *addr,
 				         int *addrlen)
 {
-    PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
-    PJ_CHECK_STACK();
-
-    /* For consistency with other ioqueue implementation, we would reject 
-     * if descriptor has already been submitted for reading before.
-     */
-    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
-                     PJ_EBUSY);
-
-    pj_lock_acquire(ioque->lock);
-
-    key->op |= PJ_IOQUEUE_OP_RECV_FROM;
-    key->rd_buf = buffer;
-    key->rd_buflen = buflen;
-    key->rd_flags = flags;
-    key->rmt_addr = addr;
-    key->rmt_addrlen = addrlen;
-    PJ_FD_SET(key->fd, &ioque->rfdset);
-
-    pj_lock_release(ioque->lock);
-    return PJ_EPENDING;
-}
-
-/*
- * pj_ioqueue_write()
- *
- * Start asynchronous write() to the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque,
-			              pj_ioqueue_key_t *key,
-			              const void *data,
-			              pj_size_t datalen)
-{
-    pj_status_t rc;
-    pj_ssize_t sent;
-
-    PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
-    PJ_CHECK_STACK();
-
-    /* For consistency with other ioqueue implementation, we would reject 
-     * if descriptor has already been submitted for writing before.
-     */
-    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
-                     PJ_EBUSY);
-
-    sent = datalen;
-    /* sent would be -1 after pj_sock_send() if it returns error. */
-    rc = pj_sock_send(key->fd, data, &sent, 0);
-    if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
-        return rc;
-    }
-
-    pj_lock_acquire(ioque->lock);
-
-    key->op |= PJ_IOQUEUE_OP_WRITE;
-    key->wr_buf = NULL;
-    key->wr_buflen = datalen;
-    PJ_FD_SET(key->fd, &ioque->wfdset);
-
-    pj_lock_release(ioque->lock);
-
-    return PJ_EPENDING;
+    pj_status_t status;
+    pj_ssize_t size;
+    struct read_operation *read_op;
+    pj_ioqueue_t *ioqueue;
+
+    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
+    PJ_CHECK_STACK();
+
+    /* Try to see if there's data immediately available. 
+     */
+    size = *length;
+    status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
+                              addr, addrlen);
+    if (status == PJ_SUCCESS) {
+        /* Yes! Data is available! */
+        *length = size;
+        return PJ_SUCCESS;
+    } else {
+        /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+         * the error to caller.
+         */
+        if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
+            return status;
+    }
+
+    /*
+     * No data is immediately available.
+     * Must schedule asynchronous operation to the ioqueue.
+     */
+    ioqueue = key->ioqueue;
+    pj_lock_acquire(ioqueue->lock);
+
+    read_op = (struct read_operation*)op_key;
+
+    read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
+    read_op->buf = buffer;
+    read_op->size = *length;
+    read_op->flags = flags;
+    read_op->rmt_addr = addr;
+    read_op->rmt_addrlen = addrlen;
+
+    pj_list_insert_before(&key->read_list, read_op);
+    PJ_FD_SET(key->fd, &ioqueue->rfdset);
+
+    pj_lock_release(ioqueue->lock);
+    return PJ_EPENDING;
 }
 
 /*
@@ -786,41 +893,71 @@
  *
  * Start asynchronous send() to the descriptor.
  */
-PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque,
-			             pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
+                                     pj_ioqueue_op_key_t *op_key,
 			             const void *data,
-			             pj_size_t datalen,
+			             pj_ssize_t *length,
                                      unsigned flags)
-{
-    pj_status_t rc;
+{
+    pj_ioqueue_t *ioqueue;
+    struct write_operation *write_op;
+    pj_status_t status;
     pj_ssize_t sent;
 
-    PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
+    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
     PJ_CHECK_STACK();
-
-    /* For consistency with other ioqueue implementation, we would reject 
-     * if descriptor has already been submitted for writing before.
-     */
-    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
-                     PJ_EBUSY);
-
-    sent = datalen;
-    /* sent would be -1 after pj_sock_send() if it returns error. */
-    rc = pj_sock_send(key->fd, data, &sent, flags);
-    if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
-        return rc;
+
+    /* Fast track:
+     *   Try to send data immediately, only if there's no pending write!
+     * Note:
+     *  We are speculating that the list is empty here without properly
+     *  acquiring ioqueue's mutex first. This is intentional, to maximize
+     *  performance via parallelism.
+     *
+     *  This should be safe, because:
+     *      - by convention, we require caller to make sure that the
+     *        key is not unregistered while other threads are invoking
+     *        an operation on the same key.
+     *      - pj_list_empty() is safe to be invoked by multiple threads,
+     *        even when other threads are modifying the list.
+     */
+    if (pj_list_empty(&key->write_list)) {
+        /*
+         * See if data can be sent immediately.
+         */
+        sent = *length;
+        status = pj_sock_send(key->fd, data, &sent, flags);
+        if (status == PJ_SUCCESS) {
+            /* Success! */
+            *length = sent;
+            return PJ_SUCCESS;
+        } else {
+            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+             * the error to caller.
+             */
+            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
+                return status;
+            }
+        }
     }
+
+    /*
+     * Schedule asynchronous send.
+     */
+    ioqueue = key->ioqueue;
+    pj_lock_acquire(ioqueue->lock);
+
+    write_op = (struct write_operation*)op_key;
+    write_op->op = PJ_IOQUEUE_OP_SEND;
+    write_op->buf = NULL;
+    write_op->size = *length;
+    write_op->written = 0;
+    write_op->flags = flags;
+
+    pj_list_insert_before(&key->write_list, write_op);
+    PJ_FD_SET(key->fd, &ioqueue->wfdset);
 
-    pj_lock_acquire(ioque->lock);
-
-    key->op |= PJ_IOQUEUE_OP_SEND;
-    key->wr_buf = NULL;
-    key->wr_buflen = datalen;
-    PJ_FD_SET(key->fd, &ioque->wfdset);
-
-    pj_lock_release(ioque->lock);
+    pj_lock_release(ioqueue->lock);
 
     return PJ_EPENDING;
 }
@@ -831,75 +968,149 @@
  *
  * Start asynchronous write() to the descriptor.
  */
-PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque,
-			               pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
+                                       pj_ioqueue_op_key_t *op_key,
 			               const void *data,
-			               pj_size_t datalen,
+			               pj_ssize_t *length,
                                        unsigned flags,
 			               const pj_sockaddr_t *addr,
 			               int addrlen)
 {
-    pj_status_t rc;
-    pj_ssize_t sent;
-
-    PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
-    PJ_CHECK_STACK();
-
-    /* For consistency with other ioqueue implementation, we would reject 
-     * if descriptor has already been submitted for writing before.
-     */
-    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
-                     PJ_EBUSY);
-
-    sent = datalen;
-    /* sent would be -1 after pj_sock_sendto() if it returns error. */
-    rc = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
-    if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK))  {
-        return rc;
-    }
-
-    pj_lock_acquire(ioque->lock);
-
-    key->op |= PJ_IOQUEUE_OP_SEND_TO;
-    key->wr_buf = NULL;
-    key->wr_buflen = datalen;
-    PJ_FD_SET(key->fd, &ioque->wfdset);
-
-    pj_lock_release(ioque->lock);
-    return PJ_EPENDING;
+    pj_ioqueue_t *ioqueue;
+    struct write_operation *write_op;
+    pj_status_t status;
+    pj_ssize_t sent;
+
+    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
+    PJ_CHECK_STACK();
+
+    /* Fast track:
+     *   Try to send data immediately, only if there's no pending write!
+     * Note:
+     *  We are speculating that the list is empty here without properly
+     *  acquiring ioqueue's mutex first. This is intentional, to maximize
+     *  performance via parallelism.
+     *
+     *  This should be safe, because:
+     *      - by convention, we require caller to make sure that the
+     *        key is not unregistered while other threads are invoking
+     *        an operation on the same key.
+     *      - pj_list_empty() is safe to be invoked by multiple threads,
+     *        even when other threads are modifying the list.
+     */
+    if (pj_list_empty(&key->write_list)) {
+        /*
+         * See if data can be sent immediately.
+         */
+        sent = *length;
+        status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
+        if (status == PJ_SUCCESS) {
+            /* Success! */
+            *length = sent;
+            return PJ_SUCCESS;
+        } else {
+            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+             * the error to caller.
+             */
+            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
+                return status;
+            }
+        }
+    }
+
+    /*
+     * Check that address storage can hold the address parameter.
+     */
+    PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG);
+
+    /*
+     * Schedule asynchronous send.
+     */
+    ioqueue = key->ioqueue;
+    pj_lock_acquire(ioqueue->lock);
+
+    write_op = (struct write_operation*)op_key;
+    write_op->op = PJ_IOQUEUE_OP_SEND_TO;
+    write_op->buf = NULL;
+    write_op->size = *length;
+    write_op->written = 0;
+    write_op->flags = flags;
+    pj_memcpy(&write_op->rmt_addr, addr, addrlen);
+    write_op->rmt_addrlen = addrlen;
+
+    pj_list_insert_before(&key->write_list, write_op);
+    PJ_FD_SET(key->fd, &ioqueue->wfdset);
+
+    pj_lock_release(ioqueue->lock);
+
+    return PJ_EPENDING;
 }
 
 #if PJ_HAS_TCP
 /*
  * Initiate overlapped accept() operation.
  */
-PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue,
-			       pj_ioqueue_key_t *key,
-			       pj_sock_t *new_sock,
-			       pj_sockaddr_t *local,
-			       pj_sockaddr_t *remote,
-			       int *addrlen)
-{
+PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
+                                       pj_ioqueue_op_key_t *op_key,
+			               pj_sock_t *new_sock,
+			               pj_sockaddr_t *local,
+			               pj_sockaddr_t *remote,
+			               int *addrlen)
+{
+    pj_ioqueue_t *ioqueue;
+    struct accept_operation *accept_op;
+    pj_status_t status;
+
-    pj_assert(ioqueue && key && new_sock);
+    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
 
-    /* Server socket must have no other operation! */
-    pj_assert(key->op == 0);
-    
-    pj_lock_acquire(ioqueue->lock);
+    /* Fast track:
+     *  See if there's new connection available immediately.
+     */
+    if (pj_list_empty(&key->accept_list)) {
+        status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
+        if (status == PJ_SUCCESS) {
+            /* Yes! New connection is available! */
+            if (local && addrlen) {
+                status = pj_sock_getsockname(*new_sock, local, addrlen);
+                if (status != PJ_SUCCESS) {
+                    pj_sock_close(*new_sock);
+                    *new_sock = PJ_INVALID_SOCKET;
+                    return status;
+                }
+            }
+            return PJ_SUCCESS;
+        } else {
+            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+             * the error to caller.
+             */
+            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
+                return status;
+            }
+        }
+    }
+
+    /*
+     * No connection is available immediately.
+     * Schedule accept() operation to be completed when there is incoming
+     * connection available.
+     */
+    ioqueue = key->ioqueue;
+    accept_op = (struct accept_operation*)op_key;
 
-    key->op = PJ_IOQUEUE_OP_ACCEPT;
-    key->accept_fd = new_sock;
-    key->rmt_addr = remote;
-    key->rmt_addrlen = addrlen;
-    key->local_addr = local;
-    key->local_addrlen = addrlen;   /* use same addr. as rmt_addrlen */
+    pj_lock_acquire(ioqueue->lock);
 
+    accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
+    accept_op->accept_fd = new_sock;
+    accept_op->rmt_addr = remote;
+    accept_op->addrlen= addrlen;
+    accept_op->local_addr = local;
+

+    pj_list_insert_before(&key->accept_list, accept_op);
     PJ_FD_SET(key->fd, &ioqueue->rfdset);
+

+    pj_lock_release(ioqueue->lock);

 
-    pj_lock_release(ioqueue->lock);
     return PJ_EPENDING;
 }
 
@@ -907,37 +1118,37 @@
  * Initiate overlapped connect() operation (well, it's non-blocking actually,
  * since there's no overlapped version of connect()).
  */
-PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue,
-					pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
 					const pj_sockaddr_t *addr,
 					int addrlen )
-{
-    pj_status_t rc;
+{
+    pj_ioqueue_t *ioqueue;
+    pj_status_t status;
     
     /* check parameters. All must be specified! */
-    PJ_ASSERT_RETURN(ioqueue && key && addr && addrlen, PJ_EINVAL);
+    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
 
-    /* Connecting socket must have no other operation! */
-    PJ_ASSERT_RETURN(key->op == 0, PJ_EBUSY);
+    /* Check if socket has not been marked for connecting */
+    if (key->connecting != 0)
+        return PJ_EPENDING;
     
-    rc = pj_sock_connect(key->fd, addr, addrlen);
-    if (rc == PJ_SUCCESS) {
+    status = pj_sock_connect(key->fd, addr, addrlen);
+    if (status == PJ_SUCCESS) {
 	/* Connected! */
 	return PJ_SUCCESS;
     } else {
-	if (rc == PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) || 
-            rc == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) 
-        {
-	    /* Pending! */
+	if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
+	    /* Pending! */
+            ioqueue = key->ioqueue;
 	    pj_lock_acquire(ioqueue->lock);
-	    key->op = PJ_IOQUEUE_OP_CONNECT;
+	    key->connecting = PJ_TRUE;
 	    PJ_FD_SET(key->fd, &ioqueue->wfdset);
 	    PJ_FD_SET(key->fd, &ioqueue->xfdset);
 	    pj_lock_release(ioqueue->lock);
 	    return PJ_EPENDING;
 	} else {
 	    /* Error! */
-	    return rc;
+	    return status;
 	}
     }
 }
diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c
index dbf883a..afb75c5 100644
--- a/pjlib/src/pj/ioqueue_winnt.c
+++ b/pjlib/src/pj/ioqueue_winnt.c
@@ -22,17 +22,28 @@
 #  include <mswsock.h>
 #endif
 
-
-#define ACCEPT_ADDR_LEN	    (sizeof(pj_sockaddr_in)+20)
+
+/* The address specified in AcceptEx() must be 16 more than the size of
+ * SOCKADDR (source: MSDN).
+ */
+#define ACCEPT_ADDR_LEN	    (sizeof(pj_sockaddr_in)+16)
+
+typedef struct generic_overlapped
+{
+    WSAOVERLAPPED	   overlapped;
+    pj_ioqueue_operation_e operation;
+} generic_overlapped;
 
 /*
  * OVERLAP structure for send and receive.
  */
 typedef struct ioqueue_overlapped
 {
-    WSAOVERLAPPED	   overlapped;
+    WSAOVERLAPPED	   overlapped;
     pj_ioqueue_operation_e operation;
-    WSABUF		   wsabuf;
+    WSABUF		   wsabuf;
+    pj_sockaddr_in         dummy_addr;
+    int                    dummy_addrlen;
 } ioqueue_overlapped;
 
 #if PJ_HAS_TCP
@@ -41,7 +52,7 @@
  */
 typedef struct ioqueue_accept_rec
 {
-    WSAOVERLAPPED	    overlapped;
+    WSAOVERLAPPED	    overlapped;
     pj_ioqueue_operation_e  operation;
     pj_sock_t		    newsock;
     pj_sock_t		   *newsock_ptr;
@@ -51,19 +62,29 @@
     char		    accept_buf[2 * ACCEPT_ADDR_LEN];
 } ioqueue_accept_rec;
 #endif
+
+/*
+ * Structure to hold pending operation key.
+ */
+union operation_key
+{
+    generic_overlapped      generic;
+    ioqueue_overlapped      overlapped;
+#if PJ_HAS_TCP
+    ioqueue_accept_rec      accept;
+#endif
+};
 
 /*
  * Structure for individual socket.
  */
 struct pj_ioqueue_key_t
-{
+{
+    pj_ioqueue_t       *ioqueue;
     HANDLE		hnd;
     void	       *user_data;
-    ioqueue_overlapped	recv_overlapped;
-    ioqueue_overlapped	send_overlapped;
 #if PJ_HAS_TCP
     int			connecting;
-    ioqueue_accept_rec	accept_overlapped;
 #endif
     pj_ioqueue_callback	cb;
 };
@@ -106,9 +127,14 @@
 			  &local,
 			  &locallen,
 			  &remote,
-			  &remotelen);
-    pj_memcpy(accept_overlapped->local, local, locallen);
-    pj_memcpy(accept_overlapped->remote, remote, locallen);
+			  &remotelen);
+    if (*accept_overlapped->addrlen > locallen) {
+        pj_memcpy(accept_overlapped->local, local, locallen);
+        pj_memcpy(accept_overlapped->remote, remote, locallen);
+    } else {
+        pj_memset(accept_overlapped->local, 0, *accept_overlapped->addrlen);
+        pj_memset(accept_overlapped->remote, 0, *accept_overlapped->addrlen);
+    }
     *accept_overlapped->addrlen = locallen;
     if (accept_overlapped->newsock_ptr)
         *accept_overlapped->newsock_ptr = accept_overlapped->newsock;
@@ -120,7 +146,6 @@
 {
     pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos];
     HANDLE hEvent = ioqueue->connecting_handles[pos];
-    unsigned long optval;
 
     /* Remove key from array of connecting handles. */
     pj_array_erase(ioqueue->connecting_keys, sizeof(key),
@@ -143,12 +168,6 @@
 	CloseHandle(hEvent);
     }
 
-    /* Set socket to blocking again. */
-    optval = 0;
-    if (ioctlsocket((pj_sock_t)key->hnd, FIONBIO, &optval) != 0) {
-	DWORD dwStatus;
-	dwStatus = WSAGetLastError();
-    }
 }
 
 /*
@@ -183,7 +202,8 @@
 	    WSAEnumNetworkEvents((pj_sock_t)key->hnd, 
 				 ioqueue->connecting_handles[pos], 
 				 &net_events);
-	    *connect_err = net_events.iErrorCode[FD_CONNECT_BIT];
+	    *connect_err = 
+                PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]);
 
 	    /* Erase socket from pending connect. */
 	    erase_connecting_socket(ioqueue, pos);
@@ -194,95 +214,121 @@
 }
 #endif
 
-
+/*
+ * pj_ioqueue_create()
+ */
 PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, 
 				       pj_size_t max_fd,
-				       int max_threads,
-				       pj_ioqueue_t **ioqueue)
+				       pj_ioqueue_t **p_ioqueue)
 {
-    pj_ioqueue_t *ioq;
+    pj_ioqueue_t *ioqueue;
     pj_status_t rc;
 
     PJ_UNUSED_ARG(max_fd);
-    PJ_ASSERT_RETURN(pool && ioqueue, PJ_EINVAL);
+    PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL);
+

+    rc = sizeof(union operation_key);

+

+    /* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */

+    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= 

+                     sizeof(union operation_key), PJ_EBUG);

 
-    ioq = pj_pool_zalloc(pool, sizeof(*ioq));
-    ioq->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, max_threads);
-    if (ioq->iocp == NULL)
+    ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue));
+    ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
+    if (ioqueue->iocp == NULL)
 	return PJ_RETURN_OS_ERROR(GetLastError());
 
-    rc = pj_lock_create_simple_mutex(pool, NULL, &ioq->lock);
+    rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock);
     if (rc != PJ_SUCCESS) {
-	CloseHandle(ioq->iocp);
+	CloseHandle(ioqueue->iocp);
 	return rc;
     }
 
-    ioq->auto_delete_lock = PJ_TRUE;
+    ioqueue->auto_delete_lock = PJ_TRUE;
 
-    *ioqueue = ioq;
+    *p_ioqueue = ioqueue;
 
-    PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioq));
+    PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue));
     return PJ_SUCCESS;
 }
-
-PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioque )
+
+/*
+ * pj_ioqueue_destroy()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue )
 {
     unsigned i;
 
     PJ_CHECK_STACK();
-    PJ_ASSERT_RETURN(ioque, PJ_EINVAL);
+    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
 
     /* Destroy events in the pool */
-    for (i=0; i<ioque->event_count; ++i) {
-	CloseHandle(ioque->event_pool[i]);
+    for (i=0; i<ioqueue->event_count; ++i) {
+	CloseHandle(ioqueue->event_pool[i]);
     }
-    ioque->event_count = 0;
+    ioqueue->event_count = 0;
 
-    if (ioque->auto_delete_lock)
-        pj_lock_destroy(ioque->lock);
-
-    if (CloseHandle(ioque->iocp) == TRUE)
-	return PJ_SUCCESS;
-    else
-	return PJ_RETURN_OS_ERROR(GetLastError());
+    if (CloseHandle(ioqueue->iocp) != TRUE)
+	return PJ_RETURN_OS_ERROR(GetLastError());
+
+    if (ioqueue->auto_delete_lock)
+        pj_lock_destroy(ioqueue->lock);
+
+    return PJ_SUCCESS;
 }
-
-PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque, 
+
+/*
+ * pj_ioqueue_set_lock()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue, 
 					 pj_lock_t *lock,
 					 pj_bool_t auto_delete )
 {
-    PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL);
+    PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
 
-    if (ioque->auto_delete_lock) {
-        pj_lock_destroy(ioque->lock);
+    if (ioqueue->auto_delete_lock) {
+        pj_lock_destroy(ioqueue->lock);
     }
 
-    ioque->lock = lock;
-    ioque->auto_delete_lock = auto_delete;
+    ioqueue->lock = lock;
+    ioqueue->auto_delete_lock = auto_delete;
 
     return PJ_SUCCESS;
 }
-
+
+/*
+ * pj_ioqueue_register_sock()
+ */
 PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
-					      pj_ioqueue_t *ioque,
-					      pj_sock_t hnd,
+					      pj_ioqueue_t *ioqueue,
+					      pj_sock_t sock,
 					      void *user_data,
 					      const pj_ioqueue_callback *cb,
 					      pj_ioqueue_key_t **key )
 {
     HANDLE hioq;
-    pj_ioqueue_key_t *rec;
+    pj_ioqueue_key_t *rec;
+    u_long value;
+    int rc;
 
-    PJ_ASSERT_RETURN(pool && ioque && cb && key, PJ_EINVAL);
-
-    rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
-    rec->hnd = (HANDLE)hnd;
+    PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL);
+
+    /* Build the key for this socket. */
+    rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
+    rec->ioqueue = ioqueue;
+    rec->hnd = (HANDLE)sock;
     rec->user_data = user_data;
     pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback));
-#if PJ_HAS_TCP
-    rec->accept_overlapped.newsock = PJ_INVALID_SOCKET;
-#endif
-    hioq = CreateIoCompletionPort((HANDLE)hnd, ioque->iocp, (DWORD)rec, 0);
+
+    /* Set socket to nonblocking. */
+    value = 1;
+    rc = ioctlsocket(sock, FIONBIO, &value);
+    if (rc != 0) {
+        return PJ_RETURN_OS_ERROR(WSAGetLastError());
+    }
+
+    /* Associate with IOCP */
+    hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0);
     if (!hioq) {
 	return PJ_RETURN_OS_ERROR(GetLastError());
     }
@@ -291,58 +337,78 @@
     return PJ_SUCCESS;
 }
 
-
-
-PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque,
-					   pj_ioqueue_key_t *key )
+/*
+ * pj_ioqueue_unregister()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
 {
-    PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL);
+    PJ_ASSERT_RETURN(key, PJ_EINVAL);
 
 #if PJ_HAS_TCP
     if (key->connecting) {
 	unsigned pos;
+        pj_ioqueue_t *ioqueue;
+
+        ioqueue = key->ioqueue;
 
 	/* Erase from connecting_handles */
-	pj_lock_acquire(ioque->lock);
-	for (pos=0; pos < ioque->connecting_count; ++pos) {
-	    if (ioque->connecting_keys[pos] == key) {
-		erase_connecting_socket(ioque, pos);
-                if (key->accept_overlapped.newsock_ptr) {
-                    /* ??? shouldn't it be newsock instead of newsock_ptr??? */
-		    closesocket(*key->accept_overlapped.newsock_ptr);
-                }
+	pj_lock_acquire(ioqueue->lock);
+	for (pos=0; pos < ioqueue->connecting_count; ++pos) {
+	    if (ioqueue->connecting_keys[pos] == key) {
+		erase_connecting_socket(ioqueue, pos);
 		break;
 	    }
 	}
-	pj_lock_release(ioque->lock);
-	key->connecting = 0;
+	key->connecting = 0;
+	pj_lock_release(ioqueue->lock);
     }
 #endif
     return PJ_SUCCESS;
 }
-
+
+/*
+ * pj_ioqueue_get_user_data()
+ */
 PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
 {
     PJ_ASSERT_RETURN(key, NULL);
     return key->user_data;
 }
-
-/*
+
+/*
+ * pj_ioqueue_set_user_data()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
+                                              void *user_data,
+                                              void **old_data )
+{
+    PJ_ASSERT_RETURN(key, PJ_EINVAL);
+
+    if (old_data)
+        *old_data = key->user_data;
+
+    key->user_data = user_data;
+    return PJ_SUCCESS;
+}
+
+/*
+ * pj_ioqueue_poll()
+ *
  * Poll for events.
  */
-PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout)
+PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
 {
     DWORD dwMsec, dwBytesTransfered, dwKey;
-    ioqueue_overlapped *ov;
+    generic_overlapped *pOv;
     pj_ioqueue_key_t *key;
     pj_ssize_t size_status;
     BOOL rc;
 
-    PJ_ASSERT_RETURN(ioque, -PJ_EINVAL);
+    PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
 
     /* Check the connecting array. */
 #if PJ_HAS_TCP
-    key = check_connecting(ioque, &size_status);
+    key = check_connecting(ioqueue, &size_status);
     if (key != NULL) {
 	key->cb.on_connect_complete(key, (int)size_status);
 	return 1;
@@ -353,40 +419,46 @@
     dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE;
 
     /* Poll for completion status. */
-    rc = GetQueuedCompletionStatus(ioque->iocp, &dwBytesTransfered, &dwKey,
-				   (OVERLAPPED**)&ov, dwMsec);
+    rc = GetQueuedCompletionStatus(ioqueue->iocp, &dwBytesTransfered, &dwKey,
+				   (OVERLAPPED**)&pOv, dwMsec);
 
     /* The return value is:
      * - nonzero if event was dequeued.
-     * - zero and ov==NULL if no event was dequeued.
-     * - zero and ov!=NULL if event for failed I/O was dequeued.
+     * - zero and pOv==NULL if no event was dequeued.
+     * - zero and pOv!=NULL if event for failed I/O was dequeued.
      */
-    if (ov) {
+    if (pOv) {
 	/* Event was dequeued for either successful or failed I/O */
 	key = (pj_ioqueue_key_t*)dwKey;
 	size_status = dwBytesTransfered;
-	switch (ov->operation) {
+	switch (pOv->operation) {
 	case PJ_IOQUEUE_OP_READ:
 	case PJ_IOQUEUE_OP_RECV:
 	case PJ_IOQUEUE_OP_RECV_FROM:
-            key->recv_overlapped.operation = 0;
+            pOv->operation = 0;
             if (key->cb.on_read_complete)
-	        key->cb.on_read_complete(key, size_status);
+	        key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv, 
+                                         size_status);
 	    break;
 	case PJ_IOQUEUE_OP_WRITE:
 	case PJ_IOQUEUE_OP_SEND:
 	case PJ_IOQUEUE_OP_SEND_TO:
-            key->send_overlapped.operation = 0;
+            pOv->operation = 0;
             if (key->cb.on_write_complete)
-	        key->cb.on_write_complete(key, size_status);
+	        key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv, 
+                                                size_status);
 	    break;
 #if PJ_HAS_TCP
 	case PJ_IOQUEUE_OP_ACCEPT:
 	    /* special case for accept. */
-	    ioqueue_on_accept_complete((ioqueue_accept_rec*)ov);
-            if (key->cb.on_accept_complete)
-	        key->cb.on_accept_complete(key, key->accept_overlapped.newsock,
-                                           0);
+	    ioqueue_on_accept_complete((ioqueue_accept_rec*)pOv);
+            if (key->cb.on_accept_complete) {
+                ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv;
+	        key->cb.on_accept_complete(key, 
+                                           (pj_ioqueue_op_key_t*)pOv, 
+                                           accept_rec->newsock,
+                                           PJ_SUCCESS);
+            }
 	    break;
 	case PJ_IOQUEUE_OP_CONNECT:
 #endif
@@ -398,9 +470,9 @@
     }
 
     if (GetLastError()==WAIT_TIMEOUT) {
-	/* Check the connecting array. */
+	/* Check the connecting array (again). */
 #if PJ_HAS_TCP
-	key = check_connecting(ioque, &size_status);
+	key = check_connecting(ioqueue, &size_status);
 	if (key != NULL) {
 	    key->cb.on_connect_complete(key, (int)size_status);
 	    return 1;
@@ -412,95 +484,72 @@
 }
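A typical polling loop around this function might look like the sketch below (illustrative only; the quit_flag and the 100 msec timeout are assumptions, not part of the change):

#include <pjlib.h>

static void poll_loop(pj_ioqueue_t *ioqueue, volatile int *quit_flag)
{
    while (!*quit_flag) {
        pj_time_val timeout = { 0, 100 };   /* wait for at most 100 msec */
        int count = pj_ioqueue_poll(ioqueue, &timeout);
        if (count < 0)
            break;          /* -PJ_EINVAL or another polling error */
        /* otherwise 'count' events were dispatched to their callbacks */
    }
}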
 
 /*
- * pj_ioqueue_read()
- *
- * Initiate overlapped ReadFile operation.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque,
-				     pj_ioqueue_key_t *key,
-				     void *buffer,
-				     pj_size_t buflen)
-{
-    BOOL rc;
-    DWORD bytesRead;
-
-    PJ_CHECK_STACK();
-    PJ_UNUSED_ARG(ioque);
-
-    if (key->recv_overlapped.operation != PJ_IOQUEUE_OP_NONE) {
-        pj_assert(!"Operation already pending for this descriptor");
-        return PJ_EBUSY;
-    }
-
-    pj_memset(&key->recv_overlapped, 0, sizeof(key->recv_overlapped));
-    key->recv_overlapped.operation = PJ_IOQUEUE_OP_READ;
-
-    rc = ReadFile(key->hnd, buffer, buflen, &bytesRead, 
-		  &key->recv_overlapped.overlapped);
-    if (rc == FALSE) {
-	DWORD dwStatus = GetLastError();
-	if (dwStatus==ERROR_IO_PENDING)
-            return PJ_EPENDING;
-        else
-            return PJ_STATUS_FROM_OS(dwStatus);
-    } else {
-	/*
-	 * This is workaround to a probable bug in Win2000 (probably NT too).
-	 * Even if 'rc' is TRUE, which indicates operation has completed,
-	 * GetQueuedCompletionStatus still will return the key.
-	 * So as work around, we always return PJ_EPENDING here.
-	 */
-	return PJ_EPENDING;
-    }
-}
-
-/*
  * pj_ioqueue_recv()
  *
  * Initiate overlapped WSARecv() operation.
  */
-PJ_DEF(pj_status_t) pj_ioqueue_recv(  pj_ioqueue_t *ioque,
-				      pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_recv(  pj_ioqueue_key_t *key,
+                                      pj_ioqueue_op_key_t *op_key,
 				      void *buffer,
-				      pj_size_t buflen,
+				      pj_ssize_t *length,
 				      unsigned flags )
-{
-    int rc;
-    DWORD bytesRead;
-    DWORD dwFlags = 0;
-
-    PJ_CHECK_STACK();
-    PJ_UNUSED_ARG(ioque);
-
-    if (key->recv_overlapped.operation != PJ_IOQUEUE_OP_NONE) {
-        pj_assert(!"Operation already pending for this socket");
-        return PJ_EBUSY;
-    }
-
-    pj_memset(&key->recv_overlapped, 0, sizeof(key->recv_overlapped));
-    key->recv_overlapped.operation = PJ_IOQUEUE_OP_READ;
-
-    key->recv_overlapped.wsabuf.buf = buffer;
-    key->recv_overlapped.wsabuf.len = buflen;
-
-    dwFlags = flags;
-
-    rc = WSARecv((SOCKET)key->hnd, &key->recv_overlapped.wsabuf, 1, 
-                 &bytesRead, &dwFlags,
-		 &key->recv_overlapped.overlapped, NULL);
-    if (rc == SOCKET_ERROR) {
-	DWORD dwStatus = WSAGetLastError();
-	if (dwStatus==WSA_IO_PENDING)
-            return PJ_EPENDING;
-        else
-            return PJ_STATUS_FROM_OS(dwStatus);
-    } else {
-	/* Must always return pending status.
-	 * See comments on pj_ioqueue_read
-	 * return bytesRead;
-         */
-	return PJ_EPENDING;
-    }
+{
+    /*
+     * Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and
+     * addrlen here. But unfortunately it generates EINVAL... :-(
+     *  -bennylp
+     */
+    int rc;
+    DWORD bytesRead;
+    DWORD dwFlags = 0;
+    union operation_key *op_key_rec;
+
+    PJ_CHECK_STACK();
+    PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL);
+
+    op_key_rec = (union operation_key*)op_key->internal__;
+    op_key_rec->overlapped.wsabuf.buf = buffer;
+    op_key_rec->overlapped.wsabuf.len = *length;
+
+    dwFlags = flags;
+
+    /* Try non-overlapped receive first to see if data is
+     * immediately available.
+     */
+    rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
+                 &bytesRead, &dwFlags, NULL, NULL);
+    if (rc == 0) {
+        *length = bytesRead;
+        return PJ_SUCCESS;
+    } else {
+        DWORD dwError = WSAGetLastError();
+        if (dwError != WSAEWOULDBLOCK) {
+            *length = -1;
+            return PJ_RETURN_OS_ERROR(dwError);
+        }
+    }
+
+    /*
+     * No immediate data available.
+     * Register overlapped Recv() operation.
+     */
+    pj_memset(&op_key_rec->overlapped.overlapped, 0,
+              sizeof(op_key_rec->overlapped.overlapped));
+    op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
+
+    rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, 
+                 &bytesRead, &dwFlags, 
+		 &op_key_rec->overlapped.overlapped, NULL);
+    if (rc == SOCKET_ERROR) {
+	DWORD dwStatus = WSAGetLastError();
+        if (dwStatus!=WSA_IO_PENDING) {
+            *length = -1;
+            return PJ_STATUS_FROM_OS(dwStatus);
+        }
+    }
+
+    /* Pending operation has been scheduled. */
+    return PJ_EPENDING;
 }
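Sketch of the calling convention introduced here (illustrative, not part of the change): each outstanding operation carries its own pj_ioqueue_op_key_t, so several receives may be pending on one key simultaneously; the key, buffer and callback are assumed to exist.

static pj_ioqueue_op_key_t recv_op;
static char buffer[512];

static void start_receive(pj_ioqueue_key_t *key)
{
    pj_ssize_t len = sizeof(buffer);
    pj_status_t status;

    status = pj_ioqueue_recv(key, &recv_op, buffer, &len, 0);
    if (status == PJ_SUCCESS) {
        /* Data was available immediately; 'len' bytes are in 'buffer'. */
    } else if (status == PJ_EPENDING) {
        /* on_read_complete() will later be invoked with &recv_op. */
    } else {
        /* Immediate error; 'len' has been set to -1. */
    }
}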
 
 /*
@@ -508,136 +557,79 @@
  *
  * Initiate overlapped RecvFrom() operation.
  */
-PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque,
-					 pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
+                                         pj_ioqueue_op_key_t *op_key,
 					 void *buffer,
-					 pj_size_t buflen,
+					 pj_ssize_t *length,
                                          unsigned flags,
 					 pj_sockaddr_t *addr,
 					 int *addrlen)
 {
-    BOOL rc;
-    DWORD bytesRead;
-    DWORD dwFlags;
-
-    PJ_CHECK_STACK();
-    PJ_UNUSED_ARG(ioque);
-
-    if (key->recv_overlapped.operation != PJ_IOQUEUE_OP_NONE) {
-        pj_assert(!"Operation already pending for this socket");
-        return PJ_EBUSY;
-    }
-
-    pj_memset(&key->recv_overlapped, 0, sizeof(key->recv_overlapped));
-    key->recv_overlapped.operation = PJ_IOQUEUE_OP_RECV_FROM;
-    key->recv_overlapped.wsabuf.buf = buffer;
-    key->recv_overlapped.wsabuf.len = buflen;
-    dwFlags = flags;
-    rc = WSARecvFrom((SOCKET)key->hnd, &key->recv_overlapped.wsabuf, 1, 
-		     &bytesRead, &dwFlags, 
-		     addr, addrlen,
-		     &key->recv_overlapped.overlapped, NULL);
-    if (rc == SOCKET_ERROR) {
-	DWORD dwStatus = WSAGetLastError();
-	if (dwStatus==WSA_IO_PENDING)
-            return PJ_EPENDING;
-        else
-            return PJ_STATUS_FROM_OS(dwStatus);
-    } else {
-	/* Must always return pending status.
-	 * See comments on pj_ioqueue_read
-	 * return bytesRead;
-         */
-	return PJ_EPENDING;
-    }
+    int rc;
+    DWORD bytesRead;
+    DWORD dwFlags = 0;
+    union operation_key *op_key_rec;
+
+    PJ_CHECK_STACK();
+    PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL);
+
+    op_key_rec = (union operation_key*)op_key->internal__;
+    op_key_rec->overlapped.wsabuf.buf = buffer;
+    op_key_rec->overlapped.wsabuf.len = *length;
+
+    dwFlags = flags;
+
+    /* Try non-overlapped receive first to see if data is
+     * immediately available.
+     */
+    rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
+                     &bytesRead, &dwFlags, addr, addrlen, NULL, NULL);
+    if (rc == 0) {
+        *length = bytesRead;
+        return PJ_SUCCESS;
+    } else {
+        DWORD dwError = WSAGetLastError();
+        if (dwError != WSAEWOULDBLOCK) {
+            *length = -1;
+            return PJ_RETURN_OS_ERROR(dwError);
+        }
+    }
+
+    /*
+     * No immediate data available.
+     * Register overlapped RecvFrom() operation.
+     */
+    pj_memset(&op_key_rec->overlapped.overlapped, 0,
+              sizeof(op_key_rec->overlapped.overlapped));
+    op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
+
+    rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, 
+                     &bytesRead, &dwFlags, addr, addrlen,
+		     &op_key_rec->overlapped.overlapped, NULL);
+    if (rc == SOCKET_ERROR) {
+	DWORD dwStatus = WSAGetLastError();
+        if (dwStatus!=WSA_IO_PENDING) {
+            *length = -1;
+            return PJ_STATUS_FROM_OS(dwStatus);
+        }
+    }
+
+    /* Pending operation has been scheduled. */
+    return PJ_EPENDING;
 }
 
 /*
- * pj_ioqueue_write()
- *
- * Initiate overlapped WriteFile() operation.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque,
-				      pj_ioqueue_key_t *key,
-				      const void *data,
-				      pj_size_t datalen)
-{
-    BOOL rc;
-    DWORD bytesWritten;
-
-    PJ_CHECK_STACK();
-    PJ_UNUSED_ARG(ioque);
-
-    if (key->send_overlapped.operation != PJ_IOQUEUE_OP_NONE) {
-        pj_assert(!"Operation already pending for this descriptor");
-        return PJ_EBUSY;
-    }
-
-    pj_memset(&key->send_overlapped, 0, sizeof(key->send_overlapped));
-    key->send_overlapped.operation = PJ_IOQUEUE_OP_WRITE;
-    rc = WriteFile(key->hnd, data, datalen, &bytesWritten, 
-		   &key->send_overlapped.overlapped);
-    
-    if (rc == FALSE) {
-	DWORD dwStatus = GetLastError();
-	if (dwStatus==ERROR_IO_PENDING)
-            return PJ_EPENDING;
-        else
-            return PJ_STATUS_FROM_OS(dwStatus);
-    } else {
-	/* Must always return pending status.
-	 * See comments on pj_ioqueue_read
-	 * return bytesWritten;
-         */
-	return PJ_EPENDING;
-    }
-}
-
-
-/*
  * pj_ioqueue_send()
  *
  * Initiate overlapped Send operation.
  */
-PJ_DEF(pj_status_t) pj_ioqueue_send(  pj_ioqueue_t *ioque,
-				      pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_send(  pj_ioqueue_key_t *key,
+                                      pj_ioqueue_op_key_t *op_key,
 				      const void *data,
-				      pj_size_t datalen,
+				      pj_ssize_t *length,
 				      unsigned flags )
 {
-    int rc;
-    DWORD bytesWritten;
-    DWORD dwFlags;
-
-    PJ_CHECK_STACK();
-    PJ_UNUSED_ARG(ioque);
-
-    if (key->send_overlapped.operation != PJ_IOQUEUE_OP_NONE) {
-        pj_assert(!"Operation already pending for this socket");
-        return PJ_EBUSY;
-    }
-
-    pj_memset(&key->send_overlapped, 0, sizeof(key->send_overlapped));
-    key->send_overlapped.operation = PJ_IOQUEUE_OP_WRITE;
-    key->send_overlapped.wsabuf.buf = (void*)data;
-    key->send_overlapped.wsabuf.len = datalen;
-    dwFlags = flags;
-    rc = WSASend((SOCKET)key->hnd, &key->send_overlapped.wsabuf, 1,
-                 &bytesWritten,  dwFlags,
-		 &key->send_overlapped.overlapped, NULL);
-    if (rc == SOCKET_ERROR) {
-	DWORD dwStatus = WSAGetLastError();
-        if (dwStatus==WSA_IO_PENDING)
-            return PJ_EPENDING;
-        else
-            return PJ_STATUS_FROM_OS(dwStatus);
-    } else {
-	/* Must always return pending status.
-	 * See comments on pj_ioqueue_read
-	 * return bytesRead;
-         */
-	return PJ_EPENDING;
-    }
+    return pj_ioqueue_sendto(key, op_key, data, length, flags, NULL, 0);
 }
 
 
@@ -646,46 +638,65 @@
  *
  * Initiate overlapped SendTo operation.
  */
-PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque,
-				       pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
+                                       pj_ioqueue_op_key_t *op_key,
 				       const void *data,
-				       pj_size_t datalen,
+				       pj_ssize_t *length,
                                        unsigned flags,
 				       const pj_sockaddr_t *addr,
 				       int addrlen)
-{
-    BOOL rc;
-    DWORD bytesSent;
-    DWORD dwFlags;
-
-    PJ_CHECK_STACK();
-    PJ_UNUSED_ARG(ioque);
-
-    if (key->send_overlapped.operation != PJ_IOQUEUE_OP_NONE) {
-        pj_assert(!"Operation already pending for this socket");
-        return PJ_EBUSY;
-    }
-
-    pj_memset(&key->send_overlapped, 0, sizeof(key->send_overlapped));
-    key->send_overlapped.operation = PJ_IOQUEUE_OP_SEND_TO;
-    key->send_overlapped.wsabuf.buf = (char*)data;
-    key->send_overlapped.wsabuf.len = datalen;
-    dwFlags = flags;
-    rc = WSASendTo((SOCKET)key->hnd, &key->send_overlapped.wsabuf, 1, 
-		   &bytesSent, dwFlags, addr, 
-		   addrlen, &key->send_overlapped.overlapped, NULL);
-    if (rc == SOCKET_ERROR) {
-	DWORD dwStatus = WSAGetLastError();
-	if (dwStatus==WSA_IO_PENDING)
-            return PJ_EPENDING;
-        else
-            return PJ_STATUS_FROM_OS(dwStatus);
-    } else {
-	// Must always return pending status.
-	// See comments on pj_ioqueue_read
-	// return bytesSent;
-	return PJ_EPENDING;
-    }
+{
+    int rc;
+    DWORD bytesWritten;
+    DWORD dwFlags;
+    union operation_key *op_key_rec;
+
+    PJ_CHECK_STACK();
+    PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL);
+
+    op_key_rec = (union operation_key*)op_key->internal__;
+
+    dwFlags = flags;
+
+    /*
+     * First try a non-blocking immediate send.
+     */
+    op_key_rec->overlapped.wsabuf.buf = (void*)data;
+    op_key_rec->overlapped.wsabuf.len = *length;
+
+    rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
+                   &bytesWritten, dwFlags, addr, addrlen,
+                   NULL, NULL);
+    if (rc == 0) {
+        *length = bytesWritten;
+        return PJ_SUCCESS;
+    } else {
+        DWORD dwStatus = WSAGetLastError();
+        if (dwStatus != WSAEWOULDBLOCK) {
+            *length = -1;
+            return PJ_RETURN_OS_ERROR(dwStatus);
+        }
+    }
+
+    /*
+     * Data can't be sent immediately.
+     * Schedule an asynchronous WSASendTo().
+     */
+    pj_memset(&op_key_rec->overlapped.overlapped, 0,
+              sizeof(op_key_rec->overlapped.overlapped));
+    op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND;
+
+    rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
+                   &bytesWritten, dwFlags, addr, addrlen,
+		   &op_key_rec->overlapped.overlapped, NULL);
+    if (rc == SOCKET_ERROR) {
+	DWORD dwStatus = WSAGetLastError();
+        if (dwStatus!=WSA_IO_PENDING)
+            return PJ_STATUS_FROM_OS(dwStatus);
+    }
+
+    /* Asynchronous operation successfully submitted. */
+    return PJ_EPENDING;
 }
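Because pj_ioqueue_send() above now simply forwards to pj_ioqueue_sendto() with a NULL address, both calls share this try-immediately-then-overlap path. A usage sketch (illustrative only; ckey, send_buf, bufsize and addr are assumed to be set up as in the tests):

static pj_ioqueue_op_key_t write_op;

static pj_status_t start_send(pj_ioqueue_key_t *ckey, const char *send_buf,
                              pj_size_t bufsize, const pj_sockaddr_in *addr)
{
    pj_ssize_t sent = bufsize;
    pj_status_t status;

    status = pj_ioqueue_sendto(ckey, &write_op, send_buf, &sent, 0,
                               addr, sizeof(*addr));
    if (status == PJ_SUCCESS) {
        /* 'sent' holds the number of bytes written immediately. */
    } else if (status != PJ_EPENDING) {
        /* immediate error; report it */
    }
    return status;
}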
 
 #if PJ_HAS_TCP
@@ -695,59 +706,93 @@
  *
  * Initiate overlapped accept() operation.
  */
-PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue,
-			       pj_ioqueue_key_t *key,
-			       pj_sock_t *new_sock,
-			       pj_sockaddr_t *local,
-			       pj_sockaddr_t *remote,
-			       int *addrlen)
+PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
+                                       pj_ioqueue_op_key_t *op_key,
+			               pj_sock_t *new_sock,
+			               pj_sockaddr_t *local,
+			               pj_sockaddr_t *remote,
+			               int *addrlen)
 {
-    BOOL rc;
+    BOOL rc;
     DWORD bytesReceived;
-    pj_status_t status;
+    pj_status_t status;
+    union operation_key *op_key_rec;
+    SOCKET sock;
 
     PJ_CHECK_STACK();
-    PJ_UNUSED_ARG(ioqueue);
+    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
+
+    /*
+     * See if there is a new connection immediately available.
+     */
+    sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0);
+    if (sock != INVALID_SOCKET) {
+        /* Yes! New socket is available! */
+        int status;
+
+        status = getsockname(sock, local, addrlen);
+        if (status != 0) {
+            DWORD dwError = WSAGetLastError();
+            closesocket(sock);
+            return PJ_RETURN_OS_ERROR(dwError);
+        }
+
+        *new_sock = sock;
+        return PJ_SUCCESS;
+
+    } else {
+        DWORD dwError = WSAGetLastError();
+        if (dwError != WSAEWOULDBLOCK) {
+            return PJ_RETURN_OS_ERROR(dwError);
+        }
+    }
+
+    /*
+     * No connection is immediately available.
+     * Must schedule an asynchronous operation.
+     */
+    op_key_rec = (union operation_key*)op_key->internal__;
+
+    status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, 
+                            &op_key_rec->accept.newsock);
+    if (status != PJ_SUCCESS)
+	return status;
+
+    /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket 
+     * addresses can be obtained with getsockname() and getpeername().
+     */
+    status = setsockopt(op_key_rec->accept.newsock, SOL_SOCKET,
+                        SO_UPDATE_ACCEPT_CONTEXT, 
+                        (char*)&key->hnd, sizeof(SOCKET));
+    /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later.
+     * So ignore the error status.
+     */
+
+    op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT;
+    op_key_rec->accept.addrlen = addrlen;
+    op_key_rec->accept.local = local;
+    op_key_rec->accept.remote = remote;
+    op_key_rec->accept.newsock_ptr = new_sock;
+    pj_memset(&op_key_rec->accept.overlapped, 0, 
+	      sizeof(op_key_rec->accept.overlapped));
 
-    if (key->accept_overlapped.operation != PJ_IOQUEUE_OP_NONE) {
-        pj_assert(!"Operation already pending for this socket");
-        return PJ_EBUSY;
-    }
-
-    if (key->accept_overlapped.newsock == PJ_INVALID_SOCKET) {
-	pj_sock_t sock;
-	status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, &sock);
-	if (status != PJ_SUCCESS)
-	    return status;
-
-	key->accept_overlapped.newsock = sock;
-    }
-    key->accept_overlapped.operation = PJ_IOQUEUE_OP_ACCEPT;
-    key->accept_overlapped.addrlen = addrlen;
-    key->accept_overlapped.local = local;
-    key->accept_overlapped.remote = remote;
-    key->accept_overlapped.newsock_ptr = new_sock;
-    pj_memset(&key->accept_overlapped.overlapped, 0, 
-	      sizeof(key->accept_overlapped.overlapped));
-
-    rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)key->accept_overlapped.newsock,
-		   key->accept_overlapped.accept_buf,
+    rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)op_key_rec->accept.newsock,
+		   op_key_rec->accept.accept_buf,
 		   0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN,
 		   &bytesReceived,
-		   &key->accept_overlapped.overlapped);
+		   &op_key_rec->accept.overlapped );
 
     if (rc == TRUE) {
-	ioqueue_on_accept_complete(&key->accept_overlapped);
-        if (key->cb.on_accept_complete)
-	    key->cb.on_accept_complete(key, key->accept_overlapped.newsock, 0);
+	ioqueue_on_accept_complete(&op_key_rec->accept);
 	return PJ_SUCCESS;
     } else {
 	DWORD dwStatus = WSAGetLastError();
-	if (dwStatus==WSA_IO_PENDING)
-            return PJ_EPENDING;
-        else
+	if (dwStatus!=WSA_IO_PENDING)
             return PJ_STATUS_FROM_OS(dwStatus);
-    }
+    }
+
+    /* Asynchronous Accept() has been submitted. */
+    return PJ_EPENDING;
 }
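Sketch of the new accept calling convention (illustrative, not part of the change; it mirrors the TCP test below, and listen_key plus the address variables are assumed):

static pj_ioqueue_op_key_t accept_op;

static pj_status_t start_accept(pj_ioqueue_key_t *listen_key, pj_sock_t *newsock,
                                pj_sockaddr_in *local, pj_sockaddr_in *remote)
{
    int addrlen = sizeof(pj_sockaddr_in);
    pj_status_t status;

    status = pj_ioqueue_accept(listen_key, &accept_op, newsock,
                               local, remote, &addrlen);
    if (status == PJ_SUCCESS) {
        /* A connection was already queued; *newsock is ready to use. */
    } else if (status == PJ_EPENDING) {
        /* on_accept_complete() will deliver the new socket later. */
    }
    return status;
}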
 
 
@@ -757,42 +802,29 @@
  * Initiate overlapped connect() operation (well, it's non-blocking actually,
  * since there's no overlapped version of connect()).
  */
-PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue,
-					pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
 					const pj_sockaddr_t *addr,
 					int addrlen )
 {
-    unsigned long optval = 1;
-    HANDLE hEvent;
+    HANDLE hEvent;
+    pj_ioqueue_t *ioqueue;
 
     PJ_CHECK_STACK();
-
-    /* Set socket to non-blocking. */
-    if (ioctlsocket((pj_sock_t)key->hnd, FIONBIO, &optval) != 0) {
-	return PJ_RETURN_OS_ERROR(WSAGetLastError());
-    }
+    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
 
     /* Initiate connect() */
     if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) {
 	DWORD dwStatus;
 	dwStatus = WSAGetLastError();
-	if (dwStatus != WSAEWOULDBLOCK) {
-	    /* Permanent error */
+        if (dwStatus != WSAEWOULDBLOCK) {
 	    return PJ_RETURN_OS_ERROR(dwStatus);
-	} else {
-	    /* Pending operation. This is what we're looking for. */
 	}
     } else {
 	/* Connect has completed immediately! */
-	/* Restore to blocking mode. */
-	optval = 0;
-	if (ioctlsocket((pj_sock_t)key->hnd, FIONBIO, &optval) != 0) {
-	    return PJ_RETURN_OS_ERROR(WSAGetLastError());
-	}
-
-	key->cb.on_connect_complete(key, 0);
 	return PJ_SUCCESS;
     }
+
+    ioqueue = key->ioqueue;
 
     /* Add to the array of connecting socket to be polled */
     pj_lock_acquire(ioqueue->lock);
diff --git a/pjlib/src/pjlib-test/atomic.c b/pjlib/src/pjlib-test/atomic.c
index 429085e..09bdfdb 100644
--- a/pjlib/src/pjlib-test/atomic.c
+++ b/pjlib/src/pjlib-test/atomic.c
@@ -47,21 +47,29 @@
     /* get: check the value. */
     if (pj_atomic_get(atomic_var) != 111)
         return -30;
-
-    /* increment. */
-    if (pj_atomic_inc(atomic_var) != 112)
+
+    /* increment. */
+    pj_atomic_inc(atomic_var);
+    if (pj_atomic_get(atomic_var) != 112)
         return -40;
 
-    /* decrement. */
-    if (pj_atomic_dec(atomic_var) != 111)
+    /* decrement. */
+    pj_atomic_dec(atomic_var);
+    if (pj_atomic_get(atomic_var) != 111)
         return -50;
 
-    /* set */
-    if (pj_atomic_set(atomic_var, 211) != 111)
+    /* set */
+    pj_atomic_set(atomic_var, 211);
+    if (pj_atomic_get(atomic_var) != 211)
         return -60;
+
+    /* add */
+    pj_atomic_add(atomic_var, 10);
+    if (pj_atomic_get(atomic_var) != 221)
+        return -60;
 
     /* check the value again. */
-    if (pj_atomic_get(atomic_var) != 211)
+    if (pj_atomic_get(atomic_var) != 221)
         return -70;
 
     /* destroy */
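The test now treats pj_atomic_inc()/dec()/set()/add() as operations whose effect is verified separately with pj_atomic_get(). A condensed sketch of that pattern (illustrative values and error code, not part of the change):

static int check_atomic(pj_atomic_t *atomic_var)
{
    pj_atomic_set(atomic_var, 111);
    pj_atomic_inc(atomic_var);          /* value becomes 112 */
    pj_atomic_add(atomic_var, 10);      /* value becomes 122 */
    if (pj_atomic_get(atomic_var) != 122)
        return -1;                      /* error code is illustrative */
    return 0;
}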
diff --git a/pjlib/src/pjlib-test/ioq_perf.c b/pjlib/src/pjlib-test/ioq_perf.c
index cb93c4c..4cd1106 100644
--- a/pjlib/src/pjlib-test/ioq_perf.c
+++ b/pjlib/src/pjlib-test/ioq_perf.c
@@ -34,77 +34,105 @@
 /* Descriptor for each producer/consumer pair. */
 typedef struct test_item
 {
-    pj_sock_t       server_fd, 
-                    client_fd;
-    pj_ioqueue_t    *ioqueue;
-    pj_ioqueue_key_t *server_key,
-                   *client_key;
-    pj_size_t       buffer_size;
-    char           *outgoing_buffer;
-    char           *incoming_buffer;
-    pj_size_t       bytes_sent, 
-                    bytes_recv;
+    pj_sock_t            server_fd, 
+                         client_fd;
+    pj_ioqueue_t        *ioqueue;
+    pj_ioqueue_key_t    *server_key,
+                        *client_key;

+    pj_ioqueue_op_key_t  recv_op,

+                         send_op;

+    int                  has_pending_send;
+    pj_size_t            buffer_size;
+    char                *outgoing_buffer;
+    char                *incoming_buffer;
+    pj_size_t            bytes_sent, 
+                         bytes_recv;
 } test_item;
 
 /* Callback when data has been read.
  * Increment item->bytes_recv and ready to read the next data.
  */
-static void on_read_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_read)
+static void on_read_complete(pj_ioqueue_key_t *key, 
+                             pj_ioqueue_op_key_t *op_key,
+                             pj_ssize_t bytes_read)
 {
     test_item *item = pj_ioqueue_get_user_data(key);
-    pj_status_t rc;
+    pj_status_t rc;
+    int data_is_available = 1;
 
     //TRACE_((THIS_FILE, "     read complete, bytes_read=%d", bytes_read));
+
+    do {
+        if (thread_quit_flag)
+            return;
 
-    if (thread_quit_flag)
-        return;
+        if (bytes_read < 0) {
+            pj_status_t rc = -bytes_read;
+            char errmsg[128];
 
-    if (bytes_read < 0) {
-        pj_status_t rc = -bytes_read;
-        char errmsg[128];
+	    if (rc != last_error) {
+	        last_error = rc;
+	        pj_strerror(rc, errmsg, sizeof(errmsg));
+	        PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)", 
+		          bytes_read, errmsg));
+	        PJ_LOG(3,(THIS_FILE, 
+		          ".....additional info: total read=%u, total written=%u",
+		          item->bytes_recv, item->bytes_sent));
+	    } else {
+	        last_error_counter++;
+	    }
+            bytes_read = 0;
 
-	if (rc != last_error) {
-	    last_error = rc;
-	    pj_strerror(rc, errmsg, sizeof(errmsg));
-	    PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)", 
-		      bytes_read, errmsg));
-	    PJ_LOG(3,(THIS_FILE, 
-		      ".....additional info: total read=%u, total written=%u",
-		      item->bytes_recv, item->bytes_sent));
-	} else {
-	    last_error_counter++;
-	}
-        bytes_read = 0;
+        } else if (bytes_read == 0) {
+            PJ_LOG(3,(THIS_FILE, "...socket has closed!"));
+        }
 
-    } else if (bytes_read == 0) {
-        PJ_LOG(3,(THIS_FILE, "...socket has closed!"));
-    }
-
-    item->bytes_recv += bytes_read;
+        item->bytes_recv += bytes_read;
     
-    /* To assure that the test quits, even if main thread
-     * doesn't have time to run.
-     */
-    if (item->bytes_recv > item->buffer_size * 10000) 
-	thread_quit_flag = 1;
+        /* To assure that the test quits, even if main thread
+         * doesn't have time to run.
+         */
+        if (item->bytes_recv > item->buffer_size * 10000) 
+	    thread_quit_flag = 1;
+
+        bytes_read = item->buffer_size;
+        rc = pj_ioqueue_recv( key, op_key,
+                              item->incoming_buffer, &bytes_read, 0 );
 
-    rc = pj_ioqueue_recv( item->ioqueue, item->server_key,
-                          item->incoming_buffer, item->buffer_size, 0 );
-
-    if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
-	if (rc != last_error) {
-	    last_error = rc;
-	    app_perror("...error: read error", rc);
-	} else {
-	    last_error_counter++;
-	}
-    }
+        if (rc == PJ_SUCCESS) {
+            data_is_available = 1;
+        } else if (rc == PJ_EPENDING) {
+            data_is_available = 0;
+        } else {
+            data_is_available = 0;
+	    if (rc != last_error) {
+	        last_error = rc;
+	        app_perror("...error: read error", rc);
+	    } else {
+	        last_error_counter++;
+	    }
+        }
+
+        if (!item->has_pending_send) {
+            pj_ssize_t sent = item->buffer_size;
+            rc = pj_ioqueue_send(item->client_key, &item->send_op,
+                                 item->outgoing_buffer, &sent, 0);
+            if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
+                app_perror("...error: write error", rc);
+            }
+
+            item->has_pending_send = (rc==PJ_EPENDING);
+        }
+
+    } while (data_is_available);
 }
 
 /* Callback when data has been written.
  * Increment item->bytes_sent and write the next data.
  */
-static void on_write_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent)
+static void on_write_complete(pj_ioqueue_key_t *key, 
+                              pj_ioqueue_op_key_t *op_key,
+                              pj_ssize_t bytes_sent)
 {
     test_item *item = pj_ioqueue_get_user_data(key);
     
@@ -112,7 +140,8 @@
 
     if (thread_quit_flag)
         return;
-
+
+    item->has_pending_send = 0;
     item->bytes_sent += bytes_sent;
 
     if (bytes_sent <= 0) {
@@ -121,12 +150,15 @@
     } 
     else {
         pj_status_t rc;
-
-        rc = pj_ioqueue_write(item->ioqueue, item->client_key, 
-                              item->outgoing_buffer, item->buffer_size);
+
+        bytes_sent = item->buffer_size;
+        rc = pj_ioqueue_send( item->client_key, op_key,
+                              item->outgoing_buffer, &bytes_sent, 0);
         if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
             app_perror("...error: write error", rc);
-        }
+        }
+
+        item->has_pending_send = (rc==PJ_EPENDING);
     }
 }
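The benchmark's flow control reduces to the pattern below (a sketch consolidating the callback code above, not a new API): only one send per socket pair is ever outstanding, and the flag is cleared again when on_write_complete() fires.

/* Inside the read callback, after received data has been counted: */
if (!item->has_pending_send) {
    pj_ssize_t sent = item->buffer_size;
    pj_status_t rc = pj_ioqueue_send(item->client_key, &item->send_op,
                                     item->outgoing_buffer, &sent, 0);
    /* Remember whether a send is now pending on this pair. */
    item->has_pending_send = (rc == PJ_EPENDING);
}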
 
@@ -191,7 +223,7 @@
     thread = pj_pool_alloc(pool, thread_cnt*sizeof(pj_thread_t*));
 
     TRACE_((THIS_FILE, "     creating ioqueue.."));
-    rc = pj_ioqueue_create(pool, sockpair_cnt*2, thread_cnt, &ioqueue);
+    rc = pj_ioqueue_create(pool, sockpair_cnt*2, &ioqueue);
     if (rc != PJ_SUCCESS) {
         app_perror("...error: unable to create ioqueue", rc);
         return -15;
@@ -199,6 +231,7 @@
 
     /* Initialize each producer-consumer pair. */
     for (i=0; i<sockpair_cnt; ++i) {
+        pj_ssize_t bytes;
 
         items[i].ioqueue = ioqueue;
         items[i].buffer_size = buffer_size;
@@ -241,24 +274,27 @@
         }
 
         /* Start reading. */
-	TRACE_((THIS_FILE, "      pj_ioqueue_recv.."));
-        rc = pj_ioqueue_recv(ioqueue, items[i].server_key,
-                             items[i].incoming_buffer, items[i].buffer_size,
+	TRACE_((THIS_FILE, "      pj_ioqueue_recv.."));
+        bytes = items[i].buffer_size;
+        rc = pj_ioqueue_recv(items[i].server_key, &items[i].recv_op,
+                             items[i].incoming_buffer, &bytes,
 			     0);
-        if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
+        if (rc != PJ_EPENDING) {
             app_perror("...error: pj_ioqueue_recv", rc);
             return -73;
         }
 
         /* Start writing. */
-	TRACE_((THIS_FILE, "      pj_ioqueue_write.."));
-        rc = pj_ioqueue_write(ioqueue, items[i].client_key,
-                              items[i].outgoing_buffer, items[i].buffer_size);
+	TRACE_((THIS_FILE, "      pj_ioqueue_write.."));
+        bytes = items[i].buffer_size;
+        rc = pj_ioqueue_send(items[i].client_key, &items[i].send_op,
+                             items[i].outgoing_buffer, &bytes, 0);
         if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
             app_perror("...error: pj_ioqueue_write", rc);
             return -76;
         }
-
+
+        items[i].has_pending_send = (rc==PJ_EPENDING);
     }
 
     /* Create the threads. */
@@ -324,8 +360,8 @@
     /* Close all sockets. */
     TRACE_((THIS_FILE, "     closing all sockets.."));
     for (i=0; i<sockpair_cnt; ++i) {
-        pj_ioqueue_unregister(ioqueue, items[i].server_key);
-        pj_ioqueue_unregister(ioqueue, items[i].client_key);
+        pj_ioqueue_unregister(items[i].server_key);
+        pj_ioqueue_unregister(items[i].client_key);
         pj_sock_close(items[i].server_fd);
         pj_sock_close(items[i].client_fd);
     }
diff --git a/pjlib/src/pjlib-test/ioq_tcp.c b/pjlib/src/pjlib-test/ioq_tcp.c
index ebce633..fd5329e 100644
--- a/pjlib/src/pjlib-test/ioq_tcp.c
+++ b/pjlib/src/pjlib-test/ioq_tcp.c
@@ -31,33 +31,45 @@
 #define SOCK_INACTIVE_MAX   (PJ_IOQUEUE_MAX_HANDLES - 2)
 #define POOL_SIZE	    (2*BUF_MAX_SIZE + SOCK_INACTIVE_MAX*128 + 2048)
 
-static pj_ssize_t	callback_read_size,
-                        callback_write_size,
-                        callback_accept_status,
-                        callback_connect_status;
-static pj_ioqueue_key_t*callback_read_key,
-                       *callback_write_key,
-                       *callback_accept_key,
-                       *callback_connect_key;
+static pj_ssize_t	     callback_read_size,
+                             callback_write_size,
+                             callback_accept_status,
+                             callback_connect_status;
+static pj_ioqueue_key_t     *callback_read_key,
+                            *callback_write_key,
+                            *callback_accept_key,
+                            *callback_connect_key;
+static pj_ioqueue_op_key_t  *callback_read_op,
+                            *callback_write_op,
+                            *callback_accept_op;
 
-static void on_ioqueue_read(pj_ioqueue_key_t *key, pj_ssize_t bytes_read)
+static void on_ioqueue_read(pj_ioqueue_key_t *key, 
+                            pj_ioqueue_op_key_t *op_key,
+                            pj_ssize_t bytes_read)
 {
-    callback_read_key = key;
+    callback_read_key = key;
+    callback_read_op = op_key;
     callback_read_size = bytes_read;
 }
 
-static void on_ioqueue_write(pj_ioqueue_key_t *key, pj_ssize_t bytes_written)
+static void on_ioqueue_write(pj_ioqueue_key_t *key, 
+                             pj_ioqueue_op_key_t *op_key,
+                             pj_ssize_t bytes_written)
 {
-    callback_write_key = key;
+    callback_write_key = key;
+    callback_write_op = op_key;
     callback_write_size = bytes_written;
 }
 
-static void on_ioqueue_accept(pj_ioqueue_key_t *key, pj_sock_t sock, 
+static void on_ioqueue_accept(pj_ioqueue_key_t *key, 
+                              pj_ioqueue_op_key_t *op_key,
+                              pj_sock_t sock, 
                               int status)
 {
     PJ_UNUSED_ARG(sock);
 
-    callback_accept_key = key;
+    callback_accept_key = key;
+    callback_accept_op = op_key;
     callback_accept_status = status;
 }
 
@@ -83,28 +95,38 @@
 			  pj_ssize_t bufsize,
 			  pj_timestamp *t_elapsed)
 {
-    int rc;
-    pj_ssize_t bytes;
+    pj_status_t status;
+    pj_ssize_t bytes;
+    pj_time_val timeout;
     pj_timestamp t1, t2;
-    int pending_op = 0;
+    int pending_op = 0;
+    pj_ioqueue_op_key_t read_op, write_op;
 
-    // Start reading on the server side.
-    rc = pj_ioqueue_read(ioque, skey, recv_buf, bufsize);
-    if (rc != 0 && rc != PJ_EPENDING) {
+    // Start reading on the server side.
+    bytes = bufsize;
+    status = pj_ioqueue_recv(skey, &read_op, recv_buf, &bytes, 0);
+    if (status != PJ_SUCCESS && status != PJ_EPENDING) {
+        app_perror("...pj_ioqueue_recv error", status);
 	return -100;
     }
-    
-    ++pending_op;
+
+    if (status == PJ_EPENDING)
+        ++pending_op;
+    else {
+        /* We do not expect an error or immediately available data here. */
+        return -115;
+    }
 
     // Randomize send buffer.
     pj_create_random_string((char*)send_buf, bufsize);
 
-    // Starts send on the client side.
-    bytes = pj_ioqueue_write(ioque, ckey, send_buf, bufsize);
-    if (bytes != bufsize && bytes != PJ_EPENDING) {
+    // Start sending on the client side.
+    bytes = bufsize;
+    status = pj_ioqueue_send(ckey, &write_op, send_buf, &bytes, 0);
+    if (status != PJ_SUCCESS && status != PJ_EPENDING) {
 	return -120;
     }
-    if (bytes == PJ_EPENDING) {
+    if (status == PJ_EPENDING) {
 	++pending_op;
     }
 
@@ -113,37 +135,52 @@
 
     // Reset indicators
     callback_read_size = callback_write_size = 0;
-    callback_read_key = callback_write_key = NULL;
+    callback_read_key = callback_write_key = NULL;
+    callback_read_op = callback_write_op = NULL;
 
     // Poll the queue until we've got completion event in the server side.
-    rc = 0;
-    while (pending_op > 0) {
-	rc = pj_ioqueue_poll(ioque, NULL);
-	if (rc > 0) {
+    status = 0;
+    while (pending_op > 0) {
+        timeout.sec = 1; timeout.msec = 0;
+	status = pj_ioqueue_poll(ioque, &timeout);
+	if (status > 0) {
             if (callback_read_size) {
-                if (callback_read_size != bufsize) {
+                if (callback_read_size != bufsize)
                     return -160;
-                }
                 if (callback_read_key != skey)
-                    return -161;
+                    return -161;
+                if (callback_read_op != &read_op)
+                    return -162;
             }
             if (callback_write_size) {
                 if (callback_write_key != ckey)
-                    return -162;
+                    return -163;
+                if (callback_write_op != &write_op)
+                    return -164;
             }
-	    pending_op -= rc;
-	}
-	if (rc < 0) {
+	    pending_op -= status;
+	}
+        if (status == 0) {
+            PJ_LOG(3,("", "...error: timed out"));
+        }
+	if (status < 0) {
 	    return -170;
 	}
     }
+
+    // Pending op is zero.
+    // A subsequent poll should yield zero too.
+    timeout.sec = timeout.msec = 0;
+    status = pj_ioqueue_poll(ioque, &timeout);
+    if (status != 0)
+        return -173;
 
     // End time.
     pj_get_timestamp(&t2);
     t_elapsed->u32.lo += (t2.u32.lo - t1.u32.lo);
 
-    if (rc < 0) {
-	return -150;
+    if (status < 0) {
+	return -176;
     }
 
     // Compare recv buffer with send buffer.
@@ -167,7 +204,8 @@
     pj_pool_t *pool = NULL;
     char *send_buf, *recv_buf;
     pj_ioqueue_t *ioque = NULL;
-    pj_ioqueue_key_t *skey, *ckey0, *ckey1;
+    pj_ioqueue_key_t *skey, *ckey0, *ckey1;
+    pj_ioqueue_op_key_t accept_op;
     int bufsize = BUF_MIN_SIZE;
     pj_ssize_t status = -1;
     int pending_op = 0;
@@ -205,7 +243,7 @@
     }
 
     // Create I/O Queue.
-    rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, 0, &ioque);
+    rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque);
     if (rc != PJ_SUCCESS) {
         app_perror("...ERROR in pj_ioqueue_create()", rc);
 	status=-20; goto on_error;
@@ -231,7 +269,8 @@
 
     // Server socket accept()
     client_addr_len = sizeof(pj_sockaddr_in);
-    status = pj_ioqueue_accept(ioque, skey, &csock0, &client_addr, &rmt_addr, &client_addr_len);
+    status = pj_ioqueue_accept(skey, &accept_op, &csock0, 
+                               &client_addr, &rmt_addr, &client_addr_len);
     if (status != PJ_EPENDING) {
         app_perror("...ERROR in pj_ioqueue_accept()", rc);
 	status=-30; goto on_error;
@@ -247,7 +286,7 @@
     addr.sin_addr = pj_inet_addr(pj_cstr(&s, "127.0.0.1"));
 
     // Client socket connect()
-    status = pj_ioqueue_connect(ioque, ckey1, &addr, sizeof(addr));
+    status = pj_ioqueue_connect(ckey1, &addr, sizeof(addr));
     if (status!=PJ_SUCCESS && status != PJ_EPENDING) {
         app_perror("...ERROR in pj_ioqueue_connect()", rc);
 	status=-40; goto on_error;
@@ -262,6 +301,7 @@
 
     callback_read_key = callback_write_key = 
         callback_accept_key = callback_connect_key = NULL;
+    callback_accept_op = callback_read_op = callback_write_op = NULL;

 
     while (pending_op) {
 	pj_time_val timeout = {1, 0};
@@ -273,8 +313,12 @@
                     status=-41; goto on_error;
                 }
                 if (callback_accept_key != skey) {
-                    status=-41; goto on_error;
-                }
+                    status=-42; goto on_error;
+                }
+                if (callback_accept_op != &accept_op) {
+                    status=-43; goto on_error;
+                }
+                callback_accept_status = -2;
             }
 
             if (callback_connect_status != -2) {
@@ -283,7 +327,8 @@
                 }
                 if (callback_connect_key != ckey1) {
                     status=-51; goto on_error;
-                }
+                }
+                callback_connect_status = -2;
             }
 
 	    pending_op -= status;
@@ -293,6 +338,16 @@
 	    }
 	}
     }
+
+    // There's no pending operation.
+    // When we poll the ioqueue, there must be no events.
+    if (pending_op == 0) {
+        pj_time_val timeout = {1, 0};
+        status = pj_ioqueue_poll(ioque, &timeout);
+        if (status != 0) {
+            status=-60; goto on_error;
+        }
+    }
 
     // Check accepted socket.
     if (csock0 == PJ_INVALID_SOCKET) {
@@ -312,7 +367,8 @@
 
     // Test send and receive.
     t_elapsed.u32.lo = 0;
-    status = send_recv_test(ioque, ckey0, ckey1, send_buf, recv_buf, bufsize, &t_elapsed);
+    status = send_recv_test(ioque, ckey0, ckey1, send_buf, 
+                            recv_buf, bufsize, &t_elapsed);
     if (status != 0) {
 	goto on_error;
     }
@@ -354,7 +410,7 @@
     pool = pj_pool_create(mem, NULL, POOL_SIZE, 4000, NULL);
 
     // Create I/O Queue.
-    rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, 0, &ioque);
+    rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque);
     if (!ioque) {
 	status=-20; goto on_error;
     }
@@ -381,7 +437,7 @@
     addr.sin_addr = pj_inet_addr(pj_cstr(&s, "127.0.0.1"));
 
     // Client socket connect()
-    status = pj_ioqueue_connect(ioque, ckey1, &addr, sizeof(addr));
+    status = pj_ioqueue_connect(ckey1, &addr, sizeof(addr));
     if (status==PJ_SUCCESS) {
 	// unexpectedly success!
 	status = -30;
@@ -416,7 +472,17 @@
 	    }
 	}
     }
-
+
+    // There's no pending operation.
+    // When we poll the ioqueue, there must be no events.
+    if (pending_op == 0) {
+        pj_time_val timeout = {1, 0};
+        status = pj_ioqueue_poll(ioque, &timeout);
+        if (status != 0) {
+            status=-60; goto on_error;
+        }
+    }
+
     // Success
     status = 0;
 
diff --git a/pjlib/src/pjlib-test/ioq_udp.c b/pjlib/src/pjlib-test/ioq_udp.c
index a59dac8..6ee90e4 100644
--- a/pjlib/src/pjlib-test/ioq_udp.c
+++ b/pjlib/src/pjlib-test/ioq_udp.c
@@ -34,31 +34,43 @@
 #undef TRACE_
 #define TRACE_(msg)	    PJ_LOG(3,(THIS_FILE,"....." msg))
 
-static pj_ssize_t callback_read_size,
-                  callback_write_size,
-                  callback_accept_status,
-                  callback_connect_status;
-static pj_ioqueue_key_t *callback_read_key,
-                        *callback_write_key,
-                        *callback_accept_key,
-                        *callback_connect_key;
+static pj_ssize_t            callback_read_size,
+                             callback_write_size,
+                             callback_accept_status,
+                             callback_connect_status;
+static pj_ioqueue_key_t     *callback_read_key,
+                            *callback_write_key,
+                            *callback_accept_key,
+                            *callback_connect_key;
+static pj_ioqueue_op_key_t  *callback_read_op,
+                            *callback_write_op,
+                            *callback_accept_op;
 
-static void on_ioqueue_read(pj_ioqueue_key_t *key, pj_ssize_t bytes_read)
+static void on_ioqueue_read(pj_ioqueue_key_t *key, 
+                            pj_ioqueue_op_key_t *op_key,
+                            pj_ssize_t bytes_read)
 {
-    callback_read_key = key;
+    callback_read_key = key;
+    callback_read_op = op_key;
     callback_read_size = bytes_read;
 }
 
-static void on_ioqueue_write(pj_ioqueue_key_t *key, pj_ssize_t bytes_written)
+static void on_ioqueue_write(pj_ioqueue_key_t *key, 
+                             pj_ioqueue_op_key_t *op_key,
+                             pj_ssize_t bytes_written)
 {
-    callback_write_key = key;
+    callback_write_key = key;
+    callback_write_op = op_key;
     callback_write_size = bytes_written;
 }
 
-static void on_ioqueue_accept(pj_ioqueue_key_t *key, pj_sock_t sock, int status)
+static void on_ioqueue_accept(pj_ioqueue_key_t *key, 
+                              pj_ioqueue_op_key_t *op_key,
+                              pj_sock_t sock, int status)
 {
     PJ_UNUSED_ARG(sock);
-    callback_accept_key = key;
+    callback_accept_key = key;
+    callback_accept_op = op_key;
     callback_accept_status = status;
 }
 
@@ -83,29 +95,6 @@
 #endif
 
 /*
- * native_format_test()
- * This is just a simple test to verify that various structures in sock.h
- * are really compatible with operating system's definitions.
- */
-static int native_format_test(void)
-{
-    pj_status_t rc;
-
-    // Test that PJ_INVALID_SOCKET is working.
-    {
-	pj_sock_t sock;
-	rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, -1, &sock);
-	if (rc == PJ_SUCCESS)
-	    return -1020;
-    }
-
-    // Previous func will set errno var.
-    pj_set_os_error(PJ_SUCCESS);
-
-    return 0;
-}
-
-/*
  * compliance_test()
  * To test that the basic IOQueue functionality works. It will just exchange
  * data between two sockets.
@@ -118,7 +107,8 @@
     pj_pool_t *pool = NULL;
     char *send_buf, *recv_buf;
     pj_ioqueue_t *ioque = NULL;
-    pj_ioqueue_key_t *skey, *ckey;
+    pj_ioqueue_key_t *skey, *ckey;
+    pj_ioqueue_op_key_t read_op, write_op;
     int bufsize = BUF_MIN_SIZE;
     pj_ssize_t bytes, status = -1;
     pj_str_t temp;
@@ -157,8 +147,7 @@
 
     // Create I/O Queue.
     TRACE_("create ioqueue...");
-    rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, 
-			   PJ_IOQUEUE_DEFAULT_THREADS, &ioque);
+    rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque);
     if (rc != PJ_SUCCESS) {
 	status=-20; goto on_error;
     }
@@ -194,12 +183,14 @@
 
     // Register reading from ioqueue.
     TRACE_("start recvfrom...");
-    addrlen = sizeof(addr);
-    bytes = pj_ioqueue_recvfrom(ioque, skey, recv_buf, bufsize, 0,
-			        &addr, &addrlen);
-    if (bytes < 0 && bytes != PJ_EPENDING) {
+    addrlen = sizeof(addr);
+    bytes = bufsize;
+    rc = pj_ioqueue_recvfrom(skey, &read_op, recv_buf, &bytes, 0,
+			     &addr, &addrlen);
+    if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
+        app_perror("...error: pj_ioqueue_recvfrom", rc);
 	status=-28; goto on_error;
-    } else if (bytes == PJ_EPENDING) {
+    } else if (rc == PJ_EPENDING) {
 	recv_pending = 1;
 	PJ_LOG(3, (THIS_FILE, 
 		   "......ok: recvfrom returned pending"));
@@ -210,14 +201,14 @@
     }
 
     // Write must return the number of bytes.
-    TRACE_("start sendto...");
-    bytes = pj_ioqueue_sendto(ioque, ckey, send_buf, bufsize, 0, &addr, 
-			      sizeof(addr));
-    if (bytes != bufsize && bytes != PJ_EPENDING) {
-	PJ_LOG(1,(THIS_FILE, 
-		  "......error: sendto returned %d", bytes));
+    TRACE_("start sendto...");
+    bytes = bufsize;
+    rc = pj_ioqueue_sendto(ckey, &write_op, send_buf, &bytes, 0, &addr, 
+			   sizeof(addr));
+    if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
+        app_perror("...error: pj_ioqueue_sendto", rc);
 	status=-30; goto on_error;
-    } else if (bytes == PJ_EPENDING) {
+    } else if (rc == PJ_EPENDING) {
 	send_pending = 1;
 	PJ_LOG(3, (THIS_FILE, 
 		   "......ok: sendto returned pending"));
@@ -232,9 +223,10 @@
     callback_accept_status = callback_connect_status = -2;
     callback_read_key = callback_write_key = 
         callback_accept_key = callback_connect_key = NULL;
+    callback_read_op = callback_write_op = NULL;
 
     // Poll if pending.
-    while (send_pending && recv_pending) {
+    while (send_pending || recv_pending) {
 	int rc;
 	pj_time_val timeout = { 5, 0 };
 
@@ -253,9 +245,11 @@
             if (callback_read_size != bufsize) {
                 status=-61; goto on_error;
             }
-
             if (callback_read_key != skey) {
                 status=-65; goto on_error;
+            }
+            if (callback_read_op != &read_op) {
+                status=-66; goto on_error;
             }
 
 	    if (memcmp(send_buf, recv_buf, bufsize) != 0) {
@@ -270,9 +264,11 @@
             if (callback_write_size != bufsize) {
                 status=-73; goto on_error;
             }
-
             if (callback_write_key != ckey) {
                 status=-75; goto on_error;
+            }
+            if (callback_write_op != &write_op) {
+                status=-76; goto on_error;
             }
 
             send_pending = 0;
@@ -326,9 +322,7 @@
     sock = pj_pool_alloc(pool, MAX*sizeof(pj_sock_t));
     
     /* Create IOQueue */
-    rc = pj_ioqueue_create(pool, MAX,
-			   PJ_IOQUEUE_DEFAULT_THREADS,
-			   &ioqueue);
+    rc = pj_ioqueue_create(pool, MAX, &ioqueue);
     if (rc != PJ_SUCCESS || ioqueue == NULL) {
 	app_perror("...error in pj_ioqueue_create", rc);
 	return -10;
@@ -358,7 +352,7 @@
     /* Now deregister and close all handles. */ 
 
     for (i=0; i<count; ++i) {
-	rc = pj_ioqueue_unregister(ioqueue, key[i]);
+	rc = pj_ioqueue_unregister(key[i]);
 	if (rc != PJ_SUCCESS) {
 	    app_perror("...error in pj_ioqueue_unregister", rc);
 	}
@@ -392,7 +386,8 @@
     pj_sock_t ssock=-1, csock=-1;
     pj_sockaddr_in addr;
     pj_pool_t *pool = NULL;
-    pj_sock_t *inactive_sock=NULL;
+    pj_sock_t *inactive_sock=NULL;
+    pj_ioqueue_op_key_t *inactive_read_op;
     char *send_buf, *recv_buf;
     pj_ioqueue_t *ioque = NULL;
     pj_ioqueue_key_t *skey, *ckey, *key;
@@ -429,8 +424,7 @@
     pj_assert(inactive_sock_count+2 <= PJ_IOQUEUE_MAX_HANDLES);
 
     // Create I/O Queue.
-    rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, 
-			   PJ_IOQUEUE_DEFAULT_THREADS, &ioque);
+    rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque);
     if (rc != PJ_SUCCESS) {
 	app_perror("...error: pj_ioqueue_create()", rc);
 	goto on_error;
@@ -439,10 +433,14 @@
     // Allocate inactive sockets, and bind them to some arbitrary address.
     // Then register them to the I/O queue, and start a read operation.
     inactive_sock = (pj_sock_t*)pj_pool_alloc(pool, 
-				    inactive_sock_count*sizeof(pj_sock_t));
+				    inactive_sock_count*sizeof(pj_sock_t));
+    inactive_read_op = (pj_ioqueue_op_key_t*)pj_pool_alloc(pool,
+                              inactive_sock_count*sizeof(pj_ioqueue_op_key_t));
     memset(&addr, 0, sizeof(addr));
     addr.sin_family = PJ_AF_INET;
-    for (i=0; i<inactive_sock_count; ++i) {
+    for (i=0; i<inactive_sock_count; ++i) {
+        pj_ssize_t bytes;
+
 	rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &inactive_sock[i]);
 	if (rc != PJ_SUCCESS || inactive_sock[i] < 0) {
 	    app_perror("...error: pj_sock_socket()", rc);
@@ -462,8 +460,9 @@
 	    app_perror("...error(1): pj_ioqueue_register_sock()", rc);
 	    PJ_LOG(3,(THIS_FILE, "....i=%d", i));
 	    goto on_error;
-	}
-	rc = pj_ioqueue_read(ioque, key, recv_buf, bufsize);
+	}
+        bytes = bufsize;
+	rc = pj_ioqueue_recv(key, &inactive_read_op[i], recv_buf, &bytes, 0);
 	if ( rc < 0 && rc != PJ_EPENDING) {
 	    pj_sock_close(inactive_sock[i]);
 	    inactive_sock[i] = PJ_INVALID_SOCKET;
@@ -495,22 +494,25 @@
     // Test loop.
     t_elapsed.u64 = 0;
     for (i=0; i<LOOP; ++i) {
-	pj_ssize_t bytes;
+	pj_ssize_t bytes;
+        pj_ioqueue_op_key_t read_op, write_op;
 
 	// Randomize send buffer.
 	pj_create_random_string(send_buf, bufsize);
 
-	// Start reading on the server side.
-	rc = pj_ioqueue_read(ioque, skey, recv_buf, bufsize);
+	// Start reading on the server side.
+        bytes = bufsize;
+	rc = pj_ioqueue_recv(skey, &read_op, recv_buf, &bytes, 0);
 	if (rc < 0 && rc != PJ_EPENDING) {
 	    app_perror("...error: pj_ioqueue_read()", rc);
 	    break;
 	}
 
-	// Starts send on the client side.
-	bytes = pj_ioqueue_sendto(ioque, ckey, send_buf, bufsize, 0,
-					&addr, sizeof(addr));
-	if (bytes != bufsize && bytes != PJ_EPENDING) {
+	// Start sending on the client side.
+        bytes = bufsize;
+	rc = pj_ioqueue_sendto(ckey, &write_op, send_buf, &bytes, 0,
+			       &addr, sizeof(addr));
+	if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
 	    app_perror("...error: pj_ioqueue_write()", bytes);
 	    rc = -1;
 	    break;
@@ -585,7 +587,7 @@
 	pj_sock_close(csock);
     for (i=0; i<inactive_sock_count && inactive_sock && 
 	      inactive_sock[i]!=PJ_INVALID_SOCKET; ++i) 
-    {
+    {
 	pj_sock_close(inactive_sock[i]);
     }
     if (ioque != NULL)
@@ -599,11 +601,6 @@
     int status;
     int bufsize, sock_count;
 
-    PJ_LOG(3, (THIS_FILE, "...format test"));
-    if ((status = native_format_test()) != 0)
-	return status;
-    PJ_LOG(3, (THIS_FILE, "....native format test ok"));
-
     PJ_LOG(3, (THIS_FILE, "...compliance test"));
     if ((status=compliance_test()) != 0) {
 	return status;
diff --git a/pjlib/src/pjlib-test/main.c b/pjlib/src/pjlib-test/main.c
index 96acc92..6a764e6 100644
--- a/pjlib/src/pjlib-test/main.c
+++ b/pjlib/src/pjlib-test/main.c
@@ -11,7 +11,8 @@
 extern int param_echo_port;
 
 
-#if defined(PJ_WIN32) && PJ_WIN32!=0
+//#if defined(PJ_WIN32) && PJ_WIN32!=0
+#if 0
 #include <windows.h>
 static void boost(void)
 {
diff --git a/pjlib/src/pjlib-test/test.c b/pjlib/src/pjlib-test/test.c
index 5e804f6..e9a62e3 100644
--- a/pjlib/src/pjlib-test/test.c
+++ b/pjlib/src/pjlib-test/test.c
@@ -121,10 +121,10 @@
 #if PJ_HAS_TCP && INCLUDE_TCP_IOQUEUE_TEST
     DO_TEST( tcp_ioqueue_test() );
 #endif
-
-#if INCLUDE_IOQUEUE_PERF_TEST
-    DO_TEST( ioqueue_perf_test() );
-#endif
+
+#if INCLUDE_IOQUEUE_PERF_TEST
+    DO_TEST( ioqueue_perf_test() );
+#endif
 
 #if INCLUDE_XML_TEST
     DO_TEST( xml_test() );
diff --git a/pjlib/src/pjlib-test/test.h b/pjlib/src/pjlib-test/test.h
index 8efe20d..f440851 100644
--- a/pjlib/src/pjlib-test/test.h
+++ b/pjlib/src/pjlib-test/test.h
@@ -8,7 +8,7 @@
 #define GROUP_LIBC                  0
 #define GROUP_OS                    0
 #define GROUP_DATA_STRUCTURE        0
-#define GROUP_NETWORK               0
+#define GROUP_NETWORK               1
 #define GROUP_EXTRA                 0
 
 #define INCLUDE_ERRNO_TEST          GROUP_LIBC
@@ -30,13 +30,13 @@
 #define INCLUDE_SOCK_PERF_TEST      GROUP_NETWORK
 #define INCLUDE_SELECT_TEST	    GROUP_NETWORK
 #define INCLUDE_UDP_IOQUEUE_TEST    GROUP_NETWORK
-#define INCLUDE_TCP_IOQUEUE_TEST    GROUP_NETWORK
+#define INCLUDE_TCP_IOQUEUE_TEST    GROUP_NETWORK
 #define INCLUDE_IOQUEUE_PERF_TEST   GROUP_NETWORK
 #define INCLUDE_XML_TEST	    GROUP_EXTRA
 
-
 #define INCLUDE_ECHO_SERVER         0
-#define INCLUDE_ECHO_CLIENT         1
+#define INCLUDE_ECHO_CLIENT         0
+
 
 #define ECHO_SERVER_MAX_THREADS     4
 #define ECHO_SERVER_START_PORT      65000
@@ -66,12 +66,16 @@
 extern int sock_perf_test(void);
 extern int select_test(void);
 extern int udp_ioqueue_test(void);
-extern int tcp_ioqueue_test(void);
+extern int tcp_ioqueue_test(void);
 extern int ioqueue_perf_test(void);
 extern int xml_test(void);
 
 extern int echo_server(void);
 extern int echo_client(int sock_type, const char *server, int port);
+
+extern int echo_srv_sync(void);
+extern int udp_echo_srv_ioqueue(void);
+extern int echo_srv_common_loop(pj_atomic_t *bytes_counter);
 
 extern pj_pool_factory *mem;
 
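
With GROUP_NETWORK switched to 1, every INCLUDE_xxx macro defined as GROUP_NETWORK is compiled
in, including the new ioqueue performance test, while the echo client and server stay disabled.
If only a single network test were wanted, the group could be left at 0 and one flag overridden
after the definitions above; a hypothetical local tweak, not part of this patch:

    /* Hypothetical override in test.h: pull in just the UDP ioqueue test. */
    #undef  INCLUDE_UDP_IOQUEUE_TEST
    #define INCLUDE_UDP_IOQUEUE_TEST    1
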
diff --git a/pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c b/pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c
new file mode 100644
index 0000000..5fe6d6f
--- /dev/null
+++ b/pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c
@@ -0,0 +1,181 @@
+/* $Id$
+ */
+#include <pjlib.h>
+#include "test.h"
+
+static pj_ioqueue_key_t *key;
+static pj_atomic_t *total_bytes;
+
+struct op_key
+{
+    pj_ioqueue_op_key_t  op_key_;
+    struct op_key       *peer;
+    char                *buffer;
+    pj_size_t            size;
+    int                  is_pending;
+    pj_status_t          last_err;
+};
+
+static void on_read_complete(pj_ioqueue_key_t *key,
+                             pj_ioqueue_op_key_t *op_key,
+                             pj_ssize_t bytes_received)
+{
+    pj_status_t rc;
+    struct op_key *recv_rec = (struct op_key *)op_key;
+
+    for (;;) {
+        struct op_key *send_rec = recv_rec->peer;
+        recv_rec->is_pending = 0;
+
+        if (bytes_received < 0) {
+            PJ_LOG(3,("","...error receiving data, received=%d",
+                      bytes_received));
+        } else if (bytes_received == 0) {
+            /* note: previous error, or write callback */
+        } else {
+            pj_atomic_add(total_bytes, bytes_received);
+
+            if (!send_rec->is_pending) {
+                pj_ssize_t sent = bytes_received;
+                pj_memcpy(send_rec->buffer, recv_rec->buffer, bytes_received);
+                rc = pj_ioqueue_send(key, &send_rec->op_key_,
+                                     send_rec->buffer, &sent, 0);
+                send_rec->is_pending = (rc==PJ_EPENDING);
+
+                if (rc!=PJ_SUCCESS && rc!=PJ_EPENDING) {
+                    app_perror("...send error", rc);
+                }
+            }
+        }
+
+        if (!send_rec->is_pending) {
+            bytes_received = recv_rec->size;
+            rc = pj_ioqueue_recv(key, &recv_rec->op_key_,
+                                 recv_rec->buffer, &bytes_received, 0);
+            recv_rec->is_pending = (rc==PJ_EPENDING);
+            if (rc == PJ_SUCCESS) {
+                /* fall through next loop. */
+            } else if (rc == PJ_EPENDING) {
+                /* quit callback. */
+                break;
+            } else {
+                /* error */
+                app_perror("...recv error", rc);
+                recv_rec->last_err = rc;
+
+                bytes_received = 0;
+                /* fall through next loop. */
+            }
+        } else {
+            /* recv will be done when write completion callback is called. */
+            break;
+        }
+    }
+}
+
+static void on_write_complete(pj_ioqueue_key_t *key,
+                              pj_ioqueue_op_key_t *op_key,
+                              pj_ssize_t bytes_sent)
+{
+    struct op_key *send_rec = (struct op_key*)op_key;
+
+    if (bytes_sent <= 0) {
+        pj_status_t rc = pj_get_netos_error();
+        app_perror("...send error", rc);
+    }
+
+    send_rec->is_pending = 0;
+    on_read_complete(key, &send_rec->peer->op_key_, 0);
+}
+
+static int worker_thread(void *arg)
+{
+    pj_ioqueue_t *ioqueue = arg;
+    struct op_key read_op, write_op;
+    char recv_buf[512], send_buf[512];
+    pj_ssize_t length;
+    pj_status_t rc;
+
+    read_op.peer = &write_op;
+    read_op.is_pending = 0;
+    read_op.last_err = 0;
+    read_op.buffer = recv_buf;
+    read_op.size = sizeof(recv_buf);
+    write_op.peer = &read_op;
+    write_op.is_pending = 0;
+    write_op.last_err = 0;
+    write_op.buffer = send_buf;
+    write_op.size = sizeof(send_buf);
+
+    length = sizeof(recv_buf);
+    rc = pj_ioqueue_recv(key, &read_op.op_key_, recv_buf, &length, 0);
+    if (rc == PJ_SUCCESS) {
+        read_op.is_pending = 1;
+        on_read_complete(key, &read_op.op_key_, length);
+    }
+
+    for (;;) {
+        pj_time_val timeout;
+        timeout.sec = 0; timeout.msec = 10;
+        rc = pj_ioqueue_poll(ioqueue, &timeout);
+    }
+}
+
+int udp_echo_srv_ioqueue(void)
+{
+    pj_pool_t *pool;
+    pj_sock_t sock;
+    pj_ioqueue_t *ioqueue;
+    pj_ioqueue_callback callback;
+    int i;
+    pj_thread_t *thread[ECHO_SERVER_MAX_THREADS];
+    pj_status_t rc;
+
+    pj_memset(&callback, 0, sizeof(callback));
+    callback.on_read_complete = &on_read_complete;
+    callback.on_write_complete = &on_write_complete;
+
+    pool = pj_pool_create(mem, NULL, 4000, 4000, NULL);
+    if (!pool)
+        return -10;
+
+    rc = pj_ioqueue_create(pool, 2, &ioqueue);
+    if (rc != PJ_SUCCESS) {
+        app_perror("...pj_ioqueue_create error", rc);
+        return -20;
+    }
+
+    rc = app_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0,
+                    ECHO_SERVER_START_PORT, &sock);
+    if (rc != PJ_SUCCESS) {
+        app_perror("...app_socket error", rc);
+        return -30;
+    }
+
+    rc = pj_ioqueue_register_sock(pool, ioqueue, sock, NULL,
+                                  &callback, &key);
+    if (rc != PJ_SUCCESS) {
+        app_perror("...error registering socket", rc);
+        return -40;
+    }
+
+    rc = pj_atomic_create(pool, 0, &total_bytes);
+    if (rc != PJ_SUCCESS) {
+        app_perror("...error creating atomic variable", rc);
+        return -45;
+    }
+
+    for (i=0; i<ECHO_SERVER_MAX_THREADS; ++i) {
+        rc = pj_thread_create(pool, NULL, &worker_thread, ioqueue,
+                              PJ_THREAD_DEFAULT_STACK_SIZE, 0,
+                              &thread[i]);
+        if (rc != PJ_SUCCESS) {
+            app_perror("...create thread error", rc);
+            return -50;
+        }
+    }
+
+    echo_srv_common_loop(total_bytes);
+
+    return 0;
+}
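
One detail of the new echo server worth noting: per-operation state is recovered in the
callbacks by placing the pj_ioqueue_op_key_t as the first member of struct op_key, so the
op_key pointer handed back by the ioqueue can be cast straight to the containing record.
A condensed sketch of that idiom, reusing the names from the file above:

    /* Sketch only: because op_key_ is the first member, &rec and
     * &rec.op_key_ have the same address, so the cast in the callback
     * is well-defined. */
    static struct op_key rec;
    static char buf[512];
    pj_ssize_t len;

    rec.buffer = buf;
    rec.size = sizeof(buf);
    len = rec.size;
    pj_ioqueue_recv(key, &rec.op_key_, rec.buffer, &len, 0);

    /* ...later, inside on_read_complete(key, op_key, bytes_received): */
    struct op_key *recv_rec = (struct op_key*) op_key;    /* == &rec */
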
diff --git a/pjlib/src/pjlib-test/udp_echo_srv_sync.c b/pjlib/src/pjlib-test/udp_echo_srv_sync.c
index 0e73b13..19ee702 100644
--- a/pjlib/src/pjlib-test/udp_echo_srv_sync.c
+++ b/pjlib/src/pjlib-test/udp_echo_srv_sync.c
@@ -8,7 +8,7 @@
 static int worker_thread(void *arg)
 {
     pj_sock_t    sock = (pj_sock_t)arg;
-    char         buf[1516];
+    char         buf[512];
     pj_status_t  last_recv_err = PJ_SUCCESS, last_write_err = PJ_SUCCESS;
 
     for (;;) {
@@ -48,9 +48,6 @@
     pj_sock_t sock;
     pj_thread_t *thread[ECHO_SERVER_MAX_THREADS];
     pj_status_t rc;
-    pj_highprec_t last_received, avg_bw, highest_bw;
-    pj_time_val last_print;
-    unsigned count;
     int i;
 
     pool = pj_pool_create(mem, NULL, 4000, 4000, NULL);
@@ -83,25 +80,36 @@
                   ECHO_SERVER_MAX_THREADS, ECHO_SERVER_START_PORT));
     PJ_LOG(3,("", "...Press Ctrl-C to abort"));
 
+    echo_srv_common_loop(total_bytes);
+    return 0;
+}
+
+
+int echo_srv_common_loop(pj_atomic_t *bytes_counter)
+{
+    pj_highprec_t last_received, avg_bw, highest_bw;
+    pj_time_val last_print;
+    unsigned count;
+
     last_received = 0;
     pj_gettimeofday(&last_print);
     avg_bw = highest_bw = 0;
-    count = 0;
-
+    count = 0;
+
     for (;;) {
-        pj_highprec_t received, cur_received, bw;
+        pj_highprec_t received, cur_received, bw;
         unsigned msec;
         pj_time_val now, duration;
 
         pj_thread_sleep(1000);
-
-        received = cur_received = pj_atomic_get(total_bytes);
+
+        received = cur_received = pj_atomic_get(bytes_counter);
         cur_received = cur_received - last_received;
-
+
         pj_gettimeofday(&now);
         duration = now;
         PJ_TIME_VAL_SUB(duration, last_print);
-        msec = PJ_TIME_VAL_MSEC(duration);
+        msec = PJ_TIME_VAL_MSEC(duration);
 
         bw = cur_received;
         pj_highprec_mul(bw, 1000);
@@ -113,13 +121,13 @@
         avg_bw = avg_bw + bw;
         count++;
 
-        PJ_LOG(3,("", "Synchronous UDP (%d threads): %u KB/s  (avg=%u KB/s) %s",
-                  ECHO_SERVER_MAX_THREADS,
-                  (unsigned)(bw / 1000),
-                  (unsigned)(avg_bw / count / 1000),
-                  (count==20 ? "<ses avg>" : "")));
-
-        if (count==20) {
+        PJ_LOG(3,("", "Synchronous UDP (%d threads): %u KB/s  (avg=%u KB/s) %s",
+                  ECHO_SERVER_MAX_THREADS,
+                  (unsigned)(bw / 1000),
+                  (unsigned)(avg_bw / count / 1000),
+                  (count==20 ? "<ses avg>" : "")));
+
+        if (count==20) {
             if (avg_bw/count > highest_bw)
                 highest_bw = avg_bw/count;
 
@@ -127,9 +135,9 @@
             avg_bw = 0;
 
             PJ_LOG(3,("", "Highest average bandwidth=%u KB/s",
-                          (unsigned)(highest_bw/1000)));
-        }
-    }
-}
-
-
+                          (unsigned)(highest_bw/1000)));
+        }
+    }
+}
+
+
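
For reference, the figure logged by echo_srv_common_loop() is the bytes received in the last
period, multiplied by 1000 and (in the lines elided between the hunks) divided by the elapsed
milliseconds, giving bytes per second, then divided by 1000 again for the KB/s in the log line;
the pj_highprec_* macros just perform the same multiply and divide in a portable wide type.
A plain restatement of that arithmetic, assuming PJ_HAS_INT64 and a non-zero period, using the
loop's own variables:

    /* Sketch of the KB/s math without the pj_highprec_* wrappers. */
    pj_uint64_t cur  = received - last_received;      /* bytes this period */
    unsigned    msec = PJ_TIME_VAL_MSEC(duration);    /* period length, ms */
    pj_uint64_t bw   = cur * 1000 / msec;             /* bytes per second  */

    PJ_LOG(3,("", "%u KB/s", (unsigned)(bw / 1000)));
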