Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue with config reload when using a log pipeline with a metric stage #6971

Merged
merged 4 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ Main (unreleased)
- A new `otelcol.exporter.debug` component for printing OTel telemetry from
other `otelcol` components to the console. (@BarunKGP)

### Bugfixes

- Fix an issue which caused the config to be reloaded if a config reload was triggered but the config hasn't changed.
The bug only affected the "metrics" and "logs" subsystems in Static mode. (@ptodev)

- Fix a bug in Static mode and Flow which prevented config reloads to work if a Loki `metrics` stage is in the pipeline.
This resulted in a "failed to unregister all metrics from previous promtail" message. (@ptodev)

v0.41.1 (2024-06-07)
--------------------

Expand All @@ -28,11 +36,6 @@ v0.41.1 (2024-06-07)

- Updated pyroscope to v0.4.6 introducing `symbols_map_size` and `pid_map_size` configuration. (@simonswine)

### Bugfixes

- Fix an issue which caused the config to be reloaded if a config reload was triggered but the config hasn't changed.
The bug only affected the "metrics" and "logs" subsystems in Static mode.

v0.41.0 (2024-05-31)
--------------------

Expand Down
4 changes: 4 additions & 0 deletions docs/sources/flow/reference/components/loki.process.md
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,10 @@ The following blocks are supported inside the definition of `stage.metrics`:
| metric.gauge | [metric.gauge][] | Defines a `gauge` metric. | no |
| metric.histogram | [metric.histogram][] | Defines a `histogram` metric. | no |

{{< admonition type="note" >}}
The metrics will be reset if you reload the {{< param "PRODUCT_ROOT_NAME" >}} configuration file.
{{< /admonition >}}

[metric.counter]: #metriccounter-block
[metric.gauge]: #metricgauge-block
[metric.histogram]: #metrichistogram-block
Expand Down
2 changes: 1 addition & 1 deletion internal/component/loki/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (c *Component) Update(args component.Arguments) error {
if err != nil {
return err
}
c.entryHandler = loki.NewEntryHandler(c.processOut, func() {})
c.entryHandler = loki.NewEntryHandler(c.processOut, func() { pipeline.Cleanup() })
c.processIn = pipeline.Wrap(c.entryHandler).Chan()
c.stages = newArgs.Stages
}
Expand Down
193 changes: 191 additions & 2 deletions internal/component/loki/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ package process

import (
"context"
"fmt"
"os"
"strings"
"testing"
"time"

Expand All @@ -18,12 +20,15 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/river"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/goleak"
)

const logline = `{"log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z","extra":"{\"user\":\"smith\"}"}`

func TestJSONLabelsStage(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))

Expand Down Expand Up @@ -91,7 +96,6 @@ func TestJSONLabelsStage(t *testing.T) {

// Send a log entry to the component's receiver.
ts := time.Now()
logline := `{"log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z","extra":"{\"user\":\"smith\"}"}`
logEntry := loki.Entry{
Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"},
Entry: logproto.Entry{
Expand Down Expand Up @@ -454,7 +458,6 @@ func TestDeadlockWithFrequentUpdates(t *testing.T) {
go func() {
for {
ts := time.Now()
logline := `{"log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z","extra":"{\"user\":\"smith\"}"}`
logEntry := loki.Entry{
Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"},
Entry: logproto.Entry{
Expand Down Expand Up @@ -486,3 +489,189 @@ func TestDeadlockWithFrequentUpdates(t *testing.T) {
time.Sleep(1 * time.Second)
require.WithinDuration(t, time.Now(), lastSend.Load().(time.Time), 300*time.Millisecond)
}

func TestMetricsStageRefresh(t *testing.T) {
tester := newTester(t)
defer tester.stop()

forwardArgs := `
// This will be filled later
forward_to = []`

numLogsToSend := 3

cfgWithMetric := `
stage.metrics {
metric.counter {
name = "paulin_test"
action = "inc"
match_all = true
}
}` + forwardArgs

cfgWithMetric_Metrics := `
# HELP loki_process_custom_paulin_test
# TYPE loki_process_custom_paulin_test counter
loki_process_custom_paulin_test{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d
`

// The component will be reconfigured so that it has a metric.
t.Run("config with a metric", func(t *testing.T) {
tester.updateAndTest(numLogsToSend, cfgWithMetric,
"",
fmt.Sprintf(cfgWithMetric_Metrics, numLogsToSend))
})

// The component will be "updated" with the same config.
// We expect the metric to stay the same before logs are sent - the component should be smart enough to
// know that the new config is the same as the old one and it should just keep running as it is.
// If it resets the metric, this could cause issues with some users who have a sidecar "autoreloader"
// which reloads the collector config every X seconds.
// Those users wouldn't expect their metrics to be reset every time the config is reloaded.
t.Run("config with the same metric", func(t *testing.T) {
tester.updateAndTest(numLogsToSend, cfgWithMetric,
fmt.Sprintf(cfgWithMetric_Metrics, numLogsToSend),
fmt.Sprintf(cfgWithMetric_Metrics, 2*numLogsToSend))
})

// Use a config which has no metrics stage.
// This should cause the metric to disappear.
cfgWithNoStages := forwardArgs
t.Run("config with no metrics stage", func(t *testing.T) {
tester.updateAndTest(numLogsToSend, cfgWithNoStages, "", "")
})

// Use a config which has a metric with a different name,
// as well as a metric with the same name as the one in the previous config.
// We try having a metric with the same name as before so that we can see if there
// is some sort of double registration error for that metric.
cfgWithTwoMetrics := `
stage.metrics {
metric.counter {
name = "paulin_test_3"
action = "inc"
match_all = true
}
metric.counter {
name = "paulin_test"
action = "inc"
match_all = true
}
}` + forwardArgs

expectedMetrics3 := `
# HELP loki_process_custom_paulin_test_3
# TYPE loki_process_custom_paulin_test_3 counter
loki_process_custom_paulin_test_3{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d
# HELP loki_process_custom_paulin_test
# TYPE loki_process_custom_paulin_test counter
loki_process_custom_paulin_test{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d
`

t.Run("config with a new and old metric", func(t *testing.T) {
tester.updateAndTest(numLogsToSend, cfgWithTwoMetrics,
"",
fmt.Sprintf(expectedMetrics3, numLogsToSend, numLogsToSend))
})
}

type tester struct {
t *testing.T
component *Component
registry *prometheus.Registry
cancelFunc context.CancelFunc
logReceiver loki.LogsReceiver
logTimestamp time.Time
logEntry loki.Entry
wantLabelSet model.LabelSet
}

// Create the component, so that it can process and forward logs.
func newTester(t *testing.T) *tester {
reg := prometheus.NewRegistry()

opts := component.Options{
Logger: util.TestFlowLogger(t),
Registerer: reg,
OnStateChange: func(e component.Exports) {},
}

initialCfg := `forward_to = []`
var args Arguments
err := river.Unmarshal([]byte(initialCfg), &args)
require.NoError(t, err)

logReceiver := loki.NewLogsReceiver()
args.ForwardTo = []loki.LogsReceiver{logReceiver}

c, err := New(opts, args)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
go c.Run(ctx)

logTimestamp := time.Now()

return &tester{
t: t,
component: c,
registry: reg,
cancelFunc: cancel,
logReceiver: logReceiver,
logTimestamp: logTimestamp,
logEntry: loki.Entry{
Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"},
Entry: logproto.Entry{
Timestamp: logTimestamp,
Line: logline,
},
},
wantLabelSet: model.LabelSet{
"filename": "/var/log/pods/agent/agent/1.log",
"foo": "bar",
},
}
}

func (t *tester) stop() {
t.cancelFunc()
}

func (t *tester) updateAndTest(numLogsToSend int, cfg, expectedMetricsBeforeSendingLogs, expectedMetricsAfterSendingLogs string) {
var args Arguments
err := river.Unmarshal([]byte(cfg), &args)
require.NoError(t.t, err)

args.ForwardTo = []loki.LogsReceiver{t.logReceiver}

t.component.Update(args)

// Check the component metrics.
if err := testutil.GatherAndCompare(t.registry,
strings.NewReader(expectedMetricsBeforeSendingLogs)); err != nil {
require.NoError(t.t, err)
}

// Send logs.
for i := 0; i < numLogsToSend; i++ {
t.component.receiver.Chan() <- t.logEntry
}

// Receive logs.
for i := 0; i < numLogsToSend; i++ {
select {
case logEntry := <-t.logReceiver.Chan():
require.True(t.t, t.logTimestamp.Equal(logEntry.Timestamp))
require.Equal(t.t, logline, logEntry.Line)
require.Equal(t.t, t.wantLabelSet, logEntry.Labels)
case <-time.After(5 * time.Second):
require.FailNow(t.t, "failed waiting for log line")
}
}

// Check the component metrics.
if err := testutil.GatherAndCompare(t.registry,
strings.NewReader(expectedMetricsAfterSendingLogs)); err != nil {
require.NoError(t.t, err)
}
}
41 changes: 41 additions & 0 deletions internal/util/unregisterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,38 @@ func WrapWithUnregisterer(reg prometheus.Registerer) *Unregisterer {
}
}

// An "unchecked collector" is a collector which returns an empty description.
// It is described in the Prometheus documentation, here:
// https://pkg.go.dev/github.com/prometheus/client_golang/prometheus#hdr-Custom_Collectors_and_constant_Metrics
//
// > Alternatively, you could return no Desc at all, which will mark the Collector “unchecked”.
// > No checks are performed at registration time, but metric consistency will still be ensured at scrape time,
// > i.e. any inconsistencies will lead to scrape errors. Thus, with unchecked Collectors,
// > the responsibility to not collect metrics that lead to inconsistencies in the total scrape result
// > lies with the implementer of the Collector. While this is not a desirable state, it is sometimes necessary.
// > The typical use case is a situation where the exact metrics to be returned by a Collector cannot be predicted
// > at registration time, but the implementer has sufficient knowledge of the whole system to guarantee metric consistency.
//
// Unchecked collectors are used in the Loki "metrics" stage of the Loki "process" component.
//
// The isUncheckedCollector function is similar to how Prometheus' Go client extracts the metric description:
// https://github.com/prometheus/client_golang/blob/45f1e72421d9d11af6be784ad60b7389f7543e70/prometheus/registry.go#L372-L381
func isUncheckedCollector(c prometheus.Collector) bool {
thampiotr marked this conversation as resolved.
Show resolved Hide resolved
descChan := make(chan *prometheus.Desc, 10)

go func() {
ptodev marked this conversation as resolved.
Show resolved Hide resolved
c.Describe(descChan)
close(descChan)
}()

i := 0
for range descChan {
i += 1
}

return i == 0
}

// Register implements prometheus.Registerer.
func (u *Unregisterer) Register(c prometheus.Collector) error {
if u.wrap == nil {
Expand All @@ -28,6 +60,11 @@ func (u *Unregisterer) Register(c prometheus.Collector) error {
if err != nil {
return err
}

if isUncheckedCollector(c) {
return nil
}

u.cs[c] = struct{}{}
return nil
}
Expand All @@ -43,6 +80,10 @@ func (u *Unregisterer) MustRegister(cs ...prometheus.Collector) {

// Unregister implements prometheus.Registerer.
func (u *Unregisterer) Unregister(c prometheus.Collector) bool {
if isUncheckedCollector(c) {
return true
}

if u.wrap != nil && u.wrap.Unregister(c) {
delete(u.cs, c)
return true
Expand Down
35 changes: 35 additions & 0 deletions internal/util/unregisterer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package util

import (
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)

func Test_UnregisterTwice_NormalCollector(t *testing.T) {
u := WrapWithUnregisterer(prometheus.NewRegistry())
c := prometheus.NewCounter(prometheus.CounterOpts{
Name: "test_metric",
Help: "Test metric.",
})
u.Register(c)
require.True(t, u.Unregister(c))
require.False(t, u.Unregister(c))
}

type uncheckedCollector struct{}

func (uncheckedCollector) Describe(chan<- *prometheus.Desc) {}

func (uncheckedCollector) Collect(chan<- prometheus.Metric) {}

var _ prometheus.Collector = uncheckedCollector{}

func Test_UnregisterTwice_UncheckedCollector(t *testing.T) {
u := WrapWithUnregisterer(prometheus.NewRegistry())
c := uncheckedCollector{}
u.Register(c)
require.True(t, u.Unregister(c))
require.True(t, u.Unregister(c))
}
Loading
Loading