
Commit

Add chomp_records option for compatible format with plugin v2 - #142
simukappu committed Feb 17, 2019
1 parent 780ae4d commit 715dced
Showing 6 changed files with 73 additions and 13 deletions.
10 changes: 7 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -119,7 +119,7 @@ This plugin uses the same configuration as [fluent-plugin-s3][fluent-plugin-s3].
**aws_key_id**

AWS access key id. This parameter is required when your agent is not
running on an EC2 instance with an IAM Role. When using an IAM role, make
sure to configure `instance_profile_credentials`. Usage can be found below.
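
As a hedged sketch (the tag, stream name, and key values below are placeholders, not values from this commit), these credential parameters typically appear in a match stanza like:

```
<match your.tag>
  @type kinesis_streams
  region us-east-1
  stream_name your-stream
  aws_key_id YOUR_AWS_ACCESS_KEY
  aws_sec_key YOUR_AWS_SECRET_KEY
</match>
```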

**aws_sec_key**
@@ -178,7 +178,7 @@ their customers' accounts.
**sts_http_proxy**

Proxy URL for proxying requests to the Amazon STS service API. This needs to be set up independently of the global `http_proxy` parameter,
for the use case in which requests to the Kinesis API go via a Kinesis VPC endpoint but requests to the STS API have to go through an HTTP proxy.
It should be added to the `assume_role_credentials` configuration stanza in the following format:
sts_http_proxy http://[username:password]@hostname:port
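
For illustration only (the role ARN, session name, and proxy endpoint below are placeholders):

```
<assume_role_credentials>
  role_arn arn:aws:iam::123456789012:role/your-kinesis-role
  role_session_name your-session
  sts_http_proxy http://proxyuser:proxypass@proxy.example.com:8080
</assume_role_credentials>
```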

@@ -300,6 +300,10 @@ Specifying the compression method for the data of each record. Currently accepted options are
### log_truncate_max_size
Integer. Default `1024`. When emitting a log entry, the message is truncated to this size to avoid an infinite loop when the log itself is also sent to Kinesis. The value `0` means no truncation.
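
A minimal standalone sketch of the truncation rule described above (`truncate_message` is a hypothetical helper for illustration, not part of the plugin's API):

```ruby
# Truncate a log message to at most log_truncate_max_size characters;
# a size of 0 disables truncation, mirroring the option described above.
def truncate_message(message, log_truncate_max_size)
  return message if log_truncate_max_size.zero?
  message[0, log_truncate_max_size]
end

puts truncate_message("x" * 2000, 1024).size  # => 1024
puts truncate_message("x" * 2000, 0).size     # => 2000
```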

### chomp_records
Boolean. Default `false`. If enabled, the formatter chomps each record (removes the separator from the end of each record) so the output is format-compatible with plugin v2.
See [#142](https://github.com/awslabs/aws-fluent-plugin-kinesis/issues/142) for more details.
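
To illustrate, here is a standalone Ruby sketch of what enabling `chomp_records` does to a formatted record; `JSON.generate` stands in for a Fluentd formatter here, which is an assumption for demonstration only:

```ruby
require 'json'

# A formatter such as the json formatter emits a trailing "\n" separator.
record = { "a" => 1, "b" => 2 }
formatted = JSON.generate(record) + "\n"

# With chomp_records true, the separator is chomped off for v2 compatibility.
chomped = formatted.chomp

puts formatted.inspect  # includes the trailing "\n"
puts chomped.inspect    # separator removed
```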

## Configuration: API
### region
AWS region of your stream. It should be in form like `us-east-1`, `us-west-2`. Refer to [Regions and Endpoints in AWS General Reference][region] for supported regions.
@@ -362,7 +366,7 @@ Here are `kinesis_firehose` specific configurations.
Name of the delivery stream to put data.

### append_new_line
Boolean. Default `true`. If enabled, the plugin adds a newline character (`\n`) to each serialized record. The overridden formatter chomps each record (removes the separator from the end) and then appends a newline character (`\n`).
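
As a sketch, the chomp-then-append behavior described above can be modeled with a plain lambda (this models the behavior; it is not the plugin's actual class):

```ruby
# Chomp any trailing separator first, then append exactly one newline,
# so records end with a single "\n" whether or not they already had one.
append_new_line = ->(data) { data.chomp + "\n" }

puts append_new_line.call("{\"a\":1}\n").inspect  # exactly one trailing "\n"
puts append_new_line.call("{\"a\":1}").inspect    # newline added when missing
```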

## Configuration: kinesis_streams_aggregated
Here are `kinesis_streams_aggregated` specific configurations.
24 changes: 18 additions & 6 deletions lib/fluent/plugin/kinesis.rb
@@ -57,6 +57,11 @@ def initialize(record)
config_param :data_key, :string, default: nil
config_param :log_truncate_max_size, :integer, default: 1024
config_param :compression, :string, default: nil

desc "The formatter chomps records for a format compatible with plugin v2. (default: false)"
# https://github.com/awslabs/aws-fluent-plugin-kinesis/issues/142
config_param :chomp_records, :bool, default: false

config_section :format do
config_set_default :@type, 'json'
end
@@ -84,12 +89,19 @@ def data_formatter_create(conf)
formatter = formatter_create
compressor = compressor_create
if @data_key.nil?
->(tag, time, record) {
record = inject_values_to_record(tag, time, record)
# TODO Make option to chomp as compatible format with v2
# compressor.call(formatter.format(tag, time, record).chomp.b)
compressor.call(formatter.format(tag, time, record).b)
}
if @chomp_records
->(tag, time, record) {
record = inject_values_to_record(tag, time, record)
# The formatter chomps records for a format compatible with plugin v2
# https://github.com/awslabs/aws-fluent-plugin-kinesis/issues/142
compressor.call(formatter.format(tag, time, record).chomp.b)
}
else
->(tag, time, record) {
record = inject_values_to_record(tag, time, record)
compressor.call(formatter.format(tag, time, record).b)
}
end
else
->(tag, time, record) {
raise InvalidRecordError, record unless record.is_a? Hash
2 changes: 1 addition & 1 deletion lib/fluent/plugin/out_kinesis_firehose.rb
@@ -32,7 +32,7 @@ def configure(conf)
if @append_new_line
org_data_formatter = @data_formatter
@data_formatter = ->(tag, time, record) {
org_data_formatter.call(tag, time, record) + "\n"
org_data_formatter.call(tag, time, record).chomp + "\n"
}
end
end
26 changes: 24 additions & 2 deletions test/plugin/test_out_kinesis_firehose.rb
@@ -78,7 +78,7 @@ def test_format(data)
formatter, expected = data
d = create_driver(default_config + "<format>\n@type #{formatter}\n</format>")
driver_run(d, [{"a"=>1,"b"=>2}])
assert_equal ("#{expected}\n" + "\n").b, @server.records.first
assert_equal ("#{expected}\n").b, @server.records.first
end

data(
@@ -92,6 +92,28 @@ def test_format_without_append_new_line(data)
assert_equal "#{expected}\n".b, @server.records.first
end

data(
'json' => ['json', '{"a":1,"b":2}'],
'ltsv' => ['ltsv', "a:1\tb:2"],
)
def test_format_with_chomp_records(data)
formatter, expected = data
d = create_driver(default_config + "<format>\n@type #{formatter}\n</format>\nchomp_records true")
driver_run(d, [{"a"=>1,"b"=>2}])
assert_equal "#{expected}\n".b, @server.records.first
end

data(
'json' => ['json', '{"a":1,"b":2}'],
'ltsv' => ['ltsv', "a:1\tb:2"],
)
def test_format_with_chomp_records_without_append_new_line(data)
formatter, expected = data
d = create_driver(default_config + "<format>\n@type #{formatter}\n</format>\nchomp_records true\nappend_new_line false")
driver_run(d, [{"a"=>1,"b"=>2}])
assert_equal expected, @server.records.first
end

def test_data_key
d = create_driver(default_config + "data_key a")
driver_run(d, [{"a"=>1,"b"=>2}, {"b"=>2}])
@@ -173,7 +195,7 @@ def test_multibyte_input
record = {"a" => "てすと"}
driver_run(d, [record])
assert_equal 0, d.instance.log.out.logs.size
assert_equal ("#{record.to_json}\n" + "\n").b, @server.records.first
assert_equal record.to_json.b + "\n", @server.records.first
end

def test_record_count
13 changes: 12 additions & 1 deletion test/plugin/test_out_kinesis_streams.rb
@@ -81,6 +81,17 @@ def test_format(data)
assert_equal expected + "\n", @server.records.first
end

data(
'json' => ['json', '{"a":1,"b":2}'],
'ltsv' => ['ltsv', "a:1\tb:2"],
)
def test_format_with_chomp_records(data)
formatter, expected = data
d = create_driver(default_config + "<format>\n@type #{formatter}\n</format>\nchomp_records true")
driver_run(d, [{"a"=>1,"b"=>2}])
assert_equal expected, @server.records.first
end

def test_partition_key_not_found
d = create_driver(default_config + "partition_key partition_key")
driver_run(d, [{"a"=>1}])
@@ -151,7 +162,7 @@ def test_multibyte_input
record = {"a" => "てすと"}
driver_run(d, [record])
assert_equal 0, d.instance.log.out.logs.size
assert_equal (record.to_json + "\n").b, @server.records.first
end

def test_record_count
11 changes: 11 additions & 0 deletions test/plugin/test_out_kinesis_streams_aggregated.rb
@@ -82,6 +82,17 @@ def test_format(data)
assert_equal (expected + "\n").b, @server.records.first
end

data(
'json' => ['json', '{"a":1,"b":2}'],
'ltsv' => ['ltsv', "a:1\tb:2"],
)
def test_format_with_chomp_records(data)
formatter, expected = data
d = create_driver(default_config + "<format>\n@type #{formatter}\n</format>\nchomp_records true")
driver_run(d, [{"a"=>1,"b"=>2}])
assert_equal expected.b, @server.records.first
end

def test_data_key
d = create_driver(default_config + "data_key a")
driver_run(d, [{"a"=>1,"b"=>2}, {"b"=>2}])
