blob: 7f9e56b3926eb4d2997a5498be73b78d69f5d752 [file] [log] [blame]
/* $Id$ */
/*
* Copyright (C) 2008-2009 Teluu Inc. (http://www.teluu.com)
* Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <pjmedia/stream.h>
#include <pjmedia/errno.h>
#include <pjmedia/rtp.h>
#include <pjmedia/rtcp.h>
#include <pjmedia/jbuf.h>
#include <pj/array.h>
#include <pj/assert.h>
#include <pj/ctype.h>
#include <pj/compat/socket.h>
#include <pj/errno.h>
#include <pj/ioqueue.h>
#include <pj/log.h>
#include <pj/os.h>
#include <pj/pool.h>
#include <pj/rand.h>
#include <pj/sock_select.h>
#include <pj/string.h> /* memcpy() */
#define THIS_FILE "stream.c"
#define ERRLEVEL 1
#define LOGERR_(expr) stream_perror expr
#define TRC_(expr) PJ_LOG(5,expr)
#define BYTES_PER_SAMPLE 2
/* Limit the number of synthetic audio samples that are generated by PLC.
* Normally PLC should have it's own means to limit the number of
* synthetic frames, so we need to set this to a reasonably large value
* just as precaution
*/
#define MAX_PLC_MSEC PJMEDIA_MAX_PLC_DURATION_MSEC
/* Tracing jitter buffer operations in a stream session to a CSV file.
* The trace will contain JB operation timestamp, frame info, RTP info, and
* the JB state right after the operation.
*/
#define TRACE_JB 0 /* Enable/disable trace. */
#define TRACE_JB_PATH_PREFIX "" /* Optional path/prefix
for the CSV filename. */
#if TRACE_JB
# include <pj/file_io.h>
# define TRACE_JB_INVALID_FD ((pj_oshandle_t)-1)
# define TRACE_JB_OPENED(s) (s->trace_jb_fd != TRACE_JB_INVALID_FD)
#endif
/**
* Media channel.
*/
struct pjmedia_channel
{
pjmedia_stream *stream; /**< Parent stream. */
pjmedia_dir dir; /**< Channel direction. */
unsigned pt; /**< Payload type. */
pj_bool_t paused; /**< Paused?. */
unsigned out_pkt_size; /**< Size of output buffer. */
void *out_pkt; /**< Output buffer. */
pjmedia_rtp_session rtp; /**< RTP session. */
};
struct dtmf
{
int event;
pj_uint32_t duration;
};
/**
* This structure describes media stream.
* A media stream is bidirectional media transmission between two endpoints.
* It consists of two channels, i.e. encoding and decoding channels.
* A media stream corresponds to a single "m=" line in a SDP session
* description.
*/
struct pjmedia_stream
{
pjmedia_endpt *endpt; /**< Media endpoint. */
pjmedia_codec_mgr *codec_mgr; /**< Codec manager instance. */
pjmedia_port port; /**< Port interface. */
pjmedia_channel *enc; /**< Encoding channel. */
pjmedia_channel *dec; /**< Decoding channel. */
pjmedia_dir dir; /**< Stream direction. */
void *user_data; /**< User data. */
pj_str_t cname; /**< SDES CNAME */
pjmedia_transport *transport; /**< Stream transport. */
pjmedia_codec *codec; /**< Codec instance being used. */
pjmedia_codec_param codec_param; /**< Codec param. */
pj_int16_t *enc_buf; /**< Encoding buffer, when enc's
ptime is different than dec.
Otherwise it's NULL. */
unsigned enc_samples_per_pkt;
unsigned enc_buf_size; /**< Encoding buffer size, in
samples. */
unsigned enc_buf_pos; /**< First position in buf. */
unsigned enc_buf_count; /**< Number of samples in the
encoding buffer. */
unsigned plc_cnt; /**< # of consecutive PLC frames*/
unsigned max_plc_cnt; /**< Max # of PLC frames */
unsigned vad_enabled; /**< VAD enabled in param. */
unsigned frame_size; /**< Size of encoded base frame.*/
pj_bool_t is_streaming; /**< Currently streaming?. This
is used to put RTP marker
bit. */
pj_uint32_t ts_vad_disabled;/**< TS when VAD was disabled. */
pj_uint32_t tx_duration; /**< TX duration in timestamp. */
pj_mutex_t *jb_mutex;
pjmedia_jbuf *jb; /**< Jitter buffer. */
char jb_last_frm; /**< Last frame type from jb */
unsigned jb_last_frm_cnt;/**< Last JB frame type counter*/
pjmedia_rtcp_session rtcp; /**< RTCP for incoming RTP. */
pj_uint32_t rtcp_last_tx; /**< RTCP tx time in timestamp */
pj_uint32_t rtcp_interval; /**< Interval, in timestamp. */
pj_bool_t initial_rr; /**< Initial RTCP RR sent */
/* RFC 2833 DTMF transmission queue: */
int tx_event_pt; /**< Outgoing pt for dtmf. */
int tx_dtmf_count; /**< # of digits in tx dtmf buf.*/
struct dtmf tx_dtmf_buf[32];/**< Outgoing dtmf queue. */
/* Incoming DTMF: */
int rx_event_pt; /**< Incoming pt for dtmf. */
int last_dtmf; /**< Current digit, or -1. */
pj_uint32_t last_dtmf_dur; /**< Start ts for cur digit. */
unsigned rx_dtmf_count; /**< # of digits in dtmf rx buf.*/
char rx_dtmf_buf[32];/**< Incoming DTMF buffer. */
/* DTMF callback */
void (*dtmf_cb)(pjmedia_stream*, void*, int);
void *dtmf_cb_user_data;
#if defined(PJMEDIA_HANDLE_G722_MPEG_BUG) && (PJMEDIA_HANDLE_G722_MPEG_BUG!=0)
/* Enable support to handle codecs with inconsistent clock rate
* between clock rate in SDP/RTP & the clock rate that is actually used.
* This happens for example with G.722 and MPEG audio codecs.
*/
pj_bool_t has_g722_mpeg_bug;
/**< Flag to specify whether
normalization process
is needed */
unsigned rtp_tx_ts_len_per_pkt;
/**< Normalized ts length per packet
transmitted according to
'erroneous' definition */
unsigned rtp_rx_ts_len_per_frame;
/**< Normalized ts length per frame
received according to
'erroneous' definition */
pj_uint32_t rtp_rx_last_ts;/**< Last received RTP timestamp
for timestamp checking */
unsigned rtp_rx_last_cnt;/**< Nb of frames in last pkt */
unsigned rtp_rx_check_cnt;
/**< Counter of remote timestamp
checking */
#endif
#if defined(PJMEDIA_HAS_RTCP_XR) && (PJMEDIA_HAS_RTCP_XR != 0)
pj_uint32_t rtcp_xr_last_tx; /**< RTCP XR tx time
in timestamp. */
pj_uint32_t rtcp_xr_interval; /**< Interval, in timestamp. */
pj_sockaddr rtcp_xr_dest; /**< Additional remote RTCP XR
dest. If sin_family is
zero, it will be ignored*/
unsigned rtcp_xr_dest_len; /**< Length of RTCP XR dest
address */
#endif
#if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA!=0
pj_bool_t use_ka; /**< Stream keep-alive with non-
codec-VAD mechanism is
enabled? */
pj_timestamp last_frm_ts_sent; /**< Timestamp of last sending
packet */
#endif
#if TRACE_JB
pj_oshandle_t trace_jb_fd; /**< Jitter tracing file handle.*/
char *trace_jb_buf; /**< Jitter tracing buffer. */
#endif
};
/* RFC 2833 digit */
static const char digitmap[16] = { '0', '1', '2', '3',
'4', '5', '6', '7',
'8', '9', '*', '#',
'A', 'B', 'C', 'D'};
/* Zero audio frame samples */
static pj_int16_t zero_frame[2 * 30 * 16000 / 1000];
/*
* Print error.
*/
static void stream_perror(const char *sender, const char *title,
pj_status_t status)
{
char errmsg[PJ_ERR_MSG_SIZE];
pj_strerror(status, errmsg, sizeof(errmsg));
PJ_LOG(4,(sender, "%s: %s [err:%d]", title, errmsg, status));
}
#if TRACE_JB
PJ_INLINE(int) trace_jb_print_timestamp(char **buf, pj_ssize_t len)
{
pj_time_val now;
pj_parsed_time ptime;
char *p = *buf;
if (len < 14)
return -1;
pj_gettimeofday(&now);
pj_time_decode(&now, &ptime);
p += pj_utoa_pad(ptime.hour, p, 2, '0');
*p++ = ':';
p += pj_utoa_pad(ptime.min, p, 2, '0');
*p++ = ':';
p += pj_utoa_pad(ptime.sec, p, 2, '0');
*p++ = '.';
p += pj_utoa_pad(ptime.msec, p, 3, '0');
*p++ = ',';
*buf = p;
return 0;
}
PJ_INLINE(int) trace_jb_print_state(pjmedia_stream *stream,
char **buf, pj_ssize_t len)
{
char *p = *buf;
char *endp = *buf + len;
pjmedia_jb_state state;
pjmedia_jbuf_get_state(stream->jb, &state);
len = pj_ansi_snprintf(p, endp-p, "%d, %d, %d",
state.size, state.burst, state.prefetch);
if ((len < 0) || (len >= endp-p))
return -1;
p += len;
*buf = p;
return 0;
}
static void trace_jb_get(pjmedia_stream *stream, pjmedia_jb_frame_type ft,
pj_size_t fsize)
{
char *p = stream->trace_jb_buf;
char *endp = stream->trace_jb_buf + PJ_LOG_MAX_SIZE;
pj_ssize_t len = 0;
const char* ft_st;
if (!TRACE_JB_OPENED(stream))
return;
/* Print timestamp. */
if (trace_jb_print_timestamp(&p, endp-p))
goto on_insuff_buffer;
/* Print frame type and size */
switch(ft) {
case PJMEDIA_JB_MISSING_FRAME:
ft_st = "missing";
break;
case PJMEDIA_JB_NORMAL_FRAME:
ft_st = "normal";
break;
case PJMEDIA_JB_ZERO_PREFETCH_FRAME:
ft_st = "prefetch";
break;
case PJMEDIA_JB_ZERO_EMPTY_FRAME:
ft_st = "empty";
break;
default:
ft_st = "unknown";
break;
}
/* Print operation, size, frame count, frame type */
len = pj_ansi_snprintf(p, endp-p, "GET,%d,1,%s,,,,", fsize, ft_st);
if ((len < 0) || (len >= endp-p))
goto on_insuff_buffer;
p += len;
/* Print JB state */
if (trace_jb_print_state(stream, &p, endp-p))
goto on_insuff_buffer;
/* Print end of line */
if (endp-p < 2)
goto on_insuff_buffer;
*p++ = '\n';
/* Write and flush */
len = p - stream->trace_jb_buf;
pj_file_write(stream->trace_jb_fd, stream->trace_jb_buf, &len);
pj_file_flush(stream->trace_jb_fd);
return;
on_insuff_buffer:
pj_assert(!"Trace buffer too small, check PJ_LOG_MAX_SIZE!");
}
static void trace_jb_put(pjmedia_stream *stream, const pjmedia_rtp_hdr *hdr,
unsigned payloadlen, unsigned frame_cnt)
{
char *p = stream->trace_jb_buf;
char *endp = stream->trace_jb_buf + PJ_LOG_MAX_SIZE;
pj_ssize_t len = 0;
if (!TRACE_JB_OPENED(stream))
return;
/* Print timestamp. */
if (trace_jb_print_timestamp(&p, endp-p))
goto on_insuff_buffer;
/* Print operation, size, frame count, RTP info */
len = pj_ansi_snprintf(p, endp-p,
"PUT,%d,%d,,%d,%d,%d,",
payloadlen, frame_cnt,
pj_ntohs(hdr->seq), pj_ntohl(hdr->ts), hdr->m);
if ((len < 0) || (len >= endp-p))
goto on_insuff_buffer;
p += len;
/* Print JB state */
if (trace_jb_print_state(stream, &p, endp-p))
goto on_insuff_buffer;
/* Print end of line */
if (endp-p < 2)
goto on_insuff_buffer;
*p++ = '\n';
/* Write and flush */
len = p - stream->trace_jb_buf;
pj_file_write(stream->trace_jb_fd, stream->trace_jb_buf, &len);
pj_file_flush(stream->trace_jb_fd);
return;
on_insuff_buffer:
pj_assert(!"Trace buffer too small, check PJ_LOG_MAX_SIZE!");
}
#endif /* TRACE_JB */
#if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA != 0
/*
* Send keep-alive packet using non-codec frame.
*/
static void send_keep_alive_packet(pjmedia_stream *stream)
{
#if PJMEDIA_STREAM_ENABLE_KA == PJMEDIA_STREAM_KA_EMPTY_RTP
/* Keep-alive packet is empty RTP */
pj_status_t status;
void *pkt;
int pkt_len;
TRC_((stream->port.info.name.ptr,
"Sending keep-alive (RTCP and empty RTP)"));
/* Send RTP */
status = pjmedia_rtp_encode_rtp( &stream->enc->rtp,
stream->enc->pt, 0,
1,
0,
(const void**)&pkt,
&pkt_len);
pj_assert(status == PJ_SUCCESS);
pj_memcpy(stream->enc->out_pkt, pkt, pkt_len);
pjmedia_transport_send_rtp(stream->transport, stream->enc->out_pkt,
pkt_len);
/* Send RTCP */
pjmedia_rtcp_build_rtcp(&stream->rtcp, &pkt, &pkt_len);
pjmedia_transport_send_rtcp(stream->transport, pkt, pkt_len);
#elif PJMEDIA_STREAM_ENABLE_KA == PJMEDIA_STREAM_KA_USER
/* Keep-alive packet is defined in PJMEDIA_STREAM_KA_USER_PKT */
int pkt_len;
const pj_str_t str_ka = PJMEDIA_STREAM_KA_USER_PKT;
TRC_((stream->port.info.name.ptr,
"Sending keep-alive (custom RTP/RTCP packets)"));
/* Send to RTP port */
pj_memcpy(stream->enc->out_pkt, str_ka.ptr, str_ka.slen);
pkt_len = str_ka.slen;
pjmedia_transport_send_rtp(stream->transport, stream->enc->out_pkt,
pkt_len);
/* Send to RTCP port */
pjmedia_transport_send_rtcp(stream->transport, stream->enc->out_pkt,
pkt_len);
#else
PJ_UNUSED_ARG(stream);
#endif
}
#endif /* defined(PJMEDIA_STREAM_ENABLE_KA) */
/*
* play_callback()
*
* This callback is called by sound device's player thread when it
* needs to feed the player with some frames.
*/
static pj_status_t get_frame( pjmedia_port *port, pjmedia_frame *frame)
{
pjmedia_stream *stream = (pjmedia_stream*) port->port_data.pdata;
pjmedia_channel *channel = stream->dec;
unsigned samples_count, samples_per_frame, samples_required;
pj_int16_t *p_out_samp;
pj_status_t status;
/* Return no frame is channel is paused */
if (channel->paused) {
frame->type = PJMEDIA_FRAME_TYPE_NONE;
return PJ_SUCCESS;
}
/* Repeat get frame from the jitter buffer and decode the frame
* until we have enough frames according to codec's ptime.
*/
/* Lock jitter buffer mutex first */
pj_mutex_lock( stream->jb_mutex );
samples_required = stream->port.info.samples_per_frame;
samples_per_frame = stream->codec_param.info.frm_ptime *
stream->codec_param.info.clock_rate *
stream->codec_param.info.channel_cnt /
1000;
p_out_samp = (pj_int16_t*) frame->buf;
for (samples_count=0; samples_count < samples_required;
samples_count += samples_per_frame)
{
char frame_type;
pj_size_t frame_size;
pj_uint32_t bit_info;
/* Get frame from jitter buffer. */
pjmedia_jbuf_get_frame2(stream->jb, channel->out_pkt, &frame_size,
&frame_type, &bit_info);
#if TRACE_JB
trace_jb_get(stream, frame_type, frame_size);
#endif
if (frame_type == PJMEDIA_JB_MISSING_FRAME) {
/* Activate PLC */
if (stream->codec->op->recover &&
stream->codec_param.setting.plc &&
stream->plc_cnt < stream->max_plc_cnt)
{
pjmedia_frame frame_out;
frame_out.buf = p_out_samp + samples_count;
frame_out.size = frame->size - samples_count*2;
status = (*stream->codec->op->recover)(stream->codec,
frame_out.size,
&frame_out);
++stream->plc_cnt;
} else {
status = -1;
}
if (status != PJ_SUCCESS) {
/* Either PLC failed or PLC not supported/enabled */
pjmedia_zero_samples(p_out_samp + samples_count,
samples_required - samples_count);
}
if (frame_type != stream->jb_last_frm) {
/* Report changing frame type event */
PJ_LOG(5,(stream->port.info.name.ptr, "Frame lost%s!",
(status == PJ_SUCCESS? ", recovered":"")));
stream->jb_last_frm = frame_type;
stream->jb_last_frm_cnt = 1;
} else {
stream->jb_last_frm_cnt++;
}
} else if (frame_type == PJMEDIA_JB_ZERO_EMPTY_FRAME) {
const char *with_plc = "";
/* Jitter buffer is empty. If this is the first "empty" state,
* activate PLC to smoothen the fade-out, otherwise zero
* the frame.
*/
//Using this "if" will only invoke PLC for the first packet
//lost and not the subsequent ones.
//if (frame_type != stream->jb_last_frm) {
if (1) {
/* Activate PLC to smoothen the missing frame */
if (stream->codec->op->recover &&
stream->codec_param.setting.plc &&
stream->plc_cnt < stream->max_plc_cnt)
{
pjmedia_frame frame_out;
do {
frame_out.buf = p_out_samp + samples_count;
frame_out.size = frame->size - samples_count*2;
status = (*stream->codec->op->recover)(stream->codec,
frame_out.size,
&frame_out);
if (status != PJ_SUCCESS)
break;
samples_count += samples_per_frame;
++stream->plc_cnt;
} while (samples_count < samples_required &&
stream->plc_cnt < stream->max_plc_cnt);
with_plc = ", plc invoked";
}
}
if (samples_count < samples_required) {
pjmedia_zero_samples(p_out_samp + samples_count,
samples_required - samples_count);
samples_count = samples_required;
}
if (stream->jb_last_frm != frame_type) {
pjmedia_jb_state jb_state;
/* Report changing frame type event */
pjmedia_jbuf_get_state(stream->jb, &jb_state);
PJ_LOG(5,(stream->port.info.name.ptr,
"Jitter buffer empty (prefetch=%d)%s",
jb_state.prefetch, with_plc));
stream->jb_last_frm = frame_type;
stream->jb_last_frm_cnt = 1;
} else {
stream->jb_last_frm_cnt++;
}
break;
} else if (frame_type != PJMEDIA_JB_NORMAL_FRAME) {
const char *with_plc = "";
/* It can only be PJMEDIA_JB_ZERO_PREFETCH frame */
pj_assert(frame_type == PJMEDIA_JB_ZERO_PREFETCH_FRAME);
/* Always activate PLC when it's available.. */
if (stream->codec->op->recover &&
stream->codec_param.setting.plc &&
stream->plc_cnt < stream->max_plc_cnt)
{
pjmedia_frame frame_out;
do {
frame_out.buf = p_out_samp + samples_count;
frame_out.size = frame->size - samples_count*2;
status = (*stream->codec->op->recover)(stream->codec,
frame_out.size,
&frame_out);
if (status != PJ_SUCCESS)
break;
samples_count += samples_per_frame;
++stream->plc_cnt;
} while (samples_count < samples_required &&
stream->plc_cnt < stream->max_plc_cnt);
with_plc = ", plc invoked";
}
if (samples_count < samples_required) {
pjmedia_zero_samples(p_out_samp + samples_count,
samples_required - samples_count);
samples_count = samples_required;
}
if (stream->jb_last_frm != frame_type) {
pjmedia_jb_state jb_state;
/* Report changing frame type event */
pjmedia_jbuf_get_state(stream->jb, &jb_state);
PJ_LOG(5,(stream->port.info.name.ptr,
"Jitter buffer is bufferring (prefetch=%d)%s",
jb_state.prefetch, with_plc));
stream->jb_last_frm = frame_type;
stream->jb_last_frm_cnt = 1;
} else {
stream->jb_last_frm_cnt++;
}
break;
} else {
/* Got "NORMAL" frame from jitter buffer */
pjmedia_frame frame_in, frame_out;
stream->plc_cnt = 0;
/* Decode */
frame_in.buf = channel->out_pkt;
frame_in.size = frame_size;
frame_in.bit_info = bit_info;
frame_in.type = PJMEDIA_FRAME_TYPE_AUDIO; /* ignored */
frame_out.buf = p_out_samp + samples_count;
frame_out.size = frame->size - samples_count*BYTES_PER_SAMPLE;
status = stream->codec->op->decode( stream->codec, &frame_in,
frame_out.size, &frame_out);
if (status != 0) {
LOGERR_((port->info.name.ptr, "codec decode() error",
status));
pjmedia_zero_samples(p_out_samp + samples_count,
samples_per_frame);
}
if (stream->jb_last_frm != frame_type) {
/* Report changing frame type event */
PJ_LOG(5,(stream->port.info.name.ptr,
"Jitter buffer starts returning normal frames "
"(after %d empty/lost)",
stream->jb_last_frm_cnt, stream->jb_last_frm));
stream->jb_last_frm = frame_type;
stream->jb_last_frm_cnt = 1;
} else {
stream->jb_last_frm_cnt++;
}
}
}
/* Unlock jitter buffer mutex. */
pj_mutex_unlock( stream->jb_mutex );
/* Return PJMEDIA_FRAME_TYPE_NONE if we have no frames at all
* (it can happen when jitter buffer returns PJMEDIA_JB_ZERO_EMPTY_FRAME).
*/
if (samples_count == 0) {
frame->type = PJMEDIA_FRAME_TYPE_NONE;
frame->size = 0;
} else {
frame->type = PJMEDIA_FRAME_TYPE_AUDIO;
frame->size = samples_count * BYTES_PER_SAMPLE;
frame->timestamp.u64 = 0;
}
return PJ_SUCCESS;
}
/* The other version of get_frame callback used when stream port format
* is non linear PCM.
*/
static pj_status_t get_frame_ext( pjmedia_port *port, pjmedia_frame *frame)
{
pjmedia_stream *stream = (pjmedia_stream*) port->port_data.pdata;
pjmedia_channel *channel = stream->dec;
pjmedia_frame_ext *f = (pjmedia_frame_ext*)frame;
unsigned samples_per_frame, samples_required;
pj_status_t status;
/* Return no frame if channel is paused */
if (channel->paused) {
frame->type = PJMEDIA_FRAME_TYPE_NONE;
return PJ_SUCCESS;
}
/* Repeat get frame from the jitter buffer and decode the frame
* until we have enough frames according to codec's ptime.
*/
samples_required = stream->port.info.samples_per_frame;
samples_per_frame = stream->codec_param.info.frm_ptime *
stream->codec_param.info.clock_rate *
stream->codec_param.info.channel_cnt /
1000;
pj_bzero(f, sizeof(pjmedia_frame_ext));
f->base.type = PJMEDIA_FRAME_TYPE_EXTENDED;
while (f->samples_cnt < samples_required) {
char frame_type;
pj_size_t frame_size;
pj_uint32_t bit_info;
/* Lock jitter buffer mutex first */
pj_mutex_lock( stream->jb_mutex );
/* Get frame from jitter buffer. */
pjmedia_jbuf_get_frame2(stream->jb, channel->out_pkt, &frame_size,
&frame_type, &bit_info);
#if TRACE_JB
trace_jb_get(stream, frame_type, frame_size);
#endif
/* Unlock jitter buffer mutex. */
pj_mutex_unlock( stream->jb_mutex );
if (frame_type == PJMEDIA_JB_NORMAL_FRAME) {
/* Got "NORMAL" frame from jitter buffer */
pjmedia_frame frame_in;
/* Decode */
frame_in.buf = channel->out_pkt;
frame_in.size = frame_size;
frame_in.bit_info = bit_info;
frame_in.type = PJMEDIA_FRAME_TYPE_AUDIO;
status = stream->codec->op->decode( stream->codec, &frame_in,
0, frame);
if (status != PJ_SUCCESS) {
LOGERR_((port->info.name.ptr, "codec decode() error",
status));
pjmedia_frame_ext_append_subframe(f, NULL, 0,
(pj_uint16_t)samples_per_frame);
}
if (stream->jb_last_frm != frame_type) {
/* Report changing frame type event */
PJ_LOG(5,(stream->port.info.name.ptr,
"Jitter buffer starts returning normal frames "
"(after %d empty/lost)",
stream->jb_last_frm_cnt, stream->jb_last_frm));
stream->jb_last_frm = frame_type;
stream->jb_last_frm_cnt = 1;
} else {
stream->jb_last_frm_cnt++;
}
} else {
/* Try to generate frame by invoking PLC (when any) */
status = PJ_SUCCESS;
if (stream->codec->op->recover) {
status = (*stream->codec->op->recover)(stream->codec,
0, frame);
}
/* No PLC or PLC failed */
if (!stream->codec->op->recover || status != PJ_SUCCESS) {
pjmedia_frame_ext_append_subframe(f, NULL, 0,
(pj_uint16_t)samples_per_frame);
}
if (frame_type == PJMEDIA_JB_MISSING_FRAME) {
if (frame_type != stream->jb_last_frm) {
/* Report changing frame type event */
PJ_LOG(5,(stream->port.info.name.ptr, "Frame lost!"));
stream->jb_last_frm = frame_type;
stream->jb_last_frm_cnt = 1;
} else {
stream->jb_last_frm_cnt++;
}
} else if (frame_type == PJMEDIA_JB_ZERO_EMPTY_FRAME) {
if (frame_type != stream->jb_last_frm) {
pjmedia_jb_state jb_state;
/* Report changing frame type event */
pjmedia_jbuf_get_state(stream->jb, &jb_state);
PJ_LOG(5,(stream->port.info.name.ptr,
"Jitter buffer empty (prefetch=%d)",
jb_state.prefetch));
stream->jb_last_frm = frame_type;
stream->jb_last_frm_cnt = 1;
} else {
stream->jb_last_frm_cnt++;
}
} else {
/* It can only be PJMEDIA_JB_ZERO_PREFETCH frame */
pj_assert(frame_type == PJMEDIA_JB_ZERO_PREFETCH_FRAME);
if (stream->jb_last_frm != frame_type) {
pjmedia_jb_state jb_state;
/* Report changing frame type event */
pjmedia_jbuf_get_state(stream->jb, &jb_state);
PJ_LOG(5,(stream->port.info.name.ptr,
"Jitter buffer is bufferring (prefetch=%d)",
jb_state.prefetch));
stream->jb_last_frm = frame_type;
stream->jb_last_frm_cnt = 1;
} else {
stream->jb_last_frm_cnt++;
}
}
}
}
return PJ_SUCCESS;
}
/*
* Transmit DTMF
*/
static void create_dtmf_payload(pjmedia_stream *stream,
struct pjmedia_frame *frame_out,
int *first, int *last)
{
pjmedia_rtp_dtmf_event *event;
struct dtmf *digit = &stream->tx_dtmf_buf[0];
pj_uint32_t cur_ts;
pj_assert(sizeof(pjmedia_rtp_dtmf_event) == 4);
*first = *last = 0;
event = (pjmedia_rtp_dtmf_event*) frame_out->buf;
cur_ts = pj_ntohl(stream->enc->rtp.out_hdr.ts);
if (digit->duration == 0) {
PJ_LOG(5,(stream->port.info.name.ptr, "Sending DTMF digit id %c",
digitmap[digit->event]));
*first = 1;
}
digit->duration += stream->port.info.samples_per_frame;
event->event = (pj_uint8_t)digit->event;
event->e_vol = 10;
event->duration = pj_htons((pj_uint16_t)digit->duration);
if (digit->duration >= PJMEDIA_DTMF_DURATION) {
event->e_vol |= 0x80;
*last = 1;
/* Prepare next digit. */
pj_mutex_lock(stream->jb_mutex);
pj_array_erase(stream->tx_dtmf_buf, sizeof(stream->tx_dtmf_buf[0]),
stream->tx_dtmf_count, 0);
--stream->tx_dtmf_count;
pj_mutex_unlock(stream->jb_mutex);
}
frame_out->size = 4;
}
/**
* check_tx_rtcp()
*
* This function is can be called by either put_frame() or get_frame(),
* to transmit periodic RTCP SR/RR report.
*/
static void check_tx_rtcp(pjmedia_stream *stream, pj_uint32_t timestamp)
{
/* Note that timestamp may represent local or remote timestamp,
* depending on whether this function is called from put_frame()
* or get_frame().
*/
if (stream->rtcp_last_tx == 0) {
stream->rtcp_last_tx = timestamp;
} else if (timestamp - stream->rtcp_last_tx >= stream->rtcp_interval) {
void *rtcp_pkt;
int len;
pjmedia_rtcp_build_rtcp(&stream->rtcp, &rtcp_pkt, &len);
pjmedia_transport_send_rtcp(stream->transport, rtcp_pkt, len);
stream->rtcp_last_tx = timestamp;
}
#if defined(PJMEDIA_HAS_RTCP_XR) && (PJMEDIA_HAS_RTCP_XR != 0)
if (stream->rtcp.xr_enabled) {
if (stream->rtcp_xr_last_tx == 0) {
stream->rtcp_xr_last_tx = timestamp;
} else if (timestamp - stream->rtcp_xr_last_tx >=
stream->rtcp_xr_interval)
{
int i;
pjmedia_jb_state jb_state;
void *rtcp_pkt;
int len;
/* Update RTCP XR with current JB states */
pjmedia_jbuf_get_state(stream->jb, &jb_state);
i = jb_state.avg_delay;
pjmedia_rtcp_xr_update_info(&stream->rtcp.xr_session,
PJMEDIA_RTCP_XR_INFO_JB_NOM,
i);
i = jb_state.max_delay;
pjmedia_rtcp_xr_update_info(&stream->rtcp.xr_session,
PJMEDIA_RTCP_XR_INFO_JB_MAX,
i);
/* Build RTCP XR packet */
pjmedia_rtcp_build_rtcp_xr(&stream->rtcp.xr_session, 0,
&rtcp_pkt, &len);
/* Send the RTCP XR to remote address */
pjmedia_transport_send_rtcp(stream->transport, rtcp_pkt, len);
/* Send the RTCP XR to third-party destination if specified */
if (stream->rtcp_xr_dest_len) {
pjmedia_transport_send_rtcp2(stream->transport,
&stream->rtcp_xr_dest,
stream->rtcp_xr_dest_len,
rtcp_pkt, len);
}
/* Update last tx RTCP XR */
stream->rtcp_xr_last_tx = timestamp;
}
}
#endif
}
/* Build RTCP SDES packet */
static unsigned create_rtcp_sdes(pjmedia_stream *stream, pj_uint8_t *pkt,
unsigned max_len)
{
pjmedia_rtcp_common hdr;
pj_uint8_t *p = pkt;
/* SDES header */
hdr.version = 2;
hdr.p = 0;
hdr.count = 1;
hdr.pt = 202;
hdr.length = 2 + (4+stream->cname.slen+3)/4 - 1;
if (max_len < (hdr.length << 2)) {
pj_assert(!"Not enough buffer for SDES packet");
return 0;
}
hdr.length = pj_htons((pj_uint16_t)hdr.length);
hdr.ssrc = stream->enc->rtp.out_hdr.ssrc;
pj_memcpy(p, &hdr, sizeof(hdr));
p += sizeof(hdr);
/* CNAME item */
*p++ = 1;
*p++ = (pj_uint8_t)stream->cname.slen;
pj_memcpy(p, stream->cname.ptr, stream->cname.slen);
p += stream->cname.slen;
/* END */
*p++ = '\0';
*p++ = '\0';
/* Pad to 32bit */
while ((p-pkt) % 4)
*p++ = '\0';
return (p - pkt);
}
/* Build RTCP BYE packet */
static unsigned create_rtcp_bye(pjmedia_stream *stream, pj_uint8_t *pkt,
unsigned max_len)
{
pjmedia_rtcp_common hdr;
/* BYE header */
hdr.version = 2;
hdr.p = 0;
hdr.count = 1;
hdr.pt = 203;
hdr.length = 1;
if (max_len < (hdr.length << 2)) {
pj_assert(!"Not enough buffer for SDES packet");
return 0;
}
hdr.length = pj_htons((pj_uint16_t)hdr.length);
hdr.ssrc = stream->enc->rtp.out_hdr.ssrc;
pj_memcpy(pkt, &hdr, sizeof(hdr));
return sizeof(hdr);
}
/**
* Rebuffer the frame when encoder and decoder has different ptime
* (such as when different iLBC modes are used by local and remote)
*/
static void rebuffer(pjmedia_stream *stream,
pjmedia_frame *frame)
{
/* How many samples are needed */
unsigned count;
/* Normalize frame */
if (frame->type != PJMEDIA_FRAME_TYPE_AUDIO)
frame->size = 0;
/* Remove used frame from the buffer. */
if (stream->enc_buf_pos) {
if (stream->enc_buf_count) {
pj_memmove(stream->enc_buf,
stream->enc_buf + stream->enc_buf_pos,
(stream->enc_buf_count << 1));
}
stream->enc_buf_pos = 0;
}
/* Make sure we have space to store the new frame */
pj_assert(stream->enc_buf_count + (frame->size >> 1) <
stream->enc_buf_size);
/* Append new frame to the buffer */
if (frame->size) {
/* Handle case when there is no port transmitting to this port */
if (frame->buf) {
pj_memcpy(stream->enc_buf + stream->enc_buf_count,
frame->buf, frame->size);
} else {
pj_bzero(stream->enc_buf + stream->enc_buf_count, frame->size);
}
stream->enc_buf_count += (frame->size >> 1);
}
/* How many samples are needed */
count = stream->codec_param.info.enc_ptime *
stream->port.info.clock_rate / 1000;
/* See if we have enough samples */
if (stream->enc_buf_count >= count) {
frame->type = PJMEDIA_FRAME_TYPE_AUDIO;
frame->buf = stream->enc_buf;
frame->size = (count << 1);
stream->enc_buf_pos = count;
stream->enc_buf_count -= count;
} else {
/* We don't have enough samples */
frame->type = PJMEDIA_FRAME_TYPE_NONE;
}
}
/**
* put_frame_imp()
*/
static pj_status_t put_frame_imp( pjmedia_port *port,
const pjmedia_frame *frame )
{
pjmedia_stream *stream = (pjmedia_stream*) port->port_data.pdata;
pjmedia_channel *channel = stream->enc;
pj_status_t status = 0;
pjmedia_frame frame_out;
unsigned ts_len, rtp_ts_len, samples_per_frame;
void *rtphdr;
int rtphdrlen;
int inc_timestamp = 0;
#if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA != 0
/* If the interval since last sending packet is greater than
* PJMEDIA_STREAM_KA_INTERVAL, send keep-alive packet.
*/
if (stream->use_ka)
{
pj_uint32_t dtx_duration;
dtx_duration = pj_timestamp_diff32(&stream->last_frm_ts_sent,
&frame->timestamp);
if (dtx_duration >
PJMEDIA_STREAM_KA_INTERVAL * stream->port.info.clock_rate)
{
send_keep_alive_packet(stream);
stream->last_frm_ts_sent = frame->timestamp;
}
}
#endif
/* Don't do anything if stream is paused */
if (channel->paused) {
stream->enc_buf_pos = stream->enc_buf_count = 0;
return PJ_SUCCESS;
}
/* Number of samples in the frame */
if (frame->type == PJMEDIA_FRAME_TYPE_AUDIO)
ts_len = (frame->size >> 1) / stream->codec_param.info.channel_cnt;
else if (frame->type == PJMEDIA_FRAME_TYPE_EXTENDED)
ts_len = stream->port.info.samples_per_frame /
stream->port.info.channel_count;
else
ts_len = 0;
/* Increment transmit duration */
stream->tx_duration += ts_len;
#if defined(PJMEDIA_HANDLE_G722_MPEG_BUG) && (PJMEDIA_HANDLE_G722_MPEG_BUG!=0)
/* Handle special case for audio codec with RTP timestamp inconsistence
* e.g: G722, MPEG audio.
*/
if (stream->has_g722_mpeg_bug)
rtp_ts_len = stream->rtp_tx_ts_len_per_pkt;
else
rtp_ts_len = ts_len;
#else
rtp_ts_len = ts_len;
#endif
/* Init frame_out buffer. */
frame_out.buf = ((char*)channel->out_pkt) + sizeof(pjmedia_rtp_hdr);
frame_out.size = 0;
/* Calculate number of samples per frame */
samples_per_frame = stream->enc_samples_per_pkt;
/* If we have DTMF digits in the queue, transmit the digits.
* Otherwise encode the PCM buffer.
*/
if (stream->tx_dtmf_count) {
int first=0, last=0;
create_dtmf_payload(stream, &frame_out, &first, &last);
/* Encapsulate into RTP packet. Note that:
* - RTP marker should be set on the beginning of a new event
* - RTP timestamp is constant for the same packet.
*/
status = pjmedia_rtp_encode_rtp( &channel->rtp,
stream->tx_event_pt, first,
frame_out.size,
(first ? rtp_ts_len : 0),
(const void**)&rtphdr,
&rtphdrlen);
if (last) {
/* This is the last packet for the event.
* Increment the RTP timestamp of the RTP session, for next
* RTP packets.
*/
inc_timestamp = PJMEDIA_DTMF_DURATION - rtp_ts_len;
}
/*
* Special treatment for FRAME_TYPE_AUDIO but with frame->buf==NULL.
* This happens when stream input is disconnected from the bridge.
* In this case we periodically transmit RTP frame to keep NAT binding
* open, by giving zero PCM frame to the codec.
*
* This was originally done in http://trac.pjsip.org/repos/ticket/56,
* but then disabled in http://trac.pjsip.org/repos/ticket/439, but
* now it's enabled again.
*/
} else if (frame->type == PJMEDIA_FRAME_TYPE_AUDIO &&
frame->buf == NULL &&
stream->port.info.format.id == PJMEDIA_FORMAT_L16 &&
(stream->dir & PJMEDIA_DIR_ENCODING) &&
stream->codec_param.info.frm_ptime *
stream->codec_param.info.channel_cnt *
stream->codec_param.info.clock_rate/1000 <
PJ_ARRAY_SIZE(zero_frame))
{
pjmedia_frame silence_frame;
pj_bzero(&silence_frame, sizeof(silence_frame));
silence_frame.buf = zero_frame;
silence_frame.size = stream->codec_param.info.frm_ptime * 2 *
stream->codec_param.info.channel_cnt *
stream->codec_param.info.clock_rate / 1000;
silence_frame.type = PJMEDIA_FRAME_TYPE_AUDIO;
silence_frame.timestamp.u32.lo = pj_ntohl(stream->enc->rtp.out_hdr.ts);
/* Encode! */
status = stream->codec->op->encode( stream->codec, &silence_frame,
channel->out_pkt_size -
sizeof(pjmedia_rtp_hdr),
&frame_out);
if (status != PJ_SUCCESS) {
LOGERR_((stream->port.info.name.ptr,
"Codec encode() error", status));
return status;
}
/* Encapsulate. */
status = pjmedia_rtp_encode_rtp( &channel->rtp,
channel->pt, 0,
frame_out.size, rtp_ts_len,
(const void**)&rtphdr,
&rtphdrlen);
/* Encode audio frame */
} else if ((frame->type == PJMEDIA_FRAME_TYPE_AUDIO &&
frame->buf != NULL) ||
(frame->type == PJMEDIA_FRAME_TYPE_EXTENDED))
{
/* Encode! */
status = stream->codec->op->encode( stream->codec, frame,
channel->out_pkt_size -
sizeof(pjmedia_rtp_hdr),
&frame_out);
if (status != PJ_SUCCESS) {
LOGERR_((stream->port.info.name.ptr,
"Codec encode() error", status));
return status;
}
/* Encapsulate. */
status = pjmedia_rtp_encode_rtp( &channel->rtp,
channel->pt, 0,
frame_out.size, rtp_ts_len,
(const void**)&rtphdr,
&rtphdrlen);
} else {
/* Just update RTP session's timestamp. */
status = pjmedia_rtp_encode_rtp( &channel->rtp,
0, 0,
0, rtp_ts_len,
(const void**)&rtphdr,
&rtphdrlen);
}
if (status != PJ_SUCCESS) {
LOGERR_((stream->port.info.name.ptr,
"RTP encode_rtp() error", status));
return status;
}
/* Check if now is the time to transmit RTCP SR/RR report.
* We only do this when stream direction is not "decoding only", because
* when it is, check_tx_rtcp() will be handled by get_frame().
*/
if (stream->dir != PJMEDIA_DIR_DECODING) {
check_tx_rtcp(stream, pj_ntohl(channel->rtp.out_hdr.ts));
}
/* Do nothing if we have nothing to transmit */
if (frame_out.size == 0) {
if (stream->is_streaming) {
PJ_LOG(5,(stream->port.info.name.ptr,"Starting silence"));
stream->is_streaming = PJ_FALSE;
}
return PJ_SUCCESS;
}
/* Copy RTP header to the beginning of packet */
pj_memcpy(channel->out_pkt, rtphdr, sizeof(pjmedia_rtp_hdr));
/* Special case for DTMF: timestamp remains constant for
* the same event, and is only updated after a complete event
* has been transmitted.
*/
if (inc_timestamp) {
pjmedia_rtp_encode_rtp( &channel->rtp, stream->tx_event_pt, 0,
0, inc_timestamp, NULL, NULL);
}
/* Set RTP marker bit if currently not streaming */
if (stream->is_streaming == PJ_FALSE) {
pjmedia_rtp_hdr *rtp = (pjmedia_rtp_hdr*) channel->out_pkt;
rtp->m = 1;
PJ_LOG(5,(stream->port.info.name.ptr,"Start talksprut.."));
}
stream->is_streaming = PJ_TRUE;
/* Send the RTP packet to the transport. */
pjmedia_transport_send_rtp(stream->transport, channel->out_pkt,
frame_out.size + sizeof(pjmedia_rtp_hdr));
/* Update stat */
pjmedia_rtcp_tx_rtp(&stream->rtcp, frame_out.size);
stream->rtcp.stat.rtp_tx_last_ts = pj_ntohl(stream->enc->rtp.out_hdr.ts);
stream->rtcp.stat.rtp_tx_last_seq = pj_ntohs(stream->enc->rtp.out_hdr.seq);
#if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA!=0
/* Update timestamp of last sending packet. */
stream->last_frm_ts_sent = frame->timestamp;
#endif
return PJ_SUCCESS;
}
/**
* put_frame()
*
* This callback is called by upstream component when it has PCM frame
* to transmit. This function encodes the PCM frame, pack it into
* RTP packet, and transmit to peer.
*/
static pj_status_t put_frame( pjmedia_port *port,
const pjmedia_frame *frame )
{
pjmedia_stream *stream = (pjmedia_stream*) port->port_data.pdata;
pjmedia_frame tmp_zero_frame;
unsigned samples_per_frame;
samples_per_frame = stream->enc_samples_per_pkt;
/* http://www.pjsip.org/trac/ticket/56:
* when input is PJMEDIA_FRAME_TYPE_NONE, feed zero PCM frame
* instead so that encoder can decide whether or not to transmit
* silence frame.
*/
if (frame->type == PJMEDIA_FRAME_TYPE_NONE) {
pj_memcpy(&tmp_zero_frame, frame, sizeof(pjmedia_frame));
frame = &tmp_zero_frame;
tmp_zero_frame.buf = NULL;
tmp_zero_frame.size = samples_per_frame * 2;
tmp_zero_frame.type = PJMEDIA_FRAME_TYPE_AUDIO;
}
#if 0
// This is no longer needed because each TYPE_NONE frame will
// be converted into zero frame above
/* If VAD is temporarily disabled during creation, feed zero PCM frame
* to the codec.
*/
if (stream->vad_enabled != stream->codec_param.setting.vad &&
stream->vad_enabled != 0 &&
frame->type == PJMEDIA_FRAME_TYPE_NONE &&
samples_per_frame <= ZERO_PCM_MAX_SIZE)
{
pj_memcpy(&tmp_in_frame, frame, sizeof(pjmedia_frame));
frame = &tmp_in_frame;
tmp_in_frame.buf = NULL;
tmp_in_frame.size = samples_per_frame * 2;
tmp_in_frame.type = PJMEDIA_FRAME_TYPE_AUDIO;
}
#endif
/* If VAD is temporarily disabled during creation, enable it
* after transmitting for VAD_SUSPEND_SEC seconds.
*/
if (stream->vad_enabled != stream->codec_param.setting.vad &&
(stream->tx_duration - stream->ts_vad_disabled) >
stream->port.info.clock_rate * PJMEDIA_STREAM_VAD_SUSPEND_MSEC / 1000)
{
stream->codec_param.setting.vad = stream->vad_enabled;
stream->codec->op->modify(stream->codec, &stream->codec_param);
PJ_LOG(4,(stream->port.info.name.ptr,"VAD re-enabled"));
}
/* If encoder has different ptime than decoder, then the frame must
* be passed through the encoding buffer via rebuffer() function.
*/
if (stream->enc_buf != NULL) {
pjmedia_frame tmp_rebuffer_frame;
pj_status_t status = PJ_SUCCESS;
/* Copy original frame to temporary frame since we need
* to modify it.
*/
pj_memcpy(&tmp_rebuffer_frame, frame, sizeof(pjmedia_frame));
/* Loop while we have full frame in enc_buffer */
for (;;) {
pj_status_t st;
/* Run rebuffer() */
rebuffer(stream, &tmp_rebuffer_frame);
/* Process this frame */
st = put_frame_imp(port, &tmp_rebuffer_frame);
if (st != PJ_SUCCESS)
status = st;
/* If we still have full frame in the buffer, re-run
* rebuffer() with NULL frame.
*/
if (stream->enc_buf_count >= stream->enc_samples_per_pkt) {
tmp_rebuffer_frame.type = PJMEDIA_FRAME_TYPE_NONE;
} else {
/* Otherwise break */
break;
}
}
return status;
} else {
return put_frame_imp(port, frame);
}
}
#if 0
static void dump_bin(const char *buf, unsigned len)
{
unsigned i;
PJ_LOG(3,(THIS_FILE, "begin dump"));
for (i=0; i<len; ++i) {
int j;
char bits[9];
unsigned val = buf[i] & 0xFF;
bits[8] = '\0';
for (j=0; j<8; ++j) {
if (val & (1 << (7-j)))
bits[j] = '1';
else
bits[j] = '0';
}
PJ_LOG(3,(THIS_FILE, "%2d %s [%d]", i, bits, val));
}
PJ_LOG(3,(THIS_FILE, "end dump"));
}
#endif
/*
* Handle incoming DTMF digits.
*/
static void handle_incoming_dtmf( pjmedia_stream *stream,
const void *payload, unsigned payloadlen)
{
pjmedia_rtp_dtmf_event *event = (pjmedia_rtp_dtmf_event*) payload;
/* Check compiler packing. */
pj_assert(sizeof(pjmedia_rtp_dtmf_event)==4);
/* Must have sufficient length before we proceed. */
if (payloadlen < sizeof(pjmedia_rtp_dtmf_event))
return;
//dump_bin(payload, payloadlen);
/* Check if this is the same/current digit of the last packet. */
if (stream->last_dtmf != -1 &&
event->event == stream->last_dtmf &&
pj_ntohs(event->duration) >= stream->last_dtmf_dur)
{
/* Yes, this is the same event. */
stream->last_dtmf_dur = pj_ntohs(event->duration);
return;
}
/* Ignore unknown event. */
if (event->event > 15) {
PJ_LOG(5,(stream->port.info.name.ptr,
"Ignored RTP pkt with bad DTMF event %d",
event->event));
return;
}
/* New event! */
PJ_LOG(5,(stream->port.info.name.ptr, "Received DTMF digit %c, vol=%d",
digitmap[event->event],
(event->e_vol & 0x3F)));
stream->last_dtmf = event->event;
stream->last_dtmf_dur = pj_ntohs(event->duration);
/* If DTMF callback is installed, call the callback, otherwise keep
* the DTMF digits in the buffer.
*/
if (stream->dtmf_cb) {
stream->dtmf_cb(stream, stream->dtmf_cb_user_data,
digitmap[event->event]);
} else {
/* By convention, we use jitter buffer's mutex to access shared
* DTMF variables.
*/
pj_mutex_lock(stream->jb_mutex);
if (stream->rx_dtmf_count >= PJ_ARRAY_SIZE(stream->rx_dtmf_buf)) {
/* DTMF digits overflow. Discard the oldest digit. */
pj_array_erase(stream->rx_dtmf_buf,
sizeof(stream->rx_dtmf_buf[0]),
stream->rx_dtmf_count, 0);
--stream->rx_dtmf_count;
}
stream->rx_dtmf_buf[stream->rx_dtmf_count++] = digitmap[event->event];
pj_mutex_unlock(stream->jb_mutex);
}
}
/*
* This callback is called by stream transport on receipt of packets
* in the RTP socket.
*/
static void on_rx_rtp( void *data,
void *pkt,
pj_ssize_t bytes_read)
{
pjmedia_stream *stream = (pjmedia_stream*) data;
pjmedia_channel *channel = stream->dec;
const pjmedia_rtp_hdr *hdr;
const void *payload;
unsigned payloadlen;
pjmedia_rtp_status seq_st;
pj_bool_t check_pt;
pj_status_t status;
pj_bool_t pkt_discarded = PJ_FALSE;
/* Check for errors */
if (bytes_read < 0) {
LOGERR_((stream->port.info.name.ptr, "RTP recv() error", -bytes_read));
return;
}
/* Ignore keep-alive packets */
if (bytes_read < (pj_ssize_t) sizeof(pjmedia_rtp_hdr))
return;
/* Update RTP and RTCP session. */
status = pjmedia_rtp_decode_rtp(&channel->rtp, pkt, bytes_read,
&hdr, &payload, &payloadlen);
if (status != PJ_SUCCESS) {
LOGERR_((stream->port.info.name.ptr, "RTP decode error", status));
stream->rtcp.stat.rx.discard++;
return;
}
/* Ignore the packet if decoder is paused */
if (channel->paused)
goto on_return;
/* Update RTP session (also checks if RTP session can accept
* the incoming packet.
*/
check_pt = (hdr->pt != stream->rx_event_pt) && PJMEDIA_STREAM_CHECK_RTP_PT;
pjmedia_rtp_session_update2(&channel->rtp, hdr, &seq_st, check_pt);
#if !PJMEDIA_STREAM_CHECK_RTP_PT
if (!check_pt && hdr->pt != channel->rtp.out_pt &&
hdr->pt != stream->rx_event_pt)
{
seq_st.status.flag.badpt = 1;
}
#endif
if (seq_st.status.value) {
TRC_ ((stream->port.info.name.ptr,
"RTP status: badpt=%d, badssrc=%d, dup=%d, "
"outorder=%d, probation=%d, restart=%d",
seq_st.status.flag.badpt,
seq_st.status.flag.badssrc,
seq_st.status.flag.dup,
seq_st.status.flag.outorder,
seq_st.status.flag.probation,
seq_st.status.flag.restart));
if (seq_st.status.flag.badpt) {
PJ_LOG(4,(stream->port.info.name.ptr,
"Bad RTP pt %d (expecting %d)",
hdr->pt, channel->rtp.out_pt));
}
if (seq_st.status.flag.badssrc) {
PJ_LOG(4,(stream->port.info.name.ptr,
"Changed RTP peer SSRC %d (previously %d)",
channel->rtp.peer_ssrc, stream->rtcp.peer_ssrc));
stream->rtcp.peer_ssrc = channel->rtp.peer_ssrc;
}
}
/* Skip bad RTP packet */
if (seq_st.status.flag.bad) {
pkt_discarded = PJ_TRUE;
goto on_return;
}
/* Ignore if payloadlen is zero */
if (payloadlen == 0) {
pkt_discarded = PJ_TRUE;
goto on_return;
}
/* Handle incoming DTMF. */
if (hdr->pt == stream->rx_event_pt) {
/* Ignore out-of-order packet as it will be detected as new
* digit. Also ignore duplicate packet as it serves no use.
*/
if (seq_st.status.flag.outorder || seq_st.status.flag.dup) {
goto on_return;
}
handle_incoming_dtmf(stream, payload, payloadlen);
goto on_return;
}
/* Put "good" packet to jitter buffer, or reset the jitter buffer
* when RTP session is restarted.
*/
pj_mutex_lock( stream->jb_mutex );
if (seq_st.status.flag.restart) {
status = pjmedia_jbuf_reset(stream->jb);
PJ_LOG(4,(stream->port.info.name.ptr, "Jitter buffer reset"));
} else {
/*
* Packets may contain more than one frames, while the jitter
* buffer can only take one frame per "put" operation. So we need
* to ask the codec to "parse" the payload into multiple frames.
*/
enum { MAX = 16 };
pj_timestamp ts;
unsigned i, count = MAX;
unsigned ts_span;
pjmedia_frame frames[MAX];
/* Get the timestamp of the first sample */
ts.u64 = pj_ntohl(hdr->ts);
/* Parse the payload. */
status = (*stream->codec->op->parse)(stream->codec,
(void*)payload,
payloadlen,
&ts,
&count,
frames);
if (status != PJ_SUCCESS) {
LOGERR_((stream->port.info.name.ptr,
"Codec parse() error",
status));
count = 0;
}
#if defined(PJMEDIA_HANDLE_G722_MPEG_BUG) && (PJMEDIA_HANDLE_G722_MPEG_BUG!=0)
/* This code is used to learn the samples per frame value that is put
* by remote endpoint, for codecs with inconsistent clock rate such
* as G.722 or MPEG audio. We need to learn the samples per frame
* value as it is used as divider when inserting frames into the
* jitter buffer.
*/
if (stream->has_g722_mpeg_bug) {
if (stream->rtp_rx_check_cnt) {
/* Make sure the detection performed only on two consecutive
* packets with valid RTP sequence and no wrapped timestamp.
*/
if (seq_st.diff == 1 && stream->rtp_rx_last_ts &&
ts.u64 > stream->rtp_rx_last_ts &&
stream->rtp_rx_last_cnt > 0)
{
unsigned peer_frm_ts_diff;
unsigned frm_ts_span;
/* Calculate actual frame timestamp span */
frm_ts_span = stream->port.info.samples_per_frame /
stream->codec_param.setting.frm_per_pkt/
stream->port.info.channel_count;
/* Get remote frame timestamp span */
peer_frm_ts_diff =
((pj_uint32_t)ts.u64-stream->rtp_rx_last_ts) /
stream->rtp_rx_last_cnt;
/* Possibilities remote's samples per frame for G.722
* are only (frm_ts_span) and (frm_ts_span/2), this
* validation is needed to avoid wrong decision because
* of silence frames.
*/
if (stream->codec_param.info.pt == PJMEDIA_RTP_PT_G722 &&
(peer_frm_ts_diff == frm_ts_span ||
peer_frm_ts_diff == (frm_ts_span>>1)))
{
if (peer_frm_ts_diff < stream->rtp_rx_ts_len_per_frame)
stream->rtp_rx_ts_len_per_frame = peer_frm_ts_diff;
if (--stream->rtp_rx_check_cnt == 0) {
PJ_LOG(4, (THIS_FILE, "G722 codec used, remote"
" samples per frame detected = %d",
stream->rtp_rx_ts_len_per_frame));
/* Reset jitter buffer once detection done */
pjmedia_jbuf_reset(stream->jb);
}
}
}
stream->rtp_rx_last_ts = (pj_uint32_t)ts.u64;
stream->rtp_rx_last_cnt = count;
}
ts_span = stream->rtp_rx_ts_len_per_frame;
/* Adjust the timestamp of the parsed frames */
for (i=0; i<count; ++i) {
frames[i].timestamp.u64 = ts.u64 + ts_span * i;
}
} else {
ts_span = stream->codec_param.info.frm_ptime *
stream->codec_param.info.clock_rate /
1000;
}
#else
ts_span = stream->codec_param.info.frm_ptime *
stream->codec_param.info.clock_rate /
1000;
#endif
/* Put each frame to jitter buffer. */
for (i=0; i<count; ++i) {
unsigned ext_seq;
pj_bool_t discarded;
ext_seq = (unsigned)(frames[i].timestamp.u64 / ts_span);
pjmedia_jbuf_put_frame2(stream->jb, frames[i].buf, frames[i].size,
frames[i].bit_info, ext_seq, &discarded);
if (discarded)
pkt_discarded = PJ_TRUE;
}
#if TRACE_JB
trace_jb_put(stream, hdr, payloadlen, count);
#endif
}
pj_mutex_unlock( stream->jb_mutex );
/* Check if now is the time to transmit RTCP SR/RR report.
* We only do this when stream direction is "decoding only",
* because otherwise check_tx_rtcp() will be handled by put_frame()
*/
if (stream->dir == PJMEDIA_DIR_DECODING) {
check_tx_rtcp(stream, pj_ntohl(hdr->ts));
}
if (status != 0) {
LOGERR_((stream->port.info.name.ptr, "Jitter buffer put() error",
status));
pkt_discarded = PJ_TRUE;
goto on_return;
}
on_return:
/* Update RTCP session */
if (stream->rtcp.peer_ssrc == 0)
stream->rtcp.peer_ssrc = channel->rtp.peer_ssrc;
pjmedia_rtcp_rx_rtp2(&stream->rtcp, pj_ntohs(hdr->seq),
pj_ntohl(hdr->ts), payloadlen, pkt_discarded);
/* Send RTCP RR and SDES after we receive some RTP packets */
if (stream->rtcp.received >= 10 && !stream->initial_rr) {
void *sr_rr_pkt;
pj_uint8_t *pkt;
int len;
/* Build RR or SR */
pjmedia_rtcp_build_rtcp(&stream->rtcp, &sr_rr_pkt, &len);
pkt = (pj_uint8_t*) stream->enc->out_pkt;
pj_memcpy(pkt, sr_rr_pkt, len);
pkt += len;
/* Append SDES */
len = create_rtcp_sdes(stream, (pj_uint8_t*)pkt,
stream->enc->out_pkt_size - len);
if (len > 0) {
pkt += len;
len = ((pj_uint8_t*)pkt) - ((pj_uint8_t*)stream->enc->out_pkt);
pjmedia_transport_send_rtcp(stream->transport,
stream->enc->out_pkt, len);
}
stream->initial_rr = PJ_TRUE;
}
}
/*
* This callback is called by stream transport on receipt of packets
* in the RTCP socket.
*/
static void on_rx_rtcp( void *data,
void *pkt,
pj_ssize_t bytes_read)
{
pjmedia_stream *stream = (pjmedia_stream*) data;
/* Check for errors */
if (bytes_read < 0) {
LOGERR_((stream->port.info.name.ptr, "RTCP recv() error",
-bytes_read));
return;
}
pjmedia_rtcp_rx_rtcp(&stream->rtcp, pkt, bytes_read);
}
/*
* Create media channel.
*/
static pj_status_t create_channel( pj_pool_t *pool,
pjmedia_stream *stream,
pjmedia_dir dir,
unsigned pt,
const pjmedia_stream_info *param,
pjmedia_channel **p_channel)
{
pjmedia_channel *channel;
pj_status_t status;
unsigned min_out_pkt_size;
/* Allocate memory for channel descriptor */
channel = PJ_POOL_ZALLOC_T(pool, pjmedia_channel);
PJ_ASSERT_RETURN(channel != NULL, PJ_ENOMEM);
/* Init channel info. */
channel->stream = stream;
channel->dir = dir;
channel->paused = 1;
channel->pt = pt;
/* Allocate buffer for outgoing packet. */
channel->out_pkt_size = sizeof(pjmedia_rtp_hdr) +
stream->codec_param.info.max_bps *
PJMEDIA_MAX_FRAME_DURATION_MS /
8 / 1000;
if (channel->out_pkt_size > PJMEDIA_MAX_MTU)
channel->out_pkt_size = PJMEDIA_MAX_MTU;
/* It should big enough to hold (minimally) RTCP SR with an SDES. */
min_out_pkt_size = sizeof(pjmedia_rtcp_sr_pkt) +
sizeof(pjmedia_rtcp_common) +
(4 + stream->cname.slen) +
32;
if (channel->out_pkt_size < min_out_pkt_size)
channel->out_pkt_size = min_out_pkt_size;
channel->out_pkt = pj_pool_alloc(pool, channel->out_pkt_size);
PJ_ASSERT_RETURN(channel->out_pkt != NULL, PJ_ENOMEM);
/* Create RTP and RTCP sessions: */
if (param->rtp_seq_ts_set == 0) {
status = pjmedia_rtp_session_init(&channel->rtp, pt, param->ssrc);
} else {
pjmedia_rtp_session_setting settings;
settings.flags = (pj_uint8_t)((param->rtp_seq_ts_set << 2) | 3);
settings.default_pt = pt;
settings.sender_ssrc = param->ssrc;
settings.seq = param->rtp_seq;
settings.ts = param->rtp_ts;
status = pjmedia_rtp_session_init2(&channel->rtp, settings);
}
if (status != PJ_SUCCESS)
return status;
/* Done. */
*p_channel = channel;
return PJ_SUCCESS;
}
/*
* Create media stream.
*/
PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt,
pj_pool_t *pool,
const pjmedia_stream_info *info,
pjmedia_transport *tp,
void *user_data,
pjmedia_stream **p_stream)
{
enum { M = 32 };
pjmedia_stream *stream;
pj_str_t name;
unsigned jb_init, jb_max, jb_min_pre, jb_max_pre, len;
char *p;
pj_status_t status;
PJ_ASSERT_RETURN(pool && info && p_stream, PJ_EINVAL);
/* Allocate the media stream: */
stream = PJ_POOL_ZALLOC_T(pool, pjmedia_stream);
PJ_ASSERT_RETURN(stream != NULL, PJ_ENOMEM);
/* Init stream/port name */
name.ptr = (char*) pj_pool_alloc(pool, M);
name.slen = pj_ansi_snprintf(name.ptr, M, "strm%p", stream);
/* Init some port-info. Some parts of the info will be set later
* once we have more info about the codec.
*/
pjmedia_port_info_init(&stream->port.info, &name,
PJMEDIA_PORT_SIGNATURE('S', 'T', 'R', 'M'),
info->fmt.clock_rate, info->fmt.channel_cnt,
16, 80);
/* Init port. */
pj_strdup(pool, &stream->port.info.encoding_name, &info->fmt.encoding_name);
stream->port.info.clock_rate = info->fmt.clock_rate;
stream->port.info.channel_count = info->fmt.channel_cnt;
stream->port.port_data.pdata = stream;
/* Init stream: */
stream->endpt = endpt;
stream->codec_mgr = pjmedia_endpt_get_codec_mgr(endpt);
stream->dir = info->dir;
stream->user_data = user_data;
stream->rtcp_interval = (PJMEDIA_RTCP_INTERVAL-500 + (pj_rand()%1000)) *
info->fmt.clock_rate / 1000;
stream->tx_event_pt = info->tx_event_pt ? info->tx_event_pt : -1;
stream->rx_event_pt = info->rx_event_pt ? info->rx_event_pt : -1;
stream->last_dtmf = -1;
stream->jb_last_frm = PJMEDIA_JB_NORMAL_FRAME;
#if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA!=0
stream->use_ka = info->use_ka;
#endif
/* Build random RTCP CNAME. CNAME has user@host format */
stream->cname.ptr = p = (char*) pj_pool_alloc(pool, 20);
pj_create_random_string(p, 5);
p += 5;
*p++ = '@'; *p++ = 'p'; *p++ = 'j';
pj_create_random_string(p, 6);
p += 6;
*p++ = '.'; *p++ = 'o'; *p++ = 'r'; *p++ = 'g';
stream->cname.slen = p - stream->cname.ptr;
/* Create mutex to protect jitter buffer: */
status = pj_mutex_create_simple(pool, NULL, &stream->jb_mutex);
if (status != PJ_SUCCESS)
goto err_cleanup;
/* Create and initialize codec: */
status = pjmedia_codec_mgr_alloc_codec( stream->codec_mgr,
&info->fmt, &stream->codec);
if (status != PJ_SUCCESS)
goto err_cleanup;
/* Get codec param: */
if (info->param)
stream->codec_param = *info->param;
else {
status = pjmedia_codec_mgr_get_default_param(stream->codec_mgr,
&info->fmt,
&stream->codec_param);
if (status != PJ_SUCCESS)
goto err_cleanup;
}
/* Check for invalid max_bps. */
if (stream->codec_param.info.max_bps < stream->codec_param.info.avg_bps)
stream->codec_param.info.max_bps = stream->codec_param.info.avg_bps;
/* Check for invalid frame per packet. */
if (stream->codec_param.setting.frm_per_pkt < 1)
stream->codec_param.setting.frm_per_pkt = 1;
/* Open the codec. */
status = stream->codec->op->open(stream->codec, &stream->codec_param);
if (status != PJ_SUCCESS)
goto err_cleanup;
/* Set additional info and callbacks. */
stream->port.info.bits_per_sample = 16;
stream->port.info.samples_per_frame = info->fmt.clock_rate *
stream->codec_param.info.channel_cnt *
stream->codec_param.info.frm_ptime *
stream->codec_param.setting.frm_per_pkt /
1000;
stream->port.info.format.id = stream->codec_param.info.fmt_id;
if (stream->codec_param.info.fmt_id == PJMEDIA_FORMAT_L16) {
/* Raw format */
stream->port.info.bytes_per_frame = stream->port.info.samples_per_frame *
stream->port.info.bits_per_sample / 8;
stream->port.put_frame = &put_frame;
stream->port.get_frame = &get_frame;
} else {
/* Encoded format */
stream->port.info.bytes_per_frame = stream->codec_param.info.max_bps *
stream->codec_param.info.frm_ptime *
stream->codec_param.setting.frm_per_pkt /
8 / 1000;
if ((stream->codec_param.info.max_bps * stream->codec_param.info.frm_ptime *
stream->codec_param.setting.frm_per_pkt) % 8000 != 0)
{
++stream->port.info.bytes_per_frame;
}
stream->port.info.format.bitrate = stream->codec_param.info.avg_bps;
stream->port.info.format.vad = (stream->codec_param.setting.vad != 0);
stream->port.put_frame = &put_frame;
stream->port.get_frame = &get_frame_ext;
}
/* If encoder and decoder's ptime are asymmetric, then we need to
* create buffer on the encoder side. This could happen for example
* with iLBC
*/
if (stream->codec_param.info.enc_ptime!=0 &&
stream->codec_param.info.enc_ptime!=stream->codec_param.info.frm_ptime)
{
unsigned ptime;
stream->enc_samples_per_pkt = stream->codec_param.info.enc_ptime *
stream->codec_param.info.channel_cnt *
stream->port.info.clock_rate / 1000;
/* Set buffer size as twice the largest ptime value between
* stream's ptime, encoder ptime, or decoder ptime.
*/
ptime = stream->port.info.samples_per_frame * 1000 /
stream->port.info.clock_rate;
if (stream->codec_param.info.enc_ptime > ptime)
ptime = stream->codec_param.info.enc_ptime;
if (stream->codec_param.info.frm_ptime > ptime)
ptime = stream->codec_param.info.frm_ptime;
ptime <<= 1;
/* Allocate buffer */
stream->enc_buf_size = stream->port.info.clock_rate * ptime / 1000;
stream->enc_buf = (pj_int16_t*)
pj_pool_alloc(pool, stream->enc_buf_size * 2);
} else {
stream->enc_samples_per_pkt = stream->port.info.samples_per_frame;
}
/* Initially disable the VAD in the stream, to help traverse NAT better */
stream->vad_enabled = stream->codec_param.setting.vad;
if (PJMEDIA_STREAM_VAD_SUSPEND_MSEC > 0 && stream->vad_enabled) {
stream->codec_param.setting.vad = 0;
stream->ts_vad_disabled = 0;
stream->codec->op->modify(stream->codec, &stream->codec_param);
PJ_LOG(4,(stream->port.info.name.ptr,"VAD temporarily disabled"));
}
/* Get the frame size */
stream->frame_size = stream->codec_param.info.max_bps *
stream->codec_param.info.frm_ptime / 8 / 1000;
if ((stream->codec_param.info.max_bps * stream->codec_param.info.frm_ptime)
% 8000 != 0)
{
++stream->frame_size;
}
/* How many consecutive PLC frames can be generated */
stream->max_plc_cnt = (MAX_PLC_MSEC+stream->codec_param.info.frm_ptime-1)/
stream->codec_param.info.frm_ptime;
#if defined(PJMEDIA_HANDLE_G722_MPEG_BUG) && (PJMEDIA_HANDLE_G722_MPEG_BUG!=0)
stream->rtp_rx_check_cnt = 5;
stream->has_g722_mpeg_bug = PJ_FALSE;
stream->rtp_rx_last_ts = 0;
stream->rtp_rx_last_cnt = 0;
stream->rtp_tx_ts_len_per_pkt = stream->enc_samples_per_pkt /
stream->codec_param.info.channel_cnt;
stream->rtp_rx_ts_len_per_frame = stream->port.info.samples_per_frame /
stream->codec_param.setting.frm_per_pkt /
stream->codec_param.info.channel_cnt;
if (info->fmt.pt == PJMEDIA_RTP_PT_G722) {
stream->has_g722_mpeg_bug = PJ_TRUE;
/* RTP clock rate = 1/2 real clock rate */
stream->rtp_tx_ts_len_per_pkt >>= 1;
}
#endif
/* Init jitter buffer parameters: */
if (info->jb_max >= stream->codec_param.info.frm_ptime)
jb_max = (info->jb_max + stream->codec_param.info.frm_ptime - 1) /
stream->codec_param.info.frm_ptime;
else
jb_max = 500 / stream->codec_param.info.frm_ptime;
if (info->jb_min_pre >= stream->codec_param.info.frm_ptime)
jb_min_pre = info->jb_min_pre / stream->codec_param.info.frm_ptime;
else
//jb_min_pre = 60 / stream->codec_param.info.frm_ptime;
jb_min_pre = 1;
if (info->jb_max_pre >= stream->codec_param.info.frm_ptime)
jb_max_pre = info->jb_max_pre / stream->codec_param.info.frm_ptime;
else
//jb_max_pre = 240 / stream->codec_param.info.frm_ptime;
jb_max_pre = jb_max * 4 / 5;
if (info->jb_init >= stream->codec_param.info.frm_ptime)
jb_init = info->jb_init / stream->codec_param.info.frm_ptime;
else
//jb_init = (jb_min_pre + jb_max_pre) / 2;
jb_init = 0;
/* Create jitter buffer */
status = pjmedia_jbuf_create(pool, &stream->port.info.name,
stream->frame_size,
stream->codec_param.info.frm_ptime,
jb_max, &stream->jb);
if (status != PJ_SUCCESS)
goto err_cleanup;
/* Set up jitter buffer */
pjmedia_jbuf_set_adaptive( stream->jb, jb_init, jb_min_pre, jb_max_pre);
/* Create decoder channel: */
status = create_channel( pool, stream, PJMEDIA_DIR_DECODING,
info->fmt.pt, info, &stream->dec);
if (status != PJ_SUCCESS)
goto err_cleanup;
/* Create encoder channel: */
status = create_channel( pool, stream, PJMEDIA_DIR_ENCODING,
info->tx_pt, info, &stream->enc);
if (status != PJ_SUCCESS)
goto err_cleanup;
/* Init RTCP session: */
{
pjmedia_rtcp_session_setting rtcp_setting;
pjmedia_rtcp_session_setting_default(&rtcp_setting);
rtcp_setting.name = stream->port.info.name.ptr;
rtcp_setting.ssrc = info->ssrc;
rtcp_setting.rtp_ts_base = pj_ntohl(stream->enc->rtp.out_hdr.ts);
rtcp_setting.clock_rate = info->fmt.clock_rate;
rtcp_setting.samples_per_frame = stream->port.info.samples_per_frame;
#if defined(PJMEDIA_HANDLE_G722_MPEG_BUG) && (PJMEDIA_HANDLE_G722_MPEG_BUG!=0)
/* Special case for G.722 */
if (info->fmt.pt == PJMEDIA_RTP_PT_G722) {
rtcp_setting.clock_rate = 8000;
rtcp_setting.samples_per_frame = 160;
}
#endif
pjmedia_rtcp_init2(&stream->rtcp, &rtcp_setting);
}
/* Only attach transport when stream is ready. */
status = pjmedia_transport_attach(tp, stream, &info->rem_addr,
&info->rem_rtcp,
pj_sockaddr_get_len(&info->rem_addr),
&on_rx_rtp, &on_rx_rtcp);
if (status != PJ_SUCCESS)
goto err_cleanup;
stream->transport = tp;
#if defined(PJMEDIA_HAS_RTCP_XR) && (PJMEDIA_HAS_RTCP_XR != 0)
/* Enable RTCP XR and update stream info/config to RTCP XR */
if (info->rtcp_xr_enabled) {
int i;
pjmedia_rtcp_enable_xr(&stream->rtcp, PJ_TRUE);
/* Set RTCP XR TX interval */
if (info->rtcp_xr_interval != 0)
stream->rtcp_xr_interval = info->rtcp_xr_interval;
else
stream->rtcp_xr_interval = (PJMEDIA_RTCP_INTERVAL +
(pj_rand() % 8000)) *
info->fmt.clock_rate / 1000;
/* Additional third-party RTCP XR destination */
if (info->rtcp_xr_dest.addr.sa_family != 0) {
stream->rtcp_xr_dest_len = pj_sockaddr_get_len(&info->rtcp_xr_dest);
pj_memcpy(&stream->rtcp_xr_dest, &info->rtcp_xr_dest,
stream->rtcp_xr_dest_len);
}
/* jitter buffer adaptive info */
i = PJMEDIA_RTCP_XR_JB_ADAPTIVE;
pjmedia_rtcp_xr_update_info(&stream->rtcp.xr_session,
PJMEDIA_RTCP_XR_INFO_CONF_JBA,
i);
/* Jitter buffer aggressiveness info (estimated) */
i = 7;
pjmedia_rtcp_xr_update_info(&stream->rtcp.xr_session,
PJMEDIA_RTCP_XR_INFO_CONF_JBR,
i);
/* Jitter buffer absolute maximum delay */
i = jb_max * stream->codec_param.info.frm_ptime;
pjmedia_rtcp_xr_update_info(&stream->rtcp.xr_session,
PJMEDIA_RTCP_XR_INFO_JB_ABS_MAX,
i);
/* PLC info */
if (stream->codec_param.setting.plc == 0)
i = PJMEDIA_RTCP_XR_PLC_DIS;
else
#if PJMEDIA_WSOLA_IMP==PJMEDIA_WSOLA_IMP_WSOLA
i = PJMEDIA_RTCP_XR_PLC_ENH;
#else
i = PJMEDIA_RTCP_XR_PLC_DIS;
#endif
pjmedia_rtcp_xr_update_info(&stream->rtcp.xr_session,
PJMEDIA_RTCP_XR_INFO_CONF_PLC,
i);
}
#endif
/* Send RTCP SDES */
len = create_rtcp_sdes(stream, (pj_uint8_t*)stream->enc->out_pkt,
stream->enc->out_pkt_size);
if (len != 0) {
pjmedia_transport_send_rtcp(stream->transport,
stream->enc->out_pkt, len);
}
#if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA!=0
/* NAT hole punching by sending KA packet via RTP transport. */
if (stream->use_ka)
send_keep_alive_packet(stream);
#endif
#if TRACE_JB
{
char trace_name[PJ_MAXPATH];
pj_ssize_t len;
pj_ansi_snprintf(trace_name, sizeof(trace_name),
TRACE_JB_PATH_PREFIX "%s.csv",
stream->port.info.name.ptr);
status = pj_file_open(pool, trace_name, PJ_O_RDWR, &stream->trace_jb_fd);
if (status != PJ_SUCCESS) {
stream->trace_jb_fd = TRACE_JB_INVALID_FD;
PJ_LOG(3,(THIS_FILE, "Failed creating RTP trace file '%s'",
trace_name));
} else {
stream->trace_jb_buf = (char*)pj_pool_alloc(pool, PJ_LOG_MAX_SIZE);
/* Print column header */
len = pj_ansi_snprintf(stream->trace_jb_buf, PJ_LOG_MAX_SIZE,
"Time, Operation, Size, Frame Count, "
"Frame type, RTP Seq, RTP TS, RTP M, "
"JB size, JB burst level, JB prefetch\n");
pj_file_write(stream->trace_jb_fd, stream->trace_jb_buf, &len);
pj_file_flush(stream->trace_jb_fd);
}
}
#endif
/* Success! */
*p_stream = stream;
PJ_LOG(5,(THIS_FILE, "Stream %s created", stream->port.info.name.ptr));
return PJ_SUCCESS;
err_cleanup:
pjmedia_stream_destroy(stream);
return status;
}
/*
* Destroy stream.
*/
PJ_DEF(pj_status_t) pjmedia_stream_destroy( pjmedia_stream *stream )
{
unsigned len;
PJ_ASSERT_RETURN(stream != NULL, PJ_EINVAL);
#if defined(PJMEDIA_HAS_RTCP_XR) && (PJMEDIA_HAS_RTCP_XR != 0)
/* Send RTCP XR on stream destroy */
if (stream->rtcp.xr_enabled) {
int i;
pjmedia_jb_state jb_state;
void *rtcp_pkt;
int len;
/* Update RTCP XR with current JB states */
pjmedia_jbuf_get_state(stream->jb, &jb_state);
i = jb_state.avg_delay;
pjmedia_rtcp_xr_update_info(&stream->rtcp.xr_session,
PJMEDIA_RTCP_XR_INFO_JB_NOM,
i);
i = jb_state.max_delay;
pjmedia_rtcp_xr_update_info(&stream->rtcp.xr_session,
PJMEDIA_RTCP_XR_INFO_JB_MAX,
i);
/* Build RTCP XR packet */
pjmedia_rtcp_build_rtcp_xr(&stream->rtcp.xr_session, 0,
&rtcp_pkt, &len);
/* Send the RTCP XR to remote address */
pjmedia_transport_send_rtcp(stream->transport, rtcp_pkt, len);
/* Send the RTCP XR to third-party destination if specified */
if (stream->rtcp_xr_dest_len) {
pjmedia_transport_send_rtcp2(stream->transport,
&stream->rtcp_xr_dest,
stream->rtcp_xr_dest_len,
rtcp_pkt, len);
}
}
#endif
/* Send RTCP BYE */
if (stream->enc && stream->transport) {
len = create_rtcp_bye(stream, (pj_uint8_t*)stream->enc->out_pkt,
stream->enc->out_pkt_size);
if (len != 0) {
pjmedia_transport_send_rtcp(stream->transport,
stream->enc->out_pkt, len);
}
}
/* Detach from transport
* MUST NOT hold stream mutex while detaching from transport, as
* it may cause deadlock. See ticket #460 for the details.
*/
if (stream->transport) {
pjmedia_transport_detach(stream->transport, stream);
stream->transport = NULL;
}
/* This function may be called when stream is partly initialized. */
if (stream->jb_mutex)
pj_mutex_lock(stream->jb_mutex);
/* Free codec. */
if (stream->codec) {
stream->codec->op->close(stream->codec);
pjmedia_codec_mgr_dealloc_codec(stream->codec_mgr, stream->codec);
stream->codec = NULL;
}
/* Free mutex */
if (stream->jb_mutex) {
pj_mutex_destroy(stream->jb_mutex);
stream->jb_mutex = NULL;
}
/* Destroy jitter buffer */
if (stream->jb)
pjmedia_jbuf_destroy(stream->jb);
#if TRACE_JB
if (TRACE_JB_OPENED(stream)) {
pj_file_close(stream->trace_jb_fd);
stream->trace_jb_fd = TRACE_JB_INVALID_FD;
}
#endif
return PJ_SUCCESS;
}
/*
* Get the port interface.
*/
PJ_DEF(pj_status_t) pjmedia_stream_get_port( pjmedia_stream *stream,
pjmedia_port **p_port )
{
*p_port = &stream->port;
return PJ_SUCCESS;
}
/*
* Get the transport object
*/
PJ_DEF(pjmedia_transport*) pjmedia_stream_get_transport(pjmedia_stream *st)
{
return st->transport;
}
/*
* Start stream.
*/
PJ_DEF(pj_status_t) pjmedia_stream_start(pjmedia_stream *stream)
{
PJ_ASSERT_RETURN(stream && stream->enc && stream->dec, PJ_EINVALIDOP);
if (stream->enc && (stream->dir & PJMEDIA_DIR_ENCODING)) {
stream->enc->paused = 0;
//pjmedia_snd_stream_start(stream->enc->snd_stream);
PJ_LOG(4,(stream->port.info.name.ptr, "Encoder stream started"));
} else {
PJ_LOG(4,(stream->port.info.name.ptr, "Encoder stream paused"));
}
if (stream->dec && (stream->dir & PJMEDIA_DIR_DECODING)) {
stream->dec->paused = 0;
//pjmedia_snd_stream_start(stream->dec->snd_stream);
PJ_LOG(4,(stream->port.info.name.ptr, "Decoder stream started"));
} else {
PJ_LOG(4,(stream->port.info.name.ptr, "Decoder stream paused"));
}
return PJ_SUCCESS;
}
/*
* Get stream statistics.
*/
PJ_DEF(pj_status_t) pjmedia_stream_get_stat( const pjmedia_stream *stream,
pjmedia_rtcp_stat *stat)
{
PJ_ASSERT_RETURN(stream && stat, PJ_EINVAL);
pj_memcpy(stat, &stream->rtcp.stat, sizeof(pjmedia_rtcp_stat));
return PJ_SUCCESS;
}
/*
* Reset the stream statistics in the middle of a stream session.
*/
PJ_DEF(pj_status_t) pjmedia_stream_reset_stat(pjmedia_stream *stream)
{
PJ_ASSERT_RETURN(stream, PJ_EINVAL);
pjmedia_rtcp_init_stat(&stream->rtcp.stat);
return PJ_SUCCESS;
}
#if defined(PJMEDIA_HAS_RTCP_XR) && (PJMEDIA_HAS_RTCP_XR != 0)
/*
* Get stream extended statistics.
*/
PJ_DEF(pj_status_t) pjmedia_stream_get_stat_xr( const pjmedia_stream *stream,
pjmedia_rtcp_xr_stat *stat)
{
PJ_ASSERT_RETURN(stream && stat, PJ_EINVAL);
if (stream->rtcp.xr_enabled) {
pj_memcpy(stat, &stream->rtcp.xr_session.stat, sizeof(pjmedia_rtcp_xr_stat));
return PJ_SUCCESS;
}
return PJ_ENOTFOUND;
}
#endif
/*
* Get jitter buffer state.
*/
PJ_DEF(pj_status_t) pjmedia_stream_get_stat_jbuf(const pjmedia_stream *stream,
pjmedia_jb_state *state)
{
PJ_ASSERT_RETURN(stream && state, PJ_EINVAL);
return pjmedia_jbuf_get_state(stream->jb, state);
}
/*
* Pause stream.
*/
PJ_DEF(pj_status_t) pjmedia_stream_pause( pjmedia_stream *stream,
pjmedia_dir dir)
{
PJ_ASSERT_RETURN(stream, PJ_EINVAL);
if ((dir & PJMEDIA_DIR_ENCODING) && stream->enc) {
stream->enc->paused = 1;
PJ_LOG(4,(stream->port.info.name.ptr, "Encoder stream paused"));
}
if ((dir & PJMEDIA_DIR_DECODING) && stream->dec) {
stream->dec->paused = 1;
/* Also reset jitter buffer */
pj_mutex_lock( stream->jb_mutex );
pjmedia_jbuf_reset(stream->jb);
pj_mutex_unlock( stream->jb_mutex );
PJ_LOG(4,(stream->port.info.name.ptr, "Decoder stream paused"));
}
return PJ_SUCCESS;
}
/*
* Resume stream
*/
PJ_DEF(pj_status_t) pjmedia_stream_resume( pjmedia_stream *stream,
pjmedia_dir dir)
{
PJ_ASSERT_RETURN(stream, PJ_EINVAL);
if ((dir & PJMEDIA_DIR_ENCODING) && stream->enc) {
stream->enc->paused = 0;
PJ_LOG(4,(stream->port.info.name.ptr, "Encoder stream resumed"));
}
if ((dir & PJMEDIA_DIR_DECODING) && stream->dec) {
stream->dec->paused = 0;
PJ_LOG(4,(stream->port.info.name.ptr, "Decoder stream resumed"));
}
return PJ_SUCCESS;
}
/*
* Dial DTMF
*/
PJ_DEF(pj_status_t) pjmedia_stream_dial_dtmf( pjmedia_stream *stream,
const pj_str_t *digit_char)
{
pj_status_t status = PJ_SUCCESS;
/* By convention we use jitter buffer mutex to access DTMF
* queue.
*/
PJ_ASSERT_RETURN(stream && digit_char, PJ_EINVAL);
/* Check that remote can receive DTMF events. */
if (stream->tx_event_pt < 0) {
return PJMEDIA_RTP_EREMNORFC2833;
}
pj_mutex_lock(stream->jb_mutex);
if (stream->tx_dtmf_count+digit_char->slen >=
(long)PJ_ARRAY_SIZE(stream->tx_dtmf_buf))
{
status = PJ_ETOOMANY;
} else {
int i;
/* convert ASCII digits into payload type first, to make sure
* that all digits are valid.
*/
for (i=0; i<digit_char->slen; ++i) {
unsigned pt;
int dig = pj_tolower(digit_char->ptr[i]);
if (dig >= '0' && dig <= '9')
{
pt = dig - '0';
}
else if (dig >= 'a' && dig <= 'd')
{
pt = dig - 'a' + 12;
}
else if (dig == '*')
{
pt = 10;
}
else if (dig == '#')
{
pt = 11;
}
else
{
status = PJMEDIA_RTP_EINDTMF;
break;
}
stream->tx_dtmf_buf[stream->tx_dtmf_count+i].event = pt;
stream->tx_dtmf_buf[stream->tx_dtmf_count+i].duration = 0;
}
if (status != PJ_SUCCESS)
goto on_return;
/* Increment digit count only if all digits are valid. */
stream->tx_dtmf_count += digit_char->slen;
}
on_return:
pj_mutex_unlock(stream->jb_mutex);
return status;
}
/*
* See if we have DTMF digits in the rx buffer.
*/
PJ_DEF(pj_bool_t) pjmedia_stream_check_dtmf(pjmedia_stream *stream)
{
return stream->rx_dtmf_count != 0;
}
/*
* Retrieve incoming DTMF digits from the stream's DTMF buffer.
*/
PJ_DEF(pj_status_t) pjmedia_stream_get_dtmf( pjmedia_stream *stream,
char *digits,
unsigned *size)
{
PJ_ASSERT_RETURN(stream && digits && size, PJ_EINVAL);
pj_assert(sizeof(stream->rx_dtmf_buf[0]) == 0);
/* By convention, we use jitter buffer's mutex to access DTMF
* digits resources.
*/
pj_mutex_lock(stream->jb_mutex);
if (stream->rx_dtmf_count < *size)
*size = stream->rx_dtmf_count;
if (*size) {
pj_memcpy(digits, stream->rx_dtmf_buf, *size);
stream->rx_dtmf_count -= *size;
if (stream->rx_dtmf_count) {
pj_memmove(stream->rx_dtmf_buf,
&stream->rx_dtmf_buf[*size],
stream->rx_dtmf_count);
}
}
pj_mutex_unlock(stream->jb_mutex);
return PJ_SUCCESS;
}
/*
* Set callback to be called upon receiving DTMF digits.
*/
PJ_DEF(pj_status_t) pjmedia_stream_set_dtmf_callback(pjmedia_stream *stream,
void (*cb)(pjmedia_stream*,
void *user_data,
int digit),
void *user_data)
{
PJ_ASSERT_RETURN(stream, PJ_EINVAL);
/* By convention, we use jitter buffer's mutex to access DTMF
* digits resources.
*/
pj_mutex_lock(stream->jb_mutex);
stream->dtmf_cb = cb;
stream->dtmf_cb_user_data = user_data;
pj_mutex_unlock(stream->jb_mutex);
return PJ_SUCCESS;
}