diff --git a/.gitignore b/.gitignore index 3300a23..1ba8135 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ Gemfile.lock .bundle vendor +logs diff --git a/README.md b/README.md index b5831b0..a1206b9 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,66 @@ -# Logstash Plugin +# Logstash Mongo Output Plugin -[![Travis Build Status](https://travis-ci.org/logstash-plugins/logstash-output-mongodb.svg)](https://travis-ci.org/logstash-plugins/logstash-output-mongodb) +--- +This is a fork of [logstash-plugins/logstash-output-mongodb](https://github.com/logstash-plugins/logstash-output-mongodb). + +It adds the :action, :filter, :update_expressions and :upsert parameters +--- This is a plugin for [Logstash](https://github.com/elastic/logstash). It is fully free and fully open source. The license is Apache 2.0, meaning you are pretty much free to use it however you want in whatever way. +## Example Usage + +``` ruby +output { + if [@metadata][event_type] == "product.event.created" { + mongodb { + id => "orders.projection.mongodb.product-insert" + uri => "${MONGO_DNS}" + collection => "product-aggregates" + database => "carts" + isodate => true + action => "update" + upsert => true + } + } + + if [@metadata][event_type] == "product.event.updated" or [@metadata][event_type] == "product.event.deleted" { + mongodb { + id => "orders.projection.mongodb.product-update" + uri => "${MONGO_DNS}" + collection => "product-aggregates" + database => "carts" + isodate => true + action => "update" + filter => { + "_id" => "[_id]" + "store_id" => "[store_id]" + } + } + } + + if [@metadata][event_type] == "stock.updated" and [quantity] > 0 { + mongodb { + id => "orders.projection.mongodb.stock-update" + uri => "${MONGO_DNS}" + collection => "product-aggregates" + database => "carts" + isodate => true + action => "update" + filter => { + "_id" => "[_id]" + "store_id" => "[store_id]" + } + update_expressions => { + "$inc" => {"stock" => "[stock_delta]"} + } + } + } +} +``` + ## 
Documentation Logstash provides infrastructure to automatically generate documentation for this plugin. We use the asciidoc format to write documentation so any comments in the source code will be first converted into asciidoc and then into html. All plugin documentation are placed under one [central location](http://www.elastic.co/guide/en/logstash/current/). @@ -21,20 +76,17 @@ Need help? Try #logstash on freenode IRC or the https://discuss.elastic.co/c/log ### 1. Plugin Developement and Testing -#### Code -- To get started, you'll need JRuby with the Bundler gem installed. +For developing this plugin we use the wonderful work of [cameronkerrnz/logstash-plugin-dev](https://github.com/cameronkerrnz/logstash-plugin-dev): -- Create a new plugin or clone and existing from the GitHub [logstash-plugins](https://github.com/logstash-plugins) organization. We also provide [example plugins](https://github.com/logstash-plugins?query=example). +To start an interactive environment run: -- Install dependencies -```sh -bundle install +``` sh +docker run --rm -it -v ${PWD}:/work cameronkerrnz/logstash-plugin-dev:7.9 ``` -#### Test - -- Update your dependencies +After that you can run the usual suspects: +- Install/Update dependencies ```sh bundle install ``` @@ -95,4 +147,4 @@ Programming is not a required skill. Whatever you've seen about open source and It is more important to the community that you are able to contribute. -For more information about contributing, see the [CONTRIBUTING](https://github.com/elastic/logstash/blob/master/CONTRIBUTING.md) file. \ No newline at end of file +For more information about contributing, see the [CONTRIBUTING](https://github.com/elastic/logstash/blob/master/CONTRIBUTING.md) file. 
diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 734646b..0f8c739 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -40,6 +40,10 @@ This plugin supports the following configuration options plus the <> |<>|No | <> |<>|No | <> |<>|Yes +| <> |<>|Yes +| <> |<>|No +| <> |<>|No +| <> |<>|No |======================================================================= Also see <> for a list of options supported by all @@ -129,6 +133,50 @@ The number of seconds to wait after failure before retrying. A MongoDB URI to connect to. See http://docs.mongodb.org/manual/reference/connection-string/. +[id="plugins-{type}s-{plugin}-action"] +===== `action` + +* Value type is <> +* Default value is `insert`. + +The method used to write processed events to MongoDB. + +Possible values are `insert`, `update` or `replace`. + +[id="plugins-{type}s-{plugin}-query_key"] +===== `query_key` + +* Value type is <> +* Default value is `_id`. + +The key of the query to find the document to update or replace in MongoDB. + +query_key is used like described https://docs.mongodb.com/ruby-driver/v2.6/tutorials/ruby-driver-bulk-operations[here] +for `update` and `replace` examples: + + :filter => {query_key => query_value} + +[id="plugins-{type}s-{plugin}-query_value"] +===== `query_value` + +* Value type is <> +* There is no default value for this setting. + +The value of the query to find the document to update or replace in MongoDB. This can be dynamic using the `%{foo}` syntax. + +query_value is used like described https://docs.mongodb.com/ruby-driver/v2.6/tutorials/ruby-driver-bulk-operations[here] +for `update` and `replace` examples: + + :filter => {query_key => query_value} + +[id="plugins-{type}s-{plugin}-upsert"] +===== `upsert` + +* Value type is <> +* Default value is `false`. + +If true, a new document is created if no document exists in DB with given `document_id`. +Only applies if action is `update` or `replace`. 
[id="plugins-{type}s-{plugin}-common-options"] diff --git a/lib/logstash/outputs/bson/big_decimal.rb b/lib/logstash/outputs/bson/big_decimal.rb index f7a2c89..3b5d4ef 100644 --- a/lib/logstash/outputs/bson/big_decimal.rb +++ b/lib/logstash/outputs/bson/big_decimal.rb @@ -33,7 +33,7 @@ module BigDecimal # 1.221311.to_bson # @return [ String ] The encoded string. # @see http://bsonspec.org/#/specification - def to_bson(buffer = ByteBuffer.new) + def to_bson(buffer = ByteBuffer.new, validating_keys = Config.validating_keys?) buffer.put_bytes([ self ].pack(PACK)) end diff --git a/lib/logstash/outputs/bson/logstash_event.rb b/lib/logstash/outputs/bson/logstash_event.rb index 7dcd4c2..9b4e17c 100644 --- a/lib/logstash/outputs/bson/logstash_event.rb +++ b/lib/logstash/outputs/bson/logstash_event.rb @@ -30,7 +30,7 @@ module LogStashEvent # Event.new("field" => "value").to_bson # @return [ String ] The encoded string. # @see http://bsonspec.org/#/specification - def to_bson(buffer = ByteBuffer.new) + def to_bson(buffer = ByteBuffer.new, validating_keys = Config.validating_keys?) position = buffer.length buffer.put_int32(0) to_hash.each do |field, value| diff --git a/lib/logstash/outputs/bson/logstash_timestamp.rb b/lib/logstash/outputs/bson/logstash_timestamp.rb index cb2141f..6eae0f0 100644 --- a/lib/logstash/outputs/bson/logstash_timestamp.rb +++ b/lib/logstash/outputs/bson/logstash_timestamp.rb @@ -25,7 +25,7 @@ module LogStashTimestamp # A time is type 0x09 in the BSON spec. BSON_TYPE = 9.chr.force_encoding(BINARY).freeze - def to_bson(buffer = ByteBuffer.new) + def to_bson(buffer = ByteBuffer.new, validating_keys = Config.validating_keys?) 
time.to_bson(buffer) end diff --git a/lib/logstash/outputs/mongodb.rb b/lib/logstash/outputs/mongodb.rb index 0b88c68..19caad0 100644 --- a/lib/logstash/outputs/mongodb.rb +++ b/lib/logstash/outputs/mongodb.rb @@ -29,27 +29,75 @@ class LogStash::Outputs::Mongodb < LogStash::Outputs::Base # The number of seconds to wait after failure before retrying. config :retry_delay, :validate => :number, :default => 3, :required => false + # The maximum number of times we should retry for. + # + # If not present the plugin will retry forever. This is the default. + config :max_retries, :validate => :number, :default => -1, :required => false + # If true, an "_id" field will be added to the document before insertion. # The "_id" field will use the timestamp of the event and overwrite an existing # "_id" field in the event. config :generateId, :validate => :boolean, :default => false - # Bulk insert flag, set to true to allow bulk insertion, else it will insert events one by one. config :bulk, :validate => :boolean, :default => false + # Bulk interval, Used to insert events periodically if the "bulk" flag is activated. config :bulk_interval, :validate => :number, :default => 2 + # Bulk events number, if the number of events to insert into a collection raise that limit, it will be bulk inserted # whatever the bulk interval value (mongodb hard limit is 1000). config :bulk_size, :validate => :number, :default => 900, :maximum => 999, :min => 2 + # The Mongo DB action to perform. Valid actions are: + # + # - insert: inserts a document, fails if the document already exists. + # - update: updates a document given a `filter`. You can also upsert a document, see the `upsert` option. + # - delete: *Not Supported* at the moment + # + # A sprintf-able string is allowed to change the action based on the content + # of the event. The value `%{[foo]}` would use the foo field for the action. 
+ # + # For more details on actions, check out the https://docs.mongodb.com/ruby-driver/v2.6/tutorials/ruby-driver-bulk-operations[Mongo Ruby Driver documentation] + config :action, :validate => :string, :default => "insert" + + # The :filter clause for an update or replace. + # + # A sprintf-able string is allowed for keys: the value `my_%{[foo]}` would + # use the foo field instead *always coerced to a string*. + # + # However, the + # https://www.elastic.co/guide/en/logstash/current/field-references-deepdive.html[Field + # Reference Syntax] is required for values - these preserve type (integer, + # float, ...). + config :filter, :validate => :hash, :required => false, :default => {} + + # The hash in :update_expressions will be used *instead* of the default + # '$set'. This option is useful for using alternative operators like '$inc'. + # + # A sprintf-able string is allowed for keys: the value `my_%{[foo]}` would + # use the foo field instead *always coerced to a string*. + # + # However, the + # https://www.elastic.co/guide/en/logstash/current/field-references-deepdive.html[Field + # Reference Syntax] is required for values - these preserve type (integer, + # float, ...). + # + # Keys must start with `$`, see the https://docs.mongodb.com/manual/reference/operator/update/#id1[Mongo DB Update Operators] for reference. + # + # Note that pipeline support (Mongo >= 4.2) is not there yet. + config :update_expressions, :validate => :hash, :required => false, :default => nil + + # If true, a new document is created if no document exists in DB matching the given `filter`. + # Only applies if action is `update` or `replace`. 
+ config :upsert, :validate => :boolean, :required => false, :default => false + # Mutex used to synchronize access to 'documents' @@mutex = Mutex.new def register - if @bulk_size > 1000 - raise LogStash::ConfigurationError, "Bulk size must be lower than '1000', currently '#{@bulk_size}'" - end + + validate_config Mongo::Logger.logger = @logger conn = Mongo::Client.new(@uri) @@ -65,7 +113,7 @@ def register @@mutex.synchronize do @documents.each do |collection, values| if values.length > 0 - @db[collection].insert_many(values) + write_to_mongodb(collection, values) @documents.delete(collection) end end @@ -74,12 +122,44 @@ def register end end + def validate_config + if @bulk_size > 1000 + raise LogStash::ConfigurationError, "Bulk size must be lower than '1000', currently '#{@bulk_size}'" + end + if !@update_expressions.nil? + @update_expressions.keys.each { |k| + if !is_update_operator(k) + raise LogStash::ConfigurationError, "The :update_expressions option contains '#{k}', which is not an Update expression." + break + end + } + end + end + + def validate_action(action, filter, update_expressions) + if action != "insert" && action != "update" && action != "replace" + raise LogStash::ConfigurationError, "Only insert, update and replace are supported Mongo actions, got '#{action}'." + end + if (action == "update" || action == "replace") && (filter.nil? || filter.empty?) + raise LogStash::ConfigurationError, "If action is update or replace, filter must be set." + end + if action != "update" && !(update_expressions.nil? || update_expressions.empty?) + raise LogStash::ConfigurationError, "The :update_expressions only makes sense if the action is an update." 
+ end + end + def receive(event) + + retry_count = 0 + + action = event.sprintf(@action) + + validate_action(action, @filter, @update_expressions) + begin # Our timestamp object now has a to_bson method, using it here # {}.merge(other) so we don't taint the event hash innards document = {}.merge(event.to_hash) - if !@isodate timestamp = event.timestamp if timestamp @@ -94,8 +174,21 @@ def receive(event) document["_id"] = BSON::ObjectId.new end + collection = event.sprintf(@collection) + if action == "update" or action == "replace" + document["metadata_mongodb_output_filter"] = apply_event_to_hash(event, @filter) + end + + if action == "update" and !(@update_expressions.nil? || @update_expressions.empty?) + # we only expand the values cause keys are update expressions + expressions_hash = {} + @update_expressions.each do |k, v| + expressions_hash[k] = apply_event_to_hash(event, v) + end + document["metadata_mongodb_output_update_expressions"] = expressions_hash + end + if @bulk - collection = event.sprintf(@collection) @@mutex.synchronize do if(!@documents[collection]) @documents[collection] = [] @@ -103,14 +196,29 @@ def receive(event) @documents[collection].push(document) if(@documents[collection].length >= @bulk_size) - @db[collection].insert_many(@documents[collection]) + write_to_mongodb(collection, @documents[collection]) @documents.delete(collection) end end else - @db[event.sprintf(@collection)].insert_one(document) + result = write_to_mongodb(collection, [document]) + @logger.debug("Bulk write result: #{result.to_s}") end + rescue => e + logger_data = {:collection => collection, + :document => document, + :action => action, + :filter => document["metadata_mongodb_output_filter"], + :update_expressions => document["metadata_mongodb_output_update_expressions"]} + + if (e.is_a? 
Mongo::Error::BulkWriteError) + logger_data["result"] = e.result + end + + @logger.debug("Error: #{e.message}", logger_data) + @logger.trace("Error backtrace", backtrace: e.backtrace) + if e.message =~ /^E11000/ # On a duplicate key error, skip the insert. # We could check if the duplicate key err is the _id key @@ -119,9 +227,81 @@ def receive(event) # to fix the issue. @logger.warn("Skipping insert because of a duplicate key error", :event => event, :exception => e) else - @logger.warn("Failed to send event to MongoDB, retrying in #{@retry_delay.to_s} seconds", :event => event, :exception => e) - sleep(@retry_delay) - retry + # if max_retries is negative we retry forever + if (@max_retries < 0 || retry_count < @max_retries) + retry_count += 1 + @logger.warn("Failed to send event to MongoDB retrying (#{retry_count.to_s}) in #{@retry_delay.to_s} seconds") + sleep(@retry_delay) + retry + end + end + end + end + + def write_to_mongodb(collection, documents) + ops = get_write_ops(documents) + @logger.debug("Sending", :ops => ops) + @db[collection].bulk_write(ops) + end + + def get_write_ops(documents) + ops = [] + documents.each do |doc| + filter = doc["metadata_mongodb_output_filter"] + doc.delete("metadata_mongodb_output_filter") + + update_expressions = doc["metadata_mongodb_output_update_expressions"] + doc.delete("metadata_mongodb_output_update_expressions") + + # TODO: support multiple expressions as pipeline for Mongo >= 4.2 + update = if !update_expressions.nil? + update_expressions + else + {'$set' => to_dotted_hash(doc)} + end + + if action == "insert" + ops << {:insert_one => doc} + elsif action == "update" + ops << {:update_one => {:filter => filter, :update => update, :upsert => @upsert}} + elsif action == "replace" + ops << {:replace_one => {:filter => filter, :replacement => doc, :upsert => @upsert}} + end + end + ops + end + + def is_update_operator(string) + string.start_with?("$") + end + + # Apply the event to the input hash keys and values. 
+ # + # This function is recursive. + # + # It uses event.sprintf for keys but event.get for values because it looks + # like event.sprintf always returns a string and we don't want to always + # coerce. + # + # See https://github.com/elastic/logstash/issues/5114 + def apply_event_to_hash(event, hash) + hash.clone.each_with_object({}) do |(k, v), ret| + if v.is_a? Hash + ret[event.sprintf(k)] = apply_event_to_hash(event, v) + else + event_value = event.get(v) + ret[event.sprintf(k)] = event_value.nil? ? v : event_value + end + end + end + + def to_dotted_hash(hash, recursive_key = "") + hash.each_with_object({}) do |(k, v), ret| + key = recursive_key + k.to_s + if v.is_a? Hash + ret.merge! to_dotted_hash(v, key + ".") + else + ret[key] = v end end end diff --git a/logstash-output-mongodb.gemspec b/logstash-output-mongodb.gemspec index e198114..a21d569 100644 --- a/logstash-output-mongodb.gemspec +++ b/logstash-output-mongodb.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-mongodb' - s.version = '3.1.6' + s.version = '3.2.1' s.licenses = ['Apache License (2.0)'] s.summary = "Writes events to MongoDB" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. 
This gem is not a stand-alone program" @@ -21,7 +21,7 @@ Gem::Specification.new do |s| # Gem dependencies s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" s.add_runtime_dependency 'logstash-codec-plain' - s.add_runtime_dependency 'mongo', '~> 2.6' + s.add_runtime_dependency 'mongo', '= 2.6' s.add_development_dependency 'logstash-devutils' end diff --git a/spec/integration/mongodb_spec.rb b/spec/integration/mongodb_spec.rb index 690e8fa..fe1f0e3 100644 --- a/spec/integration/mongodb_spec.rb +++ b/spec/integration/mongodb_spec.rb @@ -6,11 +6,11 @@ let(:uri) { 'mongodb://localhost:27017' } let(:database) { 'logstash' } let(:collection) { 'logs' } - let(:uuid) { SecureRandom.uuid } + let(:action) { 'insert' } let(:config) do { "uri" => uri, "database" => database, - "collection" => collection, "isodate" => true } + "collection" => collection, "isodate" => true, "action" => action } end describe "#send" do @@ -18,8 +18,10 @@ subject { LogStash::Outputs::Mongodb.new(config) } let(:properties) { { "message" => "This is a message!", - "uuid" => uuid, "number" => BigDecimal.new("4321.1234"), - "utf8" => "żółć", "int" => 42, + "uuid" => "00000000-0000-0000-0000-000000000000", + "number" => BigDecimal.new("4321.1234"), + "utf8" => "żółć", + "int" => 42, "arry" => [42, "string", 4321.1234]} } let(:event) { LogStash::Event.new(properties) } diff --git a/spec/outputs/mongodb_config_validation_spec.rb b/spec/outputs/mongodb_config_validation_spec.rb new file mode 100644 index 0000000..74f045e --- /dev/null +++ b/spec/outputs/mongodb_config_validation_spec.rb @@ -0,0 +1,102 @@ +# encoding: utf-8 +require_relative "../spec_helper" +require "logstash/plugin" + +describe LogStash::Outputs::Mongodb do + + let(:uri) { 'mongodb://localhost:27017' } + let(:database) { 'logstash' } + let(:collection) { 'logs' } + + describe "when validating config" do + + subject! 
{ LogStash::Outputs::Mongodb.new(config) } + + [ + {:update_expressions => {"invalid-expression" => "foo"}, + :expected_reason => "The :update_expressions option contains 'invalid-expression', which is not an Update expression."}, + {:action => "insert", :bulk_size => 1001, + :expected_reason => "Bulk size must be lower than '1000', currently '1001'"}, + ].each do |test| + + describe "with :bulk_size => '#{test[:bulk_size]}', :upsert => '#{test[:upsert]}' and :update_expressions => '#{test[:update_expressions]}'" do + + let(:config) do + configuration = { + "uri" => uri, + "database" => database, + "collection" => collection, + "filter" => {"_id" => "123"}, + "action" => "update" + } + unless test[:bulk_size].nil? + configuration["bulk_size"] = test[:bulk_size] + end + unless test[:update_expressions].nil? + configuration["update_expressions"] = test[:update_expressions] + end + return configuration + end + + it "should raise error: '#{test[:expected_reason]}'" do + expect { subject.validate_config }.to raise_error(LogStash::ConfigurationError, test[:expected_reason]) + end + end + end + end + + describe "when validating action" do + + subject! 
{ LogStash::Outputs::Mongodb.new(config) } + + [ + {:action => "unsupported", :filter => {"_id" => "123"}, :upsert => false, + :expected_reason => "Only insert, update and replace are supported Mongo actions, got 'unsupported'."}, + {:action => "delete", :filter => {"_id" => "123"}, :upsert => false, + :expected_reason => "Only insert, update and replace are supported Mongo actions, got 'delete'."}, + {:action => "update", :filter => {}, :upsert => false, + :expected_reason => "If action is update or replace, filter must be set."}, + {:action => "%{myaction}", :filter => {}, :upsert => false, + :expected_reason => "If action is update or replace, filter must be set."}, + {:action => "%{[myactionnested][foo]}", :filter => {}, :upsert => false, + :expected_reason => "If action is update or replace, filter must be set."}, + {:action => "update", :filter => nil, :upsert => false, + :expected_reason => "If action is update or replace, filter must be set."}, + {:action => "insert", :update_expressions => {"$inc" => {"quantity" => 1}}, + :expected_reason => "The :update_expressions only makes sense if the action is an update."}, + {:action => "replace", :filter => {"_id" => "123"}, :update_expressions => {"$inc" => {"quantity" => 1}}, + :expected_reason => "The :update_expressions only makes sense if the action is an update."}, + ].each do |test| + + describe "with :action => '#{test[:action]}', :filter => '#{test[:filter]}', :upsert => '#{test[:upsert]}' and :update_expressions => '#{test[:update_expressions]}'" do + + let(:event) { LogStash::Event.new("myaction" => "update", "myactionnested" => {"foo" => "replace"})} + + let(:config) do + configuration = { + "uri" => uri, + "database" => database, + "collection" => collection + } + unless test[:action].nil? + configuration["action"] = test[:action] + end + unless test[:filter].nil? + configuration["filter"] = test[:filter] + end + unless test[:upsert].nil? 
+ configuration["upsert"] = test[:upsert] + end + unless test[:update_expressions].nil? + configuration["update_expressions"] = test[:update_expressions] + end + return configuration + end + + it "should raise error: '#{test[:expected_reason]}'" do + expect { subject.receive(event) }.to raise_error(LogStash::ConfigurationError, test[:expected_reason]) + end + end + end + end +end diff --git a/spec/outputs/mongodb_spec.rb b/spec/outputs/mongodb_insert_spec.rb similarity index 67% rename from spec/outputs/mongodb_spec.rb rename to spec/outputs/mongodb_insert_spec.rb index 8d3decb..96c6029 100644 --- a/spec/outputs/mongodb_spec.rb +++ b/spec/outputs/mongodb_insert_spec.rb @@ -7,11 +7,13 @@ let(:uri) { 'mongodb://localhost:27017' } let(:database) { 'logstash' } let(:collection) { 'logs' } + let(:action) { 'insert' } let(:config) {{ "uri" => uri, "database" => database, - "collection" => collection + "collection" => collection, + "action" => action }} it "should register and close" do @@ -20,7 +22,7 @@ plugin.close end - describe "receive" do + describe "receive method while action is 'insert'" do subject! 
{ LogStash::Outputs::Mongodb.new(config) } let(:event) { LogStash::Event.new(properties) } @@ -32,7 +34,7 @@ allow(Mongo::Client).to receive(:new).and_return(connection) allow(connection).to receive(:use).and_return(client) allow(client).to receive(:[]).and_return(collection) - allow(collection).to receive(:insert_one) + allow(collection).to receive(:bulk_write) subject.register end @@ -40,45 +42,50 @@ subject.close end - describe "#send" do + describe "when processing an event" do let(:properties) {{ "message" => "This is a message!", - "uuid" => SecureRandom.uuid, + "uuid" => "00000000-0000-0000-0000-000000000000", "number" => BigDecimal.new("4321.1234"), "utf8" => "żółć" }} it "should send the event to the database" do - expect(collection).to receive(:insert_one) + expect(collection).to receive(:bulk_write) subject.receive(event) end end - describe "no event @timestamp" do + describe "when processing an event without @timestamp set" do let(:properties) { { "message" => "foo" } } - it "should not contain a @timestamp field in the mongo document" do + it "should send a document without @timestamp field to mongodb" do expect(event).to receive(:timestamp).and_return(nil) expect(event).to receive(:to_hash).and_return(properties) - expect(collection).to receive(:insert_one).with(properties) + expect(collection).to receive(:bulk_write).with( + [ {:insert_one => properties} ] + ) subject.receive(event) end end - describe "generateId" do + describe "when generateId is set" do let(:properties) { { "message" => "foo" } } let(:config) {{ "uri" => uri, "database" => database, "collection" => collection, - "generateId" => true + "generateId" => true, + "action" => "insert" }} - it "should contain a BSON::ObjectId as _id" do + it "should send a document containing a BSON::ObjectId as _id to mongodb" do expect(BSON::ObjectId).to receive(:new).and_return("BSON::ObjectId") expect(event).to receive(:timestamp).and_return(nil) expect(event).to 
receive(:to_hash).and_return(properties) - expect(collection).to receive(:insert_one).with(properties.merge("_id" => "BSON::ObjectId")) + expect(collection).to receive(:bulk_write).with( + [ {:insert_one => properties.merge("_id" => "BSON::ObjectId")} ] + ) subject.receive(event) end end diff --git a/spec/outputs/mongodb_replace_spec.rb b/spec/outputs/mongodb_replace_spec.rb new file mode 100644 index 0000000..85a6b03 --- /dev/null +++ b/spec/outputs/mongodb_replace_spec.rb @@ -0,0 +1,89 @@ +# encoding: utf-8 +require_relative "../spec_helper" +require "logstash/plugin" + +describe LogStash::Outputs::Mongodb do + + let(:uri) { 'mongodb://localhost:27017' } + let(:database) { 'logstash' } + let(:collection) { 'logs' } + let(:action) { 'replace' } + + let(:config) { { + "uri" => uri, + "database" => database, + "collection" => collection, + "action" => action + } } + + describe "receive method while action is 'replace'" do + subject! { LogStash::Outputs::Mongodb.new(config) } + + let(:properties) { { + "message" => "This is a message!", + "uuid" => "00000000-0000-0000-0000-000000000000", + "number" => BigDecimal.new("4321.1234"), + "utf8" => "żółć" + } } + let(:event) { LogStash::Event.new(properties) } + let(:connection) { double("connection") } + let(:client) { double("client") } + let(:collection) { double("collection") } + + before(:each) do + allow(Mongo::Client).to receive(:new).and_return(connection) + allow(connection).to receive(:use).and_return(client) + allow(client).to receive(:[]).and_return(collection) + allow(collection).to receive(:bulk_write) + subject.register + end + + after(:each) do + subject.close + end + + [ + {:filter => {"_id" => "[uuid]"}, :upsert => false, + :expected => {:filter => {"_id" => "00000000-0000-0000-0000-000000000000"}, :upsert => false} + }, + {:filter => {"%{utf8}" => "[message]"}, :upsert => nil, + :expected => {:filter => {"żółć" => "This is a message!"}, :upsert => false} + }, + {:filter => {"%{utf8}" => "[message]"}, 
:upsert => true, + :expected => {:filter => {"żółć" => "This is a message!"}, :upsert => true} + }, + ].each do |test| + + describe "when processing an event with :filter => '#{test[:filter]}' and :upsert => '#{test[:upsert]}'" do + + let(:config) { + configuration = { + "uri" => uri, + "database" => database, + "collection" => collection, + "action" => action + } + unless test[:filter].nil? + configuration["filter"] = test[:filter] + end + unless test[:upsert].nil? + configuration["upsert"] = test[:upsert] + end + return configuration + } + + expected = test[:expected] + it "should send that document as a replace to mongodb with :filter => '#{expected[:filter]}' and upsert => '#{expected[:upsert]}'" do + expect(event).to receive(:timestamp).and_return(nil) + expect(event).to receive(:to_hash).and_return(properties) + expect(collection).to receive(:bulk_write).with( + [{:replace_one => {:filter => expected[:filter], :replacement => properties, :upsert => expected[:upsert]}}] + ) + subject.receive(event) + end + end + + end + + end +end diff --git a/spec/outputs/mongodb_unit.rb b/spec/outputs/mongodb_unit.rb new file mode 100644 index 0000000..1cda792 --- /dev/null +++ b/spec/outputs/mongodb_unit.rb @@ -0,0 +1,48 @@ +# encoding: utf-8 +require "logstash/devutils/rspec/spec_helper" +require_relative "../spec_helper" + +describe "mongodb unit tests" do + let(:uri) { "mongodb://localhost:27017" } + let(:database) { "logstash" } + let(:collection) { "logs" } + let(:action) { "insert" } + + let(:config) {{ + "uri" => uri, + "database" => database, + "collection" => collection, + "action" => action + }} + + subject! 
{ LogStash::Outputs::Mongodb.new(config) } + + context "when calling apply_event_to_hash" do + + let (:event) { LogStash::Event.new({"message" => "hello", "positive" => 1, "negative" => -1}) } + + it "should preserve string type for values given the field reference syntax" do + h = {"key" => "[message]"} + applied_hash = subject.apply_event_to_hash(event, h) + expect(applied_hash["key"]).to eql "hello" + end + + it "should preserve positive int type for values given the field reference syntax" do + h = {"key" => "[positive]"} + applied_hash = subject.apply_event_to_hash(event, h) + expect(applied_hash["key"]).to eql 1 + end + + it "should preserve negative int type for values given the field reference syntax" do + h = {"key" => "[negative]"} + applied_hash = subject.apply_event_to_hash(event, h) + expect(applied_hash["key"]).to eql(-1) + end + + it "should always interpolate strings for keys given the sprintf syntax" do + h = {"key_%{positive}" => %{message}} + applied_hash = subject.apply_event_to_hash(event, h) + expect(applied_hash).to have_key("key_1") + end + end +end diff --git a/spec/outputs/mongodb_update_nested_fields_spec.rb b/spec/outputs/mongodb_update_nested_fields_spec.rb new file mode 100644 index 0000000..8cef399 --- /dev/null +++ b/spec/outputs/mongodb_update_nested_fields_spec.rb @@ -0,0 +1,89 @@ +# encoding: utf-8 +require_relative "../spec_helper" +require "logstash/plugin" + +describe LogStash::Outputs::Mongodb do + + let(:uri) { 'mongodb://localhost:27017' } + let(:database) { 'logstash' } + let(:collection) { 'logs' } + let(:action) { 'update' } + let(:filter) { {"_id" => 'foo' } } + + let(:config) { { + "uri" => uri, + "database" => database, + "collection" => collection, + "action" => action, + "filter" => filter, + } } + + describe "receive method while action is 'update'" do + subject! 
{ LogStash::Outputs::Mongodb.new(config) } + + let(:properties) { { + "message" => "This is a message!", + "rootHashField" => { + "numFieldInHash" => 1, + "hashFieldInHash" => { + "numField": 2 + }, + "arrayFieldInHash" => ["one", "two", "three"] + }, + "rootArrayField": [ + {"strFieldInArray" => "four"}, + {"strFieldInArray" => "five"}, + {"strFieldInArray" => "six"} + ], + "nestedArrayField": [ + {"strFieldInArray" => "four", "arrayFieldInArray" => [3, 4], "hashFieldInArray" => {"numField" => 9}}, + {"strFieldInArray" => "five", "arrayFieldInArray" => [5, 6], "hashFieldInArray" => {"numField" => 10}}, + {"strFieldInArray" => "six", "arrayFieldInArray" => [7, 8], "hashFieldInArray" => {"numField" => 11}} + ] + } } + let(:event) { LogStash::Event.new(properties) } + let(:connection) { double("connection") } + let(:client) { double("client") } + let(:collection) { double("collection") } + + before(:each) do + allow(Mongo::Client).to receive(:new).and_return(connection) + allow(connection).to receive(:use).and_return(client) + allow(client).to receive(:[]).and_return(collection) + allow(collection).to receive(:bulk_write) + subject.register + end + + after(:each) do + subject.close + end + + describe "when processing an event with nested hash" do + + it "should send a document update to mongodb with dotted notation for fields in inner hashes" do + expect(event).to receive(:timestamp).and_return(nil) + expect(event).to receive(:to_hash).and_return(properties) + expect(collection).to receive(:bulk_write).with( + [{:update_one => {:filter => {"_id" => 'foo' }, :update => {"$set" => { + "message" => "This is a message!", + "rootHashField.numFieldInHash" => 1, + "rootHashField.hashFieldInHash.numField" => 2, + "rootHashField.arrayFieldInHash" => ["one", "two", "three"], + "rootArrayField" => [ + {"strFieldInArray" => "four"}, + {"strFieldInArray" => "five"}, + {"strFieldInArray" => "six"} + ], + "nestedArrayField" => [ + {"strFieldInArray" => "four", "arrayFieldInArray" 
=> [3, 4], "hashFieldInArray" => {"numField" => 9}}, + {"strFieldInArray" => "five", "arrayFieldInArray" => [5, 6], "hashFieldInArray" => {"numField" => 10}}, + {"strFieldInArray" => "six", "arrayFieldInArray" => [7, 8], "hashFieldInArray" => {"numField" => 11}} + ], + }}, :upsert => false}}] + ) + subject.receive(event) + end + end + + end +end diff --git a/spec/outputs/mongodb_update_spec.rb b/spec/outputs/mongodb_update_spec.rb new file mode 100644 index 0000000..7d57f6e --- /dev/null +++ b/spec/outputs/mongodb_update_spec.rb @@ -0,0 +1,121 @@ +# encoding: utf-8 +require_relative "../spec_helper" +require "logstash/plugin" + +describe LogStash::Outputs::Mongodb do + + let(:uri) { 'mongodb://mongo:27017' } + let(:database) { 'logstash' } + let(:collection) { 'logs' } + let(:action) { 'update' } + + let(:config) { { + "uri" => uri, + "database" => database, + "collection" => collection, + "action" => action + } } + + describe "receive method while action is 'update'" do + subject! { LogStash::Outputs::Mongodb.new(config) } + + let(:properties) { { + "message" => "This is a message!", + "uuid" => "00000000-0000-0000-0000-000000000000", + "number" => BigDecimal("4321.1234"), + "integer" => 1, + "utf8" => "żółć" + } } + let(:event) { LogStash::Event.new(properties) } + let(:connection) { double("connection") } + let(:client) { double("client") } + let(:collection) { double("collection") } + + before(:each) do + allow(Mongo::Client).to receive(:new).and_return(connection) + allow(connection).to receive(:use).and_return(client) + allow(client).to receive(:[]).and_return(collection) + allow(collection).to receive(:bulk_write) + subject.register + end + + after(:each) do + subject.close + end + + [ + {:filter => {"_id" => "[uuid]"}, :upsert => false, + :expected => {:filter => {"_id" => "00000000-0000-0000-0000-000000000000"}, :upsert => false} + }, + {:filter => {"%{utf8}" => "[message]"}, :upsert => nil, + :expected => {:filter => {"żółć" => "This is a message!"}, 
:upsert => false} + }, + {:filter => {"%{utf8}" => "[message]"}, :upsert => true, + :expected => {:filter => {"żółć" => "This is a message!"}, :upsert => true} + }, + # Nested hash recursion + {:filter => {"_id" => "123"}, + :update_expressions => {"$inc" => {"quantity" => "[integer]"}, + "$currentDate" => {"updated_at" => {"$type" => "timestamp"}}}, + :expected => {:filter => {"_id" => "123"}, + :update_expressions => {"$inc" => {"quantity" => 1}, + "$currentDate" => {"updated_at" => {"$type" => "timestamp"}}}, + :upsert => false} + }, + # Nested key-value substitution + {:filter => {"_id" => "123"}, + :update_expressions => {"$inc" => {"quantity" => "[integer]"}, + "$rename" => {"foo" => "[utf8]", + "bar-%{integer}" => "baz"}}, + :expected => {:filter => {"_id" => "123"}, + :update_expressions => {"$inc" => {"quantity" => 1}, + "$rename" => {"foo" => "żółć", + "bar-1" => "baz"}}, + :upsert => false} + }, + ].each do |test| + + describe "when processing an event with :filter => '#{test[:filter]}', :upsert => '#{test[:upsert]}' and merge :update and :update_expressions}'" do + let(:config) { + configuration = { + "uri" => uri, + "database" => database, + "collection" => collection, + "action" => action + } + unless test[:filter].nil? + configuration["filter"] = test[:filter] + end + unless test[:upsert].nil? + configuration["upsert"] = test[:upsert] + end + unless test[:update_expressions].nil? + configuration["update_expressions"] = test[:update_expressions] + end + return configuration + } + + expected = test[:expected] + it "should send that document as an update to mongodb with :filter => '#{expected[:filter]}', :upsert => '#{expected[:upsert]}' and :update_expressions => '#{expected[:update_expressions]}'" do + + expect(event).to receive(:timestamp).and_return(nil) + expect(event).to receive(:to_hash).and_return(properties) + + update = if !expected[:update_expressions].nil? 
+ expected[:update_expressions] + else + {"$set" => properties} + end + + expect(collection).to receive(:bulk_write).with( + [{:update_one => {:filter => expected[:filter], + :update => update, + :upsert => expected[:upsert]}}]) + subject.receive(event) + end + end + + end + + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 35ccf31..9550545 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -2,6 +2,16 @@ require "logstash/devutils/rspec/spec_helper" require "logstash/outputs/mongodb" +if ENV["TEST_DEBUG"] + LogStash::Logging::Logger::configure_logging("DEBUG") +else + LogStash::Logging::Logger::configure_logging("OFF") +end + +RSpec.configure do |config| + config.example_status_persistence_file_path = 'spec/test-report.txt' +end + RSpec::Matchers.define :have_received do |event| match do |subject| client = subject.instance_variable_get("@db")