/* $Id$ */

#include <pj/ioqueue.h>
#include <pj/errno.h>
#include <pj/list.h>
#include <pj/sock.h>
#include <pj/lock.h>
#include <pj/assert.h>
#include <pj/string.h>


static void ioqueue_init( pj_ioqueue_t *ioqueue )
{
    ioqueue->lock = NULL;
    ioqueue->auto_delete_lock = 0;
}

static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
    if (ioqueue->auto_delete_lock && ioqueue->lock)
        return pj_lock_destroy(ioqueue->lock);
    else
        return PJ_SUCCESS;
}

/*
 * pj_ioqueue_set_lock()
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
                                         pj_lock_t *lock,
                                         pj_bool_t auto_delete )
{
    PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);

    if (ioqueue->auto_delete_lock && ioqueue->lock) {
        pj_lock_destroy(ioqueue->lock);
    }

    ioqueue->lock = lock;
    ioqueue->auto_delete_lock = auto_delete;

    return PJ_SUCCESS;
}

static pj_status_t ioqueue_init_key( pj_pool_t *pool,
                                     pj_ioqueue_t *ioqueue,
                                     pj_ioqueue_key_t *key,
                                     pj_sock_t sock,
                                     void *user_data,
                                     const pj_ioqueue_callback *cb)
{
    pj_status_t rc;
    int optlen;

    key->ioqueue = ioqueue;
    key->fd = sock;
    key->user_data = user_data;
    pj_list_init(&key->read_list);
    pj_list_init(&key->write_list);
#if PJ_HAS_TCP
    pj_list_init(&key->accept_list);
#endif

    /* Save callback. */
    pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));

    /* Get socket type. When socket type is datagram, some optimization
     * will be performed during send to allow parallel send operations.
     */
    optlen = sizeof(key->fd_type);
    rc = pj_sock_getsockopt(sock, PJ_SOL_SOCKET, PJ_SO_TYPE,
                            &key->fd_type, &optlen);
    if (rc != PJ_SUCCESS)
        key->fd_type = PJ_SOCK_STREAM;

    /* Create mutex for the key. */
    rc = pj_mutex_create_simple(pool, NULL, &key->mutex);

    return rc;
}

static void ioqueue_destroy_key( pj_ioqueue_key_t *key )
{
    pj_mutex_destroy(key->mutex);
}

/*
 * pj_ioqueue_get_user_data()
 *
 * Obtain value associated with a key.
 */
PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
{
    PJ_ASSERT_RETURN(key != NULL, NULL);
    return key->user_data;
}

/*
 * pj_ioqueue_set_user_data()
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
                                              void *user_data,
                                              void **old_data)
{
    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    if (old_data)
        *old_data = key->user_data;
    key->user_data = user_data;

    return PJ_SUCCESS;
}

PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
{
    return !pj_list_empty(&key->write_list);
}

PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
{
    return !pj_list_empty(&key->read_list);
}

PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
{
#if PJ_HAS_TCP
    return !pj_list_empty(&key->accept_list);
#else
    return 0;
#endif
}

PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
{
    return key->connecting;
}


/*
 * ioqueue_dispatch_event()
 *
 * Report occurrence of an event in the key to be processed by the
 * framework.
 */
void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
{
    /* Lock the key. */
    pj_mutex_lock(h->mutex);

#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
    if (h->connecting) {
        /* Completion of connect() operation */
        pj_ssize_t bytes_transferred;

        /* Clear operation. */
        h->connecting = 0;

        ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
        ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);

        /* Unlock; from this point we don't need to hold the key's mutex. */
        pj_mutex_unlock(h->mutex);

#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
        /* From connect(2):
         * On Linux, use getsockopt to read the SO_ERROR option at
         * level SOL_SOCKET to determine whether connect() completed
         * successfully (if SO_ERROR is zero).
         */
        int value;
        socklen_t vallen = sizeof(value);
        int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
                               &value, &vallen);
        if (gs_rc != 0) {
            /* Argh!! What to do now???
             * Just indicate that the socket is connected. The
             * application will get an error as soon as it tries to use
             * the socket to send/receive.
             */
            bytes_transferred = 0;
        } else {
            bytes_transferred = value;
        }
#elif defined(PJ_WIN32) && PJ_WIN32!=0
        bytes_transferred = 0; /* success */
#else
        /* Excellent information on D.J. Bernstein's page:
         * http://cr.yp.to/docs/connect.html
         *
         * It seems the most portable way of detecting connect()
         * failure is to call getpeername(). If the socket is connected,
         * getpeername() will return 0. If the socket is not connected,
         * it will return ENOTCONN, and read(fd, &ch, 1) will produce
         * the right errno through error slippage. This is a combination
         * of suggestions from Douglas C. Schmidt and Ken Keys.
         */
        int gp_rc;
        struct sockaddr_in addr;
        socklen_t addrlen = sizeof(addr);

        gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
        bytes_transferred = gp_rc;
#endif

        /* Call callback. */
        if (h->cb.on_connect_complete)
            (*h->cb.on_connect_complete)(h, bytes_transferred);

        /* Done. */

    } else
#endif /* PJ_HAS_TCP */
    if (key_has_pending_write(h)) {
        /* Socket is writable. */
        struct write_operation *write_op;
        pj_ssize_t sent;
        pj_status_t send_rc;

        /* Get the first in the queue. */
        write_op = h->write_list.next;

        /* For datagrams, we can remove the write_op from the list
         * so that send() can work in parallel.
         */
        if (h->fd_type == PJ_SOCK_DGRAM) {
            pj_list_erase(write_op);
            if (pj_list_empty(&h->write_list))
                ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);

            pj_mutex_unlock(h->mutex);
        }

        /* Send the data.
         * Unfortunately we must do this while holding the key's mutex,
         * thus preventing parallel writes on a single key.. :-((
         */
        sent = write_op->size - write_op->written;
        if (write_op->op == PJ_IOQUEUE_OP_SEND) {
            send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
                                   &sent, write_op->flags);
        } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
            send_rc = pj_sock_sendto(h->fd,
                                     write_op->buf+write_op->written,
                                     &sent, write_op->flags,
                                     &write_op->rmt_addr,
                                     write_op->rmt_addrlen);
        } else {
            pj_assert(!"Invalid operation type!");
            send_rc = PJ_EBUG;
        }

        if (send_rc == PJ_SUCCESS) {
            write_op->written += sent;
        } else {
            pj_assert(send_rc > 0);
            write_op->written = -send_rc;
        }

        /* Are we finished with this buffer? */
        if (send_rc!=PJ_SUCCESS ||
            write_op->written == (pj_ssize_t)write_op->size ||
            h->fd_type == PJ_SOCK_DGRAM)
        {
            if (h->fd_type != PJ_SOCK_DGRAM) {
                /* Write completion of the whole stream. */
                pj_list_erase(write_op);

                /* Clear operation if there's no more data to send. */
                if (pj_list_empty(&h->write_list))
                    ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);

                /* No need to hold the mutex anymore. */
                pj_mutex_unlock(h->mutex);
            }

            /* Call callback. */
            if (h->cb.on_write_complete) {
                (*h->cb.on_write_complete)(h,
                                           (pj_ioqueue_op_key_t*)write_op,
                                           write_op->written);
            }

        } else {
            pj_mutex_unlock(h->mutex);
        }

        /* Done. */
    } else {
        pj_assert(!"Descriptor is signaled but key "
                   "has no pending operation!");

        pj_mutex_unlock(h->mutex);
    }
}

void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
{
    pj_status_t rc;

    /* Lock the key. */
    pj_mutex_lock(h->mutex);

#if PJ_HAS_TCP
    if (!pj_list_empty(&h->accept_list)) {

        struct accept_operation *accept_op;

        /* Get one accept operation from the list. */
        accept_op = h->accept_list.next;
        pj_list_erase(accept_op);

        /* Clear bit in fdset if there is no more pending accept. */
        if (pj_list_empty(&h->accept_list))
            ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);

        /* Unlock; from this point we don't need to hold the key's mutex. */
        pj_mutex_unlock(h->mutex);

        rc = pj_sock_accept(h->fd, accept_op->accept_fd,
                            accept_op->rmt_addr, accept_op->addrlen);
        if (rc==PJ_SUCCESS && accept_op->local_addr) {
            rc = pj_sock_getsockname(*accept_op->accept_fd,
                                     accept_op->local_addr,
                                     accept_op->addrlen);
        }

        /* Call callback. */
        if (h->cb.on_accept_complete)
            (*h->cb.on_accept_complete)(h,
                                        (pj_ioqueue_op_key_t*)accept_op,
                                        *accept_op->accept_fd, rc);

    }
    else
#endif
    if (key_has_pending_read(h)) {
        struct read_operation *read_op;
        pj_ssize_t bytes_read;

        pj_assert(!pj_list_empty(&h->read_list));

        /* Get one pending read operation from the list. */
        read_op = h->read_list.next;
        pj_list_erase(read_op);

        /* Clear fdset if there is no pending read. */
        if (pj_list_empty(&h->read_list))
            ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);

        /* Unlock; from this point we don't need to hold the key's mutex. */
        pj_mutex_unlock(h->mutex);

        bytes_read = read_op->size;

        if (read_op->op == PJ_IOQUEUE_OP_RECV_FROM) {
            rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0,
                                  read_op->rmt_addr,
                                  read_op->rmt_addrlen);
        } else if (read_op->op == PJ_IOQUEUE_OP_RECV) {
            rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
        } else {
            pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
            /*
             * User has specified pj_ioqueue_read().
             * On Win32, we should do ReadFile(). But because we got
             * here because of select() anyway, the user must have put a
             * socket descriptor in h->fd, in which case we can just
             * call pj_sock_recv() instead of ReadFile().
             * On Unix, the user may put a file in h->fd, so we'll have
             * to call read() here.
             * This may not compile on systems that don't have read().
             * That's why we only specify PJ_LINUX here, so that the
             * error is easier to catch.
             */
#if defined(PJ_WIN32) && PJ_WIN32 != 0
            rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
            //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
            //              &bytes_read, NULL);
#elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
            bytes_read = read(h->fd, read_op->buf, bytes_read);
            rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
#elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
            bytes_read = sys_read(h->fd, read_op->buf, bytes_read);
            rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
#else
#   error "Implement read() for this platform!"
#endif
        }

        if (rc != PJ_SUCCESS) {
#if defined(PJ_WIN32) && PJ_WIN32 != 0
            /* On Win32, for UDP, WSAECONNRESET on the receive side
             * indicates that a previous send has triggered an ICMP Port
             * Unreachable message.
             * But at this point we wouldn't know which previous send
             * triggered the error, since the UDP socket can be shared!
             * So we'll just ignore it!
             */

            if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
                //PJ_LOG(4,(THIS_FILE,
                //          "Ignored ICMP port unreach. on key=%p", h));
            }
#endif

            /* In any case we would report this to the caller. */
            bytes_read = -rc;
        }

        /* Call callback. */
        if (h->cb.on_read_complete) {
            (*h->cb.on_read_complete)(h,
                                      (pj_ioqueue_op_key_t*)read_op,
                                      bytes_read);
        }

    } else {
        pj_mutex_unlock(h->mutex);
    }
}


void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
                                       pj_ioqueue_key_t *h )
{
    pj_mutex_lock(h->mutex);

    if (!h->connecting) {
        /* It is possible that more than one thread was woken up, thus
         * the remaining thread will see h->connecting as zero because
         * it has been processed by another thread.
         */
        pj_mutex_unlock(h->mutex);
        return;
    }

    /* Clear operation. */
    h->connecting = 0;

    pj_mutex_unlock(h->mutex);

    ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
    ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);

    /* Call callback. */
    if (h->cb.on_connect_complete)
        (*h->cb.on_connect_complete)(h, -1);
}

/*
 * pj_ioqueue_recv()
 *
 * Start asynchronous recv() from the socket.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
                                     pj_ioqueue_op_key_t *op_key,
                                     void *buffer,
                                     pj_ssize_t *length,
                                     unsigned flags )
{
    pj_status_t status;
    pj_ssize_t size;
    struct read_operation *read_op;

    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* See if there's data immediately available. */
    size = *length;
    status = pj_sock_recv(key->fd, buffer, &size, flags);
    if (status == PJ_SUCCESS) {
        /* Yes! Data is available! */
        *length = size;
        return PJ_SUCCESS;
    } else {
        /* If the error is not EWOULDBLOCK (or EAGAIN on Linux), report
         * the error to the caller.
         */
        if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
            return status;
    }

    /*
     * No data is immediately available.
     * Must schedule an asynchronous operation to the ioqueue.
     */
    read_op = (struct read_operation*)op_key;

    read_op->op = PJ_IOQUEUE_OP_RECV;
    read_op->buf = buffer;
    read_op->size = *length;
    read_op->flags = flags;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->read_list, read_op);
    ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
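
/* A minimal usage sketch (illustrative only, not part of this file):
 * starting an asynchronous recv() on a key that has already been
 * registered with the ioqueue. The buffer, the op_key storage, and the
 * on_read_complete callback wiring are assumptions about application
 * code; both buffer and op_key must outlive a pending operation.
 */
#if 0
static void sample_start_recv(pj_ioqueue_key_t *key)
{
    static char buffer[512];                /* must persist while pending */
    static pj_ioqueue_op_key_t op_key;      /* identifies this operation  */
    pj_ssize_t size = sizeof(buffer);
    pj_status_t status;

    status = pj_ioqueue_recv(key, &op_key, buffer, &size, 0);
    if (status == PJ_SUCCESS) {
        /* Fast path: 'size' bytes are already in 'buffer'. */
    } else if (status == PJ_EPENDING) {
        /* Queued; completion is reported via cb.on_read_complete. */
    } else {
        /* Immediate error. */
    }
}
#endif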

/*
 * pj_ioqueue_recvfrom()
 *
 * Start asynchronous recvfrom() from the socket.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key,
                                         void *buffer,
                                         pj_ssize_t *length,
                                         unsigned flags,
                                         pj_sockaddr_t *addr,
                                         int *addrlen)
{
    pj_status_t status;
    pj_ssize_t size;
    struct read_operation *read_op;

    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* See if there's data immediately available. */
    size = *length;
    status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
                              addr, addrlen);
    if (status == PJ_SUCCESS) {
        /* Yes! Data is available! */
        *length = size;
        return PJ_SUCCESS;
    } else {
        /* If the error is not EWOULDBLOCK (or EAGAIN on Linux), report
         * the error to the caller.
         */
        if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
            return status;
    }

    /*
     * No data is immediately available.
     * Must schedule an asynchronous operation to the ioqueue.
     */
    read_op = (struct read_operation*)op_key;

    read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
    read_op->buf = buffer;
    read_op->size = *length;
    read_op->flags = flags;
    read_op->rmt_addr = addr;
    read_op->rmt_addrlen = addrlen;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->read_list, read_op);
    ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}

/*
 * pj_ioqueue_send()
 *
 * Start asynchronous send() to the descriptor.
 */
PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
                                     pj_ioqueue_op_key_t *op_key,
                                     const void *data,
                                     pj_ssize_t *length,
                                     unsigned flags)
{
    struct write_operation *write_op;
    pj_status_t status;
    pj_ssize_t sent;

    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Fast track:
     * Try to send data immediately, only if there's no pending write!
     * Note:
     * We are speculating that the list is empty here without properly
     * acquiring the ioqueue's mutex first. This is intentional, to
     * maximize performance via parallelism.
     *
     * This should be safe, because:
     * - by convention, we require the caller to make sure that the
     *   key is not unregistered while other threads are invoking
     *   an operation on the same key.
     * - pj_list_empty() is safe to be invoked by multiple threads,
     *   even when other threads are modifying the list.
     */
    if (pj_list_empty(&key->write_list)) {
        /*
         * See if data can be sent immediately.
         */
        sent = *length;
        status = pj_sock_send(key->fd, data, &sent, flags);
        if (status == PJ_SUCCESS) {
            /* Success! */
            *length = sent;
            return PJ_SUCCESS;
        } else {
            /* If the error is not EWOULDBLOCK (or EAGAIN on Linux),
             * report the error to the caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * Schedule asynchronous send.
     */
    write_op = (struct write_operation*)op_key;
    write_op->op = PJ_IOQUEUE_OP_SEND;
    write_op->buf = (char*)data;
    write_op->size = *length;
    write_op->written = 0;
    write_op->flags = flags;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->write_list, write_op);
    ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
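
/* A minimal usage sketch (illustrative only, not part of this file):
 * queueing an asynchronous send(). Note that on PJ_EPENDING both the
 * op_key and the data buffer must remain valid until
 * cb.on_write_complete fires, since the ioqueue stores only pointers
 * to them. The names below are assumptions about application code.
 */
#if 0
static void sample_start_send(pj_ioqueue_key_t *key)
{
    static const char msg[] = "hello";      /* must persist while pending */
    static pj_ioqueue_op_key_t op_key;
    pj_ssize_t size = sizeof(msg);
    pj_status_t status;

    status = pj_ioqueue_send(key, &op_key, msg, &size, 0);
    if (status == PJ_SUCCESS) {
        /* Fast path: 'size' bytes were sent immediately. */
    } else if (status == PJ_EPENDING) {
        /* Queued; completion arrives via cb.on_write_complete. */
    } else {
        /* Immediate error. */
    }
}
#endif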


/*
 * pj_ioqueue_sendto()
 *
 * Start asynchronous sendto() to the descriptor.
 */
PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
                                       const void *data,
                                       pj_ssize_t *length,
                                       unsigned flags,
                                       const pj_sockaddr_t *addr,
                                       int addrlen)
{
    struct write_operation *write_op;
    pj_status_t status;
    pj_ssize_t sent;

    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Fast track:
     * Try to send data immediately, only if there's no pending write!
     * Note:
     * We are speculating that the list is empty here without properly
     * acquiring the ioqueue's mutex first. This is intentional, to
     * maximize performance via parallelism.
     *
     * This should be safe, because:
     * - by convention, we require the caller to make sure that the
     *   key is not unregistered while other threads are invoking
     *   an operation on the same key.
     * - pj_list_empty() is safe to be invoked by multiple threads,
     *   even when other threads are modifying the list.
     */
    if (pj_list_empty(&key->write_list)) {
        /*
         * See if data can be sent immediately.
         */
        sent = *length;
        status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
        if (status == PJ_SUCCESS) {
            /* Success! */
            *length = sent;
            return PJ_SUCCESS;
        } else {
            /* If the error is not EWOULDBLOCK (or EAGAIN on Linux),
             * report the error to the caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * Check that the address storage can hold the address parameter.
     */
    PJ_ASSERT_RETURN(addrlen <= (int)sizeof(pj_sockaddr_in), PJ_EBUG);

    /*
     * Schedule asynchronous send.
     */
    write_op = (struct write_operation*)op_key;
    write_op->op = PJ_IOQUEUE_OP_SEND_TO;
    write_op->buf = (char*)data;
    write_op->size = *length;
    write_op->written = 0;
    write_op->flags = flags;
    pj_memcpy(&write_op->rmt_addr, addr, addrlen);
    write_op->rmt_addrlen = addrlen;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->write_list, write_op);
    ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}

#if PJ_HAS_TCP
/*
 * Initiate overlapped accept() operation.
 */
PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
                                       pj_sock_t *new_sock,
                                       pj_sockaddr_t *local,
                                       pj_sockaddr_t *remote,
                                       int *addrlen)
{
    struct accept_operation *accept_op;
    pj_status_t status;

    /* Check parameters. All must be specified! */
    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);

    /* Fast track:
     * See if a new connection is available immediately.
     */
    if (pj_list_empty(&key->accept_list)) {
        status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
        if (status == PJ_SUCCESS) {
            /* Yes! A new connection is available! */
            if (local && addrlen) {
                status = pj_sock_getsockname(*new_sock, local, addrlen);
                if (status != PJ_SUCCESS) {
                    pj_sock_close(*new_sock);
                    *new_sock = PJ_INVALID_SOCKET;
                    return status;
                }
            }
            return PJ_SUCCESS;
        } else {
            /* If the error is not EWOULDBLOCK (or EAGAIN on Linux),
             * report the error to the caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * No connection is available immediately.
     * Schedule the accept() operation to be completed when there is
     * an incoming connection available.
     */
    accept_op = (struct accept_operation*)op_key;

    accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
    accept_op->accept_fd = new_sock;
    accept_op->rmt_addr = remote;
    accept_op->addrlen = addrlen;
    accept_op->local_addr = local;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->accept_list, accept_op);
    ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
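
/* A minimal usage sketch (illustrative only, not part of this file):
 * posting an asynchronous accept() on a listening key. On PJ_EPENDING,
 * 'new_sock', the address buffers, and the op_key must stay valid until
 * cb.on_accept_complete fires. All names below are assumptions about
 * application code.
 */
#if 0
static void sample_start_accept(pj_ioqueue_key_t *listen_key)
{
    static pj_ioqueue_op_key_t op_key;
    static pj_sock_t new_sock;              /* filled in on completion */
    static pj_sockaddr_in local, remote;
    static int addrlen;
    pj_status_t status;

    addrlen = sizeof(remote);
    status = pj_ioqueue_accept(listen_key, &op_key, &new_sock,
                               &local, &remote, &addrlen);
    if (status == PJ_SUCCESS) {
        /* Fast path: a connection was already pending; new_sock is valid. */
    } else if (status == PJ_EPENDING) {
        /* Queued; completion arrives via cb.on_accept_complete. */
    } else {
        /* Immediate error. */
    }
}
#endif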

/*
 * Initiate overlapped connect() operation (well, it's non-blocking actually,
 * since there's no overlapped version of connect()).
 */
PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
                                        const pj_sockaddr_t *addr,
                                        int addrlen )
{
    pj_status_t status;

    /* Check parameters. All must be specified! */
    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);

    /* Bail out if the socket has already been marked as connecting. */
    if (key->connecting != 0)
        return PJ_EPENDING;

    status = pj_sock_connect(key->fd, addr, addrlen);
    if (status == PJ_SUCCESS) {
        /* Connected! */
        return PJ_SUCCESS;
    } else {
        if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
            /* Pending! */
            pj_mutex_lock(key->mutex);
            key->connecting = PJ_TRUE;
            ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
            ioqueue_add_to_set(key->ioqueue, key->fd, EXCEPTION_EVENT);
            pj_mutex_unlock(key->mutex);
            return PJ_EPENDING;
        } else {
            /* Error! */
            return status;
        }
    }
}
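
/* A minimal usage sketch (illustrative only, not part of this file):
 * starting a non-blocking connect(). On PJ_EPENDING the result is later
 * delivered through cb.on_connect_complete, with a zero status on
 * success (platform-dependent, see ioqueue_dispatch_write_event()
 * above) or -1 when the exception event fires.
 */
#if 0
static void sample_start_connect(pj_ioqueue_key_t *key,
                                 const pj_sockaddr_in *remote)
{
    pj_status_t status;

    status = pj_ioqueue_connect(key, remote, sizeof(*remote));
    if (status == PJ_SUCCESS) {
        /* Connected immediately (e.g. to a local address). */
    } else if (status == PJ_EPENDING) {
        /* In progress; wait for cb.on_connect_complete. */
    } else {
        /* Immediate error. */
    }
}
#endif
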
#endif /* PJ_HAS_TCP */
