Skip to content

Commit

Permalink
Merge pull request #8061 from ipfs/feat/experimental-dht-client
Browse files Browse the repository at this point in the history
Added support for an experimental DHT client and provider system via the Experiments.AcceleratedDHTClient config option
  • Loading branch information
aschmahmann committed May 14, 2021
2 parents afa9899 + a67cab1 commit 0bd3b6e
Show file tree
Hide file tree
Showing 14 changed files with 358 additions and 57 deletions.
1 change: 1 addition & 0 deletions core/commands/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ func TestCommands(t *testing.T) {
"/stats/bitswap",
"/stats/bw",
"/stats/dht",
"/stats/provide",
"/stats/repo",
"/swarm",
"/swarm/addrs",
Expand Down
56 changes: 34 additions & 22 deletions core/commands/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ const (
dhtVerboseOptionName = "verbose"
)

// kademlia extends the routing interface with a command to get the peers closest to the target
type kademlia interface {
routing.Routing
GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error)
}

var queryDhtCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Find the closest Peer IDs to a given Peer ID by querying the DHT.",
Expand All @@ -63,7 +69,7 @@ var queryDhtCmd = &cmds.Command{
return err
}

if nd.DHT == nil {
if nd.DHTClient == nil {
return ErrNotDHT
}

Expand All @@ -73,40 +79,46 @@ var queryDhtCmd = &cmds.Command{
}

ctx, cancel := context.WithCancel(req.Context)
defer cancel()
ctx, events := routing.RegisterForQueryEvents(ctx)

dht := nd.DHT.WAN
if !nd.DHT.WANActive() {
dht = nd.DHT.LAN
client := nd.DHTClient
if client == nd.DHT {
client = nd.DHT.WAN
if !nd.DHT.WANActive() {
client = nd.DHT.LAN
}
}

errCh := make(chan error, 1)
go func() {
defer close(errCh)
defer cancel()
closestPeers, err := dht.GetClosestPeers(ctx, string(id))
if closestPeers != nil {
for p := range closestPeers {
if d, ok := client.(kademlia); !ok {
return fmt.Errorf("dht client does not support GetClosestPeers")
} else {
errCh := make(chan error, 1)
go func() {
defer close(errCh)
defer cancel()
closestPeers, err := d.GetClosestPeers(ctx, string(id))
for _, p := range closestPeers {
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
ID: p,
Type: routing.FinalPeer,
})
}
}

if err != nil {
errCh <- err
return
}
}()
if err != nil {
errCh <- err
return
}
}()

for e := range events {
if err := res.Emit(e); err != nil {
return err
for e := range events {
if err := res.Emit(e); err != nil {
return err
}
}
}

return <-errCh
return <-errCh
}
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *routing.QueryEvent) error {
Expand Down
1 change: 1 addition & 0 deletions core/commands/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ for your IPFS node.`,
"repo": repoStatCmd,
"bitswap": bitswapStatCmd,
"dht": statDhtCmd,
"provide": statProvideCmd,
},
}

Expand Down
54 changes: 53 additions & 1 deletion core/commands/stat_dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/libp2p/go-libp2p-core/network"
pstore "github.com/libp2p/go-libp2p-core/peerstore"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-kad-dht/fullrt"
kbucket "github.com/libp2p/go-libp2p-kbucket"
)

Expand Down Expand Up @@ -43,7 +44,8 @@ This interface is not stable and may change from release to release.
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("dht", false, true, "The DHT whose table should be listed (wan or lan). Defaults to both."),
cmds.StringArg("dht", false, true, "The DHT whose table should be listed (wanserver, lanserver, wan, lan). "+
"wan and lan refer to client routing tables. When using the experimental DHT client only WAN is supported. Defaults to wan and lan."),
},
Options: []cmds.Option{},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
Expand All @@ -67,12 +69,62 @@ This interface is not stable and may change from release to release.
dhts = []string{"wan", "lan"}
}

dhttypeloop:
for _, name := range dhts {
var dht *dht.IpfsDHT

var separateClient bool
if nd.DHTClient != nd.DHT {
separateClient = true
}

switch name {
case "wan":
if separateClient {
client, ok := nd.DHTClient.(*fullrt.FullRT)
if !ok {
return cmds.Errorf(cmds.ErrClient, "could not generate stats for the WAN DHT client type")
}
peerMap := client.Stat()
buckets := make([]dhtBucket, 1)
b := &dhtBucket{}
for _, p := range peerMap {
info := dhtPeerInfo{ID: p.String()}

if ver, err := nd.Peerstore.Get(p, "AgentVersion"); err == nil {
info.AgentVersion, _ = ver.(string)
} else if err == pstore.ErrNotFound {
// ignore
} else {
// this is a bug, usually.
log.Errorw(
"failed to get agent version from peerstore",
"error", err,
)
}

info.Connected = nd.PeerHost.Network().Connectedness(p) == network.Connected
b.Peers = append(b.Peers, info)
}
buckets[0] = *b

if err := res.Emit(dhtStat{
Name: name,
Buckets: buckets,
}); err != nil {
return err
}
continue dhttypeloop
}
fallthrough
case "wanserver":
dht = nd.DHT.WAN
case "lan":
if separateClient {
return cmds.Errorf(cmds.ErrClient, "no LAN client found")
}
fallthrough
case "lanserver":
dht = nd.DHT.LAN
default:
return cmds.Errorf(cmds.ErrClient, "unknown dht type: %s", name)
Expand Down
51 changes: 51 additions & 0 deletions core/commands/stat_provide.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package commands

import (
"fmt"

cmds "github.com/ipfs/go-ipfs-cmds"
"github.com/ipfs/go-ipfs/core/commands/cmdenv"

"github.com/ipfs/go-ipfs-provider/batched"
)

var statProvideCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Returns statistics about the node's (re)provider system.",
ShortDescription: `
Returns statistics about the content the node is advertising.
This interface is not stable and may change from release to release.
`,
},
Arguments: []cmds.Argument{},
Options: []cmds.Option{},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
nd, err := cmdenv.GetNode(env)
if err != nil {
return err
}

if !nd.IsOnline {
return ErrNotOnline
}

sys, ok := nd.Provider.(*batched.BatchProvidingSystem)
if !ok {
return fmt.Errorf("can only return stats if Experimental.AcceleratedDHTClient is enabled")
}

stats, err := sys.Stat(req.Context)
if err != nil {
return err
}

if err := res.Emit(stats); err != nil {
return err
}

return nil
},
Encoders: cmds.EncoderMap{},
Type: batched.BatchedProviderStats{},
}
7 changes: 5 additions & 2 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,11 @@ type IpfsNode struct {

PubSub *pubsub.PubSub `optional:"true"`
PSRouter *psrouter.PubsubValueStore `optional:"true"`
DHT *ddht.DHT `optional:"true"`
P2P *p2p.P2P `optional:"true"`

DHT *ddht.DHT `optional:"true"`
DHTClient routing.Routing `name:"dhtc" optional:"true"`

P2P *p2p.P2P `optional:"true"`

Process goprocess.Process
ctx context.Context
Expand Down
6 changes: 3 additions & 3 deletions core/node/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option {
fx.Provide(libp2p.Security(!bcfg.DisableEncryptedConnections, cfg.Swarm.Transports)),

fx.Provide(libp2p.Routing),
fx.Provide(libp2p.BaseRouting),
fx.Provide(libp2p.BaseRouting(cfg.Experimental.AcceleratedDHTClient)),
maybeProvide(libp2p.PubsubRouter, bcfg.getOpt("ipnsps")),

maybeProvide(libp2p.BandwidthCounter, !cfg.Swarm.DisableBandwidthMetrics),
Expand Down Expand Up @@ -275,7 +275,7 @@ func Online(bcfg *BuildCfg, cfg *config.Config) fx.Option {
fx.Provide(p2p.New),

LibP2P(bcfg, cfg),
OnlineProviders(cfg.Experimental.StrategicProviding, cfg.Reprovider.Strategy, cfg.Reprovider.Interval),
OnlineProviders(cfg.Experimental.StrategicProviding, cfg.Experimental.AcceleratedDHTClient, cfg.Reprovider.Strategy, cfg.Reprovider.Interval),
)
}

Expand All @@ -286,7 +286,7 @@ func Offline(cfg *config.Config) fx.Option {
fx.Provide(DNSResolver),
fx.Provide(Namesys(0)),
fx.Provide(offroute.NewOfflineRouter),
OfflineProviders(cfg.Experimental.StrategicProviding, cfg.Reprovider.Strategy, cfg.Reprovider.Interval),
OfflineProviders(cfg.Experimental.StrategicProviding, cfg.Experimental.AcceleratedDHTClient, cfg.Reprovider.Strategy, cfg.Reprovider.Interval),
)
}

Expand Down
2 changes: 1 addition & 1 deletion core/node/libp2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type P2PHostOut struct {
fx.Out

Host host.Host
Routing BaseIpfsRouting
Routing routing.Routing `name:"initialrouting"`
}

func Host(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (out P2PHostOut, err error) {
Expand Down
Loading

0 comments on commit 0bd3b6e

Please sign in to comment.