Skip to content

Commit

Permalink
Merge branch 'refactor/bitswap-initialization' into maybebtc-november
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Tiger Chow committed Nov 11, 2014
2 parents 325dc75 + 5cf91cf commit aaba9bb
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 46 deletions.
5 changes: 3 additions & 2 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
diag "github.com/jbenet/go-ipfs/diagnostics"
exchange "github.com/jbenet/go-ipfs/exchange"
bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
merkledag "github.com/jbenet/go-ipfs/merkledag"
namesys "github.com/jbenet/go-ipfs/namesys"
inet "github.com/jbenet/go-ipfs/net"
Expand Down Expand Up @@ -147,8 +148,8 @@ func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) {

// setup exchange service
const alwaysSendToPeer = true // use YesManStrategy
n.Exchange = bitswap.NetMessageSession(ctx, n.Identity, n.Network, exchangeService, n.Routing, n.Datastore, alwaysSendToPeer)
// ok, this function call is ridiculous o/ consider making it simpler.
bitswapNetwork := bsnet.NewFromIpfsNetwork(exchangeService, n.Network)
n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Routing, n.Datastore, alwaysSendToPeer)

go initConnections(ctx, n.Config, n.Peerstore, dhtRouting)
}
Expand Down
21 changes: 9 additions & 12 deletions exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,21 @@ import (
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
inet "github.com/jbenet/go-ipfs/net"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)

var log = u.Logger("bitswap")

// NetMessageSession initializes a BitSwap session that communicates over the
// provided NetMessage service.
// New initializes a BitSwap instance that communicates over the
// provided BitSwapNetwork. This function registers the returned instance as
// the network delegate.
// Runs until context is cancelled
func NetMessageSession(ctx context.Context, p peer.Peer,
net inet.Network, srv inet.Service, directory bsnet.Routing,
func New(ctx context.Context, p peer.Peer,
network bsnet.BitSwapNetwork, routing bsnet.Routing,
d ds.ThreadSafeDatastore, nice bool) exchange.Interface {

networkAdapter := bsnet.NetMessageAdapter(srv, net, nil)

notif := notifications.New()

go func() {
select {
case <-ctx.Done():
Expand All @@ -44,11 +41,11 @@ func NetMessageSession(ctx context.Context, p peer.Peer,
blockstore: blockstore.NewBlockstore(d),
notifications: notif,
strategy: strategy.New(nice),
routing: directory,
sender: networkAdapter,
routing: routing,
sender: network,
wantlist: u.NewKeySet(),
}
networkAdapter.SetDelegate(bs)
network.SetDelegate(bs)

return bs
}
Expand All @@ -57,7 +54,7 @@ func NetMessageSession(ctx context.Context, p peer.Peer,
type bitswap struct {

// sender delivers messages on behalf of the session
sender bsnet.Adapter
sender bsnet.BitSwapNetwork

// blockstore is the local database
// NB: ensure threadsafety
Expand Down
6 changes: 3 additions & 3 deletions exchange/bitswap/network/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
u "github.com/jbenet/go-ipfs/util"
)

// Adapter provides network connectivity for BitSwap sessions
type Adapter interface {
// BitSwapNetwork provides network connectivity for BitSwap sessions
type BitSwapNetwork interface {

// DialPeer ensures there is a connection to peer.
DialPeer(context.Context, peer.Peer) error
Expand All @@ -31,6 +31,7 @@ type Adapter interface {
SetDelegate(Receiver)
}

// Implement Receiver to receive messages from the BitSwapNetwork
type Receiver interface {
ReceiveMessage(
ctx context.Context, sender peer.Peer, incoming bsmsg.BitSwapMessage) (
Expand All @@ -39,7 +40,6 @@ type Receiver interface {
ReceiveError(error)
}

// TODO rename -> Router?
type Routing interface {
// FindProvidersAsync returns a channel of providers for the given key
FindProvidersAsync(context.Context, u.Key, int) <-chan peer.Peer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,74 +4,75 @@ import (
"errors"

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
"github.com/jbenet/go-ipfs/util"

bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
inet "github.com/jbenet/go-ipfs/net"
netmsg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
util "github.com/jbenet/go-ipfs/util"
)

var log = util.Logger("net_message_adapter")
var log = util.Logger("bitswap_network")

// NetMessageAdapter wraps a NetMessage network service
func NetMessageAdapter(s inet.Service, n inet.Network, r Receiver) Adapter {
adapter := impl{
nms: s,
net: n,
receiver: r,
// NewFromIpfsNetwork returns a BitSwapNetwork supported by underlying IPFS
// Dialer & Service
func NewFromIpfsNetwork(s inet.Service, dialer inet.Dialer) BitSwapNetwork {
bitswapNetwork := impl{
service: s,
dialer: dialer,
}
s.SetHandler(&adapter)
return &adapter
s.SetHandler(&bitswapNetwork)
return &bitswapNetwork
}

// implements an Adapter that integrates with a NetMessage network service
// impl transforms the ipfs network interface, which sends and receives
// NetMessage objects, into the bitswap network interface.
type impl struct {
nms inet.Service
net inet.Network
service inet.Service
dialer inet.Dialer

// inbound messages from the network are forwarded to the receiver
receiver Receiver
}

// HandleMessage marshals and unmarshals net messages, forwarding them to the
// BitSwapMessage receiver
func (adapter *impl) HandleMessage(
func (bsnet *impl) HandleMessage(
ctx context.Context, incoming netmsg.NetMessage) netmsg.NetMessage {

if adapter.receiver == nil {
if bsnet.receiver == nil {
return nil
}

received, err := bsmsg.FromNet(incoming)
if err != nil {
go adapter.receiver.ReceiveError(err)
go bsnet.receiver.ReceiveError(err)
return nil
}

p, bsmsg := adapter.receiver.ReceiveMessage(ctx, incoming.Peer(), received)
p, bsmsg := bsnet.receiver.ReceiveMessage(ctx, incoming.Peer(), received)

// TODO(brian): put this in a helper function
if bsmsg == nil || p == nil {
adapter.receiver.ReceiveError(errors.New("ReceiveMessage returned nil peer or message"))
bsnet.receiver.ReceiveError(errors.New("ReceiveMessage returned nil peer or message"))
return nil
}

outgoing, err := bsmsg.ToNet(p)
if err != nil {
go adapter.receiver.ReceiveError(err)
go bsnet.receiver.ReceiveError(err)
return nil
}

log.Debugf("Message size: %d", len(outgoing.Data()))
return outgoing
}

func (adapter *impl) DialPeer(ctx context.Context, p peer.Peer) error {
return adapter.net.DialPeer(ctx, p)
func (bsnet *impl) DialPeer(ctx context.Context, p peer.Peer) error {
return bsnet.dialer.DialPeer(ctx, p)
}

func (adapter *impl) SendMessage(
func (bsnet *impl) SendMessage(
ctx context.Context,
p peer.Peer,
outgoing bsmsg.BitSwapMessage) error {
Expand All @@ -80,10 +81,10 @@ func (adapter *impl) SendMessage(
if err != nil {
return err
}
return adapter.nms.SendMessage(ctx, nmsg)
return bsnet.service.SendMessage(ctx, nmsg)
}

func (adapter *impl) SendRequest(
func (bsnet *impl) SendRequest(
ctx context.Context,
p peer.Peer,
outgoing bsmsg.BitSwapMessage) (bsmsg.BitSwapMessage, error) {
Expand All @@ -92,13 +93,13 @@ func (adapter *impl) SendRequest(
if err != nil {
return nil, err
}
incomingMsg, err := adapter.nms.SendRequest(ctx, outgoingMsg)
incomingMsg, err := bsnet.service.SendRequest(ctx, outgoingMsg)
if err != nil {
return nil, err
}
return bsmsg.FromNet(incomingMsg)
}

func (adapter *impl) SetDelegate(r Receiver) {
adapter.receiver = r
func (bsnet *impl) SetDelegate(r Receiver) {
bsnet.receiver = r
}
4 changes: 2 additions & 2 deletions exchange/bitswap/testnet/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type Network interface {
Adapter(peer.Peer) bsnet.Adapter
Adapter(peer.Peer) bsnet.BitSwapNetwork

HasPeer(peer.Peer) bool

Expand Down Expand Up @@ -43,7 +43,7 @@ type network struct {
clients map[util.Key]bsnet.Receiver
}

func (n *network) Adapter(p peer.Peer) bsnet.Adapter {
func (n *network) Adapter(p peer.Peer) bsnet.BitSwapNetwork {
client := &networkClient{
local: p,
network: n,
Expand Down

0 comments on commit aaba9bb

Please sign in to comment.