From 2fd55d198cb2dec4caf1170955a9934cbb3b64e1 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Mon, 5 Apr 2021 12:48:32 -0400 Subject: [PATCH 1/3] integrate experimental AcceleratedDHTClient The experimental AcceleratedDHTClient can be enabled from the config When enabled it modifies the output of the `ipfs stats dht` command. --- core/commands/dht.go | 56 ++++++++++++-------- core/commands/stat_dht.go | 54 ++++++++++++++++++- core/core.go | 7 ++- core/node/groups.go | 2 +- core/node/libp2p/host.go | 2 +- core/node/libp2p/routing.go | 97 +++++++++++++++++++++++++++++----- core/node/libp2p/routingopt.go | 1 - go.mod | 4 +- go.sum | 15 ++++-- 9 files changed, 190 insertions(+), 48 deletions(-) diff --git a/core/commands/dht.go b/core/commands/dht.go index 07136a72379..c481c776a99 100644 --- a/core/commands/dht.go +++ b/core/commands/dht.go @@ -45,6 +45,12 @@ const ( dhtVerboseOptionName = "verbose" ) +// kademlia extends the routing interface with a command to get the peers closest to the target +type kademlia interface { + routing.Routing + GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) +} + var queryDhtCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Find the closest Peer IDs to a given Peer ID by querying the DHT.", @@ -63,7 +69,7 @@ var queryDhtCmd = &cmds.Command{ return err } - if nd.DHT == nil { + if nd.DHTClient == nil { return ErrNotDHT } @@ -73,40 +79,46 @@ var queryDhtCmd = &cmds.Command{ } ctx, cancel := context.WithCancel(req.Context) + defer cancel() ctx, events := routing.RegisterForQueryEvents(ctx) - dht := nd.DHT.WAN - if !nd.DHT.WANActive() { - dht = nd.DHT.LAN + client := nd.DHTClient + if client == nd.DHT { + client = nd.DHT.WAN + if !nd.DHT.WANActive() { + client = nd.DHT.LAN + } } - errCh := make(chan error, 1) - go func() { - defer close(errCh) - defer cancel() - closestPeers, err := dht.GetClosestPeers(ctx, string(id)) - if closestPeers != nil { - for p := range closestPeers { + if d, ok := client.(kademlia); !ok { + return fmt.Errorf("dht client does not support GetClosestPeers") + } else { + errCh := make(chan error, 1) + go func() { + defer close(errCh) + defer cancel() + closestPeers, err := d.GetClosestPeers(ctx, string(id)) + for _, p := range closestPeers { routing.PublishQueryEvent(ctx, &routing.QueryEvent{ ID: p, Type: routing.FinalPeer, }) } - } - if err != nil { - errCh <- err - return - } - }() + if err != nil { + errCh <- err + return + } + }() - for e := range events { - if err := res.Emit(e); err != nil { - return err + for e := range events { + if err := res.Emit(e); err != nil { + return err + } } - } - return <-errCh + return <-errCh + } }, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *routing.QueryEvent) error { diff --git a/core/commands/stat_dht.go b/core/commands/stat_dht.go index a8b5323c507..c76fce2c05e 100644 --- a/core/commands/stat_dht.go +++ b/core/commands/stat_dht.go @@ -12,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p-core/network" pstore "github.com/libp2p/go-libp2p-core/peerstore" dht "github.com/libp2p/go-libp2p-kad-dht" + "github.com/libp2p/go-libp2p-kad-dht/fullrt" kbucket "github.com/libp2p/go-libp2p-kbucket" ) @@ -43,7 +44,8 @@ This interface is not stable and may change from release to release. `, }, Arguments: []cmds.Argument{ - cmds.StringArg("dht", false, true, "The DHT whose table should be listed (wan or lan). Defaults to both."), + cmds.StringArg("dht", false, true, "The DHT whose table should be listed (wanserver, lanserver, wan, lan). "+ + "wan and lan refer to client routing tables. When using the experimental DHT client only WAN is supported. Defaults to wan and lan."), }, Options: []cmds.Option{}, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { @@ -67,12 +69,62 @@ This interface is not stable and may change from release to release. dhts = []string{"wan", "lan"} } + dhttypeloop: for _, name := range dhts { var dht *dht.IpfsDHT + + var separateClient bool + if nd.DHTClient != nd.DHT { + separateClient = true + } + switch name { case "wan": + if separateClient { + client, ok := nd.DHTClient.(*fullrt.FullRT) + if !ok { + return cmds.Errorf(cmds.ErrClient, "could not generate stats for the WAN DHT client type") + } + peerMap := client.Stat() + buckets := make([]dhtBucket, 1) + b := &dhtBucket{} + for _, p := range peerMap { + info := dhtPeerInfo{ID: p.String()} + + if ver, err := nd.Peerstore.Get(p, "AgentVersion"); err == nil { + info.AgentVersion, _ = ver.(string) + } else if err == pstore.ErrNotFound { + // ignore + } else { + // this is a bug, usually. + log.Errorw( + "failed to get agent version from peerstore", + "error", err, + ) + } + + info.Connected = nd.PeerHost.Network().Connectedness(p) == network.Connected + b.Peers = append(b.Peers, info) + } + buckets[0] = *b + + if err := res.Emit(dhtStat{ + Name: name, + Buckets: buckets, + }); err != nil { + return err + } + continue dhttypeloop + } + fallthrough + case "wanserver": dht = nd.DHT.WAN case "lan": + if separateClient { + return cmds.Errorf(cmds.ErrClient, "no LAN client found") + } + fallthrough + case "lanserver": dht = nd.DHT.LAN default: return cmds.Errorf(cmds.ErrClient, "unknown dht type: %s", name) diff --git a/core/core.go b/core/core.go index 75fc95ff7d8..49f1185646b 100644 --- a/core/core.go +++ b/core/core.go @@ -98,8 +98,11 @@ type IpfsNode struct { PubSub *pubsub.PubSub `optional:"true"` PSRouter *psrouter.PubsubValueStore `optional:"true"` - DHT *ddht.DHT `optional:"true"` - P2P *p2p.P2P `optional:"true"` + + DHT *ddht.DHT `optional:"true"` + DHTClient routing.Routing `name:"dhtc" optional:"true"` + + P2P *p2p.P2P `optional:"true"` Process goprocess.Process ctx context.Context diff --git a/core/node/groups.go b/core/node/groups.go index 6863043396c..f55b052c380 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -139,7 +139,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option { fx.Provide(libp2p.Security(!bcfg.DisableEncryptedConnections, cfg.Swarm.Transports)), fx.Provide(libp2p.Routing), - fx.Provide(libp2p.BaseRouting), + fx.Provide(libp2p.BaseRouting(cfg.Experimental.AcceleratedDHTClient)), maybeProvide(libp2p.PubsubRouter, bcfg.getOpt("ipnsps")), maybeProvide(libp2p.BandwidthCounter, !cfg.Swarm.DisableBandwidthMetrics), diff --git a/core/node/libp2p/host.go b/core/node/libp2p/host.go index 4005f0a7e74..04682682b48 100644 --- a/core/node/libp2p/host.go +++ b/core/node/libp2p/host.go @@ -34,7 +34,7 @@ type P2PHostOut struct { fx.Out Host host.Host - Routing BaseIpfsRouting + Routing routing.Routing `name:"initialrouting"` } func Host(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (out P2PHostOut, err error) { diff --git a/core/node/libp2p/routing.go b/core/node/libp2p/routing.go index 14b8fa40c10..b249611cc49 100644 --- a/core/node/libp2p/routing.go +++ b/core/node/libp2p/routing.go @@ -7,9 +7,12 @@ import ( "github.com/ipfs/go-ipfs/core/node/helpers" + "github.com/ipfs/go-ipfs/repo" host "github.com/libp2p/go-libp2p-core/host" routing "github.com/libp2p/go-libp2p-core/routing" + dht "github.com/libp2p/go-libp2p-kad-dht" ddht "github.com/libp2p/go-libp2p-kad-dht/dual" + "github.com/libp2p/go-libp2p-kad-dht/fullrt" "github.com/libp2p/go-libp2p-pubsub" namesys "github.com/libp2p/go-libp2p-pubsub-router" record "github.com/libp2p/go-libp2p-record" @@ -32,23 +35,89 @@ type p2pRouterOut struct { Router Router `group:"routers"` } -func BaseRouting(lc fx.Lifecycle, in BaseIpfsRouting) (out p2pRouterOut, dr *ddht.DHT) { - if dht, ok := in.(*ddht.DHT); ok { - dr = dht +type processInitialRoutingIn struct { + fx.In + + Router routing.Routing `name:"initialrouting"` + + // For setting up experimental DHT client + Host host.Host + Repo repo.Repo + Validator record.Validator +} + +type processInitialRoutingOut struct { + fx.Out + + Router Router `group:"routers"` + DHT *ddht.DHT + DHTClient routing.Routing `name:"dhtc"` + BaseRT BaseIpfsRouting +} - lc.Append(fx.Hook{ - OnStop: func(ctx context.Context) error { - return dr.Close() +func BaseRouting(experimentalDHTClient bool) interface{} { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, in processInitialRoutingIn) (out processInitialRoutingOut, err error) { + var dr *ddht.DHT + if dht, ok := in.Router.(*ddht.DHT); ok { + dr = dht + + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return dr.Close() + }, + }) + } + + if dr != nil && experimentalDHTClient { + cfg, err := in.Repo.Config() + if err != nil { + return out, err + } + bspeers, err := cfg.BootstrapPeers() + if err != nil { + return out, err + } + + expClient, err := fullrt.NewFullRT(in.Host, + dht.DefaultPrefix, + fullrt.DHTOption( + dht.Validator(in.Validator), + dht.Datastore(in.Repo.Datastore()), + dht.BootstrapPeers(bspeers...), + dht.BucketSize(20), + ), + ) + if err != nil { + return out, err + } + + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return expClient.Close() + }, + }) + + return processInitialRoutingOut{ + Router: Router{ + Routing: expClient, + Priority: 1000, + }, + DHT: dr, + DHTClient: expClient, + BaseRT: expClient, + }, nil + } + + return processInitialRoutingOut{ + Router: Router{ + Priority: 1000, + Routing: in.Router, }, - }) + DHT: dr, + DHTClient: dr, + BaseRT: in.Router, + }, nil } - - return p2pRouterOut{ - Router: Router{ - Priority: 1000, - Routing: in, - }, - }, dr } type p2pOnlineRoutingIn struct { diff --git a/core/node/libp2p/routingopt.go b/core/node/libp2p/routingopt.go index 96bd8be4c26..6d29cb6caf1 100644 --- a/core/node/libp2p/routingopt.go +++ b/core/node/libp2p/routingopt.go @@ -2,7 +2,6 @@ package libp2p import ( "context" - "github.com/ipfs/go-datastore" host "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" diff --git a/go.mod b/go.mod index f90636ae894..21388775322 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,7 @@ require ( github.com/ipfs/go-ipfs-blockstore v0.1.4 github.com/ipfs/go-ipfs-chunker v0.0.5 github.com/ipfs/go-ipfs-cmds v0.6.0 - github.com/ipfs/go-ipfs-config v0.13.0 + github.com/ipfs/go-ipfs-config v0.14.0 github.com/ipfs/go-ipfs-exchange-interface v0.0.1 github.com/ipfs/go-ipfs-exchange-offline v0.0.1 github.com/ipfs/go-ipfs-files v0.0.8 @@ -65,7 +65,7 @@ require ( github.com/libp2p/go-libp2p-core v0.8.5 github.com/libp2p/go-libp2p-discovery v0.5.0 github.com/libp2p/go-libp2p-http v0.2.0 - github.com/libp2p/go-libp2p-kad-dht v0.11.1 + github.com/libp2p/go-libp2p-kad-dht v0.12.0 github.com/libp2p/go-libp2p-kbucket v0.4.7 github.com/libp2p/go-libp2p-loggables v0.1.0 github.com/libp2p/go-libp2p-mplex v0.4.1 diff --git a/go.sum b/go.sum index b345652a262..b9772aeeef5 100644 --- a/go.sum +++ b/go.sum @@ -271,8 +271,9 @@ github.com/google/pprof v0.0.0-20200212024743-f11f1df84d12/go.mod h1:ZgVRPoUq/hf github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY= github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE08qbEPm1M08qg= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= @@ -411,8 +412,8 @@ github.com/ipfs/go-ipfs-chunker v0.0.5 h1:ojCf7HV/m+uS2vhUGWcogIIxiO5ubl5O57Q7Na github.com/ipfs/go-ipfs-chunker v0.0.5/go.mod h1:jhgdF8vxRHycr00k13FM8Y0E+6BoalYeobXmUyTreP8= github.com/ipfs/go-ipfs-cmds v0.6.0 h1:yAxdowQZzoFKjcLI08sXVNnqVj3jnABbf9smrPQmBsw= github.com/ipfs/go-ipfs-cmds v0.6.0/go.mod h1:ZgYiWVnCk43ChwoH8hAmI1IRbuVtq3GSTHwtRB/Kqhk= -github.com/ipfs/go-ipfs-config v0.13.0 h1:ZH3dTmkVR9TTFBIbfWnFNC1JdwHbj8F0ryiaIFo7U/o= -github.com/ipfs/go-ipfs-config v0.13.0/go.mod h1:Ei/FLgHGTdPyqCPK0oPCwGTe8VSnsjJjx7HZqUb6Ry0= +github.com/ipfs/go-ipfs-config v0.14.0 h1:KijwGU788UycqPWv4GxzyfyN6EtfJjjDRzd/wSA86VU= +github.com/ipfs/go-ipfs-config v0.14.0/go.mod h1:Ei/FLgHGTdPyqCPK0oPCwGTe8VSnsjJjx7HZqUb6Ry0= github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1IeRQ= github.com/ipfs/go-ipfs-delay v0.0.1/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= @@ -677,8 +678,10 @@ github.com/libp2p/go-libp2p-interface-connmgr v0.0.1/go.mod h1:GarlRLH0LdeWcLnYM github.com/libp2p/go-libp2p-interface-connmgr v0.0.4/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k= github.com/libp2p/go-libp2p-interface-connmgr v0.0.5/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k= github.com/libp2p/go-libp2p-interface-pnet v0.0.1/go.mod h1:el9jHpQAXK5dnTpKA4yfCNBZXvrzdOU75zz+C6ryp3k= -github.com/libp2p/go-libp2p-kad-dht v0.11.1 h1:FsriVQhOUZpCotWIjyFSjEDNJmUzuMma/RyyTDZanwc= github.com/libp2p/go-libp2p-kad-dht v0.11.1/go.mod h1:5ojtR2acDPqh/jXf5orWy8YGb8bHQDS+qeDcoscL/PI= +github.com/libp2p/go-libp2p-kad-dht v0.12.0 h1:R5vvp8kuXjsyDE/HEMKgM8XIwlRsP5BdAZexM+tJxdU= +github.com/libp2p/go-libp2p-kad-dht v0.12.0/go.mod h1:zdQYru1c7dnluMpZls4i9Fj2TwYXS7YyDkJ1Yahv0w0= +github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio= github.com/libp2p/go-libp2p-kbucket v0.4.7 h1:spZAcgxifvFZHBD8tErvppbnNiKA5uokDu3CV7axu70= github.com/libp2p/go-libp2p-kbucket v0.4.7/go.mod h1:XyVo99AfQH0foSf176k4jY1xUJ2+jUJIZCSDm7r2YKk= github.com/libp2p/go-libp2p-loggables v0.0.1/go.mod h1:lDipDlBNYbpyqyPX/KcoO+eq0sJYEVR2JgOexcivchg= @@ -777,6 +780,8 @@ github.com/libp2p/go-libp2p-transport-upgrader v0.3.0/go.mod h1:i+SKzbRnvXdVbU3D github.com/libp2p/go-libp2p-transport-upgrader v0.4.0/go.mod h1:J4ko0ObtZSmgn5BX5AmegP+dK3CSnU2lMCKsSq/EY0s= github.com/libp2p/go-libp2p-transport-upgrader v0.4.2 h1:4JsnbfJzgZeRS9AWN7B9dPqn/LY/HoQTlO9gtdJTIYM= github.com/libp2p/go-libp2p-transport-upgrader v0.4.2/go.mod h1:NR8ne1VwfreD5VIWIU62Agt/J18ekORFU/j1i2y8zvk= +github.com/libp2p/go-libp2p-xor v0.0.0-20200501025846-71e284145d58 h1:GcTNu27BMpOTtMnQqun03+kbtHA1qTxJ/J8cZRRYu2k= +github.com/libp2p/go-libp2p-xor v0.0.0-20200501025846-71e284145d58/go.mod h1:AYjOiqJIdcmI4SXE2ouKQuFrUbE5myv8txWaB2pl4TI= github.com/libp2p/go-libp2p-yamux v0.1.2/go.mod h1:xUoV/RmYkg6BW/qGxA9XJyg+HzXFYkeXbnhjmnYzKp8= github.com/libp2p/go-libp2p-yamux v0.1.3/go.mod h1:VGSQVrqkh6y4nm0189qqxMtvyBft44MOYYPpYKXiVt4= github.com/libp2p/go-libp2p-yamux v0.2.0/go.mod h1:Db2gU+XfLpm6E4rG5uGCFX6uXA8MEXOxFcRoXUODaK8= @@ -1208,6 +1213,7 @@ github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX github.com/urfave/cli/v2 v2.0.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ= github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM= +github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30/go.mod h1:YkocrP2K2tcw938x9gCOmT5G5eCD6jsTz0SZuyAqwIE= github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= github.com/warpfork/go-wish v0.0.0-20190328234359-8b3e70f8e830/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a h1:G++j5e0OC488te356JvdhaM8YS6nMsjLAYF7JxCv07w= @@ -1312,6 +1318,7 @@ golang.org/x/crypto v0.0.0-20200602180216-279210d13fed/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= +golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210506145944-38f3c27a63bf h1:B2n+Zi5QeYRDAEodEu72OS36gmTWjgpXr2+cWcBW90o= golang.org/x/crypto v0.0.0-20210506145944-38f3c27a63bf/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= From cf45f1a8621b965eb89bebaca5574bb8cb53ca43 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 May 2021 23:56:19 -0400 Subject: [PATCH 2/3] added support for an experimental batched provider system The batched provider system is enabled when the experimental AcceleratedDHTClient is enabled There is also an `ipfs stats provide` command which gives stats about the providing/reproviding system when the batched provider system is enabled --- core/commands/commands_test.go | 1 + core/commands/stat.go | 1 + core/commands/stat_provide.go | 51 +++++++++++++++++++++++++++++ core/node/groups.go | 4 +-- core/node/provider.go | 60 +++++++++++++++++++++++++++++++--- go.mod | 2 +- go.sum | 4 +-- 7 files changed, 114 insertions(+), 9 deletions(-) create mode 100644 core/commands/stat_provide.go diff --git a/core/commands/commands_test.go b/core/commands/commands_test.go index ef106acb393..81f07c01bf4 100644 --- a/core/commands/commands_test.go +++ b/core/commands/commands_test.go @@ -210,6 +210,7 @@ func TestCommands(t *testing.T) { "/stats/bitswap", "/stats/bw", "/stats/dht", + "/stats/provide", "/stats/repo", "/swarm", "/swarm/addrs", diff --git a/core/commands/stat.go b/core/commands/stat.go index cc51d7123a7..e38b5b31b34 100644 --- a/core/commands/stat.go +++ b/core/commands/stat.go @@ -30,6 +30,7 @@ for your IPFS node.`, "repo": repoStatCmd, "bitswap": bitswapStatCmd, "dht": statDhtCmd, + "provide": statProvideCmd, }, } diff --git a/core/commands/stat_provide.go b/core/commands/stat_provide.go new file mode 100644 index 00000000000..ac02c344cd6 --- /dev/null +++ b/core/commands/stat_provide.go @@ -0,0 +1,51 @@ +package commands + +import ( + "fmt" + + cmds "github.com/ipfs/go-ipfs-cmds" + "github.com/ipfs/go-ipfs/core/commands/cmdenv" + + "github.com/ipfs/go-ipfs-provider/batched" +) + +var statProvideCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Returns statistics about the node's (re)provider system.", + ShortDescription: ` +Returns statistics about the content the node is advertising. + +This interface is not stable and may change from release to release. +`, + }, + Arguments: []cmds.Argument{}, + Options: []cmds.Option{}, + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + nd, err := cmdenv.GetNode(env) + if err != nil { + return err + } + + if !nd.IsOnline { + return ErrNotOnline + } + + sys, ok := nd.Provider.(*batched.BatchProvidingSystem) + if !ok { + return fmt.Errorf("can only return stats if Experimental.AcceleratedDHTClient is enabled") + } + + stats, err := sys.Stat(req.Context) + if err != nil { + return err + } + + if err := res.Emit(stats); err != nil { + return err + } + + return nil + }, + Encoders: cmds.EncoderMap{}, + Type: batched.BatchedProviderStats{}, +} diff --git a/core/node/groups.go b/core/node/groups.go index f55b052c380..8d37f84ce13 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -275,7 +275,7 @@ func Online(bcfg *BuildCfg, cfg *config.Config) fx.Option { fx.Provide(p2p.New), LibP2P(bcfg, cfg), - OnlineProviders(cfg.Experimental.StrategicProviding, cfg.Reprovider.Strategy, cfg.Reprovider.Interval), + OnlineProviders(cfg.Experimental.StrategicProviding, cfg.Experimental.AcceleratedDHTClient, cfg.Reprovider.Strategy, cfg.Reprovider.Interval), ) } @@ -286,7 +286,7 @@ func Offline(cfg *config.Config) fx.Option { fx.Provide(DNSResolver), fx.Provide(Namesys(0)), fx.Provide(offroute.NewOfflineRouter), - OfflineProviders(cfg.Experimental.StrategicProviding, cfg.Reprovider.Strategy, cfg.Reprovider.Interval), + OfflineProviders(cfg.Experimental.StrategicProviding, cfg.Experimental.AcceleratedDHTClient, cfg.Reprovider.Strategy, cfg.Reprovider.Interval), ) } diff --git a/core/node/provider.go b/core/node/provider.go index 52d48036efc..e865d2b5fd9 100644 --- a/core/node/provider.go +++ b/core/node/provider.go @@ -7,13 +7,16 @@ import ( "github.com/ipfs/go-ipfs-pinner" "github.com/ipfs/go-ipfs-provider" + "github.com/ipfs/go-ipfs-provider/batched" q "github.com/ipfs/go-ipfs-provider/queue" "github.com/ipfs/go-ipfs-provider/simple" ipld "github.com/ipfs/go-ipld-format" "github.com/libp2p/go-libp2p-core/routing" + "github.com/multiformats/go-multihash" "go.uber.org/fx" "github.com/ipfs/go-ipfs/core/node/helpers" + "github.com/ipfs/go-ipfs/core/node/libp2p" "github.com/ipfs/go-ipfs/repo" ) @@ -59,29 +62,78 @@ func SimpleProviderSys(isOnline bool) interface{} { } } +type provideMany interface { + ProvideMany(ctx context.Context, keys []multihash.Multihash) error + Ready() bool +} + +// BatchedProviderSys creates new provider system +func BatchedProviderSys(isOnline bool, reprovideInterval string) interface{} { + return func(lc fx.Lifecycle, cr libp2p.BaseIpfsRouting, q *q.Queue, keyProvider simple.KeyChanFunc, repo repo.Repo) (provider.System, error) { + r, ok := (cr).(provideMany) + if !ok { + return nil, fmt.Errorf("BatchedProviderSys requires a content router that supports provideMany") + } + + reprovideIntervalDuration := kReprovideFrequency + if reprovideInterval != "" { + dur, err := time.ParseDuration(reprovideInterval) + if err != nil { + return nil, err + } + + reprovideIntervalDuration = dur + } + + sys, err := batched.New(r, q, + batched.ReproviderInterval(reprovideIntervalDuration), + batched.Datastore(repo.Datastore()), + batched.KeyProvider(keyProvider)) + if err != nil { + return nil, err + } + + if isOnline { + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + sys.Run() + return nil + }, + OnStop: func(ctx context.Context) error { + return sys.Close() + }, + }) + } + + return sys, nil + } +} + // ONLINE/OFFLINE // OnlineProviders groups units managing provider routing records online -func OnlineProviders(useStrategicProviding bool, reprovideStrategy string, reprovideInterval string) fx.Option { +func OnlineProviders(useStrategicProviding bool, useBatchedProviding bool, reprovideStrategy string, reprovideInterval string) fx.Option { if useStrategicProviding { return fx.Provide(provider.NewOfflineProvider) } return fx.Options( SimpleProviders(reprovideStrategy, reprovideInterval), - fx.Provide(SimpleProviderSys(true)), + maybeProvide(SimpleProviderSys(true), !useBatchedProviding), + maybeProvide(BatchedProviderSys(true, reprovideInterval), useBatchedProviding), ) } // OfflineProviders groups units managing provider routing records offline -func OfflineProviders(useStrategicProviding bool, reprovideStrategy string, reprovideInterval string) fx.Option { +func OfflineProviders(useStrategicProviding bool, useBatchedProviding bool, reprovideStrategy string, reprovideInterval string) fx.Option { if useStrategicProviding { return fx.Provide(provider.NewOfflineProvider) } return fx.Options( SimpleProviders(reprovideStrategy, reprovideInterval), - fx.Provide(SimpleProviderSys(false)), + maybeProvide(SimpleProviderSys(false), true), + //maybeProvide(BatchedProviderSys(false, reprovideInterval), useBatchedProviding), ) } diff --git a/go.mod b/go.mod index 21388775322..99c39198158 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/ipfs/go-ipfs-keystore v0.0.2 github.com/ipfs/go-ipfs-pinner v0.1.1 github.com/ipfs/go-ipfs-posinfo v0.0.1 - github.com/ipfs/go-ipfs-provider v0.4.3 + github.com/ipfs/go-ipfs-provider v0.4.4-0.20210513014626-1c19caa05024 github.com/ipfs/go-ipfs-routing v0.1.0 github.com/ipfs/go-ipfs-util v0.0.2 github.com/ipfs/go-ipld-cbor v0.0.5 diff --git a/go.sum b/go.sum index b9772aeeef5..49d9cc81d40 100644 --- a/go.sum +++ b/go.sum @@ -436,8 +436,8 @@ github.com/ipfs/go-ipfs-posinfo v0.0.1/go.mod h1:SwyeVP+jCwiDu0C313l/8jg6ZxM0qqt github.com/ipfs/go-ipfs-pq v0.0.1/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY= github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY= github.com/ipfs/go-ipfs-pq v0.0.2/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY= -github.com/ipfs/go-ipfs-provider v0.4.3 h1:k54OHXZcFBkhL6l3GnPS9PfpaLeLqZjVASG1bgfBdfQ= -github.com/ipfs/go-ipfs-provider v0.4.3/go.mod h1:rcQBVqfblDQRk5LaCtf2uxuKxMJxvKmF5pLS0pO4au4= +github.com/ipfs/go-ipfs-provider v0.4.4-0.20210513014626-1c19caa05024 h1:eYfdZ27ogtwfnwKdfphOwcQ7PEOjKqXlWzVOakK0a60= +github.com/ipfs/go-ipfs-provider v0.4.4-0.20210513014626-1c19caa05024/go.mod h1:kUMTf1R8c+KgWUWKTGSZiXCDZWMCkxCX3wyepk0cYEA= github.com/ipfs/go-ipfs-routing v0.0.1/go.mod h1:k76lf20iKFxQTjcJokbPM9iBXVXVZhcOwc360N4nuKs= github.com/ipfs/go-ipfs-routing v0.1.0 h1:gAJTT1cEeeLj6/DlLX6t+NxD9fQe2ymTO6qWRDI/HQQ= github.com/ipfs/go-ipfs-routing v0.1.0/go.mod h1:hYoUkJLyAUKhF58tysKpids8RNDPO42BVMgK5dNsoqY= From a67cab1c60bc3165d7492355863b6dd847c4fb14 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 13 May 2021 01:05:04 -0400 Subject: [PATCH 3/3] docs: added experimental features documentation on the Accelerated DHT Client and batched providing system --- docs/experimental-features.md | 54 +++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/docs/experimental-features.md b/docs/experimental-features.md index 600bc1a0154..9b0bffa1e0f 100644 --- a/docs/experimental-features.md +++ b/docs/experimental-features.md @@ -26,6 +26,7 @@ the above issue. - [Strategic Providing](#strategic-providing) - [Graphsync](#graphsync) - [Noise](#noise) +- [Accelerated DHT Client](#accelerated-dht-client) --- @@ -547,3 +548,56 @@ ipfs config --json Experimental.GraphsyncEnabled true Stable, enabled by default [Noise](https://github.com/libp2p/specs/tree/master/noise) libp2p transport based on the [Noise Protocol Framework](https://noiseprotocol.org/noise.html). While TLS remains the default transport in go-ipfs, Noise is easier to implement and is thus the "interop" transport between IPFS and libp2p implementations. + +## Accelerated DHT Client + +### In Version + +0.9.0 + +### State + +Experimental, default-disabled. + +Utilizes an alternative DHT client that searches for and maintains more information about the network +in exchange for being more performant. + +When it is enabled: +- DHT operations should complete much faster than with it disabled +- A batching reprovider system will be enabled which takes advantage of some properties of the experimental client to + very efficiently put provider records into the network +- The standard DHT client (and server if enabled) are run alongside the alternative client +- The operations `ipfs stats dht` and `ipfs stats provide` will have different outputs + - `ipfs stats provide` only works when the accelerated DHT client is enabled and shows various statistics regarding + the provider/reprovider system + - `ipfs stats dht` will default to showing information about the new client + +**Caveats:** +1. Running the experimental client likely will result in more resource consumption (connections, RAM, CPU, bandwidth) + - Users that are limited in the number of parallel connections their machines/networks can perform will likely suffer + - Currently, the resource usage is not smooth as the client crawls the network in rounds and reproviding is similarly + done in rounds + - Users who previously had a lot of content but were unable to advertise it on the network will see an increase in + egress bandwidth as their nodes start to advertise all of their CIDs into the network. If you have lots of data + entering your node that you don't want to advertise consider using [Reprovider Strategies](config.md#reproviderstrategy) + to reduce the number of CIDs that you are reproviding. Similarly, if you are running a node that deals mostly with + short-lived temporary data (e.g. you use a separate node for ingesting data then for storing and serving it) then + you may benefit from using [Strategic Providing](#strategic-providing) to prevent advertising of data that you + ultimately will not have. +2. Currently, the DHT is not usable for queries for the first 5-10 minutes of operation as the routing table is being +prepared. This means operations like searching the DHT for particular peers or content will not work + - You can see if the DHT has been initially populated by running `ipfs stats dht` +3. Currently, the accelerated DHT client is not compatible with LAN-based DHTs and will not perform operations against +them + +### How to enable + +``` +ipfs config --json Experimental.AcceleratedDHTClient true +``` + +### Road to being a real feature + +- [ ] Needs more people to use and report on how well it works +- [ ] Should be usable for queries (even if slower/less efficient) shortly after startup +- [ ] Should be usable with non-WAN DHTs \ No newline at end of file