blob: 71977cbde28ccb2c63c995c713bc2d0debb6aa8d [file] [log] [blame]
Benny Prijono9033e312005-11-21 02:08:39 +00001/* $Id$ */
2/*
3 * Copyright (C)2003-2006 Benny Prijono <benny@prijono.org>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 */
19#include <pj/ioqueue.h>
20#include <pj/os.h>
21#include <pj/lock.h>
22#include <pj/pool.h>
23#include <pj/string.h>
24#include <pj/sock.h>
25#include <pj/array.h>
26#include <pj/log.h>
27#include <pj/assert.h>
28#include <pj/errno.h>
Benny Prijono9c025eb2006-07-10 21:35:27 +000029#include <pj/compat/socket.h>
Benny Prijono9033e312005-11-21 02:08:39 +000030
31
32#if defined(PJ_HAS_WINSOCK2_H) && PJ_HAS_WINSOCK2_H != 0
33# include <winsock2.h>
34#elif defined(PJ_HAS_WINSOCK_H) && PJ_HAS_WINSOCK_H != 0
35# include <winsock.h>
36#endif
37
38#if defined(PJ_HAS_MSWSOCK_H) && PJ_HAS_MSWSOCK_H != 0
39# include <mswsock.h>
40#endif
41
42
/*
 * Size reserved for each address slot of the AcceptEx() output buffer.
 * Per MSDN, each slot must be at least 16 bytes larger than the socket
 * address structure of the transport in use.
 */
#define ACCEPT_ADDR_LEN     (sizeof(pj_sockaddr_in) + 16)
47
48typedef struct generic_overlapped
49{
50 WSAOVERLAPPED overlapped;
51 pj_ioqueue_operation_e operation;
52} generic_overlapped;
53
54/*
55 * OVERLAPPPED structure for send and receive.
56 */
57typedef struct ioqueue_overlapped
58{
59 WSAOVERLAPPED overlapped;
60 pj_ioqueue_operation_e operation;
61 WSABUF wsabuf;
62 pj_sockaddr_in dummy_addr;
63 int dummy_addrlen;
64} ioqueue_overlapped;
65
66#if PJ_HAS_TCP
67/*
68 * OVERLAP structure for accept.
69 */
70typedef struct ioqueue_accept_rec
71{
72 WSAOVERLAPPED overlapped;
73 pj_ioqueue_operation_e operation;
74 pj_sock_t newsock;
75 pj_sock_t *newsock_ptr;
76 int *addrlen;
77 void *remote;
78 void *local;
79 char accept_buf[2 * ACCEPT_ADDR_LEN];
80} ioqueue_accept_rec;
81#endif
82
83/*
84 * Structure to hold pending operation key.
85 */
86union operation_key
87{
88 generic_overlapped generic;
89 ioqueue_overlapped overlapped;
90#if PJ_HAS_TCP
91 ioqueue_accept_rec accept;
92#endif
93};
94
/* Kind of OS handle stored in a key's 'hnd' member. */
enum handle_type
{
    HND_IS_UNKNOWN,
    HND_IS_FILE,
    HND_IS_SOCKET
};
102
/*
 * Magic length constant. It is defined as an unsigned macro rather than as
 * an enumerator because the value does not fit in an int: ISO C requires
 * enumeration constants to be representable as int, and a wrapped (negative)
 * enumerator would compare incorrectly against wider integer types.
 */
#define POST_QUIT_LEN   0xFFFFDEADUL
104
Benny Prijono9033e312005-11-21 02:08:39 +0000105/*
106 * Structure for individual socket.
107 */
108struct pj_ioqueue_key_t
109{
Benny Prijono5accbd02006-03-30 16:32:18 +0000110 PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);
111
Benny Prijono9033e312005-11-21 02:08:39 +0000112 pj_ioqueue_t *ioqueue;
113 HANDLE hnd;
114 void *user_data;
115 enum handle_type hnd_type;
Benny Prijono5accbd02006-03-30 16:32:18 +0000116 pj_ioqueue_callback cb;
117
Benny Prijono9033e312005-11-21 02:08:39 +0000118#if PJ_HAS_TCP
119 int connecting;
120#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000121
122#if PJ_IOQUEUE_HAS_SAFE_UNREG
123 pj_atomic_t *ref_count;
124 pj_bool_t closing;
125 pj_time_val free_time;
126#endif
127
Benny Prijono9033e312005-11-21 02:08:39 +0000128};
129
130/*
131 * IO Queue structure.
132 */
133struct pj_ioqueue_t
134{
135 HANDLE iocp;
136 pj_lock_t *lock;
137 pj_bool_t auto_delete_lock;
Benny Prijono5accbd02006-03-30 16:32:18 +0000138
139#if PJ_IOQUEUE_HAS_SAFE_UNREG
140 pj_ioqueue_key_t active_list;
141 pj_ioqueue_key_t free_list;
142 pj_ioqueue_key_t closing_list;
143#endif
144
145 /* These are to keep track of connecting sockets */
146#if PJ_HAS_TCP
Benny Prijono9033e312005-11-21 02:08:39 +0000147 unsigned event_count;
148 HANDLE event_pool[MAXIMUM_WAIT_OBJECTS+1];
Benny Prijono9033e312005-11-21 02:08:39 +0000149 unsigned connecting_count;
150 HANDLE connecting_handles[MAXIMUM_WAIT_OBJECTS+1];
151 pj_ioqueue_key_t *connecting_keys[MAXIMUM_WAIT_OBJECTS+1];
152#endif
153};
154
155
156#if PJ_HAS_TCP
157/*
158 * Process the socket when the overlapped accept() completed.
159 */
160static void ioqueue_on_accept_complete(ioqueue_accept_rec *accept_overlapped)
161{
162 struct sockaddr *local;
163 struct sockaddr *remote;
164 int locallen, remotelen;
165
166 PJ_CHECK_STACK();
167
168 /* Operation complete immediately. */
169 GetAcceptExSockaddrs( accept_overlapped->accept_buf,
170 0,
171 ACCEPT_ADDR_LEN,
172 ACCEPT_ADDR_LEN,
173 &local,
174 &locallen,
175 &remote,
176 &remotelen);
Benny Prijono9c025eb2006-07-10 21:35:27 +0000177 if (*accept_overlapped->addrlen >= locallen) {
Benny Prijono9033e312005-11-21 02:08:39 +0000178 pj_memcpy(accept_overlapped->local, local, locallen);
179 pj_memcpy(accept_overlapped->remote, remote, locallen);
180 } else {
Benny Prijonoac623b32006-07-03 15:19:31 +0000181 pj_bzero(accept_overlapped->local, *accept_overlapped->addrlen);
182 pj_bzero(accept_overlapped->remote, *accept_overlapped->addrlen);
Benny Prijono9033e312005-11-21 02:08:39 +0000183 }
184 *accept_overlapped->addrlen = locallen;
185 if (accept_overlapped->newsock_ptr)
186 *accept_overlapped->newsock_ptr = accept_overlapped->newsock;
187 accept_overlapped->operation = 0;
Benny Prijono9033e312005-11-21 02:08:39 +0000188}
189
190static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos)
191{
192 pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos];
193 HANDLE hEvent = ioqueue->connecting_handles[pos];
194
195 /* Remove key from array of connecting handles. */
196 pj_array_erase(ioqueue->connecting_keys, sizeof(key),
197 ioqueue->connecting_count, pos);
198 pj_array_erase(ioqueue->connecting_handles, sizeof(HANDLE),
199 ioqueue->connecting_count, pos);
200 --ioqueue->connecting_count;
201
202 /* Disassociate the socket from the event. */
203 WSAEventSelect((pj_sock_t)key->hnd, hEvent, 0);
204
205 /* Put event object to pool. */
206 if (ioqueue->event_count < MAXIMUM_WAIT_OBJECTS) {
207 ioqueue->event_pool[ioqueue->event_count++] = hEvent;
208 } else {
209 /* Shouldn't happen. There should be no more pending connections
210 * than max.
211 */
212 pj_assert(0);
213 CloseHandle(hEvent);
214 }
215
216}
217
218/*
219 * Poll for the completion of non-blocking connect().
220 * If there's a completion, the function return the key of the completed
221 * socket, and 'result' argument contains the connect() result. If connect()
222 * succeeded, 'result' will have value zero, otherwise will have the error
223 * code.
224 */
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000225static int check_connecting( pj_ioqueue_t *ioqueue )
Benny Prijono9033e312005-11-21 02:08:39 +0000226{
Benny Prijono9033e312005-11-21 02:08:39 +0000227 if (ioqueue->connecting_count) {
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000228 int i, count;
229 struct
230 {
231 pj_ioqueue_key_t *key;
232 pj_status_t status;
233 } events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1];
Benny Prijono9033e312005-11-21 02:08:39 +0000234
235 pj_lock_acquire(ioqueue->lock);
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000236 for (count=0; count<PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1; ++count) {
237 DWORD result;
Benny Prijono9033e312005-11-21 02:08:39 +0000238
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000239 result = WaitForMultipleObjects(ioqueue->connecting_count,
240 ioqueue->connecting_handles,
241 FALSE, 0);
242 if (result >= WAIT_OBJECT_0 &&
243 result < WAIT_OBJECT_0+ioqueue->connecting_count)
244 {
245 WSANETWORKEVENTS net_events;
Benny Prijono9033e312005-11-21 02:08:39 +0000246
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000247 /* Got completed connect(). */
248 unsigned pos = result - WAIT_OBJECT_0;
249 events[count].key = ioqueue->connecting_keys[pos];
Benny Prijono9033e312005-11-21 02:08:39 +0000250
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000251 /* See whether connect has succeeded. */
252 WSAEnumNetworkEvents((pj_sock_t)events[count].key->hnd,
253 ioqueue->connecting_handles[pos],
254 &net_events);
255 events[count].status =
256 PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]);
257
258 /* Erase socket from pending connect. */
259 erase_connecting_socket(ioqueue, pos);
260 } else {
261 /* No more events */
262 break;
263 }
Benny Prijono9033e312005-11-21 02:08:39 +0000264 }
265 pj_lock_release(ioqueue->lock);
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000266
267 /* Call callbacks. */
268 for (i=0; i<count; ++i) {
269 if (events[i].key->cb.on_connect_complete) {
270 events[i].key->cb.on_connect_complete(events[i].key,
271 events[i].status);
272 }
273 }
274
275 return count;
Benny Prijono9033e312005-11-21 02:08:39 +0000276 }
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000277
278 return 0;
279
Benny Prijono9033e312005-11-21 02:08:39 +0000280}
281#endif
282
283/*
284 * pj_ioqueue_name()
285 */
286PJ_DEF(const char*) pj_ioqueue_name(void)
287{
288 return "iocp";
289}
290
291/*
292 * pj_ioqueue_create()
293 */
294PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
295 pj_size_t max_fd,
296 pj_ioqueue_t **p_ioqueue)
297{
298 pj_ioqueue_t *ioqueue;
Benny Prijono5accbd02006-03-30 16:32:18 +0000299 unsigned i;
Benny Prijono9033e312005-11-21 02:08:39 +0000300 pj_status_t rc;
301
302 PJ_UNUSED_ARG(max_fd);
303 PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL);
304
305 rc = sizeof(union operation_key);
306
307 /* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */
308 PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
309 sizeof(union operation_key), PJ_EBUG);
310
Benny Prijono5accbd02006-03-30 16:32:18 +0000311 /* Create IOCP */
Benny Prijono9033e312005-11-21 02:08:39 +0000312 ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue));
313 ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
314 if (ioqueue->iocp == NULL)
315 return PJ_RETURN_OS_ERROR(GetLastError());
316
Benny Prijono5accbd02006-03-30 16:32:18 +0000317 /* Create IOCP mutex */
Benny Prijono9033e312005-11-21 02:08:39 +0000318 rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock);
319 if (rc != PJ_SUCCESS) {
320 CloseHandle(ioqueue->iocp);
321 return rc;
322 }
323
324 ioqueue->auto_delete_lock = PJ_TRUE;
325
Benny Prijono5accbd02006-03-30 16:32:18 +0000326#if PJ_IOQUEUE_HAS_SAFE_UNREG
327 /*
328 * Create and initialize key pools.
329 */
330 pj_list_init(&ioqueue->active_list);
331 pj_list_init(&ioqueue->free_list);
332 pj_list_init(&ioqueue->closing_list);
333
334 /* Preallocate keys according to max_fd setting, and put them
335 * in free_list.
336 */
337 for (i=0; i<max_fd; ++i) {
338 pj_ioqueue_key_t *key;
339
340 key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t));
341
342 rc = pj_atomic_create(pool, 0, &key->ref_count);
343 if (rc != PJ_SUCCESS) {
344 key = ioqueue->free_list.next;
345 while (key != &ioqueue->free_list) {
346 pj_atomic_destroy(key->ref_count);
347 key = key->next;
348 }
349 CloseHandle(ioqueue->iocp);
350 return rc;
351 }
352
353 pj_list_push_back(&ioqueue->free_list, key);
354
355 }
356#endif
357
Benny Prijono9033e312005-11-21 02:08:39 +0000358 *p_ioqueue = ioqueue;
359
360 PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue));
361 return PJ_SUCCESS;
362}
363
364/*
365 * pj_ioqueue_destroy()
366 */
367PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue )
368{
369 unsigned i;
Benny Prijono5accbd02006-03-30 16:32:18 +0000370 pj_ioqueue_key_t *key;
Benny Prijono9033e312005-11-21 02:08:39 +0000371
372 PJ_CHECK_STACK();
373 PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
374
Benny Prijono5accbd02006-03-30 16:32:18 +0000375 pj_lock_acquire(ioqueue->lock);
376
377#if PJ_HAS_TCP
Benny Prijono9033e312005-11-21 02:08:39 +0000378 /* Destroy events in the pool */
379 for (i=0; i<ioqueue->event_count; ++i) {
380 CloseHandle(ioqueue->event_pool[i]);
381 }
382 ioqueue->event_count = 0;
Benny Prijono5accbd02006-03-30 16:32:18 +0000383#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000384
385 if (CloseHandle(ioqueue->iocp) != TRUE)
386 return PJ_RETURN_OS_ERROR(GetLastError());
387
Benny Prijono5accbd02006-03-30 16:32:18 +0000388#if PJ_IOQUEUE_HAS_SAFE_UNREG
389 /* Destroy reference counters */
390 key = ioqueue->active_list.next;
391 while (key != &ioqueue->active_list) {
392 pj_atomic_destroy(key->ref_count);
393 key = key->next;
394 }
395
396 key = ioqueue->closing_list.next;
397 while (key != &ioqueue->closing_list) {
398 pj_atomic_destroy(key->ref_count);
399 key = key->next;
400 }
401
402 key = ioqueue->free_list.next;
403 while (key != &ioqueue->free_list) {
404 pj_atomic_destroy(key->ref_count);
405 key = key->next;
406 }
407#endif
408
Benny Prijono9033e312005-11-21 02:08:39 +0000409 if (ioqueue->auto_delete_lock)
410 pj_lock_destroy(ioqueue->lock);
411
412 return PJ_SUCCESS;
413}
414
415/*
416 * pj_ioqueue_set_lock()
417 */
418PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
419 pj_lock_t *lock,
420 pj_bool_t auto_delete )
421{
422 PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
423
424 if (ioqueue->auto_delete_lock) {
425 pj_lock_destroy(ioqueue->lock);
426 }
427
428 ioqueue->lock = lock;
429 ioqueue->auto_delete_lock = auto_delete;
430
431 return PJ_SUCCESS;
432}
433
434/*
435 * pj_ioqueue_register_sock()
436 */
437PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
438 pj_ioqueue_t *ioqueue,
439 pj_sock_t sock,
440 void *user_data,
441 const pj_ioqueue_callback *cb,
442 pj_ioqueue_key_t **key )
443{
444 HANDLE hioq;
445 pj_ioqueue_key_t *rec;
446 u_long value;
447 int rc;
448
449 PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL);
450
Benny Prijono5accbd02006-03-30 16:32:18 +0000451 pj_lock_acquire(ioqueue->lock);
452
453#if PJ_IOQUEUE_HAS_SAFE_UNREG
454 /* If safe unregistration is used, then get the key record from
455 * the free list.
456 */
457 if (pj_list_empty(&ioqueue->free_list)) {
458 pj_lock_release(ioqueue->lock);
459 return PJ_ETOOMANY;
460 }
461
462 rec = ioqueue->free_list.next;
463 pj_list_erase(rec);
464
465 /* Set initial reference count to 1 */
466 pj_assert(pj_atomic_get(rec->ref_count) == 0);
467 pj_atomic_inc(rec->ref_count);
468
469 rec->closing = 0;
470
471#else
Benny Prijono9033e312005-11-21 02:08:39 +0000472 rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
Benny Prijono5accbd02006-03-30 16:32:18 +0000473#endif
474
475 /* Build the key for this socket. */
Benny Prijono9033e312005-11-21 02:08:39 +0000476 rec->ioqueue = ioqueue;
477 rec->hnd = (HANDLE)sock;
478 rec->hnd_type = HND_IS_SOCKET;
479 rec->user_data = user_data;
480 pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback));
481
Benny Prijono5accbd02006-03-30 16:32:18 +0000482#if PJ_HAS_TCP
483 rec->connecting = 0;
484#endif
485
Benny Prijono9033e312005-11-21 02:08:39 +0000486 /* Set socket to nonblocking. */
487 value = 1;
488 rc = ioctlsocket(sock, FIONBIO, &value);
489 if (rc != 0) {
Benny Prijono5accbd02006-03-30 16:32:18 +0000490 pj_lock_release(ioqueue->lock);
Benny Prijono9033e312005-11-21 02:08:39 +0000491 return PJ_RETURN_OS_ERROR(WSAGetLastError());
492 }
493
494 /* Associate with IOCP */
495 hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0);
496 if (!hioq) {
Benny Prijono5accbd02006-03-30 16:32:18 +0000497 pj_lock_release(ioqueue->lock);
Benny Prijono9033e312005-11-21 02:08:39 +0000498 return PJ_RETURN_OS_ERROR(GetLastError());
499 }
500
501 *key = rec;
Benny Prijono5accbd02006-03-30 16:32:18 +0000502
503#if PJ_IOQUEUE_HAS_SAFE_UNREG
504 pj_list_push_back(&ioqueue->active_list, rec);
505#endif
506
507 pj_lock_release(ioqueue->lock);
508
Benny Prijono9033e312005-11-21 02:08:39 +0000509 return PJ_SUCCESS;
510}
511
Benny Prijono9033e312005-11-21 02:08:39 +0000512
513/*
514 * pj_ioqueue_get_user_data()
515 */
516PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
517{
518 PJ_ASSERT_RETURN(key, NULL);
519 return key->user_data;
520}
521
522/*
523 * pj_ioqueue_set_user_data()
524 */
525PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
526 void *user_data,
527 void **old_data )
528{
529 PJ_ASSERT_RETURN(key, PJ_EINVAL);
530
531 if (old_data)
532 *old_data = key->user_data;
533
534 key->user_data = user_data;
535 return PJ_SUCCESS;
536}
537
Benny Prijono8d317a02006-03-22 11:49:19 +0000538
Benny Prijono5accbd02006-03-30 16:32:18 +0000539#if PJ_IOQUEUE_HAS_SAFE_UNREG
540/* Decrement the key's reference counter, and when the counter reach zero,
541 * destroy the key.
542 */
543static void decrement_counter(pj_ioqueue_key_t *key)
544{
545 if (pj_atomic_dec_and_get(key->ref_count) == 0) {
546
547 pj_lock_acquire(key->ioqueue->lock);
548
549 pj_assert(key->closing == 1);
550 pj_gettimeofday(&key->free_time);
551 key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
552 pj_time_val_normalize(&key->free_time);
553
554 pj_list_erase(key);
555 pj_list_push_back(&key->ioqueue->closing_list, key);
556
557 pj_lock_release(key->ioqueue->lock);
558 }
559}
560#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000561
Benny Prijono9033e312005-11-21 02:08:39 +0000562/*
Benny Prijono5accbd02006-03-30 16:32:18 +0000563 * Poll the I/O Completion Port, execute callback,
Benny Prijono8d317a02006-03-22 11:49:19 +0000564 * and return the key and bytes transfered of the last operation.
Benny Prijono9033e312005-11-21 02:08:39 +0000565 */
Benny Prijono8d317a02006-03-22 11:49:19 +0000566static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
567 pj_ssize_t *p_bytes, pj_ioqueue_key_t **p_key )
Benny Prijono9033e312005-11-21 02:08:39 +0000568{
Benny Prijono8d317a02006-03-22 11:49:19 +0000569 DWORD dwBytesTransfered, dwKey;
Benny Prijono9033e312005-11-21 02:08:39 +0000570 generic_overlapped *pOv;
571 pj_ioqueue_key_t *key;
Benny Prijono4f2be312005-11-21 17:01:06 +0000572 pj_ssize_t size_status = -1;
Benny Prijono8d317a02006-03-22 11:49:19 +0000573 BOOL rcGetQueued;
Benny Prijono9033e312005-11-21 02:08:39 +0000574
575 /* Poll for completion status. */
Benny Prijono8d317a02006-03-22 11:49:19 +0000576 rcGetQueued = GetQueuedCompletionStatus(hIocp, &dwBytesTransfered,
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000577 &dwKey, (OVERLAPPED**)&pOv,
Benny Prijono8d317a02006-03-22 11:49:19 +0000578 dwTimeout);
Benny Prijono9033e312005-11-21 02:08:39 +0000579
580 /* The return value is:
581 * - nonzero if event was dequeued.
582 * - zero and pOv==NULL if no event was dequeued.
583 * - zero and pOv!=NULL if event for failed I/O was dequeued.
584 */
585 if (pOv) {
586 /* Event was dequeued for either successfull or failed I/O */
587 key = (pj_ioqueue_key_t*)dwKey;
588 size_status = dwBytesTransfered;
Benny Prijono8d317a02006-03-22 11:49:19 +0000589
590 /* Report to caller regardless */
591 if (p_bytes)
592 *p_bytes = size_status;
593 if (p_key)
594 *p_key = key;
595
Benny Prijono5accbd02006-03-30 16:32:18 +0000596#if PJ_IOQUEUE_HAS_SAFE_UNREG
597 /* We shouldn't call callbacks if key is quitting. */
598 if (key->closing)
Benny Prijono8d317a02006-03-22 11:49:19 +0000599 return PJ_TRUE;
Benny Prijono8d317a02006-03-22 11:49:19 +0000600
Benny Prijono5accbd02006-03-30 16:32:18 +0000601 /* Increment reference counter to prevent this key from being
602 * deleted
Benny Prijono8d317a02006-03-22 11:49:19 +0000603 */
Benny Prijono5accbd02006-03-30 16:32:18 +0000604 pj_atomic_inc(key->ref_count);
605#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000606
607 /* Carry out the callback */
Benny Prijono9033e312005-11-21 02:08:39 +0000608 switch (pOv->operation) {
609 case PJ_IOQUEUE_OP_READ:
610 case PJ_IOQUEUE_OP_RECV:
611 case PJ_IOQUEUE_OP_RECV_FROM:
612 pOv->operation = 0;
613 if (key->cb.on_read_complete)
614 key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv,
615 size_status);
616 break;
617 case PJ_IOQUEUE_OP_WRITE:
618 case PJ_IOQUEUE_OP_SEND:
619 case PJ_IOQUEUE_OP_SEND_TO:
620 pOv->operation = 0;
621 if (key->cb.on_write_complete)
622 key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv,
623 size_status);
624 break;
625#if PJ_HAS_TCP
626 case PJ_IOQUEUE_OP_ACCEPT:
627 /* special case for accept. */
628 ioqueue_on_accept_complete((ioqueue_accept_rec*)pOv);
629 if (key->cb.on_accept_complete) {
630 ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv;
Benny Prijono9c025eb2006-07-10 21:35:27 +0000631 pj_status_t status = PJ_SUCCESS;
632
633 if (accept_rec->newsock == PJ_INVALID_SOCKET) {
634 int dwError = WSAGetLastError();
635 if (dwError == 0) dwError = OSERR_ENOTCONN;
636 status = PJ_RETURN_OS_ERROR(dwError);
637 }
638
Benny Prijono9033e312005-11-21 02:08:39 +0000639 key->cb.on_accept_complete(key,
640 (pj_ioqueue_op_key_t*)pOv,
641 accept_rec->newsock,
Benny Prijono9c025eb2006-07-10 21:35:27 +0000642 status);
Benny Prijono01de33b2006-06-28 15:23:18 +0000643 accept_rec->newsock = PJ_INVALID_SOCKET;
Benny Prijono9033e312005-11-21 02:08:39 +0000644 }
645 break;
646 case PJ_IOQUEUE_OP_CONNECT:
647#endif
648 case PJ_IOQUEUE_OP_NONE:
649 pj_assert(0);
650 break;
651 }
Benny Prijono5accbd02006-03-30 16:32:18 +0000652
653#if PJ_IOQUEUE_HAS_SAFE_UNREG
654 decrement_counter(key);
655#endif
656
Benny Prijono8d317a02006-03-22 11:49:19 +0000657 return PJ_TRUE;
Benny Prijono9033e312005-11-21 02:08:39 +0000658 }
659
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000660 /* No event was queued. */
Benny Prijono8d317a02006-03-22 11:49:19 +0000661 return PJ_FALSE;
662}
663
664/*
665 * pj_ioqueue_unregister()
666 */
667PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
668{
Benny Prijono8d317a02006-03-22 11:49:19 +0000669 PJ_ASSERT_RETURN(key, PJ_EINVAL);
670
671#if PJ_HAS_TCP
672 if (key->connecting) {
673 unsigned pos;
674 pj_ioqueue_t *ioqueue;
675
676 ioqueue = key->ioqueue;
677
678 /* Erase from connecting_handles */
679 pj_lock_acquire(ioqueue->lock);
680 for (pos=0; pos < ioqueue->connecting_count; ++pos) {
681 if (ioqueue->connecting_keys[pos] == key) {
682 erase_connecting_socket(ioqueue, pos);
683 break;
684 }
685 }
686 key->connecting = 0;
687 pj_lock_release(ioqueue->lock);
688 }
689#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000690
691 /* Close handle (the only way to disassociate handle from IOCP).
692 * We also need to close handle to make sure that no further events
693 * will come to the handle.
Benny Prijono8d317a02006-03-22 11:49:19 +0000694 */
Benny Prijono5accbd02006-03-30 16:32:18 +0000695 CloseHandle(key->hnd);
Benny Prijono8d317a02006-03-22 11:49:19 +0000696
Benny Prijono5accbd02006-03-30 16:32:18 +0000697 /* Reset callbacks */
Benny Prijono8d317a02006-03-22 11:49:19 +0000698 key->cb.on_accept_complete = NULL;
699 key->cb.on_connect_complete = NULL;
Benny Prijono5accbd02006-03-30 16:32:18 +0000700 key->cb.on_read_complete = NULL;
701 key->cb.on_write_complete = NULL;
Benny Prijono8d317a02006-03-22 11:49:19 +0000702
Benny Prijono5accbd02006-03-30 16:32:18 +0000703#if PJ_IOQUEUE_HAS_SAFE_UNREG
704 /* Mark key as closing. */
705 key->closing = 1;
Benny Prijono8d317a02006-03-22 11:49:19 +0000706
Benny Prijono5accbd02006-03-30 16:32:18 +0000707 /* Decrement reference counter. */
708 decrement_counter(key);
Benny Prijono8d317a02006-03-22 11:49:19 +0000709
Benny Prijono5accbd02006-03-30 16:32:18 +0000710 /* Even after handle is closed, I suspect that IOCP may still try to
711 * do something with the handle, causing memory corruption when pool
712 * debugging is enabled.
713 *
714 * Forcing context switch seems to have fixed that, but this is quite
715 * an ugly solution..
Benny Prijono8d317a02006-03-22 11:49:19 +0000716 */
Benny Prijono5accbd02006-03-30 16:32:18 +0000717 pj_thread_sleep(0);
718#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000719
720 return PJ_SUCCESS;
721}
722
723/*
724 * pj_ioqueue_poll()
725 *
726 * Poll for events.
727 */
728PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
729{
730 DWORD dwMsec;
731 int connect_count = 0;
Benny Prijono5accbd02006-03-30 16:32:18 +0000732 int event_count = 0;
Benny Prijono8d317a02006-03-22 11:49:19 +0000733
734 PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
735
Benny Prijono8d317a02006-03-22 11:49:19 +0000736 /* Calculate miliseconds timeout for GetQueuedCompletionStatus */
737 dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE;
738
739 /* Poll for completion status. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000740 event_count = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL);
741
742#if PJ_HAS_TCP
743 /* Check the connecting array, only when there's no activity. */
744 if (event_count == 0) {
745 connect_count = check_connecting(ioqueue);
746 if (connect_count > 0)
747 event_count += connect_count;
748 }
749#endif
750
751#if PJ_IOQUEUE_HAS_SAFE_UNREG
752 /* Check the closing keys only when there's no activity and when there are
753 * pending closing keys.
754 */
755 if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) {
756 pj_time_val now;
757 pj_ioqueue_key_t *key;
758
759 pj_gettimeofday(&now);
760
761 /* Move closing keys to free list when they've finished the closing
762 * idle time.
763 */
764 pj_lock_acquire(ioqueue->lock);
765 key = ioqueue->closing_list.next;
766 while (key != &ioqueue->closing_list) {
767 pj_ioqueue_key_t *next = key->next;
768
769 pj_assert(key->closing != 0);
770
771 if (PJ_TIME_VAL_GTE(now, key->free_time)) {
772 pj_list_erase(key);
773 pj_list_push_back(&ioqueue->free_list, key);
774 }
775 key = next;
776 }
777 pj_lock_release(ioqueue->lock);
778 }
779#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000780
781 /* Return number of events. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000782 return event_count;
Benny Prijono9033e312005-11-21 02:08:39 +0000783}
784
785/*
786 * pj_ioqueue_recv()
787 *
788 * Initiate overlapped WSARecv() operation.
789 */
790PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
791 pj_ioqueue_op_key_t *op_key,
792 void *buffer,
793 pj_ssize_t *length,
794 pj_uint32_t flags )
795{
796 /*
797 * Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and
798 * addrlen here. But unfortunately it generates EINVAL... :-(
799 * -bennylp
800 */
801 int rc;
802 DWORD bytesRead;
803 DWORD dwFlags = 0;
804 union operation_key *op_key_rec;
805
806 PJ_CHECK_STACK();
807 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
808
Benny Prijono5accbd02006-03-30 16:32:18 +0000809#if PJ_IOQUEUE_HAS_SAFE_UNREG
810 /* Check key is not closing */
811 if (key->closing)
812 return PJ_ECANCELLED;
813#endif
814
Benny Prijono9033e312005-11-21 02:08:39 +0000815 op_key_rec = (union operation_key*)op_key->internal__;
816 op_key_rec->overlapped.wsabuf.buf = buffer;
817 op_key_rec->overlapped.wsabuf.len = *length;
818
819 dwFlags = flags;
820
821 /* Try non-overlapped received first to see if data is
822 * immediately available.
823 */
824 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
825 rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
826 &bytesRead, &dwFlags, NULL, NULL);
827 if (rc == 0) {
828 *length = bytesRead;
829 return PJ_SUCCESS;
830 } else {
831 DWORD dwError = WSAGetLastError();
832 if (dwError != WSAEWOULDBLOCK) {
833 *length = -1;
834 return PJ_RETURN_OS_ERROR(dwError);
835 }
836 }
837 }
838
839 dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
840
841 /*
842 * No immediate data available.
843 * Register overlapped Recv() operation.
844 */
Benny Prijonoac623b32006-07-03 15:19:31 +0000845 pj_bzero( &op_key_rec->overlapped.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +0000846 sizeof(op_key_rec->overlapped.overlapped));
847 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
848
849 rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
850 &bytesRead, &dwFlags,
851 &op_key_rec->overlapped.overlapped, NULL);
852 if (rc == SOCKET_ERROR) {
853 DWORD dwStatus = WSAGetLastError();
854 if (dwStatus!=WSA_IO_PENDING) {
855 *length = -1;
856 return PJ_STATUS_FROM_OS(dwStatus);
857 }
858 }
859
860 /* Pending operation has been scheduled. */
861 return PJ_EPENDING;
862}
863
864/*
865 * pj_ioqueue_recvfrom()
866 *
867 * Initiate overlapped RecvFrom() operation.
868 */
869PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
870 pj_ioqueue_op_key_t *op_key,
871 void *buffer,
872 pj_ssize_t *length,
873 pj_uint32_t flags,
874 pj_sockaddr_t *addr,
875 int *addrlen)
876{
877 int rc;
878 DWORD bytesRead;
879 DWORD dwFlags = 0;
880 union operation_key *op_key_rec;
881
882 PJ_CHECK_STACK();
883 PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL);
884
Benny Prijono5accbd02006-03-30 16:32:18 +0000885#if PJ_IOQUEUE_HAS_SAFE_UNREG
886 /* Check key is not closing */
887 if (key->closing)
888 return PJ_ECANCELLED;
889#endif
890
Benny Prijono9033e312005-11-21 02:08:39 +0000891 op_key_rec = (union operation_key*)op_key->internal__;
892 op_key_rec->overlapped.wsabuf.buf = buffer;
893 op_key_rec->overlapped.wsabuf.len = *length;
894
895 dwFlags = flags;
896
897 /* Try non-overlapped received first to see if data is
898 * immediately available.
899 */
900 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
901 rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
902 &bytesRead, &dwFlags, addr, addrlen, NULL, NULL);
903 if (rc == 0) {
904 *length = bytesRead;
905 return PJ_SUCCESS;
906 } else {
907 DWORD dwError = WSAGetLastError();
908 if (dwError != WSAEWOULDBLOCK) {
909 *length = -1;
910 return PJ_RETURN_OS_ERROR(dwError);
911 }
912 }
913 }
914
915 dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
916
917 /*
918 * No immediate data available.
919 * Register overlapped Recv() operation.
920 */
Benny Prijonoac623b32006-07-03 15:19:31 +0000921 pj_bzero( &op_key_rec->overlapped.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +0000922 sizeof(op_key_rec->overlapped.overlapped));
923 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
924
925 rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
926 &bytesRead, &dwFlags, addr, addrlen,
927 &op_key_rec->overlapped.overlapped, NULL);
928 if (rc == SOCKET_ERROR) {
929 DWORD dwStatus = WSAGetLastError();
930 if (dwStatus!=WSA_IO_PENDING) {
931 *length = -1;
932 return PJ_STATUS_FROM_OS(dwStatus);
933 }
934 }
935
936 /* Pending operation has been scheduled. */
937 return PJ_EPENDING;
938}
939
940/*
941 * pj_ioqueue_send()
942 *
943 * Initiate overlapped Send operation.
944 */
945PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
946 pj_ioqueue_op_key_t *op_key,
947 const void *data,
948 pj_ssize_t *length,
949 pj_uint32_t flags )
950{
951 return pj_ioqueue_sendto(key, op_key, data, length, flags, NULL, 0);
952}
953
954
955/*
956 * pj_ioqueue_sendto()
957 *
958 * Initiate overlapped SendTo operation.
959 */
960PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
961 pj_ioqueue_op_key_t *op_key,
962 const void *data,
963 pj_ssize_t *length,
964 pj_uint32_t flags,
965 const pj_sockaddr_t *addr,
966 int addrlen)
967{
968 int rc;
969 DWORD bytesWritten;
970 DWORD dwFlags;
971 union operation_key *op_key_rec;
972
973 PJ_CHECK_STACK();
974 PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL);
Benny Prijono5accbd02006-03-30 16:32:18 +0000975
976#if PJ_IOQUEUE_HAS_SAFE_UNREG
977 /* Check key is not closing */
978 if (key->closing)
979 return PJ_ECANCELLED;
980#endif
981
Benny Prijono9033e312005-11-21 02:08:39 +0000982 op_key_rec = (union operation_key*)op_key->internal__;
983
984 /*
985 * First try blocking write.
986 */
987 op_key_rec->overlapped.wsabuf.buf = (void*)data;
988 op_key_rec->overlapped.wsabuf.len = *length;
989
990 dwFlags = flags;
991
992 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
993 rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
994 &bytesWritten, dwFlags, addr, addrlen,
995 NULL, NULL);
996 if (rc == 0) {
997 *length = bytesWritten;
998 return PJ_SUCCESS;
999 } else {
1000 DWORD dwStatus = WSAGetLastError();
1001 if (dwStatus != WSAEWOULDBLOCK) {
1002 *length = -1;
1003 return PJ_RETURN_OS_ERROR(dwStatus);
1004 }
1005 }
1006 }
1007
1008 dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
1009
1010 /*
1011 * Data can't be sent immediately.
1012 * Schedule asynchronous WSASend().
1013 */
Benny Prijonoac623b32006-07-03 15:19:31 +00001014 pj_bzero( &op_key_rec->overlapped.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +00001015 sizeof(op_key_rec->overlapped.overlapped));
1016 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND;
1017
1018 rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1019 &bytesWritten, dwFlags, addr, addrlen,
1020 &op_key_rec->overlapped.overlapped, NULL);
1021 if (rc == SOCKET_ERROR) {
1022 DWORD dwStatus = WSAGetLastError();
1023 if (dwStatus!=WSA_IO_PENDING)
1024 return PJ_STATUS_FROM_OS(dwStatus);
1025 }
1026
1027 /* Asynchronous operation successfully submitted. */
1028 return PJ_EPENDING;
1029}
1030
1031#if PJ_HAS_TCP
1032
1033/*
1034 * pj_ioqueue_accept()
1035 *
1036 * Initiate overlapped accept() operation.
1037 */
1038PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
1039 pj_ioqueue_op_key_t *op_key,
1040 pj_sock_t *new_sock,
1041 pj_sockaddr_t *local,
1042 pj_sockaddr_t *remote,
1043 int *addrlen)
1044{
1045 BOOL rc;
1046 DWORD bytesReceived;
1047 pj_status_t status;
1048 union operation_key *op_key_rec;
1049 SOCKET sock;
1050
1051 PJ_CHECK_STACK();
1052 PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
1053
Benny Prijono5accbd02006-03-30 16:32:18 +00001054#if PJ_IOQUEUE_HAS_SAFE_UNREG
1055 /* Check key is not closing */
1056 if (key->closing)
1057 return PJ_ECANCELLED;
1058#endif
1059
Benny Prijono9033e312005-11-21 02:08:39 +00001060 /*
1061 * See if there is a new connection immediately available.
1062 */
1063 sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0);
1064 if (sock != INVALID_SOCKET) {
1065 /* Yes! New socket is available! */
1066 int status;
1067
1068 status = getsockname(sock, local, addrlen);
1069 if (status != 0) {
1070 DWORD dwError = WSAGetLastError();
1071 closesocket(sock);
1072 return PJ_RETURN_OS_ERROR(dwError);
1073 }
1074
1075 *new_sock = sock;
1076 return PJ_SUCCESS;
1077
1078 } else {
1079 DWORD dwError = WSAGetLastError();
1080 if (dwError != WSAEWOULDBLOCK) {
1081 return PJ_RETURN_OS_ERROR(dwError);
1082 }
1083 }
1084
1085 /*
1086 * No connection is immediately available.
1087 * Must schedule an asynchronous operation.
1088 */
1089 op_key_rec = (union operation_key*)op_key->internal__;
1090
1091 status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0,
1092 &op_key_rec->accept.newsock);
1093 if (status != PJ_SUCCESS)
1094 return status;
1095
1096 /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket
1097 * addresses can be obtained with getsockname() and getpeername().
1098 */
1099 status = setsockopt(op_key_rec->accept.newsock, SOL_SOCKET,
1100 SO_UPDATE_ACCEPT_CONTEXT,
1101 (char*)&key->hnd, sizeof(SOCKET));
1102 /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later.
1103 * So ignore the error status.
1104 */
1105
1106 op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT;
1107 op_key_rec->accept.addrlen = addrlen;
1108 op_key_rec->accept.local = local;
1109 op_key_rec->accept.remote = remote;
1110 op_key_rec->accept.newsock_ptr = new_sock;
Benny Prijonoac623b32006-07-03 15:19:31 +00001111 pj_bzero( &op_key_rec->accept.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +00001112 sizeof(op_key_rec->accept.overlapped));
1113
1114 rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)op_key_rec->accept.newsock,
1115 op_key_rec->accept.accept_buf,
1116 0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN,
1117 &bytesReceived,
1118 &op_key_rec->accept.overlapped );
1119
1120 if (rc == TRUE) {
1121 ioqueue_on_accept_complete(&op_key_rec->accept);
1122 return PJ_SUCCESS;
1123 } else {
1124 DWORD dwStatus = WSAGetLastError();
1125 if (dwStatus!=WSA_IO_PENDING)
1126 return PJ_STATUS_FROM_OS(dwStatus);
1127 }
1128
1129 /* Asynchronous Accept() has been submitted. */
1130 return PJ_EPENDING;
1131}
1132
1133
1134/*
1135 * pj_ioqueue_connect()
1136 *
1137 * Initiate overlapped connect() operation (well, it's non-blocking actually,
1138 * since there's no overlapped version of connect()).
1139 */
1140PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
1141 const pj_sockaddr_t *addr,
1142 int addrlen )
1143{
1144 HANDLE hEvent;
1145 pj_ioqueue_t *ioqueue;
1146
1147 PJ_CHECK_STACK();
1148 PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
1149
Benny Prijono5accbd02006-03-30 16:32:18 +00001150#if PJ_IOQUEUE_HAS_SAFE_UNREG
1151 /* Check key is not closing */
1152 if (key->closing)
1153 return PJ_ECANCELLED;
1154#endif
1155
Benny Prijono9033e312005-11-21 02:08:39 +00001156 /* Initiate connect() */
1157 if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) {
1158 DWORD dwStatus;
1159 dwStatus = WSAGetLastError();
1160 if (dwStatus != WSAEWOULDBLOCK) {
1161 return PJ_RETURN_OS_ERROR(dwStatus);
1162 }
1163 } else {
1164 /* Connect has completed immediately! */
1165 return PJ_SUCCESS;
1166 }
1167
1168 ioqueue = key->ioqueue;
1169
1170 /* Add to the array of connecting socket to be polled */
1171 pj_lock_acquire(ioqueue->lock);
1172
1173 if (ioqueue->connecting_count >= MAXIMUM_WAIT_OBJECTS) {
1174 pj_lock_release(ioqueue->lock);
1175 return PJ_ETOOMANYCONN;
1176 }
1177
1178 /* Get or create event object. */
1179 if (ioqueue->event_count) {
1180 hEvent = ioqueue->event_pool[ioqueue->event_count - 1];
1181 --ioqueue->event_count;
1182 } else {
1183 hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
1184 if (hEvent == NULL) {
1185 DWORD dwStatus = GetLastError();
1186 pj_lock_release(ioqueue->lock);
1187 return PJ_STATUS_FROM_OS(dwStatus);
1188 }
1189 }
1190
1191 /* Mark key as connecting.
1192 * We can't use array index since key can be removed dynamically.
1193 */
1194 key->connecting = 1;
1195
1196 /* Associate socket events to the event object. */
1197 if (WSAEventSelect((pj_sock_t)key->hnd, hEvent, FD_CONNECT) != 0) {
1198 CloseHandle(hEvent);
1199 pj_lock_release(ioqueue->lock);
1200 return PJ_RETURN_OS_ERROR(WSAGetLastError());
1201 }
1202
1203 /* Add to array. */
1204 ioqueue->connecting_keys[ ioqueue->connecting_count ] = key;
1205 ioqueue->connecting_handles[ ioqueue->connecting_count ] = hEvent;
1206 ioqueue->connecting_count++;
1207
1208 pj_lock_release(ioqueue->lock);
1209
1210 return PJ_EPENDING;
1211}
1212#endif /* #if PJ_HAS_TCP */
1213
1214
1215PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
1216 pj_size_t size )
1217{
Benny Prijonoac623b32006-07-03 15:19:31 +00001218 pj_bzero(op_key, size);
Benny Prijono9033e312005-11-21 02:08:39 +00001219}
1220
1221PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
1222 pj_ioqueue_op_key_t *op_key )
1223{
1224 BOOL rc;
1225 DWORD bytesTransfered;
1226
1227 rc = GetOverlappedResult( key->hnd, (LPOVERLAPPED)op_key,
1228 &bytesTransfered, FALSE );
1229
1230 if (rc == FALSE) {
1231 return GetLastError()==ERROR_IO_INCOMPLETE;
1232 }
1233
1234 return FALSE;
1235}
1236
1237
1238PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
1239 pj_ioqueue_op_key_t *op_key,
1240 pj_ssize_t bytes_status )
1241{
1242 BOOL rc;
1243
1244 rc = PostQueuedCompletionStatus(key->ioqueue->iocp, bytes_status,
1245 (long)key, (OVERLAPPED*)op_key );
1246 if (rc == FALSE) {
1247 return PJ_RETURN_OS_ERROR(GetLastError());
1248 }
1249
1250 return PJ_SUCCESS;
1251}
1252