blob: a5618e36885439079e05778e04657d04abd7adfd [file] [log] [blame]
Benny Prijono9033e312005-11-21 02:08:39 +00001/* $Id$ */
2/*
Benny Prijono32177c02008-06-20 22:44:47 +00003 * Copyright (C)2003-2008 Benny Prijono <benny@prijono.org>
Benny Prijono9033e312005-11-21 02:08:39 +00004 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 */
19#include <pj/ioqueue.h>
20#include <pj/os.h>
21#include <pj/lock.h>
22#include <pj/pool.h>
23#include <pj/string.h>
24#include <pj/sock.h>
25#include <pj/array.h>
26#include <pj/log.h>
27#include <pj/assert.h>
28#include <pj/errno.h>
Benny Prijono9c025eb2006-07-10 21:35:27 +000029#include <pj/compat/socket.h>
Benny Prijono9033e312005-11-21 02:08:39 +000030
31
32#if defined(PJ_HAS_WINSOCK2_H) && PJ_HAS_WINSOCK2_H != 0
33# include <winsock2.h>
34#elif defined(PJ_HAS_WINSOCK_H) && PJ_HAS_WINSOCK_H != 0
35# include <winsock.h>
36#endif
37
38#if defined(PJ_HAS_MSWSOCK_H) && PJ_HAS_MSWSOCK_H != 0
39# include <mswsock.h>
40#endif
41
42
43/* The address specified in AcceptEx() must be 16 more than the size of
44 * SOCKADDR (source: MSDN).
45 */
46#define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in)+16)
47
48typedef struct generic_overlapped
49{
50 WSAOVERLAPPED overlapped;
51 pj_ioqueue_operation_e operation;
52} generic_overlapped;
53
54/*
55 * OVERLAPPPED structure for send and receive.
56 */
57typedef struct ioqueue_overlapped
58{
59 WSAOVERLAPPED overlapped;
60 pj_ioqueue_operation_e operation;
61 WSABUF wsabuf;
62 pj_sockaddr_in dummy_addr;
63 int dummy_addrlen;
64} ioqueue_overlapped;
65
66#if PJ_HAS_TCP
67/*
68 * OVERLAP structure for accept.
69 */
70typedef struct ioqueue_accept_rec
71{
72 WSAOVERLAPPED overlapped;
73 pj_ioqueue_operation_e operation;
74 pj_sock_t newsock;
75 pj_sock_t *newsock_ptr;
76 int *addrlen;
77 void *remote;
78 void *local;
79 char accept_buf[2 * ACCEPT_ADDR_LEN];
80} ioqueue_accept_rec;
81#endif
82
83/*
84 * Structure to hold pending operation key.
85 */
86union operation_key
87{
88 generic_overlapped generic;
89 ioqueue_overlapped overlapped;
90#if PJ_HAS_TCP
91 ioqueue_accept_rec accept;
92#endif
93};
94
95/* Type of handle in the key. */
96enum handle_type
97{
98 HND_IS_UNKNOWN,
99 HND_IS_FILE,
100 HND_IS_SOCKET,
101};
102
Benny Prijono8d317a02006-03-22 11:49:19 +0000103enum { POST_QUIT_LEN = 0xFFFFDEADUL };
104
Benny Prijono9033e312005-11-21 02:08:39 +0000105/*
106 * Structure for individual socket.
107 */
108struct pj_ioqueue_key_t
109{
Benny Prijono5accbd02006-03-30 16:32:18 +0000110 PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);
111
Benny Prijono9033e312005-11-21 02:08:39 +0000112 pj_ioqueue_t *ioqueue;
113 HANDLE hnd;
114 void *user_data;
115 enum handle_type hnd_type;
Benny Prijono5accbd02006-03-30 16:32:18 +0000116 pj_ioqueue_callback cb;
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000117 pj_bool_t allow_concurrent;
Benny Prijono5accbd02006-03-30 16:32:18 +0000118
Benny Prijono9033e312005-11-21 02:08:39 +0000119#if PJ_HAS_TCP
120 int connecting;
121#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000122
123#if PJ_IOQUEUE_HAS_SAFE_UNREG
124 pj_atomic_t *ref_count;
125 pj_bool_t closing;
126 pj_time_val free_time;
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000127 pj_mutex_t *mutex;
Benny Prijono5accbd02006-03-30 16:32:18 +0000128#endif
129
Benny Prijono9033e312005-11-21 02:08:39 +0000130};
131
132/*
133 * IO Queue structure.
134 */
135struct pj_ioqueue_t
136{
137 HANDLE iocp;
138 pj_lock_t *lock;
139 pj_bool_t auto_delete_lock;
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000140 pj_bool_t default_concurrency;
Benny Prijono5accbd02006-03-30 16:32:18 +0000141
142#if PJ_IOQUEUE_HAS_SAFE_UNREG
143 pj_ioqueue_key_t active_list;
144 pj_ioqueue_key_t free_list;
145 pj_ioqueue_key_t closing_list;
146#endif
147
148 /* These are to keep track of connecting sockets */
149#if PJ_HAS_TCP
Benny Prijono9033e312005-11-21 02:08:39 +0000150 unsigned event_count;
151 HANDLE event_pool[MAXIMUM_WAIT_OBJECTS+1];
Benny Prijono9033e312005-11-21 02:08:39 +0000152 unsigned connecting_count;
153 HANDLE connecting_handles[MAXIMUM_WAIT_OBJECTS+1];
154 pj_ioqueue_key_t *connecting_keys[MAXIMUM_WAIT_OBJECTS+1];
155#endif
156};
157
158
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000159#if PJ_IOQUEUE_HAS_SAFE_UNREG
160/* Prototype */
161static void scan_closing_keys(pj_ioqueue_t *ioqueue);
162#endif
163
164
Benny Prijono9033e312005-11-21 02:08:39 +0000165#if PJ_HAS_TCP
166/*
167 * Process the socket when the overlapped accept() completed.
168 */
169static void ioqueue_on_accept_complete(ioqueue_accept_rec *accept_overlapped)
170{
171 struct sockaddr *local;
172 struct sockaddr *remote;
173 int locallen, remotelen;
174
175 PJ_CHECK_STACK();
176
177 /* Operation complete immediately. */
178 GetAcceptExSockaddrs( accept_overlapped->accept_buf,
179 0,
180 ACCEPT_ADDR_LEN,
181 ACCEPT_ADDR_LEN,
182 &local,
183 &locallen,
184 &remote,
185 &remotelen);
Benny Prijono9c025eb2006-07-10 21:35:27 +0000186 if (*accept_overlapped->addrlen >= locallen) {
Benny Prijono9033e312005-11-21 02:08:39 +0000187 pj_memcpy(accept_overlapped->local, local, locallen);
188 pj_memcpy(accept_overlapped->remote, remote, locallen);
189 } else {
Benny Prijonoac623b32006-07-03 15:19:31 +0000190 pj_bzero(accept_overlapped->local, *accept_overlapped->addrlen);
191 pj_bzero(accept_overlapped->remote, *accept_overlapped->addrlen);
Benny Prijono9033e312005-11-21 02:08:39 +0000192 }
193 *accept_overlapped->addrlen = locallen;
194 if (accept_overlapped->newsock_ptr)
195 *accept_overlapped->newsock_ptr = accept_overlapped->newsock;
196 accept_overlapped->operation = 0;
Benny Prijono9033e312005-11-21 02:08:39 +0000197}
198
199static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos)
200{
201 pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos];
202 HANDLE hEvent = ioqueue->connecting_handles[pos];
203
204 /* Remove key from array of connecting handles. */
205 pj_array_erase(ioqueue->connecting_keys, sizeof(key),
206 ioqueue->connecting_count, pos);
207 pj_array_erase(ioqueue->connecting_handles, sizeof(HANDLE),
208 ioqueue->connecting_count, pos);
209 --ioqueue->connecting_count;
210
211 /* Disassociate the socket from the event. */
212 WSAEventSelect((pj_sock_t)key->hnd, hEvent, 0);
213
214 /* Put event object to pool. */
215 if (ioqueue->event_count < MAXIMUM_WAIT_OBJECTS) {
216 ioqueue->event_pool[ioqueue->event_count++] = hEvent;
217 } else {
218 /* Shouldn't happen. There should be no more pending connections
219 * than max.
220 */
221 pj_assert(0);
222 CloseHandle(hEvent);
223 }
224
225}
226
227/*
228 * Poll for the completion of non-blocking connect().
229 * If there's a completion, the function return the key of the completed
230 * socket, and 'result' argument contains the connect() result. If connect()
231 * succeeded, 'result' will have value zero, otherwise will have the error
232 * code.
233 */
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000234static int check_connecting( pj_ioqueue_t *ioqueue )
Benny Prijono9033e312005-11-21 02:08:39 +0000235{
Benny Prijono9033e312005-11-21 02:08:39 +0000236 if (ioqueue->connecting_count) {
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000237 int i, count;
238 struct
239 {
240 pj_ioqueue_key_t *key;
241 pj_status_t status;
242 } events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1];
Benny Prijono9033e312005-11-21 02:08:39 +0000243
244 pj_lock_acquire(ioqueue->lock);
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000245 for (count=0; count<PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1; ++count) {
246 DWORD result;
Benny Prijono9033e312005-11-21 02:08:39 +0000247
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000248 result = WaitForMultipleObjects(ioqueue->connecting_count,
249 ioqueue->connecting_handles,
250 FALSE, 0);
251 if (result >= WAIT_OBJECT_0 &&
252 result < WAIT_OBJECT_0+ioqueue->connecting_count)
253 {
254 WSANETWORKEVENTS net_events;
Benny Prijono9033e312005-11-21 02:08:39 +0000255
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000256 /* Got completed connect(). */
257 unsigned pos = result - WAIT_OBJECT_0;
258 events[count].key = ioqueue->connecting_keys[pos];
Benny Prijono9033e312005-11-21 02:08:39 +0000259
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000260 /* See whether connect has succeeded. */
261 WSAEnumNetworkEvents((pj_sock_t)events[count].key->hnd,
262 ioqueue->connecting_handles[pos],
263 &net_events);
264 events[count].status =
265 PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]);
266
267 /* Erase socket from pending connect. */
268 erase_connecting_socket(ioqueue, pos);
269 } else {
270 /* No more events */
271 break;
272 }
Benny Prijono9033e312005-11-21 02:08:39 +0000273 }
274 pj_lock_release(ioqueue->lock);
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000275
276 /* Call callbacks. */
277 for (i=0; i<count; ++i) {
278 if (events[i].key->cb.on_connect_complete) {
279 events[i].key->cb.on_connect_complete(events[i].key,
280 events[i].status);
281 }
282 }
283
284 return count;
Benny Prijono9033e312005-11-21 02:08:39 +0000285 }
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000286
287 return 0;
288
Benny Prijono9033e312005-11-21 02:08:39 +0000289}
290#endif
291
292/*
293 * pj_ioqueue_name()
294 */
295PJ_DEF(const char*) pj_ioqueue_name(void)
296{
297 return "iocp";
298}
299
300/*
301 * pj_ioqueue_create()
302 */
303PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
304 pj_size_t max_fd,
305 pj_ioqueue_t **p_ioqueue)
306{
307 pj_ioqueue_t *ioqueue;
Benny Prijono5accbd02006-03-30 16:32:18 +0000308 unsigned i;
Benny Prijono9033e312005-11-21 02:08:39 +0000309 pj_status_t rc;
310
311 PJ_UNUSED_ARG(max_fd);
312 PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL);
313
314 rc = sizeof(union operation_key);
315
316 /* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */
317 PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
318 sizeof(union operation_key), PJ_EBUG);
319
Benny Prijono5accbd02006-03-30 16:32:18 +0000320 /* Create IOCP */
Benny Prijono9033e312005-11-21 02:08:39 +0000321 ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue));
322 ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
323 if (ioqueue->iocp == NULL)
324 return PJ_RETURN_OS_ERROR(GetLastError());
325
Benny Prijono5accbd02006-03-30 16:32:18 +0000326 /* Create IOCP mutex */
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000327 rc = pj_lock_create_recursive_mutex(pool, NULL, &ioqueue->lock);
Benny Prijono9033e312005-11-21 02:08:39 +0000328 if (rc != PJ_SUCCESS) {
329 CloseHandle(ioqueue->iocp);
330 return rc;
331 }
332
333 ioqueue->auto_delete_lock = PJ_TRUE;
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000334 ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY;
Benny Prijono9033e312005-11-21 02:08:39 +0000335
Benny Prijono5accbd02006-03-30 16:32:18 +0000336#if PJ_IOQUEUE_HAS_SAFE_UNREG
337 /*
338 * Create and initialize key pools.
339 */
340 pj_list_init(&ioqueue->active_list);
341 pj_list_init(&ioqueue->free_list);
342 pj_list_init(&ioqueue->closing_list);
343
344 /* Preallocate keys according to max_fd setting, and put them
345 * in free_list.
346 */
347 for (i=0; i<max_fd; ++i) {
348 pj_ioqueue_key_t *key;
349
350 key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t));
351
352 rc = pj_atomic_create(pool, 0, &key->ref_count);
353 if (rc != PJ_SUCCESS) {
354 key = ioqueue->free_list.next;
355 while (key != &ioqueue->free_list) {
356 pj_atomic_destroy(key->ref_count);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000357 pj_mutex_destroy(key->mutex);
358 key = key->next;
359 }
360 CloseHandle(ioqueue->iocp);
361 return rc;
362 }
363
364 rc = pj_mutex_create_recursive(pool, "ioqkey", &key->mutex);
365 if (rc != PJ_SUCCESS) {
366 pj_atomic_destroy(key->ref_count);
367 key = ioqueue->free_list.next;
368 while (key != &ioqueue->free_list) {
369 pj_atomic_destroy(key->ref_count);
370 pj_mutex_destroy(key->mutex);
Benny Prijono5accbd02006-03-30 16:32:18 +0000371 key = key->next;
372 }
373 CloseHandle(ioqueue->iocp);
374 return rc;
375 }
376
377 pj_list_push_back(&ioqueue->free_list, key);
Benny Prijono5accbd02006-03-30 16:32:18 +0000378 }
379#endif
380
Benny Prijono9033e312005-11-21 02:08:39 +0000381 *p_ioqueue = ioqueue;
382
383 PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue));
384 return PJ_SUCCESS;
385}
386
387/*
388 * pj_ioqueue_destroy()
389 */
390PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue )
391{
Benny Prijono3569c0d2007-04-06 10:29:20 +0000392#if PJ_HAS_TCP
Benny Prijono9033e312005-11-21 02:08:39 +0000393 unsigned i;
Benny Prijono3569c0d2007-04-06 10:29:20 +0000394#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000395 pj_ioqueue_key_t *key;
Benny Prijono9033e312005-11-21 02:08:39 +0000396
397 PJ_CHECK_STACK();
398 PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
399
Benny Prijono5accbd02006-03-30 16:32:18 +0000400 pj_lock_acquire(ioqueue->lock);
401
402#if PJ_HAS_TCP
Benny Prijono9033e312005-11-21 02:08:39 +0000403 /* Destroy events in the pool */
404 for (i=0; i<ioqueue->event_count; ++i) {
405 CloseHandle(ioqueue->event_pool[i]);
406 }
407 ioqueue->event_count = 0;
Benny Prijono5accbd02006-03-30 16:32:18 +0000408#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000409
410 if (CloseHandle(ioqueue->iocp) != TRUE)
411 return PJ_RETURN_OS_ERROR(GetLastError());
412
Benny Prijono5accbd02006-03-30 16:32:18 +0000413#if PJ_IOQUEUE_HAS_SAFE_UNREG
414 /* Destroy reference counters */
415 key = ioqueue->active_list.next;
416 while (key != &ioqueue->active_list) {
417 pj_atomic_destroy(key->ref_count);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000418 pj_mutex_destroy(key->mutex);
Benny Prijono5accbd02006-03-30 16:32:18 +0000419 key = key->next;
420 }
421
422 key = ioqueue->closing_list.next;
423 while (key != &ioqueue->closing_list) {
424 pj_atomic_destroy(key->ref_count);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000425 pj_mutex_destroy(key->mutex);
Benny Prijono5accbd02006-03-30 16:32:18 +0000426 key = key->next;
427 }
428
429 key = ioqueue->free_list.next;
430 while (key != &ioqueue->free_list) {
431 pj_atomic_destroy(key->ref_count);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000432 pj_mutex_destroy(key->mutex);
Benny Prijono5accbd02006-03-30 16:32:18 +0000433 key = key->next;
434 }
435#endif
436
Benny Prijono9033e312005-11-21 02:08:39 +0000437 if (ioqueue->auto_delete_lock)
438 pj_lock_destroy(ioqueue->lock);
439
440 return PJ_SUCCESS;
441}
442
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000443
444PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency(pj_ioqueue_t *ioqueue,
445 pj_bool_t allow)
446{
447 PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
448 ioqueue->default_concurrency = allow;
449 return PJ_SUCCESS;
450}
451
Benny Prijono9033e312005-11-21 02:08:39 +0000452/*
453 * pj_ioqueue_set_lock()
454 */
455PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
456 pj_lock_t *lock,
457 pj_bool_t auto_delete )
458{
459 PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
460
461 if (ioqueue->auto_delete_lock) {
462 pj_lock_destroy(ioqueue->lock);
463 }
464
465 ioqueue->lock = lock;
466 ioqueue->auto_delete_lock = auto_delete;
467
468 return PJ_SUCCESS;
469}
470
471/*
472 * pj_ioqueue_register_sock()
473 */
474PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
475 pj_ioqueue_t *ioqueue,
476 pj_sock_t sock,
477 void *user_data,
478 const pj_ioqueue_callback *cb,
479 pj_ioqueue_key_t **key )
480{
481 HANDLE hioq;
482 pj_ioqueue_key_t *rec;
483 u_long value;
484 int rc;
485
486 PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL);
487
Benny Prijono5accbd02006-03-30 16:32:18 +0000488 pj_lock_acquire(ioqueue->lock);
489
490#if PJ_IOQUEUE_HAS_SAFE_UNREG
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000491 /* Scan closing list first to release unused keys.
492 * Must do this with lock acquired.
493 */
494 scan_closing_keys(ioqueue);
495
Benny Prijono5accbd02006-03-30 16:32:18 +0000496 /* If safe unregistration is used, then get the key record from
497 * the free list.
498 */
499 if (pj_list_empty(&ioqueue->free_list)) {
500 pj_lock_release(ioqueue->lock);
501 return PJ_ETOOMANY;
502 }
503
504 rec = ioqueue->free_list.next;
505 pj_list_erase(rec);
506
507 /* Set initial reference count to 1 */
508 pj_assert(pj_atomic_get(rec->ref_count) == 0);
509 pj_atomic_inc(rec->ref_count);
510
511 rec->closing = 0;
512
513#else
Benny Prijono9033e312005-11-21 02:08:39 +0000514 rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
Benny Prijono5accbd02006-03-30 16:32:18 +0000515#endif
516
517 /* Build the key for this socket. */
Benny Prijono9033e312005-11-21 02:08:39 +0000518 rec->ioqueue = ioqueue;
519 rec->hnd = (HANDLE)sock;
520 rec->hnd_type = HND_IS_SOCKET;
521 rec->user_data = user_data;
522 pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback));
523
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000524 /* Set concurrency for this handle */
525 rc = pj_ioqueue_set_concurrency(rec, ioqueue->default_concurrency);
526 if (rc != PJ_SUCCESS) {
527 pj_lock_release(ioqueue->lock);
528 return rc;
529 }
530
Benny Prijono5accbd02006-03-30 16:32:18 +0000531#if PJ_HAS_TCP
532 rec->connecting = 0;
533#endif
534
Benny Prijono9033e312005-11-21 02:08:39 +0000535 /* Set socket to nonblocking. */
536 value = 1;
537 rc = ioctlsocket(sock, FIONBIO, &value);
538 if (rc != 0) {
Benny Prijono5accbd02006-03-30 16:32:18 +0000539 pj_lock_release(ioqueue->lock);
Benny Prijono9033e312005-11-21 02:08:39 +0000540 return PJ_RETURN_OS_ERROR(WSAGetLastError());
541 }
542
543 /* Associate with IOCP */
544 hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0);
545 if (!hioq) {
Benny Prijono5accbd02006-03-30 16:32:18 +0000546 pj_lock_release(ioqueue->lock);
Benny Prijono9033e312005-11-21 02:08:39 +0000547 return PJ_RETURN_OS_ERROR(GetLastError());
548 }
549
550 *key = rec;
Benny Prijono5accbd02006-03-30 16:32:18 +0000551
552#if PJ_IOQUEUE_HAS_SAFE_UNREG
553 pj_list_push_back(&ioqueue->active_list, rec);
554#endif
555
556 pj_lock_release(ioqueue->lock);
557
Benny Prijono9033e312005-11-21 02:08:39 +0000558 return PJ_SUCCESS;
559}
560
Benny Prijono9033e312005-11-21 02:08:39 +0000561
562/*
563 * pj_ioqueue_get_user_data()
564 */
565PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
566{
567 PJ_ASSERT_RETURN(key, NULL);
568 return key->user_data;
569}
570
571/*
572 * pj_ioqueue_set_user_data()
573 */
574PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
575 void *user_data,
576 void **old_data )
577{
578 PJ_ASSERT_RETURN(key, PJ_EINVAL);
579
580 if (old_data)
581 *old_data = key->user_data;
582
583 key->user_data = user_data;
584 return PJ_SUCCESS;
585}
586
Benny Prijono8d317a02006-03-22 11:49:19 +0000587
Benny Prijono5accbd02006-03-30 16:32:18 +0000588#if PJ_IOQUEUE_HAS_SAFE_UNREG
589/* Decrement the key's reference counter, and when the counter reach zero,
590 * destroy the key.
591 */
592static void decrement_counter(pj_ioqueue_key_t *key)
593{
594 if (pj_atomic_dec_and_get(key->ref_count) == 0) {
595
596 pj_lock_acquire(key->ioqueue->lock);
597
598 pj_assert(key->closing == 1);
599 pj_gettimeofday(&key->free_time);
600 key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
601 pj_time_val_normalize(&key->free_time);
602
603 pj_list_erase(key);
604 pj_list_push_back(&key->ioqueue->closing_list, key);
605
606 pj_lock_release(key->ioqueue->lock);
607 }
608}
609#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000610
Benny Prijono9033e312005-11-21 02:08:39 +0000611/*
Benny Prijono5accbd02006-03-30 16:32:18 +0000612 * Poll the I/O Completion Port, execute callback,
Benny Prijono8d317a02006-03-22 11:49:19 +0000613 * and return the key and bytes transfered of the last operation.
Benny Prijono9033e312005-11-21 02:08:39 +0000614 */
Benny Prijono8d317a02006-03-22 11:49:19 +0000615static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
616 pj_ssize_t *p_bytes, pj_ioqueue_key_t **p_key )
Benny Prijono9033e312005-11-21 02:08:39 +0000617{
Benny Prijono8d317a02006-03-22 11:49:19 +0000618 DWORD dwBytesTransfered, dwKey;
Benny Prijono9033e312005-11-21 02:08:39 +0000619 generic_overlapped *pOv;
620 pj_ioqueue_key_t *key;
Benny Prijono4f2be312005-11-21 17:01:06 +0000621 pj_ssize_t size_status = -1;
Benny Prijono8d317a02006-03-22 11:49:19 +0000622 BOOL rcGetQueued;
Benny Prijono9033e312005-11-21 02:08:39 +0000623
624 /* Poll for completion status. */
Benny Prijono8d317a02006-03-22 11:49:19 +0000625 rcGetQueued = GetQueuedCompletionStatus(hIocp, &dwBytesTransfered,
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000626 &dwKey, (OVERLAPPED**)&pOv,
Benny Prijono8d317a02006-03-22 11:49:19 +0000627 dwTimeout);
Benny Prijono9033e312005-11-21 02:08:39 +0000628
629 /* The return value is:
630 * - nonzero if event was dequeued.
631 * - zero and pOv==NULL if no event was dequeued.
632 * - zero and pOv!=NULL if event for failed I/O was dequeued.
633 */
634 if (pOv) {
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000635 pj_bool_t has_lock;
636
Benny Prijono9033e312005-11-21 02:08:39 +0000637 /* Event was dequeued for either successfull or failed I/O */
638 key = (pj_ioqueue_key_t*)dwKey;
639 size_status = dwBytesTransfered;
Benny Prijono8d317a02006-03-22 11:49:19 +0000640
641 /* Report to caller regardless */
642 if (p_bytes)
643 *p_bytes = size_status;
644 if (p_key)
645 *p_key = key;
646
Benny Prijono5accbd02006-03-30 16:32:18 +0000647#if PJ_IOQUEUE_HAS_SAFE_UNREG
648 /* We shouldn't call callbacks if key is quitting. */
649 if (key->closing)
Benny Prijono8d317a02006-03-22 11:49:19 +0000650 return PJ_TRUE;
Benny Prijono8d317a02006-03-22 11:49:19 +0000651
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000652 /* If concurrency is disabled, lock the key
653 * (and save the lock status to local var since app may change
654 * concurrency setting while in the callback) */
655 if (key->allow_concurrent == PJ_FALSE) {
656 pj_mutex_lock(key->mutex);
657 has_lock = PJ_TRUE;
658 } else {
659 has_lock = PJ_FALSE;
660 }
661
662 /* Now that we get the lock, check again that key is not closing */
663 if (key->closing) {
664 if (has_lock) {
665 pj_mutex_unlock(key->mutex);
666 }
667 return PJ_TRUE;
668 }
669
Benny Prijono5accbd02006-03-30 16:32:18 +0000670 /* Increment reference counter to prevent this key from being
671 * deleted
Benny Prijono8d317a02006-03-22 11:49:19 +0000672 */
Benny Prijono5accbd02006-03-30 16:32:18 +0000673 pj_atomic_inc(key->ref_count);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000674#else
675 PJ_UNUSED_ARG(has_lock);
Benny Prijono5accbd02006-03-30 16:32:18 +0000676#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000677
678 /* Carry out the callback */
Benny Prijono9033e312005-11-21 02:08:39 +0000679 switch (pOv->operation) {
680 case PJ_IOQUEUE_OP_READ:
681 case PJ_IOQUEUE_OP_RECV:
682 case PJ_IOQUEUE_OP_RECV_FROM:
683 pOv->operation = 0;
684 if (key->cb.on_read_complete)
685 key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv,
686 size_status);
687 break;
688 case PJ_IOQUEUE_OP_WRITE:
689 case PJ_IOQUEUE_OP_SEND:
690 case PJ_IOQUEUE_OP_SEND_TO:
691 pOv->operation = 0;
692 if (key->cb.on_write_complete)
693 key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv,
694 size_status);
695 break;
696#if PJ_HAS_TCP
697 case PJ_IOQUEUE_OP_ACCEPT:
698 /* special case for accept. */
699 ioqueue_on_accept_complete((ioqueue_accept_rec*)pOv);
700 if (key->cb.on_accept_complete) {
701 ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv;
Benny Prijono9c025eb2006-07-10 21:35:27 +0000702 pj_status_t status = PJ_SUCCESS;
703
704 if (accept_rec->newsock == PJ_INVALID_SOCKET) {
705 int dwError = WSAGetLastError();
706 if (dwError == 0) dwError = OSERR_ENOTCONN;
707 status = PJ_RETURN_OS_ERROR(dwError);
708 }
709
Benny Prijono9033e312005-11-21 02:08:39 +0000710 key->cb.on_accept_complete(key,
711 (pj_ioqueue_op_key_t*)pOv,
712 accept_rec->newsock,
Benny Prijono9c025eb2006-07-10 21:35:27 +0000713 status);
Benny Prijono01de33b2006-06-28 15:23:18 +0000714 accept_rec->newsock = PJ_INVALID_SOCKET;
Benny Prijono9033e312005-11-21 02:08:39 +0000715 }
716 break;
717 case PJ_IOQUEUE_OP_CONNECT:
718#endif
719 case PJ_IOQUEUE_OP_NONE:
720 pj_assert(0);
721 break;
722 }
Benny Prijono5accbd02006-03-30 16:32:18 +0000723
724#if PJ_IOQUEUE_HAS_SAFE_UNREG
725 decrement_counter(key);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000726 if (has_lock)
727 pj_mutex_unlock(key->mutex);
Benny Prijono5accbd02006-03-30 16:32:18 +0000728#endif
729
Benny Prijono8d317a02006-03-22 11:49:19 +0000730 return PJ_TRUE;
Benny Prijono9033e312005-11-21 02:08:39 +0000731 }
732
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000733 /* No event was queued. */
Benny Prijono8d317a02006-03-22 11:49:19 +0000734 return PJ_FALSE;
735}
736
737/*
738 * pj_ioqueue_unregister()
739 */
740PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
741{
Benny Prijono9fe1ad22007-02-17 00:30:24 +0000742 unsigned i;
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000743 pj_bool_t has_lock;
Benny Prijono9fe1ad22007-02-17 00:30:24 +0000744 enum { RETRY = 10 };
745
Benny Prijono8d317a02006-03-22 11:49:19 +0000746 PJ_ASSERT_RETURN(key, PJ_EINVAL);
747
748#if PJ_HAS_TCP
749 if (key->connecting) {
750 unsigned pos;
751 pj_ioqueue_t *ioqueue;
752
753 ioqueue = key->ioqueue;
754
755 /* Erase from connecting_handles */
756 pj_lock_acquire(ioqueue->lock);
757 for (pos=0; pos < ioqueue->connecting_count; ++pos) {
758 if (ioqueue->connecting_keys[pos] == key) {
759 erase_connecting_socket(ioqueue, pos);
760 break;
761 }
762 }
763 key->connecting = 0;
764 pj_lock_release(ioqueue->lock);
765 }
766#endif
Benny Prijono08beac62006-11-23 07:31:27 +0000767
768#if PJ_IOQUEUE_HAS_SAFE_UNREG
769 /* Mark key as closing before closing handle. */
770 key->closing = 1;
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000771
772 /* If concurrency is disabled, wait until the key has finished
773 * processing the callback
774 */
775 if (key->allow_concurrent == PJ_FALSE) {
776 pj_mutex_lock(key->mutex);
777 has_lock = PJ_TRUE;
778 } else {
779 has_lock = PJ_FALSE;
780 }
781#else
782 PJ_UNUSED_ARG(has_lock);
Benny Prijono08beac62006-11-23 07:31:27 +0000783#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000784
785 /* Close handle (the only way to disassociate handle from IOCP).
786 * We also need to close handle to make sure that no further events
787 * will come to the handle.
Benny Prijono8d317a02006-03-22 11:49:19 +0000788 */
Benny Prijono6d9ee8d2008-07-18 10:33:09 +0000789 /* Update 2008/07/18 (http://trac.pjsip.org/repos/ticket/575):
790 * - It seems that CloseHandle() in itself does not actually close
791 * the socket (i.e. it will still appear in "netstat" output). Also
792 * if we only use CloseHandle(), an "Invalid Handle" exception will
793 * be raised in WSACleanup().
794 * - MSDN documentation says that CloseHandle() must be called after
795 * closesocket() call (see
796 * http://msdn.microsoft.com/en-us/library/ms724211(VS.85).aspx).
797 * But turns out that this will raise "Invalid Handle" exception
798 * in debug mode.
799 * So because of this, we replaced CloseHandle() with closesocket()
800 * instead. These was tested on WinXP SP2.
801 */
802 //CloseHandle(key->hnd);
803 pj_sock_close((pj_sock_t)key->hnd);
Benny Prijono8d317a02006-03-22 11:49:19 +0000804
Benny Prijono5accbd02006-03-30 16:32:18 +0000805 /* Reset callbacks */
Benny Prijono8d317a02006-03-22 11:49:19 +0000806 key->cb.on_accept_complete = NULL;
807 key->cb.on_connect_complete = NULL;
Benny Prijono5accbd02006-03-30 16:32:18 +0000808 key->cb.on_read_complete = NULL;
809 key->cb.on_write_complete = NULL;
Benny Prijono8d317a02006-03-22 11:49:19 +0000810
Benny Prijono5accbd02006-03-30 16:32:18 +0000811#if PJ_IOQUEUE_HAS_SAFE_UNREG
Benny Prijono5accbd02006-03-30 16:32:18 +0000812 /* Even after handle is closed, I suspect that IOCP may still try to
813 * do something with the handle, causing memory corruption when pool
814 * debugging is enabled.
815 *
816 * Forcing context switch seems to have fixed that, but this is quite
817 * an ugly solution..
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000818 *
819 * Update 2008/02/13:
820 * This should not happen if concurrency is disallowed for the key.
821 * So at least application has a solution for this (i.e. by disallowing
822 * concurrency in the key).
Benny Prijono8d317a02006-03-22 11:49:19 +0000823 */
Benny Prijono9fe1ad22007-02-17 00:30:24 +0000824 //This will loop forever if unregistration is done on the callback.
825 //Doing this with RETRY I think should solve the IOCP setting the
826 //socket signalled, without causing the deadlock.
827 //while (pj_atomic_get(key->ref_count) != 1)
828 // pj_thread_sleep(0);
829 for (i=0; pj_atomic_get(key->ref_count) != 1 && i<RETRY; ++i)
Benny Prijono08beac62006-11-23 07:31:27 +0000830 pj_thread_sleep(0);
831
832 /* Decrement reference counter to destroy the key. */
833 decrement_counter(key);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000834
835 if (has_lock)
836 pj_mutex_unlock(key->mutex);
Benny Prijono5accbd02006-03-30 16:32:18 +0000837#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000838
839 return PJ_SUCCESS;
840}
841
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000842#if PJ_IOQUEUE_HAS_SAFE_UNREG
843/* Scan the closing list, and put pending closing keys to free list.
844 * Must do this with ioqueue mutex held.
845 */
846static void scan_closing_keys(pj_ioqueue_t *ioqueue)
847{
848 if (!pj_list_empty(&ioqueue->closing_list)) {
849 pj_time_val now;
850 pj_ioqueue_key_t *key;
851
852 pj_gettimeofday(&now);
853
854 /* Move closing keys to free list when they've finished the closing
855 * idle time.
856 */
857 key = ioqueue->closing_list.next;
858 while (key != &ioqueue->closing_list) {
859 pj_ioqueue_key_t *next = key->next;
860
861 pj_assert(key->closing != 0);
862
863 if (PJ_TIME_VAL_GTE(now, key->free_time)) {
864 pj_list_erase(key);
865 pj_list_push_back(&ioqueue->free_list, key);
866 }
867 key = next;
868 }
869 }
870}
871#endif
872
Benny Prijono8d317a02006-03-22 11:49:19 +0000873/*
874 * pj_ioqueue_poll()
875 *
876 * Poll for events.
877 */
878PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
879{
880 DWORD dwMsec;
Benny Prijono3569c0d2007-04-06 10:29:20 +0000881#if PJ_HAS_TCP
Benny Prijono8d317a02006-03-22 11:49:19 +0000882 int connect_count = 0;
Benny Prijono3569c0d2007-04-06 10:29:20 +0000883#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000884 int event_count = 0;
Benny Prijono8d317a02006-03-22 11:49:19 +0000885
886 PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
887
Benny Prijono8d317a02006-03-22 11:49:19 +0000888 /* Calculate miliseconds timeout for GetQueuedCompletionStatus */
889 dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE;
890
891 /* Poll for completion status. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000892 event_count = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL);
893
894#if PJ_HAS_TCP
895 /* Check the connecting array, only when there's no activity. */
896 if (event_count == 0) {
897 connect_count = check_connecting(ioqueue);
898 if (connect_count > 0)
899 event_count += connect_count;
900 }
901#endif
902
903#if PJ_IOQUEUE_HAS_SAFE_UNREG
904 /* Check the closing keys only when there's no activity and when there are
905 * pending closing keys.
906 */
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000907 if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) {
Benny Prijono5accbd02006-03-30 16:32:18 +0000908 pj_lock_acquire(ioqueue->lock);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000909 scan_closing_keys(ioqueue);
Benny Prijono5accbd02006-03-30 16:32:18 +0000910 pj_lock_release(ioqueue->lock);
911 }
912#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000913
914 /* Return number of events. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000915 return event_count;
Benny Prijono9033e312005-11-21 02:08:39 +0000916}
917
918/*
919 * pj_ioqueue_recv()
920 *
921 * Initiate overlapped WSARecv() operation.
922 */
923PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
924 pj_ioqueue_op_key_t *op_key,
925 void *buffer,
926 pj_ssize_t *length,
927 pj_uint32_t flags )
928{
929 /*
930 * Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and
931 * addrlen here. But unfortunately it generates EINVAL... :-(
932 * -bennylp
933 */
934 int rc;
935 DWORD bytesRead;
936 DWORD dwFlags = 0;
937 union operation_key *op_key_rec;
938
939 PJ_CHECK_STACK();
940 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
941
Benny Prijono5accbd02006-03-30 16:32:18 +0000942#if PJ_IOQUEUE_HAS_SAFE_UNREG
943 /* Check key is not closing */
944 if (key->closing)
945 return PJ_ECANCELLED;
946#endif
947
Benny Prijono9033e312005-11-21 02:08:39 +0000948 op_key_rec = (union operation_key*)op_key->internal__;
949 op_key_rec->overlapped.wsabuf.buf = buffer;
950 op_key_rec->overlapped.wsabuf.len = *length;
951
952 dwFlags = flags;
953
954 /* Try non-overlapped received first to see if data is
955 * immediately available.
956 */
957 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
958 rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
959 &bytesRead, &dwFlags, NULL, NULL);
960 if (rc == 0) {
961 *length = bytesRead;
962 return PJ_SUCCESS;
963 } else {
964 DWORD dwError = WSAGetLastError();
965 if (dwError != WSAEWOULDBLOCK) {
966 *length = -1;
967 return PJ_RETURN_OS_ERROR(dwError);
968 }
969 }
970 }
971
972 dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
973
974 /*
975 * No immediate data available.
976 * Register overlapped Recv() operation.
977 */
Benny Prijonoac623b32006-07-03 15:19:31 +0000978 pj_bzero( &op_key_rec->overlapped.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +0000979 sizeof(op_key_rec->overlapped.overlapped));
980 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
981
982 rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
983 &bytesRead, &dwFlags,
984 &op_key_rec->overlapped.overlapped, NULL);
985 if (rc == SOCKET_ERROR) {
986 DWORD dwStatus = WSAGetLastError();
987 if (dwStatus!=WSA_IO_PENDING) {
988 *length = -1;
989 return PJ_STATUS_FROM_OS(dwStatus);
990 }
991 }
992
993 /* Pending operation has been scheduled. */
994 return PJ_EPENDING;
995}
996
997/*
998 * pj_ioqueue_recvfrom()
999 *
1000 * Initiate overlapped RecvFrom() operation.
1001 */
1002PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
1003 pj_ioqueue_op_key_t *op_key,
1004 void *buffer,
1005 pj_ssize_t *length,
1006 pj_uint32_t flags,
1007 pj_sockaddr_t *addr,
1008 int *addrlen)
1009{
1010 int rc;
1011 DWORD bytesRead;
1012 DWORD dwFlags = 0;
1013 union operation_key *op_key_rec;
1014
1015 PJ_CHECK_STACK();
1016 PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL);
1017
Benny Prijono5accbd02006-03-30 16:32:18 +00001018#if PJ_IOQUEUE_HAS_SAFE_UNREG
1019 /* Check key is not closing */
1020 if (key->closing)
1021 return PJ_ECANCELLED;
1022#endif
1023
Benny Prijono9033e312005-11-21 02:08:39 +00001024 op_key_rec = (union operation_key*)op_key->internal__;
1025 op_key_rec->overlapped.wsabuf.buf = buffer;
1026 op_key_rec->overlapped.wsabuf.len = *length;
1027
1028 dwFlags = flags;
1029
1030 /* Try non-overlapped received first to see if data is
1031 * immediately available.
1032 */
1033 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
1034 rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1035 &bytesRead, &dwFlags, addr, addrlen, NULL, NULL);
1036 if (rc == 0) {
1037 *length = bytesRead;
1038 return PJ_SUCCESS;
1039 } else {
1040 DWORD dwError = WSAGetLastError();
1041 if (dwError != WSAEWOULDBLOCK) {
1042 *length = -1;
1043 return PJ_RETURN_OS_ERROR(dwError);
1044 }
1045 }
1046 }
1047
1048 dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
1049
1050 /*
1051 * No immediate data available.
1052 * Register overlapped Recv() operation.
1053 */
Benny Prijonoac623b32006-07-03 15:19:31 +00001054 pj_bzero( &op_key_rec->overlapped.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +00001055 sizeof(op_key_rec->overlapped.overlapped));
1056 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
1057
1058 rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1059 &bytesRead, &dwFlags, addr, addrlen,
1060 &op_key_rec->overlapped.overlapped, NULL);
1061 if (rc == SOCKET_ERROR) {
1062 DWORD dwStatus = WSAGetLastError();
1063 if (dwStatus!=WSA_IO_PENDING) {
1064 *length = -1;
1065 return PJ_STATUS_FROM_OS(dwStatus);
1066 }
1067 }
1068
1069 /* Pending operation has been scheduled. */
1070 return PJ_EPENDING;
1071}
1072
1073/*
1074 * pj_ioqueue_send()
1075 *
1076 * Initiate overlapped Send operation.
1077 */
1078PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
1079 pj_ioqueue_op_key_t *op_key,
1080 const void *data,
1081 pj_ssize_t *length,
1082 pj_uint32_t flags )
1083{
1084 return pj_ioqueue_sendto(key, op_key, data, length, flags, NULL, 0);
1085}
1086
1087
1088/*
1089 * pj_ioqueue_sendto()
1090 *
1091 * Initiate overlapped SendTo operation.
1092 */
1093PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
1094 pj_ioqueue_op_key_t *op_key,
1095 const void *data,
1096 pj_ssize_t *length,
1097 pj_uint32_t flags,
1098 const pj_sockaddr_t *addr,
1099 int addrlen)
1100{
1101 int rc;
1102 DWORD bytesWritten;
1103 DWORD dwFlags;
1104 union operation_key *op_key_rec;
1105
1106 PJ_CHECK_STACK();
1107 PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL);
Benny Prijono5accbd02006-03-30 16:32:18 +00001108
1109#if PJ_IOQUEUE_HAS_SAFE_UNREG
1110 /* Check key is not closing */
1111 if (key->closing)
1112 return PJ_ECANCELLED;
1113#endif
1114
Benny Prijono9033e312005-11-21 02:08:39 +00001115 op_key_rec = (union operation_key*)op_key->internal__;
1116
1117 /*
1118 * First try blocking write.
1119 */
1120 op_key_rec->overlapped.wsabuf.buf = (void*)data;
1121 op_key_rec->overlapped.wsabuf.len = *length;
1122
1123 dwFlags = flags;
1124
1125 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
1126 rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1127 &bytesWritten, dwFlags, addr, addrlen,
1128 NULL, NULL);
1129 if (rc == 0) {
1130 *length = bytesWritten;
1131 return PJ_SUCCESS;
1132 } else {
1133 DWORD dwStatus = WSAGetLastError();
1134 if (dwStatus != WSAEWOULDBLOCK) {
1135 *length = -1;
1136 return PJ_RETURN_OS_ERROR(dwStatus);
1137 }
1138 }
1139 }
1140
1141 dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
1142
1143 /*
1144 * Data can't be sent immediately.
1145 * Schedule asynchronous WSASend().
1146 */
Benny Prijonoac623b32006-07-03 15:19:31 +00001147 pj_bzero( &op_key_rec->overlapped.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +00001148 sizeof(op_key_rec->overlapped.overlapped));
1149 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND;
1150
1151 rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1152 &bytesWritten, dwFlags, addr, addrlen,
1153 &op_key_rec->overlapped.overlapped, NULL);
1154 if (rc == SOCKET_ERROR) {
1155 DWORD dwStatus = WSAGetLastError();
1156 if (dwStatus!=WSA_IO_PENDING)
1157 return PJ_STATUS_FROM_OS(dwStatus);
1158 }
1159
1160 /* Asynchronous operation successfully submitted. */
1161 return PJ_EPENDING;
1162}
1163
1164#if PJ_HAS_TCP
1165
1166/*
1167 * pj_ioqueue_accept()
1168 *
1169 * Initiate overlapped accept() operation.
1170 */
1171PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
1172 pj_ioqueue_op_key_t *op_key,
1173 pj_sock_t *new_sock,
1174 pj_sockaddr_t *local,
1175 pj_sockaddr_t *remote,
1176 int *addrlen)
1177{
1178 BOOL rc;
1179 DWORD bytesReceived;
1180 pj_status_t status;
1181 union operation_key *op_key_rec;
1182 SOCKET sock;
1183
1184 PJ_CHECK_STACK();
1185 PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
1186
Benny Prijono5accbd02006-03-30 16:32:18 +00001187#if PJ_IOQUEUE_HAS_SAFE_UNREG
1188 /* Check key is not closing */
1189 if (key->closing)
1190 return PJ_ECANCELLED;
1191#endif
1192
Benny Prijono9033e312005-11-21 02:08:39 +00001193 /*
1194 * See if there is a new connection immediately available.
1195 */
1196 sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0);
1197 if (sock != INVALID_SOCKET) {
1198 /* Yes! New socket is available! */
1199 int status;
1200
1201 status = getsockname(sock, local, addrlen);
1202 if (status != 0) {
1203 DWORD dwError = WSAGetLastError();
1204 closesocket(sock);
1205 return PJ_RETURN_OS_ERROR(dwError);
1206 }
1207
1208 *new_sock = sock;
1209 return PJ_SUCCESS;
1210
1211 } else {
1212 DWORD dwError = WSAGetLastError();
1213 if (dwError != WSAEWOULDBLOCK) {
1214 return PJ_RETURN_OS_ERROR(dwError);
1215 }
1216 }
1217
1218 /*
1219 * No connection is immediately available.
1220 * Must schedule an asynchronous operation.
1221 */
1222 op_key_rec = (union operation_key*)op_key->internal__;
1223
Benny Prijono8ab968f2007-07-20 08:08:30 +00001224 status = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0,
Benny Prijono9033e312005-11-21 02:08:39 +00001225 &op_key_rec->accept.newsock);
1226 if (status != PJ_SUCCESS)
1227 return status;
1228
1229 /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket
1230 * addresses can be obtained with getsockname() and getpeername().
1231 */
1232 status = setsockopt(op_key_rec->accept.newsock, SOL_SOCKET,
1233 SO_UPDATE_ACCEPT_CONTEXT,
1234 (char*)&key->hnd, sizeof(SOCKET));
1235 /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later.
1236 * So ignore the error status.
1237 */
1238
1239 op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT;
1240 op_key_rec->accept.addrlen = addrlen;
1241 op_key_rec->accept.local = local;
1242 op_key_rec->accept.remote = remote;
1243 op_key_rec->accept.newsock_ptr = new_sock;
Benny Prijonoac623b32006-07-03 15:19:31 +00001244 pj_bzero( &op_key_rec->accept.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +00001245 sizeof(op_key_rec->accept.overlapped));
1246
1247 rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)op_key_rec->accept.newsock,
1248 op_key_rec->accept.accept_buf,
1249 0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN,
1250 &bytesReceived,
1251 &op_key_rec->accept.overlapped );
1252
1253 if (rc == TRUE) {
1254 ioqueue_on_accept_complete(&op_key_rec->accept);
1255 return PJ_SUCCESS;
1256 } else {
1257 DWORD dwStatus = WSAGetLastError();
1258 if (dwStatus!=WSA_IO_PENDING)
1259 return PJ_STATUS_FROM_OS(dwStatus);
1260 }
1261
1262 /* Asynchronous Accept() has been submitted. */
1263 return PJ_EPENDING;
1264}
1265
1266
1267/*
1268 * pj_ioqueue_connect()
1269 *
1270 * Initiate overlapped connect() operation (well, it's non-blocking actually,
1271 * since there's no overlapped version of connect()).
1272 */
1273PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
1274 const pj_sockaddr_t *addr,
1275 int addrlen )
1276{
1277 HANDLE hEvent;
1278 pj_ioqueue_t *ioqueue;
1279
1280 PJ_CHECK_STACK();
1281 PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
1282
Benny Prijono5accbd02006-03-30 16:32:18 +00001283#if PJ_IOQUEUE_HAS_SAFE_UNREG
1284 /* Check key is not closing */
1285 if (key->closing)
1286 return PJ_ECANCELLED;
1287#endif
1288
Benny Prijono9033e312005-11-21 02:08:39 +00001289 /* Initiate connect() */
1290 if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) {
1291 DWORD dwStatus;
1292 dwStatus = WSAGetLastError();
1293 if (dwStatus != WSAEWOULDBLOCK) {
1294 return PJ_RETURN_OS_ERROR(dwStatus);
1295 }
1296 } else {
1297 /* Connect has completed immediately! */
1298 return PJ_SUCCESS;
1299 }
1300
1301 ioqueue = key->ioqueue;
1302
1303 /* Add to the array of connecting socket to be polled */
1304 pj_lock_acquire(ioqueue->lock);
1305
1306 if (ioqueue->connecting_count >= MAXIMUM_WAIT_OBJECTS) {
1307 pj_lock_release(ioqueue->lock);
1308 return PJ_ETOOMANYCONN;
1309 }
1310
1311 /* Get or create event object. */
1312 if (ioqueue->event_count) {
1313 hEvent = ioqueue->event_pool[ioqueue->event_count - 1];
1314 --ioqueue->event_count;
1315 } else {
1316 hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
1317 if (hEvent == NULL) {
1318 DWORD dwStatus = GetLastError();
1319 pj_lock_release(ioqueue->lock);
1320 return PJ_STATUS_FROM_OS(dwStatus);
1321 }
1322 }
1323
1324 /* Mark key as connecting.
1325 * We can't use array index since key can be removed dynamically.
1326 */
1327 key->connecting = 1;
1328
1329 /* Associate socket events to the event object. */
1330 if (WSAEventSelect((pj_sock_t)key->hnd, hEvent, FD_CONNECT) != 0) {
1331 CloseHandle(hEvent);
1332 pj_lock_release(ioqueue->lock);
1333 return PJ_RETURN_OS_ERROR(WSAGetLastError());
1334 }
1335
1336 /* Add to array. */
1337 ioqueue->connecting_keys[ ioqueue->connecting_count ] = key;
1338 ioqueue->connecting_handles[ ioqueue->connecting_count ] = hEvent;
1339 ioqueue->connecting_count++;
1340
1341 pj_lock_release(ioqueue->lock);
1342
1343 return PJ_EPENDING;
1344}
1345#endif /* #if PJ_HAS_TCP */
1346
1347
1348PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
1349 pj_size_t size )
1350{
Benny Prijonoac623b32006-07-03 15:19:31 +00001351 pj_bzero(op_key, size);
Benny Prijono9033e312005-11-21 02:08:39 +00001352}
1353
1354PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
1355 pj_ioqueue_op_key_t *op_key )
1356{
1357 BOOL rc;
1358 DWORD bytesTransfered;
1359
1360 rc = GetOverlappedResult( key->hnd, (LPOVERLAPPED)op_key,
1361 &bytesTransfered, FALSE );
1362
1363 if (rc == FALSE) {
1364 return GetLastError()==ERROR_IO_INCOMPLETE;
1365 }
1366
1367 return FALSE;
1368}
1369
1370
1371PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
1372 pj_ioqueue_op_key_t *op_key,
1373 pj_ssize_t bytes_status )
1374{
1375 BOOL rc;
1376
1377 rc = PostQueuedCompletionStatus(key->ioqueue->iocp, bytes_status,
1378 (long)key, (OVERLAPPED*)op_key );
1379 if (rc == FALSE) {
1380 return PJ_RETURN_OS_ERROR(GetLastError());
1381 }
1382
1383 return PJ_SUCCESS;
1384}
1385
Benny Prijonoe3f79fd2008-02-13 15:17:28 +00001386PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
1387 pj_bool_t allow)
1388{
1389 PJ_ASSERT_RETURN(key, PJ_EINVAL);
1390
1391 /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
1392 * disabled.
1393 */
1394 PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);
1395
1396 key->allow_concurrent = allow;
1397 return PJ_SUCCESS;
1398}
1399
1400PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
1401{
1402#if PJ_IOQUEUE_HAS_SAFE_UNREG
1403 return pj_mutex_lock(key->mutex);
1404#else
1405 PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP);
1406#endif
1407}
1408
1409PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
1410{
1411#if PJ_IOQUEUE_HAS_SAFE_UNREG
1412 return pj_mutex_unlock(key->mutex);
1413#else
1414 PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP);
1415#endif
1416}
1417