Ticket #474: option in ioqueue to control concurrency (to allow/disallow simultaneous/multiple callback calls)

git-svn-id: https://svn.pjsip.org/repos/pjproject/trunk@1789 74dad513-b988-da41-8d7b-12977e46ad98
diff --git a/pjlib/src/pjlib-test/ioq_perf.c b/pjlib/src/pjlib-test/ioq_perf.c
index 92d4540..e7ddf1f 100644
--- a/pjlib/src/pjlib-test/ioq_perf.c
+++ b/pjlib/src/pjlib-test/ioq_perf.c
@@ -221,7 +221,8 @@
  *  - measure the total bytes received by all consumers during a
  *    period of time.
  */
-static int perform_test(int sock_type, const char *type_name,
+static int perform_test(pj_bool_t allow_concur,
+			int sock_type, const char *type_name,
                         unsigned thread_cnt, unsigned sockpair_cnt,
                         pj_size_t buffer_size, 
                         pj_size_t *p_bandwidth)
@@ -260,6 +261,12 @@
         return -15;
     }
 
+    rc = pj_ioqueue_set_default_concurrency(ioqueue, allow_concur);
+    if (rc != PJ_SUCCESS) {
+	app_perror("...error: pj_ioqueue_set_default_concurrency()", rc);
+        return -16;
+    }
+
     /* Initialize each producer-consumer pair. */
     for (i=0; i<sockpair_cnt; ++i) {
         pj_ssize_t bytes;
@@ -437,10 +444,7 @@
     return 0;
 }
 
-/*
- * main test entry.
- */
-int ioqueue_perf_test(void)
+static int ioqueue_perf_test_imp(pj_bool_t allow_concur)
 {
     enum { BUF_SIZE = 512 };
     int i, rc;
@@ -500,6 +504,7 @@
     int best_index = 0;
 
     PJ_LOG(3,(THIS_FILE, "   Benchmarking %s ioqueue:", pj_ioqueue_name()));
+    PJ_LOG(3,(THIS_FILE, "   Testing with concurency=%d", allow_concur));
     PJ_LOG(3,(THIS_FILE, "   ======================================="));
     PJ_LOG(3,(THIS_FILE, "   Type  Threads  Skt.Pairs      Bandwidth"));
     PJ_LOG(3,(THIS_FILE, "   ======================================="));
@@ -508,7 +513,8 @@
     for (i=0; i<(int)(sizeof(test_param)/sizeof(test_param[0])); ++i) {
         pj_size_t bandwidth;
 
-        rc = perform_test(test_param[i].type, 
+        rc = perform_test(allow_concur,
+			  test_param[i].type, 
                           test_param[i].type_name,
                           test_param[i].thread_cnt, 
                           test_param[i].sockpair_cnt, 
@@ -537,6 +543,24 @@
     return 0;
 }
 
+/*
+ * main test entry.
+ * Runs the benchmark implementation twice: first with ioqueue
+ * concurrency allowed (PJ_TRUE), then with it disallowed (PJ_FALSE).
+ * Returns zero when both passes succeed, otherwise the non-zero code
+ * of the first pass that failed.
+ */
+int ioqueue_perf_test(void)
+{
+    int status = ioqueue_perf_test_imp(PJ_TRUE);
+
+    /* Skip the second pass when the first one already failed. */
+    if (status == 0)
+	status = ioqueue_perf_test_imp(PJ_FALSE);
+
+    return status;
+}
+
 #else
 /* To prevent warning about "translation unit is empty"
  * when this test is disabled. 
diff --git a/pjlib/src/pjlib-test/ioq_tcp.c b/pjlib/src/pjlib-test/ioq_tcp.c
index c6e117d..106a964 100644
--- a/pjlib/src/pjlib-test/ioq_tcp.c
+++ b/pjlib/src/pjlib-test/ioq_tcp.c
@@ -232,7 +232,7 @@
 /*
  * Compliance test for success scenario.
  */
-static int compliance_test_0(void)
+static int compliance_test_0(pj_bool_t allow_concur)
 {
     pj_sock_t ssock=-1, csock0=-1, csock1=-1;
     pj_sockaddr_in addr, client_addr, rmt_addr;
@@ -292,6 +292,13 @@
 	status=-20; goto on_error;
     }
 
+    // Concurrency
+    rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur);
+    if (rc != PJ_SUCCESS) {
+        app_perror("...ERROR in pj_ioqueue_set_default_concurrency()", rc);
+	status=-21; goto on_error;
+    }
+
     // Register server socket and client socket.
     rc = pj_ioqueue_register_sock(pool, ioque, ssock, NULL, &test_cb, &skey);
     if (rc == PJ_SUCCESS)
@@ -458,7 +465,7 @@
  * Compliance test for failed scenario.
  * In this case, the client connects to a non-existant service.
  */
-static int compliance_test_1(void)
+static int compliance_test_1(pj_bool_t allow_concur)
 {
     pj_sock_t csock1=PJ_INVALID_SOCKET;
     pj_sockaddr_in addr;
@@ -479,6 +486,12 @@
 	status=-20; goto on_error;
     }
 
+    // Concurrency
+    rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur);
+    if (rc != PJ_SUCCESS) {
+	status=-21; goto on_error;
+    }
+
     // Create client socket
     rc = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0, &csock1);
     if (rc != PJ_SUCCESS) {
@@ -581,7 +594,7 @@
 /*
  * Repeated connect/accept on the same listener socket.
  */
-static int compliance_test_2(void)
+static int compliance_test_2(pj_bool_t allow_concur)
 {
 #if defined(PJ_SYMBIAN) && PJ_SYMBIAN!=0
     enum { MAX_PAIR = 1, TEST_LOOP = 2 };
@@ -648,6 +661,13 @@
     }
 
 
+    // Concurrency
+    rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur);
+    if (rc != PJ_SUCCESS) {
+        app_perror("...ERROR in pj_ioqueue_set_default_concurrency()", rc);
+	return -11;
+    }
+
     // Allocate buffers for send and receive.
     send_buf = (char*)pj_pool_alloc(pool, bufsize);
     recv_buf = (char*)pj_pool_alloc(pool, bufsize);
@@ -887,26 +907,28 @@
 }
 
 
-int tcp_ioqueue_test()
+static int tcp_ioqueue_test_impl(pj_bool_t allow_concur)
 {
     int status;
 
+    PJ_LOG(3,(THIS_FILE, "..testing with concurency=%d", allow_concur));
+
     PJ_LOG(3, (THIS_FILE, "..%s compliance test 0 (success scenario)",
 	       pj_ioqueue_name()));
-    if ((status=compliance_test_0()) != 0) {
+    if ((status=compliance_test_0(allow_concur)) != 0) {
 	PJ_LOG(1, (THIS_FILE, "....FAILED (status=%d)\n", status));
 	return status;
     }
     PJ_LOG(3, (THIS_FILE, "..%s compliance test 1 (failed scenario)",
                pj_ioqueue_name()));
-    if ((status=compliance_test_1()) != 0) {
+    if ((status=compliance_test_1(allow_concur)) != 0) {
 	PJ_LOG(1, (THIS_FILE, "....FAILED (status=%d)\n", status));
 	return status;
     }
 
     PJ_LOG(3, (THIS_FILE, "..%s compliance test 2 (repeated accept)",
                pj_ioqueue_name()));
-    if ((status=compliance_test_2()) != 0) {
+    if ((status=compliance_test_2(allow_concur)) != 0) {
 	PJ_LOG(1, (THIS_FILE, "....FAILED (status=%d)\n", status));
 	return status;
     }
@@ -914,6 +936,21 @@
     return 0;
 }
 
+/*
+ * TCP ioqueue test entry: runs the compliance tests with concurrency
+ * allowed, then again with it disallowed. Returns the first non-zero code.
+ */
+int tcp_ioqueue_test()
+{
+    int status = tcp_ioqueue_test_impl(PJ_TRUE);
+
+    /* Only run the second pass when the first one succeeded. */
+    if (status == 0)
+	status = tcp_ioqueue_test_impl(PJ_FALSE);
+
+    return status;
+}
+
 #endif	/* PJ_HAS_TCP */
 
 
diff --git a/pjlib/src/pjlib-test/ioq_udp.c b/pjlib/src/pjlib-test/ioq_udp.c
index 1bbe494..e7e1ae5 100644
--- a/pjlib/src/pjlib-test/ioq_udp.c
+++ b/pjlib/src/pjlib-test/ioq_udp.c
@@ -125,7 +125,7 @@
  * To test that the basic IOQueue functionality works. It will just exchange
  * data between two sockets.
  */ 
-static int compliance_test(void)
+static int compliance_test(pj_bool_t allow_concur)
 {
     pj_sock_t ssock=-1, csock=-1;
     pj_sockaddr_in addr, dst_addr;
@@ -178,6 +178,13 @@
 	status=-20; goto on_error;
     }
 
+    // Set concurrency
+    TRACE_("set concurrency...");
+    rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur);
+    if (rc != PJ_SUCCESS) {
+	status=-21; goto on_error;
+    }
+
     // Register server and client socket.
     // We put this after inactivity socket, hopefully this can represent the
     // worst waiting time.
@@ -351,7 +358,7 @@
  * Check if callback is still called after socket has been unregistered or 
  * closed.
  */ 
-static int unregister_test(void)
+static int unregister_test(pj_bool_t allow_concur)
 {
     enum { RPORT = 50000, SPORT = 50001 };
     pj_pool_t *pool;
@@ -381,6 +388,13 @@
 	return -110;
     }
 
+    // Set concurrency
+    TRACE_("set concurrency...");
+    status = pj_ioqueue_set_default_concurrency(ioqueue, allow_concur);
+    if (status != PJ_SUCCESS) {
+	return -112;
+    }
+
     /* Create sender socket */
     status = app_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, SPORT, &ssock);
     if (status != PJ_SUCCESS) {
@@ -512,7 +526,7 @@
  * This will just test registering PJ_IOQUEUE_MAX_HANDLES count
  * of sockets to the ioqueue.
  */
-static int many_handles_test(void)
+static int many_handles_test(pj_bool_t allow_concur)
 {
     enum { MAX = PJ_IOQUEUE_MAX_HANDLES };
     pj_pool_t *pool;
@@ -539,6 +553,12 @@
 	return -10;
     }
 
+    // Set concurrency
+    rc = pj_ioqueue_set_default_concurrency(ioqueue, allow_concur);
+    if (rc != PJ_SUCCESS) {
+	return -11;
+    }
+
     /* Register as many sockets. */
     for (count=0; count<MAX; ++count) {
 	sock[count] = PJ_INVALID_SOCKET;
@@ -600,7 +620,8 @@
 /*
  * Benchmarking IOQueue
  */
-static int bench_test(int bufsize, int inactive_sock_count)
+static int bench_test(pj_bool_t allow_concur, int bufsize, 
+		      int inactive_sock_count)
 {
     pj_sock_t ssock=-1, csock=-1;
     pj_sockaddr_in addr;
@@ -651,6 +672,13 @@
 	goto on_error;
     }
 
+    // Set concurrency
+    rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur);
+    if (rc != PJ_SUCCESS) {
+	app_perror("...error: pj_ioqueue_set_default_concurrency()", rc);
+	goto on_error;
+    }
+
     // Allocate inactive sockets, and bind them to some arbitrary address.
     // Then register them to the I/O queue, and start a read operation.
     inactive_sock = (pj_sock_t*)pj_pool_alloc(pool, 
@@ -839,27 +867,29 @@
     return -1;
 }
 
-int udp_ioqueue_test()
+static int udp_ioqueue_test_imp(pj_bool_t allow_concur)
 {
     int status;
     int bufsize, sock_count;
 
+    PJ_LOG(3,(THIS_FILE, "..testing with concurency=%d", allow_concur));
+
     //goto pass1;
 
     PJ_LOG(3, (THIS_FILE, "...compliance test (%s)", pj_ioqueue_name()));
-    if ((status=compliance_test()) != 0) {
+    if ((status=compliance_test(allow_concur)) != 0) {
 	return status;
     }
     PJ_LOG(3, (THIS_FILE, "....compliance test ok"));
 
 
     PJ_LOG(3, (THIS_FILE, "...unregister test (%s)", pj_ioqueue_name()));
-    if ((status=unregister_test()) != 0) {
+    if ((status=unregister_test(allow_concur)) != 0) {
 	return status;
     }
     PJ_LOG(3, (THIS_FILE, "....unregister test ok"));
 
-    if ((status=many_handles_test()) != 0) {
+    if ((status=many_handles_test(allow_concur)) != 0) {
 	return status;
     }
     
@@ -879,7 +909,7 @@
     //goto pass2;
 
     for (bufsize=BUF_MIN_SIZE; bufsize <= BUF_MAX_SIZE; bufsize *= 2) {
-	if ((status=bench_test(bufsize, SOCK_INACTIVE_MIN)) != 0)
+	if ((status=bench_test(allow_concur, bufsize, SOCK_INACTIVE_MIN)) != 0)
 	    return status;
     }
 //pass2:
@@ -889,12 +919,27 @@
 	 sock_count *= 2) 
     {
 	//PJ_LOG(3,(THIS_FILE, "...testing with %d fds", sock_count));
-	if ((status=bench_test(bufsize, sock_count-2)) != 0)
+	if ((status=bench_test(allow_concur, bufsize, sock_count-2)) != 0)
 	    return status;
     }
     return 0;
 }
 
+/*
+ * UDP ioqueue test entry: runs the whole UDP suite with concurrency
+ * allowed, then again with it disallowed. Returns the first non-zero code.
+ */
+int udp_ioqueue_test()
+{
+    int status = udp_ioqueue_test_imp(PJ_TRUE);
+
+    /* Only run the second pass when the first one succeeded. */
+    if (status == 0)
+	status = udp_ioqueue_test_imp(PJ_FALSE);
+
+    return status;
+}
+
 #else
 /* To prevent warning about "translation unit is empty"
  * when this test is disabled. 
diff --git a/pjlib/src/pjlib-test/ioq_unreg.c b/pjlib/src/pjlib-test/ioq_unreg.c
index 33e8627..a1e8075 100644
--- a/pjlib/src/pjlib-test/ioq_unreg.c
+++ b/pjlib/src/pjlib-test/ioq_unreg.c
@@ -286,14 +286,16 @@
     return 0;
 }
 
-int udp_ioqueue_unreg_test(void)
+static int udp_ioqueue_unreg_test_imp(pj_bool_t allow_concur)
 {
     enum { LOOP = 10 };
     int i, rc;
     char title[30];
     pj_ioqueue_t *ioqueue;
     pj_pool_t *test_pool;
-			      
+	
+    PJ_LOG(3,(THIS_FILE, "..testing with concurency=%d", allow_concur));
+
     test_method = UNREGISTER_IN_APP;
 
     test_pool = pj_pool_create(mem, "unregtest", 4000, 4000, NULL);
@@ -304,6 +306,11 @@
 	return -10;
     }
 
+    rc = pj_ioqueue_set_default_concurrency(ioqueue, allow_concur);
+    if (rc != PJ_SUCCESS) {
+	app_perror("Error in pj_ioqueue_set_default_concurrency()", rc);
+	return -12;
+    }
 
     PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 0/3 (%s)", 
 	       pj_ioqueue_name()));
@@ -351,7 +358,20 @@
     return 0;
 }
 
+/*
+ * Entry point of the ioqueue unregister test: runs the stress test with
+ * concurrency allowed, then disallowed. Returns the first non-zero code.
+ */
+int udp_ioqueue_unreg_test(void)
+{
+    int status = udp_ioqueue_unreg_test_imp(PJ_TRUE);
 
+    /* Only run the second pass when the first one succeeded. */
+    if (status == 0)
+	status = udp_ioqueue_unreg_test_imp(PJ_FALSE);
+
+    return status;
+}
 
 #else
 /* To prevent warning about "translation unit is empty"