diff --git a/docs/index.asciidoc b/docs/index.asciidoc
index 734646b..0f8c739 100644
--- a/docs/index.asciidoc
+++ b/docs/index.asciidoc
@@ -40,6 +40,10 @@ This plugin supports the following configuration options plus the <>
 | <> |<>|No
 | <> |<>|No
 | <> |<>|Yes
+| <> |<>|Yes
+| <> |<>|No
+| <> |<>|No
+| <> |<>|No
 |=======================================================================
 
 Also see <> for a list of options supported by all
@@ -129,6 +133,50 @@ The number of seconds to wait after failure before retrying.
 A MongoDB URI to connect to. See
 http://docs.mongodb.org/manual/reference/connection-string/.
 
+[id="plugins-{type}s-{plugin}-action"]
+===== `action`
+
+* Value type is <>
+* Default value is `insert`.
+
+The method used to write processed events to MongoDB.
+
+Possible values are `insert`, `update` or `replace`.
+
+[id="plugins-{type}s-{plugin}-query_key"]
+===== `query_key`
+
+* Value type is <>
+* Default value is `_id`.
+
+The key of the query used to find the document to update or replace in MongoDB.
+
+query_key is used as described https://docs.mongodb.com/ruby-driver/v2.6/tutorials/ruby-driver-bulk-operations[here]
+for `update` and `replace`, for example:
+
+    :filter => {query_key => query_value}
+
+[id="plugins-{type}s-{plugin}-query_value"]
+===== `query_value`
+
+* Value type is <>
+* There is no default value for this setting.
+
+The value of the query used to find the document to update or replace in MongoDB. This can be dynamic using the `%{foo}` syntax.
+
+query_value is used as described https://docs.mongodb.com/ruby-driver/v2.6/tutorials/ruby-driver-bulk-operations[here]
+for `update` and `replace`, for example:
+
+    :filter => {query_key => query_value}
+
+[id="plugins-{type}s-{plugin}-upsert"]
+===== `upsert`
+
+* Value type is <>
+* Default value is `false`.
+
+If true, a new document is created if no document in the database matches the query.
+Only applies if action is `update` or `replace`.
 
 
 [id="plugins-{type}s-{plugin}-common-options"]
diff --git a/lib/logstash/outputs/bson/big_decimal.rb b/lib/logstash/outputs/bson/big_decimal.rb
index f7a2c89..3b5d4ef 100644
--- a/lib/logstash/outputs/bson/big_decimal.rb
+++ b/lib/logstash/outputs/bson/big_decimal.rb
@@ -33,7 +33,7 @@ module BigDecimal
     #   1.221311.to_bson
     # @return [ String ] The encoded string.
     # @see http://bsonspec.org/#/specification
-    def to_bson(buffer = ByteBuffer.new)
+    def to_bson(buffer = ByteBuffer.new, validating_keys = Config.validating_keys?)
       buffer.put_bytes([ self ].pack(PACK))
     end
 
diff --git a/lib/logstash/outputs/bson/logstash_event.rb b/lib/logstash/outputs/bson/logstash_event.rb
index 7dcd4c2..9b4e17c 100644
--- a/lib/logstash/outputs/bson/logstash_event.rb
+++ b/lib/logstash/outputs/bson/logstash_event.rb
@@ -30,7 +30,7 @@ module LogStashEvent
     #   Event.new("field" => "value").to_bson
     # @return [ String ] The encoded string.
     # @see http://bsonspec.org/#/specification
-    def to_bson(buffer = ByteBuffer.new)
+    def to_bson(buffer = ByteBuffer.new, validating_keys = Config.validating_keys?)
       position = buffer.length
       buffer.put_int32(0)
       to_hash.each do |field, value|
diff --git a/lib/logstash/outputs/bson/logstash_timestamp.rb b/lib/logstash/outputs/bson/logstash_timestamp.rb
index cb2141f..6eae0f0 100644
--- a/lib/logstash/outputs/bson/logstash_timestamp.rb
+++ b/lib/logstash/outputs/bson/logstash_timestamp.rb
@@ -25,7 +25,7 @@ module LogStashTimestamp
     # A time is type 0x09 in the BSON spec.
     BSON_TYPE = 9.chr.force_encoding(BINARY).freeze
 
-    def to_bson(buffer = ByteBuffer.new)
+    def to_bson(buffer = ByteBuffer.new, validating_keys = Config.validating_keys?)
       time.to_bson(buffer)
     end
 
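Note: the three `to_bson` overrides above (BigDecimal, LogStash::Event, LogStash::Timestamp) gain a `validating_keys` parameter because the bson gem pulled in by the mongo 2.6 driver (assumed here to be bson 4.x) serializes values with two arguments. A minimal sketch of that calling convention, for illustration only:

    require 'bson'

    buffer = BSON::ByteBuffer.new
    # bson 4.x passes both the buffer and the validating_keys flag, so any
    # custom to_bson override must accept (and may ignore) the second argument.
    { "n" => 1 }.to_bson(buffer, true)
    puts buffer.length   # => 12, the byte size of the encoded document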
diff --git a/lib/logstash/outputs/mongodb.rb b/lib/logstash/outputs/mongodb.rb
index 0b88c68..245dacc 100644
--- a/lib/logstash/outputs/mongodb.rb
+++ b/lib/logstash/outputs/mongodb.rb
@@ -43,13 +43,23 @@ class LogStash::Outputs::Mongodb < LogStash::Outputs::Base
   # whatever the bulk interval value (mongodb hard limit is 1000).
   config :bulk_size, :validate => :number, :default => 900, :maximum => 999, :min => 2
 
+  # The method used to write processed events to MongoDB.
+  # Possible values are `insert`, `update` and `replace`.
+  config :action, :validate => :string, :required => true
+  # The key of the query used to find the document to update or replace.
+  config :query_key, :validate => :string, :required => false, :default => "_id"
+  # The value of the query used to find the document to update or replace. This can be dynamic using the `%{foo}` syntax.
+  config :query_value, :validate => :string, :required => false
+  # If true, a new document is created if no document in the database matches the query.
+  # Only applies if action is `update` or `replace`.
+  config :upsert, :validate => :boolean, :required => false, :default => false
+
   # Mutex used to synchronize access to 'documents'
   @@mutex = Mutex.new
 
   def register
-    if @bulk_size > 1000
-      raise LogStash::ConfigurationError, "Bulk size must be lower than '1000', currently '#{@bulk_size}'"
-    end
+
+    validate_config
 
     Mongo::Logger.logger = @logger
     conn = Mongo::Client.new(@uri)
@@ -65,7 +75,7 @@ def register
       @@mutex.synchronize do
         @documents.each do |collection, values|
           if values.length > 0
-            @db[collection].insert_many(values)
+            write_to_mongodb(collection, values)
             @documents.delete(collection)
           end
         end
@@ -74,6 +84,18 @@ def register
     end
   end
 
+  def validate_config
+    if @bulk_size > 1000
+      raise LogStash::ConfigurationError, "Bulk size must be lower than '1000', currently '#{@bulk_size}'"
+    end
+    if @action != "insert" && @action != "update" && @action != "replace"
+      raise LogStash::ConfigurationError, "Only insert, update and replace are valid for 'action' setting."
+    end
+    if (@action == "update" || @action == "replace") && (@query_value.nil? || @query_value.empty?)
+      raise LogStash::ConfigurationError, "If action is update or replace, query_value must be set."
+    end
+  end
+
   def receive(event)
     begin
       # Our timestamp object now has a to_bson method, using it here
@@ -94,8 +116,11 @@ def receive(event)
         document["_id"] = BSON::ObjectId.new
       end
 
+      collection = event.sprintf(@collection)
+      if @action == "update" or @action == "replace"
+        document["metadata_mongodb_output_query_value"] = event.sprintf(@query_value)
+      end
       if @bulk
-        collection = event.sprintf(@collection)
         @@mutex.synchronize do
           if(!@documents[collection])
             @documents[collection] = []
@@ -103,12 +128,12 @@ def receive(event)
           @documents[collection].push(document)
 
           if(@documents[collection].length >= @bulk_size)
-            @db[collection].insert_many(@documents[collection])
+            write_to_mongodb(collection, @documents[collection])
             @documents.delete(collection)
           end
         end
       else
-        @db[event.sprintf(@collection)].insert_one(document)
+        write_to_mongodb(collection, [document])
       end
     rescue => e
       if e.message =~ /^E11000/
@@ -126,6 +151,38 @@ def receive(event)
     end
   end
 
+  def write_to_mongodb(collection, documents)
+    ops = get_write_ops(documents)
+    @db[collection].bulk_write(ops)
+  end
+
+  def get_write_ops(documents)
+    ops = []
+    documents.each do |doc|
+      replaced_query_value = doc["metadata_mongodb_output_query_value"]
+      doc.delete("metadata_mongodb_output_query_value")
+      if @action == "insert"
+        ops << {:insert_one => doc}
+      elsif @action == "update"
+        ops << {:update_one => {:filter => {@query_key => replaced_query_value}, :update => {'$set' => to_dotted_hash(doc)}, :upsert => @upsert}}
+      elsif @action == "replace"
+        ops << {:replace_one => {:filter => {@query_key => replaced_query_value}, :replacement => doc, :upsert => @upsert}}
+      end
+    end
+    ops
+  end
+
+  def to_dotted_hash(hash, recursive_key = "")
+    hash.each_with_object({}) do |(k, v), ret|
+      key = recursive_key + k.to_s
+      if v.is_a? Hash
+        ret.merge! to_dotted_hash(v, key + ".")
+      else
+        ret[key] = v
+      end
+    end
+  end
+
   def close
     @closed.make_true
     @bulk_thread.wakeup
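The new `write_to_mongodb`/`get_write_ops` pair above funnels every document, bulk or not, through the driver's `bulk_write` API. As a rough sketch of the three operation shapes it builds (assuming a local MongoDB, the mongo 2.6 Ruby driver, and made-up collection and field values):

    require 'mongo'

    client = Mongo::Client.new('mongodb://localhost:27017', :database => 'logstash')
    logs   = client[:logs]

    logs.bulk_write([
      # action => "insert": the event document is written as-is
      { :insert_one => { 'message' => 'hello', 'uuid' => 'abc-123' } },

      # action => "update": only the listed fields are $set; to_dotted_hash
      # turns nested hashes into dotted paths so sibling fields survive
      { :update_one => { :filter => { '_id' => 'abc-123' },
                         :update => { '$set' => { 'message' => 'hello', 'meta.retries' => 2 } },
                         :upsert => true } },

      # action => "replace": the whole matching document is swapped out
      { :replace_one => { :filter      => { '_id' => 'abc-123' },
                          :replacement => { 'message' => 'hello' },
                          :upsert      => true } }
    ])

Routing the single-event path through the same `bulk_write([...])` call keeps one code path for all three actions, at the cost of wrapping single inserts in a one-element array.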
diff --git a/logstash-output-mongodb.gemspec b/logstash-output-mongodb.gemspec
index e198114..550fbd1 100644
--- a/logstash-output-mongodb.gemspec
+++ b/logstash-output-mongodb.gemspec
@@ -1,6 +1,6 @@
 Gem::Specification.new do |s|
   s.name = 'logstash-output-mongodb'
-  s.version = '3.1.6'
+  s.version = '3.2.0'
   s.licenses = ['Apache License (2.0)']
   s.summary = "Writes events to MongoDB"
   s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
@@ -21,7 +21,7 @@ Gem::Specification.new do |s|
 
   # Gem dependencies
   s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
   s.add_runtime_dependency 'logstash-codec-plain'
-  s.add_runtime_dependency 'mongo', '~> 2.6'
+  s.add_runtime_dependency 'mongo', '= 2.6'
   s.add_development_dependency 'logstash-devutils'
 end
diff --git a/spec/integration/mongodb_spec.rb b/spec/integration/mongodb_spec.rb
index 690e8fa..c6ed3e2 100644
--- a/spec/integration/mongodb_spec.rb
+++ b/spec/integration/mongodb_spec.rb
@@ -7,10 +7,11 @@
   let(:database) { 'logstash' }
   let(:collection) { 'logs' }
   let(:uuid) { SecureRandom.uuid }
+  let(:action) { 'insert' }
 
   let(:config) do
     { "uri" => uri, "database" => database,
-      "collection" => collection, "isodate" => true }
+      "collection" => collection, "isodate" => true, "action" => action }
   end
 
   describe "#send" do
diff --git a/spec/outputs/mongodb_config_validation_spec.rb b/spec/outputs/mongodb_config_validation_spec.rb
new file mode 100644
index 0000000..4f590df
--- /dev/null
+++ b/spec/outputs/mongodb_config_validation_spec.rb
@@ -0,0 +1,64 @@
+# encoding: utf-8
+require_relative "../spec_helper"
+require "logstash/plugin"
+
+describe LogStash::Outputs::Mongodb do
+
+  let(:uri) { 'mongodb://localhost:27017' }
+  let(:database) { 'logstash' }
+  let(:collection) { 'logs' }
+
+  describe "validate_config method" do
+
+    subject! { LogStash::Outputs::Mongodb.new(config) }
+
+    [
+      {:action => "not-supported", :query_key => "qk", :query_value => "qv", :upsert => false,
+       :expected_reason => "Only insert, update and replace are valid for 'action' setting."},
+      {:action => "update", :query_key => "qk", :query_value => nil, :upsert => false,
+       :expected_reason => "If action is update or replace, query_value must be set."},
+      {:action => "update", :query_key => "qk", :query_value => "", :upsert => false,
+       :expected_reason => "If action is update or replace, query_value must be set."},
+      {:action => "replace", :query_key => "qk", :query_value => nil, :upsert => false,
+       :expected_reason => "If action is update or replace, query_value must be set."},
+      {:action => "replace", :query_key => "qk", :query_value => "", :upsert => false,
+       :expected_reason => "If action is update or replace, query_value must be set."},
+      {:action => "insert", :bulk_size => 1001,
+       :expected_reason => "Bulk size must be lower than '1000', currently '1001'"},
+    ].each do |test|
+
+      describe "when validating config with action '#{test[:action]}', query_key '#{test[:query_key]}', query_value '#{test[:query_value]}' and upsert '#{test[:upsert]}'" do
+
+        let(:config) {
+          configuration = {
+            "uri" => uri,
+            "database" => database,
+            "collection" => collection
+          }
+          unless test[:action].nil?
+            configuration["action"] = test[:action]
+          end
+          unless test[:query_key].nil?
+            configuration["query_key"] = test[:query_key]
+          end
+          unless test[:query_value].nil?
+            configuration["query_value"] = test[:query_value]
+          end
+          unless test[:upsert].nil?
+            configuration["upsert"] = test[:upsert]
+          end
+          unless test[:bulk_size].nil?
+ configuration["bulk_size"] = test[:bulk_size] + end + return configuration + } + + it "should raise error: #{test[:expected_reason]}" do + expect { subject.validate_config }.to raise_error(LogStash::ConfigurationError, test[:expected_reason]) + end + end + + end + + end +end diff --git a/spec/outputs/mongodb_spec.rb b/spec/outputs/mongodb_insert_spec.rb similarity index 69% rename from spec/outputs/mongodb_spec.rb rename to spec/outputs/mongodb_insert_spec.rb index 8d3decb..a442465 100644 --- a/spec/outputs/mongodb_spec.rb +++ b/spec/outputs/mongodb_insert_spec.rb @@ -7,11 +7,13 @@ let(:uri) { 'mongodb://localhost:27017' } let(:database) { 'logstash' } let(:collection) { 'logs' } + let(:action) { 'insert' } let(:config) {{ "uri" => uri, "database" => database, - "collection" => collection + "collection" => collection, + "action" => action }} it "should register and close" do @@ -20,7 +22,7 @@ plugin.close end - describe "receive" do + describe "receive method while action is 'insert'" do subject! { LogStash::Outputs::Mongodb.new(config) } let(:event) { LogStash::Event.new(properties) } @@ -32,7 +34,7 @@ allow(Mongo::Client).to receive(:new).and_return(connection) allow(connection).to receive(:use).and_return(client) allow(client).to receive(:[]).and_return(collection) - allow(collection).to receive(:insert_one) + allow(collection).to receive(:bulk_write) subject.register end @@ -40,7 +42,7 @@ subject.close end - describe "#send" do + describe "when processing an event" do let(:properties) {{ "message" => "This is a message!", "uuid" => SecureRandom.uuid, @@ -49,36 +51,41 @@ }} it "should send the event to the database" do - expect(collection).to receive(:insert_one) + expect(collection).to receive(:bulk_write) subject.receive(event) end end - describe "no event @timestamp" do + describe "when processing an event without @timestamp set" do let(:properties) { { "message" => "foo" } } - it "should not contain a @timestamp field in the mongo document" do + it "should send a document without @timestamp field to mongodb" do expect(event).to receive(:timestamp).and_return(nil) expect(event).to receive(:to_hash).and_return(properties) - expect(collection).to receive(:insert_one).with(properties) + expect(collection).to receive(:bulk_write).with( + [ {:insert_one => properties} ] + ) subject.receive(event) end end - describe "generateId" do + describe "when generateId is set" do let(:properties) { { "message" => "foo" } } let(:config) {{ "uri" => uri, "database" => database, "collection" => collection, - "generateId" => true + "generateId" => true, + "action" => "insert" }} - it "should contain a BSON::ObjectId as _id" do + it "should send a document containing a BSON::ObjectId as _id to mongodb" do expect(BSON::ObjectId).to receive(:new).and_return("BSON::ObjectId") expect(event).to receive(:timestamp).and_return(nil) expect(event).to receive(:to_hash).and_return(properties) - expect(collection).to receive(:insert_one).with(properties.merge("_id" => "BSON::ObjectId")) + expect(collection).to receive(:bulk_write).with( + [ {:insert_one => properties.merge("_id" => "BSON::ObjectId")} ] + ) subject.receive(event) end end diff --git a/spec/outputs/mongodb_replace_spec.rb b/spec/outputs/mongodb_replace_spec.rb new file mode 100644 index 0000000..d8bf35d --- /dev/null +++ b/spec/outputs/mongodb_replace_spec.rb @@ -0,0 +1,98 @@ +# encoding: utf-8 +require_relative "../spec_helper" +require "logstash/plugin" + +describe LogStash::Outputs::Mongodb do + + let(:uri) { 'mongodb://localhost:27017' } + 
+  let(:database) { 'logstash' }
+  let(:collection) { 'logs' }
+  let(:action) { 'replace' }
+
+  let(:config) { {
+    "uri" => uri,
+    "database" => database,
+    "collection" => collection,
+    "action" => action
+  } }
+
+  describe "receive method while action is 'replace'" do
+    subject! { LogStash::Outputs::Mongodb.new(config) }
+
+    let(:properties) { {
+      "message" => "This is a message!",
+      "uuid" => SecureRandom.uuid,
+      "number" => BigDecimal.new("4321.1234"),
+      "utf8" => "żółć"
+    } }
+    let(:event) { LogStash::Event.new(properties) }
+    let(:connection) { double("connection") }
+    let(:client) { double("client") }
+    let(:collection) { double("collection") }
+
+    before(:each) do
+      allow(Mongo::Client).to receive(:new).and_return(connection)
+      allow(connection).to receive(:use).and_return(client)
+      allow(client).to receive(:[]).and_return(collection)
+      allow(collection).to receive(:bulk_write)
+      subject.register
+    end
+
+    after(:each) do
+      subject.close
+    end
+
+    [
+      {:query_key => nil, :query_value => "qv", :upsert => false,
+       :expected => {:query_key => "_id", :query_value => "qv", :upsert => false}
+      },
+      {:query_key => "qk", :query_value => "qv", :upsert => false,
+       :expected => {:query_key => "qk", :query_value => "qv", :upsert => false}
+      },
+      {:query_key => "qk", :query_value => "qv", :upsert => nil,
+       :expected => {:query_key => "qk", :query_value => "qv", :upsert => false}
+      },
+      {:query_key => nil, :query_value => "qv", :upsert => true,
+       :expected => {:query_key => "_id", :query_value => "qv", :upsert => true}
+      },
+      {:query_key => "qk", :query_value => "qv", :upsert => true,
+       :expected => {:query_key => "qk", :query_value => "qv", :upsert => true}
+      },
+    ].each do |test|
+
+      describe "when processing an event with query_key set to '#{test[:query_key]}', query_value set to '#{test[:query_value]}' and upsert set to '#{test[:upsert]}'" do
+
+        let(:config) {
+          configuration = {
+            "uri" => uri,
+            "database" => database,
+            "collection" => collection,
+            "action" => action
+          }
+          unless test[:query_key].nil?
+            configuration["query_key"] = test[:query_key]
+          end
+          unless test[:query_value].nil?
+            configuration["query_value"] = test[:query_value]
+          end
+          unless test[:upsert].nil?
+ configuration["upsert"] = test[:upsert] + end + return configuration + } + + expected = test[:expected] + it "should send that document as a replace to mongodb with query_key '#{expected[:query_key]}', query_value '#{expected[:query_value]}' and upsert '#{expected[:upsert]}'" do + expect(event).to receive(:timestamp).and_return(nil) + expect(event).to receive(:to_hash).and_return(properties) + expect(collection).to receive(:bulk_write).with( + [{:replace_one => {:filter => {expected[:query_key] => expected[:query_value]}, :replacement => properties, :upsert => expected[:upsert]}}] + ) + subject.receive(event) + end + end + + end + + end +end diff --git a/spec/outputs/mongodb_update_nested_fields_spec.rb b/spec/outputs/mongodb_update_nested_fields_spec.rb new file mode 100644 index 0000000..fa0f732 --- /dev/null +++ b/spec/outputs/mongodb_update_nested_fields_spec.rb @@ -0,0 +1,89 @@ +# encoding: utf-8 +require_relative "../spec_helper" +require "logstash/plugin" + +describe LogStash::Outputs::Mongodb do + + let(:uri) { 'mongodb://localhost:27017' } + let(:database) { 'logstash' } + let(:collection) { 'logs' } + let(:action) { 'update' } + let(:query_value) { 'qv' } + + let(:config) { { + "uri" => uri, + "database" => database, + "collection" => collection, + "action" => action, + "query_value" => query_value + } } + + describe "receive method while action is 'update'" do + subject! { LogStash::Outputs::Mongodb.new(config) } + + let(:properties) { { + "message" => "This is a message!", + "rootHashField" => { + "numFieldInHash" => 1, + "hashFieldInHash" => { + "numField": 2 + }, + "arrayFieldInHash" => ["one", "two", "three"] + }, + "rootArrayField": [ + {"strFieldInArray" => "four"}, + {"strFieldInArray" => "five"}, + {"strFieldInArray" => "six"} + ], + "nestedArrayField": [ + {"strFieldInArray" => "four", "arrayFieldInArray" => [3, 4], "hashFieldInArray" => {"numField" => 9}}, + {"strFieldInArray" => "five", "arrayFieldInArray" => [5, 6], "hashFieldInArray" => {"numField" => 10}}, + {"strFieldInArray" => "six", "arrayFieldInArray" => [7, 8], "hashFieldInArray" => {"numField" => 11}} + ] + } } + let(:event) { LogStash::Event.new(properties) } + let(:connection) { double("connection") } + let(:client) { double("client") } + let(:collection) { double("collection") } + + before(:each) do + allow(Mongo::Client).to receive(:new).and_return(connection) + allow(connection).to receive(:use).and_return(client) + allow(client).to receive(:[]).and_return(collection) + allow(collection).to receive(:bulk_write) + subject.register + end + + after(:each) do + subject.close + end + + describe "when processing an event with nested hash" do + + it "should send a document update to mongodb with dotted notation for fields in inner hashes" do + expect(event).to receive(:timestamp).and_return(nil) + expect(event).to receive(:to_hash).and_return(properties) + expect(collection).to receive(:bulk_write).with( + [{:update_one => {:filter => {"_id" => query_value}, :update => {"$set" => { + "message" => "This is a message!", + "rootHashField.numFieldInHash" => 1, + "rootHashField.hashFieldInHash.numField" => 2, + "rootHashField.arrayFieldInHash" => ["one", "two", "three"], + "rootArrayField" => [ + {"strFieldInArray" => "four"}, + {"strFieldInArray" => "five"}, + {"strFieldInArray" => "six"} + ], + "nestedArrayField" => [ + {"strFieldInArray" => "four", "arrayFieldInArray" => [3, 4], "hashFieldInArray" => {"numField" => 9}}, + {"strFieldInArray" => "five", "arrayFieldInArray" => [5, 6], "hashFieldInArray" => 
{"numField" => 10}}, + {"strFieldInArray" => "six", "arrayFieldInArray" => [7, 8], "hashFieldInArray" => {"numField" => 11}} + ], + }}, :upsert => false}}] + ) + subject.receive(event) + end + end + + end +end diff --git a/spec/outputs/mongodb_update_spec.rb b/spec/outputs/mongodb_update_spec.rb new file mode 100644 index 0000000..2955d40 --- /dev/null +++ b/spec/outputs/mongodb_update_spec.rb @@ -0,0 +1,98 @@ +# encoding: utf-8 +require_relative "../spec_helper" +require "logstash/plugin" + +describe LogStash::Outputs::Mongodb do + + let(:uri) { 'mongodb://localhost:27017' } + let(:database) { 'logstash' } + let(:collection) { 'logs' } + let(:action) { 'update' } + + let(:config) { { + "uri" => uri, + "database" => database, + "collection" => collection, + "action" => action + } } + + describe "receive method while action is 'update'" do + subject! { LogStash::Outputs::Mongodb.new(config) } + + let(:properties) { { + "message" => "This is a message!", + "uuid" => SecureRandom.uuid, + "number" => BigDecimal.new("4321.1234"), + "utf8" => "żółć" + } } + let(:event) { LogStash::Event.new(properties) } + let(:connection) { double("connection") } + let(:client) { double("client") } + let(:collection) { double("collection") } + + before(:each) do + allow(Mongo::Client).to receive(:new).and_return(connection) + allow(connection).to receive(:use).and_return(client) + allow(client).to receive(:[]).and_return(collection) + allow(collection).to receive(:bulk_write) + subject.register + end + + after(:each) do + subject.close + end + + [ + {:query_key => nil, :query_value => "qv", :upsert => false, + :expected => {:query_key => "_id", :query_value => "qv", :upsert => false} + }, + {:query_key => "qk", :query_value => "qv", :upsert => false, + :expected => {:query_key => "qk", :query_value => "qv", :upsert => false} + }, + {:query_key => "qk", :query_value => "qv", :upsert => nil, + :expected => {:query_key => "qk", :query_value => "qv", :upsert => false} + }, + {:query_key => nil, :query_value => "qv", :upsert => true, + :expected => {:query_key => "_id", :query_value => "qv", :upsert => true} + }, + {:query_key => "qk", :query_value => "qv", :upsert => true, + :expected => {:query_key => "qk", :query_value => "qv", :upsert => true} + }, + ].each do |test| + + describe "when processing an event with query_key set to '#{test[:query_key]}', query_value set to '#{test[:query_value]}' and upsert set to '#{test[:upsert]}'" do + + let(:config) { + configuration = { + "uri" => uri, + "database" => database, + "collection" => collection, + "action" => action + } + unless test[:query_key].nil? + configuration["query_key"] = test[:query_key] + end + unless test[:query_value].nil? + configuration["query_value"] = test[:query_value] + end + unless test[:upsert].nil? + configuration["upsert"] = test[:upsert] + end + return configuration + } + + expected = test[:expected] + it "should send that document as an update to mongodb with query_key '#{expected[:query_key]}', query_value '#{expected[:query_value]}' and upsert '#{expected[:upsert]}'" do + expect(event).to receive(:timestamp).and_return(nil) + expect(event).to receive(:to_hash).and_return(properties) + expect(collection).to receive(:bulk_write).with( + [{:update_one => {:filter => {expected[:query_key] => expected[:query_value]}, :update => {"$set" => properties}, :upsert => expected[:upsert]}}] + ) + subject.receive(event) + end + end + + end + + end +end