From 7b7f40bfd276ab87c1a97b06e10ab16effdc6e6b Mon Sep 17 00:00:00 2001 From: lmittmann Date: Tue, 28 Jun 2022 11:18:34 +0200 Subject: [PATCH 1/6] eth/filter, ethclient/gethclient: added fullTx-flag to NewPendingTransactions * #24524 --- eth/filters/api.go | 38 ++++++++++++++--------- eth/filters/filter_system.go | 28 ++++++++--------- eth/filters/filter_system_test.go | 22 ++++++------- ethclient/gethclient/gethclient.go | 9 ++++-- ethclient/gethclient/gethclient_test.go | 41 +++++++++++++++++++++++-- 5 files changed, 93 insertions(+), 45 deletions(-) diff --git a/eth/filters/api.go b/eth/filters/api.go index 43e63d5ba98a..c0582e170a59 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -38,6 +38,7 @@ type filter struct { typ Type deadline *time.Timer // filter is inactive when deadline triggers hashes []common.Hash + txs []*types.Transaction crit FilterCriteria logs []*types.Log s *Subscription // associated subscription in event system @@ -96,28 +97,28 @@ func (api *FilterAPI) timeoutLoop(timeout time.Duration) { } } -// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes +// NewPendingTransactionFilter creates a filter that fetches pending transactions // as transactions enter the pending state. // // It is part of the filter package because this filter can be used through the // `eth_getFilterChanges` polling method that is also used for log filters. func (api *FilterAPI) NewPendingTransactionFilter() rpc.ID { var ( - pendingTxs = make(chan []common.Hash) + pendingTxs = make(chan []*types.Transaction) pendingTxSub = api.events.SubscribePendingTxs(pendingTxs) ) api.filtersMu.Lock() - api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: pendingTxSub} + api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), txs: make([]*types.Transaction, 0), s: pendingTxSub} api.filtersMu.Unlock() go func() { for { select { - case ph := <-pendingTxs: + case pTx := <-pendingTxs: api.filtersMu.Lock() if f, found := api.filters[pendingTxSub.ID]; found { - f.hashes = append(f.hashes, ph...) + f.txs = append(f.txs, pTx...) } api.filtersMu.Unlock() case <-pendingTxSub.Err(): @@ -132,9 +133,10 @@ func (api *FilterAPI) NewPendingTransactionFilter() rpc.ID { return pendingTxSub.ID } -// NewPendingTransactions creates a subscription that is triggered each time a transaction -// enters the transaction pool and was signed from one of the transactions this nodes manages. -func (api *FilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) { +// NewPendingTransactions creates a subscription that is triggered each time a +// transaction enters the transaction pool. If fullTx is true the full tx is +// sent to the client, otherwise the hash is sent. +func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx bool) (*rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) if !supported { return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported @@ -143,16 +145,20 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscrip rpcSub := notifier.CreateSubscription() go func() { - txHashes := make(chan []common.Hash, 128) - pendingTxSub := api.events.SubscribePendingTxs(txHashes) + txs := make(chan []*types.Transaction, 128) + pendingTxSub := api.events.SubscribePendingTxs(txs) for { select { - case hashes := <-txHashes: + case txs := <-txs: // To keep the original behaviour, send a single tx hash in one notification. // TODO(rjl493456442) Send a batch of tx hashes in one notification - for _, h := range hashes { - notifier.Notify(rpcSub.ID, h) + for _, tx := range txs { + if fullTx { + notifier.Notify(rpcSub.ID, tx) + } else { + notifier.Notify(rpcSub.ID, tx.Hash()) + } } case <-rpcSub.Err(): pendingTxSub.Unsubscribe() @@ -411,10 +417,14 @@ func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { f.deadline.Reset(api.timeout) switch f.typ { - case PendingTransactionsSubscription, BlocksSubscription: + case BlocksSubscription: hashes := f.hashes f.hashes = nil return returnHashes(hashes), nil + case PendingTransactionsSubscription: + txs := f.txs + f.txs = nil + return txs, nil case LogsSubscription, MinedAndPendingLogsSubscription: logs := f.logs f.logs = nil diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index ab9858f45495..e86a67abfda3 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -124,8 +124,8 @@ const ( PendingLogsSubscription // MinedAndPendingLogsSubscription queries for logs in mined and pending blocks. MinedAndPendingLogsSubscription - // PendingTransactionsSubscription queries tx hashes for pending - // transactions entering the pending state + // PendingTransactionsSubscription queries for pending transactions entering + // the pending state PendingTransactionsSubscription // BlocksSubscription queries hashes for blocks that are imported BlocksSubscription @@ -151,7 +151,7 @@ type subscription struct { created time.Time logsCrit ethereum.FilterQuery logs chan []*types.Log - hashes chan []common.Hash + txs chan []*types.Transaction headers chan *types.Header installed chan struct{} // closed when the filter is installed err chan error // closed when the filter is uninstalled @@ -244,7 +244,7 @@ func (sub *Subscription) Unsubscribe() { case sub.es.uninstall <- sub.f: break uninstallLoop case <-sub.f.logs: - case <-sub.f.hashes: + case <-sub.f.txs: case <-sub.f.headers: } } @@ -311,7 +311,7 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan []common.Hash), + txs: make(chan []*types.Transaction), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -328,7 +328,7 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan []common.Hash), + txs: make(chan []*types.Transaction), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -345,7 +345,7 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan []common.Hash), + txs: make(chan []*types.Transaction), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -361,7 +361,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti typ: BlocksSubscription, created: time.Now(), logs: make(chan []*types.Log), - hashes: make(chan []common.Hash), + txs: make(chan []*types.Transaction), headers: headers, installed: make(chan struct{}), err: make(chan error), @@ -369,15 +369,15 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti return es.subscribe(sub) } -// SubscribePendingTxs creates a subscription that writes transaction hashes for +// SubscribePendingTxs creates a subscription that writes transactions for // transactions that enter the transaction pool. -func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription { +func (es *EventSystem) SubscribePendingTxs(txs chan []*types.Transaction) *Subscription { sub := &subscription{ id: rpc.NewID(), typ: PendingTransactionsSubscription, created: time.Now(), logs: make(chan []*types.Log), - hashes: hashes, + txs: txs, headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -421,12 +421,8 @@ func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLog } func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) { - hashes := make([]common.Hash, 0, len(ev.Txs)) - for _, tx := range ev.Txs { - hashes = append(hashes, tx.Hash()) - } for _, f := range filters[PendingTransactionsSubscription] { - f.hashes <- hashes + f.txs <- ev.Txs } } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 4386f0e5bde6..a41271f7b843 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -240,7 +240,7 @@ func TestPendingTxFilter(t *testing.T) { types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), } - hashes []common.Hash + txs []*types.Transaction ) fid0 := api.NewPendingTransactionFilter() @@ -255,9 +255,9 @@ func TestPendingTxFilter(t *testing.T) { t.Fatalf("Unable to retrieve logs: %v", err) } - h := results.([]common.Hash) - hashes = append(hashes, h...) - if len(hashes) >= len(transactions) { + tx := results.([]*types.Transaction) + txs = append(txs, tx...) + if len(txs) >= len(transactions) { break } // check timeout @@ -268,13 +268,13 @@ func TestPendingTxFilter(t *testing.T) { time.Sleep(100 * time.Millisecond) } - if len(hashes) != len(transactions) { - t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(hashes)) + if len(txs) != len(transactions) { + t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(txs)) return } - for i := range hashes { - if hashes[i] != transactions[i].Hash() { - t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i]) + for i := range txs { + if txs[i].Hash() != transactions[i].Hash() { + t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), txs[i].Hash()) } } } @@ -705,11 +705,11 @@ func TestPendingTxFilterDeadlock(t *testing.T) { fids[i] = fid // Wait for at least one tx to arrive in filter for { - hashes, err := api.GetFilterChanges(fid) + txs, err := api.GetFilterChanges(fid) if err != nil { t.Fatalf("Filter should exist: %v\n", err) } - if len(hashes.([]common.Hash)) > 0 { + if len(txs.([]*types.Transaction)) > 0 { break } runtime.Gosched() diff --git a/ethclient/gethclient/gethclient.go b/ethclient/gethclient/gethclient.go index e182911aa5de..0c4802400eef 100644 --- a/ethclient/gethclient/gethclient.go +++ b/ethclient/gethclient/gethclient.go @@ -167,8 +167,13 @@ func (ec *Client) GetNodeInfo(ctx context.Context) (*p2p.NodeInfo, error) { } // SubscribePendingTransactions subscribes to new pending transactions. -func (ec *Client) SubscribePendingTransactions(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error) { - return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions") +func (ec *Client) SubscribePendingTransactions(ctx context.Context, ch chan<- *types.Transaction) (*rpc.ClientSubscription, error) { + return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions", true) +} + +// SubscribePendingTransactionHashes subscribes to new pending transaction hashes. +func (ec *Client) SubscribePendingTransactionHashes(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error) { + return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions", false) } func toBlockNumArg(number *big.Int) string { diff --git a/ethclient/gethclient/gethclient_test.go b/ethclient/gethclient/gethclient_test.go index da0118887b26..a6a4fc86ba65 100644 --- a/ethclient/gethclient/gethclient_test.go +++ b/ethclient/gethclient/gethclient_test.go @@ -121,6 +121,9 @@ func TestGethClient(t *testing.T) { }, { "TestSetHead", func(t *testing.T) { testSetHead(t, client) }, + }, { + "TestSubscribePendingTxHashes", + func(t *testing.T) { testSubscribePendingTransactionHashes(t, client) }, }, { "TestSubscribePendingTxs", func(t *testing.T) { testSubscribePendingTransactions(t, client) }, @@ -269,12 +272,12 @@ func testSetHead(t *testing.T, client *rpc.Client) { } } -func testSubscribePendingTransactions(t *testing.T, client *rpc.Client) { +func testSubscribePendingTransactionHashes(t *testing.T, client *rpc.Client) { ec := New(client) ethcl := ethclient.NewClient(client) // Subscribe to Transactions ch := make(chan common.Hash) - ec.SubscribePendingTransactions(context.Background(), ch) + ec.SubscribePendingTransactionHashes(context.Background(), ch) // Send a transaction chainID, err := ethcl.ChainID(context.Background()) if err != nil { @@ -303,6 +306,40 @@ func testSubscribePendingTransactions(t *testing.T, client *rpc.Client) { } } +func testSubscribePendingTransactions(t *testing.T, client *rpc.Client) { + ec := New(client) + ethcl := ethclient.NewClient(client) + // Subscribe to Transactions + ch := make(chan *types.Transaction) + ec.SubscribePendingTransactions(context.Background(), ch) + // Send a transaction + chainID, err := ethcl.ChainID(context.Background()) + if err != nil { + t.Fatal(err) + } + // Create transaction + tx := types.NewTransaction(1, common.Address{1}, big.NewInt(1), 22000, big.NewInt(1), nil) + signer := types.LatestSignerForChainID(chainID) + signature, err := crypto.Sign(signer.Hash(tx).Bytes(), testKey) + if err != nil { + t.Fatal(err) + } + signedTx, err := tx.WithSignature(signer, signature) + if err != nil { + t.Fatal(err) + } + // Send transaction + err = ethcl.SendTransaction(context.Background(), signedTx) + if err != nil { + t.Fatal(err) + } + // Check that the transaction was send over the channel + tx = <-ch + if tx.Hash() != signedTx.Hash() { + t.Fatalf("Invalid tx hash received, got %v, want %v", tx.Hash(), signedTx.Hash()) + } +} + func testCallContract(t *testing.T, client *rpc.Client) { ec := New(client) msg := ethereum.CallMsg{ From e3f94fc8c26d2b4d73e04a5783ace76e0807daf1 Mon Sep 17 00:00:00 2001 From: lmittmann Date: Tue, 28 Jun 2022 12:59:15 +0200 Subject: [PATCH 2/6] made `fullTx` arg optional to keep backwards compatibility --- eth/filters/api.go | 4 ++-- ethclient/gethclient/gethclient.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/eth/filters/api.go b/eth/filters/api.go index c0582e170a59..f52bff6f3c32 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -136,7 +136,7 @@ func (api *FilterAPI) NewPendingTransactionFilter() rpc.ID { // NewPendingTransactions creates a subscription that is triggered each time a // transaction enters the transaction pool. If fullTx is true the full tx is // sent to the client, otherwise the hash is sent. -func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx bool) (*rpc.Subscription, error) { +func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) if !supported { return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported @@ -154,7 +154,7 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx bool) ( // To keep the original behaviour, send a single tx hash in one notification. // TODO(rjl493456442) Send a batch of tx hashes in one notification for _, tx := range txs { - if fullTx { + if fullTx != nil && *fullTx { notifier.Notify(rpcSub.ID, tx) } else { notifier.Notify(rpcSub.ID, tx.Hash()) diff --git a/ethclient/gethclient/gethclient.go b/ethclient/gethclient/gethclient.go index 0c4802400eef..218b7b64a6ca 100644 --- a/ethclient/gethclient/gethclient.go +++ b/ethclient/gethclient/gethclient.go @@ -173,7 +173,7 @@ func (ec *Client) SubscribePendingTransactions(ctx context.Context, ch chan<- *t // SubscribePendingTransactionHashes subscribes to new pending transaction hashes. func (ec *Client) SubscribePendingTransactionHashes(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error) { - return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions", false) + return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions") } func toBlockNumArg(number *big.Int) string { From e90ab4337e151ed92b7cf93294de44d19aa902bf Mon Sep 17 00:00:00 2001 From: lmittmann Date: Thu, 11 Aug 2022 15:33:49 +0200 Subject: [PATCH 3/6] renamed `SubscribePendingTransactions` -> `SubscribeFullPendingTransactions` --- ethclient/gethclient/gethclient.go | 8 ++++---- ethclient/gethclient/gethclient_test.go | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/ethclient/gethclient/gethclient.go b/ethclient/gethclient/gethclient.go index 218b7b64a6ca..fcf37180c815 100644 --- a/ethclient/gethclient/gethclient.go +++ b/ethclient/gethclient/gethclient.go @@ -166,13 +166,13 @@ func (ec *Client) GetNodeInfo(ctx context.Context) (*p2p.NodeInfo, error) { return &result, err } -// SubscribePendingTransactions subscribes to new pending transactions. -func (ec *Client) SubscribePendingTransactions(ctx context.Context, ch chan<- *types.Transaction) (*rpc.ClientSubscription, error) { +// SubscribeFullPendingTransactions subscribes to new pending transactions. +func (ec *Client) SubscribeFullPendingTransactions(ctx context.Context, ch chan<- *types.Transaction) (*rpc.ClientSubscription, error) { return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions", true) } -// SubscribePendingTransactionHashes subscribes to new pending transaction hashes. -func (ec *Client) SubscribePendingTransactionHashes(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error) { +// SubscribePendingTransaction subscribes to new pending transaction hashes. +func (ec *Client) SubscribePendingTransaction(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error) { return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions") } diff --git a/ethclient/gethclient/gethclient_test.go b/ethclient/gethclient/gethclient_test.go index a6a4fc86ba65..864bca235907 100644 --- a/ethclient/gethclient/gethclient_test.go +++ b/ethclient/gethclient/gethclient_test.go @@ -277,7 +277,7 @@ func testSubscribePendingTransactionHashes(t *testing.T, client *rpc.Client) { ethcl := ethclient.NewClient(client) // Subscribe to Transactions ch := make(chan common.Hash) - ec.SubscribePendingTransactionHashes(context.Background(), ch) + ec.SubscribePendingTransaction(context.Background(), ch) // Send a transaction chainID, err := ethcl.ChainID(context.Background()) if err != nil { @@ -311,7 +311,7 @@ func testSubscribePendingTransactions(t *testing.T, client *rpc.Client) { ethcl := ethclient.NewClient(client) // Subscribe to Transactions ch := make(chan *types.Transaction) - ec.SubscribePendingTransactions(context.Background(), ch) + ec.SubscribeFullPendingTransactions(context.Background(), ch) // Send a transaction chainID, err := ethcl.ChainID(context.Background()) if err != nil { From edbe4804bf5c66ecfd37ca755688bf55980722b1 Mon Sep 17 00:00:00 2001 From: lmittmann Date: Thu, 11 Aug 2022 15:41:24 +0200 Subject: [PATCH 4/6] fixed nameing --- ethclient/gethclient/gethclient_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ethclient/gethclient/gethclient_test.go b/ethclient/gethclient/gethclient_test.go index 864bca235907..d1bb1b16ff42 100644 --- a/ethclient/gethclient/gethclient_test.go +++ b/ethclient/gethclient/gethclient_test.go @@ -123,10 +123,10 @@ func TestGethClient(t *testing.T) { func(t *testing.T) { testSetHead(t, client) }, }, { "TestSubscribePendingTxHashes", - func(t *testing.T) { testSubscribePendingTransactionHashes(t, client) }, + func(t *testing.T) { testSubscribePendingTransaction(t, client) }, }, { "TestSubscribePendingTxs", - func(t *testing.T) { testSubscribePendingTransactions(t, client) }, + func(t *testing.T) { testSubscribeFullPendingTransactions(t, client) }, }, { "TestCallContract", func(t *testing.T) { testCallContract(t, client) }, @@ -272,7 +272,7 @@ func testSetHead(t *testing.T, client *rpc.Client) { } } -func testSubscribePendingTransactionHashes(t *testing.T, client *rpc.Client) { +func testSubscribePendingTransaction(t *testing.T, client *rpc.Client) { ec := New(client) ethcl := ethclient.NewClient(client) // Subscribe to Transactions @@ -306,7 +306,7 @@ func testSubscribePendingTransactionHashes(t *testing.T, client *rpc.Client) { } } -func testSubscribePendingTransactions(t *testing.T, client *rpc.Client) { +func testSubscribeFullPendingTransactions(t *testing.T, client *rpc.Client) { ec := New(client) ethcl := ethclient.NewClient(client) // Subscribe to Transactions From be05027976132119fa4e40978da3651f7e778885 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 19 Aug 2022 13:30:49 +0200 Subject: [PATCH 5/6] Update gethclient.go --- ethclient/gethclient/gethclient.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ethclient/gethclient/gethclient.go b/ethclient/gethclient/gethclient.go index fcf37180c815..8211ee75ae01 100644 --- a/ethclient/gethclient/gethclient.go +++ b/ethclient/gethclient/gethclient.go @@ -171,8 +171,8 @@ func (ec *Client) SubscribeFullPendingTransactions(ctx context.Context, ch chan< return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions", true) } -// SubscribePendingTransaction subscribes to new pending transaction hashes. -func (ec *Client) SubscribePendingTransaction(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error) { +// SubscribePendingTransactions subscribes to new pending transaction hashes. +func (ec *Client) SubscribePendingTransactions(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error) { return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions") } From 1e7d5da68255767e0bef49d5b5b88dc568c753b4 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 19 Aug 2022 13:31:48 +0200 Subject: [PATCH 6/6] Update gethclient_test.go --- ethclient/gethclient/gethclient_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ethclient/gethclient/gethclient_test.go b/ethclient/gethclient/gethclient_test.go index d1bb1b16ff42..e60490c61646 100644 --- a/ethclient/gethclient/gethclient_test.go +++ b/ethclient/gethclient/gethclient_test.go @@ -123,7 +123,7 @@ func TestGethClient(t *testing.T) { func(t *testing.T) { testSetHead(t, client) }, }, { "TestSubscribePendingTxHashes", - func(t *testing.T) { testSubscribePendingTransaction(t, client) }, + func(t *testing.T) { testSubscribePendingTransactions(t, client) }, }, { "TestSubscribePendingTxs", func(t *testing.T) { testSubscribeFullPendingTransactions(t, client) }, @@ -272,12 +272,12 @@ func testSetHead(t *testing.T, client *rpc.Client) { } } -func testSubscribePendingTransaction(t *testing.T, client *rpc.Client) { +func testSubscribePendingTransactions(t *testing.T, client *rpc.Client) { ec := New(client) ethcl := ethclient.NewClient(client) // Subscribe to Transactions ch := make(chan common.Hash) - ec.SubscribePendingTransaction(context.Background(), ch) + ec.SubscribePendingTransactions(context.Background(), ch) // Send a transaction chainID, err := ethcl.ChainID(context.Background()) if err != nil {