modules/rtp_in/rtp_in.c (712 lines of code) (raw):
/*
* GPAC - Multimedia Framework C SDK
*
* Authors: Jean Le Feuvre
* Copyright (c) Telecom ParisTech 2000-2012
* All rights reserved
*
* This file is part of GPAC / RTP input module
*
* GPAC is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 2, or (at your option)
* any later version.
*
* GPAC is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; see the file COPYING. If not, write to
* the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
*
*/
#include "rtp_in.h"
#ifndef GPAC_DISABLE_STREAMING
#define RTP_DEFAULT_TIMEOUT 10000
#define RTSP_DEFAULT_TIMEOUT 2000
static void RT_LoadPrefs(GF_InputService *plug, RTPClient *rtp)
{
const char *sOpt;
sOpt = gf_modules_get_option((GF_BaseInterface *)plug, "Streaming", "DefaultPort");
if (sOpt) {
rtp->default_port = atoi(sOpt);
} else {
rtp->default_port = 554;
}
if ((rtp->default_port == 80) || (rtp->default_port == 8080))
gf_modules_set_option((GF_BaseInterface *)plug, "Streaming", "RTPoverRTSP", "yes");
sOpt = gf_modules_get_option((GF_BaseInterface *)plug, "Streaming", "RTPoverRTSP");
if (sOpt && !stricmp(sOpt, "yes")) {
rtp->transport_mode = 1;
} else if (sOpt && !stricmp(sOpt, "OnlyCritical")) {
rtp->transport_mode = 2;
} else {
rtp->transport_mode = 0;
}
/*
get heneral network config for UDP
*/
/*if UDP not available don't try it*/
sOpt = gf_modules_get_option((GF_BaseInterface *)plug, "Network", "UDPNotAvailable");
if (sOpt && !stricmp(sOpt, "yes")) {
if (!rtp->transport_mode) rtp->transport_mode = 1;
/*turn it off*/
gf_modules_set_option((GF_BaseInterface *)plug, "Network", "UDPNotAvailable", "no");
}
if (!rtp->transport_mode) {
sOpt = gf_modules_get_option((GF_BaseInterface *)plug, "Network", "UDPTimeout");
if (sOpt ) {
rtp->udp_time_out = atoi(sOpt);
} else {
rtp->udp_time_out = RTP_DEFAULT_TIMEOUT;
}
}
sOpt = gf_modules_get_option((GF_BaseInterface *)plug, "Streaming", "RTSPTimeout");
if (sOpt ) {
rtp->time_out = atoi(sOpt);
} else {
rtp->time_out = RTSP_DEFAULT_TIMEOUT;
}
/*packet drop emulation*/
sOpt = gf_modules_get_option((GF_BaseInterface *)plug, "Streaming", "FirstPacketDrop");
if (sOpt) {
rtp->first_packet_drop = atoi(sOpt);
} else {
rtp->first_packet_drop = 0;
}
sOpt = gf_modules_get_option((GF_BaseInterface *)plug, "Streaming", "PacketDropFrequency");
if (sOpt) {
rtp->frequency_drop = atoi(sOpt);
} else {
rtp->frequency_drop = 0;
}
// rtp->handle_announce = 0;
}
static void RP_cleanup(RTPClient *rtp)
{
RTSPSession *sess;
while (gf_list_count(rtp->channels)) {
RTPStream *ch = (RTPStream *)gf_list_get(rtp->channels, 0);
gf_list_rem(rtp->channels, 0);
RP_DeleteStream(ch);
}
while ( (sess = (RTSPSession *)gf_list_last(rtp->sessions)) ) {
gf_list_rem_last(rtp->sessions);
RP_DelSession(sess);
}
if (rtp->session_desc) gf_odf_desc_del(rtp->session_desc);
rtp->session_desc = NULL;
if (rtp->sdp_temp) {
gf_free(rtp->sdp_temp->remote_url);
gf_free(rtp->sdp_temp);
}
rtp->sdp_temp = NULL;
}
u32 RP_Thread(void *param)
{
u32 i;
GF_NetworkCommand com;
RTSPSession *sess;
RTPStream *ch;
RTPClient *rtp = (RTPClient *)param;
rtp->th_state = 1;
com.command_type = GF_NET_CHAN_BUFFER_QUERY;
while (rtp->th_state) {
gf_mx_p(rtp->mx);
/*fecth data on udp*/
i=0;
while ((ch = (RTPStream *)gf_list_enum(rtp->channels, &i))) {
if ((ch->flags & RTP_EOS) || (ch->status!=RTP_Running) ) continue;
/*for interleaved channels don't read too fast, query the buffer occupancy*/
if (ch->flags & RTP_INTERLEAVED) {
com.base.on_channel = ch->channel;
gf_service_command(rtp->service, &com, GF_OK);
/*if no buffering, use a default value (3 sec of data should do it)*/
if (!com.buffer.max) com.buffer.max = 3000;
if (com.buffer.occupancy <= com.buffer.max) ch->rtsp->flags |= RTSP_TCP_FLUSH;
} else {
RP_ReadStream(ch);
}
}
/*and process commands / flush TCP*/
i=0;
while ((sess = (RTSPSession *)gf_list_enum(rtp->sessions, &i))) {
RP_ProcessCommands(sess);
if (sess->connect_error) {
gf_service_connect_ack(sess->owner->service, NULL, sess->connect_error);
sess->connect_error = GF_OK;
}
}
gf_mx_v(rtp->mx);
gf_sleep(1);
}
if (rtp->dnload) gf_service_download_del(rtp->dnload);
rtp->dnload = NULL;
rtp->th_state = 2;
return 0;
}
static const char * sdp_mime = "application/sdp";
static const char * sdp_exts = "sdp";
static const char * sdp_desc = "OnDemand Media/Multicast Session";
static u32 RP_RegisterMimeTypes(const GF_InputService *plug) {
if (!plug)
return 0;
gf_service_register_mime(plug, sdp_mime, sdp_exts, sdp_desc);
return 1;
}
static Bool RP_CanHandleURL(GF_InputService *plug, const char *url)
{
char *sExt;
if (!plug || !url)
return GF_FALSE;
sExt = strrchr(url, '.');
if (sExt && gf_service_check_mime_register(plug, sdp_mime, sdp_exts, sdp_desc, sExt)) return GF_TRUE;
/*local */
if (strstr(url, "data:application/sdp")) return GF_TRUE;
/*embedded data*/
if (strstr(url, "data:application/mpeg4-od-au;base64") ||
strstr(url, "data:application/mpeg4-bifs-au;base64") ||
strstr(url, "data:application/mpeg4-es-au;base64")) return GF_TRUE;
/*we need rtsp/tcp , rtsp/udp or direct RTP sender (no control)*/
if (!strnicmp(url, "rtsp://", 7) || !strnicmp(url, "rtspu://", 8) || !strnicmp(url, "rtp://", 6)) return GF_TRUE;
/*we don't check extensions*/
return GF_FALSE;
}
GF_Err RP_ConnectServiceEx(GF_InputService *plug, GF_ClientService *serv, const char *url, Bool skip_migration)
{
char *session_cache;
RTSPSession *sess;
RTPClient *priv = (RTPClient *)plug->priv;
/*store user address*/
priv->service = serv;
if (priv->dnload) gf_service_download_del(priv->dnload);
priv->dnload = NULL;
GF_LOG(GF_LOG_INFO, GF_LOG_RTP, ("[RTP] Opening service %s\n", url));
/*load preferences*/
RT_LoadPrefs(plug, priv);
/*start thread*/
gf_th_run(priv->th, RP_Thread, priv);
if (!skip_migration) {
session_cache = (char *) gf_modules_get_option((GF_BaseInterface *) plug, "Streaming", "SessionMigrationFile");
if (session_cache && session_cache[0]) {
FILE *f = gf_fopen(session_cache, "rb");
if (f) {
gf_fclose(f);
GF_LOG(GF_LOG_INFO, GF_LOG_RTP, ("[RTP] Restarting RTSP session from %s\n", session_cache));
RP_FetchSDP(priv, (char *) session_cache, NULL, (char *) url);
return GF_OK;
}
if (!strncmp(session_cache, "http://", 7)) {
GF_LOG(GF_LOG_INFO, GF_LOG_RTP, ("[RTP] Restarting RTSP session from %s\n", session_cache));
RP_FetchSDP(priv, (char *) session_cache, NULL, (char *) url);
return GF_OK;
}
}
}
/*local or remote SDP*/
if (strstr(url, "data:application/sdp") || (strnicmp(url, "rtsp", 4) && strstr(url, ".sdp")) ) {
RP_FetchSDP(priv, (char *) url, NULL, NULL);
return GF_OK;
}
/*rtsp and rtsp over udp*/
if (!strnicmp(url, "rtsp://", 7) || !strnicmp(url, "rtspu://", 8)) {
char *the_url = gf_strdup(url);
char *the_ext = strrchr(the_url, '#');
if (the_ext) {
if (!stricmp(the_ext, "#audio")) priv->media_type = GF_MEDIA_OBJECT_AUDIO;
else if (!stricmp(the_ext, "#video")) priv->media_type = GF_MEDIA_OBJECT_VIDEO;
the_ext[0] = 0;
}
sess = RP_NewSession(priv, (char *) the_url);
gf_free(the_url);
if (!sess) {
gf_service_connect_ack(serv, NULL, GF_NOT_SUPPORTED);
} else {
RP_Describe(sess, 0, NULL);
}
return GF_OK;
}
/*direct RTP (no control) or embedded data - this means the service is attached to a single channel (no IOD)
reply right away*/
gf_service_connect_ack(serv, NULL, GF_OK);
RP_SetupObjects(priv);
return GF_OK;
}
GF_Err RP_ConnectService(GF_InputService *plug, GF_ClientService *serv, const char *url)
{
return RP_ConnectServiceEx(plug, serv, url, GF_FALSE);
}
static void RP_FlushCommands(RTPClient *rtp)
{
u32 i, nb_com;
RTSPSession *sess;
/*process teardown on all sessions*/
while (1) {
nb_com = 0;
i=0;
while ((sess = (RTSPSession *)gf_list_enum(rtp->sessions, &i))) {
if (!sess->connect_error)
nb_com += gf_list_count(sess->rtsp_commands);
}
if (!nb_com) break;
gf_sleep(10);
}
}
static GF_Err RP_CloseService(GF_InputService *plug)
{
u32 i;
const char *opt;
RTSPSession *sess;
RTPClient *rtp = (RTPClient *)plug->priv;
GF_LOG(GF_LOG_DEBUG, GF_LOG_RTP, ("[RTP] Closing service\n"));
RP_FlushCommands(rtp);
if (rtp->session_migration) {
opt = gf_modules_get_option((GF_BaseInterface *) plug, "Streaming", "SessionMigrationPause");
if (opt && !strcmp(opt, "yes")) {
GF_NetworkCommand com;
com.command_type = GF_NET_CHAN_PAUSE;
com.base.on_channel = NULL;
/*send pause on all sessions*/
i=0;
while ((sess = (RTSPSession *)gf_list_enum(rtp->sessions, &i))) {
RP_UserCommand(sess, NULL, &com);
}
}
RP_SaveSessionState(rtp);
} else {
/*remove session state file*/
if (rtp->session_state_data) {
gf_free(rtp->session_state_data);
rtp->session_state_data = NULL;
}
/*send teardown on all sessions*/
i=0;
while ((sess = (RTSPSession *)gf_list_enum(rtp->sessions, &i))) {
RP_Teardown(sess, NULL);
}
}
RP_FlushCommands(rtp);
/*shutdown thread*/
if (rtp->th_state==1) rtp->th_state = 0;
/*confirm close*/
gf_service_disconnect_ack(rtp->service, NULL, GF_OK);
return GF_OK;
}
static GF_Descriptor *RP_GetServiceDesc(GF_InputService *plug, u32 expect_type, const char *sub_url)
{
GF_Descriptor *desc;
RTPClient *priv = (RTPClient *)plug->priv;
GF_LOG(GF_LOG_DEBUG, GF_LOG_RTP, ("[RTP] Fetching service descriptor\n"));
if ((expect_type!=GF_MEDIA_OBJECT_UNDEF) && (expect_type!=GF_MEDIA_OBJECT_SCENE) && (expect_type!=GF_MEDIA_OBJECT_UPDATES)) {
/*ignore the SDP IOD and regenerate one*/
if (priv->session_desc) gf_odf_desc_del(priv->session_desc);
priv->session_desc = NULL;
priv->media_type = expect_type;
return RP_EmulateIOD(priv, sub_url);
}
desc = priv->session_desc;
priv->session_desc = NULL;
return desc;
}
static GF_Err RP_ConnectChannel(GF_InputService *plug, LPNETCHANNEL channel, const char *url, Bool upstream)
{
u32 ESID, i;
RTPStream *ch, *next_ch;
RTSPSession *sess;
char *es_url;
RTPClient *priv = (RTPClient *)plug->priv;
if (upstream) return GF_NOT_SUPPORTED;
GF_LOG(GF_LOG_DEBUG, GF_LOG_RTP, ("[RTP] Connecting channel @%08x - %s\n", channel, url));
ch = RP_FindChannel(priv, channel, 0, (char *) url, GF_FALSE);
if (ch && (ch->status != RTP_Disconnected) ) return GF_SERVICE_ERROR;
es_url = NULL;
sess = NULL;
if (strstr(url, "ES_ID=")) {
sscanf(url, "ES_ID=%ud", &ESID);
/*first case: simple URL (same namespace)*/
ch = RP_FindChannel(priv, NULL, ESID, NULL, GF_FALSE);
/*this should not happen, the sdp must describe all streams in the service*/
if (!ch) return GF_STREAM_NOT_FOUND;
/*search for next stream*/
ch->next_stream = 0;
for (i = 0; i < gf_list_count(priv->channels); i++) {
next_ch = (RTPStream *)gf_list_get(priv->channels, i);
if (next_ch->prev_stream == ch->mid)
{
ch->next_stream = next_ch->mid;
break;
}
}
/*assign app channel*/
ch->channel = channel;
sess = ch->rtsp;
}
/*rtsp url - create a session if needed*/
else if (!strnicmp(url, "rtsp://", 7) || !strnicmp(url, "rtspu://", 8)) {
sess = RP_CheckSession(priv, (char *) url);
if (!sess) sess = RP_NewSession(priv, (char *) url);
es_url = (char *) url;
}
/*data: url*/
else if (strstr(url, "data:application/mpeg4-od-au;base64")
|| strstr(url, "data:application/mpeg4-bifs-au;base64")
|| strstr(url, "data:application/mpeg4-es-au;base64")
) {
GF_SAFEALLOC(ch, RTPStream);
ch->control = gf_strdup(url);
ch->owner = priv;
ch->channel = channel;
ch->status = RTP_Connected;
/*register*/
gf_list_add(priv->channels, ch);
RP_ConfirmChannelConnect(ch, GF_OK);
return GF_OK;
}
/*session migration resume - don't send data to the server*/
if (ch->status==RTP_SessionResume) {
ch->flags |= RTP_CONNECTED;
RP_InitStream(ch, GF_FALSE);
RP_ConfirmChannelConnect(ch, GF_OK);
return GF_OK;
}
/*send a DESCRIBE (not a setup) on the channel. If the channel is already created then the
describe is skipped and a SETUP is sent directly, otherwise the channel is first described then setup*/
if (sess) RP_Describe(sess, es_url, channel);
/*otherwise confirm channel connection*/
else RP_ConfirmChannelConnect(ch, GF_OK);
return GF_OK;
}
static GF_Err RP_DisconnectChannel(GF_InputService *plug, LPNETCHANNEL channel)
{
RTPStream *ch;
RTPClient *priv = (RTPClient *)plug->priv;
GF_LOG(GF_LOG_DEBUG, GF_LOG_RTP, ("[RTP] Disconnecting channel @%08x\n", channel));
ch = RP_FindChannel(priv, channel, 0, NULL, GF_FALSE);
if (!ch) return GF_STREAM_NOT_FOUND;
gf_mx_p(priv->mx);
/*disconnect stream BUT DO NOT DELETE IT since we don't store SDP*/
ch->flags &= ~RTP_CONNECTED;
ch->channel = NULL;
gf_mx_v(priv->mx);
gf_service_disconnect_ack(priv->service, channel, GF_OK);
return GF_OK;
}
static void gf_rtp_switch_quality(RTPClient *rtp, Bool switch_up)
{
u32 i,count;
RTPStream *ch, *cur_ch;
GF_NetworkCommand com;
count = gf_list_count(rtp->channels);
/*find the current stream*/
ch = cur_ch = NULL;
for (i = 0; i < count; i++) {
cur_ch = (RTPStream *) gf_list_get(rtp->channels, i);
if (cur_ch->mid != rtp->cur_mid) {
cur_ch=NULL;
continue;
}
break;
}
if (!cur_ch) return;
if (switch_up)
{
/*this is the highest stream*/
if (!cur_ch->next_stream)
{
cur_ch->status = RTP_Running;
return;
}
else
{
for (i = 0; i < count; i++) {
ch = (RTPStream *) gf_list_get(rtp->channels, i);
if (ch->mid == cur_ch->next_stream)
{
/*resume streaming next channel*/
gf_mx_p(rtp->mx);
RP_InitStream(ch, GF_FALSE);
gf_mx_v(rtp->mx);
ch->status = RTP_Running;
rtp->cur_mid = ch->mid;
break;
}
}
}
}
else
{
/*this is the lowest stream i.e base layer*/
if (!cur_ch->prev_stream)
{
cur_ch->status = RTP_Running;
return;
}
else
{
for (i = 0; i < count; i++) {
ch = (RTPStream *) gf_list_get(rtp->channels, i);
if (ch->mid == cur_ch->prev_stream)
{
/*stop streaming current channel*/
gf_rtp_stop(cur_ch->rtp_ch);
cur_ch->status = RTP_Connected;
com.command_type = GF_NET_CHAN_RESET;
com.base.on_channel = cur_ch;
gf_service_command(rtp->service, &com, GF_OK);
rtp->cur_mid = ch->mid;
break;
}
}
}
}
GF_LOG(GF_LOG_DEBUG, GF_LOG_CODEC, ("Switch from ES%d to ES %d\n", cur_ch->mid, ch->mid));
return;
}
static GF_Err RP_ServiceCommand(GF_InputService *plug, GF_NetworkCommand *com)
{
RTPStream *ch;
RTPClient *priv = (RTPClient *)plug->priv;
if (com->command_type==GF_NET_SERVICE_HAS_AUDIO) {
u32 i;
for (i=0; i<gf_list_count(priv->channels); i++) {
ch = (RTPStream*)gf_list_get(priv->channels, i);
if (ch->depacketizer->sl_map.StreamType==GF_STREAM_AUDIO)
return GF_OK;
}
return GF_NOT_SUPPORTED;
}
if (com->command_type==GF_NET_SERVICE_MIGRATION_INFO) {
RP_SaveSessionState(priv);
priv->session_migration = GF_TRUE;
if (priv->session_state_data) {
com->migrate.data = priv->session_state_data;
com->migrate.data_len = (u32) strlen(priv->session_state_data);
return GF_OK;
}
return GF_NOT_SUPPORTED;
}
if (com->command_type == GF_NET_SERVICE_QUALITY_SWITCH)
{
gf_rtp_switch_quality(priv, com->switch_quality.up);
return GF_OK;
}
/*ignore commands other than channels one*/
if (!com->base.on_channel) {
if (com->command_type==GF_NET_IS_CACHABLE) return GF_OK;
return GF_NOT_SUPPORTED;
}
ch = RP_FindChannel(priv, com->base.on_channel, 0, NULL, GF_FALSE);
if (!ch) return GF_STREAM_NOT_FOUND;
switch (com->command_type) {
case GF_NET_CHAN_SET_PULL:
if (ch->rtp_ch || ch->rtsp || !ch->control) return GF_NOT_SUPPORTED;
/*embedded channels work in pull mode*/
if (strstr(ch->control, "data:application/")) return GF_OK;
return GF_NOT_SUPPORTED;
case GF_NET_CHAN_INTERACTIVE:
/*looks like pure RTP / multicast etc, not interactive*/
if (!ch->control) return GF_NOT_SUPPORTED;
/*emulated broadcast mode*/
else if (ch->flags & RTP_FORCE_BROADCAST) return GF_NOT_SUPPORTED;
/*regular rtsp mode*/
else if (ch->flags & RTP_HAS_RANGE) return GF_OK;
/*embedded data*/
else if (strstr(ch->control, "application")) return GF_OK;
return GF_NOT_SUPPORTED;
case GF_NET_CHAN_BUFFER:
if (!(ch->rtp_ch || ch->rtsp || !ch->control)) {
com->buffer.max = com->buffer.min = 0;
} else {
const char *opt;
/*amount of buffering in ms*/
opt = gf_modules_get_option((GF_BaseInterface *)plug, "Network", "BufferLength");
com->buffer.max = opt ? atoi(opt) : 1000;
/*rebuffer low limit in ms - if the amount of buffering is less than this, rebuffering will never occur*/
opt = gf_modules_get_option((GF_BaseInterface *)plug, "Network", "RebufferLength");
if (opt) com->buffer.min = atoi(opt);
else com->buffer.min = 500;
if (com->buffer.min >= com->buffer.max ) com->buffer.min = 0;
}
return GF_OK;
case GF_NET_CHAN_DURATION:
com->duration.duration = (ch->flags & RTP_HAS_RANGE) ? (ch->range_end - ch->range_start) : 0;
return GF_OK;
/*RTP channel config is done upon connection, once the complete SL mapping is known
however we must store some info not carried in SDP*/
case GF_NET_CHAN_CONFIG:
if (com->cfg.frame_duration) ch->depacketizer->sl_hdr.au_duration = com->cfg.frame_duration;
ch->ts_res = com->cfg.sl_config.timestampResolution;
return GF_OK;
case GF_NET_CHAN_PLAY:
GF_LOG(GF_LOG_DEBUG, GF_LOG_RTP, ("[RTP] Processing play on channel @%08x - %s\n", ch, ch->rtsp ? "RTSP control" : "No control (RTP)" ));
/*is this RTSP or direct RTP?*/
ch->flags &= ~RTP_EOS;
if (ch->rtsp) {
if (ch->status==RTP_SessionResume) {
const char *opt = gf_modules_get_option((GF_BaseInterface *) plug, "Streaming", "SessionMigrationPause");
if (opt && !strcmp(opt, "yes")) {
ch->status = RTP_Connected;
com->play.start_range = ch->current_start;
} else {
ch->status = RTP_Running;
return GF_OK;
}
}
RP_UserCommand(ch->rtsp, ch, com);
} else {
ch->status = RTP_Running;
if (!ch->next_stream)
priv->cur_mid = ch->mid;
if (ch->rtp_ch) {
/*technically we shouldn't attempt to synchronize streams based on RTP, we should use RTCP/ However it
may happen that the RTCP traffic is absent ...*/
//ch->check_rtp_time = RTP_SET_TIME_RTP;
ch->rtcp_init = GF_FALSE;
gf_mx_p(priv->mx);
RP_InitStream(ch, (ch->flags & RTP_CONNECTED) ? GF_TRUE : GF_FALSE);
gf_mx_v(priv->mx);
gf_rtp_set_info_rtp(ch->rtp_ch, 0, 0, 0);
} else {
/*direct channel, store current start*/
ch->current_start = com->play.start_range;
ch->flags |= GF_RTP_NEW_AU;
gf_rtp_depacketizer_reset(ch->depacketizer, GF_FALSE);
}
}
return GF_OK;
case GF_NET_CHAN_STOP:
/*is this RTSP or direct RTP?*/
if (ch->rtsp) {
if (! ch->owner->session_migration) {
RP_UserCommand(ch->rtsp, ch, com);
}
} else {
ch->status = RTP_Connected;
ch->owner->last_ntp = 0;
}
ch->rtcp_init = GF_FALSE;
return GF_OK;
case GF_NET_CHAN_SET_SPEED:
case GF_NET_CHAN_PAUSE:
case GF_NET_CHAN_RESUME:
assert(ch->rtsp);
RP_UserCommand(ch->rtsp, ch, com);
return GF_OK;
case GF_NET_CHAN_GET_DSI:
if (ch->depacketizer && ch->depacketizer->sl_map.configSize) {
com->get_dsi.dsi_len = ch->depacketizer->sl_map.configSize;
com->get_dsi.dsi = (char*)gf_malloc(sizeof(char)*com->get_dsi.dsi_len);
memcpy(com->get_dsi.dsi, ch->depacketizer->sl_map.config, sizeof(char)*com->get_dsi.dsi_len);
} else {
com->get_dsi.dsi = NULL;
com->get_dsi.dsi_len = 0;
}
return GF_OK;
case GF_NET_GET_STATS:
memset(&com->net_stats, 0, sizeof(GF_NetComStats));
if (ch->rtp_ch) {
u32 time;
Float bps;
com->net_stats.pck_loss_percentage = gf_rtp_get_loss(ch->rtp_ch);
if (ch->flags & RTP_INTERLEAVED) {
com->net_stats.multiplex_port = gf_rtsp_get_session_port(ch->rtsp->session);
com->net_stats.port = gf_rtp_get_low_interleave_id(ch->rtp_ch);
com->net_stats.ctrl_port = gf_rtp_get_hight_interleave_id(ch->rtp_ch);
} else {
com->net_stats.multiplex_port = 0;
gf_rtp_get_ports(ch->rtp_ch, &com->net_stats.port, &com->net_stats.ctrl_port);
}
if (ch->stat_stop_time) {
time = ch->stat_stop_time - ch->stat_start_time;
} else {
time = gf_sys_clock() - ch->stat_start_time;
}
bps = 8.0f * ch->rtp_bytes;
bps *= 1000;
bps /= time;
com->net_stats.bw_down = (u32) bps;
bps = 8.0f * ch->rtcp_bytes;
bps *= 1000;
bps /= time;
com->net_stats.ctrl_bw_down = (u32) bps;
bps = 8.0f * gf_rtp_get_tcp_bytes_sent(ch->rtp_ch);
bps *= 1000;
bps /= time;
com->net_stats.ctrl_bw_up = (u32) bps;
}
return GF_OK;
default:
break;
}
return GF_NOT_SUPPORTED;
}
static GF_Err RP_ChannelGetSLP(GF_InputService *plug, LPNETCHANNEL channel, char **out_data_ptr, u32 *out_data_size, GF_SLHeader *out_sl_hdr, Bool *sl_compressed, GF_Err *out_reception_status, Bool *is_new_data)
{
char *data;
RTPStream *ch;
RTPClient *priv = (RTPClient *)plug->priv;
ch = RP_FindChannel(priv, channel, 0, NULL, GF_FALSE);
if (!ch) return GF_STREAM_NOT_FOUND;
if (ch->rtp_ch || ch->rtsp || !ch->control) return GF_SERVICE_ERROR;
if (ch->status != RTP_Running) return GF_SERVICE_ERROR;
data = strstr(ch->control, ";base64");
if (!data) return GF_SERVICE_ERROR;
if (ch->current_start>=0) {
*sl_compressed = GF_FALSE;
memset(out_sl_hdr, 0, sizeof(GF_SLHeader));
out_sl_hdr->accessUnitEndFlag = 1;
out_sl_hdr->accessUnitStartFlag = 1;
out_sl_hdr->compositionTimeStamp = (u64) (ch->current_start * ch->ts_res);
out_sl_hdr->compositionTimeStampFlag = 1;
out_sl_hdr->randomAccessPointFlag = 1;
*out_reception_status = GF_OK;
*is_new_data = (ch->flags & GF_RTP_NEW_AU) ? GF_TRUE : GF_FALSE;
/*decode data*/
data = strstr(data, ",");
data += 1;
*out_data_size = gf_base64_decode(data, (u32) strlen(data), ch->buffer, RTP_BUFFER_SIZE);
/*FIXME - currently only support for empty SL header*/
*out_data_ptr = ch->buffer;
ch->flags &= ~GF_RTP_NEW_AU;
} else {
*out_data_ptr = NULL;
*out_data_size = 0;
*out_reception_status = GF_EOS;
ch->flags |= RTP_EOS;
}
return GF_OK;
}
static GF_Err RP_ChannelReleaseSLP(GF_InputService *plug, LPNETCHANNEL channel)
{
RTPStream *ch;
RTPClient *priv = (RTPClient *)plug->priv;
ch = RP_FindChannel(priv, channel, 0, NULL, GF_FALSE);
if (!ch) return GF_STREAM_NOT_FOUND;
if (ch->rtp_ch || ch->rtsp || !ch->control) return GF_SERVICE_ERROR;
if (ch->status != RTP_Running) return GF_SERVICE_ERROR;
/*this will trigger EOS at next fetch*/
ch->current_start = -1.0;
return GF_OK;
}
static Bool RP_CanHandleURLInService(GF_InputService *plug, const char *url)
{
RTSPSession *sess;
RTPClient *priv = (RTPClient *)plug->priv;
if (strstr(url, "data:application/mpeg4-od-au;base64")
|| strstr(url, "data:application/mpeg4-bifs-au;base64")
|| strstr(url, "data:application/mpeg4-es-au;base64")
) return GF_TRUE;
if (url[0]=='#') {
Bool st_type = GF_FALSE;
if (!stricmp(url, "#video")) st_type = GF_STREAM_VISUAL;
else if (!stricmp(url, "#audio")) st_type = GF_STREAM_AUDIO;
if (st_type) {
u32 i=0;
RTPStream *st;
while ((st = (RTPStream *)gf_list_enum(priv->channels, &i))) {
if (st->depacketizer && (st->depacketizer->sl_map.StreamType==st_type))
return GF_TRUE;
}
}
return GF_FALSE;
} else if (!RP_CanHandleURL(plug, url)) {
return GF_FALSE;
}
/*if this URL is part of a running session then ok*/
sess = RP_CheckSession(priv, (char *) url);
if (sess) return GF_TRUE;
return GF_FALSE;
}
GF_InputService *RTP_Load()
{
RTPClient *priv;
GF_InputService *plug;
GF_SAFEALLOC(plug, GF_InputService);
memset(plug, 0, sizeof(GF_InputService));
GF_REGISTER_MODULE_INTERFACE(plug, GF_NET_CLIENT_INTERFACE, "GPAC RTP/RTSP Client", "gpac distribution")
plug->CanHandleURL = RP_CanHandleURL;
plug->CanHandleURLInService = RP_CanHandleURLInService;
plug->ConnectService = RP_ConnectService;
plug->CloseService = RP_CloseService;
plug->GetServiceDescriptor = RP_GetServiceDesc;
plug->ConnectChannel = RP_ConnectChannel;
plug->DisconnectChannel = RP_DisconnectChannel;
plug->ServiceCommand = RP_ServiceCommand;
plug->RegisterMimeTypes = RP_RegisterMimeTypes;
/*PULL mode for embedded streams*/
plug->ChannelGetSLP = RP_ChannelGetSLP;
plug->ChannelReleaseSLP = RP_ChannelReleaseSLP;
GF_SAFEALLOC(priv, RTPClient);
priv->sessions = gf_list_new();
priv->channels = gf_list_new();
plug->priv = priv;
priv->time_out = RTSP_DEFAULT_TIMEOUT;
priv->mx = gf_mx_new("RTPDemux");
priv->th = gf_th_new("RTPDemux");
return plug;
}
void RTP_Delete(GF_BaseInterface *bi)
{
RTPClient *rtp;
u32 retry;
GF_InputService *plug = (GF_InputService *) bi;
rtp = (RTPClient *)plug->priv;
/*shutdown thread*/
if (rtp->th_state==1) rtp->th_state = 0;
retry = 20;
while ((rtp->th_state==1) && retry) {
gf_sleep(10);
retry--;
}
assert(retry);
if (rtp->session_state_data) gf_free(rtp->session_state_data);
RP_cleanup(rtp);
gf_th_del(rtp->th);
gf_mx_del(rtp->mx);
gf_list_del(rtp->sessions);
gf_list_del(rtp->channels);
gf_free(rtp);
gf_free(bi);
}
#endif
GPAC_MODULE_EXPORT
const u32 *QueryInterfaces()
{
static u32 si [] = {
#ifndef GPAC_DISABLE_STREAMING
GF_NET_CLIENT_INTERFACE,
#endif
0
};
return si;
}
GPAC_MODULE_EXPORT
GF_BaseInterface *LoadInterface(u32 InterfaceType)
{
#ifndef GPAC_DISABLE_STREAMING
if (InterfaceType == GF_NET_CLIENT_INTERFACE) return (GF_BaseInterface *)RTP_Load();
#endif
return NULL;
}
GPAC_MODULE_EXPORT
void ShutdownInterface(GF_BaseInterface *ifce)
{
switch (ifce->InterfaceType) {
#ifndef GPAC_DISABLE_STREAMING
case GF_NET_CLIENT_INTERFACE:
RTP_Delete(ifce);
break;
#endif
}
}
GPAC_MODULE_STATIC_DECLARATION( rtp_in )