Skip to content

Commit

Permalink
Add aurora input plugin (influxdata#4158)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and otherpirate committed Mar 15, 2019
1 parent c9198ed commit 89269b1
Show file tree
Hide file tree
Showing 5 changed files with 610 additions and 0 deletions.
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/aerospike"
_ "github.com/influxdata/telegraf/plugins/inputs/amqp_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/apache"
_ "github.com/influxdata/telegraf/plugins/inputs/aurora"
_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
_ "github.com/influxdata/telegraf/plugins/inputs/bond"
_ "github.com/influxdata/telegraf/plugins/inputs/cassandra"
Expand Down
63 changes: 63 additions & 0 deletions plugins/inputs/aurora/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Aurora Input Plugin

The Aurora Input Plugin gathers metrics from [Apache Aurora](https://aurora.apache.org/) schedulers.

For monitoring recommendations reference [Monitoring your Aurora cluster](https://aurora.apache.org/documentation/latest/operations/monitoring/)

### Configuration:

```toml
[[inputs.aurora]]
## Schedulers are the base addresses of your Aurora Schedulers
schedulers = ["http://127.0.0.1:8081"]

## Set of role types to collect metrics from.
##
## The scheduler roles are checked each interval by contacting the
## scheduler nodes; zookeeper is not contacted.
# roles = ["leader", "follower"]

## Timeout is the max time for total network operations.
# timeout = "5s"

## Username and password are sent using HTTP Basic Auth.
# username = "username"
# password = "pa$$word"

## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
```

### Metrics:

- aurora
- tags:
- scheduler (URL of scheduler)
- role (leader or follower)
- fields:
- Numeric metrics are collected from the `/vars` endpoint; string fields
are not gathered.


### Troubleshooting:

Check the Scheduler role, the leader will return a 200 status:
```
curl -v http://127.0.0.1:8081/leaderhealth
```

Get available metrics:
```
curl http://127.0.0.1:8081/vars
```

### Example Output:

The example output below has been trimmed.
```
> aurora,role=leader,scheduler=http://debian-stretch-aurora-coordinator-3.virt:8081 CronBatchWorker_batch_locked_events=0i,CronBatchWorker_batch_locked_events_per_sec=0,CronBatchWorker_batch_locked_nanos_per_event=0,CronBatchWorker_batch_locked_nanos_total=0i,CronBatchWorker_batch_locked_nanos_total_per_sec=0,CronBatchWorker_batch_unlocked_events=0i,CronBatchWorker_batch_unlocked_events_per_sec=0,CronBatchWorker_batch_unlocked_nanos_per_event=0,CronBatchWorker_batch_unlocked_nanos_total=0i,CronBatchWorker_batch_unlocked_nanos_total_per_sec=0,CronBatchWorker_batches_processed=0i,CronBatchWorker_items_processed=0i,CronBatchWorker_last_processed_batch_size=0i,CronBatchWorker_queue_size=0i,TaskEventBatchWorker_batch_locked_events=0i,TaskEventBatchWorker_batch_locked_events_per_sec=0,TaskEventBatchWorker_batch_locked_nanos_per_event=0,TaskEventBatchWorker_batch_locked_nanos_total=0i,TaskEventBatchWorker_batch_locked_nanos_total_per_sec=0,TaskEventBatchWorker_batch_unlocked_events=0i,TaskEventBatchWorker_batch_unlocked_events_per_sec=0,TaskEventBatchWorker_batch_unlocked_nanos_per_event=0,TaskEventBatchWorker_batch_unlocked_nanos_total=0i,TaskEventBatchWorker_batch_unlocked_nanos_total_per_sec=0,TaskEventBatchWorker_batches_processed=0i,TaskEventBatchWorker_items_processed=0i,TaskEventBatchWorker_last_processed_batch_size=0i,TaskEventBatchWorker_queue_size=0i,TaskGroupBatchWorker_batch_locked_events=0i,TaskGroupBatchWorker_batch_locked_events_per_sec=0,TaskGroupBatchWorker_batch_locked_nanos_per_event=0,TaskGroupBatchWorker_batch_locked_nanos_total=0i,TaskGroupBatchWorker_batch_locked_nanos_total_per_sec=0,TaskGroupBatchWorker_batch_unlocked_events=0i,TaskGroupBatchWorker_batch_unlocked_events_per_sec=0,TaskGroupBatchWorker_batch_unlocked_nanos_per_event=0,TaskGroupBatchWorker_batch_unlocked_nanos_total=0i,TaskGroupBatchWorker_batch_unlocked_nanos_total_per_sec=0,TaskGroupBatchWorker_batches_processed=0i,TaskGroupBatchWorker_items_processed=0i,TaskGroupBatchWorker_last_processed_batch_size=0i,TaskGroupBatchWorker_queue_size=0i,assigner_launch_failures=0i,async_executor_uncaught_exceptions=0i,async_tasks_completed=1i,cron_job_collisions=0i,cron_job_concurrent_runs=0i,cron_job_launch_failures=0i,cron_job_misfires=0i,cron_job_parse_failures=0i,cron_job_triggers=0i,cron_jobs_loaded=1i,empty_slots_dedicated_large=0i,empty_slots_dedicated_medium=0i,empty_slots_dedicated_revocable_large=0i,empty_slots_dedicated_revocable_medium=0i,empty_slots_dedicated_revocable_small=0i,empty_slots_dedicated_revocable_xlarge=0i,empty_slots_dedicated_small=0i,empty_slots_dedicated_xlarge=0i,empty_slots_large=0i,empty_slots_medium=0i,empty_slots_revocable_large=0i,empty_slots_revocable_medium=0i,empty_slots_revocable_small=0i,empty_slots_revocable_xlarge=0i,empty_slots_small=0i,empty_slots_xlarge=0i,event_bus_dead_events=0i,event_bus_exceptions=1i,framework_registered=1i,globally_banned_offers_size=0i,http_200_responses_events=55i,http_200_responses_events_per_sec=0,http_200_responses_nanos_per_event=0,http_200_responses_nanos_total=310416694i,http_200_responses_nanos_total_per_sec=0,job_update_delete_errors=0i,job_update_recovery_errors=0i,job_update_state_change_errors=0i,job_update_store_delete_all_events=1i,job_update_store_delete_all_events_per_sec=0,job_update_store_delete_all_nanos_per_event=0,job_update_store_delete_all_nanos_total=1227254i,job_update_store_delete_all_nanos_total_per_sec=0,job_update_store_fetch_details_query_events=74i,job_update_store_fetch_details_query_events_per_sec=0,job_update_store_fetch_details_query_nanos_per_event=0,job_update_store_fetch_details_query_nanos_total=24643149i,job_update_store_fetch_details_query_nanos_total_per_sec=0,job_update_store_prune_history_events=59i,job_update_store_prune_history_events_per_sec=0,job_update_store_prune_history_nanos_per_event=0,job_update_store_prune_history_nanos_total=262868218i,job_update_store_prune_history_nanos_total_per_sec=0,job_updates_pruned=0i,jvm_available_processors=2i,jvm_class_loaded_count=6707i,jvm_class_total_loaded_count=6732i,jvm_class_unloaded_count=25i,jvm_gc_PS_MarkSweep_collection_count=2i,jvm_gc_PS_MarkSweep_collection_time_ms=223i,jvm_gc_PS_Scavenge_collection_count=27i,jvm_gc_PS_Scavenge_collection_time_ms=1691i,jvm_gc_collection_count=29i,jvm_gc_collection_time_ms=1914i,jvm_memory_free_mb=65i,jvm_memory_heap_mb_committed=157i,jvm_memory_heap_mb_max=446i,jvm_memory_heap_mb_used=91i,jvm_memory_max_mb=446i,jvm_memory_mb_total=157i,jvm_memory_non_heap_mb_committed=50i,jvm_memory_non_heap_mb_max=0i,jvm_memory_non_heap_mb_used=49i,jvm_threads_active=47i,jvm_threads_daemon=28i,jvm_threads_peak=48i,jvm_threads_started=62i,jvm_time_ms=1526530686927i,jvm_uptime_secs=79947i,log_entry_serialize_events=16i,log_entry_serialize_events_per_sec=0,log_entry_serialize_nanos_per_event=0,log_entry_serialize_nanos_total=4815321i,log_entry_serialize_nanos_total_per_sec=0,log_manager_append_events=16i,log_manager_append_events_per_sec=0,log_manager_append_nanos_per_event=0,log_manager_append_nanos_total=506453428i,log_manager_append_nanos_total_per_sec=0,log_manager_deflate_events=14i,log_manager_deflate_events_per_sec=0,log_manager_deflate_nanos_per_event=0,log_manager_deflate_nanos_total=21010565i,log_manager_deflate_nanos_total_per_sec=0 1526530687000000000
```
280 changes: 280 additions & 0 deletions plugins/inputs/aurora/aurora.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
package aurora

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"strings"
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)

type RoleType int

const (
Unknown RoleType = iota
Leader
Follower
)

func (r RoleType) String() string {
switch r {
case Leader:
return "leader"
case Follower:
return "follower"
default:
return "unknown"
}
}

var (
defaultTimeout = 5 * time.Second
defaultRoles = []string{"leader", "follower"}
)

type Vars map[string]interface{}

type Aurora struct {
Schedulers []string `toml:"schedulers"`
Roles []string `toml:"roles"`
Timeout internal.Duration `toml:"timeout"`
Username string `toml:"username"`
Password string `toml:"password"`
tls.ClientConfig

client *http.Client
urls []*url.URL
}

var sampleConfig = `
## Schedulers are the base addresses of your Aurora Schedulers
schedulers = ["http://127.0.0.1:8081"]
## Set of role types to collect metrics from.
##
## The scheduler roles are checked each interval by contacting the
## scheduler nodes; zookeeper is not contacted.
# roles = ["leader", "follower"]
## Timeout is the max time for total network operations.
# timeout = "5s"
## Username and password are sent using HTTP Basic Auth.
# username = "username"
# password = "pa$$word"
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
`

func (a *Aurora) SampleConfig() string {
return sampleConfig
}

func (a *Aurora) Description() string {
return "Gather metrics from Apache Aurora schedulers"
}

func (a *Aurora) Gather(acc telegraf.Accumulator) error {
if a.client == nil {
err := a.initialize()
if err != nil {
return err
}
}

ctx, cancel := context.WithTimeout(context.Background(), a.Timeout.Duration)
defer cancel()

var wg sync.WaitGroup
for _, u := range a.urls {
wg.Add(1)
go func(u *url.URL) {
defer wg.Done()
role, err := a.gatherRole(ctx, u)
if err != nil {
acc.AddError(fmt.Errorf("%s: %v", u, err))
return
}

if !a.roleEnabled(role) {
return
}

err = a.gatherScheduler(ctx, u, role, acc)
if err != nil {
acc.AddError(fmt.Errorf("%s: %v", u, err))
}
}(u)
}
wg.Wait()

return nil
}

func (a *Aurora) initialize() error {
tlsCfg, err := a.ClientConfig.TLSConfig()
if err != nil {
return err
}

client := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: tlsCfg,
},
}

urls := make([]*url.URL, 0, len(a.Schedulers))
for _, s := range a.Schedulers {
loc, err := url.Parse(s)
if err != nil {
return err
}

urls = append(urls, loc)
}

if a.Timeout.Duration < time.Second {
a.Timeout.Duration = defaultTimeout
}

if len(a.Roles) == 0 {
a.Roles = defaultRoles
}

a.client = client
a.urls = urls
return nil
}

func (a *Aurora) roleEnabled(role RoleType) bool {
if len(a.Roles) == 0 {
return true
}

for _, v := range a.Roles {
if role.String() == v {
return true
}
}
return false
}

func (a *Aurora) gatherRole(ctx context.Context, origin *url.URL) (RoleType, error) {
loc := *origin
loc.Path = "leaderhealth"
req, err := http.NewRequest("GET", loc.String(), nil)
if err != nil {
return Unknown, err
}

if a.Username != "" || a.Password != "" {
req.SetBasicAuth(a.Username, a.Password)
}
req.Header.Add("Accept", "text/plain")

resp, err := a.client.Do(req.WithContext(ctx))
if err != nil {
return Unknown, err
}
resp.Body.Close()

switch resp.StatusCode {
case http.StatusOK:
return Leader, nil
case http.StatusBadGateway:
fallthrough
case http.StatusServiceUnavailable:
return Follower, nil
default:
return Unknown, fmt.Errorf("%v", resp.Status)
}
}

func (a *Aurora) gatherScheduler(
ctx context.Context, origin *url.URL, role RoleType, acc telegraf.Accumulator,
) error {
loc := *origin
loc.Path = "vars.json"
req, err := http.NewRequest("GET", loc.String(), nil)
if err != nil {
return err
}

if a.Username != "" || a.Password != "" {
req.SetBasicAuth(a.Username, a.Password)
}
req.Header.Add("Accept", "application/json")

resp, err := a.client.Do(req.WithContext(ctx))
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%v", resp.Status)
}

var vars Vars
decoder := json.NewDecoder(resp.Body)
decoder.UseNumber()
err = decoder.Decode(&vars)
if err != nil {
return fmt.Errorf("decoding response: %v", err)
}

var fields = make(map[string]interface{}, len(vars))
for k, v := range vars {
switch v := v.(type) {
case json.Number:
// Aurora encodes numbers as you would specify them as a literal,
// use this to determine if a value is a float or int.
if strings.ContainsAny(v.String(), ".eE") {
fv, err := v.Float64()
if err != nil {
acc.AddError(err)
continue
}
fields[k] = fv
} else {
fi, err := v.Int64()
if err != nil {
acc.AddError(err)
continue
}
fields[k] = fi
}
default:
continue
}
}

acc.AddFields("aurora",
fields,
map[string]string{
"scheduler": origin.String(),
"role": role.String(),
},
)
return nil
}

func init() {
inputs.Add("aurora", func() telegraf.Input {
return &Aurora{}
})
}
Loading

0 comments on commit 89269b1

Please sign in to comment.