Skip to content

Commit

Permalink
fix: decouple client/conn/icq from channel/packets
Browse files Browse the repository at this point in the history
  • Loading branch information
Giuseppe Valente committed May 5, 2023
1 parent 64c882c commit 7025511
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 77 deletions.
47 changes: 36 additions & 11 deletions interchaintest/multihop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,6 @@ func TestRelayerMultihop(t *testing.T) {
_ = ic.Close()
})

// Create clients and connections

// Single hop wasm1 -> osmosis
err = r.GeneratePath(ctx, eRep, wasm1.Config().ChainID, osmosis.Config().ChainID, pathWasm1Osmosis)
require.NoError(t, err)
Expand All @@ -468,18 +466,41 @@ func TestRelayerMultihop(t *testing.T) {
// Wait a few blocks for the clients to be created.
err = testutil.WaitForBlocks(ctx, 2, wasm1, osmosis)
require.NoError(t, err)

// Create clients and connections
checkConnections := func(chain *cosmos.CosmosChain, expected int) {
connections, err := r.GetConnections(ctx, eRep, chain.Config().ChainID)
require.NoError(t, err)
count := 0
for _, connection := range connections {
if connection.ClientID != "09-localhost" {
count++
}
}
require.Equal(t, expected, count, "unexpected number of connections for %s: %#v",
chain.Config().ChainID, connections)
}

err = r.CreateConnections(ctx, eRep, pathWasm1Osmosis)
require.NoError(t, err)
// Offset by 1 in wasmd for 09-localhost
checkConnections(wasm1, 1)
checkConnections(osmosis, 1)

// Single hop osmosis -> wasm2
err = r.GeneratePath(ctx, eRep, osmosis.Config().ChainID, wasm2.Config().ChainID, pathOsmosisWasm2)
require.NoError(t, err)
err = r.CreateClients(ctx, eRep, pathOsmosisWasm2, ibc.DefaultClientOpts())
require.NoError(t, err)
// Wait a few blocks for the clients to be created.
err = testutil.WaitForBlocks(ctx, 2, osmosis, wasm2)
require.NoError(t, err)

err = r.CreateConnections(ctx, eRep, pathOsmosisWasm2)
require.NoError(t, err)
err = testutil.WaitForBlocks(ctx, 2, osmosis, wasm2)
require.NoError(t, err)
checkConnections(osmosis, 2)
checkConnections(wasm2, 1)

// Multihop wasm1 -> wasm2
err = r.GeneratePath(ctx, eRep, wasm1.Config().ChainID, wasm2.Config().ChainID, pathWasm1Wasm2,
Expand All @@ -494,19 +515,23 @@ func TestRelayerMultihop(t *testing.T) {
err = testutil.WaitForBlocks(ctx, 2, osmosis, wasm2)
require.NoError(t, err)

wasm1Cfg, osmosisCfg, wasm2Cfg := wasm1.Config(), osmosis.Config(), wasm2.Config()
// Check that the underlying clients and connections didn't change
checkConnections(wasm1, 1)
checkConnections(osmosis, 2)
checkConnections(wasm1, 1)

wasm1Chans, err := r.GetChannels(ctx, eRep, wasm1Cfg.ChainID)
// Check that the channel is open
wasm1Chans, err := r.GetChannels(ctx, eRep, wasm1.Config().ChainID)
require.NoError(t, err)
require.Len(t, wasm1Chans, 1)
require.Equal(t, "STATE_OPEN", wasm1Chans[0].State)

wasm2Chans, err := r.GetChannels(ctx, eRep, wasm2Cfg.ChainID)
wasm2Chans, err := r.GetChannels(ctx, eRep, wasm2.Config().ChainID)
require.NoError(t, err)
require.Len(t, wasm2Chans, 1)
require.Equal(t, "STATE_OPEN", wasm2Chans[0].State)

osmosisChans, err := r.GetChannels(ctx, eRep, osmosisCfg.ChainID)
osmosisChans, err := r.GetChannels(ctx, eRep, osmosis.Config().ChainID)
require.NoError(t, err)
require.Len(t, osmosisChans, 0)

Expand Down Expand Up @@ -544,8 +569,8 @@ func TestRelayerMultihop(t *testing.T) {
wasm2Address := wasm2User.FormattedAddress()
require.NotEmpty(t, wasm2Address)

wasm1IBCDenom := transfertypes.ParseDenomTrace(transfertypes.GetPrefixedDenom(wasm1Chans[0].Counterparty.PortID, wasm1Chans[0].Counterparty.ChannelID, wasm1Cfg.Denom)).IBCDenom()
wasm2IBCDenom := transfertypes.ParseDenomTrace(transfertypes.GetPrefixedDenom(wasm2Chans[0].Counterparty.PortID, wasm2Chans[0].Counterparty.ChannelID, wasm2Cfg.Denom)).IBCDenom()
wasm1IBCDenom := transfertypes.ParseDenomTrace(transfertypes.GetPrefixedDenom(wasm1Chans[0].Counterparty.PortID, wasm1Chans[0].Counterparty.ChannelID, wasm1.Config().Denom)).IBCDenom()
wasm2IBCDenom := transfertypes.ParseDenomTrace(transfertypes.GetPrefixedDenom(wasm2Chans[0].Counterparty.PortID, wasm2Chans[0].Counterparty.ChannelID, wasm2.Config().Denom)).IBCDenom()

const transferAmount = int64(1_000_000)

Expand All @@ -555,7 +580,7 @@ func TestRelayerMultihop(t *testing.T) {
t.Logf("Initiating transfer from %s to %s", wasm1.Config().ChainID, wasm2.Config().ChainID)
tx, err := wasm1.SendIBCTransfer(ctx, wasm1Chans[0].ChannelID, wasm1User.KeyName(), ibc.WalletAmount{
Amount: transferAmount,
Denom: wasm1Cfg.Denom,
Denom: wasm1.Config().Denom,
Address: wasm2Address,
}, ibc.TransferOptions{})
require.NoError(t, err)
Expand All @@ -568,7 +593,7 @@ func TestRelayerMultihop(t *testing.T) {
t.Logf("Initiating transfer from %s to %s", wasm1.Config().ChainID, wasm2.Config().ChainID)
tx, err = wasm2.SendIBCTransfer(ctx, wasm2Chans[0].ChannelID, wasm2User.KeyName(), ibc.WalletAmount{
Amount: transferAmount,
Denom: wasm2Cfg.Denom,
Denom: wasm2.Config().Denom,
Address: wasm1Address,
}, ibc.TransferOptions{})
require.NoError(t, err)
Expand Down
22 changes: 1 addition & 21 deletions relayer/processor/path_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,27 +494,7 @@ func (pp *PathProcessor) Run(ctx context.Context, cancel func()) {
return
}

// Process latest message cache state from all pathEnds
if len(pp.hopsPathEnd1to2) > 0 {
lastHop := pp.pathEnd1
for i, hop := range append(pp.hopsPathEnd2to1, pp.pathEnd2) {
// TODO: only do clients and connections here
if err := pp.processLatestMessages(ctx, lastHop, hop); err != nil {
// in case of IBC message send errors, schedule retry after durationErrorRetry
if retryTimer != nil {
retryTimer.Stop()
}
if ctx.Err() == nil {
retryTimer = time.AfterFunc(durationErrorRetry, pp.ProcessBacklogIfReady)
}
}
if i < len(pp.hopsPathEnd1to2) {
lastHop = pp.hopsPathEnd1to2[i]
}
}
}
// TODO: only do channels and packets here
if err := pp.processLatestMessages(ctx, pp.pathEnd1, pp.pathEnd2); err != nil {
if err := pp.processLatestMessages(ctx); err != nil {
// in case of IBC message send errors, schedule retry after durationErrorRetry
if retryTimer != nil {
retryTimer.Stop()
Expand Down
94 changes: 49 additions & 45 deletions relayer/processor/path_processor_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,12 +505,45 @@ func (pp *PathProcessor) appendInitialMessageIfNecessary(pathEnd1Messages, pathE
}
}

func (pp *PathProcessor) processLatestChannelMessages(ctx context.Context, pathEnd1, pathEnd2 *pathEndRuntime) ([]channelIBCMessage, []packetIBCMessage, []channelIBCMessage, []packetIBCMessage) {
// We only allow channels between the first and last chain in paths
// TODO: do we need to check both directions?
if pathEnd1 != pp.pathEnd1 || pathEnd2 != pp.pathEnd2 {
return nil, nil, nil, nil
func (pp *PathProcessor) processLatestConnectionMessages(ctx context.Context) ([]connectionIBCMessage, []connectionIBCMessage) {
pathEnd1ConnectionHandshakeMessages := pathEndConnectionHandshakeMessages{
Src: pp.pathEnd1,
Dst: pp.pathEnd2,
SrcMsgConnectionOpenInit: pp.pathEnd1.messageCache.ConnectionHandshake[conntypes.EventTypeConnectionOpenInit],
DstMsgConnectionOpenTry: pp.pathEnd2.messageCache.ConnectionHandshake[conntypes.EventTypeConnectionOpenTry],
SrcMsgConnectionOpenAck: pp.pathEnd1.messageCache.ConnectionHandshake[conntypes.EventTypeConnectionOpenAck],
DstMsgConnectionOpenConfirm: pp.pathEnd2.messageCache.ConnectionHandshake[conntypes.EventTypeConnectionOpenConfirm],
}
pathEnd2ConnectionHandshakeMessages := pathEndConnectionHandshakeMessages{
Src: pp.pathEnd2,
Dst: pp.pathEnd1,
SrcMsgConnectionOpenInit: pp.pathEnd2.messageCache.ConnectionHandshake[conntypes.EventTypeConnectionOpenInit],
DstMsgConnectionOpenTry: pp.pathEnd1.messageCache.ConnectionHandshake[conntypes.EventTypeConnectionOpenTry],
SrcMsgConnectionOpenAck: pp.pathEnd2.messageCache.ConnectionHandshake[conntypes.EventTypeConnectionOpenAck],
DstMsgConnectionOpenConfirm: pp.pathEnd1.messageCache.ConnectionHandshake[conntypes.EventTypeConnectionOpenConfirm],
}
pathEnd1ConnectionHandshakeRes := pp.getUnrelayedConnectionHandshakeMessagesAndToDelete(pathEnd1ConnectionHandshakeMessages)
pathEnd2ConnectionHandshakeRes := pp.getUnrelayedConnectionHandshakeMessagesAndToDelete(pathEnd2ConnectionHandshakeMessages)

// concatenate applicable messages for pathend
return pp.connectionMessagesToSend(pathEnd1ConnectionHandshakeRes, pathEnd2ConnectionHandshakeRes)
}

func (pp *PathProcessor) processLatestClientICQMessages(ctx context.Context) ([]clientICQMessage, []clientICQMessage) {
pathEnd1ClientICQMessages := pp.getUnrelayedClientICQMessages(
pp.pathEnd1,
pp.pathEnd1.messageCache.ClientICQ[ClientICQTypeRequest],
pp.pathEnd1.messageCache.ClientICQ[ClientICQTypeResponse],
)
pathEnd2ClientICQMessages := pp.getUnrelayedClientICQMessages(
pp.pathEnd2,
pp.pathEnd2.messageCache.ClientICQ[ClientICQTypeRequest],
pp.pathEnd2.messageCache.ClientICQ[ClientICQTypeResponse],
)
return pathEnd1ClientICQMessages, pathEnd2ClientICQMessages
}

func (pp *PathProcessor) processLatestChannelMessages(ctx context.Context) ([]channelIBCMessage, []packetIBCMessage, []channelIBCMessage, []packetIBCMessage) {
pathEnd1ChannelHandshakeMessages := pathEndChannelHandshakeMessages{
Src: pp.pathEnd1,
Dst: pp.pathEnd2,
Expand Down Expand Up @@ -605,45 +638,17 @@ func (pp *PathProcessor) processLatestChannelMessages(ctx context.Context, pathE
}

// messages from both pathEnds are needed in order to determine what needs to be relayed for a single pathEnd
func (pp *PathProcessor) processLatestMessages(ctx context.Context, pathEnd1, pathEnd2 *pathEndRuntime) error {
// Update trusted client state for both pathends
pp.updateClientTrustedState(pathEnd1, pathEnd2)
pp.updateClientTrustedState(pathEnd2, pathEnd1)

pathEnd1ConnectionHandshakeMessages := pathEndConnectionHandshakeMessages{
Src: pathEnd1,
Dst: pathEnd2,
SrcMsgConnectionOpenInit: pathEnd1.messageCache.ConnectionHandshake[conntypes.EventTypeConnectionOpenInit],
DstMsgConnectionOpenTry: pathEnd2.messageCache.ConnectionHandshake[conntypes.EventTypeConnectionOpenTry],
SrcMsgConnectionOpenAck: pathEnd1.messageCache.ConnectionHandshake[conntypes.EventTypeConnectionOpenAck],
DstMsgConnectionOpenConfirm: pathEnd2.messageCache.ConnectionHandshake[conntypes.EventTypeConnectionOpenConfirm],
}
pathEnd2ConnectionHandshakeMessages := pathEndConnectionHandshakeMessages{
Src: pathEnd2,
Dst: pathEnd1,
SrcMsgConnectionOpenInit: pathEnd2.messageCache.ConnectionHandshake[conntypes.EventTypeConnectionOpenInit],
DstMsgConnectionOpenTry: pathEnd1.messageCache.ConnectionHandshake[conntypes.EventTypeConnectionOpenTry],
SrcMsgConnectionOpenAck: pathEnd2.messageCache.ConnectionHandshake[conntypes.EventTypeConnectionOpenAck],
DstMsgConnectionOpenConfirm: pathEnd1.messageCache.ConnectionHandshake[conntypes.EventTypeConnectionOpenConfirm],
}
pathEnd1ConnectionHandshakeRes := pp.getUnrelayedConnectionHandshakeMessagesAndToDelete(pathEnd1ConnectionHandshakeMessages)
pathEnd2ConnectionHandshakeRes := pp.getUnrelayedConnectionHandshakeMessagesAndToDelete(pathEnd2ConnectionHandshakeMessages)

// concatenate applicable messages for pathend
pathEnd1ConnectionMessages, pathEnd2ConnectionMessages := pp.connectionMessagesToSend(pathEnd1ConnectionHandshakeRes, pathEnd2ConnectionHandshakeRes)

pathEnd1ClientICQMessages := pp.getUnrelayedClientICQMessages(
pathEnd1,
pathEnd1.messageCache.ClientICQ[ClientICQTypeRequest],
pathEnd1.messageCache.ClientICQ[ClientICQTypeResponse],
)
pathEnd2ClientICQMessages := pp.getUnrelayedClientICQMessages(
pathEnd2,
pathEnd2.messageCache.ClientICQ[ClientICQTypeRequest],
pathEnd2.messageCache.ClientICQ[ClientICQTypeResponse],
)

pathEnd1ChannelMessages, pathEnd1PacketMessages, pathEnd2ChannelMessages, pathEnd2PacketMessages := pp.processLatestChannelMessages(ctx, pathEnd1, pathEnd2)
func (pp *PathProcessor) processLatestMessages(ctx context.Context) error {
var pathEnd1ConnectionMessages, pathEnd2ConnectionMessages []connectionIBCMessage
var pathEnd1ClientICQMessages, pathEnd2ClientICQMessages []clientICQMessage
if len(pp.hopsPathEnd1to2) == 0 {
// Update trusted client state for both pathends
pp.updateClientTrustedState(pp.pathEnd1, pp.pathEnd2)
pp.updateClientTrustedState(pp.pathEnd2, pp.pathEnd1)
pathEnd1ConnectionMessages, pathEnd2ConnectionMessages = pp.processLatestConnectionMessages(ctx)
pathEnd1ClientICQMessages, pathEnd2ClientICQMessages = pp.processLatestClientICQMessages(ctx)
}
pathEnd1ChannelMessages, pathEnd1PacketMessages, pathEnd2ChannelMessages, pathEnd2PacketMessages := pp.processLatestChannelMessages(ctx)

pathEnd1Messages := pathEndMessages{
connectionMessages: pathEnd1ConnectionMessages,
Expand All @@ -659,7 +664,6 @@ func (pp *PathProcessor) processLatestMessages(ctx context.Context, pathEnd1, pa
clientICQMessages: pathEnd2ClientICQMessages,
}

// TODO: make sure this always gets sent when there's channel traffic
pp.appendInitialMessageIfNecessary(&pathEnd1Messages, &pathEnd2Messages)

// now assemble and send messages in parallel
Expand Down

0 comments on commit 7025511

Please sign in to comment.