blob: 7021a5a0d72bc9a166a53c951b5591ff710ebed2 [file] [log] [blame]
Benny Prijono4766ffe2005-11-01 17:56:59 +00001/* $Id$
Benny Prijono4766ffe2005-11-01 17:56:59 +00002 */
Benny Prijono85d3f452005-11-09 15:37:19 +00003#ifndef __PJPP_PROACTOR_H__
4#define __PJPP_PROACTOR_H__
Benny Prijono0a749f12005-10-31 21:02:30 +00005
6#include <pj/ioqueue.h>
7#include <pj++/pool.hpp>
8#include <pj++/sock.hpp>
9#include <pj++/timer.hpp>
Benny Prijono85d3f452005-11-09 15:37:19 +000010#include <pj/errno.h>
Benny Prijono0a749f12005-10-31 21:02:30 +000011
Benny Prijono85d3f452005-11-09 15:37:19 +000012class Pj_Proactor;
13class Pj_Event_Handler;
Benny Prijono0a749f12005-10-31 21:02:30 +000014
15
Benny Prijono85d3f452005-11-09 15:37:19 +000016//////////////////////////////////////////////////////////////////////////////
17// Asynchronous operation key.
18//
19// Applications may inheric this class to put their application
20// specific data.
21//
22class Pj_Async_Op : public pj_ioqueue_op_key_t
Benny Prijono0a749f12005-10-31 21:02:30 +000023{
Benny Prijono0a749f12005-10-31 21:02:30 +000024public:
Benny Prijono85d3f452005-11-09 15:37:19 +000025 //
26 // Constructor.
27 //
28 explicit Pj_Async_Op(Pj_Event_Handler *handler)
29 : handler_(handler)
30 {
31 pj_memset(this, 0, sizeof(pj_ioqueue_op_key_t));
32 }
Benny Prijono0a749f12005-10-31 21:02:30 +000033
Benny Prijono85d3f452005-11-09 15:37:19 +000034 //
35 // Check whether operation is still pending for this key.
36 //
37 bool is_pending();
Benny Prijono0a749f12005-10-31 21:02:30 +000038
Benny Prijono85d3f452005-11-09 15:37:19 +000039 //
40 // Cancel the operation.
41 //
42 bool cancel(pj_ssize_t bytes_status=-PJ_ECANCELLED);
43
44protected:
45 Pj_Event_Handler *handler_;
46};
47
48
49//////////////////////////////////////////////////////////////////////////////
50// Event handler.
51//
52// Applications should inherit this class to receive various event
53// notifications.
54//
55// Applications should implement get_socket_handle().
56//
57class Pj_Event_Handler : public Pj_Object
58{
59 friend class Pj_Proactor;
60public:
61 //
62 // Default constructor.
63 //
64 Pj_Event_Handler()
65 : key_(NULL)
66 {
67 pj_memset(&timer_, 0, sizeof(timer_));
68 timer_.user_data = this;
69 timer_.cb = &timer_callback;
70 }
71
72 //
73 // Destroy.
74 //
75 virtual ~Pj_Event_Handler()
76 {
77 unregister();
78 }
79
80 //
81 // Unregister this handler from the ioqueue.
82 //
83 void unregister()
84 {
85 if (key_) {
86 pj_ioqueue_unregister(key_);
87 key_ = NULL;
88 }
89 }
90
91 //
92 // Get socket handle associated with this.
93 //
94 virtual pj_sock_t get_socket_handle()
95 {
96 return PJ_INVALID_SOCKET;
97 }
98
99 //
100 // Receive data.
101 //
102 pj_status_t recv( Pj_Async_Op *op_key,
103 void *buf, pj_ssize_t *len,
104 unsigned flags)
105 {
106 return pj_ioqueue_recv( key_, op_key,
107 buf, len, flags);
108 }
109
110 //
111 // Recvfrom()
112 //
113 pj_status_t recvfrom( Pj_Async_Op *op_key,
114 void *buf, pj_ssize_t *len, unsigned flags,
115 Pj_Inet_Addr *addr)
116 {
117 addr->addrlen_ = sizeof(Pj_Inet_Addr);
118 return pj_ioqueue_recvfrom( key_, op_key, buf, len, flags,
119 addr, &addr->addrlen_ );
120 }
121
122 //
123 // send()
124 //
125 pj_status_t send( Pj_Async_Op *op_key,
126 const void *data, pj_ssize_t *len,
127 unsigned flags)
128 {
129 return pj_ioqueue_send( key_, op_key, data, len, flags);
130 }
131
132 //
133 // sendto()
134 //
135 pj_status_t sendto( Pj_Async_Op *op_key,
136 const void *data, pj_ssize_t *len, unsigned flags,
137 const Pj_Inet_Addr &addr)
138 {
139 return pj_ioqueue_sendto(key_, op_key, data, len, flags,
140 &addr, sizeof(addr));
141 }
142
Benny Prijono0a749f12005-10-31 21:02:30 +0000143#if PJ_HAS_TCP
Benny Prijono85d3f452005-11-09 15:37:19 +0000144 //
145 // connect()
146 //
147 pj_status_t connect(const Pj_Inet_Addr &addr)
148 {
149 return pj_ioqueue_connect(key_, &addr, sizeof(addr));
150 }
151
152 //
153 // accept.
154 //
155 pj_status_t accept( Pj_Async_Op *op_key,
156 Pj_Socket *sock,
157 Pj_Inet_Addr *local = NULL,
158 Pj_Inet_Addr *remote = NULL)
159 {
160 int *addrlen = local ? &local->addrlen_ : NULL;
161 return pj_ioqueue_accept( key_, op_key, &sock->sock_,
162 local, remote, addrlen );
163 }
164
Benny Prijono0a749f12005-10-31 21:02:30 +0000165#endif
166
167protected:
Benny Prijono85d3f452005-11-09 15:37:19 +0000168 //////////////////
Benny Prijono0a749f12005-10-31 21:02:30 +0000169 // Overridables
Benny Prijono85d3f452005-11-09 15:37:19 +0000170 //////////////////
171
Benny Prijono0a749f12005-10-31 21:02:30 +0000172 //
Benny Prijono85d3f452005-11-09 15:37:19 +0000173 // Timeout callback.
174 //
175 virtual void on_timeout(int data)
176 {
177 }
178
179 //
180 // On read complete callback.
181 //
182 virtual void on_read_complete( Pj_Async_Op *op_key,
183 pj_ssize_t bytes_read)
184 {
185 }
186
187 //
188 // On write complete callback.
189 //
190 virtual void on_write_complete( Pj_Async_Op *op_key,
191 pj_ssize_t bytes_sent)
192 {
193 }
194
Benny Prijono0a749f12005-10-31 21:02:30 +0000195#if PJ_HAS_TCP
Benny Prijono85d3f452005-11-09 15:37:19 +0000196 //
197 // On connect complete callback.
198 //
199 virtual void on_connect_complete(pj_status_t status)
200 {
201 }
202
203 //
204 // On new connection callback.
205 //
206 virtual void on_accept_complete( Pj_Async_Op *op_key,
207 pj_sock_t new_sock,
208 pj_status_t status)
209 {
210 }
211
Benny Prijono0a749f12005-10-31 21:02:30 +0000212#endif
213
Benny Prijono85d3f452005-11-09 15:37:19 +0000214
Benny Prijono0a749f12005-10-31 21:02:30 +0000215private:
Benny Prijono0a749f12005-10-31 21:02:30 +0000216 pj_ioqueue_key_t *key_;
217 pj_timer_entry timer_;
Benny Prijono0a749f12005-10-31 21:02:30 +0000218
Benny Prijono85d3f452005-11-09 15:37:19 +0000219 friend class Pj_Proactor;
220 friend class Pj_Async_Op;
221
222 //
223 // Static timer callback.
224 //
225 static void timer_callback( pj_timer_heap_t *timer_heap,
226 struct pj_timer_entry *entry)
227 {
228 Pj_Event_Handler *handler =
229 (Pj_Event_Handler*) entry->user_data;
230
231 handler->on_timeout(entry->id);
232 }
Benny Prijono0a749f12005-10-31 21:02:30 +0000233};
234
Benny Prijono85d3f452005-11-09 15:37:19 +0000235inline bool Pj_Async_Op::is_pending()
236{
237 return pj_ioqueue_is_pending(handler_->key_, this) != 0;
238}
239
240inline bool Pj_Async_Op::cancel(pj_ssize_t bytes_status)
241{
242 return pj_ioqueue_post_completion(handler_->key_, this,
243 bytes_status) == PJ_SUCCESS;
244}
245
246//////////////////////////////////////////////////////////////////////////////
247// Proactor
248//
249class Pj_Proactor : public Pj_Object
Benny Prijono0a749f12005-10-31 21:02:30 +0000250{
251public:
Benny Prijono85d3f452005-11-09 15:37:19 +0000252 //
253 // Default constructor, initializes to NULL.
254 //
255 Pj_Proactor()
256 : ioq_(NULL), th_(NULL)
257 {
258 cb_.on_read_complete = &read_complete_cb;
259 cb_.on_write_complete = &write_complete_cb;
260 cb_.on_accept_complete = &accept_complete_cb;
261 cb_.on_connect_complete = &connect_complete_cb;
262 }
Benny Prijono0a749f12005-10-31 21:02:30 +0000263
Benny Prijono85d3f452005-11-09 15:37:19 +0000264 //
265 // Construct proactor.
266 //
267 Pj_Proactor( Pj_Pool *pool, pj_size_t max_fd,
268 pj_size_t max_timer_entries )
269 : ioq_(NULL), th_(NULL)
270 {
271 cb_.on_read_complete = &read_complete_cb;
272 cb_.on_write_complete = &write_complete_cb;
273 cb_.on_accept_complete = &accept_complete_cb;
274 cb_.on_connect_complete = &connect_complete_cb;
275 }
Benny Prijono0a749f12005-10-31 21:02:30 +0000276
Benny Prijono85d3f452005-11-09 15:37:19 +0000277 //
278 // Destructor.
279 //
280 ~Pj_Proactor()
281 {
282 destroy();
283 }
Benny Prijono0a749f12005-10-31 21:02:30 +0000284
Benny Prijono85d3f452005-11-09 15:37:19 +0000285 //
286 // Create proactor.
287 //
288 pj_status_t create( Pj_Pool *pool, pj_size_t max_fd,
289 pj_size_t timer_entry_count)
290 {
291 pj_status_t status;
Benny Prijono0a749f12005-10-31 21:02:30 +0000292
Benny Prijono85d3f452005-11-09 15:37:19 +0000293 destroy();
Benny Prijono0a749f12005-10-31 21:02:30 +0000294
Benny Prijono85d3f452005-11-09 15:37:19 +0000295 status = pj_ioqueue_create(pool->pool_(), max_fd, &ioq_);
296 if (status != PJ_SUCCESS)
297 return status;
298
299 status = pj_timer_heap_create(pool->pool_(),
300 timer_entry_count, &th_);
301 if (status != PJ_SUCCESS) {
302 pj_ioqueue_destroy(ioq_);
303 ioq_ = NULL;
304 return NULL;
305 }
306
307 status;
308 }
309
310 //
311 // Destroy proactor.
312 //
313 void destroy()
314 {
315 if (ioq_) {
316 pj_ioqueue_destroy(ioq_);
317 ioq_ = NULL;
318 }
319 if (th_) {
320 pj_timer_heap_destroy(th_);
321 th_ = NULL;
322 }
323 }
324
325 //
326 // Register handler.
327 // This will call handler->get_socket_handle()
328 //
329 pj_status_t register_socket_handler(Pj_Pool *pool,
330 Pj_Event_Handler *handler)
331 {
332 return pj_ioqueue_register_sock( pool->pool_(), ioq_,
333 handler->get_socket_handle(),
334 handler, &cb_, &handler->key_ );
335 }
336
337 //
338 // Unregister handler.
339 //
340 static void unregister_handler(Pj_Event_Handler *handler)
341 {
342 if (handler->key_) {
343 pj_ioqueue_unregister( handler->key_ );
344 handler->key_ = NULL;
345 }
346 }
347
348 //
349 // Scheduler timer.
350 //
351 bool schedule_timer( Pj_Event_Handler *handler,
352 const Pj_Time_Val &delay,
353 int id=-1)
354 {
355 return schedule_timer(th_, handler, delay, id);
356 }
357
358 //
359 // Cancel timer.
360 //
361 bool cancel_timer(Pj_Event_Handler *handler)
362 {
363 return pj_timer_heap_cancel(th_, &handler->timer_) == 1;
364 }
365
366 //
367 // Handle events.
368 //
369 int handle_events(Pj_Time_Val *max_timeout)
370 {
371 Pj_Time_Val timeout(0, 0);
372 int timer_count;
373
374 timer_count = pj_timer_heap_poll( th_, &timeout );
375
376 if (timeout.get_sec() < 0)
377 timeout.sec = PJ_MAXINT32;
378
379 /* If caller specifies maximum time to wait, then compare the value
380 * with the timeout to wait from timer, and use the minimum value.
381 */
382 if (max_timeout && timeout >= *max_timeout) {
383 timeout = *max_timeout;
384 }
385
386 /* Poll events in ioqueue. */
387 int ioqueue_count;
388
389 ioqueue_count = pj_ioqueue_poll(ioq_, &timeout);
390 if (ioqueue_count < 0)
391 return ioqueue_count;
392
393 return ioqueue_count + timer_count;
394 }
395
396 //
397 // Get the internal ioqueue object.
398 //
399 pj_ioqueue_t *get_io_queue()
400 {
401 return ioq_;
402 }
403
404 //
405 // Get the internal timer heap object.
406 //
407 pj_timer_heap_t *get_timer_heap()
408 {
409 return th_;
410 }
Benny Prijono0a749f12005-10-31 21:02:30 +0000411
412private:
413 pj_ioqueue_t *ioq_;
414 pj_timer_heap_t *th_;
Benny Prijono85d3f452005-11-09 15:37:19 +0000415 pj_ioqueue_callback cb_;
Benny Prijono0a749f12005-10-31 21:02:30 +0000416
Benny Prijono85d3f452005-11-09 15:37:19 +0000417 static bool schedule_timer( pj_timer_heap_t *timer,
418 Pj_Event_Handler *handler,
419 const Pj_Time_Val &delay,
420 int id=-1)
421 {
422 handler->timer_.id = id;
423 return pj_timer_heap_schedule(timer, &handler->timer_, &delay) == 0;
424 }
425
426
427 //
428 // Static read completion callback.
429 //
430 static void read_complete_cb( pj_ioqueue_key_t *key,
431 pj_ioqueue_op_key_t *op_key,
432 pj_ssize_t bytes_read)
433 {
434 Pj_Event_Handler *handler =
435 (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);
436
437 handler->on_read_complete((Pj_Async_Op*)op_key, bytes_read);
438 }
439
440 //
441 // Static write completion callback.
442 //
443 static void write_complete_cb(pj_ioqueue_key_t *key,
444 pj_ioqueue_op_key_t *op_key,
445 pj_ssize_t bytes_sent)
446 {
447 Pj_Event_Handler *handler =
448 (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);
449
450 handler->on_write_complete((Pj_Async_Op*)op_key, bytes_sent);
451 }
452
453 //
454 // Static accept completion callback.
455 //
456 static void accept_complete_cb(pj_ioqueue_key_t *key,
457 pj_ioqueue_op_key_t *op_key,
458 pj_sock_t new_sock,
459 pj_status_t status)
460 {
461 Pj_Event_Handler *handler =
462 (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);
463
464 handler->on_accept_complete((Pj_Async_Op*)op_key, new_sock, status);
465 }
466
467 //
468 // Static connect completion callback.
469 //
470 static void connect_complete_cb(pj_ioqueue_key_t *key,
471 pj_status_t status)
472 {
473 Pj_Event_Handler *handler =
474 (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);
475
476 handler->on_connect_complete(status);
477 }
478
Benny Prijono0a749f12005-10-31 21:02:30 +0000479};
480
Benny Prijono85d3f452005-11-09 15:37:19 +0000481#endif /* __PJPP_PROACTOR_H__ */