diff --git a/client/asset/eth/eth.go b/client/asset/eth/eth.go index a27900c0ba..e57544836e 100644 --- a/client/asset/eth/eth.go +++ b/client/asset/eth/eth.go @@ -420,7 +420,11 @@ func createWallet(createWalletParams *asset.CreateWalletParams, skipConnect bool case walletTypeRPC: // Check that we can connect to all endpoints. - endpoints := strings.Split(createWalletParams.Settings[providersKey], " ") + providerDef := createWalletParams.Settings[providersKey] + if len(providerDef) == 0 { + return errors.New("no providers specified") + } + endpoints := strings.Split(providerDef, " ") n := len(endpoints) // TODO: This procedure may actually work for walletTypeGeth too. diff --git a/client/asset/eth/multirpc.go b/client/asset/eth/multirpc.go index a31743a519..38298498ef 100644 --- a/client/asset/eth/multirpc.go +++ b/client/asset/eth/multirpc.go @@ -33,7 +33,10 @@ import ( "github.com/ethereum/go-ethereum/rpc" ) -const failQuarantine = time.Minute +const ( + failQuarantine = time.Minute + receiptCacheExpiration = time.Hour +) var nonceProviderStickiness = time.Minute @@ -112,6 +115,11 @@ func (p *provider) bestHeader(ctx context.Context, log dex.Logger) (*types.Heade return hdr, nil } +type receiptRecord struct { + r *types.Receipt + lastAccess time.Time +} + // multiRPCClient is an ethFetcher backed by one or more public infrastructure // providers. // MATIC providers at @@ -134,6 +142,12 @@ type multiRPCClient struct { *provider stamp time.Time } + + receipts struct { + sync.RWMutex + cache map[common.Hash]*receiptRecord + lastClean time.Time + } } var _ ethFetcher = (*multiRPCClient)(nil) @@ -145,13 +159,17 @@ func newMultiRPCClient(dir string, endpoints []string, log dex.Logger, cfg *para return nil, fmt.Errorf("error parsing credentials from %q: %w", dir, err) } - return &multiRPCClient{ + m := &multiRPCClient{ cfg: cfg, log: log, creds: creds, chainID: chainID, endpoints: endpoints, - }, nil + } + m.receipts.cache = make(map[common.Hash]*receiptRecord) + m.receipts.lastClean = time.Now() + + return m, nil } func connectProviders(ctx context.Context, endpoints []string, addr common.Address, log dex.Logger) ([]*provider, error) { @@ -290,7 +308,11 @@ func (m *multiRPCClient) connect(ctx context.Context) (err error) { } func (m *multiRPCClient) reconfigure(ctx context.Context, settings map[string]string) error { - endpoints := strings.Split(settings[providersKey], " ") + providerDef := settings[providersKey] + if len(providerDef) == 0 { + return errors.New("no providers specified") + } + endpoints := strings.Split(providerDef, " ") providers, err := connectProviders(ctx, endpoints, m.creds.addr, m.log) if err != nil { return err @@ -306,39 +328,6 @@ func (m *multiRPCClient) reconfigure(ctx context.Context, settings map[string]st return nil } -const retryError = dex.ErrorKind("retrying") - -type retrySubscriptionDummy struct { - retry chan error -} - -func newRetrySubscription(ctx context.Context, t time.Duration) *retrySubscriptionDummy { - timer := time.NewTimer(t) - r := &retrySubscriptionDummy{ - retry: make(chan error), - } - go func() { - select { - case <-timer.C: - r.retry <- retryError - case <-ctx.Done(): - timer.Stop() - close(r.retry) - return - } - }() - - return r -} - -func (r *retrySubscriptionDummy) Unsubscribe() { - close(r.retry) -} - -func (r *retrySubscriptionDummy) Err() <-chan error { - return r.retry -} - func subHeaders(ctx context.Context, p *provider, sub ethereum.Subscription, h chan *types.Header, addr common.Address, log dex.Logger) { defer sub.Unsubscribe() var lastWarning time.Time @@ -360,23 +349,26 @@ func subHeaders(ctx context.Context, p *provider, sub ethereum.Subscription, h c } } - logs := make(chan types.Log, 128) - newAcctSub := func(retryTimeout time.Duration) ethereum.Subscription { - config := ethereum.FilterQuery{ - Addresses: []common.Address{addr}, - } + // I thought the filter logs might catch some transactions we coudld cache + // to avoid rpc calls, but in testing, I get nothing in the channel. May + // revisit later. + // logs := make(chan types.Log, 128) + // newAcctSub := func(retryTimeout time.Duration) ethereum.Subscription { + // config := ethereum.FilterQuery{ + // Addresses: []common.Address{addr}, + // } - acctSub, err := p.cl().SubscribeFilterLogs(ctx, config, logs) - if err != nil { - log.Errorf("failed to subscribe to filter logs: %v", err) - return newRetrySubscription(ctx, retryTimeout) - } - return acctSub - } + // acctSub, err := p.cl().SubscribeFilterLogs(ctx, config, logs) + // if err != nil { + // log.Errorf("failed to subscribe to filter logs: %v", err) + // return newRetrySubscription(ctx, retryTimeout) + // } + // return acctSub + // } - // If we fail the first time, don't try again. - acctSub := newAcctSub(time.Hour * 24 * 365) - defer acctSub.Unsubscribe() + // // If we fail the first time, don't try again. + // acctSub := newAcctSub(time.Hour * 24 * 365) + // defer acctSub.Unsubscribe() // Start the background filtering log.Tracef("handling websocket subscriptions") @@ -396,26 +388,69 @@ func subHeaders(ctx context.Context, p *provider, sub ethereum.Subscription, h c if err != nil { // context cancelled return } - case l := <-logs: - log.Tracef("%q log reported: %+v", p.host, l) - case err, ok := <-acctSub.Err(): - if err != nil && !errors.Is(err, retryError) { - log.Errorf("%q log subscription error: %v", p.host, err) - } - if ok { - acctSub = newAcctSub(time.Minute * 5) - } + // case l := <-logs: + // log.Tracef("%q log reported: %+v", p.host, l) + // case err, ok := <-acctSub.Err(): + // if err != nil && !errors.Is(err, retryError) { + // log.Errorf("%q log subscription error: %v", p.host, err) + // } + // if ok { + // acctSub = newAcctSub(time.Minute * 5) + // } case <-ctx.Done(): return } } } +// cleanReceipts cleans up the receipt cache, deleting any receipts that haven't +// been access for > receiptCacheExpiration. +func (m *multiRPCClient) cleanReceipts() { + m.receipts.Lock() + for txHash, rec := range m.receipts.cache { + if time.Since(rec.lastAccess) > receiptCacheExpiration { + delete(m.receipts.cache, txHash) + } + } + m.receipts.Unlock() +} + func (m *multiRPCClient) transactionReceipt(ctx context.Context, txHash common.Hash) (r *types.Receipt, err error) { - return r, m.withPreferred(func(p *provider) error { + + // TODO + // TODO: Plug into the monitoredTx system from #1638. + // TODO + + // Check the cache. + m.receipts.RLock() + cached := m.receipts.cache[txHash] + if cached != nil { + cached.lastAccess = time.Now() + } + if time.Since(m.receipts.lastClean) > time.Minute*20 { + m.receipts.lastClean = time.Now() + go m.cleanReceipts() + } + m.receipts.RUnlock() + if cached != nil { + return cached.r, nil + } + + if err = m.withPreferred(func(p *provider) error { r, err = p.cl().TransactionReceipt(ctx, txHash) return err - }) + }); err != nil { + return nil, err + } + + m.receipts.Lock() + m.receipts.cache[txHash] = &receiptRecord{ + r: r, + lastAccess: time.Now(), + } + m.receipts.Unlock() + + return r, nil } func (m *multiRPCClient) providerList() []*provider { @@ -441,13 +476,15 @@ func (m *multiRPCClient) withOne(providers []*provider, f func(*provider) error) return fmt.Errorf("all providers errored") } -func (m *multiRPCClient) withAny(f func(*provider) error) error { - providers := m.providerList() - - rand.Shuffle(len(providers), func(i, j int) { - providers[i], providers[j] = providers[j], providers[i] +func shuffleProviders(p []*provider) { + rand.Shuffle(len(p), func(i, j int) { + p[i], p[j] = p[j], p[i] }) +} +func (m *multiRPCClient) withAny(f func(*provider) error) error { + providers := m.providerList() + shuffleProviders(providers) return m.withOne(providers, f) } @@ -473,9 +510,7 @@ func (m *multiRPCClient) nonceProviderList() []*provider { providers = append(providers, p) } - rand.Shuffle(len(providers), func(i, j int) { - providers[i], providers[j] = providers[j], providers[i] - }) + shuffleProviders(providers) if lastProvider != nil { providers = append([]*provider{lastProvider}, providers...) @@ -503,6 +538,26 @@ func (m *multiRPCClient) addressBalance(ctx context.Context, addr common.Address } func (m *multiRPCClient) bestHeader(ctx context.Context) (hdr *types.Header, err error) { + var bestHeader *types.Header + for _, p := range m.providerList() { + h := p.cachedTip() + if h == nil { + continue + } + // This block choosing algo is probably too rudimentary. Really need + // shnuld traverse parents to a common block and sum up gas (including + // uncles?), I think. + if bestHeader == nil || // first one + h.Number.Cmp(bestHeader.Number) > 0 || // newer + (h.Number.Cmp(bestHeader.Number) == 0 && h.GasUsed > bestHeader.GasUsed) { // same height, but more gas used + + bestHeader = h + } + } + if bestHeader != nil { + return bestHeader, nil + } + return hdr, m.withAny(func(p *provider) error { hdr, err = p.bestHeader(ctx, m.log) return err @@ -565,15 +620,10 @@ func (m *multiRPCClient) sendSignedTransaction(ctx context.Context, tx *types.Tr } func (m *multiRPCClient) sendTransaction(ctx context.Context, txOpts *bind.TransactOpts, to common.Address, data []byte) (*types.Transaction, error) { - nonce, err := m.nextNonce(ctx) - if err != nil { - return nil, err - } - tx, err := m.creds.ks.SignTx(*m.creds.acct, types.NewTx(&types.DynamicFeeTx{ To: &to, ChainID: m.chainID, - Nonce: nonce, + Nonce: txOpts.Nonce.Uint64(), Gas: txOpts.GasLimit, GasFeeCap: txOpts.GasFeeCap, GasTipCap: txOpts.GasTipCap, diff --git a/client/asset/eth/multirpc_live_test.go b/client/asset/eth/multirpc_live_test.go index 1cc96e5322..b92404f9d7 100644 --- a/client/asset/eth/multirpc_live_test.go +++ b/client/asset/eth/multirpc_live_test.go @@ -176,6 +176,33 @@ func TestMultiRPCClient(t *testing.T) { } } +// +// Create a providers.json file in your .dexc directory. +// 1. Seed can be anything. Just generate randomness. +// 2. Can connect to a host's websocket and http endpoints simultaneously. +// Actually nothing preventing you from connecting to a single provider +// 100 times, but that may be a guardrail added in the future. +// +// Example ~/.dexc/providers.json +/* +{ + "testnet": { + "seed": "9e0084387c3ba7ac4b5bb409c220c08d4ee74f7b8c73b03fff18c727c5ce9f48", + "providers": [ + "https://goerli.infura.io/v3/", + "https://rpc.ankr.com/eth_goerli" + ] + }, + "mainnet": { + "seed": "9e0084387c3ba7ac4b5bb409c220c08d4ee74f7b8c73b03fff18c727c5ce9f48", + "providers": [ + "wss://mainnet.infura.io/ws/v3/", + "https://rpc.ankr.com/eth" + ] + } +} +*/ + func TestMonitorTestnet(t *testing.T) { testMonitorNet(t, dex.Testnet) } diff --git a/client/asset/eth/nodeclient_harness_test.go b/client/asset/eth/nodeclient_harness_test.go index 6f6f4b9eb6..2980436b23 100644 --- a/client/asset/eth/nodeclient_harness_test.go +++ b/client/asset/eth/nodeclient_harness_test.go @@ -234,14 +234,15 @@ func prepareRPCClient(name, dataDir, endpoint string, net dex.Network) (*multiRP return c, c.creds.acct, nil } -func prepareTestRPCClients(initiatorDir, participantDir string, net dex.Network) (err error) { - initiatorEndpoint, participantEndpoint := alphaIPCFile, betaIPCFile +func rpcEndpoints(net dex.Network) (string, string) { if net == dex.Testnet { - if rpcNode == "" { - return fmt.Errorf("rpcNode must be specified for testnet") - } - initiatorEndpoint, participantEndpoint = rpcNode, rpcNode + return rpcNode, rpcNode } + return alphaIPCFile, betaIPCFile +} + +func prepareTestRPCClients(initiatorDir, participantDir string, net dex.Network) (err error) { + initiatorEndpoint, participantEndpoint := rpcEndpoints(net) ethClient, simnetAcct, err = prepareRPCClient("initiator", initiatorDir, initiatorEndpoint, net) if err != nil { @@ -325,12 +326,14 @@ func runSimnet(m *testing.M) (int, error) { ethSwapContractAddr = dexeth.ContractAddresses[0][dex.Simnet] - err = setupWallet(simnetWalletDir, simnetWalletSeed, "localhost:30355", dex.Simnet) + initiatorRPC, participantRPC := rpcEndpoints(dex.Simnet) + + err = setupWallet(simnetWalletDir, simnetWalletSeed, "localhost:30355", initiatorRPC, dex.Simnet) if err != nil { return 1, err } - err = setupWallet(participantWalletDir, participantWalletSeed, "localhost:30356", dex.Simnet) + err = setupWallet(participantWalletDir, participantWalletSeed, "localhost:30356", participantRPC, dex.Simnet) if err != nil { return 1, err } @@ -448,11 +451,14 @@ func runTestnet(m *testing.M) (int, error) { secPerBlock = testnetSecPerBlock ethSwapContractAddr = dexeth.ContractAddresses[0][dex.Testnet] fmt.Printf("ETH swap contract address is %v\n", ethSwapContractAddr) - err = setupWallet(testnetWalletDir, testnetWalletSeed, "localhost:30355", dex.Testnet) + + initiatorRPC, participantRPC := rpcEndpoints(dex.Simnet) + + err = setupWallet(testnetWalletDir, testnetWalletSeed, "localhost:30355", initiatorRPC, dex.Testnet) if err != nil { return 1, err } - err = setupWallet(testnetParticipantWalletDir, testnetParticipantWalletSeed, "localhost:30356", dex.Testnet) + err = setupWallet(testnetParticipantWalletDir, testnetParticipantWalletSeed, "localhost:30356", participantRPC, dex.Testnet) if err != nil { return 1, err } @@ -565,21 +571,25 @@ func TestMain(m *testing.M) { os.Exit(exitCode) } -func setupWallet(walletDir, seed, listenAddress string, net dex.Network) error { +func setupWallet(walletDir, seed, listenAddress, rpcAddr string, net dex.Network) error { walletType := walletTypeGeth + settings := map[string]string{ + "nodelistenaddr": listenAddress, + } if useRPC { walletType = walletTypeRPC + settings = map[string]string{ + providersKey: rpcAddr, + } } seedB, _ := hex.DecodeString(seed) createWalletParams := asset.CreateWalletParams{ - Type: walletType, - Seed: seedB, - Pass: []byte(pw), - Settings: map[string]string{ - "nodelistenaddr": listenAddress, - }, - DataDir: walletDir, - Net: net, + Type: walletType, + Seed: seedB, + Pass: []byte(pw), + Settings: settings, + DataDir: walletDir, + Net: net, } return CreateWallet(&createWalletParams) } diff --git a/client/core/core.go b/client/core/core.go index eef4e585be..d1d2c1402d 100644 --- a/client/core/core.go +++ b/client/core/core.go @@ -2100,7 +2100,7 @@ func (c *Core) createSeededWallet(assetID uint32, crypter encrypt.Crypter, form } defer encode.ClearBytes(seed) - c.log.Infof("Initializing a built-in %s wallet", unbip(assetID)) + c.log.Infof("Initializing a %s wallet", unbip(assetID)) if err = asset.CreateWallet(assetID, &asset.CreateWalletParams{ Type: form.Type, Seed: seed, diff --git a/client/core/simnet_trade.go b/client/core/simnet_trade.go index 97ecdb669b..5bdf43a97e 100644 --- a/client/core/simnet_trade.go +++ b/client/core/simnet_trade.go @@ -58,8 +58,9 @@ import ( var ( dexHost = "127.0.0.1:17273" - homeDir = os.Getenv("HOME") - dextestDir = filepath.Join(homeDir, "dextest") + homeDir = os.Getenv("HOME") + dextestDir = filepath.Join(homeDir, "dextest") + alphaIPCFile = filepath.Join(dextestDir, "eth", "alpha", "node", "geth.ipc") tLockTimeTaker = 30 * time.Second tLockTimeMaker = 1 * time.Minute @@ -1582,7 +1583,8 @@ func bchWallet(wt SimWalletType, node string) (*tWallet, error) { func ethWallet() (*tWallet, error) { return &tWallet{ fund: true, - walletType: "geth", + walletType: "rpc", + config: map[string]string{"providers": alphaIPCFile}, }, nil } @@ -1591,8 +1593,9 @@ func dexttWallet() (*tWallet, error) { fund: true, walletType: "token", parent: &WalletForm{ - Type: "geth", + Type: "rpc", AssetID: eth.BipID, + Config: map[string]string{"providers": alphaIPCFile}, }, }, nil } diff --git a/dex/testing/loadbot/loadbot.go b/dex/testing/loadbot/loadbot.go index 109a1d8858..322282fd1f 100644 --- a/dex/testing/loadbot/loadbot.go +++ b/dex/testing/loadbot/loadbot.go @@ -79,9 +79,11 @@ var ( log dex.Logger unbip = dex.BipIDSymbol - usr, _ = user.Current() - dextestDir = filepath.Join(usr.HomeDir, "dextest") - botDir = filepath.Join(dextestDir, "loadbot") + usr, _ = user.Current() + dextestDir = filepath.Join(usr.HomeDir, "dextest") + botDir = filepath.Join(dextestDir, "loadbot") + alphaIPCFile = filepath.Join(dextestDir, "eth", "alpha", "node", "geth.ipc") + betaIPCFile = filepath.Join(dextestDir, "eth", "beta", "node", "geth.ipc") ctx, quit = context.WithCancel(context.Background()) diff --git a/dex/testing/loadbot/mantle.go b/dex/testing/loadbot/mantle.go index 25fd7a8c6d..fa55bdae65 100644 --- a/dex/testing/loadbot/mantle.go +++ b/dex/testing/loadbot/mantle.go @@ -679,14 +679,15 @@ func newBotWallet(symbol, node, name string, port string, pass []byte, minFunds, }, } case eth, dextt: + rpcProvider := alphaIPCFile + if node == beta { + rpcProvider = betaIPCFile + } form = &core.WalletForm{ - Type: "geth", + Type: "rpc", AssetID: ethID, Config: map[string]string{ - "walletname": name, - "rpcuser": "user", - "rpcpassword": "pass", - "rpcport": port, + "providers": rpcProvider, }, } if symbol == dextt {