/* $Id$
 */
/*
 * sock_select.c
 *
 * This is the implementation of IOQueue using pj_sock_select().
 * It runs anywhere pj_sock_select() is available (currently
 * Win32, Linux, Linux kernel, etc.).
 */

#include <pj/ioqueue.h>
#include <pj/os.h>
#include <pj/lock.h>
#include <pj/log.h>
#include <pj/list.h>
#include <pj/pool.h>
#include <pj/string.h>
#include <pj/assert.h>
#include <pj/sock.h>
#include <pj/compat/socket.h>
#include <pj/sock_select.h>
#include <pj/errno.h>

/*
 * ISSUES with ioqueue_select()
 *
 * EAGAIN/EWOULDBLOCK error in recv():
 *  - when multiple threads are working with the ioqueue, the application
 *    may receive EAGAIN or EWOULDBLOCK in the receive callback.
 *    This error happens because more than one thread is watching the
 *    same descriptor set, so when all of them call recv() or recvfrom()
 *    simultaneously, only one will succeed and the rest will get the
 *    error.
 *
 */
#define THIS_FILE   "ioq_select"
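
/*
 * Illustrative sketch (not part of the build): one way an application
 * callback can tolerate the spurious EAGAIN/EWOULDBLOCK described above
 * is simply to resubmit the read. Everything below (on_app_read, the
 * static buffer) is hypothetical application code, not pjlib API.
 */
#if 0
static char app_buf[512];

static void on_app_read(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key,
                        pj_ssize_t bytes_read)
{
    if (bytes_read < 0) {
        /* The error is reported as a negated pj_status_t; a spurious
         * EAGAIN/EWOULDBLOCK just means another thread won the race.
         */
        pj_status_t status = (pj_status_t)-bytes_read;
        PJ_UNUSED_ARG(status);
    } else {
        /* Process app_buf[0 .. bytes_read-1] here. */
    }

    /* In either case, schedule the next read with the same op_key. */
    bytes_read = sizeof(app_buf);
    pj_ioqueue_recv(key, op_key, app_buf, &bytes_read, 0);
}
#endif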

/*
 * The select ioqueue relies on the socket functions (pj_sock_xxx()) to
 * return the correct error code.
 */
#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
#   error "Error reporting must be enabled for this function to work!"
#endif

/**
 * Get the number of descriptors in the set. This is defined in
 * sock_select.c.
 * This function will only return the number of sockets set by PJ_FD_SET
 * operations. When the set is modified by other means (such as by
 * select()), the count will not be reflected here.
 *
 * That's why we don't export this function in the header file, to avoid
 * misunderstanding.
 *
 * @param fdsetp    The descriptor set.
 *
 * @return          Number of descriptors in the set.
 */
PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);



/*
 * In debug builds, VALIDATE_FD_SET is set.
 * This will check the validity of the fd_sets.
 */
#if defined(PJ_DEBUG) && PJ_DEBUG != 0
#  define VALIDATE_FD_SET   1
#else
#  define VALIDATE_FD_SET   0
#endif

struct generic_operation
{
    PJ_DECL_LIST_MEMBER(struct generic_operation);
    pj_ioqueue_operation_e  op;
};

struct read_operation
{
    PJ_DECL_LIST_MEMBER(struct read_operation);
    pj_ioqueue_operation_e  op;

    void                   *buf;
    pj_size_t               size;
    unsigned                flags;
    pj_sockaddr_t          *rmt_addr;
    int                    *rmt_addrlen;
};

struct write_operation
{
    PJ_DECL_LIST_MEMBER(struct write_operation);
    pj_ioqueue_operation_e  op;

    char                   *buf;
    pj_size_t               size;
    pj_ssize_t              written;
    unsigned                flags;
    pj_sockaddr_in          rmt_addr;
    int                     rmt_addrlen;
};

#if PJ_HAS_TCP
struct accept_operation
{
    PJ_DECL_LIST_MEMBER(struct accept_operation);
    pj_ioqueue_operation_e  op;

    pj_sock_t              *accept_fd;
    pj_sockaddr_t          *local_addr;
    pj_sockaddr_t          *rmt_addr;
    int                    *addrlen;
};
#endif

union operation_key
{
    struct generic_operation generic;
    struct read_operation    read;
    struct write_operation   write;
#if PJ_HAS_TCP
    struct accept_operation  accept;
#endif
};

/*
 * This describes each key.
 */
struct pj_ioqueue_key_t
{
    PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);
    pj_ioqueue_t           *ioqueue;
    pj_sock_t               fd;
    void                   *user_data;
    pj_ioqueue_callback     cb;
    int                     connecting;
    struct read_operation   read_list;
    struct write_operation  write_list;
#if PJ_HAS_TCP
    struct accept_operation accept_list;
#endif
};

/*
 * This describes the I/O queue itself.
 */
struct pj_ioqueue_t
{
    pj_lock_t          *lock;
    pj_bool_t           auto_delete_lock;
    unsigned            max, count;
    pj_ioqueue_key_t    key_list;
    pj_fd_set_t         rfdset;
    pj_fd_set_t         wfdset;
#if PJ_HAS_TCP
    pj_fd_set_t         xfdset;
#endif
};

/*
 * pj_ioqueue_create()
 *
 * Create select ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
                                       pj_size_t max_fd,
                                       pj_ioqueue_t **p_ioqueue)
{
    pj_ioqueue_t *ioqueue;
    pj_status_t rc;

    /* Check that arguments are valid. */
    PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
                     max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES,
                     PJ_EINVAL);

    /* Check that size of pj_ioqueue_op_key_t is sufficient */
    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
                     sizeof(union operation_key), PJ_EBUG);

    ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
    ioqueue->max = max_fd;
    ioqueue->count = 0;
    PJ_FD_ZERO(&ioqueue->rfdset);
    PJ_FD_ZERO(&ioqueue->wfdset);
#if PJ_HAS_TCP
    PJ_FD_ZERO(&ioqueue->xfdset);
#endif
    pj_list_init(&ioqueue->key_list);

    rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioqueue->lock);
    if (rc != PJ_SUCCESS)
        return rc;

    ioqueue->auto_delete_lock = PJ_TRUE;

    PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue));

    *p_ioqueue = ioqueue;
    return PJ_SUCCESS;
}
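
/*
 * Illustrative sketch (not part of the build): typical creation and
 * destruction sequence. The surrounding pool setup is assumed to be
 * done by the (hypothetical) application.
 */
#if 0
static pj_status_t app_ioqueue_lifetime(pj_pool_t *pool)
{
    pj_ioqueue_t *ioq;
    pj_status_t rc;

    /* Up to PJ_IOQUEUE_MAX_HANDLES descriptors may be registered. */
    rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioq);
    if (rc != PJ_SUCCESS)
        return rc;

    /* ... register sockets, poll, etc. ... */

    return pj_ioqueue_destroy(ioq);
}
#endif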

/*
 * pj_ioqueue_destroy()
 *
 * Destroy ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
    pj_status_t rc = PJ_SUCCESS;

    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);

    pj_lock_acquire(ioqueue->lock);

    if (ioqueue->auto_delete_lock)
        rc = pj_lock_destroy(ioqueue->lock);

    return rc;
}


/*
 * pj_ioqueue_register_sock()
 *
 * Register a handle to ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
                                              pj_ioqueue_t *ioqueue,
                                              pj_sock_t sock,
                                              void *user_data,
                                              const pj_ioqueue_callback *cb,
                                              pj_ioqueue_key_t **p_key)
{
    pj_ioqueue_key_t *key = NULL;
    pj_uint32_t value;
    pj_status_t rc = PJ_SUCCESS;

    PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
                     cb && p_key, PJ_EINVAL);

    pj_lock_acquire(ioqueue->lock);

    if (ioqueue->count >= ioqueue->max) {
        rc = PJ_ETOOMANY;
        goto on_return;
    }

    /* Set socket to nonblocking. */
    value = 1;
#ifdef PJ_WIN32
    if (ioctlsocket(sock, FIONBIO, (u_long*)&value)) {
#else
    if (ioctl(sock, FIONBIO, &value)) {
#endif
        rc = pj_get_netos_error();
        goto on_return;
    }

    /* Create key. */
    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
    key->ioqueue = ioqueue;
    key->fd = sock;
    key->user_data = user_data;
    pj_list_init(&key->read_list);
    pj_list_init(&key->write_list);
#if PJ_HAS_TCP
    pj_list_init(&key->accept_list);
#endif

    /* Save callback. */
    pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));

    /* Register */
    pj_list_insert_before(&ioqueue->key_list, key);
    ++ioqueue->count;

on_return:
    /* On error, socket may be left in non-blocking mode. */
    *p_key = key;
    pj_lock_release(ioqueue->lock);

    return rc;
}
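
/*
 * Illustrative sketch (not part of the build): registering a UDP socket.
 * on_app_read is the hypothetical callback from the sketch near the top
 * of this file; callbacks that are not needed may be left NULL.
 */
#if 0
static pj_status_t app_register(pj_pool_t *pool, pj_ioqueue_t *ioq,
                                pj_sock_t sock, pj_ioqueue_key_t **p_key)
{
    pj_ioqueue_callback cb;

    cb.on_read_complete    = &on_app_read;
    cb.on_write_complete   = NULL;
    cb.on_accept_complete  = NULL;
    cb.on_connect_complete = NULL;

    /* On success the socket is switched to non-blocking mode. */
    return pj_ioqueue_register_sock(pool, ioq, sock, NULL, &cb, p_key);
}
#endif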

/*
 * pj_ioqueue_unregister()
 *
 * Unregister handle from ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
{
    pj_ioqueue_t *ioqueue;

    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    ioqueue = key->ioqueue;

    pj_lock_acquire(ioqueue->lock);

    pj_assert(ioqueue->count > 0);
    --ioqueue->count;
    pj_list_erase(key);
    PJ_FD_CLR(key->fd, &ioqueue->rfdset);
    PJ_FD_CLR(key->fd, &ioqueue->wfdset);
#if PJ_HAS_TCP
    PJ_FD_CLR(key->fd, &ioqueue->xfdset);
#endif

    pj_lock_release(ioqueue->lock);
    return PJ_SUCCESS;
}

/*
 * pj_ioqueue_get_user_data()
 *
 * Obtain value associated with a key.
 */
PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
{
    PJ_ASSERT_RETURN(key != NULL, NULL);
    return key->user_data;
}


/*
 * pj_ioqueue_set_user_data()
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
                                              void *user_data,
                                              void **old_data)
{
    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    if (old_data)
        *old_data = key->user_data;
    key->user_data = user_data;

    return PJ_SUCCESS;
}


/* This is supposed to check whether the fd_set values are consistent
 * with the operations currently set in each key.
 */
#if VALIDATE_FD_SET
static void validate_sets(const pj_ioqueue_t *ioqueue,
                          const pj_fd_set_t *rfdset,
                          const pj_fd_set_t *wfdset,
                          const pj_fd_set_t *xfdset)
{
    pj_ioqueue_key_t *key;

    key = ioqueue->key_list.next;
    while (key != &ioqueue->key_list) {
        if (!pj_list_empty(&key->read_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
            || !pj_list_empty(&key->accept_list)
#endif
            )
        {
            pj_assert(PJ_FD_ISSET(key->fd, rfdset));
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0);
        }
        if (!pj_list_empty(&key->write_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
            || key->connecting
#endif
            )
        {
            pj_assert(PJ_FD_ISSET(key->fd, wfdset));
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0);
        }
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
        if (key->connecting)
        {
            pj_assert(PJ_FD_ISSET(key->fd, xfdset));
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, xfdset) == 0);
        }
#endif /* PJ_HAS_TCP */

        key = key->next;
    }
}
#endif /* VALIDATE_FD_SET */


/*
 * pj_ioqueue_poll()
 *
 * A few things worth noting:
 *
 * - we used to call only one callback per poll, but it didn't work
 *   very well. The reason is that in some situations the write
 *   callback gets called all the time, and the read callback never
 *   gets a chance to run. This happens, for example, when the user
 *   submits a write operation inside the write callback.
 *   As a result, we changed the behaviour so that multiple callbacks
 *   may now be called in a single poll. It should be fast too; we
 *   just need to be careful with the ioqueue data structures.
 *
 * - to guarantee preemptiveness etc., the poll function must strictly
 *   work on an fd_set copy of the ioqueue (not the original one).
 */
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
    pj_fd_set_t rfdset, wfdset, xfdset;
    int count;
    pj_ioqueue_key_t *h;

    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);

    /* Lock ioqueue before making fd_set copies */
    pj_lock_acquire(ioqueue->lock);

    /* We will only do select() when there are sockets to be polled.
     * Otherwise select() will return an error.
     */
    if (PJ_FD_COUNT(&ioqueue->rfdset)==0 &&
        PJ_FD_COUNT(&ioqueue->wfdset)==0
#if PJ_HAS_TCP
        && PJ_FD_COUNT(&ioqueue->xfdset)==0
#endif
        )
    {
        pj_lock_release(ioqueue->lock);
        if (timeout)
            pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout));
        return 0;
    }

    /* Copy ioqueue's pj_fd_set_t to local variables. */
    pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t));
    pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t));
#if PJ_HAS_TCP
    pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t));
#else
    PJ_FD_ZERO(&xfdset);
#endif

#if VALIDATE_FD_SET
    validate_sets(ioqueue, &rfdset, &wfdset, &xfdset);
#endif

    /* Unlock ioqueue before select(). */
    pj_lock_release(ioqueue->lock);

    count = pj_sock_select(FD_SETSIZE, &rfdset, &wfdset, &xfdset, timeout);

    if (count <= 0)
        return count;

    /* Lock ioqueue again before scanning for signalled sockets.
     * We must strictly use a recursive mutex, since the application may
     * invoke the ioqueue again from inside the callback.
     */
    pj_lock_acquire(ioqueue->lock);

    /* Scan for writable sockets first to handle piggy-backed data
     * coming with accept().
     */
    h = ioqueue->key_list.next;
do_writable_scan:
    for ( ; h!=&ioqueue->key_list; h = h->next) {
        if ( (!pj_list_empty(&h->write_list) || h->connecting)
             && PJ_FD_ISSET(h->fd, &wfdset))
        {
            break;
        }
    }
    if (h != &ioqueue->key_list) {
        pj_assert(!pj_list_empty(&h->write_list) || h->connecting);

#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
        if (h->connecting) {
            /* Completion of connect() operation */
            pj_ssize_t bytes_transfered;

#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
            /* from connect(2):
             * On Linux, use getsockopt to read the SO_ERROR option at
             * level SOL_SOCKET to determine whether connect() completed
             * successfully (if SO_ERROR is zero).
             */
            int value;
            socklen_t vallen = sizeof(value);
            int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
                                   &value, &vallen);
            if (gs_rc != 0) {
                /* Argh!! What to do now???
                 * Just indicate that the socket is connected. The
                 * application will get an error as soon as it tries to
                 * use the socket to send/receive.
                 */
                bytes_transfered = 0;
            } else {
                bytes_transfered = value;
            }
#elif defined(PJ_WIN32) && PJ_WIN32!=0
            bytes_transfered = 0; /* success */
#else
            /* Excellent information on D.J. Bernstein's page:
             * http://cr.yp.to/docs/connect.html
             *
             * It seems the most portable way of detecting connect()
             * failure is to call getpeername(). If the socket is
             * connected, getpeername() will return 0. If the socket is
             * not connected, it will return ENOTCONN, and
             * read(fd, &ch, 1) will produce the right errno through
             * error slippage. This is a combination of suggestions from
             * Douglas C. Schmidt and Ken Keys.
             */
            int gp_rc;
            struct sockaddr_in addr;
            socklen_t addrlen = sizeof(addr);

            gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
            bytes_transfered = gp_rc;
#endif

            /* Clear operation. */
            h->connecting = 0;
            PJ_FD_CLR(h->fd, &ioqueue->wfdset);
            PJ_FD_CLR(h->fd, &ioqueue->xfdset);

            /* Call callback. */
            if (h->cb.on_connect_complete)
                (*h->cb.on_connect_complete)(h, bytes_transfered);

            /* Re-scan writable sockets. */
            goto do_writable_scan;

        } else
#endif /* PJ_HAS_TCP */
        {
            /* Socket is writable. */
            struct write_operation *write_op;
            pj_ssize_t sent;
            pj_status_t send_rc;

            /* Get the first in the queue. */
            write_op = h->write_list.next;

            /* Send the data. */
            sent = write_op->size - write_op->written;
            if (write_op->op == PJ_IOQUEUE_OP_SEND) {
                send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
                                       &sent, write_op->flags);
            } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
                send_rc = pj_sock_sendto(h->fd,
                                         write_op->buf+write_op->written,
                                         &sent, write_op->flags,
                                         &write_op->rmt_addr,
                                         write_op->rmt_addrlen);
            } else {
                pj_assert(!"Invalid operation type!");
                send_rc = PJ_EBUG;
            }

            if (send_rc == PJ_SUCCESS) {
                write_op->written += sent;
            } else {
                pj_assert(send_rc > 0);
                write_op->written = -send_rc;
            }

            /* In any case we don't need to process this descriptor again. */
            PJ_FD_CLR(h->fd, &wfdset);

            /* Are we finished with this buffer? */
            if (send_rc!=PJ_SUCCESS ||
                write_op->written == (pj_ssize_t)write_op->size)
            {
                pj_list_erase(write_op);

                /* Clear operation if there's no more data to send. */
                if (pj_list_empty(&h->write_list))
                    PJ_FD_CLR(h->fd, &ioqueue->wfdset);

                /* Call callback. */
                if (h->cb.on_write_complete) {
                    (*h->cb.on_write_complete)(h,
                                               (pj_ioqueue_op_key_t*)write_op,
                                               write_op->written);
                }
            }

            /* Re-scan writable sockets. */
            goto do_writable_scan;
        }
    }

    /* Scan for readable sockets. */
    h = ioqueue->key_list.next;
do_readable_scan:
    for ( ; h!=&ioqueue->key_list; h = h->next) {
        if ((!pj_list_empty(&h->read_list)
#if PJ_HAS_TCP
             || !pj_list_empty(&h->accept_list)
#endif
            ) && PJ_FD_ISSET(h->fd, &rfdset))
        {
            break;
        }
    }
    if (h != &ioqueue->key_list) {
        pj_status_t rc;

#if PJ_HAS_TCP
        pj_assert(!pj_list_empty(&h->read_list) ||
                  !pj_list_empty(&h->accept_list));
#else
        pj_assert(!pj_list_empty(&h->read_list));
#endif

#   if PJ_HAS_TCP
        if (!pj_list_empty(&h->accept_list)) {

            struct accept_operation *accept_op;

            /* Get one accept operation from the list. */
            accept_op = h->accept_list.next;
            pj_list_erase(accept_op);

            rc = pj_sock_accept(h->fd, accept_op->accept_fd,
                                accept_op->rmt_addr, accept_op->addrlen);
            if (rc==PJ_SUCCESS && accept_op->local_addr) {
                rc = pj_sock_getsockname(*accept_op->accept_fd,
                                         accept_op->local_addr,
                                         accept_op->addrlen);
            }

            /* Clear bit in fdset if there is no more pending accept */
            if (pj_list_empty(&h->accept_list))
                PJ_FD_CLR(h->fd, &ioqueue->rfdset);

            /* Call callback. */
            if (h->cb.on_accept_complete)
                (*h->cb.on_accept_complete)(h,
                                            (pj_ioqueue_op_key_t*)accept_op,
                                            *accept_op->accept_fd, rc);

            /* Re-scan readable sockets. */
            goto do_readable_scan;
        }
        else {
#   endif
            struct read_operation *read_op;
            pj_ssize_t bytes_read;

            pj_assert(!pj_list_empty(&h->read_list));

            /* Get one pending read operation from the list. */
            read_op = h->read_list.next;
            pj_list_erase(read_op);

            bytes_read = read_op->size;

            if (read_op->op == PJ_IOQUEUE_OP_RECV_FROM) {
                rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0,
                                      read_op->rmt_addr,
                                      read_op->rmt_addrlen);
            } else if (read_op->op == PJ_IOQUEUE_OP_RECV) {
                rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
            } else {
                pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
                /*
                 * User has specified pj_ioqueue_read().
                 * On Win32, we should do ReadFile(). But because we got
                 * here because of select() anyway, the user must have put
                 * a socket descriptor in h->fd, in which case we can just
                 * call pj_sock_recv() instead of ReadFile().
                 * On Unix, the user may put a file in h->fd, so we'll
                 * have to call read() here.
                 * This may not compile on systems that don't have read();
                 * that's why the fallback below is restricted to platforms
                 * with unistd.h, so that the error is easier to catch.
                 */
#   if defined(PJ_WIN32) && PJ_WIN32 != 0
                rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
                //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
                //              &bytes_read, NULL);
#   elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
                bytes_read = read(h->fd, read_op->buf, bytes_read);
                rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
#   elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
                bytes_read = sys_read(h->fd, read_op->buf, bytes_read);
                rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
#   else
#       error "Implement read() for this platform!"
#   endif
            }

            if (rc != PJ_SUCCESS) {
#   if defined(PJ_WIN32) && PJ_WIN32 != 0
                /* On Win32, for UDP, WSAECONNRESET on the receive side
                 * indicates that previous sending has triggered an ICMP
                 * Port Unreachable message.
                 * But we wouldn't know at this point which one of the
                 * previous keys has triggered the error, since UDP sockets
                 * can be shared!
                 * So we'll just ignore it!
                 */

                if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
                    //PJ_LOG(4,(THIS_FILE,
                    //          "Ignored ICMP port unreach. on key=%p", h));
                }
#   endif

                /* In any case we would report this to the caller. */
                bytes_read = -rc;
            }

            /* Clear fdset if there is no pending read. */
            if (pj_list_empty(&h->read_list))
                PJ_FD_CLR(h->fd, &ioqueue->rfdset);

            /* In any case clear from temporary set. */
            PJ_FD_CLR(h->fd, &rfdset);

            /* Call callback. */
            if (h->cb.on_read_complete)
                (*h->cb.on_read_complete)(h, (pj_ioqueue_op_key_t*)read_op,
                                          bytes_read);

            /* Re-scan readable sockets. */
            goto do_readable_scan;

#   if PJ_HAS_TCP
        }
#   endif
    }

#if PJ_HAS_TCP
    /* Scan the exception set for TCP connection errors. */
    h = ioqueue->key_list.next;
do_except_scan:
    for ( ; h!=&ioqueue->key_list; h = h->next) {
        if (h->connecting && PJ_FD_ISSET(h->fd, &xfdset))
            break;
    }
    if (h != &ioqueue->key_list) {

        pj_assert(h->connecting);

        /* Clear operation. */
        h->connecting = 0;
        PJ_FD_CLR(h->fd, &ioqueue->wfdset);
        PJ_FD_CLR(h->fd, &ioqueue->xfdset);
        PJ_FD_CLR(h->fd, &wfdset);
        PJ_FD_CLR(h->fd, &xfdset);

        /* Call callback. */
        if (h->cb.on_connect_complete)
            (*h->cb.on_connect_complete)(h, -1);

        /* Re-scan exception list. */
        goto do_except_scan;
    }
#endif /* PJ_HAS_TCP */

    /* Shouldn't happen. */
    /* For some strange reason, on WinXP select() can return 1 while there
     * is no pj_fd_set_t signaled. */
    /* pj_assert(0); */

    //count = 0;

    pj_lock_release(ioqueue->lock);
    return count;
}
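
/*
 * Illustrative sketch (not part of the build): a worker thread would
 * typically poll in a loop with a small timeout so it can notice a
 * shutdown request. The quit flag is hypothetical application state;
 * pj_time_val members sec/msec are as declared in <pj/types.h>.
 */
#if 0
static pj_bool_t app_quitting;

static int app_worker_thread(void *arg)
{
    pj_ioqueue_t *ioq = (pj_ioqueue_t*)arg;

    while (!app_quitting) {
        pj_time_val timeout;

        timeout.sec = 0;
        timeout.msec = 100;

        /* Dispatches zero or more completion callbacks; returns the
         * select() result (zero on timeout, negative on error).
         */
        pj_ioqueue_poll(ioq, &timeout);
    }
    return 0;
}
#endif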

/*
 * pj_ioqueue_recv()
 *
 * Start asynchronous recv() from the socket.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
                                     pj_ioqueue_op_key_t *op_key,
                                     void *buffer,
                                     pj_ssize_t *length,
                                     unsigned flags )
{
    pj_status_t status;
    pj_ssize_t size;
    struct read_operation *read_op;
    pj_ioqueue_t *ioqueue;

    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Try to see if there's data immediately available.
     */
    size = *length;
    status = pj_sock_recv(key->fd, buffer, &size, flags);
    if (status == PJ_SUCCESS) {
        /* Yes! Data is available! */
        *length = size;
        return PJ_SUCCESS;
    } else {
        /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
         * the error to caller.
         */
        if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
            return status;
    }

    /*
     * No data is immediately available.
     * Must schedule asynchronous operation to the ioqueue.
     */
    ioqueue = key->ioqueue;
    pj_lock_acquire(ioqueue->lock);

    read_op = (struct read_operation*)op_key;

    read_op->op = PJ_IOQUEUE_OP_RECV;
    read_op->buf = buffer;
    read_op->size = *length;
    read_op->flags = flags;

    pj_list_insert_before(&key->read_list, read_op);
    PJ_FD_SET(key->fd, &ioqueue->rfdset);

    pj_lock_release(ioqueue->lock);
    return PJ_EPENDING;
}
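
/*
 * Illustrative sketch (not part of the build): starting an asynchronous
 * receive. PJ_SUCCESS means the data is already in the buffer and no
 * callback will follow; PJ_EPENDING means completion will be reported
 * through on_read_complete from a future pj_ioqueue_poll(). The buffer
 * and op_key must stay valid until the operation completes.
 */
#if 0
static void app_start_recv(pj_ioqueue_key_t *key,
                           pj_ioqueue_op_key_t *op_key,
                           char *buf, pj_size_t bufsize)
{
    pj_ssize_t len = bufsize;
    pj_status_t rc = pj_ioqueue_recv(key, op_key, buf, &len, 0);

    if (rc == PJ_SUCCESS) {
        /* Completed synchronously; 'len' bytes are in 'buf'. */
    } else if (rc != PJ_EPENDING) {
        /* Immediate error; no callback will be invoked. */
    }
}
#endif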

/*
 * pj_ioqueue_recvfrom()
 *
 * Start asynchronous recvfrom() from the socket.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key,
                                         void *buffer,
                                         pj_ssize_t *length,
                                         unsigned flags,
                                         pj_sockaddr_t *addr,
                                         int *addrlen)
{
    pj_status_t status;
    pj_ssize_t size;
    struct read_operation *read_op;
    pj_ioqueue_t *ioqueue;

    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Try to see if there's data immediately available.
     */
    size = *length;
    status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
                              addr, addrlen);
    if (status == PJ_SUCCESS) {
        /* Yes! Data is available! */
        *length = size;
        return PJ_SUCCESS;
    } else {
        /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
         * the error to caller.
         */
        if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
            return status;
    }

    /*
     * No data is immediately available.
     * Must schedule asynchronous operation to the ioqueue.
     */
    ioqueue = key->ioqueue;
    pj_lock_acquire(ioqueue->lock);

    read_op = (struct read_operation*)op_key;

    read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
    read_op->buf = buffer;
    read_op->size = *length;
    read_op->flags = flags;
    read_op->rmt_addr = addr;
    read_op->rmt_addrlen = addrlen;

    pj_list_insert_before(&key->read_list, read_op);
    PJ_FD_SET(key->fd, &ioqueue->rfdset);

    pj_lock_release(ioqueue->lock);
    return PJ_EPENDING;
}

/*
 * pj_ioqueue_send()
 *
 * Start asynchronous send() to the descriptor.
 */
PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
                                     pj_ioqueue_op_key_t *op_key,
                                     const void *data,
                                     pj_ssize_t *length,
                                     unsigned flags)
{
    pj_ioqueue_t *ioqueue;
    struct write_operation *write_op;
    pj_status_t status;
    pj_ssize_t sent;

    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Fast track:
     *   Try to send data immediately, only if there's no pending write!
     * Note:
     *  We are speculating that the list is empty here without properly
     *  acquiring ioqueue's mutex first. This is intentional, to maximize
     *  performance via parallelism.
     *
     *  This should be safe, because:
     *   - by convention, we require the caller to make sure that the
     *     key is not unregistered while other threads are invoking
     *     an operation on the same key.
     *   - pj_list_empty() is safe to be invoked by multiple threads,
     *     even when other threads are modifying the list.
     */
    if (pj_list_empty(&key->write_list)) {
        /*
         * See if data can be sent immediately.
         */
        sent = *length;
        status = pj_sock_send(key->fd, data, &sent, flags);
        if (status == PJ_SUCCESS) {
            /* Success! */
            *length = sent;
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * Schedule asynchronous send.
     */
    ioqueue = key->ioqueue;
    pj_lock_acquire(ioqueue->lock);

    write_op = (struct write_operation*)op_key;
    write_op->op = PJ_IOQUEUE_OP_SEND;
    write_op->buf = (char*)data;    /* poll() sends from this buffer, so it
                                     * must stay valid until completion. */
    write_op->size = *length;
    write_op->written = 0;
    write_op->flags = flags;

    pj_list_insert_before(&key->write_list, write_op);
    PJ_FD_SET(key->fd, &ioqueue->wfdset);

    pj_lock_release(ioqueue->lock);

    return PJ_EPENDING;
}
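
/*
 * Illustrative sketch (not part of the build): queueing data for send.
 * Because completion may be asynchronous, the data buffer and op_key
 * must remain valid until on_write_complete reports the result.
 */
#if 0
static pj_status_t app_send(pj_ioqueue_key_t *key,
                            pj_ioqueue_op_key_t *op_key,
                            const char *data, pj_size_t size)
{
    pj_ssize_t len = size;
    pj_status_t rc = pj_ioqueue_send(key, op_key, data, &len, 0);

    /* PJ_SUCCESS: all data was sent immediately ('len' bytes).
     * PJ_EPENDING: queued; poll() keeps sending from the same buffer
     * until everything is written, then calls on_write_complete.
     */
    return (rc == PJ_SUCCESS || rc == PJ_EPENDING) ? PJ_SUCCESS : rc;
}
#endif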


/*
 * pj_ioqueue_sendto()
 *
 * Start asynchronous sendto() to the descriptor.
 */
PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
                                       const void *data,
                                       pj_ssize_t *length,
                                       unsigned flags,
                                       const pj_sockaddr_t *addr,
                                       int addrlen)
{
    pj_ioqueue_t *ioqueue;
    struct write_operation *write_op;
    pj_status_t status;
    pj_ssize_t sent;

    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Fast track:
     *   Try to send data immediately, only if there's no pending write!
     * Note:
     *  We are speculating that the list is empty here without properly
     *  acquiring ioqueue's mutex first. This is intentional, to maximize
     *  performance via parallelism.
     *
     *  This should be safe, because:
     *   - by convention, we require the caller to make sure that the
     *     key is not unregistered while other threads are invoking
     *     an operation on the same key.
     *   - pj_list_empty() is safe to be invoked by multiple threads,
     *     even when other threads are modifying the list.
     */
    if (pj_list_empty(&key->write_list)) {
        /*
         * See if data can be sent immediately.
         */
        sent = *length;
        status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
        if (status == PJ_SUCCESS) {
            /* Success! */
            *length = sent;
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * Check that the address storage can hold the address parameter.
     */
    PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG);

    /*
     * Schedule asynchronous send.
     */
    ioqueue = key->ioqueue;
    pj_lock_acquire(ioqueue->lock);

    write_op = (struct write_operation*)op_key;
    write_op->op = PJ_IOQUEUE_OP_SEND_TO;
    write_op->buf = (char*)data;    /* poll() sends from this buffer, so it
                                     * must stay valid until completion. */
    write_op->size = *length;
    write_op->written = 0;
    write_op->flags = flags;
    pj_memcpy(&write_op->rmt_addr, addr, addrlen);
    write_op->rmt_addrlen = addrlen;

    pj_list_insert_before(&key->write_list, write_op);
    PJ_FD_SET(key->fd, &ioqueue->wfdset);

    pj_lock_release(ioqueue->lock);

    return PJ_EPENDING;
}

#if PJ_HAS_TCP
/*
 * Initiate overlapped accept() operation.
 */
PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
                                       pj_sock_t *new_sock,
                                       pj_sockaddr_t *local,
                                       pj_sockaddr_t *remote,
                                       int *addrlen)
{
    pj_ioqueue_t *ioqueue;
    struct accept_operation *accept_op;
    pj_status_t status;

    /* check parameters. All must be specified! */
    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);

    /* Fast track:
     *  See if there's a new connection available immediately.
     */
    if (pj_list_empty(&key->accept_list)) {
        status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
        if (status == PJ_SUCCESS) {
            /* Yes! New connection is available! */
            if (local && addrlen) {
                status = pj_sock_getsockname(*new_sock, local, addrlen);
                if (status != PJ_SUCCESS) {
                    pj_sock_close(*new_sock);
                    *new_sock = PJ_INVALID_SOCKET;
                    return status;
                }
            }
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * No connection is available immediately.
     * Schedule the accept() operation to be completed when there is an
     * incoming connection available.
     */
    ioqueue = key->ioqueue;
    accept_op = (struct accept_operation*)op_key;

    pj_lock_acquire(ioqueue->lock);

    accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
    accept_op->accept_fd = new_sock;
    accept_op->rmt_addr = remote;
    accept_op->addrlen = addrlen;
    accept_op->local_addr = local;

    pj_list_insert_before(&key->accept_list, accept_op);
    PJ_FD_SET(key->fd, &ioqueue->rfdset);

    pj_lock_release(ioqueue->lock);

    return PJ_EPENDING;
}
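
/*
 * Illustrative sketch (not part of the build): starting an asynchronous
 * accept on a listening socket. All output variables must stay valid
 * until on_accept_complete fires.
 */
#if 0
static void app_start_accept(pj_ioqueue_key_t *listen_key,
                             pj_ioqueue_op_key_t *op_key,
                             pj_sock_t *new_sock,
                             pj_sockaddr_in *remote, int *addrlen)
{
    pj_status_t rc;

    *addrlen = (int)sizeof(*remote);
    rc = pj_ioqueue_accept(listen_key, op_key, new_sock, NULL,
                           remote, addrlen);
    if (rc == PJ_SUCCESS) {
        /* A connection was already pending; *new_sock is usable now. */
    } else if (rc != PJ_EPENDING) {
        /* Immediate error; no callback will be invoked. */
    }
}
#endif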

/*
 * Initiate overlapped connect() operation (well, it's non-blocking actually,
 * since there's no overlapped version of connect()).
 */
PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
                                        const pj_sockaddr_t *addr,
                                        int addrlen )
{
    pj_ioqueue_t *ioqueue;
    pj_status_t status;

    /* check parameters. All must be specified! */
    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);

    /* Check if socket has not been marked for connecting */
    if (key->connecting != 0)
        return PJ_EPENDING;

    status = pj_sock_connect(key->fd, addr, addrlen);
    if (status == PJ_SUCCESS) {
        /* Connected! */
        return PJ_SUCCESS;
    } else {
        if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
            /* Pending! */
            ioqueue = key->ioqueue;
            pj_lock_acquire(ioqueue->lock);
            key->connecting = PJ_TRUE;
            PJ_FD_SET(key->fd, &ioqueue->wfdset);
            PJ_FD_SET(key->fd, &ioqueue->xfdset);
            pj_lock_release(ioqueue->lock);
            return PJ_EPENDING;
        } else {
            /* Error! */
            return status;
        }
    }
}
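
/*
 * Illustrative sketch (not part of the build): initiating a non-blocking
 * connect. On PJ_EPENDING the result arrives via on_connect_complete,
 * which (as dispatched by pj_ioqueue_poll() above) receives zero on
 * success and a nonzero/negative status on failure.
 */
#if 0
static pj_status_t app_start_connect(pj_ioqueue_key_t *key,
                                     const pj_sockaddr_in *addr)
{
    pj_status_t rc = pj_ioqueue_connect(key, addr, (int)sizeof(*addr));

    if (rc == PJ_SUCCESS) {
        /* Connected immediately (e.g. loopback). */
    } else if (rc == PJ_EPENDING) {
        /* Wait for on_connect_complete from a polling thread. */
    }
    return rc;
}
#endif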
#endif /* PJ_HAS_TCP */
