blob: bd701655471ac6d3c7170f76bf1c058286d554ff [file] [log] [blame]
/* $Id$ */
/*
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
20#include <pjlib.h>
21#include "test.h"
22
23static pj_ioqueue_key_t *key;
24static pj_atomic_t *total_bytes;
25static pj_bool_t thread_quit_flag;
26
27struct op_key
28{
29 pj_ioqueue_op_key_t op_key_;
30 struct op_key *peer;
31 char *buffer;
32 pj_size_t size;
33 int is_pending;
34 pj_status_t last_err;
35 pj_sockaddr_in addr;
36 int addrlen;
37};
38
39static void on_read_complete(pj_ioqueue_key_t *key,
40 pj_ioqueue_op_key_t *op_key,
41 pj_ssize_t bytes_received)
42{
43 pj_status_t rc;
44 struct op_key *recv_rec = (struct op_key *)op_key;
45
46 for (;;) {
47 struct op_key *send_rec = recv_rec->peer;
48 recv_rec->is_pending = 0;
49
50 if (bytes_received < 0) {
51 if (-bytes_received != recv_rec->last_err) {
52 recv_rec->last_err = (pj_status_t)-bytes_received;
53 app_perror("...error receiving data", recv_rec->last_err);
54 }
55 } else if (bytes_received == 0) {
56 /* note: previous error, or write callback */
57 } else {
58 pj_atomic_add(total_bytes, (pj_atomic_value_t)bytes_received);
59
60 if (!send_rec->is_pending) {
61 pj_ssize_t sent = bytes_received;
62 pj_memcpy(send_rec->buffer, recv_rec->buffer, bytes_received);
63 pj_memcpy(&send_rec->addr, &recv_rec->addr, recv_rec->addrlen);
64 send_rec->addrlen = recv_rec->addrlen;
65 rc = pj_ioqueue_sendto(key, &send_rec->op_key_,
66 send_rec->buffer, &sent, 0,
67 &send_rec->addr, send_rec->addrlen);
68 send_rec->is_pending = (rc==PJ_EPENDING);
69
70 if (rc!=PJ_SUCCESS && rc!=PJ_EPENDING) {
71 app_perror("...send error(1)", rc);
72 }
73 }
74 }
75
76 if (!send_rec->is_pending) {
77 bytes_received = recv_rec->size;
78 rc = pj_ioqueue_recvfrom(key, &recv_rec->op_key_,
79 recv_rec->buffer, &bytes_received, 0,
80 &recv_rec->addr, &recv_rec->addrlen);
81 recv_rec->is_pending = (rc==PJ_EPENDING);
82 if (rc == PJ_SUCCESS) {
83 /* fall through next loop. */
84 } else if (rc == PJ_EPENDING) {
85 /* quit callback. */
86 break;
87 } else {
88 /* error */
89 app_perror("...recv error", rc);
90 recv_rec->last_err = rc;
91
92 bytes_received = 0;
93 /* fall through next loop. */
94 }
95 } else {
96 /* recv will be done when write completion callback is called. */
97 break;
98 }
99 }
100}
101
102static void on_write_complete(pj_ioqueue_key_t *key,
103 pj_ioqueue_op_key_t *op_key,
104 pj_ssize_t bytes_sent)
105{
106 struct op_key *send_rec = (struct op_key*)op_key;
107
108 if (bytes_sent <= 0) {
109 pj_status_t rc = (pj_status_t)-bytes_sent;
110 if (rc != send_rec->last_err) {
111 send_rec->last_err = rc;
112 app_perror("...send error(2)", rc);
113 }
114 }
115
116 send_rec->is_pending = 0;
117 on_read_complete(key, &send_rec->peer->op_key_, 0);
118}
119
120static int worker_thread(void *arg)
121{
122 pj_ioqueue_t *ioqueue = (pj_ioqueue_t*) arg;
123 struct op_key read_op, write_op;
124 char recv_buf[512], send_buf[512];
125 pj_ssize_t length;
126 pj_status_t rc;
127
128 read_op.peer = &write_op;
129 read_op.is_pending = 0;
130 read_op.last_err = 0;
131 read_op.buffer = recv_buf;
132 read_op.size = sizeof(recv_buf);
133 read_op.addrlen = sizeof(read_op.addr);
134
135 write_op.peer = &read_op;
136 write_op.is_pending = 0;
137 write_op.last_err = 0;
138 write_op.buffer = send_buf;
139 write_op.size = sizeof(send_buf);
140
141 length = sizeof(recv_buf);
142 rc = pj_ioqueue_recvfrom(key, &read_op.op_key_, recv_buf, &length, 0,
143 &read_op.addr, &read_op.addrlen);
144 if (rc == PJ_SUCCESS) {
145 read_op.is_pending = 1;
146 on_read_complete(key, &read_op.op_key_, length);
147 }
148
149 while (!thread_quit_flag) {
150 pj_time_val timeout;
151 timeout.sec = 0; timeout.msec = 10;
152 rc = pj_ioqueue_poll(ioqueue, &timeout);
153 }
154 return 0;
155}
156
157int udp_echo_srv_ioqueue(void)
158{
159 pj_pool_t *pool;
160 pj_sock_t sock;
161 pj_ioqueue_t *ioqueue;
162 pj_ioqueue_callback callback;
163 int i;
164 pj_thread_t *thread[ECHO_SERVER_MAX_THREADS];
165 pj_status_t rc;
166
167 pj_bzero(&callback, sizeof(callback));
168 callback.on_read_complete = &on_read_complete;
169 callback.on_write_complete = &on_write_complete;
170
171 pool = pj_pool_create(mem, NULL, 4000, 4000, NULL);
172 if (!pool)
173 return -10;
174
175 rc = pj_ioqueue_create(pool, 2, &ioqueue);
176 if (rc != PJ_SUCCESS) {
177 app_perror("...pj_ioqueue_create error", rc);
178 return -20;
179 }
180
181 rc = app_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0,
182 ECHO_SERVER_START_PORT, &sock);
183 if (rc != PJ_SUCCESS) {
184 app_perror("...app_socket error", rc);
185 return -30;
186 }
187
188 rc = pj_ioqueue_register_sock(pool, ioqueue, sock, NULL,
189 &callback, &key);
190 if (rc != PJ_SUCCESS) {
191 app_perror("...error registering socket", rc);
192 return -40;
193 }
194
195 rc = pj_atomic_create(pool, 0, &total_bytes);
196 if (rc != PJ_SUCCESS) {
197 app_perror("...error creating atomic variable", rc);
198 return -45;
199 }
200
201 for (i=0; i<ECHO_SERVER_MAX_THREADS; ++i) {
202 rc = pj_thread_create(pool, NULL, &worker_thread, ioqueue,
203 PJ_THREAD_DEFAULT_STACK_SIZE, 0,
204 &thread[i]);
205 if (rc != PJ_SUCCESS) {
206 app_perror("...create thread error", rc);
207 return -50;
208 }
209 }
210
211 echo_srv_common_loop(total_bytes);
212
213 return 0;
214}