Skip to content

Commit

Permalink
feat: configurable engine blockstore worker count (ipfs#449)
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Nov 18, 2020
1 parent 95cb1a0 commit 47b99b1
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 42 deletions.
51 changes: 37 additions & 14 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package bitswap
import (
"context"
"errors"
"fmt"

"sync"
"time"
Expand Down Expand Up @@ -45,6 +46,9 @@ const (
// these requests take at _least_ two minutes at the moment.
provideTimeout = time.Minute * 3
defaultProvSearchDelay = time.Second

// Number of concurrent workers in decision engine that process requests to the blockstore
defaulEngineBlockstoreWorkerCount = 128
)

var (
Expand Down Expand Up @@ -85,6 +89,17 @@ func RebroadcastDelay(newRebroadcastDelay delay.D) Option {
}
}

// EngineBlockstoreWorkerCount sets the number of worker threads used for
// blockstore operations in the decision engine
func EngineBlockstoreWorkerCount(count int) Option {
if count <= 0 {
panic(fmt.Sprintf("Engine blockstore worker count is %d but must be > 0", count))
}
return func(bs *Bitswap) {
bs.engineBstoreWorkerCount = count
}
}

// SetSendDontHaves indicates what to do when the engine receives a want-block
// for a block that is not in the blockstore. Either
// - Send a DONT_HAVE message
Expand All @@ -99,7 +114,7 @@ func SetSendDontHaves(send bool) Option {
// Configures the engine to use the given score decision logic.
func WithScoreLedger(scoreLedger deciface.ScoreLedger) Option {
return func(bs *Bitswap) {
bs.engine.UseScoreLedger(scoreLedger)
bs.engineScoreLedger = scoreLedger
}
}

Expand Down Expand Up @@ -166,40 +181,42 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
}
notif := notifications.New()
sm = bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self())
engine := decision.NewEngine(ctx, bstore, network.ConnectionManager(), network.Self())

bs := &Bitswap{
blockstore: bstore,
engine: engine,
network: network,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
pm: pm,
pqm: pqm,
sm: sm,
sim: sim,
notif: notif,
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
sentHistogram: sentHistogram,
provideEnabled: true,
provSearchDelay: defaultProvSearchDelay,
rebroadcastDelay: delay.Fixed(time.Minute),
sm: sm,
sim: sim,
notif: notif,
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
sentHistogram: sentHistogram,
provideEnabled: true,
provSearchDelay: defaultProvSearchDelay,
rebroadcastDelay: delay.Fixed(time.Minute),
engineBstoreWorkerCount: defaulEngineBlockstoreWorkerCount,
}

// apply functional options before starting and running bitswap
for _, option := range options {
option(bs)
}

// Set up decision engine
bs.engine = decision.NewEngine(bstore, bs.engineBstoreWorkerCount, network.ConnectionManager(), network.Self(), bs.engineScoreLedger)

bs.pqm.Startup()
network.SetDelegate(bs)

// Start up bitswaps async worker routines
bs.startWorkers(ctx, px)
engine.StartWorkers(ctx, px)
bs.engine.StartWorkers(ctx, px)

// bind the context and process.
// do it over here to avoid closing before all setup is done.
Expand Down Expand Up @@ -270,6 +287,12 @@ type Bitswap struct {

// how often to rebroadcast providing requests to find more optimized providers
rebroadcastDelay delay.D

// how many worker threads to start for decision engine blockstore worker
engineBstoreWorkerCount int

// the score ledger used by the decision engine
engineScoreLedger deciface.ScoreLedger
}

type counters struct {
Expand Down
2 changes: 1 addition & 1 deletion internal/decision/blockstoremanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type blockstoreManager struct {

// newBlockstoreManager creates a new blockstoreManager with the given context
// and number of workers
func newBlockstoreManager(ctx context.Context, bs bstore.Blockstore, workerCount int) *blockstoreManager {
func newBlockstoreManager(bs bstore.Blockstore, workerCount int) *blockstoreManager {
return &blockstoreManager{
bs: bs,
workerCount: workerCount,
Expand Down
10 changes: 5 additions & 5 deletions internal/decision/blockstoremanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestBlockstoreManagerNotFoundKey(t *testing.T) {
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

bsm := newBlockstoreManager(ctx, bstore, 5)
bsm := newBlockstoreManager(bstore, 5)
bsm.start(process.WithTeardown(func() error { return nil }))

cids := testutil.GenerateCids(4)
Expand Down Expand Up @@ -64,7 +64,7 @@ func TestBlockstoreManager(t *testing.T) {
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

bsm := newBlockstoreManager(ctx, bstore, 5)
bsm := newBlockstoreManager(bstore, 5)
bsm.start(process.WithTeardown(func() error { return nil }))

exp := make(map[cid.Cid]blocks.Block)
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestBlockstoreManagerConcurrency(t *testing.T) {
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

workerCount := 5
bsm := newBlockstoreManager(ctx, bstore, workerCount)
bsm := newBlockstoreManager(bstore, workerCount)
bsm.start(process.WithTeardown(func() error { return nil }))

blkSize := int64(8 * 1024)
Expand Down Expand Up @@ -190,7 +190,7 @@ func TestBlockstoreManagerClose(t *testing.T) {
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

bsm := newBlockstoreManager(ctx, bstore, 3)
bsm := newBlockstoreManager(bstore, 3)
px := process.WithTeardown(func() error { return nil })
bsm.start(px)

Expand Down Expand Up @@ -227,7 +227,7 @@ func TestBlockstoreManagerCtxDone(t *testing.T) {
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

bsm := newBlockstoreManager(context.Background(), bstore, 3)
bsm := newBlockstoreManager(bstore, 3)
proc := process.WithTeardown(func() error { return nil })
bsm.start(proc)

Expand Down
19 changes: 5 additions & 14 deletions internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@ const (

// Number of concurrent workers that pull tasks off the request queue
taskWorkerCount = 8

// Number of concurrent workers that process requests to the blockstore
blockstoreWorkerCount = 128
)

// Envelope contains a message for a Peer.
Expand Down Expand Up @@ -166,16 +163,16 @@ type Engine struct {

sendDontHaves bool

self peer.ID
self peer.ID
}

// NewEngine creates a new block sending engine for the given block store
func NewEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID) *Engine {
return newEngine(ctx, bs, peerTagger, self, maxBlockSizeReplaceHasWithBlock, nil)
func NewEngine(bs bstore.Blockstore, bstoreWorkerCount int, peerTagger PeerTagger, self peer.ID, scoreLedger ScoreLedger) *Engine {
return newEngine(bs, bstoreWorkerCount, peerTagger, self, maxBlockSizeReplaceHasWithBlock, scoreLedger)
}

// This constructor is used by the tests
func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID,
func newEngine(bs bstore.Blockstore, bstoreWorkerCount int, peerTagger PeerTagger, self peer.ID,
maxReplaceSize int, scoreLedger ScoreLedger) *Engine {

if scoreLedger == nil {
Expand All @@ -185,7 +182,7 @@ func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger,
e := &Engine{
ledgerMap: make(map[peer.ID]*ledger),
scoreLedger: scoreLedger,
bsm: newBlockstoreManager(ctx, bs, blockstoreWorkerCount),
bsm: newBlockstoreManager(bs, bstoreWorkerCount),
peerTagger: peerTagger,
outbox: make(chan (<-chan *Envelope), outboxChanBuffer),
workSignal: make(chan struct{}, 1),
Expand Down Expand Up @@ -215,12 +212,6 @@ func (e *Engine) SetSendDontHaves(send bool) {
e.sendDontHaves = send
}

// Sets the scoreLedger to the given implementation. Should be called
// before StartWorkers().
func (e *Engine) UseScoreLedger(scoreLedger ScoreLedger) {
e.scoreLedger = scoreLedger
}

// Starts the score ledger. Before start the function checks and,
// if it is unset, initializes the scoreLedger with the default
// implementation.
Expand Down
16 changes: 8 additions & 8 deletions internal/decision/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func newTestEngine(ctx context.Context, idStr string) engineSet {
func newTestEngineWithSampling(ctx context.Context, idStr string, peerSampleInterval time.Duration, sampleCh chan struct{}) engineSet {
fpt := &fakePeerTagger{}
bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
e := newEngine(ctx, bs, fpt, "localhost", 0, NewTestScoreLedger(peerSampleInterval, sampleCh))
e := newEngine(bs, 4, fpt, "localhost", 0, NewTestScoreLedger(peerSampleInterval, sampleCh))
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
return engineSet{
Peer: peer.ID(idStr),
Expand Down Expand Up @@ -185,7 +185,7 @@ func peerIsPartner(p peer.ID, e *Engine) bool {
func TestOutboxClosedWhenEngineClosed(t *testing.T) {
ctx := context.Background()
t.SkipNow() // TODO implement *Engine.Close
e := newEngine(ctx, blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
var wg sync.WaitGroup
wg.Add(1)
Expand Down Expand Up @@ -513,7 +513,7 @@ func TestPartnerWantHaveWantBlockNonActive(t *testing.T) {
testCases = onlyTestCases
}

e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
for i, testCase := range testCases {
t.Logf("Test case %d:", i)
Expand Down Expand Up @@ -669,7 +669,7 @@ func TestPartnerWantHaveWantBlockActive(t *testing.T) {
testCases = onlyTestCases
}

e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))

var next envChan
Expand Down Expand Up @@ -854,7 +854,7 @@ func TestPartnerWantsThenCancels(t *testing.T) {
ctx := context.Background()
for i := 0; i < numRounds; i++ {
expected := make([][]string, 0, len(testcases))
e := newEngine(ctx, bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
for _, testcase := range testcases {
set := testcase[0]
Expand All @@ -879,7 +879,7 @@ func TestSendReceivedBlocksToPeersThatWantThem(t *testing.T) {
partner := libp2ptest.RandPeerIDFatal(t)
otherPeer := libp2ptest.RandPeerIDFatal(t)

e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))

blks := testutil.GenerateBlocksOfSize(4, 8*1024)
Expand Down Expand Up @@ -923,7 +923,7 @@ func TestSendDontHave(t *testing.T) {
partner := libp2ptest.RandPeerIDFatal(t)
otherPeer := libp2ptest.RandPeerIDFatal(t)

e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))

blks := testutil.GenerateBlocksOfSize(4, 8*1024)
Expand Down Expand Up @@ -987,7 +987,7 @@ func TestWantlistForPeer(t *testing.T) {
partner := libp2ptest.RandPeerIDFatal(t)
otherPeer := libp2ptest.RandPeerIDFatal(t)

e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))

blks := testutil.GenerateBlocksOfSize(4, 8*1024)
Expand Down

0 comments on commit 47b99b1

Please sign in to comment.