blob: 0ba1a790c07219917bb7e60d076a986e5a7952fd [file] [log] [blame]
/* $Id$ */
/*
 * Copyright (C) 2008-2009 Teluu Inc. (http://www.teluu.com)
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
20#include <pj/activesock.h>
Benny Prijonobd344ff2008-08-04 09:59:02 +000021#include <pj/compat/socket.h>
Benny Prijono4bac2c12008-05-11 18:12:16 +000022#include <pj/assert.h>
23#include <pj/errno.h>
Sauw Mingbe3771a2010-08-27 06:46:29 +000024#include <pj/log.h>
Benny Prijono4bac2c12008-05-11 18:12:16 +000025#include <pj/pool.h>
26#include <pj/sock.h>
27#include <pj/string.h>
28
Sauw Mingbe3771a2010-08-27 06:46:29 +000029#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
30 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
31# include <CFNetwork/CFNetwork.h>
32#endif
33
/* Default for pj_activesock_t::max_loop: the number of consecutive reads
 * the read completion handler processes before it forces the next read to
 * be asynchronous.
 */
#define PJ_ACTIVESOCK_MAX_LOOP 50
35
36
/* Which kind of read operation has been started on an active socket. */
enum read_type
{
    TYPE_NONE      = 0,     /* No read has been started yet.              */
    TYPE_RECV      = 1,     /* Reads are issued with pj_ioqueue_recv().    */
    TYPE_RECV_FROM = 2      /* Reads are issued with pj_ioqueue_recvfrom(). */
};
43
44struct read_op
45{
46 pj_ioqueue_op_key_t op_key;
47 pj_uint8_t *pkt;
48 unsigned max_size;
49 pj_size_t size;
50 pj_sockaddr src_addr;
51 int src_addr_len;
52};
53
54struct accept_op
55{
56 pj_ioqueue_op_key_t op_key;
57 pj_sock_t new_sock;
58 pj_sockaddr rem_addr;
59 int rem_addr_len;
60};
61
Benny Prijono417d6052008-07-29 20:15:15 +000062struct send_data
63{
64 pj_uint8_t *data;
65 pj_ssize_t len;
66 pj_ssize_t sent;
67 unsigned flags;
68};
69
Benny Prijono4bac2c12008-05-11 18:12:16 +000070struct pj_activesock_t
71{
72 pj_ioqueue_key_t *key;
73 pj_bool_t stream_oriented;
Benny Prijono417d6052008-07-29 20:15:15 +000074 pj_bool_t whole_data;
Benny Prijono4bac2c12008-05-11 18:12:16 +000075 pj_ioqueue_t *ioqueue;
76 void *user_data;
77 unsigned async_count;
78 unsigned max_loop;
79 pj_activesock_cb cb;
Sauw Mingbe3771a2010-08-27 06:46:29 +000080#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
81 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
82 int bg_setting;
83 pj_sock_t sock;
84 CFReadStreamRef readStream;
85#endif
86
Benny Prijono417d6052008-07-29 20:15:15 +000087 struct send_data send_data;
88
Benny Prijono4bac2c12008-05-11 18:12:16 +000089 struct read_op *read_op;
90 pj_uint32_t read_flags;
91 enum read_type read_type;
92
93 struct accept_op *accept_op;
94};
95
96
97static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
98 pj_ioqueue_op_key_t *op_key,
99 pj_ssize_t bytes_read);
100static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
101 pj_ioqueue_op_key_t *op_key,
102 pj_ssize_t bytes_sent);
Benny Prijono1dd54202008-07-25 10:45:34 +0000103#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +0000104static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
105 pj_ioqueue_op_key_t *op_key,
106 pj_sock_t sock,
107 pj_status_t status);
108static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
109 pj_status_t status);
Benny Prijono1dd54202008-07-25 10:45:34 +0000110#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +0000111
112PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg)
113{
114 pj_bzero(cfg, sizeof(*cfg));
115 cfg->async_cnt = 1;
116 cfg->concurrency = -1;
Benny Prijono417d6052008-07-29 20:15:15 +0000117 cfg->whole_data = PJ_TRUE;
Benny Prijono4bac2c12008-05-11 18:12:16 +0000118}
119
Sauw Mingbe3771a2010-08-27 06:46:29 +0000120#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
121 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
122static void activesock_destroy_iphone_os_stream(pj_activesock_t *asock)
123{
124 if (asock->readStream) {
125 CFReadStreamClose(asock->readStream);
126 CFRelease(asock->readStream);
127 asock->readStream = NULL;
128 }
129}
130
131static void activesock_create_iphone_os_stream(pj_activesock_t *asock)
132{
133 if (asock->bg_setting && asock->stream_oriented) {
134 activesock_destroy_iphone_os_stream(asock);
135
136 CFStreamCreatePairWithSocket(kCFAllocatorDefault, asock->sock,
137 &asock->readStream, NULL);
138
139 if (!asock->readStream ||
140 CFReadStreamSetProperty(asock->readStream,
141 kCFStreamNetworkServiceType,
142 kCFStreamNetworkServiceTypeVoIP)
143 != TRUE ||
144 CFReadStreamOpen(asock->readStream) != TRUE)
145 {
146 PJ_LOG(2,("", "Failed to configure TCP transport for VoIP "
147 "usage. Background mode will not be supported."));
148
149 activesock_destroy_iphone_os_stream(asock);
150 }
151 }
152}
153
154
155PJ_DEF(void) pj_activesock_set_iphone_os_bg(pj_activesock_t *asock,
156 int val)
157{
158 asock->bg_setting = val;
159 if (asock->bg_setting)
160 activesock_create_iphone_os_stream(asock);
161 else
162 activesock_destroy_iphone_os_stream(asock);
163}
164#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +0000165
166PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool,
167 pj_sock_t sock,
168 int sock_type,
169 const pj_activesock_cfg *opt,
170 pj_ioqueue_t *ioqueue,
171 const pj_activesock_cb *cb,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000172 void *user_data,
Benny Prijono4bac2c12008-05-11 18:12:16 +0000173 pj_activesock_t **p_asock)
174{
175 pj_activesock_t *asock;
176 pj_ioqueue_callback ioq_cb;
177 pj_status_t status;
178
179 PJ_ASSERT_RETURN(pool && ioqueue && cb && p_asock, PJ_EINVAL);
180 PJ_ASSERT_RETURN(sock!=0 && sock!=PJ_INVALID_SOCKET, PJ_EINVAL);
181 PJ_ASSERT_RETURN(sock_type==pj_SOCK_STREAM() ||
182 sock_type==pj_SOCK_DGRAM(), PJ_EINVAL);
183 PJ_ASSERT_RETURN(!opt || opt->async_cnt >= 1, PJ_EINVAL);
184
185 asock = PJ_POOL_ZALLOC_T(pool, pj_activesock_t);
186 asock->ioqueue = ioqueue;
187 asock->stream_oriented = (sock_type == pj_SOCK_STREAM());
188 asock->async_count = (opt? opt->async_cnt : 1);
Benny Prijono417d6052008-07-29 20:15:15 +0000189 asock->whole_data = (opt? opt->whole_data : 1);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000190 asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP;
Benny Prijonoea8e4362008-06-06 14:12:23 +0000191 asock->user_data = user_data;
Benny Prijono4bac2c12008-05-11 18:12:16 +0000192 pj_memcpy(&asock->cb, cb, sizeof(*cb));
193
194 pj_bzero(&ioq_cb, sizeof(ioq_cb));
195 ioq_cb.on_read_complete = &ioqueue_on_read_complete;
196 ioq_cb.on_write_complete = &ioqueue_on_write_complete;
Benny Prijono1dd54202008-07-25 10:45:34 +0000197#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +0000198 ioq_cb.on_connect_complete = &ioqueue_on_connect_complete;
199 ioq_cb.on_accept_complete = &ioqueue_on_accept_complete;
Benny Prijono1dd54202008-07-25 10:45:34 +0000200#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +0000201
202 status = pj_ioqueue_register_sock(pool, ioqueue, sock, asock,
203 &ioq_cb, &asock->key);
204 if (status != PJ_SUCCESS) {
205 pj_activesock_close(asock);
206 return status;
207 }
208
Benny Prijono417d6052008-07-29 20:15:15 +0000209 if (asock->whole_data) {
210 /* Must disable concurrency otherwise there is a race condition */
211 pj_ioqueue_set_concurrency(asock->key, 0);
212 } else if (opt && opt->concurrency >= 0) {
Benny Prijono4bac2c12008-05-11 18:12:16 +0000213 pj_ioqueue_set_concurrency(asock->key, opt->concurrency);
214 }
215
Sauw Mingbe3771a2010-08-27 06:46:29 +0000216#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
217 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
218 asock->sock = sock;
219 pj_activesock_set_iphone_os_bg(asock,
220 PJ_ACTIVESOCK_TCP_IPHONE_OS_BG);
221#endif
222
Benny Prijono4bac2c12008-05-11 18:12:16 +0000223 *p_asock = asock;
224 return PJ_SUCCESS;
225}
226
227
228PJ_DEF(pj_status_t) pj_activesock_create_udp( pj_pool_t *pool,
229 const pj_sockaddr *addr,
230 const pj_activesock_cfg *opt,
231 pj_ioqueue_t *ioqueue,
232 const pj_activesock_cb *cb,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000233 void *user_data,
Benny Prijono4bac2c12008-05-11 18:12:16 +0000234 pj_activesock_t **p_asock,
235 pj_sockaddr *bound_addr)
236{
237 pj_sock_t sock_fd;
238 pj_sockaddr default_addr;
239 pj_status_t status;
240
241 if (addr == NULL) {
242 pj_sockaddr_init(pj_AF_INET(), &default_addr, NULL, 0);
243 addr = &default_addr;
244 }
245
246 status = pj_sock_socket(addr->addr.sa_family, pj_SOCK_DGRAM(), 0,
247 &sock_fd);
248 if (status != PJ_SUCCESS) {
249 return status;
250 }
251
252 status = pj_sock_bind(sock_fd, addr, pj_sockaddr_get_len(addr));
253 if (status != PJ_SUCCESS) {
254 pj_sock_close(sock_fd);
255 return status;
256 }
257
258 status = pj_activesock_create(pool, sock_fd, pj_SOCK_DGRAM(), opt,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000259 ioqueue, cb, user_data, p_asock);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000260 if (status != PJ_SUCCESS) {
261 pj_sock_close(sock_fd);
262 return status;
263 }
264
265 if (bound_addr) {
266 int addr_len = sizeof(*bound_addr);
267 status = pj_sock_getsockname(sock_fd, bound_addr, &addr_len);
268 if (status != PJ_SUCCESS) {
269 pj_activesock_close(*p_asock);
270 return status;
271 }
272 }
273
274 return PJ_SUCCESS;
275}
276
277
278PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock)
279{
280 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
281 if (asock->key) {
Sauw Mingbe3771a2010-08-27 06:46:29 +0000282#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
283 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
284 activesock_destroy_iphone_os_stream(asock);
285#endif
286
Benny Prijono4bac2c12008-05-11 18:12:16 +0000287 pj_ioqueue_unregister(asock->key);
288 asock->key = NULL;
289 }
290 return PJ_SUCCESS;
291}
292
293
294PJ_DEF(pj_status_t) pj_activesock_set_user_data( pj_activesock_t *asock,
295 void *user_data)
296{
297 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
298 asock->user_data = user_data;
299 return PJ_SUCCESS;
300}
301
302
303PJ_DEF(void*) pj_activesock_get_user_data(pj_activesock_t *asock)
304{
305 PJ_ASSERT_RETURN(asock, NULL);
306 return asock->user_data;
307}
308
309
310PJ_DEF(pj_status_t) pj_activesock_start_read(pj_activesock_t *asock,
311 pj_pool_t *pool,
312 unsigned buff_size,
313 pj_uint32_t flags)
314{
Benny Prijonobd344ff2008-08-04 09:59:02 +0000315 void **readbuf;
316 unsigned i;
317
318 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
319
320 readbuf = (void**) pj_pool_calloc(pool, asock->async_count,
321 sizeof(void*));
322
323 for (i=0; i<asock->async_count; ++i) {
324 readbuf[i] = pj_pool_alloc(pool, buff_size);
325 }
326
327 return pj_activesock_start_read2(asock, pool, buff_size, readbuf, flags);
328}
329
330
331PJ_DEF(pj_status_t) pj_activesock_start_read2( pj_activesock_t *asock,
332 pj_pool_t *pool,
333 unsigned buff_size,
334 void *readbuf[],
335 pj_uint32_t flags)
336{
Benny Prijono4bac2c12008-05-11 18:12:16 +0000337 unsigned i;
338 pj_status_t status;
339
340 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
341 PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
342 PJ_ASSERT_RETURN(asock->read_op == NULL, PJ_EINVALIDOP);
343
344 asock->read_op = (struct read_op*)
345 pj_pool_calloc(pool, asock->async_count,
346 sizeof(struct read_op));
347 asock->read_type = TYPE_RECV;
348 asock->read_flags = flags;
349
350 for (i=0; i<asock->async_count; ++i) {
351 struct read_op *r = &asock->read_op[i];
352 pj_ssize_t size_to_read;
353
Benny Prijonobd344ff2008-08-04 09:59:02 +0000354 r->pkt = (pj_uint8_t*)readbuf[i];
Benny Prijono4bac2c12008-05-11 18:12:16 +0000355 r->max_size = size_to_read = buff_size;
356
357 status = pj_ioqueue_recv(asock->key, &r->op_key, r->pkt, &size_to_read,
358 PJ_IOQUEUE_ALWAYS_ASYNC | flags);
359 PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
360
361 if (status != PJ_EPENDING)
362 return status;
363 }
364
365 return PJ_SUCCESS;
366}
367
368
369PJ_DEF(pj_status_t) pj_activesock_start_recvfrom(pj_activesock_t *asock,
370 pj_pool_t *pool,
371 unsigned buff_size,
372 pj_uint32_t flags)
373{
Benny Prijonobd344ff2008-08-04 09:59:02 +0000374 void **readbuf;
375 unsigned i;
376
377 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
378
379 readbuf = (void**) pj_pool_calloc(pool, asock->async_count,
380 sizeof(void*));
381
382 for (i=0; i<asock->async_count; ++i) {
383 readbuf[i] = pj_pool_alloc(pool, buff_size);
384 }
385
386 return pj_activesock_start_recvfrom2(asock, pool, buff_size,
387 readbuf, flags);
388}
389
390
391PJ_DEF(pj_status_t) pj_activesock_start_recvfrom2( pj_activesock_t *asock,
392 pj_pool_t *pool,
393 unsigned buff_size,
394 void *readbuf[],
395 pj_uint32_t flags)
396{
Benny Prijono4bac2c12008-05-11 18:12:16 +0000397 unsigned i;
398 pj_status_t status;
399
400 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
401 PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
402
403 asock->read_op = (struct read_op*)
404 pj_pool_calloc(pool, asock->async_count,
405 sizeof(struct read_op));
406 asock->read_type = TYPE_RECV_FROM;
407 asock->read_flags = flags;
408
409 for (i=0; i<asock->async_count; ++i) {
410 struct read_op *r = &asock->read_op[i];
411 pj_ssize_t size_to_read;
412
Benny Prijonobd344ff2008-08-04 09:59:02 +0000413 r->pkt = (pj_uint8_t*) readbuf[i];
Benny Prijono4bac2c12008-05-11 18:12:16 +0000414 r->max_size = size_to_read = buff_size;
415 r->src_addr_len = sizeof(r->src_addr);
416
417 status = pj_ioqueue_recvfrom(asock->key, &r->op_key, r->pkt,
418 &size_to_read,
419 PJ_IOQUEUE_ALWAYS_ASYNC | flags,
420 &r->src_addr, &r->src_addr_len);
421 PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
422
423 if (status != PJ_EPENDING)
424 return status;
425 }
426
427 return PJ_SUCCESS;
428}
429
430
431static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
432 pj_ioqueue_op_key_t *op_key,
433 pj_ssize_t bytes_read)
434{
435 pj_activesock_t *asock;
436 struct read_op *r = (struct read_op*)op_key;
437 unsigned loop = 0;
438 pj_status_t status;
439
440 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
441
442 do {
443 unsigned flags;
444
445 if (bytes_read > 0) {
446 /*
447 * We've got new data.
448 */
449 pj_size_t remainder;
450 pj_bool_t ret;
451
452 /* Append this new data to existing data. If socket is stream
453 * oriented, user might have left some data in the buffer.
454 * Otherwise if socket is datagram there will be nothing in
455 * existing packet hence the packet will contain only the new
456 * packet.
457 */
458 r->size += bytes_read;
459
460 /* Set default remainder to zero */
461 remainder = 0;
462
463 /* And return value to TRUE */
464 ret = PJ_TRUE;
465
466 /* Notify callback */
467 if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
468 ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
469 PJ_SUCCESS, &remainder);
470 } else if (asock->read_type == TYPE_RECV_FROM &&
471 asock->cb.on_data_recvfrom)
472 {
473 ret = (*asock->cb.on_data_recvfrom)(asock, r->pkt, r->size,
474 &r->src_addr,
475 r->src_addr_len,
476 PJ_SUCCESS);
477 }
478
479 /* If callback returns false, we have been destroyed! */
480 if (!ret)
481 return;
482
483 /* Only stream oriented socket may leave data in the packet */
484 if (asock->stream_oriented) {
485 r->size = remainder;
486 } else {
487 r->size = 0;
488 }
489
Benny Prijonobd344ff2008-08-04 09:59:02 +0000490 } else if (bytes_read <= 0 &&
491 -bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&
492 -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) &&
Benny Prijono3eb59632008-08-26 19:27:23 +0000493 (asock->stream_oriented ||
494 -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)))
Benny Prijonobd344ff2008-08-04 09:59:02 +0000495 {
Benny Prijono4bac2c12008-05-11 18:12:16 +0000496 pj_size_t remainder;
497 pj_bool_t ret;
498
499 if (bytes_read == 0) {
500 /* For stream/connection oriented socket, this means the
501 * connection has been closed. For datagram sockets, it means
502 * we've received datagram with zero length.
503 */
504 if (asock->stream_oriented)
505 status = PJ_EEOF;
506 else
507 status = PJ_SUCCESS;
508 } else {
509 /* This means we've got an error. If this is stream/connection
510 * oriented, it means connection has been closed. For datagram
511 * sockets, it means we've got some error (e.g. EWOULDBLOCK).
512 */
513 status = -bytes_read;
514 }
515
516 /* Set default remainder to zero */
517 remainder = 0;
518
519 /* And return value to TRUE */
520 ret = PJ_TRUE;
521
522 /* Notify callback */
523 if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
524 /* For connection oriented socket, we still need to report
525 * the remainder data (if any) to the user to let user do
526 * processing with the remainder data before it closes the
527 * connection.
528 * If there is no remainder data, set the packet to NULL.
529 */
Nanang Izzuddina326fbf2009-10-26 14:09:09 +0000530
531 /* Shouldn't set the packet to NULL, as there may be active
532 * socket user, such as SSL socket, that needs to have access
533 * to the read buffer packet.
534 */
535 //ret = (*asock->cb.on_data_read)(asock, (r->size? r->pkt:NULL),
536 // r->size, status, &remainder);
537 ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
538 status, &remainder);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000539
540 } else if (asock->read_type == TYPE_RECV_FROM &&
541 asock->cb.on_data_recvfrom)
542 {
543 /* This would always be datagram oriented hence there's
544 * nothing in the packet. We can't be sure if there will be
545 * anything useful in the source_addr, so just put NULL
546 * there too.
547 */
Benny Prijono758decb2008-08-26 17:10:51 +0000548 /* In some scenarios, status may be PJ_SUCCESS. The upper
549 * layer application may not expect the callback to be called
550 * with successful status and NULL data, so lets not call the
551 * callback if the status is PJ_SUCCESS.
552 */
553 if (status != PJ_SUCCESS ) {
554 ret = (*asock->cb.on_data_recvfrom)(asock, NULL, 0,
555 NULL, 0, status);
556 }
Benny Prijono4bac2c12008-05-11 18:12:16 +0000557 }
558
559 /* If callback returns false, we have been destroyed! */
560 if (!ret)
561 return;
562
563 /* Only stream oriented socket may leave data in the packet */
564 if (asock->stream_oriented) {
565 r->size = remainder;
566 } else {
567 r->size = 0;
568 }
569 }
570
571 /* Read next data. We limit ourselves to processing max_loop immediate
572 * data, so when the loop counter has exceeded this value, force the
573 * read()/recvfrom() to return pending operation to allow the program
574 * to do other jobs.
575 */
576 bytes_read = r->max_size - r->size;
577 flags = asock->read_flags;
578 if (++loop >= asock->max_loop)
579 flags |= PJ_IOQUEUE_ALWAYS_ASYNC;
580
581 if (asock->read_type == TYPE_RECV) {
582 status = pj_ioqueue_recv(key, op_key, r->pkt + r->size,
583 &bytes_read, flags);
584 } else {
585 r->src_addr_len = sizeof(r->src_addr);
586 status = pj_ioqueue_recvfrom(key, op_key, r->pkt + r->size,
587 &bytes_read, flags,
588 &r->src_addr, &r->src_addr_len);
589 }
590
Benny Prijono7f6ca732008-08-26 20:47:53 +0000591 if (status == PJ_SUCCESS) {
592 /* Immediate data */
593 ;
594 } else if (status != PJ_EPENDING && status != PJ_ECANCELLED) {
595 /* Error */
Benny Prijono758decb2008-08-26 17:10:51 +0000596 bytes_read = -status;
597 } else {
598 break;
599 }
600 } while (1);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000601
602}
603
604
Benny Prijono417d6052008-07-29 20:15:15 +0000605static pj_status_t send_remaining(pj_activesock_t *asock,
606 pj_ioqueue_op_key_t *send_key)
607{
608 struct send_data *sd = (struct send_data*)send_key->activesock_data;
609 pj_status_t status;
610
611 do {
612 pj_ssize_t size;
613
614 size = sd->len - sd->sent;
615 status = pj_ioqueue_send(asock->key, send_key,
616 sd->data+sd->sent, &size, sd->flags);
617 if (status != PJ_SUCCESS) {
618 /* Pending or error */
619 break;
620 }
621
622 sd->sent += size;
623 if (sd->sent == sd->len) {
624 /* The whole data has been sent. */
625 return PJ_SUCCESS;
626 }
627
628 } while (sd->sent < sd->len);
629
630 return status;
631}
632
633
Benny Prijono4bac2c12008-05-11 18:12:16 +0000634PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock,
635 pj_ioqueue_op_key_t *send_key,
636 const void *data,
637 pj_ssize_t *size,
638 unsigned flags)
639{
640 PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL);
641
Benny Prijono417d6052008-07-29 20:15:15 +0000642 send_key->activesock_data = NULL;
643
644 if (asock->whole_data) {
645 pj_ssize_t whole;
646 pj_status_t status;
647
648 whole = *size;
649
650 status = pj_ioqueue_send(asock->key, send_key, data, size, flags);
651 if (status != PJ_SUCCESS) {
652 /* Pending or error */
653 return status;
654 }
655
656 if (*size == whole) {
657 /* The whole data has been sent. */
658 return PJ_SUCCESS;
659 }
660
661 /* Data was partially sent */
662 asock->send_data.data = (pj_uint8_t*)data;
663 asock->send_data.len = whole;
664 asock->send_data.sent = *size;
665 asock->send_data.flags = flags;
666 send_key->activesock_data = &asock->send_data;
667
668 /* Try again */
669 status = send_remaining(asock, send_key);
670 if (status == PJ_SUCCESS) {
671 *size = whole;
672 }
673 return status;
674
675 } else {
676 return pj_ioqueue_send(asock->key, send_key, data, size, flags);
677 }
Benny Prijono4bac2c12008-05-11 18:12:16 +0000678}
679
680
681PJ_DEF(pj_status_t) pj_activesock_sendto( pj_activesock_t *asock,
682 pj_ioqueue_op_key_t *send_key,
683 const void *data,
684 pj_ssize_t *size,
685 unsigned flags,
686 const pj_sockaddr_t *addr,
687 int addr_len)
688{
689 PJ_ASSERT_RETURN(asock && send_key && data && size && addr && addr_len,
690 PJ_EINVAL);
691
692 return pj_ioqueue_sendto(asock->key, send_key, data, size, flags,
693 addr, addr_len);
694}
695
696
697static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
698 pj_ioqueue_op_key_t *op_key,
699 pj_ssize_t bytes_sent)
700{
701 pj_activesock_t *asock;
702
703 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
704
Benny Prijono417d6052008-07-29 20:15:15 +0000705 if (bytes_sent > 0 && op_key->activesock_data) {
706 /* whole_data is requested. Make sure we send all the data */
707 struct send_data *sd = (struct send_data*)op_key->activesock_data;
708
709 sd->sent += bytes_sent;
710 if (sd->sent == sd->len) {
711 /* all has been sent */
712 bytes_sent = sd->sent;
713 op_key->activesock_data = NULL;
714 } else {
715 /* send remaining data */
716 pj_status_t status;
717
718 status = send_remaining(asock, op_key);
719 if (status == PJ_EPENDING)
720 return;
721 else if (status == PJ_SUCCESS)
722 bytes_sent = sd->sent;
723 else
724 bytes_sent = -status;
725
726 op_key->activesock_data = NULL;
727 }
728 }
729
Benny Prijono4bac2c12008-05-11 18:12:16 +0000730 if (asock->cb.on_data_sent) {
731 pj_bool_t ret;
732
733 ret = (*asock->cb.on_data_sent)(asock, op_key, bytes_sent);
734
735 /* If callback returns false, we have been destroyed! */
736 if (!ret)
737 return;
738 }
739}
740
Benny Prijono1dd54202008-07-25 10:45:34 +0000741#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +0000742PJ_DEF(pj_status_t) pj_activesock_start_accept(pj_activesock_t *asock,
743 pj_pool_t *pool)
744{
745 unsigned i;
746
747 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
748 PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP);
749
750 asock->accept_op = (struct accept_op*)
751 pj_pool_calloc(pool, asock->async_count,
752 sizeof(struct accept_op));
753 for (i=0; i<asock->async_count; ++i) {
754 struct accept_op *a = &asock->accept_op[i];
755 pj_status_t status;
756
757 do {
758 a->new_sock = PJ_INVALID_SOCKET;
759 a->rem_addr_len = sizeof(a->rem_addr);
760
761 status = pj_ioqueue_accept(asock->key, &a->op_key, &a->new_sock,
762 NULL, &a->rem_addr, &a->rem_addr_len);
763 if (status == PJ_SUCCESS) {
764 /* We've got immediate connection. Not sure if it's a good
765 * idea to call the callback now (probably application will
766 * not be prepared to process it), so lets just silently
767 * close the socket.
768 */
769 pj_sock_close(a->new_sock);
770 }
771 } while (status == PJ_SUCCESS);
772
773 if (status != PJ_EPENDING) {
774 return status;
775 }
776 }
777
778 return PJ_SUCCESS;
779}
780
781
782static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
783 pj_ioqueue_op_key_t *op_key,
784 pj_sock_t new_sock,
785 pj_status_t status)
786{
787 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
788 struct accept_op *accept_op = (struct accept_op*) op_key;
789
Nanang Izzuddin7369d222009-11-03 12:58:54 +0000790 PJ_UNUSED_ARG(new_sock);
791
Benny Prijono4bac2c12008-05-11 18:12:16 +0000792 do {
793 if (status==PJ_SUCCESS && asock->cb.on_accept_complete) {
794 pj_bool_t ret;
795
796 /* Notify callback */
Nanang Izzuddin36aa1fb2009-11-03 12:44:11 +0000797 ret = (*asock->cb.on_accept_complete)(asock, accept_op->new_sock,
Benny Prijono4bac2c12008-05-11 18:12:16 +0000798 &accept_op->rem_addr,
799 accept_op->rem_addr_len);
800
801 /* If callback returns false, we have been destroyed! */
802 if (!ret)
803 return;
804
Sauw Mingbe3771a2010-08-27 06:46:29 +0000805#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
806 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
807 activesock_create_iphone_os_stream(asock);
808#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +0000809 } else if (status==PJ_SUCCESS) {
810 /* Application doesn't handle the new socket, we need to
811 * close it to avoid resource leak.
812 */
813 pj_sock_close(accept_op->new_sock);
814 }
815
816 /* Prepare next accept() */
817 accept_op->new_sock = PJ_INVALID_SOCKET;
818 accept_op->rem_addr_len = sizeof(accept_op->rem_addr);
819
820 status = pj_ioqueue_accept(asock->key, op_key, &accept_op->new_sock,
821 NULL, &accept_op->rem_addr,
822 &accept_op->rem_addr_len);
823
824 } while (status != PJ_EPENDING && status != PJ_ECANCELLED);
825}
826
827
828PJ_DEF(pj_status_t) pj_activesock_start_connect( pj_activesock_t *asock,
829 pj_pool_t *pool,
830 const pj_sockaddr_t *remaddr,
831 int addr_len)
832{
833 PJ_UNUSED_ARG(pool);
834 return pj_ioqueue_connect(asock->key, remaddr, addr_len);
835}
836
Benny Prijono4bac2c12008-05-11 18:12:16 +0000837static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
838 pj_status_t status)
839{
840 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
841
842 if (asock->cb.on_connect_complete) {
843 pj_bool_t ret;
844
845 ret = (*asock->cb.on_connect_complete)(asock, status);
846
847 if (!ret) {
848 /* We've been destroyed */
849 return;
850 }
Sauw Mingbe3771a2010-08-27 06:46:29 +0000851
852#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
853 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
854 activesock_create_iphone_os_stream(asock);
855#endif
856
Benny Prijono4bac2c12008-05-11 18:12:16 +0000857 }
858}
Benny Prijono1dd54202008-07-25 10:45:34 +0000859#endif /* PJ_HAS_TCP */
Benny Prijono4bac2c12008-05-11 18:12:16 +0000860