Skip to content

Commit

Permalink
feat(mempool): priority nonce mempool option with tx replacement (bac…
Browse files Browse the repository at this point in the history
…kport #14484) (#15126)

Co-authored-by: JayT106 <JayT106@users.noreply.github.com>
  • Loading branch information
mergify[bot] and JayT106 committed Feb 22, 2023
1 parent b6de22a commit 9c32873
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 7 deletions.
5 changes: 4 additions & 1 deletion docs/docs/building-apps/02-app-mempool.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ It is an integer value that sets the mempool in one of three modes, *bounded*, *

#### Callback

Allow to set a callback to be called when a transaction is read from the mempool.
The priority nonce mempool provides mempool options allowing the application sets callback(s).

* **OnRead**: Set a callback to be called when a transaction is read from the mempool.
* **TxReplacement**: Sets a callback to be called when duplicated transaction nonce detected during mempool insert. Application can define a transaction replacement rule based on tx priority or certain transaction fields.

More information on the SDK mempool implementation can be found in the [godocs](https://pkg.go.dev/github.com/cosmos/cosmos-sdk/types/mempool).
31 changes: 25 additions & 6 deletions types/mempool/priority_nonce.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type priorityNonceMempool struct {
senderIndices map[string]*skiplist.SkipList
scores map[txMeta]txMeta
onRead func(tx sdk.Tx)
txReplacement func(op, np int64, oTx, nTx sdk.Tx) bool
maxTx int
}

Expand Down Expand Up @@ -92,6 +93,14 @@ func PriorityNonceWithOnRead(onRead func(tx sdk.Tx)) PriorityNonceMempoolOption
}
}

// PriorityNonceWithTxReplacement sets a callback to be called when duplicated transaction nonce detected during mempool insert.
// Application can define a transaction replacement rule based on tx priority or certain transaction fields.
func PriorityNonceWithTxReplacement(txReplacementRule func(op, np int64, oTx, nTx sdk.Tx) bool) PriorityNonceMempoolOption {
return func(mp *priorityNonceMempool) {
mp.txReplacement = txReplacementRule
}
}

// PriorityNonceWithMaxTx sets the maximum number of transactions allowed in the mempool with the semantics:
//
// <0: disabled, `Insert` is a no-op
Expand Down Expand Up @@ -166,12 +175,6 @@ func (mp *priorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
mp.senderIndices[sender] = senderIndex
}

mp.priorityCounts[priority]++

// Since senderIndex is scored by nonce, a changed priority will overwrite the
// existing key.
key.senderElement = senderIndex.Set(key, tx)

// Since mp.priorityIndex is scored by priority, then sender, then nonce, a
// changed priority will create a new key, so we must remove the old key and
// re-insert it to avoid having the same tx with different priorityIndex indexed
Expand All @@ -181,6 +184,16 @@ func (mp *priorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
// changes.
sk := txMeta{nonce: nonce, sender: sender}
if oldScore, txExists := mp.scores[sk]; txExists {
if mp.txReplacement != nil && !mp.txReplacement(oldScore.priority, priority, senderIndex.Get(key).Value.(sdk.Tx), tx) {
return fmt.Errorf(
"tx doesn't fit the replacement rule, oldPriority: %v, newPriority: %v, oldTx: %v, newTx: %v",
oldScore.priority,
priority,
senderIndex.Get(key).Value.(sdk.Tx),
tx,
)
}

mp.priorityIndex.Remove(txMeta{
nonce: nonce,
sender: sender,
Expand All @@ -190,6 +203,12 @@ func (mp *priorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
mp.priorityCounts[oldScore.priority]--
}

mp.priorityCounts[priority]++

// Since senderIndex is scored by nonce, a changed priority will overwrite the
// existing key.
key.senderElement = senderIndex.Set(key, tx)

mp.scores[sk] = txMeta{priority: priority}
mp.priorityIndex.Set(key, tx)

Expand Down
52 changes: 52 additions & 0 deletions types/mempool/priority_nonce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,3 +639,55 @@ func TestTxLimit(t *testing.T) {
require.Equal(t, 0, mp.CountTx())
}
}

func TestTxReplacement(t *testing.T) {
accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(0)), 1)
ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger())
sa := accounts[0].Address

txs := []testTx{
{priority: 20, nonce: 1, address: sa},
{priority: 15, nonce: 1, address: sa}, // priority is less than the first Tx, failed tx replacement when the option enabled.
{priority: 23, nonce: 1, address: sa}, // priority is not 20% more than the first Tx, failed tx replacement when the option enabled.
{priority: 24, nonce: 1, address: sa}, // priority is 20% more than the first Tx, the first tx will be replaced.
}

// test Priority with default mempool
mp := mempool.NewPriorityMempool()
for _, tx := range txs {
c := ctx.WithPriority(tx.priority)
require.NoError(t, mp.Insert(c, tx))
require.Equal(t, 1, mp.CountTx())

iter := mp.Select(ctx, nil)
require.Equal(t, tx, iter.Tx())
}

// test Priority with TxReplacement
// we set a TestTxReplacement rule which the priority of the new Tx must be 20% more than the priority of the old Tx
// otherwise, the Insert will return error
feeBump := 20
mp = mempool.NewPriorityMempool(mempool.PriorityNonceWithTxReplacement(func(op, np int64, oTx, nTx sdk.Tx) bool {
threshold := int64(100 + feeBump)
return np >= op*threshold/100
}))

c := ctx.WithPriority(txs[0].priority)
require.NoError(t, mp.Insert(c, txs[0]))
require.Equal(t, 1, mp.CountTx())

c = ctx.WithPriority(txs[1].priority)
require.Error(t, mp.Insert(c, txs[1]))
require.Equal(t, 1, mp.CountTx())

c = ctx.WithPriority(txs[2].priority)
require.Error(t, mp.Insert(c, txs[2]))
require.Equal(t, 1, mp.CountTx())

c = ctx.WithPriority(txs[3].priority)
require.NoError(t, mp.Insert(c, txs[3]))
require.Equal(t, 1, mp.CountTx())

iter := mp.Select(ctx, nil)
require.Equal(t, txs[3], iter.Tx())
}

0 comments on commit 9c32873

Please sign in to comment.