blob: 24aa68a029ae0a23bf773b26d1e3d165ed8745e7 [file] [log] [blame]
Benny Prijono4766ffe2005-11-01 17:56:59 +00001/* $Id$
2 *
3 */
Benny Prijonodd859a62005-11-01 16:42:51 +00004#include <pjsip_simple/event_notify.h>
5#include <pjsip/sip_msg.h>
6#include <pjsip/sip_misc.h>
7#include <pjsip/sip_endpoint.h>
8#include <pjsip/sip_module.h>
9#include <pjsip/sip_transaction.h>
10#include <pjsip/sip_event.h>
11#include <pj/pool.h>
12#include <pj/timer.h>
13#include <pj/string.h>
14#include <pj/hash.h>
15#include <pj/os.h>
16#include <pj/except.h>
17#include <pj/log.h>
18#include <pj/guid.h>
19
20#define THIS_FILE "event_sub"
21
22/* String names for state.
23 * The names here should be compliant with sub_state names in RFC3265.
 *
 * The table is indexed with a subscription state value throughout this file
 * (state[sub->state], state[new_state]); each entry pairs the text with its
 * length in characters.
 * NOTE(review): the entry order presumably mirrors the numeric values of
 * pjsip_event_sub_state -- confirm against the enum declaration.
24 */
25static const pj_str_t state[] = {
26 { "null", 4 },
27 { "active", 6 },
28 { "pending", 7 },
29 { "terminated", 10 },
30 { "unknown", 7 }
31};
32
33/* Timer IDs */
/* Values written to sub->timer.id to record which timer is armed. The code
 * in this file treats an id of 0 as "no timer scheduled".
 */
/* Set by update_next_refresh() for the UAC refresh timer. */
34#define TIMER_ID_REFRESH 1
/* Set by sub_schedule_uas_expire() for the UAS expiry timer. */
35#define TIMER_ID_UAS_EXPIRY 2
36
37/* Static configuration. */
/* Seconds subtracted from the interval when update_next_refresh() schedules
 * the next SUBSCRIBE. The same value is the smallest non-zero Expires that
 * the request handlers in this file accept from a subscriber.
 */
38#define SECONDS_BEFORE_EXPIRY 10
/* Initial size and increment passed to pjsip_endpt_create_pool() for the
 * manager pool created in mod_init().
 */
39#define MGR_POOL_SIZE 512
40#define MGR_POOL_INC 0
/* Initial size and increment passed to pjsip_endpt_create_pool() for each
 * per-subscription pool.
 */
41#define SUB_POOL_SIZE 2048
42#define SUB_POOL_INC 0
/* Size argument passed to pj_hash_create() for the subscription table. */
43#define HASH_TABLE_SIZE 32
44
45/* Static vars. */
/* Module id handed to mod_init() by the endpoint; saved there. */
46static int mod_id;
/* Method descriptors for SUBSCRIBE and NOTIFY. They are used both when
 * creating outgoing requests and when classifying incoming messages with
 * pjsip_method_cmp(). The second member is the method name with its length.
 */
47static const pjsip_method SUBSCRIBE = { PJSIP_OTHER_METHOD, {"SUBSCRIBE", 9}};
48static const pjsip_method NOTIFY = { PJSIP_OTHER_METHOD, { "NOTIFY", 6}};
49
/* One registered event package. Instances are allocated from the manager
 * pool and linked into mgr.pkg_list by pjsip_event_sub_register_pkg().
 */
50typedef struct package
51{
52 PJ_DECL_LIST_MEMBER(struct package)
 /* Event name; compared case-insensitively against the Event header of an
  * incoming SUBSCRIBE.
  */
53 pj_str_t event;
 /* Number of entries in accept[]. */
54 int accept_cnt;
 /* Content types offered by the package; matched against the Accept header
  * of an incoming SUBSCRIBE.
  */
55 pj_str_t *accept;
 /* Copy of the callbacks supplied at registration. */
56 pjsip_event_sub_pkg_cb cb;
57} package;
58
59/* Event subscription manager singleton instance. */
/* All members are filled in by mod_init(). */
60static struct pjsip_event_sub_mgr
61{
 /* Pool that owns the hash table, the mutex, the package records and the
  * Allow-Events header.
  */
62 pj_pool_t *pool;
 /* Subscriptions keyed by the string built in create_subscriber_key(). */
63 pj_hash_table_t *ht;
 /* Endpoint that initialised the module. */
64 pjsip_endpoint *endpt;
 /* Held around hash table updates/lookups and package registration. */
65 pj_mutex_t *mutex;
 /* Header listing every registered event name; shallow-cloned into outgoing
  * messages.
  */
66 pjsip_allow_events_hdr *allow_events;
 /* List head of the registered packages. */
67 package pkg_list;
68} mgr;
69
70/* Forward declarations for static functions. */
/* Module entry points referenced by the module descriptor. */
71static pj_status_t mod_init(pjsip_endpoint *, pjsip_module *, pj_uint32_t);
72static pj_status_t mod_deinit(pjsip_module*);
73static void tsx_handler(pjsip_module*, pjsip_event*);
/* Subscription lookup and message handlers. */
74static pjsip_event_sub *find_sub(pjsip_rx_data *);
75static void on_subscribe_request(pjsip_transaction*, pjsip_rx_data*);
76static void on_subscribe_response(void *, pjsip_event*);
77static void on_notify_request(pjsip_transaction *, pjsip_rx_data*);
78static void on_notify_response(void *, pjsip_event *);
/* Timer callbacks and the helper that sends the (re-)SUBSCRIBE. */
79static void refresh_timer_cb(pj_timer_heap_t*, pj_timer_entry*);
80static void uas_expire_timer_cb(pj_timer_heap_t*, pj_timer_entry*);
81static pj_status_t send_sub_refresh( pjsip_event_sub *sub );
82
83/* Module descriptor.
 * This is the object returned to the application by
 * pjsip_event_sub_get_module(). The initializers are positional, so their
 * order has to follow the member order of pjsip_module (declared elsewhere).
 */
84static pjsip_module event_sub_module =
85{
86 {"EventSub", 8}, /* Name. */
87 0, /* Flag */
88 128, /* Priority */
89 &mgr, /* User data. */
90 2, /* Number of methods supported. */
91 { &SUBSCRIBE, &NOTIFY }, /* Array of methods */
92 &mod_init, /* init_module() */
93 NULL, /* start_module() */
94 &mod_deinit, /* deinit_module() */
95 &tsx_handler, /* tsx_handler() */
96};
97
98/*
99 * Module initialization.
100 * This will be called by endpoint when it initializes all modules.
101 */
102static pj_status_t mod_init( pjsip_endpoint *endpt,
103 struct pjsip_module *mod, pj_uint32_t id )
104{
105 pj_pool_t *pool;
106
107 pool = pjsip_endpt_create_pool(endpt, "esubmgr", MGR_POOL_SIZE, MGR_POOL_INC);
108 if (!pool)
109 return -1;
110
111 /* Manager initialization: create hash table and mutex. */
112 mgr.pool = pool;
113 mgr.endpt = endpt;
114 mgr.ht = pj_hash_create(pool, HASH_TABLE_SIZE);
115 if (!mgr.ht)
116 return -1;
117
118 mgr.mutex = pj_mutex_create(pool, "esubmgr", PJ_MUTEX_SIMPLE);
119 if (!mgr.mutex)
120 return -1;
121
122 /* Attach manager to module. */
123 mod->mod_data = &mgr;
124
125 /* Init package list. */
126 pj_list_init(&mgr.pkg_list);
127
128 /* Init Allow-Events header. */
129 mgr.allow_events = pjsip_allow_events_hdr_create(mgr.pool);
130
131 /* Save the module ID. */
132 mod_id = id;
133
134 pjsip_event_notify_init_parser();
135 return 0;
136}
137
138/*
139 * Module deinitialization.
140 * Called by endpoint.
141 */
142static pj_status_t mod_deinit( struct pjsip_module *mod )
143{
144 pj_mutex_lock(mgr.mutex);
145 pj_mutex_destroy(mgr.mutex);
146 pjsip_endpt_destroy_pool(mgr.endpt, mgr.pool);
147 return 0;
148}
149
150/*
151 * This public function is called by application to register callback.
152 * In exchange, the instance of the module is returned.
153 */
154PJ_DEF(pjsip_module*) pjsip_event_sub_get_module(void)
155{
156 return &event_sub_module;
157}
158
159/*
160 * Register event package.
161 */
162PJ_DEF(pj_status_t) pjsip_event_sub_register_pkg( const pj_str_t *event,
163 int accept_cnt,
164 const pj_str_t accept[],
165 const pjsip_event_sub_pkg_cb *cb )
166{
167 package *pkg;
168 int i;
169
170 pj_mutex_lock(mgr.mutex);
171
172 /* Create and register new package. */
173 pkg = pj_pool_alloc(mgr.pool, sizeof(*pkg));
174 pj_strdup(mgr.pool, &pkg->event, event);
175 pj_list_insert_before(&mgr.pkg_list, pkg);
176
177 /* Save Accept specification. */
178 pkg->accept_cnt = accept_cnt;
179 pkg->accept = pj_pool_alloc(mgr.pool, accept_cnt*sizeof(pj_str_t));
180 for (i=0; i<accept_cnt; ++i) {
181 pj_strdup(mgr.pool, &pkg->accept[i], &accept[i]);
182 }
183
184 /* Copy callback. */
185 pj_memcpy(&pkg->cb, cb, sizeof(*cb));
186
187 /* Update Allow-Events header. */
188 pj_assert(mgr.allow_events->event_cnt < PJSIP_MAX_ALLOW_EVENTS);
189 mgr.allow_events->events[mgr.allow_events->event_cnt++] = pkg->event;
190
191 pj_mutex_unlock(mgr.mutex);
192 return 0;
193}
194
195/*
196 * Create subscription key (for hash table).
197 */
198static void create_subscriber_key( pj_str_t *key, pj_pool_t *pool,
199 pjsip_role_e role,
200 const pj_str_t *call_id, const pj_str_t *from_tag)
201{
202 char *p;
203
204 p = key->ptr = pj_pool_alloc(pool, call_id->slen + from_tag->slen + 3);
205 *p++ = (role == PJSIP_ROLE_UAS ? 'S' : 'C');
206 *p++ = '$';
207 pj_memcpy(p, call_id->ptr, call_id->slen);
208 p += call_id->slen;
209 *p++ = '$';
210 pj_memcpy(p, from_tag->ptr, from_tag->slen);
211 p += from_tag->slen;
212
213 key->slen = p - key->ptr;
214}
215
216
217/*
218 * Create UAC subscription.
219 */
220PJ_DEF(pjsip_event_sub*) pjsip_event_sub_create( pjsip_endpoint *endpt,
221 const pj_str_t *from,
222 const pj_str_t *to,
223 const pj_str_t *event,
224 int expires,
225 int accept_cnt,
226 const pj_str_t accept[],
227 void *user_data,
228 const pjsip_event_sub_cb *cb)
229{
230 pjsip_tx_data *tdata;
231 pj_pool_t *pool;
232 const pjsip_hdr *hdr;
233 pjsip_event_sub *sub;
234 PJ_USE_EXCEPTION;
235
236 PJ_LOG(5,(THIS_FILE, "Creating event subscription %.*s to %.*s",
237 event->slen, event->ptr, to->slen, to->ptr));
238
239 /* Create pool for the event subscription. */
240 pool = pjsip_endpt_create_pool(endpt, "esub", SUB_POOL_SIZE, SUB_POOL_INC);
241 if (!pool) {
242 return NULL;
243 }
244
245 /* Init subscription. */
246 sub = pj_pool_calloc(pool, 1, sizeof(*sub));
247 sub->pool = pool;
248 sub->endpt = endpt;
249 sub->role = PJSIP_ROLE_UAC;
250 sub->state = PJSIP_EVENT_SUB_STATE_PENDING;
251 sub->state_str = state[sub->state];
252 sub->user_data = user_data;
253 sub->timer.id = 0;
254 sub->default_interval = expires;
255 pj_memcpy(&sub->cb, cb, sizeof(*cb));
256 pj_list_init(&sub->auth_sess);
257 pj_list_init(&sub->route_set);
258 sub->mutex = pj_mutex_create(pool, "esub", PJ_MUTEX_RECURSE);
259 if (!sub->mutex) {
260 pjsip_endpt_destroy_pool(endpt, pool);
261 return NULL;
262 }
263
264 /* The easiest way to parse the parameters is to create a dummy request! */
265 tdata = pjsip_endpt_create_request( endpt, &SUBSCRIBE, to, from, to, from,
266 NULL, -1, NULL);
267 if (!tdata) {
268 pj_mutex_destroy(sub->mutex);
269 pjsip_endpt_destroy_pool(endpt, pool);
270 return NULL;
271 }
272
273 /*
274 * Duplicate headers in the request to our structure.
275 */
276 PJ_TRY {
277 int i;
278
279 /* From */
280 hdr = pjsip_msg_find_hdr(tdata->msg, PJSIP_H_FROM, NULL);
281 pj_assert(hdr != NULL);
282 sub->from = pjsip_hdr_clone(pool, hdr);
283
284 /* To */
285 hdr = pjsip_msg_find_hdr(tdata->msg, PJSIP_H_TO, NULL);
286 pj_assert(hdr != NULL);
287 sub->to = pjsip_hdr_clone(pool, hdr);
288
289 /* Contact. */
290 sub->contact = pjsip_contact_hdr_create(pool);
291 sub->contact->uri = sub->from->uri;
292
293 /* Call-ID */
294 hdr = pjsip_msg_find_hdr(tdata->msg, PJSIP_H_CALL_ID, NULL);
295 pj_assert(hdr != NULL);
296 sub->call_id = pjsip_hdr_clone(pool, hdr);
297
298 /* CSeq */
299 sub->cseq = pj_rand() % 0xFFFF;
300
301 /* Event. */
302 sub->event = pjsip_event_hdr_create(sub->pool);
303 pj_strdup(pool, &sub->event->event_type, event);
304
305 /* Expires. */
306 sub->uac_expires = pjsip_expires_hdr_create(pool);
307 sub->uac_expires->ivalue = expires;
308
309 /* Accept. */
310 sub->local_accept = pjsip_accept_hdr_create(pool);
311 for (i=0; i<accept_cnt && i < PJSIP_MAX_ACCEPT_COUNT; ++i) {
312 sub->local_accept->count++;
313 pj_strdup(sub->pool, &sub->local_accept->values[i], &accept[i]);
314 }
315
316 /* Register to hash table. */
317 create_subscriber_key( &sub->key, pool, PJSIP_ROLE_UAC,
318 &sub->call_id->id, &sub->from->tag);
319 pj_mutex_lock( mgr.mutex );
320 pj_hash_set( pool, mgr.ht, sub->key.ptr, sub->key.slen, sub);
321 pj_mutex_unlock( mgr.mutex );
322
323 }
324 PJ_DEFAULT {
325 PJ_LOG(4,(THIS_FILE, "event_sub%p (%s): caught exception %d during init",
326 sub, state[sub->state].ptr, PJ_GET_EXCEPTION()));
327
328 pjsip_tx_data_dec_ref(tdata);
329 pj_mutex_destroy(sub->mutex);
330 pjsip_endpt_destroy_pool(endpt, sub->pool);
331 return NULL;
332 }
333 PJ_END;
334
335 /* All set, delete temporary transmit data as we don't need it. */
336 pjsip_tx_data_dec_ref(tdata);
337
338 PJ_LOG(4,(THIS_FILE, "event_sub%p (%s): client created, target=%.*s, event=%.*s",
339 sub, state[sub->state].ptr,
340 to->slen, to->ptr, event->slen, event->ptr));
341
342 return sub;
343}
344
345/*
346 * Set credentials.
347 */
348PJ_DEF(pj_status_t) pjsip_event_sub_set_credentials( pjsip_event_sub *sub,
349 int count,
350 const pjsip_cred_info cred[])
351{
352 pj_mutex_lock(sub->mutex);
353 if (count > 0) {
354 sub->cred_info = pj_pool_alloc(sub->pool, count*sizeof(pjsip_cred_info));
355 pj_memcpy( sub->cred_info, cred, count*sizeof(pjsip_cred_info));
356 }
357 sub->cred_cnt = count;
358 pj_mutex_unlock(sub->mutex);
359 return 0;
360}
361
362/*
363 * Set route-set.
364 */
365PJ_DEF(pj_status_t) pjsip_event_sub_set_route_set( pjsip_event_sub *sub,
366 const pjsip_route_hdr *route_set )
367{
368 const pjsip_route_hdr *hdr;
369
370 pj_mutex_lock(sub->mutex);
371
372 /* Clear existing route set. */
373 pj_list_init(&sub->route_set);
374
375 /* Duplicate route headers. */
376 hdr = route_set->next;
377 while (hdr != route_set) {
378 pjsip_route_hdr *new_hdr = pjsip_hdr_clone(sub->pool, hdr);
379 pj_list_insert_before(&sub->route_set, new_hdr);
380 hdr = hdr->next;
381 }
382
383 pj_mutex_unlock(sub->mutex);
384
385 return 0;
386}
387
388/*
389 * Send subscribe request.
390 */
391PJ_DEF(pj_status_t) pjsip_event_sub_subscribe( pjsip_event_sub *sub )
392{
393 pj_status_t status;
394
395 pj_mutex_lock(sub->mutex);
396 status = send_sub_refresh(sub);
397 pj_mutex_unlock(sub->mutex);
398
399 return status;
400}
401
402/*
403 * Destroy subscription.
404 * If there are pending transactions, then this will just set the flag.
405 */
406PJ_DEF(pj_status_t) pjsip_event_sub_destroy(pjsip_event_sub *sub)
407{
408 pj_assert(sub != NULL);
409 if (sub == NULL)
410 return -1;
411
412 /* Application must terminate the subscription first. */
413 pj_assert(sub->state == PJSIP_EVENT_SUB_STATE_NULL ||
414 sub->state == PJSIP_EVENT_SUB_STATE_TERMINATED);
415
416 PJ_LOG(4,(THIS_FILE, "event_sub%p (%s): about to be destroyed",
417 sub, state[sub->state].ptr));
418
419 pj_mutex_lock(mgr.mutex);
420 pj_mutex_lock(sub->mutex);
421
422 /* Set delete flag. */
423 sub->delete_flag = 1;
424
425 /* Unregister timer, if any. */
426 if (sub->timer.id != 0) {
427 pjsip_endpt_cancel_timer(sub->endpt, &sub->timer);
428 sub->timer.id = 0;
429 }
430
431 if (sub->pending_tsx > 0) {
432 pj_mutex_unlock(sub->mutex);
433 pj_mutex_unlock(mgr.mutex);
434 PJ_LOG(4,(THIS_FILE, "event_sub%p (%s): has %d pending, will destroy later",
435 sub, state[sub->state].ptr,
436 sub->pending_tsx));
437 return 1;
438 }
439
440 /* Unregister from hash table. */
441 pj_hash_set(sub->pool, mgr.ht, sub->key.ptr, sub->key.slen, NULL);
442
443 /* Destroy. */
444 pj_mutex_destroy(sub->mutex);
445 pjsip_endpt_destroy_pool(sub->endpt, sub->pool);
446
447 pj_mutex_unlock(mgr.mutex);
448
449 PJ_LOG(4,(THIS_FILE, "event_sub%p: destroyed", sub));
450 return 0;
451}
452
453/* Change state. */
454static void sub_set_state( pjsip_event_sub *sub, int new_state)
455{
456 PJ_LOG(4,(THIS_FILE, "event_sub%p (%s): changed state to %s",
457 sub, state[sub->state].ptr, state[new_state].ptr));
458 sub->state = new_state;
459 sub->state_str = state[new_state];
460}
461
462/*
463 * Refresh subscription.
464 */
465static pj_status_t send_sub_refresh( pjsip_event_sub *sub )
466{
467 pjsip_tx_data *tdata;
468 pj_status_t status;
469 const pjsip_route_hdr *route;
470
471 pj_assert(sub->role == PJSIP_ROLE_UAC);
472 pj_assert(sub->state != PJSIP_EVENT_SUB_STATE_TERMINATED);
473 if (sub->role != PJSIP_ROLE_UAC ||
474 sub->state == PJSIP_EVENT_SUB_STATE_TERMINATED)
475 {
476 return -1;
477 }
478
479 PJ_LOG(4,(THIS_FILE, "event_sub%p (%s): refreshing subscription",
480 sub, state[sub->state].ptr));
481
482 /* Create request. */
483 tdata = pjsip_endpt_create_request_from_hdr( sub->endpt,
484 &SUBSCRIBE,
485 sub->to->uri,
486 sub->from, sub->to,
487 sub->contact, sub->call_id,
488 sub->cseq++,
489 NULL);
490
491 if (!tdata) {
492 PJ_LOG(4,(THIS_FILE, "event_sub%p (%s): refresh: unable to create tx data!",
493 sub, state[sub->state].ptr));
494 return -1;
495 }
496
497 pjsip_msg_add_hdr( tdata->msg,
498 pjsip_hdr_shallow_clone(tdata->pool, sub->event));
499 pjsip_msg_add_hdr( tdata->msg,
500 pjsip_hdr_shallow_clone(tdata->pool, sub->uac_expires));
501 pjsip_msg_add_hdr( tdata->msg,
502 pjsip_hdr_shallow_clone(tdata->pool, sub->local_accept));
503 pjsip_msg_add_hdr( tdata->msg,
504 pjsip_hdr_shallow_clone(tdata->pool, mgr.allow_events));
505
506 /* Authentication */
507 pjsip_auth_init_req( sub->pool, tdata, &sub->auth_sess,
508 sub->cred_cnt, sub->cred_info);
509
510 /* Route set. */
511 route = sub->route_set.next;
512 while (route != &sub->route_set) {
513 pj_list_insert_before( &tdata->msg->hdr,
514 pjsip_hdr_shallow_clone(tdata->pool, route));
515 route = route->next;
516 }
517
518 /* Send */
519 status = pjsip_endpt_send_request( sub->endpt, tdata, -1, sub,
520 &on_subscribe_response);
521 if (status == 0) {
522 sub->pending_tsx++;
523 } else {
524 PJ_LOG(4,(THIS_FILE, "event_sub%p (%s): FAILED to refresh subscription!",
525 sub, state[sub->state].ptr));
526 }
527
528 return status;
529}
530
531/*
532 * Stop subscription.
533 */
534PJ_DEF(pj_status_t) pjsip_event_sub_unsubscribe( pjsip_event_sub *sub )
535{
536 pjsip_tx_data *tdata;
537 const pjsip_route_hdr *route;
538 pj_status_t status;
539
540 PJ_LOG(4,(THIS_FILE, "event_sub%p (%s): unsubscribing...",
541 sub, state[sub->state].ptr));
542
543 /* Lock subscription. */
544 pj_mutex_lock(sub->mutex);
545
546 pj_assert(sub->role == PJSIP_ROLE_UAC);
547
548 /* Kill refresh timer, if any. */
549 if (sub->timer.id != 0) {
550 sub->timer.id = 0;
551 pjsip_endpt_cancel_timer(sub->endpt, &sub->timer);
552 }
553
554 /* Create request. */
555 tdata = pjsip_endpt_create_request_from_hdr( sub->endpt,
556 &SUBSCRIBE,
557 sub->to->uri,
558 sub->from, sub->to,
559 sub->contact, sub->call_id,
560 sub->cseq++,
561 NULL);
562
563 if (!tdata) {
564 pj_mutex_unlock(sub->mutex);
565 return -1;
566 }
567
568 /* Add headers to request. */
569 pjsip_msg_add_hdr( tdata->msg, pjsip_hdr_shallow_clone(tdata->pool, sub->event));
570 sub->uac_expires->ivalue = 0;
571 pjsip_msg_add_hdr( tdata->msg, pjsip_hdr_shallow_clone(tdata->pool, sub->uac_expires));
572
573 /* Add authentication. */
574 pjsip_auth_init_req( sub->pool, tdata, &sub->auth_sess,
575 sub->cred_cnt, sub->cred_info);
576
577
578 /* Route set. */
579 route = sub->route_set.next;
580 while (route != &sub->route_set) {
581 pj_list_insert_before( &tdata->msg->hdr,
582 pjsip_hdr_shallow_clone(tdata->pool, route));
583 route = route->next;
584 }
585
586 /* Prevent timer from refreshing itself. */
587 sub->default_interval = 0;
588
589 /* Set state. */
590 sub_set_state( sub, PJSIP_EVENT_SUB_STATE_TERMINATED );
591
592 /* Send the request. */
593 status = pjsip_endpt_send_request( sub->endpt, tdata, -1, sub,
594 &on_subscribe_response);
595 if (status == 0) {
596 sub->pending_tsx++;
597 }
598
599 pj_mutex_unlock(sub->mutex);
600
601 if (status != 0) {
602 PJ_LOG(4,(THIS_FILE, "event_sub%p (%s): FAILED to unsubscribe!",
603 sub, state[sub->state].ptr));
604 }
605
606 return status;
607}
608
609/*
610 * Send notify.
611 */
612PJ_DEF(pj_status_t) pjsip_event_sub_notify(pjsip_event_sub *sub,
613 pjsip_event_sub_state new_state,
614 const pj_str_t *reason,
615 pjsip_msg_body *body)
616{
617 pjsip_tx_data *tdata;
618 pjsip_sub_state_hdr *ss_hdr;
619 const pjsip_route_hdr *route;
620 pj_time_val now;
621 pj_status_t status;
622 pjsip_event_sub_state old_state = sub->state;
623
624 pj_gettimeofday(&now);
625
626 pj_assert(sub->role == PJSIP_ROLE_UAS);
627 if (sub->role != PJSIP_ROLE_UAS)
628 return -1;
629
630 PJ_LOG(4,(THIS_FILE, "event_sub%p (%s): sending NOTIFY",
631 sub, state[new_state].ptr));
632
633 /* Lock subscription. */
634 pj_mutex_lock(sub->mutex);
635
636 /* Can not send NOTIFY if current state is NULL. We can accept TERMINATED. */
637 if (sub->state==PJSIP_EVENT_SUB_STATE_NULL) {
638 pj_assert(0);
639 pj_mutex_unlock(sub->mutex);
640 return -1;
641 }
642
643 /* Update state no matter what. */
644 sub_set_state(sub, new_state);
645
646 /* Create transmit data. */
647 tdata = pjsip_endpt_create_request_from_hdr( sub->endpt,
648 &NOTIFY,
649 sub->to->uri,
650 sub->from, sub->to,
651 sub->contact, sub->call_id,
652 sub->cseq++,
653 NULL);
654 if (!tdata) {
655 pj_mutex_unlock(sub->mutex);
656 return -1;
657 }
658
659 /* Add Event header. */
660 pjsip_msg_add_hdr(tdata->msg, pjsip_hdr_shallow_clone(tdata->pool, sub->event));
661
662 /* Add Subscription-State header. */
663 ss_hdr = pjsip_sub_state_hdr_create(tdata->pool);
664 ss_hdr->sub_state = state[new_state];
665 ss_hdr->expires_param = sub->expiry_time.sec - now.sec;
666 if (ss_hdr->expires_param < 0)
667 ss_hdr->expires_param = 0;
668 if (reason)
669 pj_strdup(tdata->pool, &ss_hdr->reason_param, reason);
670 pjsip_msg_add_hdr(tdata->msg, (pjsip_hdr*)ss_hdr);
671
672 /* Add Allow-Events header. */
673 pjsip_msg_add_hdr( tdata->msg,
674 pjsip_hdr_shallow_clone(tdata->pool, mgr.allow_events));
675
676 /* Add authentication */
677 pjsip_auth_init_req( sub->pool, tdata, &sub->auth_sess,
678 sub->cred_cnt, sub->cred_info);
679
680 /* Route set. */
681 route = sub->route_set.next;
682 while (route != &sub->route_set) {
683 pj_list_insert_before( &tdata->msg->hdr,
684 pjsip_hdr_shallow_clone(tdata->pool, route));
685 route = route->next;
686 }
687
688 /* Attach body. */
689 tdata->msg->body = body;
690
691 /* That's it, send! */
692 status = pjsip_endpt_send_request( sub->endpt, tdata, -1, sub, &on_notify_response);
693 if (status == 0)
694 sub->pending_tsx++;
695
696 /* If terminated notify application. */
697 if (new_state!=old_state && new_state==PJSIP_EVENT_SUB_STATE_TERMINATED) {
698 if (sub->cb.on_sub_terminated) {
699 sub->pending_tsx++;
700 (*sub->cb.on_sub_terminated)(sub, reason);
701 sub->pending_tsx--;
702 }
703 }
704
705 /* Unlock subscription. */
706 pj_mutex_unlock(sub->mutex);
707
708 if (status != 0) {
709 PJ_LOG(4,(THIS_FILE, "event_sub%p (%s): failed to send NOTIFY",
710 sub, state[sub->state].ptr));
711 }
712
713 if (sub->delete_flag && sub->pending_tsx <= 0) {
714 pjsip_event_sub_destroy(sub);
715 }
716 return status;
717}
718
719
720/* If this timer callback is called, it means subscriber hasn't refreshed its
721 * subscription on-time. Set the state to terminated. This will also send
722 * NOTIFY with Subscription-State set to terminated.
723 */
724static void uas_expire_timer_cb( pj_timer_heap_t *timer_heap, pj_timer_entry *entry)
725{
726 pjsip_event_sub *sub = entry->user_data;
727 pj_str_t reason = { "timeout", 7 };
728
729 PJ_LOG(4,(THIS_FILE, "event_sub%p (%s): UAS subscription expired!",
730 sub, state[sub->state].ptr));
731
732 pj_mutex_lock(sub->mutex);
733 sub->timer.id = 0;
734
735 if (sub->cb.on_sub_terminated && sub->state!=PJSIP_EVENT_SUB_STATE_TERMINATED) {
736 /* Notify application, but prevent app from destroying the sub. */
737 ++sub->pending_tsx;
738 (*sub->cb.on_sub_terminated)(sub, &reason);
739 --sub->pending_tsx;
740 }
741 //pjsip_event_sub_notify( sub, PJSIP_EVENT_SUB_STATE_TERMINATED,
742 // &reason, NULL);
743 pj_mutex_unlock(sub->mutex);
744
745}
746
747/* Schedule notifier expiration. */
748static void sub_schedule_uas_expire( pjsip_event_sub *sub, int sec_delay)
749{
750 pj_time_val delay = { 0, 0 };
751 pj_parsed_time pt;
752
753 if (sub->timer.id != 0)
754 pjsip_endpt_cancel_timer(sub->endpt, &sub->timer);
755
756 pj_gettimeofday(&sub->expiry_time);
757 sub->expiry_time.sec += sec_delay;
758
759 sub->timer.id = TIMER_ID_UAS_EXPIRY;
760 sub->timer.user_data = sub;
761 sub->timer.cb = &uas_expire_timer_cb;
762 delay.sec = sec_delay;
763 pjsip_endpt_schedule_timer( sub->endpt, &sub->timer, &delay);
764
765 pj_time_decode(&sub->expiry_time, &pt);
766 PJ_LOG(4,(THIS_FILE,
767 "event_sub%p (%s)(UAS): will expire at %02d:%02d:%02d (in %d secs)",
768 sub, state[sub->state].ptr, pt.hour, pt.min, pt.sec, sec_delay));
769}
770
771/* This timer is called for UAC to refresh the subscription. */
772static void refresh_timer_cb( pj_timer_heap_t *timer_heap, pj_timer_entry *entry)
773{
774 pjsip_event_sub *sub = entry->user_data;
775
776 PJ_LOG(4,(THIS_FILE, "event_sub%p (%s): refresh subscription timer",
777 sub, state[sub->state].ptr));
778
779 pj_mutex_lock(sub->mutex);
780 sub->timer.id = 0;
781 send_sub_refresh(sub);
782 pj_mutex_unlock(sub->mutex);
783}
784
785
786/* This will update the UAC's refresh schedule. */
787static void update_next_refresh(pjsip_event_sub *sub, int interval)
788{
789 pj_time_val delay = {0, 0};
790 pj_parsed_time pt;
791
792 if (interval < SECONDS_BEFORE_EXPIRY) {
793 PJ_LOG(4,(THIS_FILE,
794 "event_sub%p (%s): expiration delay too short (%d sec)! updated.",
795 sub, state[sub->state].ptr, interval));
796 interval = SECONDS_BEFORE_EXPIRY;
797 }
798
799 if (sub->timer.id != 0)
800 pjsip_endpt_cancel_timer(sub->endpt, &sub->timer);
801
802 sub->timer.id = TIMER_ID_REFRESH;
803 sub->timer.user_data = sub;
804 sub->timer.cb = &refresh_timer_cb;
805 pj_gettimeofday(&sub->expiry_time);
806 delay.sec = interval - SECONDS_BEFORE_EXPIRY;
807 sub->expiry_time.sec += delay.sec;
808
809 pj_time_decode(&sub->expiry_time, &pt);
810 PJ_LOG(4,(THIS_FILE,
811 "event_sub%p (%s): will send SUBSCRIBE at %02d:%02d:%02d (in %d secs)",
812 sub, state[sub->state].ptr,
813 pt.hour, pt.min, pt.sec,
814 delay.sec));
815
816 pjsip_endpt_schedule_timer( sub->endpt, &sub->timer, &delay );
817}
818
819
820/* Find subscription in the hash table.
821 * If found, lock the subscription before returning to caller.
822 */
823static pjsip_event_sub *find_sub(pjsip_rx_data *rdata)
824{
825 pj_str_t key;
826 pjsip_role_e role;
827 pjsip_event_sub *sub;
828 pjsip_method *method = &rdata->msg->line.req.method;
829 pj_str_t *tag;
830
831 if (rdata->msg->type == PJSIP_REQUEST_MSG) {
832 if (pjsip_method_cmp(method, &SUBSCRIBE)==0) {
833 role = PJSIP_ROLE_UAS;
834 tag = &rdata->to_tag;
835 } else {
836 pj_assert(pjsip_method_cmp(method, &NOTIFY) == 0);
837 role = PJSIP_ROLE_UAC;
838 tag = &rdata->to_tag;
839 }
840 } else {
841 if (pjsip_method_cmp(&rdata->cseq->method, &SUBSCRIBE)==0) {
842 role = PJSIP_ROLE_UAC;
843 tag = &rdata->from_tag;
844 } else {
845 pj_assert(pjsip_method_cmp(method, &NOTIFY) == 0);
846 role = PJSIP_ROLE_UAS;
847 tag = &rdata->from_tag;
848 }
849 }
850 create_subscriber_key( &key, rdata->pool, role, &rdata->call_id, tag);
851
852 pj_mutex_lock(mgr.mutex);
853 sub = pj_hash_get(mgr.ht, key.ptr, key.slen);
854 if (sub)
855 pj_mutex_lock(sub->mutex);
856 pj_mutex_unlock(mgr.mutex);
857
858 return sub;
859}
860
861
862/* This function is called when we receive SUBSCRIBE request message
863 * to refresh existing subscription.
864 */
865static void on_received_sub_refresh( pjsip_event_sub *sub,
866 pjsip_transaction *tsx, pjsip_rx_data *rdata)
867{
868 pjsip_event_hdr *e;
869 pjsip_expires_hdr *expires;
870 pj_str_t hname;
871 int status = 200;
872 pj_str_t reason_phrase = { NULL, 0 };
873 int new_state = sub->state;
874 int old_state = sub->state;
875 int new_interval = 0;
876 pjsip_tx_data *tdata;
877
878 PJ_LOG(4,(THIS_FILE, "event_sub%p (%s): received target refresh",
879 sub, state[sub->state].ptr));
880
881 /* Check that the event matches. */
882 hname = pj_str("Event");
883 e = pjsip_msg_find_hdr_by_name( rdata->msg, &hname, NULL);
884 if (!e) {
885 status = 400;
886 reason_phrase = pj_str("Missing Event header");
887 goto send_response;
888 }
889 if (pj_stricmp(&e->event_type, &sub->event->event_type) != 0 ||
890 pj_stricmp(&e->id_param, &sub->event->id_param) != 0)
891 {
892 status = 481;
893 reason_phrase = pj_str("Subscription does not exist");
894 goto send_response;
895 }
896
897 /* Check server state. */
898 if (sub->state == PJSIP_EVENT_SUB_STATE_TERMINATED) {
899 status = 481;
900 reason_phrase = pj_str("Subscription does not exist");
901 goto send_response;
902 }
903
904 /* Check expires header. */
905 expires = pjsip_msg_find_hdr(rdata->msg, PJSIP_H_EXPIRES, NULL);
906 if (!expires) {
907 /*
908 status = 400;
909 reason_phrase = pj_str("Missing Expires header");
910 goto send_response;
911 */
912 new_interval = sub->default_interval;
913 } else {
914 /* Check that interval is not too short.
915 * Note that expires time may be zero (for unsubscription).
916 */
917 new_interval = expires->ivalue;
918 if (new_interval != 0 && new_interval < SECONDS_BEFORE_EXPIRY) {
919 status = PJSIP_SC_INTERVAL_TOO_BRIEF;
920 goto send_response;
921 }
922 }
923
924 /* Update interval. */
925 sub->default_interval = new_interval;
926 pj_gettimeofday(&sub->expiry_time);
927 sub->expiry_time.sec += new_interval;
928
929 /* Update timer only if this is not unsubscription. */
930 if (new_interval > 0) {
931 sub->default_interval = new_interval;
932 sub_schedule_uas_expire( sub, new_interval );
933
934 /* Call callback. */
935 if (sub->cb.on_received_refresh) {
936 sub->pending_tsx++;
937 (*sub->cb.on_received_refresh)(sub, rdata);
938 sub->pending_tsx--;
939 }
940 }
941
942send_response:
943 tdata = pjsip_endpt_create_response( sub->endpt, rdata, status);
944 if (tdata) {
945 if (reason_phrase.slen)
946 tdata->msg->line.status.reason = reason_phrase;
947
948 /* Add Expires header. */
949 expires = pjsip_expires_hdr_create(tdata->pool);
950 expires->ivalue = sub->default_interval;
951 pjsip_msg_add_hdr(tdata->msg, (pjsip_hdr*)expires);
952
953 if (PJSIP_IS_STATUS_IN_CLASS(status,200)) {
954 pjsip_msg_add_hdr(tdata->msg,
955 pjsip_hdr_shallow_clone(tdata->pool, mgr.allow_events));
956 }
957 /* Send down to transaction. */
958 pjsip_tsx_on_tx_msg(tsx, tdata);
959 }
960
961 if (sub->default_interval==0 || !PJSIP_IS_STATUS_IN_CLASS(status,200)) {
962 /* Notify application if sub is terminated. */
963 new_state = PJSIP_EVENT_SUB_STATE_TERMINATED;
964 sub_set_state(sub, new_state);
965 if (new_state!=old_state && sub->cb.on_sub_terminated) {
966 pj_str_t reason = {"", 0};
967 if (reason_phrase.slen) reason = reason_phrase;
968 else reason = *pjsip_get_status_text(status);
969
970 sub->pending_tsx++;
971 (*sub->cb.on_sub_terminated)(sub, &reason);
972 sub->pending_tsx--;
973 }
974 }
975
976 pj_mutex_unlock(sub->mutex);
977
978 /* Prefer to call log when we're not holding the mutex. */
979 PJ_LOG(4,(THIS_FILE, "event_sub%p (%s): sent refresh response %s, status=%d",
980 sub, state[sub->state].ptr,
981 (tdata ? tdata->obj_name : "null"), status));
982
983 /* Check if application has requested deletion. */
984 if (sub->delete_flag && sub->pending_tsx <= 0) {
985 pjsip_event_sub_destroy(sub);
986 }
987
988}
989
990
991/* This function is called when we receive SUBSCRIBE request message for
992 * a new subscription.
993 */
994static void on_new_subscription( pjsip_transaction *tsx, pjsip_rx_data *rdata )
995{
996 package *pkg;
997 pj_pool_t *pool;
998 pjsip_event_sub *sub = NULL;
999 pj_str_t hname;
1000 int status = 200;
1001 pj_str_t reason = { NULL, 0 };
1002 pjsip_tx_data *tdata;
1003 pjsip_expires_hdr *expires;
1004 pjsip_accept_hdr *accept;
1005 pjsip_event_hdr *evhdr;
1006
1007 /* Get the Event header. */
1008 hname = pj_str("Event");
1009 evhdr = pjsip_msg_find_hdr_by_name(rdata->msg, &hname, NULL);
1010 if (!evhdr) {
1011 status = 400;
1012 reason = pj_str("No Event header in request");
1013 goto send_response;
1014 }
1015
1016 /* Find corresponding package.
1017 * We don't lock the manager's mutex since we assume the package list
1018 * won't change once the application is running!
1019 */
1020 pkg = mgr.pkg_list.next;
1021 while (pkg != &mgr.pkg_list) {
1022 if (pj_stricmp(&pkg->event, &evhdr->event_type) == 0)
1023 break;
1024 pkg = pkg->next;
1025 }
1026
1027 if (pkg == &mgr.pkg_list) {
1028 /* Event type is not supported by any packages! */
1029 status = 489;
1030 reason = pj_str("Bad Event");
1031 goto send_response;
1032 }
1033
1034 /* First check that the Accept specification matches the
1035 * package's Accept types.
1036 */
1037 accept = pjsip_msg_find_hdr(rdata->msg, PJSIP_H_ACCEPT, NULL);
1038 if (accept) {
1039 unsigned i;
1040 pj_str_t *content_type = NULL;
1041
1042 for (i=0; i<accept->count && !content_type; ++i) {
1043 int j;
1044 for (j=0; j<pkg->accept_cnt; ++j) {
1045 if (pj_stricmp(&accept->values[i], &pkg->accept[j])==0) {
1046 content_type = &pkg->accept[j];
1047 break;
1048 }
1049 }
1050 }
1051
1052 if (!content_type) {
1053 status = PJSIP_SC_NOT_ACCEPTABLE_HERE;
1054 goto send_response;
1055 }
1056 }
1057
1058 /* Check whether the package wants to accept the subscription. */
1059 pj_assert(pkg->cb.on_query_subscribe != NULL);
1060 (*pkg->cb.on_query_subscribe)(rdata, &status);
1061 if (!PJSIP_IS_STATUS_IN_CLASS(status,200))
1062 goto send_response;
1063
1064 /* Create new subscription record. */
1065 pool = pjsip_endpt_create_pool(tsx->endpt, "esub",
1066 SUB_POOL_SIZE, SUB_POOL_INC);
1067 if (!pool) {
1068 status = 500;
1069 goto send_response;
1070 }
1071 sub = pj_pool_calloc(pool, 1, sizeof(*sub));
1072 sub->pool = pool;
1073 sub->mutex = pj_mutex_create(pool, "esub", PJ_MUTEX_RECURSE);
1074 if (!sub->mutex) {
1075 status = 500;
1076 goto send_response;
1077 }
1078
1079 PJ_LOG(4,(THIS_FILE, "event_sub%p: notifier is created.", sub));
1080
1081 /* Start locking mutex. */
1082 pj_mutex_lock(sub->mutex);
1083
1084 /* Init UAS subscription */
1085 sub->endpt = tsx->endpt;
1086 sub->role = PJSIP_ROLE_UAS;
1087 sub->state = PJSIP_EVENT_SUB_STATE_PENDING;
1088 sub->state_str = state[sub->state];
1089 pj_list_init(&sub->auth_sess);
1090 pj_list_init(&sub->route_set);
1091 sub->from = pjsip_hdr_clone(pool, rdata->to);
1092 pjsip_fromto_set_from(sub->from);
1093 if (sub->from->tag.slen == 0) {
1094 pj_create_unique_string(pool, &sub->from->tag);
1095 rdata->to->tag = sub->from->tag;
1096 }
1097 sub->to = pjsip_hdr_clone(pool, rdata->from);
1098 pjsip_fromto_set_to(sub->to);
1099 sub->contact = pjsip_contact_hdr_create(pool);
1100 sub->contact->uri = sub->from->uri;
1101 sub->call_id = pjsip_cid_hdr_create(pool);
1102 pj_strdup(pool, &sub->call_id->id, &rdata->call_id);
1103 sub->cseq = pj_rand() % 0xFFFF;
1104
1105 expires = pjsip_msg_find_hdr( rdata->msg, PJSIP_H_EXPIRES, NULL);
1106 if (expires) {
1107 sub->default_interval = expires->ivalue;
1108 if (sub->default_interval > 0 &&
1109 sub->default_interval < SECONDS_BEFORE_EXPIRY)
1110 {
1111 status = 423; /* Interval too short. */
1112 goto send_response;
1113 }
1114 } else {
1115 sub->default_interval = 600;
1116 }
1117
1118 /* Clone Event header. */
1119 sub->event = pjsip_hdr_clone(pool, evhdr);
1120
1121 /* Register to hash table. */
1122 create_subscriber_key(&sub->key, pool, PJSIP_ROLE_UAS, &sub->call_id->id,
1123 &sub->from->tag);
1124 pj_mutex_lock(mgr.mutex);
1125 pj_hash_set(pool, mgr.ht, sub->key.ptr, sub->key.slen, sub);
1126 pj_mutex_unlock(mgr.mutex);
1127
1128 /* Set timer where subscription will expire only when expires<>0.
1129 * Subscriber may send new subscription with expires==0.
1130 */
1131 if (sub->default_interval != 0) {
1132 sub_schedule_uas_expire( sub, sub->default_interval-SECONDS_BEFORE_EXPIRY);
1133 }
1134
1135 /* Notify application. */
1136 if (pkg->cb.on_subscribe) {
1137 pjsip_event_sub_cb *cb = NULL;
1138 sub->pending_tsx++;
1139 (*pkg->cb.on_subscribe)(sub, rdata, &cb, &sub->default_interval);
1140 sub->pending_tsx--;
1141 if (cb == NULL)
1142 pj_memset(&sub->cb, 0, sizeof(*cb));
1143 else
1144 pj_memcpy(&sub->cb, cb, sizeof(*cb));
1145 }
1146
1147
1148send_response:
1149 PJ_LOG(4,(THIS_FILE, "event_sub%p (%s)(UAS): status=%d",
1150 sub, state[sub->state].ptr, status));
1151
1152 tdata = pjsip_endpt_create_response( tsx->endpt, rdata, status);
1153 if (tdata) {
1154 if (reason.slen) {
1155 /* Customize reason text. */
1156 tdata->msg->line.status.reason = reason;
1157 }
1158 if (PJSIP_IS_STATUS_IN_CLASS(status,200)) {
1159 /* Add Expires header. */
1160 pjsip_expires_hdr *hdr;
1161
1162 hdr = pjsip_expires_hdr_create(tdata->pool);
1163 hdr->ivalue = sub->default_interval;
1164 pjsip_msg_add_hdr( tdata->msg, (pjsip_hdr*)hdr );
1165 }
1166 if (status == 423) {
1167 /* Add Min-Expires header. */
1168 pjsip_min_expires_hdr *hdr;
1169
1170 hdr = pjsip_min_expires_hdr_create(tdata->pool);
1171 hdr->ivalue = SECONDS_BEFORE_EXPIRY;
1172 pjsip_msg_add_hdr( tdata->msg, (pjsip_hdr*)hdr);
1173 }
1174 if (status == 489 ||
1175 status==PJSIP_SC_NOT_ACCEPTABLE_HERE ||
1176 PJSIP_IS_STATUS_IN_CLASS(status,200))
1177 {
1178 /* Add Allow-Events header. */
1179 pjsip_hdr *hdr;
1180 hdr = pjsip_hdr_shallow_clone(tdata->pool, mgr.allow_events);
1181 pjsip_msg_add_hdr(tdata->msg, hdr);
1182
1183 /* Should add Accept header?. */
1184 }
1185
1186 pjsip_tsx_on_tx_msg(tsx, tdata);
1187 }
1188
1189 /* If received new subscription with expires=0, terminate. */
1190 if (sub && sub->default_interval == 0) {
1191 pj_assert(sub->state == PJSIP_EVENT_SUB_STATE_TERMINATED);
1192 if (sub->cb.on_sub_terminated) {
1193 pj_str_t reason = { "timeout", 7 };
1194 (*sub->cb.on_sub_terminated)(sub, &reason);
1195 }
1196 }
1197
1198 if (!PJSIP_IS_STATUS_IN_CLASS(status,200) || (sub && sub->delete_flag)) {
1199 if (sub && sub->mutex) {
1200 pjsip_event_sub_destroy(sub);
1201 } else if (sub) {
1202 pjsip_endpt_destroy_pool(tsx->endpt, sub->pool);
1203 }
1204 } else {
1205 pj_assert(status >= 200);
1206 pj_mutex_unlock(sub->mutex);
1207 }
1208}
1209
1210/* This is the main callback when SUBSCRIBE request is received. */
1211static void on_subscribe_request(pjsip_transaction *tsx, pjsip_rx_data *rdata)
1212{
1213 pjsip_event_sub *sub = find_sub(rdata);
1214
1215 if (sub)
1216 on_received_sub_refresh(sub, tsx, rdata);
1217 else
1218 on_new_subscription(tsx, rdata);
1219}
1220
1221
1222/* This callback is called when response to SUBSCRIBE is received. */
1223static void on_subscribe_response(void *token, pjsip_event *event)
1224{
1225 pjsip_event_sub *sub = token;
1226 pjsip_transaction *tsx = event->obj.tsx;
1227 int new_state, old_state = sub->state;
1228
1229 pj_assert(tsx->status_code >= 200);
1230 if (tsx->status_code < 200)
1231 return;
1232
1233 pj_assert(sub->role == PJSIP_ROLE_UAC);
1234
1235 /* Lock mutex. */
1236 pj_mutex_lock(sub->mutex);
1237
1238 /* If request failed with 401/407 error, silently retry the request. */
1239 if (tsx->status_code==401 || tsx->status_code==407) {
1240 pjsip_tx_data *tdata;
1241 tdata = pjsip_auth_reinit_req(sub->endpt,
1242 sub->pool, &sub->auth_sess,
1243 sub->cred_cnt, sub->cred_info,
1244 tsx->last_tx, event->src.rdata );
1245 if (tdata) {
1246 int status;
1247 pjsip_cseq_hdr *cseq;
1248 cseq = pjsip_msg_find_hdr(tdata->msg, PJSIP_H_CSEQ, NULL);
1249 cseq->cseq = sub->cseq++;
1250 status = pjsip_endpt_send_request( sub->endpt, tdata,
1251 -1, sub,
1252 &on_subscribe_response);
1253 if (status == 0) {
1254 pj_mutex_unlock(sub->mutex);
1255 return;
1256 }
1257 }
1258 }
1259
1260 if (PJSIP_IS_STATUS_IN_CLASS(tsx->status_code,200)) {
1261 /* Update To tag. */
1262 if (sub->to->tag.slen == 0)
1263 pj_strdup(sub->pool, &sub->to->tag, &event->src.rdata->to_tag);
1264
1265 new_state = sub->state;
1266
1267 } else if (tsx->status_code == 481) {
1268 new_state = PJSIP_EVENT_SUB_STATE_TERMINATED;
1269
1270 } else if (tsx->status_code >= 300) {
1271 /* RFC 3265 Section 3.1.4.2:
1272 * If a SUBSCRIBE request to refresh a subscription fails
1273 * with a non-481 response, the original subscription is still
1274 * considered valid for the duration of original exires.
1275 *
1276 * Note:
1277 * Since we normally send SUBSCRIBE for refreshing the subscription,
1278 * it means the subscription already expired anyway. So we terminate
1279 * the subscription now.
1280 */
1281 if (sub->state != PJSIP_EVENT_SUB_STATE_ACTIVE) {
1282 new_state = PJSIP_EVENT_SUB_STATE_TERMINATED;
1283 } else {
1284 /* Use this to be compliant with Section 3.1.4.2
1285 new_state = sub->state;
1286 */
1287 new_state = PJSIP_EVENT_SUB_STATE_TERMINATED;
1288 }
1289 } else {
1290 pj_assert(0);
1291 new_state = sub->state;
1292 }
1293
1294 if (new_state != sub->state && sub->state != PJSIP_EVENT_SUB_STATE_TERMINATED) {
1295 sub_set_state(sub, new_state);
1296 }
1297
1298 if (sub->state == PJSIP_EVENT_SUB_STATE_ACTIVE ||
1299 sub->state == PJSIP_EVENT_SUB_STATE_PENDING)
1300 {
1301 /*
1302 * Register timer for next subscription refresh, but only when
1303 * we're not unsubscribing. Also update default_interval and Expires
1304 * header.
1305 */
1306 if (sub->default_interval > 0 && !sub->delete_flag) {
1307 pjsip_expires_hdr *exp = NULL;
1308
1309 /* Could be transaction timeout. */
1310 if (event->src_type == PJSIP_EVENT_RX_MSG) {
1311 exp = pjsip_msg_find_hdr(event->src.rdata->msg,
1312 PJSIP_H_EXPIRES, NULL);
1313 }
1314
1315 if (exp) {
1316 int delay = exp->ivalue;
1317 if (delay > 0) {
1318 pj_time_val new_expiry;
1319 pj_gettimeofday(&new_expiry);
1320 new_expiry.sec += delay;
1321 if (sub->timer.id==0 ||
1322 new_expiry.sec < sub->expiry_time.sec-SECONDS_BEFORE_EXPIRY/2)
1323 {
1324 //if (delay > 0 && delay < sub->default_interval) {
1325 sub->default_interval = delay;
1326 sub->uac_expires->ivalue = delay;
1327 update_next_refresh(sub, delay);
1328 }
1329 }
1330 }
1331 }
1332 }
1333
1334 /* Call callback. */
1335 if (!sub->delete_flag) {
1336 if (sub->cb.on_received_sub_response) {
1337 (*sub->cb.on_received_sub_response)(sub, event);
1338 }
1339 }
1340
1341 /* Notify application if we're terminated. */
1342 if (new_state!=old_state && new_state==PJSIP_EVENT_SUB_STATE_TERMINATED) {
1343 if (sub->cb.on_sub_terminated) {
1344 pj_str_t reason;
1345 if (event->src_type == PJSIP_EVENT_RX_MSG)
1346 reason = event->src.rdata->msg->line.status.reason;
1347 else
1348 reason = *pjsip_get_status_text(tsx->status_code);
1349
1350 (*sub->cb.on_sub_terminated)(sub, &reason);
1351 }
1352 }
1353
1354 /* Decrement pending tsx count. */
1355 --sub->pending_tsx;
1356 pj_assert(sub->pending_tsx >= 0);
1357
1358 if (sub->delete_flag && sub->pending_tsx <= 0) {
1359 pjsip_event_sub_destroy(sub);
1360 } else {
1361 pj_mutex_unlock(sub->mutex);
1362 }
1363
1364 /* DO NOT ACCESS sub FROM NOW ON! IT MIGHT HAVE BEEN DELETED */
1365}
1366
1367/*
1368 * This callback called when we receive incoming NOTIFY request.
1369 */
1370static void on_notify_request(pjsip_transaction *tsx, pjsip_rx_data *rdata)
1371{
1372 pjsip_event_sub *sub;
1373 pjsip_tx_data *tdata;
1374 int status = 200;
1375 int old_state;
1376 pj_str_t reason = { NULL, 0 };
1377 pj_str_t reason_phrase = { NULL, 0 };
1378 int new_state = PJSIP_EVENT_SUB_STATE_NULL;
1379
1380 /* Find subscription based on Call-ID and From tag.
1381 * This will also automatically lock the subscription, if it's found.
1382 */
1383 sub = find_sub(rdata);
1384 if (!sub) {
1385 /* RFC 3265: Section 3.2 Description of NOTIFY Behavior:
1386 * Answer with 481 Subscription does not exist.
1387 */
1388 PJ_LOG(4,(THIS_FILE, "Unable to find subscription for incoming NOTIFY!"));
1389 status = 481;
1390 reason_phrase = pj_str("Subscription does not exist");
1391
1392 } else {
1393 pj_assert(sub->role == PJSIP_ROLE_UAC);
1394 PJ_LOG(4,(THIS_FILE, "event_sub%p (%s): received NOTIFY",
1395 sub, state[sub->state].ptr));
1396
1397 }
1398
1399 new_state = old_state = sub->state;
1400
1401 /* RFC 3265: Section 3.2.1
1402 * Check that the Event header match the subscription.
1403 */
1404 if (status == 200) {
1405 pjsip_event_hdr *hdr;
1406 pj_str_t hname = { "Event", 5 };
1407
1408 hdr = pjsip_msg_find_hdr_by_name(rdata->msg, &hname, NULL);
1409 if (!hdr) {
1410 status = PJSIP_SC_BAD_REQUEST;
1411 reason_phrase = pj_str("No Event header found");
1412 } else if (pj_stricmp(&hdr->event_type, &sub->event->event_type) != 0 ||
1413 pj_stricmp(&hdr->id_param, &sub->event->id_param) != 0)
1414 {
1415 status = 481;
1416 reason_phrase = pj_str("Subscription does not exist");
1417 }
1418 }
1419
1420 /* Update subscription state and timer. */
1421 if (status == 200) {
1422 pjsip_sub_state_hdr *hdr;
1423 const pj_str_t hname = { "Subscription-State", 18 };
1424 const pj_str_t state_active = { "active", 6 },
1425 state_pending = { "pending", 7},
1426 state_terminated = { "terminated", 10 };
1427
1428 hdr = pjsip_msg_find_hdr_by_name( rdata->msg, &hname, NULL);
1429 if (!hdr) {
1430 status = PJSIP_SC_BAD_REQUEST;
1431 reason_phrase = pj_str("No Subscription-State header found");
1432 goto process;
1433 }
1434
1435 /*
1436 * Update subscription state.
1437 */
1438 if (pj_stricmp(&hdr->sub_state, &state_active) == 0) {
1439 if (sub->state != PJSIP_EVENT_SUB_STATE_TERMINATED)
1440 new_state = PJSIP_EVENT_SUB_STATE_ACTIVE;
1441 } else if (pj_stricmp(&hdr->sub_state, &state_pending) == 0) {
1442 if (sub->state != PJSIP_EVENT_SUB_STATE_TERMINATED)
1443 new_state = PJSIP_EVENT_SUB_STATE_PENDING;
1444 } else if (pj_stricmp(&hdr->sub_state, &state_terminated) == 0) {
1445 new_state = PJSIP_EVENT_SUB_STATE_TERMINATED;
1446 } else {
1447 new_state = PJSIP_EVENT_SUB_STATE_UNKNOWN;
1448 }
1449
1450 reason = hdr->reason_param;
1451
1452 if (new_state != sub->state && new_state != PJSIP_EVENT_SUB_STATE_NULL &&
1453 sub->state != PJSIP_EVENT_SUB_STATE_TERMINATED)
1454 {
1455 sub_set_state(sub, new_state);
1456 if (new_state == PJSIP_EVENT_SUB_STATE_UNKNOWN) {
1457 pj_strdup_with_null(sub->pool, &sub->state_str, &hdr->sub_state);
1458 } else {
1459 sub->state_str = state[new_state];
1460 }
1461 }
1462
1463 /*
1464 * Update timeout timer in required, just in case notifier changed the
1465 * expiration to shorter time.
1466 * Section 3.2.2: the expires param can only shorten the interval.
1467 */
1468 if ((sub->state==PJSIP_EVENT_SUB_STATE_ACTIVE ||
1469 sub->state==PJSIP_EVENT_SUB_STATE_PENDING) && hdr->expires_param > 0)
1470 {
1471 pj_time_val now, new_expiry;
1472
1473 pj_gettimeofday(&now);
1474 new_expiry.sec = now.sec + hdr->expires_param;
1475 if (sub->timer.id==0 ||
1476 new_expiry.sec < sub->expiry_time.sec-SECONDS_BEFORE_EXPIRY/2)
1477 {
1478 update_next_refresh(sub, hdr->expires_param);
1479 }
1480 }
1481 }
1482
1483process:
1484 /* Note: here we sub MAY BE NULL! */
1485
1486 /* Send response to NOTIFY */
1487 tdata = pjsip_endpt_create_response( tsx->endpt, rdata, status );
1488 if (tdata) {
1489 if (reason_phrase.slen)
1490 tdata->msg->line.status.reason = reason_phrase;
1491
1492 if (PJSIP_IS_STATUS_IN_CLASS(status,200)) {
1493 pjsip_hdr *hdr;
1494 hdr = pjsip_hdr_shallow_clone(tdata->pool, mgr.allow_events);
1495 pjsip_msg_add_hdr( tdata->msg, hdr);
1496 }
1497
1498 pjsip_tsx_on_tx_msg(tsx, tdata);
1499 }
1500
1501 /* Call NOTIFY callback, if any. */
1502 if (sub && PJSIP_IS_STATUS_IN_CLASS(status,200) && sub->cb.on_received_notify) {
1503 sub->pending_tsx++;
1504 (*sub->cb.on_received_notify)(sub, rdata);
1505 sub->pending_tsx--;
1506 }
1507
1508 /* Check if subscription is terminated and call callback. */
1509 if (sub && new_state!=old_state && new_state==PJSIP_EVENT_SUB_STATE_TERMINATED) {
1510 if (sub->cb.on_sub_terminated) {
1511 sub->pending_tsx++;
1512 (*sub->cb.on_sub_terminated)(sub, &reason);
1513 sub->pending_tsx--;
1514 }
1515 }
1516
1517 /* Check if application has requested deletion. */
1518 if (sub && sub->delete_flag && sub->pending_tsx <= 0) {
1519 pjsip_event_sub_destroy(sub);
1520 } else if (sub) {
1521 pj_mutex_unlock(sub->mutex);
1522 }
1523}
1524
1525/* This callback is called when we received NOTIFY response. */
1526static void on_notify_response(void *token, pjsip_event *event)
1527{
1528 pjsip_event_sub *sub = token;
1529 pjsip_event_sub_state old_state = sub->state;
1530 pjsip_transaction *tsx = event->obj.tsx;
1531
1532 /* Lock the subscription. */
1533 pj_mutex_lock(sub->mutex);
1534
1535 pj_assert(sub->role == PJSIP_ROLE_UAS);
1536
1537 /* If request failed with authorization failure, silently retry. */
1538 if (tsx->status_code==401 || tsx->status_code==407) {
1539 pjsip_tx_data *tdata;
1540 tdata = pjsip_auth_reinit_req(sub->endpt,
1541 sub->pool, &sub->auth_sess,
1542 sub->cred_cnt, sub->cred_info,
1543 tsx->last_tx, event->src.rdata );
1544 if (tdata) {
1545 int status;
1546 pjsip_cseq_hdr *cseq;
1547 cseq = pjsip_msg_find_hdr(tdata->msg, PJSIP_H_CSEQ, NULL);
1548 cseq->cseq = sub->cseq++;
1549 status = pjsip_endpt_send_request( sub->endpt, tdata,
1550 -1, sub,
1551 &on_notify_response);
1552 if (status == 0) {
1553 pj_mutex_unlock(sub->mutex);
1554 return;
1555 }
1556 }
1557 }
1558
1559 /* Notify application. */
1560 if (sub->cb.on_received_notify_response)
1561 (*sub->cb.on_received_notify_response)(sub, event);
1562
1563 /* Check for response 481. */
1564 if (event->obj.tsx->status_code == 481) {
1565 /* Remote says that the subscription does not exist!
1566 * Terminate subscription!
1567 */
1568 sub_set_state(sub, PJSIP_EVENT_SUB_STATE_TERMINATED);
1569 if (sub->timer.id) {
1570 pjsip_endpt_cancel_timer(sub->endpt, &sub->timer);
1571 sub->timer.id = 0;
1572 }
1573
1574 PJ_LOG(4, (THIS_FILE,
1575 "event_sub%p (%s): got 481 response to NOTIFY. Terminating...",
1576 sub, state[sub->state].ptr));
1577
1578 /* Notify app. */
1579 if (sub->state!=old_state && sub->cb.on_sub_terminated)
1580 (*sub->cb.on_sub_terminated)(sub, &event->src.rdata->msg->line.status.reason);
1581 }
1582
1583 /* Decrement pending transaction count. */
1584 --sub->pending_tsx;
1585 pj_assert(sub->pending_tsx >= 0);
1586
1587 /* Check that the subscription is marked for deletion. */
1588 if (sub->delete_flag && sub->pending_tsx <= 0) {
1589 pjsip_event_sub_destroy(sub);
1590 } else {
1591 pj_mutex_unlock(sub->mutex);
1592 }
1593
1594 /* DO NOT ACCESS sub, IT MIGHT HAVE BEEN DESTROYED! */
1595}
1596
1597
1598/* This is the transaction handler for incoming SUBSCRIBE and NOTIFY
1599 * requests.
1600 */
1601static void tsx_handler( struct pjsip_module *mod, pjsip_event *event )
1602{
1603 pjsip_msg *msg;
1604 pjsip_rx_data *rdata;
1605
1606 /* Only want incoming message events. */
1607 if (event->src_type != PJSIP_EVENT_RX_MSG)
1608 return;
1609
1610 rdata = event->src.rdata;
1611 msg = rdata->msg;
1612
1613 /* Only want to process request messages. */
1614 if (msg->type != PJSIP_REQUEST_MSG)
1615 return;
1616
1617 /* Only want the first notification. */
1618 if (event->obj.tsx && event->obj.tsx->status_code >= 100)
1619 return;
1620
1621 if (pjsip_method_cmp(&msg->line.req.method, &SUBSCRIBE)==0) {
1622 /* Process incoming SUBSCRIBE request. */
1623 on_subscribe_request( event->obj.tsx, rdata );
1624 } else if (pjsip_method_cmp(&msg->line.req.method, &NOTIFY)==0) {
1625 /* Process incoming NOTIFY request. */
1626 on_notify_request( event->obj.tsx, rdata );
1627 }
1628}
1629