Skip to content

Commit

Permalink
memcached plugin: support unix sockets
Browse files Browse the repository at this point in the history
closes #415
  • Loading branch information
sparrc committed Dec 3, 2015
1 parent 7a2eeb7 commit 03863bd
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
- [#412](https://github.com/influxdb/telegraf/pull/412): Additional memcached stats. Thanks @mgresser!
- [#410](https://github.com/influxdb/telegraf/pull/410): Additional redis metrics. Thanks @vlaadbrain!
- [#414](https://github.com/influxdb/telegraf/issues/414): Jolokia plugin auth parameters
- [#415](https://github.com/influxdb/telegraf/issues/415): memcached plugin: support unix sockets

### Bugfixes
- [#405](https://github.com/influxdb/telegraf/issues/405): Prometheus output cardinality issue
Expand Down
54 changes: 37 additions & 17 deletions plugins/memcached/memcached.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ import (

// Memcached is a memcached plugin
type Memcached struct {
Servers []string
Servers []string
UnixSockets []string
}

var sampleConfig = `
# An array of address to gather stats about. Specify an ip on hostname
# with optional port. ie localhost, 10.0.0.1:11211, etc.
#
# If no servers are specified, then localhost is used as the host.
servers = ["localhost"]
servers = ["localhost:11211"]
# unix_sockets = ["/var/run/memcached.sock"]
`

var defaultTimeout = 5 * time.Second
Expand Down Expand Up @@ -68,31 +70,49 @@ func (m *Memcached) Description() string {

// Gather reads stats from all configured servers accumulates stats
func (m *Memcached) Gather(acc plugins.Accumulator) error {
if len(m.Servers) == 0 {
return m.gatherServer(":11211", acc)
if len(m.Servers) == 0 && len(m.UnixSockets) == 0 {
return m.gatherServer(":11211", false, acc)
}

for _, serverAddress := range m.Servers {
if err := m.gatherServer(serverAddress, acc); err != nil {
if err := m.gatherServer(serverAddress, false, acc); err != nil {
return err
}
}

for _, unixAddress := range m.UnixSockets {
if err := m.gatherServer(unixAddress, true, acc); err != nil {
return err
}
}

return nil
}

func (m *Memcached) gatherServer(address string, acc plugins.Accumulator) error {
_, _, err := net.SplitHostPort(address)
if err != nil {
address = address + ":11211"
}
func (m *Memcached) gatherServer(
address string,
unix bool,
acc plugins.Accumulator,
) error {
var conn net.Conn
if unix {
conn, err := net.DialTimeout("unix", address, defaultTimeout)
if err != nil {
return err
}
defer conn.Close()
} else {
_, _, err := net.SplitHostPort(address)
if err != nil {
address = address + ":11211"
}

// Connect
conn, err := net.DialTimeout("tcp", address, defaultTimeout)
if err != nil {
return err
conn, err = net.DialTimeout("tcp", address, defaultTimeout)
if err != nil {
return err
}
defer conn.Close()
}
defer conn.Close()

// Extend connection
conn.SetDeadline(time.Now().Add(defaultTimeout))
Expand All @@ -101,10 +121,10 @@ func (m *Memcached) gatherServer(address string, acc plugins.Accumulator) error
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))

// Send command
if _, err = fmt.Fprint(rw, "stats\r\n"); err != nil {
if _, err := fmt.Fprint(rw, "stats\r\n"); err != nil {
return err
}
if err = rw.Flush(); err != nil {
if err := rw.Flush(); err != nil {
return err
}

Expand Down

0 comments on commit 03863bd

Please sign in to comment.