blob: 3a8de8e1692a58ec501d70b4dbe7271e185ff343 [file] [log] [blame]
Benny Prijono5dcb38d2005-11-21 01:55:47 +00001/* $Id$ */
2/*
3 * Copyright (C)2003-2006 Benny Prijono <benny@prijono.org>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 */
19#include "test.h"
20#include <pjlib.h>
21#include <pj/compat/high_precision.h>
22
23/**
24 * \page page_pjlib_ioqueue_perf_test Test: I/O Queue Performance
25 *
 * Tests the performance of the I/O queue using a typical producer-consumer
 * setup, examining how the number of worker threads (and of socket pairs)
 * affects throughput.
29 *
30 * This file is <b>pjlib-test/ioq_perf.c</b>
31 *
32 * \include pjlib-test/ioq_perf.c
33 */
34
35#if INCLUDE_IOQUEUE_PERF_TEST
36
#ifdef _MSC_VER
   /* Allow non-constant aggregate initializers (MSVC warning C4204). */
#  pragma warning(disable: 4204)
#endif

/* Sender name used by every PJ_LOG() call in this file. */
#define THIS_FILE   "ioq_perf"

/* Verbose tracing hook. It expands to nothing, so TRACE_() calls vanish
 * at compile time; to enable tracing, define it as PJ_LOG(3,expr).
 */
#define TRACE_(expr)
44
45
46static pj_bool_t thread_quit_flag;
47static pj_status_t last_error;
48static unsigned last_error_counter;
49
50/* Descriptor for each producer/consumer pair. */
51typedef struct test_item
52{
53 pj_sock_t server_fd,
54 client_fd;
55 pj_ioqueue_t *ioqueue;
56 pj_ioqueue_key_t *server_key,
57 *client_key;
58 pj_ioqueue_op_key_t recv_op,
59 send_op;
60 int has_pending_send;
61 pj_size_t buffer_size;
62 char *outgoing_buffer;
63 char *incoming_buffer;
64 pj_size_t bytes_sent,
65 bytes_recv;
66} test_item;
67
68/* Callback when data has been read.
69 * Increment item->bytes_recv and ready to read the next data.
70 */
71static void on_read_complete(pj_ioqueue_key_t *key,
72 pj_ioqueue_op_key_t *op_key,
73 pj_ssize_t bytes_read)
74{
75 test_item *item = pj_ioqueue_get_user_data(key);
76 pj_status_t rc;
77 int data_is_available = 1;
78
79 //TRACE_((THIS_FILE, " read complete, bytes_read=%d", bytes_read));
80
81 do {
82 if (thread_quit_flag)
83 return;
84
85 if (bytes_read < 0) {
86 pj_status_t rc = -bytes_read;
87 char errmsg[128];
88
89 if (rc != last_error) {
90 //last_error = rc;
91 pj_strerror(rc, errmsg, sizeof(errmsg));
92 PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)",
93 bytes_read, errmsg));
94 PJ_LOG(3,(THIS_FILE,
95 ".....additional info: total read=%u, total sent=%u",
96 item->bytes_recv, item->bytes_sent));
97 } else {
98 last_error_counter++;
99 }
100 bytes_read = 0;
101
102 } else if (bytes_read == 0) {
103 PJ_LOG(3,(THIS_FILE, "...socket has closed!"));
104 }
105
106 item->bytes_recv += bytes_read;
107
108 /* To assure that the test quits, even if main thread
109 * doesn't have time to run.
110 */
111 if (item->bytes_recv > item->buffer_size * 10000)
112 thread_quit_flag = 1;
113
114 bytes_read = item->buffer_size;
115 rc = pj_ioqueue_recv( key, op_key,
116 item->incoming_buffer, &bytes_read, 0 );
117
118 if (rc == PJ_SUCCESS) {
119 data_is_available = 1;
120 } else if (rc == PJ_EPENDING) {
121 data_is_available = 0;
122 } else {
123 data_is_available = 0;
124 if (rc != last_error) {
125 last_error = rc;
126 app_perror("...error: read error(1)", rc);
127 } else {
128 last_error_counter++;
129 }
130 }
131
132 if (!item->has_pending_send) {
133 pj_ssize_t sent = item->buffer_size;
134 rc = pj_ioqueue_send(item->client_key, &item->send_op,
135 item->outgoing_buffer, &sent, 0);
136 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
137 app_perror("...error: write error", rc);
138 }
139
140 item->has_pending_send = (rc==PJ_EPENDING);
141 }
142
143 } while (data_is_available);
144}
145
146/* Callback when data has been written.
147 * Increment item->bytes_sent and write the next data.
148 */
149static void on_write_complete(pj_ioqueue_key_t *key,
150 pj_ioqueue_op_key_t *op_key,
151 pj_ssize_t bytes_sent)
152{
153 test_item *item = pj_ioqueue_get_user_data(key);
154
155 //TRACE_((THIS_FILE, " write complete: sent = %d", bytes_sent));
156
157 if (thread_quit_flag)
158 return;
159
160 item->has_pending_send = 0;
161 item->bytes_sent += bytes_sent;
162
163 if (bytes_sent <= 0) {
164 PJ_LOG(3,(THIS_FILE, "...error: sending stopped. bytes_sent=%d",
165 bytes_sent));
166 }
167 else {
168 pj_status_t rc;
169
170 bytes_sent = item->buffer_size;
171 rc = pj_ioqueue_send( item->client_key, op_key,
172 item->outgoing_buffer, &bytes_sent, 0);
173 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
174 app_perror("...error: write error", rc);
175 }
176
177 item->has_pending_send = (rc==PJ_EPENDING);
178 }
179}
180
181/* The worker thread. */
182static int worker_thread(void *arg)
183{
184 pj_ioqueue_t *ioqueue = arg;
185 const pj_time_val timeout = {0, 100};
186 int rc;
187
188 while (!thread_quit_flag) {
189 rc = pj_ioqueue_poll(ioqueue, &timeout);
190 //TRACE_((THIS_FILE, " thread: poll returned rc=%d", rc));
191 if (rc < 0) {
192 app_perror("...error in pj_ioqueue_poll()", pj_get_netos_error());
193 return -1;
194 }
195 }
196 return 0;
197}
198
199/* Calculate the bandwidth for the specific test configuration.
200 * The test is simple:
201 * - create sockpair_cnt number of producer-consumer socket pair.
202 * - create thread_cnt number of worker threads.
203 * - each producer will send buffer_size bytes data as fast and
204 * as soon as it can.
205 * - each consumer will read buffer_size bytes of data as fast
206 * as it could.
207 * - measure the total bytes received by all consumers during a
208 * period of time.
209 */
210static int perform_test(int sock_type, const char *type_name,
211 unsigned thread_cnt, unsigned sockpair_cnt,
212 pj_size_t buffer_size,
213 pj_size_t *p_bandwidth)
214{
215 enum { MSEC_DURATION = 5000 };
216 pj_pool_t *pool;
217 test_item *items;
218 pj_thread_t **thread;
219 pj_ioqueue_t *ioqueue;
220 pj_status_t rc;
221 pj_ioqueue_callback ioqueue_callback;
222 pj_uint32_t total_elapsed_usec, total_received;
223 pj_highprec_t bandwidth;
224 pj_timestamp start, stop;
225 unsigned i;
226
227 TRACE_((THIS_FILE, " starting test.."));
228
229 ioqueue_callback.on_read_complete = &on_read_complete;
230 ioqueue_callback.on_write_complete = &on_write_complete;
231
232 thread_quit_flag = 0;
233
234 pool = pj_pool_create(mem, NULL, 4096, 4096, NULL);
235 if (!pool)
236 return -10;
237
238 items = pj_pool_alloc(pool, sockpair_cnt*sizeof(test_item));
239 thread = pj_pool_alloc(pool, thread_cnt*sizeof(pj_thread_t*));
240
241 TRACE_((THIS_FILE, " creating ioqueue.."));
242 rc = pj_ioqueue_create(pool, sockpair_cnt*2, &ioqueue);
243 if (rc != PJ_SUCCESS) {
244 app_perror("...error: unable to create ioqueue", rc);
245 return -15;
246 }
247
248 /* Initialize each producer-consumer pair. */
249 for (i=0; i<sockpair_cnt; ++i) {
250 pj_ssize_t bytes;
251
252 items[i].ioqueue = ioqueue;
253 items[i].buffer_size = buffer_size;
254 items[i].outgoing_buffer = pj_pool_alloc(pool, buffer_size);
255 items[i].incoming_buffer = pj_pool_alloc(pool, buffer_size);
256 items[i].bytes_recv = items[i].bytes_sent = 0;
257
258 /* randomize outgoing buffer. */
259 pj_create_random_string(items[i].outgoing_buffer, buffer_size);
260
261 /* Create socket pair. */
262 TRACE_((THIS_FILE, " calling socketpair.."));
263 rc = app_socketpair(PJ_AF_INET, sock_type, 0,
264 &items[i].server_fd, &items[i].client_fd);
265 if (rc != PJ_SUCCESS) {
266 app_perror("...error: unable to create socket pair", rc);
267 return -20;
268 }
269
270 /* Register server socket to ioqueue. */
271 TRACE_((THIS_FILE, " register(1).."));
272 rc = pj_ioqueue_register_sock(pool, ioqueue,
273 items[i].server_fd,
274 &items[i], &ioqueue_callback,
275 &items[i].server_key);
276 if (rc != PJ_SUCCESS) {
277 app_perror("...error: registering server socket to ioqueue", rc);
278 return -60;
279 }
280
281 /* Register client socket to ioqueue. */
282 TRACE_((THIS_FILE, " register(2).."));
283 rc = pj_ioqueue_register_sock(pool, ioqueue,
284 items[i].client_fd,
285 &items[i], &ioqueue_callback,
286 &items[i].client_key);
287 if (rc != PJ_SUCCESS) {
288 app_perror("...error: registering server socket to ioqueue", rc);
289 return -70;
290 }
291
292 /* Start reading. */
293 TRACE_((THIS_FILE, " pj_ioqueue_recv.."));
294 bytes = items[i].buffer_size;
295 rc = pj_ioqueue_recv(items[i].server_key, &items[i].recv_op,
296 items[i].incoming_buffer, &bytes,
297 0);
298 if (rc != PJ_EPENDING) {
299 app_perror("...error: pj_ioqueue_recv", rc);
300 return -73;
301 }
302
303 /* Start writing. */
304 TRACE_((THIS_FILE, " pj_ioqueue_write.."));
305 bytes = items[i].buffer_size;
306 rc = pj_ioqueue_send(items[i].client_key, &items[i].recv_op,
307 items[i].outgoing_buffer, &bytes, 0);
308 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
309 app_perror("...error: pj_ioqueue_write", rc);
310 return -76;
311 }
312
313 items[i].has_pending_send = (rc==PJ_EPENDING);
314 }
315
316 /* Create the threads. */
317 for (i=0; i<thread_cnt; ++i) {
318 rc = pj_thread_create( pool, NULL,
319 &worker_thread,
320 ioqueue,
321 PJ_THREAD_DEFAULT_STACK_SIZE,
322 PJ_THREAD_SUSPENDED, &thread[i] );
323 if (rc != PJ_SUCCESS) {
324 app_perror("...error: unable to create thread", rc);
325 return -80;
326 }
327 }
328
329 /* Mark start time. */
330 rc = pj_get_timestamp(&start);
331 if (rc != PJ_SUCCESS)
332 return -90;
333
334 /* Start the thread. */
335 TRACE_((THIS_FILE, " resuming all threads.."));
336 for (i=0; i<thread_cnt; ++i) {
337 rc = pj_thread_resume(thread[i]);
338 if (rc != 0)
339 return -100;
340 }
341
342 /* Wait for MSEC_DURATION seconds.
343 * This should be as simple as pj_thread_sleep(MSEC_DURATION) actually,
344 * but unfortunately it doesn't work when system doesn't employ
345 * timeslicing for threads.
346 */
347 TRACE_((THIS_FILE, " wait for few seconds.."));
348 do {
349 pj_thread_sleep(1);
350
351 /* Mark end time. */
352 rc = pj_get_timestamp(&stop);
353
354 if (thread_quit_flag) {
355 TRACE_((THIS_FILE, " transfer limit reached.."));
356 break;
357 }
358
359 if (pj_elapsed_usec(&start,&stop)<MSEC_DURATION * 1000) {
360 TRACE_((THIS_FILE, " time limit reached.."));
361 break;
362 }
363
364 } while (1);
365
366 /* Terminate all threads. */
367 TRACE_((THIS_FILE, " terminating all threads.."));
368 thread_quit_flag = 1;
369
370 for (i=0; i<thread_cnt; ++i) {
371 TRACE_((THIS_FILE, " join thread %d..", i));
372 pj_thread_join(thread[i]);
373 pj_thread_destroy(thread[i]);
374 }
375
376 /* Close all sockets. */
377 TRACE_((THIS_FILE, " closing all sockets.."));
378 for (i=0; i<sockpair_cnt; ++i) {
379 pj_ioqueue_unregister(items[i].server_key);
380 pj_ioqueue_unregister(items[i].client_key);
381 pj_sock_close(items[i].server_fd);
382 pj_sock_close(items[i].client_fd);
383 }
384
385 /* Destroy ioqueue. */
386 TRACE_((THIS_FILE, " destroying ioqueue.."));
387 pj_ioqueue_destroy(ioqueue);
388
389 /* Calculate actual time in usec. */
390 total_elapsed_usec = pj_elapsed_usec(&start, &stop);
391
392 /* Calculate total bytes received. */
393 total_received = 0;
394 for (i=0; i<sockpair_cnt; ++i) {
395 total_received = items[i].bytes_recv;
396 }
397
398 /* bandwidth = total_received*1000/total_elapsed_usec */
399 bandwidth = total_received;
400 pj_highprec_mul(bandwidth, 1000);
401 pj_highprec_div(bandwidth, total_elapsed_usec);
402
403 *p_bandwidth = (pj_uint32_t)bandwidth;
404
405 PJ_LOG(3,(THIS_FILE, " %.4s %d %d %3d us %8d KB/s",
406 type_name, thread_cnt, sockpair_cnt,
407 -1 /*total_elapsed_usec/sockpair_cnt*/,
408 *p_bandwidth));
409
410 /* Done. */
411 pj_pool_release(pool);
412
413 TRACE_((THIS_FILE, " done.."));
414 return 0;
415}
416
417/*
418 * main test entry.
419 */
420int ioqueue_perf_test(void)
421{
422 enum { BUF_SIZE = 512 };
423 int i, rc;
424 struct {
425 int type;
426 const char *type_name;
427 int thread_cnt;
428 int sockpair_cnt;
429 } test_param[] =
430 {
431 { PJ_SOCK_DGRAM, "udp", 1, 1},
432 { PJ_SOCK_DGRAM, "udp", 1, 2},
433 { PJ_SOCK_DGRAM, "udp", 1, 4},
434 { PJ_SOCK_DGRAM, "udp", 1, 8},
435 { PJ_SOCK_DGRAM, "udp", 2, 1},
436 { PJ_SOCK_DGRAM, "udp", 2, 2},
437 { PJ_SOCK_DGRAM, "udp", 2, 4},
438 { PJ_SOCK_DGRAM, "udp", 2, 8},
439 { PJ_SOCK_DGRAM, "udp", 4, 1},
440 { PJ_SOCK_DGRAM, "udp", 4, 2},
441 { PJ_SOCK_DGRAM, "udp", 4, 4},
442 { PJ_SOCK_DGRAM, "udp", 4, 8},
443 { PJ_SOCK_STREAM, "tcp", 1, 1},
444 { PJ_SOCK_STREAM, "tcp", 1, 2},
445 { PJ_SOCK_STREAM, "tcp", 1, 4},
446 { PJ_SOCK_STREAM, "tcp", 1, 8},
447 { PJ_SOCK_STREAM, "tcp", 2, 1},
448 { PJ_SOCK_STREAM, "tcp", 2, 2},
449 { PJ_SOCK_STREAM, "tcp", 2, 4},
450 { PJ_SOCK_STREAM, "tcp", 2, 8},
451 { PJ_SOCK_STREAM, "tcp", 4, 1},
452 { PJ_SOCK_STREAM, "tcp", 4, 2},
453 { PJ_SOCK_STREAM, "tcp", 4, 4},
454 { PJ_SOCK_STREAM, "tcp", 4, 8},
455 };
456 pj_size_t best_bandwidth;
457 int best_index = 0;
458
459 PJ_LOG(3,(THIS_FILE, " Benchmarking %s ioqueue:", pj_ioqueue_name()));
460 PJ_LOG(3,(THIS_FILE, " ==============================================="));
461 PJ_LOG(3,(THIS_FILE, " Type Threads Skt.Pairs Avg.Time Bandwidth"));
462 PJ_LOG(3,(THIS_FILE, " ==============================================="));
463
464 best_bandwidth = 0;
465 for (i=0; i<sizeof(test_param)/sizeof(test_param[0]); ++i) {
466 pj_size_t bandwidth;
467
468 rc = perform_test(test_param[i].type,
469 test_param[i].type_name,
470 test_param[i].thread_cnt,
471 test_param[i].sockpair_cnt,
472 BUF_SIZE,
473 &bandwidth);
474 if (rc != 0)
475 return rc;
476
477 if (bandwidth > best_bandwidth)
478 best_bandwidth = bandwidth, best_index = i;
479
480 /* Give it a rest before next test. */
481 pj_thread_sleep(500);
482 }
483
484 PJ_LOG(3,(THIS_FILE,
485 " Best: Type=%s Threads=%d, Skt.Pairs=%d, Bandwidth=%u KB/s",
486 test_param[best_index].type_name,
487 test_param[best_index].thread_cnt,
488 test_param[best_index].sockpair_cnt,
489 best_bandwidth));
490 PJ_LOG(3,(THIS_FILE, " (Note: packet size=%d, total errors=%u)",
491 BUF_SIZE, last_error_counter));
492 return 0;
493}
494
495#else
/* Placeholder definition that keeps this translation unit non-empty when
 * the test is compiled out, avoiding a "translation unit is empty" warning.
 */
int dummy_uiq_perf_test;
499int dummy_uiq_perf_test;
500#endif /* INCLUDE_IOQUEUE_PERF_TEST */
501
502