blob: 075141f622f1e28a234fda071456c538a251a51f [file] [log] [blame]
/* $Id$ */
/*
 * Copyright (C) 2008-2009 Teluu Inc. (http://www.teluu.com)
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
20#include <pj/activesock.h>
Benny Prijonobd344ff2008-08-04 09:59:02 +000021#include <pj/compat/socket.h>
Benny Prijono4bac2c12008-05-11 18:12:16 +000022#include <pj/assert.h>
23#include <pj/errno.h>
24#include <pj/pool.h>
25#include <pj/sock.h>
26#include <pj/string.h>
27
/* Default value stored in pj_activesock_t::max_loop: the number of read
 * iterations the read-completion handler performs before it forces the
 * next read to be asynchronous.
 */
#define PJ_ACTIVESOCK_MAX_LOOP  50
29
30
/* Kind of read operation that has been started on an active socket.
 * TYPE_NONE is zero so that a zero-initialized socket has no read started.
 */
enum read_type
{
    TYPE_NONE       = 0,    /* No read has been started.            */
    TYPE_RECV       = 1,    /* Started with pj_ioqueue_recv().      */
    TYPE_RECV_FROM  = 2     /* Started with pj_ioqueue_recvfrom().  */
};
37
38struct read_op
39{
40 pj_ioqueue_op_key_t op_key;
41 pj_uint8_t *pkt;
42 unsigned max_size;
43 pj_size_t size;
44 pj_sockaddr src_addr;
45 int src_addr_len;
46};
47
48struct accept_op
49{
50 pj_ioqueue_op_key_t op_key;
51 pj_sock_t new_sock;
52 pj_sockaddr rem_addr;
53 int rem_addr_len;
54};
55
Benny Prijono417d6052008-07-29 20:15:15 +000056struct send_data
57{
58 pj_uint8_t *data;
59 pj_ssize_t len;
60 pj_ssize_t sent;
61 unsigned flags;
62};
63
Benny Prijono4bac2c12008-05-11 18:12:16 +000064struct pj_activesock_t
65{
66 pj_ioqueue_key_t *key;
67 pj_bool_t stream_oriented;
Benny Prijono417d6052008-07-29 20:15:15 +000068 pj_bool_t whole_data;
Benny Prijono4bac2c12008-05-11 18:12:16 +000069 pj_ioqueue_t *ioqueue;
70 void *user_data;
71 unsigned async_count;
72 unsigned max_loop;
73 pj_activesock_cb cb;
74
Benny Prijono417d6052008-07-29 20:15:15 +000075 struct send_data send_data;
76
Benny Prijono4bac2c12008-05-11 18:12:16 +000077 struct read_op *read_op;
78 pj_uint32_t read_flags;
79 enum read_type read_type;
80
81 struct accept_op *accept_op;
82};
83
84
85static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
86 pj_ioqueue_op_key_t *op_key,
87 pj_ssize_t bytes_read);
88static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
89 pj_ioqueue_op_key_t *op_key,
90 pj_ssize_t bytes_sent);
Benny Prijono1dd54202008-07-25 10:45:34 +000091#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +000092static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
93 pj_ioqueue_op_key_t *op_key,
94 pj_sock_t sock,
95 pj_status_t status);
96static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
97 pj_status_t status);
Benny Prijono1dd54202008-07-25 10:45:34 +000098#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +000099
100PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg)
101{
102 pj_bzero(cfg, sizeof(*cfg));
103 cfg->async_cnt = 1;
104 cfg->concurrency = -1;
Benny Prijono417d6052008-07-29 20:15:15 +0000105 cfg->whole_data = PJ_TRUE;
Benny Prijono4bac2c12008-05-11 18:12:16 +0000106}
107
108
109PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool,
110 pj_sock_t sock,
111 int sock_type,
112 const pj_activesock_cfg *opt,
113 pj_ioqueue_t *ioqueue,
114 const pj_activesock_cb *cb,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000115 void *user_data,
Benny Prijono4bac2c12008-05-11 18:12:16 +0000116 pj_activesock_t **p_asock)
117{
118 pj_activesock_t *asock;
119 pj_ioqueue_callback ioq_cb;
120 pj_status_t status;
121
122 PJ_ASSERT_RETURN(pool && ioqueue && cb && p_asock, PJ_EINVAL);
123 PJ_ASSERT_RETURN(sock!=0 && sock!=PJ_INVALID_SOCKET, PJ_EINVAL);
124 PJ_ASSERT_RETURN(sock_type==pj_SOCK_STREAM() ||
125 sock_type==pj_SOCK_DGRAM(), PJ_EINVAL);
126 PJ_ASSERT_RETURN(!opt || opt->async_cnt >= 1, PJ_EINVAL);
127
128 asock = PJ_POOL_ZALLOC_T(pool, pj_activesock_t);
129 asock->ioqueue = ioqueue;
130 asock->stream_oriented = (sock_type == pj_SOCK_STREAM());
131 asock->async_count = (opt? opt->async_cnt : 1);
Benny Prijono417d6052008-07-29 20:15:15 +0000132 asock->whole_data = (opt? opt->whole_data : 1);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000133 asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP;
Benny Prijonoea8e4362008-06-06 14:12:23 +0000134 asock->user_data = user_data;
Benny Prijono4bac2c12008-05-11 18:12:16 +0000135 pj_memcpy(&asock->cb, cb, sizeof(*cb));
136
137 pj_bzero(&ioq_cb, sizeof(ioq_cb));
138 ioq_cb.on_read_complete = &ioqueue_on_read_complete;
139 ioq_cb.on_write_complete = &ioqueue_on_write_complete;
Benny Prijono1dd54202008-07-25 10:45:34 +0000140#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +0000141 ioq_cb.on_connect_complete = &ioqueue_on_connect_complete;
142 ioq_cb.on_accept_complete = &ioqueue_on_accept_complete;
Benny Prijono1dd54202008-07-25 10:45:34 +0000143#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +0000144
145 status = pj_ioqueue_register_sock(pool, ioqueue, sock, asock,
146 &ioq_cb, &asock->key);
147 if (status != PJ_SUCCESS) {
148 pj_activesock_close(asock);
149 return status;
150 }
151
Benny Prijono417d6052008-07-29 20:15:15 +0000152 if (asock->whole_data) {
153 /* Must disable concurrency otherwise there is a race condition */
154 pj_ioqueue_set_concurrency(asock->key, 0);
155 } else if (opt && opt->concurrency >= 0) {
Benny Prijono4bac2c12008-05-11 18:12:16 +0000156 pj_ioqueue_set_concurrency(asock->key, opt->concurrency);
157 }
158
159 *p_asock = asock;
160 return PJ_SUCCESS;
161}
162
163
164PJ_DEF(pj_status_t) pj_activesock_create_udp( pj_pool_t *pool,
165 const pj_sockaddr *addr,
166 const pj_activesock_cfg *opt,
167 pj_ioqueue_t *ioqueue,
168 const pj_activesock_cb *cb,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000169 void *user_data,
Benny Prijono4bac2c12008-05-11 18:12:16 +0000170 pj_activesock_t **p_asock,
171 pj_sockaddr *bound_addr)
172{
173 pj_sock_t sock_fd;
174 pj_sockaddr default_addr;
175 pj_status_t status;
176
177 if (addr == NULL) {
178 pj_sockaddr_init(pj_AF_INET(), &default_addr, NULL, 0);
179 addr = &default_addr;
180 }
181
182 status = pj_sock_socket(addr->addr.sa_family, pj_SOCK_DGRAM(), 0,
183 &sock_fd);
184 if (status != PJ_SUCCESS) {
185 return status;
186 }
187
188 status = pj_sock_bind(sock_fd, addr, pj_sockaddr_get_len(addr));
189 if (status != PJ_SUCCESS) {
190 pj_sock_close(sock_fd);
191 return status;
192 }
193
194 status = pj_activesock_create(pool, sock_fd, pj_SOCK_DGRAM(), opt,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000195 ioqueue, cb, user_data, p_asock);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000196 if (status != PJ_SUCCESS) {
197 pj_sock_close(sock_fd);
198 return status;
199 }
200
201 if (bound_addr) {
202 int addr_len = sizeof(*bound_addr);
203 status = pj_sock_getsockname(sock_fd, bound_addr, &addr_len);
204 if (status != PJ_SUCCESS) {
205 pj_activesock_close(*p_asock);
206 return status;
207 }
208 }
209
210 return PJ_SUCCESS;
211}
212
213
214PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock)
215{
216 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
217 if (asock->key) {
218 pj_ioqueue_unregister(asock->key);
219 asock->key = NULL;
220 }
221 return PJ_SUCCESS;
222}
223
224
225PJ_DEF(pj_status_t) pj_activesock_set_user_data( pj_activesock_t *asock,
226 void *user_data)
227{
228 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
229 asock->user_data = user_data;
230 return PJ_SUCCESS;
231}
232
233
234PJ_DEF(void*) pj_activesock_get_user_data(pj_activesock_t *asock)
235{
236 PJ_ASSERT_RETURN(asock, NULL);
237 return asock->user_data;
238}
239
240
241PJ_DEF(pj_status_t) pj_activesock_start_read(pj_activesock_t *asock,
242 pj_pool_t *pool,
243 unsigned buff_size,
244 pj_uint32_t flags)
245{
Benny Prijonobd344ff2008-08-04 09:59:02 +0000246 void **readbuf;
247 unsigned i;
248
249 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
250
251 readbuf = (void**) pj_pool_calloc(pool, asock->async_count,
252 sizeof(void*));
253
254 for (i=0; i<asock->async_count; ++i) {
255 readbuf[i] = pj_pool_alloc(pool, buff_size);
256 }
257
258 return pj_activesock_start_read2(asock, pool, buff_size, readbuf, flags);
259}
260
261
262PJ_DEF(pj_status_t) pj_activesock_start_read2( pj_activesock_t *asock,
263 pj_pool_t *pool,
264 unsigned buff_size,
265 void *readbuf[],
266 pj_uint32_t flags)
267{
Benny Prijono4bac2c12008-05-11 18:12:16 +0000268 unsigned i;
269 pj_status_t status;
270
271 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
272 PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
273 PJ_ASSERT_RETURN(asock->read_op == NULL, PJ_EINVALIDOP);
274
275 asock->read_op = (struct read_op*)
276 pj_pool_calloc(pool, asock->async_count,
277 sizeof(struct read_op));
278 asock->read_type = TYPE_RECV;
279 asock->read_flags = flags;
280
281 for (i=0; i<asock->async_count; ++i) {
282 struct read_op *r = &asock->read_op[i];
283 pj_ssize_t size_to_read;
284
Benny Prijonobd344ff2008-08-04 09:59:02 +0000285 r->pkt = (pj_uint8_t*)readbuf[i];
Benny Prijono4bac2c12008-05-11 18:12:16 +0000286 r->max_size = size_to_read = buff_size;
287
288 status = pj_ioqueue_recv(asock->key, &r->op_key, r->pkt, &size_to_read,
289 PJ_IOQUEUE_ALWAYS_ASYNC | flags);
290 PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
291
292 if (status != PJ_EPENDING)
293 return status;
294 }
295
296 return PJ_SUCCESS;
297}
298
299
300PJ_DEF(pj_status_t) pj_activesock_start_recvfrom(pj_activesock_t *asock,
301 pj_pool_t *pool,
302 unsigned buff_size,
303 pj_uint32_t flags)
304{
Benny Prijonobd344ff2008-08-04 09:59:02 +0000305 void **readbuf;
306 unsigned i;
307
308 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
309
310 readbuf = (void**) pj_pool_calloc(pool, asock->async_count,
311 sizeof(void*));
312
313 for (i=0; i<asock->async_count; ++i) {
314 readbuf[i] = pj_pool_alloc(pool, buff_size);
315 }
316
317 return pj_activesock_start_recvfrom2(asock, pool, buff_size,
318 readbuf, flags);
319}
320
321
322PJ_DEF(pj_status_t) pj_activesock_start_recvfrom2( pj_activesock_t *asock,
323 pj_pool_t *pool,
324 unsigned buff_size,
325 void *readbuf[],
326 pj_uint32_t flags)
327{
Benny Prijono4bac2c12008-05-11 18:12:16 +0000328 unsigned i;
329 pj_status_t status;
330
331 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
332 PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
333
334 asock->read_op = (struct read_op*)
335 pj_pool_calloc(pool, asock->async_count,
336 sizeof(struct read_op));
337 asock->read_type = TYPE_RECV_FROM;
338 asock->read_flags = flags;
339
340 for (i=0; i<asock->async_count; ++i) {
341 struct read_op *r = &asock->read_op[i];
342 pj_ssize_t size_to_read;
343
Benny Prijonobd344ff2008-08-04 09:59:02 +0000344 r->pkt = (pj_uint8_t*) readbuf[i];
Benny Prijono4bac2c12008-05-11 18:12:16 +0000345 r->max_size = size_to_read = buff_size;
346 r->src_addr_len = sizeof(r->src_addr);
347
348 status = pj_ioqueue_recvfrom(asock->key, &r->op_key, r->pkt,
349 &size_to_read,
350 PJ_IOQUEUE_ALWAYS_ASYNC | flags,
351 &r->src_addr, &r->src_addr_len);
352 PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
353
354 if (status != PJ_EPENDING)
355 return status;
356 }
357
358 return PJ_SUCCESS;
359}
360
361
362static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
363 pj_ioqueue_op_key_t *op_key,
364 pj_ssize_t bytes_read)
365{
366 pj_activesock_t *asock;
367 struct read_op *r = (struct read_op*)op_key;
368 unsigned loop = 0;
369 pj_status_t status;
370
371 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
372
373 do {
374 unsigned flags;
375
376 if (bytes_read > 0) {
377 /*
378 * We've got new data.
379 */
380 pj_size_t remainder;
381 pj_bool_t ret;
382
383 /* Append this new data to existing data. If socket is stream
384 * oriented, user might have left some data in the buffer.
385 * Otherwise if socket is datagram there will be nothing in
386 * existing packet hence the packet will contain only the new
387 * packet.
388 */
389 r->size += bytes_read;
390
391 /* Set default remainder to zero */
392 remainder = 0;
393
394 /* And return value to TRUE */
395 ret = PJ_TRUE;
396
397 /* Notify callback */
398 if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
399 ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
400 PJ_SUCCESS, &remainder);
401 } else if (asock->read_type == TYPE_RECV_FROM &&
402 asock->cb.on_data_recvfrom)
403 {
404 ret = (*asock->cb.on_data_recvfrom)(asock, r->pkt, r->size,
405 &r->src_addr,
406 r->src_addr_len,
407 PJ_SUCCESS);
408 }
409
410 /* If callback returns false, we have been destroyed! */
411 if (!ret)
412 return;
413
414 /* Only stream oriented socket may leave data in the packet */
415 if (asock->stream_oriented) {
416 r->size = remainder;
417 } else {
418 r->size = 0;
419 }
420
Benny Prijonobd344ff2008-08-04 09:59:02 +0000421 } else if (bytes_read <= 0 &&
422 -bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&
423 -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) &&
Benny Prijono3eb59632008-08-26 19:27:23 +0000424 (asock->stream_oriented ||
425 -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)))
Benny Prijonobd344ff2008-08-04 09:59:02 +0000426 {
Benny Prijono4bac2c12008-05-11 18:12:16 +0000427 pj_size_t remainder;
428 pj_bool_t ret;
429
430 if (bytes_read == 0) {
431 /* For stream/connection oriented socket, this means the
432 * connection has been closed. For datagram sockets, it means
433 * we've received datagram with zero length.
434 */
435 if (asock->stream_oriented)
436 status = PJ_EEOF;
437 else
438 status = PJ_SUCCESS;
439 } else {
440 /* This means we've got an error. If this is stream/connection
441 * oriented, it means connection has been closed. For datagram
442 * sockets, it means we've got some error (e.g. EWOULDBLOCK).
443 */
444 status = -bytes_read;
445 }
446
447 /* Set default remainder to zero */
448 remainder = 0;
449
450 /* And return value to TRUE */
451 ret = PJ_TRUE;
452
453 /* Notify callback */
454 if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
455 /* For connection oriented socket, we still need to report
456 * the remainder data (if any) to the user to let user do
457 * processing with the remainder data before it closes the
458 * connection.
459 * If there is no remainder data, set the packet to NULL.
460 */
461 ret = (*asock->cb.on_data_read)(asock, (r->size? r->pkt:NULL),
462 r->size, status, &remainder);
463
464 } else if (asock->read_type == TYPE_RECV_FROM &&
465 asock->cb.on_data_recvfrom)
466 {
467 /* This would always be datagram oriented hence there's
468 * nothing in the packet. We can't be sure if there will be
469 * anything useful in the source_addr, so just put NULL
470 * there too.
471 */
Benny Prijono758decb2008-08-26 17:10:51 +0000472 /* In some scenarios, status may be PJ_SUCCESS. The upper
473 * layer application may not expect the callback to be called
474 * with successful status and NULL data, so lets not call the
475 * callback if the status is PJ_SUCCESS.
476 */
477 if (status != PJ_SUCCESS ) {
478 ret = (*asock->cb.on_data_recvfrom)(asock, NULL, 0,
479 NULL, 0, status);
480 }
Benny Prijono4bac2c12008-05-11 18:12:16 +0000481 }
482
483 /* If callback returns false, we have been destroyed! */
484 if (!ret)
485 return;
486
487 /* Only stream oriented socket may leave data in the packet */
488 if (asock->stream_oriented) {
489 r->size = remainder;
490 } else {
491 r->size = 0;
492 }
493 }
494
495 /* Read next data. We limit ourselves to processing max_loop immediate
496 * data, so when the loop counter has exceeded this value, force the
497 * read()/recvfrom() to return pending operation to allow the program
498 * to do other jobs.
499 */
500 bytes_read = r->max_size - r->size;
501 flags = asock->read_flags;
502 if (++loop >= asock->max_loop)
503 flags |= PJ_IOQUEUE_ALWAYS_ASYNC;
504
505 if (asock->read_type == TYPE_RECV) {
506 status = pj_ioqueue_recv(key, op_key, r->pkt + r->size,
507 &bytes_read, flags);
508 } else {
509 r->src_addr_len = sizeof(r->src_addr);
510 status = pj_ioqueue_recvfrom(key, op_key, r->pkt + r->size,
511 &bytes_read, flags,
512 &r->src_addr, &r->src_addr_len);
513 }
514
Benny Prijono7f6ca732008-08-26 20:47:53 +0000515 if (status == PJ_SUCCESS) {
516 /* Immediate data */
517 ;
518 } else if (status != PJ_EPENDING && status != PJ_ECANCELLED) {
519 /* Error */
Benny Prijono758decb2008-08-26 17:10:51 +0000520 bytes_read = -status;
521 } else {
522 break;
523 }
524 } while (1);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000525
526}
527
528
Benny Prijono417d6052008-07-29 20:15:15 +0000529static pj_status_t send_remaining(pj_activesock_t *asock,
530 pj_ioqueue_op_key_t *send_key)
531{
532 struct send_data *sd = (struct send_data*)send_key->activesock_data;
533 pj_status_t status;
534
535 do {
536 pj_ssize_t size;
537
538 size = sd->len - sd->sent;
539 status = pj_ioqueue_send(asock->key, send_key,
540 sd->data+sd->sent, &size, sd->flags);
541 if (status != PJ_SUCCESS) {
542 /* Pending or error */
543 break;
544 }
545
546 sd->sent += size;
547 if (sd->sent == sd->len) {
548 /* The whole data has been sent. */
549 return PJ_SUCCESS;
550 }
551
552 } while (sd->sent < sd->len);
553
554 return status;
555}
556
557
Benny Prijono4bac2c12008-05-11 18:12:16 +0000558PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock,
559 pj_ioqueue_op_key_t *send_key,
560 const void *data,
561 pj_ssize_t *size,
562 unsigned flags)
563{
564 PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL);
565
Benny Prijono417d6052008-07-29 20:15:15 +0000566 send_key->activesock_data = NULL;
567
568 if (asock->whole_data) {
569 pj_ssize_t whole;
570 pj_status_t status;
571
572 whole = *size;
573
574 status = pj_ioqueue_send(asock->key, send_key, data, size, flags);
575 if (status != PJ_SUCCESS) {
576 /* Pending or error */
577 return status;
578 }
579
580 if (*size == whole) {
581 /* The whole data has been sent. */
582 return PJ_SUCCESS;
583 }
584
585 /* Data was partially sent */
586 asock->send_data.data = (pj_uint8_t*)data;
587 asock->send_data.len = whole;
588 asock->send_data.sent = *size;
589 asock->send_data.flags = flags;
590 send_key->activesock_data = &asock->send_data;
591
592 /* Try again */
593 status = send_remaining(asock, send_key);
594 if (status == PJ_SUCCESS) {
595 *size = whole;
596 }
597 return status;
598
599 } else {
600 return pj_ioqueue_send(asock->key, send_key, data, size, flags);
601 }
Benny Prijono4bac2c12008-05-11 18:12:16 +0000602}
603
604
605PJ_DEF(pj_status_t) pj_activesock_sendto( pj_activesock_t *asock,
606 pj_ioqueue_op_key_t *send_key,
607 const void *data,
608 pj_ssize_t *size,
609 unsigned flags,
610 const pj_sockaddr_t *addr,
611 int addr_len)
612{
613 PJ_ASSERT_RETURN(asock && send_key && data && size && addr && addr_len,
614 PJ_EINVAL);
615
616 return pj_ioqueue_sendto(asock->key, send_key, data, size, flags,
617 addr, addr_len);
618}
619
620
621static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
622 pj_ioqueue_op_key_t *op_key,
623 pj_ssize_t bytes_sent)
624{
625 pj_activesock_t *asock;
626
627 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
628
Benny Prijono417d6052008-07-29 20:15:15 +0000629 if (bytes_sent > 0 && op_key->activesock_data) {
630 /* whole_data is requested. Make sure we send all the data */
631 struct send_data *sd = (struct send_data*)op_key->activesock_data;
632
633 sd->sent += bytes_sent;
634 if (sd->sent == sd->len) {
635 /* all has been sent */
636 bytes_sent = sd->sent;
637 op_key->activesock_data = NULL;
638 } else {
639 /* send remaining data */
640 pj_status_t status;
641
642 status = send_remaining(asock, op_key);
643 if (status == PJ_EPENDING)
644 return;
645 else if (status == PJ_SUCCESS)
646 bytes_sent = sd->sent;
647 else
648 bytes_sent = -status;
649
650 op_key->activesock_data = NULL;
651 }
652 }
653
Benny Prijono4bac2c12008-05-11 18:12:16 +0000654 if (asock->cb.on_data_sent) {
655 pj_bool_t ret;
656
657 ret = (*asock->cb.on_data_sent)(asock, op_key, bytes_sent);
658
659 /* If callback returns false, we have been destroyed! */
660 if (!ret)
661 return;
662 }
663}
664
Benny Prijono1dd54202008-07-25 10:45:34 +0000665#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +0000666PJ_DEF(pj_status_t) pj_activesock_start_accept(pj_activesock_t *asock,
667 pj_pool_t *pool)
668{
669 unsigned i;
670
671 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
672 PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP);
673
674 asock->accept_op = (struct accept_op*)
675 pj_pool_calloc(pool, asock->async_count,
676 sizeof(struct accept_op));
677 for (i=0; i<asock->async_count; ++i) {
678 struct accept_op *a = &asock->accept_op[i];
679 pj_status_t status;
680
681 do {
682 a->new_sock = PJ_INVALID_SOCKET;
683 a->rem_addr_len = sizeof(a->rem_addr);
684
685 status = pj_ioqueue_accept(asock->key, &a->op_key, &a->new_sock,
686 NULL, &a->rem_addr, &a->rem_addr_len);
687 if (status == PJ_SUCCESS) {
688 /* We've got immediate connection. Not sure if it's a good
689 * idea to call the callback now (probably application will
690 * not be prepared to process it), so lets just silently
691 * close the socket.
692 */
693 pj_sock_close(a->new_sock);
694 }
695 } while (status == PJ_SUCCESS);
696
697 if (status != PJ_EPENDING) {
698 return status;
699 }
700 }
701
702 return PJ_SUCCESS;
703}
704
705
706static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
707 pj_ioqueue_op_key_t *op_key,
708 pj_sock_t new_sock,
709 pj_status_t status)
710{
711 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
712 struct accept_op *accept_op = (struct accept_op*) op_key;
713
714 do {
715 if (status==PJ_SUCCESS && asock->cb.on_accept_complete) {
716 pj_bool_t ret;
717
718 /* Notify callback */
719 ret = (*asock->cb.on_accept_complete)(asock, new_sock,
720 &accept_op->rem_addr,
721 accept_op->rem_addr_len);
722
723 /* If callback returns false, we have been destroyed! */
724 if (!ret)
725 return;
726
727 } else if (status==PJ_SUCCESS) {
728 /* Application doesn't handle the new socket, we need to
729 * close it to avoid resource leak.
730 */
731 pj_sock_close(accept_op->new_sock);
732 }
733
734 /* Prepare next accept() */
735 accept_op->new_sock = PJ_INVALID_SOCKET;
736 accept_op->rem_addr_len = sizeof(accept_op->rem_addr);
737
738 status = pj_ioqueue_accept(asock->key, op_key, &accept_op->new_sock,
739 NULL, &accept_op->rem_addr,
740 &accept_op->rem_addr_len);
741
742 } while (status != PJ_EPENDING && status != PJ_ECANCELLED);
743}
744
745
746PJ_DEF(pj_status_t) pj_activesock_start_connect( pj_activesock_t *asock,
747 pj_pool_t *pool,
748 const pj_sockaddr_t *remaddr,
749 int addr_len)
750{
751 PJ_UNUSED_ARG(pool);
752 return pj_ioqueue_connect(asock->key, remaddr, addr_len);
753}
754
Benny Prijono4bac2c12008-05-11 18:12:16 +0000755static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
756 pj_status_t status)
757{
758 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
759
760 if (asock->cb.on_connect_complete) {
761 pj_bool_t ret;
762
763 ret = (*asock->cb.on_connect_complete)(asock, status);
764
765 if (!ret) {
766 /* We've been destroyed */
767 return;
768 }
769 }
770}
Benny Prijono1dd54202008-07-25 10:45:34 +0000771#endif /* PJ_HAS_TCP */
Benny Prijono4bac2c12008-05-11 18:12:16 +0000772