Skip to content

feat: add flags to enable broadcast blocks and transactions to all peers #1219

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,10 @@ var (
utils.CircuitCapacityCheckWorkersFlag,
utils.RollupVerifyEnabledFlag,
utils.ShadowforkPeersFlag,
utils.TxGossipBroadcastDisabledFlag,
utils.TxGossipReceivingDisabledFlag,
utils.GossipTxBroadcastDisabledFlag,
utils.GossipTxReceivingDisabledFlag,
utils.GossipBroadcastToAllEnabledFlag,
utils.GossipBroadcastToAllCapFlag,
utils.DASyncEnabledFlag,
utils.DABlockNativeAPIEndpointFlag,
utils.DABlobScanAPIEndpointFlag,
Expand Down
6 changes: 4 additions & 2 deletions cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,10 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.DARecoveryProduceBlocksFlag,
utils.CircuitCapacityCheckEnabledFlag,
utils.CircuitCapacityCheckWorkersFlag,
utils.TxGossipBroadcastDisabledFlag,
utils.TxGossipReceivingDisabledFlag,
utils.GossipTxBroadcastDisabledFlag,
utils.GossipTxReceivingDisabledFlag,
utils.GossipBroadcastToAllEnabledFlag,
utils.GossipBroadcastToAllCapFlag,
},
},
{
Expand Down
40 changes: 29 additions & 11 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,15 +893,23 @@ var (
Usage: "peer ids of shadow fork peers",
}

// Tx gossip settings
TxGossipBroadcastDisabledFlag = cli.BoolFlag{
Name: "txgossip.disablebroadcast",
// Gossip settings
GossipTxBroadcastDisabledFlag = cli.BoolFlag{
Name: "gossip.disabletxbroadcast",
Usage: "Disable gossip broadcast transactions to other peers",
}
TxGossipReceivingDisabledFlag = cli.BoolFlag{
Name: "txgossip.disablereceiving",
GossipTxReceivingDisabledFlag = cli.BoolFlag{
Name: "gossip.disabletxreceiving",
Usage: "Disable gossip receiving transactions from other peers",
}
GossipBroadcastToAllEnabledFlag = cli.BoolFlag{
Name: "gossip.enablebroadcasttoall",
Usage: "Enable gossip broadcast blocks and transactions to all peers",
}
GossipBroadcastToAllCapFlag = cli.IntFlag{
Name: "gossip.broadcasttoallcap",
Usage: "Maximum number of peers for broadcasting blocks and transactions (effective only when gossip.enablebroadcasttoall is enabled)",
}

// DA syncing settings
DASyncEnabledFlag = cli.BoolFlag{
Expand Down Expand Up @@ -1807,14 +1815,24 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
cfg.ShadowForkPeerIDs = ctx.GlobalStringSlice(ShadowforkPeersFlag.Name)
log.Info("Shadow fork peers", "ids", cfg.ShadowForkPeerIDs)
}
if ctx.GlobalIsSet(TxGossipBroadcastDisabledFlag.Name) {
cfg.TxGossipBroadcastDisabled = ctx.GlobalBool(TxGossipBroadcastDisabledFlag.Name)
log.Info("Transaction gossip broadcast disabled", "disabled", cfg.TxGossipBroadcastDisabled)
if ctx.GlobalIsSet(GossipTxBroadcastDisabledFlag.Name) {
cfg.GossipTxBroadcastDisabled = ctx.GlobalBool(GossipTxBroadcastDisabledFlag.Name)
log.Info("Gossip transaction broadcast disabled", "disabled", cfg.GossipTxBroadcastDisabled)
}
if ctx.GlobalIsSet(GossipTxReceivingDisabledFlag.Name) {
cfg.GossipTxReceivingDisabled = ctx.GlobalBool(GossipTxReceivingDisabledFlag.Name)
log.Info("Gossip transaction receiving disabled", "disabled", cfg.GossipTxReceivingDisabled)
}
if ctx.GlobalIsSet(GossipBroadcastToAllEnabledFlag.Name) {
cfg.GossipBroadcastToAllEnabled = ctx.GlobalBool(GossipBroadcastToAllEnabledFlag.Name)
log.Info("Gossip broadcast to all enabled", "enabled", cfg.GossipBroadcastToAllEnabled)
}
if ctx.GlobalIsSet(TxGossipReceivingDisabledFlag.Name) {
cfg.TxGossipReceivingDisabled = ctx.GlobalBool(TxGossipReceivingDisabledFlag.Name)
log.Info("Transaction gossip receiving disabled", "disabled", cfg.TxGossipReceivingDisabled)
// Only configure the gossip broadcast-to-all flag if --gossip.enablebroadcasttoall is set to true.
if ctx.GlobalIsSet(GossipBroadcastToAllCapFlag.Name) && cfg.GossipBroadcastToAllEnabled {
cfg.GossipBroadcastToAllCap = ctx.GlobalInt(GossipBroadcastToAllCapFlag.Name)
log.Info("Maximum number of peers for broadcasting blocks and transactions is set", "cap", cfg.GossipBroadcastToAllCap)
}


// Cap the cache allowance and tune the garbage collector
mem, err := gopsutil.VirtualMemory()
Expand Down
26 changes: 14 additions & 12 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,18 +273,20 @@ func New(stack *node.Node, config *ethconfig.Config, l1Client l1.Client) (*Ether
checkpoint = params.TrustedCheckpoints[genesisHash]
}
if eth.handler, err = newHandler(&handlerConfig{
Database: chainDb,
Chain: eth.blockchain,
TxPool: eth.txPool,
Network: config.NetworkId,
Sync: config.SyncMode,
BloomCache: uint64(cacheLimit),
EventMux: eth.eventMux,
Checkpoint: checkpoint,
Whitelist: config.Whitelist,
ShadowForkPeerIDs: config.ShadowForkPeerIDs,
DisableTxBroadcast: config.TxGossipBroadcastDisabled,
DisableTxReceiving: config.TxGossipReceivingDisabled,
Database: chainDb,
Chain: eth.blockchain,
TxPool: eth.txPool,
Network: config.NetworkId,
Sync: config.SyncMode,
BloomCache: uint64(cacheLimit),
EventMux: eth.eventMux,
Checkpoint: checkpoint,
Whitelist: config.Whitelist,
ShadowForkPeerIDs: config.ShadowForkPeerIDs,
DisableTxBroadcast: config.GossipTxBroadcastDisabled,
DisableTxReceiving: config.GossipTxReceivingDisabled,
EnableBroadcastToAll: config.GossipBroadcastToAllEnabled,
BroadcastToAllCap: config.GossipBroadcastToAllCap,
}); err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,10 @@ type Config struct {
// DA syncer options
DA da_syncer.Config

TxGossipBroadcastDisabled bool
TxGossipReceivingDisabled bool
GossipTxBroadcastDisabled bool
GossipTxReceivingDisabled bool
GossipBroadcastToAllEnabled bool
GossipBroadcastToAllCap int
}

// CreateConsensusEngine creates a consensus engine for the given chain configuration.
Expand Down
56 changes: 38 additions & 18 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,11 @@ type handlerConfig struct {
Whitelist map[uint64]common.Hash // Hard coded whitelist for sync challenged
ShadowForkPeerIDs []string // List of peer ids that take part in the shadow-fork

DisableTxBroadcast bool
DisableTxReceiving bool
// Gossip configs
DisableTxBroadcast bool
DisableTxReceiving bool
EnableBroadcastToAll bool
BroadcastToAllCap int
}

type handler struct {
Expand Down Expand Up @@ -134,9 +137,11 @@ type handler struct {
wg sync.WaitGroup
peerWG sync.WaitGroup

shadowForkPeerIDs []string
disableTxBroadcast bool
disableTxReceiving bool
shadowForkPeerIDs []string
disableTxBroadcast bool
disableTxReceiving bool
enableBroadcastToAll bool
broadcastToAllCap int
}

// newHandler returns a handler for all Ethereum chain management protocol.
Expand All @@ -146,18 +151,24 @@ func newHandler(config *handlerConfig) (*handler, error) {
config.EventMux = new(event.TypeMux) // Nicety initialization for tests
}
h := &handler{
networkID: config.Network,
forkFilter: forkid.NewFilter(config.Chain),
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
chain: config.Chain,
peers: newPeerSet(),
whitelist: config.Whitelist,
quitSync: make(chan struct{}),
shadowForkPeerIDs: config.ShadowForkPeerIDs,
disableTxBroadcast: config.DisableTxBroadcast,
disableTxReceiving: config.DisableTxReceiving,
networkID: config.Network,
forkFilter: forkid.NewFilter(config.Chain),
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
chain: config.Chain,
peers: newPeerSet(),
whitelist: config.Whitelist,
quitSync: make(chan struct{}),
shadowForkPeerIDs: config.ShadowForkPeerIDs,
disableTxBroadcast: config.DisableTxBroadcast,
disableTxReceiving: config.DisableTxReceiving,
enableBroadcastToAll: config.EnableBroadcastToAll,
}
h.broadcastToAllCap = config.BroadcastToAllCap
if config.BroadcastToAllCap == 0 && config.EnableBroadcastToAll {
// Set default broadcast cap to 30 if not specified
h.broadcastToAllCap = 30
}
if config.Sync == downloader.FullSync {
// The database seems empty as the current block is the genesis. Yet the fast
Expand Down Expand Up @@ -477,7 +488,12 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
return
}
// Send the block to a subset of our peers
transfer := peers[:int(math.Sqrt(float64(len(peers))))]
numDirect := int(math.Sqrt(float64(len(peers))))
// If enableBroadcastToAll is true, broadcast blocks directly to all peers (capped at 100).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A gut feeling is 20 to 30 might be sufficient. But for robustness, this can also be added to the configuration, enabled together with enableBroadcastToAll.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice advise, added this with a default cap 30.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the chart of bootnode-0 provided by @colinlyguo it looks like it has a max peers of 163. For bootnodes and other nodes operated by Scroll, will we be setting this value higher to include all 163 peers?

if h.enableBroadcastToAll {
numDirect = min(h.broadcastToAllCap, len(peers))
}
transfer := peers[:numDirect]
for _, peer := range transfer {
peer.AsyncSendNewBlock(block, td)
}
Expand Down Expand Up @@ -518,6 +534,10 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
peers := onlyShadowForkPeers(h.shadowForkPeerIDs, h.peers.peersWithoutTransaction(tx.Hash()))
// Send the tx unconditionally to a subset of our peers
numDirect := int(math.Sqrt(float64(len(peers))))
// If enableBroadcastToAll is true, broadcast transactions directly to all peers (capped at 100).
if h.enableBroadcastToAll {
numDirect = min(h.broadcastToAllCap, len(peers))
}
for _, peer := range peers[:numDirect] {
txset[peer] = append(txset[peer], tx.Hash())
}
Expand Down
2 changes: 1 addition & 1 deletion params/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
const (
VersionMajor = 5 // Major version component of the current release
VersionMinor = 8 // Minor version component of the current release
VersionPatch = 65 // Patch version component of the current release
VersionPatch = 66 // Patch version component of the current release
VersionMeta = "mainnet" // Version metadata to append to the version string
)

Expand Down