blob: f12e8eec2fa3eac3bf827344b090560b13bc24a6 [file] [log] [blame]
Benny Prijono9033e312005-11-21 02:08:39 +00001/* $Id$ */
2/*
Benny Prijono844653c2008-12-23 17:27:53 +00003 * Copyright (C) 2008-2009 Teluu Inc. (http://www.teluu.com)
4 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
Benny Prijono9033e312005-11-21 02:08:39 +00005 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 */
20#include <pj/ioqueue.h>
21#include <pj/os.h>
22#include <pj/lock.h>
23#include <pj/pool.h>
24#include <pj/string.h>
25#include <pj/sock.h>
26#include <pj/array.h>
27#include <pj/log.h>
28#include <pj/assert.h>
29#include <pj/errno.h>
Benny Prijono9c025eb2006-07-10 21:35:27 +000030#include <pj/compat/socket.h>
Benny Prijono9033e312005-11-21 02:08:39 +000031
32
33#if defined(PJ_HAS_WINSOCK2_H) && PJ_HAS_WINSOCK2_H != 0
34# include <winsock2.h>
35#elif defined(PJ_HAS_WINSOCK_H) && PJ_HAS_WINSOCK_H != 0
36# include <winsock.h>
37#endif
38
39#if defined(PJ_HAS_MSWSOCK_H) && PJ_HAS_MSWSOCK_H != 0
40# include <mswsock.h>
41#endif
42
43
44/* The address specified in AcceptEx() must be 16 more than the size of
45 * SOCKADDR (source: MSDN).
46 */
47#define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in)+16)
48
49typedef struct generic_overlapped
50{
51 WSAOVERLAPPED overlapped;
52 pj_ioqueue_operation_e operation;
53} generic_overlapped;
54
55/*
56 * OVERLAPPPED structure for send and receive.
57 */
58typedef struct ioqueue_overlapped
59{
60 WSAOVERLAPPED overlapped;
61 pj_ioqueue_operation_e operation;
62 WSABUF wsabuf;
63 pj_sockaddr_in dummy_addr;
64 int dummy_addrlen;
65} ioqueue_overlapped;
66
67#if PJ_HAS_TCP
68/*
69 * OVERLAP structure for accept.
70 */
71typedef struct ioqueue_accept_rec
72{
73 WSAOVERLAPPED overlapped;
74 pj_ioqueue_operation_e operation;
75 pj_sock_t newsock;
76 pj_sock_t *newsock_ptr;
77 int *addrlen;
78 void *remote;
79 void *local;
80 char accept_buf[2 * ACCEPT_ADDR_LEN];
81} ioqueue_accept_rec;
82#endif
83
84/*
85 * Structure to hold pending operation key.
86 */
87union operation_key
88{
89 generic_overlapped generic;
90 ioqueue_overlapped overlapped;
91#if PJ_HAS_TCP
92 ioqueue_accept_rec accept;
93#endif
94};
95
96/* Type of handle in the key. */
97enum handle_type
98{
99 HND_IS_UNKNOWN,
100 HND_IS_FILE,
101 HND_IS_SOCKET,
102};
103
Benny Prijono8d317a02006-03-22 11:49:19 +0000104enum { POST_QUIT_LEN = 0xFFFFDEADUL };
105
Benny Prijono9033e312005-11-21 02:08:39 +0000106/*
107 * Structure for individual socket.
108 */
109struct pj_ioqueue_key_t
110{
Benny Prijono5accbd02006-03-30 16:32:18 +0000111 PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);
112
Benny Prijono9033e312005-11-21 02:08:39 +0000113 pj_ioqueue_t *ioqueue;
114 HANDLE hnd;
115 void *user_data;
116 enum handle_type hnd_type;
Benny Prijono5accbd02006-03-30 16:32:18 +0000117 pj_ioqueue_callback cb;
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000118 pj_bool_t allow_concurrent;
Benny Prijono5accbd02006-03-30 16:32:18 +0000119
Benny Prijono9033e312005-11-21 02:08:39 +0000120#if PJ_HAS_TCP
121 int connecting;
122#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000123
124#if PJ_IOQUEUE_HAS_SAFE_UNREG
125 pj_atomic_t *ref_count;
126 pj_bool_t closing;
127 pj_time_val free_time;
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000128 pj_mutex_t *mutex;
Benny Prijono5accbd02006-03-30 16:32:18 +0000129#endif
130
Benny Prijono9033e312005-11-21 02:08:39 +0000131};
132
133/*
134 * IO Queue structure.
135 */
136struct pj_ioqueue_t
137{
138 HANDLE iocp;
139 pj_lock_t *lock;
140 pj_bool_t auto_delete_lock;
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000141 pj_bool_t default_concurrency;
Benny Prijono5accbd02006-03-30 16:32:18 +0000142
143#if PJ_IOQUEUE_HAS_SAFE_UNREG
144 pj_ioqueue_key_t active_list;
145 pj_ioqueue_key_t free_list;
146 pj_ioqueue_key_t closing_list;
147#endif
148
149 /* These are to keep track of connecting sockets */
150#if PJ_HAS_TCP
Benny Prijono9033e312005-11-21 02:08:39 +0000151 unsigned event_count;
152 HANDLE event_pool[MAXIMUM_WAIT_OBJECTS+1];
Benny Prijono9033e312005-11-21 02:08:39 +0000153 unsigned connecting_count;
154 HANDLE connecting_handles[MAXIMUM_WAIT_OBJECTS+1];
155 pj_ioqueue_key_t *connecting_keys[MAXIMUM_WAIT_OBJECTS+1];
156#endif
157};
158
159
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000160#if PJ_IOQUEUE_HAS_SAFE_UNREG
161/* Prototype */
162static void scan_closing_keys(pj_ioqueue_t *ioqueue);
163#endif
164
165
Benny Prijono9033e312005-11-21 02:08:39 +0000166#if PJ_HAS_TCP
167/*
168 * Process the socket when the overlapped accept() completed.
169 */
170static void ioqueue_on_accept_complete(ioqueue_accept_rec *accept_overlapped)
171{
172 struct sockaddr *local;
173 struct sockaddr *remote;
174 int locallen, remotelen;
175
176 PJ_CHECK_STACK();
177
178 /* Operation complete immediately. */
Benny Prijono25c8f932008-08-26 14:41:26 +0000179 if (accept_overlapped->addrlen) {
180 GetAcceptExSockaddrs( accept_overlapped->accept_buf,
181 0,
182 ACCEPT_ADDR_LEN,
183 ACCEPT_ADDR_LEN,
184 &local,
185 &locallen,
186 &remote,
187 &remotelen);
188 if (*accept_overlapped->addrlen >= locallen) {
189 if (accept_overlapped->local)
190 pj_memcpy(accept_overlapped->local, local, locallen);
191 if (accept_overlapped->remote)
192 pj_memcpy(accept_overlapped->remote, remote, locallen);
193 } else {
194 if (accept_overlapped->local)
195 pj_bzero(accept_overlapped->local,
196 *accept_overlapped->addrlen);
197 if (accept_overlapped->remote)
198 pj_bzero(accept_overlapped->remote,
199 *accept_overlapped->addrlen);
200 }
201
202 *accept_overlapped->addrlen = locallen;
Benny Prijono9033e312005-11-21 02:08:39 +0000203 }
Benny Prijono9033e312005-11-21 02:08:39 +0000204 if (accept_overlapped->newsock_ptr)
205 *accept_overlapped->newsock_ptr = accept_overlapped->newsock;
206 accept_overlapped->operation = 0;
Benny Prijono9033e312005-11-21 02:08:39 +0000207}
208
209static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos)
210{
211 pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos];
212 HANDLE hEvent = ioqueue->connecting_handles[pos];
213
214 /* Remove key from array of connecting handles. */
215 pj_array_erase(ioqueue->connecting_keys, sizeof(key),
216 ioqueue->connecting_count, pos);
217 pj_array_erase(ioqueue->connecting_handles, sizeof(HANDLE),
218 ioqueue->connecting_count, pos);
219 --ioqueue->connecting_count;
220
221 /* Disassociate the socket from the event. */
222 WSAEventSelect((pj_sock_t)key->hnd, hEvent, 0);
223
224 /* Put event object to pool. */
225 if (ioqueue->event_count < MAXIMUM_WAIT_OBJECTS) {
226 ioqueue->event_pool[ioqueue->event_count++] = hEvent;
227 } else {
228 /* Shouldn't happen. There should be no more pending connections
229 * than max.
230 */
231 pj_assert(0);
232 CloseHandle(hEvent);
233 }
234
235}
236
237/*
238 * Poll for the completion of non-blocking connect().
239 * If there's a completion, the function return the key of the completed
240 * socket, and 'result' argument contains the connect() result. If connect()
241 * succeeded, 'result' will have value zero, otherwise will have the error
242 * code.
243 */
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000244static int check_connecting( pj_ioqueue_t *ioqueue )
Benny Prijono9033e312005-11-21 02:08:39 +0000245{
Benny Prijono9033e312005-11-21 02:08:39 +0000246 if (ioqueue->connecting_count) {
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000247 int i, count;
248 struct
249 {
250 pj_ioqueue_key_t *key;
251 pj_status_t status;
252 } events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1];
Benny Prijono9033e312005-11-21 02:08:39 +0000253
254 pj_lock_acquire(ioqueue->lock);
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000255 for (count=0; count<PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1; ++count) {
256 DWORD result;
Benny Prijono9033e312005-11-21 02:08:39 +0000257
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000258 result = WaitForMultipleObjects(ioqueue->connecting_count,
259 ioqueue->connecting_handles,
260 FALSE, 0);
261 if (result >= WAIT_OBJECT_0 &&
262 result < WAIT_OBJECT_0+ioqueue->connecting_count)
263 {
264 WSANETWORKEVENTS net_events;
Benny Prijono9033e312005-11-21 02:08:39 +0000265
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000266 /* Got completed connect(). */
267 unsigned pos = result - WAIT_OBJECT_0;
268 events[count].key = ioqueue->connecting_keys[pos];
Benny Prijono9033e312005-11-21 02:08:39 +0000269
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000270 /* See whether connect has succeeded. */
271 WSAEnumNetworkEvents((pj_sock_t)events[count].key->hnd,
272 ioqueue->connecting_handles[pos],
273 &net_events);
274 events[count].status =
275 PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]);
276
277 /* Erase socket from pending connect. */
278 erase_connecting_socket(ioqueue, pos);
279 } else {
280 /* No more events */
281 break;
282 }
Benny Prijono9033e312005-11-21 02:08:39 +0000283 }
284 pj_lock_release(ioqueue->lock);
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000285
286 /* Call callbacks. */
287 for (i=0; i<count; ++i) {
288 if (events[i].key->cb.on_connect_complete) {
289 events[i].key->cb.on_connect_complete(events[i].key,
290 events[i].status);
291 }
292 }
293
294 return count;
Benny Prijono9033e312005-11-21 02:08:39 +0000295 }
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000296
297 return 0;
298
Benny Prijono9033e312005-11-21 02:08:39 +0000299}
300#endif
301
302/*
303 * pj_ioqueue_name()
304 */
305PJ_DEF(const char*) pj_ioqueue_name(void)
306{
307 return "iocp";
308}
309
310/*
311 * pj_ioqueue_create()
312 */
313PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
314 pj_size_t max_fd,
315 pj_ioqueue_t **p_ioqueue)
316{
317 pj_ioqueue_t *ioqueue;
Benny Prijono5accbd02006-03-30 16:32:18 +0000318 unsigned i;
Benny Prijono9033e312005-11-21 02:08:39 +0000319 pj_status_t rc;
320
321 PJ_UNUSED_ARG(max_fd);
322 PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL);
323
324 rc = sizeof(union operation_key);
325
326 /* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */
327 PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
328 sizeof(union operation_key), PJ_EBUG);
329
Benny Prijono5accbd02006-03-30 16:32:18 +0000330 /* Create IOCP */
Benny Prijono9033e312005-11-21 02:08:39 +0000331 ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue));
332 ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
333 if (ioqueue->iocp == NULL)
334 return PJ_RETURN_OS_ERROR(GetLastError());
335
Benny Prijono5accbd02006-03-30 16:32:18 +0000336 /* Create IOCP mutex */
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000337 rc = pj_lock_create_recursive_mutex(pool, NULL, &ioqueue->lock);
Benny Prijono9033e312005-11-21 02:08:39 +0000338 if (rc != PJ_SUCCESS) {
339 CloseHandle(ioqueue->iocp);
340 return rc;
341 }
342
343 ioqueue->auto_delete_lock = PJ_TRUE;
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000344 ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY;
Benny Prijono9033e312005-11-21 02:08:39 +0000345
Benny Prijono5accbd02006-03-30 16:32:18 +0000346#if PJ_IOQUEUE_HAS_SAFE_UNREG
347 /*
348 * Create and initialize key pools.
349 */
350 pj_list_init(&ioqueue->active_list);
351 pj_list_init(&ioqueue->free_list);
352 pj_list_init(&ioqueue->closing_list);
353
354 /* Preallocate keys according to max_fd setting, and put them
355 * in free_list.
356 */
357 for (i=0; i<max_fd; ++i) {
358 pj_ioqueue_key_t *key;
359
360 key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t));
361
362 rc = pj_atomic_create(pool, 0, &key->ref_count);
363 if (rc != PJ_SUCCESS) {
364 key = ioqueue->free_list.next;
365 while (key != &ioqueue->free_list) {
366 pj_atomic_destroy(key->ref_count);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000367 pj_mutex_destroy(key->mutex);
368 key = key->next;
369 }
370 CloseHandle(ioqueue->iocp);
371 return rc;
372 }
373
374 rc = pj_mutex_create_recursive(pool, "ioqkey", &key->mutex);
375 if (rc != PJ_SUCCESS) {
376 pj_atomic_destroy(key->ref_count);
377 key = ioqueue->free_list.next;
378 while (key != &ioqueue->free_list) {
379 pj_atomic_destroy(key->ref_count);
380 pj_mutex_destroy(key->mutex);
Benny Prijono5accbd02006-03-30 16:32:18 +0000381 key = key->next;
382 }
383 CloseHandle(ioqueue->iocp);
384 return rc;
385 }
386
387 pj_list_push_back(&ioqueue->free_list, key);
Benny Prijono5accbd02006-03-30 16:32:18 +0000388 }
389#endif
390
Benny Prijono9033e312005-11-21 02:08:39 +0000391 *p_ioqueue = ioqueue;
392
393 PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue));
394 return PJ_SUCCESS;
395}
396
397/*
398 * pj_ioqueue_destroy()
399 */
400PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue )
401{
Benny Prijono3569c0d2007-04-06 10:29:20 +0000402#if PJ_HAS_TCP
Benny Prijono9033e312005-11-21 02:08:39 +0000403 unsigned i;
Benny Prijono3569c0d2007-04-06 10:29:20 +0000404#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000405 pj_ioqueue_key_t *key;
Benny Prijono9033e312005-11-21 02:08:39 +0000406
407 PJ_CHECK_STACK();
408 PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
409
Benny Prijono5accbd02006-03-30 16:32:18 +0000410 pj_lock_acquire(ioqueue->lock);
411
412#if PJ_HAS_TCP
Benny Prijono9033e312005-11-21 02:08:39 +0000413 /* Destroy events in the pool */
414 for (i=0; i<ioqueue->event_count; ++i) {
415 CloseHandle(ioqueue->event_pool[i]);
416 }
417 ioqueue->event_count = 0;
Benny Prijono5accbd02006-03-30 16:32:18 +0000418#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000419
420 if (CloseHandle(ioqueue->iocp) != TRUE)
421 return PJ_RETURN_OS_ERROR(GetLastError());
422
Benny Prijono5accbd02006-03-30 16:32:18 +0000423#if PJ_IOQUEUE_HAS_SAFE_UNREG
424 /* Destroy reference counters */
425 key = ioqueue->active_list.next;
426 while (key != &ioqueue->active_list) {
427 pj_atomic_destroy(key->ref_count);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000428 pj_mutex_destroy(key->mutex);
Benny Prijono5accbd02006-03-30 16:32:18 +0000429 key = key->next;
430 }
431
432 key = ioqueue->closing_list.next;
433 while (key != &ioqueue->closing_list) {
434 pj_atomic_destroy(key->ref_count);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000435 pj_mutex_destroy(key->mutex);
Benny Prijono5accbd02006-03-30 16:32:18 +0000436 key = key->next;
437 }
438
439 key = ioqueue->free_list.next;
440 while (key != &ioqueue->free_list) {
441 pj_atomic_destroy(key->ref_count);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000442 pj_mutex_destroy(key->mutex);
Benny Prijono5accbd02006-03-30 16:32:18 +0000443 key = key->next;
444 }
445#endif
446
Benny Prijono9033e312005-11-21 02:08:39 +0000447 if (ioqueue->auto_delete_lock)
448 pj_lock_destroy(ioqueue->lock);
449
450 return PJ_SUCCESS;
451}
452
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000453
454PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency(pj_ioqueue_t *ioqueue,
455 pj_bool_t allow)
456{
457 PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
458 ioqueue->default_concurrency = allow;
459 return PJ_SUCCESS;
460}
461
Benny Prijono9033e312005-11-21 02:08:39 +0000462/*
463 * pj_ioqueue_set_lock()
464 */
465PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
466 pj_lock_t *lock,
467 pj_bool_t auto_delete )
468{
469 PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
470
471 if (ioqueue->auto_delete_lock) {
472 pj_lock_destroy(ioqueue->lock);
473 }
474
475 ioqueue->lock = lock;
476 ioqueue->auto_delete_lock = auto_delete;
477
478 return PJ_SUCCESS;
479}
480
481/*
482 * pj_ioqueue_register_sock()
483 */
484PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
485 pj_ioqueue_t *ioqueue,
486 pj_sock_t sock,
487 void *user_data,
488 const pj_ioqueue_callback *cb,
489 pj_ioqueue_key_t **key )
490{
491 HANDLE hioq;
492 pj_ioqueue_key_t *rec;
493 u_long value;
494 int rc;
495
496 PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL);
497
Benny Prijono5accbd02006-03-30 16:32:18 +0000498 pj_lock_acquire(ioqueue->lock);
499
500#if PJ_IOQUEUE_HAS_SAFE_UNREG
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000501 /* Scan closing list first to release unused keys.
502 * Must do this with lock acquired.
503 */
504 scan_closing_keys(ioqueue);
505
Benny Prijono5accbd02006-03-30 16:32:18 +0000506 /* If safe unregistration is used, then get the key record from
507 * the free list.
508 */
509 if (pj_list_empty(&ioqueue->free_list)) {
510 pj_lock_release(ioqueue->lock);
511 return PJ_ETOOMANY;
512 }
513
514 rec = ioqueue->free_list.next;
515 pj_list_erase(rec);
516
517 /* Set initial reference count to 1 */
518 pj_assert(pj_atomic_get(rec->ref_count) == 0);
519 pj_atomic_inc(rec->ref_count);
520
521 rec->closing = 0;
522
523#else
Benny Prijono9033e312005-11-21 02:08:39 +0000524 rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
Benny Prijono5accbd02006-03-30 16:32:18 +0000525#endif
526
527 /* Build the key for this socket. */
Benny Prijono9033e312005-11-21 02:08:39 +0000528 rec->ioqueue = ioqueue;
529 rec->hnd = (HANDLE)sock;
530 rec->hnd_type = HND_IS_SOCKET;
531 rec->user_data = user_data;
532 pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback));
533
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000534 /* Set concurrency for this handle */
535 rc = pj_ioqueue_set_concurrency(rec, ioqueue->default_concurrency);
536 if (rc != PJ_SUCCESS) {
537 pj_lock_release(ioqueue->lock);
538 return rc;
539 }
540
Benny Prijono5accbd02006-03-30 16:32:18 +0000541#if PJ_HAS_TCP
542 rec->connecting = 0;
543#endif
544
Benny Prijono9033e312005-11-21 02:08:39 +0000545 /* Set socket to nonblocking. */
546 value = 1;
547 rc = ioctlsocket(sock, FIONBIO, &value);
548 if (rc != 0) {
Benny Prijono5accbd02006-03-30 16:32:18 +0000549 pj_lock_release(ioqueue->lock);
Benny Prijono9033e312005-11-21 02:08:39 +0000550 return PJ_RETURN_OS_ERROR(WSAGetLastError());
551 }
552
553 /* Associate with IOCP */
554 hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0);
555 if (!hioq) {
Benny Prijono5accbd02006-03-30 16:32:18 +0000556 pj_lock_release(ioqueue->lock);
Benny Prijono9033e312005-11-21 02:08:39 +0000557 return PJ_RETURN_OS_ERROR(GetLastError());
558 }
559
560 *key = rec;
Benny Prijono5accbd02006-03-30 16:32:18 +0000561
562#if PJ_IOQUEUE_HAS_SAFE_UNREG
563 pj_list_push_back(&ioqueue->active_list, rec);
564#endif
565
566 pj_lock_release(ioqueue->lock);
567
Benny Prijono9033e312005-11-21 02:08:39 +0000568 return PJ_SUCCESS;
569}
570
Benny Prijono9033e312005-11-21 02:08:39 +0000571
572/*
573 * pj_ioqueue_get_user_data()
574 */
575PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
576{
577 PJ_ASSERT_RETURN(key, NULL);
578 return key->user_data;
579}
580
581/*
582 * pj_ioqueue_set_user_data()
583 */
584PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
585 void *user_data,
586 void **old_data )
587{
588 PJ_ASSERT_RETURN(key, PJ_EINVAL);
589
590 if (old_data)
591 *old_data = key->user_data;
592
593 key->user_data = user_data;
594 return PJ_SUCCESS;
595}
596
Benny Prijono8d317a02006-03-22 11:49:19 +0000597
Benny Prijono5accbd02006-03-30 16:32:18 +0000598#if PJ_IOQUEUE_HAS_SAFE_UNREG
599/* Decrement the key's reference counter, and when the counter reach zero,
600 * destroy the key.
601 */
602static void decrement_counter(pj_ioqueue_key_t *key)
603{
604 if (pj_atomic_dec_and_get(key->ref_count) == 0) {
605
606 pj_lock_acquire(key->ioqueue->lock);
607
608 pj_assert(key->closing == 1);
609 pj_gettimeofday(&key->free_time);
610 key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
611 pj_time_val_normalize(&key->free_time);
612
613 pj_list_erase(key);
614 pj_list_push_back(&key->ioqueue->closing_list, key);
615
616 pj_lock_release(key->ioqueue->lock);
617 }
618}
619#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000620
Benny Prijono9033e312005-11-21 02:08:39 +0000621/*
Benny Prijono5accbd02006-03-30 16:32:18 +0000622 * Poll the I/O Completion Port, execute callback,
Benny Prijono8d317a02006-03-22 11:49:19 +0000623 * and return the key and bytes transfered of the last operation.
Benny Prijono9033e312005-11-21 02:08:39 +0000624 */
Benny Prijono8d317a02006-03-22 11:49:19 +0000625static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
626 pj_ssize_t *p_bytes, pj_ioqueue_key_t **p_key )
Benny Prijono9033e312005-11-21 02:08:39 +0000627{
Benny Prijono8d317a02006-03-22 11:49:19 +0000628 DWORD dwBytesTransfered, dwKey;
Benny Prijono9033e312005-11-21 02:08:39 +0000629 generic_overlapped *pOv;
630 pj_ioqueue_key_t *key;
Benny Prijono4f2be312005-11-21 17:01:06 +0000631 pj_ssize_t size_status = -1;
Benny Prijono8d317a02006-03-22 11:49:19 +0000632 BOOL rcGetQueued;
Benny Prijono9033e312005-11-21 02:08:39 +0000633
634 /* Poll for completion status. */
Benny Prijono8d317a02006-03-22 11:49:19 +0000635 rcGetQueued = GetQueuedCompletionStatus(hIocp, &dwBytesTransfered,
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000636 &dwKey, (OVERLAPPED**)&pOv,
Benny Prijono8d317a02006-03-22 11:49:19 +0000637 dwTimeout);
Benny Prijono9033e312005-11-21 02:08:39 +0000638
639 /* The return value is:
640 * - nonzero if event was dequeued.
641 * - zero and pOv==NULL if no event was dequeued.
642 * - zero and pOv!=NULL if event for failed I/O was dequeued.
643 */
644 if (pOv) {
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000645 pj_bool_t has_lock;
646
Benny Prijono9033e312005-11-21 02:08:39 +0000647 /* Event was dequeued for either successfull or failed I/O */
648 key = (pj_ioqueue_key_t*)dwKey;
649 size_status = dwBytesTransfered;
Benny Prijono8d317a02006-03-22 11:49:19 +0000650
651 /* Report to caller regardless */
652 if (p_bytes)
653 *p_bytes = size_status;
654 if (p_key)
655 *p_key = key;
656
Benny Prijono5accbd02006-03-30 16:32:18 +0000657#if PJ_IOQUEUE_HAS_SAFE_UNREG
658 /* We shouldn't call callbacks if key is quitting. */
659 if (key->closing)
Benny Prijono8d317a02006-03-22 11:49:19 +0000660 return PJ_TRUE;
Benny Prijono8d317a02006-03-22 11:49:19 +0000661
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000662 /* If concurrency is disabled, lock the key
663 * (and save the lock status to local var since app may change
664 * concurrency setting while in the callback) */
665 if (key->allow_concurrent == PJ_FALSE) {
666 pj_mutex_lock(key->mutex);
667 has_lock = PJ_TRUE;
668 } else {
669 has_lock = PJ_FALSE;
670 }
671
672 /* Now that we get the lock, check again that key is not closing */
673 if (key->closing) {
674 if (has_lock) {
675 pj_mutex_unlock(key->mutex);
676 }
677 return PJ_TRUE;
678 }
679
Benny Prijono5accbd02006-03-30 16:32:18 +0000680 /* Increment reference counter to prevent this key from being
681 * deleted
Benny Prijono8d317a02006-03-22 11:49:19 +0000682 */
Benny Prijono5accbd02006-03-30 16:32:18 +0000683 pj_atomic_inc(key->ref_count);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000684#else
685 PJ_UNUSED_ARG(has_lock);
Benny Prijono5accbd02006-03-30 16:32:18 +0000686#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000687
688 /* Carry out the callback */
Benny Prijono9033e312005-11-21 02:08:39 +0000689 switch (pOv->operation) {
690 case PJ_IOQUEUE_OP_READ:
691 case PJ_IOQUEUE_OP_RECV:
692 case PJ_IOQUEUE_OP_RECV_FROM:
693 pOv->operation = 0;
694 if (key->cb.on_read_complete)
695 key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv,
696 size_status);
697 break;
698 case PJ_IOQUEUE_OP_WRITE:
699 case PJ_IOQUEUE_OP_SEND:
700 case PJ_IOQUEUE_OP_SEND_TO:
701 pOv->operation = 0;
702 if (key->cb.on_write_complete)
703 key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv,
704 size_status);
705 break;
706#if PJ_HAS_TCP
707 case PJ_IOQUEUE_OP_ACCEPT:
708 /* special case for accept. */
709 ioqueue_on_accept_complete((ioqueue_accept_rec*)pOv);
710 if (key->cb.on_accept_complete) {
711 ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv;
Benny Prijono9c025eb2006-07-10 21:35:27 +0000712 pj_status_t status = PJ_SUCCESS;
Benny Prijono25c8f932008-08-26 14:41:26 +0000713 pj_sock_t newsock;
Benny Prijono9c025eb2006-07-10 21:35:27 +0000714
Benny Prijono25c8f932008-08-26 14:41:26 +0000715 newsock = accept_rec->newsock;
716 accept_rec->newsock = PJ_INVALID_SOCKET;
717
718 if (newsock == PJ_INVALID_SOCKET) {
Benny Prijono9c025eb2006-07-10 21:35:27 +0000719 int dwError = WSAGetLastError();
720 if (dwError == 0) dwError = OSERR_ENOTCONN;
721 status = PJ_RETURN_OS_ERROR(dwError);
722 }
723
Benny Prijono25c8f932008-08-26 14:41:26 +0000724 key->cb.on_accept_complete(key, (pj_ioqueue_op_key_t*)pOv,
725 newsock, status);
726
Benny Prijono9033e312005-11-21 02:08:39 +0000727 }
728 break;
729 case PJ_IOQUEUE_OP_CONNECT:
730#endif
731 case PJ_IOQUEUE_OP_NONE:
732 pj_assert(0);
733 break;
734 }
Benny Prijono5accbd02006-03-30 16:32:18 +0000735
736#if PJ_IOQUEUE_HAS_SAFE_UNREG
737 decrement_counter(key);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000738 if (has_lock)
739 pj_mutex_unlock(key->mutex);
Benny Prijono5accbd02006-03-30 16:32:18 +0000740#endif
741
Benny Prijono8d317a02006-03-22 11:49:19 +0000742 return PJ_TRUE;
Benny Prijono9033e312005-11-21 02:08:39 +0000743 }
744
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000745 /* No event was queued. */
Benny Prijono8d317a02006-03-22 11:49:19 +0000746 return PJ_FALSE;
747}
748
749/*
750 * pj_ioqueue_unregister()
751 */
752PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
753{
Benny Prijono9fe1ad22007-02-17 00:30:24 +0000754 unsigned i;
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000755 pj_bool_t has_lock;
Benny Prijono9fe1ad22007-02-17 00:30:24 +0000756 enum { RETRY = 10 };
757
Benny Prijono8d317a02006-03-22 11:49:19 +0000758 PJ_ASSERT_RETURN(key, PJ_EINVAL);
759
760#if PJ_HAS_TCP
761 if (key->connecting) {
762 unsigned pos;
763 pj_ioqueue_t *ioqueue;
764
765 ioqueue = key->ioqueue;
766
767 /* Erase from connecting_handles */
768 pj_lock_acquire(ioqueue->lock);
769 for (pos=0; pos < ioqueue->connecting_count; ++pos) {
770 if (ioqueue->connecting_keys[pos] == key) {
771 erase_connecting_socket(ioqueue, pos);
772 break;
773 }
774 }
775 key->connecting = 0;
776 pj_lock_release(ioqueue->lock);
777 }
778#endif
Benny Prijono08beac62006-11-23 07:31:27 +0000779
780#if PJ_IOQUEUE_HAS_SAFE_UNREG
781 /* Mark key as closing before closing handle. */
782 key->closing = 1;
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000783
784 /* If concurrency is disabled, wait until the key has finished
785 * processing the callback
786 */
787 if (key->allow_concurrent == PJ_FALSE) {
788 pj_mutex_lock(key->mutex);
789 has_lock = PJ_TRUE;
790 } else {
791 has_lock = PJ_FALSE;
792 }
793#else
794 PJ_UNUSED_ARG(has_lock);
Benny Prijono08beac62006-11-23 07:31:27 +0000795#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000796
797 /* Close handle (the only way to disassociate handle from IOCP).
798 * We also need to close handle to make sure that no further events
799 * will come to the handle.
Benny Prijono8d317a02006-03-22 11:49:19 +0000800 */
Benny Prijono6d9ee8d2008-07-18 10:33:09 +0000801 /* Update 2008/07/18 (http://trac.pjsip.org/repos/ticket/575):
802 * - It seems that CloseHandle() in itself does not actually close
803 * the socket (i.e. it will still appear in "netstat" output). Also
804 * if we only use CloseHandle(), an "Invalid Handle" exception will
805 * be raised in WSACleanup().
806 * - MSDN documentation says that CloseHandle() must be called after
807 * closesocket() call (see
808 * http://msdn.microsoft.com/en-us/library/ms724211(VS.85).aspx).
809 * But turns out that this will raise "Invalid Handle" exception
810 * in debug mode.
811 * So because of this, we replaced CloseHandle() with closesocket()
812 * instead. These was tested on WinXP SP2.
813 */
814 //CloseHandle(key->hnd);
815 pj_sock_close((pj_sock_t)key->hnd);
Benny Prijono8d317a02006-03-22 11:49:19 +0000816
Benny Prijono5accbd02006-03-30 16:32:18 +0000817 /* Reset callbacks */
Benny Prijono8d317a02006-03-22 11:49:19 +0000818 key->cb.on_accept_complete = NULL;
819 key->cb.on_connect_complete = NULL;
Benny Prijono5accbd02006-03-30 16:32:18 +0000820 key->cb.on_read_complete = NULL;
821 key->cb.on_write_complete = NULL;
Benny Prijono8d317a02006-03-22 11:49:19 +0000822
Benny Prijono5accbd02006-03-30 16:32:18 +0000823#if PJ_IOQUEUE_HAS_SAFE_UNREG
Benny Prijono5accbd02006-03-30 16:32:18 +0000824 /* Even after handle is closed, I suspect that IOCP may still try to
825 * do something with the handle, causing memory corruption when pool
826 * debugging is enabled.
827 *
828 * Forcing context switch seems to have fixed that, but this is quite
829 * an ugly solution..
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000830 *
831 * Update 2008/02/13:
832 * This should not happen if concurrency is disallowed for the key.
833 * So at least application has a solution for this (i.e. by disallowing
834 * concurrency in the key).
Benny Prijono8d317a02006-03-22 11:49:19 +0000835 */
Benny Prijono9fe1ad22007-02-17 00:30:24 +0000836 //This will loop forever if unregistration is done on the callback.
837 //Doing this with RETRY I think should solve the IOCP setting the
838 //socket signalled, without causing the deadlock.
839 //while (pj_atomic_get(key->ref_count) != 1)
840 // pj_thread_sleep(0);
841 for (i=0; pj_atomic_get(key->ref_count) != 1 && i<RETRY; ++i)
Benny Prijono08beac62006-11-23 07:31:27 +0000842 pj_thread_sleep(0);
843
844 /* Decrement reference counter to destroy the key. */
845 decrement_counter(key);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000846
847 if (has_lock)
848 pj_mutex_unlock(key->mutex);
Benny Prijono5accbd02006-03-30 16:32:18 +0000849#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000850
851 return PJ_SUCCESS;
852}
853
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000854#if PJ_IOQUEUE_HAS_SAFE_UNREG
855/* Scan the closing list, and put pending closing keys to free list.
856 * Must do this with ioqueue mutex held.
857 */
858static void scan_closing_keys(pj_ioqueue_t *ioqueue)
859{
860 if (!pj_list_empty(&ioqueue->closing_list)) {
861 pj_time_val now;
862 pj_ioqueue_key_t *key;
863
864 pj_gettimeofday(&now);
865
866 /* Move closing keys to free list when they've finished the closing
867 * idle time.
868 */
869 key = ioqueue->closing_list.next;
870 while (key != &ioqueue->closing_list) {
871 pj_ioqueue_key_t *next = key->next;
872
873 pj_assert(key->closing != 0);
874
875 if (PJ_TIME_VAL_GTE(now, key->free_time)) {
876 pj_list_erase(key);
877 pj_list_push_back(&ioqueue->free_list, key);
878 }
879 key = next;
880 }
881 }
882}
883#endif
884
Benny Prijono8d317a02006-03-22 11:49:19 +0000885/*
886 * pj_ioqueue_poll()
887 *
888 * Poll for events.
889 */
890PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
891{
892 DWORD dwMsec;
Benny Prijono3569c0d2007-04-06 10:29:20 +0000893#if PJ_HAS_TCP
Benny Prijono8d317a02006-03-22 11:49:19 +0000894 int connect_count = 0;
Benny Prijono3569c0d2007-04-06 10:29:20 +0000895#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000896 int event_count = 0;
Benny Prijono8d317a02006-03-22 11:49:19 +0000897
898 PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
899
Benny Prijono8d317a02006-03-22 11:49:19 +0000900 /* Calculate miliseconds timeout for GetQueuedCompletionStatus */
901 dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE;
902
903 /* Poll for completion status. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000904 event_count = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL);
905
906#if PJ_HAS_TCP
907 /* Check the connecting array, only when there's no activity. */
908 if (event_count == 0) {
909 connect_count = check_connecting(ioqueue);
910 if (connect_count > 0)
911 event_count += connect_count;
912 }
913#endif
914
915#if PJ_IOQUEUE_HAS_SAFE_UNREG
916 /* Check the closing keys only when there's no activity and when there are
917 * pending closing keys.
918 */
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000919 if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) {
Benny Prijono5accbd02006-03-30 16:32:18 +0000920 pj_lock_acquire(ioqueue->lock);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000921 scan_closing_keys(ioqueue);
Benny Prijono5accbd02006-03-30 16:32:18 +0000922 pj_lock_release(ioqueue->lock);
923 }
924#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000925
926 /* Return number of events. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000927 return event_count;
Benny Prijono9033e312005-11-21 02:08:39 +0000928}
929
930/*
931 * pj_ioqueue_recv()
932 *
933 * Initiate overlapped WSARecv() operation.
934 */
935PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
936 pj_ioqueue_op_key_t *op_key,
937 void *buffer,
938 pj_ssize_t *length,
939 pj_uint32_t flags )
940{
941 /*
942 * Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and
943 * addrlen here. But unfortunately it generates EINVAL... :-(
944 * -bennylp
945 */
946 int rc;
947 DWORD bytesRead;
948 DWORD dwFlags = 0;
949 union operation_key *op_key_rec;
950
951 PJ_CHECK_STACK();
952 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
953
Benny Prijono5accbd02006-03-30 16:32:18 +0000954#if PJ_IOQUEUE_HAS_SAFE_UNREG
955 /* Check key is not closing */
956 if (key->closing)
957 return PJ_ECANCELLED;
958#endif
959
Benny Prijono9033e312005-11-21 02:08:39 +0000960 op_key_rec = (union operation_key*)op_key->internal__;
961 op_key_rec->overlapped.wsabuf.buf = buffer;
962 op_key_rec->overlapped.wsabuf.len = *length;
963
964 dwFlags = flags;
965
966 /* Try non-overlapped received first to see if data is
967 * immediately available.
968 */
969 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
970 rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
971 &bytesRead, &dwFlags, NULL, NULL);
972 if (rc == 0) {
973 *length = bytesRead;
974 return PJ_SUCCESS;
975 } else {
976 DWORD dwError = WSAGetLastError();
977 if (dwError != WSAEWOULDBLOCK) {
978 *length = -1;
979 return PJ_RETURN_OS_ERROR(dwError);
980 }
981 }
982 }
983
984 dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
985
986 /*
987 * No immediate data available.
988 * Register overlapped Recv() operation.
989 */
Benny Prijonoac623b32006-07-03 15:19:31 +0000990 pj_bzero( &op_key_rec->overlapped.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +0000991 sizeof(op_key_rec->overlapped.overlapped));
992 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
993
994 rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
995 &bytesRead, &dwFlags,
996 &op_key_rec->overlapped.overlapped, NULL);
997 if (rc == SOCKET_ERROR) {
998 DWORD dwStatus = WSAGetLastError();
999 if (dwStatus!=WSA_IO_PENDING) {
1000 *length = -1;
1001 return PJ_STATUS_FROM_OS(dwStatus);
1002 }
1003 }
1004
1005 /* Pending operation has been scheduled. */
1006 return PJ_EPENDING;
1007}
1008
1009/*
1010 * pj_ioqueue_recvfrom()
1011 *
1012 * Initiate overlapped RecvFrom() operation.
1013 */
1014PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
1015 pj_ioqueue_op_key_t *op_key,
1016 void *buffer,
1017 pj_ssize_t *length,
1018 pj_uint32_t flags,
1019 pj_sockaddr_t *addr,
1020 int *addrlen)
1021{
1022 int rc;
1023 DWORD bytesRead;
1024 DWORD dwFlags = 0;
1025 union operation_key *op_key_rec;
1026
1027 PJ_CHECK_STACK();
1028 PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL);
1029
Benny Prijono5accbd02006-03-30 16:32:18 +00001030#if PJ_IOQUEUE_HAS_SAFE_UNREG
1031 /* Check key is not closing */
1032 if (key->closing)
1033 return PJ_ECANCELLED;
1034#endif
1035
Benny Prijono9033e312005-11-21 02:08:39 +00001036 op_key_rec = (union operation_key*)op_key->internal__;
1037 op_key_rec->overlapped.wsabuf.buf = buffer;
1038 op_key_rec->overlapped.wsabuf.len = *length;
1039
1040 dwFlags = flags;
1041
1042 /* Try non-overlapped received first to see if data is
1043 * immediately available.
1044 */
1045 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
1046 rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1047 &bytesRead, &dwFlags, addr, addrlen, NULL, NULL);
1048 if (rc == 0) {
1049 *length = bytesRead;
1050 return PJ_SUCCESS;
1051 } else {
1052 DWORD dwError = WSAGetLastError();
1053 if (dwError != WSAEWOULDBLOCK) {
1054 *length = -1;
1055 return PJ_RETURN_OS_ERROR(dwError);
1056 }
1057 }
1058 }
1059
1060 dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
1061
1062 /*
1063 * No immediate data available.
1064 * Register overlapped Recv() operation.
1065 */
Benny Prijonoac623b32006-07-03 15:19:31 +00001066 pj_bzero( &op_key_rec->overlapped.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +00001067 sizeof(op_key_rec->overlapped.overlapped));
1068 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
1069
1070 rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1071 &bytesRead, &dwFlags, addr, addrlen,
1072 &op_key_rec->overlapped.overlapped, NULL);
1073 if (rc == SOCKET_ERROR) {
1074 DWORD dwStatus = WSAGetLastError();
1075 if (dwStatus!=WSA_IO_PENDING) {
1076 *length = -1;
1077 return PJ_STATUS_FROM_OS(dwStatus);
1078 }
1079 }
1080
1081 /* Pending operation has been scheduled. */
1082 return PJ_EPENDING;
1083}
1084
1085/*
1086 * pj_ioqueue_send()
1087 *
1088 * Initiate overlapped Send operation.
1089 */
1090PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
1091 pj_ioqueue_op_key_t *op_key,
1092 const void *data,
1093 pj_ssize_t *length,
1094 pj_uint32_t flags )
1095{
1096 return pj_ioqueue_sendto(key, op_key, data, length, flags, NULL, 0);
1097}
1098
1099
1100/*
1101 * pj_ioqueue_sendto()
1102 *
1103 * Initiate overlapped SendTo operation.
1104 */
1105PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
1106 pj_ioqueue_op_key_t *op_key,
1107 const void *data,
1108 pj_ssize_t *length,
1109 pj_uint32_t flags,
1110 const pj_sockaddr_t *addr,
1111 int addrlen)
1112{
1113 int rc;
1114 DWORD bytesWritten;
1115 DWORD dwFlags;
1116 union operation_key *op_key_rec;
1117
1118 PJ_CHECK_STACK();
1119 PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL);
Benny Prijono5accbd02006-03-30 16:32:18 +00001120
1121#if PJ_IOQUEUE_HAS_SAFE_UNREG
1122 /* Check key is not closing */
1123 if (key->closing)
1124 return PJ_ECANCELLED;
1125#endif
1126
Benny Prijono9033e312005-11-21 02:08:39 +00001127 op_key_rec = (union operation_key*)op_key->internal__;
1128
1129 /*
1130 * First try blocking write.
1131 */
1132 op_key_rec->overlapped.wsabuf.buf = (void*)data;
1133 op_key_rec->overlapped.wsabuf.len = *length;
1134
1135 dwFlags = flags;
1136
1137 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
1138 rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1139 &bytesWritten, dwFlags, addr, addrlen,
1140 NULL, NULL);
1141 if (rc == 0) {
1142 *length = bytesWritten;
1143 return PJ_SUCCESS;
1144 } else {
1145 DWORD dwStatus = WSAGetLastError();
1146 if (dwStatus != WSAEWOULDBLOCK) {
1147 *length = -1;
1148 return PJ_RETURN_OS_ERROR(dwStatus);
1149 }
1150 }
1151 }
1152
1153 dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
1154
1155 /*
1156 * Data can't be sent immediately.
1157 * Schedule asynchronous WSASend().
1158 */
Benny Prijonoac623b32006-07-03 15:19:31 +00001159 pj_bzero( &op_key_rec->overlapped.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +00001160 sizeof(op_key_rec->overlapped.overlapped));
1161 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND;
1162
1163 rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1164 &bytesWritten, dwFlags, addr, addrlen,
1165 &op_key_rec->overlapped.overlapped, NULL);
1166 if (rc == SOCKET_ERROR) {
1167 DWORD dwStatus = WSAGetLastError();
1168 if (dwStatus!=WSA_IO_PENDING)
1169 return PJ_STATUS_FROM_OS(dwStatus);
1170 }
1171
1172 /* Asynchronous operation successfully submitted. */
1173 return PJ_EPENDING;
1174}
1175
1176#if PJ_HAS_TCP
1177
1178/*
1179 * pj_ioqueue_accept()
1180 *
1181 * Initiate overlapped accept() operation.
1182 */
1183PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
1184 pj_ioqueue_op_key_t *op_key,
1185 pj_sock_t *new_sock,
1186 pj_sockaddr_t *local,
1187 pj_sockaddr_t *remote,
1188 int *addrlen)
1189{
1190 BOOL rc;
1191 DWORD bytesReceived;
1192 pj_status_t status;
1193 union operation_key *op_key_rec;
1194 SOCKET sock;
1195
1196 PJ_CHECK_STACK();
1197 PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
1198
Benny Prijono5accbd02006-03-30 16:32:18 +00001199#if PJ_IOQUEUE_HAS_SAFE_UNREG
1200 /* Check key is not closing */
1201 if (key->closing)
1202 return PJ_ECANCELLED;
1203#endif
1204
Benny Prijono9033e312005-11-21 02:08:39 +00001205 /*
1206 * See if there is a new connection immediately available.
1207 */
1208 sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0);
1209 if (sock != INVALID_SOCKET) {
1210 /* Yes! New socket is available! */
1211 int status;
1212
1213 status = getsockname(sock, local, addrlen);
1214 if (status != 0) {
1215 DWORD dwError = WSAGetLastError();
1216 closesocket(sock);
1217 return PJ_RETURN_OS_ERROR(dwError);
1218 }
1219
1220 *new_sock = sock;
1221 return PJ_SUCCESS;
1222
1223 } else {
1224 DWORD dwError = WSAGetLastError();
1225 if (dwError != WSAEWOULDBLOCK) {
1226 return PJ_RETURN_OS_ERROR(dwError);
1227 }
1228 }
1229
1230 /*
1231 * No connection is immediately available.
1232 * Must schedule an asynchronous operation.
1233 */
1234 op_key_rec = (union operation_key*)op_key->internal__;
1235
Benny Prijono8ab968f2007-07-20 08:08:30 +00001236 status = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0,
Benny Prijono9033e312005-11-21 02:08:39 +00001237 &op_key_rec->accept.newsock);
1238 if (status != PJ_SUCCESS)
1239 return status;
1240
1241 /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket
1242 * addresses can be obtained with getsockname() and getpeername().
1243 */
1244 status = setsockopt(op_key_rec->accept.newsock, SOL_SOCKET,
1245 SO_UPDATE_ACCEPT_CONTEXT,
1246 (char*)&key->hnd, sizeof(SOCKET));
1247 /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later.
1248 * So ignore the error status.
1249 */
1250
1251 op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT;
1252 op_key_rec->accept.addrlen = addrlen;
1253 op_key_rec->accept.local = local;
1254 op_key_rec->accept.remote = remote;
1255 op_key_rec->accept.newsock_ptr = new_sock;
Benny Prijonoac623b32006-07-03 15:19:31 +00001256 pj_bzero( &op_key_rec->accept.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +00001257 sizeof(op_key_rec->accept.overlapped));
1258
1259 rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)op_key_rec->accept.newsock,
1260 op_key_rec->accept.accept_buf,
1261 0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN,
1262 &bytesReceived,
1263 &op_key_rec->accept.overlapped );
1264
1265 if (rc == TRUE) {
1266 ioqueue_on_accept_complete(&op_key_rec->accept);
1267 return PJ_SUCCESS;
1268 } else {
1269 DWORD dwStatus = WSAGetLastError();
1270 if (dwStatus!=WSA_IO_PENDING)
1271 return PJ_STATUS_FROM_OS(dwStatus);
1272 }
1273
1274 /* Asynchronous Accept() has been submitted. */
1275 return PJ_EPENDING;
1276}
1277
1278
1279/*
1280 * pj_ioqueue_connect()
1281 *
1282 * Initiate overlapped connect() operation (well, it's non-blocking actually,
1283 * since there's no overlapped version of connect()).
1284 */
1285PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
1286 const pj_sockaddr_t *addr,
1287 int addrlen )
1288{
1289 HANDLE hEvent;
1290 pj_ioqueue_t *ioqueue;
1291
1292 PJ_CHECK_STACK();
1293 PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
1294
Benny Prijono5accbd02006-03-30 16:32:18 +00001295#if PJ_IOQUEUE_HAS_SAFE_UNREG
1296 /* Check key is not closing */
1297 if (key->closing)
1298 return PJ_ECANCELLED;
1299#endif
1300
Benny Prijono9033e312005-11-21 02:08:39 +00001301 /* Initiate connect() */
1302 if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) {
1303 DWORD dwStatus;
1304 dwStatus = WSAGetLastError();
1305 if (dwStatus != WSAEWOULDBLOCK) {
1306 return PJ_RETURN_OS_ERROR(dwStatus);
1307 }
1308 } else {
1309 /* Connect has completed immediately! */
1310 return PJ_SUCCESS;
1311 }
1312
1313 ioqueue = key->ioqueue;
1314
1315 /* Add to the array of connecting socket to be polled */
1316 pj_lock_acquire(ioqueue->lock);
1317
1318 if (ioqueue->connecting_count >= MAXIMUM_WAIT_OBJECTS) {
1319 pj_lock_release(ioqueue->lock);
1320 return PJ_ETOOMANYCONN;
1321 }
1322
1323 /* Get or create event object. */
1324 if (ioqueue->event_count) {
1325 hEvent = ioqueue->event_pool[ioqueue->event_count - 1];
1326 --ioqueue->event_count;
1327 } else {
1328 hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
1329 if (hEvent == NULL) {
1330 DWORD dwStatus = GetLastError();
1331 pj_lock_release(ioqueue->lock);
1332 return PJ_STATUS_FROM_OS(dwStatus);
1333 }
1334 }
1335
1336 /* Mark key as connecting.
1337 * We can't use array index since key can be removed dynamically.
1338 */
1339 key->connecting = 1;
1340
1341 /* Associate socket events to the event object. */
1342 if (WSAEventSelect((pj_sock_t)key->hnd, hEvent, FD_CONNECT) != 0) {
1343 CloseHandle(hEvent);
1344 pj_lock_release(ioqueue->lock);
1345 return PJ_RETURN_OS_ERROR(WSAGetLastError());
1346 }
1347
1348 /* Add to array. */
1349 ioqueue->connecting_keys[ ioqueue->connecting_count ] = key;
1350 ioqueue->connecting_handles[ ioqueue->connecting_count ] = hEvent;
1351 ioqueue->connecting_count++;
1352
1353 pj_lock_release(ioqueue->lock);
1354
1355 return PJ_EPENDING;
1356}
1357#endif /* #if PJ_HAS_TCP */
1358
1359
1360PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
1361 pj_size_t size )
1362{
Benny Prijonoac623b32006-07-03 15:19:31 +00001363 pj_bzero(op_key, size);
Benny Prijono9033e312005-11-21 02:08:39 +00001364}
1365
1366PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
1367 pj_ioqueue_op_key_t *op_key )
1368{
1369 BOOL rc;
1370 DWORD bytesTransfered;
1371
1372 rc = GetOverlappedResult( key->hnd, (LPOVERLAPPED)op_key,
1373 &bytesTransfered, FALSE );
1374
1375 if (rc == FALSE) {
1376 return GetLastError()==ERROR_IO_INCOMPLETE;
1377 }
1378
1379 return FALSE;
1380}
1381
1382
1383PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
1384 pj_ioqueue_op_key_t *op_key,
1385 pj_ssize_t bytes_status )
1386{
1387 BOOL rc;
1388
1389 rc = PostQueuedCompletionStatus(key->ioqueue->iocp, bytes_status,
1390 (long)key, (OVERLAPPED*)op_key );
1391 if (rc == FALSE) {
1392 return PJ_RETURN_OS_ERROR(GetLastError());
1393 }
1394
1395 return PJ_SUCCESS;
1396}
1397
Benny Prijonoe3f79fd2008-02-13 15:17:28 +00001398PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
1399 pj_bool_t allow)
1400{
1401 PJ_ASSERT_RETURN(key, PJ_EINVAL);
1402
1403 /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
1404 * disabled.
1405 */
1406 PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);
1407
1408 key->allow_concurrent = allow;
1409 return PJ_SUCCESS;
1410}
1411
1412PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
1413{
1414#if PJ_IOQUEUE_HAS_SAFE_UNREG
1415 return pj_mutex_lock(key->mutex);
1416#else
1417 PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP);
1418#endif
1419}
1420
1421PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
1422{
1423#if PJ_IOQUEUE_HAS_SAFE_UNREG
1424 return pj_mutex_unlock(key->mutex);
1425#else
1426 PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP);
1427#endif
1428}
1429