From cea5835db556c83fe99afec0715b52d4f3190dc0 Mon Sep 17 00:00:00 2001 From: Nils Kuhn Date: Tue, 1 Sep 2020 13:49:42 +0200 Subject: [PATCH 01/10] :pencil: Added new config settings to ascii doc --- docs/index.asciidoc | 48 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 734646b..0f8c739 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -40,6 +40,10 @@ This plugin supports the following configuration options plus the <> |<>|No | <> |<>|No | <> |<>|Yes +| <<plugins-{type}s-{plugin}-action>> |<<string,string>>|Yes +| <<plugins-{type}s-{plugin}-query_key>> |<<string,string>>|No +| <<plugins-{type}s-{plugin}-query_value>> |<<string,string>>|No +| <<plugins-{type}s-{plugin}-upsert>> |<<boolean,boolean>>|No |======================================================================= Also see <<plugins-{type}s-{plugin}-common-options>> for a list of options supported by all @@ -129,6 +133,50 @@ The number of seconds to wait after failure before retrying. A MongoDB URI to connect to. See http://docs.mongodb.org/manual/reference/connection-string/. +[id="plugins-{type}s-{plugin}-action"] +===== `action` + +* Value type is <<string,string>> +* Default value is `insert`. + +The method used to write processed events to MongoDB. + +Possible values are `insert`, `update` or `replace`. + +[id="plugins-{type}s-{plugin}-query_key"] +===== `query_key` + +* Value type is <<string,string>> +* Default value is `_id`. + +The key of the query to find the document to update or replace in MongoDB. + +query_key is used as described https://docs.mongodb.com/ruby-driver/v2.6/tutorials/ruby-driver-bulk-operations[here] +for `update` and `replace` examples: + + :filter => {query_key => query_value} + +[id="plugins-{type}s-{plugin}-query_value"] +===== `query_value` + +* Value type is <<string,string>> +* There is no default value for this setting. + +The value of the query to find the document to update or replace in MongoDB. This can be dynamic using the `%{foo}` syntax. 
+ +query_value is used as described https://docs.mongodb.com/ruby-driver/v2.6/tutorials/ruby-driver-bulk-operations[here] +for `update` and `replace` examples: + + :filter => {query_key => query_value} + +[id="plugins-{type}s-{plugin}-upsert"] +===== `upsert` + +* Value type is <<boolean,boolean>> +* Default value is `false`. + +If true, a new document is created if no document exists in DB matching the given `query_key` and `query_value`. +Only applies if action is `update` or `replace`. [id="plugins-{type}s-{plugin}-common-options"] From e1d3774f31307ef2b559cf5b8d58c7ef945abfc9 Mon Sep 17 00:00:00 2001 From: Nils Kuhn Date: Tue, 1 Sep 2020 13:53:44 +0200 Subject: [PATCH 02/10] :bug: Fix for issues #60, #64, #65 and #66 Makes plugin work with mongodb driver, version 2.6 (again) --- lib/logstash/outputs/bson/big_decimal.rb | 2 +- lib/logstash/outputs/bson/logstash_event.rb | 2 +- lib/logstash/outputs/bson/logstash_timestamp.rb | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/logstash/outputs/bson/big_decimal.rb b/lib/logstash/outputs/bson/big_decimal.rb index f7a2c89..3b5d4ef 100644 --- a/lib/logstash/outputs/bson/big_decimal.rb +++ b/lib/logstash/outputs/bson/big_decimal.rb @@ -33,7 +33,7 @@ module BigDecimal # 1.221311.to_bson # @return [ String ] The encoded string. # @see http://bsonspec.org/#/specification - def to_bson(buffer = ByteBuffer.new) + def to_bson(buffer = ByteBuffer.new, validating_keys = Config.validating_keys?) buffer.put_bytes([ self ].pack(PACK)) end diff --git a/lib/logstash/outputs/bson/logstash_event.rb b/lib/logstash/outputs/bson/logstash_event.rb index 7dcd4c2..9b4e17c 100644 --- a/lib/logstash/outputs/bson/logstash_event.rb +++ b/lib/logstash/outputs/bson/logstash_event.rb @@ -30,7 +30,7 @@ module LogStashEvent # Event.new("field" => "value").to_bson # @return [ String ] The encoded string. # @see http://bsonspec.org/#/specification - def to_bson(buffer = ByteBuffer.new) + def to_bson(buffer = ByteBuffer.new, validating_keys = Config.validating_keys?) 
position = buffer.length buffer.put_int32(0) to_hash.each do |field, value| diff --git a/lib/logstash/outputs/bson/logstash_timestamp.rb b/lib/logstash/outputs/bson/logstash_timestamp.rb index cb2141f..6eae0f0 100644 --- a/lib/logstash/outputs/bson/logstash_timestamp.rb +++ b/lib/logstash/outputs/bson/logstash_timestamp.rb @@ -25,7 +25,7 @@ module LogStashTimestamp # A time is type 0x09 in the BSON spec. BSON_TYPE = 9.chr.force_encoding(BINARY).freeze - def to_bson(buffer = ByteBuffer.new) + def to_bson(buffer = ByteBuffer.new, validating_keys = Config.validating_keys?) time.to_bson(buffer) end From 663a28ba2bda8d5e5a0f9599a9ad64735a177e38 Mon Sep 17 00:00:00 2001 From: Nils Kuhn Date: Tue, 1 Sep 2020 13:55:50 +0200 Subject: [PATCH 03/10] :sparkles: Support updating and replacing given documents in addition to inserting new ones --- lib/logstash/outputs/mongodb.rb | 79 ++++++++++++++++++++++++++++++--- 1 file changed, 72 insertions(+), 7 deletions(-) diff --git a/lib/logstash/outputs/mongodb.rb b/lib/logstash/outputs/mongodb.rb index 0b88c68..75436d7 100644 --- a/lib/logstash/outputs/mongodb.rb +++ b/lib/logstash/outputs/mongodb.rb @@ -43,13 +43,23 @@ class LogStash::Outputs::Mongodb < LogStash::Outputs::Base # whatever the bulk interval value (mongodb hard limit is 1000). config :bulk_size, :validate => :number, :default => 900, :maximum => 999, :min => 2 + # The method used to write processed events to MongoDB. + # Possible values are `insert`, `update` and `replace`. + config :action, :validate => :string, :required => true + # The key of the query to find the document to update or replace. + config :query_key, :validate => :string, :required => false, :default => "_id" + # The value of the query to find the document to update or replace. This can be dynamic using the `%{foo}` syntax. + config :query_value, :validate => :string, :required => false + # If true, a new document is created if no document exists in DB with given `document_id`. 
+ # Only applies if action is `update` or `replace`. + config :upsert, :validate => :boolean, :required => false, :default => false + # Mutex used to synchronize access to 'documents' @@mutex = Mutex.new def register - if @bulk_size > 1000 - raise LogStash::ConfigurationError, "Bulk size must be lower than '1000', currently '#{@bulk_size}'" - end + + validate_config Mongo::Logger.logger = @logger conn = Mongo::Client.new(@uri) @@ -65,7 +75,7 @@ def register @@mutex.synchronize do @documents.each do |collection, values| if values.length > 0 - @db[collection].insert_many(values) + write_to_mongodb(collection, values) @documents.delete(collection) end end @@ -74,6 +84,18 @@ def register end end + def validate_config + if @bulk_size > 1000 + raise LogStash::ConfigurationError, "Bulk size must be lower than '1000', currently '#{@bulk_size}'" + end + if @action != "insert" && @action != "update" && @action != "replace" + raise LogStash::ConfigurationError, "Only insert, update and replace are valid for 'action' setting." + end + if (@action == "update" || @action == "replace") && @query_value.blank? + raise LogStash::ConfigurationError, "If action is update or replace, query_value must be set." 
+ end + end + def receive(event) begin # Our timestamp object now has a to_bson method, using it here @@ -94,8 +116,11 @@ def receive(event) document["_id"] = BSON::ObjectId.new end + collection = event.sprintf(@collection) + if @action == "update" or @action == "replace" + document["metadata_mongodb_output_query_value"] = event.sprintf(@query_value) + end if @bulk - collection = event.sprintf(@collection) @@mutex.synchronize do if(!@documents[collection]) @documents[collection] = [] @@ -103,12 +128,12 @@ def receive(event) @documents[collection].push(document) if(@documents[collection].length >= @bulk_size) - @db[collection].insert_many(@documents[collection]) + write_to_mongodb(collection, @documents[collection]) @documents.delete(collection) end end else - @db[event.sprintf(@collection)].insert_one(document) + write_to_mongodb(collection, [document]) end rescue => e if e.message =~ /^E11000/ @@ -126,6 +151,46 @@ def receive(event) end end + def write_to_mongodb(collection, documents) + ops = get_write_ops(documents) + @db[collection].bulk_write(ops) + end + + def get_write_ops(documents) + ops = [] + documents.each do |doc| + replaced_query_value = doc["metadata_mongodb_output_query_value"] + if @action == "insert" + ops << {:insert_one => doc} + elsif @action == "update" + ops << {:update_one => {:filter => {@query_key => replaced_query_value}, :update => {'$set' => to_dotted_hash(doc)}, :upsert => @upsert}} + elsif @action == "replace" + ops << {:replace_one => {:filter => {@query_key => replaced_query_value}, :replacement => doc, :upsert => @upsert}} + end + end + ops + end + + def to_dotted_hash(hash, recursive_key = "") + hash.each_with_object({}) do |(k, v), ret| + key = recursive_key + k.to_s + if v.is_a? Array + v.each_with_index do |arrV, i| + arrKey = key + "." + i.to_s + if arrV.is_a? Hash + ret.merge! to_dotted_hash(arrV, arrKey + ".") + else + ret[arrKey] = arrV + end + end + elsif v.is_a? Hash + ret.merge! 
to_dotted_hash(v, key + ".") + else + ret[key] = v + end + end + end + def close @closed.make_true @bulk_thread.wakeup From b69a38fcca2b385daeeb47575a63360a8a146606 Mon Sep 17 00:00:00 2001 From: Nils Kuhn Date: Tue, 1 Sep 2020 13:57:28 +0200 Subject: [PATCH 04/10] :bookmark: Set version to 3.2.0 fixing mongo driver version to 2.6 --- logstash-output-mongodb.gemspec | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/logstash-output-mongodb.gemspec b/logstash-output-mongodb.gemspec index e198114..550fbd1 100644 --- a/logstash-output-mongodb.gemspec +++ b/logstash-output-mongodb.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-mongodb' - s.version = '3.1.6' + s.version = '3.2.0' s.licenses = ['Apache License (2.0)'] s.summary = "Writes events to MongoDB" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" @@ -21,7 +21,7 @@ Gem::Specification.new do |s| # Gem dependencies s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" s.add_runtime_dependency 'logstash-codec-plain' - s.add_runtime_dependency 'mongo', '~> 2.6' + s.add_runtime_dependency 'mongo', '= 2.6' s.add_development_dependency 'logstash-devutils' end From f1dfb8114948d05f19d56bb3331c1828cc1dd21c Mon Sep 17 00:00:00 2001 From: Nils Kuhn Date: Tue, 1 Sep 2020 18:28:18 +0200 Subject: [PATCH 05/10] :white_check_mark: Added tests --- lib/logstash/outputs/mongodb.rb | 3 +- spec/integration/mongodb_spec.rb | 3 +- .../outputs/mongodb_config_validation_spec.rb | 64 ++++++++++++ ...mongodb_spec.rb => mongodb_insert_spec.rb} | 31 +++--- spec/outputs/mongodb_replace_spec.rb | 98 +++++++++++++++++++ .../mongodb_update_nested_fields_spec.rb | 81 +++++++++++++++ spec/outputs/mongodb_update_spec.rb | 98 +++++++++++++++++++ 7 files changed, 364 insertions(+), 14 deletions(-) create mode 100644 
spec/outputs/mongodb_config_validation_spec.rb rename spec/outputs/{mongodb_spec.rb => mongodb_insert_spec.rb} (69%) create mode 100644 spec/outputs/mongodb_replace_spec.rb create mode 100644 spec/outputs/mongodb_update_nested_fields_spec.rb create mode 100644 spec/outputs/mongodb_update_spec.rb diff --git a/lib/logstash/outputs/mongodb.rb b/lib/logstash/outputs/mongodb.rb index 75436d7..749a026 100644 --- a/lib/logstash/outputs/mongodb.rb +++ b/lib/logstash/outputs/mongodb.rb @@ -91,7 +91,7 @@ def validate_config if @action != "insert" && @action != "update" && @action != "replace" raise LogStash::ConfigurationError, "Only insert, update and replace are valid for 'action' setting." end - if (@action == "update" || @action == "replace") && @query_value.blank? + if (@action == "update" || @action == "replace") && (@query_value.nil? || @query_value.empty?) raise LogStash::ConfigurationError, "If action is update or replace, query_value must be set." end end @@ -160,6 +160,7 @@ def get_write_ops(documents) ops = [] documents.each do |doc| replaced_query_value = doc["metadata_mongodb_output_query_value"] + doc.delete("metadata_mongodb_output_query_value") if @action == "insert" ops << {:insert_one => doc} elsif @action == "update" diff --git a/spec/integration/mongodb_spec.rb b/spec/integration/mongodb_spec.rb index 690e8fa..c6ed3e2 100644 --- a/spec/integration/mongodb_spec.rb +++ b/spec/integration/mongodb_spec.rb @@ -7,10 +7,11 @@ let(:database) { 'logstash' } let(:collection) { 'logs' } let(:uuid) { SecureRandom.uuid } + let(:action) { 'insert' } let(:config) do { "uri" => uri, "database" => database, - "collection" => collection, "isodate" => true } + "collection" => collection, "isodate" => true, "action" => action } end describe "#send" do diff --git a/spec/outputs/mongodb_config_validation_spec.rb b/spec/outputs/mongodb_config_validation_spec.rb new file mode 100644 index 0000000..4f590df --- /dev/null +++ b/spec/outputs/mongodb_config_validation_spec.rb @@ 
-0,0 +1,64 @@ +# encoding: utf-8 +require_relative "../spec_helper" +require "logstash/plugin" + +describe LogStash::Outputs::Mongodb do + + let(:uri) { 'mongodb://localhost:27017' } + let(:database) { 'logstash' } + let(:collection) { 'logs' } + + describe "validate_config method" do + + subject! { LogStash::Outputs::Mongodb.new(config) } + + [ + {:action => "not-supported", :query_key => "qk", :query_value => "qv", :upsert => false, + :expected_reason => "Only insert, update and replace are valid for 'action' setting."}, + {:action => "update", :query_key => "qk", :query_value => nil, :upsert => false, + :expected_reason => "If action is update or replace, query_value must be set."}, + {:action => "update", :query_key => "qk", :query_value => "", :upsert => false, + :expected_reason => "If action is update or replace, query_value must be set."}, + {:action => "replace", :query_key => "qk", :query_value => nil, :upsert => false, + :expected_reason => "If action is update or replace, query_value must be set."}, + {:action => "replace", :query_key => "qk", :query_value => "", :upsert => false, + :expected_reason => "If action is update or replace, query_value must be set."}, + {:action => "insert", :bulk_size => 1001, + :expected_reason => "Bulk size must be lower than '1000', currently '1001'"}, + ].each do |test| + + describe "when validating config with action '#{test[:action]}' query_key '#{test[:query_key]}', query_value '#{test[:query_value]}' and upsert '#{test[:upsert]}'" do + + let(:config) { + configuration = { + "uri" => uri, + "database" => database, + "collection" => collection + } + unless test[:action].nil? + configuration["action"] = test[:action] + end + unless test[:query_key].nil? + configuration["query_key"] = test[:query_key] + end + unless test[:query_value].nil? + configuration["query_value"] = test[:query_value] + end + unless test[:upsert].nil? + configuration["upsert"] = test[:upsert] + end + unless test[:bulk_size].nil? 
+ configuration["bulk_size"] = test[:bulk_size] + end + return configuration + } + + it "should raise error: #{test[:expected_reason]}" do + expect { subject.validate_config }.to raise_error(LogStash::ConfigurationError, test[:expected_reason]) + end + end + + end + + end +end diff --git a/spec/outputs/mongodb_spec.rb b/spec/outputs/mongodb_insert_spec.rb similarity index 69% rename from spec/outputs/mongodb_spec.rb rename to spec/outputs/mongodb_insert_spec.rb index 8d3decb..a442465 100644 --- a/spec/outputs/mongodb_spec.rb +++ b/spec/outputs/mongodb_insert_spec.rb @@ -7,11 +7,13 @@ let(:uri) { 'mongodb://localhost:27017' } let(:database) { 'logstash' } let(:collection) { 'logs' } + let(:action) { 'insert' } let(:config) {{ "uri" => uri, "database" => database, - "collection" => collection + "collection" => collection, + "action" => action }} it "should register and close" do @@ -20,7 +22,7 @@ plugin.close end - describe "receive" do + describe "receive method while action is 'insert'" do subject! 
{ LogStash::Outputs::Mongodb.new(config) } let(:event) { LogStash::Event.new(properties) } @@ -32,7 +34,7 @@ allow(Mongo::Client).to receive(:new).and_return(connection) allow(connection).to receive(:use).and_return(client) allow(client).to receive(:[]).and_return(collection) - allow(collection).to receive(:insert_one) + allow(collection).to receive(:bulk_write) subject.register end @@ -40,7 +42,7 @@ subject.close end - describe "#send" do + describe "when processing an event" do let(:properties) {{ "message" => "This is a message!", "uuid" => SecureRandom.uuid, @@ -49,36 +51,41 @@ }} it "should send the event to the database" do - expect(collection).to receive(:insert_one) + expect(collection).to receive(:bulk_write) subject.receive(event) end end - describe "no event @timestamp" do + describe "when processing an event without @timestamp set" do let(:properties) { { "message" => "foo" } } - it "should not contain a @timestamp field in the mongo document" do + it "should send a document without @timestamp field to mongodb" do expect(event).to receive(:timestamp).and_return(nil) expect(event).to receive(:to_hash).and_return(properties) - expect(collection).to receive(:insert_one).with(properties) + expect(collection).to receive(:bulk_write).with( + [ {:insert_one => properties} ] + ) subject.receive(event) end end - describe "generateId" do + describe "when generateId is set" do let(:properties) { { "message" => "foo" } } let(:config) {{ "uri" => uri, "database" => database, "collection" => collection, - "generateId" => true + "generateId" => true, + "action" => "insert" }} - it "should contain a BSON::ObjectId as _id" do + it "should send a document containing a BSON::ObjectId as _id to mongodb" do expect(BSON::ObjectId).to receive(:new).and_return("BSON::ObjectId") expect(event).to receive(:timestamp).and_return(nil) expect(event).to receive(:to_hash).and_return(properties) - expect(collection).to receive(:insert_one).with(properties.merge("_id" => 
"BSON::ObjectId")) + expect(collection).to receive(:bulk_write).with( + [ {:insert_one => properties.merge("_id" => "BSON::ObjectId")} ] + ) subject.receive(event) end end diff --git a/spec/outputs/mongodb_replace_spec.rb b/spec/outputs/mongodb_replace_spec.rb new file mode 100644 index 0000000..d8bf35d --- /dev/null +++ b/spec/outputs/mongodb_replace_spec.rb @@ -0,0 +1,98 @@ +# encoding: utf-8 +require_relative "../spec_helper" +require "logstash/plugin" + +describe LogStash::Outputs::Mongodb do + + let(:uri) { 'mongodb://localhost:27017' } + let(:database) { 'logstash' } + let(:collection) { 'logs' } + let(:action) { 'replace' } + + let(:config) { { + "uri" => uri, + "database" => database, + "collection" => collection, + "action" => action + } } + + describe "receive method while action is 'replace'" do + subject! { LogStash::Outputs::Mongodb.new(config) } + + let(:properties) { { + "message" => "This is a message!", + "uuid" => SecureRandom.uuid, + "number" => BigDecimal.new("4321.1234"), + "utf8" => "żółć" + } } + let(:event) { LogStash::Event.new(properties) } + let(:connection) { double("connection") } + let(:client) { double("client") } + let(:collection) { double("collection") } + + before(:each) do + allow(Mongo::Client).to receive(:new).and_return(connection) + allow(connection).to receive(:use).and_return(client) + allow(client).to receive(:[]).and_return(collection) + allow(collection).to receive(:bulk_write) + subject.register + end + + after(:each) do + subject.close + end + + [ + {:query_key => nil, :query_value => "qv", :upsert => false, + :expected => {:query_key => "_id", :query_value => "qv", :upsert => false} + }, + {:query_key => "qk", :query_value => "qv", :upsert => false, + :expected => {:query_key => "qk", :query_value => "qv", :upsert => false} + }, + {:query_key => "qk", :query_value => "qv", :upsert => nil, + :expected => {:query_key => "qk", :query_value => "qv", :upsert => false} + }, + {:query_key => nil, :query_value => "qv", 
:upsert => true, + :expected => {:query_key => "_id", :query_value => "qv", :upsert => true} + }, + {:query_key => "qk", :query_value => "qv", :upsert => true, + :expected => {:query_key => "qk", :query_value => "qv", :upsert => true} + }, + ].each do |test| + + describe "when processing an event with query_key set to '#{test[:query_key]}', query_value set to '#{test[:query_value]}' and upsert set to '#{test[:upsert]}'" do + + let(:config) { + configuration = { + "uri" => uri, + "database" => database, + "collection" => collection, + "action" => action + } + unless test[:query_key].nil? + configuration["query_key"] = test[:query_key] + end + unless test[:query_value].nil? + configuration["query_value"] = test[:query_value] + end + unless test[:upsert].nil? + configuration["upsert"] = test[:upsert] + end + return configuration + } + + expected = test[:expected] + it "should send that document as a replace to mongodb with query_key '#{expected[:query_key]}', query_value '#{expected[:query_value]}' and upsert '#{expected[:upsert]}'" do + expect(event).to receive(:timestamp).and_return(nil) + expect(event).to receive(:to_hash).and_return(properties) + expect(collection).to receive(:bulk_write).with( + [{:replace_one => {:filter => {expected[:query_key] => expected[:query_value]}, :replacement => properties, :upsert => expected[:upsert]}}] + ) + subject.receive(event) + end + end + + end + + end +end diff --git a/spec/outputs/mongodb_update_nested_fields_spec.rb b/spec/outputs/mongodb_update_nested_fields_spec.rb new file mode 100644 index 0000000..6639e69 --- /dev/null +++ b/spec/outputs/mongodb_update_nested_fields_spec.rb @@ -0,0 +1,81 @@ +# encoding: utf-8 +require_relative "../spec_helper" +require "logstash/plugin" + +describe LogStash::Outputs::Mongodb do + + let(:uri) { 'mongodb://localhost:27017' } + let(:database) { 'logstash' } + let(:collection) { 'logs' } + let(:action) { 'update' } + let(:query_value) { 'qv' } + + let(:config) { { + "uri" => uri, + 
"database" => database, + "collection" => collection, + "action" => action, + "query_value" => query_value + } } + + describe "receive method while action is 'update'" do + subject! { LogStash::Outputs::Mongodb.new(config) } + + let(:properties) { { + "message" => "This is a message!", + "hashField" => { + "numField" => 1, + "hashField" => { + "numField": 2 + }, + "arrayField" => ["one", "two", "three"] + }, + "arrayField": [ + {"strField" => "four"}, + {"strField" => "five"}, + {"strField" => "six"}, + "numField" => 3 + ] + } } + let(:event) { LogStash::Event.new(properties) } + let(:connection) { double("connection") } + let(:client) { double("client") } + let(:collection) { double("collection") } + + before(:each) do + allow(Mongo::Client).to receive(:new).and_return(connection) + allow(connection).to receive(:use).and_return(client) + allow(client).to receive(:[]).and_return(collection) + allow(collection).to receive(:bulk_write) + subject.register + end + + after(:each) do + subject.close + end + + describe "when processing an event with nested hash" do + + it "should send a document update to mongodb with dotted notation" do + expect(event).to receive(:timestamp).and_return(nil) + expect(event).to receive(:to_hash).and_return(properties) + expect(collection).to receive(:bulk_write).with( + [{:update_one => {:filter => {"_id" => query_value}, :update => {"$set" => { + "message" => "This is a message!", + "hashField.numField" => 1, + "hashField.hashField.numField" => 2, + "hashField.arrayField.0" => "one", + "hashField.arrayField.1" => "two", + "hashField.arrayField.2" => "three", + "arrayField.0.strField" => "four", + "arrayField.1.strField" => "five", + "arrayField.2.strField" => "six", + "arrayField.3.numField" => 3, + }}, :upsert => false}}] + ) + subject.receive(event) + end + end + + end +end diff --git a/spec/outputs/mongodb_update_spec.rb b/spec/outputs/mongodb_update_spec.rb new file mode 100644 index 0000000..2955d40 --- /dev/null +++ 
b/spec/outputs/mongodb_update_spec.rb @@ -0,0 +1,98 @@ +# encoding: utf-8 +require_relative "../spec_helper" +require "logstash/plugin" + +describe LogStash::Outputs::Mongodb do + + let(:uri) { 'mongodb://localhost:27017' } + let(:database) { 'logstash' } + let(:collection) { 'logs' } + let(:action) { 'update' } + + let(:config) { { + "uri" => uri, + "database" => database, + "collection" => collection, + "action" => action + } } + + describe "receive method while action is 'update'" do + subject! { LogStash::Outputs::Mongodb.new(config) } + + let(:properties) { { + "message" => "This is a message!", + "uuid" => SecureRandom.uuid, + "number" => BigDecimal.new("4321.1234"), + "utf8" => "żółć" + } } + let(:event) { LogStash::Event.new(properties) } + let(:connection) { double("connection") } + let(:client) { double("client") } + let(:collection) { double("collection") } + + before(:each) do + allow(Mongo::Client).to receive(:new).and_return(connection) + allow(connection).to receive(:use).and_return(client) + allow(client).to receive(:[]).and_return(collection) + allow(collection).to receive(:bulk_write) + subject.register + end + + after(:each) do + subject.close + end + + [ + {:query_key => nil, :query_value => "qv", :upsert => false, + :expected => {:query_key => "_id", :query_value => "qv", :upsert => false} + }, + {:query_key => "qk", :query_value => "qv", :upsert => false, + :expected => {:query_key => "qk", :query_value => "qv", :upsert => false} + }, + {:query_key => "qk", :query_value => "qv", :upsert => nil, + :expected => {:query_key => "qk", :query_value => "qv", :upsert => false} + }, + {:query_key => nil, :query_value => "qv", :upsert => true, + :expected => {:query_key => "_id", :query_value => "qv", :upsert => true} + }, + {:query_key => "qk", :query_value => "qv", :upsert => true, + :expected => {:query_key => "qk", :query_value => "qv", :upsert => true} + }, + ].each do |test| + + describe "when processing an event with query_key set to 
'#{test[:query_key]}', query_value set to '#{test[:query_value]}' and upsert set to '#{test[:upsert]}'" do + + let(:config) { + configuration = { + "uri" => uri, + "database" => database, + "collection" => collection, + "action" => action + } + unless test[:query_key].nil? + configuration["query_key"] = test[:query_key] + end + unless test[:query_value].nil? + configuration["query_value"] = test[:query_value] + end + unless test[:upsert].nil? + configuration["upsert"] = test[:upsert] + end + return configuration + } + + expected = test[:expected] + it "should send that document as an update to mongodb with query_key '#{expected[:query_key]}', query_value '#{expected[:query_value]}' and upsert '#{expected[:upsert]}'" do + expect(event).to receive(:timestamp).and_return(nil) + expect(event).to receive(:to_hash).and_return(properties) + expect(collection).to receive(:bulk_write).with( + [{:update_one => {:filter => {expected[:query_key] => expected[:query_value]}, :update => {"$set" => properties}, :upsert => expected[:upsert]}}] + ) + subject.receive(event) + end + end + + end + + end +end From 9d977cb0c0b6bc6cb5d4b4ecdd6a2d14b45596e1 Mon Sep 17 00:00:00 2001 From: Nils Kuhn Date: Thu, 3 Sep 2020 11:03:57 +0200 Subject: [PATCH 06/10] :bug: Removed partial update support for arrays probably doesn't make sense in most use cases and causes trouble if size or order of array elements changes --- lib/logstash/outputs/mongodb.rb | 11 +---- .../mongodb_update_nested_fields_spec.rb | 46 +++++++++++-------- 2 files changed, 28 insertions(+), 29 deletions(-) diff --git a/lib/logstash/outputs/mongodb.rb b/lib/logstash/outputs/mongodb.rb index 749a026..245dacc 100644 --- a/lib/logstash/outputs/mongodb.rb +++ b/lib/logstash/outputs/mongodb.rb @@ -175,16 +175,7 @@ def get_write_ops(documents) def to_dotted_hash(hash, recursive_key = "") hash.each_with_object({}) do |(k, v), ret| key = recursive_key + k.to_s - if v.is_a? Array - v.each_with_index do |arrV, i| - arrKey = key + "." 
+ i.to_s - if arrV.is_a? Hash - ret.merge! to_dotted_hash(arrV, arrKey + ".") - else - ret[arrKey] = arrV - end - end - elsif v.is_a? Hash + if v.is_a? Hash ret.merge! to_dotted_hash(v, key + ".") else ret[key] = v diff --git a/spec/outputs/mongodb_update_nested_fields_spec.rb b/spec/outputs/mongodb_update_nested_fields_spec.rb index 6639e69..fa0f732 100644 --- a/spec/outputs/mongodb_update_nested_fields_spec.rb +++ b/spec/outputs/mongodb_update_nested_fields_spec.rb @@ -23,18 +23,22 @@ let(:properties) { { "message" => "This is a message!", - "hashField" => { - "numField" => 1, - "hashField" => { + "rootHashField" => { + "numFieldInHash" => 1, + "hashFieldInHash" => { "numField": 2 }, - "arrayField" => ["one", "two", "three"] + "arrayFieldInHash" => ["one", "two", "three"] }, - "arrayField": [ - {"strField" => "four"}, - {"strField" => "five"}, - {"strField" => "six"}, - "numField" => 3 + "rootArrayField": [ + {"strFieldInArray" => "four"}, + {"strFieldInArray" => "five"}, + {"strFieldInArray" => "six"} + ], + "nestedArrayField": [ + {"strFieldInArray" => "four", "arrayFieldInArray" => [3, 4], "hashFieldInArray" => {"numField" => 9}}, + {"strFieldInArray" => "five", "arrayFieldInArray" => [5, 6], "hashFieldInArray" => {"numField" => 10}}, + {"strFieldInArray" => "six", "arrayFieldInArray" => [7, 8], "hashFieldInArray" => {"numField" => 11}} ] } } let(:event) { LogStash::Event.new(properties) } @@ -56,21 +60,25 @@ describe "when processing an event with nested hash" do - it "should send a document update to mongodb with dotted notation" do + it "should send a document update to mongodb with dotted notation for fields in inner hashes" do expect(event).to receive(:timestamp).and_return(nil) expect(event).to receive(:to_hash).and_return(properties) expect(collection).to receive(:bulk_write).with( [{:update_one => {:filter => {"_id" => query_value}, :update => {"$set" => { "message" => "This is a message!", - "hashField.numField" => 1, - "hashField.hashField.numField" 
=> 2, - "hashField.arrayField.0" => "one", - "hashField.arrayField.1" => "two", - "hashField.arrayField.2" => "three", - "arrayField.0.strField" => "four", - "arrayField.1.strField" => "five", - "arrayField.2.strField" => "six", - "arrayField.3.numField" => 3, + "rootHashField.numFieldInHash" => 1, + "rootHashField.hashFieldInHash.numField" => 2, + "rootHashField.arrayFieldInHash" => ["one", "two", "three"], + "rootArrayField" => [ + {"strFieldInArray" => "four"}, + {"strFieldInArray" => "five"}, + {"strFieldInArray" => "six"} + ], + "nestedArrayField" => [ + {"strFieldInArray" => "four", "arrayFieldInArray" => [3, 4], "hashFieldInArray" => {"numField" => 9}}, + {"strFieldInArray" => "five", "arrayFieldInArray" => [5, 6], "hashFieldInArray" => {"numField" => 10}}, + {"strFieldInArray" => "six", "arrayFieldInArray" => [7, 8], "hashFieldInArray" => {"numField" => 11}} + ], }}, :upsert => false}}] ) subject.receive(event) From 7cc4283d7c918ca167df89166f99c7b3e76ff7b4 Mon Sep 17 00:00:00 2001 From: Andrea Richiardi Date: Thu, 8 Oct 2020 19:07:10 -0700 Subject: [PATCH 07/10] :tada: introduce :filter and :update_expressions This commit introduces :filter (instead of :query-key and :query-value) for defining the query that we pass down to Mongo. It is a hash and will allow us to filter by a composite key. In addition to that :update_expressions is now another optional hash that can be added to *replace* the default $set operator. The hash is a set of Mongo Update Expressions (https://docs.mongodb.com/manual/reference/operator/update/#id1) and the values are also substituted. Note that pipeline (Mongo >= 4.2) support is not there yet. Finally, action is now fully dynamic and expanded via sprintf as in logstash-output-elasticsearch. 
--- .gitignore | 1 + README.md | 25 +-- lib/logstash/outputs/mongodb.rb | 145 +++++++++++++++--- logstash-output-mongodb.gemspec | 2 +- spec/integration/mongodb_spec.rb | 9 +- .../outputs/mongodb_config_validation_spec.rb | 88 ++++++++--- spec/outputs/mongodb_insert_spec.rb | 2 +- spec/outputs/mongodb_replace_spec.rb | 39 ++--- spec/outputs/mongodb_unit.rb | 48 ++++++ .../mongodb_update_nested_fields_spec.rb | 6 +- spec/outputs/mongodb_update_spec.rb | 77 ++++++---- spec/spec_helper.rb | 4 + 12 files changed, 325 insertions(+), 121 deletions(-) create mode 100644 spec/outputs/mongodb_unit.rb diff --git a/.gitignore b/.gitignore index 3300a23..1ba8135 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ Gemfile.lock .bundle vendor +logs diff --git a/README.md b/README.md index b5831b0..0ffc4fd 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,10 @@ -# Logstash Plugin +# Logstash Mongo Output Plugin -[![Travis Build Status](https://travis-ci.org/logstash-plugins/logstash-output-mongodb.svg)](https://travis-ci.org/logstash-plugins/logstash-output-mongodb) +--- +This is a fork of [logstash-plugins/logstash-output-mongodb](https://github.com/logstash-plugins/logstash-output-mongodb). + +It adds the :action, :filter, :update_expressions and :upsert parameters +--- This is a plugin for [Logstash](https://github.com/elastic/logstash). @@ -21,20 +25,17 @@ Need help? Try #logstash on freenode IRC or the https://discuss.elastic.co/c/log ### 1. Plugin Developement and Testing -#### Code -- To get started, you'll need JRuby with the Bundler gem installed. +For developing this plugin we use the wonderful work of [cameronkerrnz/logstash-plugin-dev](https://github.com/cameronkerrnz/logstash-plugin-dev): -- Create a new plugin or clone and existing from the GitHub [logstash-plugins](https://github.com/logstash-plugins) organization. We also provide [example plugins](https://github.com/logstash-plugins?query=example). 
+To start an interactive environment run: -- Install dependencies -```sh -bundle install +``` sh +docker run --rm -it -v ${PWD}:/work cameronkerrnz/logstash-plugin-dev:7.9 ``` -#### Test - -- Update your dependencies +After that you can run the usual suspects: +- Install/Update dependencies ```sh bundle install ``` @@ -95,4 +96,4 @@ Programming is not a required skill. Whatever you've seen about open source and It is more important to the community that you are able to contribute. -For more information about contributing, see the [CONTRIBUTING](https://github.com/elastic/logstash/blob/master/CONTRIBUTING.md) file. \ No newline at end of file +For more information about contributing, see the [CONTRIBUTING](https://github.com/elastic/logstash/blob/master/CONTRIBUTING.md) file. diff --git a/lib/logstash/outputs/mongodb.rb b/lib/logstash/outputs/mongodb.rb index 245dacc..b8c2e79 100644 --- a/lib/logstash/outputs/mongodb.rb +++ b/lib/logstash/outputs/mongodb.rb @@ -34,22 +34,55 @@ class LogStash::Outputs::Mongodb < LogStash::Outputs::Base # "_id" field in the event. config :generateId, :validate => :boolean, :default => false - # Bulk insert flag, set to true to allow bulk insertion, else it will insert events one by one. config :bulk, :validate => :boolean, :default => false + # Bulk interval, Used to insert events periodically if the "bulk" flag is activated. config :bulk_interval, :validate => :number, :default => 2 + # Bulk events number, if the number of events to insert into a collection raise that limit, it will be bulk inserted # whatever the bulk interval value (mongodb hard limit is 1000). config :bulk_size, :validate => :number, :default => 900, :maximum => 999, :min => 2 - # The method used to write processed events to MongoDB. - # Possible values are `insert`, `update` and `replace`. - config :action, :validate => :string, :required => true - # The key of the query to find the document to update or replace. 
- config :query_key, :validate => :string, :required => false, :default => "_id" - # The value of the query to find the document to update or replace. This can be dynamic using the `%{foo}` syntax. - config :query_value, :validate => :string, :required => false + # The Mongo DB action to perform. Valid actions are: + # + # - insert: inserts a document, fails if the document already exists. + # - update: updates a document given a `filter`. You can also upsert a document, see the `upsert` option. + # - delete: *Not Supported* at the moment + # + # A sprintf-able string is allowed to change the action based on the content + # of the event. The value `%{[foo]}` would use the foo field for the action. + # + # For more details on actions, check out the https://docs.mongodb.com/ruby-driver/v2.6/tutorials/ruby-driver-bulk-operations[Mongo Ruby Driver documentation] + config :action, :validate => :string, :default => "insert" + + # The :filter clause for an update or replace. + # + # A sprintf-able string is allowed for keys: the value `my_%{[foo]}` would + # use the foo field instead *always coerced to a string*. + # + # However, the + # https://www.elastic.co/guide/en/logstash/current/field-references-deepdive.html[Field + # Reference Syntax] is required for values - these preserve type (integer, + # float, ...). + config :filter, :validate => :hash, :required => false, :default => {} + + # The hash in :update_expressions will be used *instead* of the default + # '$set'. This option is useful for using alternative operators like '$inc'. + # + # A sprintf-able string is allowed for keys: the value `my_%{[foo]}` would + # use the foo field instead *always coerced to a string*. + # + # However, the + # https://www.elastic.co/guide/en/logstash/current/field-references-deepdive.html[Field + # Reference Syntax] is required for values - these preserve type (integer, + # float, ...).
+ # + # Keys must start with `$`, see the https://docs.mongodb.com/manual/reference/operator/update/#id1[Mongo DB Update Operators] for reference. + # + # Note that pipeline support (Mongo >= 4.2) is not there yet. + config :update_expressions, :validate => :hash, :required => false, :default => nil + # If true, a new document is created if no document exists in DB with given `document_id`. # Only applies if action is `update` or `replace`. config :upsert, :validate => :boolean, :required => false, :default => false @@ -88,20 +121,37 @@ def validate_config if @bulk_size > 1000 raise LogStash::ConfigurationError, "Bulk size must be lower than '1000', currently '#{@bulk_size}'" end - if @action != "insert" && @action != "update" && @action != "replace" - raise LogStash::ConfigurationError, "Only insert, update and replace are valid for 'action' setting." + if !@update_expressions.nil? + @update_expressions.keys.each { |k| + if !is_update_operator(k) + raise LogStash::ConfigurationError, "The :update_expressions option contains '#{k}', which is not an Update expression." + break + end + } end - if (@action == "update" || @action == "replace") && (@query_value.nil? || @query_value.empty?) - raise LogStash::ConfigurationError, "If action is update or replace, query_value must be set." + end + + def validate_action(action, filter, update_expressions) + if action != "insert" && action != "update" && action != "replace" + raise LogStash::ConfigurationError, "Only insert, update and replace are supported Mongo actions, got '#{action}'." + end + if (action == "update" || action == "replace") && (filter.nil? || filter.empty?) + raise LogStash::ConfigurationError, "If action is update or replace, filter must be set." + end + if action != "update" && !(update_expressions.nil? || update_expressions.empty?) + raise LogStash::ConfigurationError, "The :update_expressions only makes sense if the action is an update." 
end end def receive(event) + action = event.sprintf(@action) + + validate_action(action, @filter, @update_expressions) + begin # Our timestamp object now has a to_bson method, using it here # {}.merge(other) so we don't taint the event hash innards document = {}.merge(event.to_hash) - if !@isodate timestamp = event.timestamp if timestamp @@ -117,9 +167,19 @@ def receive(event) end collection = event.sprintf(@collection) - if @action == "update" or @action == "replace" - document["metadata_mongodb_output_query_value"] = event.sprintf(@query_value) + if action == "update" or action == "replace" + document["metadata_mongodb_output_filter"] = apply_event_to_hash(event, @filter) end + + if action == "update" and !(@update_expressions.nil? || @update_expressions.empty?) + # we only expand the values cause keys are update expressions + expressions_hash = {} + @update_expressions.each do |k, v| + expressions_hash[k] = apply_event_to_hash(event, v) + end + document["metadata_mongodb_output_update_expressions"] = expressions_hash + end + if @bulk @@mutex.synchronize do if(!@documents[collection]) @@ -133,7 +193,8 @@ def receive(event) end end else - write_to_mongodb(collection, [document]) + result = write_to_mongodb(collection, [document]) + @logger.debug("Bulk write result", :result => result) end rescue => e if e.message =~ /^E11000/ @@ -144,7 +205,7 @@ def receive(event) # to fix the issue. 
@logger.warn("Skipping insert because of a duplicate key error", :event => event, :exception => e) else - @logger.warn("Failed to send event to MongoDB, retrying in #{@retry_delay.to_s} seconds", :event => event, :exception => e) + @logger.warn("Failed to send event to MongoDB retrying in #{@retry_delay.to_s} seconds", :result=> e.result, :message => e.message) sleep(@retry_delay) retry end @@ -153,25 +214,61 @@ def receive(event) def write_to_mongodb(collection, documents) ops = get_write_ops(documents) + @logger.debug("Sending", :ops => ops) @db[collection].bulk_write(ops) end def get_write_ops(documents) ops = [] documents.each do |doc| - replaced_query_value = doc["metadata_mongodb_output_query_value"] - doc.delete("metadata_mongodb_output_query_value") - if @action == "insert" + filter = doc["metadata_mongodb_output_filter"] + doc.delete("metadata_mongodb_output_filter") + + update_expressions = doc["metadata_mongodb_output_update_expressions"] + doc.delete("metadata_mongodb_output_update_expressions") + + # TODO: support multiple expressions as pipeline for Mongo >= 4.2 + update = if !update_expressions.nil? + update_expressions + else + {'$set' => to_dotted_hash(doc)} + end + + if action == "insert" ops << {:insert_one => doc} - elsif @action == "update" - ops << {:update_one => {:filter => {@query_key => replaced_query_value}, :update => {'$set' => to_dotted_hash(doc)}, :upsert => @upsert}} - elsif @action == "replace" - ops << {:replace_one => {:filter => {@query_key => replaced_query_value}, :replacement => doc, :upsert => @upsert}} + elsif action == "update" + ops << {:update_one => {:filter => filter, :update => update, :upsert => @upsert}} + elsif action == "replace" + ops << {:replace_one => {:filter => filter, :replacement => doc, :upsert => @upsert}} end end ops end + def is_update_operator(string) + string.start_with?("$") + end + + # Apply the event to the input hash keys and values. + # + # This function is recursive. 
+ # + # It uses event.sprintf for keys but event.get for values because it looks + # like event.sprintf always returns a string and we don't want to always + # coerce. + # + # See https://github.com/elastic/logstash/issues/5114 + def apply_event_to_hash(event, hash) + hash.clone.each_with_object({}) do |(k, v), ret| + if v.is_a? Hash + ret[event.sprintf(k)] = apply_event_to_hash(event, v) + else + event_value = event.get(v) + ret[event.sprintf(k)] = event_value.nil? ? v : event_value + end + end + end + def to_dotted_hash(hash, recursive_key = "") hash.each_with_object({}) do |(k, v), ret| key = recursive_key + k.to_s diff --git a/logstash-output-mongodb.gemspec b/logstash-output-mongodb.gemspec index 550fbd1..a21d569 100644 --- a/logstash-output-mongodb.gemspec +++ b/logstash-output-mongodb.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-mongodb' - s.version = '3.2.0' + s.version = '3.2.1' s.licenses = ['Apache License (2.0)'] s.summary = "Writes events to MongoDB" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. 
This gem is not a stand-alone program" diff --git a/spec/integration/mongodb_spec.rb b/spec/integration/mongodb_spec.rb index c6ed3e2..fe1f0e3 100644 --- a/spec/integration/mongodb_spec.rb +++ b/spec/integration/mongodb_spec.rb @@ -6,8 +6,7 @@ let(:uri) { 'mongodb://localhost:27017' } let(:database) { 'logstash' } let(:collection) { 'logs' } - let(:uuid) { SecureRandom.uuid } - let(:action) { 'insert' } + let(:action) { 'insert' } let(:config) do { "uri" => uri, "database" => database, @@ -19,8 +18,10 @@ subject { LogStash::Outputs::Mongodb.new(config) } let(:properties) { { "message" => "This is a message!", - "uuid" => uuid, "number" => BigDecimal.new("4321.1234"), - "utf8" => "żółć", "int" => 42, + "uuid" => "00000000-0000-0000-0000-000000000000", + "number" => BigDecimal.new("4321.1234"), + "utf8" => "żółć", + "int" => 42, "arry" => [42, "string", 4321.1234]} } let(:event) { LogStash::Event.new(properties) } diff --git a/spec/outputs/mongodb_config_validation_spec.rb b/spec/outputs/mongodb_config_validation_spec.rb index 4f590df..74f045e 100644 --- a/spec/outputs/mongodb_config_validation_spec.rb +++ b/spec/outputs/mongodb_config_validation_spec.rb @@ -8,28 +8,71 @@ let(:database) { 'logstash' } let(:collection) { 'logs' } - describe "validate_config method" do + describe "when validating config" do subject! 
{ LogStash::Outputs::Mongodb.new(config) } [ - {:action => "not-supported", :query_key => "qk", :query_value => "qv", :upsert => false, - :expected_reason => "Only insert, update and replace are valid for 'action' setting."}, - {:action => "update", :query_key => "qk", :query_value => nil, :upsert => false, - :expected_reason => "If action is update or replace, query_value must be set."}, - {:action => "update", :query_key => "qk", :query_value => "", :upsert => false, - :expected_reason => "If action is update or replace, query_value must be set."}, - {:action => "replace", :query_key => "qk", :query_value => nil, :upsert => false, - :expected_reason => "If action is update or replace, query_value must be set."}, - {:action => "replace", :query_key => "qk", :query_value => "", :upsert => false, - :expected_reason => "If action is update or replace, query_value must be set."}, + {:update_expressions => {"invalid-expression" => "foo"}, + :expected_reason => "The :update_expressions option contains 'invalid-expression', which is not an Update expression."}, {:action => "insert", :bulk_size => 1001, :expected_reason => "Bulk size must be lower than '1000', currently '1001'"}, ].each do |test| - describe "when validating config with action '#{test[:action]}' query_key '#{test[:query_key]}', query_value '#{test[:query_value]}' and upsert '#{test[:upsert]}'" do + describe "with :bulk_size => '#{test[:bulk_size]}', :upsert => '#{test[:upsert]}' and :update_expressions => '#{test[:update_expressions]}'" do - let(:config) { + let(:config) do + configuration = { + "uri" => uri, + "database" => database, + "collection" => collection, + "filter" => {"_id" => "123"}, + "action" => "update" + } + unless test[:bulk_size].nil? + configuration["bulk_size"] = test[:bulk_size] + end + unless test[:update_expressions].nil? 
+ configuration["update_expressions"] = test[:update_expressions] + end + return configuration + end + + it "should raise error: '#{test[:expected_reason]}'" do + expect { subject.validate_config }.to raise_error(LogStash::ConfigurationError, test[:expected_reason]) + end + end + end + end + + describe "when validating action" do + + subject! { LogStash::Outputs::Mongodb.new(config) } + + [ + {:action => "unsupported", :filter => {"_id" => "123"}, :upsert => false, + :expected_reason => "Only insert, update and replace are supported Mongo actions, got 'unsupported'."}, + {:action => "delete", :filter => {"_id" => "123"}, :upsert => false, + :expected_reason => "Only insert, update and replace are supported Mongo actions, got 'delete'."}, + {:action => "update", :filter => {}, :upsert => false, + :expected_reason => "If action is update or replace, filter must be set."}, + {:action => "%{myaction}", :filter => {}, :upsert => false, + :expected_reason => "If action is update or replace, filter must be set."}, + {:action => "%{[myactionnested][foo]}", :filter => {}, :upsert => false, + :expected_reason => "If action is update or replace, filter must be set."}, + {:action => "update", :filter => nil, :upsert => false, + :expected_reason => "If action is update or replace, filter must be set."}, + {:action => "insert", :update_expressions => {"$inc" => {"quantity" => 1}}, + :expected_reason => "The :update_expressions only makes sense if the action is an update."}, + {:action => "replace", :filter => {"_id" => "123"}, :update_expressions => {"$inc" => {"quantity" => 1}}, + :expected_reason => "The :update_expressions only makes sense if the action is an update."}, + ].each do |test| + + describe "with :action => '#{test[:action]}', :filter => '#{test[:filter]}', :upsert => '#{test[:upsert]}' and :update_expressions => '#{test[:update_expressions]}'" do + + let(:event) { LogStash::Event.new("myaction" => "update", "myactionnested" => {"foo" => "replace"})} + + 
let(:config) do configuration = { "uri" => uri, "database" => database, @@ -38,27 +81,22 @@ unless test[:action].nil? configuration["action"] = test[:action] end - unless test[:query_key].nil? - configuration["query_key"] = test[:query_key] - end - unless test[:query_value].nil? - configuration["query_value"] = test[:query_value] + unless test[:filter].nil? + configuration["filter"] = test[:filter] end unless test[:upsert].nil? configuration["upsert"] = test[:upsert] end - unless test[:bulk_size].nil? - configuration["bulk_size"] = test[:bulk_size] + unless test[:update_expressions].nil? + configuration["update_expressions"] = test[:update_expressions] end return configuration - } + end - it "should raise error: #{test[:expected_reason]}" do - expect { subject.validate_config }.to raise_error(LogStash::ConfigurationError, test[:expected_reason]) + it "should raise error: '#{test[:expected_reason]}'" do + expect { subject.receive(event) }.to raise_error(LogStash::ConfigurationError, test[:expected_reason]) end end - end - end end diff --git a/spec/outputs/mongodb_insert_spec.rb b/spec/outputs/mongodb_insert_spec.rb index a442465..96c6029 100644 --- a/spec/outputs/mongodb_insert_spec.rb +++ b/spec/outputs/mongodb_insert_spec.rb @@ -45,7 +45,7 @@ describe "when processing an event" do let(:properties) {{ "message" => "This is a message!", - "uuid" => SecureRandom.uuid, + "uuid" => "00000000-0000-0000-0000-000000000000", "number" => BigDecimal.new("4321.1234"), "utf8" => "żółć" }} diff --git a/spec/outputs/mongodb_replace_spec.rb b/spec/outputs/mongodb_replace_spec.rb index d8bf35d..85a6b03 100644 --- a/spec/outputs/mongodb_replace_spec.rb +++ b/spec/outputs/mongodb_replace_spec.rb @@ -21,7 +21,7 @@ let(:properties) { { "message" => "This is a message!", - "uuid" => SecureRandom.uuid, + "uuid" => "00000000-0000-0000-0000-000000000000", "number" => BigDecimal.new("4321.1234"), "utf8" => "żółć" } } @@ -43,24 +43,18 @@ end [ - {:query_key => nil, :query_value => "qv", 
:upsert => false, - :expected => {:query_key => "_id", :query_value => "qv", :upsert => false} - }, - {:query_key => "qk", :query_value => "qv", :upsert => false, - :expected => {:query_key => "qk", :query_value => "qv", :upsert => false} - }, - {:query_key => "qk", :query_value => "qv", :upsert => nil, - :expected => {:query_key => "qk", :query_value => "qv", :upsert => false} - }, - {:query_key => nil, :query_value => "qv", :upsert => true, - :expected => {:query_key => "_id", :query_value => "qv", :upsert => true} - }, - {:query_key => "qk", :query_value => "qv", :upsert => true, - :expected => {:query_key => "qk", :query_value => "qv", :upsert => true} - }, + {:filter => {"_id" => "[uuid]"}, :upsert => false, + :expected => {:filter => {"_id" => "00000000-0000-0000-0000-000000000000"}, :upsert => false} + }, + {:filter => {"%{utf8}" => "[message]"}, :upsert => nil, + :expected => {:filter => {"żółć" => "This is a message!"}, :upsert => false} + }, + {:filter => {"%{utf8}" => "[message]"}, :upsert => true, + :expected => {:filter => {"żółć" => "This is a message!"}, :upsert => true} + }, ].each do |test| - describe "when processing an event with query_key set to '#{test[:query_key]}', query_value set to '#{test[:query_value]}' and upsert set to '#{test[:upsert]}'" do + describe "when processing an event with :filter => '#{test[:filter]}' and :upsert => '#{test[:upsert]}'" do let(:config) { configuration = { @@ -69,11 +63,8 @@ "collection" => collection, "action" => action } - unless test[:query_key].nil? - configuration["query_key"] = test[:query_key] - end - unless test[:query_value].nil? - configuration["query_value"] = test[:query_value] + unless test[:filter].nil? + configuration["filter"] = test[:filter] end unless test[:upsert].nil? 
configuration["upsert"] = test[:upsert] @@ -82,11 +73,11 @@ } expected = test[:expected] - it "should send that document as a replace to mongodb with query_key '#{expected[:query_key]}', query_value '#{expected[:query_value]}' and upsert '#{expected[:upsert]}'" do + it "should send that document as a replace to mongodb with :filter => '#{expected[:filter]}' and upsert => '#{expected[:upsert]}'" do expect(event).to receive(:timestamp).and_return(nil) expect(event).to receive(:to_hash).and_return(properties) expect(collection).to receive(:bulk_write).with( - [{:replace_one => {:filter => {expected[:query_key] => expected[:query_value]}, :replacement => properties, :upsert => expected[:upsert]}}] + [{:replace_one => {:filter => expected[:filter], :replacement => properties, :upsert => expected[:upsert]}}] ) subject.receive(event) end diff --git a/spec/outputs/mongodb_unit.rb b/spec/outputs/mongodb_unit.rb new file mode 100644 index 0000000..1cda792 --- /dev/null +++ b/spec/outputs/mongodb_unit.rb @@ -0,0 +1,48 @@ +# encoding: utf-8 +require "logstash/devutils/rspec/spec_helper" +require_relative "../spec_helper" + +describe "mongodb unit tests" do + let(:uri) { "mongodb://localhost:27017" } + let(:database) { "logstash" } + let(:collection) { "logs" } + let(:action) { "insert" } + + let(:config) {{ + "uri" => uri, + "database" => database, + "collection" => collection, + "action" => action + }} + + subject! 
{ LogStash::Outputs::Mongodb.new(config) } + + context "when calling apply_event_to_hash" do + + let (:event) { LogStash::Event.new({"message" => "hello", "positive" => 1, "negative" => -1}) } + + it "should preserve string type for values given the field reference syntax" do + h = {"key" => "[message]"} + applied_hash = subject.apply_event_to_hash(event, h) + expect(applied_hash["key"]).to eql "hello" + end + + it "should preserve positive int type for values given the field reference syntax" do + h = {"key" => "[positive]"} + applied_hash = subject.apply_event_to_hash(event, h) + expect(applied_hash["key"]).to eql 1 + end + + it "should preserve negative int type for values given the field reference syntax" do + h = {"key" => "[negative]"} + applied_hash = subject.apply_event_to_hash(event, h) + expect(applied_hash["key"]).to eql(-1) + end + + it "should always interpolate strings for keys given the sprintf syntax" do + h = {"key_%{positive}" => %{message}} + applied_hash = subject.apply_event_to_hash(event, h) + expect(applied_hash).to have_key("key_1") + end + end +end diff --git a/spec/outputs/mongodb_update_nested_fields_spec.rb b/spec/outputs/mongodb_update_nested_fields_spec.rb index fa0f732..8cef399 100644 --- a/spec/outputs/mongodb_update_nested_fields_spec.rb +++ b/spec/outputs/mongodb_update_nested_fields_spec.rb @@ -8,14 +8,14 @@ let(:database) { 'logstash' } let(:collection) { 'logs' } let(:action) { 'update' } - let(:query_value) { 'qv' } + let(:filter) { {"_id" => 'foo' } } let(:config) { { "uri" => uri, "database" => database, "collection" => collection, "action" => action, - "query_value" => query_value + "filter" => filter, } } describe "receive method while action is 'update'" do @@ -64,7 +64,7 @@ expect(event).to receive(:timestamp).and_return(nil) expect(event).to receive(:to_hash).and_return(properties) expect(collection).to receive(:bulk_write).with( - [{:update_one => {:filter => {"_id" => query_value}, :update => {"$set" => { + 
[{:update_one => {:filter => {"_id" => 'foo' }, :update => {"$set" => { "message" => "This is a message!", "rootHashField.numFieldInHash" => 1, "rootHashField.hashFieldInHash.numField" => 2, diff --git a/spec/outputs/mongodb_update_spec.rb b/spec/outputs/mongodb_update_spec.rb index 2955d40..7d57f6e 100644 --- a/spec/outputs/mongodb_update_spec.rb +++ b/spec/outputs/mongodb_update_spec.rb @@ -4,7 +4,7 @@ describe LogStash::Outputs::Mongodb do - let(:uri) { 'mongodb://localhost:27017' } + let(:uri) { 'mongodb://mongo:27017' } let(:database) { 'logstash' } let(:collection) { 'logs' } let(:action) { 'update' } @@ -21,8 +21,9 @@ let(:properties) { { "message" => "This is a message!", - "uuid" => SecureRandom.uuid, + "uuid" => "00000000-0000-0000-0000-000000000000", "number" => BigDecimal.new("4321.1234"), + "integer" => 1, "utf8" => "żółć" } } let(:event) { LogStash::Event.new(properties) } @@ -43,25 +44,38 @@ end [ - {:query_key => nil, :query_value => "qv", :upsert => false, - :expected => {:query_key => "_id", :query_value => "qv", :upsert => false} - }, - {:query_key => "qk", :query_value => "qv", :upsert => false, - :expected => {:query_key => "qk", :query_value => "qv", :upsert => false} - }, - {:query_key => "qk", :query_value => "qv", :upsert => nil, - :expected => {:query_key => "qk", :query_value => "qv", :upsert => false} - }, - {:query_key => nil, :query_value => "qv", :upsert => true, - :expected => {:query_key => "_id", :query_value => "qv", :upsert => true} - }, - {:query_key => "qk", :query_value => "qv", :upsert => true, - :expected => {:query_key => "qk", :query_value => "qv", :upsert => true} - }, + {:filter => {"_id" => "[uuid]"}, :upsert => false, + :expected => {:filter => {"_id" => "00000000-0000-0000-0000-000000000000"}, :upsert => false} + }, + {:filter => {"%{utf8}" => "[message]"}, :upsert => nil, + :expected => {:filter => {"żółć" => "This is a message!"}, :upsert => false} + }, + {:filter => {"%{utf8}" => "[message]"}, :upsert => true, + 
:expected => {:filter => {"żółć" => "This is a message!"}, :upsert => true} + }, + # Nested hash recursion + {:filter => {"_id" => "123"}, + :update_expressions => {"$inc" => {"quantity" => "[integer]"}, + "$currentDate" => {"updated_at" => {"$type" => "timestamp"}}}, + :expected => {:filter => {"_id" => "123"}, + :update_expressions => {"$inc" => {"quantity" => 1}, + "$currentDate" => {"updated_at" => {"$type" => "timestamp"}}}, + :upsert => false} + }, + # Nested key-value substitution + {:filter => {"_id" => "123"}, + :update_expressions => {"$inc" => {"quantity" => "[integer]"}, + "$rename" => {"foo" => "[utf8]", + "bar-%{integer}" => "baz"}}, + :expected => {:filter => {"_id" => "123"}, + :update_expressions => {"$inc" => {"quantity" => 1}, + "$rename" => {"foo" => "żółć", + "bar-1" => "baz"}}, + :upsert => false} + }, ].each do |test| - describe "when processing an event with query_key set to '#{test[:query_key]}', query_value set to '#{test[:query_value]}' and upsert set to '#{test[:upsert]}'" do - + describe "when processing an event with :filter => '#{test[:filter]}', :upsert => '#{test[:upsert]}' and merge :update and :update_expressions}'" do let(:config) { configuration = { "uri" => uri, @@ -69,25 +83,34 @@ "collection" => collection, "action" => action } - unless test[:query_key].nil? - configuration["query_key"] = test[:query_key] - end - unless test[:query_value].nil? - configuration["query_value"] = test[:query_value] + unless test[:filter].nil? + configuration["filter"] = test[:filter] end unless test[:upsert].nil? configuration["upsert"] = test[:upsert] end + unless test[:update_expressions].nil? 
+ configuration["update_expressions"] = test[:update_expressions] + end return configuration } expected = test[:expected] - it "should send that document as an update to mongodb with query_key '#{expected[:query_key]}', query_value '#{expected[:query_value]}' and upsert '#{expected[:upsert]}'" do + it "should send that document as an update to mongodb with :filter => '#{expected[:filter]}', :upsert => '#{expected[:upsert]}' and :update_expressions => '#{expected[:update_expressions]}'" do + expect(event).to receive(:timestamp).and_return(nil) expect(event).to receive(:to_hash).and_return(properties) + + update = if !expected[:update_expressions].nil? + expected[:update_expressions] + else + {"$set" => properties} + end + expect(collection).to receive(:bulk_write).with( - [{:update_one => {:filter => {expected[:query_key] => expected[:query_value]}, :update => {"$set" => properties}, :upsert => expected[:upsert]}}] - ) + [{:update_one => {:filter => expected[:filter], + :update => update, + :upsert => expected[:upsert]}}]) subject.receive(event) end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 35ccf31..2bc2a17 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -2,6 +2,10 @@ require "logstash/devutils/rspec/spec_helper" require "logstash/outputs/mongodb" +RSpec.configure do |config| + config.example_status_persistence_file_path = 'spec/test-report.txt' +end + RSpec::Matchers.define :have_received do |event| match do |subject| client = subject.instance_variable_get("@db") From 1fb9e9274013daf36c1b893c8a0f4d16c7e0c91d Mon Sep 17 00:00:00 2001 From: Andrea Richiardi Date: Fri, 16 Oct 2020 10:23:18 -0700 Subject: [PATCH 08/10] Add Example Usage to the README --- README.md | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/README.md b/README.md index 0ffc4fd..a1206b9 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,57 @@ This is a plugin for 
[Logstash](https://github.com/elastic/logstash). It is fully free and fully open source. The license is Apache 2.0, meaning you are pretty much free to use it however you want in whatever way. +## Example Usage + +``` ruby +output { + if [@metadata][event_type] == "product.event.created" { + mongodb { + id => "orders.projection.mongodb.product-insert" + uri => "${MONGO_DNS}" + collection => "product-aggregates" + database => "carts" + isodate => true + action => "update" + upsert => true + } + } + + if [@metadata][event_type] == "product.event.updated" or [@metadata][event_type] == "product.event.deleted" { + mongodb { + id => "orders.projection.mongodb.product-update" + uri => "${MONGO_DNS}" + collection => "product-aggregates" + database => "carts" + isodate => true + action => "update" + filter => { + "_id" => "[_id]" + "store_id" => "[store_id]" + } + } + } + + if [@metadata][event_type] == "stock.updated" and [quantity] > 0 { + mongodb { + id => "orders.projection.mongodb.stock-update" + uri => "${MONGO_DNS}" + collection => "product-aggregates" + database => "carts" + isodate => true + action => "update" + filter => { + "_id" => "[_id]" + "store_id" => "[store_id]" + } + update_expressions => { + "$inc" => {"stock" => "[stock_delta]"} + } + } + } +} +``` + ## Documentation Logstash provides infrastructure to automatically generate documentation for this plugin. We use the asciidoc format to write documentation so any comments in the source code will be first converted into asciidoc and then into html. All plugin documentation are placed under one [central location](http://www.elastic.co/guide/en/logstash/current/). From 75fc6fbfd2a8cdca025a4912663f98fa899ed78a Mon Sep 17 00:00:00 2001 From: Andrea Richiardi Date: Fri, 16 Oct 2020 12:23:04 -0700 Subject: [PATCH 09/10] Add max_retries option to the configuration This option makes possible to set a maximum number the plugin will retry writes for. 
It defaults to retry forever (negative max_retries) to keep backward compatibility. --- lib/logstash/outputs/mongodb.rb | 32 +++++++++++++++++++++++++++++--- spec/spec_helper.rb | 6 ++++++ 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/lib/logstash/outputs/mongodb.rb b/lib/logstash/outputs/mongodb.rb index b8c2e79..c26580e 100644 --- a/lib/logstash/outputs/mongodb.rb +++ b/lib/logstash/outputs/mongodb.rb @@ -29,6 +29,11 @@ class LogStash::Outputs::Mongodb < LogStash::Outputs::Base # The number of seconds to wait after failure before retrying. config :retry_delay, :validate => :number, :default => 3, :required => false + # The maximum number of times we should retry for. + # + # If not present the plugin will retry forever. This is the default. + config :max_retries, :validate => :number, :default => -1, :required => false + # If true, an "_id" field will be added to the document before insertion. # The "_id" field will use the timestamp of the event and overwrite an existing # "_id" field in the event. @@ -144,6 +149,9 @@ def validate_action(action, filter, update_expressions) end def receive(event) + + retry_count = 0 + action = event.sprintf(@action) validate_action(action, @filter, @update_expressions) @@ -196,7 +204,21 @@ def receive(event) result = write_to_mongodb(collection, [document]) @logger.debug("Bulk write result", :result => result) end + rescue => e + logger_data = {:collection => collection, + :document => document, + :action => action, + :filter => document["metadata_mongodb_output_filter"], + :update_expressions => document["metadata_mongodb_output_update_expressions"]} + + if (e.is_a? Mongo::Error::BulkWriteError) + logger_data["result"] = e.result + end + + @logger.debug("Error: #{e.message}", logger_data) + @logger.trace("Error backtrace", backtrace: e.backtrace) + if e.message =~ /^E11000/ # On a duplicate key error, skip the insert. 
# We could check if the duplicate key err is the _id key @@ -205,9 +227,13 @@ def receive(event) # to fix the issue. @logger.warn("Skipping insert because of a duplicate key error", :event => event, :exception => e) else - @logger.warn("Failed to send event to MongoDB retrying in #{@retry_delay.to_s} seconds", :result=> e.result, :message => e.message) - sleep(@retry_delay) - retry + # if max_retries is negative we retry forever + if (@max_retries < 0 || retry_count < @max_retries) + retry_count += 1 + @logger.warn("Failed to send event to MongoDB retrying (#{retry_count.to_s}) in #{@retry_delay.to_s} seconds") + sleep(@retry_delay) + retry + end end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 2bc2a17..9550545 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -2,6 +2,12 @@ require "logstash/devutils/rspec/spec_helper" require "logstash/outputs/mongodb" +if ENV["TEST_DEBUG"] + LogStash::Logging::Logger::configure_logging("DEBUG") +else + LogStash::Logging::Logger::configure_logging("OFF") +end + RSpec.configure do |config| config.example_status_persistence_file_path = 'spec/test-report.txt' end From 8a04990ba8f37a6e0f25b4badbe2feb0312e8618 Mon Sep 17 00:00:00 2001 From: Andrea Richiardi Date: Fri, 16 Oct 2020 12:23:04 -0700 Subject: [PATCH 10/10] Add max_retries option to the configuration This option makes it possible to set a maximum number of times the plugin will retry writes. It defaults to retrying forever (negative max_retries) to keep backward compatibility.
--- lib/logstash/outputs/mongodb.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/outputs/mongodb.rb b/lib/logstash/outputs/mongodb.rb index c26580e..19caad0 100644 --- a/lib/logstash/outputs/mongodb.rb +++ b/lib/logstash/outputs/mongodb.rb @@ -202,7 +202,7 @@ def receive(event) end else result = write_to_mongodb(collection, [document]) - @logger.debug("Bulk write result", :result => result) + @logger.debug("Bulk write result: #{result.to_s}") end rescue => e