Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/1363: Additional configuration for JSON parser #4351

Merged
merged 21 commits into from
Aug 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 126 additions & 11 deletions docs/DATA_FORMATS_INPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,31 @@ but can be overridden using the `name_override` config option.

#### JSON Configuration:

The JSON data format supports specifying "tag keys". If specified, keys
will be searched for in the root-level of the JSON blob. If the key(s) exist,
they will be applied as tags to the Telegraf metrics.
The JSON data format supports specifying "tag_keys", "string_keys", and "json_query".
If specified, keys in "tag_keys" and "string_keys" will be searched for in the root-level
and any nested lists of the JSON blob. All int and float values are added to fields by default.
If the key(s) exist, they will be applied as tags or fields to the Telegraf metrics.
If "string_keys" is specified, the string will be added as a field.

The "json_query" configuration is a gjson path to an JSON object or
list of JSON objects. If this path leads to an array of values or
single data point an error will be thrown. If this configuration
is specified, only the result of the query will be parsed and returned as metrics.

The "json_name_key" configuration specifies the key of the field whos value will be
added as the metric name.

Object paths are specified using gjson path format, which is denoted by object keys
concatenated with "." to go deeper in nested JSON objects.
Additional information on gjson paths can be found here: https://github.com/tidwall/gjson#path-syntax

The JSON data format also supports extracting time values through the
config "json_time_key" and "json_time_format". If "json_time_key" is set,
"json_time_format" must be specified. The "json_time_key" describes the
name of the field containing time information. The "json_time_format"
must be a recognized Go time format.
If there is no year provided, the metrics will have the current year.
More info on time formats can be found here: https://golang.org/pkg/time/#Parse

For example, if you had this configuration:

Expand All @@ -126,11 +148,28 @@ For example, if you had this configuration:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "json"

## List of tag names to extract from top-level of JSON server response
## List of tag names to extract from JSON server response
tag_keys = [
"my_tag_1",
"my_tag_2"
]

## The json path specifying where to extract the metric name from
# json_name_key = ""

## List of field names to extract from JSON and add as string fields
# json_string_fields = []

## gjson query path to specify a specific chunk of JSON to be parsed with
## the above configuration. If not specified, the whole file will be parsed.
## gjson query paths are described here: https://github.com/tidwall/gjson#path-syntax
# json_query = ""

## holds the name of the tag of timestamp
# json_time_key = ""

## holds the format of timestamp to be parsed
# json_time_format = ""
```

with this JSON output from a command:
Expand All @@ -151,8 +190,9 @@ Your Telegraf metrics would get tagged with "my_tag_1"
exec_mycollector,my_tag_1=foo a=5,b_c=6
```

If the JSON data is an array, then each element of the array is parsed with the configured settings.
Each resulting metric will be output with the same timestamp.
If the JSON data is an array, then each element of the array is
parsed with the configured settings. Each resulting metric will
be output with the same timestamp.

For example, if the following configuration:

Expand All @@ -175,6 +215,19 @@ For example, if the following configuration:
"my_tag_1",
"my_tag_2"
]

## List of field names to extract from JSON and add as string fields
# string_fields = []

## gjson query path to specify a specific chunk of JSON to be parsed with
## the above configuration. If not specified, the whole file will be parsed
# json_query = ""

## holds the name of the tag of timestamp
json_time_key = "b_time"

## holds the format of timestamp to be parsed
json_time_format = "02 Jan 06 15:04 MST"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an interesting example time format because there it doesn't include a year. This results in a pared time that is in year 0. I think after parsing the time we should do something similar to what we did in logparser and use the current year. While this would be pretty annoying if you actually want to store in a metric from year 0, I think it is much more likely to be what is wanted:

if ts.Year() == 0 {
	ts = ts.AddDate(timestamp.Year(), 0, 0)
}

```

with this JSON output from a command:
Expand All @@ -184,27 +237,89 @@ with this JSON output from a command:
{
"a": 5,
"b": {
"c": 6
"c": 6,
"time":"04 Jan 06 15:04 MST"
},
"my_tag_1": "foo",
"my_tag_2": "baz"
},
{
"a": 7,
"b": {
"c": 8
"c": 8,
"time":"11 Jan 07 15:04 MST"
},
"my_tag_1": "bar",
"my_tag_2": "baz"
}
]
```

Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2"
Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2" and fielded with "b_c"
The metric's time will be a time.Time object, as specified by "b_time"

```
exec_mycollector,my_tag_1=foo,my_tag_2=baz b_c=6 1136387040000000000
exec_mycollector,my_tag_1=bar,my_tag_2=baz b_c=8 1168527840000000000
```

If you want to only use a specific portion of your JSON, use the "json_query"
configuration to specify a path to a JSON object.

For example, with the following config:
```toml
[[inputs.exec]]
## Commands array
commands = ["/usr/bin/mycollector --foo=bar"]

## measurement name suffix (for separating different commands)
name_suffix = "_mycollector"

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "json"

## List of tag names to extract from top-level of JSON server response
tag_keys = ["first"]

## List of field names to extract from JSON and add as string fields
string_fields = ["last"]

## gjson query path to specify a specific chunk of JSON to be parsed with
## the above configuration. If not specified, the whole file will be parsed
json_query = "obj.friends"

## holds the name of the tag of timestamp
# json_time_key = ""

## holds the format of timestamp to be parsed
# json_time_format = ""
```

with this JSON as input:
```json
{
"obj": {
"name": {"first": "Tom", "last": "Anderson"},
"age":37,
"children": ["Sara","Alex","Jack"],
"fav.movie": "Deer Hunter",
"friends": [
{"first": "Dale", "last": "Murphy", "age": 44},
{"first": "Roger", "last": "Craig", "age": 68},
{"first": "Jane", "last": "Murphy", "age": 47}
]
}
}
```
You would recieve 3 metrics tagged with "first", and fielded with "last" and "age"

```
exec_mycollector,my_tag_1=foo,my_tag_2=baz a=5,b_c=6
exec_mycollector,my_tag_1=bar,my_tag_2=baz a=7,b_c=8
exec_mycollector, "first":"Dale" "last":"Murphy","age":44
exec_mycollector, "first":"Roger" "last":"Craig","age":68
exec_mycollector, "first":"Jane" "last":"Murphy","age":47
```

# Value:
Expand Down
49 changes: 49 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,50 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
}
}

if node, ok := tbl.Fields["json_string_fields"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.JSONStringFields = append(c.JSONStringFields, str.Value)
}
}
}
}
}

if node, ok := tbl.Fields["json_name_key"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.JSONNameKey = str.Value
}
}
}

if node, ok := tbl.Fields["json_query"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.JSONQuery = str.Value
}
}
}

if node, ok := tbl.Fields["json_time_key"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.JSONTimeKey = str.Value
}
}
}

if node, ok := tbl.Fields["json_time_format"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.JSONTimeFormat = str.Value
}
}
}

if node, ok := tbl.Fields["data_type"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
Expand Down Expand Up @@ -1405,6 +1449,11 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
delete(tbl.Fields, "separator")
delete(tbl.Fields, "templates")
delete(tbl.Fields, "tag_keys")
delete(tbl.Fields, "string_fields")
delete(tbl.Fields, "json_query")
delete(tbl.Fields, "json_name_key")
delete(tbl.Fields, "json_time_key")
delete(tbl.Fields, "json_time_format")
delete(tbl.Fields, "data_type")
delete(tbl.Fields, "collectd_auth_file")
delete(tbl.Fields, "collectd_security_level")
Expand Down
5 changes: 4 additions & 1 deletion internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ func TestConfig_LoadDirectory(t *testing.T) {
"Testdata did not produce correct memcached metadata.")

ex := inputs.Inputs["exec"]().(*exec.Exec)
p, err := parsers.NewJSONParser("exec", nil, nil)
p, err := parsers.NewParser(&parsers.Config{
MetricName: "exec",
DataFormat: "json",
})
assert.NoError(t, err)
ex.SetParser(p)
ex.Command = "/usr/bin/myothercollector --foo=bar"
Expand Down
15 changes: 12 additions & 3 deletions plugins/inputs/exec/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ func (r runnerMock) Run(e *Exec, command string, acc telegraf.Accumulator) ([]by
}

func TestExec(t *testing.T) {
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
e := &Exec{
runner: newRunnerMock([]byte(validJson), nil),
Commands: []string{"testcommand arg1"},
Expand All @@ -119,7 +122,10 @@ func TestExec(t *testing.T) {
}

func TestExecMalformed(t *testing.T) {
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
e := &Exec{
runner: newRunnerMock([]byte(malformedJson), nil),
Commands: []string{"badcommand arg1"},
Expand All @@ -132,7 +138,10 @@ func TestExecMalformed(t *testing.T) {
}

func TestCommandError(t *testing.T) {
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
e := &Exec{
runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")),
Commands: []string{"badcommand"},
Expand Down
24 changes: 18 additions & 6 deletions plugins/inputs/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ func TestHTTPwithJSONFormat(t *testing.T) {
URLs: []string{url},
}
metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil)

p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
plugin.SetParser(p)

var acc testutil.Accumulator
Expand Down Expand Up @@ -63,8 +67,11 @@ func TestHTTPHeaders(t *testing.T) {
URLs: []string{url},
Headers: map[string]string{header: headerValue},
}
metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil)

p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
plugin.SetParser(p)

var acc testutil.Accumulator
Expand All @@ -83,7 +90,10 @@ func TestInvalidStatusCode(t *testing.T) {
}

metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil)
p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: metricName,
})
plugin.SetParser(p)

var acc testutil.Accumulator
Expand All @@ -105,8 +115,10 @@ func TestMethod(t *testing.T) {
Method: "POST",
}

metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil)
p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
plugin.SetParser(p)

var acc testutil.Accumulator
Expand Down
7 changes: 6 additions & 1 deletion plugins/inputs/httpjson/httpjson.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,12 @@ func (h *HttpJson) gatherServer(
"server": serverURL,
}

parser, err := parsers.NewJSONParser(msrmnt_name, h.TagKeys, tags)
parser, err := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: msrmnt_name,
TagKeys: h.TagKeys,
DefaultTags: tags,
})
if err != nil {
return err
}
Expand Down
5 changes: 4 additions & 1 deletion plugins/inputs/kafka_consumer/kafka_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
k.acc = &acc
defer close(k.done)

k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil)
k.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "kafka_json_test",
})
go k.receiver()
in <- saramaMsg(testMsgJSON)
acc.Wait(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
k.acc = &acc
defer close(k.done)

k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil)
k.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "kafka_json_test",
})
go k.receiver()
in <- saramaMsg(testMsgJSON)
acc.Wait(1)
Expand Down
Loading