Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/kafka #510

Merged
merged 46 commits into from
Oct 23, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
ca73534
refine comments for api key
winlinvip Oct 13, 2015
d9f991e
use system utility for string finds
winlinvip Oct 13, 2015
908365a
use string for const char*
winlinvip Oct 13, 2015
69cc01b
use system utilities
winlinvip Oct 13, 2015
20fcfb3
fix bug, use system utility
winlinvip Oct 13, 2015
a9bb606
use tcp client for raw connect.
winlinvip Oct 13, 2015
24b3899
use SrsTcpClient instead raw socket.
winlinvip Oct 14, 2015
adb8f24
use simple rtmp client for raw connect app
winlinvip Oct 14, 2015
a9ad7b2
refine simple rtmp client for post flv stream.
winlinvip Oct 14, 2015
e4c8529
refine code, support override vhost
winlinvip Oct 14, 2015
bc27481
refine code, use simple rtmp client.
winlinvip Oct 14, 2015
12e0131
refine code, use simple rtmp client.
winlinvip Oct 14, 2015
0f4cb8e
refine code, remove the rtmp recv/send macro, use one macro.
winlinvip Oct 14, 2015
ad9b377
refine forwarder code, use simple rtmp client.
winlinvip Oct 14, 2015
a08d8f8
refine mpegts code, use simple rtmp client
winlinvip Oct 14, 2015
a9fdb63
refine code, replace all rtmp connect by simple rtmp client.
winlinvip Oct 14, 2015
8974fe2
connect to kafka and send metadata request.
winlinvip Oct 15, 2015
8a4ec49
add graph comments for size of request and response.
winlinvip Oct 15, 2015
45755fd
refine code for kakfa request/response, string/bytes.
winlinvip Oct 16, 2015
9117e1e
extract ISrsCodec for code and decode object.
winlinvip Oct 16, 2015
3c64e4b
kafka encode and send packet.
winlinvip Oct 16, 2015
2e67eb8
refine comments for kafka messages.
winlinvip Oct 16, 2015
6e5ed11
for kafka, support correlation id cache.
winlinvip Oct 19, 2015
c486287
kafka rename message set to raw message set.
winlinvip Oct 19, 2015
493d282
kafka refine array, to decode and create object.
winlinvip Oct 19, 2015
a108fa8
kafka recv and decode message.
winlinvip Oct 19, 2015
7106934
kafka producer use async interface to request metadata.
winlinvip Oct 19, 2015
84b3981
refs #1670: support decode the metadata response.
winlinvip Oct 22, 2015
33a0153
add function to convert kafka array to vector
winlinvip Oct 22, 2015
d013374
rename ISrsCodec size to nb_bytes.
winlinvip Oct 22, 2015
f0e39cc
support show the summary of kafka metadata.
winlinvip Oct 22, 2015
31a77a8
convert metadata to partitions
winlinvip Oct 22, 2015
de41c1c
kafka refine comments.
winlinvip Oct 22, 2015
7013993
kafka send the accept message.
winlinvip Oct 22, 2015
61486a8
kafka use topic and partition cache
winlinvip Oct 22, 2015
531b658
retry when metadata empty
winlinvip Oct 22, 2015
22fa9a0
kafka use temp transport to fetch metadata.
winlinvip Oct 22, 2015
f9f5b56
kakfa erase messages when wrote.
winlinvip Oct 22, 2015
7a0aaf5
kafka refine code
winlinvip Oct 22, 2015
8e344f1
add producer api messages.
winlinvip Oct 23, 2015
76cd3f8
kafka convert json to producer message.
winlinvip Oct 23, 2015
9a47390
fix #467, support write log to kafka. 3.0.6
winlinvip Oct 23, 2015
7145187
refine kafka, simplify code.
winlinvip Oct 23, 2015
d2ca51a
notify kafka when client close
winlinvip Oct 23, 2015
cbe4252
refine code.
winlinvip Oct 23, 2015
b8f2ba4
complete kafka
winlinvip Oct 23, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ The `features`, `compare`, `release` and `performance` of SRS.
1. Support NGINX-RTMP style EXEC, read [#367][bug #367].
1. Support NGINX-RTMP style dvr control module, read [#459][bug #459].
1. Support HTTP Security Raw Api, read [#459][bug #459], [#470][bug #470], [#319][bug #319].
1. [dev]Support Integration with Kafka/Spark Big-Data system, read [#467][bug #467].
1. Support Integration with Kafka/Spark Big-Data system, read [#467][bug #467].
1. [plan]Support Origin Cluster for Load Balance and Fault Tolarence, read [#464][bug #464], [RTMP 302][bug #92].
1. [plan]Support H.265, push RTMP with H.265, delivery in HLS, read [#465][bug #465].
1. [plan]Support MPEG-DASH, the future streaming protocol, read [#299][bug #299].
Expand Down Expand Up @@ -386,6 +386,7 @@ Remark:

### History

* v3.0, 2015-10-23, fix [#467][bug #467], support write log to kafka. 3.0.6
* v3.0, 2015-10-20, fix [#502][bug #502], support snapshot with http-callback or transcoder. 3.0.5
* v3.0, 2015-09-19, support amf0 and json to convert with each other.
* v3.0, 2015-09-19, json objects support dumps to string.
Expand Down Expand Up @@ -1279,6 +1280,7 @@ Winlin
[bug #466]: https://github.com/simple-rtmp-server/srs/issues/466
[bug #468]: https://github.com/simple-rtmp-server/srs/issues/468
[bug #502]: https://github.com/simple-rtmp-server/srs/issues/502
[bug #467]: https://github.com/simple-rtmp-server/srs/issues/467
[bug #xxxxxxx]: https://github.com/simple-rtmp-server/srs/issues/xxxxxxx

[r2.0a2]: https://github.com/simple-rtmp-server/srs/releases/tag/v2.0-a2
Expand Down
5 changes: 4 additions & 1 deletion trunk/conf/full.conf
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,11 @@ kafka {
enabled off;
# the broker list, broker is <ip:port>
# and use space to specify multple brokers.
# for exampl, 127.0.0.1:9092 127.0.0.1:9093
# for example, 127.0.0.1:9092 127.0.0.1:9093
brokers 127.0.0.1:9092;
# the kafka topic to use.
# default: srs
topic srs;
}

#############################################################################################
Expand Down
189 changes: 27 additions & 162 deletions trunk/src/app/srs_app_caster_flv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ using namespace std;
#include <srs_app_utility.hpp>
#include <srs_protocol_amf0.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_app_rtmp_conn.hpp>

#define SRS_HTTP_FLV_STREAM_BUFFER 4096

Expand Down Expand Up @@ -117,20 +118,13 @@ int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m)
: SrsHttpConn(cm, fd, m)
{

req = NULL;
io = NULL;
client = NULL;
stfd = NULL;
stream_id = 0;

sdk = new SrsSimpleRtmpClient();
pprint = SrsPithyPrint::create_caster();
}

SrsDynamicHttpConn::~SrsDynamicHttpConn()
{
close();

srs_freep(sdk);
srs_freep(pprint);
}

Expand Down Expand Up @@ -176,7 +170,7 @@ int SrsDynamicHttpConn::proxy(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std
}

ret = do_proxy(rr, &dec);
close();
sdk->close();

return ret;
}
Expand All @@ -185,14 +179,22 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec)
{
int ret = ERROR_SUCCESS;

int64_t cto = SRS_CONSTS_RTMP_TIMEOUT_US;
int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US;
if ((ret = sdk->connect(output, cto, sto)) != ERROR_SUCCESS) {
srs_error("flv: connect %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", output.c_str(), cto, sto, ret);
return ret;
}

if ((ret = sdk->publish()) != ERROR_SUCCESS) {
srs_error("flv: publish failed. ret=%d", ret);
return ret;
}

char pps[4];
while (!rr->eof()) {
pprint->elapse();

if ((ret = connect()) != ERROR_SUCCESS) {
return ret;
}

char type;
int32_t size;
u_int32_t time;
Expand All @@ -212,13 +214,23 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec)
return ret;
}

if ((ret = rtmp_write_packet(type, time, data, size)) != ERROR_SUCCESS) {
SrsSharedPtrMessage* msg = NULL;
if ((ret = sdk->rtmp_create_msg(type, time, data, size, &msg)) != ERROR_SUCCESS) {
return ret;
}

// TODO: FIXME: for post flv, reconnect when error.
if ((ret = sdk->send_and_free_message(msg)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("flv: proxy rtmp packet failed. ret=%d", ret);
}
return ret;
}

if (pprint->can_print()) {
srs_trace("flv: send msg %d age=%d, dts=%d, size=%d", type, pprint->age(), time, size);
}

if ((ret = dec->read_previous_tag_size(pps)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("flv: proxy tag header pps failed. ret=%d", ret);
Expand All @@ -230,153 +242,6 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec)
return ret;
}

int SrsDynamicHttpConn::rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size)
{
int ret = ERROR_SUCCESS;

SrsSharedPtrMessage* msg = NULL;

if ((ret = srs_rtmp_create_msg(type, timestamp, data, size, stream_id, &msg)) != ERROR_SUCCESS) {
srs_error("flv: create shared ptr msg failed. ret=%d", ret);
return ret;
}
srs_assert(msg);

if (pprint->can_print()) {
srs_trace("flv: send msg %s age=%d, dts=%"PRId64", size=%d",
msg->is_audio()? "A":msg->is_video()? "V":"N", pprint->age(), msg->timestamp, msg->size);
}

// send out encoded msg.
if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) {
return ret;
}

return ret;
}

int SrsDynamicHttpConn::connect()
{
int ret = ERROR_SUCCESS;

// when ok, ignore.
// TODO: FIXME: should reconnect when disconnected.
if (io || client) {
return ret;
}

// parse uri
if (!req) {
req = new SrsRequest();

size_t pos = string::npos;
string uri = req->tcUrl = output;

// tcUrl, stream
if ((pos = uri.rfind("/")) != string::npos) {
req->stream = uri.substr(pos + 1);
req->tcUrl = uri = uri.substr(0, pos);
}

srs_discovery_tc_url(req->tcUrl,
req->schema, req->host, req->vhost, req->app, req->port,
req->param);
}

// connect host.
if ((ret = srs_socket_connect(req->host, ::atoi(req->port.c_str()), ST_UTIME_NO_TIMEOUT, &stfd)) != ERROR_SUCCESS) {
srs_error("mpegts: connect server %s:%s failed. ret=%d", req->host.c_str(), req->port.c_str(), ret);
return ret;
}
io = new SrsStSocket(stfd);
client = new SrsRtmpClient(io);

client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);

// connect to vhost/app
if ((ret = client->handshake()) != ERROR_SUCCESS) {
srs_error("mpegts: handshake with server failed. ret=%d", ret);
return ret;
}
if ((ret = connect_app(req->host, req->port)) != ERROR_SUCCESS) {
srs_error("mpegts: connect with server failed. ret=%d", ret);
return ret;
}
if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
srs_error("mpegts: connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
return ret;
}

// publish.
if ((ret = client->publish(req->stream, stream_id)) != ERROR_SUCCESS) {
srs_error("mpegts: publish failed, stream=%s, stream_id=%d. ret=%d",
req->stream.c_str(), stream_id, ret);
return ret;
}

return ret;
}

// TODO: FIXME: refine the connect_app.
int SrsDynamicHttpConn::connect_app(string ep_server, string ep_port)
{
int ret = ERROR_SUCCESS;

// args of request takes the srs info.
if (req->args == NULL) {
req->args = SrsAmf0Any::object();
}

// notify server the edge identity,
// @see https://github.com/simple-rtmp-server/srs/issues/147
SrsAmf0Object* data = req->args;
data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY));
data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")"));
data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE));
data->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE));
data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL));
data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION));
data->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB));
data->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL));
data->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT));
data->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY));
data->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS));
// for edge to directly get the id of client.
data->set("srs_pid", SrsAmf0Any::number(getpid()));
data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id()));

// local ip of edge
std::vector<std::string> ips = srs_get_local_ipv4_ips();
assert(_srs_config->get_stats_network() < (int)ips.size());
std::string local_ip = ips[_srs_config->get_stats_network()];
data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str()));

// generate the tcUrl
std::string param = "";
std::string tc_url = srs_generate_tc_url(ep_server, req->vhost, req->app, ep_port, param);

// upnode server identity will show in the connect_app of client.
// @see https://github.com/simple-rtmp-server/srs/issues/160
// the debug_srs_upnode is config in vhost and default to true.
bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req->vhost);
if ((ret = client->connect_app(req->app, tc_url, req, debug_srs_upnode)) != ERROR_SUCCESS) {
srs_error("mpegts: connect with server failed, tcUrl=%s, dsu=%d. ret=%d",
tc_url.c_str(), debug_srs_upnode, ret);
return ret;
}

return ret;
}

void SrsDynamicHttpConn::close()
{
srs_freep(client);
srs_freep(io);
srs_freep(req);
srs_close_stfd(stfd);
}

SrsHttpFileReader::SrsHttpFileReader(ISrsHttpResponseReader* h)
{
http = h;
Expand Down
17 changes: 3 additions & 14 deletions trunk/src/app/srs_app_caster_flv.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class SrsRequest;
class SrsPithyPrint;
class ISrsHttpResponseReader;
class SrsFlvDecoder;
class SrsTcpClient;
class SrsSimpleRtmpClient;

#include <srs_app_st.hpp>
#include <srs_app_listener.hpp>
Expand Down Expand Up @@ -84,12 +86,7 @@ class SrsDynamicHttpConn : public SrsHttpConn
private:
std::string output;
SrsPithyPrint* pprint;
private:
SrsRequest* req;
st_netfd_t stfd;
SrsStSocket* io;
SrsRtmpClient* client;
int stream_id;
SrsSimpleRtmpClient* sdk;
public:
SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m);
virtual ~SrsDynamicHttpConn();
Expand All @@ -99,14 +96,6 @@ class SrsDynamicHttpConn : public SrsHttpConn
virtual int proxy(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string o);
private:
virtual int do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec);
virtual int rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size);
private:
// connect to rtmp output url.
// @remark ignore when not connected, reconnect when disconnected.
virtual int connect();
virtual int connect_app(std::string ep_server, std::string ep_port);
// close the connected io and rtmp to ready to be re-connect.
virtual void close();
};

/**
Expand Down
21 changes: 20 additions & 1 deletion trunk/src/app/srs_app_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2128,6 +2128,8 @@ int SrsConfig::global_to_json(SrsJsonObject* obj)
sobj->set(sdir->name, sdir->dumps_arg0_to_boolean());
} else if (sdir->name == "brokers") {
sobj->set(sdir->name, sdir->dumps_args());
} else if (sdir->name == "topic") {
sobj->set(sdir->name, sdir->dumps_arg0_to_str());
}
}
obj->set(dir->name, sobj);
Expand Down Expand Up @@ -3546,7 +3548,7 @@ int SrsConfig::check_config()
SrsConfDirective* conf = root->get("kafka");
for (int i = 0; conf && i < (int)conf->directives.size(); i++) {
string n = conf->at(i)->name;
if (n != "enabled" && n != "brokers") {
if (n != "enabled" && n != "brokers" && n != "topic") {
ret = ERROR_SYSTEM_CONFIG_INVALID;
srs_error("unsupported kafka directive %s, ret=%d", n.c_str(), ret);
return ret;
Expand Down Expand Up @@ -4298,6 +4300,23 @@ SrsConfDirective* SrsConfig::get_kafka_brokers()
return conf;
}

string SrsConfig::get_kafka_topic()
{
static string DEFAULT = "srs";

SrsConfDirective* conf = root->get("kafka");
if (!conf) {
return DEFAULT;
}

conf = conf->get("topic");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}

return conf->arg0();
}

SrsConfDirective* SrsConfig::get_vhost(string vhost, bool try_default_vhost)
{
srs_assert(root);
Expand Down
4 changes: 4 additions & 0 deletions trunk/src/app/srs_app_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,10 @@ class SrsConfig
* get the broker list, each is format in <ip:port>.
*/
virtual SrsConfDirective* get_kafka_brokers();
/**
* get the kafka topic to use for srs.
*/
virtual std::string get_kafka_topic();
// vhost specified section
public:
/**
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_dvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ int SrsFlvSegment::open(bool use_tmp_file)
bool fresh_flv_file = !srs_path_exists(path);

// create dir first.
std::string dir = path.substr(0, path.rfind("/"));
std::string dir = srs_path_dirname(path);
if ((ret = srs_create_dir_recursively(dir)) != ERROR_SUCCESS) {
srs_error("create dir=%s failed. ret=%d", dir.c_str(), ret);
return ret;
Expand Down Expand Up @@ -410,7 +410,7 @@ string SrsFlvSegment::generate_path()
std::string path_config = _srs_config->get_dvr_path(req->vhost);

// add [stream].[timestamp].flv as filename for dir
if (path_config.find(".flv") != path_config.length() - 4) {
if (!srs_string_ends_with(path_config, ".flv")) {
path_config += "/[stream].[timestamp].flv";
}

Expand Down
Loading