blob: 45173c5b50859d5f0e2e579cd79fbae72d59663f [file] [log] [blame]
Benny Prijono4bac2c12008-05-11 18:12:16 +00001/* $Id$ */
2/*
Benny Prijono32177c02008-06-20 22:44:47 +00003 * Copyright (C)2003-2008 Benny Prijono <benny@prijono.org>
Benny Prijono4bac2c12008-05-11 18:12:16 +00004 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 */
19#include <pj/activesock.h>
20#include <pj/assert.h>
21#include <pj/errno.h>
22#include <pj/pool.h>
23#include <pj/sock.h>
24#include <pj/string.h>
25
26#define PJ_ACTIVESOCK_MAX_LOOP 50
27
28
29enum read_type
30{
31 TYPE_NONE,
32 TYPE_RECV,
33 TYPE_RECV_FROM
34};
35
36struct read_op
37{
38 pj_ioqueue_op_key_t op_key;
39 pj_uint8_t *pkt;
40 unsigned max_size;
41 pj_size_t size;
42 pj_sockaddr src_addr;
43 int src_addr_len;
44};
45
46struct accept_op
47{
48 pj_ioqueue_op_key_t op_key;
49 pj_sock_t new_sock;
50 pj_sockaddr rem_addr;
51 int rem_addr_len;
52};
53
54struct pj_activesock_t
55{
56 pj_ioqueue_key_t *key;
57 pj_bool_t stream_oriented;
58 pj_ioqueue_t *ioqueue;
59 void *user_data;
60 unsigned async_count;
61 unsigned max_loop;
62 pj_activesock_cb cb;
63
64 struct read_op *read_op;
65 pj_uint32_t read_flags;
66 enum read_type read_type;
67
68 struct accept_op *accept_op;
69};
70
71
72static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
73 pj_ioqueue_op_key_t *op_key,
74 pj_ssize_t bytes_read);
75static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
76 pj_ioqueue_op_key_t *op_key,
77 pj_ssize_t bytes_sent);
Benny Prijono1dd54202008-07-25 10:45:34 +000078#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +000079static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
80 pj_ioqueue_op_key_t *op_key,
81 pj_sock_t sock,
82 pj_status_t status);
83static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
84 pj_status_t status);
Benny Prijono1dd54202008-07-25 10:45:34 +000085#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +000086
87PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg)
88{
89 pj_bzero(cfg, sizeof(*cfg));
90 cfg->async_cnt = 1;
91 cfg->concurrency = -1;
92}
93
94
95PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool,
96 pj_sock_t sock,
97 int sock_type,
98 const pj_activesock_cfg *opt,
99 pj_ioqueue_t *ioqueue,
100 const pj_activesock_cb *cb,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000101 void *user_data,
Benny Prijono4bac2c12008-05-11 18:12:16 +0000102 pj_activesock_t **p_asock)
103{
104 pj_activesock_t *asock;
105 pj_ioqueue_callback ioq_cb;
106 pj_status_t status;
107
108 PJ_ASSERT_RETURN(pool && ioqueue && cb && p_asock, PJ_EINVAL);
109 PJ_ASSERT_RETURN(sock!=0 && sock!=PJ_INVALID_SOCKET, PJ_EINVAL);
110 PJ_ASSERT_RETURN(sock_type==pj_SOCK_STREAM() ||
111 sock_type==pj_SOCK_DGRAM(), PJ_EINVAL);
112 PJ_ASSERT_RETURN(!opt || opt->async_cnt >= 1, PJ_EINVAL);
113
114 asock = PJ_POOL_ZALLOC_T(pool, pj_activesock_t);
115 asock->ioqueue = ioqueue;
116 asock->stream_oriented = (sock_type == pj_SOCK_STREAM());
117 asock->async_count = (opt? opt->async_cnt : 1);
118 asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP;
Benny Prijonoea8e4362008-06-06 14:12:23 +0000119 asock->user_data = user_data;
Benny Prijono4bac2c12008-05-11 18:12:16 +0000120 pj_memcpy(&asock->cb, cb, sizeof(*cb));
121
122 pj_bzero(&ioq_cb, sizeof(ioq_cb));
123 ioq_cb.on_read_complete = &ioqueue_on_read_complete;
124 ioq_cb.on_write_complete = &ioqueue_on_write_complete;
Benny Prijono1dd54202008-07-25 10:45:34 +0000125#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +0000126 ioq_cb.on_connect_complete = &ioqueue_on_connect_complete;
127 ioq_cb.on_accept_complete = &ioqueue_on_accept_complete;
Benny Prijono1dd54202008-07-25 10:45:34 +0000128#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +0000129
130 status = pj_ioqueue_register_sock(pool, ioqueue, sock, asock,
131 &ioq_cb, &asock->key);
132 if (status != PJ_SUCCESS) {
133 pj_activesock_close(asock);
134 return status;
135 }
136
137 if (opt && opt->concurrency >= 0) {
138 pj_ioqueue_set_concurrency(asock->key, opt->concurrency);
139 }
140
141 *p_asock = asock;
142 return PJ_SUCCESS;
143}
144
145
146PJ_DEF(pj_status_t) pj_activesock_create_udp( pj_pool_t *pool,
147 const pj_sockaddr *addr,
148 const pj_activesock_cfg *opt,
149 pj_ioqueue_t *ioqueue,
150 const pj_activesock_cb *cb,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000151 void *user_data,
Benny Prijono4bac2c12008-05-11 18:12:16 +0000152 pj_activesock_t **p_asock,
153 pj_sockaddr *bound_addr)
154{
155 pj_sock_t sock_fd;
156 pj_sockaddr default_addr;
157 pj_status_t status;
158
159 if (addr == NULL) {
160 pj_sockaddr_init(pj_AF_INET(), &default_addr, NULL, 0);
161 addr = &default_addr;
162 }
163
164 status = pj_sock_socket(addr->addr.sa_family, pj_SOCK_DGRAM(), 0,
165 &sock_fd);
166 if (status != PJ_SUCCESS) {
167 return status;
168 }
169
170 status = pj_sock_bind(sock_fd, addr, pj_sockaddr_get_len(addr));
171 if (status != PJ_SUCCESS) {
172 pj_sock_close(sock_fd);
173 return status;
174 }
175
176 status = pj_activesock_create(pool, sock_fd, pj_SOCK_DGRAM(), opt,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000177 ioqueue, cb, user_data, p_asock);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000178 if (status != PJ_SUCCESS) {
179 pj_sock_close(sock_fd);
180 return status;
181 }
182
183 if (bound_addr) {
184 int addr_len = sizeof(*bound_addr);
185 status = pj_sock_getsockname(sock_fd, bound_addr, &addr_len);
186 if (status != PJ_SUCCESS) {
187 pj_activesock_close(*p_asock);
188 return status;
189 }
190 }
191
192 return PJ_SUCCESS;
193}
194
195
196PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock)
197{
198 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
199 if (asock->key) {
200 pj_ioqueue_unregister(asock->key);
201 asock->key = NULL;
202 }
203 return PJ_SUCCESS;
204}
205
206
207PJ_DEF(pj_status_t) pj_activesock_set_user_data( pj_activesock_t *asock,
208 void *user_data)
209{
210 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
211 asock->user_data = user_data;
212 return PJ_SUCCESS;
213}
214
215
216PJ_DEF(void*) pj_activesock_get_user_data(pj_activesock_t *asock)
217{
218 PJ_ASSERT_RETURN(asock, NULL);
219 return asock->user_data;
220}
221
222
223PJ_DEF(pj_status_t) pj_activesock_start_read(pj_activesock_t *asock,
224 pj_pool_t *pool,
225 unsigned buff_size,
226 pj_uint32_t flags)
227{
228 unsigned i;
229 pj_status_t status;
230
231 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
232 PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
233 PJ_ASSERT_RETURN(asock->read_op == NULL, PJ_EINVALIDOP);
234
235 asock->read_op = (struct read_op*)
236 pj_pool_calloc(pool, asock->async_count,
237 sizeof(struct read_op));
238 asock->read_type = TYPE_RECV;
239 asock->read_flags = flags;
240
241 for (i=0; i<asock->async_count; ++i) {
242 struct read_op *r = &asock->read_op[i];
243 pj_ssize_t size_to_read;
244
Benny Prijonoc67f8852008-05-20 08:51:03 +0000245 r->pkt = (pj_uint8_t*)pj_pool_alloc(pool, buff_size);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000246 r->max_size = size_to_read = buff_size;
247
248 status = pj_ioqueue_recv(asock->key, &r->op_key, r->pkt, &size_to_read,
249 PJ_IOQUEUE_ALWAYS_ASYNC | flags);
250 PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
251
252 if (status != PJ_EPENDING)
253 return status;
254 }
255
256 return PJ_SUCCESS;
257}
258
259
260PJ_DEF(pj_status_t) pj_activesock_start_recvfrom(pj_activesock_t *asock,
261 pj_pool_t *pool,
262 unsigned buff_size,
263 pj_uint32_t flags)
264{
265 unsigned i;
266 pj_status_t status;
267
268 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
269 PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
270
271 asock->read_op = (struct read_op*)
272 pj_pool_calloc(pool, asock->async_count,
273 sizeof(struct read_op));
274 asock->read_type = TYPE_RECV_FROM;
275 asock->read_flags = flags;
276
277 for (i=0; i<asock->async_count; ++i) {
278 struct read_op *r = &asock->read_op[i];
279 pj_ssize_t size_to_read;
280
Benny Prijonoc67f8852008-05-20 08:51:03 +0000281 r->pkt = (pj_uint8_t*) pj_pool_alloc(pool, buff_size);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000282 r->max_size = size_to_read = buff_size;
283 r->src_addr_len = sizeof(r->src_addr);
284
285 status = pj_ioqueue_recvfrom(asock->key, &r->op_key, r->pkt,
286 &size_to_read,
287 PJ_IOQUEUE_ALWAYS_ASYNC | flags,
288 &r->src_addr, &r->src_addr_len);
289 PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
290
291 if (status != PJ_EPENDING)
292 return status;
293 }
294
295 return PJ_SUCCESS;
296}
297
298
299static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
300 pj_ioqueue_op_key_t *op_key,
301 pj_ssize_t bytes_read)
302{
303 pj_activesock_t *asock;
304 struct read_op *r = (struct read_op*)op_key;
305 unsigned loop = 0;
306 pj_status_t status;
307
308 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
309
310 do {
311 unsigned flags;
312
313 if (bytes_read > 0) {
314 /*
315 * We've got new data.
316 */
317 pj_size_t remainder;
318 pj_bool_t ret;
319
320 /* Append this new data to existing data. If socket is stream
321 * oriented, user might have left some data in the buffer.
322 * Otherwise if socket is datagram there will be nothing in
323 * existing packet hence the packet will contain only the new
324 * packet.
325 */
326 r->size += bytes_read;
327
328 /* Set default remainder to zero */
329 remainder = 0;
330
331 /* And return value to TRUE */
332 ret = PJ_TRUE;
333
334 /* Notify callback */
335 if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
336 ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
337 PJ_SUCCESS, &remainder);
338 } else if (asock->read_type == TYPE_RECV_FROM &&
339 asock->cb.on_data_recvfrom)
340 {
341 ret = (*asock->cb.on_data_recvfrom)(asock, r->pkt, r->size,
342 &r->src_addr,
343 r->src_addr_len,
344 PJ_SUCCESS);
345 }
346
347 /* If callback returns false, we have been destroyed! */
348 if (!ret)
349 return;
350
351 /* Only stream oriented socket may leave data in the packet */
352 if (asock->stream_oriented) {
353 r->size = remainder;
354 } else {
355 r->size = 0;
356 }
357
358 } else if (bytes_read <= 0) {
359
360 pj_size_t remainder;
361 pj_bool_t ret;
362
363 if (bytes_read == 0) {
364 /* For stream/connection oriented socket, this means the
365 * connection has been closed. For datagram sockets, it means
366 * we've received datagram with zero length.
367 */
368 if (asock->stream_oriented)
369 status = PJ_EEOF;
370 else
371 status = PJ_SUCCESS;
372 } else {
373 /* This means we've got an error. If this is stream/connection
374 * oriented, it means connection has been closed. For datagram
375 * sockets, it means we've got some error (e.g. EWOULDBLOCK).
376 */
377 status = -bytes_read;
378 }
379
380 /* Set default remainder to zero */
381 remainder = 0;
382
383 /* And return value to TRUE */
384 ret = PJ_TRUE;
385
386 /* Notify callback */
387 if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
388 /* For connection oriented socket, we still need to report
389 * the remainder data (if any) to the user to let user do
390 * processing with the remainder data before it closes the
391 * connection.
392 * If there is no remainder data, set the packet to NULL.
393 */
394 ret = (*asock->cb.on_data_read)(asock, (r->size? r->pkt:NULL),
395 r->size, status, &remainder);
396
397 } else if (asock->read_type == TYPE_RECV_FROM &&
398 asock->cb.on_data_recvfrom)
399 {
400 /* This would always be datagram oriented hence there's
401 * nothing in the packet. We can't be sure if there will be
402 * anything useful in the source_addr, so just put NULL
403 * there too.
404 */
405 ret = (*asock->cb.on_data_recvfrom)(asock, NULL, 0,
406 NULL, 0, status);
407 }
408
409 /* If callback returns false, we have been destroyed! */
410 if (!ret)
411 return;
412
413 /* Only stream oriented socket may leave data in the packet */
414 if (asock->stream_oriented) {
415 r->size = remainder;
416 } else {
417 r->size = 0;
418 }
419 }
420
421 /* Read next data. We limit ourselves to processing max_loop immediate
422 * data, so when the loop counter has exceeded this value, force the
423 * read()/recvfrom() to return pending operation to allow the program
424 * to do other jobs.
425 */
426 bytes_read = r->max_size - r->size;
427 flags = asock->read_flags;
428 if (++loop >= asock->max_loop)
429 flags |= PJ_IOQUEUE_ALWAYS_ASYNC;
430
431 if (asock->read_type == TYPE_RECV) {
432 status = pj_ioqueue_recv(key, op_key, r->pkt + r->size,
433 &bytes_read, flags);
434 } else {
435 r->src_addr_len = sizeof(r->src_addr);
436 status = pj_ioqueue_recvfrom(key, op_key, r->pkt + r->size,
437 &bytes_read, flags,
438 &r->src_addr, &r->src_addr_len);
439 }
440
441 } while (status != PJ_EPENDING && status != PJ_ECANCELLED);
442
443}
444
445
446PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock,
447 pj_ioqueue_op_key_t *send_key,
448 const void *data,
449 pj_ssize_t *size,
450 unsigned flags)
451{
452 PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL);
453
454 return pj_ioqueue_send(asock->key, send_key, data, size, flags);
455}
456
457
458PJ_DEF(pj_status_t) pj_activesock_sendto( pj_activesock_t *asock,
459 pj_ioqueue_op_key_t *send_key,
460 const void *data,
461 pj_ssize_t *size,
462 unsigned flags,
463 const pj_sockaddr_t *addr,
464 int addr_len)
465{
466 PJ_ASSERT_RETURN(asock && send_key && data && size && addr && addr_len,
467 PJ_EINVAL);
468
469 return pj_ioqueue_sendto(asock->key, send_key, data, size, flags,
470 addr, addr_len);
471}
472
473
474static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
475 pj_ioqueue_op_key_t *op_key,
476 pj_ssize_t bytes_sent)
477{
478 pj_activesock_t *asock;
479
480 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
481
482 if (asock->cb.on_data_sent) {
483 pj_bool_t ret;
484
485 ret = (*asock->cb.on_data_sent)(asock, op_key, bytes_sent);
486
487 /* If callback returns false, we have been destroyed! */
488 if (!ret)
489 return;
490 }
491}
492
Benny Prijono1dd54202008-07-25 10:45:34 +0000493#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +0000494PJ_DEF(pj_status_t) pj_activesock_start_accept(pj_activesock_t *asock,
495 pj_pool_t *pool)
496{
497 unsigned i;
498
499 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
500 PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP);
501
502 asock->accept_op = (struct accept_op*)
503 pj_pool_calloc(pool, asock->async_count,
504 sizeof(struct accept_op));
505 for (i=0; i<asock->async_count; ++i) {
506 struct accept_op *a = &asock->accept_op[i];
507 pj_status_t status;
508
509 do {
510 a->new_sock = PJ_INVALID_SOCKET;
511 a->rem_addr_len = sizeof(a->rem_addr);
512
513 status = pj_ioqueue_accept(asock->key, &a->op_key, &a->new_sock,
514 NULL, &a->rem_addr, &a->rem_addr_len);
515 if (status == PJ_SUCCESS) {
516 /* We've got immediate connection. Not sure if it's a good
517 * idea to call the callback now (probably application will
518 * not be prepared to process it), so lets just silently
519 * close the socket.
520 */
521 pj_sock_close(a->new_sock);
522 }
523 } while (status == PJ_SUCCESS);
524
525 if (status != PJ_EPENDING) {
526 return status;
527 }
528 }
529
530 return PJ_SUCCESS;
531}
532
533
534static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
535 pj_ioqueue_op_key_t *op_key,
536 pj_sock_t new_sock,
537 pj_status_t status)
538{
539 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
540 struct accept_op *accept_op = (struct accept_op*) op_key;
541
542 do {
543 if (status==PJ_SUCCESS && asock->cb.on_accept_complete) {
544 pj_bool_t ret;
545
546 /* Notify callback */
547 ret = (*asock->cb.on_accept_complete)(asock, new_sock,
548 &accept_op->rem_addr,
549 accept_op->rem_addr_len);
550
551 /* If callback returns false, we have been destroyed! */
552 if (!ret)
553 return;
554
555 } else if (status==PJ_SUCCESS) {
556 /* Application doesn't handle the new socket, we need to
557 * close it to avoid resource leak.
558 */
559 pj_sock_close(accept_op->new_sock);
560 }
561
562 /* Prepare next accept() */
563 accept_op->new_sock = PJ_INVALID_SOCKET;
564 accept_op->rem_addr_len = sizeof(accept_op->rem_addr);
565
566 status = pj_ioqueue_accept(asock->key, op_key, &accept_op->new_sock,
567 NULL, &accept_op->rem_addr,
568 &accept_op->rem_addr_len);
569
570 } while (status != PJ_EPENDING && status != PJ_ECANCELLED);
571}
572
573
574PJ_DEF(pj_status_t) pj_activesock_start_connect( pj_activesock_t *asock,
575 pj_pool_t *pool,
576 const pj_sockaddr_t *remaddr,
577 int addr_len)
578{
579 PJ_UNUSED_ARG(pool);
580 return pj_ioqueue_connect(asock->key, remaddr, addr_len);
581}
582
Benny Prijono4bac2c12008-05-11 18:12:16 +0000583static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
584 pj_status_t status)
585{
586 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
587
588 if (asock->cb.on_connect_complete) {
589 pj_bool_t ret;
590
591 ret = (*asock->cb.on_connect_complete)(asock, status);
592
593 if (!ret) {
594 /* We've been destroyed */
595 return;
596 }
597 }
598}
Benny Prijono1dd54202008-07-25 10:45:34 +0000599#endif /* PJ_HAS_TCP */
Benny Prijono4bac2c12008-05-11 18:12:16 +0000600