Skip to content

Commit

Permalink
Forward: multiple forwarders config full rtmp url to other server (#1342
Browse files Browse the repository at this point in the history
)
  • Loading branch information
chundonglinlin committed Dec 28, 2021
1 parent f1a448b commit 2b0da1a
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 7 deletions.
8 changes: 8 additions & 0 deletions trunk/conf/forward.master.conf
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,12 @@ vhost __defaultVhost__ {
enabled on;
destination 127.0.0.1:19350;
}

forward live/livestream {
destination rtmp://ossrs.net/live01/test001?auth_key=262953 rtmp://ossrs.net/live02/test002;
}

forward live1/livestream1 {
destination rtmp://ossrs.net/live03/test003?auth_key=abcde rtmp://ossrs.net/live04/test004;
}
}
20 changes: 18 additions & 2 deletions trunk/src/app/srs_app_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4548,7 +4548,7 @@ bool SrsConfig::get_forward_enabled(SrsConfDirective* vhost)
{
static bool DEFAULT = false;

SrsConfDirective* conf = vhost->get("forward");
SrsConfDirective* conf = vhost->get("forward", "");
if (!conf) {
return DEFAULT;
}
Expand All @@ -4568,14 +4568,30 @@ SrsConfDirective* SrsConfig::get_forwards(string vhost)
return NULL;
}

conf = conf->get("forward");
conf = conf->get("forward", "");
if (!conf) {
return NULL;
}

return conf->get("destination");
}

SrsConfDirective* SrsConfig::get_forwards(string vhost, string app, string stream)
{
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return NULL;
}

string pattern = app + "/" + stream;
conf = conf->get("forward", pattern);
if (!conf || conf->arg0().empty()) {
return NULL;
}

return conf->get("destination");
}

SrsConfDirective* SrsConfig::get_vhost_http_hooks(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
Expand Down
3 changes: 2 additions & 1 deletion trunk/src/app/srs_app_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,8 @@ class SrsConfig
virtual bool get_forward_enabled(SrsConfDirective* vhost);
// Get the forward directive of vhost.
virtual SrsConfDirective* get_forwards(std::string vhost);

// Get the forward url directive of vhost.
virtual SrsConfDirective* get_forwards(std::string vhost, std::string app, std::string stream);
public:
// Whether the srt sevice enabled
virtual bool get_srt_enabled();
Expand Down
2 changes: 2 additions & 0 deletions trunk/src/app/srs_app_forward.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ SrsForwarder::~SrsForwarder()

srs_freep(sh_video);
srs_freep(sh_audio);

srs_freep(req);
}

srs_error_t SrsForwarder::initialize(SrsRequest* r, string ep)
Expand Down
42 changes: 38 additions & 4 deletions trunk/src/app/srs_app_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1468,20 +1468,54 @@ srs_error_t SrsOriginHub::on_reload_vhost_exec(string vhost)
srs_error_t SrsOriginHub::create_forwarders()
{
srs_error_t err = srs_success;


// pattern: app/stream
SrsConfDirective* conf = _srs_config->get_forwards(req->vhost, req->app, req->stream);
for (int i = 0; conf && i < (int)conf->args.size(); i++) {
std::string url = conf->args.at(i);

// create forwarder by url
SrsRequest* freq = new SrsRequest();
srs_parse_rtmp_url(url, freq->tcUrl, freq->stream);
srs_discovery_tc_url(freq->tcUrl, freq->schema, freq->host, freq->vhost, freq->app, freq->stream, freq->port, freq->param);

SrsForwarder* forwarder = new SrsForwarder(this);
forwarders.push_back(forwarder);

std::stringstream ss;
ss << freq->host << ":" << freq->port;
std::string forward_server = ss.str();

// initialize the forwarder with request.
if ((err = forwarder->initialize(freq, forward_server)) != srs_success) {
return srs_error_wrap(err, "init forwarder");
}

srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost);
forwarder->set_queue_size(queue_size);

if ((err = forwarder->on_publish()) != srs_success) {
return srs_error_wrap(err, "start forwarder failed, vhost=%s, app=%s, stream=%s, forward-to=%s",
req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), url.c_str());
}
}

if (!_srs_config->get_forward_enabled(req->vhost)) {
return err;
}

SrsConfDirective* conf = _srs_config->get_forwards(req->vhost);
conf = _srs_config->get_forwards(req->vhost);
for (int i = 0; conf && i < (int)conf->args.size(); i++) {
std::string forward_server = conf->args.at(i);


// copy source req
SrsRequest* freq = req->copy();

SrsForwarder* forwarder = new SrsForwarder(this);
forwarders.push_back(forwarder);

// initialize the forwarder with request.
if ((err = forwarder->initialize(req, forward_server)) != srs_success) {
if ((err = forwarder->initialize(freq, forward_server)) != srs_success) {
return srs_error_wrap(err, "init forwarder");
}

Expand Down
7 changes: 7 additions & 0 deletions trunk/src/utest/srs_utest_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2914,6 +2914,7 @@ VOID TEST(ConfigMainTest, CheckVhostConfig2)
EXPECT_EQ(5000000, conf.get_publish_normal_timeout("ossrs.net"));
EXPECT_FALSE(conf.get_forward_enabled("ossrs.net"));
EXPECT_TRUE(conf.get_forwards("ossrs.net") == NULL);
EXPECT_TRUE(conf.get_forwards("ossrs.net", "live", "livestream") == NULL);
}

if (true) {
Expand All @@ -2928,6 +2929,12 @@ VOID TEST(ConfigMainTest, CheckVhostConfig2)
EXPECT_TRUE(conf.get_forward_enabled("ossrs.net"));
}

if (true) {
MockSrsConfig conf;
HELPER_ASSERT_SUCCESS(conf.parse(_MIN_OK_CONF "vhost ossrs.net{forward live/livestream {destination xxx;}}"));
EXPECT_TRUE(conf.get_forwards("ossrs.net", "live", "livestream"));
}

if (true) {
MockSrsConfig conf;
HELPER_ASSERT_SUCCESS(conf.parse(_MIN_OK_CONF "vhost ossrs.net{publish {normal_timeout 10;}}"));
Expand Down

0 comments on commit 2b0da1a

Please sign in to comment.