diff --git a/p2p.go b/p2p.go index 1283f7f..46a97f0 100644 --- a/p2p.go +++ b/p2p.go @@ -16,6 +16,7 @@ import ( "github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multihash" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "golang.org/x/time/rate" @@ -24,6 +25,7 @@ import ( connmgr "github.com/libp2p/go-libp2p-connmgr" core "github.com/libp2p/go-libp2p-core" "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/metrics" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/pnet" "github.com/libp2p/go-libp2p-core/protocol" @@ -118,6 +120,36 @@ var ( } ) +var ( + _p2pBandwidthGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "iotex_p2p_bandwidth_gauge", + Help: "P2P bandwidth stats", + }, + []string{"protocol", "type"}, + ) + _p2pGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "iotex_p2p_gauge", + Help: "P2P conn stats", + }, + []string{"type"}, + ) + _p2pPeersGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "iotex_p2p_peers_gauge", + Help: "P2P peers stats", + }, + []string{"peer"}, + ) +) + +func init() { + prometheus.MustRegister(_p2pBandwidthGauge) + prometheus.MustRegister(_p2pGauge) + prometheus.MustRegister(_p2pPeersGauge) +} + // Option defines the option function to modify the config for a host type Option func(cfg *Config) error @@ -266,6 +298,8 @@ type Host struct { peersLimiters *lru.Cache unicastLimiter *rate.Limiter peerManager *peerManager + bc *metrics.BandwidthCounter + connMgr *connmgr.BasicConnMgr } // NewHost constructs a host struct @@ -308,6 +342,8 @@ func NewHost(ctx context.Context, options ...Option) (*Host, error) { if err != nil { return nil, err } + bc := metrics.NewBandwidthCounter() + connMgr := connmgr.NewConnManager(cfg.ConnLowWater, cfg.ConnHighWater, cfg.ConnGracePeriod) opts := []libp2p.Option{ libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/%s/tcp/%d", ip, cfg.Port)), libp2p.AddrsFactory(func(addrs []multiaddr.Multiaddr) []multiaddr.Multiaddr { @@ -322,7 +358,8 @@ func NewHost(ctx context.Context, options ...Option) (*Host, error) { return &tcp.TcpTransport{Upgrader: upgrader, ConnectTimeout: cfg.ConnectTimeout} }), libp2p.Muxer("/yamux/2.0.0", yamux.DefaultTransport), - libp2p.ConnectionManager(connmgr.NewConnManager(cfg.ConnLowWater, cfg.ConnHighWater, cfg.ConnGracePeriod)), + libp2p.ConnectionManager(connMgr), + libp2p.BandwidthReporter(bc), } if !cfg.SecureIO { @@ -401,6 +438,8 @@ func NewHost(ctx context.Context, options ...Option) (*Host, error) { unicastLimiter: rate.NewLimiter(rate.Limit(cfg.RateLimit.GlobalUnicastAvg), cfg.RateLimit.GlobalUnicastBurst), peerManager: newPeerManager(host, discovery.NewRoutingDiscovery(kad), cfg.GroupID, withMaxPeers(cfg.MaxPeer), withBlacklistTolerance(cfg.BlacklistTolerance), withBlacklistTimeout(cfg.BlackListTimeout)), + bc: bc, + connMgr: connMgr, } addrs := make([]string, 0) @@ -411,6 +450,30 @@ func NewHost(ctx context.Context, options ...Option) (*Host, error) { zap.Strings("address", addrs), zap.Bool("secureIO", myHost.cfg.SecureIO), zap.Bool("gossip", myHost.cfg.Gossip)) + // start metrics update + go func() { + ticker := time.NewTicker(500 * time.Millisecond) + for { + select { + case <-ticker.C: + myHost.updateMetrics() + case <-myHost.close: + return + } + } + }() + // start print peers + go func() { + ticker := time.NewTicker(1 * time.Minute) + for { + select { + case <-ticker.C: + kad.RoutingTable().Print() + case <-myHost.close: + return + } + } + }() return &myHost, nil } @@ -684,6 +747,91 @@ func (h *Host) Close() error { return nil } +func (h *Host) updateMetrics() { + if h.bc != nil { + for p, stats := range h.bc.GetBandwidthByProtocol() { + protocol := string(p) + _p2pBandwidthGauge.WithLabelValues(protocol, "in").Set(float64(stats.TotalIn)) + _p2pBandwidthGauge.WithLabelValues(protocol, "out").Set(float64(stats.TotalOut)) + _p2pBandwidthGauge.WithLabelValues(protocol, "rateIn").Set(float64(stats.RateIn)) + _p2pBandwidthGauge.WithLabelValues(protocol, "rateOut").Set(float64(stats.RateOut)) + } + } + if h.connMgr != nil { + _p2pGauge.WithLabelValues("numConnectedPeers").Set(float64(h.connMgr.GetInfo().ConnCount)) + _p2pGauge.WithLabelValues("numProtectedPeers").Set(float64(len(h.protectedPeers()))) + } + for _, p := range h.ConnectedPeers() { + _p2pPeersGauge.WithLabelValues(p.ID.Pretty()).Set(1) + } +} + +func (h *Host) protectedPeers() []peer.ID { + if h.connMgr == nil || h.host.Peerstore() == nil { + return nil + } + protectedPeers := make([]peer.ID, 0) + for _, p := range h.host.Peerstore().Peers() { + tagInfo := h.connMgr.GetTagInfo(p) + if tagInfo == nil { + continue + } + protectTags := make([]string, 0) + for _, tag := range []string{"kbucket", "pubsub:broadcastbeefdb3001dc858cfe8b"} { + if h.connMgr.IsProtected(p, tag) { + protectTags = append(protectTags, tag) + } + } + if len(protectTags) > 0 { + protectedPeers = append(protectedPeers, p) + } + } + return protectedPeers +} + +func (h *Host) printPeers() { + printPeers := func() { + if h.connMgr == nil || h.host.Peerstore() == nil { + return + } + Logger().Info("print peer===============================") + protectedSize := 0 + for _, p := range h.host.Peerstore().Peers() { + tagInfo := h.connMgr.GetTagInfo(p) + if tagInfo == nil { + continue + } + protectTags := make([]string, 0) + for _, tag := range []string{"kbucket", "pubsub:broadcastbeefdb3001dc858cfe8b"} { + if h.connMgr.IsProtected(p, tag) { + protectTags = append(protectTags, tag) + } + } + if len(protectTags) > 0 { + protectedSize++ + } + Logger().Info("print peer", + zap.String("id", p.Pretty()), + zap.Any("tags", tagInfo.Tags), + zap.Any("protectTags", protectTags), + zap.Int("protectTagSize", len(protectTags)), + zap.Any("connsSize", len(tagInfo.Conns)), + ) + } + Logger().Info("print peer", zap.Int("protectedSize", protectedSize)) + } + ticker := time.NewTicker(3 * time.Second) + for { + select { + case <-ticker.C: + printPeers() + case <-h.close: + return + } + } + +} + func (h *Host) allowSource(src core.PeerID) (bool, error) { if !h.cfg.EnableRateLimit { return true, nil