Added support and fixed things for SunOS port

git-svn-id: https://svn.pjsip.org/repos/pjproject/main@2 74dad513-b988-da41-8d7b-12977e46ad98
diff --git a/pjlib/src/pj/ioqueue_epoll.c b/pjlib/src/pj/ioqueue_epoll.c
index 7bbfe13..b41d05f 100644
--- a/pjlib/src/pj/ioqueue_epoll.c
+++ b/pjlib/src/pj/ioqueue_epoll.c
@@ -1,852 +1,852 @@
-/* $Header: /pjproject-0.3/pjlib/src/pj/ioqueue_epoll.c 4     10/29/05 10:27p Bennylp $ */

-/* 

- * $Log: /pjproject-0.3/pjlib/src/pj/ioqueue_epoll.c $

- * 

- * 4     10/29/05 10:27p Bennylp

- * Fixed misc warnings.

- * 

- * 3     10/29/05 11:49a Bennylp

- * Fixed warnings.

- * 

- * 2     10/29/05 11:31a Bennylp

- * Changed accept and lock.

- * 

- * 1     10/17/05 10:49p Bennylp

- * Created.

- * 

- */

-

-/*

- * ioqueue_epoll.c

- *

- * This is the implementation of IOQueue framework using /dev/epoll

- * API in _both_ Linux user-mode and kernel-mode.

- */

-

-#include <pj/ioqueue.h>

-#include <pj/os.h>

-#include <pj/lock.h>

-#include <pj/log.h>

-#include <pj/list.h>

-#include <pj/pool.h>

-#include <pj/string.h>

-#include <pj/assert.h>

-#include <pj/errno.h>

-#include <pj/sock.h>

-#include <pj/compat/socket.h>

-

-#if !defined(PJ_LINUX_KERNEL) || PJ_LINUX_KERNEL==0

-    /*

-     * Linux user mode

-     */

-#   include <sys/epoll.h>

-#   include <errno.h>

-#   include <unistd.h>

-

-#   define epoll_data		data.ptr

-#   define epoll_data_type	void*

-#   define ioctl_val_type	unsigned long*

-#   define getsockopt_val_ptr	int*

-#   define os_getsockopt	getsockopt

-#   define os_ioctl		ioctl

-#   define os_read		read

-#   define os_close		close

-#   define os_epoll_create	epoll_create

-#   define os_epoll_ctl		epoll_ctl

-#   define os_epoll_wait	epoll_wait

-#else

-    /*

-     * Linux kernel mode.

-     */

-#   include <linux/config.h>

-#   include <linux/version.h>

-#   if defined(MODVERSIONS)

-#	include <linux/modversions.h>

-#   endif

-#   include <linux/kernel.h>

-#   include <linux/poll.h>

-#   include <linux/eventpoll.h>

-#   include <linux/syscalls.h>

-#   include <linux/errno.h>

-#   include <linux/unistd.h>

-#   include <asm/ioctls.h>

-    enum EPOLL_EVENTS

-    {

-	EPOLLIN = 0x001,

-	EPOLLOUT = 0x004,

-	EPOLLERR = 0x008,

-    };

-#   define os_epoll_create		sys_epoll_create

-    static int os_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

-    {

-	long rc;

-	mm_segment_t oldfs = get_fs();

-	set_fs(KERNEL_DS);

-	rc = sys_epoll_ctl(epfd, op, fd, event);

-	set_fs(oldfs);

-	if (rc) {

-	    errno = -rc;

-	    return -1;

-	} else {

-	    return 0;

-	}

-    }

-    static int os_epoll_wait(int epfd, struct epoll_event *events,

-			  int maxevents, int timeout)

-    {

-	int count;

-	mm_segment_t oldfs = get_fs();

-	set_fs(KERNEL_DS);

-	count = sys_epoll_wait(epfd, events, maxevents, timeout);

-	set_fs(oldfs);

-	return count;

-    }

-#   define os_close		sys_close

-#   define os_getsockopt	pj_sock_getsockopt

-    static int os_read(int fd, void *buf, size_t len)

-    {

-	long rc;

-	mm_segment_t oldfs = get_fs();

-	set_fs(KERNEL_DS);

-	rc = sys_read(fd, buf, len);

-	set_fs(oldfs);

-	if (rc) {

-	    errno = -rc;

-	    return -1;

-	} else {

-	    return 0;

-	}

-    }

-#   define socklen_t		unsigned

-#   define ioctl_val_type	unsigned long

-    int ioctl(int fd, int opt, ioctl_val_type value);

-    static int os_ioctl(int fd, int opt, ioctl_val_type value)

-    {

-	int rc;

-        mm_segment_t oldfs = get_fs();

-	set_fs(KERNEL_DS);

-	rc = ioctl(fd, opt, value);

-	set_fs(oldfs);

-	if (rc < 0) {

-	    errno = -rc;

-	    return rc;

-	} else

-	    return rc;

-    }

-#   define getsockopt_val_ptr	char*

-

-#   define epoll_data		data

-#   define epoll_data_type	__u32

-#endif

-

-#define THIS_FILE   "ioq_epoll"

-

-#define PJ_IOQUEUE_IS_READ_OP(op)   ((op & PJ_IOQUEUE_OP_READ) || \

-                                     (op & PJ_IOQUEUE_OP_RECV) || \

-                                     (op & PJ_IOQUEUE_OP_RECV_FROM))

-#define PJ_IOQUEUE_IS_WRITE_OP(op)  ((op & PJ_IOQUEUE_OP_WRITE) || \

-                                     (op & PJ_IOQUEUE_OP_SEND) || \

-                                     (op & PJ_IOQUEUE_OP_SEND_TO))

-

-

-#if PJ_HAS_TCP

-#  define PJ_IOQUEUE_IS_ACCEPT_OP(op)	(op & PJ_IOQUEUE_OP_ACCEPT)

-#  define PJ_IOQUEUE_IS_CONNECT_OP(op)	(op & PJ_IOQUEUE_OP_CONNECT)

-#else

-#  define PJ_IOQUEUE_IS_ACCEPT_OP(op)	0

-#  define PJ_IOQUEUE_IS_CONNECT_OP(op)	0

-#endif

-

-

-//#define TRACE_(expr) PJ_LOG(3,expr)

-#define TRACE_(expr)

-

-

-/*

- * This describes each key.

- */

-struct pj_ioqueue_key_t

-{

-    PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t)

-    pj_sock_t		    fd;

-    pj_ioqueue_operation_e  op;

-    void		   *user_data;

-    pj_ioqueue_callback	    cb;

-

-    void		   *rd_buf;

-    unsigned                rd_flags;

-    pj_size_t		    rd_buflen;

-    void		   *wr_buf;

-    pj_size_t		    wr_buflen;

-

-    pj_sockaddr_t	   *rmt_addr;

-    int			   *rmt_addrlen;

-

-    pj_sockaddr_t	   *local_addr;

-    int			   *local_addrlen;

-

-    pj_sock_t		   *accept_fd;

-};

-

-/*

- * This describes the I/O queue.

- */

-struct pj_ioqueue_t

-{

-    pj_lock_t          *lock;

-    pj_bool_t           auto_delete_lock;

-    unsigned		max, count;

-    pj_ioqueue_key_t	hlist;

-    int			epfd;

-};

-

-/*

- * pj_ioqueue_create()

- *

- * Create select ioqueue.

- */

-PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, 

-                                       pj_size_t max_fd,

-                                       int max_threads,

-                                       pj_ioqueue_t **p_ioqueue)

-{

-    pj_ioqueue_t *ioque;

-    pj_status_t rc;

-

-    PJ_UNUSED_ARG(max_threads);

-

-    if (max_fd > PJ_IOQUEUE_MAX_HANDLES) {

-        pj_assert(!"max_fd too large");

-	return PJ_EINVAL;

-    }

-

-    ioque = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));

-    ioque->max = max_fd;

-    ioque->count = 0;

-    pj_list_init(&ioque->hlist);

-

-    rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioque->lock);

-    if (rc != PJ_SUCCESS)

-	return rc;

-

-    ioque->auto_delete_lock = PJ_TRUE;

-    ioque->epfd = os_epoll_create(max_fd);

-    if (ioque->epfd < 0) {

-	return PJ_RETURN_OS_ERROR(pj_get_native_os_error());

-    }

-    

-    PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioque));

-

-    *p_ioqueue = ioque;

-    return PJ_SUCCESS;

-}

-

-/*

- * pj_ioqueue_destroy()

- *

- * Destroy ioqueue.

- */

-PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioque)

-{

-    PJ_ASSERT_RETURN(ioque, PJ_EINVAL);

-    PJ_ASSERT_RETURN(ioque->epfd > 0, PJ_EINVALIDOP);

-

-    pj_lock_acquire(ioque->lock);

-    os_close(ioque->epfd);

-    ioque->epfd = 0;

-    if (ioque->auto_delete_lock)

-        pj_lock_destroy(ioque->lock);

-    

-    return PJ_SUCCESS;

-}

-

-/*

- * pj_ioqueue_set_lock()

- */

-PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque, 

-					 pj_lock_t *lock,

-					 pj_bool_t auto_delete )

-{

-    PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL);

-

-    if (ioque->auto_delete_lock) {

-        pj_lock_destroy(ioque->lock);

-    }

-

-    ioque->lock = lock;

-    ioque->auto_delete_lock = auto_delete;

-

-    return PJ_SUCCESS;

-}

-

-

-/*

- * pj_ioqueue_register_sock()

- *

- * Register a socket to ioqueue.

- */

-PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,

-					      pj_ioqueue_t *ioque,

-					      pj_sock_t sock,

-					      void *user_data,

-					      const pj_ioqueue_callback *cb,

-                                              pj_ioqueue_key_t **p_key)

-{

-    pj_ioqueue_key_t *key = NULL;

-    pj_uint32_t value;

-    struct epoll_event ev;

-    int status;

-    pj_status_t rc = PJ_SUCCESS;

-    

-    PJ_ASSERT_RETURN(pool && ioque && sock != PJ_INVALID_SOCKET &&

-                     cb && p_key, PJ_EINVAL);

-

-    pj_lock_acquire(ioque->lock);

-

-    if (ioque->count >= ioque->max) {

-        rc = PJ_ETOOMANY;

-	TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: too many files"));

-	goto on_return;

-    }

-

-    /* Set socket to nonblocking. */

-    value = 1;

-    if ((rc=os_ioctl(sock, FIONBIO, (ioctl_val_type)&value))) {

-	TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: ioctl rc=%d", 

-                rc));

-        rc = pj_get_netos_error();

-	goto on_return;

-    }

-

-    /* Create key. */

-    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));

-    key->fd = sock;

-    key->user_data = user_data;

-    pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));

-

-    /* os_epoll_ctl. */

-    ev.events = EPOLLIN | EPOLLOUT | EPOLLERR;

-    ev.epoll_data = (epoll_data_type)key;

-    status = os_epoll_ctl(ioque->epfd, EPOLL_CTL_ADD, sock, &ev);

-    if (status < 0) {

-	rc = pj_get_os_error();

-	TRACE_((THIS_FILE, 

-                "pj_ioqueue_register_sock error: os_epoll_ctl rc=%d", 

-                status));

-	goto on_return;

-    }

-    

-    /* Register */

-    pj_list_insert_before(&ioque->hlist, key);

-    ++ioque->count;

-

-on_return:

-    *p_key = key;

-    pj_lock_release(ioque->lock);

-    

-    return rc;

-}

-

-/*

- * pj_ioqueue_unregister()

- *

- * Unregister handle from ioqueue.

- */

-PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque,

-					   pj_ioqueue_key_t *key)

-{

-    struct epoll_event ev;

-    int status;

-    

-    PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL);

-

-    pj_lock_acquire(ioque->lock);

-

-    pj_assert(ioque->count > 0);

-    --ioque->count;

-    pj_list_erase(key);

-

-    ev.events = 0;

-    ev.epoll_data = (epoll_data_type)key;

-    status = os_epoll_ctl( ioque->epfd, EPOLL_CTL_DEL, key->fd, &ev);

-    if (status != 0) {

-	pj_status_t rc = pj_get_os_error();

-	pj_lock_release(ioque->lock);

-	return rc;

-    }

-

-    pj_lock_release(ioque->lock);

-    return PJ_SUCCESS;

-}

-

-/*

- * pj_ioqueue_get_user_data()

- *

- * Obtain value associated with a key.

- */

-PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )

-{

-    PJ_ASSERT_RETURN(key != NULL, NULL);

-    return key->user_data;

-}

-

-

-/*

- * pj_ioqueue_poll()

- *

- */

-PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout)

-{

-    int i, count, processed;

-    struct epoll_event events[16];

-    int msec;

-    

-    PJ_CHECK_STACK();

-

-    msec = timeout ? PJ_TIME_VAL_MSEC(*timeout) : 9000;

-    

-    count = os_epoll_wait( ioque->epfd, events, PJ_ARRAY_SIZE(events), msec);

-    if (count <= 0)

-	return count;

-

-    /* Lock ioqueue. */

-    pj_lock_acquire(ioque->lock);

-

-    processed = 0;

-

-    for (i=0; i<count; ++i) {

-	pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type)

-				events[i].epoll_data;

-	pj_status_t rc;

-

-	/*

-	 * Check for completion of read operations.

-	 */

-	if ((events[i].events & EPOLLIN) && (PJ_IOQUEUE_IS_READ_OP(h->op))) {

-	    pj_ssize_t bytes_read = h->rd_buflen;

-

-	    if ((h->op & PJ_IOQUEUE_OP_RECV_FROM)) {

-	        rc = pj_sock_recvfrom( h->fd, h->rd_buf, &bytes_read, 0,

-				       h->rmt_addr, h->rmt_addrlen);

-	    } else if ((h->op & PJ_IOQUEUE_OP_RECV)) {

-	        rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0);

-	    } else {

-		bytes_read = os_read( h->fd, h->rd_buf, bytes_read);

-		rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();

-	    }

-	    

-	    if (rc != PJ_SUCCESS) {

-	        bytes_read = -rc;

-	    }

-

-	    h->op &= ~(PJ_IOQUEUE_OP_READ | PJ_IOQUEUE_OP_RECV | 

-		       PJ_IOQUEUE_OP_RECV_FROM);

-

-	    /* Call callback. */

-	    (*h->cb.on_read_complete)(h, bytes_read);

-

-	    ++processed;

-	}

-	/*

-	 * Check for completion of accept() operation.

-	 */

-	else if ((events[i].events & EPOLLIN) &&

-		 (h->op & PJ_IOQUEUE_OP_ACCEPT)) 

-	{

-	    /* accept() must be the only operation specified on 

-	     * server socket 

-	     */

-	    pj_assert( h->op == PJ_IOQUEUE_OP_ACCEPT);

-

-	    rc = pj_sock_accept( h->fd, h->accept_fd, 

-			         h->rmt_addr, h->rmt_addrlen);

-	    if (rc==PJ_SUCCESS && h->local_addr) {

-		rc = pj_sock_getsockname(*h->accept_fd, h->local_addr, 

-				          h->local_addrlen);

-	    }

-

-	    h->op &= ~(PJ_IOQUEUE_OP_ACCEPT);

-

-	    /* Call callback. */

-	    (*h->cb.on_accept_complete)(h, *h->accept_fd, rc);

-	    

-	    ++processed;

-	}

-

-	/*

-	 * Check for completion of write operations.

-	 */

-	if ((events[i].events & EPOLLOUT) && PJ_IOQUEUE_IS_WRITE_OP(h->op)) {

-	    /* Completion of write(), send(), or sendto() operation. */

-

-	    /* Clear operation. */

-	    h->op &= ~(PJ_IOQUEUE_OP_WRITE | PJ_IOQUEUE_OP_SEND | 

-                       PJ_IOQUEUE_OP_SEND_TO);

-

-	    /* Call callback. */

-	    /* All data must have been sent? */

-	    (*h->cb.on_write_complete)(h, h->wr_buflen);

-

-	    ++processed;

-	}

-#if PJ_HAS_TCP

-	/*

-	 * Check for completion of connect() operation.

-	 */

-	else if ((events[i].events & EPOLLOUT) && 

-		 (h->op & PJ_IOQUEUE_OP_CONNECT)) 

-	{

-	    /* Completion of connect() operation */

-	    pj_ssize_t bytes_transfered;

-

-	    /* from connect(2): 

-		* On Linux, use getsockopt to read the SO_ERROR option at

-		* level SOL_SOCKET to determine whether connect() completed

-		* successfully (if SO_ERROR is zero).

-		*/

-	    int value;

-	    socklen_t vallen = sizeof(value);

-	    int gs_rc = os_getsockopt(h->fd, SOL_SOCKET, SO_ERROR, 

-                                      (getsockopt_val_ptr)&value, &vallen);

-	    if (gs_rc != 0) {

-		/* Argh!! What to do now??? 

-		 * Just indicate that the socket is connected. The

-		 * application will get error as soon as it tries to use

-		 * the socket to send/receive.

-		 */

-		bytes_transfered = 0;

-	    } else {

-                bytes_transfered = value;

-	    }

-

-	    /* Clear operation. */

-	    h->op &= (~PJ_IOQUEUE_OP_CONNECT);

-

-	    /* Call callback. */

-	    (*h->cb.on_connect_complete)(h, bytes_transfered);

-

-	    ++processed;

-	}

-#endif /* PJ_HAS_TCP */

-	

-	/*

-	 * Check for error condition.

-	 */

-	if (events[i].events & EPOLLERR) {

-	    if (h->op & PJ_IOQUEUE_OP_CONNECT) {

-		h->op &= ~PJ_IOQUEUE_OP_CONNECT;

-

-		/* Call callback. */

-		(*h->cb.on_connect_complete)(h, -1);

-

-		++processed;

-	    }

-	}

-    }

-    

-    pj_lock_release(ioque->lock);

-

-    return processed;

-}

-

-/*

- * pj_ioqueue_read()

- *

- * Start asynchronous read from the descriptor.

- */

-PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque,

-			             pj_ioqueue_key_t *key,

-			             void *buffer,

-			             pj_size_t buflen)

-{

-    PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);

-    PJ_CHECK_STACK();

-

-    /* For consistency with other ioqueue implementation, we would reject 

-     * if descriptor has already been submitted for reading before.

-     */

-    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&

-                      (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&

-                      (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),

-                     PJ_EBUSY);

-

-    pj_lock_acquire(ioque->lock);

-

-    key->op |= PJ_IOQUEUE_OP_READ;

-    key->rd_flags = 0;

-    key->rd_buf = buffer;

-    key->rd_buflen = buflen;

-

-    pj_lock_release(ioque->lock);

-    return PJ_EPENDING;

-}

-

-

-/*

- * pj_ioqueue_recv()

- *

- * Start asynchronous recv() from the socket.

- */

-PJ_DEF(pj_status_t) pj_ioqueue_recv(  pj_ioqueue_t *ioque,

-				      pj_ioqueue_key_t *key,

-				      void *buffer,

-				      pj_size_t buflen,

-				      unsigned flags )

-{

-    PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);

-    PJ_CHECK_STACK();

-

-    /* For consistency with other ioqueue implementation, we would reject 

-     * if descriptor has already been submitted for reading before.

-     */

-    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&

-                      (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&

-                      (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),

-                     PJ_EBUSY);

-

-    pj_lock_acquire(ioque->lock);

-

-    key->op |= PJ_IOQUEUE_OP_RECV;

-    key->rd_buf = buffer;

-    key->rd_buflen = buflen;

-    key->rd_flags = flags;

-

-    pj_lock_release(ioque->lock);

-    return PJ_EPENDING;

-}

-

-/*

- * pj_ioqueue_recvfrom()

- *

- * Start asynchronous recvfrom() from the socket.

- */

-PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque,

-				         pj_ioqueue_key_t *key,

-				         void *buffer,

-				         pj_size_t buflen,

-                                         unsigned flags,

-				         pj_sockaddr_t *addr,

-				         int *addrlen)

-{

-    PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);

-    PJ_CHECK_STACK();

-

-    /* For consistency with other ioqueue implementation, we would reject 

-     * if descriptor has already been submitted for reading before.

-     */

-    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&

-                      (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&

-                      (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),

-                     PJ_EBUSY);

-

-    pj_lock_acquire(ioque->lock);

-

-    key->op |= PJ_IOQUEUE_OP_RECV_FROM;

-    key->rd_buf = buffer;

-    key->rd_buflen = buflen;

-    key->rd_flags = flags;

-    key->rmt_addr = addr;

-    key->rmt_addrlen = addrlen;

-

-    pj_lock_release(ioque->lock);

-    return PJ_EPENDING;

-}

-

-/*

- * pj_ioqueue_write()

- *

- * Start asynchronous write() to the descriptor.

- */

-PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque,

-			              pj_ioqueue_key_t *key,

-			              const void *data,

-			              pj_size_t datalen)

-{

-    pj_status_t rc;

-    pj_ssize_t sent;

-

-    PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);

-    PJ_CHECK_STACK();

-

-    /* For consistency with other ioqueue implementation, we would reject 

-     * if descriptor has already been submitted for writing before.

-     */

-    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&

-                      (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&

-                      (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),

-                     PJ_EBUSY);

-

-    sent = datalen;

-    /* sent would be -1 after pj_sock_send() if it returns error. */

-    rc = pj_sock_send(key->fd, data, &sent, 0);

-    if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {

-        return rc;

-    }

-

-    pj_lock_acquire(ioque->lock);

-

-    key->op |= PJ_IOQUEUE_OP_WRITE;

-    key->wr_buf = NULL;

-    key->wr_buflen = datalen;

-

-    pj_lock_release(ioque->lock);

-

-    return PJ_EPENDING;

-}

-

-/*

- * pj_ioqueue_send()

- *

- * Start asynchronous send() to the descriptor.

- */

-PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque,

-			             pj_ioqueue_key_t *key,

-			             const void *data,

-			             pj_size_t datalen,

-                                     unsigned flags)

-{

-    pj_status_t rc;

-    pj_ssize_t sent;

-

-    PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);

-    PJ_CHECK_STACK();

-

-    /* For consistency with other ioqueue implementation, we would reject 

-     * if descriptor has already been submitted for writing before.

-     */

-    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&

-                      (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&

-                      (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),

-                     PJ_EBUSY);

-

-    sent = datalen;

-    /* sent would be -1 after pj_sock_send() if it returns error. */

-    rc = pj_sock_send(key->fd, data, &sent, flags);

-    if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {

-        return rc;

-    }

-

-    pj_lock_acquire(ioque->lock);

-

-    key->op |= PJ_IOQUEUE_OP_SEND;

-    key->wr_buf = NULL;

-    key->wr_buflen = datalen;

-

-    pj_lock_release(ioque->lock);

-

-    return PJ_EPENDING;

-}

-

-

-/*

- * pj_ioqueue_sendto()

- *

- * Start asynchronous write() to the descriptor.

- */

-PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque,

-			               pj_ioqueue_key_t *key,

-			               const void *data,

-			               pj_size_t datalen,

-                                       unsigned flags,

-			               const pj_sockaddr_t *addr,

-			               int addrlen)

-{

-    pj_status_t rc;

-    pj_ssize_t sent;

-

-    PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);

-    PJ_CHECK_STACK();

-

-    /* For consistency with other ioqueue implementation, we would reject 

-     * if descriptor has already been submitted for writing before.

-     */

-    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&

-                      (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&

-                      (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),

-                     PJ_EBUSY);

-

-    sent = datalen;

-    /* sent would be -1 after pj_sock_sendto() if it returns error. */

-    rc = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);

-    if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK))  {

-        return rc;

-    }

-

-    pj_lock_acquire(ioque->lock);

-

-    key->op |= PJ_IOQUEUE_OP_SEND_TO;

-    key->wr_buf = NULL;

-    key->wr_buflen = datalen;

-

-    pj_lock_release(ioque->lock);

-    return PJ_EPENDING;

-}

-

-#if PJ_HAS_TCP

-/*

- * Initiate overlapped accept() operation.

- */

-PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue,

-			       pj_ioqueue_key_t *key,

-			       pj_sock_t *new_sock,

-			       pj_sockaddr_t *local,

-			       pj_sockaddr_t *remote,

-			       int *addrlen)

-{

-    /* check parameters. All must be specified! */

-    pj_assert(ioqueue && key && new_sock);

-

-    /* Server socket must have no other operation! */

-    pj_assert(key->op == 0);

-    

-    pj_lock_acquire(ioqueue->lock);

-

-    key->op = PJ_IOQUEUE_OP_ACCEPT;

-    key->accept_fd = new_sock;

-    key->rmt_addr = remote;

-    key->rmt_addrlen = addrlen;

-    key->local_addr = local;

-    key->local_addrlen = addrlen;   /* use same addr. as rmt_addrlen */

-

-    pj_lock_release(ioqueue->lock);

-    return PJ_EPENDING;

-}

-

-/*

- * Initiate overlapped connect() operation (well, it's non-blocking actually,

- * since there's no overlapped version of connect()).

- */

-PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue,

-					pj_ioqueue_key_t *key,

-					const pj_sockaddr_t *addr,

-					int addrlen )

-{

-    pj_status_t rc;

-    

-    /* check parameters. All must be specified! */

-    PJ_ASSERT_RETURN(ioqueue && key && addr && addrlen, PJ_EINVAL);

-

-    /* Connecting socket must have no other operation! */

-    PJ_ASSERT_RETURN(key->op == 0, PJ_EBUSY);

-    

-    rc = pj_sock_connect(key->fd, addr, addrlen);

-    if (rc == PJ_SUCCESS) {

-	/* Connected! */

-	return PJ_SUCCESS;

-    } else {

-	if (rc == PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) || 

-            rc == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) 

-        {

-	    /* Pending! */

-	    pj_lock_acquire(ioqueue->lock);

-	    key->op = PJ_IOQUEUE_OP_CONNECT;

-	    pj_lock_release(ioqueue->lock);

-	    return PJ_EPENDING;

-	} else {

-	    /* Error! */

-	    return rc;

-	}

-    }

-}

-#endif	/* PJ_HAS_TCP */

-

+/* $Header: /pjproject-0.3/pjlib/src/pj/ioqueue_epoll.c 4     10/29/05 10:27p Bennylp $ */
+/* 
+ * $Log: /pjproject-0.3/pjlib/src/pj/ioqueue_epoll.c $
+ * 
+ * 4     10/29/05 10:27p Bennylp
+ * Fixed misc warnings.
+ * 
+ * 3     10/29/05 11:49a Bennylp
+ * Fixed warnings.
+ * 
+ * 2     10/29/05 11:31a Bennylp
+ * Changed accept and lock.
+ * 
+ * 1     10/17/05 10:49p Bennylp
+ * Created.
+ * 
+ */
+
+/*
+ * ioqueue_epoll.c
+ *
+ * This is the implementation of IOQueue framework using /dev/epoll
+ * API in _both_ Linux user-mode and kernel-mode.
+ */
+
+#include <pj/ioqueue.h>
+#include <pj/os.h>
+#include <pj/lock.h>
+#include <pj/log.h>
+#include <pj/list.h>
+#include <pj/pool.h>
+#include <pj/string.h>
+#include <pj/assert.h>
+#include <pj/errno.h>
+#include <pj/sock.h>
+#include <pj/compat/socket.h>
+
+#if !defined(PJ_LINUX_KERNEL) || PJ_LINUX_KERNEL==0
+    /*
+     * Linux user mode
+     */
+#   include <sys/epoll.h>
+#   include <errno.h>
+#   include <unistd.h>
+
+#   define epoll_data		data.ptr
+#   define epoll_data_type	void*
+#   define ioctl_val_type	unsigned long*
+#   define getsockopt_val_ptr	int*
+#   define os_getsockopt	getsockopt
+#   define os_ioctl		ioctl
+#   define os_read		read
+#   define os_close		close
+#   define os_epoll_create	epoll_create
+#   define os_epoll_ctl		epoll_ctl
+#   define os_epoll_wait	epoll_wait
+#else
+    /*
+     * Linux kernel mode.
+     */
+#   include <linux/config.h>
+#   include <linux/version.h>
+#   if defined(MODVERSIONS)
+#	include <linux/modversions.h>
+#   endif
+#   include <linux/kernel.h>
+#   include <linux/poll.h>
+#   include <linux/eventpoll.h>
+#   include <linux/syscalls.h>
+#   include <linux/errno.h>
+#   include <linux/unistd.h>
+#   include <asm/ioctls.h>
+    enum EPOLL_EVENTS
+    {
+	EPOLLIN = 0x001,
+	EPOLLOUT = 0x004,
+	EPOLLERR = 0x008,
+    };
+#   define os_epoll_create		sys_epoll_create
+    static int os_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
+    {
+	long rc;
+	mm_segment_t oldfs = get_fs();
+	set_fs(KERNEL_DS);
+	rc = sys_epoll_ctl(epfd, op, fd, event);
+	set_fs(oldfs);
+	if (rc) {
+	    errno = -rc;
+	    return -1;
+	} else {
+	    return 0;
+	}
+    }
+    static int os_epoll_wait(int epfd, struct epoll_event *events,
+			  int maxevents, int timeout)
+    {
+	int count;
+	mm_segment_t oldfs = get_fs();
+	set_fs(KERNEL_DS);
+	count = sys_epoll_wait(epfd, events, maxevents, timeout);
+	set_fs(oldfs);
+	return count;
+    }
+#   define os_close		sys_close
+#   define os_getsockopt	pj_sock_getsockopt
+    static int os_read(int fd, void *buf, size_t len)
+    {
+	long rc;
+	mm_segment_t oldfs = get_fs();
+	set_fs(KERNEL_DS);
+	rc = sys_read(fd, buf, len);
+	set_fs(oldfs);
+	if (rc) {
+	    errno = -rc;
+	    return -1;
+	} else {
+	    return 0;
+	}
+    }
+#   define socklen_t		unsigned
+#   define ioctl_val_type	unsigned long
+    int ioctl(int fd, int opt, ioctl_val_type value);
+    static int os_ioctl(int fd, int opt, ioctl_val_type value)
+    {
+	int rc;
+        mm_segment_t oldfs = get_fs();
+	set_fs(KERNEL_DS);
+	rc = ioctl(fd, opt, value);
+	set_fs(oldfs);
+	if (rc < 0) {
+	    errno = -rc;
+	    return rc;
+	} else
+	    return rc;
+    }
+#   define getsockopt_val_ptr	char*
+
+#   define epoll_data		data
+#   define epoll_data_type	__u32
+#endif
+
+#define THIS_FILE   "ioq_epoll"
+
+#define PJ_IOQUEUE_IS_READ_OP(op)   ((op & PJ_IOQUEUE_OP_READ) || \
+                                     (op & PJ_IOQUEUE_OP_RECV) || \
+                                     (op & PJ_IOQUEUE_OP_RECV_FROM))
+#define PJ_IOQUEUE_IS_WRITE_OP(op)  ((op & PJ_IOQUEUE_OP_WRITE) || \
+                                     (op & PJ_IOQUEUE_OP_SEND) || \
+                                     (op & PJ_IOQUEUE_OP_SEND_TO))
+
+
+#if PJ_HAS_TCP
+#  define PJ_IOQUEUE_IS_ACCEPT_OP(op)	(op & PJ_IOQUEUE_OP_ACCEPT)
+#  define PJ_IOQUEUE_IS_CONNECT_OP(op)	(op & PJ_IOQUEUE_OP_CONNECT)
+#else
+#  define PJ_IOQUEUE_IS_ACCEPT_OP(op)	0
+#  define PJ_IOQUEUE_IS_CONNECT_OP(op)	0
+#endif
+
+
+//#define TRACE_(expr) PJ_LOG(3,expr)
+#define TRACE_(expr)
+
+
+/*
+ * This describes each key.
+ */
+struct pj_ioqueue_key_t
+{
+    PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t)
+    pj_sock_t		    fd;
+    pj_ioqueue_operation_e  op;
+    void		   *user_data;
+    pj_ioqueue_callback	    cb;
+
+    void		   *rd_buf;
+    unsigned                rd_flags;
+    pj_size_t		    rd_buflen;
+    void		   *wr_buf;
+    pj_size_t		    wr_buflen;
+
+    pj_sockaddr_t	   *rmt_addr;
+    int			   *rmt_addrlen;
+
+    pj_sockaddr_t	   *local_addr;
+    int			   *local_addrlen;
+
+    pj_sock_t		   *accept_fd;
+};
+
+/*
+ * This describes the I/O queue.
+ */
+struct pj_ioqueue_t
+{
+    pj_lock_t          *lock;
+    pj_bool_t           auto_delete_lock;
+    unsigned		max, count;
+    pj_ioqueue_key_t	hlist;
+    int			epfd;
+};
+
+/*
+ * pj_ioqueue_create()
+ *
+ * Create select ioqueue.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, 
+                                       pj_size_t max_fd,
+                                       int max_threads,
+                                       pj_ioqueue_t **p_ioqueue)
+{
+    pj_ioqueue_t *ioque;
+    pj_status_t rc;
+
+    PJ_UNUSED_ARG(max_threads);
+
+    if (max_fd > PJ_IOQUEUE_MAX_HANDLES) {
+        pj_assert(!"max_fd too large");
+	return PJ_EINVAL;
+    }
+
+    ioque = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
+    ioque->max = max_fd;
+    ioque->count = 0;
+    pj_list_init(&ioque->hlist);
+
+    rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioque->lock);
+    if (rc != PJ_SUCCESS)
+	return rc;
+
+    ioque->auto_delete_lock = PJ_TRUE;
+    ioque->epfd = os_epoll_create(max_fd);
+    if (ioque->epfd < 0) {
+	return PJ_RETURN_OS_ERROR(pj_get_native_os_error());
+    }
+    
+    PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioque));
+
+    *p_ioqueue = ioque;
+    return PJ_SUCCESS;
+}
+
+/*
+ * pj_ioqueue_destroy()
+ *
+ * Destroy ioqueue.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioque)
+{
+    PJ_ASSERT_RETURN(ioque, PJ_EINVAL);
+    PJ_ASSERT_RETURN(ioque->epfd > 0, PJ_EINVALIDOP);
+
+    pj_lock_acquire(ioque->lock);
+    os_close(ioque->epfd);
+    ioque->epfd = 0;
+    if (ioque->auto_delete_lock)
+        pj_lock_destroy(ioque->lock);
+    
+    return PJ_SUCCESS;
+}
+
+/*
+ * pj_ioqueue_set_lock()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque, 
+					 pj_lock_t *lock,
+					 pj_bool_t auto_delete )
+{
+    PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL);
+
+    if (ioque->auto_delete_lock) {
+        pj_lock_destroy(ioque->lock);
+    }
+
+    ioque->lock = lock;
+    ioque->auto_delete_lock = auto_delete;
+
+    return PJ_SUCCESS;
+}
+
+
+/*
+ * pj_ioqueue_register_sock()
+ *
+ * Register a socket to ioqueue.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
+					      pj_ioqueue_t *ioque,
+					      pj_sock_t sock,
+					      void *user_data,
+					      const pj_ioqueue_callback *cb,
+                                              pj_ioqueue_key_t **p_key)
+{
+    pj_ioqueue_key_t *key = NULL;
+    pj_uint32_t value;
+    struct epoll_event ev;
+    int status;
+    pj_status_t rc = PJ_SUCCESS;
+    
+    PJ_ASSERT_RETURN(pool && ioque && sock != PJ_INVALID_SOCKET &&
+                     cb && p_key, PJ_EINVAL);
+
+    pj_lock_acquire(ioque->lock);
+
+    if (ioque->count >= ioque->max) {
+        rc = PJ_ETOOMANY;
+	TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: too many files"));
+	goto on_return;
+    }
+
+    /* Set socket to nonblocking. */
+    value = 1;
+    if ((rc=os_ioctl(sock, FIONBIO, (ioctl_val_type)&value))) {
+	TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: ioctl rc=%d", 
+                rc));
+        rc = pj_get_netos_error();
+	goto on_return;
+    }
+
+    /* Create key. */
+    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
+    key->fd = sock;
+    key->user_data = user_data;
+    pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
+
+    /* os_epoll_ctl. */
+    ev.events = EPOLLIN | EPOLLOUT | EPOLLERR;
+    ev.epoll_data = (epoll_data_type)key;
+    status = os_epoll_ctl(ioque->epfd, EPOLL_CTL_ADD, sock, &ev);
+    if (status < 0) {
+	rc = pj_get_os_error();
+	TRACE_((THIS_FILE, 
+                "pj_ioqueue_register_sock error: os_epoll_ctl rc=%d", 
+                status));
+	goto on_return;
+    }
+    
+    /* Register */
+    pj_list_insert_before(&ioque->hlist, key);
+    ++ioque->count;
+
+on_return:
+    *p_key = key;
+    pj_lock_release(ioque->lock);
+    
+    return rc;
+}
+
+/*
+ * pj_ioqueue_unregister()
+ *
+ * Unregister handle from ioqueue.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque,
+					   pj_ioqueue_key_t *key)
+{
+    struct epoll_event ev;
+    int status;
+    
+    PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL);
+
+    pj_lock_acquire(ioque->lock);
+
+    pj_assert(ioque->count > 0);
+    --ioque->count;
+    pj_list_erase(key);
+
+    ev.events = 0;
+    ev.epoll_data = (epoll_data_type)key;
+    status = os_epoll_ctl( ioque->epfd, EPOLL_CTL_DEL, key->fd, &ev);
+    if (status != 0) {
+	pj_status_t rc = pj_get_os_error();
+	pj_lock_release(ioque->lock);
+	return rc;
+    }
+
+    pj_lock_release(ioque->lock);
+    return PJ_SUCCESS;
+}
+
+/*
+ * pj_ioqueue_get_user_data()
+ *
+ * Obtain value associated with a key.
+ */
+PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
+{
+    PJ_ASSERT_RETURN(key != NULL, NULL);
+    return key->user_data;
+}
+
+
/*
 * pj_ioqueue_poll()
 *
 * Wait for events on the epoll descriptor and dispatch completion
 * callbacks for each signalled key.
 *
 * Returns the number of completions dispatched, zero on timeout, or a
 * negative value if os_epoll_wait() fails.
 *
 * NOTE(review): all callbacks are invoked while the ioqueue lock is
 * held (acquired below, released after the dispatch loop) -- callbacks
 * must not call ioqueue functions that re-acquire it, unless the lock
 * is recursive; confirm against the lock created for this ioqueue.
 */
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout)
{
    int i, count, processed;
    struct epoll_event events[16];
    int msec;
    
    PJ_CHECK_STACK();

    /* NOTE(review): a NULL timeout waits 9000 msec rather than forever;
     * presumably a deliberate watchdog default -- confirm against the
     * other ioqueue backends.
     */
    msec = timeout ? PJ_TIME_VAL_MSEC(*timeout) : 9000;
    
    count = os_epoll_wait( ioque->epfd, events, PJ_ARRAY_SIZE(events), msec);
    if (count <= 0)
	return count;

    /* Lock ioqueue. */
    pj_lock_acquire(ioque->lock);

    processed = 0;

    for (i=0; i<count; ++i) {
	/* Recover the key stored in the epoll data at registration. */
	pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type)
				events[i].epoll_data;
	pj_status_t rc;

	/*
	 * Check for completion of read operations.
	 */
	if ((events[i].events & EPOLLIN) && (PJ_IOQUEUE_IS_READ_OP(h->op))) {
	    pj_ssize_t bytes_read = h->rd_buflen;

	    /* Perform the transfer with the call that matches the
	     * pending operation type (recvfrom / recv / plain read).
	     */
	    if ((h->op & PJ_IOQUEUE_OP_RECV_FROM)) {
	        rc = pj_sock_recvfrom( h->fd, h->rd_buf, &bytes_read, 0,
				       h->rmt_addr, h->rmt_addrlen);
	    } else if ((h->op & PJ_IOQUEUE_OP_RECV)) {
	        rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0);
	    } else {
		bytes_read = os_read( h->fd, h->rd_buf, bytes_read);
		rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
	    }
	    
	    /* Errors are reported to the callback as a negative byte
	     * count (the negated status code).
	     */
	    if (rc != PJ_SUCCESS) {
	        bytes_read = -rc;
	    }

	    /* Clear all read-type flags BEFORE invoking the callback,
	     * so the callback may start another read on the same key.
	     */
	    h->op &= ~(PJ_IOQUEUE_OP_READ | PJ_IOQUEUE_OP_RECV | 
		       PJ_IOQUEUE_OP_RECV_FROM);

	    /* Call callback. */
	    (*h->cb.on_read_complete)(h, bytes_read);

	    ++processed;
	}
	/*
	 * Check for completion of accept() operation.
	 */
	else if ((events[i].events & EPOLLIN) &&
		 (h->op & PJ_IOQUEUE_OP_ACCEPT)) 
	{
	    /* accept() must be the only operation specified on 
	     * server socket 
	     */
	    pj_assert( h->op == PJ_IOQUEUE_OP_ACCEPT);

	    rc = pj_sock_accept( h->fd, h->accept_fd, 
			         h->rmt_addr, h->rmt_addrlen);
	    if (rc==PJ_SUCCESS && h->local_addr) {
		rc = pj_sock_getsockname(*h->accept_fd, h->local_addr, 
				          h->local_addrlen);
	    }

	    h->op &= ~(PJ_IOQUEUE_OP_ACCEPT);

	    /* Call callback. */
	    (*h->cb.on_accept_complete)(h, *h->accept_fd, rc);
	    
	    ++processed;
	}

	/*
	 * Check for completion of write operations.
	 */
	if ((events[i].events & EPOLLOUT) && PJ_IOQUEUE_IS_WRITE_OP(h->op)) {
	    /* Completion of write(), send(), or sendto() operation. */

	    /* Clear operation. */
	    h->op &= ~(PJ_IOQUEUE_OP_WRITE | PJ_IOQUEUE_OP_SEND | 
                       PJ_IOQUEUE_OP_SEND_TO);

	    /* Call callback. */
	    /* All data must have been sent? */
	    (*h->cb.on_write_complete)(h, h->wr_buflen);

	    ++processed;
	}
#if PJ_HAS_TCP
	/*
	 * Check for completion of connect() operation.
	 */
	else if ((events[i].events & EPOLLOUT) && 
		 (h->op & PJ_IOQUEUE_OP_CONNECT)) 
	{
	    /* Completion of connect() operation */
	    pj_ssize_t bytes_transfered;

	    /* from connect(2): 
		* On Linux, use getsockopt to read the SO_ERROR option at
		* level SOL_SOCKET to determine whether connect() completed
		* successfully (if SO_ERROR is zero).
		*/
	    int value;
	    socklen_t vallen = sizeof(value);
	    int gs_rc = os_getsockopt(h->fd, SOL_SOCKET, SO_ERROR, 
                                      (getsockopt_val_ptr)&value, &vallen);
	    if (gs_rc != 0) {
		/* Argh!! What to do now??? 
		 * Just indicate that the socket is connected. The
		 * application will get error as soon as it tries to use
		 * the socket to send/receive.
		 */
		bytes_transfered = 0;
	    } else {
                bytes_transfered = value;
	    }

	    /* Clear operation. */
	    h->op &= (~PJ_IOQUEUE_OP_CONNECT);

	    /* Call callback. */
	    (*h->cb.on_connect_complete)(h, bytes_transfered);

	    ++processed;
	}
#endif /* PJ_HAS_TCP */
	
	/*
	 * Check for error condition.
	 */
	if (events[i].events & EPOLLERR) {
	    /* Only a pending connect() is completed (with -1) on error;
	     * read/write errors surface through the transfer calls above.
	     */
	    if (h->op & PJ_IOQUEUE_OP_CONNECT) {
		h->op &= ~PJ_IOQUEUE_OP_CONNECT;

		/* Call callback. */
		(*h->cb.on_connect_complete)(h, -1);

		++processed;
	    }
	}
    }
    
    pj_lock_release(ioque->lock);

    return processed;
}
+
+/*
+ * pj_ioqueue_read()
+ *
+ * Start asynchronous read from the descriptor.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque,
+			             pj_ioqueue_key_t *key,
+			             void *buffer,
+			             pj_size_t buflen)
+{
+    PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
+    PJ_CHECK_STACK();
+
+    /* For consistency with other ioqueue implementation, we would reject 
+     * if descriptor has already been submitted for reading before.
+     */
+    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
+                      (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
+                      (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
+                     PJ_EBUSY);
+
+    pj_lock_acquire(ioque->lock);
+
+    key->op |= PJ_IOQUEUE_OP_READ;
+    key->rd_flags = 0;
+    key->rd_buf = buffer;
+    key->rd_buflen = buflen;
+
+    pj_lock_release(ioque->lock);
+    return PJ_EPENDING;
+}
+
+
+/*
+ * pj_ioqueue_recv()
+ *
+ * Start asynchronous recv() from the socket.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_recv(  pj_ioqueue_t *ioque,
+				      pj_ioqueue_key_t *key,
+				      void *buffer,
+				      pj_size_t buflen,
+				      unsigned flags )
+{
+    PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
+    PJ_CHECK_STACK();
+
+    /* For consistency with other ioqueue implementation, we would reject 
+     * if descriptor has already been submitted for reading before.
+     */
+    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
+                      (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
+                      (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
+                     PJ_EBUSY);
+
+    pj_lock_acquire(ioque->lock);
+
+    key->op |= PJ_IOQUEUE_OP_RECV;
+    key->rd_buf = buffer;
+    key->rd_buflen = buflen;
+    key->rd_flags = flags;
+
+    pj_lock_release(ioque->lock);
+    return PJ_EPENDING;
+}
+
+/*
+ * pj_ioqueue_recvfrom()
+ *
+ * Start asynchronous recvfrom() from the socket.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque,
+				         pj_ioqueue_key_t *key,
+				         void *buffer,
+				         pj_size_t buflen,
+                                         unsigned flags,
+				         pj_sockaddr_t *addr,
+				         int *addrlen)
+{
+    PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
+    PJ_CHECK_STACK();
+
+    /* For consistency with other ioqueue implementation, we would reject 
+     * if descriptor has already been submitted for reading before.
+     */
+    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
+                      (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
+                      (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
+                     PJ_EBUSY);
+
+    pj_lock_acquire(ioque->lock);
+
+    key->op |= PJ_IOQUEUE_OP_RECV_FROM;
+    key->rd_buf = buffer;
+    key->rd_buflen = buflen;
+    key->rd_flags = flags;
+    key->rmt_addr = addr;
+    key->rmt_addrlen = addrlen;
+
+    pj_lock_release(ioque->lock);
+    return PJ_EPENDING;
+}
+
+/*
+ * pj_ioqueue_write()
+ *
+ * Start asynchronous write() to the descriptor.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque,
+			              pj_ioqueue_key_t *key,
+			              const void *data,
+			              pj_size_t datalen)
+{
+    pj_status_t rc;
+    pj_ssize_t sent;
+
+    PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
+    PJ_CHECK_STACK();
+
+    /* For consistency with other ioqueue implementation, we would reject 
+     * if descriptor has already been submitted for writing before.
+     */
+    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
+                      (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
+                      (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
+                     PJ_EBUSY);
+
+    sent = datalen;
+    /* sent would be -1 after pj_sock_send() if it returns error. */
+    rc = pj_sock_send(key->fd, data, &sent, 0);
+    if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
+        return rc;
+    }
+
+    pj_lock_acquire(ioque->lock);
+
+    key->op |= PJ_IOQUEUE_OP_WRITE;
+    key->wr_buf = NULL;
+    key->wr_buflen = datalen;
+
+    pj_lock_release(ioque->lock);
+
+    return PJ_EPENDING;
+}
+
+/*
+ * pj_ioqueue_send()
+ *
+ * Start asynchronous send() to the descriptor.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque,
+			             pj_ioqueue_key_t *key,
+			             const void *data,
+			             pj_size_t datalen,
+                                     unsigned flags)
+{
+    pj_status_t rc;
+    pj_ssize_t sent;
+
+    PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
+    PJ_CHECK_STACK();
+
+    /* For consistency with other ioqueue implementation, we would reject 
+     * if descriptor has already been submitted for writing before.
+     */
+    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
+                      (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
+                      (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
+                     PJ_EBUSY);
+
+    sent = datalen;
+    /* sent would be -1 after pj_sock_send() if it returns error. */
+    rc = pj_sock_send(key->fd, data, &sent, flags);
+    if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
+        return rc;
+    }
+
+    pj_lock_acquire(ioque->lock);
+
+    key->op |= PJ_IOQUEUE_OP_SEND;
+    key->wr_buf = NULL;
+    key->wr_buflen = datalen;
+
+    pj_lock_release(ioque->lock);
+
+    return PJ_EPENDING;
+}
+
+
+/*
+ * pj_ioqueue_sendto()
+ *
+ * Start asynchronous write() to the descriptor.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque,
+			               pj_ioqueue_key_t *key,
+			               const void *data,
+			               pj_size_t datalen,
+                                       unsigned flags,
+			               const pj_sockaddr_t *addr,
+			               int addrlen)
+{
+    pj_status_t rc;
+    pj_ssize_t sent;
+
+    PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
+    PJ_CHECK_STACK();
+
+    /* For consistency with other ioqueue implementation, we would reject 
+     * if descriptor has already been submitted for writing before.
+     */
+    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
+                      (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
+                      (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
+                     PJ_EBUSY);
+
+    sent = datalen;
+    /* sent would be -1 after pj_sock_sendto() if it returns error. */
+    rc = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
+    if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK))  {
+        return rc;
+    }
+
+    pj_lock_acquire(ioque->lock);
+
+    key->op |= PJ_IOQUEUE_OP_SEND_TO;
+    key->wr_buf = NULL;
+    key->wr_buflen = datalen;
+
+    pj_lock_release(ioque->lock);
+    return PJ_EPENDING;
+}
+
+#if PJ_HAS_TCP
+/*
+ * Initiate overlapped accept() operation.
+ */
+PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue,
+			       pj_ioqueue_key_t *key,
+			       pj_sock_t *new_sock,
+			       pj_sockaddr_t *local,
+			       pj_sockaddr_t *remote,
+			       int *addrlen)
+{
+    /* check parameters. All must be specified! */
+    pj_assert(ioqueue && key && new_sock);
+
+    /* Server socket must have no other operation! */
+    pj_assert(key->op == 0);
+    
+    pj_lock_acquire(ioqueue->lock);
+
+    key->op = PJ_IOQUEUE_OP_ACCEPT;
+    key->accept_fd = new_sock;
+    key->rmt_addr = remote;
+    key->rmt_addrlen = addrlen;
+    key->local_addr = local;
+    key->local_addrlen = addrlen;   /* use same addr. as rmt_addrlen */
+
+    pj_lock_release(ioqueue->lock);
+    return PJ_EPENDING;
+}
+
+/*
+ * Initiate overlapped connect() operation (well, it's non-blocking actually,
+ * since there's no overlapped version of connect()).
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue,
+					pj_ioqueue_key_t *key,
+					const pj_sockaddr_t *addr,
+					int addrlen )
+{
+    pj_status_t rc;
+    
+    /* check parameters. All must be specified! */
+    PJ_ASSERT_RETURN(ioqueue && key && addr && addrlen, PJ_EINVAL);
+
+    /* Connecting socket must have no other operation! */
+    PJ_ASSERT_RETURN(key->op == 0, PJ_EBUSY);
+    
+    rc = pj_sock_connect(key->fd, addr, addrlen);
+    if (rc == PJ_SUCCESS) {
+	/* Connected! */
+	return PJ_SUCCESS;
+    } else {
+	if (rc == PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) || 
+            rc == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) 
+        {
+	    /* Pending! */
+	    pj_lock_acquire(ioqueue->lock);
+	    key->op = PJ_IOQUEUE_OP_CONNECT;
+	    pj_lock_release(ioqueue->lock);
+	    return PJ_EPENDING;
+	} else {
+	    /* Error! */
+	    return rc;
+	}
+    }
+}
+#endif	/* PJ_HAS_TCP */
+