Skip to content

Commit

Permalink
Implement HTTP Output Compression
Browse files Browse the repository at this point in the history
  • Loading branch information
mihaitodor committed Oct 5, 2018
1 parent 6bf5643 commit 26ee478
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 11 deletions.
4 changes: 4 additions & 0 deletions plugins/outputs/http/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,8 @@ data formats. For data_formats that support batching, metrics are sent in batch
# [outputs.http.headers]
# # Should be set manually to "application/json" for json data_format
# Content-Type = "text/plain; charset=utf-8"

## HTTP Content-Encoding for write request body, can be set to "gzip" to
## compress body or "identity" to apply no encoding.
# content_encoding = "identity"
```
57 changes: 46 additions & 11 deletions plugins/outputs/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package http

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
Expand Down Expand Up @@ -55,6 +57,10 @@ var sampleConfig = `
# [outputs.http.headers]
# # Should be set manually to "application/json" for json data_format
# Content-Type = "text/plain; charset=utf-8"
## HTTP Content-Encoding for write request body, can be set to "gzip" to
## compress body or "identity" to apply no encoding.
# content_encoding = "identity"
`

const (
Expand All @@ -64,16 +70,17 @@ const (
)

type HTTP struct {
URL string `toml:"url"`
Timeout internal.Duration `toml:"timeout"`
Method string `toml:"method"`
Username string `toml:"username"`
Password string `toml:"password"`
Headers map[string]string `toml:"headers"`
ClientID string `toml:"client_id"`
ClientSecret string `toml:"client_secret"`
TokenURL string `toml:"token_url"`
Scopes []string `toml:"scopes"`
URL string `toml:"url"`
Timeout internal.Duration `toml:"timeout"`
Method string `toml:"method"`
Username string `toml:"username"`
Password string `toml:"password"`
Headers map[string]string `toml:"headers"`
ClientID string `toml:"client_id"`
ClientSecret string `toml:"client_secret"`
TokenURL string `toml:"token_url"`
Scopes []string `toml:"scopes"`
ContentEncoding string `toml:"content_encoding"`
tls.ClientConfig

client *http.Client
Expand Down Expand Up @@ -162,7 +169,18 @@ func (h *HTTP) Write(metrics []telegraf.Metric) error {
}

func (h *HTTP) write(reqBody []byte) error {
req, err := http.NewRequest(h.Method, h.URL, bytes.NewBuffer(reqBody))
var reqBodyBuffer io.Reader = bytes.NewBuffer(reqBody)
var err error

if h.ContentEncoding == "gzip" {
reqBodyBuffer, err = compressWithGzip(reqBodyBuffer)

if err != nil {
return err
}
}

req, err := http.NewRequest(h.Method, h.URL, reqBodyBuffer)
if err != nil {
return err
}
Expand All @@ -172,6 +190,9 @@ func (h *HTTP) write(reqBody []byte) error {
}

req.Header.Set("Content-Type", defaultContentType)
if h.ContentEncoding == "gzip" {
req.Header.Set("Content-Encoding", "gzip")
}
for k, v := range h.Headers {
req.Header.Set(k, v)
}
Expand All @@ -190,6 +211,20 @@ func (h *HTTP) write(reqBody []byte) error {
return nil
}

func compressWithGzip(data io.Reader) (io.Reader, error) {
pr, pw := io.Pipe()
gw := gzip.NewWriter(pw)
var err error

go func() {
_, err = io.Copy(gw, data)
gw.Close()
pw.Close()
}()

return pr, err
}

func init() {
outputs.Add("http", func() telegraf.Output {
return &HTTP{
Expand Down
62 changes: 62 additions & 0 deletions plugins/outputs/http/http_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package http

import (
"compress/gzip"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
Expand Down Expand Up @@ -227,6 +229,66 @@ func TestContentType(t *testing.T) {
}
}

func TestContentEncodingGzip(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()

u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)

tests := []struct {
name string
plugin *HTTP
payload string
expected string
}{
{
name: "default is no content encoding",
plugin: &HTTP{
URL: u.String(),
},
expected: "",
},
{
name: "overwrite content_encoding",
plugin: &HTTP{
URL: u.String(),
ContentEncoding: "gzip",
},
expected: "gzip",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, tt.expected, r.Header.Get("Content-Encoding"))

body := r.Body
var err error
if r.Header.Get("Content-Encoding") == "gzip" {
body, err = gzip.NewReader(r.Body)
require.NoError(t, err)
}

payload, err := ioutil.ReadAll(body)
require.NoError(t, err)
require.Contains(t, string(payload), "cpu value=42")

w.WriteHeader(http.StatusNoContent)
})

serializer := influx.NewSerializer()
tt.plugin.SetSerializer(serializer)
err = tt.plugin.Connect()
require.NoError(t, err)

err = tt.plugin.Write([]telegraf.Metric{getMetric()})
require.NoError(t, err)
})
}
}

func TestBasicAuth(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()
Expand Down

0 comments on commit 26ee478

Please sign in to comment.