diff --git a/eth/backend.go b/eth/backend.go index 0d434a9130..1cf5e33cbf 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -638,7 +638,7 @@ func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruni func (s *Ethereum) BloomIndexer() *core.ChainIndexer { return s.bloomIndexer } func (s *Ethereum) Merger() *consensus.Merger { return s.merger } func (s *Ethereum) SyncMode() downloader.SyncMode { - mode, _ := s.handler.chainSync.modeAndLocalHead() + mode, _, _ := s.handler.chainSync.modeAndLocalHead() return mode } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 8338fd9316..5eaeb1c39c 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -680,7 +680,7 @@ func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *ty mode := d.getMode() // Request the advertised remote head block and wait for the response - latest, _ := p.peer.Head() + latest, _, _ := p.peer.Head() fetch := 1 if mode == SnapSync { fetch = 2 // head + pivot headers diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 0a007644d2..9d3782a682 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -156,11 +156,12 @@ type downloadTesterPeer struct { func (dlp *downloadTesterPeer) MarkLagging() { } -// Head constructs a function to retrieve a peer's current head hash -// and total difficulty. -func (dlp *downloadTesterPeer) Head() (common.Hash, *big.Int) { +// Head constructs a function to retrieve a peer's current head hash, +// justifiedNumber and total difficulty. +func (dlp *downloadTesterPeer) Head() (common.Hash, *uint64, *big.Int) { head := dlp.chain.CurrentBlock() - return head.Hash(), dlp.chain.GetTd(head.Hash(), head.Number.Uint64()) + justifiedNumber := dlp.chain.GetJustifiedNumber(head) + return head.Hash(), &justifiedNumber, dlp.chain.GetTd(head.Hash(), head.Number.Uint64()) } func unmarshalRlpHeaders(rlpdata []rlp.RawValue) []*types.Header { diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 2184cef159..3575648236 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -57,7 +57,7 @@ type peerConnection struct { // Peer encapsulates the methods required to synchronise with a remote full peer. type Peer interface { - Head() (common.Hash, *big.Int) + Head() (common.Hash, *uint64, *big.Int) MarkLagging() RequestHeadersByHash(common.Hash, int, int, bool, chan *eth.Response) (*eth.Request, error) RequestHeadersByNumber(uint64, int, int, bool, chan *eth.Response) (*eth.Request, error) diff --git a/eth/handler.go b/eth/handler.go index cc3bf382b4..2a02f7f106 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -436,14 +436,15 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { // Execute the Ethereum handshake var ( - genesis = h.chain.Genesis() - head = h.chain.CurrentHeader() - hash = head.Hash() - number = head.Number.Uint64() - td = h.chain.GetTd(hash, number) + genesis = h.chain.Genesis() + head = h.chain.CurrentHeader() + justifiedNumber = h.chain.GetJustifiedNumber(head) + hash = head.Hash() + number = head.Number.Uint64() + td = h.chain.GetTd(hash, number) ) forkID := forkid.NewID(h.chain.Config(), genesis, number, head.Time) - if err := peer.Handshake(h.networkID, td, hash, genesis.Hash(), forkID, h.forkFilter, ð.UpgradeStatusExtension{DisablePeerTxBroadcast: h.disablePeerTxBroadcast}); err != nil { + if err := peer.Handshake(h.networkID, justifiedNumber, td, hash, genesis.Hash(), forkID, h.forkFilter, ð.UpgradeStatusExtension{DisablePeerTxBroadcast: h.disablePeerTxBroadcast}); err != nil { peer.Log().Debug("Ethereum handshake failed", "err", err) return err } @@ -920,7 +921,7 @@ func (h *handler) BroadcastVote(vote *types.VoteEnvelope) { headBlock := h.chain.CurrentBlock() currentTD := h.chain.GetTd(headBlock.Hash(), headBlock.Number.Uint64()) for _, peer := range peers { - _, peerTD := peer.Head() + _, _, peerTD := peer.Head() deltaTD := new(big.Int).Abs(new(big.Int).Sub(currentTD, peerTD)) if deltaTD.Cmp(big.NewInt(deltaTdThreshold)) < 1 && peer.bscExt != nil { voteMap[peer] = vote diff --git a/eth/handler_bsc_test.go b/eth/handler_bsc_test.go index 076b08c213..cb1a67722d 100644 --- a/eth/handler_bsc_test.go +++ b/eth/handler_bsc_test.go @@ -125,7 +125,7 @@ func testSendVotes(t *testing.T, protocol uint) { td = handler.chain.GetTd(head.Hash(), head.Number.Uint64()) ) time.Sleep(200 * time.Millisecond) - if err := remoteEth.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil { + if err := remoteEth.Handshake(1, 0, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil { t.Fatalf("failed to run protocol handshake: %d", err) } // After the handshake completes, the source handler should stream the sink @@ -227,7 +227,7 @@ func testRecvVotes(t *testing.T, protocol uint) { td = handler.chain.GetTd(head.Hash(), head.Number.Uint64()) ) time.Sleep(200 * time.Millisecond) - if err := remoteEth.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil { + if err := remoteEth.Handshake(1, 0, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil { t.Fatalf("failed to run protocol handshake: %d", err) } diff --git a/eth/handler_eth.go b/eth/handler_eth.go index dd119fdf6e..ec901849e1 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/protocols/eth" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" ) @@ -135,13 +136,38 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, packet *eth.NewBlockPa // Assuming the block is importable by the peer, but possibly not yet done so, // calculate the head hash and TD that the peer truly must have. var ( - trueHead = block.ParentHash() - trueTD = new(big.Int).Sub(td, block.Difficulty()) + trueHead = block.ParentHash() + trueJustifiedNumber *uint64 + trueTD = new(big.Int).Sub(td, block.Difficulty()) ) - // Update the peer's total difficulty if better than the previous - if _, td := peer.Head(); trueTD.Cmp(td) > 0 { - peer.SetHead(trueHead, trueTD) - h.chainSync.handlePeerEvent() + if trueHeadHeader := h.chain.GetHeaderByHash(trueHead); trueHeadHeader != nil { + // If the trueHeadHeader is not found in the local chain, GetJustifiedNumber will return 0. + // Ignore cases where GetJustifiedNumber actually returns 0, as this only occurs in a self-test environment. + if tmp := h.chain.GetJustifiedNumber(trueHeadHeader); tmp != 0 { + trueJustifiedNumber = &tmp + } + } + // Update the peer's justifiedNumber and total difficulty if better than the previous + if _, justifiedNumber, td := peer.Head(); trueJustifiedNumber != nil && justifiedNumber != nil { + if *trueJustifiedNumber > *justifiedNumber || + (*trueJustifiedNumber == *justifiedNumber && trueTD.Cmp(td) > 0) { + peer.SetHead(trueHead, trueJustifiedNumber, trueTD) + h.chainSync.handlePeerEvent() + log.Trace("handleBlockBroadcast|SetHead", "justifiedNumber", *justifiedNumber, "td", td.Uint64(), "trueHead", trueHead, "trueJustifiedNumber", *trueJustifiedNumber, "trueTD", trueTD.Uint64()) + } + } else { + // back to behavior without fast finality + if trueTD.Cmp(td) > 0 { + peer.SetHead(trueHead, trueJustifiedNumber, trueTD) + h.chainSync.handlePeerEvent() + + if trueJustifiedNumber == nil { + log.Trace("handleBlockBroadcast|SetHead", "trueHead", trueHead, "trueJustifiedNumber", nil, "trueTD", trueTD.Uint64(), "td", td.Uint64()) + } else { + log.Trace("handleBlockBroadcast|SetHead", "trueHead", trueHead, "trueJustifiedNumber", *trueJustifiedNumber, "trueTD", trueTD.Uint64(), "td", td.Uint64()) + } + } } + return nil } diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index c35bf0f4ce..637c8f2f6d 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -285,7 +285,7 @@ func testRecvTransactions(t *testing.T, protocol uint) { head = handler.chain.CurrentBlock() td = handler.chain.GetTd(head.Hash(), head.Number.Uint64()) ) - if err := src.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil { + if err := src.Handshake(1, 0, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil { t.Fatalf("failed to run protocol handshake") } // Send the transaction to the sink and verify that it's added to the tx pool @@ -419,7 +419,7 @@ func testSendTransactions(t *testing.T, protocol uint) { head = handler.chain.CurrentBlock() td = handler.chain.GetTd(head.Hash(), head.Number.Uint64()) ) - if err := sink.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil { + if err := sink.Handshake(1, 0, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil { t.Fatalf("failed to run protocol handshake") } // After the handshake completes, the source handler should stream the sink @@ -636,7 +636,7 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) { // Wait a bit for the above handlers to start time.Sleep(100 * time.Millisecond) - if err := sinkPeer.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), nil); err != nil { + if err := sinkPeer.Handshake(1, 0, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), nil); err != nil { t.Fatalf("failed to run protocol handshake") } go eth.Handle(sink, sinkPeer) @@ -709,7 +709,7 @@ func testBroadcastMalformedBlock(t *testing.T, protocol uint) { genesis = source.chain.Genesis() td = source.chain.GetTd(genesis.Hash(), genesis.NumberU64()) ) - if err := sink.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), nil); err != nil { + if err := sink.Handshake(1, 0, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), nil); err != nil { t.Fatalf("failed to run protocol handshake") } // After the handshake completes, the source handler should stream the sink @@ -811,7 +811,7 @@ func TestOptionMaxPeersPerIP(t *testing.T) { t.Errorf("current num is %d, maxPeersPerIP is %d, but failed:%s", num, maxPeersPerIP, err) }(tryNum) - if err := src.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil { + if err := src.Handshake(1, 0, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil { t.Fatalf("failed to run protocol handshake") } // make sure runEthPeer execute one by one. diff --git a/eth/peerset.go b/eth/peerset.go index ff4215eda3..caebec7858 100644 --- a/eth/peerset.go +++ b/eth/peerset.go @@ -23,12 +23,14 @@ import ( "sync" "time" + "github.com/cometbft/cometbft/libs/rand" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/eth/protocols/bsc" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" "github.com/ethereum/go-ethereum/eth/protocols/trust" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" ) @@ -515,24 +517,55 @@ func (ps *peerSet) snapLen() int { return ps.snapPeers } -// peerWithHighestTD retrieves the known peer with the currently highest total -// difficulty, but below the given PoS switchover threshold. -func (ps *peerSet) peerWithHighestTD() *eth.Peer { +// peerWithHighestHead retrieves the known peer with the currently highest head +func (ps *peerSet) peerWithHighestHead() *eth.Peer { ps.lock.RLock() defer ps.lock.RUnlock() - var ( - bestPeer *eth.Peer - bestTd *big.Int - ) + var knowJustifiedPeers, notKnowJustifiedPeers []*ethPeer for _, p := range ps.peers { if p.Lagging() { continue } - if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 { + if _, justifiedNumber, td := p.Head(); justifiedNumber != nil { + knowJustifiedPeers = append(knowJustifiedPeers, p) + log.Trace("peerWithHighestHead", "id", p.Peer.ID(), "justifiedNumber", *justifiedNumber, "td", td.Uint64()) + } else { + notKnowJustifiedPeers = append(notKnowJustifiedPeers, p) + log.Trace("peerWithHighestHead", "id", p.Peer.ID(), "td", td.Uint64()) + } + } + + var ( + bestPeer *eth.Peer + bestJustified *uint64 + bestTd *big.Int + randUint = rand.Uint() + ) + for _, p := range knowJustifiedPeers { + if _, justifiedNumber, td := p.Head(); bestPeer == nil { + bestPeer, bestJustified, bestTd = p.Peer, justifiedNumber, td + } else if *justifiedNumber > *bestJustified { + bestPeer, bestJustified = p.Peer, justifiedNumber + if td.Cmp(bestTd) > 0 { + bestTd = td // may be not equal `to bestPeer.td` + } + } else if *justifiedNumber == *bestJustified { + if td.Cmp(bestTd) > 0 || (td.Cmp(bestTd) == 0 && randUint%2 == 0) { + bestPeer, bestTd = p.Peer, td + } + } + } + // if some nodes does not have justified number, back to behavior without fast finality + for _, p := range notKnowJustifiedPeers { + if _, _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 || (td.Cmp(bestTd) == 0 && randUint%2 == 0) { bestPeer, bestTd = p.Peer, td } } + if bestPeer != nil { + log.Trace("peerWithHighestHead|selected", "id", bestPeer.Peer.ID(), "td", bestTd.Uint64()) + } + return bestPeer } diff --git a/eth/protocols/eth/handshake.go b/eth/protocols/eth/handshake.go index e7d42aa0da..c6ab33fd5f 100644 --- a/eth/protocols/eth/handshake.go +++ b/eth/protocols/eth/handshake.go @@ -36,8 +36,8 @@ const ( ) // Handshake executes the eth protocol handshake, negotiating version number, -// network IDs, difficulties, head and genesis blocks. -func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis common.Hash, forkID forkid.ID, forkFilter forkid.Filter, extension *UpgradeStatusExtension) error { +// network IDs, justifiedNumber, difficulties, head and genesis blocks. +func (p *Peer) Handshake(network uint64, justifiedNumber uint64, td *big.Int, head common.Hash, genesis common.Hash, forkID forkid.ID, forkFilter forkid.Filter, extension *UpgradeStatusExtension) error { // Send out own handshake in a new thread errc := make(chan error, 2) @@ -51,6 +51,8 @@ func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis Head: head, Genesis: genesis, ForkID: forkID, + // Step 2 + // JustifiedNumber: &justifiedNumber, }) }) gopool.Submit(func() { @@ -70,7 +72,11 @@ func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis return p2p.DiscReadTimeout } } - p.td, p.head = status.TD, status.Head + p.td, p.justifiedNumber, p.head = status.TD, status.JustifiedNumber, status.Head + // Step 3 + // if p.justifiedNumber == nil { + // return errors.New("nil justifiedNumber when Handshake") + // } if p.version >= ETH68 { var upgradeStatus UpgradeStatusPacket // safe to read after two values have been received from errc diff --git a/eth/protocols/eth/handshake_test.go b/eth/protocols/eth/handshake_test.go index 3ad73b58ea..0f69018f6e 100644 --- a/eth/protocols/eth/handshake_test.go +++ b/eth/protocols/eth/handshake_test.go @@ -52,19 +52,19 @@ func testHandshake(t *testing.T, protocol uint) { want: errNoStatusMsg, }, { - code: StatusMsg, data: StatusPacket{10, 1, td, head.Hash(), genesis.Hash(), forkID}, + code: StatusMsg, data: StatusPacket{10, 1, td, head.Hash(), genesis.Hash(), forkID, nil}, want: errProtocolVersionMismatch, }, { - code: StatusMsg, data: StatusPacket{uint32(protocol), 999, td, head.Hash(), genesis.Hash(), forkID}, + code: StatusMsg, data: StatusPacket{uint32(protocol), 999, td, head.Hash(), genesis.Hash(), forkID, nil}, want: errNetworkIDMismatch, }, { - code: StatusMsg, data: StatusPacket{uint32(protocol), 1, td, head.Hash(), common.Hash{3}, forkID}, + code: StatusMsg, data: StatusPacket{uint32(protocol), 1, td, head.Hash(), common.Hash{3}, forkID, nil}, want: errGenesisMismatch, }, { - code: StatusMsg, data: StatusPacket{uint32(protocol), 1, td, head.Hash(), genesis.Hash(), forkid.ID{Hash: [4]byte{0x00, 0x01, 0x02, 0x03}}}, + code: StatusMsg, data: StatusPacket{uint32(protocol), 1, td, head.Hash(), genesis.Hash(), forkid.ID{Hash: [4]byte{0x00, 0x01, 0x02, 0x03}}, nil}, want: errForkIDRejected, }, } @@ -80,7 +80,7 @@ func testHandshake(t *testing.T, protocol uint) { // Send the junk test with one peer, check the handshake failure go p2p.Send(app, test.code, test.data) - err := peer.Handshake(1, td, head.Hash(), genesis.Hash(), forkID, forkid.NewFilter(backend.chain), nil) + err := peer.Handshake(1, 0, td, head.Hash(), genesis.Hash(), forkID, forkid.NewFilter(backend.chain), nil) if err == nil { t.Errorf("test %d: protocol returned nil error, want %q", i, test.want) } else if !errors.Is(err, test.want) { diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 7720d9bd44..9b10467df2 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -73,9 +73,10 @@ type Peer struct { version uint // Protocol version negotiated statusExtension *UpgradeStatusExtension - lagging bool // lagging peer is still connected, but won't be used to sync. - head common.Hash // Latest advertised head block hash - td *big.Int // Latest advertised head block total difficulty + lagging bool // lagging peer is still connected, but won't be used to sync. + head common.Hash // Latest advertised head block hash + justifiedNumber *uint64 // Latest advertised justified block number + td *big.Int // Latest advertised head block total difficulty knownBlocks *knownCache // Set of block hashes known to be known by this peer queuedBlocks chan *blockPropagation // Queue of blocks to broadcast to the peer @@ -160,21 +161,30 @@ func (p *Peer) MarkLagging() { p.lagging = true } -// Head retrieves the current head hash and total difficulty of the peer. -func (p *Peer) Head() (hash common.Hash, td *big.Int) { +// Head retrieves the current head hash, justifiedNumber and total difficulty of the peer. +func (p *Peer) Head() (hash common.Hash, justifiedNumber *uint64, td *big.Int) { p.lock.RLock() defer p.lock.RUnlock() copy(hash[:], p.head[:]) - return hash, new(big.Int).Set(p.td) + if p.justifiedNumber != nil { + tmp := *p.justifiedNumber + justifiedNumber = &tmp + } + return hash, justifiedNumber, new(big.Int).Set(p.td) } -// SetHead updates the head hash and total difficulty of the peer. -func (p *Peer) SetHead(hash common.Hash, td *big.Int) { +// SetHead updates the head hash, justifiedNumber and total difficulty of the peer. +func (p *Peer) SetHead(hash common.Hash, justifiedNumber *uint64, td *big.Int) { p.lock.Lock() defer p.lock.Unlock() p.lagging = false copy(p.head[:], hash[:]) + p.justifiedNumber = nil + if justifiedNumber != nil { + tmp := *justifiedNumber + p.justifiedNumber = &tmp + } p.td.Set(td) } diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go index 46bc97fbb8..50deb41be7 100644 --- a/eth/protocols/eth/protocol.go +++ b/eth/protocols/eth/protocol.go @@ -91,6 +91,13 @@ type StatusPacket struct { Head common.Hash Genesis common.Hash ForkID forkid.ID + + // step 1: add the optional `JustifiedNumber` in client + // step 2: after one hard fork, all nodes can pase new `StatusPacket`, then pass `JustifiedNumber` in StatusMsg in a maintenance release + // step 3: after another hard fork, all nodes send StatusMsg with `JustifiedNumber` now, then refuse StatusMsg without `JustifiedNumber` in a maintenance release + + // Step 1 + JustifiedNumber *uint64 `rlp:"optional"` } type UpgradeStatusExtension struct { diff --git a/eth/sync.go b/eth/sync.go index db0ec62617..cf0cb8f80a 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -68,10 +68,11 @@ type chainSyncer struct { // chainSyncOp is a scheduled sync operation. type chainSyncOp struct { - mode downloader.SyncMode - peer *eth.Peer - td *big.Int - head common.Hash + mode downloader.SyncMode + peer *eth.Peer + justifiedNumber *uint64 + td *big.Int + head common.Hash } // newChainSyncer creates a chainSyncer. @@ -163,16 +164,17 @@ func (cs *chainSyncer) nextSyncOp() *chainSyncOp { if cs.handler.peers.len() < minPeers { return nil } - // We have enough peers, pick the one with the highest TD, but avoid going - // over the terminal total difficulty. Above that we expect the consensus + // We have enough peers, pick the one with the highest Head. Above that we expect the consensus // clients to direct the chain head to sync to. - peer := cs.handler.peers.peerWithHighestTD() + peer := cs.handler.peers.peerWithHighestHead() if peer == nil { return nil } - mode, ourTD := cs.modeAndLocalHead() + mode, ourJustifiedNumber, ourTD := cs.modeAndLocalHead() op := peerToSyncOp(mode, peer) - if op.td.Cmp(ourTD) <= 0 { + // TODO: op.td.Cmp(ourTD) <= 0 --> op.td.Cmp(ourTD) < 0? + if (op.justifiedNumber == nil && op.td.Cmp(ourTD) <= 0) || + (op.justifiedNumber != nil && (*op.justifiedNumber < *ourJustifiedNumber || (*op.justifiedNumber == *ourJustifiedNumber && op.td.Cmp(ourTD) <= 0))) { // We seem to be in sync according to the legacy rules. In the merge // world, it can also mean we're stuck on the merge block, waiting for // a beacon client. In the latter case, notify the user. @@ -186,16 +188,16 @@ func (cs *chainSyncer) nextSyncOp() *chainSyncOp { } func peerToSyncOp(mode downloader.SyncMode, p *eth.Peer) *chainSyncOp { - peerHead, peerTD := p.Head() - return &chainSyncOp{mode: mode, peer: p, td: peerTD, head: peerHead} + peerHead, peerJustifiedNumber, peerTD := p.Head() + return &chainSyncOp{mode: mode, peer: p, justifiedNumber: peerJustifiedNumber, td: peerTD, head: peerHead} } -func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) { +func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *uint64, *big.Int) { // If we're in snap sync mode, return that directly if cs.handler.snapSync.Load() { block := cs.handler.chain.CurrentSnapBlock() td := cs.handler.chain.GetTd(block.Hash(), block.Number.Uint64()) - return downloader.SnapSync, td + return downloader.SnapSync, nil, td } // We are probably in full sync, but we might have rewound to before the // snap sync pivot, check if we should re-enable snap sync. @@ -207,7 +209,7 @@ func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) { } block := cs.handler.chain.CurrentSnapBlock() td := cs.handler.chain.GetTd(block.Hash(), block.Number.Uint64()) - return downloader.SnapSync, td + return downloader.SnapSync, nil, td } } // We are in a full sync, but the associated head state is missing. To complete @@ -217,11 +219,12 @@ func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) { block := cs.handler.chain.CurrentSnapBlock() td := cs.handler.chain.GetTd(block.Hash(), block.Number.Uint64()) log.Info("Reenabled snap sync as chain is stateless") - return downloader.SnapSync, td + return downloader.SnapSync, nil, td } // Nope, we're really full syncing + justifiedNumber := cs.handler.chain.GetJustifiedNumber(head) td := cs.handler.chain.GetTd(head.Hash(), head.Number.Uint64()) - return downloader.FullSync, td + return downloader.FullSync, &justifiedNumber, td } // startSync launches doSync in a new goroutine. diff --git a/eth/sync_test.go b/eth/sync_test.go index 72d018527e..2c29f40ea2 100644 --- a/eth/sync_test.go +++ b/eth/sync_test.go @@ -89,7 +89,7 @@ func testSnapSyncDisabling(t *testing.T, ethVer uint, snapVer uint) { time.Sleep(250 * time.Millisecond) // Check that snap sync was disabled - op := peerToSyncOp(downloader.SnapSync, empty.handler.peers.peerWithHighestTD()) + op := peerToSyncOp(downloader.SnapSync, empty.handler.peers.peerWithHighestHead()) if err := empty.handler.doSync(op); err != nil { t.Fatal("sync failed:", err) } @@ -171,7 +171,7 @@ func testChainSyncWithBlobs(t *testing.T, mode downloader.SyncMode, preCancunBlk time.Sleep(100 * time.Millisecond) } - op := peerToSyncOp(mode, empty.handler.peers.peerWithHighestTD()) + op := peerToSyncOp(mode, empty.handler.peers.peerWithHighestHead()) if err := empty.handler.doSync(op); err != nil { t.Fatal("sync failed:", err) }