/* $Id$ */
/*
 * Copyright (C)2003-2007 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */

/*
 * ioqueue_common_abs.c
 *
 * This contains the common functionality to emulate the proactor pattern
 * with various event dispatching mechanisms (e.g. select, epoll).
 *
 * This file will be included by the appropriate ioqueue implementation.
 * This file is NOT supposed to be compiled as a stand-alone source.
 */

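/*
 * Illustrative sketch (an assumption about the backend layout, not taken
 * from this file): an implementation such as ioqueue_select.c is expected
 * to define the concrete pj_ioqueue_t/pj_ioqueue_key_t structures plus the
 * event-set helpers, and then textually include this file, e.g.:
 *
 *   #include "ioqueue_common_abs.h"
 *   ...  (struct definitions, ioqueue_add_to_set(),
 *         ioqueue_remove_from_set(), ...)
 *   #include "ioqueue_common_abs.c"
 */
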
#define PENDING_RETRY	2
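/* PENDING_RETRY bounds the short spin in pj_ioqueue_send() and
 * pj_ioqueue_sendto() below: the caller yields (pj_thread_sleep(0)) up to
 * this many times while waiting for a still-pending operation on the same
 * op_key to be cleared by the polling thread, before giving up with
 * PJ_EBUSY.
 */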

static void ioqueue_init( pj_ioqueue_t *ioqueue )
{
    ioqueue->lock = NULL;
    ioqueue->auto_delete_lock = 0;
}

static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
    if (ioqueue->auto_delete_lock && ioqueue->lock) {
        pj_lock_release(ioqueue->lock);
        return pj_lock_destroy(ioqueue->lock);
    } else
        return PJ_SUCCESS;
}

/*
 * pj_ioqueue_set_lock()
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
                                         pj_lock_t *lock,
                                         pj_bool_t auto_delete )
{
    PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);

    if (ioqueue->auto_delete_lock && ioqueue->lock) {
        pj_lock_destroy(ioqueue->lock);
    }

    ioqueue->lock = lock;
    ioqueue->auto_delete_lock = auto_delete;

    return PJ_SUCCESS;
}

static pj_status_t ioqueue_init_key( pj_pool_t *pool,
                                     pj_ioqueue_t *ioqueue,
                                     pj_ioqueue_key_t *key,
                                     pj_sock_t sock,
                                     void *user_data,
                                     const pj_ioqueue_callback *cb)
{
    pj_status_t rc;
    int optlen;

    PJ_UNUSED_ARG(pool);

    key->ioqueue = ioqueue;
    key->fd = sock;
    key->user_data = user_data;
    pj_list_init(&key->read_list);
    pj_list_init(&key->write_list);
#if PJ_HAS_TCP
    pj_list_init(&key->accept_list);
    key->connecting = 0;
#endif

    /* Save callback. */
    pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Set initial reference count to 1 */
    pj_assert(key->ref_count == 0);
    ++key->ref_count;

    key->closing = 0;
#endif

    /* Get socket type. When socket type is datagram, some optimization
     * will be performed during send to allow parallel send operations.
     */
    optlen = sizeof(key->fd_type);
    rc = pj_sock_getsockopt(sock, PJ_SOL_SOCKET, PJ_SO_TYPE,
                            &key->fd_type, &optlen);
    if (rc != PJ_SUCCESS) {
        /* Failure to query the socket type is not fatal; assume stream
         * socket and continue, instead of letting the stale error leak
         * into the return value when PJ_IOQUEUE_HAS_SAFE_UNREG is set.
         */
        key->fd_type = PJ_SOCK_STREAM;
        rc = PJ_SUCCESS;
    }

    /* Create mutex for the key. */
#if !PJ_IOQUEUE_HAS_SAFE_UNREG
    rc = pj_mutex_create_simple(pool, NULL, &key->mutex);
#endif

    return rc;
}

/*
 * pj_ioqueue_get_user_data()
 *
 * Obtain value associated with a key.
 */
PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
{
    PJ_ASSERT_RETURN(key != NULL, NULL);
    return key->user_data;
}

/*
 * pj_ioqueue_set_user_data()
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
                                              void *user_data,
                                              void **old_data)
{
    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    if (old_data)
        *old_data = key->user_data;
    key->user_data = user_data;

    return PJ_SUCCESS;
}

PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
{
    return !pj_list_empty(&key->write_list);
}

PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
{
    return !pj_list_empty(&key->read_list);
}

PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
{
#if PJ_HAS_TCP
    return !pj_list_empty(&key->accept_list);
#else
    return 0;
#endif
}

PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
{
    return key->connecting;
}

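/* IS_CLOSING() evaluates to nonzero while the key is being unregistered
 * but may still be referenced by other threads. The dispatchers and the
 * public functions below use it to skip event processing and to suppress
 * callbacks on such keys.
 */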
#if PJ_IOQUEUE_HAS_SAFE_UNREG
#   define IS_CLOSING(key)  (key->closing)
#else
#   define IS_CLOSING(key)  (0)
#endif


/*
 * ioqueue_dispatch_event()
 *
 * Report occurrence of an event in the key to be processed by the
 * framework.
 */
void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
{
    /* Lock the key. */
    pj_mutex_lock(h->mutex);

    if (IS_CLOSING(h)) {
        pj_mutex_unlock(h->mutex);
        return;
    }

#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
    if (h->connecting) {
        /* Completion of connect() operation */
        pj_ssize_t bytes_transfered;

        /* Clear operation. */
        h->connecting = 0;

        ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
        ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);

#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
        /* from connect(2):
         * On Linux, use getsockopt to read the SO_ERROR option at
         * level SOL_SOCKET to determine whether connect() completed
         * successfully (if SO_ERROR is zero).
         */
        {
            int value;
            int vallen = sizeof(value);
            int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
                                           &value, &vallen);
            if (gs_rc != 0) {
                /* Argh!! What to do now???
                 * Just indicate that the socket is connected. The
                 * application will get error as soon as it tries to use
                 * the socket to send/receive.
                 */
                bytes_transfered = 0;
            } else {
                bytes_transfered = value;
            }
        }
#elif defined(PJ_WIN32) && PJ_WIN32!=0
        bytes_transfered = 0; /* success */
#else
        /* Excellent information in D.J. Bernstein page:
         * http://cr.yp.to/docs/connect.html
         *
         * Seems like the most portable way of detecting connect()
         * failure is to call getpeername(). If socket is connected,
         * getpeername() will return 0. If the socket is not connected,
         * it will return ENOTCONN, and read(fd, &ch, 1) will produce
         * the right errno through error slippage. This is a combination
         * of suggestions from Douglas C. Schmidt and Ken Keys.
         */
        {
            int gp_rc;
            struct sockaddr_in addr;
            socklen_t addrlen = sizeof(addr);

            gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
            bytes_transfered = (gp_rc < 0) ? gp_rc : -gp_rc;
        }
#endif

        /* Unlock; from this point we don't need to hold key's mutex. */
        pj_mutex_unlock(h->mutex);

        /* Call callback. */
        if (h->cb.on_connect_complete && !IS_CLOSING(h))
            (*h->cb.on_connect_complete)(h, bytes_transfered);

        /* Done. */

    } else
#endif /* PJ_HAS_TCP */
    if (key_has_pending_write(h)) {
        /* Socket is writable. */
        struct write_operation *write_op;
        pj_ssize_t sent;
        pj_status_t send_rc;

        /* Get the first in the queue. */
        write_op = h->write_list.next;

        /* For datagrams, we can remove the write_op from the list
         * so that send() can work in parallel.
         */
        if (h->fd_type == PJ_SOCK_DGRAM) {
            pj_list_erase(write_op);

            if (pj_list_empty(&h->write_list))
                ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);

        }

        /* Send the data.
         * Unfortunately we must do this while holding key's mutex, thus
         * preventing parallel write on a single key.. :-((
         */
        sent = write_op->size - write_op->written;
        if (write_op->op == PJ_IOQUEUE_OP_SEND) {
            send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
                                   &sent, write_op->flags);
            /* Can't do this. We only clear "op" after we're finished sending
             * the whole buffer.
             */
            //write_op->op = 0;
        } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
            send_rc = pj_sock_sendto(h->fd,
                                     write_op->buf+write_op->written,
                                     &sent, write_op->flags,
                                     &write_op->rmt_addr,
                                     write_op->rmt_addrlen);
            /* Can't do this. We only clear "op" after we're finished sending
             * the whole buffer.
             */
            //write_op->op = 0;
        } else {
            pj_assert(!"Invalid operation type!");
            write_op->op = 0;
            send_rc = PJ_EBUG;
        }

        if (send_rc == PJ_SUCCESS) {
            write_op->written += sent;
        } else {
            pj_assert(send_rc > 0);
            write_op->written = -send_rc;
        }

        /* Are we finished with this buffer? */
        if (send_rc!=PJ_SUCCESS ||
            write_op->written == (pj_ssize_t)write_op->size ||
            h->fd_type == PJ_SOCK_DGRAM)
        {

            write_op->op = 0;

            if (h->fd_type != PJ_SOCK_DGRAM) {
                /* Write completion of the whole stream. */
                pj_list_erase(write_op);

                /* Clear operation if there's no more data to send. */
                if (pj_list_empty(&h->write_list))
                    ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);

            }

            /* No need to hold mutex anymore */
            pj_mutex_unlock(h->mutex);

            /* Call callback. */
            if (h->cb.on_write_complete && !IS_CLOSING(h)) {
                (*h->cb.on_write_complete)(h,
                                           (pj_ioqueue_op_key_t*)write_op,
                                           write_op->written);
            }

        } else {
            pj_mutex_unlock(h->mutex);
        }

        /* Done. */
    } else {
        /*
         * This is normal; execution may fall here when multiple threads
         * are signalled for the same event, but only one thread is
         * eventually able to process the event.
         */
        pj_mutex_unlock(h->mutex);
    }
}

void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
{
    pj_status_t rc;

    /* Lock the key. */
    pj_mutex_lock(h->mutex);

    if (IS_CLOSING(h)) {
        pj_mutex_unlock(h->mutex);
        return;
    }

#   if PJ_HAS_TCP
    if (!pj_list_empty(&h->accept_list)) {

        struct accept_operation *accept_op;

        /* Get one accept operation from the list. */
        accept_op = h->accept_list.next;
        pj_list_erase(accept_op);
        accept_op->op = 0;

        /* Clear bit in fdset if there is no more pending accept */
        if (pj_list_empty(&h->accept_list))
            ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);

        rc=pj_sock_accept(h->fd, accept_op->accept_fd,
                          accept_op->rmt_addr, accept_op->addrlen);
        if (rc==PJ_SUCCESS && accept_op->local_addr) {
            rc = pj_sock_getsockname(*accept_op->accept_fd,
                                     accept_op->local_addr,
                                     accept_op->addrlen);
        }

        /* Unlock; from this point we don't need to hold key's mutex. */
        pj_mutex_unlock(h->mutex);

        /* Call callback. */
        if (h->cb.on_accept_complete && !IS_CLOSING(h)) {
            (*h->cb.on_accept_complete)(h,
                                        (pj_ioqueue_op_key_t*)accept_op,
                                        *accept_op->accept_fd, rc);
        }

    }
    else
#   endif
    if (key_has_pending_read(h)) {
        struct read_operation *read_op;
        pj_ssize_t bytes_read;

        /* Get one pending read operation from the list. */
        read_op = h->read_list.next;
        pj_list_erase(read_op);

        /* Clear fdset if there is no pending read. */
        if (pj_list_empty(&h->read_list))
            ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);

        bytes_read = read_op->size;

        if (read_op->op == PJ_IOQUEUE_OP_RECV_FROM) {
            read_op->op = 0;
            rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read,
                                  read_op->flags,
                                  read_op->rmt_addr,
                                  read_op->rmt_addrlen);
        } else if (read_op->op == PJ_IOQUEUE_OP_RECV) {
            read_op->op = 0;
            rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read,
                              read_op->flags);
        } else {
            pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
            read_op->op = 0;
            /*
             * User has specified pj_ioqueue_read().
             * On Win32, we should do ReadFile(). But because we got
             * here because of select() anyway, user must have put a
             * socket descriptor on h->fd, in which case we can
             * just call pj_sock_recv() instead of ReadFile().
             * On Unix, user may put a file in h->fd, so we'll have
             * to call read() here.
             * This may not compile on systems that don't have
             * read(); that's why we only allow it on platforms with
             * unistd.h below, so that the error is easier to catch.
             */
#           if defined(PJ_WIN32) && PJ_WIN32 != 0 || \
               defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE != 0
                rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read,
                                  read_op->flags);
                //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
                //              &bytes_read, NULL);
#           elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
                bytes_read = read(h->fd, read_op->buf, bytes_read);
                rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
#           elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
                bytes_read = sys_read(h->fd, read_op->buf, bytes_read);
                rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
#           else
#               error "Implement read() for this platform!"
#           endif
        }

        if (rc != PJ_SUCCESS) {
#           if defined(PJ_WIN32) && PJ_WIN32 != 0
            /* On Win32, for UDP, WSAECONNRESET on the receive side
             * indicates that previous sending has triggered ICMP Port
             * Unreachable message.
             * But we wouldn't know at this point which one of the previous
             * sends has triggered the error, since the UDP socket can
             * be shared!
             * So we'll just ignore it!
             */

            if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
                //PJ_LOG(4,(THIS_FILE,
                //          "Ignored ICMP port unreach. on key=%p", h));
            }
#           endif

            /* In any case we would report this to caller. */
            bytes_read = -rc;
        }

        /* Unlock; from this point we don't need to hold key's mutex. */
        pj_mutex_unlock(h->mutex);

        /* Call callback. */
        if (h->cb.on_read_complete && !IS_CLOSING(h)) {
            (*h->cb.on_read_complete)(h,
                                      (pj_ioqueue_op_key_t*)read_op,
                                      bytes_read);
        }

    } else {
        /*
         * This is normal; execution may fall here when multiple threads
         * are signalled for the same event, but only one thread is
         * eventually able to process the event.
         */
        pj_mutex_unlock(h->mutex);
    }
}


void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
                                       pj_ioqueue_key_t *h )
{
    pj_mutex_lock(h->mutex);

    if (!h->connecting) {
        /* It is possible that more than one thread was woken up, thus
         * the remaining thread will see h->connecting as zero because
         * it has been processed by another thread.
         */
        pj_mutex_unlock(h->mutex);
        return;
    }

    if (IS_CLOSING(h)) {
        pj_mutex_unlock(h->mutex);
        return;
    }

    /* Clear operation. */
    h->connecting = 0;

    ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
    ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);

    pj_mutex_unlock(h->mutex);

    /* Call callback. */
    if (h->cb.on_connect_complete && !IS_CLOSING(h)) {
        pj_status_t status = -1;
#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
        int value;
        int vallen = sizeof(value);
        int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
                                       &value, &vallen);
        if (gs_rc == 0) {
            status = PJ_RETURN_OS_ERROR(value);
        }
#endif

        (*h->cb.on_connect_complete)(h, status);
    }
}

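/*
 * Illustrative sketch (assumed backend code, not part of this file): after
 * the demultiplexer (e.g. select()) reports activity, the backend's
 * pj_ioqueue_poll() is expected to translate each ready descriptor into
 * calls to the dispatchers above, along the lines of:
 *
 *   if (PJ_FD_ISSET(key->fd, &rfdset))
 *       ioqueue_dispatch_read_event(ioqueue, key);
 *   if (PJ_FD_ISSET(key->fd, &wfdset))
 *       ioqueue_dispatch_write_event(ioqueue, key);
 *   if (PJ_FD_ISSET(key->fd, &xfdset))
 *       ioqueue_dispatch_exception_event(ioqueue, key);
 */
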
/*
 * pj_ioqueue_recv()
 *
 * Start asynchronous recv() from the socket.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
                                     pj_ioqueue_op_key_t *op_key,
                                     void *buffer,
                                     pj_ssize_t *length,
                                     unsigned flags )
{
    struct read_operation *read_op;

    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    read_op = (struct read_operation*)op_key;
    read_op->op = 0;

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    /* Try to see if there's data immediately available.
     */
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
        pj_status_t status;
        pj_ssize_t size;

        size = *length;
        status = pj_sock_recv(key->fd, buffer, &size, flags);
        if (status == PJ_SUCCESS) {
            /* Yes! Data is available! */
            *length = size;
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
                return status;
        }
    }

    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /*
     * No data is immediately available.
     * Must schedule asynchronous operation to the ioqueue.
     */
    read_op->op = PJ_IOQUEUE_OP_RECV;
    read_op->buf = buffer;
    read_op->size = *length;
    read_op->flags = flags;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->read_list, read_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
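
/*
 * Illustrative usage sketch (assumed application code): callers must handle
 * both completion paths of pj_ioqueue_recv():
 *
 *   pj_ssize_t len = sizeof(buf);
 *   pj_status_t status = pj_ioqueue_recv(key, &op_key, buf, &len, 0);
 *   if (status == PJ_SUCCESS)
 *       ...  (data is already in buf; on_read_complete() will NOT fire)
 *   else if (status == PJ_EPENDING)
 *       ...  (on_read_complete() will be called once data arrives)
 *   else
 *       ...  (immediate error)
 */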

/*
 * pj_ioqueue_recvfrom()
 *
 * Start asynchronous recvfrom() from the socket.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key,
                                         void *buffer,
                                         pj_ssize_t *length,
                                         unsigned flags,
                                         pj_sockaddr_t *addr,
                                         int *addrlen)
{
    struct read_operation *read_op;

    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    read_op = (struct read_operation*)op_key;
    read_op->op = 0;

    /* Try to see if there's data immediately available.
     */
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
        pj_status_t status;
        pj_ssize_t size;

        size = *length;
        status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
                                  addr, addrlen);
        if (status == PJ_SUCCESS) {
            /* Yes! Data is available! */
            *length = size;
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
                return status;
        }
    }

    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /*
     * No data is immediately available.
     * Must schedule asynchronous operation to the ioqueue.
     */
    read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
    read_op->buf = buffer;
    read_op->size = *length;
    read_op->flags = flags;
    read_op->rmt_addr = addr;
    read_op->rmt_addrlen = addrlen;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->read_list, read_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}

/*
 * pj_ioqueue_send()
 *
 * Start asynchronous send() to the descriptor.
 */
PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
                                     pj_ioqueue_op_key_t *op_key,
                                     const void *data,
                                     pj_ssize_t *length,
                                     unsigned flags)
{
    struct write_operation *write_op;
    pj_status_t status;
    unsigned retry;
    pj_ssize_t sent;

    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write. */
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /* Fast track:
     *   Try to send data immediately, only if there's no pending write!
     * Note:
     *   We are speculating that the list is empty here without properly
     *   acquiring ioqueue's mutex first. This is intentional, to maximize
     *   performance via parallelism.
     *
     * This should be safe, because:
     *  - by convention, we require the caller to make sure that the
     *    key is not unregistered while other threads are invoking
     *    an operation on the same key.
     *  - pj_list_empty() is safe to be invoked by multiple threads,
     *    even when other threads are modifying the list.
     */
    if (pj_list_empty(&key->write_list)) {
        /*
         * See if data can be sent immediately.
         */
        sent = *length;
        status = pj_sock_send(key->fd, data, &sent, flags);
        if (status == PJ_SUCCESS) {
            /* Success! */
            *length = sent;
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * Schedule asynchronous send.
     */
    write_op = (struct write_operation*)op_key;

    /* Spin if write_op has pending operation */
    for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
        pj_thread_sleep(0);

    /* Last chance */
    if (write_op->op) {
        /* Unable to send packet because there is already pending write in
         * the write_op. We could not put the operation into the write_op
         * because write_op already contains a pending operation! And
         * we could not send the packet directly with send() either,
         * because that would break the ordering of the packets. So we can
         * only return error here.
         *
         * This could happen for example in a multithreaded program,
         * where polling is done by one thread while other threads do
         * the sending only. If the polling thread runs at a lower priority
         * than the sending threads, then it's possible that the pending
         * write flag is not cleared in time, because clearing it is only
         * done during polling.
         *
         * Application should specify multiple write operation keys in
         * situations like this.
         */
        //pj_assert(!"ioqueue: there is pending operation on this key!");
        return PJ_EBUSY;
    }

    write_op->op = PJ_IOQUEUE_OP_SEND;
    write_op->buf = (void*)data;
    write_op->size = *length;
    write_op->written = 0;
    write_op->flags = flags;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->write_list, write_op);
    ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
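
/*
 * Illustrative sketch (assumed application code): to keep several sends
 * outstanding on one key, as the comment above suggests, the application
 * keeps one op_key per in-flight operation:
 *
 *   pj_ioqueue_op_key_t op_keys[4];
 *   ...
 *   pj_ioqueue_op_key_init(&op_keys[i], sizeof(op_keys[i]));
 *   status = pj_ioqueue_send(key, &op_keys[i], pkt, &len, 0);
 *   if (status == PJ_EBUSY)
 *       ...  (this op_key still has a pending send; try another one)
 */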


/*
 * pj_ioqueue_sendto()
 *
 * Start asynchronous write() to the descriptor.
 */
PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
                                       const void *data,
                                       pj_ssize_t *length,
                                       pj_uint32_t flags,
                                       const pj_sockaddr_t *addr,
                                       int addrlen)
{
    struct write_operation *write_op;
    unsigned retry;
    pj_status_t status;
    pj_ssize_t sent;

    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write */
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /* Fast track:
     *   Try to send data immediately, only if there's no pending write!
     * Note:
     *   We are speculating that the list is empty here without properly
     *   acquiring ioqueue's mutex first. This is intentional, to maximize
     *   performance via parallelism.
     *
     * This should be safe, because:
     *  - by convention, we require the caller to make sure that the
     *    key is not unregistered while other threads are invoking
     *    an operation on the same key.
     *  - pj_list_empty() is safe to be invoked by multiple threads,
     *    even when other threads are modifying the list.
     */
    if (pj_list_empty(&key->write_list)) {
        /*
         * See if data can be sent immediately.
         */
        sent = *length;
        status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
        if (status == PJ_SUCCESS) {
            /* Success! */
            *length = sent;
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * Check that address storage can hold the address parameter.
     */
    PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG);

    /*
     * Schedule asynchronous send.
     */
    write_op = (struct write_operation*)op_key;

    /* Spin if write_op has pending operation */
    for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
        pj_thread_sleep(0);

    /* Last chance */
    if (write_op->op) {
        /* Unable to send packet because there is already pending write on
         * the write_op. We could not put the operation into the write_op
         * because write_op already contains a pending operation! And
         * we could not send the packet directly with sendto() either,
         * because that would break the ordering of the packets. So we can
         * only return error here.
         *
         * This could happen for example in a multithreaded program,
         * where polling is done by one thread while other threads do
         * the sending only. If the polling thread runs at a lower priority
         * than the sending threads, then it's possible that the pending
         * write flag is not cleared in time, because clearing it is only
         * done during polling.
         *
         * Application should specify multiple write operation keys in
         * situations like this.
         */
        //pj_assert(!"ioqueue: there is pending operation on this key!");
        return PJ_EBUSY;
    }

    write_op->op = PJ_IOQUEUE_OP_SEND_TO;
    write_op->buf = (void*)data;
    write_op->size = *length;
    write_op->written = 0;
    write_op->flags = flags;
    pj_memcpy(&write_op->rmt_addr, addr, addrlen);
    write_op->rmt_addrlen = addrlen;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->write_list, write_op);
    ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}

#if PJ_HAS_TCP
/*
 * Initiate overlapped accept() operation.
 */
PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
                                       pj_sock_t *new_sock,
                                       pj_sockaddr_t *local,
                                       pj_sockaddr_t *remote,
                                       int *addrlen)
{
    struct accept_operation *accept_op;
    pj_status_t status;

    /* check parameters. All must be specified! */
    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    accept_op = (struct accept_operation*)op_key;
    accept_op->op = 0;

    /* Fast track:
     *  See if there's new connection available immediately.
     */
    if (pj_list_empty(&key->accept_list)) {
        status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
        if (status == PJ_SUCCESS) {
            /* Yes! New connection is available! */
            if (local && addrlen) {
                status = pj_sock_getsockname(*new_sock, local, addrlen);
                if (status != PJ_SUCCESS) {
                    pj_sock_close(*new_sock);
                    *new_sock = PJ_INVALID_SOCKET;
                    return status;
                }
            }
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * No connection is available immediately.
     * Schedule accept() operation to be completed when there is incoming
     * connection available.
     */
    accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
    accept_op->accept_fd = new_sock;
    accept_op->rmt_addr = remote;
    accept_op->addrlen = addrlen;
    accept_op->local_addr = local;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->accept_list, accept_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
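
/*
 * Illustrative usage sketch (assumed application code): with PJ_EPENDING,
 * the socket pointed to by new_sock becomes valid only when
 * on_accept_complete() fires:
 *
 *   static pj_sock_t newsock;
 *   status = pj_ioqueue_accept(key, &op_key, &newsock, NULL,
 *                              &rmt_addr, &addrlen);
 *   if (status == PJ_SUCCESS)
 *       ...  (newsock is connected now)
 *   else if (status == PJ_EPENDING)
 *       ...  (on_accept_complete(key, &op_key, newsock, status) is
 *             called when an incoming connection arrives)
 */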
968
969/*
970 * Initiate overlapped connect() operation (well, it's non-blocking actually,
971 * since there's no overlapped version of connect()).
972 */
973PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
974 const pj_sockaddr_t *addr,
975 int addrlen )
976{
977 pj_status_t status;
978
979 /* check parameters. All must be specified! */
980 PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
981
Benny Prijono5accbd02006-03-30 16:32:18 +0000982 /* Check if key is closing. */
Benny Prijono3059eb62006-10-04 20:46:27 +0000983 if (IS_CLOSING(key))
Benny Prijono5accbd02006-03-30 16:32:18 +0000984 return PJ_ECANCELLED;
985
Benny Prijono9033e312005-11-21 02:08:39 +0000986 /* Check if socket has not been marked for connecting */
987 if (key->connecting != 0)
988 return PJ_EPENDING;
989
990 status = pj_sock_connect(key->fd, addr, addrlen);
991 if (status == PJ_SUCCESS) {
992 /* Connected! */
993 return PJ_SUCCESS;
994 } else {
995 if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
996 /* Pending! */
997 pj_mutex_lock(key->mutex);
998 key->connecting = PJ_TRUE;
Benny Prijono63ab3562006-07-08 19:46:43 +0000999 ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
1000 ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +00001001 pj_mutex_unlock(key->mutex);
1002 return PJ_EPENDING;
1003 } else {
1004 /* Error! */
1005 return status;
1006 }
1007 }
1008}
1009#endif /* PJ_HAS_TCP */
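
/*
 * Illustrative usage sketch (assumed application code): connect completion
 * is reported through the on_connect_complete() callback registered with
 * the key:
 *
 *   status = pj_ioqueue_connect(key, &addr, sizeof(addr));
 *   if (status == PJ_SUCCESS)
 *       ...  (connected immediately; no callback will be invoked)
 *   else if (status == PJ_EPENDING)
 *       ...  (on_connect_complete(key, status) is invoked on completion)
 */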


PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
                                     pj_size_t size )
{
    pj_bzero(op_key, size);
}


/*
 * pj_ioqueue_is_pending()
 */
PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key )
{
    struct generic_operation *op_rec;

    PJ_UNUSED_ARG(key);

    op_rec = (struct generic_operation*)op_key;
    return op_rec->op != 0;
}


/*
 * pj_ioqueue_post_completion()
 */
PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
                                                pj_ioqueue_op_key_t *op_key,
                                                pj_ssize_t bytes_status )
{
    struct generic_operation *op_rec;

    /*
     * Find the operation key in all of the pending operation lists to
     * really make sure that it's still there; then call the callback.
     */
    pj_mutex_lock(key->mutex);

    /* Find the operation in the pending read list. */
    op_rec = (struct generic_operation*)key->read_list.next;
    while (op_rec != (void*)&key->read_list) {
        if (op_rec == (void*)op_key) {
            pj_list_erase(op_rec);
            op_rec->op = 0;
            pj_mutex_unlock(key->mutex);

            (*key->cb.on_read_complete)(key, op_key, bytes_status);
            return PJ_SUCCESS;
        }
        op_rec = op_rec->next;
    }

    /* Find the operation in the pending write list. */
    op_rec = (struct generic_operation*)key->write_list.next;
    while (op_rec != (void*)&key->write_list) {
        if (op_rec == (void*)op_key) {
            pj_list_erase(op_rec);
            op_rec->op = 0;
            pj_mutex_unlock(key->mutex);

            (*key->cb.on_write_complete)(key, op_key, bytes_status);
            return PJ_SUCCESS;
        }
        op_rec = op_rec->next;
    }

    /* Find the operation in the pending accept list. */
    op_rec = (struct generic_operation*)key->accept_list.next;
    while (op_rec != (void*)&key->accept_list) {
        if (op_rec == (void*)op_key) {
            pj_list_erase(op_rec);
            op_rec->op = 0;
            pj_mutex_unlock(key->mutex);

            (*key->cb.on_accept_complete)(key, op_key,
                                          PJ_INVALID_SOCKET,
                                          bytes_status);
            return PJ_SUCCESS;
        }
        op_rec = op_rec->next;
    }

    pj_mutex_unlock(key->mutex);

    return PJ_EINVALIDOP;
}