/* $Id$ */ | |
/* | |
* Copyright (C)2003-2006 Benny Prijono <benny@prijono.org> | |
* | |
* This program is free software; you can redistribute it and/or modify | |
* it under the terms of the GNU General Public License as published by | |
* the Free Software Foundation; either version 2 of the License, or | |
* (at your option) any later version. | |
* | |
* This program is distributed in the hope that it will be useful, | |
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
* GNU General Public License for more details. | |
* | |
* You should have received a copy of the GNU General Public License | |
* along with this program; if not, write to the Free Software | |
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA | |
*/ | |
#ifndef __PJPP_PROACTOR_HPP__ | |
#define __PJPP_PROACTOR_HPP__ | |
#include <pj/ioqueue.h> | |
#include <pj++/pool.hpp> | |
#include <pj++/sock.hpp> | |
#include <pj++/timer.hpp> | |
#include <pj/errno.h> | |
class Pj_Proactor; | |
class Pj_Event_Handler; | |
////////////////////////////////////////////////////////////////////////////// | |
// Asynchronous operation key. | |
// | |
// Applications may inheric this class to put their application | |
// specific data. | |
// | |
class Pj_Async_Op : public pj_ioqueue_op_key_t | |
{ | |
public: | |
// | |
// Construct with null handler. | |
// App must call set_handler() before use. | |
// | |
Pj_Async_Op() | |
: handler_(NULL) | |
{ | |
pj_ioqueue_op_key_init(this, sizeof(*this)); | |
} | |
// | |
// Constructor. | |
// | |
explicit Pj_Async_Op(Pj_Event_Handler *handler) | |
: handler_(handler) | |
{ | |
pj_ioqueue_op_key_init(this, sizeof(*this)); | |
} | |
// | |
// Set handler. | |
// | |
void set_handler(Pj_Event_Handler *handler) | |
{ | |
handler_ = handler; | |
} | |
// | |
// Check whether operation is still pending for this key. | |
// | |
bool is_pending(); | |
// | |
// Cancel the operation. | |
// | |
bool cancel(pj_ssize_t bytes_status=-PJ_ECANCELLED); | |
protected: | |
Pj_Event_Handler *handler_; | |
}; | |
////////////////////////////////////////////////////////////////////////////// | |
// Event handler. | |
// | |
// Applications should inherit this class to receive various event | |
// notifications. | |
// | |
// Applications should implement get_socket_handle(). | |
// | |
class Pj_Event_Handler : public Pj_Object | |
{ | |
friend class Pj_Proactor; | |
public: | |
// | |
// Default constructor. | |
// | |
Pj_Event_Handler() | |
: key_(NULL) | |
{ | |
pj_memset(&timer_, 0, sizeof(timer_)); | |
timer_.user_data = this; | |
timer_.cb = &timer_callback; | |
} | |
// | |
// Destroy. | |
// | |
virtual ~Pj_Event_Handler() | |
{ | |
unregister(); | |
} | |
// | |
// Unregister this handler from the ioqueue. | |
// | |
void unregister() | |
{ | |
if (key_) { | |
pj_ioqueue_unregister(key_); | |
key_ = NULL; | |
} | |
} | |
// | |
// Get socket handle associated with this. | |
// | |
virtual pj_sock_t get_socket_handle() | |
{ | |
return PJ_INVALID_SOCKET; | |
} | |
// | |
// Start async receive. | |
// | |
pj_status_t recv( Pj_Async_Op *op_key, | |
void *buf, pj_ssize_t *len, | |
unsigned flags) | |
{ | |
return pj_ioqueue_recv( key_, op_key, | |
buf, len, flags); | |
} | |
// | |
// Start async recvfrom() | |
// | |
pj_status_t recvfrom( Pj_Async_Op *op_key, | |
void *buf, pj_ssize_t *len, unsigned flags, | |
Pj_Inet_Addr *addr) | |
{ | |
addr->addrlen_ = sizeof(Pj_Inet_Addr); | |
return pj_ioqueue_recvfrom( key_, op_key, buf, len, flags, | |
addr, &addr->addrlen_ ); | |
} | |
// | |
// Start async send() | |
// | |
pj_status_t send( Pj_Async_Op *op_key, | |
const void *data, pj_ssize_t *len, | |
unsigned flags) | |
{ | |
return pj_ioqueue_send( key_, op_key, data, len, flags); | |
} | |
// | |
// Start async sendto() | |
// | |
pj_status_t sendto( Pj_Async_Op *op_key, | |
const void *data, pj_ssize_t *len, unsigned flags, | |
const Pj_Inet_Addr &addr) | |
{ | |
return pj_ioqueue_sendto(key_, op_key, data, len, flags, | |
&addr, sizeof(addr)); | |
} | |
#if PJ_HAS_TCP | |
// | |
// Start async connect() | |
// | |
pj_status_t connect(const Pj_Inet_Addr &addr) | |
{ | |
return pj_ioqueue_connect(key_, &addr, sizeof(addr)); | |
} | |
// | |
// Start async accept(). | |
// | |
pj_status_t accept( Pj_Async_Op *op_key, | |
Pj_Socket *sock, | |
Pj_Inet_Addr *local = NULL, | |
Pj_Inet_Addr *remote = NULL) | |
{ | |
int *addrlen = local ? &local->addrlen_ : NULL; | |
return pj_ioqueue_accept( key_, op_key, &sock->sock_, | |
local, remote, addrlen ); | |
} | |
#endif | |
protected: | |
////////////////// | |
// Overridables | |
////////////////// | |
// | |
// Timeout callback. | |
// | |
virtual void on_timeout(int data) | |
{ | |
} | |
// | |
// On read complete callback. | |
// | |
virtual void on_read_complete( Pj_Async_Op *op_key, | |
pj_ssize_t bytes_read) | |
{ | |
} | |
// | |
// On write complete callback. | |
// | |
virtual void on_write_complete( Pj_Async_Op *op_key, | |
pj_ssize_t bytes_sent) | |
{ | |
} | |
#if PJ_HAS_TCP | |
// | |
// On connect complete callback. | |
// | |
virtual void on_connect_complete(pj_status_t status) | |
{ | |
} | |
// | |
// On new connection callback. | |
// | |
virtual void on_accept_complete( Pj_Async_Op *op_key, | |
pj_sock_t new_sock, | |
pj_status_t status) | |
{ | |
} | |
#endif | |
private: | |
pj_ioqueue_key_t *key_; | |
pj_timer_entry timer_; | |
friend class Pj_Proactor; | |
friend class Pj_Async_Op; | |
// | |
// Static timer callback. | |
// | |
static void timer_callback( pj_timer_heap_t *timer_heap, | |
struct pj_timer_entry *entry) | |
{ | |
Pj_Event_Handler *handler = | |
(Pj_Event_Handler*) entry->user_data; | |
handler->on_timeout(entry->id); | |
} | |
}; | |
inline bool Pj_Async_Op::is_pending() | |
{ | |
return pj_ioqueue_is_pending(handler_->key_, this) != 0; | |
} | |
inline bool Pj_Async_Op::cancel(pj_ssize_t bytes_status) | |
{ | |
return pj_ioqueue_post_completion(handler_->key_, this, | |
bytes_status) == PJ_SUCCESS; | |
} | |
////////////////////////////////////////////////////////////////////////////// | |
// Proactor | |
// | |
class Pj_Proactor : public Pj_Object | |
{ | |
public: | |
// | |
// Default constructor, initializes to NULL. | |
// | |
Pj_Proactor() | |
: ioq_(NULL), th_(NULL) | |
{ | |
cb_.on_read_complete = &read_complete_cb; | |
cb_.on_write_complete = &write_complete_cb; | |
cb_.on_accept_complete = &accept_complete_cb; | |
cb_.on_connect_complete = &connect_complete_cb; | |
} | |
// | |
// Construct proactor. | |
// | |
Pj_Proactor( Pj_Pool *pool, pj_size_t max_fd, | |
pj_size_t max_timer_entries ) | |
: ioq_(NULL), th_(NULL) | |
{ | |
cb_.on_read_complete = &read_complete_cb; | |
cb_.on_write_complete = &write_complete_cb; | |
cb_.on_accept_complete = &accept_complete_cb; | |
cb_.on_connect_complete = &connect_complete_cb; | |
create(pool, max_fd, max_timer_entries); | |
} | |
// | |
// Destructor. | |
// | |
~Pj_Proactor() | |
{ | |
destroy(); | |
} | |
// | |
// Create proactor. | |
// | |
pj_status_t create( Pj_Pool *pool, pj_size_t max_fd, | |
pj_size_t timer_entry_count) | |
{ | |
pj_status_t status; | |
destroy(); | |
status = pj_ioqueue_create(pool->pool_(), max_fd, &ioq_); | |
if (status != PJ_SUCCESS) | |
return status; | |
status = pj_timer_heap_create(pool->pool_(), | |
timer_entry_count, &th_); | |
if (status != PJ_SUCCESS) { | |
pj_ioqueue_destroy(ioq_); | |
ioq_ = NULL; | |
return NULL; | |
} | |
return status; | |
} | |
// | |
// Destroy proactor. | |
// | |
void destroy() | |
{ | |
if (ioq_) { | |
pj_ioqueue_destroy(ioq_); | |
ioq_ = NULL; | |
} | |
if (th_) { | |
pj_timer_heap_destroy(th_); | |
th_ = NULL; | |
} | |
} | |
// | |
// Register handler. | |
// This will call handler->get_socket_handle() | |
// | |
pj_status_t register_socket_handler(Pj_Pool *pool, | |
Pj_Event_Handler *handler) | |
{ | |
return pj_ioqueue_register_sock( pool->pool_(), ioq_, | |
handler->get_socket_handle(), | |
handler, &cb_, &handler->key_ ); | |
} | |
// | |
// Unregister handler. | |
// | |
static void unregister_handler(Pj_Event_Handler *handler) | |
{ | |
if (handler->key_) { | |
pj_ioqueue_unregister( handler->key_ ); | |
handler->key_ = NULL; | |
} | |
} | |
// | |
// Scheduler timer. | |
// | |
bool schedule_timer( Pj_Event_Handler *handler, | |
const Pj_Time_Val &delay, | |
int id=-1) | |
{ | |
return schedule_timer(th_, handler, delay, id); | |
} | |
// | |
// Cancel timer. | |
// | |
bool cancel_timer(Pj_Event_Handler *handler) | |
{ | |
return pj_timer_heap_cancel(th_, &handler->timer_) == 1; | |
} | |
// | |
// Handle events. | |
// | |
int handle_events(Pj_Time_Val *max_timeout) | |
{ | |
Pj_Time_Val timeout(0, 0); | |
int timer_count; | |
timer_count = pj_timer_heap_poll( th_, &timeout ); | |
if (timeout.get_sec() < 0) | |
timeout.sec = PJ_MAXINT32; | |
/* If caller specifies maximum time to wait, then compare the value | |
* with the timeout to wait from timer, and use the minimum value. | |
*/ | |
if (max_timeout && timeout >= *max_timeout) { | |
timeout = *max_timeout; | |
} | |
/* Poll events in ioqueue. */ | |
int ioqueue_count; | |
ioqueue_count = pj_ioqueue_poll(ioq_, &timeout); | |
if (ioqueue_count < 0) | |
return ioqueue_count; | |
return ioqueue_count + timer_count; | |
} | |
// | |
// Get the internal ioqueue object. | |
// | |
pj_ioqueue_t *get_io_queue() | |
{ | |
return ioq_; | |
} | |
// | |
// Get the internal timer heap object. | |
// | |
pj_timer_heap_t *get_timer_heap() | |
{ | |
return th_; | |
} | |
private: | |
pj_ioqueue_t *ioq_; | |
pj_timer_heap_t *th_; | |
pj_ioqueue_callback cb_; | |
static bool schedule_timer( pj_timer_heap_t *timer, | |
Pj_Event_Handler *handler, | |
const Pj_Time_Val &delay, | |
int id=-1) | |
{ | |
handler->timer_.id = id; | |
return pj_timer_heap_schedule(timer, &handler->timer_, &delay) == 0; | |
} | |
// | |
// Static read completion callback. | |
// | |
static void read_complete_cb( pj_ioqueue_key_t *key, | |
pj_ioqueue_op_key_t *op_key, | |
pj_ssize_t bytes_read) | |
{ | |
Pj_Event_Handler *handler = | |
(Pj_Event_Handler*) pj_ioqueue_get_user_data(key); | |
handler->on_read_complete((Pj_Async_Op*)op_key, bytes_read); | |
} | |
// | |
// Static write completion callback. | |
// | |
static void write_complete_cb(pj_ioqueue_key_t *key, | |
pj_ioqueue_op_key_t *op_key, | |
pj_ssize_t bytes_sent) | |
{ | |
Pj_Event_Handler *handler = | |
(Pj_Event_Handler*) pj_ioqueue_get_user_data(key); | |
handler->on_write_complete((Pj_Async_Op*)op_key, bytes_sent); | |
} | |
// | |
// Static accept completion callback. | |
// | |
static void accept_complete_cb(pj_ioqueue_key_t *key, | |
pj_ioqueue_op_key_t *op_key, | |
pj_sock_t new_sock, | |
pj_status_t status) | |
{ | |
Pj_Event_Handler *handler = | |
(Pj_Event_Handler*) pj_ioqueue_get_user_data(key); | |
handler->on_accept_complete((Pj_Async_Op*)op_key, new_sock, status); | |
} | |
// | |
// Static connect completion callback. | |
// | |
static void connect_complete_cb(pj_ioqueue_key_t *key, | |
pj_status_t status) | |
{ | |
Pj_Event_Handler *handler = | |
(Pj_Event_Handler*) pj_ioqueue_get_user_data(key); | |
handler->on_connect_complete(status); | |
} | |
}; | |
#endif /* __PJPP_PROACTOR_HPP__ */ | |