Skip to content

Mongo plugin with actions #76

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
Gemfile.lock
.bundle
vendor
logs
76 changes: 64 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,66 @@
# Logstash Plugin
# Logstash Mongo Output Plugin

[![Travis Build Status](https://travis-ci.org/logstash-plugins/logstash-output-mongodb.svg)](https://travis-ci.org/logstash-plugins/logstash-output-mongodb)
---
This is a fork of [logstash-plugins/logstash-output-mongodb](https://github.com/logstash-plugins/logstash-output-mongodb).

It adds the :action, :filter, :update_expressions and :upsert parameters
---

This is a plugin for [Logstash](https://github.com/elastic/logstash).

It is fully free and fully open source. The license is Apache 2.0, meaning you are pretty much free to use it however you want in whatever way.

## Example Usage

``` ruby
output {
if [@metadata][event_type] == "product.event.created" {
mongodb {
id => "orders.projection.mongodb.product-insert"
uri => "${MONGO_DNS}"
collection => "product-aggregates"
database => "carts"
isodate => true
action => "update"
upsert => true
}
}

if [@metadata][event_type] == "product.event.updated" or [@metadata][event_type] == "product.event.deleted" {
mongodb {
id => "orders.projection.mongodb.product-update"
uri => "${MONGO_DNS}"
collection => "product-aggregates"
database => "carts"
isodate => true
action => "update"
filter => {
"_id" => "[_id]"
"store_id" => "[store_id]"
}
}
}

if [@metadata][event_type] == "stock.updated" and [quantity] > 0 {
mongodb {
id => "orders.projection.mongodb.stock-update"
uri => "${MONGO_DNS}"
collection => "product-aggregates"
database => "carts"
isodate => true
action => "update"
filter => {
"_id" => "[_id]"
"store_id" => "[store_id]"
}
update_expressions => {
"$inc" => {"stock" => "[stock_delta]"}
}
}
}
}
```

## Documentation

Logstash provides infrastructure to automatically generate documentation for this plugin. We use the asciidoc format to write documentation so any comments in the source code will be first converted into asciidoc and then into html. All plugin documentation are placed under one [central location](http://www.elastic.co/guide/en/logstash/current/).
Expand All @@ -21,20 +76,17 @@ Need help? Try #logstash on freenode IRC or the https://discuss.elastic.co/c/log

### 1. Plugin Developement and Testing

#### Code
- To get started, you'll need JRuby with the Bundler gem installed.
For developing this plugin we use the wonderful work of [cameronkerrnz/logstash-plugin-dev](https://github.com/cameronkerrnz/logstash-plugin-dev):

- Create a new plugin or clone and existing from the GitHub [logstash-plugins](https://github.com/logstash-plugins) organization. We also provide [example plugins](https://github.com/logstash-plugins?query=example).
To start an interactive environment run:

- Install dependencies
```sh
bundle install
``` sh
docker run --rm -it -v ${PWD}:/work cameronkerrnz/logstash-plugin-dev:7.9
```

#### Test

- Update your dependencies
After that you can run the usual suspects:

- Install/Update dependencies
```sh
bundle install
```
Expand Down Expand Up @@ -95,4 +147,4 @@ Programming is not a required skill. Whatever you've seen about open source and

It is more important to the community that you are able to contribute.

For more information about contributing, see the [CONTRIBUTING](https://github.com/elastic/logstash/blob/master/CONTRIBUTING.md) file.
For more information about contributing, see the [CONTRIBUTING](https://github.com/elastic/logstash/blob/master/CONTRIBUTING.md) file.
48 changes: 48 additions & 0 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ This plugin supports the following configuration options plus the <<plugins-{typ
| <<plugins-{type}s-{plugin}-isodate>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-retry_delay>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-uri>> |<<string,string>>|Yes
| <<plugins-{type}s-{plugin}-action>> |<<string,string>>|Yes
| <<plugins-{type}s-{plugin}-query_key>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-query_value>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-upsert>> |<<boolean,boolean>>|No
|=======================================================================

Also see <<plugins-{type}s-{plugin}-common-options>> for a list of options supported by all
Expand Down Expand Up @@ -129,6 +133,50 @@ The number of seconds to wait after failure before retrying.
A MongoDB URI to connect to.
See http://docs.mongodb.org/manual/reference/connection-string/.

[id="plugins-{type}s-{plugin}-action"]
===== `action`

* Value type is <<string,string>>
* Default value is `insert`.

The method used to write processed events to MongoDB.

Possible values are `insert`, `update` or `replace`.

[id="plugins-{type}s-{plugin}-query_key"]
===== `query_key`

* Value type is <<string,string>>
* Default value is `_id`.

The key of the query to find the document to update or replace in MongoDB.

query_key is used like described https://docs.mongodb.com/ruby-driver/v2.6/tutorials/ruby-driver-bulk-operations[here]
for `update` and `replace` examples:

:filter => {query_key => query_value}

[id="plugins-{type}s-{plugin}-query_value"]
===== `query_value`

* Value type is <<string,string>>
* There is no default value for this setting.

The value of the query to find the document to update or replace in MongoDB. This can be dynamic using the `%{foo}` syntax.

query_value is used like described https://docs.mongodb.com/ruby-driver/v2.6/tutorials/ruby-driver-bulk-operations[here]
for `update` and `replace` examples:

:filter => {query_key => query_value}

[id="plugins-{type}s-{plugin}-upsert"]
===== `upsert`

* Value type is <<boolean,boolean>>
* Default value is `false`.

If true, a new document is created if no document exists in DB with given `document_id`.
Only applies if action is `update` or `replace`.


[id="plugins-{type}s-{plugin}-common-options"]
Expand Down
2 changes: 1 addition & 1 deletion lib/logstash/outputs/bson/big_decimal.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ module BigDecimal
# 1.221311.to_bson
# @return [ String ] The encoded string.
# @see http://bsonspec.org/#/specification
def to_bson(buffer = ByteBuffer.new)
def to_bson(buffer = ByteBuffer.new, validating_keys = Config.validating_keys?)
buffer.put_bytes([ self ].pack(PACK))
end

Expand Down
2 changes: 1 addition & 1 deletion lib/logstash/outputs/bson/logstash_event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ module LogStashEvent
# Event.new("field" => "value").to_bson
# @return [ String ] The encoded string.
# @see http://bsonspec.org/#/specification
def to_bson(buffer = ByteBuffer.new)
def to_bson(buffer = ByteBuffer.new, validating_keys = Config.validating_keys?)
position = buffer.length
buffer.put_int32(0)
to_hash.each do |field, value|
Expand Down
2 changes: 1 addition & 1 deletion lib/logstash/outputs/bson/logstash_timestamp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ module LogStashTimestamp
# A time is type 0x09 in the BSON spec.
BSON_TYPE = 9.chr.force_encoding(BINARY).freeze

def to_bson(buffer = ByteBuffer.new)
def to_bson(buffer = ByteBuffer.new, validating_keys = Config.validating_keys?)
time.to_bson(buffer)
end

Expand Down
Loading