Fixed bug in ioqueue with IO Completion Port backend, where events may still be called after key is unregistered

git-svn-id: https://svn.pjsip.org/repos/pjproject/trunk@349 74dad513-b988-da41-8d7b-12977e46ad98
diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c
index 79e4618..40dd31b 100644
--- a/pjlib/src/pj/ioqueue_winnt.c
+++ b/pjlib/src/pj/ioqueue_winnt.c
@@ -99,6 +99,8 @@
     HND_IS_SOCKET,
 };
 
+enum { POST_QUIT_LEN = 0xFFFFDEADUL };
+
 /*
  * Structure for individual socket.
  */
@@ -112,6 +114,7 @@
     int			connecting;
 #endif
     pj_ioqueue_callback	cb;
+    pj_bool_t		has_quit_signal;
 };
 
 /*
@@ -392,37 +395,6 @@
     return PJ_SUCCESS;
 }
 
-/*
- * pj_ioqueue_unregister()
- */
-PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
-{
-    PJ_ASSERT_RETURN(key, PJ_EINVAL);
-
-#if PJ_HAS_TCP
-    if (key->connecting) {
-	unsigned pos;
-        pj_ioqueue_t *ioqueue;
-
-        ioqueue = key->ioqueue;
-
-	/* Erase from connecting_handles */
-	pj_lock_acquire(ioqueue->lock);
-	for (pos=0; pos < ioqueue->connecting_count; ++pos) {
-	    if (ioqueue->connecting_keys[pos] == key) {
-		erase_connecting_socket(ioqueue, pos);
-		break;
-	    }
-	}
-	key->connecting = 0;
-	pj_lock_release(ioqueue->lock);
-    }
-#endif
-    if (key->hnd_type == HND_IS_FILE) {
-        CloseHandle(key->hnd);
-    }
-    return PJ_SUCCESS;
-}
 
 /*
  * pj_ioqueue_get_user_data()
@@ -449,34 +421,25 @@
     return PJ_SUCCESS;
 }
 
+
+
 /*
- * pj_ioqueue_poll()
- *
- * Poll for events.
+ * Internal function to poll the I/O Completion Port, execute callback, 
+ * and return the key and bytes transfered of the last operation.
  */
-PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
+static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, 
+			    pj_ssize_t *p_bytes, pj_ioqueue_key_t **p_key )
 {
-    DWORD dwMsec, dwBytesTransfered, dwKey;
+    DWORD dwBytesTransfered, dwKey;
     generic_overlapped *pOv;
     pj_ioqueue_key_t *key;
-    int connect_count;
     pj_ssize_t size_status = -1;
-    BOOL rcGetQueued;;
-
-    PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
-
-    /* Check the connecting array. */
-#if PJ_HAS_TCP
-    connect_count = check_connecting(ioqueue);
-#endif
-
-    /* Calculate miliseconds timeout for GetQueuedCompletionStatus */
-    dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE;
+    BOOL rcGetQueued;
 
     /* Poll for completion status. */
-    rcGetQueued = GetQueuedCompletionStatus(ioqueue->iocp, &dwBytesTransfered,
+    rcGetQueued = GetQueuedCompletionStatus(hIocp, &dwBytesTransfered,
 					    &dwKey, (OVERLAPPED**)&pOv, 
-					    dwMsec);
+					    dwTimeout);
 
     /* The return value is:
      * - nonzero if event was dequeued.
@@ -487,6 +450,25 @@
 	/* Event was dequeued for either successfull or failed I/O */
 	key = (pj_ioqueue_key_t*)dwKey;
 	size_status = dwBytesTransfered;
+
+	/* Report to caller regardless */
+	if (p_bytes)
+	    *p_bytes = size_status;
+	if (p_key)
+	    *p_key = key;
+
+	/* If size_status is POST_QUIT_LEN, mark the key as quitting */
+	if (size_status == POST_QUIT_LEN) {
+	    key->has_quit_signal = 1;
+	    return PJ_TRUE;
+	}
+
+	/* We shouldn't call callbacks if key is quitting. 
+	 * But this should have been taken care by unregister function
+	 * (the unregister function should have cleared out the callbacks)
+	 */
+
+	/* Carry out the callback */
 	switch (pOv->operation) {
 	case PJ_IOQUEUE_OP_READ:
 	case PJ_IOQUEUE_OP_RECV:
@@ -522,11 +504,121 @@
 	    pj_assert(0);
 	    break;
 	}
-	return connect_count+1;
+	return PJ_TRUE;
     }
 
     /* No event was queued. */
-    return connect_count;
+    return PJ_FALSE;
+}
+
+/*
+ * pj_ioqueue_unregister()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
+{
+    pj_ssize_t polled_len;
+    pj_ioqueue_key_t *polled_key;
+    generic_overlapped ov;
+    BOOL rc;
+
+    PJ_ASSERT_RETURN(key, PJ_EINVAL);
+
+#if PJ_HAS_TCP
+    if (key->connecting) {
+	unsigned pos;
+        pj_ioqueue_t *ioqueue;
+
+        ioqueue = key->ioqueue;
+
+	/* Erase from connecting_handles */
+	pj_lock_acquire(ioqueue->lock);
+	for (pos=0; pos < ioqueue->connecting_count; ++pos) {
+	    if (ioqueue->connecting_keys[pos] == key) {
+		erase_connecting_socket(ioqueue, pos);
+		break;
+	    }
+	}
+	key->connecting = 0;
+	pj_lock_release(ioqueue->lock);
+    }
+#endif
+
+
+    /* Unregistering handle from IOCP is pretty tricky.
+     *
+     * Even after the socket has been closed, GetQueuedCompletionStatus
+     * may still return events for the handle. This will likely to
+     * cause crash in pjlib, because the key associated with the handle
+     * most likely will have been destroyed.
+     *
+     * The solution is to poll the IOCP until we're sure that there are
+     * no further events for the handle.
+     */
+
+    /* Clear up callbacks for the key. 
+     * We don't want the callback to be called for this key.
+     */
+    key->cb.on_read_complete = NULL;
+    key->cb.on_write_complete = NULL;
+    key->cb.on_accept_complete = NULL;
+    key->cb.on_connect_complete = NULL;
+
+    /* Init overlapped struct */
+    pj_memset(&ov, 0, sizeof(ov));
+    ov.operation = PJ_IOQUEUE_OP_READ;
+
+    /* Post queued completion status with a special length. */
+    rc = PostQueuedCompletionStatus( key->ioqueue->iocp, (DWORD)POST_QUIT_LEN,
+				     (DWORD)key, &ov.overlapped);
+
+    /* Poll IOCP until has_quit_signal is set in the key.
+     * The has_quit_signal flag is set in poll_iocp() when POST_QUIT_LEN
+     * is detected. We need to have this flag because POST_QUIT_LEN may be
+     * detected by other threads.
+     */
+    do {
+	polled_len = 0;
+	polled_key = NULL;
+
+	rc = poll_iocp(key->ioqueue->iocp, 0, &polled_len, &polled_key);
+
+    } while (rc && !key->has_quit_signal);
+
+
+    /* Close handle if this is a file. */
+    if (key->hnd_type == HND_IS_FILE) {
+        CloseHandle(key->hnd);
+    }
+
+    return PJ_SUCCESS;
+}
+
+/*
+ * pj_ioqueue_poll()
+ *
+ * Poll for events.
+ */
+PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
+{
+    DWORD dwMsec;
+    int connect_count = 0;
+    pj_bool_t has_event;
+
+    PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
+
+    /* Check the connecting array. */
+#if PJ_HAS_TCP
+    connect_count = check_connecting(ioqueue);
+#endif
+
+    /* Calculate miliseconds timeout for GetQueuedCompletionStatus */
+    dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE;
+
+    /* Poll for completion status. */
+    has_event = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL);
+
+    /* Return number of events. */
+    return connect_count + has_event;
 }
 
 /*