-
Notifications
You must be signed in to change notification settings - Fork 0
/
aggregator.go
139 lines (122 loc) · 4.22 KB
/
aggregator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package graphite
import (
"bytes"
"fmt"
"log"
"sync"
"time"
)
var mutex = &sync.Mutex{}
// Aggregator is an interface exposing the methods that we can use to work with different kinds of metrics
// in a transparent way for the user.
type Aggregator interface {
AddSum(string, interface{})
Increase(string)
AddAverage(string, interface{})
SetActive(string)
SetInactive(string)
Run(time.Duration, chan bool) Aggregator
Flush() (int, error)
Retry() (int, error)
}
type aggregator struct {
config *Config
metrics map[string]Metric
client Graphite
}
// GetMetrics retuns the metrics stored till this point in the aggregator.
func (a *aggregator) GetMetrics() map[string]Metric {
return a.metrics
}
// Retry tries to retry the flush of metrics in case something went wrong. If this
// retry went wrong it won't try a third time.
func (a *aggregator) Retry() (int, error) {
a.client.Reconnect()
return a.Flush()
}
func (a *aggregator) getMetric(path string, defaultMetric Metric) Metric {
metricPath := a.config.getMetricPath(path)
if metric, exists := a.metrics[metricPath]; exists {
return metric
}
return defaultMetric
}
func (a *aggregator) setMetric(path string, metric Metric) {
metricPath := a.config.getMetricPath(path)
a.metrics[metricPath] = metric
}
func (a *aggregator) updateMetric(path string, value interface{}, defaultMetric Metric) {
mutex.Lock()
defer mutex.Unlock()
metric := a.getMetric(path, defaultMetric)
metric.Update(value)
a.setMetric(path, metric)
}
// Flush forces sending the current stored metrics to graphite.
func (a *aggregator) Flush() (int, error) {
mutex.Lock()
defer mutex.Unlock()
if len(a.metrics) > 0 {
buffer := bytes.NewBufferString("")
timestamp := time.Now().Unix()
for path, metric := range a.metrics {
buffer.WriteString(fmt.Sprintf("%s %s %d\n", path, metric.Calculate(), timestamp))
}
n, err := a.client.SendBuffer(buffer)
if err == nil {
a.metrics = map[string]Metric{}
}
return n, err
}
return 0, nil
}
func (a *aggregator) run(period time.Duration, stopSendingMetrics chan bool) {
ticker := time.NewTicker(period)
for {
select {
case <-ticker.C:
if _, err := a.Flush(); err != nil {
log.Printf("Unable to send metrics: %s\n", err.Error())
if _, err := a.Retry(); err != nil {
log.Printf("Unable to send metrics after reconnecting neither: %s\n", err.Error())
}
}
case <-stopSendingMetrics:
return
}
}
}
// AddSum initialises a metric where the final value sent to graphite will be the addition
// of all the values passed to the aggregator. So if we call `AddSum` with a specific metric path and
// values 5, 10, 15 and then we `Flush`, we will be sending a final value of 30 to graphite.
func (a *aggregator) AddSum(path string, value interface{}) {
a.updateMetric(path, value, &MetricSum{})
}
// Increase is used as an alias of `AddSum` where the value incremented is always 1. Useful for giving
// a comprehensive behaviour to the metric.
func (a *aggregator) Increase(path string) {
a.updateMetric(path, 1, &MetricSum{})
}
// AddAverage initialises a metric where the final value sent to graphite will be the average
// of all the values passed to the aggregator. So if we call `AddAverage` with a specific metric path
// and values 2, 10, 10 and then we `Flush`, we will be sending a final value of 7.333333 to graphite. The
// maximum decimals allowed is 6.
func (a *aggregator) AddAverage(path string, value interface{}) {
a.updateMetric(path, value, &MetricAverage{})
}
// SetActive initialises a boolean metric where the final value sent to graphite will
// be 1, representing an `active` status.
func (a *aggregator) SetActive(path string) {
a.updateMetric(path, true, &MetricActive{})
}
// SetInactive initialises a boolean metric where the final value sent to graphite will
// be 0, representing an `inactive` status. It's inteded to be used with
func (a *aggregator) SetInactive(path string) {
a.updateMetric(path, false, &MetricActive{})
}
// Run starts a go routine to periodically flush the values stored in the aggregator to graphite.
// Useful if we don't want to manually call `Flush` every time.
func (a *aggregator) Run(period time.Duration, stopSendingMetrics chan bool) Aggregator {
go a.run(period, stopSendingMetrics)
return a
}