blob: 33d957d3f7f5388ecc0627b74f582f42a18276af [file] [log] [blame]
Benny Prijono4bac2c12008-05-11 18:12:16 +00001/* $Id$ */
2/*
Benny Prijono844653c2008-12-23 17:27:53 +00003 * Copyright (C) 2008-2009 Teluu Inc. (http://www.teluu.com)
4 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
Benny Prijono4bac2c12008-05-11 18:12:16 +00005 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 */
20#include <pj/activesock.h>
Benny Prijonobd344ff2008-08-04 09:59:02 +000021#include <pj/compat/socket.h>
Benny Prijono4bac2c12008-05-11 18:12:16 +000022#include <pj/assert.h>
23#include <pj/errno.h>
24#include <pj/pool.h>
25#include <pj/sock.h>
26#include <pj/string.h>
27
28#define PJ_ACTIVESOCK_MAX_LOOP 50
29
30
31enum read_type
32{
33 TYPE_NONE,
34 TYPE_RECV,
35 TYPE_RECV_FROM
36};
37
38struct read_op
39{
40 pj_ioqueue_op_key_t op_key;
41 pj_uint8_t *pkt;
42 unsigned max_size;
43 pj_size_t size;
44 pj_sockaddr src_addr;
45 int src_addr_len;
46};
47
48struct accept_op
49{
50 pj_ioqueue_op_key_t op_key;
51 pj_sock_t new_sock;
52 pj_sockaddr rem_addr;
53 int rem_addr_len;
54};
55
Benny Prijono417d6052008-07-29 20:15:15 +000056struct send_data
57{
58 pj_uint8_t *data;
59 pj_ssize_t len;
60 pj_ssize_t sent;
61 unsigned flags;
62};
63
Benny Prijono4bac2c12008-05-11 18:12:16 +000064struct pj_activesock_t
65{
66 pj_ioqueue_key_t *key;
67 pj_bool_t stream_oriented;
Benny Prijono417d6052008-07-29 20:15:15 +000068 pj_bool_t whole_data;
Benny Prijono4bac2c12008-05-11 18:12:16 +000069 pj_ioqueue_t *ioqueue;
70 void *user_data;
71 unsigned async_count;
72 unsigned max_loop;
73 pj_activesock_cb cb;
74
Benny Prijono417d6052008-07-29 20:15:15 +000075 struct send_data send_data;
76
Benny Prijono4bac2c12008-05-11 18:12:16 +000077 struct read_op *read_op;
78 pj_uint32_t read_flags;
79 enum read_type read_type;
80
81 struct accept_op *accept_op;
82};
83
84
85static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
86 pj_ioqueue_op_key_t *op_key,
87 pj_ssize_t bytes_read);
88static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
89 pj_ioqueue_op_key_t *op_key,
90 pj_ssize_t bytes_sent);
Benny Prijono1dd54202008-07-25 10:45:34 +000091#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +000092static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
93 pj_ioqueue_op_key_t *op_key,
94 pj_sock_t sock,
95 pj_status_t status);
96static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
97 pj_status_t status);
Benny Prijono1dd54202008-07-25 10:45:34 +000098#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +000099
100PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg)
101{
102 pj_bzero(cfg, sizeof(*cfg));
103 cfg->async_cnt = 1;
104 cfg->concurrency = -1;
Benny Prijono417d6052008-07-29 20:15:15 +0000105 cfg->whole_data = PJ_TRUE;
Benny Prijono4bac2c12008-05-11 18:12:16 +0000106}
107
108
109PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool,
110 pj_sock_t sock,
111 int sock_type,
112 const pj_activesock_cfg *opt,
113 pj_ioqueue_t *ioqueue,
114 const pj_activesock_cb *cb,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000115 void *user_data,
Benny Prijono4bac2c12008-05-11 18:12:16 +0000116 pj_activesock_t **p_asock)
117{
118 pj_activesock_t *asock;
119 pj_ioqueue_callback ioq_cb;
120 pj_status_t status;
121
122 PJ_ASSERT_RETURN(pool && ioqueue && cb && p_asock, PJ_EINVAL);
123 PJ_ASSERT_RETURN(sock!=0 && sock!=PJ_INVALID_SOCKET, PJ_EINVAL);
124 PJ_ASSERT_RETURN(sock_type==pj_SOCK_STREAM() ||
125 sock_type==pj_SOCK_DGRAM(), PJ_EINVAL);
126 PJ_ASSERT_RETURN(!opt || opt->async_cnt >= 1, PJ_EINVAL);
127
128 asock = PJ_POOL_ZALLOC_T(pool, pj_activesock_t);
129 asock->ioqueue = ioqueue;
130 asock->stream_oriented = (sock_type == pj_SOCK_STREAM());
131 asock->async_count = (opt? opt->async_cnt : 1);
Benny Prijono417d6052008-07-29 20:15:15 +0000132 asock->whole_data = (opt? opt->whole_data : 1);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000133 asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP;
Benny Prijonoea8e4362008-06-06 14:12:23 +0000134 asock->user_data = user_data;
Benny Prijono4bac2c12008-05-11 18:12:16 +0000135 pj_memcpy(&asock->cb, cb, sizeof(*cb));
136
137 pj_bzero(&ioq_cb, sizeof(ioq_cb));
138 ioq_cb.on_read_complete = &ioqueue_on_read_complete;
139 ioq_cb.on_write_complete = &ioqueue_on_write_complete;
Benny Prijono1dd54202008-07-25 10:45:34 +0000140#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +0000141 ioq_cb.on_connect_complete = &ioqueue_on_connect_complete;
142 ioq_cb.on_accept_complete = &ioqueue_on_accept_complete;
Benny Prijono1dd54202008-07-25 10:45:34 +0000143#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +0000144
145 status = pj_ioqueue_register_sock(pool, ioqueue, sock, asock,
146 &ioq_cb, &asock->key);
147 if (status != PJ_SUCCESS) {
148 pj_activesock_close(asock);
149 return status;
150 }
151
Benny Prijono417d6052008-07-29 20:15:15 +0000152 if (asock->whole_data) {
153 /* Must disable concurrency otherwise there is a race condition */
154 pj_ioqueue_set_concurrency(asock->key, 0);
155 } else if (opt && opt->concurrency >= 0) {
Benny Prijono4bac2c12008-05-11 18:12:16 +0000156 pj_ioqueue_set_concurrency(asock->key, opt->concurrency);
157 }
158
159 *p_asock = asock;
160 return PJ_SUCCESS;
161}
162
163
164PJ_DEF(pj_status_t) pj_activesock_create_udp( pj_pool_t *pool,
165 const pj_sockaddr *addr,
166 const pj_activesock_cfg *opt,
167 pj_ioqueue_t *ioqueue,
168 const pj_activesock_cb *cb,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000169 void *user_data,
Benny Prijono4bac2c12008-05-11 18:12:16 +0000170 pj_activesock_t **p_asock,
171 pj_sockaddr *bound_addr)
172{
173 pj_sock_t sock_fd;
174 pj_sockaddr default_addr;
175 pj_status_t status;
176
177 if (addr == NULL) {
178 pj_sockaddr_init(pj_AF_INET(), &default_addr, NULL, 0);
179 addr = &default_addr;
180 }
181
182 status = pj_sock_socket(addr->addr.sa_family, pj_SOCK_DGRAM(), 0,
183 &sock_fd);
184 if (status != PJ_SUCCESS) {
185 return status;
186 }
187
188 status = pj_sock_bind(sock_fd, addr, pj_sockaddr_get_len(addr));
189 if (status != PJ_SUCCESS) {
190 pj_sock_close(sock_fd);
191 return status;
192 }
193
194 status = pj_activesock_create(pool, sock_fd, pj_SOCK_DGRAM(), opt,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000195 ioqueue, cb, user_data, p_asock);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000196 if (status != PJ_SUCCESS) {
197 pj_sock_close(sock_fd);
198 return status;
199 }
200
201 if (bound_addr) {
202 int addr_len = sizeof(*bound_addr);
203 status = pj_sock_getsockname(sock_fd, bound_addr, &addr_len);
204 if (status != PJ_SUCCESS) {
205 pj_activesock_close(*p_asock);
206 return status;
207 }
208 }
209
210 return PJ_SUCCESS;
211}
212
213
214PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock)
215{
216 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
217 if (asock->key) {
218 pj_ioqueue_unregister(asock->key);
219 asock->key = NULL;
220 }
221 return PJ_SUCCESS;
222}
223
224
225PJ_DEF(pj_status_t) pj_activesock_set_user_data( pj_activesock_t *asock,
226 void *user_data)
227{
228 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
229 asock->user_data = user_data;
230 return PJ_SUCCESS;
231}
232
233
234PJ_DEF(void*) pj_activesock_get_user_data(pj_activesock_t *asock)
235{
236 PJ_ASSERT_RETURN(asock, NULL);
237 return asock->user_data;
238}
239
240
241PJ_DEF(pj_status_t) pj_activesock_start_read(pj_activesock_t *asock,
242 pj_pool_t *pool,
243 unsigned buff_size,
244 pj_uint32_t flags)
245{
Benny Prijonobd344ff2008-08-04 09:59:02 +0000246 void **readbuf;
247 unsigned i;
248
249 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
250
251 readbuf = (void**) pj_pool_calloc(pool, asock->async_count,
252 sizeof(void*));
253
254 for (i=0; i<asock->async_count; ++i) {
255 readbuf[i] = pj_pool_alloc(pool, buff_size);
256 }
257
258 return pj_activesock_start_read2(asock, pool, buff_size, readbuf, flags);
259}
260
261
262PJ_DEF(pj_status_t) pj_activesock_start_read2( pj_activesock_t *asock,
263 pj_pool_t *pool,
264 unsigned buff_size,
265 void *readbuf[],
266 pj_uint32_t flags)
267{
Benny Prijono4bac2c12008-05-11 18:12:16 +0000268 unsigned i;
269 pj_status_t status;
270
271 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
272 PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
273 PJ_ASSERT_RETURN(asock->read_op == NULL, PJ_EINVALIDOP);
274
275 asock->read_op = (struct read_op*)
276 pj_pool_calloc(pool, asock->async_count,
277 sizeof(struct read_op));
278 asock->read_type = TYPE_RECV;
279 asock->read_flags = flags;
280
281 for (i=0; i<asock->async_count; ++i) {
282 struct read_op *r = &asock->read_op[i];
283 pj_ssize_t size_to_read;
284
Benny Prijonobd344ff2008-08-04 09:59:02 +0000285 r->pkt = (pj_uint8_t*)readbuf[i];
Benny Prijono4bac2c12008-05-11 18:12:16 +0000286 r->max_size = size_to_read = buff_size;
287
288 status = pj_ioqueue_recv(asock->key, &r->op_key, r->pkt, &size_to_read,
289 PJ_IOQUEUE_ALWAYS_ASYNC | flags);
290 PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
291
292 if (status != PJ_EPENDING)
293 return status;
294 }
295
296 return PJ_SUCCESS;
297}
298
299
300PJ_DEF(pj_status_t) pj_activesock_start_recvfrom(pj_activesock_t *asock,
301 pj_pool_t *pool,
302 unsigned buff_size,
303 pj_uint32_t flags)
304{
Benny Prijonobd344ff2008-08-04 09:59:02 +0000305 void **readbuf;
306 unsigned i;
307
308 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
309
310 readbuf = (void**) pj_pool_calloc(pool, asock->async_count,
311 sizeof(void*));
312
313 for (i=0; i<asock->async_count; ++i) {
314 readbuf[i] = pj_pool_alloc(pool, buff_size);
315 }
316
317 return pj_activesock_start_recvfrom2(asock, pool, buff_size,
318 readbuf, flags);
319}
320
321
322PJ_DEF(pj_status_t) pj_activesock_start_recvfrom2( pj_activesock_t *asock,
323 pj_pool_t *pool,
324 unsigned buff_size,
325 void *readbuf[],
326 pj_uint32_t flags)
327{
Benny Prijono4bac2c12008-05-11 18:12:16 +0000328 unsigned i;
329 pj_status_t status;
330
331 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
332 PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
333
334 asock->read_op = (struct read_op*)
335 pj_pool_calloc(pool, asock->async_count,
336 sizeof(struct read_op));
337 asock->read_type = TYPE_RECV_FROM;
338 asock->read_flags = flags;
339
340 for (i=0; i<asock->async_count; ++i) {
341 struct read_op *r = &asock->read_op[i];
342 pj_ssize_t size_to_read;
343
Benny Prijonobd344ff2008-08-04 09:59:02 +0000344 r->pkt = (pj_uint8_t*) readbuf[i];
Benny Prijono4bac2c12008-05-11 18:12:16 +0000345 r->max_size = size_to_read = buff_size;
346 r->src_addr_len = sizeof(r->src_addr);
347
348 status = pj_ioqueue_recvfrom(asock->key, &r->op_key, r->pkt,
349 &size_to_read,
350 PJ_IOQUEUE_ALWAYS_ASYNC | flags,
351 &r->src_addr, &r->src_addr_len);
352 PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
353
354 if (status != PJ_EPENDING)
355 return status;
356 }
357
358 return PJ_SUCCESS;
359}
360
361
362static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
363 pj_ioqueue_op_key_t *op_key,
364 pj_ssize_t bytes_read)
365{
366 pj_activesock_t *asock;
367 struct read_op *r = (struct read_op*)op_key;
368 unsigned loop = 0;
369 pj_status_t status;
370
371 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
372
373 do {
374 unsigned flags;
375
376 if (bytes_read > 0) {
377 /*
378 * We've got new data.
379 */
380 pj_size_t remainder;
381 pj_bool_t ret;
382
383 /* Append this new data to existing data. If socket is stream
384 * oriented, user might have left some data in the buffer.
385 * Otherwise if socket is datagram there will be nothing in
386 * existing packet hence the packet will contain only the new
387 * packet.
388 */
389 r->size += bytes_read;
390
391 /* Set default remainder to zero */
392 remainder = 0;
393
394 /* And return value to TRUE */
395 ret = PJ_TRUE;
396
397 /* Notify callback */
398 if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
399 ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
400 PJ_SUCCESS, &remainder);
401 } else if (asock->read_type == TYPE_RECV_FROM &&
402 asock->cb.on_data_recvfrom)
403 {
404 ret = (*asock->cb.on_data_recvfrom)(asock, r->pkt, r->size,
405 &r->src_addr,
406 r->src_addr_len,
407 PJ_SUCCESS);
408 }
409
410 /* If callback returns false, we have been destroyed! */
411 if (!ret)
412 return;
413
414 /* Only stream oriented socket may leave data in the packet */
415 if (asock->stream_oriented) {
416 r->size = remainder;
417 } else {
418 r->size = 0;
419 }
420
Benny Prijonobd344ff2008-08-04 09:59:02 +0000421 } else if (bytes_read <= 0 &&
422 -bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&
423 -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) &&
Benny Prijono3eb59632008-08-26 19:27:23 +0000424 (asock->stream_oriented ||
425 -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)))
Benny Prijonobd344ff2008-08-04 09:59:02 +0000426 {
Benny Prijono4bac2c12008-05-11 18:12:16 +0000427 pj_size_t remainder;
428 pj_bool_t ret;
429
430 if (bytes_read == 0) {
431 /* For stream/connection oriented socket, this means the
432 * connection has been closed. For datagram sockets, it means
433 * we've received datagram with zero length.
434 */
435 if (asock->stream_oriented)
436 status = PJ_EEOF;
437 else
438 status = PJ_SUCCESS;
439 } else {
440 /* This means we've got an error. If this is stream/connection
441 * oriented, it means connection has been closed. For datagram
442 * sockets, it means we've got some error (e.g. EWOULDBLOCK).
443 */
444 status = -bytes_read;
445 }
446
447 /* Set default remainder to zero */
448 remainder = 0;
449
450 /* And return value to TRUE */
451 ret = PJ_TRUE;
452
453 /* Notify callback */
454 if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
455 /* For connection oriented socket, we still need to report
456 * the remainder data (if any) to the user to let user do
457 * processing with the remainder data before it closes the
458 * connection.
459 * If there is no remainder data, set the packet to NULL.
460 */
Nanang Izzuddina326fbf2009-10-26 14:09:09 +0000461
462 /* Shouldn't set the packet to NULL, as there may be active
463 * socket user, such as SSL socket, that needs to have access
464 * to the read buffer packet.
465 */
466 //ret = (*asock->cb.on_data_read)(asock, (r->size? r->pkt:NULL),
467 // r->size, status, &remainder);
468 ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
469 status, &remainder);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000470
471 } else if (asock->read_type == TYPE_RECV_FROM &&
472 asock->cb.on_data_recvfrom)
473 {
474 /* This would always be datagram oriented hence there's
475 * nothing in the packet. We can't be sure if there will be
476 * anything useful in the source_addr, so just put NULL
477 * there too.
478 */
Benny Prijono758decb2008-08-26 17:10:51 +0000479 /* In some scenarios, status may be PJ_SUCCESS. The upper
480 * layer application may not expect the callback to be called
481 * with successful status and NULL data, so lets not call the
482 * callback if the status is PJ_SUCCESS.
483 */
484 if (status != PJ_SUCCESS ) {
485 ret = (*asock->cb.on_data_recvfrom)(asock, NULL, 0,
486 NULL, 0, status);
487 }
Benny Prijono4bac2c12008-05-11 18:12:16 +0000488 }
489
490 /* If callback returns false, we have been destroyed! */
491 if (!ret)
492 return;
493
494 /* Only stream oriented socket may leave data in the packet */
495 if (asock->stream_oriented) {
496 r->size = remainder;
497 } else {
498 r->size = 0;
499 }
500 }
501
502 /* Read next data. We limit ourselves to processing max_loop immediate
503 * data, so when the loop counter has exceeded this value, force the
504 * read()/recvfrom() to return pending operation to allow the program
505 * to do other jobs.
506 */
507 bytes_read = r->max_size - r->size;
508 flags = asock->read_flags;
509 if (++loop >= asock->max_loop)
510 flags |= PJ_IOQUEUE_ALWAYS_ASYNC;
511
512 if (asock->read_type == TYPE_RECV) {
513 status = pj_ioqueue_recv(key, op_key, r->pkt + r->size,
514 &bytes_read, flags);
515 } else {
516 r->src_addr_len = sizeof(r->src_addr);
517 status = pj_ioqueue_recvfrom(key, op_key, r->pkt + r->size,
518 &bytes_read, flags,
519 &r->src_addr, &r->src_addr_len);
520 }
521
Benny Prijono7f6ca732008-08-26 20:47:53 +0000522 if (status == PJ_SUCCESS) {
523 /* Immediate data */
524 ;
525 } else if (status != PJ_EPENDING && status != PJ_ECANCELLED) {
526 /* Error */
Benny Prijono758decb2008-08-26 17:10:51 +0000527 bytes_read = -status;
528 } else {
529 break;
530 }
531 } while (1);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000532
533}
534
535
Benny Prijono417d6052008-07-29 20:15:15 +0000536static pj_status_t send_remaining(pj_activesock_t *asock,
537 pj_ioqueue_op_key_t *send_key)
538{
539 struct send_data *sd = (struct send_data*)send_key->activesock_data;
540 pj_status_t status;
541
542 do {
543 pj_ssize_t size;
544
545 size = sd->len - sd->sent;
546 status = pj_ioqueue_send(asock->key, send_key,
547 sd->data+sd->sent, &size, sd->flags);
548 if (status != PJ_SUCCESS) {
549 /* Pending or error */
550 break;
551 }
552
553 sd->sent += size;
554 if (sd->sent == sd->len) {
555 /* The whole data has been sent. */
556 return PJ_SUCCESS;
557 }
558
559 } while (sd->sent < sd->len);
560
561 return status;
562}
563
564
Benny Prijono4bac2c12008-05-11 18:12:16 +0000565PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock,
566 pj_ioqueue_op_key_t *send_key,
567 const void *data,
568 pj_ssize_t *size,
569 unsigned flags)
570{
571 PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL);
572
Benny Prijono417d6052008-07-29 20:15:15 +0000573 send_key->activesock_data = NULL;
574
575 if (asock->whole_data) {
576 pj_ssize_t whole;
577 pj_status_t status;
578
579 whole = *size;
580
581 status = pj_ioqueue_send(asock->key, send_key, data, size, flags);
582 if (status != PJ_SUCCESS) {
583 /* Pending or error */
584 return status;
585 }
586
587 if (*size == whole) {
588 /* The whole data has been sent. */
589 return PJ_SUCCESS;
590 }
591
592 /* Data was partially sent */
593 asock->send_data.data = (pj_uint8_t*)data;
594 asock->send_data.len = whole;
595 asock->send_data.sent = *size;
596 asock->send_data.flags = flags;
597 send_key->activesock_data = &asock->send_data;
598
599 /* Try again */
600 status = send_remaining(asock, send_key);
601 if (status == PJ_SUCCESS) {
602 *size = whole;
603 }
604 return status;
605
606 } else {
607 return pj_ioqueue_send(asock->key, send_key, data, size, flags);
608 }
Benny Prijono4bac2c12008-05-11 18:12:16 +0000609}
610
611
612PJ_DEF(pj_status_t) pj_activesock_sendto( pj_activesock_t *asock,
613 pj_ioqueue_op_key_t *send_key,
614 const void *data,
615 pj_ssize_t *size,
616 unsigned flags,
617 const pj_sockaddr_t *addr,
618 int addr_len)
619{
620 PJ_ASSERT_RETURN(asock && send_key && data && size && addr && addr_len,
621 PJ_EINVAL);
622
623 return pj_ioqueue_sendto(asock->key, send_key, data, size, flags,
624 addr, addr_len);
625}
626
627
628static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
629 pj_ioqueue_op_key_t *op_key,
630 pj_ssize_t bytes_sent)
631{
632 pj_activesock_t *asock;
633
634 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
635
Benny Prijono417d6052008-07-29 20:15:15 +0000636 if (bytes_sent > 0 && op_key->activesock_data) {
637 /* whole_data is requested. Make sure we send all the data */
638 struct send_data *sd = (struct send_data*)op_key->activesock_data;
639
640 sd->sent += bytes_sent;
641 if (sd->sent == sd->len) {
642 /* all has been sent */
643 bytes_sent = sd->sent;
644 op_key->activesock_data = NULL;
645 } else {
646 /* send remaining data */
647 pj_status_t status;
648
649 status = send_remaining(asock, op_key);
650 if (status == PJ_EPENDING)
651 return;
652 else if (status == PJ_SUCCESS)
653 bytes_sent = sd->sent;
654 else
655 bytes_sent = -status;
656
657 op_key->activesock_data = NULL;
658 }
659 }
660
Benny Prijono4bac2c12008-05-11 18:12:16 +0000661 if (asock->cb.on_data_sent) {
662 pj_bool_t ret;
663
664 ret = (*asock->cb.on_data_sent)(asock, op_key, bytes_sent);
665
666 /* If callback returns false, we have been destroyed! */
667 if (!ret)
668 return;
669 }
670}
671
Benny Prijono1dd54202008-07-25 10:45:34 +0000672#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +0000673PJ_DEF(pj_status_t) pj_activesock_start_accept(pj_activesock_t *asock,
674 pj_pool_t *pool)
675{
676 unsigned i;
677
678 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
679 PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP);
680
681 asock->accept_op = (struct accept_op*)
682 pj_pool_calloc(pool, asock->async_count,
683 sizeof(struct accept_op));
684 for (i=0; i<asock->async_count; ++i) {
685 struct accept_op *a = &asock->accept_op[i];
686 pj_status_t status;
687
688 do {
689 a->new_sock = PJ_INVALID_SOCKET;
690 a->rem_addr_len = sizeof(a->rem_addr);
691
692 status = pj_ioqueue_accept(asock->key, &a->op_key, &a->new_sock,
693 NULL, &a->rem_addr, &a->rem_addr_len);
694 if (status == PJ_SUCCESS) {
695 /* We've got immediate connection. Not sure if it's a good
696 * idea to call the callback now (probably application will
697 * not be prepared to process it), so lets just silently
698 * close the socket.
699 */
700 pj_sock_close(a->new_sock);
701 }
702 } while (status == PJ_SUCCESS);
703
704 if (status != PJ_EPENDING) {
705 return status;
706 }
707 }
708
709 return PJ_SUCCESS;
710}
711
712
713static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
714 pj_ioqueue_op_key_t *op_key,
715 pj_sock_t new_sock,
716 pj_status_t status)
717{
718 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
719 struct accept_op *accept_op = (struct accept_op*) op_key;
720
721 do {
722 if (status==PJ_SUCCESS && asock->cb.on_accept_complete) {
723 pj_bool_t ret;
724
725 /* Notify callback */
726 ret = (*asock->cb.on_accept_complete)(asock, new_sock,
727 &accept_op->rem_addr,
728 accept_op->rem_addr_len);
729
730 /* If callback returns false, we have been destroyed! */
731 if (!ret)
732 return;
733
734 } else if (status==PJ_SUCCESS) {
735 /* Application doesn't handle the new socket, we need to
736 * close it to avoid resource leak.
737 */
738 pj_sock_close(accept_op->new_sock);
739 }
740
741 /* Prepare next accept() */
742 accept_op->new_sock = PJ_INVALID_SOCKET;
743 accept_op->rem_addr_len = sizeof(accept_op->rem_addr);
744
745 status = pj_ioqueue_accept(asock->key, op_key, &accept_op->new_sock,
746 NULL, &accept_op->rem_addr,
747 &accept_op->rem_addr_len);
748
749 } while (status != PJ_EPENDING && status != PJ_ECANCELLED);
750}
751
752
753PJ_DEF(pj_status_t) pj_activesock_start_connect( pj_activesock_t *asock,
754 pj_pool_t *pool,
755 const pj_sockaddr_t *remaddr,
756 int addr_len)
757{
758 PJ_UNUSED_ARG(pool);
759 return pj_ioqueue_connect(asock->key, remaddr, addr_len);
760}
761
Benny Prijono4bac2c12008-05-11 18:12:16 +0000762static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
763 pj_status_t status)
764{
765 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
766
767 if (asock->cb.on_connect_complete) {
768 pj_bool_t ret;
769
770 ret = (*asock->cb.on_connect_complete)(asock, status);
771
772 if (!ret) {
773 /* We've been destroyed */
774 return;
775 }
776 }
777}
Benny Prijono1dd54202008-07-25 10:45:34 +0000778#endif /* PJ_HAS_TCP */
Benny Prijono4bac2c12008-05-11 18:12:16 +0000779