blob: df1fa46c3f7532ff51b72158b847dfcf0b62f54c [file] [log] [blame]
Benny Prijonof260e462007-04-30 21:03:32 +00001/* $Id$ */
2/*
3 * Copyright (C)2003-2006 Benny Prijono <benny@prijono.org>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 */
19#include <pj/ioqueue.h>
20#include <pj/assert.h>
21#include <pj/errno.h>
22#include <pj/list.h>
23#include <pj/lock.h>
24#include <pj/pool.h>
25#include <pj/string.h>
26
27#include "os_symbian.h"
28
29class CIoqueueCallback;
30
31/*
32 * IO Queue structure.
33 */
34struct pj_ioqueue_t
35{
36 int eventCount;
37 CPjTimeoutTimer *timeoutTimer;
38};
39
40
41/////////////////////////////////////////////////////////////////////////////
42// Class to encapsulate asynchronous socket operation.
43//
44class CIoqueueCallback : public CActive
45{
46public:
47 static CIoqueueCallback* NewL(pj_ioqueue_t *ioqueue,
48 pj_ioqueue_key_t *key,
49 pj_sock_t sock,
50 const pj_ioqueue_callback *cb,
51 void *user_data);
52
53 //
54 // Start asynchronous recv() operation
55 //
56 pj_status_t StartRead(pj_ioqueue_op_key_t *op_key,
57 void *buf, pj_ssize_t *size, unsigned flags,
58 pj_sockaddr_t *addr, int *addrlen);
59
60 //
61 // Start asynchronous accept() operation.
62 //
63 pj_status_t StartAccept(pj_ioqueue_op_key_t *op_key,
64 pj_sock_t *new_sock,
65 pj_sockaddr_t *local,
66 pj_sockaddr_t *remote,
67 int *addrlen );
68
69 //
70 // Completion callback.
71 //
72 void RunL();
73
74 //
75 // CActive's DoCancel()
76 //
77 void DoCancel();
78
79 //
80 // Cancel operation and call callback.
81 //
82 void CancelOperation(pj_ioqueue_op_key_t *op_key,
83 pj_ssize_t bytes_status);
84
85 //
86 // Accessors
87 //
88 void* get_user_data() const
89 {
90 return user_data_;
91 }
92 void set_user_data(void *user_data)
93 {
94 user_data_ = user_data;
95 }
96 pj_ioqueue_op_key_t *get_op_key() const
97 {
98 return pending_data_.common_.op_key_;
99 }
100 CPjSocket* get_pj_socket()
101 {
102 return sock_;
103 }
104
105private:
106 // Type of pending operation.
107 enum Type {
108 TYPE_NONE,
109 TYPE_READ,
110 TYPE_ACCEPT,
111 };
112
113 // Static data.
114 pj_ioqueue_t *ioqueue_;
115 pj_ioqueue_key_t *key_;
116 CPjSocket *sock_;
117 pj_ioqueue_callback cb_;
118 void *user_data_;
119
120 // Symbian data.
121 TPtr8 aBufferPtr_;
122 TInetAddr aAddress_;
123
124 // Application data.
125 Type type_;
126
127 union Pending_Data
128 {
129 struct Common
130 {
131 pj_ioqueue_op_key_t *op_key_;
132 } common_;
133
134 struct Pending_Read
135 {
136 pj_ioqueue_op_key_t *op_key_;
137 pj_sockaddr_t *addr_;
138 int *addrlen_;
139 } read_;
140
141 struct Pending_Accept
142 {
143 pj_ioqueue_op_key_t *op_key_;
144 pj_sock_t *new_sock_;
145 pj_sockaddr_t *local_;
146 pj_sockaddr_t *remote_;
147 int *addrlen_;
148 } accept_;
149 };
150
151 union Pending_Data pending_data_;
152 RSocket blank_sock_;
153
154 CIoqueueCallback(pj_ioqueue_t *ioqueue,
155 pj_ioqueue_key_t *key, pj_sock_t sock,
156 const pj_ioqueue_callback *cb, void *user_data)
157 : CActive(CActive::EPriorityStandard),
158 ioqueue_(ioqueue), key_(key), sock_((CPjSocket*)sock),
159 user_data_(user_data), aBufferPtr_(NULL, 0), type_(TYPE_NONE)
160 {
161 pj_memcpy(&cb_, cb, sizeof(*cb));
162 }
163
164
165 void ConstructL()
166 {
167 CActiveScheduler::Add(this);
168 }
169
170 void HandleReadCompletion();
171 CPjSocket *HandleAcceptCompletion();
172};
173
174
175CIoqueueCallback* CIoqueueCallback::NewL(pj_ioqueue_t *ioqueue,
176 pj_ioqueue_key_t *key,
177 pj_sock_t sock,
178 const pj_ioqueue_callback *cb,
179 void *user_data)
180{
181 CIoqueueCallback *self = new CIoqueueCallback(ioqueue, key, sock,
182 cb, user_data);
183 CleanupStack::PushL(self);
184 self->ConstructL();
185 CleanupStack::Pop(self);
186
187 return self;
188}
189
190
191//
192// Start asynchronous recv() operation
193//
194pj_status_t CIoqueueCallback::StartRead(pj_ioqueue_op_key_t *op_key,
195 void *buf, pj_ssize_t *size,
196 unsigned flags,
197 pj_sockaddr_t *addr, int *addrlen)
198{
199 PJ_ASSERT_RETURN(IsActive()==false, PJ_EBUSY);
200 PJ_ASSERT_RETURN(pending_data_.common_.op_key_==NULL, PJ_EBUSY);
201
202 flags &= ~PJ_IOQUEUE_ALWAYS_ASYNC;
203
204 pending_data_.read_.op_key_ = op_key;
205 pending_data_.read_.addr_ = addr;
206 pending_data_.read_.addrlen_ = addrlen;
207
208 aBufferPtr_.Set((TUint8*)buf, 0, (TInt)*size);
209
210 type_ = TYPE_READ;
211 if (addr && addrlen) {
212 sock_->Socket().RecvFrom(aBufferPtr_, aAddress_, flags, iStatus);
213 } else {
214 aAddress_.SetAddress(0);
215 aAddress_.SetPort(0);
216 sock_->Socket().Recv(aBufferPtr_, flags, iStatus);
217 }
218
Benny Prijono5d542642007-05-02 18:54:19 +0000219 SetActive();
220 return PJ_EPENDING;
Benny Prijonof260e462007-04-30 21:03:32 +0000221}
222
223
224//
225// Start asynchronous accept() operation.
226//
227pj_status_t CIoqueueCallback::StartAccept(pj_ioqueue_op_key_t *op_key,
228 pj_sock_t *new_sock,
229 pj_sockaddr_t *local,
230 pj_sockaddr_t *remote,
231 int *addrlen )
232{
233 PJ_ASSERT_RETURN(IsActive()==false, PJ_EBUSY);
234 PJ_ASSERT_RETURN(pending_data_.common_.op_key_==NULL, PJ_EBUSY);
235
236 pending_data_.accept_.op_key_ = op_key;
237 pending_data_.accept_.new_sock_ = new_sock;
238 pending_data_.accept_.local_ = local;
239 pending_data_.accept_.remote_ = remote;
240 pending_data_.accept_.addrlen_ = addrlen;
241
242 // Create blank socket
243 blank_sock_.Open(PjSymbianOS::Instance()->SocketServ());
244
245 type_ = TYPE_ACCEPT;
246 sock_->Socket().Accept(blank_sock_, iStatus);
247
Benny Prijono5d542642007-05-02 18:54:19 +0000248 SetActive();
249 return PJ_EPENDING;
Benny Prijonof260e462007-04-30 21:03:32 +0000250}
251
252
253//
254// Handle asynchronous RecvFrom() completion
255//
256void CIoqueueCallback::HandleReadCompletion()
257{
Benny Prijono5d542642007-05-02 18:54:19 +0000258 if (pending_data_.read_.addr_) {
259 PjSymbianOS::Addr2pj(aAddress_,
260 *(pj_sockaddr_in*)pending_data_.read_.addr_);
261 pending_data_.read_.addr_ = NULL;
262 }
263 if (pending_data_.read_.addrlen_) {
264 *pending_data_.read_.addrlen_ = sizeof(pj_sockaddr_in);
265 pending_data_.read_.addrlen_ = NULL;
266 }
Benny Prijonof260e462007-04-30 21:03:32 +0000267
Benny Prijono5d542642007-05-02 18:54:19 +0000268 pending_data_.read_.op_key_ = NULL;
Benny Prijonof260e462007-04-30 21:03:32 +0000269}
270
271
272//
273// Handle asynchronous Accept() completion.
274//
275CPjSocket *CIoqueueCallback::HandleAcceptCompletion()
276{
277 CPjSocket *pjNewSock = new CPjSocket(blank_sock_);
278
279 if (pending_data_.accept_.new_sock_) {
280 *pending_data_.accept_.new_sock_ = (pj_sock_t)pjNewSock;
281 pending_data_.accept_.new_sock_ = NULL;
282 }
283
284 if (pending_data_.accept_.local_) {
285 TInetAddr aAddr;
286 pj_sockaddr_in *ptr_sockaddr;
287
288 blank_sock_.LocalName(aAddr);
289 ptr_sockaddr = (pj_sockaddr_in*)pending_data_.accept_.local_;
290 PjSymbianOS::Addr2pj(aAddr, *ptr_sockaddr);
291 pending_data_.accept_.local_ = NULL;
292 }
293
294 if (pending_data_.accept_.remote_) {
295 TInetAddr aAddr;
296 pj_sockaddr_in *ptr_sockaddr;
297
298 blank_sock_.RemoteName(aAddr);
299 ptr_sockaddr = (pj_sockaddr_in*)pending_data_.accept_.remote_;
300 PjSymbianOS::Addr2pj(aAddr, *ptr_sockaddr);
301 pending_data_.accept_.remote_ = NULL;
302 }
303
304 if (pending_data_.accept_.addrlen_) {
305 *pending_data_.accept_.addrlen_ = sizeof(pj_sockaddr_in);
306 pending_data_.accept_.addrlen_ = NULL;
307 }
308
309 return pjNewSock;
310}
311
312
313//
314// Completion callback.
315//
316void CIoqueueCallback::RunL()
317{
318 Type cur_type = type_;
319
320 type_ = TYPE_NONE;
321
322 if (cur_type == TYPE_READ) {
323 //
324 // Completion of asynchronous RecvFrom()
325 //
326
327 /* Clear op_key (save it to temp variable first!) */
328 pj_ioqueue_op_key_t *op_key = pending_data_.read_.op_key_;
329 pending_data_.read_.op_key_ = NULL;
330
331 // Handle failure condition
332 if (iStatus != KErrNone) {
333 if (cb_.on_read_complete) {
334 cb_.on_read_complete( key_, op_key,
335 -PJ_RETURN_OS_ERROR(iStatus.Int()));
336 }
337 return;
338 }
339
340 HandleReadCompletion();
341
342 /* Call callback */
343 if (cb_.on_read_complete) {
344 cb_.on_read_complete(key_, op_key, aBufferPtr_.Length());
345 }
346
347 } else if (cur_type == TYPE_ACCEPT) {
348 //
349 // Completion of asynchronous Accept()
350 //
351
352 /* Clear op_key (save it to temp variable first!) */
353 pj_ioqueue_op_key_t *op_key = pending_data_.read_.op_key_;
354 pending_data_.read_.op_key_ = NULL;
355
356 // Handle failure condition
357 if (iStatus != KErrNone) {
358 if (pending_data_.accept_.new_sock_)
359 *pending_data_.accept_.new_sock_ = PJ_INVALID_SOCKET;
360
361 if (cb_.on_accept_complete) {
362 cb_.on_accept_complete( key_, op_key, PJ_INVALID_SOCKET,
363 -PJ_RETURN_OS_ERROR(iStatus.Int()));
364 }
365 return;
366 }
367
368 CPjSocket *pjNewSock = HandleAcceptCompletion();
369
370 // Call callback.
371 if (cb_.on_accept_complete) {
372 cb_.on_accept_complete( key_, op_key, (pj_sock_t)pjNewSock,
373 PJ_SUCCESS);
374 }
375 }
376
377 ioqueue_->eventCount++;
378}
379
380//
381// CActive's DoCancel()
382//
383void CIoqueueCallback::DoCancel()
384{
385 if (type_ == TYPE_READ)
386 sock_->Socket().CancelRecv();
387 else if (type_ == TYPE_ACCEPT)
388 sock_->Socket().CancelAccept();
389
390 type_ = TYPE_NONE;
391}
392
393//
394// Cancel operation and call callback.
395//
396void CIoqueueCallback::CancelOperation(pj_ioqueue_op_key_t *op_key,
397 pj_ssize_t bytes_status)
398{
399 Type cur_type = type_;
400
401 Cancel();
402
403 if (cur_type == TYPE_READ) {
404 if (cb_.on_read_complete)
405 cb_.on_read_complete(key_, op_key, bytes_status);
406 } else if (cur_type == TYPE_ACCEPT)
407 ;
408}
409
410
411/////////////////////////////////////////////////////////////////////////////
412/*
413 * IO Queue key structure.
414 */
415struct pj_ioqueue_key_t
416{
417 CIoqueueCallback *cbObj;
418};
419
420
421/*
422 * Return the name of the ioqueue implementation.
423 */
424PJ_DEF(const char*) pj_ioqueue_name(void)
425{
426 return "ioqueue-symbian";
427}
428
429
430/*
431 * Create a new I/O Queue framework.
432 */
433PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
434 pj_size_t max_fd,
435 pj_ioqueue_t **p_ioqueue)
436{
437 pj_ioqueue_t *ioq;
438
439 PJ_UNUSED_ARG(max_fd);
440
441 ioq = (pj_ioqueue_t*) pj_pool_zalloc(pool, sizeof(pj_ioqueue_t));
442 ioq->timeoutTimer = CPjTimeoutTimer::NewL();
443 *p_ioqueue = ioq;
444 return PJ_SUCCESS;
445}
446
447
448/*
449 * Destroy the I/O queue.
450 */
451PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioq )
452{
453 delete ioq->timeoutTimer;
454 ioq->timeoutTimer = NULL;
455
456 return PJ_SUCCESS;
457}
458
459
460/*
461 * Set the lock object to be used by the I/O Queue.
462 */
463PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioq,
464 pj_lock_t *lock,
465 pj_bool_t auto_delete )
466{
467 /* Don't really need lock for now */
468 PJ_UNUSED_ARG(ioq);
469
470 if (auto_delete) {
471 pj_lock_destroy(lock);
472 }
473
474 return PJ_SUCCESS;
475}
476
477
478/*
479 * Register a socket to the I/O queue framework.
480 */
481PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
482 pj_ioqueue_t *ioq,
483 pj_sock_t sock,
484 void *user_data,
485 const pj_ioqueue_callback *cb,
486 pj_ioqueue_key_t **p_key )
487{
488 pj_ioqueue_key_t *key;
489
490 key = (pj_ioqueue_key_t*) pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
491 key->cbObj = CIoqueueCallback::NewL(ioq, key, sock, cb, user_data);
492
493 *p_key = key;
494 return PJ_SUCCESS;
495}
496
497/*
498 * Unregister from the I/O Queue framework.
499 */
500PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
501{
502 if (key == NULL || key->cbObj == NULL)
503 return PJ_SUCCESS;
504
505 // Cancel pending async object
506 if (key->cbObj && key->cbObj->IsActive()) {
507 key->cbObj->Cancel();
508 }
509
510 // Close socket.
511 key->cbObj->get_pj_socket()->Socket().Close();
512 delete key->cbObj->get_pj_socket();
513
514 // Delete async object.
515 if (key->cbObj) {
516 delete key->cbObj;
517 key->cbObj = NULL;
518 }
519
520 return PJ_SUCCESS;
521}
522
523
524/*
525 * Get user data associated with an ioqueue key.
526 */
527PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
528{
529 return key->cbObj->get_user_data();
530}
531
532
533/*
534 * Set or change the user data to be associated with the file descriptor or
535 * handle or socket descriptor.
536 */
537PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
538 void *user_data,
539 void **old_data)
540{
541 if (old_data)
542 *old_data = key->cbObj->get_user_data();
543 key->cbObj->set_user_data(user_data);
544
545 return PJ_SUCCESS;
546}
547
548
549/*
550 * Initialize operation key.
551 */
552PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
553 pj_size_t size )
554{
555 pj_memset(op_key, 0, size);
556}
557
558
559/*
560 * Check if operation is pending on the specified operation key.
561 */
562PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
563 pj_ioqueue_op_key_t *op_key )
564{
565 return key->cbObj->get_op_key()==op_key &&
566 key->cbObj->IsActive();
567}
568
569
570/*
571 * Post completion status to the specified operation key and call the
572 * appropriate callback.
573 */
574PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
575 pj_ioqueue_op_key_t *op_key,
576 pj_ssize_t bytes_status )
577{
578 if (pj_ioqueue_is_pending(key, op_key)) {
579 key->cbObj->CancelOperation(op_key, bytes_status);
580 }
581 return PJ_SUCCESS;
582}
583
584
585#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
586/**
587 * Instruct I/O Queue to accept incoming connection on the specified
588 * listening socket.
589 */
590PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
591 pj_ioqueue_op_key_t *op_key,
592 pj_sock_t *new_sock,
593 pj_sockaddr_t *local,
594 pj_sockaddr_t *remote,
595 int *addrlen )
596{
597
598 return key->cbObj->StartAccept(op_key, new_sock, local, remote, addrlen);
599}
600
601
602/*
603 * Initiate non-blocking socket connect.
604 */
605PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
606 const pj_sockaddr_t *addr,
607 int addrlen )
608{
609 PJ_ASSERT_RETURN(addrlen == sizeof(pj_sockaddr_in), PJ_EINVAL);
610
611 RSocket &rSock = key->cbObj->get_pj_socket()->Socket();
612 TInetAddr inetAddr;
613 PjSymbianOS::pj2Addr(*(const pj_sockaddr_in*)addr, inetAddr);
614 TRequestStatus reqStatus;
615
616 // We don't support async connect for now.
617 PJ_TODO(IOQUEUE_SUPPORT_ASYNC_CONNECT);
618
619 rSock.Connect(inetAddr, reqStatus);
620 User::WaitForRequest(reqStatus);
621
622 if (reqStatus == KErrNone)
623 return PJ_SUCCESS;
624
625 return PJ_RETURN_OS_ERROR(reqStatus.Int());
626}
627
628
629#endif /* PJ_HAS_TCP */
630
631/*
632 * Poll the I/O Queue for completed events.
633 */
634PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioq,
635 const pj_time_val *timeout)
636{
637 CPjTimeoutTimer *timer;
638
639 if (timeout) {
640 //if (!ioq->timeoutTimer->IsActive())
641 if (0)
642 timer = ioq->timeoutTimer;
643 else
644 timer = CPjTimeoutTimer::NewL();
645
646 timer->StartTimer(timeout->sec*1000 + timeout->msec);
647
648 } else {
649 timer = NULL;
650 }
651
652 ioq->eventCount = 0;
653
654 do {
655 PjSymbianOS::Instance()->WaitForActiveObjects();
656 } while (ioq->eventCount == 0 && (!timer || (timer && !timer->HasTimedOut())));
657
658 if (timer && !timer->HasTimedOut())
659 timer->Cancel();
660
661 if (timer && timer != ioq->timeoutTimer)
662 delete timer;
663
664 return ioq->eventCount;
665}
666
667
668/*
669 * Instruct the I/O Queue to read from the specified handle.
670 */
671PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
672 pj_ioqueue_op_key_t *op_key,
673 void *buffer,
674 pj_ssize_t *length,
675 pj_uint32_t flags )
676{
677 // Clear flag
678 flags &= ~PJ_IOQUEUE_ALWAYS_ASYNC;
679 return key->cbObj->StartRead(op_key, buffer, length, flags, NULL, NULL);
680}
681
682
683/*
684 * This function behaves similarly as #pj_ioqueue_recv(), except that it is
685 * normally called for socket, and the remote address will also be returned
686 * along with the data.
687 */
688PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
689 pj_ioqueue_op_key_t *op_key,
690 void *buffer,
691 pj_ssize_t *length,
692 pj_uint32_t flags,
693 pj_sockaddr_t *addr,
694 int *addrlen)
695{
696 if (key->cbObj->IsActive())
697 return PJ_EBUSY;
698
699 // Clear flag
700 flags &= ~PJ_IOQUEUE_ALWAYS_ASYNC;
701 return key->cbObj->StartRead(op_key, buffer, length, flags, addr, addrlen);
702}
703
704
705/*
706 * Instruct the I/O Queue to write to the handle.
707 */
708PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
709 pj_ioqueue_op_key_t *op_key,
710 const void *data,
711 pj_ssize_t *length,
712 pj_uint32_t flags )
713{
714 TRequestStatus reqStatus;
715 TPtrC8 aBuffer((const TUint8*)data, (TInt)*length);
716 TSockXfrLength aLen;
717
718 PJ_UNUSED_ARG(op_key);
719
720 // Forcing pending operation is not supported.
721 PJ_ASSERT_RETURN((flags & PJ_IOQUEUE_ALWAYS_ASYNC)==0, PJ_EINVAL);
722
723 // Clear flag
724 flags &= ~PJ_IOQUEUE_ALWAYS_ASYNC;
725
726 key->cbObj->get_pj_socket()->Socket().Send(aBuffer, flags, reqStatus, aLen);
727 User::WaitForRequest(reqStatus);
728
729 if (reqStatus.Int() != KErrNone)
730 return PJ_RETURN_OS_ERROR(reqStatus.Int());
731
732 //At least in UIQ Emulator, aLen.Length() reports incorrect length
733 //for UDP (some newlc.com users seem to have reported this too).
734 //*length = aLen.Length();
735 return PJ_SUCCESS;
736}
737
738
739/*
740 * Instruct the I/O Queue to write to the handle.
741 */
742PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
743 pj_ioqueue_op_key_t *op_key,
744 const void *data,
745 pj_ssize_t *length,
746 pj_uint32_t flags,
747 const pj_sockaddr_t *addr,
748 int addrlen)
749{
750 TRequestStatus reqStatus;
751 TPtrC8 aBuffer;
752 TInetAddr inetAddr;
753 TSockXfrLength aLen;
754
755 PJ_UNUSED_ARG(op_key);
756
757 // Forcing pending operation is not supported.
758 PJ_ASSERT_RETURN((flags & PJ_IOQUEUE_ALWAYS_ASYNC)==0, PJ_EINVAL);
759
760 // Must be pj_sockaddr_in for now.
761 PJ_ASSERT_RETURN(addrlen == sizeof(pj_sockaddr_in), PJ_EINVAL);
762
763 // Clear flag
764 flags &= ~PJ_IOQUEUE_ALWAYS_ASYNC;
765
766 aBuffer.Set((const TUint8*)data, (TInt)*length);
767 PjSymbianOS::pj2Addr(*(const pj_sockaddr_in*)addr, inetAddr);
768 CPjSocket *pjSock = key->cbObj->get_pj_socket();
769
770 pjSock->Socket().SendTo(aBuffer, inetAddr, flags, reqStatus, aLen);
771 User::WaitForRequest(reqStatus);
772
773 if (reqStatus.Int() != KErrNone)
774 return PJ_RETURN_OS_ERROR(reqStatus.Int());
775
776 //At least in UIQ Emulator, aLen.Length() reports incorrect length
777 //for UDP (some newlc.com users seem to have reported this too).
778 //*length = aLen.Length();
779 return PJ_SUCCESS;
780}
781