blob: 4c3985db7664e8c3ca5b75a4ba6ae8bd83c3a2e1 [file] [log] [blame]
Benny Prijono4bac2c12008-05-11 18:12:16 +00001/* $Id$ */
2/*
Benny Prijono32177c02008-06-20 22:44:47 +00003 * Copyright (C)2003-2008 Benny Prijono <benny@prijono.org>
Benny Prijono4bac2c12008-05-11 18:12:16 +00004 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 */
19#include <pj/activesock.h>
20#include <pj/assert.h>
21#include <pj/errno.h>
22#include <pj/pool.h>
23#include <pj/sock.h>
24#include <pj/string.h>
25
26#define PJ_ACTIVESOCK_MAX_LOOP 50
27
28
/* Kind of asynchronous read that has been armed on an active socket. */
enum read_type
{
    TYPE_NONE      = 0,     /* no read has been started yet              */
    TYPE_RECV      = 1,     /* set by pj_activesock_start_read()         */
    TYPE_RECV_FROM = 2      /* set by pj_activesock_start_recvfrom()     */
};
35
36struct read_op
37{
38 pj_ioqueue_op_key_t op_key;
39 pj_uint8_t *pkt;
40 unsigned max_size;
41 pj_size_t size;
42 pj_sockaddr src_addr;
43 int src_addr_len;
44};
45
46struct accept_op
47{
48 pj_ioqueue_op_key_t op_key;
49 pj_sock_t new_sock;
50 pj_sockaddr rem_addr;
51 int rem_addr_len;
52};
53
Benny Prijono417d6052008-07-29 20:15:15 +000054struct send_data
55{
56 pj_uint8_t *data;
57 pj_ssize_t len;
58 pj_ssize_t sent;
59 unsigned flags;
60};
61
Benny Prijono4bac2c12008-05-11 18:12:16 +000062struct pj_activesock_t
63{
64 pj_ioqueue_key_t *key;
65 pj_bool_t stream_oriented;
Benny Prijono417d6052008-07-29 20:15:15 +000066 pj_bool_t whole_data;
Benny Prijono4bac2c12008-05-11 18:12:16 +000067 pj_ioqueue_t *ioqueue;
68 void *user_data;
69 unsigned async_count;
70 unsigned max_loop;
71 pj_activesock_cb cb;
72
Benny Prijono417d6052008-07-29 20:15:15 +000073 struct send_data send_data;
74
Benny Prijono4bac2c12008-05-11 18:12:16 +000075 struct read_op *read_op;
76 pj_uint32_t read_flags;
77 enum read_type read_type;
78
79 struct accept_op *accept_op;
80};
81
82
83static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
84 pj_ioqueue_op_key_t *op_key,
85 pj_ssize_t bytes_read);
86static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
87 pj_ioqueue_op_key_t *op_key,
88 pj_ssize_t bytes_sent);
Benny Prijono1dd54202008-07-25 10:45:34 +000089#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +000090static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
91 pj_ioqueue_op_key_t *op_key,
92 pj_sock_t sock,
93 pj_status_t status);
94static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
95 pj_status_t status);
Benny Prijono1dd54202008-07-25 10:45:34 +000096#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +000097
98PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg)
99{
100 pj_bzero(cfg, sizeof(*cfg));
101 cfg->async_cnt = 1;
102 cfg->concurrency = -1;
Benny Prijono417d6052008-07-29 20:15:15 +0000103 cfg->whole_data = PJ_TRUE;
Benny Prijono4bac2c12008-05-11 18:12:16 +0000104}
105
106
107PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool,
108 pj_sock_t sock,
109 int sock_type,
110 const pj_activesock_cfg *opt,
111 pj_ioqueue_t *ioqueue,
112 const pj_activesock_cb *cb,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000113 void *user_data,
Benny Prijono4bac2c12008-05-11 18:12:16 +0000114 pj_activesock_t **p_asock)
115{
116 pj_activesock_t *asock;
117 pj_ioqueue_callback ioq_cb;
118 pj_status_t status;
119
120 PJ_ASSERT_RETURN(pool && ioqueue && cb && p_asock, PJ_EINVAL);
121 PJ_ASSERT_RETURN(sock!=0 && sock!=PJ_INVALID_SOCKET, PJ_EINVAL);
122 PJ_ASSERT_RETURN(sock_type==pj_SOCK_STREAM() ||
123 sock_type==pj_SOCK_DGRAM(), PJ_EINVAL);
124 PJ_ASSERT_RETURN(!opt || opt->async_cnt >= 1, PJ_EINVAL);
125
126 asock = PJ_POOL_ZALLOC_T(pool, pj_activesock_t);
127 asock->ioqueue = ioqueue;
128 asock->stream_oriented = (sock_type == pj_SOCK_STREAM());
129 asock->async_count = (opt? opt->async_cnt : 1);
Benny Prijono417d6052008-07-29 20:15:15 +0000130 asock->whole_data = (opt? opt->whole_data : 1);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000131 asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP;
Benny Prijonoea8e4362008-06-06 14:12:23 +0000132 asock->user_data = user_data;
Benny Prijono4bac2c12008-05-11 18:12:16 +0000133 pj_memcpy(&asock->cb, cb, sizeof(*cb));
134
135 pj_bzero(&ioq_cb, sizeof(ioq_cb));
136 ioq_cb.on_read_complete = &ioqueue_on_read_complete;
137 ioq_cb.on_write_complete = &ioqueue_on_write_complete;
Benny Prijono1dd54202008-07-25 10:45:34 +0000138#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +0000139 ioq_cb.on_connect_complete = &ioqueue_on_connect_complete;
140 ioq_cb.on_accept_complete = &ioqueue_on_accept_complete;
Benny Prijono1dd54202008-07-25 10:45:34 +0000141#endif
Benny Prijono4bac2c12008-05-11 18:12:16 +0000142
143 status = pj_ioqueue_register_sock(pool, ioqueue, sock, asock,
144 &ioq_cb, &asock->key);
145 if (status != PJ_SUCCESS) {
146 pj_activesock_close(asock);
147 return status;
148 }
149
Benny Prijono417d6052008-07-29 20:15:15 +0000150 if (asock->whole_data) {
151 /* Must disable concurrency otherwise there is a race condition */
152 pj_ioqueue_set_concurrency(asock->key, 0);
153 } else if (opt && opt->concurrency >= 0) {
Benny Prijono4bac2c12008-05-11 18:12:16 +0000154 pj_ioqueue_set_concurrency(asock->key, opt->concurrency);
155 }
156
157 *p_asock = asock;
158 return PJ_SUCCESS;
159}
160
161
162PJ_DEF(pj_status_t) pj_activesock_create_udp( pj_pool_t *pool,
163 const pj_sockaddr *addr,
164 const pj_activesock_cfg *opt,
165 pj_ioqueue_t *ioqueue,
166 const pj_activesock_cb *cb,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000167 void *user_data,
Benny Prijono4bac2c12008-05-11 18:12:16 +0000168 pj_activesock_t **p_asock,
169 pj_sockaddr *bound_addr)
170{
171 pj_sock_t sock_fd;
172 pj_sockaddr default_addr;
173 pj_status_t status;
174
175 if (addr == NULL) {
176 pj_sockaddr_init(pj_AF_INET(), &default_addr, NULL, 0);
177 addr = &default_addr;
178 }
179
180 status = pj_sock_socket(addr->addr.sa_family, pj_SOCK_DGRAM(), 0,
181 &sock_fd);
182 if (status != PJ_SUCCESS) {
183 return status;
184 }
185
186 status = pj_sock_bind(sock_fd, addr, pj_sockaddr_get_len(addr));
187 if (status != PJ_SUCCESS) {
188 pj_sock_close(sock_fd);
189 return status;
190 }
191
192 status = pj_activesock_create(pool, sock_fd, pj_SOCK_DGRAM(), opt,
Benny Prijonoea8e4362008-06-06 14:12:23 +0000193 ioqueue, cb, user_data, p_asock);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000194 if (status != PJ_SUCCESS) {
195 pj_sock_close(sock_fd);
196 return status;
197 }
198
199 if (bound_addr) {
200 int addr_len = sizeof(*bound_addr);
201 status = pj_sock_getsockname(sock_fd, bound_addr, &addr_len);
202 if (status != PJ_SUCCESS) {
203 pj_activesock_close(*p_asock);
204 return status;
205 }
206 }
207
208 return PJ_SUCCESS;
209}
210
211
212PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock)
213{
214 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
215 if (asock->key) {
216 pj_ioqueue_unregister(asock->key);
217 asock->key = NULL;
218 }
219 return PJ_SUCCESS;
220}
221
222
223PJ_DEF(pj_status_t) pj_activesock_set_user_data( pj_activesock_t *asock,
224 void *user_data)
225{
226 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
227 asock->user_data = user_data;
228 return PJ_SUCCESS;
229}
230
231
232PJ_DEF(void*) pj_activesock_get_user_data(pj_activesock_t *asock)
233{
234 PJ_ASSERT_RETURN(asock, NULL);
235 return asock->user_data;
236}
237
238
239PJ_DEF(pj_status_t) pj_activesock_start_read(pj_activesock_t *asock,
240 pj_pool_t *pool,
241 unsigned buff_size,
242 pj_uint32_t flags)
243{
244 unsigned i;
245 pj_status_t status;
246
247 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
248 PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
249 PJ_ASSERT_RETURN(asock->read_op == NULL, PJ_EINVALIDOP);
250
251 asock->read_op = (struct read_op*)
252 pj_pool_calloc(pool, asock->async_count,
253 sizeof(struct read_op));
254 asock->read_type = TYPE_RECV;
255 asock->read_flags = flags;
256
257 for (i=0; i<asock->async_count; ++i) {
258 struct read_op *r = &asock->read_op[i];
259 pj_ssize_t size_to_read;
260
Benny Prijonoc67f8852008-05-20 08:51:03 +0000261 r->pkt = (pj_uint8_t*)pj_pool_alloc(pool, buff_size);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000262 r->max_size = size_to_read = buff_size;
263
264 status = pj_ioqueue_recv(asock->key, &r->op_key, r->pkt, &size_to_read,
265 PJ_IOQUEUE_ALWAYS_ASYNC | flags);
266 PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
267
268 if (status != PJ_EPENDING)
269 return status;
270 }
271
272 return PJ_SUCCESS;
273}
274
275
276PJ_DEF(pj_status_t) pj_activesock_start_recvfrom(pj_activesock_t *asock,
277 pj_pool_t *pool,
278 unsigned buff_size,
279 pj_uint32_t flags)
280{
281 unsigned i;
282 pj_status_t status;
283
284 PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
285 PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
286
287 asock->read_op = (struct read_op*)
288 pj_pool_calloc(pool, asock->async_count,
289 sizeof(struct read_op));
290 asock->read_type = TYPE_RECV_FROM;
291 asock->read_flags = flags;
292
293 for (i=0; i<asock->async_count; ++i) {
294 struct read_op *r = &asock->read_op[i];
295 pj_ssize_t size_to_read;
296
Benny Prijonoc67f8852008-05-20 08:51:03 +0000297 r->pkt = (pj_uint8_t*) pj_pool_alloc(pool, buff_size);
Benny Prijono4bac2c12008-05-11 18:12:16 +0000298 r->max_size = size_to_read = buff_size;
299 r->src_addr_len = sizeof(r->src_addr);
300
301 status = pj_ioqueue_recvfrom(asock->key, &r->op_key, r->pkt,
302 &size_to_read,
303 PJ_IOQUEUE_ALWAYS_ASYNC | flags,
304 &r->src_addr, &r->src_addr_len);
305 PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
306
307 if (status != PJ_EPENDING)
308 return status;
309 }
310
311 return PJ_SUCCESS;
312}
313
314
315static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
316 pj_ioqueue_op_key_t *op_key,
317 pj_ssize_t bytes_read)
318{
319 pj_activesock_t *asock;
320 struct read_op *r = (struct read_op*)op_key;
321 unsigned loop = 0;
322 pj_status_t status;
323
324 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
325
326 do {
327 unsigned flags;
328
329 if (bytes_read > 0) {
330 /*
331 * We've got new data.
332 */
333 pj_size_t remainder;
334 pj_bool_t ret;
335
336 /* Append this new data to existing data. If socket is stream
337 * oriented, user might have left some data in the buffer.
338 * Otherwise if socket is datagram there will be nothing in
339 * existing packet hence the packet will contain only the new
340 * packet.
341 */
342 r->size += bytes_read;
343
344 /* Set default remainder to zero */
345 remainder = 0;
346
347 /* And return value to TRUE */
348 ret = PJ_TRUE;
349
350 /* Notify callback */
351 if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
352 ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
353 PJ_SUCCESS, &remainder);
354 } else if (asock->read_type == TYPE_RECV_FROM &&
355 asock->cb.on_data_recvfrom)
356 {
357 ret = (*asock->cb.on_data_recvfrom)(asock, r->pkt, r->size,
358 &r->src_addr,
359 r->src_addr_len,
360 PJ_SUCCESS);
361 }
362
363 /* If callback returns false, we have been destroyed! */
364 if (!ret)
365 return;
366
367 /* Only stream oriented socket may leave data in the packet */
368 if (asock->stream_oriented) {
369 r->size = remainder;
370 } else {
371 r->size = 0;
372 }
373
374 } else if (bytes_read <= 0) {
375
376 pj_size_t remainder;
377 pj_bool_t ret;
378
379 if (bytes_read == 0) {
380 /* For stream/connection oriented socket, this means the
381 * connection has been closed. For datagram sockets, it means
382 * we've received datagram with zero length.
383 */
384 if (asock->stream_oriented)
385 status = PJ_EEOF;
386 else
387 status = PJ_SUCCESS;
388 } else {
389 /* This means we've got an error. If this is stream/connection
390 * oriented, it means connection has been closed. For datagram
391 * sockets, it means we've got some error (e.g. EWOULDBLOCK).
392 */
393 status = -bytes_read;
394 }
395
396 /* Set default remainder to zero */
397 remainder = 0;
398
399 /* And return value to TRUE */
400 ret = PJ_TRUE;
401
402 /* Notify callback */
403 if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
404 /* For connection oriented socket, we still need to report
405 * the remainder data (if any) to the user to let user do
406 * processing with the remainder data before it closes the
407 * connection.
408 * If there is no remainder data, set the packet to NULL.
409 */
410 ret = (*asock->cb.on_data_read)(asock, (r->size? r->pkt:NULL),
411 r->size, status, &remainder);
412
413 } else if (asock->read_type == TYPE_RECV_FROM &&
414 asock->cb.on_data_recvfrom)
415 {
416 /* This would always be datagram oriented hence there's
417 * nothing in the packet. We can't be sure if there will be
418 * anything useful in the source_addr, so just put NULL
419 * there too.
420 */
421 ret = (*asock->cb.on_data_recvfrom)(asock, NULL, 0,
422 NULL, 0, status);
423 }
424
425 /* If callback returns false, we have been destroyed! */
426 if (!ret)
427 return;
428
429 /* Only stream oriented socket may leave data in the packet */
430 if (asock->stream_oriented) {
431 r->size = remainder;
432 } else {
433 r->size = 0;
434 }
435 }
436
437 /* Read next data. We limit ourselves to processing max_loop immediate
438 * data, so when the loop counter has exceeded this value, force the
439 * read()/recvfrom() to return pending operation to allow the program
440 * to do other jobs.
441 */
442 bytes_read = r->max_size - r->size;
443 flags = asock->read_flags;
444 if (++loop >= asock->max_loop)
445 flags |= PJ_IOQUEUE_ALWAYS_ASYNC;
446
447 if (asock->read_type == TYPE_RECV) {
448 status = pj_ioqueue_recv(key, op_key, r->pkt + r->size,
449 &bytes_read, flags);
450 } else {
451 r->src_addr_len = sizeof(r->src_addr);
452 status = pj_ioqueue_recvfrom(key, op_key, r->pkt + r->size,
453 &bytes_read, flags,
454 &r->src_addr, &r->src_addr_len);
455 }
456
457 } while (status != PJ_EPENDING && status != PJ_ECANCELLED);
458
459}
460
461
Benny Prijono417d6052008-07-29 20:15:15 +0000462static pj_status_t send_remaining(pj_activesock_t *asock,
463 pj_ioqueue_op_key_t *send_key)
464{
465 struct send_data *sd = (struct send_data*)send_key->activesock_data;
466 pj_status_t status;
467
468 do {
469 pj_ssize_t size;
470
471 size = sd->len - sd->sent;
472 status = pj_ioqueue_send(asock->key, send_key,
473 sd->data+sd->sent, &size, sd->flags);
474 if (status != PJ_SUCCESS) {
475 /* Pending or error */
476 break;
477 }
478
479 sd->sent += size;
480 if (sd->sent == sd->len) {
481 /* The whole data has been sent. */
482 return PJ_SUCCESS;
483 }
484
485 } while (sd->sent < sd->len);
486
487 return status;
488}
489
490
Benny Prijono4bac2c12008-05-11 18:12:16 +0000491PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock,
492 pj_ioqueue_op_key_t *send_key,
493 const void *data,
494 pj_ssize_t *size,
495 unsigned flags)
496{
497 PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL);
498
Benny Prijono417d6052008-07-29 20:15:15 +0000499 send_key->activesock_data = NULL;
500
501 if (asock->whole_data) {
502 pj_ssize_t whole;
503 pj_status_t status;
504
505 whole = *size;
506
507 status = pj_ioqueue_send(asock->key, send_key, data, size, flags);
508 if (status != PJ_SUCCESS) {
509 /* Pending or error */
510 return status;
511 }
512
513 if (*size == whole) {
514 /* The whole data has been sent. */
515 return PJ_SUCCESS;
516 }
517
518 /* Data was partially sent */
519 asock->send_data.data = (pj_uint8_t*)data;
520 asock->send_data.len = whole;
521 asock->send_data.sent = *size;
522 asock->send_data.flags = flags;
523 send_key->activesock_data = &asock->send_data;
524
525 /* Try again */
526 status = send_remaining(asock, send_key);
527 if (status == PJ_SUCCESS) {
528 *size = whole;
529 }
530 return status;
531
532 } else {
533 return pj_ioqueue_send(asock->key, send_key, data, size, flags);
534 }
Benny Prijono4bac2c12008-05-11 18:12:16 +0000535}
536
537
538PJ_DEF(pj_status_t) pj_activesock_sendto( pj_activesock_t *asock,
539 pj_ioqueue_op_key_t *send_key,
540 const void *data,
541 pj_ssize_t *size,
542 unsigned flags,
543 const pj_sockaddr_t *addr,
544 int addr_len)
545{
546 PJ_ASSERT_RETURN(asock && send_key && data && size && addr && addr_len,
547 PJ_EINVAL);
548
549 return pj_ioqueue_sendto(asock->key, send_key, data, size, flags,
550 addr, addr_len);
551}
552
553
554static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
555 pj_ioqueue_op_key_t *op_key,
556 pj_ssize_t bytes_sent)
557{
558 pj_activesock_t *asock;
559
560 asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
561
Benny Prijono417d6052008-07-29 20:15:15 +0000562 if (bytes_sent > 0 && op_key->activesock_data) {
563 /* whole_data is requested. Make sure we send all the data */
564 struct send_data *sd = (struct send_data*)op_key->activesock_data;
565
566 sd->sent += bytes_sent;
567 if (sd->sent == sd->len) {
568 /* all has been sent */
569 bytes_sent = sd->sent;
570 op_key->activesock_data = NULL;
571 } else {
572 /* send remaining data */
573 pj_status_t status;
574
575 status = send_remaining(asock, op_key);
576 if (status == PJ_EPENDING)
577 return;
578 else if (status == PJ_SUCCESS)
579 bytes_sent = sd->sent;
580 else
581 bytes_sent = -status;
582
583 op_key->activesock_data = NULL;
584 }
585 }
586
Benny Prijono4bac2c12008-05-11 18:12:16 +0000587 if (asock->cb.on_data_sent) {
588 pj_bool_t ret;
589
590 ret = (*asock->cb.on_data_sent)(asock, op_key, bytes_sent);
591
592 /* If callback returns false, we have been destroyed! */
593 if (!ret)
594 return;
595 }
596}
597
Benny Prijono1dd54202008-07-25 10:45:34 +0000598#if PJ_HAS_TCP
Benny Prijono4bac2c12008-05-11 18:12:16 +0000599PJ_DEF(pj_status_t) pj_activesock_start_accept(pj_activesock_t *asock,
600 pj_pool_t *pool)
601{
602 unsigned i;
603
604 PJ_ASSERT_RETURN(asock, PJ_EINVAL);
605 PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP);
606
607 asock->accept_op = (struct accept_op*)
608 pj_pool_calloc(pool, asock->async_count,
609 sizeof(struct accept_op));
610 for (i=0; i<asock->async_count; ++i) {
611 struct accept_op *a = &asock->accept_op[i];
612 pj_status_t status;
613
614 do {
615 a->new_sock = PJ_INVALID_SOCKET;
616 a->rem_addr_len = sizeof(a->rem_addr);
617
618 status = pj_ioqueue_accept(asock->key, &a->op_key, &a->new_sock,
619 NULL, &a->rem_addr, &a->rem_addr_len);
620 if (status == PJ_SUCCESS) {
621 /* We've got immediate connection. Not sure if it's a good
622 * idea to call the callback now (probably application will
623 * not be prepared to process it), so lets just silently
624 * close the socket.
625 */
626 pj_sock_close(a->new_sock);
627 }
628 } while (status == PJ_SUCCESS);
629
630 if (status != PJ_EPENDING) {
631 return status;
632 }
633 }
634
635 return PJ_SUCCESS;
636}
637
638
639static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
640 pj_ioqueue_op_key_t *op_key,
641 pj_sock_t new_sock,
642 pj_status_t status)
643{
644 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
645 struct accept_op *accept_op = (struct accept_op*) op_key;
646
647 do {
648 if (status==PJ_SUCCESS && asock->cb.on_accept_complete) {
649 pj_bool_t ret;
650
651 /* Notify callback */
652 ret = (*asock->cb.on_accept_complete)(asock, new_sock,
653 &accept_op->rem_addr,
654 accept_op->rem_addr_len);
655
656 /* If callback returns false, we have been destroyed! */
657 if (!ret)
658 return;
659
660 } else if (status==PJ_SUCCESS) {
661 /* Application doesn't handle the new socket, we need to
662 * close it to avoid resource leak.
663 */
664 pj_sock_close(accept_op->new_sock);
665 }
666
667 /* Prepare next accept() */
668 accept_op->new_sock = PJ_INVALID_SOCKET;
669 accept_op->rem_addr_len = sizeof(accept_op->rem_addr);
670
671 status = pj_ioqueue_accept(asock->key, op_key, &accept_op->new_sock,
672 NULL, &accept_op->rem_addr,
673 &accept_op->rem_addr_len);
674
675 } while (status != PJ_EPENDING && status != PJ_ECANCELLED);
676}
677
678
679PJ_DEF(pj_status_t) pj_activesock_start_connect( pj_activesock_t *asock,
680 pj_pool_t *pool,
681 const pj_sockaddr_t *remaddr,
682 int addr_len)
683{
684 PJ_UNUSED_ARG(pool);
685 return pj_ioqueue_connect(asock->key, remaddr, addr_len);
686}
687
Benny Prijono4bac2c12008-05-11 18:12:16 +0000688static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
689 pj_status_t status)
690{
691 pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
692
693 if (asock->cb.on_connect_complete) {
694 pj_bool_t ret;
695
696 ret = (*asock->cb.on_connect_complete)(asock, status);
697
698 if (!ret) {
699 /* We've been destroyed */
700 return;
701 }
702 }
703}
Benny Prijono1dd54202008-07-25 10:45:34 +0000704#endif /* PJ_HAS_TCP */
Benny Prijono4bac2c12008-05-11 18:12:16 +0000705