Skip to content

Commit

Permalink
refactor observed address manager to do mapping at thin waist layer
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Marco Munizaga <git@marcopolo.io>
  • Loading branch information
sukunrt and MarcoPolo committed May 15, 2024
1 parent a86d94e commit 45b6ccd
Show file tree
Hide file tree
Showing 8 changed files with 1,372 additions and 1,011 deletions.
1 change: 1 addition & 0 deletions core/peerstore/peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
RecentlyConnectedAddrTTL = time.Minute * 30

// OwnObservedAddrTTL is used for our own external addresses observed by peers.
// Deprecated: observed addresses are maintained till we disconnect from the peer which provided it
OwnObservedAddrTTL = time.Minute * 30
)

Expand Down
58 changes: 47 additions & 11 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ type idService struct {
addrMu sync.Mutex

// our own observed addresses.
observedAddrs *ObservedAddrManager
observedAddrMgr *ObservedAddrManager
disableObservedAddrManager bool

emitters struct {
evtPeerProtocolsUpdated event.Emitter
Expand All @@ -171,6 +172,12 @@ type idService struct {
sync.Mutex
snapshot identifySnapshot
}

natEmitter *natEmitter
}

type normalizer interface {
NormalizeMultiaddr(ma.Multiaddr) ma.Multiaddr
}

// NewIDService constructs a new *idService and activates it by
Expand Down Expand Up @@ -199,11 +206,27 @@ func NewIDService(h host.Host, opts ...Option) (*idService, error) {
metricsTracer: cfg.metricsTracer,
}

observedAddrs, err := NewObservedAddrManager(h)
if err != nil {
return nil, fmt.Errorf("failed to create observed address manager: %s", err)
var normalize func(ma.Multiaddr) ma.Multiaddr
if hn, ok := h.(normalizer); ok {
normalize = hn.NormalizeMultiaddr
}

var err error
if cfg.disableObservedAddrManager {
s.disableObservedAddrManager = true
} else {
observedAddrs, err := NewObservedAddrManager(h.Network().ListenAddresses,
h.Addrs, h.Network().InterfaceListenAddresses, normalize)
if err != nil {
return nil, fmt.Errorf("failed to create observed address manager: %s", err)
}
natEmitter, err := newNATEmitter(h, observedAddrs, time.Minute)
if err != nil {
return nil, fmt.Errorf("failed to create nat emitter: %s", err)
}
s.natEmitter = natEmitter
s.observedAddrMgr = observedAddrs
}
s.observedAddrs = observedAddrs

s.emitters.evtPeerProtocolsUpdated, err = h.EventBus().Emitter(&event.EvtPeerProtocolsUpdated{})
if err != nil {
Expand Down Expand Up @@ -341,17 +364,26 @@ func (ids *idService) sendPushes(ctx context.Context) {
// Close shuts down the idService
func (ids *idService) Close() error {
ids.ctxCancel()
ids.observedAddrs.Close()
if !ids.disableObservedAddrManager {
ids.observedAddrMgr.Close()
ids.natEmitter.Close()
}
ids.refCount.Wait()
return nil
}

func (ids *idService) OwnObservedAddrs() []ma.Multiaddr {
return ids.observedAddrs.Addrs()
if ids.disableObservedAddrManager {
return nil
}
return ids.observedAddrMgr.Addrs()
}

func (ids *idService) ObservedAddrsFor(local ma.Multiaddr) []ma.Multiaddr {
return ids.observedAddrs.AddrsFor(local)
if ids.disableObservedAddrManager {
return nil
}
return ids.observedAddrMgr.AddrsFor(local)
}

// IdentifyConn runs the Identify protocol on a connection.
Expand Down Expand Up @@ -715,9 +747,9 @@ func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bo
obsAddr = nil
}

if obsAddr != nil {
if obsAddr != nil && !ids.disableObservedAddrManager {
// TODO refactor this to use the emitted events instead of having this func call explicitly.
ids.observedAddrs.Record(c, obsAddr)
ids.observedAddrMgr.Record(c, obsAddr)
}

// mes.ListenAddrs
Expand Down Expand Up @@ -981,15 +1013,19 @@ func (nn *netNotifiee) Disconnected(_ network.Network, c network.Conn) {
delete(ids.conns, c)
ids.connsMu.Unlock()

if !ids.disableObservedAddrManager {
ids.observedAddrMgr.removeConn(c)
}

switch ids.Host.Network().Connectedness(c.RemotePeer()) {
case network.Connected, network.Limited:
return
}
// Last disconnect.
// Undo the setting of addresses to peer.ConnectedAddrTTL we did
ids.addrMu.Lock()
defer ids.addrMu.Unlock()
ids.Host.Peerstore().UpdateAddrs(c.RemotePeer(), peerstore.ConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL)
ids.addrMu.Unlock()
}

func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {}
Expand Down
204 changes: 108 additions & 96 deletions p2p/protocol/identify/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,104 +107,116 @@ func emitAddrChangeEvt(t *testing.T, h host.Host) {
// this is because it used to be concurrent. Now, Dial wait till the
// id service is done.
func TestIDService(t *testing.T) {
if race.WithRace() {
t.Skip("This test modifies peerstore.RecentlyConnectedAddrTTL, which is racy.")
}
// This test is highly timing dependent, waiting on timeouts/expiration.
oldTTL := peerstore.RecentlyConnectedAddrTTL
peerstore.RecentlyConnectedAddrTTL = 500 * time.Millisecond
t.Cleanup(func() { peerstore.RecentlyConnectedAddrTTL = oldTTL })

clk := mockClock.NewMock()
swarm1 := swarmt.GenSwarm(t, swarmt.WithClock(clk))
swarm2 := swarmt.GenSwarm(t, swarmt.WithClock(clk))
h1 := blhost.NewBlankHost(swarm1)
h2 := blhost.NewBlankHost(swarm2)

h1p := h1.ID()
h2p := h2.ID()

ids1, err := identify.NewIDService(h1)
require.NoError(t, err)
defer ids1.Close()
ids1.Start()

ids2, err := identify.NewIDService(h2)
require.NoError(t, err)
defer ids2.Close()
ids2.Start()

sub, err := ids1.Host.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted))
if err != nil {
t.Fatal(err)
}

testKnowsAddrs(t, h1, h2p, []ma.Multiaddr{}) // nothing
testKnowsAddrs(t, h2, h1p, []ma.Multiaddr{}) // nothing

// the forgetMe addr represents an address for h1 that h2 has learned out of band
// (not via identify protocol). During the identify exchange, it will be
// forgotten and replaced by the addrs h1 sends.
forgetMe, _ := ma.NewMultiaddr("/ip4/1.2.3.4/tcp/1234")

h2.Peerstore().AddAddr(h1p, forgetMe, peerstore.RecentlyConnectedAddrTTL)
h2pi := h2.Peerstore().PeerInfo(h2p)
require.NoError(t, h1.Connect(context.Background(), h2pi))

h1t2c := h1.Network().ConnsToPeer(h2p)
require.NotEmpty(t, h1t2c, "should have a conn here")

ids1.IdentifyConn(h1t2c[0])

// the idService should be opened automatically, by the network.
// what we should see now is that both peers know about each others listen addresses.
t.Log("test peer1 has peer2 addrs correctly")
testKnowsAddrs(t, h1, h2p, h2.Addrs()) // has them
testHasAgentVersion(t, h1, h2p)
testHasPublicKey(t, h1, h2p, h2.Peerstore().PubKey(h2p)) // h1 should have h2's public key

// now, this wait we do have to do. it's the wait for the Listening side
// to be done identifying the connection.
c := h2.Network().ConnsToPeer(h1.ID())
require.NotEmpty(t, c, "should have connection by now at least.")
ids2.IdentifyConn(c[0])

// and the protocol versions.
t.Log("test peer2 has peer1 addrs correctly")
testKnowsAddrs(t, h2, h1p, h1.Addrs()) // has them
testHasAgentVersion(t, h2, h1p)
testHasPublicKey(t, h2, h1p, h1.Peerstore().PubKey(h1p)) // h1 should have h2's public key

// Need both sides to actually notice that the connection has been closed.
sentDisconnect1 := waitForDisconnectNotification(swarm1)
sentDisconnect2 := waitForDisconnectNotification(swarm2)
h1.Network().ClosePeer(h2p)
h2.Network().ClosePeer(h1p)
if len(h2.Network().ConnsToPeer(h1.ID())) != 0 || len(h1.Network().ConnsToPeer(h2.ID())) != 0 {
t.Fatal("should have no connections")
}

t.Log("testing addrs just after disconnect")
// addresses don't immediately expire on disconnect, so we should still have them
testKnowsAddrs(t, h2, h1p, h1.Addrs())
testKnowsAddrs(t, h1, h2p, h2.Addrs())

<-sentDisconnect1
<-sentDisconnect2
for _, withObsAddrManager := range []bool{false, true} {
t.Run(fmt.Sprintf("withObsAddrManager=%t", withObsAddrManager), func(t *testing.T) {
if race.WithRace() {
t.Skip("This test modifies peerstore.RecentlyConnectedAddrTTL, which is racy.")
}
// This test is highly timing dependent, waiting on timeouts/expiration.
oldTTL := peerstore.RecentlyConnectedAddrTTL
peerstore.RecentlyConnectedAddrTTL = 500 * time.Millisecond
t.Cleanup(func() { peerstore.RecentlyConnectedAddrTTL = oldTTL })

clk := mockClock.NewMock()
swarm1 := swarmt.GenSwarm(t, swarmt.WithClock(clk))
swarm2 := swarmt.GenSwarm(t, swarmt.WithClock(clk))
h1 := blhost.NewBlankHost(swarm1)
h2 := blhost.NewBlankHost(swarm2)

h1p := h1.ID()
h2p := h2.ID()

opts := []identify.Option{}
if !withObsAddrManager {
opts = append(opts, identify.DisableObservedAddrManager())
}
ids1, err := identify.NewIDService(h1, opts...)
require.NoError(t, err)
defer ids1.Close()
ids1.Start()

opts = []identify.Option{}
if !withObsAddrManager {
opts = append(opts, identify.DisableObservedAddrManager())
}
ids2, err := identify.NewIDService(h2, opts...)
require.NoError(t, err)
defer ids2.Close()
ids2.Start()

sub, err := ids1.Host.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted))
if err != nil {
t.Fatal(err)
}

// the addrs had their TTLs reduced on disconnect, and
// will be forgotten soon after
t.Log("testing addrs after TTL expiration")
clk.Add(time.Second)
testKnowsAddrs(t, h1, h2p, []ma.Multiaddr{})
testKnowsAddrs(t, h2, h1p, []ma.Multiaddr{})
testKnowsAddrs(t, h1, h2p, []ma.Multiaddr{}) // nothing
testKnowsAddrs(t, h2, h1p, []ma.Multiaddr{}) // nothing

// the forgetMe addr represents an address for h1 that h2 has learned out of band
// (not via identify protocol). During the identify exchange, it will be
// forgotten and replaced by the addrs h1 sends.
forgetMe, _ := ma.NewMultiaddr("/ip4/1.2.3.4/tcp/1234")

h2.Peerstore().AddAddr(h1p, forgetMe, peerstore.RecentlyConnectedAddrTTL)
h2pi := h2.Peerstore().PeerInfo(h2p)
require.NoError(t, h1.Connect(context.Background(), h2pi))

h1t2c := h1.Network().ConnsToPeer(h2p)
require.NotEmpty(t, h1t2c, "should have a conn here")

ids1.IdentifyConn(h1t2c[0])

// the idService should be opened automatically, by the network.
// what we should see now is that both peers know about each others listen addresses.
t.Log("test peer1 has peer2 addrs correctly")
testKnowsAddrs(t, h1, h2p, h2.Addrs()) // has them
testHasAgentVersion(t, h1, h2p)
testHasPublicKey(t, h1, h2p, h2.Peerstore().PubKey(h2p)) // h1 should have h2's public key

// now, this wait we do have to do. it's the wait for the Listening side
// to be done identifying the connection.
c := h2.Network().ConnsToPeer(h1.ID())
require.NotEmpty(t, c, "should have connection by now at least.")
ids2.IdentifyConn(c[0])

// and the protocol versions.
t.Log("test peer2 has peer1 addrs correctly")
testKnowsAddrs(t, h2, h1p, h1.Addrs()) // has them
testHasAgentVersion(t, h2, h1p)
testHasPublicKey(t, h2, h1p, h1.Peerstore().PubKey(h1p)) // h1 should have h2's public key

// Need both sides to actually notice that the connection has been closed.
sentDisconnect1 := waitForDisconnectNotification(swarm1)
sentDisconnect2 := waitForDisconnectNotification(swarm2)
h1.Network().ClosePeer(h2p)
h2.Network().ClosePeer(h1p)
if len(h2.Network().ConnsToPeer(h1.ID())) != 0 || len(h1.Network().ConnsToPeer(h2.ID())) != 0 {
t.Fatal("should have no connections")
}

// test that we received the "identify completed" event.
select {
case evtAny := <-sub.Out():
assertCorrectEvtPeerIdentificationCompleted(t, evtAny, h2)
case <-time.After(3 * time.Second):
t.Fatalf("expected EvtPeerIdentificationCompleted event within 10 seconds; none received")
t.Log("testing addrs just after disconnect")
// addresses don't immediately expire on disconnect, so we should still have them
testKnowsAddrs(t, h2, h1p, h1.Addrs())
testKnowsAddrs(t, h1, h2p, h2.Addrs())

<-sentDisconnect1
<-sentDisconnect2

// the addrs had their TTLs reduced on disconnect, and
// will be forgotten soon after
t.Log("testing addrs after TTL expiration")
clk.Add(time.Second)
testKnowsAddrs(t, h1, h2p, []ma.Multiaddr{})
testKnowsAddrs(t, h2, h1p, []ma.Multiaddr{})

// test that we received the "identify completed" event.
select {
case evtAny := <-sub.Out():
assertCorrectEvtPeerIdentificationCompleted(t, evtAny, h2)
case <-time.After(3 * time.Second):
t.Fatalf("expected EvtPeerIdentificationCompleted event within 10 seconds; none received")
}
})
}
}

Expand Down
Loading

0 comments on commit 45b6ccd

Please sign in to comment.