Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Neutron launch fixes and optimizations #1185

Merged
merged 8 commits into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,12 @@ func urlFlag(v *viper.Viper, cmd *cobra.Command) *cobra.Command {
}

func strategyFlag(v *viper.Viper, cmd *cobra.Command) *cobra.Command {
cmd.Flags().StringP(flagMaxTxSize, "s", "2", "strategy of path to generate of the messages in a relay transaction")
cmd.Flags().StringP(flagMaxMsgLength, "l", "5", "maximum number of messages in a relay transaction")
if err := v.BindPFlag(flagMaxTxSize, cmd.Flags().Lookup(flagMaxTxSize)); err != nil {
panic(err)
}
cmd.Flags().Uint64P(
flagMaxMsgLength,
"l",
relayer.DefaultMaxMsgLength,
"maximum number of messages per transaction",
)
if err := v.BindPFlag(flagMaxMsgLength, cmd.Flags().Lookup(flagMaxMsgLength)); err != nil {
panic(err)
}
Expand Down
16 changes: 2 additions & 14 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,9 @@ import (
"github.com/spf13/viper"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/term"
)

const (
MB = 1024 * 1024 // in bytes
appName = "rly"
)
const appName = "rly"

var defaultHome = filepath.Join(os.Getenv("HOME"), ".relayer")

Expand Down Expand Up @@ -185,18 +181,10 @@ func newRootLogger(format string, debug bool) (*zap.Logger, error) {
switch format {
case "json":
enc = zapcore.NewJSONEncoder(config)
case "console":
case "auto", "console":
enc = zapcore.NewConsoleEncoder(config)
case "logfmt":
enc = zaplogfmt.NewEncoder(config)
case "auto":
if term.IsTerminal(int(os.Stderr.Fd())) {
// When a user runs relayer in the foreground, use easier to read output.
enc = zapcore.NewConsoleEncoder(config)
} else {
// Otherwise, use consistent logfmt format for simplistic machine processing.
enc = zaplogfmt.NewEncoder(config)
}
default:
return nil, fmt.Errorf("unrecognized log format %q", format)
}
Expand Down
30 changes: 2 additions & 28 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"errors"
"fmt"
"net"
"strconv"
"strings"

"github.com/cosmos/relayer/v2/internal/relaydebug"
Expand Down Expand Up @@ -88,7 +87,7 @@ $ %s start demo-path2 --max-tx-size 10`, appName, appName, appName, appName)),
return err
}

maxTxSize, maxMsgLength, err := GetStartOptions(cmd)
maxMsgLength, err := cmd.Flags().GetUint64(flagMaxMsgLength)
if err != nil {
return err
}
Expand Down Expand Up @@ -149,7 +148,7 @@ $ %s start demo-path2 --max-tx-size 10`, appName, appName, appName, appName)),
a.log,
chains,
paths,
maxTxSize, maxMsgLength,
maxMsgLength,
a.config.memo(cmd),
clientUpdateThresholdTime,
flushInterval,
Expand Down Expand Up @@ -182,28 +181,3 @@ $ %s start demo-path2 --max-tx-size 10`, appName, appName, appName, appName)),
cmd = memoFlag(a.viper, cmd)
return cmd
}

// GetStartOptions sets strategy specific fields.
func GetStartOptions(cmd *cobra.Command) (uint64, uint64, error) {
maxTxSize, err := cmd.Flags().GetString(flagMaxTxSize)
if err != nil {
return 0, 0, err
}

txSize, err := strconv.ParseUint(maxTxSize, 10, 64)
if err != nil {
return 0, 0, err
}

maxMsgLength, err := cmd.Flags().GetString(flagMaxMsgLength)
if err != nil {
return txSize * MB, 0, err
}

msgLen, err := strconv.ParseUint(maxMsgLength, 10, 64)
if err != nil {
return txSize * MB, 0, err
}

return txSize * MB, msgLen, nil
}
4 changes: 2 additions & 2 deletions cmd/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ $ %s tx flush demo-path channel-0`,
return err
}

maxTxSize, maxMsgLength, err := GetStartOptions(cmd)
maxMsgLength, err := cmd.Flags().GetUint64(flagMaxMsgLength)
if err != nil {
return err
}
Expand All @@ -802,7 +802,7 @@ $ %s tx flush demo-path channel-0`,
a.log,
chains,
paths,
maxTxSize, maxMsgLength,
maxMsgLength,
a.config.memo(cmd),
0,
0,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ require (
go.uber.org/zap v1.24.0
golang.org/x/mod v0.8.0
golang.org/x/sync v0.1.0
golang.org/x/term v0.7.0
golang.org/x/text v0.9.0
google.golang.org/grpc v1.54.0
gopkg.in/yaml.v2 v2.4.0
Expand Down Expand Up @@ -171,6 +170,7 @@ require (
golang.org/x/net v0.9.0 // indirect
golang.org/x/oauth2 v0.5.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/term v0.7.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.110.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
7 changes: 7 additions & 0 deletions relayer/chains/cosmos/cosmos_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,13 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
break
}

ccp.log.Debug(
"Queried block",
zap.Int64("height", i),
zap.Int64("latest", persistence.latestHeight),
zap.Int64("delta", persistence.latestHeight-i),
)

persistence.retriesAtLatestQueriedBlock = 0

latestHeader = ibcHeader.(provider.TendermintIBCHeader)
Expand Down
2 changes: 0 additions & 2 deletions relayer/chains/cosmos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,6 @@ func (cc *CosmosProvider) QueryUnreceivedPackets(ctx context.Context, height uin
func sendPacketQuery(channelID string, portID string, seq uint64) string {
x := []string{
fmt.Sprintf("%s.packet_src_channel='%s'", spTag, channelID),
fmt.Sprintf("%s.packet_src_port='%s'", spTag, portID),
fmt.Sprintf("%s.packet_sequence='%d'", spTag, seq),
}
return strings.Join(x, " AND ")
Expand All @@ -906,7 +905,6 @@ func sendPacketQuery(channelID string, portID string, seq uint64) string {
func writeAcknowledgementQuery(channelID string, portID string, seq uint64) string {
x := []string{
fmt.Sprintf("%s.packet_dst_channel='%s'", waTag, channelID),
fmt.Sprintf("%s.packet_dst_port='%s'", waTag, portID),
fmt.Sprintf("%s.packet_sequence='%d'", waTag, seq),
}
return strings.Join(x, " AND ")
Expand Down
4 changes: 3 additions & 1 deletion relayer/chains/mock/mock_chain_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

clienttypes "github.com/cosmos/ibc-go/v7/modules/core/02-client/types"
chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types"
"github.com/cosmos/relayer/v2/relayer"
"github.com/cosmos/relayer/v2/relayer/chains/mock"
"github.com/cosmos/relayer/v2/relayer/processor"
"github.com/prometheus/client_golang/prometheus/testutil"
Expand Down Expand Up @@ -61,7 +62,8 @@ func TestMockChainAndPathProcessors(t *testing.T) {
clientUpdateThresholdTime := 6 * time.Hour
flushInterval := 6 * time.Hour

pathProcessor := processor.NewPathProcessor(log, pathEnd1, pathEnd2, metrics, "", clientUpdateThresholdTime, flushInterval)
pathProcessor := processor.NewPathProcessor(log, pathEnd1, pathEnd2, metrics, "",
clientUpdateThresholdTime, flushInterval, relayer.DefaultMaxMsgLength)

eventProcessor := processor.NewEventProcessor().
WithChainProcessors(
Expand Down
3 changes: 3 additions & 0 deletions relayer/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (c *Chain) CreateOpenChannels(
memo,
DefaultClientUpdateThreshold,
DefaultFlushInterval,
DefaultMaxMsgLength,
)

c.log.Info("Starting event processor for channel handshake",
Expand Down Expand Up @@ -131,6 +132,7 @@ func (c *Chain) CloseChannel(
memo,
DefaultClientUpdateThreshold,
DefaultFlushInterval,
DefaultMaxMsgLength,
)).
WithInitialBlockHistory(0).
WithMessageLifecycle(&processor.FlushLifecycle{}).
Expand Down Expand Up @@ -168,6 +170,7 @@ func (c *Chain) CloseChannel(
memo,
DefaultClientUpdateThreshold,
DefaultFlushInterval,
DefaultMaxMsgLength,
)).
WithInitialBlockHistory(0).
WithMessageLifecycle(&processor.ChannelCloseLifecycle{
Expand Down
1 change: 1 addition & 0 deletions relayer/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (c *Chain) CreateOpenConnections(
memo,
DefaultClientUpdateThreshold,
DefaultFlushInterval,
DefaultMaxMsgLength,
)

var connectionSrc, connectionDst string
Expand Down
28 changes: 22 additions & 6 deletions relayer/processor/path_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ const (
// Amount of time to wait for interchain queries.
interchainQueryTimeout = 60 * time.Second

// Amount of time between flushes if the previous flush failed.
flushFailureRetry = 15 * time.Second

// If message assembly fails from either proof query failure on the source
// or assembling the message for the destination, how many blocks should pass
// before retrying.
Expand Down Expand Up @@ -63,14 +66,16 @@ type PathProcessor struct {
messageLifecycle MessageLifecycle

initialFlushComplete bool
flushTicker *time.Ticker
flushTimer *time.Timer
flushInterval time.Duration

// Signals to retry.
retryProcess chan struct{}

sentInitialMsg bool

maxMsgs uint64

metrics *PrometheusMetrics
}

Expand All @@ -94,6 +99,7 @@ func NewPathProcessor(
memo string,
clientUpdateThresholdTime time.Duration,
flushInterval time.Duration,
maxMsgs uint64,
) *PathProcessor {
pp := &PathProcessor{
log: log,
Expand All @@ -104,6 +110,7 @@ func NewPathProcessor(
clientUpdateThresholdTime: clientUpdateThresholdTime,
flushInterval: flushInterval,
metrics: metrics,
maxMsgs: maxMsgs,
}
if flushInterval == 0 {
pp.disablePeriodicFlush()
Expand Down Expand Up @@ -264,6 +271,16 @@ func (pp *PathProcessor) HandleNewData(chainID string, cacheData ChainProcessorC
}
}

func (pp *PathProcessor) handleFlush(ctx context.Context) {
flushTimer := pp.flushInterval
if err := pp.flush(ctx); err != nil {
pp.log.Warn("Flush not complete", zap.Error(err))
flushTimer = flushFailureRetry
}
pp.flushTimer.Stop()
pp.flushTimer = time.NewTimer(flushTimer)
}

// processAvailableSignals will block if signals are not yet available, otherwise it will process one of the available signals.
// It returns whether or not the pathProcessor should quit.
func (pp *PathProcessor) processAvailableSignals(ctx context.Context, cancel func()) bool {
Expand All @@ -287,9 +304,9 @@ func (pp *PathProcessor) processAvailableSignals(ctx context.Context, cancel fun

case <-pp.retryProcess:
// No new data to merge in, just retry handling.
case <-pp.flushTicker.C:
case <-pp.flushTimer.C:
// Periodic flush to clear out any old packets
pp.flush(ctx)
pp.handleFlush(ctx)
}
return false
}
Expand All @@ -298,8 +315,7 @@ func (pp *PathProcessor) processAvailableSignals(ctx context.Context, cancel fun
func (pp *PathProcessor) Run(ctx context.Context, cancel func()) {
var retryTimer *time.Timer

pp.flushTicker = time.NewTicker(pp.flushInterval)
defer pp.flushTicker.Stop()
pp.flushTimer = time.NewTimer(time.Hour)

for {
// block until we have any signals to process
Expand All @@ -319,7 +335,7 @@ func (pp *PathProcessor) Run(ctx context.Context, cancel func()) {
}

if pp.shouldFlush() && !pp.initialFlushComplete {
pp.flush(ctx)
pp.handleFlush(ctx)
pp.initialFlushComplete = true
} else if pp.shouldTerminateForFlushComplete() {
cancel()
Expand Down
Loading
Loading