Skip to content

Commit

Permalink
[Heartbeat] States ES Client backoff and run once exit (elastic#35376)
Browse files Browse the repository at this point in the history
* Clean up config file

* Fix zero duration panic

* Add changelog

* Address feedback
  • Loading branch information
emilioalvap committed May 10, 2023
1 parent d34fe83 commit 29fcd59
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Fix broken mapping for state.ends field. {pull}34891[34891]
- Fix issue using projects in airgapped environments by disabling npm audit. {pull}34936[34936]
- Fix broken state ID location naming. {pull}35336[35336]
- Fix output pipeline exit on run_once. {pull}35376[35376]

*Heartbeat*

Expand Down
55 changes: 41 additions & 14 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package beater

import (
"context"
"errors"
"fmt"
"sync"
Expand All @@ -39,6 +40,7 @@ import (
"github.com/elastic/beats/v7/libbeat/autodiscover"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/backoff"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/management"
Expand Down Expand Up @@ -66,11 +68,20 @@ func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) {
return nil, fmt.Errorf("error reading config file: %w", err)
}

// Check if any of these can prevent using states client
stateLoader, replaceStateLoader := monitorstate.AtomicStateLoader(monitorstate.NilStateLoader)
if b.Config.Output.Name() == "elasticsearch" && !b.Manager.Enabled() {
// Connect to ES and setup the State loader if the output is not managed by agent
if err := makeStatesClient(b.Config.Output.Config(), replaceStateLoader, parsedConfig.RunFrom); err != nil {
logp.L().Warnf("could not connect to ES for state management during initial load: %s", err)
// Note this, intentionally, blocks until connected or max attempts reached
esClient, err := makeESClient(b.Config.Output.Config(), 3, 2*time.Second)
if err != nil {
if parsedConfig.RunOnce {
return nil, fmt.Errorf("run_once mode fatal error: %w", err)
} else {
logp.L().Warnf("skipping monitor state management: %w", err)
}
} else {
replaceStateLoader(monitorstate.MakeESLoader(esClient, monitorstate.DefaultDataStreams, parsedConfig.RunFrom))
}
} else if b.Manager.Enabled() {
stateLoader, replaceStateLoader = monitorstate.DeferredStateLoader(monitorstate.NilStateLoader, 15*time.Second)
Expand Down Expand Up @@ -253,8 +264,12 @@ func (bt *Heartbeat) RunCentralMgmtMonitors(b *beat.Beat) {
return nil
}

if err := makeStatesClient(outCfg.Config(), bt.replaceStateLoader, bt.config.RunFrom); err != nil {
logp.L().Warnf("could not connect to ES for state management during managed reload: %s", err)
// Backoff panics with 0 duration, set to smallest unit
esClient, err := makeESClient(outCfg.Config(), 1, 1*time.Nanosecond)
if err != nil {
logp.L().Warnf("skipping monitor state management during managed reload: %w", err)
} else {
bt.replaceStateLoader(monitorstate.MakeESLoader(esClient, monitorstate.DefaultDataStreams, bt.config.RunFrom))
}

return nil
Expand All @@ -268,7 +283,7 @@ func (bt *Heartbeat) RunCentralMgmtMonitors(b *beat.Beat) {
func (bt *Heartbeat) RunReloadableMonitors() (err error) {
// Check monitor configs
if err := bt.monitorReloader.Check(bt.monitorFactory); err != nil {
logp.Error(fmt.Errorf("error loading reloadable monitors: %w", err))
logp.L().Error(fmt.Errorf("error loading reloadable monitors: %w", err))
}

// Execute the monitor
Expand Down Expand Up @@ -298,16 +313,28 @@ func (bt *Heartbeat) Stop() {
bt.stopOnce.Do(func() { close(bt.done) })
}

func makeStatesClient(cfg *conf.C, replace func(monitorstate.StateLoader), runFrom *config.LocationWithID) error {
esClient, err := eslegclient.NewConnectedClient(cfg, "Heartbeat")
if err != nil {
return err
}
func makeESClient(cfg *conf.C, attempts int, wait time.Duration) (*eslegclient.Connection, error) {
var (
esClient *eslegclient.Connection
err error
)

// ES client backoff
connectDelay := backoff.NewEqualJitterBackoff(
context.Background().Done(),
wait,
wait,
)

if esClient != nil {
logp.L().Info("using ES states loader")
replace(monitorstate.MakeESLoader(esClient, "synthetics-*,heartbeat-*", runFrom))
for i := 0; i < attempts; i++ {
esClient, err = eslegclient.NewConnectedClient(cfg, "Heartbeat")
if err == nil {
connectDelay.Reset()
return esClient, nil
} else {
connectDelay.Wait()
}
}

return nil
return nil, fmt.Errorf("could not establish states loader connection after %d attempts, with %s delay", attempts, wait)
}
2 changes: 2 additions & 0 deletions heartbeat/monitors/wrappers/monitorstate/esloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
)

var DefaultDataStreams = "synthetics-*,heartbeat-*"

func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation *config.LocationWithID) StateLoader {
if indexPattern == "" {
// Should never happen, but if we ever make a coding error...
Expand Down

0 comments on commit 29fcd59

Please sign in to comment.