blob: a16aba2d348239a60f156067e3ec1c2964cfd72c [file] [log] [blame]
Benny Prijonodd859a62005-11-01 16:42:51 +00001/* $Header: /pjproject-0.3/pjlib/src/pj/ioqueue_select.c 15 10/29/05 10:27p Bennylp $ */
2/* $Log: /pjproject-0.3/pjlib/src/pj/ioqueue_select.c $
3 *
4 * 15 10/29/05 10:27p Bennylp
5 * Fixed misc warnings.
6 *
7 * 14 10/29/05 11:31a Bennylp
8 * Changed accept and lock.
9 *
10 * 13 10/14/05 12:26a Bennylp
11 * Finished error code framework, some fixes in ioqueue, etc. Pretty
12 * major.
13 *
14 * 12 9/21/05 1:39p Bennylp
15 * Periodic checkin for backup.
16 *
17 * 11 9/17/05 10:37a Bennylp
18 * Major reorganization towards version 0.3.
19 *
20 */
21
22/*
23 * sock_select.c
24 *
25 * This is the implementation of IOQueue using pj_sock_select().
26 * It runs anywhere where pj_sock_select() is available (currently
27 * Win32, Linux, Linux kernel, etc.).
28 */
29
30#include <pj/ioqueue.h>
31#include <pj/os.h>
32#include <pj/lock.h>
33#include <pj/log.h>
34#include <pj/list.h>
35#include <pj/pool.h>
36#include <pj/string.h>
37#include <pj/assert.h>
38#include <pj/sock.h>
39#include <pj/compat/socket.h>
40#include <pj/sock_select.h>
41#include <pj/errno.h>
42
43/*
44 * ISSUES with ioqueue_select()
45 *
46 * EAGAIN/EWOULDBLOCK error in recv():
47 * - when multiple threads are working with the ioqueue, application
48 * may receive EAGAIN or EWOULDBLOCK in the receive callback.
49 * This error happens because more than one thread is watching for
50 * the same descriptor set, so when all of them call recv() or recvfrom()
51 * simultaneously, only one will succeed and the rest will get the error.
52 *
53 */
54#define THIS_FILE "ioq_select"
55
56#define PJ_IOQUEUE_IS_READ_OP(op) ((op & PJ_IOQUEUE_OP_READ) || \
57 (op & PJ_IOQUEUE_OP_RECV) || \
58 (op & PJ_IOQUEUE_OP_RECV_FROM))
59#define PJ_IOQUEUE_IS_WRITE_OP(op) ((op & PJ_IOQUEUE_OP_WRITE) || \
60 (op & PJ_IOQUEUE_OP_SEND) || \
61 (op & PJ_IOQUEUE_OP_SEND_TO))
62
63
64#if PJ_HAS_TCP
65# define PJ_IOQUEUE_IS_ACCEPT_OP(op) (op & PJ_IOQUEUE_OP_ACCEPT)
66# define PJ_IOQUEUE_IS_CONNECT_OP(op) (op & PJ_IOQUEUE_OP_CONNECT)
67#else
68# define PJ_IOQUEUE_IS_ACCEPT_OP(op) 0
69# define PJ_IOQUEUE_IS_CONNECT_OP(op) 0
70#endif
71
72/*
73 * During debugging build, VALIDATE_FD_SET is set.
74 * This will check the validity of the fd_sets.
75 */
76#if defined(PJ_DEBUG) && PJ_DEBUG != 0
77# define VALIDATE_FD_SET 1
78#else
79# define VALIDATE_FD_SET 0
80#endif
81
82/*
83 * This describes each key.
84 */
85struct pj_ioqueue_key_t
86{
87 PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t)
88 pj_sock_t fd;
89 pj_ioqueue_operation_e op;
90 void *user_data;
91 pj_ioqueue_callback cb;
92
93 void *rd_buf;
94 unsigned rd_flags;
95 pj_size_t rd_buflen;
96 void *wr_buf;
97 pj_size_t wr_buflen;
98
99 pj_sockaddr_t *rmt_addr;
100 int *rmt_addrlen;
101
102 pj_sockaddr_t *local_addr;
103 int *local_addrlen;
104
105 pj_sock_t *accept_fd;
106};
107
108/*
109 * This describes the I/O queue itself.
110 */
111struct pj_ioqueue_t
112{
113 pj_lock_t *lock;
114 pj_bool_t auto_delete_lock;
115 unsigned max, count;
116 pj_ioqueue_key_t hlist;
117 pj_fd_set_t rfdset;
118 pj_fd_set_t wfdset;
119#if PJ_HAS_TCP
120 pj_fd_set_t xfdset;
121#endif
122};
123
124/*
125 * pj_ioqueue_create()
126 *
127 * Create select ioqueue.
128 */
129PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
130 pj_size_t max_fd,
131 int max_threads,
132 pj_ioqueue_t **p_ioqueue)
133{
134 pj_ioqueue_t *ioque;
135 pj_status_t rc;
136
137 PJ_UNUSED_ARG(max_threads);
138
139 if (max_fd > PJ_IOQUEUE_MAX_HANDLES) {
140 pj_assert(!"max_fd too large");
141 return PJ_EINVAL;
142 }
143
144 ioque = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
145 ioque->max = max_fd;
146 ioque->count = 0;
147 PJ_FD_ZERO(&ioque->rfdset);
148 PJ_FD_ZERO(&ioque->wfdset);
149#if PJ_HAS_TCP
150 PJ_FD_ZERO(&ioque->xfdset);
151#endif
152 pj_list_init(&ioque->hlist);
153
154 rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioque->lock);
155 if (rc != PJ_SUCCESS)
156 return rc;
157
158 ioque->auto_delete_lock = PJ_TRUE;
159
160 PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioque));
161
162 *p_ioqueue = ioque;
163 return PJ_SUCCESS;
164}
165
166/*
167 * pj_ioqueue_destroy()
168 *
169 * Destroy ioqueue.
170 */
171PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioque)
172{
173 pj_status_t rc = PJ_SUCCESS;
174
175 PJ_ASSERT_RETURN(ioque, PJ_EINVAL);
176
177 if (ioque->auto_delete_lock)
178 rc = pj_lock_destroy(ioque->lock);
179
180 return rc;
181}
182
183
184/*
185 * pj_ioqueue_set_lock()
186 */
187PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque,
188 pj_lock_t *lock,
189 pj_bool_t auto_delete )
190{
191 PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL);
192
193 if (ioque->auto_delete_lock) {
194 pj_lock_destroy(ioque->lock);
195 }
196
197 ioque->lock = lock;
198 ioque->auto_delete_lock = auto_delete;
199
200 return PJ_SUCCESS;
201}
202
203
204/*
205 * pj_ioqueue_register_sock()
206 *
207 * Register a handle to ioqueue.
208 */
209PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
210 pj_ioqueue_t *ioque,
211 pj_sock_t sock,
212 void *user_data,
213 const pj_ioqueue_callback *cb,
214 pj_ioqueue_key_t **p_key)
215{
216 pj_ioqueue_key_t *key = NULL;
217 pj_uint32_t value;
218 pj_status_t rc = PJ_SUCCESS;
219
220 PJ_ASSERT_RETURN(pool && ioque && sock != PJ_INVALID_SOCKET &&
221 cb && p_key, PJ_EINVAL);
222
223 pj_lock_acquire(ioque->lock);
224
225 if (ioque->count >= ioque->max) {
226 rc = PJ_ETOOMANY;
227 goto on_return;
228 }
229
230 /* Set socket to nonblocking. */
231 value = 1;
232#ifdef PJ_WIN32
233 if (ioctlsocket(sock, FIONBIO, (unsigned long*)&value)) {
234#else
235 if (ioctl(sock, FIONBIO, &value)) {
236#endif
237 rc = pj_get_netos_error();
238 goto on_return;
239 }
240
241 /* Create key. */
242 key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
243 key->fd = sock;
244 key->user_data = user_data;
245
246 /* Save callback. */
247 pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
248
249 /* Register */
250 pj_list_insert_before(&ioque->hlist, key);
251 ++ioque->count;
252
253on_return:
254 *p_key = key;
255 pj_lock_release(ioque->lock);
256
257 return rc;
258}
259
260/*
261 * pj_ioqueue_unregister()
262 *
263 * Unregister handle from ioqueue.
264 */
265PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque,
266 pj_ioqueue_key_t *key)
267{
268 PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL);
269
270 pj_lock_acquire(ioque->lock);
271
272 pj_assert(ioque->count > 0);
273 --ioque->count;
274 pj_list_erase(key);
275 PJ_FD_CLR(key->fd, &ioque->rfdset);
276 PJ_FD_CLR(key->fd, &ioque->wfdset);
277#if PJ_HAS_TCP
278 PJ_FD_CLR(key->fd, &ioque->xfdset);
279#endif
280
281 pj_lock_release(ioque->lock);
282 return PJ_SUCCESS;
283}
284
285/*
286 * pj_ioqueue_get_user_data()
287 *
288 * Obtain value associated with a key.
289 */
290PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
291{
292 PJ_ASSERT_RETURN(key != NULL, NULL);
293 return key->user_data;
294}
295
296
297/* This supposed to check whether the fd_set values are consistent
298 * with the operation currently set in each key.
299 */
300#if VALIDATE_FD_SET
301static void validate_sets(const pj_ioqueue_t *ioque,
302 const pj_fd_set_t *rfdset,
303 const pj_fd_set_t *wfdset,
304 const pj_fd_set_t *xfdset)
305{
306 pj_ioqueue_key_t *key;
307
308 key = ioque->hlist.next;
309 while (key != &ioque->hlist) {
310 if ((key->op & PJ_IOQUEUE_OP_READ)
311 || (key->op & PJ_IOQUEUE_OP_RECV)
312 || (key->op & PJ_IOQUEUE_OP_RECV_FROM)
313#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
314 || (key->op & PJ_IOQUEUE_OP_ACCEPT)
315#endif
316 )
317 {
318 pj_assert(PJ_FD_ISSET(key->fd, rfdset));
319 }
320 else {
321 pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0);
322 }
323 if ((key->op & PJ_IOQUEUE_OP_WRITE)
324 || (key->op & PJ_IOQUEUE_OP_SEND)
325 || (key->op & PJ_IOQUEUE_OP_SEND_TO)
326#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
327 || (key->op & PJ_IOQUEUE_OP_CONNECT)
328#endif
329 )
330 {
331 pj_assert(PJ_FD_ISSET(key->fd, wfdset));
332 }
333 else {
334 pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0);
335 }
336#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
337 if (key->op & PJ_IOQUEUE_OP_CONNECT)
338 {
339 pj_assert(PJ_FD_ISSET(key->fd, xfdset));
340 }
341 else {
342 pj_assert(PJ_FD_ISSET(key->fd, xfdset) == 0);
343 }
344#endif /* PJ_HAS_TCP */
345
346 key = key->next;
347 }
348}
349#endif /* VALIDATE_FD_SET */
350
351
352/*
353 * pj_ioqueue_poll()
354 *
355 * Few things worth written:
356 *
357 * - we used to do only one callback called per poll, but it didn't go
358 * very well. The reason is because on some situation, the write
359 * callback gets called all the time, thus doesn't give the read
360 * callback to get called. This happens, for example, when user
361 * submit write operation inside the write callback.
362 * As the result, we changed the behaviour so that now multiple
363 * callbacks are called in a single poll. It should be fast too,
364 * just that we need to be carefull with the ioqueue data structs.
365 *
366 * - to guarantee preemptiveness etc, the poll function must strictly
367 * work on fd_set copy of the ioqueue (not the original one).
368 */
369PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout)
370{
371 pj_fd_set_t rfdset, wfdset, xfdset;
372 int count;
373 pj_ioqueue_key_t *h;
374
375 /* Lock ioqueue before making fd_set copies */
376 pj_lock_acquire(ioque->lock);
377
378 if (PJ_FD_COUNT(&ioque->rfdset)==0 &&
379 PJ_FD_COUNT(&ioque->wfdset)==0 &&
380 PJ_FD_COUNT(&ioque->xfdset)==0)
381 {
382 pj_lock_release(ioque->lock);
383 if (timeout)
384 pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout));
385 return 0;
386 }
387
388 /* Copy ioqueue's pj_fd_set_t to local variables. */
389 pj_memcpy(&rfdset, &ioque->rfdset, sizeof(pj_fd_set_t));
390 pj_memcpy(&wfdset, &ioque->wfdset, sizeof(pj_fd_set_t));
391#if PJ_HAS_TCP
392 pj_memcpy(&xfdset, &ioque->xfdset, sizeof(pj_fd_set_t));
393#else
394 PJ_FD_ZERO(&xfdset);
395#endif
396
397#if VALIDATE_FD_SET
398 validate_sets(ioque, &rfdset, &wfdset, &xfdset);
399#endif
400
401 /* Unlock ioqueue before select(). */
402 pj_lock_release(ioque->lock);
403
404 count = pj_sock_select(FD_SETSIZE, &rfdset, &wfdset, &xfdset, timeout);
405
406 if (count <= 0)
407 return count;
408
409 /* Lock ioqueue again before scanning for signalled sockets. */
410 pj_lock_acquire(ioque->lock);
411
412#if PJ_HAS_TCP
413 /* Scan for exception socket */
414 h = ioque->hlist.next;
415do_except_scan:
416 for ( ; h!=&ioque->hlist; h = h->next) {
417 if ((h->op & PJ_IOQUEUE_OP_CONNECT) && PJ_FD_ISSET(h->fd, &xfdset))
418 break;
419 }
420 if (h != &ioque->hlist) {
421 /* 'connect()' should be the only operation. */
422 pj_assert((h->op == PJ_IOQUEUE_OP_CONNECT));
423
424 /* Clear operation. */
425 h->op &= ~(PJ_IOQUEUE_OP_CONNECT);
426 PJ_FD_CLR(h->fd, &ioque->wfdset);
427 PJ_FD_CLR(h->fd, &ioque->xfdset);
428 PJ_FD_CLR(h->fd, &wfdset);
429 PJ_FD_CLR(h->fd, &xfdset);
430
431 /* Call callback. */
432 if (h->cb.on_connect_complete)
433 (*h->cb.on_connect_complete)(h, -1);
434
435 /* Re-scan exception list. */
436 goto do_except_scan;
437 }
438#endif /* PJ_HAS_TCP */
439
440 /* Scan for readable socket. */
441 h = ioque->hlist.next;
442do_readable_scan:
443 for ( ; h!=&ioque->hlist; h = h->next) {
444 if ((PJ_IOQUEUE_IS_READ_OP(h->op) || PJ_IOQUEUE_IS_ACCEPT_OP(h->op)) &&
445 PJ_FD_ISSET(h->fd, &rfdset))
446 {
447 break;
448 }
449 }
450 if (h != &ioque->hlist) {
451 pj_status_t rc;
452
453 pj_assert(PJ_IOQUEUE_IS_READ_OP(h->op) ||
454 PJ_IOQUEUE_IS_ACCEPT_OP(h->op));
455
456# if PJ_HAS_TCP
457 if ((h->op & PJ_IOQUEUE_OP_ACCEPT)) {
458 /* accept() must be the only operation specified on server socket */
459 pj_assert(h->op == PJ_IOQUEUE_OP_ACCEPT);
460
461 rc=pj_sock_accept(h->fd, h->accept_fd, h->rmt_addr, h->rmt_addrlen);
462 if (rc==0 && h->local_addr) {
463 rc = pj_sock_getsockname(*h->accept_fd, h->local_addr,
464 h->local_addrlen);
465 }
466
467 h->op &= ~(PJ_IOQUEUE_OP_ACCEPT);
468 PJ_FD_CLR(h->fd, &ioque->rfdset);
469
470 /* Call callback. */
471 if (h->cb.on_accept_complete)
472 (*h->cb.on_accept_complete)(h, *h->accept_fd, rc);
473
474 /* Re-scan readable sockets. */
475 goto do_readable_scan;
476 }
477 else {
478# endif
479 pj_ssize_t bytes_read = h->rd_buflen;
480
481 if ((h->op & PJ_IOQUEUE_OP_RECV_FROM)) {
482 rc = pj_sock_recvfrom(h->fd, h->rd_buf, &bytes_read, 0,
483 h->rmt_addr, h->rmt_addrlen);
484 } else if ((h->op & PJ_IOQUEUE_OP_RECV)) {
485 rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0);
486 } else {
487 /*
488 * User has specified pj_ioqueue_read().
489 * On Win32, we should do ReadFile(). But because we got
490 * here because of select() anyway, user must have put a
491 * socket descriptor on h->fd, which in this case we can
492 * just call pj_sock_recv() instead of ReadFile().
493 * On Unix, user may put a file in h->fd, so we'll have
494 * to call read() here.
495 * This may not compile on systems which doesn't have
496 * read(). That's why we only specify PJ_LINUX here so
497 * that error is easier to catch.
498 */
499# if defined(PJ_WIN32) && PJ_WIN32 != 0
500 rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0);
501# elif (defined(PJ_LINUX) && PJ_LINUX != 0) || \
502 (defined(PJ_SUNOS) && PJ_SUNOS != 0)
503 bytes_read = read(h->fd, h->rd_buf, bytes_read);
504 rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
505# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
506 bytes_read = sys_read(h->fd, h->rd_buf, bytes_read);
507 rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
508# else
509# error "Implement read() for this platform!"
510# endif
511 }
512
513 if (rc != PJ_SUCCESS) {
514# if defined(PJ_WIN32) && PJ_WIN32 != 0
515 /* On Win32, for UDP, WSAECONNRESET on the receive side
516 * indicates that previous sending has triggered ICMP Port
517 * Unreachable message.
518 * But we wouldn't know at this point which one of previous
519 * key that has triggered the error, since UDP socket can
520 * be shared!
521 * So we'll just ignore it!
522 */
523
524 if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
525 PJ_LOG(4,(THIS_FILE,
526 "Ignored ICMP port unreach. on key=%p", h));
527 }
528# endif
529
530 /* In any case we would report this to caller. */
531 bytes_read = -rc;
532 }
533
534 h->op &= ~(PJ_IOQUEUE_OP_READ | PJ_IOQUEUE_OP_RECV |
535 PJ_IOQUEUE_OP_RECV_FROM);
536 PJ_FD_CLR(h->fd, &ioque->rfdset);
537 PJ_FD_CLR(h->fd, &rfdset);
538
539 /* Call callback. */
540 if (h->cb.on_read_complete)
541 (*h->cb.on_read_complete)(h, bytes_read);
542
543 /* Re-scan readable sockets. */
544 goto do_readable_scan;
545
546 }
547 }
548
549 /* Scan for writable socket */
550 h = ioque->hlist.next;
551do_writable_scan:
552 for ( ; h!=&ioque->hlist; h = h->next) {
553 if ((PJ_IOQUEUE_IS_WRITE_OP(h->op) || PJ_IOQUEUE_IS_CONNECT_OP(h->op))
554 && PJ_FD_ISSET(h->fd, &wfdset))
555 {
556 break;
557 }
558 }
559 if (h != &ioque->hlist) {
560 pj_assert(PJ_IOQUEUE_IS_WRITE_OP(h->op) ||
561 PJ_IOQUEUE_IS_CONNECT_OP(h->op));
562
563#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
564 if ((h->op & PJ_IOQUEUE_OP_CONNECT)) {
565 /* Completion of connect() operation */
566 pj_ssize_t bytes_transfered;
567
568#if (defined(PJ_LINUX) && PJ_LINUX!=0) || \
569 (defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL!=0)
570 /* from connect(2):
571 * On Linux, use getsockopt to read the SO_ERROR option at
572 * level SOL_SOCKET to determine whether connect() completed
573 * successfully (if SO_ERROR is zero).
574 */
575 int value;
576 socklen_t vallen = sizeof(value);
577 int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
578 &value, &vallen);
579 if (gs_rc != 0) {
580 /* Argh!! What to do now???
581 * Just indicate that the socket is connected. The
582 * application will get error as soon as it tries to use
583 * the socket to send/receive.
584 */
585 bytes_transfered = 0;
586 } else {
587 bytes_transfered = value;
588 }
589#elif defined(PJ_WIN32) && PJ_WIN32!=0
590 bytes_transfered = 0; /* success */
591#else
592 /* Excellent information in D.J. Bernstein page:
593 * http://cr.yp.to/docs/connect.html
594 *
595 * Seems like the most portable way of detecting connect()
596 * failure is to call getpeername(). If socket is connected,
597 * getpeername() will return 0. If the socket is not connected,
598 * it will return ENOTCONN, and read(fd, &ch, 1) will produce
599 * the right errno through error slippage. This is a combination
600 * of suggestions from Douglas C. Schmidt and Ken Keys.
601 */
602 int gp_rc;
603 struct sockaddr_in addr;
604 socklen_t addrlen = sizeof(addr);
605
606 gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
607 bytes_transfered = gp_rc;
608#endif
609
610 /* Clear operation. */
611 h->op &= (~PJ_IOQUEUE_OP_CONNECT);
612 PJ_FD_CLR(h->fd, &ioque->wfdset);
613 PJ_FD_CLR(h->fd, &ioque->xfdset);
614
615 /* Call callback. */
616 if (h->cb.on_connect_complete)
617 (*h->cb.on_connect_complete)(h, bytes_transfered);
618
619 /* Re-scan writable sockets. */
620 goto do_writable_scan;
621
622 } else
623#endif /* PJ_HAS_TCP */
624 {
625 /* Completion of write(), send(), or sendto() operation. */
626
627 /* Clear operation. */
628 h->op &= ~(PJ_IOQUEUE_OP_WRITE | PJ_IOQUEUE_OP_SEND |
629 PJ_IOQUEUE_OP_SEND_TO);
630 PJ_FD_CLR(h->fd, &ioque->wfdset);
631 PJ_FD_CLR(h->fd, &wfdset);
632
633 /* Call callback. */
634 /* All data must have been sent? */
635 if (h->cb.on_write_complete)
636 (*h->cb.on_write_complete)(h, h->wr_buflen);
637
638 /* Re-scan writable sockets. */
639 goto do_writable_scan;
640 }
641 }
642
643 /* Shouldn't happen. */
644 /* For strange reason on WinXP select() can return 1 while there is no
645 * pj_fd_set_t signaled. */
646 /* pj_assert(0); */
647
648 //count = 0;
649
650 pj_lock_release(ioque->lock);
651 return count;
652}
653
654/*
655 * pj_ioqueue_read()
656 *
657 * Start asynchronous read from the descriptor.
658 */
659PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque,
660 pj_ioqueue_key_t *key,
661 void *buffer,
662 pj_size_t buflen)
663{
664 PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
665 PJ_CHECK_STACK();
666
667 /* For consistency with other ioqueue implementation, we would reject
668 * if descriptor has already been submitted for reading before.
669 */
670 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
671 (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
672 (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
673 PJ_EBUSY);
674
675 pj_lock_acquire(ioque->lock);
676
677 key->op |= PJ_IOQUEUE_OP_READ;
678 key->rd_flags = 0;
679 key->rd_buf = buffer;
680 key->rd_buflen = buflen;
681 PJ_FD_SET(key->fd, &ioque->rfdset);
682
683 pj_lock_release(ioque->lock);
684 return PJ_EPENDING;
685}
686
687
688/*
689 * pj_ioqueue_recv()
690 *
691 * Start asynchronous recv() from the socket.
692 */
693PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque,
694 pj_ioqueue_key_t *key,
695 void *buffer,
696 pj_size_t buflen,
697 unsigned flags )
698{
699 PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
700 PJ_CHECK_STACK();
701
702 /* For consistency with other ioqueue implementation, we would reject
703 * if descriptor has already been submitted for reading before.
704 */
705 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
706 (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
707 (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
708 PJ_EBUSY);
709
710 pj_lock_acquire(ioque->lock);
711
712 key->op |= PJ_IOQUEUE_OP_RECV;
713 key->rd_buf = buffer;
714 key->rd_buflen = buflen;
715 key->rd_flags = flags;
716 PJ_FD_SET(key->fd, &ioque->rfdset);
717
718 pj_lock_release(ioque->lock);
719 return PJ_EPENDING;
720}
721
722/*
723 * pj_ioqueue_recvfrom()
724 *
725 * Start asynchronous recvfrom() from the socket.
726 */
727PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque,
728 pj_ioqueue_key_t *key,
729 void *buffer,
730 pj_size_t buflen,
731 unsigned flags,
732 pj_sockaddr_t *addr,
733 int *addrlen)
734{
735 PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
736 PJ_CHECK_STACK();
737
738 /* For consistency with other ioqueue implementation, we would reject
739 * if descriptor has already been submitted for reading before.
740 */
741 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
742 (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
743 (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
744 PJ_EBUSY);
745
746 pj_lock_acquire(ioque->lock);
747
748 key->op |= PJ_IOQUEUE_OP_RECV_FROM;
749 key->rd_buf = buffer;
750 key->rd_buflen = buflen;
751 key->rd_flags = flags;
752 key->rmt_addr = addr;
753 key->rmt_addrlen = addrlen;
754 PJ_FD_SET(key->fd, &ioque->rfdset);
755
756 pj_lock_release(ioque->lock);
757 return PJ_EPENDING;
758}
759
760/*
761 * pj_ioqueue_write()
762 *
763 * Start asynchronous write() to the descriptor.
764 */
765PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque,
766 pj_ioqueue_key_t *key,
767 const void *data,
768 pj_size_t datalen)
769{
770 pj_status_t rc;
771 pj_ssize_t sent;
772
773 PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
774 PJ_CHECK_STACK();
775
776 /* For consistency with other ioqueue implementation, we would reject
777 * if descriptor has already been submitted for writing before.
778 */
779 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
780 (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
781 (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
782 PJ_EBUSY);
783
784 sent = datalen;
785 /* sent would be -1 after pj_sock_send() if it returns error. */
786 rc = pj_sock_send(key->fd, data, &sent, 0);
787 if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
788 return rc;
789 }
790
791 pj_lock_acquire(ioque->lock);
792
793 key->op |= PJ_IOQUEUE_OP_WRITE;
794 key->wr_buf = NULL;
795 key->wr_buflen = datalen;
796 PJ_FD_SET(key->fd, &ioque->wfdset);
797
798 pj_lock_release(ioque->lock);
799
800 return PJ_EPENDING;
801}
802
803/*
804 * pj_ioqueue_send()
805 *
806 * Start asynchronous send() to the descriptor.
807 */
808PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque,
809 pj_ioqueue_key_t *key,
810 const void *data,
811 pj_size_t datalen,
812 unsigned flags)
813{
814 pj_status_t rc;
815 pj_ssize_t sent;
816
817 PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
818 PJ_CHECK_STACK();
819
820 /* For consistency with other ioqueue implementation, we would reject
821 * if descriptor has already been submitted for writing before.
822 */
823 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
824 (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
825 (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
826 PJ_EBUSY);
827
828 sent = datalen;
829 /* sent would be -1 after pj_sock_send() if it returns error. */
830 rc = pj_sock_send(key->fd, data, &sent, flags);
831 if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
832 return rc;
833 }
834
835 pj_lock_acquire(ioque->lock);
836
837 key->op |= PJ_IOQUEUE_OP_SEND;
838 key->wr_buf = NULL;
839 key->wr_buflen = datalen;
840 PJ_FD_SET(key->fd, &ioque->wfdset);
841
842 pj_lock_release(ioque->lock);
843
844 return PJ_EPENDING;
845}
846
847
848/*
849 * pj_ioqueue_sendto()
850 *
851 * Start asynchronous write() to the descriptor.
852 */
853PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque,
854 pj_ioqueue_key_t *key,
855 const void *data,
856 pj_size_t datalen,
857 unsigned flags,
858 const pj_sockaddr_t *addr,
859 int addrlen)
860{
861 pj_status_t rc;
862 pj_ssize_t sent;
863
864 PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
865 PJ_CHECK_STACK();
866
867 /* For consistency with other ioqueue implementation, we would reject
868 * if descriptor has already been submitted for writing before.
869 */
870 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
871 (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
872 (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
873 PJ_EBUSY);
874
875 sent = datalen;
876 /* sent would be -1 after pj_sock_sendto() if it returns error. */
877 rc = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
878 if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
879 return rc;
880 }
881
882 pj_lock_acquire(ioque->lock);
883
884 key->op |= PJ_IOQUEUE_OP_SEND_TO;
885 key->wr_buf = NULL;
886 key->wr_buflen = datalen;
887 PJ_FD_SET(key->fd, &ioque->wfdset);
888
889 pj_lock_release(ioque->lock);
890 return PJ_EPENDING;
891}
892
893#if PJ_HAS_TCP
894/*
895 * Initiate overlapped accept() operation.
896 */
897PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue,
898 pj_ioqueue_key_t *key,
899 pj_sock_t *new_sock,
900 pj_sockaddr_t *local,
901 pj_sockaddr_t *remote,
902 int *addrlen)
903{
904 /* check parameters. All must be specified! */
905 pj_assert(ioqueue && key && new_sock);
906
907 /* Server socket must have no other operation! */
908 pj_assert(key->op == 0);
909
910 pj_lock_acquire(ioqueue->lock);
911
912 key->op = PJ_IOQUEUE_OP_ACCEPT;
913 key->accept_fd = new_sock;
914 key->rmt_addr = remote;
915 key->rmt_addrlen = addrlen;
916 key->local_addr = local;
917 key->local_addrlen = addrlen; /* use same addr. as rmt_addrlen */
918
919 PJ_FD_SET(key->fd, &ioqueue->rfdset);
920
921 pj_lock_release(ioqueue->lock);
922 return PJ_EPENDING;
923}
924
925/*
926 * Initiate overlapped connect() operation (well, it's non-blocking actually,
927 * since there's no overlapped version of connect()).
928 */
929PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue,
930 pj_ioqueue_key_t *key,
931 const pj_sockaddr_t *addr,
932 int addrlen )
933{
934 pj_status_t rc;
935
936 /* check parameters. All must be specified! */
937 PJ_ASSERT_RETURN(ioqueue && key && addr && addrlen, PJ_EINVAL);
938
939 /* Connecting socket must have no other operation! */
940 PJ_ASSERT_RETURN(key->op == 0, PJ_EBUSY);
941
942 rc = pj_sock_connect(key->fd, addr, addrlen);
943 if (rc == PJ_SUCCESS) {
944 /* Connected! */
945 return PJ_SUCCESS;
946 } else {
947 if (rc == PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) ||
948 rc == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK))
949 {
950 /* Pending! */
951 pj_lock_acquire(ioqueue->lock);
952 key->op = PJ_IOQUEUE_OP_CONNECT;
953 PJ_FD_SET(key->fd, &ioqueue->wfdset);
954 PJ_FD_SET(key->fd, &ioqueue->xfdset);
955 pj_lock_release(ioqueue->lock);
956 return PJ_EPENDING;
957 } else {
958 /* Error! */
959 return rc;
960 }
961 }
962}
963#endif /* PJ_HAS_TCP */
964