Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(bitswap) data races #198

Merged
merged 2 commits into from
Oct 24, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ type Blockstore interface {
Put(*blocks.Block) error
}

func NewBlockstore(d ds.Datastore) Blockstore {
func NewBlockstore(d ds.ThreadSafeDatastore) Blockstore {
return &blockstore{
datastore: d,
}
}

type blockstore struct {
datastore ds.Datastore
datastore ds.ThreadSafeDatastore
}

func (bs *blockstore) Get(k u.Key) (*blocks.Block, error) {
Expand Down
7 changes: 4 additions & 3 deletions blockstore/blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import (
"testing"

ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
blocks "github.com/jbenet/go-ipfs/blocks"
u "github.com/jbenet/go-ipfs/util"
)

// TODO(brian): TestGetReturnsNil

func TestGetWhenKeyNotPresent(t *testing.T) {
bs := NewBlockstore(ds.NewMapDatastore())
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
_, err := bs.Get(u.Key("not present"))

if err != nil {
Expand All @@ -23,7 +24,7 @@ func TestGetWhenKeyNotPresent(t *testing.T) {
}

func TestPutThenGetBlock(t *testing.T) {
bs := NewBlockstore(ds.NewMapDatastore())
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
block := blocks.NewBlock([]byte("some data"))

err := bs.Put(block)
Expand All @@ -46,7 +47,7 @@ func TestValueTypeMismatch(t *testing.T) {
datastore := ds.NewMapDatastore()
datastore.Put(block.Key().DsKey(), "data that isn't a block!")

blockstore := NewBlockstore(datastore)
blockstore := NewBlockstore(ds_sync.MutexWrap(datastore))

_, err := blockstore.Get(block.Key())
if err != ValueTypeMismatch {
Expand Down
2 changes: 1 addition & 1 deletion exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var log = u.Logger("bitswap")
// provided NetMessage service
func NetMessageSession(parent context.Context, p peer.Peer,
net inet.Network, srv inet.Service, directory bsnet.Routing,
d ds.Datastore, nice bool) exchange.Interface {
d ds.ThreadSafeDatastore, nice bool) exchange.Interface {

networkAdapter := bsnet.NetMessageAdapter(srv, net, nil)
bs := &bitswap{
Expand Down
3 changes: 2 additions & 1 deletion exchange/bitswap/bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
blocks "github.com/jbenet/go-ipfs/blocks"
bstore "github.com/jbenet/go-ipfs/blockstore"
exchange "github.com/jbenet/go-ipfs/exchange"
Expand Down Expand Up @@ -279,7 +280,7 @@ func session(net tn.Network, rs mock.RoutingServer, id peer.ID) instance {
adapter := net.Adapter(p)
htc := rs.Client(p)

blockstore := bstore.NewBlockstore(ds.NewMapDatastore())
blockstore := bstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
const alwaysSendToPeer = true
bs := &bitswap{
blockstore: blockstore,
Expand Down
21 changes: 1 addition & 20 deletions exchange/bitswap/strategy/ledger.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package strategy

import (
"sync"
"time"

peer "github.com/jbenet/go-ipfs/peer"
Expand All @@ -21,9 +20,8 @@ func newLedger(p peer.Peer, strategy strategyFunc) *ledger {
}

// ledger stores the data exchange relationship between two peers.
// NOT threadsafe
type ledger struct {
lock sync.RWMutex

// Partner is the remote Peer.
Partner peer.Peer

Expand All @@ -46,48 +44,31 @@ type ledger struct {
}

func (l *ledger) ShouldSend() bool {
l.lock.Lock()
defer l.lock.Unlock()

return l.Strategy(l)
}

func (l *ledger) SentBytes(n int) {
l.lock.Lock()
defer l.lock.Unlock()

l.exchangeCount++
l.lastExchange = time.Now()
l.Accounting.BytesSent += uint64(n)
}

func (l *ledger) ReceivedBytes(n int) {
l.lock.Lock()
defer l.lock.Unlock()

l.exchangeCount++
l.lastExchange = time.Now()
l.Accounting.BytesRecv += uint64(n)
}

// TODO: this needs to be different. We need timeouts.
func (l *ledger) Wants(k u.Key) {
l.lock.Lock()
defer l.lock.Unlock()

l.wantList[k] = struct{}{}
}

func (l *ledger) WantListContains(k u.Key) bool {
l.lock.RLock()
defer l.lock.RUnlock()

_, ok := l.wantList[k]
return ok
}

func (l *ledger) ExchangeCount() uint64 {
l.lock.RLock()
defer l.lock.RUnlock()
return l.exchangeCount
}
22 changes: 0 additions & 22 deletions exchange/bitswap/strategy/ledger_test.go
Original file line number Diff line number Diff line change
@@ -1,23 +1 @@
package strategy

import (
"sync"
"testing"
)

func TestRaceConditions(t *testing.T) {
const numberOfExpectedExchanges = 10000
l := new(ledger)
var wg sync.WaitGroup
for i := 0; i < numberOfExpectedExchanges; i++ {
wg.Add(1)
go func() {
defer wg.Done()
l.ReceivedBytes(1)
}()
}
wg.Wait()
if l.ExchangeCount() != numberOfExpectedExchanges {
t.Fail()
}
}
26 changes: 26 additions & 0 deletions exchange/bitswap/strategy/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package strategy

import (
"errors"
"sync"

bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
"github.com/jbenet/go-ipfs/peer"
Expand All @@ -26,6 +27,7 @@ func New(nice bool) Strategy {
}

type strategist struct {
lock sync.RWMutex
ledgerMap
strategyFunc
}
Expand All @@ -38,6 +40,9 @@ type peerKey u.Key

// Peers returns a list of peers
func (s *strategist) Peers() []peer.Peer {
s.lock.RLock()
defer s.lock.RUnlock()

response := make([]peer.Peer, 0)
for _, ledger := range s.ledgerMap {
response = append(response, ledger.Partner)
Expand All @@ -46,20 +51,32 @@ func (s *strategist) Peers() []peer.Peer {
}

func (s *strategist) BlockIsWantedByPeer(k u.Key, p peer.Peer) bool {
s.lock.RLock()
defer s.lock.RUnlock()

ledger := s.ledger(p)
return ledger.WantListContains(k)
}

func (s *strategist) ShouldSendBlockToPeer(k u.Key, p peer.Peer) bool {
s.lock.RLock()
defer s.lock.RUnlock()

ledger := s.ledger(p)
return ledger.ShouldSend()
}

func (s *strategist) Seed(int64) {
s.lock.Lock()
defer s.lock.Unlock()

// TODO
}

func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
s.lock.Lock()
defer s.lock.Unlock()

// TODO find a more elegant way to handle this check
if p == nil {
return errors.New("Strategy received nil peer")
Expand All @@ -85,6 +102,9 @@ func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error
// send happen atomically

func (s *strategist) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
s.lock.Lock()
defer s.lock.Unlock()

l := s.ledger(p)
for _, block := range m.Blocks() {
l.SentBytes(len(block.Data))
Expand All @@ -96,10 +116,16 @@ func (s *strategist) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
}

func (s *strategist) NumBytesSentTo(p peer.Peer) uint64 {
s.lock.RLock()
defer s.lock.RUnlock()

return s.ledger(p).Accounting.BytesSent
}

func (s *strategist) NumBytesReceivedFrom(p peer.Peer) uint64 {
s.lock.RLock()
defer s.lock.RUnlock()

return s.ledger(p).Accounting.BytesRecv
}

Expand Down