Skip to content

Commit

Permalink
quic: avoid deadlock on listener close
Browse files Browse the repository at this point in the history
Avoid holding Listener.connsMu while blocking on a Conn's loop,
since the Conn can acquire the mutex while shutting down.

Fix Conn.waitReady to check conn readiness before checking
the Context doneness. This doesn't make a difference in the
current exported API, but this simplifies some tests and
will be useful once 0-RTT is implemented.

Refactor a bit of the testConn datagram handling to use a
testListener type, which helped expose the above deadlock
and will be useful for writing tests which don't involve
a Conn.

Change-Id: I064fca99ae9a165631fc0ff46eb334d25d7dd957
Reviewed-on: https://go-review.googlesource.com/c/net/+/529935
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
  • Loading branch information
neild committed Sep 21, 2023
1 parent 732b4bc commit 3b0ab98
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 38 deletions.
20 changes: 17 additions & 3 deletions internal/quic/conn_close.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,13 @@ func (c *Conn) enterDraining(err error) {
}

func (c *Conn) waitReady(ctx context.Context) error {
select {
case <-c.lifetime.readyc:
return nil
case <-c.lifetime.drainingc:
return c.lifetime.finalErr
default:
}
select {
case <-c.lifetime.readyc:
return nil
Expand Down Expand Up @@ -215,7 +222,7 @@ func (c *Conn) Abort(err error) {
if err == nil {
err = localTransportError(errNo)
}
c.runOnLoop(func(now time.Time, c *Conn) {
c.sendMsg(func(now time.Time, c *Conn) {
c.abort(now, err)
})
}
Expand All @@ -228,11 +235,18 @@ func (c *Conn) abort(now time.Time, err error) {
c.lifetime.localErr = err
}

// abortImmediately terminates a connection.
// The connection does not send a CONNECTION_CLOSE, and skips the draining period.
func (c *Conn) abortImmediately(now time.Time, err error) {
c.abort(now, err)
c.enterDraining(err)
c.exited = true
}

// exit fully terminates a connection immediately.
func (c *Conn) exit() {
c.runOnLoop(func(now time.Time, c *Conn) {
c.sendMsg(func(now time.Time, c *Conn) {
c.enterDraining(errors.New("connection closed"))
c.exited = true
})
<-c.donec
}
41 changes: 7 additions & 34 deletions internal/quic/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ import (
"errors"
"flag"
"fmt"
"io"
"math"
"net"
"net/netip"
"reflect"
"strings"
Expand Down Expand Up @@ -112,7 +110,7 @@ const maxTestKeyPhases = 3
type testConn struct {
t *testing.T
conn *Conn
listener *Listener
listener *testListener
now time.Time
timer time.Time
timerLastFired time.Time
Expand Down Expand Up @@ -231,8 +229,8 @@ func newTestConn(t *testing.T, side connSide, opts ...any) *testConn {
tc.peerTLSConn.SetTransportParameters(marshalTransportParameters(peerProvidedParams))
tc.peerTLSConn.Start(context.Background())

tc.listener = newListener((*testConnUDPConn)(tc), config, (*testConnHooks)(tc))
conn, err := tc.listener.newConn(
tc.listener = newTestListener(t, config, (*testConnHooks)(tc))
conn, err := tc.listener.l.newConn(
tc.now,
side,
initialConnID,
Expand Down Expand Up @@ -335,7 +333,7 @@ func (tc *testConn) cleanup() {
return
}
tc.conn.exit()
tc.listener.Close(context.Background())
<-tc.conn.donec
}

func (tc *testConn) logDatagram(text string, d *testDatagram) {
Expand Down Expand Up @@ -388,6 +386,7 @@ func (tc *testConn) write(d *testDatagram) {
for len(buf) < d.paddedSize {
buf = append(buf, 0)
}
// TODO: This should use tc.listener.write.
tc.conn.sendMsg(&datagram{
b: buf,
})
Expand Down Expand Up @@ -457,11 +456,10 @@ func (tc *testConn) readDatagram() *testDatagram {
tc.wait()
tc.sentPackets = nil
tc.sentFrames = nil
if len(tc.sentDatagrams) == 0 {
buf := tc.listener.read()
if buf == nil {
return nil
}
buf := tc.sentDatagrams[0]
tc.sentDatagrams = tc.sentDatagrams[1:]
d := tc.parseTestDatagram(buf)
// Log the datagram before removing ignored frames.
// When things go wrong, it's useful to see all the frames.
Expand Down Expand Up @@ -982,31 +980,6 @@ func testPeerConnID(seq int64) []byte {
return []byte{0xbe, 0xee, 0xff, byte(seq)}
}

// testConnUDPConn implements UDPConn.
type testConnUDPConn testConn

func (tc *testConnUDPConn) Close() error {
close(tc.recvDatagram)
return nil
}

func (tc *testConnUDPConn) LocalAddr() net.Addr {
return net.UDPAddrFromAddrPort(netip.MustParseAddrPort("127.0.0.1:443"))
}

func (tc *testConnUDPConn) ReadMsgUDPAddrPort(b, control []byte) (n, controln, flags int, _ netip.AddrPort, _ error) {
for d := range tc.recvDatagram {
n = copy(b, d.b)
return n, 0, 0, d.addr, nil
}
return 0, 0, 0, netip.AddrPort{}, io.EOF
}

func (tc *testConnUDPConn) WriteToUDPAddrPort(b []byte, addr netip.AddrPort) (int, error) {
tc.sentDatagrams = append(tc.sentDatagrams, append([]byte(nil), b...))
return len(b), nil
}

// canceledContext returns a canceled Context.
//
// Functions which take a context preference progress over cancelation.
Expand Down
2 changes: 1 addition & 1 deletion internal/quic/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (l *Listener) Close(ctx context.Context) error {
if !l.closing {
l.closing = true
for c := range l.conns {
c.Close()
c.Abort(errors.New("listener closed"))
}
if len(l.conns) == 0 {
l.udpConn.Close()
Expand Down
75 changes: 75 additions & 0 deletions internal/quic/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"bytes"
"context"
"io"
"net"
"net/netip"
"testing"
)

Expand Down Expand Up @@ -86,3 +88,76 @@ func newLocalListener(t *testing.T, side connSide, conf *Config) *Listener {
})
return l
}

type testListener struct {
t *testing.T
l *Listener
recvc chan *datagram
idlec chan struct{}
sentDatagrams [][]byte
}

func newTestListener(t *testing.T, config *Config, testHooks connTestHooks) *testListener {
tl := &testListener{
t: t,
recvc: make(chan *datagram),
idlec: make(chan struct{}),
}
tl.l = newListener((*testListenerUDPConn)(tl), config, testHooks)
t.Cleanup(tl.cleanup)
return tl
}

func (tl *testListener) cleanup() {
tl.l.Close(canceledContext())
}

func (tl *testListener) wait() {
tl.idlec <- struct{}{}
}

func (tl *testListener) write(d *datagram) {
tl.recvc <- d
tl.wait()
}

func (tl *testListener) read() []byte {
tl.wait()
if len(tl.sentDatagrams) == 0 {
return nil
}
d := tl.sentDatagrams[0]
tl.sentDatagrams = tl.sentDatagrams[1:]
return d
}

// testListenerUDPConn implements UDPConn.
type testListenerUDPConn testListener

func (tl *testListenerUDPConn) Close() error {
close(tl.recvc)
return nil
}

func (tl *testListenerUDPConn) LocalAddr() net.Addr {
return net.UDPAddrFromAddrPort(netip.MustParseAddrPort("127.0.0.1:443"))
}

func (tl *testListenerUDPConn) ReadMsgUDPAddrPort(b, control []byte) (n, controln, flags int, _ netip.AddrPort, _ error) {
for {
select {
case d, ok := <-tl.recvc:
if !ok {
return 0, 0, 0, netip.AddrPort{}, io.EOF
}
n = copy(b, d.b)
return n, 0, 0, d.addr, nil
case <-tl.idlec:
}
}
}

func (tl *testListenerUDPConn) WriteToUDPAddrPort(b []byte, addr netip.AddrPort) (int, error) {
tl.sentDatagrams = append(tl.sentDatagrams, append([]byte(nil), b...))
return len(b), nil
}

0 comments on commit 3b0ab98

Please sign in to comment.