/* $Id$ */
/* 
 * Copyright (C) 2008-2009 Teluu Inc. (http://www.teluu.com)
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
 */

/*
 * ioqueue_common_abs.c
 *
 * This contains common functionalities to emulate proactor pattern with
 * various event dispatching mechanisms (e.g. select, epoll).
 *
 * This file will be included by the appropriate ioqueue implementation.
 * This file is NOT supposed to be compiled as stand-alone source.
 */
/* Number of times pj_ioqueue_send() will busy-spin (yielding the CPU via
 * pj_thread_sleep(0)) waiting for a pending operation on the same op_key
 * to be cleared by the polling thread, before giving up with PJ_EBUSY.
 */
#define PENDING_RETRY   2
Benny Prijono9033e312005-11-21 02:08:39 +000033static void ioqueue_init( pj_ioqueue_t *ioqueue )
34{
35 ioqueue->lock = NULL;
36 ioqueue->auto_delete_lock = 0;
Benny Prijono8eb763c2008-07-12 10:09:39 +000037 ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY;
Benny Prijono9033e312005-11-21 02:08:39 +000038}
39
40static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
41{
42 if (ioqueue->auto_delete_lock && ioqueue->lock ) {
43 pj_lock_release(ioqueue->lock);
44 return pj_lock_destroy(ioqueue->lock);
Nanang Izzuddin838cb322008-12-18 17:52:57 +000045 }
46
47 return PJ_SUCCESS;
Benny Prijono9033e312005-11-21 02:08:39 +000048}
49
50/*
51 * pj_ioqueue_set_lock()
52 */
53PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
54 pj_lock_t *lock,
55 pj_bool_t auto_delete )
56{
57 PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
58
59 if (ioqueue->auto_delete_lock && ioqueue->lock) {
60 pj_lock_destroy(ioqueue->lock);
61 }
62
63 ioqueue->lock = lock;
64 ioqueue->auto_delete_lock = auto_delete;
65
66 return PJ_SUCCESS;
67}
68
69static pj_status_t ioqueue_init_key( pj_pool_t *pool,
70 pj_ioqueue_t *ioqueue,
71 pj_ioqueue_key_t *key,
72 pj_sock_t sock,
73 void *user_data,
74 const pj_ioqueue_callback *cb)
75{
76 pj_status_t rc;
77 int optlen;
78
Benny Prijono8befd9f2006-05-13 22:46:23 +000079 PJ_UNUSED_ARG(pool);
80
Benny Prijono9033e312005-11-21 02:08:39 +000081 key->ioqueue = ioqueue;
82 key->fd = sock;
83 key->user_data = user_data;
84 pj_list_init(&key->read_list);
85 pj_list_init(&key->write_list);
86#if PJ_HAS_TCP
87 pj_list_init(&key->accept_list);
Benny Prijono5accbd02006-03-30 16:32:18 +000088 key->connecting = 0;
Benny Prijono9033e312005-11-21 02:08:39 +000089#endif
90
91 /* Save callback. */
92 pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
93
Benny Prijono5accbd02006-03-30 16:32:18 +000094#if PJ_IOQUEUE_HAS_SAFE_UNREG
95 /* Set initial reference count to 1 */
96 pj_assert(key->ref_count == 0);
97 ++key->ref_count;
98
99 key->closing = 0;
100#endif
101
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000102 rc = pj_ioqueue_set_concurrency(key, ioqueue->default_concurrency);
103 if (rc != PJ_SUCCESS)
104 return rc;
105
Benny Prijono9033e312005-11-21 02:08:39 +0000106 /* Get socket type. When socket type is datagram, some optimization
107 * will be performed during send to allow parallel send operations.
108 */
109 optlen = sizeof(key->fd_type);
Benny Prijono8ab968f2007-07-20 08:08:30 +0000110 rc = pj_sock_getsockopt(sock, pj_SOL_SOCKET(), pj_SO_TYPE(),
Benny Prijono9033e312005-11-21 02:08:39 +0000111 &key->fd_type, &optlen);
112 if (rc != PJ_SUCCESS)
Benny Prijono8ab968f2007-07-20 08:08:30 +0000113 key->fd_type = pj_SOCK_STREAM();
Benny Prijono9033e312005-11-21 02:08:39 +0000114
115 /* Create mutex for the key. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000116#if !PJ_IOQUEUE_HAS_SAFE_UNREG
117 rc = pj_mutex_create_simple(pool, NULL, &key->mutex);
118#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000119
120 return rc;
121}
122
Benny Prijono9033e312005-11-21 02:08:39 +0000123/*
124 * pj_ioqueue_get_user_data()
125 *
126 * Obtain value associated with a key.
127 */
128PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
129{
130 PJ_ASSERT_RETURN(key != NULL, NULL);
131 return key->user_data;
132}
133
134/*
135 * pj_ioqueue_set_user_data()
136 */
137PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
138 void *user_data,
139 void **old_data)
140{
141 PJ_ASSERT_RETURN(key, PJ_EINVAL);
142
143 if (old_data)
144 *old_data = key->user_data;
145 key->user_data = user_data;
146
147 return PJ_SUCCESS;
148}
149
150PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
151{
152 return !pj_list_empty(&key->write_list);
153}
154
155PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
156{
157 return !pj_list_empty(&key->read_list);
158}
159
160PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
161{
162#if PJ_HAS_TCP
163 return !pj_list_empty(&key->accept_list);
164#else
Benny Prijono3569c0d2007-04-06 10:29:20 +0000165 PJ_UNUSED_ARG(key);
Benny Prijono9033e312005-11-21 02:08:39 +0000166 return 0;
167#endif
168}
169
170PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
171{
172 return key->connecting;
173}
174
175
/* IS_CLOSING(key) evaluates nonzero when the key is in the process of
 * being unregistered. Dispatch functions use it to skip callbacks on a
 * dying key. Without safe-unregistration support the 'closing' field
 * does not exist, so the macro is a constant 0.
 */
#if PJ_IOQUEUE_HAS_SAFE_UNREG
#   define IS_CLOSING(key)  (key->closing)
#else
#   define IS_CLOSING(key)  (0)
#endif
181
182
Benny Prijono9033e312005-11-21 02:08:39 +0000183/*
184 * ioqueue_dispatch_event()
185 *
186 * Report occurence of an event in the key to be processed by the
187 * framework.
188 */
189void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
190{
191 /* Lock the key. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000192 pj_mutex_lock(h->mutex);
193
Benny Prijono3059eb62006-10-04 20:46:27 +0000194 if (IS_CLOSING(h)) {
Benny Prijono5accbd02006-03-30 16:32:18 +0000195 pj_mutex_unlock(h->mutex);
196 return;
197 }
Benny Prijono9033e312005-11-21 02:08:39 +0000198
199#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
200 if (h->connecting) {
201 /* Completion of connect() operation */
Benny Prijono653af2e2010-02-05 11:11:52 +0000202 pj_status_t status;
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000203 pj_bool_t has_lock;
Benny Prijono9033e312005-11-21 02:08:39 +0000204
205 /* Clear operation. */
206 h->connecting = 0;
207
Benny Prijono63ab3562006-07-08 19:46:43 +0000208 ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
209 ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +0000210
Benny Prijono9033e312005-11-21 02:08:39 +0000211
212#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
213 /* from connect(2):
214 * On Linux, use getsockopt to read the SO_ERROR option at
215 * level SOL_SOCKET to determine whether connect() completed
216 * successfully (if SO_ERROR is zero).
217 */
218 {
219 int value;
Benny Prijono7db431e2006-07-23 14:38:49 +0000220 int vallen = sizeof(value);
Benny Prijono2bbd7102006-07-18 00:10:53 +0000221 int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
222 &value, &vallen);
Benny Prijono9033e312005-11-21 02:08:39 +0000223 if (gs_rc != 0) {
224 /* Argh!! What to do now???
225 * Just indicate that the socket is connected. The
226 * application will get error as soon as it tries to use
227 * the socket to send/receive.
228 */
Benny Prijono653af2e2010-02-05 11:11:52 +0000229 status = PJ_SUCCESS;
Benny Prijono9033e312005-11-21 02:08:39 +0000230 } else {
Benny Prijono653af2e2010-02-05 11:11:52 +0000231 status = PJ_STATUS_FROM_OS(value);
Benny Prijono9033e312005-11-21 02:08:39 +0000232 }
233 }
234#elif defined(PJ_WIN32) && PJ_WIN32!=0
Benny Prijono653af2e2010-02-05 11:11:52 +0000235 status = PJ_SUCCESS; /* success */
Benny Prijono9033e312005-11-21 02:08:39 +0000236#else
237 /* Excellent information in D.J. Bernstein page:
238 * http://cr.yp.to/docs/connect.html
239 *
240 * Seems like the most portable way of detecting connect()
241 * failure is to call getpeername(). If socket is connected,
242 * getpeername() will return 0. If the socket is not connected,
243 * it will return ENOTCONN, and read(fd, &ch, 1) will produce
244 * the right errno through error slippage. This is a combination
245 * of suggestions from Douglas C. Schmidt and Ken Keys.
246 */
Benny Prijono9cf138e2006-01-19 03:58:29 +0000247 {
Benny Prijono9cf138e2006-01-19 03:58:29 +0000248 struct sockaddr_in addr;
Benny Prijono653af2e2010-02-05 11:11:52 +0000249 int addrlen = sizeof(addr);
Benny Prijono9033e312005-11-21 02:08:39 +0000250
Benny Prijono653af2e2010-02-05 11:11:52 +0000251 status = pj_sock_getpeername(h->fd, (struct sockaddr*)&addr,
252 &addrlen);
Benny Prijono9cf138e2006-01-19 03:58:29 +0000253 }
Benny Prijono9033e312005-11-21 02:08:39 +0000254#endif
255
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000256 /* Unlock; from this point we don't need to hold key's mutex
257 * (unless concurrency is disabled, which in this case we should
258 * hold the mutex while calling the callback) */
259 if (h->allow_concurrent) {
260 /* concurrency may be changed while we're in the callback, so
261 * save it to a flag.
262 */
263 has_lock = PJ_FALSE;
264 pj_mutex_unlock(h->mutex);
265 } else {
266 has_lock = PJ_TRUE;
267 }
Benny Prijono5accbd02006-03-30 16:32:18 +0000268
Benny Prijono9033e312005-11-21 02:08:39 +0000269 /* Call callback. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000270 if (h->cb.on_connect_complete && !IS_CLOSING(h))
Benny Prijono653af2e2010-02-05 11:11:52 +0000271 (*h->cb.on_connect_complete)(h, status);
Benny Prijono9033e312005-11-21 02:08:39 +0000272
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000273 /* Unlock if we still hold the lock */
274 if (has_lock) {
275 pj_mutex_unlock(h->mutex);
276 }
277
Benny Prijono9033e312005-11-21 02:08:39 +0000278 /* Done. */
279
280 } else
281#endif /* PJ_HAS_TCP */
282 if (key_has_pending_write(h)) {
283 /* Socket is writable. */
284 struct write_operation *write_op;
285 pj_ssize_t sent;
286 pj_status_t send_rc;
287
288 /* Get the first in the queue. */
289 write_op = h->write_list.next;
290
291 /* For datagrams, we can remove the write_op from the list
292 * so that send() can work in parallel.
293 */
Benny Prijono8ab968f2007-07-20 08:08:30 +0000294 if (h->fd_type == pj_SOCK_DGRAM()) {
Benny Prijono9033e312005-11-21 02:08:39 +0000295 pj_list_erase(write_op);
Benny Prijono9033e312005-11-21 02:08:39 +0000296
297 if (pj_list_empty(&h->write_list))
Benny Prijono63ab3562006-07-08 19:46:43 +0000298 ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +0000299
Benny Prijono9033e312005-11-21 02:08:39 +0000300 }
301
302 /* Send the data.
303 * Unfortunately we must do this while holding key's mutex, thus
304 * preventing parallel write on a single key.. :-((
305 */
306 sent = write_op->size - write_op->written;
307 if (write_op->op == PJ_IOQUEUE_OP_SEND) {
308 send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
309 &sent, write_op->flags);
Benny Prijono40212582006-06-22 18:41:28 +0000310 /* Can't do this. We only clear "op" after we're finished sending
311 * the whole buffer.
312 */
313 //write_op->op = 0;
Benny Prijono9033e312005-11-21 02:08:39 +0000314 } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
315 send_rc = pj_sock_sendto(h->fd,
316 write_op->buf+write_op->written,
317 &sent, write_op->flags,
318 &write_op->rmt_addr,
319 write_op->rmt_addrlen);
Benny Prijono40212582006-06-22 18:41:28 +0000320 /* Can't do this. We only clear "op" after we're finished sending
321 * the whole buffer.
322 */
323 //write_op->op = 0;
Benny Prijono9033e312005-11-21 02:08:39 +0000324 } else {
325 pj_assert(!"Invalid operation type!");
Benny Prijonoa1e69682007-05-11 15:14:34 +0000326 write_op->op = PJ_IOQUEUE_OP_NONE;
Benny Prijono9033e312005-11-21 02:08:39 +0000327 send_rc = PJ_EBUG;
328 }
329
330 if (send_rc == PJ_SUCCESS) {
331 write_op->written += sent;
332 } else {
333 pj_assert(send_rc > 0);
334 write_op->written = -send_rc;
335 }
336
337 /* Are we finished with this buffer? */
338 if (send_rc!=PJ_SUCCESS ||
339 write_op->written == (pj_ssize_t)write_op->size ||
Benny Prijono8ab968f2007-07-20 08:08:30 +0000340 h->fd_type == pj_SOCK_DGRAM())
Benny Prijono9033e312005-11-21 02:08:39 +0000341 {
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000342 pj_bool_t has_lock;
Benny Prijono40212582006-06-22 18:41:28 +0000343
Benny Prijonoa1e69682007-05-11 15:14:34 +0000344 write_op->op = PJ_IOQUEUE_OP_NONE;
Benny Prijono40212582006-06-22 18:41:28 +0000345
Benny Prijono8ab968f2007-07-20 08:08:30 +0000346 if (h->fd_type != pj_SOCK_DGRAM()) {
Benny Prijono9033e312005-11-21 02:08:39 +0000347 /* Write completion of the whole stream. */
348 pj_list_erase(write_op);
Benny Prijono9033e312005-11-21 02:08:39 +0000349
350 /* Clear operation if there's no more data to send. */
351 if (pj_list_empty(&h->write_list))
Benny Prijono63ab3562006-07-08 19:46:43 +0000352 ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +0000353
Benny Prijono9033e312005-11-21 02:08:39 +0000354 }
355
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000356 /* Unlock; from this point we don't need to hold key's mutex
357 * (unless concurrency is disabled, which in this case we should
358 * hold the mutex while calling the callback) */
359 if (h->allow_concurrent) {
360 /* concurrency may be changed while we're in the callback, so
361 * save it to a flag.
362 */
363 has_lock = PJ_FALSE;
364 pj_mutex_unlock(h->mutex);
365 } else {
366 has_lock = PJ_TRUE;
367 }
Benny Prijono5accbd02006-03-30 16:32:18 +0000368
Benny Prijono9033e312005-11-21 02:08:39 +0000369 /* Call callback. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000370 if (h->cb.on_write_complete && !IS_CLOSING(h)) {
Benny Prijono9033e312005-11-21 02:08:39 +0000371 (*h->cb.on_write_complete)(h,
372 (pj_ioqueue_op_key_t*)write_op,
373 write_op->written);
374 }
375
Benny Prijonoe3f79fd2008-02-13 15:17:28 +0000376 if (has_lock) {
377 pj_mutex_unlock(h->mutex);
378 }
379
Benny Prijono9033e312005-11-21 02:08:39 +0000380 } else {
Benny Prijono5accbd02006-03-30 16:32:18 +0000381 pj_mutex_unlock(h->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +0000382 }
383
384 /* Done. */
385 } else {
386 /*
387 * This is normal; execution may fall here when multiple threads
388 * are signalled for the same event, but only one thread eventually
389 * able to process the event.
390 */
Benny Prijono5accbd02006-03-30 16:32:18 +0000391 pj_mutex_unlock(h->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +0000392 }
393}
394
/*
 * Handle a "readable" notification on key h: complete one pending
 * accept (TCP builds) or one pending read/recv/recvfrom operation, then
 * invoke the corresponding completion callback.
 *
 * Locking protocol mirrors ioqueue_dispatch_write_event(): the key
 * mutex is released before the callback when concurrency is allowed,
 * held across it otherwise (tracked by has_lock).
 */
void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
{
    pj_status_t rc;

    /* Lock the key. */
    pj_mutex_lock(h->mutex);

    /* Key is being unregistered; don't deliver anything. */
    if (IS_CLOSING(h)) {
        pj_mutex_unlock(h->mutex);
        return;
    }

#   if PJ_HAS_TCP
    if (!pj_list_empty(&h->accept_list)) {

        struct accept_operation *accept_op;
        pj_bool_t has_lock;

        /* Get one accept operation from the list. */
        accept_op = h->accept_list.next;
        pj_list_erase(accept_op);
        accept_op->op = PJ_IOQUEUE_OP_NONE;

        /* Clear bit in fdset if there is no more pending accept */
        if (pj_list_empty(&h->accept_list))
            ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);

        rc=pj_sock_accept(h->fd, accept_op->accept_fd, 
                          accept_op->rmt_addr, accept_op->addrlen);
        if (rc==PJ_SUCCESS && accept_op->local_addr) {
            rc = pj_sock_getsockname(*accept_op->accept_fd, 
                                     accept_op->local_addr,
                                     accept_op->addrlen);
        }

        /* Unlock; from this point we don't need to hold key's mutex
         * (unless concurrency is disabled, which in this case we should
         * hold the mutex while calling the callback) */
        if (h->allow_concurrent) {
            /* concurrency may be changed while we're in the callback, so
             * save it to a flag.
             */
            has_lock = PJ_FALSE;
            pj_mutex_unlock(h->mutex);
        } else {
            has_lock = PJ_TRUE;
        }

        /* Call callback. */
        if (h->cb.on_accept_complete && !IS_CLOSING(h)) {
            (*h->cb.on_accept_complete)(h, 
                                        (pj_ioqueue_op_key_t*)accept_op,
                                        *accept_op->accept_fd, rc);
        }

        if (has_lock) {
            pj_mutex_unlock(h->mutex);
        }
    }
    else
#   endif
    if (key_has_pending_read(h)) {
        struct read_operation *read_op;
        pj_ssize_t bytes_read;
        pj_bool_t has_lock;

        /* Get one pending read operation from the list. */
        read_op = h->read_list.next;
        pj_list_erase(read_op);

        /* Clear fdset if there is no pending read. */
        if (pj_list_empty(&h->read_list))
            ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);

        /* 'bytes_read' carries the buffer capacity in, and the actual
         * byte count (or negated error status) out.
         */
        bytes_read = read_op->size;

        if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {
            read_op->op = PJ_IOQUEUE_OP_NONE;
            rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 
                                  read_op->flags,
                                  read_op->rmt_addr, 
                                  read_op->rmt_addrlen);
        } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) {
            read_op->op = PJ_IOQUEUE_OP_NONE;
            rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 
                              read_op->flags);
        } else {
            pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
            read_op->op = PJ_IOQUEUE_OP_NONE;
            /*
             * User has specified pj_ioqueue_read().
             * On Win32, we should do ReadFile(). But because we got
             * here because of select() anyway, user must have put a
             * socket descriptor on h->fd, which in this case we can
             * just call pj_sock_recv() instead of ReadFile().
             * On Unix, user may put a file in h->fd, so we'll have
             * to call read() here.
             * This may not compile on systems which doesn't have 
             * read(). That's why we only specify PJ_LINUX here so
             * that error is easier to catch.
             */
#           if defined(PJ_WIN32) && PJ_WIN32 != 0 || \
               defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE != 0
                rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 
                                  read_op->flags);
                //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
                //              &bytes_read, NULL);
#           elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
                bytes_read = read(h->fd, read_op->buf, bytes_read);
                rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
#           elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
                bytes_read = sys_read(h->fd, read_op->buf, bytes_read);
                rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
#           else
#               error "Implement read() for this platform!"
#           endif
        }
        
        if (rc != PJ_SUCCESS) {
#           if defined(PJ_WIN32) && PJ_WIN32 != 0
            /* On Win32, for UDP, WSAECONNRESET on the receive side 
             * indicates that previous sending has triggered ICMP Port 
             * Unreachable message.
             * But we wouldn't know at this point which one of previous 
             * key that has triggered the error, since UDP socket can
             * be shared!
             * So we'll just ignore it!
             */

            if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
                //PJ_LOG(4,(THIS_FILE, 
                //          "Ignored ICMP port unreach. on key=%p", h));
            }
#           endif

            /* In any case we would report this to caller. */
            bytes_read = -rc;
        }

        /* Unlock; from this point we don't need to hold key's mutex
         * (unless concurrency is disabled, which in this case we should
         * hold the mutex while calling the callback) */
        if (h->allow_concurrent) {
            /* concurrency may be changed while we're in the callback, so
             * save it to a flag.
             */
            has_lock = PJ_FALSE;
            pj_mutex_unlock(h->mutex);
        } else {
            has_lock = PJ_TRUE;
        }

        /* Call callback. */
        if (h->cb.on_read_complete && !IS_CLOSING(h)) {
            (*h->cb.on_read_complete)(h, 
                                      (pj_ioqueue_op_key_t*)read_op,
                                      bytes_read);
        }

        if (has_lock) {
            pj_mutex_unlock(h->mutex);
        }

    } else {
        /*
         * This is normal; execution may fall here when multiple threads
         * are signalled for the same event, but only one thread eventually
         * able to process the event.
         */
        pj_mutex_unlock(h->mutex);
    }
}
567
568
/*
 * Handle an "exception" notification on key h. This is only meaningful
 * for a key with a pending connect(): the exception indicates connect
 * failure, so the connect flag is cleared, the key is removed from the
 * writable/exception sets, and on_connect_complete is invoked with a
 * failure status (read from SO_ERROR where available, -1 otherwise).
 */
void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, 
                                       pj_ioqueue_key_t *h )
{
    pj_bool_t has_lock;

    pj_mutex_lock(h->mutex);

    if (!h->connecting) {
        /* It is possible that more than one thread was woken up, thus
         * the remaining thread will see h->connecting as zero because
         * it has been processed by other thread.
         */
        pj_mutex_unlock(h->mutex);
        return;
    }

    /* Key is being unregistered; don't deliver anything. */
    if (IS_CLOSING(h)) {
        pj_mutex_unlock(h->mutex);
        return;
    }

    /* Clear operation. */
    h->connecting = 0;

    ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
    ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);

    /* Unlock; from this point we don't need to hold key's mutex
     * (unless concurrency is disabled, which in this case we should
     * hold the mutex while calling the callback) */
    if (h->allow_concurrent) {
        /* concurrency may be changed while we're in the callback, so
         * save it to a flag.
         */
        has_lock = PJ_FALSE;
        pj_mutex_unlock(h->mutex);
    } else {
        has_lock = PJ_TRUE;
    }

    /* Call callback. */
    if (h->cb.on_connect_complete && !IS_CLOSING(h)) {
        /* Default to a generic failure when SO_ERROR is unavailable. */
        pj_status_t status = -1;
#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
        int value;
        int vallen = sizeof(value);
        int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR, 
                                       &value, &vallen);
        if (gs_rc == 0) {
            status = PJ_RETURN_OS_ERROR(value);
        }
#endif

        (*h->cb.on_connect_complete)(h, status);
    }

    if (has_lock) {
        pj_mutex_unlock(h->mutex);
    }
}
629
/*
 * pj_ioqueue_recv()
 *
 * Start asynchronous recv() from the socket.
 *
 * Unless PJ_IOQUEUE_ALWAYS_ASYNC is set in flags, the function first
 * attempts an immediate non-blocking recv; on success it returns
 * PJ_SUCCESS with *length updated. Otherwise the operation is queued
 * on the key and PJ_EPENDING is returned; completion is reported via
 * the on_read_complete callback.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recv(  pj_ioqueue_key_t *key,
                                      pj_ioqueue_op_key_t *op_key,
                                      void *buffer,
                                      pj_ssize_t *length,
                                      unsigned flags )
{
    struct read_operation *read_op;

    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing (need to do this first before accessing
     * other variables, since they might have been destroyed. See ticket
     * #469).
     */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    read_op = (struct read_operation*)op_key;
    read_op->op = PJ_IOQUEUE_OP_NONE;

    /* Try to see if there's data immediately available. 
     */
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
        pj_status_t status;
        pj_ssize_t size;

        size = *length;
        status = pj_sock_recv(key->fd, buffer, &size, flags);
        if (status == PJ_SUCCESS) {
            /* Yes! Data is available! */
            *length = size;
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
                return status;
        }
    }

    /* ALWAYS_ASYNC is consumed here; don't pass it to the OS recv(). */
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /*
     * No data is immediately available.
     * Must schedule asynchronous operation to the ioqueue.
     */
    read_op->op = PJ_IOQUEUE_OP_RECV;
    read_op->buf = buffer;
    read_op->size = *length;
    read_op->flags = flags;

    pj_mutex_lock(key->mutex);
    /* Check again. Handle may have been closed after the previous check
     * in multithreaded app. If we add bad handle to the set it will
     * corrupt the ioqueue set. See #913
     */
    if (IS_CLOSING(key)) {
        pj_mutex_unlock(key->mutex);
        return PJ_ECANCELLED;
    }
    pj_list_insert_before(&key->read_list, read_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
703
/*
 * pj_ioqueue_recvfrom()
 *
 * Start asynchronous recvfrom() from the socket.
 *
 * Behaves like pj_ioqueue_recv(), additionally capturing the remote
 * address into *addr/*addrlen. Returns PJ_SUCCESS on immediate data,
 * PJ_EPENDING when the operation has been queued, or an error status.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key,
                                         void *buffer,
                                         pj_ssize_t *length,
                                         unsigned flags,
                                         pj_sockaddr_t *addr,
                                         int *addrlen)
{
    struct read_operation *read_op;

    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    read_op = (struct read_operation*)op_key;
    read_op->op = PJ_IOQUEUE_OP_NONE;

    /* Try to see if there's data immediately available. 
     */
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
        pj_status_t status;
        pj_ssize_t size;

        size = *length;
        status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
                                  addr, addrlen);
        if (status == PJ_SUCCESS) {
            /* Yes! Data is available! */
            *length = size;
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
                return status;
        }
    }

    /* ALWAYS_ASYNC is consumed here; don't pass it to the OS recvfrom(). */
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /*
     * No data is immediately available.
     * Must schedule asynchronous operation to the ioqueue.
     */
    read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
    read_op->buf = buffer;
    read_op->size = *length;
    read_op->flags = flags;
    read_op->rmt_addr = addr;
    read_op->rmt_addrlen = addrlen;

    pj_mutex_lock(key->mutex);
    /* Check again. Handle may have been closed after the previous check
     * in multithreaded app. If we add bad handle to the set it will
     * corrupt the ioqueue set. See #913
     */
    if (IS_CLOSING(key)) {
        pj_mutex_unlock(key->mutex);
        return PJ_ECANCELLED;
    }
    pj_list_insert_before(&key->read_list, read_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
779
780/*
781 * pj_ioqueue_send()
782 *
783 * Start asynchronous send() to the descriptor.
784 */
785PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
786 pj_ioqueue_op_key_t *op_key,
787 const void *data,
788 pj_ssize_t *length,
789 unsigned flags)
790{
791 struct write_operation *write_op;
792 pj_status_t status;
Benny Prijono40212582006-06-22 18:41:28 +0000793 unsigned retry;
Benny Prijono9033e312005-11-21 02:08:39 +0000794 pj_ssize_t sent;
795
796 PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
797 PJ_CHECK_STACK();
798
Benny Prijono5accbd02006-03-30 16:32:18 +0000799 /* Check if key is closing. */
Benny Prijono3059eb62006-10-04 20:46:27 +0000800 if (IS_CLOSING(key))
Benny Prijono5accbd02006-03-30 16:32:18 +0000801 return PJ_ECANCELLED;
802
Benny Prijono9033e312005-11-21 02:08:39 +0000803 /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write. */
804 flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
805
806 /* Fast track:
807 * Try to send data immediately, only if there's no pending write!
808 * Note:
809 * We are speculating that the list is empty here without properly
810 * acquiring ioqueue's mutex first. This is intentional, to maximize
811 * performance via parallelism.
812 *
813 * This should be safe, because:
814 * - by convention, we require caller to make sure that the
815 * key is not unregistered while other threads are invoking
816 * an operation on the same key.
817 * - pj_list_empty() is safe to be invoked by multiple threads,
818 * even when other threads are modifying the list.
819 */
820 if (pj_list_empty(&key->write_list)) {
821 /*
822 * See if data can be sent immediately.
823 */
824 sent = *length;
825 status = pj_sock_send(key->fd, data, &sent, flags);
826 if (status == PJ_SUCCESS) {
827 /* Success! */
828 *length = sent;
829 return PJ_SUCCESS;
830 } else {
831 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
832 * the error to caller.
833 */
834 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
835 return status;
836 }
837 }
838 }
839
840 /*
841 * Schedule asynchronous send.
842 */
Benny Prijono40212582006-06-22 18:41:28 +0000843 write_op = (struct write_operation*)op_key;
844
845 /* Spin if write_op has pending operation */
846 for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
847 pj_thread_sleep(0);
848
849 /* Last chance */
850 if (write_op->op) {
851 /* Unable to send packet because there is already pending write in the
852 * write_op. We could not put the operation into the write_op
853 * because write_op already contains a pending operation! And
854 * we could not send the packet directly with send() either,
855 * because that will break the order of the packet. So we can
856 * only return error here.
857 *
858 * This could happen for example in multithreads program,
859 * where polling is done by one thread, while other threads are doing
860 * the sending only. If the polling thread runs on lower priority
861 * than the sending thread, then it's possible that the pending
862 * write flag is not cleared in-time because clearing is only done
863 * during polling.
864 *
     * Application should specify multiple write operation keys on
866 * situation like this.
867 */
868 //pj_assert(!"ioqueue: there is pending operation on this key!");
869 return PJ_EBUSY;
870 }
871
Benny Prijono9033e312005-11-21 02:08:39 +0000872 write_op->op = PJ_IOQUEUE_OP_SEND;
Benny Prijonoa1e69682007-05-11 15:14:34 +0000873 write_op->buf = (char*)data;
Benny Prijono9033e312005-11-21 02:08:39 +0000874 write_op->size = *length;
875 write_op->written = 0;
876 write_op->flags = flags;
877
878 pj_mutex_lock(key->mutex);
Benny Prijono25cb51d2009-07-02 08:24:22 +0000879 /* Check again. Handle may have been closed after the previous check
880 * in multithreaded app. If we add bad handle to the set it will
881 * corrupt the ioqueue set. See #913
882 */
883 if (IS_CLOSING(key)) {
884 pj_mutex_unlock(key->mutex);
885 return PJ_ECANCELLED;
886 }
Benny Prijono9033e312005-11-21 02:08:39 +0000887 pj_list_insert_before(&key->write_list, write_op);
Benny Prijono63ab3562006-07-08 19:46:43 +0000888 ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +0000889 pj_mutex_unlock(key->mutex);
890
891 return PJ_EPENDING;
892}
893
894
895/*
896 * pj_ioqueue_sendto()
897 *
898 * Start asynchronous write() to the descriptor.
899 */
900PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
901 pj_ioqueue_op_key_t *op_key,
902 const void *data,
903 pj_ssize_t *length,
904 pj_uint32_t flags,
905 const pj_sockaddr_t *addr,
906 int addrlen)
907{
908 struct write_operation *write_op;
Benny Prijono40212582006-06-22 18:41:28 +0000909 unsigned retry;
Benny Prijono9033e312005-11-21 02:08:39 +0000910 pj_status_t status;
911 pj_ssize_t sent;
912
913 PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
914 PJ_CHECK_STACK();
915
Benny Prijono5accbd02006-03-30 16:32:18 +0000916 /* Check if key is closing. */
Benny Prijono3059eb62006-10-04 20:46:27 +0000917 if (IS_CLOSING(key))
Benny Prijono5accbd02006-03-30 16:32:18 +0000918 return PJ_ECANCELLED;
919
Benny Prijono9033e312005-11-21 02:08:39 +0000920 /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write */
921 flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
922
923 /* Fast track:
924 * Try to send data immediately, only if there's no pending write!
925 * Note:
926 * We are speculating that the list is empty here without properly
927 * acquiring ioqueue's mutex first. This is intentional, to maximize
928 * performance via parallelism.
929 *
930 * This should be safe, because:
931 * - by convention, we require caller to make sure that the
932 * key is not unregistered while other threads are invoking
933 * an operation on the same key.
934 * - pj_list_empty() is safe to be invoked by multiple threads,
935 * even when other threads are modifying the list.
936 */
937 if (pj_list_empty(&key->write_list)) {
938 /*
939 * See if data can be sent immediately.
940 */
941 sent = *length;
942 status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
943 if (status == PJ_SUCCESS) {
944 /* Success! */
945 *length = sent;
946 return PJ_SUCCESS;
947 } else {
948 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
949 * the error to caller.
950 */
951 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
952 return status;
953 }
Benny Prijono40212582006-06-22 18:41:28 +0000954 status = status;
Benny Prijono9033e312005-11-21 02:08:39 +0000955 }
956 }
957
958 /*
959 * Check that address storage can hold the address parameter.
960 */
Benny Prijonoa1e69682007-05-11 15:14:34 +0000961 PJ_ASSERT_RETURN(addrlen <= (int)sizeof(pj_sockaddr_in), PJ_EBUG);
Benny Prijono9033e312005-11-21 02:08:39 +0000962
963 /*
964 * Schedule asynchronous send.
965 */
Benny Prijono40212582006-06-22 18:41:28 +0000966 write_op = (struct write_operation*)op_key;
967
968 /* Spin if write_op has pending operation */
969 for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
970 pj_thread_sleep(0);
971
972 /* Last chance */
973 if (write_op->op) {
974 /* Unable to send packet because there is already pending write on the
975 * write_op. We could not put the operation into the write_op
976 * because write_op already contains a pending operation! And
977 * we could not send the packet directly with sendto() either,
978 * because that will break the order of the packet. So we can
979 * only return error here.
980 *
981 * This could happen for example in multithreads program,
982 * where polling is done by one thread, while other threads are doing
983 * the sending only. If the polling thread runs on lower priority
984 * than the sending thread, then it's possible that the pending
985 * write flag is not cleared in-time because clearing is only done
986 * during polling.
987 *
988 * Aplication should specify multiple write operation keys on
989 * situation like this.
990 */
991 //pj_assert(!"ioqueue: there is pending operation on this key!");
992 return PJ_EBUSY;
993 }
994
Benny Prijono9033e312005-11-21 02:08:39 +0000995 write_op->op = PJ_IOQUEUE_OP_SEND_TO;
Benny Prijonoa1e69682007-05-11 15:14:34 +0000996 write_op->buf = (char*)data;
Benny Prijono9033e312005-11-21 02:08:39 +0000997 write_op->size = *length;
998 write_op->written = 0;
999 write_op->flags = flags;
1000 pj_memcpy(&write_op->rmt_addr, addr, addrlen);
1001 write_op->rmt_addrlen = addrlen;
1002
1003 pj_mutex_lock(key->mutex);
Benny Prijono25cb51d2009-07-02 08:24:22 +00001004 /* Check again. Handle may have been closed after the previous check
1005 * in multithreaded app. If we add bad handle to the set it will
1006 * corrupt the ioqueue set. See #913
1007 */
1008 if (IS_CLOSING(key)) {
1009 pj_mutex_unlock(key->mutex);
1010 return PJ_ECANCELLED;
1011 }
Benny Prijono9033e312005-11-21 02:08:39 +00001012 pj_list_insert_before(&key->write_list, write_op);
Benny Prijono63ab3562006-07-08 19:46:43 +00001013 ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +00001014 pj_mutex_unlock(key->mutex);
1015
1016 return PJ_EPENDING;
1017}
1018
1019#if PJ_HAS_TCP
/*
 * Initiate overlapped accept() operation.
 *
 * Fast track: try a non-blocking accept() first; if a connection is
 * already available, return PJ_SUCCESS with *new_sock (and optionally
 * the local/remote addresses) filled in. Otherwise queue the operation
 * and return PJ_EPENDING; completion is reported via the
 * on_accept_complete() callback. Returns PJ_ECANCELLED if the key is
 * closing.
 */
PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
                                       pj_sock_t *new_sock,
                                       pj_sockaddr_t *local,
                                       pj_sockaddr_t *remote,
                                       int *addrlen)
{
    struct accept_operation *accept_op;
    pj_status_t status;

    /* check parameters. All must be specified! */
    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    /* Mark the op_key idle until we actually queue it below. */
    accept_op = (struct accept_operation*)op_key;
    accept_op->op = PJ_IOQUEUE_OP_NONE;

    /* Fast track:
     *  See if there's new connection available immediately.
     * Note: the list-empty check is speculative (no mutex held), same as
     * in the send path; safe by the convention that the key is not
     * unregistered while operations are being invoked on it.
     */
    if (pj_list_empty(&key->accept_list)) {
        status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
        if (status == PJ_SUCCESS) {
            /* Yes! New connection is available! */
            if (local && addrlen) {
                status = pj_sock_getsockname(*new_sock, local, addrlen);
                if (status != PJ_SUCCESS) {
                    /* getsockname() failed: don't leak the accepted
                     * socket; close it and report the error.
                     */
                    pj_sock_close(*new_sock);
                    *new_sock = PJ_INVALID_SOCKET;
                    return status;
                }
            }
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * No connection is available immediately.
     * Schedule accept() operation to be completed when there is incoming
     * connection available. The output pointers are stored so the poller
     * can fill them when the connection arrives.
     */
    accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
    accept_op->accept_fd = new_sock;
    accept_op->rmt_addr = remote;
    accept_op->addrlen= addrlen;
    accept_op->local_addr = local;

    pj_mutex_lock(key->mutex);
    /* Check again. Handle may have been closed after the previous check
     * in multithreaded app. If we add bad handle to the set it will
     * corrupt the ioqueue set. See #913
     */
    if (IS_CLOSING(key)) {
        pj_mutex_unlock(key->mutex);
        return PJ_ECANCELLED;
    }
    pj_list_insert_before(&key->accept_list, accept_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
1095
1096/*
1097 * Initiate overlapped connect() operation (well, it's non-blocking actually,
1098 * since there's no overlapped version of connect()).
1099 */
1100PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
1101 const pj_sockaddr_t *addr,
1102 int addrlen )
1103{
1104 pj_status_t status;
1105
1106 /* check parameters. All must be specified! */
1107 PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
1108
Benny Prijono5accbd02006-03-30 16:32:18 +00001109 /* Check if key is closing. */
Benny Prijono3059eb62006-10-04 20:46:27 +00001110 if (IS_CLOSING(key))
Benny Prijono5accbd02006-03-30 16:32:18 +00001111 return PJ_ECANCELLED;
1112
Benny Prijono9033e312005-11-21 02:08:39 +00001113 /* Check if socket has not been marked for connecting */
1114 if (key->connecting != 0)
1115 return PJ_EPENDING;
1116
1117 status = pj_sock_connect(key->fd, addr, addrlen);
1118 if (status == PJ_SUCCESS) {
1119 /* Connected! */
1120 return PJ_SUCCESS;
1121 } else {
1122 if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
1123 /* Pending! */
1124 pj_mutex_lock(key->mutex);
Benny Prijono25cb51d2009-07-02 08:24:22 +00001125 /* Check again. Handle may have been closed after the previous
1126 * check in multithreaded app. See #913
1127 */
1128 if (IS_CLOSING(key)) {
1129 pj_mutex_unlock(key->mutex);
1130 return PJ_ECANCELLED;
1131 }
Benny Prijono9033e312005-11-21 02:08:39 +00001132 key->connecting = PJ_TRUE;
Benny Prijono63ab3562006-07-08 19:46:43 +00001133 ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
1134 ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +00001135 pj_mutex_unlock(key->mutex);
1136 return PJ_EPENDING;
1137 } else {
1138 /* Error! */
1139 return status;
1140 }
1141 }
1142}
1143#endif /* PJ_HAS_TCP */
1144
1145
/*
 * pj_ioqueue_op_key_init()
 *
 * Zero-initialize an operation key, so it reads as idle (not pending)
 * before its first use. 'size' must be the full size of the caller's
 * op_key structure.
 */
PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
                                     pj_size_t size )
{
    pj_bzero(op_key, size);
}
1151
1152
1153/*
1154 * pj_ioqueue_is_pending()
1155 */
1156PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
1157 pj_ioqueue_op_key_t *op_key )
1158{
1159 struct generic_operation *op_rec;
1160
1161 PJ_UNUSED_ARG(key);
1162
1163 op_rec = (struct generic_operation*)op_key;
1164 return op_rec->op != 0;
1165}
1166
1167
1168/*
1169 * pj_ioqueue_post_completion()
1170 */
1171PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
1172 pj_ioqueue_op_key_t *op_key,
1173 pj_ssize_t bytes_status )
1174{
1175 struct generic_operation *op_rec;
1176
1177 /*
1178 * Find the operation key in all pending operation list to
1179 * really make sure that it's still there; then call the callback.
1180 */
Benny Prijono5accbd02006-03-30 16:32:18 +00001181 pj_mutex_lock(key->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +00001182
1183 /* Find the operation in the pending read list. */
1184 op_rec = (struct generic_operation*)key->read_list.next;
1185 while (op_rec != (void*)&key->read_list) {
1186 if (op_rec == (void*)op_key) {
1187 pj_list_erase(op_rec);
Benny Prijonoa1e69682007-05-11 15:14:34 +00001188 op_rec->op = PJ_IOQUEUE_OP_NONE;
Benny Prijono5accbd02006-03-30 16:32:18 +00001189 pj_mutex_unlock(key->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +00001190
1191 (*key->cb.on_read_complete)(key, op_key, bytes_status);
1192 return PJ_SUCCESS;
1193 }
1194 op_rec = op_rec->next;
1195 }
1196
1197 /* Find the operation in the pending write list. */
1198 op_rec = (struct generic_operation*)key->write_list.next;
1199 while (op_rec != (void*)&key->write_list) {
1200 if (op_rec == (void*)op_key) {
1201 pj_list_erase(op_rec);
Benny Prijonoa1e69682007-05-11 15:14:34 +00001202 op_rec->op = PJ_IOQUEUE_OP_NONE;
Benny Prijono5accbd02006-03-30 16:32:18 +00001203 pj_mutex_unlock(key->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +00001204
1205 (*key->cb.on_write_complete)(key, op_key, bytes_status);
1206 return PJ_SUCCESS;
1207 }
1208 op_rec = op_rec->next;
1209 }
1210
1211 /* Find the operation in the pending accept list. */
1212 op_rec = (struct generic_operation*)key->accept_list.next;
1213 while (op_rec != (void*)&key->accept_list) {
1214 if (op_rec == (void*)op_key) {
1215 pj_list_erase(op_rec);
Benny Prijonoa1e69682007-05-11 15:14:34 +00001216 op_rec->op = PJ_IOQUEUE_OP_NONE;
Benny Prijono5accbd02006-03-30 16:32:18 +00001217 pj_mutex_unlock(key->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +00001218
1219 (*key->cb.on_accept_complete)(key, op_key,
1220 PJ_INVALID_SOCKET,
1221 bytes_status);
1222 return PJ_SUCCESS;
1223 }
1224 op_rec = op_rec->next;
1225 }
1226
Benny Prijono5accbd02006-03-30 16:32:18 +00001227 pj_mutex_unlock(key->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +00001228
1229 return PJ_EINVALIDOP;
1230}
1231
/*
 * Set the ioqueue's default concurrency setting (initialized from
 * PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY). Presumably applied to keys
 * registered after this call — confirm in the key registration code.
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency( pj_ioqueue_t *ioqueue,
                                                        pj_bool_t allow)
{
    PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
    ioqueue->default_concurrency = allow;
    return PJ_SUCCESS;
}
1239
1240
/*
 * Enable/disable concurrent callback invocation for the specified key
 * by setting its allow_concurrent flag.
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
                                               pj_bool_t allow)
{
    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
     * disabled.
     */
    PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);

    key->allow_concurrent = allow;
    return PJ_SUCCESS;
}
1254
/* Acquire the key's mutex on behalf of the application; pair with
 * pj_ioqueue_unlock_key().
 */
PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
{
    return pj_mutex_lock(key->mutex);
}
1259
/* Release the key's mutex previously acquired with pj_ioqueue_lock_key(). */
PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
{
    return pj_mutex_unlock(key->mutex);
}
1264