Skip to content

Commit

Permalink
Make each output keep it's own slice of points
Browse files Browse the repository at this point in the history
Also moved some objects out of config.go and put them in their own
package, internal/models

fixes #568
closes #285
  • Loading branch information
sparrc committed Jan 22, 2016
1 parent f2ab5f6 commit 9d8b4a4
Show file tree
Hide file tree
Showing 12 changed files with 447 additions and 421 deletions.
6 changes: 3 additions & 3 deletions accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"sync"
"time"

"github.com/influxdata/telegraf/internal/config"
"github.com/influxdata/telegraf/internal/models"

"github.com/influxdata/influxdb/client/v2"
)
Expand All @@ -29,7 +29,7 @@ type Accumulator interface {
}

func NewAccumulator(
inputConfig *config.InputConfig,
inputConfig *models.InputConfig,
points chan *client.Point,
) Accumulator {
acc := accumulator{}
Expand All @@ -47,7 +47,7 @@ type accumulator struct {

debug bool

inputConfig *config.InputConfig
inputConfig *models.InputConfig

prefix string
}
Expand Down
93 changes: 23 additions & 70 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/influxdata/telegraf/internal/config"
"github.com/influxdata/telegraf/internal/models"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/outputs"

Expand Down Expand Up @@ -101,7 +102,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {

wg.Add(1)
counter++
go func(input *config.RunningInput) {
go func(input *models.RunningInput) {
defer wg.Done()

acc := NewAccumulator(input.Config, pointChan)
Expand Down Expand Up @@ -144,7 +145,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {
// reporting interval.
func (a *Agent) gatherSeparate(
shutdown chan struct{},
input *config.RunningInput,
input *models.RunningInput,
pointChan chan *client.Point,
) error {
ticker := time.NewTicker(input.Config.Interval)
Expand Down Expand Up @@ -228,93 +229,45 @@ func (a *Agent) Test() error {
return nil
}

// writeOutput writes a list of points to a single output, with retries.
// Optionally takes a `done` channel to indicate that it is done writing.
func (a *Agent) writeOutput(
points []*client.Point,
ro *config.RunningOutput,
shutdown chan struct{},
wg *sync.WaitGroup,
) {
defer wg.Done()
if len(points) == 0 {
return
}
retry := 0
retries := a.Config.Agent.FlushRetries
start := time.Now()

for {
filtered := ro.FilterPoints(points)
err := ro.Output.Write(filtered)
if err == nil {
// Write successful
elapsed := time.Since(start)
if !a.Config.Agent.Quiet {
log.Printf("Flushed %d metrics to output %s in %s\n",
len(filtered), ro.Name, elapsed)
}
return
}

select {
case <-shutdown:
return
default:
if retry >= retries {
// No more retries
msg := "FATAL: Write to output [%s] failed %d times, dropping" +
" %d metrics\n"
log.Printf(msg, ro.Name, retries+1, len(points))
return
} else if err != nil {
// Sleep for a retry
log.Printf("Error in output [%s]: %s, retrying in %s",
ro.Name, err.Error(), a.Config.Agent.FlushInterval.Duration)
time.Sleep(a.Config.Agent.FlushInterval.Duration)
}
}

retry++
}
}

// flush writes a list of points to all configured outputs
func (a *Agent) flush(
points []*client.Point,
shutdown chan struct{},
wait bool,
) {
func (a *Agent) flush() {
var wg sync.WaitGroup

wg.Add(len(a.Config.Outputs))
for _, o := range a.Config.Outputs {
wg.Add(1)
go a.writeOutput(points, o, shutdown, &wg)
}
if wait {
wg.Wait()
go func(output *models.RunningOutput) {
defer wg.Done()
err := output.Write()
if err != nil {
log.Printf("Error writing to output [%s]: %s\n",
output.Name, err.Error())
}
}(o)
}

wg.Wait()
}

// flusher monitors the points input channel and flushes on the minimum interval
func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) error {
// Inelegant, but this sleep is to allow the Gather threads to run, so that
// the flusher will flush after metrics are collected.
time.Sleep(time.Millisecond * 100)
time.Sleep(time.Millisecond * 200)

ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration)
points := make([]*client.Point, 0)

for {
select {
case <-shutdown:
log.Println("Hang on, flushing any cached points before shutdown")
a.flush(points, shutdown, true)
a.flush()
return nil
case <-ticker.C:
a.flush(points, shutdown, false)
points = make([]*client.Point, 0)
a.flush()
case pt := <-pointChan:
points = append(points, pt)
for _, o := range a.Config.Outputs {
o.AddPoint(pt)
}
}
}
}
Expand Down Expand Up @@ -389,7 +342,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
// configured. Default intervals are handled below with gatherParallel
if input.Config.Interval != 0 {
wg.Add(1)
go func(input *config.RunningInput) {
go func(input *models.RunningInput) {
defer wg.Done()
if err := a.gatherSeparate(shutdown, input, pointChan); err != nil {
log.Printf(err.Error())
Expand Down
Loading

0 comments on commit 9d8b4a4

Please sign in to comment.