New higher capacity Txpool #2660

Open
wants to merge 42 commits into base: develop
Changes from 34 commits
Commits (42)
dfb046e
txpool: incomplete pool2 and pool3
emailtovamos Jul 16, 2024
e30248b
pool: optimise sending in reorg
emailtovamos Jul 29, 2024
f80ac01
pool: add static info and simplify transfer
emailtovamos Aug 1, 2024
8655d30
pool: remove comments, use queue not pool23
emailtovamos Aug 1, 2024
4538e92
pool: remove unused function
emailtovamos Aug 1, 2024
4369e3d
pool: refactor and bugfix
emailtovamos Aug 1, 2024
d0d6a27
pool: buffer test and size logic
emailtovamos Aug 2, 2024
5faf413
pool: add discarded ones to pool3 by default.
emailtovamos Aug 2, 2024
5bb78b3
pool: minor refactor
emailtovamos Aug 20, 2024
6b4e16b
pool: make slots config
emailtovamos Aug 21, 2024
d03f7e5
pool: initialise pool3 slots
emailtovamos Aug 21, 2024
94a60a9
pool: add underpriced to pool2 or 3
emailtovamos Aug 23, 2024
ed2d1d7
pool: enqueue in pool2 & drop properly
emailtovamos Aug 25, 2024
6daecfb
pool: bugfix:always drop drop and pool2 size
emailtovamos Aug 27, 2024
9d7298f
pool: TestDualHeapEviction passing partly
emailtovamos Aug 27, 2024
a1a25e9
pool: TestDualHeapEviction fully pass
emailtovamos Aug 27, 2024
253d9a5
pool: some cleanups
emailtovamos Aug 27, 2024
0692a99
pool: fix the TestTransactionFutureAttack test
emailtovamos Aug 28, 2024
40dcfcd
pool: cleanup debug logs
emailtovamos Aug 28, 2024
6673f3e
pool: fix TestUnderpricingDynamicFee based on new pool
emailtovamos Aug 28, 2024
ebd8f59
pool: fix all old tests
emailtovamos Aug 28, 2024
bdb4cc2
pool: lint
emailtovamos Aug 29, 2024
e45e7eb
pool: include static in flatten
emailtovamos Aug 29, 2024
069eaf2
pool: proper use of AsyncSendPooledTransactionHashes
emailtovamos Aug 29, 2024
70ece93
pool: flags for pool2 and 3 capacity
emailtovamos Aug 29, 2024
e7d0a16
pool: fix test as now by default pool2 and pool3 aren't empty
emailtovamos Aug 30, 2024
16a2a53
Merge remote-tracking branch 'origin/develop' into txpool-new
emailtovamos Sep 2, 2024
0f8a1b5
pool: test for transfer
emailtovamos Sep 3, 2024
76d157d
pool: set transfer time in config
emailtovamos Sep 3, 2024
aeec0c7
pool: remove unused criticalpathpool
emailtovamos Sep 3, 2024
53042e1
buffer: make private
emailtovamos Sep 3, 2024
8e6833c
pool: bug fix and test fix
emailtovamos Sep 3, 2024
5f398db
pool: pool2 can have 0 size
emailtovamos Sep 3, 2024
706a24e
pool: lint fix
emailtovamos Sep 4, 2024
0e61543
test: requestPromoteExecutables after every enqueue for testing
emailtovamos Sep 6, 2024
0a5dbef
pool: queued goes to 0 locally after this change
emailtovamos Sep 6, 2024
248bb6b
pool: fastcache, interface, metrics modify
emailtovamos Sep 11, 2024
cf10c5c
eth: send to some peers of pool2, not just static
emailtovamos Sep 11, 2024
b818cb7
pool: transfer on block import and simplify it
emailtovamos Sep 12, 2024
774e314
pool: truly discard underpriced, Transfer after lock is over
emailtovamos Sep 18, 2024
629af6d
pool: else ifs instead of ifs
emailtovamos Sep 18, 2024
3e3c56b
pool: address minor issues
emailtovamos Sep 19, 2024
2 changes: 2 additions & 0 deletions cmd/geth/main.go
@@ -91,6 +91,8 @@ var (
utils.TxPoolGlobalSlotsFlag,
utils.TxPoolAccountQueueFlag,
utils.TxPoolGlobalQueueFlag,
utils.TxPoolPool2SlotsFlag,
utils.TxPoolPool3SlotsFlag,
utils.TxPoolLifetimeFlag,
utils.TxPoolReannounceTimeFlag,
utils.BlobPoolDataDirFlag,
20 changes: 20 additions & 0 deletions cmd/utils/flags.go
@@ -450,6 +450,18 @@ var (
Value: ethconfig.Defaults.TxPool.GlobalQueue,
Category: flags.TxPoolCategory,
}
TxPoolPool2SlotsFlag = &cli.Uint64Flag{
Name: "txpool.pool2slots",
Usage: "Maximum number of transaction slots in pool 2",
Value: ethconfig.Defaults.TxPool.Pool2Slots,
Category: flags.TxPoolCategory,
}
TxPoolPool3SlotsFlag = &cli.Uint64Flag{
Name: "txpool.pool3slots",
Usage: "Maximum number of transaction slots in pool 3",
Value: ethconfig.Defaults.TxPool.Pool3Slots,
Category: flags.TxPoolCategory,
}
TxPoolLifetimeFlag = &cli.DurationFlag{
Name: "txpool.lifetime",
Usage: "Maximum amount of time non-executable transaction are queued",
@@ -1762,6 +1774,12 @@ func setTxPool(ctx *cli.Context, cfg *legacypool.Config) {
if ctx.IsSet(TxPoolGlobalQueueFlag.Name) {
cfg.GlobalQueue = ctx.Uint64(TxPoolGlobalQueueFlag.Name)
}
if ctx.IsSet(TxPoolPool2SlotsFlag.Name) {
cfg.Pool2Slots = ctx.Uint64(TxPoolPool2SlotsFlag.Name)
}
if ctx.IsSet(TxPoolPool3SlotsFlag.Name) {
cfg.Pool3Slots = ctx.Uint64(TxPoolPool3SlotsFlag.Name)
}
if ctx.IsSet(TxPoolLifetimeFlag.Name) {
cfg.Lifetime = ctx.Duration(TxPoolLifetimeFlag.Name)
}
@@ -2292,6 +2310,8 @@ func EnableNodeInfo(poolConfig *legacypool.Config, nodeInfo *p2p.NodeInfo) Setup
"GlobalSlots": poolConfig.GlobalSlots,
"AccountQueue": poolConfig.AccountQueue,
"GlobalQueue": poolConfig.GlobalQueue,
"Pool2Slots": poolConfig.Pool2Slots,
"Pool3Slots": poolConfig.Pool3Slots,
"Lifetime": poolConfig.Lifetime,
})
}
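For reference, a hypothetical invocation using the two new flags (the values here are illustrative, not the defaults):

geth --txpool.pool2slots 4096 --txpool.pool3slots 16384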
8 changes: 7 additions & 1 deletion core/events.go
@@ -22,7 +22,13 @@ import (
)

// NewTxsEvent is posted when a batch of transactions enters the transaction pool.
type NewTxsEvent struct{ Txs []*types.Transaction }
type NewTxsEvent struct {
Txs []*types.Transaction
// Static indicates whether to broadcast only to static peers.
// This is because at high traffic we still want to broadcast transactions
// to at least some peers, so that we minimize transaction loss.
Static bool
Contributor:
I would call it differently, maybe BroadcastToStatic or something more meaningful.

Collaborator:
Suggestion-1:
For pool-2: it has its own peers to broadcast the txs; the peers of pool-2 could consist of:
1. StaticNodes
2. A subset of the other peers, sqrt()?
This is because many nodes may not contain any StaticNodes.

Contributor Author:
The sqrt() of peers is already what happens in pool-1. So we might do cube root or something?

Collaborator (zzzckck, Sep 12, 2024):
> The sqrt() of peers is already what happens in pool-1
Do you mean the sqrt() logic in BroadcastTransactions? https://github.com/bnb-chain/bsc/blob/master/eth/handler.go#L865

What I mean here is that, suppose we have 400 connected peers and 10 StaticNodes, then:
1. For pool-1: it has 400 peers; directly send full tx content to 20 (sqrt(400)) peers, and send announcements to the remaining 380 peers.
2. For pool-2: it has sqrt(400) + 10 (the static nodes); after discarding the overlap, it could have 20 to 30 peers. Suppose it is 25 peers; based on that, send full tx content to 5 (sqrt(25)) peers, and send announcements to the remaining 20 peers (see the sketch after this struct).

Contributor Author:
In the above example, there is a chance that all 5 peers are non-static. Is that fine? Or should we always have at least 1 static peer (if one exists) receive the full tx?

}
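To make the numbers in this thread concrete, here is a minimal, self-contained sketch of the pool-2 peer selection under discussion. It is not code from this PR: the Peer type and all names are hypothetical stand-ins, and the split mirrors the 400-peers/10-static example above.

package main

import (
	"fmt"
	"math"
)

// Peer is a stand-in for the real eth peer type.
type Peer struct{ id int }

// pool2Peers unions the static nodes with a sqrt-sized sample of all
// peers, then splits the union: a sqrt-sized subset receives full tx
// content and the rest receive hash announcements only.
func pool2Peers(peers, static []*Peer) (direct, announce []*Peer) {
	seen := make(map[*Peer]bool)
	var merged []*Peer
	for _, p := range static { // static nodes always join pool-2's peer set
		if !seen[p] {
			seen[p] = true
			merged = append(merged, p)
		}
	}
	sample := int(math.Sqrt(float64(len(peers))))
	for _, p := range peers { // a sqrt() sample of the remaining peers
		if sample == 0 {
			break
		}
		if !seen[p] {
			seen[p] = true
			merged = append(merged, p)
			sample--
		}
	}
	n := int(math.Sqrt(float64(len(merged)))) // full-content subset
	return merged[:n], merged[n:]
}

func main() {
	peers := make([]*Peer, 400)
	static := make([]*Peer, 10)
	for i := range peers {
		peers[i] = &Peer{id: i}
	}
	for i := range static {
		static[i] = peers[i] // pretend the first 10 peers are static
	}
	direct, announce := pool2Peers(peers, static)
	fmt.Println(len(direct), len(announce)) // e.g. 5 direct, 25 announce
}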

// ReannoTxsEvent is posted when a batch of local pending transactions exceed a specified duration.
type ReannoTxsEvent struct{ Txs []*types.Transaction }
Expand Down
4 changes: 4 additions & 0 deletions core/txpool/errors.go
@@ -30,6 +30,10 @@ var (
// configured for the transaction pool.
ErrUnderpriced = errors.New("transaction underpriced")

// ErrUnderpricedTransferredtoAnotherPool is returned if an underpriced
// transaction is not dropped outright but moved into pool2 or pool3.
ErrUnderpricedTransferredtoAnotherPool = errors.New("transaction underpriced, so it is either in pool2 or pool3")
Collaborator:
Why do we need this error? I think underpriced transactions can be simply discarded.


// ErrReplaceUnderpriced is returned if a transaction is attempted to be replaced
// with a different one without the required price bump.
ErrReplaceUnderpriced = errors.New("replacement transaction underpriced")
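For illustration, a caller could distinguish the new sentinel from a hard drop with errors.Is. The following is a self-contained sketch; the sentinels are mirrored locally rather than imported from this PR, and the classify helper is hypothetical.

package main

import (
	"errors"
	"fmt"
)

// Sentinels mirrored from core/txpool/errors.go for this sketch.
var (
	errUnderpriced = errors.New("transaction underpriced")
	errTransferred = errors.New("transaction underpriced, so it is either in pool2 or pool3")
)

// classify treats a pool2/pool3 transfer as non-fatal while still
// rejecting a plainly underpriced transaction.
func classify(err error) string {
	switch {
	case errors.Is(err, errTransferred):
		return "parked in an overflow pool, not lost"
	case errors.Is(err, errUnderpriced):
		return "dropped outright"
	default:
		return "other failure"
	}
}

func main() {
	fmt.Println(classify(errTransferred)) // parked in an overflow pool, not lost
}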
111 changes: 111 additions & 0 deletions core/txpool/legacypool/buffer.go
@@ -0,0 +1,111 @@
package legacypool

import (
containerList "container/list"
"fmt"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)

type LRUBuffer struct {
Collaborator (zzzckck, Sep 5, 2024):
Suggestion-2:
Maybe we can just use fastcache instead? It is a RingBuffer, not an LRU; a RingBuffer has some advantages, you may refer to: https://github.com/bnb-chain/bsc/blob/master/core/state/snapshot/disklayer.go#L36

Fastcache may not work directly, as it does not support iteration. We could add a list of TxHash to support the iteration. (A sketch of this idea follows the struct below.)

Contributor:
I also suggest using a well-tested lib here, like github.com/ethereum/go-ethereum/common/lru or github.com/VictoriaMetrics/fastcache. They have fewer bugs and better performance.

capacity int
buffer *containerList.List
index map[common.Hash]*containerList.Element
mu sync.Mutex
size int // Total number of slots used
}
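As an aside, here is a minimal sketch of the fastcache-plus-hash-list idea from Suggestion-2. The package and all names are hypothetical, and note one caveat: fastcache evicts entries on its own, so the hash list can reference entries the cache has already dropped.

package overflowpool

import (
	"github.com/VictoriaMetrics/fastcache"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/rlp"
)

// ringBuffer pairs fastcache (which has no iteration support) with an
// ordered list of hashes that makes iteration possible.
type ringBuffer struct {
	cache  *fastcache.Cache
	hashes []common.Hash // insertion order
}

func newRingBuffer(maxBytes int) *ringBuffer {
	return &ringBuffer{cache: fastcache.New(maxBytes)}
}

func (rb *ringBuffer) add(tx *types.Transaction) error {
	blob, err := rlp.EncodeToBytes(tx)
	if err != nil {
		return err
	}
	h := tx.Hash()
	rb.cache.Set(h[:], blob)
	rb.hashes = append(rb.hashes, h)
	return nil
}

func (rb *ringBuffer) get(h common.Hash) (*types.Transaction, bool) {
	blob, ok := rb.cache.HasGet(nil, h[:])
	if !ok {
		return nil, false
	}
	tx := new(types.Transaction)
	if err := rlp.DecodeBytes(blob, tx); err != nil {
		return nil, false
	}
	return tx, true
}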

func NewLRUBuffer(capacity int) *LRUBuffer {
return &LRUBuffer{
capacity: capacity,
buffer: containerList.New(),
index: make(map[common.Hash]*containerList.Element),
Contributor:
Maybe we should pre-allocate this to max size or some level, e.g. 1/2 of capacity.

size: 0, // Initialize size to 0
}
}

func (lru *LRUBuffer) Add(tx *types.Transaction) {
lru.mu.Lock()
defer lru.mu.Unlock()

if elem, ok := lru.index[tx.Hash()]; ok {
lru.buffer.MoveToFront(elem)
return
}

txSlots := numSlots(tx)

// Remove elements until there is enough capacity
for lru.size+txSlots > lru.capacity && lru.buffer.Len() > 0 {
Contributor:
why lru.buffer.Len() > 0 ?
back := lru.buffer.Back()
removedTx := back.Value.(*types.Transaction)
lru.buffer.Remove(back)
delete(lru.index, removedTx.Hash())
lru.size -= numSlots(removedTx) // Decrease size by the slots of the removed transaction
Comment on lines +42 to +46
Contributor:
what if deleting one is not enough because of # of slots?
}

elem := lru.buffer.PushFront(tx)
lru.index[tx.Hash()] = elem
lru.size += txSlots // Increase size by the slots of the new transaction
}

func (lru *LRUBuffer) Get(hash common.Hash) (*types.Transaction, bool) {
lru.mu.Lock()
defer lru.mu.Unlock()

if elem, ok := lru.index[hash]; ok {
lru.buffer.MoveToFront(elem)
return elem.Value.(*types.Transaction), true
}
return nil, false
}

func (lru *LRUBuffer) Flush(maxTransactions int) []*types.Transaction {
lru.mu.Lock()
defer lru.mu.Unlock()

txs := make([]*types.Transaction, 0, maxTransactions)
count := 0
for count < maxTransactions && lru.buffer.Len() > 0 {
back := lru.buffer.Back()
removedTx := back.Value.(*types.Transaction)
txs = append(txs, removedTx)
lru.buffer.Remove(back)
delete(lru.index, removedTx.Hash())
lru.size -= numSlots(removedTx) // Decrease size by the slots of the removed transaction
Comment on lines +74 to +79
Contributor:
This is the same as in the if before; maybe it would be nicer to have it under one private function. (A sketch of such a helper is at the end of this file.)

Collaborator:
+1

count++
}
return txs
}

// New method to get the current size of the buffer in terms of slots
func (lru *LRUBuffer) Size() int {
lru.mu.Lock()
defer lru.mu.Unlock()
return lru.size
}

// New iterator method to iterate over all transactions, ONLY used for printing and debugging
func (lru *LRUBuffer) iterate() <-chan *types.Transaction {
ch := make(chan *types.Transaction)
go func() {
lru.mu.Lock()
defer lru.mu.Unlock()
defer close(ch)

for e := lru.buffer.Front(); e != nil; e = e.Next() {
ch <- e.Value.(*types.Transaction)
}
}()
return ch
}

func (lru *LRUBuffer) PrintTxStats() {
// Iterating over the transactions
for tx := range lru.iterate() {
// Print transaction details or process them as needed
fmt.Println(tx.Hash().String(), tx.GasFeeCap().String(), tx.GasTipCap().String())
}
}
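Picking up the review suggestion above, a minimal sketch of the shared private helper (the name popBack is hypothetical): both the eviction loop in Add and the drain loop in Flush could call it.

// popBack removes the least-recently-used transaction while keeping the
// index map and slot count consistent. Callers must already hold lru.mu.
func (lru *LRUBuffer) popBack() (*types.Transaction, bool) {
	back := lru.buffer.Back()
	if back == nil {
		return nil, false
	}
	removedTx := back.Value.(*types.Transaction)
	lru.buffer.Remove(back)
	delete(lru.index, removedTx.Hash())
	lru.size -= numSlots(removedTx)
	return removedTx, true
}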
126 changes: 126 additions & 0 deletions core/txpool/legacypool/buffer_test.go
@@ -0,0 +1,126 @@
package legacypool

import (
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)

// Helper function to create a dummy transaction of specified size
func createDummyTransaction(size int) *types.Transaction {
data := make([]byte, size)
return types.NewTransaction(0, common.Address{}, nil, 0, nil, data)
}

func TestNewLRUBuffer(t *testing.T) {
capacity := 10
lru := NewLRUBuffer(capacity)
if lru.capacity != capacity {
Contributor:
It's better to use github.com/stretchr/testify/require to assert:
require.Equal(t, capacity, lru.capacity, "capacity wrong")

t.Errorf("expected capacity %d, got %d", capacity, lru.capacity)
}
if lru.buffer.Len() != 0 {
t.Errorf("expected buffer length 0, got %d", lru.buffer.Len())
}
if len(lru.index) != 0 {
t.Errorf("expected index length 0, got %d", len(lru.index))
}
if lru.size != 0 {
t.Errorf("expected size 0, got %d", lru.size)
}
}

func TestAddAndGet(t *testing.T) {
lru := NewLRUBuffer(10)

tx1 := createDummyTransaction(500)
tx2 := createDummyTransaction(1500)

lru.Add(tx1)
lru.Add(tx2)

if lru.Size() != 2 {
t.Errorf("expected size 2, got %d", lru.Size())
}

retrievedTx, ok := lru.Get(tx1.Hash())
if !ok || retrievedTx.Hash() != tx1.Hash() {
t.Errorf("failed to retrieve tx1")
}

retrievedTx, ok = lru.Get(tx2.Hash())
if !ok || retrievedTx.Hash() != tx2.Hash() {
t.Errorf("failed to retrieve tx2")
}
}
Contributor:
You should also add the scenario I mentioned before: you can go over the slot limit on addition if the newly added transaction has more slots than the last evicted one. (A sketch follows.)
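A hedged sketch of that missing scenario (the test name and the 100000-byte payload are illustrative): with the current Add, a transaction needing more slots than the whole capacity is still inserted once eviction has emptied the buffer, so size can exceed capacity.

// TestOversizedAddition documents the over-capacity behavior flagged in
// review: an oversized transaction is still admitted after eviction has
// emptied the buffer.
func TestOversizedAddition(t *testing.T) {
	lru := NewLRUBuffer(2) // capacity in slots

	tx1 := createDummyTransaction(500)    // 1 slot
	tx2 := createDummyTransaction(100000) // needs more slots than the whole buffer

	lru.Add(tx1)
	lru.Add(tx2) // evicts tx1, then still inserts tx2

	if _, ok := lru.Get(tx1.Hash()); ok {
		t.Errorf("expected tx1 to be evicted")
	}
	// Documents the behavior under discussion: size may now exceed capacity.
	t.Logf("size after oversized add: %d (capacity 2)", lru.Size())
}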


func TestBufferCapacity(t *testing.T) {
lru := NewLRUBuffer(2) // Capacity in slots

tx1 := createDummyTransaction(500) // 1 slot
tx2 := createDummyTransaction(1500) // 1 slot
tx3 := createDummyTransaction(1000) // 1 slot

lru.Add(tx1)
lru.Add(tx2)

if lru.Size() != 2 {
t.Errorf("expected size 2, got %d", lru.Size())
}

lru.Add(tx3)

if lru.Size() != 2 {
t.Errorf("expected size 2 after adding tx3, got %d", lru.Size())
}

if _, ok := lru.Get(tx1.Hash()); ok {
t.Errorf("expected tx1 to be evicted")
}
}

func TestFlush(t *testing.T) {
lru := NewLRUBuffer(10)

tx1 := createDummyTransaction(500)
tx2 := createDummyTransaction(1500)
tx3 := createDummyTransaction(1000)

lru.Add(tx1)
lru.Add(tx2)
lru.Add(tx3)

flushedTxs := lru.Flush(2)

if len(flushedTxs) != 2 {
t.Errorf("expected to flush 2 transactions, got %d", len(flushedTxs))
}

expectedSize := 1
actualSize := lru.Size()
if expectedSize != actualSize {
t.Errorf("expected size after flush %d, got %d", expectedSize, actualSize)
}
}

func TestSize(t *testing.T) {
lru := NewLRUBuffer(10)

tx1 := createDummyTransaction(500) // 1 slot
tx2 := createDummyTransaction(1500) // 1 slot

lru.Add(tx1)
if lru.Size() != 1 {
t.Errorf("expected size 1, got %d", lru.Size())
}

lru.Add(tx2)
if lru.Size() != 2 {
t.Errorf("expected size 2, got %d", lru.Size())
}

lru.Flush(1)
if lru.Size() != 1 {
t.Errorf("expected size 1 after flush, got %d", lru.Size())
}
}