blob: 3f967b734e52d06a37caa9188e73486ddc0c5659 [file] [log] [blame]
Benny Prijono5dcb38d2005-11-21 01:55:47 +00001/* $Id$ */
2/*
3 * Copyright (C)2003-2006 Benny Prijono <benny@prijono.org>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 */
19#ifndef __PJPP_PROACTOR_HPP__
20#define __PJPP_PROACTOR_HPP__
21
22#include <pj/ioqueue.h>
23#include <pj++/pool.hpp>
24#include <pj++/sock.hpp>
25#include <pj++/timer.hpp>
26#include <pj/errno.h>
27
28class Pj_Proactor;
29class Pj_Event_Handler;
30
31
//////////////////////////////////////////////////////////////////////////////
// Asynchronous operation key.
//
// Applications may inherit from this class to attach their own
// application-specific data to an operation.
//
38class Pj_Async_Op : public pj_ioqueue_op_key_t
39{
40public:
41 //
42 // Construct with null handler.
43 // App must call set_handler() before use.
44 //
45 Pj_Async_Op()
46 : handler_(NULL)
47 {
48 pj_ioqueue_op_key_init(this, sizeof(*this));
49 }
50
51 //
52 // Constructor.
53 //
54 explicit Pj_Async_Op(Pj_Event_Handler *handler)
55 : handler_(handler)
56 {
57 pj_ioqueue_op_key_init(this, sizeof(*this));
58 }
59
60 //
61 // Set handler.
62 //
63 void set_handler(Pj_Event_Handler *handler)
64 {
65 handler_ = handler;
66 }
67
68 //
69 // Check whether operation is still pending for this key.
70 //
71 bool is_pending();
72
73 //
74 // Cancel the operation.
75 //
76 bool cancel(pj_ssize_t bytes_status=-PJ_ECANCELLED);
77
78protected:
79 Pj_Event_Handler *handler_;
80};
81
82
83//////////////////////////////////////////////////////////////////////////////
84// Event handler.
85//
86// Applications should inherit this class to receive various event
87// notifications.
88//
89// Applications should implement get_socket_handle().
90//
91class Pj_Event_Handler : public Pj_Object
92{
93 friend class Pj_Proactor;
94public:
95 //
96 // Default constructor.
97 //
98 Pj_Event_Handler()
99 : key_(NULL)
100 {
101 pj_memset(&timer_, 0, sizeof(timer_));
102 timer_.user_data = this;
103 timer_.cb = &timer_callback;
104 }
105
106 //
107 // Destroy.
108 //
109 virtual ~Pj_Event_Handler()
110 {
111 unregister();
112 }
113
114 //
115 // Unregister this handler from the ioqueue.
116 //
117 void unregister()
118 {
119 if (key_) {
120 pj_ioqueue_unregister(key_);
121 key_ = NULL;
122 }
123 }
124
125 //
126 // Get socket handle associated with this.
127 //
128 virtual pj_sock_t get_socket_handle()
129 {
130 return PJ_INVALID_SOCKET;
131 }
132
133 //
134 // Start async receive.
135 //
136 pj_status_t recv( Pj_Async_Op *op_key,
137 void *buf, pj_ssize_t *len,
138 unsigned flags)
139 {
140 return pj_ioqueue_recv( key_, op_key,
141 buf, len, flags);
142 }
143
144 //
145 // Start async recvfrom()
146 //
147 pj_status_t recvfrom( Pj_Async_Op *op_key,
148 void *buf, pj_ssize_t *len, unsigned flags,
149 Pj_Inet_Addr *addr)
150 {
151 addr->addrlen_ = sizeof(Pj_Inet_Addr);
152 return pj_ioqueue_recvfrom( key_, op_key, buf, len, flags,
153 addr, &addr->addrlen_ );
154 }
155
156 //
157 // Start async send()
158 //
159 pj_status_t send( Pj_Async_Op *op_key,
160 const void *data, pj_ssize_t *len,
161 unsigned flags)
162 {
163 return pj_ioqueue_send( key_, op_key, data, len, flags);
164 }
165
166 //
167 // Start async sendto()
168 //
169 pj_status_t sendto( Pj_Async_Op *op_key,
170 const void *data, pj_ssize_t *len, unsigned flags,
171 const Pj_Inet_Addr &addr)
172 {
173 return pj_ioqueue_sendto(key_, op_key, data, len, flags,
174 &addr, sizeof(addr));
175 }
176
177#if PJ_HAS_TCP
178 //
179 // Start async connect()
180 //
181 pj_status_t connect(const Pj_Inet_Addr &addr)
182 {
183 return pj_ioqueue_connect(key_, &addr, sizeof(addr));
184 }
185
186 //
187 // Start async accept().
188 //
189 pj_status_t accept( Pj_Async_Op *op_key,
190 Pj_Socket *sock,
191 Pj_Inet_Addr *local = NULL,
192 Pj_Inet_Addr *remote = NULL)
193 {
194 int *addrlen = local ? &local->addrlen_ : NULL;
195 return pj_ioqueue_accept( key_, op_key, &sock->sock_,
196 local, remote, addrlen );
197 }
198
199#endif
200
201protected:
202 //////////////////
203 // Overridables
204 //////////////////
205
206 //
207 // Timeout callback.
208 //
Benny Prijonoac9d1422006-01-18 23:32:27 +0000209 virtual void on_timeout(int)
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000210 {
211 }
212
213 //
214 // On read complete callback.
215 //
Benny Prijonoac9d1422006-01-18 23:32:27 +0000216 virtual void on_read_complete( Pj_Async_Op*, pj_ssize_t)
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000217 {
218 }
219
220 //
221 // On write complete callback.
222 //
Benny Prijonoac9d1422006-01-18 23:32:27 +0000223 virtual void on_write_complete( Pj_Async_Op *, pj_ssize_t)
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000224 {
225 }
226
227#if PJ_HAS_TCP
228 //
229 // On connect complete callback.
230 //
Benny Prijonoac9d1422006-01-18 23:32:27 +0000231 virtual void on_connect_complete(pj_status_t)
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000232 {
233 }
234
235 //
236 // On new connection callback.
237 //
Benny Prijonoac9d1422006-01-18 23:32:27 +0000238 virtual void on_accept_complete( Pj_Async_Op*, pj_sock_t, pj_status_t)
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000239 {
240 }
241
242#endif
243
244
245private:
246 pj_ioqueue_key_t *key_;
247 pj_timer_entry timer_;
248
249 friend class Pj_Proactor;
250 friend class Pj_Async_Op;
251
252 //
253 // Static timer callback.
254 //
Benny Prijonoac9d1422006-01-18 23:32:27 +0000255 static void timer_callback( pj_timer_heap_t*,
Benny Prijono5dcb38d2005-11-21 01:55:47 +0000256 struct pj_timer_entry *entry)
257 {
258 Pj_Event_Handler *handler =
259 (Pj_Event_Handler*) entry->user_data;
260
261 handler->on_timeout(entry->id);
262 }
263};
264
265inline bool Pj_Async_Op::is_pending()
266{
267 return pj_ioqueue_is_pending(handler_->key_, this) != 0;
268}
269
270inline bool Pj_Async_Op::cancel(pj_ssize_t bytes_status)
271{
272 return pj_ioqueue_post_completion(handler_->key_, this,
273 bytes_status) == PJ_SUCCESS;
274}
275
276//////////////////////////////////////////////////////////////////////////////
277// Proactor
278//
279class Pj_Proactor : public Pj_Object
280{
281public:
282 //
283 // Default constructor, initializes to NULL.
284 //
285 Pj_Proactor()
286 : ioq_(NULL), th_(NULL)
287 {
288 cb_.on_read_complete = &read_complete_cb;
289 cb_.on_write_complete = &write_complete_cb;
290 cb_.on_accept_complete = &accept_complete_cb;
291 cb_.on_connect_complete = &connect_complete_cb;
292 }
293
294 //
295 // Construct proactor.
296 //
297 Pj_Proactor( Pj_Pool *pool, pj_size_t max_fd,
298 pj_size_t max_timer_entries )
299 : ioq_(NULL), th_(NULL)
300 {
301 cb_.on_read_complete = &read_complete_cb;
302 cb_.on_write_complete = &write_complete_cb;
303 cb_.on_accept_complete = &accept_complete_cb;
304 cb_.on_connect_complete = &connect_complete_cb;
305
306 create(pool, max_fd, max_timer_entries);
307 }
308
309 //
310 // Destructor.
311 //
312 ~Pj_Proactor()
313 {
314 destroy();
315 }
316
317 //
318 // Create proactor.
319 //
320 pj_status_t create( Pj_Pool *pool, pj_size_t max_fd,
321 pj_size_t timer_entry_count)
322 {
323 pj_status_t status;
324
325 destroy();
326
327 status = pj_ioqueue_create(pool->pool_(), max_fd, &ioq_);
328 if (status != PJ_SUCCESS)
329 return status;
330
331 status = pj_timer_heap_create(pool->pool_(),
332 timer_entry_count, &th_);
333 if (status != PJ_SUCCESS) {
334 pj_ioqueue_destroy(ioq_);
335 ioq_ = NULL;
336 return NULL;
337 }
338
339 return status;
340 }
341
342 //
343 // Destroy proactor.
344 //
345 void destroy()
346 {
347 if (ioq_) {
348 pj_ioqueue_destroy(ioq_);
349 ioq_ = NULL;
350 }
351 if (th_) {
352 pj_timer_heap_destroy(th_);
353 th_ = NULL;
354 }
355 }
356
357 //
358 // Register handler.
359 // This will call handler->get_socket_handle()
360 //
361 pj_status_t register_socket_handler(Pj_Pool *pool,
362 Pj_Event_Handler *handler)
363 {
364 return pj_ioqueue_register_sock( pool->pool_(), ioq_,
365 handler->get_socket_handle(),
366 handler, &cb_, &handler->key_ );
367 }
368
369 //
370 // Unregister handler.
371 //
372 static void unregister_handler(Pj_Event_Handler *handler)
373 {
374 if (handler->key_) {
375 pj_ioqueue_unregister( handler->key_ );
376 handler->key_ = NULL;
377 }
378 }
379
380 //
381 // Scheduler timer.
382 //
383 bool schedule_timer( Pj_Event_Handler *handler,
384 const Pj_Time_Val &delay,
385 int id=-1)
386 {
387 return schedule_timer(th_, handler, delay, id);
388 }
389
390 //
391 // Cancel timer.
392 //
393 bool cancel_timer(Pj_Event_Handler *handler)
394 {
395 return pj_timer_heap_cancel(th_, &handler->timer_) == 1;
396 }
397
398 //
399 // Handle events.
400 //
401 int handle_events(Pj_Time_Val *max_timeout)
402 {
403 Pj_Time_Val timeout(0, 0);
404 int timer_count;
405
406 timer_count = pj_timer_heap_poll( th_, &timeout );
407
408 if (timeout.get_sec() < 0)
409 timeout.sec = PJ_MAXINT32;
410
411 /* If caller specifies maximum time to wait, then compare the value
412 * with the timeout to wait from timer, and use the minimum value.
413 */
414 if (max_timeout && timeout >= *max_timeout) {
415 timeout = *max_timeout;
416 }
417
418 /* Poll events in ioqueue. */
419 int ioqueue_count;
420
421 ioqueue_count = pj_ioqueue_poll(ioq_, &timeout);
422 if (ioqueue_count < 0)
423 return ioqueue_count;
424
425 return ioqueue_count + timer_count;
426 }
427
428 //
429 // Get the internal ioqueue object.
430 //
431 pj_ioqueue_t *get_io_queue()
432 {
433 return ioq_;
434 }
435
436 //
437 // Get the internal timer heap object.
438 //
439 pj_timer_heap_t *get_timer_heap()
440 {
441 return th_;
442 }
443
444private:
445 pj_ioqueue_t *ioq_;
446 pj_timer_heap_t *th_;
447 pj_ioqueue_callback cb_;
448
449 static bool schedule_timer( pj_timer_heap_t *timer,
450 Pj_Event_Handler *handler,
451 const Pj_Time_Val &delay,
452 int id=-1)
453 {
454 handler->timer_.id = id;
455 return pj_timer_heap_schedule(timer, &handler->timer_, &delay) == 0;
456 }
457
458
459 //
460 // Static read completion callback.
461 //
462 static void read_complete_cb( pj_ioqueue_key_t *key,
463 pj_ioqueue_op_key_t *op_key,
464 pj_ssize_t bytes_read)
465 {
466 Pj_Event_Handler *handler =
467 (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);
468
469 handler->on_read_complete((Pj_Async_Op*)op_key, bytes_read);
470 }
471
472 //
473 // Static write completion callback.
474 //
475 static void write_complete_cb(pj_ioqueue_key_t *key,
476 pj_ioqueue_op_key_t *op_key,
477 pj_ssize_t bytes_sent)
478 {
479 Pj_Event_Handler *handler =
480 (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);
481
482 handler->on_write_complete((Pj_Async_Op*)op_key, bytes_sent);
483 }
484
485 //
486 // Static accept completion callback.
487 //
488 static void accept_complete_cb(pj_ioqueue_key_t *key,
489 pj_ioqueue_op_key_t *op_key,
490 pj_sock_t new_sock,
491 pj_status_t status)
492 {
493 Pj_Event_Handler *handler =
494 (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);
495
496 handler->on_accept_complete((Pj_Async_Op*)op_key, new_sock, status);
497 }
498
499 //
500 // Static connect completion callback.
501 //
502 static void connect_complete_cb(pj_ioqueue_key_t *key,
503 pj_status_t status)
504 {
505 Pj_Event_Handler *handler =
506 (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);
507
508 handler->on_connect_complete(status);
509 }
510
511};
512
513#endif /* __PJPP_PROACTOR_HPP__ */
514