Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Beanstalkd input plugin #4272

Merged
merged 3 commits into from
Sep 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/apache"
_ "github.com/influxdata/telegraf/plugins/inputs/aurora"
_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
_ "github.com/influxdata/telegraf/plugins/inputs/beanstalkd"
_ "github.com/influxdata/telegraf/plugins/inputs/bond"
_ "github.com/influxdata/telegraf/plugins/inputs/burrow"
_ "github.com/influxdata/telegraf/plugins/inputs/cassandra"
Expand Down
98 changes: 98 additions & 0 deletions plugins/inputs/beanstalkd/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Beanstalkd Input Plugin

The `beanstalkd` plugin collects server stats as well as tube stats (reported by `stats` and `stats-tube` commands respectively).

### Configuration:

```toml
[[inputs.beanstalkd]]
## Server to collect data from
server = "localhost:11300"

## List of tubes to gather stats about.
## If no tubes specified then data gathered for each tube on server reported by list-tubes command
tubes = ["notifications"]
```

### Metrics:

Please see the [Beanstalk Protocol doc](https://github.com/kr/beanstalkd/master/doc/protocol.txt) for detailed explanation of `stats` and `stats-tube` commands output.

`beanstalkd_overview` – statistical information about the system as a whole
- fields
- cmd_delete
- cmd_pause_tube
- current_jobs_buried
- current_jobs_delayed
- current_jobs_ready
- current_jobs_reserved
- current_jobs_urgent
- current_using
- current_waiting
- current_watching
- pause
- pause_time_left
- total_jobs
- tags
- name
- server (address taken from config)

`beanstalkd_tube` – statistical information about the specified tube
- fields
- binlog_current_index
- binlog_max_size
- binlog_oldest_index
- binlog_records_migrated
- binlog_records_written
- cmd_bury
- cmd_delete
- cmd_ignore
- cmd_kick
- cmd_list_tube_used
- cmd_list_tubes
- cmd_list_tubes_watched
- cmd_pause_tube
- cmd_peek
- cmd_peek_buried
- cmd_peek_delayed
- cmd_peek_ready
- cmd_put
- cmd_release
- cmd_reserve
- cmd_reserve_with_timeout
- cmd_stats
- cmd_stats_job
- cmd_stats_tube
- cmd_touch
- cmd_use
- cmd_watch
- current_connections
- current_jobs_buried
- current_jobs_delayed
- current_jobs_ready
- current_jobs_reserved
- current_jobs_urgent
- current_producers
- current_tubes
- current_waiting
- current_workers
- job_timeouts
- max_job_size
- pid
- rusage_stime
- rusage_utime
- total_connections
- total_jobs
- uptime
- tags
- hostname
- id
- server (address taken from config)
- version

### Example Output:
```
beanstalkd_overview,host=server.local,hostname=a2ab22ed12e0,id=232485800aa11b24,server=localhost:11300,version=1.10 cmd_stats_tube=29482i,current_jobs_delayed=0i,current_jobs_urgent=6i,cmd_kick=0i,cmd_stats=7378i,cmd_stats_job=0i,current_waiting=0i,max_job_size=65535i,pid=6i,cmd_bury=0i,cmd_reserve_with_timeout=0i,cmd_touch=0i,current_connections=1i,current_jobs_ready=6i,current_producers=0i,cmd_delete=0i,cmd_list_tubes=7369i,cmd_peek_ready=0i,cmd_put=6i,cmd_use=3i,cmd_watch=0i,current_jobs_reserved=0i,rusage_stime=6.07,cmd_list_tubes_watched=0i,cmd_pause_tube=0i,total_jobs=6i,binlog_records_migrated=0i,cmd_list_tube_used=0i,cmd_peek_delayed=0i,cmd_release=0i,current_jobs_buried=0i,job_timeouts=0i,binlog_current_index=0i,binlog_max_size=10485760i,total_connections=7378i,cmd_peek_buried=0i,cmd_reserve=0i,current_tubes=4i,binlog_records_written=0i,cmd_peek=0i,rusage_utime=1.13,uptime=7099i,binlog_oldest_index=0i,current_workers=0i,cmd_ignore=0i 1528801650000000000

beanstalkd_tube,host=server.local,name=notifications,server=localhost:11300 pause_time_left=0i,current_jobs_buried=0i,current_jobs_delayed=0i,current_jobs_reserved=0i,current_using=0i,current_waiting=0i,pause=0i,total_jobs=3i,cmd_delete=0i,cmd_pause_tube=0i,current_jobs_ready=3i,current_jobs_urgent=3i,current_watching=0i 1528801650000000000
```
270 changes: 270 additions & 0 deletions plugins/inputs/beanstalkd/beanstalkd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
package beanstalkd

import (
"fmt"
"io"
"net/textproto"
"sync"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"gopkg.in/yaml.v2"
)

const sampleConfig = `
## Server to collect data from
server = "localhost:11300"

## List of tubes to gather stats about.
## If no tubes specified then data gathered for each tube on server reported by list-tubes command
tubes = ["notifications"]
`

type Beanstalkd struct {
Server string `toml:"server"`
Tubes []string `toml:"tubes"`
}

func (b *Beanstalkd) Description() string {
return "Collects Beanstalkd server and tubes stats"
}

func (b *Beanstalkd) SampleConfig() string {
return sampleConfig
}

func (b *Beanstalkd) Gather(acc telegraf.Accumulator) error {
connection, err := textproto.Dial("tcp", b.Server)
if err != nil {
return err
}
defer connection.Close()

tubes := b.Tubes
if len(tubes) == 0 {
err = runQuery(connection, "list-tubes", &tubes)
if err != nil {
acc.AddError(err)
}
}

var wg sync.WaitGroup

wg.Add(1)
go func() {
err := b.gatherServerStats(connection, acc)
if err != nil {
acc.AddError(err)
}
wg.Done()
}()

for _, tube := range tubes {
wg.Add(1)
go func(tube string) {
b.gatherTubeStats(connection, tube, acc)
wg.Done()
}(tube)
}

wg.Wait()

return nil
}

func (b *Beanstalkd) gatherServerStats(connection *textproto.Conn, acc telegraf.Accumulator) error {
stats := new(statsResponse)
if err := runQuery(connection, "stats", stats); err != nil {
return err
}

acc.AddFields("beanstalkd_overview",
map[string]interface{}{
"binlog_current_index": stats.BinlogCurrentIndex,
"binlog_max_size": stats.BinlogMaxSize,
"binlog_oldest_index": stats.BinlogOldestIndex,
"binlog_records_migrated": stats.BinlogRecordsMigrated,
"binlog_records_written": stats.BinlogRecordsWritten,
"cmd_bury": stats.CmdBury,
"cmd_delete": stats.CmdDelete,
"cmd_ignore": stats.CmdIgnore,
"cmd_kick": stats.CmdKick,
"cmd_list_tube_used": stats.CmdListTubeUsed,
"cmd_list_tubes": stats.CmdListTubes,
"cmd_list_tubes_watched": stats.CmdListTubesWatched,
"cmd_pause_tube": stats.CmdPauseTube,
"cmd_peek": stats.CmdPeek,
"cmd_peek_buried": stats.CmdPeekBuried,
"cmd_peek_delayed": stats.CmdPeekDelayed,
"cmd_peek_ready": stats.CmdPeekReady,
"cmd_put": stats.CmdPut,
"cmd_release": stats.CmdRelease,
"cmd_reserve": stats.CmdReserve,
"cmd_reserve_with_timeout": stats.CmdReserveWithTimeout,
"cmd_stats": stats.CmdStats,
"cmd_stats_job": stats.CmdStatsJob,
"cmd_stats_tube": stats.CmdStatsTube,
"cmd_touch": stats.CmdTouch,
"cmd_use": stats.CmdUse,
"cmd_watch": stats.CmdWatch,
"current_connections": stats.CurrentConnections,
"current_jobs_buried": stats.CurrentJobsBuried,
"current_jobs_delayed": stats.CurrentJobsDelayed,
"current_jobs_ready": stats.CurrentJobsReady,
"current_jobs_reserved": stats.CurrentJobsReserved,
"current_jobs_urgent": stats.CurrentJobsUrgent,
"current_producers": stats.CurrentProducers,
"current_tubes": stats.CurrentTubes,
"current_waiting": stats.CurrentWaiting,
"current_workers": stats.CurrentWorkers,
"job_timeouts": stats.JobTimeouts,
"max_job_size": stats.MaxJobSize,
"pid": stats.Pid,
"rusage_stime": stats.RusageStime,
"rusage_utime": stats.RusageUtime,
"total_connections": stats.TotalConnections,
"total_jobs": stats.TotalJobs,
"uptime": stats.Uptime,
},
map[string]string{
"hostname": stats.Hostname,
"id": stats.Id,
"server": b.Server,
"version": stats.Version,
},
)

return nil
}

func (b *Beanstalkd) gatherTubeStats(connection *textproto.Conn, tube string, acc telegraf.Accumulator) error {
stats := new(statsTubeResponse)
if err := runQuery(connection, "stats-tube "+tube, stats); err != nil {
return err
}

acc.AddFields("beanstalkd_tube",
map[string]interface{}{
"cmd_delete": stats.CmdDelete,
"cmd_pause_tube": stats.CmdPauseTube,
"current_jobs_buried": stats.CurrentJobsBuried,
"current_jobs_delayed": stats.CurrentJobsDelayed,
"current_jobs_ready": stats.CurrentJobsReady,
"current_jobs_reserved": stats.CurrentJobsReserved,
"current_jobs_urgent": stats.CurrentJobsUrgent,
"current_using": stats.CurrentUsing,
"current_waiting": stats.CurrentWaiting,
"current_watching": stats.CurrentWatching,
"pause": stats.Pause,
"pause_time_left": stats.PauseTimeLeft,
"total_jobs": stats.TotalJobs,
},
map[string]string{
"name": stats.Name,
"server": b.Server,
},
)

return nil
}

func runQuery(connection *textproto.Conn, cmd string, result interface{}) error {
requestId, err := connection.Cmd(cmd)
if err != nil {
return err
}

connection.StartResponse(requestId)
defer connection.EndResponse(requestId)

status, err := connection.ReadLine()
if err != nil {
return err
}

size := 0
if _, err = fmt.Sscanf(status, "OK %d", &size); err != nil {
return err
}

body := make([]byte, size+2)
if _, err = io.ReadFull(connection.R, body); err != nil {
return err
}

return yaml.Unmarshal(body, result)
}

func init() {
inputs.Add("beanstalkd", func() telegraf.Input {
return &Beanstalkd{}
})
}

type statsResponse struct {
BinlogCurrentIndex int `yaml:"binlog-current-index"`
BinlogMaxSize int `yaml:"binlog-max-size"`
BinlogOldestIndex int `yaml:"binlog-oldest-index"`
BinlogRecordsMigrated int `yaml:"binlog-records-migrated"`
BinlogRecordsWritten int `yaml:"binlog-records-written"`
CmdBury int `yaml:"cmd-bury"`
CmdDelete int `yaml:"cmd-delete"`
CmdIgnore int `yaml:"cmd-ignore"`
CmdKick int `yaml:"cmd-kick"`
CmdListTubeUsed int `yaml:"cmd-list-tube-used"`
CmdListTubes int `yaml:"cmd-list-tubes"`
CmdListTubesWatched int `yaml:"cmd-list-tubes-watched"`
CmdPauseTube int `yaml:"cmd-pause-tube"`
CmdPeek int `yaml:"cmd-peek"`
CmdPeekBuried int `yaml:"cmd-peek-buried"`
CmdPeekDelayed int `yaml:"cmd-peek-delayed"`
CmdPeekReady int `yaml:"cmd-peek-ready"`
CmdPut int `yaml:"cmd-put"`
CmdRelease int `yaml:"cmd-release"`
CmdReserve int `yaml:"cmd-reserve"`
CmdReserveWithTimeout int `yaml:"cmd-reserve-with-timeout"`
CmdStats int `yaml:"cmd-stats"`
CmdStatsJob int `yaml:"cmd-stats-job"`
CmdStatsTube int `yaml:"cmd-stats-tube"`
CmdTouch int `yaml:"cmd-touch"`
CmdUse int `yaml:"cmd-use"`
CmdWatch int `yaml:"cmd-watch"`
CurrentConnections int `yaml:"current-connections"`
CurrentJobsBuried int `yaml:"current-jobs-buried"`
CurrentJobsDelayed int `yaml:"current-jobs-delayed"`
CurrentJobsReady int `yaml:"current-jobs-ready"`
CurrentJobsReserved int `yaml:"current-jobs-reserved"`
CurrentJobsUrgent int `yaml:"current-jobs-urgent"`
CurrentProducers int `yaml:"current-producers"`
CurrentTubes int `yaml:"current-tubes"`
CurrentWaiting int `yaml:"current-waiting"`
CurrentWorkers int `yaml:"current-workers"`
Hostname string `yaml:"hostname"`
Id string `yaml:"id"`
JobTimeouts int `yaml:"job-timeouts"`
MaxJobSize int `yaml:"max-job-size"`
Pid int `yaml:"pid"`
RusageStime float64 `yaml:"rusage-stime"`
RusageUtime float64 `yaml:"rusage-utime"`
TotalConnections int `yaml:"total-connections"`
TotalJobs int `yaml:"total-jobs"`
Uptime int `yaml:"uptime"`
Version string `yaml:"version"`
}

type statsTubeResponse struct {
CmdDelete int `yaml:"cmd-delete"`
CmdPauseTube int `yaml:"cmd-pause-tube"`
CurrentJobsBuried int `yaml:"current-jobs-buried"`
CurrentJobsDelayed int `yaml:"current-jobs-delayed"`
CurrentJobsReady int `yaml:"current-jobs-ready"`
CurrentJobsReserved int `yaml:"current-jobs-reserved"`
CurrentJobsUrgent int `yaml:"current-jobs-urgent"`
CurrentUsing int `yaml:"current-using"`
CurrentWaiting int `yaml:"current-waiting"`
CurrentWatching int `yaml:"current-watching"`
Name string `yaml:"name"`
Pause int `yaml:"pause"`
PauseTimeLeft int `yaml:"pause-time-left"`
TotalJobs int `yaml:"total-jobs"`
}
Loading