blob: 328620db67f253a8383b5140d11e4bf97b5dc0f8 [file] [log] [blame]
Benny Prijono4766ffe2005-11-01 17:56:59 +00001/* $Id$
2 *
3
Benny Prijonodd859a62005-11-01 16:42:51 +00004 */
5/*
6 * $Log: /pjproject-0.3/pjlib/src/pjlib-test/ioq_udp.c $
7 *
8 * 4 10/29/05 10:23p Bennylp
9 * Fixed no-memory exception.
10 *
11 * 3 10/29/05 11:51a Bennylp
12 * Version 0.3-pre2.
13 *
14 * 2 10/14/05 12:26a Bennylp
15 * Finished error code framework, some fixes in ioqueue, etc. Pretty
16 * major.
17 *
18 */
19#include "test.h"
20
21
22/**
23 * \page page_pjlib_ioqueue_udp_test Test: I/O Queue (UDP)
24 *
25 * This file provides implementation to test the
26 * functionality of the I/O queue when UDP socket is used.
27 *
28 *
29 * This file is <b>pjlib-test/ioq_udp.c</b>
30 *
31 * \include pjlib-test/ioq_udp.c
32 */
33
34
35#if INCLUDE_UDP_IOQUEUE_TEST
36
37#include <pjlib.h>
38
39#include <pj/compat/socket.h>
40
41#define THIS_FILE "test_udp"
42#define PORT 51233
43#define LOOP 100
44#define BUF_MIN_SIZE 32
45#define BUF_MAX_SIZE 2048
46#define SOCK_INACTIVE_MIN (1)
47#define SOCK_INACTIVE_MAX (PJ_IOQUEUE_MAX_HANDLES - 2)
48#define POOL_SIZE (2*BUF_MAX_SIZE + SOCK_INACTIVE_MAX*128 + 2048)
49
50#undef TRACE_
51#define TRACE_(msg) PJ_LOG(3,(THIS_FILE,"....." msg))
52
53static pj_ssize_t callback_read_size,
54 callback_write_size,
55 callback_accept_status,
56 callback_connect_status;
57static pj_ioqueue_key_t *callback_read_key,
58 *callback_write_key,
59 *callback_accept_key,
60 *callback_connect_key;
61
62static void on_ioqueue_read(pj_ioqueue_key_t *key, pj_ssize_t bytes_read)
63{
64 callback_read_key = key;
65 callback_read_size = bytes_read;
66}
67
68static void on_ioqueue_write(pj_ioqueue_key_t *key, pj_ssize_t bytes_written)
69{
70 callback_write_key = key;
71 callback_write_size = bytes_written;
72}
73
74static void on_ioqueue_accept(pj_ioqueue_key_t *key, pj_sock_t sock, int status)
75{
76 PJ_UNUSED_ARG(sock);
77 callback_accept_key = key;
78 callback_accept_status = status;
79}
80
81static void on_ioqueue_connect(pj_ioqueue_key_t *key, int status)
82{
83 callback_connect_key = key;
84 callback_connect_status = status;
85}
86
87static pj_ioqueue_callback test_cb =
88{
89 &on_ioqueue_read,
90 &on_ioqueue_write,
91 &on_ioqueue_accept,
92 &on_ioqueue_connect,
93};
94
95#ifdef PJ_WIN32
96# define S_ADDR S_un.S_addr
97#else
98# define S_ADDR s_addr
99#endif
100
101/*
102 * native_format_test()
103 * This is just a simple test to verify that various structures in sock.h
104 * are really compatible with operating system's definitions.
105 */
106static int native_format_test(void)
107{
108 pj_status_t rc;
109
110 // Test that PJ_INVALID_SOCKET is working.
111 {
112 pj_sock_t sock;
113 rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, -1, &sock);
114 if (rc == PJ_SUCCESS)
115 return -1020;
116 }
117
118 // Previous func will set errno var.
119 pj_set_os_error(PJ_SUCCESS);
120
121 return 0;
122}
123
124/*
125 * compliance_test()
126 * To test that the basic IOQueue functionality works. It will just exchange
127 * data between two sockets.
128 */
129static int compliance_test(void)
130{
131 pj_sock_t ssock=-1, csock=-1;
132 pj_sockaddr_in addr;
133 int addrlen;
134 pj_pool_t *pool = NULL;
135 char *send_buf, *recv_buf;
136 pj_ioqueue_t *ioque = NULL;
137 pj_ioqueue_key_t *skey, *ckey;
138 int bufsize = BUF_MIN_SIZE;
139 pj_ssize_t bytes, status = -1;
140 pj_str_t temp;
141 pj_bool_t send_pending, recv_pending;
142 pj_status_t rc;
143
144 pj_set_os_error(PJ_SUCCESS);
145
146 // Create pool.
147 pool = pj_pool_create(mem, NULL, POOL_SIZE, 4000, NULL);
148
149 // Allocate buffers for send and receive.
150 send_buf = (char*)pj_pool_alloc(pool, bufsize);
151 recv_buf = (char*)pj_pool_alloc(pool, bufsize);
152
153 // Allocate sockets for sending and receiving.
154 TRACE_("creating sockets...");
155 rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &ssock);
156 if (rc==PJ_SUCCESS)
157 rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &csock);
158 else
159 csock = PJ_INVALID_SOCKET;
160 if (rc != PJ_SUCCESS) {
161 app_perror("...ERROR in pj_sock_socket()", rc);
162 status=-1; goto on_error;
163 }
164
165 // Bind server socket.
166 TRACE_("bind socket...");
167 memset(&addr, 0, sizeof(addr));
168 addr.sin_family = PJ_AF_INET;
169 addr.sin_port = pj_htons(PORT);
170 if (pj_sock_bind(ssock, &addr, sizeof(addr))) {
171 status=-10; goto on_error;
172 }
173
174 // Create I/O Queue.
175 TRACE_("create ioqueue...");
176 rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES,
177 PJ_IOQUEUE_DEFAULT_THREADS, &ioque);
178 if (rc != PJ_SUCCESS) {
179 status=-20; goto on_error;
180 }
181
182 // Register server and client socket.
183 // We put this after inactivity socket, hopefully this can represent the
184 // worst waiting time.
185 TRACE_("registering first sockets...");
186 rc = pj_ioqueue_register_sock(pool, ioque, ssock, NULL,
187 &test_cb, &skey);
188 if (rc != PJ_SUCCESS) {
189 app_perror("...error(10): ioqueue_register error", rc);
190 status=-25; goto on_error;
191 }
192 TRACE_("registering second sockets...");
193 rc = pj_ioqueue_register_sock( pool, ioque, csock, NULL,
194 &test_cb, &ckey);
195 if (rc != PJ_SUCCESS) {
196 app_perror("...error(11): ioqueue_register error", rc);
197 status=-26; goto on_error;
198 }
199
200 // Set destination address to send the packet.
201 TRACE_("set destination address...");
202 temp = pj_str("127.0.0.1");
203 if ((rc=pj_sockaddr_in_init(&addr, &temp, PORT)) != 0) {
204 app_perror("...error: unable to resolve 127.0.0.1", rc);
205 status=-26; goto on_error;
206 }
207
208 // Randomize send_buf.
209 pj_create_random_string(send_buf, bufsize);
210
211 // Register reading from ioqueue.
212 TRACE_("start recvfrom...");
213 addrlen = sizeof(addr);
214 bytes = pj_ioqueue_recvfrom(ioque, skey, recv_buf, bufsize, 0,
215 &addr, &addrlen);
216 if (bytes < 0 && bytes != PJ_EPENDING) {
217 status=-28; goto on_error;
218 } else if (bytes == PJ_EPENDING) {
219 recv_pending = 1;
220 PJ_LOG(3, (THIS_FILE,
221 "......ok: recvfrom returned pending"));
222 } else {
223 PJ_LOG(3, (THIS_FILE,
224 "......error: recvfrom returned immediate ok!"));
225 status=-29; goto on_error;
226 }
227
228 // Write must return the number of bytes.
229 TRACE_("start sendto...");
230 bytes = pj_ioqueue_sendto(ioque, ckey, send_buf, bufsize, 0, &addr,
231 sizeof(addr));
232 if (bytes != bufsize && bytes != PJ_EPENDING) {
233 PJ_LOG(1,(THIS_FILE,
234 "......error: sendto returned %d", bytes));
235 status=-30; goto on_error;
236 } else if (bytes == PJ_EPENDING) {
237 send_pending = 1;
238 PJ_LOG(3, (THIS_FILE,
239 "......ok: sendto returned pending"));
240 } else {
241 send_pending = 0;
242 PJ_LOG(3, (THIS_FILE,
243 "......ok: sendto returned immediate success"));
244 }
245
246 // reset callback variables.
247 callback_read_size = callback_write_size = 0;
248 callback_accept_status = callback_connect_status = -2;
249 callback_read_key = callback_write_key =
250 callback_accept_key = callback_connect_key = NULL;
251
252 // Poll if pending.
253 while (send_pending && recv_pending) {
254 int rc;
255 pj_time_val timeout = { 5, 0 };
256
257 TRACE_("poll...");
258 rc = pj_ioqueue_poll(ioque, &timeout);
259
260 if (rc == 0) {
261 PJ_LOG(1,(THIS_FILE, "...ERROR: timed out..."));
262 status=-45; goto on_error;
263 } else if (rc < 0) {
264 app_perror("...ERROR in ioqueue_poll()", rc);
265 status=-50; goto on_error;
266 }
267
268 if (callback_read_key != NULL) {
269 if (callback_read_size != bufsize) {
270 status=-61; goto on_error;
271 }
272
273 if (callback_read_key != skey) {
274 status=-65; goto on_error;
275 }
276
277 if (memcmp(send_buf, recv_buf, bufsize) != 0) {
278 status=-70; goto on_error;
279 }
280
281
282 recv_pending = 0;
283 }
284
285 if (callback_write_key != NULL) {
286 if (callback_write_size != bufsize) {
287 status=-73; goto on_error;
288 }
289
290 if (callback_write_key != ckey) {
291 status=-75; goto on_error;
292 }
293
294 send_pending = 0;
295 }
296 }
297
298 // Success
299 status = 0;
300
301on_error:
302 if (status != 0) {
303 char errbuf[128];
304 PJ_LOG(1, (THIS_FILE,
305 "...compliance test error: status=%d, os_err=%d (%s)",
306 status, pj_get_netos_error(),
307 pj_strerror(pj_get_netos_error(), errbuf, sizeof(errbuf))));
308 }
309 if (ssock)
310 pj_sock_close(ssock);
311 if (csock)
312 pj_sock_close(csock);
313 if (ioque != NULL)
314 pj_ioqueue_destroy(ioque);
315 pj_pool_release(pool);
316 return status;
317
318}
319
320/*
321 * Testing with many handles.
322 * This will just test registering PJ_IOQUEUE_MAX_HANDLES count
323 * of sockets to the ioqueue.
324 */
325static int many_handles_test(void)
326{
327 enum { MAX = PJ_IOQUEUE_MAX_HANDLES };
328 pj_pool_t *pool;
329 pj_ioqueue_t *ioqueue;
330 pj_sock_t *sock;
331 pj_ioqueue_key_t **key;
332 pj_status_t rc;
333 int count, i;
334
335 PJ_LOG(3,(THIS_FILE,"...testing with so many handles"));
336
337 pool = pj_pool_create(mem, NULL, 4000, 4000, NULL);
338 if (!pool)
339 return PJ_ENOMEM;
340
341 key = pj_pool_alloc(pool, MAX*sizeof(pj_ioqueue_key_t*));
342 sock = pj_pool_alloc(pool, MAX*sizeof(pj_sock_t));
343
344 /* Create IOQueue */
345 rc = pj_ioqueue_create(pool, MAX,
346 PJ_IOQUEUE_DEFAULT_THREADS,
347 &ioqueue);
348 if (rc != PJ_SUCCESS || ioqueue == NULL) {
349 app_perror("...error in pj_ioqueue_create", rc);
350 return -10;
351 }
352
353 /* Register as many sockets. */
354 for (count=0; count<MAX; ++count) {
355 sock[count] = PJ_INVALID_SOCKET;
356 rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &sock[count]);
357 if (rc != PJ_SUCCESS || sock[count] == PJ_INVALID_SOCKET) {
358 PJ_LOG(3,(THIS_FILE, "....unable to create %d-th socket, rc=%d",
359 count, rc));
360 break;
361 }
362 key[count] = NULL;
363 rc = pj_ioqueue_register_sock(pool, ioqueue, sock[count],
364 NULL, &test_cb, &key[count]);
365 if (rc != PJ_SUCCESS || key[count] == NULL) {
366 PJ_LOG(3,(THIS_FILE, "....unable to register %d-th socket, rc=%d",
367 count, rc));
368 return -30;
369 }
370 }
371
372 /* Test complete. */
373
374 /* Now deregister and close all handles. */
375
376 for (i=0; i<count; ++i) {
377 rc = pj_ioqueue_unregister(ioqueue, key[i]);
378 if (rc != PJ_SUCCESS) {
379 app_perror("...error in pj_ioqueue_unregister", rc);
380 }
381 rc = pj_sock_close(sock[i]);
382 if (rc != PJ_SUCCESS) {
383 app_perror("...error in pj_sock_close", rc);
384 }
385 }
386
387 rc = pj_ioqueue_destroy(ioqueue);
388 if (rc != PJ_SUCCESS) {
389 app_perror("...error in pj_ioqueue_destroy", rc);
390 }
391
392 pj_pool_release(pool);
393
394 PJ_LOG(3,(THIS_FILE,"....many_handles_test() ok"));
395
396 return 0;
397}
398
399/*
400 * Multi-operation test.
401 */
402
403/*
404 * Benchmarking IOQueue
405 */
406static int bench_test(int bufsize, int inactive_sock_count)
407{
408 pj_sock_t ssock=-1, csock=-1;
409 pj_sockaddr_in addr;
410 pj_pool_t *pool = NULL;
411 pj_sock_t *inactive_sock=NULL;
412 char *send_buf, *recv_buf;
413 pj_ioqueue_t *ioque = NULL;
414 pj_ioqueue_key_t *skey, *ckey, *key;
415 pj_timestamp t1, t2, t_elapsed;
416 int rc=0, i;
417 pj_str_t temp;
418 char errbuf[128];
419
420 // Create pool.
421 pool = pj_pool_create(mem, NULL, POOL_SIZE, 4000, NULL);
422
423 // Allocate buffers for send and receive.
424 send_buf = (char*)pj_pool_alloc(pool, bufsize);
425 recv_buf = (char*)pj_pool_alloc(pool, bufsize);
426
427 // Allocate sockets for sending and receiving.
428 rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &ssock);
429 if (rc == PJ_SUCCESS) {
430 rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &csock);
431 } else
432 csock = PJ_INVALID_SOCKET;
433 if (rc != PJ_SUCCESS) {
434 app_perror("...error: pj_sock_socket()", rc);
435 goto on_error;
436 }
437
438 // Bind server socket.
439 memset(&addr, 0, sizeof(addr));
440 addr.sin_family = PJ_AF_INET;
441 addr.sin_port = pj_htons(PORT);
442 if (pj_sock_bind(ssock, &addr, sizeof(addr)))
443 goto on_error;
444
445 pj_assert(inactive_sock_count+2 <= PJ_IOQUEUE_MAX_HANDLES);
446
447 // Create I/O Queue.
448 rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES,
449 PJ_IOQUEUE_DEFAULT_THREADS, &ioque);
450 if (rc != PJ_SUCCESS) {
451 app_perror("...error: pj_ioqueue_create()", rc);
452 goto on_error;
453 }
454
455 // Allocate inactive sockets, and bind them to some arbitrary address.
456 // Then register them to the I/O queue, and start a read operation.
457 inactive_sock = (pj_sock_t*)pj_pool_alloc(pool,
458 inactive_sock_count*sizeof(pj_sock_t));
459 memset(&addr, 0, sizeof(addr));
460 addr.sin_family = PJ_AF_INET;
461 for (i=0; i<inactive_sock_count; ++i) {
462 rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &inactive_sock[i]);
463 if (rc != PJ_SUCCESS || inactive_sock[i] < 0) {
464 app_perror("...error: pj_sock_socket()", rc);
465 goto on_error;
466 }
467 if ((rc=pj_sock_bind(inactive_sock[i], &addr, sizeof(addr))) != 0) {
468 pj_sock_close(inactive_sock[i]);
469 inactive_sock[i] = PJ_INVALID_SOCKET;
470 app_perror("...error: pj_sock_bind()", rc);
471 goto on_error;
472 }
473 rc = pj_ioqueue_register_sock(pool, ioque, inactive_sock[i],
474 NULL, &test_cb, &key);
475 if (rc != PJ_SUCCESS) {
476 pj_sock_close(inactive_sock[i]);
477 inactive_sock[i] = PJ_INVALID_SOCKET;
478 app_perror("...error(1): pj_ioqueue_register_sock()", rc);
479 PJ_LOG(3,(THIS_FILE, "....i=%d", i));
480 goto on_error;
481 }
482 rc = pj_ioqueue_read(ioque, key, recv_buf, bufsize);
483 if ( rc < 0 && rc != PJ_EPENDING) {
484 pj_sock_close(inactive_sock[i]);
485 inactive_sock[i] = PJ_INVALID_SOCKET;
486 app_perror("...error: pj_ioqueue_read()", rc);
487 goto on_error;
488 }
489 }
490
491 // Register server and client socket.
492 // We put this after inactivity socket, hopefully this can represent the
493 // worst waiting time.
494 rc = pj_ioqueue_register_sock(pool, ioque, ssock, NULL,
495 &test_cb, &skey);
496 if (rc != PJ_SUCCESS) {
497 app_perror("...error(2): pj_ioqueue_register_sock()", rc);
498 goto on_error;
499 }
500
501 rc = pj_ioqueue_register_sock(pool, ioque, csock, NULL,
502 &test_cb, &ckey);
503 if (rc != PJ_SUCCESS) {
504 app_perror("...error(3): pj_ioqueue_register_sock()", rc);
505 goto on_error;
506 }
507
508 // Set destination address to send the packet.
509 pj_sockaddr_in_init(&addr, pj_cstr(&temp, "127.0.0.1"), PORT);
510
511 // Test loop.
512 t_elapsed.u64 = 0;
513 for (i=0; i<LOOP; ++i) {
514 pj_ssize_t bytes;
515
516 // Randomize send buffer.
517 pj_create_random_string(send_buf, bufsize);
518
519 // Start reading on the server side.
520 rc = pj_ioqueue_read(ioque, skey, recv_buf, bufsize);
521 if (rc < 0 && rc != PJ_EPENDING) {
522 app_perror("...error: pj_ioqueue_read()", rc);
523 break;
524 }
525
526 // Starts send on the client side.
527 bytes = pj_ioqueue_sendto(ioque, ckey, send_buf, bufsize, 0,
528 &addr, sizeof(addr));
529 if (bytes != bufsize && bytes != PJ_EPENDING) {
530 app_perror("...error: pj_ioqueue_write()", bytes);
531 rc = -1;
532 break;
533 }
534
535 // Begin time.
536 pj_get_timestamp(&t1);
537
538 // Poll the queue until we've got completion event in the server side.
539 callback_read_key = NULL;
540 callback_read_size = 0;
541 do {
542 rc = pj_ioqueue_poll(ioque, NULL);
543 } while (rc >= 0 && callback_read_key != skey);
544
545 // End time.
546 pj_get_timestamp(&t2);
547 t_elapsed.u64 += (t2.u64 - t1.u64);
548
549 if (rc < 0)
550 break;
551
552 // Compare recv buffer with send buffer.
553 if (callback_read_size != bufsize ||
554 memcmp(send_buf, recv_buf, bufsize))
555 {
556 rc = -1;
557 break;
558 }
559
560 // Poll until all events are exhausted, before we start the next loop.
561 do {
562 pj_time_val timeout = { 0, 10 };
563 rc = pj_ioqueue_poll(ioque, &timeout);
564 } while (rc>0);
565
566 rc = 0;
567 }
568
569 // Print results
570 if (rc == 0) {
571 pj_timestamp tzero;
572 pj_uint32_t usec_delay;
573
574 tzero.u32.hi = tzero.u32.lo = 0;
575 usec_delay = pj_elapsed_usec( &tzero, &t_elapsed);
576
577 PJ_LOG(3, (THIS_FILE, "...%10d %15d % 9d",
578 bufsize, inactive_sock_count, usec_delay));
579
580 } else {
581 PJ_LOG(2, (THIS_FILE, "...ERROR (buf:%d, fds:%d)",
582 bufsize, inactive_sock_count+2));
583 }
584
585 // Cleaning up.
586 for (i=0; i<inactive_sock_count; ++i)
587 pj_sock_close(inactive_sock[i]);
588 pj_sock_close(ssock);
589 pj_sock_close(csock);
590
591 pj_ioqueue_destroy(ioque);
592 pj_pool_release( pool);
593 return 0;
594
595on_error:
596 PJ_LOG(1,(THIS_FILE, "...ERROR: %s",
597 pj_strerror(pj_get_netos_error(), errbuf, sizeof(errbuf))));
598 if (ssock)
599 pj_sock_close(ssock);
600 if (csock)
601 pj_sock_close(csock);
602 for (i=0; i<inactive_sock_count && inactive_sock &&
603 inactive_sock[i]!=PJ_INVALID_SOCKET; ++i)
604 {
605 pj_sock_close(inactive_sock[i]);
606 }
607 if (ioque != NULL)
608 pj_ioqueue_destroy(ioque);
609 pj_pool_release( pool);
610 return -1;
611}
612
613int udp_ioqueue_test()
614{
615 int status;
616 int bufsize, sock_count;
617
618 PJ_LOG(3, (THIS_FILE, "...format test"));
619 if ((status = native_format_test()) != 0)
620 return status;
621 PJ_LOG(3, (THIS_FILE, "....native format test ok"));
622
623 PJ_LOG(3, (THIS_FILE, "...compliance test"));
624 if ((status=compliance_test()) != 0) {
625 return status;
626 }
627 PJ_LOG(3, (THIS_FILE, "....compliance test ok"));
628
629 if ((status=many_handles_test()) != 0) {
630 return status;
631 }
632
633 PJ_LOG(4, (THIS_FILE, "...benchmarking different buffer size:"));
634 PJ_LOG(4, (THIS_FILE, "... note: buf=bytes sent, fds=# of fds, "
635 "elapsed=in timer ticks"));
636
637 PJ_LOG(3, (THIS_FILE, "...Benchmarking poll times:"));
638 PJ_LOG(3, (THIS_FILE, "...====================================="));
639 PJ_LOG(3, (THIS_FILE, "...Buf.size #inactive-socks Time/poll"));
640 PJ_LOG(3, (THIS_FILE, "... (bytes) (nanosec)"));
641 PJ_LOG(3, (THIS_FILE, "...====================================="));
642
643 for (bufsize=BUF_MIN_SIZE; bufsize <= BUF_MAX_SIZE; bufsize *= 2) {
644 if (bench_test(bufsize, SOCK_INACTIVE_MIN))
645 return -1;
646 }
647 bufsize = 512;
648 for (sock_count=SOCK_INACTIVE_MIN+2;
649 sock_count<=SOCK_INACTIVE_MAX+2;
650 sock_count *= 2)
651 {
652 //PJ_LOG(3,(THIS_FILE, "...testing with %d fds", sock_count));
653 if (bench_test(bufsize, sock_count-2))
654 return -1;
655 }
656 return 0;
657}
658
659#else
660/* To prevent warning about "translation unit is empty"
661 * when this test is disabled.
662 */
663int dummy_uiq_udp;
664#endif /* INCLUDE_UDP_IOQUEUE_TEST */
665
666