/* $Id$ */
/*
* Copyright (C) 2008-2009 Teluu Inc. (http://www.teluu.com)
* Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <pj/ioqueue.h>
#include <pj/os.h>
#include <pj/lock.h>
#include <pj/pool.h>
#include <pj/string.h>
#include <pj/sock.h>
#include <pj/array.h>
#include <pj/log.h>
#include <pj/assert.h>
#include <pj/errno.h>
#include <pj/compat/socket.h>
#if defined(PJ_HAS_WINSOCK2_H) && PJ_HAS_WINSOCK2_H != 0
# include <winsock2.h>
#elif defined(PJ_HAS_WINSOCK_H) && PJ_HAS_WINSOCK_H != 0
# include <winsock.h>
#endif
#if defined(PJ_HAS_MSWSOCK_H) && PJ_HAS_MSWSOCK_H != 0
# include <mswsock.h>
#endif
/* The address buffer passed to AcceptEx() must be at least 16 bytes larger
 * than the size of SOCKADDR (source: MSDN).
 */
#define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in)+16)
typedef struct generic_overlapped
{
WSAOVERLAPPED overlapped;
pj_ioqueue_operation_e operation;
} generic_overlapped;
/*
 * OVERLAPPED structure for send and receive.
 */
typedef struct ioqueue_overlapped
{
WSAOVERLAPPED overlapped;
pj_ioqueue_operation_e operation;
WSABUF wsabuf;
pj_sockaddr_in dummy_addr;
int dummy_addrlen;
} ioqueue_overlapped;
#if PJ_HAS_TCP
/*
 * OVERLAPPED structure for accept.
 */
typedef struct ioqueue_accept_rec
{
WSAOVERLAPPED overlapped;
pj_ioqueue_operation_e operation;
pj_sock_t newsock;
pj_sock_t *newsock_ptr;
int *addrlen;
void *remote;
void *local;
char accept_buf[2 * ACCEPT_ADDR_LEN];
} ioqueue_accept_rec;
#endif
/*
 * Union to hold the pending operation key.
 */
union operation_key
{
generic_overlapped generic;
ioqueue_overlapped overlapped;
#if PJ_HAS_TCP
ioqueue_accept_rec accept;
#endif
};
/* Type of handle in the key. */
enum handle_type
{
HND_IS_UNKNOWN,
HND_IS_FILE,
HND_IS_SOCKET,
};
enum { POST_QUIT_LEN = 0xFFFFDEADUL };
/*
 * Structure for an individual socket.
 */
struct pj_ioqueue_key_t
{
PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);
pj_ioqueue_t *ioqueue;
HANDLE hnd;
void *user_data;
enum handle_type hnd_type;
pj_ioqueue_callback cb;
pj_bool_t allow_concurrent;
#if PJ_HAS_TCP
int connecting;
#endif
#if PJ_IOQUEUE_HAS_SAFE_UNREG
pj_atomic_t *ref_count;
pj_bool_t closing;
pj_time_val free_time;
pj_mutex_t *mutex;
#endif
};
/*
* IO Queue structure.
*/
struct pj_ioqueue_t
{
HANDLE iocp;
pj_lock_t *lock;
pj_bool_t auto_delete_lock;
pj_bool_t default_concurrency;
#if PJ_IOQUEUE_HAS_SAFE_UNREG
pj_ioqueue_key_t active_list;
pj_ioqueue_key_t free_list;
pj_ioqueue_key_t closing_list;
#endif
/* These are to keep track of connecting sockets */
#if PJ_HAS_TCP
unsigned event_count;
HANDLE event_pool[MAXIMUM_WAIT_OBJECTS+1];
unsigned connecting_count;
HANDLE connecting_handles[MAXIMUM_WAIT_OBJECTS+1];
pj_ioqueue_key_t *connecting_keys[MAXIMUM_WAIT_OBJECTS+1];
#endif
};
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Prototype */
static void scan_closing_keys(pj_ioqueue_t *ioqueue);
#endif
#if PJ_HAS_TCP
/*
 * Process the socket when the overlapped accept() has completed.
 */
static void ioqueue_on_accept_complete(ioqueue_accept_rec *accept_overlapped)
{
struct sockaddr *local;
struct sockaddr *remote;
int locallen, remotelen;
PJ_CHECK_STACK();
    /* The accept has completed; copy back the addresses if requested. */
if (accept_overlapped->addrlen) {
GetAcceptExSockaddrs( accept_overlapped->accept_buf,
0,
ACCEPT_ADDR_LEN,
ACCEPT_ADDR_LEN,
&local,
&locallen,
&remote,
&remotelen);
	if (*accept_overlapped->addrlen >= locallen &&
	    *accept_overlapped->addrlen >= remotelen)
	{
	    if (accept_overlapped->local)
		pj_memcpy(accept_overlapped->local, local, locallen);
	    if (accept_overlapped->remote)
		pj_memcpy(accept_overlapped->remote, remote, remotelen);
} else {
if (accept_overlapped->local)
pj_bzero(accept_overlapped->local,
*accept_overlapped->addrlen);
if (accept_overlapped->remote)
pj_bzero(accept_overlapped->remote,
*accept_overlapped->addrlen);
}
*accept_overlapped->addrlen = locallen;
}
if (accept_overlapped->newsock_ptr)
*accept_overlapped->newsock_ptr = accept_overlapped->newsock;
accept_overlapped->operation = 0;
}
static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos)
{
pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos];
HANDLE hEvent = ioqueue->connecting_handles[pos];
/* Remove key from array of connecting handles. */
pj_array_erase(ioqueue->connecting_keys, sizeof(key),
ioqueue->connecting_count, pos);
pj_array_erase(ioqueue->connecting_handles, sizeof(HANDLE),
ioqueue->connecting_count, pos);
--ioqueue->connecting_count;
/* Disassociate the socket from the event. */
WSAEventSelect((pj_sock_t)key->hnd, hEvent, 0);
/* Put event object to pool. */
if (ioqueue->event_count < MAXIMUM_WAIT_OBJECTS) {
ioqueue->event_pool[ioqueue->event_count++] = hEvent;
} else {
	/* Shouldn't happen: there can never be more pending connections
	 * than the maximum number of wait objects.
	 */
pj_assert(0);
CloseHandle(hEvent);
}
}
/*
 * Poll for the completion of non-blocking connect().
 * For each completed connect(), the key's on_connect_complete() callback
 * is invoked with the connect() result: zero if connect() succeeded,
 * otherwise the error code. Returns the number of completed connections.
 */
static int check_connecting( pj_ioqueue_t *ioqueue )
{
if (ioqueue->connecting_count) {
int i, count;
struct
{
pj_ioqueue_key_t *key;
pj_status_t status;
} events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1];
pj_lock_acquire(ioqueue->lock);
for (count=0; count<PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1; ++count) {
DWORD result;
result = WaitForMultipleObjects(ioqueue->connecting_count,
ioqueue->connecting_handles,
FALSE, 0);
if (result >= WAIT_OBJECT_0 &&
result < WAIT_OBJECT_0+ioqueue->connecting_count)
{
WSANETWORKEVENTS net_events;
/* Got completed connect(). */
unsigned pos = result - WAIT_OBJECT_0;
events[count].key = ioqueue->connecting_keys[pos];
/* See whether connect has succeeded. */
WSAEnumNetworkEvents((pj_sock_t)events[count].key->hnd,
ioqueue->connecting_handles[pos],
&net_events);
events[count].status =
PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]);
/* Erase socket from pending connect. */
erase_connecting_socket(ioqueue, pos);
} else {
/* No more events */
break;
}
}
pj_lock_release(ioqueue->lock);
/* Call callbacks. */
for (i=0; i<count; ++i) {
if (events[i].key->cb.on_connect_complete) {
events[i].key->cb.on_connect_complete(events[i].key,
events[i].status);
}
}
return count;
}
return 0;
}
#endif
/*
* pj_ioqueue_name()
*/
PJ_DEF(const char*) pj_ioqueue_name(void)
{
return "iocp";
}
/*
* pj_ioqueue_create()
*/
PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
pj_size_t max_fd,
pj_ioqueue_t **p_ioqueue)
{
pj_ioqueue_t *ioqueue;
unsigned i;
pj_status_t rc;
PJ_UNUSED_ARG(max_fd);
PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL);
/* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */
PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
sizeof(union operation_key), PJ_EBUG);
/* Create IOCP */
ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue));
ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (ioqueue->iocp == NULL)
return PJ_RETURN_OS_ERROR(GetLastError());
/* Create IOCP mutex */
rc = pj_lock_create_recursive_mutex(pool, NULL, &ioqueue->lock);
if (rc != PJ_SUCCESS) {
CloseHandle(ioqueue->iocp);
return rc;
}
ioqueue->auto_delete_lock = PJ_TRUE;
ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY;
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/*
* Create and initialize key pools.
*/
pj_list_init(&ioqueue->active_list);
pj_list_init(&ioqueue->free_list);
pj_list_init(&ioqueue->closing_list);
/* Preallocate keys according to max_fd setting, and put them
* in free_list.
*/
for (i=0; i<max_fd; ++i) {
pj_ioqueue_key_t *key;
key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t));
rc = pj_atomic_create(pool, 0, &key->ref_count);
	if (rc != PJ_SUCCESS) {
	    key = ioqueue->free_list.next;
	    while (key != &ioqueue->free_list) {
		pj_atomic_destroy(key->ref_count);
		pj_mutex_destroy(key->mutex);
		key = key->next;
	    }
	    pj_lock_destroy(ioqueue->lock);
	    CloseHandle(ioqueue->iocp);
	    return rc;
	}
rc = pj_mutex_create_recursive(pool, "ioqkey", &key->mutex);
	if (rc != PJ_SUCCESS) {
	    pj_atomic_destroy(key->ref_count);
	    key = ioqueue->free_list.next;
	    while (key != &ioqueue->free_list) {
		pj_atomic_destroy(key->ref_count);
		pj_mutex_destroy(key->mutex);
		key = key->next;
	    }
	    pj_lock_destroy(ioqueue->lock);
	    CloseHandle(ioqueue->iocp);
	    return rc;
	}
pj_list_push_back(&ioqueue->free_list, key);
}
#endif
*p_ioqueue = ioqueue;
PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue));
return PJ_SUCCESS;
}
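/* A minimal usage sketch for the create/poll/destroy lifecycle (assumes a
 * caller-provided "pool"; the quit flag, the 10 ms timeout, and error
 * handling are illustrative and not part of this file):
 *
 *   pj_ioqueue_t *ioq;
 *   pj_status_t status;
 *
 *   status = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioq);
 *
 *   while (!quit_flag) {
 *       // Each poll dequeues at most one IOCP completion and invokes
 *       // the registered callback for the key that owns it.
 *       pj_time_val timeout = { 0, 10 };
 *       pj_ioqueue_poll(ioq, &timeout);
 *   }
 *
 *   pj_ioqueue_destroy(ioq);
 */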
/*
* pj_ioqueue_destroy()
*/
PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue )
{
#if PJ_HAS_TCP
unsigned i;
#endif
pj_ioqueue_key_t *key;
PJ_CHECK_STACK();
PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
pj_lock_acquire(ioqueue->lock);
#if PJ_HAS_TCP
/* Destroy events in the pool */
for (i=0; i<ioqueue->event_count; ++i) {
CloseHandle(ioqueue->event_pool[i]);
}
ioqueue->event_count = 0;
#endif
    if (CloseHandle(ioqueue->iocp) != TRUE) {
	pj_lock_release(ioqueue->lock);
	return PJ_RETURN_OS_ERROR(GetLastError());
    }
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Destroy reference counters */
key = ioqueue->active_list.next;
while (key != &ioqueue->active_list) {
pj_atomic_destroy(key->ref_count);
pj_mutex_destroy(key->mutex);
key = key->next;
}
key = ioqueue->closing_list.next;
while (key != &ioqueue->closing_list) {
pj_atomic_destroy(key->ref_count);
pj_mutex_destroy(key->mutex);
key = key->next;
}
key = ioqueue->free_list.next;
while (key != &ioqueue->free_list) {
pj_atomic_destroy(key->ref_count);
pj_mutex_destroy(key->mutex);
key = key->next;
}
#endif
if (ioqueue->auto_delete_lock)
pj_lock_destroy(ioqueue->lock);
return PJ_SUCCESS;
}
PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency(pj_ioqueue_t *ioqueue,
pj_bool_t allow)
{
PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
ioqueue->default_concurrency = allow;
return PJ_SUCCESS;
}
/*
* pj_ioqueue_set_lock()
*/
PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
pj_lock_t *lock,
pj_bool_t auto_delete )
{
PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
if (ioqueue->auto_delete_lock) {
pj_lock_destroy(ioqueue->lock);
}
ioqueue->lock = lock;
ioqueue->auto_delete_lock = auto_delete;
return PJ_SUCCESS;
}
/*
* pj_ioqueue_register_sock()
*/
PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
pj_ioqueue_t *ioqueue,
pj_sock_t sock,
void *user_data,
const pj_ioqueue_callback *cb,
pj_ioqueue_key_t **key )
{
HANDLE hioq;
pj_ioqueue_key_t *rec;
u_long value;
int rc;
PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL);
pj_lock_acquire(ioqueue->lock);
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Scan closing list first to release unused keys.
* Must do this with lock acquired.
*/
scan_closing_keys(ioqueue);
/* If safe unregistration is used, then get the key record from
* the free list.
*/
if (pj_list_empty(&ioqueue->free_list)) {
pj_lock_release(ioqueue->lock);
return PJ_ETOOMANY;
}
rec = ioqueue->free_list.next;
pj_list_erase(rec);
/* Set initial reference count to 1 */
pj_assert(pj_atomic_get(rec->ref_count) == 0);
pj_atomic_inc(rec->ref_count);
rec->closing = 0;
#else
rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
#endif
/* Build the key for this socket. */
rec->ioqueue = ioqueue;
rec->hnd = (HANDLE)sock;
rec->hnd_type = HND_IS_SOCKET;
rec->user_data = user_data;
pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback));
/* Set concurrency for this handle */
rc = pj_ioqueue_set_concurrency(rec, ioqueue->default_concurrency);
if (rc != PJ_SUCCESS) {
pj_lock_release(ioqueue->lock);
return rc;
}
#if PJ_HAS_TCP
rec->connecting = 0;
#endif
/* Set socket to nonblocking. */
value = 1;
rc = ioctlsocket(sock, FIONBIO, &value);
if (rc != 0) {
pj_lock_release(ioqueue->lock);
return PJ_RETURN_OS_ERROR(WSAGetLastError());
}
/* Associate with IOCP */
    hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp,
				  (ULONG_PTR)rec, 0);
if (!hioq) {
pj_lock_release(ioqueue->lock);
return PJ_RETURN_OS_ERROR(GetLastError());
}
*key = rec;
#if PJ_IOQUEUE_HAS_SAFE_UNREG
pj_list_push_back(&ioqueue->active_list, rec);
#endif
pj_lock_release(ioqueue->lock);
return PJ_SUCCESS;
}
/*
* pj_ioqueue_get_user_data()
*/
PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
{
PJ_ASSERT_RETURN(key, NULL);
return key->user_data;
}
/*
* pj_ioqueue_set_user_data()
*/
PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
void *user_data,
void **old_data )
{
PJ_ASSERT_RETURN(key, PJ_EINVAL);
if (old_data)
*old_data = key->user_data;
key->user_data = user_data;
return PJ_SUCCESS;
}
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Decrement the key's reference counter, and when the counter reaches
 * zero, move the key to the closing list so that it can be freed after
 * the idle delay.
 */
static void decrement_counter(pj_ioqueue_key_t *key)
{
if (pj_atomic_dec_and_get(key->ref_count) == 0) {
pj_lock_acquire(key->ioqueue->lock);
pj_assert(key->closing == 1);
pj_gettimeofday(&key->free_time);
key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
pj_time_val_normalize(&key->free_time);
pj_list_erase(key);
pj_list_push_back(&key->ioqueue->closing_list, key);
pj_lock_release(key->ioqueue->lock);
}
}
#endif
/*
 * Poll the I/O Completion Port, execute the callback, and return the key
 * and bytes transferred of the last operation.
 */
static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
pj_ssize_t *p_bytes, pj_ioqueue_key_t **p_key )
{
    DWORD dwBytesTransfered;
    ULONG_PTR dwKey;
generic_overlapped *pOv;
pj_ioqueue_key_t *key;
pj_ssize_t size_status = -1;
BOOL rcGetQueued;
/* Poll for completion status. */
rcGetQueued = GetQueuedCompletionStatus(hIocp, &dwBytesTransfered,
&dwKey, (OVERLAPPED**)&pOv,
dwTimeout);
/* The return value is:
* - nonzero if event was dequeued.
* - zero and pOv==NULL if no event was dequeued.
* - zero and pOv!=NULL if event for failed I/O was dequeued.
*/
if (pOv) {
pj_bool_t has_lock;
	/* Event was dequeued for either successful or failed I/O */
key = (pj_ioqueue_key_t*)dwKey;
size_status = dwBytesTransfered;
/* Report to caller regardless */
if (p_bytes)
*p_bytes = size_status;
if (p_key)
*p_key = key;
#if PJ_IOQUEUE_HAS_SAFE_UNREG
	/* We shouldn't call callbacks if the key is closing. */
if (key->closing)
return PJ_TRUE;
/* If concurrency is disabled, lock the key
* (and save the lock status to local var since app may change
* concurrency setting while in the callback) */
if (key->allow_concurrent == PJ_FALSE) {
pj_mutex_lock(key->mutex);
has_lock = PJ_TRUE;
} else {
has_lock = PJ_FALSE;
}
	/* Now that we hold the lock, re-check that the key is not closing */
if (key->closing) {
if (has_lock) {
pj_mutex_unlock(key->mutex);
}
return PJ_TRUE;
}
/* Increment reference counter to prevent this key from being
* deleted
*/
pj_atomic_inc(key->ref_count);
#else
PJ_UNUSED_ARG(has_lock);
#endif
/* Carry out the callback */
switch (pOv->operation) {
case PJ_IOQUEUE_OP_READ:
case PJ_IOQUEUE_OP_RECV:
case PJ_IOQUEUE_OP_RECV_FROM:
pOv->operation = 0;
if (key->cb.on_read_complete)
key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv,
size_status);
break;
case PJ_IOQUEUE_OP_WRITE:
case PJ_IOQUEUE_OP_SEND:
case PJ_IOQUEUE_OP_SEND_TO:
pOv->operation = 0;
if (key->cb.on_write_complete)
key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv,
size_status);
break;
#if PJ_HAS_TCP
case PJ_IOQUEUE_OP_ACCEPT:
/* special case for accept. */
ioqueue_on_accept_complete((ioqueue_accept_rec*)pOv);
if (key->cb.on_accept_complete) {
ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv;
pj_status_t status = PJ_SUCCESS;
pj_sock_t newsock;
newsock = accept_rec->newsock;
accept_rec->newsock = PJ_INVALID_SOCKET;
if (newsock == PJ_INVALID_SOCKET) {
int dwError = WSAGetLastError();
if (dwError == 0) dwError = OSERR_ENOTCONN;
status = PJ_RETURN_OS_ERROR(dwError);
}
key->cb.on_accept_complete(key, (pj_ioqueue_op_key_t*)pOv,
newsock, status);
}
break;
case PJ_IOQUEUE_OP_CONNECT:
#endif
case PJ_IOQUEUE_OP_NONE:
pj_assert(0);
break;
}
#if PJ_IOQUEUE_HAS_SAFE_UNREG
decrement_counter(key);
if (has_lock)
pj_mutex_unlock(key->mutex);
#endif
return PJ_TRUE;
}
    /* No event was dequeued. */
return PJ_FALSE;
}
/*
* pj_ioqueue_unregister()
*/
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
{
unsigned i;
pj_bool_t has_lock;
enum { RETRY = 10 };
PJ_ASSERT_RETURN(key, PJ_EINVAL);
#if PJ_HAS_TCP
if (key->connecting) {
unsigned pos;
pj_ioqueue_t *ioqueue;
ioqueue = key->ioqueue;
/* Erase from connecting_handles */
pj_lock_acquire(ioqueue->lock);
for (pos=0; pos < ioqueue->connecting_count; ++pos) {
if (ioqueue->connecting_keys[pos] == key) {
erase_connecting_socket(ioqueue, pos);
break;
}
}
key->connecting = 0;
pj_lock_release(ioqueue->lock);
}
#endif
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Mark key as closing before closing handle. */
key->closing = 1;
/* If concurrency is disabled, wait until the key has finished
* processing the callback
*/
if (key->allow_concurrent == PJ_FALSE) {
pj_mutex_lock(key->mutex);
has_lock = PJ_TRUE;
} else {
has_lock = PJ_FALSE;
}
#else
PJ_UNUSED_ARG(has_lock);
#endif
/* Close handle (the only way to disassociate handle from IOCP).
* We also need to close handle to make sure that no further events
* will come to the handle.
*/
    /* Update 2008/07/18 (http://trac.pjsip.org/repos/ticket/575):
     *  - It seems that CloseHandle() by itself does not actually close
     *    the socket (i.e. it will still appear in "netstat" output). Also,
     *    if we only use CloseHandle(), an "Invalid Handle" exception will
     *    be raised in WSACleanup().
     *  - MSDN documentation says that CloseHandle() must be called after
     *    the closesocket() call (see
     *    http://msdn.microsoft.com/en-us/library/ms724211(VS.85).aspx).
     *    But it turns out that this raises an "Invalid Handle" exception
     *    in debug mode.
     * So because of this, we replaced CloseHandle() with closesocket().
     * This was tested on WinXP SP2.
     */
//CloseHandle(key->hnd);
pj_sock_close((pj_sock_t)key->hnd);
/* Reset callbacks */
key->cb.on_accept_complete = NULL;
key->cb.on_connect_complete = NULL;
key->cb.on_read_complete = NULL;
key->cb.on_write_complete = NULL;
#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Even after the handle is closed, I suspect that IOCP may still try
     * to do something with the handle, causing memory corruption when
     * pool debugging is enabled.
     *
     * Forcing a context switch seems to have fixed that, but this is
     * quite an ugly solution..
     *
     * Update 2008/02/13:
     *	This should not happen if concurrency is disallowed for the key.
     *  So at least the application has a solution for this (i.e. by
     *  disallowing concurrency in the key).
     */
    //This will loop forever if unregistration is done in the callback.
    //Retrying a bounded number of times (RETRY) should give IOCP a chance
    //to signal the socket without risking that deadlock.
    //while (pj_atomic_get(key->ref_count) != 1)
    //	pj_thread_sleep(0);
for (i=0; pj_atomic_get(key->ref_count) != 1 && i<RETRY; ++i)
pj_thread_sleep(0);
/* Decrement reference counter to destroy the key. */
decrement_counter(key);
if (has_lock)
pj_mutex_unlock(key->mutex);
#endif
return PJ_SUCCESS;
}
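/* A sketch of a typical unregistration pattern (illustrative, not part of
 * this file). With concurrency disabled on the key, it is safe to call
 * pj_ioqueue_unregister() from inside the key's own callback, since
 * key->mutex is recursive and is already held by the callback thread:
 *
 *   static void on_read_complete(pj_ioqueue_key_t *key,
 *                                pj_ioqueue_op_key_t *op_key,
 *                                pj_ssize_t bytes_read)
 *   {
 *       if (bytes_read < 0) {
 *           // Error: close the socket and schedule the key for
 *           // delayed destruction via the closing list.
 *           pj_ioqueue_unregister(key);
 *           return;
 *       }
 *       // ...process the buffer, then re-post pj_ioqueue_recv()...
 *   }
 */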
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Scan the closing list, and move expired closing keys to the free list.
 * Must do this with the ioqueue mutex held.
 */
static void scan_closing_keys(pj_ioqueue_t *ioqueue)
{
if (!pj_list_empty(&ioqueue->closing_list)) {
pj_time_val now;
pj_ioqueue_key_t *key;
pj_gettimeofday(&now);
	/* Move closing keys to the free list once their closing idle time
	 * has elapsed.
	 */
key = ioqueue->closing_list.next;
while (key != &ioqueue->closing_list) {
pj_ioqueue_key_t *next = key->next;
pj_assert(key->closing != 0);
if (PJ_TIME_VAL_GTE(now, key->free_time)) {
pj_list_erase(key);
pj_list_push_back(&ioqueue->free_list, key);
}
key = next;
}
}
}
#endif
/*
* pj_ioqueue_poll()
*
* Poll for events.
*/
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
DWORD dwMsec;
#if PJ_HAS_TCP
int connect_count = 0;
#endif
int event_count = 0;
PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
    /* Calculate milliseconds timeout for GetQueuedCompletionStatus */
dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE;
/* Poll for completion status. */
event_count = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL);
#if PJ_HAS_TCP
    /* Check the connecting array only when there's no activity. */
if (event_count == 0) {
connect_count = check_connecting(ioqueue);
if (connect_count > 0)
event_count += connect_count;
}
#endif
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Check the closing keys only when there's no activity and when there are
* pending closing keys.
*/
if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) {
pj_lock_acquire(ioqueue->lock);
scan_closing_keys(ioqueue);
pj_lock_release(ioqueue->lock);
}
#endif
/* Return number of events. */
return event_count;
}
/*
* pj_ioqueue_recv()
*
* Initiate overlapped WSARecv() operation.
*/
PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
void *buffer,
pj_ssize_t *length,
pj_uint32_t flags )
{
/*
* Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and
* addrlen here. But unfortunately it generates EINVAL... :-(
* -bennylp
*/
int rc;
DWORD bytesRead;
DWORD dwFlags = 0;
union operation_key *op_key_rec;
PJ_CHECK_STACK();
PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Check key is not closing */
if (key->closing)
return PJ_ECANCELLED;
#endif
op_key_rec = (union operation_key*)op_key->internal__;
op_key_rec->overlapped.wsabuf.buf = buffer;
op_key_rec->overlapped.wsabuf.len = *length;
dwFlags = flags;
    /* Try a non-overlapped recv first to see if data is
     * immediately available.
     */
if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
&bytesRead, &dwFlags, NULL, NULL);
if (rc == 0) {
*length = bytesRead;
return PJ_SUCCESS;
} else {
DWORD dwError = WSAGetLastError();
if (dwError != WSAEWOULDBLOCK) {
*length = -1;
return PJ_RETURN_OS_ERROR(dwError);
}
}
}
dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
/*
* No immediate data available.
* Register overlapped Recv() operation.
*/
pj_bzero( &op_key_rec->overlapped.overlapped,
sizeof(op_key_rec->overlapped.overlapped));
op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
&bytesRead, &dwFlags,
&op_key_rec->overlapped.overlapped, NULL);
if (rc == SOCKET_ERROR) {
DWORD dwStatus = WSAGetLastError();
if (dwStatus!=WSA_IO_PENDING) {
*length = -1;
return PJ_STATUS_FROM_OS(dwStatus);
}
}
/* Pending operation has been scheduled. */
return PJ_EPENDING;
}
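/* Caller-side sketch for the recv functions (illustrative; "key" and
 * "op_key" are assumed to be already registered/initialized, and the
 * buffer must remain valid until the operation completes):
 *
 *   char buf[512];
 *   pj_ssize_t len = sizeof(buf);
 *   pj_status_t status;
 *
 *   status = pj_ioqueue_recv(key, &op_key, buf, &len, 0);
 *   if (status == PJ_SUCCESS) {
 *       // Data was immediately available: "len" bytes are in buf, and
 *       // on_read_complete() will NOT be called for this operation.
 *   } else if (status == PJ_EPENDING) {
 *       // The operation is pending; completion is reported through
 *       // on_read_complete() from a pj_ioqueue_poll() call.
 *   } else {
 *       // Immediate error.
 *   }
 */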
/*
* pj_ioqueue_recvfrom()
*
* Initiate overlapped RecvFrom() operation.
*/
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
void *buffer,
pj_ssize_t *length,
pj_uint32_t flags,
pj_sockaddr_t *addr,
int *addrlen)
{
int rc;
DWORD bytesRead;
DWORD dwFlags = 0;
union operation_key *op_key_rec;
PJ_CHECK_STACK();
PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL);
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Check key is not closing */
if (key->closing)
return PJ_ECANCELLED;
#endif
op_key_rec = (union operation_key*)op_key->internal__;
op_key_rec->overlapped.wsabuf.buf = buffer;
op_key_rec->overlapped.wsabuf.len = *length;
dwFlags = flags;
    /* Try a non-overlapped recv first to see if data is
     * immediately available.
     */
if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
&bytesRead, &dwFlags, addr, addrlen, NULL, NULL);
if (rc == 0) {
*length = bytesRead;
return PJ_SUCCESS;
} else {
DWORD dwError = WSAGetLastError();
if (dwError != WSAEWOULDBLOCK) {
*length = -1;
return PJ_RETURN_OS_ERROR(dwError);
}
}
}
dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
    /*
     * No immediate data available.
     * Register overlapped RecvFrom() operation.
     */
pj_bzero( &op_key_rec->overlapped.overlapped,
sizeof(op_key_rec->overlapped.overlapped));
op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
&bytesRead, &dwFlags, addr, addrlen,
&op_key_rec->overlapped.overlapped, NULL);
if (rc == SOCKET_ERROR) {
DWORD dwStatus = WSAGetLastError();
if (dwStatus!=WSA_IO_PENDING) {
*length = -1;
return PJ_STATUS_FROM_OS(dwStatus);
}
}
/* Pending operation has been scheduled. */
return PJ_EPENDING;
}
/*
* pj_ioqueue_send()
*
* Initiate overlapped Send operation.
*/
PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
const void *data,
pj_ssize_t *length,
pj_uint32_t flags )
{
return pj_ioqueue_sendto(key, op_key, data, length, flags, NULL, 0);
}
/*
* pj_ioqueue_sendto()
*
* Initiate overlapped SendTo operation.
*/
PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
const void *data,
pj_ssize_t *length,
pj_uint32_t flags,
const pj_sockaddr_t *addr,
int addrlen)
{
int rc;
DWORD bytesWritten;
DWORD dwFlags;
union operation_key *op_key_rec;
PJ_CHECK_STACK();
PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL);
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Check key is not closing */
if (key->closing)
return PJ_ECANCELLED;
#endif
op_key_rec = (union operation_key*)op_key->internal__;
    /*
     * First try an immediate, non-blocking send.
     */
op_key_rec->overlapped.wsabuf.buf = (void*)data;
op_key_rec->overlapped.wsabuf.len = *length;
dwFlags = flags;
if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
&bytesWritten, dwFlags, addr, addrlen,
NULL, NULL);
if (rc == 0) {
*length = bytesWritten;
return PJ_SUCCESS;
} else {
DWORD dwStatus = WSAGetLastError();
if (dwStatus != WSAEWOULDBLOCK) {
*length = -1;
return PJ_RETURN_OS_ERROR(dwStatus);
}
}
}
dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
    /*
     * Data can't be sent immediately.
     * Schedule asynchronous WSASendTo().
     */
pj_bzero( &op_key_rec->overlapped.overlapped,
sizeof(op_key_rec->overlapped.overlapped));
op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND;
rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
&bytesWritten, dwFlags, addr, addrlen,
&op_key_rec->overlapped.overlapped, NULL);
if (rc == SOCKET_ERROR) {
DWORD dwStatus = WSAGetLastError();
if (dwStatus!=WSA_IO_PENDING)
return PJ_STATUS_FROM_OS(dwStatus);
}
/* Asynchronous operation successfully submitted. */
return PJ_EPENDING;
}
#if PJ_HAS_TCP
/*
* pj_ioqueue_accept()
*
* Initiate overlapped accept() operation.
*/
PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
pj_sock_t *new_sock,
pj_sockaddr_t *local,
pj_sockaddr_t *remote,
int *addrlen)
{
BOOL rc;
DWORD bytesReceived;
pj_status_t status;
union operation_key *op_key_rec;
SOCKET sock;
PJ_CHECK_STACK();
PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Check key is not closing */
if (key->closing)
return PJ_ECANCELLED;
#endif
/*
* See if there is a new connection immediately available.
*/
sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0);
if (sock != INVALID_SOCKET) {
/* Yes! New socket is available! */
if (local && addrlen) {
int status;
status = getsockname(sock, local, addrlen);
if (status != 0) {
DWORD dwError = WSAGetLastError();
closesocket(sock);
return PJ_RETURN_OS_ERROR(dwError);
}
}
*new_sock = sock;
return PJ_SUCCESS;
} else {
DWORD dwError = WSAGetLastError();
if (dwError != WSAEWOULDBLOCK) {
return PJ_RETURN_OS_ERROR(dwError);
}
}
/*
* No connection is immediately available.
* Must schedule an asynchronous operation.
*/
op_key_rec = (union operation_key*)op_key->internal__;
status = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0,
&op_key_rec->accept.newsock);
if (status != PJ_SUCCESS)
return status;
/* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket
* addresses can be obtained with getsockname() and getpeername().
*/
status = setsockopt(op_key_rec->accept.newsock, SOL_SOCKET,
SO_UPDATE_ACCEPT_CONTEXT,
(char*)&key->hnd, sizeof(SOCKET));
/* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later.
* So ignore the error status.
*/
op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT;
op_key_rec->accept.addrlen = addrlen;
op_key_rec->accept.local = local;
op_key_rec->accept.remote = remote;
op_key_rec->accept.newsock_ptr = new_sock;
pj_bzero( &op_key_rec->accept.overlapped,
sizeof(op_key_rec->accept.overlapped));
rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)op_key_rec->accept.newsock,
op_key_rec->accept.accept_buf,
0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN,
&bytesReceived,
&op_key_rec->accept.overlapped );
if (rc == TRUE) {
ioqueue_on_accept_complete(&op_key_rec->accept);
return PJ_SUCCESS;
} else {
DWORD dwStatus = WSAGetLastError();
if (dwStatus!=WSA_IO_PENDING)
return PJ_STATUS_FROM_OS(dwStatus);
}
/* Asynchronous Accept() has been submitted. */
return PJ_EPENDING;
}
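/* Server-side sketch for the accept function above (illustrative; the
 * helper handle_new_connection() and the storage for "newsock" and the
 * op_key are hypothetical). Note that on PJ_SUCCESS the new socket is
 * returned immediately and on_accept_complete() is not called, so both
 * paths usually funnel into the same handler:
 *
 *   static void on_accept_complete(pj_ioqueue_key_t *key,
 *                                  pj_ioqueue_op_key_t *op_key,
 *                                  pj_sock_t sock,
 *                                  pj_status_t status)
 *   {
 *       if (status == PJ_SUCCESS)
 *           handle_new_connection(sock);
 *
 *       // Re-arm the asynchronous accept for the next connection.
 *       pj_ioqueue_accept(key, op_key, &newsock, NULL, NULL, NULL);
 *   }
 */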
/*
* pj_ioqueue_connect()
*
* Initiate overlapped connect() operation (well, it's non-blocking actually,
* since there's no overlapped version of connect()).
*/
PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
const pj_sockaddr_t *addr,
int addrlen )
{
HANDLE hEvent;
pj_ioqueue_t *ioqueue;
PJ_CHECK_STACK();
PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Check key is not closing */
if (key->closing)
return PJ_ECANCELLED;
#endif
/* Initiate connect() */
if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) {
DWORD dwStatus;
dwStatus = WSAGetLastError();
if (dwStatus != WSAEWOULDBLOCK) {
return PJ_RETURN_OS_ERROR(dwStatus);
}
} else {
/* Connect has completed immediately! */
return PJ_SUCCESS;
}
ioqueue = key->ioqueue;
    /* Add to the array of connecting sockets to be polled */
pj_lock_acquire(ioqueue->lock);
if (ioqueue->connecting_count >= MAXIMUM_WAIT_OBJECTS) {
pj_lock_release(ioqueue->lock);
return PJ_ETOOMANYCONN;
}
/* Get or create event object. */
if (ioqueue->event_count) {
hEvent = ioqueue->event_pool[ioqueue->event_count - 1];
--ioqueue->event_count;
} else {
hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
if (hEvent == NULL) {
DWORD dwStatus = GetLastError();
pj_lock_release(ioqueue->lock);
return PJ_STATUS_FROM_OS(dwStatus);
}
}
/* Mark key as connecting.
* We can't use array index since key can be removed dynamically.
*/
key->connecting = 1;
    /* Associate socket events with the event object. */
    if (WSAEventSelect((pj_sock_t)key->hnd, hEvent, FD_CONNECT) != 0) {
	key->connecting = 0;
	CloseHandle(hEvent);
	pj_lock_release(ioqueue->lock);
	return PJ_RETURN_OS_ERROR(WSAGetLastError());
    }
/* Add to array. */
ioqueue->connecting_keys[ ioqueue->connecting_count ] = key;
ioqueue->connecting_handles[ ioqueue->connecting_count ] = hEvent;
ioqueue->connecting_count++;
pj_lock_release(ioqueue->lock);
return PJ_EPENDING;
}
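/* Client-side sketch for the connect function above (illustrative).
 * PJ_SUCCESS means connect() completed synchronously and
 * on_connect_complete() will not fire; PJ_EPENDING means the result is
 * reported later, through check_connecting() during polling:
 *
 *   status = pj_ioqueue_connect(key, &addr, sizeof(addr));
 *   if (status == PJ_SUCCESS) {
 *       // Connected immediately.
 *   } else if (status == PJ_EPENDING) {
 *       // Wait for on_connect_complete(key, status).
 *   } else {
 *       // Immediate failure.
 *   }
 */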
#endif /* #if PJ_HAS_TCP */
PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
pj_size_t size )
{
pj_bzero(op_key, size);
}
PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key )
{
BOOL rc;
DWORD bytesTransfered;
rc = GetOverlappedResult( key->hnd, (LPOVERLAPPED)op_key,
&bytesTransfered, FALSE );
if (rc == FALSE) {
return GetLastError()==ERROR_IO_INCOMPLETE;
}
return FALSE;
}
PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
pj_ssize_t bytes_status )
{
BOOL rc;
    rc = PostQueuedCompletionStatus(key->ioqueue->iocp, (DWORD)bytes_status,
				    (ULONG_PTR)key, (OVERLAPPED*)op_key );
if (rc == FALSE) {
return PJ_RETURN_OS_ERROR(GetLastError());
}
return PJ_SUCCESS;
}
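/* Sketch (illustrative): pj_ioqueue_post_completion() can be used to
 * inject a completion and wake a thread blocked in pj_ioqueue_poll(),
 * e.g. to abort a pending operation. The callback that fires depends on
 * the operation type recorded in the op_key, and it receives
 * "bytes_status" as its byte count, so a caller-chosen sentinel such as
 * -2 below lets the callback distinguish injected completions:
 *
 *   pj_ioqueue_post_completion(key, &pending_op_key, -2);
 */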
PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
pj_bool_t allow)
{
PJ_ASSERT_RETURN(key, PJ_EINVAL);
/* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
* disabled.
*/
PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);
key->allow_concurrent = allow;
return PJ_SUCCESS;
}
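/* Sketch (illustrative): serialize all callbacks of a key by disabling
 * concurrency right after registration; this also makes the
 * pj_ioqueue_lock_key()/pj_ioqueue_unlock_key() pair below usable:
 *
 *   status = pj_ioqueue_register_sock(pool, ioq, sock, user_data,
 *                                     &cb, &key);
 *   if (status == PJ_SUCCESS)
 *       status = pj_ioqueue_set_concurrency(key, PJ_FALSE);
 */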
PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
{
#if PJ_IOQUEUE_HAS_SAFE_UNREG
return pj_mutex_lock(key->mutex);
#else
PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP);
#endif
}
PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
{
#if PJ_IOQUEUE_HAS_SAFE_UNREG
return pj_mutex_unlock(key->mutex);
#else
PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP);
#endif
}