From 426203b4e351b591ae48702cc4a207c366b0bf76 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 11 Jan 2022 16:00:03 +0200 Subject: [PATCH] basic rcmgr integration tests --- itest/echo.go | 31 ++++-- itest/echo_test.go | 49 ++++------ itest/rcmgr_test.go | 233 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 275 insertions(+), 38 deletions(-) create mode 100644 itest/rcmgr_test.go diff --git a/itest/echo.go b/itest/echo.go index f64fbe316d..6ea0b883c4 100644 --- a/itest/echo.go +++ b/itest/echo.go @@ -26,7 +26,7 @@ var ( type Echo struct { Host host.Host - WaitBeforeRead, WaitBeforeWrite func() error + BeforeReserve, BeforeRead, BeforeWrite, BeforeDone func() error mx sync.Mutex status EchoStatus @@ -60,6 +60,15 @@ func (e *Echo) handleStream(s network.Stream) { e.status.StreamsIn++ e.mx.Unlock() + if e.BeforeReserve != nil { + if err := e.BeforeReserve(); err != nil { + echoLog.Debugf("error syncing before reserve: %s", err) + + s.Reset() + return + } + } + if err := s.Scope().SetService(EchoService); err != nil { echoLog.Debugf("error attaching stream to echo service: %s", err) @@ -82,9 +91,9 @@ func (e *Echo) handleStream(s network.Stream) { return } - if e.WaitBeforeRead != nil { - if err := e.WaitBeforeRead(); err != nil { - echoLog.Debugf("error waiting before read: %s", err) + if e.BeforeRead != nil { + if err := e.BeforeRead(); err != nil { + echoLog.Debugf("error syncing before read: %s", err) s.Reset() return @@ -116,9 +125,9 @@ func (e *Echo) handleStream(s network.Stream) { e.status.EchosIn++ e.mx.Unlock() - if e.WaitBeforeWrite != nil { - if err := e.WaitBeforeWrite(); err != nil { - echoLog.Debugf("error waiting before write: %s", err) + if e.BeforeWrite != nil { + if err := e.BeforeWrite(); err != nil { + echoLog.Debugf("error syncing before write: %s", err) s.Reset() return @@ -143,6 +152,14 @@ func (e *Echo) handleStream(s network.Stream) { e.mx.Unlock() s.CloseWrite() + + if e.BeforeDone != nil { + if err := e.BeforeDone(); err != nil { + echoLog.Debugf("error syncing before done: %s", err) + + s.Reset() + } + } } func (e *Echo) Echo(p peer.ID, what string) error { diff --git a/itest/echo_test.go b/itest/echo_test.go index 9eb6f5f151..52d92067ed 100644 --- a/itest/echo_test.go +++ b/itest/echo_test.go @@ -7,12 +7,19 @@ import ( "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" + + "github.com/stretchr/testify/require" ) -func createEchos(t *testing.T, count int, opts ...libp2p.Option) []*Echo { +func createEchos(t *testing.T, count int, makeOpts ...func() libp2p.Option) []*Echo { result := make([]*Echo, 0, count) for i := 0; i < count; i++ { + opts := make([]libp2p.Option, 0, len(makeOpts)) + for _, makeOpt := range makeOpts { + opts = append(opts, makeOpt()) + } + h, err := libp2p.New(opts...) if err != nil { t.Fatal(err) @@ -35,46 +42,26 @@ func createEchos(t *testing.T, count int, opts ...libp2p.Option) []*Echo { return result } +func closeEchos(echos []*Echo) { + for _, e := range echos { + e.Host.Close() + } +} + func checkEchoStatus(t *testing.T, e *Echo, expected EchoStatus) { t.Helper() - - status := e.Status() - - if status.StreamsIn != expected.StreamsIn { - t.Fatalf("expected %d streams in, got %d", expected.StreamsIn, status.StreamsIn) - } - if status.EchosIn != expected.EchosIn { - t.Fatalf("expected %d echos in, got %d", expected.EchosIn, status.EchosIn) - } - if status.EchosOut != expected.EchosOut { - t.Fatalf("expected %d echos out, got %d", expected.EchosOut, status.EchosOut) - } - if status.IOErrors != expected.IOErrors { - t.Fatalf("expected %d I/O errors, got %d", expected.IOErrors, status.IOErrors) - } - if status.ResourceServiceErrors != expected.ResourceServiceErrors { - t.Fatalf("expected %d service resource errors, got %d", expected.ResourceServiceErrors, status.ResourceServiceErrors) - } - if status.ResourceReservationErrors != expected.ResourceReservationErrors { - t.Fatalf("expected %d reservation resource errors, got %d", expected.ResourceReservationErrors, status.ResourceReservationErrors) - } + require.Equal(t, expected, e.Status()) } func TestEcho(t *testing.T) { echos := createEchos(t, 2) + defer closeEchos(echos) - err := echos[0].Host.Connect(context.TODO(), peer.AddrInfo{ID: echos[1].Host.ID()}) - if err != nil { + if err := echos[0].Host.Connect(context.TODO(), peer.AddrInfo{ID: echos[1].Host.ID()}); err != nil { t.Fatal(err) } - defer func() { - for _, e := range echos { - e.Host.Close() - } - }() - - if err = echos[0].Echo(echos[1].Host.ID(), "hello libp2p"); err != nil { + if err := echos[0].Echo(echos[1].Host.ID(), "hello libp2p"); err != nil { t.Fatal(err) } diff --git a/itest/rcmgr_test.go b/itest/rcmgr_test.go new file mode 100644 index 0000000000..b7ef0e9e83 --- /dev/null +++ b/itest/rcmgr_test.go @@ -0,0 +1,233 @@ +package itest + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + rcmgr "github.com/libp2p/go-libp2p-resource-manager" + + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-core/peer" +) + +func makeRcmgrOption(t *testing.T, limiter *rcmgr.BasicLimiter) func() libp2p.Option { + return func() libp2p.Option { + mgr, err := rcmgr.NewResourceManager(limiter) + if err != nil { + t.Fatal(err) + } + return libp2p.ResourceManager(mgr) + } +} + +func TestResourceManagerConnInbound(t *testing.T) { + // this test checks that we can not exceed the inbound conn limit at system level + // we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns + limiter := rcmgr.NewFixedLimiter(1 << 30) + limiter.SystemLimits = limiter.SystemLimits.WithConnLimit(3, 1024, 1024) + limiter.DefaultPeerLimits = limiter.DefaultPeerLimits.WithConnLimit(1, 16, 16) + + echos := createEchos(t, 5, makeRcmgrOption(t, limiter)) + defer closeEchos(echos) + + for i := 1; i < 4; i++ { + err := echos[i].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()}) + if err != nil { + t.Fatal(err) + } + time.Sleep(10 * time.Millisecond) + } + + for i := 1; i < 4; i++ { + count := len(echos[i].Host.Network().ConnsToPeer(echos[0].Host.ID())) + if count != 1 { + t.Fatalf("expected %d connections to peer, got %d", 1, count) + } + } + + err := echos[4].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()}) + if err == nil { + t.Fatal("expected ResourceManager to block incoming connection") + } +} + +func TestResourceManagerConnOutbound(t *testing.T) { + // this test checks that we can not exceed the inbound conn limit at system level + // we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns + limiter := rcmgr.NewFixedLimiter(1 << 30) + limiter.SystemLimits = limiter.SystemLimits.WithConnLimit(1024, 3, 1024) + limiter.DefaultPeerLimits = limiter.DefaultPeerLimits.WithConnLimit(16, 1, 16) + echos := createEchos(t, 5, makeRcmgrOption(t, limiter)) + defer closeEchos(echos) + + for i := 1; i < 4; i++ { + err := echos[0].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[i].Host.ID()}) + if err != nil { + t.Fatal(err) + } + time.Sleep(10 * time.Millisecond) + } + + for i := 1; i < 4; i++ { + count := len(echos[i].Host.Network().ConnsToPeer(echos[0].Host.ID())) + if count != 1 { + t.Fatalf("expected %d connections to peer, got %d", 1, count) + } + } + + err := echos[0].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[4].Host.ID()}) + if err == nil { + t.Fatal("expected ResourceManager to block incoming connection") + } +} + +func TestResourceManagerServiceInbound(t *testing.T) { + // this test checks that we can not exceed the inbound stream limit at service level + // we specify: 3 streams for the service, and we try to create 4 streams + limiter := rcmgr.NewFixedLimiter(1 << 30) + limiter.DefaultServiceLimits = limiter.DefaultServiceLimits.WithStreamLimit(3, 1024, 1024) + echos := createEchos(t, 5, makeRcmgrOption(t, limiter)) + defer closeEchos(echos) + + ready := new(chan struct{}) + echos[0].BeforeDone = waitForChannel(ready, time.Minute) + + for i := 1; i < 5; i++ { + err := echos[i].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()}) + if err != nil { + t.Fatal(err) + } + time.Sleep(10 * time.Millisecond) + } + + *ready = make(chan struct{}) + + var once sync.Once + var wg sync.WaitGroup + for i := 1; i < 5; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + + err := echos[i].Echo(echos[0].Host.ID(), "hello libp2p") + if err != nil { + t.Log(err) + once.Do(func() { + close(*ready) + }) + } + }(i) + } + wg.Wait() + + checkEchoStatus(t, echos[0], EchoStatus{ + StreamsIn: 4, + EchosIn: 3, + EchosOut: 3, + ResourceServiceErrors: 1, + }) +} + +func TestResourceManagerServicePeerInbound(t *testing.T) { + // this test checks that we cannot exceed the per peer inbound stream limit at service level + // we specify: 2 streams per peer for echo, and we try to create 3 streams + limiter := rcmgr.NewFixedLimiter(1 << 30) + limiter.ServicePeerLimits = map[string]rcmgr.Limit{ + EchoService: limiter.DefaultPeerLimits.WithStreamLimit(2, 1024, 1024), + } + echos := createEchos(t, 5, makeRcmgrOption(t, limiter)) + defer closeEchos(echos) + + count := new(int32) + ready := new(chan struct{}) + echos[0].BeforeDone = waitForBarrier(count, ready, time.Minute) + + for i := 1; i < 5; i++ { + err := echos[i].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()}) + if err != nil { + t.Fatal(err) + } + time.Sleep(10 * time.Millisecond) + } + + *count = 4 + *ready = make(chan struct{}) + + var wg sync.WaitGroup + for i := 1; i < 5; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + + err := echos[i].Echo(echos[0].Host.ID(), "hello libp2p") + if err != nil { + t.Log(err) + } + }(i) + } + wg.Wait() + + checkEchoStatus(t, echos[0], EchoStatus{ + StreamsIn: 4, + EchosIn: 4, + EchosOut: 4, + ResourceServiceErrors: 0, + }) + + *ready = make(chan struct{}) + echos[0].BeforeDone = waitForChannel(ready, time.Minute) + + var once sync.Once + for i := 0; i < 3; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + err := echos[2].Echo(echos[0].Host.ID(), "hello libp2p") + if err != nil { + t.Log(err) + once.Do(func() { + close(*ready) + }) + } + }() + } + wg.Wait() + + checkEchoStatus(t, echos[0], EchoStatus{ + StreamsIn: 7, + EchosIn: 6, + EchosOut: 6, + ResourceServiceErrors: 1, + }) +} + +func waitForBarrier(count *int32, ready *chan struct{}, timeout time.Duration) func() error { + return func() error { + if atomic.AddInt32(count, -1) == 0 { + close(*ready) + } + + select { + case <-*ready: + return nil + case <-time.After(timeout): + return fmt.Errorf("timeout") + } + } +} + +func waitForChannel(ready *chan struct{}, timeout time.Duration) func() error { + return func() error { + select { + case <-*ready: + return nil + case <-time.After(timeout): + return fmt.Errorf("timeout") + } + } +}