/* $Id$ */
/*
 * Copyright (C) 2008-2009 Teluu Inc. (http://www.teluu.com)
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
20#include <pjlib.h>
21#include "test.h"
22
23static pj_ioqueue_key_t *key;
24static pj_atomic_t *total_bytes;
25
26struct op_key
27{
28 pj_ioqueue_op_key_t op_key_;
29 struct op_key *peer;
30 char *buffer;
31 pj_size_t size;
32 int is_pending;
33 pj_status_t last_err;
34 pj_sockaddr_in addr;
35 int addrlen;
36};
37
38static void on_read_complete(pj_ioqueue_key_t *key,
39 pj_ioqueue_op_key_t *op_key,
40 pj_ssize_t bytes_received)
41{
42 pj_status_t rc;
43 struct op_key *recv_rec = (struct op_key *)op_key;
44
45 for (;;) {
46 struct op_key *send_rec = recv_rec->peer;
47 recv_rec->is_pending = 0;
48
49 if (bytes_received < 0) {
50 if (-bytes_received != recv_rec->last_err) {
51 recv_rec->last_err = -bytes_received;
52 app_perror("...error receiving data", -bytes_received);
53 }
54 } else if (bytes_received == 0) {
55 /* note: previous error, or write callback */
56 } else {
57 pj_atomic_add(total_bytes, bytes_received);
58
59 if (!send_rec->is_pending) {
60 pj_ssize_t sent = bytes_received;
61 pj_memcpy(send_rec->buffer, recv_rec->buffer, bytes_received);
62 pj_memcpy(&send_rec->addr, &recv_rec->addr, recv_rec->addrlen);
63 send_rec->addrlen = recv_rec->addrlen;
64 rc = pj_ioqueue_sendto(key, &send_rec->op_key_,
65 send_rec->buffer, &sent, 0,
66 &send_rec->addr, send_rec->addrlen);
67 send_rec->is_pending = (rc==PJ_EPENDING);
68
69 if (rc!=PJ_SUCCESS && rc!=PJ_EPENDING) {
70 app_perror("...send error(1)", rc);
71 }
72 }
73 }
74
75 if (!send_rec->is_pending) {
76 bytes_received = recv_rec->size;
77 rc = pj_ioqueue_recvfrom(key, &recv_rec->op_key_,
78 recv_rec->buffer, &bytes_received, 0,
79 &recv_rec->addr, &recv_rec->addrlen);
80 recv_rec->is_pending = (rc==PJ_EPENDING);
81 if (rc == PJ_SUCCESS) {
82 /* fall through next loop. */
83 } else if (rc == PJ_EPENDING) {
84 /* quit callback. */
85 break;
86 } else {
87 /* error */
88 app_perror("...recv error", rc);
89 recv_rec->last_err = rc;
90
91 bytes_received = 0;
92 /* fall through next loop. */
93 }
94 } else {
95 /* recv will be done when write completion callback is called. */
96 break;
97 }
98 }
99}
100
101static void on_write_complete(pj_ioqueue_key_t *key,
102 pj_ioqueue_op_key_t *op_key,
103 pj_ssize_t bytes_sent)
104{
105 struct op_key *send_rec = (struct op_key*)op_key;
106
107 if (bytes_sent <= 0) {
108 pj_status_t rc = -bytes_sent;
109 if (rc != send_rec->last_err) {
110 send_rec->last_err = rc;
111 app_perror("...send error(2)", rc);
112 }
113 }
114
115 send_rec->is_pending = 0;
116 on_read_complete(key, &send_rec->peer->op_key_, 0);
117}
118
119static int worker_thread(void *arg)
120{
Benny Prijonof260e462007-04-30 21:03:32 +0000121 pj_ioqueue_t *ioqueue = (pj_ioqueue_t*) arg;
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000122 struct op_key read_op, write_op;
123 char recv_buf[512], send_buf[512];
124 pj_ssize_t length;
125 pj_status_t rc;
126
127 read_op.peer = &write_op;
128 read_op.is_pending = 0;
129 read_op.last_err = 0;
130 read_op.buffer = recv_buf;
131 read_op.size = sizeof(recv_buf);
132 read_op.addrlen = sizeof(read_op.addr);
133
134 write_op.peer = &read_op;
135 write_op.is_pending = 0;
136 write_op.last_err = 0;
137 write_op.buffer = send_buf;
138 write_op.size = sizeof(send_buf);
139
140 length = sizeof(recv_buf);
141 rc = pj_ioqueue_recvfrom(key, &read_op.op_key_, recv_buf, &length, 0,
142 &read_op.addr, &read_op.addrlen);
143 if (rc == PJ_SUCCESS) {
144 read_op.is_pending = 1;
145 on_read_complete(key, &read_op.op_key_, length);
146 }
147
148 for (;;) {
149 pj_time_val timeout;
150 timeout.sec = 0; timeout.msec = 10;
151 rc = pj_ioqueue_poll(ioqueue, &timeout);
152 }
Benny Prijonod8410532006-06-15 11:04:33 +0000153 return 0;
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000154}
155
156int udp_echo_srv_ioqueue(void)
157{
158 pj_pool_t *pool;
159 pj_sock_t sock;
160 pj_ioqueue_t *ioqueue;
161 pj_ioqueue_callback callback;
162 int i;
163 pj_thread_t *thread[ECHO_SERVER_MAX_THREADS];
164 pj_status_t rc;
165
Benny Prijonoac623b32006-07-03 15:19:31 +0000166 pj_bzero(&callback, sizeof(callback));
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000167 callback.on_read_complete = &on_read_complete;
168 callback.on_write_complete = &on_write_complete;
169
170 pool = pj_pool_create(mem, NULL, 4000, 4000, NULL);
171 if (!pool)
172 return -10;
173
174 rc = pj_ioqueue_create(pool, 2, &ioqueue);
175 if (rc != PJ_SUCCESS) {
176 app_perror("...pj_ioqueue_create error", rc);
177 return -20;
178 }
179
Benny Prijono8ab968f2007-07-20 08:08:30 +0000180 rc = app_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0,
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000181 ECHO_SERVER_START_PORT, &sock);
182 if (rc != PJ_SUCCESS) {
183 app_perror("...app_socket error", rc);
184 return -30;
185 }
186
187 rc = pj_ioqueue_register_sock(pool, ioqueue, sock, NULL,
188 &callback, &key);
189 if (rc != PJ_SUCCESS) {
190 app_perror("...error registering socket", rc);
191 return -40;
192 }
193
194 rc = pj_atomic_create(pool, 0, &total_bytes);
195 if (rc != PJ_SUCCESS) {
196 app_perror("...error creating atomic variable", rc);
197 return -45;
198 }
199
200 for (i=0; i<ECHO_SERVER_MAX_THREADS; ++i) {
201 rc = pj_thread_create(pool, NULL, &worker_thread, ioqueue,
202 PJ_THREAD_DEFAULT_STACK_SIZE, 0,
203 &thread[i]);
204 if (rc != PJ_SUCCESS) {
205 app_perror("...create thread error", rc);
206 return -50;
207 }
208 }
209
210 echo_srv_common_loop(total_bytes);
211
212 return 0;
213}