blob: 231facc6ddda673302e10f798edd5c92624a3092 [file] [log] [blame]
Benny Prijono5dcb38d2005-11-21 01:55:47 +00001/* $Id$ */
2/*
Benny Prijonoa771a512007-02-19 01:13:53 +00003 * Copyright (C)2003-2007 Benny Prijono <benny@prijono.org>
Benny Prijono5dcb38d2005-11-21 01:55:47 +00004 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 */
19#include <pjlib.h>
20#include "test.h"
21
22static pj_ioqueue_key_t *key;
23static pj_atomic_t *total_bytes;
24
25struct op_key
26{
27 pj_ioqueue_op_key_t op_key_;
28 struct op_key *peer;
29 char *buffer;
30 pj_size_t size;
31 int is_pending;
32 pj_status_t last_err;
33 pj_sockaddr_in addr;
34 int addrlen;
35};
36
37static void on_read_complete(pj_ioqueue_key_t *key,
38 pj_ioqueue_op_key_t *op_key,
39 pj_ssize_t bytes_received)
40{
41 pj_status_t rc;
42 struct op_key *recv_rec = (struct op_key *)op_key;
43
44 for (;;) {
45 struct op_key *send_rec = recv_rec->peer;
46 recv_rec->is_pending = 0;
47
48 if (bytes_received < 0) {
49 if (-bytes_received != recv_rec->last_err) {
50 recv_rec->last_err = -bytes_received;
51 app_perror("...error receiving data", -bytes_received);
52 }
53 } else if (bytes_received == 0) {
54 /* note: previous error, or write callback */
55 } else {
56 pj_atomic_add(total_bytes, bytes_received);
57
58 if (!send_rec->is_pending) {
59 pj_ssize_t sent = bytes_received;
60 pj_memcpy(send_rec->buffer, recv_rec->buffer, bytes_received);
61 pj_memcpy(&send_rec->addr, &recv_rec->addr, recv_rec->addrlen);
62 send_rec->addrlen = recv_rec->addrlen;
63 rc = pj_ioqueue_sendto(key, &send_rec->op_key_,
64 send_rec->buffer, &sent, 0,
65 &send_rec->addr, send_rec->addrlen);
66 send_rec->is_pending = (rc==PJ_EPENDING);
67
68 if (rc!=PJ_SUCCESS && rc!=PJ_EPENDING) {
69 app_perror("...send error(1)", rc);
70 }
71 }
72 }
73
74 if (!send_rec->is_pending) {
75 bytes_received = recv_rec->size;
76 rc = pj_ioqueue_recvfrom(key, &recv_rec->op_key_,
77 recv_rec->buffer, &bytes_received, 0,
78 &recv_rec->addr, &recv_rec->addrlen);
79 recv_rec->is_pending = (rc==PJ_EPENDING);
80 if (rc == PJ_SUCCESS) {
81 /* fall through next loop. */
82 } else if (rc == PJ_EPENDING) {
83 /* quit callback. */
84 break;
85 } else {
86 /* error */
87 app_perror("...recv error", rc);
88 recv_rec->last_err = rc;
89
90 bytes_received = 0;
91 /* fall through next loop. */
92 }
93 } else {
94 /* recv will be done when write completion callback is called. */
95 break;
96 }
97 }
98}
99
100static void on_write_complete(pj_ioqueue_key_t *key,
101 pj_ioqueue_op_key_t *op_key,
102 pj_ssize_t bytes_sent)
103{
104 struct op_key *send_rec = (struct op_key*)op_key;
105
106 if (bytes_sent <= 0) {
107 pj_status_t rc = -bytes_sent;
108 if (rc != send_rec->last_err) {
109 send_rec->last_err = rc;
110 app_perror("...send error(2)", rc);
111 }
112 }
113
114 send_rec->is_pending = 0;
115 on_read_complete(key, &send_rec->peer->op_key_, 0);
116}
117
118static int worker_thread(void *arg)
119{
Benny Prijonof260e462007-04-30 21:03:32 +0000120 pj_ioqueue_t *ioqueue = (pj_ioqueue_t*) arg;
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000121 struct op_key read_op, write_op;
122 char recv_buf[512], send_buf[512];
123 pj_ssize_t length;
124 pj_status_t rc;
125
126 read_op.peer = &write_op;
127 read_op.is_pending = 0;
128 read_op.last_err = 0;
129 read_op.buffer = recv_buf;
130 read_op.size = sizeof(recv_buf);
131 read_op.addrlen = sizeof(read_op.addr);
132
133 write_op.peer = &read_op;
134 write_op.is_pending = 0;
135 write_op.last_err = 0;
136 write_op.buffer = send_buf;
137 write_op.size = sizeof(send_buf);
138
139 length = sizeof(recv_buf);
140 rc = pj_ioqueue_recvfrom(key, &read_op.op_key_, recv_buf, &length, 0,
141 &read_op.addr, &read_op.addrlen);
142 if (rc == PJ_SUCCESS) {
143 read_op.is_pending = 1;
144 on_read_complete(key, &read_op.op_key_, length);
145 }
146
147 for (;;) {
148 pj_time_val timeout;
149 timeout.sec = 0; timeout.msec = 10;
150 rc = pj_ioqueue_poll(ioqueue, &timeout);
151 }
Benny Prijonod8410532006-06-15 11:04:33 +0000152 return 0;
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000153}
154
155int udp_echo_srv_ioqueue(void)
156{
157 pj_pool_t *pool;
158 pj_sock_t sock;
159 pj_ioqueue_t *ioqueue;
160 pj_ioqueue_callback callback;
161 int i;
162 pj_thread_t *thread[ECHO_SERVER_MAX_THREADS];
163 pj_status_t rc;
164
Benny Prijonoac623b32006-07-03 15:19:31 +0000165 pj_bzero(&callback, sizeof(callback));
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000166 callback.on_read_complete = &on_read_complete;
167 callback.on_write_complete = &on_write_complete;
168
169 pool = pj_pool_create(mem, NULL, 4000, 4000, NULL);
170 if (!pool)
171 return -10;
172
173 rc = pj_ioqueue_create(pool, 2, &ioqueue);
174 if (rc != PJ_SUCCESS) {
175 app_perror("...pj_ioqueue_create error", rc);
176 return -20;
177 }
178
Benny Prijono8ab968f2007-07-20 08:08:30 +0000179 rc = app_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0,
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000180 ECHO_SERVER_START_PORT, &sock);
181 if (rc != PJ_SUCCESS) {
182 app_perror("...app_socket error", rc);
183 return -30;
184 }
185
186 rc = pj_ioqueue_register_sock(pool, ioqueue, sock, NULL,
187 &callback, &key);
188 if (rc != PJ_SUCCESS) {
189 app_perror("...error registering socket", rc);
190 return -40;
191 }
192
193 rc = pj_atomic_create(pool, 0, &total_bytes);
194 if (rc != PJ_SUCCESS) {
195 app_perror("...error creating atomic variable", rc);
196 return -45;
197 }
198
199 for (i=0; i<ECHO_SERVER_MAX_THREADS; ++i) {
200 rc = pj_thread_create(pool, NULL, &worker_thread, ioqueue,
201 PJ_THREAD_DEFAULT_STACK_SIZE, 0,
202 &thread[i]);
203 if (rc != PJ_SUCCESS) {
204 app_perror("...create thread error", rc);
205 return -50;
206 }
207 }
208
209 echo_srv_common_loop(total_bytes);
210
211 return 0;
212}