/* $Id$ */ | |
/* | |
* Copyright (C)2003-2006 Benny Prijono <benny@prijono.org> | |
* | |
* This program is free software; you can redistribute it and/or modify | |
* it under the terms of the GNU General Public License as published by | |
* the Free Software Foundation; either version 2 of the License, or | |
* (at your option) any later version. | |
* | |
* This program is distributed in the hope that it will be useful, | |
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
* GNU General Public License for more details. | |
* | |
* You should have received a copy of the GNU General Public License | |
* along with this program; if not, write to the Free Software | |
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA | |
*/ | |
/* | |
* sock_select.c | |
* | |
* This is the implementation of IOQueue using pj_sock_select(). | |
* It runs anywhere where pj_sock_select() is available (currently | |
* Win32, Linux, Linux kernel, etc.). | |
*/ | |
#include <pj/ioqueue.h> | |
#include <pj/os.h> | |
#include <pj/lock.h> | |
#include <pj/log.h> | |
#include <pj/list.h> | |
#include <pj/pool.h> | |
#include <pj/string.h> | |
#include <pj/assert.h> | |
#include <pj/sock.h> | |
#include <pj/compat/socket.h> | |
#include <pj/sock_select.h> | |
#include <pj/errno.h> | |
/* | |
* Include declaration from common abstraction. | |
*/ | |
#include "ioqueue_common_abs.h" | |
/* | |
* ISSUES with ioqueue_select() | |
* | |
* EAGAIN/EWOULDBLOCK error in recv(): | |
* - when multiple threads are working with the ioqueue, application | |
* may receive EAGAIN or EWOULDBLOCK in the receive callback. | |
* This error happens because more than one thread is watching for | |
* the same descriptor set, so when all of them call recv() or recvfrom() | |
* simultaneously, only one will succeed and the rest will get the error. | |
* | |
*/ | |
#define THIS_FILE "ioq_select" | |
/* | |
* The select ioqueue relies on socket functions (pj_sock_xxx()) to return | |
* the correct error code. | |
*/ | |
#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100) | |
# error "Error reporting must be enabled for this function to work!" | |
#endif | |
/** | |
* Get the number of descriptors in the set. This is defined in sock_select.c | |
* This function will only return the number of sockets set from PJ_FD_SET | |
* operation. When the set is modified by other means (such as by select()), | |
* the count will not be reflected here. | |
* | |
 * That's why we don't export this function in the header file, to avoid
* misunderstanding. | |
* | |
* @param fdsetp The descriptor set. | |
* | |
* @return Number of descriptors in the set. | |
*/ | |
PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp); | |
/* | |
* During debugging build, VALIDATE_FD_SET is set. | |
* This will check the validity of the fd_sets. | |
*/ | |
/* | |
#if defined(PJ_DEBUG) && PJ_DEBUG != 0 | |
# define VALIDATE_FD_SET 1 | |
#else | |
# define VALIDATE_FD_SET 0 | |
#endif | |
*/ | |
#define VALIDATE_FD_SET 0 | |
/*
 * This describes each key (one registered socket handle).
 * All fields come from the common abstraction (ioqueue_common_abs.h):
 * the fd, user data, callback pointers, pending operation lists, and
 * the list links used by pj_ioqueue_t.key_list.
 */
struct pj_ioqueue_key_t
{
    DECLARE_COMMON_KEY
};
/*
 * This describes the I/O queue itself.
 */
struct pj_ioqueue_t
{
    DECLARE_COMMON_IOQUEUE      /* Common fields (e.g. the ioqueue lock). */

    unsigned max, count;        /* Capacity and number of registered keys. */
    pj_ioqueue_key_t key_list;  /* List head of registered keys.           */
    pj_fd_set_t rfdset;         /* Master read descriptor set.             */
    pj_fd_set_t wfdset;         /* Master write descriptor set.            */
#if PJ_HAS_TCP
    pj_fd_set_t xfdset;         /* Master exception set; only used to
                                 * detect connect() completion/failure,
                                 * hence only present with TCP support.   */
#endif
};
/* Include implementation for common abstraction after we declare | |
* pj_ioqueue_key_t and pj_ioqueue_t. | |
*/ | |
#include "ioqueue_common_abs.c" | |
/* | |
* pj_ioqueue_name() | |
*/ | |
PJ_DEF(const char*) pj_ioqueue_name(void) | |
{ | |
return "select"; | |
} | |
/* | |
* pj_ioqueue_create() | |
* | |
* Create select ioqueue. | |
*/ | |
PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, | |
pj_size_t max_fd, | |
pj_ioqueue_t **p_ioqueue) | |
{ | |
pj_ioqueue_t *ioqueue; | |
pj_lock_t *lock; | |
pj_status_t rc; | |
/* Check that arguments are valid. */ | |
PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && | |
max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES, | |
PJ_EINVAL); | |
/* Check that size of pj_ioqueue_op_key_t is sufficient */ | |
PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= | |
sizeof(union operation_key), PJ_EBUG); | |
ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); | |
ioqueue_init(ioqueue); | |
ioqueue->max = max_fd; | |
ioqueue->count = 0; | |
PJ_FD_ZERO(&ioqueue->rfdset); | |
PJ_FD_ZERO(&ioqueue->wfdset); | |
#if PJ_HAS_TCP | |
PJ_FD_ZERO(&ioqueue->xfdset); | |
#endif | |
pj_list_init(&ioqueue->key_list); | |
rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock); | |
if (rc != PJ_SUCCESS) | |
return rc; | |
rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE); | |
if (rc != PJ_SUCCESS) | |
return rc; | |
PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue)); | |
*p_ioqueue = ioqueue; | |
return PJ_SUCCESS; | |
} | |
/*
 * pj_ioqueue_destroy()
 *
 * Destroy ioqueue.
 *
 * NOTE(review): the ioqueue mutex is deliberately acquired here and NOT
 * released; presumably ioqueue_destroy() (from ioqueue_common_abs.c) takes
 * over the held lock and releases/destroys it along with the rest of the
 * ioqueue state -- confirm against the common abstraction.
 */
PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);

    /* Hold the lock across destruction; ownership passes to
     * ioqueue_destroy().
     */
    pj_lock_acquire(ioqueue->lock);
    return ioqueue_destroy(ioqueue);
}
/*
 * pj_ioqueue_register_sock()
 *
 * Register a socket handle to the ioqueue.
 *
 * @param pool      Pool to allocate the key from; must remain valid
 *                  until the key is unregistered.
 * @param ioqueue   The ioqueue.
 * @param sock      The socket; it is switched to non-blocking mode here.
 * @param user_data Arbitrary user data to attach to the key.
 * @param cb        Completion callbacks for this key.
 * @param p_key     Receives the new key on success; set to NULL on error.
 *
 * @return          PJ_SUCCESS, PJ_ETOOMANY when the ioqueue is full, or
 *                  the OS error from the non-blocking ioctl.
 */
PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
                                              pj_ioqueue_t *ioqueue,
                                              pj_sock_t sock,
                                              void *user_data,
                                              const pj_ioqueue_callback *cb,
                                              pj_ioqueue_key_t **p_key)
{
    pj_ioqueue_key_t *key = NULL;
    pj_uint32_t value;
    pj_status_t rc = PJ_SUCCESS;

    PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
                     cb && p_key, PJ_EINVAL);

    pj_lock_acquire(ioqueue->lock);

    /* Reject registration when the handle limit has been reached. */
    if (ioqueue->count >= ioqueue->max) {
        rc = PJ_ETOOMANY;
        goto on_return;
    }

    /* Set socket to nonblocking. */
    /* NOTE(review): the non-Win32 branch calls ioctl() directly; the
     * required header (<sys/ioctl.h> or equivalent) presumably comes in
     * via <pj/compat/socket.h> -- confirm on each target platform.
     */
    value = 1;
#ifdef PJ_WIN32
    if (ioctlsocket(sock, FIONBIO, (u_long*)&value)) {
#else
    if (ioctl(sock, FIONBIO, &value)) {
#endif
        rc = pj_get_netos_error();
        goto on_return;
    }

    /* Create key. Zero-initialized; fully set up by ioqueue_init_key(). */
    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
    rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
    if (rc != PJ_SUCCESS) {
        /* The zalloc'ed key stays in the pool; report failure via NULL. */
        key = NULL;
        goto on_return;
    }

    /* Register: link into the key list and bump the count. The fd is
     * added to the fd_sets lazily by ioqueue_add_to_set() when an
     * operation is started.
     */
    pj_list_insert_before(&ioqueue->key_list, key);
    ++ioqueue->count;

on_return:
    /* On error, socket may be left in non-blocking mode. */
    *p_key = key;
    pj_lock_release(ioqueue->lock);
    return rc;
}
/*
 * pj_ioqueue_unregister()
 *
 * Unregister handle from ioqueue: remove it from the key list and from
 * every master fd_set, then destroy the key.
 *
 * @param key   The key to unregister; must not be used afterwards.
 *
 * @return      PJ_SUCCESS.
 */
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
{
    pj_ioqueue_t *ioqueue;

    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    ioqueue = key->ioqueue;

    pj_lock_acquire(ioqueue->lock);

    pj_assert(ioqueue->count > 0);
    --ioqueue->count;
    pj_list_erase(key);
    /* Remove the fd from all master sets so it is no longer polled. */
    PJ_FD_CLR(key->fd, &ioqueue->rfdset);
    PJ_FD_CLR(key->fd, &ioqueue->wfdset);
#if PJ_HAS_TCP
    PJ_FD_CLR(key->fd, &ioqueue->xfdset);
#endif

    /* ioqueue_destroy_key may try to acquire key's mutex.
     * Since normally the order of locking is to lock key's mutex first
     * then ioqueue's mutex, ioqueue_destroy_key may deadlock unless we
     * release ioqueue's mutex first.
     */
    pj_lock_release(ioqueue->lock);

    /* Destroy the key. */
    ioqueue_destroy_key(key);

    return PJ_SUCCESS;
}
/* This is supposed to check whether the fd_set values are consistent
 * with the operations currently pending on each key: a key with pending
 * read/accept must be in rfdset, pending write/connect in wfdset, and
 * pending connect in xfdset.
 *
 * Only compiled when VALIDATE_FD_SET is enabled (currently it never is,
 * see the #define above), and even then it is disabled by the
 * unconditional pj_assert(0) below.
 */
#if VALIDATE_FD_SET
static void validate_sets(const pj_ioqueue_t *ioqueue,
                          const pj_fd_set_t *rfdset,
                          const pj_fd_set_t *wfdset,
                          const pj_fd_set_t *xfdset)
{
    pj_ioqueue_key_t *key;

    /*
     * This basically would not work anymore.
     * We need to lock key before performing the check, but we can't do
     * so because we're holding ioqueue mutex. If we acquire key's mutex
     * now, it will cause deadlock.
     */
    pj_assert(0);

    /* Walk every registered key and assert set membership matches the
     * key's pending operation lists.
     */
    key = ioqueue->key_list.next;
    while (key != &ioqueue->key_list) {
        if (!pj_list_empty(&key->read_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
            || !pj_list_empty(&key->accept_list)
#endif
            )
        {
            pj_assert(PJ_FD_ISSET(key->fd, rfdset));
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0);
        }
        if (!pj_list_empty(&key->write_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
            || key->connecting
#endif
            )
        {
            pj_assert(PJ_FD_ISSET(key->fd, wfdset));
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0);
        }
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
        /* Pending connect is signalled via the exception set on some
         * platforms (e.g. Winsock), hence the xfdset check.
         */
        if (key->connecting)
        {
            pj_assert(PJ_FD_ISSET(key->fd, xfdset));
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, xfdset) == 0);
        }
#endif /* PJ_HAS_TCP */
        key = key->next;
    }
}
#endif  /* VALIDATE_FD_SET */
/* ioqueue_remove_from_set() | |
* This function is called from ioqueue_dispatch_event() to instruct | |
* the ioqueue to remove the specified descriptor from ioqueue's descriptor | |
* set for the specified event. | |
*/ | |
static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, | |
pj_sock_t fd, | |
enum ioqueue_event_type event_type) | |
{ | |
pj_lock_acquire(ioqueue->lock); | |
if (event_type == READABLE_EVENT) | |
PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset); | |
else if (event_type == WRITEABLE_EVENT) | |
PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset); | |
else if (event_type == EXCEPTION_EVENT) | |
PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset); | |
else | |
pj_assert(0); | |
pj_lock_release(ioqueue->lock); | |
} | |
/* | |
* ioqueue_add_to_set() | |
* This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc | |
* to instruct the ioqueue to add the specified handle to ioqueue's descriptor | |
* set for the specified event. | |
*/ | |
static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, | |
pj_sock_t fd, | |
enum ioqueue_event_type event_type ) | |
{ | |
pj_lock_acquire(ioqueue->lock); | |
if (event_type == READABLE_EVENT) | |
PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset); | |
else if (event_type == WRITEABLE_EVENT) | |
PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset); | |
else if (event_type == EXCEPTION_EVENT) | |
PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset); | |
else | |
pj_assert(0); | |
pj_lock_release(ioqueue->lock); | |
} | |
/* | |
* pj_ioqueue_poll() | |
* | |
 * A few things worth noting:
* | |
* - we used to do only one callback called per poll, but it didn't go | |
* very well. The reason is because on some situation, the write | |
* callback gets called all the time, thus doesn't give the read | |
* callback to get called. This happens, for example, when user | |
* submit write operation inside the write callback. | |
* As the result, we changed the behaviour so that now multiple | |
* callbacks are called in a single poll. It should be fast too, | |
 * just that we need to be careful with the ioqueue data structs.
* | |
* - to guarantee preemptiveness etc, the poll function must strictly | |
* work on fd_set copy of the ioqueue (not the original one). | |
*/ | |
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) | |
{ | |
pj_fd_set_t rfdset, wfdset, xfdset; | |
int count, counter; | |
pj_ioqueue_key_t *h; | |
struct event | |
{ | |
pj_ioqueue_key_t *key; | |
enum ioqueue_event_type event_type; | |
} event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; | |
PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); | |
/* Lock ioqueue before making fd_set copies */ | |
pj_lock_acquire(ioqueue->lock); | |
/* We will only do select() when there are sockets to be polled. | |
* Otherwise select() will return error. | |
*/ | |
if (PJ_FD_COUNT(&ioqueue->rfdset)==0 && | |
PJ_FD_COUNT(&ioqueue->wfdset)==0 && | |
PJ_FD_COUNT(&ioqueue->xfdset)==0) | |
{ | |
pj_lock_release(ioqueue->lock); | |
if (timeout) | |
pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout)); | |
return 0; | |
} | |
/* Copy ioqueue's pj_fd_set_t to local variables. */ | |
pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t)); | |
pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t)); | |
#if PJ_HAS_TCP | |
pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t)); | |
#else | |
PJ_FD_ZERO(&xfdset); | |
#endif | |
#if VALIDATE_FD_SET | |
validate_sets(ioqueue, &rfdset, &wfdset, &xfdset); | |
#endif | |
/* Unlock ioqueue before select(). */ | |
pj_lock_release(ioqueue->lock); | |
count = pj_sock_select(FD_SETSIZE, &rfdset, &wfdset, &xfdset, timeout); | |
if (count <= 0) | |
return count; | |
else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) | |
count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL; | |
/* Scan descriptor sets for event and add the events in the event | |
* array to be processed later in this function. We do this so that | |
* events can be processed in parallel without holding ioqueue lock. | |
*/ | |
pj_lock_acquire(ioqueue->lock); | |
counter = 0; | |
/* Scan for writable sockets first to handle piggy-back data | |
* coming with accept(). | |
*/ | |
h = ioqueue->key_list.next; | |
for ( ; h!=&ioqueue->key_list && counter<count; h = h->next) { | |
if ( (key_has_pending_write(h) || key_has_pending_connect(h)) | |
&& PJ_FD_ISSET(h->fd, &wfdset)) | |
{ | |
event[counter].key = h; | |
event[counter].event_type = WRITEABLE_EVENT; | |
++counter; | |
} | |
/* Scan for readable socket. */ | |
if ((key_has_pending_read(h) || key_has_pending_accept(h)) | |
&& PJ_FD_ISSET(h->fd, &rfdset)) | |
{ | |
event[counter].key = h; | |
event[counter].event_type = READABLE_EVENT; | |
++counter; | |
} | |
#if PJ_HAS_TCP | |
if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) { | |
event[counter].key = h; | |
event[counter].event_type = EXCEPTION_EVENT; | |
++counter; | |
} | |
#endif | |
} | |
pj_lock_release(ioqueue->lock); | |
count = counter; | |
/* Now process all events. The dispatch functions will take care | |
* of locking in each of the key | |
*/ | |
for (counter=0; counter<count; ++counter) { | |
switch (event[counter].event_type) { | |
case READABLE_EVENT: | |
ioqueue_dispatch_read_event(ioqueue, event[counter].key); | |
break; | |
case WRITEABLE_EVENT: | |
ioqueue_dispatch_write_event(ioqueue, event[counter].key); | |
break; | |
case EXCEPTION_EVENT: | |
ioqueue_dispatch_exception_event(ioqueue, event[counter].key); | |
break; | |
case NO_EVENT: | |
pj_assert(!"Invalid event!"); | |
break; | |
} | |
} | |
return count; | |
} | |