Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add -run-once option to exit after a single collection #6537

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 22 additions & 11 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ import (

// Agent runs a set of plugins.
type Agent struct {
Config *config.Config
Config *config.Config
RunOnce bool
}

// NewAgent returns an Agent for the given Config.
func NewAgent(config *config.Config) (*Agent, error) {
func NewAgent(config *config.Config, runOnce bool) (*Agent, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation of this isn't something I would accept, adding booleans as trailing parameters is not allowed and it pushes a lot of new complexity into the agent.

The interface to this should be a new function, similar to what was done with the Test function.

Implementation might be shared, but it ought to guarantee a single Gather, single Aggregation, and as many writes are are required to send it all. So, I'm not sure if sharing much would work out.

a := &Agent{
Config: config,
Config: config,
RunOnce: runOnce,
}
return a, nil
}
Expand Down Expand Up @@ -57,10 +59,12 @@ func (a *Agent) Run(ctx context.Context) error {

startTime := time.Now()

log.Printf("D! [agent] Starting service inputs")
err = a.startServiceInputs(ctx, inputC)
if err != nil {
return err
if !a.RunOnce {
log.Printf("D! [agent] Starting service inputs")
err = a.startServiceInputs(ctx, inputC)
if err != nil {
return err
}
}

var wg sync.WaitGroup
Expand All @@ -77,8 +81,10 @@ func (a *Agent) Run(ctx context.Context) error {
log.Printf("E! [agent] Error running inputs: %v", err)
}

log.Printf("D! [agent] Stopping service inputs")
a.stopServiceInputs()
if !a.RunOnce {
log.Printf("D! [agent] Stopping service inputs")
a.stopServiceInputs()
}

close(dst)
log.Printf("D! [agent] Input channel closed")
Expand Down Expand Up @@ -267,14 +273,19 @@ func (a *Agent) runInputs(
go func(input *models.RunningInput) {
defer wg.Done()

if a.Config.Agent.RoundInterval {
if !a.RunOnce && a.Config.Agent.RoundInterval {
err := internal.SleepContext(
ctx, internal.AlignDuration(startTime, interval))
if err != nil {
return
}
}

if a.RunOnce {
a.gatherOnce(acc, input, interval)
return
}

a.gatherOnInterval(ctx, acc, input, interval, jitter)
}(input)
}
Expand Down Expand Up @@ -507,7 +518,7 @@ func (a *Agent) runOutputs(
go func(output *models.RunningOutput) {
defer wg.Done()

if a.Config.Agent.RoundInterval {
if !a.RunOnce && !a.Config.Agent.RoundInterval {
err := internal.SleepContext(
ctx, internal.AlignDuration(startTime, interval))
if err != nil {
Expand Down
26 changes: 13 additions & 13 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
func TestAgent_OmitHostname(t *testing.T) {
c := config.NewConfig()
c.Agent.OmitHostname = true
_, err := NewAgent(c)
_, err := NewAgent(c, false)
assert.NoError(t, err)
assert.NotContains(t, c.Tags, "host")
}
Expand All @@ -24,35 +24,35 @@ func TestAgent_LoadPlugin(t *testing.T) {
c.InputFilters = []string{"mysql"}
err := c.LoadConfig("../internal/config/testdata/telegraf-agent.toml")
assert.NoError(t, err)
a, _ := NewAgent(c)
a, _ := NewAgent(c, false)
assert.Equal(t, 1, len(a.Config.Inputs))

c = config.NewConfig()
c.InputFilters = []string{"foo"}
err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml")
assert.NoError(t, err)
a, _ = NewAgent(c)
a, _ = NewAgent(c, false)
assert.Equal(t, 0, len(a.Config.Inputs))

c = config.NewConfig()
c.InputFilters = []string{"mysql", "foo"}
err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml")
assert.NoError(t, err)
a, _ = NewAgent(c)
a, _ = NewAgent(c, false)
assert.Equal(t, 1, len(a.Config.Inputs))

c = config.NewConfig()
c.InputFilters = []string{"mysql", "redis"}
err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml")
assert.NoError(t, err)
a, _ = NewAgent(c)
a, _ = NewAgent(c, false)
assert.Equal(t, 2, len(a.Config.Inputs))

c = config.NewConfig()
c.InputFilters = []string{"mysql", "foo", "redis", "bar"}
err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml")
assert.NoError(t, err)
a, _ = NewAgent(c)
a, _ = NewAgent(c, false)
assert.Equal(t, 2, len(a.Config.Inputs))
}

Expand All @@ -61,50 +61,50 @@ func TestAgent_LoadOutput(t *testing.T) {
c.OutputFilters = []string{"influxdb"}
err := c.LoadConfig("../internal/config/testdata/telegraf-agent.toml")
assert.NoError(t, err)
a, _ := NewAgent(c)
a, _ := NewAgent(c, false)
assert.Equal(t, 2, len(a.Config.Outputs))

c = config.NewConfig()
c.OutputFilters = []string{"kafka"}
err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml")
assert.NoError(t, err)
a, _ = NewAgent(c)
a, _ = NewAgent(c, false)
assert.Equal(t, 1, len(a.Config.Outputs))

c = config.NewConfig()
c.OutputFilters = []string{}
err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml")
assert.NoError(t, err)
a, _ = NewAgent(c)
a, _ = NewAgent(c, false)
assert.Equal(t, 3, len(a.Config.Outputs))

c = config.NewConfig()
c.OutputFilters = []string{"foo"}
err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml")
assert.NoError(t, err)
a, _ = NewAgent(c)
a, _ = NewAgent(c, false)
assert.Equal(t, 0, len(a.Config.Outputs))

c = config.NewConfig()
c.OutputFilters = []string{"influxdb", "foo"}
err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml")
assert.NoError(t, err)
a, _ = NewAgent(c)
a, _ = NewAgent(c, false)
assert.Equal(t, 2, len(a.Config.Outputs))

c = config.NewConfig()
c.OutputFilters = []string{"influxdb", "kafka"}
err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml")
assert.NoError(t, err)
assert.Equal(t, 3, len(c.Outputs))
a, _ = NewAgent(c)
a, _ = NewAgent(c, false)
assert.Equal(t, 3, len(a.Config.Outputs))

c = config.NewConfig()
c.OutputFilters = []string{"influxdb", "foo", "kafka", "bar"}
err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml")
assert.NoError(t, err)
a, _ = NewAgent(c)
a, _ = NewAgent(c, false)
assert.Equal(t, 3, len(a.Config.Outputs))
}

Expand Down
3 changes: 2 additions & 1 deletion cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ var fServiceDisplayName = flag.String("service-display-name", "Telegraf Data Col
var fRunAsConsole = flag.Bool("console", false, "run as console application (windows only)")
var fPlugins = flag.String("plugin-directory", "",
"path to directory containing external plugins")
var fRunOnce = flag.Bool("run-once", false, "collect metrics from inputs once and exit")

var (
version string
Expand Down Expand Up @@ -152,7 +153,7 @@ func runAgent(ctx context.Context,
c.Agent.Interval.Duration)
}

ag, err := agent.NewAgent(c)
ag, err := agent.NewAgent(c, *fRunOnce)
if err != nil {
return err
}
Expand Down