blob: 14ec28780506e6c0cb53cb9e7e37c99dac3fdae9 [file] [log] [blame]
Tristan Matthews0a329cc2013-07-17 13:20:14 -04001/* $Id$ */
2/*
3 * Copyright (C) 2008-2009 Teluu Inc. (http://www.teluu.com)
4 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 */
20#ifndef __PJPP_PROACTOR_HPP__
21#define __PJPP_PROACTOR_HPP__
22
23#include <pj/ioqueue.h>
24#include <pj++/pool.hpp>
25#include <pj++/sock.hpp>
26#include <pj++/timer.hpp>
27#include <pj/errno.h>
28
29class Pj_Proactor;
30class Pj_Event_Handler;
31
32
33//////////////////////////////////////////////////////////////////////////////
34// Asynchronous operation key.
35//
36// Applications may inherit this class to put their application
37// specific data.
38//
39class Pj_Async_Op : public pj_ioqueue_op_key_t
40{
41public:
42 //
43 // Construct with null handler.
44 // App must call set_handler() before use.
45 //
46 Pj_Async_Op()
47 : handler_(NULL)
48 {
49 pj_ioqueue_op_key_init(this, sizeof(*this));
50 }
51
52 //
53 // Constructor.
54 //
55 explicit Pj_Async_Op(Pj_Event_Handler *handler)
56 : handler_(handler)
57 {
58 pj_ioqueue_op_key_init(this, sizeof(*this));
59 }
60
61 //
62 // Set handler.
63 //
64 void set_handler(Pj_Event_Handler *handler)
65 {
66 handler_ = handler;
67 }
68
69 //
70 // Check whether operation is still pending for this key.
71 //
72 bool is_pending();
73
74 //
75 // Cancel the operation.
76 //
77 bool cancel(pj_ssize_t bytes_status=-PJ_ECANCELLED);
78
79protected:
80 Pj_Event_Handler *handler_;
81};
82
83
84//////////////////////////////////////////////////////////////////////////////
85// Event handler.
86//
87// Applications should inherit this class to receive various event
88// notifications.
89//
90// Applications should implement get_socket_handle().
91//
92class Pj_Event_Handler : public Pj_Object
93{
94 friend class Pj_Proactor;
95public:
96 //
97 // Default constructor.
98 //
99 Pj_Event_Handler()
100 : key_(NULL)
101 {
102 pj_memset(&timer_, 0, sizeof(timer_));
103 timer_.user_data = this;
104 timer_.cb = &timer_callback;
105 }
106
107 //
108 // Destroy.
109 //
110 virtual ~Pj_Event_Handler()
111 {
112 unregister();
113 }
114
115 //
116 // Unregister this handler from the ioqueue.
117 //
118 void unregister()
119 {
120 if (key_) {
121 pj_ioqueue_unregister(key_);
122 key_ = NULL;
123 }
124 }
125
126 //
127 // Get socket handle associated with this.
128 //
129 virtual pj_sock_t get_socket_handle()
130 {
131 return PJ_INVALID_SOCKET;
132 }
133
134 //
135 // Start async receive.
136 //
137 pj_status_t recv( Pj_Async_Op *op_key,
138 void *buf, pj_ssize_t *len,
139 unsigned flags)
140 {
141 return pj_ioqueue_recv( key_, op_key,
142 buf, len, flags);
143 }
144
145 //
146 // Start async recvfrom()
147 //
148 pj_status_t recvfrom( Pj_Async_Op *op_key,
149 void *buf, pj_ssize_t *len, unsigned flags,
150 Pj_Inet_Addr *addr)
151 {
152 addr->addrlen_ = sizeof(Pj_Inet_Addr);
153 return pj_ioqueue_recvfrom( key_, op_key, buf, len, flags,
154 addr, &addr->addrlen_ );
155 }
156
157 //
158 // Start async send()
159 //
160 pj_status_t send( Pj_Async_Op *op_key,
161 const void *data, pj_ssize_t *len,
162 unsigned flags)
163 {
164 return pj_ioqueue_send( key_, op_key, data, len, flags);
165 }
166
167 //
168 // Start async sendto()
169 //
170 pj_status_t sendto( Pj_Async_Op *op_key,
171 const void *data, pj_ssize_t *len, unsigned flags,
172 const Pj_Inet_Addr &addr)
173 {
174 return pj_ioqueue_sendto(key_, op_key, data, len, flags,
175 &addr, sizeof(addr));
176 }
177
178#if PJ_HAS_TCP
179 //
180 // Start async connect()
181 //
182 pj_status_t connect(const Pj_Inet_Addr &addr)
183 {
184 return pj_ioqueue_connect(key_, &addr, sizeof(addr));
185 }
186
187 //
188 // Start async accept().
189 //
190 pj_status_t accept( Pj_Async_Op *op_key,
191 Pj_Socket *sock,
192 Pj_Inet_Addr *local = NULL,
193 Pj_Inet_Addr *remote = NULL)
194 {
195 int *addrlen = local ? &local->addrlen_ : NULL;
196 return pj_ioqueue_accept( key_, op_key, &sock->sock_,
197 local, remote, addrlen );
198 }
199
200#endif
201
202protected:
203 //////////////////
204 // Overridables
205 //////////////////
206
207 //
208 // Timeout callback.
209 //
210 virtual void on_timeout(int)
211 {
212 }
213
214 //
215 // On read complete callback.
216 //
217 virtual void on_read_complete( Pj_Async_Op*, pj_ssize_t)
218 {
219 }
220
221 //
222 // On write complete callback.
223 //
224 virtual void on_write_complete( Pj_Async_Op *, pj_ssize_t)
225 {
226 }
227
228#if PJ_HAS_TCP
229 //
230 // On connect complete callback.
231 //
232 virtual void on_connect_complete(pj_status_t)
233 {
234 }
235
236 //
237 // On new connection callback.
238 //
239 virtual void on_accept_complete( Pj_Async_Op*, pj_sock_t, pj_status_t)
240 {
241 }
242
243#endif
244
245
246private:
247 pj_ioqueue_key_t *key_;
248 pj_timer_entry timer_;
249
250 friend class Pj_Proactor;
251 friend class Pj_Async_Op;
252
253 //
254 // Static timer callback.
255 //
256 static void timer_callback( pj_timer_heap_t*,
257 struct pj_timer_entry *entry)
258 {
259 Pj_Event_Handler *handler =
260 (Pj_Event_Handler*) entry->user_data;
261
262 handler->on_timeout(entry->id);
263 }
264};
265
266inline bool Pj_Async_Op::is_pending()
267{
268 return pj_ioqueue_is_pending(handler_->key_, this) != 0;
269}
270
271inline bool Pj_Async_Op::cancel(pj_ssize_t bytes_status)
272{
273 return pj_ioqueue_post_completion(handler_->key_, this,
274 bytes_status) == PJ_SUCCESS;
275}
276
277//////////////////////////////////////////////////////////////////////////////
278// Proactor
279//
280class Pj_Proactor : public Pj_Object
281{
282public:
283 //
284 // Default constructor, initializes to NULL.
285 //
286 Pj_Proactor()
287 : ioq_(NULL), th_(NULL)
288 {
289 cb_.on_read_complete = &read_complete_cb;
290 cb_.on_write_complete = &write_complete_cb;
291 cb_.on_accept_complete = &accept_complete_cb;
292 cb_.on_connect_complete = &connect_complete_cb;
293 }
294
295 //
296 // Construct proactor.
297 //
298 Pj_Proactor( Pj_Pool *pool, pj_size_t max_fd,
299 pj_size_t max_timer_entries )
300 : ioq_(NULL), th_(NULL)
301 {
302 cb_.on_read_complete = &read_complete_cb;
303 cb_.on_write_complete = &write_complete_cb;
304 cb_.on_accept_complete = &accept_complete_cb;
305 cb_.on_connect_complete = &connect_complete_cb;
306
307 create(pool, max_fd, max_timer_entries);
308 }
309
310 //
311 // Destructor.
312 //
313 ~Pj_Proactor()
314 {
315 destroy();
316 }
317
318 //
319 // Create proactor.
320 //
321 pj_status_t create( Pj_Pool *pool, pj_size_t max_fd,
322 pj_size_t timer_entry_count)
323 {
324 pj_status_t status;
325
326 destroy();
327
328 status = pj_ioqueue_create(pool->pool_(), max_fd, &ioq_);
329 if (status != PJ_SUCCESS)
330 return status;
331
332 status = pj_timer_heap_create(pool->pool_(),
333 timer_entry_count, &th_);
334 if (status != PJ_SUCCESS) {
335 pj_ioqueue_destroy(ioq_);
336 ioq_ = NULL;
337 return NULL;
338 }
339
340 return status;
341 }
342
343 //
344 // Destroy proactor.
345 //
346 void destroy()
347 {
348 if (ioq_) {
349 pj_ioqueue_destroy(ioq_);
350 ioq_ = NULL;
351 }
352 if (th_) {
353 pj_timer_heap_destroy(th_);
354 th_ = NULL;
355 }
356 }
357
358 //
359 // Register handler.
360 // This will call handler->get_socket_handle()
361 //
362 pj_status_t register_socket_handler(Pj_Pool *pool,
363 Pj_Event_Handler *handler)
364 {
365 return pj_ioqueue_register_sock( pool->pool_(), ioq_,
366 handler->get_socket_handle(),
367 handler, &cb_, &handler->key_ );
368 }
369
370 //
371 // Unregister handler.
372 //
373 static void unregister_handler(Pj_Event_Handler *handler)
374 {
375 if (handler->key_) {
376 pj_ioqueue_unregister( handler->key_ );
377 handler->key_ = NULL;
378 }
379 }
380
381 //
382 // Scheduler timer.
383 //
384 bool schedule_timer( Pj_Event_Handler *handler,
385 const Pj_Time_Val &delay,
386 int id=-1)
387 {
388 return schedule_timer(th_, handler, delay, id);
389 }
390
391 //
392 // Cancel timer.
393 //
394 bool cancel_timer(Pj_Event_Handler *handler)
395 {
396 return pj_timer_heap_cancel(th_, &handler->timer_) == 1;
397 }
398
399 //
400 // Handle events.
401 //
402 int handle_events(Pj_Time_Val *max_timeout)
403 {
404 Pj_Time_Val timeout(0, 0);
405 int timer_count;
406
407 timer_count = pj_timer_heap_poll( th_, &timeout );
408
409 if (timeout.get_sec() < 0)
410 timeout.sec = PJ_MAXINT32;
411
412 /* If caller specifies maximum time to wait, then compare the value
413 * with the timeout to wait from timer, and use the minimum value.
414 */
415 if (max_timeout && timeout >= *max_timeout) {
416 timeout = *max_timeout;
417 }
418
419 /* Poll events in ioqueue. */
420 int ioqueue_count;
421
422 ioqueue_count = pj_ioqueue_poll(ioq_, &timeout);
423 if (ioqueue_count < 0)
424 return ioqueue_count;
425
426 return ioqueue_count + timer_count;
427 }
428
429 //
430 // Get the internal ioqueue object.
431 //
432 pj_ioqueue_t *get_io_queue()
433 {
434 return ioq_;
435 }
436
437 //
438 // Get the internal timer heap object.
439 //
440 pj_timer_heap_t *get_timer_heap()
441 {
442 return th_;
443 }
444
445private:
446 pj_ioqueue_t *ioq_;
447 pj_timer_heap_t *th_;
448 pj_ioqueue_callback cb_;
449
450 static bool schedule_timer( pj_timer_heap_t *timer,
451 Pj_Event_Handler *handler,
452 const Pj_Time_Val &delay,
453 int id=-1)
454 {
455 handler->timer_.id = id;
456 return pj_timer_heap_schedule(timer, &handler->timer_, &delay) == 0;
457 }
458
459
460 //
461 // Static read completion callback.
462 //
463 static void read_complete_cb( pj_ioqueue_key_t *key,
464 pj_ioqueue_op_key_t *op_key,
465 pj_ssize_t bytes_read)
466 {
467 Pj_Event_Handler *handler =
468 (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);
469
470 handler->on_read_complete((Pj_Async_Op*)op_key, bytes_read);
471 }
472
473 //
474 // Static write completion callback.
475 //
476 static void write_complete_cb(pj_ioqueue_key_t *key,
477 pj_ioqueue_op_key_t *op_key,
478 pj_ssize_t bytes_sent)
479 {
480 Pj_Event_Handler *handler =
481 (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);
482
483 handler->on_write_complete((Pj_Async_Op*)op_key, bytes_sent);
484 }
485
486 //
487 // Static accept completion callback.
488 //
489 static void accept_complete_cb(pj_ioqueue_key_t *key,
490 pj_ioqueue_op_key_t *op_key,
491 pj_sock_t new_sock,
492 pj_status_t status)
493 {
494 Pj_Event_Handler *handler =
495 (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);
496
497 handler->on_accept_complete((Pj_Async_Op*)op_key, new_sock, status);
498 }
499
500 //
501 // Static connect completion callback.
502 //
503 static void connect_complete_cb(pj_ioqueue_key_t *key,
504 pj_status_t status)
505 {
506 Pj_Event_Handler *handler =
507 (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);
508
509 handler->on_connect_complete(status);
510 }
511
512};
513
514#endif /* __PJPP_PROACTOR_HPP__ */
515