Ticket #474: option in ioqueue to control concurrency (to allow/disallow simultaneous/multiple callback calls)

git-svn-id: https://svn.pjsip.org/repos/pjproject/trunk@1789 74dad513-b988-da41-8d7b-12977e46ad98
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c
index fdd1afe..0af9cba 100644
--- a/pjlib/src/pj/ioqueue_common_abs.c
+++ b/pjlib/src/pj/ioqueue_common_abs.c
@@ -96,6 +96,10 @@
     key->closing = 0;
 #endif
 
+    rc = pj_ioqueue_set_concurrency(key, ioqueue->default_concurrency);
+    if (rc != PJ_SUCCESS)
+	return rc;
+
     /* Get socket type. When socket type is datagram, some optimization
      * will be performed during send to allow parallel send operations.
      */
@@ -193,6 +197,7 @@
     if (h->connecting) {
 	/* Completion of connect() operation */
 	pj_ssize_t bytes_transfered;
+	pj_bool_t has_lock;
 
 	/* Clear operation. */
 	h->connecting = 0;
@@ -246,13 +251,28 @@
 	}
 #endif
 
-        /* Unlock; from this point we don't need to hold key's mutex. */
-        pj_mutex_unlock(h->mutex);
+        /* Unlock; from this point we don't need to hold key's mutex
+	 * (unless concurrency is disabled, which in this case we should
+	 * hold the mutex while calling the callback) */
+	if (h->allow_concurrent) {
+	    /* concurrency may be changed while we're in the callback, so
+	     * save it to a flag.
+	     */
+	    has_lock = PJ_FALSE;
+	    pj_mutex_unlock(h->mutex);
+	} else {
+	    has_lock = PJ_TRUE;
+	}
 
 	/* Call callback. */
         if (h->cb.on_connect_complete && !IS_CLOSING(h))
 	    (*h->cb.on_connect_complete)(h, bytes_transfered);
 
+	/* Unlock if we still hold the lock */
+	if (has_lock) {
+	    pj_mutex_unlock(h->mutex);
+	}
+
         /* Done. */
 
     } else 
@@ -317,6 +337,7 @@
             write_op->written == (pj_ssize_t)write_op->size ||
             h->fd_type == pj_SOCK_DGRAM()) 
         {
+	    pj_bool_t has_lock;
 
 	    write_op->op = PJ_IOQUEUE_OP_NONE;
 
@@ -330,8 +351,18 @@
 
             }
 
-            /* No need to hold mutex anymore */
-            pj_mutex_unlock(h->mutex);
+	    /* Unlock; from this point we don't need to hold key's mutex
+	     * (unless concurrency is disabled, which in this case we should
+	     * hold the mutex while calling the callback) */
+	    if (h->allow_concurrent) {
+		/* concurrency may be changed while we're in the callback, so
+		 * save it to a flag.
+		 */
+		has_lock = PJ_FALSE;
+		pj_mutex_unlock(h->mutex);
+	    } else {
+		has_lock = PJ_TRUE;
+	    }
 
 	    /* Call callback. */
             if (h->cb.on_write_complete && !IS_CLOSING(h)) {
@@ -340,6 +371,10 @@
                                            write_op->written);
             }
 
+	    if (has_lock) {
+		pj_mutex_unlock(h->mutex);
+	    }
+
         } else {
             pj_mutex_unlock(h->mutex);
         }
@@ -371,6 +406,7 @@
     if (!pj_list_empty(&h->accept_list)) {
 
         struct accept_operation *accept_op;
+	pj_bool_t has_lock;
 	
         /* Get one accept operation from the list. */
 	accept_op = h->accept_list.next;
@@ -389,8 +425,18 @@
 				     accept_op->addrlen);
 	}
 
-        /* Unlock; from this point we don't need to hold key's mutex. */
-        pj_mutex_unlock(h->mutex);
+	/* Unlock; from this point we don't need to hold key's mutex
+	 * (unless concurrency is disabled, which in this case we should
+	 * hold the mutex while calling the callback) */
+	if (h->allow_concurrent) {
+	    /* concurrency may be changed while we're in the callback, so
+	     * save it to a flag.
+	     */
+	    has_lock = PJ_FALSE;
+	    pj_mutex_unlock(h->mutex);
+	} else {
+	    has_lock = PJ_TRUE;
+	}
 
 	/* Call callback. */
         if (h->cb.on_accept_complete && !IS_CLOSING(h)) {
@@ -399,12 +445,16 @@
                                         *accept_op->accept_fd, rc);
 	}
 
+	if (has_lock) {
+	    pj_mutex_unlock(h->mutex);
+	}
     }
     else
 #   endif
     if (key_has_pending_read(h)) {
         struct read_operation *read_op;
         pj_ssize_t bytes_read;
+	pj_bool_t has_lock;
 
         /* Get one pending read operation from the list. */
         read_op = h->read_list.next;
@@ -479,8 +529,18 @@
             bytes_read = -rc;
 	}
 
-        /* Unlock; from this point we don't need to hold key's mutex. */
-        pj_mutex_unlock(h->mutex);
+	/* Unlock; from this point we don't need to hold key's mutex
+	 * (unless concurrency is disabled, which in this case we should
+	 * hold the mutex while calling the callback) */
+	if (h->allow_concurrent) {
+	    /* concurrency may be changed while we're in the callback, so
+	     * save it to a flag.
+	     */
+	    has_lock = PJ_FALSE;
+	    pj_mutex_unlock(h->mutex);
+	} else {
+	    has_lock = PJ_TRUE;
+	}
 
 	/* Call callback. */
         if (h->cb.on_read_complete && !IS_CLOSING(h)) {
@@ -489,6 +549,10 @@
                                       bytes_read);
         }
 
+	if (has_lock) {
+	    pj_mutex_unlock(h->mutex);
+	}
+
     } else {
         /*
          * This is normal; execution may fall here when multiple threads
@@ -503,6 +567,8 @@
 void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, 
                                        pj_ioqueue_key_t *h )
 {
+    pj_bool_t has_lock;
+
     pj_mutex_lock(h->mutex);
 
     if (!h->connecting) {
@@ -525,7 +591,18 @@
     ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
     ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);
 
-    pj_mutex_unlock(h->mutex);
+    /* Unlock; from this point we don't need to hold key's mutex
+     * (unless concurrency is disabled, which in this case we should
+     * hold the mutex while calling the callback) */
+    if (h->allow_concurrent) {
+	/* concurrency may be changed while we're in the callback, so
+	 * save it to a flag.
+	 */
+	has_lock = PJ_FALSE;
+	pj_mutex_unlock(h->mutex);
+    } else {
+	has_lock = PJ_TRUE;
+    }
 
     /* Call callback. */
     if (h->cb.on_connect_complete && !IS_CLOSING(h)) {
@@ -542,6 +619,10 @@
 
 	(*h->cb.on_connect_complete)(h, status);
     }
+
+    if (has_lock) {
+	pj_mutex_unlock(h->mutex);
+    }
 }
 
 /*
@@ -1096,3 +1177,36 @@
     return PJ_EINVALIDOP;
 }
 
+PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency( pj_ioqueue_t *ioqueue,
+							pj_bool_t allow)
+{
+    PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
+    ioqueue->default_concurrency = allow;
+    return PJ_SUCCESS;
+}
+
+
+PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
+					       pj_bool_t allow)
+{
+    PJ_ASSERT_RETURN(key, PJ_EINVAL);
+
+    /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
+     * disabled.
+     */
+    PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);
+
+    key->allow_concurrent = allow;
+    return PJ_SUCCESS;
+}
+
+PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
+{
+    return pj_mutex_lock(key->mutex);
+}
+
+PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
+{
+    return pj_mutex_unlock(key->mutex);
+}
+
diff --git a/pjlib/src/pj/ioqueue_common_abs.h b/pjlib/src/pj/ioqueue_common_abs.h
index 194f884..4d35632 100644
--- a/pjlib/src/pj/ioqueue_common_abs.h
+++ b/pjlib/src/pj/ioqueue_common_abs.h
@@ -103,6 +103,7 @@
     pj_mutex_t             *mutex;                  \
     pj_bool_t		    inside_callback;	    \
     pj_bool_t		    destroy_requested;	    \
+    pj_bool_t		    allow_concurrent;	    \
     pj_sock_t		    fd;                     \
     int                     fd_type;                \
     void		   *user_data;              \
@@ -116,7 +117,8 @@
 
 #define DECLARE_COMMON_IOQUEUE                      \
     pj_lock_t          *lock;                       \
-    pj_bool_t           auto_delete_lock;
+    pj_bool_t           auto_delete_lock;	    \
+    pj_bool_t		default_concurrency;
 
 
 enum ioqueue_event_type
diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c
index 3e8253e..3c6b9fb 100644
--- a/pjlib/src/pj/ioqueue_winnt.c
+++ b/pjlib/src/pj/ioqueue_winnt.c
@@ -114,6 +114,7 @@
     void	       *user_data;
     enum handle_type    hnd_type;
     pj_ioqueue_callback	cb;
+    pj_bool_t		allow_concurrent;
 
 #if PJ_HAS_TCP
     int			connecting;
@@ -123,6 +124,7 @@
     pj_atomic_t	       *ref_count;
     pj_bool_t		closing;
     pj_time_val		free_time;
+    pj_mutex_t	       *mutex;
 #endif
 
 };
@@ -135,6 +137,7 @@
     HANDLE	      iocp;
     pj_lock_t        *lock;
     pj_bool_t         auto_delete_lock;
+    pj_bool_t	      default_concurrency;
 
 #if PJ_IOQUEUE_HAS_SAFE_UNREG
     pj_ioqueue_key_t  active_list;
@@ -153,6 +156,12 @@
 };
 
 
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+/* Prototype */
+static void scan_closing_keys(pj_ioqueue_t *ioqueue);
+#endif
+
+
 #if PJ_HAS_TCP
 /*
  * Process the socket when the overlapped accept() completed.
@@ -315,13 +324,14 @@
 	return PJ_RETURN_OS_ERROR(GetLastError());
 
     /* Create IOCP mutex */
-    rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock);
+    rc = pj_lock_create_recursive_mutex(pool, NULL, &ioqueue->lock);
     if (rc != PJ_SUCCESS) {
 	CloseHandle(ioqueue->iocp);
 	return rc;
     }
 
     ioqueue->auto_delete_lock = PJ_TRUE;
+    ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY;
 
 #if PJ_IOQUEUE_HAS_SAFE_UNREG
     /*
@@ -344,6 +354,20 @@
 	    key = ioqueue->free_list.next;
 	    while (key != &ioqueue->free_list) {
 		pj_atomic_destroy(key->ref_count);
+		pj_mutex_destroy(key->mutex);
+		key = key->next;
+	    }
+	    CloseHandle(ioqueue->iocp);
+	    return rc;
+	}
+
+	rc = pj_mutex_create_recursive(pool, "ioqkey", &key->mutex);
+	if (rc != PJ_SUCCESS) {
+	    pj_atomic_destroy(key->ref_count);
+	    key = ioqueue->free_list.next;
+	    while (key != &ioqueue->free_list) {
+		pj_atomic_destroy(key->ref_count);
+		pj_mutex_destroy(key->mutex);
 		key = key->next;
 	    }
 	    CloseHandle(ioqueue->iocp);
@@ -351,7 +375,6 @@
 	}
 
 	pj_list_push_back(&ioqueue->free_list, key);
-
     }
 #endif
 
@@ -392,18 +415,21 @@
     key = ioqueue->active_list.next;
     while (key != &ioqueue->active_list) {
 	pj_atomic_destroy(key->ref_count);
+	pj_mutex_destroy(key->mutex);
 	key = key->next;
     }
 
     key = ioqueue->closing_list.next;
     while (key != &ioqueue->closing_list) {
 	pj_atomic_destroy(key->ref_count);
+	pj_mutex_destroy(key->mutex);
 	key = key->next;
     }
 
     key = ioqueue->free_list.next;
     while (key != &ioqueue->free_list) {
 	pj_atomic_destroy(key->ref_count);
+	pj_mutex_destroy(key->mutex);
 	key = key->next;
     }
 #endif
@@ -414,6 +440,15 @@
     return PJ_SUCCESS;
 }
 
+
+PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency(pj_ioqueue_t *ioqueue,
+						       pj_bool_t allow)
+{
+    PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
+    ioqueue->default_concurrency = allow;
+    return PJ_SUCCESS;
+}
+
 /*
  * pj_ioqueue_set_lock()
  */
@@ -453,6 +488,11 @@
     pj_lock_acquire(ioqueue->lock);
 
 #if PJ_IOQUEUE_HAS_SAFE_UNREG
+    /* Scan closing list first to release unused keys.
+     * Must do this with lock acquired.
+     */
+    scan_closing_keys(ioqueue);
+
     /* If safe unregistration is used, then get the key record from
      * the free list.
      */
@@ -481,6 +521,13 @@
     rec->user_data = user_data;
     pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback));
 
+    /* Set concurrency for this handle */
+    rc = pj_ioqueue_set_concurrency(rec, ioqueue->default_concurrency);
+    if (rc != PJ_SUCCESS) {
+	pj_lock_release(ioqueue->lock);
+	return rc;
+    }
+
 #if PJ_HAS_TCP
     rec->connecting = 0;
 #endif
@@ -585,6 +632,8 @@
      * - zero and pOv!=NULL if event for failed I/O was dequeued.
      */
     if (pOv) {
+	pj_bool_t has_lock;
+
 	/* Event was dequeued for either successfull or failed I/O */
 	key = (pj_ioqueue_key_t*)dwKey;
 	size_status = dwBytesTransfered;
@@ -600,10 +649,30 @@
 	if (key->closing)
 	    return PJ_TRUE;
 
+	/* If concurrency is disabled, lock the key 
+	 * (and save the lock status to local var since app may change
+	 * concurrency setting while in the callback) */
+	if (key->allow_concurrent == PJ_FALSE) {
+	    pj_mutex_lock(key->mutex);
+	    has_lock = PJ_TRUE;
+	} else {
+	    has_lock = PJ_FALSE;
+	}
+
+	/* Now that we get the lock, check again that key is not closing */
+	if (key->closing) {
+	    if (has_lock) {
+		pj_mutex_unlock(key->mutex);
+	    }
+	    return PJ_TRUE;
+	}
+
 	/* Increment reference counter to prevent this key from being
 	 * deleted
 	 */
 	pj_atomic_inc(key->ref_count);
+#else
+	PJ_UNUSED_ARG(has_lock);
 #endif
 
 	/* Carry out the callback */
@@ -654,6 +723,8 @@
 
 #if PJ_IOQUEUE_HAS_SAFE_UNREG
 	decrement_counter(key);
+	if (has_lock)
+	    pj_mutex_unlock(key->mutex);
 #endif
 
 	return PJ_TRUE;
@@ -669,6 +740,7 @@
 PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
 {
     unsigned i;
+    pj_bool_t has_lock;
     enum { RETRY = 10 };
 
     PJ_ASSERT_RETURN(key, PJ_EINVAL);
@@ -696,6 +768,18 @@
 #if PJ_IOQUEUE_HAS_SAFE_UNREG
     /* Mark key as closing before closing handle. */
     key->closing = 1;
+
+    /* If concurrency is disabled, wait until the key has finished
+     * processing the callback
+     */
+    if (key->allow_concurrent == PJ_FALSE) {
+	pj_mutex_lock(key->mutex);
+	has_lock = PJ_TRUE;
+    } else {
+	has_lock = PJ_FALSE;
+    }
+#else
+    PJ_UNUSED_ARG(has_lock);
 #endif
     
     /* Close handle (the only way to disassociate handle from IOCP). 
@@ -717,6 +801,11 @@
      *
      * Forcing context switch seems to have fixed that, but this is quite
      * an ugly solution..
+     *
+     * Update 2008/02/13:
+     *	This should not happen if concurrency is disallowed for the key.
+     *  So at least application has a solution for this (i.e. by disallowing
+     *  concurrency in the key).
      */
     //This will loop forever if unregistration is done on the callback.
     //Doing this with RETRY I think should solve the IOCP setting the 
@@ -728,11 +817,45 @@
 
     /* Decrement reference counter to destroy the key. */
     decrement_counter(key);
+
+    if (has_lock)
+	pj_mutex_unlock(key->mutex);
 #endif
 
     return PJ_SUCCESS;
 }
 
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+/* Scan the closing list, and put pending closing keys to free list.
+ * Must do this with ioqueue mutex held.
+ */
+static void scan_closing_keys(pj_ioqueue_t *ioqueue)
+{
+    if (!pj_list_empty(&ioqueue->closing_list)) {
+	pj_time_val now;
+	pj_ioqueue_key_t *key;
+
+	pj_gettimeofday(&now);
+	
+	/* Move closing keys to free list when they've finished the closing
+	 * idle time.
+	 */
+	key = ioqueue->closing_list.next;
+	while (key != &ioqueue->closing_list) {
+	    pj_ioqueue_key_t *next = key->next;
+
+	    pj_assert(key->closing != 0);
+
+	    if (PJ_TIME_VAL_GTE(now, key->free_time)) {
+		pj_list_erase(key);
+		pj_list_push_back(&ioqueue->free_list, key);
+	    }
+	    key = next;
+	}
+    }
+}
+#endif
+
 /*
  * pj_ioqueue_poll()
  *
@@ -766,32 +889,10 @@
 #if PJ_IOQUEUE_HAS_SAFE_UNREG
     /* Check the closing keys only when there's no activity and when there are
      * pending closing keys.
-     * blp:
-     *	no, always check the list. Otherwise on busy activity, this will cause
-     *  ioqueue to reject new registration.
      */
-    if (/*event_count == 0 &&*/ !pj_list_empty(&ioqueue->closing_list)) {
-	pj_time_val now;
-	pj_ioqueue_key_t *key;
-
-	pj_gettimeofday(&now);
-	
-	/* Move closing keys to free list when they've finished the closing
-	 * idle time.
-	 */
+    if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) {
 	pj_lock_acquire(ioqueue->lock);
-	key = ioqueue->closing_list.next;
-	while (key != &ioqueue->closing_list) {
-	    pj_ioqueue_key_t *next = key->next;
-
-	    pj_assert(key->closing != 0);
-
-	    if (PJ_TIME_VAL_GTE(now, key->free_time)) {
-		pj_list_erase(key);
-		pj_list_push_back(&ioqueue->free_list, key);
-	    }
-	    key = next;
-	}
+	scan_closing_keys(ioqueue);
 	pj_lock_release(ioqueue->lock);
     }
 #endif
@@ -1268,3 +1369,35 @@
     return PJ_SUCCESS;
 }
 
+PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
+					       pj_bool_t allow)
+{
+    PJ_ASSERT_RETURN(key, PJ_EINVAL);
+
+    /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
+     * disabled.
+     */
+    PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);
+
+    key->allow_concurrent = allow;
+    return PJ_SUCCESS;
+}
+
+PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
+{
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+    return pj_mutex_lock(key->mutex);
+#else
+    PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP);
+#endif
+}
+
+PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
+{
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+    return pj_mutex_unlock(key->mutex);
+#else
+    PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP);
+#endif
+}
+