diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e456c471..d5a46901d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ * [\#1178](https://github.com/cosmos/relayer/pull/1178) Add max-gas-amount parameter in chain configs. * [\#1180](https://github.com/cosmos/relayer/pull/1180) Update SDK from v0.47.0 to v0.47.2. * [\#1205](https://github.com/cosmos/relayer/pull/1205) Update ibc-go to v7.0.1. +* [\#1196](https://github.com/cosmos/relayer/pull/1196) Add missing `EventTypeChannelClosed` when parse from event. * [\#1179](https://github.com/cosmos/relayer/pull/1179) Add extension-options parameter in chain configs and update SDK to v0.47.3. * [\#1208](https://github.com/cosmos/relayer/pull/1208) Replace gogo/protobuf to cosmos/gogoproto. * [\#1221](https://github.com/cosmos/relayer/pull/1221) Update cometbft to v0.37.2 and ibc-go to v7.2.0. diff --git a/relayer/chains/cosmos/cosmos_chain_processor.go b/relayer/chains/cosmos/cosmos_chain_processor.go index 8f706a0e1..aa93f64ac 100644 --- a/relayer/chains/cosmos/cosmos_chain_processor.go +++ b/relayer/chains/cosmos/cosmos_chain_processor.go @@ -17,6 +17,7 @@ import ( "github.com/cosmos/relayer/v2/relayer/provider" ctypes "github.com/cometbft/cometbft/rpc/core/types" + "github.com/cosmos/relayer/v2/relayer/chains" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -89,22 +90,22 @@ const ( // latestClientState is a map of clientID to the latest clientInfo for that client. type latestClientState map[string]provider.ClientState -func (l latestClientState) update(ctx context.Context, clientInfo clientInfo, ccp *CosmosChainProcessor) { - existingClientInfo, ok := l[clientInfo.clientID] +func (l latestClientState) update(ctx context.Context, clientInfo chains.ClientInfo, ccp *CosmosChainProcessor) { + existingClientInfo, ok := l[clientInfo.ClientID] var trustingPeriod time.Duration if ok { - if clientInfo.consensusHeight.LT(existingClientInfo.ConsensusHeight) { + if clientInfo.ConsensusHeight.LT(existingClientInfo.ConsensusHeight) { // height is less than latest, so no-op return } trustingPeriod = existingClientInfo.TrustingPeriod } if trustingPeriod == 0 { - cs, err := ccp.chainProvider.queryTMClientState(ctx, 0, clientInfo.clientID) + cs, err := ccp.chainProvider.queryTMClientState(ctx, 0, clientInfo.ClientID) if err != nil { ccp.log.Error( "Failed to query client state to get trusting period", - zap.String("client_id", clientInfo.clientID), + zap.String("client_id", clientInfo.ClientID), zap.Error(err), ) return @@ -114,7 +115,7 @@ func (l latestClientState) update(ctx context.Context, clientInfo clientInfo, cc clientState := clientInfo.ClientState(trustingPeriod) // update latest if no existing state or provided consensus height is newer - l[clientInfo.clientID] = clientState + l[clientInfo.ClientID] = clientState } // Provider returns the ChainProvider, which provides the methods for querying, assembling IBC messages, and sending transactions. @@ -467,7 +468,7 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu // tx was not successful continue } - messages := ibcMessagesFromEvents(ccp.log, tx.Events, chainID, heightUint64, base64Encoded) + messages := chains.IbcMessagesFromEvents(ccp.log, tx.Events, chainID, heightUint64, base64Encoded) for _, m := range messages { ccp.handleMessage(ctx, m, ibcMessagesCache) diff --git a/relayer/chains/cosmos/event_parser.go b/relayer/chains/cosmos/event_parser.go index 38ef47288..46745cfcd 100644 --- a/relayer/chains/cosmos/event_parser.go +++ b/relayer/chains/cosmos/event_parser.go @@ -1,472 +1,16 @@ package cosmos import ( - "encoding/base64" - "encoding/hex" - "fmt" - "strconv" - "strings" - "time" - abci "github.com/cometbft/cometbft/abci/types" - sdk "github.com/cosmos/cosmos-sdk/types" - clienttypes "github.com/cosmos/ibc-go/v7/modules/core/02-client/types" - conntypes "github.com/cosmos/ibc-go/v7/modules/core/03-connection/types" - chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" - "github.com/cosmos/relayer/v2/relayer/processor" - "github.com/cosmos/relayer/v2/relayer/provider" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" + "github.com/cosmos/relayer/v2/relayer/chains" ) -// ibcMessage is the type used for parsing all possible properties of IBC messages -type ibcMessage struct { - eventType string - info ibcMessageInfo -} - -type ibcMessageInfo interface { - parseAttrs(log *zap.Logger, attrs []sdk.Attribute) - MarshalLogObject(enc zapcore.ObjectEncoder) error -} - func (ccp *CosmosChainProcessor) ibcMessagesFromBlockEvents( beginBlockEvents, endBlockEvents []abci.Event, height uint64, base64Encoded bool, -) (res []ibcMessage) { +) (res []chains.IbcMessage) { chainID := ccp.chainProvider.ChainId() - res = append(res, ibcMessagesFromEvents(ccp.log, beginBlockEvents, chainID, height, base64Encoded)...) - res = append(res, ibcMessagesFromEvents(ccp.log, endBlockEvents, chainID, height, base64Encoded)...) + res = append(res, chains.IbcMessagesFromEvents(ccp.log, beginBlockEvents, chainID, height, base64Encoded)...) + res = append(res, chains.IbcMessagesFromEvents(ccp.log, endBlockEvents, chainID, height, base64Encoded)...) return res } - -func parseBase64Event(log *zap.Logger, event abci.Event) sdk.StringEvent { - evt := sdk.StringEvent{Type: event.Type} - for _, attr := range event.Attributes { - key, err := base64.StdEncoding.DecodeString(attr.Key) - if err != nil { - log.Error("Failed to decode legacy key as base64", zap.String("base64", attr.Key), zap.Error(err)) - continue - } - value, err := base64.StdEncoding.DecodeString(attr.Value) - if err != nil { - log.Error("Failed to decode legacy value as base64", zap.String("base64", attr.Value), zap.Error(err)) - continue - } - evt.Attributes = append(evt.Attributes, sdk.Attribute{ - Key: string(key), - Value: string(value), - }) - } - return evt -} - -// ibcMessagesFromTransaction parses all events within a transaction to find IBC messages -func ibcMessagesFromEvents( - log *zap.Logger, - events []abci.Event, - chainID string, - height uint64, - base64Encoded bool, -) (messages []ibcMessage) { - for _, event := range events { - var evt sdk.StringEvent - if base64Encoded { - evt = parseBase64Event(log, event) - } else { - evt = sdk.StringifyEvent(event) - } - m := parseIBCMessageFromEvent(log, evt, chainID, height) - if m == nil || m.info == nil { - // Not an IBC message, don't need to log here - continue - } - messages = append(messages, *m) - } - return messages -} - -func parseIBCMessageFromEvent( - log *zap.Logger, - event sdk.StringEvent, - chainID string, - height uint64, -) *ibcMessage { - switch event.Type { - case chantypes.EventTypeSendPacket, chantypes.EventTypeRecvPacket, chantypes.EventTypeWriteAck, - chantypes.EventTypeAcknowledgePacket, chantypes.EventTypeTimeoutPacket, - chantypes.EventTypeTimeoutPacketOnClose: - pi := &packetInfo{Height: height} - pi.parseAttrs(log, event.Attributes) - return &ibcMessage{ - eventType: event.Type, - info: pi, - } - case chantypes.EventTypeChannelOpenInit, chantypes.EventTypeChannelOpenTry, - chantypes.EventTypeChannelOpenAck, chantypes.EventTypeChannelOpenConfirm, - chantypes.EventTypeChannelCloseInit, chantypes.EventTypeChannelCloseConfirm: - ci := &channelInfo{Height: height} - ci.parseAttrs(log, event.Attributes) - return &ibcMessage{ - eventType: event.Type, - info: ci, - } - case conntypes.EventTypeConnectionOpenInit, conntypes.EventTypeConnectionOpenTry, - conntypes.EventTypeConnectionOpenAck, conntypes.EventTypeConnectionOpenConfirm: - ci := &connectionInfo{Height: height} - ci.parseAttrs(log, event.Attributes) - return &ibcMessage{ - eventType: event.Type, - info: ci, - } - case clienttypes.EventTypeCreateClient, clienttypes.EventTypeUpdateClient, - clienttypes.EventTypeUpgradeClient, clienttypes.EventTypeSubmitMisbehaviour, - clienttypes.EventTypeUpdateClientProposal: - ci := new(clientInfo) - ci.parseAttrs(log, event.Attributes) - return &ibcMessage{ - eventType: event.Type, - info: ci, - } - - case string(processor.ClientICQTypeRequest), string(processor.ClientICQTypeResponse): - ci := &clientICQInfo{ - Height: height, - Source: chainID, - } - ci.parseAttrs(log, event.Attributes) - return &ibcMessage{ - eventType: event.Type, - info: ci, - } - } - return nil -} - -func (msg *ibcMessage) parseIBCPacketReceiveMessageFromEvent( - log *zap.Logger, - event sdk.StringEvent, - chainID string, - height uint64, -) *ibcMessage { - var pi *packetInfo - if msg.info == nil { - pi = &packetInfo{Height: height} - msg.info = pi - } else { - pi = msg.info.(*packetInfo) - } - pi.parseAttrs(log, event.Attributes) - if event.Type != chantypes.EventTypeWriteAck { - msg.eventType = event.Type - } - return msg -} - -// clientInfo contains the consensus height of the counterparty chain for a client. -type clientInfo struct { - clientID string - consensusHeight clienttypes.Height - header []byte -} - -func (c clientInfo) ClientState(trustingPeriod time.Duration) provider.ClientState { - return provider.ClientState{ - ClientID: c.clientID, - ConsensusHeight: c.consensusHeight, - TrustingPeriod: trustingPeriod, - Header: c.header, - } -} - -func (res *clientInfo) MarshalLogObject(enc zapcore.ObjectEncoder) error { - enc.AddString("client_id", res.clientID) - enc.AddUint64("consensus_height", res.consensusHeight.RevisionHeight) - enc.AddUint64("consensus_height_revision", res.consensusHeight.RevisionNumber) - return nil -} - -func (res *clientInfo) parseAttrs(log *zap.Logger, attributes []sdk.Attribute) { - for _, attr := range attributes { - res.parseClientAttribute(log, attr) - } -} - -func (res *clientInfo) parseClientAttribute(log *zap.Logger, attr sdk.Attribute) { - switch attr.Key { - case clienttypes.AttributeKeyClientID: - res.clientID = attr.Value - case clienttypes.AttributeKeyConsensusHeight: - revisionSplit := strings.Split(attr.Value, "-") - if len(revisionSplit) != 2 { - log.Error("Error parsing client consensus height", - zap.String("client_id", res.clientID), - zap.String("value", attr.Value), - ) - return - } - revisionNumberString := revisionSplit[0] - revisionNumber, err := strconv.ParseUint(revisionNumberString, 10, 64) - if err != nil { - log.Error("Error parsing client consensus height revision number", - zap.Error(err), - ) - return - } - revisionHeightString := revisionSplit[1] - revisionHeight, err := strconv.ParseUint(revisionHeightString, 10, 64) - if err != nil { - log.Error("Error parsing client consensus height revision height", - zap.Error(err), - ) - return - } - res.consensusHeight = clienttypes.Height{ - RevisionNumber: revisionNumber, - RevisionHeight: revisionHeight, - } - case clienttypes.AttributeKeyHeader: - data, err := hex.DecodeString(attr.Value) - if err != nil { - log.Error("Error parsing client header", - zap.String("header", attr.Value), - zap.Error(err), - ) - return - } - res.header = data - } -} - -// alias type to the provider types, used for adding parser methods -type packetInfo provider.PacketInfo - -func (res *packetInfo) MarshalLogObject(enc zapcore.ObjectEncoder) error { - enc.AddUint64("sequence", res.Sequence) - enc.AddString("src_channel", res.SourceChannel) - enc.AddString("src_port", res.SourcePort) - enc.AddString("dst_channel", res.DestChannel) - enc.AddString("dst_port", res.DestPort) - return nil -} - -// parsePacketInfo is treated differently from the others since it can be constructed from the accumulation of multiple events -func (res *packetInfo) parseAttrs(log *zap.Logger, attrs []sdk.Attribute) { - for _, attr := range attrs { - res.parsePacketAttribute(log, attr) - } -} - -func (res *packetInfo) parsePacketAttribute(log *zap.Logger, attr sdk.Attribute) { - var err error - switch attr.Key { - case chantypes.AttributeKeySequence: - res.Sequence, err = strconv.ParseUint(attr.Value, 10, 64) - if err != nil { - log.Error("Error parsing packet sequence", - zap.String("value", attr.Value), - zap.Error(err), - ) - return - } - case chantypes.AttributeKeyTimeoutTimestamp: - res.TimeoutTimestamp, err = strconv.ParseUint(attr.Value, 10, 64) - if err != nil { - log.Error("Error parsing packet timestamp", - zap.Uint64("sequence", res.Sequence), - zap.String("value", attr.Value), - zap.Error(err), - ) - return - } - // NOTE: deprecated per IBC spec - case chantypes.AttributeKeyData: - res.Data = []byte(attr.Value) - case chantypes.AttributeKeyDataHex: - data, err := hex.DecodeString(attr.Value) - if err != nil { - log.Error("Error parsing packet data", - zap.Uint64("sequence", res.Sequence), - zap.Error(err), - ) - return - } - res.Data = data - // NOTE: deprecated per IBC spec - case chantypes.AttributeKeyAck: - res.Ack = []byte(attr.Value) - case chantypes.AttributeKeyAckHex: - data, err := hex.DecodeString(attr.Value) - if err != nil { - log.Error("Error parsing packet ack", - zap.Uint64("sequence", res.Sequence), - zap.String("value", attr.Value), - zap.Error(err), - ) - return - } - res.Ack = data - case chantypes.AttributeKeyTimeoutHeight: - timeoutSplit := strings.Split(attr.Value, "-") - if len(timeoutSplit) != 2 { - log.Error("Error parsing packet height timeout", - zap.Uint64("sequence", res.Sequence), - zap.String("value", attr.Value), - ) - return - } - revisionNumber, err := strconv.ParseUint(timeoutSplit[0], 10, 64) - if err != nil { - log.Error("Error parsing packet timeout height revision number", - zap.Uint64("sequence", res.Sequence), - zap.String("value", timeoutSplit[0]), - zap.Error(err), - ) - return - } - revisionHeight, err := strconv.ParseUint(timeoutSplit[1], 10, 64) - if err != nil { - log.Error("Error parsing packet timeout height revision height", - zap.Uint64("sequence", res.Sequence), - zap.String("value", timeoutSplit[1]), - zap.Error(err), - ) - return - } - res.TimeoutHeight = clienttypes.Height{ - RevisionNumber: revisionNumber, - RevisionHeight: revisionHeight, - } - case chantypes.AttributeKeySrcPort: - res.SourcePort = attr.Value - case chantypes.AttributeKeySrcChannel: - res.SourceChannel = attr.Value - case chantypes.AttributeKeyDstPort: - res.DestPort = attr.Value - case chantypes.AttributeKeyDstChannel: - res.DestChannel = attr.Value - case chantypes.AttributeKeyChannelOrdering: - res.ChannelOrder = attr.Value - } -} - -// alias type to the provider types, used for adding parser methods -type channelInfo provider.ChannelInfo - -func (res *channelInfo) MarshalLogObject(enc zapcore.ObjectEncoder) error { - enc.AddString("channel_id", res.ChannelID) - enc.AddString("port_id", res.PortID) - enc.AddString("counterparty_channel_id", res.CounterpartyChannelID) - enc.AddString("counterparty_port_id", res.CounterpartyPortID) - return nil -} - -func (res *channelInfo) parseAttrs(log *zap.Logger, attrs []sdk.Attribute) { - for _, attr := range attrs { - res.parseChannelAttribute(attr) - } -} - -// parseChannelAttribute parses channel attributes from an event. -// If the attribute has already been parsed into the channelInfo, -// it will not overwrite, and return true to inform the caller that -// the attribute already exists. -func (res *channelInfo) parseChannelAttribute(attr sdk.Attribute) { - switch attr.Key { - case chantypes.AttributeKeyPortID: - res.PortID = attr.Value - case chantypes.AttributeKeyChannelID: - res.ChannelID = attr.Value - case chantypes.AttributeCounterpartyPortID: - res.CounterpartyPortID = attr.Value - case chantypes.AttributeCounterpartyChannelID: - res.CounterpartyChannelID = attr.Value - case chantypes.AttributeKeyConnectionID: - res.ConnID = attr.Value - case chantypes.AttributeVersion: - res.Version = attr.Value - } -} - -// alias type to the provider types, used for adding parser methods -type connectionInfo provider.ConnectionInfo - -func (res *connectionInfo) MarshalLogObject(enc zapcore.ObjectEncoder) error { - enc.AddString("connection_id", res.ConnID) - enc.AddString("client_id", res.ClientID) - enc.AddString("counterparty_connection_id", res.CounterpartyConnID) - enc.AddString("counterparty_client_id", res.CounterpartyClientID) - return nil -} - -func (res *connectionInfo) parseAttrs(log *zap.Logger, attrs []sdk.Attribute) { - for _, attr := range attrs { - res.parseConnectionAttribute(attr) - } -} - -func (res *connectionInfo) parseConnectionAttribute(attr sdk.Attribute) { - switch attr.Key { - case conntypes.AttributeKeyConnectionID: - res.ConnID = attr.Value - case conntypes.AttributeKeyClientID: - res.ClientID = attr.Value - case conntypes.AttributeKeyCounterpartyConnectionID: - res.CounterpartyConnID = attr.Value - case conntypes.AttributeKeyCounterpartyClientID: - res.CounterpartyClientID = attr.Value - } -} - -type clientICQInfo struct { - Source string - Connection string - Chain string - QueryID provider.ClientICQQueryID - Type string - Request []byte - Height uint64 -} - -func (res *clientICQInfo) MarshalLogObject(enc zapcore.ObjectEncoder) error { - enc.AddString("connection_id", res.Connection) - enc.AddString("chain_id", res.Chain) - enc.AddString("query_id", string(res.QueryID)) - enc.AddString("type", res.Type) - enc.AddString("request", hex.EncodeToString(res.Request)) - enc.AddUint64("height", res.Height) - - return nil -} - -func (res *clientICQInfo) parseAttrs(log *zap.Logger, attrs []sdk.Attribute) { - for _, attr := range attrs { - if err := res.parseAttribute(attr); err != nil { - panic(fmt.Errorf("failed to parse attributes from client ICQ message: %w", err)) - } - } -} - -func (res *clientICQInfo) parseAttribute(attr sdk.Attribute) (err error) { - switch attr.Key { - case "connection_id": - res.Connection = attr.Value - case "chain_id": - res.Chain = attr.Value - case "query_id": - res.QueryID = provider.ClientICQQueryID(attr.Value) - case "type": - res.Type = attr.Value - case "request": - res.Request, err = hex.DecodeString(attr.Value) - if err != nil { - return err - } - case "height": - res.Height, err = strconv.ParseUint(attr.Value, 10, 64) - if err != nil { - return err - } - } - return nil -} diff --git a/relayer/chains/cosmos/event_parser_test.go b/relayer/chains/cosmos/event_parser_test.go index 95106356b..bba49947d 100644 --- a/relayer/chains/cosmos/event_parser_test.go +++ b/relayer/chains/cosmos/event_parser_test.go @@ -13,6 +13,8 @@ import ( "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" "go.uber.org/zap" + + "github.com/cosmos/relayer/v2/relayer/chains" ) func TestParsePacket(t *testing.T) { @@ -62,8 +64,8 @@ func TestParsePacket(t *testing.T) { }, } - parsed := new(packetInfo) - parsed.parseAttrs(zap.NewNop(), packetEventAttributes) + parsed := new(chains.PacketInfo) + parsed.ParseAttrs(zap.NewNop(), packetEventAttributes) packetData, err := hex.DecodeString(testPacketDataHex) require.NoError(t, err, "error decoding test packet data") @@ -105,20 +107,20 @@ func TestParseClient(t *testing.T) { }, } - parsed := new(clientInfo) - parsed.parseAttrs(zap.NewNop(), clientEventAttributes) + parsed := new(chains.ClientInfo) + parsed.ParseAttrs(zap.NewNop(), clientEventAttributes) clientHeader, err := hex.DecodeString(testClientHeader) require.NoError(t, err, "error parsing test client header") - require.Empty(t, cmp.Diff(*parsed, clientInfo{ - clientID: testClientID1, - consensusHeight: clienttypes.Height{ + require.Empty(t, cmp.Diff(*parsed, *chains.NewClientInfo( + testClientID1, + clienttypes.Height{ RevisionNumber: uint64(1), RevisionHeight: uint64(1023), }, - header: clientHeader, - }, cmp.AllowUnexported(clientInfo{}, clienttypes.Height{})), "parsed client info does not match expected") + clientHeader, + ), cmp.AllowUnexported(chains.ClientInfo{}, clienttypes.Height{})), "parsed client info does not match expected") } func TestParseChannel(t *testing.T) { @@ -153,8 +155,8 @@ func TestParseChannel(t *testing.T) { }, } - parsed := new(channelInfo) - parsed.parseAttrs(zap.NewNop(), channelEventAttributes) + parsed := new(chains.ChannelInfo) + parsed.ParseAttrs(zap.NewNop(), channelEventAttributes) require.Empty(t, cmp.Diff(provider.ChannelInfo(*parsed), provider.ChannelInfo{ ConnID: testConnectionID1, @@ -192,8 +194,8 @@ func TestParseConnection(t *testing.T) { }, } - parsed := new(connectionInfo) - parsed.parseAttrs(zap.NewNop(), connectionEventAttributes) + parsed := new(chains.ConnectionInfo) + parsed.ParseAttrs(zap.NewNop(), connectionEventAttributes) require.Empty(t, cmp.Diff(provider.ConnectionInfo(*parsed), provider.ConnectionInfo{ ClientID: testClientID1, @@ -300,34 +302,35 @@ func TestParseEventLogs(t *testing.T) { }, } - ibcMessages := ibcMessagesFromEvents(zap.NewNop(), events, "", 0, false) + ibcMessages := chains.IbcMessagesFromEvents(zap.NewNop(), events, "", 0, false) require.Len(t, ibcMessages, 3) msgUpdateClient := ibcMessages[0] - require.Equal(t, clienttypes.EventTypeUpdateClient, msgUpdateClient.eventType) + require.Equal(t, clienttypes.EventTypeUpdateClient, msgUpdateClient.EventType) - clientInfoParsed, isClientInfo := msgUpdateClient.info.(*clientInfo) + clientInfoParsed, isClientInfo := msgUpdateClient.Info.(*chains.ClientInfo) require.True(t, isClientInfo, "messageInfo is not clientInfo") - require.Empty(t, cmp.Diff(*clientInfoParsed, clientInfo{ - clientID: testClientID1, - consensusHeight: clienttypes.Height{ + require.Empty(t, cmp.Diff(*clientInfoParsed, *chains.NewClientInfo( + testClientID1, + clienttypes.Height{ RevisionNumber: uint64(1), RevisionHeight: uint64(1023), }, - }, cmp.AllowUnexported(clientInfo{}, clienttypes.Height{})), "parsed client info does not match expected") + nil, + ), cmp.AllowUnexported(chains.ClientInfo{}, clienttypes.Height{})), "parsed client info does not match expected") msgRecvPacket := ibcMessages[1] - require.Equal(t, chantypes.EventTypeRecvPacket, msgRecvPacket.eventType, "message event is not recv_packet") + require.Equal(t, chantypes.EventTypeRecvPacket, msgRecvPacket.EventType, "message event is not recv_packet") - packetInfoParsed, isPacketInfo := msgRecvPacket.info.(*packetInfo) + packetInfoParsed, isPacketInfo := msgRecvPacket.Info.(*chains.PacketInfo) require.True(t, isPacketInfo, "recv_packet messageInfo is not packetInfo") msgWriteAcknowledgement := ibcMessages[2] - require.Equal(t, chantypes.EventTypeWriteAck, msgWriteAcknowledgement.eventType, "message event is not write_acknowledgement") + require.Equal(t, chantypes.EventTypeWriteAck, msgWriteAcknowledgement.EventType, "message event is not write_acknowledgement") - ackPacketInfoParsed, isPacketInfo := msgWriteAcknowledgement.info.(*packetInfo) + ackPacketInfoParsed, isPacketInfo := msgWriteAcknowledgement.Info.(*chains.PacketInfo) require.True(t, isPacketInfo, "ack messageInfo is not packetInfo") packetAck, err := hex.DecodeString(testPacketAckHex) diff --git a/relayer/chains/cosmos/message_handlers.go b/relayer/chains/cosmos/message_handlers.go index 029ddd652..5f9fce7b1 100644 --- a/relayer/chains/cosmos/message_handlers.go +++ b/relayer/chains/cosmos/message_handlers.go @@ -6,24 +6,25 @@ import ( conntypes "github.com/cosmos/ibc-go/v7/modules/core/03-connection/types" chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" + "github.com/cosmos/relayer/v2/relayer/chains" "github.com/cosmos/relayer/v2/relayer/processor" "github.com/cosmos/relayer/v2/relayer/provider" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) -func (ccp *CosmosChainProcessor) handleMessage(ctx context.Context, m ibcMessage, c processor.IBCMessagesCache) { - switch t := m.info.(type) { - case *packetInfo: - ccp.handlePacketMessage(m.eventType, provider.PacketInfo(*t), c) - case *channelInfo: - ccp.handleChannelMessage(m.eventType, provider.ChannelInfo(*t), c) - case *connectionInfo: - ccp.handleConnectionMessage(m.eventType, provider.ConnectionInfo(*t), c) - case *clientInfo: - ccp.handleClientMessage(ctx, m.eventType, *t) - case *clientICQInfo: - ccp.handleClientICQMessage(m.eventType, provider.ClientICQInfo(*t), c) +func (ccp *CosmosChainProcessor) handleMessage(ctx context.Context, m chains.IbcMessage, c processor.IBCMessagesCache) { + switch t := m.Info.(type) { + case *chains.PacketInfo: + ccp.handlePacketMessage(m.EventType, provider.PacketInfo(*t), c) + case *chains.ChannelInfo: + ccp.handleChannelMessage(m.EventType, provider.ChannelInfo(*t), c) + case *chains.ConnectionInfo: + ccp.handleConnectionMessage(m.EventType, provider.ConnectionInfo(*t), c) + case *chains.ClientInfo: + ccp.handleClientMessage(ctx, m.EventType, *t) + case *chains.ClientICQInfo: + ccp.handleClientICQMessage(m.EventType, provider.ClientICQInfo(*t), c) } } @@ -87,7 +88,7 @@ func (ccp *CosmosChainProcessor) handleChannelMessage(eventType string, ci provi case chantypes.EventTypeChannelOpenAck, chantypes.EventTypeChannelOpenConfirm: ccp.channelStateCache.SetOpen(channelKey, true, ci.Order) ccp.logChannelOpenMessage(eventType, ci) - case chantypes.EventTypeChannelCloseConfirm: + case chantypes.EventTypeChannelClosed, chantypes.EventTypeChannelCloseConfirm: for k := range ccp.channelStateCache { if k.PortID == ci.PortID && k.ChannelID == ci.ChannelID { ccp.channelStateCache.SetOpen(channelKey, false, ci.Order) @@ -132,9 +133,9 @@ func (ccp *CosmosChainProcessor) handleConnectionMessage(eventType string, ci pr ccp.logConnectionMessage(eventType, ci) } -func (ccp *CosmosChainProcessor) handleClientMessage(ctx context.Context, eventType string, ci clientInfo) { +func (ccp *CosmosChainProcessor) handleClientMessage(ctx context.Context, eventType string, ci chains.ClientInfo) { ccp.latestClientState.update(ctx, ci, ccp) - ccp.logObservedIBCMessage(eventType, zap.String("client_id", ci.clientID)) + ccp.logObservedIBCMessage(eventType, zap.String("client_id", ci.ClientID)) } func (ccp *CosmosChainProcessor) handleClientICQMessage( diff --git a/relayer/chains/cosmos/query.go b/relayer/chains/cosmos/query.go index d41f25bdc..665d851aa 100644 --- a/relayer/chains/cosmos/query.go +++ b/relayer/chains/cosmos/query.go @@ -32,6 +32,7 @@ import ( host "github.com/cosmos/ibc-go/v7/modules/core/24-host" ibcexported "github.com/cosmos/ibc-go/v7/modules/core/exported" tmclient "github.com/cosmos/ibc-go/v7/modules/light-clients/07-tendermint" + "github.com/cosmos/relayer/v2/relayer/chains" "github.com/cosmos/relayer/v2/relayer/provider" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -43,7 +44,7 @@ const PaginationDelay = 10 * time.Millisecond var _ provider.QueryProvider = &CosmosProvider{} // queryIBCMessages returns an array of IBC messages given a tag -func (cc *CosmosProvider) queryIBCMessages(ctx context.Context, log *zap.Logger, page, limit int, query string, base64Encoded bool) ([]ibcMessage, error) { +func (cc *CosmosProvider) queryIBCMessages(ctx context.Context, log *zap.Logger, page, limit int, query string, base64Encoded bool) ([]chains.IbcMessage, error) { if query == "" { return nil, errors.New("query string must be provided") } @@ -58,7 +59,7 @@ func (cc *CosmosProvider) queryIBCMessages(ctx context.Context, log *zap.Logger, var eg errgroup.Group chainID := cc.ChainId() - var ibcMsgs []ibcMessage + var ibcMsgs []chains.IbcMessage var mu sync.Mutex eg.Go(func() error { @@ -79,8 +80,8 @@ func (cc *CosmosProvider) queryIBCMessages(ctx context.Context, log *zap.Logger, mu.Lock() defer mu.Unlock() - ibcMsgs = append(ibcMsgs, ibcMessagesFromEvents(log, block.BeginBlockEvents, chainID, 0, base64Encoded)...) - ibcMsgs = append(ibcMsgs, ibcMessagesFromEvents(log, block.EndBlockEvents, chainID, 0, base64Encoded)...) + ibcMsgs = append(ibcMsgs, chains.IbcMessagesFromEvents(log, block.BeginBlockEvents, chainID, 0, base64Encoded)...) + ibcMsgs = append(ibcMsgs, chains.IbcMessagesFromEvents(log, block.EndBlockEvents, chainID, 0, base64Encoded)...) return nil }) @@ -97,7 +98,7 @@ func (cc *CosmosProvider) queryIBCMessages(ctx context.Context, log *zap.Logger, mu.Lock() defer mu.Unlock() for _, tx := range res.Txs { - ibcMsgs = append(ibcMsgs, ibcMessagesFromEvents(log, tx.TxResult.Events, chainID, 0, base64Encoded)...) + ibcMsgs = append(ibcMsgs, chains.IbcMessagesFromEvents(log, tx.TxResult.Events, chainID, 0, base64Encoded)...) } return nil @@ -1025,10 +1026,10 @@ func (cc *CosmosProvider) QuerySendPacket( return provider.PacketInfo{}, err } for _, msg := range ibcMsgs { - if msg.eventType != chantypes.EventTypeSendPacket { + if msg.EventType != chantypes.EventTypeSendPacket { continue } - if pi, ok := msg.info.(*packetInfo); ok { + if pi, ok := msg.Info.(*chains.PacketInfo); ok { if pi.SourceChannel == srcChanID && pi.SourcePort == srcPortID && pi.Sequence == sequence { return provider.PacketInfo(*pi), nil } @@ -1054,10 +1055,10 @@ func (cc *CosmosProvider) QueryRecvPacket( return provider.PacketInfo{}, err } for _, msg := range ibcMsgs { - if msg.eventType != chantypes.EventTypeWriteAck { + if msg.EventType != chantypes.EventTypeWriteAck { continue } - if pi, ok := msg.info.(*packetInfo); ok { + if pi, ok := msg.Info.(*chains.PacketInfo); ok { if pi.DestChannel == dstChanID && pi.DestPort == dstPortID && pi.Sequence == sequence { return provider.PacketInfo(*pi), nil } diff --git a/relayer/chains/parsing.go b/relayer/chains/parsing.go new file mode 100644 index 000000000..8a267072f --- /dev/null +++ b/relayer/chains/parsing.go @@ -0,0 +1,458 @@ +package chains + +import ( + "encoding/base64" + "encoding/hex" + "fmt" + "strconv" + "strings" + "time" + + abci "github.com/cometbft/cometbft/abci/types" + sdk "github.com/cosmos/cosmos-sdk/types" + clienttypes "github.com/cosmos/ibc-go/v7/modules/core/02-client/types" + conntypes "github.com/cosmos/ibc-go/v7/modules/core/03-connection/types" + chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" + "github.com/cosmos/relayer/v2/relayer/processor" + "github.com/cosmos/relayer/v2/relayer/provider" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// IbcMessage is the type used for parsing all possible properties of IBC messages +type IbcMessage struct { + EventType string + Info ibcMessageInfo +} + +type ibcMessageInfo interface { + ParseAttrs(log *zap.Logger, attrs []sdk.Attribute) + MarshalLogObject(enc zapcore.ObjectEncoder) error +} + +func parseBase64Event(log *zap.Logger, event abci.Event) sdk.StringEvent { + evt := sdk.StringEvent{Type: event.Type} + for _, attr := range event.Attributes { + key, err := base64.StdEncoding.DecodeString(attr.Key) + if err != nil { + log.Error("Failed to decode legacy key as base64", zap.String("base64", attr.Key), zap.Error(err)) + continue + } + value, err := base64.StdEncoding.DecodeString(attr.Value) + if err != nil { + log.Error("Failed to decode legacy value as base64", zap.String("base64", attr.Value), zap.Error(err)) + continue + } + evt.Attributes = append(evt.Attributes, sdk.Attribute{ + Key: string(key), + Value: string(value), + }) + } + return evt +} + +// IbcMessagesFromEvents parses all events within a transaction to find IBC messages +func IbcMessagesFromEvents( + log *zap.Logger, + events []abci.Event, + chainID string, + height uint64, + base64Encoded bool, +) (messages []IbcMessage) { + for _, event := range events { + var evt sdk.StringEvent + if base64Encoded { + evt = parseBase64Event(log, event) + } else { + evt = sdk.StringifyEvent(event) + } + m := parseIBCMessageFromEvent(log, evt, chainID, height) + if m == nil || m.Info == nil { + // Not an IBC message, don't need to log here + continue + } + messages = append(messages, *m) + } + return messages +} + +type messageInfo interface { + ibcMessageInfo + ParseAttrs(log *zap.Logger, attrs []sdk.Attribute) +} + +func parseIBCMessageFromEvent( + log *zap.Logger, + event sdk.StringEvent, + chainID string, + height uint64, +) *IbcMessage { + var msgInfo messageInfo + switch event.Type { + case chantypes.EventTypeSendPacket, chantypes.EventTypeRecvPacket, chantypes.EventTypeWriteAck, + chantypes.EventTypeAcknowledgePacket, chantypes.EventTypeTimeoutPacket, + chantypes.EventTypeTimeoutPacketOnClose: + msgInfo = &PacketInfo{Height: height} + case chantypes.EventTypeChannelOpenInit, chantypes.EventTypeChannelOpenTry, + chantypes.EventTypeChannelOpenAck, chantypes.EventTypeChannelOpenConfirm, + chantypes.EventTypeChannelCloseInit, chantypes.EventTypeChannelClosed, chantypes.EventTypeChannelCloseConfirm: + msgInfo = &ChannelInfo{Height: height} + case conntypes.EventTypeConnectionOpenInit, conntypes.EventTypeConnectionOpenTry, + conntypes.EventTypeConnectionOpenAck, conntypes.EventTypeConnectionOpenConfirm: + msgInfo = &ConnectionInfo{Height: height} + case clienttypes.EventTypeCreateClient, clienttypes.EventTypeUpdateClient, + clienttypes.EventTypeUpgradeClient, clienttypes.EventTypeSubmitMisbehaviour, + clienttypes.EventTypeUpdateClientProposal: + msgInfo = new(ClientInfo) + case string(processor.ClientICQTypeRequest), string(processor.ClientICQTypeResponse): + msgInfo = &ClientICQInfo{ + Height: height, + Source: chainID, + } + default: + return nil + } + msgInfo.ParseAttrs(log, event.Attributes) + return &IbcMessage{ + EventType: event.Type, + Info: msgInfo, + } +} + +func (msg *IbcMessage) parseIBCPacketReceiveMessageFromEvent( + log *zap.Logger, + event sdk.StringEvent, + chainID string, + height uint64, +) *IbcMessage { + var pi *PacketInfo + if msg.Info == nil { + pi = &PacketInfo{Height: height} + msg.Info = pi + } else { + pi = msg.Info.(*PacketInfo) + } + pi.ParseAttrs(log, event.Attributes) + if event.Type != chantypes.EventTypeWriteAck { + msg.EventType = event.Type + } + return msg +} + +// ClientInfo contains the consensus height of the counterparty chain for a client. +type ClientInfo struct { + ClientID string + ConsensusHeight clienttypes.Height + Header []byte +} + +func NewClientInfo( + clientID string, + consensusHeight clienttypes.Height, + header []byte, +) *ClientInfo { + return &ClientInfo{ + clientID, consensusHeight, header, + } +} + +func (c ClientInfo) ClientState(trustingPeriod time.Duration) provider.ClientState { + return provider.ClientState{ + ClientID: c.ClientID, + ConsensusHeight: c.ConsensusHeight, + TrustingPeriod: trustingPeriod, + Header: c.Header, + } +} + +func (res *ClientInfo) MarshalLogObject(enc zapcore.ObjectEncoder) error { + enc.AddString("client_id", res.ClientID) + enc.AddUint64("consensus_height", res.ConsensusHeight.RevisionHeight) + enc.AddUint64("consensus_height_revision", res.ConsensusHeight.RevisionNumber) + return nil +} + +func (res *ClientInfo) ParseAttrs(log *zap.Logger, attributes []sdk.Attribute) { + for _, attr := range attributes { + res.parseClientAttribute(log, attr) + } +} + +func (res *ClientInfo) parseClientAttribute(log *zap.Logger, attr sdk.Attribute) { + switch attr.Key { + case clienttypes.AttributeKeyClientID: + res.ClientID = attr.Value + case clienttypes.AttributeKeyConsensusHeight: + revisionSplit := strings.Split(attr.Value, "-") + if len(revisionSplit) != 2 { + log.Error("Error parsing client consensus height", + zap.String("client_id", res.ClientID), + zap.String("value", attr.Value), + ) + return + } + revisionNumberString := revisionSplit[0] + revisionNumber, err := strconv.ParseUint(revisionNumberString, 10, 64) + if err != nil { + log.Error("Error parsing client consensus height revision number", + zap.Error(err), + ) + return + } + revisionHeightString := revisionSplit[1] + revisionHeight, err := strconv.ParseUint(revisionHeightString, 10, 64) + if err != nil { + log.Error("Error parsing client consensus height revision height", + zap.Error(err), + ) + return + } + res.ConsensusHeight = clienttypes.Height{ + RevisionNumber: revisionNumber, + RevisionHeight: revisionHeight, + } + case clienttypes.AttributeKeyHeader: + data, err := hex.DecodeString(attr.Value) + if err != nil { + log.Error("Error parsing client header", + zap.String("header", attr.Value), + zap.Error(err), + ) + return + } + res.Header = data + } +} + +// alias type to the provider types, used for adding parser methods +type PacketInfo provider.PacketInfo + +func (res *PacketInfo) MarshalLogObject(enc zapcore.ObjectEncoder) error { + enc.AddUint64("sequence", res.Sequence) + enc.AddString("src_channel", res.SourceChannel) + enc.AddString("src_port", res.SourcePort) + enc.AddString("dst_channel", res.DestChannel) + enc.AddString("dst_port", res.DestPort) + return nil +} + +// parsePacketInfo is treated differently from the others since it can be constructed from the accumulation of multiple events +func (res *PacketInfo) ParseAttrs(log *zap.Logger, attrs []sdk.Attribute) { + for _, attr := range attrs { + res.parsePacketAttribute(log, attr) + } +} + +func (res *PacketInfo) parsePacketAttribute(log *zap.Logger, attr sdk.Attribute) { + var err error + switch attr.Key { + case chantypes.AttributeKeySequence: + res.Sequence, err = strconv.ParseUint(attr.Value, 10, 64) + if err != nil { + log.Error("Error parsing packet sequence", + zap.String("value", attr.Value), + zap.Error(err), + ) + return + } + case chantypes.AttributeKeyTimeoutTimestamp: + res.TimeoutTimestamp, err = strconv.ParseUint(attr.Value, 10, 64) + if err != nil { + log.Error("Error parsing packet timestamp", + zap.Uint64("sequence", res.Sequence), + zap.String("value", attr.Value), + zap.Error(err), + ) + return + } + // NOTE: deprecated per IBC spec + case chantypes.AttributeKeyData: + res.Data = []byte(attr.Value) + case chantypes.AttributeKeyDataHex: + data, err := hex.DecodeString(attr.Value) + if err != nil { + log.Error("Error parsing packet data", + zap.Uint64("sequence", res.Sequence), + zap.Error(err), + ) + return + } + res.Data = data + // NOTE: deprecated per IBC spec + case chantypes.AttributeKeyAck: + res.Ack = []byte(attr.Value) + case chantypes.AttributeKeyAckHex: + data, err := hex.DecodeString(attr.Value) + if err != nil { + log.Error("Error parsing packet ack", + zap.Uint64("sequence", res.Sequence), + zap.String("value", attr.Value), + zap.Error(err), + ) + return + } + res.Ack = data + case chantypes.AttributeKeyTimeoutHeight: + timeoutSplit := strings.Split(attr.Value, "-") + if len(timeoutSplit) != 2 { + log.Error("Error parsing packet height timeout", + zap.Uint64("sequence", res.Sequence), + zap.String("value", attr.Value), + ) + return + } + revisionNumber, err := strconv.ParseUint(timeoutSplit[0], 10, 64) + if err != nil { + log.Error("Error parsing packet timeout height revision number", + zap.Uint64("sequence", res.Sequence), + zap.String("value", timeoutSplit[0]), + zap.Error(err), + ) + return + } + revisionHeight, err := strconv.ParseUint(timeoutSplit[1], 10, 64) + if err != nil { + log.Error("Error parsing packet timeout height revision height", + zap.Uint64("sequence", res.Sequence), + zap.String("value", timeoutSplit[1]), + zap.Error(err), + ) + return + } + res.TimeoutHeight = clienttypes.Height{ + RevisionNumber: revisionNumber, + RevisionHeight: revisionHeight, + } + case chantypes.AttributeKeySrcPort: + res.SourcePort = attr.Value + case chantypes.AttributeKeySrcChannel: + res.SourceChannel = attr.Value + case chantypes.AttributeKeyDstPort: + res.DestPort = attr.Value + case chantypes.AttributeKeyDstChannel: + res.DestChannel = attr.Value + case chantypes.AttributeKeyChannelOrdering: + res.ChannelOrder = attr.Value + } +} + +// alias type to the provider types, used for adding parser methods +type ChannelInfo provider.ChannelInfo + +func (res *ChannelInfo) MarshalLogObject(enc zapcore.ObjectEncoder) error { + enc.AddString("channel_id", res.ChannelID) + enc.AddString("port_id", res.PortID) + enc.AddString("counterparty_channel_id", res.CounterpartyChannelID) + enc.AddString("counterparty_port_id", res.CounterpartyPortID) + return nil +} + +func (res *ChannelInfo) ParseAttrs(log *zap.Logger, attrs []sdk.Attribute) { + for _, attr := range attrs { + res.parseChannelAttribute(attr) + } +} + +// parseChannelAttribute parses channel attributes from an event. +// If the attribute has already been parsed into the channelInfo, +// it will not overwrite, and return true to inform the caller that +// the attribute already exists. +func (res *ChannelInfo) parseChannelAttribute(attr sdk.Attribute) { + switch attr.Key { + case chantypes.AttributeKeyPortID: + res.PortID = attr.Value + case chantypes.AttributeKeyChannelID: + res.ChannelID = attr.Value + case chantypes.AttributeCounterpartyPortID: + res.CounterpartyPortID = attr.Value + case chantypes.AttributeCounterpartyChannelID: + res.CounterpartyChannelID = attr.Value + case chantypes.AttributeKeyConnectionID: + res.ConnID = attr.Value + case chantypes.AttributeVersion: + res.Version = attr.Value + } +} + +// alias type to the provider types, used for adding parser methods +type ConnectionInfo provider.ConnectionInfo + +func (res *ConnectionInfo) MarshalLogObject(enc zapcore.ObjectEncoder) error { + enc.AddString("connection_id", res.ConnID) + enc.AddString("client_id", res.ClientID) + enc.AddString("counterparty_connection_id", res.CounterpartyConnID) + enc.AddString("counterparty_client_id", res.CounterpartyClientID) + return nil +} + +func (res *ConnectionInfo) ParseAttrs(log *zap.Logger, attrs []sdk.Attribute) { + for _, attr := range attrs { + res.parseConnectionAttribute(attr) + } +} + +func (res *ConnectionInfo) parseConnectionAttribute(attr sdk.Attribute) { + switch attr.Key { + case conntypes.AttributeKeyConnectionID: + res.ConnID = attr.Value + case conntypes.AttributeKeyClientID: + res.ClientID = attr.Value + case conntypes.AttributeKeyCounterpartyConnectionID: + res.CounterpartyConnID = attr.Value + case conntypes.AttributeKeyCounterpartyClientID: + res.CounterpartyClientID = attr.Value + } +} + +type ClientICQInfo struct { + Source string + Connection string + Chain string + QueryID provider.ClientICQQueryID + Type string + Request []byte + Height uint64 +} + +func (res *ClientICQInfo) MarshalLogObject(enc zapcore.ObjectEncoder) error { + enc.AddString("connection_id", res.Connection) + enc.AddString("chain_id", res.Chain) + enc.AddString("query_id", string(res.QueryID)) + enc.AddString("type", res.Type) + enc.AddString("request", hex.EncodeToString(res.Request)) + enc.AddUint64("height", res.Height) + + return nil +} + +func (res *ClientICQInfo) ParseAttrs(log *zap.Logger, attrs []sdk.Attribute) { + for _, attr := range attrs { + if err := res.parseAttribute(attr); err != nil { + panic(fmt.Errorf("failed to parse attributes from client ICQ message: %w", err)) + } + } +} + +func (res *ClientICQInfo) parseAttribute(attr sdk.Attribute) (err error) { + switch attr.Key { + case "connection_id": + res.Connection = attr.Value + case "chain_id": + res.Chain = attr.Value + case "query_id": + res.QueryID = provider.ClientICQQueryID(attr.Value) + case "type": + res.Type = attr.Value + case "request": + res.Request, err = hex.DecodeString(attr.Value) + if err != nil { + return err + } + case "height": + res.Height, err = strconv.ParseUint(attr.Value, 10, 64) + if err != nil { + return err + } + } + return nil +} diff --git a/relayer/chains/penumbra/event_parser.go b/relayer/chains/penumbra/event_parser.go index 319a8157b..19707325b 100644 --- a/relayer/chains/penumbra/event_parser.go +++ b/relayer/chains/penumbra/event_parser.go @@ -1,472 +1,16 @@ package penumbra import ( - "encoding/base64" - "encoding/hex" - "fmt" - "strconv" - "strings" - "time" - abci "github.com/cometbft/cometbft/abci/types" - sdk "github.com/cosmos/cosmos-sdk/types" - clienttypes "github.com/cosmos/ibc-go/v7/modules/core/02-client/types" - conntypes "github.com/cosmos/ibc-go/v7/modules/core/03-connection/types" - chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" - "github.com/cosmos/relayer/v2/relayer/processor" - "github.com/cosmos/relayer/v2/relayer/provider" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" + "github.com/cosmos/relayer/v2/relayer/chains" ) -// ibcMessage is the type used for parsing all possible properties of IBC messages -type ibcMessage struct { - eventType string - info ibcMessageInfo -} - -type ibcMessageInfo interface { - parseAttrs(log *zap.Logger, attrs []sdk.Attribute) - MarshalLogObject(enc zapcore.ObjectEncoder) error -} - func (ccp *PenumbraChainProcessor) ibcMessagesFromBlockEvents( beginBlockEvents, endBlockEvents []abci.Event, height uint64, base64Encoded bool, -) (res []ibcMessage) { +) (res []chains.IbcMessage) { chainID := ccp.chainProvider.ChainId() - res = append(res, ibcMessagesFromEvents(ccp.log, beginBlockEvents, chainID, height, base64Encoded)...) - res = append(res, ibcMessagesFromEvents(ccp.log, endBlockEvents, chainID, height, base64Encoded)...) + res = append(res, chains.IbcMessagesFromEvents(ccp.log, beginBlockEvents, chainID, height, base64Encoded)...) + res = append(res, chains.IbcMessagesFromEvents(ccp.log, endBlockEvents, chainID, height, base64Encoded)...) return res } - -func parseBase64Event(log *zap.Logger, event abci.Event) sdk.StringEvent { - evt := sdk.StringEvent{Type: event.Type} - for _, attr := range event.Attributes { - key, err := base64.StdEncoding.DecodeString(attr.Key) - if err != nil { - log.Error("Failed to decode legacy key as base64", zap.String("base64", attr.Key), zap.Error(err)) - continue - } - value, err := base64.StdEncoding.DecodeString(attr.Value) - if err != nil { - log.Error("Failed to decode legacy value as base64", zap.String("base64", attr.Value), zap.Error(err)) - continue - } - evt.Attributes = append(evt.Attributes, sdk.Attribute{ - Key: string(key), - Value: string(value), - }) - } - return evt -} - -// ibcMessagesFromTransaction parses all events within a transaction to find IBC messages -func ibcMessagesFromEvents( - log *zap.Logger, - events []abci.Event, - chainID string, - height uint64, - base64Encoded bool, -) (messages []ibcMessage) { - for _, event := range events { - var evt sdk.StringEvent - if base64Encoded { - evt = parseBase64Event(log, event) - } else { - evt = sdk.StringifyEvent(event) - } - m := parseIBCMessageFromEvent(log, evt, chainID, height) - if m == nil || m.info == nil { - // Not an IBC message, don't need to log here - continue - } - messages = append(messages, *m) - } - return messages -} - -func parseIBCMessageFromEvent( - log *zap.Logger, - event sdk.StringEvent, - chainID string, - height uint64, -) *ibcMessage { - switch event.Type { - case chantypes.EventTypeSendPacket, chantypes.EventTypeRecvPacket, chantypes.EventTypeWriteAck, - chantypes.EventTypeAcknowledgePacket, chantypes.EventTypeTimeoutPacket, - chantypes.EventTypeTimeoutPacketOnClose: - pi := &packetInfo{Height: height} - pi.parseAttrs(log, event.Attributes) - return &ibcMessage{ - eventType: event.Type, - info: pi, - } - case chantypes.EventTypeChannelOpenInit, chantypes.EventTypeChannelOpenTry, - chantypes.EventTypeChannelOpenAck, chantypes.EventTypeChannelOpenConfirm, - chantypes.EventTypeChannelCloseInit, chantypes.EventTypeChannelCloseConfirm: - ci := &channelInfo{Height: height} - ci.parseAttrs(log, event.Attributes) - return &ibcMessage{ - eventType: event.Type, - info: ci, - } - case conntypes.EventTypeConnectionOpenInit, conntypes.EventTypeConnectionOpenTry, - conntypes.EventTypeConnectionOpenAck, conntypes.EventTypeConnectionOpenConfirm: - ci := &connectionInfo{Height: height} - ci.parseAttrs(log, event.Attributes) - return &ibcMessage{ - eventType: event.Type, - info: ci, - } - case clienttypes.EventTypeCreateClient, clienttypes.EventTypeUpdateClient, - clienttypes.EventTypeUpgradeClient, clienttypes.EventTypeSubmitMisbehaviour, - clienttypes.EventTypeUpdateClientProposal: - ci := new(clientInfo) - ci.parseAttrs(log, event.Attributes) - return &ibcMessage{ - eventType: event.Type, - info: ci, - } - - case string(processor.ClientICQTypeRequest), string(processor.ClientICQTypeResponse): - ci := &clientICQInfo{ - Height: height, - Source: chainID, - } - ci.parseAttrs(log, event.Attributes) - return &ibcMessage{ - eventType: event.Type, - info: ci, - } - } - return nil -} - -func (msg *ibcMessage) parseIBCPacketReceiveMessageFromEvent( - log *zap.Logger, - event sdk.StringEvent, - chainID string, - height uint64, -) *ibcMessage { - var pi *packetInfo - if msg.info == nil { - pi = &packetInfo{Height: height} - msg.info = pi - } else { - pi = msg.info.(*packetInfo) - } - pi.parseAttrs(log, event.Attributes) - if event.Type != chantypes.EventTypeWriteAck { - msg.eventType = event.Type - } - return msg -} - -// clientInfo contains the consensus height of the counterparty chain for a client. -type clientInfo struct { - clientID string - consensusHeight clienttypes.Height - header []byte -} - -func (c clientInfo) ClientState(trustingPeriod time.Duration) provider.ClientState { - return provider.ClientState{ - ClientID: c.clientID, - ConsensusHeight: c.consensusHeight, - TrustingPeriod: trustingPeriod, - Header: c.header, - } -} - -func (res *clientInfo) MarshalLogObject(enc zapcore.ObjectEncoder) error { - enc.AddString("client_id", res.clientID) - enc.AddUint64("consensus_height", res.consensusHeight.RevisionHeight) - enc.AddUint64("consensus_height_revision", res.consensusHeight.RevisionNumber) - return nil -} - -func (res *clientInfo) parseAttrs(log *zap.Logger, attributes []sdk.Attribute) { - for _, attr := range attributes { - res.parseClientAttribute(log, attr) - } -} - -func (res *clientInfo) parseClientAttribute(log *zap.Logger, attr sdk.Attribute) { - switch attr.Key { - case clienttypes.AttributeKeyClientID: - res.clientID = attr.Value - case clienttypes.AttributeKeyConsensusHeight: - revisionSplit := strings.Split(attr.Value, "-") - if len(revisionSplit) != 2 { - log.Error("Error parsing client consensus height", - zap.String("client_id", res.clientID), - zap.String("value", attr.Value), - ) - return - } - revisionNumberString := revisionSplit[0] - revisionNumber, err := strconv.ParseUint(revisionNumberString, 10, 64) - if err != nil { - log.Error("Error parsing client consensus height revision number", - zap.Error(err), - ) - return - } - revisionHeightString := revisionSplit[1] - revisionHeight, err := strconv.ParseUint(revisionHeightString, 10, 64) - if err != nil { - log.Error("Error parsing client consensus height revision height", - zap.Error(err), - ) - return - } - res.consensusHeight = clienttypes.Height{ - RevisionNumber: revisionNumber, - RevisionHeight: revisionHeight, - } - case clienttypes.AttributeKeyHeader: - data, err := hex.DecodeString(attr.Value) - if err != nil { - log.Error("Error parsing client header", - zap.String("header", attr.Value), - zap.Error(err), - ) - return - } - res.header = data - } -} - -// alias type to the provider types, used for adding parser methods -type packetInfo provider.PacketInfo - -func (res *packetInfo) MarshalLogObject(enc zapcore.ObjectEncoder) error { - enc.AddUint64("sequence", res.Sequence) - enc.AddString("src_channel", res.SourceChannel) - enc.AddString("src_port", res.SourcePort) - enc.AddString("dst_channel", res.DestChannel) - enc.AddString("dst_port", res.DestPort) - return nil -} - -// parsePacketInfo is treated differently from the others since it can be constructed from the accumulation of multiple events -func (res *packetInfo) parseAttrs(log *zap.Logger, attrs []sdk.Attribute) { - for _, attr := range attrs { - res.parsePacketAttribute(log, attr) - } -} - -func (res *packetInfo) parsePacketAttribute(log *zap.Logger, attr sdk.Attribute) { - var err error - switch attr.Key { - case chantypes.AttributeKeySequence: - res.Sequence, err = strconv.ParseUint(attr.Value, 10, 64) - if err != nil { - log.Error("Error parsing packet sequence", - zap.String("value", attr.Value), - zap.Error(err), - ) - return - } - case chantypes.AttributeKeyTimeoutTimestamp: - res.TimeoutTimestamp, err = strconv.ParseUint(attr.Value, 10, 64) - if err != nil { - log.Error("Error parsing packet timestamp", - zap.Uint64("sequence", res.Sequence), - zap.String("value", attr.Value), - zap.Error(err), - ) - return - } - // NOTE: deprecated per IBC spec - case chantypes.AttributeKeyData: - res.Data = []byte(attr.Value) - case chantypes.AttributeKeyDataHex: - data, err := hex.DecodeString(attr.Value) - if err != nil { - log.Error("Error parsing packet data", - zap.Uint64("sequence", res.Sequence), - zap.Error(err), - ) - return - } - res.Data = data - // NOTE: deprecated per IBC spec - case chantypes.AttributeKeyAck: - res.Ack = []byte(attr.Value) - case chantypes.AttributeKeyAckHex: - data, err := hex.DecodeString(attr.Value) - if err != nil { - log.Error("Error parsing packet ack", - zap.Uint64("sequence", res.Sequence), - zap.String("value", attr.Value), - zap.Error(err), - ) - return - } - res.Ack = data - case chantypes.AttributeKeyTimeoutHeight: - timeoutSplit := strings.Split(attr.Value, "-") - if len(timeoutSplit) != 2 { - log.Error("Error parsing packet height timeout", - zap.Uint64("sequence", res.Sequence), - zap.String("value", attr.Value), - ) - return - } - revisionNumber, err := strconv.ParseUint(timeoutSplit[0], 10, 64) - if err != nil { - log.Error("Error parsing packet timeout height revision number", - zap.Uint64("sequence", res.Sequence), - zap.String("value", timeoutSplit[0]), - zap.Error(err), - ) - return - } - revisionHeight, err := strconv.ParseUint(timeoutSplit[1], 10, 64) - if err != nil { - log.Error("Error parsing packet timeout height revision height", - zap.Uint64("sequence", res.Sequence), - zap.String("value", timeoutSplit[1]), - zap.Error(err), - ) - return - } - res.TimeoutHeight = clienttypes.Height{ - RevisionNumber: revisionNumber, - RevisionHeight: revisionHeight, - } - case chantypes.AttributeKeySrcPort: - res.SourcePort = attr.Value - case chantypes.AttributeKeySrcChannel: - res.SourceChannel = attr.Value - case chantypes.AttributeKeyDstPort: - res.DestPort = attr.Value - case chantypes.AttributeKeyDstChannel: - res.DestChannel = attr.Value - case chantypes.AttributeKeyChannelOrdering: - res.ChannelOrder = attr.Value - } -} - -// alias type to the provider types, used for adding parser methods -type channelInfo provider.ChannelInfo - -func (res *channelInfo) MarshalLogObject(enc zapcore.ObjectEncoder) error { - enc.AddString("channel_id", res.ChannelID) - enc.AddString("port_id", res.PortID) - enc.AddString("counterparty_channel_id", res.CounterpartyChannelID) - enc.AddString("counterparty_port_id", res.CounterpartyPortID) - return nil -} - -func (res *channelInfo) parseAttrs(log *zap.Logger, attrs []sdk.Attribute) { - for _, attr := range attrs { - res.parseChannelAttribute(attr) - } -} - -// parseChannelAttribute parses channel attributes from an event. -// If the attribute has already been parsed into the channelInfo, -// it will not overwrite, and return true to inform the caller that -// the attribute already exists. -func (res *channelInfo) parseChannelAttribute(attr sdk.Attribute) { - switch attr.Key { - case chantypes.AttributeKeyPortID: - res.PortID = attr.Value - case chantypes.AttributeKeyChannelID: - res.ChannelID = attr.Value - case chantypes.AttributeCounterpartyPortID: - res.CounterpartyPortID = attr.Value - case chantypes.AttributeCounterpartyChannelID: - res.CounterpartyChannelID = attr.Value - case chantypes.AttributeKeyConnectionID: - res.ConnID = attr.Value - case chantypes.AttributeVersion: - res.Version = attr.Value - } -} - -// alias type to the provider types, used for adding parser methods -type connectionInfo provider.ConnectionInfo - -func (res *connectionInfo) MarshalLogObject(enc zapcore.ObjectEncoder) error { - enc.AddString("connection_id", res.ConnID) - enc.AddString("client_id", res.ClientID) - enc.AddString("counterparty_connection_id", res.CounterpartyConnID) - enc.AddString("counterparty_client_id", res.CounterpartyClientID) - return nil -} - -func (res *connectionInfo) parseAttrs(log *zap.Logger, attrs []sdk.Attribute) { - for _, attr := range attrs { - res.parseConnectionAttribute(attr) - } -} - -func (res *connectionInfo) parseConnectionAttribute(attr sdk.Attribute) { - switch attr.Key { - case conntypes.AttributeKeyConnectionID: - res.ConnID = attr.Value - case conntypes.AttributeKeyClientID: - res.ClientID = attr.Value - case conntypes.AttributeKeyCounterpartyConnectionID: - res.CounterpartyConnID = attr.Value - case conntypes.AttributeKeyCounterpartyClientID: - res.CounterpartyClientID = attr.Value - } -} - -type clientICQInfo struct { - Source string - Connection string - Chain string - QueryID provider.ClientICQQueryID - Type string - Request []byte - Height uint64 -} - -func (res *clientICQInfo) MarshalLogObject(enc zapcore.ObjectEncoder) error { - enc.AddString("connection_id", res.Connection) - enc.AddString("chain_id", res.Chain) - enc.AddString("query_id", string(res.QueryID)) - enc.AddString("type", res.Type) - enc.AddString("request", hex.EncodeToString(res.Request)) - enc.AddUint64("height", res.Height) - - return nil -} - -func (res *clientICQInfo) parseAttrs(log *zap.Logger, attrs []sdk.Attribute) { - for _, attr := range attrs { - if err := res.parseAttribute(attr); err != nil { - panic(fmt.Errorf("failed to parse attributes from client ICQ message: %w", err)) - } - } -} - -func (res *clientICQInfo) parseAttribute(attr sdk.Attribute) (err error) { - switch attr.Key { - case "connection_id": - res.Connection = attr.Value - case "chain_id": - res.Chain = attr.Value - case "query_id": - res.QueryID = provider.ClientICQQueryID(attr.Value) - case "type": - res.Type = attr.Value - case "request": - res.Request, err = hex.DecodeString(attr.Value) - if err != nil { - return err - } - case "height": - res.Height, err = strconv.ParseUint(attr.Value, 10, 64) - if err != nil { - return err - } - } - return nil -} diff --git a/relayer/chains/penumbra/message_handlers.go b/relayer/chains/penumbra/message_handlers.go index 22b570eb6..a67071190 100644 --- a/relayer/chains/penumbra/message_handlers.go +++ b/relayer/chains/penumbra/message_handlers.go @@ -3,22 +3,23 @@ package penumbra import ( conntypes "github.com/cosmos/ibc-go/v7/modules/core/03-connection/types" chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" + "github.com/cosmos/relayer/v2/relayer/chains" "github.com/cosmos/relayer/v2/relayer/processor" "github.com/cosmos/relayer/v2/relayer/provider" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) -func (pcp *PenumbraChainProcessor) handleMessage(m ibcMessage, c processor.IBCMessagesCache) { - switch t := m.info.(type) { - case *packetInfo: - pcp.handlePacketMessage(m.eventType, provider.PacketInfo(*t), c) - case *channelInfo: - pcp.handleChannelMessage(m.eventType, provider.ChannelInfo(*t), c) - case *connectionInfo: - pcp.handleConnectionMessage(m.eventType, provider.ConnectionInfo(*t), c) - case *clientInfo: - pcp.handleClientMessage(m.eventType, *t) +func (pcp *PenumbraChainProcessor) handleMessage(m chains.IbcMessage, c processor.IBCMessagesCache) { + switch t := m.Info.(type) { + case *chains.PacketInfo: + pcp.handlePacketMessage(m.EventType, provider.PacketInfo(*t), c) + case *chains.ChannelInfo: + pcp.handleChannelMessage(m.EventType, provider.ChannelInfo(*t), c) + case *chains.ConnectionInfo: + pcp.handleConnectionMessage(m.EventType, provider.ConnectionInfo(*t), c) + case *chains.ClientInfo: + pcp.handleClientMessage(m.EventType, *t) } } @@ -71,7 +72,7 @@ func (pcp *PenumbraChainProcessor) handleChannelMessage(eventType string, ci pro pcp.channelStateCache.SetOpen(channelKey, false, ci.Order) case chantypes.EventTypeChannelOpenAck, chantypes.EventTypeChannelOpenConfirm: pcp.channelStateCache.SetOpen(channelKey, true, ci.Order) - case chantypes.EventTypeChannelCloseConfirm: + case chantypes.EventTypeChannelClosed, chantypes.EventTypeChannelCloseConfirm: for k := range pcp.channelStateCache { if k.PortID == ci.PortID && k.ChannelID == ci.ChannelID { pcp.channelStateCache.SetOpen(channelKey, false, ci.Order) @@ -116,9 +117,9 @@ func (pcp *PenumbraChainProcessor) handleConnectionMessage(eventType string, ci pcp.logConnectionMessage(eventType, ci) } -func (pcp *PenumbraChainProcessor) handleClientMessage(eventType string, ci clientInfo) { +func (pcp *PenumbraChainProcessor) handleClientMessage(eventType string, ci chains.ClientInfo) { pcp.latestClientState.update(ci) - pcp.logObservedIBCMessage(eventType, zap.String("client_id", ci.clientID)) + pcp.logObservedIBCMessage(eventType, zap.String("client_id", ci.ClientID)) } func (pcp *PenumbraChainProcessor) logObservedIBCMessage(m string, fields ...zap.Field) { diff --git a/relayer/chains/penumbra/penumbra_chain_processor.go b/relayer/chains/penumbra/penumbra_chain_processor.go index 4142c1bf8..ffa1515db 100644 --- a/relayer/chains/penumbra/penumbra_chain_processor.go +++ b/relayer/chains/penumbra/penumbra_chain_processor.go @@ -15,6 +15,7 @@ import ( "github.com/cosmos/relayer/v2/relayer/processor" "github.com/cosmos/relayer/v2/relayer/provider" + "github.com/cosmos/relayer/v2/relayer/chains" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -82,9 +83,9 @@ type msgHandlerParams struct { // latestClientState is a map of clientID to the latest clientInfo for that client. type latestClientState map[string]provider.ClientState -func (l latestClientState) update(clientInfo clientInfo) { - existingClientInfo, ok := l[clientInfo.clientID] - if ok && clientInfo.consensusHeight.LT(existingClientInfo.ConsensusHeight) { +func (l latestClientState) update(clientInfo chains.ClientInfo) { + existingClientInfo, ok := l[clientInfo.ClientID] + if ok && clientInfo.ConsensusHeight.LT(existingClientInfo.ConsensusHeight) { // height is less than latest, so no-op return } @@ -92,7 +93,7 @@ func (l latestClientState) update(clientInfo clientInfo) { tp := time.Hour * 2 // update latest if no existing state or provided consensus height is newer - l[clientInfo.clientID] = clientInfo.ClientState(tp) + l[clientInfo.ClientID] = clientInfo.ClientState(tp) } // Provider returns the ChainProvider, which provides the methods for querying, assembling IBC messages, and sending transactions. @@ -369,7 +370,7 @@ func (pcp *PenumbraChainProcessor) queryCycle(ctx context.Context, persistence * // tx was not successful continue } - messages := ibcMessagesFromEvents(pcp.log, tx.Events, chainID, heightUint64, true) + messages := chains.IbcMessagesFromEvents(pcp.log, tx.Events, chainID, heightUint64, true) for _, m := range messages { pcp.handleMessage(m, ibcMessagesCache) diff --git a/relayer/chains/penumbra/query.go b/relayer/chains/penumbra/query.go index ce1f08ed6..4ba47ec4d 100644 --- a/relayer/chains/penumbra/query.go +++ b/relayer/chains/penumbra/query.go @@ -27,6 +27,7 @@ import ( host "github.com/cosmos/ibc-go/v7/modules/core/24-host" ibcexported "github.com/cosmos/ibc-go/v7/modules/core/exported" tmclient "github.com/cosmos/ibc-go/v7/modules/light-clients/07-tendermint" + "github.com/cosmos/relayer/v2/relayer/chains" "github.com/cosmos/relayer/v2/relayer/provider" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -901,7 +902,7 @@ func (cc *PenumbraProvider) QueryConsensusStateABCI(ctx context.Context, clientI } // queryIBCMessages returns an array of IBC messages given a tag -func (cc *PenumbraProvider) queryIBCMessages(ctx context.Context, log *zap.Logger, page, limit int, query string) ([]ibcMessage, error) { +func (cc *PenumbraProvider) queryIBCMessages(ctx context.Context, log *zap.Logger, page, limit int, query string) ([]chains.IbcMessage, error) { if query == "" { return nil, errors.New("query string must be provided") } @@ -918,10 +919,10 @@ func (cc *PenumbraProvider) queryIBCMessages(ctx context.Context, log *zap.Logge if err != nil { return nil, err } - var ibcMsgs []ibcMessage + var ibcMsgs []chains.IbcMessage chainID := cc.ChainId() for _, tx := range res.Txs { - ibcMsgs = append(ibcMsgs, ibcMessagesFromEvents(log, tx.TxResult.Events, chainID, 0, true)...) + ibcMsgs = append(ibcMsgs, chains.IbcMessagesFromEvents(log, tx.TxResult.Events, chainID, 0, true)...) } return ibcMsgs, nil @@ -948,10 +949,10 @@ func (cc *PenumbraProvider) QuerySendPacket( return provider.PacketInfo{}, err } for _, msg := range ibcMsgs { - if msg.eventType != chantypes.EventTypeSendPacket { + if msg.EventType != chantypes.EventTypeSendPacket { continue } - if pi, ok := msg.info.(*packetInfo); ok { + if pi, ok := msg.Info.(*chains.PacketInfo); ok { if pi.SourceChannel == srcChanID && pi.SourcePort == srcPortID && pi.Sequence == sequence { return provider.PacketInfo(*pi), nil } @@ -981,10 +982,10 @@ func (cc *PenumbraProvider) QueryRecvPacket( return provider.PacketInfo{}, err } for _, msg := range ibcMsgs { - if msg.eventType != chantypes.EventTypeWriteAck { + if msg.EventType != chantypes.EventTypeWriteAck { continue } - if pi, ok := msg.info.(*packetInfo); ok { + if pi, ok := msg.Info.(*chains.PacketInfo); ok { if pi.DestChannel == dstChanID && pi.DestPort == dstPortID && pi.Sequence == sequence { return provider.PacketInfo(*pi), nil }