blob: 34924779b552ac4060675633a4b0797960813773 [file] [log] [blame]
Benny Prijono9033e312005-11-21 02:08:39 +00001/* $Id$ */
2/*
Benny Prijono844653c2008-12-23 17:27:53 +00003 * Copyright (C) 2008-2009 Teluu Inc. (http://www.teluu.com)
4 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
Benny Prijono9033e312005-11-21 02:08:39 +00005 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 */
20#include <pj/ioqueue.h>
21#include <pj/os.h>
22#include <pj/lock.h>
23#include <pj/pool.h>
24#include <pj/string.h>
25#include <pj/sock.h>
26#include <pj/array.h>
27#include <pj/log.h>
28#include <pj/assert.h>
29#include <pj/errno.h>
Benny Prijono9c025eb2006-07-10 21:35:27 +000030#include <pj/compat/socket.h>
Benny Prijono9033e312005-11-21 02:08:39 +000031
32
33#if defined(PJ_HAS_WINSOCK2_H) && PJ_HAS_WINSOCK2_H != 0
34# include <winsock2.h>
35#elif defined(PJ_HAS_WINSOCK_H) && PJ_HAS_WINSOCK_H != 0
36# include <winsock.h>
37#endif
38
39#if defined(PJ_HAS_MSWSOCK_H) && PJ_HAS_MSWSOCK_H != 0
40# include <mswsock.h>
41#endif
42
43
/* AcceptEx() requires each address buffer to be at least 16 bytes larger
 * than the size of the socket address structure (source: MSDN).
 */
47#define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in)+16)
48
49typedef struct generic_overlapped
50{
51 WSAOVERLAPPED overlapped;
52 pj_ioqueue_operation_e operation;
53} generic_overlapped;
54
55/*
56 * OVERLAPPPED structure for send and receive.
57 */
58typedef struct ioqueue_overlapped
59{
60 WSAOVERLAPPED overlapped;
61 pj_ioqueue_operation_e operation;
62 WSABUF wsabuf;
63 pj_sockaddr_in dummy_addr;
64 int dummy_addrlen;
65} ioqueue_overlapped;
66
67#if PJ_HAS_TCP
68/*
69 * OVERLAP structure for accept.
70 */
71typedef struct ioqueue_accept_rec
72{
73 WSAOVERLAPPED overlapped;
74 pj_ioqueue_operation_e operation;
75 pj_sock_t newsock;
76 pj_sock_t *newsock_ptr;
77 int *addrlen;
78 void *remote;
79 void *local;
80 char accept_buf[2 * ACCEPT_ADDR_LEN];
81} ioqueue_accept_rec;
82#endif
83
84/*
85 * Structure to hold pending operation key.
86 */
87union operation_key
88{
89 generic_overlapped generic;
90 ioqueue_overlapped overlapped;
91#if PJ_HAS_TCP
92 ioqueue_accept_rec accept;
93#endif
94};
95
/* Type of handle in the key. pj_ioqueue_register_sock() tags its keys with
 * HND_IS_SOCKET.
 */
enum handle_type
{
    HND_IS_UNKNOWN,
    HND_IS_FILE,
    HND_IS_SOCKET,
};
103
/* NOTE(review): presumably a sentinel "bytes transferred" value used to post
 * a quit notification to the completion port; it is not referenced in the
 * portion of the file reviewed here - confirm against its users.
 */
enum
{
    POST_QUIT_LEN = 0xFFFFDEADUL
};
105
Benny Prijono9033e312005-11-21 02:08:39 +0000106/*
107 * Structure for individual socket.
108 */
109struct pj_ioqueue_key_t
110{
Benny Prijono5accbd02006-03-30 16:32:18 +0000111 PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);
112
Benny Prijono9033e312005-11-21 02:08:39 +0000113 pj_ioqueue_t *ioqueue;
114 HANDLE hnd;
115 void *user_data;
116 enum handle_type hnd_type;
Benny Prijono5accbd02006-03-30 16:32:18 +0000117 pj_ioqueue_callback cb;
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000118 pj_bool_t allow_concurrent;
Benny Prijono5accbd02006-03-30 16:32:18 +0000119
Benny Prijono9033e312005-11-21 02:08:39 +0000120#if PJ_HAS_TCP
121 int connecting;
122#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000123
124#if PJ_IOQUEUE_HAS_SAFE_UNREG
125 pj_atomic_t *ref_count;
126 pj_bool_t closing;
127 pj_time_val free_time;
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000128 pj_mutex_t *mutex;
Benny Prijono5accbd02006-03-30 16:32:18 +0000129#endif
130
Benny Prijono9033e312005-11-21 02:08:39 +0000131};
132
133/*
134 * IO Queue structure.
135 */
136struct pj_ioqueue_t
137{
138 HANDLE iocp;
139 pj_lock_t *lock;
140 pj_bool_t auto_delete_lock;
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000141 pj_bool_t default_concurrency;
Benny Prijono5accbd02006-03-30 16:32:18 +0000142
143#if PJ_IOQUEUE_HAS_SAFE_UNREG
144 pj_ioqueue_key_t active_list;
145 pj_ioqueue_key_t free_list;
146 pj_ioqueue_key_t closing_list;
147#endif
148
149 /* These are to keep track of connecting sockets */
150#if PJ_HAS_TCP
Benny Prijono9033e312005-11-21 02:08:39 +0000151 unsigned event_count;
152 HANDLE event_pool[MAXIMUM_WAIT_OBJECTS+1];
Benny Prijono9033e312005-11-21 02:08:39 +0000153 unsigned connecting_count;
154 HANDLE connecting_handles[MAXIMUM_WAIT_OBJECTS+1];
155 pj_ioqueue_key_t *connecting_keys[MAXIMUM_WAIT_OBJECTS+1];
156#endif
157};
158
159
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000160#if PJ_IOQUEUE_HAS_SAFE_UNREG
161/* Prototype */
162static void scan_closing_keys(pj_ioqueue_t *ioqueue);
163#endif
164
165
Benny Prijono9033e312005-11-21 02:08:39 +0000166#if PJ_HAS_TCP
167/*
168 * Process the socket when the overlapped accept() completed.
169 */
Nanang Izzuddinb2c75292010-01-08 13:08:05 +0000170static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
171 ioqueue_accept_rec *accept_overlapped)
Benny Prijono9033e312005-11-21 02:08:39 +0000172{
173 struct sockaddr *local;
174 struct sockaddr *remote;
175 int locallen, remotelen;
Nanang Izzuddinb2c75292010-01-08 13:08:05 +0000176 pj_status_t status;
Benny Prijono9033e312005-11-21 02:08:39 +0000177
178 PJ_CHECK_STACK();
179
Nanang Izzuddinb2c75292010-01-08 13:08:05 +0000180 /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket
181 * addresses can be obtained with getsockname() and getpeername().
182 */
183 status = setsockopt(accept_overlapped->newsock, SOL_SOCKET,
184 SO_UPDATE_ACCEPT_CONTEXT,
185 (char*)&key->hnd,
186 sizeof(SOCKET));
187 /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later.
188 * So ignore the error status.
189 */
190
Benny Prijono9033e312005-11-21 02:08:39 +0000191 /* Operation complete immediately. */
Benny Prijono25c8f932008-08-26 14:41:26 +0000192 if (accept_overlapped->addrlen) {
193 GetAcceptExSockaddrs( accept_overlapped->accept_buf,
194 0,
195 ACCEPT_ADDR_LEN,
196 ACCEPT_ADDR_LEN,
197 &local,
198 &locallen,
199 &remote,
200 &remotelen);
201 if (*accept_overlapped->addrlen >= locallen) {
202 if (accept_overlapped->local)
203 pj_memcpy(accept_overlapped->local, local, locallen);
204 if (accept_overlapped->remote)
205 pj_memcpy(accept_overlapped->remote, remote, locallen);
206 } else {
207 if (accept_overlapped->local)
208 pj_bzero(accept_overlapped->local,
209 *accept_overlapped->addrlen);
210 if (accept_overlapped->remote)
211 pj_bzero(accept_overlapped->remote,
212 *accept_overlapped->addrlen);
213 }
214
215 *accept_overlapped->addrlen = locallen;
Benny Prijono9033e312005-11-21 02:08:39 +0000216 }
Benny Prijono9033e312005-11-21 02:08:39 +0000217 if (accept_overlapped->newsock_ptr)
218 *accept_overlapped->newsock_ptr = accept_overlapped->newsock;
219 accept_overlapped->operation = 0;
Benny Prijono9033e312005-11-21 02:08:39 +0000220}
221
222static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos)
223{
224 pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos];
225 HANDLE hEvent = ioqueue->connecting_handles[pos];
226
227 /* Remove key from array of connecting handles. */
228 pj_array_erase(ioqueue->connecting_keys, sizeof(key),
229 ioqueue->connecting_count, pos);
230 pj_array_erase(ioqueue->connecting_handles, sizeof(HANDLE),
231 ioqueue->connecting_count, pos);
232 --ioqueue->connecting_count;
233
234 /* Disassociate the socket from the event. */
235 WSAEventSelect((pj_sock_t)key->hnd, hEvent, 0);
236
237 /* Put event object to pool. */
238 if (ioqueue->event_count < MAXIMUM_WAIT_OBJECTS) {
239 ioqueue->event_pool[ioqueue->event_count++] = hEvent;
240 } else {
241 /* Shouldn't happen. There should be no more pending connections
242 * than max.
243 */
244 pj_assert(0);
245 CloseHandle(hEvent);
246 }
247
248}
249
250/*
251 * Poll for the completion of non-blocking connect().
252 * If there's a completion, the function return the key of the completed
253 * socket, and 'result' argument contains the connect() result. If connect()
254 * succeeded, 'result' will have value zero, otherwise will have the error
255 * code.
256 */
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000257static int check_connecting( pj_ioqueue_t *ioqueue )
Benny Prijono9033e312005-11-21 02:08:39 +0000258{
Benny Prijono9033e312005-11-21 02:08:39 +0000259 if (ioqueue->connecting_count) {
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000260 int i, count;
261 struct
262 {
263 pj_ioqueue_key_t *key;
264 pj_status_t status;
265 } events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1];
Benny Prijono9033e312005-11-21 02:08:39 +0000266
267 pj_lock_acquire(ioqueue->lock);
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000268 for (count=0; count<PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1; ++count) {
269 DWORD result;
Benny Prijono9033e312005-11-21 02:08:39 +0000270
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000271 result = WaitForMultipleObjects(ioqueue->connecting_count,
272 ioqueue->connecting_handles,
273 FALSE, 0);
274 if (result >= WAIT_OBJECT_0 &&
275 result < WAIT_OBJECT_0+ioqueue->connecting_count)
276 {
277 WSANETWORKEVENTS net_events;
Benny Prijono9033e312005-11-21 02:08:39 +0000278
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000279 /* Got completed connect(). */
280 unsigned pos = result - WAIT_OBJECT_0;
281 events[count].key = ioqueue->connecting_keys[pos];
Benny Prijono9033e312005-11-21 02:08:39 +0000282
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000283 /* See whether connect has succeeded. */
284 WSAEnumNetworkEvents((pj_sock_t)events[count].key->hnd,
285 ioqueue->connecting_handles[pos],
286 &net_events);
287 events[count].status =
288 PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]);
289
290 /* Erase socket from pending connect. */
291 erase_connecting_socket(ioqueue, pos);
292 } else {
293 /* No more events */
294 break;
295 }
Benny Prijono9033e312005-11-21 02:08:39 +0000296 }
297 pj_lock_release(ioqueue->lock);
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000298
299 /* Call callbacks. */
300 for (i=0; i<count; ++i) {
301 if (events[i].key->cb.on_connect_complete) {
302 events[i].key->cb.on_connect_complete(events[i].key,
303 events[i].status);
304 }
305 }
306
307 return count;
Benny Prijono9033e312005-11-21 02:08:39 +0000308 }
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000309
310 return 0;
311
Benny Prijono9033e312005-11-21 02:08:39 +0000312}
313#endif
314
315/*
316 * pj_ioqueue_name()
317 */
318PJ_DEF(const char*) pj_ioqueue_name(void)
319{
320 return "iocp";
321}
322
323/*
324 * pj_ioqueue_create()
325 */
326PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
327 pj_size_t max_fd,
328 pj_ioqueue_t **p_ioqueue)
329{
330 pj_ioqueue_t *ioqueue;
Benny Prijono5accbd02006-03-30 16:32:18 +0000331 unsigned i;
Benny Prijono9033e312005-11-21 02:08:39 +0000332 pj_status_t rc;
333
334 PJ_UNUSED_ARG(max_fd);
335 PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL);
336
337 rc = sizeof(union operation_key);
338
339 /* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */
340 PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
341 sizeof(union operation_key), PJ_EBUG);
342
Benny Prijono5accbd02006-03-30 16:32:18 +0000343 /* Create IOCP */
Benny Prijono9033e312005-11-21 02:08:39 +0000344 ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue));
345 ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
346 if (ioqueue->iocp == NULL)
347 return PJ_RETURN_OS_ERROR(GetLastError());
348
Benny Prijono5accbd02006-03-30 16:32:18 +0000349 /* Create IOCP mutex */
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000350 rc = pj_lock_create_recursive_mutex(pool, NULL, &ioqueue->lock);
Benny Prijono9033e312005-11-21 02:08:39 +0000351 if (rc != PJ_SUCCESS) {
352 CloseHandle(ioqueue->iocp);
353 return rc;
354 }
355
356 ioqueue->auto_delete_lock = PJ_TRUE;
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000357 ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY;
Benny Prijono9033e312005-11-21 02:08:39 +0000358
Benny Prijono5accbd02006-03-30 16:32:18 +0000359#if PJ_IOQUEUE_HAS_SAFE_UNREG
360 /*
361 * Create and initialize key pools.
362 */
363 pj_list_init(&ioqueue->active_list);
364 pj_list_init(&ioqueue->free_list);
365 pj_list_init(&ioqueue->closing_list);
366
367 /* Preallocate keys according to max_fd setting, and put them
368 * in free_list.
369 */
370 for (i=0; i<max_fd; ++i) {
371 pj_ioqueue_key_t *key;
372
373 key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t));
374
375 rc = pj_atomic_create(pool, 0, &key->ref_count);
376 if (rc != PJ_SUCCESS) {
377 key = ioqueue->free_list.next;
378 while (key != &ioqueue->free_list) {
379 pj_atomic_destroy(key->ref_count);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000380 pj_mutex_destroy(key->mutex);
381 key = key->next;
382 }
383 CloseHandle(ioqueue->iocp);
384 return rc;
385 }
386
387 rc = pj_mutex_create_recursive(pool, "ioqkey", &key->mutex);
388 if (rc != PJ_SUCCESS) {
389 pj_atomic_destroy(key->ref_count);
390 key = ioqueue->free_list.next;
391 while (key != &ioqueue->free_list) {
392 pj_atomic_destroy(key->ref_count);
393 pj_mutex_destroy(key->mutex);
Benny Prijono5accbd02006-03-30 16:32:18 +0000394 key = key->next;
395 }
396 CloseHandle(ioqueue->iocp);
397 return rc;
398 }
399
400 pj_list_push_back(&ioqueue->free_list, key);
Benny Prijono5accbd02006-03-30 16:32:18 +0000401 }
402#endif
403
Benny Prijono9033e312005-11-21 02:08:39 +0000404 *p_ioqueue = ioqueue;
405
406 PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue));
407 return PJ_SUCCESS;
408}
409
410/*
411 * pj_ioqueue_destroy()
412 */
413PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue )
414{
Benny Prijono3569c0d2007-04-06 10:29:20 +0000415#if PJ_HAS_TCP
Benny Prijono9033e312005-11-21 02:08:39 +0000416 unsigned i;
Benny Prijono3569c0d2007-04-06 10:29:20 +0000417#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000418 pj_ioqueue_key_t *key;
Benny Prijono9033e312005-11-21 02:08:39 +0000419
420 PJ_CHECK_STACK();
421 PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
422
Benny Prijono5accbd02006-03-30 16:32:18 +0000423 pj_lock_acquire(ioqueue->lock);
424
425#if PJ_HAS_TCP
Benny Prijono9033e312005-11-21 02:08:39 +0000426 /* Destroy events in the pool */
427 for (i=0; i<ioqueue->event_count; ++i) {
428 CloseHandle(ioqueue->event_pool[i]);
429 }
430 ioqueue->event_count = 0;
Benny Prijono5accbd02006-03-30 16:32:18 +0000431#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000432
433 if (CloseHandle(ioqueue->iocp) != TRUE)
434 return PJ_RETURN_OS_ERROR(GetLastError());
435
Benny Prijono5accbd02006-03-30 16:32:18 +0000436#if PJ_IOQUEUE_HAS_SAFE_UNREG
437 /* Destroy reference counters */
438 key = ioqueue->active_list.next;
439 while (key != &ioqueue->active_list) {
440 pj_atomic_destroy(key->ref_count);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000441 pj_mutex_destroy(key->mutex);
Benny Prijono5accbd02006-03-30 16:32:18 +0000442 key = key->next;
443 }
444
445 key = ioqueue->closing_list.next;
446 while (key != &ioqueue->closing_list) {
447 pj_atomic_destroy(key->ref_count);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000448 pj_mutex_destroy(key->mutex);
Benny Prijono5accbd02006-03-30 16:32:18 +0000449 key = key->next;
450 }
451
452 key = ioqueue->free_list.next;
453 while (key != &ioqueue->free_list) {
454 pj_atomic_destroy(key->ref_count);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000455 pj_mutex_destroy(key->mutex);
Benny Prijono5accbd02006-03-30 16:32:18 +0000456 key = key->next;
457 }
458#endif
459
Benny Prijono9033e312005-11-21 02:08:39 +0000460 if (ioqueue->auto_delete_lock)
461 pj_lock_destroy(ioqueue->lock);
462
463 return PJ_SUCCESS;
464}
465
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000466
467PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency(pj_ioqueue_t *ioqueue,
468 pj_bool_t allow)
469{
470 PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
471 ioqueue->default_concurrency = allow;
472 return PJ_SUCCESS;
473}
474
Benny Prijono9033e312005-11-21 02:08:39 +0000475/*
476 * pj_ioqueue_set_lock()
477 */
478PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
479 pj_lock_t *lock,
480 pj_bool_t auto_delete )
481{
482 PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
483
484 if (ioqueue->auto_delete_lock) {
485 pj_lock_destroy(ioqueue->lock);
486 }
487
488 ioqueue->lock = lock;
489 ioqueue->auto_delete_lock = auto_delete;
490
491 return PJ_SUCCESS;
492}
493
494/*
495 * pj_ioqueue_register_sock()
496 */
497PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
498 pj_ioqueue_t *ioqueue,
499 pj_sock_t sock,
500 void *user_data,
501 const pj_ioqueue_callback *cb,
502 pj_ioqueue_key_t **key )
503{
504 HANDLE hioq;
505 pj_ioqueue_key_t *rec;
506 u_long value;
507 int rc;
508
509 PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL);
510
Benny Prijono5accbd02006-03-30 16:32:18 +0000511 pj_lock_acquire(ioqueue->lock);
512
513#if PJ_IOQUEUE_HAS_SAFE_UNREG
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000514 /* Scan closing list first to release unused keys.
515 * Must do this with lock acquired.
516 */
517 scan_closing_keys(ioqueue);
518
Benny Prijono5accbd02006-03-30 16:32:18 +0000519 /* If safe unregistration is used, then get the key record from
520 * the free list.
521 */
522 if (pj_list_empty(&ioqueue->free_list)) {
523 pj_lock_release(ioqueue->lock);
524 return PJ_ETOOMANY;
525 }
526
527 rec = ioqueue->free_list.next;
528 pj_list_erase(rec);
529
530 /* Set initial reference count to 1 */
531 pj_assert(pj_atomic_get(rec->ref_count) == 0);
532 pj_atomic_inc(rec->ref_count);
533
534 rec->closing = 0;
535
536#else
Benny Prijono9033e312005-11-21 02:08:39 +0000537 rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
Benny Prijono5accbd02006-03-30 16:32:18 +0000538#endif
539
540 /* Build the key for this socket. */
Benny Prijono9033e312005-11-21 02:08:39 +0000541 rec->ioqueue = ioqueue;
542 rec->hnd = (HANDLE)sock;
543 rec->hnd_type = HND_IS_SOCKET;
544 rec->user_data = user_data;
545 pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback));
546
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000547 /* Set concurrency for this handle */
548 rc = pj_ioqueue_set_concurrency(rec, ioqueue->default_concurrency);
549 if (rc != PJ_SUCCESS) {
550 pj_lock_release(ioqueue->lock);
551 return rc;
552 }
553
Benny Prijono5accbd02006-03-30 16:32:18 +0000554#if PJ_HAS_TCP
555 rec->connecting = 0;
556#endif
557
Benny Prijono9033e312005-11-21 02:08:39 +0000558 /* Set socket to nonblocking. */
559 value = 1;
560 rc = ioctlsocket(sock, FIONBIO, &value);
561 if (rc != 0) {
Benny Prijono5accbd02006-03-30 16:32:18 +0000562 pj_lock_release(ioqueue->lock);
Benny Prijono9033e312005-11-21 02:08:39 +0000563 return PJ_RETURN_OS_ERROR(WSAGetLastError());
564 }
565
566 /* Associate with IOCP */
567 hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0);
568 if (!hioq) {
Benny Prijono5accbd02006-03-30 16:32:18 +0000569 pj_lock_release(ioqueue->lock);
Benny Prijono9033e312005-11-21 02:08:39 +0000570 return PJ_RETURN_OS_ERROR(GetLastError());
571 }
572
573 *key = rec;
Benny Prijono5accbd02006-03-30 16:32:18 +0000574
575#if PJ_IOQUEUE_HAS_SAFE_UNREG
576 pj_list_push_back(&ioqueue->active_list, rec);
577#endif
578
579 pj_lock_release(ioqueue->lock);
580
Benny Prijono9033e312005-11-21 02:08:39 +0000581 return PJ_SUCCESS;
582}
583
Benny Prijono9033e312005-11-21 02:08:39 +0000584
585/*
586 * pj_ioqueue_get_user_data()
587 */
588PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
589{
590 PJ_ASSERT_RETURN(key, NULL);
591 return key->user_data;
592}
593
594/*
595 * pj_ioqueue_set_user_data()
596 */
597PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
598 void *user_data,
599 void **old_data )
600{
601 PJ_ASSERT_RETURN(key, PJ_EINVAL);
602
603 if (old_data)
604 *old_data = key->user_data;
605
606 key->user_data = user_data;
607 return PJ_SUCCESS;
608}
609
Benny Prijono8d317a02006-03-22 11:49:19 +0000610
Benny Prijono5accbd02006-03-30 16:32:18 +0000611#if PJ_IOQUEUE_HAS_SAFE_UNREG
612/* Decrement the key's reference counter, and when the counter reach zero,
613 * destroy the key.
614 */
615static void decrement_counter(pj_ioqueue_key_t *key)
616{
617 if (pj_atomic_dec_and_get(key->ref_count) == 0) {
618
619 pj_lock_acquire(key->ioqueue->lock);
620
621 pj_assert(key->closing == 1);
Sauw Ming59d2c8c2011-03-16 09:22:24 +0000622 pj_gettickcount(&key->free_time);
Benny Prijono5accbd02006-03-30 16:32:18 +0000623 key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
624 pj_time_val_normalize(&key->free_time);
625
626 pj_list_erase(key);
627 pj_list_push_back(&key->ioqueue->closing_list, key);
628
629 pj_lock_release(key->ioqueue->lock);
630 }
631}
632#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000633
Benny Prijono9033e312005-11-21 02:08:39 +0000634/*
Benny Prijono5accbd02006-03-30 16:32:18 +0000635 * Poll the I/O Completion Port, execute callback,
Benny Prijono8d317a02006-03-22 11:49:19 +0000636 * and return the key and bytes transfered of the last operation.
Benny Prijono9033e312005-11-21 02:08:39 +0000637 */
Benny Prijono8d317a02006-03-22 11:49:19 +0000638static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
639 pj_ssize_t *p_bytes, pj_ioqueue_key_t **p_key )
Benny Prijono9033e312005-11-21 02:08:39 +0000640{
Benny Prijono8d317a02006-03-22 11:49:19 +0000641 DWORD dwBytesTransfered, dwKey;
Benny Prijono9033e312005-11-21 02:08:39 +0000642 generic_overlapped *pOv;
643 pj_ioqueue_key_t *key;
Benny Prijono4f2be312005-11-21 17:01:06 +0000644 pj_ssize_t size_status = -1;
Benny Prijono8d317a02006-03-22 11:49:19 +0000645 BOOL rcGetQueued;
Benny Prijono9033e312005-11-21 02:08:39 +0000646
647 /* Poll for completion status. */
Benny Prijono8d317a02006-03-22 11:49:19 +0000648 rcGetQueued = GetQueuedCompletionStatus(hIocp, &dwBytesTransfered,
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000649 &dwKey, (OVERLAPPED**)&pOv,
Benny Prijono8d317a02006-03-22 11:49:19 +0000650 dwTimeout);
Benny Prijono9033e312005-11-21 02:08:39 +0000651
652 /* The return value is:
653 * - nonzero if event was dequeued.
654 * - zero and pOv==NULL if no event was dequeued.
655 * - zero and pOv!=NULL if event for failed I/O was dequeued.
656 */
657 if (pOv) {
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000658 pj_bool_t has_lock;
659
Benny Prijono9033e312005-11-21 02:08:39 +0000660 /* Event was dequeued for either successfull or failed I/O */
661 key = (pj_ioqueue_key_t*)dwKey;
662 size_status = dwBytesTransfered;
Benny Prijono8d317a02006-03-22 11:49:19 +0000663
664 /* Report to caller regardless */
665 if (p_bytes)
666 *p_bytes = size_status;
667 if (p_key)
668 *p_key = key;
669
Benny Prijono5accbd02006-03-30 16:32:18 +0000670#if PJ_IOQUEUE_HAS_SAFE_UNREG
671 /* We shouldn't call callbacks if key is quitting. */
672 if (key->closing)
Benny Prijono8d317a02006-03-22 11:49:19 +0000673 return PJ_TRUE;
Benny Prijono8d317a02006-03-22 11:49:19 +0000674
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000675 /* If concurrency is disabled, lock the key
676 * (and save the lock status to local var since app may change
677 * concurrency setting while in the callback) */
678 if (key->allow_concurrent == PJ_FALSE) {
679 pj_mutex_lock(key->mutex);
680 has_lock = PJ_TRUE;
681 } else {
682 has_lock = PJ_FALSE;
683 }
684
685 /* Now that we get the lock, check again that key is not closing */
686 if (key->closing) {
687 if (has_lock) {
688 pj_mutex_unlock(key->mutex);
689 }
690 return PJ_TRUE;
691 }
692
Benny Prijono5accbd02006-03-30 16:32:18 +0000693 /* Increment reference counter to prevent this key from being
694 * deleted
Benny Prijono8d317a02006-03-22 11:49:19 +0000695 */
Benny Prijono5accbd02006-03-30 16:32:18 +0000696 pj_atomic_inc(key->ref_count);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000697#else
698 PJ_UNUSED_ARG(has_lock);
Benny Prijono5accbd02006-03-30 16:32:18 +0000699#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000700
701 /* Carry out the callback */
Benny Prijono9033e312005-11-21 02:08:39 +0000702 switch (pOv->operation) {
703 case PJ_IOQUEUE_OP_READ:
704 case PJ_IOQUEUE_OP_RECV:
705 case PJ_IOQUEUE_OP_RECV_FROM:
706 pOv->operation = 0;
707 if (key->cb.on_read_complete)
708 key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv,
709 size_status);
710 break;
711 case PJ_IOQUEUE_OP_WRITE:
712 case PJ_IOQUEUE_OP_SEND:
713 case PJ_IOQUEUE_OP_SEND_TO:
714 pOv->operation = 0;
715 if (key->cb.on_write_complete)
716 key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv,
717 size_status);
718 break;
719#if PJ_HAS_TCP
720 case PJ_IOQUEUE_OP_ACCEPT:
721 /* special case for accept. */
Nanang Izzuddinb2c75292010-01-08 13:08:05 +0000722 ioqueue_on_accept_complete(key, (ioqueue_accept_rec*)pOv);
Benny Prijono9033e312005-11-21 02:08:39 +0000723 if (key->cb.on_accept_complete) {
724 ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv;
Benny Prijono9c025eb2006-07-10 21:35:27 +0000725 pj_status_t status = PJ_SUCCESS;
Benny Prijono25c8f932008-08-26 14:41:26 +0000726 pj_sock_t newsock;
Benny Prijono9c025eb2006-07-10 21:35:27 +0000727
Benny Prijono25c8f932008-08-26 14:41:26 +0000728 newsock = accept_rec->newsock;
729 accept_rec->newsock = PJ_INVALID_SOCKET;
730
731 if (newsock == PJ_INVALID_SOCKET) {
Benny Prijono9c025eb2006-07-10 21:35:27 +0000732 int dwError = WSAGetLastError();
733 if (dwError == 0) dwError = OSERR_ENOTCONN;
734 status = PJ_RETURN_OS_ERROR(dwError);
735 }
736
Benny Prijono25c8f932008-08-26 14:41:26 +0000737 key->cb.on_accept_complete(key, (pj_ioqueue_op_key_t*)pOv,
738 newsock, status);
739
Benny Prijono9033e312005-11-21 02:08:39 +0000740 }
741 break;
742 case PJ_IOQUEUE_OP_CONNECT:
743#endif
744 case PJ_IOQUEUE_OP_NONE:
745 pj_assert(0);
746 break;
747 }
Benny Prijono5accbd02006-03-30 16:32:18 +0000748
749#if PJ_IOQUEUE_HAS_SAFE_UNREG
750 decrement_counter(key);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000751 if (has_lock)
752 pj_mutex_unlock(key->mutex);
Benny Prijono5accbd02006-03-30 16:32:18 +0000753#endif
754
Benny Prijono8d317a02006-03-22 11:49:19 +0000755 return PJ_TRUE;
Benny Prijono9033e312005-11-21 02:08:39 +0000756 }
757
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000758 /* No event was queued. */
Benny Prijono8d317a02006-03-22 11:49:19 +0000759 return PJ_FALSE;
760}
761
762/*
763 * pj_ioqueue_unregister()
764 */
765PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
766{
Benny Prijono9fe1ad22007-02-17 00:30:24 +0000767 unsigned i;
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000768 pj_bool_t has_lock;
Benny Prijono9fe1ad22007-02-17 00:30:24 +0000769 enum { RETRY = 10 };
770
Benny Prijono8d317a02006-03-22 11:49:19 +0000771 PJ_ASSERT_RETURN(key, PJ_EINVAL);
772
773#if PJ_HAS_TCP
774 if (key->connecting) {
775 unsigned pos;
776 pj_ioqueue_t *ioqueue;
777
778 ioqueue = key->ioqueue;
779
780 /* Erase from connecting_handles */
781 pj_lock_acquire(ioqueue->lock);
782 for (pos=0; pos < ioqueue->connecting_count; ++pos) {
783 if (ioqueue->connecting_keys[pos] == key) {
784 erase_connecting_socket(ioqueue, pos);
785 break;
786 }
787 }
788 key->connecting = 0;
789 pj_lock_release(ioqueue->lock);
790 }
791#endif
Benny Prijono08beac62006-11-23 07:31:27 +0000792
793#if PJ_IOQUEUE_HAS_SAFE_UNREG
794 /* Mark key as closing before closing handle. */
795 key->closing = 1;
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000796
797 /* If concurrency is disabled, wait until the key has finished
798 * processing the callback
799 */
800 if (key->allow_concurrent == PJ_FALSE) {
801 pj_mutex_lock(key->mutex);
802 has_lock = PJ_TRUE;
803 } else {
804 has_lock = PJ_FALSE;
805 }
806#else
807 PJ_UNUSED_ARG(has_lock);
Benny Prijono08beac62006-11-23 07:31:27 +0000808#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000809
810 /* Close handle (the only way to disassociate handle from IOCP).
811 * We also need to close handle to make sure that no further events
812 * will come to the handle.
Benny Prijono8d317a02006-03-22 11:49:19 +0000813 */
Benny Prijono6d9ee8d2008-07-18 10:33:09 +0000814 /* Update 2008/07/18 (http://trac.pjsip.org/repos/ticket/575):
815 * - It seems that CloseHandle() in itself does not actually close
816 * the socket (i.e. it will still appear in "netstat" output). Also
817 * if we only use CloseHandle(), an "Invalid Handle" exception will
818 * be raised in WSACleanup().
819 * - MSDN documentation says that CloseHandle() must be called after
820 * closesocket() call (see
821 * http://msdn.microsoft.com/en-us/library/ms724211(VS.85).aspx).
822 * But turns out that this will raise "Invalid Handle" exception
823 * in debug mode.
824 * So because of this, we replaced CloseHandle() with closesocket()
825 * instead. These was tested on WinXP SP2.
826 */
827 //CloseHandle(key->hnd);
828 pj_sock_close((pj_sock_t)key->hnd);
Benny Prijono8d317a02006-03-22 11:49:19 +0000829
Benny Prijono5accbd02006-03-30 16:32:18 +0000830 /* Reset callbacks */
Benny Prijono8d317a02006-03-22 11:49:19 +0000831 key->cb.on_accept_complete = NULL;
832 key->cb.on_connect_complete = NULL;
Benny Prijono5accbd02006-03-30 16:32:18 +0000833 key->cb.on_read_complete = NULL;
834 key->cb.on_write_complete = NULL;
Benny Prijono8d317a02006-03-22 11:49:19 +0000835
Benny Prijono5accbd02006-03-30 16:32:18 +0000836#if PJ_IOQUEUE_HAS_SAFE_UNREG
Benny Prijono5accbd02006-03-30 16:32:18 +0000837 /* Even after handle is closed, I suspect that IOCP may still try to
838 * do something with the handle, causing memory corruption when pool
839 * debugging is enabled.
840 *
841 * Forcing context switch seems to have fixed that, but this is quite
842 * an ugly solution..
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000843 *
844 * Update 2008/02/13:
845 * This should not happen if concurrency is disallowed for the key.
846 * So at least application has a solution for this (i.e. by disallowing
847 * concurrency in the key).
Benny Prijono8d317a02006-03-22 11:49:19 +0000848 */
Benny Prijono9fe1ad22007-02-17 00:30:24 +0000849 //This will loop forever if unregistration is done on the callback.
850 //Doing this with RETRY I think should solve the IOCP setting the
851 //socket signalled, without causing the deadlock.
852 //while (pj_atomic_get(key->ref_count) != 1)
853 // pj_thread_sleep(0);
854 for (i=0; pj_atomic_get(key->ref_count) != 1 && i<RETRY; ++i)
Benny Prijono08beac62006-11-23 07:31:27 +0000855 pj_thread_sleep(0);
856
857 /* Decrement reference counter to destroy the key. */
858 decrement_counter(key);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000859
860 if (has_lock)
861 pj_mutex_unlock(key->mutex);
Benny Prijono5accbd02006-03-30 16:32:18 +0000862#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000863
864 return PJ_SUCCESS;
865}
866
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000867#if PJ_IOQUEUE_HAS_SAFE_UNREG
868/* Scan the closing list, and put pending closing keys to free list.
869 * Must do this with ioqueue mutex held.
870 */
871static void scan_closing_keys(pj_ioqueue_t *ioqueue)
872{
873 if (!pj_list_empty(&ioqueue->closing_list)) {
874 pj_time_val now;
875 pj_ioqueue_key_t *key;
876
Sauw Ming59d2c8c2011-03-16 09:22:24 +0000877 pj_gettickcount(&now);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000878
879 /* Move closing keys to free list when they've finished the closing
880 * idle time.
881 */
882 key = ioqueue->closing_list.next;
883 while (key != &ioqueue->closing_list) {
884 pj_ioqueue_key_t *next = key->next;
885
886 pj_assert(key->closing != 0);
887
888 if (PJ_TIME_VAL_GTE(now, key->free_time)) {
889 pj_list_erase(key);
890 pj_list_push_back(&ioqueue->free_list, key);
891 }
892 key = next;
893 }
894 }
895}
896#endif
897
Benny Prijono8d317a02006-03-22 11:49:19 +0000898/*
899 * pj_ioqueue_poll()
900 *
901 * Poll for events.
902 */
903PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
904{
905 DWORD dwMsec;
Benny Prijono3569c0d2007-04-06 10:29:20 +0000906#if PJ_HAS_TCP
Benny Prijono8d317a02006-03-22 11:49:19 +0000907 int connect_count = 0;
Benny Prijono3569c0d2007-04-06 10:29:20 +0000908#endif
Benny Prijono5accbd02006-03-30 16:32:18 +0000909 int event_count = 0;
Benny Prijono8d317a02006-03-22 11:49:19 +0000910
911 PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
912
Benny Prijono8d317a02006-03-22 11:49:19 +0000913 /* Calculate miliseconds timeout for GetQueuedCompletionStatus */
914 dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE;
915
916 /* Poll for completion status. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000917 event_count = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL);
918
919#if PJ_HAS_TCP
920 /* Check the connecting array, only when there's no activity. */
921 if (event_count == 0) {
922 connect_count = check_connecting(ioqueue);
923 if (connect_count > 0)
924 event_count += connect_count;
925 }
926#endif
927
928#if PJ_IOQUEUE_HAS_SAFE_UNREG
929 /* Check the closing keys only when there's no activity and when there are
930 * pending closing keys.
931 */
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000932 if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) {
Benny Prijono5accbd02006-03-30 16:32:18 +0000933 pj_lock_acquire(ioqueue->lock);
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000934 scan_closing_keys(ioqueue);
Benny Prijono5accbd02006-03-30 16:32:18 +0000935 pj_lock_release(ioqueue->lock);
936 }
937#endif
Benny Prijono8d317a02006-03-22 11:49:19 +0000938
939 /* Return number of events. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000940 return event_count;
Benny Prijono9033e312005-11-21 02:08:39 +0000941}
942
943/*
944 * pj_ioqueue_recv()
945 *
946 * Initiate overlapped WSARecv() operation.
947 */
948PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
949 pj_ioqueue_op_key_t *op_key,
950 void *buffer,
951 pj_ssize_t *length,
952 pj_uint32_t flags )
953{
954 /*
955 * Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and
956 * addrlen here. But unfortunately it generates EINVAL... :-(
957 * -bennylp
958 */
959 int rc;
960 DWORD bytesRead;
961 DWORD dwFlags = 0;
962 union operation_key *op_key_rec;
963
964 PJ_CHECK_STACK();
965 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
966
Benny Prijono5accbd02006-03-30 16:32:18 +0000967#if PJ_IOQUEUE_HAS_SAFE_UNREG
968 /* Check key is not closing */
969 if (key->closing)
970 return PJ_ECANCELLED;
971#endif
972
Benny Prijono9033e312005-11-21 02:08:39 +0000973 op_key_rec = (union operation_key*)op_key->internal__;
974 op_key_rec->overlapped.wsabuf.buf = buffer;
975 op_key_rec->overlapped.wsabuf.len = *length;
976
977 dwFlags = flags;
978
979 /* Try non-overlapped received first to see if data is
980 * immediately available.
981 */
982 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
983 rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
984 &bytesRead, &dwFlags, NULL, NULL);
985 if (rc == 0) {
986 *length = bytesRead;
987 return PJ_SUCCESS;
988 } else {
989 DWORD dwError = WSAGetLastError();
990 if (dwError != WSAEWOULDBLOCK) {
991 *length = -1;
992 return PJ_RETURN_OS_ERROR(dwError);
993 }
994 }
995 }
996
997 dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
998
999 /*
1000 * No immediate data available.
1001 * Register overlapped Recv() operation.
1002 */
Benny Prijonoac623b32006-07-03 15:19:31 +00001003 pj_bzero( &op_key_rec->overlapped.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +00001004 sizeof(op_key_rec->overlapped.overlapped));
1005 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
1006
1007 rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1008 &bytesRead, &dwFlags,
1009 &op_key_rec->overlapped.overlapped, NULL);
1010 if (rc == SOCKET_ERROR) {
1011 DWORD dwStatus = WSAGetLastError();
1012 if (dwStatus!=WSA_IO_PENDING) {
1013 *length = -1;
1014 return PJ_STATUS_FROM_OS(dwStatus);
1015 }
1016 }
1017
1018 /* Pending operation has been scheduled. */
1019 return PJ_EPENDING;
1020}
1021
1022/*
1023 * pj_ioqueue_recvfrom()
1024 *
1025 * Initiate overlapped RecvFrom() operation.
1026 */
1027PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
1028 pj_ioqueue_op_key_t *op_key,
1029 void *buffer,
1030 pj_ssize_t *length,
1031 pj_uint32_t flags,
1032 pj_sockaddr_t *addr,
1033 int *addrlen)
1034{
1035 int rc;
1036 DWORD bytesRead;
1037 DWORD dwFlags = 0;
1038 union operation_key *op_key_rec;
1039
1040 PJ_CHECK_STACK();
1041 PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL);
1042
Benny Prijono5accbd02006-03-30 16:32:18 +00001043#if PJ_IOQUEUE_HAS_SAFE_UNREG
1044 /* Check key is not closing */
1045 if (key->closing)
1046 return PJ_ECANCELLED;
1047#endif
1048
Benny Prijono9033e312005-11-21 02:08:39 +00001049 op_key_rec = (union operation_key*)op_key->internal__;
1050 op_key_rec->overlapped.wsabuf.buf = buffer;
1051 op_key_rec->overlapped.wsabuf.len = *length;
1052
1053 dwFlags = flags;
1054
1055 /* Try non-overlapped received first to see if data is
1056 * immediately available.
1057 */
1058 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
1059 rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1060 &bytesRead, &dwFlags, addr, addrlen, NULL, NULL);
1061 if (rc == 0) {
1062 *length = bytesRead;
1063 return PJ_SUCCESS;
1064 } else {
1065 DWORD dwError = WSAGetLastError();
1066 if (dwError != WSAEWOULDBLOCK) {
1067 *length = -1;
1068 return PJ_RETURN_OS_ERROR(dwError);
1069 }
1070 }
1071 }
1072
1073 dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
1074
1075 /*
1076 * No immediate data available.
1077 * Register overlapped Recv() operation.
1078 */
Benny Prijonoac623b32006-07-03 15:19:31 +00001079 pj_bzero( &op_key_rec->overlapped.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +00001080 sizeof(op_key_rec->overlapped.overlapped));
1081 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
1082
1083 rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1084 &bytesRead, &dwFlags, addr, addrlen,
1085 &op_key_rec->overlapped.overlapped, NULL);
1086 if (rc == SOCKET_ERROR) {
1087 DWORD dwStatus = WSAGetLastError();
1088 if (dwStatus!=WSA_IO_PENDING) {
1089 *length = -1;
1090 return PJ_STATUS_FROM_OS(dwStatus);
1091 }
1092 }
1093
1094 /* Pending operation has been scheduled. */
1095 return PJ_EPENDING;
1096}
1097
1098/*
1099 * pj_ioqueue_send()
1100 *
1101 * Initiate overlapped Send operation.
1102 */
1103PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
1104 pj_ioqueue_op_key_t *op_key,
1105 const void *data,
1106 pj_ssize_t *length,
1107 pj_uint32_t flags )
1108{
1109 return pj_ioqueue_sendto(key, op_key, data, length, flags, NULL, 0);
1110}
1111
1112
1113/*
1114 * pj_ioqueue_sendto()
1115 *
1116 * Initiate overlapped SendTo operation.
1117 */
1118PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
1119 pj_ioqueue_op_key_t *op_key,
1120 const void *data,
1121 pj_ssize_t *length,
1122 pj_uint32_t flags,
1123 const pj_sockaddr_t *addr,
1124 int addrlen)
1125{
1126 int rc;
1127 DWORD bytesWritten;
1128 DWORD dwFlags;
1129 union operation_key *op_key_rec;
1130
1131 PJ_CHECK_STACK();
1132 PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL);
Benny Prijono5accbd02006-03-30 16:32:18 +00001133
1134#if PJ_IOQUEUE_HAS_SAFE_UNREG
1135 /* Check key is not closing */
1136 if (key->closing)
1137 return PJ_ECANCELLED;
1138#endif
1139
Benny Prijono9033e312005-11-21 02:08:39 +00001140 op_key_rec = (union operation_key*)op_key->internal__;
1141
1142 /*
1143 * First try blocking write.
1144 */
1145 op_key_rec->overlapped.wsabuf.buf = (void*)data;
1146 op_key_rec->overlapped.wsabuf.len = *length;
1147
1148 dwFlags = flags;
1149
1150 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
1151 rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1152 &bytesWritten, dwFlags, addr, addrlen,
1153 NULL, NULL);
1154 if (rc == 0) {
1155 *length = bytesWritten;
1156 return PJ_SUCCESS;
1157 } else {
1158 DWORD dwStatus = WSAGetLastError();
1159 if (dwStatus != WSAEWOULDBLOCK) {
1160 *length = -1;
1161 return PJ_RETURN_OS_ERROR(dwStatus);
1162 }
1163 }
1164 }
1165
1166 dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
1167
1168 /*
1169 * Data can't be sent immediately.
1170 * Schedule asynchronous WSASend().
1171 */
Benny Prijonoac623b32006-07-03 15:19:31 +00001172 pj_bzero( &op_key_rec->overlapped.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +00001173 sizeof(op_key_rec->overlapped.overlapped));
1174 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND;
1175
1176 rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
1177 &bytesWritten, dwFlags, addr, addrlen,
1178 &op_key_rec->overlapped.overlapped, NULL);
1179 if (rc == SOCKET_ERROR) {
1180 DWORD dwStatus = WSAGetLastError();
1181 if (dwStatus!=WSA_IO_PENDING)
1182 return PJ_STATUS_FROM_OS(dwStatus);
1183 }
1184
1185 /* Asynchronous operation successfully submitted. */
1186 return PJ_EPENDING;
1187}
1188
1189#if PJ_HAS_TCP
1190
1191/*
1192 * pj_ioqueue_accept()
1193 *
1194 * Initiate overlapped accept() operation.
1195 */
1196PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
1197 pj_ioqueue_op_key_t *op_key,
1198 pj_sock_t *new_sock,
1199 pj_sockaddr_t *local,
1200 pj_sockaddr_t *remote,
1201 int *addrlen)
1202{
1203 BOOL rc;
1204 DWORD bytesReceived;
1205 pj_status_t status;
1206 union operation_key *op_key_rec;
1207 SOCKET sock;
1208
1209 PJ_CHECK_STACK();
1210 PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
1211
Benny Prijono5accbd02006-03-30 16:32:18 +00001212#if PJ_IOQUEUE_HAS_SAFE_UNREG
1213 /* Check key is not closing */
1214 if (key->closing)
1215 return PJ_ECANCELLED;
1216#endif
1217
Benny Prijono9033e312005-11-21 02:08:39 +00001218 /*
1219 * See if there is a new connection immediately available.
1220 */
1221 sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0);
1222 if (sock != INVALID_SOCKET) {
1223 /* Yes! New socket is available! */
Benny Prijono948d4f82009-10-15 04:04:45 +00001224 if (local && addrlen) {
1225 int status;
Benny Prijono9033e312005-11-21 02:08:39 +00001226
Nanang Izzuddinb2c75292010-01-08 13:08:05 +00001227 /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket
1228 * addresses can be obtained with getsockname() and getpeername().
1229 */
1230 status = setsockopt(sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
1231 (char*)&key->hnd, sizeof(SOCKET));
1232 /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later.
1233 * So ignore the error status.
1234 */
1235
Benny Prijono948d4f82009-10-15 04:04:45 +00001236 status = getsockname(sock, local, addrlen);
1237 if (status != 0) {
1238 DWORD dwError = WSAGetLastError();
1239 closesocket(sock);
1240 return PJ_RETURN_OS_ERROR(dwError);
1241 }
1242 }
Benny Prijono9033e312005-11-21 02:08:39 +00001243
1244 *new_sock = sock;
1245 return PJ_SUCCESS;
1246
1247 } else {
1248 DWORD dwError = WSAGetLastError();
1249 if (dwError != WSAEWOULDBLOCK) {
1250 return PJ_RETURN_OS_ERROR(dwError);
1251 }
1252 }
1253
1254 /*
1255 * No connection is immediately available.
1256 * Must schedule an asynchronous operation.
1257 */
1258 op_key_rec = (union operation_key*)op_key->internal__;
1259
Benny Prijono8ab968f2007-07-20 08:08:30 +00001260 status = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0,
Benny Prijono9033e312005-11-21 02:08:39 +00001261 &op_key_rec->accept.newsock);
1262 if (status != PJ_SUCCESS)
1263 return status;
1264
Benny Prijono9033e312005-11-21 02:08:39 +00001265 op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT;
1266 op_key_rec->accept.addrlen = addrlen;
1267 op_key_rec->accept.local = local;
1268 op_key_rec->accept.remote = remote;
1269 op_key_rec->accept.newsock_ptr = new_sock;
Benny Prijonoac623b32006-07-03 15:19:31 +00001270 pj_bzero( &op_key_rec->accept.overlapped,
Benny Prijono9033e312005-11-21 02:08:39 +00001271 sizeof(op_key_rec->accept.overlapped));
1272
1273 rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)op_key_rec->accept.newsock,
1274 op_key_rec->accept.accept_buf,
1275 0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN,
1276 &bytesReceived,
1277 &op_key_rec->accept.overlapped );
1278
1279 if (rc == TRUE) {
Nanang Izzuddinb2c75292010-01-08 13:08:05 +00001280 ioqueue_on_accept_complete(key, &op_key_rec->accept);
Benny Prijono9033e312005-11-21 02:08:39 +00001281 return PJ_SUCCESS;
1282 } else {
1283 DWORD dwStatus = WSAGetLastError();
1284 if (dwStatus!=WSA_IO_PENDING)
1285 return PJ_STATUS_FROM_OS(dwStatus);
1286 }
1287
1288 /* Asynchronous Accept() has been submitted. */
1289 return PJ_EPENDING;
1290}
1291
1292
1293/*
1294 * pj_ioqueue_connect()
1295 *
1296 * Initiate overlapped connect() operation (well, it's non-blocking actually,
1297 * since there's no overlapped version of connect()).
1298 */
1299PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
1300 const pj_sockaddr_t *addr,
1301 int addrlen )
1302{
1303 HANDLE hEvent;
1304 pj_ioqueue_t *ioqueue;
1305
1306 PJ_CHECK_STACK();
1307 PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
1308
Benny Prijono5accbd02006-03-30 16:32:18 +00001309#if PJ_IOQUEUE_HAS_SAFE_UNREG
1310 /* Check key is not closing */
1311 if (key->closing)
1312 return PJ_ECANCELLED;
1313#endif
1314
Benny Prijono9033e312005-11-21 02:08:39 +00001315 /* Initiate connect() */
1316 if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) {
1317 DWORD dwStatus;
1318 dwStatus = WSAGetLastError();
1319 if (dwStatus != WSAEWOULDBLOCK) {
1320 return PJ_RETURN_OS_ERROR(dwStatus);
1321 }
1322 } else {
1323 /* Connect has completed immediately! */
1324 return PJ_SUCCESS;
1325 }
1326
1327 ioqueue = key->ioqueue;
1328
1329 /* Add to the array of connecting socket to be polled */
1330 pj_lock_acquire(ioqueue->lock);
1331
1332 if (ioqueue->connecting_count >= MAXIMUM_WAIT_OBJECTS) {
1333 pj_lock_release(ioqueue->lock);
1334 return PJ_ETOOMANYCONN;
1335 }
1336
1337 /* Get or create event object. */
1338 if (ioqueue->event_count) {
1339 hEvent = ioqueue->event_pool[ioqueue->event_count - 1];
1340 --ioqueue->event_count;
1341 } else {
1342 hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
1343 if (hEvent == NULL) {
1344 DWORD dwStatus = GetLastError();
1345 pj_lock_release(ioqueue->lock);
1346 return PJ_STATUS_FROM_OS(dwStatus);
1347 }
1348 }
1349
1350 /* Mark key as connecting.
1351 * We can't use array index since key can be removed dynamically.
1352 */
1353 key->connecting = 1;
1354
1355 /* Associate socket events to the event object. */
1356 if (WSAEventSelect((pj_sock_t)key->hnd, hEvent, FD_CONNECT) != 0) {
1357 CloseHandle(hEvent);
1358 pj_lock_release(ioqueue->lock);
1359 return PJ_RETURN_OS_ERROR(WSAGetLastError());
1360 }
1361
1362 /* Add to array. */
1363 ioqueue->connecting_keys[ ioqueue->connecting_count ] = key;
1364 ioqueue->connecting_handles[ ioqueue->connecting_count ] = hEvent;
1365 ioqueue->connecting_count++;
1366
1367 pj_lock_release(ioqueue->lock);
1368
1369 return PJ_EPENDING;
1370}
1371#endif /* #if PJ_HAS_TCP */
1372
1373
1374PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
1375 pj_size_t size )
1376{
Benny Prijonoac623b32006-07-03 15:19:31 +00001377 pj_bzero(op_key, size);
Benny Prijono9033e312005-11-21 02:08:39 +00001378}
1379
1380PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
1381 pj_ioqueue_op_key_t *op_key )
1382{
1383 BOOL rc;
1384 DWORD bytesTransfered;
1385
1386 rc = GetOverlappedResult( key->hnd, (LPOVERLAPPED)op_key,
1387 &bytesTransfered, FALSE );
1388
1389 if (rc == FALSE) {
1390 return GetLastError()==ERROR_IO_INCOMPLETE;
1391 }
1392
1393 return FALSE;
1394}
1395
1396
1397PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
1398 pj_ioqueue_op_key_t *op_key,
1399 pj_ssize_t bytes_status )
1400{
1401 BOOL rc;
1402
1403 rc = PostQueuedCompletionStatus(key->ioqueue->iocp, bytes_status,
1404 (long)key, (OVERLAPPED*)op_key );
1405 if (rc == FALSE) {
1406 return PJ_RETURN_OS_ERROR(GetLastError());
1407 }
1408
1409 return PJ_SUCCESS;
1410}
1411
Benny Prijonoe3f79fd2008-02-13 15:17:28 +00001412PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
1413 pj_bool_t allow)
1414{
1415 PJ_ASSERT_RETURN(key, PJ_EINVAL);
1416
1417 /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
1418 * disabled.
1419 */
1420 PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);
1421
1422 key->allow_concurrent = allow;
1423 return PJ_SUCCESS;
1424}
1425
1426PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
1427{
1428#if PJ_IOQUEUE_HAS_SAFE_UNREG
1429 return pj_mutex_lock(key->mutex);
1430#else
1431 PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP);
1432#endif
1433}
1434
1435PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
1436{
1437#if PJ_IOQUEUE_HAS_SAFE_UNREG
1438 return pj_mutex_unlock(key->mutex);
1439#else
1440 PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP);
1441#endif
1442}
1443