Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for SIGUSR1 to trigger flush #7366

Merged
merged 6 commits into from
Apr 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 28 additions & 20 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log"
"os"
"runtime"
"sync"
"time"
Expand Down Expand Up @@ -516,16 +517,7 @@ func (a *Agent) runOutputs(
wg.Add(1)
go func(output *models.RunningOutput) {
defer wg.Done()

if a.Config.Agent.RoundInterval {
err := internal.SleepContext(
ctx, internal.AlignDuration(startTime, interval))
if err != nil {
return
}
}

a.flush(ctx, output, interval, jitter)
a.flushLoop(ctx, startTime, output, interval, jitter)
}(output)
}

Expand All @@ -546,25 +538,39 @@ func (a *Agent) runOutputs(
return nil
}

// flush runs an output's flush function periodically until the context is
// flushLoop runs an output's flush function periodically until the context is
// done.
func (a *Agent) flush(
func (a *Agent) flushLoop(
ctx context.Context,
startTime time.Time,
output *models.RunningOutput,
interval time.Duration,
jitter time.Duration,
) {
// since we are watching two channels we need a ticker with the jitter
// integrated.
ticker := NewTicker(interval, jitter)
defer ticker.Stop()

logError := func(err error) {
if err != nil {
log.Printf("E! [agent] Error writing to %s: %v", output.LogName(), err)
}
}

// watch for flush requests
flushRequested := make(chan os.Signal, 1)
watchForFlushSignal(flushRequested)

// align to round interval
if a.Config.Agent.RoundInterval {
err := internal.SleepContext(
ctx, internal.AlignDuration(startTime, interval))
if err != nil {
return
}
}

// since we are watching two channels we need a ticker with the jitter
// integrated.
ticker := NewTicker(interval, jitter)
defer ticker.Stop()

for {
// Favor shutdown over other methods.
select {
Expand All @@ -575,8 +581,13 @@ func (a *Agent) flush(
}

select {
case <-ctx.Done():
logError(a.flushOnce(output, interval, output.Write))
return
case <-ticker.C:
logError(a.flushOnce(output, interval, output.Write))
case <-flushRequested:
logError(a.flushOnce(output, interval, output.Write))
case <-output.BatchReady:
// Favor the ticker over batch ready
select {
Expand All @@ -585,9 +596,6 @@ func (a *Agent) flush(
default:
logError(a.flushOnce(output, interval, output.WriteBatch))
}
case <-ctx.Done():
logError(a.flushOnce(output, interval, output.Write))
return
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions agent/agent_notwindows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// +build !windows

package agent

import (
"os"
"os/signal"
"syscall"
)

// flushSignal is the OS signal that asks the agent to flush its outputs
// immediately instead of waiting for the next flush interval.
const flushSignal = syscall.SIGUSR1

// watchForFlushSignal subscribes flushRequested to flushSignal (SIGUSR1).
//
// The subscription has to outlive this call: the caller reads from
// flushRequested long after this function has returned. Calling signal.Stop
// here (for example via defer) would cancel the subscription before any
// signal could ever be delivered, so it is deliberately not done. A caller
// that stops reading from the channel should call signal.Stop(flushRequested)
// itself.
//
// flushRequested should be buffered; the signal package never blocks when
// sending, so a signal arriving while an unbuffered channel has no ready
// receiver is dropped.
func watchForFlushSignal(flushRequested chan os.Signal) {
	signal.Notify(flushRequested, flushSignal)
}
9 changes: 9 additions & 0 deletions agent/agent_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// +build windows

package agent

import "os"

// watchForFlushSignal is the Windows counterpart of the signal-based flush
// trigger. It does nothing: flushRequested is left untouched, so nothing is
// ever sent on it by this function.
func watchForFlushSignal(flushRequested chan os.Signal) {
	// Intentionally empty: signal-triggered flushing is not implemented
	// for this build.
}
116 changes: 9 additions & 107 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
_ "net/http/pprof" // Comment this line to disable pprof endpoint.
"os"
"os/signal"
"runtime"
"sort"
"strings"
"syscall"
Expand All @@ -27,16 +26,16 @@ import (
"github.com/influxdata/telegraf/plugins/outputs"
_ "github.com/influxdata/telegraf/plugins/outputs/all"
_ "github.com/influxdata/telegraf/plugins/processors/all"
"github.com/kardianos/service"
)

// If you update these, update usage.go and usage_windows.go
var fDebug = flag.Bool("debug", false,
"turn on debug logging")
var pprofAddr = flag.String("pprof-addr", "",
"pprof address to listen on, not activate pprof if empty")
var fQuiet = flag.Bool("quiet", false,
"run in quiet mode")
var fTest = flag.Bool("test", false, "enable test mode: gather metrics, print them out, and exit")
var fTest = flag.Bool("test", false, "enable test mode: gather metrics, print them out, and exit. Note: Test mode only runs inputs, not processors, aggregators, or outputs")
ssoroka marked this conversation as resolved.
Show resolved Hide resolved
var fTestWait = flag.Int("test-wait", 0, "wait up to this many seconds for service inputs to complete in test mode")
var fConfig = flag.String("config", "", "configuration file to load")
var fConfigDirectory = flag.String("config-directory", "",
Expand Down Expand Up @@ -78,7 +77,6 @@ var (
var stop chan struct{}

func reloadLoop(
stop chan struct{},
inputFilters []string,
outputFilters []string,
aggregatorFilters []string,
Expand All @@ -91,7 +89,7 @@ func reloadLoop(

ctx, cancel := context.WithCancel(context.Background())

signals := make(chan os.Signal)
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt, syscall.SIGHUP,
syscall.SIGTERM, syscall.SIGINT)
go func() {
Expand Down Expand Up @@ -208,32 +206,6 @@ func usageExit(rc int) {
os.Exit(rc)
}

// program carries the plugin filter lists that are handed to reloadLoop when
// the agent is started through the service.Service callbacks below.
type program struct {
inputFilters []string
outputFilters []string
aggregatorFilters []string
processorFilters []string
}

// Start launches p.run on its own goroutine and returns nil immediately,
// without waiting for run to make any progress.
func (p *program) Start(s service.Service) error {
go p.run()
return nil
}
// run allocates the package-level stop channel and then calls reloadLoop
// with that channel and the filter lists stored on p.
func (p *program) run() {
stop = make(chan struct{})
reloadLoop(
stop,
p.inputFilters,
p.outputFilters,
p.aggregatorFilters,
p.processorFilters,
)
}
// Stop closes the package-level stop channel and returns nil.
// NOTE(review): stop is assigned inside run, which Start runs on a separate
// goroutine; if Stop can be invoked before run has executed, stop would still
// be nil here and close would panic — confirm the call ordering.
func (p *program) Stop(s service.Service) error {
close(stop)
return nil
}

func formatFullVersion() string {
var parts = []string{"Telegraf"}

Expand Down Expand Up @@ -380,80 +352,10 @@ func main() {
log.Println("Telegraf version already configured to: " + internal.Version())
}

if runtime.GOOS == "windows" && windowsRunAsService() {
programFiles := os.Getenv("ProgramFiles")
if programFiles == "" { // Should never happen
programFiles = "C:\\Program Files"
}
svcConfig := &service.Config{
Name: *fServiceName,
DisplayName: *fServiceDisplayName,
Description: "Collects data using a series of plugins and publishes it to" +
"another series of plugins.",
Arguments: []string{"--config", programFiles + "\\Telegraf\\telegraf.conf"},
}

prg := &program{
inputFilters: inputFilters,
outputFilters: outputFilters,
aggregatorFilters: aggregatorFilters,
processorFilters: processorFilters,
}
s, err := service.New(prg, svcConfig)
if err != nil {
log.Fatal("E! " + err.Error())
}
// Handle the --service flag here to prevent any issues with tooling that
// may not have an interactive session, e.g. installing from Ansible.
if *fService != "" {
if *fConfig != "" {
svcConfig.Arguments = []string{"--config", *fConfig}
}
if *fConfigDirectory != "" {
svcConfig.Arguments = append(svcConfig.Arguments, "--config-directory", *fConfigDirectory)
}
//set servicename to service cmd line, to have a custom name after relaunch as a service
svcConfig.Arguments = append(svcConfig.Arguments, "--service-name", *fServiceName)

err := service.Control(s, *fService)
if err != nil {
log.Fatal("E! " + err.Error())
}
os.Exit(0)
} else {
winlogger, err := s.Logger(nil)
if err == nil {
//When in service mode, register eventlog target andd setup default logging to eventlog
logger.RegisterEventLogger(winlogger)
logger.SetupLogging(logger.LogConfig{LogTarget: logger.LogTargetEventlog})
}
err = s.Run()

if err != nil {
log.Println("E! " + err.Error())
}
}
} else {
stop = make(chan struct{})
reloadLoop(
stop,
inputFilters,
outputFilters,
aggregatorFilters,
processorFilters,
)
}
}

// Return true if Telegraf should create a Windows service.
func windowsRunAsService() bool {
if *fService != "" {
return true
}

if *fRunAsConsole {
return false
}

return !service.Interactive()
run(
inputFilters,
outputFilters,
aggregatorFilters,
processorFilters,
)
}
13 changes: 13 additions & 0 deletions cmd/telegraf/telegraf_notwindows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// +build !windows

package main

// run is the entry point used on non-Windows builds. It (re)creates the
// package-level stop channel and then hands the four plugin filter lists to
// reloadLoop.
func run(inputFilters, outputFilters, aggregatorFilters, processorFilters []string) {
	stop = make(chan struct{})
	reloadLoop(inputFilters, outputFilters, aggregatorFilters, processorFilters)
}
Loading