blob: ce8cd5d94b67c7ef2f0a10c9b72ee4ce7166f81f [file] [log] [blame]
Benny Prijono9033e312005-11-21 02:08:39 +00001/* $Id$ */
2/*
Benny Prijonoa771a512007-02-19 01:13:53 +00003 * Copyright (C)2003-2007 Benny Prijono <benny@prijono.org>
Benny Prijono9033e312005-11-21 02:08:39 +00004 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 */
19#include <pj/ioqueue.h>
20#include <pj/os.h>
21#include <pj/lock.h>
22#include <pj/pool.h>
23#include <pj/string.h>
24#include <pj/sock.h>
25#include <pj/array.h>
26#include <pj/log.h>
27#include <pj/assert.h>
28#include <pj/errno.h>
Benny Prijono9c025eb2006-07-10 21:35:27 +000029#include <pj/compat/socket.h>
Benny Prijono9033e312005-11-21 02:08:39 +000030
31
32#if defined(PJ_HAS_WINSOCK2_H) && PJ_HAS_WINSOCK2_H != 0
33# include <winsock2.h>
34#elif defined(PJ_HAS_WINSOCK_H) && PJ_HAS_WINSOCK_H != 0
35# include <winsock.h>
36#endif
37
38#if defined(PJ_HAS_MSWSOCK_H) && PJ_HAS_MSWSOCK_H != 0
39# include <mswsock.h>
40#endif
41
42
/* Size of each address area handed to AcceptEx()/GetAcceptExSockaddrs():
 * the socket address size plus the 16 extra bytes that AcceptEx() requires
 * (source: MSDN).
 */
#define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in) + 16)
47
48typedef struct generic_overlapped
49{
50 WSAOVERLAPPED overlapped;
51 pj_ioqueue_operation_e operation;
52} generic_overlapped;
53
54/*
55 * OVERLAPPPED structure for send and receive.
56 */
57typedef struct ioqueue_overlapped
58{
59 WSAOVERLAPPED overlapped;
60 pj_ioqueue_operation_e operation;
61 WSABUF wsabuf;
62 pj_sockaddr_in dummy_addr;
63 int dummy_addrlen;
64} ioqueue_overlapped;
65
66#if PJ_HAS_TCP
67/*
68 * OVERLAP structure for accept.
69 */
70typedef struct ioqueue_accept_rec
71{
72 WSAOVERLAPPED overlapped;
73 pj_ioqueue_operation_e operation;
74 pj_sock_t newsock;
75 pj_sock_t *newsock_ptr;
76 int *addrlen;
77 void *remote;
78 void *local;
79 char accept_buf[2 * ACCEPT_ADDR_LEN];
80} ioqueue_accept_rec;
81#endif
82
83/*
84 * Structure to hold pending operation key.
85 */
86union operation_key
87{
88 generic_overlapped generic;
89 ioqueue_overlapped overlapped;
90#if PJ_HAS_TCP
91 ioqueue_accept_rec accept;
92#endif
93};
94
/* Kind of OS handle stored in a key's hnd member. */
enum handle_type
{
    HND_IS_UNKNOWN,     /* Not classified.                                  */
    HND_IS_FILE,        /* File handle.                                     */
    HND_IS_SOCKET       /* Socket (set by pj_ioqueue_register_sock()).      */
};
102
/* Magic length value. NOTE(review): not referenced in the visible part of
 * this file; presumably marks a posted "quit" completion - confirm against
 * its users.
 */
enum
{
    POST_QUIT_LEN = 0xFFFFDEADUL
};
104
Benny Prijono9033e312005-11-21 02:08:39 +0000105/*
106 * Structure for individual socket.
107 */
108struct pj_ioqueue_key_t
109{
Benny Prijono5accbd02006-03-30 16:32:18 +0000110 PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);
111
Benny Prijono9033e312005-11-21 02:08:39 +0000112 pj_ioqueue_t *ioqueue;
113 HANDLE hnd;
114 void *user_data;
115 enum handle_type hnd_type;
Benny Prijono5accbd02006-03-30 16:32:18 +0000116 pj_ioqueue_callback cb;
117
Benny Prijono9033e312005-11-21 02:08:39 +0000118#if PJ_HAS_TCP
119 int connecting;
120#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000121
122#if PJ_IOQUEUE_HAS_SAFE_UNREG
123 pj_atomic_t *ref_count;
124 pj_bool_t closing;
125 pj_time_val free_time;
126#endif
127
Benny Prijono9033e312005-11-21 02:08:39 +0000128};
129
130/*
131 * IO Queue structure.
132 */
133struct pj_ioqueue_t
134{
135 HANDLE iocp;
136 pj_lock_t *lock;
137 pj_bool_t auto_delete_lock;
Benny Prijono5accbd02006-03-30 16:32:18 +0000138
139#if PJ_IOQUEUE_HAS_SAFE_UNREG
140 pj_ioqueue_key_t active_list;
141 pj_ioqueue_key_t free_list;
142 pj_ioqueue_key_t closing_list;
143#endif
144
145 /* These are to keep track of connecting sockets */
146#if PJ_HAS_TCP
Benny Prijono9033e312005-11-21 02:08:39 +0000147 unsigned event_count;
148 HANDLE event_pool[MAXIMUM_WAIT_OBJECTS+1];
Benny Prijono9033e312005-11-21 02:08:39 +0000149 unsigned connecting_count;
150 HANDLE connecting_handles[MAXIMUM_WAIT_OBJECTS+1];
151 pj_ioqueue_key_t *connecting_keys[MAXIMUM_WAIT_OBJECTS+1];
152#endif
153};
154
155
156#if PJ_HAS_TCP
157/*
158 * Process the socket when the overlapped accept() completed.
159 */
160static void ioqueue_on_accept_complete(ioqueue_accept_rec *accept_overlapped)
161{
162 struct sockaddr *local;
163 struct sockaddr *remote;
164 int locallen, remotelen;
165
166 PJ_CHECK_STACK();
167
168 /* Operation complete immediately. */
169 GetAcceptExSockaddrs( accept_overlapped->accept_buf,
170 0,
171 ACCEPT_ADDR_LEN,
172 ACCEPT_ADDR_LEN,
173 &local,
174 &locallen,
175 &remote,
176 &remotelen);
Benny Prijono9c025eb2006-07-10 21:35:27 +0000177 if (*accept_overlapped->addrlen >= locallen) {
Benny Prijono9033e312005-11-21 02:08:39 +0000178 pj_memcpy(accept_overlapped->local, local, locallen);
179 pj_memcpy(accept_overlapped->remote, remote, locallen);
180 } else {
Benny Prijonoac623b32006-07-03 15:19:31 +0000181 pj_bzero(accept_overlapped->local, *accept_overlapped->addrlen);
182 pj_bzero(accept_overlapped->remote, *accept_overlapped->addrlen);
Benny Prijono9033e312005-11-21 02:08:39 +0000183 }
184 *accept_overlapped->addrlen = locallen;
185 if (accept_overlapped->newsock_ptr)
186 *accept_overlapped->newsock_ptr = accept_overlapped->newsock;
187 accept_overlapped->operation = 0;
Benny Prijono9033e312005-11-21 02:08:39 +0000188}
189
190static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos)
191{
192 pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos];
193 HANDLE hEvent = ioqueue->connecting_handles[pos];
194
195 /* Remove key from array of connecting handles. */
196 pj_array_erase(ioqueue->connecting_keys, sizeof(key),
197 ioqueue->connecting_count, pos);
198 pj_array_erase(ioqueue->connecting_handles, sizeof(HANDLE),
199 ioqueue->connecting_count, pos);
200 --ioqueue->connecting_count;
201
202 /* Disassociate the socket from the event. */
203 WSAEventSelect((pj_sock_t)key->hnd, hEvent, 0);
204
205 /* Put event object to pool. */
206 if (ioqueue->event_count < MAXIMUM_WAIT_OBJECTS) {
207 ioqueue->event_pool[ioqueue->event_count++] = hEvent;
208 } else {
209 /* Shouldn't happen. There should be no more pending connections
210 * than max.
211 */
212 pj_assert(0);
213 CloseHandle(hEvent);
214 }
215
216}
217
218/*
219 * Poll for the completion of non-blocking connect().
220 * If there's a completion, the function return the key of the completed
221 * socket, and 'result' argument contains the connect() result. If connect()
222 * succeeded, 'result' will have value zero, otherwise will have the error
223 * code.
224 */
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000225static int check_connecting( pj_ioqueue_t *ioqueue )
Benny Prijono9033e312005-11-21 02:08:39 +0000226{
Benny Prijono9033e312005-11-21 02:08:39 +0000227 if (ioqueue->connecting_count) {
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000228 int i, count;
229 struct
230 {
231 pj_ioqueue_key_t *key;
232 pj_status_t status;
233 } events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1];
Benny Prijono9033e312005-11-21 02:08:39 +0000234
235 pj_lock_acquire(ioqueue->lock);
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000236 for (count=0; count<PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1; ++count) {
237 DWORD result;
Benny Prijono9033e312005-11-21 02:08:39 +0000238
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000239 result = WaitForMultipleObjects(ioqueue->connecting_count,
240 ioqueue->connecting_handles,
241 FALSE, 0);
242 if (result >= WAIT_OBJECT_0 &&
243 result < WAIT_OBJECT_0+ioqueue->connecting_count)
244 {
245 WSANETWORKEVENTS net_events;
Benny Prijono9033e312005-11-21 02:08:39 +0000246
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000247 /* Got completed connect(). */
248 unsigned pos = result - WAIT_OBJECT_0;
249 events[count].key = ioqueue->connecting_keys[pos];
Benny Prijono9033e312005-11-21 02:08:39 +0000250
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000251 /* See whether connect has succeeded. */
252 WSAEnumNetworkEvents((pj_sock_t)events[count].key->hnd,
253 ioqueue->connecting_handles[pos],
254 &net_events);
255 events[count].status =
256 PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]);
257
258 /* Erase socket from pending connect. */
259 erase_connecting_socket(ioqueue, pos);
260 } else {
261 /* No more events */
262 break;
263 }
Benny Prijono9033e312005-11-21 02:08:39 +0000264 }
265 pj_lock_release(ioqueue->lock);
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000266
267 /* Call callbacks. */
268 for (i=0; i<count; ++i) {
269 if (events[i].key->cb.on_connect_complete) {
270 events[i].key->cb.on_connect_complete(events[i].key,
271 events[i].status);
272 }
273 }
274
275 return count;
Benny Prijono9033e312005-11-21 02:08:39 +0000276 }
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000277
278 return 0;
279
Benny Prijono9033e312005-11-21 02:08:39 +0000280}
281#endif
282
283/*
284 * pj_ioqueue_name()
285 */
286PJ_DEF(const char*) pj_ioqueue_name(void)
287{
288 return "iocp";
289}
290
291/*
292 * pj_ioqueue_create()
293 */
294PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
295 pj_size_t max_fd,
296 pj_ioqueue_t **p_ioqueue)
297{
298 pj_ioqueue_t *ioqueue;
Benny Prijono5accbd02006-03-30 16:32:18 +0000299 unsigned i;
Benny Prijono9033e312005-11-21 02:08:39 +0000300 pj_status_t rc;
301
302 PJ_UNUSED_ARG(max_fd);
303 PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL);
304
305 rc = sizeof(union operation_key);
306
307 /* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */
308 PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
309 sizeof(union operation_key), PJ_EBUG);
310
Benny Prijono5accbd02006-03-30 16:32:18 +0000311 /* Create IOCP */
Benny Prijono9033e312005-11-21 02:08:39 +0000312 ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue));
313 ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
314 if (ioqueue->iocp == NULL)
315 return PJ_RETURN_OS_ERROR(GetLastError());
316
Benny Prijono5accbd02006-03-30 16:32:18 +0000317 /* Create IOCP mutex */
Benny Prijono9033e312005-11-21 02:08:39 +0000318 rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock);
319 if (rc != PJ_SUCCESS) {
320 CloseHandle(ioqueue->iocp);
321 return rc;
322 }
323
324 ioqueue->auto_delete_lock = PJ_TRUE;
325
Benny Prijono5accbd02006-03-30 16:32:18 +0000326#if PJ_IOQUEUE_HAS_SAFE_UNREG
327 /*
328 * Create and initialize key pools.
329 */
330 pj_list_init(&ioqueue->active_list);
331 pj_list_init(&ioqueue->free_list);
332 pj_list_init(&ioqueue->closing_list);
333
334 /* Preallocate keys according to max_fd setting, and put them
335 * in free_list.
336 */
337 for (i=0; i<max_fd; ++i) {
338 pj_ioqueue_key_t *key;
339
340 key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t));
341
342 rc = pj_atomic_create(pool, 0, &key->ref_count);
343 if (rc != PJ_SUCCESS) {
344 key = ioqueue->free_list.next;
345 while (key != &ioqueue->free_list) {
346 pj_atomic_destroy(key->ref_count);
347 key = key->next;
348 }
349 CloseHandle(ioqueue->iocp);
350 return rc;
351 }
352
353 pj_list_push_back(&ioqueue->free_list, key);
354
355 }
356#endif
357
Benny Prijono9033e312005-11-21 02:08:39 +0000358 *p_ioqueue = ioqueue;
359
360 PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue));
361 return PJ_SUCCESS;
362}
363
364/*
365 * pj_ioqueue_destroy()
366 */
367PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue )
368{
369 unsigned i;
Benny Prijono5accbd02006-03-30 16:32:18 +0000370 pj_ioqueue_key_t *key;
Benny Prijono9033e312005-11-21 02:08:39 +0000371
372 PJ_CHECK_STACK();
373 PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
374
Benny Prijono5accbd02006-03-30 16:32:18 +0000375 pj_lock_acquire(ioqueue->lock);
376
377#if PJ_HAS_TCP
Benny Prijono9033e312005-11-21 02:08:39 +0000378 /* Destroy events in the pool */
379 for (i=0; i<ioqueue->event_count; ++i) {
380 CloseHandle(ioqueue->event_pool[i]);
381 }
382 ioqueue->event_count = 0;
Benny Prijono5accbd02006-03-30 16:32:18 +0000383#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000384
385 if (CloseHandle(ioqueue->iocp) != TRUE)
386 return PJ_RETURN_OS_ERROR(GetLastError());
387
Benny Prijono5accbd02006-03-30 16:32:18 +0000388#if PJ_IOQUEUE_HAS_SAFE_UNREG
389 /* Destroy reference counters */
390 key = ioqueue->active_list.next;
391 while (key != &ioqueue->active_list) {
392 pj_atomic_destroy(key->ref_count);
393 key = key->next;
394 }
395
396 key = ioqueue->closing_list.next;
397 while (key != &ioqueue->closing_list) {
398 pj_atomic_destroy(key->ref_count);
399 key = key->next;
400 }
401
402 key = ioqueue->free_list.next;
403 while (key != &ioqueue->free_list) {
404 pj_atomic_destroy(key->ref_count);
405 key = key->next;
406 }
407#endif
408
Benny Prijono9033e312005-11-21 02:08:39 +0000409 if (ioqueue->auto_delete_lock)
410 pj_lock_destroy(ioqueue->lock);
411
412 return PJ_SUCCESS;
413}
414
415/*
416 * pj_ioqueue_set_lock()
417 */
418PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
419 pj_lock_t *lock,
420 pj_bool_t auto_delete )
421{
422 PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
423
424 if (ioqueue->auto_delete_lock) {
425 pj_lock_destroy(ioqueue->lock);
426 }
427
428 ioqueue->lock = lock;
429 ioqueue->auto_delete_lock = auto_delete;
430
431 return PJ_SUCCESS;
432}
433
434/*
435 * pj_ioqueue_register_sock()
436 */
437PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
438 pj_ioqueue_t *ioqueue,
439 pj_sock_t sock,
440 void *user_data,
441 const pj_ioqueue_callback *cb,
442 pj_ioqueue_key_t **key )
443{
444 HANDLE hioq;
445 pj_ioqueue_key_t *rec;
446 u_long value;
447 int rc;
448
449 PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL);
450
Benny Prijono5accbd02006-03-30 16:32:18 +0000451 pj_lock_acquire(ioqueue->lock);
452
453#if PJ_IOQUEUE_HAS_SAFE_UNREG
454 /* If safe unregistration is used, then get the key record from
455 * the free list.
456 */
457 if (pj_list_empty(&ioqueue->free_list)) {
458 pj_lock_release(ioqueue->lock);
459 return PJ_ETOOMANY;
460 }
461
462 rec = ioqueue->free_list.next;
463 pj_list_erase(rec);
464
465 /* Set initial reference count to 1 */
466 pj_assert(pj_atomic_get(rec->ref_count) == 0);
467 pj_atomic_inc(rec->ref_count);
468
469 rec->closing = 0;
470
471#else
Benny Prijono9033e312005-11-21 02:08:39 +0000472 rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
Benny Prijono5accbd02006-03-30 16:32:18 +0000473#endif
474
475 /* Build the key for this socket. */
Benny Prijono9033e312005-11-21 02:08:39 +0000476 rec->ioqueue = ioqueue;
477 rec->hnd = (HANDLE)sock;
478 rec->hnd_type = HND_IS_SOCKET;
479 rec->user_data = user_data;
480 pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback));
481
Benny Prijono5accbd02006-03-30 16:32:18 +0000482#if PJ_HAS_TCP
483 rec->connecting = 0;
484#endif
485
Benny Prijono9033e312005-11-21 02:08:39 +0000486 /* Set socket to nonblocking. */
487 value = 1;
488 rc = ioctlsocket(sock, FIONBIO, &value);
489 if (rc != 0) {
Benny Prijono5accbd02006-03-30 16:32:18 +0000490 pj_lock_release(ioqueue->lock);
Benny Prijono9033e312005-11-21 02:08:39 +0000491 return PJ_RETURN_OS_ERROR(WSAGetLastError());
492 }
493
494 /* Associate with IOCP */
495 hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0);
496 if (!hioq) {
Benny Prijono5accbd02006-03-30 16:32:18 +0000497 pj_lock_release(ioqueue->lock);
Benny Prijono9033e312005-11-21 02:08:39 +0000498 return PJ_RETURN_OS_ERROR(GetLastError());
499 }
500
501 *key = rec;
Benny Prijono5accbd02006-03-30 16:32:18 +0000502
503#if PJ_IOQUEUE_HAS_SAFE_UNREG
504 pj_list_push_back(&ioqueue->active_list, rec);
505#endif
506
507 pj_lock_release(ioqueue->lock);
508
Benny Prijono9033e312005-11-21 02:08:39 +0000509 return PJ_SUCCESS;
510}
511
Benny Prijono9033e312005-11-21 02:08:39 +0000512
513/*
514 * pj_ioqueue_get_user_data()
515 */
516PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
517{
518 PJ_ASSERT_RETURN(key, NULL);
519 return key->user_data;
520}
521
522/*
523 * pj_ioqueue_set_user_data()
524 */
525PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
526 void *user_data,
527 void **old_data )
528{
529 PJ_ASSERT_RETURN(key, PJ_EINVAL);
530
531 if (old_data)
532 *old_data = key->user_data;
533
534 key->user_data = user_data;
535 return PJ_SUCCESS;
536}
537
Benny Prijono8d317a02006-03-22 11:49:19 +0000538
Benny Prijono5accbd02006-03-30 16:32:18 +0000539#if PJ_IOQUEUE_HAS_SAFE_UNREG
540/* Decrement the key's reference counter, and when the counter reach zero,
541 * destroy the key.
542 */
543static void decrement_counter(pj_ioqueue_key_t *key)
544{
545 if (pj_atomic_dec_and_get(key->ref_count) == 0) {
546
547 pj_lock_acquire(key->ioqueue->lock);
548
549 pj_assert(key->closing == 1);
550 pj_gettimeofday(&key->free_time);
551 key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
552 pj_time_val_normalize(&key->free_time);
553
554 pj_list_erase(key);
555 pj_list_push_back(&key->ioqueue->closing_list, key);
556
557 pj_lock_release(key->ioqueue->lock);
558 }
559}
560#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000561
Benny Prijono9033e312005-11-21 02:08:39 +0000562/*
Benny Prijono5accbd02006-03-30 16:32:18 +0000563 * Poll the I/O Completion Port, execute callback,
Benny Prijono8d317a02006-03-22 11:49:19 +0000564 * and return the key and bytes transfered of the last operation.
Benny Prijono9033e312005-11-21 02:08:39 +0000565 */
Benny Prijono8d317a02006-03-22 11:49:19 +0000566static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
567 pj_ssize_t *p_bytes, pj_ioqueue_key_t **p_key )
Benny Prijono9033e312005-11-21 02:08:39 +0000568{
Benny Prijono8d317a02006-03-22 11:49:19 +0000569 DWORD dwBytesTransfered, dwKey;
Benny Prijono9033e312005-11-21 02:08:39 +0000570 generic_overlapped *pOv;
571 pj_ioqueue_key_t *key;
Benny Prijono4f2be312005-11-21 17:01:06 +0000572 pj_ssize_t size_status = -1;
Benny Prijono8d317a02006-03-22 11:49:19 +0000573 BOOL rcGetQueued;
Benny Prijono9033e312005-11-21 02:08:39 +0000574
575 /* Poll for completion status. */
Benny Prijono8d317a02006-03-22 11:49:19 +0000576 rcGetQueued = GetQueuedCompletionStatus(hIocp, &dwBytesTransfered,
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000577 &dwKey, (OVERLAPPED**)&pOv,
Benny Prijono8d317a02006-03-22 11:49:19 +0000578 dwTimeout);
Benny Prijono9033e312005-11-21 02:08:39 +0000579
580 /* The return value is:
581 * - nonzero if event was dequeued.
582 * - zero and pOv==NULL if no event was dequeued.
583 * - zero and pOv!=NULL if event for failed I/O was dequeued.
584 */
585 if (pOv) {
586 /* Event was dequeued for either successfull or failed I/O */
587 key = (pj_ioqueue_key_t*)dwKey;
588 size_status = dwBytesTransfered;
Benny Prijono8d317a02006-03-22 11:49:19 +0000589
590 /* Report to caller regardless */
591 if (p_bytes)
592 *p_bytes = size_status;
593 if (p_key)
594 *p_key = key;
595
Benny Prijono5accbd02006-03-30 16:32:18 +0000596#if PJ_IOQUEUE_HAS_SAFE_UNREG
597 /* We shouldn't call callbacks if key is quitting. */
598 if (key->closing)
Benny Prijono8d317a02006-03-22 11:49:19 +0000599 return PJ_TRUE;
Benny Prijono8d317a02006-03-22 11:49:19 +0000600
Benny Prijono5accbd02006-03-30 16:32:18 +0000601 /* Increment reference counter to prevent this key from being
602 * deleted
Benny Prijono8d317a02006-03-22 11:49:19 +0000603 */
Benny Prijono5accbd02006-03-30 16:32:18 +0000604 pj_atomic_inc(key->ref_count);
605#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000606
607 /* Carry out the callback */
Benny Prijono9033e312005-11-21 02:08:39 +0000608 switch (pOv->operation) {
609 case PJ_IOQUEUE_OP_READ:
610 case PJ_IOQUEUE_OP_RECV:
611 case PJ_IOQUEUE_OP_RECV_FROM:
612 pOv->operation = 0;
613 if (key->cb.on_read_complete)
614 key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv,
615 size_status);
616 break;
617 case PJ_IOQUEUE_OP_WRITE:
618 case PJ_IOQUEUE_OP_SEND:
619 case PJ_IOQUEUE_OP_SEND_TO:
620 pOv->operation = 0;
621 if (key->cb.on_write_complete)
622 key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv,
623 size_status);
624 break;
625#if PJ_HAS_TCP
626 case PJ_IOQUEUE_OP_ACCEPT:
627 /* special case for accept. */
628 ioqueue_on_accept_complete((ioqueue_accept_rec*)pOv);
629 if (key->cb.on_accept_complete) {
630 ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv;
Benny Prijono9c025eb2006-07-10 21:35:27 +0000631 pj_status_t status = PJ_SUCCESS;
632
633 if (accept_rec->newsock == PJ_INVALID_SOCKET) {
634 int dwError = WSAGetLastError();
635 if (dwError == 0) dwError = OSERR_ENOTCONN;
636 status = PJ_RETURN_OS_ERROR(dwError);
637 }
638
Benny Prijono9033e312005-11-21 02:08:39 +0000639 key->cb.on_accept_complete(key,
640 (pj_ioqueue_op_key_t*)pOv,
641 accept_rec->newsock,
Benny Prijono9c025eb2006-07-10 21:35:27 +0000642 status);
Benny Prijono01de33b2006-06-28 15:23:18 +0000643 accept_rec->newsock = PJ_INVALID_SOCKET;
Benny Prijono9033e312005-11-21 02:08:39 +0000644 }
645 break;
646 case PJ_IOQUEUE_OP_CONNECT:
647#endif
648 case PJ_IOQUEUE_OP_NONE:
649 pj_assert(0);
650 break;
651 }
Benny Prijono5accbd02006-03-30 16:32:18 +0000652
653#if PJ_IOQUEUE_HAS_SAFE_UNREG
654 decrement_counter(key);
655#endif
656
Benny Prijono8d317a02006-03-22 11:49:19 +0000657 return PJ_TRUE;
Benny Prijono9033e312005-11-21 02:08:39 +0000658 }
659
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000660 /* No event was queued. */
Benny Prijono8d317a02006-03-22 11:49:19 +0000661 return PJ_FALSE;
662}
663
664/*
665 * pj_ioqueue_unregister()
666 */
667PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
668{
Benny Prijono9fe1ad22007-02-17 00:30:24 +0000669 unsigned i;
670 enum { RETRY = 10 };
671
Benny Prijono8d317a02006-03-22 11:49:19 +0000672 PJ_ASSERT_RETURN(key, PJ_EINVAL);
673
674#if PJ_HAS_TCP
675 if (key->connecting) {
676 unsigned pos;
677 pj_ioqueue_t *ioqueue;
678
679 ioqueue = key->ioqueue;
680
681 /* Erase from connecting_handles */
682 pj_lock_acquire(ioqueue->lock);
683 for (pos=0; pos < ioqueue->connecting_count; ++pos) {
684 if (ioqueue->connecting_keys[pos] == key) {
685 erase_connecting_socket(ioqueue, pos);
686 break;
687 }
688 }
689 key->connecting = 0;
690 pj_lock_release(ioqueue->lock);
691 }
692#endif
Benny Prijono08beac62006-11-23 07:31:27 +0000693
694#if PJ_IOQUEUE_HAS_SAFE_UNREG
695 /* Mark key as closing before closing handle. */
696 key->closing = 1;
697#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000698
699 /* Close handle (the only way to disassociate handle from IOCP).
700 * We also need to close handle to make sure that no further events
701 * will come to the handle.
Benny Prijono8d317a02006-03-22 11:49:19 +0000702 */
Benny Prijono5accbd02006-03-30 16:32:18 +0000703 CloseHandle(key->hnd);
Benny Prijono8d317a02006-03-22 11:49:19 +0000704
Benny Prijono5accbd02006-03-30 16:32:18 +0000705 /* Reset callbacks */
Benny Prijono8d317a02006-03-22 11:49:19 +0000706 key->cb.on_accept_complete = NULL;
707 key->cb.on_connect_complete = NULL;
Benny Prijono5accbd02006-03-30 16:32:18 +0000708 key->cb.on_read_complete = NULL;
709 key->cb.on_write_complete = NULL;
Benny Prijono8d317a02006-03-22 11:49:19 +0000710
Benny Prijono5accbd02006-03-30 16:32:18 +0000711#if PJ_IOQUEUE_HAS_SAFE_UNREG
Benny Prijono5accbd02006-03-30 16:32:18 +0000712 /* Even after handle is closed, I suspect that IOCP may still try to
713 * do something with the handle, causing memory corruption when pool
714 * debugging is enabled.
715 *
716 * Forcing context switch seems to have fixed that, but this is quite
717 * an ugly solution..
Benny Prijono8d317a02006-03-22 11:49:19 +0000718 */
Benny Prijono9fe1ad22007-02-17 00:30:24 +0000719 //This will loop forever if unregistration is done on the callback.
720 //Doing this with RETRY I think should solve the IOCP setting the
721 //socket signalled, without causing the deadlock.
722 //while (pj_atomic_get(key->ref_count) != 1)
723 // pj_thread_sleep(0);
724 for (i=0; pj_atomic_get(key->ref_count) != 1 && i<RETRY; ++i)
Benny Prijono08beac62006-11-23 07:31:27 +0000725 pj_thread_sleep(0);
726
727 /* Decrement reference counter to destroy the key. */
728 decrement_counter(key);
Benny Prijono5accbd02006-03-30 16:32:18 +0000729#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000730
731 return PJ_SUCCESS;
732}
733
734/*
735 * pj_ioqueue_poll()
736 *
737 * Poll for events.
738 */
739PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
740{
741 DWORD dwMsec;
742 int connect_count = 0;
Benny Prijono5accbd02006-03-30 16:32:18 +0000743 int event_count = 0;
Benny Prijono8d317a02006-03-22 11:49:19 +0000744
745 PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
746
Benny Prijono8d317a02006-03-22 11:49:19 +0000747 /* Calculate miliseconds timeout for GetQueuedCompletionStatus */
748 dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE;
749
750 /* Poll for completion status. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000751 event_count = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL);
752
753#if PJ_HAS_TCP
754 /* Check the connecting array, only when there's no activity. */
755 if (event_count == 0) {
756 connect_count = check_connecting(ioqueue);
757 if (connect_count > 0)
758 event_count += connect_count;
759 }
760#endif
761
762#if PJ_IOQUEUE_HAS_SAFE_UNREG
763 /* Check the closing keys only when there's no activity and when there are
764 * pending closing keys.
Benny Prijono9fe1ad22007-02-17 00:30:24 +0000765 * blp:
766 * no, always check the list. Otherwise on busy activity, this will cause
767 * ioqueue to reject new registration.
Benny Prijono5accbd02006-03-30 16:32:18 +0000768 */
Benny Prijono9fe1ad22007-02-17 00:30:24 +0000769 if (/*event_count == 0 &&*/ !pj_list_empty(&ioqueue->closing_list)) {
Benny Prijono5accbd02006-03-30 16:32:18 +0000770 pj_time_val now;
771 pj_ioqueue_key_t *key;
772
773 pj_gettimeofday(&now);
774
775 /* Move closing keys to free list when they've finished the closing
776 * idle time.
777 */
778 pj_lock_acquire(ioqueue->lock);
779 key = ioqueue->closing_list.next;
780 while (key != &ioqueue->closing_list) {
781 pj_ioqueue_key_t *next = key->next;
782
783 pj_assert(key->closing != 0);
784
785 if (PJ_TIME_VAL_GTE(now, key->free_time)) {
786 pj_list_erase(key);
787 pj_list_push_back(&ioqueue->free_list, key);
788 }
789 key = next;
790 }
791 pj_lock_release(ioqueue->lock);
792 }
793#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000794
795 /* Return number of events. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000796 return event_count;
Benny Prijono9033e312005-11-21 02:08:39 +0000797}
798
799/*
800 * pj_ioqueue_recv()
801 *
802 * Initiate overlapped WSARecv() operation.
803 */
804PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
805 pj_ioqueue_op_key_t *op_key,
806 void *buffer,
807 pj_ssize_t *length,
808 pj_uint32_t flags )
809{
810 /*
811 * Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and
812 * addrlen here. But unfortunately it generates EINVAL... :-(
813 * -bennylp
814 */
815 int rc;
816 DWORD bytesRead;
817 DWORD dwFlags = 0;
818 union operation_key *op_key_rec;
819
820 PJ_CHECK_STACK();
821 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
822
Benny Prijono5accbd02006-03-30 16:32:18 +0000823#if PJ_IOQUEUE_HAS_SAFE_UNREG
824 /* Check key is not closing */
825 if (key->closing)
826 return PJ_ECANCELLED;
827#endif
828
Benny Prijono9033e312005-11-21 02:08:39 +0000829 op_key_rec = (union operation_key*)op_key->internal__;
830 op_key_rec->overlapped.wsabuf.buf = buffer;
831 op_key_rec->overlapped.wsabuf.len = *length;
832
833 dwFlags = flags;
834
835 /* Try non-overlapped received first to see if data is
836 * immediately available.
837 */
838 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
839 rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
840 &bytesRead, &dwFlags, NULL, NULL);
841 if (rc == 0) {
842 *length = bytesRead;
843 return PJ_SUCCESS;
844 } else {
845 DWORD dwError = WSAGetLastError();
846 if (dwError != WSAEWOULDBLOCK) {
847 *length = -1;
848 return PJ_RETURN_OS_ERROR(dwError);
849 }
850 }
851 }
852
853 dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
854
855 /*
856 * No immediate data available.
857 * Register overlapped Recv() operation.
858 */
Benny Prijonoac623b32006-07-03 15:19:31 +0000859 pj_bzero( &op_key_rec->overlapped.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +0000860 sizeof(op_key_rec->overlapped.overlapped));
861 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
862
863 rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
864 &bytesRead, &dwFlags,
865 &op_key_rec->overlapped.overlapped, NULL);
866 if (rc == SOCKET_ERROR) {
867 DWORD dwStatus = WSAGetLastError();
868 if (dwStatus!=WSA_IO_PENDING) {
869 *length = -1;
870 return PJ_STATUS_FROM_OS(dwStatus);
871 }
872 }
873
874 /* Pending operation has been scheduled. */
875 return PJ_EPENDING;
876}
877
878/*
879 * pj_ioqueue_recvfrom()
880 *
881 * Initiate overlapped RecvFrom() operation.
882 */
883PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
884 pj_ioqueue_op_key_t *op_key,
885 void *buffer,
886 pj_ssize_t *length,
887 pj_uint32_t flags,
888 pj_sockaddr_t *addr,
889 int *addrlen)
890{
891 int rc;
892 DWORD bytesRead;
893 DWORD dwFlags = 0;
894 union operation_key *op_key_rec;
895
896 PJ_CHECK_STACK();
897 PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL);
898
Benny Prijono5accbd02006-03-30 16:32:18 +0000899#if PJ_IOQUEUE_HAS_SAFE_UNREG
900 /* Check key is not closing */
901 if (key->closing)
902 return PJ_ECANCELLED;
903#endif
904
Benny Prijono9033e312005-11-21 02:08:39 +0000905 op_key_rec = (union operation_key*)op_key->internal__;
906 op_key_rec->overlapped.wsabuf.buf = buffer;
907 op_key_rec->overlapped.wsabuf.len = *length;
908
909 dwFlags = flags;
910
911 /* Try non-overlapped received first to see if data is
912 * immediately available.
913 */
914 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
915 rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
916 &bytesRead, &dwFlags, addr, addrlen, NULL, NULL);
917 if (rc == 0) {
918 *length = bytesRead;
919 return PJ_SUCCESS;
920 } else {
921 DWORD dwError = WSAGetLastError();
922 if (dwError != WSAEWOULDBLOCK) {
923 *length = -1;
924 return PJ_RETURN_OS_ERROR(dwError);
925 }
926 }
927 }
928
929 dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
930
931 /*
932 * No immediate data available.
933 * Register overlapped Recv() operation.
934 */
Benny Prijonoac623b32006-07-03 15:19:31 +0000935 pj_bzero( &op_key_rec->overlapped.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +0000936 sizeof(op_key_rec->overlapped.overlapped));
937 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
938
939 rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
940 &bytesRead, &dwFlags, addr, addrlen,
941 &op_key_rec->overlapped.overlapped, NULL);
942 if (rc == SOCKET_ERROR) {
943 DWORD dwStatus = WSAGetLastError();
944 if (dwStatus!=WSA_IO_PENDING) {
945 *length = -1;
946 return PJ_STATUS_FROM_OS(dwStatus);
947 }
948 }
949
950 /* Pending operation has been scheduled. */
951 return PJ_EPENDING;
952}
953
954/*
955 * pj_ioqueue_send()
956 *
957 * Initiate overlapped Send operation.
958 */
959PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
960 pj_ioqueue_op_key_t *op_key,
961 const void *data,
962 pj_ssize_t *length,
963 pj_uint32_t flags )
964{
965 return pj_ioqueue_sendto(key, op_key, data, length, flags, NULL, 0);
966}
967
968
969/*
970 * pj_ioqueue_sendto()
971 *
972 * Initiate overlapped SendTo operation.
973 */
974PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
975 pj_ioqueue_op_key_t *op_key,
976 const void *data,
977 pj_ssize_t *length,
978 pj_uint32_t flags,
979 const pj_sockaddr_t *addr,
980 int addrlen)
981{
982 int rc;
983 DWORD bytesWritten;
984 DWORD dwFlags;
985 union operation_key *op_key_rec;
986
987 PJ_CHECK_STACK();
988 PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL);
Benny Prijono5accbd02006-03-30 16:32:18 +0000989
990#if PJ_IOQUEUE_HAS_SAFE_UNREG
991 /* Check key is not closing */
992 if (key->closing)
993 return PJ_ECANCELLED;
994#endif
995
Benny Prijono9033e312005-11-21 02:08:39 +0000996 op_key_rec = (union operation_key*)op_key->internal__;
997
998 /*
999 * First try blocking write.
1000 */
1001 op_key_rec->overlapped.wsabuf.buf = (void*)data;
1002 op_key_rec->overlapped.wsabuf.len = *length;
1003
1004 dwFlags = flags;
1005
1006 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
1007 rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1008 &bytesWritten, dwFlags, addr, addrlen,
1009 NULL, NULL);
1010 if (rc == 0) {
1011 *length = bytesWritten;
1012 return PJ_SUCCESS;
1013 } else {
1014 DWORD dwStatus = WSAGetLastError();
1015 if (dwStatus != WSAEWOULDBLOCK) {
1016 *length = -1;
1017 return PJ_RETURN_OS_ERROR(dwStatus);
1018 }
1019 }
1020 }
1021
1022 dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
1023
1024 /*
1025 * Data can't be sent immediately.
1026 * Schedule asynchronous WSASend().
1027 */
Benny Prijonoac623b32006-07-03 15:19:31 +00001028 pj_bzero( &op_key_rec->overlapped.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +00001029 sizeof(op_key_rec->overlapped.overlapped));
1030 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND;
1031
1032 rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1033 &bytesWritten, dwFlags, addr, addrlen,
1034 &op_key_rec->overlapped.overlapped, NULL);
1035 if (rc == SOCKET_ERROR) {
1036 DWORD dwStatus = WSAGetLastError();
1037 if (dwStatus!=WSA_IO_PENDING)
1038 return PJ_STATUS_FROM_OS(dwStatus);
1039 }
1040
1041 /* Asynchronous operation successfully submitted. */
1042 return PJ_EPENDING;
1043}
1044
1045#if PJ_HAS_TCP
1046
1047/*
1048 * pj_ioqueue_accept()
1049 *
1050 * Initiate overlapped accept() operation.
1051 */
1052PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
1053 pj_ioqueue_op_key_t *op_key,
1054 pj_sock_t *new_sock,
1055 pj_sockaddr_t *local,
1056 pj_sockaddr_t *remote,
1057 int *addrlen)
1058{
1059 BOOL rc;
1060 DWORD bytesReceived;
1061 pj_status_t status;
1062 union operation_key *op_key_rec;
1063 SOCKET sock;
1064
1065 PJ_CHECK_STACK();
1066 PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
1067
Benny Prijono5accbd02006-03-30 16:32:18 +00001068#if PJ_IOQUEUE_HAS_SAFE_UNREG
1069 /* Check key is not closing */
1070 if (key->closing)
1071 return PJ_ECANCELLED;
1072#endif
1073
Benny Prijono9033e312005-11-21 02:08:39 +00001074 /*
1075 * See if there is a new connection immediately available.
1076 */
1077 sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0);
1078 if (sock != INVALID_SOCKET) {
1079 /* Yes! New socket is available! */
1080 int status;
1081
1082 status = getsockname(sock, local, addrlen);
1083 if (status != 0) {
1084 DWORD dwError = WSAGetLastError();
1085 closesocket(sock);
1086 return PJ_RETURN_OS_ERROR(dwError);
1087 }
1088
1089 *new_sock = sock;
1090 return PJ_SUCCESS;
1091
1092 } else {
1093 DWORD dwError = WSAGetLastError();
1094 if (dwError != WSAEWOULDBLOCK) {
1095 return PJ_RETURN_OS_ERROR(dwError);
1096 }
1097 }
1098
1099 /*
1100 * No connection is immediately available.
1101 * Must schedule an asynchronous operation.
1102 */
1103 op_key_rec = (union operation_key*)op_key->internal__;
1104
1105 status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0,
1106 &op_key_rec->accept.newsock);
1107 if (status != PJ_SUCCESS)
1108 return status;
1109
1110 /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket
1111 * addresses can be obtained with getsockname() and getpeername().
1112 */
1113 status = setsockopt(op_key_rec->accept.newsock, SOL_SOCKET,
1114 SO_UPDATE_ACCEPT_CONTEXT,
1115 (char*)&key->hnd, sizeof(SOCKET));
1116 /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later.
1117 * So ignore the error status.
1118 */
1119
1120 op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT;
1121 op_key_rec->accept.addrlen = addrlen;
1122 op_key_rec->accept.local = local;
1123 op_key_rec->accept.remote = remote;
1124 op_key_rec->accept.newsock_ptr = new_sock;
Benny Prijonoac623b32006-07-03 15:19:31 +00001125 pj_bzero( &op_key_rec->accept.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +00001126 sizeof(op_key_rec->accept.overlapped));
1127
1128 rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)op_key_rec->accept.newsock,
1129 op_key_rec->accept.accept_buf,
1130 0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN,
1131 &bytesReceived,
1132 &op_key_rec->accept.overlapped );
1133
1134 if (rc == TRUE) {
1135 ioqueue_on_accept_complete(&op_key_rec->accept);
1136 return PJ_SUCCESS;
1137 } else {
1138 DWORD dwStatus = WSAGetLastError();
1139 if (dwStatus!=WSA_IO_PENDING)
1140 return PJ_STATUS_FROM_OS(dwStatus);
1141 }
1142
1143 /* Asynchronous Accept() has been submitted. */
1144 return PJ_EPENDING;
1145}
1146
1147
1148/*
1149 * pj_ioqueue_connect()
1150 *
1151 * Initiate overlapped connect() operation (well, it's non-blocking actually,
1152 * since there's no overlapped version of connect()).
1153 */
1154PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
1155 const pj_sockaddr_t *addr,
1156 int addrlen )
1157{
1158 HANDLE hEvent;
1159 pj_ioqueue_t *ioqueue;
1160
1161 PJ_CHECK_STACK();
1162 PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
1163
Benny Prijono5accbd02006-03-30 16:32:18 +00001164#if PJ_IOQUEUE_HAS_SAFE_UNREG
1165 /* Check key is not closing */
1166 if (key->closing)
1167 return PJ_ECANCELLED;
1168#endif
1169
Benny Prijono9033e312005-11-21 02:08:39 +00001170 /* Initiate connect() */
1171 if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) {
1172 DWORD dwStatus;
1173 dwStatus = WSAGetLastError();
1174 if (dwStatus != WSAEWOULDBLOCK) {
1175 return PJ_RETURN_OS_ERROR(dwStatus);
1176 }
1177 } else {
1178 /* Connect has completed immediately! */
1179 return PJ_SUCCESS;
1180 }
1181
1182 ioqueue = key->ioqueue;
1183
1184 /* Add to the array of connecting socket to be polled */
1185 pj_lock_acquire(ioqueue->lock);
1186
1187 if (ioqueue->connecting_count >= MAXIMUM_WAIT_OBJECTS) {
1188 pj_lock_release(ioqueue->lock);
1189 return PJ_ETOOMANYCONN;
1190 }
1191
1192 /* Get or create event object. */
1193 if (ioqueue->event_count) {
1194 hEvent = ioqueue->event_pool[ioqueue->event_count - 1];
1195 --ioqueue->event_count;
1196 } else {
1197 hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
1198 if (hEvent == NULL) {
1199 DWORD dwStatus = GetLastError();
1200 pj_lock_release(ioqueue->lock);
1201 return PJ_STATUS_FROM_OS(dwStatus);
1202 }
1203 }
1204
1205 /* Mark key as connecting.
1206 * We can't use array index since key can be removed dynamically.
1207 */
1208 key->connecting = 1;
1209
1210 /* Associate socket events to the event object. */
1211 if (WSAEventSelect((pj_sock_t)key->hnd, hEvent, FD_CONNECT) != 0) {
1212 CloseHandle(hEvent);
1213 pj_lock_release(ioqueue->lock);
1214 return PJ_RETURN_OS_ERROR(WSAGetLastError());
1215 }
1216
1217 /* Add to array. */
1218 ioqueue->connecting_keys[ ioqueue->connecting_count ] = key;
1219 ioqueue->connecting_handles[ ioqueue->connecting_count ] = hEvent;
1220 ioqueue->connecting_count++;
1221
1222 pj_lock_release(ioqueue->lock);
1223
1224 return PJ_EPENDING;
1225}
1226#endif /* #if PJ_HAS_TCP */
1227
1228
1229PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
1230 pj_size_t size )
1231{
Benny Prijonoac623b32006-07-03 15:19:31 +00001232 pj_bzero(op_key, size);
Benny Prijono9033e312005-11-21 02:08:39 +00001233}
1234
1235PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
1236 pj_ioqueue_op_key_t *op_key )
1237{
1238 BOOL rc;
1239 DWORD bytesTransfered;
1240
1241 rc = GetOverlappedResult( key->hnd, (LPOVERLAPPED)op_key,
1242 &bytesTransfered, FALSE );
1243
1244 if (rc == FALSE) {
1245 return GetLastError()==ERROR_IO_INCOMPLETE;
1246 }
1247
1248 return FALSE;
1249}
1250
1251
1252PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
1253 pj_ioqueue_op_key_t *op_key,
1254 pj_ssize_t bytes_status )
1255{
1256 BOOL rc;
1257
1258 rc = PostQueuedCompletionStatus(key->ioqueue->iocp, bytes_status,
1259 (long)key, (OVERLAPPED*)op_key );
1260 if (rc == FALSE) {
1261 return PJ_RETURN_OS_ERROR(GetLastError());
1262 }
1263
1264 return PJ_SUCCESS;
1265}
1266