blob: 86a01c7e871e1d82490904fab93161aeffc0b433 [file] [log] [blame]
Tristan Matthews0a329cc2013-07-17 13:20:14 -04001/* $Id$ */
2/*
3 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 */
20#include "test.h"
21
22
23/**
24 * \page page_pjlib_ioqueue_udp_test Test: I/O Queue (UDP)
25 *
26 * This file provides implementation to test the
27 * functionality of the I/O queue when UDP socket is used.
28 *
29 *
30 * This file is <b>pjlib-test/ioq_udp.c</b>
31 *
32 * \include pjlib-test/ioq_udp.c
33 */
34
35
36#if INCLUDE_UDP_IOQUEUE_TEST
37
38#include <pjlib.h>
39
40#include <pj/compat/socket.h>
41
42#define THIS_FILE "test_udp"
43#define PORT 51233
44#define LOOP 2
45///#define LOOP 2
46#define BUF_MIN_SIZE 32
47#define BUF_MAX_SIZE 2048
48#define SOCK_INACTIVE_MIN (1)
49#define SOCK_INACTIVE_MAX (PJ_IOQUEUE_MAX_HANDLES - 2)
50#define POOL_SIZE (2*BUF_MAX_SIZE + SOCK_INACTIVE_MAX*128 + 2048)
51
52#undef TRACE_
53#define TRACE_(msg) PJ_LOG(3,(THIS_FILE,"....." msg))
54
55#if 0
56# define TRACE__(args) PJ_LOG(3,args)
57#else
58# define TRACE__(args)
59#endif
60
61
62static pj_ssize_t callback_read_size,
63 callback_write_size,
64 callback_accept_status,
65 callback_connect_status;
66static pj_ioqueue_key_t *callback_read_key,
67 *callback_write_key,
68 *callback_accept_key,
69 *callback_connect_key;
70static pj_ioqueue_op_key_t *callback_read_op,
71 *callback_write_op,
72 *callback_accept_op;
73
74static void on_ioqueue_read(pj_ioqueue_key_t *key,
75 pj_ioqueue_op_key_t *op_key,
76 pj_ssize_t bytes_read)
77{
78 callback_read_key = key;
79 callback_read_op = op_key;
80 callback_read_size = bytes_read;
81 TRACE__((THIS_FILE, " callback_read_key = %p, bytes=%d",
82 key, bytes_read));
83}
84
85static void on_ioqueue_write(pj_ioqueue_key_t *key,
86 pj_ioqueue_op_key_t *op_key,
87 pj_ssize_t bytes_written)
88{
89 callback_write_key = key;
90 callback_write_op = op_key;
91 callback_write_size = bytes_written;
92}
93
94static void on_ioqueue_accept(pj_ioqueue_key_t *key,
95 pj_ioqueue_op_key_t *op_key,
96 pj_sock_t sock, int status)
97{
98 PJ_UNUSED_ARG(sock);
99 callback_accept_key = key;
100 callback_accept_op = op_key;
101 callback_accept_status = status;
102}
103
104static void on_ioqueue_connect(pj_ioqueue_key_t *key, int status)
105{
106 callback_connect_key = key;
107 callback_connect_status = status;
108}
109
110static pj_ioqueue_callback test_cb =
111{
112 &on_ioqueue_read,
113 &on_ioqueue_write,
114 &on_ioqueue_accept,
115 &on_ioqueue_connect,
116};
117
118#if defined(PJ_WIN32) || defined(PJ_WIN64)
119# define S_ADDR S_un.S_addr
120#else
121# define S_ADDR s_addr
122#endif
123
124/*
125 * compliance_test()
126 * To test that the basic IOQueue functionality works. It will just exchange
127 * data between two sockets.
128 */
129static int compliance_test(pj_bool_t allow_concur)
130{
131 pj_sock_t ssock=-1, csock=-1;
132 pj_sockaddr_in addr, dst_addr;
133 int addrlen;
134 pj_pool_t *pool = NULL;
135 char *send_buf, *recv_buf;
136 pj_ioqueue_t *ioque = NULL;
137 pj_ioqueue_key_t *skey = NULL, *ckey = NULL;
138 pj_ioqueue_op_key_t read_op, write_op;
139 int bufsize = BUF_MIN_SIZE;
140 pj_ssize_t bytes;
141 int status = -1;
142 pj_str_t temp;
143 pj_bool_t send_pending, recv_pending;
144 pj_status_t rc;
145
146 pj_set_os_error(PJ_SUCCESS);
147
148 // Create pool.
149 pool = pj_pool_create(mem, NULL, POOL_SIZE, 4000, NULL);
150
151 // Allocate buffers for send and receive.
152 send_buf = (char*)pj_pool_alloc(pool, bufsize);
153 recv_buf = (char*)pj_pool_alloc(pool, bufsize);
154
155 // Allocate sockets for sending and receiving.
156 TRACE_("creating sockets...");
157 rc = pj_sock_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, &ssock);
158 if (rc==PJ_SUCCESS)
159 rc = pj_sock_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, &csock);
160 else
161 csock = PJ_INVALID_SOCKET;
162 if (rc != PJ_SUCCESS) {
163 app_perror("...ERROR in pj_sock_socket()", rc);
164 status=-1; goto on_error;
165 }
166
167 // Bind server socket.
168 TRACE_("bind socket...");
169 pj_bzero(&addr, sizeof(addr));
170 addr.sin_family = pj_AF_INET();
171 addr.sin_port = pj_htons(PORT);
172 if (pj_sock_bind(ssock, &addr, sizeof(addr))) {
173 status=-10; goto on_error;
174 }
175
176 // Create I/O Queue.
177 TRACE_("create ioqueue...");
178 rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque);
179 if (rc != PJ_SUCCESS) {
180 status=-20; goto on_error;
181 }
182
183 // Set concurrency
184 TRACE_("set concurrency...");
185 rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur);
186 if (rc != PJ_SUCCESS) {
187 status=-21; goto on_error;
188 }
189
190 // Register server and client socket.
191 // We put this after inactivity socket, hopefully this can represent the
192 // worst waiting time.
193 TRACE_("registering first sockets...");
194 rc = pj_ioqueue_register_sock(pool, ioque, ssock, NULL,
195 &test_cb, &skey);
196 if (rc != PJ_SUCCESS) {
197 app_perror("...error(10): ioqueue_register error", rc);
198 status=-25; goto on_error;
199 }
200 TRACE_("registering second sockets...");
201 rc = pj_ioqueue_register_sock( pool, ioque, csock, NULL,
202 &test_cb, &ckey);
203 if (rc != PJ_SUCCESS) {
204 app_perror("...error(11): ioqueue_register error", rc);
205 status=-26; goto on_error;
206 }
207
208 // Randomize send_buf.
209 pj_create_random_string(send_buf, bufsize);
210
211 // Register reading from ioqueue.
212 TRACE_("start recvfrom...");
213 pj_bzero(&addr, sizeof(addr));
214 addrlen = sizeof(addr);
215 bytes = bufsize;
216 rc = pj_ioqueue_recvfrom(skey, &read_op, recv_buf, &bytes, 0,
217 &addr, &addrlen);
218 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
219 app_perror("...error: pj_ioqueue_recvfrom", rc);
220 status=-28; goto on_error;
221 } else if (rc == PJ_EPENDING) {
222 recv_pending = 1;
223 PJ_LOG(3, (THIS_FILE,
224 "......ok: recvfrom returned pending"));
225 } else {
226 PJ_LOG(3, (THIS_FILE,
227 "......error: recvfrom returned immediate ok!"));
228 status=-29; goto on_error;
229 }
230
231 // Set destination address to send the packet.
232 TRACE_("set destination address...");
233 temp = pj_str("127.0.0.1");
234 if ((rc=pj_sockaddr_in_init(&dst_addr, &temp, PORT)) != 0) {
235 app_perror("...error: unable to resolve 127.0.0.1", rc);
236 status=-290; goto on_error;
237 }
238
239 // Write must return the number of bytes.
240 TRACE_("start sendto...");
241 bytes = bufsize;
242 rc = pj_ioqueue_sendto(ckey, &write_op, send_buf, &bytes, 0, &dst_addr,
243 sizeof(dst_addr));
244 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
245 app_perror("...error: pj_ioqueue_sendto", rc);
246 status=-30; goto on_error;
247 } else if (rc == PJ_EPENDING) {
248 send_pending = 1;
249 PJ_LOG(3, (THIS_FILE,
250 "......ok: sendto returned pending"));
251 } else {
252 send_pending = 0;
253 PJ_LOG(3, (THIS_FILE,
254 "......ok: sendto returned immediate success"));
255 }
256
257 // reset callback variables.
258 callback_read_size = callback_write_size = 0;
259 callback_accept_status = callback_connect_status = -2;
260 callback_read_key = callback_write_key =
261 callback_accept_key = callback_connect_key = NULL;
262 callback_read_op = callback_write_op = NULL;
263
264 // Poll if pending.
265 while (send_pending || recv_pending) {
266 int rc;
267 pj_time_val timeout = { 5, 0 };
268
269 TRACE_("poll...");
270#ifdef PJ_SYMBIAN
271 rc = pj_symbianos_poll(-1, PJ_TIME_VAL_MSEC(timeout));
272#else
273 rc = pj_ioqueue_poll(ioque, &timeout);
274#endif
275
276 if (rc == 0) {
277 PJ_LOG(1,(THIS_FILE, "...ERROR: timed out..."));
278 status=-45; goto on_error;
279 } else if (rc < 0) {
280 app_perror("...ERROR in ioqueue_poll()", -rc);
281 status=-50; goto on_error;
282 }
283
284 if (callback_read_key != NULL) {
285 if (callback_read_size != bufsize) {
286 status=-61; goto on_error;
287 }
288 if (callback_read_key != skey) {
289 status=-65; goto on_error;
290 }
291 if (callback_read_op != &read_op) {
292 status=-66; goto on_error;
293 }
294
295 if (pj_memcmp(send_buf, recv_buf, bufsize) != 0) {
296 status=-67; goto on_error;
297 }
298 if (addrlen != sizeof(pj_sockaddr_in)) {
299 status=-68; goto on_error;
300 }
301 if (addr.sin_family != pj_AF_INET()) {
302 status=-69; goto on_error;
303 }
304
305
306 recv_pending = 0;
307 }
308
309 if (callback_write_key != NULL) {
310 if (callback_write_size != bufsize) {
311 status=-73; goto on_error;
312 }
313 if (callback_write_key != ckey) {
314 status=-75; goto on_error;
315 }
316 if (callback_write_op != &write_op) {
317 status=-76; goto on_error;
318 }
319
320 send_pending = 0;
321 }
322 }
323
324 // Success
325 status = 0;
326
327on_error:
328 if (skey)
329 pj_ioqueue_unregister(skey);
330 else if (ssock != -1)
331 pj_sock_close(ssock);
332
333 if (ckey)
334 pj_ioqueue_unregister(ckey);
335 else if (csock != -1)
336 pj_sock_close(csock);
337
338 if (ioque != NULL)
339 pj_ioqueue_destroy(ioque);
340 pj_pool_release(pool);
341 return status;
342
343}
344
345
346static void on_read_complete(pj_ioqueue_key_t *key,
347 pj_ioqueue_op_key_t *op_key,
348 pj_ssize_t bytes_read)
349{
350 unsigned *p_packet_cnt = (unsigned*) pj_ioqueue_get_user_data(key);
351
352 PJ_UNUSED_ARG(op_key);
353 PJ_UNUSED_ARG(bytes_read);
354
355 (*p_packet_cnt)++;
356}
357
358/*
359 * unregister_test()
360 * Check if callback is still called after socket has been unregistered or
361 * closed.
362 */
363static int unregister_test(pj_bool_t allow_concur)
364{
365 enum { RPORT = 50000, SPORT = 50001 };
366 pj_pool_t *pool;
367 pj_ioqueue_t *ioqueue;
368 pj_sock_t ssock;
369 pj_sock_t rsock;
370 int addrlen;
371 pj_sockaddr_in addr;
372 pj_ioqueue_key_t *key;
373 pj_ioqueue_op_key_t opkey;
374 pj_ioqueue_callback cb;
375 unsigned packet_cnt;
376 char sendbuf[10], recvbuf[10];
377 pj_ssize_t bytes;
378 pj_time_val timeout;
379 pj_status_t status;
380
381 pool = pj_pool_create(mem, "test", 4000, 4000, NULL);
382 if (!pool) {
383 app_perror("Unable to create pool", PJ_ENOMEM);
384 return -100;
385 }
386
387 status = pj_ioqueue_create(pool, 16, &ioqueue);
388 if (status != PJ_SUCCESS) {
389 app_perror("Error creating ioqueue", status);
390 return -110;
391 }
392
393 // Set concurrency
394 TRACE_("set concurrency...");
395 status = pj_ioqueue_set_default_concurrency(ioqueue, allow_concur);
396 if (status != PJ_SUCCESS) {
397 return -112;
398 }
399
400 /* Create sender socket */
401 status = app_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, SPORT, &ssock);
402 if (status != PJ_SUCCESS) {
403 app_perror("Error initializing socket", status);
404 return -120;
405 }
406
407 /* Create receiver socket. */
408 status = app_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, RPORT, &rsock);
409 if (status != PJ_SUCCESS) {
410 app_perror("Error initializing socket", status);
411 return -130;
412 }
413
414 /* Register rsock to ioqueue. */
415 pj_bzero(&cb, sizeof(cb));
416 cb.on_read_complete = &on_read_complete;
417 packet_cnt = 0;
418 status = pj_ioqueue_register_sock(pool, ioqueue, rsock, &packet_cnt,
419 &cb, &key);
420 if (status != PJ_SUCCESS) {
421 app_perror("Error registering to ioqueue", status);
422 return -140;
423 }
424
425 /* Init operation key. */
426 pj_ioqueue_op_key_init(&opkey, sizeof(opkey));
427
428 /* Start reading. */
429 bytes = sizeof(recvbuf);
430 status = pj_ioqueue_recv( key, &opkey, recvbuf, &bytes, 0);
431 if (status != PJ_EPENDING) {
432 app_perror("Expecting PJ_EPENDING, but got this", status);
433 return -150;
434 }
435
436 /* Init destination address. */
437 addrlen = sizeof(addr);
438 status = pj_sock_getsockname(rsock, &addr, &addrlen);
439 if (status != PJ_SUCCESS) {
440 app_perror("getsockname error", status);
441 return -160;
442 }
443
444 /* Override address with 127.0.0.1, since getsockname will return
445 * zero in the address field.
446 */
447 addr.sin_addr = pj_inet_addr2("127.0.0.1");
448
449 /* Init buffer to send */
450 pj_ansi_strcpy(sendbuf, "Hello0123");
451
452 /* Send one packet. */
453 bytes = sizeof(sendbuf);
454 status = pj_sock_sendto(ssock, sendbuf, &bytes, 0,
455 &addr, sizeof(addr));
456
457 if (status != PJ_SUCCESS) {
458 app_perror("sendto error", status);
459 return -170;
460 }
461
462 /* Check if packet is received. */
463 timeout.sec = 1; timeout.msec = 0;
464#ifdef PJ_SYMBIAN
465 pj_symbianos_poll(-1, 1000);
466#else
467 pj_ioqueue_poll(ioqueue, &timeout);
468#endif
469
470 if (packet_cnt != 1) {
471 return -180;
472 }
473
474 /* Just to make sure things are settled.. */
475 pj_thread_sleep(100);
476
477 /* Start reading again. */
478 bytes = sizeof(recvbuf);
479 status = pj_ioqueue_recv( key, &opkey, recvbuf, &bytes, 0);
480 if (status != PJ_EPENDING) {
481 app_perror("Expecting PJ_EPENDING, but got this", status);
482 return -190;
483 }
484
485 /* Reset packet counter */
486 packet_cnt = 0;
487
488 /* Send one packet. */
489 bytes = sizeof(sendbuf);
490 status = pj_sock_sendto(ssock, sendbuf, &bytes, 0,
491 &addr, sizeof(addr));
492
493 if (status != PJ_SUCCESS) {
494 app_perror("sendto error", status);
495 return -200;
496 }
497
498 /* Now unregister and close socket. */
499 pj_ioqueue_unregister(key);
500
501 /* Poll ioqueue. */
502#ifdef PJ_SYMBIAN
503 pj_symbianos_poll(-1, 1000);
504#else
505 timeout.sec = 1; timeout.msec = 0;
506 pj_ioqueue_poll(ioqueue, &timeout);
507#endif
508
509 /* Must NOT receive any packets after socket is closed! */
510 if (packet_cnt > 0) {
511 PJ_LOG(3,(THIS_FILE, "....errror: not expecting to receive packet "
512 "after socket has been closed"));
513 return -210;
514 }
515
516 /* Success */
517 pj_sock_close(ssock);
518 pj_ioqueue_destroy(ioqueue);
519
520 pj_pool_release(pool);
521
522 return 0;
523}
524
525
526/*
527 * Testing with many handles.
528 * This will just test registering PJ_IOQUEUE_MAX_HANDLES count
529 * of sockets to the ioqueue.
530 */
531static int many_handles_test(pj_bool_t allow_concur)
532{
533 enum { MAX = PJ_IOQUEUE_MAX_HANDLES };
534 pj_pool_t *pool;
535 pj_ioqueue_t *ioqueue;
536 pj_sock_t *sock;
537 pj_ioqueue_key_t **key;
538 pj_status_t rc;
539 int count, i; /* must be signed */
540
541 PJ_LOG(3,(THIS_FILE,"...testing with so many handles"));
542
543 pool = pj_pool_create(mem, NULL, 4000, 4000, NULL);
544 if (!pool)
545 return PJ_ENOMEM;
546
547 key = (pj_ioqueue_key_t**)
548 pj_pool_alloc(pool, MAX*sizeof(pj_ioqueue_key_t*));
549 sock = (pj_sock_t*) pj_pool_alloc(pool, MAX*sizeof(pj_sock_t));
550
551 /* Create IOQueue */
552 rc = pj_ioqueue_create(pool, MAX, &ioqueue);
553 if (rc != PJ_SUCCESS || ioqueue == NULL) {
554 app_perror("...error in pj_ioqueue_create", rc);
555 return -10;
556 }
557
558 // Set concurrency
559 rc = pj_ioqueue_set_default_concurrency(ioqueue, allow_concur);
560 if (rc != PJ_SUCCESS) {
561 return -11;
562 }
563
564 /* Register as many sockets. */
565 for (count=0; count<MAX; ++count) {
566 sock[count] = PJ_INVALID_SOCKET;
567 rc = pj_sock_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, &sock[count]);
568 if (rc != PJ_SUCCESS || sock[count] == PJ_INVALID_SOCKET) {
569 PJ_LOG(3,(THIS_FILE, "....unable to create %d-th socket, rc=%d",
570 count, rc));
571 break;
572 }
573 key[count] = NULL;
574 rc = pj_ioqueue_register_sock(pool, ioqueue, sock[count],
575 NULL, &test_cb, &key[count]);
576 if (rc != PJ_SUCCESS || key[count] == NULL) {
577 PJ_LOG(3,(THIS_FILE, "....unable to register %d-th socket, rc=%d",
578 count, rc));
579 return -30;
580 }
581 }
582
583 /* Test complete. */
584
585 /* Now deregister and close all handles. */
586
587 /* NOTE for RTEMS:
588 * It seems that the order of close(sock) is pretty important here.
589 * If we close the sockets with the same order as when they were created,
590 * RTEMS doesn't seem to reuse the sockets, thus next socket created
591 * will have descriptor higher than the last socket created.
592 * If we close the sockets in the reverse order, then the descriptor will
593 * get reused.
594 * This used to cause problem with select ioqueue, since the ioqueue
595 * always gives FD_SETSIZE for the first select() argument. This ioqueue
596 * behavior can be changed with setting PJ_SELECT_NEEDS_NFDS macro.
597 */
598 for (i=count-1; i>=0; --i) {
599 ///for (i=0; i<count; ++i) {
600 rc = pj_ioqueue_unregister(key[i]);
601 if (rc != PJ_SUCCESS) {
602 app_perror("...error in pj_ioqueue_unregister", rc);
603 }
604 }
605
606 rc = pj_ioqueue_destroy(ioqueue);
607 if (rc != PJ_SUCCESS) {
608 app_perror("...error in pj_ioqueue_destroy", rc);
609 }
610
611 pj_pool_release(pool);
612
613 PJ_LOG(3,(THIS_FILE,"....many_handles_test() ok"));
614
615 return 0;
616}
617
618/*
619 * Multi-operation test.
620 */
621
622/*
623 * Benchmarking IOQueue
624 */
625static int bench_test(pj_bool_t allow_concur, int bufsize,
626 int inactive_sock_count)
627{
628 pj_sock_t ssock=-1, csock=-1;
629 pj_sockaddr_in addr;
630 pj_pool_t *pool = NULL;
631 pj_sock_t *inactive_sock=NULL;
632 pj_ioqueue_op_key_t *inactive_read_op;
633 char *send_buf, *recv_buf;
634 pj_ioqueue_t *ioque = NULL;
635 pj_ioqueue_key_t *skey, *ckey, *keys[SOCK_INACTIVE_MAX+2];
636 pj_timestamp t1, t2, t_elapsed;
637 int rc=0, i; /* i must be signed */
638 pj_str_t temp;
639 char errbuf[PJ_ERR_MSG_SIZE];
640
641 TRACE__((THIS_FILE, " bench test %d", inactive_sock_count));
642
643 // Create pool.
644 pool = pj_pool_create(mem, NULL, POOL_SIZE, 4000, NULL);
645
646 // Allocate buffers for send and receive.
647 send_buf = (char*)pj_pool_alloc(pool, bufsize);
648 recv_buf = (char*)pj_pool_alloc(pool, bufsize);
649
650 // Allocate sockets for sending and receiving.
651 rc = pj_sock_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, &ssock);
652 if (rc == PJ_SUCCESS) {
653 rc = pj_sock_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, &csock);
654 } else
655 csock = PJ_INVALID_SOCKET;
656 if (rc != PJ_SUCCESS) {
657 app_perror("...error: pj_sock_socket()", rc);
658 goto on_error;
659 }
660
661 // Bind server socket.
662 pj_bzero(&addr, sizeof(addr));
663 addr.sin_family = pj_AF_INET();
664 addr.sin_port = pj_htons(PORT);
665 if (pj_sock_bind(ssock, &addr, sizeof(addr)))
666 goto on_error;
667
668 pj_assert(inactive_sock_count+2 <= PJ_IOQUEUE_MAX_HANDLES);
669
670 // Create I/O Queue.
671 rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque);
672 if (rc != PJ_SUCCESS) {
673 app_perror("...error: pj_ioqueue_create()", rc);
674 goto on_error;
675 }
676
677 // Set concurrency
678 rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur);
679 if (rc != PJ_SUCCESS) {
680 app_perror("...error: pj_ioqueue_set_default_concurrency()", rc);
681 goto on_error;
682 }
683
684 // Allocate inactive sockets, and bind them to some arbitrary address.
685 // Then register them to the I/O queue, and start a read operation.
686 inactive_sock = (pj_sock_t*)pj_pool_alloc(pool,
687 inactive_sock_count*sizeof(pj_sock_t));
688 inactive_read_op = (pj_ioqueue_op_key_t*)pj_pool_alloc(pool,
689 inactive_sock_count*sizeof(pj_ioqueue_op_key_t));
690 pj_bzero(&addr, sizeof(addr));
691 addr.sin_family = pj_AF_INET();
692 for (i=0; i<inactive_sock_count; ++i) {
693 pj_ssize_t bytes;
694
695 rc = pj_sock_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, &inactive_sock[i]);
696 if (rc != PJ_SUCCESS || inactive_sock[i] < 0) {
697 app_perror("...error: pj_sock_socket()", rc);
698 goto on_error;
699 }
700 if ((rc=pj_sock_bind(inactive_sock[i], &addr, sizeof(addr))) != 0) {
701 pj_sock_close(inactive_sock[i]);
702 inactive_sock[i] = PJ_INVALID_SOCKET;
703 app_perror("...error: pj_sock_bind()", rc);
704 goto on_error;
705 }
706 rc = pj_ioqueue_register_sock(pool, ioque, inactive_sock[i],
707 NULL, &test_cb, &keys[i]);
708 if (rc != PJ_SUCCESS) {
709 pj_sock_close(inactive_sock[i]);
710 inactive_sock[i] = PJ_INVALID_SOCKET;
711 app_perror("...error(1): pj_ioqueue_register_sock()", rc);
712 PJ_LOG(3,(THIS_FILE, "....i=%d", i));
713 goto on_error;
714 }
715 bytes = bufsize;
716 rc = pj_ioqueue_recv(keys[i], &inactive_read_op[i], recv_buf, &bytes, 0);
717 if (rc != PJ_EPENDING) {
718 pj_sock_close(inactive_sock[i]);
719 inactive_sock[i] = PJ_INVALID_SOCKET;
720 app_perror("...error: pj_ioqueue_read()", rc);
721 goto on_error;
722 }
723 }
724
725 // Register server and client socket.
726 // We put this after inactivity socket, hopefully this can represent the
727 // worst waiting time.
728 rc = pj_ioqueue_register_sock(pool, ioque, ssock, NULL,
729 &test_cb, &skey);
730 if (rc != PJ_SUCCESS) {
731 app_perror("...error(2): pj_ioqueue_register_sock()", rc);
732 goto on_error;
733 }
734
735 rc = pj_ioqueue_register_sock(pool, ioque, csock, NULL,
736 &test_cb, &ckey);
737 if (rc != PJ_SUCCESS) {
738 app_perror("...error(3): pj_ioqueue_register_sock()", rc);
739 goto on_error;
740 }
741
742 // Set destination address to send the packet.
743 pj_sockaddr_in_init(&addr, pj_cstr(&temp, "127.0.0.1"), PORT);
744
745 // Test loop.
746 t_elapsed.u64 = 0;
747 for (i=0; i<LOOP; ++i) {
748 pj_ssize_t bytes;
749 pj_ioqueue_op_key_t read_op, write_op;
750
751 // Randomize send buffer.
752 pj_create_random_string(send_buf, bufsize);
753
754 // Start reading on the server side.
755 bytes = bufsize;
756 rc = pj_ioqueue_recv(skey, &read_op, recv_buf, &bytes, 0);
757 if (rc != PJ_EPENDING) {
758 app_perror("...error: pj_ioqueue_read()", rc);
759 break;
760 }
761
762 // Starts send on the client side.
763 bytes = bufsize;
764 rc = pj_ioqueue_sendto(ckey, &write_op, send_buf, &bytes, 0,
765 &addr, sizeof(addr));
766 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
767 app_perror("...error: pj_ioqueue_write()", rc);
768 break;
769 }
770 if (rc == PJ_SUCCESS) {
771 if (bytes < 0) {
772 app_perror("...error: pj_ioqueue_sendto()",(pj_status_t)-bytes);
773 break;
774 }
775 }
776
777 // Begin time.
778 pj_get_timestamp(&t1);
779
780 // Poll the queue until we've got completion event in the server side.
781 callback_read_key = NULL;
782 callback_read_size = 0;
783 TRACE__((THIS_FILE, " waiting for key = %p", skey));
784 do {
785 pj_time_val timeout = { 1, 0 };
786#ifdef PJ_SYMBIAN
787 rc = pj_symbianos_poll(-1, PJ_TIME_VAL_MSEC(timeout));
788#else
789 rc = pj_ioqueue_poll(ioque, &timeout);
790#endif
791 TRACE__((THIS_FILE, " poll rc=%d", rc));
792 } while (rc >= 0 && callback_read_key != skey);
793
794 // End time.
795 pj_get_timestamp(&t2);
796 t_elapsed.u64 += (t2.u64 - t1.u64);
797
798 if (rc < 0) {
799 app_perror(" error: pj_ioqueue_poll", -rc);
800 break;
801 }
802
803 // Compare recv buffer with send buffer.
804 if (callback_read_size != bufsize ||
805 pj_memcmp(send_buf, recv_buf, bufsize))
806 {
807 rc = -10;
808 PJ_LOG(3,(THIS_FILE, " error: size/buffer mismatch"));
809 break;
810 }
811
812 // Poll until all events are exhausted, before we start the next loop.
813 do {
814 pj_time_val timeout = { 0, 10 };
815#ifdef PJ_SYMBIAN
816 PJ_UNUSED_ARG(timeout);
817 rc = pj_symbianos_poll(-1, 100);
818#else
819 rc = pj_ioqueue_poll(ioque, &timeout);
820#endif
821 } while (rc>0);
822
823 rc = 0;
824 }
825
826 // Print results
827 if (rc == 0) {
828 pj_timestamp tzero;
829 pj_uint32_t usec_delay;
830
831 tzero.u32.hi = tzero.u32.lo = 0;
832 usec_delay = pj_elapsed_usec( &tzero, &t_elapsed);
833
834 PJ_LOG(3, (THIS_FILE, "...%10d %15d % 9d",
835 bufsize, inactive_sock_count, usec_delay));
836
837 } else {
838 PJ_LOG(2, (THIS_FILE, "...ERROR rc=%d (buf:%d, fds:%d)",
839 rc, bufsize, inactive_sock_count+2));
840 }
841
842 // Cleaning up.
843 for (i=inactive_sock_count-1; i>=0; --i) {
844 pj_ioqueue_unregister(keys[i]);
845 }
846
847 pj_ioqueue_unregister(skey);
848 pj_ioqueue_unregister(ckey);
849
850
851 pj_ioqueue_destroy(ioque);
852 pj_pool_release( pool);
853 return rc;
854
855on_error:
856 PJ_LOG(1,(THIS_FILE, "...ERROR: %s",
857 pj_strerror(pj_get_netos_error(), errbuf, sizeof(errbuf))));
858 if (ssock)
859 pj_sock_close(ssock);
860 if (csock)
861 pj_sock_close(csock);
862 for (i=0; i<inactive_sock_count && inactive_sock &&
863 inactive_sock[i]!=PJ_INVALID_SOCKET; ++i)
864 {
865 pj_sock_close(inactive_sock[i]);
866 }
867 if (ioque != NULL)
868 pj_ioqueue_destroy(ioque);
869 pj_pool_release( pool);
870 return -1;
871}
872
873static int udp_ioqueue_test_imp(pj_bool_t allow_concur)
874{
875 int status;
876 int bufsize, sock_count;
877
878 PJ_LOG(3,(THIS_FILE, "..testing with concurency=%d", allow_concur));
879
880 //goto pass1;
881
882 PJ_LOG(3, (THIS_FILE, "...compliance test (%s)", pj_ioqueue_name()));
883 if ((status=compliance_test(allow_concur)) != 0) {
884 return status;
885 }
886 PJ_LOG(3, (THIS_FILE, "....compliance test ok"));
887
888
889 PJ_LOG(3, (THIS_FILE, "...unregister test (%s)", pj_ioqueue_name()));
890 if ((status=unregister_test(allow_concur)) != 0) {
891 return status;
892 }
893 PJ_LOG(3, (THIS_FILE, "....unregister test ok"));
894
895 if ((status=many_handles_test(allow_concur)) != 0) {
896 return status;
897 }
898
899 //return 0;
900
901 PJ_LOG(4, (THIS_FILE, "...benchmarking different buffer size:"));
902 PJ_LOG(4, (THIS_FILE, "... note: buf=bytes sent, fds=# of fds, "
903 "elapsed=in timer ticks"));
904
905//pass1:
906 PJ_LOG(3, (THIS_FILE, "...Benchmarking poll times for %s:", pj_ioqueue_name()));
907 PJ_LOG(3, (THIS_FILE, "...====================================="));
908 PJ_LOG(3, (THIS_FILE, "...Buf.size #inactive-socks Time/poll"));
909 PJ_LOG(3, (THIS_FILE, "... (bytes) (nanosec)"));
910 PJ_LOG(3, (THIS_FILE, "...====================================="));
911
912 //goto pass2;
913
914 for (bufsize=BUF_MIN_SIZE; bufsize <= BUF_MAX_SIZE; bufsize *= 2) {
915 if ((status=bench_test(allow_concur, bufsize, SOCK_INACTIVE_MIN)) != 0)
916 return status;
917 }
918//pass2:
919 bufsize = 512;
920 for (sock_count=SOCK_INACTIVE_MIN+2;
921 sock_count<=SOCK_INACTIVE_MAX+2;
922 sock_count *= 2)
923 {
924 //PJ_LOG(3,(THIS_FILE, "...testing with %d fds", sock_count));
925 if ((status=bench_test(allow_concur, bufsize, sock_count-2)) != 0)
926 return status;
927 }
928 return 0;
929}
930
931int udp_ioqueue_test()
932{
933 int rc;
934
935 rc = udp_ioqueue_test_imp(PJ_TRUE);
936 if (rc != 0)
937 return rc;
938
939 rc = udp_ioqueue_test_imp(PJ_FALSE);
940 if (rc != 0)
941 return rc;
942
943 return 0;
944}
945
946#else
947/* To prevent warning about "translation unit is empty"
948 * when this test is disabled.
949 */
950int dummy_uiq_udp;
951#endif /* INCLUDE_UDP_IOQUEUE_TEST */
952
953