Skip to content

Commit

Permalink
Add override to limit number of blocks inspected in tag value search (#…
Browse files Browse the repository at this point in the history
…2358)

* Add override to limit number of blocks inspected in tag value search

* Docs and changelog

* Typo
  • Loading branch information
mapno committed Apr 21, 2023
1 parent e72ccf6 commit e2bc232
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Create new endpoint `/api/v2/search/tags` that returns all tags organized by scope.
* [ENHANCEMENT] Ability to toggle off latency or count metrics in metrics-generator [#2070](https://github.com/grafana/tempo/pull/2070) (@AlexDHoffer)
* [ENHANCEMENT] Extend `/flush` to support flushing a single tenant [#2260](https://github.com/grafana/tempo/pull/2260) (@kvrhdn)
* [ENHANCEMENT] Add override to limit number of blocks inspected in tag value search [#2358](https://github.com/grafana/tempo/pull/2358) (@mapno)

## v2.1.0-rc.0 / 2023-04-12
* [BUGFIX] tempodb integer divide by zero error [#2167](https://github.com/grafana/tempo/issues/2167)
Expand Down
7 changes: 7 additions & 0 deletions docs/sources/tempo/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1164,6 +1164,13 @@ overrides:
# A value of 0 disables the limit.
[max_bytes_per_tag_values_query: <int> | default = 5000000 (5MB) ]

# Maximum number of blocks to be inspected for a tag-values query. The
# tag-values query is used mainly to populate the autocomplete dropdown. This
# limit protects the system from long block lists in the ingesters.
# This override limit is used by the ingester.
# A value of 0 disables the limit.
[max_blocks_per_tag_values_query: <int> | default = 0 (disabled) ]

# Generic forwarding configuration

# Per-user configuration of generic forwarder feature. Each forwarder in the list
Expand Down
22 changes: 22 additions & 0 deletions modules/ingester/instance_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,13 +329,24 @@ func (i *instance) SearchTagValues(ctx context.Context, tagName string) (*tempop
limit := i.limiter.limits.MaxBytesPerTagValuesQuery(userID)
distinctValues := util.NewDistinctStringCollector(limit)

var inspectedBlocks, maxBlocks int
if limit := i.limiter.limits.MaxBlocksPerTagValuesQuery(userID); limit > 0 {
maxBlocks = limit
}

search := func(s common.Searcher, dv *util.DistinctStringCollector) error {
if maxBlocks > 0 && inspectedBlocks >= maxBlocks {
return nil
}

if s == nil {
return nil
}
if dv.Exceeded() {
return nil
}

inspectedBlocks++
err = s.SearchTagValues(ctx, tagName, dv.Collect, common.DefaultSearchOptions())
if err != nil && err != common.ErrUnsupported {
return fmt.Errorf("unexpected error searching tag values (%s): %w", tagName, err)
Expand Down Expand Up @@ -417,11 +428,22 @@ func (i *instance) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTag
return distinctValues.Collect(tv)
}

var inspectedBlocks, maxBlocks int
if limit := i.limiter.limits.MaxBlocksPerTagValuesQuery(userID); limit > 0 {
maxBlocks = limit
}

search := func(s common.Searcher, dv *util.DistinctValueCollector[tempopb.TagValue]) error {
if maxBlocks > 0 && inspectedBlocks >= maxBlocks {
return nil
}

if s == nil || dv.Exceeded() {
return nil
}

inspectedBlocks++

err = s.SearchTagValuesV2(ctx, tag, cb, common.DefaultSearchOptions())
if err != nil && err != common.ErrUnsupported {
return fmt.Errorf("unexpected error searching tag values v2 (%s): %w", tag, err)
Expand Down
55 changes: 55 additions & 0 deletions modules/ingester/instance_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,61 @@ func TestInstanceSearchMaxBytesPerTagValuesQueryReturnsPartial(t *testing.T) {
require.Equal(t, 2, len(resp.TagValues)) // Only two values of the form "bar123" fit in the 10 byte limit above.
}

// TestInstanceSearchMaxBlocksPerTagValuesQueryReturnsPartial confirms that SearchTagValues and
// SearchTagValuesV2 return partial results when there are more blocks to inspect than the
// MaxBlocksPerTagValuesQuery limit allows, and full results once the limit is removed.
func TestInstanceSearchMaxBlocksPerTagValuesQueryReturnsPartial(t *testing.T) {
	// Start with a limit of a single block per tag-values query.
	limits, err := overrides.NewOverrides(overrides.Limits{
		MaxBlocksPerTagValuesQuery: 1,
	})
	// Setup failures make the rest of the test meaningless, so stop immediately.
	require.NoError(t, err, "unexpected error creating limits")
	limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

	tempDir := t.TempDir()

	ingester, _, _ := defaultIngester(t, tempDir)
	ingester.limiter = limiter
	i, err := ingester.getOrCreateInstance("fake")
	require.NoError(t, err, "unexpected error creating new instance")

	tagKey := "foo"

	// First batch of traces.
	// NOTE(review): the assertions below assume each writeTracesForSearch call
	// produces 100 distinct tag values — confirm against the helper.
	_, _ = writeTracesForSearch(t, i, tagKey, "bar", true)

	// Cut the head block so the two batches end up in different blocks.
	blockID, err := i.CutBlockIfReady(0, 0, true)
	require.NoError(t, err)
	assert.NotEqual(t, uuid.Nil, blockID)

	// Second batch of traces, written after the cut.
	_, _ = writeTracesForSearch(t, i, tagKey, "another-bar", true)

	userCtx := user.InjectOrgID(context.Background(), "fake")

	// With the limit at one block, only one batch worth of values is expected.
	respV1, err := i.SearchTagValues(userCtx, tagKey)
	require.NoError(t, err)
	assert.Len(t, respV1.TagValues, 100)

	respV2, err := i.SearchTagValuesV2(userCtx, &tempopb.SearchTagValuesRequest{TagName: fmt.Sprintf(".%s", tagKey)})
	require.NoError(t, err)
	assert.Len(t, respV2.TagValues, 100)

	// Now test with unlimited blocks: the zero-value Limits leave the block limit disabled.
	limits, err = overrides.NewOverrides(overrides.Limits{})
	require.NoError(t, err, "unexpected error creating limits")

	i.limiter = NewLimiter(limits, &ringCountMock{count: 1}, 1)

	// Both batches should now be visible.
	respV1, err = i.SearchTagValues(userCtx, tagKey)
	require.NoError(t, err)
	assert.Len(t, respV1.TagValues, 200)

	respV2, err = i.SearchTagValuesV2(userCtx, &tempopb.SearchTagValuesRequest{TagName: fmt.Sprintf(".%s", tagKey)})
	require.NoError(t, err)
	assert.Len(t, respV2.TagValues, 200)
}

// writes traces to the given instance along with search data. returns
// ids expected to be returned from a tag search and strings expected to
// be returned from a tag value search
Expand Down
6 changes: 5 additions & 1 deletion modules/overrides/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
MetricMaxGlobalTracesPerUser = "max_global_traces_per_user"
MetricMaxBytesPerTrace = "max_bytes_per_trace"
MetricMaxBytesPerTagValuesQuery = "max_bytes_per_tag_values_query"
MetricMaxBlocksPerTagValuesQuery = "max_blocks_per_tag_values_query"
MetricIngestionRateLimitBytes = "ingestion_rate_limit_bytes"
MetricIngestionBurstSizeBytes = "ingestion_burst_size_bytes"
MetricBlockRetention = "block_retention"
Expand Down Expand Up @@ -74,7 +75,8 @@ type Limits struct {
BlockRetention model.Duration `yaml:"block_retention" json:"block_retention"`

// Querier and Ingester enforced limits.
MaxBytesPerTagValuesQuery int `yaml:"max_bytes_per_tag_values_query" json:"max_bytes_per_tag_values_query"`
MaxBytesPerTagValuesQuery int `yaml:"max_bytes_per_tag_values_query" json:"max_bytes_per_tag_values_query"`
MaxBlocksPerTagValuesQuery int `yaml:"max_blocks_per_tag_values_query" json:"max_blocks_per_tag_values_query"`

// QueryFrontend enforced limits
MaxSearchDuration model.Duration `yaml:"max_search_duration" json:"max_search_duration"`
Expand Down Expand Up @@ -102,6 +104,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {

// Querier limits
f.IntVar(&l.MaxBytesPerTagValuesQuery, "querier.max-bytes-per-tag-values-query", 50e5, "Maximum size of response for a tag-values query. Used mainly to limit large the number of values associated with a particular tag")
f.IntVar(&l.MaxBlocksPerTagValuesQuery, "querier.max-blocks-per-tag-values-query", 0, "Maximum number of blocks to query for a tag-values query. 0 to disable.")

f.StringVar(&l.PerTenantOverrideConfig, "limits.per-user-override-config", "", "File name of per-user overrides.")
_ = l.PerTenantOverridePeriod.Set("10s")
Expand All @@ -117,6 +120,7 @@ func (l *Limits) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.MaxGlobalTracesPerUser), MetricMaxGlobalTracesPerUser)
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.MaxBytesPerTrace), MetricMaxBytesPerTrace)
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.MaxBytesPerTagValuesQuery), MetricMaxBytesPerTagValuesQuery)
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.MaxBlocksPerTagValuesQuery), MetricMaxBlocksPerTagValuesQuery)
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.IngestionRateLimitBytes), MetricIngestionRateLimitBytes)
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.IngestionBurstSizeBytes), MetricIngestionBurstSizeBytes)
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.BlockRetention), MetricBlockRetention)
Expand Down
5 changes: 5 additions & 0 deletions modules/overrides/overrides.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,11 @@ func (o *Overrides) MaxBytesPerTagValuesQuery(userID string) int {
return o.getOverridesForUser(userID).MaxBytesPerTagValuesQuery
}

// MaxBlocksPerTagValuesQuery reports how many blocks a tag-values query is
// allowed to inspect for the given user, as configured in that user's overrides.
func (o *Overrides) MaxBlocksPerTagValuesQuery(userID string) int {
	userLimits := o.getOverridesForUser(userID)
	return userLimits.MaxBlocksPerTagValuesQuery
}

// IngestionRateLimitBytes is the ingestion rate limit, in bytes per second, allowed for this tenant.
func (o *Overrides) IngestionRateLimitBytes(userID string) float64 {
return float64(o.getOverridesForUser(userID).IngestionRateLimitBytes)
Expand Down

0 comments on commit e2bc232

Please sign in to comment.