blob: 112ca5f23c596f6df78dcdcfa2fffd6b43837b49 [file] [log] [blame]
Benny Prijono4bac2c12008-05-11 18:12:16 +00001/* $Id$ */
2/*
3 * Copyright (C)2003-2007 Benny Prijono <benny@prijono.org>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 */
19#include <pj/activesock.h>
20#include <pj/assert.h>
21#include <pj/errno.h>
22#include <pj/pool.h>
23#include <pj/sock.h>
24#include <pj/string.h>
25
/* Default value stored in pj_activesock_t::max_loop: once the read-completion
 * handler has looped this many times, the next read is forced to be
 * asynchronous so the program can do other jobs.
 */
#define PJ_ACTIVESOCK_MAX_LOOP 50
27
28
/* Identifies which read API (if any) has been started on an active socket. */
enum read_type
{
    TYPE_NONE      = 0,   /* No read operation has been started.            */
    TYPE_RECV      = 1,   /* Set by pj_activesock_start_read().             */
    TYPE_RECV_FROM = 2    /* Set by pj_activesock_start_recvfrom().         */
};
35
36struct read_op
37{
38 pj_ioqueue_op_key_t op_key;
39 pj_uint8_t *pkt;
40 unsigned max_size;
41 pj_size_t size;
42 pj_sockaddr src_addr;
43 int src_addr_len;
44};
45
46struct accept_op
47{
48 pj_ioqueue_op_key_t op_key;
49 pj_sock_t new_sock;
50 pj_sockaddr rem_addr;
51 int rem_addr_len;
52};
53
54struct pj_activesock_t
55{
56 pj_ioqueue_key_t *key;
57 pj_bool_t stream_oriented;
58 pj_ioqueue_t *ioqueue;
59 void *user_data;
60 unsigned async_count;
61 unsigned max_loop;
62 pj_activesock_cb cb;
63
64 struct read_op *read_op;
65 pj_uint32_t read_flags;
66 enum read_type read_type;
67
68 struct accept_op *accept_op;
69};
70
71
72static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
73 pj_ioqueue_op_key_t *op_key,
74 pj_ssize_t bytes_read);
75static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
76 pj_ioqueue_op_key_t *op_key,
77 pj_ssize_t bytes_sent);
78static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
79 pj_ioqueue_op_key_t *op_key,
80 pj_sock_t sock,
81 pj_status_t status);
82static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
83 pj_status_t status);
84
85
86PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg)
87{
88 pj_bzero(cfg, sizeof(*cfg));
89 cfg->async_cnt = 1;
90 cfg->concurrency = -1;
91}
92
93
94PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool,
95 pj_sock_t sock,
96 int sock_type,
97 const pj_activesock_cfg *opt,
98 pj_ioqueue_t *ioqueue,
99 const pj_activesock_cb *cb,
100 pj_activesock_t **p_asock)
101{
102 pj_activesock_t *asock;
103 pj_ioqueue_callback ioq_cb;
104 pj_status_t status;
105
106 PJ_ASSERT_RETURN(pool && ioqueue && cb && p_asock, PJ_EINVAL);
107 PJ_ASSERT_RETURN(sock!=0 && sock!=PJ_INVALID_SOCKET, PJ_EINVAL);
108 PJ_ASSERT_RETURN(sock_type==pj_SOCK_STREAM() ||
109 sock_type==pj_SOCK_DGRAM(), PJ_EINVAL);
110 PJ_ASSERT_RETURN(!opt || opt->async_cnt >= 1, PJ_EINVAL);
111
112 asock = PJ_POOL_ZALLOC_T(pool, pj_activesock_t);
113 asock->ioqueue = ioqueue;
114 asock->stream_oriented = (sock_type == pj_SOCK_STREAM());
115 asock->async_count = (opt? opt->async_cnt : 1);
116 asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP;
117 pj_memcpy(&asock->cb, cb, sizeof(*cb));
118
119 pj_bzero(&ioq_cb, sizeof(ioq_cb));
120 ioq_cb.on_read_complete = &ioqueue_on_read_complete;
121 ioq_cb.on_write_complete = &ioqueue_on_write_complete;
122 ioq_cb.on_connect_complete = &ioqueue_on_connect_complete;
123 ioq_cb.on_accept_complete = &ioqueue_on_accept_complete;
124
125 status = pj_ioqueue_register_sock(pool, ioqueue, sock, asock,
126 &ioq_cb, &asock->key);
127 if (status != PJ_SUCCESS) {
128 pj_activesock_close(asock);
129 return status;
130 }
131
132 if (opt && opt->concurrency >= 0) {
133 pj_ioqueue_set_concurrency(asock->key, opt->concurrency);
134 }
135
136 *p_asock = asock;
137 return PJ_SUCCESS;
138}
139
140
141PJ_DEF(pj_status_t) pj_activesock_create_udp( pj_pool_t *pool,
142 const pj_sockaddr *addr,
143 const pj_activesock_cfg *opt,
144 pj_ioqueue_t *ioqueue,
145 const pj_activesock_cb *cb,
146 pj_activesock_t **p_asock,
147 pj_sockaddr *bound_addr)
148{
149 pj_sock_t sock_fd;
150 pj_sockaddr default_addr;
151 pj_status_t status;
152
153 if (addr == NULL) {
154 pj_sockaddr_init(pj_AF_INET(), &default_addr, NULL, 0);
155 addr = &default_addr;
156 }
157
158 status = pj_sock_socket(addr->addr.sa_family, pj_SOCK_DGRAM(), 0,
159 &sock_fd);
160 if (status != PJ_SUCCESS) {
161 return status;
162 }
163
164 status = pj_sock_bind(sock_fd, addr, pj_sockaddr_get_len(addr));
165 if (status != PJ_SUCCESS) {
166 pj_sock_close(sock_fd);
167 return status;
168 }
169
170 status = pj_activesock_create(pool, sock_fd, pj_SOCK_DGRAM(), opt,
171 ioqueue, cb, p_asock);
172 if (status != PJ_SUCCESS) {
173 pj_sock_close(sock_fd);
174 return status;
175 }
176
177 if (bound_addr) {
178 int addr_len = sizeof(*bound_addr);
179 status = pj_sock_getsockname(sock_fd, bound_addr, &addr_len);
180 if (status != PJ_SUCCESS) {
181 pj_activesock_close(*p_asock);
182 return status;
183 }
184 }
185
186 return PJ_SUCCESS;
187}
188
189
190PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock)
191{
192 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
193 if (asock->key) {
194 pj_ioqueue_unregister(asock->key);
195 asock->key = NULL;
196 }
197 return PJ_SUCCESS;
198}
199
200
201PJ_DEF(pj_status_t) pj_activesock_set_user_data( pj_activesock_t *asock,
202 void *user_data)
203{
204 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
205 asock->user_data = user_data;
206 return PJ_SUCCESS;
207}
208
209
210PJ_DEF(void*) pj_activesock_get_user_data(pj_activesock_t *asock)
211{
212 PJ_ASSERT_RETURN(asock, NULL);
213 return asock->user_data;
214}
215
216
217PJ_DEF(pj_status_t) pj_activesock_start_read(pj_activesock_t *asock,
218 pj_pool_t *pool,
219 unsigned buff_size,
220 pj_uint32_t flags)
221{
222 unsigned i;
223 pj_status_t status;
224
225 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
226 PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
227 PJ_ASSERT_RETURN(asock->read_op == NULL, PJ_EINVALIDOP);
228
229 asock->read_op = (struct read_op*)
230 pj_pool_calloc(pool, asock->async_count,
231 sizeof(struct read_op));
232 asock->read_type = TYPE_RECV;
233 asock->read_flags = flags;
234
235 for (i=0; i<asock->async_count; ++i) {
236 struct read_op *r = &asock->read_op[i];
237 pj_ssize_t size_to_read;
238
Benny Prijonoc67f8852008-05-20 08:51:03 +0000239 r->pkt = (pj_uint8_t*)pj_pool_alloc(pool, buff_size);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000240 r->max_size = size_to_read = buff_size;
241
242 status = pj_ioqueue_recv(asock->key, &r->op_key, r->pkt, &size_to_read,
243 PJ_IOQUEUE_ALWAYS_ASYNC | flags);
244 PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
245
246 if (status != PJ_EPENDING)
247 return status;
248 }
249
250 return PJ_SUCCESS;
251}
252
253
254PJ_DEF(pj_status_t) pj_activesock_start_recvfrom(pj_activesock_t *asock,
255 pj_pool_t *pool,
256 unsigned buff_size,
257 pj_uint32_t flags)
258{
259 unsigned i;
260 pj_status_t status;
261
262 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
263 PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
264
265 asock->read_op = (struct read_op*)
266 pj_pool_calloc(pool, asock->async_count,
267 sizeof(struct read_op));
268 asock->read_type = TYPE_RECV_FROM;
269 asock->read_flags = flags;
270
271 for (i=0; i<asock->async_count; ++i) {
272 struct read_op *r = &asock->read_op[i];
273 pj_ssize_t size_to_read;
274
Benny Prijonoc67f8852008-05-20 08:51:03 +0000275 r->pkt = (pj_uint8_t*) pj_pool_alloc(pool, buff_size);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000276 r->max_size = size_to_read = buff_size;
277 r->src_addr_len = sizeof(r->src_addr);
278
279 status = pj_ioqueue_recvfrom(asock->key, &r->op_key, r->pkt,
280 &size_to_read,
281 PJ_IOQUEUE_ALWAYS_ASYNC | flags,
282 &r->src_addr, &r->src_addr_len);
283 PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
284
285 if (status != PJ_EPENDING)
286 return status;
287 }
288
289 return PJ_SUCCESS;
290}
291
292
293static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
294 pj_ioqueue_op_key_t *op_key,
295 pj_ssize_t bytes_read)
296{
297 pj_activesock_t *asock;
298 struct read_op *r = (struct read_op*)op_key;
299 unsigned loop = 0;
300 pj_status_t status;
301
302 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
303
304 do {
305 unsigned flags;
306
307 if (bytes_read > 0) {
308 /*
309 * We've got new data.
310 */
311 pj_size_t remainder;
312 pj_bool_t ret;
313
314 /* Append this new data to existing data. If socket is stream
315 * oriented, user might have left some data in the buffer.
316 * Otherwise if socket is datagram there will be nothing in
317 * existing packet hence the packet will contain only the new
318 * packet.
319 */
320 r->size += bytes_read;
321
322 /* Set default remainder to zero */
323 remainder = 0;
324
325 /* And return value to TRUE */
326 ret = PJ_TRUE;
327
328 /* Notify callback */
329 if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
330 ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
331 PJ_SUCCESS, &remainder);
332 } else if (asock->read_type == TYPE_RECV_FROM &&
333 asock->cb.on_data_recvfrom)
334 {
335 ret = (*asock->cb.on_data_recvfrom)(asock, r->pkt, r->size,
336 &r->src_addr,
337 r->src_addr_len,
338 PJ_SUCCESS);
339 }
340
341 /* If callback returns false, we have been destroyed! */
342 if (!ret)
343 return;
344
345 /* Only stream oriented socket may leave data in the packet */
346 if (asock->stream_oriented) {
347 r->size = remainder;
348 } else {
349 r->size = 0;
350 }
351
352 } else if (bytes_read <= 0) {
353
354 pj_size_t remainder;
355 pj_bool_t ret;
356
357 if (bytes_read == 0) {
358 /* For stream/connection oriented socket, this means the
359 * connection has been closed. For datagram sockets, it means
360 * we've received datagram with zero length.
361 */
362 if (asock->stream_oriented)
363 status = PJ_EEOF;
364 else
365 status = PJ_SUCCESS;
366 } else {
367 /* This means we've got an error. If this is stream/connection
368 * oriented, it means connection has been closed. For datagram
369 * sockets, it means we've got some error (e.g. EWOULDBLOCK).
370 */
371 status = -bytes_read;
372 }
373
374 /* Set default remainder to zero */
375 remainder = 0;
376
377 /* And return value to TRUE */
378 ret = PJ_TRUE;
379
380 /* Notify callback */
381 if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
382 /* For connection oriented socket, we still need to report
383 * the remainder data (if any) to the user to let user do
384 * processing with the remainder data before it closes the
385 * connection.
386 * If there is no remainder data, set the packet to NULL.
387 */
388 ret = (*asock->cb.on_data_read)(asock, (r->size? r->pkt:NULL),
389 r->size, status, &remainder);
390
391 } else if (asock->read_type == TYPE_RECV_FROM &&
392 asock->cb.on_data_recvfrom)
393 {
394 /* This would always be datagram oriented hence there's
395 * nothing in the packet. We can't be sure if there will be
396 * anything useful in the source_addr, so just put NULL
397 * there too.
398 */
399 ret = (*asock->cb.on_data_recvfrom)(asock, NULL, 0,
400 NULL, 0, status);
401 }
402
403 /* If callback returns false, we have been destroyed! */
404 if (!ret)
405 return;
406
407 /* Only stream oriented socket may leave data in the packet */
408 if (asock->stream_oriented) {
409 r->size = remainder;
410 } else {
411 r->size = 0;
412 }
413 }
414
415 /* Read next data. We limit ourselves to processing max_loop immediate
416 * data, so when the loop counter has exceeded this value, force the
417 * read()/recvfrom() to return pending operation to allow the program
418 * to do other jobs.
419 */
420 bytes_read = r->max_size - r->size;
421 flags = asock->read_flags;
422 if (++loop >= asock->max_loop)
423 flags |= PJ_IOQUEUE_ALWAYS_ASYNC;
424
425 if (asock->read_type == TYPE_RECV) {
426 status = pj_ioqueue_recv(key, op_key, r->pkt + r->size,
427 &bytes_read, flags);
428 } else {
429 r->src_addr_len = sizeof(r->src_addr);
430 status = pj_ioqueue_recvfrom(key, op_key, r->pkt + r->size,
431 &bytes_read, flags,
432 &r->src_addr, &r->src_addr_len);
433 }
434
435 } while (status != PJ_EPENDING && status != PJ_ECANCELLED);
436
437}
438
439
440PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock,
441 pj_ioqueue_op_key_t *send_key,
442 const void *data,
443 pj_ssize_t *size,
444 unsigned flags)
445{
446 PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL);
447
448 return pj_ioqueue_send(asock->key, send_key, data, size, flags);
449}
450
451
452PJ_DEF(pj_status_t) pj_activesock_sendto( pj_activesock_t *asock,
453 pj_ioqueue_op_key_t *send_key,
454 const void *data,
455 pj_ssize_t *size,
456 unsigned flags,
457 const pj_sockaddr_t *addr,
458 int addr_len)
459{
460 PJ_ASSERT_RETURN(asock && send_key && data && size && addr && addr_len,
461 PJ_EINVAL);
462
463 return pj_ioqueue_sendto(asock->key, send_key, data, size, flags,
464 addr, addr_len);
465}
466
467
468static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
469 pj_ioqueue_op_key_t *op_key,
470 pj_ssize_t bytes_sent)
471{
472 pj_activesock_t *asock;
473
474 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
475
476 if (asock->cb.on_data_sent) {
477 pj_bool_t ret;
478
479 ret = (*asock->cb.on_data_sent)(asock, op_key, bytes_sent);
480
481 /* If callback returns false, we have been destroyed! */
482 if (!ret)
483 return;
484 }
485}
486
487
488PJ_DEF(pj_status_t) pj_activesock_start_accept(pj_activesock_t *asock,
489 pj_pool_t *pool)
490{
491 unsigned i;
492
493 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
494 PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP);
495
496 asock->accept_op = (struct accept_op*)
497 pj_pool_calloc(pool, asock->async_count,
498 sizeof(struct accept_op));
499 for (i=0; i<asock->async_count; ++i) {
500 struct accept_op *a = &asock->accept_op[i];
501 pj_status_t status;
502
503 do {
504 a->new_sock = PJ_INVALID_SOCKET;
505 a->rem_addr_len = sizeof(a->rem_addr);
506
507 status = pj_ioqueue_accept(asock->key, &a->op_key, &a->new_sock,
508 NULL, &a->rem_addr, &a->rem_addr_len);
509 if (status == PJ_SUCCESS) {
510 /* We've got immediate connection. Not sure if it's a good
511 * idea to call the callback now (probably application will
512 * not be prepared to process it), so lets just silently
513 * close the socket.
514 */
515 pj_sock_close(a->new_sock);
516 }
517 } while (status == PJ_SUCCESS);
518
519 if (status != PJ_EPENDING) {
520 return status;
521 }
522 }
523
524 return PJ_SUCCESS;
525}
526
527
528static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
529 pj_ioqueue_op_key_t *op_key,
530 pj_sock_t new_sock,
531 pj_status_t status)
532{
533 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
534 struct accept_op *accept_op = (struct accept_op*) op_key;
535
536 do {
537 if (status==PJ_SUCCESS && asock->cb.on_accept_complete) {
538 pj_bool_t ret;
539
540 /* Notify callback */
541 ret = (*asock->cb.on_accept_complete)(asock, new_sock,
542 &accept_op->rem_addr,
543 accept_op->rem_addr_len);
544
545 /* If callback returns false, we have been destroyed! */
546 if (!ret)
547 return;
548
549 } else if (status==PJ_SUCCESS) {
550 /* Application doesn't handle the new socket, we need to
551 * close it to avoid resource leak.
552 */
553 pj_sock_close(accept_op->new_sock);
554 }
555
556 /* Prepare next accept() */
557 accept_op->new_sock = PJ_INVALID_SOCKET;
558 accept_op->rem_addr_len = sizeof(accept_op->rem_addr);
559
560 status = pj_ioqueue_accept(asock->key, op_key, &accept_op->new_sock,
561 NULL, &accept_op->rem_addr,
562 &accept_op->rem_addr_len);
563
564 } while (status != PJ_EPENDING && status != PJ_ECANCELLED);
565}
566
567
568PJ_DEF(pj_status_t) pj_activesock_start_connect( pj_activesock_t *asock,
569 pj_pool_t *pool,
570 const pj_sockaddr_t *remaddr,
571 int addr_len)
572{
573 PJ_UNUSED_ARG(pool);
574 return pj_ioqueue_connect(asock->key, remaddr, addr_len);
575}
576
577
578static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
579 pj_status_t status)
580{
581 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
582
583 if (asock->cb.on_connect_complete) {
584 pj_bool_t ret;
585
586 ret = (*asock->cb.on_connect_complete)(asock, status);
587
588 if (!ret) {
589 /* We've been destroyed */
590 return;
591 }
592 }
593}
594