/* $Id$
*/
/*
* sock_select.c
*
* This is the implementation of IOQueue using pj_sock_select().
* It runs anywhere pj_sock_select() is available (currently
* Win32, Linux, the Linux kernel, etc.).
*/
#include <pj/ioqueue.h>
#include <pj/os.h>
#include <pj/lock.h>
#include <pj/log.h>
#include <pj/list.h>
#include <pj/pool.h>
#include <pj/string.h>
#include <pj/assert.h>
#include <pj/sock.h>
#include <pj/compat/socket.h>
#include <pj/sock_select.h>
#include <pj/errno.h>
/*
* ISSUES with ioqueue_select()
*
* EAGAIN/EWOULDBLOCK error in recv():
* - when multiple threads are working with the ioqueue, the application
* may receive EAGAIN or EWOULDBLOCK in the receive callback.
* This error happens because more than one thread is watching the
* same descriptor set, so when all of them call recv() or recvfrom()
* simultaneously, only one will succeed and the rest will get the error.
*
*/
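/*
 * For illustration only (not part of the library): a receive callback
 * can tolerate this by simply resubmitting the read when it sees the
 * error. A minimal sketch, where app_data_t and process_packet() are
 * hypothetical application constructs; a negative bytes_read carries
 * the negated pj_status_t. (A production callback would also have to
 * handle an immediate PJ_SUCCESS return from pj_ioqueue_recv().)
 *
 *   static void on_read_complete(pj_ioqueue_key_t *key,
 *                                pj_ioqueue_op_key_t *op_key,
 *                                pj_ssize_t bytes_read)
 *   {
 *       app_data_t *app = pj_ioqueue_get_user_data(key);
 *
 *       if (bytes_read > 0)
 *           process_packet(app->buf, bytes_read);
 *
 *       app->len = sizeof(app->buf);
 *       pj_ioqueue_recv(key, op_key, app->buf, &app->len, 0);
 *   }
 */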
#define THIS_FILE "ioq_select"
/*
* The select ioqueue relies on socket functions (pj_sock_xxx()) to return
* the correct error code.
*/
#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
# error "Error reporting must be enabled for this function to work!"
#endif
/**
* Get the number of descriptors in the set. This is defined in sock_select.c.
* This function only returns the number of sockets set by the PJ_FD_SET
* operation. When the set is modified by other means (such as by select()),
* the count will not be reflected here.
*
* That's why we don't export this function in the header file, to avoid
* misunderstanding.
*
* @param fdsetp The descriptor set.
*
* @return Number of descriptors in the set.
*/
PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);
/*
* In debug builds, VALIDATE_FD_SET is set.
* This enables checking the validity of the fd_sets.
*/
#if defined(PJ_DEBUG) && PJ_DEBUG != 0
# define VALIDATE_FD_SET 1
#else
# define VALIDATE_FD_SET 0
#endif
struct generic_operation
{
PJ_DECL_LIST_MEMBER(struct generic_operation);
pj_ioqueue_operation_e op;
};
struct read_operation
{
PJ_DECL_LIST_MEMBER(struct read_operation);
pj_ioqueue_operation_e op;
void *buf;
pj_size_t size;
unsigned flags;
pj_sockaddr_t *rmt_addr;
int *rmt_addrlen;
};
struct write_operation
{
PJ_DECL_LIST_MEMBER(struct write_operation);
pj_ioqueue_operation_e op;
char *buf;
pj_size_t size;
pj_ssize_t written;
unsigned flags;
pj_sockaddr_in rmt_addr;
int rmt_addrlen;
};
#if PJ_HAS_TCP
struct accept_operation
{
PJ_DECL_LIST_MEMBER(struct accept_operation);
pj_ioqueue_operation_e op;
pj_sock_t *accept_fd;
pj_sockaddr_t *local_addr;
pj_sockaddr_t *rmt_addr;
int *addrlen;
};
#endif
union operation_key
{
struct generic_operation generic;
struct read_operation read;
struct write_operation write;
#if PJ_HAS_TCP
struct accept_operation accept;
#endif
};
/*
* This describes each key.
*/
struct pj_ioqueue_key_t
{
PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);
pj_ioqueue_t *ioqueue;
pj_sock_t fd;
void *user_data;
pj_ioqueue_callback cb;
int connecting;
struct read_operation read_list;
struct write_operation write_list;
#if PJ_HAS_TCP
struct accept_operation accept_list;
#endif
};
/*
* This describes the I/O queue itself.
*/
struct pj_ioqueue_t
{
pj_lock_t *lock;
pj_bool_t auto_delete_lock;
unsigned max, count;
pj_ioqueue_key_t key_list;
pj_fd_set_t rfdset;
pj_fd_set_t wfdset;
#if PJ_HAS_TCP
pj_fd_set_t xfdset;
#endif
};
/*
* pj_ioqueue_create()
*
* Create select ioqueue.
*/
PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
pj_size_t max_fd,
pj_ioqueue_t **p_ioqueue)
{
pj_ioqueue_t *ioqueue;
pj_status_t rc;
/* Check that arguments are valid. */
PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES,
PJ_EINVAL);
/* Check that size of pj_ioqueue_op_key_t is sufficient */
PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
sizeof(union operation_key), PJ_EBUG);
ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
ioqueue->max = max_fd;
ioqueue->count = 0;
PJ_FD_ZERO(&ioqueue->rfdset);
PJ_FD_ZERO(&ioqueue->wfdset);
#if PJ_HAS_TCP
PJ_FD_ZERO(&ioqueue->xfdset);
#endif
pj_list_init(&ioqueue->key_list);
rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioqueue->lock);
if (rc != PJ_SUCCESS)
return rc;
ioqueue->auto_delete_lock = PJ_TRUE;
PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue));
*p_ioqueue = ioqueue;
return PJ_SUCCESS;
}
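/*
 * Usage sketch (illustrative only; assumes the library has been
 * initialized with pj_init() and "pool" is an existing pj_pool_t*):
 *
 *   pj_ioqueue_t *ioq;
 *   pj_status_t rc;
 *
 *   rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioq);
 *   if (rc != PJ_SUCCESS)
 *       return rc;
 *   ...
 *   pj_ioqueue_destroy(ioq);
 */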
/*
* pj_ioqueue_destroy()
*
* Destroy ioqueue.
*/
PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
pj_status_t rc = PJ_SUCCESS;
PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
pj_lock_acquire(ioqueue->lock);
if (ioqueue->auto_delete_lock)
rc = pj_lock_destroy(ioqueue->lock);
return rc;
}
/*
* pj_ioqueue_register_sock()
*
* Register a handle to ioqueue.
*/
PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
pj_ioqueue_t *ioqueue,
pj_sock_t sock,
void *user_data,
const pj_ioqueue_callback *cb,
pj_ioqueue_key_t **p_key)
{
pj_ioqueue_key_t *key = NULL;
pj_uint32_t value;
pj_status_t rc = PJ_SUCCESS;
PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
cb && p_key, PJ_EINVAL);
pj_lock_acquire(ioqueue->lock);
if (ioqueue->count >= ioqueue->max) {
rc = PJ_ETOOMANY;
goto on_return;
}
/* Set socket to nonblocking. */
value = 1;
#if defined(PJ_WIN32) && PJ_WIN32 != 0
if (ioctlsocket(sock, FIONBIO, (u_long*)&value)) {
#else
if (ioctl(sock, FIONBIO, &value)) {
#endif
rc = pj_get_netos_error();
goto on_return;
}
/* Create key. */
key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
key->ioqueue = ioqueue;
key->fd = sock;
key->user_data = user_data;
pj_list_init(&key->read_list);
pj_list_init(&key->write_list);
#if PJ_HAS_TCP
pj_list_init(&key->accept_list);
#endif
/* Save callback. */
pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
/* Register */
pj_list_insert_before(&ioqueue->key_list, key);
++ioqueue->count;
on_return:
/* On error, socket may be left in non-blocking mode. */
*p_key = key;
pj_lock_release(ioqueue->lock);
return rc;
}
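/*
 * Registration sketch (illustrative only; "ioq", "pool" and a bound or
 * connected "sock" are assumed to exist, and on_read_complete() is a
 * hypothetical application callback):
 *
 *   pj_ioqueue_callback cb;
 *   pj_ioqueue_key_t *key;
 *   pj_status_t rc;
 *
 *   pj_memset(&cb, 0, sizeof(cb));
 *   cb.on_read_complete = &on_read_complete;
 *
 *   rc = pj_ioqueue_register_sock(pool, ioq, sock, NULL, &cb, &key);
 *   if (rc != PJ_SUCCESS)
 *       return rc;
 */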
/*
* pj_ioqueue_unregister()
*
* Unregister handle from ioqueue.
*/
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
{
pj_ioqueue_t *ioqueue;
PJ_ASSERT_RETURN(key, PJ_EINVAL);
ioqueue = key->ioqueue;
pj_lock_acquire(ioqueue->lock);
pj_assert(ioqueue->count > 0);
--ioqueue->count;
pj_list_erase(key);
PJ_FD_CLR(key->fd, &ioqueue->rfdset);
PJ_FD_CLR(key->fd, &ioqueue->wfdset);
#if PJ_HAS_TCP
PJ_FD_CLR(key->fd, &ioqueue->xfdset);
#endif
pj_lock_release(ioqueue->lock);
return PJ_SUCCESS;
}
/*
* pj_ioqueue_get_user_data()
*
* Obtain value associated with a key.
*/
PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
{
PJ_ASSERT_RETURN(key != NULL, NULL);
return key->user_data;
}
/*
* pj_ioqueue_set_user_data()
*/
PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
void *user_data,
void **old_data)
{
PJ_ASSERT_RETURN(key, PJ_EINVAL);
if (old_data)
*old_data = key->user_data;
key->user_data = user_data;
return PJ_SUCCESS;
}
/* This is supposed to check whether the fd_set values are consistent
* with the operation currently set in each key.
*/
#if VALIDATE_FD_SET
static void validate_sets(const pj_ioqueue_t *ioqueue,
const pj_fd_set_t *rfdset,
const pj_fd_set_t *wfdset,
const pj_fd_set_t *xfdset)
{
pj_ioqueue_key_t *key;
key = ioqueue->key_list.next;
while (key != &ioqueue->key_list) {
if (!pj_list_empty(&key->read_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
|| !pj_list_empty(&key->accept_list)
#endif
)
{
pj_assert(PJ_FD_ISSET(key->fd, rfdset));
}
else {
pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0);
}
if (!pj_list_empty(&key->write_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
|| key->connecting
#endif
)
{
pj_assert(PJ_FD_ISSET(key->fd, wfdset));
}
else {
pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0);
}
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
if (key->connecting)
{
pj_assert(PJ_FD_ISSET(key->fd, xfdset));
}
else {
pj_assert(PJ_FD_ISSET(key->fd, xfdset) == 0);
}
#endif /* PJ_HAS_TCP */
key = key->next;
}
}
#endif /* VALIDATE_FD_SET */
/*
* pj_ioqueue_poll()
*
* A few things worth noting:
*
* - we used to call only one callback per poll, but that didn't work
* very well. The reason is that in some situations the write
* callback gets called all the time, never giving the read
* callback a chance to run. This happens, for example, when the user
* submits a write operation inside the write callback.
* As a result, we changed the behaviour so that multiple
* callbacks are now called in a single poll. It should be fast too;
* we just need to be careful with the ioqueue data structures.
*
* - to guarantee preemptiveness etc., the poll function must strictly
* work on a copy of the ioqueue's fd_sets (not the originals).
*/
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
pj_fd_set_t rfdset, wfdset, xfdset;
int count;
pj_ioqueue_key_t *h;
PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
/* Lock ioqueue before making fd_set copies */
pj_lock_acquire(ioqueue->lock);
/* We will only do select() when there are sockets to be polled.
* Otherwise select() will return an error.
*/
if (PJ_FD_COUNT(&ioqueue->rfdset)==0 &&
PJ_FD_COUNT(&ioqueue->wfdset)==0
#if PJ_HAS_TCP
&& PJ_FD_COUNT(&ioqueue->xfdset)==0
#endif
)
{
pj_lock_release(ioqueue->lock);
if (timeout)
pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout));
return 0;
}
/* Copy ioqueue's pj_fd_set_t to local variables. */
pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t));
pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t));
#if PJ_HAS_TCP
pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t));
#else
PJ_FD_ZERO(&xfdset);
#endif
#if VALIDATE_FD_SET
validate_sets(ioqueue, &rfdset, &wfdset, &xfdset);
#endif
/* Unlock ioqueue before select(). */
pj_lock_release(ioqueue->lock);
count = pj_sock_select(FD_SETSIZE, &rfdset, &wfdset, &xfdset, timeout);
if (count <= 0)
return count;
/* Lock ioqueue again before scanning for signalled sockets.
* We must strictly use a recursive mutex, since the application may
* invoke the ioqueue again inside the callback.
*/
pj_lock_acquire(ioqueue->lock);
/* Scan for writable sockets first to handle piggy-back data
* coming with accept().
*/
h = ioqueue->key_list.next;
do_writable_scan:
for ( ; h!=&ioqueue->key_list; h = h->next) {
if ( (!pj_list_empty(&h->write_list) || h->connecting)
&& PJ_FD_ISSET(h->fd, &wfdset))
{
break;
}
}
if (h != &ioqueue->key_list) {
pj_assert(!pj_list_empty(&h->write_list) || h->connecting);
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
if (h->connecting) {
/* Completion of connect() operation */
pj_ssize_t bytes_transfered;
#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
/* from connect(2):
* On Linux, use getsockopt to read the SO_ERROR option at
* level SOL_SOCKET to determine whether connect() completed
* successfully (if SO_ERROR is zero).
*/
int value;
socklen_t vallen = sizeof(value);
int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
&value, &vallen);
if (gs_rc != 0) {
/* Argh!! What to do now???
* Just indicate that the socket is connected. The
* application will get an error as soon as it tries to use
* the socket to send/receive.
*/
bytes_transfered = 0;
} else {
bytes_transfered = value;
}
#elif defined(PJ_WIN32) && PJ_WIN32!=0
bytes_transfered = 0; /* success */
#else
/* Excellent information in D.J. Bernstein page:
* http://cr.yp.to/docs/connect.html
*
* Seems like the most portable way of detecting connect()
* failure is to call getpeername(). If the socket is connected,
* getpeername() will return 0. If the socket is not connected,
* it will return ENOTCONN, and read(fd, &ch, 1) will produce
* the right errno through error slippage. This is a combination
* of suggestions from Douglas C. Schmidt and Ken Keys.
*/
int gp_rc;
struct sockaddr_in addr;
socklen_t addrlen = sizeof(addr);
gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
bytes_transfered = gp_rc;
#endif
/* Clear operation. */
h->connecting = 0;
PJ_FD_CLR(h->fd, &ioqueue->wfdset);
PJ_FD_CLR(h->fd, &ioqueue->xfdset);
/* Call callback. */
if (h->cb.on_connect_complete)
(*h->cb.on_connect_complete)(h, bytes_transfered);
/* Re-scan writable sockets. */
goto do_writable_scan;
} else
#endif /* PJ_HAS_TCP */
{
/* Socket is writable. */
struct write_operation *write_op;
pj_ssize_t sent;
pj_status_t send_rc;
/* Get the first in the queue. */
write_op = h->write_list.next;
/* Send the data. */
sent = write_op->size - write_op->written;
if (write_op->op == PJ_IOQUEUE_OP_SEND) {
send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
&sent, write_op->flags);
} else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
send_rc = pj_sock_sendto(h->fd,
write_op->buf+write_op->written,
&sent, write_op->flags,
&write_op->rmt_addr,
write_op->rmt_addrlen);
} else {
pj_assert(!"Invalid operation type!");
send_rc = PJ_EBUG;
}
if (send_rc == PJ_SUCCESS) {
write_op->written += sent;
} else {
pj_assert(send_rc > 0);
write_op->written = -send_rc;
}
/* In any case we don't need to process this descriptor again. */
PJ_FD_CLR(h->fd, &wfdset);
/* Are we finished with this buffer? */
if (send_rc!=PJ_SUCCESS ||
write_op->written == (pj_ssize_t)write_op->size)
{
pj_list_erase(write_op);
/* Clear operation if there's no more data to send. */
if (pj_list_empty(&h->write_list))
PJ_FD_CLR(h->fd, &ioqueue->wfdset);
/* Call callback. */
if (h->cb.on_write_complete) {
(*h->cb.on_write_complete)(h,
(pj_ioqueue_op_key_t*)write_op,
write_op->written);
}
}
/* Re-scan writable sockets. */
goto do_writable_scan;
}
}
/* Scan for readable socket. */
h = ioqueue->key_list.next;
do_readable_scan:
for ( ; h!=&ioqueue->key_list; h = h->next) {
if ((!pj_list_empty(&h->read_list)
#if PJ_HAS_TCP
|| !pj_list_empty(&h->accept_list)
#endif
) && PJ_FD_ISSET(h->fd, &rfdset))
{
break;
}
}
if (h != &ioqueue->key_list) {
pj_status_t rc;
#if PJ_HAS_TCP
pj_assert(!pj_list_empty(&h->read_list) ||
!pj_list_empty(&h->accept_list));
#else
pj_assert(!pj_list_empty(&h->read_list));
#endif
# if PJ_HAS_TCP
if (!pj_list_empty(&h->accept_list)) {
struct accept_operation *accept_op;
/* Get one accept operation from the list. */
accept_op = h->accept_list.next;
pj_list_erase(accept_op);
rc=pj_sock_accept(h->fd, accept_op->accept_fd,
accept_op->rmt_addr, accept_op->addrlen);
if (rc==PJ_SUCCESS && accept_op->local_addr) {
rc = pj_sock_getsockname(*accept_op->accept_fd,
accept_op->local_addr,
accept_op->addrlen);
}
/* Clear bit in fdset if there is no more pending accept */
if (pj_list_empty(&h->accept_list))
PJ_FD_CLR(h->fd, &ioqueue->rfdset);
/* Call callback. */
if (h->cb.on_accept_complete)
(*h->cb.on_accept_complete)(h, (pj_ioqueue_op_key_t*)accept_op,
*accept_op->accept_fd, rc);
/* Re-scan readable sockets. */
goto do_readable_scan;
}
else {
# endif
struct read_operation *read_op;
pj_ssize_t bytes_read;
pj_assert(!pj_list_empty(&h->read_list));
/* Get one pending read operation from the list. */
read_op = h->read_list.next;
pj_list_erase(read_op);
bytes_read = read_op->size;
if (read_op->op == PJ_IOQUEUE_OP_RECV_FROM) {
rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0,
read_op->rmt_addr,
read_op->rmt_addrlen);
} else if (read_op->op == PJ_IOQUEUE_OP_RECV) {
rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
} else {
pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
/*
* User has specified pj_ioqueue_read().
* On Win32, we should do ReadFile(). But because we got
* here because of select() anyway, the user must have put a
* socket descriptor in h->fd, in which case we can
* just call pj_sock_recv() instead of ReadFile().
* On Unix, the user may put a file in h->fd, so we'll have
* to call read() here.
* This may not compile on systems which don't have
* read(); that's why we guard the cases explicitly below, so
* that the error is easier to catch.
*/
# if defined(PJ_WIN32) && PJ_WIN32 != 0
rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
//rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
// &bytes_read, NULL);
# elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
bytes_read = read(h->fd, read_op->buf, bytes_read);
rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
bytes_read = sys_read(h->fd, read_op->buf, bytes_read);
rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
# else
# error "Implement read() for this platform!"
# endif
}
if (rc != PJ_SUCCESS) {
# if defined(PJ_WIN32) && PJ_WIN32 != 0
/* On Win32, for UDP, WSAECONNRESET on the receive side
* indicates that a previous send has triggered an ICMP Port
* Unreachable message.
* But at this point we wouldn't know which previous send
* triggered the error, since the UDP socket can be shared!
* So we'll just ignore it!
*/
if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
//PJ_LOG(4,(THIS_FILE,
// "Ignored ICMP port unreach. on key=%p", h));
}
# endif
/* In any case, report the error to the caller. */
bytes_read = -rc;
}
/* Clear fdset if there is no pending read. */
if (pj_list_empty(&h->read_list))
PJ_FD_CLR(h->fd, &ioqueue->rfdset);
/* In any case clear from temporary set. */
PJ_FD_CLR(h->fd, &rfdset);
/* Call callback. */
if (h->cb.on_read_complete)
(*h->cb.on_read_complete)(h, (pj_ioqueue_op_key_t*)read_op,
bytes_read);
/* Re-scan readable sockets. */
goto do_readable_scan;
# if PJ_HAS_TCP
}
# endif
}
#if PJ_HAS_TCP
/* Scan for exception socket for TCP connection error. */
h = ioqueue->key_list.next;
do_except_scan:
for ( ; h!=&ioqueue->key_list; h = h->next) {
if (h->connecting && PJ_FD_ISSET(h->fd, &xfdset))
break;
}
if (h != &ioqueue->key_list) {
pj_assert(h->connecting);
/* Clear operation. */
h->connecting = 0;
PJ_FD_CLR(h->fd, &ioqueue->wfdset);
PJ_FD_CLR(h->fd, &ioqueue->xfdset);
PJ_FD_CLR(h->fd, &wfdset);
PJ_FD_CLR(h->fd, &xfdset);
/* Call callback. */
if (h->cb.on_connect_complete)
(*h->cb.on_connect_complete)(h, -1);
/* Re-scan exception list. */
goto do_except_scan;
}
#endif /* PJ_HAS_TCP */
/* Shouldn't happen. */
/* For some strange reason, on WinXP select() can return 1 while no
* pj_fd_set_t is actually signalled. */
/* pj_assert(0); */
//count = 0;
pj_lock_release(ioqueue->lock);
return count;
}
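/*
 * Polling sketch (illustrative only; typically run from one or more
 * worker threads, where "quit_flag" is a hypothetical application
 * variable):
 *
 *   while (!quit_flag) {
 *       pj_time_val timeout;
 *       timeout.sec = 0;
 *       timeout.msec = 100;
 *       pj_ioqueue_poll(ioq, &timeout);
 *   }
 */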
/*
* pj_ioqueue_recv()
*
* Start asynchronous recv() from the socket.
*/
PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
void *buffer,
pj_ssize_t *length,
unsigned flags )
{
pj_status_t status;
pj_ssize_t size;
struct read_operation *read_op;
pj_ioqueue_t *ioqueue;
PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
PJ_CHECK_STACK();
/* Try to see if there's data immediately available.
*/
size = *length;
status = pj_sock_recv(key->fd, buffer, &size, flags);
if (status == PJ_SUCCESS) {
/* Yes! Data is available! */
*length = size;
return PJ_SUCCESS;
} else {
/* If the error is not EWOULDBLOCK (or EAGAIN on Linux), report
* the error to the caller.
*/
if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
return status;
}
/*
* No data is immediately available.
* We must schedule an asynchronous operation on the ioqueue.
*/
ioqueue = key->ioqueue;
pj_lock_acquire(ioqueue->lock);
read_op = (struct read_operation*)op_key;
read_op->op = PJ_IOQUEUE_OP_RECV;
read_op->buf = buffer;
read_op->size = *length;
read_op->flags = flags;
pj_list_insert_before(&key->read_list, read_op);
PJ_FD_SET(key->fd, &ioqueue->rfdset);
pj_lock_release(ioqueue->lock);
return PJ_EPENDING;
}
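/*
 * Usage sketch (illustrative only). The op_key and buffer are owned by
 * the application and must remain valid until the operation completes:
 *
 *   static pj_ioqueue_op_key_t op_key;
 *   static char buf[512];
 *   pj_ssize_t len = sizeof(buf);
 *   pj_status_t rc;
 *
 *   rc = pj_ioqueue_recv(key, &op_key, buf, &len, 0);
 *   if (rc == PJ_SUCCESS) {
 *       ... "len" bytes are already available in buf ...
 *   } else if (rc == PJ_EPENDING) {
 *       ... on_read_complete() will be called from pj_ioqueue_poll() ...
 *   } else {
 *       ... error ...
 *   }
 */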
/*
* pj_ioqueue_recvfrom()
*
* Start asynchronous recvfrom() from the socket.
*/
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
void *buffer,
pj_ssize_t *length,
unsigned flags,
pj_sockaddr_t *addr,
int *addrlen)
{
pj_status_t status;
pj_ssize_t size;
struct read_operation *read_op;
pj_ioqueue_t *ioqueue;
PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
PJ_CHECK_STACK();
/* Try to see if there's data immediately available.
*/
size = *length;
status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
addr, addrlen);
if (status == PJ_SUCCESS) {
/* Yes! Data is available! */
*length = size;
return PJ_SUCCESS;
} else {
/* If the error is not EWOULDBLOCK (or EAGAIN on Linux), report
* the error to the caller.
*/
if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
return status;
}
/*
* No data is immediately available.
* We must schedule an asynchronous operation on the ioqueue.
*/
ioqueue = key->ioqueue;
pj_lock_acquire(ioqueue->lock);
read_op = (struct read_operation*)op_key;
read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
read_op->buf = buffer;
read_op->size = *length;
read_op->flags = flags;
read_op->rmt_addr = addr;
read_op->rmt_addrlen = addrlen;
pj_list_insert_before(&key->read_list, read_op);
PJ_FD_SET(key->fd, &ioqueue->rfdset);
pj_lock_release(ioqueue->lock);
return PJ_EPENDING;
}
/*
* pj_ioqueue_send()
*
* Start asynchronous send() to the descriptor.
*/
PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
const void *data,
pj_ssize_t *length,
unsigned flags)
{
pj_ioqueue_t *ioqueue;
struct write_operation *write_op;
pj_status_t status;
pj_ssize_t sent;
PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
PJ_CHECK_STACK();
/* Fast track:
* Try to send data immediately, only if there's no pending write!
* Note:
* We are speculating that the list is empty here without properly
* acquiring ioqueue's mutex first. This is intentional, to maximize
* performance via parallelism.
*
* This should be safe, because:
* - by convention, we require the caller to make sure that the
* key is not unregistered while other threads are invoking
* an operation on the same key.
* - pj_list_empty() is safe to be invoked by multiple threads,
* even when other threads are modifying the list.
*/
if (pj_list_empty(&key->write_list)) {
/*
* See if data can be sent immediately.
*/
sent = *length;
status = pj_sock_send(key->fd, data, &sent, flags);
if (status == PJ_SUCCESS) {
/* Success! */
*length = sent;
return PJ_SUCCESS;
} else {
/* If the error is not EWOULDBLOCK (or EAGAIN on Linux), report
* the error to the caller.
*/
if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
return status;
}
}
}
/*
* Schedule asynchronous send.
*/
ioqueue = key->ioqueue;
pj_lock_acquire(ioqueue->lock);
write_op = (struct write_operation*)op_key;
write_op->op = PJ_IOQUEUE_OP_SEND;
write_op->buf = (char*)data;
write_op->size = *length;
write_op->written = 0;
write_op->flags = flags;
pj_list_insert_before(&key->write_list, write_op);
PJ_FD_SET(key->fd, &ioqueue->wfdset);
pj_lock_release(ioqueue->lock);
return PJ_EPENDING;
}
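/*
 * Usage sketch (illustrative only). Note that on PJ_EPENDING the data
 * buffer itself is NOT copied, so it too must remain valid until
 * on_write_complete() is called:
 *
 *   static pj_ioqueue_op_key_t op_key;
 *   static const char msg[] = "hello";
 *   pj_ssize_t len = sizeof(msg);
 *   pj_status_t rc;
 *
 *   rc = pj_ioqueue_send(key, &op_key, msg, &len, 0);
 *   if (rc == PJ_SUCCESS) {
 *       ... "len" bytes were sent immediately ...
 *   } else if (rc == PJ_EPENDING) {
 *       ... completion is reported via on_write_complete() ...
 *   }
 */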
/*
* pj_ioqueue_sendto()
*
* Start asynchronous write() to the descriptor.
*/
PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
const void *data,
pj_ssize_t *length,
unsigned flags,
const pj_sockaddr_t *addr,
int addrlen)
{
pj_ioqueue_t *ioqueue;
struct write_operation *write_op;
pj_status_t status;
pj_ssize_t sent;
PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
PJ_CHECK_STACK();
/* Fast track:
* Try to send data immediately, only if there's no pending write!
* Note:
* We are speculating that the list is empty here without properly
* acquiring ioqueue's mutex first. This is intentional, to maximize
* performance via parallelism.
*
* This should be safe, because:
* - by convention, we require the caller to make sure that the
* key is not unregistered while other threads are invoking
* an operation on the same key.
* - pj_list_empty() is safe to be invoked by multiple threads,
* even when other threads are modifying the list.
*/
if (pj_list_empty(&key->write_list)) {
/*
* See if data can be sent immediately.
*/
sent = *length;
status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
if (status == PJ_SUCCESS) {
/* Success! */
*length = sent;
return PJ_SUCCESS;
} else {
/* If the error is not EWOULDBLOCK (or EAGAIN on Linux), report
* the error to the caller.
*/
if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
return status;
}
}
}
/*
* Check that address storage can hold the address parameter.
*/
PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG);
/*
* Schedule asynchronous send.
*/
ioqueue = key->ioqueue;
pj_lock_acquire(ioqueue->lock);
write_op = (struct write_operation*)op_key;
write_op->op = PJ_IOQUEUE_OP_SEND_TO;
write_op->buf = (char*)data;
write_op->size = *length;
write_op->written = 0;
write_op->flags = flags;
pj_memcpy(&write_op->rmt_addr, addr, addrlen);
write_op->rmt_addrlen = addrlen;
pj_list_insert_before(&key->write_list, write_op);
PJ_FD_SET(key->fd, &ioqueue->wfdset);
pj_lock_release(ioqueue->lock);
return PJ_EPENDING;
}
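/*
 * As with pj_ioqueue_send(), a sketch (illustrative only). The remote
 * address is copied into the operation key, but the data buffer is not,
 * so "pkt" (hypothetical) must outlive a pending operation:
 *
 *   pj_sockaddr_in remote_addr;
 *   pj_ssize_t len = pkt_len;
 *
 *   ... fill in remote_addr ...
 *   rc = pj_ioqueue_sendto(key, &op_key, pkt, &len, 0,
 *                          &remote_addr, sizeof(remote_addr));
 */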
#if PJ_HAS_TCP
/*
* Initiate overlapped accept() operation.
*/
PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
pj_sock_t *new_sock,
pj_sockaddr_t *local,
pj_sockaddr_t *remote,
int *addrlen)
{
pj_ioqueue_t *ioqueue;
struct accept_operation *accept_op;
pj_status_t status;
/* Check parameters. key, op_key and new_sock must be specified. */
PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
/* Fast track:
* See if a new connection is available immediately.
*/
if (pj_list_empty(&key->accept_list)) {
status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
if (status == PJ_SUCCESS) {
/* Yes! New connection is available! */
if (local && addrlen) {
status = pj_sock_getsockname(*new_sock, local, addrlen);
if (status != PJ_SUCCESS) {
pj_sock_close(*new_sock);
*new_sock = PJ_INVALID_SOCKET;
return status;
}
}
return PJ_SUCCESS;
} else {
/* If the error is not EWOULDBLOCK (or EAGAIN on Linux), report
* the error to the caller.
*/
if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
return status;
}
}
}
/*
* No connection is available immediately.
* Schedule the accept() operation to be completed when an incoming
* connection is available.
*/
ioqueue = key->ioqueue;
accept_op = (struct accept_operation*)op_key;
pj_lock_acquire(ioqueue->lock);
accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
accept_op->accept_fd = new_sock;
accept_op->rmt_addr = remote;
accept_op->addrlen = addrlen;
accept_op->local_addr = local;
pj_list_insert_before(&key->accept_list, accept_op);
PJ_FD_SET(key->fd, &ioqueue->rfdset);
pj_lock_release(ioqueue->lock);
return PJ_EPENDING;
}
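/*
 * Accept sketch (illustrative only; "key" wraps a listening TCP socket,
 * and all output parameters must remain valid while the operation is
 * pending):
 *
 *   static pj_ioqueue_op_key_t op_key;
 *   static pj_sock_t new_sock;
 *   static pj_sockaddr_in remote;
 *   static int addrlen = sizeof(remote);
 *   pj_status_t rc;
 *
 *   rc = pj_ioqueue_accept(key, &op_key, &new_sock, NULL,
 *                          &remote, &addrlen);
 *   if (rc == PJ_EPENDING) {
 *       ... the new socket is delivered to on_accept_complete() ...
 *   }
 */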
/*
* Initiate overlapped connect() operation (well, it's non-blocking actually,
* since there's no overlapped version of connect()).
*/
PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
const pj_sockaddr_t *addr,
int addrlen )
{
pj_ioqueue_t *ioqueue;
pj_status_t status;
/* Check parameters. All must be specified! */
PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
/* Check that the socket hasn't already been marked as connecting. */
if (key->connecting != 0)
return PJ_EPENDING;
status = pj_sock_connect(key->fd, addr, addrlen);
if (status == PJ_SUCCESS) {
/* Connected! */
return PJ_SUCCESS;
} else {
if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
/* Pending! */
ioqueue = key->ioqueue;
pj_lock_acquire(ioqueue->lock);
key->connecting = PJ_TRUE;
PJ_FD_SET(key->fd, &ioqueue->wfdset);
PJ_FD_SET(key->fd, &ioqueue->xfdset);
pj_lock_release(ioqueue->lock);
return PJ_EPENDING;
} else {
/* Error! */
return status;
}
}
}
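/*
 * Connect sketch (illustrative only; on PJ_EPENDING the result is
 * delivered to on_connect_complete(), with a nonzero status argument
 * on failure):
 *
 *   rc = pj_ioqueue_connect(key, &remote_addr, sizeof(remote_addr));
 *   if (rc == PJ_SUCCESS) {
 *       ... connected immediately ...
 *   } else if (rc == PJ_EPENDING) {
 *       ... wait for on_connect_complete() from pj_ioqueue_poll() ...
 *   }
 */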
#endif /* PJ_HAS_TCP */