blob: 12c4e03568ff995326f27298a7024635895f7c00 [file] [log] [blame]
/* $Id$ */
/*
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
20#include "test.h"
21#include <pjlib.h>
22
23#if INCLUDE_ECHO_CLIENT
24
/* Size, in bytes, of every packet sent to and expected back from the server. */
enum {
    BUF_SIZE = 512
};
26
/* Connection parameters handed to each echo client thread. */
struct client
{
    int         sock_type;  /* Socket type passed to app_socket().          */
    const char *server;     /* Server host string given to pj_cstr().       */
    int         port;       /* Server port; narrowed to pj_uint16_t on use. */
};
33
34static pj_atomic_t *totalBytes;
35static pj_atomic_t *timeout_counter;
36static pj_atomic_t *invalid_counter;
37
38#define MSEC_PRINT_DURATION 1000
39
40static int wait_socket(pj_sock_t sock, unsigned msec_timeout)
41{
42 pj_fd_set_t fdset;
43 pj_time_val timeout;
44
45 timeout.sec = 0;
46 timeout.msec = msec_timeout;
47 pj_time_val_normalize(&timeout);
48
49 PJ_FD_ZERO(&fdset);
50 PJ_FD_SET(sock, &fdset);
51
52 return pj_sock_select(FD_SETSIZE, &fdset, NULL, NULL, &timeout);
53}
54
55static int echo_client_thread(void *arg)
56{
57 pj_sock_t sock;
58 char send_buf[BUF_SIZE];
59 char recv_buf[BUF_SIZE];
60 pj_sockaddr_in addr;
61 pj_str_t s;
62 pj_status_t rc;
63 pj_uint32_t buffer_id;
64 pj_uint32_t buffer_counter;
65 struct client *client = arg;
66 pj_status_t last_recv_err = PJ_SUCCESS, last_send_err = PJ_SUCCESS;
67 unsigned counter = 0;
68
Benny Prijono8ab968f2007-07-20 08:08:30 +000069 rc = app_socket(pj_AF_INET(), client->sock_type, 0, -1, &sock);
Benny Prijono5dcb38d2005-11-21 01:55:47 +000070 if (rc != PJ_SUCCESS) {
71 app_perror("...unable to create socket", rc);
72 return -10;
73 }
74
75 rc = pj_sockaddr_in_init( &addr, pj_cstr(&s, client->server),
76 (pj_uint16_t)client->port);
77 if (rc != PJ_SUCCESS) {
78 app_perror("...unable to resolve server", rc);
79 return -15;
80 }
81
82 rc = pj_sock_connect(sock, &addr, sizeof(addr));
83 if (rc != PJ_SUCCESS) {
84 app_perror("...connect() error", rc);
85 pj_sock_close(sock);
86 return -20;
87 }
88
89 PJ_LOG(3,("", "...socket connected to %s:%d",
90 pj_inet_ntoa(addr.sin_addr),
91 pj_ntohs(addr.sin_port)));
92
93 pj_memset(send_buf, 'A', BUF_SIZE);
94 send_buf[BUF_SIZE-1]='\0';
95
96 /* Give other thread chance to initialize themselves! */
97 pj_thread_sleep(200);
98
99 //PJ_LOG(3,("", "...thread %p running", pj_thread_this()));
100
101 buffer_id = (pj_uint32_t) pj_thread_this();
102 buffer_counter = 0;
103
104 *(pj_uint32_t*)send_buf = buffer_id;
105
106 for (;;) {
107 int rc;
108 pj_ssize_t bytes;
109
110 ++counter;
111
112 //while (wait_socket(sock,0) > 0)
113 // ;
114
115 /* Send a packet. */
116 bytes = BUF_SIZE;
117 *(pj_uint32_t*)(send_buf+4) = ++buffer_counter;
118 rc = pj_sock_send(sock, send_buf, &bytes, 0);
119 if (rc != PJ_SUCCESS || bytes != BUF_SIZE) {
120 if (rc != last_send_err) {
121 app_perror("...send() error", rc);
122 PJ_LOG(3,("", "...ignoring subsequent error.."));
123 last_send_err = rc;
124 pj_thread_sleep(100);
125 }
126 continue;
127 }
128
129 rc = wait_socket(sock, 500);
130 if (rc == 0) {
131 PJ_LOG(3,("", "...timeout"));
132 bytes = 0;
133 pj_atomic_inc(timeout_counter);
134 } else if (rc < 0) {
135 rc = pj_get_netos_error();
136 app_perror("...select() error", rc);
137 break;
138 } else {
139 /* Receive back the original packet. */
140 bytes = 0;
141 do {
142 pj_ssize_t received = BUF_SIZE - bytes;
143 rc = pj_sock_recv(sock, recv_buf+bytes, &received, 0);
144 if (rc != PJ_SUCCESS || received == 0) {
145 if (rc != last_recv_err) {
146 app_perror("...recv() error", rc);
147 PJ_LOG(3,("", "...ignoring subsequent error.."));
148 last_recv_err = rc;
149 pj_thread_sleep(100);
150 }
151 bytes = 0;
152 received = 0;
153 break;
154 }
155 bytes += received;
156 } while (bytes != BUF_SIZE && bytes != 0);
157 }
158
159 if (bytes == 0)
160 continue;
161
162 if (pj_memcmp(send_buf, recv_buf, BUF_SIZE) != 0) {
163 recv_buf[BUF_SIZE-1] = '\0';
164 PJ_LOG(3,("", "...error: buffer %u has changed!\n"
165 "send_buf=%s\n"
166 "recv_buf=%s\n",
167 counter, send_buf, recv_buf));
168 pj_atomic_inc(invalid_counter);
169 }
170
171 /* Accumulate total received. */
172 pj_atomic_add(totalBytes, bytes);
173 }
174
175 pj_sock_close(sock);
176 return 0;
177}
178
179int echo_client(int sock_type, const char *server, int port)
180{
181 pj_pool_t *pool;
182 pj_thread_t *thread[ECHO_CLIENT_MAX_THREADS];
183 pj_status_t rc;
184 struct client client;
185 int i;
186 pj_atomic_value_t last_received;
187 pj_timestamp last_report;
188
189 client.sock_type = sock_type;
190 client.server = server;
191 client.port = port;
192
193 pool = pj_pool_create( mem, NULL, 4000, 4000, NULL );
194
195 rc = pj_atomic_create(pool, 0, &totalBytes);
196 if (rc != PJ_SUCCESS) {
197 PJ_LOG(3,("", "...error: unable to create atomic variable", rc));
198 return -30;
199 }
200 rc = pj_atomic_create(pool, 0, &invalid_counter);
201 rc = pj_atomic_create(pool, 0, &timeout_counter);
202
203 PJ_LOG(3,("", "Echo client started"));
204 PJ_LOG(3,("", " Destination: %s:%d",
205 ECHO_SERVER_ADDRESS, ECHO_SERVER_START_PORT));
206 PJ_LOG(3,("", " Press Ctrl-C to exit"));
207
208 for (i=0; i<ECHO_CLIENT_MAX_THREADS; ++i) {
209 rc = pj_thread_create( pool, NULL, &echo_client_thread, &client,
210 PJ_THREAD_DEFAULT_STACK_SIZE, 0,
211 &thread[i]);
212 if (rc != PJ_SUCCESS) {
213 app_perror("...error: unable to create thread", rc);
214 return -10;
215 }
216 }
217
218 last_received = 0;
219 pj_get_timestamp(&last_report);
220
221 for (;;) {
222 pj_timestamp now;
223 unsigned long received, cur_received;
224 unsigned msec;
225 pj_highprec_t bw;
226 pj_time_val elapsed;
227 unsigned bw32;
228 pj_uint32_t timeout, invalid;
229
230 pj_thread_sleep(1000);
231
232 pj_get_timestamp(&now);
233 elapsed = pj_elapsed_time(&last_report, &now);
234 msec = PJ_TIME_VAL_MSEC(elapsed);
235
236 received = pj_atomic_get(totalBytes);
237 cur_received = received - last_received;
238
239 bw = cur_received;
240 pj_highprec_mul(bw, 1000);
241 pj_highprec_div(bw, msec);
242
243 bw32 = (unsigned)bw;
244
245 last_report = now;
246 last_received = received;
247
248 timeout = pj_atomic_get(timeout_counter);
249 invalid = pj_atomic_get(invalid_counter);
250
251 PJ_LOG(3,("",
252 "...%d threads, total bandwidth: %d KB/s, "
253 "timeout=%d, invalid=%d",
254 ECHO_CLIENT_MAX_THREADS, bw32/1000,
255 timeout, invalid));
256 }
257
258 for (i=0; i<ECHO_CLIENT_MAX_THREADS; ++i) {
259 pj_thread_join( thread[i] );
260 }
261
262 pj_pool_release(pool);
263 return 0;
264}
265
266
267#else
/* Placeholder definition used when INCLUDE_ECHO_CLIENT is disabled. */
int dummy_echo_client;
269#endif /* INCLUDE_ECHO_CLIENT */