blob: bd4f4bf313bf233af3b17482d067790e0abaf855 [file] [log] [blame]
Benny Prijonodd859a62005-11-01 16:42:51 +00001/* $Header: /pjproject-0.3/pjlib/src/pjlib-test/echo_srv.c 3 10/29/05 10:23p Bennylp $ */
2/*
3 * $Log: /pjproject-0.3/pjlib/src/pjlib-test/echo_srv.c $
4 *
5 * 3 10/29/05 10:23p Bennylp
6 * Changed ioqueue accept specification.
7 *
8 * 2 10/29/05 11:51a Bennylp
9 * Version 0.3-pre2.
10 *
11 * 1 10/24/05 11:28a Bennylp
12 * Created.
13 *
14 */
15#include "test.h"
16#include <pjlib.h>
17#include <pj/compat/high_precision.h>
18
19#if INCLUDE_ECHO_SERVER
20
21static pj_bool_t thread_quit_flag;
22
23struct server
24{
25 pj_pool_t *pool;
26 int sock_type;
27 int thread_count;
28 pj_ioqueue_t *ioqueue;
29 pj_sock_t sock;
30 pj_sock_t client_sock;
31 pj_ioqueue_key_t *key;
32 pj_ioqueue_callback cb;
33 char *buf;
34 pj_size_t bufsize;
35 pj_sockaddr_in addr;
36 int addrlen;
37 pj_size_t bytes_recv;
38 pj_timestamp start_time;
39};
40
41static void on_read_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_read)
42{
43 struct server *server = pj_ioqueue_get_user_data(key);
44 pj_status_t rc;
45
46 if (server->sock_type == PJ_SOCK_DGRAM) {
47 if (bytes_read > 0) {
48 /* Send data back to sender. */
49 rc = pj_ioqueue_sendto( server->ioqueue, server->key,
50 server->buf, bytes_read, 0,
51 &server->addr, server->addrlen);
52 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
53 app_perror("...sendto() error", rc);
54 }
55 } else {
56 PJ_LOG(3,("", "...read error (bytes_read=%d)", bytes_read));
57 }
58
59 /* Start next receive. */
60 rc = pj_ioqueue_recvfrom( server->ioqueue, server->key,
61 server->buf, server->bufsize, 0,
62 &server->addr, &server->addrlen);
63 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
64 app_perror("...recvfrom() error", rc);
65 }
66
67 }
68 else if (server->sock_type == PJ_SOCK_STREAM) {
69 if (bytes_read > 0) {
70 /* Send data back to sender. */
71 rc = pj_ioqueue_send( server->ioqueue, server->key,
72 server->buf, bytes_read, 0);
73 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
74 app_perror("...send() error", rc);
75 bytes_read = 0;
76 }
77 }
78
79 if (bytes_read <= 0) {
80 PJ_LOG(3,("", "...tcp closed"));
81 pj_ioqueue_unregister( server->ioqueue, server->key );
82 pj_sock_close( server->sock );
83 pj_pool_release( server->pool );
84 return;
85 }
86
87 /* Start next receive. */
88 rc = pj_ioqueue_recv( server->ioqueue, server->key,
89 server->buf, server->bufsize, 0);
90 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
91 app_perror("...recv() error", rc);
92 }
93 }
94
95 /* Add counter. */
96 if (bytes_read > 0) {
97 if (server->bytes_recv == 0) {
98 pj_get_timestamp(&server->start_time);
99 server->bytes_recv += bytes_read;
100 } else {
101 enum { USECS_IN_SECOND = 1000000 };
102 pj_timestamp now;
103 pj_uint32_t usec_elapsed;
104
105 server->bytes_recv += bytes_read;
106
107 pj_get_timestamp(&now);
108 usec_elapsed = pj_elapsed_usec(&server->start_time, &now);
109 if (usec_elapsed > USECS_IN_SECOND) {
110 if (usec_elapsed < 2 * USECS_IN_SECOND) {
111 pj_highprec_t bw;
112 pj_uint32_t bw32;
113 const char *type_name;
114
115 /* bandwidth(bw) = server->bytes_recv * USECS/elapsed */
116 bw = server->bytes_recv;
117 pj_highprec_mul(bw, USECS_IN_SECOND);
118 pj_highprec_div(bw, usec_elapsed);
119
120 bw32 = (pj_uint32_t) bw;
121
122 if (server->sock_type==PJ_SOCK_STREAM)
123 type_name = "tcp";
124 else if (server->sock_type==PJ_SOCK_DGRAM)
125 type_name = "udp";
126 else
127 type_name = "???";
128
129 PJ_LOG(3,("",
130 "...[%s:%d (%d threads)] Current bandwidth=%u KBps",
131 type_name,
132 ECHO_SERVER_START_PORT+server->thread_count,
133 server->thread_count,
134 bw32/1024));
135 }
136 server->start_time = now;
137 server->bytes_recv = 0;
138 }
139 }
140 }
141}
142
143static void on_accept_complete( pj_ioqueue_key_t *key, pj_sock_t sock,
144 int status)
145{
146 struct server *server_server = pj_ioqueue_get_user_data(key);
147 pj_status_t rc;
148
149 PJ_UNUSED_ARG(sock);
150
151 if (status == 0) {
152 pj_pool_t *pool;
153 struct server *new_server;
154
155 pool = pj_pool_create(mem, NULL, 4000, 4000, NULL);
156 new_server = pj_pool_zalloc(pool, sizeof(struct server));
157
158 new_server->pool = pool;
159 new_server->ioqueue = server_server->ioqueue;
160 new_server->sock_type = server_server->sock_type;
161 new_server->thread_count = server_server->thread_count;
162 new_server->sock = server_server->client_sock;
163 new_server->bufsize = 4096;
164 new_server->buf = pj_pool_alloc(pool, new_server->bufsize);
165 new_server->cb = server_server->cb;
166
167 rc = pj_ioqueue_register_sock( new_server->pool, new_server->ioqueue,
168 new_server->sock, new_server,
169 &server_server->cb, &new_server->key);
170 if (rc != PJ_SUCCESS) {
171 app_perror("...registering new tcp sock", rc);
172 pj_sock_close(new_server->sock);
173 pj_pool_release(pool);
174 thread_quit_flag = 1;
175 return;
176 }
177
178 rc = pj_ioqueue_recv( new_server->ioqueue, new_server->key,
179 new_server->buf, new_server->bufsize, 0);
180 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
181 app_perror("...recv() error", rc);
182 pj_sock_close(new_server->sock);
183 pj_pool_release(pool);
184 thread_quit_flag = 1;
185 return;
186 }
187 }
188
189 rc = pj_ioqueue_accept( server_server->ioqueue, server_server->key,
190 &server_server->client_sock,
191 NULL, NULL, NULL);
192 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
193 app_perror("...accept() error", rc);
194 thread_quit_flag = 1;
195 }
196}
197
198static int thread_proc(void *arg)
199{
200 pj_ioqueue_t *ioqueue = arg;
201
202 while (!thread_quit_flag) {
203 pj_time_val timeout;
204 int count;
205
206 timeout.sec = 0; timeout.msec = 50;
207 count = pj_ioqueue_poll( ioqueue, &timeout );
208 if (count > 0) {
209 count = 0;
210 }
211 }
212
213 return 0;
214}
215
216static int start_echo_server( int sock_type, int port, int thread_count )
217{
218 pj_pool_t *pool;
219 struct server *server;
220 int i;
221 pj_status_t rc;
222
223
224 pool = pj_pool_create(mem, NULL, 4000, 4000, NULL);
225 if (!pool)
226 return -10;
227
228 server = pj_pool_zalloc(pool, sizeof(struct server));
229
230 server->sock_type = sock_type;
231 server->thread_count = thread_count;
232 server->cb.on_read_complete = &on_read_complete;
233 server->cb.on_accept_complete = &on_accept_complete;
234
235 /* create ioqueue */
236 rc = pj_ioqueue_create( pool, 32, thread_count, &server->ioqueue);
237 if (rc != PJ_SUCCESS) {
238 app_perror("...error creating ioqueue", rc);
239 return -20;
240 }
241
242 /* create and register socket to ioqueue. */
243 rc = app_socket(PJ_AF_INET, sock_type, 0, port, &server->sock);
244 if (rc != PJ_SUCCESS) {
245 app_perror("...error initializing socket", rc);
246 return -30;
247 }
248
249 rc = pj_ioqueue_register_sock( pool, server->ioqueue,
250 server->sock,
251 server, &server->cb,
252 &server->key);
253 if (rc != PJ_SUCCESS) {
254 app_perror("...error registering socket to ioqueue", rc);
255 return -40;
256 }
257
258 /* create receive buffer. */
259 server->bufsize = 4096;
260 server->buf = pj_pool_alloc(pool, server->bufsize);
261
262 if (sock_type == PJ_SOCK_DGRAM) {
263 server->addrlen = sizeof(server->addr);
264 rc = pj_ioqueue_recvfrom( server->ioqueue, server->key,
265 server->buf, server->bufsize,
266 0,
267 &server->addr, &server->addrlen);
268 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
269 app_perror("...read error", rc);
270 return -50;
271 }
272 } else {
273 rc = pj_ioqueue_accept( server->ioqueue, server->key,
274 &server->client_sock, NULL, NULL, NULL );
275 if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
276 app_perror("...accept() error", rc);
277 return -60;
278 }
279 }
280
281 /* create threads. */
282
283 for (i=0; i<thread_count; ++i) {
284 pj_thread_t *thread;
285 rc = pj_thread_create(pool, NULL, &thread_proc, server->ioqueue,
286 PJ_THREAD_DEFAULT_STACK_SIZE, 0, &thread);
287 if (rc != PJ_SUCCESS) {
288 app_perror("...unable to create thread", rc);
289 return -70;
290 }
291 }
292
293 /* Done. */
294 return PJ_SUCCESS;
295}
296
297int echo_server(void)
298{
299 enum { MAX_THREADS = 4 };
300 int sock_types[2];
301 int i, j, rc;
302
303 sock_types[0] = PJ_SOCK_DGRAM;
304 sock_types[1] = PJ_SOCK_STREAM;
305
306 for (i=0; i<2; ++i) {
307 for (j=0; j<MAX_THREADS; ++j) {
308 rc = start_echo_server(sock_types[i], ECHO_SERVER_START_PORT+j, j+1);
309 if (rc != 0)
310 return rc;
311 }
312 }
313
314 pj_thread_sleep(100);
315 PJ_LOG(3,("", "Echo server started in port %d - %d",
316 ECHO_SERVER_START_PORT, ECHO_SERVER_START_PORT + MAX_THREADS));
317
318 PJ_LOG(3,("", "Press Ctrl-C to quit"));
319
320 for (;!thread_quit_flag;) {
321 pj_thread_sleep(1000);
322 }
323
324 return 0;
325}
326
327
328#else
/* Only defined when INCLUDE_ECHO_SERVER is off. Presumably a placeholder so
 * this translation unit is not empty -- TODO confirm. */
329int dummy_echo_server;
330#endif /* INCLUDE_ECHO_SERVER */
331