/* $Id$ */
/*
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */

/*
 * sock_select.c
 *
 * This is the implementation of IOQueue using pj_sock_select().
 * It runs anywhere pj_sock_select() is available (currently
 * Win32, Linux, Linux kernel, etc.).
 */

#include <pj/ioqueue.h>
#include <pj/os.h>
#include <pj/lock.h>
#include <pj/log.h>
#include <pj/list.h>
#include <pj/pool.h>
#include <pj/string.h>
#include <pj/assert.h>
#include <pj/sock.h>
#include <pj/compat/socket.h>
#include <pj/sock_select.h>
#include <pj/sock_qos.h>
#include <pj/errno.h>
#include <pj/rand.h>

/* Now that we have access to the OS's <sys/select.h>, let's check again
 * that PJ_IOQUEUE_MAX_HANDLES is not greater than FD_SETSIZE.
 */
#if PJ_IOQUEUE_MAX_HANDLES > FD_SETSIZE
#   error "PJ_IOQUEUE_MAX_HANDLES cannot be greater than FD_SETSIZE"
#endif


/*
 * Include declaration from common abstraction.
 */
#include "ioqueue_common_abs.h"

/*
 * ISSUES with ioqueue_select()
 *
 * EAGAIN/EWOULDBLOCK error in recv():
 *  - when multiple threads are working with the ioqueue, the application
 *    may receive EAGAIN or EWOULDBLOCK in the receive callback.
 *    This error happens because more than one thread is watching the
 *    same descriptor set, so when all of them call recv() or recvfrom()
 *    simultaneously, only one will succeed and the rest will get the error.
 *
 */
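
/* A minimal sketch of tolerating this in an application's read callback.
 * The callback body below is an illustrative assumption, not part of
 * this file; only the negative-status convention of the callback and
 * PJ_STATUS_FROM_OS()/OSERR_EWOULDBLOCK come from pjlib:
 *
 *   static void on_read_complete(pj_ioqueue_key_t *key,
 *                                pj_ioqueue_op_key_t *op_key,
 *                                pj_ssize_t bytes_read)
 *   {
 *       if (bytes_read < 0 &&
 *           -bytes_read == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK))
 *       {
 *           // Benign: another thread drained the socket first.
 *           // Simply resubmit the read operation and return.
 *           return;
 *       }
 *       // ... normal processing ...
 *   }
 */
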
#define THIS_FILE   "ioq_select"

/*
 * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
 * the correct error code.
 */
#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
#   error "Error reporting must be enabled for this function to work!"
#endif

/*
 * In debug builds, VALIDATE_FD_SET may be set to check the validity
 * of the fd_sets.
 */
/*
#if defined(PJ_DEBUG) && PJ_DEBUG != 0
#  define VALIDATE_FD_SET   1
#else
#  define VALIDATE_FD_SET   0
#endif
*/
#define VALIDATE_FD_SET 0

#if 0
#  define TRACE__(args) PJ_LOG(3,args)
#else
#  define TRACE__(args)
#endif

/*
 * This describes each key.
 */
struct pj_ioqueue_key_t
{
    DECLARE_COMMON_KEY
};

/*
 * This describes the I/O queue itself.
 */
struct pj_ioqueue_t
{
    DECLARE_COMMON_IOQUEUE

    unsigned            max, count;     /* Max and current key count        */
    int                 nfds;           /* The largest fd value (for select)*/
    pj_ioqueue_key_t    active_list;    /* List of active keys.             */
    pj_fd_set_t         rfdset;
    pj_fd_set_t         wfdset;
#if PJ_HAS_TCP
    pj_fd_set_t         xfdset;
#endif

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    pj_mutex_t         *ref_cnt_mutex;
    pj_ioqueue_key_t    closing_list;
    pj_ioqueue_key_t    free_list;
#endif
};

/* Proto */
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
static pj_status_t replace_udp_sock(pj_ioqueue_key_t *h);
#endif

/* Include implementation for common abstraction after we declare
 * pj_ioqueue_key_t and pj_ioqueue_t.
 */
#include "ioqueue_common_abs.c"

#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Scan closing keys to be put to free list again */
static void scan_closing_keys(pj_ioqueue_t *ioqueue);
#endif

/*
 * pj_ioqueue_name()
 */
PJ_DEF(const char*) pj_ioqueue_name(void)
{
    return "select";
}

/*
 * Scan the socket descriptor sets for the largest descriptor.
 * This value is needed by select().
 */
#if defined(PJ_SELECT_NEEDS_NFDS) && PJ_SELECT_NEEDS_NFDS!=0
static void rescan_fdset(pj_ioqueue_t *ioqueue)
{
    pj_ioqueue_key_t *key = ioqueue->active_list.next;
    int max = 0;

    while (key != &ioqueue->active_list) {
        if (key->fd > max)
            max = key->fd;
        key = key->next;
    }

    ioqueue->nfds = max;
}
#else
static void rescan_fdset(pj_ioqueue_t *ioqueue)
{
    ioqueue->nfds = FD_SETSIZE-1;
}
#endif


/*
 * pj_ioqueue_create()
 *
 * Create select ioqueue.
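 *
 * A minimal usage sketch (the pool and the error handling below are
 * illustrative assumptions, not prescribed by this file):
 *
 *   pj_ioqueue_t *ioq;
 *   pj_status_t status;
 *
 *   status = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioq);
 *   if (status != PJ_SUCCESS) {
 *       // handle error; no ioqueue was created
 *   }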
 */
PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
                                       pj_size_t max_fd,
                                       pj_ioqueue_t **p_ioqueue)
{
    pj_ioqueue_t *ioqueue;
    pj_lock_t *lock;
    unsigned i;
    pj_status_t rc;

    /* Check that arguments are valid. */
    PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
                     max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES,
                     PJ_EINVAL);

    /* Check that size of pj_ioqueue_op_key_t is sufficient */
    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
                     sizeof(union operation_key), PJ_EBUG);

    /* Create and init common ioqueue stuff */
    ioqueue = PJ_POOL_ALLOC_T(pool, pj_ioqueue_t);
    ioqueue_init(ioqueue);

    ioqueue->max = (unsigned)max_fd;
    ioqueue->count = 0;
    PJ_FD_ZERO(&ioqueue->rfdset);
    PJ_FD_ZERO(&ioqueue->wfdset);
#if PJ_HAS_TCP
    PJ_FD_ZERO(&ioqueue->xfdset);
#endif
    pj_list_init(&ioqueue->active_list);

    rescan_fdset(ioqueue);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* When safe unregistration is used (the default), we pre-create
     * all keys and put them in the free list.
     */
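
    /* Key lifecycle under safe unregistration, summarized from the code
     * in this file (a descriptive note, not a new mechanism):
     *
     *   free_list    --(pj_ioqueue_register_sock)-->          active_list
     *   active_list  --(unregister; ref_count drops to 0)-->  closing_list
     *   closing_list --(PJ_IOQUEUE_KEY_FREE_DELAY elapsed)--> free_list
     */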

    /* Mutex to protect key's reference counter.
     * We don't want to use the key's mutex or ioqueue's mutex because
     * that would create a deadlock situation in some cases.
     */
    rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex);
    if (rc != PJ_SUCCESS)
        return rc;


    /* Init key list */
    pj_list_init(&ioqueue->free_list);
    pj_list_init(&ioqueue->closing_list);


    /* Pre-create all keys according to max_fd */
    for (i=0; i<max_fd; ++i) {
        pj_ioqueue_key_t *key;

        key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t);
        key->ref_count = 0;
        rc = pj_lock_create_recursive_mutex(pool, NULL, &key->lock);
        if (rc != PJ_SUCCESS) {
            key = ioqueue->free_list.next;
            while (key != &ioqueue->free_list) {
                pj_lock_destroy(key->lock);
                key = key->next;
            }
            pj_mutex_destroy(ioqueue->ref_cnt_mutex);
            return rc;
        }

        pj_list_push_back(&ioqueue->free_list, key);
    }
#endif

    /* Create and init ioqueue mutex */
    rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
    if (rc != PJ_SUCCESS)
        return rc;

    rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
    if (rc != PJ_SUCCESS)
        return rc;

    PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue));

    *p_ioqueue = ioqueue;
    return PJ_SUCCESS;
}

/*
 * pj_ioqueue_destroy()
 *
 * Destroy ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
    pj_ioqueue_key_t *key;

    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);

    pj_lock_acquire(ioqueue->lock);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Destroy reference counters */
    key = ioqueue->active_list.next;
    while (key != &ioqueue->active_list) {
        pj_lock_destroy(key->lock);
        key = key->next;
    }

    key = ioqueue->closing_list.next;
    while (key != &ioqueue->closing_list) {
        pj_lock_destroy(key->lock);
        key = key->next;
    }

    key = ioqueue->free_list.next;
    while (key != &ioqueue->free_list) {
        pj_lock_destroy(key->lock);
        key = key->next;
    }

    pj_mutex_destroy(ioqueue->ref_cnt_mutex);
#endif

    return ioqueue_destroy(ioqueue);
}


/*
 * pj_ioqueue_register_sock()
 *
 * Register socket handle to ioqueue.
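 *
 * A minimal registration sketch (the socket, pool, callback function
 * and surrounding variables are illustrative assumptions, not part of
 * this file):
 *
 *   pj_ioqueue_callback cb;
 *   pj_ioqueue_key_t *key;
 *
 *   pj_bzero(&cb, sizeof(cb));
 *   cb.on_read_complete = &my_on_read_complete;
 *
 *   status = pj_ioqueue_register_sock(pool, ioq, sock, NULL, &cb, &key);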
 */
PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool,
                                              pj_ioqueue_t *ioqueue,
                                              pj_sock_t sock,
                                              pj_grp_lock_t *grp_lock,
                                              void *user_data,
                                              const pj_ioqueue_callback *cb,
                                              pj_ioqueue_key_t **p_key)
{
    pj_ioqueue_key_t *key = NULL;
#if defined(PJ_WIN32) && PJ_WIN32!=0 || \
    defined(PJ_WIN64) && PJ_WIN64 != 0 || \
    defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0
    u_long value;
#else
    pj_uint32_t value;
#endif
    pj_status_t rc = PJ_SUCCESS;

    PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
                     cb && p_key, PJ_EINVAL);

    pj_lock_acquire(ioqueue->lock);

    if (ioqueue->count >= ioqueue->max) {
        rc = PJ_ETOOMANY;
        goto on_return;
    }

    /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get
     * the key from the free list. Otherwise allocate a new one.
     */
#if PJ_IOQUEUE_HAS_SAFE_UNREG

    /* Scan closing_keys first to let them come back to free_list */
    scan_closing_keys(ioqueue);

    pj_assert(!pj_list_empty(&ioqueue->free_list));
    if (pj_list_empty(&ioqueue->free_list)) {
        rc = PJ_ETOOMANY;
        goto on_return;
    }

    key = ioqueue->free_list.next;
    pj_list_erase(key);
#else
    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
#endif

    rc = ioqueue_init_key(pool, ioqueue, key, sock, grp_lock, user_data, cb);
    if (rc != PJ_SUCCESS) {
        key = NULL;
        goto on_return;
    }

    /* Set socket to nonblocking. */
    value = 1;
#if defined(PJ_WIN32) && PJ_WIN32!=0 || \
    defined(PJ_WIN64) && PJ_WIN64 != 0 || \
    defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0
    if (ioctlsocket(sock, FIONBIO, &value)) {
#else
    if (ioctl(sock, FIONBIO, &value)) {
#endif
        rc = pj_get_netos_error();
        goto on_return;
    }


    /* Put in active list. */
    pj_list_insert_before(&ioqueue->active_list, key);
    ++ioqueue->count;

    /* Rescan fdset to get max descriptor */
    rescan_fdset(ioqueue);

on_return:
    /* On error, socket may be left in non-blocking mode. */
    if (rc != PJ_SUCCESS) {
        if (key && key->grp_lock)
            pj_grp_lock_dec_ref_dbg(key->grp_lock, "ioqueue", 0);
    }
    *p_key = key;
    pj_lock_release(ioqueue->lock);

    return rc;
}

PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
                                              pj_ioqueue_t *ioqueue,
                                              pj_sock_t sock,
                                              void *user_data,
                                              const pj_ioqueue_callback *cb,
                                              pj_ioqueue_key_t **p_key)
{
    return pj_ioqueue_register_sock2(pool, ioqueue, sock, NULL, user_data,
                                     cb, p_key);
}

#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Increment key's reference counter */
static void increment_counter(pj_ioqueue_key_t *key)
{
    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
    ++key->ref_count;
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
}

/* Decrement the key's reference counter, and when the counter reaches
 * zero, move the key to the closing list to be reclaimed later.
 *
 * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK.
 */
static void decrement_counter(pj_ioqueue_key_t *key)
{
    pj_lock_acquire(key->ioqueue->lock);
    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
    --key->ref_count;
    if (key->ref_count == 0) {

        pj_assert(key->closing == 1);
        pj_gettickcount(&key->free_time);
        key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
        pj_time_val_normalize(&key->free_time);

        pj_list_erase(key);
        pj_list_push_back(&key->ioqueue->closing_list, key);
        /* Rescan fdset to get max descriptor */
        rescan_fdset(key->ioqueue);
    }
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
    pj_lock_release(key->ioqueue->lock);
}
#endif


/*
 * pj_ioqueue_unregister()
 *
 * Unregister handle from ioqueue.
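 *
 * Note: once this returns, the key must be treated as gone; a typical
 * call site is simply (sketch):
 *
 *   pj_ioqueue_unregister(key);
 *   key = NULL;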
 */
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
{
    pj_ioqueue_t *ioqueue;

    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    ioqueue = key->ioqueue;

    /* Lock the key to make sure no callback is simultaneously modifying
     * the key. We need to lock the key before the ioqueue here to prevent
     * deadlock.
     */
    pj_ioqueue_lock_key(key);

    /* Also lock ioqueue */
    pj_lock_acquire(ioqueue->lock);

    pj_assert(ioqueue->count > 0);
    --ioqueue->count;
#if !PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Ticket #520, key will be erased more than once */
    pj_list_erase(key);
#endif
    PJ_FD_CLR(key->fd, &ioqueue->rfdset);
    PJ_FD_CLR(key->fd, &ioqueue->wfdset);
#if PJ_HAS_TCP
    PJ_FD_CLR(key->fd, &ioqueue->xfdset);
#endif

    /* Close socket. */
    pj_sock_close(key->fd);

    /* Clear callback */
    key->cb.on_accept_complete = NULL;
    key->cb.on_connect_complete = NULL;
    key->cb.on_read_complete = NULL;
    key->cb.on_write_complete = NULL;

    /* Must release ioqueue lock first before decrementing counter, to
     * prevent deadlock.
     */
    pj_lock_release(ioqueue->lock);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Mark key as closing. */
    key->closing = 1;

    /* Decrement counter. */
    decrement_counter(key);

    /* Done. */
    if (key->grp_lock) {
        /* Just dec_ref and unlock. We will set grp_lock to NULL
         * elsewhere. */
        pj_grp_lock_t *grp_lock = key->grp_lock;
        // Don't set grp_lock to NULL, otherwise the other thread
        // will crash. Just leave it as a dangling pointer; this
        // should be safe.
        //key->grp_lock = NULL;
        pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0);
        pj_grp_lock_release(grp_lock);
    } else {
        pj_ioqueue_unlock_key(key);
    }
#else
    if (key->grp_lock) {
        /* Set grp_lock to NULL and unlock */
        pj_grp_lock_t *grp_lock = key->grp_lock;
        // Don't set grp_lock to NULL, otherwise the other thread
        // will crash. Just leave it as a dangling pointer; this
        // should be safe.
        //key->grp_lock = NULL;
        pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0);
        pj_grp_lock_release(grp_lock);
    } else {
        pj_ioqueue_unlock_key(key);
    }

    pj_lock_destroy(key->lock);
#endif

    return PJ_SUCCESS;
}


/* This is supposed to check whether the fd_set values are consistent
 * with the operation currently set in each key.
 */
#if VALIDATE_FD_SET
static void validate_sets(const pj_ioqueue_t *ioqueue,
                          const pj_fd_set_t *rfdset,
                          const pj_fd_set_t *wfdset,
                          const pj_fd_set_t *xfdset)
{
    pj_ioqueue_key_t *key;

    /*
     * This basically would not work anymore.
     * We need to lock the key before performing the check, but we can't
     * do so because we're holding the ioqueue mutex. If we acquire the
     * key's mutex now, this will cause a deadlock.
     */
    pj_assert(0);

    key = ioqueue->active_list.next;
    while (key != &ioqueue->active_list) {
        if (!pj_list_empty(&key->read_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
            || !pj_list_empty(&key->accept_list)
#endif
            )
        {
            pj_assert(PJ_FD_ISSET(key->fd, rfdset));
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0);
        }
        if (!pj_list_empty(&key->write_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
            || key->connecting
#endif
            )
        {
            pj_assert(PJ_FD_ISSET(key->fd, wfdset));
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0);
        }
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
        if (key->connecting)
        {
            pj_assert(PJ_FD_ISSET(key->fd, xfdset));
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, xfdset) == 0);
        }
#endif /* PJ_HAS_TCP */

        key = key->next;
    }
}
#endif /* VALIDATE_FD_SET */


/* ioqueue_remove_from_set()
 * This function is called from ioqueue_dispatch_event() to instruct
 * the ioqueue to remove the specified descriptor from ioqueue's descriptor
 * set for the specified event.
 */
static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
                                     pj_ioqueue_key_t *key,
                                     enum ioqueue_event_type event_type)
{
    pj_lock_acquire(ioqueue->lock);

    if (event_type == READABLE_EVENT)
        PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->rfdset);
    else if (event_type == WRITEABLE_EVENT)
        PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->wfdset);
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
    else if (event_type == EXCEPTION_EVENT)
        PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->xfdset);
#endif
    else
        pj_assert(0);

    pj_lock_release(ioqueue->lock);
}

/*
 * ioqueue_add_to_set()
 * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
 * to instruct the ioqueue to add the specified handle to ioqueue's
 * descriptor set for the specified event.
 */
static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
                                pj_ioqueue_key_t *key,
                                enum ioqueue_event_type event_type )
{
    pj_lock_acquire(ioqueue->lock);

    if (event_type == READABLE_EVENT)
        PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->rfdset);
    else if (event_type == WRITEABLE_EVENT)
        PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->wfdset);
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
    else if (event_type == EXCEPTION_EVENT)
        PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->xfdset);
#endif
    else
        pj_assert(0);

    pj_lock_release(ioqueue->lock);
}

#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Scan closing keys to be put to free list again */
static void scan_closing_keys(pj_ioqueue_t *ioqueue)
{
    pj_time_val now;
    pj_ioqueue_key_t *h;

    pj_gettickcount(&now);
    h = ioqueue->closing_list.next;
    while (h != &ioqueue->closing_list) {
        pj_ioqueue_key_t *next = h->next;

        pj_assert(h->closing != 0);

        if (PJ_TIME_VAL_GTE(now, h->free_time)) {
            pj_list_erase(h);
            // Don't set grp_lock to NULL, otherwise the other thread
            // will crash. Just leave it as a dangling pointer; this
            // should be safe.
            //h->grp_lock = NULL;
            pj_list_push_back(&ioqueue->free_list, h);
        }
        h = next;
    }
}
#endif

#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
static pj_status_t replace_udp_sock(pj_ioqueue_key_t *h)
{
    enum flags {
        HAS_PEER_ADDR = 1,
        HAS_QOS = 2
    };
    pj_sock_t old_sock, new_sock = PJ_INVALID_SOCKET;
    pj_sockaddr local_addr, rem_addr;
    int val, addr_len;
    pj_fd_set_t *fds[3];
    unsigned i, fds_cnt, flags=0;
    pj_qos_params qos_params;
    unsigned msec;
    pj_status_t status;

    pj_lock_acquire(h->ioqueue->lock);

    old_sock = h->fd;

    /* Can only replace UDP sockets */
    pj_assert(h->fd_type == pj_SOCK_DGRAM());

    PJ_LOG(4,(THIS_FILE, "Attempting to replace UDP socket %d", old_sock));

    /* Investigate the old socket */
    addr_len = sizeof(local_addr);
    status = pj_sock_getsockname(old_sock, &local_addr, &addr_len);
    if (status != PJ_SUCCESS)
        goto on_error;

    addr_len = sizeof(rem_addr);
    status = pj_sock_getpeername(old_sock, &rem_addr, &addr_len);
    if (status == PJ_SUCCESS)
        flags |= HAS_PEER_ADDR;

    status = pj_sock_get_qos_params(old_sock, &qos_params);
    if (status == PJ_SUCCESS)
        flags |= HAS_QOS;

    /* We're done with the old socket; close it, otherwise we'll get
     * an error in bind().
     */
    pj_sock_close(old_sock);

    /* Prepare the new socket */
    status = pj_sock_socket(local_addr.addr.sa_family, PJ_SOCK_DGRAM, 0,
                            &new_sock);
    if (status != PJ_SUCCESS)
        goto on_error;

    /* Even after the socket is closed, we'll still get "Address in use"
     * errors, so force it with SO_REUSEADDR.
     */
    val = 1;
    status = pj_sock_setsockopt(new_sock, SOL_SOCKET, SO_REUSEADDR,
                                &val, sizeof(val));
    if (status != PJ_SUCCESS)
        goto on_error;

    /* The loop is silly, but what else can we do? */
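    /* (Descriptive note: the loop below retries bind() with a doubling
     * back-off, starting at 20 ms and leveling off around 1 s, until
     * the EADDRINUSE condition clears.)
     */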
    addr_len = pj_sockaddr_get_len(&local_addr);
    for (msec=20; ; msec<1000? msec=msec*2 : 1000) {
        status = pj_sock_bind(new_sock, &local_addr, addr_len);
        if (status != PJ_STATUS_FROM_OS(EADDRINUSE))
            break;
        PJ_LOG(4,(THIS_FILE, "Address is still in use, retrying.."));
        pj_thread_sleep(msec);
    }

    if (status != PJ_SUCCESS)
        goto on_error;

    if (flags & HAS_QOS) {
        status = pj_sock_set_qos_params(new_sock, &qos_params);
        if (status != PJ_SUCCESS)
            goto on_error;
    }

    if (flags & HAS_PEER_ADDR) {
        status = pj_sock_connect(new_sock, &rem_addr, addr_len);
        if (status != PJ_SUCCESS)
            goto on_error;
    }

    /* Set socket to nonblocking. */
    val = 1;
#if defined(PJ_WIN32) && PJ_WIN32!=0 || \
    defined(PJ_WIN64) && PJ_WIN64 != 0 || \
    defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0
    if (ioctlsocket(new_sock, FIONBIO, &val)) {
#else
    if (ioctl(new_sock, FIONBIO, &val)) {
#endif
        status = pj_get_netos_error();
        goto on_error;
    }

    /* Replace the occurrence of the old socket with the new socket in
     * the fd sets.
     */
    fds_cnt = 0;
    fds[fds_cnt++] = &h->ioqueue->rfdset;
    fds[fds_cnt++] = &h->ioqueue->wfdset;
#if PJ_HAS_TCP
    fds[fds_cnt++] = &h->ioqueue->xfdset;
#endif

    for (i=0; i<fds_cnt; ++i) {
        if (PJ_FD_ISSET(old_sock, fds[i])) {
            PJ_FD_CLR(old_sock, fds[i]);
            PJ_FD_SET(new_sock, fds[i]);
        }
    }

    /* And finally replace the fd in the key */
    h->fd = new_sock;

    PJ_LOG(4,(THIS_FILE, "UDP socket has been replaced successfully!"));

    pj_lock_release(h->ioqueue->lock);

    return PJ_SUCCESS;

on_error:
    if (new_sock != PJ_INVALID_SOCKET)
        pj_sock_close(new_sock);
    PJ_PERROR(1,(THIS_FILE, status, "Error replacing socket"));
    pj_lock_release(h->ioqueue->lock);
    return status;
}
#endif


/*
 * pj_ioqueue_poll()
 *
 * A few things worth noting:
 *
 * - we used to call only one callback per poll, but that didn't work
 *   very well. In some situations the write callback gets called all
 *   the time and the read callback never gets a chance to run. This
 *   happens, for example, when the user submits a write operation from
 *   inside the write callback.
 *   As a result, we changed the behaviour so that multiple callbacks
 *   can now be called in a single poll. It should be fast too; we just
 *   need to be careful with the ioqueue data structs.
 *
 * - to guarantee preemptiveness etc., the poll function must strictly
 *   work on a copy of the ioqueue's fd_sets (not the originals).
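 *
 * A typical polling loop looks like this (a sketch; the 10 ms timeout
 * and the error handling are illustrative choices, not requirements):
 *
 *   pj_time_val timeout = { 0, 10 };
 *   for (;;) {
 *       int n = pj_ioqueue_poll(ioq, &timeout);
 *       if (n < 0) {
 *           pj_status_t err = (pj_status_t)-n;
 *           // log err and decide whether to continue
 *       }
 *   }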
 */
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
    pj_fd_set_t rfdset, wfdset, xfdset;
    int count, i, counter;
    pj_ioqueue_key_t *h;
    struct event
    {
        pj_ioqueue_key_t *key;
        enum ioqueue_event_type event_type;
    } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];

    PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);

    /* Lock ioqueue before making fd_set copies */
    pj_lock_acquire(ioqueue->lock);

    /* We will only do select() when there are sockets to be polled.
     * Otherwise select() will return an error.
     */
    if (PJ_FD_COUNT(&ioqueue->rfdset)==0 &&
        PJ_FD_COUNT(&ioqueue->wfdset)==0
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
        && PJ_FD_COUNT(&ioqueue->xfdset)==0
#endif
        )
    {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
        scan_closing_keys(ioqueue);
#endif
        pj_lock_release(ioqueue->lock);
        TRACE__((THIS_FILE, " poll: no fd is set"));
        if (timeout)
            pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout));
        return 0;
    }

    /* Copy ioqueue's pj_fd_set_t to local variables. */
    pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t));
    pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t));
#if PJ_HAS_TCP
    pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t));
#else
    PJ_FD_ZERO(&xfdset);
#endif

#if VALIDATE_FD_SET
    validate_sets(ioqueue, &rfdset, &wfdset, &xfdset);
#endif

    /* Unlock ioqueue before select(). */
    pj_lock_release(ioqueue->lock);

    count = pj_sock_select(ioqueue->nfds+1, &rfdset, &wfdset, &xfdset,
                           timeout);

    if (count == 0)
        return 0;
    else if (count < 0)
        return -pj_get_netos_error();
    else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)
        count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;

    /* Scan the descriptor sets for events and add the events to the
     * event array, to be processed later in this function. We do this
     * so that events can be processed in parallel without holding the
     * ioqueue lock.
     */
    pj_lock_acquire(ioqueue->lock);

    counter = 0;

    /* Scan for writable sockets first to handle piggy-back data
     * coming with accept().
     */
    h = ioqueue->active_list.next;
    for ( ; h!=&ioqueue->active_list && counter<count; h = h->next) {

        if ( (key_has_pending_write(h) || key_has_pending_connect(h))
             && PJ_FD_ISSET(h->fd, &wfdset) && !IS_CLOSING(h))
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            event[counter].key = h;
            event[counter].event_type = WRITEABLE_EVENT;
            ++counter;
        }

        /* Scan for readable sockets. */
        if ((key_has_pending_read(h) || key_has_pending_accept(h))
            && PJ_FD_ISSET(h->fd, &rfdset) && !IS_CLOSING(h) &&
            counter<count)
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            event[counter].key = h;
            event[counter].event_type = READABLE_EVENT;
            ++counter;
        }

#if PJ_HAS_TCP
        if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset) &&
            !IS_CLOSING(h) && counter<count)
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            event[counter].key = h;
            event[counter].event_type = EXCEPTION_EVENT;
            ++counter;
        }
#endif
    }

    for (i=0; i<counter; ++i) {
        if (event[i].key->grp_lock)
            pj_grp_lock_add_ref_dbg(event[i].key->grp_lock, "ioqueue", 0);
    }

    PJ_RACE_ME(5);

    pj_lock_release(ioqueue->lock);

    PJ_RACE_ME(5);

    count = counter;

    /* Now process all events. The dispatch functions will take care
     * of locking each of the keys.
     */
    for (counter=0; counter<count; ++counter) {
        switch (event[counter].event_type) {
        case READABLE_EVENT:
            ioqueue_dispatch_read_event(ioqueue, event[counter].key);
            break;
        case WRITEABLE_EVENT:
            ioqueue_dispatch_write_event(ioqueue, event[counter].key);
            break;
        case EXCEPTION_EVENT:
            ioqueue_dispatch_exception_event(ioqueue, event[counter].key);
            break;
        case NO_EVENT:
            pj_assert(!"Invalid event!");
            break;
        }

#if PJ_IOQUEUE_HAS_SAFE_UNREG
        decrement_counter(event[counter].key);
#endif

        if (event[counter].key->grp_lock)
            pj_grp_lock_dec_ref_dbg(event[counter].key->grp_lock,
                                    "ioqueue", 0);
    }


    return count;
}
