diff --git a/cmd/flags.go b/cmd/flags.go index 1f8396899..a2b6ff07f 100644 --- a/cmd/flags.go +++ b/cmd/flags.go @@ -11,50 +11,52 @@ import ( ) const ( - flagHome = "home" - flagURL = "url" - flagSkip = "skip" - flagTimeout = "timeout" - flagJSON = "json" - flagYAML = "yaml" - flagFile = "file" - flagPath = "path" - flagMaxTxSize = "max-tx-size" - flagMaxMsgLength = "max-msgs" - flagIBCDenoms = "ibc-denoms" - flagTimeoutHeightOffset = "timeout-height-offset" - flagTimeoutTimeOffset = "timeout-time-offset" - flagMaxRetries = "max-retries" - flagThresholdTime = "time-threshold" - flagUpdateAfterExpiry = "update-after-expiry" - flagUpdateAfterMisbehaviour = "update-after-misbehaviour" - flagClientTrustingPeriod = "client-tp" - flagOverride = "override" - flagSrcPort = "src-port" - flagDstPort = "dst-port" - flagOrder = "order" - flagVersion = "version" - flagDebugAddr = "debug-addr" - flagOverwriteConfig = "overwrite" - flagLimit = "limit" - flagHeight = "height" - flagPage = "page" - flagPageKey = "page-key" - flagCountTotal = "count-total" - flagReverse = "reverse" - flagProcessor = "processor" - flagInitialBlockHistory = "block-history" - flagFlushInterval = "flush-interval" - flagMemo = "memo" - flagFilterRule = "filter-rule" - flagFilterChannels = "filter-channels" - flagSrcChainID = "src-chain-id" - flagDstChainID = "dst-chain-id" - flagSrcClientID = "src-client-id" - flagDstClientID = "dst-client-id" - flagSrcConnID = "src-connection-id" - flagDstConnID = "dst-connection-id" - flagBtpBlockHeight = "btp-block-height" + flagHome = "home" + flagURL = "url" + flagSkip = "skip" + flagTimeout = "timeout" + flagJSON = "json" + flagYAML = "yaml" + flagFile = "file" + flagPath = "path" + flagMaxTxSize = "max-tx-size" + flagMaxMsgLength = "max-msgs" + flagIBCDenoms = "ibc-denoms" + flagTimeoutHeightOffset = "timeout-height-offset" + flagTimeoutTimeOffset = "timeout-time-offset" + flagMaxRetries = "max-retries" + flagThresholdTime = "time-threshold" + flagUpdateAfterExpiry = "update-after-expiry" + flagUpdateAfterMisbehaviour = "update-after-misbehaviour" + flagClientTrustingPeriod = "client-tp" + flagOverride = "override" + flagSrcPort = "src-port" + flagDstPort = "dst-port" + flagOrder = "order" + flagVersion = "version" + flagDebugAddr = "debug-addr" + flagOverwriteConfig = "overwrite" + flagLimit = "limit" + flagHeight = "height" + flagPage = "page" + flagPageKey = "page-key" + flagCountTotal = "count-total" + flagReverse = "reverse" + flagProcessor = "processor" + flagInitialBlockHistory = "block-history" + flagFlushInterval = "flush-interval" + flagMemo = "memo" + flagFilterRule = "filter-rule" + flagFilterChannels = "filter-channels" + flagSrcChainID = "src-chain-id" + flagDstChainID = "dst-chain-id" + flagSrcClientID = "src-client-id" + flagDstClientID = "dst-client-id" + flagSrcConnID = "src-connection-id" + flagDstConnID = "dst-connection-id" + flagBtpBlockHeight = "btp-block-height" + flagBtpUpdateProofContext = "btp-update-proof-context" + flagBTPUpdateProofContextFromHeight = "btp-update-proof-context-from-height" ) const ( @@ -390,3 +392,19 @@ func OverwriteConfigFlag(v *viper.Viper, cmd *cobra.Command) *cobra.Command { } return cmd } + +func BtpUpdateProofContextFlag(v *viper.Viper, cmd *cobra.Command) *cobra.Command { + cmd.Flags().BoolP(flagBtpUpdateProofContext, "u", false, "update all proof context change") + if err := v.BindPFlag(flagFlushInterval, cmd.Flags().Lookup(flagFlushInterval)); err != nil { + panic(err) + } + return cmd +} + +func BtpUpdateProofContextFromHeightFlag(v *viper.Viper, cmd *cobra.Command) *cobra.Command { + cmd.Flags().Int64(flagBTPUpdateProofContextFromHeight, 0, "btp update proof context from height") + if err := v.BindPFlag(flagFlushInterval, cmd.Flags().Lookup(flagFlushInterval)); err != nil { + panic(err) + } + return cmd +} diff --git a/cmd/start.go b/cmd/start.go index c2cdf9406..a03c92915 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -26,6 +26,7 @@ import ( "github.com/cosmos/relayer/v2/internal/relaydebug" "github.com/cosmos/relayer/v2/relayer" "github.com/cosmos/relayer/v2/relayer/chains/cosmos" + "github.com/cosmos/relayer/v2/relayer/common" "github.com/cosmos/relayer/v2/relayer/processor" "github.com/spf13/cobra" "go.uber.org/zap" @@ -143,8 +144,24 @@ $ %s start demo-path2 --max-tx-size 10`, appName, appName, appName, appName)), return err } + btpProofContextUpdateFrom, err := cmd.Flags().GetInt64(flagBTPUpdateProofContextFromHeight) + if err != nil { + return err + } + + ctx := cmd.Context() + shouldUpdateProofContext, err := cmd.Flags().GetBool(flagBtpUpdateProofContext) + // runUsing the flag + if shouldUpdateProofContext { + iconStartHeight, err := relayer.RunProofContextUpdate(cmd.Context(), a.log, chains, paths, btpProofContextUpdateFrom) + if err != nil { + return fmt.Errorf("unable to complete proofContextUpdate error: %v", err) + } + ctx = context.WithValue(ctx, common.IconStartHeightFromPreRunContext, iconStartHeight) + } + rlyErrCh := relayer.StartRelayer( - cmd.Context(), + ctx, a.log, chains, paths, @@ -179,5 +196,7 @@ $ %s start demo-path2 --max-tx-size 10`, appName, appName, appName, appName)), cmd = initBlockFlag(a.viper, cmd) cmd = flushIntervalFlag(a.viper, cmd) cmd = memoFlag(a.viper, cmd) + cmd = BtpUpdateProofContextFlag(a.viper, cmd) + cmd = BtpUpdateProofContextFromHeightFlag(a.viper, cmd) return cmd } diff --git a/examples/config_IBC_ICON.yaml b/examples/config_IBC_ICON.yaml index d0f69c19b..9f2f68ec1 100644 --- a/examples/config_IBC_ICON.yaml +++ b/examples/config_IBC_ICON.yaml @@ -1,59 +1,57 @@ global: - api-listen-addr: :5183 - timeout: 10s - memo: "" - light-cache-size: 20 + api-listen-addr: :5183 + timeout: 10s + memo: "" + light-cache-size: 20 chains: - archway: - type: wasm - value: - key-directory: /home/user/.relayer/keys - key: relayWallet - chain-id: localnet - rpc-addr: http://localhost:26657 - account-prefix: archway - keyring-backend: test - gas-adjustment: 1.5 - gas-prices: 0.025stake - min-gas-amount: 1000000 - debug: true - timeout: 20s - block-timeout: "" - output-format: json - sign-mode: direct - extra-codecs: [] - coin-type: 0 - broadcast-mode: batch - ibc-handler-address: archway1pvrwmjuusn9wh34j7y520g8gumuy9xtl3gvprlljfdpwju3x7ucszwhc7n - first-retry-block-after: 0 - start-height: 0 - block-interval: 3000 - icon: - type: icon - value: - key-directory: /home/user/.relayer/keys - chain-id: ibc-icon - rpc-addr: http://localhost:9082/api/v3/ - timeout: 30s - keystore: godWallet - password: gochain - icon-network-id: 3 - btp-network-id: 1 - btp-network-type-id: 1 - start-height: 0 - ibc-handler-address: cxbeb5929616e0dbd2fec1e6e950ab09e45e6fb25a - first-retry-block-after: 0 - block-interval: 2000 + archway: + type: wasm + value: + key-directory: /home/user/.relayer/keys + key: relayWallet + chain-id: localnet + rpc-addr: http://localhost:26657 + account-prefix: archway + keyring-backend: test + gas-adjustment: 1.5 + gas-prices: 0.025stake + min-gas-amount: 1000000 + debug: true + timeout: 20s + block-timeout: "" + output-format: json + sign-mode: direct + extra-codecs: [] + coin-type: 0 + broadcast-mode: batch + ibc-handler-address: archway1pvrwmjuusn9wh34j7y520g8gumuy9xtl3gvprlljfdpwju3x7ucszwhc7n + first-retry-block-after: 0 + block-interval: 3000 + icon: + type: icon + value: + key-directory: /home/user/.relayer/keys + chain-id: ibc-icon + rpc-addr: http://localhost:9082/api/v3/ + timeout: 30s + keystore: godWallet + password: gochain + icon-network-id: 3 + btp-network-id: 1 + btp-network-type-id: 1 + ibc-handler-address: cxbeb5929616e0dbd2fec1e6e950ab09e45e6fb25a + first-retry-block-after: 0 + block-interval: 2000 paths: - icon-archway: - src: - chain-id: ibc-icon - client-id: 07-tendermint-0 - connection-id: connection-0 - dst: - chain-id: localnet - client-id: iconclient-0 - connection-id: connection-0 - src-channel-filter: - rule: "" - channel-list: [] + icon-archway: + src: + chain-id: ibc-icon + client-id: 07-tendermint-0 + connection-id: connection-0 + dst: + chain-id: localnet + client-id: iconclient-0 + connection-id: connection-0 + src-channel-filter: + rule: "" + channel-list: [] diff --git a/examples/demo/configs/chains/ibc-icon.json b/examples/demo/configs/chains/ibc-icon.json index 466f153ee..912a1e2a6 100644 --- a/examples/demo/configs/chains/ibc-icon.json +++ b/examples/demo/configs/chains/ibc-icon.json @@ -10,7 +10,6 @@ "icon-network-id": 3, "btp-network-id": 2, "btp-network-type-id": 1, - "start-height": 0, "ibc-handler-address": "cxbeb5929616e0dbd2fec1e6e950ab09e45e6fb25a", "first-retry-block-after": 0, "block-interval": 2000 diff --git a/examples/demo/configs/chains/ibc-wasm.json b/examples/demo/configs/chains/ibc-wasm.json index d0f6bbe3a..73b035289 100644 --- a/examples/demo/configs/chains/ibc-wasm.json +++ b/examples/demo/configs/chains/ibc-wasm.json @@ -20,7 +20,7 @@ "broadcast-mode": "batch", "ibc-handler-address": "archway1pvrwmjuusn9wh34j7y520g8gumuy9xtl3gvprlljfdpwju3x7ucszwhc7n", "first-retry-block-after": 0, - "start-height": 0, - "block-interval": 3000 + "block-interval": 3000, + "concurrency": 100 } } \ No newline at end of file diff --git a/relayer/chains/cosmos/query.go b/relayer/chains/cosmos/query.go index 5e03ed3c6..687deca48 100644 --- a/relayer/chains/cosmos/query.go +++ b/relayer/chains/cosmos/query.go @@ -1173,5 +1173,21 @@ func (cc *CosmosProvider) QueryConsensusStateABCI(ctx context.Context, clientID func (ap *CosmosProvider) QueryClientPrevConsensusStateHeight(ctx context.Context, chainHeight int64, clientId string, clientHeight int64) (exported.Height, error) { panic("QueryClientPrevConsensusStateHeight not implemented") +} + +func (ap *CosmosProvider) QueryPacketMessageByEventHeight(ctx context.Context, eventType, srcChanID, srcPortID string, sequence uint64, seqHeight uint64) (provider.PacketInfo, error) { + panic("QuerySendPacketByHeight not implemented") +} +func (ap *CosmosProvider) QueryPacketHeights(ctx context.Context, latestHeight int64, channelId, portId string, startSeq, endSeq uint64) (packetHeights provider.MessageHeights, err error) { + panic("QueryPacketHeights not implemented") +} +func (ap *CosmosProvider) QueryAckHeights(ctx context.Context, latestHeight int64, channelId, portId string, startSeq, endSeq uint64) (packetHeights provider.MessageHeights, err error) { + panic("QueryAckHeights not implemented") +} +func (ap *CosmosProvider) QueryMissingPacketReceipts(ctx context.Context, latestHeight int64, channelId, portId string, startSeq, endSeq uint64) (missingReceipts []uint64, err error) { + panic("QueryMissingPacketReceipts not implemented") +} +func (ap *CosmosProvider) QueryNextSeqSend(ctx context.Context, height int64, channelid, portid string) (seq uint64, err error) { + panic("QueryNextSeqSend not implemented") } diff --git a/relayer/chains/icon/client.go b/relayer/chains/icon/client.go index 034865743..8b2024c50 100644 --- a/relayer/chains/icon/client.go +++ b/relayer/chains/icon/client.go @@ -64,6 +64,7 @@ type IClient interface { GetLastBlock() (*types.Block, error) GetBlockHeaderByHeight(height int64) (*types.BlockHeader, error) GetValidatorsByHash(hash common.HexHash) ([]common.Address, error) + GetPrepTerm() (*types.PrepTerm, error) } type Client struct { @@ -622,6 +623,25 @@ func (c *Client) EstimateStep(param *types.TransactionParamForEstimate) (*types. return &result, nil } +func (c *Client) GetPrepTerm() (*types.PrepTerm, error) { + + param := types.CallParam{ + FromAddress: types.Address(fmt.Sprintf("hx%s", strings.Repeat("0", 40))), + ToAddress: types.Address(genesisContract), + DataType: "call", + Data: &types.CallData{ + Method: MethodGetPrepTerm, + Params: map[string]string{}, + }, + } + + var op types.PrepTerm + if err := c.Call(¶m, &op); err != nil { + return nil, err + } + return &op, nil +} + func NewClient(uri string, l *zap.Logger) *Client { //TODO options {MaxRetrySendTx, MaxRetryGetResult, MaxIdleConnsPerHost, Debug, Dump} tr := &http.Transport{MaxIdleConnsPerHost: 1000} diff --git a/relayer/chains/icon/icon_chain_processor.go b/relayer/chains/icon/icon_chain_processor.go index cc2be0940..ca1b094e2 100644 --- a/relayer/chains/icon/icon_chain_processor.go +++ b/relayer/chains/icon/icon_chain_processor.go @@ -76,7 +76,7 @@ type Verifier struct { prevNetworkSectionHash []byte } -func NewIconChainProcessor(log *zap.Logger, provider *IconProvider, metrics *processor.PrometheusMetrics, heightSnapshot chan struct{}) *IconChainProcessor { +func NewIconChainProcessor(log *zap.Logger, provider *IconProvider, metrics *processor.PrometheusMetrics) *IconChainProcessor { return &IconChainProcessor{ log: log.With(zap.String("chain_name", provider.ChainName()), zap.String("chain_id", provider.ChainId())), chainProvider: provider, @@ -86,7 +86,7 @@ func NewIconChainProcessor(log *zap.Logger, provider *IconProvider, metrics *pro connectionClients: make(map[string]string), channelConnections: make(map[string]string), metrics: metrics, - heightSnapshotChan: heightSnapshot, + // heightSnapshotChan: heightSnapshot, } } @@ -135,12 +135,12 @@ type queryCyclePersistence struct { } func (icp *IconChainProcessor) Run(ctx context.Context, initialBlockHistory uint64) error { - persistence := queryCyclePersistence{ - minQueryLoopDuration: time.Second, - } var eg errgroup.Group + // passing the value from the context + iconStartHeightFromPreRunContext, _ := rlycommon.AnyToInt64(ctx.Value(rlycommon.IconStartHeightFromPreRunContext)) + eg.Go(func() error { return icp.initializeConnectionState(ctx) }) @@ -153,24 +153,24 @@ func (icp *IconChainProcessor) Run(ctx context.Context, initialBlockHistory uint // start_query_cycle icp.log.Debug("Starting query cycle") - err := icp.monitoring(ctx, &persistence) + err := icp.monitoring(ctx, iconStartHeightFromPreRunContext) return err } -func (icp *IconChainProcessor) StartFromHeight(ctx context.Context) int64 { - cfg := icp.Provider().ProviderConfig().(*IconProviderConfig) - - if cfg.StartHeight != 0 { - return cfg.StartHeight - } - snapshotHeight, err := rlycommon.LoadSnapshotHeight(icp.Provider().ChainId()) - if err != nil { - icp.log.Warn("Failed to load height from snapshot", zap.Error(err)) - } else { - icp.log.Info("Obtained start height from config", zap.Int64("height", snapshotHeight)) - } - return snapshotHeight -} +// func (icp *IconChainProcessor) StartFromHeight(ctx context.Context) int64 { +// cfg := icp.Provider().ProviderConfig().(*IconProviderConfig) + +// if cfg.StartHeight != 0 { +// return cfg.StartHeight +// } +// snapshotHeight, err := rlycommon.LoadSnapshotHeight(icp.Provider().ChainId()) +// if err != nil { +// icp.log.Warn("Failed to load height from snapshot", zap.Error(err)) +// } else { +// icp.log.Info("Obtained start height from config", zap.Int64("height", snapshotHeight)) +// } +// return snapshotHeight +// } func (icp *IconChainProcessor) getLastSavedHeight() int64 { snapshotHeight, err := rlycommon.LoadSnapshotHeight(icp.Provider().ChainId()) @@ -253,7 +253,7 @@ func (icp *IconChainProcessor) GetLatestHeight() uint64 { return icp.latestBlock.Height } -func (icp *IconChainProcessor) monitoring(ctx context.Context, persistence *queryCyclePersistence) error { +func (icp *IconChainProcessor) monitoring(ctx context.Context, startFromHeight int64) error { errCh := make(chan error) // error channel reconnectCh := make(chan struct{}, 1) // reconnect channel @@ -274,7 +274,7 @@ func (icp *IconChainProcessor) monitoring(ctx context.Context, persistence *quer } var err error - processedheight := icp.StartFromHeight(ctx) + processedheight := startFromHeight latestHeight, err := icp.chainProvider.QueryLatestHeight(ctx) if err != nil { icp.log.Error("Error fetching block", zap.Error(err)) @@ -311,8 +311,8 @@ loop: case err := <-errCh: return err - case <-icp.heightSnapshotChan: - icp.SnapshotHeight(icp.getHeightToSave(int64(icp.latestBlock.Height))) + // case <-icp.heightSnapshotChan: + // icp.SnapshotHeight(icp.getHeightToSave(int64(icp.latestBlock.Height))) case <-reconnectCh: cancelMonitorBlock() @@ -329,10 +329,10 @@ loop: }, func(conn *websocket.Conn) { }, func(conn *websocket.Conn, err error) {}) if err != nil { - ht := icp.getHeightToSave(processedheight) - if ht != icp.getLastSavedHeight() { - icp.SnapshotHeight(ht) - } + // ht := icp.getHeightToSave(processedheight) + // if ht != icp.getLastSavedHeight() { + // icp.SnapshotHeight(ht) + // } if errors.Is(err, context.Canceled) { return } @@ -379,9 +379,6 @@ loop: break } time.Sleep(10 * time.Millisecond) - if icp.firstTime { - time.Sleep(4000 * time.Millisecond) - } icp.firstTime = false if br = nil; len(btpBlockRespCh) > 0 { br = <-btpBlockRespCh @@ -476,22 +473,22 @@ loop: } } -func (icp *IconChainProcessor) getHeightToSave(height int64) int64 { - retryAfter := icp.Provider().ProviderConfig().GetFirstRetryBlockAfter() - ht := height - int64(retryAfter) - if ht < 0 { - return 0 - } - return ht -} - -func (icp *IconChainProcessor) SnapshotHeight(height int64) { - icp.log.Info("Save height for snapshot", zap.Int64("height", height)) - err := rlycommon.SnapshotHeight(icp.Provider().ChainId(), height) - if err != nil { - icp.log.Warn("Failed saving height snapshot for height", zap.Int64("height", height)) - } -} +// func (icp *IconChainProcessor) getHeightToSave(height int64) int64 { +// retryAfter := icp.Provider().ProviderConfig().GetFirstRetryBlockAfter() +// ht := height - int64(retryAfter) +// if ht < 0 { +// return 0 +// } +// return ht +// } + +// func (icp *IconChainProcessor) SnapshotHeight(height int64) { +// icp.log.Info("Save height for snapshot", zap.Int64("height", height)) +// err := rlycommon.SnapshotHeight(icp.Provider().ChainId(), height) +// if err != nil { +// icp.log.Warn("Failed saving height snapshot for height", zap.Int64("height", height)) +// } +// } func (icp *IconChainProcessor) verifyBlock(ctx context.Context, ibcHeader provider.IBCHeader) error { header, ok := ibcHeader.(IconIBCHeader) diff --git a/relayer/chains/icon/methods.go b/relayer/chains/icon/methods.go index 2c5b43db4..1c33e4738 100644 --- a/relayer/chains/icon/methods.go +++ b/relayer/chains/icon/methods.go @@ -41,5 +41,11 @@ const ( MethodRequestTimeout = "requestTimeout" MethodTimeoutPacket = "timeoutPacket" - MethodGetAllPorts = "getAllPorts" + MethodGetAllPorts = "getAllPorts" + MethodGetMissingPacketReceipts = "getMissingPacketReceipts" + MethodGetPacketHeights = "getPacketHeights" + MethodGetAckHeights = "getAckHeights" + + // + MethodGetPrepTerm = "getPRepTerm" ) diff --git a/relayer/chains/icon/provider.go b/relayer/chains/icon/provider.go index b98583a0d..e95a31e2f 100644 --- a/relayer/chains/icon/provider.go +++ b/relayer/chains/icon/provider.go @@ -51,17 +51,17 @@ var ( * KeyDirectory/Keystore.json */ type IconProviderConfig struct { - KeyDirectory string `json:"key-directory" yaml:"key-directory"` - ChainName string `json:"-" yaml:"-"` - ChainID string `json:"chain-id" yaml:"chain-id"` - RPCAddr string `json:"rpc-addr" yaml:"rpc-addr"` - Timeout string `json:"timeout" yaml:"timeout"` - Keystore string `json:"keystore" yaml:"keystore"` - Password string `json:"password" yaml:"password"` - ICONNetworkID int64 `json:"icon-network-id" yaml:"icon-network-id" default:"3"` - BTPNetworkID int64 `json:"btp-network-id" yaml:"btp-network-id"` - BTPNetworkTypeID int64 `json:"btp-network-type-id" yaml:"btp-network-type-id"` - StartHeight int64 `json:"start-height" yaml:"start-height"` + KeyDirectory string `json:"key-directory" yaml:"key-directory"` + ChainName string `json:"-" yaml:"-"` + ChainID string `json:"chain-id" yaml:"chain-id"` + RPCAddr string `json:"rpc-addr" yaml:"rpc-addr"` + Timeout string `json:"timeout" yaml:"timeout"` + Keystore string `json:"keystore" yaml:"keystore"` + Password string `json:"password" yaml:"password"` + ICONNetworkID int64 `json:"icon-network-id" yaml:"icon-network-id" default:"3"` + BTPNetworkID int64 `json:"btp-network-id" yaml:"btp-network-id"` + BTPNetworkTypeID int64 `json:"btp-network-type-id" yaml:"btp-network-type-id"` + // StartHeight int64 `json:"start-height" yaml:"start-height"` IbcHandlerAddress string `json:"ibc-handler-address" yaml:"ibc-handler-address"` FirstRetryBlockAfter uint64 `json:"first-retry-block-after" yaml:"first-retry-block-after"` BlockInterval uint64 `json:"block-interval" yaml:"block-interval"` @@ -76,6 +76,7 @@ func (pp *IconProviderConfig) Validate() error { return fmt.Errorf("Ibc handler Address cannot be empty") } + // BlockInterval should be in milliseconds if pp.BlockInterval == 0 { return fmt.Errorf("Block interval cannot be zero") } @@ -103,15 +104,14 @@ func (pp *IconProviderConfig) NewProvider(log *zap.Logger, homepath string, debu return nil, err } - codec := MakeCodec(ModuleBasics, []string{}) - return &IconProvider{ - log: log.With(zap.String("chain_id", pp.ChainID)), - client: NewClient(pp.getRPCAddr(), log), - PCfg: pp, - StartHeight: uint64(pp.StartHeight), - codec: codec, + log: log.With(zap.String("chain_id", pp.ChainID)), + client: NewClient(pp.getRPCAddr(), log), + PCfg: pp, + // StartHeight: uint64(pp.StartHeight), + codec: MakeCodec(ModuleBasics, []string{}), }, nil + } func (pp IconProviderConfig) getRPCAddr() string { @@ -192,20 +192,6 @@ func (h IconIBCHeader) ShouldUpdateForProofContextChange() bool { //ChainProvider Methods func (icp *IconProvider) Init(ctx context.Context) error { - // if _, err := os.Stat(icp.PCfg.Keystore); err != nil { - // return err - // } - - // ksByte, err := os.ReadFile(icp.PCfg.Keystore) - // if err != nil { - // return err - // } - - // wallet, err := wallet.NewFromKeyStore(ksByte, []byte(icp.PCfg.Password)) - // if err != nil { - // return err - // } - // icp.AddWallet(wallet) return nil } @@ -226,7 +212,8 @@ func (icp *IconProvider) NewClientState( return nil, fmt.Errorf("Blockinterval cannot be empty in Icon config") } - trustingBlockPeriod := uint64(dstTrustingPeriod) / (icp.PCfg.BlockInterval * uint64(common.NanoToMilliRatio)) + // BlockInterval should be in milliseconds + trustingBlockPeriod := uint64(dstTrustingPeriod) / (icp.PCfg.BlockInterval * uint64(time.Millisecond)) return &icon.ClientState{ // In case of Icon: Trusting Period is block Difference // see: light.proto in ibc-integration diff --git a/relayer/chains/icon/query.go b/relayer/chains/icon/query.go index 1804ac8ce..1e2f2f303 100644 --- a/relayer/chains/icon/query.go +++ b/relayer/chains/icon/query.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "fmt" "math/big" + "sort" "strings" "time" @@ -14,6 +15,7 @@ import ( "github.com/cosmos/gogoproto/proto" "github.com/cosmos/ibc-go/v7/modules/core/exported" ibcexported "github.com/cosmos/ibc-go/v7/modules/core/exported" + "github.com/gorilla/websocket" "github.com/pkg/errors" "go.uber.org/zap" @@ -39,12 +41,19 @@ import ( var _ provider.QueryProvider = &IconProvider{} const ( - epoch = 24 * 3600 * 1000 + epoch = 24 * 3600 * 1000 + sequenceLimit = 2 + genesisContract = "cx0000000000000000000000000000000000000000" ) type CallParamOption func(*types.CallParam) +// if height is less than or zero don't set height func callParamsWithHeight(height types.HexInt) CallParamOption { + val, _ := height.Value() + if val <= 0 { + return func(*types.CallParam) {} + } return func(cp *types.CallParam) { cp.Height = height } @@ -306,7 +315,7 @@ func (icp *IconProvider) QueryConsensusState(ctx context.Context, height int64) // query all the clients of the chain func (icp *IconProvider) QueryClients(ctx context.Context) (clienttypes.IdentifiedClientStates, error) { - seq, err := icp.getNextSequence(ctx, MethodGetNextClientSequence) + seq, err := icp.getNextSequence(ctx, MethodGetNextClientSequence, 0, map[string]interface{}{}) if err != nil { return nil, err @@ -398,7 +407,8 @@ var emptyConnRes = conntypes.NewQueryConnectionResponse( // ics 03 - connection func (icp *IconProvider) QueryConnections(ctx context.Context) (conns []*conntypes.IdentifiedConnection, err error) { - nextSeq, err := icp.getNextSequence(ctx, MethodGetNextConnectionSequence) + // sending -1 for latest height + nextSeq, err := icp.getNextSequence(ctx, MethodGetNextConnectionSequence, -1, map[string]interface{}{}) if err != nil { return nil, err } @@ -440,27 +450,16 @@ func (icp *IconProvider) QueryConnections(ctx context.Context) (conns []*conntyp return conns, nil } -func (icp *IconProvider) getNextSequence(ctx context.Context, methodName string) (uint64, error) { - +func (icp *IconProvider) getNextSequence(ctx context.Context, methodName string, height int64, params map[string]interface{}) (uint64, error) { var seq types.HexInt - switch methodName { - case MethodGetNextClientSequence: - callParam := icp.prepareCallParams(MethodGetNextClientSequence, map[string]interface{}{}) - if err := icp.client.Call(callParam, &seq); err != nil { - return 0, err - } - case MethodGetNextChannelSequence: - callParam := icp.prepareCallParams(MethodGetNextChannelSequence, map[string]interface{}{}) - if err := icp.client.Call(callParam, &seq); err != nil { - return 0, err - } - case MethodGetNextConnectionSequence: - callParam := icp.prepareCallParams(MethodGetNextConnectionSequence, map[string]interface{}{}) - if err := icp.client.Call(callParam, &seq); err != nil { - return 0, err - } - default: - return 0, errors.New("Invalid method name") + options := make([]CallParamOption, 0) + if height > 0 { + options = append(options, callParamsWithHeight(types.NewHexInt(height))) + } + + callParam := icp.prepareCallParams(methodName, params, options...) + if err := icp.client.Call(callParam, &seq); err != nil { + return 0, err } val, _ := seq.Value() return uint64(val), nil @@ -592,7 +591,7 @@ func (icp *IconProvider) QueryConnectionChannels(ctx context.Context, height int } func (icp *IconProvider) QueryChannels(ctx context.Context) ([]*chantypes.IdentifiedChannel, error) { - nextSeq, err := icp.getNextSequence(ctx, MethodGetNextChannelSequence) + nextSeq, err := icp.getNextSequence(ctx, MethodGetNextChannelSequence, 0, map[string]interface{}{}) if err != nil { return nil, err } @@ -670,28 +669,22 @@ func (icp *IconProvider) QueryUnreceivedAcknowledgements(ctx context.Context, he } func (icp *IconProvider) QueryNextSeqRecv(ctx context.Context, height int64, channelid, portid string) (recvRes *chantypes.QueryNextSequenceReceiveResponse, err error) { - callParam := icp.prepareCallParams(MethodGetNextSequenceReceive, map[string]interface{}{ + + seq, err := icp.getNextSequence(ctx, MethodGetNextSequenceReceive, height, map[string]interface{}{ "portId": portid, "channelId": channelid, - }, callParamsWithHeight(types.NewHexInt(height))) - var nextSeqRecv types.HexInt - if err := icp.client.Call(callParam, &nextSeqRecv); err != nil { - return nil, err - } + }) + key := common.GetNextSequenceRecvCommitmentKey(portid, channelid) - keyHash := common.Sha3keccak256(key, []byte(nextSeqRecv)) + keyHash := common.Sha3keccak256(key, []byte(types.NewHexInt(int64(seq)))) proof, err := icp.QueryIconProof(ctx, height, keyHash) if err != nil { return nil, err } - nextSeq, err := nextSeqRecv.Value() - if err != nil { - return nil, err - } return &chantypes.QueryNextSequenceReceiveResponse{ - NextSequenceReceive: uint64(nextSeq), + NextSequenceReceive: seq, Proof: proof, ProofHeight: clienttypes.NewHeight(0, uint64(height)), }, nil @@ -792,6 +785,187 @@ func (icp *IconProvider) QueryPacketReceipt(ctx context.Context, height int64, c }, nil } +func (icp *IconProvider) QueryMissingPacketReceipts(ctx context.Context, latestHeight int64, channelId, portId string, startSeq, endSeq uint64) ([]uint64, error) { + receipts := make([]uint64, 0) + + if endSeq <= startSeq { + return receipts, fmt.Errorf("start sequence %d is greater than end sequence: %d ", startSeq, endSeq) + } + + paginate := common.NewPaginate(startSeq, endSeq, sequenceLimit) + + for paginate.HasNext() { + start, end, err := paginate.Next() + if err != nil { + return nil, err + } + callParam := icp.prepareCallParams(MethodGetMissingPacketReceipts, map[string]interface{}{ + "portId": portId, + "channelId": channelId, + "startSequence": types.NewHexInt(int64(start)), + "endSequence": types.NewHexInt(int64(end)), + }, callParamsWithHeight(types.NewHexInt(latestHeight))) + + var missingReceipts []types.HexInt + if err := icp.client.Call(callParam, &missingReceipts); err != nil { + return nil, err + } + + for _, h := range missingReceipts { + val, err := h.Value() + if err != nil { + return nil, err + } + receipts = append(receipts, uint64(val)) + } + + } + + return receipts, nil +} + +func (icp *IconProvider) QueryPacketHeights(ctx context.Context, latestHeight int64, channelId, portId string, startSeq, endSeq uint64) (provider.MessageHeights, error) { + return icp.QueryMessageHeights(ctx, MethodGetPacketHeights, latestHeight, channelId, portId, startSeq, endSeq) +} + +func (icp *IconProvider) QueryAckHeights(ctx context.Context, latestHeight int64, channelId, portId string, startSeq, endSeq uint64) (provider.MessageHeights, error) { + return icp.QueryMessageHeights(ctx, MethodGetAckHeights, latestHeight, channelId, portId, startSeq, endSeq) +} + +func (icp *IconProvider) QueryMessageHeights(ctx context.Context, methodName string, latestHeight int64, channelId, portId string, startSeq, endSeq uint64) (provider.MessageHeights, error) { + + packetHeights := make(provider.MessageHeights, 0) + + if methodName != MethodGetPacketHeights && + methodName != MethodGetAckHeights { + return provider.MessageHeights{}, fmt.Errorf("invalid methodName: %s", methodName) + } + + if endSeq <= startSeq { + return provider.MessageHeights{}, fmt.Errorf("start sequence %d is greater than end sequence: %d ", startSeq, endSeq) + } + + paginate := common.NewPaginate(startSeq, endSeq, sequenceLimit) + for paginate.HasNext() { + start, end, err := paginate.Next() + if err != nil { + return nil, err + } + + callParam := icp.prepareCallParams(methodName, map[string]interface{}{ + "portId": portId, + "channelId": channelId, + "startSequence": types.NewHexInt(int64(start)), + "endSequence": types.NewHexInt(int64(end)), + }, callParamsWithHeight(types.NewHexInt(latestHeight))) + + var rawPacketHeights map[int64]types.HexInt + if err := icp.client.Call(callParam, &rawPacketHeights); err != nil { + return nil, err + } + + for seq, h := range rawPacketHeights { + heightInt, err := h.Value() + if err != nil { + return nil, err + } + + packetHeights[uint64(seq)] = uint64(heightInt) + } + } + + return packetHeights, nil +} + +func (ap *IconProvider) QueryPacketMessageByEventHeight(ctx context.Context, eventType string, srcChanID, srcPortID string, sequence uint64, seqHeight uint64) (provider.PacketInfo, error) { + var eventName = "" + switch eventType { + case chantypes.EventTypeSendPacket: + eventName = EventTypeSendPacket + case chantypes.EventTypeWriteAck: + eventName = EventTypeWriteAcknowledgement + } + + block, err := ap.client.GetBlockByHeight(&types.BlockHeightParam{ + Height: types.NewHexInt(int64(seqHeight)), + }) + if err != nil { + return provider.PacketInfo{}, err + } + + for _, res := range block.NormalTransactions { + + txResult, err := ap.client.GetTransactionResult(&types.TransactionHashParam{ + Hash: res.TxHash, + }) + if err != nil { + return provider.PacketInfo{}, err + } + for _, el := range txResult.EventLogs { + if el.Addr != types.Address(ap.PCfg.IbcHandlerAddress) && + // sendPacket will be of index length 2 + len(el.Indexed) != 2 && + el.Indexed[0] != eventName { + continue + } + // for ack + if eventName == EventTypeWriteAcknowledgement { + if len(el.Data) == 0 || el.Data[0] == "" { + continue + } + } + + packetStr := el.Indexed[1] + packetByte, err := hex.DecodeString(strings.TrimPrefix(packetStr, "0x")) + if err != nil { + return provider.PacketInfo{}, err + } + var packet icon.Packet + if err := proto.Unmarshal(packetByte, &packet); err != nil { + return provider.PacketInfo{}, err + } + + if packet.Sequence == sequence && packet.SourceChannel == srcChanID && packet.SourcePort == srcPortID { + packet := provider.PacketInfo{ + // in case of icon we need to consider btp block because of which if a message is send at height h + // btp header will be in h + 1 + Height: seqHeight + 1, + Sequence: packet.Sequence, + SourcePort: packet.SourcePort, + SourceChannel: packet.SourceChannel, + DestPort: packet.DestinationPort, + DestChannel: packet.DestinationChannel, + Data: packet.Data, + TimeoutHeight: clienttypes.NewHeight(packet.TimeoutHeight.RevisionNumber, packet.TimeoutHeight.RevisionHeight), + TimeoutTimestamp: packet.TimeoutTimestamp, + } + // adding ack bytes + if eventName == EventTypeWriteAcknowledgement { + packet.Ack, err = hex.DecodeString(strings.TrimPrefix(el.Data[0], "0x")) + if err != nil { + return provider.PacketInfo{}, err + } + } + return packet, nil + } + + } + + } + + return provider.PacketInfo{}, fmt.Errorf( + fmt.Sprintf("Packet of seq number : %d, srcchannel:%s, srcPort:%s not found at height %d", + sequence, srcChanID, srcPortID, seqHeight)) + +} + +func (ap *IconProvider) QueryNextSeqSend(ctx context.Context, height int64, channelid, portid string) (seq uint64, err error) { + return ap.getNextSequence(ctx, MethodGetNextSequenceSend, height, map[string]interface{}{ + "channelId": channelid, + "portId": portid, + }) +} + // ics 20 - transfer // not required for icon func (icp *IconProvider) QueryDenomTrace(ctx context.Context, denom string) (*transfertypes.DenomTrace, error) { @@ -860,3 +1034,245 @@ func (icp *IconProvider) HexStringToProtoUnmarshal(encoded string, v proto.Messa return inputBytes, nil } + +func (ip *IconProvider) GetProofContextChangePeriod() (uint64, error) { + // assigning termPeriod + prep, err := ip.client.GetPrepTerm() + if err != nil { + return 0, fmt.Errorf("fail to get prepterm: %v", err) + } + + decentralized, err := prep.IsDecentralized.Value() + if err != nil { + return 0, err + } + + // storing prep-term term only if decentralized + if decentralized == 1 { + period, err := prep.Period.Value() + if err != nil { + return 0, err + } + return uint64(period), nil + + } + return 0, nil +} + +func (icp *IconProvider) GetProofContextChangeHeaders(ctx context.Context, afterHeight uint64) ([]provider.IBCHeader, uint64, error) { + proofContextChangeHeights := make([]provider.IBCHeader, 0) + + logTicker := time.NewTicker(10 * time.Second) + + errCh := make(chan error) // error channel + reconnectCh := make(chan struct{}, 1) // reconnect channel + btpBlockNotifCh := make(chan *types.BlockNotification, 10) // block notification channel + btpBlockRespCh := make(chan *btpBlockResponse, cap(btpBlockNotifCh)) // block result channel + + // uptoHeight + uptoHeight, err := icp.QueryLatestHeight(ctx) + if err != nil { + return nil, 0, fmt.Errorf("error fetching latest height %v", err) + } + + reconnect := func() { + select { + case reconnectCh <- struct{}{}: + default: + } + for len(btpBlockRespCh) > 0 || len(btpBlockNotifCh) > 0 { + select { + case <-btpBlockRespCh: // clear block result channel + case <-btpBlockNotifCh: // clear block notification channel + } + } + } + + icp.log.Info("Start to check from height", zap.Int64("height", int64(afterHeight))) + // subscribe to monitor block + ctxMonitorBlock, cancelMonitorBlock := context.WithCancel(ctx) + reconnect() + + processedheight := int64(afterHeight) + 1 + + blockReq := &types.BlockRequest{ + Height: types.NewHexInt(processedheight), + } + +loop: + for { + select { + case <-ctx.Done(): + return nil, 0, nil + case err := <-errCh: + return nil, 0, err + + // this ticker is just to show log + case <-logTicker.C: + // fetching latest height also + h, _ := icp.QueryLatestHeight(ctx) + if h > 0 { + uptoHeight = h + icp.log.Info("finding proof context change height continues...", + zap.Int64("reached height", processedheight)) + } + + case <-reconnectCh: + cancelMonitorBlock() + ctxMonitorBlock, cancelMonitorBlock = context.WithCancel(ctx) + + go func(ctx context.Context, cancel context.CancelFunc) { + blockReq.Height = types.NewHexInt(processedheight) + err := icp.client.MonitorBlock(ctx, blockReq, func(conn *websocket.Conn, v *types.BlockNotification) error { + if !errors.Is(ctx.Err(), context.Canceled) { + btpBlockNotifCh <- v + } + return nil + }, func(conn *websocket.Conn) { + }, func(conn *websocket.Conn, err error) {}) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + time.Sleep(time.Second * 5) + reconnect() + } + + }(ctxMonitorBlock, cancelMonitorBlock) + case br := <-btpBlockRespCh: + for ; br != nil; processedheight++ { + + if br.Header.ShouldUpdateForProofContextChange() { + icp.log.Info("proof context changed at", zap.Int64("height", int64(br.Header.MainHeight))) + proofContextChangeHeights = append(proofContextChangeHeights, br.Header) + } + // process completed + if br.Header.Height() == uint64(uptoHeight) { + return proofContextChangeHeights, uint64(uptoHeight), nil + } + + if br = nil; len(btpBlockRespCh) > 0 { + br = <-btpBlockRespCh + } + } + // remove unprocessed blockResponses + for len(btpBlockRespCh) > 0 { + <-btpBlockRespCh + } + + default: + select { + default: + case bn := <-btpBlockNotifCh: + requestCh := make(chan *btpBlockRequest, cap(btpBlockNotifCh)) + for i := int64(0); bn != nil; i++ { + height, err := bn.Height.Value() + if err != nil { + return nil, 0, err + } else if height != processedheight+i { + icp.log.Warn("Reconnect: missing block notification", + zap.Int64("got", height), + zap.Int64("expected", processedheight+i), + ) + reconnect() + continue loop + } + + requestCh <- &btpBlockRequest{ + height: height, + hash: bn.Hash, + indexes: bn.Indexes, + events: bn.Events, + retry: queryRetries, + } + if bn = nil; len(btpBlockNotifCh) > 0 && len(requestCh) < cap(requestCh) { + bn = <-btpBlockNotifCh + } + } + + brs := make([]*btpBlockResponse, 0, len(requestCh)) + for request := range requestCh { + switch { + case request.err != nil: + if request.retry > 0 { + request.retry-- + request.response, request.err = nil, nil + requestCh <- request + continue + } + icp.log.Info("Request error ", + zap.Any("height", request.height), + zap.Error(request.err)) + brs = append(brs, nil) + if len(brs) == cap(brs) { + close(requestCh) + } + case request.response != nil: + brs = append(brs, request.response) + if len(brs) == cap(brs) { + close(requestCh) + } + default: + go icp.handleBlockRequest(request, requestCh) + + } + + } + // filter nil + _brs, brs := brs, brs[:0] + for _, v := range _brs { + if v.IsProcessed == processed { + brs = append(brs, v) + } + } + + // sort and forward notifications + if len(brs) > 0 { + sort.SliceStable(brs, func(i, j int) bool { + return brs[i].Height < brs[j].Height + }) + for i, d := range brs { + if d.Height == processedheight+int64(i) { + btpBlockRespCh <- d + } + } + } + + } + } + } +} + +func (icp *IconProvider) handleBlockRequest( + request *btpBlockRequest, requestCh chan *btpBlockRequest) { + defer func() { + time.Sleep(50 * time.Millisecond) + requestCh <- request + }() + + if request.response == nil { + request.response = &btpBlockResponse{ + IsProcessed: notProcessed, + Height: request.height, + } + } + + validators, err := icp.GetProofContextByHeight(request.height) + if err != nil { + request.err = errors.Wrapf(err, "Failed to get proof context: %v", err) + return + } + + btpHeader, err := icp.GetBtpHeader(request.height) + if err != nil { + if btpBlockNotPresent(err) { + request.response.Header = NewIconIBCHeader(nil, validators, (request.height)) + request.response.IsProcessed = processed + return + } + request.err = errors.Wrapf(err, "Failed to get btp header: %v", err) + return + } + request.response.Header = NewIconIBCHeader(btpHeader, validators, int64(btpHeader.MainHeight)) + request.response.IsProcessed = processed +} diff --git a/relayer/chains/icon/types/types.go b/relayer/chains/icon/types/types.go index 7026f4a54..721a9e851 100644 --- a/relayer/chains/icon/types/types.go +++ b/relayer/chains/icon/types/types.go @@ -666,3 +666,23 @@ func (h *NetworkTypeSection) Encode() []byte { func (h *NetworkTypeSection) Hash() []byte { return relayer_common.Sha3keccak256(h.Encode()) } + +type PrepTerm struct { + BlockHeight HexInt + bondRequirement HexInt + EndBlockHeight HexInt + IissVersion HexInt + Irep HexInt + IsDecentralized HexInt + MainPRepCount HexInt + Period HexInt + Preps interface{} //dont need right now + Revision HexInt + RewardFund interface{} //dont need right now + Rrep HexInt + Sequence HexInt + StartBlockHeight HexInt + TotalDelegated HexInt + TotalPower HexInt + TotalSupply HexInt +} diff --git a/relayer/chains/penumbra/query.go b/relayer/chains/penumbra/query.go index e20a64889..0ed2ecbb8 100644 --- a/relayer/chains/penumbra/query.go +++ b/relayer/chains/penumbra/query.go @@ -988,3 +988,20 @@ func (cc *PenumbraProvider) QueryICQWithProof(ctx context.Context, msgType strin func (cc *PenumbraProvider) QueryClientPrevConsensusStateHeight(ctx context.Context, chainHeight int64, clientId string, clientHeight int64) (exported.Height, error) { panic("QueryClientPrevConsensusStateHeight not implemented") } + +func (ap *PenumbraProvider) QueryPacketMessageByEventHeight(ctx context.Context, eventType, srcChanID, srcPortID string, sequence uint64, height uint64) (provider.PacketInfo, error) { + panic("QuerySendPacketByHeight not implemented") +} + +func (ap *PenumbraProvider) QueryPacketHeights(ctx context.Context, latestHeight int64, channelId, portId string, startSeq, endSeq uint64) (packetHeights provider.MessageHeights, err error) { + panic("QueryPacketHeights not implemented") +} +func (ap *PenumbraProvider) QueryAckHeights(ctx context.Context, latestHeight int64, channelId, portId string, startSeq, endSeq uint64) (packetHeights provider.MessageHeights, err error) { + panic("QueryAckHeights not implemented") +} +func (ap *PenumbraProvider) QueryMissingPacketReceipts(ctx context.Context, latestHeight int64, channelId, portId string, startSeq, endSeq uint64) (missingReceipts []uint64, err error) { + panic("QueryMissingPacketReceipts not implemented") +} +func (ap *PenumbraProvider) QueryNextSeqSend(ctx context.Context, height int64, channelid, portid string) (seq uint64, err error) { + panic("QueryNextSeqSend not implemented") +} diff --git a/relayer/chains/wasm/consts.go b/relayer/chains/wasm/consts.go index e68b1bd1e..039430032 100644 --- a/relayer/chains/wasm/consts.go +++ b/relayer/chains/wasm/consts.go @@ -20,9 +20,31 @@ const ( MethodAcknowledgePacket = "acknowledgement_packet" MethodTimeoutPacket = "timeout_packet" + // queryMethods MethodGetNextClientSequence = "get_next_client_sequence" MethodGetNextChannelSequence = "get_next_channel_sequence" MethodGetNextConnectionSequence = "get_next_connection_sequence" + + MethodGetNextSequenceSend = "get_next_sequence_send" + MethodGetNextSequenceReceive = "get_next_sequence_receive" + MethodGetNextSequenceAcknowledgement = "get_next_sequence_acknowledgement" + + MethodGetClientState = "get_client_state" + MethodGetChannel = "get_channel" + MethodGetConnection = "get_connection" + MethodGetConsensusStateByHeight = "get_consensus_state_by_height" + + MethodGetPacketCommitment = "get_packet_commitment" + MethodGetPacketAcknowledgementCommitment = "get_packet_acknowledgement_commitment" + MethodGetPacketReceipt = "get_packet_receipt" + + MethodGetAllPorts = "get_all_ports" + MethodGetCommitmentPrefix = "get_commitment_prefix" + + MethodGetMissingPacketReceipts = "get_missing_packet_receipts" + MethodGetPacketHeights = "get_packet_heights" + MethodGetAckHeights = "get_ack_heights" + MethodGetPreviousConsensusStateHeight = "get_previous_consensus_state_height" ) const ( diff --git a/relayer/chains/wasm/provider.go b/relayer/chains/wasm/provider.go index 0fae2a4bd..475ad6334 100644 --- a/relayer/chains/wasm/provider.go +++ b/relayer/chains/wasm/provider.go @@ -29,6 +29,7 @@ import ( commitmenttypes "github.com/cosmos/ibc-go/v7/modules/core/23-commitment/types" ibcexported "github.com/cosmos/ibc-go/v7/modules/core/exported" "github.com/cosmos/relayer/v2/relayer/codecs/ethermint" + "github.com/cosmos/relayer/v2/relayer/common" "github.com/cosmos/relayer/v2/relayer/processor" "github.com/cosmos/relayer/v2/relayer/provider" @@ -69,8 +70,9 @@ type WasmProviderConfig struct { Broadcast provider.BroadcastMode `json:"broadcast-mode" yaml:"broadcast-mode"` IbcHandlerAddress string `json:"ibc-handler-address" yaml:"ibc-handler-address"` FirstRetryBlockAfter uint64 `json:"first-retry-block-after" yaml:"first-retry-block-after"` - StartHeight uint64 `json:"start-height" yaml:"start-height"` - BlockInterval uint64 `json:"block-interval" yaml:"block-interval"` + // StartHeight uint64 `json:"start-height" yaml:"start-height"` + BlockInterval uint64 `json:"block-interval" yaml:"block-interval"` + Concurrency int64 `json:"concurrency" yaml:"concurrency"` } type WasmIBCHeader struct { @@ -307,7 +309,7 @@ func (ap *WasmProvider) ChainName() string { } func (ap *WasmProvider) Type() string { - return "wasm" + return common.WasmModule } func (ap *WasmProvider) Key() string { @@ -371,7 +373,6 @@ func (ap *WasmProvider) Init(ctx context.Context) error { if addr != nil { clientCtx = clientCtx. WithFromAddress(addr) - } ap.QueryClient = wasmtypes.NewQueryClient(clientCtx) diff --git a/relayer/chains/wasm/query.go b/relayer/chains/wasm/query.go index 546ab61a6..cd2f2d3d9 100644 --- a/relayer/chains/wasm/query.go +++ b/relayer/chains/wasm/query.go @@ -253,7 +253,9 @@ func (ap *WasmProvider) QueryClientStateResponse(ctx context.Context, height int } func (ap *WasmProvider) QueryClientStateContract(ctx context.Context, clientId string) (*icon.ClientState, error) { - clientStateParam, err := types.NewClientState(clientId).Bytes() + clientStateParam, err := types.GenerateQueryParams(MethodGetClientState, types.ClientState{ + ClientId: clientId, + }) if err != nil { return nil, err } @@ -279,7 +281,9 @@ func (ap *WasmProvider) QueryClientStateContract(ctx context.Context, clientId s } func (ap *WasmProvider) QueryConnectionContract(ctx context.Context, connId string) (*conntypes.ConnectionEnd, error) { - connStateParam, err := types.NewConnection(connId).Bytes() + connStateParam, err := types.GenerateQueryParams(MethodGetConnection, types.Connection{ + ConnectionId: connId, + }) if err != nil { return nil, err } @@ -298,12 +302,14 @@ func (ap *WasmProvider) QueryConnectionContract(ctx context.Context, connId stri } func (ap *WasmProvider) QueryChannelContractNoRetry(ctx context.Context, portId, channelId string) (*chantypes.Channel, error) { - channelStateParam, err := types.NewChannel(portId, channelId).Bytes() + param, err := types.GenerateQueryParams(MethodGetChannel, + types.NewCapability(channelId, portId), + ) if err != nil { return nil, err } - channelState, err := ap.QueryIBCHandlerContractNoRetry(ctx, channelStateParam) + channelState, err := ap.QueryIBCHandlerContractNoRetry(ctx, param) if err != nil { return nil, err } @@ -317,11 +323,14 @@ func (ap *WasmProvider) QueryChannelContractNoRetry(ctx context.Context, portId, func (ap *WasmProvider) QueryClientConsensusState(ctx context.Context, chainHeight int64, clientid string, clientHeight ibcexported.Height) (*clienttypes.QueryConsensusStateResponse, error) { - consensusStateParam, err := types.NewConsensusStateByHeight(clientid, uint64(clientHeight.GetRevisionHeight())).Bytes() + param, err := types.GenerateQueryParams(MethodGetConsensusStateByHeight, types.ConsensusStateByHeight{ + ClientId: clientid, + Height: clientHeight.GetRevisionHeight(), + }) if err != nil { return nil, err } - consensusState, err := ap.QueryIBCHandlerContractNoRetry(ctx, consensusStateParam) + consensusState, err := ap.QueryIBCHandlerContractNoRetry(ctx, param) if err != nil { return nil, err } @@ -366,15 +375,37 @@ func (ap *WasmProvider) QueryIBCHandlerContractNoRetry(ctx context.Context, para Address: ap.PCfg.IbcHandlerAddress, QueryData: param, }) + if err != nil { + return nil, err + } + return ProcessContractResponse(resp) +} + +func (ap *WasmProvider) QueryIBCHandlerContractWithHeight(ctx context.Context, param wasmtypes.RawContractMessage, height int64, response interface{}) error { + + clientCtx := ap.ClientCtx + if height > 0 { + clientCtx = clientCtx.WithHeight(height) + + } + qc := wasmtypes.NewQueryClient(clientCtx) + // holding sdk just before and after + done := ap.SetSDKContext() + r, err := qc.SmartContractState(ctx, &wasmtypes.QuerySmartContractStateRequest{ + Address: ap.PCfg.IbcHandlerAddress, + QueryData: param, + }) + defer done() if err != nil { ap.log.Error( "Failed to query", zap.Any("Param", param), zap.Error(err), ) - return nil, err + return err } - return ProcessContractResponse(resp) + + return json.Unmarshal(r.Data.Bytes(), &response) } func (ap *WasmProvider) QueryIBCHandlerContractProcessed(ctx context.Context, param wasmtypes.RawContractMessage) ([]byte, error) { @@ -419,7 +450,7 @@ func (ap *WasmProvider) QueryConsensusState(ctx context.Context, height int64) ( } func (ap *WasmProvider) getAllPorts(ctx context.Context) ([]string, error) { - param, err := types.NewGetAllPorts().Bytes() + param, err := types.GenerateQueryParams(MethodGetAllPorts, struct{}{}) if err != nil { return make([]string, 0), err } @@ -437,41 +468,24 @@ func (ap *WasmProvider) getAllPorts(ctx context.Context) ([]string, error) { } func (ap *WasmProvider) getNextSequence(ctx context.Context, methodName string) (int, error) { - switch methodName { - case MethodGetNextClientSequence: - param, err := types.NewNextClientSequence().Bytes() - if err != nil { - return 0, err - } - op, err := ap.QueryIBCHandlerContract(ctx, param) + switch methodName { + case MethodGetNextClientSequence, MethodGetNextChannelSequence, MethodGetNextConnectionSequence: + param, err := types.GenerateQueryParams(methodName, struct{}{}) if err != nil { return 0, err } - return byteToInt(op.Data.Bytes()) - - case MethodGetNextChannelSequence: - param, err := types.NewNextChannelSequence().Bytes() - if err != nil { - return 0, err - } - op, err := ap.QueryIBCHandlerContract(ctx, param) + res, err := ap.QueryIBCHandlerContract(ctx, param) if err != nil { return 0, err } - return byteToInt(op.Data.Bytes()) - case MethodGetNextConnectionSequence: - param, err := types.NewNextConnectionSequence().Bytes() - if err != nil { - return 0, err - } - op, err := ap.QueryIBCHandlerContract(ctx, param) - if err != nil { + var op int + if err := json.Unmarshal(res.Data.Bytes(), &op); err != nil { return 0, err } - return byteToInt(op.Data.Bytes()) + return op, nil default: return 0, errors.New("Invalid method name") @@ -504,12 +518,14 @@ func (ap *WasmProvider) QueryClients(ctx context.Context) (clienttypes.Identifie // ics 03 - connection func (ap *WasmProvider) QueryConnection(ctx context.Context, height int64, connectionid string) (*conntypes.QueryConnectionResponse, error) { - connectionStateParams, err := types.NewConnection(connectionid).Bytes() + param, err := types.GenerateQueryParams(MethodGetConnection, types.Connection{ + ConnectionId: connectionid, + }) if err != nil { return nil, err } - connState, err := ap.QueryIBCHandlerContractProcessed(ctx, connectionStateParams) + connState, err := ap.QueryIBCHandlerContractProcessed(ctx, param) if err != nil { return nil, err } @@ -645,13 +661,15 @@ func (ap *WasmProvider) GenerateConnHandshakeProof(ctx context.Context, height i } // ics 04 - channel -func (ap *WasmProvider) QueryChannel(ctx context.Context, height int64, channelid, portid string) (chanRes *chantypes.QueryChannelResponse, err error) { - channelParams, err := types.NewChannel(portid, channelid).Bytes() +func (ap *WasmProvider) QueryChannel(ctx context.Context, height int64, channelId, portId string) (chanRes *chantypes.QueryChannelResponse, err error) { + param, err := types.GenerateQueryParams(MethodGetChannel, + types.NewCapability(channelId, portId), + ) if err != nil { return nil, err } - channelState, err := ap.QueryIBCHandlerContractProcessed(ctx, channelParams) + channelState, err := ap.QueryIBCHandlerContractProcessed(ctx, param) if err != nil { return nil, err } @@ -665,7 +683,7 @@ func (ap *WasmProvider) QueryChannel(ctx context.Context, height int64, channeli return nil, err } - storageKey := getStorageKeyFromPath(common.GetChannelCommitmentKey(portid, channelid)) + storageKey := getStorageKeyFromPath(common.GetChannelCommitmentKey(portId, channelId)) proof, err := ap.QueryWasmProof(ctx, storageKey, height) if err != nil { return nil, err @@ -674,7 +692,7 @@ func (ap *WasmProvider) QueryChannel(ctx context.Context, height int64, channeli return chantypes.NewQueryChannelResponse(channelS, proof, clienttypes.NewHeight(0, uint64(height))), nil } -func (ap *WasmProvider) QueryChannelClient(ctx context.Context, height int64, channelid, portid string) (*clienttypes.IdentifiedClientState, error) { +func (ap *WasmProvider) QueryChannelClient(ctx context.Context, height int64, channelId, portId string) (*clienttypes.IdentifiedClientState, error) { panic(fmt.Sprintf("%s%s", ap.ChainName(), NOT_IMPLEMENTED)) } @@ -734,28 +752,30 @@ func (ap *WasmProvider) QueryChannels(ctx context.Context) ([]*chantypes.Identif return channels, nil } -func (ap *WasmProvider) QueryPacketCommitments(ctx context.Context, height uint64, channelid, portid string) (commitments *chantypes.QueryPacketCommitmentsResponse, err error) { +func (ap *WasmProvider) QueryPacketCommitments(ctx context.Context, height uint64, channelId, portId string) (commitments *chantypes.QueryPacketCommitmentsResponse, err error) { panic(fmt.Sprintf("%s%s", ap.ChainName(), NOT_IMPLEMENTED)) } -func (ap *WasmProvider) QueryPacketAcknowledgements(ctx context.Context, height uint64, channelid, portid string) (acknowledgements []*chantypes.PacketState, err error) { +func (ap *WasmProvider) QueryPacketAcknowledgements(ctx context.Context, height uint64, channelId, portId string) (acknowledgements []*chantypes.PacketState, err error) { panic(fmt.Sprintf("%s%s", ap.ChainName(), NOT_IMPLEMENTED)) } -func (ap *WasmProvider) QueryUnreceivedPackets(ctx context.Context, height uint64, channelid, portid string, seqs []uint64) ([]uint64, error) { +func (ap *WasmProvider) QueryUnreceivedPackets(ctx context.Context, height uint64, channelId, portId string, seqs []uint64) ([]uint64, error) { panic(fmt.Sprintf("%s%s", ap.ChainName(), NOT_IMPLEMENTED)) } -func (ap *WasmProvider) QueryUnreceivedAcknowledgements(ctx context.Context, height uint64, channelid, portid string, seqs []uint64) ([]uint64, error) { +func (ap *WasmProvider) QueryUnreceivedAcknowledgements(ctx context.Context, height uint64, channelId, portId string, seqs []uint64) ([]uint64, error) { panic(fmt.Sprintf("%s%s", ap.ChainName(), NOT_IMPLEMENTED)) } -func (ap *WasmProvider) QueryNextSeqRecv(ctx context.Context, height int64, channelid, portid string) (recvRes *chantypes.QueryNextSequenceReceiveResponse, err error) { - nextSeqRecvParams, err := types.NewNextSequenceReceive(portid, channelid).Bytes() +func (ap *WasmProvider) QueryNextSeqRecv(ctx context.Context, height int64, channelId, portId string) (recvRes *chantypes.QueryNextSequenceReceiveResponse, err error) { + param, err := types.GenerateQueryParams(MethodGetNextSequenceReceive, + types.NewCapability(channelId, portId), + ) if err != nil { return nil, err } - nextSeqRecv, err := ap.QueryIBCHandlerContract(ctx, nextSeqRecvParams) + nextSeqRecv, err := ap.QueryIBCHandlerContract(ctx, param) if err != nil { return nil, err } @@ -776,16 +796,18 @@ func (ap *WasmProvider) QueryNextSeqRecv(ctx context.Context, height int64, chan }, nil } -func (ap *WasmProvider) QueryPacketCommitment(ctx context.Context, height int64, channelid, portid string, seq uint64) (comRes *chantypes.QueryPacketCommitmentResponse, err error) { - pktCommitmentParams, err := types.NewPacketCommitment(portid, channelid, seq).Bytes() +func (ap *WasmProvider) QueryPacketCommitment(ctx context.Context, height int64, channelId, portId string, seq uint64) (comRes *chantypes.QueryPacketCommitmentResponse, err error) { + param, err := types.GenerateQueryParams(MethodGetPacketCommitment, + types.NewPacketIdentity(channelId, portId, seq), + ) if err != nil { return nil, err } - pktCommitment, err := ap.QueryIBCHandlerContract(ctx, pktCommitmentParams) + pktCommitment, err := ap.QueryIBCHandlerContract(ctx, param) if err != nil { return nil, err } - storageKey := getStorageKeyFromPath(common.GetPacketCommitmentKey(portid, channelid, big.NewInt(int64(seq)))) + storageKey := getStorageKeyFromPath(common.GetPacketCommitmentKey(portId, channelId, big.NewInt(int64(seq)))) proof, err := ap.QueryWasmProof(ctx, storageKey, height) if err != nil { @@ -799,16 +821,19 @@ func (ap *WasmProvider) QueryPacketCommitment(ctx context.Context, height int64, } -func (ap *WasmProvider) QueryPacketAcknowledgement(ctx context.Context, height int64, channelid, portid string, seq uint64) (ackRes *chantypes.QueryPacketAcknowledgementResponse, err error) { - pktAcknowledgementParams, err := types.NewPacketAcknowledgementCommitment(portid, channelid, seq).Bytes() +func (ap *WasmProvider) QueryPacketAcknowledgement(ctx context.Context, height int64, channelId, portId string, seq uint64) (ackRes *chantypes.QueryPacketAcknowledgementResponse, err error) { + param, err := types.GenerateQueryParams( + MethodGetPacketAcknowledgementCommitment, + types.NewPacketIdentity(channelId, portId, seq), + ) if err != nil { return nil, err } - pktAcknowledgement, err := ap.QueryIBCHandlerContract(ctx, pktAcknowledgementParams) + pktAcknowledgement, err := ap.QueryIBCHandlerContract(ctx, param) if err != nil { return nil, err } - storageKey := getStorageKeyFromPath(common.GetPacketAcknowledgementCommitmentKey(portid, channelid, big.NewInt(int64(seq)))) + storageKey := getStorageKeyFromPath(common.GetPacketAcknowledgementCommitmentKey(portId, channelId, big.NewInt(int64(seq)))) proof, err := ap.QueryWasmProof(ctx, storageKey, height) return &chantypes.QueryPacketAcknowledgementResponse{ @@ -818,27 +843,27 @@ func (ap *WasmProvider) QueryPacketAcknowledgement(ctx context.Context, height i }, nil } -func (ap *WasmProvider) QueryPacketReceipt(ctx context.Context, height int64, channelid, portid string, seq uint64) (recRes *chantypes.QueryPacketReceiptResponse, err error) { +func (ap *WasmProvider) QueryPacketReceipt(ctx context.Context, height int64, channelId, portId string, seq uint64) (recRes *chantypes.QueryPacketReceiptResponse, err error) { - // getting proof from commitment map in contract - storageKey := getStorageKeyFromPath(common.GetPacketReceiptCommitmentKey(portid, channelid, big.NewInt(int64(seq)))) - proof, err := ap.QueryWasmProof(ctx, storageKey, height) + param, err := types.GenerateQueryParams( + MethodGetPacketReceipt, types.NewPacketIdentity(channelId, portId, seq)) if err != nil { return nil, err } - pktReceiptParams, err := types.NewPacketReceipt(portid, channelid, seq).Bytes() - if err != nil { + pktReceipt, err := ap.QueryIBCHandlerContract(ctx, param) + if err != nil && !strings.Contains(err.Error(), "PacketReceiptNotFound") { return nil, err } - pktReceipt, err := ap.QueryIBCHandlerContract(ctx, pktReceiptParams) - if err != nil && !strings.Contains(err.Error(), "PacketReceiptNotFound") { + storageKey := getStorageKeyFromPath(common.GetPacketReceiptCommitmentKey(portId, channelId, big.NewInt(int64(seq)))) + proof, err := ap.QueryWasmProof(ctx, storageKey, height) + if err != nil { return nil, err } return &chantypes.QueryPacketReceiptResponse{ - Received: pktReceipt != nil, // TODO: Bytes to boolean + Received: pktReceipt != nil, Proof: proof, ProofHeight: clienttypes.NewHeight(0, uint64(height)), }, nil @@ -846,11 +871,11 @@ func (ap *WasmProvider) QueryPacketReceipt(ctx context.Context, height int64, ch func (ap *WasmProvider) GetCommitmentPrefixFromContract(ctx context.Context) ([]byte, error) { - pktCommitmentParams, err := types.NewCommitmentPrefix().Bytes() + param, err := types.GenerateQueryParams(MethodGetCommitmentPrefix, struct{}{}) if err != nil { return nil, err } - return ap.QueryIBCHandlerContractProcessed(ctx, pktCommitmentParams) + return ap.QueryIBCHandlerContractProcessed(ctx, param) } @@ -892,16 +917,14 @@ func (ap *WasmProvider) QueryDenomTraces(ctx context.Context, offset, limit uint } func (ap *WasmProvider) QueryClientPrevConsensusStateHeight(ctx context.Context, chainHeight int64, clientId string, clientHeight int64) (exported.Height, error) { - param, err := types.NewPrevConsensusStateHeight(clientId, uint64(clientHeight)).Bytes() - res, err := ap.QueryIBCHandlerContract(ctx, param) + param, err := types.GenerateQueryParams(MethodGetPreviousConsensusStateHeight, + types.NewConsensusStateByHeight(clientId, uint64(clientHeight)), + ) if err != nil { return nil, err } - var heights []int64 - err = json.Unmarshal(res.Data.Bytes(), &heights) - - if err != nil { + if err := ap.QueryIBCHandlerContractWithHeight(ctx, param, chainHeight, &heights); err != nil { return nil, err } @@ -910,3 +933,101 @@ func (ap *WasmProvider) QueryClientPrevConsensusStateHeight(ctx context.Context, } return clienttypes.Height{RevisionNumber: 0, RevisionHeight: uint64(heights[0])}, nil } + +func (ap *WasmProvider) QueryMissingPacketReceipts(ctx context.Context, latestHeight int64, channelId, portId string, startSeq, endSeq uint64) (missingReceipts []uint64, err error) { + + param, err := types.GenerateQueryParams(MethodGetMissingPacketReceipts, + types.NewRangeParams(channelId, portId, startSeq, endSeq)) + if err != nil { + return nil, err + } + + var receipts []uint64 + if err := ap.QueryIBCHandlerContractWithHeight(ctx, param, latestHeight, &receipts); err != nil { + return nil, err + } + + return receipts, nil +} + +func (ap *WasmProvider) QueryMessageHeights(ctx context.Context, methodName string, latestHeight int64, channelId, portId string, startSeq, endSeq uint64) (provider.MessageHeights, error) { + + if methodName != MethodGetAckHeights && + methodName != MethodGetPacketHeights { + return provider.MessageHeights{}, fmt.Errorf("incorrect method name: %s", methodName) + } + + param, err := types.GenerateQueryParams(methodName, + types.NewRangeParams(channelId, portId, startSeq, endSeq)) + if err != nil { + return nil, err + } + + var packetHeights provider.MessageHeights + if err := ap.QueryIBCHandlerContractWithHeight(ctx, param, latestHeight, &packetHeights); err != nil { + return nil, err + } + + return packetHeights, nil +} + +func (ap *WasmProvider) QueryPacketHeights(ctx context.Context, latestHeight int64, channelId, portId string, startSeq, endSeq uint64) (provider.MessageHeights, error) { + return ap.QueryMessageHeights(ctx, MethodGetPacketHeights, latestHeight, channelId, portId, startSeq, endSeq) +} + +func (ap *WasmProvider) QueryAckHeights(ctx context.Context, latestHeight int64, channelId, portId string, startSeq, endSeq uint64) (provider.MessageHeights, error) { + return ap.QueryMessageHeights(ctx, MethodGetAckHeights, latestHeight, channelId, portId, startSeq, endSeq) +} + +func (ap *WasmProvider) QueryPacketMessageByEventHeight(ctx context.Context, eventType, srcChanID, srcPortID string, sequence uint64, seqHeight uint64) (provider.PacketInfo, error) { + + h := int64(seqHeight) + blockRes, err := ap.RPCClient.BlockResults(ctx, &h) + if err != nil { + return provider.PacketInfo{}, err + } + + base64Encoded := true + for _, tx := range blockRes.TxsResults { + if tx.Code != 0 { + // tx was not successful + continue + } + messages := ibcMessagesFromEvents(ap.log, tx.Events, ap.ChainId(), seqHeight, ap.PCfg.IbcHandlerAddress, base64Encoded) + for _, m := range messages { + // in case eventtype donot match + if m.eventType != eventType { + continue + } + switch t := m.info.(type) { + case *packetInfo: + packet := provider.PacketInfo(*t) + if packet.Sequence == sequence && packet.SourceChannel == srcChanID && packet.SourcePort == srcPortID { + // for ack byte length cannot be empty + if eventType == chantypes.EventTypeAcknowledgePacket && len(packet.Ack) == 0 { + continue + } + + return packet, nil + } + default: + continue + } + } + } + return provider.PacketInfo{}, fmt.Errorf("Packet not found on height") +} + +func (ap *WasmProvider) QueryNextSeqSend(ctx context.Context, height int64, channelId, portId string) (uint64, error) { + param, err := types.GenerateQueryParams(MethodGetNextSequenceSend, + types.NewCapability(channelId, portId), + ) + if err != nil { + return 0, err + } + var response uint64 + if err := ap.QueryIBCHandlerContractWithHeight(ctx, param, height, &response); err != nil { + return 0, err + } + return response, nil +} diff --git a/relayer/chains/wasm/types/types.go b/relayer/chains/wasm/types/types.go index f6085a42c..b68c785c7 100644 --- a/relayer/chains/wasm/types/types.go +++ b/relayer/chains/wasm/types/types.go @@ -19,22 +19,10 @@ func NewHexBytes(b []byte) HexBytes { return HexBytes("0x" + hex.EncodeToString(b)) } -// / IBC Handler Contract Methods and Parameters - -// / EXTERNAL METHODS - type ContractCall struct { Msg HexBytes `json:"msg"` } -type CreateClientMsg struct { - CreateClient ContractCall `json:"create_client"` -} - -func (c *CreateClientMsg) Bytes() ([]byte, error) { - return json.Marshal(c) -} - func GenerateTxnParams(methodName string, value HexBytes) ([]byte, error) { if len(methodName) <= 0 { return nil, fmt.Errorf("Empty Method Name") @@ -50,322 +38,63 @@ func GenerateTxnParams(methodName string, value HexBytes) ([]byte, error) { return json.Marshal(m) } -// / READONLY METHODS -type GetClientState struct { - ClientState struct { - ClientId string `json:"client_id"` - } `json:"get_client_state"` +func GenerateQueryParams(methodName string, params interface{}) ([]byte, error) { + queryObj := make(map[string]interface{}, 0) + queryObj[methodName] = params + return json.Marshal(queryObj) } -func (x *GetClientState) Bytes() ([]byte, error) { - return json.Marshal(x) +type ClientState struct { + ClientId string `json:"client_id"` } -func NewClientState(clientId string) *GetClientState { - return &GetClientState{ - struct { - ClientId string `json:"client_id"` - }{ - ClientId: clientId, - }, - } +func NewClientState(ClientId string) ClientState { + return ClientState{ClientId} } type ConsensusStateByHeight struct { - ClientId string "json:\"client_id\"" - Height uint64 "json:\"height\"" + ClientId string `json:"client_id"` + Height uint64 `json:"height"` } -type GetConsensusStateByHeight struct { - ConsensusStateByHeight ConsensusStateByHeight `json:"get_consensus_state_by_height"` +func NewConsensusStateByHeight(ClientId string, Height uint64) ConsensusStateByHeight { + return ConsensusStateByHeight{ClientId, Height} } -func (x *GetConsensusStateByHeight) Bytes() ([]byte, error) { - return json.Marshal(x) +type Connection struct { + ConnectionId string `json:"connection_id"` } -func NewConsensusStateByHeight(clientId string, height uint64) *GetConsensusStateByHeight { - return &GetConsensusStateByHeight{ - ConsensusStateByHeight: ConsensusStateByHeight{ - ClientId: clientId, - Height: height, - }, - } +func NewConnection(ConnectionId string) Connection { + return Connection{ConnectionId} } -type GetConnection struct { - Connection struct { - ConnectionId string `json:"connection_id"` - } `json:"get_connection"` +type Capability struct { + ChannelId string `json:"channel_id"` + PortId string `json:"port_id"` } -func (x *GetConnection) Bytes() ([]byte, error) { - return json.Marshal(x) +func NewCapability(ChannelId, PortId string) Capability { + return Capability{ChannelId, PortId} } -func NewConnection(connId string) *GetConnection { - return &GetConnection{ - Connection: struct { - ConnectionId string "json:\"connection_id\"" - }{ - ConnectionId: connId, - }, - } +type PacketIdentity struct { + ChannelId string `json:"channel_id"` + PortId string `json:"port_id"` + Sequence uint64 `json:"sequence"` } -type GetChannel struct { - Channel struct { - PortId string `json:"port_id"` - ChannelId string `json:"channel_id"` - } `json:"get_channel"` +func NewPacketIdentity(ChannelId, PortId string, Sequence uint64) PacketIdentity { + return PacketIdentity{ChannelId, PortId, Sequence} } -func (x *GetChannel) Bytes() ([]byte, error) { - return json.Marshal(x) +type RangeParams struct { + ChannelId string `json:"channel_id"` + PortId string `json:"port_id"` + StartSequence uint64 `json:"start_sequence"` + EndSequence uint64 `json:"end_sequence"` } -func NewChannel(portId, channelId string) *GetChannel { - return &GetChannel{ - Channel: struct { - PortId string "json:\"port_id\"" - ChannelId string "json:\"channel_id\"" - }{ - PortId: portId, - ChannelId: channelId, - }, - } -} - -type GetPacketCommitment struct { - PacketCommitment struct { - PortId string `json:"port_id"` - ChannelId string `json:"channel_id"` - Sequence uint64 `json:"sequence"` - } `json:"get_packet_commitment"` -} - -func (x *GetPacketCommitment) Bytes() ([]byte, error) { - return json.Marshal(x) -} - -func NewPacketCommitment(portId, channelId string, sequence uint64) *GetPacketCommitment { - return &GetPacketCommitment{ - PacketCommitment: struct { - PortId string "json:\"port_id\"" - ChannelId string "json:\"channel_id\"" - Sequence uint64 "json:\"sequence\"" - }{ - PortId: portId, - ChannelId: channelId, - Sequence: sequence, - }, - } -} - -type GetPacketAcknowledgementCommitment struct { - PacketCommitment struct { - PortId string `json:"port_id"` - ChannelId string `json:"channel_id"` - Sequence uint64 `json:"sequence"` - } `json:"get_packet_acknowledgement_commitment"` -} - -func (x *GetPacketAcknowledgementCommitment) Bytes() ([]byte, error) { - return json.Marshal(x) -} - -func NewPacketAcknowledgementCommitment(portId, channelId string, sequence uint64) *GetPacketAcknowledgementCommitment { - return &GetPacketAcknowledgementCommitment{ - PacketCommitment: struct { - PortId string "json:\"port_id\"" - ChannelId string "json:\"channel_id\"" - Sequence uint64 "json:\"sequence\"" - }{ - PortId: portId, - ChannelId: channelId, - Sequence: sequence, - }, - } -} - -type GetNextSequenceSend struct { - NextSequenceSend struct { - PortId string `json:"port_id"` - ChannelId string `json:"channel_id"` - } `json:"get_next_sequence_send"` -} - -func (x *GetNextSequenceSend) Bytes() ([]byte, error) { - return json.Marshal(x) -} - -func NewNextSequenceSend(portId, channelId string) *GetNextSequenceSend { - return &GetNextSequenceSend{ - NextSequenceSend: struct { - PortId string "json:\"port_id\"" - ChannelId string "json:\"channel_id\"" - }{ - PortId: portId, - ChannelId: channelId, - }, - } -} - -type GetNextSequenceReceive struct { - NextSequenceReceive struct { - PortId string `json:"port_id"` - ChannelId string `json:"channel_id"` - } `json:"get_next_sequence_receive"` -} - -func (x *GetNextSequenceReceive) Bytes() ([]byte, error) { - return json.Marshal(x) -} - -func NewNextSequenceReceive(portId, channelId string) *GetNextSequenceReceive { - return &GetNextSequenceReceive{ - NextSequenceReceive: struct { - PortId string "json:\"port_id\"" - ChannelId string "json:\"channel_id\"" - }{ - PortId: portId, - ChannelId: channelId, - }, - } -} - -type GetNextSequenceAcknowledgement struct { - NextSequenceAck struct { - PortId string `json:"port_id"` - ChannelId string `json:"channel_id"` - } `json:"get_next_sequence_acknowledgement"` -} - -func (x *GetNextSequenceAcknowledgement) Bytes() ([]byte, error) { - return json.Marshal(x) -} - -func NewNextSequenceAcknowledgement(portId, channelId string) *GetNextSequenceAcknowledgement { - return &GetNextSequenceAcknowledgement{ - NextSequenceAck: struct { - PortId string "json:\"port_id\"" - ChannelId string "json:\"channel_id\"" - }{ - PortId: portId, - ChannelId: channelId, - }, - } -} - -type GetPacketReceipt struct { - PacketReceipt struct { - PortId string `json:"port_id"` - ChannelId string `json:"channel_id"` - Sequence uint64 `json:"sequence"` - } `json:"get_packet_receipt"` -} - -func (x *GetPacketReceipt) Bytes() ([]byte, error) { - return json.Marshal(x) -} - -func NewPacketReceipt(portId, channelId string, sequence uint64) *GetPacketReceipt { - return &GetPacketReceipt{ - PacketReceipt: struct { - PortId string "json:\"port_id\"" - ChannelId string "json:\"channel_id\"" - Sequence uint64 "json:\"sequence\"" - }{ - PortId: portId, - ChannelId: channelId, - Sequence: sequence, - }, - } -} - -type GetNextClientSequence struct { - Sequence struct{} `json:"get_next_client_sequence"` -} - -func (x *GetNextClientSequence) Bytes() ([]byte, error) { - return json.Marshal(x) -} - -func NewNextClientSequence() *GetNextClientSequence { - return &GetNextClientSequence{ - Sequence: struct{}{}, - } -} - -type GetNextConnectionSequence struct { - Sequence struct{} `json:"get_next_connection_sequence"` -} - -func (x *GetNextConnectionSequence) Bytes() ([]byte, error) { - return json.Marshal(x) -} - -func NewNextConnectionSequence() *GetNextConnectionSequence { - return &GetNextConnectionSequence{ - Sequence: struct{}{}, - } -} - -type GetNextChannelSequence struct { - Sequence struct{} `json:"get_next_channel_sequence"` -} - -func (x *GetNextChannelSequence) Bytes() ([]byte, error) { - return json.Marshal(x) -} - -func NewNextChannelSequence() *GetNextChannelSequence { - return &GetNextChannelSequence{ - Sequence: struct{}{}, - } -} - -type GetAllPorts struct { - AllPorts struct{} `json:"get_all_ports"` -} - -func (x *GetAllPorts) Bytes() ([]byte, error) { - return json.Marshal(x) -} - -func NewGetAllPorts() *GetAllPorts { - return &GetAllPorts{ - AllPorts: struct{}{}, - } -} - -type GetCommitmentPrefix struct { - GetCommitment struct{} `json:"get_commitment_prefix"` -} - -func (x *GetCommitmentPrefix) Bytes() ([]byte, error) { - return json.Marshal(x) -} - -func NewCommitmentPrefix() *GetCommitmentPrefix { - return &GetCommitmentPrefix{ - GetCommitment: struct{}{}, - } -} - -type GetPrevConsensusStateHeight struct { - ConsensusStateByHeight ConsensusStateByHeight `json:"get_previous_consensus_state_height"` -} - -func (x *GetPrevConsensusStateHeight) Bytes() ([]byte, error) { - return json.Marshal(x) -} - -func NewPrevConsensusStateHeight(clientId string, height uint64) *GetPrevConsensusStateHeight { - return &GetPrevConsensusStateHeight{ - ConsensusStateByHeight: ConsensusStateByHeight{ - ClientId: clientId, - Height: height, - }, - } +func NewRangeParams(ChannelId, PortId string, StartSequence, EndSequence uint64) RangeParams { + return RangeParams{ChannelId, PortId, StartSequence, EndSequence} } diff --git a/relayer/chains/wasm/utils.go b/relayer/chains/wasm/utils.go index d842b7a64..fe5bbd35d 100644 --- a/relayer/chains/wasm/utils.go +++ b/relayer/chains/wasm/utils.go @@ -5,7 +5,6 @@ import ( "encoding/hex" "encoding/json" "fmt" - "strconv" wasmtypes "github.com/CosmWasm/wasmd/x/wasm/types" "github.com/cosmos/relayer/v2/relayer/common" @@ -24,11 +23,6 @@ func getKeyLength(data string) string { return fmt.Sprintf("%x", buf) } -func byteToInt(b []byte) (int, error) { - return strconv.Atoi(string(b)) - -} - func ProcessContractResponse(p *wasmtypes.QuerySmartContractStateResponse) ([]byte, error) { var output string if err := json.Unmarshal(p.Data.Bytes(), &output); err != nil { diff --git a/relayer/chains/wasm/wasm_chain_processor.go b/relayer/chains/wasm/wasm_chain_processor.go index 0eb2712d9..f8386bae2 100644 --- a/relayer/chains/wasm/wasm_chain_processor.go +++ b/relayer/chains/wasm/wasm_chain_processor.go @@ -13,7 +13,6 @@ import ( clienttypes "github.com/cosmos/ibc-go/v7/modules/core/02-client/types" conntypes "github.com/cosmos/ibc-go/v7/modules/core/03-connection/types" chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" - "github.com/cosmos/relayer/v2/relayer/common" "github.com/cosmos/relayer/v2/relayer/processor" "github.com/cosmos/relayer/v2/relayer/provider" @@ -59,14 +58,14 @@ type WasmChainProcessor struct { verifier *Verifier - heightSnapshotChan chan struct{} + // heightSnapshotChan chan struct{} } type Verifier struct { Header *types.LightBlock } -func NewWasmChainProcessor(log *zap.Logger, provider *WasmProvider, metrics *processor.PrometheusMetrics, heightSnapshot chan struct{}) *WasmChainProcessor { +func NewWasmChainProcessor(log *zap.Logger, provider *WasmProvider, metrics *processor.PrometheusMetrics) *WasmChainProcessor { return &WasmChainProcessor{ log: log.With(zap.String("chain_name", provider.ChainName()), zap.String("chain_id", provider.ChainId())), chainProvider: provider, @@ -76,7 +75,7 @@ func NewWasmChainProcessor(log *zap.Logger, provider *WasmProvider, metrics *pro connectionClients: make(map[string]string), channelConnections: make(map[string]string), metrics: metrics, - heightSnapshotChan: heightSnapshot, + // heightSnapshotChan: heightSnapshot, } } @@ -90,7 +89,7 @@ const ( defaultMinQueryLoopDuration = 1 * time.Second defaultBalanceUpdateWaitDuration = 60 * time.Second inSyncNumBlocksThreshold = 2 - MaxBlockFetch = 100 + MaxBlockFetch = 10000 ) // latestClientState is a map of clientID to the latest clientInfo for that client. @@ -203,19 +202,19 @@ type queryCyclePersistence struct { balanceUpdateWaitDuration time.Duration } -func (ccp *WasmChainProcessor) StartFromHeight(ctx context.Context) int64 { - cfg := ccp.Provider().ProviderConfig().(*WasmProviderConfig) - if cfg.StartHeight != 0 { - return int64(cfg.StartHeight) - } - snapshotHeight, err := common.LoadSnapshotHeight(ccp.Provider().ChainId()) - if err != nil { - ccp.log.Warn("Failed to load height from snapshot", zap.Error(err)) - } else { - ccp.log.Info("Obtained start height from config", zap.Int64("height", snapshotHeight)) - } - return snapshotHeight -} +// func (ccp *WasmChainProcessor) StartFromHeight(ctx context.Context) int64 { +// cfg := ccp.Provider().ProviderConfig().(*WasmProviderConfig) +// if cfg.StartHeight != 0 { +// return int64(cfg.StartHeight) +// } +// snapshotHeight, err := common.LoadSnapshotHeight(ccp.Provider().ChainId()) +// if err != nil { +// ccp.log.Warn("Failed to load height from snapshot", zap.Error(err)) +// } else { +// ccp.log.Info("Obtained start height from config", zap.Int64("height", snapshotHeight)) +// } +// return snapshotHeight +// } // Run starts the query loop for the chain which will gather applicable ibc messages and push events out to the relevant PathProcessors. // The initialBlockHistory parameter determines how many historical blocks should be fetched and processed before continuing with current blocks. @@ -247,14 +246,14 @@ func (ccp *WasmChainProcessor) Run(ctx context.Context, initialBlockHistory uint } // this will make initial QueryLoop iteration look back initialBlockHistory blocks in history - latestQueriedBlock := ccp.StartFromHeight(ctx) - if latestQueriedBlock <= 0 || latestQueriedBlock > persistence.latestHeight { - latestQueriedBlock = persistence.latestHeight - } + // latestQueriedBlock := ccp.StartFromHeight(ctx) + // if latestQueriedBlock <= 0 || latestQueriedBlock > persistence.latestHeight { + // latestQueriedBlock = persistence.latestHeight + // } - persistence.latestQueriedBlock = int64(latestQueriedBlock) + persistence.latestQueriedBlock = int64(persistence.latestHeight) - ccp.log.Info("Start to query from height ", zap.Int64("height", latestQueriedBlock)) + ccp.log.Info("Start to query from height ", zap.Int64("height", persistence.latestQueriedBlock)) _, lightBlock, err := ccp.chainProvider.QueryLightBlock(ctx, persistence.latestQueriedBlock) if err != nil { @@ -292,8 +291,8 @@ func (ccp *WasmChainProcessor) Run(ctx context.Context, initialBlockHistory uint select { case <-ctx.Done(): return nil - case <-ccp.heightSnapshotChan: - ccp.SnapshotHeight(ccp.getHeightToSave(persistence.latestHeight)) + // case <-ccp.heightSnapshotChan: + // ccp.SnapshotHeight(ccp.getHeightToSave(persistence.latestHeight)) case <-ticker.C: ticker.Reset(persistence.minQueryLoopDuration) } @@ -405,9 +404,15 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer chainID := ccp.chainProvider.ChainId() var latestHeader provider.IBCHeader + // max number of block that can be processed at a time + concurrency := ccp.chainProvider.PCfg.Concurrency + if concurrency <= 0 || concurrency > MaxBlockFetch { + concurrency = MaxBlockFetch + } + syncUpHeight := func() int64 { - if persistence.latestHeight-persistence.latestQueriedBlock > MaxBlockFetch { - return persistence.latestQueriedBlock + MaxBlockFetch + if persistence.latestHeight-persistence.latestQueriedBlock > concurrency { + return persistence.latestQueriedBlock + concurrency } return persistence.latestHeight } @@ -519,22 +524,22 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer return nil } -func (ccp *WasmChainProcessor) getHeightToSave(height int64) int64 { - retryAfter := ccp.Provider().ProviderConfig().GetFirstRetryBlockAfter() - ht := height - int64(retryAfter) - if ht < 0 { - return 0 - } - return ht -} +// func (ccp *WasmChainProcessor) getHeightToSave(height int64) int64 { +// retryAfter := ccp.Provider().ProviderConfig().GetFirstRetryBlockAfter() +// ht := height - int64(retryAfter) +// if ht < 0 { +// return 0 +// } +// return ht +// } -func (ccp *WasmChainProcessor) SnapshotHeight(height int64) { - ccp.log.Info("Save height for snapshot", zap.Int64("height", height)) - err := common.SnapshotHeight(ccp.Provider().ChainId(), height) - if err != nil { - ccp.log.Warn("Failed saving height snapshot for height", zap.Int64("height", height)) - } -} +// func (ccp *WasmChainProcessor) SnapshotHeight(height int64) { +// ccp.log.Info("Save height for snapshot", zap.Int64("height", height)) +// err := common.SnapshotHeight(ccp.Provider().ChainId(), height) +// if err != nil { +// ccp.log.Warn("Failed saving height snapshot for height", zap.Int64("height", height)) +// } +// } func (ccp *WasmChainProcessor) CollectMetrics(ctx context.Context, persistence *queryCyclePersistence) { ccp.CurrentBlockHeight(ctx, persistence) diff --git a/relayer/channel.go b/relayer/channel.go index 184ed599c..2f3f32b64 100644 --- a/relayer/channel.go +++ b/relayer/channel.go @@ -71,8 +71,8 @@ func (c *Chain) CreateOpenChannels( return processor.NewEventProcessor(). WithChainProcessors( - c.chainProcessor(c.log, nil, nil), - dst.chainProcessor(c.log, nil, nil), + c.chainProcessor(c.log, nil), + dst.chainProcessor(c.log, nil), ). WithPathProcessors(pp). WithInitialBlockHistory(0). @@ -121,8 +121,8 @@ func (c *Chain) CloseChannel( flushProcessor := processor.NewEventProcessor(). WithChainProcessors( - c.chainProcessor(c.log, nil, nil), - dst.chainProcessor(c.log, nil, nil), + c.chainProcessor(c.log, nil), + dst.chainProcessor(c.log, nil), ). WithPathProcessors(processor.NewPathProcessor( c.log, @@ -159,8 +159,8 @@ func (c *Chain) CloseChannel( return processor.NewEventProcessor(). WithChainProcessors( - c.chainProcessor(c.log, nil, nil), - dst.chainProcessor(c.log, nil, nil), + c.chainProcessor(c.log, nil), + dst.chainProcessor(c.log, nil), ). WithPathProcessors(processor.NewPathProcessor( c.log, diff --git a/relayer/common/const.go b/relayer/common/const.go index 55dee1afc..f2923c161 100644 --- a/relayer/common/const.go +++ b/relayer/common/const.go @@ -6,16 +6,15 @@ import ( ) var ( - EventTimeoutRequest = "TimeoutRequest(bytes)" - IconModule = "icon" - WasmModule = "wasm" - ArchwayModule = "archway" - TendermintLightClient = "07-tendermint" - IconLightClient = "iconclient" - ConnectionKey = "connection" - ChannelKey = "channel" - ONE_HOUR = 60 * 60 * 1000 - NanoToMilliRatio = 1000_000 + EventTimeoutRequest = "TimeoutRequest(bytes)" + IconModule = "icon" + WasmModule = "wasm" + TendermintLightClient = "07-tendermint" + IconLightClient = "iconclient" + ConnectionKey = "connection" + ChannelKey = "channel" + ONE_HOUR = 60 * 60 * 1000 + IconStartHeightFromPreRunContext = "icon-start-height-from-prerun-context" ) var ( diff --git a/relayer/common/paginate.go b/relayer/common/paginate.go new file mode 100644 index 000000000..6b99d7058 --- /dev/null +++ b/relayer/common/paginate.go @@ -0,0 +1,41 @@ +package common + +import "fmt" + +type Paginate struct { + startSeq uint64 + endSeq uint64 + limit uint64 + page uint64 +} + +func NewPaginate(startSeq, endSeq, limit uint64) *Paginate { + return &Paginate{ + startSeq: startSeq, + endSeq: endSeq, + limit: limit, + page: 0, + } +} + +func (p *Paginate) HasNext() bool { + return p.startSeq <= p.endSeq +} + +func (p *Paginate) Next() (uint64, uint64, error) { + if !p.HasNext() { + return 0, 0, fmt.Errorf("no more pages available") + } + + start := p.startSeq + end := p.startSeq + p.limit - 1 + + if end > p.endSeq { + end = p.endSeq + } + + p.startSeq = end + 1 + p.page++ + + return start, end, nil +} diff --git a/relayer/common/utils.go b/relayer/common/utils.go index 47b4ee60c..8f2ea56af 100644 --- a/relayer/common/utils.go +++ b/relayer/common/utils.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path" + "reflect" "strconv" "strings" ) @@ -58,3 +59,19 @@ func LoadSnapshotHeight(chain_id string) (int64, error) { } return strconv.ParseInt(strings.TrimSuffix(string(content), "\n"), 10, 64) } + +func AnyToInt64(value interface{}) (int64, error) { + switch v := reflect.ValueOf(value); v.Kind() { + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + return v.Int(), nil + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + return int64(v.Uint()), nil + case reflect.Float32, reflect.Float64: + return int64(v.Float()), nil + case reflect.String: + // Parse the string as an int64 + return strconv.ParseInt(v.String(), 10, 64) + default: + return 0, fmt.Errorf("unsupported type: %v", v.Kind()) + } +} diff --git a/relayer/connection.go b/relayer/connection.go index 9c64619cc..df784d504 100644 --- a/relayer/connection.go +++ b/relayer/connection.go @@ -61,8 +61,8 @@ func (c *Chain) CreateOpenConnections( return connectionSrc, connectionDst, processor.NewEventProcessor(). WithChainProcessors( - c.chainProcessor(c.log, nil, nil), - dst.chainProcessor(c.log, nil, nil), + c.chainProcessor(c.log, nil), + dst.chainProcessor(c.log, nil), ). WithPathProcessors(pp). WithInitialBlockHistory(initialBlockHistory). diff --git a/relayer/processor/chain_processor.go b/relayer/processor/chain_processor.go index 5637a47ce..7b1ea825a 100644 --- a/relayer/processor/chain_processor.go +++ b/relayer/processor/chain_processor.go @@ -23,11 +23,11 @@ type ChainProcessor interface { // Take snapshot of height every N blocks or when the chain processor fails, so that the relayer // can restart from that height - SnapshotHeight(height int64) + // SnapshotHeight(height int64) // If the relay goes down, start chain processor from height returned by this function // CAN return max(snapshotHeight, latestHeightFromClient) - StartFromHeight(ctx context.Context) int64 + // StartFromHeight(ctx context.Context) int64 } // ChainProcessors is a slice of ChainProcessor instances. diff --git a/relayer/processor/message_processor.go b/relayer/processor/message_processor.go index 645eee8c3..aec51172e 100644 --- a/relayer/processor/message_processor.go +++ b/relayer/processor/message_processor.go @@ -23,6 +23,7 @@ type messageProcessor struct { memo string msgUpdateClient provider.RelayerMessage + msgUpdateClientHeight uint64 clientUpdateThresholdTime time.Duration pktMsgs []packetMessageToTrack @@ -110,12 +111,17 @@ func (mp *messageProcessor) shouldUpdateClientNow(ctx context.Context, src, dst if src.BTPHeightQueue.Size() == 0 { return false, nil } - btpHeightInfo, err := src.BTPHeightQueue.GetQueue() + h, err := src.BTPHeightQueue.GetQueue() if err != nil { return false, nil } - if btpHeightInfo.IsProcessing { + blockHeightInfo, err := src.BTPHeightQueue.GetHeightInfo(h) + if err != nil { + return false, err + } + + if blockHeightInfo.IsProcessing { return false, nil } return true, nil @@ -299,6 +305,7 @@ func (mp *messageProcessor) assembleMsgUpdateClient(ctx context.Context, src, ds } mp.msgUpdateClient = msgUpdateClient + mp.msgUpdateClientHeight = src.latestHeader.Height() return nil } @@ -311,7 +318,7 @@ func (mp *messageProcessor) handleMsgUpdateClientForBTPClient(ctx context.Contex if src.BTPHeightQueue.Size() == 0 { return nil } - btpHeightInfo, err := src.BTPHeightQueue.GetQueue() + nextBtpHeight, err := src.BTPHeightQueue.GetQueue() if err != nil { return nil } @@ -320,9 +327,9 @@ func (mp *messageProcessor) handleMsgUpdateClientForBTPClient(ctx context.Contex return nil } - header, err := src.chainProvider.QueryIBCHeader(ctx, btpHeightInfo.Height) + header, err := src.chainProvider.QueryIBCHeader(ctx, int64(nextBtpHeight)) if err != nil { - return fmt.Errorf("Failed to query header for height %d", btpHeightInfo.Height) + return fmt.Errorf("Failed to query header for height %d", nextBtpHeight) } if !header.IsCompleteBlock() { @@ -361,6 +368,7 @@ func (mp *messageProcessor) handleMsgUpdateClientForBTPClient(ctx context.Contex } mp.msgUpdateClient = msgUpdateClient + mp.msgUpdateClientHeight = header.Height() return nil } @@ -416,18 +424,25 @@ func (mp *messageProcessor) sendClientUpdate( dst.log.Debug("Will relay client update") if IsBTPLightClient(dst.clientState) { - blockInfoHeight, err := src.BTPHeightQueue.GetQueue() + h, err := src.BTPHeightQueue.GetQueue() + if err != nil { + mp.log.Debug("No message in the queue", zap.Error(err)) + return + } + blockHeightInfo, err := src.BTPHeightQueue.GetHeightInfo(h) if err != nil { mp.log.Debug("No message in the queue", zap.Error(err)) return } + if blockHeightInfo.IsProcessing { + return + } dst.lastClientUpdateHeightMu.Lock() - dst.lastClientUpdateHeight = uint64(blockInfoHeight.Height) + dst.lastClientUpdateHeight = h dst.lastClientUpdateHeightMu.Unlock() - src.BTPHeightQueue.ReplaceQueue(zeroIndex, BlockInfoHeight{ - Height: int64(blockInfoHeight.Height), + src.BTPHeightQueue.ReplaceQueue(h, BlockInfoHeight{ IsProcessing: true, - RetryCount: blockInfoHeight.RetryCount + 1, + RetryCount: blockHeightInfo.RetryCount, }) } else { @@ -446,34 +461,39 @@ func (mp *messageProcessor) sendClientUpdate( zap.Error(err)) } if IsBTPLightClient(dst.clientState) { - if src.BTPHeightQueue.Size() == 0 { - return - } - blockHeightInfo, err := src.BTPHeightQueue.GetQueue() + h, err := src.BTPHeightQueue.GetQueue() if err != nil { return } if rtr != nil && rtr.Code == 0 { - if blockHeightInfo.Height == int64(dst.lastClientUpdateHeight) { + if h == dst.lastClientUpdateHeight { src.BTPHeightQueue.Dequeue() } return } // this would represent a failure case in that case isProcessing should be false - if blockHeightInfo.Height == int64(dst.lastClientUpdateHeight) { - if blockHeightInfo.RetryCount >= 5 { - // removing btpBLock update - src.BTPHeightQueue.Dequeue() + // replacing the attempt only if the attempts fails + if h == dst.lastClientUpdateHeight { + heightInfo, err := src.BTPHeightQueue.GetHeightInfo(h) + if err != nil { + mp.log.Error("Error occured when getting Btp block height Info", zap.Error(err)) return } - src.BTPHeightQueue.ReplaceQueue(zeroIndex, BlockInfoHeight{ - Height: int64(dst.lastClientUpdateHeight), + src.BTPHeightQueue.ReplaceQueue(h, BlockInfoHeight{ IsProcessing: false, - RetryCount: blockHeightInfo.RetryCount + 1, + RetryCount: heightInfo.RetryCount + 1, }) + if heightInfo.RetryCount >= 5 { + // throwing error if the Retry count is greater than 5 + // not processing forward until height x btp block is updated + mp.log.Error("Unable to update Btp Client even after attempt:", + zap.String("ClientId", dst.clientState.ClientID), zap.Error(err)) + return + } + } } } diff --git a/relayer/processor/path_end_runtime.go b/relayer/processor/path_end_runtime.go index aa48f414e..3c12d416d 100644 --- a/relayer/processor/path_end_runtime.go +++ b/relayer/processor/path_end_runtime.go @@ -55,7 +55,7 @@ type pathEndRuntime struct { lastClientUpdateHeightMu sync.Mutex metrics *PrometheusMetrics - BTPHeightQueue Queue[BlockInfoHeight] + BTPHeightQueue *BtpHeightMapQueue } func newPathEndRuntime(log *zap.Logger, pathEnd PathEnd, metrics *PrometheusMetrics) *pathEndRuntime { @@ -77,7 +77,7 @@ func newPathEndRuntime(log *zap.Logger, pathEnd PathEnd, metrics *PrometheusMetr clientICQProcessing: make(clientICQProcessingCache), connSubscribers: make(map[string][]func(provider.ConnectionInfo)), metrics: metrics, - BTPHeightQueue: NewBlockInfoHeightQueue[BlockInfoHeight](), + BTPHeightQueue: NewBtpHeightMapQueue(), } } @@ -434,7 +434,7 @@ func (pathEnd *pathEndRuntime) updateBTPQueue(d ChainProcessorCacheData, counter return } - btpHeightKey := BlockInfoHeight{Height: int64(d.LatestHeader.Height()), IsProcessing: false} + btpHeightKey := d.LatestBlock.Height if d.LatestHeader.ShouldUpdateForProofContextChange() { pathEnd.BTPHeightQueue.Enqueue(btpHeightKey) return @@ -544,7 +544,7 @@ func (pathEnd *pathEndRuntime) shouldSendPacketMessage(message packetIBCMessage, ) return false } - if counterparty.BTPHeightQueue.ItemExist(int64(message.info.Height)) { + if counterparty.BTPHeightQueue.ItemExist(uint64(message.info.Height)) { pathEnd.log.Debug("Waiting to relay packet message until clientState is in queue since btp height exist", zap.Inline(message), zap.String("event_type", eventType), diff --git a/relayer/processor/path_processor.go b/relayer/processor/path_processor.go index b14fbc1a0..768418c89 100644 --- a/relayer/processor/path_processor.go +++ b/relayer/processor/path_processor.go @@ -49,6 +49,9 @@ const ( // made to retrieve the client consensus state in order to assemble a // MsgUpdateClient message. clientConsensusHeightUpdateThresholdBlocks = 2 + + // Needed for finding trusting block one week + defaultTrustingPeriod = 7 * 24 * time.Hour ) // PathProcessor is a process that handles incoming IBC messages from a pair of chains. @@ -276,7 +279,8 @@ func (pp *PathProcessor) HandleNewData(chainID string, cacheData ChainProcessorC func (pp *PathProcessor) handleFlush(ctx context.Context) { flushTimer := pp.flushInterval - if err := pp.flush(ctx); err != nil { + + if err := pp.flushByCase(ctx); err != nil { pp.log.Warn("Flush not complete", zap.Error(err)) flushTimer = flushFailureRetry } @@ -310,7 +314,7 @@ func (pp *PathProcessor) processAvailableSignals(ctx context.Context, cancel fun // No new data to merge in, just retry handling. case <-pp.flushTimer.C: // Periodic flush to clear out any old packets - // pp.handleFlush(ctx) + pp.handleFlush(ctx) } return false } @@ -319,7 +323,7 @@ func (pp *PathProcessor) processAvailableSignals(ctx context.Context, cancel fun func (pp *PathProcessor) Run(ctx context.Context, cancel func()) { var retryTimer *time.Timer - pp.flushTimer = time.NewTimer(time.Hour) + pp.flushTimer = time.NewTimer(pp.flushInterval) for { // block until we have any signals to process @@ -339,7 +343,7 @@ func (pp *PathProcessor) Run(ctx context.Context, cancel func()) { } if pp.shouldFlush() && !pp.initialFlushComplete { - // pp.handleFlush(ctx) + pp.handleFlush(ctx) pp.initialFlushComplete = true } else if pp.shouldTerminateForFlushComplete() { cancel() diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index 8dd0c7f2f..8f51f15e4 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -7,6 +7,7 @@ import ( "fmt" "sort" "sync" + "time" clienttypes "github.com/cosmos/ibc-go/v7/modules/core/02-client/types" conntypes "github.com/cosmos/ibc-go/v7/modules/core/03-connection/types" @@ -1569,19 +1570,19 @@ func (pp *PathProcessor) UpdateBTPHeight(ctx context.Context, src *pathEndRuntim } size := src.BTPHeightQueue.Size() for i := 0; i < size; i++ { - btpHeightInfo, err := src.BTPHeightQueue.GetQueue() + btpHeight, err := src.BTPHeightQueue.GetQueue() if err != nil { continue } - if dst.clientState.ConsensusHeight.RevisionHeight < uint64(btpHeightInfo.Height) { + if dst.clientState.ConsensusHeight.RevisionHeight < btpHeight { break } - if dst.clientState.ConsensusHeight.RevisionHeight == uint64(btpHeightInfo.Height) { + if dst.clientState.ConsensusHeight.RevisionHeight == btpHeight { src.BTPHeightQueue.Dequeue() continue } - if dst.clientState.ConsensusHeight.RevisionHeight > uint64(btpHeightInfo.Height) { - cs, err := dst.chainProvider.QueryClientConsensusState(ctx, int64(dst.latestBlock.Height), dst.clientState.ClientID, clienttypes.NewHeight(0, uint64(btpHeightInfo.Height))) + if dst.clientState.ConsensusHeight.RevisionHeight > btpHeight { + cs, err := dst.chainProvider.QueryClientConsensusState(ctx, int64(dst.latestBlock.Height), dst.clientState.ClientID, clienttypes.NewHeight(0, uint64(btpHeight))) if err == nil && cs != nil { // removing latest height element src.BTPHeightQueue.Dequeue() @@ -1589,3 +1590,390 @@ func (pp *PathProcessor) UpdateBTPHeight(ctx context.Context, src *pathEndRuntim } } } + +func (pp *PathProcessor) flushByCase(ctx context.Context) error { + mod1 := pp.pathEnd1.chainProvider.Type() + mod2 := pp.pathEnd2.chainProvider.Type() + if mod1 == common.IconModule && mod2 == common.WasmModule || mod1 == common.WasmModule && mod2 == common.WasmModule { + return pp.ibcContractBasedFlush(ctx) + } + return pp.flush(ctx) + +} + +// both source and dst chain is ibc implemented contracts +func (pp *PathProcessor) ibcContractBasedFlush(ctx context.Context) error { + + var ( + channelPacketheights1 = make(map[ChannelKey]provider.MessageHeightsInfo) + channelPacketheights2 = make(map[ChannelKey]provider.MessageHeightsInfo) + channelPacketheights1Mu, channelPacketheights2Mu sync.Mutex + + pathEnd1Cache = NewIBCMessagesCache() + pathEnd2Cache = NewIBCMessagesCache() + pathEnd1CacheMu, pathEnd2CacheMu sync.Mutex + ) + + // Query remaining packet commitments on both chains + var eg errgroup.Group + for k, open := range pp.pathEnd1.channelStateCache { + if !open { + continue + } + if !pp.pathEnd1.info.ShouldRelayChannel(ChainChannelKey{ + ChainID: pp.pathEnd1.info.ChainID, + CounterpartyChainID: pp.pathEnd2.info.ChainID, + ChannelKey: k, + }) { + continue + } + eg.Go(QueryPacketHeights(ctx, pp.pathEnd1, pp.pathEnd2, k, channelPacketheights1, &channelPacketheights1Mu)) + } + for k, open := range pp.pathEnd2.channelStateCache { + if !open { + continue + } + if !pp.pathEnd2.info.ShouldRelayChannel(ChainChannelKey{ + ChainID: pp.pathEnd2.info.ChainID, + CounterpartyChainID: pp.pathEnd1.info.ChainID, + ChannelKey: k, + }) { + continue + } + eg.Go(QueryPacketHeights(ctx, pp.pathEnd2, pp.pathEnd1, k, channelPacketheights2, &channelPacketheights2Mu)) + } + + if err := eg.Wait(); err != nil { + return fmt.Errorf("failed to query packet commitments: %w", err) + } + + skipped := false + for k, seqs := range channelPacketheights1 { + k := k + seqs := seqs + eg.Go(func() error { + done, err := pp.queuePendingRecvAndAcksByHeights(ctx, pp.pathEnd1, pp.pathEnd2, k, seqs, pathEnd1Cache.PacketFlow, pathEnd2Cache.PacketFlow, &pathEnd1CacheMu, &pathEnd2CacheMu) + if err != nil { + return err + } + if !done { + skipped = true + } + return nil + }) + } + + for k, seqs := range channelPacketheights2 { + k := k + seqs := seqs + eg.Go(func() error { + done, err := pp.queuePendingRecvAndAcksByHeights(ctx, pp.pathEnd2, pp.pathEnd1, k, seqs, pathEnd2Cache.PacketFlow, pathEnd1Cache.PacketFlow, &pathEnd2CacheMu, &pathEnd1CacheMu) + if err != nil { + return err + } + if !done { + skipped = true + } + return nil + }) + } + + if err := eg.Wait(); err != nil { + return fmt.Errorf("failed to enqueue pending messages for flush: %w", err) + } + + pp.pathEnd1.mergeMessageCache(pathEnd1Cache, pp.pathEnd2.info.ChainID, pp.pathEnd2.inSync) + pp.pathEnd2.mergeMessageCache(pathEnd2Cache, pp.pathEnd1.info.ChainID, pp.pathEnd1.inSync) + + if skipped { + return fmt.Errorf("flush was successful, but more packet sequences are still pending") + } + + return nil +} + +func QueryPacketHeights( + ctx context.Context, + src *pathEndRuntime, + dst *pathEndRuntime, + k ChannelKey, + packetHeights map[ChannelKey]provider.MessageHeightsInfo, + mu sync.Locker, +) func() error { + return func() error { + src.log.Debug("Flushing", zap.String("channel", k.ChannelID), zap.String("port", k.PortID)) + + endSeq, err := src.chainProvider.QueryNextSeqSend(ctx, int64(src.latestBlock.Height), k.ChannelID, k.PortID) + if err != nil { + return err + } + + startSeq := uint64(0) + if src.chainProvider.ProviderConfig().GetBlockInterval() > 0 { + // blockInterval from provider config will be in milliseconds + blockInterval := int64(src.chainProvider.ProviderConfig().GetBlockInterval()) * int64(time.Millisecond) + startBlock := int64(src.latestBlock.Height) - defaultTrustingPeriod.Nanoseconds()/blockInterval + if startBlock > 0 { + startSeq, err = src.chainProvider.QueryNextSeqSend(ctx, startBlock, k.ChannelID, k.PortID) + if err != nil { + // don't care about error here, + // if error start from startSeq 0 + } + } + + } + + c, err := src.chainProvider.QueryPacketHeights(ctx, int64(src.latestBlock.Height), k.ChannelID, k.PortID, startSeq, endSeq) + if err != nil { + return err + } + mu.Lock() + defer mu.Unlock() + packetHeights[k] = provider.MessageHeightsInfo{ + MessageHeights: c, + StartSeq: startSeq, + EndSeq: endSeq, + } + return nil + } +} + +// queuePendingRecvAndAcks returns whether flush can be considered complete (none skipped). +func (pp *PathProcessor) queuePendingRecvAndAcksByHeights( + ctx context.Context, + src, dst *pathEndRuntime, + k ChannelKey, + packetHeights provider.MessageHeightsInfo, + srcCache ChannelPacketMessagesCache, + dstCache ChannelPacketMessagesCache, + srcMu sync.Locker, + dstMu sync.Locker, +) (bool, error) { + + if len(packetHeights.MessageHeights) == 0 { + src.log.Debug("Nothing to flush", zap.String("channel", k.ChannelID), zap.String("port", k.PortID)) + return true, nil + } + + dstChan, dstPort := k.CounterpartyChannelID, k.CounterpartyPortID + + unrecv, err := dst.chainProvider.QueryMissingPacketReceipts(ctx, int64(dst.latestBlock.Height), dstChan, dstPort, packetHeights.StartSeq, packetHeights.EndSeq) + if err != nil { + return false, err + } + + dstHeight := int64(dst.latestBlock.Height) + + if len(unrecv) > 0 { + channel, err := dst.chainProvider.QueryChannel(ctx, dstHeight, dstChan, dstPort) + if err != nil { + return false, err + } + + if channel.Channel.Ordering == chantypes.ORDERED { + nextSeqRecv, err := dst.chainProvider.QueryNextSeqRecv(ctx, dstHeight, dstChan, dstPort) + if err != nil { + return false, err + } + + var newUnrecv []uint64 + + for _, seq := range unrecv { + if seq >= nextSeqRecv.NextSequenceReceive { + newUnrecv = append(newUnrecv, seq) + } + } + + unrecv = newUnrecv + + sort.SliceStable(unrecv, func(i, j int) bool { + return unrecv[i] < unrecv[j] + }) + } + } + + var eg errgroup.Group + + skipped := false + + for i, seq := range unrecv { + // seq could be only queried if in packetheights + seqHeight, ok := packetHeights.MessageHeights[seq] + if !ok { + continue + } + + srcMu.Lock() + if srcCache.IsCached(chantypes.EventTypeSendPacket, k, seq) { + continue // already cached + } + srcMu.Unlock() + + if i >= maxPacketsPerFlush { + skipped = true + break + } + + src.log.Debug("Querying send packet", + zap.String("channel", k.ChannelID), + zap.String("port", k.PortID), + zap.Any("Seq", seq), + ) + + seq := seq + + eg.Go(func() error { + sendPacket, err := src.chainProvider.QueryPacketMessageByEventHeight(ctx, chantypes.EventTypeSendPacket, k.ChannelID, k.PortID, seq, seqHeight) + if err != nil { + return err + } + + // save btp block if height is Icon + // height+1 + if src.chainProvider.Type() == common.IconModule { + _, err := dst.chainProvider.QueryClientConsensusState(ctx, int64(dst.latestBlock.Height), dst.clientState.ClientID, clienttypes.NewHeight(0, sendPacket.Height)) + if err != nil { + // could mean update client for the message is not present + header, err := src.chainProvider.QueryIBCHeader(ctx, int64(sendPacket.Height)) + if err != nil { + return err + } + if !header.IsCompleteBlock() { + return fmt.Errorf("icon module should have complete block at height: %d because of send packet message: %v", sendPacket.Height, sendPacket) + } + // enqueuing this height if it doesn't exist + if !src.BTPHeightQueue.ItemExist(sendPacket.Height) { + src.BTPHeightQueue.Enqueue(sendPacket.Height) + } + } + } + + srcMu.Lock() + srcCache.Cache(chantypes.EventTypeSendPacket, k, seq, sendPacket) + srcMu.Unlock() + + src.log.Debug("Cached send packet", + zap.String("channel", k.ChannelID), + zap.String("port", k.PortID), + zap.String("ctrpty_channel", k.CounterpartyChannelID), + zap.String("ctrpty_port", k.CounterpartyPortID), + zap.Uint64("sequence", seq), + ) + return nil + }) + // not trying to enqueue btpBlock height if height is missing + } + + if err := eg.Wait(); err != nil { + return false, err + } + + if len(unrecv) > 0 { + src.log.Debug("Will flush MsgRecvPacket", + zap.String("channel", k.ChannelID), + zap.String("port", k.PortID), + zap.Any("PacketHeights", packetHeights), + ) + } else { + src.log.Debug("No MsgRecvPacket to flush", + zap.String("channel", k.ChannelID), + zap.String("port", k.PortID), + ) + } + + var unacked []uint64 + + ackHeights, err := dst.chainProvider.QueryAckHeights(ctx, int64(dst.latestBlock.Height), dstChan, dstPort, packetHeights.StartSeq, packetHeights.EndSeq) + if err != nil { + return false, err + } + + counter := 0 + for seq, height := range ackHeights { + + // Is packetHeights is present then Ack not received + // not present means ack already received + _, ok := packetHeights.MessageHeights[seq] + if !ok { + continue + } + + dstMu.Lock() + ck := k.Counterparty() + if dstCache.IsCached(chantypes.EventTypeWriteAck, ck, seq) { + continue // already cached + } + dstMu.Unlock() + + if counter >= maxPacketsPerFlush { + skipped = true + break + } + + dst.log.Debug("Querying write Ack", + zap.String("channel", k.CounterpartyChannelID), + zap.String("port", k.CounterpartyPortID), + zap.Uint64("sequence", seq), + zap.Uint64("height", height), + ) + + seq := seq + height := height + eg.Go(func() error { + ackPacket, err := dst.chainProvider.QueryPacketMessageByEventHeight(ctx, chantypes.EventTypeWriteAck, k.CounterpartyChannelID, k.CounterpartyPortID, seq, height) + if err != nil { + return err + } + + // save btp block if height is Icon + // height+1 + if dst.chainProvider.Type() == common.IconModule { + _, err := src.chainProvider.QueryClientConsensusState(ctx, int64(dst.latestBlock.Height), src.clientState.ClientID, clienttypes.NewHeight(0, ackPacket.Height)) + if err != nil { + // could mean update client for the message is not present + header, err := dst.chainProvider.QueryIBCHeader(ctx, int64(ackPacket.Height)) + if err != nil { + return err + } + if !header.IsCompleteBlock() { + return fmt.Errorf("icon module should have complete block at height:%d because of send packet message: %v", + ackPacket.Height, ackPacket) + } + + // enqueuing this height + if !dst.BTPHeightQueue.ItemExist(ackPacket.Height) { + dst.BTPHeightQueue.Enqueue(ackPacket.Height) + } + } + } + + ck := k.Counterparty() + dstMu.Lock() + dstCache.Cache(chantypes.EventTypeWriteAck, ck, seq, ackPacket) + dstMu.Unlock() + + return nil + }) + counter++ + } + + if err := eg.Wait(); err != nil { + return false, err + } + + if len(unacked) > 0 { + dst.log.Debug( + "Will flush MsgAcknowledgement", + zap.Object("channel", k), + zap.Uint64s("sequences", unacked), + ) + } else { + dst.log.Debug( + "No MsgAcknowledgement to flush", + zap.String("channel", k.CounterpartyChannelID), + zap.String("port", k.CounterpartyPortID), + ) + } + + return !skipped, nil +} diff --git a/relayer/processor/types.go b/relayer/processor/types.go index f32a636d3..985caf1b9 100644 --- a/relayer/processor/types.go +++ b/relayer/processor/types.go @@ -598,105 +598,93 @@ func ConnectionInfoConnectionKey(info provider.ConnectionInfo) ConnectionKey { } } -type Queue[T any] interface { - Enqueue(item T) - Dequeue() (T, error) - MustGetQueue() T - GetQueue() (T, error) - ItemExist(interface{}) bool - ReplaceQueue(index int, item T) - Size() int -} - type ExistenceChecker interface { Exists(target interface{}) bool } type BlockInfoHeight struct { - Height int64 IsProcessing bool RetryCount int64 } -func (bi BlockInfoHeight) Exists(target interface{}) bool { - if height, ok := target.(int64); ok { - return bi.Height == height - } - return false +type BtpHeightMapQueue struct { + HeightMap map[uint64]BlockInfoHeight + HeightQueue []uint64 + mu *sync.Mutex } -type ArrayQueue[T ExistenceChecker] struct { - items []T - mu *sync.Mutex +func NewBtpHeightMapQueue() *BtpHeightMapQueue { + return &BtpHeightMapQueue{ + HeightMap: make(map[uint64]BlockInfoHeight, 0), + HeightQueue: make([]uint64, 0), + mu: &sync.Mutex{}, + } } -func (q *ArrayQueue[T]) Enqueue(item T) { - q.mu.Lock() - defer q.mu.Unlock() - q.items = append(q.items, item) -} +func (m *BtpHeightMapQueue) Enqueue(height uint64) { + m.mu.Lock() + defer m.mu.Unlock() + m.HeightQueue = append(m.HeightQueue, height) + m.HeightMap[height] = BlockInfoHeight{ + IsProcessing: false, + RetryCount: 0, + } -func (q *ArrayQueue[T]) MustGetQueue() T { +} +func (q *BtpHeightMapQueue) Dequeue() (BlockInfoHeight, error) { q.mu.Lock() defer q.mu.Unlock() if q.Size() == 0 { - panic("the size of queue is zero") + return BlockInfoHeight{}, fmt.Errorf("all element dequed") } + h := q.HeightQueue[0] - item := q.items[0] - return item -} + blockInfo := q.HeightMap[h] + delete(q.HeightMap, h) + q.HeightQueue = q.HeightQueue[1:] + return blockInfo, nil -func (q *ArrayQueue[T]) ItemExist(target interface{}) bool { - q.mu.Lock() - defer q.mu.Unlock() - for _, item := range q.items { - if item.Exists(target) { - return true - } - } - return false } - -func (q *ArrayQueue[T]) GetQueue() (T, error) { - q.mu.Lock() - defer q.mu.Unlock() - if q.Size() == 0 { - var element T - return element, fmt.Errorf("The queue is of empty length") +func (m *BtpHeightMapQueue) MustGetQueue() uint64 { + if m.Size() == 0 { + panic("the size of queue is zero") } - item := q.items[0] - return item, nil - + return m.HeightQueue[0] } - -func (q *ArrayQueue[T]) ReplaceQueue(index int, element T) { - q.mu.Lock() - defer q.mu.Unlock() - if index >= 0 && index < len(q.items) { - q.items[index] = element +func (m *BtpHeightMapQueue) GetQueue() (uint64, error) { + if m.Size() == 0 { + return 0, fmt.Errorf("The queue is of empty length") } + h := m.HeightQueue[0] + _, ok := m.HeightMap[h] + if !ok { + return 0, fmt.Errorf("Btp data of height %d missing from map", h) + } + return h, nil } -func (q *ArrayQueue[T]) Dequeue() (T, error) { - q.mu.Lock() - defer q.mu.Unlock() - if q.Size() == 0 { - var element T - return element, fmt.Errorf("all element dequed") +func (m *BtpHeightMapQueue) GetHeightInfo(h uint64) (BlockInfoHeight, error) { + b, ok := m.HeightMap[h] + if !ok { + return BlockInfoHeight{}, fmt.Errorf("Height Info not found %d", h) } - item := q.items[0] - q.items = q.items[1:] - return item, nil + return b, nil } -func (q *ArrayQueue[T]) Size() int { - return len(q.items) -} +func (m *BtpHeightMapQueue) ItemExist(height uint64) bool { + _, ok := m.HeightMap[height] + return ok -func NewBlockInfoHeightQueue[T ExistenceChecker]() *ArrayQueue[T] { - return &ArrayQueue[T]{ - items: make([]T, 0), - mu: &sync.Mutex{}, +} +func (m *BtpHeightMapQueue) ReplaceQueue(height uint64, b BlockInfoHeight) error { + m.mu.Lock() + defer m.mu.Unlock() + if _, ok := m.HeightMap[height]; !ok { + return fmt.Errorf("Height Info not found %d", height) } + m.HeightMap[height] = b + return nil +} +func (m *BtpHeightMapQueue) Size() int { + return len(m.HeightQueue) } diff --git a/relayer/processor/types_test.go b/relayer/processor/types_test.go index e6ea4a4ee..363230a3b 100644 --- a/relayer/processor/types_test.go +++ b/relayer/processor/types_test.go @@ -5,6 +5,7 @@ import ( ibcexported "github.com/cosmos/ibc-go/v7/modules/core/exported" "github.com/cosmos/relayer/v2/relayer/processor" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -43,3 +44,30 @@ func TestIBCHeaderCachePrune(t *testing.T) { require.Len(t, cache, 5) require.NotNil(t, cache[uint64(15)], cache[uint64(16)], cache[uint64(17)], cache[uint64(18)], cache[uint64(19)]) } + +func TestBtpQueue(t *testing.T) { + + q := processor.NewBtpHeightMapQueue() + + q.Enqueue(20) + q.Enqueue(30) + q.Enqueue(40) + + assert.Equal(t, q.Size(), 3) + + q.Dequeue() + assert.Equal(t, q.Size(), 2) + + // testing getQueue + h := uint64(40) + hInfo, err := q.GetHeightInfo(h) + assert.NoError(t, err) + assert.Equal(t, processor.BlockInfoHeight{IsProcessing: false, RetryCount: 0}, hInfo) + + replace := processor.BlockInfoHeight{IsProcessing: true, RetryCount: 2} + q.ReplaceQueue(h, replace) + hInfo, err = q.GetHeightInfo(h) + assert.NoError(t, err) + assert.Equal(t, replace, hInfo) + +} diff --git a/relayer/provider/provider.go b/relayer/provider/provider.go index 7cad0cb44..3bbd3eccb 100644 --- a/relayer/provider/provider.go +++ b/relayer/provider/provider.go @@ -217,6 +217,14 @@ func (r RelayerTxResponse) MarshalLogObject(enc zapcore.ObjectEncoder) error { return nil } +type MessageHeights map[uint64]uint64 + +type MessageHeightsInfo struct { + MessageHeights MessageHeights + StartSeq uint64 + EndSeq uint64 +} + type KeyProvider interface { CreateKeystore(path string) error KeystoreCreated(path string) bool @@ -429,7 +437,7 @@ type QueryProvider interface { // query packet info for sequence QuerySendPacket(ctx context.Context, srcChanID, srcPortID string, sequence uint64) (PacketInfo, error) QueryRecvPacket(ctx context.Context, dstChanID, dstPortID string, sequence uint64) (PacketInfo, error) - + QueryPacketMessageByEventHeight(ctx context.Context, eventType string, srcChanID, srcPortID string, sequence uint64, height uint64) (PacketInfo, error) // bank QueryBalance(ctx context.Context, keyName string) (sdk.Coins, error) QueryBalanceWithAddress(ctx context.Context, addr string) (sdk.Coins, error) @@ -467,6 +475,10 @@ type QueryProvider interface { QueryPacketCommitment(ctx context.Context, height int64, channelid, portid string, seq uint64) (comRes *chantypes.QueryPacketCommitmentResponse, err error) QueryPacketAcknowledgement(ctx context.Context, height int64, channelid, portid string, seq uint64) (ackRes *chantypes.QueryPacketAcknowledgementResponse, err error) QueryPacketReceipt(ctx context.Context, height int64, channelid, portid string, seq uint64) (recRes *chantypes.QueryPacketReceiptResponse, err error) + QueryPacketHeights(ctx context.Context, latestHeight int64, channelId, portId string, startSeq, endSeq uint64) (packetHeights MessageHeights, err error) + QueryAckHeights(ctx context.Context, latestHeight int64, channelId, portId string, startSeq, endSeq uint64) (packetHeights MessageHeights, err error) + QueryMissingPacketReceipts(ctx context.Context, latestHeight int64, channelId, portId string, startSeq, endSeq uint64) (missingReceipts []uint64, err error) + QueryNextSeqSend(ctx context.Context, height int64, channelid, portid string) (seq uint64, err error) // ics 20 - transfer QueryDenomTrace(ctx context.Context, denom string) (*transfertypes.DenomTrace, error) diff --git a/relayer/strategies.go b/relayer/strategies.go index 710589605..06d0e3a8e 100644 --- a/relayer/strategies.go +++ b/relayer/strategies.go @@ -9,12 +9,16 @@ import ( "time" "github.com/avast/retry-go/v4" + clienttypes "github.com/cosmos/ibc-go/v7/modules/core/02-client/types" "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" + "github.com/cosmos/ibc-go/v7/modules/core/exported" "github.com/cosmos/relayer/v2/relayer/chains/cosmos" "github.com/cosmos/relayer/v2/relayer/chains/icon" penumbraprocessor "github.com/cosmos/relayer/v2/relayer/chains/penumbra" "github.com/cosmos/relayer/v2/relayer/chains/wasm" + "github.com/cosmos/relayer/v2/relayer/common" "github.com/cosmos/relayer/v2/relayer/processor" + "github.com/cosmos/relayer/v2/relayer/provider" "go.uber.org/zap" ) @@ -33,26 +37,26 @@ const ( TwoMB = 2 * 1024 * 1024 ) -func timerChannel(ctx context.Context, log *zap.Logger, timerChan map[string]chan struct{}, chains map[string]*Chain) { - ticker := time.NewTicker(time.Hour) - defer ticker.Stop() - for { - NamedLoop: - select { - case <-ticker.C: - for _, c := range chains { - _, err := c.ChainProvider.QueryLatestHeight(ctx) - if err != nil { - log.Warn("Failed getting status of chain", zap.String("chain_id", c.ChainID()), zap.Error(err)) - break NamedLoop - } - } - for _, c := range timerChan { - c <- struct{}{} - } - } - } -} +// func timerChannel(ctx context.Context, log *zap.Logger, timerChan map[string]chan struct{}, chains map[string]*Chain) { +// ticker := time.NewTicker(time.Hour) +// defer ticker.Stop() +// for { +// NamedLoop: +// select { +// case <-ticker.C: +// for _, c := range chains { +// _, err := c.ChainProvider.QueryLatestHeight(ctx) +// if err != nil { +// log.Warn("Failed getting status of chain", zap.String("chain_id", c.ChainID()), zap.Error(err)) +// break NamedLoop +// } +// } +// for _, c := range timerChan { +// c <- struct{}{} +// } +// } +// } +// } // StartRelayer starts the main relaying loop and returns a channel that will contain any control-flow related errors. func StartRelayer( @@ -70,20 +74,20 @@ func StartRelayer( metrics *processor.PrometheusMetrics, ) chan error { errorChan := make(chan error, 1) - chans := make(map[string]chan struct{}) + // chans := make(map[string]chan struct{}) - for k := range chains { - chans[k] = make(chan struct{}) - } + // for k := range chains { + // chans[k] = make(chan struct{}) + // } - go timerChannel(ctx, log, chans, chains) + // go timerChannel(ctx, log, chans, chains) switch processorType { case ProcessorEvents: chainProcessors := make([]processor.ChainProcessor, 0, len(chains)) - for name, chain := range chains { - chainProcessors = append(chainProcessors, chain.chainProcessor(log, metrics, chans[name])) + for _, chain := range chains { + chainProcessors = append(chainProcessors, chain.chainProcessor(log, metrics)) } ePaths := make([]path, len(paths)) @@ -144,7 +148,7 @@ type path struct { } // chainProcessor returns the corresponding ChainProcessor implementation instance for a pathChain. -func (chain *Chain) chainProcessor(log *zap.Logger, metrics *processor.PrometheusMetrics, timerChan chan struct{}) processor.ChainProcessor { +func (chain *Chain) chainProcessor(log *zap.Logger, metrics *processor.PrometheusMetrics) processor.ChainProcessor { // Handle new ChainProcessor implementations as cases here switch p := chain.ChainProvider.(type) { case *penumbraprocessor.PenumbraProvider: @@ -152,9 +156,9 @@ func (chain *Chain) chainProcessor(log *zap.Logger, metrics *processor.Prometheu case *cosmos.CosmosProvider: return cosmos.NewCosmosChainProcessor(log, p, metrics) case *icon.IconProvider: - return icon.NewIconChainProcessor(log, p, metrics, timerChan) + return icon.NewIconChainProcessor(log, p, metrics) case *wasm.WasmProvider: - return wasm.NewWasmChainProcessor(log, p, metrics, timerChan) + return wasm.NewWasmChainProcessor(log, p, metrics) default: panic(fmt.Errorf("unsupported chain provider type: %T", chain.ChainProvider)) } @@ -552,3 +556,180 @@ func relayUnrelayedAcks(ctx context.Context, log *zap.Logger, src, dst *Chain, m return true } + +type SrcProviderClientState struct { + SrcChainProvider provider.ChainProvider + ClientState exported.ClientState + ClientId string +} + +func RunProofContextUpdate(ctx context.Context, log *zap.Logger, chains map[string]*Chain, paths []NamedPath, fromHeight int64) (uint64, error) { + + log.Info("running iconchain proof context change update") + + // finding the height to start from + srcProviderClientStates := make([]SrcProviderClientState, 0) + +pathloop: + for _, p := range paths { + + if strings.Contains(p.Path.Src.ClientID, common.IconLightClient) { + // src should be iconchain + chainId := p.Path.Src.ChainID + clientId := p.Path.Src.ClientID + for _, chain := range chains { + if chain.ChainID() == chainId { + // getting clientState + cs, err := chain.ChainProvider.QueryClientState(ctx, 0, clientId) + if err != nil { + log.Debug("error occured when fetching client state", + zap.String("chainid ", chainId), + zap.String("clientid", clientId)) + continue + } + srcProviderClientStates = append(srcProviderClientStates, SrcProviderClientState{ + SrcChainProvider: chain.ChainProvider, + ClientState: cs, + ClientId: clientId, + }) + continue pathloop + } + } + } + + if strings.Contains(p.Path.Dst.ClientID, common.IconLightClient) { + // src should be iconchain + chainId := p.Path.Dst.ChainID + clientId := p.Path.Dst.ClientID + for _, chain := range chains { + if chain.ChainID() == chainId { + // getting clientState + // all the chain 0 should return current height clientState + cs, err := chain.ChainProvider.QueryClientState(ctx, 0, clientId) + if err != nil { + log.Debug("error occured when fetching client state", + zap.String("chainid ", chainId), + zap.String("clientid", clientId)) + continue pathloop + } + srcProviderClientStates = append(srcProviderClientStates, SrcProviderClientState{ + SrcChainProvider: chain.ChainProvider, + ClientState: cs, + ClientId: clientId, + }) + continue pathloop + } + } + } + } + + startQueryHeight := uint64(0) + + if fromHeight > 0 { + startQueryHeight = uint64(fromHeight) + } else { + // find height to query From + for _, chainStruct := range srcProviderClientStates { + if chainStruct.ClientState == nil && chainStruct.ClientState.GetLatestHeight() == nil { + continue + } + h := chainStruct.ClientState.GetLatestHeight().GetRevisionHeight() + if startQueryHeight == 0 { + startQueryHeight = h + } + if h > 0 && startQueryHeight > h { + startQueryHeight = h + } + } + } + + if startQueryHeight == 0 { + return 0, nil + } + + // query all the height until the latest height + var chain provider.ChainProvider + for _, c := range chains { + // assumption: there will be only one icon module config in config.yaml + if strings.Contains(c.ChainProvider.Type(), common.IconModule) { + chain = c.ChainProvider + } + } + + iconChainProvider, ok := chain.(*icon.IconProvider) + if !ok { + return 0, fmt.Errorf("iconChainProvider not found in chain list") + } + + // this will move upto the latest height + btpBlockHeaders, uptoHeight, err := iconChainProvider.GetProofContextChangeHeaders(ctx, startQueryHeight) + if err != nil { + return 0, fmt.Errorf("failed to get proofContextChangeHeight %v", err) + } + + if len(btpBlockHeaders) == 0 { + log.Info("No btpHeight to update: clientContextdidn't changed") + } + + // updateClientMessage and create tx + for _, c := range srcProviderClientStates { + trustedHeight := clienttypes.Height{ + RevisionNumber: c.ClientState.GetLatestHeight().GetRevisionNumber(), + RevisionHeight: c.ClientState.GetLatestHeight().GetRevisionHeight(), + } + for _, blockHeader := range btpBlockHeaders { + + _, err := c.SrcChainProvider.QueryClientConsensusState(ctx, 0, c.ClientId, clienttypes.NewHeight(0, blockHeader.Height())) + if err == nil { + log.Info("clientHeight is already updated", + zap.String("chain id", c.SrcChainProvider.ChainId()), + zap.String("client id ", c.ClientId), + zap.Int64("client height", int64(blockHeader.Height()))) + continue + } + + trustedHeader, err := iconChainProvider.QueryIBCHeader(ctx, int64(trustedHeight.GetRevisionHeight())) + if err != nil { + return 0, fmt.Errorf(" query and update for chain %v", err) + } + + clientMessage, err := iconChainProvider.MsgUpdateClientHeader(blockHeader, trustedHeight, trustedHeader) + if err != nil { + return 0, fmt.Errorf("error occured: %v", err) + } + msg, err := c.SrcChainProvider.MsgUpdateClient(c.ClientId, clientMessage) + if err != nil { + return 0, fmt.Errorf("error occured: %v ", err) + } + + res, sucess, err := c.SrcChainProvider.SendMessage(ctx, msg, "") + if err != nil { + return 0, fmt.Errorf("tx not successfull %v", err) + } + + if !sucess { + return 0, fmt.Errorf("tx not successful, chainId: %s , clientId: %s ", + c.SrcChainProvider.ChainId(), + c.ClientId) + } + + log.Debug("update client successful", + zap.String("chain id ", c.SrcChainProvider.ChainId()), + zap.String("client id ", c.ClientId), + zap.String("txhash", res.TxHash)) + + // updating trustedHeight + cs, err := c.SrcChainProvider.QueryClientState(ctx, 0, c.ClientId) + if err != nil { + return 0, fmt.Errorf("error fetching clientState %v", err) + } + csLatestHeight := cs.GetLatestHeight() + trustedHeight = + clienttypes.Height{ + RevisionNumber: csLatestHeight.GetRevisionNumber(), + RevisionHeight: csLatestHeight.GetRevisionHeight(), + } + } + } + return uptoHeight, nil +}