diff --git a/tools/cosmovisor/CHANGELOG.md b/tools/cosmovisor/CHANGELOG.md index a4c78e469c45..1443435deb22 100644 --- a/tools/cosmovisor/CHANGELOG.md +++ b/tools/cosmovisor/CHANGELOG.md @@ -36,6 +36,10 @@ Ref: https://keepachangelog.com/en/1.0.0/ ## [Unreleased] +### Features + +* [#21790](https://github.com/cosmos/cosmos-sdk/pull/21790) Add `add-batch-upgrade` command. + ### Improvements * [#21462](https://github.com/cosmos/cosmos-sdk/pull/21462) Pass `stdin` to binary. diff --git a/tools/cosmovisor/args.go b/tools/cosmovisor/args.go index be94fbadaf6c..fb1b4a5f5b18 100644 --- a/tools/cosmovisor/args.go +++ b/tools/cosmovisor/args.go @@ -38,6 +38,7 @@ const ( EnvTimeFormatLogs = "COSMOVISOR_TIMEFORMAT_LOGS" EnvCustomPreupgrade = "COSMOVISOR_CUSTOM_PREUPGRADE" EnvDisableRecase = "COSMOVISOR_DISABLE_RECASE" + EnvCometBftRpcEndpoint = "COMETBFT_RPC_ENDPOINT" ) const ( @@ -68,6 +69,7 @@ type Config struct { TimeFormatLogs string `toml:"cosmovisor_timeformat_logs" mapstructure:"cosmovisor_timeformat_logs" default:"kitchen"` CustomPreUpgrade string `toml:"cosmovisor_custom_preupgrade" mapstructure:"cosmovisor_custom_preupgrade" default:""` DisableRecase bool `toml:"cosmovisor_disable_recase" mapstructure:"cosmovisor_disable_recase" default:"false"` + CometBftRpcEndpoint string `toml:"combetbft_rpc_endpoint" mapstructure:"combetbft_rpc_endpoint" default:"http://localhost:26657"` // currently running upgrade currentUpgrade upgradetypes.Plan @@ -109,6 +111,11 @@ func (cfg *Config) UpgradeInfoFilePath() string { return filepath.Join(cfg.Home, "data", upgradetypes.UpgradeInfoFilename) } +// UpgradeInfoBatchFilePath is the same as UpgradeInfoFilePath but with a batch suffix. +func (cfg *Config) UpgradeInfoBatchFilePath() string { + return cfg.UpgradeInfoFilePath() + ".batch" +} + // SymLinkToGenesis creates a symbolic link from "./current" to the genesis directory. func (cfg *Config) SymLinkToGenesis() (string, error) { genesis := filepath.Join(cfg.Root(), genesisDir) @@ -207,16 +214,21 @@ func GetConfigFromFile(filePath string) (*Config, error) { func GetConfigFromEnv(skipValidate bool) (*Config, error) { var errs []error cfg := &Config{ - Home: os.Getenv(EnvHome), - Name: os.Getenv(EnvName), - DataBackupPath: os.Getenv(EnvDataBackupPath), - CustomPreUpgrade: os.Getenv(EnvCustomPreupgrade), + Home: os.Getenv(EnvHome), + Name: os.Getenv(EnvName), + DataBackupPath: os.Getenv(EnvDataBackupPath), + CustomPreUpgrade: os.Getenv(EnvCustomPreupgrade), + CometBftRpcEndpoint: os.Getenv(EnvCometBftRpcEndpoint), } if cfg.DataBackupPath == "" { cfg.DataBackupPath = cfg.Home } + if cfg.CometBftRpcEndpoint == "" { + cfg.CometBftRpcEndpoint = "http://localhost:26657" + } + var err error if cfg.AllowDownloadBinaries, err = BooleanOption(EnvDownloadBin, false); err != nil { errs = append(errs, err) @@ -548,6 +560,7 @@ func (cfg Config) DetailString() string { {EnvTimeFormatLogs, cfg.TimeFormatLogs}, {EnvCustomPreupgrade, cfg.CustomPreUpgrade}, {EnvDisableRecase, fmt.Sprintf("%t", cfg.DisableRecase)}, + {EnvCometBftRpcEndpoint, cfg.CometBftRpcEndpoint}, } derivedEntries := []struct{ name, value string }{ diff --git a/tools/cosmovisor/args_test.go b/tools/cosmovisor/args_test.go index f121a3989d9f..e32e1658bb07 100644 --- a/tools/cosmovisor/args_test.go +++ b/tools/cosmovisor/args_test.go @@ -477,6 +477,7 @@ var newConfig = func( CustomPreUpgrade: customPreUpgrade, DisableRecase: disableRecase, ShutdownGrace: time.Duration(shutdownGrace), + CometBftRpcEndpoint: "http://localhost:26657", } } diff --git a/tools/cosmovisor/cmd/cosmovisor/add_upgrade.go b/tools/cosmovisor/cmd/cosmovisor/add_upgrade.go index a197f33bc1dc..9c544aa60fc2 100644 --- a/tools/cosmovisor/cmd/cosmovisor/add_upgrade.go +++ b/tools/cosmovisor/cmd/cosmovisor/add_upgrade.go @@ -19,7 +19,7 @@ func NewAddUpgradeCmd() *cobra.Command { Short: "Add APP upgrade binary to cosmovisor", SilenceUsage: true, Args: cobra.ExactArgs(2), - RunE: AddUpgrade, + RunE: AddUpgradeCmd, } addUpgrade.Flags().Bool(cosmovisor.FlagForce, false, "overwrite existing upgrade binary / upgrade-info.json file") @@ -28,26 +28,14 @@ func NewAddUpgradeCmd() *cobra.Command { return addUpgrade } -// AddUpgrade adds upgrade info to manifest -func AddUpgrade(cmd *cobra.Command, args []string) error { - configPath, err := cmd.Flags().GetString(cosmovisor.FlagCosmovisorConfig) - if err != nil { - return fmt.Errorf("failed to get config flag: %w", err) - } - - cfg, err := cosmovisor.GetConfigFromFile(configPath) - if err != nil { - return err - } - +// addUpgrade adds upgrade info to manifest +func addUpgrade(cfg *cosmovisor.Config, force bool, upgradeHeight int64, upgradeName, executablePath, upgradeInfoPath string) error { logger := cfg.Logger(os.Stdout) - upgradeName := args[0] if !cfg.DisableRecase { - upgradeName = strings.ToLower(args[0]) + upgradeName = strings.ToLower(upgradeName) } - executablePath := args[1] if _, err := os.Stat(executablePath); err != nil { if os.IsNotExist(err) { return fmt.Errorf("invalid executable path: %w", err) @@ -68,11 +56,6 @@ func AddUpgrade(cmd *cobra.Command, args []string) error { return fmt.Errorf("failed to read binary: %w", err) } - force, err := cmd.Flags().GetBool(cosmovisor.FlagForce) - if err != nil { - return fmt.Errorf("failed to get force flag: %w", err) - } - if err := saveOrAbort(cfg.UpgradeBin(upgradeName), executableData, force); err != nil { return err } @@ -80,9 +63,7 @@ func AddUpgrade(cmd *cobra.Command, args []string) error { logger.Info(fmt.Sprintf("Using %s for %s upgrade", executablePath, upgradeName)) logger.Info(fmt.Sprintf("Upgrade binary located at %s", cfg.UpgradeBin(upgradeName))) - if upgradeHeight, err := cmd.Flags().GetInt64(cosmovisor.FlagUpgradeHeight); err != nil { - return fmt.Errorf("failed to get upgrade-height flag: %w", err) - } else if upgradeHeight > 0 { + if upgradeHeight > 0 { plan := upgradetypes.Plan{Name: upgradeName, Height: upgradeHeight} if err := plan.ValidateBasic(); err != nil { panic(fmt.Errorf("something is wrong with cosmovisor: %w", err)) @@ -94,16 +75,52 @@ func AddUpgrade(cmd *cobra.Command, args []string) error { return fmt.Errorf("failed to marshal upgrade plan: %w", err) } - if err := saveOrAbort(cfg.UpgradeInfoFilePath(), planData, force); err != nil { + if err := saveOrAbort(upgradeInfoPath, planData, force); err != nil { return err } - logger.Info(fmt.Sprintf("%s created, %s upgrade binary will switch at height %d", cfg.UpgradeInfoFilePath(), upgradeName, upgradeHeight)) + logger.Info(fmt.Sprintf("%s created, %s upgrade binary will switch at height %d", upgradeInfoPath, upgradeName, upgradeHeight)) } return nil } +// GetConfig returns a Config using passed-in flag +func getConfigFromCmd(cmd *cobra.Command) (*cosmovisor.Config, error) { + configPath, err := cmd.Flags().GetString(cosmovisor.FlagCosmovisorConfig) + if err != nil { + return nil, fmt.Errorf("failed to get config flag: %w", err) + } + + cfg, err := cosmovisor.GetConfigFromFile(configPath) + if err != nil { + return nil, err + } + return cfg, nil +} + +// AddUpgradeCmd parses input flags and adds upgrade info to manifest +func AddUpgradeCmd(cmd *cobra.Command, args []string) error { + cfg, err := getConfigFromCmd(cmd) + if err != nil { + return err + } + + upgradeName, executablePath := args[0], args[1] + + force, err := cmd.Flags().GetBool(cosmovisor.FlagForce) + if err != nil { + return fmt.Errorf("failed to get force flag: %w", err) + } + + upgradeHeight, err := cmd.Flags().GetInt64(cosmovisor.FlagUpgradeHeight) + if err != nil { + return fmt.Errorf("failed to get upgrade-height flag: %w", err) + } + + return addUpgrade(cfg, force, upgradeHeight, upgradeName, executablePath, cfg.UpgradeInfoFilePath()) +} + // saveOrAbort saves data to path or aborts if file exists and force is false func saveOrAbort(path string, data []byte, force bool) error { if _, err := os.Stat(path); err == nil { diff --git a/tools/cosmovisor/cmd/cosmovisor/batch_upgrade.go b/tools/cosmovisor/cmd/cosmovisor/batch_upgrade.go new file mode 100644 index 000000000000..a482953a3c41 --- /dev/null +++ b/tools/cosmovisor/cmd/cosmovisor/batch_upgrade.go @@ -0,0 +1,80 @@ +package main + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "strconv" + "strings" + + "github.com/spf13/cobra" +) + +func NewBatchAddUpgradeCmd() *cobra.Command { + return &cobra.Command{ + Use: "add-batch-upgrade :: .. ::", + Short: "Add APP upgrades binary to cosmovisor", + SilenceUsage: true, + Args: cobra.MinimumNArgs(1), + RunE: AddBatchUpgrade, + } +} + +// AddBatchUpgrade takes in multiple specified upgrades and creates a single +// batch upgrade file out of them +func AddBatchUpgrade(cmd *cobra.Command, args []string) error { + cfg, err := getConfigFromCmd(cmd) + if err != nil { + return err + } + upgradeInfoPaths := []string{} + for i, as := range args { + a := strings.Split(as, ":") + if len(a) != 3 { + return fmt.Errorf("argument at position %d (%s) is invalid", i, as) + } + upgradeName := filepath.Base(a[0]) + upgradePath := a[1] + upgradeHeight, err := strconv.ParseInt(a[2], 10, 64) + if err != nil { + return fmt.Errorf("upgrade height at position %d (%s) is invalid", i, a[2]) + } + upgradeInfoPath := filepath.Join(cfg.UpgradeInfoFilePath(), upgradeName) + upgradeInfoPaths = append(upgradeInfoPaths, upgradeInfoPath) + if err := addUpgrade(cfg, true, upgradeHeight, upgradeName, upgradePath, upgradeInfoPath); err != nil { + return err + } + } + + var allData []json.RawMessage + for _, uip := range upgradeInfoPaths { + fileData, err := os.ReadFile(uip) + if err != nil { + return fmt.Errorf("error reading file %s: %w", uip, err) + } + + // Verify it's valid JSON + var jsonData json.RawMessage + if err := json.Unmarshal(fileData, &jsonData); err != nil { + return fmt.Errorf("error parsing JSON from file %s: %w", uip, err) + } + + // Add to our slice + allData = append(allData, jsonData) + } + + // Marshal the combined data + batchData, err := json.MarshalIndent(allData, "", " ") + if err != nil { + return fmt.Errorf("error marshaling combined JSON: %w", err) + } + + // Write to output file + err = os.WriteFile(cfg.UpgradeInfoBatchFilePath(), batchData, 0o600) + if err != nil { + return fmt.Errorf("error writing combined JSON to file: %w", err) + } + + return nil +} diff --git a/tools/cosmovisor/cmd/cosmovisor/root.go b/tools/cosmovisor/cmd/cosmovisor/root.go index d9f6094d593c..ea64c9d4b0f5 100644 --- a/tools/cosmovisor/cmd/cosmovisor/root.go +++ b/tools/cosmovisor/cmd/cosmovisor/root.go @@ -19,6 +19,7 @@ func NewRootCmd() *cobra.Command { configCmd, NewVersionCmd(), NewAddUpgradeCmd(), + NewBatchAddUpgradeCmd(), ) rootCmd.PersistentFlags().StringP(cosmovisor.FlagCosmovisorConfig, "c", "", "path to cosmovisor config file") diff --git a/tools/cosmovisor/go.mod b/tools/cosmovisor/go.mod index 008d8f1eae5a..e11d5c7ec5cc 100644 --- a/tools/cosmovisor/go.mod +++ b/tools/cosmovisor/go.mod @@ -5,6 +5,8 @@ go 1.23 require ( cosmossdk.io/log v1.4.1 cosmossdk.io/x/upgrade v0.1.4 + github.com/cometbft/cometbft v0.38.9 + github.com/fsnotify/fsnotify v1.7.0 github.com/otiai10/copy v1.14.0 github.com/pelletier/go-toml/v2 v2.2.3 github.com/spf13/cobra v1.8.1 @@ -45,7 +47,6 @@ require ( github.com/cockroachdb/pebble v1.1.0 // indirect github.com/cockroachdb/redact v1.1.5 // indirect github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect - github.com/cometbft/cometbft v0.38.9 // indirect github.com/cometbft/cometbft-db v0.12.0 // indirect github.com/cosmos/btcutil v1.0.5 // indirect github.com/cosmos/cosmos-db v1.0.2 // indirect @@ -68,7 +69,6 @@ require ( github.com/emicklei/dot v1.6.2 // indirect github.com/fatih/color v1.17.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect - github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/getsentry/sentry-go v0.28.0 // indirect github.com/go-kit/kit v0.13.0 // indirect github.com/go-kit/log v0.2.1 // indirect diff --git a/tools/cosmovisor/process.go b/tools/cosmovisor/process.go index ac759bde39f4..4e7223d4e97f 100644 --- a/tools/cosmovisor/process.go +++ b/tools/cosmovisor/process.go @@ -1,19 +1,25 @@ package cosmovisor import ( + "context" "encoding/json" "errors" "fmt" "io" + "net/http" "os" "os/exec" "os/signal" "path/filepath" + "sort" "strconv" "strings" "syscall" "time" + cmthttp "github.com/cometbft/cometbft/rpc/client/http" + cmttypes "github.com/cometbft/cometbft/types" + "github.com/fsnotify/fsnotify" "github.com/otiai10/copy" "cosmossdk.io/log" @@ -36,6 +42,140 @@ func NewLauncher(logger log.Logger, cfg *Config) (Launcher, error) { return Launcher{logger: logger, cfg: cfg, fw: fw}, nil } +// loadBatchUpgradeFile loads the batch upgrade file into memory, sorted by +// their upgrade heights +func loadBatchUpgradeFile(cfg *Config) ([]upgradetypes.Plan, error) { + var uInfos []upgradetypes.Plan + upgradeInfoFile, err := os.ReadFile(cfg.UpgradeInfoBatchFilePath()) + if os.IsNotExist(err) { + return uInfos, nil + } else if err != nil { + return nil, fmt.Errorf("error while reading %s: %w", cfg.UpgradeInfoBatchFilePath(), err) + } + + if err = json.Unmarshal(upgradeInfoFile, &uInfos); err != nil { + return nil, err + } + sort.Slice(uInfos, func(i, j int) bool { + return uInfos[i].Height < uInfos[j].Height + }) + return uInfos, nil +} + +// BatchUpgradeWatcher starts a watcher loop that swaps upgrade manifests at the correct +// height, given the batch upgrade file. It watches the current state of the chain +// via the websocket API. +func BatchUpgradeWatcher(ctx context.Context, cfg *Config, logger log.Logger) { + // load batch file in memory + uInfos, err := loadBatchUpgradeFile(cfg) + if err != nil { + logger.Warn("failed to load batch upgrade file", "error", err) + return + } + + watcher, err := fsnotify.NewWatcher() + if err != nil { + logger.Warn(fmt.Sprintf("failed to init watcher: %s", err)) + return + } + defer watcher.Close() + err = watcher.Add(filepath.Dir(cfg.UpgradeInfoBatchFilePath())) + if err != nil { + logger.Warn("failed to init watcher", "error", err) + return + } + + // Wait for the chain process to be ready +pollLoop: + for { + select { + case <-ctx.Done(): + return + default: + _, err := http.Get(cfg.CometBftRpcEndpoint) + if err == nil { + break pollLoop + } + time.Sleep(time.Second) + } + } + client, err := cmthttp.New(cfg.CometBftRpcEndpoint, "/websocket") + if err != nil { + logger.Warn("failed to create CometBFT client", "error", err) + return + } + defer func() { + if err := client.Stop(); err != nil { + logger.Warn("couldn't stop CometBFT client", "errror", err) + } + }() + + err = client.Start() + if err != nil { + logger.Warn("failed to start CometBFT client", "error", err) + return + } + + eventCh, err := client.Subscribe(ctx, "cosmovisor-watcher", cmttypes.EventQueryNewBlock.String()) + if err != nil { + logger.Warn("failed to subscribe to new blocks", "error", err) + return + } + + var prevUpgradeHeight int64 = -1 + + logger.Info("starting the batch watcher loop") + for { + select { + case e := <-eventCh: + if len(uInfos) == 0 { + continue + } + nb, ok := e.Data.(cmttypes.EventDataNewBlock) + if !ok { + logger.Warn("batch watcher: unexpected event data type", "eventData", e.Data) + continue + } + h := nb.Block.Height + upcomingUpgrade := uInfos[0].Height + // replace upgrade-info and upgrade-info batch file + if h > prevUpgradeHeight && h < upcomingUpgrade { + jsonBytes, err := json.Marshal(uInfos[0]) + if err != nil { + logger.Warn("error marshaling JSON", "error", err) + return + } + if err := os.WriteFile(cfg.UpgradeInfoFilePath(), jsonBytes, 0o600); err != nil { + logger.Warn("error writing upgrade-info.json", "error", err) + return + } + uInfos = uInfos[1:] + + jsonBytes, err = json.Marshal(uInfos) + if err != nil { + logger.Warn("error marshaling JSON", "error", err) + return + } + if err := os.WriteFile(cfg.UpgradeInfoBatchFilePath(), jsonBytes, 0o600); err != nil { + logger.Warn("error writing upgrade-info.json.batch", "error", err) + return + } + prevUpgradeHeight = upcomingUpgrade + } + case event := <-watcher.Events: + if event.Op&(fsnotify.Write|fsnotify.Create) != 0 { + uInfos, err = loadBatchUpgradeFile(cfg) + if err != nil { + logger.Warn("failed to load batch upgrade file", "error", err) + return + } + } + case <-ctx.Done(): + return + } + } +} + // Run launches the app in a subprocess and returns when the subprocess (app) // exits (either when it dies, or *after* a successful upgrade.) and upgrade finished. // Returns true if the upgrade request was detected and the upgrade process started. @@ -58,16 +198,21 @@ func (l Launcher) Run(args []string, stdin io.Reader, stdout, stderr io.Writer) return false, fmt.Errorf("launching process %s %s failed: %w", bin, strings.Join(args, " "), err) } + ctx, cancel := context.WithCancel(context.Background()) + sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGQUIT, syscall.SIGTERM) go func() { sig := <-sigs + cancel() if err := cmd.Process.Signal(sig); err != nil { l.logger.Error("terminated", "error", err, "bin", bin) os.Exit(1) } }() + go BatchUpgradeWatcher(ctx, l.cfg, l.logger) + if needsUpdate, err := l.WaitForUpgradeOrExit(cmd); err != nil || !needsUpdate { return false, err }