Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/2108 - csv parser #4439

Merged
merged 27 commits into from
Aug 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a6c1e2b
unfinished csv parser
maxunt Jul 17, 2018
c839ce3
functionality for csv parser, still needs unit tests
maxunt Jul 18, 2018
3c8cb17
add unit tests for csv parser
maxunt Jul 19, 2018
4a07734
mess with config options
maxunt Jul 19, 2018
48210f5
fix unit tests
maxunt Jul 19, 2018
67f4929
change README
maxunt Jul 19, 2018
d24e687
unfinished test case for csv
maxunt Jul 25, 2018
e07ed58
fix type conversion and add unit test
maxunt Jul 25, 2018
edd8afc
addresses greg and chris's comments
maxunt Jul 27, 2018
b5ff78f
address some of greg+chris's comments. includes config for trimspace …
maxunt Jul 27, 2018
7704f3e
get rid of grok changes on branch
maxunt Jul 27, 2018
83db721
Merge branch 'master' into feature/2108
maxunt Aug 17, 2018
80135ee
initial config changes
maxunt Aug 20, 2018
339670f
Merge branch 'master' into feature/2108
maxunt Aug 20, 2018
60761d7
additional config options
maxunt Aug 20, 2018
24e38f3
start to remove field_column config
maxunt Aug 20, 2018
fc36fd5
just broke a lot. lovely
maxunt Aug 21, 2018
6e7ec3e
fixed it
maxunt Aug 22, 2018
0d7b236
address some of daniel's comments
maxunt Aug 22, 2018
20ed819
trim space manually
maxunt Aug 22, 2018
5016899
fix config
maxunt Aug 22, 2018
162b092
Merge branch 'master' into feature/2108
maxunt Aug 22, 2018
86d353f
Merge branch 'master' into feature/2108
maxunt Aug 23, 2018
c058db6
Remerge data format docs
danielnelson Aug 23, 2018
4847a59
finally fixes hopefully. error checks in registry.go
maxunt Aug 24, 2018
b408ac4
tags are added before default tags
maxunt Aug 24, 2018
acc5ea7
fix tags being removed from fields
maxunt Aug 24, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 102 additions & 20 deletions docs/DATA_FORMATS_INPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Telegraf is able to parse the following input data formats into metrics:
1. [Grok](#grok)
1. [Logfmt](#logfmt)
1. [Wavefront](#wavefront)
1. [CSV](#csv)

Telegraf metrics, like InfluxDB
[points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/),
Expand Down Expand Up @@ -107,28 +108,28 @@ but can be overridden using the `name_override` config option.

#### JSON Configuration:

The JSON data format supports specifying "tag_keys", "string_keys", and "json_query".
If specified, keys in "tag_keys" and "string_keys" will be searched for in the root-level
and any nested lists of the JSON blob. All int and float values are added to fields by default.
If the key(s) exist, they will be applied as tags or fields to the Telegraf metrics.
The JSON data format supports specifying "tag_keys", "string_keys", and "json_query".
If specified, keys in "tag_keys" and "string_keys" will be searched for in the root-level
and any nested lists of the JSON blob. All int and float values are added to fields by default.
If the key(s) exist, they will be applied as tags or fields to the Telegraf metrics.
If "string_keys" is specified, the string will be added as a field.

The "json_query" configuration is a gjson path to an JSON object or
list of JSON objects. If this path leads to an array of values or
single data point an error will be thrown. If this configuration
The "json_query" configuration is a gjson path to an JSON object or
list of JSON objects. If this path leads to an array of values or
single data point an error will be thrown. If this configuration
is specified, only the result of the query will be parsed and returned as metrics.

The "json_name_key" configuration specifies the key of the field whos value will be
added as the metric name.

Object paths are specified using gjson path format, which is denoted by object keys
concatenated with "." to go deeper in nested JSON objects.
Object paths are specified using gjson path format, which is denoted by object keys
concatenated with "." to go deeper in nested JSON objects.
Additional information on gjson paths can be found here: https://github.com/tidwall/gjson#path-syntax

The JSON data format also supports extracting time values through the
config "json_time_key" and "json_time_format". If "json_time_key" is set,
"json_time_format" must be specified. The "json_time_key" describes the
name of the field containing time information. The "json_time_format"
The JSON data format also supports extracting time values through the
config "json_time_key" and "json_time_format". If "json_time_key" is set,
"json_time_format" must be specified. The "json_time_key" describes the
name of the field containing time information. The "json_time_format"
must be a recognized Go time format.
If there is no year provided, the metrics will have the current year.
More info on time formats can be found here: https://golang.org/pkg/time/#Parse
Expand Down Expand Up @@ -161,8 +162,8 @@ For example, if you had this configuration:
## List of field names to extract from JSON and add as string fields
# json_string_fields = []

## gjson query path to specify a specific chunk of JSON to be parsed with
## the above configuration. If not specified, the whole file will be parsed.
## gjson query path to specify a specific chunk of JSON to be parsed with
## the above configuration. If not specified, the whole file will be parsed.
## gjson query paths are described here: https://github.com/tidwall/gjson#path-syntax
# json_query = ""

Expand Down Expand Up @@ -191,8 +192,8 @@ Your Telegraf metrics would get tagged with "my_tag_1"
exec_mycollector,my_tag_1=foo a=5,b_c=6
```

If the JSON data is an array, then each element of the array is
parsed with the configured settings. Each resulting metric will
If the JSON data is an array, then each element of the array is
parsed with the configured settings. Each resulting metric will
be output with the same timestamp.

For example, if the following configuration:
Expand Down Expand Up @@ -220,7 +221,7 @@ For example, if the following configuration:
## List of field names to extract from JSON and add as string fields
# string_fields = []

## gjson query path to specify a specific chunk of JSON to be parsed with
## gjson query path to specify a specific chunk of JSON to be parsed with
## the above configuration. If not specified, the whole file will be parsed
# json_query = ""

Expand Down Expand Up @@ -264,7 +265,7 @@ exec_mycollector,my_tag_1=foo,my_tag_2=baz b_c=6 1136387040000000000
exec_mycollector,my_tag_1=bar,my_tag_2=baz b_c=8 1168527840000000000
```

If you want to only use a specific portion of your JSON, use the "json_query"
If you want to only use a specific portion of your JSON, use the "json_query"
configuration to specify a path to a JSON object.

For example, with the following config:
Expand All @@ -288,7 +289,7 @@ For example, with the following config:
## List of field names to extract from JSON and add as string fields
string_fields = ["last"]

## gjson query path to specify a specific chunk of JSON to be parsed with
## gjson query path to specify a specific chunk of JSON to be parsed with
## the above configuration. If not specified, the whole file will be parsed
json_query = "obj.friends"

Expand Down Expand Up @@ -1038,3 +1039,84 @@ There are no additional configuration options for Wavefront Data Format line-pro
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "wavefront"
```

# CSV
Parse out metrics from a CSV formatted table. By default, the parser assumes there is no header and
will read data from the first line. If `csv_header_row_count` is set to anything besides 0, the parser
will extract column names from the first number of rows. Headers of more than 1 row will have their
names concatenated together. Any unnamed columns will be ignored by the parser.

The `csv_skip_rows` config indicates the number of rows to skip before looking for header information or data
to parse. By default, no rows will be skipped.

The `csv_skip_columns` config indicates the number of columns to be skipped before parsing data. These
columns will not be read out of the header. Naming with the `csv_column_names` will begin at the first
parsed column after skipping the indicated columns. By default, no columns are skipped.

To assign custom column names, the `csv_column_names` config is available. If the `csv_column_names`
config is used, all columns must be named as additional columns will be ignored. If `csv_header_row_count`
is set to 0, `csv_column_names` must be specified. Names listed in `csv_column_names` will override names extracted
from the header.

The `csv_tag_columns` and `csv_field_columns` configs are available to add the column data to the metric.
The name used to specify the column is the name in the header, or if specified, the corresponding
name assigned in `csv_column_names`. If neither config is specified, no data will be added to the metric.

Additional configs are available to dynamically name metrics and set custom timestamps. If the
`csv_column_names` config is specified, the parser will assign the metric name to the value found
in that column. If the `csv_timestamp_column` is specified, the parser will extract the timestamp from
that column. If `csv_timestamp_column` is specified, the `csv_timestamp_format` must also be specified
or an error will be thrown.

#### CSV Configuration
```toml
data_format = "csv"

## Indicates how many rows to treat as a header. By default, the parser assumes
## there is no header and will parse the first row as data. If set to anything more
## than 1, column names will be concatenated with the name listed in the next header row.
## If `csv_column_names` is specified, the column names in header will be overridden.
# csv_header_row_count = 0

## Indicates the number of rows to skip before looking for header information.
# csv_skip_rows = 0

## Indicates the number of columns to skip before looking for data to parse.
## These columns will be skipped in the header as well.
# csv_skip_columns = 0

## The seperator between csv fields
## By default, the parser assumes a comma (",")
# csv_delimiter = ","

## The character reserved for marking a row as a comment row
## Commented rows are skipped and not parsed
# csv_comment = ""

## If set to true, the parser will remove leading whitespace from fields
## By default, this is false
# csv_trim_space = false

## For assigning custom names to columns
## If this is specified, all columns should have a name
## Unnamed columns will be ignored by the parser.
## If `csv_header_row_count` is set to 0, this config must be used
csv_column_names = []

## Columns listed here will be added as tags. Any other columns
## will be added as fields.
csv_tag_columns = []

## The column to extract the name of the metric from
## By default, this is the name of the plugin
## the `name_override` config overrides this
# csv_measurement_column = ""

## The column to extract time information for the metric
## `csv_timestamp_format` must be specified if this is used
# csv_timestamp_column = ""

## The format of time data extracted from `csv_timestamp_column`
## this must be specified if `csv_timestamp_column` is specified
# csv_timestamp_format = ""
```
122 changes: 122 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1443,6 +1443,120 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
}
}

//for csv parser
if node, ok := tbl.Fields["csv_column_names"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.CSVColumnNames = append(c.CSVColumnNames, str.Value)
}
}
}
}
}

if node, ok := tbl.Fields["csv_tag_columns"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.CSVTagColumns = append(c.CSVTagColumns, str.Value)
}
}
}
}
}

if node, ok := tbl.Fields["csv_delimiter"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.CSVDelimiter = str.Value
}
}
}

if node, ok := tbl.Fields["csv_comment"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.CSVComment = str.Value
}
}
}

if node, ok := tbl.Fields["csv_measurement_column"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.CSVMeasurementColumn = str.Value
}
}
}

if node, ok := tbl.Fields["csv_timestamp_column"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.CSVTimestampColumn = str.Value
}
}
}

if node, ok := tbl.Fields["csv_timestamp_format"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.CSVTimestampFormat = str.Value
}
}
}

if node, ok := tbl.Fields["csv_header_row_count"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
iVal, err := strconv.Atoi(str.Value)
c.CSVHeaderRowCount = iVal
if err != nil {
return nil, fmt.Errorf("E! parsing to int: %v", err)
}
}
}
}

if node, ok := tbl.Fields["csv_skip_rows"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
iVal, err := strconv.Atoi(str.Value)
c.CSVSkipRows = iVal
if err != nil {
return nil, fmt.Errorf("E! parsing to int: %v", err)
}
}
}
}

if node, ok := tbl.Fields["csv_skip_columns"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
iVal, err := strconv.Atoi(str.Value)
c.CSVSkipColumns = iVal
if err != nil {
return nil, fmt.Errorf("E! parsing to int: %v", err)
}
}
}
}

if node, ok := tbl.Fields["csv_trim_space"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.Boolean); ok {
//for config with no quotes
val, err := strconv.ParseBool(str.Value)
c.CSVTrimSpace = val
if err != nil {
return nil, fmt.Errorf("E! parsing to bool: %v", err)
}
}
}
}

c.MetricName = name

delete(tbl.Fields, "data_format")
Expand All @@ -1469,6 +1583,14 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
delete(tbl.Fields, "grok_custom_patterns")
delete(tbl.Fields, "grok_custom_pattern_files")
delete(tbl.Fields, "grok_timezone")
delete(tbl.Fields, "csv_data_columns")
delete(tbl.Fields, "csv_tag_columns")
delete(tbl.Fields, "csv_field_columns")
delete(tbl.Fields, "csv_name_column")
delete(tbl.Fields, "csv_timestamp_column")
delete(tbl.Fields, "csv_timestamp_format")
delete(tbl.Fields, "csv_delimiter")
delete(tbl.Fields, "csv_header")

return parsers.NewParser(c)
}
Expand Down
Loading