blob: 9de23156317e287f8b914ca970458276a86aff99 [file] [log] [blame]
Benny Prijono4bac2c12008-05-11 18:12:16 +00001/* $Id$ */
2/*
Benny Prijono844653c2008-12-23 17:27:53 +00003 * Copyright (C) 2008-2009 Teluu Inc. (http://www.teluu.com)
4 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
Benny Prijono4bac2c12008-05-11 18:12:16 +00005 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 */
20#include <pj/activesock.h>
Benny Prijonobd344ff2008-08-04 09:59:02 +000021#include <pj/compat/socket.h>
Benny Prijono4bac2c12008-05-11 18:12:16 +000022#include <pj/assert.h>
23#include <pj/errno.h>
Sauw Mingbe3771a2010-08-27 06:46:29 +000024#include <pj/log.h>
Benny Prijono4bac2c12008-05-11 18:12:16 +000025#include <pj/pool.h>
26#include <pj/sock.h>
27#include <pj/string.h>
28
Sauw Mingbe3771a2010-08-27 06:46:29 +000029#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
30 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
31# include <CFNetwork/CFNetwork.h>
32#endif
33
/* Default for pj_activesock_t::max_loop: how many reads that complete
 * immediately are handled back-to-back by the read completion callback
 * before the next read is forced to be asynchronous.
 */
#define PJ_ACTIVESOCK_MAX_LOOP 50
35
36
/* Which kind of read operation has been started on an active socket. */
enum read_type
{
    TYPE_NONE = 0,      /* No read started (value of a zeroed instance). */
    TYPE_RECV,          /* Reads are issued with pj_ioqueue_recv().      */
    TYPE_RECV_FROM      /* Reads are issued with pj_ioqueue_recvfrom().  */
};
43
44struct read_op
45{
46 pj_ioqueue_op_key_t op_key;
47 pj_uint8_t *pkt;
48 unsigned max_size;
49 pj_size_t size;
50 pj_sockaddr src_addr;
51 int src_addr_len;
52};
53
54struct accept_op
55{
56 pj_ioqueue_op_key_t op_key;
57 pj_sock_t new_sock;
58 pj_sockaddr rem_addr;
59 int rem_addr_len;
60};
61
Benny Prijono417d6052008-07-29 20:15:15 +000062struct send_data
63{
64 pj_uint8_t *data;
65 pj_ssize_t len;
66 pj_ssize_t sent;
67 unsigned flags;
68};
69
Benny Prijono4bac2c12008-05-11 18:12:16 +000070struct pj_activesock_t
71{
72 pj_ioqueue_key_t *key;
73 pj_bool_t stream_oriented;
Benny Prijono417d6052008-07-29 20:15:15 +000074 pj_bool_t whole_data;
Benny Prijono4bac2c12008-05-11 18:12:16 +000075 pj_ioqueue_t *ioqueue;
76 void *user_data;
77 unsigned async_count;
78 unsigned max_loop;
79 pj_activesock_cb cb;
Sauw Mingbe3771a2010-08-27 06:46:29 +000080#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
81 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
82 int bg_setting;
83 pj_sock_t sock;
84 CFReadStreamRef readStream;
85#endif
86
Sauw Ming47b77a82010-09-22 13:11:11 +000087 unsigned err_counter;
88 pj_status_t last_err;
89
Benny Prijono417d6052008-07-29 20:15:15 +000090 struct send_data send_data;
91
Benny Prijono4bac2c12008-05-11 18:12:16 +000092 struct read_op *read_op;
93 pj_uint32_t read_flags;
94 enum read_type read_type;
95
96 struct accept_op *accept_op;
97};
98
99
100static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
101 pj_ioqueue_op_key_t *op_key,
102 pj_ssize_t bytes_read);
103static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
104 pj_ioqueue_op_key_t *op_key,
105 pj_ssize_t bytes_sent);
Benny Prijono1dd54202008-07-25 10:45:34 +0000106#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +0000107static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
108 pj_ioqueue_op_key_t *op_key,
109 pj_sock_t sock,
110 pj_status_t status);
111static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
112 pj_status_t status);
Benny Prijono1dd54202008-07-25 10:45:34 +0000113#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +0000114
115PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg)
116{
117 pj_bzero(cfg, sizeof(*cfg));
118 cfg->async_cnt = 1;
119 cfg->concurrency = -1;
Benny Prijono417d6052008-07-29 20:15:15 +0000120 cfg->whole_data = PJ_TRUE;
Benny Prijono4bac2c12008-05-11 18:12:16 +0000121}
122
Sauw Mingbe3771a2010-08-27 06:46:29 +0000123#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
124 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
125static void activesock_destroy_iphone_os_stream(pj_activesock_t *asock)
126{
127 if (asock->readStream) {
128 CFReadStreamClose(asock->readStream);
129 CFRelease(asock->readStream);
130 asock->readStream = NULL;
131 }
132}
133
134static void activesock_create_iphone_os_stream(pj_activesock_t *asock)
135{
136 if (asock->bg_setting && asock->stream_oriented) {
137 activesock_destroy_iphone_os_stream(asock);
138
139 CFStreamCreatePairWithSocket(kCFAllocatorDefault, asock->sock,
140 &asock->readStream, NULL);
141
142 if (!asock->readStream ||
143 CFReadStreamSetProperty(asock->readStream,
144 kCFStreamNetworkServiceType,
145 kCFStreamNetworkServiceTypeVoIP)
146 != TRUE ||
147 CFReadStreamOpen(asock->readStream) != TRUE)
148 {
149 PJ_LOG(2,("", "Failed to configure TCP transport for VoIP "
150 "usage. Background mode will not be supported."));
151
152 activesock_destroy_iphone_os_stream(asock);
153 }
154 }
155}
156
157
158PJ_DEF(void) pj_activesock_set_iphone_os_bg(pj_activesock_t *asock,
159 int val)
160{
161 asock->bg_setting = val;
162 if (asock->bg_setting)
163 activesock_create_iphone_os_stream(asock);
164 else
165 activesock_destroy_iphone_os_stream(asock);
166}
167#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +0000168
169PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool,
170 pj_sock_t sock,
171 int sock_type,
172 const pj_activesock_cfg *opt,
173 pj_ioqueue_t *ioqueue,
174 const pj_activesock_cb *cb,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000175 void *user_data,
Benny Prijono4bac2c12008-05-11 18:12:16 +0000176 pj_activesock_t **p_asock)
177{
178 pj_activesock_t *asock;
179 pj_ioqueue_callback ioq_cb;
180 pj_status_t status;
181
182 PJ_ASSERT_RETURN(pool && ioqueue && cb && p_asock, PJ_EINVAL);
183 PJ_ASSERT_RETURN(sock!=0 && sock!=PJ_INVALID_SOCKET, PJ_EINVAL);
184 PJ_ASSERT_RETURN(sock_type==pj_SOCK_STREAM() ||
185 sock_type==pj_SOCK_DGRAM(), PJ_EINVAL);
186 PJ_ASSERT_RETURN(!opt || opt->async_cnt >= 1, PJ_EINVAL);
187
188 asock = PJ_POOL_ZALLOC_T(pool, pj_activesock_t);
189 asock->ioqueue = ioqueue;
190 asock->stream_oriented = (sock_type == pj_SOCK_STREAM());
191 asock->async_count = (opt? opt->async_cnt : 1);
Benny Prijono417d6052008-07-29 20:15:15 +0000192 asock->whole_data = (opt? opt->whole_data : 1);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000193 asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP;
Benny Prijonoea8e4362008-06-06 14:12:23 +0000194 asock->user_data = user_data;
Benny Prijono4bac2c12008-05-11 18:12:16 +0000195 pj_memcpy(&asock->cb, cb, sizeof(*cb));
196
197 pj_bzero(&ioq_cb, sizeof(ioq_cb));
198 ioq_cb.on_read_complete = &ioqueue_on_read_complete;
199 ioq_cb.on_write_complete = &ioqueue_on_write_complete;
Benny Prijono1dd54202008-07-25 10:45:34 +0000200#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +0000201 ioq_cb.on_connect_complete = &ioqueue_on_connect_complete;
202 ioq_cb.on_accept_complete = &ioqueue_on_accept_complete;
Benny Prijono1dd54202008-07-25 10:45:34 +0000203#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +0000204
205 status = pj_ioqueue_register_sock(pool, ioqueue, sock, asock,
206 &ioq_cb, &asock->key);
207 if (status != PJ_SUCCESS) {
208 pj_activesock_close(asock);
209 return status;
210 }
211
Benny Prijono417d6052008-07-29 20:15:15 +0000212 if (asock->whole_data) {
213 /* Must disable concurrency otherwise there is a race condition */
214 pj_ioqueue_set_concurrency(asock->key, 0);
215 } else if (opt && opt->concurrency >= 0) {
Benny Prijono4bac2c12008-05-11 18:12:16 +0000216 pj_ioqueue_set_concurrency(asock->key, opt->concurrency);
217 }
218
Sauw Mingbe3771a2010-08-27 06:46:29 +0000219#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
220 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
221 asock->sock = sock;
Sauw Ming30132a32010-09-22 13:21:40 +0000222 asock->bg_setting = PJ_ACTIVESOCK_TCP_IPHONE_OS_BG;
Sauw Mingbe3771a2010-08-27 06:46:29 +0000223#endif
224
Benny Prijono4bac2c12008-05-11 18:12:16 +0000225 *p_asock = asock;
226 return PJ_SUCCESS;
227}
228
229
230PJ_DEF(pj_status_t) pj_activesock_create_udp( pj_pool_t *pool,
231 const pj_sockaddr *addr,
232 const pj_activesock_cfg *opt,
233 pj_ioqueue_t *ioqueue,
234 const pj_activesock_cb *cb,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000235 void *user_data,
Benny Prijono4bac2c12008-05-11 18:12:16 +0000236 pj_activesock_t **p_asock,
237 pj_sockaddr *bound_addr)
238{
239 pj_sock_t sock_fd;
240 pj_sockaddr default_addr;
241 pj_status_t status;
242
243 if (addr == NULL) {
244 pj_sockaddr_init(pj_AF_INET(), &default_addr, NULL, 0);
245 addr = &default_addr;
246 }
247
248 status = pj_sock_socket(addr->addr.sa_family, pj_SOCK_DGRAM(), 0,
249 &sock_fd);
250 if (status != PJ_SUCCESS) {
251 return status;
252 }
253
254 status = pj_sock_bind(sock_fd, addr, pj_sockaddr_get_len(addr));
255 if (status != PJ_SUCCESS) {
256 pj_sock_close(sock_fd);
257 return status;
258 }
259
260 status = pj_activesock_create(pool, sock_fd, pj_SOCK_DGRAM(), opt,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000261 ioqueue, cb, user_data, p_asock);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000262 if (status != PJ_SUCCESS) {
263 pj_sock_close(sock_fd);
264 return status;
265 }
266
267 if (bound_addr) {
268 int addr_len = sizeof(*bound_addr);
269 status = pj_sock_getsockname(sock_fd, bound_addr, &addr_len);
270 if (status != PJ_SUCCESS) {
271 pj_activesock_close(*p_asock);
272 return status;
273 }
274 }
275
276 return PJ_SUCCESS;
277}
278
279
280PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock)
281{
282 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
283 if (asock->key) {
Sauw Mingbe3771a2010-08-27 06:46:29 +0000284#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
285 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
286 activesock_destroy_iphone_os_stream(asock);
287#endif
288
Benny Prijono4bac2c12008-05-11 18:12:16 +0000289 pj_ioqueue_unregister(asock->key);
290 asock->key = NULL;
291 }
292 return PJ_SUCCESS;
293}
294
295
296PJ_DEF(pj_status_t) pj_activesock_set_user_data( pj_activesock_t *asock,
297 void *user_data)
298{
299 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
300 asock->user_data = user_data;
301 return PJ_SUCCESS;
302}
303
304
305PJ_DEF(void*) pj_activesock_get_user_data(pj_activesock_t *asock)
306{
307 PJ_ASSERT_RETURN(asock, NULL);
308 return asock->user_data;
309}
310
311
312PJ_DEF(pj_status_t) pj_activesock_start_read(pj_activesock_t *asock,
313 pj_pool_t *pool,
314 unsigned buff_size,
315 pj_uint32_t flags)
316{
Benny Prijonobd344ff2008-08-04 09:59:02 +0000317 void **readbuf;
318 unsigned i;
319
320 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
321
322 readbuf = (void**) pj_pool_calloc(pool, asock->async_count,
323 sizeof(void*));
324
325 for (i=0; i<asock->async_count; ++i) {
326 readbuf[i] = pj_pool_alloc(pool, buff_size);
327 }
328
329 return pj_activesock_start_read2(asock, pool, buff_size, readbuf, flags);
330}
331
332
333PJ_DEF(pj_status_t) pj_activesock_start_read2( pj_activesock_t *asock,
334 pj_pool_t *pool,
335 unsigned buff_size,
336 void *readbuf[],
337 pj_uint32_t flags)
338{
Benny Prijono4bac2c12008-05-11 18:12:16 +0000339 unsigned i;
340 pj_status_t status;
341
342 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
343 PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
344 PJ_ASSERT_RETURN(asock->read_op == NULL, PJ_EINVALIDOP);
345
346 asock->read_op = (struct read_op*)
347 pj_pool_calloc(pool, asock->async_count,
348 sizeof(struct read_op));
349 asock->read_type = TYPE_RECV;
350 asock->read_flags = flags;
351
352 for (i=0; i<asock->async_count; ++i) {
353 struct read_op *r = &asock->read_op[i];
354 pj_ssize_t size_to_read;
355
Benny Prijonobd344ff2008-08-04 09:59:02 +0000356 r->pkt = (pj_uint8_t*)readbuf[i];
Benny Prijono4bac2c12008-05-11 18:12:16 +0000357 r->max_size = size_to_read = buff_size;
358
359 status = pj_ioqueue_recv(asock->key, &r->op_key, r->pkt, &size_to_read,
360 PJ_IOQUEUE_ALWAYS_ASYNC | flags);
361 PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
362
363 if (status != PJ_EPENDING)
364 return status;
365 }
366
367 return PJ_SUCCESS;
368}
369
370
371PJ_DEF(pj_status_t) pj_activesock_start_recvfrom(pj_activesock_t *asock,
372 pj_pool_t *pool,
373 unsigned buff_size,
374 pj_uint32_t flags)
375{
Benny Prijonobd344ff2008-08-04 09:59:02 +0000376 void **readbuf;
377 unsigned i;
378
379 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
380
381 readbuf = (void**) pj_pool_calloc(pool, asock->async_count,
382 sizeof(void*));
383
384 for (i=0; i<asock->async_count; ++i) {
385 readbuf[i] = pj_pool_alloc(pool, buff_size);
386 }
387
388 return pj_activesock_start_recvfrom2(asock, pool, buff_size,
389 readbuf, flags);
390}
391
392
393PJ_DEF(pj_status_t) pj_activesock_start_recvfrom2( pj_activesock_t *asock,
394 pj_pool_t *pool,
395 unsigned buff_size,
396 void *readbuf[],
397 pj_uint32_t flags)
398{
Benny Prijono4bac2c12008-05-11 18:12:16 +0000399 unsigned i;
400 pj_status_t status;
401
402 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
403 PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
404
405 asock->read_op = (struct read_op*)
406 pj_pool_calloc(pool, asock->async_count,
407 sizeof(struct read_op));
408 asock->read_type = TYPE_RECV_FROM;
409 asock->read_flags = flags;
410
411 for (i=0; i<asock->async_count; ++i) {
412 struct read_op *r = &asock->read_op[i];
413 pj_ssize_t size_to_read;
414
Benny Prijonobd344ff2008-08-04 09:59:02 +0000415 r->pkt = (pj_uint8_t*) readbuf[i];
Benny Prijono4bac2c12008-05-11 18:12:16 +0000416 r->max_size = size_to_read = buff_size;
417 r->src_addr_len = sizeof(r->src_addr);
418
419 status = pj_ioqueue_recvfrom(asock->key, &r->op_key, r->pkt,
420 &size_to_read,
421 PJ_IOQUEUE_ALWAYS_ASYNC | flags,
422 &r->src_addr, &r->src_addr_len);
423 PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
424
425 if (status != PJ_EPENDING)
426 return status;
427 }
428
429 return PJ_SUCCESS;
430}
431
432
433static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
434 pj_ioqueue_op_key_t *op_key,
435 pj_ssize_t bytes_read)
436{
437 pj_activesock_t *asock;
438 struct read_op *r = (struct read_op*)op_key;
439 unsigned loop = 0;
440 pj_status_t status;
441
442 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
443
444 do {
445 unsigned flags;
446
447 if (bytes_read > 0) {
448 /*
449 * We've got new data.
450 */
451 pj_size_t remainder;
452 pj_bool_t ret;
453
454 /* Append this new data to existing data. If socket is stream
455 * oriented, user might have left some data in the buffer.
456 * Otherwise if socket is datagram there will be nothing in
457 * existing packet hence the packet will contain only the new
458 * packet.
459 */
460 r->size += bytes_read;
461
462 /* Set default remainder to zero */
463 remainder = 0;
464
465 /* And return value to TRUE */
466 ret = PJ_TRUE;
467
468 /* Notify callback */
469 if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
470 ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
471 PJ_SUCCESS, &remainder);
472 } else if (asock->read_type == TYPE_RECV_FROM &&
473 asock->cb.on_data_recvfrom)
474 {
475 ret = (*asock->cb.on_data_recvfrom)(asock, r->pkt, r->size,
476 &r->src_addr,
477 r->src_addr_len,
478 PJ_SUCCESS);
479 }
480
481 /* If callback returns false, we have been destroyed! */
482 if (!ret)
483 return;
484
485 /* Only stream oriented socket may leave data in the packet */
486 if (asock->stream_oriented) {
487 r->size = remainder;
488 } else {
489 r->size = 0;
490 }
491
Benny Prijonobd344ff2008-08-04 09:59:02 +0000492 } else if (bytes_read <= 0 &&
493 -bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&
494 -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) &&
Benny Prijono3eb59632008-08-26 19:27:23 +0000495 (asock->stream_oriented ||
496 -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)))
Benny Prijonobd344ff2008-08-04 09:59:02 +0000497 {
Benny Prijono4bac2c12008-05-11 18:12:16 +0000498 pj_size_t remainder;
499 pj_bool_t ret;
500
501 if (bytes_read == 0) {
502 /* For stream/connection oriented socket, this means the
503 * connection has been closed. For datagram sockets, it means
504 * we've received datagram with zero length.
505 */
506 if (asock->stream_oriented)
507 status = PJ_EEOF;
508 else
509 status = PJ_SUCCESS;
510 } else {
511 /* This means we've got an error. If this is stream/connection
512 * oriented, it means connection has been closed. For datagram
513 * sockets, it means we've got some error (e.g. EWOULDBLOCK).
514 */
515 status = -bytes_read;
516 }
517
518 /* Set default remainder to zero */
519 remainder = 0;
520
521 /* And return value to TRUE */
522 ret = PJ_TRUE;
523
524 /* Notify callback */
525 if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
526 /* For connection oriented socket, we still need to report
527 * the remainder data (if any) to the user to let user do
528 * processing with the remainder data before it closes the
529 * connection.
530 * If there is no remainder data, set the packet to NULL.
531 */
Nanang Izzuddina326fbf2009-10-26 14:09:09 +0000532
533 /* Shouldn't set the packet to NULL, as there may be active
534 * socket user, such as SSL socket, that needs to have access
535 * to the read buffer packet.
536 */
537 //ret = (*asock->cb.on_data_read)(asock, (r->size? r->pkt:NULL),
538 // r->size, status, &remainder);
539 ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
540 status, &remainder);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000541
542 } else if (asock->read_type == TYPE_RECV_FROM &&
543 asock->cb.on_data_recvfrom)
544 {
545 /* This would always be datagram oriented hence there's
546 * nothing in the packet. We can't be sure if there will be
547 * anything useful in the source_addr, so just put NULL
548 * there too.
549 */
Benny Prijono758decb2008-08-26 17:10:51 +0000550 /* In some scenarios, status may be PJ_SUCCESS. The upper
551 * layer application may not expect the callback to be called
552 * with successful status and NULL data, so lets not call the
553 * callback if the status is PJ_SUCCESS.
554 */
555 if (status != PJ_SUCCESS ) {
556 ret = (*asock->cb.on_data_recvfrom)(asock, NULL, 0,
557 NULL, 0, status);
558 }
Benny Prijono4bac2c12008-05-11 18:12:16 +0000559 }
560
561 /* If callback returns false, we have been destroyed! */
562 if (!ret)
563 return;
564
565 /* Only stream oriented socket may leave data in the packet */
566 if (asock->stream_oriented) {
567 r->size = remainder;
568 } else {
569 r->size = 0;
570 }
571 }
572
573 /* Read next data. We limit ourselves to processing max_loop immediate
574 * data, so when the loop counter has exceeded this value, force the
575 * read()/recvfrom() to return pending operation to allow the program
576 * to do other jobs.
577 */
578 bytes_read = r->max_size - r->size;
579 flags = asock->read_flags;
580 if (++loop >= asock->max_loop)
581 flags |= PJ_IOQUEUE_ALWAYS_ASYNC;
582
583 if (asock->read_type == TYPE_RECV) {
584 status = pj_ioqueue_recv(key, op_key, r->pkt + r->size,
585 &bytes_read, flags);
586 } else {
587 r->src_addr_len = sizeof(r->src_addr);
588 status = pj_ioqueue_recvfrom(key, op_key, r->pkt + r->size,
589 &bytes_read, flags,
590 &r->src_addr, &r->src_addr_len);
591 }
592
Benny Prijono7f6ca732008-08-26 20:47:53 +0000593 if (status == PJ_SUCCESS) {
594 /* Immediate data */
595 ;
596 } else if (status != PJ_EPENDING && status != PJ_ECANCELLED) {
597 /* Error */
Benny Prijono758decb2008-08-26 17:10:51 +0000598 bytes_read = -status;
599 } else {
600 break;
601 }
602 } while (1);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000603
604}
605
606
Benny Prijono417d6052008-07-29 20:15:15 +0000607static pj_status_t send_remaining(pj_activesock_t *asock,
608 pj_ioqueue_op_key_t *send_key)
609{
610 struct send_data *sd = (struct send_data*)send_key->activesock_data;
611 pj_status_t status;
612
613 do {
614 pj_ssize_t size;
615
616 size = sd->len - sd->sent;
617 status = pj_ioqueue_send(asock->key, send_key,
618 sd->data+sd->sent, &size, sd->flags);
619 if (status != PJ_SUCCESS) {
620 /* Pending or error */
621 break;
622 }
623
624 sd->sent += size;
625 if (sd->sent == sd->len) {
626 /* The whole data has been sent. */
627 return PJ_SUCCESS;
628 }
629
630 } while (sd->sent < sd->len);
631
632 return status;
633}
634
635
Benny Prijono4bac2c12008-05-11 18:12:16 +0000636PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock,
637 pj_ioqueue_op_key_t *send_key,
638 const void *data,
639 pj_ssize_t *size,
640 unsigned flags)
641{
642 PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL);
643
Benny Prijono417d6052008-07-29 20:15:15 +0000644 send_key->activesock_data = NULL;
645
646 if (asock->whole_data) {
647 pj_ssize_t whole;
648 pj_status_t status;
649
650 whole = *size;
651
652 status = pj_ioqueue_send(asock->key, send_key, data, size, flags);
653 if (status != PJ_SUCCESS) {
654 /* Pending or error */
655 return status;
656 }
657
658 if (*size == whole) {
659 /* The whole data has been sent. */
660 return PJ_SUCCESS;
661 }
662
663 /* Data was partially sent */
664 asock->send_data.data = (pj_uint8_t*)data;
665 asock->send_data.len = whole;
666 asock->send_data.sent = *size;
667 asock->send_data.flags = flags;
668 send_key->activesock_data = &asock->send_data;
669
670 /* Try again */
671 status = send_remaining(asock, send_key);
672 if (status == PJ_SUCCESS) {
673 *size = whole;
674 }
675 return status;
676
677 } else {
678 return pj_ioqueue_send(asock->key, send_key, data, size, flags);
679 }
Benny Prijono4bac2c12008-05-11 18:12:16 +0000680}
681
682
683PJ_DEF(pj_status_t) pj_activesock_sendto( pj_activesock_t *asock,
684 pj_ioqueue_op_key_t *send_key,
685 const void *data,
686 pj_ssize_t *size,
687 unsigned flags,
688 const pj_sockaddr_t *addr,
689 int addr_len)
690{
691 PJ_ASSERT_RETURN(asock && send_key && data && size && addr && addr_len,
692 PJ_EINVAL);
693
694 return pj_ioqueue_sendto(asock->key, send_key, data, size, flags,
695 addr, addr_len);
696}
697
698
699static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
700 pj_ioqueue_op_key_t *op_key,
701 pj_ssize_t bytes_sent)
702{
703 pj_activesock_t *asock;
704
705 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
706
Benny Prijono417d6052008-07-29 20:15:15 +0000707 if (bytes_sent > 0 && op_key->activesock_data) {
708 /* whole_data is requested. Make sure we send all the data */
709 struct send_data *sd = (struct send_data*)op_key->activesock_data;
710
711 sd->sent += bytes_sent;
712 if (sd->sent == sd->len) {
713 /* all has been sent */
714 bytes_sent = sd->sent;
715 op_key->activesock_data = NULL;
716 } else {
717 /* send remaining data */
718 pj_status_t status;
719
720 status = send_remaining(asock, op_key);
721 if (status == PJ_EPENDING)
722 return;
723 else if (status == PJ_SUCCESS)
724 bytes_sent = sd->sent;
725 else
726 bytes_sent = -status;
727
728 op_key->activesock_data = NULL;
729 }
730 }
731
Benny Prijono4bac2c12008-05-11 18:12:16 +0000732 if (asock->cb.on_data_sent) {
733 pj_bool_t ret;
734
735 ret = (*asock->cb.on_data_sent)(asock, op_key, bytes_sent);
736
737 /* If callback returns false, we have been destroyed! */
738 if (!ret)
739 return;
740 }
741}
742
Benny Prijono1dd54202008-07-25 10:45:34 +0000743#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +0000744PJ_DEF(pj_status_t) pj_activesock_start_accept(pj_activesock_t *asock,
745 pj_pool_t *pool)
746{
747 unsigned i;
748
749 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
750 PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP);
751
752 asock->accept_op = (struct accept_op*)
753 pj_pool_calloc(pool, asock->async_count,
754 sizeof(struct accept_op));
755 for (i=0; i<asock->async_count; ++i) {
756 struct accept_op *a = &asock->accept_op[i];
757 pj_status_t status;
758
759 do {
760 a->new_sock = PJ_INVALID_SOCKET;
761 a->rem_addr_len = sizeof(a->rem_addr);
762
763 status = pj_ioqueue_accept(asock->key, &a->op_key, &a->new_sock,
764 NULL, &a->rem_addr, &a->rem_addr_len);
765 if (status == PJ_SUCCESS) {
766 /* We've got immediate connection. Not sure if it's a good
767 * idea to call the callback now (probably application will
768 * not be prepared to process it), so lets just silently
769 * close the socket.
770 */
771 pj_sock_close(a->new_sock);
772 }
773 } while (status == PJ_SUCCESS);
774
775 if (status != PJ_EPENDING) {
776 return status;
777 }
778 }
779
780 return PJ_SUCCESS;
781}
782
783
784static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
785 pj_ioqueue_op_key_t *op_key,
786 pj_sock_t new_sock,
787 pj_status_t status)
788{
789 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
790 struct accept_op *accept_op = (struct accept_op*) op_key;
791
Nanang Izzuddin7369d222009-11-03 12:58:54 +0000792 PJ_UNUSED_ARG(new_sock);
793
Benny Prijono4bac2c12008-05-11 18:12:16 +0000794 do {
Sauw Ming47b77a82010-09-22 13:11:11 +0000795 if (status == asock->last_err && status != PJ_SUCCESS) {
796 asock->err_counter++;
797 if (asock->err_counter >= PJ_ACTIVESOCK_MAX_CONSECUTIVE_ACCEPT_ERROR) {
798 PJ_LOG(3, ("", "Received %d consecutive errors: %d for the accept()"
799 " operation, stopping further ioqueue accepts.",
800 asock->err_counter, asock->last_err));
801 return;
802 }
803 } else {
804 asock->err_counter = 0;
805 asock->last_err = status;
806 }
807
Benny Prijono4bac2c12008-05-11 18:12:16 +0000808 if (status==PJ_SUCCESS && asock->cb.on_accept_complete) {
809 pj_bool_t ret;
810
811 /* Notify callback */
Nanang Izzuddin36aa1fb2009-11-03 12:44:11 +0000812 ret = (*asock->cb.on_accept_complete)(asock, accept_op->new_sock,
Benny Prijono4bac2c12008-05-11 18:12:16 +0000813 &accept_op->rem_addr,
814 accept_op->rem_addr_len);
815
816 /* If callback returns false, we have been destroyed! */
817 if (!ret)
818 return;
819
Sauw Mingbe3771a2010-08-27 06:46:29 +0000820#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
821 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
822 activesock_create_iphone_os_stream(asock);
823#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +0000824 } else if (status==PJ_SUCCESS) {
825 /* Application doesn't handle the new socket, we need to
826 * close it to avoid resource leak.
827 */
828 pj_sock_close(accept_op->new_sock);
829 }
830
831 /* Prepare next accept() */
832 accept_op->new_sock = PJ_INVALID_SOCKET;
833 accept_op->rem_addr_len = sizeof(accept_op->rem_addr);
834
835 status = pj_ioqueue_accept(asock->key, op_key, &accept_op->new_sock,
836 NULL, &accept_op->rem_addr,
837 &accept_op->rem_addr_len);
838
839 } while (status != PJ_EPENDING && status != PJ_ECANCELLED);
840}
841
842
843PJ_DEF(pj_status_t) pj_activesock_start_connect( pj_activesock_t *asock,
844 pj_pool_t *pool,
845 const pj_sockaddr_t *remaddr,
846 int addr_len)
847{
848 PJ_UNUSED_ARG(pool);
849 return pj_ioqueue_connect(asock->key, remaddr, addr_len);
850}
851
Benny Prijono4bac2c12008-05-11 18:12:16 +0000852static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
853 pj_status_t status)
854{
855 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
856
857 if (asock->cb.on_connect_complete) {
858 pj_bool_t ret;
859
860 ret = (*asock->cb.on_connect_complete)(asock, status);
861
862 if (!ret) {
863 /* We've been destroyed */
864 return;
865 }
Sauw Mingbe3771a2010-08-27 06:46:29 +0000866
867#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
868 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
869 activesock_create_iphone_os_stream(asock);
870#endif
871
Benny Prijono4bac2c12008-05-11 18:12:16 +0000872 }
873}
Benny Prijono1dd54202008-07-25 10:45:34 +0000874#endif /* PJ_HAS_TCP */
Benny Prijono4bac2c12008-05-11 18:12:16 +0000875