blob: 2b2fc6da3eaf7e9b07864486f72a1ae7c4c4e253 [file] [log] [blame]
Benny Prijono4bac2c12008-05-11 18:12:16 +00001/* $Id$ */
2/*
Benny Prijono32177c02008-06-20 22:44:47 +00003 * Copyright (C)2003-2008 Benny Prijono <benny@prijono.org>
Benny Prijono4bac2c12008-05-11 18:12:16 +00004 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 */
19#include <pj/activesock.h>
Benny Prijonobd344ff2008-08-04 09:59:02 +000020#include <pj/compat/socket.h>
Benny Prijono4bac2c12008-05-11 18:12:16 +000021#include <pj/assert.h>
22#include <pj/errno.h>
23#include <pj/pool.h>
24#include <pj/sock.h>
25#include <pj/string.h>
26
/* Initial value of pj_activesock_t::max_loop: the number of consecutive
 * read attempts one read-completion callback may issue before the next
 * read is forced to complete asynchronously.
 */
#define PJ_ACTIVESOCK_MAX_LOOP 50
28
29
/* Which kind of read operation has been started on an active socket. */
enum read_type
{
    TYPE_NONE,          /* No read has been started yet.                 */
    TYPE_RECV,          /* Reads are issued with pj_ioqueue_recv().      */
    TYPE_RECV_FROM      /* Reads are issued with pj_ioqueue_recvfrom().  */
};
36
37struct read_op
38{
39 pj_ioqueue_op_key_t op_key;
40 pj_uint8_t *pkt;
41 unsigned max_size;
42 pj_size_t size;
43 pj_sockaddr src_addr;
44 int src_addr_len;
45};
46
47struct accept_op
48{
49 pj_ioqueue_op_key_t op_key;
50 pj_sock_t new_sock;
51 pj_sockaddr rem_addr;
52 int rem_addr_len;
53};
54
Benny Prijono417d6052008-07-29 20:15:15 +000055struct send_data
56{
57 pj_uint8_t *data;
58 pj_ssize_t len;
59 pj_ssize_t sent;
60 unsigned flags;
61};
62
Benny Prijono4bac2c12008-05-11 18:12:16 +000063struct pj_activesock_t
64{
65 pj_ioqueue_key_t *key;
66 pj_bool_t stream_oriented;
Benny Prijono417d6052008-07-29 20:15:15 +000067 pj_bool_t whole_data;
Benny Prijono4bac2c12008-05-11 18:12:16 +000068 pj_ioqueue_t *ioqueue;
69 void *user_data;
70 unsigned async_count;
71 unsigned max_loop;
72 pj_activesock_cb cb;
73
Benny Prijono417d6052008-07-29 20:15:15 +000074 struct send_data send_data;
75
Benny Prijono4bac2c12008-05-11 18:12:16 +000076 struct read_op *read_op;
77 pj_uint32_t read_flags;
78 enum read_type read_type;
79
80 struct accept_op *accept_op;
81};
82
83
84static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
85 pj_ioqueue_op_key_t *op_key,
86 pj_ssize_t bytes_read);
87static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
88 pj_ioqueue_op_key_t *op_key,
89 pj_ssize_t bytes_sent);
Benny Prijono1dd54202008-07-25 10:45:34 +000090#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +000091static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
92 pj_ioqueue_op_key_t *op_key,
93 pj_sock_t sock,
94 pj_status_t status);
95static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
96 pj_status_t status);
Benny Prijono1dd54202008-07-25 10:45:34 +000097#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +000098
99PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg)
100{
101 pj_bzero(cfg, sizeof(*cfg));
102 cfg->async_cnt = 1;
103 cfg->concurrency = -1;
Benny Prijono417d6052008-07-29 20:15:15 +0000104 cfg->whole_data = PJ_TRUE;
Benny Prijono4bac2c12008-05-11 18:12:16 +0000105}
106
107
108PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool,
109 pj_sock_t sock,
110 int sock_type,
111 const pj_activesock_cfg *opt,
112 pj_ioqueue_t *ioqueue,
113 const pj_activesock_cb *cb,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000114 void *user_data,
Benny Prijono4bac2c12008-05-11 18:12:16 +0000115 pj_activesock_t **p_asock)
116{
117 pj_activesock_t *asock;
118 pj_ioqueue_callback ioq_cb;
119 pj_status_t status;
120
121 PJ_ASSERT_RETURN(pool && ioqueue && cb && p_asock, PJ_EINVAL);
122 PJ_ASSERT_RETURN(sock!=0 && sock!=PJ_INVALID_SOCKET, PJ_EINVAL);
123 PJ_ASSERT_RETURN(sock_type==pj_SOCK_STREAM() ||
124 sock_type==pj_SOCK_DGRAM(), PJ_EINVAL);
125 PJ_ASSERT_RETURN(!opt || opt->async_cnt >= 1, PJ_EINVAL);
126
127 asock = PJ_POOL_ZALLOC_T(pool, pj_activesock_t);
128 asock->ioqueue = ioqueue;
129 asock->stream_oriented = (sock_type == pj_SOCK_STREAM());
130 asock->async_count = (opt? opt->async_cnt : 1);
Benny Prijono417d6052008-07-29 20:15:15 +0000131 asock->whole_data = (opt? opt->whole_data : 1);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000132 asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP;
Benny Prijonoea8e4362008-06-06 14:12:23 +0000133 asock->user_data = user_data;
Benny Prijono4bac2c12008-05-11 18:12:16 +0000134 pj_memcpy(&asock->cb, cb, sizeof(*cb));
135
136 pj_bzero(&ioq_cb, sizeof(ioq_cb));
137 ioq_cb.on_read_complete = &ioqueue_on_read_complete;
138 ioq_cb.on_write_complete = &ioqueue_on_write_complete;
Benny Prijono1dd54202008-07-25 10:45:34 +0000139#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +0000140 ioq_cb.on_connect_complete = &ioqueue_on_connect_complete;
141 ioq_cb.on_accept_complete = &ioqueue_on_accept_complete;
Benny Prijono1dd54202008-07-25 10:45:34 +0000142#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +0000143
144 status = pj_ioqueue_register_sock(pool, ioqueue, sock, asock,
145 &ioq_cb, &asock->key);
146 if (status != PJ_SUCCESS) {
147 pj_activesock_close(asock);
148 return status;
149 }
150
Benny Prijono417d6052008-07-29 20:15:15 +0000151 if (asock->whole_data) {
152 /* Must disable concurrency otherwise there is a race condition */
153 pj_ioqueue_set_concurrency(asock->key, 0);
154 } else if (opt && opt->concurrency >= 0) {
Benny Prijono4bac2c12008-05-11 18:12:16 +0000155 pj_ioqueue_set_concurrency(asock->key, opt->concurrency);
156 }
157
158 *p_asock = asock;
159 return PJ_SUCCESS;
160}
161
162
163PJ_DEF(pj_status_t) pj_activesock_create_udp( pj_pool_t *pool,
164 const pj_sockaddr *addr,
165 const pj_activesock_cfg *opt,
166 pj_ioqueue_t *ioqueue,
167 const pj_activesock_cb *cb,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000168 void *user_data,
Benny Prijono4bac2c12008-05-11 18:12:16 +0000169 pj_activesock_t **p_asock,
170 pj_sockaddr *bound_addr)
171{
172 pj_sock_t sock_fd;
173 pj_sockaddr default_addr;
174 pj_status_t status;
175
176 if (addr == NULL) {
177 pj_sockaddr_init(pj_AF_INET(), &default_addr, NULL, 0);
178 addr = &default_addr;
179 }
180
181 status = pj_sock_socket(addr->addr.sa_family, pj_SOCK_DGRAM(), 0,
182 &sock_fd);
183 if (status != PJ_SUCCESS) {
184 return status;
185 }
186
187 status = pj_sock_bind(sock_fd, addr, pj_sockaddr_get_len(addr));
188 if (status != PJ_SUCCESS) {
189 pj_sock_close(sock_fd);
190 return status;
191 }
192
193 status = pj_activesock_create(pool, sock_fd, pj_SOCK_DGRAM(), opt,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000194 ioqueue, cb, user_data, p_asock);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000195 if (status != PJ_SUCCESS) {
196 pj_sock_close(sock_fd);
197 return status;
198 }
199
200 if (bound_addr) {
201 int addr_len = sizeof(*bound_addr);
202 status = pj_sock_getsockname(sock_fd, bound_addr, &addr_len);
203 if (status != PJ_SUCCESS) {
204 pj_activesock_close(*p_asock);
205 return status;
206 }
207 }
208
209 return PJ_SUCCESS;
210}
211
212
213PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock)
214{
215 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
216 if (asock->key) {
217 pj_ioqueue_unregister(asock->key);
218 asock->key = NULL;
219 }
220 return PJ_SUCCESS;
221}
222
223
224PJ_DEF(pj_status_t) pj_activesock_set_user_data( pj_activesock_t *asock,
225 void *user_data)
226{
227 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
228 asock->user_data = user_data;
229 return PJ_SUCCESS;
230}
231
232
233PJ_DEF(void*) pj_activesock_get_user_data(pj_activesock_t *asock)
234{
235 PJ_ASSERT_RETURN(asock, NULL);
236 return asock->user_data;
237}
238
239
240PJ_DEF(pj_status_t) pj_activesock_start_read(pj_activesock_t *asock,
241 pj_pool_t *pool,
242 unsigned buff_size,
243 pj_uint32_t flags)
244{
Benny Prijonobd344ff2008-08-04 09:59:02 +0000245 void **readbuf;
246 unsigned i;
247
248 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
249
250 readbuf = (void**) pj_pool_calloc(pool, asock->async_count,
251 sizeof(void*));
252
253 for (i=0; i<asock->async_count; ++i) {
254 readbuf[i] = pj_pool_alloc(pool, buff_size);
255 }
256
257 return pj_activesock_start_read2(asock, pool, buff_size, readbuf, flags);
258}
259
260
261PJ_DEF(pj_status_t) pj_activesock_start_read2( pj_activesock_t *asock,
262 pj_pool_t *pool,
263 unsigned buff_size,
264 void *readbuf[],
265 pj_uint32_t flags)
266{
Benny Prijono4bac2c12008-05-11 18:12:16 +0000267 unsigned i;
268 pj_status_t status;
269
270 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
271 PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
272 PJ_ASSERT_RETURN(asock->read_op == NULL, PJ_EINVALIDOP);
273
274 asock->read_op = (struct read_op*)
275 pj_pool_calloc(pool, asock->async_count,
276 sizeof(struct read_op));
277 asock->read_type = TYPE_RECV;
278 asock->read_flags = flags;
279
280 for (i=0; i<asock->async_count; ++i) {
281 struct read_op *r = &asock->read_op[i];
282 pj_ssize_t size_to_read;
283
Benny Prijonobd344ff2008-08-04 09:59:02 +0000284 r->pkt = (pj_uint8_t*)readbuf[i];
Benny Prijono4bac2c12008-05-11 18:12:16 +0000285 r->max_size = size_to_read = buff_size;
286
287 status = pj_ioqueue_recv(asock->key, &r->op_key, r->pkt, &size_to_read,
288 PJ_IOQUEUE_ALWAYS_ASYNC | flags);
289 PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
290
291 if (status != PJ_EPENDING)
292 return status;
293 }
294
295 return PJ_SUCCESS;
296}
297
298
299PJ_DEF(pj_status_t) pj_activesock_start_recvfrom(pj_activesock_t *asock,
300 pj_pool_t *pool,
301 unsigned buff_size,
302 pj_uint32_t flags)
303{
Benny Prijonobd344ff2008-08-04 09:59:02 +0000304 void **readbuf;
305 unsigned i;
306
307 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
308
309 readbuf = (void**) pj_pool_calloc(pool, asock->async_count,
310 sizeof(void*));
311
312 for (i=0; i<asock->async_count; ++i) {
313 readbuf[i] = pj_pool_alloc(pool, buff_size);
314 }
315
316 return pj_activesock_start_recvfrom2(asock, pool, buff_size,
317 readbuf, flags);
318}
319
320
321PJ_DEF(pj_status_t) pj_activesock_start_recvfrom2( pj_activesock_t *asock,
322 pj_pool_t *pool,
323 unsigned buff_size,
324 void *readbuf[],
325 pj_uint32_t flags)
326{
Benny Prijono4bac2c12008-05-11 18:12:16 +0000327 unsigned i;
328 pj_status_t status;
329
330 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
331 PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
332
333 asock->read_op = (struct read_op*)
334 pj_pool_calloc(pool, asock->async_count,
335 sizeof(struct read_op));
336 asock->read_type = TYPE_RECV_FROM;
337 asock->read_flags = flags;
338
339 for (i=0; i<asock->async_count; ++i) {
340 struct read_op *r = &asock->read_op[i];
341 pj_ssize_t size_to_read;
342
Benny Prijonobd344ff2008-08-04 09:59:02 +0000343 r->pkt = (pj_uint8_t*) readbuf[i];
Benny Prijono4bac2c12008-05-11 18:12:16 +0000344 r->max_size = size_to_read = buff_size;
345 r->src_addr_len = sizeof(r->src_addr);
346
347 status = pj_ioqueue_recvfrom(asock->key, &r->op_key, r->pkt,
348 &size_to_read,
349 PJ_IOQUEUE_ALWAYS_ASYNC | flags,
350 &r->src_addr, &r->src_addr_len);
351 PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
352
353 if (status != PJ_EPENDING)
354 return status;
355 }
356
357 return PJ_SUCCESS;
358}
359
360
361static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
362 pj_ioqueue_op_key_t *op_key,
363 pj_ssize_t bytes_read)
364{
365 pj_activesock_t *asock;
366 struct read_op *r = (struct read_op*)op_key;
367 unsigned loop = 0;
368 pj_status_t status;
369
370 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
371
372 do {
373 unsigned flags;
374
375 if (bytes_read > 0) {
376 /*
377 * We've got new data.
378 */
379 pj_size_t remainder;
380 pj_bool_t ret;
381
382 /* Append this new data to existing data. If socket is stream
383 * oriented, user might have left some data in the buffer.
384 * Otherwise if socket is datagram there will be nothing in
385 * existing packet hence the packet will contain only the new
386 * packet.
387 */
388 r->size += bytes_read;
389
390 /* Set default remainder to zero */
391 remainder = 0;
392
393 /* And return value to TRUE */
394 ret = PJ_TRUE;
395
396 /* Notify callback */
397 if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
398 ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
399 PJ_SUCCESS, &remainder);
400 } else if (asock->read_type == TYPE_RECV_FROM &&
401 asock->cb.on_data_recvfrom)
402 {
403 ret = (*asock->cb.on_data_recvfrom)(asock, r->pkt, r->size,
404 &r->src_addr,
405 r->src_addr_len,
406 PJ_SUCCESS);
407 }
408
409 /* If callback returns false, we have been destroyed! */
410 if (!ret)
411 return;
412
413 /* Only stream oriented socket may leave data in the packet */
414 if (asock->stream_oriented) {
415 r->size = remainder;
416 } else {
417 r->size = 0;
418 }
419
Benny Prijonobd344ff2008-08-04 09:59:02 +0000420 } else if (bytes_read <= 0 &&
421 -bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&
422 -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) &&
Benny Prijono3eb59632008-08-26 19:27:23 +0000423 (asock->stream_oriented ||
424 -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)))
Benny Prijonobd344ff2008-08-04 09:59:02 +0000425 {
Benny Prijono4bac2c12008-05-11 18:12:16 +0000426 pj_size_t remainder;
427 pj_bool_t ret;
428
429 if (bytes_read == 0) {
430 /* For stream/connection oriented socket, this means the
431 * connection has been closed. For datagram sockets, it means
432 * we've received datagram with zero length.
433 */
434 if (asock->stream_oriented)
435 status = PJ_EEOF;
436 else
437 status = PJ_SUCCESS;
438 } else {
439 /* This means we've got an error. If this is stream/connection
440 * oriented, it means connection has been closed. For datagram
441 * sockets, it means we've got some error (e.g. EWOULDBLOCK).
442 */
443 status = -bytes_read;
444 }
445
446 /* Set default remainder to zero */
447 remainder = 0;
448
449 /* And return value to TRUE */
450 ret = PJ_TRUE;
451
452 /* Notify callback */
453 if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
454 /* For connection oriented socket, we still need to report
455 * the remainder data (if any) to the user to let user do
456 * processing with the remainder data before it closes the
457 * connection.
458 * If there is no remainder data, set the packet to NULL.
459 */
460 ret = (*asock->cb.on_data_read)(asock, (r->size? r->pkt:NULL),
461 r->size, status, &remainder);
462
463 } else if (asock->read_type == TYPE_RECV_FROM &&
464 asock->cb.on_data_recvfrom)
465 {
466 /* This would always be datagram oriented hence there's
467 * nothing in the packet. We can't be sure if there will be
468 * anything useful in the source_addr, so just put NULL
469 * there too.
470 */
Benny Prijono758decb2008-08-26 17:10:51 +0000471 /* In some scenarios, status may be PJ_SUCCESS. The upper
472 * layer application may not expect the callback to be called
473 * with successful status and NULL data, so lets not call the
474 * callback if the status is PJ_SUCCESS.
475 */
476 if (status != PJ_SUCCESS ) {
477 ret = (*asock->cb.on_data_recvfrom)(asock, NULL, 0,
478 NULL, 0, status);
479 }
Benny Prijono4bac2c12008-05-11 18:12:16 +0000480 }
481
482 /* If callback returns false, we have been destroyed! */
483 if (!ret)
484 return;
485
486 /* Only stream oriented socket may leave data in the packet */
487 if (asock->stream_oriented) {
488 r->size = remainder;
489 } else {
490 r->size = 0;
491 }
492 }
493
494 /* Read next data. We limit ourselves to processing max_loop immediate
495 * data, so when the loop counter has exceeded this value, force the
496 * read()/recvfrom() to return pending operation to allow the program
497 * to do other jobs.
498 */
499 bytes_read = r->max_size - r->size;
500 flags = asock->read_flags;
501 if (++loop >= asock->max_loop)
502 flags |= PJ_IOQUEUE_ALWAYS_ASYNC;
503
504 if (asock->read_type == TYPE_RECV) {
505 status = pj_ioqueue_recv(key, op_key, r->pkt + r->size,
506 &bytes_read, flags);
507 } else {
508 r->src_addr_len = sizeof(r->src_addr);
509 status = pj_ioqueue_recvfrom(key, op_key, r->pkt + r->size,
510 &bytes_read, flags,
511 &r->src_addr, &r->src_addr_len);
512 }
513
Benny Prijono7f6ca732008-08-26 20:47:53 +0000514 if (status == PJ_SUCCESS) {
515 /* Immediate data */
516 ;
517 } else if (status != PJ_EPENDING && status != PJ_ECANCELLED) {
518 /* Error */
Benny Prijono758decb2008-08-26 17:10:51 +0000519 bytes_read = -status;
520 } else {
521 break;
522 }
523 } while (1);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000524
525}
526
527
Benny Prijono417d6052008-07-29 20:15:15 +0000528static pj_status_t send_remaining(pj_activesock_t *asock,
529 pj_ioqueue_op_key_t *send_key)
530{
531 struct send_data *sd = (struct send_data*)send_key->activesock_data;
532 pj_status_t status;
533
534 do {
535 pj_ssize_t size;
536
537 size = sd->len - sd->sent;
538 status = pj_ioqueue_send(asock->key, send_key,
539 sd->data+sd->sent, &size, sd->flags);
540 if (status != PJ_SUCCESS) {
541 /* Pending or error */
542 break;
543 }
544
545 sd->sent += size;
546 if (sd->sent == sd->len) {
547 /* The whole data has been sent. */
548 return PJ_SUCCESS;
549 }
550
551 } while (sd->sent < sd->len);
552
553 return status;
554}
555
556
Benny Prijono4bac2c12008-05-11 18:12:16 +0000557PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock,
558 pj_ioqueue_op_key_t *send_key,
559 const void *data,
560 pj_ssize_t *size,
561 unsigned flags)
562{
563 PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL);
564
Benny Prijono417d6052008-07-29 20:15:15 +0000565 send_key->activesock_data = NULL;
566
567 if (asock->whole_data) {
568 pj_ssize_t whole;
569 pj_status_t status;
570
571 whole = *size;
572
573 status = pj_ioqueue_send(asock->key, send_key, data, size, flags);
574 if (status != PJ_SUCCESS) {
575 /* Pending or error */
576 return status;
577 }
578
579 if (*size == whole) {
580 /* The whole data has been sent. */
581 return PJ_SUCCESS;
582 }
583
584 /* Data was partially sent */
585 asock->send_data.data = (pj_uint8_t*)data;
586 asock->send_data.len = whole;
587 asock->send_data.sent = *size;
588 asock->send_data.flags = flags;
589 send_key->activesock_data = &asock->send_data;
590
591 /* Try again */
592 status = send_remaining(asock, send_key);
593 if (status == PJ_SUCCESS) {
594 *size = whole;
595 }
596 return status;
597
598 } else {
599 return pj_ioqueue_send(asock->key, send_key, data, size, flags);
600 }
Benny Prijono4bac2c12008-05-11 18:12:16 +0000601}
602
603
604PJ_DEF(pj_status_t) pj_activesock_sendto( pj_activesock_t *asock,
605 pj_ioqueue_op_key_t *send_key,
606 const void *data,
607 pj_ssize_t *size,
608 unsigned flags,
609 const pj_sockaddr_t *addr,
610 int addr_len)
611{
612 PJ_ASSERT_RETURN(asock && send_key && data && size && addr && addr_len,
613 PJ_EINVAL);
614
615 return pj_ioqueue_sendto(asock->key, send_key, data, size, flags,
616 addr, addr_len);
617}
618
619
620static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
621 pj_ioqueue_op_key_t *op_key,
622 pj_ssize_t bytes_sent)
623{
624 pj_activesock_t *asock;
625
626 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
627
Benny Prijono417d6052008-07-29 20:15:15 +0000628 if (bytes_sent > 0 && op_key->activesock_data) {
629 /* whole_data is requested. Make sure we send all the data */
630 struct send_data *sd = (struct send_data*)op_key->activesock_data;
631
632 sd->sent += bytes_sent;
633 if (sd->sent == sd->len) {
634 /* all has been sent */
635 bytes_sent = sd->sent;
636 op_key->activesock_data = NULL;
637 } else {
638 /* send remaining data */
639 pj_status_t status;
640
641 status = send_remaining(asock, op_key);
642 if (status == PJ_EPENDING)
643 return;
644 else if (status == PJ_SUCCESS)
645 bytes_sent = sd->sent;
646 else
647 bytes_sent = -status;
648
649 op_key->activesock_data = NULL;
650 }
651 }
652
Benny Prijono4bac2c12008-05-11 18:12:16 +0000653 if (asock->cb.on_data_sent) {
654 pj_bool_t ret;
655
656 ret = (*asock->cb.on_data_sent)(asock, op_key, bytes_sent);
657
658 /* If callback returns false, we have been destroyed! */
659 if (!ret)
660 return;
661 }
662}
663
Benny Prijono1dd54202008-07-25 10:45:34 +0000664#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +0000665PJ_DEF(pj_status_t) pj_activesock_start_accept(pj_activesock_t *asock,
666 pj_pool_t *pool)
667{
668 unsigned i;
669
670 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
671 PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP);
672
673 asock->accept_op = (struct accept_op*)
674 pj_pool_calloc(pool, asock->async_count,
675 sizeof(struct accept_op));
676 for (i=0; i<asock->async_count; ++i) {
677 struct accept_op *a = &asock->accept_op[i];
678 pj_status_t status;
679
680 do {
681 a->new_sock = PJ_INVALID_SOCKET;
682 a->rem_addr_len = sizeof(a->rem_addr);
683
684 status = pj_ioqueue_accept(asock->key, &a->op_key, &a->new_sock,
685 NULL, &a->rem_addr, &a->rem_addr_len);
686 if (status == PJ_SUCCESS) {
687 /* We've got immediate connection. Not sure if it's a good
688 * idea to call the callback now (probably application will
689 * not be prepared to process it), so lets just silently
690 * close the socket.
691 */
692 pj_sock_close(a->new_sock);
693 }
694 } while (status == PJ_SUCCESS);
695
696 if (status != PJ_EPENDING) {
697 return status;
698 }
699 }
700
701 return PJ_SUCCESS;
702}
703
704
705static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
706 pj_ioqueue_op_key_t *op_key,
707 pj_sock_t new_sock,
708 pj_status_t status)
709{
710 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
711 struct accept_op *accept_op = (struct accept_op*) op_key;
712
713 do {
714 if (status==PJ_SUCCESS && asock->cb.on_accept_complete) {
715 pj_bool_t ret;
716
717 /* Notify callback */
718 ret = (*asock->cb.on_accept_complete)(asock, new_sock,
719 &accept_op->rem_addr,
720 accept_op->rem_addr_len);
721
722 /* If callback returns false, we have been destroyed! */
723 if (!ret)
724 return;
725
726 } else if (status==PJ_SUCCESS) {
727 /* Application doesn't handle the new socket, we need to
728 * close it to avoid resource leak.
729 */
730 pj_sock_close(accept_op->new_sock);
731 }
732
733 /* Prepare next accept() */
734 accept_op->new_sock = PJ_INVALID_SOCKET;
735 accept_op->rem_addr_len = sizeof(accept_op->rem_addr);
736
737 status = pj_ioqueue_accept(asock->key, op_key, &accept_op->new_sock,
738 NULL, &accept_op->rem_addr,
739 &accept_op->rem_addr_len);
740
741 } while (status != PJ_EPENDING && status != PJ_ECANCELLED);
742}
743
744
745PJ_DEF(pj_status_t) pj_activesock_start_connect( pj_activesock_t *asock,
746 pj_pool_t *pool,
747 const pj_sockaddr_t *remaddr,
748 int addr_len)
749{
750 PJ_UNUSED_ARG(pool);
751 return pj_ioqueue_connect(asock->key, remaddr, addr_len);
752}
753
Benny Prijono4bac2c12008-05-11 18:12:16 +0000754static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
755 pj_status_t status)
756{
757 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
758
759 if (asock->cb.on_connect_complete) {
760 pj_bool_t ret;
761
762 ret = (*asock->cb.on_connect_complete)(asock, status);
763
764 if (!ret) {
765 /* We've been destroyed */
766 return;
767 }
768 }
769}
Benny Prijono1dd54202008-07-25 10:45:34 +0000770#endif /* PJ_HAS_TCP */
Benny Prijono4bac2c12008-05-11 18:12:16 +0000771