Tested new ioqueue framework on Linux with select and epoll

git-svn-id: https://svn.pjsip.org/repos/pjproject/main@14 74dad513-b988-da41-8d7b-12977e46ad98
diff --git a/pjlib/src/pj/ioqueue_epoll.c b/pjlib/src/pj/ioqueue_epoll.c
index 24f9bfb..aa01253 100644
--- a/pjlib/src/pj/ioqueue_epoll.c
+++ b/pjlib/src/pj/ioqueue_epoll.c
@@ -1,5 +1,4 @@
 /* $Id$
- *
  */
 /*
  * ioqueue_epoll.c
@@ -30,7 +29,7 @@
 
 #   define epoll_data		data.ptr
 #   define epoll_data_type	void*
-#   define ioctl_val_type	unsigned long*
+#   define ioctl_val_type	unsigned long
 #   define getsockopt_val_ptr	int*
 #   define os_getsockopt	getsockopt
 #   define os_ioctl		ioctl
@@ -126,51 +125,20 @@
 
 #define THIS_FILE   "ioq_epoll"
 
-#define PJ_IOQUEUE_IS_READ_OP(op)   ((op & PJ_IOQUEUE_OP_READ) || \
-                                     (op & PJ_IOQUEUE_OP_RECV) || \
-                                     (op & PJ_IOQUEUE_OP_RECV_FROM))
-#define PJ_IOQUEUE_IS_WRITE_OP(op)  ((op & PJ_IOQUEUE_OP_WRITE) || \
-                                     (op & PJ_IOQUEUE_OP_SEND) || \
-                                     (op & PJ_IOQUEUE_OP_SEND_TO))
-
-
-#if PJ_HAS_TCP
-#  define PJ_IOQUEUE_IS_ACCEPT_OP(op)	(op & PJ_IOQUEUE_OP_ACCEPT)
-#  define PJ_IOQUEUE_IS_CONNECT_OP(op)	(op & PJ_IOQUEUE_OP_CONNECT)
-#else
-#  define PJ_IOQUEUE_IS_ACCEPT_OP(op)	0
-#  define PJ_IOQUEUE_IS_CONNECT_OP(op)	0
-#endif
-
-
 //#define TRACE_(expr) PJ_LOG(3,expr)
 #define TRACE_(expr)
 
+/*
+ * Include common ioqueue abstraction.
+ */
+#include "ioqueue_common_abs.h"
 
 /*
  * This describes each key.
  */
 struct pj_ioqueue_key_t
 {
-    PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t)
-    pj_sock_t		    fd;
-    pj_ioqueue_operation_e  op;
-    void		   *user_data;
-    pj_ioqueue_callback	    cb;
-
-    void		   *rd_buf;
-    unsigned                rd_flags;
-    pj_size_t		    rd_buflen;
-    void		   *wr_buf;
-    pj_size_t		    wr_buflen;
-
-    pj_sockaddr_t	   *rmt_addr;
-    int			   *rmt_addrlen;
-
-    pj_sockaddr_t	   *local_addr;
-    int			   *local_addrlen;
-
-    pj_sock_t		   *accept_fd;
+    DECLARE_COMMON_KEY
 };
 
 /*
@@ -178,13 +146,18 @@
  */
 struct pj_ioqueue_t
 {
-    pj_lock_t          *lock;
-    pj_bool_t           auto_delete_lock;
+    DECLARE_COMMON_IOQUEUE
+
     unsigned		max, count;
     pj_ioqueue_key_t	hlist;
     int			epfd;
 };
 
+/* Include implementation for common abstraction after we declare
+ * pj_ioqueue_key_t and pj_ioqueue_t.
+ */
+#include "ioqueue_common_abs.c"
+
 /*
  * pj_ioqueue_create()
  *
@@ -192,37 +165,45 @@
  */
 PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, 
                                        pj_size_t max_fd,
-                                       int max_threads,
                                        pj_ioqueue_t **p_ioqueue)
 {
-    pj_ioqueue_t *ioque;
+    pj_ioqueue_t *ioqueue;
     pj_status_t rc;
+    pj_lock_t *lock;
 
-    PJ_UNUSED_ARG(max_threads);
+    /* Check that arguments are valid. */
+    PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && 
+                     max_fd > 0, PJ_EINVAL);
 
-    if (max_fd > PJ_IOQUEUE_MAX_HANDLES) {
-        pj_assert(!"max_fd too large");
-	return PJ_EINVAL;
-    }
+    /* Check that size of pj_ioqueue_op_key_t is sufficient */
+    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
+                     sizeof(union operation_key), PJ_EBUG);
 
-    ioque = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
-    ioque->max = max_fd;
-    ioque->count = 0;
-    pj_list_init(&ioque->hlist);
+    ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
 
-    rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioque->lock);
+    ioqueue_init(ioqueue);
+
+    ioqueue->max = max_fd;
+    ioqueue->count = 0;
+    pj_list_init(&ioqueue->hlist);
+
+    rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
     if (rc != PJ_SUCCESS)
 	return rc;
 
-    ioque->auto_delete_lock = PJ_TRUE;
-    ioque->epfd = os_epoll_create(max_fd);
-    if (ioque->epfd < 0) {
+    rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
+    if (rc != PJ_SUCCESS)
+        return rc;
+
+    ioqueue->epfd = os_epoll_create(max_fd);
+    if (ioqueue->epfd < 0) {
+	ioqueue_destroy(ioqueue);
 	return PJ_RETURN_OS_ERROR(pj_get_native_os_error());
     }
     
-    PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioque));
+    PJ_LOG(4, ("pjlib", "epoll I/O Queue created (%p)", ioqueue));
 
-    *p_ioqueue = ioque;
+    *p_ioqueue = ioqueue;
     return PJ_SUCCESS;
 }
 
@@ -231,47 +212,24 @@
  *
  * Destroy ioqueue.
  */
-PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioque)
+PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
 {
-    PJ_ASSERT_RETURN(ioque, PJ_EINVAL);
-    PJ_ASSERT_RETURN(ioque->epfd > 0, PJ_EINVALIDOP);
+    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
+    PJ_ASSERT_RETURN(ioqueue->epfd > 0, PJ_EINVALIDOP);
 
-    pj_lock_acquire(ioque->lock);
-    os_close(ioque->epfd);
-    ioque->epfd = 0;
-    if (ioque->auto_delete_lock)
-        pj_lock_destroy(ioque->lock);
-    
-    return PJ_SUCCESS;
+    pj_lock_acquire(ioqueue->lock);
+    os_close(ioqueue->epfd);
+    ioqueue->epfd = 0;
+    return ioqueue_destroy(ioqueue);
 }
 
 /*
- * pj_ioqueue_set_lock()
- */
-PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque, 
-					 pj_lock_t *lock,
-					 pj_bool_t auto_delete )
-{
-    PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL);
-
-    if (ioque->auto_delete_lock) {
-        pj_lock_destroy(ioque->lock);
-    }
-
-    ioque->lock = lock;
-    ioque->auto_delete_lock = auto_delete;
-
-    return PJ_SUCCESS;
-}
-
-
-/*
  * pj_ioqueue_register_sock()
  *
  * Register a socket to ioqueue.
  */
 PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
-					      pj_ioqueue_t *ioque,
+					      pj_ioqueue_t *ioqueue,
 					      pj_sock_t sock,
 					      void *user_data,
 					      const pj_ioqueue_callback *cb,
@@ -283,12 +241,12 @@
     int status;
     pj_status_t rc = PJ_SUCCESS;
     
-    PJ_ASSERT_RETURN(pool && ioque && sock != PJ_INVALID_SOCKET &&
+    PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
                      cb && p_key, PJ_EINVAL);
 
-    pj_lock_acquire(ioque->lock);
+    pj_lock_acquire(ioqueue->lock);
 
-    if (ioque->count >= ioque->max) {
+    if (ioqueue->count >= ioqueue->max) {
         rc = PJ_ETOOMANY;
 	TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: too many files"));
 	goto on_return;
@@ -305,16 +263,19 @@
 
     /* Create key. */
     key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
-    key->fd = sock;
-    key->user_data = user_data;
-    pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
+    rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
+    if (rc != PJ_SUCCESS) {
+	key = NULL;
+	goto on_return;
+    }
 
     /* os_epoll_ctl. */
     ev.events = EPOLLIN | EPOLLOUT | EPOLLERR;
     ev.epoll_data = (epoll_data_type)key;
-    status = os_epoll_ctl(ioque->epfd, EPOLL_CTL_ADD, sock, &ev);
+    status = os_epoll_ctl(ioqueue->epfd, EPOLL_CTL_ADD, sock, &ev);
     if (status < 0) {
 	rc = pj_get_os_error();
+	key = NULL;
 	TRACE_((THIS_FILE, 
                 "pj_ioqueue_register_sock error: os_epoll_ctl rc=%d", 
                 status));
@@ -322,12 +283,12 @@
     }
     
     /* Register */
-    pj_list_insert_before(&ioque->hlist, key);
-    ++ioque->count;
+    pj_list_insert_before(&ioqueue->hlist, key);
+    ++ioqueue->count;
 
 on_return:
     *p_key = key;
-    pj_lock_release(ioque->lock);
+    pj_lock_release(ioqueue->lock);
     
     return rc;
 }
@@ -337,179 +298,116 @@
  *
  * Unregister handle from ioqueue.
  */
-PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque,
-					   pj_ioqueue_key_t *key)
+PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
 {
+    pj_ioqueue_t *ioqueue;
     struct epoll_event ev;
     int status;
     
-    PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL);
+    PJ_ASSERT_RETURN(key != NULL, PJ_EINVAL);
 
-    pj_lock_acquire(ioque->lock);
+    ioqueue = key->ioqueue;
+    pj_lock_acquire(ioqueue->lock);
 
-    pj_assert(ioque->count > 0);
-    --ioque->count;
+    pj_assert(ioqueue->count > 0);
+    --ioqueue->count;
     pj_list_erase(key);
 
     ev.events = 0;
     ev.epoll_data = (epoll_data_type)key;
-    status = os_epoll_ctl( ioque->epfd, EPOLL_CTL_DEL, key->fd, &ev);
+    status = os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_DEL, key->fd, &ev);
     if (status != 0) {
 	pj_status_t rc = pj_get_os_error();
-	pj_lock_release(ioque->lock);
+	pj_lock_release(ioqueue->lock);
 	return rc;
     }
 
-    pj_lock_release(ioque->lock);
+    pj_lock_release(ioqueue->lock);
+
+    /* Destroy the key. */
+    ioqueue_destroy_key(key);
+
     return PJ_SUCCESS;
 }
 
-/*
- * pj_ioqueue_get_user_data()
- *
- * Obtain value associated with a key.
+/* ioqueue_remove_from_set()
+ * This function is called from ioqueue_dispatch_event() to instruct
+ * the ioqueue to remove the specified descriptor from ioqueue's descriptor
+ * set for the specified event.
  */
-PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
+static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
+                                     pj_sock_t fd, 
+                                     enum ioqueue_event_type event_type)
 {
-    PJ_ASSERT_RETURN(key != NULL, NULL);
-    return key->user_data;
 }
 
+/*
+ * ioqueue_add_to_set()
+ * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
+ * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
+ * set for the specified event.
+ */
+static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
+                                pj_sock_t fd,
+                                enum ioqueue_event_type event_type )
+{
+}
 
 /*
  * pj_ioqueue_poll()
  *
  */
-PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout)
+PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
 {
     int i, count, processed;
-    struct epoll_event events[16];
+    struct epoll_event events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
     int msec;
+    struct queue {
+	pj_ioqueue_key_t	*key;
+	enum ioqueue_event_type	 event_type;
+    } queue[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
     
     PJ_CHECK_STACK();
 
     msec = timeout ? PJ_TIME_VAL_MSEC(*timeout) : 9000;
     
-    count = os_epoll_wait( ioque->epfd, events, PJ_ARRAY_SIZE(events), msec);
+    count = os_epoll_wait( ioqueue->epfd, events, PJ_ARRAY_SIZE(events), msec);
     if (count <= 0)
 	return count;
 
     /* Lock ioqueue. */
-    pj_lock_acquire(ioque->lock);
+    pj_lock_acquire(ioqueue->lock);
 
-    processed = 0;
-
-    for (i=0; i<count; ++i) {
+    for (processed=0, i=0; i<count; ++i) {
 	pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type)
 				events[i].epoll_data;
-	pj_status_t rc;
 
 	/*
-	 * Check for completion of read operations.
+	 * Check readability.
 	 */
-	if ((events[i].events & EPOLLIN) && (PJ_IOQUEUE_IS_READ_OP(h->op))) {
-	    pj_ssize_t bytes_read = h->rd_buflen;
-
-	    if ((h->op & PJ_IOQUEUE_OP_RECV_FROM)) {
-	        rc = pj_sock_recvfrom( h->fd, h->rd_buf, &bytes_read, 0,
-				       h->rmt_addr, h->rmt_addrlen);
-	    } else if ((h->op & PJ_IOQUEUE_OP_RECV)) {
-	        rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0);
-	    } else {
-		bytes_read = os_read( h->fd, h->rd_buf, bytes_read);
-		rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
-	    }
-	    
-	    if (rc != PJ_SUCCESS) {
-	        bytes_read = -rc;
-	    }
-
-	    h->op &= ~(PJ_IOQUEUE_OP_READ | PJ_IOQUEUE_OP_RECV | 
-		       PJ_IOQUEUE_OP_RECV_FROM);
-
-	    /* Call callback. */
-	    (*h->cb.on_read_complete)(h, bytes_read);
-
-	    ++processed;
-	}
-	/*
-	 * Check for completion of accept() operation.
-	 */
-	else if ((events[i].events & EPOLLIN) &&
-		 (h->op & PJ_IOQUEUE_OP_ACCEPT)) 
-	{
-	    /* accept() must be the only operation specified on 
-	     * server socket 
-	     */
-	    pj_assert( h->op == PJ_IOQUEUE_OP_ACCEPT);
-
-	    rc = pj_sock_accept( h->fd, h->accept_fd, 
-			         h->rmt_addr, h->rmt_addrlen);
-	    if (rc==PJ_SUCCESS && h->local_addr) {
-		rc = pj_sock_getsockname(*h->accept_fd, h->local_addr, 
-				          h->local_addrlen);
-	    }
-
-	    h->op &= ~(PJ_IOQUEUE_OP_ACCEPT);
-
-	    /* Call callback. */
-	    (*h->cb.on_accept_complete)(h, *h->accept_fd, rc);
-	    
+	if ((events[i].events & EPOLLIN) && 
+	    (key_has_pending_read(h) || key_has_pending_accept(h))) {
+	    queue[processed].key = h;
+	    queue[processed].event_type = READABLE_EVENT;
 	    ++processed;
 	}
 
 	/*
-	 * Check for completion of write operations.
+	 * Check for writeability.
 	 */
-	if ((events[i].events & EPOLLOUT) && PJ_IOQUEUE_IS_WRITE_OP(h->op)) {
-	    /* Completion of write(), send(), or sendto() operation. */
-
-	    /* Clear operation. */
-	    h->op &= ~(PJ_IOQUEUE_OP_WRITE | PJ_IOQUEUE_OP_SEND | 
-                       PJ_IOQUEUE_OP_SEND_TO);
-
-	    /* Call callback. */
-	    /* All data must have been sent? */
-	    (*h->cb.on_write_complete)(h, h->wr_buflen);
-
+	if ((events[i].events & EPOLLOUT) && key_has_pending_write(h)) {
+	    queue[processed].key = h;
+	    queue[processed].event_type = WRITEABLE_EVENT;
 	    ++processed;
 	}
+
 #if PJ_HAS_TCP
 	/*
 	 * Check for completion of connect() operation.
 	 */
-	else if ((events[i].events & EPOLLOUT) && 
-		 (h->op & PJ_IOQUEUE_OP_CONNECT)) 
-	{
-	    /* Completion of connect() operation */
-	    pj_ssize_t bytes_transfered;
-
-	    /* from connect(2): 
-		* On Linux, use getsockopt to read the SO_ERROR option at
-		* level SOL_SOCKET to determine whether connect() completed
-		* successfully (if SO_ERROR is zero).
-		*/
-	    int value;
-	    socklen_t vallen = sizeof(value);
-	    int gs_rc = os_getsockopt(h->fd, SOL_SOCKET, SO_ERROR, 
-                                      (getsockopt_val_ptr)&value, &vallen);
-	    if (gs_rc != 0) {
-		/* Argh!! What to do now??? 
-		 * Just indicate that the socket is connected. The
-		 * application will get error as soon as it tries to use
-		 * the socket to send/receive.
-		 */
-		bytes_transfered = 0;
-	    } else {
-                bytes_transfered = value;
-	    }
-
-	    /* Clear operation. */
-	    h->op &= (~PJ_IOQUEUE_OP_CONNECT);
-
-	    /* Call callback. */
-	    (*h->cb.on_connect_complete)(h, bytes_transfered);
-
+	if ((events[i].events & EPOLLOUT) && (h->connecting)) {
+	    queue[processed].key = h;
+	    queue[processed].event_type = WRITEABLE_EVENT;
 	    ++processed;
 	}
 #endif /* PJ_HAS_TCP */
@@ -517,321 +415,32 @@
 	/*
 	 * Check for error condition.
 	 */
-	if (events[i].events & EPOLLERR) {
-	    if (h->op & PJ_IOQUEUE_OP_CONNECT) {
-		h->op &= ~PJ_IOQUEUE_OP_CONNECT;
-
-		/* Call callback. */
-		(*h->cb.on_connect_complete)(h, -1);
-
-		++processed;
-	    }
+	if (events[i].events & EPOLLERR && (h->connecting)) {
+	    queue[processed].key = h;
+	    queue[processed].event_type = EXCEPTION_EVENT;
+	    ++processed;
 	}
     }
-    
-    pj_lock_release(ioque->lock);
+    pj_lock_release(ioqueue->lock);
+
+    /* Now process the events. */
+    for (i=0; i<processed; ++i) {
+	switch (queue[i].event_type) {
+        case READABLE_EVENT:
+            ioqueue_dispatch_read_event(ioqueue, queue[i].key);
+            break;
+        case WRITEABLE_EVENT:
+            ioqueue_dispatch_write_event(ioqueue, queue[i].key);
+            break;
+        case EXCEPTION_EVENT:
+            ioqueue_dispatch_exception_event(ioqueue, queue[i].key);
+            break;
+        case NO_EVENT:
+            pj_assert(!"Invalid event!");
+            break;
+        }
+    }
 
     return processed;
 }
 
-/*
- * pj_ioqueue_read()
- *
- * Start asynchronous read from the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque,
-			             pj_ioqueue_key_t *key,
-			             void *buffer,
-			             pj_size_t buflen)
-{
-    PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
-    PJ_CHECK_STACK();
-
-    /* For consistency with other ioqueue implementation, we would reject 
-     * if descriptor has already been submitted for reading before.
-     */
-    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
-                     PJ_EBUSY);
-
-    pj_lock_acquire(ioque->lock);
-
-    key->op |= PJ_IOQUEUE_OP_READ;
-    key->rd_flags = 0;
-    key->rd_buf = buffer;
-    key->rd_buflen = buflen;
-
-    pj_lock_release(ioque->lock);
-    return PJ_EPENDING;
-}
-
-
-/*
- * pj_ioqueue_recv()
- *
- * Start asynchronous recv() from the socket.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_recv(  pj_ioqueue_t *ioque,
-				      pj_ioqueue_key_t *key,
-				      void *buffer,
-				      pj_size_t buflen,
-				      unsigned flags )
-{
-    PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
-    PJ_CHECK_STACK();
-
-    /* For consistency with other ioqueue implementation, we would reject 
-     * if descriptor has already been submitted for reading before.
-     */
-    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
-                     PJ_EBUSY);
-
-    pj_lock_acquire(ioque->lock);
-
-    key->op |= PJ_IOQUEUE_OP_RECV;
-    key->rd_buf = buffer;
-    key->rd_buflen = buflen;
-    key->rd_flags = flags;
-
-    pj_lock_release(ioque->lock);
-    return PJ_EPENDING;
-}
-
-/*
- * pj_ioqueue_recvfrom()
- *
- * Start asynchronous recvfrom() from the socket.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque,
-				         pj_ioqueue_key_t *key,
-				         void *buffer,
-				         pj_size_t buflen,
-                                         unsigned flags,
-				         pj_sockaddr_t *addr,
-				         int *addrlen)
-{
-    PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
-    PJ_CHECK_STACK();
-
-    /* For consistency with other ioqueue implementation, we would reject 
-     * if descriptor has already been submitted for reading before.
-     */
-    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
-                     PJ_EBUSY);
-
-    pj_lock_acquire(ioque->lock);
-
-    key->op |= PJ_IOQUEUE_OP_RECV_FROM;
-    key->rd_buf = buffer;
-    key->rd_buflen = buflen;
-    key->rd_flags = flags;
-    key->rmt_addr = addr;
-    key->rmt_addrlen = addrlen;
-
-    pj_lock_release(ioque->lock);
-    return PJ_EPENDING;
-}
-
-/*
- * pj_ioqueue_write()
- *
- * Start asynchronous write() to the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque,
-			              pj_ioqueue_key_t *key,
-			              const void *data,
-			              pj_size_t datalen)
-{
-    pj_status_t rc;
-    pj_ssize_t sent;
-
-    PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
-    PJ_CHECK_STACK();
-
-    /* For consistency with other ioqueue implementation, we would reject 
-     * if descriptor has already been submitted for writing before.
-     */
-    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
-                     PJ_EBUSY);
-
-    sent = datalen;
-    /* sent would be -1 after pj_sock_send() if it returns error. */
-    rc = pj_sock_send(key->fd, data, &sent, 0);
-    if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
-        return rc;
-    }
-
-    pj_lock_acquire(ioque->lock);
-
-    key->op |= PJ_IOQUEUE_OP_WRITE;
-    key->wr_buf = NULL;
-    key->wr_buflen = datalen;
-
-    pj_lock_release(ioque->lock);
-
-    return PJ_EPENDING;
-}
-
-/*
- * pj_ioqueue_send()
- *
- * Start asynchronous send() to the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque,
-			             pj_ioqueue_key_t *key,
-			             const void *data,
-			             pj_size_t datalen,
-                                     unsigned flags)
-{
-    pj_status_t rc;
-    pj_ssize_t sent;
-
-    PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
-    PJ_CHECK_STACK();
-
-    /* For consistency with other ioqueue implementation, we would reject 
-     * if descriptor has already been submitted for writing before.
-     */
-    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
-                     PJ_EBUSY);
-
-    sent = datalen;
-    /* sent would be -1 after pj_sock_send() if it returns error. */
-    rc = pj_sock_send(key->fd, data, &sent, flags);
-    if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
-        return rc;
-    }
-
-    pj_lock_acquire(ioque->lock);
-
-    key->op |= PJ_IOQUEUE_OP_SEND;
-    key->wr_buf = NULL;
-    key->wr_buflen = datalen;
-
-    pj_lock_release(ioque->lock);
-
-    return PJ_EPENDING;
-}
-
-
-/*
- * pj_ioqueue_sendto()
- *
- * Start asynchronous write() to the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque,
-			               pj_ioqueue_key_t *key,
-			               const void *data,
-			               pj_size_t datalen,
-                                       unsigned flags,
-			               const pj_sockaddr_t *addr,
-			               int addrlen)
-{
-    pj_status_t rc;
-    pj_ssize_t sent;
-
-    PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
-    PJ_CHECK_STACK();
-
-    /* For consistency with other ioqueue implementation, we would reject 
-     * if descriptor has already been submitted for writing before.
-     */
-    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
-                      (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
-                     PJ_EBUSY);
-
-    sent = datalen;
-    /* sent would be -1 after pj_sock_sendto() if it returns error. */
-    rc = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
-    if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK))  {
-        return rc;
-    }
-
-    pj_lock_acquire(ioque->lock);
-
-    key->op |= PJ_IOQUEUE_OP_SEND_TO;
-    key->wr_buf = NULL;
-    key->wr_buflen = datalen;
-
-    pj_lock_release(ioque->lock);
-    return PJ_EPENDING;
-}
-
-#if PJ_HAS_TCP
-/*
- * Initiate overlapped accept() operation.
- */
-PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue,
-			       pj_ioqueue_key_t *key,
-			       pj_sock_t *new_sock,
-			       pj_sockaddr_t *local,
-			       pj_sockaddr_t *remote,
-			       int *addrlen)
-{
-    /* check parameters. All must be specified! */
-    pj_assert(ioqueue && key && new_sock);
-
-    /* Server socket must have no other operation! */
-    pj_assert(key->op == 0);
-    
-    pj_lock_acquire(ioqueue->lock);
-
-    key->op = PJ_IOQUEUE_OP_ACCEPT;
-    key->accept_fd = new_sock;
-    key->rmt_addr = remote;
-    key->rmt_addrlen = addrlen;
-    key->local_addr = local;
-    key->local_addrlen = addrlen;   /* use same addr. as rmt_addrlen */
-
-    pj_lock_release(ioqueue->lock);
-    return PJ_EPENDING;
-}
-
-/*
- * Initiate overlapped connect() operation (well, it's non-blocking actually,
- * since there's no overlapped version of connect()).
- */
-PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue,
-					pj_ioqueue_key_t *key,
-					const pj_sockaddr_t *addr,
-					int addrlen )
-{
-    pj_status_t rc;
-    
-    /* check parameters. All must be specified! */
-    PJ_ASSERT_RETURN(ioqueue && key && addr && addrlen, PJ_EINVAL);
-
-    /* Connecting socket must have no other operation! */
-    PJ_ASSERT_RETURN(key->op == 0, PJ_EBUSY);
-    
-    rc = pj_sock_connect(key->fd, addr, addrlen);
-    if (rc == PJ_SUCCESS) {
-	/* Connected! */
-	return PJ_SUCCESS;
-    } else {
-	if (rc == PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) || 
-            rc == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) 
-        {
-	    /* Pending! */
-	    pj_lock_acquire(ioqueue->lock);
-	    key->op = PJ_IOQUEUE_OP_CONNECT;
-	    pj_lock_release(ioqueue->lock);
-	    return PJ_EPENDING;
-	} else {
-	    /* Error! */
-	    return rc;
-	}
-    }
-}
-#endif	/* PJ_HAS_TCP */
-