blob: 79e46183479175086adfa219923376d2c79244dc [file] [log] [blame]
Benny Prijono9033e312005-11-21 02:08:39 +00001/* $Id$ */
2/*
3 * Copyright (C)2003-2006 Benny Prijono <benny@prijono.org>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 */
19#include <pj/ioqueue.h>
20#include <pj/os.h>
21#include <pj/lock.h>
22#include <pj/pool.h>
23#include <pj/string.h>
24#include <pj/sock.h>
25#include <pj/array.h>
26#include <pj/log.h>
27#include <pj/assert.h>
28#include <pj/errno.h>
29
30
31#if defined(PJ_HAS_WINSOCK2_H) && PJ_HAS_WINSOCK2_H != 0
32# include <winsock2.h>
33#elif defined(PJ_HAS_WINSOCK_H) && PJ_HAS_WINSOCK_H != 0
34# include <winsock.h>
35#endif
36
37#if defined(PJ_HAS_MSWSOCK_H) && PJ_HAS_MSWSOCK_H != 0
38# include <mswsock.h>
39#endif
40
41
42/* The address specified in AcceptEx() must be 16 more than the size of
43 * SOCKADDR (source: MSDN).
44 */
45#define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in)+16)
46
47typedef struct generic_overlapped
48{
49 WSAOVERLAPPED overlapped;
50 pj_ioqueue_operation_e operation;
51} generic_overlapped;
52
53/*
54 * OVERLAPPPED structure for send and receive.
55 */
56typedef struct ioqueue_overlapped
57{
58 WSAOVERLAPPED overlapped;
59 pj_ioqueue_operation_e operation;
60 WSABUF wsabuf;
61 pj_sockaddr_in dummy_addr;
62 int dummy_addrlen;
63} ioqueue_overlapped;
64
65#if PJ_HAS_TCP
66/*
67 * OVERLAP structure for accept.
68 */
69typedef struct ioqueue_accept_rec
70{
71 WSAOVERLAPPED overlapped;
72 pj_ioqueue_operation_e operation;
73 pj_sock_t newsock;
74 pj_sock_t *newsock_ptr;
75 int *addrlen;
76 void *remote;
77 void *local;
78 char accept_buf[2 * ACCEPT_ADDR_LEN];
79} ioqueue_accept_rec;
80#endif
81
82/*
83 * Structure to hold pending operation key.
84 */
85union operation_key
86{
87 generic_overlapped generic;
88 ioqueue_overlapped overlapped;
89#if PJ_HAS_TCP
90 ioqueue_accept_rec accept;
91#endif
92};
93
94/* Type of handle in the key. */
95enum handle_type
96{
97 HND_IS_UNKNOWN,
98 HND_IS_FILE,
99 HND_IS_SOCKET,
100};
101
102/*
103 * Structure for individual socket.
104 */
105struct pj_ioqueue_key_t
106{
107 pj_ioqueue_t *ioqueue;
108 HANDLE hnd;
109 void *user_data;
110 enum handle_type hnd_type;
111#if PJ_HAS_TCP
112 int connecting;
113#endif
114 pj_ioqueue_callback cb;
115};
116
117/*
118 * IO Queue structure.
119 */
120struct pj_ioqueue_t
121{
122 HANDLE iocp;
123 pj_lock_t *lock;
124 pj_bool_t auto_delete_lock;
125 unsigned event_count;
126 HANDLE event_pool[MAXIMUM_WAIT_OBJECTS+1];
127#if PJ_HAS_TCP
128 unsigned connecting_count;
129 HANDLE connecting_handles[MAXIMUM_WAIT_OBJECTS+1];
130 pj_ioqueue_key_t *connecting_keys[MAXIMUM_WAIT_OBJECTS+1];
131#endif
132};
133
134
135#if PJ_HAS_TCP
136/*
137 * Process the socket when the overlapped accept() completed.
138 */
139static void ioqueue_on_accept_complete(ioqueue_accept_rec *accept_overlapped)
140{
141 struct sockaddr *local;
142 struct sockaddr *remote;
143 int locallen, remotelen;
144
145 PJ_CHECK_STACK();
146
147 /* Operation complete immediately. */
148 GetAcceptExSockaddrs( accept_overlapped->accept_buf,
149 0,
150 ACCEPT_ADDR_LEN,
151 ACCEPT_ADDR_LEN,
152 &local,
153 &locallen,
154 &remote,
155 &remotelen);
156 if (*accept_overlapped->addrlen > locallen) {
157 pj_memcpy(accept_overlapped->local, local, locallen);
158 pj_memcpy(accept_overlapped->remote, remote, locallen);
159 } else {
160 pj_memset(accept_overlapped->local, 0, *accept_overlapped->addrlen);
161 pj_memset(accept_overlapped->remote, 0, *accept_overlapped->addrlen);
162 }
163 *accept_overlapped->addrlen = locallen;
164 if (accept_overlapped->newsock_ptr)
165 *accept_overlapped->newsock_ptr = accept_overlapped->newsock;
166 accept_overlapped->operation = 0;
167 accept_overlapped->newsock = PJ_INVALID_SOCKET;
168}
169
170static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos)
171{
172 pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos];
173 HANDLE hEvent = ioqueue->connecting_handles[pos];
174
175 /* Remove key from array of connecting handles. */
176 pj_array_erase(ioqueue->connecting_keys, sizeof(key),
177 ioqueue->connecting_count, pos);
178 pj_array_erase(ioqueue->connecting_handles, sizeof(HANDLE),
179 ioqueue->connecting_count, pos);
180 --ioqueue->connecting_count;
181
182 /* Disassociate the socket from the event. */
183 WSAEventSelect((pj_sock_t)key->hnd, hEvent, 0);
184
185 /* Put event object to pool. */
186 if (ioqueue->event_count < MAXIMUM_WAIT_OBJECTS) {
187 ioqueue->event_pool[ioqueue->event_count++] = hEvent;
188 } else {
189 /* Shouldn't happen. There should be no more pending connections
190 * than max.
191 */
192 pj_assert(0);
193 CloseHandle(hEvent);
194 }
195
196}
197
198/*
199 * Poll for the completion of non-blocking connect().
200 * If there's a completion, the function return the key of the completed
201 * socket, and 'result' argument contains the connect() result. If connect()
202 * succeeded, 'result' will have value zero, otherwise will have the error
203 * code.
204 */
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000205static int check_connecting( pj_ioqueue_t *ioqueue )
Benny Prijono9033e312005-11-21 02:08:39 +0000206{
Benny Prijono9033e312005-11-21 02:08:39 +0000207 if (ioqueue->connecting_count) {
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000208 int i, count;
209 struct
210 {
211 pj_ioqueue_key_t *key;
212 pj_status_t status;
213 } events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1];
Benny Prijono9033e312005-11-21 02:08:39 +0000214
215 pj_lock_acquire(ioqueue->lock);
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000216 for (count=0; count<PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1; ++count) {
217 DWORD result;
Benny Prijono9033e312005-11-21 02:08:39 +0000218
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000219 result = WaitForMultipleObjects(ioqueue->connecting_count,
220 ioqueue->connecting_handles,
221 FALSE, 0);
222 if (result >= WAIT_OBJECT_0 &&
223 result < WAIT_OBJECT_0+ioqueue->connecting_count)
224 {
225 WSANETWORKEVENTS net_events;
Benny Prijono9033e312005-11-21 02:08:39 +0000226
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000227 /* Got completed connect(). */
228 unsigned pos = result - WAIT_OBJECT_0;
229 events[count].key = ioqueue->connecting_keys[pos];
Benny Prijono9033e312005-11-21 02:08:39 +0000230
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000231 /* See whether connect has succeeded. */
232 WSAEnumNetworkEvents((pj_sock_t)events[count].key->hnd,
233 ioqueue->connecting_handles[pos],
234 &net_events);
235 events[count].status =
236 PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]);
237
238 /* Erase socket from pending connect. */
239 erase_connecting_socket(ioqueue, pos);
240 } else {
241 /* No more events */
242 break;
243 }
Benny Prijono9033e312005-11-21 02:08:39 +0000244 }
245 pj_lock_release(ioqueue->lock);
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000246
247 /* Call callbacks. */
248 for (i=0; i<count; ++i) {
249 if (events[i].key->cb.on_connect_complete) {
250 events[i].key->cb.on_connect_complete(events[i].key,
251 events[i].status);
252 }
253 }
254
255 return count;
Benny Prijono9033e312005-11-21 02:08:39 +0000256 }
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000257
258 return 0;
259
Benny Prijono9033e312005-11-21 02:08:39 +0000260}
261#endif
262
263/*
264 * pj_ioqueue_name()
265 */
266PJ_DEF(const char*) pj_ioqueue_name(void)
267{
268 return "iocp";
269}
270
271/*
272 * pj_ioqueue_create()
273 */
274PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
275 pj_size_t max_fd,
276 pj_ioqueue_t **p_ioqueue)
277{
278 pj_ioqueue_t *ioqueue;
279 pj_status_t rc;
280
281 PJ_UNUSED_ARG(max_fd);
282 PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL);
283
284 rc = sizeof(union operation_key);
285
286 /* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */
287 PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
288 sizeof(union operation_key), PJ_EBUG);
289
290 ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue));
291 ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
292 if (ioqueue->iocp == NULL)
293 return PJ_RETURN_OS_ERROR(GetLastError());
294
295 rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock);
296 if (rc != PJ_SUCCESS) {
297 CloseHandle(ioqueue->iocp);
298 return rc;
299 }
300
301 ioqueue->auto_delete_lock = PJ_TRUE;
302
303 *p_ioqueue = ioqueue;
304
305 PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue));
306 return PJ_SUCCESS;
307}
308
309/*
310 * pj_ioqueue_destroy()
311 */
312PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue )
313{
314 unsigned i;
315
316 PJ_CHECK_STACK();
317 PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
318
319 /* Destroy events in the pool */
320 for (i=0; i<ioqueue->event_count; ++i) {
321 CloseHandle(ioqueue->event_pool[i]);
322 }
323 ioqueue->event_count = 0;
324
325 if (CloseHandle(ioqueue->iocp) != TRUE)
326 return PJ_RETURN_OS_ERROR(GetLastError());
327
328 if (ioqueue->auto_delete_lock)
329 pj_lock_destroy(ioqueue->lock);
330
331 return PJ_SUCCESS;
332}
333
334/*
335 * pj_ioqueue_set_lock()
336 */
337PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
338 pj_lock_t *lock,
339 pj_bool_t auto_delete )
340{
341 PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
342
343 if (ioqueue->auto_delete_lock) {
344 pj_lock_destroy(ioqueue->lock);
345 }
346
347 ioqueue->lock = lock;
348 ioqueue->auto_delete_lock = auto_delete;
349
350 return PJ_SUCCESS;
351}
352
353/*
354 * pj_ioqueue_register_sock()
355 */
356PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
357 pj_ioqueue_t *ioqueue,
358 pj_sock_t sock,
359 void *user_data,
360 const pj_ioqueue_callback *cb,
361 pj_ioqueue_key_t **key )
362{
363 HANDLE hioq;
364 pj_ioqueue_key_t *rec;
365 u_long value;
366 int rc;
367
368 PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL);
369
370 /* Build the key for this socket. */
371 rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
372 rec->ioqueue = ioqueue;
373 rec->hnd = (HANDLE)sock;
374 rec->hnd_type = HND_IS_SOCKET;
375 rec->user_data = user_data;
376 pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback));
377
378 /* Set socket to nonblocking. */
379 value = 1;
380 rc = ioctlsocket(sock, FIONBIO, &value);
381 if (rc != 0) {
382 return PJ_RETURN_OS_ERROR(WSAGetLastError());
383 }
384
385 /* Associate with IOCP */
386 hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0);
387 if (!hioq) {
388 return PJ_RETURN_OS_ERROR(GetLastError());
389 }
390
391 *key = rec;
392 return PJ_SUCCESS;
393}
394
395/*
396 * pj_ioqueue_unregister()
397 */
398PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
399{
400 PJ_ASSERT_RETURN(key, PJ_EINVAL);
401
402#if PJ_HAS_TCP
403 if (key->connecting) {
404 unsigned pos;
405 pj_ioqueue_t *ioqueue;
406
407 ioqueue = key->ioqueue;
408
409 /* Erase from connecting_handles */
410 pj_lock_acquire(ioqueue->lock);
411 for (pos=0; pos < ioqueue->connecting_count; ++pos) {
412 if (ioqueue->connecting_keys[pos] == key) {
413 erase_connecting_socket(ioqueue, pos);
414 break;
415 }
416 }
417 key->connecting = 0;
418 pj_lock_release(ioqueue->lock);
419 }
420#endif
421 if (key->hnd_type == HND_IS_FILE) {
422 CloseHandle(key->hnd);
423 }
424 return PJ_SUCCESS;
425}
426
427/*
428 * pj_ioqueue_get_user_data()
429 */
430PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
431{
432 PJ_ASSERT_RETURN(key, NULL);
433 return key->user_data;
434}
435
436/*
437 * pj_ioqueue_set_user_data()
438 */
439PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
440 void *user_data,
441 void **old_data )
442{
443 PJ_ASSERT_RETURN(key, PJ_EINVAL);
444
445 if (old_data)
446 *old_data = key->user_data;
447
448 key->user_data = user_data;
449 return PJ_SUCCESS;
450}
451
452/*
453 * pj_ioqueue_poll()
454 *
455 * Poll for events.
456 */
457PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
458{
459 DWORD dwMsec, dwBytesTransfered, dwKey;
460 generic_overlapped *pOv;
461 pj_ioqueue_key_t *key;
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000462 int connect_count;
Benny Prijono4f2be312005-11-21 17:01:06 +0000463 pj_ssize_t size_status = -1;
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000464 BOOL rcGetQueued;;
Benny Prijono9033e312005-11-21 02:08:39 +0000465
466 PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
467
468 /* Check the connecting array. */
469#if PJ_HAS_TCP
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000470 connect_count = check_connecting(ioqueue);
Benny Prijono9033e312005-11-21 02:08:39 +0000471#endif
472
473 /* Calculate miliseconds timeout for GetQueuedCompletionStatus */
474 dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE;
475
476 /* Poll for completion status. */
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000477 rcGetQueued = GetQueuedCompletionStatus(ioqueue->iocp, &dwBytesTransfered,
478 &dwKey, (OVERLAPPED**)&pOv,
479 dwMsec);
Benny Prijono9033e312005-11-21 02:08:39 +0000480
481 /* The return value is:
482 * - nonzero if event was dequeued.
483 * - zero and pOv==NULL if no event was dequeued.
484 * - zero and pOv!=NULL if event for failed I/O was dequeued.
485 */
486 if (pOv) {
487 /* Event was dequeued for either successfull or failed I/O */
488 key = (pj_ioqueue_key_t*)dwKey;
489 size_status = dwBytesTransfered;
490 switch (pOv->operation) {
491 case PJ_IOQUEUE_OP_READ:
492 case PJ_IOQUEUE_OP_RECV:
493 case PJ_IOQUEUE_OP_RECV_FROM:
494 pOv->operation = 0;
495 if (key->cb.on_read_complete)
496 key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv,
497 size_status);
498 break;
499 case PJ_IOQUEUE_OP_WRITE:
500 case PJ_IOQUEUE_OP_SEND:
501 case PJ_IOQUEUE_OP_SEND_TO:
502 pOv->operation = 0;
503 if (key->cb.on_write_complete)
504 key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv,
505 size_status);
506 break;
507#if PJ_HAS_TCP
508 case PJ_IOQUEUE_OP_ACCEPT:
509 /* special case for accept. */
510 ioqueue_on_accept_complete((ioqueue_accept_rec*)pOv);
511 if (key->cb.on_accept_complete) {
512 ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv;
513 key->cb.on_accept_complete(key,
514 (pj_ioqueue_op_key_t*)pOv,
515 accept_rec->newsock,
516 PJ_SUCCESS);
517 }
518 break;
519 case PJ_IOQUEUE_OP_CONNECT:
520#endif
521 case PJ_IOQUEUE_OP_NONE:
522 pj_assert(0);
523 break;
524 }
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000525 return connect_count+1;
Benny Prijono9033e312005-11-21 02:08:39 +0000526 }
527
Benny Prijonoe986c8c2005-11-25 10:54:54 +0000528 /* No event was queued. */
529 return connect_count;
Benny Prijono9033e312005-11-21 02:08:39 +0000530}
531
532/*
533 * pj_ioqueue_recv()
534 *
535 * Initiate overlapped WSARecv() operation.
536 */
537PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
538 pj_ioqueue_op_key_t *op_key,
539 void *buffer,
540 pj_ssize_t *length,
541 pj_uint32_t flags )
542{
543 /*
544 * Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and
545 * addrlen here. But unfortunately it generates EINVAL... :-(
546 * -bennylp
547 */
548 int rc;
549 DWORD bytesRead;
550 DWORD dwFlags = 0;
551 union operation_key *op_key_rec;
552
553 PJ_CHECK_STACK();
554 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
555
556 op_key_rec = (union operation_key*)op_key->internal__;
557 op_key_rec->overlapped.wsabuf.buf = buffer;
558 op_key_rec->overlapped.wsabuf.len = *length;
559
560 dwFlags = flags;
561
562 /* Try non-overlapped received first to see if data is
563 * immediately available.
564 */
565 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
566 rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
567 &bytesRead, &dwFlags, NULL, NULL);
568 if (rc == 0) {
569 *length = bytesRead;
570 return PJ_SUCCESS;
571 } else {
572 DWORD dwError = WSAGetLastError();
573 if (dwError != WSAEWOULDBLOCK) {
574 *length = -1;
575 return PJ_RETURN_OS_ERROR(dwError);
576 }
577 }
578 }
579
580 dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
581
582 /*
583 * No immediate data available.
584 * Register overlapped Recv() operation.
585 */
586 pj_memset(&op_key_rec->overlapped.overlapped, 0,
587 sizeof(op_key_rec->overlapped.overlapped));
588 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
589
590 rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
591 &bytesRead, &dwFlags,
592 &op_key_rec->overlapped.overlapped, NULL);
593 if (rc == SOCKET_ERROR) {
594 DWORD dwStatus = WSAGetLastError();
595 if (dwStatus!=WSA_IO_PENDING) {
596 *length = -1;
597 return PJ_STATUS_FROM_OS(dwStatus);
598 }
599 }
600
601 /* Pending operation has been scheduled. */
602 return PJ_EPENDING;
603}
604
605/*
606 * pj_ioqueue_recvfrom()
607 *
608 * Initiate overlapped RecvFrom() operation.
609 */
610PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
611 pj_ioqueue_op_key_t *op_key,
612 void *buffer,
613 pj_ssize_t *length,
614 pj_uint32_t flags,
615 pj_sockaddr_t *addr,
616 int *addrlen)
617{
618 int rc;
619 DWORD bytesRead;
620 DWORD dwFlags = 0;
621 union operation_key *op_key_rec;
622
623 PJ_CHECK_STACK();
624 PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL);
625
626 op_key_rec = (union operation_key*)op_key->internal__;
627 op_key_rec->overlapped.wsabuf.buf = buffer;
628 op_key_rec->overlapped.wsabuf.len = *length;
629
630 dwFlags = flags;
631
632 /* Try non-overlapped received first to see if data is
633 * immediately available.
634 */
635 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
636 rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
637 &bytesRead, &dwFlags, addr, addrlen, NULL, NULL);
638 if (rc == 0) {
639 *length = bytesRead;
640 return PJ_SUCCESS;
641 } else {
642 DWORD dwError = WSAGetLastError();
643 if (dwError != WSAEWOULDBLOCK) {
644 *length = -1;
645 return PJ_RETURN_OS_ERROR(dwError);
646 }
647 }
648 }
649
650 dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
651
652 /*
653 * No immediate data available.
654 * Register overlapped Recv() operation.
655 */
656 pj_memset(&op_key_rec->overlapped.overlapped, 0,
657 sizeof(op_key_rec->overlapped.overlapped));
658 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
659
660 rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
661 &bytesRead, &dwFlags, addr, addrlen,
662 &op_key_rec->overlapped.overlapped, NULL);
663 if (rc == SOCKET_ERROR) {
664 DWORD dwStatus = WSAGetLastError();
665 if (dwStatus!=WSA_IO_PENDING) {
666 *length = -1;
667 return PJ_STATUS_FROM_OS(dwStatus);
668 }
669 }
670
671 /* Pending operation has been scheduled. */
672 return PJ_EPENDING;
673}
674
675/*
676 * pj_ioqueue_send()
677 *
678 * Initiate overlapped Send operation.
679 */
680PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
681 pj_ioqueue_op_key_t *op_key,
682 const void *data,
683 pj_ssize_t *length,
684 pj_uint32_t flags )
685{
686 return pj_ioqueue_sendto(key, op_key, data, length, flags, NULL, 0);
687}
688
689
690/*
691 * pj_ioqueue_sendto()
692 *
693 * Initiate overlapped SendTo operation.
694 */
695PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
696 pj_ioqueue_op_key_t *op_key,
697 const void *data,
698 pj_ssize_t *length,
699 pj_uint32_t flags,
700 const pj_sockaddr_t *addr,
701 int addrlen)
702{
703 int rc;
704 DWORD bytesWritten;
705 DWORD dwFlags;
706 union operation_key *op_key_rec;
707
708 PJ_CHECK_STACK();
709 PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL);
710
711 op_key_rec = (union operation_key*)op_key->internal__;
712
713 /*
714 * First try blocking write.
715 */
716 op_key_rec->overlapped.wsabuf.buf = (void*)data;
717 op_key_rec->overlapped.wsabuf.len = *length;
718
719 dwFlags = flags;
720
721 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
722 rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
723 &bytesWritten, dwFlags, addr, addrlen,
724 NULL, NULL);
725 if (rc == 0) {
726 *length = bytesWritten;
727 return PJ_SUCCESS;
728 } else {
729 DWORD dwStatus = WSAGetLastError();
730 if (dwStatus != WSAEWOULDBLOCK) {
731 *length = -1;
732 return PJ_RETURN_OS_ERROR(dwStatus);
733 }
734 }
735 }
736
737 dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
738
739 /*
740 * Data can't be sent immediately.
741 * Schedule asynchronous WSASend().
742 */
743 pj_memset(&op_key_rec->overlapped.overlapped, 0,
744 sizeof(op_key_rec->overlapped.overlapped));
745 op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND;
746
747 rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
748 &bytesWritten, dwFlags, addr, addrlen,
749 &op_key_rec->overlapped.overlapped, NULL);
750 if (rc == SOCKET_ERROR) {
751 DWORD dwStatus = WSAGetLastError();
752 if (dwStatus!=WSA_IO_PENDING)
753 return PJ_STATUS_FROM_OS(dwStatus);
754 }
755
756 /* Asynchronous operation successfully submitted. */
757 return PJ_EPENDING;
758}
759
760#if PJ_HAS_TCP
761
762/*
763 * pj_ioqueue_accept()
764 *
765 * Initiate overlapped accept() operation.
766 */
767PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
768 pj_ioqueue_op_key_t *op_key,
769 pj_sock_t *new_sock,
770 pj_sockaddr_t *local,
771 pj_sockaddr_t *remote,
772 int *addrlen)
773{
774 BOOL rc;
775 DWORD bytesReceived;
776 pj_status_t status;
777 union operation_key *op_key_rec;
778 SOCKET sock;
779
780 PJ_CHECK_STACK();
781 PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
782
783 /*
784 * See if there is a new connection immediately available.
785 */
786 sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0);
787 if (sock != INVALID_SOCKET) {
788 /* Yes! New socket is available! */
789 int status;
790
791 status = getsockname(sock, local, addrlen);
792 if (status != 0) {
793 DWORD dwError = WSAGetLastError();
794 closesocket(sock);
795 return PJ_RETURN_OS_ERROR(dwError);
796 }
797
798 *new_sock = sock;
799 return PJ_SUCCESS;
800
801 } else {
802 DWORD dwError = WSAGetLastError();
803 if (dwError != WSAEWOULDBLOCK) {
804 return PJ_RETURN_OS_ERROR(dwError);
805 }
806 }
807
808 /*
809 * No connection is immediately available.
810 * Must schedule an asynchronous operation.
811 */
812 op_key_rec = (union operation_key*)op_key->internal__;
813
814 status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0,
815 &op_key_rec->accept.newsock);
816 if (status != PJ_SUCCESS)
817 return status;
818
819 /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket
820 * addresses can be obtained with getsockname() and getpeername().
821 */
822 status = setsockopt(op_key_rec->accept.newsock, SOL_SOCKET,
823 SO_UPDATE_ACCEPT_CONTEXT,
824 (char*)&key->hnd, sizeof(SOCKET));
825 /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later.
826 * So ignore the error status.
827 */
828
829 op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT;
830 op_key_rec->accept.addrlen = addrlen;
831 op_key_rec->accept.local = local;
832 op_key_rec->accept.remote = remote;
833 op_key_rec->accept.newsock_ptr = new_sock;
834 pj_memset(&op_key_rec->accept.overlapped, 0,
835 sizeof(op_key_rec->accept.overlapped));
836
837 rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)op_key_rec->accept.newsock,
838 op_key_rec->accept.accept_buf,
839 0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN,
840 &bytesReceived,
841 &op_key_rec->accept.overlapped );
842
843 if (rc == TRUE) {
844 ioqueue_on_accept_complete(&op_key_rec->accept);
845 return PJ_SUCCESS;
846 } else {
847 DWORD dwStatus = WSAGetLastError();
848 if (dwStatus!=WSA_IO_PENDING)
849 return PJ_STATUS_FROM_OS(dwStatus);
850 }
851
852 /* Asynchronous Accept() has been submitted. */
853 return PJ_EPENDING;
854}
855
856
857/*
858 * pj_ioqueue_connect()
859 *
860 * Initiate overlapped connect() operation (well, it's non-blocking actually,
861 * since there's no overlapped version of connect()).
862 */
863PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
864 const pj_sockaddr_t *addr,
865 int addrlen )
866{
867 HANDLE hEvent;
868 pj_ioqueue_t *ioqueue;
869
870 PJ_CHECK_STACK();
871 PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
872
873 /* Initiate connect() */
874 if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) {
875 DWORD dwStatus;
876 dwStatus = WSAGetLastError();
877 if (dwStatus != WSAEWOULDBLOCK) {
878 return PJ_RETURN_OS_ERROR(dwStatus);
879 }
880 } else {
881 /* Connect has completed immediately! */
882 return PJ_SUCCESS;
883 }
884
885 ioqueue = key->ioqueue;
886
887 /* Add to the array of connecting socket to be polled */
888 pj_lock_acquire(ioqueue->lock);
889
890 if (ioqueue->connecting_count >= MAXIMUM_WAIT_OBJECTS) {
891 pj_lock_release(ioqueue->lock);
892 return PJ_ETOOMANYCONN;
893 }
894
895 /* Get or create event object. */
896 if (ioqueue->event_count) {
897 hEvent = ioqueue->event_pool[ioqueue->event_count - 1];
898 --ioqueue->event_count;
899 } else {
900 hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
901 if (hEvent == NULL) {
902 DWORD dwStatus = GetLastError();
903 pj_lock_release(ioqueue->lock);
904 return PJ_STATUS_FROM_OS(dwStatus);
905 }
906 }
907
908 /* Mark key as connecting.
909 * We can't use array index since key can be removed dynamically.
910 */
911 key->connecting = 1;
912
913 /* Associate socket events to the event object. */
914 if (WSAEventSelect((pj_sock_t)key->hnd, hEvent, FD_CONNECT) != 0) {
915 CloseHandle(hEvent);
916 pj_lock_release(ioqueue->lock);
917 return PJ_RETURN_OS_ERROR(WSAGetLastError());
918 }
919
920 /* Add to array. */
921 ioqueue->connecting_keys[ ioqueue->connecting_count ] = key;
922 ioqueue->connecting_handles[ ioqueue->connecting_count ] = hEvent;
923 ioqueue->connecting_count++;
924
925 pj_lock_release(ioqueue->lock);
926
927 return PJ_EPENDING;
928}
929#endif /* #if PJ_HAS_TCP */
930
931
932PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
933 pj_size_t size )
934{
935 pj_memset(op_key, 0, size);
936}
937
938PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
939 pj_ioqueue_op_key_t *op_key )
940{
941 BOOL rc;
942 DWORD bytesTransfered;
943
944 rc = GetOverlappedResult( key->hnd, (LPOVERLAPPED)op_key,
945 &bytesTransfered, FALSE );
946
947 if (rc == FALSE) {
948 return GetLastError()==ERROR_IO_INCOMPLETE;
949 }
950
951 return FALSE;
952}
953
954
955PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
956 pj_ioqueue_op_key_t *op_key,
957 pj_ssize_t bytes_status )
958{
959 BOOL rc;
960
961 rc = PostQueuedCompletionStatus(key->ioqueue->iocp, bytes_status,
962 (long)key, (OVERLAPPED*)op_key );
963 if (rc == FALSE) {
964 return PJ_RETURN_OS_ERROR(GetLastError());
965 }
966
967 return PJ_SUCCESS;
968}
969