blob: 07ed838c820a7db72c33fa0b0d61681e3b20b800 [file] [log] [blame]
Benny Prijono9033e312005-11-21 02:08:39 +00001/* $Id$ */
2/*
Benny Prijono844653c2008-12-23 17:27:53 +00003 * Copyright (C) 2008-2009 Teluu Inc. (http://www.teluu.com)
4 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
Benny Prijono9033e312005-11-21 02:08:39 +00005 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 */
20
21/*
22 * ioqueue_common_abs.c
23 *
24 * This contains common functionalities to emulate proactor pattern with
25 * various event dispatching mechanisms (e.g. select, epoll).
26 *
27 * This file will be included by the appropriate ioqueue implementation.
28 * This file is NOT supposed to be compiled as stand-alone source.
29 */
30
Benny Prijono40212582006-06-22 18:41:28 +000031#define PENDING_RETRY 2
32
Benny Prijono9033e312005-11-21 02:08:39 +000033static void ioqueue_init( pj_ioqueue_t *ioqueue )
34{
35 ioqueue->lock = NULL;
36 ioqueue->auto_delete_lock = 0;
Benny Prijono8eb763c2008-07-12 10:09:39 +000037 ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY;
Benny Prijono9033e312005-11-21 02:08:39 +000038}
39
40static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
41{
42 if (ioqueue->auto_delete_lock && ioqueue->lock ) {
43 pj_lock_release(ioqueue->lock);
44 return pj_lock_destroy(ioqueue->lock);
Nanang Izzuddin838cb322008-12-18 17:52:57 +000045 }
46
47 return PJ_SUCCESS;
Benny Prijono9033e312005-11-21 02:08:39 +000048}
49
50/*
51 * pj_ioqueue_set_lock()
52 */
53PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
54 pj_lock_t *lock,
55 pj_bool_t auto_delete )
56{
57 PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
58
59 if (ioqueue->auto_delete_lock && ioqueue->lock) {
60 pj_lock_destroy(ioqueue->lock);
61 }
62
63 ioqueue->lock = lock;
64 ioqueue->auto_delete_lock = auto_delete;
65
66 return PJ_SUCCESS;
67}
68
69static pj_status_t ioqueue_init_key( pj_pool_t *pool,
70 pj_ioqueue_t *ioqueue,
71 pj_ioqueue_key_t *key,
72 pj_sock_t sock,
73 void *user_data,
74 const pj_ioqueue_callback *cb)
75{
76 pj_status_t rc;
77 int optlen;
78
Benny Prijono8befd9f2006-05-13 22:46:23 +000079 PJ_UNUSED_ARG(pool);
80
Benny Prijono9033e312005-11-21 02:08:39 +000081 key->ioqueue = ioqueue;
82 key->fd = sock;
83 key->user_data = user_data;
84 pj_list_init(&key->read_list);
85 pj_list_init(&key->write_list);
86#if PJ_HAS_TCP
87 pj_list_init(&key->accept_list);
Benny Prijono5accbd02006-03-30 16:32:18 +000088 key->connecting = 0;
Benny Prijono9033e312005-11-21 02:08:39 +000089#endif
90
91 /* Save callback. */
92 pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
93
Benny Prijono5accbd02006-03-30 16:32:18 +000094#if PJ_IOQUEUE_HAS_SAFE_UNREG
95 /* Set initial reference count to 1 */
96 pj_assert(key->ref_count == 0);
97 ++key->ref_count;
98
99 key->closing = 0;
100#endif
101
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000102 rc = pj_ioqueue_set_concurrency(key, ioqueue->default_concurrency);
103 if (rc != PJ_SUCCESS)
104 return rc;
105
Benny Prijono9033e312005-11-21 02:08:39 +0000106 /* Get socket type. When socket type is datagram, some optimization
107 * will be performed during send to allow parallel send operations.
108 */
109 optlen = sizeof(key->fd_type);
Benny Prijono8ab968f2007-07-20 08:08:30 +0000110 rc = pj_sock_getsockopt(sock, pj_SOL_SOCKET(), pj_SO_TYPE(),
Benny Prijono9033e312005-11-21 02:08:39 +0000111 &key->fd_type, &optlen);
112 if (rc != PJ_SUCCESS)
Benny Prijono8ab968f2007-07-20 08:08:30 +0000113 key->fd_type = pj_SOCK_STREAM();
Benny Prijono9033e312005-11-21 02:08:39 +0000114
115 /* Create mutex for the key. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000116#if !PJ_IOQUEUE_HAS_SAFE_UNREG
117 rc = pj_mutex_create_simple(pool, NULL, &key->mutex);
118#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000119
120 return rc;
121}
122
Benny Prijono9033e312005-11-21 02:08:39 +0000123/*
124 * pj_ioqueue_get_user_data()
125 *
126 * Obtain value associated with a key.
127 */
128PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
129{
130 PJ_ASSERT_RETURN(key != NULL, NULL);
131 return key->user_data;
132}
133
134/*
135 * pj_ioqueue_set_user_data()
136 */
137PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
138 void *user_data,
139 void **old_data)
140{
141 PJ_ASSERT_RETURN(key, PJ_EINVAL);
142
143 if (old_data)
144 *old_data = key->user_data;
145 key->user_data = user_data;
146
147 return PJ_SUCCESS;
148}
149
150PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
151{
152 return !pj_list_empty(&key->write_list);
153}
154
155PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
156{
157 return !pj_list_empty(&key->read_list);
158}
159
160PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
161{
162#if PJ_HAS_TCP
163 return !pj_list_empty(&key->accept_list);
164#else
Benny Prijono3569c0d2007-04-06 10:29:20 +0000165 PJ_UNUSED_ARG(key);
Benny Prijono9033e312005-11-21 02:08:39 +0000166 return 0;
167#endif
168}
169
/* Nonzero while an asynchronous connect() is still outstanding on the key.
 * (The connecting flag is only maintained under PJ_HAS_TCP; see
 * ioqueue_init_key() and the dispatch functions.)
 */
PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
{
    return key->connecting;
}
174
175
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* With safe unregistration enabled, a key may linger after
 * pj_ioqueue_unregister(); dispatchers and pending-operation starters
 * must treat such keys as gone.
 */
#   define IS_CLOSING(key)  (key->closing)
#else
/* Without safe unregistration, a key is never observed in closing state. */
#   define IS_CLOSING(key)  (0)
#endif
181
182
/*
 * ioqueue_dispatch_event()
 *
 * Report occurence of an event in the key to be processed by the
 * framework.
 *
 * A writable event means either (a) an asynchronous connect() has
 * completed (TCP only), or (b) a pending send/sendto from the key's
 * write_list can make progress. The key's mutex is held while the
 * socket operation is performed; whether it is still held during the
 * callback depends on the key's allow_concurrent setting (tracked in
 * the has_lock flag below).
 */
void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
{
    /* Lock the key. */
    pj_mutex_lock(h->mutex);

    /* A key being unregistered must not be dispatched. */
    if (IS_CLOSING(h)) {
	pj_mutex_unlock(h->mutex);
	return;
    }

#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
    if (h->connecting) {
	/* Completion of connect() operation */
	pj_status_t status;
	pj_bool_t has_lock;

	/* Clear operation. */
	h->connecting = 0;

	/* Connect is edge-like: stop watching both write and exception. */
	ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
	ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);


#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
	/* from connect(2):
	 * On Linux, use getsockopt to read the SO_ERROR option at
	 * level SOL_SOCKET to determine whether connect() completed
	 * successfully (if SO_ERROR is zero).
	 */
	{
	    int value;
	    int vallen = sizeof(value);
	    int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
					   &value, &vallen);
	    if (gs_rc != 0) {
		/* Argh!! What to do now???
		 * Just indicate that the socket is connected. The
		 * application will get error as soon as it tries to use
		 * the socket to send/receive.
		 */
		status = PJ_SUCCESS;
	    } else {
		status = PJ_STATUS_FROM_OS(value);
	    }
	}
#elif defined(PJ_WIN32) && PJ_WIN32!=0
	status = PJ_SUCCESS; /* success */
#else
	/* Excellent information in D.J. Bernstein page:
	 * http://cr.yp.to/docs/connect.html
	 *
	 * Seems like the most portable way of detecting connect()
	 * failure is to call getpeername(). If socket is connected,
	 * getpeername() will return 0. If the socket is not connected,
	 * it will return ENOTCONN, and read(fd, &ch, 1) will produce
	 * the right errno through error slippage. This is a combination
	 * of suggestions from Douglas C. Schmidt and Ken Keys.
	 */
	{
	    struct sockaddr_in addr;
	    int addrlen = sizeof(addr);

	    status = pj_sock_getpeername(h->fd, (struct sockaddr*)&addr,
				         &addrlen);
	}
#endif

        /* Unlock; from this point we don't need to hold key's mutex
	 * (unless concurrency is disabled, which in this case we should
	 * hold the mutex while calling the callback) */
	if (h->allow_concurrent) {
	    /* concurrency may be changed while we're in the callback, so
	     * save it to a flag.
	     */
	    has_lock = PJ_FALSE;
	    pj_mutex_unlock(h->mutex);
	} else {
	    has_lock = PJ_TRUE;
	}

	/* Call callback. */
        if (h->cb.on_connect_complete && !IS_CLOSING(h))
	    (*h->cb.on_connect_complete)(h, status);

	/* Unlock if we still hold the lock */
	if (has_lock) {
	    pj_mutex_unlock(h->mutex);
	}

        /* Done. */

    } else
#endif /* PJ_HAS_TCP */
    if (key_has_pending_write(h)) {
	/* Socket is writable. */
        struct write_operation *write_op;
        pj_ssize_t sent;
        pj_status_t send_rc;

        /* Get the first in the queue. */
        write_op = h->write_list.next;

        /* For datagrams, we can remove the write_op from the list
         * so that send() can work in parallel.
         */
        if (h->fd_type == pj_SOCK_DGRAM()) {
            pj_list_erase(write_op);

            if (pj_list_empty(&h->write_list))
                ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);

        }

        /* Send the data.
         * Unfortunately we must do this while holding key's mutex, thus
         * preventing parallel write on a single key.. :-((
         */
        sent = write_op->size - write_op->written;
        if (write_op->op == PJ_IOQUEUE_OP_SEND) {
            send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
                                   &sent, write_op->flags);
	    /* Can't do this. We only clear "op" after we're finished sending
	     * the whole buffer.
	     */
	    //write_op->op = 0;
        } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
            send_rc = pj_sock_sendto(h->fd,
                                     write_op->buf+write_op->written,
                                     &sent, write_op->flags,
                                     &write_op->rmt_addr,
                                     write_op->rmt_addrlen);
	    /* Can't do this. We only clear "op" after we're finished sending
	     * the whole buffer.
	     */
	    //write_op->op = 0;
        } else {
            pj_assert(!"Invalid operation type!");
	    write_op->op = PJ_IOQUEUE_OP_NONE;
            send_rc = PJ_EBUG;
        }

        if (send_rc == PJ_SUCCESS) {
            write_op->written += sent;
        } else {
            /* pj_status_t error codes are positive; the negated value is
             * propagated to the callback through "written" below. */
            pj_assert(send_rc > 0);
            write_op->written = -send_rc;
        }

        /* Are we finished with this buffer? */
        if (send_rc!=PJ_SUCCESS ||
            write_op->written == (pj_ssize_t)write_op->size ||
            h->fd_type == pj_SOCK_DGRAM())
        {
	    pj_bool_t has_lock;

	    write_op->op = PJ_IOQUEUE_OP_NONE;

            if (h->fd_type != pj_SOCK_DGRAM()) {
                /* Write completion of the whole stream. */
                pj_list_erase(write_op);

                /* Clear operation if there's no more data to send. */
                if (pj_list_empty(&h->write_list))
                    ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);

            }

	    /* Unlock; from this point we don't need to hold key's mutex
	     * (unless concurrency is disabled, which in this case we should
	     * hold the mutex while calling the callback) */
	    if (h->allow_concurrent) {
		/* concurrency may be changed while we're in the callback, so
		 * save it to a flag.
		 */
		has_lock = PJ_FALSE;
		pj_mutex_unlock(h->mutex);
	    } else {
		has_lock = PJ_TRUE;
	    }

	    /* Call callback. */
            if (h->cb.on_write_complete && !IS_CLOSING(h)) {
	        (*h->cb.on_write_complete)(h,
                                           (pj_ioqueue_op_key_t*)write_op,
                                           write_op->written);
            }

	    if (has_lock) {
		pj_mutex_unlock(h->mutex);
	    }

        } else {
            /* Partial stream write; keep the op queued and the mutex
             * balanced. */
            pj_mutex_unlock(h->mutex);
        }

        /* Done. */
    } else {
        /*
         * This is normal; execution may fall here when multiple threads
         * are signalled for the same event, but only one thread eventually
         * able to process the event.
         */
	pj_mutex_unlock(h->mutex);
    }
}
394
/*
 * Dispatch a readable event on the key: complete one pending accept()
 * (TCP only) or one pending recv/recvfrom/read operation, then invoke
 * the corresponding callback. Mirrors the locking discipline of
 * ioqueue_dispatch_write_event(): the key's mutex is held across the
 * socket call, and across the callback too when concurrency is
 * disabled (tracked via has_lock).
 */
void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
{
    pj_status_t rc;

    /* Lock the key. */
    pj_mutex_lock(h->mutex);

    /* A key being unregistered must not be dispatched. */
    if (IS_CLOSING(h)) {
	pj_mutex_unlock(h->mutex);
	return;
    }

#   if PJ_HAS_TCP
    if (!pj_list_empty(&h->accept_list)) {

        struct accept_operation *accept_op;
	pj_bool_t has_lock;

        /* Get one accept operation from the list. */
	accept_op = h->accept_list.next;
        pj_list_erase(accept_op);
	accept_op->op = PJ_IOQUEUE_OP_NONE;

	/* Clear bit in fdset if there is no more pending accept */
        if (pj_list_empty(&h->accept_list))
            ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);

	rc=pj_sock_accept(h->fd, accept_op->accept_fd,
                          accept_op->rmt_addr, accept_op->addrlen);
	if (rc==PJ_SUCCESS && accept_op->local_addr) {
	    rc = pj_sock_getsockname(*accept_op->accept_fd,
				     accept_op->local_addr,
				     accept_op->addrlen);
	}

	/* Unlock; from this point we don't need to hold key's mutex
	 * (unless concurrency is disabled, which in this case we should
	 * hold the mutex while calling the callback) */
	if (h->allow_concurrent) {
	    /* concurrency may be changed while we're in the callback, so
	     * save it to a flag.
	     */
	    has_lock = PJ_FALSE;
	    pj_mutex_unlock(h->mutex);
	} else {
	    has_lock = PJ_TRUE;
	}

	/* Call callback. */
        if (h->cb.on_accept_complete && !IS_CLOSING(h)) {
	    (*h->cb.on_accept_complete)(h,
                                        (pj_ioqueue_op_key_t*)accept_op,
                                        *accept_op->accept_fd, rc);
	}

	if (has_lock) {
	    pj_mutex_unlock(h->mutex);
	}
    }
    else
#   endif
    if (key_has_pending_read(h)) {
        struct read_operation *read_op;
        pj_ssize_t bytes_read;
	pj_bool_t has_lock;

        /* Get one pending read operation from the list. */
        read_op = h->read_list.next;
        pj_list_erase(read_op);

        /* Clear fdset if there is no pending read. */
        if (pj_list_empty(&h->read_list))
            ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);

        bytes_read = read_op->size;

	if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {
	    read_op->op = PJ_IOQUEUE_OP_NONE;
	    rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read,
				  read_op->flags,
				  read_op->rmt_addr,
                                  read_op->rmt_addrlen);
	} else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) {
	    read_op->op = PJ_IOQUEUE_OP_NONE;
	    rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read,
			      read_op->flags);
        } else {
            pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
	    read_op->op = PJ_IOQUEUE_OP_NONE;
            /*
             * User has specified pj_ioqueue_read().
             * On Win32, we should do ReadFile(). But because we got
             * here because of select() anyway, user must have put a
             * socket descriptor on h->fd, which in this case we can
             * just call pj_sock_recv() instead of ReadFile().
             * On Unix, user may put a file in h->fd, so we'll have
             * to call read() here.
             * This may not compile on systems which doesn't have
             * read(). That's why we only specify PJ_LINUX here so
             * that error is easier to catch.
             */
#	    if defined(PJ_WIN32) && PJ_WIN32 != 0 || \
	       defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE != 0
	        rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read,
				  read_op->flags);
                //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
                //              &bytes_read, NULL);
#           elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
                bytes_read = read(h->fd, read_op->buf, bytes_read);
                rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
#	    elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
                bytes_read = sys_read(h->fd, read_op->buf, bytes_read);
                rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
#           else
#               error "Implement read() for this platform!"
#           endif
        }

	if (rc != PJ_SUCCESS) {
#	    if defined(PJ_WIN32) && PJ_WIN32 != 0
	    /* On Win32, for UDP, WSAECONNRESET on the receive side
	     * indicates that previous sending has triggered ICMP Port
	     * Unreachable message.
	     * But we wouldn't know at this point which one of previous
	     * key that has triggered the error, since UDP socket can
	     * be shared!
	     * So we'll just ignore it!
	     */

	    if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
		//PJ_LOG(4,(THIS_FILE,
                //          "Ignored ICMP port unreach. on key=%p", h));
	    }
#	    endif

            /* In any case we would report this to caller. */
            bytes_read = -rc;

#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
	    /* Special treatment for dead UDP sockets here, see ticket #1107 */
	    if (rc == PJ_STATUS_FROM_OS(ENOTCONN) && !IS_CLOSING(h) &&
		h->fd_type==pj_SOCK_DGRAM())
	    {
		replace_udp_sock(h);
	    }
#endif
	}

	/* Unlock; from this point we don't need to hold key's mutex
	 * (unless concurrency is disabled, which in this case we should
	 * hold the mutex while calling the callback) */
	if (h->allow_concurrent) {
	    /* concurrency may be changed while we're in the callback, so
	     * save it to a flag.
	     */
	    has_lock = PJ_FALSE;
	    pj_mutex_unlock(h->mutex);
	} else {
	    has_lock = PJ_TRUE;
	}

	/* Call callback. */
        if (h->cb.on_read_complete && !IS_CLOSING(h)) {
	    (*h->cb.on_read_complete)(h,
                                      (pj_ioqueue_op_key_t*)read_op,
                                      bytes_read);
        }

	if (has_lock) {
	    pj_mutex_unlock(h->mutex);
	}

    } else {
        /*
         * This is normal; execution may fall here when multiple threads
         * are signalled for the same event, but only one thread eventually
         * able to process the event.
         */
	pj_mutex_unlock(h->mutex);
    }
}
577
578
/*
 * Dispatch an exception event on the key. With select()-style backends
 * this signals completion (typically failure) of an asynchronous
 * connect(): the connecting flag is cleared, the key is removed from
 * the write/exception sets, and on_connect_complete() is invoked with
 * a status derived from SO_ERROR where available (or -1 otherwise).
 */
void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
				       pj_ioqueue_key_t *h )
{
    pj_bool_t has_lock;

    pj_mutex_lock(h->mutex);

    if (!h->connecting) {
        /* It is possible that more than one thread was woken up, thus
         * the remaining thread will see h->connecting as zero because
         * it has been processed by other thread.
         */
	pj_mutex_unlock(h->mutex);
        return;
    }

    /* A key being unregistered must not be dispatched. */
    if (IS_CLOSING(h)) {
	pj_mutex_unlock(h->mutex);
	return;
    }

    /* Clear operation. */
    h->connecting = 0;

    ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
    ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);

    /* Unlock; from this point we don't need to hold key's mutex
     * (unless concurrency is disabled, which in this case we should
     * hold the mutex while calling the callback) */
    if (h->allow_concurrent) {
	/* concurrency may be changed while we're in the callback, so
	 * save it to a flag.
	 */
	has_lock = PJ_FALSE;
	pj_mutex_unlock(h->mutex);
    } else {
	has_lock = PJ_TRUE;
    }

    /* Call callback. */
    if (h->cb.on_connect_complete && !IS_CLOSING(h)) {
	/* Default status when SO_ERROR cannot be read. */
	pj_status_t status = -1;
#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
	int value;
	int vallen = sizeof(value);
	int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
				       &value, &vallen);
	if (gs_rc == 0) {
	    /* NOTE(review): the write-event path maps SO_ERROR with
	     * PJ_STATUS_FROM_OS() while this path uses
	     * PJ_RETURN_OS_ERROR(); looks inconsistent -- confirm the
	     * intended mapping (and behavior when value == 0).
	     */
	    status = PJ_RETURN_OS_ERROR(value);
	}
#endif

	(*h->cb.on_connect_complete)(h, status);
    }

    if (has_lock) {
	pj_mutex_unlock(h->mutex);
    }
}
639
/*
 * pj_ioqueue_recv()
 *
 * Start asynchronous recv() from the socket.
 *
 * Fast path: unless PJ_IOQUEUE_ALWAYS_ASYNC is set, a non-blocking
 * recv() is attempted first; on success *length is updated and
 * PJ_SUCCESS is returned without queueing anything. Otherwise the
 * operation is queued on the key's read_list and PJ_EPENDING is
 * returned; completion is delivered via on_read_complete().
 * Returns PJ_ECANCELLED if the key is being unregistered.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recv(  pj_ioqueue_key_t *key,
                                      pj_ioqueue_op_key_t *op_key,
				      void *buffer,
				      pj_ssize_t *length,
				      unsigned flags )
{
    struct read_operation *read_op;

    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing (need to do this first before accessing
     * other variables, since they might have been destroyed. See ticket
     * #469).
     */
    if (IS_CLOSING(key))
	return PJ_ECANCELLED;

    read_op = (struct read_operation*)op_key;
    read_op->op = PJ_IOQUEUE_OP_NONE;

    /* Try to see if there's data immediately available.
     */
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
	pj_status_t status;
	pj_ssize_t size;

	size = *length;
	status = pj_sock_recv(key->fd, buffer, &size, flags);
	if (status == PJ_SUCCESS) {
	    /* Yes! Data is available! */
	    *length = size;
	    return PJ_SUCCESS;
	} else {
	    /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
	     * the error to caller.
	     */
	    if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
		return status;
	}
    }

    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /*
     * No data is immediately available.
     * Must schedule asynchronous operation to the ioqueue.
     */
    read_op->op = PJ_IOQUEUE_OP_RECV;
    read_op->buf = buffer;
    read_op->size = *length;
    read_op->flags = flags;

    pj_mutex_lock(key->mutex);
    /* Check again. Handle may have been closed after the previous check
     * in multithreaded app. If we add bad handle to the set it will
     * corrupt the ioqueue set. See #913
     */
    if (IS_CLOSING(key)) {
	pj_mutex_unlock(key->mutex);
	return PJ_ECANCELLED;
    }
    pj_list_insert_before(&key->read_list, read_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
713
/*
 * pj_ioqueue_recvfrom()
 *
 * Start asynchronous recvfrom() from the socket.
 *
 * Same contract as pj_ioqueue_recv(), with the sender's address
 * returned through addr/addrlen. Fast path tries a non-blocking
 * recvfrom() unless PJ_IOQUEUE_ALWAYS_ASYNC is set; otherwise the
 * operation is queued and PJ_EPENDING returned. Returns PJ_ECANCELLED
 * if the key is being unregistered.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key,
				         void *buffer,
				         pj_ssize_t *length,
                                         unsigned flags,
				         pj_sockaddr_t *addr,
				         int *addrlen)
{
    struct read_operation *read_op;

    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing. */
    if (IS_CLOSING(key))
	return PJ_ECANCELLED;

    read_op = (struct read_operation*)op_key;
    read_op->op = PJ_IOQUEUE_OP_NONE;

    /* Try to see if there's data immediately available.
     */
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
	pj_status_t status;
	pj_ssize_t size;

	size = *length;
	status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
				  addr, addrlen);
	if (status == PJ_SUCCESS) {
	    /* Yes! Data is available! */
	    *length = size;
	    return PJ_SUCCESS;
	} else {
	    /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
	     * the error to caller.
	     */
	    if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
		return status;
	}
    }

    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /*
     * No data is immediately available.
     * Must schedule asynchronous operation to the ioqueue.
     */
    read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
    read_op->buf = buffer;
    read_op->size = *length;
    read_op->flags = flags;
    read_op->rmt_addr = addr;
    read_op->rmt_addrlen = addrlen;

    pj_mutex_lock(key->mutex);
    /* Check again. Handle may have been closed after the previous check
     * in multithreaded app. If we add bad handle to the set it will
     * corrupt the ioqueue set. See #913
     */
    if (IS_CLOSING(key)) {
	pj_mutex_unlock(key->mutex);
	return PJ_ECANCELLED;
    }
    pj_list_insert_before(&key->read_list, read_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
789
/*
 * pj_ioqueue_send()
 *
 * Start asynchronous send() to the descriptor.
 *
 * Returns PJ_SUCCESS if the data was sent immediately (fast track),
 * PJ_EPENDING if the send was queued for later completion (reported via
 * the on_write_complete callback), or an error status.
 */
PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
                                     pj_ioqueue_op_key_t *op_key,
                                     const void *data,
                                     pj_ssize_t *length,
                                     unsigned flags)
{
    struct write_operation *write_op;
    pj_status_t status;
    unsigned retry;
    pj_ssize_t sent;

    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write. */
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /* Fast track:
     *   Try to send data immediately, only if there's no pending write!
     * Note:
     *  We are speculating that the list is empty here without properly
     *  acquiring ioqueue's mutex first. This is intentional, to maximize
     *  performance via parallelism.
     *
     *  This should be safe, because:
     *      - by convention, we require caller to make sure that the
     *        key is not unregistered while other threads are invoking
     *        an operation on the same key.
     *      - pj_list_empty() is safe to be invoked by multiple threads,
     *        even when other threads are modifying the list.
     */
    if (pj_list_empty(&key->write_list)) {
        /*
         * See if data can be sent immediately.
         */
        sent = *length;
        status = pj_sock_send(key->fd, data, &sent, flags);
        if (status == PJ_SUCCESS) {
            /* Success! Report the number of bytes actually sent. */
            *length = sent;
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
            /* Otherwise fall through to schedule an asynchronous send. */
        }
    }

    /*
     * Schedule asynchronous send.
     */
    write_op = (struct write_operation*)op_key;

    /* Spin if write_op has pending operation, giving the polling thread a
     * brief chance to complete (and thereby clear) it.
     */
    for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
        pj_thread_sleep(0);

    /* Last chance */
    if (write_op->op) {
        /* Unable to send packet because there is already pending write in the
         * write_op. We could not put the operation into the write_op
         * because write_op already contains a pending operation! And
         * we could not send the packet directly with send() either,
         * because that will break the order of the packet. So we can
         * only return error here.
         *
         * This could happen for example in multithreads program,
         * where polling is done by one thread, while other threads are doing
         * the sending only. If the polling thread runs on lower priority
         * than the sending thread, then it's possible that the pending
         * write flag is not cleared in-time because clearing is only done
         * during polling.
         *
         * Application should specify multiple write operation keys on
         * situation like this.
         */
        //pj_assert(!"ioqueue: there is pending operation on this key!");
        return PJ_EBUSY;
    }

    /* Fill in the operation record; it stays owned by the caller but must
     * remain valid until the completion callback fires.
     */
    write_op->op = PJ_IOQUEUE_OP_SEND;
    write_op->buf = (char*)data;
    write_op->size = *length;
    write_op->written = 0;
    write_op->flags = flags;

    pj_mutex_lock(key->mutex);
    /* Check again. Handle may have been closed after the previous check
     * in multithreaded app. If we add bad handle to the set it will
     * corrupt the ioqueue set. See #913
     */
    if (IS_CLOSING(key)) {
        pj_mutex_unlock(key->mutex);
        return PJ_ECANCELLED;
    }
    pj_list_insert_before(&key->write_list, write_op);
    ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
903
904
905/*
906 * pj_ioqueue_sendto()
907 *
908 * Start asynchronous write() to the descriptor.
909 */
910PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
911 pj_ioqueue_op_key_t *op_key,
912 const void *data,
913 pj_ssize_t *length,
914 pj_uint32_t flags,
915 const pj_sockaddr_t *addr,
916 int addrlen)
917{
918 struct write_operation *write_op;
Benny Prijono40212582006-06-22 18:41:28 +0000919 unsigned retry;
Benny Prijono9033e312005-11-21 02:08:39 +0000920 pj_status_t status;
921 pj_ssize_t sent;
922
923 PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
924 PJ_CHECK_STACK();
925
Benny Prijono5accbd02006-03-30 16:32:18 +0000926 /* Check if key is closing. */
Benny Prijono3059eb62006-10-04 20:46:27 +0000927 if (IS_CLOSING(key))
Benny Prijono5accbd02006-03-30 16:32:18 +0000928 return PJ_ECANCELLED;
929
Benny Prijono9033e312005-11-21 02:08:39 +0000930 /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write */
931 flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
932
933 /* Fast track:
934 * Try to send data immediately, only if there's no pending write!
935 * Note:
936 * We are speculating that the list is empty here without properly
937 * acquiring ioqueue's mutex first. This is intentional, to maximize
938 * performance via parallelism.
939 *
940 * This should be safe, because:
941 * - by convention, we require caller to make sure that the
942 * key is not unregistered while other threads are invoking
943 * an operation on the same key.
944 * - pj_list_empty() is safe to be invoked by multiple threads,
945 * even when other threads are modifying the list.
946 */
947 if (pj_list_empty(&key->write_list)) {
948 /*
949 * See if data can be sent immediately.
950 */
951 sent = *length;
952 status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
953 if (status == PJ_SUCCESS) {
954 /* Success! */
955 *length = sent;
956 return PJ_SUCCESS;
957 } else {
958 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
959 * the error to caller.
960 */
961 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
962 return status;
963 }
Benny Prijono40212582006-06-22 18:41:28 +0000964 status = status;
Benny Prijono9033e312005-11-21 02:08:39 +0000965 }
966 }
967
968 /*
969 * Check that address storage can hold the address parameter.
970 */
Benny Prijonoa1e69682007-05-11 15:14:34 +0000971 PJ_ASSERT_RETURN(addrlen <= (int)sizeof(pj_sockaddr_in), PJ_EBUG);
Benny Prijono9033e312005-11-21 02:08:39 +0000972
973 /*
974 * Schedule asynchronous send.
975 */
Benny Prijono40212582006-06-22 18:41:28 +0000976 write_op = (struct write_operation*)op_key;
977
978 /* Spin if write_op has pending operation */
979 for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
980 pj_thread_sleep(0);
981
982 /* Last chance */
983 if (write_op->op) {
984 /* Unable to send packet because there is already pending write on the
985 * write_op. We could not put the operation into the write_op
986 * because write_op already contains a pending operation! And
987 * we could not send the packet directly with sendto() either,
988 * because that will break the order of the packet. So we can
989 * only return error here.
990 *
991 * This could happen for example in multithreads program,
992 * where polling is done by one thread, while other threads are doing
993 * the sending only. If the polling thread runs on lower priority
994 * than the sending thread, then it's possible that the pending
995 * write flag is not cleared in-time because clearing is only done
996 * during polling.
997 *
998 * Aplication should specify multiple write operation keys on
999 * situation like this.
1000 */
1001 //pj_assert(!"ioqueue: there is pending operation on this key!");
1002 return PJ_EBUSY;
1003 }
1004
Benny Prijono9033e312005-11-21 02:08:39 +00001005 write_op->op = PJ_IOQUEUE_OP_SEND_TO;
Benny Prijonoa1e69682007-05-11 15:14:34 +00001006 write_op->buf = (char*)data;
Benny Prijono9033e312005-11-21 02:08:39 +00001007 write_op->size = *length;
1008 write_op->written = 0;
1009 write_op->flags = flags;
1010 pj_memcpy(&write_op->rmt_addr, addr, addrlen);
1011 write_op->rmt_addrlen = addrlen;
1012
1013 pj_mutex_lock(key->mutex);
Benny Prijono25cb51d2009-07-02 08:24:22 +00001014 /* Check again. Handle may have been closed after the previous check
1015 * in multithreaded app. If we add bad handle to the set it will
1016 * corrupt the ioqueue set. See #913
1017 */
1018 if (IS_CLOSING(key)) {
1019 pj_mutex_unlock(key->mutex);
1020 return PJ_ECANCELLED;
1021 }
Benny Prijono9033e312005-11-21 02:08:39 +00001022 pj_list_insert_before(&key->write_list, write_op);
Benny Prijono63ab3562006-07-08 19:46:43 +00001023 ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +00001024 pj_mutex_unlock(key->mutex);
1025
1026 return PJ_EPENDING;
1027}
1028
1029#if PJ_HAS_TCP
/*
 * Initiate overlapped accept() operation.
 *
 * Returns PJ_SUCCESS if a connection was accepted immediately (new_sock,
 * remote, and optionally local are filled in), PJ_EPENDING if the accept
 * was queued (reported via the on_accept_complete callback), or an error.
 */
PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
                                       pj_sock_t *new_sock,
                                       pj_sockaddr_t *local,
                                       pj_sockaddr_t *remote,
                                       int *addrlen)
{
    struct accept_operation *accept_op;
    pj_status_t status;

    /* check parameters. All must be specified! */
    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    /* Mark the op key as idle until we actually queue it below. */
    accept_op = (struct accept_operation*)op_key;
    accept_op->op = PJ_IOQUEUE_OP_NONE;

    /* Fast track:
     *  See if there's new connection available immediately.
     * (Speculative list check without the mutex, same convention as the
     * send/sendto fast paths.)
     */
    if (pj_list_empty(&key->accept_list)) {
        status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
        if (status == PJ_SUCCESS) {
            /* Yes! New connection is available! */
            if (local && addrlen) {
                status = pj_sock_getsockname(*new_sock, local, addrlen);
                if (status != PJ_SUCCESS) {
                    /* getsockname() failed: close the accepted socket so it
                     * doesn't leak, and report the error.
                     */
                    pj_sock_close(*new_sock);
                    *new_sock = PJ_INVALID_SOCKET;
                    return status;
                }
            }
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * No connection is available immediately.
     * Schedule accept() operation to be completed when there is incoming
     * connection available.
     * The output pointers (new_sock/remote/local/addrlen) are stored and
     * written when the connection arrives, so they must stay valid until
     * the completion callback fires.
     */
    accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
    accept_op->accept_fd = new_sock;
    accept_op->rmt_addr = remote;
    accept_op->addrlen= addrlen;
    accept_op->local_addr = local;

    pj_mutex_lock(key->mutex);
    /* Check again. Handle may have been closed after the previous check
     * in multithreaded app. If we add bad handle to the set it will
     * corrupt the ioqueue set. See #913
     */
    if (IS_CLOSING(key)) {
        pj_mutex_unlock(key->mutex);
        return PJ_ECANCELLED;
    }
    pj_list_insert_before(&key->accept_list, accept_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
1105
1106/*
1107 * Initiate overlapped connect() operation (well, it's non-blocking actually,
1108 * since there's no overlapped version of connect()).
1109 */
1110PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
1111 const pj_sockaddr_t *addr,
1112 int addrlen )
1113{
1114 pj_status_t status;
1115
1116 /* check parameters. All must be specified! */
1117 PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
1118
Benny Prijono5accbd02006-03-30 16:32:18 +00001119 /* Check if key is closing. */
Benny Prijono3059eb62006-10-04 20:46:27 +00001120 if (IS_CLOSING(key))
Benny Prijono5accbd02006-03-30 16:32:18 +00001121 return PJ_ECANCELLED;
1122
Benny Prijono9033e312005-11-21 02:08:39 +00001123 /* Check if socket has not been marked for connecting */
1124 if (key->connecting != 0)
1125 return PJ_EPENDING;
1126
1127 status = pj_sock_connect(key->fd, addr, addrlen);
1128 if (status == PJ_SUCCESS) {
1129 /* Connected! */
1130 return PJ_SUCCESS;
1131 } else {
1132 if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
1133 /* Pending! */
1134 pj_mutex_lock(key->mutex);
Benny Prijono25cb51d2009-07-02 08:24:22 +00001135 /* Check again. Handle may have been closed after the previous
1136 * check in multithreaded app. See #913
1137 */
1138 if (IS_CLOSING(key)) {
1139 pj_mutex_unlock(key->mutex);
1140 return PJ_ECANCELLED;
1141 }
Benny Prijono9033e312005-11-21 02:08:39 +00001142 key->connecting = PJ_TRUE;
Benny Prijono63ab3562006-07-08 19:46:43 +00001143 ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
1144 ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +00001145 pj_mutex_unlock(key->mutex);
1146 return PJ_EPENDING;
1147 } else {
1148 /* Error! */
1149 return status;
1150 }
1151 }
1152}
1153#endif /* PJ_HAS_TCP */
1154
1155
/*
 * Initialize an operation key by zeroing it, so that pending-operation
 * checks (op == 0) see it as idle. Must be called before the key is used
 * in any asynchronous operation.
 */
PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
                                     pj_size_t size )
{
    pj_bzero(op_key, size);
}
1161
1162
1163/*
1164 * pj_ioqueue_is_pending()
1165 */
1166PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
1167 pj_ioqueue_op_key_t *op_key )
1168{
1169 struct generic_operation *op_rec;
1170
1171 PJ_UNUSED_ARG(key);
1172
1173 op_rec = (struct generic_operation*)op_key;
1174 return op_rec->op != 0;
1175}
1176
1177
1178/*
1179 * pj_ioqueue_post_completion()
1180 */
1181PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
1182 pj_ioqueue_op_key_t *op_key,
1183 pj_ssize_t bytes_status )
1184{
1185 struct generic_operation *op_rec;
1186
1187 /*
1188 * Find the operation key in all pending operation list to
1189 * really make sure that it's still there; then call the callback.
1190 */
Benny Prijono5accbd02006-03-30 16:32:18 +00001191 pj_mutex_lock(key->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +00001192
1193 /* Find the operation in the pending read list. */
1194 op_rec = (struct generic_operation*)key->read_list.next;
1195 while (op_rec != (void*)&key->read_list) {
1196 if (op_rec == (void*)op_key) {
1197 pj_list_erase(op_rec);
Benny Prijonoa1e69682007-05-11 15:14:34 +00001198 op_rec->op = PJ_IOQUEUE_OP_NONE;
Benny Prijono5accbd02006-03-30 16:32:18 +00001199 pj_mutex_unlock(key->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +00001200
1201 (*key->cb.on_read_complete)(key, op_key, bytes_status);
1202 return PJ_SUCCESS;
1203 }
1204 op_rec = op_rec->next;
1205 }
1206
1207 /* Find the operation in the pending write list. */
1208 op_rec = (struct generic_operation*)key->write_list.next;
1209 while (op_rec != (void*)&key->write_list) {
1210 if (op_rec == (void*)op_key) {
1211 pj_list_erase(op_rec);
Benny Prijonoa1e69682007-05-11 15:14:34 +00001212 op_rec->op = PJ_IOQUEUE_OP_NONE;
Benny Prijono5accbd02006-03-30 16:32:18 +00001213 pj_mutex_unlock(key->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +00001214
1215 (*key->cb.on_write_complete)(key, op_key, bytes_status);
1216 return PJ_SUCCESS;
1217 }
1218 op_rec = op_rec->next;
1219 }
1220
1221 /* Find the operation in the pending accept list. */
1222 op_rec = (struct generic_operation*)key->accept_list.next;
1223 while (op_rec != (void*)&key->accept_list) {
1224 if (op_rec == (void*)op_key) {
1225 pj_list_erase(op_rec);
Benny Prijonoa1e69682007-05-11 15:14:34 +00001226 op_rec->op = PJ_IOQUEUE_OP_NONE;
Benny Prijono5accbd02006-03-30 16:32:18 +00001227 pj_mutex_unlock(key->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +00001228
1229 (*key->cb.on_accept_complete)(key, op_key,
1230 PJ_INVALID_SOCKET,
1231 bytes_status);
1232 return PJ_SUCCESS;
1233 }
1234 op_rec = op_rec->next;
1235 }
1236
Benny Prijono5accbd02006-03-30 16:32:18 +00001237 pj_mutex_unlock(key->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +00001238
1239 return PJ_EINVALIDOP;
1240}
1241
Benny Prijonoe3f79fd2008-02-13 15:17:28 +00001242PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency( pj_ioqueue_t *ioqueue,
1243 pj_bool_t allow)
1244{
1245 PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
1246 ioqueue->default_concurrency = allow;
1247 return PJ_SUCCESS;
1248}
1249
1250
/*
 * Set whether this key's callbacks may be invoked concurrently from
 * multiple polling threads. Disabling concurrency serializes callbacks
 * for the key.
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
                                               pj_bool_t allow)
{
    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
     * disabled.
     */
    PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);

    key->allow_concurrent = allow;
    return PJ_SUCCESS;
}
1264
/* Acquire the key's mutex (pairs with pj_ioqueue_unlock_key()). */
PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
{
    return pj_mutex_lock(key->mutex);
}
1269
/* Release the key's mutex (pairs with pj_ioqueue_lock_key()). */
PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
{
    return pj_mutex_unlock(key->mutex);
}
1274