From 715dced8fd0ae94bc07fcab31307ff4e50e78ca6 Mon Sep 17 00:00:00 2001 From: simukappu Date: Sun, 17 Feb 2019 21:05:30 +0900 Subject: [PATCH] Add chomp_records option for compatible format with plugin v2 - #142 --- README.md | 10 ++++--- lib/fluent/plugin/kinesis.rb | 24 ++++++++++++----- lib/fluent/plugin/out_kinesis_firehose.rb | 2 +- test/plugin/test_out_kinesis_firehose.rb | 26 +++++++++++++++++-- test/plugin/test_out_kinesis_streams.rb | 13 +++++++++- .../test_out_kinesis_streams_aggregated.rb | 11 ++++++++ 6 files changed, 73 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index ea4acfc..1ef40c6 100644 --- a/README.md +++ b/README.md @@ -119,7 +119,7 @@ This plugin uses the same configuration in [fluent-plugin-s3][fluent-plugin-s3]. **aws_key_id** AWS access key id. This parameter is required when your agent is not -running on EC2 instance with an IAM Role. When using an IAM role, make +running on EC2 instance with an IAM Role. When using an IAM role, make sure to configure `instance_profile_credentials`. Usage can be found below. **aws_sec_key** @@ -178,7 +178,7 @@ their customers' accounts. **sts_http_proxy** Proxy url for proxying requests to amazon sts service api. This needs to be set up independently from global http_proxy parameter -for the use case in which requests to kinesis api are going via kinesis vpc endpoint but requests to sts api have to go via http proxy. +for the use case in which requests to kinesis api are going via kinesis vpc endpoint but requests to sts api have to go via http proxy. It should be added to assume_role_credentials configuration stanza in the next format: sts_http_proxy http://[username:password]@hostname:port @@ -300,6 +300,10 @@ Specifing compression way for data of each record. Current accepted options are ### log_truncate_max_size Integer, default 1024. When emitting the log entry, the message will be truncated by this size to avoid infinite loop when the log is also sent to Kinesis. The value 0 means no truncation. +### chomp_records +Boolean. Default `false`. If it is enabled, the formatter chomps each record (removes the separator from the end of each record) to keep the format compatible with plugin v2. +See [#142](https://github.com/awslabs/aws-fluent-plugin-kinesis/issues/142) for more details. + ## Configuraion: API ### region AWS region of your stream. It should be in form like `us-east-1`, `us-west-2`. Refer to [Regions and Endpoints in AWS General Reference][region] for supported regions. @@ -362,7 +366,7 @@ Here are `kinesis_firehose` specific configurations. Name of the delivery stream to put data. ### append_new_line -Boolean. Default `true`. If it is enabled, the plugin add new line character (`\n`) to each serialized record. +Boolean. Default `true`. If it is enabled, the plugin adds a new line character (`\n`) to each serialized record. The overridden formatter chomps each record (removes the separator from the end of each record) and then appends a new line character (`\n`) to each record. ## Configuration: kinesis_streams_aggregated Here are `kinesis_streams_aggregated` specific configurations.
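To make the interaction between `chomp_records` and `append_new_line` described above concrete, here is a minimal sketch of the resulting record payloads. It is illustrative only: the `serialized` helper is hypothetical, not part of the plugin's API, and it assumes the default `json` formatter, whose output ends with `\n`.

```ruby
require 'json'

# Hypothetical helper mirroring what the patched plugin does per record.
def serialized(record, chomp_records: false, append_new_line: false)
  data = JSON.generate(record) + "\n"          # the default json formatter ends each record with "\n"
  data = data.chomp if chomp_records           # chomp_records strips that separator (plugin v2 behavior)
  data = data.chomp + "\n" if append_new_line  # kinesis_firehose then guarantees exactly one trailing "\n"
  data
end

serialized({"a" => 1})                                             # => "{\"a\":1}\n"
serialized({"a" => 1}, chomp_records: true)                        # => "{\"a\":1}"
serialized({"a" => 1}, append_new_line: true)                      # => "{\"a\":1}\n"
serialized({"a" => 1}, chomp_records: true, append_new_line: true) # => "{\"a\":1}\n"
```

These four combinations match the expectations added to the test files later in this patch.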
diff --git a/lib/fluent/plugin/kinesis.rb b/lib/fluent/plugin/kinesis.rb index 989926c..d7374c1 100644 --- a/lib/fluent/plugin/kinesis.rb +++ b/lib/fluent/plugin/kinesis.rb @@ -57,6 +57,11 @@ def initialize(record) config_param :data_key, :string, default: nil config_param :log_truncate_max_size, :integer, default: 1024 config_param :compression, :string, default: nil + + desc "Chomp the trailing separator from each formatted record, for a format compatible with plugin v2. (default: false)" + # https://github.com/awslabs/aws-fluent-plugin-kinesis/issues/142 + config_param :chomp_records, :bool, default: false + config_section :format do config_set_default :@type, 'json' end @@ -84,12 +89,19 @@ def data_formatter_create(conf) formatter = formatter_create compressor = compressor_create if @data_key.nil? - ->(tag, time, record) { - record = inject_values_to_record(tag, time, record) - # TODO Make option to chomp as compatible format with v2 - # compressor.call(formatter.format(tag, time, record).chomp.b) - compressor.call(formatter.format(tag, time, record).b) - } + if @chomp_records + ->(tag, time, record) { + record = inject_values_to_record(tag, time, record) + # Chomp the trailing separator from the formatted record, for a format compatible with plugin v2 + # https://github.com/awslabs/aws-fluent-plugin-kinesis/issues/142 + compressor.call(formatter.format(tag, time, record).chomp.b) + } + else + ->(tag, time, record) { + record = inject_values_to_record(tag, time, record) + compressor.call(formatter.format(tag, time, record).b) + } + end else ->(tag, time, record) { raise InvalidRecordError, record unless record.is_a? Hash diff --git a/lib/fluent/plugin/out_kinesis_firehose.rb b/lib/fluent/plugin/out_kinesis_firehose.rb index 79700fc..ff9e41c 100644 --- a/lib/fluent/plugin/out_kinesis_firehose.rb +++ b/lib/fluent/plugin/out_kinesis_firehose.rb @@ -32,7 +32,7 @@ def configure(conf) if @append_new_line org_data_formatter = @data_formatter @data_formatter = ->(tag, time, record) { - org_data_formatter.call(tag, time, record) + "\n" + org_data_formatter.call(tag, time, record).chomp + "\n" } end end diff --git a/test/plugin/test_out_kinesis_firehose.rb b/test/plugin/test_out_kinesis_firehose.rb index 490b499..6c3fb23 100644 --- a/test/plugin/test_out_kinesis_firehose.rb +++ b/test/plugin/test_out_kinesis_firehose.rb @@ -78,7 +78,7 @@ def test_format(data) formatter, expected = data d = create_driver(default_config + "\n@type #{formatter}\n") driver_run(d, [{"a"=>1,"b"=>2}]) - assert_equal ("#{expected}\n" + "\n").b, @server.records.first + assert_equal ("#{expected}\n").b, @server.records.first end data( @@ -92,6 +92,28 @@ def test_format_without_append_new_line(data) assert_equal "#{expected}\n".b, @server.records.first end + data( + 'json' => ['json', '{"a":1,"b":2}'], + 'ltsv' => ['ltsv', "a:1\tb:2"], + ) + def test_format_with_chomp_records(data) + formatter, expected = data + d = create_driver(default_config + "\n@type #{formatter}\n\nchomp_records true") + driver_run(d, [{"a"=>1,"b"=>2}]) + assert_equal "#{expected}\n".b, @server.records.first + end + + data( + 'json' => ['json', '{"a":1,"b":2}'], + 'ltsv' => ['ltsv', "a:1\tb:2"], + ) + def test_format_with_chomp_records_without_append_new_line(data) + formatter, expected = data + d = create_driver(default_config + "\n@type #{formatter}\n\nchomp_records true\nappend_new_line false") + driver_run(d, [{"a"=>1,"b"=>2}]) + assert_equal expected, @server.records.first + end + def test_data_key d = create_driver(default_config + "data_key a") driver_run(d, [{"a"=>1,"b"=>2}, {"b"=>2}]) @@ -173,7 
+195,7 @@ def test_multibyte_input record = {"a" => "てすと"} driver_run(d, [record]) assert_equal 0, d.instance.log.out.logs.size - assert_equal ("#{record.to_json}\n" + "\n").b, @server.records.first + assert_equal record.to_json.b + "\n", @server.records.first end def test_record_count diff --git a/test/plugin/test_out_kinesis_streams.rb b/test/plugin/test_out_kinesis_streams.rb index d71bb9e..0293523 100644 --- a/test/plugin/test_out_kinesis_streams.rb +++ b/test/plugin/test_out_kinesis_streams.rb @@ -81,6 +81,17 @@ def test_format(data) assert_equal expected + "\n", @server.records.first end + data( + 'json' => ['json', '{"a":1,"b":2}'], + 'ltsv' => ['ltsv', "a:1\tb:2"], + ) + def test_format_with_chomp_records(data) + formatter, expected = data + d = create_driver(default_config + "\n@type #{formatter}\n\nchomp_records true") + driver_run(d, [{"a"=>1,"b"=>2}]) + assert_equal expected, @server.records.first + end + def test_partition_key_not_found d = create_driver(default_config + "partition_key partition_key") driver_run(d, [{"a"=>1}]) @@ -151,7 +162,7 @@ def test_multibyte_input record = {"a" => "てすと"} driver_run(d, [record]) assert_equal 0, d.instance.log.out.logs.size - assert_equal (record.to_json + "\n").b, @server.records.first + assert_equal (record.to_json + "\n").b, @server.records.first end def test_record_count diff --git a/test/plugin/test_out_kinesis_streams_aggregated.rb b/test/plugin/test_out_kinesis_streams_aggregated.rb index 3953f38..d5dba3a 100644 --- a/test/plugin/test_out_kinesis_streams_aggregated.rb +++ b/test/plugin/test_out_kinesis_streams_aggregated.rb @@ -82,6 +82,17 @@ def test_format(data) assert_equal (expected + "\n").b, @server.records.first end + data( + 'json' => ['json', '{"a":1,"b":2}'], + 'ltsv' => ['ltsv', "a:1\tb:2"], + ) + def test_format_with_chomp_records(data) + formatter, expected = data + d = create_driver(default_config + "\n@type #{formatter}\n\nchomp_records true") + driver_run(d, [{"a"=>1,"b"=>2}]) + assert_equal expected.b, @server.records.first + end + def test_data_key d = create_driver(default_config + "data_key a") driver_run(d, [{"a"=>1,"b"=>2}, {"b"=>2}])
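The `data_formatter_create` change selects one of two lambdas once, at configure time, and `out_kinesis_firehose.rb` then decorates whichever lambda was chosen when `append_new_line` is enabled. The following standalone sketch uses simplified, hypothetical names (`build_data_formatter`, an `append_new_line` wrapper) rather than the plugin's real classes, but it shows the shape of that pattern and why the Firehose wrapper chomps before appending:

```ruby
require 'json'

# Stand-in for a fluentd formatter: the default json formatter ends each record with "\n".
json_formatter = ->(tag, time, record) { JSON.generate(record) + "\n" }

# Pick the per-record lambda once at configure time, so the hot path never re-checks the option.
build_data_formatter = lambda do |formatter, chomp_records|
  if chomp_records
    ->(tag, time, record) { formatter.call(tag, time, record).chomp.b }  # v2-compatible: drop the separator
  else
    ->(tag, time, record) { formatter.call(tag, time, record).b }        # default: keep the formatter's separator
  end
end

# Firehose's append_new_line decorates the chosen lambda; chomping first avoids a double "\n".
append_new_line = lambda do |data_formatter|
  ->(tag, time, record) { data_formatter.call(tag, time, record).chomp + "\n" }
end

base = build_data_formatter.call(json_formatter, false)
base.call("tag", Time.now, "a" => 1)                        # => "{\"a\":1}\n"
append_new_line.call(base).call("tag", Time.now, "a" => 1)  # => "{\"a\":1}\n" (not "{\"a\":1}\n\n")
```

Chomping inside the wrapper is what removes the double newline the old Firehose tests asserted (`("#{expected}\n" + "\n").b`), which is why those assertions are updated in this patch.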