Skip to content

Commit

Permalink
Add gathering of RabbitMQ federation link metrics (influxdata#6283)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacquesh authored and idohalevi committed Sep 23, 2020
1 parent 7be6eb7 commit 8059d65
Show file tree
Hide file tree
Showing 4 changed files with 228 additions and 7 deletions.
25 changes: 25 additions & 0 deletions plugins/inputs/rabbitmq/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ For additional details reference the [RabbitMQ Management HTTP Stats][management
## specified, metrics for all exchanges are gathered.
# exchanges = ["telegraf"]

## A list of federation upstreams to gather as the rabbitmq_federation measurement.
## If not specified, metrics for all federation upstreams are gathered.
## Federation link metrics will only be gathered for queues and exchanges
## whose non-federation metrics will be collected (e.g a queue excluded
## by the 'queue_name_exclude' option will also be excluded from federation).
# federation_upstreams = ["dataCentre2"]

## Queues to include and exclude. Globs accepted.
## Note that an empty array for both will include all queues
# queue_name_include = []
Expand Down Expand Up @@ -158,6 +165,16 @@ For additional details reference the [RabbitMQ Management HTTP Stats][management
- messages_publish_out (int, count)
- messages_publish_out_rate (int, messages per second)

- rabbitmq_federation
- acks_uncommitted (int, count)
- consumers (int, count)
- messages_unacknowledged (int, count)
- messages_uncommitted (int, count)
- messages_unconfirmed (int, count)
- messages_confirm (int, count)
- messages_publish (int, count)
- messages_return_unroutable (int, count)

### Tags:

- All measurements have the following tags:
Expand Down Expand Up @@ -187,6 +204,14 @@ For additional details reference the [RabbitMQ Management HTTP Stats][management
- durable
- auto_delete

- rabbitmq_federation
- url
- vhost
- type
- upstream
- local_entity
- upstream_entity

### Sample Queries:

Message rates for the entire node can be calculated from total message counts. For instance, to get the rate of messages published per minute, use this query:
Expand Down
133 changes: 126 additions & 7 deletions plugins/inputs/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,17 @@ type RabbitMQ struct {
Queues []string
Exchanges []string

QueueInclude []string `toml:"queue_name_include"`
QueueExclude []string `toml:"queue_name_exclude"`
QueueInclude []string `toml:"queue_name_include"`
QueueExclude []string `toml:"queue_name_exclude"`
FederationUpstreamInclude []string `toml:"federation_upstream_include"`
FederationUpstreamExclude []string `toml:"federation_upstream_exclude"`

Client *http.Client

filterCreated bool
excludeEveryQueue bool
queueFilter filter.Filter
upstreamFilter filter.Filter
}

// OverviewResponse ...
Expand Down Expand Up @@ -178,6 +181,38 @@ type Exchange struct {
AutoDelete bool `json:"auto_delete"`
}

// FederationLinkChannelMessageStats ...
type FederationLinkChannelMessageStats struct {
Confirm int64 `json:"confirm"`
ConfirmDetails Details `json:"confirm_details"`
Publish int64 `json:"publish"`
PublishDetails Details `json:"publish_details"`
ReturnUnroutable int64 `json:"return_unroutable"`
ReturnUnroutableDetails Details `json:"return_unroutable_details"`
}

// FederationLinkChannel ...
type FederationLinkChannel struct {
AcksUncommitted int64 `json:"acks_uncommitted"`
ConsumerCount int64 `json:"consumer_count"`
MessagesUnacknowledged int64 `json:"messages_unacknowledged"`
MessagesUncommitted int64 `json:"messages_uncommitted"`
MessagesUnconfirmed int64 `json:"messages_unconfirmed"`
MessageStats FederationLinkChannelMessageStats `json:"message_stats"`
}

// FederationLink ...
type FederationLink struct {
Type string `json:"type"`
Queue string `json:"queue"`
UpstreamQueue string `json:"upstream_queue"`
Exchange string `json:"exchange"`
UpstreamExchange string `json:"upstream_exchange"`
Vhost string `json:"vhost"`
Upstream string `json:"upstream"`
LocalChannel FederationLinkChannel `json:"local_channel"`
}

type HealthCheck struct {
Status string `json:"status"`
}
Expand Down Expand Up @@ -214,7 +249,7 @@ type Memory struct {
// gatherFunc ...
type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator)

var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues, gatherExchanges}
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues, gatherExchanges, gatherFederationLinks}

var sampleConfig = `
## Management Plugin url. (default: http://localhost:15672)
Expand Down Expand Up @@ -258,6 +293,15 @@ var sampleConfig = `
## Note that an empty array for both will include all queues
queue_name_include = []
queue_name_exclude = []
## Federation upstreams include and exclude when gathering the rabbitmq_federation measurement.
## If neither are specified, metrics for all federation upstreams are gathered.
## Federation link metrics will only be gathered for queues and exchanges
## whose non-federation metrics will be collected (e.g a queue excluded
## by the 'queue_name_exclude' option will also be excluded from federation).
## Globs accepted.
# federation_upstream_include = ["dataCentre-*"]
# federation_upstream_exclude = []
`

func boolToInt(b bool) int64 {
Expand Down Expand Up @@ -294,12 +338,16 @@ func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error {
}
}

// Create queue filter if not already created
// Create gather filters if not already created
if !r.filterCreated {
err := r.createQueueFilter()
if err != nil {
return err
}
err = r.createUpstreamFilter()
if err != nil {
return err
}
r.filterCreated = true
}

Expand Down Expand Up @@ -598,7 +646,7 @@ func gatherExchanges(r *RabbitMQ, acc telegraf.Accumulator) {
}

for _, exchange := range exchanges {
if !r.shouldGatherExchange(exchange) {
if !r.shouldGatherExchange(exchange.Name) {
continue
}
tags := map[string]string{
Expand All @@ -624,6 +672,52 @@ func gatherExchanges(r *RabbitMQ, acc telegraf.Accumulator) {
}
}

func gatherFederationLinks(r *RabbitMQ, acc telegraf.Accumulator) {
// Gather information about federation links
federationLinks := make([]FederationLink, 0)
err := r.requestJSON("/api/federation-links", &federationLinks)
if err != nil {
acc.AddError(err)
return
}

for _, link := range federationLinks {
if !r.shouldGatherFederationLink(link) {
continue
}

tags := map[string]string{
"url": r.URL,
"type": link.Type,
"vhost": link.Vhost,
"upstream": link.Upstream,
}

if link.Type == "exchange" {
tags["exchange"] = link.Exchange
tags["upstream_exchange"] = link.UpstreamExchange
} else {
tags["queue"] = link.Queue
tags["upstream_queue"] = link.UpstreamQueue
}

acc.AddFields(
"rabbitmq_federation",
map[string]interface{}{
"acks_uncommitted": link.LocalChannel.AcksUncommitted,
"consumers": link.LocalChannel.ConsumerCount,
"messages_unacknowledged": link.LocalChannel.MessagesUnacknowledged,
"messages_uncommitted": link.LocalChannel.MessagesUncommitted,
"messages_unconfirmed": link.LocalChannel.MessagesUnconfirmed,
"messages_confirm": link.LocalChannel.MessageStats.Confirm,
"messages_publish": link.LocalChannel.MessageStats.Publish,
"messages_return_unroutable": link.LocalChannel.MessageStats.ReturnUnroutable,
},
tags,
)
}
}

func (r *RabbitMQ) shouldGatherNode(node Node) bool {
if len(r.Nodes) == 0 {
return true
Expand Down Expand Up @@ -659,20 +753,45 @@ func (r *RabbitMQ) createQueueFilter() error {
return nil
}

func (r *RabbitMQ) shouldGatherExchange(exchange Exchange) bool {
func (r *RabbitMQ) createUpstreamFilter() error {
upstreamFilter, err := filter.NewIncludeExcludeFilter(r.FederationUpstreamInclude, r.FederationUpstreamExclude)
if err != nil {
return err
}
r.upstreamFilter = upstreamFilter

return nil
}

func (r *RabbitMQ) shouldGatherExchange(exchangeName string) bool {
if len(r.Exchanges) == 0 {
return true
}

for _, name := range r.Exchanges {
if name == exchange.Name {
if name == exchangeName {
return true
}
}

return false
}

func (r *RabbitMQ) shouldGatherFederationLink(link FederationLink) bool {
if !r.upstreamFilter.Match(link.Upstream) {
return false
}

switch link.Type {
case "exchange":
return r.shouldGatherExchange(link.Exchange)
case "queue":
return r.queueFilter.Match(link.Queue)
default:
return false
}
}

func init() {
inputs.Add("rabbitmq", func() telegraf.Input {
return &RabbitMQ{
Expand Down
14 changes: 14 additions & 0 deletions plugins/inputs/rabbitmq/rabbitmq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
jsonFilePath = "testdata/exchanges.json"
case "/api/healthchecks/node/rabbit@vagrant-ubuntu-trusty-64":
jsonFilePath = "testdata/healthchecks.json"
case "/api/federation-links":
jsonFilePath = "testdata/federation-links.json"
case "/api/nodes/rabbit@vagrant-ubuntu-trusty-64/memory":
jsonFilePath = "testdata/memory.json"
default:
Expand Down Expand Up @@ -162,6 +164,18 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
"messages_publish_out_rate": 5.1,
}
compareMetrics(t, exchangeMetrics, acc, "rabbitmq_exchange")

federationLinkMetrics := map[string]interface{}{
"acks_uncommitted": 1,
"consumers": 2,
"messages_unacknowledged": 3,
"messages_uncommitted": 4,
"messages_unconfirmed": 5,
"messages_confirm": 67,
"messages_publish": 890,
"messages_return_unroutable": 1,
}
compareMetrics(t, federationLinkMetrics, acc, "rabbitmq_federation")
}

func compareMetrics(t *testing.T, expectedMetrics map[string]interface{},
Expand Down
63 changes: 63 additions & 0 deletions plugins/inputs/rabbitmq/testdata/federation-links.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
[
{
"node": "rabbit@rmqlocal",
"queue": "exampleLocalQueue",
"upstream_queue": "exampleUpstreamQueue",
"type": "queue",
"vhost": "/",
"upstream": "ExampleFederationUpstream",
"id": "8ba5218f",
"status": "running",
"local_connection": "<rabbit@somehost>",
"uri": "amqp://appsv03",
"timestamp": "2019-08-19 15:34:15",
"local_channel": {
"acks_uncommitted": 1,
"confirm": true,
"connection_details": {
"name": "<rabbit@somehost>",
"peer_host": "undefined",
"peer_port": "undefined"
},
"consumer_count": 2,
"garbage_collection": {
"fullsweep_after": 65535,
"max_heap_size": 0,
"min_bin_vheap_size": 46422,
"min_heap_size": 233,
"minor_gcs": 203
},
"global_prefetch_count": 0,
"message_stats": {
"confirm": 67,
"confirm_details": {
"rate": 2
},
"publish": 890,
"publish_details": {
"rate": 2
},
"return_unroutable": 1,
"return_unroutable_details": {
"rate": 0.1
}
},
"messages_unacknowledged": 3,
"messages_uncommitted": 4,
"messages_unconfirmed": 5,
"name": "<rabbit@somehost>",
"node": "rabbit@rmqlocal",
"number": 1,
"prefetch_count": 0,
"reductions": 1926653,
"reductions_details": {
"rate": 1068
},
"state": "running",
"transactional": false,
"user": "none",
"user_who_performed_action": "none",
"vhost": "sorandomsorandom"
}
}
]

0 comments on commit 8059d65

Please sign in to comment.