Fixed race condition bug in ioqueue unregistration for the select and Win32 IOCP backends

git-svn-id: https://svn.pjsip.org/repos/pjproject/trunk@365 74dad513-b988-da41-8d7b-12977e46ad98
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c
index 30e2602..b128d81 100644
--- a/pjlib/src/pj/ioqueue_common_abs.c
+++ b/pjlib/src/pj/ioqueue_common_abs.c
@@ -27,25 +27,10 @@
  * This file is NOT supposed to be compiled as stand-alone source.
  */
 
-static long ioqueue_tls_id = -1;
-
-typedef struct key_lock_data {
-    struct key_lock_data *prev;
-    pj_ioqueue_key_t     *key;
-    int			  is_alive;
-} key_lock_data;
-
-
 static void ioqueue_init( pj_ioqueue_t *ioqueue )
 {
     ioqueue->lock = NULL;
     ioqueue->auto_delete_lock = 0;
-
-    if (ioqueue_tls_id == -1) {
-	pj_status_t status;
-	status = pj_thread_local_alloc(&ioqueue_tls_id);
-	pj_thread_local_set(ioqueue_tls_id, NULL);
-    }
 }
 
 static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
@@ -93,11 +78,20 @@
     pj_list_init(&key->write_list);
 #if PJ_HAS_TCP
     pj_list_init(&key->accept_list);
+    key->connecting = 0;
 #endif
 
     /* Save callback. */
     pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
 
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+    /* Set initial reference count to 1 */
+    pj_assert(key->ref_count == 0);
+    ++key->ref_count;
+
+    key->closing = 0;
+#endif
+
     /* Get socket type. When socket type is datagram, some optimization
      * will be performed during send to allow parallel send operations.
      */
@@ -107,68 +101,14 @@
     if (rc != PJ_SUCCESS)
         key->fd_type = PJ_SOCK_STREAM;
 
-    key->inside_callback = 0;
-    key->destroy_requested = 0;
-
     /* Create mutex for the key. */
-    rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
+#if !PJ_IOQUEUE_HAS_SAFE_UNREG
+    rc = pj_mutex_create_simple(pool, NULL, &key->mutex);
+#endif
     
     return rc;
 }
 
-/* Lock the key and also keep the lock data in thread local storage.
- * The lock data is used to detect if the key is deleted by application
- * when we call its callback.
- */
-static void lock_key(pj_ioqueue_key_t *key, key_lock_data *lck)
-{
-    struct key_lock_data *prev_data;
-
-    pj_mutex_lock(key->mutex);
-    prev_data = (struct key_lock_data *) 
-                    pj_thread_local_get(ioqueue_tls_id);
-    lck->prev = prev_data;
-    lck->key = key;
-    lck->is_alive = 1;
-    pj_thread_local_set(ioqueue_tls_id, lck);
-}
-
-/* Unlock the key only if it is still valid. */
-static void unlock_key(pj_ioqueue_key_t *key, key_lock_data *lck)
-{
-    pj_assert( (void*)pj_thread_local_get(ioqueue_tls_id) == lck);
-    pj_assert( lck->key == key );
-    pj_thread_local_set(ioqueue_tls_id, lck->prev);
-    if (lck->is_alive)
-	pj_mutex_unlock(key->mutex);
-}
-
-/* Destroy key */
-static void ioqueue_destroy_key( pj_ioqueue_key_t *key )
-{
-    key_lock_data *lck;
-
-    /* Make sure that no other threads are doing something with
-     * the key.
-     */
-    pj_mutex_lock(key->mutex);
-    
-    /* Check if this function is called within a callback context.
-     * If so, then we need to inform the callback that the key has
-     * been destroyed so that it doesn't attempt to unlock the
-     * key's mutex.
-     */
-    lck = pj_thread_local_get(ioqueue_tls_id);
-    while (lck) {
-	if (lck->key == key) {
-	    lck->is_alive = 0;
-	}
-	lck = lck->prev;
-    }
-
-    pj_mutex_destroy(key->mutex);
-}
-
 /*
  * pj_ioqueue_get_user_data()
  *
@@ -221,6 +161,13 @@
 }
 
 
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+#   define IS_CLOSING(key)  (key->closing)  /* reads the key's 'closing' flag */
+#else
+#   define IS_CLOSING(key)  (0)  /* constant 0: no 'closing' check in this configuration */
+#endif
+
+
 /*
  * ioqueue_dispatch_event()
  *
@@ -229,10 +176,13 @@
  */
 void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
 {
-    key_lock_data lck_data;
-
     /* Lock the key. */
-    lock_key(h, &lck_data);
+    pj_mutex_lock(h->mutex);
+
+    if (h->closing) {
+	pj_mutex_unlock(h->mutex);
+	return;
+    }
 
 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
     if (h->connecting) {
@@ -245,8 +195,6 @@
         ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
         ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
 
-        /* Unlock; from this point we don't need to hold key's mutex. */
-        //pj_mutex_unlock(h->mutex);
 
 #if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
 	/* from connect(2): 
@@ -293,8 +241,11 @@
 	}
 #endif
 
+        /* Unlock; from this point we don't need to hold key's mutex. */
+        pj_mutex_unlock(h->mutex);
+
 	/* Call callback. */
-        if (h->cb.on_connect_complete)
+        if (h->cb.on_connect_complete && !IS_CLOSING(h))
 	    (*h->cb.on_connect_complete)(h, bytes_transfered);
 
         /* Done. */
@@ -319,7 +270,6 @@
             if (pj_list_empty(&h->write_list))
                 ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
 
-            //pj_mutex_unlock(h->mutex);
         }
 
         /* Send the data. 
@@ -365,19 +315,20 @@
                 if (pj_list_empty(&h->write_list))
                     ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
 
-                /* No need to hold mutex anymore */
-                //pj_mutex_unlock(h->mutex);
             }
 
+            /* No need to hold mutex anymore */
+            pj_mutex_unlock(h->mutex);
+
 	    /* Call callback. */
-            if (h->cb.on_write_complete) {
+            if (h->cb.on_write_complete && !IS_CLOSING(h)) {
 	        (*h->cb.on_write_complete)(h, 
                                            (pj_ioqueue_op_key_t*)write_op,
                                            write_op->written);
             }
 
         } else {
-            //pj_mutex_unlock(h->mutex);
+            pj_mutex_unlock(h->mutex);
         }
 
         /* Done. */
@@ -387,20 +338,21 @@
          * are signalled for the same event, but only one thread eventually
          * able to process the event.
          */
-        //pj_mutex_unlock(h->mutex);
+        pj_mutex_unlock(h->mutex);
     }
-
-    /* Finally unlock key */
-    unlock_key(h, &lck_data);
 }
 
 void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
 {
-    key_lock_data lck_data;
     pj_status_t rc;
 
     /* Lock the key. */
-    lock_key(h, &lck_data);
+    pj_mutex_lock(h->mutex);
+
+    if (h->closing) {
+	pj_mutex_unlock(h->mutex);
+	return;
+    }
 
 #   if PJ_HAS_TCP
     if (!pj_list_empty(&h->accept_list)) {
@@ -416,9 +368,6 @@
         if (pj_list_empty(&h->accept_list))
             ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
 
-        /* Unlock; from this point we don't need to hold key's mutex. */
-        //pj_mutex_unlock(h->mutex);
-
 	rc=pj_sock_accept(h->fd, accept_op->accept_fd, 
                           accept_op->rmt_addr, accept_op->addrlen);
 	if (rc==PJ_SUCCESS && accept_op->local_addr) {
@@ -427,8 +376,11 @@
 				     accept_op->addrlen);
 	}
 
+        /* Unlock; from this point we don't need to hold key's mutex. */
+        pj_mutex_unlock(h->mutex);
+
 	/* Call callback. */
-        if (h->cb.on_accept_complete) {
+        if (h->cb.on_accept_complete && !IS_CLOSING(h)) {
 	    (*h->cb.on_accept_complete)(h, 
                                         (pj_ioqueue_op_key_t*)accept_op,
                                         *accept_op->accept_fd, rc);
@@ -449,11 +401,6 @@
         if (pj_list_empty(&h->read_list))
             ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
 
-        /* Unlock; from this point we don't need to hold key's mutex. */
-	//Crash as of revision 353 (since we added pjmedia socket to
-	//main ioqueue).
-        //pj_mutex_unlock(h->mutex);
-
         bytes_read = read_op->size;
 
 	if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {
@@ -516,8 +463,11 @@
             bytes_read = -rc;
 	}
 
+        /* Unlock; from this point we don't need to hold key's mutex. */
+        pj_mutex_unlock(h->mutex);
+
 	/* Call callback. */
-        if (h->cb.on_read_complete) {
+        if (h->cb.on_read_complete && !IS_CLOSING(h)) {
 	    (*h->cb.on_read_complete)(h, 
                                       (pj_ioqueue_op_key_t*)read_op,
                                       bytes_read);
@@ -529,44 +479,41 @@
          * are signalled for the same event, but only one thread eventually
          * able to process the event.
          */
-        //pj_mutex_unlock(h->mutex);
+        pj_mutex_unlock(h->mutex);
     }
-
-    /* Unlock handle. */
-    unlock_key(h, &lck_data);
 }
 
 
 void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, 
                                        pj_ioqueue_key_t *h )
 {
-    key_lock_data lck_data;
-
-    lock_key(h, &lck_data);
+    pj_mutex_lock(h->mutex);
 
     if (!h->connecting) {
         /* It is possible that more than one thread was woken up, thus
          * the remaining thread will see h->connecting as zero because
          * it has been processed by other thread.
          */
-        //pj_mutex_unlock(h->mutex);
-	unlock_key(h, &lck_data);
+        pj_mutex_unlock(h->mutex);
         return;
     }
 
+    if (IS_CLOSING(h)) {  /* via the macro, so this also builds without safe-unreg */
+	pj_mutex_unlock(h->mutex);
+	return;
+    }
+
     /* Clear operation. */
     h->connecting = 0;
 
-    //pj_mutex_unlock(h->mutex);
-
     ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
     ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
 
-    /* Call callback. */
-    if (h->cb.on_connect_complete)
-	(*h->cb.on_connect_complete)(h, -1);
+    pj_mutex_unlock(h->mutex);
 
-    unlock_key(h, &lck_data);
+    /* Mutex is released; call callback (status -1) unless the key is closing. */
+    if (h->cb.on_connect_complete && !IS_CLOSING(h))
+	(*h->cb.on_connect_complete)(h, -1);
 }
 
 /*
@@ -588,6 +535,10 @@
     read_op = (struct read_operation*)op_key;
     read_op->op = 0;
 
+    /* Check if key is closing. */
+    if (key->closing)
+	return PJ_ECANCELLED;
+
     /* Try to see if there's data immediately available. 
      */
     if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
@@ -646,6 +597,10 @@
     PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
     PJ_CHECK_STACK();
 
+    /* Check if key is closing. */
+    if (key->closing)
+	return PJ_ECANCELLED;
+
     read_op = (struct read_operation*)op_key;
     read_op->op = 0;
 
@@ -710,6 +665,10 @@
     PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
     PJ_CHECK_STACK();
 
+    /* Check if key is closing. */
+    if (key->closing)
+	return PJ_ECANCELLED;
+
     write_op = (struct write_operation*)op_key;
     write_op->op = 0;
 
@@ -788,6 +747,10 @@
     PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
     PJ_CHECK_STACK();
 
+    /* Check if key is closing. */
+    if (key->closing)
+	return PJ_ECANCELLED;
+
     write_op = (struct write_operation*)op_key;
     write_op->op = 0;
 
@@ -869,6 +832,10 @@
     /* check parameters. All must be specified! */
     PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
 
+    /* Check if key is closing. */
+    if (key->closing)
+	return PJ_ECANCELLED;
+
     accept_op = (struct accept_operation*)op_key;
     accept_op->op = 0;
 
@@ -930,6 +897,10 @@
     /* check parameters. All must be specified! */
     PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
 
+    /* Check if key is closing. */
+    if (key->closing)
+	return PJ_ECANCELLED;
+
     /* Check if socket has not been marked for connecting */
     if (key->connecting != 0)
         return PJ_EPENDING;
@@ -986,13 +957,12 @@
                                                 pj_ssize_t bytes_status )
 {
     struct generic_operation *op_rec;
-    key_lock_data lck_data;
 
     /*
      * Find the operation key in all pending operation list to
      * really make sure that it's still there; then call the callback.
      */
-    lock_key(key, &lck_data);
+    pj_mutex_lock(key->mutex);
 
     /* Find the operation in the pending read list. */
     op_rec = (struct generic_operation*)key->read_list.next;
@@ -1000,11 +970,9 @@
         if (op_rec == (void*)op_key) {
             pj_list_erase(op_rec);
             op_rec->op = 0;
-            //pj_mutex_unlock(key->mutex);
+            pj_mutex_unlock(key->mutex);
 
             (*key->cb.on_read_complete)(key, op_key, bytes_status);
-
-	    unlock_key(key, &lck_data);
             return PJ_SUCCESS;
         }
         op_rec = op_rec->next;
@@ -1016,11 +984,9 @@
         if (op_rec == (void*)op_key) {
             pj_list_erase(op_rec);
             op_rec->op = 0;
-            //pj_mutex_unlock(key->mutex);
+            pj_mutex_unlock(key->mutex);
 
             (*key->cb.on_write_complete)(key, op_key, bytes_status);
-
-	    unlock_key(key, &lck_data);
             return PJ_SUCCESS;
         }
         op_rec = op_rec->next;
@@ -1032,19 +998,17 @@
         if (op_rec == (void*)op_key) {
             pj_list_erase(op_rec);
             op_rec->op = 0;
-            //pj_mutex_unlock(key->mutex);
+            pj_mutex_unlock(key->mutex);
 
             (*key->cb.on_accept_complete)(key, op_key, 
                                           PJ_INVALID_SOCKET,
                                           bytes_status);
-
-	    unlock_key(key, &lck_data);
             return PJ_SUCCESS;
         }
         op_rec = op_rec->next;
     }
 
-    unlock_key(key, &lck_data);
+    pj_mutex_unlock(key->mutex);
     
     return PJ_EINVALIDOP;
 }