From 715f5f4a19fd9250309d1333cf3f0905603bd507 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Fri, 24 Oct 2014 16:14:12 -0700 Subject: [PATCH 1/2] fix(blockstore, bitswap) enforce threadsafety in blockstore fixes data race detected in a testnet test --- blockstore/blockstore.go | 4 ++-- blockstore/blockstore_test.go | 7 ++++--- exchange/bitswap/bitswap.go | 2 +- exchange/bitswap/bitswap_test.go | 3 ++- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/blockstore/blockstore.go b/blockstore/blockstore.go index 99892b24bfe..cadd2080266 100644 --- a/blockstore/blockstore.go +++ b/blockstore/blockstore.go @@ -16,14 +16,14 @@ type Blockstore interface { Put(*blocks.Block) error } -func NewBlockstore(d ds.Datastore) Blockstore { +func NewBlockstore(d ds.ThreadSafeDatastore) Blockstore { return &blockstore{ datastore: d, } } type blockstore struct { - datastore ds.Datastore + datastore ds.ThreadSafeDatastore } func (bs *blockstore) Get(k u.Key) (*blocks.Block, error) { diff --git a/blockstore/blockstore_test.go b/blockstore/blockstore_test.go index 8daed5f3de7..00edf61abf2 100644 --- a/blockstore/blockstore_test.go +++ b/blockstore/blockstore_test.go @@ -5,6 +5,7 @@ import ( "testing" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" blocks "github.com/jbenet/go-ipfs/blocks" u "github.com/jbenet/go-ipfs/util" ) @@ -12,7 +13,7 @@ import ( // TODO(brian): TestGetReturnsNil func TestGetWhenKeyNotPresent(t *testing.T) { - bs := NewBlockstore(ds.NewMapDatastore()) + bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) _, err := bs.Get(u.Key("not present")) if err != nil { @@ -23,7 +24,7 @@ func TestGetWhenKeyNotPresent(t *testing.T) { } func TestPutThenGetBlock(t *testing.T) { - bs := NewBlockstore(ds.NewMapDatastore()) + bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) block := blocks.NewBlock([]byte("some data")) err := bs.Put(block) @@ -46,7 +47,7 @@ func TestValueTypeMismatch(t *testing.T) { datastore := ds.NewMapDatastore() datastore.Put(block.Key().DsKey(), "data that isn't a block!") - blockstore := NewBlockstore(datastore) + blockstore := NewBlockstore(ds_sync.MutexWrap(datastore)) _, err := blockstore.Get(block.Key()) if err != ValueTypeMismatch { diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 19ee6e2fc77..89ddbc821bb 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -22,7 +22,7 @@ var log = u.Logger("bitswap") // provided NetMessage service func NetMessageSession(parent context.Context, p peer.Peer, net inet.Network, srv inet.Service, directory bsnet.Routing, - d ds.Datastore, nice bool) exchange.Interface { + d ds.ThreadSafeDatastore, nice bool) exchange.Interface { networkAdapter := bsnet.NetMessageAdapter(srv, net, nil) bs := &bitswap{ diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 4c881a04e94..f34ea3c844e 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -9,6 +9,7 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" blocks "github.com/jbenet/go-ipfs/blocks" bstore "github.com/jbenet/go-ipfs/blockstore" exchange "github.com/jbenet/go-ipfs/exchange" @@ -279,7 +280,7 @@ func session(net tn.Network, rs mock.RoutingServer, id peer.ID) instance { adapter := net.Adapter(p) htc := rs.Client(p) - blockstore := bstore.NewBlockstore(ds.NewMapDatastore()) + blockstore := bstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) const alwaysSendToPeer = true bs := &bitswap{ blockstore: blockstore, From c848202c7d7c77a3e7ee5c396d3b748d311e8d00 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Fri, 24 Oct 2014 16:15:48 -0700 Subject: [PATCH 2/2] fix(bitswap) move mutex up to strategy from ledger addresses concurrent access in bitswap session --- exchange/bitswap/strategy/ledger.go | 21 +------------------ exchange/bitswap/strategy/ledger_test.go | 22 -------------------- exchange/bitswap/strategy/strategy.go | 26 ++++++++++++++++++++++++ 3 files changed, 27 insertions(+), 42 deletions(-) diff --git a/exchange/bitswap/strategy/ledger.go b/exchange/bitswap/strategy/ledger.go index 3700c1f4350..9f33b1aba43 100644 --- a/exchange/bitswap/strategy/ledger.go +++ b/exchange/bitswap/strategy/ledger.go @@ -1,7 +1,6 @@ package strategy import ( - "sync" "time" peer "github.com/jbenet/go-ipfs/peer" @@ -21,9 +20,8 @@ func newLedger(p peer.Peer, strategy strategyFunc) *ledger { } // ledger stores the data exchange relationship between two peers. +// NOT threadsafe type ledger struct { - lock sync.RWMutex - // Partner is the remote Peer. Partner peer.Peer @@ -46,25 +44,16 @@ type ledger struct { } func (l *ledger) ShouldSend() bool { - l.lock.Lock() - defer l.lock.Unlock() - return l.Strategy(l) } func (l *ledger) SentBytes(n int) { - l.lock.Lock() - defer l.lock.Unlock() - l.exchangeCount++ l.lastExchange = time.Now() l.Accounting.BytesSent += uint64(n) } func (l *ledger) ReceivedBytes(n int) { - l.lock.Lock() - defer l.lock.Unlock() - l.exchangeCount++ l.lastExchange = time.Now() l.Accounting.BytesRecv += uint64(n) @@ -72,22 +61,14 @@ func (l *ledger) ReceivedBytes(n int) { // TODO: this needs to be different. We need timeouts. func (l *ledger) Wants(k u.Key) { - l.lock.Lock() - defer l.lock.Unlock() - l.wantList[k] = struct{}{} } func (l *ledger) WantListContains(k u.Key) bool { - l.lock.RLock() - defer l.lock.RUnlock() - _, ok := l.wantList[k] return ok } func (l *ledger) ExchangeCount() uint64 { - l.lock.RLock() - defer l.lock.RUnlock() return l.exchangeCount } diff --git a/exchange/bitswap/strategy/ledger_test.go b/exchange/bitswap/strategy/ledger_test.go index 0fdfae0ccc0..4271d525c20 100644 --- a/exchange/bitswap/strategy/ledger_test.go +++ b/exchange/bitswap/strategy/ledger_test.go @@ -1,23 +1 @@ package strategy - -import ( - "sync" - "testing" -) - -func TestRaceConditions(t *testing.T) { - const numberOfExpectedExchanges = 10000 - l := new(ledger) - var wg sync.WaitGroup - for i := 0; i < numberOfExpectedExchanges; i++ { - wg.Add(1) - go func() { - defer wg.Done() - l.ReceivedBytes(1) - }() - } - wg.Wait() - if l.ExchangeCount() != numberOfExpectedExchanges { - t.Fail() - } -} diff --git a/exchange/bitswap/strategy/strategy.go b/exchange/bitswap/strategy/strategy.go index 399d7777ba8..42cbe777392 100644 --- a/exchange/bitswap/strategy/strategy.go +++ b/exchange/bitswap/strategy/strategy.go @@ -2,6 +2,7 @@ package strategy import ( "errors" + "sync" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" "github.com/jbenet/go-ipfs/peer" @@ -26,6 +27,7 @@ func New(nice bool) Strategy { } type strategist struct { + lock sync.RWMutex ledgerMap strategyFunc } @@ -38,6 +40,9 @@ type peerKey u.Key // Peers returns a list of peers func (s *strategist) Peers() []peer.Peer { + s.lock.RLock() + defer s.lock.RUnlock() + response := make([]peer.Peer, 0) for _, ledger := range s.ledgerMap { response = append(response, ledger.Partner) @@ -46,20 +51,32 @@ func (s *strategist) Peers() []peer.Peer { } func (s *strategist) BlockIsWantedByPeer(k u.Key, p peer.Peer) bool { + s.lock.RLock() + defer s.lock.RUnlock() + ledger := s.ledger(p) return ledger.WantListContains(k) } func (s *strategist) ShouldSendBlockToPeer(k u.Key, p peer.Peer) bool { + s.lock.RLock() + defer s.lock.RUnlock() + ledger := s.ledger(p) return ledger.ShouldSend() } func (s *strategist) Seed(int64) { + s.lock.Lock() + defer s.lock.Unlock() + // TODO } func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error { + s.lock.Lock() + defer s.lock.Unlock() + // TODO find a more elegant way to handle this check if p == nil { return errors.New("Strategy received nil peer") @@ -85,6 +102,9 @@ func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error // send happen atomically func (s *strategist) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error { + s.lock.Lock() + defer s.lock.Unlock() + l := s.ledger(p) for _, block := range m.Blocks() { l.SentBytes(len(block.Data)) @@ -96,10 +116,16 @@ func (s *strategist) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error { } func (s *strategist) NumBytesSentTo(p peer.Peer) uint64 { + s.lock.RLock() + defer s.lock.RUnlock() + return s.ledger(p).Accounting.BytesSent } func (s *strategist) NumBytesReceivedFrom(p peer.Peer) uint64 { + s.lock.RLock() + defer s.lock.RUnlock() + return s.ledger(p).Accounting.BytesRecv }