blob: 8e098ed10f330da359dc05c39eed9ffacd9c52eb [file] [log] [blame]
Benny Prijono4bac2c12008-05-11 18:12:16 +00001/* $Id$ */
2/*
3 * Copyright (C)2003-2007 Benny Prijono <benny@prijono.org>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 */
19#include <pj/activesock.h>
20#include <pj/assert.h>
21#include <pj/errno.h>
22#include <pj/pool.h>
23#include <pj/sock.h>
24#include <pj/string.h>
25
26#define PJ_ACTIVESOCK_MAX_LOOP 50
27
28
29enum read_type
30{
31 TYPE_NONE,
32 TYPE_RECV,
33 TYPE_RECV_FROM
34};
35
36struct read_op
37{
38 pj_ioqueue_op_key_t op_key;
39 pj_uint8_t *pkt;
40 unsigned max_size;
41 pj_size_t size;
42 pj_sockaddr src_addr;
43 int src_addr_len;
44};
45
46struct accept_op
47{
48 pj_ioqueue_op_key_t op_key;
49 pj_sock_t new_sock;
50 pj_sockaddr rem_addr;
51 int rem_addr_len;
52};
53
54struct pj_activesock_t
55{
56 pj_ioqueue_key_t *key;
57 pj_bool_t stream_oriented;
58 pj_ioqueue_t *ioqueue;
59 void *user_data;
60 unsigned async_count;
61 unsigned max_loop;
62 pj_activesock_cb cb;
63
64 struct read_op *read_op;
65 pj_uint32_t read_flags;
66 enum read_type read_type;
67
68 struct accept_op *accept_op;
69};
70
71
72static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
73 pj_ioqueue_op_key_t *op_key,
74 pj_ssize_t bytes_read);
75static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
76 pj_ioqueue_op_key_t *op_key,
77 pj_ssize_t bytes_sent);
78static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
79 pj_ioqueue_op_key_t *op_key,
80 pj_sock_t sock,
81 pj_status_t status);
82static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
83 pj_status_t status);
84
85
86PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg)
87{
88 pj_bzero(cfg, sizeof(*cfg));
89 cfg->async_cnt = 1;
90 cfg->concurrency = -1;
91}
92
93
94PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool,
95 pj_sock_t sock,
96 int sock_type,
97 const pj_activesock_cfg *opt,
98 pj_ioqueue_t *ioqueue,
99 const pj_activesock_cb *cb,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000100 void *user_data,
Benny Prijono4bac2c12008-05-11 18:12:16 +0000101 pj_activesock_t **p_asock)
102{
103 pj_activesock_t *asock;
104 pj_ioqueue_callback ioq_cb;
105 pj_status_t status;
106
107 PJ_ASSERT_RETURN(pool && ioqueue && cb && p_asock, PJ_EINVAL);
108 PJ_ASSERT_RETURN(sock!=0 && sock!=PJ_INVALID_SOCKET, PJ_EINVAL);
109 PJ_ASSERT_RETURN(sock_type==pj_SOCK_STREAM() ||
110 sock_type==pj_SOCK_DGRAM(), PJ_EINVAL);
111 PJ_ASSERT_RETURN(!opt || opt->async_cnt >= 1, PJ_EINVAL);
112
113 asock = PJ_POOL_ZALLOC_T(pool, pj_activesock_t);
114 asock->ioqueue = ioqueue;
115 asock->stream_oriented = (sock_type == pj_SOCK_STREAM());
116 asock->async_count = (opt? opt->async_cnt : 1);
117 asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP;
Benny Prijonoea8e4362008-06-06 14:12:23 +0000118 asock->user_data = user_data;
Benny Prijono4bac2c12008-05-11 18:12:16 +0000119 pj_memcpy(&asock->cb, cb, sizeof(*cb));
120
121 pj_bzero(&ioq_cb, sizeof(ioq_cb));
122 ioq_cb.on_read_complete = &ioqueue_on_read_complete;
123 ioq_cb.on_write_complete = &ioqueue_on_write_complete;
124 ioq_cb.on_connect_complete = &ioqueue_on_connect_complete;
125 ioq_cb.on_accept_complete = &ioqueue_on_accept_complete;
126
127 status = pj_ioqueue_register_sock(pool, ioqueue, sock, asock,
128 &ioq_cb, &asock->key);
129 if (status != PJ_SUCCESS) {
130 pj_activesock_close(asock);
131 return status;
132 }
133
134 if (opt && opt->concurrency >= 0) {
135 pj_ioqueue_set_concurrency(asock->key, opt->concurrency);
136 }
137
138 *p_asock = asock;
139 return PJ_SUCCESS;
140}
141
142
143PJ_DEF(pj_status_t) pj_activesock_create_udp( pj_pool_t *pool,
144 const pj_sockaddr *addr,
145 const pj_activesock_cfg *opt,
146 pj_ioqueue_t *ioqueue,
147 const pj_activesock_cb *cb,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000148 void *user_data,
Benny Prijono4bac2c12008-05-11 18:12:16 +0000149 pj_activesock_t **p_asock,
150 pj_sockaddr *bound_addr)
151{
152 pj_sock_t sock_fd;
153 pj_sockaddr default_addr;
154 pj_status_t status;
155
156 if (addr == NULL) {
157 pj_sockaddr_init(pj_AF_INET(), &default_addr, NULL, 0);
158 addr = &default_addr;
159 }
160
161 status = pj_sock_socket(addr->addr.sa_family, pj_SOCK_DGRAM(), 0,
162 &sock_fd);
163 if (status != PJ_SUCCESS) {
164 return status;
165 }
166
167 status = pj_sock_bind(sock_fd, addr, pj_sockaddr_get_len(addr));
168 if (status != PJ_SUCCESS) {
169 pj_sock_close(sock_fd);
170 return status;
171 }
172
173 status = pj_activesock_create(pool, sock_fd, pj_SOCK_DGRAM(), opt,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000174 ioqueue, cb, user_data, p_asock);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000175 if (status != PJ_SUCCESS) {
176 pj_sock_close(sock_fd);
177 return status;
178 }
179
180 if (bound_addr) {
181 int addr_len = sizeof(*bound_addr);
182 status = pj_sock_getsockname(sock_fd, bound_addr, &addr_len);
183 if (status != PJ_SUCCESS) {
184 pj_activesock_close(*p_asock);
185 return status;
186 }
187 }
188
189 return PJ_SUCCESS;
190}
191
192
193PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock)
194{
195 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
196 if (asock->key) {
197 pj_ioqueue_unregister(asock->key);
198 asock->key = NULL;
199 }
200 return PJ_SUCCESS;
201}
202
203
204PJ_DEF(pj_status_t) pj_activesock_set_user_data( pj_activesock_t *asock,
205 void *user_data)
206{
207 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
208 asock->user_data = user_data;
209 return PJ_SUCCESS;
210}
211
212
213PJ_DEF(void*) pj_activesock_get_user_data(pj_activesock_t *asock)
214{
215 PJ_ASSERT_RETURN(asock, NULL);
216 return asock->user_data;
217}
218
219
220PJ_DEF(pj_status_t) pj_activesock_start_read(pj_activesock_t *asock,
221 pj_pool_t *pool,
222 unsigned buff_size,
223 pj_uint32_t flags)
224{
225 unsigned i;
226 pj_status_t status;
227
228 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
229 PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
230 PJ_ASSERT_RETURN(asock->read_op == NULL, PJ_EINVALIDOP);
231
232 asock->read_op = (struct read_op*)
233 pj_pool_calloc(pool, asock->async_count,
234 sizeof(struct read_op));
235 asock->read_type = TYPE_RECV;
236 asock->read_flags = flags;
237
238 for (i=0; i<asock->async_count; ++i) {
239 struct read_op *r = &asock->read_op[i];
240 pj_ssize_t size_to_read;
241
Benny Prijonoc67f8852008-05-20 08:51:03 +0000242 r->pkt = (pj_uint8_t*)pj_pool_alloc(pool, buff_size);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000243 r->max_size = size_to_read = buff_size;
244
245 status = pj_ioqueue_recv(asock->key, &r->op_key, r->pkt, &size_to_read,
246 PJ_IOQUEUE_ALWAYS_ASYNC | flags);
247 PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
248
249 if (status != PJ_EPENDING)
250 return status;
251 }
252
253 return PJ_SUCCESS;
254}
255
256
257PJ_DEF(pj_status_t) pj_activesock_start_recvfrom(pj_activesock_t *asock,
258 pj_pool_t *pool,
259 unsigned buff_size,
260 pj_uint32_t flags)
261{
262 unsigned i;
263 pj_status_t status;
264
265 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
266 PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
267
268 asock->read_op = (struct read_op*)
269 pj_pool_calloc(pool, asock->async_count,
270 sizeof(struct read_op));
271 asock->read_type = TYPE_RECV_FROM;
272 asock->read_flags = flags;
273
274 for (i=0; i<asock->async_count; ++i) {
275 struct read_op *r = &asock->read_op[i];
276 pj_ssize_t size_to_read;
277
Benny Prijonoc67f8852008-05-20 08:51:03 +0000278 r->pkt = (pj_uint8_t*) pj_pool_alloc(pool, buff_size);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000279 r->max_size = size_to_read = buff_size;
280 r->src_addr_len = sizeof(r->src_addr);
281
282 status = pj_ioqueue_recvfrom(asock->key, &r->op_key, r->pkt,
283 &size_to_read,
284 PJ_IOQUEUE_ALWAYS_ASYNC | flags,
285 &r->src_addr, &r->src_addr_len);
286 PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
287
288 if (status != PJ_EPENDING)
289 return status;
290 }
291
292 return PJ_SUCCESS;
293}
294
295
296static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
297 pj_ioqueue_op_key_t *op_key,
298 pj_ssize_t bytes_read)
299{
300 pj_activesock_t *asock;
301 struct read_op *r = (struct read_op*)op_key;
302 unsigned loop = 0;
303 pj_status_t status;
304
305 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
306
307 do {
308 unsigned flags;
309
310 if (bytes_read > 0) {
311 /*
312 * We've got new data.
313 */
314 pj_size_t remainder;
315 pj_bool_t ret;
316
317 /* Append this new data to existing data. If socket is stream
318 * oriented, user might have left some data in the buffer.
319 * Otherwise if socket is datagram there will be nothing in
320 * existing packet hence the packet will contain only the new
321 * packet.
322 */
323 r->size += bytes_read;
324
325 /* Set default remainder to zero */
326 remainder = 0;
327
328 /* And return value to TRUE */
329 ret = PJ_TRUE;
330
331 /* Notify callback */
332 if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
333 ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
334 PJ_SUCCESS, &remainder);
335 } else if (asock->read_type == TYPE_RECV_FROM &&
336 asock->cb.on_data_recvfrom)
337 {
338 ret = (*asock->cb.on_data_recvfrom)(asock, r->pkt, r->size,
339 &r->src_addr,
340 r->src_addr_len,
341 PJ_SUCCESS);
342 }
343
344 /* If callback returns false, we have been destroyed! */
345 if (!ret)
346 return;
347
348 /* Only stream oriented socket may leave data in the packet */
349 if (asock->stream_oriented) {
350 r->size = remainder;
351 } else {
352 r->size = 0;
353 }
354
355 } else if (bytes_read <= 0) {
356
357 pj_size_t remainder;
358 pj_bool_t ret;
359
360 if (bytes_read == 0) {
361 /* For stream/connection oriented socket, this means the
362 * connection has been closed. For datagram sockets, it means
363 * we've received datagram with zero length.
364 */
365 if (asock->stream_oriented)
366 status = PJ_EEOF;
367 else
368 status = PJ_SUCCESS;
369 } else {
370 /* This means we've got an error. If this is stream/connection
371 * oriented, it means connection has been closed. For datagram
372 * sockets, it means we've got some error (e.g. EWOULDBLOCK).
373 */
374 status = -bytes_read;
375 }
376
377 /* Set default remainder to zero */
378 remainder = 0;
379
380 /* And return value to TRUE */
381 ret = PJ_TRUE;
382
383 /* Notify callback */
384 if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
385 /* For connection oriented socket, we still need to report
386 * the remainder data (if any) to the user to let user do
387 * processing with the remainder data before it closes the
388 * connection.
389 * If there is no remainder data, set the packet to NULL.
390 */
391 ret = (*asock->cb.on_data_read)(asock, (r->size? r->pkt:NULL),
392 r->size, status, &remainder);
393
394 } else if (asock->read_type == TYPE_RECV_FROM &&
395 asock->cb.on_data_recvfrom)
396 {
397 /* This would always be datagram oriented hence there's
398 * nothing in the packet. We can't be sure if there will be
399 * anything useful in the source_addr, so just put NULL
400 * there too.
401 */
402 ret = (*asock->cb.on_data_recvfrom)(asock, NULL, 0,
403 NULL, 0, status);
404 }
405
406 /* If callback returns false, we have been destroyed! */
407 if (!ret)
408 return;
409
410 /* Only stream oriented socket may leave data in the packet */
411 if (asock->stream_oriented) {
412 r->size = remainder;
413 } else {
414 r->size = 0;
415 }
416 }
417
418 /* Read next data. We limit ourselves to processing max_loop immediate
419 * data, so when the loop counter has exceeded this value, force the
420 * read()/recvfrom() to return pending operation to allow the program
421 * to do other jobs.
422 */
423 bytes_read = r->max_size - r->size;
424 flags = asock->read_flags;
425 if (++loop >= asock->max_loop)
426 flags |= PJ_IOQUEUE_ALWAYS_ASYNC;
427
428 if (asock->read_type == TYPE_RECV) {
429 status = pj_ioqueue_recv(key, op_key, r->pkt + r->size,
430 &bytes_read, flags);
431 } else {
432 r->src_addr_len = sizeof(r->src_addr);
433 status = pj_ioqueue_recvfrom(key, op_key, r->pkt + r->size,
434 &bytes_read, flags,
435 &r->src_addr, &r->src_addr_len);
436 }
437
438 } while (status != PJ_EPENDING && status != PJ_ECANCELLED);
439
440}
441
442
443PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock,
444 pj_ioqueue_op_key_t *send_key,
445 const void *data,
446 pj_ssize_t *size,
447 unsigned flags)
448{
449 PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL);
450
451 return pj_ioqueue_send(asock->key, send_key, data, size, flags);
452}
453
454
455PJ_DEF(pj_status_t) pj_activesock_sendto( pj_activesock_t *asock,
456 pj_ioqueue_op_key_t *send_key,
457 const void *data,
458 pj_ssize_t *size,
459 unsigned flags,
460 const pj_sockaddr_t *addr,
461 int addr_len)
462{
463 PJ_ASSERT_RETURN(asock && send_key && data && size && addr && addr_len,
464 PJ_EINVAL);
465
466 return pj_ioqueue_sendto(asock->key, send_key, data, size, flags,
467 addr, addr_len);
468}
469
470
471static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
472 pj_ioqueue_op_key_t *op_key,
473 pj_ssize_t bytes_sent)
474{
475 pj_activesock_t *asock;
476
477 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
478
479 if (asock->cb.on_data_sent) {
480 pj_bool_t ret;
481
482 ret = (*asock->cb.on_data_sent)(asock, op_key, bytes_sent);
483
484 /* If callback returns false, we have been destroyed! */
485 if (!ret)
486 return;
487 }
488}
489
490
491PJ_DEF(pj_status_t) pj_activesock_start_accept(pj_activesock_t *asock,
492 pj_pool_t *pool)
493{
494 unsigned i;
495
496 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
497 PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP);
498
499 asock->accept_op = (struct accept_op*)
500 pj_pool_calloc(pool, asock->async_count,
501 sizeof(struct accept_op));
502 for (i=0; i<asock->async_count; ++i) {
503 struct accept_op *a = &asock->accept_op[i];
504 pj_status_t status;
505
506 do {
507 a->new_sock = PJ_INVALID_SOCKET;
508 a->rem_addr_len = sizeof(a->rem_addr);
509
510 status = pj_ioqueue_accept(asock->key, &a->op_key, &a->new_sock,
511 NULL, &a->rem_addr, &a->rem_addr_len);
512 if (status == PJ_SUCCESS) {
513 /* We've got immediate connection. Not sure if it's a good
514 * idea to call the callback now (probably application will
515 * not be prepared to process it), so lets just silently
516 * close the socket.
517 */
518 pj_sock_close(a->new_sock);
519 }
520 } while (status == PJ_SUCCESS);
521
522 if (status != PJ_EPENDING) {
523 return status;
524 }
525 }
526
527 return PJ_SUCCESS;
528}
529
530
531static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
532 pj_ioqueue_op_key_t *op_key,
533 pj_sock_t new_sock,
534 pj_status_t status)
535{
536 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
537 struct accept_op *accept_op = (struct accept_op*) op_key;
538
539 do {
540 if (status==PJ_SUCCESS && asock->cb.on_accept_complete) {
541 pj_bool_t ret;
542
543 /* Notify callback */
544 ret = (*asock->cb.on_accept_complete)(asock, new_sock,
545 &accept_op->rem_addr,
546 accept_op->rem_addr_len);
547
548 /* If callback returns false, we have been destroyed! */
549 if (!ret)
550 return;
551
552 } else if (status==PJ_SUCCESS) {
553 /* Application doesn't handle the new socket, we need to
554 * close it to avoid resource leak.
555 */
556 pj_sock_close(accept_op->new_sock);
557 }
558
559 /* Prepare next accept() */
560 accept_op->new_sock = PJ_INVALID_SOCKET;
561 accept_op->rem_addr_len = sizeof(accept_op->rem_addr);
562
563 status = pj_ioqueue_accept(asock->key, op_key, &accept_op->new_sock,
564 NULL, &accept_op->rem_addr,
565 &accept_op->rem_addr_len);
566
567 } while (status != PJ_EPENDING && status != PJ_ECANCELLED);
568}
569
570
571PJ_DEF(pj_status_t) pj_activesock_start_connect( pj_activesock_t *asock,
572 pj_pool_t *pool,
573 const pj_sockaddr_t *remaddr,
574 int addr_len)
575{
576 PJ_UNUSED_ARG(pool);
577 return pj_ioqueue_connect(asock->key, remaddr, addr_len);
578}
579
580
581static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
582 pj_status_t status)
583{
584 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
585
586 if (asock->cb.on_connect_complete) {
587 pj_bool_t ret;
588
589 ret = (*asock->cb.on_connect_complete)(asock, status);
590
591 if (!ret) {
592 /* We've been destroyed */
593 return;
594 }
595 }
596}
597