Skip to content

Commit

Permalink
[Heartbeat] Fix run from location in states loader (elastic#35336)
Browse files Browse the repository at this point in the history
* [Heartbeat] Fix run from location in states loader

The states loader currently does not correctly read the location from
the `heartbeat.run_from.id` config option. It's currently only applied
to the `observer.*` fields. This patch fixes that.

* Fix errcheck

* Add tests

* Fixes

* Add changelog

* More nil checks

---------

Co-authored-by: Vignesh Shanmugam <vignesh.shanmugam22@gmail.com>
  • Loading branch information
andrewvc and vigneshshanmugam committed May 5, 2023
1 parent 7e3bd7c commit 3550280
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ with the ecs field name `container`. {pull}34403[34403]
automatic splitting at root level, if root level element is an array. {pull}34155[34155]
- Fix broken mapping for state.ends field. {pull}34891[34891]
- Fix issue using projects in airgapped environments by disabling npm audit. {pull}34936[34936]
- Fix broken state ID location naming. {pull}35336[35336]

*Heartbeat*

Expand Down
7 changes: 6 additions & 1 deletion heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) {
}),
trace: trace,
}
runFromID := "<unknown location>"
if parsedConfig.RunFrom != nil {
runFromID = parsedConfig.RunFrom.ID
}
logp.L().Infof("heartbeat starting, running from: %v", runFromID)
return bt, nil
}

Expand Down Expand Up @@ -300,7 +305,7 @@ func makeStatesClient(cfg *conf.C, replace func(monitorstate.StateLoader), runFr
}

if esClient != nil {
logp.L().Info("replacing states loader")
logp.L().Info("using ES states loader")
replace(monitorstate.MakeESLoader(esClient, "synthetics-*,heartbeat-*", runFrom))
}

Expand Down
18 changes: 18 additions & 0 deletions heartbeat/hbtestllext/isdefs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package hbtestllext

import (
"fmt"
"regexp"
"strings"
"time"

Expand Down Expand Up @@ -62,6 +64,22 @@ var IsMonitorState = isdef.Is("isState", func(path llpath.Path, v interface{}) *
return llresult.ValidResult(path)
})

var IsMonitorStateInLocation = func(locName string) isdef.IsDef {
locPattern := fmt.Sprintf("^%s-[a-z0-9]+-0$", locName)
stateIdMatch := regexp.MustCompile(locPattern)
return isdef.Is("isState", func(path llpath.Path, v interface{}) *llresult.Results {
s, ok := v.(monitorstate.State)
if !ok {
return llresult.SimpleResult(path, false, "expected a monitorstate.State")
}

if !stateIdMatch.MatchString(s.ID) {
return llresult.SimpleResult(path, false, fmt.Sprintf("ID %s does not match regexp pattern /%s/", s.ID, locPattern))
}
return llresult.ValidResult(path)
})
}

var IsECSErr = func(expectedErr *ecserr.ECSErr) isdef.IsDef {
return isdef.Is("matches ECS ERR", func(path llpath.Path, v interface{}) *llresult.Results {
// This conditional is a bit awkward, apparently there's a bug in lookslike where a pointer
Expand Down
43 changes: 34 additions & 9 deletions heartbeat/monitors/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,26 @@ func (f *RunnerFactory) Create(p beat.Pipeline, c *conf.C) (cfgfile.Runner, erro
if err != nil {
return nil, fmt.Errorf("could not create pipeline client via factory: %w", err)
}

// The state loader needs the beat location to accurately load the last state
sf, err := stdfields.ConfigToStdMonitorFields(c)
if err != nil {
return nil, fmt.Errorf("could not load stdfields in factory: %w", err)
}
loc := getLocation(f.beatLocation, sf)
if loc != nil {
geoMap, _ := util.GeoConfigToMap(loc.Geo)
err = c.Merge(map[string]interface{}{
"run_from": map[string]interface{}{
"id": f.beatLocation.ID,
"geo": geoMap,
},
})
if err != nil {
return nil, fmt.Errorf("could not merge location into monitor map: %w", err)
}
}

monitor, err := newMonitor(c, f.pluginsReg, pc, f.addTask, f.stateLoader, safeStop)
if err != nil {
return nil, fmt.Errorf("factory could not create monitor: %w", err)
Expand All @@ -183,6 +203,19 @@ func (f *RunnerFactory) CheckConfig(config *conf.C) error {
return checkMonitorConfig(config, plugin.GlobalPluginsReg)
}

// getLocation returns the location either from the stdfields or the beat preferring stdfields. Returns nil if declared in neither spot.
func getLocation(beatLocation *config.LocationWithID, sf stdfields.StdMonitorFields) (loc *config.LocationWithID) {
// Use the monitor-specific location if possible, otherwise use the beat's location
// Generally speaking direct HB users would use the beat location, and the synthetics service may as well (TBD)
// while Fleet configured monitors will always use a per location monitor
if sf.RunFrom != nil {
loc = sf.RunFrom
} else {
loc = beatLocation
}
return loc
}

func newCommonPublishConfigs(info beat.Info, beatLocation *config.LocationWithID, cfg *conf.C) (pipetool.ConfigEditor, error) {
var settings publishSettings
if err := cfg.Unpack(&settings); err != nil {
Expand All @@ -195,16 +228,8 @@ func newCommonPublishConfigs(info beat.Info, beatLocation *config.LocationWithID
}

// Early stage processors for setting data_stream, event.dataset, and index to write to
loc := getLocation(beatLocation, sf)

// Use the monitor-specific location if possible, otherwise use the beat's location
// Generally speaking direct HB users would use the beat location, and the synthetics service may as well (TBD)
// while Fleet configured monitors will always use a per location monitor
var loc *config.LocationWithID
if sf.RunFrom != nil {
loc = sf.RunFrom
} else {
loc = beatLocation
}
preProcs, err := preProcessors(info, loc, settings, sf.Type)
if err != nil {
return nil, err
Expand Down
13 changes: 9 additions & 4 deletions heartbeat/monitors/wrappers/monitorstate/esloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation
logp.L().Warn("ES state loader initialized with no index pattern, will not load states from ES")
return NilStateLoader
}

return func(sf stdfields.StdMonitorFields) (*State, error) {
var runFromID string
if sf.RunFrom != nil {
runFromID = sf.RunFrom.ID
}
queryMustClauses := []mapstr.M{
{
"match": mapstr.M{"monitor.id": sf.ID},
Expand All @@ -54,9 +59,9 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation
},
}

if sf.RunFrom != nil {
if runFromID != "" {
queryMustClauses = append(queryMustClauses, mapstr.M{
"match": mapstr.M{"observer.name": sf.RunFrom.ID},
"match": mapstr.M{"observer.name": runFromID},
})
}
reqBody := mapstr.M{
Expand All @@ -70,7 +75,7 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation

status, body, err := esc.Request("POST", strings.Join([]string{"/", indexPattern, "/", "_search", "?size=1"}, ""), "", nil, reqBody)
if err != nil || status > 299 {
return nil, fmt.Errorf("error executing state search for %s: %w", sf.ID, err)
return nil, fmt.Errorf("error executing state search for %s in loc=%s: %w", sf.ID, runFromID, err)
}

type stateHits struct {
Expand All @@ -91,7 +96,7 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation
}

if len(sh.Hits.Hits) == 0 {
logp.L().Infof("no previous state found for monitor %s", sf.ID)
logp.L().Infof("no previous state found for monitor %s in Elasticsearch (loc=%s)", sf.ID, runFromID)
return nil, nil
}

Expand Down
7 changes: 7 additions & 0 deletions heartbeat/monitors/wrappers/monitorstate/monitorstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ type State struct {
ctr int
}

func (s *State) String() string {
if s == nil {
return "<monitorstate:nil>"
}
return fmt.Sprintf("<monitorstate:id=%s,started=%s,up=%d,down=%d>", s.ID, s.StartedAt, s.Up, s.Down)
}

func (s *State) incrementCounters(status StateStatus) {
s.DurationMs = time.Since(s.StartedAt).Milliseconds()
s.Checks++
Expand Down
4 changes: 4 additions & 0 deletions heartbeat/monitors/wrappers/monitorstate/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func (t *Tracker) RecordStatus(sf stdfields.StdMonitorFields, newStatus StateSta
state := t.getCurrentState(sf)
if state == nil {
state = newMonitorState(sf, newStatus, 0, t.flappingEnabled)
logp.L().Infof("initializing new state for monitor %s: %s", sf.ID, state.String())
t.states[sf.ID] = state
} else {
state.recordCheck(sf, newStatus)
Expand All @@ -84,6 +85,9 @@ func (t *Tracker) getCurrentState(sf stdfields.StdMonitorFields) (state *State)
for i := 0; i < tries; i++ {
loadedState, err = t.stateLoader(sf)
if err == nil {
if loadedState != nil {
logp.L().Infof("loaded previous state for monitor %s: %s", sf.ID, loadedState.String())
}
break
}

Expand Down
1 change: 1 addition & 0 deletions x-pack/heartbeat/scenarios/basics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func TestRunFromOverride(t *testing.T) {
scenarioDB.RunAllWithATwist(t, TwistAddRunFrom, func(t *testing.T, mtr *framework.MonitorTestRun, err error) {
for _, e := range mtr.Events() {
testslike.Test(t, lookslike.MustCompile(map[string]interface{}{
"state": hbtestllext.IsMonitorStateInLocation(TestLocationDefault.ID),
"observer": map[string]interface{}{
"name": TestLocationDefault.ID,
"geo": map[string]interface{}{
Expand Down

0 comments on commit 3550280

Please sign in to comment.