Rework pjlib++

git-svn-id: https://svn.pjsip.org/repos/pjproject/main@36 74dad513-b988-da41-8d7b-12977e46ad98
diff --git a/pjlib/src/pj++/compiletest.cpp b/pjlib/src/pj++/compiletest.cpp
deleted file mode 100644
index 5bc4f8b..0000000
--- a/pjlib/src/pj++/compiletest.cpp
+++ /dev/null
@@ -1,46 +0,0 @@
-/* $Id$
- *
- */

-#include <pjlib++.hpp>

-

-

-#if 0

-struct MyNode

-{

-    PJ_DECL_LIST_MEMBER(struct MyNode)

-    int data;

-};

-

-int test()

-{

-    typedef PJ_List<MyNode> MyList;

-    MyList list;

-    MyList::iterator it, end = list.end();

-

-    for (it=list.begin(); it!=end; ++it) {

-	MyNode *n = *it;

-    }

-

-    return 0;

-}

-

-int test_scan()

-{

-    PJ_Scanner scan;

-    PJ_String s;

-    PJ_CharSpec cs;

-

-    scan.get(&cs, &s);

-    return 0;

-}

-

-int test_scan_c()

-{

-    pj_scanner scan;

-    pj_str_t s;

-    pj_char_spec cs;

-

-    pj_scan_get(&scan, cs, &s);

-    return 0;

-}

-#endif

diff --git a/pjlib/src/pj++/pj++.cpp b/pjlib/src/pj++/pj++.cpp
deleted file mode 100644
index 4544209..0000000
--- a/pjlib/src/pj++/pj++.cpp
+++ /dev/null
@@ -1,17 +0,0 @@
-/* $Id$
- *
- */

-#include <pj++/scanner.hpp>

-#include <pj++/timer.hpp>

-#include <pj/except.h>

-

-void PJ_Scanner::syntax_error_handler_throw_pj(pj_scanner *)

-{

-    PJ_THROW( PJ_Scanner::SYNTAX_ERROR );

-}

-

-void PJ_Timer_Entry::timer_heap_callback(pj_timer_heap_t *, pj_timer_entry *e)

-{

-    PJ_Timer_Entry *entry = static_cast<PJ_Timer_Entry*>(e);

-    entry->on_timeout();

-}

diff --git a/pjlib/src/pj++/proactor.cpp b/pjlib/src/pj++/proactor.cpp
deleted file mode 100644
index dba9370..0000000
--- a/pjlib/src/pj++/proactor.cpp
+++ /dev/null
@@ -1,298 +0,0 @@
-/* $Id$
- *
- */

-#include <pj++/proactor.hpp>

-#include <pj/string.h>	// memset

-

-static struct pj_ioqueue_callback ioqueue_cb =

-{

-    &PJ_Event_Handler::read_complete_cb,

-    &PJ_Event_Handler::write_complete_cb,

-    &PJ_Event_Handler::accept_complete_cb,

-    &PJ_Event_Handler::connect_complete_cb,

-};

-

-PJ_Event_Handler::PJ_Event_Handler()

-: proactor_(NULL), key_(NULL)

-{

-    pj_memset(&timer_, 0, sizeof(timer_));

-    timer_.user_data = this;

-    timer_.cb = &timer_callback;

-}

-

-PJ_Event_Handler::~PJ_Event_Handler()

-{

-}

-

-#if PJ_HAS_TCP

-bool PJ_Event_Handler::connect(const PJ_INET_Addr &addr)

-{

-    pj_assert(key_ != NULL && proactor_ != NULL);

-

-    if (key_ == NULL || proactor_ == NULL)

-	return false;

-

-    int status = pj_ioqueue_connect(proactor_->get_io_queue(), key_, 

-				    &addr, sizeof(PJ_INET_Addr));

-    if (status == 0) {

-	on_connect_complete(0);

-	return true;

-    } else if (status == PJ_IOQUEUE_PENDING) {

-	return true;

-    } else {

-	return false;

-    }

-}

-

-bool PJ_Event_Handler::accept(PJ_Socket *sock, PJ_INET_Addr *local, PJ_INET_Addr *remote)

-{

-    pj_assert(key_ != NULL && proactor_ != NULL);

-

-    if (key_ == NULL || proactor_ == NULL)

-	return false;

-

-    int status = pj_ioqueue_accept(proactor_->get_io_queue(), key_, 

-				   &sock->get_handle(), 

-				   local_addr, remote, 

-				   (remote? sizeof(*remote) : 0));

-    if (status == 0) {

-	on_accept_complete(0);

-	return true;

-    } else if (status == PJ_IOQUEUE_PENDING) {

-	return true;

-    } else {

-	return false;

-    }

-}

-

-#endif

-

-bool PJ_Event_Handler::read(void *buf, pj_size_t len)

-{

-    pj_assert(key_ != NULL && proactor_ != NULL);

-

-    if (key_ == NULL || proactor_ == NULL)

-	return false;

-

-    int bytes_status = pj_ioqueue_read(proactor_->get_io_queue(), 

-				       key_, buf, len);

-    if (bytes_status >= 0) {

-	on_read_complete(bytes_status);

-	return true;

-    } else if (bytes_status == PJ_IOQUEUE_PENDING) {

-	return true;

-    } else {

-	return false;

-    }

-}

-

-bool PJ_Event_Handler::recvfrom(void *buf, pj_size_t len, PJ_INET_Addr *addr)

-{

-    pj_assert(key_ != NULL && proactor_ != NULL);

-

-    if (key_ == NULL || proactor_ == NULL)

-	return false;

-

-

-    tmp_recvfrom_addr_len = sizeof(PJ_INET_Addr);

-

-    int bytes_status = pj_ioqueue_recvfrom(proactor_->get_io_queue(), 

-					   key_, buf, len,

-					   addr,

-					   (addr? &tmp_recvfrom_addr_len : NULL));

-    if (bytes_status >= 0) {

-	on_read_complete(bytes_status);

-	return true;

-    } else if (bytes_status == PJ_IOQUEUE_PENDING) {

-	return true;

-    } else {

-	return false;

-    }

-}

-

-bool PJ_Event_Handler::write(const void *data, pj_size_t len)

-{

-    pj_assert(key_ != NULL && proactor_ != NULL);

-

-    if (key_ == NULL || proactor_ == NULL)

-	return false;

-

-    int bytes_status = pj_ioqueue_write(proactor_->get_io_queue(), 

-					key_, data, len);

-    if (bytes_status >= 0) {

-	on_write_complete(bytes_status);

-	return true;

-    } else if (bytes_status == PJ_IOQUEUE_PENDING) {

-	return true;

-    } else {

-	return false;

-    }

-}

-

-bool PJ_Event_Handler::sendto(const void *data, pj_size_t len, const PJ_INET_Addr &addr)

-{

-    pj_assert(key_ != NULL && proactor_ != NULL);

-

-    if (key_ == NULL || proactor_ == NULL)

-	return false;

-

-    int bytes_status = pj_ioqueue_sendto(proactor_->get_io_queue(), 

-					 key_, data, len, 

-					 &addr, sizeof(PJ_INET_Addr));

-    if (bytes_status >= 0) {

-	on_write_complete(bytes_status);

-	return true;

-    } else if (bytes_status == PJ_IOQUEUE_PENDING) {

-	return true;

-    } else {

-	return false;

-    }

-}

-

-

-void PJ_Event_Handler::read_complete_cb(pj_ioqueue_key_t *key, pj_ssize_t bytes_read)

-{

-    PJ_Event_Handler *handler = 

-	(PJ_Event_Handler*) pj_ioqueue_get_user_data(key);

-

-    handler->on_read_complete(bytes_read);

-}

-

-void PJ_Event_Handler::write_complete_cb(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent)

-{

-    PJ_Event_Handler *handler = 

-	(PJ_Event_Handler*) pj_ioqueue_get_user_data(key);

-

-    handler->on_write_complete(bytes_sent);

-}

-

-void PJ_Event_Handler::accept_complete_cb(pj_ioqueue_key_t *key, int status)

-{

-#if PJ_HAS_TCP

-    PJ_Event_Handler *handler = 

-	(PJ_Event_Handler*) pj_ioqueue_get_user_data(key);

-

-    handler->on_accept_complete(status);

-#endif

-}

-

-void PJ_Event_Handler::connect_complete_cb(pj_ioqueue_key_t *key, int status)

-{

-#if PJ_HAS_TCP

-    PJ_Event_Handler *handler = 

-	(PJ_Event_Handler*) pj_ioqueue_get_user_data(key);

-

-    handler->on_connect_complete(status);

-#endif

-}

-

-void PJ_Event_Handler::timer_callback( pj_timer_heap_t *timer_heap,

-				       struct pj_timer_entry *entry)

-{

-    PJ_Event_Handler *handler = (PJ_Event_Handler*) entry->user_data;

-    handler->on_timeout(entry->id);

-}

-

-

-PJ_Proactor *PJ_Proactor::create(PJ_Pool *pool, pj_size_t max_fd, 

-				 pj_size_t timer_entry_count, unsigned timer_flags)

-{

-    PJ_Proactor *p = (PJ_Proactor*) pool->calloc(1, sizeof(PJ_Proactor));

-    if (!p) return NULL;

-

-    p->ioq_ = pj_ioqueue_create(pool->pool_(), max_fd);

-    if (!p->ioq_) return NULL;

-

-    p->th_ = pj_timer_heap_create(pool->pool_(), timer_entry_count, timer_flags);

-    if (!p->th_) return NULL;

-

-    return p;

-}

-

-void PJ_Proactor::destroy()

-{

-    pj_ioqueue_destroy(ioq_);

-}

-

-bool PJ_Proactor::register_handler(PJ_Pool *pool, PJ_Event_Handler *handler)

-{

-    pj_assert(handler->key_ == NULL && handler->proactor_ == NULL);

-

-    if (handler->key_ != NULL) 

-	return false;

-

-    handler->key_ = pj_ioqueue_register_sock(pool->pool_(), ioq_, 

-                                             handler->get_handle(), 

-					     handler, &ioqueue_cb);

-    if (handler->key_ != NULL) {

-	handler->proactor_ = this;

-	return true;

-    } else {

-	return false;

-    }

-}

-

-void PJ_Proactor::unregister_handler(PJ_Event_Handler *handler)

-{

-    if (handler->key_ == NULL) return;

-    pj_ioqueue_unregister(ioq_, handler->key_);

-    handler->key_ = NULL;

-    handler->proactor_ = NULL;

-}

-

-bool PJ_Proactor::schedule_timer( pj_timer_heap_t *timer, PJ_Event_Handler *handler,

-				  const PJ_Time_Val &delay, int id)

-{

-    handler->timer_.id = id;

-    return pj_timer_heap_schedule(timer, &handler->timer_, &delay) == 0;

-}

-

-bool PJ_Proactor::schedule_timer(PJ_Event_Handler *handler, const PJ_Time_Val &delay, 

-				 int id)

-{

-    return schedule_timer(th_, handler, delay, id);

-}

-

-bool PJ_Proactor::cancel_timer(PJ_Event_Handler *handler)

-{

-    return pj_timer_heap_cancel(th_, &handler->timer_) == 1;

-}

-

-bool PJ_Proactor::handle_events(PJ_Time_Val *max_timeout)

-{

-    pj_time_val timeout;

-

-    timeout.sec = timeout.msec = 0; /* timeout is 'out' var. */

-

-    if (pj_timer_heap_poll( th_, &timeout ) > 0)

-	return true;

-

-    if (timeout.sec < 0) timeout.sec = PJ_MAXINT32;

-

-    /* If caller specifies maximum time to wait, then compare the value with

-     * the timeout to wait from timer, and use the minimum value.

-     */

-    if (max_timeout && PJ_TIME_VAL_GT(timeout, *max_timeout)) {

-	timeout = *max_timeout;

-    }

-

-    /* Poll events in ioqueue. */

-    int result;

-

-    result = pj_ioqueue_poll(ioq_, &timeout);

-    if (result != 1)

-	return false;

-

-    return true;

-}

-

-pj_ioqueue_t *PJ_Proactor::get_io_queue()

-{

-    return ioq_;

-}

-

-pj_timer_heap_t *PJ_Proactor::get_timer_heap()

-{

-    return th_;

-}

-

diff --git a/pjlib/src/pj/errno.c b/pjlib/src/pj/errno.c
index 682d9b6..25bf5b7 100644
--- a/pjlib/src/pj/errno.c
+++ b/pjlib/src/pj/errno.c
@@ -29,7 +29,8 @@
     { PJ_ETOOMANY,      "Too many objects of the specified type"},
     { PJ_EBUSY,         "Object is busy"},
     { PJ_ENOTSUP,	"Option/operation is not supported"},
-    { PJ_EINVALIDOP,	"Invalid operation"}
+    { PJ_EINVALIDOP,	"Invalid operation"},

+    { PJ_ECANCELLED,    "Operation cancelled"}
 };
 
 /*
diff --git a/pjlib/src/pj/file_io_ansi.c b/pjlib/src/pj/file_io_ansi.c
index f95c74a..0946edd 100644
--- a/pjlib/src/pj/file_io_ansi.c
+++ b/pjlib/src/pj/file_io_ansi.c
@@ -66,7 +66,8 @@
 
     clearerr((FILE*)fd);
     written = fwrite(data, 1, *size, (FILE*)fd);
-    if (ferror((FILE*)fd)) {
+    if (ferror((FILE*)fd)) {

+        *size = -1;
         return PJ_RETURN_OS_ERROR(errno);
     }
 
@@ -82,7 +83,8 @@
 
     clearerr((FILE*)fd);
     bytes = fread(data, 1, *size, (FILE*)fd);
-    if (ferror((FILE*)fd)) {
+    if (ferror((FILE*)fd)) {

+        *size = -1;
         return PJ_RETURN_OS_ERROR(errno);
     }
 
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c
index 4cffcae..75774ed 100644
--- a/pjlib/src/pj/ioqueue_common_abs.c
+++ b/pjlib/src/pj/ioqueue_common_abs.c
@@ -227,7 +227,9 @@
          * so that send() can work in parallel.
          */
         if (h->fd_type == PJ_SOCK_DGRAM) {
-            pj_list_erase(write_op);
+            pj_list_erase(write_op);

+            write_op->op = 0;

+
             if (pj_list_empty(&h->write_list))
                 ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
 
@@ -267,7 +269,8 @@
         {
             if (h->fd_type != PJ_SOCK_DGRAM) {
                 /* Write completion of the whole stream. */
-                pj_list_erase(write_op);
+                pj_list_erase(write_op);

+                write_op->op = 0;
 
                 /* Clear operation if there's no more data to send. */
                 if (pj_list_empty(&h->write_list))
@@ -313,7 +316,8 @@
 	
         /* Get one accept operation from the list. */
 	accept_op = h->accept_list.next;
-        pj_list_erase(accept_op);
+        pj_list_erase(accept_op);

+        accept_op->op = 0;
 
 	/* Clear bit in fdset if there is no more pending accept */
         if (pj_list_empty(&h->accept_list))
@@ -346,7 +350,8 @@
 
         /* Get one pending read operation from the list. */
         read_op = h->read_list.next;
-        pj_list_erase(read_op);
+        pj_list_erase(read_op);

+        read_op->op = 0;
 
         /* Clear fdset if there is no pending read. */
         if (pj_list_empty(&h->read_list))
@@ -475,6 +480,9 @@
 
     PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
     PJ_CHECK_STACK();
+

+    read_op = (struct read_operation*)op_key;

+    read_op->op = 0;

 
     /* Try to see if there's data immediately available. 
      */
@@ -496,8 +504,6 @@
      * No data is immediately available.
      * Must schedule asynchronous operation to the ioqueue.
      */
-    read_op = (struct read_operation*)op_key;
-
     read_op->op = PJ_IOQUEUE_OP_RECV;
     read_op->buf = buffer;
     read_op->size = *length;
@@ -530,6 +536,9 @@
 
     PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
     PJ_CHECK_STACK();
+

+    read_op = (struct read_operation*)op_key;

+    read_op->op = 0;

 
     /* Try to see if there's data immediately available. 
      */
@@ -552,8 +561,6 @@
      * No data is immediately available.
      * Must schedule asynchronous operation to the ioqueue.
      */
-    read_op = (struct read_operation*)op_key;
-
     read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
     read_op->buf = buffer;
     read_op->size = *length;
@@ -586,6 +593,9 @@
 
     PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
     PJ_CHECK_STACK();
+

+    write_op = (struct write_operation*)op_key;

+    write_op->op = 0;

 
     /* Fast track:
      *   Try to send data immediately, only if there's no pending write!
@@ -624,7 +634,6 @@
     /*
      * Schedule asynchronous send.
      */
-    write_op = (struct write_operation*)op_key;
     write_op->op = PJ_IOQUEUE_OP_SEND;
     write_op->buf = (void*)data;
     write_op->size = *length;
@@ -659,6 +668,9 @@
 
     PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
     PJ_CHECK_STACK();
+

+    write_op = (struct write_operation*)op_key;

+    write_op->op = 0;

 
     /* Fast track:
      *   Try to send data immediately, only if there's no pending write!
@@ -702,7 +714,6 @@
     /*
      * Schedule asynchronous send.
      */
-    write_op = (struct write_operation*)op_key;
     write_op->op = PJ_IOQUEUE_OP_SEND_TO;
     write_op->buf = (void*)data;
     write_op->size = *length;
@@ -735,6 +746,9 @@
 
     /* check parameters. All must be specified! */
     PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
+

+    accept_op = (struct accept_operation*)op_key;

+    accept_op->op = 0;

 
     /* Fast track:
      *  See if there's new connection available immediately.
@@ -767,8 +781,6 @@
      * Schedule accept() operation to be completed when there is incoming
      * connection available.
      */
-    accept_op = (struct accept_operation*)op_key;
-
     accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
     accept_op->accept_fd = new_sock;
     accept_op->rmt_addr = remote;
@@ -821,3 +833,82 @@
 }
 #endif	/* PJ_HAS_TCP */
 
+/*

+ * pj_ioqueue_is_pending()

+ */

+PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,

+                                         pj_ioqueue_op_key_t *op_key )

+{

+    struct generic_operation *op_rec;

+

+    PJ_UNUSED_ARG(key);

+

+    op_rec = (struct generic_operation*)op_key;

+    return op_rec->op != 0;

+}

+

+

+/*

+ * pj_ioqueue_post_completion()

+ */

+PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,

+                                                pj_ioqueue_op_key_t *op_key,

+                                                pj_ssize_t bytes_status )

+{

+    struct generic_operation *op_rec;

+

+    /*

+     * Find the operation key in all pending operation lists to

+     * really make sure that it's still there; then call the callback.

+     */

+    pj_mutex_lock(key->mutex);

+

+    /* Find the operation in the pending read list. */

+    op_rec = (struct generic_operation*)key->read_list.next;

+    while (op_rec != (void*)&key->read_list) {

+        if (op_rec == (void*)op_key) {

+            pj_list_erase(op_rec);

+            op_rec->op = 0;

+            pj_mutex_unlock(key->mutex);

+

+            (*key->cb.on_read_complete)(key, op_key, bytes_status);

+            return PJ_SUCCESS;

+        }

+        op_rec = op_rec->next;

+    }

+

+    /* Find the operation in the pending write list. */

+    op_rec = (struct generic_operation*)key->write_list.next;

+    while (op_rec != (void*)&key->write_list) {

+        if (op_rec == (void*)op_key) {

+            pj_list_erase(op_rec);

+            op_rec->op = 0;

+            pj_mutex_unlock(key->mutex);

+

+            (*key->cb.on_write_complete)(key, op_key, bytes_status);

+            return PJ_SUCCESS;

+        }

+        op_rec = op_rec->next;

+    }

+

+    /* Find the operation in the pending accept list. */

+    op_rec = (struct generic_operation*)key->accept_list.next;

+    while (op_rec != (void*)&key->accept_list) {

+        if (op_rec == (void*)op_key) {

+            pj_list_erase(op_rec);

+            op_rec->op = 0;

+            pj_mutex_unlock(key->mutex);

+

+            (*key->cb.on_accept_complete)(key, op_key, 

+                                          PJ_INVALID_SOCKET,

+                                          bytes_status);

+            return PJ_SUCCESS;

+        }

+        op_rec = op_rec->next;

+    }

+

+    pj_mutex_unlock(key->mutex);

+    

+    return PJ_EINVALIDOP;

+}

+

diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c
index 7944953..93cbb6d 100644
--- a/pjlib/src/pj/ioqueue_winnt.c
+++ b/pjlib/src/pj/ioqueue_winnt.c
@@ -891,3 +891,37 @@
 }
 #endif	/* #if PJ_HAS_TCP */
 
+

+

+PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,

+                                         pj_ioqueue_op_key_t *op_key )

+{

+    BOOL rc;

+    DWORD bytesTransfered;

+

+    rc = GetOverlappedResult( key->hnd, (LPOVERLAPPED)op_key,

+                              &bytesTransfered, FALSE );

+

+    if (rc == FALSE) {

+        return GetLastError()==ERROR_IO_INCOMPLETE;

+    }

+

+    return FALSE;

+}

+

+

+PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,

+                                                pj_ioqueue_op_key_t *op_key,

+                                                pj_ssize_t bytes_status )

+{

+    BOOL rc;

+

+    rc = PostQueuedCompletionStatus(key->ioqueue->iocp, bytes_status,

+                                    (long)key, (OVERLAPPED*)op_key );

+    if (rc == FALSE) {

+        return PJ_RETURN_OS_ERROR(GetLastError());

+    }

+

+    return PJ_SUCCESS;

+}

+

diff --git a/pjlib/src/pj/os_core_unix.c b/pjlib/src/pj/os_core_unix.c
index 3892b64..cc57aab 100644
--- a/pjlib/src/pj/os_core_unix.c
+++ b/pjlib/src/pj/os_core_unix.c
@@ -558,54 +558,91 @@
     return oldval;
 }
 
+/*

+ * pj_atomic_inc_and_get()

+ */

+PJ_DEF(pj_atomic_value_t) pj_atomic_inc_and_get(pj_atomic_t *atomic_var)

+{

+    pj_atomic_value_t new_value;

+

+    PJ_CHECK_STACK();

+

+#if PJ_HAS_THREADS

+    pj_mutex_lock( atomic_var->mutex );

+#endif

+    new_value = ++atomic_var->value;

+#if PJ_HAS_THREADS

+    pj_mutex_unlock( atomic_var->mutex);

+#endif

+

+    return new_value;

+}

 /*
  * pj_atomic_inc()
  */
 PJ_DEF(void) pj_atomic_inc(pj_atomic_t *atomic_var)
 {
-    PJ_CHECK_STACK();
-
-#if PJ_HAS_THREADS
-    pj_mutex_lock( atomic_var->mutex );
-#endif
-    ++atomic_var->value;
-#if PJ_HAS_THREADS
-    pj_mutex_unlock( atomic_var->mutex);
-#endif
+    pj_atomic_inc_and_get(atomic_var);
 }
 
 /*
- * pj_atomic_dec()
+ * pj_atomic_dec_and_get()
  */
-PJ_DEF(void) pj_atomic_dec(pj_atomic_t *atomic_var)
-{
+PJ_DEF(pj_atomic_value_t) pj_atomic_dec_and_get(pj_atomic_t *atomic_var)
+{

+    pj_atomic_value_t new_value;

+
     PJ_CHECK_STACK();
 
 #if PJ_HAS_THREADS
     pj_mutex_lock( atomic_var->mutex );
 #endif
-    --atomic_var->value;
+    new_value = --atomic_var->value;
 #if PJ_HAS_THREADS
     pj_mutex_unlock( atomic_var->mutex);
-#endif
+#endif

+

+    return new_value;
 }
+

+/*

+ * pj_atomic_dec()

+ */

+PJ_DEF(void) pj_atomic_dec(pj_atomic_t *atomic_var)

+{

+    pj_atomic_dec_and_get(atomic_var);

+}

 
 /*
- * pj_atomic_add()
+ * pj_atomic_add_and_get()
  */ 
-PJ_DEF(void) pj_atomic_add( pj_atomic_t *atomic_var, pj_atomic_value_t value )
-{
+PJ_DEF(pj_atomic_value_t) pj_atomic_add_and_get( pj_atomic_t *atomic_var, 

+                                                 pj_atomic_value_t value )
+{

+    pj_atomic_value_t new_value;

+
 #if PJ_HAS_THREADS
     pj_mutex_lock(atomic_var->mutex);
 #endif
     
-    atomic_var->value += value;
+    atomic_var->value += value;

+    new_value = atomic_var->value;
 
 #if PJ_HAS_THREADS
     pj_mutex_unlock(atomic_var->mutex);
-#endif
+#endif

+

+    return new_value;
 }
 
+/*

+ * pj_atomic_add()

+ */ 

+PJ_DEF(void) pj_atomic_add( pj_atomic_t *atomic_var, 

+                            pj_atomic_value_t value )

+{

+    pj_atomic_add_and_get(atomic_var, value);

+}

 
 ///////////////////////////////////////////////////////////////////////////////
 /*
diff --git a/pjlib/src/pj/os_core_win32.c b/pjlib/src/pj/os_core_win32.c
index 68416c6..be770d5 100644
--- a/pjlib/src/pj/os_core_win32.c
+++ b/pjlib/src/pj/os_core_win32.c
@@ -512,32 +512,48 @@
     return atomic_var->value;
 }
 
+/*

+ * pj_atomic_inc_and_get()

+ */

+PJ_DEF(pj_atomic_value_t) pj_atomic_inc_and_get(pj_atomic_t *atomic_var)

+{

+    PJ_CHECK_STACK();

+

+#if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400

+    return InterlockedIncrement(&atomic_var->value);

+#else

+#   error Fix Me

+#endif

+}

+

 /*
  * pj_atomic_inc()
  */
 PJ_DEF(void) pj_atomic_inc(pj_atomic_t *atomic_var)
-{
-    PJ_CHECK_STACK();
-
-#if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400
-    InterlockedIncrement(&atomic_var->value);
-#else
-#   error Fix Me
-#endif
+{

+    pj_atomic_inc_and_get(atomic_var);
 }
-
+

+/*

+ * pj_atomic_dec_and_get()

+ */

+PJ_DEF(pj_atomic_value_t) pj_atomic_dec_and_get(pj_atomic_t *atomic_var)

+{

+    PJ_CHECK_STACK();

+

+#if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400

+    return InterlockedDecrement(&atomic_var->value);

+#else

+#   error Fix me

+#endif

+}

+

 /*
  * pj_atomic_dec()
  */
 PJ_DEF(void) pj_atomic_dec(pj_atomic_t *atomic_var)
 {
-    PJ_CHECK_STACK();
-
-#if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400
-    InterlockedDecrement(&atomic_var->value);
-#else
-#   error Fix me
-#endif
+    pj_atomic_dec_and_get(atomic_var);
 }
 
 /*
@@ -546,10 +562,27 @@
 PJ_DEF(void) pj_atomic_add( pj_atomic_t *atomic_var,
 			    pj_atomic_value_t value )
 {
-    InterlockedExchangeAdd( &atomic_var->value, value );
+#if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400

+    InterlockedExchangeAdd( &atomic_var->value, value );

+#else

+#   error Fix me

+#endif
+}
+

+/*

+ * pj_atomic_add_and_get()

+ */

+PJ_DEF(pj_atomic_value_t) pj_atomic_add_and_get( pj_atomic_t *atomic_var,

+			                         pj_atomic_value_t value)

+{

+#if defined(PJ_WIN32_WINNT) && PJ_WIN32_WINNT >= 0x0400

+    long oldValue = InterlockedExchangeAdd( &atomic_var->value, value);

+    return oldValue + value;

+#else

+#   error Fix me

+#endif

 }
 
-	
 ///////////////////////////////////////////////////////////////////////////////
 /*
  * pj_thread_local_alloc()
diff --git a/pjlib/src/pj/timer.c b/pjlib/src/pj/timer.c
index 9fa190a..ffec1f4 100644
--- a/pjlib/src/pj/timer.c
+++ b/pjlib/src/pj/timer.c
@@ -14,9 +14,13 @@
 #include <pj/string.h>
 #include <pj/assert.h>
 #include <pj/errno.h>
+#include <pj/lock.h>

 
 #define HEAP_PARENT(X)	(X == 0 ? 0 : (((X) - 1) / 2))
 #define HEAP_LEFT(X)	(((X)+(X))+1)
+

+

+#define DEFAULT_MAX_TIMED_OUT_PER_POLL  (64)

 
 
 /**
@@ -32,9 +36,15 @@
 
     /** Current size of the heap. */
     pj_size_t cur_size;
+

+    /** Max timed out entries to process per poll. */

+    unsigned max_entries_per_poll;

 
-    /** Mutex for synchronization, or NULL */
-    pj_mutex_t *mutex;
+    /** Lock object. */
+    pj_lock_t *lock;
+

+    /** Autodelete lock. */

+    pj_bool_t auto_delete_lock;

 
     /**
      * Current contents of the Heap, which is organized as a "heap" of
@@ -71,15 +81,15 @@
 
 PJ_INLINE(void) lock_timer_heap( pj_timer_heap_t *ht )
 {
-    if (ht->mutex) {
-	pj_mutex_lock(ht->mutex);
+    if (ht->lock) {
+	pj_lock_acquire(ht->lock);
     }
 }
 
 PJ_INLINE(void) unlock_timer_heap( pj_timer_heap_t *ht )
 {
-    if (ht->mutex) {
-	pj_mutex_unlock(ht->mutex);
+    if (ht->lock) {
+	pj_lock_release(ht->lock);
     }
 }
 
@@ -319,7 +329,7 @@
            sizeof(pj_timer_heap_t) + 
            /* size of each entry: */
            (count+2) * (sizeof(pj_timer_entry*)+sizeof(pj_timer_id_t)) +
-           /* mutex, pool etc: */
+           /* lock, pool etc: */
            132;
 }
 
@@ -328,7 +338,6 @@
  */
 PJ_DEF(pj_status_t) pj_timer_heap_create( pj_pool_t *pool,
 					  pj_size_t size,
-					  unsigned flag,
                                           pj_timer_heap_t **p_heap)
 {
     pj_timer_heap_t *ht;
@@ -348,23 +357,14 @@
 
     /* Initialize timer heap sizes */
     ht->max_size = size;
-    ht->cur_size = 0;
+    ht->cur_size = 0;

+    ht->max_entries_per_poll = DEFAULT_MAX_TIMED_OUT_PER_POLL;
     ht->timer_ids_freelist = 1;
-    ht->pool = pool;
+    ht->pool = pool;

 
-    /* Mutex. */
-    if (flag & PJ_TIMER_HEAP_NO_SYNCHRONIZE) {
-	ht->mutex = NULL;
-    } else {
-        pj_status_t rc;
-
-	/* Mutex must be the recursive types. 
-         * See commented code inside pj_timer_heap_poll() 
-         */
-	rc = pj_mutex_create(pool, "tmhp%p", PJ_MUTEX_RECURSE, &ht->mutex);
-	if (rc != PJ_SUCCESS)
-	    return rc;
-    }
+    /* Lock. */
+    ht->lock = NULL;

+    ht->auto_delete_lock = 0;

 
     // Create the heap array.
     ht->heap = pj_pool_alloc(pool, sizeof(pj_timer_entry*) * size);
@@ -385,6 +385,34 @@
     *p_heap = ht;
     return PJ_SUCCESS;
 }
+

+PJ_DEF(void) pj_timer_heap_destroy( pj_timer_heap_t *ht )

+{

+    if (ht->lock && ht->auto_delete_lock) {

+        pj_lock_destroy(ht->lock);

+        ht->lock = NULL;

+    }

+}

+

+PJ_DEF(void) pj_timer_heap_set_lock(  pj_timer_heap_t *ht,

+                                      pj_lock_t *lock,

+                                      pj_bool_t auto_del )

+{

+    if (ht->lock && ht->auto_delete_lock)

+        pj_lock_destroy(ht->lock);

+

+    ht->lock = lock;

+    ht->auto_delete_lock = auto_del;

+}

+

+

+PJ_DEF(unsigned) pj_timer_heap_set_max_timed_out_per_poll(pj_timer_heap_t *ht,

+                                                          unsigned count )

+{

+    unsigned old_count = ht->max_entries_per_poll;

+    ht->max_entries_per_poll = count;

+    return old_count;

+}

 
 PJ_DEF(pj_timer_entry*) pj_timer_entry_init( pj_timer_entry *entry,
                                              int id,
@@ -433,12 +461,13 @@
     return count;
 }
 
-PJ_DEF(int) pj_timer_heap_poll( pj_timer_heap_t *ht, pj_time_val *next_delay )
+PJ_DEF(unsigned) pj_timer_heap_poll( pj_timer_heap_t *ht, 

+                                     pj_time_val *next_delay )
 {
     pj_time_val now;
-    int count;
+    unsigned count;
 
-    PJ_ASSERT_RETURN(ht, -1);
+    PJ_ASSERT_RETURN(ht, 0);
 
     if (!ht->cur_size && next_delay) {
 	next_delay->sec = next_delay->msec = PJ_MAXINT32;
@@ -450,16 +479,15 @@
 
     lock_timer_heap(ht);
     while ( ht->cur_size && 
-	    PJ_TIME_VAL_LTE(ht->heap[0]->_timer_value, now) ) 
+	    PJ_TIME_VAL_LTE(ht->heap[0]->_timer_value, now) &&

+            count < ht->max_entries_per_poll ) 
     {
 	pj_timer_entry *node = remove_node(ht, 0);
 	++count;
 
-	//Better not to temporarily release mutex to save some syscalls.
-	//But then make sure the mutex must be the recursive types (PJ_MUTEX_RECURSE)!
-	//unlock_timer_heap(ht);
+	unlock_timer_heap(ht);
 	(*node->cb)(ht, node);
-	//lock_timer_heap(ht);
+	lock_timer_heap(ht);
     }
     if (ht->cur_size && next_delay) {
 	*next_delay = ht->heap[0]->_timer_value;
@@ -473,7 +501,9 @@
 }
 
 PJ_DEF(pj_size_t) pj_timer_heap_count( pj_timer_heap_t *ht )
-{
+{

+    PJ_ASSERT_RETURN(ht, 0);

+
     return ht->cur_size;
 }
 
diff --git a/pjlib/src/pjlib++-test/main.cpp b/pjlib/src/pjlib++-test/main.cpp
new file mode 100644
index 0000000..4a6d0aa
--- /dev/null
+++ b/pjlib/src/pjlib++-test/main.cpp
@@ -0,0 +1,29 @@
+#include <pj++/file.hpp>

+#include <pj++/list.hpp>

+#include <pj++/lock.hpp>

+#include <pj++/hash.hpp>

+#include <pj++/os.hpp>

+#include <pj++/proactor.hpp>

+#include <pj++/sock.hpp>

+#include <pj++/string.hpp>

+#include <pj++/timer.hpp>

+#include <pj++/tree.hpp>

+

+int main()

+{

+    Pjlib lib;

+    Pj_Caching_Pool mem;

+    Pj_Pool the_pool;

+    Pj_Pool *pool = &the_pool;

+    

+    the_pool.attach(mem.create_pool(4000,4000));

+

+    Pj_Semaphore_Lock lsem(pool);

+    Pj_Semaphore_Lock *plsem;

+

+    plsem = new(pool) Pj_Semaphore_Lock(pool);

+    delete plsem;

+

+    return 0;

+}

+