diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c52e6916558..dee8147ff3b 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -136,6 +136,7 @@ with the ecs field name `container`. {pull}34403[34403] automatic splitting at root level, if root level element is an array. {pull}34155[34155] - Fix broken mapping for state.ends field. {pull}34891[34891] - Fix issue using projects in airgapped environments by disabling npm audit. {pull}34936[34936] +- Fix broken state ID location naming. {pull}35336[35336] *Heartbeat* diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index 5718927369d..58393474153 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -131,6 +131,11 @@ func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) { }), trace: trace, } + runFromID := "" + if parsedConfig.RunFrom != nil { + runFromID = parsedConfig.RunFrom.ID + } + logp.L().Infof("heartbeat starting, running from: %v", runFromID) return bt, nil } @@ -300,7 +305,7 @@ func makeStatesClient(cfg *conf.C, replace func(monitorstate.StateLoader), runFr } if esClient != nil { - logp.L().Info("replacing states loader") + logp.L().Info("using ES states loader") replace(monitorstate.MakeESLoader(esClient, "synthetics-*,heartbeat-*", runFrom)) } diff --git a/heartbeat/hbtestllext/isdefs.go b/heartbeat/hbtestllext/isdefs.go index f5fdc4feeb8..e20f2cb18a1 100644 --- a/heartbeat/hbtestllext/isdefs.go +++ b/heartbeat/hbtestllext/isdefs.go @@ -18,6 +18,8 @@ package hbtestllext import ( + "fmt" + "regexp" "strings" "time" @@ -62,6 +64,22 @@ var IsMonitorState = isdef.Is("isState", func(path llpath.Path, v interface{}) * return llresult.ValidResult(path) }) +var IsMonitorStateInLocation = func(locName string) isdef.IsDef { + locPattern := fmt.Sprintf("^%s-[a-z0-9]+-0$", locName) + stateIdMatch := regexp.MustCompile(locPattern) + return isdef.Is("isState", func(path llpath.Path, v interface{}) *llresult.Results { + s, ok := v.(monitorstate.State) + if !ok { + return llresult.SimpleResult(path, false, "expected a monitorstate.State") + } + + if !stateIdMatch.MatchString(s.ID) { + return llresult.SimpleResult(path, false, fmt.Sprintf("ID %s does not match regexp pattern /%s/", s.ID, locPattern)) + } + return llresult.ValidResult(path) + }) +} + var IsECSErr = func(expectedErr *ecserr.ECSErr) isdef.IsDef { return isdef.Is("matches ECS ERR", func(path llpath.Path, v interface{}) *llresult.Results { // This conditional is a bit awkward, apparently there's a bug in lookslike where a pointer diff --git a/heartbeat/monitors/factory.go b/heartbeat/monitors/factory.go index 61dca6a5147..b9a25cbe348 100644 --- a/heartbeat/monitors/factory.go +++ b/heartbeat/monitors/factory.go @@ -159,6 +159,26 @@ func (f *RunnerFactory) Create(p beat.Pipeline, c *conf.C) (cfgfile.Runner, erro if err != nil { return nil, fmt.Errorf("could not create pipeline client via factory: %w", err) } + + // The state loader needs the beat location to accurately load the last state + sf, err := stdfields.ConfigToStdMonitorFields(c) + if err != nil { + return nil, fmt.Errorf("could not load stdfields in factory: %w", err) + } + loc := getLocation(f.beatLocation, sf) + if loc != nil { + geoMap, _ := util.GeoConfigToMap(loc.Geo) + err = c.Merge(map[string]interface{}{ + "run_from": map[string]interface{}{ + "id": f.beatLocation.ID, + "geo": geoMap, + }, + }) + if err != nil { + return nil, fmt.Errorf("could not merge location into monitor map: %w", err) + } + } + monitor, err := newMonitor(c, f.pluginsReg, pc, f.addTask, f.stateLoader, safeStop) if err != nil { return nil, fmt.Errorf("factory could not create monitor: %w", err) @@ -183,6 +203,19 @@ func (f *RunnerFactory) CheckConfig(config *conf.C) error { return checkMonitorConfig(config, plugin.GlobalPluginsReg) } +// getLocation returns the location either from the stdfields or the beat preferring stdfields. Returns nil if declared in neither spot. +func getLocation(beatLocation *config.LocationWithID, sf stdfields.StdMonitorFields) (loc *config.LocationWithID) { + // Use the monitor-specific location if possible, otherwise use the beat's location + // Generally speaking direct HB users would use the beat location, and the synthetics service may as well (TBD) + // while Fleet configured monitors will always use a per location monitor + if sf.RunFrom != nil { + loc = sf.RunFrom + } else { + loc = beatLocation + } + return loc +} + func newCommonPublishConfigs(info beat.Info, beatLocation *config.LocationWithID, cfg *conf.C) (pipetool.ConfigEditor, error) { var settings publishSettings if err := cfg.Unpack(&settings); err != nil { @@ -195,16 +228,8 @@ func newCommonPublishConfigs(info beat.Info, beatLocation *config.LocationWithID } // Early stage processors for setting data_stream, event.dataset, and index to write to + loc := getLocation(beatLocation, sf) - // Use the monitor-specific location if possible, otherwise use the beat's location - // Generally speaking direct HB users would use the beat location, and the synthetics service may as well (TBD) - // while Fleet configured monitors will always use a per location monitor - var loc *config.LocationWithID - if sf.RunFrom != nil { - loc = sf.RunFrom - } else { - loc = beatLocation - } preProcs, err := preProcessors(info, loc, settings, sf.Type) if err != nil { return nil, err diff --git a/heartbeat/monitors/wrappers/monitorstate/esloader.go b/heartbeat/monitors/wrappers/monitorstate/esloader.go index b7302dd8a78..877e23edb21 100644 --- a/heartbeat/monitors/wrappers/monitorstate/esloader.go +++ b/heartbeat/monitors/wrappers/monitorstate/esloader.go @@ -36,7 +36,12 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation logp.L().Warn("ES state loader initialized with no index pattern, will not load states from ES") return NilStateLoader } + return func(sf stdfields.StdMonitorFields) (*State, error) { + var runFromID string + if sf.RunFrom != nil { + runFromID = sf.RunFrom.ID + } queryMustClauses := []mapstr.M{ { "match": mapstr.M{"monitor.id": sf.ID}, @@ -54,9 +59,9 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation }, } - if sf.RunFrom != nil { + if runFromID != "" { queryMustClauses = append(queryMustClauses, mapstr.M{ - "match": mapstr.M{"observer.name": sf.RunFrom.ID}, + "match": mapstr.M{"observer.name": runFromID}, }) } reqBody := mapstr.M{ @@ -70,7 +75,7 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation status, body, err := esc.Request("POST", strings.Join([]string{"/", indexPattern, "/", "_search", "?size=1"}, ""), "", nil, reqBody) if err != nil || status > 299 { - return nil, fmt.Errorf("error executing state search for %s: %w", sf.ID, err) + return nil, fmt.Errorf("error executing state search for %s in loc=%s: %w", sf.ID, runFromID, err) } type stateHits struct { @@ -91,7 +96,7 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation } if len(sh.Hits.Hits) == 0 { - logp.L().Infof("no previous state found for monitor %s", sf.ID) + logp.L().Infof("no previous state found for monitor %s in Elasticsearch (loc=%s)", sf.ID, runFromID) return nil, nil } diff --git a/heartbeat/monitors/wrappers/monitorstate/monitorstate.go b/heartbeat/monitors/wrappers/monitorstate/monitorstate.go index 2b5046d7dd3..0d9a8d7c60d 100644 --- a/heartbeat/monitors/wrappers/monitorstate/monitorstate.go +++ b/heartbeat/monitors/wrappers/monitorstate/monitorstate.go @@ -74,6 +74,13 @@ type State struct { ctr int } +func (s *State) String() string { + if s == nil { + return "" + } + return fmt.Sprintf("", s.ID, s.StartedAt, s.Up, s.Down) +} + func (s *State) incrementCounters(status StateStatus) { s.DurationMs = time.Since(s.StartedAt).Milliseconds() s.Checks++ diff --git a/heartbeat/monitors/wrappers/monitorstate/tracker.go b/heartbeat/monitors/wrappers/monitorstate/tracker.go index 2fe018e17b9..d3e9d7ff178 100644 --- a/heartbeat/monitors/wrappers/monitorstate/tracker.go +++ b/heartbeat/monitors/wrappers/monitorstate/tracker.go @@ -65,6 +65,7 @@ func (t *Tracker) RecordStatus(sf stdfields.StdMonitorFields, newStatus StateSta state := t.getCurrentState(sf) if state == nil { state = newMonitorState(sf, newStatus, 0, t.flappingEnabled) + logp.L().Infof("initializing new state for monitor %s: %s", sf.ID, state.String()) t.states[sf.ID] = state } else { state.recordCheck(sf, newStatus) @@ -84,6 +85,9 @@ func (t *Tracker) getCurrentState(sf stdfields.StdMonitorFields) (state *State) for i := 0; i < tries; i++ { loadedState, err = t.stateLoader(sf) if err == nil { + if loadedState != nil { + logp.L().Infof("loaded previous state for monitor %s: %s", sf.ID, loadedState.String()) + } break } diff --git a/x-pack/heartbeat/scenarios/basics_test.go b/x-pack/heartbeat/scenarios/basics_test.go index db82493da70..06ebb39f15b 100644 --- a/x-pack/heartbeat/scenarios/basics_test.go +++ b/x-pack/heartbeat/scenarios/basics_test.go @@ -83,6 +83,7 @@ func TestRunFromOverride(t *testing.T) { scenarioDB.RunAllWithATwist(t, TwistAddRunFrom, func(t *testing.T, mtr *framework.MonitorTestRun, err error) { for _, e := range mtr.Events() { testslike.Test(t, lookslike.MustCompile(map[string]interface{}{ + "state": hbtestllext.IsMonitorStateInLocation(TestLocationDefault.ID), "observer": map[string]interface{}{ "name": TestLocationDefault.ID, "geo": map[string]interface{}{