| /* $Id$ */ |
| /* |
| * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) |
| * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> |
| * |
| * This program is free software; you can redistribute it and/or modify |
| * it under the terms of the GNU General Public License as published by |
| * the Free Software Foundation; either version 2 of the License, or |
| * (at your option) any later version. |
| * |
| * This program is distributed in the hope that it will be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| * GNU General Public License for more details. |
| * |
| * You should have received a copy of the GNU General Public License |
| * along with this program; if not, write to the Free Software |
| * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
| */ |
| #include <pj/activesock.h> |
| #include <pj/compat/socket.h> |
| #include <pj/assert.h> |
| #include <pj/errno.h> |
| #include <pj/log.h> |
| #include <pj/pool.h> |
| #include <pj/sock.h> |
| #include <pj/string.h> |
| |
| #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ |
| PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 |
| # include <CFNetwork/CFNetwork.h> |
| |
| static pj_bool_t ios_bg_support = PJ_TRUE; |
| #endif |
| |
| #define PJ_ACTIVESOCK_MAX_LOOP 50 |
| |
| |
| enum read_type |
| { |
| TYPE_NONE, |
| TYPE_RECV, |
| TYPE_RECV_FROM |
| }; |
| |
| enum shutdown_dir |
| { |
| SHUT_NONE = 0, |
| SHUT_RX = 1, |
| SHUT_TX = 2 |
| }; |
| |
| struct read_op |
| { |
| pj_ioqueue_op_key_t op_key; |
| pj_uint8_t *pkt; |
| unsigned max_size; |
| pj_size_t size; |
| pj_sockaddr src_addr; |
| int src_addr_len; |
| }; |
| |
| struct accept_op |
| { |
| pj_ioqueue_op_key_t op_key; |
| pj_sock_t new_sock; |
| pj_sockaddr rem_addr; |
| int rem_addr_len; |
| }; |
| |
| struct send_data |
| { |
| pj_uint8_t *data; |
| pj_ssize_t len; |
| pj_ssize_t sent; |
| unsigned flags; |
| }; |
| |
| struct pj_activesock_t |
| { |
| pj_ioqueue_key_t *key; |
| pj_bool_t stream_oriented; |
| pj_bool_t whole_data; |
| pj_ioqueue_t *ioqueue; |
| void *user_data; |
| unsigned async_count; |
| unsigned shutdown; |
| unsigned max_loop; |
| pj_activesock_cb cb; |
| #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ |
| PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 |
| int bg_setting; |
| pj_sock_t sock; |
| CFReadStreamRef readStream; |
| #endif |
| |
| unsigned err_counter; |
| pj_status_t last_err; |
| |
| struct send_data send_data; |
| |
| struct read_op *read_op; |
| pj_uint32_t read_flags; |
| enum read_type read_type; |
| |
| struct accept_op *accept_op; |
| }; |
| |
| |
| static void ioqueue_on_read_complete(pj_ioqueue_key_t *key, |
| pj_ioqueue_op_key_t *op_key, |
| pj_ssize_t bytes_read); |
| static void ioqueue_on_write_complete(pj_ioqueue_key_t *key, |
| pj_ioqueue_op_key_t *op_key, |
| pj_ssize_t bytes_sent); |
| #if PJ_HAS_TCP |
| static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key, |
| pj_ioqueue_op_key_t *op_key, |
| pj_sock_t sock, |
| pj_status_t status); |
| static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key, |
| pj_status_t status); |
| #endif |
| |
| PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg) |
| { |
| pj_bzero(cfg, sizeof(*cfg)); |
| cfg->async_cnt = 1; |
| cfg->concurrency = -1; |
| cfg->whole_data = PJ_TRUE; |
| } |
| |
| #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ |
| PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 |
| static void activesock_destroy_iphone_os_stream(pj_activesock_t *asock) |
| { |
| if (asock->readStream) { |
| CFReadStreamClose(asock->readStream); |
| CFRelease(asock->readStream); |
| asock->readStream = NULL; |
| } |
| } |
| |
| static void activesock_create_iphone_os_stream(pj_activesock_t *asock) |
| { |
| if (ios_bg_support && asock->bg_setting && asock->stream_oriented) { |
| activesock_destroy_iphone_os_stream(asock); |
| |
| CFStreamCreatePairWithSocket(kCFAllocatorDefault, asock->sock, |
| &asock->readStream, NULL); |
| |
| if (!asock->readStream || |
| CFReadStreamSetProperty(asock->readStream, |
| kCFStreamNetworkServiceType, |
| kCFStreamNetworkServiceTypeVoIP) |
| != TRUE || |
| CFReadStreamOpen(asock->readStream) != TRUE) |
| { |
| PJ_LOG(2,("", "Failed to configure TCP transport for VoIP " |
| "usage. Background mode will not be supported.")); |
| |
| activesock_destroy_iphone_os_stream(asock); |
| } |
| } |
| } |
| |
| |
| PJ_DEF(void) pj_activesock_set_iphone_os_bg(pj_activesock_t *asock, |
| int val) |
| { |
| asock->bg_setting = val; |
| if (asock->bg_setting) |
| activesock_create_iphone_os_stream(asock); |
| else |
| activesock_destroy_iphone_os_stream(asock); |
| } |
| |
| PJ_DEF(void) pj_activesock_enable_iphone_os_bg(pj_bool_t val) |
| { |
| ios_bg_support = val; |
| } |
| #endif |
| |
| PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool, |
| pj_sock_t sock, |
| int sock_type, |
| const pj_activesock_cfg *opt, |
| pj_ioqueue_t *ioqueue, |
| const pj_activesock_cb *cb, |
| void *user_data, |
| pj_activesock_t **p_asock) |
| { |
| pj_activesock_t *asock; |
| pj_ioqueue_callback ioq_cb; |
| pj_status_t status; |
| |
| PJ_ASSERT_RETURN(pool && ioqueue && cb && p_asock, PJ_EINVAL); |
| PJ_ASSERT_RETURN(sock!=0 && sock!=PJ_INVALID_SOCKET, PJ_EINVAL); |
| PJ_ASSERT_RETURN(sock_type==pj_SOCK_STREAM() || |
| sock_type==pj_SOCK_DGRAM(), PJ_EINVAL); |
| PJ_ASSERT_RETURN(!opt || opt->async_cnt >= 1, PJ_EINVAL); |
| |
| asock = PJ_POOL_ZALLOC_T(pool, pj_activesock_t); |
| asock->ioqueue = ioqueue; |
| asock->stream_oriented = (sock_type == pj_SOCK_STREAM()); |
| asock->async_count = (opt? opt->async_cnt : 1); |
| asock->whole_data = (opt? opt->whole_data : 1); |
| asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP; |
| asock->user_data = user_data; |
| pj_memcpy(&asock->cb, cb, sizeof(*cb)); |
| |
| pj_bzero(&ioq_cb, sizeof(ioq_cb)); |
| ioq_cb.on_read_complete = &ioqueue_on_read_complete; |
| ioq_cb.on_write_complete = &ioqueue_on_write_complete; |
| #if PJ_HAS_TCP |
| ioq_cb.on_connect_complete = &ioqueue_on_connect_complete; |
| ioq_cb.on_accept_complete = &ioqueue_on_accept_complete; |
| #endif |
| |
| status = pj_ioqueue_register_sock2(pool, ioqueue, sock, |
| (opt? opt->grp_lock : NULL), |
| asock, &ioq_cb, &asock->key); |
| if (status != PJ_SUCCESS) { |
| pj_activesock_close(asock); |
| return status; |
| } |
| |
| if (asock->whole_data) { |
| /* Must disable concurrency otherwise there is a race condition */ |
| pj_ioqueue_set_concurrency(asock->key, 0); |
| } else if (opt && opt->concurrency >= 0) { |
| pj_ioqueue_set_concurrency(asock->key, opt->concurrency); |
| } |
| |
| #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ |
| PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 |
| asock->sock = sock; |
| asock->bg_setting = PJ_ACTIVESOCK_TCP_IPHONE_OS_BG; |
| #endif |
| |
| *p_asock = asock; |
| return PJ_SUCCESS; |
| } |
| |
| |
| PJ_DEF(pj_status_t) pj_activesock_create_udp( pj_pool_t *pool, |
| const pj_sockaddr *addr, |
| const pj_activesock_cfg *opt, |
| pj_ioqueue_t *ioqueue, |
| const pj_activesock_cb *cb, |
| void *user_data, |
| pj_activesock_t **p_asock, |
| pj_sockaddr *bound_addr) |
| { |
| pj_sock_t sock_fd; |
| pj_sockaddr default_addr; |
| pj_status_t status; |
| |
| if (addr == NULL) { |
| pj_sockaddr_init(pj_AF_INET(), &default_addr, NULL, 0); |
| addr = &default_addr; |
| } |
| |
| status = pj_sock_socket(addr->addr.sa_family, pj_SOCK_DGRAM(), 0, |
| &sock_fd); |
| if (status != PJ_SUCCESS) { |
| return status; |
| } |
| |
| status = pj_sock_bind(sock_fd, addr, pj_sockaddr_get_len(addr)); |
| if (status != PJ_SUCCESS) { |
| pj_sock_close(sock_fd); |
| return status; |
| } |
| |
| status = pj_activesock_create(pool, sock_fd, pj_SOCK_DGRAM(), opt, |
| ioqueue, cb, user_data, p_asock); |
| if (status != PJ_SUCCESS) { |
| pj_sock_close(sock_fd); |
| return status; |
| } |
| |
| if (bound_addr) { |
| int addr_len = sizeof(*bound_addr); |
| status = pj_sock_getsockname(sock_fd, bound_addr, &addr_len); |
| if (status != PJ_SUCCESS) { |
| pj_activesock_close(*p_asock); |
| return status; |
| } |
| } |
| |
| return PJ_SUCCESS; |
| } |
| |
| PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock) |
| { |
| PJ_ASSERT_RETURN(asock, PJ_EINVAL); |
| asock->shutdown = SHUT_RX | SHUT_TX; |
| if (asock->key) { |
| #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ |
| PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 |
| activesock_destroy_iphone_os_stream(asock); |
| #endif |
| |
| pj_ioqueue_unregister(asock->key); |
| asock->key = NULL; |
| } |
| return PJ_SUCCESS; |
| } |
| |
| |
| PJ_DEF(pj_status_t) pj_activesock_set_user_data( pj_activesock_t *asock, |
| void *user_data) |
| { |
| PJ_ASSERT_RETURN(asock, PJ_EINVAL); |
| asock->user_data = user_data; |
| return PJ_SUCCESS; |
| } |
| |
| |
| PJ_DEF(void*) pj_activesock_get_user_data(pj_activesock_t *asock) |
| { |
| PJ_ASSERT_RETURN(asock, NULL); |
| return asock->user_data; |
| } |
| |
| |
| PJ_DEF(pj_status_t) pj_activesock_start_read(pj_activesock_t *asock, |
| pj_pool_t *pool, |
| unsigned buff_size, |
| pj_uint32_t flags) |
| { |
| void **readbuf; |
| unsigned i; |
| |
| PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL); |
| |
| readbuf = (void**) pj_pool_calloc(pool, asock->async_count, |
| sizeof(void*)); |
| |
| for (i=0; i<asock->async_count; ++i) { |
| readbuf[i] = pj_pool_alloc(pool, buff_size); |
| } |
| |
| return pj_activesock_start_read2(asock, pool, buff_size, readbuf, flags); |
| } |
| |
| |
| PJ_DEF(pj_status_t) pj_activesock_start_read2( pj_activesock_t *asock, |
| pj_pool_t *pool, |
| unsigned buff_size, |
| void *readbuf[], |
| pj_uint32_t flags) |
| { |
| unsigned i; |
| pj_status_t status; |
| |
| PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL); |
| PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP); |
| PJ_ASSERT_RETURN(asock->read_op == NULL, PJ_EINVALIDOP); |
| |
| asock->read_op = (struct read_op*) |
| pj_pool_calloc(pool, asock->async_count, |
| sizeof(struct read_op)); |
| asock->read_type = TYPE_RECV; |
| asock->read_flags = flags; |
| |
| for (i=0; i<asock->async_count; ++i) { |
| struct read_op *r = &asock->read_op[i]; |
| pj_ssize_t size_to_read; |
| |
| r->pkt = (pj_uint8_t*)readbuf[i]; |
| size_to_read = r->max_size = buff_size; |
| |
| status = pj_ioqueue_recv(asock->key, &r->op_key, r->pkt, &size_to_read, |
| PJ_IOQUEUE_ALWAYS_ASYNC | flags); |
| PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG); |
| |
| if (status != PJ_EPENDING) |
| return status; |
| } |
| |
| return PJ_SUCCESS; |
| } |
| |
| |
| PJ_DEF(pj_status_t) pj_activesock_start_recvfrom(pj_activesock_t *asock, |
| pj_pool_t *pool, |
| unsigned buff_size, |
| pj_uint32_t flags) |
| { |
| void **readbuf; |
| unsigned i; |
| |
| PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL); |
| |
| readbuf = (void**) pj_pool_calloc(pool, asock->async_count, |
| sizeof(void*)); |
| |
| for (i=0; i<asock->async_count; ++i) { |
| readbuf[i] = pj_pool_alloc(pool, buff_size); |
| } |
| |
| return pj_activesock_start_recvfrom2(asock, pool, buff_size, |
| readbuf, flags); |
| } |
| |
| |
| PJ_DEF(pj_status_t) pj_activesock_start_recvfrom2( pj_activesock_t *asock, |
| pj_pool_t *pool, |
| unsigned buff_size, |
| void *readbuf[], |
| pj_uint32_t flags) |
| { |
| unsigned i; |
| pj_status_t status; |
| |
| PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL); |
| PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP); |
| |
| asock->read_op = (struct read_op*) |
| pj_pool_calloc(pool, asock->async_count, |
| sizeof(struct read_op)); |
| asock->read_type = TYPE_RECV_FROM; |
| asock->read_flags = flags; |
| |
| for (i=0; i<asock->async_count; ++i) { |
| struct read_op *r = &asock->read_op[i]; |
| pj_ssize_t size_to_read; |
| |
| r->pkt = (pj_uint8_t*) readbuf[i]; |
| size_to_read = r->max_size = buff_size; |
| r->src_addr_len = sizeof(r->src_addr); |
| |
| status = pj_ioqueue_recvfrom(asock->key, &r->op_key, r->pkt, |
| &size_to_read, |
| PJ_IOQUEUE_ALWAYS_ASYNC | flags, |
| &r->src_addr, &r->src_addr_len); |
| PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG); |
| |
| if (status != PJ_EPENDING) |
| return status; |
| } |
| |
| return PJ_SUCCESS; |
| } |
| |
| |
| static void ioqueue_on_read_complete(pj_ioqueue_key_t *key, |
| pj_ioqueue_op_key_t *op_key, |
| pj_ssize_t bytes_read) |
| { |
| pj_activesock_t *asock; |
| struct read_op *r = (struct read_op*)op_key; |
| unsigned loop = 0; |
| pj_status_t status; |
| |
| asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); |
| |
| /* Ignore if we've been shutdown */ |
| if (asock->shutdown & SHUT_RX) |
| return; |
| |
| do { |
| unsigned flags; |
| |
| if (bytes_read > 0) { |
| /* |
| * We've got new data. |
| */ |
| pj_size_t remainder; |
| pj_bool_t ret; |
| |
| /* Append this new data to existing data. If socket is stream |
| * oriented, user might have left some data in the buffer. |
| * Otherwise if socket is datagram there will be nothing in |
| * existing packet hence the packet will contain only the new |
| * packet. |
| */ |
| r->size += bytes_read; |
| |
| /* Set default remainder to zero */ |
| remainder = 0; |
| |
| /* And return value to TRUE */ |
| ret = PJ_TRUE; |
| |
| /* Notify callback */ |
| if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) { |
| ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size, |
| PJ_SUCCESS, &remainder); |
| } else if (asock->read_type == TYPE_RECV_FROM && |
| asock->cb.on_data_recvfrom) |
| { |
| ret = (*asock->cb.on_data_recvfrom)(asock, r->pkt, r->size, |
| &r->src_addr, |
| r->src_addr_len, |
| PJ_SUCCESS); |
| } |
| |
| /* If callback returns false, we have been destroyed! */ |
| if (!ret) |
| return; |
| |
| /* Only stream oriented socket may leave data in the packet */ |
| if (asock->stream_oriented) { |
| r->size = remainder; |
| } else { |
| r->size = 0; |
| } |
| |
| } else if (bytes_read <= 0 && |
| -bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) && |
| -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) && |
| (asock->stream_oriented || |
| -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET))) |
| { |
| pj_size_t remainder; |
| pj_bool_t ret; |
| |
| if (bytes_read == 0) { |
| /* For stream/connection oriented socket, this means the |
| * connection has been closed. For datagram sockets, it means |
| * we've received datagram with zero length. |
| */ |
| if (asock->stream_oriented) |
| status = PJ_EEOF; |
| else |
| status = PJ_SUCCESS; |
| } else { |
| /* This means we've got an error. If this is stream/connection |
| * oriented, it means connection has been closed. For datagram |
| * sockets, it means we've got some error (e.g. EWOULDBLOCK). |
| */ |
| status = (pj_status_t)-bytes_read; |
| } |
| |
| /* Set default remainder to zero */ |
| remainder = 0; |
| |
| /* And return value to TRUE */ |
| ret = PJ_TRUE; |
| |
| /* Notify callback */ |
| if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) { |
| /* For connection oriented socket, we still need to report |
| * the remainder data (if any) to the user to let user do |
| * processing with the remainder data before it closes the |
| * connection. |
| * If there is no remainder data, set the packet to NULL. |
| */ |
| |
| /* Shouldn't set the packet to NULL, as there may be active |
| * socket user, such as SSL socket, that needs to have access |
| * to the read buffer packet. |
| */ |
| //ret = (*asock->cb.on_data_read)(asock, (r->size? r->pkt:NULL), |
| // r->size, status, &remainder); |
| ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size, |
| status, &remainder); |
| |
| } else if (asock->read_type == TYPE_RECV_FROM && |
| asock->cb.on_data_recvfrom) |
| { |
| /* This would always be datagram oriented hence there's |
| * nothing in the packet. We can't be sure if there will be |
| * anything useful in the source_addr, so just put NULL |
| * there too. |
| */ |
| /* In some scenarios, status may be PJ_SUCCESS. The upper |
| * layer application may not expect the callback to be called |
| * with successful status and NULL data, so lets not call the |
| * callback if the status is PJ_SUCCESS. |
| */ |
| if (status != PJ_SUCCESS ) { |
| ret = (*asock->cb.on_data_recvfrom)(asock, NULL, 0, |
| NULL, 0, status); |
| } |
| } |
| |
| /* If callback returns false, we have been destroyed! */ |
| if (!ret) |
| return; |
| |
| /* Also stop further read if we've been shutdown */ |
| if (asock->shutdown & SHUT_RX) |
| return; |
| |
| /* Only stream oriented socket may leave data in the packet */ |
| if (asock->stream_oriented) { |
| r->size = remainder; |
| } else { |
| r->size = 0; |
| } |
| } |
| |
| /* Read next data. We limit ourselves to processing max_loop immediate |
| * data, so when the loop counter has exceeded this value, force the |
| * read()/recvfrom() to return pending operation to allow the program |
| * to do other jobs. |
| */ |
| bytes_read = r->max_size - r->size; |
| flags = asock->read_flags; |
| if (++loop >= asock->max_loop) |
| flags |= PJ_IOQUEUE_ALWAYS_ASYNC; |
| |
| if (asock->read_type == TYPE_RECV) { |
| status = pj_ioqueue_recv(key, op_key, r->pkt + r->size, |
| &bytes_read, flags); |
| } else { |
| r->src_addr_len = sizeof(r->src_addr); |
| status = pj_ioqueue_recvfrom(key, op_key, r->pkt + r->size, |
| &bytes_read, flags, |
| &r->src_addr, &r->src_addr_len); |
| } |
| |
| if (status == PJ_SUCCESS) { |
| /* Immediate data */ |
| ; |
| } else if (status != PJ_EPENDING && status != PJ_ECANCELLED) { |
| /* Error */ |
| bytes_read = -status; |
| } else { |
| break; |
| } |
| } while (1); |
| |
| } |
| |
| |
| static pj_status_t send_remaining(pj_activesock_t *asock, |
| pj_ioqueue_op_key_t *send_key) |
| { |
| struct send_data *sd = (struct send_data*)send_key->activesock_data; |
| pj_status_t status; |
| |
| do { |
| pj_ssize_t size; |
| |
| size = sd->len - sd->sent; |
| status = pj_ioqueue_send(asock->key, send_key, |
| sd->data+sd->sent, &size, sd->flags); |
| if (status != PJ_SUCCESS) { |
| /* Pending or error */ |
| break; |
| } |
| |
| sd->sent += size; |
| if (sd->sent == sd->len) { |
| /* The whole data has been sent. */ |
| return PJ_SUCCESS; |
| } |
| |
| } while (sd->sent < sd->len); |
| |
| return status; |
| } |
| |
| |
| PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock, |
| pj_ioqueue_op_key_t *send_key, |
| const void *data, |
| pj_ssize_t *size, |
| unsigned flags) |
| { |
| PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL); |
| |
| if (asock->shutdown & SHUT_TX) |
| return PJ_EINVALIDOP; |
| |
| send_key->activesock_data = NULL; |
| |
| if (asock->whole_data) { |
| pj_ssize_t whole; |
| pj_status_t status; |
| |
| whole = *size; |
| |
| status = pj_ioqueue_send(asock->key, send_key, data, size, flags); |
| if (status != PJ_SUCCESS) { |
| /* Pending or error */ |
| return status; |
| } |
| |
| if (*size == whole) { |
| /* The whole data has been sent. */ |
| return PJ_SUCCESS; |
| } |
| |
| /* Data was partially sent */ |
| asock->send_data.data = (pj_uint8_t*)data; |
| asock->send_data.len = whole; |
| asock->send_data.sent = *size; |
| asock->send_data.flags = flags; |
| send_key->activesock_data = &asock->send_data; |
| |
| /* Try again */ |
| status = send_remaining(asock, send_key); |
| if (status == PJ_SUCCESS) { |
| *size = whole; |
| } |
| return status; |
| |
| } else { |
| return pj_ioqueue_send(asock->key, send_key, data, size, flags); |
| } |
| } |
| |
| |
| PJ_DEF(pj_status_t) pj_activesock_sendto( pj_activesock_t *asock, |
| pj_ioqueue_op_key_t *send_key, |
| const void *data, |
| pj_ssize_t *size, |
| unsigned flags, |
| const pj_sockaddr_t *addr, |
| int addr_len) |
| { |
| PJ_ASSERT_RETURN(asock && send_key && data && size && addr && addr_len, |
| PJ_EINVAL); |
| |
| if (asock->shutdown & SHUT_TX) |
| return PJ_EINVALIDOP; |
| |
| return pj_ioqueue_sendto(asock->key, send_key, data, size, flags, |
| addr, addr_len); |
| } |
| |
| |
| static void ioqueue_on_write_complete(pj_ioqueue_key_t *key, |
| pj_ioqueue_op_key_t *op_key, |
| pj_ssize_t bytes_sent) |
| { |
| pj_activesock_t *asock; |
| |
| asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); |
| |
| /* Ignore if we've been shutdown. This may cause data to be partially |
| * sent even when 'wholedata' was requested if the OS only sent partial |
| * buffer. |
| */ |
| if (asock->shutdown & SHUT_TX) |
| return; |
| |
| if (bytes_sent > 0 && op_key->activesock_data) { |
| /* whole_data is requested. Make sure we send all the data */ |
| struct send_data *sd = (struct send_data*)op_key->activesock_data; |
| |
| sd->sent += bytes_sent; |
| if (sd->sent == sd->len) { |
| /* all has been sent */ |
| bytes_sent = sd->sent; |
| op_key->activesock_data = NULL; |
| } else { |
| /* send remaining data */ |
| pj_status_t status; |
| |
| status = send_remaining(asock, op_key); |
| if (status == PJ_EPENDING) |
| return; |
| else if (status == PJ_SUCCESS) |
| bytes_sent = sd->sent; |
| else |
| bytes_sent = -status; |
| |
| op_key->activesock_data = NULL; |
| } |
| } |
| |
| if (asock->cb.on_data_sent) { |
| pj_bool_t ret; |
| |
| ret = (*asock->cb.on_data_sent)(asock, op_key, bytes_sent); |
| |
| /* If callback returns false, we have been destroyed! */ |
| if (!ret) |
| return; |
| } |
| } |
| |
| #if PJ_HAS_TCP |
| PJ_DEF(pj_status_t) pj_activesock_start_accept(pj_activesock_t *asock, |
| pj_pool_t *pool) |
| { |
| unsigned i; |
| |
| PJ_ASSERT_RETURN(asock, PJ_EINVAL); |
| PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP); |
| |
| /* Ignore if we've been shutdown */ |
| if (asock->shutdown) |
| return PJ_EINVALIDOP; |
| |
| asock->accept_op = (struct accept_op*) |
| pj_pool_calloc(pool, asock->async_count, |
| sizeof(struct accept_op)); |
| for (i=0; i<asock->async_count; ++i) { |
| struct accept_op *a = &asock->accept_op[i]; |
| pj_status_t status; |
| |
| do { |
| a->new_sock = PJ_INVALID_SOCKET; |
| a->rem_addr_len = sizeof(a->rem_addr); |
| |
| status = pj_ioqueue_accept(asock->key, &a->op_key, &a->new_sock, |
| NULL, &a->rem_addr, &a->rem_addr_len); |
| if (status == PJ_SUCCESS) { |
| /* We've got immediate connection. Not sure if it's a good |
| * idea to call the callback now (probably application will |
| * not be prepared to process it), so lets just silently |
| * close the socket. |
| */ |
| pj_sock_close(a->new_sock); |
| } |
| } while (status == PJ_SUCCESS); |
| |
| if (status != PJ_EPENDING) { |
| return status; |
| } |
| } |
| |
| return PJ_SUCCESS; |
| } |
| |
| |
| static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key, |
| pj_ioqueue_op_key_t *op_key, |
| pj_sock_t new_sock, |
| pj_status_t status) |
| { |
| pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); |
| struct accept_op *accept_op = (struct accept_op*) op_key; |
| |
| PJ_UNUSED_ARG(new_sock); |
| |
| /* Ignore if we've been shutdown */ |
| if (asock->shutdown) |
| return; |
| |
| do { |
| if (status == asock->last_err && status != PJ_SUCCESS) { |
| asock->err_counter++; |
| if (asock->err_counter >= PJ_ACTIVESOCK_MAX_CONSECUTIVE_ACCEPT_ERROR) { |
| PJ_LOG(3, ("", "Received %d consecutive errors: %d for the accept()" |
| " operation, stopping further ioqueue accepts.", |
| asock->err_counter, asock->last_err)); |
| |
| if ((status == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) && |
| (asock->cb.on_accept_complete2)) |
| { |
| (*asock->cb.on_accept_complete2)(asock, |
| accept_op->new_sock, |
| &accept_op->rem_addr, |
| accept_op->rem_addr_len, |
| PJ_ESOCKETSTOP); |
| } |
| return; |
| } |
| } else { |
| asock->err_counter = 0; |
| asock->last_err = status; |
| } |
| |
| if (status==PJ_SUCCESS && (asock->cb.on_accept_complete2 || |
| asock->cb.on_accept_complete)) { |
| pj_bool_t ret; |
| |
| /* Notify callback */ |
| if (asock->cb.on_accept_complete2) { |
| ret = (*asock->cb.on_accept_complete2)(asock, |
| accept_op->new_sock, |
| &accept_op->rem_addr, |
| accept_op->rem_addr_len, |
| status); |
| } else { |
| ret = (*asock->cb.on_accept_complete)(asock, |
| accept_op->new_sock, |
| &accept_op->rem_addr, |
| accept_op->rem_addr_len); |
| } |
| |
| /* If callback returns false, we have been destroyed! */ |
| if (!ret) |
| return; |
| |
| #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ |
| PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 |
| activesock_create_iphone_os_stream(asock); |
| #endif |
| } else if (status==PJ_SUCCESS) { |
| /* Application doesn't handle the new socket, we need to |
| * close it to avoid resource leak. |
| */ |
| pj_sock_close(accept_op->new_sock); |
| } |
| |
| /* Don't start another accept() if we've been shutdown */ |
| if (asock->shutdown) |
| return; |
| |
| /* Prepare next accept() */ |
| accept_op->new_sock = PJ_INVALID_SOCKET; |
| accept_op->rem_addr_len = sizeof(accept_op->rem_addr); |
| |
| status = pj_ioqueue_accept(asock->key, op_key, &accept_op->new_sock, |
| NULL, &accept_op->rem_addr, |
| &accept_op->rem_addr_len); |
| |
| } while (status != PJ_EPENDING && status != PJ_ECANCELLED); |
| } |
| |
| |
| PJ_DEF(pj_status_t) pj_activesock_start_connect( pj_activesock_t *asock, |
| pj_pool_t *pool, |
| const pj_sockaddr_t *remaddr, |
| int addr_len) |
| { |
| PJ_UNUSED_ARG(pool); |
| |
| if (asock->shutdown) |
| return PJ_EINVALIDOP; |
| |
| return pj_ioqueue_connect(asock->key, remaddr, addr_len); |
| } |
| |
| static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key, |
| pj_status_t status) |
| { |
| pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); |
| |
| /* Ignore if we've been shutdown */ |
| if (asock->shutdown) |
| return; |
| |
| if (asock->cb.on_connect_complete) { |
| pj_bool_t ret; |
| |
| ret = (*asock->cb.on_connect_complete)(asock, status); |
| |
| if (!ret) { |
| /* We've been destroyed */ |
| return; |
| } |
| |
| #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ |
| PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 |
| activesock_create_iphone_os_stream(asock); |
| #endif |
| |
| } |
| } |
| #endif /* PJ_HAS_TCP */ |
| |