Skip to content

Commit

Permalink
Refactor UDP & TCP input buffers
Browse files Browse the repository at this point in the history
closes #991
  • Loading branch information
sparrc committed Apr 7, 2016
1 parent c6faf00 commit bb9a5dc
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 16 deletions.
15 changes: 9 additions & 6 deletions plugins/inputs/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
const (
// UDP packet limit, see
// https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure
UDP_PACKET_SIZE int = 65507
UDP_MAX_PACKET_SIZE int = 64 * 1024

defaultFieldName = "value"

Expand Down Expand Up @@ -57,8 +57,10 @@ type Statsd struct {
// statsd protocol (http://docs.datadoghq.com/guides/dogstatsd/)
ParseDataDogTags bool

// UDPPacketSize is the size of the read packets for the server listening
// for statsd UDP packets. This will default to 1500 bytes.
// UDPPacketSize is deprecated, it's only here for legacy support
// we now always create 1 max size buffer and then copy only what we need
// into the in channel
// see https://github.com/influxdata/telegraf/pull/992
UDPPacketSize int `toml:"udp_packet_size"`

sync.Mutex
Expand Down Expand Up @@ -272,7 +274,7 @@ func (s *Statsd) udpListen() error {
}
log.Println("Statsd listener listening on: ", s.listener.LocalAddr().String())

buf := make([]byte, s.UDPPacketSize)
buf := make([]byte, UDP_MAX_PACKET_SIZE)
for {
select {
case <-s.done:
Expand All @@ -283,9 +285,11 @@ func (s *Statsd) udpListen() error {
log.Printf("ERROR READ: %s\n", err.Error())
continue
}
bufCopy := make([]byte, n)
copy(bufCopy, buf[:n])

select {
case s.in <- buf[:n]:
case s.in <- bufCopy:
default:
log.Printf(dropwarn, string(buf[:n]))
}
Expand Down Expand Up @@ -631,7 +635,6 @@ func init() {
inputs.Add("statsd", func() telegraf.Input {
return &Statsd{
MetricSeparator: "_",
UDPPacketSize: UDP_PACKET_SIZE,
}
})
}
12 changes: 10 additions & 2 deletions plugins/inputs/tcp_listener/tcp_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type TcpListener struct {
acc telegraf.Accumulator
}

var dropwarn = "ERROR: Message queue full. Discarding metric. " +
var dropwarn = "ERROR: Message queue full. Discarding metric [%s], " +
"You may want to increase allowed_pending_messages in the config\n"

const sampleConfig = `
Expand Down Expand Up @@ -193,6 +193,7 @@ func (t *TcpListener) handler(conn *net.TCPConn, id string) {
t.forget(id)
}()

var buf []byte
scanner := bufio.NewScanner(conn)
for {
select {
Expand All @@ -202,8 +203,15 @@ func (t *TcpListener) handler(conn *net.TCPConn, id string) {
if !scanner.Scan() {
return
}
buf = scanner.Bytes()
if len(buf) == 0 {
continue
}
bufCopy := make([]byte, len(buf))
copy(bufCopy, buf)

select {
case t.in <- scanner.Bytes():
case t.in <- bufCopy:
default:
log.Printf(dropwarn)
}
Expand Down
20 changes: 12 additions & 8 deletions plugins/inputs/udp_listener/udp_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ import (
)

type UdpListener struct {
ServiceAddress string
ServiceAddress string
// UDPPacketSize is deprecated, it's only here for legacy support
// we now always create 1 max size buffer and then copy only what we need
// into the in channel
// see https://github.com/influxdata/telegraf/pull/992
UDPPacketSize int `toml:"udp_packet_size"`
AllowedPendingMessages int

Expand All @@ -32,7 +36,7 @@ type UdpListener struct {

// UDP packet limit, see
// https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure
const UDP_PACKET_SIZE int = 65507
const UDP_MAX_PACKET_SIZE int = 64 * 1024

var dropwarn = "ERROR: Message queue full. Discarding line [%s] " +
"You may want to increase allowed_pending_messages in the config\n"
Expand Down Expand Up @@ -104,7 +108,7 @@ func (u *UdpListener) udpListen() error {
}
log.Println("UDP server listening on: ", u.listener.LocalAddr().String())

buf := make([]byte, u.UDPPacketSize)
buf := make([]byte, UDP_MAX_PACKET_SIZE)
for {
select {
case <-u.done:
Expand All @@ -115,11 +119,13 @@ func (u *UdpListener) udpListen() error {
log.Printf("ERROR: %s\n", err.Error())
continue
}
bufCopy := make([]byte, n)
copy(bufCopy, buf[:n])

select {
case u.in <- buf[:n]:
case u.in <- bufCopy:
default:
log.Printf(dropwarn, string(buf[:n]))
log.Printf(dropwarn, string(bufCopy))
}
}
}
Expand Down Expand Up @@ -155,8 +161,6 @@ func (u *UdpListener) storeMetrics(metrics []telegraf.Metric) error {

func init() {
inputs.Add("udp_listener", func() telegraf.Input {
return &UdpListener{
UDPPacketSize: UDP_PACKET_SIZE,
}
return &UdpListener{}
})
}

0 comments on commit bb9a5dc

Please sign in to comment.