Skip to content

Commit

Permalink
Add NATS output plugin.
Browse files Browse the repository at this point in the history
Added NATS server container needed for tests.

Added NATS output plug-in. Fixes #1487

NATS output plug-in use internal.GetTLSConfig to instrument TLS configuration.

Added NATS output plug-in to changelog.

closes #1487
closes #1697
  • Loading branch information
pires authored and sparrc committed Sep 6, 2016
1 parent 1271f9d commit 6b1cc67
Show file tree
Hide file tree
Showing 8 changed files with 206 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- [#1471](https://github.com/influxdata/telegraf/pull/1471): iptables input plugin.
- [#1542](https://github.com/influxdata/telegraf/pull/1542): Add filestack webhook plugin.
- [#1599](https://github.com/influxdata/telegraf/pull/1599): Add server hostname for each docker measurements.
- [#1697](https://github.com/influxdata/telegraf/pull/1697): Add NATS output plugin.

### Bugfixes

Expand Down
4 changes: 2 additions & 2 deletions Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ github.com/matttproud/golang_protobuf_extensions d0c3fe89de86839aecf2e0579c40ba3
github.com/miekg/dns cce6c130cdb92c752850880fd285bea1d64439dd
github.com/mreiferson/go-snappystream 028eae7ab5c4c9e2d1cb4c4ca1e53259bbe7e504
github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b
github.com/nats-io/nats b13fc9d12b0b123ebc374e6b808c6228ae4234a3
github.com/nats-io/nuid 4f84f5f3b2786224e336af2e13dba0a0a80b76fa
github.com/nats-io/nats ea8b4fd12ebb823073c0004b9f09ac8748f4f165
github.com/nats-io/nuid a5152d67cf63cbfb5d992a395458722a45194715
github.com/nsqio/go-nsq 0b80d6f05e15ca1930e0c5e1d540ed627e299980
github.com/opencontainers/runc 89ab7f2ccc1e45ddf6485eaa802c35dcf321dfc8
github.com/prometheus/client_golang 18acf9993a863f4c4b40612e19cdd243e7c86831
Expand Down
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ docker-run:
docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
docker run --name riemann -p "5555:5555" -d blalor/riemann
docker run --name nats -p "4222:4222" -d nats

# Run docker containers necessary for CircleCI unit tests
docker-run-circle:
Expand All @@ -68,11 +69,12 @@ docker-run-circle:
docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
docker run --name riemann -p "5555:5555" -d blalor/riemann
docker run --name nats -p "4222:4222" -d nats

# Kill all docker containers, ignore errors
docker-kill:
-docker kill nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann
-docker rm nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann
-docker kill nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann nats
-docker rm nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann nats

# Run full unit tests using docker containers (includes setup and teardown)
test: vet docker-kill docker-run
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ want to add support for another service or third-party API.
* [kafka](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/kafka)
* [librato](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/librato)
* [mqtt](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/mqtt)
* [nats](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/nats)
* [nsq](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/nsq)
* [opentsdb](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/opentsdb)
* [prometheus](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/prometheus_client)
Expand Down
1 change: 1 addition & 0 deletions plugins/outputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/kinesis"
_ "github.com/influxdata/telegraf/plugins/outputs/librato"
_ "github.com/influxdata/telegraf/plugins/outputs/mqtt"
_ "github.com/influxdata/telegraf/plugins/outputs/nats"
_ "github.com/influxdata/telegraf/plugins/outputs/nsq"
_ "github.com/influxdata/telegraf/plugins/outputs/opentsdb"
_ "github.com/influxdata/telegraf/plugins/outputs/prometheus_client"
Expand Down
37 changes: 37 additions & 0 deletions plugins/outputs/nats/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# NATS Output Plugin

This plugin writes to a (list of) specified NATS instance(s).

```
[[outputs.nats]]
## URLs of NATS servers
servers = ["nats://localhost:4222"]
## Optional credentials
# username = ""
# password = ""
## NATS subject for producer messages
subject = "telegraf"
## Optional TLS Config
## CA certificate used to self-sign NATS server(s) TLS certificate(s)
# tls_ca = "/etc/telegraf/ca.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Data format to output.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
```

### Required parameters:

* `servers`: List of strings, this is for NATS clustering support. Each URL should start with `nats://`.
* `subject`: The NATS subject to publish to.

### Optional parameters:

* `username`: Username for NATS
* `password`: Password for NATS
* `tls_ca`: TLS CA
* `insecure_skip_verify`: Use SSL but skip chain & host verification (default: false)
129 changes: 129 additions & 0 deletions plugins/outputs/nats/nats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package nats

import (
"fmt"

nats_client "github.com/nats-io/nats"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
)

type NATS struct {
// Servers is the NATS server pool to connect to
Servers []string
// Credentials
Username string
Password string
// NATS subject to publish metrics to
Subject string

// Path to CA file
CAFile string `toml:"tls_ca"`

// Skip SSL verification
InsecureSkipVerify bool

conn *nats_client.Conn
serializer serializers.Serializer
}

var sampleConfig = `
## URLs of NATS servers
servers = ["nats://localhost:4222"]
## Optional credentials
# username = ""
# password = ""
## NATS subject for producer messages
subject = "telegraf"
## Optional TLS Config
## CA certificate used to self-sign NATS server(s) TLS certificate(s)
# tls_ca = "/etc/telegraf/ca.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Data format to output.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
`

func (n *NATS) SetSerializer(serializer serializers.Serializer) {
n.serializer = serializer
}

func (n *NATS) Connect() error {
var err error
// set NATS connection options
opts := nats_client.DefaultOptions
opts.Servers = n.Servers
if n.Username != "" {
opts.User = n.Username
opts.Password = n.Password
}

// is TLS enabled?
tlsConfig, err := internal.GetTLSConfig(
"", "", n.CAFile, n.InsecureSkipVerify)
if err != nil {
return err
}
if tlsConfig != nil {
// set NATS connection TLS options
opts.Secure = true
opts.TLSConfig = tlsConfig
}

// try and connect
n.conn, err = opts.Connect()

return err
}

func (n *NATS) Close() error {
n.conn.Close()
return nil
}

func (n *NATS) SampleConfig() string {
return sampleConfig
}

func (n *NATS) Description() string {
return "Send telegraf measurements to NATS"
}

func (n *NATS) Write(metrics []telegraf.Metric) error {
if len(metrics) == 0 {
return nil
}

for _, metric := range metrics {
values, err := n.serializer.Serialize(metric)
if err != nil {
return err
}

var pubErr error
for _, value := range values {
err = n.conn.Publish(n.Subject, []byte(value))
if err != nil {
pubErr = err
}
}

if pubErr != nil {
return fmt.Errorf("FAILED to send NATS message: %s", err)
}
}
return nil
}

func init() {
outputs.Add("nats", func() telegraf.Output {
return &NATS{}
})
}
31 changes: 31 additions & 0 deletions plugins/outputs/nats/nats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package nats

import (
"testing"

"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)

func TestConnectAndWrite(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

server := []string{"nats://" + testutil.GetLocalHost() + ":4222"}
s, _ := serializers.NewInfluxSerializer()
n := &NATS{
Servers: server,
Subject: "telegraf",
serializer: s,
}

// Verify that we can connect to the NATS daemon
err := n.Connect()
require.NoError(t, err)

// Verify that we can successfully write data to the NATS daemon
err = n.Write(testutil.MockMetrics())
require.NoError(t, err)
}

0 comments on commit 6b1cc67

Please sign in to comment.