Fixed bug in the ioqueue I/O Completion Port backend where callbacks could still be called after a key had been unregistered

git-svn-id: https://svn.pjsip.org/repos/pjproject/trunk@349 74dad513-b988-da41-8d7b-12977e46ad98
diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c
index 79e4618..40dd31b 100644
--- a/pjlib/src/pj/ioqueue_winnt.c
+++ b/pjlib/src/pj/ioqueue_winnt.c
@@ -99,6 +99,8 @@
     HND_IS_SOCKET,
 };
 
+#define POST_QUIT_LEN ((DWORD)0xFFFFDEADUL) /* DWORD: value exceeds INT_MAX */
+
 /*
  * Structure for individual socket.
  */
@@ -112,6 +114,7 @@
     int			connecting;
 #endif
     pj_ioqueue_callback	cb;
+    pj_bool_t		has_quit_signal;
 };
 
 /*
@@ -392,37 +395,6 @@
     return PJ_SUCCESS;
 }
 
-/*
- * pj_ioqueue_unregister()
- */
-PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
-{
-    PJ_ASSERT_RETURN(key, PJ_EINVAL);
-
-#if PJ_HAS_TCP
-    if (key->connecting) {
-	unsigned pos;
-        pj_ioqueue_t *ioqueue;
-
-        ioqueue = key->ioqueue;
-
-	/* Erase from connecting_handles */
-	pj_lock_acquire(ioqueue->lock);
-	for (pos=0; pos < ioqueue->connecting_count; ++pos) {
-	    if (ioqueue->connecting_keys[pos] == key) {
-		erase_connecting_socket(ioqueue, pos);
-		break;
-	    }
-	}
-	key->connecting = 0;
-	pj_lock_release(ioqueue->lock);
-    }
-#endif
-    if (key->hnd_type == HND_IS_FILE) {
-        CloseHandle(key->hnd);
-    }
-    return PJ_SUCCESS;
-}
 
 /*
  * pj_ioqueue_get_user_data()
@@ -449,34 +421,25 @@
     return PJ_SUCCESS;
 }
 
+
+
 /*
- * pj_ioqueue_poll()
- *
- * Poll for events.
+ * Internal function: poll the I/O Completion Port once, run the callback
+ * for a dequeued event, and report its key and bytes transferred.
  */
-PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
+static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, 
+			    pj_ssize_t *p_bytes, pj_ioqueue_key_t **p_key )
 {
-    DWORD dwMsec, dwBytesTransfered, dwKey;
+    DWORD dwBytesTransfered, dwKey;
     generic_overlapped *pOv;
     pj_ioqueue_key_t *key;
-    int connect_count;
     pj_ssize_t size_status = -1;
-    BOOL rcGetQueued;;
-
-    PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
-
-    /* Check the connecting array. */
-#if PJ_HAS_TCP
-    connect_count = check_connecting(ioqueue);
-#endif
-
-    /* Calculate miliseconds timeout for GetQueuedCompletionStatus */
-    dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE;
+    BOOL rcGetQueued;
 
     /* Poll for completion status. */
-    rcGetQueued = GetQueuedCompletionStatus(ioqueue->iocp, &dwBytesTransfered,
+    rcGetQueued = GetQueuedCompletionStatus(hIocp, &dwBytesTransfered,
 					    &dwKey, (OVERLAPPED**)&pOv, 
-					    dwMsec);
+					    dwTimeout);
 
     /* The return value is:
      * - nonzero if event was dequeued.
@@ -487,6 +450,25 @@
 	/* Event was dequeued for either successfull or failed I/O */
 	key = (pj_ioqueue_key_t*)dwKey;
 	size_status = dwBytesTransfered;
+
+	/* Report to caller regardless */
+	if (p_bytes)
+	    *p_bytes = size_status;
+	if (p_key)
+	    *p_key = key;
+
+	/* If size_status is POST_QUIT_LEN, mark the key as quitting */
+	if (size_status == POST_QUIT_LEN) {
+	    key->has_quit_signal = 1;
+	    return PJ_TRUE;
+	}
+
+	/* We shouldn't call callbacks if key is quitting. 
+	 * But this should have been taken care by unregister function
+	 * (the unregister function should have cleared out the callbacks)
+	 */
+
+	/* Carry out the callback */
 	switch (pOv->operation) {
 	case PJ_IOQUEUE_OP_READ:
 	case PJ_IOQUEUE_OP_RECV:
@@ -522,11 +504,121 @@
 	    pj_assert(0);
 	    break;
 	}
-	return connect_count+1;
+	return PJ_TRUE;
     }
 
     /* No event was queued. */
-    return connect_count;
+    return PJ_FALSE;
+}
+
+/*
+ * pj_ioqueue_unregister()
+ *
+ * Detach the key so that no further callbacks are invoked for it, and
+ * close the handle if it is a file.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
+{
+    generic_overlapped ov;
+    BOOL posted;
+
+    PJ_ASSERT_RETURN(key, PJ_EINVAL);
+
+#if PJ_HAS_TCP
+    if (key->connecting) {
+	unsigned pos;
+        pj_ioqueue_t *ioqueue;
+
+        ioqueue = key->ioqueue;
+
+	/* Erase from connecting_handles */
+	pj_lock_acquire(ioqueue->lock);
+	for (pos=0; pos < ioqueue->connecting_count; ++pos) {
+	    if (ioqueue->connecting_keys[pos] == key) {
+		erase_connecting_socket(ioqueue, pos);
+		break;
+	    }
+	}
+	key->connecting = 0;
+	pj_lock_release(ioqueue->lock);
+    }
+#endif
+
+    /* Even after the handle is closed, GetQueuedCompletionStatus() may
+     * still return events for it, possibly after the key is destroyed.
+     * So: clear the callbacks (poll_iocp() relies on this to skip late
+     * events), post a marker packet (POST_QUIT_LEN) for this key, and
+     * poll the IOCP until some thread has dequeued that marker.
+     */
+    key->cb.on_read_complete = NULL;
+    key->cb.on_write_complete = NULL;
+    key->cb.on_accept_complete = NULL;
+    key->cb.on_connect_complete = NULL;
+
+    /* Reset the flag explicitly; do not rely on registration zeroing it. */
+    key->has_quit_signal = 0;
+
+    /* ov lives on this stack frame: poll_iocp() handles POST_QUIT_LEN
+     * before it reads pOv->operation. A valid operation is set anyway.
+     */
+    pj_memset(&ov, 0, sizeof(ov));
+    ov.operation = PJ_IOQUEUE_OP_READ;
+
+    /* ULONG_PTR, not DWORD: a DWORD cast truncates the pointer on Win64. */
+    posted = PostQueuedCompletionStatus(key->ioqueue->iocp,
+					(DWORD)POST_QUIT_LEN,
+					(ULONG_PTR)key, &ov.overlapped);
+
+    if (posted) {
+	/* Wait for the marker even if the queue looks empty: another
+	 * thread may have dequeued it and be about to set has_quit_signal
+	 * on this key; returning now could let the caller free the key
+	 * first. A short timeout (not 0) avoids a hot spin meanwhile.
+	 */
+	while (!key->has_quit_signal) {
+	    poll_iocp(key->ioqueue->iocp, 10, NULL, NULL);
+	}
+    } else {
+	/* Marker could not be posted: best effort, drain what is queued. */
+	while (poll_iocp(key->ioqueue->iocp, 0, NULL, NULL)) {
+	    /* keep draining */
+	}
+    }
+
+    /* Close handle if this is a file. */
+    if (key->hnd_type == HND_IS_FILE) {
+        CloseHandle(key->hnd);
+    }
+
+    return PJ_SUCCESS;
+}
+
+/*
+ * pj_ioqueue_poll()
+ *
+ * Run one polling round: check pending TCP connects (when TCP is
+ * enabled), then wait up to `timeout` (forever if NULL) for a single
+ * IOCP completion and dispatch it. Returns the number of events handled.
+ */
+PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
+{
+    int n_events = 0;
+    DWORD dwWaitMsec = INFINITE;
+
+    PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
+
+#if PJ_HAS_TCP
+    n_events = check_connecting(ioqueue);
+#endif
+
+    /* GetQueuedCompletionStatus() takes the wait time in milliseconds. */
+    if (timeout != NULL)
+	dwWaitMsec = timeout->sec*1000 + timeout->msec;
+
+    /* At most one completion is dequeued and dispatched per call. */
+    n_events += poll_iocp(ioqueue->iocp, dwWaitMsec, NULL, NULL);
+
+    return n_events;
 }
 
 /*
diff --git a/pjlib/src/pjlib-test/ioq_udp.c b/pjlib/src/pjlib-test/ioq_udp.c
index 5f933af..b81764f 100644
--- a/pjlib/src/pjlib-test/ioq_udp.c
+++ b/pjlib/src/pjlib-test/ioq_udp.c
@@ -313,6 +313,173 @@
 
 }
 
+
+static void on_read_complete(pj_ioqueue_key_t *key,
+			     pj_ioqueue_op_key_t *op_key,
+			     pj_ssize_t bytes_read)
+{
+    /* Counts completed reads; the key's user data points to the counter. */
+    unsigned *counter = pj_ioqueue_get_user_data(key);
+
+    PJ_UNUSED_ARG(op_key);
+    PJ_UNUSED_ARG(bytes_read);
+    *counter += 1;
+}
+
+/*
+ * unregister_test()
+ * Check if callback is still called after socket has been unregistered or
+ * closed. Every exit after pool creation goes through on_return, so the
+ * sockets (fixed ports), ioqueue and pool are released on failure too.
+ */
+static int unregister_test(void)
+{
+    enum { RPORT = 50000, SPORT = 50001 };
+    pj_pool_t *pool;
+    pj_ioqueue_t *ioqueue = NULL;
+    pj_sock_t ssock = PJ_INVALID_SOCKET, rsock = PJ_INVALID_SOCKET;
+    int addrlen;
+    pj_sockaddr_in addr;
+    pj_ioqueue_key_t *key = NULL;
+    pj_ioqueue_op_key_t opkey;
+    pj_ioqueue_callback cb;
+    unsigned packet_cnt;
+    char sendbuf[10], recvbuf[10];
+    pj_ssize_t bytes;
+    pj_time_val timeout;
+    pj_status_t status;
+    int rc = 0;
+
+    pool = pj_pool_create(mem, "test", 4000, 4000, NULL);
+    if (!pool) {
+	app_perror("Unable to create pool", PJ_ENOMEM);
+	return -100;
+    }
+
+    status = pj_ioqueue_create(pool, 16, &ioqueue);
+    if (status != PJ_SUCCESS) {
+	app_perror("Error creating ioqueue", status);
+	rc = -110; goto on_return;
+    }
+
+    /* Create sender socket */
+    status = app_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, SPORT, &ssock);
+    if (status != PJ_SUCCESS) {
+	app_perror("Error initializing socket", status);
+	rc = -120; goto on_return;
+    }
+
+    /* Create receiver socket. */
+    status = app_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, RPORT, &rsock);
+    if (status != PJ_SUCCESS) {
+	app_perror("Error initializing socket", status);
+	rc = -130; goto on_return;
+    }
+
+    /* Register rsock to ioqueue. */
+    pj_memset(&cb, 0, sizeof(cb));
+    cb.on_read_complete = &on_read_complete;
+    packet_cnt = 0;
+    status = pj_ioqueue_register_sock(pool, ioqueue, rsock, &packet_cnt,
+				      &cb, &key);
+    if (status != PJ_SUCCESS) {
+	app_perror("Error registering to ioqueue", status);
+	rc = -140; goto on_return;
+    }
+
+    /* Init operation key and start reading. */
+    pj_ioqueue_op_key_init(&opkey, sizeof(opkey));
+    bytes = sizeof(recvbuf);
+    status = pj_ioqueue_recv( key, &opkey, recvbuf, &bytes, 0);
+    if (status != PJ_EPENDING) {
+	app_perror("Expecting PJ_EPENDING, but got this", status);
+	rc = -150; goto on_return;
+    }
+
+    /* Init destination address. */
+    addrlen = sizeof(addr);
+    status = pj_sock_getsockname(rsock, &addr, &addrlen);
+    if (status != PJ_SUCCESS) {
+	app_perror("getsockname error", status);
+	rc = -160; goto on_return;
+    }
+
+    /* Override address with 127.0.0.1, since getsockname will return
+     * zero in the address field. */
+    addr.sin_addr = pj_inet_addr2("127.0.0.1");
+
+    /* Send one packet. */
+    pj_ansi_strcpy(sendbuf, "Hello0123");
+    bytes = sizeof(sendbuf);
+    status = pj_sock_sendto(ssock, sendbuf, &bytes, 0,
+			    &addr, sizeof(addr));
+    if (status != PJ_SUCCESS) {
+	app_perror("sendto error", status);
+	rc = -170; goto on_return;
+    }
+
+    /* Check if packet is received. */
+    timeout.sec = 1; timeout.msec = 0;
+    pj_ioqueue_poll(ioqueue, &timeout);
+    if (packet_cnt != 1) {
+	rc = -180; goto on_return;
+    }
+
+    /* Just to make sure things are settled.. */
+    pj_thread_sleep(100);
+
+    /* Start reading again. */
+    bytes = sizeof(recvbuf);
+    status = pj_ioqueue_recv( key, &opkey, recvbuf, &bytes, 0);
+    if (status != PJ_EPENDING) {
+	app_perror("Expecting PJ_EPENDING, but got this", status);
+	rc = -190; goto on_return;
+    }
+
+    /* Reset packet counter, then send one more packet. */
+    packet_cnt = 0;
+    bytes = sizeof(sendbuf);
+    status = pj_sock_sendto(ssock, sendbuf, &bytes, 0,
+			    &addr, sizeof(addr));
+    if (status != PJ_SUCCESS) {
+	app_perror("sendto error", status);
+	rc = -200; goto on_return;
+    }
+
+    /* Now unregister and close socket. key/rsock are reset so that
+     * on_return does not release them a second time.
+     */
+    pj_ioqueue_unregister(key);
+    key = NULL;
+    pj_sock_close(rsock);
+    rsock = PJ_INVALID_SOCKET;
+
+    /* Poll ioqueue. */
+    timeout.sec = 1; timeout.msec = 0;
+    pj_ioqueue_poll(ioqueue, &timeout);
+
+    /* Must NOT receive any packets after socket is closed! */
+    if (packet_cnt > 0) {
+	PJ_LOG(3,(THIS_FILE, "....error: not expecting to receive packet "
+			     "after socket has been closed"));
+	rc = -210; goto on_return;
+    }
+
+    /* Success: rc is 0. Fall through to release all resources. */
+on_return:
+    if (key)
+	pj_ioqueue_unregister(key);
+    if (rsock != PJ_INVALID_SOCKET)
+	pj_sock_close(rsock);
+    if (ssock != PJ_INVALID_SOCKET)
+	pj_sock_close(ssock);
+    if (ioqueue)
+	pj_ioqueue_destroy(ioqueue);
+    pj_pool_release(pool);
+    return rc;
+}
+
+
 /*
  * Testing with many handles.
  * This will just test registering PJ_IOQUEUE_MAX_HANDLES count
@@ -625,6 +792,13 @@
     }
     PJ_LOG(3, (THIS_FILE, "....compliance test ok"));
 
+
+    PJ_LOG(3, (THIS_FILE, "...unregister test (%s)", pj_ioqueue_name()));
+    if ((status=unregister_test()) != 0) {
+	return status;
+    }
+    PJ_LOG(3, (THIS_FILE, "....unregister test ok"));
+
     if ((status=many_handles_test()) != 0) {
 	return status;
     }
diff --git a/pjlib/src/pjlib-test/main_win32.c b/pjlib/src/pjlib-test/main_win32.c
index 07d9187..6024a95 100644
--- a/pjlib/src/pjlib-test/main_win32.c
+++ b/pjlib/src/pjlib-test/main_win32.c
@@ -55,7 +55,7 @@
     PJ_UNUSED_ARG(level);
     PJ_UNUSED_ARG(len);
     SendMessage(hwndLog, EM_REPLACESEL, FALSE, 
-		(LPARAM)PJ_STRING_TO_NATIVE(data,wdata));
+		(LPARAM)PJ_STRING_TO_NATIVE(data,wdata,256));
 }