From cecbde2e8242af5d92b25e619ad1659c1b8bfee9 Mon Sep 17 00:00:00 2001 From: Brandon Dahler Date: Tue, 1 Feb 2022 17:14:53 -0500 Subject: [PATCH 1/3] Make cloudwatchlogs pusher's Close wait for the final flush to complete before returning. --- plugins/outputs/cloudwatchlogs/pusher.go | 5 +++++ plugins/outputs/cloudwatchlogs/pusher_test.go | 1 - 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/plugins/outputs/cloudwatchlogs/pusher.go b/plugins/outputs/cloudwatchlogs/pusher.go index a14591aa99..ec23e2c8b6 100644 --- a/plugins/outputs/cloudwatchlogs/pusher.go +++ b/plugins/outputs/cloudwatchlogs/pusher.go @@ -55,6 +55,7 @@ type pusher struct { initNonBlockingChOnce sync.Once startNonBlockCh chan struct{} + wg sync.WaitGroup } func NewPusher(target Target, service CloudWatchLogsService, flushTimeout time.Duration, retryDuration time.Duration, logger telegraf.Logger) *pusher { @@ -71,6 +72,7 @@ func NewPusher(target Target, service CloudWatchLogsService, flushTimeout time.D startNonBlockCh: make(chan struct{}), } p.putRetentionPolicy() + p.wg.Add(1) go p.start() return p } @@ -122,9 +124,12 @@ func hasValidTime(e logs.LogEvent) bool { func (p *pusher) Stop() { close(p.stop) + p.wg.Wait() } func (p *pusher) start() { + defer p.wg.Done() + ec := make(chan logs.LogEvent) // Merge events from both blocking and non-blocking channel diff --git a/plugins/outputs/cloudwatchlogs/pusher_test.go b/plugins/outputs/cloudwatchlogs/pusher_test.go index 40b0352078..5538b297a4 100644 --- a/plugins/outputs/cloudwatchlogs/pusher_test.go +++ b/plugins/outputs/cloudwatchlogs/pusher_test.go @@ -144,7 +144,6 @@ func TestStopPusherWouldDoFinalSend(t *testing.T) { } time.Sleep(10 * time.Millisecond) p.Stop() - time.Sleep(10 * time.Millisecond) if !called { t.Errorf("PutLogEvents has not been called after p has been Stopped.") } From ec8420ebf5208e0aacce11ba4e4814ae08d7b8fd Mon Sep 17 00:00:00 2001 From: Brandon Dahler Date: Wed, 2 Feb 2022 09:54:24 -0500 Subject: [PATCH 2/3] Share stop channel and wait group across all pushers. --- .../outputs/cloudwatchlogs/cloudwatchlogs.go | 12 +- plugins/outputs/cloudwatchlogs/pusher.go | 14 +-- plugins/outputs/cloudwatchlogs/pusher_test.go | 117 +++++++++++------- 3 files changed, 87 insertions(+), 56 deletions(-) diff --git a/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go b/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go index 282363a40c..88776a66c1 100644 --- a/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go +++ b/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go @@ -61,7 +61,9 @@ type CloudWatchLogs struct { Log telegraf.Logger `toml:"-"` - cwDests map[Target]*cwDest + pusherStopChan chan struct{} + pusherWaitGroup sync.WaitGroup + cwDests map[Target]*cwDest } func (c *CloudWatchLogs) Connect() error { @@ -69,9 +71,13 @@ func (c *CloudWatchLogs) Connect() error { } func (c *CloudWatchLogs) Close() error { + close(c.pusherStopChan) + c.pusherWaitGroup.Wait() + for _, d := range c.cwDests { d.Stop() } + return nil } @@ -129,7 +135,7 @@ func (c *CloudWatchLogs) getDest(t Target) *cwDest { client.Handlers.Build.PushBackNamed(handlers.NewRequestCompressionHandler([]string{"PutLogEvents"})) client.Handlers.Build.PushBackNamed(handlers.NewCustomHeaderHandler("User-Agent", agentinfo.UserAgent(t.Group))) - pusher := NewPusher(t, client, c.ForceFlushInterval.Duration, maxRetryTimeout, c.Log) + pusher := NewPusher(t, client, c.ForceFlushInterval.Duration, maxRetryTimeout, c.Log, c.pusherStopChan, &c.pusherWaitGroup) cwd := &cwDest{pusher: pusher, retryer: logThrottleRetryer} c.cwDests[t] = cwd return cwd @@ -290,7 +296,6 @@ func (cd *cwDest) Publish(events []logs.LogEvent) error { } func (cd *cwDest) Stop() { - cd.pusher.Stop() cd.retryer.Stop() cd.stopped = true } @@ -365,6 +370,7 @@ func init() { outputs.Add("cloudwatchlogs", func() telegraf.Output { return &CloudWatchLogs{ ForceFlushInterval: internal.Duration{Duration: defaultFlushTimeout}, + pusherStopChan: make(chan struct{}), cwDests: make(map[Target]*cwDest), } }) diff --git a/plugins/outputs/cloudwatchlogs/pusher.go b/plugins/outputs/cloudwatchlogs/pusher.go index ec23e2c8b6..69951a58ce 100644 --- a/plugins/outputs/cloudwatchlogs/pusher.go +++ b/plugins/outputs/cloudwatchlogs/pusher.go @@ -50,15 +50,15 @@ type pusher struct { sequenceToken *string lastValidTime int64 needSort bool - stop chan struct{} + stop <-chan struct{} lastSentTime time.Time initNonBlockingChOnce sync.Once startNonBlockCh chan struct{} - wg sync.WaitGroup + wg *sync.WaitGroup } -func NewPusher(target Target, service CloudWatchLogsService, flushTimeout time.Duration, retryDuration time.Duration, logger telegraf.Logger) *pusher { +func NewPusher(target Target, service CloudWatchLogsService, flushTimeout time.Duration, retryDuration time.Duration, logger telegraf.Logger, stop <-chan struct{}, wg *sync.WaitGroup) *pusher { p := &pusher{ Target: target, Service: service, @@ -68,8 +68,9 @@ func NewPusher(target Target, service CloudWatchLogsService, flushTimeout time.D events: make([]*cloudwatchlogs.InputLogEvent, 0, 10), eventsCh: make(chan logs.LogEvent, 100), flushTimer: time.NewTimer(flushTimeout), - stop: make(chan struct{}), + stop: stop, startNonBlockCh: make(chan struct{}), + wg: wg, } p.putRetentionPolicy() p.wg.Add(1) @@ -122,11 +123,6 @@ func hasValidTime(e logs.LogEvent) bool { return true } -func (p *pusher) Stop() { - close(p.stop) - p.wg.Wait() -} - func (p *pusher) start() { defer p.wg.Done() diff --git a/plugins/outputs/cloudwatchlogs/pusher_test.go b/plugins/outputs/cloudwatchlogs/pusher_test.go index 5538b297a4..957b5df96b 100644 --- a/plugins/outputs/cloudwatchlogs/pusher_test.go +++ b/plugins/outputs/cloudwatchlogs/pusher_test.go @@ -11,6 +11,7 @@ import ( "log" "os" "strings" + "sync" "testing" "time" @@ -20,6 +21,8 @@ import ( "github.com/influxdata/telegraf/models" ) +var wg sync.WaitGroup + type svcMock struct { ple func(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) clg func(input *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) @@ -54,7 +57,7 @@ func (s *svcMock) PutRetentionPolicy(in *cloudwatchlogs.PutRetentionPolicyInput) func TestNewPusher(t *testing.T) { var s svcMock - p := NewPusher(Target{"G", "S", 1}, &s, time.Second, maxRetryTimeout, models.NewLogger("cloudwatchlogs", "test", "")) + stop, p := testPreparation(1, &s, time.Second, maxRetryTimeout) if p.Service != &s { t.Errorf("Pusher service does not match the service passed in") } @@ -63,7 +66,8 @@ func TestNewPusher(t *testing.T) { t.Errorf("Pusher initialized with the wrong target: %v", p.Target) } - p.Stop() + close(stop) + wg.Wait() } type evtMock struct { @@ -105,7 +109,7 @@ func TestAddSingleEvent(t *testing.T) { }, nil } - p := NewPusher(Target{"G", "S", 1}, &s, 1*time.Hour, maxRetryTimeout, models.NewLogger("cloudwatchlogs", "test", "")) + stop, p := testPreparation(1, &s, 1*time.Hour, maxRetryTimeout) p.AddEvent(evtMock{"MSG", time.Now(), nil}) if called { @@ -121,6 +125,9 @@ func TestAddSingleEvent(t *testing.T) { if *p.sequenceToken != nst { t.Errorf("Pusher did not capture the NextSequenceToken") } + + close(stop) + wg.Wait() } func TestStopPusherWouldDoFinalSend(t *testing.T) { @@ -136,14 +143,17 @@ func TestStopPusherWouldDoFinalSend(t *testing.T) { return &cloudwatchlogs.PutLogEventsOutput{NextSequenceToken: &nst}, nil } - p := NewPusher(Target{"G", "S", 1}, &s, 1*time.Hour, maxRetryTimeout, models.NewLogger("cloudwatchlogs", "test", "")) + stop, p := testPreparation(1, &s, 1*time.Hour, maxRetryTimeout) p.AddEvent(evtMock{"MSG", time.Now(), nil}) + time.Sleep(10 * time.Millisecond) if called { t.Errorf("PutLogEvents has been called too fast, it should wait until FlushTimeout.") } - time.Sleep(10 * time.Millisecond) - p.Stop() + + close(stop) + wg.Wait() + if !called { t.Errorf("PutLogEvents has not been called after p has been Stopped.") } @@ -174,11 +184,12 @@ func TestLongMessageGetsTruncated(t *testing.T) { return &cloudwatchlogs.PutLogEventsOutput{NextSequenceToken: &nst}, nil } - p := NewPusher(Target{"G", "S", 1}, &s, 1*time.Hour, maxRetryTimeout, models.NewLogger("cloudwatchlogs", "test", "")) + stop, p := testPreparation(1, &s, 1*time.Hour, maxRetryTimeout) p.AddEvent(evtMock{longMsg, time.Now(), nil}) time.Sleep(10 * time.Millisecond) p.send() - p.Stop() + close(stop) + wg.Wait() } func TestRequestIsLessThan1MB(t *testing.T) { @@ -200,15 +211,15 @@ func TestRequestIsLessThan1MB(t *testing.T) { return &cloudwatchlogs.PutLogEventsOutput{NextSequenceToken: &nst}, nil } - p := NewPusher(Target{"G", "S", 1}, &s, 1*time.Hour, maxRetryTimeout, models.NewLogger("cloudwatchlogs", "test", "")) + stop, p := testPreparation(1, &s, 1*time.Hour, maxRetryTimeout) for i := 0; i < 8; i++ { p.AddEvent(evtMock{longMsg, time.Now(), nil}) } time.Sleep(10 * time.Millisecond) p.send() p.send() - p.Stop() - time.Sleep(10 * time.Millisecond) + close(stop) + wg.Wait() } func TestRequestIsLessThan10kEvents(t *testing.T) { @@ -225,7 +236,7 @@ func TestRequestIsLessThan10kEvents(t *testing.T) { return &cloudwatchlogs.PutLogEventsOutput{NextSequenceToken: &nst}, nil } - p := NewPusher(Target{"G", "S", 1}, &s, 1*time.Hour, maxRetryTimeout, models.NewLogger("cloudwatchlogs", "test", "")) + stop, p := testPreparation(1, &s, 1*time.Hour, maxRetryTimeout) for i := 0; i < 30000; i++ { p.AddEvent(evtMock{msg, time.Now(), nil}) } @@ -233,8 +244,8 @@ func TestRequestIsLessThan10kEvents(t *testing.T) { for i := 0; i < 5; i++ { p.send() } - p.Stop() - time.Sleep(100 * time.Millisecond) + close(stop) + wg.Wait() } func TestTimestampPopulation(t *testing.T) { @@ -250,7 +261,7 @@ func TestTimestampPopulation(t *testing.T) { return &cloudwatchlogs.PutLogEventsOutput{NextSequenceToken: &nst}, nil } - p := NewPusher(Target{"G", "S", 1}, &s, 1*time.Hour, maxRetryTimeout, models.NewLogger("cloudwatchlogs", "test", "")) + stop, p := testPreparation(1, &s, 1*time.Hour, maxRetryTimeout) for i := 0; i < 3; i++ { p.AddEvent(evtMock{"msg", time.Time{}, nil}) // time.Time{} creates zero time } @@ -258,8 +269,8 @@ func TestTimestampPopulation(t *testing.T) { for i := 0; i < 5; i++ { p.send() } - p.Stop() - time.Sleep(100 * time.Millisecond) + close(stop) + wg.Wait() } func TestIgnoreOutOfTimeRangeEvent(t *testing.T) { @@ -274,7 +285,7 @@ func TestIgnoreOutOfTimeRangeEvent(t *testing.T) { var logbuf bytes.Buffer log.SetOutput(io.MultiWriter(&logbuf, os.Stdout)) - p := NewPusher(Target{"G", "S", 1}, &s, 10*time.Millisecond, maxRetryTimeout, models.NewLogger("cloudwatchlogs", "test", "")) + stop, p := testPreparation(1, &s, 10*time.Millisecond, maxRetryTimeout) p.AddEvent(evtMock{"MSG", time.Now().Add(-15 * 24 * time.Hour), nil}) p.AddEvent(evtMock{"MSG", time.Now().Add(2*time.Hour + 1*time.Minute), nil}) @@ -291,7 +302,8 @@ func TestIgnoreOutOfTimeRangeEvent(t *testing.T) { log.SetOutput(os.Stderr) time.Sleep(20 * time.Millisecond) - p.Stop() + close(stop) + wg.Wait() } func TestAddMultipleEvents(t *testing.T) { @@ -332,7 +344,7 @@ func TestAddMultipleEvents(t *testing.T) { evts = append(evts, e) } evts[10], evts[90] = evts[90], evts[10] // make events out of order - p := NewPusher(Target{"G", "S", 1}, &s, 1*time.Hour, maxRetryTimeout, models.NewLogger("cloudwatchlogs", "test", "")) + stop, p := testPreparation(1, &s, 1*time.Hour, maxRetryTimeout) for _, e := range evts { p.AddEvent(e) } @@ -344,7 +356,8 @@ func TestAddMultipleEvents(t *testing.T) { if p.sequenceToken == nil || *p.sequenceToken != nst { t.Errorf("Pusher did not capture the NextSequenceToken") } - p.Stop() + close(stop) + wg.Wait() } func TestSendReqWhenEventsSpanMoreThan24Hrs(t *testing.T) { @@ -386,7 +399,7 @@ func TestSendReqWhenEventsSpanMoreThan24Hrs(t *testing.T) { return nil, nil } - p := NewPusher(Target{"G", "S", 1}, &s, 1*time.Hour, maxRetryTimeout, models.NewLogger("cloudwatchlogs", "test", "")) + stop, p := testPreparation(1, &s, 1*time.Hour, maxRetryTimeout) p.AddEvent(evtMock{"MSG 25hrs ago", time.Now().Add(-25 * time.Hour), nil}) p.AddEvent(evtMock{"MSG 24hrs ago", time.Now().Add(-24 * time.Hour), nil}) p.AddEvent(evtMock{"MSG 23hrs ago", time.Now().Add(-23 * time.Hour), nil}) @@ -394,7 +407,8 @@ func TestSendReqWhenEventsSpanMoreThan24Hrs(t *testing.T) { p.FlushTimeout = 10 * time.Millisecond p.resetFlushTimer() time.Sleep(20 * time.Millisecond) - p.Stop() + close(stop) + wg.Wait() } func TestUnhandledErrorWouldNotResend(t *testing.T) { @@ -414,7 +428,7 @@ func TestUnhandledErrorWouldNotResend(t *testing.T) { var logbuf bytes.Buffer log.SetOutput(io.MultiWriter(&logbuf, os.Stdout)) - p := NewPusher(Target{"G", "S", 1}, &s, 1*time.Hour, maxRetryTimeout, models.NewLogger("cloudwatchlogs", "test", "")) + stop, p := testPreparation(1, &s, 1*time.Hour, maxRetryTimeout) p.AddEvent(evtMock{"msg", time.Now(), nil}) p.FlushTimeout = 10 * time.Millisecond time.Sleep(2000 * time.Millisecond) @@ -425,12 +439,12 @@ func TestUnhandledErrorWouldNotResend(t *testing.T) { } log.SetOutput(os.Stderr) - p.Stop() + close(stop) + wg.Wait() if cnt != 1 { t.Errorf("Expecting pusher to call send 1 time, but %d times called", cnt) } - time.Sleep(20 * time.Millisecond) } func TestCreateLogGroupAndLogSteamWhenNotFound(t *testing.T) { @@ -471,7 +485,7 @@ func TestCreateLogGroupAndLogSteamWhenNotFound(t *testing.T) { var logbuf bytes.Buffer log.SetOutput(io.MultiWriter(&logbuf, os.Stdout)) - p := NewPusher(Target{"G", "S", 1}, &s, 1*time.Hour, maxRetryTimeout, models.NewLogger("cloudwatchlogs", "test", "")) + stop, p := testPreparation(1, &s, 1*time.Hour, maxRetryTimeout) p.AddEvent(evtMock{"msg", time.Now(), nil}) time.Sleep(10 * time.Millisecond) p.send() @@ -495,13 +509,13 @@ func TestCreateLogGroupAndLogSteamWhenNotFound(t *testing.T) { log.SetOutput(os.Stderr) - p.Stop() - time.Sleep(10 * time.Millisecond) + close(stop) + wg.Wait() } func TestCreateLogGroupWithError(t *testing.T) { var s svcMock - p := NewPusher(Target{"G", "S", 1}, &s, 1*time.Hour, maxRetryTimeout, models.NewLogger("cloudwatchlogs", "test", "")) + stop, p := testPreparation(1, &s, 1*time.Hour, maxRetryTimeout) // test normal case. 1. creating stream fails, 2, creating group succeeds, 3, creating stream succeeds. var cnt_clg int @@ -585,9 +599,8 @@ func TestCreateLogGroupWithError(t *testing.T) { t.Errorf("CreateLogGroup should be called for one time.") } - p.Stop() - time.Sleep(10 * time.Millisecond) - + close(stop) + wg.Wait() } func TestLogRejectedLogEntryInfo(t *testing.T) { @@ -608,7 +621,7 @@ func TestLogRejectedLogEntryInfo(t *testing.T) { var logbuf bytes.Buffer log.SetOutput(io.MultiWriter(&logbuf, os.Stdout)) - p := NewPusher(Target{"G", "S", 1}, &s, 1*time.Hour, maxRetryTimeout, models.NewLogger("cloudwatchlogs", "test", "")) + stop, p := testPreparation(1, &s, 1*time.Hour, maxRetryTimeout) p.AddEvent(evtMock{"msg", time.Now(), nil}) time.Sleep(10 * time.Millisecond) p.send() @@ -632,8 +645,8 @@ func TestLogRejectedLogEntryInfo(t *testing.T) { log.SetOutput(os.Stderr) - p.Stop() - time.Sleep(10 * time.Millisecond) + close(stop) + wg.Wait() } func TestAddEventNonBlocking(t *testing.T) { @@ -662,7 +675,7 @@ func TestAddEventNonBlocking(t *testing.T) { } evts = append(evts, e) } - p := NewPusher(Target{"G", "S", 1}, &s, 1*time.Hour, maxRetryTimeout, models.NewLogger("cloudwatchlogs", "test", "")) + stop, p := testPreparation(1, &s, 1*time.Hour, maxRetryTimeout) p.FlushTimeout = 10 * time.Millisecond p.resetFlushTimer() time.Sleep(200 * time.Millisecond) // Wait until pusher started, merge channel is blocked @@ -676,7 +689,8 @@ func TestAddEventNonBlocking(t *testing.T) { if p.sequenceToken == nil || *p.sequenceToken != nst { t.Errorf("Pusher did not capture the NextSequenceToken") } - p.Stop() + close(stop) + wg.Wait() } func TestPutRetentionNegativeInput(t *testing.T) { @@ -686,11 +700,15 @@ func TestPutRetentionNegativeInput(t *testing.T) { prpc++ return nil, nil } - p := NewPusher(Target{"G", "S", -1}, &s, 1*time.Hour, maxRetryTimeout, models.NewLogger("cloudwatchlogs", "test", "")) + stop, p := testPreparation(-1, &s, 1*time.Hour, maxRetryTimeout) p.putRetentionPolicy() + if prpc == 1 { t.Errorf("Put Retention Policy api shouldn't have been called") } + + close(stop) + wg.Wait() } func TestPutRetentionValidMaxInput(t *testing.T) { @@ -700,13 +718,15 @@ func TestPutRetentionValidMaxInput(t *testing.T) { prpc++ return nil, nil } - p := NewPusher(Target{"G", "S", 1000000000000000000}, &s, 1*time.Hour, maxRetryTimeout, models.NewLogger("cloudwatchlogs", "test", "")) + stop, p := testPreparation(1000000000000000000, &s, 1*time.Hour, maxRetryTimeout) p.putRetentionPolicy() if prpc != 2 { t.Errorf("Put Retention Policy api should have been called twice. Number of times called: %v", prpc) } + close(stop) + wg.Wait() } func TestPutRetentionWhenError(t *testing.T) { @@ -718,7 +738,7 @@ func TestPutRetentionWhenError(t *testing.T) { } var logbuf bytes.Buffer log.SetOutput(io.MultiWriter(&logbuf, os.Stdout)) - p := NewPusher(Target{"G", "S", 1}, &s, 1*time.Hour, maxRetryTimeout, models.NewLogger("cloudwatchlogs", "test", "")) + stop, p := testPreparation(1, &s, 1*time.Hour, maxRetryTimeout) time.Sleep(10 * time.Millisecond) loglines := strings.Split(strings.TrimSpace(logbuf.String()), "\n") logline := loglines[0] @@ -728,6 +748,9 @@ func TestPutRetentionWhenError(t *testing.T) { if !strings.Contains(logline, "ResourceNotFound") { t.Errorf("Expecting ResourceNotFoundException but got '%s' in the log", logbuf.String()) } + + close(stop) + wg.Wait() } func TestResendWouldStopAfterExhaustedRetries(t *testing.T) { var s svcMock @@ -741,7 +764,7 @@ func TestResendWouldStopAfterExhaustedRetries(t *testing.T) { var logbuf bytes.Buffer log.SetOutput(io.MultiWriter(&logbuf, os.Stdout)) - p := NewPusher(Target{"G", "S", 1}, &s, 10*time.Millisecond, time.Second, models.NewLogger("cloudwatchlogs", "test", "")) + stop, p := testPreparation(1, &s, 10*time.Millisecond, time.Second) p.AddEvent(evtMock{"msg", time.Now(), nil}) time.Sleep(2 * time.Second) @@ -753,6 +776,12 @@ func TestResendWouldStopAfterExhaustedRetries(t *testing.T) { } log.SetOutput(os.Stderr) - p.Stop() - time.Sleep(10 * time.Millisecond) + close(stop) + wg.Wait() +} + +func testPreparation(retention int, s *svcMock, flushTimeout time.Duration, retryDuration time.Duration) (chan struct{}, *pusher) { + stop := make(chan struct{}) + p := NewPusher(Target{"G", "S", retention}, s, flushTimeout, retryDuration, models.NewLogger("cloudwatchlogs", "test", ""), stop, &wg) + return stop, p } From b44a5951b2e31645f1dff0bccc2cde8400469b8b Mon Sep 17 00:00:00 2001 From: Brandon Dahler Date: Fri, 11 Mar 2022 22:56:04 -0500 Subject: [PATCH 3/3] Stop pusher retries when stop requested. --- plugins/outputs/cloudwatchlogs/pusher.go | 10 ++++++- plugins/outputs/cloudwatchlogs/pusher_test.go | 26 +++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/plugins/outputs/cloudwatchlogs/pusher.go b/plugins/outputs/cloudwatchlogs/pusher.go index 69951a58ce..d0ebae8cf5 100644 --- a/plugins/outputs/cloudwatchlogs/pusher.go +++ b/plugins/outputs/cloudwatchlogs/pusher.go @@ -300,7 +300,15 @@ func (p *pusher) send() { } p.Log.Warnf("Retried %v time, going to sleep %v before retrying.", retryCount, wait) - time.Sleep(wait) + + select { + case <-p.stop: + p.Log.Errorf("Stop requested after %v retries to %v/%v failed for PutLogEvents, request dropped.", retryCount, p.Group, p.Stream) + p.reset() + return + case <-time.After(wait): + } + retryCount++ } diff --git a/plugins/outputs/cloudwatchlogs/pusher_test.go b/plugins/outputs/cloudwatchlogs/pusher_test.go index 957b5df96b..a739f89563 100644 --- a/plugins/outputs/cloudwatchlogs/pusher_test.go +++ b/plugins/outputs/cloudwatchlogs/pusher_test.go @@ -159,6 +159,32 @@ func TestStopPusherWouldDoFinalSend(t *testing.T) { } } +func TestStopPusherWouldStopRetries(t *testing.T) { + var s svcMock + + s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + return nil, &cloudwatchlogs.ServiceUnavailableException{} + } + + stop, p := testPreparation(1, &s, 1*time.Hour, maxRetryTimeout) + p.AddEvent(evtMock{"MSG", time.Now(), nil}) + + sendComplete := make(chan struct{}) + + go func() { + defer close(sendComplete) + p.send() + }() + + close(stop) + + select { + case <-time.After(50 * time.Millisecond): + t.Errorf("send did not quit retrying after p has been Stopped.") + case <-sendComplete: + } +} + func TestLongMessageGetsTruncated(t *testing.T) { var s svcMock nst := "NEXT_SEQ_TOKEN"