/* $Id$ */
/*
 * Copyright (C) 2003-2006 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */

/*
 * ioqueue_common_abs.c
 *
 * This contains common functionalities to emulate proactor pattern with
 * various event dispatching mechanisms (e.g. select, epoll).
 *
 * This file will be included by the appropriate ioqueue implementation.
 * This file is NOT supposed to be compiled as stand-alone source.
 */

/* Number of times pj_ioqueue_send()/pj_ioqueue_sendto() will spin-wait
 * (yielding the CPU with pj_thread_sleep(0)) for a pending operation on
 * the same op_key to be cleared by the polling thread, before giving up
 * and returning PJ_EBUSY.
 */
#define PENDING_RETRY	2

Benny Prijono9033e312005-11-21 02:08:39 +000032static void ioqueue_init( pj_ioqueue_t *ioqueue )
33{
34 ioqueue->lock = NULL;
35 ioqueue->auto_delete_lock = 0;
36}
37
38static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
39{
40 if (ioqueue->auto_delete_lock && ioqueue->lock ) {
41 pj_lock_release(ioqueue->lock);
42 return pj_lock_destroy(ioqueue->lock);
43 } else
44 return PJ_SUCCESS;
45}
46
47/*
48 * pj_ioqueue_set_lock()
49 */
50PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
51 pj_lock_t *lock,
52 pj_bool_t auto_delete )
53{
54 PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
55
56 if (ioqueue->auto_delete_lock && ioqueue->lock) {
57 pj_lock_destroy(ioqueue->lock);
58 }
59
60 ioqueue->lock = lock;
61 ioqueue->auto_delete_lock = auto_delete;
62
63 return PJ_SUCCESS;
64}
65
66static pj_status_t ioqueue_init_key( pj_pool_t *pool,
67 pj_ioqueue_t *ioqueue,
68 pj_ioqueue_key_t *key,
69 pj_sock_t sock,
70 void *user_data,
71 const pj_ioqueue_callback *cb)
72{
73 pj_status_t rc;
74 int optlen;
75
Benny Prijono8befd9f2006-05-13 22:46:23 +000076 PJ_UNUSED_ARG(pool);
77
Benny Prijono9033e312005-11-21 02:08:39 +000078 key->ioqueue = ioqueue;
79 key->fd = sock;
80 key->user_data = user_data;
81 pj_list_init(&key->read_list);
82 pj_list_init(&key->write_list);
83#if PJ_HAS_TCP
84 pj_list_init(&key->accept_list);
Benny Prijono5accbd02006-03-30 16:32:18 +000085 key->connecting = 0;
Benny Prijono9033e312005-11-21 02:08:39 +000086#endif
87
88 /* Save callback. */
89 pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
90
Benny Prijono5accbd02006-03-30 16:32:18 +000091#if PJ_IOQUEUE_HAS_SAFE_UNREG
92 /* Set initial reference count to 1 */
93 pj_assert(key->ref_count == 0);
94 ++key->ref_count;
95
96 key->closing = 0;
97#endif
98
Benny Prijono9033e312005-11-21 02:08:39 +000099 /* Get socket type. When socket type is datagram, some optimization
100 * will be performed during send to allow parallel send operations.
101 */
102 optlen = sizeof(key->fd_type);
103 rc = pj_sock_getsockopt(sock, PJ_SOL_SOCKET, PJ_SO_TYPE,
104 &key->fd_type, &optlen);
105 if (rc != PJ_SUCCESS)
106 key->fd_type = PJ_SOCK_STREAM;
107
108 /* Create mutex for the key. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000109#if !PJ_IOQUEUE_HAS_SAFE_UNREG
110 rc = pj_mutex_create_simple(pool, NULL, &key->mutex);
111#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000112
113 return rc;
114}
115
Benny Prijono9033e312005-11-21 02:08:39 +0000116/*
117 * pj_ioqueue_get_user_data()
118 *
119 * Obtain value associated with a key.
120 */
121PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
122{
123 PJ_ASSERT_RETURN(key != NULL, NULL);
124 return key->user_data;
125}
126
127/*
128 * pj_ioqueue_set_user_data()
129 */
130PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
131 void *user_data,
132 void **old_data)
133{
134 PJ_ASSERT_RETURN(key, PJ_EINVAL);
135
136 if (old_data)
137 *old_data = key->user_data;
138 key->user_data = user_data;
139
140 return PJ_SUCCESS;
141}
142
143PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
144{
145 return !pj_list_empty(&key->write_list);
146}
147
148PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
149{
150 return !pj_list_empty(&key->read_list);
151}
152
153PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
154{
155#if PJ_HAS_TCP
156 return !pj_list_empty(&key->accept_list);
157#else
158 return 0;
159#endif
160}
161
162PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
163{
164 return key->connecting;
165}
166
167
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* With safe unregistration, a key may be marked "closing" by another
 * thread; callbacks must be suppressed once that flag is set. */
#  define IS_CLOSING(key)   (key->closing)
#else
/* Without safe unregistration there is no closing state to observe. */
#  define IS_CLOSING(key)   (0)
#endif
173
174
Benny Prijono9033e312005-11-21 02:08:39 +0000175/*
176 * ioqueue_dispatch_event()
177 *
178 * Report occurence of an event in the key to be processed by the
179 * framework.
180 */
181void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
182{
183 /* Lock the key. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000184 pj_mutex_lock(h->mutex);
185
186 if (h->closing) {
187 pj_mutex_unlock(h->mutex);
188 return;
189 }
Benny Prijono9033e312005-11-21 02:08:39 +0000190
191#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
192 if (h->connecting) {
193 /* Completion of connect() operation */
194 pj_ssize_t bytes_transfered;
195
196 /* Clear operation. */
197 h->connecting = 0;
198
Benny Prijono63ab3562006-07-08 19:46:43 +0000199 ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
200 ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +0000201
Benny Prijono9033e312005-11-21 02:08:39 +0000202
203#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
204 /* from connect(2):
205 * On Linux, use getsockopt to read the SO_ERROR option at
206 * level SOL_SOCKET to determine whether connect() completed
207 * successfully (if SO_ERROR is zero).
208 */
209 {
210 int value;
Benny Prijono7db431e2006-07-23 14:38:49 +0000211 int vallen = sizeof(value);
Benny Prijono2bbd7102006-07-18 00:10:53 +0000212 int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
213 &value, &vallen);
Benny Prijono9033e312005-11-21 02:08:39 +0000214 if (gs_rc != 0) {
215 /* Argh!! What to do now???
216 * Just indicate that the socket is connected. The
217 * application will get error as soon as it tries to use
218 * the socket to send/receive.
219 */
220 bytes_transfered = 0;
221 } else {
222 bytes_transfered = value;
223 }
224 }
225#elif defined(PJ_WIN32) && PJ_WIN32!=0
226 bytes_transfered = 0; /* success */
227#else
228 /* Excellent information in D.J. Bernstein page:
229 * http://cr.yp.to/docs/connect.html
230 *
231 * Seems like the most portable way of detecting connect()
232 * failure is to call getpeername(). If socket is connected,
233 * getpeername() will return 0. If the socket is not connected,
234 * it will return ENOTCONN, and read(fd, &ch, 1) will produce
235 * the right errno through error slippage. This is a combination
236 * of suggestions from Douglas C. Schmidt and Ken Keys.
237 */
Benny Prijono9cf138e2006-01-19 03:58:29 +0000238 {
239 int gp_rc;
240 struct sockaddr_in addr;
241 socklen_t addrlen = sizeof(addr);
Benny Prijono9033e312005-11-21 02:08:39 +0000242
Benny Prijono9cf138e2006-01-19 03:58:29 +0000243 gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
Benny Prijono42c5b9e2006-05-10 19:24:40 +0000244 bytes_transfered = (gp_rc < 0) ? gp_rc : -gp_rc;
Benny Prijono9cf138e2006-01-19 03:58:29 +0000245 }
Benny Prijono9033e312005-11-21 02:08:39 +0000246#endif
247
Benny Prijono5accbd02006-03-30 16:32:18 +0000248 /* Unlock; from this point we don't need to hold key's mutex. */
249 pj_mutex_unlock(h->mutex);
250
Benny Prijono9033e312005-11-21 02:08:39 +0000251 /* Call callback. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000252 if (h->cb.on_connect_complete && !IS_CLOSING(h))
Benny Prijono9033e312005-11-21 02:08:39 +0000253 (*h->cb.on_connect_complete)(h, bytes_transfered);
254
255 /* Done. */
256
257 } else
258#endif /* PJ_HAS_TCP */
259 if (key_has_pending_write(h)) {
260 /* Socket is writable. */
261 struct write_operation *write_op;
262 pj_ssize_t sent;
263 pj_status_t send_rc;
264
265 /* Get the first in the queue. */
266 write_op = h->write_list.next;
267
268 /* For datagrams, we can remove the write_op from the list
269 * so that send() can work in parallel.
270 */
271 if (h->fd_type == PJ_SOCK_DGRAM) {
272 pj_list_erase(write_op);
Benny Prijono9033e312005-11-21 02:08:39 +0000273
274 if (pj_list_empty(&h->write_list))
Benny Prijono63ab3562006-07-08 19:46:43 +0000275 ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +0000276
Benny Prijono9033e312005-11-21 02:08:39 +0000277 }
278
279 /* Send the data.
280 * Unfortunately we must do this while holding key's mutex, thus
281 * preventing parallel write on a single key.. :-((
282 */
283 sent = write_op->size - write_op->written;
284 if (write_op->op == PJ_IOQUEUE_OP_SEND) {
285 send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
286 &sent, write_op->flags);
Benny Prijono40212582006-06-22 18:41:28 +0000287 /* Can't do this. We only clear "op" after we're finished sending
288 * the whole buffer.
289 */
290 //write_op->op = 0;
Benny Prijono9033e312005-11-21 02:08:39 +0000291 } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
292 send_rc = pj_sock_sendto(h->fd,
293 write_op->buf+write_op->written,
294 &sent, write_op->flags,
295 &write_op->rmt_addr,
296 write_op->rmt_addrlen);
Benny Prijono40212582006-06-22 18:41:28 +0000297 /* Can't do this. We only clear "op" after we're finished sending
298 * the whole buffer.
299 */
300 //write_op->op = 0;
Benny Prijono9033e312005-11-21 02:08:39 +0000301 } else {
302 pj_assert(!"Invalid operation type!");
Benny Prijono37e8d332006-01-20 21:03:36 +0000303 write_op->op = 0;
Benny Prijono9033e312005-11-21 02:08:39 +0000304 send_rc = PJ_EBUG;
305 }
306
307 if (send_rc == PJ_SUCCESS) {
308 write_op->written += sent;
309 } else {
310 pj_assert(send_rc > 0);
311 write_op->written = -send_rc;
312 }
313
314 /* Are we finished with this buffer? */
315 if (send_rc!=PJ_SUCCESS ||
316 write_op->written == (pj_ssize_t)write_op->size ||
317 h->fd_type == PJ_SOCK_DGRAM)
318 {
Benny Prijono40212582006-06-22 18:41:28 +0000319
320 write_op->op = 0;
321
Benny Prijono9033e312005-11-21 02:08:39 +0000322 if (h->fd_type != PJ_SOCK_DGRAM) {
323 /* Write completion of the whole stream. */
324 pj_list_erase(write_op);
Benny Prijono9033e312005-11-21 02:08:39 +0000325
326 /* Clear operation if there's no more data to send. */
327 if (pj_list_empty(&h->write_list))
Benny Prijono63ab3562006-07-08 19:46:43 +0000328 ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +0000329
Benny Prijono9033e312005-11-21 02:08:39 +0000330 }
331
Benny Prijono5accbd02006-03-30 16:32:18 +0000332 /* No need to hold mutex anymore */
333 pj_mutex_unlock(h->mutex);
334
Benny Prijono9033e312005-11-21 02:08:39 +0000335 /* Call callback. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000336 if (h->cb.on_write_complete && !IS_CLOSING(h)) {
Benny Prijono9033e312005-11-21 02:08:39 +0000337 (*h->cb.on_write_complete)(h,
338 (pj_ioqueue_op_key_t*)write_op,
339 write_op->written);
340 }
341
342 } else {
Benny Prijono5accbd02006-03-30 16:32:18 +0000343 pj_mutex_unlock(h->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +0000344 }
345
346 /* Done. */
347 } else {
348 /*
349 * This is normal; execution may fall here when multiple threads
350 * are signalled for the same event, but only one thread eventually
351 * able to process the event.
352 */
Benny Prijono5accbd02006-03-30 16:32:18 +0000353 pj_mutex_unlock(h->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +0000354 }
355}
356
357void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
358{
359 pj_status_t rc;
360
361 /* Lock the key. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000362 pj_mutex_lock(h->mutex);
363
364 if (h->closing) {
365 pj_mutex_unlock(h->mutex);
366 return;
367 }
Benny Prijono9033e312005-11-21 02:08:39 +0000368
369# if PJ_HAS_TCP
370 if (!pj_list_empty(&h->accept_list)) {
371
372 struct accept_operation *accept_op;
373
374 /* Get one accept operation from the list. */
375 accept_op = h->accept_list.next;
376 pj_list_erase(accept_op);
377 accept_op->op = 0;
378
379 /* Clear bit in fdset if there is no more pending accept */
380 if (pj_list_empty(&h->accept_list))
Benny Prijono63ab3562006-07-08 19:46:43 +0000381 ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +0000382
Benny Prijono9033e312005-11-21 02:08:39 +0000383 rc=pj_sock_accept(h->fd, accept_op->accept_fd,
384 accept_op->rmt_addr, accept_op->addrlen);
385 if (rc==PJ_SUCCESS && accept_op->local_addr) {
386 rc = pj_sock_getsockname(*accept_op->accept_fd,
387 accept_op->local_addr,
388 accept_op->addrlen);
389 }
390
Benny Prijono5accbd02006-03-30 16:32:18 +0000391 /* Unlock; from this point we don't need to hold key's mutex. */
392 pj_mutex_unlock(h->mutex);
393
Benny Prijono9033e312005-11-21 02:08:39 +0000394 /* Call callback. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000395 if (h->cb.on_accept_complete && !IS_CLOSING(h)) {
Benny Prijono9033e312005-11-21 02:08:39 +0000396 (*h->cb.on_accept_complete)(h,
397 (pj_ioqueue_op_key_t*)accept_op,
398 *accept_op->accept_fd, rc);
399 }
400
401 }
402 else
403# endif
404 if (key_has_pending_read(h)) {
405 struct read_operation *read_op;
406 pj_ssize_t bytes_read;
407
408 /* Get one pending read operation from the list. */
409 read_op = h->read_list.next;
410 pj_list_erase(read_op);
Benny Prijono9033e312005-11-21 02:08:39 +0000411
412 /* Clear fdset if there is no pending read. */
413 if (pj_list_empty(&h->read_list))
Benny Prijono63ab3562006-07-08 19:46:43 +0000414 ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +0000415
Benny Prijono9033e312005-11-21 02:08:39 +0000416 bytes_read = read_op->size;
417
418 if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {
Benny Prijono37e8d332006-01-20 21:03:36 +0000419 read_op->op = 0;
Benny Prijono9033e312005-11-21 02:08:39 +0000420 rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0,
421 read_op->rmt_addr,
422 read_op->rmt_addrlen);
423 } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) {
Benny Prijono37e8d332006-01-20 21:03:36 +0000424 read_op->op = 0;
Benny Prijono9033e312005-11-21 02:08:39 +0000425 rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
426 } else {
427 pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
Benny Prijono37e8d332006-01-20 21:03:36 +0000428 read_op->op = 0;
Benny Prijono9033e312005-11-21 02:08:39 +0000429 /*
430 * User has specified pj_ioqueue_read().
431 * On Win32, we should do ReadFile(). But because we got
432 * here because of select() anyway, user must have put a
433 * socket descriptor on h->fd, which in this case we can
434 * just call pj_sock_recv() instead of ReadFile().
435 * On Unix, user may put a file in h->fd, so we'll have
436 * to call read() here.
437 * This may not compile on systems which doesn't have
438 * read(). That's why we only specify PJ_LINUX here so
439 * that error is easier to catch.
440 */
Benny Prijono9cf138e2006-01-19 03:58:29 +0000441# if defined(PJ_WIN32) && PJ_WIN32 != 0 || \
442 defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE != 0
Benny Prijono9033e312005-11-21 02:08:39 +0000443 rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
444 //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
445 // &bytes_read, NULL);
446# elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
447 bytes_read = read(h->fd, read_op->buf, bytes_read);
448 rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
449# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
450 bytes_read = sys_read(h->fd, read_op->buf, bytes_read);
451 rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
452# else
453# error "Implement read() for this platform!"
454# endif
455 }
456
457 if (rc != PJ_SUCCESS) {
458# if defined(PJ_WIN32) && PJ_WIN32 != 0
459 /* On Win32, for UDP, WSAECONNRESET on the receive side
460 * indicates that previous sending has triggered ICMP Port
461 * Unreachable message.
462 * But we wouldn't know at this point which one of previous
463 * key that has triggered the error, since UDP socket can
464 * be shared!
465 * So we'll just ignore it!
466 */
467
468 if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
469 //PJ_LOG(4,(THIS_FILE,
470 // "Ignored ICMP port unreach. on key=%p", h));
471 }
472# endif
473
474 /* In any case we would report this to caller. */
475 bytes_read = -rc;
476 }
477
Benny Prijono5accbd02006-03-30 16:32:18 +0000478 /* Unlock; from this point we don't need to hold key's mutex. */
479 pj_mutex_unlock(h->mutex);
480
Benny Prijono9033e312005-11-21 02:08:39 +0000481 /* Call callback. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000482 if (h->cb.on_read_complete && !IS_CLOSING(h)) {
Benny Prijono9033e312005-11-21 02:08:39 +0000483 (*h->cb.on_read_complete)(h,
484 (pj_ioqueue_op_key_t*)read_op,
485 bytes_read);
486 }
487
488 } else {
489 /*
490 * This is normal; execution may fall here when multiple threads
491 * are signalled for the same event, but only one thread eventually
492 * able to process the event.
493 */
Benny Prijono5accbd02006-03-30 16:32:18 +0000494 pj_mutex_unlock(h->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +0000495 }
496}
497
498
499void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
500 pj_ioqueue_key_t *h )
501{
Benny Prijono5accbd02006-03-30 16:32:18 +0000502 pj_mutex_lock(h->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +0000503
504 if (!h->connecting) {
505 /* It is possible that more than one thread was woken up, thus
506 * the remaining thread will see h->connecting as zero because
507 * it has been processed by other thread.
508 */
Benny Prijono5accbd02006-03-30 16:32:18 +0000509 pj_mutex_unlock(h->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +0000510 return;
511 }
512
Benny Prijono5accbd02006-03-30 16:32:18 +0000513 if (h->closing) {
514 pj_mutex_unlock(h->mutex);
515 return;
516 }
517
Benny Prijono9033e312005-11-21 02:08:39 +0000518 /* Clear operation. */
519 h->connecting = 0;
520
Benny Prijono63ab3562006-07-08 19:46:43 +0000521 ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
522 ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +0000523
Benny Prijono5accbd02006-03-30 16:32:18 +0000524 pj_mutex_unlock(h->mutex);
Benny Prijonoac52df42006-03-25 10:06:00 +0000525
Benny Prijono5accbd02006-03-30 16:32:18 +0000526 /* Call callback. */
Benny Prijono2bbd7102006-07-18 00:10:53 +0000527 if (h->cb.on_connect_complete && !IS_CLOSING(h)) {
528 pj_status_t status = -1;
529#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
530 int value;
Benny Prijono7db431e2006-07-23 14:38:49 +0000531 int vallen = sizeof(value);
Benny Prijono2bbd7102006-07-18 00:10:53 +0000532 int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
533 &value, &vallen);
534 if (gs_rc == 0) {
535 status = PJ_RETURN_OS_ERROR(value);
536 }
537#endif
538
539 (*h->cb.on_connect_complete)(h, status);
540 }
Benny Prijono9033e312005-11-21 02:08:39 +0000541}
542
543/*
544 * pj_ioqueue_recv()
545 *
546 * Start asynchronous recv() from the socket.
547 */
548PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
549 pj_ioqueue_op_key_t *op_key,
550 void *buffer,
551 pj_ssize_t *length,
552 unsigned flags )
553{
554 struct read_operation *read_op;
555
556 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
557 PJ_CHECK_STACK();
558
559 read_op = (struct read_operation*)op_key;
560 read_op->op = 0;
561
Benny Prijono5accbd02006-03-30 16:32:18 +0000562 /* Check if key is closing. */
563 if (key->closing)
564 return PJ_ECANCELLED;
565
Benny Prijono9033e312005-11-21 02:08:39 +0000566 /* Try to see if there's data immediately available.
567 */
568 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
569 pj_status_t status;
570 pj_ssize_t size;
571
572 size = *length;
573 status = pj_sock_recv(key->fd, buffer, &size, flags);
574 if (status == PJ_SUCCESS) {
575 /* Yes! Data is available! */
576 *length = size;
577 return PJ_SUCCESS;
578 } else {
579 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
580 * the error to caller.
581 */
582 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
583 return status;
584 }
585 }
586
587 flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
588
589 /*
590 * No data is immediately available.
591 * Must schedule asynchronous operation to the ioqueue.
592 */
593 read_op->op = PJ_IOQUEUE_OP_RECV;
594 read_op->buf = buffer;
595 read_op->size = *length;
596 read_op->flags = flags;
597
598 pj_mutex_lock(key->mutex);
599 pj_list_insert_before(&key->read_list, read_op);
Benny Prijono63ab3562006-07-08 19:46:43 +0000600 ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +0000601 pj_mutex_unlock(key->mutex);
602
603 return PJ_EPENDING;
604}
605
606/*
607 * pj_ioqueue_recvfrom()
608 *
609 * Start asynchronous recvfrom() from the socket.
610 */
611PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
612 pj_ioqueue_op_key_t *op_key,
613 void *buffer,
614 pj_ssize_t *length,
615 unsigned flags,
616 pj_sockaddr_t *addr,
617 int *addrlen)
618{
619 struct read_operation *read_op;
620
621 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
622 PJ_CHECK_STACK();
623
Benny Prijono5accbd02006-03-30 16:32:18 +0000624 /* Check if key is closing. */
625 if (key->closing)
626 return PJ_ECANCELLED;
627
Benny Prijono9033e312005-11-21 02:08:39 +0000628 read_op = (struct read_operation*)op_key;
629 read_op->op = 0;
630
631 /* Try to see if there's data immediately available.
632 */
633 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
634 pj_status_t status;
635 pj_ssize_t size;
636
637 size = *length;
638 status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
639 addr, addrlen);
640 if (status == PJ_SUCCESS) {
641 /* Yes! Data is available! */
642 *length = size;
643 return PJ_SUCCESS;
644 } else {
645 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
646 * the error to caller.
647 */
648 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
649 return status;
650 }
651 }
652
653 flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
654
655 /*
656 * No data is immediately available.
657 * Must schedule asynchronous operation to the ioqueue.
658 */
659 read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
660 read_op->buf = buffer;
661 read_op->size = *length;
662 read_op->flags = flags;
663 read_op->rmt_addr = addr;
664 read_op->rmt_addrlen = addrlen;
665
666 pj_mutex_lock(key->mutex);
667 pj_list_insert_before(&key->read_list, read_op);
Benny Prijono63ab3562006-07-08 19:46:43 +0000668 ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +0000669 pj_mutex_unlock(key->mutex);
670
671 return PJ_EPENDING;
672}
673
674/*
675 * pj_ioqueue_send()
676 *
677 * Start asynchronous send() to the descriptor.
678 */
679PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
680 pj_ioqueue_op_key_t *op_key,
681 const void *data,
682 pj_ssize_t *length,
683 unsigned flags)
684{
685 struct write_operation *write_op;
686 pj_status_t status;
Benny Prijono40212582006-06-22 18:41:28 +0000687 unsigned retry;
Benny Prijono9033e312005-11-21 02:08:39 +0000688 pj_ssize_t sent;
689
690 PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
691 PJ_CHECK_STACK();
692
Benny Prijono5accbd02006-03-30 16:32:18 +0000693 /* Check if key is closing. */
694 if (key->closing)
695 return PJ_ECANCELLED;
696
Benny Prijono9033e312005-11-21 02:08:39 +0000697 /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write. */
698 flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
699
700 /* Fast track:
701 * Try to send data immediately, only if there's no pending write!
702 * Note:
703 * We are speculating that the list is empty here without properly
704 * acquiring ioqueue's mutex first. This is intentional, to maximize
705 * performance via parallelism.
706 *
707 * This should be safe, because:
708 * - by convention, we require caller to make sure that the
709 * key is not unregistered while other threads are invoking
710 * an operation on the same key.
711 * - pj_list_empty() is safe to be invoked by multiple threads,
712 * even when other threads are modifying the list.
713 */
714 if (pj_list_empty(&key->write_list)) {
715 /*
716 * See if data can be sent immediately.
717 */
718 sent = *length;
719 status = pj_sock_send(key->fd, data, &sent, flags);
720 if (status == PJ_SUCCESS) {
721 /* Success! */
722 *length = sent;
723 return PJ_SUCCESS;
724 } else {
725 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
726 * the error to caller.
727 */
728 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
729 return status;
730 }
731 }
732 }
733
734 /*
735 * Schedule asynchronous send.
736 */
Benny Prijono40212582006-06-22 18:41:28 +0000737 write_op = (struct write_operation*)op_key;
738
739 /* Spin if write_op has pending operation */
740 for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
741 pj_thread_sleep(0);
742
743 /* Last chance */
744 if (write_op->op) {
745 /* Unable to send packet because there is already pending write in the
746 * write_op. We could not put the operation into the write_op
747 * because write_op already contains a pending operation! And
748 * we could not send the packet directly with send() either,
749 * because that will break the order of the packet. So we can
750 * only return error here.
751 *
752 * This could happen for example in multithreads program,
753 * where polling is done by one thread, while other threads are doing
754 * the sending only. If the polling thread runs on lower priority
755 * than the sending thread, then it's possible that the pending
756 * write flag is not cleared in-time because clearing is only done
757 * during polling.
758 *
759 * Aplication should specify multiple write operation keys on
760 * situation like this.
761 */
762 //pj_assert(!"ioqueue: there is pending operation on this key!");
763 return PJ_EBUSY;
764 }
765
Benny Prijono9033e312005-11-21 02:08:39 +0000766 write_op->op = PJ_IOQUEUE_OP_SEND;
767 write_op->buf = (void*)data;
768 write_op->size = *length;
769 write_op->written = 0;
770 write_op->flags = flags;
771
772 pj_mutex_lock(key->mutex);
773 pj_list_insert_before(&key->write_list, write_op);
Benny Prijono63ab3562006-07-08 19:46:43 +0000774 ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +0000775 pj_mutex_unlock(key->mutex);
776
777 return PJ_EPENDING;
778}
779
780
781/*
782 * pj_ioqueue_sendto()
783 *
784 * Start asynchronous write() to the descriptor.
785 */
786PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
787 pj_ioqueue_op_key_t *op_key,
788 const void *data,
789 pj_ssize_t *length,
790 pj_uint32_t flags,
791 const pj_sockaddr_t *addr,
792 int addrlen)
793{
794 struct write_operation *write_op;
Benny Prijono40212582006-06-22 18:41:28 +0000795 unsigned retry;
Benny Prijono9033e312005-11-21 02:08:39 +0000796 pj_status_t status;
797 pj_ssize_t sent;
798
799 PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
800 PJ_CHECK_STACK();
801
Benny Prijono5accbd02006-03-30 16:32:18 +0000802 /* Check if key is closing. */
803 if (key->closing)
804 return PJ_ECANCELLED;
805
Benny Prijono9033e312005-11-21 02:08:39 +0000806 /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write */
807 flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
808
809 /* Fast track:
810 * Try to send data immediately, only if there's no pending write!
811 * Note:
812 * We are speculating that the list is empty here without properly
813 * acquiring ioqueue's mutex first. This is intentional, to maximize
814 * performance via parallelism.
815 *
816 * This should be safe, because:
817 * - by convention, we require caller to make sure that the
818 * key is not unregistered while other threads are invoking
819 * an operation on the same key.
820 * - pj_list_empty() is safe to be invoked by multiple threads,
821 * even when other threads are modifying the list.
822 */
823 if (pj_list_empty(&key->write_list)) {
824 /*
825 * See if data can be sent immediately.
826 */
827 sent = *length;
828 status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
829 if (status == PJ_SUCCESS) {
830 /* Success! */
831 *length = sent;
832 return PJ_SUCCESS;
833 } else {
834 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
835 * the error to caller.
836 */
837 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
838 return status;
839 }
Benny Prijono40212582006-06-22 18:41:28 +0000840 status = status;
Benny Prijono9033e312005-11-21 02:08:39 +0000841 }
Benny Prijono63ab3562006-07-08 19:46:43 +0000842 PJ_LOG(3,(THIS_FILE, "pending write operation!!"));
Benny Prijono9033e312005-11-21 02:08:39 +0000843 }
844
845 /*
846 * Check that address storage can hold the address parameter.
847 */
848 PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG);
849
850 /*
851 * Schedule asynchronous send.
852 */
Benny Prijono40212582006-06-22 18:41:28 +0000853 write_op = (struct write_operation*)op_key;
854
855 /* Spin if write_op has pending operation */
856 for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
857 pj_thread_sleep(0);
858
859 /* Last chance */
860 if (write_op->op) {
861 /* Unable to send packet because there is already pending write on the
862 * write_op. We could not put the operation into the write_op
863 * because write_op already contains a pending operation! And
864 * we could not send the packet directly with sendto() either,
865 * because that will break the order of the packet. So we can
866 * only return error here.
867 *
868 * This could happen for example in multithreads program,
869 * where polling is done by one thread, while other threads are doing
870 * the sending only. If the polling thread runs on lower priority
871 * than the sending thread, then it's possible that the pending
872 * write flag is not cleared in-time because clearing is only done
873 * during polling.
874 *
875 * Aplication should specify multiple write operation keys on
876 * situation like this.
877 */
878 //pj_assert(!"ioqueue: there is pending operation on this key!");
879 return PJ_EBUSY;
880 }
881
Benny Prijono9033e312005-11-21 02:08:39 +0000882 write_op->op = PJ_IOQUEUE_OP_SEND_TO;
883 write_op->buf = (void*)data;
884 write_op->size = *length;
885 write_op->written = 0;
886 write_op->flags = flags;
887 pj_memcpy(&write_op->rmt_addr, addr, addrlen);
888 write_op->rmt_addrlen = addrlen;
889
890 pj_mutex_lock(key->mutex);
891 pj_list_insert_before(&key->write_list, write_op);
Benny Prijono63ab3562006-07-08 19:46:43 +0000892 ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +0000893 pj_mutex_unlock(key->mutex);
894
895 return PJ_EPENDING;
896}
897
898#if PJ_HAS_TCP
899/*
900 * Initiate overlapped accept() operation.
901 */
/*
 * Initiate (possibly asynchronous) accept() on a listening key.
 *
 * Fast path: if no accept operations are already queued, try a
 * non-blocking accept() right away and return PJ_SUCCESS on success.
 * Otherwise the operation key is queued on key->accept_list and
 * PJ_EPENDING is returned; completion is reported later through
 * key->cb.on_accept_complete when the ioqueue signals readability.
 *
 * Parameters:
 *  key      - the listening socket's ioqueue key (required).
 *  op_key   - caller-owned operation key; must stay valid until the
 *             operation completes or is cancelled (required).
 *  new_sock - receives the accepted socket (required).
 *  local    - optional; receives the local address (needs addrlen too).
 *  remote   - optional; receives the peer address.
 *  addrlen  - in/out length for the address buffers.
 *
 * Returns PJ_SUCCESS (immediate), PJ_EPENDING (queued), PJ_ECANCELLED
 * (key is closing), or an error status.
 */
PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
			               pj_sock_t *new_sock,
			               pj_sockaddr_t *local,
			               pj_sockaddr_t *remote,
			               int *addrlen)
{
    struct accept_operation *accept_op;
    pj_status_t status;

    /* check parameters. All must be specified! */
    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);

    /* Check if key is closing. */
    if (key->closing)
	return PJ_ECANCELLED;

    /* The op_key is reinterpreted as an accept operation record;
     * op==0 marks it as not-pending until it is actually queued.
     */
    accept_op = (struct accept_operation*)op_key;
    accept_op->op = 0;

    /* Fast track:
     *  See if there's new connection available immediately.
     *
     * NOTE(review): the emptiness check is done without holding
     * key->mutex; presumably acceptable here to keep the fast path
     * cheap, but worth confirming against concurrent posters.
     */
    if (pj_list_empty(&key->accept_list)) {
	status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
	if (status == PJ_SUCCESS) {
	    /* Yes! New connection is available! */
	    if (local && addrlen) {
		status = pj_sock_getsockname(*new_sock, local, addrlen);
		if (status != PJ_SUCCESS) {
		    /* Could not query local address: undo the accept so
		     * the caller doesn't get a half-initialized socket.
		     */
		    pj_sock_close(*new_sock);
		    *new_sock = PJ_INVALID_SOCKET;
		    return status;
		}
	    }
	    return PJ_SUCCESS;
	} else {
	    /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
	     * the error to caller.
	     */
	    if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
		return status;
	    }
	}
    }

    /*
     * No connection is available immediately.
     * Schedule accept() operation to be completed when there is incoming
     * connection available.
     */
    accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
    accept_op->accept_fd = new_sock;
    accept_op->rmt_addr = remote;
    accept_op->addrlen= addrlen;
    accept_op->local_addr = local;

    /* Queue the operation and register interest in readability;
     * both must happen under key->mutex so the poller sees a
     * consistent list/event-set pair.
     */
    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->accept_list, accept_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
966
967/*
968 * Initiate overlapped connect() operation (well, it's non-blocking actually,
969 * since there's no overlapped version of connect()).
970 */
971PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
972 const pj_sockaddr_t *addr,
973 int addrlen )
974{
975 pj_status_t status;
976
977 /* check parameters. All must be specified! */
978 PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
979
Benny Prijono5accbd02006-03-30 16:32:18 +0000980 /* Check if key is closing. */
981 if (key->closing)
982 return PJ_ECANCELLED;
983
Benny Prijono9033e312005-11-21 02:08:39 +0000984 /* Check if socket has not been marked for connecting */
985 if (key->connecting != 0)
986 return PJ_EPENDING;
987
988 status = pj_sock_connect(key->fd, addr, addrlen);
989 if (status == PJ_SUCCESS) {
990 /* Connected! */
991 return PJ_SUCCESS;
992 } else {
993 if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
994 /* Pending! */
995 pj_mutex_lock(key->mutex);
996 key->connecting = PJ_TRUE;
Benny Prijono63ab3562006-07-08 19:46:43 +0000997 ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
998 ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +0000999 pj_mutex_unlock(key->mutex);
1000 return PJ_EPENDING;
1001 } else {
1002 /* Error! */
1003 return status;
1004 }
1005 }
1006}
1007#endif /* PJ_HAS_TCP */
1008
1009
/*
 * Initialize an operation key before first use by zeroing the whole
 * structure; in particular this clears the internal 'op' field so the
 * key is seen as not-pending. 'size' must be the full size of the
 * caller's pj_ioqueue_op_key_t.
 */
PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
				     pj_size_t size )
{
    pj_bzero(op_key, size);
}
1015
1016
1017/*
1018 * pj_ioqueue_is_pending()
1019 */
1020PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
1021 pj_ioqueue_op_key_t *op_key )
1022{
1023 struct generic_operation *op_rec;
1024
1025 PJ_UNUSED_ARG(key);
1026
1027 op_rec = (struct generic_operation*)op_key;
1028 return op_rec->op != 0;
1029}
1030
1031
1032/*
1033 * pj_ioqueue_post_completion()
1034 */
1035PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
1036 pj_ioqueue_op_key_t *op_key,
1037 pj_ssize_t bytes_status )
1038{
1039 struct generic_operation *op_rec;
1040
1041 /*
1042 * Find the operation key in all pending operation list to
1043 * really make sure that it's still there; then call the callback.
1044 */
Benny Prijono5accbd02006-03-30 16:32:18 +00001045 pj_mutex_lock(key->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +00001046
1047 /* Find the operation in the pending read list. */
1048 op_rec = (struct generic_operation*)key->read_list.next;
1049 while (op_rec != (void*)&key->read_list) {
1050 if (op_rec == (void*)op_key) {
1051 pj_list_erase(op_rec);
1052 op_rec->op = 0;
Benny Prijono5accbd02006-03-30 16:32:18 +00001053 pj_mutex_unlock(key->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +00001054
1055 (*key->cb.on_read_complete)(key, op_key, bytes_status);
1056 return PJ_SUCCESS;
1057 }
1058 op_rec = op_rec->next;
1059 }
1060
1061 /* Find the operation in the pending write list. */
1062 op_rec = (struct generic_operation*)key->write_list.next;
1063 while (op_rec != (void*)&key->write_list) {
1064 if (op_rec == (void*)op_key) {
1065 pj_list_erase(op_rec);
1066 op_rec->op = 0;
Benny Prijono5accbd02006-03-30 16:32:18 +00001067 pj_mutex_unlock(key->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +00001068
1069 (*key->cb.on_write_complete)(key, op_key, bytes_status);
1070 return PJ_SUCCESS;
1071 }
1072 op_rec = op_rec->next;
1073 }
1074
1075 /* Find the operation in the pending accept list. */
1076 op_rec = (struct generic_operation*)key->accept_list.next;
1077 while (op_rec != (void*)&key->accept_list) {
1078 if (op_rec == (void*)op_key) {
1079 pj_list_erase(op_rec);
1080 op_rec->op = 0;
Benny Prijono5accbd02006-03-30 16:32:18 +00001081 pj_mutex_unlock(key->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +00001082
1083 (*key->cb.on_accept_complete)(key, op_key,
1084 PJ_INVALID_SOCKET,
1085 bytes_status);
1086 return PJ_SUCCESS;
1087 }
1088 op_rec = op_rec->next;
1089 }
1090
Benny Prijono5accbd02006-03-30 16:32:18 +00001091 pj_mutex_unlock(key->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +00001092
1093 return PJ_EINVALIDOP;
1094}
1095