From 203dad2e28160282fd27f6a547455449ba8d5d72 Mon Sep 17 00:00:00 2001 From: envestcc Date: Fri, 22 Dec 2023 15:07:51 +0800 Subject: [PATCH 1/6] add p2p bandwidth metrics --- p2p.go | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/p2p.go b/p2p.go index 1283f7f..1421db0 100644 --- a/p2p.go +++ b/p2p.go @@ -16,6 +16,7 @@ import ( "github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multihash" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "golang.org/x/time/rate" @@ -24,6 +25,7 @@ import ( connmgr "github.com/libp2p/go-libp2p-connmgr" core "github.com/libp2p/go-libp2p-core" "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/metrics" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/pnet" "github.com/libp2p/go-libp2p-core/protocol" @@ -118,6 +120,20 @@ var ( } ) +var ( + _p2pBandwidthHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "iotex_p2p_bandwidth_histogram", + Help: "P2P bandwidth stats", + }, + []string{"protocol", "type"}, + ) +) + +func init() { + prometheus.MustRegister(_p2pBandwidthHistogram) +} + // Option defines the option function to modify the config for a host type Option func(cfg *Config) error @@ -266,6 +282,7 @@ type Host struct { peersLimiters *lru.Cache unicastLimiter *rate.Limiter peerManager *peerManager + bc *metrics.BandwidthCounter } // NewHost constructs a host struct @@ -308,6 +325,7 @@ func NewHost(ctx context.Context, options ...Option) (*Host, error) { if err != nil { return nil, err } + bc := metrics.NewBandwidthCounter() opts := []libp2p.Option{ libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/%s/tcp/%d", ip, cfg.Port)), libp2p.AddrsFactory(func(addrs []multiaddr.Multiaddr) []multiaddr.Multiaddr { @@ -323,6 +341,7 @@ func NewHost(ctx context.Context, options ...Option) (*Host, error) { }), libp2p.Muxer("/yamux/2.0.0", yamux.DefaultTransport), libp2p.ConnectionManager(connmgr.NewConnManager(cfg.ConnLowWater, cfg.ConnHighWater, cfg.ConnGracePeriod)), + libp2p.BandwidthReporter(bc), } if !cfg.SecureIO { @@ -401,6 +420,7 @@ func NewHost(ctx context.Context, options ...Option) (*Host, error) { unicastLimiter: rate.NewLimiter(rate.Limit(cfg.RateLimit.GlobalUnicastAvg), cfg.RateLimit.GlobalUnicastBurst), peerManager: newPeerManager(host, discovery.NewRoutingDiscovery(kad), cfg.GroupID, withMaxPeers(cfg.MaxPeer), withBlacklistTolerance(cfg.BlacklistTolerance), withBlacklistTimeout(cfg.BlackListTimeout)), + bc: bc, } addrs := make([]string, 0) @@ -411,6 +431,18 @@ func NewHost(ctx context.Context, options ...Option) (*Host, error) { zap.Strings("address", addrs), zap.Bool("secureIO", myHost.cfg.SecureIO), zap.Bool("gossip", myHost.cfg.Gossip)) + // start metrics update + go func() { + ticker := time.NewTicker(500 * time.Millisecond) + for { + select { + case <-ticker.C: + myHost.updateMetrics() + case <-myHost.close: + return + } + } + }() return &myHost, nil } @@ -684,6 +716,19 @@ func (h *Host) Close() error { return nil } +func (h *Host) updateMetrics() { + if h.bc == nil { + return + } + for p, stats := range h.bc.GetBandwidthByProtocol() { + protocol := string(p) + _p2pBandwidthHistogram.WithLabelValues(protocol, "in").Observe(float64(stats.TotalIn)) + _p2pBandwidthHistogram.WithLabelValues(protocol, "out").Observe(float64(stats.TotalOut)) + _p2pBandwidthHistogram.WithLabelValues(protocol, "rateIn").Observe(float64(stats.RateIn)) + _p2pBandwidthHistogram.WithLabelValues(protocol, "rateOut").Observe(float64(stats.RateOut)) + } +} + func (h *Host) allowSource(src core.PeerID) (bool, error) { if !h.cfg.EnableRateLimit { return true, nil From 6bcaa96c69dd16a3088296cb3e6e6c729a5ccfd5 Mon Sep 17 00:00:00 2001 From: envestcc Date: Mon, 25 Dec 2023 15:02:18 +0800 Subject: [PATCH 2/6] use gauge --- p2p.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/p2p.go b/p2p.go index 1421db0..adb33ac 100644 --- a/p2p.go +++ b/p2p.go @@ -121,9 +121,9 @@ var ( ) var ( - _p2pBandwidthHistogram = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Name: "iotex_p2p_bandwidth_histogram", + _p2pBandwidthGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "iotex_p2p_bandwidth_gauge", Help: "P2P bandwidth stats", }, []string{"protocol", "type"}, @@ -131,7 +131,7 @@ var ( ) func init() { - prometheus.MustRegister(_p2pBandwidthHistogram) + prometheus.MustRegister(_p2pBandwidthGauge) } // Option defines the option function to modify the config for a host @@ -722,10 +722,10 @@ func (h *Host) updateMetrics() { } for p, stats := range h.bc.GetBandwidthByProtocol() { protocol := string(p) - _p2pBandwidthHistogram.WithLabelValues(protocol, "in").Observe(float64(stats.TotalIn)) - _p2pBandwidthHistogram.WithLabelValues(protocol, "out").Observe(float64(stats.TotalOut)) - _p2pBandwidthHistogram.WithLabelValues(protocol, "rateIn").Observe(float64(stats.RateIn)) - _p2pBandwidthHistogram.WithLabelValues(protocol, "rateOut").Observe(float64(stats.RateOut)) + _p2pBandwidthGauge.WithLabelValues(protocol, "in").Set(float64(stats.TotalIn)) + _p2pBandwidthGauge.WithLabelValues(protocol, "out").Set(float64(stats.TotalOut)) + _p2pBandwidthGauge.WithLabelValues(protocol, "rateIn").Set(float64(stats.RateIn)) + _p2pBandwidthGauge.WithLabelValues(protocol, "rateOut").Set(float64(stats.RateOut)) } } From 7ab162958ec80ef2975e6c24fee232687a530110 Mon Sep 17 00:00:00 2001 From: envestcc Date: Mon, 25 Dec 2023 22:34:59 +0800 Subject: [PATCH 3/6] remove disconnected peers --- peerManager.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/peerManager.go b/peerManager.go index 41b5d85..2e63d5c 100644 --- a/peerManager.go +++ b/peerManager.go @@ -9,6 +9,7 @@ import ( "github.com/iotexproject/go-pkgs/cache/ttl" core "github.com/libp2p/go-libp2p-core" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" discovery "github.com/libp2p/go-libp2p-discovery" "github.com/multiformats/go-multiaddr" @@ -245,6 +246,9 @@ func (pm *peerManager) ConnectedPeers() []peer.AddrInfo { connSet := make(map[string]bool, len(conns)) for _, conn := range conns { remoteID := conn.RemotePeer() + if pm.host.Network().Connectedness(remoteID) != network.Connected { + continue + } if connSet[remoteID.Pretty()] { continue } From f36811fd83913c19c11185f6a06cfb82a680d10e Mon Sep 17 00:00:00 2001 From: envestcc Date: Tue, 26 Dec 2023 14:46:40 +0800 Subject: [PATCH 4/6] Revert "remove disconnected peers" This reverts commit 7ab162958ec80ef2975e6c24fee232687a530110. --- peerManager.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/peerManager.go b/peerManager.go index 2e63d5c..41b5d85 100644 --- a/peerManager.go +++ b/peerManager.go @@ -9,7 +9,6 @@ import ( "github.com/iotexproject/go-pkgs/cache/ttl" core "github.com/libp2p/go-libp2p-core" - "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" discovery "github.com/libp2p/go-libp2p-discovery" "github.com/multiformats/go-multiaddr" @@ -246,9 +245,6 @@ func (pm *peerManager) ConnectedPeers() []peer.AddrInfo { connSet := make(map[string]bool, len(conns)) for _, conn := range conns { remoteID := conn.RemotePeer() - if pm.host.Network().Connectedness(remoteID) != network.Connected { - continue - } if connSet[remoteID.Pretty()] { continue } From c68ef6fca599bdf5ba215523b01aa18729b2e324 Mon Sep 17 00:00:00 2001 From: envestcc Date: Wed, 27 Dec 2023 11:37:47 +0800 Subject: [PATCH 5/6] add protectedPeers metric --- p2p.go | 100 +++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 90 insertions(+), 10 deletions(-) diff --git a/p2p.go b/p2p.go index adb33ac..f150bdc 100644 --- a/p2p.go +++ b/p2p.go @@ -128,10 +128,18 @@ var ( }, []string{"protocol", "type"}, ) + _p2pGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "iotex_p2p_gauge", + Help: "P2P conn stats", + }, + []string{"type"}, + ) ) func init() { prometheus.MustRegister(_p2pBandwidthGauge) + prometheus.MustRegister(_p2pGauge) } // Option defines the option function to modify the config for a host @@ -283,6 +291,7 @@ type Host struct { unicastLimiter *rate.Limiter peerManager *peerManager bc *metrics.BandwidthCounter + connMgr *connmgr.BasicConnMgr } // NewHost constructs a host struct @@ -326,6 +335,7 @@ func NewHost(ctx context.Context, options ...Option) (*Host, error) { return nil, err } bc := metrics.NewBandwidthCounter() + connMgr := connmgr.NewConnManager(cfg.ConnLowWater, cfg.ConnHighWater, cfg.ConnGracePeriod) opts := []libp2p.Option{ libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/%s/tcp/%d", ip, cfg.Port)), libp2p.AddrsFactory(func(addrs []multiaddr.Multiaddr) []multiaddr.Multiaddr { @@ -340,7 +350,7 @@ func NewHost(ctx context.Context, options ...Option) (*Host, error) { return &tcp.TcpTransport{Upgrader: upgrader, ConnectTimeout: cfg.ConnectTimeout} }), libp2p.Muxer("/yamux/2.0.0", yamux.DefaultTransport), - libp2p.ConnectionManager(connmgr.NewConnManager(cfg.ConnLowWater, cfg.ConnHighWater, cfg.ConnGracePeriod)), + libp2p.ConnectionManager(connMgr), libp2p.BandwidthReporter(bc), } @@ -420,7 +430,8 @@ func NewHost(ctx context.Context, options ...Option) (*Host, error) { unicastLimiter: rate.NewLimiter(rate.Limit(cfg.RateLimit.GlobalUnicastAvg), cfg.RateLimit.GlobalUnicastBurst), peerManager: newPeerManager(host, discovery.NewRoutingDiscovery(kad), cfg.GroupID, withMaxPeers(cfg.MaxPeer), withBlacklistTolerance(cfg.BlacklistTolerance), withBlacklistTimeout(cfg.BlackListTimeout)), - bc: bc, + bc: bc, + connMgr: connMgr, } addrs := make([]string, 0) @@ -717,16 +728,85 @@ func (h *Host) Close() error { } func (h *Host) updateMetrics() { - if h.bc == nil { - return + if h.bc != nil { + for p, stats := range h.bc.GetBandwidthByProtocol() { + protocol := string(p) + _p2pBandwidthGauge.WithLabelValues(protocol, "in").Set(float64(stats.TotalIn)) + _p2pBandwidthGauge.WithLabelValues(protocol, "out").Set(float64(stats.TotalOut)) + _p2pBandwidthGauge.WithLabelValues(protocol, "rateIn").Set(float64(stats.RateIn)) + _p2pBandwidthGauge.WithLabelValues(protocol, "rateOut").Set(float64(stats.RateOut)) + } + } + if h.connMgr != nil { + _p2pGauge.WithLabelValues("numConnectedPeers").Set(float64(h.connMgr.GetInfo().ConnCount)) + _p2pGauge.WithLabelValues("numProtectedPeers").Set(float64(len(h.protectedPeers()))) + } +} + +func (h *Host) protectedPeers() []peer.ID { + if h.connMgr == nil || h.host.Peerstore() == nil { + return nil + } + protectedPeers := make([]peer.ID, 0) + for _, p := range h.host.Peerstore().Peers() { + tagInfo := h.connMgr.GetTagInfo(p) + if tagInfo == nil { + continue + } + protectTags := make([]string, 0) + for _, tag := range []string{"kbucket", "pubsub:broadcastbeefdb3001dc858cfe8b"} { + if h.connMgr.IsProtected(p, tag) { + protectTags = append(protectTags, tag) + } + } + if len(protectTags) > 0 { + protectedPeers = append(protectedPeers, p) + } } - for p, stats := range h.bc.GetBandwidthByProtocol() { - protocol := string(p) - _p2pBandwidthGauge.WithLabelValues(protocol, "in").Set(float64(stats.TotalIn)) - _p2pBandwidthGauge.WithLabelValues(protocol, "out").Set(float64(stats.TotalOut)) - _p2pBandwidthGauge.WithLabelValues(protocol, "rateIn").Set(float64(stats.RateIn)) - _p2pBandwidthGauge.WithLabelValues(protocol, "rateOut").Set(float64(stats.RateOut)) + return protectedPeers +} + +func (h *Host) printPeers() { + printPeers := func() { + if h.connMgr == nil || h.host.Peerstore() == nil { + return + } + Logger().Info("print peer===============================") + protectedSize := 0 + for _, p := range h.host.Peerstore().Peers() { + tagInfo := h.connMgr.GetTagInfo(p) + if tagInfo == nil { + continue + } + protectTags := make([]string, 0) + for _, tag := range []string{"kbucket", "pubsub:broadcastbeefdb3001dc858cfe8b"} { + if h.connMgr.IsProtected(p, tag) { + protectTags = append(protectTags, tag) + } + } + if len(protectTags) > 0 { + protectedSize++ + } + Logger().Info("print peer", + zap.String("id", p.Pretty()), + zap.Any("tags", tagInfo.Tags), + zap.Any("protectTags", protectTags), + zap.Int("protectTagSize", len(protectTags)), + zap.Any("connsSize", len(tagInfo.Conns)), + ) + } + Logger().Info("print peer", zap.Int("protectedSize", protectedSize)) + } + ticker := time.NewTicker(3 * time.Second) + for { + select { + case <-ticker.C: + printPeers() + case <-h.close: + return + } } + } func (h *Host) allowSource(src core.PeerID) (bool, error) { From 8e65d2fcf36e20d54226c5eb564c7ff71dedb218 Mon Sep 17 00:00:00 2001 From: envestcc Date: Thu, 4 Jan 2024 11:02:41 +0800 Subject: [PATCH 6/6] add peer metric & print DHT --- p2p.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/p2p.go b/p2p.go index f150bdc..46a97f0 100644 --- a/p2p.go +++ b/p2p.go @@ -135,11 +135,19 @@ var ( }, []string{"type"}, ) + _p2pPeersGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "iotex_p2p_peers_gauge", + Help: "P2P peers stats", + }, + []string{"peer"}, + ) ) func init() { prometheus.MustRegister(_p2pBandwidthGauge) prometheus.MustRegister(_p2pGauge) + prometheus.MustRegister(_p2pPeersGauge) } // Option defines the option function to modify the config for a host @@ -454,6 +462,18 @@ func NewHost(ctx context.Context, options ...Option) (*Host, error) { } } }() + // start print peers + go func() { + ticker := time.NewTicker(1 * time.Minute) + for { + select { + case <-ticker.C: + kad.RoutingTable().Print() + case <-myHost.close: + return + } + } + }() return &myHost, nil } @@ -741,6 +761,9 @@ func (h *Host) updateMetrics() { _p2pGauge.WithLabelValues("numConnectedPeers").Set(float64(h.connMgr.GetInfo().ConnCount)) _p2pGauge.WithLabelValues("numProtectedPeers").Set(float64(len(h.protectedPeers()))) } + for _, p := range h.ConnectedPeers() { + _p2pPeersGauge.WithLabelValues(p.ID.Pretty()).Set(1) + } } func (h *Host) protectedPeers() []peer.ID {