blob: 0f322834615514d96a2d816ce237bdb2f2d7a131 [file] [log] [blame]
Alexandre Lision67916dd2014-01-24 13:33:04 -05001/* $Id$ */
2/*
3 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 */
20#include <pj/activesock.h>
21#include <pj/compat/socket.h>
22#include <pj/assert.h>
23#include <pj/errno.h>
24#include <pj/log.h>
25#include <pj/pool.h>
26#include <pj/sock.h>
27#include <pj/string.h>
28
/* The CFNetwork dependency and the flag below only exist when iOS
 * multitasking support is compiled in.
 */
29#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
30 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
31# include <CFNetwork/CFNetwork.h>
32
/* File-wide switch for iOS background support. It is consulted before a
 * VoIP read stream is created for a socket and is changed through
 * pj_activesock_enable_iphone_os_bg(). Enabled by default.
 */
33 static pj_bool_t ios_bg_support = PJ_TRUE;
34#endif
35
/* Value copied into every active socket's max_loop: once the read
 * completion handler has looped this many times, the next read is forced
 * to complete asynchronously.
 */
36#define PJ_ACTIVESOCK_MAX_LOOP 50
37
38
/* Kind of read operation that has been started on an active socket. */
39enum read_type
40{
41 TYPE_NONE, /* no read has been started yet */
42 TYPE_RECV, /* started with pj_activesock_start_read*() */
43 TYPE_RECV_FROM /* started with pj_activesock_start_recvfrom*() */
44};
45
/* Bit flags stored in pj_activesock_t::shutdown; pj_activesock_close()
 * sets both SHUT_RX and SHUT_TX.
 */
46enum shutdown_dir
47{
48 SHUT_NONE = 0, /* nothing has been shut down */
49 SHUT_RX = 1, /* read completions are ignored */
50 SHUT_TX = 2 /* sends are rejected, write completions ignored */
51};
52
/* State of one outstanding read operation. The read completion handler
 * casts the ioqueue op_key pointer back to struct read_op, so op_key must
 * remain the first member.
 */
53struct read_op
54{
55 pj_ioqueue_op_key_t op_key; /* key handed to the ioqueue */
56 pj_uint8_t *pkt; /* start of the receive buffer */
57 unsigned max_size; /* capacity of pkt in bytes */
58 pj_size_t size; /* bytes currently held in pkt */
59 pj_sockaddr src_addr; /* sender address filled by recvfrom */
60 int src_addr_len; /* length of src_addr */
61};
62
/* State of one outstanding accept operation. The accept completion handler
 * casts the ioqueue op_key pointer back to struct accept_op, so op_key must
 * remain the first member.
 */
63struct accept_op
64{
65 pj_ioqueue_op_key_t op_key; /* key handed to the ioqueue */
66 pj_sock_t new_sock; /* receives the accepted socket */
67 pj_sockaddr rem_addr; /* receives the remote address */
68 int rem_addr_len; /* length of rem_addr */
69};
70
/* Progress record for a send that the OS accepted only partially while
 * whole_data is enabled.
 */
71struct send_data
72{
73 pj_uint8_t *data; /* caller's buffer being sent */
74 pj_ssize_t len; /* total number of bytes to send */
75 pj_ssize_t sent; /* number of bytes sent so far */
76 unsigned flags; /* flags to reuse for follow-up sends */
77};
78
/* The active socket object: a socket registered to an ioqueue together
 * with the application callbacks and the bookkeeping for its pending
 * read, send and accept operations.
 */
79struct pj_activesock_t
80{
81 pj_ioqueue_key_t *key; /* ioqueue registration, NULL once closed */
82 pj_bool_t stream_oriented; /* true when created with SOCK_STREAM */
83 pj_bool_t whole_data; /* keep sending until all data is sent */
84 pj_ioqueue_t *ioqueue; /* ioqueue given at creation */
85 void *user_data; /* opaque application data */
86 unsigned async_count; /* number of parallel read/accept ops */
87 unsigned shutdown; /* bitmask of enum shutdown_dir */
88 unsigned max_loop; /* loop limit in the read handler */
89 pj_activesock_cb cb; /* copy of the application callbacks */
90#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
91 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
92 int bg_setting; /* per-socket background mode setting */
93 pj_sock_t sock; /* socket handle used for the stream */
94 CFReadStreamRef readStream; /* VoIP read stream, NULL if none */
95#endif
96
/* Number of consecutive accept completions that reported the same error,
 * and that error.
 */
97 unsigned err_counter;
98 pj_status_t last_err;
99
/* Single record used to continue a partially sent whole_data send. */
100 struct send_data send_data;
101
/* Array of async_count read operations, plus the flags and kind of read
 * that was started.
 */
102 struct read_op *read_op;
103 pj_uint32_t read_flags;
104 enum read_type read_type;
105
/* Array of async_count accept operations. */
106 struct accept_op *accept_op;
107};
108
109
/* Forward declarations of the ioqueue completion handlers installed by
 * pj_activesock_create(). The accept and connect handlers exist only when
 * TCP support is compiled in.
 */
110static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
111 pj_ioqueue_op_key_t *op_key,
112 pj_ssize_t bytes_read);
113static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
114 pj_ioqueue_op_key_t *op_key,
115 pj_ssize_t bytes_sent);
116#if PJ_HAS_TCP
117static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
118 pj_ioqueue_op_key_t *op_key,
119 pj_sock_t sock,
120 pj_status_t status);
121static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
122 pj_status_t status);
123#endif
124
125PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg)
126{
127 pj_bzero(cfg, sizeof(*cfg));
128 cfg->async_cnt = 1;
129 cfg->concurrency = -1;
130 cfg->whole_data = PJ_TRUE;
131}
132
133#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
134 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
135static void activesock_destroy_iphone_os_stream(pj_activesock_t *asock)
136{
137 if (asock->readStream) {
138 CFReadStreamClose(asock->readStream);
139 CFRelease(asock->readStream);
140 asock->readStream = NULL;
141 }
142}
143
144static void activesock_create_iphone_os_stream(pj_activesock_t *asock)
145{
146 if (ios_bg_support && asock->bg_setting && asock->stream_oriented) {
147 activesock_destroy_iphone_os_stream(asock);
148
149 CFStreamCreatePairWithSocket(kCFAllocatorDefault, asock->sock,
150 &asock->readStream, NULL);
151
152 if (!asock->readStream ||
153 CFReadStreamSetProperty(asock->readStream,
154 kCFStreamNetworkServiceType,
155 kCFStreamNetworkServiceTypeVoIP)
156 != TRUE ||
157 CFReadStreamOpen(asock->readStream) != TRUE)
158 {
159 PJ_LOG(2,("", "Failed to configure TCP transport for VoIP "
160 "usage. Background mode will not be supported."));
161
162 activesock_destroy_iphone_os_stream(asock);
163 }
164 }
165}
166
167
168PJ_DEF(void) pj_activesock_set_iphone_os_bg(pj_activesock_t *asock,
169 int val)
170{
171 asock->bg_setting = val;
172 if (asock->bg_setting)
173 activesock_create_iphone_os_stream(asock);
174 else
175 activesock_destroy_iphone_os_stream(asock);
176}
177
178PJ_DEF(void) pj_activesock_enable_iphone_os_bg(pj_bool_t val)
179{
180 ios_bg_support = val;
181}
182#endif
183
184PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool,
185 pj_sock_t sock,
186 int sock_type,
187 const pj_activesock_cfg *opt,
188 pj_ioqueue_t *ioqueue,
189 const pj_activesock_cb *cb,
190 void *user_data,
191 pj_activesock_t **p_asock)
192{
193 pj_activesock_t *asock;
194 pj_ioqueue_callback ioq_cb;
195 pj_status_t status;
196
197 PJ_ASSERT_RETURN(pool && ioqueue && cb && p_asock, PJ_EINVAL);
198 PJ_ASSERT_RETURN(sock!=0 && sock!=PJ_INVALID_SOCKET, PJ_EINVAL);
199 PJ_ASSERT_RETURN(sock_type==pj_SOCK_STREAM() ||
200 sock_type==pj_SOCK_DGRAM(), PJ_EINVAL);
201 PJ_ASSERT_RETURN(!opt || opt->async_cnt >= 1, PJ_EINVAL);
202
203 asock = PJ_POOL_ZALLOC_T(pool, pj_activesock_t);
204 asock->ioqueue = ioqueue;
205 asock->stream_oriented = (sock_type == pj_SOCK_STREAM());
206 asock->async_count = (opt? opt->async_cnt : 1);
207 asock->whole_data = (opt? opt->whole_data : 1);
208 asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP;
209 asock->user_data = user_data;
210 pj_memcpy(&asock->cb, cb, sizeof(*cb));
211
212 pj_bzero(&ioq_cb, sizeof(ioq_cb));
213 ioq_cb.on_read_complete = &ioqueue_on_read_complete;
214 ioq_cb.on_write_complete = &ioqueue_on_write_complete;
215#if PJ_HAS_TCP
216 ioq_cb.on_connect_complete = &ioqueue_on_connect_complete;
217 ioq_cb.on_accept_complete = &ioqueue_on_accept_complete;
218#endif
219
220 status = pj_ioqueue_register_sock2(pool, ioqueue, sock,
221 (opt? opt->grp_lock : NULL),
222 asock, &ioq_cb, &asock->key);
223 if (status != PJ_SUCCESS) {
224 pj_activesock_close(asock);
225 return status;
226 }
227
228 if (asock->whole_data) {
229 /* Must disable concurrency otherwise there is a race condition */
230 pj_ioqueue_set_concurrency(asock->key, 0);
231 } else if (opt && opt->concurrency >= 0) {
232 pj_ioqueue_set_concurrency(asock->key, opt->concurrency);
233 }
234
235#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
236 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
237 asock->sock = sock;
238 asock->bg_setting = PJ_ACTIVESOCK_TCP_IPHONE_OS_BG;
239#endif
240
241 *p_asock = asock;
242 return PJ_SUCCESS;
243}
244
245
246PJ_DEF(pj_status_t) pj_activesock_create_udp( pj_pool_t *pool,
247 const pj_sockaddr *addr,
248 const pj_activesock_cfg *opt,
249 pj_ioqueue_t *ioqueue,
250 const pj_activesock_cb *cb,
251 void *user_data,
252 pj_activesock_t **p_asock,
253 pj_sockaddr *bound_addr)
254{
255 pj_sock_t sock_fd;
256 pj_sockaddr default_addr;
257 pj_status_t status;
258
259 if (addr == NULL) {
260 pj_sockaddr_init(pj_AF_INET(), &default_addr, NULL, 0);
261 addr = &default_addr;
262 }
263
264 status = pj_sock_socket(addr->addr.sa_family, pj_SOCK_DGRAM(), 0,
265 &sock_fd);
266 if (status != PJ_SUCCESS) {
267 return status;
268 }
269
270 status = pj_sock_bind(sock_fd, addr, pj_sockaddr_get_len(addr));
271 if (status != PJ_SUCCESS) {
272 pj_sock_close(sock_fd);
273 return status;
274 }
275
276 status = pj_activesock_create(pool, sock_fd, pj_SOCK_DGRAM(), opt,
277 ioqueue, cb, user_data, p_asock);
278 if (status != PJ_SUCCESS) {
279 pj_sock_close(sock_fd);
280 return status;
281 }
282
283 if (bound_addr) {
284 int addr_len = sizeof(*bound_addr);
285 status = pj_sock_getsockname(sock_fd, bound_addr, &addr_len);
286 if (status != PJ_SUCCESS) {
287 pj_activesock_close(*p_asock);
288 return status;
289 }
290 }
291
292 return PJ_SUCCESS;
293}
294
295PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock)
296{
297 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
298 asock->shutdown = SHUT_RX | SHUT_TX;
299 if (asock->key) {
300#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
301 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
302 activesock_destroy_iphone_os_stream(asock);
303#endif
304
305 pj_ioqueue_unregister(asock->key);
306 asock->key = NULL;
307 }
308 return PJ_SUCCESS;
309}
310
311
312PJ_DEF(pj_status_t) pj_activesock_set_user_data( pj_activesock_t *asock,
313 void *user_data)
314{
315 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
316 asock->user_data = user_data;
317 return PJ_SUCCESS;
318}
319
320
321PJ_DEF(void*) pj_activesock_get_user_data(pj_activesock_t *asock)
322{
323 PJ_ASSERT_RETURN(asock, NULL);
324 return asock->user_data;
325}
326
327
328PJ_DEF(pj_status_t) pj_activesock_start_read(pj_activesock_t *asock,
329 pj_pool_t *pool,
330 unsigned buff_size,
331 pj_uint32_t flags)
332{
333 void **readbuf;
334 unsigned i;
335
336 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
337
338 readbuf = (void**) pj_pool_calloc(pool, asock->async_count,
339 sizeof(void*));
340
341 for (i=0; i<asock->async_count; ++i) {
342 readbuf[i] = pj_pool_alloc(pool, buff_size);
343 }
344
345 return pj_activesock_start_read2(asock, pool, buff_size, readbuf, flags);
346}
347
348
349PJ_DEF(pj_status_t) pj_activesock_start_read2( pj_activesock_t *asock,
350 pj_pool_t *pool,
351 unsigned buff_size,
352 void *readbuf[],
353 pj_uint32_t flags)
354{
355 unsigned i;
356 pj_status_t status;
357
358 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
359 PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
360 PJ_ASSERT_RETURN(asock->read_op == NULL, PJ_EINVALIDOP);
361
362 asock->read_op = (struct read_op*)
363 pj_pool_calloc(pool, asock->async_count,
364 sizeof(struct read_op));
365 asock->read_type = TYPE_RECV;
366 asock->read_flags = flags;
367
368 for (i=0; i<asock->async_count; ++i) {
369 struct read_op *r = &asock->read_op[i];
370 pj_ssize_t size_to_read;
371
372 r->pkt = (pj_uint8_t*)readbuf[i];
373 size_to_read = r->max_size = buff_size;
374
375 status = pj_ioqueue_recv(asock->key, &r->op_key, r->pkt, &size_to_read,
376 PJ_IOQUEUE_ALWAYS_ASYNC | flags);
377 PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
378
379 if (status != PJ_EPENDING)
380 return status;
381 }
382
383 return PJ_SUCCESS;
384}
385
386
387PJ_DEF(pj_status_t) pj_activesock_start_recvfrom(pj_activesock_t *asock,
388 pj_pool_t *pool,
389 unsigned buff_size,
390 pj_uint32_t flags)
391{
392 void **readbuf;
393 unsigned i;
394
395 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
396
397 readbuf = (void**) pj_pool_calloc(pool, asock->async_count,
398 sizeof(void*));
399
400 for (i=0; i<asock->async_count; ++i) {
401 readbuf[i] = pj_pool_alloc(pool, buff_size);
402 }
403
404 return pj_activesock_start_recvfrom2(asock, pool, buff_size,
405 readbuf, flags);
406}
407
408
409PJ_DEF(pj_status_t) pj_activesock_start_recvfrom2( pj_activesock_t *asock,
410 pj_pool_t *pool,
411 unsigned buff_size,
412 void *readbuf[],
413 pj_uint32_t flags)
414{
415 unsigned i;
416 pj_status_t status;
417
418 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
419 PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
420
421 asock->read_op = (struct read_op*)
422 pj_pool_calloc(pool, asock->async_count,
423 sizeof(struct read_op));
424 asock->read_type = TYPE_RECV_FROM;
425 asock->read_flags = flags;
426
427 for (i=0; i<asock->async_count; ++i) {
428 struct read_op *r = &asock->read_op[i];
429 pj_ssize_t size_to_read;
430
431 r->pkt = (pj_uint8_t*) readbuf[i];
432 size_to_read = r->max_size = buff_size;
433 r->src_addr_len = sizeof(r->src_addr);
434
435 status = pj_ioqueue_recvfrom(asock->key, &r->op_key, r->pkt,
436 &size_to_read,
437 PJ_IOQUEUE_ALWAYS_ASYNC | flags,
438 &r->src_addr, &r->src_addr_len);
439 PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
440
441 if (status != PJ_EPENDING)
442 return status;
443 }
444
445 return PJ_SUCCESS;
446}
447
448
/*
 * ioqueue completion handler for recv()/recvfrom() operations.
 *
 * op_key is the first member of the struct read_op that was queued, so it
 * is cast back to obtain the buffer state. bytes_read is the number of
 * bytes received when positive; otherwise its negation is treated as a
 * status code. Nothing is done when the receive direction has been shut
 * down.
 *
 * Each loop iteration:
 *  - on data, adds bytes_read to the bytes already held in the buffer and
 *    reports the buffer through on_data_read or on_data_recvfrom;
 *  - on zero/negative results other than EWOULDBLOCK and EINPROGRESS (and,
 *    for datagram sockets, ECONNRESET), reports a status instead: PJ_EEOF
 *    for a zero read on a stream socket, PJ_SUCCESS for a zero read on a
 *    datagram socket, otherwise -bytes_read. on_data_recvfrom is not
 *    called when that status is PJ_SUCCESS;
 *  - returns at once if the callback returned false (the active socket is
 *    then considered destroyed and must not be touched);
 *  - for stream sockets keeps 'remainder' bytes in the buffer, for
 *    datagram sockets empties it;
 *  - queues the next receive behind the retained bytes. The loop continues
 *    while receives complete immediately or fail, and stops on PJ_EPENDING
 *    or PJ_ECANCELLED. From the max_loop-th iteration onwards the receive
 *    is forced asynchronous.
 */
449static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
450 pj_ioqueue_op_key_t *op_key,
451 pj_ssize_t bytes_read)
452{
453 pj_activesock_t *asock;
454 struct read_op *r = (struct read_op*)op_key;
455 unsigned loop = 0;
456 pj_status_t status;
457
458 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
459
460 /* Ignore if we've been shutdown */
461 if (asock->shutdown & SHUT_RX)
462 return;
463
464 do {
465 unsigned flags;
466
467 if (bytes_read > 0) {
468 /*
469 * We've got new data.
470 */
471 pj_size_t remainder;
472 pj_bool_t ret;
473
474 /* Append this new data to existing data. If socket is stream
475 * oriented, user might have left some data in the buffer.
476 * Otherwise if socket is datagram there will be nothing in
477 * existing packet hence the packet will contain only the new
478 * packet.
479 */
480 r->size += bytes_read;
481
482 /* Set default remainder to zero */
483 remainder = 0;
484
485 /* And return value to TRUE */
486 ret = PJ_TRUE;
487
488 /* Notify callback */
489 if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
490 ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
491 PJ_SUCCESS, &remainder);
492 } else if (asock->read_type == TYPE_RECV_FROM &&
493 asock->cb.on_data_recvfrom)
494 {
495 ret = (*asock->cb.on_data_recvfrom)(asock, r->pkt, r->size,
496 &r->src_addr,
497 r->src_addr_len,
498 PJ_SUCCESS);
499 }
500
501 /* If callback returns false, we have been destroyed! */
502 if (!ret)
503 return;
504
505 /* Only stream oriented socket may leave data in the packet */
506 if (asock->stream_oriented) {
507 r->size = remainder;
508 } else {
509 r->size = 0;
510 }
511
512 } else if (bytes_read <= 0 &&
513 -bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&
514 -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) &&
515 (asock->stream_oriented ||
516 -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)))
517 {
518 pj_size_t remainder;
519 pj_bool_t ret;
520
521 if (bytes_read == 0) {
522 /* For stream/connection oriented socket, this means the
523 * connection has been closed. For datagram sockets, it means
524 * we've received datagram with zero length.
525 */
526 if (asock->stream_oriented)
527 status = PJ_EEOF;
528 else
529 status = PJ_SUCCESS;
530 } else {
531 /* This means we've got an error. If this is stream/connection
532 * oriented, it means connection has been closed. For datagram
533 * sockets, it means we've got some error (e.g. EWOULDBLOCK).
534 */
535 status = (pj_status_t)-bytes_read;
536 }
537
538 /* Set default remainder to zero */
539 remainder = 0;
540
541 /* And return value to TRUE */
542 ret = PJ_TRUE;
543
544 /* Notify callback */
545 if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
546 /* For connection oriented socket, we still need to report
547 * the remainder data (if any) to the user to let user do
548 * processing with the remainder data before it closes the
549 * connection.
550 * If there is no remainder data, set the packet to NULL.
551 */
552
553 /* Shouldn't set the packet to NULL, as there may be active
554 * socket user, such as SSL socket, that needs to have access
555 * to the read buffer packet.
556 */
557 //ret = (*asock->cb.on_data_read)(asock, (r->size? r->pkt:NULL),
558 // r->size, status, &remainder);
559 ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
560 status, &remainder);
561
562 } else if (asock->read_type == TYPE_RECV_FROM &&
563 asock->cb.on_data_recvfrom)
564 {
565 /* This would always be datagram oriented hence there's
566 * nothing in the packet. We can't be sure if there will be
567 * anything useful in the source_addr, so just put NULL
568 * there too.
569 */
570 /* In some scenarios, status may be PJ_SUCCESS. The upper
571 * layer application may not expect the callback to be called
572 * with successful status and NULL data, so lets not call the
573 * callback if the status is PJ_SUCCESS.
574 */
575 if (status != PJ_SUCCESS ) {
576 ret = (*asock->cb.on_data_recvfrom)(asock, NULL, 0,
577 NULL, 0, status);
578 }
579 }
580
581 /* If callback returns false, we have been destroyed! */
582 if (!ret)
583 return;
584
585 /* Also stop further read if we've been shutdown */
586 if (asock->shutdown & SHUT_RX)
587 return;
588
589 /* Only stream oriented socket may leave data in the packet */
590 if (asock->stream_oriented) {
591 r->size = remainder;
592 } else {
593 r->size = 0;
594 }
595 }
596
597 /* Read next data. We limit ourselves to processing max_loop immediate
598 * data, so when the loop counter has exceeded this value, force the
599 * read()/recvfrom() to return pending operation to allow the program
600 * to do other jobs.
601 */
602 bytes_read = r->max_size - r->size;
603 flags = asock->read_flags;
604 if (++loop >= asock->max_loop)
605 flags |= PJ_IOQUEUE_ALWAYS_ASYNC;
606
607 if (asock->read_type == TYPE_RECV) {
608 status = pj_ioqueue_recv(key, op_key, r->pkt + r->size,
609 &bytes_read, flags);
610 } else {
611 r->src_addr_len = sizeof(r->src_addr);
612 status = pj_ioqueue_recvfrom(key, op_key, r->pkt + r->size,
613 &bytes_read, flags,
614 &r->src_addr, &r->src_addr_len);
615 }
616
617 if (status == PJ_SUCCESS) {
618 /* Immediate data */
619 ;
620 } else if (status != PJ_EPENDING && status != PJ_ECANCELLED) {
621 /* Error */
622 bytes_read = -status;
623 } else {
624 break;
625 }
626 } while (1);
627
628}
629
630
631static pj_status_t send_remaining(pj_activesock_t *asock,
632 pj_ioqueue_op_key_t *send_key)
633{
634 struct send_data *sd = (struct send_data*)send_key->activesock_data;
635 pj_status_t status;
636
637 do {
638 pj_ssize_t size;
639
640 size = sd->len - sd->sent;
641 status = pj_ioqueue_send(asock->key, send_key,
642 sd->data+sd->sent, &size, sd->flags);
643 if (status != PJ_SUCCESS) {
644 /* Pending or error */
645 break;
646 }
647
648 sd->sent += size;
649 if (sd->sent == sd->len) {
650 /* The whole data has been sent. */
651 return PJ_SUCCESS;
652 }
653
654 } while (sd->sent < sd->len);
655
656 return status;
657}
658
659
660PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock,
661 pj_ioqueue_op_key_t *send_key,
662 const void *data,
663 pj_ssize_t *size,
664 unsigned flags)
665{
666 PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL);
667
668 if (asock->shutdown & SHUT_TX)
669 return PJ_EINVALIDOP;
670
671 send_key->activesock_data = NULL;
672
673 if (asock->whole_data) {
674 pj_ssize_t whole;
675 pj_status_t status;
676
677 whole = *size;
678
679 status = pj_ioqueue_send(asock->key, send_key, data, size, flags);
680 if (status != PJ_SUCCESS) {
681 /* Pending or error */
682 return status;
683 }
684
685 if (*size == whole) {
686 /* The whole data has been sent. */
687 return PJ_SUCCESS;
688 }
689
690 /* Data was partially sent */
691 asock->send_data.data = (pj_uint8_t*)data;
692 asock->send_data.len = whole;
693 asock->send_data.sent = *size;
694 asock->send_data.flags = flags;
695 send_key->activesock_data = &asock->send_data;
696
697 /* Try again */
698 status = send_remaining(asock, send_key);
699 if (status == PJ_SUCCESS) {
700 *size = whole;
701 }
702 return status;
703
704 } else {
705 return pj_ioqueue_send(asock->key, send_key, data, size, flags);
706 }
707}
708
709
710PJ_DEF(pj_status_t) pj_activesock_sendto( pj_activesock_t *asock,
711 pj_ioqueue_op_key_t *send_key,
712 const void *data,
713 pj_ssize_t *size,
714 unsigned flags,
715 const pj_sockaddr_t *addr,
716 int addr_len)
717{
718 PJ_ASSERT_RETURN(asock && send_key && data && size && addr && addr_len,
719 PJ_EINVAL);
720
721 if (asock->shutdown & SHUT_TX)
722 return PJ_EINVALIDOP;
723
724 return pj_ioqueue_sendto(asock->key, send_key, data, size, flags,
725 addr, addr_len);
726}
727
728
729static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
730 pj_ioqueue_op_key_t *op_key,
731 pj_ssize_t bytes_sent)
732{
733 pj_activesock_t *asock;
734
735 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
736
737 /* Ignore if we've been shutdown. This may cause data to be partially
738 * sent even when 'wholedata' was requested if the OS only sent partial
739 * buffer.
740 */
741 if (asock->shutdown & SHUT_TX)
742 return;
743
744 if (bytes_sent > 0 && op_key->activesock_data) {
745 /* whole_data is requested. Make sure we send all the data */
746 struct send_data *sd = (struct send_data*)op_key->activesock_data;
747
748 sd->sent += bytes_sent;
749 if (sd->sent == sd->len) {
750 /* all has been sent */
751 bytes_sent = sd->sent;
752 op_key->activesock_data = NULL;
753 } else {
754 /* send remaining data */
755 pj_status_t status;
756
757 status = send_remaining(asock, op_key);
758 if (status == PJ_EPENDING)
759 return;
760 else if (status == PJ_SUCCESS)
761 bytes_sent = sd->sent;
762 else
763 bytes_sent = -status;
764
765 op_key->activesock_data = NULL;
766 }
767 }
768
769 if (asock->cb.on_data_sent) {
770 pj_bool_t ret;
771
772 ret = (*asock->cb.on_data_sent)(asock, op_key, bytes_sent);
773
774 /* If callback returns false, we have been destroyed! */
775 if (!ret)
776 return;
777 }
778}
779
780#if PJ_HAS_TCP
781PJ_DEF(pj_status_t) pj_activesock_start_accept(pj_activesock_t *asock,
782 pj_pool_t *pool)
783{
784 unsigned i;
785
786 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
787 PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP);
788
789 /* Ignore if we've been shutdown */
790 if (asock->shutdown)
791 return PJ_EINVALIDOP;
792
793 asock->accept_op = (struct accept_op*)
794 pj_pool_calloc(pool, asock->async_count,
795 sizeof(struct accept_op));
796 for (i=0; i<asock->async_count; ++i) {
797 struct accept_op *a = &asock->accept_op[i];
798 pj_status_t status;
799
800 do {
801 a->new_sock = PJ_INVALID_SOCKET;
802 a->rem_addr_len = sizeof(a->rem_addr);
803
804 status = pj_ioqueue_accept(asock->key, &a->op_key, &a->new_sock,
805 NULL, &a->rem_addr, &a->rem_addr_len);
806 if (status == PJ_SUCCESS) {
807 /* We've got immediate connection. Not sure if it's a good
808 * idea to call the callback now (probably application will
809 * not be prepared to process it), so lets just silently
810 * close the socket.
811 */
812 pj_sock_close(a->new_sock);
813 }
814 } while (status == PJ_SUCCESS);
815
816 if (status != PJ_EPENDING) {
817 return status;
818 }
819 }
820
821 return PJ_SUCCESS;
822}
823
824
825static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
826 pj_ioqueue_op_key_t *op_key,
827 pj_sock_t new_sock,
828 pj_status_t status)
829{
830 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
831 struct accept_op *accept_op = (struct accept_op*) op_key;
832
833 PJ_UNUSED_ARG(new_sock);
834
835 /* Ignore if we've been shutdown */
836 if (asock->shutdown)
837 return;
838
839 do {
840 if (status == asock->last_err && status != PJ_SUCCESS) {
841 asock->err_counter++;
842 if (asock->err_counter >= PJ_ACTIVESOCK_MAX_CONSECUTIVE_ACCEPT_ERROR) {
843 PJ_LOG(3, ("", "Received %d consecutive errors: %d for the accept()"
844 " operation, stopping further ioqueue accepts.",
845 asock->err_counter, asock->last_err));
846
847 if ((status == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) &&
848 (asock->cb.on_accept_complete2))
849 {
850 (*asock->cb.on_accept_complete2)(asock,
851 accept_op->new_sock,
852 &accept_op->rem_addr,
853 accept_op->rem_addr_len,
854 PJ_ESOCKETSTOP);
855 }
856 return;
857 }
858 } else {
859 asock->err_counter = 0;
860 asock->last_err = status;
861 }
862
863 if (status==PJ_SUCCESS && (asock->cb.on_accept_complete2 ||
864 asock->cb.on_accept_complete)) {
865 pj_bool_t ret;
866
867 /* Notify callback */
868 if (asock->cb.on_accept_complete2) {
869 ret = (*asock->cb.on_accept_complete2)(asock,
870 accept_op->new_sock,
871 &accept_op->rem_addr,
872 accept_op->rem_addr_len,
873 status);
874 } else {
875 ret = (*asock->cb.on_accept_complete)(asock,
876 accept_op->new_sock,
877 &accept_op->rem_addr,
878 accept_op->rem_addr_len);
879 }
880
881 /* If callback returns false, we have been destroyed! */
882 if (!ret)
883 return;
884
885#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
886 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
887 activesock_create_iphone_os_stream(asock);
888#endif
889 } else if (status==PJ_SUCCESS) {
890 /* Application doesn't handle the new socket, we need to
891 * close it to avoid resource leak.
892 */
893 pj_sock_close(accept_op->new_sock);
894 }
895
896 /* Don't start another accept() if we've been shutdown */
897 if (asock->shutdown)
898 return;
899
900 /* Prepare next accept() */
901 accept_op->new_sock = PJ_INVALID_SOCKET;
902 accept_op->rem_addr_len = sizeof(accept_op->rem_addr);
903
904 status = pj_ioqueue_accept(asock->key, op_key, &accept_op->new_sock,
905 NULL, &accept_op->rem_addr,
906 &accept_op->rem_addr_len);
907
908 } while (status != PJ_EPENDING && status != PJ_ECANCELLED);
909}
910
911
912PJ_DEF(pj_status_t) pj_activesock_start_connect( pj_activesock_t *asock,
913 pj_pool_t *pool,
914 const pj_sockaddr_t *remaddr,
915 int addr_len)
916{
917 PJ_UNUSED_ARG(pool);
918
919 if (asock->shutdown)
920 return PJ_EINVALIDOP;
921
922 return pj_ioqueue_connect(asock->key, remaddr, addr_len);
923}
924
925static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
926 pj_status_t status)
927{
928 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
929
930 /* Ignore if we've been shutdown */
931 if (asock->shutdown)
932 return;
933
934 if (asock->cb.on_connect_complete) {
935 pj_bool_t ret;
936
937 ret = (*asock->cb.on_connect_complete)(asock, status);
938
939 if (!ret) {
940 /* We've been destroyed */
941 return;
942 }
943
944#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
945 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
946 activesock_create_iphone_os_stream(asock);
947#endif
948
949 }
950}
951#endif /* PJ_HAS_TCP */
952