Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Fix flaky DontHaveTimeoutManager tests #495

Merged
merged 6 commits into from
Jun 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 33 additions & 20 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ func WithScoreLedger(scoreLedger deciface.ScoreLedger) Option {
}
}

// SetSimulateDontHavesOnTimeout returns an Option that sets the
// simulateDontHavesOnTimeout field of the Bitswap instance to send.
//
// When the field is true, the DONT_HAVE timeout callback created in New
// passes the timed-out CIDs to the session manager as if a message carrying
// DONT_HAVEs had arrived from the peer. When it is false, that callback does
// nothing. New initializes the field to true before options are applied.
func SetSimulateDontHavesOnTimeout(send bool) Option {
return func(bs *Bitswap) {
bs.simulateDontHavesOnTimeout = send
}
}

// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate. Runs until context is cancelled or bitswap.Close is called.
Expand Down Expand Up @@ -149,9 +155,12 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
// has an old version of Bitswap that doesn't support DONT_HAVE messages,
// or when no response is received within a timeout.
var sm *bssm.SessionManager
var bs *Bitswap
onDontHaveTimeout := func(p peer.ID, dontHaves []cid.Cid) {
// Simulate a message arriving with DONT_HAVEs
sm.ReceiveFrom(ctx, p, nil, nil, dontHaves)
if bs.simulateDontHavesOnTimeout {
sm.ReceiveFrom(ctx, p, nil, nil, dontHaves)
}
}
peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue {
return bsmq.New(ctx, p, network, onDontHaveTimeout)
Expand Down Expand Up @@ -182,25 +191,26 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
notif := notifications.New()
sm = bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self())

bs := &Bitswap{
blockstore: bstore,
network: network,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
pm: pm,
pqm: pqm,
sm: sm,
sim: sim,
notif: notif,
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
sentHistogram: sentHistogram,
provideEnabled: true,
provSearchDelay: defaultProvSearchDelay,
rebroadcastDelay: delay.Fixed(time.Minute),
engineBstoreWorkerCount: defaulEngineBlockstoreWorkerCount,
bs = &Bitswap{
blockstore: bstore,
network: network,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
pm: pm,
pqm: pqm,
sm: sm,
sim: sim,
notif: notif,
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
sentHistogram: sentHistogram,
provideEnabled: true,
provSearchDelay: defaultProvSearchDelay,
rebroadcastDelay: delay.Fixed(time.Minute),
engineBstoreWorkerCount: defaulEngineBlockstoreWorkerCount,
simulateDontHavesOnTimeout: true,
}

// apply functional options before starting and running bitswap
Expand Down Expand Up @@ -293,6 +303,9 @@ type Bitswap struct {

// the score ledger used by the decision engine
engineScoreLedger deciface.ScoreLedger

// whether a request timeout should be reported to the session manager as if DONT_HAVE messages had been received from the peer
simulateDontHavesOnTimeout bool
}

type counters struct {
Expand Down
2 changes: 1 addition & 1 deletion bitswap_with_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestSessionBetweenPeers(t *testing.T) {
defer cancel()

vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(time.Millisecond))
ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil)
ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{bitswap.SetSimulateDontHavesOnTimeout(false)})
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module github.com/ipfs/go-bitswap

require (
github.com/benbjohnson/clock v1.1.0
github.com/cskr/pubsub v1.0.2
github.com/gogo/protobuf v1.3.1
github.com/google/uuid v1.1.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETF
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI=
github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI=
Expand Down
37 changes: 21 additions & 16 deletions internal/decision/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"
"time"

"github.com/benbjohnson/clock"
"github.com/ipfs/go-bitswap/internal/testutil"
message "github.com/ipfs/go-bitswap/message"
pb "github.com/ipfs/go-bitswap/message/pb"
Expand Down Expand Up @@ -91,13 +92,13 @@ type engineSet struct {
}

func newTestEngine(ctx context.Context, idStr string) engineSet {
return newTestEngineWithSampling(ctx, idStr, shortTerm, nil)
return newTestEngineWithSampling(ctx, idStr, shortTerm, nil, clock.New())
}

func newTestEngineWithSampling(ctx context.Context, idStr string, peerSampleInterval time.Duration, sampleCh chan struct{}) engineSet {
func newTestEngineWithSampling(ctx context.Context, idStr string, peerSampleInterval time.Duration, sampleCh chan struct{}, clock clock.Clock) engineSet {
fpt := &fakePeerTagger{}
bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
e := newEngine(bs, 4, fpt, "localhost", 0, NewTestScoreLedger(peerSampleInterval, sampleCh))
e := newEngine(bs, 4, fpt, "localhost", 0, NewTestScoreLedger(peerSampleInterval, sampleCh, clock))
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
return engineSet{
Peer: peer.ID(idStr),
Expand Down Expand Up @@ -184,7 +185,7 @@ func peerIsPartner(p peer.ID, e *Engine) bool {

func TestOutboxClosedWhenEngineClosed(t *testing.T) {
t.SkipNow() // TODO implement *Engine.Close
e := newEngine(blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
var wg sync.WaitGroup
wg.Add(1)
Expand Down Expand Up @@ -512,7 +513,7 @@ func TestPartnerWantHaveWantBlockNonActive(t *testing.T) {
testCases = onlyTestCases
}

e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
for i, testCase := range testCases {
t.Logf("Test case %d:", i)
Expand Down Expand Up @@ -668,7 +669,7 @@ func TestPartnerWantHaveWantBlockActive(t *testing.T) {
testCases = onlyTestCases
}

e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))

var next envChan
Expand Down Expand Up @@ -853,7 +854,7 @@ func TestPartnerWantsThenCancels(t *testing.T) {
ctx := context.Background()
for i := 0; i < numRounds; i++ {
expected := make([][]string, 0, len(testcases))
e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
for _, testcase := range testcases {
set := testcase[0]
Expand All @@ -878,7 +879,7 @@ func TestSendReceivedBlocksToPeersThatWantThem(t *testing.T) {
partner := libp2ptest.RandPeerIDFatal(t)
otherPeer := libp2ptest.RandPeerIDFatal(t)

e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))

blks := testutil.GenerateBlocksOfSize(4, 8*1024)
Expand Down Expand Up @@ -922,7 +923,7 @@ func TestSendDontHave(t *testing.T) {
partner := libp2ptest.RandPeerIDFatal(t)
otherPeer := libp2ptest.RandPeerIDFatal(t)

e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))

blks := testutil.GenerateBlocksOfSize(4, 8*1024)
Expand Down Expand Up @@ -986,7 +987,7 @@ func TestWantlistForPeer(t *testing.T) {
partner := libp2ptest.RandPeerIDFatal(t)
otherPeer := libp2ptest.RandPeerIDFatal(t)

e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))

blks := testutil.GenerateBlocksOfSize(4, 8*1024)
Expand Down Expand Up @@ -1044,13 +1045,15 @@ func TestTaggingPeers(t *testing.T) {
}

func TestTaggingUseful(t *testing.T) {
peerSampleInterval := 1 * time.Millisecond
peerSampleIntervalHalf := 10 * time.Millisecond

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

sampleCh := make(chan struct{})
me := newTestEngineWithSampling(ctx, "engine", peerSampleInterval, sampleCh)
mockClock := clock.NewMock()
me := newTestEngineWithSampling(ctx, "engine", peerSampleIntervalHalf*2, sampleCh, mockClock)
mockClock.Add(1 * time.Millisecond)
friend := peer.ID("friend")

block := blocks.NewBlock([]byte("foobar"))
Expand All @@ -1061,18 +1064,18 @@ func TestTaggingUseful(t *testing.T) {
if untagged := me.PeerTagger.count(me.Engine.tagUseful); untagged != 0 {
t.Fatalf("%d peers should be untagged but weren't", untagged)
}

mockClock.Add(peerSampleIntervalHalf)
me.Engine.MessageSent(friend, msg)

for j := 0; j < 2; j++ {
<-sampleCh
}
mockClock.Add(peerSampleIntervalHalf)
<-sampleCh

if tagged := me.PeerTagger.count(me.Engine.tagUseful); tagged != 1 {
t.Fatalf("1 peer should be tagged, but %d were", tagged)
}

for j := 0; j < longTermRatio; j++ {
mockClock.Add(peerSampleIntervalHalf * 2)
<-sampleCh
}
}
Expand All @@ -1082,6 +1085,7 @@ func TestTaggingUseful(t *testing.T) {
}

for j := 0; j < longTermRatio; j++ {
mockClock.Add(peerSampleIntervalHalf * 2)
<-sampleCh
}

Expand All @@ -1090,6 +1094,7 @@ func TestTaggingUseful(t *testing.T) {
}

for j := 0; j < longTermRatio; j++ {
mockClock.Add(peerSampleIntervalHalf * 2)
<-sampleCh
}

Expand Down
21 changes: 14 additions & 7 deletions internal/decision/scoreledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"sync"
"time"

"github.com/benbjohnson/clock"
peer "github.com/libp2p/go-libp2p-core/peer"
)

Expand Down Expand Up @@ -55,6 +56,8 @@ type scoreledger struct {

// the record lock
lock sync.RWMutex

clock clock.Clock
}

// Receipt is a summary of the ledger for a given peer
Expand All @@ -73,7 +76,7 @@ func (l *scoreledger) AddToSentBytes(n int) {
l.lock.Lock()
defer l.lock.Unlock()
l.exchangeCount++
l.lastExchange = time.Now()
l.lastExchange = l.clock.Now()
l.bytesSent += uint64(n)
}

Expand All @@ -82,7 +85,7 @@ func (l *scoreledger) AddToReceivedBytes(n int) {
l.lock.Lock()
defer l.lock.Unlock()
l.exchangeCount++
l.lastExchange = time.Now()
l.lastExchange = l.clock.Now()
l.bytesRecv += uint64(n)
}

Expand Down Expand Up @@ -114,6 +117,7 @@ type DefaultScoreLedger struct {
peerSampleInterval time.Duration
// used by the tests to detect when a sample is taken
sampleCh chan struct{}
clock clock.Clock
}

// scoreWorker keeps track of how "useful" our peers are, updating scores in the
Expand All @@ -134,7 +138,7 @@ type DefaultScoreLedger struct {
// adjust it ±25% based on our debt ratio. Peers that have historically been
// more useful to us than we are to them get the highest score.
func (dsl *DefaultScoreLedger) scoreWorker() {
ticker := time.NewTicker(dsl.peerSampleInterval)
ticker := dsl.clock.Ticker(dsl.peerSampleInterval)
defer ticker.Stop()

type update struct {
Expand Down Expand Up @@ -236,9 +240,10 @@ func (dsl *DefaultScoreLedger) find(p peer.ID) *scoreledger {
}

// newScoreLedger returns a new scoreledger for the partner peer p.
func newScoreLedger(p peer.ID) *scoreledger {
func newScoreLedger(p peer.ID, clock clock.Clock) *scoreledger {
return &scoreledger{
partner: p,
clock: clock,
}
}

Expand All @@ -255,7 +260,7 @@ func (dsl *DefaultScoreLedger) findOrCreate(p peer.ID) *scoreledger {
defer dsl.lock.Unlock()
l, ok := dsl.ledgerMap[p]
if !ok {
l = newScoreLedger(p)
l = newScoreLedger(p, dsl.clock)
dsl.ledgerMap[p] = l
}
return l
Expand Down Expand Up @@ -315,7 +320,7 @@ func (dsl *DefaultScoreLedger) PeerConnected(p peer.ID) {
defer dsl.lock.Unlock()
_, ok := dsl.ledgerMap[p]
if !ok {
dsl.ledgerMap[p] = newScoreLedger(p)
dsl.ledgerMap[p] = newScoreLedger(p, dsl.clock)
}
}

Expand All @@ -333,14 +338,16 @@ func NewDefaultScoreLedger() *DefaultScoreLedger {
ledgerMap: make(map[peer.ID]*scoreledger),
closing: make(chan struct{}),
peerSampleInterval: shortTerm,
clock: clock.New(),
}
}

// NewTestScoreLedger creates a new instance of the default score ledger with
// testing parameters.
func NewTestScoreLedger(peerSampleInterval time.Duration, sampleCh chan struct{}) *DefaultScoreLedger {
func NewTestScoreLedger(peerSampleInterval time.Duration, sampleCh chan struct{}, clock clock.Clock) *DefaultScoreLedger {
dsl := NewDefaultScoreLedger()
dsl.peerSampleInterval = peerSampleInterval
dsl.sampleCh = sampleCh
dsl.clock = clock
return dsl
}
Loading