| /* $Id$ |
| */ |
| #include <pj/ioqueue.h> |
| #include <pj/os.h> |
| #include <pj/lock.h> |
| #include <pj/pool.h> |
| #include <pj/string.h> |
| #include <pj/sock.h> |
| #include <pj/array.h> |
| #include <pj/log.h> |
| #include <pj/assert.h> |
| #include <pj/errno.h> |
| |
| |
| #if defined(PJ_HAS_WINSOCK2_H) && PJ_HAS_WINSOCK2_H != 0 |
| # include <winsock2.h> |
| #elif defined(PJ_HAS_WINSOCK_H) && PJ_HAS_WINSOCK_H != 0 |
| # include <winsock.h> |
| #endif |
| |
| #if defined(PJ_HAS_MSWSOCK_H) && PJ_HAS_MSWSOCK_H != 0 |
| # include <mswsock.h> |
| #endif |
| |
| |
| /* The address specified in AcceptEx() must be 16 more than the size of |
| * SOCKADDR (source: MSDN). |
| */ |
| #define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in)+16) |
| |
| typedef struct generic_overlapped |
| { |
| WSAOVERLAPPED overlapped; |
| pj_ioqueue_operation_e operation; |
| } generic_overlapped; |
| |
| /* |
| * OVERLAPPPED structure for send and receive. |
| */ |
| typedef struct ioqueue_overlapped |
| { |
| WSAOVERLAPPED overlapped; |
| pj_ioqueue_operation_e operation; |
| WSABUF wsabuf; |
| pj_sockaddr_in dummy_addr; |
| int dummy_addrlen; |
| } ioqueue_overlapped; |
| |
| #if PJ_HAS_TCP |
| /* |
| * OVERLAP structure for accept. |
| */ |
| typedef struct ioqueue_accept_rec |
| { |
| WSAOVERLAPPED overlapped; |
| pj_ioqueue_operation_e operation; |
| pj_sock_t newsock; |
| pj_sock_t *newsock_ptr; |
| int *addrlen; |
| void *remote; |
| void *local; |
| char accept_buf[2 * ACCEPT_ADDR_LEN]; |
| } ioqueue_accept_rec; |
| #endif |
| |
| /* |
| * Structure to hold pending operation key. |
| */ |
| union operation_key |
| { |
| generic_overlapped generic; |
| ioqueue_overlapped overlapped; |
| #if PJ_HAS_TCP |
| ioqueue_accept_rec accept; |
| #endif |
| }; |
| |
| /* Type of handle in the key. */ |
| enum handle_type |
| { |
| HND_IS_UNKNOWN, |
| HND_IS_FILE, |
| HND_IS_SOCKET, |
| }; |
| |
| /* |
| * Structure for individual socket. |
| */ |
| struct pj_ioqueue_key_t |
| { |
| pj_ioqueue_t *ioqueue; |
| HANDLE hnd; |
| void *user_data; |
| enum handle_type hnd_type; |
| #if PJ_HAS_TCP |
| int connecting; |
| #endif |
| pj_ioqueue_callback cb; |
| }; |
| |
| /* |
| * IO Queue structure. |
| */ |
| struct pj_ioqueue_t |
| { |
| HANDLE iocp; |
| pj_lock_t *lock; |
| pj_bool_t auto_delete_lock; |
| unsigned event_count; |
| HANDLE event_pool[MAXIMUM_WAIT_OBJECTS+1]; |
| #if PJ_HAS_TCP |
| unsigned connecting_count; |
| HANDLE connecting_handles[MAXIMUM_WAIT_OBJECTS+1]; |
| pj_ioqueue_key_t *connecting_keys[MAXIMUM_WAIT_OBJECTS+1]; |
| #endif |
| }; |
| |
| |
| #if PJ_HAS_TCP |
| /* |
| * Process the socket when the overlapped accept() completed. |
| */ |
| static void ioqueue_on_accept_complete(ioqueue_accept_rec *accept_overlapped) |
| { |
| struct sockaddr *local; |
| struct sockaddr *remote; |
| int locallen, remotelen; |
| |
| PJ_CHECK_STACK(); |
| |
| /* Operation complete immediately. */ |
| GetAcceptExSockaddrs( accept_overlapped->accept_buf, |
| 0, |
| ACCEPT_ADDR_LEN, |
| ACCEPT_ADDR_LEN, |
| &local, |
| &locallen, |
| &remote, |
| &remotelen); |
| if (*accept_overlapped->addrlen > locallen) { |
| pj_memcpy(accept_overlapped->local, local, locallen); |
| pj_memcpy(accept_overlapped->remote, remote, locallen); |
| } else { |
| pj_memset(accept_overlapped->local, 0, *accept_overlapped->addrlen); |
| pj_memset(accept_overlapped->remote, 0, *accept_overlapped->addrlen); |
| } |
| *accept_overlapped->addrlen = locallen; |
| if (accept_overlapped->newsock_ptr) |
| *accept_overlapped->newsock_ptr = accept_overlapped->newsock; |
| accept_overlapped->operation = 0; |
| accept_overlapped->newsock = PJ_INVALID_SOCKET; |
| } |
| |
| static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos) |
| { |
| pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos]; |
| HANDLE hEvent = ioqueue->connecting_handles[pos]; |
| |
| /* Remove key from array of connecting handles. */ |
| pj_array_erase(ioqueue->connecting_keys, sizeof(key), |
| ioqueue->connecting_count, pos); |
| pj_array_erase(ioqueue->connecting_handles, sizeof(HANDLE), |
| ioqueue->connecting_count, pos); |
| --ioqueue->connecting_count; |
| |
| /* Disassociate the socket from the event. */ |
| WSAEventSelect((pj_sock_t)key->hnd, hEvent, 0); |
| |
| /* Put event object to pool. */ |
| if (ioqueue->event_count < MAXIMUM_WAIT_OBJECTS) { |
| ioqueue->event_pool[ioqueue->event_count++] = hEvent; |
| } else { |
| /* Shouldn't happen. There should be no more pending connections |
| * than max. |
| */ |
| pj_assert(0); |
| CloseHandle(hEvent); |
| } |
| |
| } |
| |
| /* |
| * Poll for the completion of non-blocking connect(). |
| * If there's a completion, the function return the key of the completed |
| * socket, and 'result' argument contains the connect() result. If connect() |
| * succeeded, 'result' will have value zero, otherwise will have the error |
| * code. |
| */ |
| static pj_ioqueue_key_t *check_connecting( pj_ioqueue_t *ioqueue, |
| pj_ssize_t *connect_err ) |
| { |
| pj_ioqueue_key_t *key = NULL; |
| |
| if (ioqueue->connecting_count) { |
| DWORD result; |
| |
| pj_lock_acquire(ioqueue->lock); |
| result = WaitForMultipleObjects(ioqueue->connecting_count, |
| ioqueue->connecting_handles, |
| FALSE, 0); |
| if (result >= WAIT_OBJECT_0 && |
| result < WAIT_OBJECT_0+ioqueue->connecting_count) |
| { |
| WSANETWORKEVENTS net_events; |
| |
| /* Got completed connect(). */ |
| unsigned pos = result - WAIT_OBJECT_0; |
| key = ioqueue->connecting_keys[pos]; |
| |
| /* See whether connect has succeeded. */ |
| WSAEnumNetworkEvents((pj_sock_t)key->hnd, |
| ioqueue->connecting_handles[pos], |
| &net_events); |
| *connect_err = |
| PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]); |
| |
| /* Erase socket from pending connect. */ |
| erase_connecting_socket(ioqueue, pos); |
| } |
| pj_lock_release(ioqueue->lock); |
| } |
| return key; |
| } |
| #endif |
| |
| /* |
| * pj_ioqueue_name() |
| */ |
| PJ_DEF(const char*) pj_ioqueue_name(void) |
| { |
| return "iocp"; |
| } |
| |
| /* |
| * pj_ioqueue_create() |
| */ |
| PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, |
| pj_size_t max_fd, |
| pj_ioqueue_t **p_ioqueue) |
| { |
| pj_ioqueue_t *ioqueue; |
| pj_status_t rc; |
| |
| PJ_UNUSED_ARG(max_fd); |
| PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL); |
| |
| rc = sizeof(union operation_key); |
| |
| /* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */ |
| PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= |
| sizeof(union operation_key), PJ_EBUG); |
| |
| ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue)); |
| ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); |
| if (ioqueue->iocp == NULL) |
| return PJ_RETURN_OS_ERROR(GetLastError()); |
| |
| rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock); |
| if (rc != PJ_SUCCESS) { |
| CloseHandle(ioqueue->iocp); |
| return rc; |
| } |
| |
| ioqueue->auto_delete_lock = PJ_TRUE; |
| |
| *p_ioqueue = ioqueue; |
| |
| PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue)); |
| return PJ_SUCCESS; |
| } |
| |
| /* |
| * pj_ioqueue_destroy() |
| */ |
| PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue ) |
| { |
| unsigned i; |
| |
| PJ_CHECK_STACK(); |
| PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); |
| |
| /* Destroy events in the pool */ |
| for (i=0; i<ioqueue->event_count; ++i) { |
| CloseHandle(ioqueue->event_pool[i]); |
| } |
| ioqueue->event_count = 0; |
| |
| if (CloseHandle(ioqueue->iocp) != TRUE) |
| return PJ_RETURN_OS_ERROR(GetLastError()); |
| |
| if (ioqueue->auto_delete_lock) |
| pj_lock_destroy(ioqueue->lock); |
| |
| return PJ_SUCCESS; |
| } |
| |
| /* |
| * pj_ioqueue_set_lock() |
| */ |
| PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue, |
| pj_lock_t *lock, |
| pj_bool_t auto_delete ) |
| { |
| PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL); |
| |
| if (ioqueue->auto_delete_lock) { |
| pj_lock_destroy(ioqueue->lock); |
| } |
| |
| ioqueue->lock = lock; |
| ioqueue->auto_delete_lock = auto_delete; |
| |
| return PJ_SUCCESS; |
| } |
| |
| /* |
| * pj_ioqueue_register_sock() |
| */ |
| PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, |
| pj_ioqueue_t *ioqueue, |
| pj_sock_t sock, |
| void *user_data, |
| const pj_ioqueue_callback *cb, |
| pj_ioqueue_key_t **key ) |
| { |
| HANDLE hioq; |
| pj_ioqueue_key_t *rec; |
| u_long value; |
| int rc; |
| |
| PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL); |
| |
| /* Build the key for this socket. */ |
| rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); |
| rec->ioqueue = ioqueue; |
| rec->hnd = (HANDLE)sock; |
| rec->hnd_type = HND_IS_SOCKET; |
| rec->user_data = user_data; |
| pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback)); |
| |
| /* Set socket to nonblocking. */ |
| value = 1; |
| rc = ioctlsocket(sock, FIONBIO, &value); |
| if (rc != 0) { |
| return PJ_RETURN_OS_ERROR(WSAGetLastError()); |
| } |
| |
| /* Associate with IOCP */ |
| hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0); |
| if (!hioq) { |
| return PJ_RETURN_OS_ERROR(GetLastError()); |
| } |
| |
| *key = rec; |
| return PJ_SUCCESS; |
| } |
| |
| /* |
| * pj_ioqueue_unregister() |
| */ |
| PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) |
| { |
| PJ_ASSERT_RETURN(key, PJ_EINVAL); |
| |
| #if PJ_HAS_TCP |
| if (key->connecting) { |
| unsigned pos; |
| pj_ioqueue_t *ioqueue; |
| |
| ioqueue = key->ioqueue; |
| |
| /* Erase from connecting_handles */ |
| pj_lock_acquire(ioqueue->lock); |
| for (pos=0; pos < ioqueue->connecting_count; ++pos) { |
| if (ioqueue->connecting_keys[pos] == key) { |
| erase_connecting_socket(ioqueue, pos); |
| break; |
| } |
| } |
| key->connecting = 0; |
| pj_lock_release(ioqueue->lock); |
| } |
| #endif |
| if (key->hnd_type == HND_IS_FILE) { |
| CloseHandle(key->hnd); |
| } |
| return PJ_SUCCESS; |
| } |
| |
| /* |
| * pj_ioqueue_get_user_data() |
| */ |
| PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) |
| { |
| PJ_ASSERT_RETURN(key, NULL); |
| return key->user_data; |
| } |
| |
| /* |
| * pj_ioqueue_set_user_data() |
| */ |
| PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, |
| void *user_data, |
| void **old_data ) |
| { |
| PJ_ASSERT_RETURN(key, PJ_EINVAL); |
| |
| if (old_data) |
| *old_data = key->user_data; |
| |
| key->user_data = user_data; |
| return PJ_SUCCESS; |
| } |
| |
| /* |
| * pj_ioqueue_poll() |
| * |
| * Poll for events. |
| */ |
| PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) |
| { |
| DWORD dwMsec, dwBytesTransfered, dwKey; |
| generic_overlapped *pOv; |
| pj_ioqueue_key_t *key; |
| pj_ssize_t size_status; |
| BOOL rc; |
| |
| PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL); |
| |
| /* Check the connecting array. */ |
| #if PJ_HAS_TCP |
| key = check_connecting(ioqueue, &size_status); |
| if (key != NULL) { |
| key->cb.on_connect_complete(key, (int)size_status); |
| return 1; |
| } |
| #endif |
| |
| /* Calculate miliseconds timeout for GetQueuedCompletionStatus */ |
| dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE; |
| |
| /* Poll for completion status. */ |
| rc = GetQueuedCompletionStatus(ioqueue->iocp, &dwBytesTransfered, &dwKey, |
| (OVERLAPPED**)&pOv, dwMsec); |
| |
| /* The return value is: |
| * - nonzero if event was dequeued. |
| * - zero and pOv==NULL if no event was dequeued. |
| * - zero and pOv!=NULL if event for failed I/O was dequeued. |
| */ |
| if (pOv) { |
| /* Event was dequeued for either successfull or failed I/O */ |
| key = (pj_ioqueue_key_t*)dwKey; |
| size_status = dwBytesTransfered; |
| switch (pOv->operation) { |
| case PJ_IOQUEUE_OP_READ: |
| case PJ_IOQUEUE_OP_RECV: |
| case PJ_IOQUEUE_OP_RECV_FROM: |
| pOv->operation = 0; |
| if (key->cb.on_read_complete) |
| key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv, |
| size_status); |
| break; |
| case PJ_IOQUEUE_OP_WRITE: |
| case PJ_IOQUEUE_OP_SEND: |
| case PJ_IOQUEUE_OP_SEND_TO: |
| pOv->operation = 0; |
| if (key->cb.on_write_complete) |
| key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv, |
| size_status); |
| break; |
| #if PJ_HAS_TCP |
| case PJ_IOQUEUE_OP_ACCEPT: |
| /* special case for accept. */ |
| ioqueue_on_accept_complete((ioqueue_accept_rec*)pOv); |
| if (key->cb.on_accept_complete) { |
| ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv; |
| key->cb.on_accept_complete(key, |
| (pj_ioqueue_op_key_t*)pOv, |
| accept_rec->newsock, |
| PJ_SUCCESS); |
| } |
| break; |
| case PJ_IOQUEUE_OP_CONNECT: |
| #endif |
| case PJ_IOQUEUE_OP_NONE: |
| pj_assert(0); |
| break; |
| } |
| return 1; |
| } |
| |
| if (GetLastError()==WAIT_TIMEOUT) { |
| /* Check the connecting array (again). */ |
| #if PJ_HAS_TCP |
| key = check_connecting(ioqueue, &size_status); |
| if (key != NULL) { |
| key->cb.on_connect_complete(key, (int)size_status); |
| return 1; |
| } |
| #endif |
| return 0; |
| } |
| return -1; |
| } |
| |
| /* |
| * pj_ioqueue_recv() |
| * |
| * Initiate overlapped WSARecv() operation. |
| */ |
| PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, |
| pj_ioqueue_op_key_t *op_key, |
| void *buffer, |
| pj_ssize_t *length, |
| unsigned flags ) |
| { |
| /* |
| * Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and |
| * addrlen here. But unfortunately it generates EINVAL... :-( |
| * -bennylp |
| */ |
| int rc; |
| DWORD bytesRead; |
| DWORD dwFlags = 0; |
| union operation_key *op_key_rec; |
| |
| PJ_CHECK_STACK(); |
| PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); |
| |
| op_key_rec = (union operation_key*)op_key->internal__; |
| op_key_rec->overlapped.wsabuf.buf = buffer; |
| op_key_rec->overlapped.wsabuf.len = *length; |
| |
| dwFlags = flags; |
| |
| /* Try non-overlapped received first to see if data is |
| * immediately available. |
| */ |
| rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, |
| &bytesRead, &dwFlags, NULL, NULL); |
| if (rc == 0) { |
| *length = bytesRead; |
| return PJ_SUCCESS; |
| } else { |
| DWORD dwError = WSAGetLastError(); |
| if (dwError != WSAEWOULDBLOCK) { |
| *length = -1; |
| return PJ_RETURN_OS_ERROR(dwError); |
| } |
| } |
| |
| /* |
| * No immediate data available. |
| * Register overlapped Recv() operation. |
| */ |
| pj_memset(&op_key_rec->overlapped.overlapped, 0, |
| sizeof(op_key_rec->overlapped.overlapped)); |
| op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV; |
| |
| rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, |
| &bytesRead, &dwFlags, |
| &op_key_rec->overlapped.overlapped, NULL); |
| if (rc == SOCKET_ERROR) { |
| DWORD dwStatus = WSAGetLastError(); |
| if (dwStatus!=WSA_IO_PENDING) { |
| *length = -1; |
| return PJ_STATUS_FROM_OS(dwStatus); |
| } |
| } |
| |
| /* Pending operation has been scheduled. */ |
| return PJ_EPENDING; |
| } |
| |
| /* |
| * pj_ioqueue_recvfrom() |
| * |
| * Initiate overlapped RecvFrom() operation. |
| */ |
| PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, |
| pj_ioqueue_op_key_t *op_key, |
| void *buffer, |
| pj_ssize_t *length, |
| unsigned flags, |
| pj_sockaddr_t *addr, |
| int *addrlen) |
| { |
| int rc; |
| DWORD bytesRead; |
| DWORD dwFlags = 0; |
| union operation_key *op_key_rec; |
| |
| PJ_CHECK_STACK(); |
| PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL); |
| |
| op_key_rec = (union operation_key*)op_key->internal__; |
| op_key_rec->overlapped.wsabuf.buf = buffer; |
| op_key_rec->overlapped.wsabuf.len = *length; |
| |
| dwFlags = flags; |
| |
| /* Try non-overlapped received first to see if data is |
| * immediately available. |
| */ |
| rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, |
| &bytesRead, &dwFlags, addr, addrlen, NULL, NULL); |
| if (rc == 0) { |
| *length = bytesRead; |
| return PJ_SUCCESS; |
| } else { |
| DWORD dwError = WSAGetLastError(); |
| if (dwError != WSAEWOULDBLOCK) { |
| *length = -1; |
| return PJ_RETURN_OS_ERROR(dwError); |
| } |
| } |
| |
| /* |
| * No immediate data available. |
| * Register overlapped Recv() operation. |
| */ |
| pj_memset(&op_key_rec->overlapped.overlapped, 0, |
| sizeof(op_key_rec->overlapped.overlapped)); |
| op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV; |
| |
| rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, |
| &bytesRead, &dwFlags, addr, addrlen, |
| &op_key_rec->overlapped.overlapped, NULL); |
| if (rc == SOCKET_ERROR) { |
| DWORD dwStatus = WSAGetLastError(); |
| if (dwStatus!=WSA_IO_PENDING) { |
| *length = -1; |
| return PJ_STATUS_FROM_OS(dwStatus); |
| } |
| } |
| |
| /* Pending operation has been scheduled. */ |
| return PJ_EPENDING; |
| } |
| |
| /* |
| * pj_ioqueue_send() |
| * |
| * Initiate overlapped Send operation. |
| */ |
| PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, |
| pj_ioqueue_op_key_t *op_key, |
| const void *data, |
| pj_ssize_t *length, |
| unsigned flags ) |
| { |
| return pj_ioqueue_sendto(key, op_key, data, length, flags, NULL, 0); |
| } |
| |
| |
| /* |
| * pj_ioqueue_sendto() |
| * |
| * Initiate overlapped SendTo operation. |
| */ |
| PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, |
| pj_ioqueue_op_key_t *op_key, |
| const void *data, |
| pj_ssize_t *length, |
| unsigned flags, |
| const pj_sockaddr_t *addr, |
| int addrlen) |
| { |
| int rc; |
| DWORD bytesWritten; |
| DWORD dwFlags; |
| union operation_key *op_key_rec; |
| |
| PJ_CHECK_STACK(); |
| PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL); |
| |
| op_key_rec = (union operation_key*)op_key->internal__; |
| |
| dwFlags = flags; |
| |
| /* |
| * First try blocking write. |
| */ |
| op_key_rec->overlapped.wsabuf.buf = (void*)data; |
| op_key_rec->overlapped.wsabuf.len = *length; |
| |
| rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, |
| &bytesWritten, dwFlags, addr, addrlen, |
| NULL, NULL); |
| if (rc == 0) { |
| *length = bytesWritten; |
| return PJ_SUCCESS; |
| } else { |
| DWORD dwStatus = WSAGetLastError(); |
| if (dwStatus != WSAEWOULDBLOCK) { |
| *length = -1; |
| return PJ_RETURN_OS_ERROR(dwStatus); |
| } |
| } |
| |
| /* |
| * Data can't be sent immediately. |
| * Schedule asynchronous WSASend(). |
| */ |
| pj_memset(&op_key_rec->overlapped.overlapped, 0, |
| sizeof(op_key_rec->overlapped.overlapped)); |
| op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND; |
| |
| rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, |
| &bytesWritten, dwFlags, addr, addrlen, |
| &op_key_rec->overlapped.overlapped, NULL); |
| if (rc == SOCKET_ERROR) { |
| DWORD dwStatus = WSAGetLastError(); |
| if (dwStatus!=WSA_IO_PENDING) |
| return PJ_STATUS_FROM_OS(dwStatus); |
| } |
| |
| /* Asynchronous operation successfully submitted. */ |
| return PJ_EPENDING; |
| } |
| |
| #if PJ_HAS_TCP |
| |
| /* |
| * pj_ioqueue_accept() |
| * |
| * Initiate overlapped accept() operation. |
| */ |
| PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, |
| pj_ioqueue_op_key_t *op_key, |
| pj_sock_t *new_sock, |
| pj_sockaddr_t *local, |
| pj_sockaddr_t *remote, |
| int *addrlen) |
| { |
| BOOL rc; |
| DWORD bytesReceived; |
| pj_status_t status; |
| union operation_key *op_key_rec; |
| SOCKET sock; |
| |
| PJ_CHECK_STACK(); |
| PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); |
| |
| /* |
| * See if there is a new connection immediately available. |
| */ |
| sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0); |
| if (sock != INVALID_SOCKET) { |
| /* Yes! New socket is available! */ |
| int status; |
| |
| status = getsockname(sock, local, addrlen); |
| if (status != 0) { |
| DWORD dwError = WSAGetLastError(); |
| closesocket(sock); |
| return PJ_RETURN_OS_ERROR(dwError); |
| } |
| |
| *new_sock = sock; |
| return PJ_SUCCESS; |
| |
| } else { |
| DWORD dwError = WSAGetLastError(); |
| if (dwError != WSAEWOULDBLOCK) { |
| return PJ_RETURN_OS_ERROR(dwError); |
| } |
| } |
| |
| /* |
| * No connection is immediately available. |
| * Must schedule an asynchronous operation. |
| */ |
| op_key_rec = (union operation_key*)op_key->internal__; |
| |
| status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, |
| &op_key_rec->accept.newsock); |
| if (status != PJ_SUCCESS) |
| return status; |
| |
| /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket |
| * addresses can be obtained with getsockname() and getpeername(). |
| */ |
| status = setsockopt(op_key_rec->accept.newsock, SOL_SOCKET, |
| SO_UPDATE_ACCEPT_CONTEXT, |
| (char*)&key->hnd, sizeof(SOCKET)); |
| /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later. |
| * So ignore the error status. |
| */ |
| |
| op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT; |
| op_key_rec->accept.addrlen = addrlen; |
| op_key_rec->accept.local = local; |
| op_key_rec->accept.remote = remote; |
| op_key_rec->accept.newsock_ptr = new_sock; |
| pj_memset(&op_key_rec->accept.overlapped, 0, |
| sizeof(op_key_rec->accept.overlapped)); |
| |
| rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)op_key_rec->accept.newsock, |
| op_key_rec->accept.accept_buf, |
| 0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN, |
| &bytesReceived, |
| &op_key_rec->accept.overlapped ); |
| |
| if (rc == TRUE) { |
| ioqueue_on_accept_complete(&op_key_rec->accept); |
| return PJ_SUCCESS; |
| } else { |
| DWORD dwStatus = WSAGetLastError(); |
| if (dwStatus!=WSA_IO_PENDING) |
| return PJ_STATUS_FROM_OS(dwStatus); |
| } |
| |
| /* Asynchronous Accept() has been submitted. */ |
| return PJ_EPENDING; |
| } |
| |
| |
| /* |
| * pj_ioqueue_connect() |
| * |
| * Initiate overlapped connect() operation (well, it's non-blocking actually, |
| * since there's no overlapped version of connect()). |
| */ |
| PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, |
| const pj_sockaddr_t *addr, |
| int addrlen ) |
| { |
| HANDLE hEvent; |
| pj_ioqueue_t *ioqueue; |
| |
| PJ_CHECK_STACK(); |
| PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); |
| |
| /* Initiate connect() */ |
| if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) { |
| DWORD dwStatus; |
| dwStatus = WSAGetLastError(); |
| if (dwStatus != WSAEWOULDBLOCK) { |
| return PJ_RETURN_OS_ERROR(dwStatus); |
| } |
| } else { |
| /* Connect has completed immediately! */ |
| return PJ_SUCCESS; |
| } |
| |
| ioqueue = key->ioqueue; |
| |
| /* Add to the array of connecting socket to be polled */ |
| pj_lock_acquire(ioqueue->lock); |
| |
| if (ioqueue->connecting_count >= MAXIMUM_WAIT_OBJECTS) { |
| pj_lock_release(ioqueue->lock); |
| return PJ_ETOOMANYCONN; |
| } |
| |
| /* Get or create event object. */ |
| if (ioqueue->event_count) { |
| hEvent = ioqueue->event_pool[ioqueue->event_count - 1]; |
| --ioqueue->event_count; |
| } else { |
| hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); |
| if (hEvent == NULL) { |
| DWORD dwStatus = GetLastError(); |
| pj_lock_release(ioqueue->lock); |
| return PJ_STATUS_FROM_OS(dwStatus); |
| } |
| } |
| |
| /* Mark key as connecting. |
| * We can't use array index since key can be removed dynamically. |
| */ |
| key->connecting = 1; |
| |
| /* Associate socket events to the event object. */ |
| if (WSAEventSelect((pj_sock_t)key->hnd, hEvent, FD_CONNECT) != 0) { |
| CloseHandle(hEvent); |
| pj_lock_release(ioqueue->lock); |
| return PJ_RETURN_OS_ERROR(WSAGetLastError()); |
| } |
| |
| /* Add to array. */ |
| ioqueue->connecting_keys[ ioqueue->connecting_count ] = key; |
| ioqueue->connecting_handles[ ioqueue->connecting_count ] = hEvent; |
| ioqueue->connecting_count++; |
| |
| pj_lock_release(ioqueue->lock); |
| |
| return PJ_EPENDING; |
| } |
| #endif /* #if PJ_HAS_TCP */ |
| |
|
|
|
|
| PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
|
| pj_ioqueue_op_key_t *op_key )
|
| {
|
| BOOL rc;
|
| DWORD bytesTransfered;
|
|
|
| rc = GetOverlappedResult( key->hnd, (LPOVERLAPPED)op_key,
|
| &bytesTransfered, FALSE );
|
|
|
| if (rc == FALSE) {
|
| return GetLastError()==ERROR_IO_INCOMPLETE;
|
| }
|
|
|
| return FALSE;
|
| }
|
|
|
|
|
| PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
|
| pj_ioqueue_op_key_t *op_key,
|
| pj_ssize_t bytes_status )
|
| {
|
| BOOL rc;
|
|
|
| rc = PostQueuedCompletionStatus(key->ioqueue->iocp, bytes_status,
|
| (long)key, (OVERLAPPED*)op_key );
|
| if (rc == FALSE) {
|
| return PJ_RETURN_OS_ERROR(GetLastError());
|
| }
|
|
|
| return PJ_SUCCESS;
|
| }
|
|
|