/* $Id$
 *
 */
/*
 * ioqueue_epoll.c
 *
 * This is the implementation of the IOQueue framework using the /dev/epoll
 * API, in _both_ Linux user-mode and kernel-mode.
 */
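
/*
 * A minimal usage sketch (hypothetical application code, not part of this
 * file; names such as my_on_read_complete are placeholders). The queue is
 * created once, sockets are registered to it, and completions are delivered
 * through the callbacks from pj_ioqueue_poll():
 *
 *   pj_ioqueue_t *ioque;
 *   pj_ioqueue_key_t *key;
 *   pj_ioqueue_callback cb;
 *   pj_time_val timeout = { 0, 100 };
 *   char buf[512];
 *
 *   pj_memset(&cb, 0, sizeof(cb));
 *   cb.on_read_complete = &my_on_read_complete;
 *
 *   pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, 0, &ioque);
 *   pj_ioqueue_register_sock(pool, ioque, sock, NULL, &cb, &key);
 *   pj_ioqueue_recv(ioque, key, buf, sizeof(buf), 0);  // returns PJ_EPENDING
 *   for (;;)
 *       pj_ioqueue_poll(ioque, &timeout);   // dispatches on_read_complete
 */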

#include <pj/ioqueue.h>
#include <pj/os.h>
#include <pj/lock.h>
#include <pj/log.h>
#include <pj/list.h>
#include <pj/pool.h>
#include <pj/string.h>
#include <pj/assert.h>
#include <pj/errno.h>
#include <pj/sock.h>
#include <pj/compat/socket.h>

#if !defined(PJ_LINUX_KERNEL) || PJ_LINUX_KERNEL==0
    /*
     * Linux user mode
     */
#   include <sys/epoll.h>
#   include <sys/ioctl.h>   /* ioctl()/FIONBIO, used when registering keys */
#   include <errno.h>
#   include <unistd.h>

#   define epoll_data           data.ptr
#   define epoll_data_type      void*
#   define ioctl_val_type       unsigned long*
#   define getsockopt_val_ptr   int*
#   define os_getsockopt        getsockopt
#   define os_ioctl             ioctl
#   define os_read              read
#   define os_close             close
#   define os_epoll_create      epoll_create
#   define os_epoll_ctl         epoll_ctl
#   define os_epoll_wait        epoll_wait
#else
    /*
     * Linux kernel mode.
     */
#   include <linux/config.h>
#   include <linux/version.h>
#   if defined(MODVERSIONS)
#       include <linux/modversions.h>
#   endif
#   include <linux/kernel.h>
#   include <linux/poll.h>
#   include <linux/eventpoll.h>
#   include <linux/syscalls.h>
#   include <linux/errno.h>
#   include <linux/unistd.h>
#   include <asm/ioctls.h>
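    /* In kernel mode the epoll entry points are the sys_* syscall handlers,
     * which validate buffer pointers against the current address limit. The
     * wrappers below temporarily raise the limit with set_fs(KERNEL_DS) so
     * that the syscalls accept kernel-space buffers, then restore it.
     * Negative return values from the syscalls are -errno by convention.
     */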
    /* Userspace <sys/epoll.h> is not available in kernel mode, so mirror
     * the event flag values here (they match POLLIN/POLLOUT/POLLERR).
     */
    enum EPOLL_EVENTS
    {
        EPOLLIN  = 0x001,
        EPOLLOUT = 0x004,
        EPOLLERR = 0x008,
    };
#   define os_epoll_create      sys_epoll_create
    static int os_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
    {
        long rc;
        mm_segment_t oldfs = get_fs();
        set_fs(KERNEL_DS);
        rc = sys_epoll_ctl(epfd, op, fd, event);
        set_fs(oldfs);
        if (rc) {
            errno = -rc;
            return -1;
        } else {
            return 0;
        }
    }
    static int os_epoll_wait(int epfd, struct epoll_event *events,
                             int maxevents, int timeout)
    {
        int count;
        mm_segment_t oldfs = get_fs();
        set_fs(KERNEL_DS);
        count = sys_epoll_wait(epfd, events, maxevents, timeout);
        set_fs(oldfs);
        return count;
    }
#   define os_close             sys_close
#   define os_getsockopt        pj_sock_getsockopt
    static int os_read(int fd, void *buf, size_t len)
    {
        long rc;
        mm_segment_t oldfs = get_fs();
        set_fs(KERNEL_DS);
        rc = sys_read(fd, buf, len);
        set_fs(oldfs);
        /* sys_read() returns the byte count on success and -errno on error,
         * so only negative values indicate failure.
         */
        if (rc < 0) {
            errno = -rc;
            return -1;
        }
        return rc;
    }
#   define socklen_t            unsigned
#   define ioctl_val_type       unsigned long
    int ioctl(int fd, int opt, ioctl_val_type value);
    static int os_ioctl(int fd, int opt, ioctl_val_type value)
    {
        int rc;
        mm_segment_t oldfs = get_fs();
        set_fs(KERNEL_DS);
        rc = ioctl(fd, opt, value);
        set_fs(oldfs);
        if (rc < 0)
            errno = -rc;
        return rc;
    }
#   define getsockopt_val_ptr   char*

#   define epoll_data           data
#   define epoll_data_type      __u32
#endif

#define THIS_FILE   "ioq_epoll"

#define PJ_IOQUEUE_IS_READ_OP(op)   ((op & PJ_IOQUEUE_OP_READ)  || \
                                     (op & PJ_IOQUEUE_OP_RECV)  || \
                                     (op & PJ_IOQUEUE_OP_RECV_FROM))
#define PJ_IOQUEUE_IS_WRITE_OP(op)  ((op & PJ_IOQUEUE_OP_WRITE) || \
                                     (op & PJ_IOQUEUE_OP_SEND)  || \
                                     (op & PJ_IOQUEUE_OP_SEND_TO))


#if PJ_HAS_TCP
#   define PJ_IOQUEUE_IS_ACCEPT_OP(op)  (op & PJ_IOQUEUE_OP_ACCEPT)
#   define PJ_IOQUEUE_IS_CONNECT_OP(op) (op & PJ_IOQUEUE_OP_CONNECT)
#else
#   define PJ_IOQUEUE_IS_ACCEPT_OP(op)  0
#   define PJ_IOQUEUE_IS_CONNECT_OP(op) 0
#endif


/* Uncomment the first definition to enable tracing. */
//#define TRACE_(expr)  PJ_LOG(3,expr)
#define TRACE_(expr)


/*
 * This describes each registered key.
 */
struct pj_ioqueue_key_t
{
    PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t)
    pj_sock_t fd;                   /* The registered descriptor.           */
    pj_ioqueue_operation_e op;      /* Bitmask of pending operations.       */
    void *user_data;                /* Application data attached to key.    */
    pj_ioqueue_callback cb;         /* Completion callbacks.                */

    void *rd_buf;                   /* Buffer for pending read/recv.        */
    unsigned rd_flags;              /* Flags for pending recv().            */
    pj_size_t rd_buflen;            /* Size of the read buffer.             */
    void *wr_buf;                   /* Unused; data is sent immediately.    */
    pj_size_t wr_buflen;            /* Length reported on write completion. */

    pj_sockaddr_t *rmt_addr;        /* Remote addr for recvfrom()/accept(). */
    int *rmt_addrlen;

    pj_sockaddr_t *local_addr;      /* Local addr reported by accept().     */
    int *local_addrlen;

    pj_sock_t *accept_fd;           /* Where accept() stores the new fd.    */
};

/*
 * This describes the I/O queue.
 */
struct pj_ioqueue_t
{
    pj_lock_t *lock;                /* Lock protecting this structure.      */
    pj_bool_t auto_delete_lock;     /* Destroy lock in pj_ioqueue_destroy()?*/
    unsigned max, count;            /* Capacity and number of keys.         */
    pj_ioqueue_key_t hlist;         /* List head of registered keys.        */
    int epfd;                       /* The epoll descriptor.                */
};

/*
 * pj_ioqueue_create()
 *
 * Create the epoll ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
                                       pj_size_t max_fd,
                                       int max_threads,
                                       pj_ioqueue_t **p_ioqueue)
{
    pj_ioqueue_t *ioque;
    pj_status_t rc;

    PJ_UNUSED_ARG(max_threads);

    if (max_fd > PJ_IOQUEUE_MAX_HANDLES) {
        pj_assert(!"max_fd too large");
        return PJ_EINVAL;
    }

    ioque = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
    ioque->max = max_fd;
    ioque->count = 0;
    pj_list_init(&ioque->hlist);

    rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioque->lock);
    if (rc != PJ_SUCCESS)
        return rc;

    ioque->auto_delete_lock = PJ_TRUE;
    ioque->epfd = os_epoll_create(max_fd);
    if (ioque->epfd < 0) {
        /* Don't leak the lock we just created. */
        pj_lock_destroy(ioque->lock);
        return PJ_RETURN_OS_ERROR(pj_get_native_os_error());
    }

    PJ_LOG(4, ("pjlib", "epoll I/O Queue created (%p)", ioque));

    *p_ioqueue = ioque;
    return PJ_SUCCESS;
}

/*
 * pj_ioqueue_destroy()
 *
 * Destroy ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioque)
{
    PJ_ASSERT_RETURN(ioque, PJ_EINVAL);
    PJ_ASSERT_RETURN(ioque->epfd > 0, PJ_EINVALIDOP);

    pj_lock_acquire(ioque->lock);
    os_close(ioque->epfd);
    ioque->epfd = 0;
    if (ioque->auto_delete_lock)
        pj_lock_destroy(ioque->lock);

    return PJ_SUCCESS;
}

/*
 * pj_ioqueue_set_lock()
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque,
                                         pj_lock_t *lock,
                                         pj_bool_t auto_delete )
{
    PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL);

    if (ioque->auto_delete_lock) {
        pj_lock_destroy(ioque->lock);
    }

    ioque->lock = lock;
    ioque->auto_delete_lock = auto_delete;

    return PJ_SUCCESS;
}


/*
 * pj_ioqueue_register_sock()
 *
 * Register a socket to ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
                                              pj_ioqueue_t *ioque,
                                              pj_sock_t sock,
                                              void *user_data,
                                              const pj_ioqueue_callback *cb,
                                              pj_ioqueue_key_t **p_key)
{
    pj_ioqueue_key_t *key = NULL;
    pj_uint32_t value;
    struct epoll_event ev;
    int status;
    pj_status_t rc = PJ_SUCCESS;

    PJ_ASSERT_RETURN(pool && ioque && sock != PJ_INVALID_SOCKET &&
                     cb && p_key, PJ_EINVAL);

    pj_lock_acquire(ioque->lock);

    if (ioque->count >= ioque->max) {
        rc = PJ_ETOOMANY;
        TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: too many files"));
        goto on_return;
    }

    /* Set socket to nonblocking. */
    value = 1;
    if ((rc=os_ioctl(sock, FIONBIO, (ioctl_val_type)&value))) {
        TRACE_((THIS_FILE, "pj_ioqueue_register_sock error: ioctl rc=%d",
                rc));
        rc = pj_get_netos_error();
        goto on_return;
    }

    /* Create key. */
    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
    key->fd = sock;
    key->user_data = user_data;
    pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));

    /* Register the socket to epoll. */
    ev.events = EPOLLIN | EPOLLOUT | EPOLLERR;
    ev.epoll_data = (epoll_data_type)key;
    status = os_epoll_ctl(ioque->epfd, EPOLL_CTL_ADD, sock, &ev);
    if (status < 0) {
        rc = pj_get_os_error();
        TRACE_((THIS_FILE,
                "pj_ioqueue_register_sock error: os_epoll_ctl rc=%d",
                status));
        goto on_return;
    }
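
    /* Note that the socket stays registered for EPOLLIN|EPOLLOUT|EPOLLERR
     * for its whole lifetime. Since epoll is level-triggered by default,
     * pj_ioqueue_poll() filters readiness against the pending-operation
     * bitmask (key->op) before dispatching any callback.
     */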

    /* Register */
    pj_list_insert_before(&ioque->hlist, key);
    ++ioque->count;

on_return:
    *p_key = key;
    pj_lock_release(ioque->lock);

    return rc;
}

/*
 * pj_ioqueue_unregister()
 *
 * Unregister handle from ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque,
                                           pj_ioqueue_key_t *key)
{
    struct epoll_event ev;
    int status;

    PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL);

    pj_lock_acquire(ioque->lock);

    pj_assert(ioque->count > 0);
    --ioque->count;
    pj_list_erase(key);

    /* Old kernels require a non-NULL event pointer even for EPOLL_CTL_DEL. */
    ev.events = 0;
    ev.epoll_data = (epoll_data_type)key;
    status = os_epoll_ctl( ioque->epfd, EPOLL_CTL_DEL, key->fd, &ev);
    if (status != 0) {
        pj_status_t rc = pj_get_os_error();
        pj_lock_release(ioque->lock);
        return rc;
    }

    pj_lock_release(ioque->lock);
    return PJ_SUCCESS;
}

/*
 * pj_ioqueue_get_user_data()
 *
 * Obtain value associated with a key.
 */
PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
{
    PJ_ASSERT_RETURN(key != NULL, NULL);
    return key->user_data;
}


/*
 * pj_ioqueue_poll()
 *
 * Poll for completed operations. Returns the number of completions
 * dispatched, zero on timeout, or a negative value on error. At most 16
 * events (the size of the local array) are dispatched per call.
 */
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout)
{
    int i, count, processed;
    struct epoll_event events[16];
    int msec;

    PJ_CHECK_STACK();

    msec = timeout ? PJ_TIME_VAL_MSEC(*timeout) : 9000;

    count = os_epoll_wait( ioque->epfd, events, PJ_ARRAY_SIZE(events), msec);
    if (count <= 0)
        return count;

    /* Lock ioqueue. */
    pj_lock_acquire(ioque->lock);

    processed = 0;

    for (i=0; i<count; ++i) {
        pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type)
                              events[i].epoll_data;
        pj_status_t rc;

        /*
         * Check for completion of read operations.
         */
        if ((events[i].events & EPOLLIN) && (PJ_IOQUEUE_IS_READ_OP(h->op))) {
            pj_ssize_t bytes_read = h->rd_buflen;

            if ((h->op & PJ_IOQUEUE_OP_RECV_FROM)) {
                rc = pj_sock_recvfrom( h->fd, h->rd_buf, &bytes_read, 0,
                                       h->rmt_addr, h->rmt_addrlen);
            } else if ((h->op & PJ_IOQUEUE_OP_RECV)) {
                rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0);
            } else {
                bytes_read = os_read( h->fd, h->rd_buf, bytes_read);
                rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
            }

            if (rc != PJ_SUCCESS) {
                /* Pass the error to the callback as a negative byte count. */
                bytes_read = -rc;
            }

            h->op &= ~(PJ_IOQUEUE_OP_READ | PJ_IOQUEUE_OP_RECV |
                       PJ_IOQUEUE_OP_RECV_FROM);

            /* Call callback. */
            (*h->cb.on_read_complete)(h, bytes_read);

            ++processed;
        }
        /*
         * Check for completion of accept() operation.
         */
        else if ((events[i].events & EPOLLIN) &&
                 (h->op & PJ_IOQUEUE_OP_ACCEPT))
        {
            /* accept() must be the only operation specified on
             * the server socket.
             */
            pj_assert( h->op == PJ_IOQUEUE_OP_ACCEPT);

            rc = pj_sock_accept( h->fd, h->accept_fd,
                                 h->rmt_addr, h->rmt_addrlen);
            if (rc==PJ_SUCCESS && h->local_addr) {
                rc = pj_sock_getsockname(*h->accept_fd, h->local_addr,
                                         h->local_addrlen);
            }

            h->op &= ~(PJ_IOQUEUE_OP_ACCEPT);

            /* Call callback. */
            (*h->cb.on_accept_complete)(h, *h->accept_fd, rc);

            ++processed;
        }

        /*
         * Check for completion of write operations.
         */
        if ((events[i].events & EPOLLOUT) && PJ_IOQUEUE_IS_WRITE_OP(h->op)) {
            /* Completion of write(), send(), or sendto() operation. */

            /* Clear operation. */
            h->op &= ~(PJ_IOQUEUE_OP_WRITE | PJ_IOQUEUE_OP_SEND |
                       PJ_IOQUEUE_OP_SEND_TO);

            /* Call callback, reporting the full requested length. The data
             * was pushed to the socket by pj_ioqueue_write()/send()/sendto()
             * itself; a short send is not detected here.
             */
            (*h->cb.on_write_complete)(h, h->wr_buflen);

            ++processed;
        }
#if PJ_HAS_TCP
        /*
         * Check for completion of connect() operation.
         */
        else if ((events[i].events & EPOLLOUT) &&
                 (h->op & PJ_IOQUEUE_OP_CONNECT))
        {
            /* Completion of connect() operation */
            pj_ssize_t bytes_transferred;

            /* From connect(2):
             * On Linux, use getsockopt to read the SO_ERROR option at
             * level SOL_SOCKET to determine whether connect() completed
             * successfully (if SO_ERROR is zero).
             */
            int value;
            socklen_t vallen = sizeof(value);
            int gs_rc = os_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
                                      (getsockopt_val_ptr)&value, &vallen);
            if (gs_rc != 0) {
                /* Argh!! What to do now???
                 * Just indicate that the socket is connected. The
                 * application will get error as soon as it tries to use
                 * the socket to send/receive.
                 */
                bytes_transferred = 0;
            } else {
                bytes_transferred = value;
            }

            /* Clear operation. */
            h->op &= (~PJ_IOQUEUE_OP_CONNECT);

            /* Call callback. */
            (*h->cb.on_connect_complete)(h, bytes_transferred);

            ++processed;
        }
#endif  /* PJ_HAS_TCP */

        /*
         * Check for error condition.
         */
        if (events[i].events & EPOLLERR) {
            if (h->op & PJ_IOQUEUE_OP_CONNECT) {
                h->op &= ~PJ_IOQUEUE_OP_CONNECT;

                /* Call callback. */
                (*h->cb.on_connect_complete)(h, -1);

                ++processed;
            }
        }
    }

    pj_lock_release(ioque->lock);

    return processed;
}

/*
 * pj_ioqueue_read()
 *
 * Start asynchronous read from the descriptor.
 */
PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque,
                                     pj_ioqueue_key_t *key,
                                     void *buffer,
                                     pj_size_t buflen)
{
    PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* For consistency with the other ioqueue implementations, reject the
     * request if the descriptor already has a pending read.
     */
    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
                      (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
                      (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
                     PJ_EBUSY);

    pj_lock_acquire(ioque->lock);

    key->op |= PJ_IOQUEUE_OP_READ;
    key->rd_flags = 0;
    key->rd_buf = buffer;
    key->rd_buflen = buflen;

    pj_lock_release(ioque->lock);
    return PJ_EPENDING;
}


/*
 * pj_ioqueue_recv()
 *
 * Start asynchronous recv() from the socket.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque,
                                     pj_ioqueue_key_t *key,
                                     void *buffer,
                                     pj_size_t buflen,
                                     unsigned flags )
{
    PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* For consistency with the other ioqueue implementations, reject the
     * request if the descriptor already has a pending read.
     */
    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
                      (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
                      (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
                     PJ_EBUSY);

    pj_lock_acquire(ioque->lock);

    key->op |= PJ_IOQUEUE_OP_RECV;
    key->rd_buf = buffer;
    key->rd_buflen = buflen;
    key->rd_flags = flags;

    pj_lock_release(ioque->lock);
    return PJ_EPENDING;
}

/*
 * pj_ioqueue_recvfrom()
 *
 * Start asynchronous recvfrom() from the socket.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque,
                                         pj_ioqueue_key_t *key,
                                         void *buffer,
                                         pj_size_t buflen,
                                         unsigned flags,
                                         pj_sockaddr_t *addr,
                                         int *addrlen)
{
    PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* For consistency with the other ioqueue implementations, reject the
     * request if the descriptor already has a pending read.
     */
    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
                      (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
                      (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
                     PJ_EBUSY);

    pj_lock_acquire(ioque->lock);

    key->op |= PJ_IOQUEUE_OP_RECV_FROM;
    key->rd_buf = buffer;
    key->rd_buflen = buflen;
    key->rd_flags = flags;
    key->rmt_addr = addr;
    key->rmt_addrlen = addrlen;

    pj_lock_release(ioque->lock);
    return PJ_EPENDING;
}

/*
 * pj_ioqueue_write()
 *
 * Start asynchronous write() to the descriptor.
 */
PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque,
                                      pj_ioqueue_key_t *key,
                                      const void *data,
                                      pj_size_t datalen)
{
    pj_status_t rc;
    pj_ssize_t sent;

    PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* For consistency with the other ioqueue implementations, reject the
     * request if the descriptor already has a pending write.
     */
    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
                      (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
                      (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
                     PJ_EBUSY);

    sent = datalen;
    /* sent would be -1 after pj_sock_send() if it returns error. */
    rc = pj_sock_send(key->fd, data, &sent, 0);
    if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
        return rc;
    }

    pj_lock_acquire(ioque->lock);

    key->op |= PJ_IOQUEUE_OP_WRITE;
    key->wr_buf = NULL;
    key->wr_buflen = datalen;

    pj_lock_release(ioque->lock);

    return PJ_EPENDING;
}
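
/*
 * A hypothetical caller (application code, not part of this file). Note
 * that the data is pushed to the socket right away; PJ_EPENDING only means
 * that completion will be reported from pj_ioqueue_poll():
 *
 *   status = pj_ioqueue_write(ioque, key, pkt, pktlen);
 *   if (status != PJ_EPENDING)
 *       handle_error(status);      // hypothetical error handler
 */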

/*
 * pj_ioqueue_send()
 *
 * Start asynchronous send() to the descriptor.
 */
PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque,
                                     pj_ioqueue_key_t *key,
                                     const void *data,
                                     pj_size_t datalen,
                                     unsigned flags)
{
    pj_status_t rc;
    pj_ssize_t sent;

    PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* For consistency with the other ioqueue implementations, reject the
     * request if the descriptor already has a pending write.
     */
    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
                      (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
                      (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
                     PJ_EBUSY);

    sent = datalen;
    /* sent would be -1 after pj_sock_send() if it returns error. */
    rc = pj_sock_send(key->fd, data, &sent, flags);
    if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
        return rc;
    }

    pj_lock_acquire(ioque->lock);

    key->op |= PJ_IOQUEUE_OP_SEND;
    key->wr_buf = NULL;
    key->wr_buflen = datalen;

    pj_lock_release(ioque->lock);

    return PJ_EPENDING;
}


/*
 * pj_ioqueue_sendto()
 *
 * Start asynchronous sendto() on the socket.
 */
PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque,
                                       pj_ioqueue_key_t *key,
                                       const void *data,
                                       pj_size_t datalen,
                                       unsigned flags,
                                       const pj_sockaddr_t *addr,
                                       int addrlen)
{
    pj_status_t rc;
    pj_ssize_t sent;

    PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* For consistency with the other ioqueue implementations, reject the
     * request if the descriptor already has a pending write.
     */
    PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
                      (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
                      (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
                     PJ_EBUSY);

    sent = datalen;
    /* sent would be -1 after pj_sock_sendto() if it returns error. */
    rc = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
    if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
        return rc;
    }

    pj_lock_acquire(ioque->lock);

    key->op |= PJ_IOQUEUE_OP_SEND_TO;
    key->wr_buf = NULL;
    key->wr_buflen = datalen;

    pj_lock_release(ioque->lock);
    return PJ_EPENDING;
}

#if PJ_HAS_TCP
/*
 * Initiate overlapped accept() operation.
 */
PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue,
                               pj_ioqueue_key_t *key,
                               pj_sock_t *new_sock,
                               pj_sockaddr_t *local,
                               pj_sockaddr_t *remote,
                               int *addrlen)
{
    /* Check parameters: ioqueue, key and new_sock must be specified;
     * local, remote and addrlen are optional.
     */
    pj_assert(ioqueue && key && new_sock);

    /* Server socket must have no other pending operation! */
    pj_assert(key->op == 0);

    pj_lock_acquire(ioqueue->lock);

    key->op = PJ_IOQUEUE_OP_ACCEPT;
    key->accept_fd = new_sock;
    key->rmt_addr = remote;
    key->rmt_addrlen = addrlen;
    key->local_addr = local;
    key->local_addrlen = addrlen;   /* use same addrlen as rmt_addrlen */

    pj_lock_release(ioqueue->lock);
    return PJ_EPENDING;
}

/*
 * Initiate overlapped connect() operation (well, it's non-blocking actually,
 * since there's no overlapped version of connect()).
 */
PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue,
                                        pj_ioqueue_key_t *key,
                                        const pj_sockaddr_t *addr,
                                        int addrlen )
{
    pj_status_t rc;

    /* Check parameters. All must be specified! */
    PJ_ASSERT_RETURN(ioqueue && key && addr && addrlen, PJ_EINVAL);

    /* Connecting socket must have no other pending operation! */
    PJ_ASSERT_RETURN(key->op == 0, PJ_EBUSY);

    rc = pj_sock_connect(key->fd, addr, addrlen);
    if (rc == PJ_SUCCESS) {
        /* Connected! */
        return PJ_SUCCESS;
    } else {
        if (rc == PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) ||
            rc == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK))
        {
            /* Pending! */
            pj_lock_acquire(ioqueue->lock);
            key->op = PJ_IOQUEUE_OP_CONNECT;
            pj_lock_release(ioqueue->lock);
            return PJ_EPENDING;
        } else {
            /* Error! */
            return rc;
        }
    }
}
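
/*
 * A hypothetical client-side flow (application code, not part of this file):
 *
 *   status = pj_ioqueue_connect(ioque, key, &addr, sizeof(addr));
 *   if (status == PJ_SUCCESS) {
 *       // connected immediately
 *   } else if (status == PJ_EPENDING) {
 *       // completion will be reported through cb.on_connect_complete
 *       // when pj_ioqueue_poll() sees EPOLLOUT or EPOLLERR on the socket
 *   } else {
 *       // connect() failed immediately
 *   }
 */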
#endif  /* PJ_HAS_TCP */
