Set svn:eol-style property

git-svn-id: https://svn.pjsip.org/repos/pjproject/trunk@65 74dad513-b988-da41-8d7b-12977e46ad98
diff --git a/pjlib/src/pjlib-test/ioq_perf.c b/pjlib/src/pjlib-test/ioq_perf.c
index b10decc..3a8de8e 100644
--- a/pjlib/src/pjlib-test/ioq_perf.c
+++ b/pjlib/src/pjlib-test/ioq_perf.c
@@ -1,502 +1,502 @@
-/* $Id$ */

-/* 

- * Copyright (C)2003-2006 Benny Prijono <benny@prijono.org>

- *

- * This program is free software; you can redistribute it and/or modify

- * it under the terms of the GNU General Public License as published by

- * the Free Software Foundation; either version 2 of the License, or

- * (at your option) any later version.

- *

- * This program is distributed in the hope that it will be useful,

- * but WITHOUT ANY WARRANTY; without even the implied warranty of

- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the

- * GNU General Public License for more details.

- *

- * You should have received a copy of the GNU General Public License

- * along with this program; if not, write to the Free Software

- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 

- */

-#include "test.h"

-#include <pjlib.h>

-#include <pj/compat/high_precision.h>

-

-/**

- * \page page_pjlib_ioqueue_perf_test Test: I/O Queue Performance

- *

- * Test the performance of the I/O queue, using typical producer

- * consumer test. The test should examine the effect of using multiple

- * threads on the performance.

- *

- * This file is <b>pjlib-test/ioq_perf.c</b>

- *

- * \include pjlib-test/ioq_perf.c

- */

-

-#if INCLUDE_IOQUEUE_PERF_TEST

-

-#ifdef _MSC_VER

-#   pragma warning ( disable: 4204)     // non-constant aggregate initializer

-#endif

-

-#define THIS_FILE	"ioq_perf"

-//#define TRACE_(expr)	PJ_LOG(3,expr)

-#define TRACE_(expr)

-

-

-static pj_bool_t thread_quit_flag;

-static pj_status_t last_error;

-static unsigned last_error_counter;

-

-/* Descriptor for each producer/consumer pair. */

-typedef struct test_item

-{

-    pj_sock_t            server_fd, 

-                         client_fd;

-    pj_ioqueue_t        *ioqueue;

-    pj_ioqueue_key_t    *server_key,

-                        *client_key;

-    pj_ioqueue_op_key_t  recv_op,

-                         send_op;

-    int                  has_pending_send;

-    pj_size_t            buffer_size;

-    char                *outgoing_buffer;

-    char                *incoming_buffer;

-    pj_size_t            bytes_sent, 

-                         bytes_recv;

-} test_item;

-

-/* Callback when data has been read.

- * Increment item->bytes_recv and ready to read the next data.

- */

-static void on_read_complete(pj_ioqueue_key_t *key, 

-                             pj_ioqueue_op_key_t *op_key,

-                             pj_ssize_t bytes_read)

-{

-    test_item *item = pj_ioqueue_get_user_data(key);

-    pj_status_t rc;

-    int data_is_available = 1;

-

-    //TRACE_((THIS_FILE, "     read complete, bytes_read=%d", bytes_read));

-

-    do {

-        if (thread_quit_flag)

-            return;

-

-        if (bytes_read < 0) {

-            pj_status_t rc = -bytes_read;

-            char errmsg[128];

-

-	    if (rc != last_error) {

-	        //last_error = rc;

-	        pj_strerror(rc, errmsg, sizeof(errmsg));

-	        PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)", 

-		          bytes_read, errmsg));

-	        PJ_LOG(3,(THIS_FILE, 

-		          ".....additional info: total read=%u, total sent=%u",

-		          item->bytes_recv, item->bytes_sent));

-	    } else {

-	        last_error_counter++;

-	    }

-            bytes_read = 0;

-

-        } else if (bytes_read == 0) {

-            PJ_LOG(3,(THIS_FILE, "...socket has closed!"));

-        }

-

-        item->bytes_recv += bytes_read;

-    

-        /* To assure that the test quits, even if main thread

-         * doesn't have time to run.

-         */

-        if (item->bytes_recv > item->buffer_size * 10000) 

-	    thread_quit_flag = 1;

-

-        bytes_read = item->buffer_size;

-        rc = pj_ioqueue_recv( key, op_key,

-                              item->incoming_buffer, &bytes_read, 0 );

-

-        if (rc == PJ_SUCCESS) {

-            data_is_available = 1;

-        } else if (rc == PJ_EPENDING) {

-            data_is_available = 0;

-        } else {

-            data_is_available = 0;

-	    if (rc != last_error) {

-	        last_error = rc;

-	        app_perror("...error: read error(1)", rc);

-	    } else {

-	        last_error_counter++;

-	    }

-        }

-

-        if (!item->has_pending_send) {

-            pj_ssize_t sent = item->buffer_size;

-            rc = pj_ioqueue_send(item->client_key, &item->send_op,

-                                 item->outgoing_buffer, &sent, 0);

-            if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {

-                app_perror("...error: write error", rc);

-            }

-

-            item->has_pending_send = (rc==PJ_EPENDING);

-        }

-

-    } while (data_is_available);

-}

-

-/* Callback when data has been written.

- * Increment item->bytes_sent and write the next data.

- */

-static void on_write_complete(pj_ioqueue_key_t *key, 

-                              pj_ioqueue_op_key_t *op_key,

-                              pj_ssize_t bytes_sent)

-{

-    test_item *item = pj_ioqueue_get_user_data(key);

-    

-    //TRACE_((THIS_FILE, "     write complete: sent = %d", bytes_sent));

-

-    if (thread_quit_flag)

-        return;

-

-    item->has_pending_send = 0;

-    item->bytes_sent += bytes_sent;

-

-    if (bytes_sent <= 0) {

-        PJ_LOG(3,(THIS_FILE, "...error: sending stopped. bytes_sent=%d", 

-                  bytes_sent));

-    } 

-    else {

-        pj_status_t rc;

-

-        bytes_sent = item->buffer_size;

-        rc = pj_ioqueue_send( item->client_key, op_key,

-                              item->outgoing_buffer, &bytes_sent, 0);

-        if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {

-            app_perror("...error: write error", rc);

-        }

-

-        item->has_pending_send = (rc==PJ_EPENDING);

-    }

-}

-

-/* The worker thread. */

-static int worker_thread(void *arg)

-{

-    pj_ioqueue_t *ioqueue = arg;

-    const pj_time_val timeout = {0, 100};

-    int rc;

-

-    while (!thread_quit_flag) {

-        rc = pj_ioqueue_poll(ioqueue, &timeout);

-	//TRACE_((THIS_FILE, "     thread: poll returned rc=%d", rc));

-        if (rc < 0) {

-            app_perror("...error in pj_ioqueue_poll()", pj_get_netos_error());

-            return -1;

-        }

-    }

-    return 0;

-}

-

-/* Calculate the bandwidth for the specific test configuration.

- * The test is simple:

- *  - create sockpair_cnt number of producer-consumer socket pair.

- *  - create thread_cnt number of worker threads.

- *  - each producer will send buffer_size bytes data as fast and

- *    as soon as it can.

- *  - each consumer will read buffer_size bytes of data as fast 

- *    as it could.

- *  - measure the total bytes received by all consumers during a

- *    period of time.

- */

-static int perform_test(int sock_type, const char *type_name,

-                        unsigned thread_cnt, unsigned sockpair_cnt,

-                        pj_size_t buffer_size, 

-                        pj_size_t *p_bandwidth)

-{

-    enum { MSEC_DURATION = 5000 };

-    pj_pool_t *pool;

-    test_item *items;

-    pj_thread_t **thread;

-    pj_ioqueue_t *ioqueue;

-    pj_status_t rc;

-    pj_ioqueue_callback ioqueue_callback;

-    pj_uint32_t total_elapsed_usec, total_received;

-    pj_highprec_t bandwidth;

-    pj_timestamp start, stop;

-    unsigned i;

-

-    TRACE_((THIS_FILE, "    starting test.."));

-

-    ioqueue_callback.on_read_complete = &on_read_complete;

-    ioqueue_callback.on_write_complete = &on_write_complete;

-

-    thread_quit_flag = 0;

-

-    pool = pj_pool_create(mem, NULL, 4096, 4096, NULL);

-    if (!pool)

-        return -10;

-

-    items = pj_pool_alloc(pool, sockpair_cnt*sizeof(test_item));

-    thread = pj_pool_alloc(pool, thread_cnt*sizeof(pj_thread_t*));

-

-    TRACE_((THIS_FILE, "     creating ioqueue.."));

-    rc = pj_ioqueue_create(pool, sockpair_cnt*2, &ioqueue);

-    if (rc != PJ_SUCCESS) {

-        app_perror("...error: unable to create ioqueue", rc);

-        return -15;

-    }

-

-    /* Initialize each producer-consumer pair. */

-    for (i=0; i<sockpair_cnt; ++i) {

-        pj_ssize_t bytes;

-

-        items[i].ioqueue = ioqueue;

-        items[i].buffer_size = buffer_size;

-        items[i].outgoing_buffer = pj_pool_alloc(pool, buffer_size);

-        items[i].incoming_buffer = pj_pool_alloc(pool, buffer_size);

-        items[i].bytes_recv = items[i].bytes_sent = 0;

-

-        /* randomize outgoing buffer. */

-        pj_create_random_string(items[i].outgoing_buffer, buffer_size);

-

-        /* Create socket pair. */

-	TRACE_((THIS_FILE, "      calling socketpair.."));

-        rc = app_socketpair(PJ_AF_INET, sock_type, 0, 

-                            &items[i].server_fd, &items[i].client_fd);

-        if (rc != PJ_SUCCESS) {

-            app_perror("...error: unable to create socket pair", rc);

-            return -20;

-        }

-

-        /* Register server socket to ioqueue. */

-	TRACE_((THIS_FILE, "      register(1).."));

-        rc = pj_ioqueue_register_sock(pool, ioqueue, 

-                                      items[i].server_fd,

-                                      &items[i], &ioqueue_callback,

-                                      &items[i].server_key);

-        if (rc != PJ_SUCCESS) {

-            app_perror("...error: registering server socket to ioqueue", rc);

-            return -60;

-        }

-

-        /* Register client socket to ioqueue. */

-	TRACE_((THIS_FILE, "      register(2).."));

-        rc = pj_ioqueue_register_sock(pool, ioqueue, 

-                                      items[i].client_fd,

-                                      &items[i],  &ioqueue_callback,

-                                      &items[i].client_key);

-        if (rc != PJ_SUCCESS) {

-            app_perror("...error: registering server socket to ioqueue", rc);

-            return -70;

-        }

-

-        /* Start reading. */

-	TRACE_((THIS_FILE, "      pj_ioqueue_recv.."));

-        bytes = items[i].buffer_size;

-        rc = pj_ioqueue_recv(items[i].server_key, &items[i].recv_op,

-                             items[i].incoming_buffer, &bytes,

-			     0);

-        if (rc != PJ_EPENDING) {

-            app_perror("...error: pj_ioqueue_recv", rc);

-            return -73;

-        }

-

-        /* Start writing. */

-	TRACE_((THIS_FILE, "      pj_ioqueue_write.."));

-        bytes = items[i].buffer_size;

-        rc = pj_ioqueue_send(items[i].client_key, &items[i].recv_op,

-                             items[i].outgoing_buffer, &bytes, 0);

-        if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {

-            app_perror("...error: pj_ioqueue_write", rc);

-            return -76;

-        }

-

-        items[i].has_pending_send = (rc==PJ_EPENDING);

-    }

-

-    /* Create the threads. */

-    for (i=0; i<thread_cnt; ++i) {

-        rc = pj_thread_create( pool, NULL, 

-                               &worker_thread, 

-                               ioqueue, 

-                               PJ_THREAD_DEFAULT_STACK_SIZE, 

-                               PJ_THREAD_SUSPENDED, &thread[i] );

-        if (rc != PJ_SUCCESS) {

-            app_perror("...error: unable to create thread", rc);

-            return -80;

-        }

-    }

-

-    /* Mark start time. */

-    rc = pj_get_timestamp(&start);

-    if (rc != PJ_SUCCESS)

-        return -90;

-

-    /* Start the thread. */

-    TRACE_((THIS_FILE, "     resuming all threads.."));

-    for (i=0; i<thread_cnt; ++i) {

-        rc = pj_thread_resume(thread[i]);

-        if (rc != 0)

-            return -100;

-    }

-

-    /* Wait for MSEC_DURATION seconds. 

-     * This should be as simple as pj_thread_sleep(MSEC_DURATION) actually,

-     * but unfortunately it doesn't work when system doesn't employ

-     * timeslicing for threads.

-     */

-    TRACE_((THIS_FILE, "     wait for few seconds.."));

-    do {

-	pj_thread_sleep(1);

-

-	/* Mark end time. */

-	rc = pj_get_timestamp(&stop);

-

-	if (thread_quit_flag) {

-	    TRACE_((THIS_FILE, "      transfer limit reached.."));

-	    break;

-	}

-

-	if (pj_elapsed_usec(&start,&stop)<MSEC_DURATION * 1000) {

-	    TRACE_((THIS_FILE, "      time limit reached.."));

-	    break;

-	}

-

-    } while (1);

-

-    /* Terminate all threads. */

-    TRACE_((THIS_FILE, "     terminating all threads.."));

-    thread_quit_flag = 1;

-

-    for (i=0; i<thread_cnt; ++i) {

-	TRACE_((THIS_FILE, "      join thread %d..", i));

-        pj_thread_join(thread[i]);

-        pj_thread_destroy(thread[i]);

-    }

-

-    /* Close all sockets. */

-    TRACE_((THIS_FILE, "     closing all sockets.."));

-    for (i=0; i<sockpair_cnt; ++i) {

-        pj_ioqueue_unregister(items[i].server_key);

-        pj_ioqueue_unregister(items[i].client_key);

-        pj_sock_close(items[i].server_fd);

-        pj_sock_close(items[i].client_fd);

-    }

-

-    /* Destroy ioqueue. */

-    TRACE_((THIS_FILE, "     destroying ioqueue.."));

-    pj_ioqueue_destroy(ioqueue);

-

-    /* Calculate actual time in usec. */

-    total_elapsed_usec = pj_elapsed_usec(&start, &stop);

-

-    /* Calculate total bytes received. */

-    total_received = 0;

-    for (i=0; i<sockpair_cnt; ++i) {

-        total_received = items[i].bytes_recv;

-    }

-

-    /* bandwidth = total_received*1000/total_elapsed_usec */

-    bandwidth = total_received;

-    pj_highprec_mul(bandwidth, 1000);

-    pj_highprec_div(bandwidth, total_elapsed_usec);

-    

-    *p_bandwidth = (pj_uint32_t)bandwidth;

-

-    PJ_LOG(3,(THIS_FILE, "   %.4s    %d         %d        %3d us  %8d KB/s",

-              type_name, thread_cnt, sockpair_cnt,

-              -1 /*total_elapsed_usec/sockpair_cnt*/,

-              *p_bandwidth));

-

-    /* Done. */

-    pj_pool_release(pool);

-

-    TRACE_((THIS_FILE, "    done.."));

-    return 0;

-}

-

-/*

- * main test entry.

- */

-int ioqueue_perf_test(void)

-{

-    enum { BUF_SIZE = 512 };

-    int i, rc;

-    struct {

-        int         type;

-        const char *type_name;

-        int         thread_cnt;

-        int         sockpair_cnt;

-    } test_param[] = 

-    {

-        { PJ_SOCK_DGRAM, "udp", 1, 1},

-        { PJ_SOCK_DGRAM, "udp", 1, 2},

-        { PJ_SOCK_DGRAM, "udp", 1, 4},

-        { PJ_SOCK_DGRAM, "udp", 1, 8},

-        { PJ_SOCK_DGRAM, "udp", 2, 1},

-        { PJ_SOCK_DGRAM, "udp", 2, 2},

-        { PJ_SOCK_DGRAM, "udp", 2, 4},

-        { PJ_SOCK_DGRAM, "udp", 2, 8},

-        { PJ_SOCK_DGRAM, "udp", 4, 1},

-        { PJ_SOCK_DGRAM, "udp", 4, 2},

-        { PJ_SOCK_DGRAM, "udp", 4, 4},

-        { PJ_SOCK_DGRAM, "udp", 4, 8},

-        { PJ_SOCK_STREAM, "tcp", 1, 1},

-        { PJ_SOCK_STREAM, "tcp", 1, 2},

-        { PJ_SOCK_STREAM, "tcp", 1, 4},

-        { PJ_SOCK_STREAM, "tcp", 1, 8},

-        { PJ_SOCK_STREAM, "tcp", 2, 1},

-        { PJ_SOCK_STREAM, "tcp", 2, 2},

-        { PJ_SOCK_STREAM, "tcp", 2, 4},

-        { PJ_SOCK_STREAM, "tcp", 2, 8},

-        { PJ_SOCK_STREAM, "tcp", 4, 1},

-        { PJ_SOCK_STREAM, "tcp", 4, 2},

-        { PJ_SOCK_STREAM, "tcp", 4, 4},

-        { PJ_SOCK_STREAM, "tcp", 4, 8},

-    };

-    pj_size_t best_bandwidth;

-    int best_index = 0;

-

-    PJ_LOG(3,(THIS_FILE, "   Benchmarking %s ioqueue:", pj_ioqueue_name()));

-    PJ_LOG(3,(THIS_FILE, "   ==============================================="));

-    PJ_LOG(3,(THIS_FILE, "   Type  Threads  Skt.Pairs  Avg.Time    Bandwidth"));

-    PJ_LOG(3,(THIS_FILE, "   ==============================================="));

-

-    best_bandwidth = 0;

-    for (i=0; i<sizeof(test_param)/sizeof(test_param[0]); ++i) {

-        pj_size_t bandwidth;

-

-        rc = perform_test(test_param[i].type, 

-                          test_param[i].type_name,

-                          test_param[i].thread_cnt, 

-                          test_param[i].sockpair_cnt, 

-                          BUF_SIZE, 

-                          &bandwidth);

-        if (rc != 0)

-            return rc;

-

-        if (bandwidth > best_bandwidth)

-            best_bandwidth = bandwidth, best_index = i;

-

-        /* Give it a rest before next test. */

-        pj_thread_sleep(500);

-    }

-

-    PJ_LOG(3,(THIS_FILE, 

-              "   Best: Type=%s Threads=%d, Skt.Pairs=%d, Bandwidth=%u KB/s",

-              test_param[best_index].type_name,

-              test_param[best_index].thread_cnt,

-              test_param[best_index].sockpair_cnt,

-              best_bandwidth));

-    PJ_LOG(3,(THIS_FILE, "   (Note: packet size=%d, total errors=%u)", 

-			 BUF_SIZE, last_error_counter));

-    return 0;

-}

-

-#else

-/* To prevent warning about "translation unit is empty"

- * when this test is disabled. 

- */

-int dummy_uiq_perf_test;

-#endif  /* INCLUDE_IOQUEUE_PERF_TEST */

-

-

+/* $Id$ */
+/* 
+ * Copyright (C)2003-2006 Benny Prijono <benny@prijono.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
+ */
+#include "test.h"
+#include <pjlib.h>
+#include <pj/compat/high_precision.h>
+
+/**
+ * \page page_pjlib_ioqueue_perf_test Test: I/O Queue Performance
+ *
+ * Test the performance of the I/O queue, using typical producer
+ * consumer test. The test should examine the effect of using multiple
+ * threads on the performance.
+ *
+ * This file is <b>pjlib-test/ioq_perf.c</b>
+ *
+ * \include pjlib-test/ioq_perf.c
+ */
+
+#if INCLUDE_IOQUEUE_PERF_TEST
+
+#ifdef _MSC_VER
+#   pragma warning ( disable: 4204)     // non-constant aggregate initializer
+#endif
+
+#define THIS_FILE	"ioq_perf"
+//#define TRACE_(expr)	PJ_LOG(3,expr)
+#define TRACE_(expr)
+
+
+static pj_bool_t thread_quit_flag;
+static pj_status_t last_error;
+static unsigned last_error_counter;
+
+/* Descriptor for each producer/consumer pair. */
+typedef struct test_item
+{
+    pj_sock_t            server_fd, 
+                         client_fd;
+    pj_ioqueue_t        *ioqueue;
+    pj_ioqueue_key_t    *server_key,
+                        *client_key;
+    pj_ioqueue_op_key_t  recv_op,
+                         send_op;
+    int                  has_pending_send;
+    pj_size_t            buffer_size;
+    char                *outgoing_buffer;
+    char                *incoming_buffer;
+    pj_size_t            bytes_sent, 
+                         bytes_recv;
+} test_item;
+
+/* Callback when data has been read.
+ * Increment item->bytes_recv and ready to read the next data.
+ */
+static void on_read_complete(pj_ioqueue_key_t *key, 
+                             pj_ioqueue_op_key_t *op_key,
+                             pj_ssize_t bytes_read)
+{
+    test_item *item = pj_ioqueue_get_user_data(key);
+    pj_status_t rc;
+    int data_is_available = 1;
+
+    //TRACE_((THIS_FILE, "     read complete, bytes_read=%d", bytes_read));
+
+    do {
+        if (thread_quit_flag)
+            return;
+
+        if (bytes_read < 0) {
+            pj_status_t rc = -bytes_read;
+            char errmsg[128];
+
+	    if (rc != last_error) {
+	        //last_error = rc;
+	        pj_strerror(rc, errmsg, sizeof(errmsg));
+	        PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)", 
+		          bytes_read, errmsg));
+	        PJ_LOG(3,(THIS_FILE, 
+		          ".....additional info: total read=%u, total sent=%u",
+		          item->bytes_recv, item->bytes_sent));
+	    } else {
+	        last_error_counter++;
+	    }
+            bytes_read = 0;
+
+        } else if (bytes_read == 0) {
+            PJ_LOG(3,(THIS_FILE, "...socket has closed!"));
+        }
+
+        item->bytes_recv += bytes_read;
+    
+        /* To assure that the test quits, even if main thread
+         * doesn't have time to run.
+         */
+        if (item->bytes_recv > item->buffer_size * 10000) 
+	    thread_quit_flag = 1;
+
+        bytes_read = item->buffer_size;
+        rc = pj_ioqueue_recv( key, op_key,
+                              item->incoming_buffer, &bytes_read, 0 );
+
+        if (rc == PJ_SUCCESS) {
+            data_is_available = 1;
+        } else if (rc == PJ_EPENDING) {
+            data_is_available = 0;
+        } else {
+            data_is_available = 0;
+	    if (rc != last_error) {
+	        last_error = rc;
+	        app_perror("...error: read error(1)", rc);
+	    } else {
+	        last_error_counter++;
+	    }
+        }
+
+        if (!item->has_pending_send) {
+            pj_ssize_t sent = item->buffer_size;
+            rc = pj_ioqueue_send(item->client_key, &item->send_op,
+                                 item->outgoing_buffer, &sent, 0);
+            if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
+                app_perror("...error: write error", rc);
+            }
+
+            item->has_pending_send = (rc==PJ_EPENDING);
+        }
+
+    } while (data_is_available);
+}
+
+/* Callback when data has been written.
+ * Increment item->bytes_sent and write the next data.
+ */
+static void on_write_complete(pj_ioqueue_key_t *key, 
+                              pj_ioqueue_op_key_t *op_key,
+                              pj_ssize_t bytes_sent)
+{
+    test_item *item = pj_ioqueue_get_user_data(key);
+    
+    //TRACE_((THIS_FILE, "     write complete: sent = %d", bytes_sent));
+
+    if (thread_quit_flag)
+        return;
+
+    item->has_pending_send = 0;
+    item->bytes_sent += bytes_sent;
+
+    if (bytes_sent <= 0) {
+        PJ_LOG(3,(THIS_FILE, "...error: sending stopped. bytes_sent=%d", 
+                  bytes_sent));
+    } 
+    else {
+        pj_status_t rc;
+
+        bytes_sent = item->buffer_size;
+        rc = pj_ioqueue_send( item->client_key, op_key,
+                              item->outgoing_buffer, &bytes_sent, 0);
+        if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
+            app_perror("...error: write error", rc);
+        }
+
+        item->has_pending_send = (rc==PJ_EPENDING);
+    }
+}
+
+/* The worker thread. */
+static int worker_thread(void *arg)
+{
+    pj_ioqueue_t *ioqueue = arg;
+    const pj_time_val timeout = {0, 100};
+    int rc;
+
+    while (!thread_quit_flag) {
+        rc = pj_ioqueue_poll(ioqueue, &timeout);
+	//TRACE_((THIS_FILE, "     thread: poll returned rc=%d", rc));
+        if (rc < 0) {
+            app_perror("...error in pj_ioqueue_poll()", pj_get_netos_error());
+            return -1;
+        }
+    }
+    return 0;
+}
+
+/* Calculate the bandwidth for the specific test configuration.
+ * The test is simple:
+ *  - create sockpair_cnt number of producer-consumer socket pair.
+ *  - create thread_cnt number of worker threads.
+ *  - each producer will send buffer_size bytes data as fast and
+ *    as soon as it can.
+ *  - each consumer will read buffer_size bytes of data as fast 
+ *    as it could.
+ *  - measure the total bytes received by all consumers during a
+ *    period of time.
+ */
+static int perform_test(int sock_type, const char *type_name,
+                        unsigned thread_cnt, unsigned sockpair_cnt,
+                        pj_size_t buffer_size, 
+                        pj_size_t *p_bandwidth)
+{
+    enum { MSEC_DURATION = 5000 };
+    pj_pool_t *pool;
+    test_item *items;
+    pj_thread_t **thread;
+    pj_ioqueue_t *ioqueue;
+    pj_status_t rc;
+    pj_ioqueue_callback ioqueue_callback;
+    pj_uint32_t total_elapsed_usec, total_received;
+    pj_highprec_t bandwidth;
+    pj_timestamp start, stop;
+    unsigned i;
+
+    TRACE_((THIS_FILE, "    starting test.."));
+
+    ioqueue_callback.on_read_complete = &on_read_complete;
+    ioqueue_callback.on_write_complete = &on_write_complete;
+
+    thread_quit_flag = 0;
+
+    pool = pj_pool_create(mem, NULL, 4096, 4096, NULL);
+    if (!pool)
+        return -10;
+
+    items = pj_pool_alloc(pool, sockpair_cnt*sizeof(test_item));
+    thread = pj_pool_alloc(pool, thread_cnt*sizeof(pj_thread_t*));
+
+    TRACE_((THIS_FILE, "     creating ioqueue.."));
+    rc = pj_ioqueue_create(pool, sockpair_cnt*2, &ioqueue);
+    if (rc != PJ_SUCCESS) {
+        app_perror("...error: unable to create ioqueue", rc);
+        return -15;
+    }
+
+    /* Initialize each producer-consumer pair. */
+    for (i=0; i<sockpair_cnt; ++i) {
+        pj_ssize_t bytes;
+
+        items[i].ioqueue = ioqueue;
+        items[i].buffer_size = buffer_size;
+        items[i].outgoing_buffer = pj_pool_alloc(pool, buffer_size);
+        items[i].incoming_buffer = pj_pool_alloc(pool, buffer_size);
+        items[i].bytes_recv = items[i].bytes_sent = 0;
+
+        /* randomize outgoing buffer. */
+        pj_create_random_string(items[i].outgoing_buffer, buffer_size);
+
+        /* Create socket pair. */
+	TRACE_((THIS_FILE, "      calling socketpair.."));
+        rc = app_socketpair(PJ_AF_INET, sock_type, 0, 
+                            &items[i].server_fd, &items[i].client_fd);
+        if (rc != PJ_SUCCESS) {
+            app_perror("...error: unable to create socket pair", rc);
+            return -20;
+        }
+
+        /* Register server socket to ioqueue. */
+	TRACE_((THIS_FILE, "      register(1).."));
+        rc = pj_ioqueue_register_sock(pool, ioqueue, 
+                                      items[i].server_fd,
+                                      &items[i], &ioqueue_callback,
+                                      &items[i].server_key);
+        if (rc != PJ_SUCCESS) {
+            app_perror("...error: registering server socket to ioqueue", rc);
+            return -60;
+        }
+
+        /* Register client socket to ioqueue. */
+	TRACE_((THIS_FILE, "      register(2).."));
+        rc = pj_ioqueue_register_sock(pool, ioqueue, 
+                                      items[i].client_fd,
+                                      &items[i],  &ioqueue_callback,
+                                      &items[i].client_key);
+        if (rc != PJ_SUCCESS) {
+            app_perror("...error: registering server socket to ioqueue", rc);
+            return -70;
+        }
+
+        /* Start reading. */
+	TRACE_((THIS_FILE, "      pj_ioqueue_recv.."));
+        bytes = items[i].buffer_size;
+        rc = pj_ioqueue_recv(items[i].server_key, &items[i].recv_op,
+                             items[i].incoming_buffer, &bytes,
+			     0);
+        if (rc != PJ_EPENDING) {
+            app_perror("...error: pj_ioqueue_recv", rc);
+            return -73;
+        }
+
+        /* Start writing. */
+	TRACE_((THIS_FILE, "      pj_ioqueue_write.."));
+        bytes = items[i].buffer_size;
+        rc = pj_ioqueue_send(items[i].client_key, &items[i].recv_op,
+                             items[i].outgoing_buffer, &bytes, 0);
+        if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
+            app_perror("...error: pj_ioqueue_write", rc);
+            return -76;
+        }
+
+        items[i].has_pending_send = (rc==PJ_EPENDING);
+    }
+
+    /* Create the threads. */
+    for (i=0; i<thread_cnt; ++i) {
+        rc = pj_thread_create( pool, NULL, 
+                               &worker_thread, 
+                               ioqueue, 
+                               PJ_THREAD_DEFAULT_STACK_SIZE, 
+                               PJ_THREAD_SUSPENDED, &thread[i] );
+        if (rc != PJ_SUCCESS) {
+            app_perror("...error: unable to create thread", rc);
+            return -80;
+        }
+    }
+
+    /* Mark start time. */
+    rc = pj_get_timestamp(&start);
+    if (rc != PJ_SUCCESS)
+        return -90;
+
+    /* Start the thread. */
+    TRACE_((THIS_FILE, "     resuming all threads.."));
+    for (i=0; i<thread_cnt; ++i) {
+        rc = pj_thread_resume(thread[i]);
+        if (rc != 0)
+            return -100;
+    }
+
+    /* Wait for MSEC_DURATION seconds. 
+     * This should be as simple as pj_thread_sleep(MSEC_DURATION) actually,
+     * but unfortunately it doesn't work when system doesn't employ
+     * timeslicing for threads.
+     */
+    TRACE_((THIS_FILE, "     wait for few seconds.."));
+    do {
+	pj_thread_sleep(1);
+
+	/* Mark end time. */
+	rc = pj_get_timestamp(&stop);
+
+	if (thread_quit_flag) {
+	    TRACE_((THIS_FILE, "      transfer limit reached.."));
+	    break;
+	}
+
+	if (pj_elapsed_usec(&start,&stop)<MSEC_DURATION * 1000) {
+	    TRACE_((THIS_FILE, "      time limit reached.."));
+	    break;
+	}
+
+    } while (1);
+
+    /* Terminate all threads. */
+    TRACE_((THIS_FILE, "     terminating all threads.."));
+    thread_quit_flag = 1;
+
+    for (i=0; i<thread_cnt; ++i) {
+	TRACE_((THIS_FILE, "      join thread %d..", i));
+        pj_thread_join(thread[i]);
+        pj_thread_destroy(thread[i]);
+    }
+
+    /* Close all sockets. */
+    TRACE_((THIS_FILE, "     closing all sockets.."));
+    for (i=0; i<sockpair_cnt; ++i) {
+        pj_ioqueue_unregister(items[i].server_key);
+        pj_ioqueue_unregister(items[i].client_key);
+        pj_sock_close(items[i].server_fd);
+        pj_sock_close(items[i].client_fd);
+    }
+
+    /* Destroy ioqueue. */
+    TRACE_((THIS_FILE, "     destroying ioqueue.."));
+    pj_ioqueue_destroy(ioqueue);
+
+    /* Calculate actual time in usec. */
+    total_elapsed_usec = pj_elapsed_usec(&start, &stop);
+
+    /* Calculate total bytes received. */
+    total_received = 0;
+    for (i=0; i<sockpair_cnt; ++i) {
+        total_received = items[i].bytes_recv;
+    }
+
+    /* bandwidth = total_received*1000/total_elapsed_usec */
+    bandwidth = total_received;
+    pj_highprec_mul(bandwidth, 1000);
+    pj_highprec_div(bandwidth, total_elapsed_usec);
+    
+    *p_bandwidth = (pj_uint32_t)bandwidth;
+
+    PJ_LOG(3,(THIS_FILE, "   %.4s    %d         %d        %3d us  %8d KB/s",
+              type_name, thread_cnt, sockpair_cnt,
+              -1 /*total_elapsed_usec/sockpair_cnt*/,
+              *p_bandwidth));
+
+    /* Done. */
+    pj_pool_release(pool);
+
+    TRACE_((THIS_FILE, "    done.."));
+    return 0;
+}
+
+/*
+ * main test entry.
+ */
+int ioqueue_perf_test(void)
+{
+    enum { BUF_SIZE = 512 };
+    int i, rc;
+    struct {
+        int         type;
+        const char *type_name;
+        int         thread_cnt;
+        int         sockpair_cnt;
+    } test_param[] = 
+    {
+        { PJ_SOCK_DGRAM, "udp", 1, 1},
+        { PJ_SOCK_DGRAM, "udp", 1, 2},
+        { PJ_SOCK_DGRAM, "udp", 1, 4},
+        { PJ_SOCK_DGRAM, "udp", 1, 8},
+        { PJ_SOCK_DGRAM, "udp", 2, 1},
+        { PJ_SOCK_DGRAM, "udp", 2, 2},
+        { PJ_SOCK_DGRAM, "udp", 2, 4},
+        { PJ_SOCK_DGRAM, "udp", 2, 8},
+        { PJ_SOCK_DGRAM, "udp", 4, 1},
+        { PJ_SOCK_DGRAM, "udp", 4, 2},
+        { PJ_SOCK_DGRAM, "udp", 4, 4},
+        { PJ_SOCK_DGRAM, "udp", 4, 8},
+        { PJ_SOCK_STREAM, "tcp", 1, 1},
+        { PJ_SOCK_STREAM, "tcp", 1, 2},
+        { PJ_SOCK_STREAM, "tcp", 1, 4},
+        { PJ_SOCK_STREAM, "tcp", 1, 8},
+        { PJ_SOCK_STREAM, "tcp", 2, 1},
+        { PJ_SOCK_STREAM, "tcp", 2, 2},
+        { PJ_SOCK_STREAM, "tcp", 2, 4},
+        { PJ_SOCK_STREAM, "tcp", 2, 8},
+        { PJ_SOCK_STREAM, "tcp", 4, 1},
+        { PJ_SOCK_STREAM, "tcp", 4, 2},
+        { PJ_SOCK_STREAM, "tcp", 4, 4},
+        { PJ_SOCK_STREAM, "tcp", 4, 8},
+    };
+    pj_size_t best_bandwidth;
+    int best_index = 0;
+
+    PJ_LOG(3,(THIS_FILE, "   Benchmarking %s ioqueue:", pj_ioqueue_name()));
+    PJ_LOG(3,(THIS_FILE, "   ==============================================="));
+    PJ_LOG(3,(THIS_FILE, "   Type  Threads  Skt.Pairs  Avg.Time    Bandwidth"));
+    PJ_LOG(3,(THIS_FILE, "   ==============================================="));
+
+    best_bandwidth = 0;
+    for (i=0; i<sizeof(test_param)/sizeof(test_param[0]); ++i) {
+        pj_size_t bandwidth;
+
+        rc = perform_test(test_param[i].type, 
+                          test_param[i].type_name,
+                          test_param[i].thread_cnt, 
+                          test_param[i].sockpair_cnt, 
+                          BUF_SIZE, 
+                          &bandwidth);
+        if (rc != 0)
+            return rc;
+
+        if (bandwidth > best_bandwidth)
+            best_bandwidth = bandwidth, best_index = i;
+
+        /* Give it a rest before next test. */
+        pj_thread_sleep(500);
+    }
+
+    PJ_LOG(3,(THIS_FILE, 
+              "   Best: Type=%s Threads=%d, Skt.Pairs=%d, Bandwidth=%u KB/s",
+              test_param[best_index].type_name,
+              test_param[best_index].thread_cnt,
+              test_param[best_index].sockpair_cnt,
+              best_bandwidth));
+    PJ_LOG(3,(THIS_FILE, "   (Note: packet size=%d, total errors=%u)", 
+			 BUF_SIZE, last_error_counter));
+    return 0;
+}
+
+#else
+/* To prevent warning about "translation unit is empty"
+ * when this test is disabled. 
+ */
+int dummy_uiq_perf_test;
+#endif  /* INCLUDE_IOQUEUE_PERF_TEST */
+
+