diff --git a/CHANGELOG.md b/CHANGELOG.md index c6dad47fe663..eb11eaf5a1fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,14 @@ Main (unreleased) - A new `otelcol.exporter.debug` component for printing OTel telemetry from other `otelcol` components to the console. (@BarunKGP) +### Bugfixes + +- Fix an issue which caused the config to be reloaded if a config reload was triggered but the config hasn't changed. + The bug only affected the "metrics" and "logs" subsystems in Static mode. (@ptodev) + +- Fix a bug in Static mode and Flow which prevented config reloads from working if a Loki `metrics` stage is in the pipeline. + This resulted in a "failed to unregister all metrics from previous promtail" message. (@ptodev) + v0.41.1 (2024-06-07) -------------------- @@ -28,11 +36,6 @@ v0.41.1 (2024-06-07) - Updated pyroscope to v0.4.6 introducing `symbols_map_size` and `pid_map_size` configuration. (@simonswine) -### Bugfixes - -- Fix an issue which caused the config to be reloaded if a config reload was triggered but the config hasn't changed. - The bug only affected the "metrics" and "logs" subsystems in Static mode. - v0.41.0 (2024-05-31) -------------------- diff --git a/docs/sources/flow/reference/components/loki.process.md b/docs/sources/flow/reference/components/loki.process.md index 05eb467d63c2..ac8307a0e96a 100644 --- a/docs/sources/flow/reference/components/loki.process.md +++ b/docs/sources/flow/reference/components/loki.process.md @@ -717,6 +717,10 @@ The following blocks are supported inside the definition of `stage.metrics`: | metric.gauge | [metric.gauge][] | Defines a `gauge` metric. | no | | metric.histogram | [metric.histogram][] | Defines a `histogram` metric. | no | +{{< admonition type="note" >}} +The metrics will be reset if you reload the {{< param "PRODUCT_ROOT_NAME" >}} configuration file. 
+{{< /admonition >}} + [metric.counter]: #metriccounter-block [metric.gauge]: #metricgauge-block [metric.histogram]: #metrichistogram-block diff --git a/internal/component/loki/process/process.go b/internal/component/loki/process/process.go index b084582a951d..b7d0e8eb1b47 100644 --- a/internal/component/loki/process/process.go +++ b/internal/component/loki/process/process.go @@ -127,7 +127,7 @@ func (c *Component) Update(args component.Arguments) error { if err != nil { return err } - c.entryHandler = loki.NewEntryHandler(c.processOut, func() {}) + c.entryHandler = loki.NewEntryHandler(c.processOut, func() { pipeline.Cleanup() }) c.processIn = pipeline.Wrap(c.entryHandler).Chan() c.stages = newArgs.Stages } diff --git a/internal/component/loki/process/process_test.go b/internal/component/loki/process/process_test.go index 8a423c02af73..5256bbcbca0f 100644 --- a/internal/component/loki/process/process_test.go +++ b/internal/component/loki/process/process_test.go @@ -4,7 +4,9 @@ package process import ( "context" + "fmt" "os" + "strings" "testing" "time" @@ -18,12 +20,15 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/river" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "go.uber.org/atomic" "go.uber.org/goleak" ) +const logline = `{"log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z","extra":"{\"user\":\"smith\"}"}` + func TestJSONLabelsStage(t *testing.T) { defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) @@ -91,7 +96,6 @@ func TestJSONLabelsStage(t *testing.T) { // Send a log entry to the component's receiver. 
ts := time.Now() - logline := `{"log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z","extra":"{\"user\":\"smith\"}"}` logEntry := loki.Entry{ Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"}, Entry: logproto.Entry{ @@ -454,7 +458,6 @@ func TestDeadlockWithFrequentUpdates(t *testing.T) { go func() { for { ts := time.Now() - logline := `{"log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z","extra":"{\"user\":\"smith\"}"}` logEntry := loki.Entry{ Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"}, Entry: logproto.Entry{ @@ -486,3 +489,189 @@ func TestDeadlockWithFrequentUpdates(t *testing.T) { time.Sleep(1 * time.Second) require.WithinDuration(t, time.Now(), lastSend.Load().(time.Time), 300*time.Millisecond) } + +func TestMetricsStageRefresh(t *testing.T) { + tester := newTester(t) + defer tester.stop() + + forwardArgs := ` + // This will be filled later + forward_to = []` + + numLogsToSend := 3 + + cfgWithMetric := ` + stage.metrics { + metric.counter { + name = "paulin_test" + action = "inc" + match_all = true + } + }` + forwardArgs + + cfgWithMetric_Metrics := ` + # HELP loki_process_custom_paulin_test + # TYPE loki_process_custom_paulin_test counter + loki_process_custom_paulin_test{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d + ` + + // The component will be reconfigured so that it has a metric. + t.Run("config with a metric", func(t *testing.T) { + tester.updateAndTest(numLogsToSend, cfgWithMetric, + "", + fmt.Sprintf(cfgWithMetric_Metrics, numLogsToSend)) + }) + + // The component will be "updated" with the same config. + // We expect the metric to stay the same before logs are sent - the component should be smart enough to + // know that the new config is the same as the old one and it should just keep running as it is. 
+ // If it resets the metric, this could cause issues with some users who have a sidecar "autoreloader" + // which reloads the collector config every X seconds. + // Those users wouldn't expect their metrics to be reset every time the config is reloaded. + t.Run("config with the same metric", func(t *testing.T) { + tester.updateAndTest(numLogsToSend, cfgWithMetric, + fmt.Sprintf(cfgWithMetric_Metrics, numLogsToSend), + fmt.Sprintf(cfgWithMetric_Metrics, 2*numLogsToSend)) + }) + + // Use a config which has no metrics stage. + // This should cause the metric to disappear. + cfgWithNoStages := forwardArgs + t.Run("config with no metrics stage", func(t *testing.T) { + tester.updateAndTest(numLogsToSend, cfgWithNoStages, "", "") + }) + + // Use a config which has a metric with a different name, + // as well as a metric with the same name as the one in the previous config. + // We try having a metric with the same name as before so that we can see if there + // is some sort of double registration error for that metric. 
+ cfgWithTwoMetrics := ` + stage.metrics { + metric.counter { + name = "paulin_test_3" + action = "inc" + match_all = true + } + metric.counter { + name = "paulin_test" + action = "inc" + match_all = true + } + }` + forwardArgs + + expectedMetrics3 := ` + # HELP loki_process_custom_paulin_test_3 + # TYPE loki_process_custom_paulin_test_3 counter + loki_process_custom_paulin_test_3{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d + # HELP loki_process_custom_paulin_test + # TYPE loki_process_custom_paulin_test counter + loki_process_custom_paulin_test{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d + ` + + t.Run("config with a new and old metric", func(t *testing.T) { + tester.updateAndTest(numLogsToSend, cfgWithTwoMetrics, + "", + fmt.Sprintf(expectedMetrics3, numLogsToSend, numLogsToSend)) + }) +} + +type tester struct { + t *testing.T + component *Component + registry *prometheus.Registry + cancelFunc context.CancelFunc + logReceiver loki.LogsReceiver + logTimestamp time.Time + logEntry loki.Entry + wantLabelSet model.LabelSet +} + +// Create the component, so that it can process and forward logs. 
+func newTester(t *testing.T) *tester { + reg := prometheus.NewRegistry() + + opts := component.Options{ + Logger: util.TestFlowLogger(t), + Registerer: reg, + OnStateChange: func(e component.Exports) {}, + } + + initialCfg := `forward_to = []` + var args Arguments + err := river.Unmarshal([]byte(initialCfg), &args) + require.NoError(t, err) + + logReceiver := loki.NewLogsReceiver() + args.ForwardTo = []loki.LogsReceiver{logReceiver} + + c, err := New(opts, args) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + go c.Run(ctx) + + logTimestamp := time.Now() + + return &tester{ + t: t, + component: c, + registry: reg, + cancelFunc: cancel, + logReceiver: logReceiver, + logTimestamp: logTimestamp, + logEntry: loki.Entry{ + Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"}, + Entry: logproto.Entry{ + Timestamp: logTimestamp, + Line: logline, + }, + }, + wantLabelSet: model.LabelSet{ + "filename": "/var/log/pods/agent/agent/1.log", + "foo": "bar", + }, + } +} + +func (t *tester) stop() { + t.cancelFunc() +} + +func (t *tester) updateAndTest(numLogsToSend int, cfg, expectedMetricsBeforeSendingLogs, expectedMetricsAfterSendingLogs string) { + var args Arguments + err := river.Unmarshal([]byte(cfg), &args) + require.NoError(t.t, err) + + args.ForwardTo = []loki.LogsReceiver{t.logReceiver} + + t.component.Update(args) + + // Check the component metrics. + if err := testutil.GatherAndCompare(t.registry, + strings.NewReader(expectedMetricsBeforeSendingLogs)); err != nil { + require.NoError(t.t, err) + } + + // Send logs. + for i := 0; i < numLogsToSend; i++ { + t.component.receiver.Chan() <- t.logEntry + } + + // Receive logs. 
+ for i := 0; i < numLogsToSend; i++ { + select { + case logEntry := <-t.logReceiver.Chan(): + require.True(t.t, t.logTimestamp.Equal(logEntry.Timestamp)) + require.Equal(t.t, logline, logEntry.Line) + require.Equal(t.t, t.wantLabelSet, logEntry.Labels) + case <-time.After(5 * time.Second): + require.FailNow(t.t, "failed waiting for log line") + } + } + + // Check the component metrics. + if err := testutil.GatherAndCompare(t.registry, + strings.NewReader(expectedMetricsAfterSendingLogs)); err != nil { + require.NoError(t.t, err) + } +} diff --git a/internal/util/unregisterer.go b/internal/util/unregisterer.go index 822132b01785..eedcabc2441a 100644 --- a/internal/util/unregisterer.go +++ b/internal/util/unregisterer.go @@ -18,6 +18,38 @@ func WrapWithUnregisterer(reg prometheus.Registerer) *Unregisterer { } } +// An "unchecked collector" is a collector which returns an empty description. +// It is described in the Prometheus documentation, here: +// https://pkg.go.dev/github.com/prometheus/client_golang/prometheus#hdr-Custom_Collectors_and_constant_Metrics +// +// > Alternatively, you could return no Desc at all, which will mark the Collector “unchecked”. +// > No checks are performed at registration time, but metric consistency will still be ensured at scrape time, +// > i.e. any inconsistencies will lead to scrape errors. Thus, with unchecked Collectors, +// > the responsibility to not collect metrics that lead to inconsistencies in the total scrape result +// > lies with the implementer of the Collector. While this is not a desirable state, it is sometimes necessary. +// > The typical use case is a situation where the exact metrics to be returned by a Collector cannot be predicted +// > at registration time, but the implementer has sufficient knowledge of the whole system to guarantee metric consistency. +// +// Unchecked collectors are used in the Loki "metrics" stage of the Loki "process" component. 
+// +// The isUncheckedCollector function is similar to how Prometheus' Go client extracts the metric description: +// https://github.com/prometheus/client_golang/blob/45f1e72421d9d11af6be784ad60b7389f7543e70/prometheus/registry.go#L372-L381 +func isUncheckedCollector(c prometheus.Collector) bool { + descChan := make(chan *prometheus.Desc, 10) + + go func() { + c.Describe(descChan) + close(descChan) + }() + + i := 0 + for range descChan { + i += 1 + } + + return i == 0 +} + // Register implements prometheus.Registerer. func (u *Unregisterer) Register(c prometheus.Collector) error { if u.wrap == nil { @@ -28,6 +60,11 @@ func (u *Unregisterer) Register(c prometheus.Collector) error { if err != nil { return err } + + if isUncheckedCollector(c) { + return nil + } + u.cs[c] = struct{}{} return nil } @@ -43,6 +80,10 @@ func (u *Unregisterer) MustRegister(cs ...prometheus.Collector) { // Unregister implements prometheus.Registerer. func (u *Unregisterer) Unregister(c prometheus.Collector) bool { + if isUncheckedCollector(c) { + return true + } + if u.wrap != nil && u.wrap.Unregister(c) { delete(u.cs, c) return true diff --git a/internal/util/unregisterer_test.go b/internal/util/unregisterer_test.go new file mode 100644 index 000000000000..e35cfbe6cbb4 --- /dev/null +++ b/internal/util/unregisterer_test.go @@ -0,0 +1,35 @@ +package util + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" +) + +func Test_UnregisterTwice_NormalCollector(t *testing.T) { + u := WrapWithUnregisterer(prometheus.NewRegistry()) + c := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "test_metric", + Help: "Test metric.", + }) + u.Register(c) + require.True(t, u.Unregister(c)) + require.False(t, u.Unregister(c)) +} + +type uncheckedCollector struct{} + +func (uncheckedCollector) Describe(chan<- *prometheus.Desc) {} + +func (uncheckedCollector) Collect(chan<- prometheus.Metric) {} + +var _ prometheus.Collector = 
uncheckedCollector{} + +func Test_UnregisterTwice_UncheckedCollector(t *testing.T) { + u := WrapWithUnregisterer(prometheus.NewRegistry()) + c := uncheckedCollector{} + u.Register(c) + require.True(t, u.Unregister(c)) + require.True(t, u.Unregister(c)) +} diff --git a/static/logs/logs_test.go b/static/logs/logs_test.go index 9ff1f1465bfc..42472befc425 100644 --- a/static/logs/logs_test.go +++ b/static/logs/logs_test.go @@ -3,7 +3,6 @@ package logs import ( - "bytes" "fmt" "net" "net/http" @@ -19,6 +18,7 @@ import ( "github.com/grafana/agent/internal/util" "github.com/grafana/loki/pkg/logproto" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" "gopkg.in/yaml.v2" ) @@ -38,6 +38,8 @@ func checkConfigReloadLog(t *testing.T, logs string, expectedOccurances int) { } func TestLogs(t *testing.T) { + reg := prometheus.NewRegistry() + // // Create a temporary file to tail // @@ -87,16 +89,26 @@ configs: labels: job: test __path__: %s - `, positionsDir, lis.Addr().String(), tmpFile.Name())) + pipeline_stages: + - metrics: + log_lines_total: + type: Counter + description: "total number of log lines" + prefix: my_promtail_custom_ + max_idle_duration: 24h + config: + match_all: true + action: inc +`, positionsDir, lis.Addr().String(), tmpFile.Name())) var cfg Config dec := yaml.NewDecoder(strings.NewReader(cfgText)) dec.SetStrict(true) require.NoError(t, dec.Decode(&cfg)) require.NoError(t, cfg.ApplyDefaults()) - logBuffer := bytes.Buffer{} + logBuffer := util.SyncBuffer{} logger := log.NewSyncLogger(log.NewLogfmtLogger(&logBuffer)) - l, err := New(prometheus.NewRegistry(), &cfg, logger, false) + l, err := New(reg, &cfg, logger, false) require.NoError(t, err) defer l.Stop() @@ -115,6 +127,12 @@ configs: // We expect the config reload log line to not be printed. 
checkConfigReloadLog(t, logBuffer.String(), 0) + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` +# HELP my_promtail_custom_log_lines_total total number of log lines +# TYPE my_promtail_custom_log_lines_total counter +my_promtail_custom_log_lines_total{filename="`+tmpFile.Name()+`",job="test",logs_config="default"} 1 +`), "my_promtail_custom_log_lines_total")) + // // Apply the same config and try reloading. // Recreate the config struct to make sure it's clean. @@ -128,6 +146,13 @@ configs: checkConfigReloadLog(t, logBuffer.String(), 1) + // The metrics should stay the same, as the config didn't change. + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` +# HELP my_promtail_custom_log_lines_total total number of log lines +# TYPE my_promtail_custom_log_lines_total counter +my_promtail_custom_log_lines_total{filename="`+tmpFile.Name()+`",job="test",logs_config="default"} 1 +`), "my_promtail_custom_log_lines_total")) + // // Apply a new config and write a new line. // @@ -146,7 +171,17 @@ configs: labels: job: test-2 __path__: %s - `, positionsDir, lis.Addr().String(), tmpFile.Name())) + pipeline_stages: + - metrics: + log_lines_total2: + type: Counter + description: "total number of log lines" + prefix: my_promtail_custom2_ + max_idle_duration: 24h + config: + match_all: true + action: inc +`, positionsDir, lis.Addr().String(), tmpFile.Name())) var newCfg Config dec = yaml.NewDecoder(strings.NewReader(cfgText)) @@ -155,6 +190,9 @@ configs: require.NoError(t, newCfg.ApplyDefaults()) require.NoError(t, l.ApplyConfig(&newCfg, false)) + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(``), + "my_promtail_custom_log_lines_total", "my_promtail_custom2_log_lines_total2")) + fmt.Fprintf(tmpFile, "Hello again!\n") select { case <-time.After(time.Second * 30): @@ -167,6 +205,13 @@ configs: // We expect the config reload log line to not be printed again. 
checkConfigReloadLog(t, logBuffer.String(), 1) + // The metrics changed, and the old metric is no longer visible. + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP my_promtail_custom2_log_lines_total2 total number of log lines + # TYPE my_promtail_custom2_log_lines_total2 counter + my_promtail_custom2_log_lines_total2{filename="`+tmpFile.Name()+`",job="test-2",logs_config="default"} 1 + `), "my_promtail_custom_log_lines_total", "my_promtail_custom2_log_lines_total2")) + t.Run("update to nil", func(t *testing.T) { // Applying a nil config should remove all instances. err := l.ApplyConfig(nil, false) diff --git a/static/traces/traces_test.go b/static/traces/traces_test.go index 2727a5f32f70..0c9f5382063a 100644 --- a/static/traces/traces_test.go +++ b/static/traces/traces_test.go @@ -225,7 +225,7 @@ configs: err := dec.Decode(&cfg) require.NoError(t, err) - logBuffer := bytes.Buffer{} + logBuffer := util.SyncBuffer{} logger := log.NewLogfmtLogger(&logBuffer) traces, err := New(nil, nil, prometheus.NewRegistry(), cfg, logger)