Skip to content

Commit

Permalink
Merge pull request #75 from libp2p/prometheus
Browse files Browse the repository at this point in the history
expose some Prometheus metrics
  • Loading branch information
marten-seemann committed Apr 8, 2021
2 parents e5e4091 + 0006048 commit cae447e
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 2 deletions.
176 changes: 176 additions & 0 deletions p2p/transport/tcp/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package tcp

import (
"strings"
"sync"
"time"

"github.com/marten-seemann/tcp"
"github.com/mikioh/tcpinfo"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/prometheus/client_golang/prometheus"
)

var (
newConns *prometheus.CounterVec
closedConns *prometheus.CounterVec
)

var collector *aggregatingCollector

func init() {
collector = newAggregatingCollector()
prometheus.MustRegister(collector)

const direction = "direction"

newConns = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "tcp_connections_new_total",
Help: "TCP new connections",
},
[]string{direction},
)
prometheus.MustRegister(newConns)
closedConns = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "tcp_connections_closed_total",
Help: "TCP connections closed",
},
[]string{direction},
)
prometheus.MustRegister(closedConns)
}

type aggregatingCollector struct {
mutex sync.Mutex

highestID uint64
conns map[uint64] /* id */ *tracingConn
rtts prometheus.Histogram
connDurations prometheus.Histogram
}

var _ prometheus.Collector = &aggregatingCollector{}

func newAggregatingCollector() *aggregatingCollector {
return &aggregatingCollector{
conns: make(map[uint64]*tracingConn),
rtts: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tcp_rtt",
Help: "TCP round trip time",
Buckets: prometheus.ExponentialBuckets(0.001, 1.25, 40), // 1ms to ~6000ms
}),
connDurations: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tcp_connection_duration",
Help: "TCP Connection Duration",
Buckets: prometheus.ExponentialBuckets(1, 1.5, 40), // 1s to ~12 weeks
}),
}
}

func (c *aggregatingCollector) AddConn(t *tracingConn) uint64 {
c.mutex.Lock()
defer c.mutex.Unlock()
c.highestID++
c.conns[c.highestID] = t
return c.highestID
}

func (c *aggregatingCollector) removeConn(id uint64) {
delete(c.conns, id)
}

func (c *aggregatingCollector) Describe(descs chan<- *prometheus.Desc) {
descs <- c.rtts.Desc()
descs <- c.connDurations.Desc()
}

func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) {
now := time.Now()
c.mutex.Lock()
for _, conn := range c.conns {
info, err := conn.getTCPInfo()
if err != nil {
if strings.Contains(err.Error(), "use of closed network connection") {
c.closedConn(conn)
continue
}
log.Errorf("Failed to get TCP info: %s", err)
continue
}
c.rtts.Observe(info.RTT.Seconds())
c.connDurations.Observe(now.Sub(conn.startTime).Seconds())
if info.State == tcpinfo.Closed {
c.closedConn(conn)
}
}
c.mutex.Unlock()
metrics <- c.rtts
metrics <- c.connDurations
}

func (c *aggregatingCollector) closedConn(conn *tracingConn) {
collector.removeConn(conn.id)
closedConns.WithLabelValues(conn.getDirection()).Inc()
}

type tracingConn struct {
id uint64

startTime time.Time
isClient bool

manet.Conn
tcpConn *tcp.Conn
}

func newTracingConn(c manet.Conn, isClient bool) (*tracingConn, error) {
conn, err := tcp.NewConn(c)
if err != nil {
return nil, err
}
tc := &tracingConn{
startTime: time.Now(),
isClient: isClient,
Conn: c,
tcpConn: conn,
}
tc.id = collector.AddConn(tc)
newConns.WithLabelValues(tc.getDirection()).Inc()
return tc, nil
}

func (c *tracingConn) getDirection() string {
if c.isClient {
return "outgoing"
}
return "incoming"
}

func (c *tracingConn) Close() error {
return c.Conn.Close()
}

func (c *tracingConn) getTCPInfo() (*tcpinfo.Info, error) {
var o tcpinfo.Info
var b [256]byte
i, err := c.tcpConn.Option(o.Level(), o.Name(), b[:])
if err != nil {
return nil, err
}
info := i.(*tcpinfo.Info)
return info, nil
}

type tracingListener struct {
manet.Listener
}

func (l *tracingListener) Accept() (manet.Conn, error) {
conn, err := l.Listener.Accept()
if err != nil {
return nil, err
}
return newTracingConn(conn, false)
}
8 changes: 6 additions & 2 deletions p2p/transport/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,11 @@ func (t *TcpTransport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID)
// This means we can immediately reuse the 5-tuple and reconnect.
tryLinger(conn, 0)
tryKeepAlive(conn, true)
return t.Upgrader.UpgradeOutbound(ctx, t, conn, p)
c, err := newTracingConn(conn, true)
if err != nil {
return nil, err
}
return t.Upgrader.UpgradeOutbound(ctx, t, c, p)
}

// UseReuseport returns true if reuseport is enabled and available.
Expand All @@ -153,7 +157,7 @@ func (t *TcpTransport) Listen(laddr ma.Multiaddr) (transport.Listener, error) {
if err != nil {
return nil, err
}
list = &tcpListener{list, 0}
list = &tracingListener{&tcpListener{list, 0}}
return t.Upgrader.UpgradeListener(t, list), nil
}

Expand Down

0 comments on commit cae447e

Please sign in to comment.