/* $Id$ */

/*
 * ioqueue_common_abs.c
 *
 * This contains the common functionality to emulate the proactor pattern
 * with various event dispatching mechanisms (e.g. select, epoll).
 *
 * This file will be included by the appropriate ioqueue implementation.
 * This file is NOT supposed to be compiled as a stand-alone source.
 */
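
/* A minimal sketch (illustrative, not part of the build) of how a backend
 * is expected to use this file: the backend defines the event-set hooks
 * that this file calls (ioqueue_add_to_set() and ioqueue_remove_from_set()),
 * then textually includes this source. The backend file name and the exact
 * hook signatures below are assumptions.
 *
 *   // e.g. in a select()-based backend such as ioqueue_select.c:
 *   static void ioqueue_add_to_set(pj_ioqueue_t *ioqueue,
 *                                  pj_sock_t fd, int event_type);
 *   static void ioqueue_remove_from_set(pj_ioqueue_t *ioqueue,
 *                                       pj_sock_t fd, int event_type);
 *   ...
 *   #include "ioqueue_common_abs.c"
 */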

static void ioqueue_init( pj_ioqueue_t *ioqueue )
{
    ioqueue->lock = NULL;
    ioqueue->auto_delete_lock = 0;
}

static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
    if (ioqueue->auto_delete_lock && ioqueue->lock) {
        pj_lock_release(ioqueue->lock);
        return pj_lock_destroy(ioqueue->lock);
    } else
        return PJ_SUCCESS;
}

/*
 * pj_ioqueue_set_lock()
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
                                         pj_lock_t *lock,
                                         pj_bool_t auto_delete )
{
    PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);

    if (ioqueue->auto_delete_lock && ioqueue->lock) {
        pj_lock_destroy(ioqueue->lock);
    }

    ioqueue->lock = lock;
    ioqueue->auto_delete_lock = auto_delete;

    return PJ_SUCCESS;
}
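
/* Example (illustrative sketch, not part of this file): replacing the
 * ioqueue's lock with a caller-created one. pj_lock_create_simple_mutex()
 * is the PJLIB lock factory from <pj/lock.h>; the name string is arbitrary.
 *
 *   pj_lock_t *lock;
 *   status = pj_lock_create_simple_mutex(pool, "ioq_lock", &lock);
 *   if (status == PJ_SUCCESS) {
 *       // auto_delete=PJ_TRUE: the ioqueue destroys the lock on shutdown.
 *       pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
 *   }
 */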

static pj_status_t ioqueue_init_key( pj_pool_t *pool,
                                     pj_ioqueue_t *ioqueue,
                                     pj_ioqueue_key_t *key,
                                     pj_sock_t sock,
                                     void *user_data,
                                     const pj_ioqueue_callback *cb)
{
    pj_status_t rc;
    int optlen;

    key->ioqueue = ioqueue;
    key->fd = sock;
    key->user_data = user_data;
    pj_list_init(&key->read_list);
    pj_list_init(&key->write_list);
#if PJ_HAS_TCP
    pj_list_init(&key->accept_list);
#endif

    /* Save callback. */
    pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));

    /* Get socket type. When socket type is datagram, some optimization
     * will be performed during send to allow parallel send operations.
     */
    optlen = sizeof(key->fd_type);
    rc = pj_sock_getsockopt(sock, PJ_SOL_SOCKET, PJ_SO_TYPE,
                            &key->fd_type, &optlen);
    if (rc != PJ_SUCCESS)
        key->fd_type = PJ_SOCK_STREAM;

    /* Create mutex for the key. */
    rc = pj_mutex_create_simple(pool, NULL, &key->mutex);

    return rc;
}

static void ioqueue_destroy_key( pj_ioqueue_key_t *key )
{
    pj_mutex_destroy(key->mutex);
}

/*
 * pj_ioqueue_get_user_data()
 *
 * Obtain value associated with a key.
 */
PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
{
    PJ_ASSERT_RETURN(key != NULL, NULL);
    return key->user_data;
}

/*
 * pj_ioqueue_set_user_data()
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
                                              void *user_data,
                                              void **old_data)
{
    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    if (old_data)
        *old_data = key->user_data;
    key->user_data = user_data;

    return PJ_SUCCESS;
}
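
/* Example (illustrative sketch): a typical pattern is to attach the
 * application's per-socket state to the key, then recover it inside the
 * completion callbacks. The struct and callback body are assumptions.
 *
 *   struct my_session { pj_sock_t sock; char rx_buf[512]; };
 *
 *   static void on_read_complete(pj_ioqueue_key_t *key,
 *                                pj_ioqueue_op_key_t *op_key,
 *                                pj_ssize_t bytes_read)
 *   {
 *       struct my_session *sess;
 *       sess = (struct my_session*) pj_ioqueue_get_user_data(key);
 *       // ... consume bytes_read bytes from sess->rx_buf ...
 *   }
 */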

PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
{
    return !pj_list_empty(&key->write_list);
}

PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
{
    return !pj_list_empty(&key->read_list);
}

PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
{
#if PJ_HAS_TCP
    return !pj_list_empty(&key->accept_list);
#else
    return 0;
#endif
}

PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
{
    return key->connecting;
}


/*
 * ioqueue_dispatch_event()
 *
 * Report the occurrence of an event in the key to be processed by the
 * framework.
 */
void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
{
    /* Lock the key. */
    pj_mutex_lock(h->mutex);

#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
    if (h->connecting) {
        /* Completion of connect() operation */
        pj_ssize_t bytes_transferred;

        /* Clear operation. */
        h->connecting = 0;

        ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
        ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);

        /* Unlock; from this point we don't need to hold the key's mutex. */
        pj_mutex_unlock(h->mutex);

#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
        /* From connect(2):
         * On Linux, use getsockopt to read the SO_ERROR option at
         * level SOL_SOCKET to determine whether connect() completed
         * successfully (if SO_ERROR is zero).
         */
        {
            int value;
            socklen_t vallen = sizeof(value);
            int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
                                   &value, &vallen);
            if (gs_rc != 0) {
                /* Argh!! What to do now???
                 * Just indicate that the socket is connected. The
                 * application will get an error as soon as it tries to use
                 * the socket to send/receive.
                 */
                bytes_transferred = 0;
            } else {
                bytes_transferred = value;
            }
        }
#elif defined(PJ_WIN32) && PJ_WIN32!=0
        bytes_transferred = 0; /* success */
#else
        /* Excellent information in D.J. Bernstein's page:
         * http://cr.yp.to/docs/connect.html
         *
         * It seems like the most portable way of detecting connect()
         * failure is to call getpeername(). If the socket is connected,
         * getpeername() will return 0. If the socket is not connected,
         * it will return ENOTCONN, and read(fd, &ch, 1) will produce
         * the right errno through error slippage. This is a combination
         * of suggestions from Douglas C. Schmidt and Ken Keys.
         */
        int gp_rc;
        struct sockaddr_in addr;
        socklen_t addrlen = sizeof(addr);

        gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
        bytes_transferred = gp_rc;
#endif

        /* Call callback. */
        if (h->cb.on_connect_complete)
            (*h->cb.on_connect_complete)(h, bytes_transferred);

        /* Done. */

    } else
#endif /* PJ_HAS_TCP */
    if (key_has_pending_write(h)) {
        /* Socket is writable. */
        struct write_operation *write_op;
        pj_ssize_t sent;
        pj_status_t send_rc;

        /* Get the first in the queue. */
        write_op = h->write_list.next;

        /* For datagrams, we can remove the write_op from the list
         * so that send() can work in parallel.
         */
        if (h->fd_type == PJ_SOCK_DGRAM) {
            pj_list_erase(write_op);
            if (pj_list_empty(&h->write_list))
                ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);

            pj_mutex_unlock(h->mutex);
        }

        /* Send the data.
         * Unfortunately we must do this while holding the key's mutex, thus
         * preventing parallel write on a single key.. :-((
         */
        sent = write_op->size - write_op->written;
        if (write_op->op == PJ_IOQUEUE_OP_SEND) {
            send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
                                   &sent, write_op->flags);
        } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
            send_rc = pj_sock_sendto(h->fd,
                                     write_op->buf+write_op->written,
                                     &sent, write_op->flags,
                                     &write_op->rmt_addr,
                                     write_op->rmt_addrlen);
        } else {
            pj_assert(!"Invalid operation type!");
            send_rc = PJ_EBUG;
        }

        if (send_rc == PJ_SUCCESS) {
            write_op->written += sent;
        } else {
            pj_assert(send_rc > 0);
            write_op->written = -send_rc;
        }

        /* Are we finished with this buffer? */
        if (send_rc!=PJ_SUCCESS ||
            write_op->written == (pj_ssize_t)write_op->size ||
            h->fd_type == PJ_SOCK_DGRAM)
        {
            if (h->fd_type != PJ_SOCK_DGRAM) {
                /* Write completion of the whole stream. */
                pj_list_erase(write_op);

                /* Clear operation if there's no more data to send. */
                if (pj_list_empty(&h->write_list))
                    ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);

                /* No need to hold the mutex anymore. */
                pj_mutex_unlock(h->mutex);
            }

            /* Call callback. */
            if (h->cb.on_write_complete) {
                (*h->cb.on_write_complete)(h,
                                           (pj_ioqueue_op_key_t*)write_op,
                                           write_op->written);
            }

        } else {
            pj_mutex_unlock(h->mutex);
        }

        /* Done. */
    } else {
        /*
         * This is normal; execution may fall here when multiple threads
         * are signalled for the same event, but only one thread is
         * eventually able to process the event.
         */
        pj_mutex_unlock(h->mutex);
    }
}

void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
{
    pj_status_t rc;

    /* Lock the key. */
    pj_mutex_lock(h->mutex);

#   if PJ_HAS_TCP
    if (!pj_list_empty(&h->accept_list)) {

        struct accept_operation *accept_op;

        /* Get one accept operation from the list. */
        accept_op = h->accept_list.next;
        pj_list_erase(accept_op);

        /* Clear bit in fdset if there is no more pending accept. */
        if (pj_list_empty(&h->accept_list))
            ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);

        /* Unlock; from this point we don't need to hold the key's mutex. */
        pj_mutex_unlock(h->mutex);

        rc = pj_sock_accept(h->fd, accept_op->accept_fd,
                            accept_op->rmt_addr, accept_op->addrlen);
        if (rc == PJ_SUCCESS && accept_op->local_addr) {
            rc = pj_sock_getsockname(*accept_op->accept_fd,
                                     accept_op->local_addr,
                                     accept_op->addrlen);
        }

        /* Call callback. */
        if (h->cb.on_accept_complete) {
            (*h->cb.on_accept_complete)(h,
                                        (pj_ioqueue_op_key_t*)accept_op,
                                        *accept_op->accept_fd, rc);
        }

    }
    else
#   endif
    if (key_has_pending_read(h)) {
        struct read_operation *read_op;
        pj_ssize_t bytes_read;

        /* Get one pending read operation from the list. */
        read_op = h->read_list.next;
        pj_list_erase(read_op);

        /* Clear fdset if there is no pending read. */
        if (pj_list_empty(&h->read_list))
            ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);

        /* Unlock; from this point we don't need to hold the key's mutex. */
        pj_mutex_unlock(h->mutex);

        bytes_read = read_op->size;

        if (read_op->op == PJ_IOQUEUE_OP_RECV_FROM) {
            rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0,
                                  read_op->rmt_addr,
                                  read_op->rmt_addrlen);
        } else if (read_op->op == PJ_IOQUEUE_OP_RECV) {
            rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
        } else {
            pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
            /*
             * User has specified pj_ioqueue_read().
             * On Win32, we should do ReadFile(). But because we got
             * here because of select() anyway, the user must have put a
             * socket descriptor in h->fd, in which case we can
             * just call pj_sock_recv() instead of ReadFile().
             * On Unix, the user may put a file in h->fd, so we'll have
             * to call read() here.
             * This may not compile on systems that don't have
             * read(). That's why we only specify PJ_LINUX here so
             * that the error is easier to catch.
             */
#           if defined(PJ_WIN32) && PJ_WIN32 != 0
                rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
                //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
                //              &bytes_read, NULL);
#           elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
                bytes_read = read(h->fd, read_op->buf, bytes_read);
                rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
#           elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
                bytes_read = sys_read(h->fd, read_op->buf, bytes_read);
                rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
#           else
#               error "Implement read() for this platform!"
#           endif
        }

        if (rc != PJ_SUCCESS) {
#           if defined(PJ_WIN32) && PJ_WIN32 != 0
            /* On Win32, for UDP, WSAECONNRESET on the receive side
             * indicates that previous sending has triggered an ICMP Port
             * Unreachable message.
             * But we wouldn't know at this point which one of the previous
             * send operations triggered the error, since the UDP socket
             * can be shared!
             * So we'll just ignore it!
             */

            if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
                //PJ_LOG(4,(THIS_FILE,
                //          "Ignored ICMP port unreach. on key=%p", h));
            }
#           endif

            /* In any case we would report this to the caller. */
            bytes_read = -rc;
        }

        /* Call callback. */
        if (h->cb.on_read_complete) {
            (*h->cb.on_read_complete)(h,
                                      (pj_ioqueue_op_key_t*)read_op,
                                      bytes_read);
        }

    } else {
        /*
         * This is normal; execution may fall here when multiple threads
         * are signalled for the same event, but only one thread is
         * eventually able to process the event.
         */
        pj_mutex_unlock(h->mutex);
    }
}


void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
                                       pj_ioqueue_key_t *h )
{
    pj_mutex_lock(h->mutex);

    if (!h->connecting) {
        /* It is possible that more than one thread was woken up, thus
         * the remaining thread will see h->connecting as zero because
         * it has already been processed by another thread.
         */
        pj_mutex_unlock(h->mutex);
        return;
    }

    /* Clear operation. */
    h->connecting = 0;

    pj_mutex_unlock(h->mutex);

    ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
    ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);

    /* Call callback. */
    if (h->cb.on_connect_complete)
        (*h->cb.on_connect_complete)(h, -1);
}
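
/* Example (illustrative sketch): the three dispatch functions above are
 * driven by the backend's polling function, typically from one or more
 * worker threads. pj_ioqueue_poll() is the public PJLIB entry point; the
 * thread function and the quit_flag variable below are assumptions.
 *
 *   static int worker_thread(void *arg)
 *   {
 *       pj_ioqueue_t *ioqueue = (pj_ioqueue_t*) arg;
 *       while (!quit_flag) {
 *           pj_time_val timeout = { 0, 100 };    // 100 msec
 *           pj_ioqueue_poll(ioqueue, &timeout);  // may invoke callbacks
 *       }
 *       return 0;
 *   }
 */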

/*
 * pj_ioqueue_recv()
 *
 * Start asynchronous recv() from the socket.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
                                     pj_ioqueue_op_key_t *op_key,
                                     void *buffer,
                                     pj_ssize_t *length,
                                     unsigned flags )
{
    pj_status_t status;
    pj_ssize_t size;
    struct read_operation *read_op;

    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Try to see if there's data immediately available.
     */
    size = *length;
    status = pj_sock_recv(key->fd, buffer, &size, flags);
    if (status == PJ_SUCCESS) {
        /* Yes! Data is available! */
        *length = size;
        return PJ_SUCCESS;
    } else {
        /* If the error is not EWOULDBLOCK (or EAGAIN on Linux), report
         * the error to the caller.
         */
        if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
            return status;
    }

    /*
     * No data is immediately available.
     * Must schedule asynchronous operation to the ioqueue.
     */
    read_op = (struct read_operation*)op_key;

    read_op->op = PJ_IOQUEUE_OP_RECV;
    read_op->buf = buffer;
    read_op->size = *length;
    read_op->flags = flags;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->read_list, read_op);
    ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
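
/* Example (illustrative sketch): the caller must be prepared for both the
 * synchronous and the asynchronous outcome. When PJ_EPENDING is returned,
 * the op_key and the buffer must stay valid until on_read_complete() fires.
 * Names other than the pj_ioqueue_* calls are assumptions.
 *
 *   pj_ssize_t len = sizeof(sess->rx_buf);
 *   status = pj_ioqueue_recv(key, &sess->read_op, sess->rx_buf, &len, 0);
 *   if (status == PJ_SUCCESS) {
 *       // Data was available immediately; 'len' bytes are in rx_buf and
 *       // on_read_complete() will NOT be called for this operation.
 *   } else if (status == PJ_EPENDING) {
 *       // Operation queued; completion is reported via on_read_complete().
 *   } else {
 *       // Immediate failure.
 *   }
 */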

/*
 * pj_ioqueue_recvfrom()
 *
 * Start asynchronous recvfrom() from the socket.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key,
                                         void *buffer,
                                         pj_ssize_t *length,
                                         unsigned flags,
                                         pj_sockaddr_t *addr,
                                         int *addrlen)
{
    pj_status_t status;
    pj_ssize_t size;
    struct read_operation *read_op;

    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Try to see if there's data immediately available.
     */
    size = *length;
    status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
                              addr, addrlen);
    if (status == PJ_SUCCESS) {
        /* Yes! Data is available! */
        *length = size;
        return PJ_SUCCESS;
    } else {
        /* If the error is not EWOULDBLOCK (or EAGAIN on Linux), report
         * the error to the caller.
         */
        if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
            return status;
    }

    /*
     * No data is immediately available.
     * Must schedule asynchronous operation to the ioqueue.
     */
    read_op = (struct read_operation*)op_key;

    read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
    read_op->buf = buffer;
    read_op->size = *length;
    read_op->flags = flags;
    read_op->rmt_addr = addr;
    read_op->rmt_addrlen = addrlen;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->read_list, read_op);
    ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
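
/* Example (illustrative sketch): for UDP, the address variables must also
 * remain valid while the operation is pending, since the dispatcher writes
 * the source address into them upon completion. The 'sess' fields here are
 * assumptions.
 *
 *   sess->src_len = sizeof(sess->src_addr);
 *   len = sizeof(sess->rx_buf);
 *   status = pj_ioqueue_recvfrom(key, &sess->read_op, sess->rx_buf, &len, 0,
 *                                &sess->src_addr, &sess->src_len);
 *   // PJ_SUCCESS: datagram (and source address) available immediately.
 *   // PJ_EPENDING: wait for on_read_complete().
 */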

/*
 * pj_ioqueue_send()
 *
 * Start asynchronous send() to the descriptor.
 */
PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
                                     pj_ioqueue_op_key_t *op_key,
                                     const void *data,
                                     pj_ssize_t *length,
                                     unsigned flags)
{
    struct write_operation *write_op;
    pj_status_t status;
    pj_ssize_t sent;

    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Fast track:
     *   Try to send data immediately, only if there's no pending write!
     * Note:
     *   We are speculating that the list is empty here without properly
     *   acquiring the ioqueue's mutex first. This is intentional, to
     *   maximize performance via parallelism.
     *
     * This should be safe, because:
     *  - by convention, we require the caller to make sure that the
     *    key is not unregistered while other threads are invoking
     *    an operation on the same key.
     *  - pj_list_empty() is safe to be invoked by multiple threads,
     *    even when other threads are modifying the list.
     */
    if (pj_list_empty(&key->write_list)) {
        /*
         * See if data can be sent immediately.
         */
        sent = *length;
        status = pj_sock_send(key->fd, data, &sent, flags);
        if (status == PJ_SUCCESS) {
            /* Success! */
            *length = sent;
            return PJ_SUCCESS;
        } else {
            /* If the error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to the caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * Schedule asynchronous send.
     */
    write_op = (struct write_operation*)op_key;
    write_op->op = PJ_IOQUEUE_OP_SEND;
    write_op->buf = (void*)data;
    write_op->size = *length;
    write_op->written = 0;
    write_op->flags = flags;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->write_list, write_op);
    ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
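
/* Example (illustrative sketch): because the data pointer is stored (not
 * copied) when the operation goes asynchronous, the buffer as well as the
 * op_key must outlive the operation. A negative value passed to
 * on_write_complete() carries the negated error code, mirroring what the
 * dispatcher above stores in write_op->written. 'sess' is an assumption.
 *
 *   pj_ssize_t sent = msg_len;
 *   status = pj_ioqueue_send(key, &sess->write_op, msg_buf, &sent, 0);
 *   if (status == PJ_SUCCESS) {
 *       // Sent immediately; 'sent' bytes were written.
 *   } else if (status != PJ_EPENDING) {
 *       // Immediate failure.
 *   }
 */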


/*
 * pj_ioqueue_sendto()
 *
 * Start asynchronous sendto() to the descriptor.
 */
PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
                                       const void *data,
                                       pj_ssize_t *length,
                                       unsigned flags,
                                       const pj_sockaddr_t *addr,
                                       int addrlen)
{
    struct write_operation *write_op;
    pj_status_t status;
    pj_ssize_t sent;

    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Fast track:
     *   Try to send data immediately, only if there's no pending write!
     * Note:
     *   We are speculating that the list is empty here without properly
     *   acquiring the ioqueue's mutex first. This is intentional, to
     *   maximize performance via parallelism.
     *
     * This should be safe, because:
     *  - by convention, we require the caller to make sure that the
     *    key is not unregistered while other threads are invoking
     *    an operation on the same key.
     *  - pj_list_empty() is safe to be invoked by multiple threads,
     *    even when other threads are modifying the list.
     */
    if (pj_list_empty(&key->write_list)) {
        /*
         * See if data can be sent immediately.
         */
        sent = *length;
        status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
        if (status == PJ_SUCCESS) {
            /* Success! */
            *length = sent;
            return PJ_SUCCESS;
        } else {
            /* If the error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to the caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * Check that the address storage can hold the address parameter.
     */
    PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG);

    /*
     * Schedule asynchronous send.
     */
    write_op = (struct write_operation*)op_key;
    write_op->op = PJ_IOQUEUE_OP_SEND_TO;
    write_op->buf = (void*)data;
    write_op->size = *length;
    write_op->written = 0;
    write_op->flags = flags;
    pj_memcpy(&write_op->rmt_addr, addr, addrlen);
    write_op->rmt_addrlen = addrlen;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->write_list, write_op);
    ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}

#if PJ_HAS_TCP
/*
 * Initiate overlapped accept() operation.
 */
PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
                                       pj_sock_t *new_sock,
                                       pj_sockaddr_t *local,
                                       pj_sockaddr_t *remote,
                                       int *addrlen)
{
    struct accept_operation *accept_op;
    pj_status_t status;

    /* Check parameters. All must be specified! */
    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);

    /* Fast track:
     *  See if there's a new connection available immediately.
     */
    if (pj_list_empty(&key->accept_list)) {
        status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
        if (status == PJ_SUCCESS) {
            /* Yes! New connection is available! */
            if (local && addrlen) {
                status = pj_sock_getsockname(*new_sock, local, addrlen);
                if (status != PJ_SUCCESS) {
                    pj_sock_close(*new_sock);
                    *new_sock = PJ_INVALID_SOCKET;
                    return status;
                }
            }
            return PJ_SUCCESS;
        } else {
            /* If the error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to the caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * No connection is available immediately.
     * Schedule accept() operation to be completed when there is an incoming
     * connection available.
     */
    accept_op = (struct accept_operation*)op_key;

    accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
    accept_op->accept_fd = new_sock;
    accept_op->rmt_addr = remote;
    accept_op->addrlen = addrlen;
    accept_op->local_addr = local;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->accept_list, accept_op);
    ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
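
/* Example (illustrative sketch): a typical accept loop keeps one or more
 * accept operations outstanding on the listening key. The callback body and
 * the 'srv' storage struct are assumptions.
 *
 *   static void on_accept_complete(pj_ioqueue_key_t *key,
 *                                  pj_ioqueue_op_key_t *op_key,
 *                                  pj_sock_t new_sock, pj_status_t status)
 *   {
 *       if (status == PJ_SUCCESS) {
 *           // Register new_sock to the ioqueue, start reading, etc.
 *       }
 *       // Re-arm: keep an accept operation pending on the listener.
 *       pj_ioqueue_accept(key, op_key, &srv->new_sock, &srv->local,
 *                         &srv->remote, &srv->addrlen);
 *   }
 */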

/*
 * Initiate overlapped connect() operation (well, it's non-blocking actually,
 * since there's no overlapped version of connect()).
 */
PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
                                        const pj_sockaddr_t *addr,
                                        int addrlen )
{
    pj_status_t status;

    /* Check parameters. All must be specified! */
    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);

    /* Check that the socket has not already been marked as connecting. */
    if (key->connecting != 0)
        return PJ_EPENDING;

    status = pj_sock_connect(key->fd, addr, addrlen);
    if (status == PJ_SUCCESS) {
        /* Connected! */
        return PJ_SUCCESS;
    } else {
        if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
            /* Pending! */
            pj_mutex_lock(key->mutex);
            key->connecting = PJ_TRUE;
            ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
            ioqueue_add_to_set(key->ioqueue, key->fd, EXCEPTION_EVENT);
            pj_mutex_unlock(key->mutex);
            return PJ_EPENDING;
        } else {
            /* Error! */
            return status;
        }
    }
}
#endif	/* PJ_HAS_TCP */

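/* Example (illustrative sketch): connect completion is reported through
 * on_connect_complete(). Per the dispatchers above, the second argument is
 * zero on success, -1 when the exception event fires, and otherwise the
 * platform-specific result of the writability check, so non-zero means
 * failure. The caller code below is an assumption.
 *
 *   status = pj_ioqueue_connect(key, &addr, sizeof(addr));
 *   if (status == PJ_SUCCESS) {
 *       // Connected immediately.
 *   } else if (status == PJ_EPENDING) {
 *       // Wait for on_connect_complete(key, status).
 *   }
 */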