blob: c11a1653e6ed6bbc4c9971f0263dc75069be8e7e [file] [log] [blame]
Emeric Vigier2f625822012-08-06 11:09:52 -04001// Copyright (C) 1999-2005 Open Source Telecom Corporation.
2// Copyright (C) 2006-2010 David Sugar, Tycho Softworks.
3//
4// This program is free software; you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation; either version 2 of the License, or
7// (at your option) any later version.
8//
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU General Public License for more details.
13//
14// You should have received a copy of the GNU General Public License
15// along with this program; if not, write to the Free Software
16// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17//
18// As a special exception, you may use this file as part of a free software
19// library without restriction. Specifically, if other files instantiate
20// templates or use macros or inline functions from this file, or you compile
21// this file and link it with other files to produce an executable, this
22// file does not by itself cause the resulting executable to be covered by
23// the GNU General Public License. This exception does not however
24// invalidate any other reasons why the executable file might be covered by
25// the GNU General Public License.
26//
27// This exception applies only to the code released under the name GNU
28// Common C++. If you copy code from other releases into a copy of GNU
29// Common C++, as the General Public License permits, the exception does
30// not apply to the code that you add in this way. To avoid misleading
31// anyone as to the status of such modified files, you must delete
32// this exception notice from them.
33//
34// If you write modifications of your own for GNU Common C++, it is your choice
35// whether to permit this exception to apply to your modifications.
36// If you do not wish that, delete this exception notice.
37//
38
39/**
40 * @file buffer.h
41 * @short object passing services between threads.
42 **/
43
44#ifndef CCXX_BUFFER_H_
45#define CCXX_BUFFER_H_
46
47#ifndef CCXX_THREAD_H_
48#include <cc++/thread.h>
49#endif
50#ifndef CCXX_STRING_H_
51#include <cc++/string.h>
52#endif
53#ifdef CCXX_NAMESPACES
54namespace ost {
55#endif
56
57/**
58 * The buffer class represents an IPC service that is built upon a buffer
59 * of fixed capacity that can be used to transfer objects between one or
60 * more producer and consumer threads. Producer threads post objects
61 * into the buffer, and consumer threads wait for and receive objects from
62 * the buffer. Semaphores are used to to block the buffer from overflowing
63 * and indicate when there is data available, and mutexes are used to protect
64 * multiple consumers and producer threads from stepping over each other.
65 *
66 * The buffer class is an abstract class in that the actual data being
67 * buffered is not directly specified within the buffer class itself. The
68 * buffer class should be used as a base class for a class that actually
69 * impliments buffering and which may be aware of the data types actually
70 * are being buffered. A template class could be created based on buffer
71 * for this purpose. Another possibility is to create a class derived
72 * from both Thread and Buffer which can be used to implement message passing
73 * threads.
74 *
75 * @author David Sugar <dyfet@ostel.com>
76 * @short Producer/Consumer buffer for use between threads.
77 */
78#ifdef WIN32
79class __EXPORT Buffer : public Mutex
80#else
81class __EXPORT Buffer : public Conditional
82#endif
83{
84private:
85#ifdef WIN32
86 HANDLE sem_head, sem_tail;
87#endif
88 size_t _size;
89 size_t _used;
90
91protected:
92 /**
93 * Invoke derived class buffer peeking method.
94 * @return size of object found.
95 * @param buf pointer to copy contents of head of buffer to.
96 */
97 virtual size_t onPeek(void *buf) = 0;
98
99 /**
100 * Invoke derived class object request from buffer.
101 * @return size of object returned.
102 * @param buf pointer to hold object returned from the buffer.
103 */
104 virtual size_t onWait(void *buf) = 0;
105
106 /**
107 * Invoke derived class posting of object to buffer.
108 * @return size of object posted.
109 * @param buf pointer to object being posted to the buffer.
110 */
111 virtual size_t onPost(void *buf) = 0;
112
113public:
114 /**
115 * value to return when a timed operation returned with a
116 * timeout.
117 */
118 static const size_t timeout;
119
120 /**
121 * Create a buffer object of known capacity.
122 * @param capacity is the integer capacity of the buffer.
123 */
124 Buffer(size_t capacity);
125 /**
126 * In derived functions, may be used to free the actual memory
127 * used to hold buffered data.
128 */
129 virtual ~Buffer();
130
131 /**
132 * Return the capacity of the buffer as specified at creation.
133 * @return size of buffer.
134 */
135 inline size_t getSize(void)
136 {return _size;};
137
138 /**
139 * Return the current capacity in use for the buffer. Free space
140 * is technically getSize() - getUsed().
141 * @return integer used capacity of the buffer.
142 * @see #getSize
143 */
144 inline size_t getUsed(void)
145 {return _used;};
146
147 /**
148 * Let one or more threads wait for an object to become available
149 * in the buffer. The waiting thread(s) will wait forever if no
150 * object is ever placed into the buffer.
151 *
152 * @return size of object passed by buffer in bytes.
153 * @param buf pointer to store object retrieved from the buffer.
154 * @param timeout time to wait.
155 */
156 size_t wait(void *buf, timeout_t timeout = 0);
157
158 /**
159 * Post an object into the buffer and enable a waiting thread to
160 * receive it.
161 *
162 * @return size of object posted in bytes.
163 * @param buf pointer to object to store in the buffer.
164 * @param timeout time to wait.
165 */
166 size_t post(void *buf, timeout_t timeout = 0);
167
168 /**
169 * Peek at the current content (first object) in the buffer.
170 *
171 * @return size of object in the buffer.
172 * @param buf pointer to store object found in the buffer.
173 */
174 size_t peek(void *buf);
175
176 /**
177 * New virtual to test if buffer is a valid object.
178 * @return true if object is valid.
179 */
180 virtual bool isValid(void);
181};
182
183/**
184 * A buffer class that holds a known capacity of fixed sized objects defined
185 * during creation.
186 *
187 * @author David Sugar <dyfet@ostel.com>
188 * @short producer/consumer buffer for fixed size objects.
189 */
190class __EXPORT FixedBuffer : public Buffer
191{
192private:
193 char *buf, *head, *tail;
194 size_t objsize;
195
196protected:
197 /**
198 * Return the first object in the buffer.
199 * @return predefined size of this buffers objects.
200 * @param buf pointer to copy contents of head of buffer to.
201 */
202 size_t onPeek(void *buf);
203
204 /**
205 * Wait for and return a fixed object in the buffer.
206 * @return predefined size of this buffers objects.
207 * @param buf pointer to hold object returned from the buffer.
208 */
209 size_t onWait(void *buf);
210
211 /**
212 * Post an object of the appropriate size into the buffer.
213 * @return predefined size of this buffers objects.
214 * @param buf pointer to data to copy into the buffer.
215 */
216 size_t onPost(void *buf);
217
218public:
219 /**
220 * Create a buffer of known capacity for objects of a specified
221 * size.
222 *
223 * @param capacity of the buffer.
224 * @param objsize for each object held in the buffer.
225 */
226 FixedBuffer(size_t capacity, size_t objsize);
227
228 /**
229 * Create a copy of an existing fixed size buffer and duplicate
230 * it's contents.
231 *
232 * @param fb existing FixedBuffer object.
233 */
234 FixedBuffer(const FixedBuffer &fb);
235
236 /**
237 * Destroy the fixed buffer and free the memory used to store objects.
238 */
239 virtual ~FixedBuffer();
240
241 FixedBuffer &operator=(const FixedBuffer &fb);
242
243 bool isValid(void);
244};
245
246/**
247 * Somewhat generic queue processing class to establish a producer
248 * consumer queue. This may be used to buffer cdr records, or for
249 * other purposes where an in-memory queue is needed for rapid
250 * posting. This class is derived from Mutex and maintains a linked
251 * list. A thread is used to dequeue data and pass it to a callback
252 * method that is used in place of "run" for each item present on the
253 * queue. The conditional is used to signal the run thread when new
254 * data is posted.
255 *
256 * This class was changed by Angelo Naselli to have a timeout on the queue
257 *
258 * @short in memory data queue interface.
259 * @author David Sugar <dyfet@ostel.com>
260 */
261class __EXPORT ThreadQueue : public Mutex, public Thread, public Semaphore
262{
263private:
264 void run(void); // private run method
265
266protected:
267 typedef struct _data {
268 struct _data *next;
269 unsigned len;
270 char data[1];
271 } data_t;
272
273 timeout_t timeout;
274 bool started;
275
276 data_t *first, *last; // head/tail of list
277
278 String name;
279
280 /*
281 * Overloading of final(). It demarks Semaphore to avoid deadlock.
282 */
283 virtual void final();
284
285 /**
286 * Start of dequeing. Maybe we need to connect a database
287 * or something, so we have a virtual...
288 */
289 virtual void startQueue(void);
290
291 /**
292 * End of dequeing, we expect the queue is empty for now. Maybe
293 * we need to disconnect a database or something, so we have
294 * another virtual.
295 */
296 virtual void stopQueue(void);
297
298 /**
299 * A derivable method to call when the timout is expired.
300 */
301 virtual void onTimer(void);
302
303 /**
304 * Virtual callback method to handle processing of a queued
305 * data items. After the item is processed, it is deleted from
306 * memory. We can call multiple instances of runQueue in order
307 * if multiple items are waiting.
308 *
309 * @param data item being dequed.
310 */
311 virtual void runQueue(void *data) = 0;
312
313public:
314 /**
315 * Create instance of our queue and give it a process priority.
316 *
317 * @param id queue ID.
318 * @param pri process priority.
319 * @param stack stack size.
320 */
321 ThreadQueue(const char *id, int pri, size_t stack = 0);
322
323 /**
324 * Destroy the queue.
325 */
326 virtual ~ThreadQueue();
327
328 /**
329 * Set the queue timeout.
330 * When the timer expires, the onTimer() method is called
331 * for the thread
332 *
333 * @param timeout timeout in milliseconds.
334 */
335 void setTimer(timeout_t timeout);
336
337 /**
338 * Put some unspecified data into this queue. A new qd
339 * structure is created and sized to contain a copy of
340 * the actual content.
341 *
342 * @param data pointer to data.
343 * @param len size of data.
344 */
345 void post(const void *data, unsigned len);
346};
347
348
349/** @relates Buffer */
350inline size_t get(Buffer &b, void *o, timeout_t t = 0)
351 {return b.wait(o, t);}
352
353/** @relates Buffer */
354inline size_t put(Buffer &b, void *o, timeout_t t = 0)
355 {return b.post(o, t);}
356
357/** @relates Buffer */
358inline size_t peek(Buffer &b, void *o)
359 {return b.peek(o);}
360
361
362#ifdef CCXX_NAMESPACES
363}
364#endif
365
366#endif
367/** EMACS **
368 * Local variables:
369 * mode: c++
370 * c-basic-offset: 4
371 * End:
372 */