Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/protocols/eth: add JustifiedNumber into StatusPacket #2702

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruni
func (s *Ethereum) BloomIndexer() *core.ChainIndexer { return s.bloomIndexer }
func (s *Ethereum) Merger() *consensus.Merger { return s.merger }
func (s *Ethereum) SyncMode() downloader.SyncMode {
mode, _ := s.handler.chainSync.modeAndLocalHead()
mode, _, _ := s.handler.chainSync.modeAndLocalHead()
return mode
}

Expand Down
2 changes: 1 addition & 1 deletion eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *ty
mode := d.getMode()

// Request the advertised remote head block and wait for the response
latest, _ := p.peer.Head()
latest, _, _ := p.peer.Head()
fetch := 1
if mode == SnapSync {
fetch = 2 // head + pivot headers
Expand Down
9 changes: 5 additions & 4 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,12 @@ type downloadTesterPeer struct {
func (dlp *downloadTesterPeer) MarkLagging() {
}

// Head constructs a function to retrieve a peer's current head hash
// and total difficulty.
func (dlp *downloadTesterPeer) Head() (common.Hash, *big.Int) {
// Head constructs a function to retrieve a peer's current head hash,
// justifiedNumber and total difficulty.
func (dlp *downloadTesterPeer) Head() (common.Hash, *uint64, *big.Int) {
head := dlp.chain.CurrentBlock()
return head.Hash(), dlp.chain.GetTd(head.Hash(), head.Number.Uint64())
justifiedNumber := dlp.chain.GetJustifiedNumber(head)
return head.Hash(), &justifiedNumber, dlp.chain.GetTd(head.Hash(), head.Number.Uint64())
}

func unmarshalRlpHeaders(rlpdata []rlp.RawValue) []*types.Header {
Expand Down
2 changes: 1 addition & 1 deletion eth/downloader/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type peerConnection struct {

// Peer encapsulates the methods required to synchronise with a remote full peer.
type Peer interface {
Head() (common.Hash, *big.Int)
Head() (common.Hash, *uint64, *big.Int)
MarkLagging()
RequestHeadersByHash(common.Hash, int, int, bool, chan *eth.Response) (*eth.Request, error)
RequestHeadersByNumber(uint64, int, int, bool, chan *eth.Response) (*eth.Request, error)
Expand Down
15 changes: 8 additions & 7 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,14 +436,15 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {

// Execute the Ethereum handshake
var (
genesis = h.chain.Genesis()
head = h.chain.CurrentHeader()
hash = head.Hash()
number = head.Number.Uint64()
td = h.chain.GetTd(hash, number)
genesis = h.chain.Genesis()
head = h.chain.CurrentHeader()
justifiedNumber = h.chain.GetJustifiedNumber(head)
hash = head.Hash()
number = head.Number.Uint64()
td = h.chain.GetTd(hash, number)
)
forkID := forkid.NewID(h.chain.Config(), genesis, number, head.Time)
if err := peer.Handshake(h.networkID, td, hash, genesis.Hash(), forkID, h.forkFilter, &eth.UpgradeStatusExtension{DisablePeerTxBroadcast: h.disablePeerTxBroadcast}); err != nil {
if err := peer.Handshake(h.networkID, justifiedNumber, td, hash, genesis.Hash(), forkID, h.forkFilter, &eth.UpgradeStatusExtension{DisablePeerTxBroadcast: h.disablePeerTxBroadcast}); err != nil {
peer.Log().Debug("Ethereum handshake failed", "err", err)
return err
}
Expand Down Expand Up @@ -920,7 +921,7 @@ func (h *handler) BroadcastVote(vote *types.VoteEnvelope) {
headBlock := h.chain.CurrentBlock()
currentTD := h.chain.GetTd(headBlock.Hash(), headBlock.Number.Uint64())
for _, peer := range peers {
_, peerTD := peer.Head()
_, _, peerTD := peer.Head()
deltaTD := new(big.Int).Abs(new(big.Int).Sub(currentTD, peerTD))
if deltaTD.Cmp(big.NewInt(deltaTdThreshold)) < 1 && peer.bscExt != nil {
voteMap[peer] = vote
Expand Down
4 changes: 2 additions & 2 deletions eth/handler_bsc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func testSendVotes(t *testing.T, protocol uint) {
td = handler.chain.GetTd(head.Hash(), head.Number.Uint64())
)
time.Sleep(200 * time.Millisecond)
if err := remoteEth.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
if err := remoteEth.Handshake(1, 0, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake: %d", err)
}
// After the handshake completes, the source handler should stream the sink
Expand Down Expand Up @@ -227,7 +227,7 @@ func testRecvVotes(t *testing.T, protocol uint) {
td = handler.chain.GetTd(head.Hash(), head.Number.Uint64())
)
time.Sleep(200 * time.Millisecond)
if err := remoteEth.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
if err := remoteEth.Handshake(1, 0, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake: %d", err)
}

Expand Down
38 changes: 32 additions & 6 deletions eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
)

Expand Down Expand Up @@ -135,13 +136,38 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, packet *eth.NewBlockPa
// Assuming the block is importable by the peer, but possibly not yet done so,
// calculate the head hash and TD that the peer truly must have.
var (
trueHead = block.ParentHash()
trueTD = new(big.Int).Sub(td, block.Difficulty())
trueHead = block.ParentHash()
trueJustifiedNumber *uint64
trueTD = new(big.Int).Sub(td, block.Difficulty())
)
// Update the peer's total difficulty if better than the previous
if _, td := peer.Head(); trueTD.Cmp(td) > 0 {
peer.SetHead(trueHead, trueTD)
h.chainSync.handlePeerEvent()
if trueHeadHeader := h.chain.GetHeaderByHash(trueHead); trueHeadHeader != nil {
// If the trueHeadHeader is not found in the local chain, GetJustifiedNumber will return 0.
// Ignore cases where GetJustifiedNumber actually returns 0, as this only occurs in a self-test environment.
if tmp := h.chain.GetJustifiedNumber(trueHeadHeader); tmp != 0 {
trueJustifiedNumber = &tmp
}
}
// Update the peer's justifiedNumber and total difficulty if better than the previous
if _, justifiedNumber, td := peer.Head(); trueJustifiedNumber != nil && justifiedNumber != nil {
if *trueJustifiedNumber > *justifiedNumber ||
(*trueJustifiedNumber == *justifiedNumber && trueTD.Cmp(td) > 0) {
peer.SetHead(trueHead, trueJustifiedNumber, trueTD)
h.chainSync.handlePeerEvent()
log.Trace("handleBlockBroadcast|SetHead", "justifiedNumber", *justifiedNumber, "td", td.Uint64(), "trueHead", trueHead, "trueJustifiedNumber", *trueJustifiedNumber, "trueTD", trueTD.Uint64())
}
} else {
// back to behavior without fast finality
if trueTD.Cmp(td) > 0 {
peer.SetHead(trueHead, trueJustifiedNumber, trueTD)
h.chainSync.handlePeerEvent()

if trueJustifiedNumber == nil {
log.Trace("handleBlockBroadcast|SetHead", "trueHead", trueHead, "trueJustifiedNumber", nil, "trueTD", trueTD.Uint64(), "td", td.Uint64())
} else {
log.Trace("handleBlockBroadcast|SetHead", "trueHead", trueHead, "trueJustifiedNumber", *trueJustifiedNumber, "trueTD", trueTD.Uint64(), "td", td.Uint64())
}
}
}

return nil
}
10 changes: 5 additions & 5 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func testRecvTransactions(t *testing.T, protocol uint) {
head = handler.chain.CurrentBlock()
td = handler.chain.GetTd(head.Hash(), head.Number.Uint64())
)
if err := src.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
if err := src.Handshake(1, 0, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// Send the transaction to the sink and verify that it's added to the tx pool
Expand Down Expand Up @@ -419,7 +419,7 @@ func testSendTransactions(t *testing.T, protocol uint) {
head = handler.chain.CurrentBlock()
td = handler.chain.GetTd(head.Hash(), head.Number.Uint64())
)
if err := sink.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
if err := sink.Handshake(1, 0, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// After the handshake completes, the source handler should stream the sink
Expand Down Expand Up @@ -636,7 +636,7 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) {
// Wait a bit for the above handlers to start
time.Sleep(100 * time.Millisecond)

if err := sinkPeer.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), nil); err != nil {
if err := sinkPeer.Handshake(1, 0, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
go eth.Handle(sink, sinkPeer)
Expand Down Expand Up @@ -709,7 +709,7 @@ func testBroadcastMalformedBlock(t *testing.T, protocol uint) {
genesis = source.chain.Genesis()
td = source.chain.GetTd(genesis.Hash(), genesis.NumberU64())
)
if err := sink.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), nil); err != nil {
if err := sink.Handshake(1, 0, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// After the handshake completes, the source handler should stream the sink
Expand Down Expand Up @@ -811,7 +811,7 @@ func TestOptionMaxPeersPerIP(t *testing.T) {
t.Errorf("current num is %d, maxPeersPerIP is %d, but failed:%s", num, maxPeersPerIP, err)
}(tryNum)

if err := src.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
if err := src.Handshake(1, 0, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// make sure runEthPeer execute one by one.
Expand Down
49 changes: 41 additions & 8 deletions eth/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ import (
"sync"
"time"

"github.com/cometbft/cometbft/libs/rand"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/eth/protocols/bsc"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
"github.com/ethereum/go-ethereum/eth/protocols/trust"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
)

Expand Down Expand Up @@ -515,24 +517,55 @@ func (ps *peerSet) snapLen() int {
return ps.snapPeers
}

// peerWithHighestTD retrieves the known peer with the currently highest total
// difficulty, but below the given PoS switchover threshold.
func (ps *peerSet) peerWithHighestTD() *eth.Peer {
// peerWithHighestHead retrieves the known peer with the currently highest head
func (ps *peerSet) peerWithHighestHead() *eth.Peer {
ps.lock.RLock()
defer ps.lock.RUnlock()

var (
bestPeer *eth.Peer
bestTd *big.Int
)
var knowJustifiedPeers, notKnowJustifiedPeers []*ethPeer
for _, p := range ps.peers {
if p.Lagging() {
continue
}
if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 {
if _, justifiedNumber, td := p.Head(); justifiedNumber != nil {
knowJustifiedPeers = append(knowJustifiedPeers, p)
log.Trace("peerWithHighestHead", "id", p.Peer.ID(), "justifiedNumber", *justifiedNumber, "td", td.Uint64())
} else {
notKnowJustifiedPeers = append(notKnowJustifiedPeers, p)
log.Trace("peerWithHighestHead", "id", p.Peer.ID(), "td", td.Uint64())
}
}

var (
bestPeer *eth.Peer
bestJustified *uint64
bestTd *big.Int
randUint = rand.Uint()
)
for _, p := range knowJustifiedPeers {
if _, justifiedNumber, td := p.Head(); bestPeer == nil {
bestPeer, bestJustified, bestTd = p.Peer, justifiedNumber, td
} else if *justifiedNumber > *bestJustified {
bestPeer, bestJustified = p.Peer, justifiedNumber
if td.Cmp(bestTd) > 0 {
bestTd = td // may be not equal `to bestPeer.td`
}
} else if *justifiedNumber == *bestJustified {
if td.Cmp(bestTd) > 0 || (td.Cmp(bestTd) == 0 && randUint%2 == 0) {
bestPeer, bestTd = p.Peer, td
}
}
}
// if some nodes does not have justified number, back to behavior without fast finality
for _, p := range notKnowJustifiedPeers {
if _, _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 || (td.Cmp(bestTd) == 0 && randUint%2 == 0) {
buddh0 marked this conversation as resolved.
Show resolved Hide resolved
bestPeer, bestTd = p.Peer, td
}
}
if bestPeer != nil {
log.Trace("peerWithHighestHead|selected", "id", bestPeer.Peer.ID(), "td", bestTd.Uint64())
}

return bestPeer
}

Expand Down
12 changes: 9 additions & 3 deletions eth/protocols/eth/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ const (
)

// Handshake executes the eth protocol handshake, negotiating version number,
// network IDs, difficulties, head and genesis blocks.
func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis common.Hash, forkID forkid.ID, forkFilter forkid.Filter, extension *UpgradeStatusExtension) error {
// network IDs, justifiedNumber, difficulties, head and genesis blocks.
func (p *Peer) Handshake(network uint64, justifiedNumber uint64, td *big.Int, head common.Hash, genesis common.Hash, forkID forkid.ID, forkFilter forkid.Filter, extension *UpgradeStatusExtension) error {
// Send out own handshake in a new thread
errc := make(chan error, 2)

Expand All @@ -51,6 +51,8 @@ func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis
Head: head,
Genesis: genesis,
ForkID: forkID,
// Step 2
buddh0 marked this conversation as resolved.
Show resolved Hide resolved
// JustifiedNumber: &justifiedNumber,
})
})
gopool.Submit(func() {
Expand All @@ -70,7 +72,11 @@ func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis
return p2p.DiscReadTimeout
}
}
p.td, p.head = status.TD, status.Head
p.td, p.justifiedNumber, p.head = status.TD, status.JustifiedNumber, status.Head
// Step 3
// if p.justifiedNumber == nil {
// return errors.New("nil justifiedNumber when Handshake")
// }

if p.version >= ETH68 {
var upgradeStatus UpgradeStatusPacket // safe to read after two values have been received from errc
Expand Down
10 changes: 5 additions & 5 deletions eth/protocols/eth/handshake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,19 @@ func testHandshake(t *testing.T, protocol uint) {
want: errNoStatusMsg,
},
{
code: StatusMsg, data: StatusPacket{10, 1, td, head.Hash(), genesis.Hash(), forkID},
code: StatusMsg, data: StatusPacket{10, 1, td, head.Hash(), genesis.Hash(), forkID, nil},
want: errProtocolVersionMismatch,
},
{
code: StatusMsg, data: StatusPacket{uint32(protocol), 999, td, head.Hash(), genesis.Hash(), forkID},
code: StatusMsg, data: StatusPacket{uint32(protocol), 999, td, head.Hash(), genesis.Hash(), forkID, nil},
want: errNetworkIDMismatch,
},
{
code: StatusMsg, data: StatusPacket{uint32(protocol), 1, td, head.Hash(), common.Hash{3}, forkID},
code: StatusMsg, data: StatusPacket{uint32(protocol), 1, td, head.Hash(), common.Hash{3}, forkID, nil},
want: errGenesisMismatch,
},
{
code: StatusMsg, data: StatusPacket{uint32(protocol), 1, td, head.Hash(), genesis.Hash(), forkid.ID{Hash: [4]byte{0x00, 0x01, 0x02, 0x03}}},
code: StatusMsg, data: StatusPacket{uint32(protocol), 1, td, head.Hash(), genesis.Hash(), forkid.ID{Hash: [4]byte{0x00, 0x01, 0x02, 0x03}}, nil},
want: errForkIDRejected,
},
}
Expand All @@ -80,7 +80,7 @@ func testHandshake(t *testing.T, protocol uint) {
// Send the junk test with one peer, check the handshake failure
go p2p.Send(app, test.code, test.data)

err := peer.Handshake(1, td, head.Hash(), genesis.Hash(), forkID, forkid.NewFilter(backend.chain), nil)
err := peer.Handshake(1, 0, td, head.Hash(), genesis.Hash(), forkID, forkid.NewFilter(backend.chain), nil)
if err == nil {
t.Errorf("test %d: protocol returned nil error, want %q", i, test.want)
} else if !errors.Is(err, test.want) {
Expand Down
26 changes: 18 additions & 8 deletions eth/protocols/eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ type Peer struct {
version uint // Protocol version negotiated
statusExtension *UpgradeStatusExtension

lagging bool // lagging peer is still connected, but won't be used to sync.
head common.Hash // Latest advertised head block hash
td *big.Int // Latest advertised head block total difficulty
lagging bool // lagging peer is still connected, but won't be used to sync.
head common.Hash // Latest advertised head block hash
justifiedNumber *uint64 // Latest advertised justified block number
td *big.Int // Latest advertised head block total difficulty

knownBlocks *knownCache // Set of block hashes known to be known by this peer
queuedBlocks chan *blockPropagation // Queue of blocks to broadcast to the peer
Expand Down Expand Up @@ -160,21 +161,30 @@ func (p *Peer) MarkLagging() {
p.lagging = true
}

// Head retrieves the current head hash and total difficulty of the peer.
func (p *Peer) Head() (hash common.Hash, td *big.Int) {
// Head retrieves the current head hash, justifiedNumber and total difficulty of the peer.
func (p *Peer) Head() (hash common.Hash, justifiedNumber *uint64, td *big.Int) {
p.lock.RLock()
defer p.lock.RUnlock()

copy(hash[:], p.head[:])
return hash, new(big.Int).Set(p.td)
if p.justifiedNumber != nil {
tmp := *p.justifiedNumber
justifiedNumber = &tmp
}
return hash, justifiedNumber, new(big.Int).Set(p.td)
}

// SetHead updates the head hash and total difficulty of the peer.
func (p *Peer) SetHead(hash common.Hash, td *big.Int) {
// SetHead updates the head hash, justifiedNumber and total difficulty of the peer.
func (p *Peer) SetHead(hash common.Hash, justifiedNumber *uint64, td *big.Int) {
p.lock.Lock()
defer p.lock.Unlock()
p.lagging = false
copy(p.head[:], hash[:])
p.justifiedNumber = nil
if justifiedNumber != nil {
tmp := *justifiedNumber
p.justifiedNumber = &tmp
}
p.td.Set(td)
}

Expand Down
7 changes: 7 additions & 0 deletions eth/protocols/eth/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ type StatusPacket struct {
Head common.Hash
Genesis common.Hash
ForkID forkid.ID

// step 1: add the optional `JustifiedNumber` in client
// step 2: after one hard fork, all nodes can pase new `StatusPacket`, then pass `JustifiedNumber` in StatusMsg in a maintenance release
// step 3: after another hard fork, all nodes send StatusMsg with `JustifiedNumber` now, then refuse StatusMsg without `JustifiedNumber` in a maintenance release

// Step 1
JustifiedNumber *uint64 `rlp:"optional"`
}

type UpgradeStatusExtension struct {
Expand Down
Loading
Loading