/* $Id$ */
/*
 * Copyright (C) 2008-2009 Teluu Inc. (http://www.teluu.com)
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */

/*
 * ioqueue_common_abs.c
 *
 * This file contains the common functionality needed to emulate the
 * proactor pattern on top of various event dispatching mechanisms
 * (e.g. select, epoll).
 *
 * This file will be included by the appropriate ioqueue implementation.
 * This file is NOT supposed to be compiled as a stand-alone source.
 */

#define PENDING_RETRY   2

static void ioqueue_init( pj_ioqueue_t *ioqueue )
{
    ioqueue->lock = NULL;
    ioqueue->auto_delete_lock = 0;
    ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY;
}

static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
    if (ioqueue->auto_delete_lock && ioqueue->lock) {
        pj_lock_release(ioqueue->lock);
        return pj_lock_destroy(ioqueue->lock);
    }

    return PJ_SUCCESS;
}

/*
 * pj_ioqueue_set_lock()
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
                                         pj_lock_t *lock,
                                         pj_bool_t auto_delete )
{
    PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);

    if (ioqueue->auto_delete_lock && ioqueue->lock) {
        pj_lock_destroy(ioqueue->lock);
    }

    ioqueue->lock = lock;
    ioqueue->auto_delete_lock = auto_delete;

    return PJ_SUCCESS;
}
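
/*
 * Illustrative usage sketch (not part of this file): an application can
 * install its own lock right after the ioqueue is created, before any key
 * is registered. The names "pool" and "ioq" below are hypothetical.
 *
 *   pj_lock_t *lock;
 *   pj_status_t rc;
 *
 *   rc = pj_lock_create_recursive_mutex(pool, "ioqlock", &lock);
 *   if (rc == PJ_SUCCESS)
 *       rc = pj_ioqueue_set_lock(ioq, lock, PJ_TRUE);
 */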

static pj_status_t ioqueue_init_key( pj_pool_t *pool,
                                     pj_ioqueue_t *ioqueue,
                                     pj_ioqueue_key_t *key,
                                     pj_sock_t sock,
                                     void *user_data,
                                     const pj_ioqueue_callback *cb)
{
    pj_status_t rc;
    int optlen;

    PJ_UNUSED_ARG(pool);

    key->ioqueue = ioqueue;
    key->fd = sock;
    key->user_data = user_data;
    pj_list_init(&key->read_list);
    pj_list_init(&key->write_list);
#if PJ_HAS_TCP
    pj_list_init(&key->accept_list);
    key->connecting = 0;
#endif

    /* Save callback. */
    pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Set initial reference count to 1 */
    pj_assert(key->ref_count == 0);
    ++key->ref_count;

    key->closing = 0;
#endif

    rc = pj_ioqueue_set_concurrency(key, ioqueue->default_concurrency);
    if (rc != PJ_SUCCESS)
        return rc;

    /* Get socket type. When socket type is datagram, some optimization
     * will be performed during send to allow parallel send operations.
     */
    optlen = sizeof(key->fd_type);
    rc = pj_sock_getsockopt(sock, pj_SOL_SOCKET(), pj_SO_TYPE(),
                            &key->fd_type, &optlen);
    if (rc != PJ_SUCCESS)
        key->fd_type = pj_SOCK_STREAM();

    /* Create mutex for the key. */
#if !PJ_IOQUEUE_HAS_SAFE_UNREG
    rc = pj_mutex_create_simple(pool, NULL, &key->mutex);
#endif

    return rc;
}

/*
 * pj_ioqueue_get_user_data()
 *
 * Obtain value associated with a key.
 */
PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
{
    PJ_ASSERT_RETURN(key != NULL, NULL);
    return key->user_data;
}

/*
 * pj_ioqueue_set_user_data()
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
                                              void *user_data,
                                              void **old_data)
{
    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    if (old_data)
        *old_data = key->user_data;
    key->user_data = user_data;

    return PJ_SUCCESS;
}

PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
{
    return !pj_list_empty(&key->write_list);
}

PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
{
    return !pj_list_empty(&key->read_list);
}

PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
{
#if PJ_HAS_TCP
    return !pj_list_empty(&key->accept_list);
#else
    PJ_UNUSED_ARG(key);
    return 0;
#endif
}

PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
{
    return key->connecting;
}


#if PJ_IOQUEUE_HAS_SAFE_UNREG
#   define IS_CLOSING(key)  (key->closing)
#else
#   define IS_CLOSING(key)  (0)
#endif


/*
 * ioqueue_dispatch_event()
 *
 * Report the occurrence of an event in the key to be processed by the
 * framework.
 */
void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
{
    /* Lock the key. */
    pj_mutex_lock(h->mutex);

    if (IS_CLOSING(h)) {
        pj_mutex_unlock(h->mutex);
        return;
    }

#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
    if (h->connecting) {
        /* Completion of connect() operation */
        pj_ssize_t bytes_transfered;
        pj_bool_t has_lock;

        /* Clear operation. */
        h->connecting = 0;

        ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
        ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);

#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
        /* from connect(2):
         * On Linux, use getsockopt to read the SO_ERROR option at
         * level SOL_SOCKET to determine whether connect() completed
         * successfully (if SO_ERROR is zero).
         */
        {
            int value;
            int vallen = sizeof(value);
            int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
                                           &value, &vallen);
            if (gs_rc != 0) {
                /* Argh!! What to do now???
                 * Just indicate that the socket is connected. The
                 * application will get an error as soon as it tries to use
                 * the socket to send/receive.
                 */
                bytes_transfered = 0;
            } else {
                bytes_transfered = value;
            }
        }
#elif defined(PJ_WIN32) && PJ_WIN32!=0
        bytes_transfered = 0; /* success */
#else
        /* Excellent information in D.J. Bernstein's page:
         * http://cr.yp.to/docs/connect.html
         *
         * It seems that the most portable way of detecting connect()
         * failure is to call getpeername(). If the socket is connected,
         * getpeername() will return 0. If the socket is not connected,
         * it will return ENOTCONN, and read(fd, &ch, 1) will produce
         * the right errno through error slippage. This is a combination
         * of suggestions from Douglas C. Schmidt and Ken Keys.
         */
        {
            int gp_rc;
            struct sockaddr_in addr;
            socklen_t addrlen = sizeof(addr);

            gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
            bytes_transfered = (gp_rc < 0) ? gp_rc : -gp_rc;
        }
#endif

        /* Unlock; from this point we don't need to hold the key's mutex
         * (unless concurrency is disabled, in which case we should hold
         * the mutex while calling the callback) */
        if (h->allow_concurrent) {
            /* concurrency may be changed while we're in the callback, so
             * save it to a flag.
             */
            has_lock = PJ_FALSE;
            pj_mutex_unlock(h->mutex);
        } else {
            has_lock = PJ_TRUE;
        }

        /* Call callback. */
        if (h->cb.on_connect_complete && !IS_CLOSING(h))
            (*h->cb.on_connect_complete)(h, bytes_transfered);

        /* Unlock if we still hold the lock */
        if (has_lock) {
            pj_mutex_unlock(h->mutex);
        }

        /* Done. */

    } else
#endif /* PJ_HAS_TCP */
    if (key_has_pending_write(h)) {
        /* Socket is writable. */
        struct write_operation *write_op;
        pj_ssize_t sent;
        pj_status_t send_rc;

        /* Get the first in the queue. */
        write_op = h->write_list.next;

        /* For datagrams, we can remove the write_op from the list
         * so that send() can work in parallel.
         */
        if (h->fd_type == pj_SOCK_DGRAM()) {
            pj_list_erase(write_op);

            if (pj_list_empty(&h->write_list))
                ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);

        }

        /* Send the data.
         * Unfortunately we must do this while holding key's mutex, thus
         * preventing parallel write on a single key.. :-((
         */
        sent = write_op->size - write_op->written;
        if (write_op->op == PJ_IOQUEUE_OP_SEND) {
            send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
                                   &sent, write_op->flags);
            /* Can't do this. We only clear "op" after we're finished sending
             * the whole buffer.
             */
            //write_op->op = 0;
        } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
            send_rc = pj_sock_sendto(h->fd,
                                     write_op->buf+write_op->written,
                                     &sent, write_op->flags,
                                     &write_op->rmt_addr,
                                     write_op->rmt_addrlen);
            /* Can't do this. We only clear "op" after we're finished sending
             * the whole buffer.
             */
            //write_op->op = 0;
        } else {
            pj_assert(!"Invalid operation type!");
            write_op->op = PJ_IOQUEUE_OP_NONE;
            send_rc = PJ_EBUG;
        }

        if (send_rc == PJ_SUCCESS) {
            write_op->written += sent;
        } else {
            pj_assert(send_rc > 0);
            write_op->written = -send_rc;
        }

        /* Are we finished with this buffer? */
        if (send_rc!=PJ_SUCCESS ||
            write_op->written == (pj_ssize_t)write_op->size ||
            h->fd_type == pj_SOCK_DGRAM())
        {
            pj_bool_t has_lock;

            write_op->op = PJ_IOQUEUE_OP_NONE;

            if (h->fd_type != pj_SOCK_DGRAM()) {
                /* Write completion of the whole stream. */
                pj_list_erase(write_op);

                /* Clear operation if there's no more data to send. */
                if (pj_list_empty(&h->write_list))
                    ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);

            }

            /* Unlock; from this point we don't need to hold the key's mutex
             * (unless concurrency is disabled, in which case we should hold
             * the mutex while calling the callback) */
            if (h->allow_concurrent) {
                /* concurrency may be changed while we're in the callback, so
                 * save it to a flag.
                 */
                has_lock = PJ_FALSE;
                pj_mutex_unlock(h->mutex);
            } else {
                has_lock = PJ_TRUE;
            }

            /* Call callback. */
            if (h->cb.on_write_complete && !IS_CLOSING(h)) {
                (*h->cb.on_write_complete)(h,
                                           (pj_ioqueue_op_key_t*)write_op,
                                           write_op->written);
            }

            if (has_lock) {
                pj_mutex_unlock(h->mutex);
            }

        } else {
            pj_mutex_unlock(h->mutex);
        }

        /* Done. */
    } else {
        /*
         * This is normal; execution may fall here when multiple threads
         * are signalled for the same event, but only one thread is
         * eventually able to process the event.
         */
        pj_mutex_unlock(h->mutex);
    }
}
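
/*
 * Illustrative sketch (not part of this file): a minimal on_write_complete
 * callback matching the dispatcher above. A negative bytes_sent carries the
 * negated pj_status_t; send_next_packet() is a hypothetical app helper.
 *
 *   static void on_write_complete(pj_ioqueue_key_t *key,
 *                                 pj_ioqueue_op_key_t *op_key,
 *                                 pj_ssize_t bytes_sent)
 *   {
 *       if (bytes_sent < 0) {
 *           pj_status_t status = (pj_status_t)-bytes_sent;
 *           // handle/log the error; the key itself is still usable
 *           return;
 *       }
 *       send_next_packet(key, op_key);
 *   }
 */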

void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
{
    pj_status_t rc;

    /* Lock the key. */
    pj_mutex_lock(h->mutex);

    if (IS_CLOSING(h)) {
        pj_mutex_unlock(h->mutex);
        return;
    }

#   if PJ_HAS_TCP
    if (!pj_list_empty(&h->accept_list)) {

        struct accept_operation *accept_op;
        pj_bool_t has_lock;

        /* Get one accept operation from the list. */
        accept_op = h->accept_list.next;
        pj_list_erase(accept_op);
        accept_op->op = PJ_IOQUEUE_OP_NONE;

        /* Clear bit in fdset if there is no more pending accept */
        if (pj_list_empty(&h->accept_list))
            ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);

        rc=pj_sock_accept(h->fd, accept_op->accept_fd,
                          accept_op->rmt_addr, accept_op->addrlen);
        if (rc==PJ_SUCCESS && accept_op->local_addr) {
            rc = pj_sock_getsockname(*accept_op->accept_fd,
                                     accept_op->local_addr,
                                     accept_op->addrlen);
        }

        /* Unlock; from this point we don't need to hold the key's mutex
         * (unless concurrency is disabled, in which case we should hold
         * the mutex while calling the callback) */
        if (h->allow_concurrent) {
            /* concurrency may be changed while we're in the callback, so
             * save it to a flag.
             */
            has_lock = PJ_FALSE;
            pj_mutex_unlock(h->mutex);
        } else {
            has_lock = PJ_TRUE;
        }

        /* Call callback. */
        if (h->cb.on_accept_complete && !IS_CLOSING(h)) {
            (*h->cb.on_accept_complete)(h,
                                        (pj_ioqueue_op_key_t*)accept_op,
                                        *accept_op->accept_fd, rc);
        }

        if (has_lock) {
            pj_mutex_unlock(h->mutex);
        }
    }
    else
#   endif
    if (key_has_pending_read(h)) {
        struct read_operation *read_op;
        pj_ssize_t bytes_read;
        pj_bool_t has_lock;

        /* Get one pending read operation from the list. */
        read_op = h->read_list.next;
        pj_list_erase(read_op);

        /* Clear fdset if there is no pending read. */
        if (pj_list_empty(&h->read_list))
            ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);

        bytes_read = read_op->size;

        if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {
            read_op->op = PJ_IOQUEUE_OP_NONE;
            rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read,
                                  read_op->flags,
                                  read_op->rmt_addr,
                                  read_op->rmt_addrlen);
        } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) {
            read_op->op = PJ_IOQUEUE_OP_NONE;
            rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read,
                              read_op->flags);
        } else {
            pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
            read_op->op = PJ_IOQUEUE_OP_NONE;
            /*
             * User has specified pj_ioqueue_read().
             * On Win32, we should do ReadFile(). But because we got
             * here because of select() anyway, the user must have put a
             * socket descriptor in h->fd, in which case we can
             * just call pj_sock_recv() instead of ReadFile().
             * On Unix, the user may put a file in h->fd, so we'll have
             * to call read() here.
             * This may not compile on systems that don't have
             * read(). That's why we only specify PJ_LINUX here so
             * that the error is easier to catch.
             */
#           if defined(PJ_WIN32) && PJ_WIN32 != 0 || \
               defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE != 0
                rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read,
                                  read_op->flags);
                //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
                //              &bytes_read, NULL);
#           elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
                bytes_read = read(h->fd, read_op->buf, bytes_read);
                rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
#           elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
                bytes_read = sys_read(h->fd, read_op->buf, bytes_read);
                rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
#           else
#               error "Implement read() for this platform!"
#           endif
        }

        if (rc != PJ_SUCCESS) {
#           if defined(PJ_WIN32) && PJ_WIN32 != 0
            /* On Win32, for UDP, WSAECONNRESET on the receive side
             * indicates that a previous send has triggered an ICMP Port
             * Unreachable message.
             * But at this point we wouldn't know which previous send
             * triggered the error, since the UDP socket can be shared!
             * So we'll just ignore it!
             */

            if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
                //PJ_LOG(4,(THIS_FILE,
                //          "Ignored ICMP port unreach. on key=%p", h));
            }
#           endif

            /* In any case we would report this to caller. */
            bytes_read = -rc;
        }

        /* Unlock; from this point we don't need to hold the key's mutex
         * (unless concurrency is disabled, in which case we should hold
         * the mutex while calling the callback) */
        if (h->allow_concurrent) {
            /* concurrency may be changed while we're in the callback, so
             * save it to a flag.
             */
            has_lock = PJ_FALSE;
            pj_mutex_unlock(h->mutex);
        } else {
            has_lock = PJ_TRUE;
        }

        /* Call callback. */
        if (h->cb.on_read_complete && !IS_CLOSING(h)) {
            (*h->cb.on_read_complete)(h,
                                      (pj_ioqueue_op_key_t*)read_op,
                                      bytes_read);
        }

        if (has_lock) {
            pj_mutex_unlock(h->mutex);
        }

    } else {
        /*
         * This is normal; execution may fall here when multiple threads
         * are signalled for the same event, but only one thread is
         * eventually able to process the event.
         */
        pj_mutex_unlock(h->mutex);
    }
}
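
/*
 * Illustrative sketch (not part of this file): a minimal on_read_complete
 * callback. As in the dispatcher above, a negative bytes_read carries the
 * negated pj_status_t. Re-posting the recv from inside the callback keeps
 * one read pending at all times; app_buf/app_consume are hypothetical.
 *
 *   static void on_read_complete(pj_ioqueue_key_t *key,
 *                                pj_ioqueue_op_key_t *op_key,
 *                                pj_ssize_t bytes_read)
 *   {
 *       if (bytes_read > 0)
 *           app_consume(key, app_buf, bytes_read);
 *
 *       bytes_read = sizeof(app_buf);
 *       pj_ioqueue_recv(key, op_key, app_buf, &bytes_read,
 *                       PJ_IOQUEUE_ALWAYS_ASYNC);
 *   }
 */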


void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
                                       pj_ioqueue_key_t *h )
{
    pj_bool_t has_lock;

    pj_mutex_lock(h->mutex);

    if (!h->connecting) {
        /* It is possible that more than one thread was woken up, thus
         * the remaining thread will see h->connecting as zero because
         * it has been processed by the other thread.
         */
        pj_mutex_unlock(h->mutex);
        return;
    }

    if (IS_CLOSING(h)) {
        pj_mutex_unlock(h->mutex);
        return;
    }

    /* Clear operation. */
    h->connecting = 0;

    ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
    ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);

    /* Unlock; from this point we don't need to hold the key's mutex
     * (unless concurrency is disabled, in which case we should hold
     * the mutex while calling the callback) */
    if (h->allow_concurrent) {
        /* concurrency may be changed while we're in the callback, so
         * save it to a flag.
         */
        has_lock = PJ_FALSE;
        pj_mutex_unlock(h->mutex);
    } else {
        has_lock = PJ_TRUE;
    }

    /* Call callback. */
    if (h->cb.on_connect_complete && !IS_CLOSING(h)) {
        pj_status_t status = -1;
#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
        int value;
        int vallen = sizeof(value);
        int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
                                       &value, &vallen);
        if (gs_rc == 0) {
            status = PJ_RETURN_OS_ERROR(value);
        }
#endif

        (*h->cb.on_connect_complete)(h, status);
    }

    if (has_lock) {
        pj_mutex_unlock(h->mutex);
    }
}

/*
 * pj_ioqueue_recv()
 *
 * Start asynchronous recv() from the socket.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
                                     pj_ioqueue_op_key_t *op_key,
                                     void *buffer,
                                     pj_ssize_t *length,
                                     unsigned flags )
{
    struct read_operation *read_op;

    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing (need to do this first before accessing
     * other variables, since they might have been destroyed. See ticket
     * #469).
     */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    read_op = (struct read_operation*)op_key;
    read_op->op = PJ_IOQUEUE_OP_NONE;

    /* Try to see if there's data immediately available.
     */
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
        pj_status_t status;
        pj_ssize_t size;

        size = *length;
        status = pj_sock_recv(key->fd, buffer, &size, flags);
        if (status == PJ_SUCCESS) {
            /* Yes! Data is available! */
            *length = size;
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
                return status;
        }
    }

    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /*
     * No data is immediately available.
     * Must schedule asynchronous operation to the ioqueue.
     */
    read_op->op = PJ_IOQUEUE_OP_RECV;
    read_op->buf = buffer;
    read_op->size = *length;
    read_op->flags = flags;

    pj_mutex_lock(key->mutex);
    /* Check again. Handle may have been closed after the previous check
     * in a multithreaded app. If we add a bad handle to the set it will
     * corrupt the ioqueue set. See #913
     */
    if (IS_CLOSING(key)) {
        pj_mutex_unlock(key->mutex);
        return PJ_ECANCELLED;
    }
    pj_list_insert_before(&key->read_list, read_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
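
/*
 * Illustrative usage sketch (not part of this file): posting a recv and
 * handling all three outcomes. Without PJ_IOQUEUE_ALWAYS_ASYNC the data
 * may be returned synchronously, in which case no callback will fire.
 * "key", "op_key" and "buf" are hypothetical application variables.
 *
 *   pj_ssize_t len = sizeof(buf);
 *   pj_status_t rc = pj_ioqueue_recv(key, &op_key, buf, &len, 0);
 *
 *   if (rc == PJ_SUCCESS)
 *       ;                  // len bytes already in buf, no callback
 *   else if (rc == PJ_EPENDING)
 *       ;                  // on_read_complete() will be called later
 *   else
 *       ;                  // immediate error
 */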

/*
 * pj_ioqueue_recvfrom()
 *
 * Start asynchronous recvfrom() from the socket.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key,
                                         void *buffer,
                                         pj_ssize_t *length,
                                         unsigned flags,
                                         pj_sockaddr_t *addr,
                                         int *addrlen)
{
    struct read_operation *read_op;

    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    read_op = (struct read_operation*)op_key;
    read_op->op = PJ_IOQUEUE_OP_NONE;

    /* Try to see if there's data immediately available.
     */
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
        pj_status_t status;
        pj_ssize_t size;

        size = *length;
        status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
                                  addr, addrlen);
        if (status == PJ_SUCCESS) {
            /* Yes! Data is available! */
            *length = size;
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
                return status;
        }
    }

    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /*
     * No data is immediately available.
     * Must schedule asynchronous operation to the ioqueue.
     */
    read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
    read_op->buf = buffer;
    read_op->size = *length;
    read_op->flags = flags;
    read_op->rmt_addr = addr;
    read_op->rmt_addrlen = addrlen;

    pj_mutex_lock(key->mutex);
    /* Check again. Handle may have been closed after the previous check
     * in a multithreaded app. If we add a bad handle to the set it will
     * corrupt the ioqueue set. See #913
     */
    if (IS_CLOSING(key)) {
        pj_mutex_unlock(key->mutex);
        return PJ_ECANCELLED;
    }
    pj_list_insert_before(&key->read_list, read_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
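
/*
 * Note (illustrative, not part of this file): the recvfrom code above
 * stores only the addr and addrlen *pointers* in the read_op. When
 * PJ_EPENDING is returned, those variables must therefore stay valid
 * until on_read_complete() fires, e.g. by keeping them next to the
 * op_key in one structure:
 *
 *   struct recv_ctx {             // hypothetical app structure
 *       pj_ioqueue_op_key_t op_key;
 *       char                buf[1500];
 *       pj_sockaddr_in      src_addr;
 *       int                 src_addrlen;
 *   };
 */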

/*
 * pj_ioqueue_send()
 *
 * Start asynchronous send() to the descriptor.
 */
PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
                                     pj_ioqueue_op_key_t *op_key,
                                     const void *data,
                                     pj_ssize_t *length,
                                     unsigned flags)
{
    struct write_operation *write_op;
    pj_status_t status;
    unsigned retry;
    pj_ssize_t sent;

    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write. */
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /* Fast track:
     *   Try to send data immediately, only if there's no pending write!
     * Note:
     *   We are speculating that the list is empty here without properly
     *   acquiring ioqueue's mutex first. This is intentional, to maximize
     *   performance via parallelism.
     *
     * This should be safe, because:
     *  - by convention, we require caller to make sure that the
     *    key is not unregistered while other threads are invoking
     *    an operation on the same key.
     *  - pj_list_empty() is safe to be invoked by multiple threads,
     *    even when other threads are modifying the list.
     */
    if (pj_list_empty(&key->write_list)) {
        /*
         * See if data can be sent immediately.
         */
        sent = *length;
        status = pj_sock_send(key->fd, data, &sent, flags);
        if (status == PJ_SUCCESS) {
            /* Success! */
            *length = sent;
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * Schedule asynchronous send.
     */
    write_op = (struct write_operation*)op_key;

    /* Spin if write_op has pending operation */
    for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
        pj_thread_sleep(0);

    /* Last chance */
    if (write_op->op) {
        /* Unable to send the packet because there is already a pending
         * write in the write_op. We could not put the operation into the
         * write_op because write_op already contains a pending operation!
         * And we could not send the packet directly with send() either,
         * because that would break the ordering of the packets. So we can
         * only return an error here.
         *
         * This could happen for example in a multithreaded program,
         * where polling is done by one thread, while other threads are doing
         * the sending only. If the polling thread runs on lower priority
         * than the sending threads, then it's possible that the pending
         * write flag is not cleared in time because clearing is only done
         * during polling.
         *
         * Application should specify multiple write operation keys in
         * situations like this.
         */
        //pj_assert(!"ioqueue: there is pending operation on this key!");
        return PJ_EBUSY;
    }

    write_op->op = PJ_IOQUEUE_OP_SEND;
    write_op->buf = (char*)data;
    write_op->size = *length;
    write_op->written = 0;
    write_op->flags = flags;

    pj_mutex_lock(key->mutex);
    /* Check again. Handle may have been closed after the previous check
     * in a multithreaded app. If we add a bad handle to the set it will
     * corrupt the ioqueue set. See #913
     */
    if (IS_CLOSING(key)) {
        pj_mutex_unlock(key->mutex);
        return PJ_ECANCELLED;
    }
    pj_list_insert_before(&key->write_list, write_op);
    ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
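
/*
 * Illustrative sketch (not part of this file): because an op_key can hold
 * only one pending operation, a sender that may outrun the polling thread
 * can rotate through several op_keys and treat PJ_EBUSY as congestion.
 * The array and names below are hypothetical.
 *
 *   static pj_ioqueue_op_key_t send_keys[8];
 *   static unsigned next_key;
 *
 *   pj_ssize_t len = pkt_len;
 *   pj_status_t rc = pj_ioqueue_send(key, &send_keys[next_key],
 *                                    pkt, &len, 0);
 *   if (rc == PJ_SUCCESS || rc == PJ_EPENDING)
 *       next_key = (next_key + 1) % PJ_ARRAY_SIZE(send_keys);
 *   else if (rc == PJ_EBUSY)
 *       ;   // all op_keys busy; drop or queue the packet in the app
 */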


/*
 * pj_ioqueue_sendto()
 *
 * Start asynchronous write() to the descriptor.
 */
PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
                                       const void *data,
                                       pj_ssize_t *length,
                                       pj_uint32_t flags,
                                       const pj_sockaddr_t *addr,
                                       int addrlen)
{
    struct write_operation *write_op;
    unsigned retry;
    pj_status_t status;
    pj_ssize_t sent;

    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write */
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /* Fast track:
     *   Try to send data immediately, only if there's no pending write!
     * Note:
     *   We are speculating that the list is empty here without properly
     *   acquiring ioqueue's mutex first. This is intentional, to maximize
     *   performance via parallelism.
     *
     * This should be safe, because:
     *  - by convention, we require caller to make sure that the
     *    key is not unregistered while other threads are invoking
     *    an operation on the same key.
     *  - pj_list_empty() is safe to be invoked by multiple threads,
     *    even when other threads are modifying the list.
     */
    if (pj_list_empty(&key->write_list)) {
        /*
         * See if data can be sent immediately.
         */
        sent = *length;
        status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
        if (status == PJ_SUCCESS) {
            /* Success! */
            *length = sent;
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * Check that address storage can hold the address parameter.
     */
    PJ_ASSERT_RETURN(addrlen <= (int)sizeof(pj_sockaddr_in), PJ_EBUG);

    /*
     * Schedule asynchronous send.
     */
    write_op = (struct write_operation*)op_key;

    /* Spin if write_op has pending operation */
    for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
        pj_thread_sleep(0);

    /* Last chance */
    if (write_op->op) {
        /* Unable to send the packet because there is already a pending
         * write on the write_op. We could not put the operation into the
         * write_op because write_op already contains a pending operation!
         * And we could not send the packet directly with sendto() either,
         * because that would break the ordering of the packets. So we can
         * only return an error here.
         *
         * This could happen for example in a multithreaded program,
         * where polling is done by one thread, while other threads are doing
         * the sending only. If the polling thread runs on lower priority
         * than the sending threads, then it's possible that the pending
         * write flag is not cleared in time because clearing is only done
         * during polling.
         *
         * Application should specify multiple write operation keys in
         * situations like this.
         */
        //pj_assert(!"ioqueue: there is pending operation on this key!");
        return PJ_EBUSY;
    }

    write_op->op = PJ_IOQUEUE_OP_SEND_TO;
    write_op->buf = (char*)data;
    write_op->size = *length;
    write_op->written = 0;
    write_op->flags = flags;
    pj_memcpy(&write_op->rmt_addr, addr, addrlen);
    write_op->rmt_addrlen = addrlen;

    pj_mutex_lock(key->mutex);
    /* Check again. Handle may have been closed after the previous check
     * in a multithreaded app. If we add a bad handle to the set it will
     * corrupt the ioqueue set. See #913
     */
    if (IS_CLOSING(key)) {
        pj_mutex_unlock(key->mutex);
        return PJ_ECANCELLED;
    }
    pj_list_insert_before(&key->write_list, write_op);
    ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
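
/*
 * Note (illustrative, not part of this file): pj_memcpy() above copies the
 * destination address into the write_op, so unlike pj_ioqueue_recvfrom()
 * the caller's addr variable may go out of scope once pj_ioqueue_sendto()
 * returns, even when the result is PJ_EPENDING. The data buffer itself,
 * however, is referenced by pointer and must stay valid until
 * on_write_complete() is called.
 */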

#if PJ_HAS_TCP
/*
 * Initiate overlapped accept() operation.
 */
PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
                                       pj_sock_t *new_sock,
                                       pj_sockaddr_t *local,
                                       pj_sockaddr_t *remote,
                                       int *addrlen)
{
    struct accept_operation *accept_op;
    pj_status_t status;

    /* check parameters. All must be specified! */
    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    accept_op = (struct accept_operation*)op_key;
    accept_op->op = PJ_IOQUEUE_OP_NONE;

    /* Fast track:
     *  See if there's a new connection available immediately.
     */
    if (pj_list_empty(&key->accept_list)) {
        status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
        if (status == PJ_SUCCESS) {
            /* Yes! New connection is available! */
            if (local && addrlen) {
                status = pj_sock_getsockname(*new_sock, local, addrlen);
                if (status != PJ_SUCCESS) {
                    pj_sock_close(*new_sock);
                    *new_sock = PJ_INVALID_SOCKET;
                    return status;
                }
            }
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * No connection is available immediately.
     * Schedule accept() operation to be completed when there is an
     * incoming connection available.
     */
    accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
    accept_op->accept_fd = new_sock;
    accept_op->rmt_addr = remote;
    accept_op->addrlen = addrlen;
    accept_op->local_addr = local;

    pj_mutex_lock(key->mutex);
    /* Check again. Handle may have been closed after the previous check
     * in a multithreaded app. If we add a bad handle to the set it will
     * corrupt the ioqueue set. See #913
     */
    if (IS_CLOSING(key)) {
        pj_mutex_unlock(key->mutex);
        return PJ_ECANCELLED;
    }
    pj_list_insert_before(&key->accept_list, accept_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
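
/*
 * Illustrative usage sketch (not part of this file): accepting connections
 * asynchronously. The variables shown are hypothetical and must stay valid
 * until on_accept_complete() fires, since the code above stores pointers
 * to them in the accept_op.
 *
 *   static pj_sock_t       new_sock;
 *   static pj_sockaddr_in  local, remote;
 *   static int             addrlen = sizeof(remote);
 *
 *   pj_status_t rc = pj_ioqueue_accept(key, &op_key, &new_sock,
 *                                      &local, &remote, &addrlen);
 *   if (rc == PJ_SUCCESS)
 *       ;   // connection accepted immediately, register new_sock
 *   else if (rc == PJ_EPENDING)
 *       ;   // on_accept_complete() will deliver the new socket
 */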

/*
 * Initiate overlapped connect() operation (well, it's non-blocking actually,
 * since there's no overlapped version of connect()).
 */
PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
                                        const pj_sockaddr_t *addr,
                                        int addrlen )
{
    pj_status_t status;

    /* check parameters. All must be specified! */
    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    /* Check if socket has not been marked for connecting */
    if (key->connecting != 0)
        return PJ_EPENDING;

    status = pj_sock_connect(key->fd, addr, addrlen);
    if (status == PJ_SUCCESS) {
        /* Connected! */
        return PJ_SUCCESS;
    } else {
        if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
            /* Pending! */
            pj_mutex_lock(key->mutex);
            /* Check again. Handle may have been closed after the previous
             * check in a multithreaded app. See #913
             */
            if (IS_CLOSING(key)) {
                pj_mutex_unlock(key->mutex);
                return PJ_ECANCELLED;
            }
            key->connecting = PJ_TRUE;
            ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
            ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT);
            pj_mutex_unlock(key->mutex);
            return PJ_EPENDING;
        } else {
            /* Error! */
            return status;
        }
    }
}
#endif /* PJ_HAS_TCP */
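
/*
 * Illustrative usage sketch (not part of this file): asynchronous connect.
 * On PJ_EPENDING the result is delivered to on_connect_complete(), where a
 * zero status means success and a non-zero value carries the platform
 * error, as computed by the dispatchers above.
 *
 *   pj_status_t rc = pj_ioqueue_connect(key, &addr, sizeof(addr));
 *   if (rc == PJ_SUCCESS)
 *       ;   // connected immediately
 *   else if (rc == PJ_EPENDING)
 *       ;   // wait for on_connect_complete(key, status)
 *   else
 *       ;   // immediate failure
 */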


PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
                                     pj_size_t size )
{
    pj_bzero(op_key, size);
}


/*
 * pj_ioqueue_is_pending()
 */
PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key )
{
    struct generic_operation *op_rec;

    PJ_UNUSED_ARG(key);

    op_rec = (struct generic_operation*)op_key;
    return op_rec->op != 0;
}


/*
 * pj_ioqueue_post_completion()
 */
PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
                                                pj_ioqueue_op_key_t *op_key,
                                                pj_ssize_t bytes_status )
{
    struct generic_operation *op_rec;

    /*
     * Find the operation key in all pending operation lists to
     * really make sure that it's still there; then call the callback.
     */
    pj_mutex_lock(key->mutex);

    /* Find the operation in the pending read list. */
    op_rec = (struct generic_operation*)key->read_list.next;
    while (op_rec != (void*)&key->read_list) {
        if (op_rec == (void*)op_key) {
            pj_list_erase(op_rec);
            op_rec->op = PJ_IOQUEUE_OP_NONE;
            pj_mutex_unlock(key->mutex);

            (*key->cb.on_read_complete)(key, op_key, bytes_status);
            return PJ_SUCCESS;
        }
        op_rec = op_rec->next;
    }

    /* Find the operation in the pending write list. */
    op_rec = (struct generic_operation*)key->write_list.next;
    while (op_rec != (void*)&key->write_list) {
        if (op_rec == (void*)op_key) {
            pj_list_erase(op_rec);
            op_rec->op = PJ_IOQUEUE_OP_NONE;
            pj_mutex_unlock(key->mutex);

            (*key->cb.on_write_complete)(key, op_key, bytes_status);
            return PJ_SUCCESS;
        }
        op_rec = op_rec->next;
    }

    /* Find the operation in the pending accept list. */
    op_rec = (struct generic_operation*)key->accept_list.next;
    while (op_rec != (void*)&key->accept_list) {
        if (op_rec == (void*)op_key) {
            pj_list_erase(op_rec);
            op_rec->op = PJ_IOQUEUE_OP_NONE;
            pj_mutex_unlock(key->mutex);

            (*key->cb.on_accept_complete)(key, op_key,
                                          PJ_INVALID_SOCKET,
                                          bytes_status);
            return PJ_SUCCESS;
        }
        op_rec = op_rec->next;
    }

    pj_mutex_unlock(key->mutex);

    return PJ_EINVALIDOP;
}
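
/*
 * Illustrative usage sketch (not part of this file): an application can
 * use pj_ioqueue_post_completion() to "cancel" an operation that is still
 * pending, e.g. before unregistering a key, by forcing its completion
 * callback to run with a sentinel status. The negative value below is a
 * hypothetical convention, matching the negated-status reporting used by
 * the dispatchers in this file.
 *
 *   if (pj_ioqueue_is_pending(key, &op_key)) {
 *       pj_ioqueue_post_completion(key, &op_key, -PJ_ECANCELLED);
 *   }
 */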

PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency( pj_ioqueue_t *ioqueue,
                                                        pj_bool_t allow)
{
    PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
    ioqueue->default_concurrency = allow;
    return PJ_SUCCESS;
}


PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
                                               pj_bool_t allow)
{
    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
     * disabled.
     */
    PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);

    key->allow_concurrent = allow;
    return PJ_SUCCESS;
}
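
/*
 * Illustrative usage sketch (not part of this file): disabling concurrency
 * on a key guarantees its callbacks are serialized (the dispatchers above
 * then keep the key's mutex held during the callback), at the cost of
 * holding the lock longer. This requires PJ_IOQUEUE_HAS_SAFE_UNREG.
 *
 *   pj_status_t rc = pj_ioqueue_set_concurrency(key, PJ_FALSE);
 */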

PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
{
    return pj_mutex_lock(key->mutex);
}

PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
{
    return pj_mutex_unlock(key->mutex);
}