blob: 7319f53dfd4a47cac9e2a307dfce8b65c8cd1fa9 [file] [log] [blame]
Tristan Matthews0a329cc2013-07-17 13:20:14 -04001/* $Id: ioqueue_common_abs.c 4537 2013-06-19 06:47:43Z riza $ */
2/*
3 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 */
20
21/*
22 * ioqueue_common_abs.c
23 *
24 * This contains common functionalities to emulate proactor pattern with
25 * various event dispatching mechanisms (e.g. select, epoll).
26 *
27 * This file will be included by the appropriate ioqueue implementation.
28 * This file is NOT supposed to be compiled as stand-alone source.
29 */
30
31#define PENDING_RETRY 2
32
33static void ioqueue_init( pj_ioqueue_t *ioqueue )
34{
35 ioqueue->lock = NULL;
36 ioqueue->auto_delete_lock = 0;
37 ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY;
38}
39
40static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
41{
42 if (ioqueue->auto_delete_lock && ioqueue->lock ) {
43 pj_lock_release(ioqueue->lock);
44 return pj_lock_destroy(ioqueue->lock);
45 }
46
47 return PJ_SUCCESS;
48}
49
50/*
51 * pj_ioqueue_set_lock()
52 */
53PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
54 pj_lock_t *lock,
55 pj_bool_t auto_delete )
56{
57 PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
58
59 if (ioqueue->auto_delete_lock && ioqueue->lock) {
60 pj_lock_destroy(ioqueue->lock);
61 }
62
63 ioqueue->lock = lock;
64 ioqueue->auto_delete_lock = auto_delete;
65
66 return PJ_SUCCESS;
67}
68
69static pj_status_t ioqueue_init_key( pj_pool_t *pool,
70 pj_ioqueue_t *ioqueue,
71 pj_ioqueue_key_t *key,
72 pj_sock_t sock,
73 pj_grp_lock_t *grp_lock,
74 void *user_data,
75 const pj_ioqueue_callback *cb)
76{
77 pj_status_t rc;
78 int optlen;
79
80 PJ_UNUSED_ARG(pool);
81
82 key->ioqueue = ioqueue;
83 key->fd = sock;
84 key->user_data = user_data;
85 pj_list_init(&key->read_list);
86 pj_list_init(&key->write_list);
87#if PJ_HAS_TCP
88 pj_list_init(&key->accept_list);
89 key->connecting = 0;
90#endif
91
92 /* Save callback. */
93 pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
94
95#if PJ_IOQUEUE_HAS_SAFE_UNREG
96 /* Set initial reference count to 1 */
97 pj_assert(key->ref_count == 0);
98 ++key->ref_count;
99
100 key->closing = 0;
101#endif
102
103 rc = pj_ioqueue_set_concurrency(key, ioqueue->default_concurrency);
104 if (rc != PJ_SUCCESS)
105 return rc;
106
107 /* Get socket type. When socket type is datagram, some optimization
108 * will be performed during send to allow parallel send operations.
109 */
110 optlen = sizeof(key->fd_type);
111 rc = pj_sock_getsockopt(sock, pj_SOL_SOCKET(), pj_SO_TYPE(),
112 &key->fd_type, &optlen);
113 if (rc != PJ_SUCCESS)
114 key->fd_type = pj_SOCK_STREAM();
115
116 /* Create mutex for the key. */
117#if !PJ_IOQUEUE_HAS_SAFE_UNREG
118 rc = pj_lock_create_simple_mutex(poll, NULL, &key->lock);
119#endif
120 if (rc != PJ_SUCCESS)
121 return rc;
122
123 /* Group lock */
124 key->grp_lock = grp_lock;
125 if (key->grp_lock) {
126 pj_grp_lock_add_ref_dbg(key->grp_lock, "ioqueue", 0);
127 }
128
129 return PJ_SUCCESS;
130}
131
132/*
133 * pj_ioqueue_get_user_data()
134 *
135 * Obtain value associated with a key.
136 */
137PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
138{
139 PJ_ASSERT_RETURN(key != NULL, NULL);
140 return key->user_data;
141}
142
143/*
144 * pj_ioqueue_set_user_data()
145 */
146PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
147 void *user_data,
148 void **old_data)
149{
150 PJ_ASSERT_RETURN(key, PJ_EINVAL);
151
152 if (old_data)
153 *old_data = key->user_data;
154 key->user_data = user_data;
155
156 return PJ_SUCCESS;
157}
158
159PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
160{
161 return !pj_list_empty(&key->write_list);
162}
163
164PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
165{
166 return !pj_list_empty(&key->read_list);
167}
168
169PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
170{
171#if PJ_HAS_TCP
172 return !pj_list_empty(&key->accept_list);
173#else
174 PJ_UNUSED_ARG(key);
175 return 0;
176#endif
177}
178
179PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
180{
181 return key->connecting;
182}
183
184
185#if PJ_IOQUEUE_HAS_SAFE_UNREG
186# define IS_CLOSING(key) (key->closing)
187#else
188# define IS_CLOSING(key) (0)
189#endif
190
191
192/*
193 * ioqueue_dispatch_event()
194 *
195 * Report occurence of an event in the key to be processed by the
196 * framework.
197 */
198void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
199{
200 /* Lock the key. */
201 pj_ioqueue_lock_key(h);
202
203 if (IS_CLOSING(h)) {
204 pj_ioqueue_unlock_key(h);
205 return;
206 }
207
208#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
209 if (h->connecting) {
210 /* Completion of connect() operation */
211 pj_status_t status;
212 pj_bool_t has_lock;
213
214 /* Clear operation. */
215 h->connecting = 0;
216
217 ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
218 ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);
219
220
221#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
222 /* from connect(2):
223 * On Linux, use getsockopt to read the SO_ERROR option at
224 * level SOL_SOCKET to determine whether connect() completed
225 * successfully (if SO_ERROR is zero).
226 */
227 {
228 int value;
229 int vallen = sizeof(value);
230 int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
231 &value, &vallen);
232 if (gs_rc != 0) {
233 /* Argh!! What to do now???
234 * Just indicate that the socket is connected. The
235 * application will get error as soon as it tries to use
236 * the socket to send/receive.
237 */
238 status = PJ_SUCCESS;
239 } else {
240 status = PJ_STATUS_FROM_OS(value);
241 }
242 }
243#elif (defined(PJ_WIN32) && PJ_WIN32!=0) || (defined(PJ_WIN64) && PJ_WIN64!=0)
244 status = PJ_SUCCESS; /* success */
245#else
246 /* Excellent information in D.J. Bernstein page:
247 * http://cr.yp.to/docs/connect.html
248 *
249 * Seems like the most portable way of detecting connect()
250 * failure is to call getpeername(). If socket is connected,
251 * getpeername() will return 0. If the socket is not connected,
252 * it will return ENOTCONN, and read(fd, &ch, 1) will produce
253 * the right errno through error slippage. This is a combination
254 * of suggestions from Douglas C. Schmidt and Ken Keys.
255 */
256 {
257 struct sockaddr_in addr;
258 int addrlen = sizeof(addr);
259
260 status = pj_sock_getpeername(h->fd, (struct sockaddr*)&addr,
261 &addrlen);
262 }
263#endif
264
265 /* Unlock; from this point we don't need to hold key's mutex
266 * (unless concurrency is disabled, which in this case we should
267 * hold the mutex while calling the callback) */
268 if (h->allow_concurrent) {
269 /* concurrency may be changed while we're in the callback, so
270 * save it to a flag.
271 */
272 has_lock = PJ_FALSE;
273 pj_ioqueue_unlock_key(h);
274 } else {
275 has_lock = PJ_TRUE;
276 }
277
278 /* Call callback. */
279 if (h->cb.on_connect_complete && !IS_CLOSING(h))
280 (*h->cb.on_connect_complete)(h, status);
281
282 /* Unlock if we still hold the lock */
283 if (has_lock) {
284 pj_ioqueue_unlock_key(h);
285 }
286
287 /* Done. */
288
289 } else
290#endif /* PJ_HAS_TCP */
291 if (key_has_pending_write(h)) {
292 /* Socket is writable. */
293 struct write_operation *write_op;
294 pj_ssize_t sent;
295 pj_status_t send_rc = PJ_SUCCESS;
296
297 /* Get the first in the queue. */
298 write_op = h->write_list.next;
299
300 /* For datagrams, we can remove the write_op from the list
301 * so that send() can work in parallel.
302 */
303 if (h->fd_type == pj_SOCK_DGRAM()) {
304 pj_list_erase(write_op);
305
306 if (pj_list_empty(&h->write_list))
307 ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
308
309 }
310
311 /* Send the data.
312 * Unfortunately we must do this while holding key's mutex, thus
313 * preventing parallel write on a single key.. :-((
314 */
315 sent = write_op->size - write_op->written;
316 if (write_op->op == PJ_IOQUEUE_OP_SEND) {
317 send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
318 &sent, write_op->flags);
319 /* Can't do this. We only clear "op" after we're finished sending
320 * the whole buffer.
321 */
322 //write_op->op = 0;
323 } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
324 int retry = 2;
325 while (--retry >= 0) {
326 send_rc = pj_sock_sendto(h->fd,
327 write_op->buf+write_op->written,
328 &sent, write_op->flags,
329 &write_op->rmt_addr,
330 write_op->rmt_addrlen);
331#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
332 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
333 /* Special treatment for dead UDP sockets here, see ticket #1107 */
334 if (send_rc==PJ_STATUS_FROM_OS(EPIPE) && !IS_CLOSING(h) &&
335 h->fd_type==pj_SOCK_DGRAM())
336 {
337 PJ_PERROR(4,(THIS_FILE, send_rc,
338 "Send error for socket %d, retrying",
339 h->fd));
340 replace_udp_sock(h);
341 continue;
342 }
343#endif
344 break;
345 }
346
347 /* Can't do this. We only clear "op" after we're finished sending
348 * the whole buffer.
349 */
350 //write_op->op = 0;
351 } else {
352 pj_assert(!"Invalid operation type!");
353 write_op->op = PJ_IOQUEUE_OP_NONE;
354 send_rc = PJ_EBUG;
355 }
356
357 if (send_rc == PJ_SUCCESS) {
358 write_op->written += sent;
359 } else {
360 pj_assert(send_rc > 0);
361 write_op->written = -send_rc;
362 }
363
364 /* Are we finished with this buffer? */
365 if (send_rc!=PJ_SUCCESS ||
366 write_op->written == (pj_ssize_t)write_op->size ||
367 h->fd_type == pj_SOCK_DGRAM())
368 {
369 pj_bool_t has_lock;
370
371 write_op->op = PJ_IOQUEUE_OP_NONE;
372
373 if (h->fd_type != pj_SOCK_DGRAM()) {
374 /* Write completion of the whole stream. */
375 pj_list_erase(write_op);
376
377 /* Clear operation if there's no more data to send. */
378 if (pj_list_empty(&h->write_list))
379 ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
380
381 }
382
383 /* Unlock; from this point we don't need to hold key's mutex
384 * (unless concurrency is disabled, which in this case we should
385 * hold the mutex while calling the callback) */
386 if (h->allow_concurrent) {
387 /* concurrency may be changed while we're in the callback, so
388 * save it to a flag.
389 */
390 has_lock = PJ_FALSE;
391 pj_ioqueue_unlock_key(h);
392 PJ_RACE_ME(5);
393 } else {
394 has_lock = PJ_TRUE;
395 }
396
397 /* Call callback. */
398 if (h->cb.on_write_complete && !IS_CLOSING(h)) {
399 (*h->cb.on_write_complete)(h,
400 (pj_ioqueue_op_key_t*)write_op,
401 write_op->written);
402 }
403
404 if (has_lock) {
405 pj_ioqueue_unlock_key(h);
406 }
407
408 } else {
409 pj_ioqueue_unlock_key(h);
410 }
411
412 /* Done. */
413 } else {
414 /*
415 * This is normal; execution may fall here when multiple threads
416 * are signalled for the same event, but only one thread eventually
417 * able to process the event.
418 */
419 pj_ioqueue_unlock_key(h);
420 }
421}
422
423void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
424{
425 pj_status_t rc;
426
427 /* Lock the key. */
428 pj_ioqueue_lock_key(h);
429
430 if (IS_CLOSING(h)) {
431 pj_ioqueue_unlock_key(h);
432 return;
433 }
434
435# if PJ_HAS_TCP
436 if (!pj_list_empty(&h->accept_list)) {
437
438 struct accept_operation *accept_op;
439 pj_bool_t has_lock;
440
441 /* Get one accept operation from the list. */
442 accept_op = h->accept_list.next;
443 pj_list_erase(accept_op);
444 accept_op->op = PJ_IOQUEUE_OP_NONE;
445
446 /* Clear bit in fdset if there is no more pending accept */
447 if (pj_list_empty(&h->accept_list))
448 ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);
449
450 rc=pj_sock_accept(h->fd, accept_op->accept_fd,
451 accept_op->rmt_addr, accept_op->addrlen);
452 if (rc==PJ_SUCCESS && accept_op->local_addr) {
453 rc = pj_sock_getsockname(*accept_op->accept_fd,
454 accept_op->local_addr,
455 accept_op->addrlen);
456 }
457
458 /* Unlock; from this point we don't need to hold key's mutex
459 * (unless concurrency is disabled, which in this case we should
460 * hold the mutex while calling the callback) */
461 if (h->allow_concurrent) {
462 /* concurrency may be changed while we're in the callback, so
463 * save it to a flag.
464 */
465 has_lock = PJ_FALSE;
466 pj_ioqueue_unlock_key(h);
467 PJ_RACE_ME(5);
468 } else {
469 has_lock = PJ_TRUE;
470 }
471
472 /* Call callback. */
473 if (h->cb.on_accept_complete && !IS_CLOSING(h)) {
474 (*h->cb.on_accept_complete)(h,
475 (pj_ioqueue_op_key_t*)accept_op,
476 *accept_op->accept_fd, rc);
477 }
478
479 if (has_lock) {
480 pj_ioqueue_unlock_key(h);
481 }
482 }
483 else
484# endif
485 if (key_has_pending_read(h)) {
486 struct read_operation *read_op;
487 pj_ssize_t bytes_read;
488 pj_bool_t has_lock;
489
490 /* Get one pending read operation from the list. */
491 read_op = h->read_list.next;
492 pj_list_erase(read_op);
493
494 /* Clear fdset if there is no pending read. */
495 if (pj_list_empty(&h->read_list))
496 ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);
497
498 bytes_read = read_op->size;
499
500 if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {
501 read_op->op = PJ_IOQUEUE_OP_NONE;
502 rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read,
503 read_op->flags,
504 read_op->rmt_addr,
505 read_op->rmt_addrlen);
506 } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) {
507 read_op->op = PJ_IOQUEUE_OP_NONE;
508 rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read,
509 read_op->flags);
510 } else {
511 pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
512 read_op->op = PJ_IOQUEUE_OP_NONE;
513 /*
514 * User has specified pj_ioqueue_read().
515 * On Win32, we should do ReadFile(). But because we got
516 * here because of select() anyway, user must have put a
517 * socket descriptor on h->fd, which in this case we can
518 * just call pj_sock_recv() instead of ReadFile().
519 * On Unix, user may put a file in h->fd, so we'll have
520 * to call read() here.
521 * This may not compile on systems which doesn't have
522 * read(). That's why we only specify PJ_LINUX here so
523 * that error is easier to catch.
524 */
525# if defined(PJ_WIN32) && PJ_WIN32 != 0 || \
526 defined(PJ_WIN64) && PJ_WIN64 != 0 || \
527 defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE != 0
528 rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read,
529 read_op->flags);
530 //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
531 // &bytes_read, NULL);
532# elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
533 bytes_read = read(h->fd, read_op->buf, bytes_read);
534 rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
535# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
536 bytes_read = sys_read(h->fd, read_op->buf, bytes_read);
537 rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
538# else
539# error "Implement read() for this platform!"
540# endif
541 }
542
543 if (rc != PJ_SUCCESS) {
544# if (defined(PJ_WIN32) && PJ_WIN32 != 0) || \
545 (defined(PJ_WIN64) && PJ_WIN64 != 0)
546 /* On Win32, for UDP, WSAECONNRESET on the receive side
547 * indicates that previous sending has triggered ICMP Port
548 * Unreachable message.
549 * But we wouldn't know at this point which one of previous
550 * key that has triggered the error, since UDP socket can
551 * be shared!
552 * So we'll just ignore it!
553 */
554
555 if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
556 //PJ_LOG(4,(THIS_FILE,
557 // "Ignored ICMP port unreach. on key=%p", h));
558 }
559# endif
560
561 /* In any case we would report this to caller. */
562 bytes_read = -rc;
563
564#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
565 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
566 /* Special treatment for dead UDP sockets here, see ticket #1107 */
567 if (rc == PJ_STATUS_FROM_OS(ENOTCONN) && !IS_CLOSING(h) &&
568 h->fd_type==pj_SOCK_DGRAM())
569 {
570 replace_udp_sock(h);
571 }
572#endif
573 }
574
575 /* Unlock; from this point we don't need to hold key's mutex
576 * (unless concurrency is disabled, which in this case we should
577 * hold the mutex while calling the callback) */
578 if (h->allow_concurrent) {
579 /* concurrency may be changed while we're in the callback, so
580 * save it to a flag.
581 */
582 has_lock = PJ_FALSE;
583 pj_ioqueue_unlock_key(h);
584 PJ_RACE_ME(5);
585 } else {
586 has_lock = PJ_TRUE;
587 }
588
589 /* Call callback. */
590 if (h->cb.on_read_complete && !IS_CLOSING(h)) {
591 (*h->cb.on_read_complete)(h,
592 (pj_ioqueue_op_key_t*)read_op,
593 bytes_read);
594 }
595
596 if (has_lock) {
597 pj_ioqueue_unlock_key(h);
598 }
599
600 } else {
601 /*
602 * This is normal; execution may fall here when multiple threads
603 * are signalled for the same event, but only one thread eventually
604 * able to process the event.
605 */
606 pj_ioqueue_unlock_key(h);
607 }
608}
609
610
611void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
612 pj_ioqueue_key_t *h )
613{
614 pj_bool_t has_lock;
615
616 pj_ioqueue_lock_key(h);
617
618 if (!h->connecting) {
619 /* It is possible that more than one thread was woken up, thus
620 * the remaining thread will see h->connecting as zero because
621 * it has been processed by other thread.
622 */
623 pj_ioqueue_unlock_key(h);
624 return;
625 }
626
627 if (IS_CLOSING(h)) {
628 pj_ioqueue_unlock_key(h);
629 return;
630 }
631
632 /* Clear operation. */
633 h->connecting = 0;
634
635 ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
636 ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);
637
638 /* Unlock; from this point we don't need to hold key's mutex
639 * (unless concurrency is disabled, which in this case we should
640 * hold the mutex while calling the callback) */
641 if (h->allow_concurrent) {
642 /* concurrency may be changed while we're in the callback, so
643 * save it to a flag.
644 */
645 has_lock = PJ_FALSE;
646 pj_ioqueue_unlock_key(h);
647 PJ_RACE_ME(5);
648 } else {
649 has_lock = PJ_TRUE;
650 }
651
652 /* Call callback. */
653 if (h->cb.on_connect_complete && !IS_CLOSING(h)) {
654 pj_status_t status = -1;
655#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
656 int value;
657 int vallen = sizeof(value);
658 int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
659 &value, &vallen);
660 if (gs_rc == 0) {
661 status = PJ_RETURN_OS_ERROR(value);
662 }
663#endif
664
665 (*h->cb.on_connect_complete)(h, status);
666 }
667
668 if (has_lock) {
669 pj_ioqueue_unlock_key(h);
670 }
671}
672
673/*
674 * pj_ioqueue_recv()
675 *
676 * Start asynchronous recv() from the socket.
677 */
678PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
679 pj_ioqueue_op_key_t *op_key,
680 void *buffer,
681 pj_ssize_t *length,
682 unsigned flags )
683{
684 struct read_operation *read_op;
685
686 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
687 PJ_CHECK_STACK();
688
689 /* Check if key is closing (need to do this first before accessing
690 * other variables, since they might have been destroyed. See ticket
691 * #469).
692 */
693 if (IS_CLOSING(key))
694 return PJ_ECANCELLED;
695
696 read_op = (struct read_operation*)op_key;
697 read_op->op = PJ_IOQUEUE_OP_NONE;
698
699 /* Try to see if there's data immediately available.
700 */
701 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
702 pj_status_t status;
703 pj_ssize_t size;
704
705 size = *length;
706 status = pj_sock_recv(key->fd, buffer, &size, flags);
707 if (status == PJ_SUCCESS) {
708 /* Yes! Data is available! */
709 *length = size;
710 return PJ_SUCCESS;
711 } else {
712 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
713 * the error to caller.
714 */
715 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
716 return status;
717 }
718 }
719
720 flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
721
722 /*
723 * No data is immediately available.
724 * Must schedule asynchronous operation to the ioqueue.
725 */
726 read_op->op = PJ_IOQUEUE_OP_RECV;
727 read_op->buf = buffer;
728 read_op->size = *length;
729 read_op->flags = flags;
730
731 pj_ioqueue_lock_key(key);
732 /* Check again. Handle may have been closed after the previous check
733 * in multithreaded app. If we add bad handle to the set it will
734 * corrupt the ioqueue set. See #913
735 */
736 if (IS_CLOSING(key)) {
737 pj_ioqueue_unlock_key(key);
738 return PJ_ECANCELLED;
739 }
740 pj_list_insert_before(&key->read_list, read_op);
741 ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
742 pj_ioqueue_unlock_key(key);
743
744 return PJ_EPENDING;
745}
746
747/*
748 * pj_ioqueue_recvfrom()
749 *
750 * Start asynchronous recvfrom() from the socket.
751 */
752PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
753 pj_ioqueue_op_key_t *op_key,
754 void *buffer,
755 pj_ssize_t *length,
756 unsigned flags,
757 pj_sockaddr_t *addr,
758 int *addrlen)
759{
760 struct read_operation *read_op;
761
762 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
763 PJ_CHECK_STACK();
764
765 /* Check if key is closing. */
766 if (IS_CLOSING(key))
767 return PJ_ECANCELLED;
768
769 read_op = (struct read_operation*)op_key;
770 read_op->op = PJ_IOQUEUE_OP_NONE;
771
772 /* Try to see if there's data immediately available.
773 */
774 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
775 pj_status_t status;
776 pj_ssize_t size;
777
778 size = *length;
779 status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
780 addr, addrlen);
781 if (status == PJ_SUCCESS) {
782 /* Yes! Data is available! */
783 *length = size;
784 return PJ_SUCCESS;
785 } else {
786 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
787 * the error to caller.
788 */
789 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
790 return status;
791 }
792 }
793
794 flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
795
796 /*
797 * No data is immediately available.
798 * Must schedule asynchronous operation to the ioqueue.
799 */
800 read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
801 read_op->buf = buffer;
802 read_op->size = *length;
803 read_op->flags = flags;
804 read_op->rmt_addr = addr;
805 read_op->rmt_addrlen = addrlen;
806
807 pj_ioqueue_lock_key(key);
808 /* Check again. Handle may have been closed after the previous check
809 * in multithreaded app. If we add bad handle to the set it will
810 * corrupt the ioqueue set. See #913
811 */
812 if (IS_CLOSING(key)) {
813 pj_ioqueue_unlock_key(key);
814 return PJ_ECANCELLED;
815 }
816 pj_list_insert_before(&key->read_list, read_op);
817 ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
818 pj_ioqueue_unlock_key(key);
819
820 return PJ_EPENDING;
821}
822
823/*
824 * pj_ioqueue_send()
825 *
826 * Start asynchronous send() to the descriptor.
827 */
828PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
829 pj_ioqueue_op_key_t *op_key,
830 const void *data,
831 pj_ssize_t *length,
832 unsigned flags)
833{
834 struct write_operation *write_op;
835 pj_status_t status;
836 unsigned retry;
837 pj_ssize_t sent;
838
839 PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
840 PJ_CHECK_STACK();
841
842 /* Check if key is closing. */
843 if (IS_CLOSING(key))
844 return PJ_ECANCELLED;
845
846 /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write. */
847 flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
848
849 /* Fast track:
850 * Try to send data immediately, only if there's no pending write!
851 * Note:
852 * We are speculating that the list is empty here without properly
853 * acquiring ioqueue's mutex first. This is intentional, to maximize
854 * performance via parallelism.
855 *
856 * This should be safe, because:
857 * - by convention, we require caller to make sure that the
858 * key is not unregistered while other threads are invoking
859 * an operation on the same key.
860 * - pj_list_empty() is safe to be invoked by multiple threads,
861 * even when other threads are modifying the list.
862 */
863 if (pj_list_empty(&key->write_list)) {
864 /*
865 * See if data can be sent immediately.
866 */
867 sent = *length;
868 status = pj_sock_send(key->fd, data, &sent, flags);
869 if (status == PJ_SUCCESS) {
870 /* Success! */
871 *length = sent;
872 return PJ_SUCCESS;
873 } else {
874 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
875 * the error to caller.
876 */
877 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
878 return status;
879 }
880 }
881 }
882
883 /*
884 * Schedule asynchronous send.
885 */
886 write_op = (struct write_operation*)op_key;
887
888 /* Spin if write_op has pending operation */
889 for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
890 pj_thread_sleep(0);
891
892 /* Last chance */
893 if (write_op->op) {
894 /* Unable to send packet because there is already pending write in the
895 * write_op. We could not put the operation into the write_op
896 * because write_op already contains a pending operation! And
897 * we could not send the packet directly with send() either,
898 * because that will break the order of the packet. So we can
899 * only return error here.
900 *
901 * This could happen for example in multithreads program,
902 * where polling is done by one thread, while other threads are doing
903 * the sending only. If the polling thread runs on lower priority
904 * than the sending thread, then it's possible that the pending
905 * write flag is not cleared in-time because clearing is only done
906 * during polling.
907 *
908 * Aplication should specify multiple write operation keys on
909 * situation like this.
910 */
911 //pj_assert(!"ioqueue: there is pending operation on this key!");
912 return PJ_EBUSY;
913 }
914
915 write_op->op = PJ_IOQUEUE_OP_SEND;
916 write_op->buf = (char*)data;
917 write_op->size = *length;
918 write_op->written = 0;
919 write_op->flags = flags;
920
921 pj_ioqueue_lock_key(key);
922 /* Check again. Handle may have been closed after the previous check
923 * in multithreaded app. If we add bad handle to the set it will
924 * corrupt the ioqueue set. See #913
925 */
926 if (IS_CLOSING(key)) {
927 pj_ioqueue_unlock_key(key);
928 return PJ_ECANCELLED;
929 }
930 pj_list_insert_before(&key->write_list, write_op);
931 ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
932 pj_ioqueue_unlock_key(key);
933
934 return PJ_EPENDING;
935}
936
937
938/*
939 * pj_ioqueue_sendto()
940 *
941 * Start asynchronous write() to the descriptor.
942 */
943PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
944 pj_ioqueue_op_key_t *op_key,
945 const void *data,
946 pj_ssize_t *length,
947 pj_uint32_t flags,
948 const pj_sockaddr_t *addr,
949 int addrlen)
950{
951 struct write_operation *write_op;
952 unsigned retry;
953 pj_bool_t restart_retry = PJ_FALSE;
954 pj_status_t status;
955 pj_ssize_t sent;
956
957 PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
958 PJ_CHECK_STACK();
959
960#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
961 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
962retry_on_restart:
963#else
964 PJ_UNUSED_ARG(restart_retry);
965#endif
966 /* Check if key is closing. */
967 if (IS_CLOSING(key))
968 return PJ_ECANCELLED;
969
970 /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write */
971 flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
972
973 /* Fast track:
974 * Try to send data immediately, only if there's no pending write!
975 * Note:
976 * We are speculating that the list is empty here without properly
977 * acquiring ioqueue's mutex first. This is intentional, to maximize
978 * performance via parallelism.
979 *
980 * This should be safe, because:
981 * - by convention, we require caller to make sure that the
982 * key is not unregistered while other threads are invoking
983 * an operation on the same key.
984 * - pj_list_empty() is safe to be invoked by multiple threads,
985 * even when other threads are modifying the list.
986 */
987 if (pj_list_empty(&key->write_list)) {
988 /*
989 * See if data can be sent immediately.
990 */
991 sent = *length;
992 status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
993 if (status == PJ_SUCCESS) {
994 /* Success! */
995 *length = sent;
996 return PJ_SUCCESS;
997 } else {
998 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
999 * the error to caller.
1000 */
1001 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
1002#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
1003 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
1004 /* Special treatment for dead UDP sockets here, see ticket #1107 */
1005 if (status==PJ_STATUS_FROM_OS(EPIPE) && !IS_CLOSING(key) &&
1006 key->fd_type==pj_SOCK_DGRAM() && !restart_retry)
1007 {
1008 PJ_PERROR(4,(THIS_FILE, status,
1009 "Send error for socket %d, retrying",
1010 key->fd));
1011 replace_udp_sock(key);
1012 restart_retry = PJ_TRUE;
1013 goto retry_on_restart;
1014 }
1015#endif
1016
1017 return status;
1018 }
1019 status = status;
1020 }
1021 }
1022
1023 /*
1024 * Check that address storage can hold the address parameter.
1025 */
1026 PJ_ASSERT_RETURN(addrlen <= (int)sizeof(pj_sockaddr_in), PJ_EBUG);
1027
1028 /*
1029 * Schedule asynchronous send.
1030 */
1031 write_op = (struct write_operation*)op_key;
1032
1033 /* Spin if write_op has pending operation */
1034 for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
1035 pj_thread_sleep(0);
1036
1037 /* Last chance */
1038 if (write_op->op) {
1039 /* Unable to send packet because there is already pending write on the
1040 * write_op. We could not put the operation into the write_op
1041 * because write_op already contains a pending operation! And
1042 * we could not send the packet directly with sendto() either,
1043 * because that will break the order of the packet. So we can
1044 * only return error here.
1045 *
1046 * This could happen for example in multithreads program,
1047 * where polling is done by one thread, while other threads are doing
1048 * the sending only. If the polling thread runs on lower priority
1049 * than the sending thread, then it's possible that the pending
1050 * write flag is not cleared in-time because clearing is only done
1051 * during polling.
1052 *
1053 * Aplication should specify multiple write operation keys on
1054 * situation like this.
1055 */
1056 //pj_assert(!"ioqueue: there is pending operation on this key!");
1057 return PJ_EBUSY;
1058 }
1059
1060 write_op->op = PJ_IOQUEUE_OP_SEND_TO;
1061 write_op->buf = (char*)data;
1062 write_op->size = *length;
1063 write_op->written = 0;
1064 write_op->flags = flags;
1065 pj_memcpy(&write_op->rmt_addr, addr, addrlen);
1066 write_op->rmt_addrlen = addrlen;
1067
1068 pj_ioqueue_lock_key(key);
1069 /* Check again. Handle may have been closed after the previous check
1070 * in multithreaded app. If we add bad handle to the set it will
1071 * corrupt the ioqueue set. See #913
1072 */
1073 if (IS_CLOSING(key)) {
1074 pj_ioqueue_unlock_key(key);
1075 return PJ_ECANCELLED;
1076 }
1077 pj_list_insert_before(&key->write_list, write_op);
1078 ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
1079 pj_ioqueue_unlock_key(key);
1080
1081 return PJ_EPENDING;
1082}
1083
1084#if PJ_HAS_TCP
1085/*
1086 * Initiate overlapped accept() operation.
1087 */
1088PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
1089 pj_ioqueue_op_key_t *op_key,
1090 pj_sock_t *new_sock,
1091 pj_sockaddr_t *local,
1092 pj_sockaddr_t *remote,
1093 int *addrlen)
1094{
1095 struct accept_operation *accept_op;
1096 pj_status_t status;
1097
1098 /* check parameters. All must be specified! */
1099 PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
1100
1101 /* Check if key is closing. */
1102 if (IS_CLOSING(key))
1103 return PJ_ECANCELLED;
1104
1105 accept_op = (struct accept_operation*)op_key;
1106 accept_op->op = PJ_IOQUEUE_OP_NONE;
1107
1108 /* Fast track:
1109 * See if there's new connection available immediately.
1110 */
1111 if (pj_list_empty(&key->accept_list)) {
1112 status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
1113 if (status == PJ_SUCCESS) {
1114 /* Yes! New connection is available! */
1115 if (local && addrlen) {
1116 status = pj_sock_getsockname(*new_sock, local, addrlen);
1117 if (status != PJ_SUCCESS) {
1118 pj_sock_close(*new_sock);
1119 *new_sock = PJ_INVALID_SOCKET;
1120 return status;
1121 }
1122 }
1123 return PJ_SUCCESS;
1124 } else {
1125 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
1126 * the error to caller.
1127 */
1128 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
1129 return status;
1130 }
1131 }
1132 }
1133
1134 /*
1135 * No connection is available immediately.
1136 * Schedule accept() operation to be completed when there is incoming
1137 * connection available.
1138 */
1139 accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
1140 accept_op->accept_fd = new_sock;
1141 accept_op->rmt_addr = remote;
1142 accept_op->addrlen= addrlen;
1143 accept_op->local_addr = local;
1144
1145 pj_ioqueue_lock_key(key);
1146 /* Check again. Handle may have been closed after the previous check
1147 * in multithreaded app. If we add bad handle to the set it will
1148 * corrupt the ioqueue set. See #913
1149 */
1150 if (IS_CLOSING(key)) {
1151 pj_ioqueue_unlock_key(key);
1152 return PJ_ECANCELLED;
1153 }
1154 pj_list_insert_before(&key->accept_list, accept_op);
1155 ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
1156 pj_ioqueue_unlock_key(key);
1157
1158 return PJ_EPENDING;
1159}
1160
1161/*
1162 * Initiate overlapped connect() operation (well, it's non-blocking actually,
1163 * since there's no overlapped version of connect()).
1164 */
1165PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
1166 const pj_sockaddr_t *addr,
1167 int addrlen )
1168{
1169 pj_status_t status;
1170
1171 /* check parameters. All must be specified! */
1172 PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
1173
1174 /* Check if key is closing. */
1175 if (IS_CLOSING(key))
1176 return PJ_ECANCELLED;
1177
1178 /* Check if socket has not been marked for connecting */
1179 if (key->connecting != 0)
1180 return PJ_EPENDING;
1181
1182 status = pj_sock_connect(key->fd, addr, addrlen);
1183 if (status == PJ_SUCCESS) {
1184 /* Connected! */
1185 return PJ_SUCCESS;
1186 } else {
1187 if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
1188 /* Pending! */
1189 pj_ioqueue_lock_key(key);
1190 /* Check again. Handle may have been closed after the previous
1191 * check in multithreaded app. See #913
1192 */
1193 if (IS_CLOSING(key)) {
1194 pj_ioqueue_unlock_key(key);
1195 return PJ_ECANCELLED;
1196 }
1197 key->connecting = PJ_TRUE;
1198 ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
1199 ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT);
1200 pj_ioqueue_unlock_key(key);
1201 return PJ_EPENDING;
1202 } else {
1203 /* Error! */
1204 return status;
1205 }
1206 }
1207}
1208#endif /* PJ_HAS_TCP */
1209
1210
1211PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
1212 pj_size_t size )
1213{
1214 pj_bzero(op_key, size);
1215}
1216
1217
1218/*
1219 * pj_ioqueue_is_pending()
1220 */
1221PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
1222 pj_ioqueue_op_key_t *op_key )
1223{
1224 struct generic_operation *op_rec;
1225
1226 PJ_UNUSED_ARG(key);
1227
1228 op_rec = (struct generic_operation*)op_key;
1229 return op_rec->op != 0;
1230}
1231
1232
1233/*
1234 * pj_ioqueue_post_completion()
1235 */
1236PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
1237 pj_ioqueue_op_key_t *op_key,
1238 pj_ssize_t bytes_status )
1239{
1240 struct generic_operation *op_rec;
1241
1242 /*
1243 * Find the operation key in all pending operation list to
1244 * really make sure that it's still there; then call the callback.
1245 */
1246 pj_ioqueue_lock_key(key);
1247
1248 /* Find the operation in the pending read list. */
1249 op_rec = (struct generic_operation*)key->read_list.next;
1250 while (op_rec != (void*)&key->read_list) {
1251 if (op_rec == (void*)op_key) {
1252 pj_list_erase(op_rec);
1253 op_rec->op = PJ_IOQUEUE_OP_NONE;
1254 pj_ioqueue_unlock_key(key);
1255
1256 (*key->cb.on_read_complete)(key, op_key, bytes_status);
1257 return PJ_SUCCESS;
1258 }
1259 op_rec = op_rec->next;
1260 }
1261
1262 /* Find the operation in the pending write list. */
1263 op_rec = (struct generic_operation*)key->write_list.next;
1264 while (op_rec != (void*)&key->write_list) {
1265 if (op_rec == (void*)op_key) {
1266 pj_list_erase(op_rec);
1267 op_rec->op = PJ_IOQUEUE_OP_NONE;
1268 pj_ioqueue_unlock_key(key);
1269
1270 (*key->cb.on_write_complete)(key, op_key, bytes_status);
1271 return PJ_SUCCESS;
1272 }
1273 op_rec = op_rec->next;
1274 }
1275
1276 /* Find the operation in the pending accept list. */
1277 op_rec = (struct generic_operation*)key->accept_list.next;
1278 while (op_rec != (void*)&key->accept_list) {
1279 if (op_rec == (void*)op_key) {
1280 pj_list_erase(op_rec);
1281 op_rec->op = PJ_IOQUEUE_OP_NONE;
1282 pj_ioqueue_unlock_key(key);
1283
1284 (*key->cb.on_accept_complete)(key, op_key,
1285 PJ_INVALID_SOCKET,
1286 (pj_status_t)bytes_status);
1287 return PJ_SUCCESS;
1288 }
1289 op_rec = op_rec->next;
1290 }
1291
1292 pj_ioqueue_unlock_key(key);
1293
1294 return PJ_EINVALIDOP;
1295}
1296
1297PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency( pj_ioqueue_t *ioqueue,
1298 pj_bool_t allow)
1299{
1300 PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
1301 ioqueue->default_concurrency = allow;
1302 return PJ_SUCCESS;
1303}
1304
1305
1306PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
1307 pj_bool_t allow)
1308{
1309 PJ_ASSERT_RETURN(key, PJ_EINVAL);
1310
1311 /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
1312 * disabled.
1313 */
1314 PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);
1315
1316 key->allow_concurrent = allow;
1317 return PJ_SUCCESS;
1318}
1319
1320PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
1321{
1322 if (key->grp_lock)
1323 return pj_grp_lock_acquire(key->grp_lock);
1324 else
1325 return pj_lock_acquire(key->lock);
1326}
1327
1328PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
1329{
1330 if (key->grp_lock)
1331 return pj_grp_lock_release(key->grp_lock);
1332 else
1333 return pj_lock_release(key->lock);
1334}
1335
1336