blob: 9e53aef152d1a8757a99edf895fd66345118e637 [file] [log] [blame]
Benny Prijono4bac2c12008-05-11 18:12:16 +00001/* $Id$ */
2/*
Benny Prijono32177c02008-06-20 22:44:47 +00003 * Copyright (C)2003-2008 Benny Prijono <benny@prijono.org>
Benny Prijono4bac2c12008-05-11 18:12:16 +00004 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 */
19#include <pj/activesock.h>
Benny Prijonobd344ff2008-08-04 09:59:02 +000020#include <pj/compat/socket.h>
Benny Prijono4bac2c12008-05-11 18:12:16 +000021#include <pj/assert.h>
22#include <pj/errno.h>
23#include <pj/pool.h>
24#include <pj/sock.h>
25#include <pj/string.h>
26
/* Initial value of pj_activesock_t::max_loop: the number of back-to-back
 * read completions the read callback handles before it forces the next
 * read to complete asynchronously.
 */
#define PJ_ACTIVESOCK_MAX_LOOP	    50
28
29
/* Kind of read operation that has been started on an active socket. */
enum read_type
{
    /* No read started yet; this is the value of a zero-filled socket. */
    TYPE_NONE,

    /* Reads are issued with pj_ioqueue_recv(). */
    TYPE_RECV,

    /* Reads are issued with pj_ioqueue_recvfrom(). */
    TYPE_RECV_FROM
};
36
37struct read_op
38{
39 pj_ioqueue_op_key_t op_key;
40 pj_uint8_t *pkt;
41 unsigned max_size;
42 pj_size_t size;
43 pj_sockaddr src_addr;
44 int src_addr_len;
45};
46
47struct accept_op
48{
49 pj_ioqueue_op_key_t op_key;
50 pj_sock_t new_sock;
51 pj_sockaddr rem_addr;
52 int rem_addr_len;
53};
54
Benny Prijono417d6052008-07-29 20:15:15 +000055struct send_data
56{
57 pj_uint8_t *data;
58 pj_ssize_t len;
59 pj_ssize_t sent;
60 unsigned flags;
61};
62
Benny Prijono4bac2c12008-05-11 18:12:16 +000063struct pj_activesock_t
64{
65 pj_ioqueue_key_t *key;
66 pj_bool_t stream_oriented;
Benny Prijono417d6052008-07-29 20:15:15 +000067 pj_bool_t whole_data;
Benny Prijono4bac2c12008-05-11 18:12:16 +000068 pj_ioqueue_t *ioqueue;
69 void *user_data;
70 unsigned async_count;
71 unsigned max_loop;
72 pj_activesock_cb cb;
73
Benny Prijono417d6052008-07-29 20:15:15 +000074 struct send_data send_data;
75
Benny Prijono4bac2c12008-05-11 18:12:16 +000076 struct read_op *read_op;
77 pj_uint32_t read_flags;
78 enum read_type read_type;
79
80 struct accept_op *accept_op;
81};
82
83
84static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
85 pj_ioqueue_op_key_t *op_key,
86 pj_ssize_t bytes_read);
87static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
88 pj_ioqueue_op_key_t *op_key,
89 pj_ssize_t bytes_sent);
Benny Prijono1dd54202008-07-25 10:45:34 +000090#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +000091static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
92 pj_ioqueue_op_key_t *op_key,
93 pj_sock_t sock,
94 pj_status_t status);
95static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
96 pj_status_t status);
Benny Prijono1dd54202008-07-25 10:45:34 +000097#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +000098
99PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg)
100{
101 pj_bzero(cfg, sizeof(*cfg));
102 cfg->async_cnt = 1;
103 cfg->concurrency = -1;
Benny Prijono417d6052008-07-29 20:15:15 +0000104 cfg->whole_data = PJ_TRUE;
Benny Prijono4bac2c12008-05-11 18:12:16 +0000105}
106
107
108PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool,
109 pj_sock_t sock,
110 int sock_type,
111 const pj_activesock_cfg *opt,
112 pj_ioqueue_t *ioqueue,
113 const pj_activesock_cb *cb,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000114 void *user_data,
Benny Prijono4bac2c12008-05-11 18:12:16 +0000115 pj_activesock_t **p_asock)
116{
117 pj_activesock_t *asock;
118 pj_ioqueue_callback ioq_cb;
119 pj_status_t status;
120
121 PJ_ASSERT_RETURN(pool && ioqueue && cb && p_asock, PJ_EINVAL);
122 PJ_ASSERT_RETURN(sock!=0 && sock!=PJ_INVALID_SOCKET, PJ_EINVAL);
123 PJ_ASSERT_RETURN(sock_type==pj_SOCK_STREAM() ||
124 sock_type==pj_SOCK_DGRAM(), PJ_EINVAL);
125 PJ_ASSERT_RETURN(!opt || opt->async_cnt >= 1, PJ_EINVAL);
126
127 asock = PJ_POOL_ZALLOC_T(pool, pj_activesock_t);
128 asock->ioqueue = ioqueue;
129 asock->stream_oriented = (sock_type == pj_SOCK_STREAM());
130 asock->async_count = (opt? opt->async_cnt : 1);
Benny Prijono417d6052008-07-29 20:15:15 +0000131 asock->whole_data = (opt? opt->whole_data : 1);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000132 asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP;
Benny Prijonoea8e4362008-06-06 14:12:23 +0000133 asock->user_data = user_data;
Benny Prijono4bac2c12008-05-11 18:12:16 +0000134 pj_memcpy(&asock->cb, cb, sizeof(*cb));
135
136 pj_bzero(&ioq_cb, sizeof(ioq_cb));
137 ioq_cb.on_read_complete = &ioqueue_on_read_complete;
138 ioq_cb.on_write_complete = &ioqueue_on_write_complete;
Benny Prijono1dd54202008-07-25 10:45:34 +0000139#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +0000140 ioq_cb.on_connect_complete = &ioqueue_on_connect_complete;
141 ioq_cb.on_accept_complete = &ioqueue_on_accept_complete;
Benny Prijono1dd54202008-07-25 10:45:34 +0000142#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +0000143
144 status = pj_ioqueue_register_sock(pool, ioqueue, sock, asock,
145 &ioq_cb, &asock->key);
146 if (status != PJ_SUCCESS) {
147 pj_activesock_close(asock);
148 return status;
149 }
150
Benny Prijono417d6052008-07-29 20:15:15 +0000151 if (asock->whole_data) {
152 /* Must disable concurrency otherwise there is a race condition */
153 pj_ioqueue_set_concurrency(asock->key, 0);
154 } else if (opt && opt->concurrency >= 0) {
Benny Prijono4bac2c12008-05-11 18:12:16 +0000155 pj_ioqueue_set_concurrency(asock->key, opt->concurrency);
156 }
157
158 *p_asock = asock;
159 return PJ_SUCCESS;
160}
161
162
163PJ_DEF(pj_status_t) pj_activesock_create_udp( pj_pool_t *pool,
164 const pj_sockaddr *addr,
165 const pj_activesock_cfg *opt,
166 pj_ioqueue_t *ioqueue,
167 const pj_activesock_cb *cb,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000168 void *user_data,
Benny Prijono4bac2c12008-05-11 18:12:16 +0000169 pj_activesock_t **p_asock,
170 pj_sockaddr *bound_addr)
171{
172 pj_sock_t sock_fd;
173 pj_sockaddr default_addr;
174 pj_status_t status;
175
176 if (addr == NULL) {
177 pj_sockaddr_init(pj_AF_INET(), &default_addr, NULL, 0);
178 addr = &default_addr;
179 }
180
181 status = pj_sock_socket(addr->addr.sa_family, pj_SOCK_DGRAM(), 0,
182 &sock_fd);
183 if (status != PJ_SUCCESS) {
184 return status;
185 }
186
187 status = pj_sock_bind(sock_fd, addr, pj_sockaddr_get_len(addr));
188 if (status != PJ_SUCCESS) {
189 pj_sock_close(sock_fd);
190 return status;
191 }
192
193 status = pj_activesock_create(pool, sock_fd, pj_SOCK_DGRAM(), opt,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000194 ioqueue, cb, user_data, p_asock);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000195 if (status != PJ_SUCCESS) {
196 pj_sock_close(sock_fd);
197 return status;
198 }
199
200 if (bound_addr) {
201 int addr_len = sizeof(*bound_addr);
202 status = pj_sock_getsockname(sock_fd, bound_addr, &addr_len);
203 if (status != PJ_SUCCESS) {
204 pj_activesock_close(*p_asock);
205 return status;
206 }
207 }
208
209 return PJ_SUCCESS;
210}
211
212
213PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock)
214{
215 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
216 if (asock->key) {
217 pj_ioqueue_unregister(asock->key);
218 asock->key = NULL;
219 }
220 return PJ_SUCCESS;
221}
222
223
224PJ_DEF(pj_status_t) pj_activesock_set_user_data( pj_activesock_t *asock,
225 void *user_data)
226{
227 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
228 asock->user_data = user_data;
229 return PJ_SUCCESS;
230}
231
232
233PJ_DEF(void*) pj_activesock_get_user_data(pj_activesock_t *asock)
234{
235 PJ_ASSERT_RETURN(asock, NULL);
236 return asock->user_data;
237}
238
239
240PJ_DEF(pj_status_t) pj_activesock_start_read(pj_activesock_t *asock,
241 pj_pool_t *pool,
242 unsigned buff_size,
243 pj_uint32_t flags)
244{
Benny Prijonobd344ff2008-08-04 09:59:02 +0000245 void **readbuf;
246 unsigned i;
247
248 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
249
250 readbuf = (void**) pj_pool_calloc(pool, asock->async_count,
251 sizeof(void*));
252
253 for (i=0; i<asock->async_count; ++i) {
254 readbuf[i] = pj_pool_alloc(pool, buff_size);
255 }
256
257 return pj_activesock_start_read2(asock, pool, buff_size, readbuf, flags);
258}
259
260
261PJ_DEF(pj_status_t) pj_activesock_start_read2( pj_activesock_t *asock,
262 pj_pool_t *pool,
263 unsigned buff_size,
264 void *readbuf[],
265 pj_uint32_t flags)
266{
Benny Prijono4bac2c12008-05-11 18:12:16 +0000267 unsigned i;
268 pj_status_t status;
269
270 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
271 PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
272 PJ_ASSERT_RETURN(asock->read_op == NULL, PJ_EINVALIDOP);
273
274 asock->read_op = (struct read_op*)
275 pj_pool_calloc(pool, asock->async_count,
276 sizeof(struct read_op));
277 asock->read_type = TYPE_RECV;
278 asock->read_flags = flags;
279
280 for (i=0; i<asock->async_count; ++i) {
281 struct read_op *r = &asock->read_op[i];
282 pj_ssize_t size_to_read;
283
Benny Prijonobd344ff2008-08-04 09:59:02 +0000284 r->pkt = (pj_uint8_t*)readbuf[i];
Benny Prijono4bac2c12008-05-11 18:12:16 +0000285 r->max_size = size_to_read = buff_size;
286
287 status = pj_ioqueue_recv(asock->key, &r->op_key, r->pkt, &size_to_read,
288 PJ_IOQUEUE_ALWAYS_ASYNC | flags);
289 PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
290
291 if (status != PJ_EPENDING)
292 return status;
293 }
294
295 return PJ_SUCCESS;
296}
297
298
299PJ_DEF(pj_status_t) pj_activesock_start_recvfrom(pj_activesock_t *asock,
300 pj_pool_t *pool,
301 unsigned buff_size,
302 pj_uint32_t flags)
303{
Benny Prijonobd344ff2008-08-04 09:59:02 +0000304 void **readbuf;
305 unsigned i;
306
307 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
308
309 readbuf = (void**) pj_pool_calloc(pool, asock->async_count,
310 sizeof(void*));
311
312 for (i=0; i<asock->async_count; ++i) {
313 readbuf[i] = pj_pool_alloc(pool, buff_size);
314 }
315
316 return pj_activesock_start_recvfrom2(asock, pool, buff_size,
317 readbuf, flags);
318}
319
320
321PJ_DEF(pj_status_t) pj_activesock_start_recvfrom2( pj_activesock_t *asock,
322 pj_pool_t *pool,
323 unsigned buff_size,
324 void *readbuf[],
325 pj_uint32_t flags)
326{
Benny Prijono4bac2c12008-05-11 18:12:16 +0000327 unsigned i;
328 pj_status_t status;
329
330 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
331 PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
332
333 asock->read_op = (struct read_op*)
334 pj_pool_calloc(pool, asock->async_count,
335 sizeof(struct read_op));
336 asock->read_type = TYPE_RECV_FROM;
337 asock->read_flags = flags;
338
339 for (i=0; i<asock->async_count; ++i) {
340 struct read_op *r = &asock->read_op[i];
341 pj_ssize_t size_to_read;
342
Benny Prijonobd344ff2008-08-04 09:59:02 +0000343 r->pkt = (pj_uint8_t*) readbuf[i];
Benny Prijono4bac2c12008-05-11 18:12:16 +0000344 r->max_size = size_to_read = buff_size;
345 r->src_addr_len = sizeof(r->src_addr);
346
347 status = pj_ioqueue_recvfrom(asock->key, &r->op_key, r->pkt,
348 &size_to_read,
349 PJ_IOQUEUE_ALWAYS_ASYNC | flags,
350 &r->src_addr, &r->src_addr_len);
351 PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
352
353 if (status != PJ_EPENDING)
354 return status;
355 }
356
357 return PJ_SUCCESS;
358}
359
360
361static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
362 pj_ioqueue_op_key_t *op_key,
363 pj_ssize_t bytes_read)
364{
365 pj_activesock_t *asock;
366 struct read_op *r = (struct read_op*)op_key;
367 unsigned loop = 0;
368 pj_status_t status;
369
370 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
371
372 do {
373 unsigned flags;
374
375 if (bytes_read > 0) {
376 /*
377 * We've got new data.
378 */
379 pj_size_t remainder;
380 pj_bool_t ret;
381
382 /* Append this new data to existing data. If socket is stream
383 * oriented, user might have left some data in the buffer.
384 * Otherwise if socket is datagram there will be nothing in
385 * existing packet hence the packet will contain only the new
386 * packet.
387 */
388 r->size += bytes_read;
389
390 /* Set default remainder to zero */
391 remainder = 0;
392
393 /* And return value to TRUE */
394 ret = PJ_TRUE;
395
396 /* Notify callback */
397 if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
398 ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
399 PJ_SUCCESS, &remainder);
400 } else if (asock->read_type == TYPE_RECV_FROM &&
401 asock->cb.on_data_recvfrom)
402 {
403 ret = (*asock->cb.on_data_recvfrom)(asock, r->pkt, r->size,
404 &r->src_addr,
405 r->src_addr_len,
406 PJ_SUCCESS);
407 }
408
409 /* If callback returns false, we have been destroyed! */
410 if (!ret)
411 return;
412
413 /* Only stream oriented socket may leave data in the packet */
414 if (asock->stream_oriented) {
415 r->size = remainder;
416 } else {
417 r->size = 0;
418 }
419
Benny Prijonobd344ff2008-08-04 09:59:02 +0000420 } else if (bytes_read <= 0 &&
421 -bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&
422 -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) &&
423 -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET))
424 {
Benny Prijono4bac2c12008-05-11 18:12:16 +0000425 pj_size_t remainder;
426 pj_bool_t ret;
427
428 if (bytes_read == 0) {
429 /* For stream/connection oriented socket, this means the
430 * connection has been closed. For datagram sockets, it means
431 * we've received datagram with zero length.
432 */
433 if (asock->stream_oriented)
434 status = PJ_EEOF;
435 else
436 status = PJ_SUCCESS;
437 } else {
438 /* This means we've got an error. If this is stream/connection
439 * oriented, it means connection has been closed. For datagram
440 * sockets, it means we've got some error (e.g. EWOULDBLOCK).
441 */
442 status = -bytes_read;
443 }
444
445 /* Set default remainder to zero */
446 remainder = 0;
447
448 /* And return value to TRUE */
449 ret = PJ_TRUE;
450
451 /* Notify callback */
452 if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
453 /* For connection oriented socket, we still need to report
454 * the remainder data (if any) to the user to let user do
455 * processing with the remainder data before it closes the
456 * connection.
457 * If there is no remainder data, set the packet to NULL.
458 */
459 ret = (*asock->cb.on_data_read)(asock, (r->size? r->pkt:NULL),
460 r->size, status, &remainder);
461
462 } else if (asock->read_type == TYPE_RECV_FROM &&
463 asock->cb.on_data_recvfrom)
464 {
465 /* This would always be datagram oriented hence there's
466 * nothing in the packet. We can't be sure if there will be
467 * anything useful in the source_addr, so just put NULL
468 * there too.
469 */
470 ret = (*asock->cb.on_data_recvfrom)(asock, NULL, 0,
471 NULL, 0, status);
472 }
473
474 /* If callback returns false, we have been destroyed! */
475 if (!ret)
476 return;
477
478 /* Only stream oriented socket may leave data in the packet */
479 if (asock->stream_oriented) {
480 r->size = remainder;
481 } else {
482 r->size = 0;
483 }
484 }
485
486 /* Read next data. We limit ourselves to processing max_loop immediate
487 * data, so when the loop counter has exceeded this value, force the
488 * read()/recvfrom() to return pending operation to allow the program
489 * to do other jobs.
490 */
491 bytes_read = r->max_size - r->size;
492 flags = asock->read_flags;
493 if (++loop >= asock->max_loop)
494 flags |= PJ_IOQUEUE_ALWAYS_ASYNC;
495
496 if (asock->read_type == TYPE_RECV) {
497 status = pj_ioqueue_recv(key, op_key, r->pkt + r->size,
498 &bytes_read, flags);
499 } else {
500 r->src_addr_len = sizeof(r->src_addr);
501 status = pj_ioqueue_recvfrom(key, op_key, r->pkt + r->size,
502 &bytes_read, flags,
503 &r->src_addr, &r->src_addr_len);
504 }
505
506 } while (status != PJ_EPENDING && status != PJ_ECANCELLED);
507
508}
509
510
Benny Prijono417d6052008-07-29 20:15:15 +0000511static pj_status_t send_remaining(pj_activesock_t *asock,
512 pj_ioqueue_op_key_t *send_key)
513{
514 struct send_data *sd = (struct send_data*)send_key->activesock_data;
515 pj_status_t status;
516
517 do {
518 pj_ssize_t size;
519
520 size = sd->len - sd->sent;
521 status = pj_ioqueue_send(asock->key, send_key,
522 sd->data+sd->sent, &size, sd->flags);
523 if (status != PJ_SUCCESS) {
524 /* Pending or error */
525 break;
526 }
527
528 sd->sent += size;
529 if (sd->sent == sd->len) {
530 /* The whole data has been sent. */
531 return PJ_SUCCESS;
532 }
533
534 } while (sd->sent < sd->len);
535
536 return status;
537}
538
539
Benny Prijono4bac2c12008-05-11 18:12:16 +0000540PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock,
541 pj_ioqueue_op_key_t *send_key,
542 const void *data,
543 pj_ssize_t *size,
544 unsigned flags)
545{
546 PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL);
547
Benny Prijono417d6052008-07-29 20:15:15 +0000548 send_key->activesock_data = NULL;
549
550 if (asock->whole_data) {
551 pj_ssize_t whole;
552 pj_status_t status;
553
554 whole = *size;
555
556 status = pj_ioqueue_send(asock->key, send_key, data, size, flags);
557 if (status != PJ_SUCCESS) {
558 /* Pending or error */
559 return status;
560 }
561
562 if (*size == whole) {
563 /* The whole data has been sent. */
564 return PJ_SUCCESS;
565 }
566
567 /* Data was partially sent */
568 asock->send_data.data = (pj_uint8_t*)data;
569 asock->send_data.len = whole;
570 asock->send_data.sent = *size;
571 asock->send_data.flags = flags;
572 send_key->activesock_data = &asock->send_data;
573
574 /* Try again */
575 status = send_remaining(asock, send_key);
576 if (status == PJ_SUCCESS) {
577 *size = whole;
578 }
579 return status;
580
581 } else {
582 return pj_ioqueue_send(asock->key, send_key, data, size, flags);
583 }
Benny Prijono4bac2c12008-05-11 18:12:16 +0000584}
585
586
587PJ_DEF(pj_status_t) pj_activesock_sendto( pj_activesock_t *asock,
588 pj_ioqueue_op_key_t *send_key,
589 const void *data,
590 pj_ssize_t *size,
591 unsigned flags,
592 const pj_sockaddr_t *addr,
593 int addr_len)
594{
595 PJ_ASSERT_RETURN(asock && send_key && data && size && addr && addr_len,
596 PJ_EINVAL);
597
598 return pj_ioqueue_sendto(asock->key, send_key, data, size, flags,
599 addr, addr_len);
600}
601
602
603static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
604 pj_ioqueue_op_key_t *op_key,
605 pj_ssize_t bytes_sent)
606{
607 pj_activesock_t *asock;
608
609 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
610
Benny Prijono417d6052008-07-29 20:15:15 +0000611 if (bytes_sent > 0 && op_key->activesock_data) {
612 /* whole_data is requested. Make sure we send all the data */
613 struct send_data *sd = (struct send_data*)op_key->activesock_data;
614
615 sd->sent += bytes_sent;
616 if (sd->sent == sd->len) {
617 /* all has been sent */
618 bytes_sent = sd->sent;
619 op_key->activesock_data = NULL;
620 } else {
621 /* send remaining data */
622 pj_status_t status;
623
624 status = send_remaining(asock, op_key);
625 if (status == PJ_EPENDING)
626 return;
627 else if (status == PJ_SUCCESS)
628 bytes_sent = sd->sent;
629 else
630 bytes_sent = -status;
631
632 op_key->activesock_data = NULL;
633 }
634 }
635
Benny Prijono4bac2c12008-05-11 18:12:16 +0000636 if (asock->cb.on_data_sent) {
637 pj_bool_t ret;
638
639 ret = (*asock->cb.on_data_sent)(asock, op_key, bytes_sent);
640
641 /* If callback returns false, we have been destroyed! */
642 if (!ret)
643 return;
644 }
645}
646
Benny Prijono1dd54202008-07-25 10:45:34 +0000647#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +0000648PJ_DEF(pj_status_t) pj_activesock_start_accept(pj_activesock_t *asock,
649 pj_pool_t *pool)
650{
651 unsigned i;
652
653 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
654 PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP);
655
656 asock->accept_op = (struct accept_op*)
657 pj_pool_calloc(pool, asock->async_count,
658 sizeof(struct accept_op));
659 for (i=0; i<asock->async_count; ++i) {
660 struct accept_op *a = &asock->accept_op[i];
661 pj_status_t status;
662
663 do {
664 a->new_sock = PJ_INVALID_SOCKET;
665 a->rem_addr_len = sizeof(a->rem_addr);
666
667 status = pj_ioqueue_accept(asock->key, &a->op_key, &a->new_sock,
668 NULL, &a->rem_addr, &a->rem_addr_len);
669 if (status == PJ_SUCCESS) {
670 /* We've got immediate connection. Not sure if it's a good
671 * idea to call the callback now (probably application will
672 * not be prepared to process it), so lets just silently
673 * close the socket.
674 */
675 pj_sock_close(a->new_sock);
676 }
677 } while (status == PJ_SUCCESS);
678
679 if (status != PJ_EPENDING) {
680 return status;
681 }
682 }
683
684 return PJ_SUCCESS;
685}
686
687
688static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
689 pj_ioqueue_op_key_t *op_key,
690 pj_sock_t new_sock,
691 pj_status_t status)
692{
693 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
694 struct accept_op *accept_op = (struct accept_op*) op_key;
695
696 do {
697 if (status==PJ_SUCCESS && asock->cb.on_accept_complete) {
698 pj_bool_t ret;
699
700 /* Notify callback */
701 ret = (*asock->cb.on_accept_complete)(asock, new_sock,
702 &accept_op->rem_addr,
703 accept_op->rem_addr_len);
704
705 /* If callback returns false, we have been destroyed! */
706 if (!ret)
707 return;
708
709 } else if (status==PJ_SUCCESS) {
710 /* Application doesn't handle the new socket, we need to
711 * close it to avoid resource leak.
712 */
713 pj_sock_close(accept_op->new_sock);
714 }
715
716 /* Prepare next accept() */
717 accept_op->new_sock = PJ_INVALID_SOCKET;
718 accept_op->rem_addr_len = sizeof(accept_op->rem_addr);
719
720 status = pj_ioqueue_accept(asock->key, op_key, &accept_op->new_sock,
721 NULL, &accept_op->rem_addr,
722 &accept_op->rem_addr_len);
723
724 } while (status != PJ_EPENDING && status != PJ_ECANCELLED);
725}
726
727
728PJ_DEF(pj_status_t) pj_activesock_start_connect( pj_activesock_t *asock,
729 pj_pool_t *pool,
730 const pj_sockaddr_t *remaddr,
731 int addr_len)
732{
733 PJ_UNUSED_ARG(pool);
734 return pj_ioqueue_connect(asock->key, remaddr, addr_len);
735}
736
Benny Prijono4bac2c12008-05-11 18:12:16 +0000737static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
738 pj_status_t status)
739{
740 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
741
742 if (asock->cb.on_connect_complete) {
743 pj_bool_t ret;
744
745 ret = (*asock->cb.on_connect_complete)(asock, status);
746
747 if (!ret) {
748 /* We've been destroyed */
749 return;
750 }
751 }
752}
Benny Prijono1dd54202008-07-25 10:45:34 +0000753#endif /* PJ_HAS_TCP */
Benny Prijono4bac2c12008-05-11 18:12:16 +0000754