From cd80a0732693c88d8c1758ee7442a7fd7bf1e615 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 19 Dec 2021 17:33:14 +0400 Subject: [PATCH] speed up the dial tests --- dial_test.go | 115 +++++++++++++++--------------------------------- limiter_test.go | 43 +++++++++--------- simul_test.go | 4 +- swarm_test.go | 18 ++------ 4 files changed, 62 insertions(+), 118 deletions(-) diff --git a/dial_test.go b/dial_test.go index 5fe4bdc2..8b982daa 100644 --- a/dial_test.go +++ b/dial_test.go @@ -24,8 +24,10 @@ import ( "github.com/stretchr/testify/require" ) -func init() { - transport.DialTimeout = time.Second +func setDialTimeout(t time.Duration) (reset func()) { + orig := transport.DialTimeout + transport.DialTimeout = t + return func() { transport.DialTimeout = orig } } func closeSwarms(swarms []*Swarm) { @@ -92,6 +94,7 @@ func TestSimultDials(t *testing.T) { ctx := context.Background() swarms := makeSwarms(t, 2, swarmt.OptDisableReuseport) + defer closeSwarms(swarms) // connect everyone { @@ -133,10 +136,6 @@ func TestSimultDials(t *testing.T) { if c10l > 2 { t.Error("1->0 has", c10l) } - - for _, s := range swarms { - s.Close() - } } func newSilentPeer(t *testing.T) (peer.ID, ma.Multiaddr, net.Listener) { @@ -159,7 +158,8 @@ func newSilentPeer(t *testing.T) (peer.ID, ma.Multiaddr, net.Listener) { } func TestDialWait(t *testing.T) { - t.Parallel() + reset := setDialTimeout(250 * time.Millisecond) + defer reset() ctx := context.Background() swarms := makeSwarms(t, 1) @@ -194,7 +194,6 @@ func TestDialWait(t *testing.T) { } func TestDialBackoff(t *testing.T) { - // t.Skip("skipping for another test") if ci.IsRunning() { t.Skip("travis will never have fun with this test") } @@ -203,10 +202,9 @@ func TestDialBackoff(t *testing.T) { ctx := context.Background() swarms := makeSwarms(t, 2) + defer closeSwarms(swarms) s1 := swarms[0] s2 := swarms[1] - defer s1.Close() - defer s2.Close() s2addrs, err := s2.InterfaceListenAddresses() if err != nil { @@ -405,15 +403,13 @@ func TestDialBackoff(t *testing.T) { } func TestDialBackoffClears(t *testing.T) { - // t.Skip("skipping for another test") - t.Parallel() + reset := setDialTimeout(250 * time.Millisecond) + defer reset() - ctx := context.Background() swarms := makeSwarms(t, 2) + defer closeSwarms(swarms) s1 := swarms[0] s2 := swarms[1] - defer s1.Close() - defer s2.Close() // use another address first, that accept and hang on conns _, s2bad, s2l := newSilentPeer(t) @@ -424,13 +420,8 @@ func TestDialBackoffClears(t *testing.T) { s1.Peerstore().AddAddr(s2.LocalPeer(), s2bad, peerstore.PermanentAddrTTL) before := time.Now() - c, err := s1.DialPeer(ctx, s2.LocalPeer()) - if err == nil { - defer c.Close() - t.Fatal("dialing to broken addr worked...", err) - } else { - t.Log("correctly got error:", err) - } + _, err := s1.DialPeer(context.Background(), s2.LocalPeer()) + require.Error(t, err, "dialing to broken addr worked...") duration := time.Since(before) if duration < transport.DialTimeout*DialAttempts { @@ -439,39 +430,18 @@ func TestDialBackoffClears(t *testing.T) { if duration > 2*transport.DialTimeout*DialAttempts { t.Error("> 2*transport.DialTimeout * DialAttempts not being respected", duration, 2*transport.DialTimeout*DialAttempts) } - - if !s1.Backoff().Backoff(s2.LocalPeer(), s2bad) { - t.Error("s2 should now be on backoff") - } else { - t.Log("correctly added to backoff") - } + require.True(t, s1.Backoff().Backoff(s2.LocalPeer(), s2bad), "s2 should now be on backoff") // phase 2 -- add the working address. dial should succeed. - ifaceAddrs1, err := swarms[1].InterfaceListenAddresses() - if err != nil { - t.Fatal(err) - } + ifaceAddrs1, err := s2.InterfaceListenAddresses() + require.NoError(t, err) s1.Peerstore().AddAddrs(s2.LocalPeer(), ifaceAddrs1, peerstore.PermanentAddrTTL) - if c, err := s1.DialPeer(ctx, s2.LocalPeer()); err == nil { - c.Close() - t.Log("backoffs are per address, not peer") - } - - time.Sleep(BackoffBase) - - if c, err := s1.DialPeer(ctx, s2.LocalPeer()); err != nil { - t.Fatal(err) - } else { - c.Close() - t.Log("correctly connected") - } - - if s1.Backoff().Backoff(s2.LocalPeer(), s2bad) { - t.Error("s2 should no longer be on backoff") - } else { - t.Log("correctly cleared backoff") - } + // backoffs are per address, not peer + c, err := s1.DialPeer(context.Background(), s2.LocalPeer()) + require.NoError(t, err) + defer c.Close() + require.False(t, s1.Backoff().Backoff(s2.LocalPeer(), s2bad), "s2 should no longer be on backoff") } func TestDialPeerFailed(t *testing.T) { @@ -515,8 +485,6 @@ func TestDialPeerFailed(t *testing.T) { } func TestDialExistingConnection(t *testing.T) { - ctx := context.Background() - swarms := makeSwarms(t, 2) defer closeSwarms(swarms) s1 := swarms[0] @@ -524,19 +492,13 @@ func TestDialExistingConnection(t *testing.T) { s1.Peerstore().AddAddrs(s2.LocalPeer(), s2.ListenAddresses(), peerstore.PermanentAddrTTL) - c1, err := s1.DialPeer(ctx, s2.LocalPeer()) - if err != nil { - t.Fatal(err) - } + c1, err := s1.DialPeer(context.Background(), s2.LocalPeer()) + require.NoError(t, err) - c2, err := s1.DialPeer(ctx, s2.LocalPeer()) - if err != nil { - t.Fatal(err) - } + c2, err := s1.DialPeer(context.Background(), s2.LocalPeer()) + require.NoError(t, err) - if c1 != c2 { - t.Fatal("expecting the same connection from both dials") - } + require.Equal(t, c1, c2, "expecting the same connection from both dials") } func newSilentListener(t *testing.T) ([]ma.Multiaddr, net.Listener) { @@ -558,14 +520,13 @@ func newSilentListener(t *testing.T) ([]ma.Multiaddr, net.Listener) { } func TestDialSimultaneousJoin(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + reset := setDialTimeout(250 * time.Millisecond) + defer reset() swarms := makeSwarms(t, 2) + defer closeSwarms(swarms) s1 := swarms[0] s2 := swarms[1] - defer s1.Close() - defer s2.Close() s2silentAddrs, s2silentListener := newSilentListener(t) go acceptAndHang(s2silentListener) @@ -577,7 +538,7 @@ func TestDialSimultaneousJoin(t *testing.T) { go func() { s1.Peerstore().AddAddrs(s2.LocalPeer(), s2silentAddrs, peerstore.PermanentAddrTTL) - c, err := s1.DialPeer(ctx, s2.LocalPeer()) + c, err := s1.DialPeer(context.Background(), s2.LocalPeer()) if err != nil { errs <- err connch <- nil @@ -602,7 +563,7 @@ func TestDialSimultaneousJoin(t *testing.T) { } s1.Peerstore().AddAddrs(s2.LocalPeer(), s2addrs[:1], peerstore.PermanentAddrTTL) - c, err := s1.DialPeer(ctx, s2.LocalPeer()) + c, err := s1.DialPeer(context.Background(), s2.LocalPeer()) if err != nil { errs <- err connch <- nil @@ -620,7 +581,7 @@ func TestDialSimultaneousJoin(t *testing.T) { // start a third dial to s2, this should get the existing connection from the successful dial go func() { - c, err := s1.DialPeer(ctx, s2.LocalPeer()) + c, err := s1.DialPeer(context.Background(), s2.LocalPeer()) if err != nil { errs <- err connch <- nil @@ -637,10 +598,7 @@ func TestDialSimultaneousJoin(t *testing.T) { // raise any errors from the previous goroutines for i := 0; i < 3; i++ { - err := <-errs - if err != nil { - t.Fatal(err) - } + require.NoError(t, <-errs) } if c2 != c3 { @@ -660,13 +618,12 @@ func TestDialSimultaneousJoin(t *testing.T) { } func TestDialSelf(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + t.Parallel() swarms := makeSwarms(t, 2) + defer closeSwarms(swarms) s1 := swarms[0] - defer s1.Close() - _, err := s1.DialPeer(ctx, s1.LocalPeer()) + _, err := s1.DialPeer(context.Background(), s1.LocalPeer()) require.ErrorIs(t, err, ErrDialToSelf, "expected error from self dial") } diff --git a/limiter_test.go b/limiter_test.go index 4a592b86..cd485861 100644 --- a/limiter_test.go +++ b/limiter_test.go @@ -18,16 +18,14 @@ import ( mafmt "github.com/multiformats/go-multiaddr-fmt" ) -func mustAddr(t *testing.T, s string) ma.Multiaddr { - a, err := ma.NewMultiaddr(s) - if err != nil { - t.Fatal(err) - } - return a +func setDialTimeout(t time.Duration) (reset func()) { + orig := transport.DialTimeout + transport.DialTimeout = t + return func() { transport.DialTimeout = orig } } -func addrWithPort(t *testing.T, p int) ma.Multiaddr { - return mustAddr(t, fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", p)) +func addrWithPort(p int) ma.Multiaddr { + return ma.StringCast(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", p)) } // in these tests I use addresses with tcp ports over a certain number to @@ -84,8 +82,8 @@ func TestLimiterBasicDials(t *testing.T) { l := newDialLimiterWithParams(hangDialFunc(hang), ConcurrentFdDials, 4) - bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)} - good := addrWithPort(t, 20) + bads := []ma.Multiaddr{addrWithPort(1), addrWithPort(2), addrWithPort(3), addrWithPort(4)} + good := addrWithPort(20) resch := make(chan dialResult) pid := peer.ID("testpeer") @@ -133,9 +131,9 @@ func TestFDLimiting(t *testing.T) { defer close(hang) l := newDialLimiterWithParams(hangDialFunc(hang), 16, 5) - bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)} + bads := []ma.Multiaddr{addrWithPort(1), addrWithPort(2), addrWithPort(3), addrWithPort(4)} pids := []peer.ID{"testpeer1", "testpeer2", "testpeer3", "testpeer4"} - goodTCP := addrWithPort(t, 20) + goodTCP := addrWithPort(20) ctx := context.Background() resch := make(chan dialResult) @@ -163,7 +161,7 @@ func TestFDLimiting(t *testing.T) { } pid5 := peer.ID("testpeer5") - utpaddr := mustAddr(t, "/ip4/127.0.0.1/udp/7777/utp") + utpaddr := ma.StringCast("/ip4/127.0.0.1/udp/7777/utp") // This should complete immediately since utp addresses arent blocked by fd rate limiting l.AddDialJob(&dialJob{ctx: ctx, peer: pid5, addr: utpaddr, resp: resch}) @@ -180,7 +178,7 @@ func TestFDLimiting(t *testing.T) { // A relay address with tcp transport will complete because we do not consume fds for dials // with relay addresses as the fd will be consumed when we actually dial the relay server. pid6 := test.RandPeerIDFatal(t) - relayAddr := mustAddr(t, fmt.Sprintf("/ip4/127.0.0.1/tcp/20/p2p-circuit/p2p/%s", pid6)) + relayAddr := ma.StringCast(fmt.Sprintf("/ip4/127.0.0.1/tcp/20/p2p-circuit/p2p/%s", pid6)) l.AddDialJob(&dialJob{ctx: ctx, peer: pid6, addr: relayAddr, resp: resch}) select { @@ -209,7 +207,7 @@ func TestTokenRedistribution(t *testing.T) { } l := newDialLimiterWithParams(df, 8, 4) - bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)} + bads := []ma.Multiaddr{addrWithPort(1), addrWithPort(2), addrWithPort(3), addrWithPort(4)} pids := []peer.ID{"testpeer1", "testpeer2"} ctx := context.Background() @@ -224,13 +222,11 @@ func TestTokenRedistribution(t *testing.T) { tryDialAddrs(ctx, l, pid, bads, resch) } - good := mustAddr(t, "/ip4/127.0.0.1/tcp/1001") - // add a good dial job for peer 1 l.AddDialJob(&dialJob{ ctx: ctx, peer: pids[1], - addr: good, + addr: ma.StringCast("/ip4/127.0.0.1/tcp/1001"), resp: resch, }) @@ -263,7 +259,7 @@ func TestTokenRedistribution(t *testing.T) { l.AddDialJob(&dialJob{ ctx: ctx, peer: pids[0], - addr: addrWithPort(t, 7), + addr: addrWithPort(7), resp: resch, }) @@ -304,10 +300,10 @@ func TestStressLimiter(t *testing.T) { var bads []ma.Multiaddr for i := 0; i < 100; i++ { - bads = append(bads, addrWithPort(t, i)) + bads = append(bads, addrWithPort(i)) } - addresses := append(bads, addrWithPort(t, 2000)) + addresses := append(bads, addrWithPort(2000)) success := make(chan struct{}) for i := 0; i < 20; i++ { @@ -345,6 +341,9 @@ func TestStressLimiter(t *testing.T) { } func TestFDLimitUnderflow(t *testing.T) { + reset := setDialTimeout(250 * time.Millisecond) + defer reset() + df := func(ctx context.Context, p peer.ID, addr ma.Multiaddr) (transport.CapableConn, error) { select { case <-ctx.Done(): @@ -358,7 +357,7 @@ func TestFDLimitUnderflow(t *testing.T) { var addrs []ma.Multiaddr for i := 0; i <= 1000; i++ { - addrs = append(addrs, addrWithPort(t, i)) + addrs = append(addrs, addrWithPort(i)) } wg := sync.WaitGroup{} diff --git a/simul_test.go b/simul_test.go index aa4eb590..0e7d6da7 100644 --- a/simul_test.go +++ b/simul_test.go @@ -56,7 +56,7 @@ func TestSimultOpenMany(t *testing.T) { addrs = 10 rounds = 5 } - SubtestSwarm(t, addrs, rounds) + subtestSwarm(t, addrs, rounds) } func TestSimultOpenFewStress(t *testing.T) { @@ -72,7 +72,7 @@ func TestSimultOpenFewStress(t *testing.T) { // rounds := 100 for i := 0; i < rounds; i++ { - SubtestSwarm(t, swarms, msgs) + subtestSwarm(t, swarms, msgs) <-time.After(10 * time.Millisecond) } } diff --git a/swarm_test.go b/swarm_test.go index 0dac49d1..0c5c341d 100644 --- a/swarm_test.go +++ b/swarm_test.go @@ -99,7 +99,7 @@ func connectSwarms(t *testing.T, ctx context.Context, swarms []*swarm.Swarm) { } } -func SubtestSwarm(t *testing.T, SwarmNum int, MsgNum int) { +func subtestSwarm(t *testing.T, SwarmNum int, MsgNum int) { swarms := makeSwarms(t, SwarmNum, OptDisableReuseport) // connect everyone @@ -216,29 +216,17 @@ func SubtestSwarm(t *testing.T, SwarmNum int, MsgNum int) { cancel() <-time.After(10 * time.Millisecond) } - - for _, s := range swarms { - s.Close() - } } func TestSwarm(t *testing.T) { - // t.Skip("skipping for another test") t.Parallel() - - // msgs := 1000 - msgs := 100 - swarms := 5 - SubtestSwarm(t, swarms, msgs) + subtestSwarm(t, 5, 100) } func TestBasicSwarm(t *testing.T) { // t.Skip("skipping for another test") t.Parallel() - - msgs := 1 - swarms := 2 - SubtestSwarm(t, swarms, msgs) + subtestSwarm(t, 2, 1) } func TestConnectionGating(t *testing.T) {