blob: abebc672184e56d70e4bb09b75ea6fb78ef182ba [file] [log] [blame]
Emeric Vigier2f625822012-08-06 11:09:52 -04001// Copyright (C) 1999-2005 Open Source Telecom Corporation.
2// Copyright (C) 2006-2010 David Sugar, Tycho Softworks.
3//
4// This program is free software; you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation; either version 2 of the License, or
7// (at your option) any later version.
8//
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU General Public License for more details.
13//
14// You should have received a copy of the GNU General Public License
15// along with this program; if not, write to the Free Software
16// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17//
18// As a special exception, you may use this file as part of a free software
19// library without restriction. Specifically, if other files instantiate
20// templates or use macros or inline functions from this file, or you compile
21// this file and link it with other files to produce an executable, this
22// file does not by itself cause the resulting executable to be covered by
23// the GNU General Public License. This exception does not however
24// invalidate any other reasons why the executable file might be covered by
25// the GNU General Public License.
26//
27// This exception applies only to the code released under the name GNU
28// Common C++. If you copy code from other releases into a copy of GNU
29// Common C++, as the General Public License permits, the exception does
30// not apply to the code that you add in this way. To avoid misleading
31// anyone as to the status of such modified files, you must delete
32// this exception notice from them.
33//
34// If you write modifications of your own for GNU Common C++, it is your choice
35// whether to permit this exception to apply to your modifications.
36// If you do not wish that, delete this exception notice.
37//
38
39#include <cc++/config.h>
40#include <cc++/export.h>
41#include <cc++/exception.h>
42#include <cc++/thread.h>
43#include <cc++/buffer.h>
44#include <cstdio>
45
46#ifdef CCXX_NAMESPACES
47namespace ost {
48#endif
49
50const size_t Buffer::timeout = ((size_t)(-1l));
51
52#ifdef WIN32
53Buffer::Buffer(size_t capacity) : Mutex()
54#else
55Buffer::Buffer(size_t capacity) : Conditional()
56#endif
57{
58#ifdef WIN32
59 sem_head = ::CreateSemaphore((LPSECURITY_ATTRIBUTES)NULL, 0, MAX_SEM_VALUE, (LPCTSTR)NULL);
60 sem_tail = ::CreateSemaphore((LPSECURITY_ATTRIBUTES)NULL, (LONG)capacity, MAX_SEM_VALUE, (LPCTSTR)NULL);
61#endif
62 _size = capacity;
63 _used = 0;
64}
65
66Buffer::~Buffer()
67{
68#ifdef WIN32
69 ::CloseHandle(sem_head);
70 ::CloseHandle(sem_tail);
71#endif
72}
73
74#ifdef WIN32
75
76size_t Buffer::wait(void *buf, timeout_t timeout)
77{
78 size_t rc;
79
80 if(!timeout)
81 timeout = INFINITE;
82 if(Thread::waitThread(sem_head, timeout) != WAIT_OBJECT_0)
83 return Buffer::timeout;
84 enterMutex();
85 rc = onWait(buf);
86 --_used;
87 leaveMutex();
88 ::ReleaseSemaphore(sem_tail, 1, (LPLONG)NULL);
89 return rc;
90}
91
92size_t Buffer::post(void *buf, timeout_t timeout)
93{
94 size_t rc;
95
96 if(!timeout)
97 timeout = INFINITE;
98
99 if(Thread::waitThread(sem_tail, timeout) != WAIT_OBJECT_0)
100 return Buffer::timeout;
101 enterMutex();
102 rc = onPost(buf);
103 ++_used;
104 leaveMutex();
105 ::ReleaseSemaphore(sem_head, 1, (LPLONG)NULL);
106 return rc;
107}
108
109#else
110
111size_t Buffer::wait(void *buf, timeout_t timeout)
112{
113 size_t rc = 0;
114 enterMutex();
115 while(!_used) {
116 if(!Conditional::wait(timeout, true)) {
117 leaveMutex();
118 return Buffer::timeout;
119 }
120 }
121 rc = (ssize_t)onWait(buf);
122 --_used;
123 Conditional::signal(false);
124 leaveMutex();
125 return rc;
126}
127
128size_t Buffer::post(void *buf, timeout_t timeout)
129{
130 size_t rc = 0;
131
132 enterMutex();
133 while(_used == _size) {
134 if(!Conditional::wait(timeout, true)) {
135 leaveMutex();
136 return Buffer::timeout;
137 }
138 }
139 rc = (ssize_t)onPost(buf);
140 ++_used;
141 Conditional::signal(false);
142 leaveMutex();
143 return rc;
144}
145
146size_t Buffer::peek(void *buf)
147{
148 size_t rc;
149
150 enterMutex();
151 if(!_used) {
152 leaveMutex();
153 return 0;
154 }
155 rc = onPeek(buf);
156 leaveMutex();
157 return rc;
158}
159
160#endif
161
162bool Buffer::isValid(void)
163{
164 return true;
165}
166
167FixedBuffer::FixedBuffer(size_t capacity, size_t osize) :
168Buffer(capacity)
169{
170 objsize = osize;
171 buf = new char[capacity * objsize];
172
173#ifdef CCXX_EXCEPTIONS
174 if(!buf && Thread::getException() == Thread::throwObject)
175 throw(this);
176#ifdef COMMON_STD_EXCEPTION
177 else if(!buf && Thread::getException() == Thread::throwException)
178 throw(SyncException("fixed buffer failure"));
179#endif
180#endif
181
182 head = tail = buf;
183}
184
185FixedBuffer::~FixedBuffer()
186{
187 if(buf)
188 delete[] buf;
189}
190
191bool FixedBuffer::isValid(void)
192{
193 if(head && tail)
194 return true;
195
196 return false;
197}
198
199#define MAXBUF (buf + (getSize() * objsize))
200
201size_t FixedBuffer::onWait(void *data)
202{
203 memcpy(data, head, objsize);
204 if((head += objsize) >= MAXBUF)
205 head = buf;
206 return objsize;
207}
208
209size_t FixedBuffer::onPost(void *data)
210{
211 memcpy(tail, data, objsize);
212 if((tail += objsize) >= MAXBUF)
213 tail = buf;
214 return objsize;
215}
216
217size_t FixedBuffer::onPeek(void *data)
218{
219 memcpy(data, head, objsize);
220 return objsize;
221}
222
223ThreadQueue::ThreadQueue(const char *id, int pri, size_t stack) :
224Mutex(), Thread(pri, stack), Semaphore(), name(id)
225{
226 first = last = NULL;
227 started = false;
228 timeout = 0;
229}
230
231ThreadQueue::~ThreadQueue()
232{
233 data_t *data, *next;
234 if(started) {
235 started = false;
236 }
237 data = first;
238 while(data) {
239 next = data->next;
240 delete[] data;
241 data = next;
242 }
243}
244
245void ThreadQueue::run(void)
246{
247 bool posted;
248 data_t *prev;
249 started = true;
250 for(;;) {
251 posted = Semaphore::wait(timeout);
252 if(!posted) {
253 onTimer();
254 if(!first)
255 continue;
256 }
257 if(!started)
258 sleep((timeout_t)~0);
259 startQueue();
260 while(first) {
261 runQueue(first->data);
262 enterMutex();
263 prev = first;
264 first = first->next;
265 delete[] prev;
266 if(!first)
267 last = NULL;
268 leaveMutex();
269 if(first)
270 Semaphore::wait(); // demark semaphore
271 }
272 stopQueue();
273 }
274}
275
276void ThreadQueue::final()
277{
278#ifndef WIN32
279 // Unlock is needed to unlock the mutex in the case of a cancel during Semaphore::wait()
280 // see PTHREAD_COND_TIMEDWAIT(3P)
281 Semaphore::force_unlock_after_cancellation();
282#endif
283}
284
285void ThreadQueue::onTimer(void)
286{
287}
288
289void ThreadQueue::setTimer(timeout_t timed)
290{
291 enterMutex();
292 timeout = timed;
293 leaveMutex();
294 if(!started) {
295 start();
296 started = true;
297 }
298 else if(!first)
299 Semaphore::post();
300}
301
302void ThreadQueue::post(const void *dp, unsigned len)
303{
304 data_t *data = (data_t *)new char[sizeof(data_t) + len];
305 memcpy(data->data, dp, len);
306 data->len = len;
307 data->next = NULL;
308 enterMutex();
309 if(!first)
310 first = data;
311 if(last)
312 last->next = data;
313 last = data;
314 if(!started) {
315 start();
316 started = true;
317 }
318 leaveMutex();
319 Semaphore::post();
320}
321
322void ThreadQueue::startQueue(void)
323{
324}
325
326void ThreadQueue::stopQueue(void)
327{
328}
329
330#ifdef CCXX_NAMESPACES
331}
332#endif
333
334/** EMACS **
335 * Local variables:
336 * mode: c++
337 * c-basic-offset: 4
338 * End:
339 */