blob: b41d05f0837214c7b24c29425e18ebcf7f859346 [file] [log] [blame]
Benny Prijonodd859a62005-11-01 16:42:51 +00001/* $Header: /pjproject-0.3/pjlib/src/pj/ioqueue_epoll.c 4 10/29/05 10:27p Bennylp $ */
2/*
3 * $Log: /pjproject-0.3/pjlib/src/pj/ioqueue_epoll.c $
4 *
5 * 4 10/29/05 10:27p Bennylp
6 * Fixed misc warnings.
7 *
8 * 3 10/29/05 11:49a Bennylp
9 * Fixed warnings.
10 *
11 * 2 10/29/05 11:31a Bennylp
12 * Changed accept and lock.
13 *
14 * 1 10/17/05 10:49p Bennylp
15 * Created.
16 *
17 */
18
19/*
20 * ioqueue_epoll.c
21 *
22 * This is the implementation of IOQueue framework using /dev/epoll
23 * API in _both_ Linux user-mode and kernel-mode.
24 */
25
26#include <pj/ioqueue.h>
27#include <pj/os.h>
28#include <pj/lock.h>
29#include <pj/log.h>
30#include <pj/list.h>
31#include <pj/pool.h>
32#include <pj/string.h>
33#include <pj/assert.h>
34#include <pj/errno.h>
35#include <pj/sock.h>
36#include <pj/compat/socket.h>
37
38#if !defined(PJ_LINUX_KERNEL) || PJ_LINUX_KERNEL==0
39 /*
40 * Linux user mode
41 */
42# include <sys/epoll.h>
43# include <errno.h>
44# include <unistd.h>
45
46# define epoll_data data.ptr
47# define epoll_data_type void*
48# define ioctl_val_type unsigned long*
49# define getsockopt_val_ptr int*
50# define os_getsockopt getsockopt
51# define os_ioctl ioctl
52# define os_read read
53# define os_close close
54# define os_epoll_create epoll_create
55# define os_epoll_ctl epoll_ctl
56# define os_epoll_wait epoll_wait
57#else
58 /*
59 * Linux kernel mode.
60 */
61# include <linux/config.h>
62# include <linux/version.h>
63# if defined(MODVERSIONS)
64# include <linux/modversions.h>
65# endif
66# include <linux/kernel.h>
67# include <linux/poll.h>
68# include <linux/eventpoll.h>
69# include <linux/syscalls.h>
70# include <linux/errno.h>
71# include <linux/unistd.h>
72# include <asm/ioctls.h>
/* Event flag values used with os_epoll_ctl()/os_epoll_wait() in the
 * kernel-mode build, where <sys/epoll.h> is not included.
 */
enum EPOLL_EVENTS
{
    EPOLLIN  = 1 << 0,      /* 0x001 */
    EPOLLOUT = 1 << 2,      /* 0x004 */
    EPOLLERR = 1 << 3       /* 0x008 */
};
79# define os_epoll_create sys_epoll_create
80 static int os_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
81 {
82 long rc;
83 mm_segment_t oldfs = get_fs();
84 set_fs(KERNEL_DS);
85 rc = sys_epoll_ctl(epfd, op, fd, event);
86 set_fs(oldfs);
87 if (rc) {
88 errno = -rc;
89 return -1;
90 } else {
91 return 0;
92 }
93 }
94 static int os_epoll_wait(int epfd, struct epoll_event *events,
95 int maxevents, int timeout)
96 {
97 int count;
98 mm_segment_t oldfs = get_fs();
99 set_fs(KERNEL_DS);
100 count = sys_epoll_wait(epfd, events, maxevents, timeout);
101 set_fs(oldfs);
102 return count;
103 }
104# define os_close sys_close
105# define os_getsockopt pj_sock_getsockopt
106 static int os_read(int fd, void *buf, size_t len)
107 {
108 long rc;
109 mm_segment_t oldfs = get_fs();
110 set_fs(KERNEL_DS);
111 rc = sys_read(fd, buf, len);
112 set_fs(oldfs);
113 if (rc) {
114 errno = -rc;
115 return -1;
116 } else {
117 return 0;
118 }
119 }
120# define socklen_t unsigned
121# define ioctl_val_type unsigned long
122 int ioctl(int fd, int opt, ioctl_val_type value);
123 static int os_ioctl(int fd, int opt, ioctl_val_type value)
124 {
125 int rc;
126 mm_segment_t oldfs = get_fs();
127 set_fs(KERNEL_DS);
128 rc = ioctl(fd, opt, value);
129 set_fs(oldfs);
130 if (rc < 0) {
131 errno = -rc;
132 return rc;
133 } else
134 return rc;
135 }
136# define getsockopt_val_ptr char*
137
138# define epoll_data data
139# define epoll_data_type __u32
140#endif
141
142#define THIS_FILE "ioq_epoll"
143
/*
 * Predicates over a key's pending-operation bitmask.
 *
 * The argument is parenthesized on every use so that compound expressions
 * (for example "a | b") are tested as a whole instead of being split by
 * operator precedence.
 */
#define PJ_IOQUEUE_IS_READ_OP(op)   (((op) & PJ_IOQUEUE_OP_READ) || \
                                     ((op) & PJ_IOQUEUE_OP_RECV) || \
                                     ((op) & PJ_IOQUEUE_OP_RECV_FROM))
#define PJ_IOQUEUE_IS_WRITE_OP(op)  (((op) & PJ_IOQUEUE_OP_WRITE) || \
                                     ((op) & PJ_IOQUEUE_OP_SEND) || \
                                     ((op) & PJ_IOQUEUE_OP_SEND_TO))


/* Accept/connect predicates collapse to 0 when TCP support is disabled. */
#if PJ_HAS_TCP
#  define PJ_IOQUEUE_IS_ACCEPT_OP(op)   ((op) & PJ_IOQUEUE_OP_ACCEPT)
#  define PJ_IOQUEUE_IS_CONNECT_OP(op)  ((op) & PJ_IOQUEUE_OP_CONNECT)
#else
#  define PJ_IOQUEUE_IS_ACCEPT_OP(op)   0
#  define PJ_IOQUEUE_IS_CONNECT_OP(op)  0
#endif


/* Tracing is compiled out by default; swap the two definitions below to
 * route trace output through PJ_LOG at level 3.
 */
//#define TRACE_(expr) PJ_LOG(3,expr)
#define TRACE_(expr)
163
164
165/*
166 * This describes each key.
167 */
168struct pj_ioqueue_key_t
169{
170 PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t)
171 pj_sock_t fd;
172 pj_ioqueue_operation_e op;
173 void *user_data;
174 pj_ioqueue_callback cb;
175
176 void *rd_buf;
177 unsigned rd_flags;
178 pj_size_t rd_buflen;
179 void *wr_buf;
180 pj_size_t wr_buflen;
181
182 pj_sockaddr_t *rmt_addr;
183 int *rmt_addrlen;
184
185 pj_sockaddr_t *local_addr;
186 int *local_addrlen;
187
188 pj_sock_t *accept_fd;
189};
190
191/*
192 * This describes the I/O queue.
193 */
194struct pj_ioqueue_t
195{
196 pj_lock_t *lock;
197 pj_bool_t auto_delete_lock;
198 unsigned max, count;
199 pj_ioqueue_key_t hlist;
200 int epfd;
201};
202
203/*
204 * pj_ioqueue_create()
205 *
 * Create epoll ioqueue.
207 */
208PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
209 pj_size_t max_fd,
210 int max_threads,
211 pj_ioqueue_t **p_ioqueue)
212{
213 pj_ioqueue_t *ioque;
214 pj_status_t rc;
215
216 PJ_UNUSED_ARG(max_threads);
217
218 if (max_fd > PJ_IOQUEUE_MAX_HANDLES) {
219 pj_assert(!"max_fd too large");
220 return PJ_EINVAL;
221 }
222
223 ioque = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
224 ioque->max = max_fd;
225 ioque->count = 0;
226 pj_list_init(&ioque->hlist);
227
228 rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioque->lock);
229 if (rc != PJ_SUCCESS)
230 return rc;
231
232 ioque->auto_delete_lock = PJ_TRUE;
233 ioque->epfd = os_epoll_create(max_fd);
234 if (ioque->epfd < 0) {
235 return PJ_RETURN_OS_ERROR(pj_get_native_os_error());
236 }
237
238 PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioque));
239
240 *p_ioqueue = ioque;
241 return PJ_SUCCESS;
242}
243
244/*
245 * pj_ioqueue_destroy()
246 *
247 * Destroy ioqueue.
248 */
249PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioque)
250{
251 PJ_ASSERT_RETURN(ioque, PJ_EINVAL);
252 PJ_ASSERT_RETURN(ioque->epfd > 0, PJ_EINVALIDOP);
253
254 pj_lock_acquire(ioque->lock);
255 os_close(ioque->epfd);
256 ioque->epfd = 0;
257 if (ioque->auto_delete_lock)
258 pj_lock_destroy(ioque->lock);
259
260 return PJ_SUCCESS;
261}
262
263/*
264 * pj_ioqueue_set_lock()
265 */
266PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque,
267 pj_lock_t *lock,
268 pj_bool_t auto_delete )
269{
270 PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL);
271
272 if (ioque->auto_delete_lock) {
273 pj_lock_destroy(ioque->lock);
274 }
275
276 ioque->lock = lock;
277 ioque->auto_delete_lock = auto_delete;
278
279 return PJ_SUCCESS;
280}
281
282
283/*
284 * pj_ioqueue_register_sock()
285 *
286 * Register a socket to ioqueue.
287 */
288PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
289 pj_ioqueue_t *ioque,
290 pj_sock_t sock,
291 void *user_data,
292 const pj_ioqueue_callback *cb,
293 pj_ioqueue_key_t **p_key)
294{
295 pj_ioqueue_key_t *key = NULL;
296 pj_uint32_t value;
297 struct epoll_event ev;
298 int status;
299 pj_status_t rc = PJ_SUCCESS;
300
301 PJ_ASSERT_RETURN(pool && ioque && sock != PJ_INVALID_SOCKET &&
302 cb && p_key, PJ_EINVAL);
303
304 pj_lock_acquire(ioque->lock);
305
306 if (ioque->count >= ioque->max) {
307 rc = PJ_ETOOMANY;
308 TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: too many files"));
309 goto on_return;
310 }
311
312 /* Set socket to nonblocking. */
313 value = 1;
314 if ((rc=os_ioctl(sock, FIONBIO, (ioctl_val_type)&value))) {
315 TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: ioctl rc=%d",
316 rc));
317 rc = pj_get_netos_error();
318 goto on_return;
319 }
320
321 /* Create key. */
322 key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
323 key->fd = sock;
324 key->user_data = user_data;
325 pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
326
327 /* os_epoll_ctl. */
328 ev.events = EPOLLIN | EPOLLOUT | EPOLLERR;
329 ev.epoll_data = (epoll_data_type)key;
330 status = os_epoll_ctl(ioque->epfd, EPOLL_CTL_ADD, sock, &ev);
331 if (status < 0) {
332 rc = pj_get_os_error();
333 TRACE_((THIS_FILE,
334 "pj_ioqueue_register_sock error: os_epoll_ctl rc=%d",
335 status));
336 goto on_return;
337 }
338
339 /* Register */
340 pj_list_insert_before(&ioque->hlist, key);
341 ++ioque->count;
342
343on_return:
344 *p_key = key;
345 pj_lock_release(ioque->lock);
346
347 return rc;
348}
349
350/*
351 * pj_ioqueue_unregister()
352 *
353 * Unregister handle from ioqueue.
354 */
355PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque,
356 pj_ioqueue_key_t *key)
357{
358 struct epoll_event ev;
359 int status;
360
361 PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL);
362
363 pj_lock_acquire(ioque->lock);
364
365 pj_assert(ioque->count > 0);
366 --ioque->count;
367 pj_list_erase(key);
368
369 ev.events = 0;
370 ev.epoll_data = (epoll_data_type)key;
371 status = os_epoll_ctl( ioque->epfd, EPOLL_CTL_DEL, key->fd, &ev);
372 if (status != 0) {
373 pj_status_t rc = pj_get_os_error();
374 pj_lock_release(ioque->lock);
375 return rc;
376 }
377
378 pj_lock_release(ioque->lock);
379 return PJ_SUCCESS;
380}
381
382/*
383 * pj_ioqueue_get_user_data()
384 *
385 * Obtain value associated with a key.
386 */
387PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
388{
389 PJ_ASSERT_RETURN(key != NULL, NULL);
390 return key->user_data;
391}
392
393
394/*
395 * pj_ioqueue_poll()
396 *
397 */
398PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout)
399{
400 int i, count, processed;
401 struct epoll_event events[16];
402 int msec;
403
404 PJ_CHECK_STACK();
405
406 msec = timeout ? PJ_TIME_VAL_MSEC(*timeout) : 9000;
407
408 count = os_epoll_wait( ioque->epfd, events, PJ_ARRAY_SIZE(events), msec);
409 if (count <= 0)
410 return count;
411
412 /* Lock ioqueue. */
413 pj_lock_acquire(ioque->lock);
414
415 processed = 0;
416
417 for (i=0; i<count; ++i) {
418 pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type)
419 events[i].epoll_data;
420 pj_status_t rc;
421
422 /*
423 * Check for completion of read operations.
424 */
425 if ((events[i].events & EPOLLIN) && (PJ_IOQUEUE_IS_READ_OP(h->op))) {
426 pj_ssize_t bytes_read = h->rd_buflen;
427
428 if ((h->op & PJ_IOQUEUE_OP_RECV_FROM)) {
429 rc = pj_sock_recvfrom( h->fd, h->rd_buf, &bytes_read, 0,
430 h->rmt_addr, h->rmt_addrlen);
431 } else if ((h->op & PJ_IOQUEUE_OP_RECV)) {
432 rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0);
433 } else {
434 bytes_read = os_read( h->fd, h->rd_buf, bytes_read);
435 rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
436 }
437
438 if (rc != PJ_SUCCESS) {
439 bytes_read = -rc;
440 }
441
442 h->op &= ~(PJ_IOQUEUE_OP_READ | PJ_IOQUEUE_OP_RECV |
443 PJ_IOQUEUE_OP_RECV_FROM);
444
445 /* Call callback. */
446 (*h->cb.on_read_complete)(h, bytes_read);
447
448 ++processed;
449 }
450 /*
451 * Check for completion of accept() operation.
452 */
453 else if ((events[i].events & EPOLLIN) &&
454 (h->op & PJ_IOQUEUE_OP_ACCEPT))
455 {
456 /* accept() must be the only operation specified on
457 * server socket
458 */
459 pj_assert( h->op == PJ_IOQUEUE_OP_ACCEPT);
460
461 rc = pj_sock_accept( h->fd, h->accept_fd,
462 h->rmt_addr, h->rmt_addrlen);
463 if (rc==PJ_SUCCESS && h->local_addr) {
464 rc = pj_sock_getsockname(*h->accept_fd, h->local_addr,
465 h->local_addrlen);
466 }
467
468 h->op &= ~(PJ_IOQUEUE_OP_ACCEPT);
469
470 /* Call callback. */
471 (*h->cb.on_accept_complete)(h, *h->accept_fd, rc);
472
473 ++processed;
474 }
475
476 /*
477 * Check for completion of write operations.
478 */
479 if ((events[i].events & EPOLLOUT) && PJ_IOQUEUE_IS_WRITE_OP(h->op)) {
480 /* Completion of write(), send(), or sendto() operation. */
481
482 /* Clear operation. */
483 h->op &= ~(PJ_IOQUEUE_OP_WRITE | PJ_IOQUEUE_OP_SEND |
484 PJ_IOQUEUE_OP_SEND_TO);
485
486 /* Call callback. */
487 /* All data must have been sent? */
488 (*h->cb.on_write_complete)(h, h->wr_buflen);
489
490 ++processed;
491 }
492#if PJ_HAS_TCP
493 /*
494 * Check for completion of connect() operation.
495 */
496 else if ((events[i].events & EPOLLOUT) &&
497 (h->op & PJ_IOQUEUE_OP_CONNECT))
498 {
499 /* Completion of connect() operation */
500 pj_ssize_t bytes_transfered;
501
502 /* from connect(2):
503 * On Linux, use getsockopt to read the SO_ERROR option at
504 * level SOL_SOCKET to determine whether connect() completed
505 * successfully (if SO_ERROR is zero).
506 */
507 int value;
508 socklen_t vallen = sizeof(value);
509 int gs_rc = os_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
510 (getsockopt_val_ptr)&value, &vallen);
511 if (gs_rc != 0) {
512 /* Argh!! What to do now???
513 * Just indicate that the socket is connected. The
514 * application will get error as soon as it tries to use
515 * the socket to send/receive.
516 */
517 bytes_transfered = 0;
518 } else {
519 bytes_transfered = value;
520 }
521
522 /* Clear operation. */
523 h->op &= (~PJ_IOQUEUE_OP_CONNECT);
524
525 /* Call callback. */
526 (*h->cb.on_connect_complete)(h, bytes_transfered);
527
528 ++processed;
529 }
530#endif /* PJ_HAS_TCP */
531
532 /*
533 * Check for error condition.
534 */
535 if (events[i].events & EPOLLERR) {
536 if (h->op & PJ_IOQUEUE_OP_CONNECT) {
537 h->op &= ~PJ_IOQUEUE_OP_CONNECT;
538
539 /* Call callback. */
540 (*h->cb.on_connect_complete)(h, -1);
541
542 ++processed;
543 }
544 }
545 }
546
547 pj_lock_release(ioque->lock);
548
549 return processed;
550}
551
552/*
553 * pj_ioqueue_read()
554 *
555 * Start asynchronous read from the descriptor.
556 */
557PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque,
558 pj_ioqueue_key_t *key,
559 void *buffer,
560 pj_size_t buflen)
561{
562 PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
563 PJ_CHECK_STACK();
564
565 /* For consistency with other ioqueue implementation, we would reject
566 * if descriptor has already been submitted for reading before.
567 */
568 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
569 (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
570 (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
571 PJ_EBUSY);
572
573 pj_lock_acquire(ioque->lock);
574
575 key->op |= PJ_IOQUEUE_OP_READ;
576 key->rd_flags = 0;
577 key->rd_buf = buffer;
578 key->rd_buflen = buflen;
579
580 pj_lock_release(ioque->lock);
581 return PJ_EPENDING;
582}
583
584
585/*
586 * pj_ioqueue_recv()
587 *
588 * Start asynchronous recv() from the socket.
589 */
590PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque,
591 pj_ioqueue_key_t *key,
592 void *buffer,
593 pj_size_t buflen,
594 unsigned flags )
595{
596 PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
597 PJ_CHECK_STACK();
598
599 /* For consistency with other ioqueue implementation, we would reject
600 * if descriptor has already been submitted for reading before.
601 */
602 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
603 (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
604 (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
605 PJ_EBUSY);
606
607 pj_lock_acquire(ioque->lock);
608
609 key->op |= PJ_IOQUEUE_OP_RECV;
610 key->rd_buf = buffer;
611 key->rd_buflen = buflen;
612 key->rd_flags = flags;
613
614 pj_lock_release(ioque->lock);
615 return PJ_EPENDING;
616}
617
618/*
619 * pj_ioqueue_recvfrom()
620 *
621 * Start asynchronous recvfrom() from the socket.
622 */
623PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque,
624 pj_ioqueue_key_t *key,
625 void *buffer,
626 pj_size_t buflen,
627 unsigned flags,
628 pj_sockaddr_t *addr,
629 int *addrlen)
630{
631 PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
632 PJ_CHECK_STACK();
633
634 /* For consistency with other ioqueue implementation, we would reject
635 * if descriptor has already been submitted for reading before.
636 */
637 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
638 (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
639 (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
640 PJ_EBUSY);
641
642 pj_lock_acquire(ioque->lock);
643
644 key->op |= PJ_IOQUEUE_OP_RECV_FROM;
645 key->rd_buf = buffer;
646 key->rd_buflen = buflen;
647 key->rd_flags = flags;
648 key->rmt_addr = addr;
649 key->rmt_addrlen = addrlen;
650
651 pj_lock_release(ioque->lock);
652 return PJ_EPENDING;
653}
654
655/*
656 * pj_ioqueue_write()
657 *
658 * Start asynchronous write() to the descriptor.
659 */
660PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque,
661 pj_ioqueue_key_t *key,
662 const void *data,
663 pj_size_t datalen)
664{
665 pj_status_t rc;
666 pj_ssize_t sent;
667
668 PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
669 PJ_CHECK_STACK();
670
671 /* For consistency with other ioqueue implementation, we would reject
672 * if descriptor has already been submitted for writing before.
673 */
674 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
675 (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
676 (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
677 PJ_EBUSY);
678
679 sent = datalen;
680 /* sent would be -1 after pj_sock_send() if it returns error. */
681 rc = pj_sock_send(key->fd, data, &sent, 0);
682 if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
683 return rc;
684 }
685
686 pj_lock_acquire(ioque->lock);
687
688 key->op |= PJ_IOQUEUE_OP_WRITE;
689 key->wr_buf = NULL;
690 key->wr_buflen = datalen;
691
692 pj_lock_release(ioque->lock);
693
694 return PJ_EPENDING;
695}
696
697/*
698 * pj_ioqueue_send()
699 *
700 * Start asynchronous send() to the descriptor.
701 */
702PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque,
703 pj_ioqueue_key_t *key,
704 const void *data,
705 pj_size_t datalen,
706 unsigned flags)
707{
708 pj_status_t rc;
709 pj_ssize_t sent;
710
711 PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
712 PJ_CHECK_STACK();
713
714 /* For consistency with other ioqueue implementation, we would reject
715 * if descriptor has already been submitted for writing before.
716 */
717 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
718 (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
719 (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
720 PJ_EBUSY);
721
722 sent = datalen;
723 /* sent would be -1 after pj_sock_send() if it returns error. */
724 rc = pj_sock_send(key->fd, data, &sent, flags);
725 if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
726 return rc;
727 }
728
729 pj_lock_acquire(ioque->lock);
730
731 key->op |= PJ_IOQUEUE_OP_SEND;
732 key->wr_buf = NULL;
733 key->wr_buflen = datalen;
734
735 pj_lock_release(ioque->lock);
736
737 return PJ_EPENDING;
738}
739
740
741/*
742 * pj_ioqueue_sendto()
743 *
 * Start asynchronous sendto() to the descriptor.
745 */
746PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque,
747 pj_ioqueue_key_t *key,
748 const void *data,
749 pj_size_t datalen,
750 unsigned flags,
751 const pj_sockaddr_t *addr,
752 int addrlen)
753{
754 pj_status_t rc;
755 pj_ssize_t sent;
756
757 PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
758 PJ_CHECK_STACK();
759
760 /* For consistency with other ioqueue implementation, we would reject
761 * if descriptor has already been submitted for writing before.
762 */
763 PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
764 (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
765 (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
766 PJ_EBUSY);
767
768 sent = datalen;
769 /* sent would be -1 after pj_sock_sendto() if it returns error. */
770 rc = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
771 if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
772 return rc;
773 }
774
775 pj_lock_acquire(ioque->lock);
776
777 key->op |= PJ_IOQUEUE_OP_SEND_TO;
778 key->wr_buf = NULL;
779 key->wr_buflen = datalen;
780
781 pj_lock_release(ioque->lock);
782 return PJ_EPENDING;
783}
784
785#if PJ_HAS_TCP
786/*
787 * Initiate overlapped accept() operation.
788 */
789PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue,
790 pj_ioqueue_key_t *key,
791 pj_sock_t *new_sock,
792 pj_sockaddr_t *local,
793 pj_sockaddr_t *remote,
794 int *addrlen)
795{
796 /* check parameters. All must be specified! */
797 pj_assert(ioqueue && key && new_sock);
798
799 /* Server socket must have no other operation! */
800 pj_assert(key->op == 0);
801
802 pj_lock_acquire(ioqueue->lock);
803
804 key->op = PJ_IOQUEUE_OP_ACCEPT;
805 key->accept_fd = new_sock;
806 key->rmt_addr = remote;
807 key->rmt_addrlen = addrlen;
808 key->local_addr = local;
809 key->local_addrlen = addrlen; /* use same addr. as rmt_addrlen */
810
811 pj_lock_release(ioqueue->lock);
812 return PJ_EPENDING;
813}
814
815/*
816 * Initiate overlapped connect() operation (well, it's non-blocking actually,
817 * since there's no overlapped version of connect()).
818 */
819PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue,
820 pj_ioqueue_key_t *key,
821 const pj_sockaddr_t *addr,
822 int addrlen )
823{
824 pj_status_t rc;
825
826 /* check parameters. All must be specified! */
827 PJ_ASSERT_RETURN(ioqueue && key && addr && addrlen, PJ_EINVAL);
828
829 /* Connecting socket must have no other operation! */
830 PJ_ASSERT_RETURN(key->op == 0, PJ_EBUSY);
831
832 rc = pj_sock_connect(key->fd, addr, addrlen);
833 if (rc == PJ_SUCCESS) {
834 /* Connected! */
835 return PJ_SUCCESS;
836 } else {
837 if (rc == PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) ||
838 rc == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK))
839 {
840 /* Pending! */
841 pj_lock_acquire(ioqueue->lock);
842 key->op = PJ_IOQUEUE_OP_CONNECT;
843 pj_lock_release(ioqueue->lock);
844 return PJ_EPENDING;
845 } else {
846 /* Error! */
847 return rc;
848 }
849 }
850}
851#endif /* PJ_HAS_TCP */
852