blob: 3e8253e1ab0f08ef4c160ffc95c7be491b73fe37 [file] [log] [blame]
Benny Prijono9033e312005-11-21 02:08:39 +00001/* $Id$ */
2/*
Benny Prijonoa771a512007-02-19 01:13:53 +00003 * Copyright (C)2003-2007 Benny Prijono <benny@prijono.org>
Benny Prijono9033e312005-11-21 02:08:39 +00004 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 */
19#include <pj/ioqueue.h>
20#include <pj/os.h>
21#include <pj/lock.h>
22#include <pj/pool.h>
23#include <pj/string.h>
24#include <pj/sock.h>
25#include <pj/array.h>
26#include <pj/log.h>
27#include <pj/assert.h>
28#include <pj/errno.h>
Benny Prijono9c025eb2006-07-10 21:35:27 +000029#include <pj/compat/socket.h>
Benny Prijono9033e312005-11-21 02:08:39 +000030
31
32#if defined(PJ_HAS_WINSOCK2_H) && PJ_HAS_WINSOCK2_H != 0
33# include <winsock2.h>
34#elif defined(PJ_HAS_WINSOCK_H) && PJ_HAS_WINSOCK_H != 0
35# include <winsock.h>
36#endif
37
38#if defined(PJ_HAS_MSWSOCK_H) && PJ_HAS_MSWSOCK_H != 0
39# include <mswsock.h>
40#endif
41
42
/* Length of each address area passed to the AcceptEx() family of calls:
 * the size of the socket address plus the 16 extra bytes that AcceptEx()
 * requires (source: MSDN).
 */
#define ACCEPT_ADDR_LEN     (sizeof(pj_sockaddr_in) + 16)
47
48typedef struct generic_overlapped
49{
50 WSAOVERLAPPED overlapped;
51 pj_ioqueue_operation_e operation;
52} generic_overlapped;
53
54/*
55 * OVERLAPPPED structure for send and receive.
56 */
57typedef struct ioqueue_overlapped
58{
59 WSAOVERLAPPED overlapped;
60 pj_ioqueue_operation_e operation;
61 WSABUF wsabuf;
62 pj_sockaddr_in dummy_addr;
63 int dummy_addrlen;
64} ioqueue_overlapped;
65
66#if PJ_HAS_TCP
67/*
68 * OVERLAP structure for accept.
69 */
70typedef struct ioqueue_accept_rec
71{
72 WSAOVERLAPPED overlapped;
73 pj_ioqueue_operation_e operation;
74 pj_sock_t newsock;
75 pj_sock_t *newsock_ptr;
76 int *addrlen;
77 void *remote;
78 void *local;
79 char accept_buf[2 * ACCEPT_ADDR_LEN];
80} ioqueue_accept_rec;
81#endif
82
83/*
84 * Structure to hold pending operation key.
85 */
86union operation_key
87{
88 generic_overlapped generic;
89 ioqueue_overlapped overlapped;
90#if PJ_HAS_TCP
91 ioqueue_accept_rec accept;
92#endif
93};
94
/* Type of handle in the key. */
enum handle_type
{
    HND_IS_UNKNOWN  = 0,
    HND_IS_FILE     = 1,
    HND_IS_SOCKET   = 2
};
102
/* NOTE(review): this constant is not referenced in the visible part of
 * the file; judging by the name it is a sentinel length value - confirm
 * against its users. Also note that the value does not fit in a signed
 * 32-bit int, so its effective value as an enumerator is compiler
 * dependent.
 */
enum
{
    POST_QUIT_LEN = 0xFFFFDEADUL
};
104
Benny Prijono9033e312005-11-21 02:08:39 +0000105/*
106 * Structure for individual socket.
107 */
108struct pj_ioqueue_key_t
109{
Benny Prijono5accbd02006-03-30 16:32:18 +0000110 PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);
111
Benny Prijono9033e312005-11-21 02:08:39 +0000112 pj_ioqueue_t *ioqueue;
113 HANDLE hnd;
114 void *user_data;
115 enum handle_type hnd_type;
Benny Prijono5accbd02006-03-30 16:32:18 +0000116 pj_ioqueue_callback cb;
117
Benny Prijono9033e312005-11-21 02:08:39 +0000118#if PJ_HAS_TCP
119 int connecting;
120#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000121
122#if PJ_IOQUEUE_HAS_SAFE_UNREG
123 pj_atomic_t *ref_count;
124 pj_bool_t closing;
125 pj_time_val free_time;
126#endif
127
Benny Prijono9033e312005-11-21 02:08:39 +0000128};
129
130/*
131 * IO Queue structure.
132 */
133struct pj_ioqueue_t
134{
135 HANDLE iocp;
136 pj_lock_t *lock;
137 pj_bool_t auto_delete_lock;
Benny Prijono5accbd02006-03-30 16:32:18 +0000138
139#if PJ_IOQUEUE_HAS_SAFE_UNREG
140 pj_ioqueue_key_t active_list;
141 pj_ioqueue_key_t free_list;
142 pj_ioqueue_key_t closing_list;
143#endif
144
145 /* These are to keep track of connecting sockets */
146#if PJ_HAS_TCP
Benny Prijono9033e312005-11-21 02:08:39 +0000147 unsigned event_count;
148 HANDLE event_pool[MAXIMUM_WAIT_OBJECTS+1];
Benny Prijono9033e312005-11-21 02:08:39 +0000149 unsigned connecting_count;
150 HANDLE connecting_handles[MAXIMUM_WAIT_OBJECTS+1];
151 pj_ioqueue_key_t *connecting_keys[MAXIMUM_WAIT_OBJECTS+1];
152#endif
153};
154
155
156#if PJ_HAS_TCP
157/*
158 * Process the socket when the overlapped accept() completed.
159 */
160static void ioqueue_on_accept_complete(ioqueue_accept_rec *accept_overlapped)
161{
162 struct sockaddr *local;
163 struct sockaddr *remote;
164 int locallen, remotelen;
165
166 PJ_CHECK_STACK();
167
168 /* Operation complete immediately. */
169 GetAcceptExSockaddrs( accept_overlapped->accept_buf,
170 0,
171 ACCEPT_ADDR_LEN,
172 ACCEPT_ADDR_LEN,
173 &local,
174 &locallen,
175 &remote,
176 &remotelen);
Benny Prijono9c025eb2006-07-10 21:35:27 +0000177 if (*accept_overlapped->addrlen >= locallen) {
Benny Prijono9033e312005-11-21 02:08:39 +0000178 pj_memcpy(accept_overlapped->local, local, locallen);
179 pj_memcpy(accept_overlapped->remote, remote, locallen);
180 } else {
Benny Prijonoac623b32006-07-03 15:19:31 +0000181 pj_bzero(accept_overlapped->local, *accept_overlapped->addrlen);
182 pj_bzero(accept_overlapped->remote, *accept_overlapped->addrlen);
Benny Prijono9033e312005-11-21 02:08:39 +0000183 }
184 *accept_overlapped->addrlen = locallen;
185 if (accept_overlapped->newsock_ptr)
186 *accept_overlapped->newsock_ptr = accept_overlapped->newsock;
187 accept_overlapped->operation = 0;
Benny Prijono9033e312005-11-21 02:08:39 +0000188}
189
190static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos)
191{
192 pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos];
193 HANDLE hEvent = ioqueue->connecting_handles[pos];
194
195 /* Remove key from array of connecting handles. */
196 pj_array_erase(ioqueue->connecting_keys, sizeof(key),
197 ioqueue->connecting_count, pos);
198 pj_array_erase(ioqueue->connecting_handles, sizeof(HANDLE),
199 ioqueue->connecting_count, pos);
200 --ioqueue->connecting_count;
201
202 /* Disassociate the socket from the event. */
203 WSAEventSelect((pj_sock_t)key->hnd, hEvent, 0);
204
205 /* Put event object to pool. */
206 if (ioqueue->event_count < MAXIMUM_WAIT_OBJECTS) {
207 ioqueue->event_pool[ioqueue->event_count++] = hEvent;
208 } else {
209 /* Shouldn't happen. There should be no more pending connections
210 * than max.
211 */
212 pj_assert(0);
213 CloseHandle(hEvent);
214 }
215
216}
217
218/*
219 * Poll for the completion of non-blocking connect().
220 * If there's a completion, the function return the key of the completed
221 * socket, and 'result' argument contains the connect() result. If connect()
222 * succeeded, 'result' will have value zero, otherwise will have the error
223 * code.
224 */
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000225static int check_connecting( pj_ioqueue_t *ioqueue )
Benny Prijono9033e312005-11-21 02:08:39 +0000226{
Benny Prijono9033e312005-11-21 02:08:39 +0000227 if (ioqueue->connecting_count) {
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000228 int i, count;
229 struct
230 {
231 pj_ioqueue_key_t *key;
232 pj_status_t status;
233 } events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1];
Benny Prijono9033e312005-11-21 02:08:39 +0000234
235 pj_lock_acquire(ioqueue->lock);
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000236 for (count=0; count<PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1; ++count) {
237 DWORD result;
Benny Prijono9033e312005-11-21 02:08:39 +0000238
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000239 result = WaitForMultipleObjects(ioqueue->connecting_count,
240 ioqueue->connecting_handles,
241 FALSE, 0);
242 if (result >= WAIT_OBJECT_0 &&
243 result < WAIT_OBJECT_0+ioqueue->connecting_count)
244 {
245 WSANETWORKEVENTS net_events;
Benny Prijono9033e312005-11-21 02:08:39 +0000246
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000247 /* Got completed connect(). */
248 unsigned pos = result - WAIT_OBJECT_0;
249 events[count].key = ioqueue->connecting_keys[pos];
Benny Prijono9033e312005-11-21 02:08:39 +0000250
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000251 /* See whether connect has succeeded. */
252 WSAEnumNetworkEvents((pj_sock_t)events[count].key->hnd,
253 ioqueue->connecting_handles[pos],
254 &net_events);
255 events[count].status =
256 PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]);
257
258 /* Erase socket from pending connect. */
259 erase_connecting_socket(ioqueue, pos);
260 } else {
261 /* No more events */
262 break;
263 }
Benny Prijono9033e312005-11-21 02:08:39 +0000264 }
265 pj_lock_release(ioqueue->lock);
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000266
267 /* Call callbacks. */
268 for (i=0; i<count; ++i) {
269 if (events[i].key->cb.on_connect_complete) {
270 events[i].key->cb.on_connect_complete(events[i].key,
271 events[i].status);
272 }
273 }
274
275 return count;
Benny Prijono9033e312005-11-21 02:08:39 +0000276 }
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000277
278 return 0;
279
Benny Prijono9033e312005-11-21 02:08:39 +0000280}
281#endif
282
283/*
284 * pj_ioqueue_name()
285 */
286PJ_DEF(const char*) pj_ioqueue_name(void)
287{
288 return "iocp";
289}
290
291/*
292 * pj_ioqueue_create()
293 */
294PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
295 pj_size_t max_fd,
296 pj_ioqueue_t **p_ioqueue)
297{
298 pj_ioqueue_t *ioqueue;
Benny Prijono5accbd02006-03-30 16:32:18 +0000299 unsigned i;
Benny Prijono9033e312005-11-21 02:08:39 +0000300 pj_status_t rc;
301
302 PJ_UNUSED_ARG(max_fd);
303 PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL);
304
305 rc = sizeof(union operation_key);
306
307 /* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */
308 PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
309 sizeof(union operation_key), PJ_EBUG);
310
Benny Prijono5accbd02006-03-30 16:32:18 +0000311 /* Create IOCP */
Benny Prijono9033e312005-11-21 02:08:39 +0000312 ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue));
313 ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
314 if (ioqueue->iocp == NULL)
315 return PJ_RETURN_OS_ERROR(GetLastError());
316
Benny Prijono5accbd02006-03-30 16:32:18 +0000317 /* Create IOCP mutex */
Benny Prijono9033e312005-11-21 02:08:39 +0000318 rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock);
319 if (rc != PJ_SUCCESS) {
320 CloseHandle(ioqueue->iocp);
321 return rc;
322 }
323
324 ioqueue->auto_delete_lock = PJ_TRUE;
325
Benny Prijono5accbd02006-03-30 16:32:18 +0000326#if PJ_IOQUEUE_HAS_SAFE_UNREG
327 /*
328 * Create and initialize key pools.
329 */
330 pj_list_init(&ioqueue->active_list);
331 pj_list_init(&ioqueue->free_list);
332 pj_list_init(&ioqueue->closing_list);
333
334 /* Preallocate keys according to max_fd setting, and put them
335 * in free_list.
336 */
337 for (i=0; i<max_fd; ++i) {
338 pj_ioqueue_key_t *key;
339
340 key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t));
341
342 rc = pj_atomic_create(pool, 0, &key->ref_count);
343 if (rc != PJ_SUCCESS) {
344 key = ioqueue->free_list.next;
345 while (key != &ioqueue->free_list) {
346 pj_atomic_destroy(key->ref_count);
347 key = key->next;
348 }
349 CloseHandle(ioqueue->iocp);
350 return rc;
351 }
352
353 pj_list_push_back(&ioqueue->free_list, key);
354
355 }
356#endif
357
Benny Prijono9033e312005-11-21 02:08:39 +0000358 *p_ioqueue = ioqueue;
359
360 PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue));
361 return PJ_SUCCESS;
362}
363
364/*
365 * pj_ioqueue_destroy()
366 */
367PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue )
368{
Benny Prijono3569c0d2007-04-06 10:29:20 +0000369#if PJ_HAS_TCP
Benny Prijono9033e312005-11-21 02:08:39 +0000370 unsigned i;
Benny Prijono3569c0d2007-04-06 10:29:20 +0000371#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000372 pj_ioqueue_key_t *key;
Benny Prijono9033e312005-11-21 02:08:39 +0000373
374 PJ_CHECK_STACK();
375 PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
376
Benny Prijono5accbd02006-03-30 16:32:18 +0000377 pj_lock_acquire(ioqueue->lock);
378
379#if PJ_HAS_TCP
Benny Prijono9033e312005-11-21 02:08:39 +0000380 /* Destroy events in the pool */
381 for (i=0; i<ioqueue->event_count; ++i) {
382 CloseHandle(ioqueue->event_pool[i]);
383 }
384 ioqueue->event_count = 0;
Benny Prijono5accbd02006-03-30 16:32:18 +0000385#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000386
387 if (CloseHandle(ioqueue->iocp) != TRUE)
388 return PJ_RETURN_OS_ERROR(GetLastError());
389
Benny Prijono5accbd02006-03-30 16:32:18 +0000390#if PJ_IOQUEUE_HAS_SAFE_UNREG
391 /* Destroy reference counters */
392 key = ioqueue->active_list.next;
393 while (key != &ioqueue->active_list) {
394 pj_atomic_destroy(key->ref_count);
395 key = key->next;
396 }
397
398 key = ioqueue->closing_list.next;
399 while (key != &ioqueue->closing_list) {
400 pj_atomic_destroy(key->ref_count);
401 key = key->next;
402 }
403
404 key = ioqueue->free_list.next;
405 while (key != &ioqueue->free_list) {
406 pj_atomic_destroy(key->ref_count);
407 key = key->next;
408 }
409#endif
410
Benny Prijono9033e312005-11-21 02:08:39 +0000411 if (ioqueue->auto_delete_lock)
412 pj_lock_destroy(ioqueue->lock);
413
414 return PJ_SUCCESS;
415}
416
417/*
418 * pj_ioqueue_set_lock()
419 */
420PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
421 pj_lock_t *lock,
422 pj_bool_t auto_delete )
423{
424 PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
425
426 if (ioqueue->auto_delete_lock) {
427 pj_lock_destroy(ioqueue->lock);
428 }
429
430 ioqueue->lock = lock;
431 ioqueue->auto_delete_lock = auto_delete;
432
433 return PJ_SUCCESS;
434}
435
436/*
437 * pj_ioqueue_register_sock()
438 */
439PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
440 pj_ioqueue_t *ioqueue,
441 pj_sock_t sock,
442 void *user_data,
443 const pj_ioqueue_callback *cb,
444 pj_ioqueue_key_t **key )
445{
446 HANDLE hioq;
447 pj_ioqueue_key_t *rec;
448 u_long value;
449 int rc;
450
451 PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL);
452
Benny Prijono5accbd02006-03-30 16:32:18 +0000453 pj_lock_acquire(ioqueue->lock);
454
455#if PJ_IOQUEUE_HAS_SAFE_UNREG
456 /* If safe unregistration is used, then get the key record from
457 * the free list.
458 */
459 if (pj_list_empty(&ioqueue->free_list)) {
460 pj_lock_release(ioqueue->lock);
461 return PJ_ETOOMANY;
462 }
463
464 rec = ioqueue->free_list.next;
465 pj_list_erase(rec);
466
467 /* Set initial reference count to 1 */
468 pj_assert(pj_atomic_get(rec->ref_count) == 0);
469 pj_atomic_inc(rec->ref_count);
470
471 rec->closing = 0;
472
473#else
Benny Prijono9033e312005-11-21 02:08:39 +0000474 rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
Benny Prijono5accbd02006-03-30 16:32:18 +0000475#endif
476
477 /* Build the key for this socket. */
Benny Prijono9033e312005-11-21 02:08:39 +0000478 rec->ioqueue = ioqueue;
479 rec->hnd = (HANDLE)sock;
480 rec->hnd_type = HND_IS_SOCKET;
481 rec->user_data = user_data;
482 pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback));
483
Benny Prijono5accbd02006-03-30 16:32:18 +0000484#if PJ_HAS_TCP
485 rec->connecting = 0;
486#endif
487
Benny Prijono9033e312005-11-21 02:08:39 +0000488 /* Set socket to nonblocking. */
489 value = 1;
490 rc = ioctlsocket(sock, FIONBIO, &value);
491 if (rc != 0) {
Benny Prijono5accbd02006-03-30 16:32:18 +0000492 pj_lock_release(ioqueue->lock);
Benny Prijono9033e312005-11-21 02:08:39 +0000493 return PJ_RETURN_OS_ERROR(WSAGetLastError());
494 }
495
496 /* Associate with IOCP */
497 hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0);
498 if (!hioq) {
Benny Prijono5accbd02006-03-30 16:32:18 +0000499 pj_lock_release(ioqueue->lock);
Benny Prijono9033e312005-11-21 02:08:39 +0000500 return PJ_RETURN_OS_ERROR(GetLastError());
501 }
502
503 *key = rec;
Benny Prijono5accbd02006-03-30 16:32:18 +0000504
505#if PJ_IOQUEUE_HAS_SAFE_UNREG
506 pj_list_push_back(&ioqueue->active_list, rec);
507#endif
508
509 pj_lock_release(ioqueue->lock);
510
Benny Prijono9033e312005-11-21 02:08:39 +0000511 return PJ_SUCCESS;
512}
513
Benny Prijono9033e312005-11-21 02:08:39 +0000514
515/*
516 * pj_ioqueue_get_user_data()
517 */
518PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
519{
520 PJ_ASSERT_RETURN(key, NULL);
521 return key->user_data;
522}
523
524/*
525 * pj_ioqueue_set_user_data()
526 */
527PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
528 void *user_data,
529 void **old_data )
530{
531 PJ_ASSERT_RETURN(key, PJ_EINVAL);
532
533 if (old_data)
534 *old_data = key->user_data;
535
536 key->user_data = user_data;
537 return PJ_SUCCESS;
538}
539
Benny Prijono8d317a02006-03-22 11:49:19 +0000540
Benny Prijono5accbd02006-03-30 16:32:18 +0000541#if PJ_IOQUEUE_HAS_SAFE_UNREG
542/* Decrement the key's reference counter, and when the counter reach zero,
543 * destroy the key.
544 */
545static void decrement_counter(pj_ioqueue_key_t *key)
546{
547 if (pj_atomic_dec_and_get(key->ref_count) == 0) {
548
549 pj_lock_acquire(key->ioqueue->lock);
550
551 pj_assert(key->closing == 1);
552 pj_gettimeofday(&key->free_time);
553 key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
554 pj_time_val_normalize(&key->free_time);
555
556 pj_list_erase(key);
557 pj_list_push_back(&key->ioqueue->closing_list, key);
558
559 pj_lock_release(key->ioqueue->lock);
560 }
561}
562#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000563
Benny Prijono9033e312005-11-21 02:08:39 +0000564/*
Benny Prijono5accbd02006-03-30 16:32:18 +0000565 * Poll the I/O Completion Port, execute callback,
Benny Prijono8d317a02006-03-22 11:49:19 +0000566 * and return the key and bytes transfered of the last operation.
Benny Prijono9033e312005-11-21 02:08:39 +0000567 */
Benny Prijono8d317a02006-03-22 11:49:19 +0000568static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
569 pj_ssize_t *p_bytes, pj_ioqueue_key_t **p_key )
Benny Prijono9033e312005-11-21 02:08:39 +0000570{
Benny Prijono8d317a02006-03-22 11:49:19 +0000571 DWORD dwBytesTransfered, dwKey;
Benny Prijono9033e312005-11-21 02:08:39 +0000572 generic_overlapped *pOv;
573 pj_ioqueue_key_t *key;
Benny Prijono4f2be312005-11-21 17:01:06 +0000574 pj_ssize_t size_status = -1;
Benny Prijono8d317a02006-03-22 11:49:19 +0000575 BOOL rcGetQueued;
Benny Prijono9033e312005-11-21 02:08:39 +0000576
577 /* Poll for completion status. */
Benny Prijono8d317a02006-03-22 11:49:19 +0000578 rcGetQueued = GetQueuedCompletionStatus(hIocp, &dwBytesTransfered,
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000579 &dwKey, (OVERLAPPED**)&pOv,
Benny Prijono8d317a02006-03-22 11:49:19 +0000580 dwTimeout);
Benny Prijono9033e312005-11-21 02:08:39 +0000581
582 /* The return value is:
583 * - nonzero if event was dequeued.
584 * - zero and pOv==NULL if no event was dequeued.
585 * - zero and pOv!=NULL if event for failed I/O was dequeued.
586 */
587 if (pOv) {
588 /* Event was dequeued for either successfull or failed I/O */
589 key = (pj_ioqueue_key_t*)dwKey;
590 size_status = dwBytesTransfered;
Benny Prijono8d317a02006-03-22 11:49:19 +0000591
592 /* Report to caller regardless */
593 if (p_bytes)
594 *p_bytes = size_status;
595 if (p_key)
596 *p_key = key;
597
Benny Prijono5accbd02006-03-30 16:32:18 +0000598#if PJ_IOQUEUE_HAS_SAFE_UNREG
599 /* We shouldn't call callbacks if key is quitting. */
600 if (key->closing)
Benny Prijono8d317a02006-03-22 11:49:19 +0000601 return PJ_TRUE;
Benny Prijono8d317a02006-03-22 11:49:19 +0000602
Benny Prijono5accbd02006-03-30 16:32:18 +0000603 /* Increment reference counter to prevent this key from being
604 * deleted
Benny Prijono8d317a02006-03-22 11:49:19 +0000605 */
Benny Prijono5accbd02006-03-30 16:32:18 +0000606 pj_atomic_inc(key->ref_count);
607#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000608
609 /* Carry out the callback */
Benny Prijono9033e312005-11-21 02:08:39 +0000610 switch (pOv->operation) {
611 case PJ_IOQUEUE_OP_READ:
612 case PJ_IOQUEUE_OP_RECV:
613 case PJ_IOQUEUE_OP_RECV_FROM:
614 pOv->operation = 0;
615 if (key->cb.on_read_complete)
616 key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv,
617 size_status);
618 break;
619 case PJ_IOQUEUE_OP_WRITE:
620 case PJ_IOQUEUE_OP_SEND:
621 case PJ_IOQUEUE_OP_SEND_TO:
622 pOv->operation = 0;
623 if (key->cb.on_write_complete)
624 key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv,
625 size_status);
626 break;
627#if PJ_HAS_TCP
628 case PJ_IOQUEUE_OP_ACCEPT:
629 /* special case for accept. */
630 ioqueue_on_accept_complete((ioqueue_accept_rec*)pOv);
631 if (key->cb.on_accept_complete) {
632 ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv;
Benny Prijono9c025eb2006-07-10 21:35:27 +0000633 pj_status_t status = PJ_SUCCESS;
634
635 if (accept_rec->newsock == PJ_INVALID_SOCKET) {
636 int dwError = WSAGetLastError();
637 if (dwError == 0) dwError = OSERR_ENOTCONN;
638 status = PJ_RETURN_OS_ERROR(dwError);
639 }
640
Benny Prijono9033e312005-11-21 02:08:39 +0000641 key->cb.on_accept_complete(key,
642 (pj_ioqueue_op_key_t*)pOv,
643 accept_rec->newsock,
Benny Prijono9c025eb2006-07-10 21:35:27 +0000644 status);
Benny Prijono01de33b2006-06-28 15:23:18 +0000645 accept_rec->newsock = PJ_INVALID_SOCKET;
Benny Prijono9033e312005-11-21 02:08:39 +0000646 }
647 break;
648 case PJ_IOQUEUE_OP_CONNECT:
649#endif
650 case PJ_IOQUEUE_OP_NONE:
651 pj_assert(0);
652 break;
653 }
Benny Prijono5accbd02006-03-30 16:32:18 +0000654
655#if PJ_IOQUEUE_HAS_SAFE_UNREG
656 decrement_counter(key);
657#endif
658
Benny Prijono8d317a02006-03-22 11:49:19 +0000659 return PJ_TRUE;
Benny Prijono9033e312005-11-21 02:08:39 +0000660 }
661
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000662 /* No event was queued. */
Benny Prijono8d317a02006-03-22 11:49:19 +0000663 return PJ_FALSE;
664}
665
666/*
667 * pj_ioqueue_unregister()
668 */
669PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
670{
Benny Prijono9fe1ad22007-02-17 00:30:24 +0000671 unsigned i;
672 enum { RETRY = 10 };
673
Benny Prijono8d317a02006-03-22 11:49:19 +0000674 PJ_ASSERT_RETURN(key, PJ_EINVAL);
675
676#if PJ_HAS_TCP
677 if (key->connecting) {
678 unsigned pos;
679 pj_ioqueue_t *ioqueue;
680
681 ioqueue = key->ioqueue;
682
683 /* Erase from connecting_handles */
684 pj_lock_acquire(ioqueue->lock);
685 for (pos=0; pos < ioqueue->connecting_count; ++pos) {
686 if (ioqueue->connecting_keys[pos] == key) {
687 erase_connecting_socket(ioqueue, pos);
688 break;
689 }
690 }
691 key->connecting = 0;
692 pj_lock_release(ioqueue->lock);
693 }
694#endif
Benny Prijono08beac62006-11-23 07:31:27 +0000695
696#if PJ_IOQUEUE_HAS_SAFE_UNREG
697 /* Mark key as closing before closing handle. */
698 key->closing = 1;
699#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000700
701 /* Close handle (the only way to disassociate handle from IOCP).
702 * We also need to close handle to make sure that no further events
703 * will come to the handle.
Benny Prijono8d317a02006-03-22 11:49:19 +0000704 */
Benny Prijono5accbd02006-03-30 16:32:18 +0000705 CloseHandle(key->hnd);
Benny Prijono8d317a02006-03-22 11:49:19 +0000706
Benny Prijono5accbd02006-03-30 16:32:18 +0000707 /* Reset callbacks */
Benny Prijono8d317a02006-03-22 11:49:19 +0000708 key->cb.on_accept_complete = NULL;
709 key->cb.on_connect_complete = NULL;
Benny Prijono5accbd02006-03-30 16:32:18 +0000710 key->cb.on_read_complete = NULL;
711 key->cb.on_write_complete = NULL;
Benny Prijono8d317a02006-03-22 11:49:19 +0000712
Benny Prijono5accbd02006-03-30 16:32:18 +0000713#if PJ_IOQUEUE_HAS_SAFE_UNREG
Benny Prijono5accbd02006-03-30 16:32:18 +0000714 /* Even after handle is closed, I suspect that IOCP may still try to
715 * do something with the handle, causing memory corruption when pool
716 * debugging is enabled.
717 *
718 * Forcing context switch seems to have fixed that, but this is quite
719 * an ugly solution..
Benny Prijono8d317a02006-03-22 11:49:19 +0000720 */
Benny Prijono9fe1ad22007-02-17 00:30:24 +0000721 //This will loop forever if unregistration is done on the callback.
722 //Doing this with RETRY I think should solve the IOCP setting the
723 //socket signalled, without causing the deadlock.
724 //while (pj_atomic_get(key->ref_count) != 1)
725 // pj_thread_sleep(0);
726 for (i=0; pj_atomic_get(key->ref_count) != 1 && i<RETRY; ++i)
Benny Prijono08beac62006-11-23 07:31:27 +0000727 pj_thread_sleep(0);
728
729 /* Decrement reference counter to destroy the key. */
730 decrement_counter(key);
Benny Prijono5accbd02006-03-30 16:32:18 +0000731#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000732
733 return PJ_SUCCESS;
734}
735
736/*
737 * pj_ioqueue_poll()
738 *
739 * Poll for events.
740 */
741PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
742{
743 DWORD dwMsec;
Benny Prijono3569c0d2007-04-06 10:29:20 +0000744#if PJ_HAS_TCP
Benny Prijono8d317a02006-03-22 11:49:19 +0000745 int connect_count = 0;
Benny Prijono3569c0d2007-04-06 10:29:20 +0000746#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000747 int event_count = 0;
Benny Prijono8d317a02006-03-22 11:49:19 +0000748
749 PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
750
Benny Prijono8d317a02006-03-22 11:49:19 +0000751 /* Calculate miliseconds timeout for GetQueuedCompletionStatus */
752 dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE;
753
754 /* Poll for completion status. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000755 event_count = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL);
756
757#if PJ_HAS_TCP
758 /* Check the connecting array, only when there's no activity. */
759 if (event_count == 0) {
760 connect_count = check_connecting(ioqueue);
761 if (connect_count > 0)
762 event_count += connect_count;
763 }
764#endif
765
766#if PJ_IOQUEUE_HAS_SAFE_UNREG
767 /* Check the closing keys only when there's no activity and when there are
768 * pending closing keys.
Benny Prijono9fe1ad22007-02-17 00:30:24 +0000769 * blp:
770 * no, always check the list. Otherwise on busy activity, this will cause
771 * ioqueue to reject new registration.
Benny Prijono5accbd02006-03-30 16:32:18 +0000772 */
Benny Prijono9fe1ad22007-02-17 00:30:24 +0000773 if (/*event_count == 0 &&*/ !pj_list_empty(&ioqueue->closing_list)) {
Benny Prijono5accbd02006-03-30 16:32:18 +0000774 pj_time_val now;
775 pj_ioqueue_key_t *key;
776
777 pj_gettimeofday(&now);
778
779 /* Move closing keys to free list when they've finished the closing
780 * idle time.
781 */
782 pj_lock_acquire(ioqueue->lock);
783 key = ioqueue->closing_list.next;
784 while (key != &ioqueue->closing_list) {
785 pj_ioqueue_key_t *next = key->next;
786
787 pj_assert(key->closing != 0);
788
789 if (PJ_TIME_VAL_GTE(now, key->free_time)) {
790 pj_list_erase(key);
791 pj_list_push_back(&ioqueue->free_list, key);
792 }
793 key = next;
794 }
795 pj_lock_release(ioqueue->lock);
796 }
797#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000798
799 /* Return number of events. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000800 return event_count;
Benny Prijono9033e312005-11-21 02:08:39 +0000801}
802
803/*
804 * pj_ioqueue_recv()
805 *
806 * Initiate overlapped WSARecv() operation.
807 */
808PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
809 pj_ioqueue_op_key_t *op_key,
810 void *buffer,
811 pj_ssize_t *length,
812 pj_uint32_t flags )
813{
814 /*
815 * Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and
816 * addrlen here. But unfortunately it generates EINVAL... :-(
817 * -bennylp
818 */
819 int rc;
820 DWORD bytesRead;
821 DWORD dwFlags = 0;
822 union operation_key *op_key_rec;
823
824 PJ_CHECK_STACK();
825 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
826
Benny Prijono5accbd02006-03-30 16:32:18 +0000827#if PJ_IOQUEUE_HAS_SAFE_UNREG
828 /* Check key is not closing */
829 if (key->closing)
830 return PJ_ECANCELLED;
831#endif
832
Benny Prijono9033e312005-11-21 02:08:39 +0000833 op_key_rec = (union operation_key*)op_key->internal__;
834 op_key_rec->overlapped.wsabuf.buf = buffer;
835 op_key_rec->overlapped.wsabuf.len = *length;
836
837 dwFlags = flags;
838
839 /* Try non-overlapped received first to see if data is
840 * immediately available.
841 */
842 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
843 rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
844 &bytesRead, &dwFlags, NULL, NULL);
845 if (rc == 0) {
846 *length = bytesRead;
847 return PJ_SUCCESS;
848 } else {
849 DWORD dwError = WSAGetLastError();
850 if (dwError != WSAEWOULDBLOCK) {
851 *length = -1;
852 return PJ_RETURN_OS_ERROR(dwError);
853 }
854 }
855 }
856
857 dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
858
859 /*
860 * No immediate data available.
861 * Register overlapped Recv() operation.
862 */
Benny Prijonoac623b32006-07-03 15:19:31 +0000863 pj_bzero( &op_key_rec->overlapped.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +0000864 sizeof(op_key_rec->overlapped.overlapped));
865 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
866
867 rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
868 &bytesRead, &dwFlags,
869 &op_key_rec->overlapped.overlapped, NULL);
870 if (rc == SOCKET_ERROR) {
871 DWORD dwStatus = WSAGetLastError();
872 if (dwStatus!=WSA_IO_PENDING) {
873 *length = -1;
874 return PJ_STATUS_FROM_OS(dwStatus);
875 }
876 }
877
878 /* Pending operation has been scheduled. */
879 return PJ_EPENDING;
880}
881
882/*
883 * pj_ioqueue_recvfrom()
884 *
885 * Initiate overlapped RecvFrom() operation.
886 */
887PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
888 pj_ioqueue_op_key_t *op_key,
889 void *buffer,
890 pj_ssize_t *length,
891 pj_uint32_t flags,
892 pj_sockaddr_t *addr,
893 int *addrlen)
894{
895 int rc;
896 DWORD bytesRead;
897 DWORD dwFlags = 0;
898 union operation_key *op_key_rec;
899
900 PJ_CHECK_STACK();
901 PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL);
902
Benny Prijono5accbd02006-03-30 16:32:18 +0000903#if PJ_IOQUEUE_HAS_SAFE_UNREG
904 /* Check key is not closing */
905 if (key->closing)
906 return PJ_ECANCELLED;
907#endif
908
Benny Prijono9033e312005-11-21 02:08:39 +0000909 op_key_rec = (union operation_key*)op_key->internal__;
910 op_key_rec->overlapped.wsabuf.buf = buffer;
911 op_key_rec->overlapped.wsabuf.len = *length;
912
913 dwFlags = flags;
914
915 /* Try non-overlapped received first to see if data is
916 * immediately available.
917 */
918 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
919 rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
920 &bytesRead, &dwFlags, addr, addrlen, NULL, NULL);
921 if (rc == 0) {
922 *length = bytesRead;
923 return PJ_SUCCESS;
924 } else {
925 DWORD dwError = WSAGetLastError();
926 if (dwError != WSAEWOULDBLOCK) {
927 *length = -1;
928 return PJ_RETURN_OS_ERROR(dwError);
929 }
930 }
931 }
932
933 dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
934
935 /*
936 * No immediate data available.
937 * Register overlapped Recv() operation.
938 */
Benny Prijonoac623b32006-07-03 15:19:31 +0000939 pj_bzero( &op_key_rec->overlapped.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +0000940 sizeof(op_key_rec->overlapped.overlapped));
941 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
942
943 rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
944 &bytesRead, &dwFlags, addr, addrlen,
945 &op_key_rec->overlapped.overlapped, NULL);
946 if (rc == SOCKET_ERROR) {
947 DWORD dwStatus = WSAGetLastError();
948 if (dwStatus!=WSA_IO_PENDING) {
949 *length = -1;
950 return PJ_STATUS_FROM_OS(dwStatus);
951 }
952 }
953
954 /* Pending operation has been scheduled. */
955 return PJ_EPENDING;
956}
957
958/*
959 * pj_ioqueue_send()
960 *
961 * Initiate overlapped Send operation.
962 */
963PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
964 pj_ioqueue_op_key_t *op_key,
965 const void *data,
966 pj_ssize_t *length,
967 pj_uint32_t flags )
968{
969 return pj_ioqueue_sendto(key, op_key, data, length, flags, NULL, 0);
970}
971
972
973/*
974 * pj_ioqueue_sendto()
975 *
976 * Initiate overlapped SendTo operation.
977 */
978PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
979 pj_ioqueue_op_key_t *op_key,
980 const void *data,
981 pj_ssize_t *length,
982 pj_uint32_t flags,
983 const pj_sockaddr_t *addr,
984 int addrlen)
985{
986 int rc;
987 DWORD bytesWritten;
988 DWORD dwFlags;
989 union operation_key *op_key_rec;
990
991 PJ_CHECK_STACK();
992 PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL);
Benny Prijono5accbd02006-03-30 16:32:18 +0000993
994#if PJ_IOQUEUE_HAS_SAFE_UNREG
995 /* Check key is not closing */
996 if (key->closing)
997 return PJ_ECANCELLED;
998#endif
999
Benny Prijono9033e312005-11-21 02:08:39 +00001000 op_key_rec = (union operation_key*)op_key->internal__;
1001
1002 /*
1003 * First try blocking write.
1004 */
1005 op_key_rec->overlapped.wsabuf.buf = (void*)data;
1006 op_key_rec->overlapped.wsabuf.len = *length;
1007
1008 dwFlags = flags;
1009
1010 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
1011 rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1012 &bytesWritten, dwFlags, addr, addrlen,
1013 NULL, NULL);
1014 if (rc == 0) {
1015 *length = bytesWritten;
1016 return PJ_SUCCESS;
1017 } else {
1018 DWORD dwStatus = WSAGetLastError();
1019 if (dwStatus != WSAEWOULDBLOCK) {
1020 *length = -1;
1021 return PJ_RETURN_OS_ERROR(dwStatus);
1022 }
1023 }
1024 }
1025
1026 dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
1027
1028 /*
1029 * Data can't be sent immediately.
1030 * Schedule asynchronous WSASend().
1031 */
Benny Prijonoac623b32006-07-03 15:19:31 +00001032 pj_bzero( &op_key_rec->overlapped.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +00001033 sizeof(op_key_rec->overlapped.overlapped));
1034 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND;
1035
1036 rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1037 &bytesWritten, dwFlags, addr, addrlen,
1038 &op_key_rec->overlapped.overlapped, NULL);
1039 if (rc == SOCKET_ERROR) {
1040 DWORD dwStatus = WSAGetLastError();
1041 if (dwStatus!=WSA_IO_PENDING)
1042 return PJ_STATUS_FROM_OS(dwStatus);
1043 }
1044
1045 /* Asynchronous operation successfully submitted. */
1046 return PJ_EPENDING;
1047}
1048
1049#if PJ_HAS_TCP
1050
1051/*
1052 * pj_ioqueue_accept()
1053 *
1054 * Initiate overlapped accept() operation.
1055 */
1056PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
1057 pj_ioqueue_op_key_t *op_key,
1058 pj_sock_t *new_sock,
1059 pj_sockaddr_t *local,
1060 pj_sockaddr_t *remote,
1061 int *addrlen)
1062{
1063 BOOL rc;
1064 DWORD bytesReceived;
1065 pj_status_t status;
1066 union operation_key *op_key_rec;
1067 SOCKET sock;
1068
1069 PJ_CHECK_STACK();
1070 PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
1071
Benny Prijono5accbd02006-03-30 16:32:18 +00001072#if PJ_IOQUEUE_HAS_SAFE_UNREG
1073 /* Check key is not closing */
1074 if (key->closing)
1075 return PJ_ECANCELLED;
1076#endif
1077
Benny Prijono9033e312005-11-21 02:08:39 +00001078 /*
1079 * See if there is a new connection immediately available.
1080 */
1081 sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0);
1082 if (sock != INVALID_SOCKET) {
1083 /* Yes! New socket is available! */
1084 int status;
1085
1086 status = getsockname(sock, local, addrlen);
1087 if (status != 0) {
1088 DWORD dwError = WSAGetLastError();
1089 closesocket(sock);
1090 return PJ_RETURN_OS_ERROR(dwError);
1091 }
1092
1093 *new_sock = sock;
1094 return PJ_SUCCESS;
1095
1096 } else {
1097 DWORD dwError = WSAGetLastError();
1098 if (dwError != WSAEWOULDBLOCK) {
1099 return PJ_RETURN_OS_ERROR(dwError);
1100 }
1101 }
1102
1103 /*
1104 * No connection is immediately available.
1105 * Must schedule an asynchronous operation.
1106 */
1107 op_key_rec = (union operation_key*)op_key->internal__;
1108
Benny Prijono8ab968f2007-07-20 08:08:30 +00001109 status = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0,
Benny Prijono9033e312005-11-21 02:08:39 +00001110 &op_key_rec->accept.newsock);
1111 if (status != PJ_SUCCESS)
1112 return status;
1113
1114 /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket
1115 * addresses can be obtained with getsockname() and getpeername().
1116 */
1117 status = setsockopt(op_key_rec->accept.newsock, SOL_SOCKET,
1118 SO_UPDATE_ACCEPT_CONTEXT,
1119 (char*)&key->hnd, sizeof(SOCKET));
1120 /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later.
1121 * So ignore the error status.
1122 */
1123
1124 op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT;
1125 op_key_rec->accept.addrlen = addrlen;
1126 op_key_rec->accept.local = local;
1127 op_key_rec->accept.remote = remote;
1128 op_key_rec->accept.newsock_ptr = new_sock;
Benny Prijonoac623b32006-07-03 15:19:31 +00001129 pj_bzero( &op_key_rec->accept.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +00001130 sizeof(op_key_rec->accept.overlapped));
1131
1132 rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)op_key_rec->accept.newsock,
1133 op_key_rec->accept.accept_buf,
1134 0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN,
1135 &bytesReceived,
1136 &op_key_rec->accept.overlapped );
1137
1138 if (rc == TRUE) {
1139 ioqueue_on_accept_complete(&op_key_rec->accept);
1140 return PJ_SUCCESS;
1141 } else {
1142 DWORD dwStatus = WSAGetLastError();
1143 if (dwStatus!=WSA_IO_PENDING)
1144 return PJ_STATUS_FROM_OS(dwStatus);
1145 }
1146
1147 /* Asynchronous Accept() has been submitted. */
1148 return PJ_EPENDING;
1149}
1150
1151
1152/*
1153 * pj_ioqueue_connect()
1154 *
1155 * Initiate overlapped connect() operation (well, it's non-blocking actually,
1156 * since there's no overlapped version of connect()).
1157 */
1158PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
1159 const pj_sockaddr_t *addr,
1160 int addrlen )
1161{
1162 HANDLE hEvent;
1163 pj_ioqueue_t *ioqueue;
1164
1165 PJ_CHECK_STACK();
1166 PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
1167
Benny Prijono5accbd02006-03-30 16:32:18 +00001168#if PJ_IOQUEUE_HAS_SAFE_UNREG
1169 /* Check key is not closing */
1170 if (key->closing)
1171 return PJ_ECANCELLED;
1172#endif
1173
Benny Prijono9033e312005-11-21 02:08:39 +00001174 /* Initiate connect() */
1175 if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) {
1176 DWORD dwStatus;
1177 dwStatus = WSAGetLastError();
1178 if (dwStatus != WSAEWOULDBLOCK) {
1179 return PJ_RETURN_OS_ERROR(dwStatus);
1180 }
1181 } else {
1182 /* Connect has completed immediately! */
1183 return PJ_SUCCESS;
1184 }
1185
1186 ioqueue = key->ioqueue;
1187
1188 /* Add to the array of connecting socket to be polled */
1189 pj_lock_acquire(ioqueue->lock);
1190
1191 if (ioqueue->connecting_count >= MAXIMUM_WAIT_OBJECTS) {
1192 pj_lock_release(ioqueue->lock);
1193 return PJ_ETOOMANYCONN;
1194 }
1195
1196 /* Get or create event object. */
1197 if (ioqueue->event_count) {
1198 hEvent = ioqueue->event_pool[ioqueue->event_count - 1];
1199 --ioqueue->event_count;
1200 } else {
1201 hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
1202 if (hEvent == NULL) {
1203 DWORD dwStatus = GetLastError();
1204 pj_lock_release(ioqueue->lock);
1205 return PJ_STATUS_FROM_OS(dwStatus);
1206 }
1207 }
1208
1209 /* Mark key as connecting.
1210 * We can't use array index since key can be removed dynamically.
1211 */
1212 key->connecting = 1;
1213
1214 /* Associate socket events to the event object. */
1215 if (WSAEventSelect((pj_sock_t)key->hnd, hEvent, FD_CONNECT) != 0) {
1216 CloseHandle(hEvent);
1217 pj_lock_release(ioqueue->lock);
1218 return PJ_RETURN_OS_ERROR(WSAGetLastError());
1219 }
1220
1221 /* Add to array. */
1222 ioqueue->connecting_keys[ ioqueue->connecting_count ] = key;
1223 ioqueue->connecting_handles[ ioqueue->connecting_count ] = hEvent;
1224 ioqueue->connecting_count++;
1225
1226 pj_lock_release(ioqueue->lock);
1227
1228 return PJ_EPENDING;
1229}
1230#endif /* #if PJ_HAS_TCP */
1231
1232
1233PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
1234 pj_size_t size )
1235{
Benny Prijonoac623b32006-07-03 15:19:31 +00001236 pj_bzero(op_key, size);
Benny Prijono9033e312005-11-21 02:08:39 +00001237}
1238
1239PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
1240 pj_ioqueue_op_key_t *op_key )
1241{
1242 BOOL rc;
1243 DWORD bytesTransfered;
1244
1245 rc = GetOverlappedResult( key->hnd, (LPOVERLAPPED)op_key,
1246 &bytesTransfered, FALSE );
1247
1248 if (rc == FALSE) {
1249 return GetLastError()==ERROR_IO_INCOMPLETE;
1250 }
1251
1252 return FALSE;
1253}
1254
1255
1256PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
1257 pj_ioqueue_op_key_t *op_key,
1258 pj_ssize_t bytes_status )
1259{
1260 BOOL rc;
1261
1262 rc = PostQueuedCompletionStatus(key->ioqueue->iocp, bytes_status,
1263 (long)key, (OVERLAPPED*)op_key );
1264 if (rc == FALSE) {
1265 return PJ_RETURN_OS_ERROR(GetLastError());
1266 }
1267
1268 return PJ_SUCCESS;
1269}
1270