Skip to content

Commit

Permalink
Rename IpfsDHT to KadDHT
Browse files Browse the repository at this point in the history
  • Loading branch information
jhert0 committed Jan 12, 2021
1 parent 03d4b62 commit e9e07e0
Show file tree
Hide file tree
Showing 17 changed files with 142 additions and 139 deletions.
95 changes: 49 additions & 46 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,12 @@ type addPeerRTReq struct {
queryPeer bool
}

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// IpfsDHT is an alias for KadDHT.
type IpfsDHT KadDHT

// KadDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
type IpfsDHT struct {
type KadDHT struct {
host host.Host // the network services we need
self peer.ID // Local peer (yourself)
selfKey kb.ID
Expand Down Expand Up @@ -151,18 +154,18 @@ type IpfsDHT struct {
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
var (
_ routing.ContentRouting = (*IpfsDHT)(nil)
_ routing.Routing = (*IpfsDHT)(nil)
_ routing.PeerRouting = (*IpfsDHT)(nil)
_ routing.PubKeyFetcher = (*IpfsDHT)(nil)
_ routing.ValueStore = (*IpfsDHT)(nil)
_ routing.ContentRouting = (*KadDHT)(nil)
_ routing.Routing = (*KadDHT)(nil)
_ routing.PeerRouting = (*KadDHT)(nil)
_ routing.PubKeyFetcher = (*KadDHT)(nil)
_ routing.ValueStore = (*KadDHT)(nil)
)

// New creates a new DHT with the specified host and options.
// Please note that being connected to a DHT peer does not necessarily imply that it's also in the DHT Routing Table.
// If the Routing Table has more than "minRTRefreshThreshold" peers, we consider a peer as a Routing Table candidate ONLY when
// we successfully get a query response from it OR if it send us a query.
func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) {
func New(ctx context.Context, h host.Host, options ...Option) (*KadDHT, error) {
var cfg config
if err := cfg.apply(append([]Option{defaults}, options...)...); err != nil {
return nil, err
Expand Down Expand Up @@ -244,9 +247,9 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
}

// NewDHT creates a new DHT object with the given peer as the 'local' host.
// IpfsDHT's initialized with this function will respond to DHT requests,
// whereas IpfsDHT's initialized with NewDHTClient will not.
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
// KadDHT's initialized with this function will respond to DHT requests,
// whereas KadDHT's initialized with NewDHTClient will not.
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *KadDHT {
dht, err := New(ctx, h, Datastore(dstore))
if err != nil {
panic(err)
Expand All @@ -255,17 +258,17 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
}

// NewDHTClient creates a new DHT object with the given peer as the 'local'
// host. IpfsDHT clients initialized with this function will not respond to DHT
// host. KadDHT clients initialized with this function will not respond to DHT
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *KadDHT {
dht, err := New(ctx, h, Datastore(dstore), Mode(ModeClient))
if err != nil {
panic(err)
}
return dht
}

func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
func makeDHT(ctx context.Context, h host.Host, cfg config) (*KadDHT, error) {
var protocols, serverProtocols []protocol.ID

v1proto := cfg.protocolPrefix + kad1
Expand All @@ -277,7 +280,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
protocols = []protocol.ID{v1proto}
serverProtocols = []protocol.ID{v1proto}

dht := &IpfsDHT{
dht := &KadDHT{
datastore: cfg.datastore,
self: h.ID(),
selfKey: kb.ConvertPeerID(h.ID()),
Expand Down Expand Up @@ -351,7 +354,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
return dht, nil
}

func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*rtrefresh.RtRefreshManager, error) {
func makeRtRefreshManager(dht *KadDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*rtrefresh.RtRefreshManager, error) {
keyGenFnc := func(cpl uint) (string, error) {
p, err := dht.routingTable.GenRandPeerID(cpl)
return string(p), err
Expand All @@ -374,7 +377,7 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThr
return r, err
}

func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) {
func makeRoutingTable(dht *KadDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) {
// make a Routing Table Diversity Filter
var filter *peerdiversity.Filter
if dht.rtPeerDiversityFilter != nil {
Expand Down Expand Up @@ -416,16 +419,16 @@ func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThresho
}

// GetRoutingTableDiversityStats returns the diversity stats for the Routing Table.
func (dht *IpfsDHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStats {
func (dht *KadDHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStats {
return dht.routingTable.GetDiversityStats()
}

// Mode allows introspection of the operation mode of the DHT
func (dht *IpfsDHT) Mode() ModeOpt {
func (dht *KadDHT) Mode() ModeOpt {
return dht.auto
}

func (dht *IpfsDHT) populatePeers(_ goprocess.Process) {
func (dht *KadDHT) populatePeers(_ goprocess.Process) {
if !dht.disableFixLowPeers {
dht.fixLowPeers(dht.ctx)
}
Expand All @@ -442,7 +445,7 @@ func (dht *IpfsDHT) populatePeers(_ goprocess.Process) {
}

// fixLowPeersRouting manages simultaneous requests to fixLowPeers
func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
func (dht *KadDHT) fixLowPeersRoutine(proc goprocess.Process) {
ticker := time.NewTicker(periodicBootstrapInterval)
defer ticker.Stop()

Expand All @@ -460,7 +463,7 @@ func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
}

// fixLowPeers tries to get more peers into the routing table if we're below the threshold
func (dht *IpfsDHT) fixLowPeers(ctx context.Context) {
func (dht *KadDHT) fixLowPeers(ctx context.Context) {
if dht.routingTable.Size() > minRTRefreshThreshold {
return
}
Expand Down Expand Up @@ -519,7 +522,7 @@ func (dht *IpfsDHT) fixLowPeers(ctx context.Context) {

// TODO This is hacky, horrible and the programmer needs to have his mother called a hamster.
// SHOULD be removed once https://github.com/libp2p/go-libp2p/issues/800 goes in.
func (dht *IpfsDHT) persistRTPeersInPeerStore() {
func (dht *KadDHT) persistRTPeersInPeerStore() {
tickr := time.NewTicker(peerstore.RecentlyConnectedAddrTTL / 3)
defer tickr.Stop()

Expand All @@ -540,7 +543,7 @@ func (dht *IpfsDHT) persistRTPeersInPeerStore() {
//
// returns nil, nil when either nothing is found or the value found doesn't properly validate.
// returns nil, some_error when there's a *datastore* error (i.e., something goes very wrong)
func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
func (dht *KadDHT) getLocal(key string) (*recpb.Record, error) {
logger.Debugw("finding value in datastore", "key", internal.LoggableRecordKeyString(key))

rec, err := dht.getRecordFromDatastore(mkDsKey(key))
Expand All @@ -559,7 +562,7 @@ func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
}

// putLocal stores the key value pair in the datastore
func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
func (dht *KadDHT) putLocal(key string, rec *recpb.Record) error {
data, err := proto.Marshal(rec)
if err != nil {
logger.Warnw("failed to put marshal record for local put", "error", err, "key", internal.LoggableRecordKeyString(key))
Expand All @@ -569,7 +572,7 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
return dht.datastore.Put(mkDsKey(key), data)
}

func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
func (dht *KadDHT) rtPeerLoop(proc goprocess.Process) {
bootstrapCount := 0
isBootsrapping := false
var timerCh <-chan time.Time
Expand Down Expand Up @@ -626,7 +629,7 @@ func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
// LastUsefulAt remains unchanged
// If we connect to a peer we already have in the RT but do not exchange a query (rare)
// Do Nothing.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
func (dht *KadDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil {
c.Write(zap.String("peer", p.String()))
}
Expand All @@ -643,22 +646,22 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
}

// peerStoppedDHT signals the routing table that a peer is unable to responsd to DHT queries anymore.
func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
func (dht *KadDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
logger.Debugw("peer stopped dht", "peer", p)
// A peer that does not support the DHT protocol is dead for us.
// There's no point in talking to anymore till it starts supporting the DHT protocol again.
dht.routingTable.RemovePeer(p)
}

func (dht *IpfsDHT) fixRTIfNeeded() {
func (dht *KadDHT) fixRTIfNeeded() {
select {
case dht.fixLowPeersChan <- struct{}{}:
default:
}
}

// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
func (dht *KadDHT) FindLocal(id peer.ID) peer.AddrInfo {
switch dht.host.Network().Connectedness(id) {
case network.Connected, network.CanConnect:
return dht.peerstore.PeerInfo(id)
Expand All @@ -668,13 +671,13 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
}

// nearestPeersToQuery returns the routing tables closest peers.
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
func (dht *KadDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count)
return closer
}

// betterPeersToQuery returns nearestPeersToQuery with some additional filtering
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, from peer.ID, count int) []peer.ID {
func (dht *KadDHT) betterPeersToQuery(pmes *pb.Message, from peer.ID, count int) []peer.ID {
closer := dht.nearestPeersToQuery(pmes, count)

// no node? nil
Expand Down Expand Up @@ -703,7 +706,7 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, from peer.ID, count int
return filtered
}

func (dht *IpfsDHT) setMode(m mode) error {
func (dht *KadDHT) setMode(m mode) error {
dht.modeLk.Lock()
defer dht.modeLk.Unlock()

Expand All @@ -724,7 +727,7 @@ func (dht *IpfsDHT) setMode(m mode) error {
// moveToServerMode advertises (via libp2p identify updates) that we are able to respond to DHT queries and sets the appropriate stream handlers.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
func (dht *IpfsDHT) moveToServerMode() error {
func (dht *KadDHT) moveToServerMode() error {
dht.mode = modeServer
for _, p := range dht.serverProtocols {
dht.host.SetStreamHandler(p, dht.handleNewStream)
Expand All @@ -737,7 +740,7 @@ func (dht *IpfsDHT) moveToServerMode() error {
// utilizing the handled protocols.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
func (dht *IpfsDHT) moveToClientMode() error {
func (dht *KadDHT) moveToClientMode() error {
dht.mode = modeClient
for _, p := range dht.serverProtocols {
dht.host.RemoveStreamHandler(p)
Expand All @@ -760,29 +763,29 @@ func (dht *IpfsDHT) moveToClientMode() error {
return nil
}

func (dht *IpfsDHT) getMode() mode {
func (dht *KadDHT) getMode() mode {
dht.modeLk.Lock()
defer dht.modeLk.Unlock()
return dht.mode
}

// Context returns the DHT's context.
func (dht *IpfsDHT) Context() context.Context {
func (dht *KadDHT) Context() context.Context {
return dht.ctx
}

// Process returns the DHT's process.
func (dht *IpfsDHT) Process() goprocess.Process {
func (dht *KadDHT) Process() goprocess.Process {
return dht.proc
}

// RoutingTable returns the DHT's routingTable.
func (dht *IpfsDHT) RoutingTable() *kb.RoutingTable {
func (dht *KadDHT) RoutingTable() *kb.RoutingTable {
return dht.routingTable
}

// Close calls Process Close.
func (dht *IpfsDHT) Close() error {
func (dht *KadDHT) Close() error {
return dht.proc.Close()
}

Expand All @@ -791,29 +794,29 @@ func mkDsKey(s string) ds.Key {
}

// PeerID returns the DHT node's Peer ID.
func (dht *IpfsDHT) PeerID() peer.ID {
func (dht *KadDHT) PeerID() peer.ID {
return dht.self
}

// PeerKey returns a DHT key, converted from the DHT node's Peer ID.
func (dht *IpfsDHT) PeerKey() []byte {
func (dht *KadDHT) PeerKey() []byte {
return kb.ConvertPeerID(dht.self)
}

// Host returns the libp2p host this DHT is operating with.
func (dht *IpfsDHT) Host() host.Host {
func (dht *KadDHT) Host() host.Host {
return dht.host
}

// Ping sends a ping message to the passed peer and waits for a response.
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
func (dht *KadDHT) Ping(ctx context.Context, p peer.ID) error {
return dht.protoMessenger.Ping(ctx, p)
}

// newContextWithLocalTags returns a new context.Context with the InstanceID and
// PeerID keys populated. It will also take any extra tags that need adding to
// the context as tag.Mutators.
func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context {
func (dht *KadDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context {
extraTags = append(
extraTags,
tag.Upsert(metrics.KeyPeerID, dht.self.Pretty()),
Expand All @@ -826,7 +829,7 @@ func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...ta
return ctx
}

func (dht *IpfsDHT) maybeAddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
func (dht *KadDHT) maybeAddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
// Don't add addresses for self or our connected peers. We have better ones.
if p == dht.self || dht.host.Network().Connectedness(p) == network.Connected {
return
Expand Down
6 changes: 3 additions & 3 deletions dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func GetDefaultBootstrapPeerAddrInfos() []peer.AddrInfo {

// Bootstrap tells the DHT to get into a bootstrapped state satisfying the
// IpfsRouter interface.
func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
func (dht *KadDHT) Bootstrap(ctx context.Context) error {
dht.fixRTIfNeeded()
dht.rtRefreshManager.RefreshNoWait()
return nil
Expand All @@ -67,7 +67,7 @@ func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
//
// The returned channel will block until the refresh finishes, then yield the
// error and close. The channel is buffered and safe to ignore.
func (dht *IpfsDHT) RefreshRoutingTable() <-chan error {
func (dht *KadDHT) RefreshRoutingTable() <-chan error {
return dht.rtRefreshManager.Refresh(false)
}

Expand All @@ -76,6 +76,6 @@ func (dht *IpfsDHT) RefreshRoutingTable() <-chan error {
//
// The returned channel will block until the refresh finishes, then yield the
// error and close. The channel is buffered and safe to ignore.
func (dht *IpfsDHT) ForceRefresh() <-chan error {
func (dht *KadDHT) ForceRefresh() <-chan error {
return dht.rtRefreshManager.Refresh(true)
}
Loading

0 comments on commit e9e07e0

Please sign in to comment.