From 46c975de781d9d743618d2ba7a2e6ec3ce877cb4 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 19 Dec 2021 17:33:14 +0400 Subject: [PATCH] speed up the dial tests --- dial_test.go | 127 +++++++++++++----------------------------------- limiter_test.go | 43 ++++++++-------- simul_test.go | 4 +- swarm_test.go | 18 ++----- 4 files changed, 59 insertions(+), 133 deletions(-) diff --git a/dial_test.go b/dial_test.go index a3448640..fe8b32c2 100644 --- a/dial_test.go +++ b/dial_test.go @@ -82,11 +82,11 @@ func acceptAndHang(l net.Listener) { } func TestSimultDials(t *testing.T) { - // t.Skip("skipping for another test") t.Parallel() ctx := context.Background() swarms := makeSwarms(t, 2, swarmt.OptDisableReuseport) + defer closeSwarms(swarms) // connect everyone { @@ -128,10 +128,6 @@ func TestSimultDials(t *testing.T) { if c10l > 2 { t.Error("1->0 has", c10l) } - - for _, s := range swarms { - s.Close() - } } func newSilentPeer(t *testing.T) (peer.ID, ma.Multiaddr, net.Listener) { @@ -154,8 +150,6 @@ func newSilentPeer(t *testing.T) (peer.ID, ma.Multiaddr, net.Listener) { } func TestDialWait(t *testing.T) { - t.Parallel() - const dialTimeout = 250 * time.Millisecond swarms := makeSwarms(t, 1, swarmt.DialTimeout(dialTimeout)) @@ -193,22 +187,18 @@ func TestDialBackoff(t *testing.T) { if ci.IsRunning() { t.Skip("travis will never have fun with this test") } - t.Parallel() - const dialTimeout = 250 * time.Millisecond + const dialTimeout = 100 * time.Millisecond ctx := context.Background() swarms := makeSwarms(t, 2, swarmt.DialTimeout(dialTimeout)) + defer closeSwarms(swarms) s1 := swarms[0] s2 := swarms[1] - defer s1.Close() - defer s2.Close() s2addrs, err := s2.InterfaceListenAddresses() - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) s1.Peerstore().AddAddrs(s2.LocalPeer(), s2addrs, peerstore.PermanentAddrTTL) // dial to a non-existent peer. @@ -405,13 +395,10 @@ func TestDialBackoffClears(t *testing.T) { t.Parallel() const dialTimeout = 250 * time.Millisecond - - ctx := context.Background() swarms := makeSwarms(t, 2, swarmt.DialTimeout(dialTimeout)) + defer closeSwarms(swarms) s1 := swarms[0] s2 := swarms[1] - defer s1.Close() - defer s2.Close() // use another address first, that accept and hang on conns _, s2bad, s2l := newSilentPeer(t) @@ -422,13 +409,8 @@ func TestDialBackoffClears(t *testing.T) { s1.Peerstore().AddAddr(s2.LocalPeer(), s2bad, peerstore.PermanentAddrTTL) before := time.Now() - c, err := s1.DialPeer(ctx, s2.LocalPeer()) - if err == nil { - defer c.Close() - t.Fatal("dialing to broken addr worked...", err) - } else { - t.Log("correctly got error:", err) - } + _, err := s1.DialPeer(context.Background(), s2.LocalPeer()) + require.Error(t, err, "dialing to broken addr worked...") duration := time.Since(before) if duration < dialTimeout*DialAttempts { @@ -437,65 +419,38 @@ func TestDialBackoffClears(t *testing.T) { if duration > 2*dialTimeout*DialAttempts { t.Error("> 2*dialTimeout * DialAttempts not being respected", duration, 2*dialTimeout*DialAttempts) } - - if !s1.Backoff().Backoff(s2.LocalPeer(), s2bad) { - t.Error("s2 should now be on backoff") - } else { - t.Log("correctly added to backoff") - } + require.True(t, s1.Backoff().Backoff(s2.LocalPeer(), s2bad), "s2 should now be on backoff") // phase 2 -- add the working address. dial should succeed. - ifaceAddrs1, err := swarms[1].InterfaceListenAddresses() - if err != nil { - t.Fatal(err) - } + ifaceAddrs1, err := s2.InterfaceListenAddresses() + require.NoError(t, err) s1.Peerstore().AddAddrs(s2.LocalPeer(), ifaceAddrs1, peerstore.PermanentAddrTTL) - if c, err := s1.DialPeer(ctx, s2.LocalPeer()); err == nil { - c.Close() - t.Log("backoffs are per address, not peer") - } - - time.Sleep(BackoffBase) - - if c, err := s1.DialPeer(ctx, s2.LocalPeer()); err != nil { - t.Fatal(err) - } else { - c.Close() - t.Log("correctly connected") - } - - if s1.Backoff().Backoff(s2.LocalPeer(), s2bad) { - t.Error("s2 should no longer be on backoff") - } else { - t.Log("correctly cleared backoff") - } + // backoffs are per address, not peer + c, err := s1.DialPeer(context.Background(), s2.LocalPeer()) + require.NoError(t, err) + defer c.Close() + require.False(t, s1.Backoff().Backoff(s2.LocalPeer(), s2bad), "s2 should no longer be on backoff") } func TestDialPeerFailed(t *testing.T) { t.Parallel() - ctx := context.Background() - swarms := makeSwarms(t, 2) + swarms := makeSwarms(t, 2, swarmt.DialTimeout(100*time.Millisecond)) defer closeSwarms(swarms) testedSwarm, targetSwarm := swarms[0], swarms[1] - expectedErrorsCount := 5 + const expectedErrorsCount = 5 for i := 0; i < expectedErrorsCount; i++ { _, silentPeerAddress, silentPeerListener := newSilentPeer(t) go acceptAndHang(silentPeerListener) defer silentPeerListener.Close() - testedSwarm.Peerstore().AddAddr( - targetSwarm.LocalPeer(), - silentPeerAddress, - peerstore.PermanentAddrTTL) + testedSwarm.Peerstore().AddAddr(targetSwarm.LocalPeer(), silentPeerAddress, peerstore.PermanentAddrTTL) } - _, err := testedSwarm.DialPeer(ctx, targetSwarm.LocalPeer()) - if err == nil { - t.Fatal(err) - } + _, err := testedSwarm.DialPeer(context.Background(), targetSwarm.LocalPeer()) + require.Error(t, err) // dial_test.go:508: correctly get a combined error: failed to dial PEER: all dials failed // * [/ip4/127.0.0.1/tcp/46485] failed to negotiate security protocol: context deadline exceeded @@ -513,8 +468,6 @@ func TestDialPeerFailed(t *testing.T) { } func TestDialExistingConnection(t *testing.T) { - ctx := context.Background() - swarms := makeSwarms(t, 2) defer closeSwarms(swarms) s1 := swarms[0] @@ -522,19 +475,13 @@ func TestDialExistingConnection(t *testing.T) { s1.Peerstore().AddAddrs(s2.LocalPeer(), s2.ListenAddresses(), peerstore.PermanentAddrTTL) - c1, err := s1.DialPeer(ctx, s2.LocalPeer()) - if err != nil { - t.Fatal(err) - } + c1, err := s1.DialPeer(context.Background(), s2.LocalPeer()) + require.NoError(t, err) - c2, err := s1.DialPeer(ctx, s2.LocalPeer()) - if err != nil { - t.Fatal(err) - } + c2, err := s1.DialPeer(context.Background(), s2.LocalPeer()) + require.NoError(t, err) - if c1 != c2 { - t.Fatal("expecting the same connection from both dials") - } + require.Equal(t, c1, c2, "expecting the same connection from both dials") } func newSilentListener(t *testing.T) ([]ma.Multiaddr, net.Listener) { @@ -556,16 +503,12 @@ func newSilentListener(t *testing.T) ([]ma.Multiaddr, net.Listener) { } func TestDialSimultaneousJoin(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - const dialTimeout = 250 * time.Millisecond swarms := makeSwarms(t, 2, swarmt.DialTimeout(dialTimeout)) + defer closeSwarms(swarms) s1 := swarms[0] s2 := swarms[1] - defer s1.Close() - defer s2.Close() s2silentAddrs, s2silentListener := newSilentListener(t) go acceptAndHang(s2silentListener) @@ -577,7 +520,7 @@ func TestDialSimultaneousJoin(t *testing.T) { go func() { s1.Peerstore().AddAddrs(s2.LocalPeer(), s2silentAddrs, peerstore.PermanentAddrTTL) - c, err := s1.DialPeer(ctx, s2.LocalPeer()) + c, err := s1.DialPeer(context.Background(), s2.LocalPeer()) if err != nil { errs <- err connch <- nil @@ -602,7 +545,7 @@ func TestDialSimultaneousJoin(t *testing.T) { } s1.Peerstore().AddAddrs(s2.LocalPeer(), s2addrs[:1], peerstore.PermanentAddrTTL) - c, err := s1.DialPeer(ctx, s2.LocalPeer()) + c, err := s1.DialPeer(context.Background(), s2.LocalPeer()) if err != nil { errs <- err connch <- nil @@ -620,7 +563,7 @@ func TestDialSimultaneousJoin(t *testing.T) { // start a third dial to s2, this should get the existing connection from the successful dial go func() { - c, err := s1.DialPeer(ctx, s2.LocalPeer()) + c, err := s1.DialPeer(context.Background(), s2.LocalPeer()) if err != nil { errs <- err connch <- nil @@ -637,10 +580,7 @@ func TestDialSimultaneousJoin(t *testing.T) { // raise any errors from the previous goroutines for i := 0; i < 3; i++ { - err := <-errs - if err != nil { - t.Fatal(err) - } + require.NoError(t, <-errs) } if c2 != c3 { @@ -660,13 +600,12 @@ func TestDialSimultaneousJoin(t *testing.T) { } func TestDialSelf(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + t.Parallel() swarms := makeSwarms(t, 2) + defer closeSwarms(swarms) s1 := swarms[0] - defer s1.Close() - _, err := s1.DialPeer(ctx, s1.LocalPeer()) + _, err := s1.DialPeer(context.Background(), s1.LocalPeer()) require.ErrorIs(t, err, ErrDialToSelf, "expected error from self dial") } diff --git a/limiter_test.go b/limiter_test.go index 4a592b86..cd485861 100644 --- a/limiter_test.go +++ b/limiter_test.go @@ -18,16 +18,14 @@ import ( mafmt "github.com/multiformats/go-multiaddr-fmt" ) -func mustAddr(t *testing.T, s string) ma.Multiaddr { - a, err := ma.NewMultiaddr(s) - if err != nil { - t.Fatal(err) - } - return a +func setDialTimeout(t time.Duration) (reset func()) { + orig := transport.DialTimeout + transport.DialTimeout = t + return func() { transport.DialTimeout = orig } } -func addrWithPort(t *testing.T, p int) ma.Multiaddr { - return mustAddr(t, fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", p)) +func addrWithPort(p int) ma.Multiaddr { + return ma.StringCast(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", p)) } // in these tests I use addresses with tcp ports over a certain number to @@ -84,8 +82,8 @@ func TestLimiterBasicDials(t *testing.T) { l := newDialLimiterWithParams(hangDialFunc(hang), ConcurrentFdDials, 4) - bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)} - good := addrWithPort(t, 20) + bads := []ma.Multiaddr{addrWithPort(1), addrWithPort(2), addrWithPort(3), addrWithPort(4)} + good := addrWithPort(20) resch := make(chan dialResult) pid := peer.ID("testpeer") @@ -133,9 +131,9 @@ func TestFDLimiting(t *testing.T) { defer close(hang) l := newDialLimiterWithParams(hangDialFunc(hang), 16, 5) - bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)} + bads := []ma.Multiaddr{addrWithPort(1), addrWithPort(2), addrWithPort(3), addrWithPort(4)} pids := []peer.ID{"testpeer1", "testpeer2", "testpeer3", "testpeer4"} - goodTCP := addrWithPort(t, 20) + goodTCP := addrWithPort(20) ctx := context.Background() resch := make(chan dialResult) @@ -163,7 +161,7 @@ func TestFDLimiting(t *testing.T) { } pid5 := peer.ID("testpeer5") - utpaddr := mustAddr(t, "/ip4/127.0.0.1/udp/7777/utp") + utpaddr := ma.StringCast("/ip4/127.0.0.1/udp/7777/utp") // This should complete immediately since utp addresses arent blocked by fd rate limiting l.AddDialJob(&dialJob{ctx: ctx, peer: pid5, addr: utpaddr, resp: resch}) @@ -180,7 +178,7 @@ func TestFDLimiting(t *testing.T) { // A relay address with tcp transport will complete because we do not consume fds for dials // with relay addresses as the fd will be consumed when we actually dial the relay server. pid6 := test.RandPeerIDFatal(t) - relayAddr := mustAddr(t, fmt.Sprintf("/ip4/127.0.0.1/tcp/20/p2p-circuit/p2p/%s", pid6)) + relayAddr := ma.StringCast(fmt.Sprintf("/ip4/127.0.0.1/tcp/20/p2p-circuit/p2p/%s", pid6)) l.AddDialJob(&dialJob{ctx: ctx, peer: pid6, addr: relayAddr, resp: resch}) select { @@ -209,7 +207,7 @@ func TestTokenRedistribution(t *testing.T) { } l := newDialLimiterWithParams(df, 8, 4) - bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)} + bads := []ma.Multiaddr{addrWithPort(1), addrWithPort(2), addrWithPort(3), addrWithPort(4)} pids := []peer.ID{"testpeer1", "testpeer2"} ctx := context.Background() @@ -224,13 +222,11 @@ func TestTokenRedistribution(t *testing.T) { tryDialAddrs(ctx, l, pid, bads, resch) } - good := mustAddr(t, "/ip4/127.0.0.1/tcp/1001") - // add a good dial job for peer 1 l.AddDialJob(&dialJob{ ctx: ctx, peer: pids[1], - addr: good, + addr: ma.StringCast("/ip4/127.0.0.1/tcp/1001"), resp: resch, }) @@ -263,7 +259,7 @@ func TestTokenRedistribution(t *testing.T) { l.AddDialJob(&dialJob{ ctx: ctx, peer: pids[0], - addr: addrWithPort(t, 7), + addr: addrWithPort(7), resp: resch, }) @@ -304,10 +300,10 @@ func TestStressLimiter(t *testing.T) { var bads []ma.Multiaddr for i := 0; i < 100; i++ { - bads = append(bads, addrWithPort(t, i)) + bads = append(bads, addrWithPort(i)) } - addresses := append(bads, addrWithPort(t, 2000)) + addresses := append(bads, addrWithPort(2000)) success := make(chan struct{}) for i := 0; i < 20; i++ { @@ -345,6 +341,9 @@ func TestStressLimiter(t *testing.T) { } func TestFDLimitUnderflow(t *testing.T) { + reset := setDialTimeout(250 * time.Millisecond) + defer reset() + df := func(ctx context.Context, p peer.ID, addr ma.Multiaddr) (transport.CapableConn, error) { select { case <-ctx.Done(): @@ -358,7 +357,7 @@ func TestFDLimitUnderflow(t *testing.T) { var addrs []ma.Multiaddr for i := 0; i <= 1000; i++ { - addrs = append(addrs, addrWithPort(t, i)) + addrs = append(addrs, addrWithPort(i)) } wg := sync.WaitGroup{} diff --git a/simul_test.go b/simul_test.go index aa4eb590..0e7d6da7 100644 --- a/simul_test.go +++ b/simul_test.go @@ -56,7 +56,7 @@ func TestSimultOpenMany(t *testing.T) { addrs = 10 rounds = 5 } - SubtestSwarm(t, addrs, rounds) + subtestSwarm(t, addrs, rounds) } func TestSimultOpenFewStress(t *testing.T) { @@ -72,7 +72,7 @@ func TestSimultOpenFewStress(t *testing.T) { // rounds := 100 for i := 0; i < rounds; i++ { - SubtestSwarm(t, swarms, msgs) + subtestSwarm(t, swarms, msgs) <-time.After(10 * time.Millisecond) } } diff --git a/swarm_test.go b/swarm_test.go index 0dac49d1..0c5c341d 100644 --- a/swarm_test.go +++ b/swarm_test.go @@ -99,7 +99,7 @@ func connectSwarms(t *testing.T, ctx context.Context, swarms []*swarm.Swarm) { } } -func SubtestSwarm(t *testing.T, SwarmNum int, MsgNum int) { +func subtestSwarm(t *testing.T, SwarmNum int, MsgNum int) { swarms := makeSwarms(t, SwarmNum, OptDisableReuseport) // connect everyone @@ -216,29 +216,17 @@ func SubtestSwarm(t *testing.T, SwarmNum int, MsgNum int) { cancel() <-time.After(10 * time.Millisecond) } - - for _, s := range swarms { - s.Close() - } } func TestSwarm(t *testing.T) { - // t.Skip("skipping for another test") t.Parallel() - - // msgs := 1000 - msgs := 100 - swarms := 5 - SubtestSwarm(t, swarms, msgs) + subtestSwarm(t, 5, 100) } func TestBasicSwarm(t *testing.T) { // t.Skip("skipping for another test") t.Parallel() - - msgs := 1 - swarms := 2 - SubtestSwarm(t, swarms, msgs) + subtestSwarm(t, 2, 1) } func TestConnectionGating(t *testing.T) {