blob: 8ede493be57f93a259119276bf9203b8079b027f [file] [log] [blame]
/* $Id$ */
/*
 * Copyright (C)2003-2006 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
19#include <pjlib.h>
20#include "test.h"
21
22static pj_ioqueue_key_t *key;
23static pj_atomic_t *total_bytes;
24
25struct op_key
26{
27 pj_ioqueue_op_key_t op_key_;
28 struct op_key *peer;
29 char *buffer;
30 pj_size_t size;
31 int is_pending;
32 pj_status_t last_err;
33 pj_sockaddr_in addr;
34 int addrlen;
35};
36
37static void on_read_complete(pj_ioqueue_key_t *key,
38 pj_ioqueue_op_key_t *op_key,
39 pj_ssize_t bytes_received)
40{
41 pj_status_t rc;
42 struct op_key *recv_rec = (struct op_key *)op_key;
43
44 for (;;) {
45 struct op_key *send_rec = recv_rec->peer;
46 recv_rec->is_pending = 0;
47
48 if (bytes_received < 0) {
49 if (-bytes_received != recv_rec->last_err) {
50 recv_rec->last_err = -bytes_received;
51 app_perror("...error receiving data", -bytes_received);
52 }
53 } else if (bytes_received == 0) {
54 /* note: previous error, or write callback */
55 } else {
56 pj_atomic_add(total_bytes, bytes_received);
57
58 if (!send_rec->is_pending) {
59 pj_ssize_t sent = bytes_received;
60 pj_memcpy(send_rec->buffer, recv_rec->buffer, bytes_received);
61 pj_memcpy(&send_rec->addr, &recv_rec->addr, recv_rec->addrlen);
62 send_rec->addrlen = recv_rec->addrlen;
63 rc = pj_ioqueue_sendto(key, &send_rec->op_key_,
64 send_rec->buffer, &sent, 0,
65 &send_rec->addr, send_rec->addrlen);
66 send_rec->is_pending = (rc==PJ_EPENDING);
67
68 if (rc!=PJ_SUCCESS && rc!=PJ_EPENDING) {
69 app_perror("...send error(1)", rc);
70 }
71 }
72 }
73
74 if (!send_rec->is_pending) {
75 bytes_received = recv_rec->size;
76 rc = pj_ioqueue_recvfrom(key, &recv_rec->op_key_,
77 recv_rec->buffer, &bytes_received, 0,
78 &recv_rec->addr, &recv_rec->addrlen);
79 recv_rec->is_pending = (rc==PJ_EPENDING);
80 if (rc == PJ_SUCCESS) {
81 /* fall through next loop. */
82 } else if (rc == PJ_EPENDING) {
83 /* quit callback. */
84 break;
85 } else {
86 /* error */
87 app_perror("...recv error", rc);
88 recv_rec->last_err = rc;
89
90 bytes_received = 0;
91 /* fall through next loop. */
92 }
93 } else {
94 /* recv will be done when write completion callback is called. */
95 break;
96 }
97 }
98}
99
100static void on_write_complete(pj_ioqueue_key_t *key,
101 pj_ioqueue_op_key_t *op_key,
102 pj_ssize_t bytes_sent)
103{
104 struct op_key *send_rec = (struct op_key*)op_key;
105
106 if (bytes_sent <= 0) {
107 pj_status_t rc = -bytes_sent;
108 if (rc != send_rec->last_err) {
109 send_rec->last_err = rc;
110 app_perror("...send error(2)", rc);
111 }
112 }
113
114 send_rec->is_pending = 0;
115 on_read_complete(key, &send_rec->peer->op_key_, 0);
116}
117
118static int worker_thread(void *arg)
119{
120 pj_ioqueue_t *ioqueue = arg;
121 struct op_key read_op, write_op;
122 char recv_buf[512], send_buf[512];
123 pj_ssize_t length;
124 pj_status_t rc;
125
126 read_op.peer = &write_op;
127 read_op.is_pending = 0;
128 read_op.last_err = 0;
129 read_op.buffer = recv_buf;
130 read_op.size = sizeof(recv_buf);
131 read_op.addrlen = sizeof(read_op.addr);
132
133 write_op.peer = &read_op;
134 write_op.is_pending = 0;
135 write_op.last_err = 0;
136 write_op.buffer = send_buf;
137 write_op.size = sizeof(send_buf);
138
139 length = sizeof(recv_buf);
140 rc = pj_ioqueue_recvfrom(key, &read_op.op_key_, recv_buf, &length, 0,
141 &read_op.addr, &read_op.addrlen);
142 if (rc == PJ_SUCCESS) {
143 read_op.is_pending = 1;
144 on_read_complete(key, &read_op.op_key_, length);
145 }
146
147 for (;;) {
148 pj_time_val timeout;
149 timeout.sec = 0; timeout.msec = 10;
150 rc = pj_ioqueue_poll(ioqueue, &timeout);
151 }
152}
153
154int udp_echo_srv_ioqueue(void)
155{
156 pj_pool_t *pool;
157 pj_sock_t sock;
158 pj_ioqueue_t *ioqueue;
159 pj_ioqueue_callback callback;
160 int i;
161 pj_thread_t *thread[ECHO_SERVER_MAX_THREADS];
162 pj_status_t rc;
163
164 pj_memset(&callback, 0, sizeof(callback));
165 callback.on_read_complete = &on_read_complete;
166 callback.on_write_complete = &on_write_complete;
167
168 pool = pj_pool_create(mem, NULL, 4000, 4000, NULL);
169 if (!pool)
170 return -10;
171
172 rc = pj_ioqueue_create(pool, 2, &ioqueue);
173 if (rc != PJ_SUCCESS) {
174 app_perror("...pj_ioqueue_create error", rc);
175 return -20;
176 }
177
178 rc = app_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0,
179 ECHO_SERVER_START_PORT, &sock);
180 if (rc != PJ_SUCCESS) {
181 app_perror("...app_socket error", rc);
182 return -30;
183 }
184
185 rc = pj_ioqueue_register_sock(pool, ioqueue, sock, NULL,
186 &callback, &key);
187 if (rc != PJ_SUCCESS) {
188 app_perror("...error registering socket", rc);
189 return -40;
190 }
191
192 rc = pj_atomic_create(pool, 0, &total_bytes);
193 if (rc != PJ_SUCCESS) {
194 app_perror("...error creating atomic variable", rc);
195 return -45;
196 }
197
198 for (i=0; i<ECHO_SERVER_MAX_THREADS; ++i) {
199 rc = pj_thread_create(pool, NULL, &worker_thread, ioqueue,
200 PJ_THREAD_DEFAULT_STACK_SIZE, 0,
201 &thread[i]);
202 if (rc != PJ_SUCCESS) {
203 app_perror("...create thread error", rc);
204 return -50;
205 }
206 }
207
208 echo_srv_common_loop(total_bytes);
209
210 return 0;
211}