blob: 29bb3684437df1f8e2d5cfdf4f45c698de7b69aa [file] [log] [blame]
Benny Prijono9033e312005-11-21 02:08:39 +00001/* $Id$ */
2/*
Benny Prijono32177c02008-06-20 22:44:47 +00003 * Copyright (C)2003-2008 Benny Prijono <benny@prijono.org>
Benny Prijono9033e312005-11-21 02:08:39 +00004 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 */
19#include <pj/ioqueue.h>
20#include <pj/os.h>
21#include <pj/lock.h>
22#include <pj/pool.h>
23#include <pj/string.h>
24#include <pj/sock.h>
25#include <pj/array.h>
26#include <pj/log.h>
27#include <pj/assert.h>
28#include <pj/errno.h>
Benny Prijono9c025eb2006-07-10 21:35:27 +000029#include <pj/compat/socket.h>
Benny Prijono9033e312005-11-21 02:08:39 +000030
31
32#if defined(PJ_HAS_WINSOCK2_H) && PJ_HAS_WINSOCK2_H != 0
33# include <winsock2.h>
34#elif defined(PJ_HAS_WINSOCK_H) && PJ_HAS_WINSOCK_H != 0
35# include <winsock.h>
36#endif
37
38#if defined(PJ_HAS_MSWSOCK_H) && PJ_HAS_MSWSOCK_H != 0
39# include <mswsock.h>
40#endif
41
42
/* AcceptEx() requires each address slot in its output buffer to be at
 * least 16 bytes larger than the socket address itself (source: MSDN).
 */
#define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in) + 16)
47
48typedef struct generic_overlapped
49{
50 WSAOVERLAPPED overlapped;
51 pj_ioqueue_operation_e operation;
52} generic_overlapped;
53
54/*
55 * OVERLAPPPED structure for send and receive.
56 */
57typedef struct ioqueue_overlapped
58{
59 WSAOVERLAPPED overlapped;
60 pj_ioqueue_operation_e operation;
61 WSABUF wsabuf;
62 pj_sockaddr_in dummy_addr;
63 int dummy_addrlen;
64} ioqueue_overlapped;
65
#if PJ_HAS_TCP
/*
 * OVERLAPPED record for an accept operation.
 */
typedef struct ioqueue_accept_rec
{
    WSAOVERLAPPED           overlapped;   /* Must be the first member.      */
    pj_ioqueue_operation_e  operation;    /* Pending operation; 0 when idle */
    pj_sock_t               newsock;      /* Socket for the new connection. */
    pj_sock_t              *newsock_ptr;  /* Optional: where to report
                                           * newsock on completion.         */
    int                    *addrlen;      /* Optional: in = size of the
                                           * local/remote buffers, out =
                                           * address length.                */
    void                   *remote;       /* Optional: remote address out.  */
    void                   *local;        /* Optional: local address out.   */
    char                    accept_buf[2 * ACCEPT_ADDR_LEN];
                                          /* Address buffer decoded with
                                           * GetAcceptExSockaddrs().        */
} ioqueue_accept_rec;
#endif
82
83/*
84 * Structure to hold pending operation key.
85 */
86union operation_key
87{
88 generic_overlapped generic;
89 ioqueue_overlapped overlapped;
90#if PJ_HAS_TCP
91 ioqueue_accept_rec accept;
92#endif
93};
94
/* Kind of OS handle that is stored in a key. */
enum handle_type
{
    HND_IS_UNKNOWN = 0,
    HND_IS_FILE    = 1,
    HND_IS_SOCKET  = 2
};
102
/* Magic length value.
 * NOTE(review): not referenced in the part of the file reviewed here;
 * judging by the name it marks a posted "quit" completion -- confirm
 * against its users.
 */
enum
{
    POST_QUIT_LEN = 0xFFFFDEADUL
};
104
Benny Prijono9033e312005-11-21 02:08:39 +0000105/*
106 * Structure for individual socket.
107 */
108struct pj_ioqueue_key_t
109{
Benny Prijono5accbd02006-03-30 16:32:18 +0000110 PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);
111
Benny Prijono9033e312005-11-21 02:08:39 +0000112 pj_ioqueue_t *ioqueue;
113 HANDLE hnd;
114 void *user_data;
115 enum handle_type hnd_type;
Benny Prijono5accbd02006-03-30 16:32:18 +0000116 pj_ioqueue_callback cb;
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000117 pj_bool_t allow_concurrent;
Benny Prijono5accbd02006-03-30 16:32:18 +0000118
Benny Prijono9033e312005-11-21 02:08:39 +0000119#if PJ_HAS_TCP
120 int connecting;
121#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000122
123#if PJ_IOQUEUE_HAS_SAFE_UNREG
124 pj_atomic_t *ref_count;
125 pj_bool_t closing;
126 pj_time_val free_time;
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000127 pj_mutex_t *mutex;
Benny Prijono5accbd02006-03-30 16:32:18 +0000128#endif
129
Benny Prijono9033e312005-11-21 02:08:39 +0000130};
131
132/*
133 * IO Queue structure.
134 */
135struct pj_ioqueue_t
136{
137 HANDLE iocp;
138 pj_lock_t *lock;
139 pj_bool_t auto_delete_lock;
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000140 pj_bool_t default_concurrency;
Benny Prijono5accbd02006-03-30 16:32:18 +0000141
142#if PJ_IOQUEUE_HAS_SAFE_UNREG
143 pj_ioqueue_key_t active_list;
144 pj_ioqueue_key_t free_list;
145 pj_ioqueue_key_t closing_list;
146#endif
147
148 /* These are to keep track of connecting sockets */
149#if PJ_HAS_TCP
Benny Prijono9033e312005-11-21 02:08:39 +0000150 unsigned event_count;
151 HANDLE event_pool[MAXIMUM_WAIT_OBJECTS+1];
Benny Prijono9033e312005-11-21 02:08:39 +0000152 unsigned connecting_count;
153 HANDLE connecting_handles[MAXIMUM_WAIT_OBJECTS+1];
154 pj_ioqueue_key_t *connecting_keys[MAXIMUM_WAIT_OBJECTS+1];
155#endif
156};
157
158
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Forward declaration: the function is used by pj_ioqueue_register_sock()
 * before its definition further down in this file.
 */
static void scan_closing_keys(pj_ioqueue_t *ioqueue);
#endif
163
164
Benny Prijono9033e312005-11-21 02:08:39 +0000165#if PJ_HAS_TCP
166/*
167 * Process the socket when the overlapped accept() completed.
168 */
169static void ioqueue_on_accept_complete(ioqueue_accept_rec *accept_overlapped)
170{
171 struct sockaddr *local;
172 struct sockaddr *remote;
173 int locallen, remotelen;
174
175 PJ_CHECK_STACK();
176
177 /* Operation complete immediately. */
Benny Prijono25c8f932008-08-26 14:41:26 +0000178 if (accept_overlapped->addrlen) {
179 GetAcceptExSockaddrs( accept_overlapped->accept_buf,
180 0,
181 ACCEPT_ADDR_LEN,
182 ACCEPT_ADDR_LEN,
183 &local,
184 &locallen,
185 &remote,
186 &remotelen);
187 if (*accept_overlapped->addrlen >= locallen) {
188 if (accept_overlapped->local)
189 pj_memcpy(accept_overlapped->local, local, locallen);
190 if (accept_overlapped->remote)
191 pj_memcpy(accept_overlapped->remote, remote, locallen);
192 } else {
193 if (accept_overlapped->local)
194 pj_bzero(accept_overlapped->local,
195 *accept_overlapped->addrlen);
196 if (accept_overlapped->remote)
197 pj_bzero(accept_overlapped->remote,
198 *accept_overlapped->addrlen);
199 }
200
201 *accept_overlapped->addrlen = locallen;
Benny Prijono9033e312005-11-21 02:08:39 +0000202 }
Benny Prijono9033e312005-11-21 02:08:39 +0000203 if (accept_overlapped->newsock_ptr)
204 *accept_overlapped->newsock_ptr = accept_overlapped->newsock;
205 accept_overlapped->operation = 0;
Benny Prijono9033e312005-11-21 02:08:39 +0000206}
207
208static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos)
209{
210 pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos];
211 HANDLE hEvent = ioqueue->connecting_handles[pos];
212
213 /* Remove key from array of connecting handles. */
214 pj_array_erase(ioqueue->connecting_keys, sizeof(key),
215 ioqueue->connecting_count, pos);
216 pj_array_erase(ioqueue->connecting_handles, sizeof(HANDLE),
217 ioqueue->connecting_count, pos);
218 --ioqueue->connecting_count;
219
220 /* Disassociate the socket from the event. */
221 WSAEventSelect((pj_sock_t)key->hnd, hEvent, 0);
222
223 /* Put event object to pool. */
224 if (ioqueue->event_count < MAXIMUM_WAIT_OBJECTS) {
225 ioqueue->event_pool[ioqueue->event_count++] = hEvent;
226 } else {
227 /* Shouldn't happen. There should be no more pending connections
228 * than max.
229 */
230 pj_assert(0);
231 CloseHandle(hEvent);
232 }
233
234}
235
236/*
237 * Poll for the completion of non-blocking connect().
238 * If there's a completion, the function return the key of the completed
239 * socket, and 'result' argument contains the connect() result. If connect()
240 * succeeded, 'result' will have value zero, otherwise will have the error
241 * code.
242 */
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000243static int check_connecting( pj_ioqueue_t *ioqueue )
Benny Prijono9033e312005-11-21 02:08:39 +0000244{
Benny Prijono9033e312005-11-21 02:08:39 +0000245 if (ioqueue->connecting_count) {
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000246 int i, count;
247 struct
248 {
249 pj_ioqueue_key_t *key;
250 pj_status_t status;
251 } events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1];
Benny Prijono9033e312005-11-21 02:08:39 +0000252
253 pj_lock_acquire(ioqueue->lock);
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000254 for (count=0; count<PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1; ++count) {
255 DWORD result;
Benny Prijono9033e312005-11-21 02:08:39 +0000256
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000257 result = WaitForMultipleObjects(ioqueue->connecting_count,
258 ioqueue->connecting_handles,
259 FALSE, 0);
260 if (result >= WAIT_OBJECT_0 &&
261 result < WAIT_OBJECT_0+ioqueue->connecting_count)
262 {
263 WSANETWORKEVENTS net_events;
Benny Prijono9033e312005-11-21 02:08:39 +0000264
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000265 /* Got completed connect(). */
266 unsigned pos = result - WAIT_OBJECT_0;
267 events[count].key = ioqueue->connecting_keys[pos];
Benny Prijono9033e312005-11-21 02:08:39 +0000268
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000269 /* See whether connect has succeeded. */
270 WSAEnumNetworkEvents((pj_sock_t)events[count].key->hnd,
271 ioqueue->connecting_handles[pos],
272 &net_events);
273 events[count].status =
274 PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]);
275
276 /* Erase socket from pending connect. */
277 erase_connecting_socket(ioqueue, pos);
278 } else {
279 /* No more events */
280 break;
281 }
Benny Prijono9033e312005-11-21 02:08:39 +0000282 }
283 pj_lock_release(ioqueue->lock);
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000284
285 /* Call callbacks. */
286 for (i=0; i<count; ++i) {
287 if (events[i].key->cb.on_connect_complete) {
288 events[i].key->cb.on_connect_complete(events[i].key,
289 events[i].status);
290 }
291 }
292
293 return count;
Benny Prijono9033e312005-11-21 02:08:39 +0000294 }
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000295
296 return 0;
297
Benny Prijono9033e312005-11-21 02:08:39 +0000298}
299#endif
300
301/*
302 * pj_ioqueue_name()
303 */
304PJ_DEF(const char*) pj_ioqueue_name(void)
305{
306 return "iocp";
307}
308
309/*
310 * pj_ioqueue_create()
311 */
312PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
313 pj_size_t max_fd,
314 pj_ioqueue_t **p_ioqueue)
315{
316 pj_ioqueue_t *ioqueue;
Benny Prijono5accbd02006-03-30 16:32:18 +0000317 unsigned i;
Benny Prijono9033e312005-11-21 02:08:39 +0000318 pj_status_t rc;
319
320 PJ_UNUSED_ARG(max_fd);
321 PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL);
322
323 rc = sizeof(union operation_key);
324
325 /* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */
326 PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
327 sizeof(union operation_key), PJ_EBUG);
328
Benny Prijono5accbd02006-03-30 16:32:18 +0000329 /* Create IOCP */
Benny Prijono9033e312005-11-21 02:08:39 +0000330 ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue));
331 ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
332 if (ioqueue->iocp == NULL)
333 return PJ_RETURN_OS_ERROR(GetLastError());
334
Benny Prijono5accbd02006-03-30 16:32:18 +0000335 /* Create IOCP mutex */
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000336 rc = pj_lock_create_recursive_mutex(pool, NULL, &ioqueue->lock);
Benny Prijono9033e312005-11-21 02:08:39 +0000337 if (rc != PJ_SUCCESS) {
338 CloseHandle(ioqueue->iocp);
339 return rc;
340 }
341
342 ioqueue->auto_delete_lock = PJ_TRUE;
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000343 ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY;
Benny Prijono9033e312005-11-21 02:08:39 +0000344
Benny Prijono5accbd02006-03-30 16:32:18 +0000345#if PJ_IOQUEUE_HAS_SAFE_UNREG
346 /*
347 * Create and initialize key pools.
348 */
349 pj_list_init(&ioqueue->active_list);
350 pj_list_init(&ioqueue->free_list);
351 pj_list_init(&ioqueue->closing_list);
352
353 /* Preallocate keys according to max_fd setting, and put them
354 * in free_list.
355 */
356 for (i=0; i<max_fd; ++i) {
357 pj_ioqueue_key_t *key;
358
359 key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t));
360
361 rc = pj_atomic_create(pool, 0, &key->ref_count);
362 if (rc != PJ_SUCCESS) {
363 key = ioqueue->free_list.next;
364 while (key != &ioqueue->free_list) {
365 pj_atomic_destroy(key->ref_count);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000366 pj_mutex_destroy(key->mutex);
367 key = key->next;
368 }
369 CloseHandle(ioqueue->iocp);
370 return rc;
371 }
372
373 rc = pj_mutex_create_recursive(pool, "ioqkey", &key->mutex);
374 if (rc != PJ_SUCCESS) {
375 pj_atomic_destroy(key->ref_count);
376 key = ioqueue->free_list.next;
377 while (key != &ioqueue->free_list) {
378 pj_atomic_destroy(key->ref_count);
379 pj_mutex_destroy(key->mutex);
Benny Prijono5accbd02006-03-30 16:32:18 +0000380 key = key->next;
381 }
382 CloseHandle(ioqueue->iocp);
383 return rc;
384 }
385
386 pj_list_push_back(&ioqueue->free_list, key);
Benny Prijono5accbd02006-03-30 16:32:18 +0000387 }
388#endif
389
Benny Prijono9033e312005-11-21 02:08:39 +0000390 *p_ioqueue = ioqueue;
391
392 PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue));
393 return PJ_SUCCESS;
394}
395
396/*
397 * pj_ioqueue_destroy()
398 */
399PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue )
400{
Benny Prijono3569c0d2007-04-06 10:29:20 +0000401#if PJ_HAS_TCP
Benny Prijono9033e312005-11-21 02:08:39 +0000402 unsigned i;
Benny Prijono3569c0d2007-04-06 10:29:20 +0000403#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000404 pj_ioqueue_key_t *key;
Benny Prijono9033e312005-11-21 02:08:39 +0000405
406 PJ_CHECK_STACK();
407 PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
408
Benny Prijono5accbd02006-03-30 16:32:18 +0000409 pj_lock_acquire(ioqueue->lock);
410
411#if PJ_HAS_TCP
Benny Prijono9033e312005-11-21 02:08:39 +0000412 /* Destroy events in the pool */
413 for (i=0; i<ioqueue->event_count; ++i) {
414 CloseHandle(ioqueue->event_pool[i]);
415 }
416 ioqueue->event_count = 0;
Benny Prijono5accbd02006-03-30 16:32:18 +0000417#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000418
419 if (CloseHandle(ioqueue->iocp) != TRUE)
420 return PJ_RETURN_OS_ERROR(GetLastError());
421
Benny Prijono5accbd02006-03-30 16:32:18 +0000422#if PJ_IOQUEUE_HAS_SAFE_UNREG
423 /* Destroy reference counters */
424 key = ioqueue->active_list.next;
425 while (key != &ioqueue->active_list) {
426 pj_atomic_destroy(key->ref_count);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000427 pj_mutex_destroy(key->mutex);
Benny Prijono5accbd02006-03-30 16:32:18 +0000428 key = key->next;
429 }
430
431 key = ioqueue->closing_list.next;
432 while (key != &ioqueue->closing_list) {
433 pj_atomic_destroy(key->ref_count);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000434 pj_mutex_destroy(key->mutex);
Benny Prijono5accbd02006-03-30 16:32:18 +0000435 key = key->next;
436 }
437
438 key = ioqueue->free_list.next;
439 while (key != &ioqueue->free_list) {
440 pj_atomic_destroy(key->ref_count);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000441 pj_mutex_destroy(key->mutex);
Benny Prijono5accbd02006-03-30 16:32:18 +0000442 key = key->next;
443 }
444#endif
445
Benny Prijono9033e312005-11-21 02:08:39 +0000446 if (ioqueue->auto_delete_lock)
447 pj_lock_destroy(ioqueue->lock);
448
449 return PJ_SUCCESS;
450}
451
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000452
453PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency(pj_ioqueue_t *ioqueue,
454 pj_bool_t allow)
455{
456 PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
457 ioqueue->default_concurrency = allow;
458 return PJ_SUCCESS;
459}
460
Benny Prijono9033e312005-11-21 02:08:39 +0000461/*
462 * pj_ioqueue_set_lock()
463 */
464PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
465 pj_lock_t *lock,
466 pj_bool_t auto_delete )
467{
468 PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
469
470 if (ioqueue->auto_delete_lock) {
471 pj_lock_destroy(ioqueue->lock);
472 }
473
474 ioqueue->lock = lock;
475 ioqueue->auto_delete_lock = auto_delete;
476
477 return PJ_SUCCESS;
478}
479
480/*
481 * pj_ioqueue_register_sock()
482 */
483PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
484 pj_ioqueue_t *ioqueue,
485 pj_sock_t sock,
486 void *user_data,
487 const pj_ioqueue_callback *cb,
488 pj_ioqueue_key_t **key )
489{
490 HANDLE hioq;
491 pj_ioqueue_key_t *rec;
492 u_long value;
493 int rc;
494
495 PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL);
496
Benny Prijono5accbd02006-03-30 16:32:18 +0000497 pj_lock_acquire(ioqueue->lock);
498
499#if PJ_IOQUEUE_HAS_SAFE_UNREG
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000500 /* Scan closing list first to release unused keys.
501 * Must do this with lock acquired.
502 */
503 scan_closing_keys(ioqueue);
504
Benny Prijono5accbd02006-03-30 16:32:18 +0000505 /* If safe unregistration is used, then get the key record from
506 * the free list.
507 */
508 if (pj_list_empty(&ioqueue->free_list)) {
509 pj_lock_release(ioqueue->lock);
510 return PJ_ETOOMANY;
511 }
512
513 rec = ioqueue->free_list.next;
514 pj_list_erase(rec);
515
516 /* Set initial reference count to 1 */
517 pj_assert(pj_atomic_get(rec->ref_count) == 0);
518 pj_atomic_inc(rec->ref_count);
519
520 rec->closing = 0;
521
522#else
Benny Prijono9033e312005-11-21 02:08:39 +0000523 rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
Benny Prijono5accbd02006-03-30 16:32:18 +0000524#endif
525
526 /* Build the key for this socket. */
Benny Prijono9033e312005-11-21 02:08:39 +0000527 rec->ioqueue = ioqueue;
528 rec->hnd = (HANDLE)sock;
529 rec->hnd_type = HND_IS_SOCKET;
530 rec->user_data = user_data;
531 pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback));
532
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000533 /* Set concurrency for this handle */
534 rc = pj_ioqueue_set_concurrency(rec, ioqueue->default_concurrency);
535 if (rc != PJ_SUCCESS) {
536 pj_lock_release(ioqueue->lock);
537 return rc;
538 }
539
Benny Prijono5accbd02006-03-30 16:32:18 +0000540#if PJ_HAS_TCP
541 rec->connecting = 0;
542#endif
543
Benny Prijono9033e312005-11-21 02:08:39 +0000544 /* Set socket to nonblocking. */
545 value = 1;
546 rc = ioctlsocket(sock, FIONBIO, &value);
547 if (rc != 0) {
Benny Prijono5accbd02006-03-30 16:32:18 +0000548 pj_lock_release(ioqueue->lock);
Benny Prijono9033e312005-11-21 02:08:39 +0000549 return PJ_RETURN_OS_ERROR(WSAGetLastError());
550 }
551
552 /* Associate with IOCP */
553 hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0);
554 if (!hioq) {
Benny Prijono5accbd02006-03-30 16:32:18 +0000555 pj_lock_release(ioqueue->lock);
Benny Prijono9033e312005-11-21 02:08:39 +0000556 return PJ_RETURN_OS_ERROR(GetLastError());
557 }
558
559 *key = rec;
Benny Prijono5accbd02006-03-30 16:32:18 +0000560
561#if PJ_IOQUEUE_HAS_SAFE_UNREG
562 pj_list_push_back(&ioqueue->active_list, rec);
563#endif
564
565 pj_lock_release(ioqueue->lock);
566
Benny Prijono9033e312005-11-21 02:08:39 +0000567 return PJ_SUCCESS;
568}
569
Benny Prijono9033e312005-11-21 02:08:39 +0000570
571/*
572 * pj_ioqueue_get_user_data()
573 */
574PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
575{
576 PJ_ASSERT_RETURN(key, NULL);
577 return key->user_data;
578}
579
580/*
581 * pj_ioqueue_set_user_data()
582 */
583PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
584 void *user_data,
585 void **old_data )
586{
587 PJ_ASSERT_RETURN(key, PJ_EINVAL);
588
589 if (old_data)
590 *old_data = key->user_data;
591
592 key->user_data = user_data;
593 return PJ_SUCCESS;
594}
595
Benny Prijono8d317a02006-03-22 11:49:19 +0000596
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Drop one reference from the key. When the last reference is gone, the
 * key is stamped with the time at which it may be reused
 * (now + PJ_IOQUEUE_KEY_FREE_DELAY msec) and moved to the ioqueue's
 * closing list, from where scan_closing_keys() recycles it later.
 */
static void decrement_counter(pj_ioqueue_key_t *key)
{
    pj_ioqueue_t *ioq;

    if (pj_atomic_dec_and_get(key->ref_count) != 0) {
        /* Somebody else still references the key. */
        return;
    }

    ioq = key->ioqueue;
    pj_lock_acquire(ioq->lock);

    /* Only a key that is being unregistered may lose its last reference. */
    pj_assert(key->closing == 1);

    pj_gettimeofday(&key->free_time);
    key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
    pj_time_val_normalize(&key->free_time);

    /* Move the key from its current list to the closing list. */
    pj_list_erase(key);
    pj_list_push_back(&ioq->closing_list, key);

    pj_lock_release(ioq->lock);
}
#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000619
Benny Prijono9033e312005-11-21 02:08:39 +0000620/*
Benny Prijono5accbd02006-03-30 16:32:18 +0000621 * Poll the I/O Completion Port, execute callback,
Benny Prijono8d317a02006-03-22 11:49:19 +0000622 * and return the key and bytes transfered of the last operation.
Benny Prijono9033e312005-11-21 02:08:39 +0000623 */
Benny Prijono8d317a02006-03-22 11:49:19 +0000624static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
625 pj_ssize_t *p_bytes, pj_ioqueue_key_t **p_key )
Benny Prijono9033e312005-11-21 02:08:39 +0000626{
Benny Prijono8d317a02006-03-22 11:49:19 +0000627 DWORD dwBytesTransfered, dwKey;
Benny Prijono9033e312005-11-21 02:08:39 +0000628 generic_overlapped *pOv;
629 pj_ioqueue_key_t *key;
Benny Prijono4f2be312005-11-21 17:01:06 +0000630 pj_ssize_t size_status = -1;
Benny Prijono8d317a02006-03-22 11:49:19 +0000631 BOOL rcGetQueued;
Benny Prijono9033e312005-11-21 02:08:39 +0000632
633 /* Poll for completion status. */
Benny Prijono8d317a02006-03-22 11:49:19 +0000634 rcGetQueued = GetQueuedCompletionStatus(hIocp, &dwBytesTransfered,
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000635 &dwKey, (OVERLAPPED**)&pOv,
Benny Prijono8d317a02006-03-22 11:49:19 +0000636 dwTimeout);
Benny Prijono9033e312005-11-21 02:08:39 +0000637
638 /* The return value is:
639 * - nonzero if event was dequeued.
640 * - zero and pOv==NULL if no event was dequeued.
641 * - zero and pOv!=NULL if event for failed I/O was dequeued.
642 */
643 if (pOv) {
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000644 pj_bool_t has_lock;
645
Benny Prijono9033e312005-11-21 02:08:39 +0000646 /* Event was dequeued for either successfull or failed I/O */
647 key = (pj_ioqueue_key_t*)dwKey;
648 size_status = dwBytesTransfered;
Benny Prijono8d317a02006-03-22 11:49:19 +0000649
650 /* Report to caller regardless */
651 if (p_bytes)
652 *p_bytes = size_status;
653 if (p_key)
654 *p_key = key;
655
Benny Prijono5accbd02006-03-30 16:32:18 +0000656#if PJ_IOQUEUE_HAS_SAFE_UNREG
657 /* We shouldn't call callbacks if key is quitting. */
658 if (key->closing)
Benny Prijono8d317a02006-03-22 11:49:19 +0000659 return PJ_TRUE;
Benny Prijono8d317a02006-03-22 11:49:19 +0000660
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000661 /* If concurrency is disabled, lock the key
662 * (and save the lock status to local var since app may change
663 * concurrency setting while in the callback) */
664 if (key->allow_concurrent == PJ_FALSE) {
665 pj_mutex_lock(key->mutex);
666 has_lock = PJ_TRUE;
667 } else {
668 has_lock = PJ_FALSE;
669 }
670
671 /* Now that we get the lock, check again that key is not closing */
672 if (key->closing) {
673 if (has_lock) {
674 pj_mutex_unlock(key->mutex);
675 }
676 return PJ_TRUE;
677 }
678
Benny Prijono5accbd02006-03-30 16:32:18 +0000679 /* Increment reference counter to prevent this key from being
680 * deleted
Benny Prijono8d317a02006-03-22 11:49:19 +0000681 */
Benny Prijono5accbd02006-03-30 16:32:18 +0000682 pj_atomic_inc(key->ref_count);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000683#else
684 PJ_UNUSED_ARG(has_lock);
Benny Prijono5accbd02006-03-30 16:32:18 +0000685#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000686
687 /* Carry out the callback */
Benny Prijono9033e312005-11-21 02:08:39 +0000688 switch (pOv->operation) {
689 case PJ_IOQUEUE_OP_READ:
690 case PJ_IOQUEUE_OP_RECV:
691 case PJ_IOQUEUE_OP_RECV_FROM:
692 pOv->operation = 0;
693 if (key->cb.on_read_complete)
694 key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv,
695 size_status);
696 break;
697 case PJ_IOQUEUE_OP_WRITE:
698 case PJ_IOQUEUE_OP_SEND:
699 case PJ_IOQUEUE_OP_SEND_TO:
700 pOv->operation = 0;
701 if (key->cb.on_write_complete)
702 key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv,
703 size_status);
704 break;
705#if PJ_HAS_TCP
706 case PJ_IOQUEUE_OP_ACCEPT:
707 /* special case for accept. */
708 ioqueue_on_accept_complete((ioqueue_accept_rec*)pOv);
709 if (key->cb.on_accept_complete) {
710 ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv;
Benny Prijono9c025eb2006-07-10 21:35:27 +0000711 pj_status_t status = PJ_SUCCESS;
Benny Prijono25c8f932008-08-26 14:41:26 +0000712 pj_sock_t newsock;
Benny Prijono9c025eb2006-07-10 21:35:27 +0000713
Benny Prijono25c8f932008-08-26 14:41:26 +0000714 newsock = accept_rec->newsock;
715 accept_rec->newsock = PJ_INVALID_SOCKET;
716
717 if (newsock == PJ_INVALID_SOCKET) {
Benny Prijono9c025eb2006-07-10 21:35:27 +0000718 int dwError = WSAGetLastError();
719 if (dwError == 0) dwError = OSERR_ENOTCONN;
720 status = PJ_RETURN_OS_ERROR(dwError);
721 }
722
Benny Prijono25c8f932008-08-26 14:41:26 +0000723 key->cb.on_accept_complete(key, (pj_ioqueue_op_key_t*)pOv,
724 newsock, status);
725
Benny Prijono9033e312005-11-21 02:08:39 +0000726 }
727 break;
728 case PJ_IOQUEUE_OP_CONNECT:
729#endif
730 case PJ_IOQUEUE_OP_NONE:
731 pj_assert(0);
732 break;
733 }
Benny Prijono5accbd02006-03-30 16:32:18 +0000734
735#if PJ_IOQUEUE_HAS_SAFE_UNREG
736 decrement_counter(key);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000737 if (has_lock)
738 pj_mutex_unlock(key->mutex);
Benny Prijono5accbd02006-03-30 16:32:18 +0000739#endif
740
Benny Prijono8d317a02006-03-22 11:49:19 +0000741 return PJ_TRUE;
Benny Prijono9033e312005-11-21 02:08:39 +0000742 }
743
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000744 /* No event was queued. */
Benny Prijono8d317a02006-03-22 11:49:19 +0000745 return PJ_FALSE;
746}
747
748/*
749 * pj_ioqueue_unregister()
750 */
751PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
752{
Benny Prijono9fe1ad22007-02-17 00:30:24 +0000753 unsigned i;
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000754 pj_bool_t has_lock;
Benny Prijono9fe1ad22007-02-17 00:30:24 +0000755 enum { RETRY = 10 };
756
Benny Prijono8d317a02006-03-22 11:49:19 +0000757 PJ_ASSERT_RETURN(key, PJ_EINVAL);
758
759#if PJ_HAS_TCP
760 if (key->connecting) {
761 unsigned pos;
762 pj_ioqueue_t *ioqueue;
763
764 ioqueue = key->ioqueue;
765
766 /* Erase from connecting_handles */
767 pj_lock_acquire(ioqueue->lock);
768 for (pos=0; pos < ioqueue->connecting_count; ++pos) {
769 if (ioqueue->connecting_keys[pos] == key) {
770 erase_connecting_socket(ioqueue, pos);
771 break;
772 }
773 }
774 key->connecting = 0;
775 pj_lock_release(ioqueue->lock);
776 }
777#endif
Benny Prijono08beac62006-11-23 07:31:27 +0000778
779#if PJ_IOQUEUE_HAS_SAFE_UNREG
780 /* Mark key as closing before closing handle. */
781 key->closing = 1;
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000782
783 /* If concurrency is disabled, wait until the key has finished
784 * processing the callback
785 */
786 if (key->allow_concurrent == PJ_FALSE) {
787 pj_mutex_lock(key->mutex);
788 has_lock = PJ_TRUE;
789 } else {
790 has_lock = PJ_FALSE;
791 }
792#else
793 PJ_UNUSED_ARG(has_lock);
Benny Prijono08beac62006-11-23 07:31:27 +0000794#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000795
796 /* Close handle (the only way to disassociate handle from IOCP).
797 * We also need to close handle to make sure that no further events
798 * will come to the handle.
Benny Prijono8d317a02006-03-22 11:49:19 +0000799 */
Benny Prijono6d9ee8d2008-07-18 10:33:09 +0000800 /* Update 2008/07/18 (http://trac.pjsip.org/repos/ticket/575):
801 * - It seems that CloseHandle() in itself does not actually close
802 * the socket (i.e. it will still appear in "netstat" output). Also
803 * if we only use CloseHandle(), an "Invalid Handle" exception will
804 * be raised in WSACleanup().
805 * - MSDN documentation says that CloseHandle() must be called after
806 * closesocket() call (see
807 * http://msdn.microsoft.com/en-us/library/ms724211(VS.85).aspx).
808 * But turns out that this will raise "Invalid Handle" exception
809 * in debug mode.
810 * So because of this, we replaced CloseHandle() with closesocket()
811 * instead. These was tested on WinXP SP2.
812 */
813 //CloseHandle(key->hnd);
814 pj_sock_close((pj_sock_t)key->hnd);
Benny Prijono8d317a02006-03-22 11:49:19 +0000815
Benny Prijono5accbd02006-03-30 16:32:18 +0000816 /* Reset callbacks */
Benny Prijono8d317a02006-03-22 11:49:19 +0000817 key->cb.on_accept_complete = NULL;
818 key->cb.on_connect_complete = NULL;
Benny Prijono5accbd02006-03-30 16:32:18 +0000819 key->cb.on_read_complete = NULL;
820 key->cb.on_write_complete = NULL;
Benny Prijono8d317a02006-03-22 11:49:19 +0000821
Benny Prijono5accbd02006-03-30 16:32:18 +0000822#if PJ_IOQUEUE_HAS_SAFE_UNREG
Benny Prijono5accbd02006-03-30 16:32:18 +0000823 /* Even after handle is closed, I suspect that IOCP may still try to
824 * do something with the handle, causing memory corruption when pool
825 * debugging is enabled.
826 *
827 * Forcing context switch seems to have fixed that, but this is quite
828 * an ugly solution..
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000829 *
830 * Update 2008/02/13:
831 * This should not happen if concurrency is disallowed for the key.
832 * So at least application has a solution for this (i.e. by disallowing
833 * concurrency in the key).
Benny Prijono8d317a02006-03-22 11:49:19 +0000834 */
Benny Prijono9fe1ad22007-02-17 00:30:24 +0000835 //This will loop forever if unregistration is done on the callback.
836 //Doing this with RETRY I think should solve the IOCP setting the
837 //socket signalled, without causing the deadlock.
838 //while (pj_atomic_get(key->ref_count) != 1)
839 // pj_thread_sleep(0);
840 for (i=0; pj_atomic_get(key->ref_count) != 1 && i<RETRY; ++i)
Benny Prijono08beac62006-11-23 07:31:27 +0000841 pj_thread_sleep(0);
842
843 /* Decrement reference counter to destroy the key. */
844 decrement_counter(key);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000845
846 if (has_lock)
847 pj_mutex_unlock(key->mutex);
Benny Prijono5accbd02006-03-30 16:32:18 +0000848#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000849
850 return PJ_SUCCESS;
851}
852
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Walk the closing list and move every key whose free_time has been
 * reached back to the free list.
 * Must be called with the ioqueue lock held.
 */
static void scan_closing_keys(pj_ioqueue_t *ioqueue)
{
    pj_time_val now;
    pj_ioqueue_key_t *key, *next;

    if (pj_list_empty(&ioqueue->closing_list))
        return;

    pj_gettimeofday(&now);

    for (key = ioqueue->closing_list.next;
         key != &ioqueue->closing_list;
         key = next)
    {
        /* Remember the successor first: the key may leave this list. */
        next = key->next;

        pj_assert(key->closing != 0);

        if (PJ_TIME_VAL_GTE(now, key->free_time)) {
            pj_list_erase(key);
            pj_list_push_back(&ioqueue->free_list, key);
        }
    }
}
#endif
883
Benny Prijono8d317a02006-03-22 11:49:19 +0000884/*
885 * pj_ioqueue_poll()
886 *
887 * Poll for events.
888 */
889PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
890{
891 DWORD dwMsec;
Benny Prijono3569c0d2007-04-06 10:29:20 +0000892#if PJ_HAS_TCP
Benny Prijono8d317a02006-03-22 11:49:19 +0000893 int connect_count = 0;
Benny Prijono3569c0d2007-04-06 10:29:20 +0000894#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000895 int event_count = 0;
Benny Prijono8d317a02006-03-22 11:49:19 +0000896
897 PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
898
Benny Prijono8d317a02006-03-22 11:49:19 +0000899 /* Calculate miliseconds timeout for GetQueuedCompletionStatus */
900 dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE;
901
902 /* Poll for completion status. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000903 event_count = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL);
904
905#if PJ_HAS_TCP
906 /* Check the connecting array, only when there's no activity. */
907 if (event_count == 0) {
908 connect_count = check_connecting(ioqueue);
909 if (connect_count > 0)
910 event_count += connect_count;
911 }
912#endif
913
914#if PJ_IOQUEUE_HAS_SAFE_UNREG
915 /* Check the closing keys only when there's no activity and when there are
916 * pending closing keys.
917 */
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000918 if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) {
Benny Prijono5accbd02006-03-30 16:32:18 +0000919 pj_lock_acquire(ioqueue->lock);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000920 scan_closing_keys(ioqueue);
Benny Prijono5accbd02006-03-30 16:32:18 +0000921 pj_lock_release(ioqueue->lock);
922 }
923#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000924
925 /* Return number of events. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000926 return event_count;
Benny Prijono9033e312005-11-21 02:08:39 +0000927}
928
929/*
930 * pj_ioqueue_recv()
931 *
932 * Initiate overlapped WSARecv() operation.
933 */
934PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
935 pj_ioqueue_op_key_t *op_key,
936 void *buffer,
937 pj_ssize_t *length,
938 pj_uint32_t flags )
939{
940 /*
941 * Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and
942 * addrlen here. But unfortunately it generates EINVAL... :-(
943 * -bennylp
944 */
945 int rc;
946 DWORD bytesRead;
947 DWORD dwFlags = 0;
948 union operation_key *op_key_rec;
949
950 PJ_CHECK_STACK();
951 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
952
Benny Prijono5accbd02006-03-30 16:32:18 +0000953#if PJ_IOQUEUE_HAS_SAFE_UNREG
954 /* Check key is not closing */
955 if (key->closing)
956 return PJ_ECANCELLED;
957#endif
958
Benny Prijono9033e312005-11-21 02:08:39 +0000959 op_key_rec = (union operation_key*)op_key->internal__;
960 op_key_rec->overlapped.wsabuf.buf = buffer;
961 op_key_rec->overlapped.wsabuf.len = *length;
962
963 dwFlags = flags;
964
965 /* Try non-overlapped received first to see if data is
966 * immediately available.
967 */
968 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
969 rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
970 &bytesRead, &dwFlags, NULL, NULL);
971 if (rc == 0) {
972 *length = bytesRead;
973 return PJ_SUCCESS;
974 } else {
975 DWORD dwError = WSAGetLastError();
976 if (dwError != WSAEWOULDBLOCK) {
977 *length = -1;
978 return PJ_RETURN_OS_ERROR(dwError);
979 }
980 }
981 }
982
983 dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
984
985 /*
986 * No immediate data available.
987 * Register overlapped Recv() operation.
988 */
Benny Prijonoac623b32006-07-03 15:19:31 +0000989 pj_bzero( &op_key_rec->overlapped.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +0000990 sizeof(op_key_rec->overlapped.overlapped));
991 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
992
993 rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
994 &bytesRead, &dwFlags,
995 &op_key_rec->overlapped.overlapped, NULL);
996 if (rc == SOCKET_ERROR) {
997 DWORD dwStatus = WSAGetLastError();
998 if (dwStatus!=WSA_IO_PENDING) {
999 *length = -1;
1000 return PJ_STATUS_FROM_OS(dwStatus);
1001 }
1002 }
1003
1004 /* Pending operation has been scheduled. */
1005 return PJ_EPENDING;
1006}
1007
1008/*
1009 * pj_ioqueue_recvfrom()
1010 *
1011 * Initiate overlapped RecvFrom() operation.
1012 */
1013PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
1014 pj_ioqueue_op_key_t *op_key,
1015 void *buffer,
1016 pj_ssize_t *length,
1017 pj_uint32_t flags,
1018 pj_sockaddr_t *addr,
1019 int *addrlen)
1020{
1021 int rc;
1022 DWORD bytesRead;
1023 DWORD dwFlags = 0;
1024 union operation_key *op_key_rec;
1025
1026 PJ_CHECK_STACK();
1027 PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL);
1028
Benny Prijono5accbd02006-03-30 16:32:18 +00001029#if PJ_IOQUEUE_HAS_SAFE_UNREG
1030 /* Check key is not closing */
1031 if (key->closing)
1032 return PJ_ECANCELLED;
1033#endif
1034
Benny Prijono9033e312005-11-21 02:08:39 +00001035 op_key_rec = (union operation_key*)op_key->internal__;
1036 op_key_rec->overlapped.wsabuf.buf = buffer;
1037 op_key_rec->overlapped.wsabuf.len = *length;
1038
1039 dwFlags = flags;
1040
1041 /* Try non-overlapped received first to see if data is
1042 * immediately available.
1043 */
1044 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
1045 rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1046 &bytesRead, &dwFlags, addr, addrlen, NULL, NULL);
1047 if (rc == 0) {
1048 *length = bytesRead;
1049 return PJ_SUCCESS;
1050 } else {
1051 DWORD dwError = WSAGetLastError();
1052 if (dwError != WSAEWOULDBLOCK) {
1053 *length = -1;
1054 return PJ_RETURN_OS_ERROR(dwError);
1055 }
1056 }
1057 }
1058
1059 dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
1060
1061 /*
1062 * No immediate data available.
1063 * Register overlapped Recv() operation.
1064 */
Benny Prijonoac623b32006-07-03 15:19:31 +00001065 pj_bzero( &op_key_rec->overlapped.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +00001066 sizeof(op_key_rec->overlapped.overlapped));
1067 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
1068
1069 rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1070 &bytesRead, &dwFlags, addr, addrlen,
1071 &op_key_rec->overlapped.overlapped, NULL);
1072 if (rc == SOCKET_ERROR) {
1073 DWORD dwStatus = WSAGetLastError();
1074 if (dwStatus!=WSA_IO_PENDING) {
1075 *length = -1;
1076 return PJ_STATUS_FROM_OS(dwStatus);
1077 }
1078 }
1079
1080 /* Pending operation has been scheduled. */
1081 return PJ_EPENDING;
1082}
1083
1084/*
1085 * pj_ioqueue_send()
1086 *
1087 * Initiate overlapped Send operation.
1088 */
1089PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
1090 pj_ioqueue_op_key_t *op_key,
1091 const void *data,
1092 pj_ssize_t *length,
1093 pj_uint32_t flags )
1094{
1095 return pj_ioqueue_sendto(key, op_key, data, length, flags, NULL, 0);
1096}
1097
1098
1099/*
1100 * pj_ioqueue_sendto()
1101 *
1102 * Initiate overlapped SendTo operation.
1103 */
1104PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
1105 pj_ioqueue_op_key_t *op_key,
1106 const void *data,
1107 pj_ssize_t *length,
1108 pj_uint32_t flags,
1109 const pj_sockaddr_t *addr,
1110 int addrlen)
1111{
1112 int rc;
1113 DWORD bytesWritten;
1114 DWORD dwFlags;
1115 union operation_key *op_key_rec;
1116
1117 PJ_CHECK_STACK();
1118 PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL);
Benny Prijono5accbd02006-03-30 16:32:18 +00001119
1120#if PJ_IOQUEUE_HAS_SAFE_UNREG
1121 /* Check key is not closing */
1122 if (key->closing)
1123 return PJ_ECANCELLED;
1124#endif
1125
Benny Prijono9033e312005-11-21 02:08:39 +00001126 op_key_rec = (union operation_key*)op_key->internal__;
1127
1128 /*
1129 * First try blocking write.
1130 */
1131 op_key_rec->overlapped.wsabuf.buf = (void*)data;
1132 op_key_rec->overlapped.wsabuf.len = *length;
1133
1134 dwFlags = flags;
1135
1136 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
1137 rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1138 &bytesWritten, dwFlags, addr, addrlen,
1139 NULL, NULL);
1140 if (rc == 0) {
1141 *length = bytesWritten;
1142 return PJ_SUCCESS;
1143 } else {
1144 DWORD dwStatus = WSAGetLastError();
1145 if (dwStatus != WSAEWOULDBLOCK) {
1146 *length = -1;
1147 return PJ_RETURN_OS_ERROR(dwStatus);
1148 }
1149 }
1150 }
1151
1152 dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
1153
1154 /*
1155 * Data can't be sent immediately.
1156 * Schedule asynchronous WSASend().
1157 */
Benny Prijonoac623b32006-07-03 15:19:31 +00001158 pj_bzero( &op_key_rec->overlapped.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +00001159 sizeof(op_key_rec->overlapped.overlapped));
1160 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND;
1161
1162 rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1163 &bytesWritten, dwFlags, addr, addrlen,
1164 &op_key_rec->overlapped.overlapped, NULL);
1165 if (rc == SOCKET_ERROR) {
1166 DWORD dwStatus = WSAGetLastError();
1167 if (dwStatus!=WSA_IO_PENDING)
1168 return PJ_STATUS_FROM_OS(dwStatus);
1169 }
1170
1171 /* Asynchronous operation successfully submitted. */
1172 return PJ_EPENDING;
1173}
1174
1175#if PJ_HAS_TCP
1176
1177/*
1178 * pj_ioqueue_accept()
1179 *
1180 * Initiate overlapped accept() operation.
1181 */
1182PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
1183 pj_ioqueue_op_key_t *op_key,
1184 pj_sock_t *new_sock,
1185 pj_sockaddr_t *local,
1186 pj_sockaddr_t *remote,
1187 int *addrlen)
1188{
1189 BOOL rc;
1190 DWORD bytesReceived;
1191 pj_status_t status;
1192 union operation_key *op_key_rec;
1193 SOCKET sock;
1194
1195 PJ_CHECK_STACK();
1196 PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
1197
Benny Prijono5accbd02006-03-30 16:32:18 +00001198#if PJ_IOQUEUE_HAS_SAFE_UNREG
1199 /* Check key is not closing */
1200 if (key->closing)
1201 return PJ_ECANCELLED;
1202#endif
1203
Benny Prijono9033e312005-11-21 02:08:39 +00001204 /*
1205 * See if there is a new connection immediately available.
1206 */
1207 sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0);
1208 if (sock != INVALID_SOCKET) {
1209 /* Yes! New socket is available! */
1210 int status;
1211
1212 status = getsockname(sock, local, addrlen);
1213 if (status != 0) {
1214 DWORD dwError = WSAGetLastError();
1215 closesocket(sock);
1216 return PJ_RETURN_OS_ERROR(dwError);
1217 }
1218
1219 *new_sock = sock;
1220 return PJ_SUCCESS;
1221
1222 } else {
1223 DWORD dwError = WSAGetLastError();
1224 if (dwError != WSAEWOULDBLOCK) {
1225 return PJ_RETURN_OS_ERROR(dwError);
1226 }
1227 }
1228
1229 /*
1230 * No connection is immediately available.
1231 * Must schedule an asynchronous operation.
1232 */
1233 op_key_rec = (union operation_key*)op_key->internal__;
1234
Benny Prijono8ab968f2007-07-20 08:08:30 +00001235 status = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0,
Benny Prijono9033e312005-11-21 02:08:39 +00001236 &op_key_rec->accept.newsock);
1237 if (status != PJ_SUCCESS)
1238 return status;
1239
1240 /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket
1241 * addresses can be obtained with getsockname() and getpeername().
1242 */
1243 status = setsockopt(op_key_rec->accept.newsock, SOL_SOCKET,
1244 SO_UPDATE_ACCEPT_CONTEXT,
1245 (char*)&key->hnd, sizeof(SOCKET));
1246 /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later.
1247 * So ignore the error status.
1248 */
1249
1250 op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT;
1251 op_key_rec->accept.addrlen = addrlen;
1252 op_key_rec->accept.local = local;
1253 op_key_rec->accept.remote = remote;
1254 op_key_rec->accept.newsock_ptr = new_sock;
Benny Prijonoac623b32006-07-03 15:19:31 +00001255 pj_bzero( &op_key_rec->accept.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +00001256 sizeof(op_key_rec->accept.overlapped));
1257
1258 rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)op_key_rec->accept.newsock,
1259 op_key_rec->accept.accept_buf,
1260 0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN,
1261 &bytesReceived,
1262 &op_key_rec->accept.overlapped );
1263
1264 if (rc == TRUE) {
1265 ioqueue_on_accept_complete(&op_key_rec->accept);
1266 return PJ_SUCCESS;
1267 } else {
1268 DWORD dwStatus = WSAGetLastError();
1269 if (dwStatus!=WSA_IO_PENDING)
1270 return PJ_STATUS_FROM_OS(dwStatus);
1271 }
1272
1273 /* Asynchronous Accept() has been submitted. */
1274 return PJ_EPENDING;
1275}
1276
1277
1278/*
1279 * pj_ioqueue_connect()
1280 *
1281 * Initiate overlapped connect() operation (well, it's non-blocking actually,
1282 * since there's no overlapped version of connect()).
1283 */
1284PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
1285 const pj_sockaddr_t *addr,
1286 int addrlen )
1287{
1288 HANDLE hEvent;
1289 pj_ioqueue_t *ioqueue;
1290
1291 PJ_CHECK_STACK();
1292 PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
1293
Benny Prijono5accbd02006-03-30 16:32:18 +00001294#if PJ_IOQUEUE_HAS_SAFE_UNREG
1295 /* Check key is not closing */
1296 if (key->closing)
1297 return PJ_ECANCELLED;
1298#endif
1299
Benny Prijono9033e312005-11-21 02:08:39 +00001300 /* Initiate connect() */
1301 if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) {
1302 DWORD dwStatus;
1303 dwStatus = WSAGetLastError();
1304 if (dwStatus != WSAEWOULDBLOCK) {
1305 return PJ_RETURN_OS_ERROR(dwStatus);
1306 }
1307 } else {
1308 /* Connect has completed immediately! */
1309 return PJ_SUCCESS;
1310 }
1311
1312 ioqueue = key->ioqueue;
1313
1314 /* Add to the array of connecting socket to be polled */
1315 pj_lock_acquire(ioqueue->lock);
1316
1317 if (ioqueue->connecting_count >= MAXIMUM_WAIT_OBJECTS) {
1318 pj_lock_release(ioqueue->lock);
1319 return PJ_ETOOMANYCONN;
1320 }
1321
1322 /* Get or create event object. */
1323 if (ioqueue->event_count) {
1324 hEvent = ioqueue->event_pool[ioqueue->event_count - 1];
1325 --ioqueue->event_count;
1326 } else {
1327 hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
1328 if (hEvent == NULL) {
1329 DWORD dwStatus = GetLastError();
1330 pj_lock_release(ioqueue->lock);
1331 return PJ_STATUS_FROM_OS(dwStatus);
1332 }
1333 }
1334
1335 /* Mark key as connecting.
1336 * We can't use array index since key can be removed dynamically.
1337 */
1338 key->connecting = 1;
1339
1340 /* Associate socket events to the event object. */
1341 if (WSAEventSelect((pj_sock_t)key->hnd, hEvent, FD_CONNECT) != 0) {
1342 CloseHandle(hEvent);
1343 pj_lock_release(ioqueue->lock);
1344 return PJ_RETURN_OS_ERROR(WSAGetLastError());
1345 }
1346
1347 /* Add to array. */
1348 ioqueue->connecting_keys[ ioqueue->connecting_count ] = key;
1349 ioqueue->connecting_handles[ ioqueue->connecting_count ] = hEvent;
1350 ioqueue->connecting_count++;
1351
1352 pj_lock_release(ioqueue->lock);
1353
1354 return PJ_EPENDING;
1355}
1356#endif /* #if PJ_HAS_TCP */
1357
1358
1359PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
1360 pj_size_t size )
1361{
Benny Prijonoac623b32006-07-03 15:19:31 +00001362 pj_bzero(op_key, size);
Benny Prijono9033e312005-11-21 02:08:39 +00001363}
1364
1365PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
1366 pj_ioqueue_op_key_t *op_key )
1367{
1368 BOOL rc;
1369 DWORD bytesTransfered;
1370
1371 rc = GetOverlappedResult( key->hnd, (LPOVERLAPPED)op_key,
1372 &bytesTransfered, FALSE );
1373
1374 if (rc == FALSE) {
1375 return GetLastError()==ERROR_IO_INCOMPLETE;
1376 }
1377
1378 return FALSE;
1379}
1380
1381
1382PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
1383 pj_ioqueue_op_key_t *op_key,
1384 pj_ssize_t bytes_status )
1385{
1386 BOOL rc;
1387
1388 rc = PostQueuedCompletionStatus(key->ioqueue->iocp, bytes_status,
1389 (long)key, (OVERLAPPED*)op_key );
1390 if (rc == FALSE) {
1391 return PJ_RETURN_OS_ERROR(GetLastError());
1392 }
1393
1394 return PJ_SUCCESS;
1395}
1396
Benny Prijonoe3f79fd2008-02-13 15:17:28 +00001397PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
1398 pj_bool_t allow)
1399{
1400 PJ_ASSERT_RETURN(key, PJ_EINVAL);
1401
1402 /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
1403 * disabled.
1404 */
1405 PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);
1406
1407 key->allow_concurrent = allow;
1408 return PJ_SUCCESS;
1409}
1410
1411PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
1412{
1413#if PJ_IOQUEUE_HAS_SAFE_UNREG
1414 return pj_mutex_lock(key->mutex);
1415#else
1416 PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP);
1417#endif
1418}
1419
1420PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
1421{
1422#if PJ_IOQUEUE_HAS_SAFE_UNREG
1423 return pj_mutex_unlock(key->mutex);
1424#else
1425 PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP);
1426#endif
1427}
1428