Skip to content

Commit

Permalink
Fix #3170: WebRTC: Support WHIP(WebRTC-HTTP ingestion protocol). v4.0…
Browse files Browse the repository at this point in the history
….262
  • Loading branch information
winlinvip committed Sep 6, 2022
1 parent e0c8c19 commit 15610ca
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 51 deletions.
1 change: 1 addition & 0 deletions trunk/doc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The changelog for SRS.

## SRS 4.0 Changelog

* v4.0, 2022-09-06, Fix [#3170](https://github.com/ossrs/srs/issues/3170): WebRTC: Support WHIP(WebRTC-HTTP ingestion protocol). v4.0.262
* v4.0, 2022-09-03, Fix HTTP url parsing bug. v4.0.261
* v4.0, 2022-09-03, For [#3167](https://github.com/ossrs/srs/issues/3167): WebRTC: Play stucked when republish. v4.0.260
* v4.0, 2022-09-02, For [#307](https://github.com/ossrs/srs/issues/307): WebRTC: Support use domain name as CANDIDATE. v4.0.259
Expand Down
213 changes: 163 additions & 50 deletions trunk/src/app/srs_app_rtc_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe

srs_parse_rtmp_url(streamurl, ruc.req_->tcUrl, ruc.req_->stream);

srs_discovery_tc_url(ruc.req_->tcUrl, ruc.req_->schema, ruc.req_->host, ruc.req_->vhost,
srs_discovery_tc_url(ruc.req_->tcUrl, ruc.req_->schema, ruc.req_->host, ruc.req_->vhost,
ruc.req_->app, ruc.req_->stream, ruc.req_->port, ruc.req_->param);

// discovery vhost, resolve the vhost from config
Expand All @@ -130,10 +130,6 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
ruc.req_->vhost = parsed_vhost->arg0();
}

if ((err = http_hooks_on_play(ruc.req_)) != srs_success) {
return srs_error_wrap(err, "RTC: http_hooks_on_play");
}

// For client to specifies the candidate(EIP) of server.
string eip = r->query_get("eip");
if (eip.empty()) {
Expand All @@ -144,9 +140,11 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
string srtp = r->query_get("encrypt");
string dtls = r->query_get("dtls");

srs_trace("RTC play %s, api=%s, tid=%s, clientip=%s, app=%s, stream=%s, offer=%dB, eip=%s, codec=%s, srtp=%s, dtls=%s",
streamurl.c_str(), api.c_str(), tid.c_str(), clientip.c_str(), ruc.req_->app.c_str(), ruc.req_->stream.c_str(), remote_sdp_str.length(),
eip.c_str(), codec.c_str(), srtp.c_str(), dtls.c_str()
srs_trace(
"RTC play %s, api=%s, tid=%s, clientip=%s, app=%s, stream=%s, offer=%dB, eip=%s, codec=%s, srtp=%s, dtls=%s",
streamurl.c_str(), api.c_str(), tid.c_str(), clientip.c_str(), ruc.req_->app.c_str(),
ruc.req_->stream.c_str(), remote_sdp_str.length(),
eip.c_str(), codec.c_str(), srtp.c_str(), dtls.c_str()
);

ruc.eip_ = eip;
Expand All @@ -161,6 +159,7 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
}

// TODO: FIXME: It seems remote_sdp doesn't represents the full SDP information.
ruc.remote_sdp_str_ = remote_sdp_str;
if ((err = ruc.remote_sdp_.parse(remote_sdp_str)) != srs_success) {
return srs_error_wrap(err, "parse sdp failed: %s", remote_sdp_str.c_str());
}
Expand All @@ -169,42 +168,64 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
return srs_error_wrap(err, "remote sdp check failed");
}

if ((err = http_hooks_on_play(ruc.req_)) != srs_success) {
return srs_error_wrap(err, "RTC: http_hooks_on_play");
}

if ((err = serve_http(w, r, &ruc)) != srs_success) {
return srs_error_wrap(err, "serve");
}

res->set("code", SrsJsonAny::integer(ERROR_SUCCESS));
res->set("server", SrsJsonAny::str(SrsStatistic::instance()->server_id().c_str()));

// TODO: add candidates in response json?
res->set("sdp", SrsJsonAny::str(ruc.local_sdp_str_.c_str()));
res->set("sessionid", SrsJsonAny::str(ruc.session_id_.c_str()));

return err;
}

srs_error_t SrsGoApiRtcPlay::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsRtcUserConfig* ruc)
{
srs_error_t err = srs_success;

SrsSdp local_sdp;

// Config for SDP and session.
local_sdp.session_config_.dtls_role = _srs_config->get_rtc_dtls_role(ruc.req_->vhost);
local_sdp.session_config_.dtls_version = _srs_config->get_rtc_dtls_version(ruc.req_->vhost);
local_sdp.session_config_.dtls_role = _srs_config->get_rtc_dtls_role(ruc->req_->vhost);
local_sdp.session_config_.dtls_version = _srs_config->get_rtc_dtls_version(ruc->req_->vhost);

// Whether enabled.
bool server_enabled = _srs_config->get_rtc_server_enabled();
bool rtc_enabled = _srs_config->get_rtc_enabled(ruc.req_->vhost);
bool rtc_enabled = _srs_config->get_rtc_enabled(ruc->req_->vhost);
if (server_enabled && !rtc_enabled) {
srs_warn("RTC disabled in vhost %s", ruc.req_->vhost.c_str());
srs_warn("RTC disabled in vhost %s", ruc->req_->vhost.c_str());
}
if (!server_enabled || !rtc_enabled) {
return srs_error_new(ERROR_RTC_DISABLED, "Disabled server=%d, rtc=%d, vhost=%s",
server_enabled, rtc_enabled, ruc.req_->vhost.c_str());
server_enabled, rtc_enabled, ruc->req_->vhost.c_str());
}

// Whether RTC stream is active.
bool is_rtc_stream_active = false;
if (true) {
SrsRtcSource* source = _srs_rtc_sources->fetch(ruc.req_);
SrsRtcSource* source = _srs_rtc_sources->fetch(ruc->req_);
is_rtc_stream_active = (source && !source->can_publish());
}

// For RTMP to RTC, fail if disabled and RTMP is active, see https://github.com/ossrs/srs/issues/2728
if (!is_rtc_stream_active && !_srs_config->get_rtc_from_rtmp(ruc.req_->vhost)) {
SrsLiveSource* rtmp = _srs_sources->fetch(ruc.req_);
if (!is_rtc_stream_active && !_srs_config->get_rtc_from_rtmp(ruc->req_->vhost)) {
SrsLiveSource* rtmp = _srs_sources->fetch(ruc->req_);
if (rtmp && !rtmp->inactive()) {
return srs_error_new(ERROR_RTC_DISABLED, "Disabled rtmp_to_rtc of %s, see #2728", ruc.req_->vhost.c_str());
return srs_error_new(ERROR_RTC_DISABLED, "Disabled rtmp_to_rtc of %s, see #2728", ruc->req_->vhost.c_str());
}
}

// TODO: FIXME: When server enabled, but vhost disabled, should report error.
SrsRtcConnection* session = NULL;
if ((err = server_->create_session(&ruc, local_sdp, &session)) != srs_success) {
return srs_error_wrap(err, "create session, dtls=%u, srtp=%u, eip=%s", ruc.dtls_, ruc.srtp_, eip.c_str());
if ((err = server_->create_session(ruc, local_sdp, &session)) != srs_success) {
return srs_error_wrap(err, "create session, dtls=%u, srtp=%u, eip=%s", ruc->dtls_, ruc->srtp_, ruc->eip_.c_str());
}

ostringstream os;
Expand All @@ -216,17 +237,12 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
// Filter the \r\n to \\r\\n for JSON.
string local_sdp_escaped = srs_string_replace(local_sdp_str.c_str(), "\r\n", "\\r\\n");

res->set("code", SrsJsonAny::integer(ERROR_SUCCESS));
res->set("server", SrsJsonAny::str(SrsStatistic::instance()->server_id().c_str()));

// TODO: add candidates in response json?

res->set("sdp", SrsJsonAny::str(local_sdp_str.c_str()));
res->set("sessionid", SrsJsonAny::str(session->username().c_str()));
ruc->local_sdp_str_ = local_sdp_str;
ruc->session_id_ = session->username();

srs_trace("RTC username=%s, dtls=%u, srtp=%u, offer=%dB, answer=%dB", session->username().c_str(),
ruc.dtls_, ruc.srtp_, remote_sdp_str.length(), local_sdp_escaped.length());
srs_trace("RTC remote offer: %s", srs_string_replace(remote_sdp_str.c_str(), "\r\n", "\\r\\n").c_str());
ruc->dtls_, ruc->srtp_, ruc->remote_sdp_str_.length(), local_sdp_escaped.length());
srs_trace("RTC remote offer: %s", srs_string_replace(ruc->remote_sdp_str_.c_str(), "\r\n", "\\r\\n").c_str());
srs_trace("RTC local answer: %s", local_sdp_escaped.c_str());

return err;
Expand Down Expand Up @@ -333,8 +349,7 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt
srs_error_t err = srs_success;

// For each RTC session, we use short-term HTTP connection.
SrsHttpHeader* hdr = w->header();
hdr->set("Connection", "Close");
w->header()->set("Connection", "Close");

// Parse req, the request json object, from body.
SrsJsonObject* req = NULL;
Expand Down Expand Up @@ -406,10 +421,6 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt
ruc.req_->vhost = parsed_vhost->arg0();
}

if ((err = http_hooks_on_publish(ruc.req_)) != srs_success) {
return srs_error_wrap(err, "RTC: http_hooks_on_publish");
}

// For client to specifies the candidate(EIP) of server.
string eip = r->query_get("eip");
if (eip.empty()) {
Expand All @@ -428,35 +439,58 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt
ruc.dtls_ = ruc.srtp_ = true;

// TODO: FIXME: It seems remote_sdp doesn't represents the full SDP information.
ruc.remote_sdp_str_ = remote_sdp_str;
if ((err = ruc.remote_sdp_.parse(remote_sdp_str)) != srs_success) {
return srs_error_wrap(err, "parse sdp failed: %s", remote_sdp_str.c_str());
}

if ((err = check_remote_sdp(ruc.remote_sdp_)) != srs_success) {
if ((err = serve_http(w, r, &ruc)) != srs_success) {
return srs_error_wrap(err, "serve");
}

res->set("code", SrsJsonAny::integer(ERROR_SUCCESS));
res->set("server", SrsJsonAny::str(SrsStatistic::instance()->server_id().c_str()));

// TODO: add candidates in response json?
res->set("sdp", SrsJsonAny::str(ruc.local_sdp_str_.c_str()));
res->set("sessionid", SrsJsonAny::str(ruc.session_id_.c_str()));

return err;
}

srs_error_t SrsGoApiRtcPublish::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsRtcUserConfig* ruc)
{
srs_error_t err = srs_success;

if ((err = check_remote_sdp(ruc->remote_sdp_)) != srs_success) {
return srs_error_wrap(err, "remote sdp check failed");
}

if ((err = http_hooks_on_publish(ruc->req_)) != srs_success) {
return srs_error_wrap(err, "RTC: http_hooks_on_publish");
}

SrsSdp local_sdp;

// TODO: FIXME: move to create_session.
// Config for SDP and session.
local_sdp.session_config_.dtls_role = _srs_config->get_rtc_dtls_role(ruc.req_->vhost);
local_sdp.session_config_.dtls_version = _srs_config->get_rtc_dtls_version(ruc.req_->vhost);
local_sdp.session_config_.dtls_role = _srs_config->get_rtc_dtls_role(ruc->req_->vhost);
local_sdp.session_config_.dtls_version = _srs_config->get_rtc_dtls_version(ruc->req_->vhost);

// Whether enabled.
bool server_enabled = _srs_config->get_rtc_server_enabled();
bool rtc_enabled = _srs_config->get_rtc_enabled(ruc.req_->vhost);
bool rtc_enabled = _srs_config->get_rtc_enabled(ruc->req_->vhost);
if (server_enabled && !rtc_enabled) {
srs_warn("RTC disabled in vhost %s", ruc.req_->vhost.c_str());
srs_warn("RTC disabled in vhost %s", ruc->req_->vhost.c_str());
}
if (!server_enabled || !rtc_enabled) {
return srs_error_new(ERROR_RTC_DISABLED, "Disabled server=%d, rtc=%d, vhost=%s",
server_enabled, rtc_enabled, ruc.req_->vhost.c_str());
server_enabled, rtc_enabled, ruc->req_->vhost.c_str());
}

// TODO: FIXME: When server enabled, but vhost disabled, should report error.
SrsRtcConnection* session = NULL;
if ((err = server_->create_session(&ruc, local_sdp, &session)) != srs_success) {
if ((err = server_->create_session(ruc, local_sdp, &session)) != srs_success) {
return srs_error_wrap(err, "create session");
}

Expand All @@ -469,17 +503,12 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt
// Filter the \r\n to \\r\\n for JSON.
string local_sdp_escaped = srs_string_replace(local_sdp_str.c_str(), "\r\n", "\\r\\n");

res->set("code", SrsJsonAny::integer(ERROR_SUCCESS));
res->set("server", SrsJsonAny::str(SrsStatistic::instance()->server_id().c_str()));

// TODO: add candidates in response json?

res->set("sdp", SrsJsonAny::str(local_sdp_str.c_str()));
res->set("sessionid", SrsJsonAny::str(session->username().c_str()));
ruc->local_sdp_str_ = local_sdp_str;
ruc->session_id_ = session->username();

srs_trace("RTC username=%s, offer=%dB, answer=%dB", session->username().c_str(),
remote_sdp_str.length(), local_sdp_escaped.length());
srs_trace("RTC remote offer: %s", srs_string_replace(remote_sdp_str.c_str(), "\r\n", "\\r\\n").c_str());
ruc->remote_sdp_str_.length(), local_sdp_escaped.length());
srs_trace("RTC remote offer: %s", srs_string_replace(ruc->remote_sdp_str_.c_str(), "\r\n", "\\r\\n").c_str());
srs_trace("RTC local answer: %s", local_sdp_escaped.c_str());

return err;
Expand Down Expand Up @@ -545,6 +574,90 @@ srs_error_t SrsGoApiRtcPublish::http_hooks_on_publish(SrsRequest* req)
return err;
}

SrsGoApiRtcWhip::SrsGoApiRtcWhip(SrsRtcServer* server)
{
publish_ = new SrsGoApiRtcPublish(server);
}

SrsGoApiRtcWhip::~SrsGoApiRtcWhip()
{
srs_freep(publish_);
}

srs_error_t SrsGoApiRtcWhip::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{
srs_error_t err = srs_success;

// For each RTC session, we use short-term HTTP connection.
w->header()->set("Connection", "Close");

string remote_sdp_str;
if ((err = r->body_read_all(remote_sdp_str)) != srs_success) {
return srs_error_wrap(err, "read sdp");
}

string clientip;
if (clientip.empty()){
clientip = dynamic_cast<SrsHttpMessage*>(r)->connection()->remote_ip();
// Overwrite by ip from proxy.
string oip = srs_get_original_ip(r);
if (!oip.empty()) {
clientip = oip;
}
}

// For client to specifies the candidate(EIP) of server.
string eip = r->query_get("eip");
if (eip.empty()) {
eip = r->query_get("candidate");
}
string codec = r->query_get("codec");
string app = r->query_get("app");
string stream = r->query_get("stream");

// The RTC user config object.
SrsRtcUserConfig ruc;
ruc.req_->ip = clientip;
ruc.req_->host = r->host();
ruc.req_->vhost = ruc.req_->host;
ruc.req_->app = app.empty() ? "live" : app;
ruc.req_->stream = stream.empty() ? "livestream" : stream;

// discovery vhost, resolve the vhost from config
SrsConfDirective* parsed_vhost = _srs_config->get_vhost(ruc.req_->vhost);
if (parsed_vhost) {
ruc.req_->vhost = parsed_vhost->arg0();
}

srs_trace("RTC whip %s, clientip=%s, app=%s, stream=%s, offer=%dB, eip=%s, codec=%s",
ruc.req_->get_stream_url().c_str(), clientip.c_str(), ruc.req_->app.c_str(), ruc.req_->stream.c_str(),
remote_sdp_str.length(), eip.c_str(), codec.c_str()
);

ruc.eip_ = eip;
ruc.codec_ = codec;
ruc.publish_ = true;
ruc.dtls_ = ruc.srtp_ = true;

// TODO: FIXME: It seems remote_sdp doesn't represents the full SDP information.
ruc.remote_sdp_str_ = remote_sdp_str;
if ((err = ruc.remote_sdp_.parse(remote_sdp_str)) != srs_success) {
return srs_error_wrap(err, "parse sdp failed: %s", remote_sdp_str.c_str());
}

if ((err = publish_->serve_http(w, r, &ruc)) != srs_success) {
return srs_error_wrap(err, "serve");
}

if (ruc.local_sdp_str_.empty()) {
return srs_go_http_error(w, SRS_CONSTS_HTTP_InternalServerError);
}

string sdp = ruc.local_sdp_str_;
w->header()->set("Content-Type", "application/sdp");
return w->write((char*)sdp.data(), (int)sdp.length());
}

SrsGoApiRtcNACK::SrsGoApiRtcNACK(SrsRtcServer* server)
{
server_ = server;
Expand Down
18 changes: 18 additions & 0 deletions trunk/src/app/srs_app_rtc_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
class SrsRtcServer;
class SrsRequest;
class SrsSdp;
class SrsRtcUserConfig;

class SrsGoApiRtcPlay : public ISrsHttpHandler
{
Expand All @@ -26,6 +27,7 @@ class SrsGoApiRtcPlay : public ISrsHttpHandler
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
private:
virtual srs_error_t do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsJsonObject* res);
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsRtcUserConfig* ruc);
srs_error_t check_remote_sdp(const SrsSdp& remote_sdp);
private:
virtual srs_error_t http_hooks_on_play(SrsRequest* req);
Expand All @@ -42,11 +44,27 @@ class SrsGoApiRtcPublish : public ISrsHttpHandler
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
private:
virtual srs_error_t do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsJsonObject* res);
public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsRtcUserConfig* ruc);
private:
srs_error_t check_remote_sdp(const SrsSdp& remote_sdp);
private:
virtual srs_error_t http_hooks_on_publish(SrsRequest* req);
};

// See https://datatracker.ietf.org/doc/draft-ietf-wish-whip/
class SrsGoApiRtcWhip : public ISrsHttpHandler
{
private:
SrsRtcServer* server_;
SrsGoApiRtcPublish* publish_;
public:
SrsGoApiRtcWhip(SrsRtcServer* server);
virtual ~SrsGoApiRtcWhip();
public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
};

class SrsGoApiRtcNACK : public ISrsHttpHandler
{
private:
Expand Down
Loading

0 comments on commit 15610ca

Please sign in to comment.