From 30c8633e45b7989bc6beebbc53715f1f5c188d87 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Mon, 8 May 2023 13:13:22 -0700 Subject: [PATCH 1/2] Log unexpected listener errors --- core/transport/transport.go | 4 ++++ p2p/net/swarm/swarm_listen.go | 3 +++ p2p/net/upgrader/listener.go | 4 ++++ p2p/net/upgrader/listener_test.go | 2 +- p2p/protocol/circuitv2/client/listen.go | 4 ++-- p2p/transport/quic/virtuallistener.go | 11 +++++------ p2p/transport/quicreuse/connmgr_test.go | 4 ++-- p2p/transport/quicreuse/listener.go | 9 +++++++-- p2p/transport/websocket/listener.go | 4 ++-- p2p/transport/webtransport/listener.go | 4 +--- 10 files changed, 31 insertions(+), 18 deletions(-) diff --git a/core/transport/transport.go b/core/transport/transport.go index ad2ee66496..859c6d6088 100644 --- a/core/transport/transport.go +++ b/core/transport/transport.go @@ -4,6 +4,7 @@ package transport import ( "context" + "errors" "net" "github.com/libp2p/go-libp2p/core/network" @@ -94,6 +95,9 @@ type Listener interface { Multiaddr() ma.Multiaddr } +// ErrListenerClosed is returned by Listener.Accept when the listener is gracefully closed. +var ErrListenerClosed = errors.New("listener closed") + // TransportNetwork is an inet.Network with methods for managing transports. type TransportNetwork interface { network.Network diff --git a/p2p/net/swarm/swarm_listen.go b/p2p/net/swarm/swarm_listen.go index 7ae18a9c97..bec81b8022 100644 --- a/p2p/net/swarm/swarm_listen.go +++ b/p2p/net/swarm/swarm_listen.go @@ -127,6 +127,9 @@ func (s *Swarm) AddListenAddr(a ma.Multiaddr) error { for { c, err := list.Accept() if err != nil { + if err != transport.ErrListenerClosed { + log.Error("swarm listener accept error: ", err) + } return } canonicallog.LogPeerStatus(100, c.RemotePeer(), c.RemoteMultiaddr(), "connection_status", "established", "dir", "inbound") diff --git a/p2p/net/upgrader/listener.go b/p2p/net/upgrader/listener.go index c07299c1a5..0871d2f5af 100644 --- a/p2p/net/upgrader/listener.go +++ b/p2p/net/upgrader/listener.go @@ -3,6 +3,7 @@ package upgrader import ( "context" "fmt" + "strings" "sync" "github.com/libp2p/go-libp2p/core/network" @@ -165,6 +166,9 @@ func (l *listener) Accept() (transport.CapableConn, error) { return c, nil } } + if strings.Contains(l.err.Error(), "use of closed network connection") { + return nil, transport.ErrListenerClosed + } return nil, l.err } diff --git a/p2p/net/upgrader/listener_test.go b/p2p/net/upgrader/listener_test.go index be4a1f465a..331e973ed1 100644 --- a/p2p/net/upgrader/listener_test.go +++ b/p2p/net/upgrader/listener_test.go @@ -158,7 +158,7 @@ func TestListenerClose(t *testing.T) { require.NoError(ln.Close()) err := <-errCh require.Error(err) - require.Contains(err.Error(), "use of closed network connection") + require.Equal(err, transport.ErrListenerClosed) // doesn't accept new connections when it is closed _, err = dial(t, u, ln.Multiaddr(), peer.ID("1"), &network.NullScope{}) diff --git a/p2p/protocol/circuitv2/client/listen.go b/p2p/protocol/circuitv2/client/listen.go index 0d44ac726d..6f5050c2a6 100644 --- a/p2p/protocol/circuitv2/client/listen.go +++ b/p2p/protocol/circuitv2/client/listen.go @@ -1,9 +1,9 @@ package client import ( - "errors" "net" + "github.com/libp2p/go-libp2p/core/transport" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" ) @@ -33,7 +33,7 @@ func (l *Listener) Accept() (manet.Conn, error) { return evt.conn, nil case <-l.ctx.Done(): - return nil, errors.New("circuit v2 client closed") + return nil, transport.ErrListenerClosed } } } diff --git a/p2p/transport/quic/virtuallistener.go b/p2p/transport/quic/virtuallistener.go index fc6373f857..009bc64ae3 100644 --- a/p2p/transport/quic/virtuallistener.go +++ b/p2p/transport/quic/virtuallistener.go @@ -1,7 +1,6 @@ package libp2pquic import ( - "errors" "sync" tpt "github.com/libp2p/go-libp2p/core/transport" @@ -30,7 +29,7 @@ func (l *virtualListener) Multiaddr() ma.Multiaddr { } func (l *virtualListener) Close() error { - l.acceptRunnner.RmAcceptForVersion(l.version) + l.acceptRunnner.RmAcceptForVersion(l.version, tpt.ErrListenerClosed) return l.t.CloseVirtualListener(l) } @@ -64,7 +63,7 @@ func (r *acceptLoopRunner) AcceptForVersion(v quic.VersionNumber) chan acceptVal return ch } -func (r *acceptLoopRunner) RmAcceptForVersion(v quic.VersionNumber) { +func (r *acceptLoopRunner) RmAcceptForVersion(v quic.VersionNumber, err error) { r.muxerMu.Lock() defer r.muxerMu.Unlock() @@ -72,7 +71,7 @@ func (r *acceptLoopRunner) RmAcceptForVersion(v quic.VersionNumber) { if !ok { panic("expected chan in accept muxer") } - ch <- acceptVal{err: errors.New("listener Accept closed")} + ch <- acceptVal{err: err} delete(r.muxer, v) } @@ -97,7 +96,7 @@ func (r *acceptLoopRunner) innerAccept(l *listener, expectedVersion quic.Version // Check if we have a buffered connection first from an earlier Accept call case v, ok := <-bufferedConnChan: if !ok { - return nil, errors.New("listener closed") + return nil, tpt.ErrListenerClosed } return v.conn, v.err default: @@ -159,7 +158,7 @@ func (r *acceptLoopRunner) Accept(l *listener, expectedVersion quic.VersionNumbe } case v, ok := <-bufferedConnChan: if !ok { - return nil, errors.New("listener closed") + return nil, tpt.ErrListenerClosed } conn = v.conn err = v.err diff --git a/p2p/transport/quicreuse/connmgr_test.go b/p2p/transport/quicreuse/connmgr_test.go index 16677c50fe..c3be7b9f37 100644 --- a/p2p/transport/quicreuse/connmgr_test.go +++ b/p2p/transport/quicreuse/connmgr_test.go @@ -12,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/transport" libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls" ma "github.com/multiformats/go-multiaddr" @@ -152,8 +153,7 @@ func TestAcceptErrorGetCleanedUp(t *testing.T) { require.NoError(t, err) defer l.Close() _, err = l.Accept(context.Background()) - require.EqualError(t, err, "accept goroutine finished") - + require.Equal(t, err, transport.ErrListenerClosed) } // The connection passed to quic-go needs to be type-assertable to a net.UDPConn, diff --git a/p2p/transport/quicreuse/listener.go b/p2p/transport/quicreuse/listener.go index 5f219fc65e..7f8b836d00 100644 --- a/p2p/transport/quicreuse/listener.go +++ b/p2p/transport/quicreuse/listener.go @@ -7,8 +7,10 @@ import ( "fmt" "io" "net" + "strings" "sync" + "github.com/libp2p/go-libp2p/core/transport" ma "github.com/multiformats/go-multiaddr" "github.com/quic-go/quic-go" ) @@ -134,6 +136,9 @@ func (l *connListener) Run() error { for { conn, err := l.l.Accept(context.Background()) if err != nil { + if err == quic.ErrServerClosed || strings.Contains(err.Error(), "use of closed network connection") { + return transport.ErrListenerClosed + } return err } proto := conn.ConnectionState().TLS.NegotiatedProtocol @@ -192,10 +197,10 @@ func (l *listener) Accept(ctx context.Context) (quic.Connection, error) { case <-ctx.Done(): return nil, ctx.Err() case <-l.acceptLoopRunning: - return nil, errors.New("accept goroutine finished") + return nil, transport.ErrListenerClosed case c, ok := <-l.queue: if !ok { - return nil, errors.New("listener closed") + return nil, transport.ErrListenerClosed } return c, nil } diff --git a/p2p/transport/websocket/listener.go b/p2p/transport/websocket/listener.go index ab9a73f8ab..e882b15146 100644 --- a/p2p/transport/websocket/listener.go +++ b/p2p/transport/websocket/listener.go @@ -140,7 +140,7 @@ func (l *listener) Accept() (manet.Conn, error) { select { case c, ok := <-l.incoming: if !ok { - return nil, fmt.Errorf("listener is closed") + return nil, transport.ErrListenerClosed } mnc, err := manet.WrapNetConn(c) @@ -151,7 +151,7 @@ func (l *listener) Accept() (manet.Conn, error) { return mnc, nil case <-l.closed: - return nil, fmt.Errorf("listener is closed") + return nil, transport.ErrListenerClosed } } diff --git a/p2p/transport/webtransport/listener.go b/p2p/transport/webtransport/listener.go index 4722e00811..337239fa8a 100644 --- a/p2p/transport/webtransport/listener.go +++ b/p2p/transport/webtransport/listener.go @@ -18,8 +18,6 @@ import ( "github.com/quic-go/webtransport-go" ) -var errClosed = errors.New("closed") - const queueLen = 16 const handshakeTimeout = 10 * time.Second @@ -155,7 +153,7 @@ func (l *listener) httpHandlerWithConnScope(w http.ResponseWriter, r *http.Reque func (l *listener) Accept() (tpt.CapableConn, error) { select { case <-l.ctx.Done(): - return nil, errClosed + return nil, tpt.ErrListenerClosed case c := <-l.queue: return c, nil } From 6ddce139ca8456f0821eb801fc36ba47b359e4fe Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 10 May 2023 18:58:07 -0700 Subject: [PATCH 2/2] Use errors.Is --- p2p/net/swarm/swarm_listen.go | 5 +++-- p2p/transport/quicreuse/connmgr_test.go | 2 +- p2p/transport/quicreuse/listener.go | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/p2p/net/swarm/swarm_listen.go b/p2p/net/swarm/swarm_listen.go index bec81b8022..0905e84513 100644 --- a/p2p/net/swarm/swarm_listen.go +++ b/p2p/net/swarm/swarm_listen.go @@ -1,6 +1,7 @@ package swarm import ( + "errors" "fmt" "time" @@ -127,8 +128,8 @@ func (s *Swarm) AddListenAddr(a ma.Multiaddr) error { for { c, err := list.Accept() if err != nil { - if err != transport.ErrListenerClosed { - log.Error("swarm listener accept error: ", err) + if !errors.Is(err, transport.ErrListenerClosed) { + log.Errorf("swarm listener for %s accept error: %s", a, err) } return } diff --git a/p2p/transport/quicreuse/connmgr_test.go b/p2p/transport/quicreuse/connmgr_test.go index c3be7b9f37..ec0adc4251 100644 --- a/p2p/transport/quicreuse/connmgr_test.go +++ b/p2p/transport/quicreuse/connmgr_test.go @@ -153,7 +153,7 @@ func TestAcceptErrorGetCleanedUp(t *testing.T) { require.NoError(t, err) defer l.Close() _, err = l.Accept(context.Background()) - require.Equal(t, err, transport.ErrListenerClosed) + require.ErrorIs(t, err, transport.ErrListenerClosed) } // The connection passed to quic-go needs to be type-assertable to a net.UDPConn, diff --git a/p2p/transport/quicreuse/listener.go b/p2p/transport/quicreuse/listener.go index 7f8b836d00..e7c0101718 100644 --- a/p2p/transport/quicreuse/listener.go +++ b/p2p/transport/quicreuse/listener.go @@ -136,7 +136,7 @@ func (l *connListener) Run() error { for { conn, err := l.l.Accept(context.Background()) if err != nil { - if err == quic.ErrServerClosed || strings.Contains(err.Error(), "use of closed network connection") { + if errors.Is(err, quic.ErrServerClosed) || strings.Contains(err.Error(), "use of closed network connection") { return transport.ErrListenerClosed } return err