Ticket #913: Concurrency problem in select ioqueue may corrupt descriptor set
 - fixed the concurrency problem
 - also fixed ioqueue unregister test in pjlib-test


git-svn-id: https://svn.pjsip.org/repos/pjproject/trunk@2826 74dad513-b988-da41-8d7b-12977e46ad98
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c
index e965b14..cccabc8 100644
--- a/pjlib/src/pj/ioqueue_common_abs.c
+++ b/pjlib/src/pj/ioqueue_common_abs.c
@@ -687,6 +687,14 @@
     read_op->flags = flags;
 
     pj_mutex_lock(key->mutex);
+    /* Check again. Handle may have been closed after the previous check
+     * in multithreaded app. If we add bad handle to the set it will
+     * corrupt the ioqueue set. See #913
+     */
+    if (IS_CLOSING(key)) {
+	pj_mutex_unlock(key->mutex);
+	return PJ_ECANCELLED;
+    }
     pj_list_insert_before(&key->read_list, read_op);
     ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
     pj_mutex_unlock(key->mutex);
@@ -755,6 +763,14 @@
     read_op->rmt_addrlen = addrlen;
 
     pj_mutex_lock(key->mutex);
+    /* Check again. Handle may have been closed after the previous check
+     * in multithreaded app. If we add bad handle to the set it will
+     * corrupt the ioqueue set. See #913
+     */
+    if (IS_CLOSING(key)) {
+	pj_mutex_unlock(key->mutex);
+	return PJ_ECANCELLED;
+    }
     pj_list_insert_before(&key->read_list, read_op);
     ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
     pj_mutex_unlock(key->mutex);
@@ -861,6 +877,14 @@
     write_op->flags = flags;
     
     pj_mutex_lock(key->mutex);
+    /* Check again. Handle may have been closed after the previous check
+     * in multithreaded app. If we add bad handle to the set it will
+     * corrupt the ioqueue set. See #913
+     */
+    if (IS_CLOSING(key)) {
+	pj_mutex_unlock(key->mutex);
+	return PJ_ECANCELLED;
+    }
     pj_list_insert_before(&key->write_list, write_op);
     ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
     pj_mutex_unlock(key->mutex);
@@ -978,6 +1002,14 @@
     write_op->rmt_addrlen = addrlen;
     
     pj_mutex_lock(key->mutex);
+    /* Check again. Handle may have been closed after the previous check
+     * in multithreaded app. If we add bad handle to the set it will
+     * corrupt the ioqueue set. See #913
+     */
+    if (IS_CLOSING(key)) {
+	pj_mutex_unlock(key->mutex);
+	return PJ_ECANCELLED;
+    }
     pj_list_insert_before(&key->write_list, write_op);
     ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
     pj_mutex_unlock(key->mutex);
@@ -1047,6 +1079,14 @@
     accept_op->local_addr = local;
 
     pj_mutex_lock(key->mutex);
+    /* Check again. Handle may have been closed after the previous check
+     * in multithreaded app. If we add bad handle to the set it will
+     * corrupt the ioqueue set. See #913
+     */
+    if (IS_CLOSING(key)) {
+	pj_mutex_unlock(key->mutex);
+	return PJ_ECANCELLED;
+    }
     pj_list_insert_before(&key->accept_list, accept_op);
     ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
     pj_mutex_unlock(key->mutex);
@@ -1083,6 +1123,13 @@
 	if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
 	    /* Pending! */
             pj_mutex_lock(key->mutex);
+	    /* Check again. Handle may have been closed after the previous 
+	     * check in multithreaded app. See #913
+	     */
+	    if (IS_CLOSING(key)) {
+		pj_mutex_unlock(key->mutex);
+		return PJ_ECANCELLED;
+	    }
 	    key->connecting = PJ_TRUE;
             ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
             ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT);
diff --git a/pjlib/src/pjlib-test/ioq_unreg.c b/pjlib/src/pjlib-test/ioq_unreg.c
index 5b32096..6b19ed1 100644
--- a/pjlib/src/pjlib-test/ioq_unreg.c
+++ b/pjlib/src/pjlib-test/ioq_unreg.c
@@ -93,9 +93,6 @@
 	if (PJ_TIME_VAL_GTE(now, time_to_unregister)) { 
 	    sock_data.unregistered = 1;
 	    pj_ioqueue_unregister(key);
-	    pj_mutex_destroy(sock_data.mutex);
-	    pj_pool_release(sock_data.pool);
-	    sock_data.pool = NULL;
 	    return;
 	}
     }
@@ -243,31 +240,30 @@
     /* Loop until test time ends */
     for (;;) {
 	pj_time_val now, timeout;
+	int n;
 
 	pj_gettimeofday(&now);
 
 	if (test_method == UNREGISTER_IN_APP && 
 	    PJ_TIME_VAL_GTE(now, time_to_unregister) &&
-	    sock_data.pool) 
+	    !sock_data.unregistered) 
 	{
-	    //Can't do this otherwise it'll deadlock
-	    //pj_mutex_lock(sock_data.mutex);
-
 	    sock_data.unregistered = 1;
+	    /* Wait (as much as possible) for callback to complete */
+	    pj_mutex_lock(sock_data.mutex);
+	    pj_mutex_unlock(sock_data.mutex);
 	    pj_ioqueue_unregister(sock_data.key);
-	    //pj_mutex_unlock(sock_data.mutex);
-	    pj_mutex_destroy(sock_data.mutex);
-	    pj_pool_release(sock_data.pool);
-	    sock_data.pool = NULL;
 	}
 
 	if (PJ_TIME_VAL_GT(now, end_time) && sock_data.unregistered)
 	    break;
 
 	timeout.sec = 0; timeout.msec = 10;
-	pj_ioqueue_poll(ioqueue, &timeout);
-	//pj_thread_sleep(1);
-
+	n = pj_ioqueue_poll(ioqueue, &timeout);
+	if (n < 0) {
+	    app_perror("pj_ioqueue_poll error", -n);
+	    pj_thread_sleep(1);
+	}
     }
 
     thread_quitting = 1;
@@ -277,6 +273,11 @@
 	pj_thread_destroy(thread[i]);
     }
 
+    /* Destroy data */
+    pj_mutex_destroy(sock_data.mutex);
+    pj_pool_release(sock_data.pool);
+    sock_data.pool = NULL;
+
     if (other_socket) {
 	pj_ioqueue_unregister(osd.key);
     }
@@ -314,7 +315,7 @@
 	return -12;
     }
 
-    PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 0/3 (%s)", 
+    PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 0/3, unregister in app (%s)", 
 	       pj_ioqueue_name()));
     for (i=0; i<LOOP; ++i) {
 	pj_ansi_sprintf(title, "repeat %d/%d", i, LOOP);
@@ -324,7 +325,7 @@
     }
 
 
-    PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 1/3 (%s)",
+    PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 1/3, unregister in app (%s)",
 	       pj_ioqueue_name()));
     for (i=0; i<LOOP; ++i) {
 	pj_ansi_sprintf(title, "repeat %d/%d", i, LOOP);
@@ -335,7 +336,7 @@
 
     test_method = UNREGISTER_IN_CALLBACK;
 
-    PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 2/3 (%s)", 
+    PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 2/3, unregister in cb (%s)", 
 	       pj_ioqueue_name()));
     for (i=0; i<LOOP; ++i) {
 	pj_ansi_sprintf(title, "repeat %d/%d", i, LOOP);
@@ -345,7 +346,7 @@
     }
 
 
-    PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 3/3 (%s)", 
+    PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 3/3, unregister in cb (%s)", 
 	       pj_ioqueue_name()));
     for (i=0; i<LOOP; ++i) {
 	pj_ansi_sprintf(title, "repeat %d/%d", i, LOOP);
@@ -366,7 +367,7 @@
 
     rc = udp_ioqueue_unreg_test_imp(PJ_TRUE);
     if (rc != 0)
-	return rc;
+    	return rc;
 
     rc = udp_ioqueue_unreg_test_imp(PJ_FALSE);
     if (rc != 0)