Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Heartbeat] Fix run from location in states loader #35336

Merged
merged 8 commits into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ with the ecs field name `container`. {pull}34403[34403]
automatic splitting at root level, if root level element is an array. {pull}34155[34155]
- Fix broken mapping for state.ends field. {pull}34891[34891]
- Fix issue using projects in airgapped environments by disabling npm audit. {pull}34936[34936]
- Fix broken state ID location naming. {pull}35336[35336]

*Heartbeat*

Expand Down
7 changes: 6 additions & 1 deletion heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) {
}),
trace: trace,
}
runFromID := "<unknown location>"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the default runFrom ID used in the state differs from the "unknown" default used here; can we use one common value?

if parsedConfig.RunFrom != nil {
runFromID = parsedConfig.RunFrom.ID
}
logp.L().Infof("heartbeat starting, running from: %v", runFromID)
return bt, nil
}

Expand Down Expand Up @@ -300,7 +305,7 @@ func makeStatesClient(cfg *conf.C, replace func(monitorstate.StateLoader), runFr
}

if esClient != nil {
logp.L().Info("replacing states loader")
logp.L().Info("using ES states loader")
replace(monitorstate.MakeESLoader(esClient, "synthetics-*,heartbeat-*", runFrom))
}

Expand Down
18 changes: 18 additions & 0 deletions heartbeat/hbtestllext/isdefs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package hbtestllext

import (
"fmt"
"regexp"
"strings"
"time"

Expand Down Expand Up @@ -62,6 +64,22 @@ var IsMonitorState = isdef.Is("isState", func(path llpath.Path, v interface{}) *
return llresult.ValidResult(path)
})

// IsMonitorStateInLocation returns an IsDef that checks that the value is a
// monitorstate.State whose ID has the form "<locName>-<lowercase alphanumerics>-0".
// locName is matched literally.
var IsMonitorStateInLocation = func(locName string) isdef.IsDef {
	// Escape locName so regexp metacharacters in a location name (for example '.')
	// are matched literally; unescaped they could widen the match or make
	// MustCompile panic on an invalid pattern.
	locPattern := fmt.Sprintf("^%s-[a-z0-9]+-0$", regexp.QuoteMeta(locName))
	stateIDMatch := regexp.MustCompile(locPattern)
	return isdef.Is("isState", func(path llpath.Path, v interface{}) *llresult.Results {
		s, ok := v.(monitorstate.State)
		if !ok {
			return llresult.SimpleResult(path, false, "expected a monitorstate.State")
		}

		if !stateIDMatch.MatchString(s.ID) {
			return llresult.SimpleResult(path, false, fmt.Sprintf("ID %s does not match regexp pattern /%s/", s.ID, locPattern))
		}
		return llresult.ValidResult(path)
	})
}

var IsECSErr = func(expectedErr *ecserr.ECSErr) isdef.IsDef {
return isdef.Is("matches ECS ERR", func(path llpath.Path, v interface{}) *llresult.Results {
// This conditional is a bit awkward, apparently there's a bug in lookslike where a pointer
Expand Down
43 changes: 34 additions & 9 deletions heartbeat/monitors/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,26 @@ func (f *RunnerFactory) Create(p beat.Pipeline, c *conf.C) (cfgfile.Runner, erro
if err != nil {
return nil, fmt.Errorf("could not create pipeline client via factory: %w", err)
}

// The state loader needs the beat location to accurately load the last state
sf, err := stdfields.ConfigToStdMonitorFields(c)
if err != nil {
return nil, fmt.Errorf("could not load stdfields in factory: %w", err)
}
loc := getLocation(f.beatLocation, sf)
if loc != nil {
geoMap, _ := util.GeoConfigToMap(loc.Geo)
err = c.Merge(map[string]interface{}{
"run_from": map[string]interface{}{
"id": f.beatLocation.ID,
"geo": geoMap,
},
})
if err != nil {
return nil, fmt.Errorf("could not merge location into monitor map: %w", err)
}
}

monitor, err := newMonitor(c, f.pluginsReg, pc, f.addTask, f.stateLoader, safeStop)
if err != nil {
return nil, fmt.Errorf("factory could not create monitor: %w", err)
Expand All @@ -183,6 +203,19 @@ func (f *RunnerFactory) CheckConfig(config *conf.C) error {
return checkMonitorConfig(config, plugin.GlobalPluginsReg)
}

// getLocation resolves the location a monitor runs from. The monitor-level
// location carried in sf.RunFrom takes precedence; when it is unset the beat's
// location is returned instead. The result is nil when neither is declared.
func getLocation(beatLocation *config.LocationWithID, sf stdfields.StdMonitorFields) (loc *config.LocationWithID) {
	// Generally speaking direct HB users would use the beat location, and the
	// synthetics service may as well (TBD), while Fleet configured monitors will
	// always use a per location monitor.
	if monitorLoc := sf.RunFrom; monitorLoc != nil {
		return monitorLoc
	}
	return beatLocation
}

func newCommonPublishConfigs(info beat.Info, beatLocation *config.LocationWithID, cfg *conf.C) (pipetool.ConfigEditor, error) {
var settings publishSettings
if err := cfg.Unpack(&settings); err != nil {
Expand All @@ -195,16 +228,8 @@ func newCommonPublishConfigs(info beat.Info, beatLocation *config.LocationWithID
}

// Early stage processors for setting data_stream, event.dataset, and index to write to
loc := getLocation(beatLocation, sf)

// Use the monitor-specific location if possible, otherwise use the beat's location
// Generally speaking direct HB users would use the beat location, and the synthetics service may as well (TBD)
// while Fleet configured monitors will always use a per location monitor
var loc *config.LocationWithID
if sf.RunFrom != nil {
loc = sf.RunFrom
} else {
loc = beatLocation
}
preProcs, err := preProcessors(info, loc, settings, sf.Type)
if err != nil {
return nil, err
Expand Down
13 changes: 9 additions & 4 deletions heartbeat/monitors/wrappers/monitorstate/esloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation
logp.L().Warn("ES state loader initialized with no index pattern, will not load states from ES")
return NilStateLoader
}

return func(sf stdfields.StdMonitorFields) (*State, error) {
var runFromID string
if sf.RunFrom != nil {
Copy link
Member

@vigneshshanmugam vigneshshanmugam May 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we otherwise default to using beatLocation, which is passed as the 3rd param of the func?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose of this is that we want the fleet integration to set the location correctly, however the fleet integration can only set monitor fields, not root fields. Hence the precedence

runFromID = sf.RunFrom.ID
}
queryMustClauses := []mapstr.M{
{
"match": mapstr.M{"monitor.id": sf.ID},
Expand All @@ -54,9 +59,9 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation
},
}

if sf.RunFrom != nil {
if runFromID != "" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found a bug with this approach: because we only add the location clause to the state query when runFrom is specified, it leads to a bug when runFrom is set and then removed later.

  1. Set heartbeat.runfrom to a test-loc; the state will be created on index <monitorstate:id=test-loc-<blah>>
  2. Remove the value of run_from. Now, instead of checking the default location (or unknown, if we change the naming), we will be looking at the wrong index for the monitor.

queryMustClauses = append(queryMustClauses, mapstr.M{
"match": mapstr.M{"observer.name": sf.RunFrom.ID},
"match": mapstr.M{"observer.name": runFromID},
})
}
reqBody := mapstr.M{
Expand All @@ -70,7 +75,7 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation

status, body, err := esc.Request("POST", strings.Join([]string{"/", indexPattern, "/", "_search", "?size=1"}, ""), "", nil, reqBody)
if err != nil || status > 299 {
return nil, fmt.Errorf("error executing state search for %s: %w", sf.ID, err)
return nil, fmt.Errorf("error executing state search for %s in loc=%s: %w", sf.ID, runFromID, err)
}

type stateHits struct {
Expand All @@ -91,7 +96,7 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation
}

if len(sh.Hits.Hits) == 0 {
logp.L().Infof("no previous state found for monitor %s", sf.ID)
logp.L().Infof("no previous state found for monitor %s in Elasticsearch (loc=%s)", sf.ID, runFromID)
return nil, nil
}

Expand Down
7 changes: 7 additions & 0 deletions heartbeat/monitors/wrappers/monitorstate/monitorstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ type State struct {
ctr int
}

// String renders a compact summary of the state (ID, start time, and up/down
// counters). A nil receiver yields "<monitorstate:nil>", so a missing state can
// be formatted without a nil check at the call site.
func (s *State) String() string {
	const nilRepr = "<monitorstate:nil>"
	if s == nil {
		return nilRepr
	}
	id, started, up, down := s.ID, s.StartedAt, s.Up, s.Down
	return fmt.Sprintf("<monitorstate:id=%s,started=%s,up=%d,down=%d>", id, started, up, down)
}

func (s *State) incrementCounters(status StateStatus) {
s.DurationMs = time.Since(s.StartedAt).Milliseconds()
s.Checks++
Expand Down
4 changes: 4 additions & 0 deletions heartbeat/monitors/wrappers/monitorstate/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func (t *Tracker) RecordStatus(sf stdfields.StdMonitorFields, newStatus StateSta
state := t.getCurrentState(sf)
if state == nil {
state = newMonitorState(sf, newStatus, 0, t.flappingEnabled)
logp.L().Infof("initializing new state for monitor %s: %s", sf.ID, state.String())
t.states[sf.ID] = state
} else {
state.recordCheck(sf, newStatus)
Expand All @@ -84,6 +85,9 @@ func (t *Tracker) getCurrentState(sf stdfields.StdMonitorFields) (state *State)
for i := 0; i < tries; i++ {
loadedState, err = t.stateLoader(sf)
if err == nil {
if loadedState != nil {
logp.L().Infof("loaded previous state for monitor %s: %s", sf.ID, loadedState.String())
}
break
}

Expand Down
1 change: 1 addition & 0 deletions x-pack/heartbeat/scenarios/basics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func TestRunFromOverride(t *testing.T) {
scenarioDB.RunAllWithATwist(t, TwistAddRunFrom, func(t *testing.T, mtr *framework.MonitorTestRun, err error) {
for _, e := range mtr.Events() {
testslike.Test(t, lookslike.MustCompile(map[string]interface{}{
"state": hbtestllext.IsMonitorStateInLocation(TestLocationDefault.ID),
"observer": map[string]interface{}{
"name": TestLocationDefault.ID,
"geo": map[string]interface{}{
Expand Down