blob: 016c1d34fd7645e9ee4928224b37a64a775711cc [file] [log] [blame]
Benny Prijono4bac2c12008-05-11 18:12:16 +00001/* $Id$ */
2/*
Benny Prijono844653c2008-12-23 17:27:53 +00003 * Copyright (C) 2008-2009 Teluu Inc. (http://www.teluu.com)
4 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
Benny Prijono4bac2c12008-05-11 18:12:16 +00005 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 */
20#include <pj/activesock.h>
Benny Prijonobd344ff2008-08-04 09:59:02 +000021#include <pj/compat/socket.h>
Benny Prijono4bac2c12008-05-11 18:12:16 +000022#include <pj/assert.h>
23#include <pj/errno.h>
Sauw Mingbe3771a2010-08-27 06:46:29 +000024#include <pj/log.h>
Benny Prijono4bac2c12008-05-11 18:12:16 +000025#include <pj/pool.h>
26#include <pj/sock.h>
27#include <pj/string.h>
28
Sauw Mingbe3771a2010-08-27 06:46:29 +000029#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
30 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
31# include <CFNetwork/CFNetwork.h>
32#endif
33
Benny Prijono4bac2c12008-05-11 18:12:16 +000034#define PJ_ACTIVESOCK_MAX_LOOP 50
35
36
37enum read_type
38{
39 TYPE_NONE,
40 TYPE_RECV,
41 TYPE_RECV_FROM
42};
43
44struct read_op
45{
46 pj_ioqueue_op_key_t op_key;
47 pj_uint8_t *pkt;
48 unsigned max_size;
49 pj_size_t size;
50 pj_sockaddr src_addr;
51 int src_addr_len;
52};
53
54struct accept_op
55{
56 pj_ioqueue_op_key_t op_key;
57 pj_sock_t new_sock;
58 pj_sockaddr rem_addr;
59 int rem_addr_len;
60};
61
Benny Prijono417d6052008-07-29 20:15:15 +000062struct send_data
63{
64 pj_uint8_t *data;
65 pj_ssize_t len;
66 pj_ssize_t sent;
67 unsigned flags;
68};
69
Benny Prijono4bac2c12008-05-11 18:12:16 +000070struct pj_activesock_t
71{
72 pj_ioqueue_key_t *key;
73 pj_bool_t stream_oriented;
Benny Prijono417d6052008-07-29 20:15:15 +000074 pj_bool_t whole_data;
Benny Prijono4bac2c12008-05-11 18:12:16 +000075 pj_ioqueue_t *ioqueue;
76 void *user_data;
77 unsigned async_count;
78 unsigned max_loop;
79 pj_activesock_cb cb;
Sauw Mingbe3771a2010-08-27 06:46:29 +000080#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
81 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
82 int bg_setting;
83 pj_sock_t sock;
84 CFReadStreamRef readStream;
85#endif
86
Sauw Ming47b77a82010-09-22 13:11:11 +000087 unsigned err_counter;
88 pj_status_t last_err;
89
Benny Prijono417d6052008-07-29 20:15:15 +000090 struct send_data send_data;
91
Benny Prijono4bac2c12008-05-11 18:12:16 +000092 struct read_op *read_op;
93 pj_uint32_t read_flags;
94 enum read_type read_type;
95
96 struct accept_op *accept_op;
97};
98
99
100static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
101 pj_ioqueue_op_key_t *op_key,
102 pj_ssize_t bytes_read);
103static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
104 pj_ioqueue_op_key_t *op_key,
105 pj_ssize_t bytes_sent);
Benny Prijono1dd54202008-07-25 10:45:34 +0000106#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +0000107static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
108 pj_ioqueue_op_key_t *op_key,
109 pj_sock_t sock,
110 pj_status_t status);
111static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
112 pj_status_t status);
Benny Prijono1dd54202008-07-25 10:45:34 +0000113#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +0000114
115PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg)
116{
117 pj_bzero(cfg, sizeof(*cfg));
118 cfg->async_cnt = 1;
119 cfg->concurrency = -1;
Benny Prijono417d6052008-07-29 20:15:15 +0000120 cfg->whole_data = PJ_TRUE;
Benny Prijono4bac2c12008-05-11 18:12:16 +0000121}
122
Sauw Mingbe3771a2010-08-27 06:46:29 +0000123#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
124 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
125static void activesock_destroy_iphone_os_stream(pj_activesock_t *asock)
126{
127 if (asock->readStream) {
128 CFReadStreamClose(asock->readStream);
129 CFRelease(asock->readStream);
130 asock->readStream = NULL;
131 }
132}
133
134static void activesock_create_iphone_os_stream(pj_activesock_t *asock)
135{
136 if (asock->bg_setting && asock->stream_oriented) {
137 activesock_destroy_iphone_os_stream(asock);
138
139 CFStreamCreatePairWithSocket(kCFAllocatorDefault, asock->sock,
140 &asock->readStream, NULL);
141
142 if (!asock->readStream ||
143 CFReadStreamSetProperty(asock->readStream,
144 kCFStreamNetworkServiceType,
145 kCFStreamNetworkServiceTypeVoIP)
146 != TRUE ||
147 CFReadStreamOpen(asock->readStream) != TRUE)
148 {
149 PJ_LOG(2,("", "Failed to configure TCP transport for VoIP "
150 "usage. Background mode will not be supported."));
151
152 activesock_destroy_iphone_os_stream(asock);
153 }
154 }
155}
156
157
158PJ_DEF(void) pj_activesock_set_iphone_os_bg(pj_activesock_t *asock,
159 int val)
160{
161 asock->bg_setting = val;
162 if (asock->bg_setting)
163 activesock_create_iphone_os_stream(asock);
164 else
165 activesock_destroy_iphone_os_stream(asock);
166}
167#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +0000168
169PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool,
170 pj_sock_t sock,
171 int sock_type,
172 const pj_activesock_cfg *opt,
173 pj_ioqueue_t *ioqueue,
174 const pj_activesock_cb *cb,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000175 void *user_data,
Benny Prijono4bac2c12008-05-11 18:12:16 +0000176 pj_activesock_t **p_asock)
177{
178 pj_activesock_t *asock;
179 pj_ioqueue_callback ioq_cb;
180 pj_status_t status;
181
182 PJ_ASSERT_RETURN(pool && ioqueue && cb && p_asock, PJ_EINVAL);
183 PJ_ASSERT_RETURN(sock!=0 && sock!=PJ_INVALID_SOCKET, PJ_EINVAL);
184 PJ_ASSERT_RETURN(sock_type==pj_SOCK_STREAM() ||
185 sock_type==pj_SOCK_DGRAM(), PJ_EINVAL);
186 PJ_ASSERT_RETURN(!opt || opt->async_cnt >= 1, PJ_EINVAL);
187
188 asock = PJ_POOL_ZALLOC_T(pool, pj_activesock_t);
189 asock->ioqueue = ioqueue;
190 asock->stream_oriented = (sock_type == pj_SOCK_STREAM());
191 asock->async_count = (opt? opt->async_cnt : 1);
Benny Prijono417d6052008-07-29 20:15:15 +0000192 asock->whole_data = (opt? opt->whole_data : 1);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000193 asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP;
Benny Prijonoea8e4362008-06-06 14:12:23 +0000194 asock->user_data = user_data;
Benny Prijono4bac2c12008-05-11 18:12:16 +0000195 pj_memcpy(&asock->cb, cb, sizeof(*cb));
196
197 pj_bzero(&ioq_cb, sizeof(ioq_cb));
198 ioq_cb.on_read_complete = &ioqueue_on_read_complete;
199 ioq_cb.on_write_complete = &ioqueue_on_write_complete;
Benny Prijono1dd54202008-07-25 10:45:34 +0000200#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +0000201 ioq_cb.on_connect_complete = &ioqueue_on_connect_complete;
202 ioq_cb.on_accept_complete = &ioqueue_on_accept_complete;
Benny Prijono1dd54202008-07-25 10:45:34 +0000203#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +0000204
205 status = pj_ioqueue_register_sock(pool, ioqueue, sock, asock,
206 &ioq_cb, &asock->key);
207 if (status != PJ_SUCCESS) {
208 pj_activesock_close(asock);
209 return status;
210 }
211
Benny Prijono417d6052008-07-29 20:15:15 +0000212 if (asock->whole_data) {
213 /* Must disable concurrency otherwise there is a race condition */
214 pj_ioqueue_set_concurrency(asock->key, 0);
215 } else if (opt && opt->concurrency >= 0) {
Benny Prijono4bac2c12008-05-11 18:12:16 +0000216 pj_ioqueue_set_concurrency(asock->key, opt->concurrency);
217 }
218
Sauw Mingbe3771a2010-08-27 06:46:29 +0000219#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
220 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
221 asock->sock = sock;
222 pj_activesock_set_iphone_os_bg(asock,
223 PJ_ACTIVESOCK_TCP_IPHONE_OS_BG);
224#endif
225
Benny Prijono4bac2c12008-05-11 18:12:16 +0000226 *p_asock = asock;
227 return PJ_SUCCESS;
228}
229
230
231PJ_DEF(pj_status_t) pj_activesock_create_udp( pj_pool_t *pool,
232 const pj_sockaddr *addr,
233 const pj_activesock_cfg *opt,
234 pj_ioqueue_t *ioqueue,
235 const pj_activesock_cb *cb,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000236 void *user_data,
Benny Prijono4bac2c12008-05-11 18:12:16 +0000237 pj_activesock_t **p_asock,
238 pj_sockaddr *bound_addr)
239{
240 pj_sock_t sock_fd;
241 pj_sockaddr default_addr;
242 pj_status_t status;
243
244 if (addr == NULL) {
245 pj_sockaddr_init(pj_AF_INET(), &default_addr, NULL, 0);
246 addr = &default_addr;
247 }
248
249 status = pj_sock_socket(addr->addr.sa_family, pj_SOCK_DGRAM(), 0,
250 &sock_fd);
251 if (status != PJ_SUCCESS) {
252 return status;
253 }
254
255 status = pj_sock_bind(sock_fd, addr, pj_sockaddr_get_len(addr));
256 if (status != PJ_SUCCESS) {
257 pj_sock_close(sock_fd);
258 return status;
259 }
260
261 status = pj_activesock_create(pool, sock_fd, pj_SOCK_DGRAM(), opt,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000262 ioqueue, cb, user_data, p_asock);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000263 if (status != PJ_SUCCESS) {
264 pj_sock_close(sock_fd);
265 return status;
266 }
267
268 if (bound_addr) {
269 int addr_len = sizeof(*bound_addr);
270 status = pj_sock_getsockname(sock_fd, bound_addr, &addr_len);
271 if (status != PJ_SUCCESS) {
272 pj_activesock_close(*p_asock);
273 return status;
274 }
275 }
276
277 return PJ_SUCCESS;
278}
279
280
281PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock)
282{
283 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
284 if (asock->key) {
Sauw Mingbe3771a2010-08-27 06:46:29 +0000285#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
286 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
287 activesock_destroy_iphone_os_stream(asock);
288#endif
289
Benny Prijono4bac2c12008-05-11 18:12:16 +0000290 pj_ioqueue_unregister(asock->key);
291 asock->key = NULL;
292 }
293 return PJ_SUCCESS;
294}
295
296
297PJ_DEF(pj_status_t) pj_activesock_set_user_data( pj_activesock_t *asock,
298 void *user_data)
299{
300 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
301 asock->user_data = user_data;
302 return PJ_SUCCESS;
303}
304
305
306PJ_DEF(void*) pj_activesock_get_user_data(pj_activesock_t *asock)
307{
308 PJ_ASSERT_RETURN(asock, NULL);
309 return asock->user_data;
310}
311
312
313PJ_DEF(pj_status_t) pj_activesock_start_read(pj_activesock_t *asock,
314 pj_pool_t *pool,
315 unsigned buff_size,
316 pj_uint32_t flags)
317{
Benny Prijonobd344ff2008-08-04 09:59:02 +0000318 void **readbuf;
319 unsigned i;
320
321 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
322
323 readbuf = (void**) pj_pool_calloc(pool, asock->async_count,
324 sizeof(void*));
325
326 for (i=0; i<asock->async_count; ++i) {
327 readbuf[i] = pj_pool_alloc(pool, buff_size);
328 }
329
330 return pj_activesock_start_read2(asock, pool, buff_size, readbuf, flags);
331}
332
333
334PJ_DEF(pj_status_t) pj_activesock_start_read2( pj_activesock_t *asock,
335 pj_pool_t *pool,
336 unsigned buff_size,
337 void *readbuf[],
338 pj_uint32_t flags)
339{
Benny Prijono4bac2c12008-05-11 18:12:16 +0000340 unsigned i;
341 pj_status_t status;
342
343 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
344 PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
345 PJ_ASSERT_RETURN(asock->read_op == NULL, PJ_EINVALIDOP);
346
347 asock->read_op = (struct read_op*)
348 pj_pool_calloc(pool, asock->async_count,
349 sizeof(struct read_op));
350 asock->read_type = TYPE_RECV;
351 asock->read_flags = flags;
352
353 for (i=0; i<asock->async_count; ++i) {
354 struct read_op *r = &asock->read_op[i];
355 pj_ssize_t size_to_read;
356
Benny Prijonobd344ff2008-08-04 09:59:02 +0000357 r->pkt = (pj_uint8_t*)readbuf[i];
Benny Prijono4bac2c12008-05-11 18:12:16 +0000358 r->max_size = size_to_read = buff_size;
359
360 status = pj_ioqueue_recv(asock->key, &r->op_key, r->pkt, &size_to_read,
361 PJ_IOQUEUE_ALWAYS_ASYNC | flags);
362 PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
363
364 if (status != PJ_EPENDING)
365 return status;
366 }
367
368 return PJ_SUCCESS;
369}
370
371
372PJ_DEF(pj_status_t) pj_activesock_start_recvfrom(pj_activesock_t *asock,
373 pj_pool_t *pool,
374 unsigned buff_size,
375 pj_uint32_t flags)
376{
Benny Prijonobd344ff2008-08-04 09:59:02 +0000377 void **readbuf;
378 unsigned i;
379
380 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
381
382 readbuf = (void**) pj_pool_calloc(pool, asock->async_count,
383 sizeof(void*));
384
385 for (i=0; i<asock->async_count; ++i) {
386 readbuf[i] = pj_pool_alloc(pool, buff_size);
387 }
388
389 return pj_activesock_start_recvfrom2(asock, pool, buff_size,
390 readbuf, flags);
391}
392
393
394PJ_DEF(pj_status_t) pj_activesock_start_recvfrom2( pj_activesock_t *asock,
395 pj_pool_t *pool,
396 unsigned buff_size,
397 void *readbuf[],
398 pj_uint32_t flags)
399{
Benny Prijono4bac2c12008-05-11 18:12:16 +0000400 unsigned i;
401 pj_status_t status;
402
403 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
404 PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
405
406 asock->read_op = (struct read_op*)
407 pj_pool_calloc(pool, asock->async_count,
408 sizeof(struct read_op));
409 asock->read_type = TYPE_RECV_FROM;
410 asock->read_flags = flags;
411
412 for (i=0; i<asock->async_count; ++i) {
413 struct read_op *r = &asock->read_op[i];
414 pj_ssize_t size_to_read;
415
Benny Prijonobd344ff2008-08-04 09:59:02 +0000416 r->pkt = (pj_uint8_t*) readbuf[i];
Benny Prijono4bac2c12008-05-11 18:12:16 +0000417 r->max_size = size_to_read = buff_size;
418 r->src_addr_len = sizeof(r->src_addr);
419
420 status = pj_ioqueue_recvfrom(asock->key, &r->op_key, r->pkt,
421 &size_to_read,
422 PJ_IOQUEUE_ALWAYS_ASYNC | flags,
423 &r->src_addr, &r->src_addr_len);
424 PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
425
426 if (status != PJ_EPENDING)
427 return status;
428 }
429
430 return PJ_SUCCESS;
431}
432
433
434static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
435 pj_ioqueue_op_key_t *op_key,
436 pj_ssize_t bytes_read)
437{
438 pj_activesock_t *asock;
439 struct read_op *r = (struct read_op*)op_key;
440 unsigned loop = 0;
441 pj_status_t status;
442
443 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
444
445 do {
446 unsigned flags;
447
448 if (bytes_read > 0) {
449 /*
450 * We've got new data.
451 */
452 pj_size_t remainder;
453 pj_bool_t ret;
454
455 /* Append this new data to existing data. If socket is stream
456 * oriented, user might have left some data in the buffer.
457 * Otherwise if socket is datagram there will be nothing in
458 * existing packet hence the packet will contain only the new
459 * packet.
460 */
461 r->size += bytes_read;
462
463 /* Set default remainder to zero */
464 remainder = 0;
465
466 /* And return value to TRUE */
467 ret = PJ_TRUE;
468
469 /* Notify callback */
470 if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
471 ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
472 PJ_SUCCESS, &remainder);
473 } else if (asock->read_type == TYPE_RECV_FROM &&
474 asock->cb.on_data_recvfrom)
475 {
476 ret = (*asock->cb.on_data_recvfrom)(asock, r->pkt, r->size,
477 &r->src_addr,
478 r->src_addr_len,
479 PJ_SUCCESS);
480 }
481
482 /* If callback returns false, we have been destroyed! */
483 if (!ret)
484 return;
485
486 /* Only stream oriented socket may leave data in the packet */
487 if (asock->stream_oriented) {
488 r->size = remainder;
489 } else {
490 r->size = 0;
491 }
492
Benny Prijonobd344ff2008-08-04 09:59:02 +0000493 } else if (bytes_read <= 0 &&
494 -bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&
495 -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) &&
Benny Prijono3eb59632008-08-26 19:27:23 +0000496 (asock->stream_oriented ||
497 -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)))
Benny Prijonobd344ff2008-08-04 09:59:02 +0000498 {
Benny Prijono4bac2c12008-05-11 18:12:16 +0000499 pj_size_t remainder;
500 pj_bool_t ret;
501
502 if (bytes_read == 0) {
503 /* For stream/connection oriented socket, this means the
504 * connection has been closed. For datagram sockets, it means
505 * we've received datagram with zero length.
506 */
507 if (asock->stream_oriented)
508 status = PJ_EEOF;
509 else
510 status = PJ_SUCCESS;
511 } else {
512 /* This means we've got an error. If this is stream/connection
513 * oriented, it means connection has been closed. For datagram
514 * sockets, it means we've got some error (e.g. EWOULDBLOCK).
515 */
516 status = -bytes_read;
517 }
518
519 /* Set default remainder to zero */
520 remainder = 0;
521
522 /* And return value to TRUE */
523 ret = PJ_TRUE;
524
525 /* Notify callback */
526 if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
527 /* For connection oriented socket, we still need to report
528 * the remainder data (if any) to the user to let user do
529 * processing with the remainder data before it closes the
530 * connection.
531 * If there is no remainder data, set the packet to NULL.
532 */
Nanang Izzuddina326fbf2009-10-26 14:09:09 +0000533
534 /* Shouldn't set the packet to NULL, as there may be active
535 * socket user, such as SSL socket, that needs to have access
536 * to the read buffer packet.
537 */
538 //ret = (*asock->cb.on_data_read)(asock, (r->size? r->pkt:NULL),
539 // r->size, status, &remainder);
540 ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
541 status, &remainder);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000542
543 } else if (asock->read_type == TYPE_RECV_FROM &&
544 asock->cb.on_data_recvfrom)
545 {
546 /* This would always be datagram oriented hence there's
547 * nothing in the packet. We can't be sure if there will be
548 * anything useful in the source_addr, so just put NULL
549 * there too.
550 */
Benny Prijono758decb2008-08-26 17:10:51 +0000551 /* In some scenarios, status may be PJ_SUCCESS. The upper
552 * layer application may not expect the callback to be called
553 * with successful status and NULL data, so lets not call the
554 * callback if the status is PJ_SUCCESS.
555 */
556 if (status != PJ_SUCCESS ) {
557 ret = (*asock->cb.on_data_recvfrom)(asock, NULL, 0,
558 NULL, 0, status);
559 }
Benny Prijono4bac2c12008-05-11 18:12:16 +0000560 }
561
562 /* If callback returns false, we have been destroyed! */
563 if (!ret)
564 return;
565
566 /* Only stream oriented socket may leave data in the packet */
567 if (asock->stream_oriented) {
568 r->size = remainder;
569 } else {
570 r->size = 0;
571 }
572 }
573
574 /* Read next data. We limit ourselves to processing max_loop immediate
575 * data, so when the loop counter has exceeded this value, force the
576 * read()/recvfrom() to return pending operation to allow the program
577 * to do other jobs.
578 */
579 bytes_read = r->max_size - r->size;
580 flags = asock->read_flags;
581 if (++loop >= asock->max_loop)
582 flags |= PJ_IOQUEUE_ALWAYS_ASYNC;
583
584 if (asock->read_type == TYPE_RECV) {
585 status = pj_ioqueue_recv(key, op_key, r->pkt + r->size,
586 &bytes_read, flags);
587 } else {
588 r->src_addr_len = sizeof(r->src_addr);
589 status = pj_ioqueue_recvfrom(key, op_key, r->pkt + r->size,
590 &bytes_read, flags,
591 &r->src_addr, &r->src_addr_len);
592 }
593
Benny Prijono7f6ca732008-08-26 20:47:53 +0000594 if (status == PJ_SUCCESS) {
595 /* Immediate data */
596 ;
597 } else if (status != PJ_EPENDING && status != PJ_ECANCELLED) {
598 /* Error */
Benny Prijono758decb2008-08-26 17:10:51 +0000599 bytes_read = -status;
600 } else {
601 break;
602 }
603 } while (1);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000604
605}
606
607
Benny Prijono417d6052008-07-29 20:15:15 +0000608static pj_status_t send_remaining(pj_activesock_t *asock,
609 pj_ioqueue_op_key_t *send_key)
610{
611 struct send_data *sd = (struct send_data*)send_key->activesock_data;
612 pj_status_t status;
613
614 do {
615 pj_ssize_t size;
616
617 size = sd->len - sd->sent;
618 status = pj_ioqueue_send(asock->key, send_key,
619 sd->data+sd->sent, &size, sd->flags);
620 if (status != PJ_SUCCESS) {
621 /* Pending or error */
622 break;
623 }
624
625 sd->sent += size;
626 if (sd->sent == sd->len) {
627 /* The whole data has been sent. */
628 return PJ_SUCCESS;
629 }
630
631 } while (sd->sent < sd->len);
632
633 return status;
634}
635
636
Benny Prijono4bac2c12008-05-11 18:12:16 +0000637PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock,
638 pj_ioqueue_op_key_t *send_key,
639 const void *data,
640 pj_ssize_t *size,
641 unsigned flags)
642{
643 PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL);
644
Benny Prijono417d6052008-07-29 20:15:15 +0000645 send_key->activesock_data = NULL;
646
647 if (asock->whole_data) {
648 pj_ssize_t whole;
649 pj_status_t status;
650
651 whole = *size;
652
653 status = pj_ioqueue_send(asock->key, send_key, data, size, flags);
654 if (status != PJ_SUCCESS) {
655 /* Pending or error */
656 return status;
657 }
658
659 if (*size == whole) {
660 /* The whole data has been sent. */
661 return PJ_SUCCESS;
662 }
663
664 /* Data was partially sent */
665 asock->send_data.data = (pj_uint8_t*)data;
666 asock->send_data.len = whole;
667 asock->send_data.sent = *size;
668 asock->send_data.flags = flags;
669 send_key->activesock_data = &asock->send_data;
670
671 /* Try again */
672 status = send_remaining(asock, send_key);
673 if (status == PJ_SUCCESS) {
674 *size = whole;
675 }
676 return status;
677
678 } else {
679 return pj_ioqueue_send(asock->key, send_key, data, size, flags);
680 }
Benny Prijono4bac2c12008-05-11 18:12:16 +0000681}
682
683
684PJ_DEF(pj_status_t) pj_activesock_sendto( pj_activesock_t *asock,
685 pj_ioqueue_op_key_t *send_key,
686 const void *data,
687 pj_ssize_t *size,
688 unsigned flags,
689 const pj_sockaddr_t *addr,
690 int addr_len)
691{
692 PJ_ASSERT_RETURN(asock && send_key && data && size && addr && addr_len,
693 PJ_EINVAL);
694
695 return pj_ioqueue_sendto(asock->key, send_key, data, size, flags,
696 addr, addr_len);
697}
698
699
700static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
701 pj_ioqueue_op_key_t *op_key,
702 pj_ssize_t bytes_sent)
703{
704 pj_activesock_t *asock;
705
706 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
707
Benny Prijono417d6052008-07-29 20:15:15 +0000708 if (bytes_sent > 0 && op_key->activesock_data) {
709 /* whole_data is requested. Make sure we send all the data */
710 struct send_data *sd = (struct send_data*)op_key->activesock_data;
711
712 sd->sent += bytes_sent;
713 if (sd->sent == sd->len) {
714 /* all has been sent */
715 bytes_sent = sd->sent;
716 op_key->activesock_data = NULL;
717 } else {
718 /* send remaining data */
719 pj_status_t status;
720
721 status = send_remaining(asock, op_key);
722 if (status == PJ_EPENDING)
723 return;
724 else if (status == PJ_SUCCESS)
725 bytes_sent = sd->sent;
726 else
727 bytes_sent = -status;
728
729 op_key->activesock_data = NULL;
730 }
731 }
732
Benny Prijono4bac2c12008-05-11 18:12:16 +0000733 if (asock->cb.on_data_sent) {
734 pj_bool_t ret;
735
736 ret = (*asock->cb.on_data_sent)(asock, op_key, bytes_sent);
737
738 /* If callback returns false, we have been destroyed! */
739 if (!ret)
740 return;
741 }
742}
743
Benny Prijono1dd54202008-07-25 10:45:34 +0000744#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +0000745PJ_DEF(pj_status_t) pj_activesock_start_accept(pj_activesock_t *asock,
746 pj_pool_t *pool)
747{
748 unsigned i;
749
750 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
751 PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP);
752
753 asock->accept_op = (struct accept_op*)
754 pj_pool_calloc(pool, asock->async_count,
755 sizeof(struct accept_op));
756 for (i=0; i<asock->async_count; ++i) {
757 struct accept_op *a = &asock->accept_op[i];
758 pj_status_t status;
759
760 do {
761 a->new_sock = PJ_INVALID_SOCKET;
762 a->rem_addr_len = sizeof(a->rem_addr);
763
764 status = pj_ioqueue_accept(asock->key, &a->op_key, &a->new_sock,
765 NULL, &a->rem_addr, &a->rem_addr_len);
766 if (status == PJ_SUCCESS) {
767 /* We've got immediate connection. Not sure if it's a good
768 * idea to call the callback now (probably application will
769 * not be prepared to process it), so lets just silently
770 * close the socket.
771 */
772 pj_sock_close(a->new_sock);
773 }
774 } while (status == PJ_SUCCESS);
775
776 if (status != PJ_EPENDING) {
777 return status;
778 }
779 }
780
781 return PJ_SUCCESS;
782}
783
784
785static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
786 pj_ioqueue_op_key_t *op_key,
787 pj_sock_t new_sock,
788 pj_status_t status)
789{
790 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
791 struct accept_op *accept_op = (struct accept_op*) op_key;
792
Nanang Izzuddin7369d222009-11-03 12:58:54 +0000793 PJ_UNUSED_ARG(new_sock);
794
Benny Prijono4bac2c12008-05-11 18:12:16 +0000795 do {
Sauw Ming47b77a82010-09-22 13:11:11 +0000796 if (status == asock->last_err && status != PJ_SUCCESS) {
797 asock->err_counter++;
798 if (asock->err_counter >= PJ_ACTIVESOCK_MAX_CONSECUTIVE_ACCEPT_ERROR) {
799 PJ_LOG(3, ("", "Received %d consecutive errors: %d for the accept()"
800 " operation, stopping further ioqueue accepts.",
801 asock->err_counter, asock->last_err));
802 return;
803 }
804 } else {
805 asock->err_counter = 0;
806 asock->last_err = status;
807 }
808
Benny Prijono4bac2c12008-05-11 18:12:16 +0000809 if (status==PJ_SUCCESS && asock->cb.on_accept_complete) {
810 pj_bool_t ret;
811
812 /* Notify callback */
Nanang Izzuddin36aa1fb2009-11-03 12:44:11 +0000813 ret = (*asock->cb.on_accept_complete)(asock, accept_op->new_sock,
Benny Prijono4bac2c12008-05-11 18:12:16 +0000814 &accept_op->rem_addr,
815 accept_op->rem_addr_len);
816
817 /* If callback returns false, we have been destroyed! */
818 if (!ret)
819 return;
820
Sauw Mingbe3771a2010-08-27 06:46:29 +0000821#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
822 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
823 activesock_create_iphone_os_stream(asock);
824#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +0000825 } else if (status==PJ_SUCCESS) {
826 /* Application doesn't handle the new socket, we need to
827 * close it to avoid resource leak.
828 */
829 pj_sock_close(accept_op->new_sock);
830 }
831
832 /* Prepare next accept() */
833 accept_op->new_sock = PJ_INVALID_SOCKET;
834 accept_op->rem_addr_len = sizeof(accept_op->rem_addr);
835
836 status = pj_ioqueue_accept(asock->key, op_key, &accept_op->new_sock,
837 NULL, &accept_op->rem_addr,
838 &accept_op->rem_addr_len);
839
840 } while (status != PJ_EPENDING && status != PJ_ECANCELLED);
841}
842
843
844PJ_DEF(pj_status_t) pj_activesock_start_connect( pj_activesock_t *asock,
845 pj_pool_t *pool,
846 const pj_sockaddr_t *remaddr,
847 int addr_len)
848{
849 PJ_UNUSED_ARG(pool);
850 return pj_ioqueue_connect(asock->key, remaddr, addr_len);
851}
852
Benny Prijono4bac2c12008-05-11 18:12:16 +0000853static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
854 pj_status_t status)
855{
856 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
857
858 if (asock->cb.on_connect_complete) {
859 pj_bool_t ret;
860
861 ret = (*asock->cb.on_connect_complete)(asock, status);
862
863 if (!ret) {
864 /* We've been destroyed */
865 return;
866 }
Sauw Mingbe3771a2010-08-27 06:46:29 +0000867
868#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
869 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
870 activesock_create_iphone_os_stream(asock);
871#endif
872
Benny Prijono4bac2c12008-05-11 18:12:16 +0000873 }
874}
Benny Prijono1dd54202008-07-25 10:45:34 +0000875#endif /* PJ_HAS_TCP */
Benny Prijono4bac2c12008-05-11 18:12:16 +0000876