blob: 93cbb6d557713add46e0be5c48251017f6488503 [file] [log] [blame]
Benny Prijono4766ffe2005-11-01 17:56:59 +00001/* $Id$
Benny Prijonodd859a62005-11-01 16:42:51 +00002 */
3#include <pj/ioqueue.h>
4#include <pj/os.h>
5#include <pj/lock.h>
6#include <pj/pool.h>
7#include <pj/string.h>
8#include <pj/sock.h>
9#include <pj/array.h>
10#include <pj/log.h>
11#include <pj/assert.h>
12#include <pj/errno.h>
13
14
15#if defined(PJ_HAS_WINSOCK2_H) && PJ_HAS_WINSOCK2_H != 0
16# include <winsock2.h>
17#elif defined(PJ_HAS_WINSOCK_H) && PJ_HAS_WINSOCK_H != 0
18# include <winsock.h>
19#endif
20
21#if defined(PJ_HAS_MSWSOCK_H) && PJ_HAS_MSWSOCK_H != 0
22# include <mswsock.h>
23#endif
24
Benny Prijono40ce3fb2005-11-07 18:14:08 +000025
/* Each address buffer length passed to AcceptEx() must be at least 16 bytes
 * more than the maximum socket address size (source: MSDN).
 */
29#define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in)+16)
Benny Prijono40ce3fb2005-11-07 18:14:08 +000030
31typedef struct generic_overlapped
32{
33 WSAOVERLAPPED overlapped;
34 pj_ioqueue_operation_e operation;
35} generic_overlapped;
36
Benny Prijonodd859a62005-11-01 16:42:51 +000037/*
Benny Prijonoa7f64a32005-11-07 15:47:28 +000038 * OVERLAPPPED structure for send and receive.
Benny Prijonodd859a62005-11-01 16:42:51 +000039 */
40typedef struct ioqueue_overlapped
41{
Benny Prijono40ce3fb2005-11-07 18:14:08 +000042 WSAOVERLAPPED overlapped;
Benny Prijonodd859a62005-11-01 16:42:51 +000043 pj_ioqueue_operation_e operation;
Benny Prijono40ce3fb2005-11-07 18:14:08 +000044 WSABUF wsabuf;
45 pj_sockaddr_in dummy_addr;
46 int dummy_addrlen;
Benny Prijonodd859a62005-11-01 16:42:51 +000047} ioqueue_overlapped;
48
49#if PJ_HAS_TCP
50/*
51 * OVERLAP structure for accept.
52 */
53typedef struct ioqueue_accept_rec
54{
Benny Prijono40ce3fb2005-11-07 18:14:08 +000055 WSAOVERLAPPED overlapped;
Benny Prijonodd859a62005-11-01 16:42:51 +000056 pj_ioqueue_operation_e operation;
57 pj_sock_t newsock;
58 pj_sock_t *newsock_ptr;
59 int *addrlen;
60 void *remote;
61 void *local;
62 char accept_buf[2 * ACCEPT_ADDR_LEN];
63} ioqueue_accept_rec;
64#endif
Benny Prijono40ce3fb2005-11-07 18:14:08 +000065
66/*
67 * Structure to hold pending operation key.
68 */
69union operation_key
70{
71 generic_overlapped generic;
72 ioqueue_overlapped overlapped;
73#if PJ_HAS_TCP
74 ioqueue_accept_rec accept;
75#endif
76};
77
/*
 * Kind of OS handle stored in a key.  pj_ioqueue_register_sock() tags its
 * keys HND_IS_SOCKET; pj_ioqueue_unregister() closes the handle itself only
 * for HND_IS_FILE keys.
 */
enum handle_type
{
    HND_IS_UNKNOWN,
    HND_IS_FILE,
    HND_IS_SOCKET
};
Benny Prijonodd859a62005-11-01 16:42:51 +000085
86/*
87 * Structure for individual socket.
88 */
89struct pj_ioqueue_key_t
Benny Prijono40ce3fb2005-11-07 18:14:08 +000090{
Benny Prijonoa9946d52005-11-06 09:37:47 +000091 pj_ioqueue_t *ioqueue;
Benny Prijonodd859a62005-11-01 16:42:51 +000092 HANDLE hnd;
Benny Prijono40ce3fb2005-11-07 18:14:08 +000093 void *user_data;
Benny Prijonoa7f64a32005-11-07 15:47:28 +000094 enum handle_type hnd_type;
Benny Prijonodd859a62005-11-01 16:42:51 +000095#if PJ_HAS_TCP
96 int connecting;
Benny Prijonodd859a62005-11-01 16:42:51 +000097#endif
98 pj_ioqueue_callback cb;
99};
100
101/*
102 * IO Queue structure.
103 */
104struct pj_ioqueue_t
105{
106 HANDLE iocp;
107 pj_lock_t *lock;
108 pj_bool_t auto_delete_lock;
109 unsigned event_count;
110 HANDLE event_pool[MAXIMUM_WAIT_OBJECTS+1];
111#if PJ_HAS_TCP
112 unsigned connecting_count;
113 HANDLE connecting_handles[MAXIMUM_WAIT_OBJECTS+1];
114 pj_ioqueue_key_t *connecting_keys[MAXIMUM_WAIT_OBJECTS+1];
115#endif
116};
117
118
119#if PJ_HAS_TCP
120/*
121 * Process the socket when the overlapped accept() completed.
122 */
123static void ioqueue_on_accept_complete(ioqueue_accept_rec *accept_overlapped)
124{
125 struct sockaddr *local;
126 struct sockaddr *remote;
127 int locallen, remotelen;
128
129 PJ_CHECK_STACK();
130
131 /* Operation complete immediately. */
132 GetAcceptExSockaddrs( accept_overlapped->accept_buf,
133 0,
134 ACCEPT_ADDR_LEN,
135 ACCEPT_ADDR_LEN,
136 &local,
137 &locallen,
138 &remote,
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000139 &remotelen);
Benny Prijonoa9946d52005-11-06 09:37:47 +0000140 if (*accept_overlapped->addrlen > locallen) {
141 pj_memcpy(accept_overlapped->local, local, locallen);
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000142 pj_memcpy(accept_overlapped->remote, remote, locallen);
143 } else {
144 pj_memset(accept_overlapped->local, 0, *accept_overlapped->addrlen);
145 pj_memset(accept_overlapped->remote, 0, *accept_overlapped->addrlen);
Benny Prijonoa9946d52005-11-06 09:37:47 +0000146 }
Benny Prijonodd859a62005-11-01 16:42:51 +0000147 *accept_overlapped->addrlen = locallen;
148 if (accept_overlapped->newsock_ptr)
149 *accept_overlapped->newsock_ptr = accept_overlapped->newsock;
150 accept_overlapped->operation = 0;
151 accept_overlapped->newsock = PJ_INVALID_SOCKET;
152}
153
154static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos)
155{
156 pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos];
157 HANDLE hEvent = ioqueue->connecting_handles[pos];
Benny Prijonodd859a62005-11-01 16:42:51 +0000158
159 /* Remove key from array of connecting handles. */
160 pj_array_erase(ioqueue->connecting_keys, sizeof(key),
161 ioqueue->connecting_count, pos);
162 pj_array_erase(ioqueue->connecting_handles, sizeof(HANDLE),
163 ioqueue->connecting_count, pos);
164 --ioqueue->connecting_count;
165
166 /* Disassociate the socket from the event. */
167 WSAEventSelect((pj_sock_t)key->hnd, hEvent, 0);
168
169 /* Put event object to pool. */
170 if (ioqueue->event_count < MAXIMUM_WAIT_OBJECTS) {
171 ioqueue->event_pool[ioqueue->event_count++] = hEvent;
172 } else {
173 /* Shouldn't happen. There should be no more pending connections
174 * than max.
175 */
176 pj_assert(0);
177 CloseHandle(hEvent);
178 }
179
Benny Prijonodd859a62005-11-01 16:42:51 +0000180}
181
182/*
183 * Poll for the completion of non-blocking connect().
184 * If there's a completion, the function return the key of the completed
185 * socket, and 'result' argument contains the connect() result. If connect()
186 * succeeded, 'result' will have value zero, otherwise will have the error
187 * code.
188 */
189static pj_ioqueue_key_t *check_connecting( pj_ioqueue_t *ioqueue,
190 pj_ssize_t *connect_err )
191{
192 pj_ioqueue_key_t *key = NULL;
193
194 if (ioqueue->connecting_count) {
195 DWORD result;
196
197 pj_lock_acquire(ioqueue->lock);
198 result = WaitForMultipleObjects(ioqueue->connecting_count,
199 ioqueue->connecting_handles,
200 FALSE, 0);
201 if (result >= WAIT_OBJECT_0 &&
202 result < WAIT_OBJECT_0+ioqueue->connecting_count)
203 {
204 WSANETWORKEVENTS net_events;
205
206 /* Got completed connect(). */
207 unsigned pos = result - WAIT_OBJECT_0;
208 key = ioqueue->connecting_keys[pos];
209
210 /* See whether connect has succeeded. */
211 WSAEnumNetworkEvents((pj_sock_t)key->hnd,
212 ioqueue->connecting_handles[pos],
213 &net_events);
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000214 *connect_err =
Benny Prijonoa9946d52005-11-06 09:37:47 +0000215 PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]);
Benny Prijonodd859a62005-11-01 16:42:51 +0000216
217 /* Erase socket from pending connect. */
218 erase_connecting_socket(ioqueue, pos);
219 }
220 pj_lock_release(ioqueue->lock);
221 }
222 return key;
223}
224#endif
225
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000226/*
227 * pj_ioqueue_name()
228 */
229PJ_DEF(const char*) pj_ioqueue_name(void)
230{
231 return "iocp";
232}
233
234/*
235 * pj_ioqueue_create()
Benny Prijonoa9946d52005-11-06 09:37:47 +0000236 */
Benny Prijonodd859a62005-11-01 16:42:51 +0000237PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
238 pj_size_t max_fd,
Benny Prijonoa9946d52005-11-06 09:37:47 +0000239 pj_ioqueue_t **p_ioqueue)
Benny Prijonodd859a62005-11-01 16:42:51 +0000240{
Benny Prijonoa9946d52005-11-06 09:37:47 +0000241 pj_ioqueue_t *ioqueue;
Benny Prijonodd859a62005-11-01 16:42:51 +0000242 pj_status_t rc;
243
244 PJ_UNUSED_ARG(max_fd);
Benny Prijonoa9946d52005-11-06 09:37:47 +0000245 PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL);
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000246
247 rc = sizeof(union operation_key);
248
249 /* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */
250 PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
251 sizeof(union operation_key), PJ_EBUG);
Benny Prijonodd859a62005-11-01 16:42:51 +0000252
Benny Prijonoa9946d52005-11-06 09:37:47 +0000253 ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue));
254 ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
255 if (ioqueue->iocp == NULL)
Benny Prijonodd859a62005-11-01 16:42:51 +0000256 return PJ_RETURN_OS_ERROR(GetLastError());
257
Benny Prijonoa9946d52005-11-06 09:37:47 +0000258 rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock);
Benny Prijonodd859a62005-11-01 16:42:51 +0000259 if (rc != PJ_SUCCESS) {
Benny Prijonoa9946d52005-11-06 09:37:47 +0000260 CloseHandle(ioqueue->iocp);
Benny Prijonodd859a62005-11-01 16:42:51 +0000261 return rc;
262 }
263
Benny Prijonoa9946d52005-11-06 09:37:47 +0000264 ioqueue->auto_delete_lock = PJ_TRUE;
Benny Prijonodd859a62005-11-01 16:42:51 +0000265
Benny Prijonoa9946d52005-11-06 09:37:47 +0000266 *p_ioqueue = ioqueue;
Benny Prijonodd859a62005-11-01 16:42:51 +0000267
Benny Prijonoa9946d52005-11-06 09:37:47 +0000268 PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue));
Benny Prijonodd859a62005-11-01 16:42:51 +0000269 return PJ_SUCCESS;
270}
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000271
272/*
273 * pj_ioqueue_destroy()
Benny Prijonoa9946d52005-11-06 09:37:47 +0000274 */
275PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue )
Benny Prijonodd859a62005-11-01 16:42:51 +0000276{
277 unsigned i;
278
279 PJ_CHECK_STACK();
Benny Prijonoa9946d52005-11-06 09:37:47 +0000280 PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
Benny Prijonodd859a62005-11-01 16:42:51 +0000281
282 /* Destroy events in the pool */
Benny Prijonoa9946d52005-11-06 09:37:47 +0000283 for (i=0; i<ioqueue->event_count; ++i) {
284 CloseHandle(ioqueue->event_pool[i]);
Benny Prijonodd859a62005-11-01 16:42:51 +0000285 }
Benny Prijonoa9946d52005-11-06 09:37:47 +0000286 ioqueue->event_count = 0;
Benny Prijonodd859a62005-11-01 16:42:51 +0000287
Benny Prijonoa9946d52005-11-06 09:37:47 +0000288 if (CloseHandle(ioqueue->iocp) != TRUE)
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000289 return PJ_RETURN_OS_ERROR(GetLastError());
290
291 if (ioqueue->auto_delete_lock)
292 pj_lock_destroy(ioqueue->lock);
293
Benny Prijonoa9946d52005-11-06 09:37:47 +0000294 return PJ_SUCCESS;
Benny Prijonodd859a62005-11-01 16:42:51 +0000295}
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000296
297/*
298 * pj_ioqueue_set_lock()
Benny Prijonoa9946d52005-11-06 09:37:47 +0000299 */
300PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
Benny Prijonodd859a62005-11-01 16:42:51 +0000301 pj_lock_t *lock,
302 pj_bool_t auto_delete )
303{
Benny Prijonoa9946d52005-11-06 09:37:47 +0000304 PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
Benny Prijonodd859a62005-11-01 16:42:51 +0000305
Benny Prijonoa9946d52005-11-06 09:37:47 +0000306 if (ioqueue->auto_delete_lock) {
307 pj_lock_destroy(ioqueue->lock);
Benny Prijonodd859a62005-11-01 16:42:51 +0000308 }
309
Benny Prijonoa9946d52005-11-06 09:37:47 +0000310 ioqueue->lock = lock;
311 ioqueue->auto_delete_lock = auto_delete;
Benny Prijonodd859a62005-11-01 16:42:51 +0000312
313 return PJ_SUCCESS;
314}
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000315
316/*
317 * pj_ioqueue_register_sock()
Benny Prijonoa9946d52005-11-06 09:37:47 +0000318 */
Benny Prijonodd859a62005-11-01 16:42:51 +0000319PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
Benny Prijonoa9946d52005-11-06 09:37:47 +0000320 pj_ioqueue_t *ioqueue,
321 pj_sock_t sock,
Benny Prijonodd859a62005-11-01 16:42:51 +0000322 void *user_data,
323 const pj_ioqueue_callback *cb,
324 pj_ioqueue_key_t **key )
325{
326 HANDLE hioq;
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000327 pj_ioqueue_key_t *rec;
Benny Prijonoa9946d52005-11-06 09:37:47 +0000328 u_long value;
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000329 int rc;
Benny Prijonodd859a62005-11-01 16:42:51 +0000330
Benny Prijonoa9946d52005-11-06 09:37:47 +0000331 PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL);
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000332
Benny Prijonoa9946d52005-11-06 09:37:47 +0000333 /* Build the key for this socket. */
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000334 rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
Benny Prijonoa9946d52005-11-06 09:37:47 +0000335 rec->ioqueue = ioqueue;
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000336 rec->hnd = (HANDLE)sock;
Benny Prijonoa7f64a32005-11-07 15:47:28 +0000337 rec->hnd_type = HND_IS_SOCKET;
Benny Prijonodd859a62005-11-01 16:42:51 +0000338 rec->user_data = user_data;
339 pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback));
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000340
341 /* Set socket to nonblocking. */
342 value = 1;
343 rc = ioctlsocket(sock, FIONBIO, &value);
344 if (rc != 0) {
345 return PJ_RETURN_OS_ERROR(WSAGetLastError());
346 }
347
Benny Prijonoa9946d52005-11-06 09:37:47 +0000348 /* Associate with IOCP */
349 hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0);
Benny Prijonodd859a62005-11-01 16:42:51 +0000350 if (!hioq) {
351 return PJ_RETURN_OS_ERROR(GetLastError());
352 }
353
354 *key = rec;
355 return PJ_SUCCESS;
356}
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000357
Benny Prijonoa9946d52005-11-06 09:37:47 +0000358/*
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000359 * pj_ioqueue_unregister()
Benny Prijonoa9946d52005-11-06 09:37:47 +0000360 */
361PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
Benny Prijonodd859a62005-11-01 16:42:51 +0000362{
Benny Prijonoa9946d52005-11-06 09:37:47 +0000363 PJ_ASSERT_RETURN(key, PJ_EINVAL);
Benny Prijonodd859a62005-11-01 16:42:51 +0000364
365#if PJ_HAS_TCP
366 if (key->connecting) {
367 unsigned pos;
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000368 pj_ioqueue_t *ioqueue;
369
370 ioqueue = key->ioqueue;
Benny Prijonodd859a62005-11-01 16:42:51 +0000371
372 /* Erase from connecting_handles */
Benny Prijonoa9946d52005-11-06 09:37:47 +0000373 pj_lock_acquire(ioqueue->lock);
374 for (pos=0; pos < ioqueue->connecting_count; ++pos) {
375 if (ioqueue->connecting_keys[pos] == key) {
376 erase_connecting_socket(ioqueue, pos);
Benny Prijonodd859a62005-11-01 16:42:51 +0000377 break;
378 }
379 }
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000380 key->connecting = 0;
Benny Prijonoa9946d52005-11-06 09:37:47 +0000381 pj_lock_release(ioqueue->lock);
Benny Prijonodd859a62005-11-01 16:42:51 +0000382 }
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000383#endif
384 if (key->hnd_type == HND_IS_FILE) {
385 CloseHandle(key->hnd);
Benny Prijonoa7f64a32005-11-07 15:47:28 +0000386 }
Benny Prijonodd859a62005-11-01 16:42:51 +0000387 return PJ_SUCCESS;
388}
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000389
390/*
391 * pj_ioqueue_get_user_data()
Benny Prijonoa9946d52005-11-06 09:37:47 +0000392 */
Benny Prijonodd859a62005-11-01 16:42:51 +0000393PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
394{
395 PJ_ASSERT_RETURN(key, NULL);
396 return key->user_data;
397}
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000398
399/*
400 * pj_ioqueue_set_user_data()
401 */
402PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
403 void *user_data,
404 void **old_data )
405{
406 PJ_ASSERT_RETURN(key, PJ_EINVAL);
407
408 if (old_data)
409 *old_data = key->user_data;
410
411 key->user_data = user_data;
412 return PJ_SUCCESS;
413}
414
415/*
416 * pj_ioqueue_poll()
Benny Prijonoa9946d52005-11-06 09:37:47 +0000417 *
Benny Prijonodd859a62005-11-01 16:42:51 +0000418 * Poll for events.
419 */
Benny Prijonoa9946d52005-11-06 09:37:47 +0000420PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
Benny Prijonodd859a62005-11-01 16:42:51 +0000421{
422 DWORD dwMsec, dwBytesTransfered, dwKey;
Benny Prijonoa9946d52005-11-06 09:37:47 +0000423 generic_overlapped *pOv;
Benny Prijonodd859a62005-11-01 16:42:51 +0000424 pj_ioqueue_key_t *key;
425 pj_ssize_t size_status;
426 BOOL rc;
427
Benny Prijonoa9946d52005-11-06 09:37:47 +0000428 PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
Benny Prijonodd859a62005-11-01 16:42:51 +0000429
430 /* Check the connecting array. */
431#if PJ_HAS_TCP
Benny Prijonoa9946d52005-11-06 09:37:47 +0000432 key = check_connecting(ioqueue, &size_status);
Benny Prijonodd859a62005-11-01 16:42:51 +0000433 if (key != NULL) {
434 key->cb.on_connect_complete(key, (int)size_status);
435 return 1;
436 }
437#endif
438
439 /* Calculate miliseconds timeout for GetQueuedCompletionStatus */
440 dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE;
441
442 /* Poll for completion status. */
Benny Prijonoa9946d52005-11-06 09:37:47 +0000443 rc = GetQueuedCompletionStatus(ioqueue->iocp, &dwBytesTransfered, &dwKey,
444 (OVERLAPPED**)&pOv, dwMsec);
Benny Prijonodd859a62005-11-01 16:42:51 +0000445
446 /* The return value is:
447 * - nonzero if event was dequeued.
Benny Prijonoa9946d52005-11-06 09:37:47 +0000448 * - zero and pOv==NULL if no event was dequeued.
449 * - zero and pOv!=NULL if event for failed I/O was dequeued.
Benny Prijonodd859a62005-11-01 16:42:51 +0000450 */
Benny Prijonoa9946d52005-11-06 09:37:47 +0000451 if (pOv) {
Benny Prijonodd859a62005-11-01 16:42:51 +0000452 /* Event was dequeued for either successfull or failed I/O */
453 key = (pj_ioqueue_key_t*)dwKey;
454 size_status = dwBytesTransfered;
Benny Prijonoa9946d52005-11-06 09:37:47 +0000455 switch (pOv->operation) {
Benny Prijonodd859a62005-11-01 16:42:51 +0000456 case PJ_IOQUEUE_OP_READ:
457 case PJ_IOQUEUE_OP_RECV:
458 case PJ_IOQUEUE_OP_RECV_FROM:
Benny Prijonoa9946d52005-11-06 09:37:47 +0000459 pOv->operation = 0;
Benny Prijonodd859a62005-11-01 16:42:51 +0000460 if (key->cb.on_read_complete)
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000461 key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv,
Benny Prijonoa9946d52005-11-06 09:37:47 +0000462 size_status);
Benny Prijonodd859a62005-11-01 16:42:51 +0000463 break;
464 case PJ_IOQUEUE_OP_WRITE:
465 case PJ_IOQUEUE_OP_SEND:
466 case PJ_IOQUEUE_OP_SEND_TO:
Benny Prijonoa9946d52005-11-06 09:37:47 +0000467 pOv->operation = 0;
Benny Prijonodd859a62005-11-01 16:42:51 +0000468 if (key->cb.on_write_complete)
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000469 key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv,
Benny Prijonoa9946d52005-11-06 09:37:47 +0000470 size_status);
Benny Prijonodd859a62005-11-01 16:42:51 +0000471 break;
472#if PJ_HAS_TCP
473 case PJ_IOQUEUE_OP_ACCEPT:
474 /* special case for accept. */
Benny Prijonoa9946d52005-11-06 09:37:47 +0000475 ioqueue_on_accept_complete((ioqueue_accept_rec*)pOv);
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000476 if (key->cb.on_accept_complete) {
Benny Prijonoa9946d52005-11-06 09:37:47 +0000477 ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv;
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000478 key->cb.on_accept_complete(key,
479 (pj_ioqueue_op_key_t*)pOv,
Benny Prijonoa9946d52005-11-06 09:37:47 +0000480 accept_rec->newsock,
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000481 PJ_SUCCESS);
Benny Prijonoa9946d52005-11-06 09:37:47 +0000482 }
Benny Prijonodd859a62005-11-01 16:42:51 +0000483 break;
484 case PJ_IOQUEUE_OP_CONNECT:
485#endif
486 case PJ_IOQUEUE_OP_NONE:
487 pj_assert(0);
488 break;
489 }
490 return 1;
491 }
492
493 if (GetLastError()==WAIT_TIMEOUT) {
Benny Prijonoa9946d52005-11-06 09:37:47 +0000494 /* Check the connecting array (again). */
Benny Prijonodd859a62005-11-01 16:42:51 +0000495#if PJ_HAS_TCP
Benny Prijonoa9946d52005-11-06 09:37:47 +0000496 key = check_connecting(ioqueue, &size_status);
Benny Prijonodd859a62005-11-01 16:42:51 +0000497 if (key != NULL) {
498 key->cb.on_connect_complete(key, (int)size_status);
499 return 1;
500 }
501#endif
502 return 0;
503 }
504 return -1;
505}
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000506
Benny Prijonodd859a62005-11-01 16:42:51 +0000507/*
Benny Prijonodd859a62005-11-01 16:42:51 +0000508 * pj_ioqueue_recv()
509 *
510 * Initiate overlapped WSARecv() operation.
511 */
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000512PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
Benny Prijonoa9946d52005-11-06 09:37:47 +0000513 pj_ioqueue_op_key_t *op_key,
Benny Prijonodd859a62005-11-01 16:42:51 +0000514 void *buffer,
Benny Prijonoa9946d52005-11-06 09:37:47 +0000515 pj_ssize_t *length,
Benny Prijonodd859a62005-11-01 16:42:51 +0000516 unsigned flags )
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000517{
518 /*
519 * Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and
520 * addrlen here. But unfortunately it generates EINVAL... :-(
521 * -bennylp
522 */
523 int rc;
524 DWORD bytesRead;
525 DWORD dwFlags = 0;
526 union operation_key *op_key_rec;
527
528 PJ_CHECK_STACK();
529 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
530
531 op_key_rec = (union operation_key*)op_key->internal__;
532 op_key_rec->overlapped.wsabuf.buf = buffer;
533 op_key_rec->overlapped.wsabuf.len = *length;
534
535 dwFlags = flags;
536
537 /* Try non-overlapped received first to see if data is
538 * immediately available.
539 */
540 rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
541 &bytesRead, &dwFlags, NULL, NULL);
542 if (rc == 0) {
543 *length = bytesRead;
544 return PJ_SUCCESS;
545 } else {
546 DWORD dwError = WSAGetLastError();
547 if (dwError != WSAEWOULDBLOCK) {
548 *length = -1;
549 return PJ_RETURN_OS_ERROR(dwError);
550 }
551 }
552
553 /*
554 * No immediate data available.
555 * Register overlapped Recv() operation.
556 */
557 pj_memset(&op_key_rec->overlapped.overlapped, 0,
558 sizeof(op_key_rec->overlapped.overlapped));
559 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
560
561 rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
562 &bytesRead, &dwFlags,
563 &op_key_rec->overlapped.overlapped, NULL);
564 if (rc == SOCKET_ERROR) {
565 DWORD dwStatus = WSAGetLastError();
566 if (dwStatus!=WSA_IO_PENDING) {
567 *length = -1;
568 return PJ_STATUS_FROM_OS(dwStatus);
569 }
570 }
571
572 /* Pending operation has been scheduled. */
573 return PJ_EPENDING;
Benny Prijonodd859a62005-11-01 16:42:51 +0000574}
575
576/*
577 * pj_ioqueue_recvfrom()
578 *
579 * Initiate overlapped RecvFrom() operation.
580 */
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000581PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
Benny Prijonoa9946d52005-11-06 09:37:47 +0000582 pj_ioqueue_op_key_t *op_key,
Benny Prijonodd859a62005-11-01 16:42:51 +0000583 void *buffer,
Benny Prijonoa9946d52005-11-06 09:37:47 +0000584 pj_ssize_t *length,
Benny Prijonodd859a62005-11-01 16:42:51 +0000585 unsigned flags,
586 pj_sockaddr_t *addr,
587 int *addrlen)
588{
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000589 int rc;
590 DWORD bytesRead;
591 DWORD dwFlags = 0;
592 union operation_key *op_key_rec;
593
594 PJ_CHECK_STACK();
595 PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL);
596
597 op_key_rec = (union operation_key*)op_key->internal__;
598 op_key_rec->overlapped.wsabuf.buf = buffer;
599 op_key_rec->overlapped.wsabuf.len = *length;
600
601 dwFlags = flags;
602
603 /* Try non-overlapped received first to see if data is
604 * immediately available.
605 */
606 rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
607 &bytesRead, &dwFlags, addr, addrlen, NULL, NULL);
608 if (rc == 0) {
609 *length = bytesRead;
610 return PJ_SUCCESS;
611 } else {
612 DWORD dwError = WSAGetLastError();
613 if (dwError != WSAEWOULDBLOCK) {
614 *length = -1;
615 return PJ_RETURN_OS_ERROR(dwError);
616 }
617 }
618
619 /*
620 * No immediate data available.
621 * Register overlapped Recv() operation.
622 */
623 pj_memset(&op_key_rec->overlapped.overlapped, 0,
624 sizeof(op_key_rec->overlapped.overlapped));
625 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
626
627 rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
628 &bytesRead, &dwFlags, addr, addrlen,
629 &op_key_rec->overlapped.overlapped, NULL);
630 if (rc == SOCKET_ERROR) {
631 DWORD dwStatus = WSAGetLastError();
632 if (dwStatus!=WSA_IO_PENDING) {
633 *length = -1;
634 return PJ_STATUS_FROM_OS(dwStatus);
635 }
636 }
637
638 /* Pending operation has been scheduled. */
639 return PJ_EPENDING;
Benny Prijonodd859a62005-11-01 16:42:51 +0000640}
641
642/*
Benny Prijonodd859a62005-11-01 16:42:51 +0000643 * pj_ioqueue_send()
644 *
645 * Initiate overlapped Send operation.
646 */
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000647PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
Benny Prijonoa9946d52005-11-06 09:37:47 +0000648 pj_ioqueue_op_key_t *op_key,
Benny Prijonodd859a62005-11-01 16:42:51 +0000649 const void *data,
Benny Prijonoa9946d52005-11-06 09:37:47 +0000650 pj_ssize_t *length,
Benny Prijonodd859a62005-11-01 16:42:51 +0000651 unsigned flags )
652{
Benny Prijonoa9946d52005-11-06 09:37:47 +0000653 return pj_ioqueue_sendto(key, op_key, data, length, flags, NULL, 0);
Benny Prijonodd859a62005-11-01 16:42:51 +0000654}
655
656
657/*
658 * pj_ioqueue_sendto()
659 *
660 * Initiate overlapped SendTo operation.
661 */
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000662PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
Benny Prijonoa9946d52005-11-06 09:37:47 +0000663 pj_ioqueue_op_key_t *op_key,
Benny Prijonodd859a62005-11-01 16:42:51 +0000664 const void *data,
Benny Prijonoa9946d52005-11-06 09:37:47 +0000665 pj_ssize_t *length,
Benny Prijonodd859a62005-11-01 16:42:51 +0000666 unsigned flags,
667 const pj_sockaddr_t *addr,
668 int addrlen)
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000669{
670 int rc;
671 DWORD bytesWritten;
672 DWORD dwFlags;
673 union operation_key *op_key_rec;
674
675 PJ_CHECK_STACK();
676 PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL);
677
678 op_key_rec = (union operation_key*)op_key->internal__;
679
680 dwFlags = flags;
681
682 /*
683 * First try blocking write.
684 */
685 op_key_rec->overlapped.wsabuf.buf = (void*)data;
686 op_key_rec->overlapped.wsabuf.len = *length;
687
688 rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
689 &bytesWritten, dwFlags, addr, addrlen,
690 NULL, NULL);
691 if (rc == 0) {
692 *length = bytesWritten;
693 return PJ_SUCCESS;
694 } else {
695 DWORD dwStatus = WSAGetLastError();
696 if (dwStatus != WSAEWOULDBLOCK) {
697 *length = -1;
698 return PJ_RETURN_OS_ERROR(dwStatus);
699 }
700 }
701
702 /*
703 * Data can't be sent immediately.
704 * Schedule asynchronous WSASend().
705 */
706 pj_memset(&op_key_rec->overlapped.overlapped, 0,
707 sizeof(op_key_rec->overlapped.overlapped));
708 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND;
709
710 rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
711 &bytesWritten, dwFlags, addr, addrlen,
712 &op_key_rec->overlapped.overlapped, NULL);
713 if (rc == SOCKET_ERROR) {
714 DWORD dwStatus = WSAGetLastError();
715 if (dwStatus!=WSA_IO_PENDING)
716 return PJ_STATUS_FROM_OS(dwStatus);
717 }
718
719 /* Asynchronous operation successfully submitted. */
Benny Prijonoa9946d52005-11-06 09:37:47 +0000720 return PJ_EPENDING;
Benny Prijonodd859a62005-11-01 16:42:51 +0000721}
722
723#if PJ_HAS_TCP
724
725/*
726 * pj_ioqueue_accept()
727 *
728 * Initiate overlapped accept() operation.
729 */
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000730PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
Benny Prijonoa9946d52005-11-06 09:37:47 +0000731 pj_ioqueue_op_key_t *op_key,
732 pj_sock_t *new_sock,
733 pj_sockaddr_t *local,
734 pj_sockaddr_t *remote,
735 int *addrlen)
Benny Prijonodd859a62005-11-01 16:42:51 +0000736{
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000737 BOOL rc;
Benny Prijonodd859a62005-11-01 16:42:51 +0000738 DWORD bytesReceived;
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000739 pj_status_t status;
740 union operation_key *op_key_rec;
Benny Prijonoa9946d52005-11-06 09:37:47 +0000741 SOCKET sock;
Benny Prijonodd859a62005-11-01 16:42:51 +0000742
743 PJ_CHECK_STACK();
Benny Prijonoa9946d52005-11-06 09:37:47 +0000744 PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000745
746 /*
747 * See if there is a new connection immediately available.
748 */
749 sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0);
750 if (sock != INVALID_SOCKET) {
751 /* Yes! New socket is available! */
752 int status;
753
754 status = getsockname(sock, local, addrlen);
755 if (status != 0) {
756 DWORD dwError = WSAGetLastError();
757 closesocket(sock);
758 return PJ_RETURN_OS_ERROR(dwError);
759 }
760
761 *new_sock = sock;
762 return PJ_SUCCESS;
763
764 } else {
765 DWORD dwError = WSAGetLastError();
766 if (dwError != WSAEWOULDBLOCK) {
767 return PJ_RETURN_OS_ERROR(dwError);
768 }
769 }
770
771 /*
772 * No connection is immediately available.
773 * Must schedule an asynchronous operation.
774 */
775 op_key_rec = (union operation_key*)op_key->internal__;
Benny Prijonoa9946d52005-11-06 09:37:47 +0000776
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000777 status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0,
Benny Prijonoa9946d52005-11-06 09:37:47 +0000778 &op_key_rec->accept.newsock);
779 if (status != PJ_SUCCESS)
780 return status;
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000781
782 /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket
783 * addresses can be obtained with getsockname() and getpeername().
784 */
785 status = setsockopt(op_key_rec->accept.newsock, SOL_SOCKET,
786 SO_UPDATE_ACCEPT_CONTEXT,
787 (char*)&key->hnd, sizeof(SOCKET));
788 /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later.
789 * So ignore the error status.
790 */
791
792 op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT;
Benny Prijonoa9946d52005-11-06 09:37:47 +0000793 op_key_rec->accept.addrlen = addrlen;
794 op_key_rec->accept.local = local;
795 op_key_rec->accept.remote = remote;
796 op_key_rec->accept.newsock_ptr = new_sock;
797 pj_memset(&op_key_rec->accept.overlapped, 0,
798 sizeof(op_key_rec->accept.overlapped));
Benny Prijonodd859a62005-11-01 16:42:51 +0000799
Benny Prijonoa9946d52005-11-06 09:37:47 +0000800 rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)op_key_rec->accept.newsock,
801 op_key_rec->accept.accept_buf,
Benny Prijonodd859a62005-11-01 16:42:51 +0000802 0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN,
803 &bytesReceived,
Benny Prijonoa9946d52005-11-06 09:37:47 +0000804 &op_key_rec->accept.overlapped );
Benny Prijonodd859a62005-11-01 16:42:51 +0000805
806 if (rc == TRUE) {
Benny Prijonoa9946d52005-11-06 09:37:47 +0000807 ioqueue_on_accept_complete(&op_key_rec->accept);
Benny Prijonodd859a62005-11-01 16:42:51 +0000808 return PJ_SUCCESS;
809 } else {
810 DWORD dwStatus = WSAGetLastError();
Benny Prijonoa9946d52005-11-06 09:37:47 +0000811 if (dwStatus!=WSA_IO_PENDING)
Benny Prijonodd859a62005-11-01 16:42:51 +0000812 return PJ_STATUS_FROM_OS(dwStatus);
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000813 }
814
815 /* Asynchronous Accept() has been submitted. */
Benny Prijonoa9946d52005-11-06 09:37:47 +0000816 return PJ_EPENDING;
Benny Prijonodd859a62005-11-01 16:42:51 +0000817}
818
819
820/*
821 * pj_ioqueue_connect()
822 *
823 * Initiate overlapped connect() operation (well, it's non-blocking actually,
824 * since there's no overlapped version of connect()).
825 */
Benny Prijonoa9946d52005-11-06 09:37:47 +0000826PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
Benny Prijonodd859a62005-11-01 16:42:51 +0000827 const pj_sockaddr_t *addr,
828 int addrlen )
829{
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000830 HANDLE hEvent;
Benny Prijonoa9946d52005-11-06 09:37:47 +0000831 pj_ioqueue_t *ioqueue;
Benny Prijonodd859a62005-11-01 16:42:51 +0000832
833 PJ_CHECK_STACK();
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000834 PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
Benny Prijonodd859a62005-11-01 16:42:51 +0000835
836 /* Initiate connect() */
837 if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) {
838 DWORD dwStatus;
839 dwStatus = WSAGetLastError();
Benny Prijonoa9946d52005-11-06 09:37:47 +0000840 if (dwStatus != WSAEWOULDBLOCK) {
Benny Prijonodd859a62005-11-01 16:42:51 +0000841 return PJ_RETURN_OS_ERROR(dwStatus);
Benny Prijonodd859a62005-11-01 16:42:51 +0000842 }
843 } else {
844 /* Connect has completed immediately! */
Benny Prijonodd859a62005-11-01 16:42:51 +0000845 return PJ_SUCCESS;
846 }
Benny Prijono40ce3fb2005-11-07 18:14:08 +0000847
848 ioqueue = key->ioqueue;
Benny Prijonodd859a62005-11-01 16:42:51 +0000849
850 /* Add to the array of connecting socket to be polled */
851 pj_lock_acquire(ioqueue->lock);
852
853 if (ioqueue->connecting_count >= MAXIMUM_WAIT_OBJECTS) {
854 pj_lock_release(ioqueue->lock);
855 return PJ_ETOOMANYCONN;
856 }
857
858 /* Get or create event object. */
859 if (ioqueue->event_count) {
860 hEvent = ioqueue->event_pool[ioqueue->event_count - 1];
861 --ioqueue->event_count;
862 } else {
863 hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
864 if (hEvent == NULL) {
865 DWORD dwStatus = GetLastError();
866 pj_lock_release(ioqueue->lock);
867 return PJ_STATUS_FROM_OS(dwStatus);
868 }
869 }
870
871 /* Mark key as connecting.
872 * We can't use array index since key can be removed dynamically.
873 */
874 key->connecting = 1;
875
876 /* Associate socket events to the event object. */
877 if (WSAEventSelect((pj_sock_t)key->hnd, hEvent, FD_CONNECT) != 0) {
878 CloseHandle(hEvent);
879 pj_lock_release(ioqueue->lock);
880 return PJ_RETURN_OS_ERROR(WSAGetLastError());
881 }
882
883 /* Add to array. */
884 ioqueue->connecting_keys[ ioqueue->connecting_count ] = key;
885 ioqueue->connecting_handles[ ioqueue->connecting_count ] = hEvent;
886 ioqueue->connecting_count++;
887
888 pj_lock_release(ioqueue->lock);
889
890 return PJ_EPENDING;
891}
892#endif /* #if PJ_HAS_TCP */
893
Benny Prijono85d3f452005-11-09 15:37:19 +0000894
895
896PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
897 pj_ioqueue_op_key_t *op_key )
898{
899 BOOL rc;
900 DWORD bytesTransfered;
901
902 rc = GetOverlappedResult( key->hnd, (LPOVERLAPPED)op_key,
903 &bytesTransfered, FALSE );
904
905 if (rc == FALSE) {
906 return GetLastError()==ERROR_IO_INCOMPLETE;
907 }
908
909 return FALSE;
910}
911
912
913PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
914 pj_ioqueue_op_key_t *op_key,
915 pj_ssize_t bytes_status )
916{
917 BOOL rc;
918
919 rc = PostQueuedCompletionStatus(key->ioqueue->iocp, bytes_status,
920 (long)key, (OVERLAPPED*)op_key );
921 if (rc == FALSE) {
922 return PJ_RETURN_OS_ERROR(GetLastError());
923 }
924
925 return PJ_SUCCESS;
926}
927