/* $Id$ */

/*
 * ioqueue_common_abs.c
 *
 * This contains the common functionality used to emulate the proactor
 * pattern on top of various event dispatching mechanisms (e.g. select,
 * epoll).
 *
 * This file will be included by the appropriate ioqueue implementation.
 * This file is NOT supposed to be compiled as a stand-alone source.
 */
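
/*
 * For orientation, a minimal sketch of how an application typically
 * drives this proactor emulation (hypothetical setup; declarations and
 * error handling omitted -- see the public ioqueue.h documentation for
 * the authoritative API):
 *
 *   static void on_read_complete(pj_ioqueue_key_t *key,
 *                                pj_ioqueue_op_key_t *op_key,
 *                                pj_ssize_t bytes_read)
 *   {
 *       // Consume the data, then resubmit pj_ioqueue_recv() to keep
 *       // the read operation outstanding.
 *   }
 *
 *   pj_bzero(&cb, sizeof(cb));
 *   cb.on_read_complete = &on_read_complete;
 *   pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioqueue);
 *   pj_ioqueue_register_sock(pool, ioqueue, sock, user_data, &cb, &key);
 *
 *   // Event loop: polling dispatches to the callbacks in this file.
 *   for (;;) {
 *       pj_time_val timeout = { 0, 500 };
 *       pj_ioqueue_poll(ioqueue, &timeout);
 *   }
 */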

static void ioqueue_init( pj_ioqueue_t *ioqueue )
{
    ioqueue->lock = NULL;
    ioqueue->auto_delete_lock = 0;
}

static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
    if (ioqueue->auto_delete_lock && ioqueue->lock)
        return pj_lock_destroy(ioqueue->lock);
    else
        return PJ_SUCCESS;
}

/*
 * pj_ioqueue_set_lock()
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
                                         pj_lock_t *lock,
                                         pj_bool_t auto_delete )
{
    PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);

    if (ioqueue->auto_delete_lock && ioqueue->lock) {
        pj_lock_destroy(ioqueue->lock);
    }

    ioqueue->lock = lock;
    ioqueue->auto_delete_lock = auto_delete;

    return PJ_SUCCESS;
}
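
/*
 * A minimal sketch of attaching an external lock (pj_lock_create_recursive_mutex
 * and pj_ioqueue_set_lock are real pjlib APIs; the surrounding setup is
 * hypothetical). With auto_delete set, the ioqueue takes ownership and
 * destroys the lock in ioqueue_destroy():
 *
 *   pj_lock_t *lock;
 *   status = pj_lock_create_recursive_mutex(pool, "ioqlock", &lock);
 *   if (status == PJ_SUCCESS)
 *       status = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
 */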

static pj_status_t ioqueue_init_key( pj_pool_t *pool,
                                     pj_ioqueue_t *ioqueue,
                                     pj_ioqueue_key_t *key,
                                     pj_sock_t sock,
                                     void *user_data,
                                     const pj_ioqueue_callback *cb)
{
    pj_status_t rc;
    int optlen;

    key->ioqueue = ioqueue;
    key->fd = sock;
    key->user_data = user_data;
    pj_list_init(&key->read_list);
    pj_list_init(&key->write_list);
#if PJ_HAS_TCP
    pj_list_init(&key->accept_list);
#endif

    /* Save callback. */
    pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));

    /* Get the socket type. When the socket type is datagram, an
     * optimization is applied during send to allow parallel send
     * operations.
     */
    optlen = sizeof(key->fd_type);
    rc = pj_sock_getsockopt(sock, PJ_SOL_SOCKET, PJ_SO_TYPE,
                            &key->fd_type, &optlen);
    if (rc != PJ_SUCCESS)
        key->fd_type = PJ_SOCK_STREAM;

    /* Create mutex for the key. */
    rc = pj_mutex_create_simple(pool, NULL, &key->mutex);

    return rc;
}

static void ioqueue_destroy_key( pj_ioqueue_key_t *key )
{
    pj_mutex_destroy(key->mutex);
}

/*
 * pj_ioqueue_get_user_data()
 *
 * Obtain value associated with a key.
 */
PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
{
    PJ_ASSERT_RETURN(key != NULL, NULL);
    return key->user_data;
}

/*
 * pj_ioqueue_set_user_data()
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
                                              void *user_data,
                                              void **old_data)
{
    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    if (old_data)
        *old_data = key->user_data;
    key->user_data = user_data;

    return PJ_SUCCESS;
}

PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
{
    return !pj_list_empty(&key->write_list);
}

PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
{
    return !pj_list_empty(&key->read_list);
}

PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
{
#if PJ_HAS_TCP
    return !pj_list_empty(&key->accept_list);
#else
    return 0;
#endif
}

PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
{
    return key->connecting;
}


/*
 * ioqueue_dispatch_event()
 *
 * Report the occurrence of an event on the key to be processed by the
 * framework.
 */
void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
{
    /* Lock the key. */
    pj_mutex_lock(h->mutex);

#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
    if (h->connecting) {
        /* Completion of connect() operation */
        pj_ssize_t bytes_transfered;

        /* Clear operation. */
        h->connecting = 0;

        ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
        ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);

        /* Unlock; from this point we don't need to hold key's mutex. */
        pj_mutex_unlock(h->mutex);

#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
        /* from connect(2):
         * On Linux, use getsockopt to read the SO_ERROR option at
         * level SOL_SOCKET to determine whether connect() completed
         * successfully (if SO_ERROR is zero).
         */
        {
            int value;
            socklen_t vallen = sizeof(value);
            int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
                                   &value, &vallen);
            if (gs_rc != 0) {
                /* getsockopt() itself failed. There's not much we can do
                 * but indicate that the socket is connected. The
                 * application will get the error as soon as it tries to
                 * use the socket to send/receive.
                 */
                bytes_transfered = 0;
            } else {
                bytes_transfered = value;
            }
        }
#elif defined(PJ_WIN32) && PJ_WIN32!=0
        bytes_transfered = 0; /* success */
#else
        /* Excellent information in D.J. Bernstein page:
         * http://cr.yp.to/docs/connect.html
         *
         * Seems like the most portable way of detecting connect()
         * failure is to call getpeername(). If the socket is connected,
         * getpeername() will return 0. If the socket is not connected,
         * it will fail with ENOTCONN, and read(fd, &ch, 1) will produce
         * the right errno through error slippage. This is a combination
         * of suggestions from Douglas C. Schmidt and Ken Keys.
         */
        {
            int gp_rc;
            struct sockaddr_in addr;
            socklen_t addrlen = sizeof(addr);

            gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
            bytes_transfered = gp_rc;
        }
#endif

        /* Call callback. */
        if (h->cb.on_connect_complete)
            (*h->cb.on_connect_complete)(h, bytes_transfered);

        /* Done. */

    } else
#endif /* PJ_HAS_TCP */
    if (key_has_pending_write(h)) {
        /* Socket is writable. */
        struct write_operation *write_op;
        pj_ssize_t sent;
        pj_status_t send_rc;

        /* Get the first in the queue. */
        write_op = h->write_list.next;

        /* For datagrams, we can remove the write_op from the list
         * so that send() can work in parallel.
         */
        if (h->fd_type == PJ_SOCK_DGRAM) {
            pj_list_erase(write_op);
            if (pj_list_empty(&h->write_list))
                ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);

            pj_mutex_unlock(h->mutex);
        }

        /* Send the data.
         * Unfortunately, for streams we must do this while holding the
         * key's mutex, thus preventing parallel writes on a single key.
         */
        sent = write_op->size - write_op->written;
        if (write_op->op == PJ_IOQUEUE_OP_SEND) {
            send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
                                   &sent, write_op->flags);
        } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
            send_rc = pj_sock_sendto(h->fd,
                                     write_op->buf+write_op->written,
                                     &sent, write_op->flags,
                                     &write_op->rmt_addr,
                                     write_op->rmt_addrlen);
        } else {
            pj_assert(!"Invalid operation type!");
            send_rc = PJ_EBUG;
        }

        if (send_rc == PJ_SUCCESS) {
            write_op->written += sent;
        } else {
            pj_assert(send_rc > 0);
            write_op->written = -send_rc;
        }

        /* Are we finished with this buffer? */
        if (send_rc != PJ_SUCCESS ||
            write_op->written == (pj_ssize_t)write_op->size ||
            h->fd_type == PJ_SOCK_DGRAM)
        {
            if (h->fd_type != PJ_SOCK_DGRAM) {
                /* Write completion of the whole stream. */
                pj_list_erase(write_op);

                /* Clear operation if there's no more data to send. */
                if (pj_list_empty(&h->write_list))
                    ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);

                /* No need to hold mutex anymore */
                pj_mutex_unlock(h->mutex);
            }

            /* Call callback. */
            if (h->cb.on_write_complete) {
                (*h->cb.on_write_complete)(h,
                                           (pj_ioqueue_op_key_t*)write_op,
                                           write_op->written);
            }

        } else {
            pj_mutex_unlock(h->mutex);
        }

        /* Done. */
    } else {
        pj_assert(!"Descriptor is signaled but key "
                  "has no pending operation!");

        pj_mutex_unlock(h->mutex);
    }
}

void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
{
    pj_status_t rc;

    /* Lock the key. */
    pj_mutex_lock(h->mutex);

# if PJ_HAS_TCP
    if (!pj_list_empty(&h->accept_list)) {

        struct accept_operation *accept_op;

        /* Get one accept operation from the list. */
        accept_op = h->accept_list.next;
        pj_list_erase(accept_op);

        /* Clear bit in fdset if there is no more pending accept */
        if (pj_list_empty(&h->accept_list))
            ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);

        /* Unlock; from this point we don't need to hold key's mutex. */
        pj_mutex_unlock(h->mutex);

        rc = pj_sock_accept(h->fd, accept_op->accept_fd,
                            accept_op->rmt_addr, accept_op->addrlen);
        if (rc == PJ_SUCCESS && accept_op->local_addr) {
            rc = pj_sock_getsockname(*accept_op->accept_fd,
                                     accept_op->local_addr,
                                     accept_op->addrlen);
        }

        /* Call callback. */
        if (h->cb.on_accept_complete) {
            (*h->cb.on_accept_complete)(h,
                                        (pj_ioqueue_op_key_t*)accept_op,
                                        *accept_op->accept_fd, rc);
        }

    }
    else
# endif
    if (key_has_pending_read(h)) {
        struct read_operation *read_op;
        pj_ssize_t bytes_read;

        /* Get one pending read operation from the list. */
        read_op = h->read_list.next;
        pj_list_erase(read_op);

        /* Clear fdset if there is no pending read. */
        if (pj_list_empty(&h->read_list))
            ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);

        /* Unlock; from this point we don't need to hold key's mutex. */
        pj_mutex_unlock(h->mutex);

        bytes_read = read_op->size;

        if (read_op->op == PJ_IOQUEUE_OP_RECV_FROM) {
            rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0,
                                  read_op->rmt_addr,
                                  read_op->rmt_addrlen);
        } else if (read_op->op == PJ_IOQUEUE_OP_RECV) {
            rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
        } else {
            pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
            /*
             * User has specified pj_ioqueue_read().
             * On Win32, we should do ReadFile(). But because we got
             * here because of select() anyway, the user must have put a
             * socket descriptor in h->fd, in which case we can just
             * call pj_sock_recv() instead of ReadFile().
             * On Unix, the user may put a file in h->fd, so we'll have
             * to call read() here.
             * This may not compile on systems that don't have read();
             * that's why the platform check below is explicit, so that
             * the error is easier to catch.
             */
# if defined(PJ_WIN32) && PJ_WIN32 != 0
            rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
            //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
            //              &bytes_read, NULL);
# elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
            bytes_read = read(h->fd, read_op->buf, bytes_read);
            rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
            bytes_read = sys_read(h->fd, read_op->buf, bytes_read);
            rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
# else
#   error "Implement read() for this platform!"
# endif
        }

        if (rc != PJ_SUCCESS) {
# if defined(PJ_WIN32) && PJ_WIN32 != 0
            /* On Win32, for UDP, WSAECONNRESET on the receive side
             * indicates that a previous send has triggered an ICMP Port
             * Unreachable message.
             * But we wouldn't know at this point which previous send
             * triggered the error, since the UDP socket can be shared!
             * So we'll just ignore it!
             */

            if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
                //PJ_LOG(4,(THIS_FILE,
                //          "Ignored ICMP port unreach. on key=%p", h));
            }
# endif

            /* In any case we would report this to the caller. */
            bytes_read = -rc;
        }

        /* Call callback. */
        if (h->cb.on_read_complete) {
            (*h->cb.on_read_complete)(h,
                                      (pj_ioqueue_op_key_t*)read_op,
                                      bytes_read);
        }

    } else {
        pj_mutex_unlock(h->mutex);
    }
}


void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
                                       pj_ioqueue_key_t *h )
{
    pj_mutex_lock(h->mutex);

    if (!h->connecting) {
        /* It is possible that more than one thread was woken up; the
         * remaining threads will see h->connecting as zero because it
         * has already been processed by another thread.
         */
        pj_mutex_unlock(h->mutex);
        return;
    }

    /* Clear operation. */
    h->connecting = 0;

    pj_mutex_unlock(h->mutex);

    ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
    ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);

    /* Call callback. */
    if (h->cb.on_connect_complete)
        (*h->cb.on_connect_complete)(h, -1);
}

/*
 * pj_ioqueue_recv()
 *
 * Start asynchronous recv() from the socket.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
                                     pj_ioqueue_op_key_t *op_key,
                                     void *buffer,
                                     pj_ssize_t *length,
                                     unsigned flags )
{
    pj_status_t status;
    pj_ssize_t size;
    struct read_operation *read_op;

    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Try to see if there's data immediately available.
     */
    size = *length;
    status = pj_sock_recv(key->fd, buffer, &size, flags);
    if (status == PJ_SUCCESS) {
        /* Yes! Data is available! */
        *length = size;
        return PJ_SUCCESS;
    } else {
        /* If the error is not EWOULDBLOCK (or EAGAIN on Linux), report
         * the error to the caller.
         */
        if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
            return status;
    }

    /*
     * No data is immediately available.
     * Must schedule asynchronous operation to the ioqueue.
     */
    read_op = (struct read_operation*)op_key;

    read_op->op = PJ_IOQUEUE_OP_RECV;
    read_op->buf = buffer;
    read_op->size = *length;
    read_op->flags = flags;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->read_list, read_op);
    ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
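
/*
 * Note the dual completion semantics above: PJ_SUCCESS means the data was
 * received synchronously and no callback will follow, while PJ_EPENDING
 * means on_read_complete() will be invoked later from the poll loop. A
 * hypothetical caller handles both (op_key and the buffer must stay valid
 * while the operation is pending; error handling elided):
 *
 *   pj_ssize_t len = sizeof(ctx->buf);
 *   status = pj_ioqueue_recv(key, &ctx->op_key, ctx->buf, &len, 0);
 *   if (status == PJ_SUCCESS)
 *       handle_data(ctx->buf, len);   // completed immediately
 *   else if (status != PJ_EPENDING)
 *       handle_error(status);         // real error
 *   // else: wait for on_read_complete()
 */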

/*
 * pj_ioqueue_recvfrom()
 *
 * Start asynchronous recvfrom() from the socket.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key,
                                         void *buffer,
                                         pj_ssize_t *length,
                                         unsigned flags,
                                         pj_sockaddr_t *addr,
                                         int *addrlen)
{
    pj_status_t status;
    pj_ssize_t size;
    struct read_operation *read_op;

    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Try to see if there's data immediately available.
     */
    size = *length;
    status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
                              addr, addrlen);
    if (status == PJ_SUCCESS) {
        /* Yes! Data is available! */
        *length = size;
        return PJ_SUCCESS;
    } else {
        /* If the error is not EWOULDBLOCK (or EAGAIN on Linux), report
         * the error to the caller.
         */
        if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
            return status;
    }

    /*
     * No data is immediately available.
     * Must schedule asynchronous operation to the ioqueue.
     */
    read_op = (struct read_operation*)op_key;

    read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
    read_op->buf = buffer;
    read_op->size = *length;
    read_op->flags = flags;
    read_op->rmt_addr = addr;
    read_op->rmt_addrlen = addrlen;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->read_list, read_op);
    ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}

/*
 * pj_ioqueue_send()
 *
 * Start asynchronous send() to the descriptor.
 */
PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
                                     pj_ioqueue_op_key_t *op_key,
                                     const void *data,
                                     pj_ssize_t *length,
                                     unsigned flags)
{
    struct write_operation *write_op;
    pj_status_t status;
    pj_ssize_t sent;

    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Fast track:
     *   Try to send data immediately, only if there's no pending write!
     * Note:
     *   We are speculating that the list is empty here without properly
     *   acquiring the key's mutex first. This is intentional, to maximize
     *   performance via parallelism.
     *
     * This should be safe, because:
     *  - by convention, we require the caller to make sure that the
     *    key is not unregistered while other threads are invoking
     *    an operation on the same key.
     *  - pj_list_empty() is safe to be invoked by multiple threads,
     *    even when other threads are modifying the list.
     */
    if (pj_list_empty(&key->write_list)) {
        /*
         * See if data can be sent immediately.
         */
        sent = *length;
        status = pj_sock_send(key->fd, data, &sent, flags);
        if (status == PJ_SUCCESS) {
            /* Success! */
            *length = sent;
            return PJ_SUCCESS;
        } else {
            /* If the error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to the caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * Schedule asynchronous send.
     */
    write_op = (struct write_operation*)op_key;
    write_op->op = PJ_IOQUEUE_OP_SEND;
    write_op->buf = (char*)data;
    write_op->size = *length;
    write_op->written = 0;
    write_op->flags = flags;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->write_list, write_op);
    ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
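
/*
 * A caveat worth stating explicitly: when PJ_EPENDING is returned, the
 * pending write_operation above keeps referencing both op_key and the
 * data buffer until on_write_complete() fires, so neither may live in a
 * stack frame that unwinds in the meantime. A hypothetical arrangement:
 *
 *   struct my_tx { pj_ioqueue_op_key_t op_key; char pkt[512]; };
 *   struct my_tx *tx = alloc_tx_slot();   // pool/heap allocated
 *   pj_ssize_t len = fill_packet(tx->pkt, sizeof(tx->pkt));
 *   status = pj_ioqueue_send(key, &tx->op_key, tx->pkt, &len, 0);
 *   // release tx from on_write_complete(), or right away when
 *   // status != PJ_EPENDING
 */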

/*
 * pj_ioqueue_sendto()
 *
 * Start asynchronous sendto() to the descriptor.
 */
PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
                                       const void *data,
                                       pj_ssize_t *length,
                                       unsigned flags,
                                       const pj_sockaddr_t *addr,
                                       int addrlen)
{
    struct write_operation *write_op;
    pj_status_t status;
    pj_ssize_t sent;

    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Fast track:
     *   Try to send data immediately, only if there's no pending write!
     * Note:
     *   We are speculating that the list is empty here without properly
     *   acquiring the key's mutex first. This is intentional, to maximize
     *   performance via parallelism.
     *
     * This should be safe, because:
     *  - by convention, we require the caller to make sure that the
     *    key is not unregistered while other threads are invoking
     *    an operation on the same key.
     *  - pj_list_empty() is safe to be invoked by multiple threads,
     *    even when other threads are modifying the list.
     */
    if (pj_list_empty(&key->write_list)) {
        /*
         * See if data can be sent immediately.
         */
        sent = *length;
        status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
        if (status == PJ_SUCCESS) {
            /* Success! */
            *length = sent;
            return PJ_SUCCESS;
        } else {
            /* If the error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to the caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * Check that the address storage can hold the address parameter.
     */
    PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG);

    /*
     * Schedule asynchronous send.
     */
    write_op = (struct write_operation*)op_key;
    write_op->op = PJ_IOQUEUE_OP_SEND_TO;
    write_op->buf = (char*)data;
    write_op->size = *length;
    write_op->written = 0;
    write_op->flags = flags;
    pj_memcpy(&write_op->rmt_addr, addr, addrlen);
    write_op->rmt_addrlen = addrlen;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->write_list, write_op);
    ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}

#if PJ_HAS_TCP
/*
 * Initiate overlapped accept() operation.
 */
PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
                                       pj_sock_t *new_sock,
                                       pj_sockaddr_t *local,
                                       pj_sockaddr_t *remote,
                                       int *addrlen)
{
    struct accept_operation *accept_op;
    pj_status_t status;

    /* Check parameters. All must be specified! */
    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);

    /* Fast track:
     *   See if there's a new connection available immediately.
     */
    if (pj_list_empty(&key->accept_list)) {
        status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
        if (status == PJ_SUCCESS) {
            /* Yes! A new connection is available! */
            if (local && addrlen) {
                status = pj_sock_getsockname(*new_sock, local, addrlen);
                if (status != PJ_SUCCESS) {
                    pj_sock_close(*new_sock);
                    *new_sock = PJ_INVALID_SOCKET;
                    return status;
                }
            }
            return PJ_SUCCESS;
        } else {
            /* If the error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to the caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * No connection is available immediately.
     * Schedule the accept() operation to be completed when there is an
     * incoming connection available.
     */
    accept_op = (struct accept_operation*)op_key;

    accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
    accept_op->accept_fd = new_sock;
    accept_op->rmt_addr = remote;
    accept_op->addrlen = addrlen;
    accept_op->local_addr = local;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->accept_list, accept_op);
    ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
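
/*
 * Typical proactor accept pattern (hypothetical sketch): keep an accept
 * outstanding by resubmitting it from the completion callback. The
 * storage referenced by op_key, new_sock and the address buffers must
 * persist across calls:
 *
 *   static void on_accept_complete(pj_ioqueue_key_t *key,
 *                                  pj_ioqueue_op_key_t *op_key,
 *                                  pj_sock_t new_sock, pj_status_t st)
 *   {
 *       struct my_slot *slot = slot_from_op_key(op_key);
 *       if (st == PJ_SUCCESS)
 *           register_client(new_sock);
 *       slot->addrlen = sizeof(slot->remote);
 *       pj_ioqueue_accept(key, op_key, &slot->sock, &slot->local,
 *                         &slot->remote, &slot->addrlen);
 *   }
 */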

/*
 * Initiate overlapped connect() operation (well, it's non-blocking actually,
 * since there's no overlapped version of connect()).
 */
PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
                                        const pj_sockaddr_t *addr,
                                        int addrlen )
{
    pj_status_t status;

    /* Check parameters. All must be specified! */
    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);

    /* Check that the socket hasn't already been marked as connecting. */
    if (key->connecting != 0)
        return PJ_EPENDING;

    status = pj_sock_connect(key->fd, addr, addrlen);
    if (status == PJ_SUCCESS) {
        /* Connected! */
        return PJ_SUCCESS;
    } else {
        if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
            /* Pending! */
            pj_mutex_lock(key->mutex);
            key->connecting = PJ_TRUE;
            ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
            ioqueue_add_to_set(key->ioqueue, key->fd, EXCEPTION_EVENT);
            pj_mutex_unlock(key->mutex);
            return PJ_EPENDING;
        } else {
            /* Error! */
            return status;
        }
    }
}
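
/*
 * As with the other operations, PJ_SUCCESS from pj_ioqueue_connect()
 * means the connection completed synchronously and on_connect_complete()
 * will NOT be called, while PJ_EPENDING means the outcome is reported
 * through on_connect_complete() (status argument zero on success; -1
 * from the exception path or the SO_ERROR value on failure). A
 * hypothetical caller:
 *
 *   status = pj_ioqueue_connect(key, &addr, sizeof(addr));
 *   if (status == PJ_SUCCESS)
 *       on_connected(key);            // immediate
 *   else if (status != PJ_EPENDING)
 *       handle_error(status);
 *   // else: the poll loop will invoke on_connect_complete()
 */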
#endif /* PJ_HAS_TCP */
