
Potential leak of message queues. #341

Closed
Stebalien opened this issue Apr 8, 2020 · 8 comments

Comments

@Stebalien
Member

We could just be really busy sending cancels? But it really looks like we're leaking something somewhere. Assuming each CID takes up at most 100 bytes (should take ~50 at most):

(242<<20)/100 = 2.5e6 cancels

With 5K peers, that's 500 wants per peer. That's a lot.
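For reference, here's a quick back-of-the-envelope check of that estimate; the 100 bytes per CID and the 5K peer count are the assumed inputs:

    package main

    import "fmt"

    func main() {
        const (
            heapBytes   = 242 << 20 // ~242 MiB attributed to AddCancels in the profile
            bytesPerCID = 100       // assumed worst-case bytes retained per cancelled CID
            numPeers    = 5000      // rough number of connected peers
        )
        cancels := heapBytes / bytesPerCID
        fmt.Printf("~%d cancels total, ~%d per peer\n", cancels, cancels/numPeers)
        // Prints roughly: ~2537553 cancels total, ~507 per peer
    }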

PPROF:

ROUTINE ======================== github.com/ipfs/go-bitswap/internal/peermanager.(*PeerManager).SendCancels in pkg/mod/github.com/ipfs/go-bitswap@v0.2.8/internal/peermanager/peermanager.go
         0   242.85MB (flat, cum) 26.51% of Total
         .          .    166:	defer pm.pqLk.Unlock()
         .          .    167:
         .          .    168:	// Send a CANCEL to each peer that has been sent a want-block or want-have
         .          .    169:	for p, ks := range pm.pwm.prepareSendCancels(cancelKs) {
         .          .    170:		if pqi, ok := pm.peerQueues[p]; ok {
         .   242.85MB    171:			pqi.pq.AddCancels(ks)
         .          .    172:		}
         .          .    173:	}
         .          .    174:}
         .          .    175:
         .          .    176:// CurrentWants returns the list of pending wants (both want-haves and want-blocks).
(pprof) list AddCancels
Total: 916.20MB
ROUTINE ======================== github.com/ipfs/go-bitswap/internal/messagequeue.(*MessageQueue).AddCancels in pkg/mod/github.com/ipfs/go-bitswap@v0.2.8/internal/messagequeue/messagequeue.go
         0   242.85MB (flat, cum) 26.51% of Total
         .          .    253:
         .          .    254:	// Remove keys from broadcast and peer wants, and add to cancels
         .          .    255:	for _, c := range cancelKs {
         .          .    256:		mq.bcstWants.Remove(c)
         .          .    257:		mq.peerWants.Remove(c)
         .   242.85MB    258:		mq.cancels.Add(c)
         .          .    259:	}
         .          .    260:
         .          .    261:	// Schedule a message send
         .          .    262:	mq.signalWorkReady()
         .          .    263:}
ROUTINE ======================== github.com/ipfs/go-bitswap/internal/messagequeue.(*MessageQueue).AddBroadcastWantHaves in pkg/mod/github.com/ipfs/go-bitswap@v0.2.8/internal/messagequeue/messagequeue.go
         0    45.24MB (flat, cum)  4.94% of Total
         .          .    195:
         .          .    196:	mq.wllock.Lock()
         .          .    197:	defer mq.wllock.Unlock()
         .          .    198:
         .          .    199:	for _, c := range wantHaves {
         .    45.24MB    200:		mq.bcstWants.Add(c, mq.priority, pb.Message_Wantlist_Have)
         .          .    201:		mq.priority--
         .          .    202:
         .          .    203:		// We're adding a want-have for the cid, so clear any pending cancel
         .          .    204:		// for the cid
         .          .    205:		mq.cancels.Remove(c)
ROUTINE ======================== github.com/ipfs/go-bitswap/internal/peermanager.(*PeerManager).BroadcastWantHaves in pkg/mod/github.com/ipfs/go-bitswap@v0.2.8/internal/peermanager/peermanager.go
         0    33.64MB (flat, cum)  3.67% of Total
         .          .    138:// the peer.
         .          .    139:func (pm *PeerManager) BroadcastWantHaves(ctx context.Context, wantHaves []cid.Cid) {
         .          .    140:	pm.pqLk.Lock()
         .          .    141:	defer pm.pqLk.Unlock()
         .          .    142:
         .     7.01MB    143:	for p, ks := range pm.pwm.prepareBroadcastWantHaves(wantHaves) {
         .          .    144:		if pqi, ok := pm.peerQueues[p]; ok {
         .    26.63MB    145:			pqi.pq.AddBroadcastWantHaves(ks)
         .          .    146:		}
         .          .    147:	}
         .          .    148:}
         .          .    149:
         .          .    150:// SendWants sends the given want-blocks and want-haves to the given peer.
ROUTINE ======================== github.com/ipfs/go-bitswap/internal/peermanager.(*peerWantManager).prepareBroadcastWantHaves in pkg/mod/github.com/ipfs/go-bitswap@v0.2.8/internal/peermanager/peerwantmanager.go
         0     7.01MB (flat, cum)  0.77% of Total
         .          .     73:		// Iterate over all want-haves
         .          .     74:		for _, c := range wantHaves {
         .          .     75:			// If the CID has not been sent as a want-block or want-have
         .          .     76:			if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
         .          .     77:				// Record that the CID has been sent as a want-have
         .     7.01MB     78:				pws.wantHaves.Add(c)
         .          .     79:
         .          .     80:				// Add the CID to the results
         .          .     81:				if _, ok := res[p]; !ok {
         .          .     82:					res[p] = make([]cid.Cid, 0, 1)
         .          .     83:				}
ROUTINE ======================== github.com/ipfs/go-bitswap/internal/wantmanager.(*WantManager).BroadcastWantHaves in pkg/mod/github.com/ipfs/go-bitswap@v0.2.8/internal/wantmanager/wantmanager.go
         0    33.64MB (flat, cum)  3.67% of Total
         .          .     82:
         .          .     83:	// Record broadcast wants
         .          .     84:	wm.bcwl.Add(wantHaves, ses)
         .          .     85:
         .          .     86:	// Send want-haves to all peers
         .    33.64MB     87:	wm.peerHandler.BroadcastWantHaves(ctx, wantHaves)
         .          .     88:}
         .          .     89:
         .          .     90:// RemoveSession is called when the session is shut down
         .          .     91:func (wm *WantManager) RemoveSession(ctx context.Context, ses uint64) {
         .          .     92:	// Remove session's interest in the given blocks.
@dirkmc
Contributor

dirkmc commented Apr 8, 2020

I ran one of the benchmarks on my local machine, where 3 peers fetch 1000 blocks from each other, and grepped the logs for the wants / cancels sent by the message queue. It looks like it's sending about the right number of cancels:

$ grep '"type": "WANT' /tmp/out.txt | wc -l
   11525
$ grep '"type": "CANCEL' /tmp/out.txt | wc -l
   11286

We may need to do another custom build that outputs these logs on the staging server to see if the numbers add up there.

@Stebalien
Member Author

I'm concerned that we might be leaking the queues themselves. But maybe not.

@dirkmc
Contributor

dirkmc commented Apr 8, 2020

That could happen if there are more Connect events than Disconnect events per peer that emanate from libp2p. Is that possible?

@Stebalien
Member Author

That shouldn't happen. The guarantees are:

  1. If we fire a connect event, we will fire a disconnect event.
  2. For any given connection, the disconnect event will fire after the connect event.

We can pull more stats and see if this has changed.

@dirkmc
Contributor

dirkmc commented Apr 8, 2020

This is how we handle Connected and Disconnected in PeerManager:

// Connected is called to add a new peer to the pool, and send it an initial set
// of wants.
func (pm *PeerManager) Connected(p peer.ID, initialWantHaves []cid.Cid) {
    pm.pqLk.Lock()
    defer pm.pqLk.Unlock()

    pq := pm.getOrCreate(p)
    pq.refcnt++

    // If this is the first connection to the peer
    if pq.refcnt == 1 {
        // Inform the peer want manager that there's a new peer
        pm.pwm.addPeer(p)
        // Record that the want-haves are being sent to the peer
        _, wantHaves := pm.pwm.prepareSendWants(p, nil, initialWantHaves)
        // Broadcast any live want-haves to the newly connected peers
        pq.pq.AddBroadcastWantHaves(wantHaves)
        // Inform the sessions that the peer has connected
        pm.signalAvailability(p, true)
    }
}

// Disconnected is called to remove a peer from the pool.
func (pm *PeerManager) Disconnected(p peer.ID) {
    pm.pqLk.Lock()
    defer pm.pqLk.Unlock()

    pq, ok := pm.peerQueues[p]
    if !ok {
        return
    }

    pq.refcnt--
    if pq.refcnt > 0 {
        return
    }

    // Inform the sessions that the peer has disconnected
    pm.signalAvailability(p, false)

    // Clean up the peer
    delete(pm.peerQueues, p)
    pq.pq.Shutdown()
    pm.pwm.removePeer(p)
}

When we send cancels we first check to make sure we previously sent a want to the peer:

func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid) {
    pm.pqLk.Lock()
    defer pm.pqLk.Unlock()

    // Send a CANCEL to each peer that has been sent a want-block or want-have
    for p, ks := range pm.pwm.prepareSendCancels(cancelKs) {
        if pqi, ok := pm.peerQueues[p]; ok {
            pqi.pq.AddCancels(ks)
        }
    }
}

func (pwm *peerWantManager) prepareSendCancels(cancelKs []cid.Cid) map[peer.ID][]cid.Cid {
    res := make(map[peer.ID][]cid.Cid)

    // Iterate over all known peers
    for p, pws := range pwm.peerWants {
        // Iterate over all requested cancels
        for _, c := range cancelKs {
            isWantBlock := pws.wantBlocks.Has(c)
            isWantHave := pws.wantHaves.Has(c)

            // If the CID was sent as a want-block, decrement the want-block count
            if isWantBlock {
                pwm.wantBlockGauge.Dec()
            }

            // If the CID was sent as a want-block or want-have
            if isWantBlock || isWantHave {
                // Remove the CID from the recorded want-blocks and want-haves
                pws.wantBlocks.Remove(c)
                pws.wantHaves.Remove(c)

                // Add the CID to the results
                if _, ok := res[p]; !ok {
                    res[p] = make([]cid.Cid, 0, 1)
                }
                res[p] = append(res[p], c)
            }
        }
    }

    return res
}

@Stebalien
Member Author

Definitely a leak. We're now up to ~1GiB of CIDs for cancels held in memory. I'm not seeing anything close for wants etc.

@Stebalien
Member Author

OK, forcibly disconnecting all peers fixed the issue, so we're clearly not leaking entire queues. However, we're still accumulating cancels that we should be removing.

@Stebalien
Member Author

@dirkmc has narrowed this down. It looks like we're backed up trying to send cancels to peers that aren't actually accepting our streams. That means we're:

  1. Sending wants.
  2. Sending cancels, canceling the wants but keeping the cancels.

Repeatedly... Then, in the send loop, we're really slow about actually aborting and giving up.
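To make that failure mode concrete, here's a rough sketch of how the accumulation plays out. It's a simplified, hypothetical queue (string keys instead of cid.Cid, only the standard sync package), not the real MessageQueue:

    // Adds are cheap and frequent, but each send attempt to an unresponsive
    // peer burns a long dial/stream timeout before failing, so the pending
    // cancel set grows much faster than it drains. pending and work are
    // assumed to be initialized.
    type cancelQueue struct {
        mu      sync.Mutex
        pending map[string]struct{}
        work    chan struct{}
    }

    func (q *cancelQueue) AddCancels(ks []string) {
        q.mu.Lock()
        for _, k := range ks {
            q.pending[k] = struct{}{} // keeps growing while sends fail
        }
        q.mu.Unlock()

        // Signal the send loop (non-blocking).
        select {
        case q.work <- struct{}{}:
        default:
        }
    }

    func (q *cancelQueue) runSendLoop(send func([]string) error) {
        for range q.work {
            q.mu.Lock()
            batch := make([]string, 0, len(q.pending))
            for k := range q.pending {
                batch = append(batch, k)
            }
            q.mu.Unlock()

            // For a peer that won't accept streams, this blocks until the
            // attempt times out, then fails; the batch is kept for retry.
            if err := send(batch); err != nil {
                continue
            }

            // Only on success do the cancels actually leave memory.
            q.mu.Lock()
            for _, k := range batch {
                delete(q.pending, k)
            }
            q.mu.Unlock()
        }
    }

In this shape, the pending set is only trimmed after a successful send, so every slow, failed attempt to an unresponsive peer just means more retained cancels in the meantime.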
