Skip to content

Commit

Permalink
feat: API: Expose optional label matcher for label names API (#11982)
Browse files Browse the repository at this point in the history
Co-authored-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
yuri-rs and cyriltovena committed Jun 3, 2024
1 parent 09faea8 commit 8084259
Show file tree
Hide file tree
Showing 20 changed files with 322 additions and 207 deletions.
1 change: 1 addition & 0 deletions docs/sources/reference/loki-http-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ It accepts the following query parameters in the URL:
- `start`: The start time for the query as a nanosecond Unix epoch. Defaults to 6 hours ago.
- `end`: The end time for the query as a nanosecond Unix epoch. Defaults to now.
- `since`: A `duration` used to calculate `start` relative to `end`. If `end` is in the future, `start` is calculated as this duration before now. Any value specified for `start` supersedes this parameter.
- `query`: A set of log stream selector that selects the streams to match and return label names. Example: `{"app": "myapp", "environment": "dev"}`

In microservices mode, `/loki/api/v1/labels` is exposed by the querier.

Expand Down
19 changes: 17 additions & 2 deletions pkg/indexgateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,22 @@ func (g *Gateway) LabelNamesForMetricName(ctx context.Context, req *logproto.Lab
if err != nil {
return nil, err
}
names, err := g.indexQuerier.LabelNamesForMetricName(ctx, instanceID, req.From, req.Through, req.MetricName)
var matchers []*labels.Matcher
// An empty matchers string cannot be parsed,
// therefore we check the string representation of the matchers.
if req.Matchers != syntax.EmptyMatchers {
expr, err := syntax.ParseExprWithoutValidation(req.Matchers)
if err != nil {
return nil, err
}

matcherExpr, ok := expr.(*syntax.MatchersExpr)
if !ok {
return nil, fmt.Errorf("invalid label matchers found of type %T", expr)
}
matchers = matcherExpr.Mts
}
names, err := g.indexQuerier.LabelNamesForMetricName(ctx, instanceID, req.From, req.Through, req.MetricName, matchers...)
if err != nil {
return nil, err
}
Expand All @@ -308,7 +323,7 @@ func (g *Gateway) LabelValuesForMetricName(ctx context.Context, req *logproto.La
}
var matchers []*labels.Matcher
// An empty matchers string cannot be parsed,
// therefore we check the string representation of the the matchers.
// therefore we check the string representation of the matchers.
if req.Matchers != syntax.EmptyMatchers {
expr, err := syntax.ParseExprWithoutValidation(req.Matchers)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1147,7 +1147,7 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp
return
}
} else {
storeValues, err = cs.LabelNamesForMetricName(ctx, userID, from, through, "logs")
storeValues, err = cs.LabelNamesForMetricName(ctx, userID, from, through, "logs", matchers...)
if err != nil {
return
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ func (s *mockStore) LabelValuesForMetricName(_ context.Context, _ string, _, _ m
return []string{"val1", "val2"}, nil
}

func (s *mockStore) LabelNamesForMetricName(_ context.Context, _ string, _, _ model.Time, _ string) ([]string, error) {
func (s *mockStore) LabelNamesForMetricName(_ context.Context, _ string, _, _ model.Time, _ string, _ ...*labels.Matcher) ([]string, error) {
return nil, nil
}

Expand Down
395 changes: 226 additions & 169 deletions pkg/logproto/logproto.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/logproto/logproto.proto
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ message LabelNamesForMetricNameRequest {
(gogoproto.customtype) = "github.com/prometheus/common/model.Time",
(gogoproto.nullable) = false
];
string matchers = 4;
}

// TODO(owen-d): fix. This will break rollouts as soon as the internal repr is changed.
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func (q *SingleTenantQuerier) Label(ctx context.Context, req *logproto.LabelRequ
if req.Values {
storeValues, err = q.store.LabelValuesForMetricName(ctx, userID, from, through, "logs", req.Name, matchers...)
} else {
storeValues, err = q.store.LabelNamesForMetricName(ctx, userID, from, through, "logs")
storeValues, err = q.store.LabelNamesForMetricName(ctx, userID, from, through, "logs", matchers...)
}
return err
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/querier_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func (s *storeMock) LabelValuesForMetricName(ctx context.Context, userID string,
return args.Get(0).([]string), args.Error(1)
}

func (s *storeMock) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
func (s *storeMock) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, _ ...*labels.Matcher) ([]string, error) {
args := s.Called(ctx, userID, from, through, metricName)
return args.Get(0).([]string), args.Error(1)
}
Expand Down
40 changes: 33 additions & 7 deletions pkg/querier/queryrange/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ func Test_codec_EncodeDecodeRequest(t *testing.T) {
}, false},
{"labels", func() (*http.Request, error) {
return http.NewRequest(http.MethodGet,
fmt.Sprintf(`/label?start=%d&end=%d`, start.UnixNano(), end.UnixNano()), nil)
}, NewLabelRequest(start, end, "", "", "/label"),
fmt.Sprintf(`/label?start=%d&end=%d&query={foo="bar"}`, start.UnixNano(), end.UnixNano()), nil)
}, NewLabelRequest(start, end, `{foo="bar"}`, "", "/label"),
false},
{"label_values", func() (*http.Request, error) {
req, err := http.NewRequest(http.MethodGet,
Expand Down Expand Up @@ -875,22 +875,26 @@ func Test_codec_series_EncodeRequest(t *testing.T) {

func Test_codec_labels_EncodeRequest(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "1")
toEncode := NewLabelRequest(start, end, "", "", "/loki/api/v1/labels")

// Test labels endpoint
toEncode := NewLabelRequest(start, end, `{foo="bar"}`, "", "/loki/api/v1/labels")
got, err := DefaultCodec.EncodeRequest(ctx, toEncode)
require.NoError(t, err)
require.Equal(t, ctx, got.Context())
require.Equal(t, "/loki/api/v1/labels", got.URL.Path)
require.Equal(t, fmt.Sprintf("%d", start.UnixNano()), got.URL.Query().Get("start"))
require.Equal(t, fmt.Sprintf("%d", end.UnixNano()), got.URL.Query().Get("end"))
require.Equal(t, `{foo="bar"}`, got.URL.Query().Get("query"))

// testing a full roundtrip
req, err := DefaultCodec.DecodeRequest(context.TODO(), got, nil)
require.NoError(t, err)
require.Equal(t, toEncode.Start, req.(*LabelRequest).Start)
require.Equal(t, toEncode.End, req.(*LabelRequest).End)
require.Equal(t, toEncode.Query, req.(*LabelRequest).Query)
require.Equal(t, "/loki/api/v1/labels", req.(*LabelRequest).Path())

// Test labels values endpoint
// Test label values endpoint
toEncode = NewLabelRequest(start, end, `{foo="bar"}`, "__name__", "/loki/api/v1/label/__name__/values")
got, err = DefaultCodec.EncodeRequest(ctx, toEncode)
require.NoError(t, err)
Expand All @@ -912,21 +916,43 @@ func Test_codec_labels_EncodeRequest(t *testing.T) {

func Test_codec_labels_DecodeRequest(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "1")
u, err := url.Parse(`/loki/api/v1/label/__name__/values?start=1575285010000000010&end=1575288610000000010&query={foo="bar"}`)

// Test labels endpoint
u, err := url.Parse(`/loki/api/v1/labels?start=1575285010000000010&end=1575288610000000010&query={foo="bar"}`)
require.NoError(t, err)

r := &http.Request{URL: u}
r = mux.SetURLVars(r, map[string]string{"name": "__name__"})
req, err := DefaultCodec.DecodeRequest(context.TODO(), r, nil)
require.NoError(t, err)
require.Equal(t, start, *req.(*LabelRequest).Start)
require.Equal(t, end, *req.(*LabelRequest).End)
require.Equal(t, `{foo="bar"}`, req.(*LabelRequest).Query)
require.Equal(t, "/loki/api/v1/label/__name__/values", req.(*LabelRequest).Path())
require.Equal(t, "/loki/api/v1/labels", req.(*LabelRequest).Path())

got, err := DefaultCodec.EncodeRequest(ctx, req)
require.NoError(t, err)
require.Equal(t, ctx, got.Context())
require.Equal(t, "/loki/api/v1/labels", got.URL.Path)
require.Equal(t, fmt.Sprintf("%d", start.UnixNano()), got.URL.Query().Get("start"))
require.Equal(t, fmt.Sprintf("%d", end.UnixNano()), got.URL.Query().Get("end"))
require.Equal(t, `{foo="bar"}`, got.URL.Query().Get("query"))

// Test label values endpoint
u, err = url.Parse(`/loki/api/v1/label/__name__/values?start=1575285010000000010&end=1575288610000000010&query={foo="bar"}`)
require.NoError(t, err)

r = &http.Request{URL: u}
r = mux.SetURLVars(r, map[string]string{"name": "__name__"})
req, err = DefaultCodec.DecodeRequest(context.TODO(), r, nil)
require.NoError(t, err)
require.Equal(t, start, *req.(*LabelRequest).Start)
require.Equal(t, end, *req.(*LabelRequest).End)
require.Equal(t, `{foo="bar"}`, req.(*LabelRequest).Query)
require.Equal(t, "/loki/api/v1/label/__name__/values", req.(*LabelRequest).Path())

got, err = DefaultCodec.EncodeRequest(ctx, req)
require.NoError(t, err)
require.Equal(t, ctx, got.Context())
require.Equal(t, "/loki/api/v1/label/__name__/values", got.URL.Path)
require.Equal(t, fmt.Sprintf("%d", start.UnixNano()), got.URL.Query().Get("start"))
require.Equal(t, fmt.Sprintf("%d", end.UnixNano()), got.URL.Query().Get("end"))
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/queryrange/labels_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (i cacheKeyLabels) GenerateCacheKey(ctx context.Context, userID string, r r
return fmt.Sprintf("labelvalues:%s:%s:%s:%d:%d", userID, lr.GetName(), lr.GetQuery(), currentInterval, split)
}

return fmt.Sprintf("labels:%s:%d:%d", userID, currentInterval, split)
return fmt.Sprintf("labels:%s:%s:%d:%d", userID, lr.GetQuery(), currentInterval, split)
}

type labelsExtractor struct{}
Expand Down
16 changes: 10 additions & 6 deletions pkg/querier/queryrange/labels_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,17 @@ func TestCacheKeyLabels_GenerateCacheKey(t *testing.T) {
expectedInterval := testTime.UnixMilli() / time.Hour.Milliseconds()

t.Run("labels", func(t *testing.T) {
require.Equal(t, fmt.Sprintf(`labels:fake:%d:%d`, expectedInterval, time.Hour.Nanoseconds()), k.GenerateCacheKey(context.Background(), "fake", &req))
require.Equal(t, fmt.Sprintf(`labels:fake::%d:%d`, expectedInterval, time.Hour.Nanoseconds()), k.GenerateCacheKey(context.Background(), "fake", &req))

req.Query = `{cluster="eu-west1"}`
require.Equal(t, fmt.Sprintf(`labels:fake:{cluster="eu-west1"}:%d:%d`, expectedInterval, time.Hour.Nanoseconds()), k.GenerateCacheKey(context.Background(), "fake", &req))
})

t.Run("label values", func(t *testing.T) {
req := req
req.Name = "foo"
req.Values = true
req.Query = ``
require.Equal(t, fmt.Sprintf(`labelvalues:fake:foo::%d:%d`, expectedInterval, time.Hour.Nanoseconds()), k.GenerateCacheKey(context.Background(), "fake", &req))

req.Query = `{cluster="eu-west1"}`
Expand Down Expand Up @@ -361,20 +365,20 @@ func TestLabelQueryCacheKey(t *testing.T) {
t.Run(fmt.Sprintf("%s (values: %v)", tc.name, values), func(t *testing.T) {
keyGen := cacheKeyLabels{tc.limits, nil}

const labelName = "foo"
const query = `{cluster="eu-west1"}`

r := &LabelRequest{
LabelRequest: logproto.LabelRequest{
Start: &tc.start,
End: &tc.end,
Query: query,
},
}

const labelName = "foo"
const query = `{cluster="eu-west1"}`

if values {
r.LabelRequest.Values = true
r.LabelRequest.Name = labelName
r.LabelRequest.Query = query
}

// we use regex here because cache key always refers to the current time to get the ingester query window,
Expand All @@ -383,7 +387,7 @@ func TestLabelQueryCacheKey(t *testing.T) {
if values {
pattern = regexp.MustCompile(fmt.Sprintf(`labelvalues:%s:%s:%s:(\d+):%d`, tenantID, labelName, regexp.QuoteMeta(query), tc.expectedSplit))
} else {
pattern = regexp.MustCompile(fmt.Sprintf(`labels:%s:(\d+):%d`, tenantID, tc.expectedSplit))
pattern = regexp.MustCompile(fmt.Sprintf(`labels:%s:%s:(\d+):%d`, tenantID, regexp.QuoteMeta(query), tc.expectedSplit))
}

require.Regexp(t, pattern, keyGen.GenerateCacheKey(context.Background(), tenantID, r))
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/stores/composite_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,10 @@ func (c CompositeStore) LabelValuesForMetricName(ctx context.Context, userID str
}

// LabelNamesForMetricName retrieves all label names for a metric name.
func (c CompositeStore) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
func (c CompositeStore) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, matchers ...*labels.Matcher) ([]string, error) {
var result util.UniqueStrings
err := c.forStores(ctx, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error {
labelNames, err := store.LabelNamesForMetricName(innerCtx, userID, from, through, metricName)
labelNames, err := store.LabelNamesForMetricName(innerCtx, userID, from, through, metricName, matchers...)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/stores/composite_store_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (c *storeEntry) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {
}

// LabelNamesForMetricName retrieves all label names for a metric name.
func (c *storeEntry) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
func (c *storeEntry) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, matchers ...*labels.Matcher) ([]string, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.LabelNamesForMetricName")
defer sp.Finish()
log := spanlogger.FromContext(ctx)
Expand All @@ -122,7 +122,7 @@ func (c *storeEntry) LabelNamesForMetricName(ctx context.Context, userID string,
}
level.Debug(log).Log("metric", metricName)

return c.indexReader.LabelNamesForMetricName(ctx, userID, from, through, metricName)
return c.indexReader.LabelNamesForMetricName(ctx, userID, from, through, metricName, matchers...)
}

func (c *storeEntry) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/stores/composite_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (m mockStore) GetSeries(_ context.Context, _ string, _, _ model.Time, _ ...
return nil, nil
}

func (m mockStore) LabelNamesForMetricName(_ context.Context, _ string, _, _ model.Time, _ string) ([]string, error) {
func (m mockStore) LabelNamesForMetricName(_ context.Context, _ string, _, _ model.Time, _ string, _ ...*labels.Matcher) ([]string, error) {
return nil, nil
}

Expand Down Expand Up @@ -210,7 +210,7 @@ func (m mockStoreLabel) LabelValuesForMetricName(_ context.Context, _ string, _,
return m.values, nil
}

func (m mockStoreLabel) LabelNamesForMetricName(_ context.Context, _ string, _, _ model.Time, _ string) ([]string, error) {
func (m mockStoreLabel) LabelNamesForMetricName(_ context.Context, _ string, _, _ model.Time, _ string, _ ...*labels.Matcher) ([]string, error) {
return m.values, nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/stores/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Filterable interface {
type BaseReader interface {
GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error)
LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error)
LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error)
LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, matchers ...*labels.Matcher) ([]string, error)
}

type StatsReader interface {
Expand Down Expand Up @@ -112,11 +112,11 @@ func (m MonitoredReaderWriter) LabelValuesForMetricName(ctx context.Context, use
return values, nil
}

func (m MonitoredReaderWriter) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
func (m MonitoredReaderWriter) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, matchers ...*labels.Matcher) ([]string, error) {
var values []string
if err := loki_instrument.TimeRequest(ctx, "label_names", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error {
var err error
values, err = m.rw.LabelNamesForMetricName(ctx, userID, from, through, metricName)
values, err = m.rw.LabelNamesForMetricName(ctx, userID, from, through, metricName, matchers...)
return err
}); err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/stores/series/series_index_gateway_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,12 @@ func (c *IndexGatewayClientStore) GetSeries(ctx context.Context, _ string, from,
}

// LabelNamesForMetricName retrieves all label names for a metric name.
func (c *IndexGatewayClientStore) LabelNamesForMetricName(ctx context.Context, _ string, from, through model.Time, metricName string) ([]string, error) {
func (c *IndexGatewayClientStore) LabelNamesForMetricName(ctx context.Context, _ string, from, through model.Time, metricName string, matchers ...*labels.Matcher) ([]string, error) {
resp, err := c.client.LabelNamesForMetricName(ctx, &logproto.LabelNamesForMetricNameRequest{
MetricName: metricName,
From: from,
Through: through,
Matchers: (&syntax.MatchersExpr{Mts: matchers}).String(),
})
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/stores/series/series_index_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,14 +316,14 @@ func (c *IndexReaderWriter) chunksToSeries(ctx context.Context, in []logproto.Ch
}

// LabelNamesForMetricName retrieves all label names for a metric name.
func (c *IndexReaderWriter) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
func (c *IndexReaderWriter) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, matchers ...*labels.Matcher) ([]string, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.LabelNamesForMetricName")
defer sp.Finish()
log := spanlogger.FromContext(ctx)
defer log.Span.Finish()

// Fetch the series IDs from the index
seriesIDs, err := c.lookupSeriesByMetricNameMatchers(ctx, from, through, userID, metricName, nil)
seriesIDs, err := c.lookupSeriesByMetricNameMatchers(ctx, from, through, userID, metricName, matchers)
if err != nil {
return nil, err
}
Expand Down
16 changes: 13 additions & 3 deletions pkg/storage/stores/series/series_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,14 +246,24 @@ func TestChunkStore_LabelNamesForMetricName(t *testing.T) {
for _, tc := range []struct {
metricName string
expect []string
matchers []*labels.Matcher
}{
{
`foo`,
[]string{"bar", "flip", "toms"},
nil,
},
{
`bar`,
[]string{"bar", "toms"},
nil,
},
{
`foo`,
[]string{"bar", "toms"},
[]*labels.Matcher{
labels.MustNewMatcher(labels.MatchRegexp, "bar", "beep"),
},
},
} {
for _, schema := range schemas {
Expand Down Expand Up @@ -286,23 +296,23 @@ func TestChunkStore_LabelNamesForMetricName(t *testing.T) {
}

// Query with ordinary time-range
labelNames1, err := store.LabelNamesForMetricName(ctx, userID, now.Add(-time.Hour), now, tc.metricName)
labelNames1, err := store.LabelNamesForMetricName(ctx, userID, now.Add(-time.Hour), now, tc.metricName, tc.matchers...)
require.NoError(t, err)

if !reflect.DeepEqual(tc.expect, labelNames1) {
t.Fatalf("%s: wrong label name - %s", tc.metricName, test.Diff(tc.expect, labelNames1))
}

// Pushing end of time-range into future should yield exact same resultset
labelNames2, err := store.LabelNamesForMetricName(ctx, userID, now.Add(-time.Hour), now.Add(time.Hour*24*10), tc.metricName)
labelNames2, err := store.LabelNamesForMetricName(ctx, userID, now.Add(-time.Hour), now.Add(time.Hour*24*10), tc.metricName, tc.matchers...)
require.NoError(t, err)

if !reflect.DeepEqual(tc.expect, labelNames2) {
t.Fatalf("%s: wrong label name - %s", tc.metricName, test.Diff(tc.expect, labelNames2))
}

// Query with both begin & end of time-range in future should yield empty resultset
labelNames3, err := store.LabelNamesForMetricName(ctx, userID, now.Add(time.Hour), now.Add(time.Hour*2), tc.metricName)
labelNames3, err := store.LabelNamesForMetricName(ctx, userID, now.Add(time.Hour), now.Add(time.Hour*2), tc.metricName, tc.matchers...)
require.NoError(t, err)
if len(labelNames3) != 0 {
t.Fatalf("%s: future query should yield empty resultset ... actually got %v label names: %#v",
Expand Down
Loading

0 comments on commit 8084259

Please sign in to comment.