blob: 4f7448b09ffd22a5357b1eb4051cb1702486bcee [file] [log] [blame]
Benny Prijono5dcb38d2005-11-21 01:55:47 +00001/* $Id$ */
2/*
Benny Prijonoa771a512007-02-19 01:13:53 +00003 * Copyright (C)2003-2007 Benny Prijono <benny@prijono.org>
Benny Prijono5dcb38d2005-11-21 01:55:47 +00004 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 */
19#include "test.h"
20#include <pjlib.h>
21
22#if INCLUDE_ECHO_CLIENT
23
24enum { BUF_SIZE = 512 };
25
26struct client
27{
28 int sock_type;
29 const char *server;
30 int port;
31};
32
33static pj_atomic_t *totalBytes;
34static pj_atomic_t *timeout_counter;
35static pj_atomic_t *invalid_counter;
36
37#define MSEC_PRINT_DURATION 1000
38
39static int wait_socket(pj_sock_t sock, unsigned msec_timeout)
40{
41 pj_fd_set_t fdset;
42 pj_time_val timeout;
43
44 timeout.sec = 0;
45 timeout.msec = msec_timeout;
46 pj_time_val_normalize(&timeout);
47
48 PJ_FD_ZERO(&fdset);
49 PJ_FD_SET(sock, &fdset);
50
51 return pj_sock_select(FD_SETSIZE, &fdset, NULL, NULL, &timeout);
52}
53
54static int echo_client_thread(void *arg)
55{
56 pj_sock_t sock;
57 char send_buf[BUF_SIZE];
58 char recv_buf[BUF_SIZE];
59 pj_sockaddr_in addr;
60 pj_str_t s;
61 pj_status_t rc;
62 pj_uint32_t buffer_id;
63 pj_uint32_t buffer_counter;
64 struct client *client = arg;
65 pj_status_t last_recv_err = PJ_SUCCESS, last_send_err = PJ_SUCCESS;
66 unsigned counter = 0;
67
Benny Prijono8ab968f2007-07-20 08:08:30 +000068 rc = app_socket(pj_AF_INET(), client->sock_type, 0, -1, &sock);
Benny Prijono5dcb38d2005-11-21 01:55:47 +000069 if (rc != PJ_SUCCESS) {
70 app_perror("...unable to create socket", rc);
71 return -10;
72 }
73
74 rc = pj_sockaddr_in_init( &addr, pj_cstr(&s, client->server),
75 (pj_uint16_t)client->port);
76 if (rc != PJ_SUCCESS) {
77 app_perror("...unable to resolve server", rc);
78 return -15;
79 }
80
81 rc = pj_sock_connect(sock, &addr, sizeof(addr));
82 if (rc != PJ_SUCCESS) {
83 app_perror("...connect() error", rc);
84 pj_sock_close(sock);
85 return -20;
86 }
87
88 PJ_LOG(3,("", "...socket connected to %s:%d",
89 pj_inet_ntoa(addr.sin_addr),
90 pj_ntohs(addr.sin_port)));
91
92 pj_memset(send_buf, 'A', BUF_SIZE);
93 send_buf[BUF_SIZE-1]='\0';
94
95 /* Give other thread chance to initialize themselves! */
96 pj_thread_sleep(200);
97
98 //PJ_LOG(3,("", "...thread %p running", pj_thread_this()));
99
100 buffer_id = (pj_uint32_t) pj_thread_this();
101 buffer_counter = 0;
102
103 *(pj_uint32_t*)send_buf = buffer_id;
104
105 for (;;) {
106 int rc;
107 pj_ssize_t bytes;
108
109 ++counter;
110
111 //while (wait_socket(sock,0) > 0)
112 // ;
113
114 /* Send a packet. */
115 bytes = BUF_SIZE;
116 *(pj_uint32_t*)(send_buf+4) = ++buffer_counter;
117 rc = pj_sock_send(sock, send_buf, &bytes, 0);
118 if (rc != PJ_SUCCESS || bytes != BUF_SIZE) {
119 if (rc != last_send_err) {
120 app_perror("...send() error", rc);
121 PJ_LOG(3,("", "...ignoring subsequent error.."));
122 last_send_err = rc;
123 pj_thread_sleep(100);
124 }
125 continue;
126 }
127
128 rc = wait_socket(sock, 500);
129 if (rc == 0) {
130 PJ_LOG(3,("", "...timeout"));
131 bytes = 0;
132 pj_atomic_inc(timeout_counter);
133 } else if (rc < 0) {
134 rc = pj_get_netos_error();
135 app_perror("...select() error", rc);
136 break;
137 } else {
138 /* Receive back the original packet. */
139 bytes = 0;
140 do {
141 pj_ssize_t received = BUF_SIZE - bytes;
142 rc = pj_sock_recv(sock, recv_buf+bytes, &received, 0);
143 if (rc != PJ_SUCCESS || received == 0) {
144 if (rc != last_recv_err) {
145 app_perror("...recv() error", rc);
146 PJ_LOG(3,("", "...ignoring subsequent error.."));
147 last_recv_err = rc;
148 pj_thread_sleep(100);
149 }
150 bytes = 0;
151 received = 0;
152 break;
153 }
154 bytes += received;
155 } while (bytes != BUF_SIZE && bytes != 0);
156 }
157
158 if (bytes == 0)
159 continue;
160
161 if (pj_memcmp(send_buf, recv_buf, BUF_SIZE) != 0) {
162 recv_buf[BUF_SIZE-1] = '\0';
163 PJ_LOG(3,("", "...error: buffer %u has changed!\n"
164 "send_buf=%s\n"
165 "recv_buf=%s\n",
166 counter, send_buf, recv_buf));
167 pj_atomic_inc(invalid_counter);
168 }
169
170 /* Accumulate total received. */
171 pj_atomic_add(totalBytes, bytes);
172 }
173
174 pj_sock_close(sock);
175 return 0;
176}
177
178int echo_client(int sock_type, const char *server, int port)
179{
180 pj_pool_t *pool;
181 pj_thread_t *thread[ECHO_CLIENT_MAX_THREADS];
182 pj_status_t rc;
183 struct client client;
184 int i;
185 pj_atomic_value_t last_received;
186 pj_timestamp last_report;
187
188 client.sock_type = sock_type;
189 client.server = server;
190 client.port = port;
191
192 pool = pj_pool_create( mem, NULL, 4000, 4000, NULL );
193
194 rc = pj_atomic_create(pool, 0, &totalBytes);
195 if (rc != PJ_SUCCESS) {
196 PJ_LOG(3,("", "...error: unable to create atomic variable", rc));
197 return -30;
198 }
199 rc = pj_atomic_create(pool, 0, &invalid_counter);
200 rc = pj_atomic_create(pool, 0, &timeout_counter);
201
202 PJ_LOG(3,("", "Echo client started"));
203 PJ_LOG(3,("", " Destination: %s:%d",
204 ECHO_SERVER_ADDRESS, ECHO_SERVER_START_PORT));
205 PJ_LOG(3,("", " Press Ctrl-C to exit"));
206
207 for (i=0; i<ECHO_CLIENT_MAX_THREADS; ++i) {
208 rc = pj_thread_create( pool, NULL, &echo_client_thread, &client,
209 PJ_THREAD_DEFAULT_STACK_SIZE, 0,
210 &thread[i]);
211 if (rc != PJ_SUCCESS) {
212 app_perror("...error: unable to create thread", rc);
213 return -10;
214 }
215 }
216
217 last_received = 0;
218 pj_get_timestamp(&last_report);
219
220 for (;;) {
221 pj_timestamp now;
222 unsigned long received, cur_received;
223 unsigned msec;
224 pj_highprec_t bw;
225 pj_time_val elapsed;
226 unsigned bw32;
227 pj_uint32_t timeout, invalid;
228
229 pj_thread_sleep(1000);
230
231 pj_get_timestamp(&now);
232 elapsed = pj_elapsed_time(&last_report, &now);
233 msec = PJ_TIME_VAL_MSEC(elapsed);
234
235 received = pj_atomic_get(totalBytes);
236 cur_received = received - last_received;
237
238 bw = cur_received;
239 pj_highprec_mul(bw, 1000);
240 pj_highprec_div(bw, msec);
241
242 bw32 = (unsigned)bw;
243
244 last_report = now;
245 last_received = received;
246
247 timeout = pj_atomic_get(timeout_counter);
248 invalid = pj_atomic_get(invalid_counter);
249
250 PJ_LOG(3,("",
251 "...%d threads, total bandwidth: %d KB/s, "
252 "timeout=%d, invalid=%d",
253 ECHO_CLIENT_MAX_THREADS, bw32/1000,
254 timeout, invalid));
255 }
256
257 for (i=0; i<ECHO_CLIENT_MAX_THREADS; ++i) {
258 pj_thread_join( thread[i] );
259 }
260
261 pj_pool_release(pool);
262 return 0;
263}
264
265
266#else
267int dummy_echo_client;
268#endif /* INCLUDE_ECHO_CLIENT */