Skip to content

Commit

Permalink
[processor/transform] Add ability to limit attributes (#9552)
Browse files Browse the repository at this point in the history
* Add truncation function

* Add TODO

* Updated changelog

* Updated changelog

* Update name to truncateAll

* Fixed README

* add limit function

* Updated changelog

* Updated go.sum

* Add check to avoid loop

* added checked for negative limit

* fix README.md

* Fix lint issue

* Add link to issue for writing logs

Co-authored-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
TylerHelmuth and bogdandrutu authored May 6, 2022
1 parent 789297f commit f21f89e
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
- `datadogexporter`: Add `api.fail_on_invalid_key` to fail fast if api key is invalid (#9426)
- `transformprocessor`: Add support for functions to validate parameters (#9563)
- `googlecloudexporter`: Add GCP cloud logging exporter (#9679)
- `transformprocessor`: Add new `limit` function to allow limiting the number of items in a map, such as the number of attributes in `attributes` or `resource.attributes` (#9552)
- `processor/attributes`: Support attributes set by server authenticator (#9420)
- `datadogexporter`: Experimental support for Exponential Histograms with delta aggregation temporality (#8350)

Expand Down
12 changes: 9 additions & 3 deletions processor/transformprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ it references an unset map value, there will be no action.
- `keep_keys(target, string...)` - `target` is a path expression to a map type field. The map will be mutated to only contain
the fields specified by the list of strings. e.g., `keep_keys(attributes, "http.method")`, `keep_keys(attributes, "http.method", "http.route")`

- `truncate_all(target, limit)` - `target` is a path expression to a map type field. `limit` is an integer. The map will be mutated such that all string values are truncated to the limit. e.g., `truncate(attributes, 100)` will truncate all string values in `attributes` such that all string values have less than or equal to 100 characters. Non-string values are ignored.
- `truncate_all(target, limit)` - `target` is a path expression to a map type field. `limit` is an integer. The map will be mutated such that all string values are truncated to the limit. e.g., `truncate_all(attributes, 100)` will truncate all string values in `attributes` such that all string values have less than or equal to 100 characters. Non-string values are ignored.

- `limit(target, limit)` - `target` is a path expression to a map type field. `limit` is an integer. The map will be mutated such that the number of items does not exceed the limit. e.g., `limit(attributes, 100)` will limit `attributes` to no more than 100 items. Which items are dropped is random.

Supported where operations:
- `==` - matches telemetry where the values are equal to each other
Expand Down Expand Up @@ -50,6 +52,8 @@ processors:
- set(status.code, 1) where attributes["http.path"] == "/health"
- keep_keys(resource.attributes, "service.name", "service.namespace", "cloud.region")
- set(name, attributes["http.route"])
- limit(attributes, 100)
- limit(resource.attributes, 100)
- truncate_all(attributes, 4096)
- truncate_all(resource.attributes, 4096)
service:
Expand Down Expand Up @@ -77,5 +81,7 @@ All spans
1) Set status code to OK for all spans with a path `/health`
2) Keep only `service.name`, `service.namespace`, `cloud.region` resource attributes
3) Set `name` to the `http.route` attribute if it is set
4) Truncate all span attributes such that no string value has more than 4096 characters.
5) Truncate all resource attributes such that no string value has more than 4096 characters.
4) Limit all span attributes such that each span has no more than 100 attributes.
5) Limit all resource attributes such that each resource no more than 100 attributes.
6) Truncate all span attributes such that no string value has more than 4096 characters.
7) Truncate all resource attributes such that no string value has more than 4096 characters.
33 changes: 33 additions & 0 deletions processor/transformprocessor/internal/common/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var registry = map[string]interface{}{
"keep_keys": keepKeys,
"set": set,
"truncate_all": truncateAll,
"limit": limit,
}

type PathExpressionParser func(*Path) (GetSetter, error)
Expand Down Expand Up @@ -96,6 +97,38 @@ func truncateAll(target GetSetter, limit int64) (ExprFunc, error) {
})
target.Set(ctx, updated)
// TODO: Write log when truncation is performed
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/9730
}
return nil
}, nil
}

func limit(target GetSetter, limit int64) (ExprFunc, error) {
return func(ctx TransformContext) interface{} {
val := target.Get(ctx)
if val == nil {
return nil
}

if attrs, ok := val.(pcommon.Map); ok {
if int64(attrs.Len()) <= limit {
return nil
}

updated := pcommon.NewMap()
updated.EnsureCapacity(attrs.Len())
count := int64(0)
attrs.Range(func(key string, val pcommon.Value) bool {
if count < limit {
updated.Insert(key, val)
count++
return true
}
return false
})
target.Set(ctx, updated)
// TODO: Write log when limiting is performed
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/9730
}
return nil
}, nil
Expand Down
20 changes: 20 additions & 0 deletions processor/transformprocessor/internal/common/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,26 @@ func Test_newFunctionCall_invalid(t *testing.T) {
},
},
},
{
name: "not int",
inv: Invocation{
Function: "limit",
Arguments: []Value{
{
Path: &Path{
Fields: []Field{
{
Name: "name",
},
},
},
},
{
String: strp("not an int"),
},
},
},
},
{
name: "function call returns error",
inv: Invocation{
Expand Down
114 changes: 114 additions & 0 deletions processor/transformprocessor/internal/traces/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,120 @@ func Test_newFunctionCall(t *testing.T) {
attrs.CopyTo(span.Attributes())
},
},
{
name: "limit attributes",
inv: common.Invocation{
Function: "limit",
Arguments: []common.Value{
{
Path: &common.Path{
Fields: []common.Field{
{
Name: "attributes",
},
},
},
},
{
Int: intp(1),
},
},
},
want: func(span ptrace.Span) {
input.CopyTo(span)
span.Attributes().Clear()
attrs := pcommon.NewMap()
attrs.InsertString("test", "hello world")
attrs.CopyTo(span.Attributes())
},
},
{
name: "limit attributes zero",
inv: common.Invocation{
Function: "limit",
Arguments: []common.Value{
{
Path: &common.Path{
Fields: []common.Field{
{
Name: "attributes",
},
},
},
},
{
Int: intp(0),
},
},
},
want: func(span ptrace.Span) {
input.CopyTo(span)
span.Attributes().Clear()
attrs := pcommon.NewMap()
attrs.CopyTo(span.Attributes())
},
},
{
name: "limit attributes nothing",
inv: common.Invocation{
Function: "limit",
Arguments: []common.Value{
{
Path: &common.Path{
Fields: []common.Field{
{
Name: "attributes",
},
},
},
},
{
Int: intp(100),
},
},
},
want: func(span ptrace.Span) {
input.CopyTo(span)
span.Attributes().Clear()
attrs := pcommon.NewMap()
attrs.InsertString("test", "hello world")
attrs.InsertInt("test2", 3)
attrs.InsertBool("test3", true)
attrs.CopyTo(span.Attributes())
},
},
{
name: "limit resource attributes",
inv: common.Invocation{
Function: "limit",
Arguments: []common.Value{
{
Path: &common.Path{
Fields: []common.Field{
{
Name: "resource",
},
{
Name: "attributes",
},
},
},
},
{
Int: intp(1),
},
},
},
want: func(span ptrace.Span) {
input.CopyTo(span)
span.Attributes().Clear()
attrs := pcommon.NewMap()
attrs.InsertString("test", "hello world")
attrs.InsertInt("test2", 3)
attrs.InsertBool("test3", true)
attrs.CopyTo(span.Attributes())
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down

0 comments on commit f21f89e

Please sign in to comment.