Make cloudwatchlogs's pusher wait for the final flush to complete before returning #350

Merged
12 changes: 9 additions & 3 deletions plugins/outputs/cloudwatchlogs/cloudwatchlogs.go
@@ -61,17 +61,23 @@ type CloudWatchLogs struct {

     Log telegraf.Logger `toml:"-"`

-    cwDests map[Target]*cwDest
+    pusherStopChan  chan struct{}
+    pusherWaitGroup sync.WaitGroup
+    cwDests         map[Target]*cwDest
 }

 func (c *CloudWatchLogs) Connect() error {
     return nil
 }

 func (c *CloudWatchLogs) Close() error {
+    close(c.pusherStopChan)
+    c.pusherWaitGroup.Wait()
+
     for _, d := range c.cwDests {
         d.Stop()
     }
+
     return nil
 }

@@ -129,7 +135,7 @@ func (c *CloudWatchLogs) getDest(t Target) *cwDest {
     client.Handlers.Build.PushBackNamed(handlers.NewRequestCompressionHandler([]string{"PutLogEvents"}))
     client.Handlers.Build.PushBackNamed(handlers.NewCustomHeaderHandler("User-Agent", agentinfo.UserAgent(t.Group)))

-    pusher := NewPusher(t, client, c.ForceFlushInterval.Duration, maxRetryTimeout, c.Log)
+    pusher := NewPusher(t, client, c.ForceFlushInterval.Duration, maxRetryTimeout, c.Log, c.pusherStopChan, &c.pusherWaitGroup)
     cwd := &cwDest{pusher: pusher, retryer: logThrottleRetryer}
     c.cwDests[t] = cwd
     return cwd
@@ -290,7 +296,6 @@ func (cd *cwDest) Publish(events []logs.LogEvent) error {
 }

 func (cd *cwDest) Stop() {
-    cd.pusher.Stop()
     cd.retryer.Stop()
     cd.stopped = true
 }
@@ -365,6 +370,7 @@ func init() {
     outputs.Add("cloudwatchlogs", func() telegraf.Output {
         return &CloudWatchLogs{
             ForceFlushInterval: internal.Duration{Duration: defaultFlushTimeout},
+            pusherStopChan:     make(chan struct{}),
             cwDests:            make(map[Target]*cwDest),
         }
     })
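
The cloudwatchlogs.go changes above hang together as one pattern: the plugin owns a single stop channel plus a sync.WaitGroup, hands both to every pusher it creates, and in Close() closes the channel and then waits, so Close() only returns after each pusher has completed its final flush. Below is a minimal, self-contained sketch of that coordination pattern; the names (owner, worker, flush) and timings are illustrative stand-ins, not the plugin's actual code.

// A sketch only: same stop-channel + WaitGroup shutdown as above, with made-up names.
package main

import (
    "fmt"
    "sync"
    "time"
)

// worker stands in for a pusher: it flushes periodically and does one final
// flush when the shared stop channel is closed.
type worker struct {
    id   int
    stop <-chan struct{}
    wg   *sync.WaitGroup
}

func newWorker(id int, stop <-chan struct{}, wg *sync.WaitGroup) *worker {
    w := &worker{id: id, stop: stop, wg: wg}
    wg.Add(1) // register before starting the goroutine so Wait() cannot miss it
    go w.run()
    return w
}

func (w *worker) run() {
    defer w.wg.Done() // signals that the final flush below has completed
    ticker := time.NewTicker(50 * time.Millisecond)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            // periodic flush would happen here
        case <-w.stop:
            w.flush() // one last flush before returning
            return
        }
    }
}

func (w *worker) flush() {
    time.Sleep(10 * time.Millisecond) // stand-in for a PutLogEvents call
    fmt.Printf("worker %d: final flush done\n", w.id)
}

// owner stands in for the CloudWatchLogs output: it shares one stop channel
// and one WaitGroup across every worker it creates.
type owner struct {
    stop chan struct{}
    wg   sync.WaitGroup
}

func (o *owner) Close() {
    close(o.stop) // broadcast stop to every worker at once
    o.wg.Wait()   // block until every final flush has returned
}

func main() {
    o := &owner{stop: make(chan struct{})}
    for i := 0; i < 3; i++ {
        newWorker(i, o.stop, &o.wg)
    }
    time.Sleep(120 * time.Millisecond)
    o.Close() // returns only after all three workers have flushed
    fmt.Println("all workers drained")
}

Calling wg.Add(1) before the goroutine starts (as the PR also does in NewPusher) matters: adding from inside the goroutine could race with an early Wait().
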
25 changes: 17 additions & 8 deletions plugins/outputs/cloudwatchlogs/pusher.go
@@ -50,14 +50,15 @@ type pusher struct {
     sequenceToken *string
     lastValidTime int64
     needSort      bool
-    stop          chan struct{}
+    stop          <-chan struct{}
     lastSentTime  time.Time

     initNonBlockingChOnce sync.Once
     startNonBlockCh       chan struct{}
+    wg                    *sync.WaitGroup
 }

-func NewPusher(target Target, service CloudWatchLogsService, flushTimeout time.Duration, retryDuration time.Duration, logger telegraf.Logger) *pusher {
+func NewPusher(target Target, service CloudWatchLogsService, flushTimeout time.Duration, retryDuration time.Duration, logger telegraf.Logger, stop <-chan struct{}, wg *sync.WaitGroup) *pusher {
     p := &pusher{
         Target:          target,
         Service:         service,
@@ -67,10 +68,12 @@ func NewPusher(target Target, service CloudWatchLogsService, flushTimeout time.D
         events:          make([]*cloudwatchlogs.InputLogEvent, 0, 10),
         eventsCh:        make(chan logs.LogEvent, 100),
         flushTimer:      time.NewTimer(flushTimeout),
-        stop:            make(chan struct{}),
+        stop:            stop,
         startNonBlockCh: make(chan struct{}),
+        wg:              wg,
     }
     p.putRetentionPolicy()
+    p.wg.Add(1)
     go p.start()
     return p
 }
@@ -120,11 +123,9 @@ func hasValidTime(e logs.LogEvent) bool {
     return true
 }

-func (p *pusher) Stop() {
-    close(p.stop)
-}
-
 func (p *pusher) start() {
+    defer p.wg.Done()
+
     ec := make(chan logs.LogEvent)

     // Merge events from both blocking and non-blocking channel
@@ -299,7 +300,15 @@ func (p *pusher) send() {
         }

         p.Log.Warnf("Retried %v time, going to sleep %v before retrying.", retryCount, wait)
-        time.Sleep(wait)
+
+        select {
+        case <-p.stop:
+            p.Log.Errorf("Stop requested after %v retries to %v/%v failed for PutLogEvents, request dropped.", retryCount, p.Group, p.Stream)
+            p.reset()
+            return
+        case <-time.After(wait):
+        }
+
         retryCount++
     }

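
The final pusher.go hunk swaps time.Sleep(wait) for a select over the stop channel and time.After(wait), so a pusher stuck in retry backoff can abandon the wait as soon as shutdown is requested instead of delaying Close(). A small sketch of that interruptible-wait pattern follows; sendWithRetry, doRequest, and the backoff schedule are assumptions made up for the example, not the plugin's real retry logic.

// A sketch only: interruptible retry backoff via select, with made-up names.
package main

import (
    "errors"
    "fmt"
    "time"
)

// sendWithRetry retries doRequest with a doubling backoff, but gives up as
// soon as the stop channel is closed, mirroring the select added in p.send().
func sendWithRetry(stop <-chan struct{}, doRequest func() error) error {
    wait := 50 * time.Millisecond
    for retryCount := 0; ; retryCount++ {
        if err := doRequest(); err == nil {
            return nil
        }
        select {
        case <-stop:
            // Stop requested mid-backoff: drop the request and return promptly.
            return fmt.Errorf("stop requested during backoff after %d retries", retryCount)
        case <-time.After(wait):
            // Backoff elapsed; loop around and retry.
        }
        wait *= 2
    }
}

func main() {
    stop := make(chan struct{})
    go func() {
        time.Sleep(120 * time.Millisecond)
        close(stop) // simulates Close() arriving while retries are in flight
    }()
    err := sendWithRetry(stop, func() error { return errors.New("throttled") })
    fmt.Println(err) // returns promptly instead of sleeping out the full backoff
}
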