add receipts cache. improved bestHeader handling. e2e tests running
buck54321 committed Aug 31, 2022
1 parent fc3be7e commit 95f9c31
Showing 8 changed files with 207 additions and 110 deletions.
6 changes: 5 additions & 1 deletion client/asset/eth/eth.go
@@ -420,7 +420,11 @@ func createWallet(createWalletParams *asset.CreateWalletParams, skipConnect bool
case walletTypeRPC:

// Check that we can connect to all endpoints.
endpoints := strings.Split(createWalletParams.Settings[providersKey], " ")
providerDef := createWalletParams.Settings[providersKey]
if len(providerDef) == 0 {
return errors.New("no providers specified")
}
endpoints := strings.Split(providerDef, " ")
n := len(endpoints)

// TODO: This procedure may actually work for walletTypeGeth too.
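For context, here is a minimal, standalone sketch (not part of the commit) of the validation this hunk adds: the space-separated providers setting must be non-empty before it is split into endpoints. The endpoint URLs below are placeholders.

package main

import (
	"errors"
	"fmt"
	"strings"
)

// parseProviders mirrors the check added above: an empty providers setting is
// rejected before being split on spaces into individual endpoints.
func parseProviders(providerDef string) ([]string, error) {
	if len(providerDef) == 0 {
		return nil, errors.New("no providers specified")
	}
	return strings.Split(providerDef, " "), nil
}

func main() {
	// Placeholder endpoints, for illustration only.
	endpoints, err := parseProviders("https://rpc.ankr.com/eth_goerli wss://goerli.infura.io/ws/v3/<API KEY>")
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Printf("%d endpoints: %v\n", len(endpoints), endpoints)
}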
204 changes: 127 additions & 77 deletions client/asset/eth/multirpc.go
@@ -33,7 +33,10 @@ import (
"github.com/ethereum/go-ethereum/rpc"
)

const failQuarantine = time.Minute
const (
failQuarantine = time.Minute
receiptCacheExpiration = time.Hour
)

var nonceProviderStickiness = time.Minute

@@ -112,6 +115,11 @@ func (p *provider) bestHeader(ctx context.Context, log dex.Logger) (*types.Heade
return hdr, nil
}

type receiptRecord struct {
r *types.Receipt
lastAccess time.Time
}

// multiRPCClient is an ethFetcher backed by one or more public infrastructure
// providers.
// MATIC providers at
@@ -134,6 +142,12 @@ type multiRPCClient struct {
*provider
stamp time.Time
}

receipts struct {
sync.RWMutex
cache map[common.Hash]*receiptRecord
lastClean time.Time
}
}

var _ ethFetcher = (*multiRPCClient)(nil)
@@ -145,13 +159,17 @@ func newMultiRPCClient(dir string, endpoints []string, log dex.Logger, cfg *para
return nil, fmt.Errorf("error parsing credentials from %q: %w", dir, err)
}

return &multiRPCClient{
m := &multiRPCClient{
cfg: cfg,
log: log,
creds: creds,
chainID: chainID,
endpoints: endpoints,
}, nil
}
m.receipts.cache = make(map[common.Hash]*receiptRecord)
m.receipts.lastClean = time.Now()

return m, nil
}

func connectProviders(ctx context.Context, endpoints []string, addr common.Address, log dex.Logger) ([]*provider, error) {
@@ -290,7 +308,11 @@ func (m *multiRPCClient) connect(ctx context.Context) (err error) {
}

func (m *multiRPCClient) reconfigure(ctx context.Context, settings map[string]string) error {
endpoints := strings.Split(settings[providersKey], " ")
providerDef := settings[providersKey]
if len(providerDef) == 0 {
return errors.New("no providers specified")
}
endpoints := strings.Split(providerDef, " ")
providers, err := connectProviders(ctx, endpoints, m.creds.addr, m.log)
if err != nil {
return err
@@ -306,39 +328,6 @@ func (m *multiRPCClient) reconfigure(ctx context.Context, settings map[string]st
return nil
}

const retryError = dex.ErrorKind("retrying")

type retrySubscriptionDummy struct {
retry chan error
}

func newRetrySubscription(ctx context.Context, t time.Duration) *retrySubscriptionDummy {
timer := time.NewTimer(t)
r := &retrySubscriptionDummy{
retry: make(chan error),
}
go func() {
select {
case <-timer.C:
r.retry <- retryError
case <-ctx.Done():
timer.Stop()
close(r.retry)
return
}
}()

return r
}

func (r *retrySubscriptionDummy) Unsubscribe() {
close(r.retry)
}

func (r *retrySubscriptionDummy) Err() <-chan error {
return r.retry
}

func subHeaders(ctx context.Context, p *provider, sub ethereum.Subscription, h chan *types.Header, addr common.Address, log dex.Logger) {
defer sub.Unsubscribe()
var lastWarning time.Time
@@ -360,23 +349,26 @@ func subHeaders(ctx context.Context, p *provider, sub ethereum.Subscription, h c
}
}

logs := make(chan types.Log, 128)
newAcctSub := func(retryTimeout time.Duration) ethereum.Subscription {
config := ethereum.FilterQuery{
Addresses: []common.Address{addr},
}
// I thought the filter logs might catch some transactions we could cache
// to avoid rpc calls, but in testing, I get nothing in the channel. May
// revisit later.
// logs := make(chan types.Log, 128)
// newAcctSub := func(retryTimeout time.Duration) ethereum.Subscription {
// config := ethereum.FilterQuery{
// Addresses: []common.Address{addr},
// }

acctSub, err := p.cl().SubscribeFilterLogs(ctx, config, logs)
if err != nil {
log.Errorf("failed to subscribe to filter logs: %v", err)
return newRetrySubscription(ctx, retryTimeout)
}
return acctSub
}
// acctSub, err := p.cl().SubscribeFilterLogs(ctx, config, logs)
// if err != nil {
// log.Errorf("failed to subscribe to filter logs: %v", err)
// return newRetrySubscription(ctx, retryTimeout)
// }
// return acctSub
// }

// If we fail the first time, don't try again.
acctSub := newAcctSub(time.Hour * 24 * 365)
defer acctSub.Unsubscribe()
// // If we fail the first time, don't try again.
// acctSub := newAcctSub(time.Hour * 24 * 365)
// defer acctSub.Unsubscribe()

// Start the background filtering
log.Tracef("handling websocket subscriptions")
@@ -396,26 +388,69 @@ func subHeaders(ctx context.Context, p *provider, sub ethereum.Subscription, h c
if err != nil { // context cancelled
return
}
case l := <-logs:
log.Tracef("%q log reported: %+v", p.host, l)
case err, ok := <-acctSub.Err():
if err != nil && !errors.Is(err, retryError) {
log.Errorf("%q log subscription error: %v", p.host, err)
}
if ok {
acctSub = newAcctSub(time.Minute * 5)
}
// case l := <-logs:
// log.Tracef("%q log reported: %+v", p.host, l)
// case err, ok := <-acctSub.Err():
// if err != nil && !errors.Is(err, retryError) {
// log.Errorf("%q log subscription error: %v", p.host, err)
// }
// if ok {
// acctSub = newAcctSub(time.Minute * 5)
// }
case <-ctx.Done():
return
}
}
}

// cleanReceipts cleans up the receipt cache, deleting any receipts that haven't
// been accessed for > receiptCacheExpiration.
func (m *multiRPCClient) cleanReceipts() {
m.receipts.Lock()
for txHash, rec := range m.receipts.cache {
if time.Since(rec.lastAccess) > receiptCacheExpiration {
delete(m.receipts.cache, txHash)
}
}
m.receipts.Unlock()
}

func (m *multiRPCClient) transactionReceipt(ctx context.Context, txHash common.Hash) (r *types.Receipt, err error) {
return r, m.withPreferred(func(p *provider) error {

// TODO
// TODO: Plug into the monitoredTx system from #1638.
// TODO

// Check the cache.
m.receipts.RLock()
cached := m.receipts.cache[txHash]
if cached != nil {
cached.lastAccess = time.Now()
}
if time.Since(m.receipts.lastClean) > time.Minute*20 {
m.receipts.lastClean = time.Now()
go m.cleanReceipts()
}
m.receipts.RUnlock()
if cached != nil {
return cached.r, nil
}

if err = m.withPreferred(func(p *provider) error {
r, err = p.cl().TransactionReceipt(ctx, txHash)
return err
})
}); err != nil {
return nil, err
}

m.receipts.Lock()
m.receipts.cache[txHash] = &receiptRecord{
r: r,
lastAccess: time.Now(),
}
m.receipts.Unlock()

return r, nil
}

func (m *multiRPCClient) providerList() []*provider {
@@ -441,13 +476,15 @@ func (m *multiRPCClient) withOne(providers []*provider, f func(*provider) error)
return fmt.Errorf("all providers errored")
}

func (m *multiRPCClient) withAny(f func(*provider) error) error {
providers := m.providerList()

rand.Shuffle(len(providers), func(i, j int) {
providers[i], providers[j] = providers[j], providers[i]
func shuffleProviders(p []*provider) {
rand.Shuffle(len(p), func(i, j int) {
p[i], p[j] = p[j], p[i]
})
}

func (m *multiRPCClient) withAny(f func(*provider) error) error {
providers := m.providerList()
shuffleProviders(providers)
return m.withOne(providers, f)
}

@@ -473,9 +510,7 @@ func (m *multiRPCClient) nonceProviderList() []*provider {
providers = append(providers, p)
}

rand.Shuffle(len(providers), func(i, j int) {
providers[i], providers[j] = providers[j], providers[i]
})
shuffleProviders(providers)

if lastProvider != nil {
providers = append([]*provider{lastProvider}, providers...)
@@ -503,6 +538,26 @@ func (m *multiRPCClient) addressBalance(ctx context.Context, addr common.Address
}

func (m *multiRPCClient) bestHeader(ctx context.Context) (hdr *types.Header, err error) {
var bestHeader *types.Header
for _, p := range m.providerList() {
h := p.cachedTip()
if h == nil {
continue
}
// This block choosing algo is probably too rudimentary. Really should
// traverse parents to a common block and sum up gas (including
// uncles?), I think.
if bestHeader == nil || // first one
h.Number.Cmp(bestHeader.Number) > 0 || // newer
(h.Number.Cmp(bestHeader.Number) == 0 && h.GasUsed > bestHeader.GasUsed) { // same height, but more gas used

bestHeader = h
}
}
if bestHeader != nil {
return bestHeader, nil
}

return hdr, m.withAny(func(p *provider) error {
hdr, err = p.bestHeader(ctx, m.log)
return err
Expand Down Expand Up @@ -565,15 +620,10 @@ func (m *multiRPCClient) sendSignedTransaction(ctx context.Context, tx *types.Tr
}

func (m *multiRPCClient) sendTransaction(ctx context.Context, txOpts *bind.TransactOpts, to common.Address, data []byte) (*types.Transaction, error) {
nonce, err := m.nextNonce(ctx)
if err != nil {
return nil, err
}

tx, err := m.creds.ks.SignTx(*m.creds.acct, types.NewTx(&types.DynamicFeeTx{
To: &to,
ChainID: m.chainID,
Nonce: nonce,
Nonce: txOpts.Nonce.Uint64(),
Gas: txOpts.GasLimit,
GasFeeCap: txOpts.GasFeeCap,
GasTipCap: txOpts.GasTipCap,
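The heart of the multirpc.go change is the access-time-based receipt cache: transactionReceipt first checks the cache under a read lock, triggers an asynchronous cleanReceipts sweep at most every twenty minutes, and stores freshly fetched receipts with a timestamp. Below is a minimal, standalone sketch of that pattern; the type and field names are illustrative only, and a plain Mutex is used in place of the wallet's RWMutex.

package main

import (
	"fmt"
	"sync"
	"time"
)

type record struct {
	value      string // stands in for *types.Receipt
	lastAccess time.Time
}

type accessCache struct {
	mtx        sync.Mutex
	entries    map[string]*record
	expiration time.Duration
}

// get returns a cached value and refreshes its last-access time.
func (c *accessCache) get(key string) (string, bool) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	rec, found := c.entries[key]
	if !found {
		return "", false
	}
	rec.lastAccess = time.Now()
	return rec.value, true
}

// put stores a value stamped with the current time.
func (c *accessCache) put(key, value string) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	c.entries[key] = &record{value: value, lastAccess: time.Now()}
}

// clean deletes entries not accessed within the expiration window.
func (c *accessCache) clean() {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	for key, rec := range c.entries {
		if time.Since(rec.lastAccess) > c.expiration {
			delete(c.entries, key)
		}
	}
}

func main() {
	c := &accessCache{entries: make(map[string]*record), expiration: time.Hour}
	c.put("0xabc", "receipt")
	if v, ok := c.get("0xabc"); ok {
		fmt.Println("cache hit:", v)
	}
	c.clean() // nothing is stale yet, so the entry survives
	fmt.Println("entries remaining:", len(c.entries))
}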
27 changes: 27 additions & 0 deletions client/asset/eth/multirpc_live_test.go
@@ -176,6 +176,33 @@ func TestMultiRPCClient(t *testing.T) {
}
}

//
// Create a providers.json file in your .dexc directory.
// 1. Seed can be anything. Just generate randomness.
// 2. Can connect to a host's websocket and http endpoints simultaneously.
// Actually, nothing prevents you from connecting to a single provider
// 100 times, but a guardrail may be added in the future.
//
// Example ~/.dexc/providers.json
/*
{
"testnet": {
"seed": "9e0084387c3ba7ac4b5bb409c220c08d4ee74f7b8c73b03fff18c727c5ce9f48",
"providers": [
"https://goerli.infura.io/v3/<API KEY>",
"https://rpc.ankr.com/eth_goerli"
]
},
"mainnet": {
"seed": "9e0084387c3ba7ac4b5bb409c220c08d4ee74f7b8c73b03fff18c727c5ce9f48",
"providers": [
"wss://mainnet.infura.io/ws/v3/<API KEY>",
"https://rpc.ankr.com/eth"
]
}
}
*/

func TestMonitorTestnet(t *testing.T) {
testMonitorNet(t, dex.Testnet)
}
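For reference, a minimal, hypothetical sketch of loading the providers.json layout shown above. The struct fields and parsing approach are assumptions for illustration; the live tests may read the file differently.

package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// netConfig is an assumed shape for one network entry in providers.json.
type netConfig struct {
	Seed      string   `json:"seed"`
	Providers []string `json:"providers"`
}

func main() {
	home, _ := os.UserHomeDir()
	b, err := os.ReadFile(filepath.Join(home, ".dexc", "providers.json"))
	if err != nil {
		fmt.Println("read error:", err)
		return
	}
	// Keyed by network name, e.g. "testnet", "mainnet".
	var cfg map[string]*netConfig
	if err := json.Unmarshal(b, &cfg); err != nil {
		fmt.Println("parse error:", err)
		return
	}
	for net, nc := range cfg {
		fmt.Printf("%s: %d providers\n", net, len(nc.Providers))
	}
}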