blob: b313c44973029f21e5d14474beeda48f67f3af4c [file] [log] [blame]
Benny Prijonoe0312a72005-11-18 00:16:43 +00001/* $Id$ */
Benny Prijonoe7224612005-11-13 19:40:44 +00002/*
Benny Prijonoe0312a72005-11-18 00:16:43 +00003 * Copyright (C)2003-2006 Benny Prijono <benny@prijono.org>
Benny Prijonoe7224612005-11-13 19:40:44 +00004 *
Benny Prijonoe0312a72005-11-18 00:16:43 +00005 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
Benny Prijonoe7224612005-11-13 19:40:44 +00009 *
Benny Prijonoe0312a72005-11-18 00:16:43 +000010 * This program is distributed in the hope that it will be useful,
Benny Prijonoe7224612005-11-13 19:40:44 +000011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Benny Prijonoe0312a72005-11-18 00:16:43 +000012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
Benny Prijonoe7224612005-11-13 19:40:44 +000018 */
Benny Prijonoe0312a72005-11-18 00:16:43 +000019
Benny Prijonoe7224612005-11-13 19:40:44 +000020/*
21 * sock_select.c
22 *
23 * This is the implementation of IOQueue using pj_sock_select().
24 * It runs anywhere where pj_sock_select() is available (currently
25 * Win32, Linux, Linux kernel, etc.).
26 */
27
28#include <pj/ioqueue.h>
29#include <pj/os.h>
30#include <pj/lock.h>
31#include <pj/log.h>
32#include <pj/list.h>
33#include <pj/pool.h>
34#include <pj/string.h>
35#include <pj/assert.h>
36#include <pj/sock.h>
37#include <pj/compat/socket.h>
38#include <pj/sock_select.h>
39#include <pj/errno.h>
40
41/*
42 * Include declaration from common abstraction.
43 */
44#include "ioqueue_common_abs.h"
45
46/*
47 * ISSUES with ioqueue_select()
48 *
49 * EAGAIN/EWOULDBLOCK error in recv():
50 * - when multiple threads are working with the ioqueue, application
51 * may receive EAGAIN or EWOULDBLOCK in the receive callback.
52 * This error happens because more than one thread is watching for
53 * the same descriptor set, so when all of them call recv() or recvfrom()
54 * simultaneously, only one will succeed and the rest will get the error.
55 *
56 */
57#define THIS_FILE "ioq_select"
58
59/*
60 * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
61 * the correct error code.
62 */
63#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
64# error "Error reporting must be enabled for this function to work!"
65#endif
66
67/**
68 * Get the number of descriptors in the set. This is defined in sock_select.c
69 * This function will only return the number of sockets set from PJ_FD_SET
70 * operation. When the set is modified by other means (such as by select()),
71 * the count will not be reflected here.
72 *
73 * That's why don't export this function in the header file, to avoid
74 * misunderstanding.
75 *
76 * @param fdsetp The descriptor set.
77 *
78 * @return Number of descriptors in the set.
79 */
80PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);
81
82
83/*
84 * During debugging build, VALIDATE_FD_SET is set.
85 * This will check the validity of the fd_sets.
86 */
87/*
88#if defined(PJ_DEBUG) && PJ_DEBUG != 0
89# define VALIDATE_FD_SET 1
90#else
91# define VALIDATE_FD_SET 0
92#endif
93*/
94#define VALIDATE_FD_SET 0
95
96/*
97 * This describes each key.
98 */
99struct pj_ioqueue_key_t
100{
101 DECLARE_COMMON_KEY
102};
103
104/*
105 * This describes the I/O queue itself.
106 */
107struct pj_ioqueue_t
108{
109 DECLARE_COMMON_IOQUEUE
110
111 unsigned max, count;
112 pj_ioqueue_key_t key_list;
113 pj_fd_set_t rfdset;
114 pj_fd_set_t wfdset;
115#if PJ_HAS_TCP
116 pj_fd_set_t xfdset;
117#endif
118};
119
120/* Include implementation for common abstraction after we declare
121 * pj_ioqueue_key_t and pj_ioqueue_t.
122 */
123#include "ioqueue_common_abs.c"
124
125/*
126 * pj_ioqueue_name()
127 */
128PJ_DEF(const char*) pj_ioqueue_name(void)
129{
130 return "select";
131}
132
133/*
134 * pj_ioqueue_create()
135 *
136 * Create select ioqueue.
137 */
138PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
139 pj_size_t max_fd,
140 pj_ioqueue_t **p_ioqueue)
141{
142 pj_ioqueue_t *ioqueue;
143 pj_lock_t *lock;
144 pj_status_t rc;
145
146 /* Check that arguments are valid. */
147 PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
148 max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES,
149 PJ_EINVAL);
150
151 /* Check that size of pj_ioqueue_op_key_t is sufficient */
152 PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
153 sizeof(union operation_key), PJ_EBUG);
154
155 ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
156
157 ioqueue_init(ioqueue);
158
159 ioqueue->max = max_fd;
160 ioqueue->count = 0;
161 PJ_FD_ZERO(&ioqueue->rfdset);
162 PJ_FD_ZERO(&ioqueue->wfdset);
163#if PJ_HAS_TCP
164 PJ_FD_ZERO(&ioqueue->xfdset);
165#endif
166 pj_list_init(&ioqueue->key_list);
167
168 rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
169 if (rc != PJ_SUCCESS)
170 return rc;
171
172 rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
173 if (rc != PJ_SUCCESS)
174 return rc;
175
176 PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue));
177
178 *p_ioqueue = ioqueue;
179 return PJ_SUCCESS;
180}
181
182/*
183 * pj_ioqueue_destroy()
184 *
185 * Destroy ioqueue.
186 */
187PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
188{
189 PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
190
191 pj_lock_acquire(ioqueue->lock);
192 return ioqueue_destroy(ioqueue);
193}
194
195
196/*
197 * pj_ioqueue_register_sock()
198 *
199 * Register a handle to ioqueue.
200 */
201PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
202 pj_ioqueue_t *ioqueue,
203 pj_sock_t sock,
204 void *user_data,
205 const pj_ioqueue_callback *cb,
206 pj_ioqueue_key_t **p_key)
207{
208 pj_ioqueue_key_t *key = NULL;
209 pj_uint32_t value;
210 pj_status_t rc = PJ_SUCCESS;
211
212 PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
213 cb && p_key, PJ_EINVAL);
214
215 pj_lock_acquire(ioqueue->lock);
216
217 if (ioqueue->count >= ioqueue->max) {
218 rc = PJ_ETOOMANY;
219 goto on_return;
220 }
221
222 /* Set socket to nonblocking. */
223 value = 1;
224#ifdef PJ_WIN32
225 if (ioctlsocket(sock, FIONBIO, (u_long*)&value)) {
226#else
227 if (ioctl(sock, FIONBIO, &value)) {
228#endif
229 rc = pj_get_netos_error();
230 goto on_return;
231 }
232
233 /* Create key. */
234 key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
235 rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
236 if (rc != PJ_SUCCESS) {
237 key = NULL;
238 goto on_return;
239 }
240
241 /* Register */
242 pj_list_insert_before(&ioqueue->key_list, key);
243 ++ioqueue->count;
244
245on_return:
246 /* On error, socket may be left in non-blocking mode. */
247 *p_key = key;
248 pj_lock_release(ioqueue->lock);
249
250 return rc;
251}
252
253/*
254 * pj_ioqueue_unregister()
255 *
256 * Unregister handle from ioqueue.
257 */
258PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
259{
260 pj_ioqueue_t *ioqueue;
261
262 PJ_ASSERT_RETURN(key, PJ_EINVAL);
263
264 ioqueue = key->ioqueue;
265
266 pj_lock_acquire(ioqueue->lock);
267
268 pj_assert(ioqueue->count > 0);
269 --ioqueue->count;
270 pj_list_erase(key);
271 PJ_FD_CLR(key->fd, &ioqueue->rfdset);
272 PJ_FD_CLR(key->fd, &ioqueue->wfdset);
273#if PJ_HAS_TCP
274 PJ_FD_CLR(key->fd, &ioqueue->xfdset);
275#endif
276
277 /* ioqueue_destroy may try to acquire key's mutex.
278 * Since normally the order of locking is to lock key's mutex first
279 * then ioqueue's mutex, ioqueue_destroy may deadlock unless we
280 * release ioqueue's mutex first.
281 */
282 pj_lock_release(ioqueue->lock);
283
284 /* Destroy the key. */
285 ioqueue_destroy_key(key);
286
287 return PJ_SUCCESS;
288}
289
290
/* Consistency check: each key's pending-operation lists should agree
 * with the membership of its fd in the read/write/exception sets.
 * Compiled only when VALIDATE_FD_SET is enabled.
 */
#if VALIDATE_FD_SET
static void validate_sets(const pj_ioqueue_t *ioqueue,
                          const pj_fd_set_t *rfdset,
                          const pj_fd_set_t *wfdset,
                          const pj_fd_set_t *xfdset)
{
    pj_ioqueue_key_t *key;

    /* This check no longer works: the key must be locked before it is
     * inspected, but we are already holding the ioqueue mutex, and
     * acquiring the key's mutex now would deadlock (wrong lock order).
     */
    pj_assert(0);

    for (key = ioqueue->key_list.next; key != &ioqueue->key_list;
         key = key->next)
    {
        pj_bool_t expect_read, expect_write;

        /* fd should be in rfdset iff a read (or accept) is pending. */
        expect_read = !pj_list_empty(&key->read_list);
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
        expect_read = expect_read || !pj_list_empty(&key->accept_list);
#endif
        if (expect_read) {
            pj_assert(PJ_FD_ISSET(key->fd, rfdset));
        } else {
            pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0);
        }

        /* fd should be in wfdset iff a write (or connect) is pending. */
        expect_write = !pj_list_empty(&key->write_list);
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
        expect_write = expect_write || key->connecting;
#endif
        if (expect_write) {
            pj_assert(PJ_FD_ISSET(key->fd, wfdset));
        } else {
            pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0);
        }

#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
        /* fd should be in xfdset iff a connect is in progress. */
        if (key->connecting) {
            pj_assert(PJ_FD_ISSET(key->fd, xfdset));
        } else {
            pj_assert(PJ_FD_ISSET(key->fd, xfdset) == 0);
        }
#endif /* PJ_HAS_TCP */
    }
}
#endif /* VALIDATE_FD_SET */
348
349
350/* ioqueue_remove_from_set()
351 * This function is called from ioqueue_dispatch_event() to instruct
352 * the ioqueue to remove the specified descriptor from ioqueue's descriptor
353 * set for the specified event.
354 */
355static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
356 pj_sock_t fd,
357 enum ioqueue_event_type event_type)
358{
359 pj_lock_acquire(ioqueue->lock);
360
361 if (event_type == READABLE_EVENT)
362 PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset);
363 else if (event_type == WRITEABLE_EVENT)
364 PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset);
365 else if (event_type == EXCEPTION_EVENT)
366 PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset);
367 else
368 pj_assert(0);
369
370 pj_lock_release(ioqueue->lock);
371}
372
373/*
374 * ioqueue_add_to_set()
375 * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
376 * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
377 * set for the specified event.
378 */
379static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
380 pj_sock_t fd,
381 enum ioqueue_event_type event_type )
382{
383 pj_lock_acquire(ioqueue->lock);
384
385 if (event_type == READABLE_EVENT)
386 PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset);
387 else if (event_type == WRITEABLE_EVENT)
388 PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset);
389 else if (event_type == EXCEPTION_EVENT)
390 PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset);
391 else
392 pj_assert(0);
393
394 pj_lock_release(ioqueue->lock);
395}
396
397/*
398 * pj_ioqueue_poll()
399 *
400 * Few things worth written:
401 *
402 * - we used to do only one callback called per poll, but it didn't go
403 * very well. The reason is because on some situation, the write
404 * callback gets called all the time, thus doesn't give the read
405 * callback to get called. This happens, for example, when user
406 * submit write operation inside the write callback.
407 * As the result, we changed the behaviour so that now multiple
408 * callbacks are called in a single poll. It should be fast too,
409 * just that we need to be carefull with the ioqueue data structs.
410 *
411 * - to guarantee preemptiveness etc, the poll function must strictly
412 * work on fd_set copy of the ioqueue (not the original one).
413 */
414PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
415{
416 pj_fd_set_t rfdset, wfdset, xfdset;
417 int count, counter;
418 pj_ioqueue_key_t *h;
419 struct event
420 {
421 pj_ioqueue_key_t *key;
422 enum ioqueue_event_type event_type;
423 } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
424
425 PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
426
427 /* Lock ioqueue before making fd_set copies */
428 pj_lock_acquire(ioqueue->lock);
429
430 /* We will only do select() when there are sockets to be polled.
431 * Otherwise select() will return error.
432 */
433 if (PJ_FD_COUNT(&ioqueue->rfdset)==0 &&
434 PJ_FD_COUNT(&ioqueue->wfdset)==0 &&
435 PJ_FD_COUNT(&ioqueue->xfdset)==0)
436 {
437 pj_lock_release(ioqueue->lock);
438 if (timeout)
439 pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout));
440 return 0;
441 }
442
443 /* Copy ioqueue's pj_fd_set_t to local variables. */
444 pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t));
445 pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t));
446#if PJ_HAS_TCP
447 pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t));
448#else
449 PJ_FD_ZERO(&xfdset);
450#endif
451
452#if VALIDATE_FD_SET
453 validate_sets(ioqueue, &rfdset, &wfdset, &xfdset);
454#endif
455
456 /* Unlock ioqueue before select(). */
457 pj_lock_release(ioqueue->lock);
458
459 count = pj_sock_select(FD_SETSIZE, &rfdset, &wfdset, &xfdset, timeout);
460
461 if (count <= 0)
462 return count;
463 else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)
464 count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;
465
466 /* Scan descriptor sets for event and add the events in the event
467 * array to be processed later in this function. We do this so that
468 * events can be processed in parallel without holding ioqueue lock.
469 */
470 pj_lock_acquire(ioqueue->lock);
471
472 counter = 0;
473
474 /* Scan for writable sockets first to handle piggy-back data
475 * coming with accept().
476 */
477 h = ioqueue->key_list.next;
478 for ( ; h!=&ioqueue->key_list && counter<count; h = h->next) {
479 if ( (key_has_pending_write(h) || key_has_pending_connect(h))
480 && PJ_FD_ISSET(h->fd, &wfdset))
481 {
482 event[counter].key = h;
483 event[counter].event_type = WRITEABLE_EVENT;
484 ++counter;
485 }
486
487 /* Scan for readable socket. */
488 if ((key_has_pending_read(h) || key_has_pending_accept(h))
489 && PJ_FD_ISSET(h->fd, &rfdset))
490 {
491 event[counter].key = h;
492 event[counter].event_type = READABLE_EVENT;
493 ++counter;
494 }
495
496#if PJ_HAS_TCP
497 if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) {
498 event[counter].key = h;
499 event[counter].event_type = EXCEPTION_EVENT;
500 ++counter;
501 }
502#endif
503 }
504
505 pj_lock_release(ioqueue->lock);
506
507 count = counter;
508
509 /* Now process all events. The dispatch functions will take care
510 * of locking in each of the key
511 */
512 for (counter=0; counter<count; ++counter) {
513 switch (event[counter].event_type) {
514 case READABLE_EVENT:
515 ioqueue_dispatch_read_event(ioqueue, event[counter].key);
516 break;
517 case WRITEABLE_EVENT:
518 ioqueue_dispatch_write_event(ioqueue, event[counter].key);
519 break;
520 case EXCEPTION_EVENT:
521 ioqueue_dispatch_exception_event(ioqueue, event[counter].key);
522 break;
523 case NO_EVENT:
524 pj_assert(!"Invalid event!");
525 break;
526 }
527 }
528
529 return count;
530}
531