/* $Id$ */
/*
 * Copyright (C) 2003-2007 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */

/*
 * ioqueue_common_abs.c
 *
 * This contains the common functionality to emulate the proactor
 * pattern with various event dispatching mechanisms (e.g. select,
 * epoll).
 *
 * This file will be included by the appropriate ioqueue implementation;
 * it is NOT supposed to be compiled as a stand-alone source.
 */

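/*
 * For illustration, a concrete backend is expected to provide the
 * pj_ioqueue_t / pj_ioqueue_key_t definitions plus the two event-set
 * hooks used throughout this file, and then include it. A rough sketch
 * only; the exact declarations live in the backend sources (e.g.
 * ioqueue_select.c), and the hook signatures here are inferred from
 * the calls below:
 *
 *   #include <pj/ioqueue.h>
 *   ...
 *   static void ioqueue_add_to_set(pj_ioqueue_t *ioqueue,
 *                                  pj_ioqueue_key_t *key,
 *                                  enum ioqueue_event_type event_type);
 *   static void ioqueue_remove_from_set(pj_ioqueue_t *ioqueue,
 *                                       pj_ioqueue_key_t *key,
 *                                       enum ioqueue_event_type event_type);
 *   ...
 *   #include "ioqueue_common_abs.c"
 */

/* Number of times to spin (with pj_thread_sleep(0) in between) while
 * waiting for an op_key's pending-operation flag to clear; see
 * pj_ioqueue_send() and pj_ioqueue_sendto().
 */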
#define PENDING_RETRY   2

static void ioqueue_init( pj_ioqueue_t *ioqueue )
{
    ioqueue->lock = NULL;
    ioqueue->auto_delete_lock = 0;
}

static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
    if (ioqueue->auto_delete_lock && ioqueue->lock) {
        pj_lock_release(ioqueue->lock);
        return pj_lock_destroy(ioqueue->lock);
    } else
        return PJ_SUCCESS;
}

/*
 * pj_ioqueue_set_lock()
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
                                         pj_lock_t *lock,
                                         pj_bool_t auto_delete )
{
    PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);

    if (ioqueue->auto_delete_lock && ioqueue->lock) {
        pj_lock_destroy(ioqueue->lock);
    }

    ioqueue->lock = lock;
    ioqueue->auto_delete_lock = auto_delete;

    return PJ_SUCCESS;
}

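/* A minimal usage sketch for pj_ioqueue_set_lock(), assuming "pool" and
 * "ioqueue" have already been created by the application:
 *
 *   pj_lock_t *lock;
 *   pj_status_t rc = pj_lock_create_simple_mutex(pool, "ioq", &lock);
 *   if (rc == PJ_SUCCESS)
 *       rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
 *   // With auto_delete set to PJ_TRUE, the lock is destroyed together
 *   // with the ioqueue.
 */
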
static pj_status_t ioqueue_init_key( pj_pool_t *pool,
                                     pj_ioqueue_t *ioqueue,
                                     pj_ioqueue_key_t *key,
                                     pj_sock_t sock,
                                     void *user_data,
                                     const pj_ioqueue_callback *cb)
{
    pj_status_t rc;
    int optlen;

    PJ_UNUSED_ARG(pool);

    key->ioqueue = ioqueue;
    key->fd = sock;
    key->user_data = user_data;
    pj_list_init(&key->read_list);
    pj_list_init(&key->write_list);
#if PJ_HAS_TCP
    pj_list_init(&key->accept_list);
    key->connecting = 0;
#endif

    /* Save callback. */
    pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Set initial reference count to 1 */
    pj_assert(key->ref_count == 0);
    ++key->ref_count;

    key->closing = 0;
#endif

    rc = pj_ioqueue_set_concurrency(key, ioqueue->default_concurrency);
    if (rc != PJ_SUCCESS)
        return rc;

    /* Get socket type. When the socket type is datagram, some
     * optimizations will be performed during send to allow parallel
     * send operations.
     */
    optlen = sizeof(key->fd_type);
    rc = pj_sock_getsockopt(sock, pj_SOL_SOCKET(), pj_SO_TYPE(),
                            &key->fd_type, &optlen);
    if (rc != PJ_SUCCESS)
        key->fd_type = pj_SOCK_STREAM();

    /* Create mutex for the key. */
#if !PJ_IOQUEUE_HAS_SAFE_UNREG
    rc = pj_mutex_create_simple(pool, NULL, &key->mutex);
#endif

    return rc;
}

/*
 * pj_ioqueue_get_user_data()
 *
 * Obtain value associated with a key.
 */
PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
{
    PJ_ASSERT_RETURN(key != NULL, NULL);
    return key->user_data;
}

/*
 * pj_ioqueue_set_user_data()
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
                                              void *user_data,
                                              void **old_data)
{
    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    if (old_data)
        *old_data = key->user_data;
    key->user_data = user_data;

    return PJ_SUCCESS;
}

PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
{
    return !pj_list_empty(&key->write_list);
}

PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
{
    return !pj_list_empty(&key->read_list);
}

PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
{
#if PJ_HAS_TCP
    return !pj_list_empty(&key->accept_list);
#else
    PJ_UNUSED_ARG(key);
    return 0;
#endif
}

PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
{
    return key->connecting;
}


#if PJ_IOQUEUE_HAS_SAFE_UNREG
#   define IS_CLOSING(key)  (key->closing)
#else
#   define IS_CLOSING(key)  (0)
#endif


/*
 * ioqueue_dispatch_event()
 *
 * Report the occurrence of an event on the key, to be processed by the
 * framework.
 */
void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
{
    /* Lock the key. */
    pj_mutex_lock(h->mutex);

    if (IS_CLOSING(h)) {
        pj_mutex_unlock(h->mutex);
        return;
    }

#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
    if (h->connecting) {
        /* Completion of connect() operation */
        pj_ssize_t bytes_transferred;
        pj_bool_t has_lock;

        /* Clear operation. */
        h->connecting = 0;

        ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
        ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);


#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
        /* from connect(2):
         * On Linux, use getsockopt to read the SO_ERROR option at
         * level SOL_SOCKET to determine whether connect() completed
         * successfully (if SO_ERROR is zero).
         */
        {
            int value;
            int vallen = sizeof(value);
            int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
                                           &value, &vallen);
            if (gs_rc != 0) {
                /* Argh!! What to do now???
                 * Just indicate that the socket is connected. The
                 * application will get an error as soon as it tries to
                 * use the socket to send/receive.
                 */
                bytes_transferred = 0;
            } else {
                bytes_transferred = value;
            }
        }
#elif defined(PJ_WIN32) && PJ_WIN32!=0
        bytes_transferred = 0; /* success */
#else
        /* Excellent information on D.J. Bernstein's page:
         * http://cr.yp.to/docs/connect.html
         *
         * It seems the most portable way of detecting connect()
         * failure is to call getpeername(). If the socket is connected,
         * getpeername() will return 0. If the socket is not connected,
         * it will return ENOTCONN, and read(fd, &ch, 1) will produce
         * the right errno through error slippage. This is a combination
         * of suggestions from Douglas C. Schmidt and Ken Keys.
         */
        {
            int gp_rc;
            struct sockaddr_in addr;
            socklen_t addrlen = sizeof(addr);

            gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
            bytes_transferred = (gp_rc < 0) ? gp_rc : -gp_rc;
        }
#endif

        /* Unlock; from this point we don't need to hold the key's mutex
         * (unless concurrency is disabled, in which case we hold the
         * mutex while calling the callback). */
        if (h->allow_concurrent) {
            /* concurrency may be changed while we're in the callback, so
             * save it to a flag.
             */
            has_lock = PJ_FALSE;
            pj_mutex_unlock(h->mutex);
        } else {
            has_lock = PJ_TRUE;
        }

        /* Call callback. */
        if (h->cb.on_connect_complete && !IS_CLOSING(h))
            (*h->cb.on_connect_complete)(h, bytes_transferred);

        /* Unlock if we still hold the lock */
        if (has_lock) {
            pj_mutex_unlock(h->mutex);
        }

        /* Done. */

    } else
#endif /* PJ_HAS_TCP */
    if (key_has_pending_write(h)) {
        /* Socket is writable. */
        struct write_operation *write_op;
        pj_ssize_t sent;
        pj_status_t send_rc;

        /* Get the first in the queue. */
        write_op = h->write_list.next;

        /* For datagrams, we can remove the write_op from the list
         * so that send() can work in parallel.
         */
        if (h->fd_type == pj_SOCK_DGRAM()) {
            pj_list_erase(write_op);

            if (pj_list_empty(&h->write_list))
                ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);

        }

        /* Send the data.
         * Unfortunately we must do this while holding the key's mutex,
         * thus preventing parallel writes on a single key. :-((
         */
        sent = write_op->size - write_op->written;
        if (write_op->op == PJ_IOQUEUE_OP_SEND) {
            send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
                                   &sent, write_op->flags);
            /* Can't do this. We only clear "op" after we're finished sending
             * the whole buffer.
             */
            //write_op->op = 0;
        } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
            send_rc = pj_sock_sendto(h->fd,
                                     write_op->buf+write_op->written,
                                     &sent, write_op->flags,
                                     &write_op->rmt_addr,
                                     write_op->rmt_addrlen);
            /* Can't do this. We only clear "op" after we're finished sending
             * the whole buffer.
             */
            //write_op->op = 0;
        } else {
            pj_assert(!"Invalid operation type!");
            write_op->op = PJ_IOQUEUE_OP_NONE;
            send_rc = PJ_EBUG;
        }

        if (send_rc == PJ_SUCCESS) {
            write_op->written += sent;
        } else {
            pj_assert(send_rc > 0);
            write_op->written = -send_rc;
        }

        /* Are we finished with this buffer? */
        if (send_rc!=PJ_SUCCESS ||
            write_op->written == (pj_ssize_t)write_op->size ||
            h->fd_type == pj_SOCK_DGRAM())
        {
            pj_bool_t has_lock;

            write_op->op = PJ_IOQUEUE_OP_NONE;

            if (h->fd_type != pj_SOCK_DGRAM()) {
                /* Write completion of the whole stream. */
                pj_list_erase(write_op);

                /* Clear operation if there's no more data to send. */
                if (pj_list_empty(&h->write_list))
                    ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);

            }

            /* Unlock; from this point we don't need to hold the key's mutex
             * (unless concurrency is disabled, in which case we hold the
             * mutex while calling the callback). */
            if (h->allow_concurrent) {
                /* concurrency may be changed while we're in the callback, so
                 * save it to a flag.
                 */
                has_lock = PJ_FALSE;
                pj_mutex_unlock(h->mutex);
            } else {
                has_lock = PJ_TRUE;
            }

            /* Call callback. */
            if (h->cb.on_write_complete && !IS_CLOSING(h)) {
                (*h->cb.on_write_complete)(h,
                                           (pj_ioqueue_op_key_t*)write_op,
                                           write_op->written);
            }

            if (has_lock) {
                pj_mutex_unlock(h->mutex);
            }

        } else {
            pj_mutex_unlock(h->mutex);
        }

        /* Done. */
    } else {
        /*
         * This is normal; execution may fall here when multiple threads
         * are signalled for the same event, but only one thread is
         * eventually able to process the event.
         */
        pj_mutex_unlock(h->mutex);
    }
}

void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
{
    pj_status_t rc;

    /* Lock the key. */
    pj_mutex_lock(h->mutex);

    if (IS_CLOSING(h)) {
        pj_mutex_unlock(h->mutex);
        return;
    }

#   if PJ_HAS_TCP
    if (!pj_list_empty(&h->accept_list)) {

        struct accept_operation *accept_op;
        pj_bool_t has_lock;

        /* Get one accept operation from the list. */
        accept_op = h->accept_list.next;
        pj_list_erase(accept_op);
        accept_op->op = PJ_IOQUEUE_OP_NONE;

        /* Clear bit in fdset if there is no more pending accept */
        if (pj_list_empty(&h->accept_list))
            ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);

        rc = pj_sock_accept(h->fd, accept_op->accept_fd,
                            accept_op->rmt_addr, accept_op->addrlen);
        if (rc == PJ_SUCCESS && accept_op->local_addr) {
            rc = pj_sock_getsockname(*accept_op->accept_fd,
                                     accept_op->local_addr,
                                     accept_op->addrlen);
        }

        /* Unlock; from this point we don't need to hold the key's mutex
         * (unless concurrency is disabled, in which case we hold the
         * mutex while calling the callback). */
        if (h->allow_concurrent) {
            /* concurrency may be changed while we're in the callback, so
             * save it to a flag.
             */
            has_lock = PJ_FALSE;
            pj_mutex_unlock(h->mutex);
        } else {
            has_lock = PJ_TRUE;
        }

        /* Call callback. */
        if (h->cb.on_accept_complete && !IS_CLOSING(h)) {
            (*h->cb.on_accept_complete)(h,
                                        (pj_ioqueue_op_key_t*)accept_op,
                                        *accept_op->accept_fd, rc);
        }

        if (has_lock) {
            pj_mutex_unlock(h->mutex);
        }
    }
    else
#   endif
    if (key_has_pending_read(h)) {
        struct read_operation *read_op;
        pj_ssize_t bytes_read;
        pj_bool_t has_lock;

        /* Get one pending read operation from the list. */
        read_op = h->read_list.next;
        pj_list_erase(read_op);

        /* Clear fdset if there is no pending read. */
        if (pj_list_empty(&h->read_list))
            ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);

        bytes_read = read_op->size;

        if (read_op->op == PJ_IOQUEUE_OP_RECV_FROM) {
            read_op->op = PJ_IOQUEUE_OP_NONE;
            rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read,
                                  read_op->flags,
                                  read_op->rmt_addr,
                                  read_op->rmt_addrlen);
        } else if (read_op->op == PJ_IOQUEUE_OP_RECV) {
            read_op->op = PJ_IOQUEUE_OP_NONE;
            rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read,
                              read_op->flags);
        } else {
            pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
            read_op->op = PJ_IOQUEUE_OP_NONE;
            /*
             * User has specified pj_ioqueue_read().
             * On Win32, we should do ReadFile(). But because we got
             * here because of select() anyway, the user must have put a
             * socket descriptor in h->fd, in which case we can just
             * call pj_sock_recv() instead of ReadFile().
             * On Unix, the user may put a file in h->fd, so we'll have
             * to call read() here.
             * This may not compile on systems that don't have read().
             * That's why we only specify PJ_LINUX here, so that the
             * error is easier to catch.
             */
#           if defined(PJ_WIN32) && PJ_WIN32 != 0 || \
               defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE != 0
                rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read,
                                  read_op->flags);
                //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
                //              &bytes_read, NULL);
#           elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
                bytes_read = read(h->fd, read_op->buf, bytes_read);
                rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
#           elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
                bytes_read = sys_read(h->fd, read_op->buf, bytes_read);
                rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
#           else
#               error "Implement read() for this platform!"
#           endif
        }

        if (rc != PJ_SUCCESS) {
#           if defined(PJ_WIN32) && PJ_WIN32 != 0
            /* On Win32, for UDP, WSAECONNRESET on the receive side
             * indicates that previous sending has triggered an ICMP Port
             * Unreachable message.
             * But at this point we cannot tell which previous send
             * triggered the error, since the UDP socket can be shared!
             * So we'll just ignore it!
             */
            if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
                //PJ_LOG(4,(THIS_FILE,
                //          "Ignored ICMP port unreach. on key=%p", h));
            }
#           endif

            /* In any case we would report this to caller. */
            bytes_read = -rc;
        }

        /* Unlock; from this point we don't need to hold the key's mutex
         * (unless concurrency is disabled, in which case we hold the
         * mutex while calling the callback). */
        if (h->allow_concurrent) {
            /* concurrency may be changed while we're in the callback, so
             * save it to a flag.
             */
            has_lock = PJ_FALSE;
            pj_mutex_unlock(h->mutex);
        } else {
            has_lock = PJ_TRUE;
        }

        /* Call callback. */
        if (h->cb.on_read_complete && !IS_CLOSING(h)) {
            (*h->cb.on_read_complete)(h,
                                      (pj_ioqueue_op_key_t*)read_op,
                                      bytes_read);
        }

        if (has_lock) {
            pj_mutex_unlock(h->mutex);
        }

    } else {
        /*
         * This is normal; execution may fall here when multiple threads
         * are signalled for the same event, but only one thread is
         * eventually able to process the event.
         */
        pj_mutex_unlock(h->mutex);
    }
}


void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
                                       pj_ioqueue_key_t *h )
{
    pj_bool_t has_lock;

    pj_mutex_lock(h->mutex);

    if (!h->connecting) {
        /* It is possible that more than one thread was woken up, thus
         * the remaining thread will see h->connecting as zero because
         * it has been processed by another thread.
         */
        pj_mutex_unlock(h->mutex);
        return;
    }

    if (IS_CLOSING(h)) {
        pj_mutex_unlock(h->mutex);
        return;
    }

    /* Clear operation. */
    h->connecting = 0;

    ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
    ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);

    /* Unlock; from this point we don't need to hold the key's mutex
     * (unless concurrency is disabled, in which case we hold the
     * mutex while calling the callback). */
    if (h->allow_concurrent) {
        /* concurrency may be changed while we're in the callback, so
         * save it to a flag.
         */
        has_lock = PJ_FALSE;
        pj_mutex_unlock(h->mutex);
    } else {
        has_lock = PJ_TRUE;
    }

    /* Call callback. */
    if (h->cb.on_connect_complete && !IS_CLOSING(h)) {
        pj_status_t status = -1;
#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
        int value;
        int vallen = sizeof(value);
        int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
                                       &value, &vallen);
        if (gs_rc == 0) {
            status = PJ_RETURN_OS_ERROR(value);
        }
#endif

        (*h->cb.on_connect_complete)(h, status);
    }

    if (has_lock) {
        pj_mutex_unlock(h->mutex);
    }
}

/*
 * pj_ioqueue_recv()
 *
 * Start asynchronous recv() from the socket.
 */
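/* A minimal usage sketch (illustrative; "app_key" and "app_op_key" are
 * hypothetical application objects, with the on_read_complete callback
 * registered at pj_ioqueue_register_sock() time):
 *
 *   char buf[512];
 *   pj_ssize_t len = sizeof(buf);
 *   pj_status_t rc = pj_ioqueue_recv(app_key, app_op_key, buf, &len, 0);
 *   if (rc == PJ_SUCCESS) {
 *       // Data is available now; "len" bytes are in "buf".
 *   } else if (rc == PJ_EPENDING) {
 *       // Completion is reported via on_read_complete() when the
 *       // ioqueue is polled; "buf" must stay valid until then.
 *   } else {
 *       // Error.
 *   }
 */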
PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
                                     pj_ioqueue_op_key_t *op_key,
                                     void *buffer,
                                     pj_ssize_t *length,
                                     unsigned flags )
{
    struct read_operation *read_op;

    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing (need to do this first before accessing
     * other variables, since they might have been destroyed. See ticket
     * #469).
     */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    read_op = (struct read_operation*)op_key;
    read_op->op = PJ_IOQUEUE_OP_NONE;

    /* Try to see if there's data immediately available.
     */
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
        pj_status_t status;
        pj_ssize_t size;

        size = *length;
        status = pj_sock_recv(key->fd, buffer, &size, flags);
        if (status == PJ_SUCCESS) {
            /* Yes! Data is available! */
            *length = size;
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
                return status;
        }
    }

    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /*
     * No data is immediately available.
     * Must schedule asynchronous operation to the ioqueue.
     */
    read_op->op = PJ_IOQUEUE_OP_RECV;
    read_op->buf = buffer;
    read_op->size = *length;
    read_op->flags = flags;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->read_list, read_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}

/*
 * pj_ioqueue_recvfrom()
 *
 * Start asynchronous recvfrom() from the socket.
 */
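/* A minimal usage sketch (illustrative; "app_key" and "app_op_key" are
 * hypothetical application objects):
 *
 *   char pkt[1500];
 *   pj_ssize_t len = sizeof(pkt);
 *   pj_sockaddr_in src;
 *   int src_len = sizeof(src);
 *   pj_status_t rc = pj_ioqueue_recvfrom(app_key, app_op_key, pkt, &len,
 *                                        0, &src, &src_len);
 *   // PJ_SUCCESS: "len" bytes and "src" are valid now.
 *   // PJ_EPENDING: on_read_complete() will be called later; pkt, src
 *   // and src_len must remain valid until then.
 */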
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key,
                                         void *buffer,
                                         pj_ssize_t *length,
                                         unsigned flags,
                                         pj_sockaddr_t *addr,
                                         int *addrlen)
{
    struct read_operation *read_op;

    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    read_op = (struct read_operation*)op_key;
    read_op->op = PJ_IOQUEUE_OP_NONE;

    /* Try to see if there's data immediately available.
     */
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
        pj_status_t status;
        pj_ssize_t size;

        size = *length;
        status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
                                  addr, addrlen);
        if (status == PJ_SUCCESS) {
            /* Yes! Data is available! */
            *length = size;
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
                return status;
        }
    }

    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /*
     * No data is immediately available.
     * Must schedule asynchronous operation to the ioqueue.
     */
    read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
    read_op->buf = buffer;
    read_op->size = *length;
    read_op->flags = flags;
    read_op->rmt_addr = addr;
    read_op->rmt_addrlen = addrlen;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->read_list, read_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}

/*
 * pj_ioqueue_send()
 *
 * Start asynchronous send() to the descriptor.
 */
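/* A minimal usage sketch (illustrative; "app_key" and "app_op_key" are
 * hypothetical). Note that, per the code below, on PJ_EPENDING the data
 * buffer is referenced (not copied) until on_write_complete() fires:
 *
 *   const char msg[] = "hello";
 *   pj_ssize_t len = sizeof(msg);
 *   pj_status_t rc = pj_ioqueue_send(app_key, app_op_key, msg, &len, 0);
 *   if (rc == PJ_SUCCESS) {
 *       // Sent immediately; "len" bytes were written.
 *   } else if (rc == PJ_EPENDING) {
 *       // "msg" must stay valid until on_write_complete() is called.
 *   }
 */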
PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
                                     pj_ioqueue_op_key_t *op_key,
                                     const void *data,
                                     pj_ssize_t *length,
                                     unsigned flags)
{
    struct write_operation *write_op;
    pj_status_t status;
    unsigned retry;
    pj_ssize_t sent;

    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write. */
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /* Fast track:
     *   Try to send data immediately, only if there's no pending write!
     * Note:
     *   We are speculating that the list is empty here without properly
     *   acquiring ioqueue's mutex first. This is intentional, to maximize
     *   performance via parallelism.
     *
     * This should be safe, because:
     *  - by convention, we require the caller to make sure that the
     *    key is not unregistered while other threads are invoking
     *    an operation on the same key.
     *  - pj_list_empty() is safe to be invoked by multiple threads,
     *    even when other threads are modifying the list.
     */
    if (pj_list_empty(&key->write_list)) {
        /*
         * See if data can be sent immediately.
         */
        sent = *length;
        status = pj_sock_send(key->fd, data, &sent, flags);
        if (status == PJ_SUCCESS) {
            /* Success! */
            *length = sent;
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * Schedule asynchronous send.
     */
    write_op = (struct write_operation*)op_key;

    /* Spin if write_op has a pending operation */
    for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
        pj_thread_sleep(0);

    /* Last chance */
    if (write_op->op) {
        /* Unable to send the packet because there is already a pending
         * write in the write_op. We could not put the operation into the
         * write_op because write_op already contains a pending operation!
         * And we could not send the packet directly with send() either,
         * because that would break the ordering of the packets. So we
         * can only return an error here.
         *
         * This could happen, for example, in a multithreaded program
         * where polling is done by one thread while the other threads do
         * only the sending. If the polling thread runs at a lower
         * priority than the sending threads, then it's possible that the
         * pending write flag is not cleared in time, because clearing is
         * only done during polling.
         *
         * Applications should specify multiple write operation keys in
         * situations like this.
         */
        //pj_assert(!"ioqueue: there is pending operation on this key!");
        return PJ_EBUSY;
    }

    write_op->op = PJ_IOQUEUE_OP_SEND;
    write_op->buf = (char*)data;
    write_op->size = *length;
    write_op->written = 0;
    write_op->flags = flags;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->write_list, write_op);
    ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}


/*
 * pj_ioqueue_sendto()
 *
 * Start asynchronous write() to the descriptor.
 */
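/* A minimal usage sketch (illustrative; "app_key", "app_op_key" and
 * "dst" are hypothetical). Per the code below, the remote address is
 * copied into the op_key, but on PJ_EPENDING the data buffer itself
 * must stay valid until on_write_complete() is called:
 *
 *   pj_ssize_t len = payload_size;
 *   pj_status_t rc = pj_ioqueue_sendto(app_key, app_op_key,
 *                                      payload, &len, 0,
 *                                      &dst, sizeof(dst));
 *   // PJ_SUCCESS: sent immediately; PJ_EPENDING: queued.
 */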
PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
                                       const void *data,
                                       pj_ssize_t *length,
                                       pj_uint32_t flags,
                                       const pj_sockaddr_t *addr,
                                       int addrlen)
{
    struct write_operation *write_op;
    unsigned retry;
    pj_status_t status;
    pj_ssize_t sent;

    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write. */
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /* Fast track:
     *   Try to send data immediately, only if there's no pending write!
     * Note:
     *   We are speculating that the list is empty here without properly
     *   acquiring ioqueue's mutex first. This is intentional, to maximize
     *   performance via parallelism.
     *
     * This should be safe, because:
     *  - by convention, we require the caller to make sure that the
     *    key is not unregistered while other threads are invoking
     *    an operation on the same key.
     *  - pj_list_empty() is safe to be invoked by multiple threads,
     *    even when other threads are modifying the list.
     */
    if (pj_list_empty(&key->write_list)) {
        /*
         * See if data can be sent immediately.
         */
        sent = *length;
        status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
        if (status == PJ_SUCCESS) {
            /* Success! */
            *length = sent;
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * Check that address storage can hold the address parameter.
     */
    PJ_ASSERT_RETURN(addrlen <= (int)sizeof(pj_sockaddr_in), PJ_EBUG);

    /*
     * Schedule asynchronous send.
     */
    write_op = (struct write_operation*)op_key;

    /* Spin if write_op has a pending operation */
    for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
        pj_thread_sleep(0);

    /* Last chance */
    if (write_op->op) {
        /* Unable to send the packet because there is already a pending
         * write on the write_op. We could not put the operation into the
         * write_op because write_op already contains a pending operation!
         * And we could not send the packet directly with sendto() either,
         * because that would break the ordering of the packets. So we
         * can only return an error here.
         *
         * This could happen, for example, in a multithreaded program
         * where polling is done by one thread while the other threads do
         * only the sending. If the polling thread runs at a lower
         * priority than the sending threads, then it's possible that the
         * pending write flag is not cleared in time, because clearing is
         * only done during polling.
         *
         * Applications should specify multiple write operation keys in
         * situations like this.
         */
        //pj_assert(!"ioqueue: there is pending operation on this key!");
        return PJ_EBUSY;
    }

    write_op->op = PJ_IOQUEUE_OP_SEND_TO;
    write_op->buf = (char*)data;
    write_op->size = *length;
    write_op->written = 0;
    write_op->flags = flags;
    pj_memcpy(&write_op->rmt_addr, addr, addrlen);
    write_op->rmt_addrlen = addrlen;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->write_list, write_op);
    ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}

#if PJ_HAS_TCP
/*
 * Initiate overlapped accept() operation.
 */
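/* A minimal usage sketch (illustrative; "listen_key" and "app_op_key"
 * are hypothetical; new_sock, remote and addr_len must stay valid while
 * the operation is pending):
 *
 *   pj_sock_t new_sock;
 *   pj_sockaddr_in remote;
 *   int addr_len = sizeof(remote);
 *   pj_status_t rc = pj_ioqueue_accept(listen_key, app_op_key, &new_sock,
 *                                      NULL, &remote, &addr_len);
 *   // PJ_SUCCESS: new_sock is connected now.
 *   // PJ_EPENDING: on_accept_complete() delivers the new socket later.
 */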
PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
                                       pj_sock_t *new_sock,
                                       pj_sockaddr_t *local,
                                       pj_sockaddr_t *remote,
                                       int *addrlen)
{
    struct accept_operation *accept_op;
    pj_status_t status;

    /* Check parameters. All must be specified! */
    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    accept_op = (struct accept_operation*)op_key;
    accept_op->op = PJ_IOQUEUE_OP_NONE;

    /* Fast track:
     *  See if there's a new connection available immediately.
     */
    if (pj_list_empty(&key->accept_list)) {
        status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
        if (status == PJ_SUCCESS) {
            /* Yes! New connection is available! */
            if (local && addrlen) {
                status = pj_sock_getsockname(*new_sock, local, addrlen);
                if (status != PJ_SUCCESS) {
                    pj_sock_close(*new_sock);
                    *new_sock = PJ_INVALID_SOCKET;
                    return status;
                }
            }
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * No connection is available immediately.
     * Schedule the accept() operation to be completed when there is an
     * incoming connection available.
     */
    accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
    accept_op->accept_fd = new_sock;
    accept_op->rmt_addr = remote;
    accept_op->addrlen = addrlen;
    accept_op->local_addr = local;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->accept_list, accept_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}

/*
 * Initiate overlapped connect() operation (well, it's non-blocking
 * actually, since there's no overlapped version of connect()).
 */
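/* A minimal usage sketch (illustrative; "key" and "addr" are assumed to
 * be a registered TCP socket key and a filled pj_sockaddr_in):
 *
 *   pj_status_t rc = pj_ioqueue_connect(key, &addr, sizeof(addr));
 *   // PJ_SUCCESS: connected immediately.
 *   // PJ_EPENDING: completion (success or failure) is reported via
 *   // on_connect_complete().
 */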
PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
                                        const pj_sockaddr_t *addr,
                                        int addrlen )
{
    pj_status_t status;

    /* Check parameters. All must be specified! */
    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    /* Check that the socket has not been marked as connecting. */
    if (key->connecting != 0)
        return PJ_EPENDING;

    status = pj_sock_connect(key->fd, addr, addrlen);
    if (status == PJ_SUCCESS) {
        /* Connected! */
        return PJ_SUCCESS;
    } else {
        if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
            /* Pending! */
            pj_mutex_lock(key->mutex);
            key->connecting = PJ_TRUE;
            ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
            ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT);
            pj_mutex_unlock(key->mutex);
            return PJ_EPENDING;
        } else {
            /* Error! */
            return status;
        }
    }
}
#endif  /* PJ_HAS_TCP */


PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
                                     pj_size_t size )
{
    pj_bzero(op_key, size);
}


/*
 * pj_ioqueue_is_pending()
 */
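/* For example (illustrative), an application can use this to safely
 * reuse an op_key:
 *
 *   if (!pj_ioqueue_is_pending(key, op_key))
 *       rc = pj_ioqueue_send(key, op_key, data, &len, 0);
 */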
PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key )
{
    struct generic_operation *op_rec;

    PJ_UNUSED_ARG(key);

    op_rec = (struct generic_operation*)op_key;
    return op_rec->op != 0;
}


/*
 * pj_ioqueue_post_completion()
 */
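/* A sketch of a typical use (illustrative): force-complete a pending
 * operation that the application wants to abort, so that the normal
 * callback fires. Following the convention used elsewhere in this file
 * (negative bytes_status encodes a negated pj_status_t):
 *
 *   pj_ioqueue_post_completion(key, op_key, -PJ_ECANCELLED);
 *   // Returns PJ_SUCCESS if the op_key was found and its callback was
 *   // invoked, PJ_EINVALIDOP otherwise.
 */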
PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
                                                pj_ioqueue_op_key_t *op_key,
                                                pj_ssize_t bytes_status )
{
    struct generic_operation *op_rec;

    /*
     * Find the operation key in all pending operation lists to
     * really make sure that it's still there; then call the callback.
     */
    pj_mutex_lock(key->mutex);

    /* Find the operation in the pending read list. */
    op_rec = (struct generic_operation*)key->read_list.next;
    while (op_rec != (void*)&key->read_list) {
        if (op_rec == (void*)op_key) {
            pj_list_erase(op_rec);
            op_rec->op = PJ_IOQUEUE_OP_NONE;
            pj_mutex_unlock(key->mutex);

            (*key->cb.on_read_complete)(key, op_key, bytes_status);
            return PJ_SUCCESS;
        }
        op_rec = op_rec->next;
    }

    /* Find the operation in the pending write list. */
    op_rec = (struct generic_operation*)key->write_list.next;
    while (op_rec != (void*)&key->write_list) {
        if (op_rec == (void*)op_key) {
            pj_list_erase(op_rec);
            op_rec->op = PJ_IOQUEUE_OP_NONE;
            pj_mutex_unlock(key->mutex);

            (*key->cb.on_write_complete)(key, op_key, bytes_status);
            return PJ_SUCCESS;
        }
        op_rec = op_rec->next;
    }

    /* Find the operation in the pending accept list. */
    op_rec = (struct generic_operation*)key->accept_list.next;
    while (op_rec != (void*)&key->accept_list) {
        if (op_rec == (void*)op_key) {
            pj_list_erase(op_rec);
            op_rec->op = PJ_IOQUEUE_OP_NONE;
            pj_mutex_unlock(key->mutex);

            (*key->cb.on_accept_complete)(key, op_key,
                                          PJ_INVALID_SOCKET,
                                          bytes_status);
            return PJ_SUCCESS;
        }
        op_rec = op_rec->next;
    }

    pj_mutex_unlock(key->mutex);

    return PJ_EINVALIDOP;
}

PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency( pj_ioqueue_t *ioqueue,
                                                        pj_bool_t allow)
{
    PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
    ioqueue->default_concurrency = allow;
    return PJ_SUCCESS;
}

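/* A minimal sketch of disabling per-key concurrency (illustrative;
 * requires PJ_IOQUEUE_HAS_SAFE_UNREG, per the assertion below):
 *
 *   // Callbacks for this key will now be serialized: the key's mutex
 *   // is held while the callback runs.
 *   pj_ioqueue_set_concurrency(key, PJ_FALSE);
 */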
PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
                                               pj_bool_t allow)
{
    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
     * disabled.
     */
    PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);

    key->allow_concurrent = allow;
    return PJ_SUCCESS;
}

PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
{
    return pj_mutex_lock(key->mutex);
}

PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
{
    return pj_mutex_unlock(key->mutex);
}