Skip to content

Commit

Permalink
Change IpfsDHT to KadDHT
Browse files Browse the repository at this point in the history
  • Loading branch information
jhert0 committed Dec 29, 2020
1 parent 09d923f commit 0f9a370
Show file tree
Hide file tree
Showing 17 changed files with 151 additions and 149 deletions.
104 changes: 53 additions & 51 deletions dht.go

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func GetDefaultBootstrapPeerAddrInfos() []peer.AddrInfo {

// Bootstrap tells the DHT to get into a bootstrapped state satisfying the
// IpfsRouter interface.
func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
func (dht *KadDHT) Bootstrap(ctx context.Context) error {
dht.fixRTIfNeeded()
dht.rtRefreshManager.RefreshNoWait()
return nil
Expand All @@ -67,7 +67,7 @@ func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
//
// The returned channel will block until the refresh finishes, then yield the
// error and close. The channel is buffered and safe to ignore.
func (dht *IpfsDHT) RefreshRoutingTable() <-chan error {
func (dht *KadDHT) RefreshRoutingTable() <-chan error {
return dht.rtRefreshManager.Refresh(false)
}

Expand All @@ -76,6 +76,6 @@ func (dht *IpfsDHT) RefreshRoutingTable() <-chan error {
//
// The returned channel will block until the refresh finishes, then yield the
// error and close. The channel is buffered and safe to ignore.
func (dht *IpfsDHT) ForceRefresh() <-chan error {
func (dht *KadDHT) ForceRefresh() <-chan error {
return dht.rtRefreshManager.Refresh(true)
}
16 changes: 8 additions & 8 deletions dht_bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestSelfWalkOnAddressChange(t *testing.T) {
d2 := setupDHT(ctx, t, false, DisableAutoRefresh())
d3 := setupDHT(ctx, t, false, DisableAutoRefresh())

var connectedTo *IpfsDHT
var connectedTo *KadDHT
// connect d1 to whoever is "further"
if kb.CommonPrefixLen(kb.ConvertPeerID(d1.self), kb.ConvertPeerID(d2.self)) <=
kb.CommonPrefixLen(kb.ConvertPeerID(d1.self), kb.ConvertPeerID(d3.self)) {
Expand All @@ -34,14 +34,14 @@ func TestSelfWalkOnAddressChange(t *testing.T) {
connect(t, ctx, d2, d3)

// d1 should have ONLY 1 peer in it's RT
waitForWellFormedTables(t, []*IpfsDHT{d1}, 1, 1, 2*time.Second)
waitForWellFormedTables(t, []*KadDHT{d1}, 1, 1, 2*time.Second)
require.Equal(t, connectedTo.self, d1.routingTable.ListPeers()[0])

// now emit the address change event
em, err := d1.host.EventBus().Emitter(&event.EvtLocalAddressesUpdated{})
require.NoError(t, err)
require.NoError(t, em.Emit(event.EvtLocalAddressesUpdated{}))
waitForWellFormedTables(t, []*IpfsDHT{d1}, 2, 2, 2*time.Second)
waitForWellFormedTables(t, []*KadDHT{d1}, 2, 2, 2*time.Second)
// it should now have both peers in the RT
ps := d1.routingTable.ListPeers()
require.Contains(t, ps, d2.self)
Expand Down Expand Up @@ -80,8 +80,8 @@ func TestBootstrappersReplacable(t *testing.T) {
defer d.host.Close()
defer d.Close()

var d1 *IpfsDHT
var d2 *IpfsDHT
var d1 *KadDHT
var d2 *KadDHT

// d1 & d2 have a cpl of 0
for {
Expand All @@ -108,8 +108,8 @@ func TestBootstrappersReplacable(t *testing.T) {
require.Len(t, d.routingTable.ListPeers(), 2)

// d3 & d4 with cpl=0 will go in as d1 & d2 are replacable.
var d3 *IpfsDHT
var d4 *IpfsDHT
var d3 *KadDHT
var d4 *KadDHT

for {
d3 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestBootstrappersReplacable(t *testing.T) {
time.Sleep(1 * time.Second)

// adding d5 fails because RT is frozen
var d5 *IpfsDHT
var d5 *KadDHT
for {
d5 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
if kb.CommonPrefixLen(d.selfKey, d5.selfKey) == 0 {
Expand Down
12 changes: 6 additions & 6 deletions dht_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ import (
)

// QueryFilterFunc is a filter applied when considering peers to dial when querying
type QueryFilterFunc func(dht *IpfsDHT, ai peer.AddrInfo) bool
type QueryFilterFunc func(dht *KadDHT, ai peer.AddrInfo) bool

// RouteTableFilterFunc is a filter applied when considering connections to keep in
// the local route table.
type RouteTableFilterFunc func(dht *IpfsDHT, conns []network.Conn) bool
type RouteTableFilterFunc func(dht *KadDHT, conns []network.Conn) bool

var publicCIDR6 = "2000::/3"
var public6 *net.IPNet
Expand Down Expand Up @@ -59,7 +59,7 @@ func isPrivateAddr(a ma.Multiaddr) bool {
}

// PublicQueryFilter returns true if the peer is suspected of being publicly accessible
func PublicQueryFilter(_ *IpfsDHT, ai peer.AddrInfo) bool {
func PublicQueryFilter(_ *KadDHT, ai peer.AddrInfo) bool {
if len(ai.Addrs) == 0 {
return false
}
Expand All @@ -77,7 +77,7 @@ var _ QueryFilterFunc = PublicQueryFilter

// PublicRoutingTableFilter allows a peer to be added to the routing table if the connections to that peer indicate
// that it is on a public network
func PublicRoutingTableFilter(dht *IpfsDHT, conns []network.Conn) bool {
func PublicRoutingTableFilter(dht *KadDHT, conns []network.Conn) bool {
if len(conns) == 0 {
return false
}
Expand All @@ -97,7 +97,7 @@ func PublicRoutingTableFilter(dht *IpfsDHT, conns []network.Conn) bool {
var _ RouteTableFilterFunc = PublicRoutingTableFilter

// PrivateQueryFilter doens't currently restrict which peers we are willing to query from the local DHT.
func PrivateQueryFilter(dht *IpfsDHT, ai peer.AddrInfo) bool {
func PrivateQueryFilter(dht *KadDHT, ai peer.AddrInfo) bool {
return len(ai.Addrs) > 0
}

Expand Down Expand Up @@ -137,7 +137,7 @@ func getCachedRouter() routing.Router {

// PrivateRoutingTableFilter allows a peer to be added to the routing table if the connections to that peer indicate
// that it is on a private network
func PrivateRoutingTableFilter(dht *IpfsDHT, conns []network.Conn) bool {
func PrivateRoutingTableFilter(dht *KadDHT, conns []network.Conn) bool {
router := getCachedRouter()
myAdvertisedIPs := make([]net.IP, 0)
for _, a := range dht.Host().Addrs() {
Expand Down
12 changes: 6 additions & 6 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (w *bufferedDelimitedWriter) Flush() error {
}

// handleNewStream implements the network.StreamHandler
func (dht *IpfsDHT) handleNewStream(s network.Stream) {
func (dht *KadDHT) handleNewStream(s network.Stream) {
if dht.handleNewMessage(s) {
// If we exited without error, close gracefully.
_ = s.Close()
Expand All @@ -73,7 +73,7 @@ func (dht *IpfsDHT) handleNewStream(s network.Stream) {
}

// Returns true on orderly completion of writes (so we can Close the stream).
func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
func (dht *KadDHT) handleNewMessage(s network.Stream) bool {
ctx := dht.ctx
r := msgio.NewVarintReaderSize(s, network.MessageSizeMax)

Expand Down Expand Up @@ -210,7 +210,7 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {

// sendRequest sends out a request, but also makes sure to
// measure the RTT for latency measurements.
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
func (dht *KadDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
ctx, _ = tag.New(ctx, metrics.UpsertMessageType(pmes))

ms, err := dht.messageSenderForPeer(ctx, p)
Expand Down Expand Up @@ -245,7 +245,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message
}

// sendMessage sends out a message
func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
func (dht *KadDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
ctx, _ = tag.New(ctx, metrics.UpsertMessageType(pmes))

ms, err := dht.messageSenderForPeer(ctx, p)
Expand Down Expand Up @@ -274,7 +274,7 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message
return nil
}

func (dht *IpfsDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messageSender, error) {
func (dht *KadDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messageSender, error) {
dht.smlk.Lock()
ms, ok := dht.strmap[p]
if ok {
Expand Down Expand Up @@ -311,7 +311,7 @@ type messageSender struct {
r msgio.ReadCloser
lk ctxMutex
p peer.ID
dht *IpfsDHT
dht *KadDHT

invalid bool
singleMes int
Expand Down
4 changes: 2 additions & 2 deletions dht_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ type config struct {
testAddressUpdateProcessing bool
}

func emptyQueryFilter(_ *IpfsDHT, ai peer.AddrInfo) bool { return true }
func emptyRTFilter(_ *IpfsDHT, conns []network.Conn) bool { return true }
func emptyQueryFilter(_ *KadDHT, ai peer.AddrInfo) bool { return true }
func emptyRTFilter(_ *KadDHT, conns []network.Conn) bool { return true }

// apply applies the given options to this Option
func (c *config) apply(opts ...Option) error {
Expand Down
42 changes: 21 additions & 21 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (testAtomicPutValidator) Select(_ string, bs [][]byte) (int, error) {

var testPrefix = ProtocolPrefix("/test")

func setupDHT(ctx context.Context, t *testing.T, client bool, options ...Option) *IpfsDHT {
func setupDHT(ctx context.Context, t *testing.T, client bool, options ...Option) *KadDHT {
baseOpts := []Option{
testPrefix,
NamespacedValidator("v", blankValidator{}),
Expand All @@ -127,9 +127,9 @@ func setupDHT(ctx context.Context, t *testing.T, client bool, options ...Option)
return d
}

func setupDHTS(t *testing.T, ctx context.Context, n int, options ...Option) []*IpfsDHT {
func setupDHTS(t *testing.T, ctx context.Context, n int, options ...Option) []*KadDHT {
addrs := make([]ma.Multiaddr, n)
dhts := make([]*IpfsDHT, n)
dhts := make([]*KadDHT, n)
peers := make([]peer.ID, n)

sanityAddrsMap := make(map[string]struct{})
Expand All @@ -155,7 +155,7 @@ func setupDHTS(t *testing.T, ctx context.Context, n int, options ...Option) []*I
return dhts
}

func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
func connectNoSync(t *testing.T, ctx context.Context, a, b *KadDHT) {
t.Helper()

idB := b.self
Expand All @@ -171,7 +171,7 @@ func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
}
}

func wait(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
func wait(t *testing.T, ctx context.Context, a, b *KadDHT) {
t.Helper()

// loop until connection notification has been received.
Expand All @@ -185,14 +185,14 @@ func wait(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
}
}

func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
func connect(t *testing.T, ctx context.Context, a, b *KadDHT) {
t.Helper()
connectNoSync(t, ctx, a, b)
wait(t, ctx, a, b)
wait(t, ctx, b, a)
}

func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
func bootstrap(t *testing.T, ctx context.Context, dhts []*KadDHT) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand Down Expand Up @@ -263,7 +263,7 @@ func TestValueGetSet(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var dhts [5]*IpfsDHT
var dhts [5]*KadDHT

for i := range dhts {
dhts[i] = setupDHT(ctx, t, false)
Expand Down Expand Up @@ -671,7 +671,7 @@ func TestLocalProvides(t *testing.T) {
}

// if minPeers or avgPeers is 0, dont test for it.
func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers int, timeout time.Duration) {
func waitForWellFormedTables(t *testing.T, dhts []*KadDHT, minPeers, avgPeers int, timeout time.Duration) {
// test "well-formed-ness" (>= minPeers peers in every routing table)
t.Helper()

Expand Down Expand Up @@ -709,7 +709,7 @@ func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers i
}
}

func printRoutingTables(dhts []*IpfsDHT) {
func printRoutingTables(dhts []*KadDHT) {
// the routing tables should be full now. let's inspect them.
fmt.Printf("checking routing table of %d\n", len(dhts))
for _, dht := range dhts {
Expand Down Expand Up @@ -804,7 +804,7 @@ func TestRefreshBelowMinRTThreshold(t *testing.T) {
// we ONLY init bootstrap on A
dhtA.RefreshRoutingTable()
// and wait for one round to complete i.e. A should be connected to both B & C
waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 2, 2, 20*time.Second)
waitForWellFormedTables(t, []*KadDHT{dhtA}, 2, 2, 20*time.Second)

// now we create two new peers
dhtD := setupDHT(ctx, t, false)
Expand All @@ -826,7 +826,7 @@ func TestRefreshBelowMinRTThreshold(t *testing.T) {
connect(t, ctx, dhtA, dhtD)

// and because of the above bootstrap, A also discovers E !
waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 4, 4, 20*time.Second)
waitForWellFormedTables(t, []*KadDHT{dhtA}, 4, 4, 20*time.Second)
assert.Equal(t, dhtE.self, dhtA.routingTable.Find(dhtE.self), "A's routing table should have peer E!")
}

Expand Down Expand Up @@ -907,7 +907,7 @@ func TestPeriodicRefresh(t *testing.T) {
var wg sync.WaitGroup
for _, dht := range dhts {
wg.Add(1)
go func(d *IpfsDHT) {
go func(d *KadDHT) {
<-d.RefreshRoutingTable()
wg.Done()
}(dht)
Expand Down Expand Up @@ -983,7 +983,7 @@ func TestProvidesMany(t *testing.T) {
defer cancel()

var wg sync.WaitGroup
getProvider := func(dht *IpfsDHT, k cid.Cid) {
getProvider := func(dht *KadDHT, k cid.Cid) {
defer wg.Done()

expected := providers[k]
Expand Down Expand Up @@ -1422,7 +1422,7 @@ func testFindPeerQuery(t *testing.T,
var wg sync.WaitGroup
for _, dht := range dhts {
wg.Add(1)
go func(d *IpfsDHT) {
go func(d *KadDHT) {
<-d.RefreshRoutingTable()
wg.Done()
}(dht)
Expand Down Expand Up @@ -1516,7 +1516,7 @@ func TestFixLowPeers(t *testing.T) {
require.NoError(t, mainD.Host().Connect(ctx, peer.AddrInfo{ID: d.self}))
}

waitForWellFormedTables(t, []*IpfsDHT{mainD}, minRTRefreshThreshold, minRTRefreshThreshold+4, 5*time.Second)
waitForWellFormedTables(t, []*KadDHT{mainD}, minRTRefreshThreshold, minRTRefreshThreshold+4, 5*time.Second)

// run a refresh on all of them
for _, d := range dhts {
Expand All @@ -1530,7 +1530,7 @@ func TestFixLowPeers(t *testing.T) {
}

// but we will still get enough peers in the RT because of fix low Peers
waitForWellFormedTables(t, []*IpfsDHT{mainD}, minRTRefreshThreshold, minRTRefreshThreshold, 5*time.Second)
waitForWellFormedTables(t, []*KadDHT{mainD}, minRTRefreshThreshold, minRTRefreshThreshold, 5*time.Second)
}

func TestProvideDisabled(t *testing.T) {
Expand Down Expand Up @@ -1625,19 +1625,19 @@ func TestHandleRemotePeerProtocolChanges(t *testing.T) {
connect(t, ctx, dhtA, dhtB)

// now assert both have each other in their RT
waitForWellFormedTables(t, []*IpfsDHT{dhtA, dhtB}, 1, 1, 10*time.Second)
waitForWellFormedTables(t, []*KadDHT{dhtA, dhtB}, 1, 1, 10*time.Second)

// dhtB becomes a client
require.NoError(t, dhtB.setMode(modeClient))

// which means that dhtA should evict it from it's RT
waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 0, 0, 10*time.Second)
waitForWellFormedTables(t, []*KadDHT{dhtA}, 0, 0, 10*time.Second)

// dhtB becomes a server
require.NoError(t, dhtB.setMode(modeServer))

// which means dhtA should have it in the RT again because of fixLowPeers
waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 1, 1, 10*time.Second)
waitForWellFormedTables(t, []*KadDHT{dhtA}, 1, 1, 10*time.Second)
}

func TestGetSetPluggedProtocol(t *testing.T) {
Expand Down Expand Up @@ -1876,7 +1876,7 @@ func TestV1ProtocolOverride(t *testing.T) {
d3 := setupDHT(ctx, t, false, V1ProtocolOverride("/myproto2"))
d4 := setupDHT(ctx, t, false)

dhts := []*IpfsDHT{d1, d2, d3, d4}
dhts := []*KadDHT{d1, d2, d3, d4}

for i, dout := range dhts {
for _, din := range dhts[i+1:] {
Expand Down
6 changes: 3 additions & 3 deletions dual/dual.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
// DHT implements the routing interface to provide two concrete DHT implementationts for use
// in IPFS that are used to support both global network users and disjoint LAN usecases.
type DHT struct {
WAN *dht.IpfsDHT
LAN *dht.IpfsDHT
WAN *dht.KadDHT
LAN *dht.KadDHT
}

// LanExtension is used to differentiate local protocol requests from those on the WAN DHT.
Expand Down Expand Up @@ -90,7 +90,7 @@ func DHTOption(opts ...dht.Option) Option {
}

// New creates a new DualDHT instance. Options provided are forwarded on to the two concrete
// IpfsDHT internal constructions, modulo additional options used by the Dual DHT to enforce
// KadDHT internal constructions, modulo additional options used by the Dual DHT to enforce
// the LAN-vs-WAN distinction.
// Note: query or routing table functional options provided as arguments to this function
// will be overriden by this constructor.
Expand Down
Loading

0 comments on commit 0f9a370

Please sign in to comment.