blob: fdd1afe750f00fcc27c8b376e6a96de5a343f111 [file] [log] [blame]
Benny Prijono9033e312005-11-21 02:08:39 +00001/* $Id$ */
2/*
Benny Prijonoa771a512007-02-19 01:13:53 +00003 * Copyright (C)2003-2007 Benny Prijono <benny@prijono.org>
Benny Prijono9033e312005-11-21 02:08:39 +00004 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 */
19
20/*
21 * ioqueue_common_abs.c
22 *
23 * This contains common functionalities to emulate proactor pattern with
24 * various event dispatching mechanisms (e.g. select, epoll).
25 *
26 * This file will be included by the appropriate ioqueue implementation.
27 * This file is NOT supposed to be compiled as stand-alone source.
28 */
29
Benny Prijono40212582006-06-22 18:41:28 +000030#define PENDING_RETRY 2
31
Benny Prijono9033e312005-11-21 02:08:39 +000032static void ioqueue_init( pj_ioqueue_t *ioqueue )
33{
34 ioqueue->lock = NULL;
35 ioqueue->auto_delete_lock = 0;
36}
37
38static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
39{
40 if (ioqueue->auto_delete_lock && ioqueue->lock ) {
41 pj_lock_release(ioqueue->lock);
42 return pj_lock_destroy(ioqueue->lock);
43 } else
44 return PJ_SUCCESS;
45}
46
47/*
48 * pj_ioqueue_set_lock()
49 */
50PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
51 pj_lock_t *lock,
52 pj_bool_t auto_delete )
53{
54 PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
55
56 if (ioqueue->auto_delete_lock && ioqueue->lock) {
57 pj_lock_destroy(ioqueue->lock);
58 }
59
60 ioqueue->lock = lock;
61 ioqueue->auto_delete_lock = auto_delete;
62
63 return PJ_SUCCESS;
64}
65
66static pj_status_t ioqueue_init_key( pj_pool_t *pool,
67 pj_ioqueue_t *ioqueue,
68 pj_ioqueue_key_t *key,
69 pj_sock_t sock,
70 void *user_data,
71 const pj_ioqueue_callback *cb)
72{
73 pj_status_t rc;
74 int optlen;
75
Benny Prijono8befd9f2006-05-13 22:46:23 +000076 PJ_UNUSED_ARG(pool);
77
Benny Prijono9033e312005-11-21 02:08:39 +000078 key->ioqueue = ioqueue;
79 key->fd = sock;
80 key->user_data = user_data;
81 pj_list_init(&key->read_list);
82 pj_list_init(&key->write_list);
83#if PJ_HAS_TCP
84 pj_list_init(&key->accept_list);
Benny Prijono5accbd02006-03-30 16:32:18 +000085 key->connecting = 0;
Benny Prijono9033e312005-11-21 02:08:39 +000086#endif
87
88 /* Save callback. */
89 pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
90
Benny Prijono5accbd02006-03-30 16:32:18 +000091#if PJ_IOQUEUE_HAS_SAFE_UNREG
92 /* Set initial reference count to 1 */
93 pj_assert(key->ref_count == 0);
94 ++key->ref_count;
95
96 key->closing = 0;
97#endif
98
Benny Prijono9033e312005-11-21 02:08:39 +000099 /* Get socket type. When socket type is datagram, some optimization
100 * will be performed during send to allow parallel send operations.
101 */
102 optlen = sizeof(key->fd_type);
Benny Prijono8ab968f2007-07-20 08:08:30 +0000103 rc = pj_sock_getsockopt(sock, pj_SOL_SOCKET(), pj_SO_TYPE(),
Benny Prijono9033e312005-11-21 02:08:39 +0000104 &key->fd_type, &optlen);
105 if (rc != PJ_SUCCESS)
Benny Prijono8ab968f2007-07-20 08:08:30 +0000106 key->fd_type = pj_SOCK_STREAM();
Benny Prijono9033e312005-11-21 02:08:39 +0000107
108 /* Create mutex for the key. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000109#if !PJ_IOQUEUE_HAS_SAFE_UNREG
110 rc = pj_mutex_create_simple(pool, NULL, &key->mutex);
111#endif
Benny Prijono9033e312005-11-21 02:08:39 +0000112
113 return rc;
114}
115
Benny Prijono9033e312005-11-21 02:08:39 +0000116/*
117 * pj_ioqueue_get_user_data()
118 *
119 * Obtain value associated with a key.
120 */
121PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
122{
123 PJ_ASSERT_RETURN(key != NULL, NULL);
124 return key->user_data;
125}
126
127/*
128 * pj_ioqueue_set_user_data()
129 */
130PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
131 void *user_data,
132 void **old_data)
133{
134 PJ_ASSERT_RETURN(key, PJ_EINVAL);
135
136 if (old_data)
137 *old_data = key->user_data;
138 key->user_data = user_data;
139
140 return PJ_SUCCESS;
141}
142
143PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
144{
145 return !pj_list_empty(&key->write_list);
146}
147
148PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
149{
150 return !pj_list_empty(&key->read_list);
151}
152
153PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
154{
155#if PJ_HAS_TCP
156 return !pj_list_empty(&key->accept_list);
157#else
Benny Prijono3569c0d2007-04-06 10:29:20 +0000158 PJ_UNUSED_ARG(key);
Benny Prijono9033e312005-11-21 02:08:39 +0000159 return 0;
160#endif
161}
162
163PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
164{
165 return key->connecting;
166}
167
168
Benny Prijono5accbd02006-03-30 16:32:18 +0000169#if PJ_IOQUEUE_HAS_SAFE_UNREG
170# define IS_CLOSING(key) (key->closing)
171#else
172# define IS_CLOSING(key) (0)
173#endif
174
175
Benny Prijono9033e312005-11-21 02:08:39 +0000176/*
177 * ioqueue_dispatch_event()
178 *
179 * Report occurence of an event in the key to be processed by the
180 * framework.
181 */
182void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
183{
184 /* Lock the key. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000185 pj_mutex_lock(h->mutex);
186
Benny Prijono3059eb62006-10-04 20:46:27 +0000187 if (IS_CLOSING(h)) {
Benny Prijono5accbd02006-03-30 16:32:18 +0000188 pj_mutex_unlock(h->mutex);
189 return;
190 }
Benny Prijono9033e312005-11-21 02:08:39 +0000191
192#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
193 if (h->connecting) {
194 /* Completion of connect() operation */
195 pj_ssize_t bytes_transfered;
196
197 /* Clear operation. */
198 h->connecting = 0;
199
Benny Prijono63ab3562006-07-08 19:46:43 +0000200 ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
201 ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +0000202
Benny Prijono9033e312005-11-21 02:08:39 +0000203
204#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
205 /* from connect(2):
206 * On Linux, use getsockopt to read the SO_ERROR option at
207 * level SOL_SOCKET to determine whether connect() completed
208 * successfully (if SO_ERROR is zero).
209 */
210 {
211 int value;
Benny Prijono7db431e2006-07-23 14:38:49 +0000212 int vallen = sizeof(value);
Benny Prijono2bbd7102006-07-18 00:10:53 +0000213 int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
214 &value, &vallen);
Benny Prijono9033e312005-11-21 02:08:39 +0000215 if (gs_rc != 0) {
216 /* Argh!! What to do now???
217 * Just indicate that the socket is connected. The
218 * application will get error as soon as it tries to use
219 * the socket to send/receive.
220 */
221 bytes_transfered = 0;
222 } else {
223 bytes_transfered = value;
224 }
225 }
226#elif defined(PJ_WIN32) && PJ_WIN32!=0
227 bytes_transfered = 0; /* success */
228#else
229 /* Excellent information in D.J. Bernstein page:
230 * http://cr.yp.to/docs/connect.html
231 *
232 * Seems like the most portable way of detecting connect()
233 * failure is to call getpeername(). If socket is connected,
234 * getpeername() will return 0. If the socket is not connected,
235 * it will return ENOTCONN, and read(fd, &ch, 1) will produce
236 * the right errno through error slippage. This is a combination
237 * of suggestions from Douglas C. Schmidt and Ken Keys.
238 */
Benny Prijono9cf138e2006-01-19 03:58:29 +0000239 {
240 int gp_rc;
241 struct sockaddr_in addr;
242 socklen_t addrlen = sizeof(addr);
Benny Prijono9033e312005-11-21 02:08:39 +0000243
Benny Prijono9cf138e2006-01-19 03:58:29 +0000244 gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
Benny Prijono42c5b9e2006-05-10 19:24:40 +0000245 bytes_transfered = (gp_rc < 0) ? gp_rc : -gp_rc;
Benny Prijono9cf138e2006-01-19 03:58:29 +0000246 }
Benny Prijono9033e312005-11-21 02:08:39 +0000247#endif
248
Benny Prijono5accbd02006-03-30 16:32:18 +0000249 /* Unlock; from this point we don't need to hold key's mutex. */
250 pj_mutex_unlock(h->mutex);
251
Benny Prijono9033e312005-11-21 02:08:39 +0000252 /* Call callback. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000253 if (h->cb.on_connect_complete && !IS_CLOSING(h))
Benny Prijono9033e312005-11-21 02:08:39 +0000254 (*h->cb.on_connect_complete)(h, bytes_transfered);
255
256 /* Done. */
257
258 } else
259#endif /* PJ_HAS_TCP */
260 if (key_has_pending_write(h)) {
261 /* Socket is writable. */
262 struct write_operation *write_op;
263 pj_ssize_t sent;
264 pj_status_t send_rc;
265
266 /* Get the first in the queue. */
267 write_op = h->write_list.next;
268
269 /* For datagrams, we can remove the write_op from the list
270 * so that send() can work in parallel.
271 */
Benny Prijono8ab968f2007-07-20 08:08:30 +0000272 if (h->fd_type == pj_SOCK_DGRAM()) {
Benny Prijono9033e312005-11-21 02:08:39 +0000273 pj_list_erase(write_op);
Benny Prijono9033e312005-11-21 02:08:39 +0000274
275 if (pj_list_empty(&h->write_list))
Benny Prijono63ab3562006-07-08 19:46:43 +0000276 ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +0000277
Benny Prijono9033e312005-11-21 02:08:39 +0000278 }
279
280 /* Send the data.
281 * Unfortunately we must do this while holding key's mutex, thus
282 * preventing parallel write on a single key.. :-((
283 */
284 sent = write_op->size - write_op->written;
285 if (write_op->op == PJ_IOQUEUE_OP_SEND) {
286 send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
287 &sent, write_op->flags);
Benny Prijono40212582006-06-22 18:41:28 +0000288 /* Can't do this. We only clear "op" after we're finished sending
289 * the whole buffer.
290 */
291 //write_op->op = 0;
Benny Prijono9033e312005-11-21 02:08:39 +0000292 } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
293 send_rc = pj_sock_sendto(h->fd,
294 write_op->buf+write_op->written,
295 &sent, write_op->flags,
296 &write_op->rmt_addr,
297 write_op->rmt_addrlen);
Benny Prijono40212582006-06-22 18:41:28 +0000298 /* Can't do this. We only clear "op" after we're finished sending
299 * the whole buffer.
300 */
301 //write_op->op = 0;
Benny Prijono9033e312005-11-21 02:08:39 +0000302 } else {
303 pj_assert(!"Invalid operation type!");
Benny Prijonoa1e69682007-05-11 15:14:34 +0000304 write_op->op = PJ_IOQUEUE_OP_NONE;
Benny Prijono9033e312005-11-21 02:08:39 +0000305 send_rc = PJ_EBUG;
306 }
307
308 if (send_rc == PJ_SUCCESS) {
309 write_op->written += sent;
310 } else {
311 pj_assert(send_rc > 0);
312 write_op->written = -send_rc;
313 }
314
315 /* Are we finished with this buffer? */
316 if (send_rc!=PJ_SUCCESS ||
317 write_op->written == (pj_ssize_t)write_op->size ||
Benny Prijono8ab968f2007-07-20 08:08:30 +0000318 h->fd_type == pj_SOCK_DGRAM())
Benny Prijono9033e312005-11-21 02:08:39 +0000319 {
Benny Prijono40212582006-06-22 18:41:28 +0000320
Benny Prijonoa1e69682007-05-11 15:14:34 +0000321 write_op->op = PJ_IOQUEUE_OP_NONE;
Benny Prijono40212582006-06-22 18:41:28 +0000322
Benny Prijono8ab968f2007-07-20 08:08:30 +0000323 if (h->fd_type != pj_SOCK_DGRAM()) {
Benny Prijono9033e312005-11-21 02:08:39 +0000324 /* Write completion of the whole stream. */
325 pj_list_erase(write_op);
Benny Prijono9033e312005-11-21 02:08:39 +0000326
327 /* Clear operation if there's no more data to send. */
328 if (pj_list_empty(&h->write_list))
Benny Prijono63ab3562006-07-08 19:46:43 +0000329 ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +0000330
Benny Prijono9033e312005-11-21 02:08:39 +0000331 }
332
Benny Prijono5accbd02006-03-30 16:32:18 +0000333 /* No need to hold mutex anymore */
334 pj_mutex_unlock(h->mutex);
335
Benny Prijono9033e312005-11-21 02:08:39 +0000336 /* Call callback. */
Benny Prijono5accbd02006-03-30 16:32:18 +0000337 if (h->cb.on_write_complete && !IS_CLOSING(h)) {
Benny Prijono9033e312005-11-21 02:08:39 +0000338 (*h->cb.on_write_complete)(h,
339 (pj_ioqueue_op_key_t*)write_op,
340 write_op->written);
341 }
342
343 } else {
Benny Prijono5accbd02006-03-30 16:32:18 +0000344 pj_mutex_unlock(h->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +0000345 }
346
347 /* Done. */
348 } else {
349 /*
350 * This is normal; execution may fall here when multiple threads
351 * are signalled for the same event, but only one thread eventually
352 * able to process the event.
353 */
Benny Prijono5accbd02006-03-30 16:32:18 +0000354 pj_mutex_unlock(h->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +0000355 }
356}
357
/*
 * ioqueue_dispatch_read_event()
 *
 * Handle a readable event on a key: complete one pending accept()
 * (TCP only) or one pending read/recv/recvfrom operation. The key's
 * mutex is released before the user callback is invoked.
 */
void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
{
    pj_status_t rc;

    /* Lock the key. */
    pj_mutex_lock(h->mutex);

    /* Skip keys that are being unregistered (safe-unreg mode only). */
    if (IS_CLOSING(h)) {
        pj_mutex_unlock(h->mutex);
        return;
    }

#   if PJ_HAS_TCP
    if (!pj_list_empty(&h->accept_list)) {

        struct accept_operation *accept_op;

        /* Get one accept operation from the list. */
        accept_op = h->accept_list.next;
        pj_list_erase(accept_op);
        accept_op->op = PJ_IOQUEUE_OP_NONE;

        /* Clear bit in fdset if there is no more pending accept */
        if (pj_list_empty(&h->accept_list))
            ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);

        rc=pj_sock_accept(h->fd, accept_op->accept_fd,
                          accept_op->rmt_addr, accept_op->addrlen);
        if (rc==PJ_SUCCESS && accept_op->local_addr) {
            /* Caller also asked for the accepted socket's local address. */
            rc = pj_sock_getsockname(*accept_op->accept_fd,
                                     accept_op->local_addr,
                                     accept_op->addrlen);
        }

        /* Unlock; from this point we don't need to hold key's mutex. */
        pj_mutex_unlock(h->mutex);

        /* Call callback. */
        if (h->cb.on_accept_complete && !IS_CLOSING(h)) {
            (*h->cb.on_accept_complete)(h,
                                        (pj_ioqueue_op_key_t*)accept_op,
                                        *accept_op->accept_fd, rc);
        }

    }
    else
#   endif
    if (key_has_pending_read(h)) {
        struct read_operation *read_op;
        pj_ssize_t bytes_read;

        /* Get one pending read operation from the list. */
        read_op = h->read_list.next;
        pj_list_erase(read_op);

        /* Clear fdset if there is no pending read. */
        if (pj_list_empty(&h->read_list))
            ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);

        bytes_read = read_op->size;

        if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {
            /* Clear op before the call so the op_key can be reused. */
            read_op->op = PJ_IOQUEUE_OP_NONE;
            rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read,
                                  read_op->flags,
                                  read_op->rmt_addr,
                                  read_op->rmt_addrlen);
        } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) {
            read_op->op = PJ_IOQUEUE_OP_NONE;
            rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read,
                              read_op->flags);
        } else {
            pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
            read_op->op = PJ_IOQUEUE_OP_NONE;
            /*
             * User has specified pj_ioqueue_read().
             * On Win32, we should do ReadFile(). But because we got
             * here because of select() anyway, user must have put a
             * socket descriptor on h->fd, which in this case we can
             * just call pj_sock_recv() instead of ReadFile().
             * On Unix, user may put a file in h->fd, so we'll have
             * to call read() here.
             * This may not compile on systems which doesn't have
             * read(). That's why we only specify PJ_LINUX here so
             * that error is easier to catch.
             */
#           if defined(PJ_WIN32) && PJ_WIN32 != 0 || \
               defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE != 0
                rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read,
                                  read_op->flags);
                //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
                //              &bytes_read, NULL);
#           elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
                bytes_read = read(h->fd, read_op->buf, bytes_read);
                rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
#           elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
                bytes_read = sys_read(h->fd, read_op->buf, bytes_read);
                rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
#           else
#               error "Implement read() for this platform!"
#           endif
        }

        if (rc != PJ_SUCCESS) {
#           if defined(PJ_WIN32) && PJ_WIN32 != 0
            /* On Win32, for UDP, WSAECONNRESET on the receive side
             * indicates that previous sending has triggered ICMP Port
             * Unreachable message.
             * But we wouldn't know at this point which one of previous
             * key that has triggered the error, since UDP socket can
             * be shared!
             * So we'll just ignore it!
             */

            if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
                //PJ_LOG(4,(THIS_FILE,
                //          "Ignored ICMP port unreach. on key=%p", h));
            }
#           endif

            /* In any case we would report this to caller.
             * Negative bytes_read carries the error code to the callback.
             */
            bytes_read = -rc;
        }

        /* Unlock; from this point we don't need to hold key's mutex. */
        pj_mutex_unlock(h->mutex);

        /* Call callback. */
        if (h->cb.on_read_complete && !IS_CLOSING(h)) {
            (*h->cb.on_read_complete)(h,
                                      (pj_ioqueue_op_key_t*)read_op,
                                      bytes_read);
        }

    } else {
        /*
         * This is normal; execution may fall here when multiple threads
         * are signalled for the same event, but only one thread eventually
         * able to process the event.
         */
        pj_mutex_unlock(h->mutex);
    }
}
501
502
/*
 * ioqueue_dispatch_exception_event()
 *
 * Handle an exception event on a key. In this framework an exception
 * is only expected for a failed non-blocking connect(); the connect
 * status is retrieved via SO_ERROR where available and reported to
 * the on_connect_complete callback.
 */
void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
                                       pj_ioqueue_key_t *h )
{
    pj_mutex_lock(h->mutex);

    if (!h->connecting) {
        /* It is possible that more than one thread was woken up, thus
         * the remaining thread will see h->connecting as zero because
         * it has been processed by other thread.
         */
        pj_mutex_unlock(h->mutex);
        return;
    }

    /* Skip keys that are being unregistered (safe-unreg mode only). */
    if (IS_CLOSING(h)) {
        pj_mutex_unlock(h->mutex);
        return;
    }

    /* Clear operation. */
    h->connecting = 0;

    /* Connect is one-shot: stop watching for write/exception. */
    ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
    ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);

    /* Release the mutex before invoking the callback. */
    pj_mutex_unlock(h->mutex);

    /* Call callback. */
    if (h->cb.on_connect_complete && !IS_CLOSING(h)) {
        /* Default status if SO_ERROR is unavailable. */
        pj_status_t status = -1;
#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
        int value;
        int vallen = sizeof(value);
        int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
                                       &value, &vallen);
        if (gs_rc == 0) {
            status = PJ_RETURN_OS_ERROR(value);
        }
#endif

        (*h->cb.on_connect_complete)(h, status);
    }
}
546
547/*
548 * pj_ioqueue_recv()
549 *
550 * Start asynchronous recv() from the socket.
551 */
552PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
553 pj_ioqueue_op_key_t *op_key,
554 void *buffer,
555 pj_ssize_t *length,
556 unsigned flags )
557{
558 struct read_operation *read_op;
559
560 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
561 PJ_CHECK_STACK();
562
563 read_op = (struct read_operation*)op_key;
Benny Prijonoa1e69682007-05-11 15:14:34 +0000564 read_op->op = PJ_IOQUEUE_OP_NONE;
Benny Prijono9033e312005-11-21 02:08:39 +0000565
Benny Prijono5accbd02006-03-30 16:32:18 +0000566 /* Check if key is closing. */
Benny Prijono3059eb62006-10-04 20:46:27 +0000567 if (IS_CLOSING(key))
Benny Prijono5accbd02006-03-30 16:32:18 +0000568 return PJ_ECANCELLED;
569
Benny Prijono9033e312005-11-21 02:08:39 +0000570 /* Try to see if there's data immediately available.
571 */
572 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
573 pj_status_t status;
574 pj_ssize_t size;
575
576 size = *length;
577 status = pj_sock_recv(key->fd, buffer, &size, flags);
578 if (status == PJ_SUCCESS) {
579 /* Yes! Data is available! */
580 *length = size;
581 return PJ_SUCCESS;
582 } else {
583 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
584 * the error to caller.
585 */
586 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
587 return status;
588 }
589 }
590
591 flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
592
593 /*
594 * No data is immediately available.
595 * Must schedule asynchronous operation to the ioqueue.
596 */
597 read_op->op = PJ_IOQUEUE_OP_RECV;
598 read_op->buf = buffer;
599 read_op->size = *length;
600 read_op->flags = flags;
601
602 pj_mutex_lock(key->mutex);
603 pj_list_insert_before(&key->read_list, read_op);
Benny Prijono63ab3562006-07-08 19:46:43 +0000604 ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +0000605 pj_mutex_unlock(key->mutex);
606
607 return PJ_EPENDING;
608}
609
610/*
611 * pj_ioqueue_recvfrom()
612 *
613 * Start asynchronous recvfrom() from the socket.
614 */
615PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
616 pj_ioqueue_op_key_t *op_key,
617 void *buffer,
618 pj_ssize_t *length,
619 unsigned flags,
620 pj_sockaddr_t *addr,
621 int *addrlen)
622{
623 struct read_operation *read_op;
624
625 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
626 PJ_CHECK_STACK();
627
Benny Prijono5accbd02006-03-30 16:32:18 +0000628 /* Check if key is closing. */
Benny Prijono3059eb62006-10-04 20:46:27 +0000629 if (IS_CLOSING(key))
Benny Prijono5accbd02006-03-30 16:32:18 +0000630 return PJ_ECANCELLED;
631
Benny Prijono9033e312005-11-21 02:08:39 +0000632 read_op = (struct read_operation*)op_key;
Benny Prijonoa1e69682007-05-11 15:14:34 +0000633 read_op->op = PJ_IOQUEUE_OP_NONE;
Benny Prijono9033e312005-11-21 02:08:39 +0000634
635 /* Try to see if there's data immediately available.
636 */
637 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
638 pj_status_t status;
639 pj_ssize_t size;
640
641 size = *length;
642 status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
643 addr, addrlen);
644 if (status == PJ_SUCCESS) {
645 /* Yes! Data is available! */
646 *length = size;
647 return PJ_SUCCESS;
648 } else {
649 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
650 * the error to caller.
651 */
652 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
653 return status;
654 }
655 }
656
657 flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
658
659 /*
660 * No data is immediately available.
661 * Must schedule asynchronous operation to the ioqueue.
662 */
663 read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
664 read_op->buf = buffer;
665 read_op->size = *length;
666 read_op->flags = flags;
667 read_op->rmt_addr = addr;
668 read_op->rmt_addrlen = addrlen;
669
670 pj_mutex_lock(key->mutex);
671 pj_list_insert_before(&key->read_list, read_op);
Benny Prijono63ab3562006-07-08 19:46:43 +0000672 ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +0000673 pj_mutex_unlock(key->mutex);
674
675 return PJ_EPENDING;
676}
677
/*
 * pj_ioqueue_send()
 *
 * Start asynchronous send() to the descriptor. Completes synchronously
 * (PJ_SUCCESS) when the socket accepts the data immediately and no other
 * write is pending; otherwise queues the operation (PJ_EPENDING), or
 * returns PJ_EBUSY when the op_key itself is still busy.
 */
PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
                                     pj_ioqueue_op_key_t *op_key,
                                     const void *data,
                                     pj_ssize_t *length,
                                     unsigned flags)
{
    struct write_operation *write_op;
    pj_status_t status;
    unsigned retry;
    pj_ssize_t sent;

    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write. */
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /* Fast track:
     *   Try to send data immediately, only if there's no pending write!
     * Note:
     *  We are speculating that the list is empty here without properly
     *  acquiring ioqueue's mutex first. This is intentional, to maximize
     *  performance via parallelism.
     *
     *  This should be safe, because:
     *      - by convention, we require caller to make sure that the
     *        key is not unregistered while other threads are invoking
     *        an operation on the same key.
     *      - pj_list_empty() is safe to be invoked by multiple threads,
     *        even when other threads are modifying the list.
     */
    if (pj_list_empty(&key->write_list)) {
        /*
         * See if data can be sent immediately.
         */
        sent = *length;
        status = pj_sock_send(key->fd, data, &sent, flags);
        if (status == PJ_SUCCESS) {
            /* Success! */
            *length = sent;
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * Schedule asynchronous send.
     */
    write_op = (struct write_operation*)op_key;

    /* Spin if write_op has pending operation: give the polling thread
     * a chance (PENDING_RETRY times) to clear the flag.
     */
    for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
        pj_thread_sleep(0);

    /* Last chance */
    if (write_op->op) {
        /* Unable to send packet because there is already pending write in the
         * write_op. We could not put the operation into the write_op
         * because write_op already contains a pending operation! And
         * we could not send the packet directly with send() either,
         * because that will break the order of the packet. So we can
         * only return error here.
         *
         * This could happen for example in multithreads program,
         * where polling is done by one thread, while other threads are doing
         * the sending only. If the polling thread runs on lower priority
         * than the sending thread, then it's possible that the pending
         * write flag is not cleared in-time because clearing is only done
         * during polling.
         *
         * Aplication should specify multiple write operation keys on
         * situation like this.
         */
        //pj_assert(!"ioqueue: there is pending operation on this key!");
        return PJ_EBUSY;
    }

    write_op->op = PJ_IOQUEUE_OP_SEND;
    write_op->buf = (char*)data;
    write_op->size = *length;
    write_op->written = 0;
    write_op->flags = flags;

    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->write_list, write_op);
    ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
783
784
785/*
786 * pj_ioqueue_sendto()
787 *
788 * Start asynchronous write() to the descriptor.
789 */
790PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
791 pj_ioqueue_op_key_t *op_key,
792 const void *data,
793 pj_ssize_t *length,
794 pj_uint32_t flags,
795 const pj_sockaddr_t *addr,
796 int addrlen)
797{
798 struct write_operation *write_op;
Benny Prijono40212582006-06-22 18:41:28 +0000799 unsigned retry;
Benny Prijono9033e312005-11-21 02:08:39 +0000800 pj_status_t status;
801 pj_ssize_t sent;
802
803 PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
804 PJ_CHECK_STACK();
805
Benny Prijono5accbd02006-03-30 16:32:18 +0000806 /* Check if key is closing. */
Benny Prijono3059eb62006-10-04 20:46:27 +0000807 if (IS_CLOSING(key))
Benny Prijono5accbd02006-03-30 16:32:18 +0000808 return PJ_ECANCELLED;
809
Benny Prijono9033e312005-11-21 02:08:39 +0000810 /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write */
811 flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
812
813 /* Fast track:
814 * Try to send data immediately, only if there's no pending write!
815 * Note:
816 * We are speculating that the list is empty here without properly
817 * acquiring ioqueue's mutex first. This is intentional, to maximize
818 * performance via parallelism.
819 *
820 * This should be safe, because:
821 * - by convention, we require caller to make sure that the
822 * key is not unregistered while other threads are invoking
823 * an operation on the same key.
824 * - pj_list_empty() is safe to be invoked by multiple threads,
825 * even when other threads are modifying the list.
826 */
827 if (pj_list_empty(&key->write_list)) {
828 /*
829 * See if data can be sent immediately.
830 */
831 sent = *length;
832 status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
833 if (status == PJ_SUCCESS) {
834 /* Success! */
835 *length = sent;
836 return PJ_SUCCESS;
837 } else {
838 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
839 * the error to caller.
840 */
841 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
842 return status;
843 }
Benny Prijono40212582006-06-22 18:41:28 +0000844 status = status;
Benny Prijono9033e312005-11-21 02:08:39 +0000845 }
846 }
847
848 /*
849 * Check that address storage can hold the address parameter.
850 */
Benny Prijonoa1e69682007-05-11 15:14:34 +0000851 PJ_ASSERT_RETURN(addrlen <= (int)sizeof(pj_sockaddr_in), PJ_EBUG);
Benny Prijono9033e312005-11-21 02:08:39 +0000852
853 /*
854 * Schedule asynchronous send.
855 */
Benny Prijono40212582006-06-22 18:41:28 +0000856 write_op = (struct write_operation*)op_key;
857
858 /* Spin if write_op has pending operation */
859 for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
860 pj_thread_sleep(0);
861
862 /* Last chance */
863 if (write_op->op) {
864 /* Unable to send packet because there is already pending write on the
865 * write_op. We could not put the operation into the write_op
866 * because write_op already contains a pending operation! And
867 * we could not send the packet directly with sendto() either,
868 * because that will break the order of the packet. So we can
869 * only return error here.
870 *
871 * This could happen for example in multithreads program,
872 * where polling is done by one thread, while other threads are doing
873 * the sending only. If the polling thread runs on lower priority
874 * than the sending thread, then it's possible that the pending
875 * write flag is not cleared in-time because clearing is only done
876 * during polling.
877 *
878 * Aplication should specify multiple write operation keys on
879 * situation like this.
880 */
881 //pj_assert(!"ioqueue: there is pending operation on this key!");
882 return PJ_EBUSY;
883 }
884
Benny Prijono9033e312005-11-21 02:08:39 +0000885 write_op->op = PJ_IOQUEUE_OP_SEND_TO;
Benny Prijonoa1e69682007-05-11 15:14:34 +0000886 write_op->buf = (char*)data;
Benny Prijono9033e312005-11-21 02:08:39 +0000887 write_op->size = *length;
888 write_op->written = 0;
889 write_op->flags = flags;
890 pj_memcpy(&write_op->rmt_addr, addr, addrlen);
891 write_op->rmt_addrlen = addrlen;
892
893 pj_mutex_lock(key->mutex);
894 pj_list_insert_before(&key->write_list, write_op);
Benny Prijono63ab3562006-07-08 19:46:43 +0000895 ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +0000896 pj_mutex_unlock(key->mutex);
897
898 return PJ_EPENDING;
899}
900
901#if PJ_HAS_TCP
902/*
903 * Initiate overlapped accept() operation.
904 */
905PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
906 pj_ioqueue_op_key_t *op_key,
907 pj_sock_t *new_sock,
908 pj_sockaddr_t *local,
909 pj_sockaddr_t *remote,
910 int *addrlen)
911{
912 struct accept_operation *accept_op;
913 pj_status_t status;
914
915 /* check parameters. All must be specified! */
916 PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
917
Benny Prijono5accbd02006-03-30 16:32:18 +0000918 /* Check if key is closing. */
Benny Prijono3059eb62006-10-04 20:46:27 +0000919 if (IS_CLOSING(key))
Benny Prijono5accbd02006-03-30 16:32:18 +0000920 return PJ_ECANCELLED;
921
Benny Prijono9033e312005-11-21 02:08:39 +0000922 accept_op = (struct accept_operation*)op_key;
Benny Prijonoa1e69682007-05-11 15:14:34 +0000923 accept_op->op = PJ_IOQUEUE_OP_NONE;
Benny Prijono9033e312005-11-21 02:08:39 +0000924
925 /* Fast track:
926 * See if there's new connection available immediately.
927 */
928 if (pj_list_empty(&key->accept_list)) {
929 status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
930 if (status == PJ_SUCCESS) {
931 /* Yes! New connection is available! */
932 if (local && addrlen) {
933 status = pj_sock_getsockname(*new_sock, local, addrlen);
934 if (status != PJ_SUCCESS) {
935 pj_sock_close(*new_sock);
936 *new_sock = PJ_INVALID_SOCKET;
937 return status;
938 }
939 }
940 return PJ_SUCCESS;
941 } else {
942 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
943 * the error to caller.
944 */
945 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
946 return status;
947 }
948 }
949 }
950
951 /*
952 * No connection is available immediately.
953 * Schedule accept() operation to be completed when there is incoming
954 * connection available.
955 */
956 accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
957 accept_op->accept_fd = new_sock;
958 accept_op->rmt_addr = remote;
959 accept_op->addrlen= addrlen;
960 accept_op->local_addr = local;
961
962 pj_mutex_lock(key->mutex);
963 pj_list_insert_before(&key->accept_list, accept_op);
Benny Prijono63ab3562006-07-08 19:46:43 +0000964 ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +0000965 pj_mutex_unlock(key->mutex);
966
967 return PJ_EPENDING;
968}
969
970/*
971 * Initiate overlapped connect() operation (well, it's non-blocking actually,
972 * since there's no overlapped version of connect()).
973 */
974PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
975 const pj_sockaddr_t *addr,
976 int addrlen )
977{
978 pj_status_t status;
979
980 /* check parameters. All must be specified! */
981 PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
982
Benny Prijono5accbd02006-03-30 16:32:18 +0000983 /* Check if key is closing. */
Benny Prijono3059eb62006-10-04 20:46:27 +0000984 if (IS_CLOSING(key))
Benny Prijono5accbd02006-03-30 16:32:18 +0000985 return PJ_ECANCELLED;
986
Benny Prijono9033e312005-11-21 02:08:39 +0000987 /* Check if socket has not been marked for connecting */
988 if (key->connecting != 0)
989 return PJ_EPENDING;
990
991 status = pj_sock_connect(key->fd, addr, addrlen);
992 if (status == PJ_SUCCESS) {
993 /* Connected! */
994 return PJ_SUCCESS;
995 } else {
996 if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
997 /* Pending! */
998 pj_mutex_lock(key->mutex);
999 key->connecting = PJ_TRUE;
Benny Prijono63ab3562006-07-08 19:46:43 +00001000 ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
1001 ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT);
Benny Prijono9033e312005-11-21 02:08:39 +00001002 pj_mutex_unlock(key->mutex);
1003 return PJ_EPENDING;
1004 } else {
1005 /* Error! */
1006 return status;
1007 }
1008 }
1009}
1010#endif /* PJ_HAS_TCP */
1011
1012
/*
 * pj_ioqueue_op_key_init()
 *
 * Initialize an operation key by zero-filling it. Zeroing clears the
 * embedded "op" field, which marks the key as having no pending
 * operation (see pj_ioqueue_is_pending(), which tests op != 0).
 * The caller passes the full size of its pj_ioqueue_op_key_t storage.
 */
PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
				     pj_size_t size )
{
    pj_bzero(op_key, size);
}
1018
1019
1020/*
1021 * pj_ioqueue_is_pending()
1022 */
1023PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
1024 pj_ioqueue_op_key_t *op_key )
1025{
1026 struct generic_operation *op_rec;
1027
1028 PJ_UNUSED_ARG(key);
1029
1030 op_rec = (struct generic_operation*)op_key;
1031 return op_rec->op != 0;
1032}
1033
1034
1035/*
1036 * pj_ioqueue_post_completion()
1037 */
1038PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
1039 pj_ioqueue_op_key_t *op_key,
1040 pj_ssize_t bytes_status )
1041{
1042 struct generic_operation *op_rec;
1043
1044 /*
1045 * Find the operation key in all pending operation list to
1046 * really make sure that it's still there; then call the callback.
1047 */
Benny Prijono5accbd02006-03-30 16:32:18 +00001048 pj_mutex_lock(key->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +00001049
1050 /* Find the operation in the pending read list. */
1051 op_rec = (struct generic_operation*)key->read_list.next;
1052 while (op_rec != (void*)&key->read_list) {
1053 if (op_rec == (void*)op_key) {
1054 pj_list_erase(op_rec);
Benny Prijonoa1e69682007-05-11 15:14:34 +00001055 op_rec->op = PJ_IOQUEUE_OP_NONE;
Benny Prijono5accbd02006-03-30 16:32:18 +00001056 pj_mutex_unlock(key->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +00001057
1058 (*key->cb.on_read_complete)(key, op_key, bytes_status);
1059 return PJ_SUCCESS;
1060 }
1061 op_rec = op_rec->next;
1062 }
1063
1064 /* Find the operation in the pending write list. */
1065 op_rec = (struct generic_operation*)key->write_list.next;
1066 while (op_rec != (void*)&key->write_list) {
1067 if (op_rec == (void*)op_key) {
1068 pj_list_erase(op_rec);
Benny Prijonoa1e69682007-05-11 15:14:34 +00001069 op_rec->op = PJ_IOQUEUE_OP_NONE;
Benny Prijono5accbd02006-03-30 16:32:18 +00001070 pj_mutex_unlock(key->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +00001071
1072 (*key->cb.on_write_complete)(key, op_key, bytes_status);
1073 return PJ_SUCCESS;
1074 }
1075 op_rec = op_rec->next;
1076 }
1077
1078 /* Find the operation in the pending accept list. */
1079 op_rec = (struct generic_operation*)key->accept_list.next;
1080 while (op_rec != (void*)&key->accept_list) {
1081 if (op_rec == (void*)op_key) {
1082 pj_list_erase(op_rec);
Benny Prijonoa1e69682007-05-11 15:14:34 +00001083 op_rec->op = PJ_IOQUEUE_OP_NONE;
Benny Prijono5accbd02006-03-30 16:32:18 +00001084 pj_mutex_unlock(key->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +00001085
1086 (*key->cb.on_accept_complete)(key, op_key,
1087 PJ_INVALID_SOCKET,
1088 bytes_status);
1089 return PJ_SUCCESS;
1090 }
1091 op_rec = op_rec->next;
1092 }
1093
Benny Prijono5accbd02006-03-30 16:32:18 +00001094 pj_mutex_unlock(key->mutex);
Benny Prijono9033e312005-11-21 02:08:39 +00001095
1096 return PJ_EINVALIDOP;
1097}
1098