blob: ea47603f2d35c1f5b74175be6700b63470f55694 [file] [log] [blame]
Benny Prijono5dcb38d2005-11-21 01:55:47 +00001/* $Id$ */
2/*
Benny Prijonoa771a512007-02-19 01:13:53 +00003 * Copyright (C)2003-2007 Benny Prijono <benny@prijono.org>
Benny Prijono5dcb38d2005-11-21 01:55:47 +00004 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 */
19#include "test.h"
20#include <pjlib.h>
21#include <pj/compat/high_precision.h>
22
/**
 * \page page_pjlib_ioqueue_perf_test Test: I/O Queue Performance
 *
 * Test the performance of the I/O queue using a typical producer/consumer
 * test. The test examines the effect of using multiple threads and
 * multiple socket pairs on the measured throughput.
 *
 * This file is <b>pjlib-test/ioq_perf.c</b>
 *
 * \include pjlib-test/ioq_perf.c
 */
34
35#if INCLUDE_IOQUEUE_PERF_TEST
36
37#ifdef _MSC_VER
38# pragma warning ( disable: 4204) // non-constant aggregate initializer
39#endif
40
41#define THIS_FILE "ioq_perf"
42//#define TRACE_(expr) PJ_LOG(3,expr)
43#define TRACE_(expr)
44
45
46static pj_bool_t thread_quit_flag;
47static pj_status_t last_error;
48static unsigned last_error_counter;
49
50/* Descriptor for each producer/consumer pair. */
51typedef struct test_item
52{
53 pj_sock_t server_fd,
54 client_fd;
55 pj_ioqueue_t *ioqueue;
56 pj_ioqueue_key_t *server_key,
57 *client_key;
58 pj_ioqueue_op_key_t recv_op,
59 send_op;
60 int has_pending_send;
61 pj_size_t buffer_size;
62 char *outgoing_buffer;
63 char *incoming_buffer;
64 pj_size_t bytes_sent,
65 bytes_recv;
66} test_item;
67
68/* Callback when data has been read.
69 * Increment item->bytes_recv and ready to read the next data.
70 */
71static void on_read_complete(pj_ioqueue_key_t *key,
72 pj_ioqueue_op_key_t *op_key,
73 pj_ssize_t bytes_read)
74{
Benny Prijonof260e462007-04-30 21:03:32 +000075 test_item *item = (test_item*)pj_ioqueue_get_user_data(key);
Benny Prijono5dcb38d2005-11-21 01:55:47 +000076 pj_status_t rc;
77 int data_is_available = 1;
78
79 //TRACE_((THIS_FILE, " read complete, bytes_read=%d", bytes_read));
80
81 do {
82 if (thread_quit_flag)
83 return;
84
85 if (bytes_read < 0) {
86 pj_status_t rc = -bytes_read;
Benny Prijono2cab3a52006-03-22 19:08:19 +000087 char errmsg[PJ_ERR_MSG_SIZE];
Benny Prijono5dcb38d2005-11-21 01:55:47 +000088
89 if (rc != last_error) {
90 //last_error = rc;
91 pj_strerror(rc, errmsg, sizeof(errmsg));
Benny Prijono2cab3a52006-03-22 19:08:19 +000092 PJ_LOG(3,(THIS_FILE,"...error: read error, bytes_read=%d (%s)",
Benny Prijono5dcb38d2005-11-21 01:55:47 +000093 bytes_read, errmsg));
94 PJ_LOG(3,(THIS_FILE,
95 ".....additional info: total read=%u, total sent=%u",
96 item->bytes_recv, item->bytes_sent));
97 } else {
98 last_error_counter++;
99 }
100 bytes_read = 0;
101
102 } else if (bytes_read == 0) {
103 PJ_LOG(3,(THIS_FILE, "...socket has closed!"));
104 }
105
106 item->bytes_recv += bytes_read;
107
108 /* To assure that the test quits, even if main thread
109 * doesn't have time to run.
110 */
111 if (item->bytes_recv > item->buffer_size * 10000)
112 thread_quit_flag = 1;
113
114 bytes_read = item->buffer_size;
115 rc = pj_ioqueue_recv( key, op_key,
116 item->incoming_buffer, &bytes_read, 0 );
117
118 if (rc == PJ_SUCCESS) {
119 data_is_available = 1;
120 } else if (rc == PJ_EPENDING) {
121 data_is_available = 0;
122 } else {
123 data_is_available = 0;
124 if (rc != last_error) {
125 last_error = rc;
126 app_perror("...error: read error(1)", rc);
127 } else {
128 last_error_counter++;
129 }
130 }
131
132 if (!item->has_pending_send) {
133 pj_ssize_t sent = item->buffer_size;
134 rc = pj_ioqueue_send(item->client_key, &item->send_op,
135 item->outgoing_buffer, &sent, 0);
136 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
137 app_perror("...error: write error", rc);
138 }
139
140 item->has_pending_send = (rc==PJ_EPENDING);
141 }
142
143 } while (data_is_available);
144}
145
146/* Callback when data has been written.
147 * Increment item->bytes_sent and write the next data.
148 */
149static void on_write_complete(pj_ioqueue_key_t *key,
150 pj_ioqueue_op_key_t *op_key,
151 pj_ssize_t bytes_sent)
152{
Benny Prijonof260e462007-04-30 21:03:32 +0000153 test_item *item = (test_item*) pj_ioqueue_get_user_data(key);
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000154
155 //TRACE_((THIS_FILE, " write complete: sent = %d", bytes_sent));
156
157 if (thread_quit_flag)
158 return;
159
160 item->has_pending_send = 0;
161 item->bytes_sent += bytes_sent;
162
163 if (bytes_sent <= 0) {
164 PJ_LOG(3,(THIS_FILE, "...error: sending stopped. bytes_sent=%d",
165 bytes_sent));
166 }
167 else {
168 pj_status_t rc;
169
170 bytes_sent = item->buffer_size;
171 rc = pj_ioqueue_send( item->client_key, op_key,
172 item->outgoing_buffer, &bytes_sent, 0);
173 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
174 app_perror("...error: write error", rc);
175 }
176
177 item->has_pending_send = (rc==PJ_EPENDING);
178 }
179}
180
Benny Prijono2cab3a52006-03-22 19:08:19 +0000181struct thread_arg
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000182{
Benny Prijono2cab3a52006-03-22 19:08:19 +0000183 int id;
184 pj_ioqueue_t *ioqueue;
185 unsigned counter;
186};
187
188/* The worker thread. */
189static int worker_thread(void *p)
190{
Benny Prijonof260e462007-04-30 21:03:32 +0000191 struct thread_arg *arg = (struct thread_arg*) p;
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000192 const pj_time_val timeout = {0, 100};
193 int rc;
194
195 while (!thread_quit_flag) {
Benny Prijono2cab3a52006-03-22 19:08:19 +0000196
197 ++arg->counter;
198 rc = pj_ioqueue_poll(arg->ioqueue, &timeout);
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000199 //TRACE_((THIS_FILE, " thread: poll returned rc=%d", rc));
200 if (rc < 0) {
Benny Prijono2cab3a52006-03-22 19:08:19 +0000201 char errmsg[PJ_ERR_MSG_SIZE];
202 pj_strerror(-rc, errmsg, sizeof(errmsg));
203 PJ_LOG(3, (THIS_FILE,
204 "...error in pj_ioqueue_poll() in thread %d "
205 "after %d loop: %s [pj_status_t=%d]",
206 arg->id, arg->counter, errmsg, -rc));
207 //return -1;
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000208 }
209 }
210 return 0;
211}
212
213/* Calculate the bandwidth for the specific test configuration.
214 * The test is simple:
215 * - create sockpair_cnt number of producer-consumer socket pair.
216 * - create thread_cnt number of worker threads.
217 * - each producer will send buffer_size bytes data as fast and
218 * as soon as it can.
219 * - each consumer will read buffer_size bytes of data as fast
220 * as it could.
221 * - measure the total bytes received by all consumers during a
222 * period of time.
223 */
224static int perform_test(int sock_type, const char *type_name,
225 unsigned thread_cnt, unsigned sockpair_cnt,
226 pj_size_t buffer_size,
227 pj_size_t *p_bandwidth)
228{
229 enum { MSEC_DURATION = 5000 };
230 pj_pool_t *pool;
231 test_item *items;
232 pj_thread_t **thread;
233 pj_ioqueue_t *ioqueue;
234 pj_status_t rc;
235 pj_ioqueue_callback ioqueue_callback;
236 pj_uint32_t total_elapsed_usec, total_received;
237 pj_highprec_t bandwidth;
238 pj_timestamp start, stop;
239 unsigned i;
240
241 TRACE_((THIS_FILE, " starting test.."));
242
243 ioqueue_callback.on_read_complete = &on_read_complete;
244 ioqueue_callback.on_write_complete = &on_write_complete;
245
246 thread_quit_flag = 0;
247
248 pool = pj_pool_create(mem, NULL, 4096, 4096, NULL);
249 if (!pool)
250 return -10;
251
Benny Prijonof260e462007-04-30 21:03:32 +0000252 items = (test_item*) pj_pool_alloc(pool, sockpair_cnt*sizeof(test_item));
253 thread = (pj_thread_t**)
254 pj_pool_alloc(pool, thread_cnt*sizeof(pj_thread_t*));
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000255
256 TRACE_((THIS_FILE, " creating ioqueue.."));
257 rc = pj_ioqueue_create(pool, sockpair_cnt*2, &ioqueue);
258 if (rc != PJ_SUCCESS) {
259 app_perror("...error: unable to create ioqueue", rc);
260 return -15;
261 }
262
263 /* Initialize each producer-consumer pair. */
264 for (i=0; i<sockpair_cnt; ++i) {
265 pj_ssize_t bytes;
266
267 items[i].ioqueue = ioqueue;
268 items[i].buffer_size = buffer_size;
Benny Prijonof260e462007-04-30 21:03:32 +0000269 items[i].outgoing_buffer = (char*) pj_pool_alloc(pool, buffer_size);
270 items[i].incoming_buffer = (char*) pj_pool_alloc(pool, buffer_size);
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000271 items[i].bytes_recv = items[i].bytes_sent = 0;
272
273 /* randomize outgoing buffer. */
274 pj_create_random_string(items[i].outgoing_buffer, buffer_size);
275
276 /* Create socket pair. */
277 TRACE_((THIS_FILE, " calling socketpair.."));
278 rc = app_socketpair(PJ_AF_INET, sock_type, 0,
279 &items[i].server_fd, &items[i].client_fd);
280 if (rc != PJ_SUCCESS) {
281 app_perror("...error: unable to create socket pair", rc);
282 return -20;
283 }
284
285 /* Register server socket to ioqueue. */
286 TRACE_((THIS_FILE, " register(1).."));
287 rc = pj_ioqueue_register_sock(pool, ioqueue,
288 items[i].server_fd,
289 &items[i], &ioqueue_callback,
290 &items[i].server_key);
291 if (rc != PJ_SUCCESS) {
292 app_perror("...error: registering server socket to ioqueue", rc);
293 return -60;
294 }
295
296 /* Register client socket to ioqueue. */
297 TRACE_((THIS_FILE, " register(2).."));
298 rc = pj_ioqueue_register_sock(pool, ioqueue,
299 items[i].client_fd,
300 &items[i], &ioqueue_callback,
301 &items[i].client_key);
302 if (rc != PJ_SUCCESS) {
303 app_perror("...error: registering server socket to ioqueue", rc);
304 return -70;
305 }
306
307 /* Start reading. */
308 TRACE_((THIS_FILE, " pj_ioqueue_recv.."));
309 bytes = items[i].buffer_size;
310 rc = pj_ioqueue_recv(items[i].server_key, &items[i].recv_op,
311 items[i].incoming_buffer, &bytes,
312 0);
313 if (rc != PJ_EPENDING) {
314 app_perror("...error: pj_ioqueue_recv", rc);
315 return -73;
316 }
317
318 /* Start writing. */
319 TRACE_((THIS_FILE, " pj_ioqueue_write.."));
320 bytes = items[i].buffer_size;
Benny Prijono37e8d332006-01-20 21:03:36 +0000321 rc = pj_ioqueue_send(items[i].client_key, &items[i].send_op,
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000322 items[i].outgoing_buffer, &bytes, 0);
323 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
324 app_perror("...error: pj_ioqueue_write", rc);
325 return -76;
326 }
327
328 items[i].has_pending_send = (rc==PJ_EPENDING);
329 }
330
331 /* Create the threads. */
332 for (i=0; i<thread_cnt; ++i) {
Benny Prijono2cab3a52006-03-22 19:08:19 +0000333 struct thread_arg *arg;
334
Benny Prijono64898b52007-05-01 06:36:15 +0000335 arg = (struct thread_arg*) pj_pool_zalloc(pool, sizeof(*arg));
Benny Prijono2cab3a52006-03-22 19:08:19 +0000336 arg->id = i;
337 arg->ioqueue = ioqueue;
338 arg->counter = 0;
339
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000340 rc = pj_thread_create( pool, NULL,
341 &worker_thread,
Benny Prijono2cab3a52006-03-22 19:08:19 +0000342 arg,
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000343 PJ_THREAD_DEFAULT_STACK_SIZE,
344 PJ_THREAD_SUSPENDED, &thread[i] );
345 if (rc != PJ_SUCCESS) {
346 app_perror("...error: unable to create thread", rc);
347 return -80;
348 }
349 }
350
351 /* Mark start time. */
352 rc = pj_get_timestamp(&start);
353 if (rc != PJ_SUCCESS)
354 return -90;
355
356 /* Start the thread. */
357 TRACE_((THIS_FILE, " resuming all threads.."));
358 for (i=0; i<thread_cnt; ++i) {
359 rc = pj_thread_resume(thread[i]);
360 if (rc != 0)
361 return -100;
362 }
363
364 /* Wait for MSEC_DURATION seconds.
365 * This should be as simple as pj_thread_sleep(MSEC_DURATION) actually,
366 * but unfortunately it doesn't work when system doesn't employ
367 * timeslicing for threads.
368 */
369 TRACE_((THIS_FILE, " wait for few seconds.."));
370 do {
371 pj_thread_sleep(1);
372
373 /* Mark end time. */
374 rc = pj_get_timestamp(&stop);
375
376 if (thread_quit_flag) {
377 TRACE_((THIS_FILE, " transfer limit reached.."));
378 break;
379 }
380
381 if (pj_elapsed_usec(&start,&stop)<MSEC_DURATION * 1000) {
382 TRACE_((THIS_FILE, " time limit reached.."));
383 break;
384 }
385
386 } while (1);
387
388 /* Terminate all threads. */
389 TRACE_((THIS_FILE, " terminating all threads.."));
390 thread_quit_flag = 1;
391
392 for (i=0; i<thread_cnt; ++i) {
393 TRACE_((THIS_FILE, " join thread %d..", i));
394 pj_thread_join(thread[i]);
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000395 }
396
397 /* Close all sockets. */
398 TRACE_((THIS_FILE, " closing all sockets.."));
399 for (i=0; i<sockpair_cnt; ++i) {
400 pj_ioqueue_unregister(items[i].server_key);
401 pj_ioqueue_unregister(items[i].client_key);
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000402 }
403
Benny Prijono2cab3a52006-03-22 19:08:19 +0000404 /* Destroy threads */
405 for (i=0; i<thread_cnt; ++i) {
406 pj_thread_destroy(thread[i]);
407 }
408
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000409 /* Destroy ioqueue. */
410 TRACE_((THIS_FILE, " destroying ioqueue.."));
411 pj_ioqueue_destroy(ioqueue);
412
413 /* Calculate actual time in usec. */
414 total_elapsed_usec = pj_elapsed_usec(&start, &stop);
415
416 /* Calculate total bytes received. */
417 total_received = 0;
418 for (i=0; i<sockpair_cnt; ++i) {
419 total_received = items[i].bytes_recv;
420 }
421
422 /* bandwidth = total_received*1000/total_elapsed_usec */
423 bandwidth = total_received;
424 pj_highprec_mul(bandwidth, 1000);
425 pj_highprec_div(bandwidth, total_elapsed_usec);
426
427 *p_bandwidth = (pj_uint32_t)bandwidth;
428
Benny Prijonoac52df42006-03-25 10:06:00 +0000429 PJ_LOG(3,(THIS_FILE, " %.4s %2d %2d %8d KB/s",
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000430 type_name, thread_cnt, sockpair_cnt,
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000431 *p_bandwidth));
432
433 /* Done. */
434 pj_pool_release(pool);
435
436 TRACE_((THIS_FILE, " done.."));
437 return 0;
438}
439
440/*
441 * main test entry.
442 */
443int ioqueue_perf_test(void)
444{
445 enum { BUF_SIZE = 512 };
446 int i, rc;
447 struct {
448 int type;
449 const char *type_name;
450 int thread_cnt;
451 int sockpair_cnt;
452 } test_param[] =
453 {
454 { PJ_SOCK_DGRAM, "udp", 1, 1},
455 { PJ_SOCK_DGRAM, "udp", 1, 2},
456 { PJ_SOCK_DGRAM, "udp", 1, 4},
457 { PJ_SOCK_DGRAM, "udp", 1, 8},
458 { PJ_SOCK_DGRAM, "udp", 2, 1},
459 { PJ_SOCK_DGRAM, "udp", 2, 2},
460 { PJ_SOCK_DGRAM, "udp", 2, 4},
461 { PJ_SOCK_DGRAM, "udp", 2, 8},
462 { PJ_SOCK_DGRAM, "udp", 4, 1},
463 { PJ_SOCK_DGRAM, "udp", 4, 2},
464 { PJ_SOCK_DGRAM, "udp", 4, 4},
465 { PJ_SOCK_DGRAM, "udp", 4, 8},
Benny Prijono2cab3a52006-03-22 19:08:19 +0000466 { PJ_SOCK_DGRAM, "udp", 4, 16},
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000467 { PJ_SOCK_STREAM, "tcp", 1, 1},
468 { PJ_SOCK_STREAM, "tcp", 1, 2},
469 { PJ_SOCK_STREAM, "tcp", 1, 4},
470 { PJ_SOCK_STREAM, "tcp", 1, 8},
471 { PJ_SOCK_STREAM, "tcp", 2, 1},
472 { PJ_SOCK_STREAM, "tcp", 2, 2},
473 { PJ_SOCK_STREAM, "tcp", 2, 4},
474 { PJ_SOCK_STREAM, "tcp", 2, 8},
475 { PJ_SOCK_STREAM, "tcp", 4, 1},
476 { PJ_SOCK_STREAM, "tcp", 4, 2},
477 { PJ_SOCK_STREAM, "tcp", 4, 4},
478 { PJ_SOCK_STREAM, "tcp", 4, 8},
Benny Prijono2cab3a52006-03-22 19:08:19 +0000479 { PJ_SOCK_STREAM, "tcp", 4, 16},
480/*
481 { PJ_SOCK_DGRAM, "udp", 32, 1},
482 { PJ_SOCK_DGRAM, "udp", 32, 1},
483 { PJ_SOCK_DGRAM, "udp", 32, 1},
484 { PJ_SOCK_DGRAM, "udp", 32, 1},
485 { PJ_SOCK_DGRAM, "udp", 1, 32},
486 { PJ_SOCK_DGRAM, "udp", 1, 32},
487 { PJ_SOCK_DGRAM, "udp", 1, 32},
488 { PJ_SOCK_DGRAM, "udp", 1, 32},
489 { PJ_SOCK_STREAM, "tcp", 32, 1},
490 { PJ_SOCK_STREAM, "tcp", 32, 1},
491 { PJ_SOCK_STREAM, "tcp", 32, 1},
492 { PJ_SOCK_STREAM, "tcp", 32, 1},
493 { PJ_SOCK_STREAM, "tcp", 1, 32},
494 { PJ_SOCK_STREAM, "tcp", 1, 32},
495 { PJ_SOCK_STREAM, "tcp", 1, 32},
496 { PJ_SOCK_STREAM, "tcp", 1, 32},
497*/
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000498 };
499 pj_size_t best_bandwidth;
500 int best_index = 0;
501
502 PJ_LOG(3,(THIS_FILE, " Benchmarking %s ioqueue:", pj_ioqueue_name()));
Benny Prijono2cab3a52006-03-22 19:08:19 +0000503 PJ_LOG(3,(THIS_FILE, " ======================================="));
504 PJ_LOG(3,(THIS_FILE, " Type Threads Skt.Pairs Bandwidth"));
505 PJ_LOG(3,(THIS_FILE, " ======================================="));
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000506
507 best_bandwidth = 0;
508 for (i=0; i<sizeof(test_param)/sizeof(test_param[0]); ++i) {
509 pj_size_t bandwidth;
510
511 rc = perform_test(test_param[i].type,
512 test_param[i].type_name,
513 test_param[i].thread_cnt,
514 test_param[i].sockpair_cnt,
515 BUF_SIZE,
516 &bandwidth);
517 if (rc != 0)
518 return rc;
519
520 if (bandwidth > best_bandwidth)
521 best_bandwidth = bandwidth, best_index = i;
522
Benny Prijono2cab3a52006-03-22 19:08:19 +0000523 /* Give it a rest before next test, to allow system to close the
524 * sockets properly.
525 */
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000526 pj_thread_sleep(500);
527 }
528
529 PJ_LOG(3,(THIS_FILE,
530 " Best: Type=%s Threads=%d, Skt.Pairs=%d, Bandwidth=%u KB/s",
531 test_param[best_index].type_name,
532 test_param[best_index].thread_cnt,
533 test_param[best_index].sockpair_cnt,
534 best_bandwidth));
535 PJ_LOG(3,(THIS_FILE, " (Note: packet size=%d, total errors=%u)",
536 BUF_SIZE, last_error_counter));
537 return 0;
538}
539
540#else
/* Placeholder definition: keeps this translation unit from being empty
 * (which triggers a compiler warning) when the test is compiled out.
 */
int dummy_uiq_perf_test;
545#endif /* INCLUDE_IOQUEUE_PERF_TEST */
546
547