blob: b0332ce6a61022a13ce316b4d11fe4a974974467 [file] [log] [blame]
Benny Prijono4bac2c12008-05-11 18:12:16 +00001/* $Id$ */
2/*
Benny Prijono844653c2008-12-23 17:27:53 +00003 * Copyright (C) 2008-2009 Teluu Inc. (http://www.teluu.com)
4 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
Benny Prijono4bac2c12008-05-11 18:12:16 +00005 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 */
20#include <pj/activesock.h>
Benny Prijonobd344ff2008-08-04 09:59:02 +000021#include <pj/compat/socket.h>
Benny Prijono4bac2c12008-05-11 18:12:16 +000022#include <pj/assert.h>
23#include <pj/errno.h>
Sauw Mingbe3771a2010-08-27 06:46:29 +000024#include <pj/log.h>
Benny Prijono4bac2c12008-05-11 18:12:16 +000025#include <pj/pool.h>
26#include <pj/sock.h>
27#include <pj/string.h>
28
Sauw Mingbe3771a2010-08-27 06:46:29 +000029#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
30 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
31# include <CFNetwork/CFNetwork.h>
Sauw Ming3e310ec2010-10-11 10:59:37 +000032
33 static pj_bool_t ios_bg_support = PJ_TRUE;
Sauw Mingbe3771a2010-08-27 06:46:29 +000034#endif
35
/* Default for pj_activesock_t::max_loop: how many reads that complete
 * immediately the read callback handles in a row before it forces the next
 * read to be queued asynchronously.
 */
#define PJ_ACTIVESOCK_MAX_LOOP      (50)
37
38
/* Which kind of asynchronous read has been armed on an active socket. */
enum read_type
{
    /* Nothing started yet; this is the value of a zero-filled instance. */
    TYPE_NONE = 0,
    /* Reading was started with pj_activesock_start_read()/start_read2(). */
    TYPE_RECV = 1,
    /* Reading was started with pj_activesock_start_recvfrom()/..2(). */
    TYPE_RECV_FROM = 2
};
45
46struct read_op
47{
48 pj_ioqueue_op_key_t op_key;
49 pj_uint8_t *pkt;
50 unsigned max_size;
51 pj_size_t size;
52 pj_sockaddr src_addr;
53 int src_addr_len;
54};
55
56struct accept_op
57{
58 pj_ioqueue_op_key_t op_key;
59 pj_sock_t new_sock;
60 pj_sockaddr rem_addr;
61 int rem_addr_len;
62};
63
Benny Prijono417d6052008-07-29 20:15:15 +000064struct send_data
65{
66 pj_uint8_t *data;
67 pj_ssize_t len;
68 pj_ssize_t sent;
69 unsigned flags;
70};
71
Benny Prijono4bac2c12008-05-11 18:12:16 +000072struct pj_activesock_t
73{
74 pj_ioqueue_key_t *key;
75 pj_bool_t stream_oriented;
Benny Prijono417d6052008-07-29 20:15:15 +000076 pj_bool_t whole_data;
Benny Prijono4bac2c12008-05-11 18:12:16 +000077 pj_ioqueue_t *ioqueue;
78 void *user_data;
79 unsigned async_count;
80 unsigned max_loop;
81 pj_activesock_cb cb;
Sauw Mingbe3771a2010-08-27 06:46:29 +000082#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
83 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
84 int bg_setting;
85 pj_sock_t sock;
86 CFReadStreamRef readStream;
87#endif
88
Sauw Ming47b77a82010-09-22 13:11:11 +000089 unsigned err_counter;
90 pj_status_t last_err;
91
Benny Prijono417d6052008-07-29 20:15:15 +000092 struct send_data send_data;
93
Benny Prijono4bac2c12008-05-11 18:12:16 +000094 struct read_op *read_op;
95 pj_uint32_t read_flags;
96 enum read_type read_type;
97
98 struct accept_op *accept_op;
99};
100
101
102static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
103 pj_ioqueue_op_key_t *op_key,
104 pj_ssize_t bytes_read);
105static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
106 pj_ioqueue_op_key_t *op_key,
107 pj_ssize_t bytes_sent);
Benny Prijono1dd54202008-07-25 10:45:34 +0000108#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +0000109static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
110 pj_ioqueue_op_key_t *op_key,
111 pj_sock_t sock,
112 pj_status_t status);
113static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
114 pj_status_t status);
Benny Prijono1dd54202008-07-25 10:45:34 +0000115#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +0000116
117PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg)
118{
119 pj_bzero(cfg, sizeof(*cfg));
120 cfg->async_cnt = 1;
121 cfg->concurrency = -1;
Benny Prijono417d6052008-07-29 20:15:15 +0000122 cfg->whole_data = PJ_TRUE;
Benny Prijono4bac2c12008-05-11 18:12:16 +0000123}
124
Sauw Mingbe3771a2010-08-27 06:46:29 +0000125#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
126 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
127static void activesock_destroy_iphone_os_stream(pj_activesock_t *asock)
128{
129 if (asock->readStream) {
130 CFReadStreamClose(asock->readStream);
131 CFRelease(asock->readStream);
132 asock->readStream = NULL;
133 }
134}
135
136static void activesock_create_iphone_os_stream(pj_activesock_t *asock)
137{
Sauw Ming3e310ec2010-10-11 10:59:37 +0000138 if (ios_bg_support && asock->bg_setting && asock->stream_oriented) {
Sauw Mingbe3771a2010-08-27 06:46:29 +0000139 activesock_destroy_iphone_os_stream(asock);
140
141 CFStreamCreatePairWithSocket(kCFAllocatorDefault, asock->sock,
142 &asock->readStream, NULL);
143
144 if (!asock->readStream ||
145 CFReadStreamSetProperty(asock->readStream,
146 kCFStreamNetworkServiceType,
147 kCFStreamNetworkServiceTypeVoIP)
148 != TRUE ||
149 CFReadStreamOpen(asock->readStream) != TRUE)
150 {
151 PJ_LOG(2,("", "Failed to configure TCP transport for VoIP "
152 "usage. Background mode will not be supported."));
153
154 activesock_destroy_iphone_os_stream(asock);
155 }
156 }
157}
158
159
160PJ_DEF(void) pj_activesock_set_iphone_os_bg(pj_activesock_t *asock,
161 int val)
162{
163 asock->bg_setting = val;
164 if (asock->bg_setting)
165 activesock_create_iphone_os_stream(asock);
166 else
167 activesock_destroy_iphone_os_stream(asock);
168}
Sauw Ming3e310ec2010-10-11 10:59:37 +0000169
170PJ_DEF(void) pj_activesock_enable_iphone_os_bg(pj_bool_t val)
171{
172 ios_bg_support = val;
173}
Sauw Mingbe3771a2010-08-27 06:46:29 +0000174#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +0000175
176PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool,
177 pj_sock_t sock,
178 int sock_type,
179 const pj_activesock_cfg *opt,
180 pj_ioqueue_t *ioqueue,
181 const pj_activesock_cb *cb,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000182 void *user_data,
Benny Prijono4bac2c12008-05-11 18:12:16 +0000183 pj_activesock_t **p_asock)
184{
185 pj_activesock_t *asock;
186 pj_ioqueue_callback ioq_cb;
187 pj_status_t status;
188
189 PJ_ASSERT_RETURN(pool && ioqueue && cb && p_asock, PJ_EINVAL);
190 PJ_ASSERT_RETURN(sock!=0 && sock!=PJ_INVALID_SOCKET, PJ_EINVAL);
191 PJ_ASSERT_RETURN(sock_type==pj_SOCK_STREAM() ||
192 sock_type==pj_SOCK_DGRAM(), PJ_EINVAL);
193 PJ_ASSERT_RETURN(!opt || opt->async_cnt >= 1, PJ_EINVAL);
194
195 asock = PJ_POOL_ZALLOC_T(pool, pj_activesock_t);
196 asock->ioqueue = ioqueue;
197 asock->stream_oriented = (sock_type == pj_SOCK_STREAM());
198 asock->async_count = (opt? opt->async_cnt : 1);
Benny Prijono417d6052008-07-29 20:15:15 +0000199 asock->whole_data = (opt? opt->whole_data : 1);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000200 asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP;
Benny Prijonoea8e4362008-06-06 14:12:23 +0000201 asock->user_data = user_data;
Benny Prijono4bac2c12008-05-11 18:12:16 +0000202 pj_memcpy(&asock->cb, cb, sizeof(*cb));
203
204 pj_bzero(&ioq_cb, sizeof(ioq_cb));
205 ioq_cb.on_read_complete = &ioqueue_on_read_complete;
206 ioq_cb.on_write_complete = &ioqueue_on_write_complete;
Benny Prijono1dd54202008-07-25 10:45:34 +0000207#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +0000208 ioq_cb.on_connect_complete = &ioqueue_on_connect_complete;
209 ioq_cb.on_accept_complete = &ioqueue_on_accept_complete;
Benny Prijono1dd54202008-07-25 10:45:34 +0000210#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +0000211
212 status = pj_ioqueue_register_sock(pool, ioqueue, sock, asock,
213 &ioq_cb, &asock->key);
214 if (status != PJ_SUCCESS) {
215 pj_activesock_close(asock);
216 return status;
217 }
218
Benny Prijono417d6052008-07-29 20:15:15 +0000219 if (asock->whole_data) {
220 /* Must disable concurrency otherwise there is a race condition */
221 pj_ioqueue_set_concurrency(asock->key, 0);
222 } else if (opt && opt->concurrency >= 0) {
Benny Prijono4bac2c12008-05-11 18:12:16 +0000223 pj_ioqueue_set_concurrency(asock->key, opt->concurrency);
224 }
225
Sauw Mingbe3771a2010-08-27 06:46:29 +0000226#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
227 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
228 asock->sock = sock;
Sauw Ming30132a32010-09-22 13:21:40 +0000229 asock->bg_setting = PJ_ACTIVESOCK_TCP_IPHONE_OS_BG;
Sauw Mingbe3771a2010-08-27 06:46:29 +0000230#endif
231
Benny Prijono4bac2c12008-05-11 18:12:16 +0000232 *p_asock = asock;
233 return PJ_SUCCESS;
234}
235
236
237PJ_DEF(pj_status_t) pj_activesock_create_udp( pj_pool_t *pool,
238 const pj_sockaddr *addr,
239 const pj_activesock_cfg *opt,
240 pj_ioqueue_t *ioqueue,
241 const pj_activesock_cb *cb,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000242 void *user_data,
Benny Prijono4bac2c12008-05-11 18:12:16 +0000243 pj_activesock_t **p_asock,
244 pj_sockaddr *bound_addr)
245{
246 pj_sock_t sock_fd;
247 pj_sockaddr default_addr;
248 pj_status_t status;
249
250 if (addr == NULL) {
251 pj_sockaddr_init(pj_AF_INET(), &default_addr, NULL, 0);
252 addr = &default_addr;
253 }
254
255 status = pj_sock_socket(addr->addr.sa_family, pj_SOCK_DGRAM(), 0,
256 &sock_fd);
257 if (status != PJ_SUCCESS) {
258 return status;
259 }
260
261 status = pj_sock_bind(sock_fd, addr, pj_sockaddr_get_len(addr));
262 if (status != PJ_SUCCESS) {
263 pj_sock_close(sock_fd);
264 return status;
265 }
266
267 status = pj_activesock_create(pool, sock_fd, pj_SOCK_DGRAM(), opt,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000268 ioqueue, cb, user_data, p_asock);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000269 if (status != PJ_SUCCESS) {
270 pj_sock_close(sock_fd);
271 return status;
272 }
273
274 if (bound_addr) {
275 int addr_len = sizeof(*bound_addr);
276 status = pj_sock_getsockname(sock_fd, bound_addr, &addr_len);
277 if (status != PJ_SUCCESS) {
278 pj_activesock_close(*p_asock);
279 return status;
280 }
281 }
282
283 return PJ_SUCCESS;
284}
285
286
287PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock)
288{
289 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
290 if (asock->key) {
Sauw Mingbe3771a2010-08-27 06:46:29 +0000291#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
292 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
293 activesock_destroy_iphone_os_stream(asock);
294#endif
295
Benny Prijono4bac2c12008-05-11 18:12:16 +0000296 pj_ioqueue_unregister(asock->key);
297 asock->key = NULL;
298 }
299 return PJ_SUCCESS;
300}
301
302
303PJ_DEF(pj_status_t) pj_activesock_set_user_data( pj_activesock_t *asock,
304 void *user_data)
305{
306 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
307 asock->user_data = user_data;
308 return PJ_SUCCESS;
309}
310
311
312PJ_DEF(void*) pj_activesock_get_user_data(pj_activesock_t *asock)
313{
314 PJ_ASSERT_RETURN(asock, NULL);
315 return asock->user_data;
316}
317
318
319PJ_DEF(pj_status_t) pj_activesock_start_read(pj_activesock_t *asock,
320 pj_pool_t *pool,
321 unsigned buff_size,
322 pj_uint32_t flags)
323{
Benny Prijonobd344ff2008-08-04 09:59:02 +0000324 void **readbuf;
325 unsigned i;
326
327 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
328
329 readbuf = (void**) pj_pool_calloc(pool, asock->async_count,
330 sizeof(void*));
331
332 for (i=0; i<asock->async_count; ++i) {
333 readbuf[i] = pj_pool_alloc(pool, buff_size);
334 }
335
336 return pj_activesock_start_read2(asock, pool, buff_size, readbuf, flags);
337}
338
339
340PJ_DEF(pj_status_t) pj_activesock_start_read2( pj_activesock_t *asock,
341 pj_pool_t *pool,
342 unsigned buff_size,
343 void *readbuf[],
344 pj_uint32_t flags)
345{
Benny Prijono4bac2c12008-05-11 18:12:16 +0000346 unsigned i;
347 pj_status_t status;
348
349 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
350 PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
351 PJ_ASSERT_RETURN(asock->read_op == NULL, PJ_EINVALIDOP);
352
353 asock->read_op = (struct read_op*)
354 pj_pool_calloc(pool, asock->async_count,
355 sizeof(struct read_op));
356 asock->read_type = TYPE_RECV;
357 asock->read_flags = flags;
358
359 for (i=0; i<asock->async_count; ++i) {
360 struct read_op *r = &asock->read_op[i];
361 pj_ssize_t size_to_read;
362
Benny Prijonobd344ff2008-08-04 09:59:02 +0000363 r->pkt = (pj_uint8_t*)readbuf[i];
Benny Prijono4bac2c12008-05-11 18:12:16 +0000364 r->max_size = size_to_read = buff_size;
365
366 status = pj_ioqueue_recv(asock->key, &r->op_key, r->pkt, &size_to_read,
367 PJ_IOQUEUE_ALWAYS_ASYNC | flags);
368 PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
369
370 if (status != PJ_EPENDING)
371 return status;
372 }
373
374 return PJ_SUCCESS;
375}
376
377
378PJ_DEF(pj_status_t) pj_activesock_start_recvfrom(pj_activesock_t *asock,
379 pj_pool_t *pool,
380 unsigned buff_size,
381 pj_uint32_t flags)
382{
Benny Prijonobd344ff2008-08-04 09:59:02 +0000383 void **readbuf;
384 unsigned i;
385
386 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
387
388 readbuf = (void**) pj_pool_calloc(pool, asock->async_count,
389 sizeof(void*));
390
391 for (i=0; i<asock->async_count; ++i) {
392 readbuf[i] = pj_pool_alloc(pool, buff_size);
393 }
394
395 return pj_activesock_start_recvfrom2(asock, pool, buff_size,
396 readbuf, flags);
397}
398
399
400PJ_DEF(pj_status_t) pj_activesock_start_recvfrom2( pj_activesock_t *asock,
401 pj_pool_t *pool,
402 unsigned buff_size,
403 void *readbuf[],
404 pj_uint32_t flags)
405{
Benny Prijono4bac2c12008-05-11 18:12:16 +0000406 unsigned i;
407 pj_status_t status;
408
409 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
410 PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
411
412 asock->read_op = (struct read_op*)
413 pj_pool_calloc(pool, asock->async_count,
414 sizeof(struct read_op));
415 asock->read_type = TYPE_RECV_FROM;
416 asock->read_flags = flags;
417
418 for (i=0; i<asock->async_count; ++i) {
419 struct read_op *r = &asock->read_op[i];
420 pj_ssize_t size_to_read;
421
Benny Prijonobd344ff2008-08-04 09:59:02 +0000422 r->pkt = (pj_uint8_t*) readbuf[i];
Benny Prijono4bac2c12008-05-11 18:12:16 +0000423 r->max_size = size_to_read = buff_size;
424 r->src_addr_len = sizeof(r->src_addr);
425
426 status = pj_ioqueue_recvfrom(asock->key, &r->op_key, r->pkt,
427 &size_to_read,
428 PJ_IOQUEUE_ALWAYS_ASYNC | flags,
429 &r->src_addr, &r->src_addr_len);
430 PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
431
432 if (status != PJ_EPENDING)
433 return status;
434 }
435
436 return PJ_SUCCESS;
437}
438
439
440static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
441 pj_ioqueue_op_key_t *op_key,
442 pj_ssize_t bytes_read)
443{
444 pj_activesock_t *asock;
445 struct read_op *r = (struct read_op*)op_key;
446 unsigned loop = 0;
447 pj_status_t status;
448
449 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
450
451 do {
452 unsigned flags;
453
454 if (bytes_read > 0) {
455 /*
456 * We've got new data.
457 */
458 pj_size_t remainder;
459 pj_bool_t ret;
460
461 /* Append this new data to existing data. If socket is stream
462 * oriented, user might have left some data in the buffer.
463 * Otherwise if socket is datagram there will be nothing in
464 * existing packet hence the packet will contain only the new
465 * packet.
466 */
467 r->size += bytes_read;
468
469 /* Set default remainder to zero */
470 remainder = 0;
471
472 /* And return value to TRUE */
473 ret = PJ_TRUE;
474
475 /* Notify callback */
476 if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
477 ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
478 PJ_SUCCESS, &remainder);
479 } else if (asock->read_type == TYPE_RECV_FROM &&
480 asock->cb.on_data_recvfrom)
481 {
482 ret = (*asock->cb.on_data_recvfrom)(asock, r->pkt, r->size,
483 &r->src_addr,
484 r->src_addr_len,
485 PJ_SUCCESS);
486 }
487
488 /* If callback returns false, we have been destroyed! */
489 if (!ret)
490 return;
491
492 /* Only stream oriented socket may leave data in the packet */
493 if (asock->stream_oriented) {
494 r->size = remainder;
495 } else {
496 r->size = 0;
497 }
498
Benny Prijonobd344ff2008-08-04 09:59:02 +0000499 } else if (bytes_read <= 0 &&
500 -bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&
501 -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) &&
Benny Prijono3eb59632008-08-26 19:27:23 +0000502 (asock->stream_oriented ||
503 -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)))
Benny Prijonobd344ff2008-08-04 09:59:02 +0000504 {
Benny Prijono4bac2c12008-05-11 18:12:16 +0000505 pj_size_t remainder;
506 pj_bool_t ret;
507
508 if (bytes_read == 0) {
509 /* For stream/connection oriented socket, this means the
510 * connection has been closed. For datagram sockets, it means
511 * we've received datagram with zero length.
512 */
513 if (asock->stream_oriented)
514 status = PJ_EEOF;
515 else
516 status = PJ_SUCCESS;
517 } else {
518 /* This means we've got an error. If this is stream/connection
519 * oriented, it means connection has been closed. For datagram
520 * sockets, it means we've got some error (e.g. EWOULDBLOCK).
521 */
522 status = -bytes_read;
523 }
524
525 /* Set default remainder to zero */
526 remainder = 0;
527
528 /* And return value to TRUE */
529 ret = PJ_TRUE;
530
531 /* Notify callback */
532 if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
533 /* For connection oriented socket, we still need to report
534 * the remainder data (if any) to the user to let user do
535 * processing with the remainder data before it closes the
536 * connection.
537 * If there is no remainder data, set the packet to NULL.
538 */
Nanang Izzuddina326fbf2009-10-26 14:09:09 +0000539
540 /* Shouldn't set the packet to NULL, as there may be active
541 * socket user, such as SSL socket, that needs to have access
542 * to the read buffer packet.
543 */
544 //ret = (*asock->cb.on_data_read)(asock, (r->size? r->pkt:NULL),
545 // r->size, status, &remainder);
546 ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
547 status, &remainder);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000548
549 } else if (asock->read_type == TYPE_RECV_FROM &&
550 asock->cb.on_data_recvfrom)
551 {
552 /* This would always be datagram oriented hence there's
553 * nothing in the packet. We can't be sure if there will be
554 * anything useful in the source_addr, so just put NULL
555 * there too.
556 */
Benny Prijono758decb2008-08-26 17:10:51 +0000557 /* In some scenarios, status may be PJ_SUCCESS. The upper
558 * layer application may not expect the callback to be called
559 * with successful status and NULL data, so lets not call the
560 * callback if the status is PJ_SUCCESS.
561 */
562 if (status != PJ_SUCCESS ) {
563 ret = (*asock->cb.on_data_recvfrom)(asock, NULL, 0,
564 NULL, 0, status);
565 }
Benny Prijono4bac2c12008-05-11 18:12:16 +0000566 }
567
568 /* If callback returns false, we have been destroyed! */
569 if (!ret)
570 return;
571
572 /* Only stream oriented socket may leave data in the packet */
573 if (asock->stream_oriented) {
574 r->size = remainder;
575 } else {
576 r->size = 0;
577 }
578 }
579
580 /* Read next data. We limit ourselves to processing max_loop immediate
581 * data, so when the loop counter has exceeded this value, force the
582 * read()/recvfrom() to return pending operation to allow the program
583 * to do other jobs.
584 */
585 bytes_read = r->max_size - r->size;
586 flags = asock->read_flags;
587 if (++loop >= asock->max_loop)
588 flags |= PJ_IOQUEUE_ALWAYS_ASYNC;
589
590 if (asock->read_type == TYPE_RECV) {
591 status = pj_ioqueue_recv(key, op_key, r->pkt + r->size,
592 &bytes_read, flags);
593 } else {
594 r->src_addr_len = sizeof(r->src_addr);
595 status = pj_ioqueue_recvfrom(key, op_key, r->pkt + r->size,
596 &bytes_read, flags,
597 &r->src_addr, &r->src_addr_len);
598 }
599
Benny Prijono7f6ca732008-08-26 20:47:53 +0000600 if (status == PJ_SUCCESS) {
601 /* Immediate data */
602 ;
603 } else if (status != PJ_EPENDING && status != PJ_ECANCELLED) {
604 /* Error */
Benny Prijono758decb2008-08-26 17:10:51 +0000605 bytes_read = -status;
606 } else {
607 break;
608 }
609 } while (1);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000610
611}
612
613
Benny Prijono417d6052008-07-29 20:15:15 +0000614static pj_status_t send_remaining(pj_activesock_t *asock,
615 pj_ioqueue_op_key_t *send_key)
616{
617 struct send_data *sd = (struct send_data*)send_key->activesock_data;
618 pj_status_t status;
619
620 do {
621 pj_ssize_t size;
622
623 size = sd->len - sd->sent;
624 status = pj_ioqueue_send(asock->key, send_key,
625 sd->data+sd->sent, &size, sd->flags);
626 if (status != PJ_SUCCESS) {
627 /* Pending or error */
628 break;
629 }
630
631 sd->sent += size;
632 if (sd->sent == sd->len) {
633 /* The whole data has been sent. */
634 return PJ_SUCCESS;
635 }
636
637 } while (sd->sent < sd->len);
638
639 return status;
640}
641
642
Benny Prijono4bac2c12008-05-11 18:12:16 +0000643PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock,
644 pj_ioqueue_op_key_t *send_key,
645 const void *data,
646 pj_ssize_t *size,
647 unsigned flags)
648{
649 PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL);
650
Benny Prijono417d6052008-07-29 20:15:15 +0000651 send_key->activesock_data = NULL;
652
653 if (asock->whole_data) {
654 pj_ssize_t whole;
655 pj_status_t status;
656
657 whole = *size;
658
659 status = pj_ioqueue_send(asock->key, send_key, data, size, flags);
660 if (status != PJ_SUCCESS) {
661 /* Pending or error */
662 return status;
663 }
664
665 if (*size == whole) {
666 /* The whole data has been sent. */
667 return PJ_SUCCESS;
668 }
669
670 /* Data was partially sent */
671 asock->send_data.data = (pj_uint8_t*)data;
672 asock->send_data.len = whole;
673 asock->send_data.sent = *size;
674 asock->send_data.flags = flags;
675 send_key->activesock_data = &asock->send_data;
676
677 /* Try again */
678 status = send_remaining(asock, send_key);
679 if (status == PJ_SUCCESS) {
680 *size = whole;
681 }
682 return status;
683
684 } else {
685 return pj_ioqueue_send(asock->key, send_key, data, size, flags);
686 }
Benny Prijono4bac2c12008-05-11 18:12:16 +0000687}
688
689
690PJ_DEF(pj_status_t) pj_activesock_sendto( pj_activesock_t *asock,
691 pj_ioqueue_op_key_t *send_key,
692 const void *data,
693 pj_ssize_t *size,
694 unsigned flags,
695 const pj_sockaddr_t *addr,
696 int addr_len)
697{
698 PJ_ASSERT_RETURN(asock && send_key && data && size && addr && addr_len,
699 PJ_EINVAL);
700
701 return pj_ioqueue_sendto(asock->key, send_key, data, size, flags,
702 addr, addr_len);
703}
704
705
706static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
707 pj_ioqueue_op_key_t *op_key,
708 pj_ssize_t bytes_sent)
709{
710 pj_activesock_t *asock;
711
712 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
713
Benny Prijono417d6052008-07-29 20:15:15 +0000714 if (bytes_sent > 0 && op_key->activesock_data) {
715 /* whole_data is requested. Make sure we send all the data */
716 struct send_data *sd = (struct send_data*)op_key->activesock_data;
717
718 sd->sent += bytes_sent;
719 if (sd->sent == sd->len) {
720 /* all has been sent */
721 bytes_sent = sd->sent;
722 op_key->activesock_data = NULL;
723 } else {
724 /* send remaining data */
725 pj_status_t status;
726
727 status = send_remaining(asock, op_key);
728 if (status == PJ_EPENDING)
729 return;
730 else if (status == PJ_SUCCESS)
731 bytes_sent = sd->sent;
732 else
733 bytes_sent = -status;
734
735 op_key->activesock_data = NULL;
736 }
737 }
738
Benny Prijono4bac2c12008-05-11 18:12:16 +0000739 if (asock->cb.on_data_sent) {
740 pj_bool_t ret;
741
742 ret = (*asock->cb.on_data_sent)(asock, op_key, bytes_sent);
743
744 /* If callback returns false, we have been destroyed! */
745 if (!ret)
746 return;
747 }
748}
749
Benny Prijono1dd54202008-07-25 10:45:34 +0000750#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +0000751PJ_DEF(pj_status_t) pj_activesock_start_accept(pj_activesock_t *asock,
752 pj_pool_t *pool)
753{
754 unsigned i;
755
756 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
757 PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP);
758
759 asock->accept_op = (struct accept_op*)
760 pj_pool_calloc(pool, asock->async_count,
761 sizeof(struct accept_op));
762 for (i=0; i<asock->async_count; ++i) {
763 struct accept_op *a = &asock->accept_op[i];
764 pj_status_t status;
765
766 do {
767 a->new_sock = PJ_INVALID_SOCKET;
768 a->rem_addr_len = sizeof(a->rem_addr);
769
770 status = pj_ioqueue_accept(asock->key, &a->op_key, &a->new_sock,
771 NULL, &a->rem_addr, &a->rem_addr_len);
772 if (status == PJ_SUCCESS) {
773 /* We've got immediate connection. Not sure if it's a good
774 * idea to call the callback now (probably application will
775 * not be prepared to process it), so lets just silently
776 * close the socket.
777 */
778 pj_sock_close(a->new_sock);
779 }
780 } while (status == PJ_SUCCESS);
781
782 if (status != PJ_EPENDING) {
783 return status;
784 }
785 }
786
787 return PJ_SUCCESS;
788}
789
790
791static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
792 pj_ioqueue_op_key_t *op_key,
793 pj_sock_t new_sock,
794 pj_status_t status)
795{
796 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
797 struct accept_op *accept_op = (struct accept_op*) op_key;
798
Nanang Izzuddin7369d222009-11-03 12:58:54 +0000799 PJ_UNUSED_ARG(new_sock);
800
Benny Prijono4bac2c12008-05-11 18:12:16 +0000801 do {
Sauw Ming47b77a82010-09-22 13:11:11 +0000802 if (status == asock->last_err && status != PJ_SUCCESS) {
803 asock->err_counter++;
804 if (asock->err_counter >= PJ_ACTIVESOCK_MAX_CONSECUTIVE_ACCEPT_ERROR) {
805 PJ_LOG(3, ("", "Received %d consecutive errors: %d for the accept()"
806 " operation, stopping further ioqueue accepts.",
807 asock->err_counter, asock->last_err));
808 return;
809 }
810 } else {
811 asock->err_counter = 0;
812 asock->last_err = status;
813 }
814
Benny Prijono4bac2c12008-05-11 18:12:16 +0000815 if (status==PJ_SUCCESS && asock->cb.on_accept_complete) {
816 pj_bool_t ret;
817
818 /* Notify callback */
Nanang Izzuddin36aa1fb2009-11-03 12:44:11 +0000819 ret = (*asock->cb.on_accept_complete)(asock, accept_op->new_sock,
Benny Prijono4bac2c12008-05-11 18:12:16 +0000820 &accept_op->rem_addr,
821 accept_op->rem_addr_len);
822
823 /* If callback returns false, we have been destroyed! */
824 if (!ret)
825 return;
826
Sauw Mingbe3771a2010-08-27 06:46:29 +0000827#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
828 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
829 activesock_create_iphone_os_stream(asock);
830#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +0000831 } else if (status==PJ_SUCCESS) {
832 /* Application doesn't handle the new socket, we need to
833 * close it to avoid resource leak.
834 */
835 pj_sock_close(accept_op->new_sock);
836 }
837
838 /* Prepare next accept() */
839 accept_op->new_sock = PJ_INVALID_SOCKET;
840 accept_op->rem_addr_len = sizeof(accept_op->rem_addr);
841
842 status = pj_ioqueue_accept(asock->key, op_key, &accept_op->new_sock,
843 NULL, &accept_op->rem_addr,
844 &accept_op->rem_addr_len);
845
846 } while (status != PJ_EPENDING && status != PJ_ECANCELLED);
847}
848
849
850PJ_DEF(pj_status_t) pj_activesock_start_connect( pj_activesock_t *asock,
851 pj_pool_t *pool,
852 const pj_sockaddr_t *remaddr,
853 int addr_len)
854{
855 PJ_UNUSED_ARG(pool);
856 return pj_ioqueue_connect(asock->key, remaddr, addr_len);
857}
858
Benny Prijono4bac2c12008-05-11 18:12:16 +0000859static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
860 pj_status_t status)
861{
862 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
863
864 if (asock->cb.on_connect_complete) {
865 pj_bool_t ret;
866
867 ret = (*asock->cb.on_connect_complete)(asock, status);
868
869 if (!ret) {
870 /* We've been destroyed */
871 return;
872 }
Sauw Mingbe3771a2010-08-27 06:46:29 +0000873
874#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
875 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
876 activesock_create_iphone_os_stream(asock);
877#endif
878
Benny Prijono4bac2c12008-05-11 18:12:16 +0000879 }
880}
Benny Prijono1dd54202008-07-25 10:45:34 +0000881#endif /* PJ_HAS_TCP */
Benny Prijono4bac2c12008-05-11 18:12:16 +0000882