Skip to content

Commit

Permalink
peer: Implement stall detection.
Browse files Browse the repository at this point in the history
This commit implements stall detection logic at the peer level to detect
and disconnect peers that are either not following the protocol in
regards to expected response messages or have otherwise stalled.  This
is accomplished by setting deadlines for each message type which expects
a response and periodically checking them while properly taking into
account processing time.

There are an increasing number of nodes on the network which claim to be
full nodes, but don't actually properly implement the entire p2p
protocol even though they implement it enough to cause properly
implemented nodes to make data requests to which they never respond.

Since btcd currently only syncs new blocks via single sync peer and,
prior to this commit only had very basic stall detection, this could
lead to a situation where the block download became stalled indefinitely
due to one of these misbehaving peers.  This commit fixes that issue
since the stalled peer will now be detected and disconnected which leads
to a new sync peer being selected.

This logic will also fit nicely with the future multi-peer sync model
which is on the roadmap and for which infrastructure work is underway.

Fixes #486 and fixes #229.
  • Loading branch information
davecgh committed Oct 23, 2015
1 parent f1bd2f8 commit cbbe3a8
Showing 1 changed file with 249 additions and 15 deletions.
264 changes: 249 additions & 15 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,6 @@ const (
// MaxProtocolVersion is the max protocol version the peer supports.
MaxProtocolVersion = 70011

// BlockStallTimeout is the number of seconds we will wait for a
// "block" response after we send out a "getdata" for an announced
// block before we deem the peer inactive, and disconnect it.
BlockStallTimeout = 5 * time.Second

// outputBufferSize is the number of elements the output channels use.
outputBufferSize = 50

Expand All @@ -54,6 +49,16 @@ const (
// idleTimeout is the duration of inactivity before we time out a peer.
idleTimeout = 5 * time.Minute

// stallTickInterval is the interval of time between each check for
// stalled peers.
stallTickInterval = 15 * time.Second

// stallResponseTimeout is the base maximum amount of time messages that
// expect a response will wait before disconnecting the peer for
// stalling. The deadlines are adjusted for callback running times and
// only checked on each stall tick interval.
stallResponseTimeout = 30 * time.Second

// trickleTimeout is the duration of the ticker which trickles down the
// inventory to a peer.
trickleTimeout = 10 * time.Second
Expand Down Expand Up @@ -83,10 +88,10 @@ var (
// during peer initialization is ignored. Execution of multiple message
// listeners occurs serially, so one callback blocks the excution of the next.
//
// NOTE: Unless otherwise documented, these listeners must NOT directly call
// any blocking calls (such as WaitForShutdown) on the peer instance since the
// input handler goroutine blocks until the callback has completed. Doing so
// will result in a deadlock situation.
// NOTE: Unless otherwise documented, these listeners must NOT directly call any
// blocking calls (such as WaitForShutdown) on the peer instance since the input
// handler goroutine blocks until the callback has completed. Doing so will
// result in a deadlock.
type MessageListeners struct {
// OnGetAddr is invoked when a peer receives a getaddr bitcoin message.
OnGetAddr func(p *Peer, msg *wire.MsgGetAddr)
Expand Down Expand Up @@ -291,6 +296,32 @@ type outMsg struct {
doneChan chan struct{}
}

// stallControlCmd represents the command of a stall control message.
type stallControlCmd uint8

// Constants for the command of a stall control message.
const (
// sccSendMessage indicates a message is being sent to the remote peer.
sccSendMessage stallControlCmd = iota

// sccReceiveMessage indicates a message has been received from the
// remote peer.
sccReceiveMessage

// sccHandlerStart indicates a callback handler is about to be invoked.
sccHandlerStart

// sccHandlerStart indicates a callback handler has completed.
sccHandlerDone
)

// stallControlMsg is used to signal the stall handler about specific events
// so it can properly detect and handle stalled remote peers.
type stallControlMsg struct {
command stallControlCmd
message wire.Message
}

// stats is the collection of stats related to a peer.
type stats struct {
statsMtx sync.RWMutex // protects all statistics below here.
Expand Down Expand Up @@ -396,12 +427,16 @@ type Peer struct {
prevGetHdrsMtx sync.Mutex
prevGetHdrsBegin *wire.ShaHash
prevGetHdrsStop *wire.ShaHash
outputQueue chan outMsg
sendQueue chan outMsg
sendDoneQueue chan struct{}
outputInvChan chan *wire.InvVect
queueQuit chan struct{}
quit chan struct{}

stallControl chan stallControlMsg
outputQueue chan outMsg
sendQueue chan outMsg
sendDoneQueue chan struct{}
outputInvChan chan *wire.InvVect
inQuit chan struct{}
queueQuit chan struct{}
outQuit chan struct{}
quit chan struct{}

stats
}
Expand Down Expand Up @@ -1226,6 +1261,194 @@ func (p *Peer) shouldHandleReadError(err error) bool {
return true
}

// maybeAddDeadline potentially adds a deadline for the appropriate expected
// response for the passed wire protocol command to the pending responses map.
func (p *Peer) maybeAddDeadline(pendingResponses map[string]time.Time, msgCmd string) {
// Setup a deadline for each message being sent that expects a response.
deadline := time.Now().Add(stallResponseTimeout)
switch msgCmd {
case wire.CmdVersion:
// Expects a verack message.
pendingResponses[wire.CmdVerAck] = deadline

case wire.CmdGetAddr:
// Expects an addr message.
pendingResponses[wire.CmdAddr] = deadline

case wire.CmdPing:
// Expects a pong message in later protocol versions.
if p.ProtocolVersion() > wire.BIP0031Version {
pendingResponses[wire.CmdPong] = deadline
}

case wire.CmdMemPool:
// Expects an inv message.
pendingResponses[wire.CmdInv] = deadline

case wire.CmdGetBlocks:
// Expects an inv message.
pendingResponses[wire.CmdInv] = deadline

case wire.CmdGetData:
// Expects a block, tx, or notfound message.
pendingResponses[wire.CmdBlock] = deadline
pendingResponses[wire.CmdTx] = deadline
pendingResponses[wire.CmdNotFound] = deadline

case wire.CmdGetHeaders:
// Expects a headers message. Use a longer deadline since it
// can take a while for the remote peer to load all of the
// headers.
deadline = time.Now().Add(stallResponseTimeout * 3)
pendingResponses[wire.CmdHeaders] = deadline
}
}

// stallHandler handles stall detection for the peer. This entails keeping
// track of expected responses and assigning them deadlines while accounting for
// the time spent in callbacks. It must be run as a goroutine.
func (p *Peer) stallHandler() {
// These variables are used to adjust the deadline times forward by the
// time it takes callbacks to execute. This is done because new
// messages aren't read until the previous one is finished processing
// (which includes callbacks), so the deadline for receiving a response
// for a given message must account for the processing time as well.
var handlerActive bool
var handlersStartTime time.Time
var deadlineOffset time.Duration

// pendingResponses tracks the expected response deadline times.
pendingResponses := make(map[string]time.Time)

// stallTicker is used to periodically check pending responses that have
// exceeded the expected deadline and disconnect the peer due to
// stalling.
stallTicker := time.NewTicker(stallTickInterval)
defer stallTicker.Stop()

// ioStopped is used to detect when both the input and output handler
// goroutines are done.
var ioStopped bool
out:
for {
select {
case msg := <-p.stallControl:
switch msg.command {
case sccSendMessage:
// Add a deadline for the expected response
// message if needed.
p.maybeAddDeadline(pendingResponses,
msg.message.Command())

case sccReceiveMessage:
// Remove received messages from the expected
// reponse map. Since certain commands expect
// one of a group of responses, remove everyting
// in the expected group accordingly.
switch msgCmd := msg.message.Command(); msgCmd {
case wire.CmdBlock:
fallthrough
case wire.CmdTx:
fallthrough
case wire.CmdNotFound:
delete(pendingResponses, wire.CmdBlock)
delete(pendingResponses, wire.CmdTx)
delete(pendingResponses, wire.CmdNotFound)

default:
delete(pendingResponses, msgCmd)
}

case sccHandlerStart:
// Warn on unbalanced callback signalling.
if handlerActive {
log.Warn("Received handler start " +
"control command while a " +
"handler is already active")
continue
}

handlerActive = true
handlersStartTime = time.Now()

case sccHandlerDone:
// Warn on unbalanced callback signalling.
if !handlerActive {
log.Warn("Received handler done " +
"control command when a " +
"handler is not already active")
continue
}

// Extend active deadlines by the time it took
// to execute the callback.
duration := time.Now().Sub(handlersStartTime)
deadlineOffset += duration
handlerActive = false

default:
log.Warnf("Unsupported message command %v",
msg.command)
}

case <-stallTicker.C:
// Calculate the offset to apply to the deadline based
// on how long the handlers have taken to execute since
// the last tick.
now := time.Now()
offset := deadlineOffset
if handlerActive {
offset += now.Sub(handlersStartTime)
}

// Disconnect the peer if any of the pending responses
// don't arrive by their adjusted deadline.
for command, deadline := range pendingResponses {
if now.Before(deadline.Add(offset)) {
continue
}

log.Debugf("Peer %s appears to be stalled or "+
"misbehaving, %s timeout -- "+
"disconnecting", p, command)
p.Disconnect()
break
}

// Reset the deadline offset for the next tick.
deadlineOffset = 0

case <-p.inQuit:
// The stall handler can exit once both the input and
// output handler goroutines are done.
if ioStopped {
break out
}
ioStopped = true

case <-p.outQuit:
// The stall handler can exit once both the input and
// output handler goroutines are done.
if ioStopped {
break out
}
ioStopped = true
}
}

// Drain any wait channels before going away so there is nothing left
// waiting on this goroutine.
cleanup:
for {
select {
case <-p.stallControl:
default:
break cleanup
}
}
log.Tracef("Peer stall handler done for %s", p)
}

// inHandler handles all incoming messages for the peer. It must be run as a
// goroutine.
func (p *Peer) inHandler() {
Expand All @@ -1242,6 +1465,7 @@ func (p *Peer) inHandler() {
}
p.Disconnect()
})

out:
for atomic.LoadInt32(&p.disconnect) == 0 {
// Read a message and stop the idle timer as soon as the read
Expand Down Expand Up @@ -1286,6 +1510,7 @@ out:
p.statsMtx.Lock()
p.lastRecv = time.Now()
p.statsMtx.Unlock()
p.stallControl <- stallControlMsg{sccReceiveMessage, rmsg}

// Ensure version message comes first.
if vmsg, ok := rmsg.(*wire.MsgVersion); !ok && !p.VersionKnown() {
Expand All @@ -1300,6 +1525,7 @@ out:
}

// Handle each supported message type.
p.stallControl <- stallControlMsg{sccHandlerStart, rmsg}
switch msg := rmsg.(type) {
case *wire.MsgVersion:
p.handleVersionMsg(msg)
Expand Down Expand Up @@ -1437,6 +1663,7 @@ out:
log.Debugf("Received unhandled message of type %v:",
rmsg.Command())
}
p.stallControl <- stallControlMsg{sccHandlerDone, rmsg}

// A message was received so reset the idle timer.
idleTimer.Reset(idleTimeout)
Expand All @@ -1448,6 +1675,7 @@ out:
// Ensure connection is closed.
p.Disconnect()

close(p.inQuit)
log.Tracef("Peer input handler done for %s", p)
}

Expand Down Expand Up @@ -1609,6 +1837,7 @@ out:
}
}

p.stallControl <- stallControlMsg{sccSendMessage, msg.msg}
p.writeMessage(msg.msg)
p.statsMtx.Lock()
p.lastSend = time.Now()
Expand Down Expand Up @@ -1649,6 +1878,7 @@ cleanup:
break cleanup
}
}
close(p.outQuit)
log.Tracef("Peer output handler done for %s", p)
}

Expand Down Expand Up @@ -1751,6 +1981,7 @@ func (p *Peer) Start() error {
}

// Start processing input and output.
go p.stallHandler()
go p.inHandler()
go p.queueHandler()
go p.outHandler()
Expand Down Expand Up @@ -1790,11 +2021,14 @@ func newPeerBase(cfg *Config, inbound bool) *Peer {
p := Peer{
inbound: inbound,
knownInventory: NewMruInventoryMap(maxKnownInventory),
stallControl: make(chan stallControlMsg, 1), // nonblocking sync
outputQueue: make(chan outMsg, outputBufferSize),
sendQueue: make(chan outMsg, 1), // nonblocking sync
sendDoneQueue: make(chan struct{}, 1), // nonblocking sync
outputInvChan: make(chan *wire.InvVect, outputBufferSize),
inQuit: make(chan struct{}),
queueQuit: make(chan struct{}),
outQuit: make(chan struct{}),
quit: make(chan struct{}),
stats: stats{},
cfg: *cfg, // Copy so caller can't mutate.
Expand Down

0 comments on commit cbbe3a8

Please sign in to comment.