blob: e10b5ffe9d098a2373b9f91cc559c99a2a5912f0 [file] [log] [blame]
/* $Id$
 */
3#include "test.h"
4#include <pjlib.h>
5#include <pj/compat/high_precision.h>
6
7#if INCLUDE_ECHO_SERVER
8
9static pj_bool_t thread_quit_flag;
10
11struct server
12{
13 pj_pool_t *pool;
14 int sock_type;
15 int thread_count;
16 pj_ioqueue_t *ioqueue;
17 pj_sock_t sock;
18 pj_sock_t client_sock;
19 pj_ioqueue_key_t *key;
20 pj_ioqueue_callback cb;
21 char *buf;
22 pj_size_t bufsize;
23 pj_sockaddr_in addr;
24 int addrlen;
25 pj_size_t bytes_recv;
26 pj_timestamp start_time;
27};
28
29static void on_read_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_read)
30{
31 struct server *server = pj_ioqueue_get_user_data(key);
32 pj_status_t rc;
33
34 if (server->sock_type == PJ_SOCK_DGRAM) {
35 if (bytes_read > 0) {
36 /* Send data back to sender. */
37 rc = pj_ioqueue_sendto( server->ioqueue, server->key,
38 server->buf, bytes_read, 0,
39 &server->addr, server->addrlen);
40 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
41 app_perror("...sendto() error", rc);
42 }
43 } else {
44 PJ_LOG(3,("", "...read error (bytes_read=%d)", bytes_read));
45 }
46
47 /* Start next receive. */
48 rc = pj_ioqueue_recvfrom( server->ioqueue, server->key,
49 server->buf, server->bufsize, 0,
50 &server->addr, &server->addrlen);
51 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
52 app_perror("...recvfrom() error", rc);
53 }
54
55 }
56 else if (server->sock_type == PJ_SOCK_STREAM) {
57 if (bytes_read > 0) {
58 /* Send data back to sender. */
59 rc = pj_ioqueue_send( server->ioqueue, server->key,
60 server->buf, bytes_read, 0);
61 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
62 app_perror("...send() error", rc);
63 bytes_read = 0;
64 }
65 }
66
67 if (bytes_read <= 0) {
68 PJ_LOG(3,("", "...tcp closed"));
69 pj_ioqueue_unregister( server->ioqueue, server->key );
70 pj_sock_close( server->sock );
71 pj_pool_release( server->pool );
72 return;
73 }
74
75 /* Start next receive. */
76 rc = pj_ioqueue_recv( server->ioqueue, server->key,
77 server->buf, server->bufsize, 0);
78 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
79 app_perror("...recv() error", rc);
80 }
81 }
82
83 /* Add counter. */
84 if (bytes_read > 0) {
85 if (server->bytes_recv == 0) {
86 pj_get_timestamp(&server->start_time);
87 server->bytes_recv += bytes_read;
88 } else {
89 enum { USECS_IN_SECOND = 1000000 };
90 pj_timestamp now;
91 pj_uint32_t usec_elapsed;
92
93 server->bytes_recv += bytes_read;
94
95 pj_get_timestamp(&now);
96 usec_elapsed = pj_elapsed_usec(&server->start_time, &now);
97 if (usec_elapsed > USECS_IN_SECOND) {
98 if (usec_elapsed < 2 * USECS_IN_SECOND) {
99 pj_highprec_t bw;
100 pj_uint32_t bw32;
101 const char *type_name;
102
103 /* bandwidth(bw) = server->bytes_recv * USECS/elapsed */
104 bw = server->bytes_recv;
105 pj_highprec_mul(bw, USECS_IN_SECOND);
106 pj_highprec_div(bw, usec_elapsed);
107
108 bw32 = (pj_uint32_t) bw;
109
110 if (server->sock_type==PJ_SOCK_STREAM)
111 type_name = "tcp";
112 else if (server->sock_type==PJ_SOCK_DGRAM)
113 type_name = "udp";
114 else
115 type_name = "???";
116
117 PJ_LOG(3,("",
118 "...[%s:%d (%d threads)] Current bandwidth=%u KBps",
119 type_name,
120 ECHO_SERVER_START_PORT+server->thread_count,
121 server->thread_count,
122 bw32/1024));
123 }
124 server->start_time = now;
125 server->bytes_recv = 0;
126 }
127 }
128 }
129}
130
131static void on_accept_complete( pj_ioqueue_key_t *key, pj_sock_t sock,
132 int status)
133{
134 struct server *server_server = pj_ioqueue_get_user_data(key);
135 pj_status_t rc;
136
137 PJ_UNUSED_ARG(sock);
138
139 if (status == 0) {
140 pj_pool_t *pool;
141 struct server *new_server;
142
143 pool = pj_pool_create(mem, NULL, 4000, 4000, NULL);
144 new_server = pj_pool_zalloc(pool, sizeof(struct server));
145
146 new_server->pool = pool;
147 new_server->ioqueue = server_server->ioqueue;
148 new_server->sock_type = server_server->sock_type;
149 new_server->thread_count = server_server->thread_count;
150 new_server->sock = server_server->client_sock;
151 new_server->bufsize = 4096;
152 new_server->buf = pj_pool_alloc(pool, new_server->bufsize);
153 new_server->cb = server_server->cb;
154
155 rc = pj_ioqueue_register_sock( new_server->pool, new_server->ioqueue,
156 new_server->sock, new_server,
157 &server_server->cb, &new_server->key);
158 if (rc != PJ_SUCCESS) {
159 app_perror("...registering new tcp sock", rc);
160 pj_sock_close(new_server->sock);
161 pj_pool_release(pool);
162 thread_quit_flag = 1;
163 return;
164 }
165
166 rc = pj_ioqueue_recv( new_server->ioqueue, new_server->key,
167 new_server->buf, new_server->bufsize, 0);
168 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
169 app_perror("...recv() error", rc);
170 pj_sock_close(new_server->sock);
171 pj_pool_release(pool);
172 thread_quit_flag = 1;
173 return;
174 }
175 }
176
177 rc = pj_ioqueue_accept( server_server->ioqueue, server_server->key,
178 &server_server->client_sock,
179 NULL, NULL, NULL);
180 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
181 app_perror("...accept() error", rc);
182 thread_quit_flag = 1;
183 }
184}
185
186static int thread_proc(void *arg)
187{
188 pj_ioqueue_t *ioqueue = arg;
189
190 while (!thread_quit_flag) {
191 pj_time_val timeout;
192 int count;
193
194 timeout.sec = 0; timeout.msec = 50;
195 count = pj_ioqueue_poll( ioqueue, &timeout );
196 if (count > 0) {
197 count = 0;
198 }
199 }
200
201 return 0;
202}
203
204static int start_echo_server( int sock_type, int port, int thread_count )
205{
206 pj_pool_t *pool;
207 struct server *server;
208 int i;
209 pj_status_t rc;
210
211
212 pool = pj_pool_create(mem, NULL, 4000, 4000, NULL);
213 if (!pool)
214 return -10;
215
216 server = pj_pool_zalloc(pool, sizeof(struct server));
217
218 server->sock_type = sock_type;
219 server->thread_count = thread_count;
220 server->cb.on_read_complete = &on_read_complete;
221 server->cb.on_accept_complete = &on_accept_complete;
222
223 /* create ioqueue */
224 rc = pj_ioqueue_create( pool, 32, thread_count, &server->ioqueue);
225 if (rc != PJ_SUCCESS) {
226 app_perror("...error creating ioqueue", rc);
227 return -20;
228 }
229
230 /* create and register socket to ioqueue. */
231 rc = app_socket(PJ_AF_INET, sock_type, 0, port, &server->sock);
232 if (rc != PJ_SUCCESS) {
233 app_perror("...error initializing socket", rc);
234 return -30;
235 }
236
237 rc = pj_ioqueue_register_sock( pool, server->ioqueue,
238 server->sock,
239 server, &server->cb,
240 &server->key);
241 if (rc != PJ_SUCCESS) {
242 app_perror("...error registering socket to ioqueue", rc);
243 return -40;
244 }
245
246 /* create receive buffer. */
247 server->bufsize = 4096;
248 server->buf = pj_pool_alloc(pool, server->bufsize);
249
250 if (sock_type == PJ_SOCK_DGRAM) {
251 server->addrlen = sizeof(server->addr);
252 rc = pj_ioqueue_recvfrom( server->ioqueue, server->key,
253 server->buf, server->bufsize,
254 0,
255 &server->addr, &server->addrlen);
256 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
257 app_perror("...read error", rc);
258 return -50;
259 }
260 } else {
261 rc = pj_ioqueue_accept( server->ioqueue, server->key,
262 &server->client_sock, NULL, NULL, NULL );
263 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
264 app_perror("...accept() error", rc);
265 return -60;
266 }
267 }
268
269 /* create threads. */
270
271 for (i=0; i<thread_count; ++i) {
272 pj_thread_t *thread;
273 rc = pj_thread_create(pool, NULL, &thread_proc, server->ioqueue,
274 PJ_THREAD_DEFAULT_STACK_SIZE, 0, &thread);
275 if (rc != PJ_SUCCESS) {
276 app_perror("...unable to create thread", rc);
277 return -70;
278 }
279 }
280
281 /* Done. */
282 return PJ_SUCCESS;
283}
284
285int echo_server(void)
286{
287 enum { MAX_THREADS = 4 };
288 int sock_types[2];
289 int i, j, rc;
290
291 sock_types[0] = PJ_SOCK_DGRAM;
292 sock_types[1] = PJ_SOCK_STREAM;
293
294 for (i=0; i<2; ++i) {
295 for (j=0; j<MAX_THREADS; ++j) {
296 rc = start_echo_server(sock_types[i], ECHO_SERVER_START_PORT+j, j+1);
297 if (rc != 0)
298 return rc;
299 }
300 }
301
302 pj_thread_sleep(100);
303 PJ_LOG(3,("", "Echo server started in port %d - %d",
304 ECHO_SERVER_START_PORT, ECHO_SERVER_START_PORT + MAX_THREADS));
305
306 PJ_LOG(3,("", "Press Ctrl-C to quit"));
307
308 for (;!thread_quit_flag;) {
309 pj_thread_sleep(1000);
310 }
311
312 return 0;
313}
314
315
316#else
/* Placeholder definition used when the echo server is compiled out. */
int dummy_echo_server;
318#endif /* INCLUDE_ECHO_SERVER */
319