Skip to content

Commit

Permalink
Added retry function to reconnect and retry after an unsuccessful flu…
Browse files Browse the repository at this point in the history
…sh to prevent the client to stay in a permanent disconnected state. (#2)
  • Loading branch information
gguridi committed Oct 13, 2020
1 parent d590abc commit f6972d4
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 5 deletions.
11 changes: 11 additions & 0 deletions aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Aggregator interface {
SetInactive(string)
Run(time.Duration, chan bool) Aggregator
Flush() (int, error)
Retry() (int, error)
}

type aggregator struct {
Expand All @@ -33,6 +34,13 @@ func (a *aggregator) GetMetrics() map[string]Metric {
return a.metrics
}

// Retry tries to retry the flush of metrics in case something went wrong. If this
// retry went wrong it won't try a third time.
func (a *aggregator) Retry() (int, error) {
a.client.Reconnect()
return a.Flush()
}

func (a *aggregator) getMetric(path string, defaultMetric Metric) Metric {
metricPath := a.config.getMetricPath(path)
if metric, exists := a.metrics[metricPath]; exists {
Expand Down Expand Up @@ -80,6 +88,9 @@ func (a *aggregator) run(period time.Duration, stopSendingMetrics chan bool) {
case <-ticker.C:
if _, err := a.Flush(); err != nil {
log.Printf("Unable to send metrics: %s\n", err.Error())
if _, err := a.Retry(); err != nil {
log.Printf("Unable to send metrics after reconnecting neither: %s\n", err.Error())
}
}
case <-stopSendingMetrics:
return
Expand Down
9 changes: 9 additions & 0 deletions aggregator_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type MockAggregator struct {
MethodSetInactive func(*MockAggregator, string)
MethodRun func(*MockAggregator, time.Duration, chan bool) Aggregator
MethodFlush func(*MockAggregator) (int, error)
MethodRetry func(*MockAggregator) (int, error)
}

// AddSum is an implementation of Aggregator interface to be used with the mocking object.
Expand Down Expand Up @@ -76,3 +77,11 @@ func (m *MockAggregator) Flush() (int, error) {
}
return 0, nil
}

// Retry is an implementation of Aggregator interface to be used with the mocking object.
func (m *MockAggregator) Retry() (int, error) {
if m.MethodRetry != nil {
return m.MethodRetry(m)
}
return 0, nil
}
24 changes: 19 additions & 5 deletions aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"bytes"
"errors"
"fmt"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"strconv"
"sync"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

var _ = Describe("graphite aggregator", func() {
Expand Down Expand Up @@ -48,7 +49,12 @@ var _ = Describe("graphite aggregator", func() {

BeforeEach(func() {
client = &MockGraphite{
Data: map[string]string{},
Extra: map[string]interface{}{},
Data: map[string]string{},
MethodReconnect: func(m *MockGraphite) error {
m.Extra["reconnected"] = "yes, it tried!"
return nil
},
MethodSendBuffer: func(m *MockGraphite, buffer *bytes.Buffer) (int, error) {
mutex.Lock()
defer mutex.Unlock()
Expand Down Expand Up @@ -272,13 +278,21 @@ var _ = Describe("graphite aggregator", func() {
agg.Run(2*time.Second, stop)
agg.AddSum(failMetric, 15)
time.Sleep(3 * time.Second)
Expect(getFlushSent(client)).To(Equal(1))
Expect(getFlushSent(client)).To(Equal(2))
agg.AddSum(failMetric, 25)
time.Sleep(2 * time.Second)
Expect(getFlushSent(client)).To(Equal(2))
Expect(getFlushSent(client)).To(Equal(4))
metrics := agg.(*aggregator).GetMetrics()
Expect(metrics[failMetric].Calculate()).To(Equal("40"))
})

It("retries once if something went wrong", func() {
agg.Run(2*time.Second, stop)
agg.AddSum(failMetric, 15)
time.Sleep(3 * time.Second)
Expect(getFlushSent(client)).To(Equal(2))
Expect(client.(*MockGraphite).Extra).To(HaveKey("reconnected"))
})
})

Context("uses client configuration", func() {
Expand Down
1 change: 1 addition & 0 deletions graphite_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

// MockGraphite implements the interface Graphite
type MockGraphite struct {
Extra map[string]interface{}
Data map[string]string
MethodSend func(*MockGraphite, string, string) (int, error)
MethodSendBuffer func(*MockGraphite, *bytes.Buffer) (int, error)
Expand Down

0 comments on commit f6972d4

Please sign in to comment.