blob: 0dfcacb399c0d818921edcb67404385267a223f9 [file] [log] [blame]
Tristan Matthews0a329cc2013-07-17 13:20:14 -04001/* $Id$ */
2/*
3 * Copyright (C) 2011-2011 Teluu Inc. (http://www.teluu.com)
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 */
19#include <pjmedia/event.h>
20#include <pjmedia/errno.h>
21#include <pj/assert.h>
22#include <pj/list.h>
23#include <pj/log.h>
24#include <pj/os.h>
25#include <pj/pool.h>
26#include <pj/string.h>
27
28#define THIS_FILE "event.c"
29
30#define MAX_EVENTS 16
31
32typedef struct esub esub;
33
34struct esub
35{
36 PJ_DECL_LIST_MEMBER(esub);
37
38 pjmedia_event_cb *cb;
39 void *user_data;
40 void *epub;
41};
42
43typedef struct event_queue
44{
45 pjmedia_event events[MAX_EVENTS]; /**< array of events. */
46 int head, tail;
47 pj_bool_t is_full;
48} event_queue;
49
50struct pjmedia_event_mgr
51{
52 pj_pool_t *pool;
53 pj_thread_t *thread; /**< worker thread. */
54 pj_bool_t is_quitting;
55 pj_sem_t *sem;
56 pj_mutex_t *mutex;
57 event_queue ev_queue;
58 event_queue *pub_ev_queue; /**< publish() event queue. */
59 esub esub_list; /**< list of subscribers. */
60 esub free_esub_list; /**< list of subscribers. */
61 esub *th_next_sub, /**< worker thread's next sub. */
62 *pub_next_sub; /**< publish() next sub. */
63};
64
65static pjmedia_event_mgr *event_manager_instance;
66
67static pj_status_t event_queue_add_event(event_queue* ev_queue,
68 pjmedia_event *event)
69{
70 if (ev_queue->is_full) {
71 char ev_name[5];
72
73 /* This event will be ignored. */
74 PJ_LOG(4, (THIS_FILE, "Lost event %s from publisher [0x%p] "
75 "due to full queue.",
76 pjmedia_fourcc_name(event->type, ev_name),
77 event->epub));
78
79 return PJ_ETOOMANY;
80 }
81
82 pj_memcpy(&ev_queue->events[ev_queue->tail], event, sizeof(*event));
83 ev_queue->tail = (ev_queue->tail + 1) % MAX_EVENTS;
84 if (ev_queue->tail == ev_queue->head)
85 ev_queue->is_full = PJ_TRUE;
86
87 return PJ_SUCCESS;
88}
89
90static pj_status_t event_mgr_distribute_events(pjmedia_event_mgr *mgr,
91 event_queue *ev_queue,
92 esub **next_sub,
93 pj_bool_t rls_lock)
94{
95 pj_status_t err = PJ_SUCCESS;
96 esub * sub = mgr->esub_list.next;
97 pjmedia_event *ev = &ev_queue->events[ev_queue->head];
98
99 while (sub != &mgr->esub_list) {
100 *next_sub = sub->next;
101
102 /* Check if the subscriber is interested in
103 * receiving the event from the publisher.
104 */
105 if (sub->epub == ev->epub || !sub->epub) {
106 pjmedia_event_cb *cb = sub->cb;
107 void *user_data = sub->user_data;
108 pj_status_t status;
109
110 if (rls_lock)
111 pj_mutex_unlock(mgr->mutex);
112
113 status = (*cb)(ev, user_data);
114 if (status != PJ_SUCCESS && err == PJ_SUCCESS)
115 err = status;
116
117 if (rls_lock)
118 pj_mutex_lock(mgr->mutex);
119 }
120 sub = *next_sub;
121 }
122 *next_sub = NULL;
123
124 ev_queue->head = (ev_queue->head + 1) % MAX_EVENTS;
125 ev_queue->is_full = PJ_FALSE;
126
127 return err;
128}
129
130/* Event worker thread function. */
131static int event_worker_thread(void *arg)
132{
133 pjmedia_event_mgr *mgr = (pjmedia_event_mgr *)arg;
134
135 while (1) {
136 /* Wait until there is an event. */
137 pj_sem_wait(mgr->sem);
138
139 if (mgr->is_quitting)
140 break;
141
142 pj_mutex_lock(mgr->mutex);
143 event_mgr_distribute_events(mgr, &mgr->ev_queue,
144 &mgr->th_next_sub, PJ_TRUE);
145 pj_mutex_unlock(mgr->mutex);
146 }
147
148 return 0;
149}
150
151PJ_DEF(pj_status_t) pjmedia_event_mgr_create(pj_pool_t *pool,
152 unsigned options,
153 pjmedia_event_mgr **p_mgr)
154{
155 pjmedia_event_mgr *mgr;
156 pj_status_t status;
157
158 mgr = PJ_POOL_ZALLOC_T(pool, pjmedia_event_mgr);
159 mgr->pool = pj_pool_create(pool->factory, "evt mgr", 500, 500, NULL);
160 pj_list_init(&mgr->esub_list);
161 pj_list_init(&mgr->free_esub_list);
162
163 if (!(options & PJMEDIA_EVENT_MGR_NO_THREAD)) {
164 status = pj_sem_create(mgr->pool, "ev_sem", 0, MAX_EVENTS + 1,
165 &mgr->sem);
166 if (status != PJ_SUCCESS)
167 return status;
168
169 status = pj_thread_create(mgr->pool, "ev_thread",
170 &event_worker_thread,
171 mgr, 0, 0, &mgr->thread);
172 if (status != PJ_SUCCESS) {
173 pjmedia_event_mgr_destroy(mgr);
174 return status;
175 }
176 }
177
178 status = pj_mutex_create_recursive(mgr->pool, "ev_mutex", &mgr->mutex);
179 if (status != PJ_SUCCESS) {
180 pjmedia_event_mgr_destroy(mgr);
181 return status;
182 }
183
184 if (!event_manager_instance)
185 event_manager_instance = mgr;
186
187 if (p_mgr)
188 *p_mgr = mgr;
189
190 return PJ_SUCCESS;
191}
192
193PJ_DEF(pjmedia_event_mgr*) pjmedia_event_mgr_instance(void)
194{
195 return event_manager_instance;
196}
197
198PJ_DEF(void) pjmedia_event_mgr_set_instance(pjmedia_event_mgr *mgr)
199{
200 event_manager_instance = mgr;
201}
202
203PJ_DEF(void) pjmedia_event_mgr_destroy(pjmedia_event_mgr *mgr)
204{
205 if (!mgr) mgr = pjmedia_event_mgr_instance();
206 PJ_ASSERT_ON_FAIL(mgr != NULL, return);
207
208 if (mgr->thread) {
209 mgr->is_quitting = PJ_TRUE;
210 pj_sem_post(mgr->sem);
211 pj_thread_join(mgr->thread);
212 }
213
214 if (mgr->sem) {
215 pj_sem_destroy(mgr->sem);
216 mgr->sem = NULL;
217 }
218
219 if (mgr->mutex) {
220 pj_mutex_destroy(mgr->mutex);
221 mgr->mutex = NULL;
222 }
223
224 if (mgr->pool)
225 pj_pool_release(mgr->pool);
226
227 if (event_manager_instance == mgr)
228 event_manager_instance = NULL;
229}
230
231PJ_DEF(void) pjmedia_event_init( pjmedia_event *event,
232 pjmedia_event_type type,
233 const pj_timestamp *ts,
234 const void *src)
235{
236 pj_bzero(event, sizeof(*event));
237 event->type = type;
238 if (ts)
239 event->timestamp.u64 = ts->u64;
240 event->epub = event->src = src;
241}
242
243PJ_DEF(pj_status_t) pjmedia_event_subscribe( pjmedia_event_mgr *mgr,
244 pjmedia_event_cb *cb,
245 void *user_data,
246 void *epub)
247{
248 esub *sub;
249
250 PJ_ASSERT_RETURN(cb, PJ_EINVAL);
251
252 if (!mgr) mgr = pjmedia_event_mgr_instance();
253 PJ_ASSERT_RETURN(mgr, PJ_EINVAL);
254
255 pj_mutex_lock(mgr->mutex);
256 /* Check whether callback function with the same user data is already
257 * subscribed to the publisher. This is to prevent the callback function
258 * receiving the same event from the same publisher more than once.
259 */
260 sub = mgr->esub_list.next;
261 while (sub != &mgr->esub_list) {
262 esub *next = sub->next;
263 if (sub->cb == cb && sub->user_data == user_data &&
264 sub->epub == epub)
265 {
266 pj_mutex_unlock(mgr->mutex);
267 return PJ_SUCCESS;
268 }
269 sub = next;
270 }
271
272 if (mgr->free_esub_list.next != &mgr->free_esub_list) {
273 sub = mgr->free_esub_list.next;
274 pj_list_erase(sub);
275 } else
276 sub = PJ_POOL_ZALLOC_T(mgr->pool, esub);
277 sub->cb = cb;
278 sub->user_data = user_data;
279 sub->epub = epub;
280 pj_list_push_back(&mgr->esub_list, sub);
281 pj_mutex_unlock(mgr->mutex);
282
283 return PJ_SUCCESS;
284}
285
286PJ_DEF(pj_status_t)
287pjmedia_event_unsubscribe(pjmedia_event_mgr *mgr,
288 pjmedia_event_cb *cb,
289 void *user_data,
290 void *epub)
291{
292 esub *sub;
293
294 PJ_ASSERT_RETURN(cb, PJ_EINVAL);
295
296 if (!mgr) mgr = pjmedia_event_mgr_instance();
297 PJ_ASSERT_RETURN(mgr, PJ_EINVAL);
298
299 pj_mutex_lock(mgr->mutex);
300 sub = mgr->esub_list.next;
301 while (sub != &mgr->esub_list) {
302 esub *next = sub->next;
303 if (sub->cb == cb && (sub->user_data == user_data || !user_data) &&
304 (sub->epub == epub || !epub))
305 {
306 /* If the worker thread or pjmedia_event_publish() API is
307 * in the process of distributing events, make sure that
308 * its pointer to the next subscriber stays valid.
309 */
310 if (mgr->th_next_sub == sub)
311 mgr->th_next_sub = sub->next;
312 if (mgr->pub_next_sub == sub)
313 mgr->pub_next_sub = sub->next;
314 pj_list_erase(sub);
315 pj_list_push_back(&mgr->free_esub_list, sub);
316 if (user_data && epub)
317 break;
318 }
319 sub = next;
320 }
321 pj_mutex_unlock(mgr->mutex);
322
323 return PJ_SUCCESS;
324}
325
326PJ_DEF(pj_status_t) pjmedia_event_publish( pjmedia_event_mgr *mgr,
327 void *epub,
328 pjmedia_event *event,
329 pjmedia_event_publish_flag flag)
330{
331 pj_status_t err = PJ_SUCCESS;
332
333 PJ_ASSERT_RETURN(epub && event, PJ_EINVAL);
334
335 if (!mgr) mgr = pjmedia_event_mgr_instance();
336 PJ_ASSERT_RETURN(mgr, PJ_EINVAL);
337
338 event->epub = epub;
339
340 pj_mutex_lock(mgr->mutex);
341 if (flag & PJMEDIA_EVENT_PUBLISH_POST_EVENT) {
342 if (event_queue_add_event(&mgr->ev_queue, event) == PJ_SUCCESS)
343 pj_sem_post(mgr->sem);
344 } else {
345 /* For nested pjmedia_event_publish() calls, i.e. calling publish()
346 * inside the subscriber's callback, the function will only add
347 * the event to the event queue of the first publish() call. It
348 * is the first publish() call that will be responsible to
349 * distribute the events.
350 */
351 if (mgr->pub_ev_queue) {
352 event_queue_add_event(mgr->pub_ev_queue, event);
353 } else {
354 static event_queue ev_queue;
355 pj_status_t status;
356
357 ev_queue.head = ev_queue.tail = 0;
358 ev_queue.is_full = PJ_FALSE;
359 mgr->pub_ev_queue = &ev_queue;
360
361 event_queue_add_event(mgr->pub_ev_queue, event);
362
363 do {
364 status = event_mgr_distribute_events(mgr, mgr->pub_ev_queue,
365 &mgr->pub_next_sub,
366 PJ_FALSE);
367 if (status != PJ_SUCCESS && err == PJ_SUCCESS)
368 err = status;
369 } while(ev_queue.head != ev_queue.tail || ev_queue.is_full);
370
371 mgr->pub_ev_queue = NULL;
372 }
373 }
374 pj_mutex_unlock(mgr->mutex);
375
376 return err;
377}