/* $Id$ */
/*
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */

/*
 * ioqueue_common_abs.c
 *
 * This contains the common functionality used to emulate the proactor
 * pattern with various event dispatching mechanisms (e.g. select, epoll).
 *
 * This file will be included by the appropriate ioqueue implementation.
 * This file is NOT supposed to be compiled as a stand-alone source.
 */
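
/*
 * For readers unfamiliar with the proactor emulation described above, the
 * sketch below (illustrative only, excluded from the build) shows how an
 * application typically drives this framework: it registers a socket with
 * a set of completion callbacks, then repeatedly calls pj_ioqueue_poll(),
 * which ends up in the ioqueue_dispatch_*_event() functions in this file.
 * The callback and function names are hypothetical application code.
 */
#if 0
static void on_read_complete(pj_ioqueue_key_t *key,
                             pj_ioqueue_op_key_t *op_key,
                             pj_ssize_t bytes_read)
{
    /* Handle completion of an asynchronous read here. */
}

static void event_loop(pj_pool_t *pool, pj_sock_t sock)
{
    pj_ioqueue_t *ioqueue;
    pj_ioqueue_key_t *key;
    pj_ioqueue_callback cb;

    pj_bzero(&cb, sizeof(cb));
    cb.on_read_complete = &on_read_complete;

    /* Create the ioqueue and register the socket with its callbacks. */
    pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioqueue);
    pj_ioqueue_register_sock(pool, ioqueue, sock, NULL, &cb, &key);

    for (;;) {
        pj_time_val timeout = { 0, 500 };

        /* Wait for events; completed operations are reported through
         * the callbacks registered above.
         */
        pj_ioqueue_poll(ioqueue, &timeout);
    }
}
#endif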

#define PENDING_RETRY	2

static void ioqueue_init( pj_ioqueue_t *ioqueue )
{
    ioqueue->lock = NULL;
    ioqueue->auto_delete_lock = 0;
    ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY;
}

static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
    if (ioqueue->auto_delete_lock && ioqueue->lock) {
        pj_lock_release(ioqueue->lock);
        return pj_lock_destroy(ioqueue->lock);
    } else
        return PJ_SUCCESS;
}

/*
 * pj_ioqueue_set_lock()
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
                                         pj_lock_t *lock,
                                         pj_bool_t auto_delete )
{
    PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);

    if (ioqueue->auto_delete_lock && ioqueue->lock) {
        pj_lock_destroy(ioqueue->lock);
    }

    ioqueue->lock = lock;
    ioqueue->auto_delete_lock = auto_delete;

    return PJ_SUCCESS;
}

static pj_status_t ioqueue_init_key( pj_pool_t *pool,
                                     pj_ioqueue_t *ioqueue,
                                     pj_ioqueue_key_t *key,
                                     pj_sock_t sock,
                                     void *user_data,
                                     const pj_ioqueue_callback *cb)
{
    pj_status_t rc;
    int optlen;

    PJ_UNUSED_ARG(pool);

    key->ioqueue = ioqueue;
    key->fd = sock;
    key->user_data = user_data;
    pj_list_init(&key->read_list);
    pj_list_init(&key->write_list);
#if PJ_HAS_TCP
    pj_list_init(&key->accept_list);
    key->connecting = 0;
#endif

    /* Save callback. */
    pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Set initial reference count to 1 */
    pj_assert(key->ref_count == 0);
    ++key->ref_count;

    key->closing = 0;
#endif

    rc = pj_ioqueue_set_concurrency(key, ioqueue->default_concurrency);
    if (rc != PJ_SUCCESS)
        return rc;

    /* Get socket type. When the socket type is datagram, some
     * optimization will be performed during send to allow parallel
     * send operations.
     */
    optlen = sizeof(key->fd_type);
    rc = pj_sock_getsockopt(sock, pj_SOL_SOCKET(), pj_SO_TYPE(),
                            &key->fd_type, &optlen);
    if (rc != PJ_SUCCESS)
        key->fd_type = pj_SOCK_STREAM();

    /* Create mutex for the key. */
#if !PJ_IOQUEUE_HAS_SAFE_UNREG
    rc = pj_mutex_create_simple(pool, NULL, &key->mutex);
#endif

    return rc;
}

/*
 * pj_ioqueue_get_user_data()
 *
 * Obtain value associated with a key.
 */
PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
{
    PJ_ASSERT_RETURN(key != NULL, NULL);
    return key->user_data;
}

/*
 * pj_ioqueue_set_user_data()
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
                                              void *user_data,
                                              void **old_data)
{
    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    if (old_data)
        *old_data = key->user_data;
    key->user_data = user_data;

    return PJ_SUCCESS;
}

PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
{
    return !pj_list_empty(&key->write_list);
}

PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
{
    return !pj_list_empty(&key->read_list);
}

PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
{
#if PJ_HAS_TCP
    return !pj_list_empty(&key->accept_list);
#else
    PJ_UNUSED_ARG(key);
    return 0;
#endif
}

PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
{
    return key->connecting;
}


#if PJ_IOQUEUE_HAS_SAFE_UNREG
#   define IS_CLOSING(key)  (key->closing)
#else
#   define IS_CLOSING(key)  (0)
#endif

/*
 * ioqueue_dispatch_event()
 *
 * Report the occurrence of an event in the key to be processed by the
 * framework.
 */
void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
{
    /* Lock the key. */
    pj_mutex_lock(h->mutex);

    if (IS_CLOSING(h)) {
        pj_mutex_unlock(h->mutex);
        return;
    }

#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
    if (h->connecting) {
        /* Completion of connect() operation */
        pj_ssize_t bytes_transfered;
        pj_bool_t has_lock;

        /* Clear operation. */
        h->connecting = 0;

        ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
        ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);

#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
        /* from connect(2):
         * On Linux, use getsockopt to read the SO_ERROR option at
         * level SOL_SOCKET to determine whether connect() completed
         * successfully (if SO_ERROR is zero).
         */
        {
            int value;
            int vallen = sizeof(value);
            int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
                                           &value, &vallen);
            if (gs_rc != 0) {
                /* Argh!! What to do now???
                 * Just indicate that the socket is connected. The
                 * application will get an error as soon as it tries to
                 * use the socket to send/receive.
                 */
                bytes_transfered = 0;
            } else {
                bytes_transfered = value;
            }
        }
#elif defined(PJ_WIN32) && PJ_WIN32!=0
        bytes_transfered = 0; /* success */
#else
        /* Excellent information in D.J. Bernstein's page:
         * http://cr.yp.to/docs/connect.html
         *
         * It seems the most portable way of detecting connect()
         * failure is to call getpeername(). If the socket is connected,
         * getpeername() will return 0. If the socket is not connected,
         * it will return ENOTCONN, and read(fd, &ch, 1) will produce
         * the right errno through error slippage. This is a combination
         * of suggestions from Douglas C. Schmidt and Ken Keys.
         */
        {
            int gp_rc;
            struct sockaddr_in addr;
            socklen_t addrlen = sizeof(addr);

            gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
            bytes_transfered = (gp_rc < 0) ? gp_rc : -gp_rc;
        }
#endif

        /* Unlock; from this point we don't need to hold the key's mutex
         * (unless concurrency is disabled, in which case we should hold
         * the mutex while calling the callback). */
        if (h->allow_concurrent) {
            /* concurrency may be changed while we're in the callback, so
             * save it to a flag.
             */
            has_lock = PJ_FALSE;
            pj_mutex_unlock(h->mutex);
        } else {
            has_lock = PJ_TRUE;
        }

        /* Call callback. */
        if (h->cb.on_connect_complete && !IS_CLOSING(h))
            (*h->cb.on_connect_complete)(h, bytes_transfered);

        /* Unlock if we still hold the lock */
        if (has_lock) {
            pj_mutex_unlock(h->mutex);
        }

        /* Done. */

    } else
#endif /* PJ_HAS_TCP */
    if (key_has_pending_write(h)) {
        /* Socket is writable. */
        struct write_operation *write_op;
        pj_ssize_t sent;
        pj_status_t send_rc;

        /* Get the first in the queue. */
        write_op = h->write_list.next;

        /* For datagrams, we can remove the write_op from the list
         * so that send() can work in parallel.
         */
        if (h->fd_type == pj_SOCK_DGRAM()) {
            pj_list_erase(write_op);

            if (pj_list_empty(&h->write_list))
                ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);

        }

        /* Send the data.
         * Unfortunately we must do this while holding the key's mutex,
         * thus preventing parallel writes on a single key.. :-((
         */
        sent = write_op->size - write_op->written;
        if (write_op->op == PJ_IOQUEUE_OP_SEND) {
            send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
                                   &sent, write_op->flags);
            /* Can't do this. We only clear "op" after we're finished
             * sending the whole buffer.
             */
            //write_op->op = 0;
        } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
            send_rc = pj_sock_sendto(h->fd,
                                     write_op->buf+write_op->written,
                                     &sent, write_op->flags,
                                     &write_op->rmt_addr,
                                     write_op->rmt_addrlen);
            /* Can't do this. We only clear "op" after we're finished
             * sending the whole buffer.
             */
            //write_op->op = 0;
        } else {
            pj_assert(!"Invalid operation type!");
            write_op->op = PJ_IOQUEUE_OP_NONE;
            send_rc = PJ_EBUG;
        }

        if (send_rc == PJ_SUCCESS) {
            write_op->written += sent;
        } else {
            pj_assert(send_rc > 0);
            write_op->written = -send_rc;
        }

        /* Are we finished with this buffer? */
        if (send_rc!=PJ_SUCCESS ||
            write_op->written == (pj_ssize_t)write_op->size ||
            h->fd_type == pj_SOCK_DGRAM())
        {
            pj_bool_t has_lock;

            write_op->op = PJ_IOQUEUE_OP_NONE;

            if (h->fd_type != pj_SOCK_DGRAM()) {
                /* Write completion of the whole stream. */
                pj_list_erase(write_op);

                /* Clear operation if there's no more data to send. */
                if (pj_list_empty(&h->write_list))
                    ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);

            }

            /* Unlock; from this point we don't need to hold the key's
             * mutex (unless concurrency is disabled, in which case we
             * should hold the mutex while calling the callback). */
            if (h->allow_concurrent) {
                /* concurrency may be changed while we're in the callback,
                 * so save it to a flag.
                 */
                has_lock = PJ_FALSE;
                pj_mutex_unlock(h->mutex);
            } else {
                has_lock = PJ_TRUE;
            }

            /* Call callback. */
            if (h->cb.on_write_complete && !IS_CLOSING(h)) {
                (*h->cb.on_write_complete)(h,
                                           (pj_ioqueue_op_key_t*)write_op,
                                           write_op->written);
            }

            if (has_lock) {
                pj_mutex_unlock(h->mutex);
            }

        } else {
            pj_mutex_unlock(h->mutex);
        }

        /* Done. */
    } else {
        /*
         * This is normal; execution may fall here when multiple threads
         * are signalled for the same event, but only one thread is
         * eventually able to process the event.
         */
        pj_mutex_unlock(h->mutex);
    }
}

void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
{
    pj_status_t rc;

    /* Lock the key. */
    pj_mutex_lock(h->mutex);

    if (IS_CLOSING(h)) {
        pj_mutex_unlock(h->mutex);
        return;
    }

#   if PJ_HAS_TCP
    if (!pj_list_empty(&h->accept_list)) {

        struct accept_operation *accept_op;
        pj_bool_t has_lock;

        /* Get one accept operation from the list. */
        accept_op = h->accept_list.next;
        pj_list_erase(accept_op);
        accept_op->op = PJ_IOQUEUE_OP_NONE;

        /* Clear bit in fdset if there is no more pending accept */
        if (pj_list_empty(&h->accept_list))
            ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);

        rc = pj_sock_accept(h->fd, accept_op->accept_fd,
                            accept_op->rmt_addr, accept_op->addrlen);
        if (rc==PJ_SUCCESS && accept_op->local_addr) {
            rc = pj_sock_getsockname(*accept_op->accept_fd,
                                     accept_op->local_addr,
                                     accept_op->addrlen);
        }

        /* Unlock; from this point we don't need to hold the key's mutex
         * (unless concurrency is disabled, in which case we should hold
         * the mutex while calling the callback). */
        if (h->allow_concurrent) {
            /* concurrency may be changed while we're in the callback, so
             * save it to a flag.
             */
            has_lock = PJ_FALSE;
            pj_mutex_unlock(h->mutex);
        } else {
            has_lock = PJ_TRUE;
        }

        /* Call callback. */
        if (h->cb.on_accept_complete && !IS_CLOSING(h)) {
            (*h->cb.on_accept_complete)(h,
                                        (pj_ioqueue_op_key_t*)accept_op,
                                        *accept_op->accept_fd, rc);
        }

        if (has_lock) {
            pj_mutex_unlock(h->mutex);
        }
    }
    else
#   endif
    if (key_has_pending_read(h)) {
        struct read_operation *read_op;
        pj_ssize_t bytes_read;
        pj_bool_t has_lock;

        /* Get one pending read operation from the list. */
        read_op = h->read_list.next;
        pj_list_erase(read_op);

        /* Clear fdset if there is no pending read. */
        if (pj_list_empty(&h->read_list))
            ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);

        bytes_read = read_op->size;

        if (read_op->op == PJ_IOQUEUE_OP_RECV_FROM) {
            read_op->op = PJ_IOQUEUE_OP_NONE;
            rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read,
                                  read_op->flags,
                                  read_op->rmt_addr,
                                  read_op->rmt_addrlen);
        } else if (read_op->op == PJ_IOQUEUE_OP_RECV) {
            read_op->op = PJ_IOQUEUE_OP_NONE;
            rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read,
                              read_op->flags);
        } else {
            pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
            read_op->op = PJ_IOQUEUE_OP_NONE;
            /*
             * User has specified pj_ioqueue_read().
             * On Win32, we should do ReadFile(). But because we got
             * here because of select() anyway, the user must have put a
             * socket descriptor in h->fd, in which case we can just
             * call pj_sock_recv() instead of ReadFile().
             * On Unix, the user may put a file in h->fd, so we'll have
             * to call read() here.
             * This may not compile on systems which don't have
             * read(). That's why we only specify PJ_LINUX here, so
             * that the error is easier to catch.
             */
#           if defined(PJ_WIN32) && PJ_WIN32 != 0 || \
               defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE != 0
            rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read,
                              read_op->flags);
            //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
            //              &bytes_read, NULL);
#           elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
            bytes_read = read(h->fd, read_op->buf, bytes_read);
            rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
#           elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
            bytes_read = sys_read(h->fd, read_op->buf, bytes_read);
            rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
#           else
#           error "Implement read() for this platform!"
#           endif
        }

        if (rc != PJ_SUCCESS) {
#           if defined(PJ_WIN32) && PJ_WIN32 != 0
            /* On Win32, for UDP, WSAECONNRESET on the receive side
             * indicates that a previous send has triggered an ICMP Port
             * Unreachable message.
             * But we wouldn't know at this point which of the previous
             * keys has triggered the error, since the UDP socket can
             * be shared!
             * So we'll just ignore it!
             */

            if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
                //PJ_LOG(4,(THIS_FILE,
                //          "Ignored ICMP port unreach. on key=%p", h));
            }
#           endif

            /* In any case we would report this to the caller. */
            bytes_read = -rc;
        }

        /* Unlock; from this point we don't need to hold the key's mutex
         * (unless concurrency is disabled, in which case we should hold
         * the mutex while calling the callback). */
        if (h->allow_concurrent) {
            /* concurrency may be changed while we're in the callback, so
             * save it to a flag.
             */
            has_lock = PJ_FALSE;
            pj_mutex_unlock(h->mutex);
        } else {
            has_lock = PJ_TRUE;
        }

        /* Call callback. */
        if (h->cb.on_read_complete && !IS_CLOSING(h)) {
            (*h->cb.on_read_complete)(h,
                                      (pj_ioqueue_op_key_t*)read_op,
                                      bytes_read);
        }

        if (has_lock) {
            pj_mutex_unlock(h->mutex);
        }

    } else {
        /*
         * This is normal; execution may fall here when multiple threads
         * are signalled for the same event, but only one thread is
         * eventually able to process the event.
         */
        pj_mutex_unlock(h->mutex);
    }
}


void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
                                       pj_ioqueue_key_t *h )
{
    pj_bool_t has_lock;

    pj_mutex_lock(h->mutex);

    if (!h->connecting) {
        /* It is possible that more than one thread was woken up, thus
         * the remaining thread will see h->connecting as zero because
         * it has been processed by the other thread.
         */
        pj_mutex_unlock(h->mutex);
        return;
    }

    if (IS_CLOSING(h)) {
        pj_mutex_unlock(h->mutex);
        return;
    }

    /* Clear operation. */
    h->connecting = 0;

    ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
    ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);

    /* Unlock; from this point we don't need to hold the key's mutex
     * (unless concurrency is disabled, in which case we should hold
     * the mutex while calling the callback). */
    if (h->allow_concurrent) {
        /* concurrency may be changed while we're in the callback, so
         * save it to a flag.
         */
        has_lock = PJ_FALSE;
        pj_mutex_unlock(h->mutex);
    } else {
        has_lock = PJ_TRUE;
    }

    /* Call callback. */
    if (h->cb.on_connect_complete && !IS_CLOSING(h)) {
        pj_status_t status = -1;
#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
        int value;
        int vallen = sizeof(value);
        int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
                                       &value, &vallen);
        if (gs_rc == 0) {
            status = PJ_RETURN_OS_ERROR(value);
        }
#endif

        (*h->cb.on_connect_complete)(h, status);
    }

    if (has_lock) {
        pj_mutex_unlock(h->mutex);
    }
}

/*
 * pj_ioqueue_recv()
 *
 * Start asynchronous recv() from the socket.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
                                     pj_ioqueue_op_key_t *op_key,
                                     void *buffer,
                                     pj_ssize_t *length,
                                     unsigned flags )
{
    struct read_operation *read_op;

    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing (need to do this first before accessing
     * other variables, since they might have been destroyed. See ticket
     * #469).
     */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    read_op = (struct read_operation*)op_key;
    read_op->op = PJ_IOQUEUE_OP_NONE;

    /* Try to see if there's data immediately available.
     */
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
        pj_status_t status;
        pj_ssize_t size;

        size = *length;
        status = pj_sock_recv(key->fd, buffer, &size, flags);
        if (status == PJ_SUCCESS) {
            /* Yes! Data is available! */
            *length = size;
            return PJ_SUCCESS;
        } else {
            /* If the error is not EWOULDBLOCK (or EAGAIN on Linux),
             * report the error to the caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
                return status;
        }
    }

    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /*
     * No data is immediately available.
     * Must schedule an asynchronous operation to the ioqueue.
     */
    read_op->op = PJ_IOQUEUE_OP_RECV;
    read_op->buf = buffer;
    read_op->size = *length;
    read_op->flags = flags;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->read_list, read_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
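
/*
 * A minimal usage sketch for the function above (illustrative only,
 * excluded from the build). It assumes the key was registered with an
 * on_read_complete callback; note how both the immediate-completion
 * path and the pending path must be handled.
 */
#if 0
static void start_read(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key,
                       char *buf, pj_ssize_t size)
{
    pj_ssize_t len = size;
    pj_status_t status;

    pj_ioqueue_op_key_init(op_key, sizeof(*op_key));

    status = pj_ioqueue_recv(key, op_key, buf, &len, 0);
    if (status == PJ_SUCCESS) {
        /* Data was available right away; "len" bytes are in "buf" and
         * on_read_complete will NOT be called for this operation.
         */
    } else if (status == PJ_EPENDING) {
        /* The operation was queued; completion will be reported to the
         * on_read_complete callback from pj_ioqueue_poll().
         */
    } else {
        /* Immediate, genuine error. */
    }
}
#endif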

/*
 * pj_ioqueue_recvfrom()
 *
 * Start asynchronous recvfrom() from the socket.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key,
                                         void *buffer,
                                         pj_ssize_t *length,
                                         unsigned flags,
                                         pj_sockaddr_t *addr,
                                         int *addrlen)
{
    struct read_operation *read_op;

    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    read_op = (struct read_operation*)op_key;
    read_op->op = PJ_IOQUEUE_OP_NONE;

    /* Try to see if there's data immediately available.
     */
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
        pj_status_t status;
        pj_ssize_t size;

        size = *length;
        status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
                                  addr, addrlen);
        if (status == PJ_SUCCESS) {
            /* Yes! Data is available! */
            *length = size;
            return PJ_SUCCESS;
        } else {
            /* If the error is not EWOULDBLOCK (or EAGAIN on Linux),
             * report the error to the caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
                return status;
        }
    }

    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /*
     * No data is immediately available.
     * Must schedule an asynchronous operation to the ioqueue.
     */
    read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
    read_op->buf = buffer;
    read_op->size = *length;
    read_op->flags = flags;
    read_op->rmt_addr = addr;
    read_op->rmt_addrlen = addrlen;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->read_list, read_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}

/*
 * pj_ioqueue_send()
 *
 * Start asynchronous send() to the descriptor.
 */
PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
                                     pj_ioqueue_op_key_t *op_key,
                                     const void *data,
                                     pj_ssize_t *length,
                                     unsigned flags)
{
    struct write_operation *write_op;
    pj_status_t status;
    unsigned retry;
    pj_ssize_t sent;

    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    /* We cannot use PJ_IOQUEUE_ALWAYS_ASYNC for socket write. */
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /* Fast track:
     *   Try to send data immediately, only if there's no pending write!
     * Note:
     *   We are speculating that the list is empty here without properly
     *   acquiring the ioqueue's mutex first. This is intentional, to
     *   maximize performance via parallelism.
     *
     * This should be safe, because:
     *  - by convention, we require the caller to make sure that the
     *    key is not unregistered while other threads are invoking
     *    an operation on the same key.
     *  - pj_list_empty() is safe to be invoked by multiple threads,
     *    even when other threads are modifying the list.
     */
    if (pj_list_empty(&key->write_list)) {
        /*
         * See if data can be sent immediately.
         */
        sent = *length;
        status = pj_sock_send(key->fd, data, &sent, flags);
        if (status == PJ_SUCCESS) {
            /* Success! */
            *length = sent;
            return PJ_SUCCESS;
        } else {
            /* If the error is not EWOULDBLOCK (or EAGAIN on Linux),
             * report the error to the caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * Schedule asynchronous send.
     */
    write_op = (struct write_operation*)op_key;

    /* Spin if write_op has a pending operation */
    for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
        pj_thread_sleep(0);

    /* Last chance */
    if (write_op->op) {
        /* Unable to send the packet because there is already a pending
         * write in the write_op. We could not put the operation into the
         * write_op because write_op already contains a pending operation!
         * And we could not send the packet directly with send() either,
         * because that would break the ordering of the packets. So we can
         * only return an error here.
         *
         * This could happen, for example, in a multithreaded program
         * where polling is done by one thread while other threads do
         * the sending only. If the polling thread runs at a lower
         * priority than the sending threads, then it's possible that
         * the pending write flag is not cleared in time, because
         * clearing it is only done during polling.
         *
         * The application should specify multiple write operation keys
         * in situations like this.
         */
        //pj_assert(!"ioqueue: there is pending operation on this key!");
        return PJ_EBUSY;
    }

    write_op->op = PJ_IOQUEUE_OP_SEND;
    write_op->buf = (char*)data;
    write_op->size = *length;
    write_op->written = 0;
    write_op->flags = flags;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->write_list, write_op);
    ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
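
/*
 * Illustrative sketch (excluded from the build) of the send path above,
 * including the PJ_EBUSY case. As the comment in the function explains,
 * an application that sends from several threads should keep a pool of
 * operation keys; the fixed-size array here is a hypothetical way to
 * do that.
 */
#if 0
static pj_status_t send_packet(pj_ioqueue_key_t *key,
                               pj_ioqueue_op_key_t op_keys[], unsigned cnt,
                               const void *data, pj_ssize_t size)
{
    unsigned i;

    /* Pick an operation key that has no pending operation. */
    for (i = 0; i < cnt; ++i) {
        if (!pj_ioqueue_is_pending(key, &op_keys[i])) {
            pj_ssize_t len = size;

            /* PJ_SUCCESS: sent immediately; PJ_EPENDING: queued and
             * reported later via on_write_complete; PJ_EBUSY would mean
             * this op_key is still in use (possible in a race despite
             * the is_pending() check above).
             */
            return pj_ioqueue_send(key, &op_keys[i], data, &len, 0);
        }
    }
    return PJ_EBUSY;    /* all operation keys are busy */
}
#endif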

/*
 * pj_ioqueue_sendto()
 *
 * Start asynchronous write() to the descriptor.
 */
PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
                                       const void *data,
                                       pj_ssize_t *length,
                                       pj_uint32_t flags,
                                       const pj_sockaddr_t *addr,
                                       int addrlen)
{
    struct write_operation *write_op;
    unsigned retry;
    pj_status_t status;
    pj_ssize_t sent;

    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    /* We cannot use PJ_IOQUEUE_ALWAYS_ASYNC for socket write */
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /* Fast track:
     *   Try to send data immediately, only if there's no pending write!
     * Note:
     *   We are speculating that the list is empty here without properly
     *   acquiring the ioqueue's mutex first. This is intentional, to
     *   maximize performance via parallelism.
     *
     * This should be safe, because:
     *  - by convention, we require the caller to make sure that the
     *    key is not unregistered while other threads are invoking
     *    an operation on the same key.
     *  - pj_list_empty() is safe to be invoked by multiple threads,
     *    even when other threads are modifying the list.
     */
    if (pj_list_empty(&key->write_list)) {
        /*
         * See if data can be sent immediately.
         */
        sent = *length;
        status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
        if (status == PJ_SUCCESS) {
            /* Success! */
            *length = sent;
            return PJ_SUCCESS;
        } else {
            /* If the error is not EWOULDBLOCK (or EAGAIN on Linux),
             * report the error to the caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * Check that the address storage can hold the address parameter.
     */
    PJ_ASSERT_RETURN(addrlen <= (int)sizeof(pj_sockaddr_in), PJ_EBUG);

    /*
     * Schedule asynchronous send.
     */
    write_op = (struct write_operation*)op_key;

    /* Spin if write_op has a pending operation */
    for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
        pj_thread_sleep(0);

    /* Last chance */
    if (write_op->op) {
        /* Unable to send the packet because there is already a pending
         * write on the write_op. We could not put the operation into the
         * write_op because write_op already contains a pending operation!
         * And we could not send the packet directly with sendto() either,
         * because that would break the ordering of the packets. So we can
         * only return an error here.
         *
         * This could happen, for example, in a multithreaded program
         * where polling is done by one thread while other threads do
         * the sending only. If the polling thread runs at a lower
         * priority than the sending threads, then it's possible that
         * the pending write flag is not cleared in time, because
         * clearing it is only done during polling.
         *
         * The application should specify multiple write operation keys
         * in situations like this.
         */
        //pj_assert(!"ioqueue: there is pending operation on this key!");
        return PJ_EBUSY;
    }

    write_op->op = PJ_IOQUEUE_OP_SEND_TO;
    write_op->buf = (char*)data;
    write_op->size = *length;
    write_op->written = 0;
    write_op->flags = flags;
    pj_memcpy(&write_op->rmt_addr, addr, addrlen);
    write_op->rmt_addrlen = addrlen;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->write_list, write_op);
    ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}

#if PJ_HAS_TCP
/*
 * Initiate overlapped accept() operation.
 */
PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
                                       pj_sock_t *new_sock,
                                       pj_sockaddr_t *local,
                                       pj_sockaddr_t *remote,
                                       int *addrlen)
{
    struct accept_operation *accept_op;
    pj_status_t status;

    /* check parameters. All must be specified! */
    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    accept_op = (struct accept_operation*)op_key;
    accept_op->op = PJ_IOQUEUE_OP_NONE;

    /* Fast track:
     *  See if there's a new connection available immediately.
     */
    if (pj_list_empty(&key->accept_list)) {
        status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
        if (status == PJ_SUCCESS) {
            /* Yes! New connection is available! */
            if (local && addrlen) {
                status = pj_sock_getsockname(*new_sock, local, addrlen);
                if (status != PJ_SUCCESS) {
                    pj_sock_close(*new_sock);
                    *new_sock = PJ_INVALID_SOCKET;
                    return status;
                }
            }
            return PJ_SUCCESS;
        } else {
            /* If the error is not EWOULDBLOCK (or EAGAIN on Linux),
             * report the error to the caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * No connection is available immediately.
     * Schedule the accept() operation to be completed when there is an
     * incoming connection available.
     */
    accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
    accept_op->accept_fd = new_sock;
    accept_op->rmt_addr = remote;
    accept_op->addrlen = addrlen;
    accept_op->local_addr = local;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->accept_list, accept_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
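
/*
 * Illustrative sketch (excluded from the build) of driving the accept
 * path above: keep an accept operation outstanding and re-arm it from
 * the on_accept_complete callback. The callback name, the file-scope
 * storage, and the re-arm policy are hypothetical application choices;
 * the storage merely has to outlive the pending operation.
 */
#if 0
static pj_sock_t new_sock;
static pj_sockaddr_in remote_addr;
static int addr_len;

static void on_accept_complete(pj_ioqueue_key_t *key,
                               pj_ioqueue_op_key_t *op_key,
                               pj_sock_t sock, pj_status_t status)
{
    if (status == PJ_SUCCESS) {
        /* "sock" is the newly accepted connection; register it to the
         * ioqueue or hand it to a worker here.
         */
    }

    /* Re-arm the accept operation so the listener keeps accepting. */
    addr_len = sizeof(remote_addr);
    pj_ioqueue_accept(key, op_key, &new_sock, NULL,
                      &remote_addr, &addr_len);
}
#endif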

/*
 * Initiate overlapped connect() operation (well, it's non-blocking actually,
 * since there's no overlapped version of connect()).
 */
PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
                                        const pj_sockaddr_t *addr,
                                        int addrlen )
{
    pj_status_t status;

    /* check parameters. All must be specified! */
    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    /* Check if the socket has not been marked for connecting */
    if (key->connecting != 0)
        return PJ_EPENDING;

    status = pj_sock_connect(key->fd, addr, addrlen);
    if (status == PJ_SUCCESS) {
        /* Connected! */
        return PJ_SUCCESS;
    } else {
        if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
            /* Pending! */
            pj_mutex_lock(key->mutex);
            key->connecting = PJ_TRUE;
            ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
            ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT);
            pj_mutex_unlock(key->mutex);
            return PJ_EPENDING;
        } else {
            /* Error! */
            return status;
        }
    }
}
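
/*
 * Illustrative sketch (excluded from the build): initiating an
 * asynchronous connect with the function above. PJ_SUCCESS means the
 * connection completed synchronously and on_connect_complete will not
 * be called; PJ_EPENDING means completion (or failure) is reported to
 * the callback later, from the polling thread.
 */
#if 0
static void start_connect(pj_ioqueue_key_t *key,
                          const pj_sockaddr_in *addr)
{
    pj_status_t status;

    status = pj_ioqueue_connect(key, addr, sizeof(*addr));
    if (status == PJ_SUCCESS) {
        /* Connected immediately. */
    } else if (status == PJ_EPENDING) {
        /* Wait for on_connect_complete() to be called from
         * pj_ioqueue_poll().
         */
    } else {
        /* Connect error. */
    }
}
#endif
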
#endif	/* PJ_HAS_TCP */


PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
                                     pj_size_t size )
{
    pj_bzero(op_key, size);
}


/*
 * pj_ioqueue_is_pending()
 */
PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key )
{
    struct generic_operation *op_rec;

    PJ_UNUSED_ARG(key);

    op_rec = (struct generic_operation*)op_key;
    return op_rec->op != 0;
}


/*
 * pj_ioqueue_post_completion()
 */
PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
                                                pj_ioqueue_op_key_t *op_key,
                                                pj_ssize_t bytes_status )
{
    struct generic_operation *op_rec;

    /*
     * Find the operation key in all pending operation lists to
     * really make sure that it's still there; then call the callback.
     */
    pj_mutex_lock(key->mutex);

    /* Find the operation in the pending read list. */
    op_rec = (struct generic_operation*)key->read_list.next;
    while (op_rec != (void*)&key->read_list) {
        if (op_rec == (void*)op_key) {
            pj_list_erase(op_rec);
            op_rec->op = PJ_IOQUEUE_OP_NONE;
            pj_mutex_unlock(key->mutex);

            (*key->cb.on_read_complete)(key, op_key, bytes_status);
            return PJ_SUCCESS;
        }
        op_rec = op_rec->next;
    }

    /* Find the operation in the pending write list. */
    op_rec = (struct generic_operation*)key->write_list.next;
    while (op_rec != (void*)&key->write_list) {
        if (op_rec == (void*)op_key) {
            pj_list_erase(op_rec);
            op_rec->op = PJ_IOQUEUE_OP_NONE;
            pj_mutex_unlock(key->mutex);

            (*key->cb.on_write_complete)(key, op_key, bytes_status);
            return PJ_SUCCESS;
        }
        op_rec = op_rec->next;
    }

    /* Find the operation in the pending accept list. */
    op_rec = (struct generic_operation*)key->accept_list.next;
    while (op_rec != (void*)&key->accept_list) {
        if (op_rec == (void*)op_key) {
            pj_list_erase(op_rec);
            op_rec->op = PJ_IOQUEUE_OP_NONE;
            pj_mutex_unlock(key->mutex);

            (*key->cb.on_accept_complete)(key, op_key,
                                          PJ_INVALID_SOCKET,
                                          bytes_status);
            return PJ_SUCCESS;
        }
        op_rec = op_rec->next;
    }

    pj_mutex_unlock(key->mutex);

    return PJ_EINVALIDOP;
}
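
/*
 * Illustrative sketch (excluded from the build): the function above is
 * typically used to abort a pending operation, e.g. on timeout. The
 * callback receives the given bytes_status as its "bytes" argument; a
 * negative value such as -PJ_ECANCELLED matches the bytes_read = -rc
 * convention used in this file, but the exact value is an application
 * choice, not mandated by the API.
 */
#if 0
static void abort_pending_read(pj_ioqueue_key_t *key,
                               pj_ioqueue_op_key_t *op_key)
{
    if (pj_ioqueue_is_pending(key, op_key)) {
        /* This dequeues the operation and invokes on_read_complete()
         * with bytes_status as the "bytes read" argument.
         */
        pj_ioqueue_post_completion(key, op_key, -PJ_ECANCELLED);
    }
}
#endif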

PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency( pj_ioqueue_t *ioqueue,
                                                        pj_bool_t allow)
{
    PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
    ioqueue->default_concurrency = allow;
    return PJ_SUCCESS;
}


PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
                                               pj_bool_t allow)
{
    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
     * disabled.
     */
    PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);

    key->allow_concurrent = allow;
    return PJ_SUCCESS;
}
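
/*
 * Illustrative sketch (excluded from the build): disabling concurrency
 * on a key serializes its callbacks, because the dispatch functions
 * above keep the key's mutex held while calling them when
 * allow_concurrent is false. As asserted above, this requires
 * PJ_IOQUEUE_HAS_SAFE_UNREG to be enabled.
 */
#if 0
static void make_key_serialized(pj_ioqueue_key_t *key)
{
    pj_status_t status = pj_ioqueue_set_concurrency(key, PJ_FALSE);
    pj_assert(status == PJ_SUCCESS);
}
#endif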

PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
{
    return pj_mutex_lock(key->mutex);
}

PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
{
    return pj_mutex_unlock(key->mutex);
}