Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scope parameter to /api/search/tags #2282

Merged
merged 24 commits into from
Apr 20, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## main / unreleased

* [ENHANCEMENT] Add `scope` parameter to `/api/search/tags` [#2282](https://github.com/grafana/tempo/pull/2282) (@joe-elliott)
Create new endpoint `/api/v2/search/tags` that returns all tags organized by scope.
* [ENHANCEMENT] Extend `/flush` to support flushing a single tenant [#2260](https://github.com/grafana/tempo/pull/2260) (@kvrhdn)

## v2.1.0-rc.0 / 2023-04-12
Expand Down
4 changes: 4 additions & 0 deletions cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ func (t *App) initQuerier() (services.Service, error) {
searchTagsHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.SearchTagsHandler))
t.Server.HTTP.Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTags)), searchTagsHandler)

searchTagsV2Handler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.SearchTagsV2Handler))
t.Server.HTTP.Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTagsV2)), searchTagsV2Handler)

searchTagValuesHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.SearchTagValuesHandler))
t.Server.HTTP.Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValues)), searchTagValuesHandler)

Expand Down Expand Up @@ -296,6 +299,7 @@ func (t *App) initQueryFrontend() (services.Service, error) {
// http search endpoints
t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearch), searchHandler)
t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTags), searchHandler)
t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTagsV2), searchHandler)
t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValues), searchHandler)
t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValuesV2), searchHandler)

Expand Down
60 changes: 54 additions & 6 deletions docs/sources/tempo/api_docs/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ These endpoints are exposed both when running Tempo in microservices and monolit
| [Querying traces by id](#query) | Query-frontend | HTTP | `GET /api/traces/<traceID>` |
| [Searching traces](#search) | Query-frontend | HTTP | `GET /api/search?<params>` |
| [Search tag names](#search-tags) | Query-frontend | HTTP | `GET /api/search/tags` |
| [Search tag names V2](#search-tags-v2) | Query-frontend | HTTP | `GET /api/v2/search/tags` |
| [Search tag values](#search-tag-values) | Query-frontend | HTTP | `GET /api/search/tag/<tag>/values` |
| [Search tag values V2](#search-tag-values-v2) | Query-frontend | HTTP | `GET /api/v2/search/tag/<tag>/values` |
| [Query Echo Endpoint](#query-echo-endpoint) | Query-frontend | HTTP | `GET /api/echo` |
Expand Down Expand Up @@ -248,10 +249,11 @@ $ curl -G -s http://localhost:3200/api/search --data-urlencode 'tags=service.nam
Ingester configuration `complete_block_timeout` affects how long tags are available for search.

This endpoint retrieves all discovered tag names that can be used in search. The endpoint is available in the query frontend service in
a microservices deployment, or the Tempo endpoint in a monolithic mode deployment.
a microservices deployment, or the Tempo endpoint in a monolithic mode deployment. The tags endpoint takes a scope that controls the kinds
of tags or attributes returned. If nothing is provided, the endpoint will return all resource and span tags.

```
GET /api/search/tags
GET /api/search/tags?scope=<resource|span|intrinsic>
zalegrala marked this conversation as resolved.
Show resolved Hide resolved
```

#### Example
Expand All @@ -260,7 +262,7 @@ Example of how to query Tempo using curl.
This query will return all discovered tag names.

```bash
$ curl -G -s http://localhost:3200/api/search/tags | jq
$ curl -G -s http://localhost:3200/api/search/tags?scope=span | jq
{
"tagNames": [
"host.name",
Expand All @@ -270,10 +272,7 @@ $ curl -G -s http://localhost:3200/api/search/tags | jq
"ip",
"load_generator.seq_num",
"name",
"opencensus.exporterversion",
"region",
"root.name",
"root.service.name",
"root_cause_error",
"sampler.param",
"sampler.type",
Expand All @@ -284,6 +283,55 @@ $ curl -G -s http://localhost:3200/api/search/tags | jq
}
```

### Search tags V2

Ingester configuration `complete_block_timeout` affects how long tags are available for search.

This endpoint retrieves all discovered tag names that can be used in search. The endpoint is available in the query frontend service in
a microservices deployment, or the Tempo endpoint in a monolithic mode deployment. The tags endpoint takes a scope that controls the kinds
of tags or attributes returned. If nothing is provided, the endpoint will return all resource and span tags.

```
GET /api/v2/search/tags?scope=<resource|span|intrinsic>
```

#### Example

Example of how to query Tempo using curl.
This query will return all discovered tag names.

```bash
$ curl -G -s http://localhost:3200/api/v2/search/tags | jq
{
"scopes": [
{
"name": "span",
"tags": [
"article.count",
"http.flavor",
"http.method",
]
},
{
"name": "resource",
"tags": [
"k6",
"service.name"
]
},
{
"name": "intrinsic",
"tags": [
"duration",
"kind",
"name",
"status"
]
}
]
}
```

### Search tag values

Ingester configuration `complete_block_timeout` affects how long tags are available for search.
Expand Down
20 changes: 19 additions & 1 deletion modules/ingester/ingester_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,25 @@ func (i *Ingester) SearchTags(ctx context.Context, req *tempopb.SearchTagsReques
return &tempopb.SearchTagsResponse{}, nil
}

res, err := inst.SearchTags(ctx)
res, err := inst.SearchTags(ctx, req.Scope)
if err != nil {
return nil, err
}

return res, nil
}

func (i *Ingester) SearchTagsV2(ctx context.Context, req *tempopb.SearchTagsRequest) (*tempopb.SearchTagsV2Response, error) {
instanceID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}
inst, ok := i.getInstanceByID(instanceID)
if !ok || inst == nil {
return &tempopb.SearchTagsV2Response{}, nil
}

res, err := inst.SearchTagsV2(ctx, req.Scope)
if err != nil {
return nil, err
}
Expand Down
71 changes: 69 additions & 2 deletions modules/ingester/instance_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sort"
"sync"

"github.com/go-kit/log/level"
"github.com/grafana/tempo/pkg/api"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/opentracing/opentracing-go"
ot_log "github.com/opentracing/opentracing-go/log"
"github.com/uber-go/atomic"
"github.com/weaveworks/common/user"
)

Expand Down Expand Up @@ -202,12 +204,25 @@ func (i *instance) searchLocalBlocks(ctx context.Context, req *tempopb.SearchReq
}
}

func (i *instance) SearchTags(ctx context.Context) (*tempopb.SearchTagsResponse, error) {
func (i *instance) SearchTags(ctx context.Context, scope string) (*tempopb.SearchTagsResponse, error) {
userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}

// check if it's the special intrinsic scope
if scope == api.ParamScopeIntrinsic {
return &tempopb.SearchTagsResponse{
TagNames: search.GetVirtualIntrinsicValues(),
}, nil
}

// parse for normal scopes
attributeScope := traceql.AttributeScopeFromString(scope)
if attributeScope == traceql.AttributeScopeUnknown {
return nil, fmt.Errorf("unknown scope: %s", scope)
}

limit := i.limiter.limits.MaxBytesPerTagValuesQuery(userID)
distinctValues := util.NewDistinctStringCollector(limit)

Expand All @@ -218,7 +233,7 @@ func (i *instance) SearchTags(ctx context.Context) (*tempopb.SearchTagsResponse,
if dv.Exceeded() {
return nil
}
err = s.SearchTags(ctx, dv.Collect, common.DefaultSearchOptions())
err = s.SearchTags(ctx, attributeScope, dv.Collect, common.DefaultSearchOptions())
if err != nil && err != common.ErrUnsupported {
return fmt.Errorf("unexpected error searching tags: %w", err)
}
Expand Down Expand Up @@ -253,6 +268,58 @@ func (i *instance) SearchTags(ctx context.Context) (*tempopb.SearchTagsResponse,
}, nil
}

// SearchTagsV2 calls SearchTags for each scope and returns the results.
func (i *instance) SearchTagsV2(ctx context.Context, scope string) (*tempopb.SearchTagsV2Response, error) {
scopes := []string{scope}
if scope == "" {
// start with intrinsic scope and all traceql attribute scopes
atts := traceql.AllAttributeScopes()
scopes = make([]string, 0, len(atts)+1) // +1 for intrinsic

scopes = append(scopes, api.ParamScopeIntrinsic)
for _, att := range atts {
scopes = append(scopes, att.String())
}
}
resps := make([]*tempopb.SearchTagsResponse, len(scopes))

overallError := atomic.NewError(nil)
wg := sync.WaitGroup{}
for idx := range scopes {
resps[idx] = &tempopb.SearchTagsResponse{}

wg.Add(1)
go func(scope string, ret **tempopb.SearchTagsResponse) {
defer wg.Done()

resp, err := i.SearchTags(ctx, scope)
if err != nil {
overallError.Store(fmt.Errorf("error searching tags: %s, %w", scope, err))
return
}

*ret = resp
}(scopes[idx], &resps[idx])
}
wg.Wait()

err := overallError.Load()
if err != nil {
return nil, err
}

// build response
resp := &tempopb.SearchTagsV2Response{}
for idx := range resps {
resp.Scopes = append(resp.Scopes, &tempopb.SearchTagsV2Scope{
Name: scopes[idx],
Tags: resps[idx].TagNames,
})
}

return resp, nil
}

func (i *instance) SearchTagValues(ctx context.Context, tagName string) (*tempopb.SearchTagValuesResponse, error) {
userID, err := user.ExtractOrgID(ctx)
if err != nil {
Expand Down
32 changes: 28 additions & 4 deletions modules/ingester/instance_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,17 +205,41 @@ func TestInstanceSearchTags(t *testing.T) {

// nolint:revive,unparam
func testSearchTagsAndValues(t *testing.T, ctx context.Context, i *instance, tagName string, expectedTagValues []string) {
sr, err := i.SearchTags(ctx)
sr, err := i.SearchTags(ctx, "")
require.NoError(t, err)
assert.Contains(t, sr.TagNames, tagName)

sr, err = i.SearchTags(ctx, "span")
require.NoError(t, err)
assert.Contains(t, sr.TagNames, tagName)

sr, err = i.SearchTags(ctx, "resource")
require.NoError(t, err)
assert.NotContains(t, sr.TagNames, tagName) // tags are added to h the spans and not resources so they should not be returned

srv, err := i.SearchTagValues(ctx, tagName)
require.NoError(t, err)

sort.Strings(srv.TagValues)
sort.Strings(expectedTagValues)
assert.Contains(t, sr.TagNames, tagName)
sort.Strings(srv.TagValues)
assert.Equal(t, expectedTagValues, srv.TagValues)
}

// TestInstanceSearchTagsSpecialCases tess that SearchTags errors on an unknown scope and
// returns known instrinics for the "intrinsic" scope
func TestInstanceSearchTagsSpecialCases(t *testing.T) {
i, _ := defaultInstance(t)
userCtx := user.InjectOrgID(context.Background(), "fake")

resp, err := i.SearchTags(userCtx, "foo")
require.Error(t, err)
require.Nil(t, resp)

resp, err = i.SearchTags(userCtx, "intrinsic")
require.NoError(t, err)
require.Equal(t, []string{"duration", "kind", "name", "status"}, resp.TagNames)
}

// TestInstanceSearchMaxBytesPerTagValuesQueryReturnsPartial confirms that SearchTagValues returns
// partial results if the bytes of the found tag value exceeds the MaxBytesPerTagValuesQuery limit
func TestInstanceSearchMaxBytesPerTagValuesQueryReturnsPartial(t *testing.T) {
Expand Down Expand Up @@ -385,7 +409,7 @@ func TestInstanceSearchDoesNotRace(t *testing.T) {
go concurrent(func() {
// SearchTags queries now require userID in ctx
ctx := user.InjectOrgID(context.Background(), "test")
_, err := i.SearchTags(ctx)
_, err := i.SearchTags(ctx, "")
require.NoError(t, err, "error getting search tags")
})

Expand Down
4 changes: 2 additions & 2 deletions modules/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,15 +673,15 @@ func benchmarkInstanceSearch(b testing.TB) {
if rt, ok := b.(*testing.B); ok {
rt.ResetTimer()
for i := 0; i < rt.N; i++ {
resp, err := instance.SearchTags(ctx)
resp, err := instance.SearchTags(ctx, "")
require.NoError(b, err)
require.NotNil(b, resp)
}
return
}

for i := 0; i < 100; i++ {
resp, err := instance.SearchTags(ctx)
resp, err := instance.SearchTags(ctx, "")
require.NoError(b, err)
require.NotNil(b, resp)
}
Expand Down
Loading