From 83e3b50c93e75a6fcdd75c30e30485472b81f91f Mon Sep 17 00:00:00 2001 From: Sandeep Singh Date: Thu, 16 Jul 2020 18:10:19 +0530 Subject: [PATCH 1/4] Fixed bson arguement error and changed mongo and bson version (#1) Co-authored-by: singhksandeep --- lib/logstash/outputs/bson/big_decimal.rb | 2 +- lib/logstash/outputs/bson/logstash_event.rb | 2 +- lib/logstash/outputs/bson/logstash_timestamp.rb | 2 +- logstash-output-mongodb.gemspec | 7 ++++--- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/lib/logstash/outputs/bson/big_decimal.rb b/lib/logstash/outputs/bson/big_decimal.rb index f7a2c89..3b5d4ef 100644 --- a/lib/logstash/outputs/bson/big_decimal.rb +++ b/lib/logstash/outputs/bson/big_decimal.rb @@ -33,7 +33,7 @@ module BigDecimal # 1.221311.to_bson # @return [ String ] The encoded string. # @see http://bsonspec.org/#/specification - def to_bson(buffer = ByteBuffer.new) + def to_bson(buffer = ByteBuffer.new, validating_keys = Config.validating_keys?) buffer.put_bytes([ self ].pack(PACK)) end diff --git a/lib/logstash/outputs/bson/logstash_event.rb b/lib/logstash/outputs/bson/logstash_event.rb index 7dcd4c2..9b4e17c 100644 --- a/lib/logstash/outputs/bson/logstash_event.rb +++ b/lib/logstash/outputs/bson/logstash_event.rb @@ -30,7 +30,7 @@ module LogStashEvent # Event.new("field" => "value").to_bson # @return [ String ] The encoded string. # @see http://bsonspec.org/#/specification - def to_bson(buffer = ByteBuffer.new) + def to_bson(buffer = ByteBuffer.new, validating_keys = Config.validating_keys?) position = buffer.length buffer.put_int32(0) to_hash.each do |field, value| diff --git a/lib/logstash/outputs/bson/logstash_timestamp.rb b/lib/logstash/outputs/bson/logstash_timestamp.rb index cb2141f..6eae0f0 100644 --- a/lib/logstash/outputs/bson/logstash_timestamp.rb +++ b/lib/logstash/outputs/bson/logstash_timestamp.rb @@ -25,7 +25,7 @@ module LogStashTimestamp # A time is type 0x09 in the BSON spec. BSON_TYPE = 9.chr.force_encoding(BINARY).freeze - def to_bson(buffer = ByteBuffer.new) + def to_bson(buffer = ByteBuffer.new, validating_keys = Config.validating_keys?) time.to_bson(buffer) end diff --git a/logstash-output-mongodb.gemspec b/logstash-output-mongodb.gemspec index e198114..d562656 100644 --- a/logstash-output-mongodb.gemspec +++ b/logstash-output-mongodb.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-output-mongodb' - s.version = '3.1.6' - s.licenses = ['Apache License (2.0)'] + s.version = '3.1.7' + s.licenses = ['Apache-2.0'] s.summary = "Writes events to MongoDB" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" s.authors = ["Elastic"] @@ -21,7 +21,8 @@ Gem::Specification.new do |s| # Gem dependencies s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" s.add_runtime_dependency 'logstash-codec-plain' - s.add_runtime_dependency 'mongo', '~> 2.6' + s.add_runtime_dependency 'mongo', '~> 2.11.4' + s.add_runtime_dependency 'bson', '~> 4.8.2' s.add_development_dependency 'logstash-devutils' end From e23b4076eaeb39f360dad7ff33eafb515ea9ccd1 Mon Sep 17 00:00:00 2001 From: Aaron Dsouza Date: Tue, 4 May 2021 12:20:04 +0530 Subject: [PATCH 2/4] Fixed bson issue --- Gemfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Gemfile b/Gemfile index 32cc6fb..fffa312 100644 --- a/Gemfile +++ b/Gemfile @@ -2,7 +2,7 @@ source 'https://rubygems.org' gemspec -logstash_path = ENV["LOGSTASH_PATH"] || "../../logstash" +logstash_path = ENV["LOGSTASH_PATH"] || "." use_logstash_source = ENV["LOGSTASH_SOURCE"] && ENV["LOGSTASH_SOURCE"].to_s == "1" if Dir.exist?(logstash_path) && use_logstash_source From 66c35632fb239fe8d333655140f8ff951739bf8f Mon Sep 17 00:00:00 2001 From: Mathias Gebbe Date: Mon, 31 Jul 2023 17:12:21 +0200 Subject: [PATCH 3/4] feat: BigDecimal.new is deprecated; use BigDecimal() method instead --- lib/logstash/outputs/bson/big_decimal.rb | 2 +- spec/bson/big_decimal_spec.rb | 2 +- spec/integration/mongodb_spec.rb | 2 +- spec/outputs/mongodb_spec.rb | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/logstash/outputs/bson/big_decimal.rb b/lib/logstash/outputs/bson/big_decimal.rb index 3b5d4ef..fd06f9e 100644 --- a/lib/logstash/outputs/bson/big_decimal.rb +++ b/lib/logstash/outputs/bson/big_decimal.rb @@ -50,7 +50,7 @@ def from_bson(bson) private def from_bson_double(double) - new(double.unpack(PACK).first.to_s) + BigDecimal(double.unpack(PACK).first.to_s) end end diff --git a/spec/bson/big_decimal_spec.rb b/spec/bson/big_decimal_spec.rb index b515e9d..3abfbd4 100644 --- a/spec/bson/big_decimal_spec.rb +++ b/spec/bson/big_decimal_spec.rb @@ -23,7 +23,7 @@ describe "class methods" do it "builds a new BigDecimal from BSON" do decoded = described_class.from_bson(4321.1234.to_bson) - expect(decoded).to eql(BigDecimal.new(a_number)) + expect(decoded).to eql(BigDecimal(a_number)) end end end diff --git a/spec/integration/mongodb_spec.rb b/spec/integration/mongodb_spec.rb index 690e8fa..bdaf1a6 100644 --- a/spec/integration/mongodb_spec.rb +++ b/spec/integration/mongodb_spec.rb @@ -18,7 +18,7 @@ subject { LogStash::Outputs::Mongodb.new(config) } let(:properties) { { "message" => "This is a message!", - "uuid" => uuid, "number" => BigDecimal.new("4321.1234"), + "uuid" => uuid, "number" => BigDecimal("4321.1234"), "utf8" => "żółć", "int" => 42, "arry" => [42, "string", 4321.1234]} } let(:event) { LogStash::Event.new(properties) } diff --git a/spec/outputs/mongodb_spec.rb b/spec/outputs/mongodb_spec.rb index 8d3decb..3cecfd4 100644 --- a/spec/outputs/mongodb_spec.rb +++ b/spec/outputs/mongodb_spec.rb @@ -44,7 +44,7 @@ let(:properties) {{ "message" => "This is a message!", "uuid" => SecureRandom.uuid, - "number" => BigDecimal.new("4321.1234"), + "number" => BigDecimal("4321.1234"), "utf8" => "żółć" }} From d70d648f809e26d8382b5a33f1e9e89e28ca8480 Mon Sep 17 00:00:00 2001 From: Mathias Gebbe Date: Mon, 31 Jul 2023 17:20:43 +0200 Subject: [PATCH 4/4] feat: allow bulk upsert and set custom field used for _id --- lib/logstash/outputs/mongodb.rb | 41 ++++++++++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/lib/logstash/outputs/mongodb.rb b/lib/logstash/outputs/mongodb.rb index 0b88c68..adc3e88 100644 --- a/lib/logstash/outputs/mongodb.rb +++ b/lib/logstash/outputs/mongodb.rb @@ -34,6 +34,12 @@ class LogStash::Outputs::Mongodb < LogStash::Outputs::Base # "_id" field in the event. config :generateId, :validate => :boolean, :default => false + # The field that will be used for the _id field + # This can be for example the ID column of a SQL table when using JDBC. + config :idField, :validate => :string, :required => false + + # Upsert documents flag, set to true to use replace_one instead of insert_one. + config :upsert, :validate => :boolean, :default => false # Bulk insert flag, set to true to allow bulk insertion, else it will insert events one by one. config :bulk, :validate => :boolean, :default => false @@ -65,7 +71,14 @@ def register @@mutex.synchronize do @documents.each do |collection, values| if values.length > 0 - @db[collection].insert_many(values) + if @upsert + bulk_operations = values.map do |doc| + { replace_one: { filter: { _id: doc["_id"] }, replacement: doc, upsert: true } } + end + @db[collection].bulk_write(bulk_operations) + else + @db[collection].insert_many(values) + end @documents.delete(collection) end end @@ -94,6 +107,17 @@ def receive(event) document["_id"] = BSON::ObjectId.new end + if @idField + field_name = event.sprintf(@idField) + if event.include?(field_name) && !event.get(field_name).nil? + document["_id"] = event.get(field_name) + else + @logger.warn("Cannot set MongoDB document `_id` field because it does not exist in the event", :event => event) + document["_id"] = BSON::ObjectId.new + end + end + + if @bulk collection = event.sprintf(@collection) @@mutex.synchronize do @@ -103,12 +127,23 @@ def receive(event) @documents[collection].push(document) if(@documents[collection].length >= @bulk_size) - @db[collection].insert_many(@documents[collection]) + if @upsert && document.key?("_id") + bulk_operations = @documents[collection].map do |doc| + { replace_one: { filter: { _id: doc["_id"] }, replacement: doc, upsert: true } } + end + @db[collection].bulk_write(bulk_operations) + else + @db[collection].insert_many(@documents[collection]) + end @documents.delete(collection) end end else - @db[event.sprintf(@collection)].insert_one(document) + if @upsert && document.key?("_id") + @db[event.sprintf(@collection)].replace_one({ _id: document["_id"] }, document, { upsert: true }) + else + @db[event.sprintf(@collection)].insert_one(document) + end end rescue => e if e.message =~ /^E11000/