Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
speed up the dial tests
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Dec 19, 2021
1 parent a89449e commit cd80a07
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 118 deletions.
115 changes: 36 additions & 79 deletions dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (
"github.com/stretchr/testify/require"
)

func init() {
transport.DialTimeout = time.Second
// setDialTimeout swaps in a custom transport dial timeout and hands back
// a reset func that restores the previous value; tests should defer it.
func setDialTimeout(t time.Duration) (reset func()) {
	saved := transport.DialTimeout
	transport.DialTimeout = t
	reset = func() { transport.DialTimeout = saved }
	return reset
}

func closeSwarms(swarms []*Swarm) {
Expand Down Expand Up @@ -92,6 +94,7 @@ func TestSimultDials(t *testing.T) {

ctx := context.Background()
swarms := makeSwarms(t, 2, swarmt.OptDisableReuseport)
defer closeSwarms(swarms)

// connect everyone
{
Expand Down Expand Up @@ -133,10 +136,6 @@ func TestSimultDials(t *testing.T) {
if c10l > 2 {
t.Error("1->0 has", c10l)
}

for _, s := range swarms {
s.Close()
}
}

func newSilentPeer(t *testing.T) (peer.ID, ma.Multiaddr, net.Listener) {
Expand All @@ -159,7 +158,8 @@ func newSilentPeer(t *testing.T) (peer.ID, ma.Multiaddr, net.Listener) {
}

func TestDialWait(t *testing.T) {
t.Parallel()
reset := setDialTimeout(250 * time.Millisecond)
defer reset()

ctx := context.Background()
swarms := makeSwarms(t, 1)
Expand Down Expand Up @@ -194,7 +194,6 @@ func TestDialWait(t *testing.T) {
}

func TestDialBackoff(t *testing.T) {
// t.Skip("skipping for another test")
if ci.IsRunning() {
t.Skip("travis will never have fun with this test")
}
Expand All @@ -203,10 +202,9 @@ func TestDialBackoff(t *testing.T) {

ctx := context.Background()
swarms := makeSwarms(t, 2)
defer closeSwarms(swarms)
s1 := swarms[0]
s2 := swarms[1]
defer s1.Close()
defer s2.Close()

s2addrs, err := s2.InterfaceListenAddresses()
if err != nil {
Expand Down Expand Up @@ -405,15 +403,13 @@ func TestDialBackoff(t *testing.T) {
}

func TestDialBackoffClears(t *testing.T) {
// t.Skip("skipping for another test")
t.Parallel()
reset := setDialTimeout(250 * time.Millisecond)
defer reset()

ctx := context.Background()
swarms := makeSwarms(t, 2)
defer closeSwarms(swarms)
s1 := swarms[0]
s2 := swarms[1]
defer s1.Close()
defer s2.Close()

// use another address first, that accept and hang on conns
_, s2bad, s2l := newSilentPeer(t)
Expand All @@ -424,13 +420,8 @@ func TestDialBackoffClears(t *testing.T) {
s1.Peerstore().AddAddr(s2.LocalPeer(), s2bad, peerstore.PermanentAddrTTL)

before := time.Now()
c, err := s1.DialPeer(ctx, s2.LocalPeer())
if err == nil {
defer c.Close()
t.Fatal("dialing to broken addr worked...", err)
} else {
t.Log("correctly got error:", err)
}
_, err := s1.DialPeer(context.Background(), s2.LocalPeer())
require.Error(t, err, "dialing to broken addr worked...")
duration := time.Since(before)

if duration < transport.DialTimeout*DialAttempts {
Expand All @@ -439,39 +430,18 @@ func TestDialBackoffClears(t *testing.T) {
if duration > 2*transport.DialTimeout*DialAttempts {
t.Error("> 2*transport.DialTimeout * DialAttempts not being respected", duration, 2*transport.DialTimeout*DialAttempts)
}

if !s1.Backoff().Backoff(s2.LocalPeer(), s2bad) {
t.Error("s2 should now be on backoff")
} else {
t.Log("correctly added to backoff")
}
require.True(t, s1.Backoff().Backoff(s2.LocalPeer(), s2bad), "s2 should now be on backoff")

// phase 2 -- add the working address. dial should succeed.
ifaceAddrs1, err := swarms[1].InterfaceListenAddresses()
if err != nil {
t.Fatal(err)
}
ifaceAddrs1, err := s2.InterfaceListenAddresses()
require.NoError(t, err)
s1.Peerstore().AddAddrs(s2.LocalPeer(), ifaceAddrs1, peerstore.PermanentAddrTTL)

if c, err := s1.DialPeer(ctx, s2.LocalPeer()); err == nil {
c.Close()
t.Log("backoffs are per address, not peer")
}

time.Sleep(BackoffBase)

if c, err := s1.DialPeer(ctx, s2.LocalPeer()); err != nil {
t.Fatal(err)
} else {
c.Close()
t.Log("correctly connected")
}

if s1.Backoff().Backoff(s2.LocalPeer(), s2bad) {
t.Error("s2 should no longer be on backoff")
} else {
t.Log("correctly cleared backoff")
}
// backoffs are per address, not peer
c, err := s1.DialPeer(context.Background(), s2.LocalPeer())
require.NoError(t, err)
defer c.Close()
require.False(t, s1.Backoff().Backoff(s2.LocalPeer(), s2bad), "s2 should no longer be on backoff")
}

func TestDialPeerFailed(t *testing.T) {
Expand Down Expand Up @@ -515,28 +485,20 @@ func TestDialPeerFailed(t *testing.T) {
}

func TestDialExistingConnection(t *testing.T) {
ctx := context.Background()

swarms := makeSwarms(t, 2)
defer closeSwarms(swarms)
s1 := swarms[0]
s2 := swarms[1]

s1.Peerstore().AddAddrs(s2.LocalPeer(), s2.ListenAddresses(), peerstore.PermanentAddrTTL)

c1, err := s1.DialPeer(ctx, s2.LocalPeer())
if err != nil {
t.Fatal(err)
}
c1, err := s1.DialPeer(context.Background(), s2.LocalPeer())
require.NoError(t, err)

c2, err := s1.DialPeer(ctx, s2.LocalPeer())
if err != nil {
t.Fatal(err)
}
c2, err := s1.DialPeer(context.Background(), s2.LocalPeer())
require.NoError(t, err)

if c1 != c2 {
t.Fatal("expecting the same connection from both dials")
}
require.Equal(t, c1, c2, "expecting the same connection from both dials")
}

func newSilentListener(t *testing.T) ([]ma.Multiaddr, net.Listener) {
Expand All @@ -558,14 +520,13 @@ func newSilentListener(t *testing.T) ([]ma.Multiaddr, net.Listener) {
}

func TestDialSimultaneousJoin(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
reset := setDialTimeout(250 * time.Millisecond)
defer reset()

swarms := makeSwarms(t, 2)
defer closeSwarms(swarms)
s1 := swarms[0]
s2 := swarms[1]
defer s1.Close()
defer s2.Close()

s2silentAddrs, s2silentListener := newSilentListener(t)
go acceptAndHang(s2silentListener)
Expand All @@ -577,7 +538,7 @@ func TestDialSimultaneousJoin(t *testing.T) {
go func() {
s1.Peerstore().AddAddrs(s2.LocalPeer(), s2silentAddrs, peerstore.PermanentAddrTTL)

c, err := s1.DialPeer(ctx, s2.LocalPeer())
c, err := s1.DialPeer(context.Background(), s2.LocalPeer())
if err != nil {
errs <- err
connch <- nil
Expand All @@ -602,7 +563,7 @@ func TestDialSimultaneousJoin(t *testing.T) {
}
s1.Peerstore().AddAddrs(s2.LocalPeer(), s2addrs[:1], peerstore.PermanentAddrTTL)

c, err := s1.DialPeer(ctx, s2.LocalPeer())
c, err := s1.DialPeer(context.Background(), s2.LocalPeer())
if err != nil {
errs <- err
connch <- nil
Expand All @@ -620,7 +581,7 @@ func TestDialSimultaneousJoin(t *testing.T) {

// start a third dial to s2, this should get the existing connection from the successful dial
go func() {
c, err := s1.DialPeer(ctx, s2.LocalPeer())
c, err := s1.DialPeer(context.Background(), s2.LocalPeer())
if err != nil {
errs <- err
connch <- nil
Expand All @@ -637,10 +598,7 @@ func TestDialSimultaneousJoin(t *testing.T) {

// raise any errors from the previous goroutines
for i := 0; i < 3; i++ {
err := <-errs
if err != nil {
t.Fatal(err)
}
require.NoError(t, <-errs)
}

if c2 != c3 {
Expand All @@ -660,13 +618,12 @@ func TestDialSimultaneousJoin(t *testing.T) {
}

func TestDialSelf(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
t.Parallel()

swarms := makeSwarms(t, 2)
defer closeSwarms(swarms)
s1 := swarms[0]
defer s1.Close()

_, err := s1.DialPeer(ctx, s1.LocalPeer())
_, err := s1.DialPeer(context.Background(), s1.LocalPeer())
require.ErrorIs(t, err, ErrDialToSelf, "expected error from self dial")
}
43 changes: 21 additions & 22 deletions limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@ import (
mafmt "github.com/multiformats/go-multiaddr-fmt"
)

func mustAddr(t *testing.T, s string) ma.Multiaddr {
a, err := ma.NewMultiaddr(s)
if err != nil {
t.Fatal(err)
}
return a
// setDialTimeout overrides transport.DialTimeout with t and returns a reset
// function that restores the original timeout; callers should defer reset().
func setDialTimeout(t time.Duration) (reset func()) {
orig := transport.DialTimeout
transport.DialTimeout = t
return func() { transport.DialTimeout = orig }
}

func addrWithPort(t *testing.T, p int) ma.Multiaddr {
return mustAddr(t, fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", p))
// addrWithPort builds a loopback TCP multiaddr ("/ip4/127.0.0.1/tcp/<p>")
// for the given port. ma.StringCast panics on malformed input, which cannot
// happen here since the format string is fixed.
func addrWithPort(p int) ma.Multiaddr {
	s := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", p)
	return ma.StringCast(s)
}

// in these tests I use addresses with tcp ports over a certain number to
Expand Down Expand Up @@ -84,8 +82,8 @@ func TestLimiterBasicDials(t *testing.T) {

l := newDialLimiterWithParams(hangDialFunc(hang), ConcurrentFdDials, 4)

bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
good := addrWithPort(t, 20)
bads := []ma.Multiaddr{addrWithPort(1), addrWithPort(2), addrWithPort(3), addrWithPort(4)}
good := addrWithPort(20)

resch := make(chan dialResult)
pid := peer.ID("testpeer")
Expand Down Expand Up @@ -133,9 +131,9 @@ func TestFDLimiting(t *testing.T) {
defer close(hang)
l := newDialLimiterWithParams(hangDialFunc(hang), 16, 5)

bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
bads := []ma.Multiaddr{addrWithPort(1), addrWithPort(2), addrWithPort(3), addrWithPort(4)}
pids := []peer.ID{"testpeer1", "testpeer2", "testpeer3", "testpeer4"}
goodTCP := addrWithPort(t, 20)
goodTCP := addrWithPort(20)

ctx := context.Background()
resch := make(chan dialResult)
Expand Down Expand Up @@ -163,7 +161,7 @@ func TestFDLimiting(t *testing.T) {
}

pid5 := peer.ID("testpeer5")
utpaddr := mustAddr(t, "/ip4/127.0.0.1/udp/7777/utp")
utpaddr := ma.StringCast("/ip4/127.0.0.1/udp/7777/utp")

// This should complete immediately since utp addresses arent blocked by fd rate limiting
l.AddDialJob(&dialJob{ctx: ctx, peer: pid5, addr: utpaddr, resp: resch})
Expand All @@ -180,7 +178,7 @@ func TestFDLimiting(t *testing.T) {
// A relay address with tcp transport will complete because we do not consume fds for dials
// with relay addresses as the fd will be consumed when we actually dial the relay server.
pid6 := test.RandPeerIDFatal(t)
relayAddr := mustAddr(t, fmt.Sprintf("/ip4/127.0.0.1/tcp/20/p2p-circuit/p2p/%s", pid6))
relayAddr := ma.StringCast(fmt.Sprintf("/ip4/127.0.0.1/tcp/20/p2p-circuit/p2p/%s", pid6))
l.AddDialJob(&dialJob{ctx: ctx, peer: pid6, addr: relayAddr, resp: resch})

select {
Expand Down Expand Up @@ -209,7 +207,7 @@ func TestTokenRedistribution(t *testing.T) {
}
l := newDialLimiterWithParams(df, 8, 4)

bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
bads := []ma.Multiaddr{addrWithPort(1), addrWithPort(2), addrWithPort(3), addrWithPort(4)}
pids := []peer.ID{"testpeer1", "testpeer2"}

ctx := context.Background()
Expand All @@ -224,13 +222,11 @@ func TestTokenRedistribution(t *testing.T) {
tryDialAddrs(ctx, l, pid, bads, resch)
}

good := mustAddr(t, "/ip4/127.0.0.1/tcp/1001")

// add a good dial job for peer 1
l.AddDialJob(&dialJob{
ctx: ctx,
peer: pids[1],
addr: good,
addr: ma.StringCast("/ip4/127.0.0.1/tcp/1001"),
resp: resch,
})

Expand Down Expand Up @@ -263,7 +259,7 @@ func TestTokenRedistribution(t *testing.T) {
l.AddDialJob(&dialJob{
ctx: ctx,
peer: pids[0],
addr: addrWithPort(t, 7),
addr: addrWithPort(7),
resp: resch,
})

Expand Down Expand Up @@ -304,10 +300,10 @@ func TestStressLimiter(t *testing.T) {

var bads []ma.Multiaddr
for i := 0; i < 100; i++ {
bads = append(bads, addrWithPort(t, i))
bads = append(bads, addrWithPort(i))
}

addresses := append(bads, addrWithPort(t, 2000))
addresses := append(bads, addrWithPort(2000))
success := make(chan struct{})

for i := 0; i < 20; i++ {
Expand Down Expand Up @@ -345,6 +341,9 @@ func TestStressLimiter(t *testing.T) {
}

func TestFDLimitUnderflow(t *testing.T) {
reset := setDialTimeout(250 * time.Millisecond)
defer reset()

df := func(ctx context.Context, p peer.ID, addr ma.Multiaddr) (transport.CapableConn, error) {
select {
case <-ctx.Done():
Expand All @@ -358,7 +357,7 @@ func TestFDLimitUnderflow(t *testing.T) {

var addrs []ma.Multiaddr
for i := 0; i <= 1000; i++ {
addrs = append(addrs, addrWithPort(t, i))
addrs = append(addrs, addrWithPort(i))
}

wg := sync.WaitGroup{}
Expand Down
4 changes: 2 additions & 2 deletions simul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestSimultOpenMany(t *testing.T) {
addrs = 10
rounds = 5
}
SubtestSwarm(t, addrs, rounds)
subtestSwarm(t, addrs, rounds)
}

func TestSimultOpenFewStress(t *testing.T) {
Expand All @@ -72,7 +72,7 @@ func TestSimultOpenFewStress(t *testing.T) {
// rounds := 100

for i := 0; i < rounds; i++ {
SubtestSwarm(t, swarms, msgs)
subtestSwarm(t, swarms, msgs)
<-time.After(10 * time.Millisecond)
}
}
Loading

0 comments on commit cd80a07

Please sign in to comment.