Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v24.2.x] rpk connect as a managed plugin #23351

Merged
36 changes: 10 additions & 26 deletions src/go/rpk/pkg/cli/cloud/byoc/uninstall.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
package byoc

import (
"fmt"
"os"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
Expand Down Expand Up @@ -38,33 +37,18 @@ disk space.
if !pluginExists {
out.Exit("The BYOC managed plugin is not installed!")
}
messages, anyFailed := removePluginAll(byoc)
for _, message := range messages {
fmt.Println(message)
}
if anyFailed {
os.Exit(1)
messages, anyFailed := byoc.Uninstall(true)
tw := out.NewTable("PATH", "MESSAGE")
defer func() {
tw.Flush()
if anyFailed {
os.Exit(1)
}
}()
for _, m := range messages {
tw.Print(m.Path, m.Message)
}
},
}
return cmd
}

func removePluginAll(p *plugin.Plugin) (messages []string, anyFailed bool) {
if err := os.Remove(p.Path); err != nil {
messages = append(messages, fmt.Sprintf("Unable to remove %q: %v", p.Path, err))
anyFailed = true
} else {
messages = append(messages, fmt.Sprintf("Removed %q", p.Path))
}

for _, shadowed := range p.ShadowedPaths {
if err := os.Remove(shadowed); err != nil {
messages = append(messages, fmt.Sprintf("Unable to remove shadowed at %q: %v", p.Path, err))
anyFailed = true
} else {
messages = append(messages, fmt.Sprintf("Remove shadowed at %q", p.Path))
}
}
return
}
112 changes: 112 additions & 0 deletions src/go/rpk/pkg/cli/connect/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package connect

import (
"context"
"errors"
"fmt"
"net/http"
"os"
"runtime"
"time"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/httpapi"
)

const pluginBaseURL = "https://rpk-plugins.redpanda.com"

type connectArtifact struct {
Path string `json:"path"`
Sha256 string `json:"sha256"`
}

type archive struct {
Version string `json:"version"`
IsLatest bool `json:"is_latest"`
Artifacts map[string]connectArtifact `json:"artifacts"`
}

type connectManifest struct {
Archives []archive `json:"archives"`
}

func (c *connectManifest) LatestArtifact() (connectArtifact, string, error) {
osArch := runtime.GOOS + "-" + runtime.GOARCH
for _, a := range c.Archives {
if a.IsLatest {
if artifact, ok := a.Artifacts[osArch]; ok {
return artifact, a.Version, nil
} else {
return connectArtifact{}, "", fmt.Errorf("no artifact found for os-arch: %s in our latest release. Please report this issue with Redpanda Support", osArch)
}
}
}
return connectArtifact{}, "", errors.New("no latest artifact found. Please report this issue with Redpanda Support")
}

func (c *connectManifest) ArtifactVersion(version string) (connectArtifact, error) {
osArch := runtime.GOOS + "-" + runtime.GOARCH
for _, a := range c.Archives {
if a.Version == version {
if artifact, ok := a.Artifacts[osArch]; ok {
return artifact, nil
} else {
return connectArtifact{}, fmt.Errorf("no artifact found for os-arch: %s in Redpanda Connect version %q. Please report this issue with Redpanda Support", osArch, version)
}
}
}
return connectArtifact{}, fmt.Errorf("unable to find version %q", version)
}

// connectRepoClient is a client to connect against our repository containing
// the Redpanda Connect packages.
type connectRepoClient struct {
cl *httpapi.Client
os string
arch string
}

func newRepoClient() (*connectRepoClient, error) {
timeout := 240 * time.Second
if t := os.Getenv("RPK_PLUGIN_DOWNLOAD_TIMEOUT"); t != "" {
duration, err := time.ParseDuration(t)
if err != nil {
return nil, fmt.Errorf("unable to parse RPK_PLUGIN_DOWNLOAD_TIMEOUT: %v", err)
}
timeout = duration
}
return &connectRepoClient{
cl: httpapi.NewClient(
httpapi.HTTPClient(&http.Client{
Timeout: timeout,
}),
),
os: runtime.GOOS,
arch: runtime.GOARCH,
}, nil
}

func (c *connectRepoClient) Manifest(ctx context.Context) (*connectManifest, error) {
var manifest connectManifest
err := c.cl.Get(ctx, fmt.Sprintf("%v/connect/manifest.json", getPluginURL()), nil, &manifest)
if err != nil {
return nil, fmt.Errorf("unable to retrieve Redpanda Connect manifest: %v", err)
}
return &manifest, nil
}

func getPluginURL() string {
url := pluginBaseURL
if repoURL := os.Getenv("RPK_PLUGIN_REPOSITORY"); repoURL != "" {
url = repoURL
}
return url
}
119 changes: 119 additions & 0 deletions src/go/rpk/pkg/cli/connect/connect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package connect

import (
"fmt"
"slices"
"strings"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/cobraext"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/plugin"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"go.uber.org/zap"
)

func init() {
plugin.RegisterManaged("connect", []string{"connect"}, func(cmd *cobra.Command, _ afero.Fs, p *config.Params) *cobra.Command {
run := cmd.Run
cmd.Run = func(cmd *cobra.Command, args []string) {
pluginArgs, err := parseConnectFlags(p, cmd, args)
out.MaybeDie(err, "unable to parse flags: %v", err)
run(cmd, pluginArgs)
}
return cmd
})
}

func NewCommand(fs afero.Fs, p *config.Params, execFn func(string, []string) error) *cobra.Command {
cmd := &cobra.Command{
Use: "connect",
Short: "A stream processor for mundane tasks - https://docs.redpanda.com/redpanda-connect",
DisableFlagParsing: true, // Required for managed plugins, we manually parse the flags.
Args: cobra.MinimumNArgs(0), // Connect can be run without commands.
Run: func(cmd *cobra.Command, args []string) {
pluginArgs, err := parseConnectFlags(p, cmd, args)
out.MaybeDie(err, "unable to parse flags: %v", err)
connect, pluginExists := plugin.ListPlugins(fs, plugin.UserPaths()).Find("connect")
var pluginPath string
if !pluginExists {
// If it doesn't exist we only download when the user runs a
// subcommand.
var isSubcommand bool
for _, arg := range pluginArgs {
switch {
case arg == "-c":
out.Die("-c flag is not supported by this command; run 'rpk connect run' instead")
case arg == "--version":
fmt.Println("cannot get connect version: rpk connect is not installed; run 'rpk connect install'")
cmd.Help()
return
case strings.HasPrefix(arg, "--") || strings.HasPrefix(arg, "-"):
continue
default:
isSubcommand = true
}
}
if !isSubcommand {
cmd.Help()
return
}
fmt.Println("Downloading latest Redpanda Connect")
path, _, err := installConnect(cmd.Context(), fs, "latest")
out.MaybeDie(err, "unable to install Redpanda Connect: %v; if running on an air-gapped environment you may install 'redpanda-connect' with your package manager.", err)
pluginPath = path
}
if pluginExists {
pluginPath = connect.Path
if !connect.Managed {
zap.L().Sugar().Warn("rpk is using a self-managed version of Redpanda Connect. If you want rpk to manage connect, use rpk connect uninstall && rpk connect install. To continue managing Connect manually, use our redpanda-connect package.")
}
}
zap.L().Debug("executing connect plugin", zap.String("path", pluginPath), zap.Strings("args", pluginArgs))
err = execFn(pluginPath, pluginArgs)
out.MaybeDie(err, "unable to execute redpanda connect plugin: %v", err)
},
}
cmd.AddCommand(
installCommand(fs),
uninstallCommand(fs),
upgradeCommand(fs),
)
return cmd
}

func parseConnectFlags(p *config.Params, cmd *cobra.Command, args []string) ([]string, error) {
f := cmd.Flags()

keepForPlugin, stripForRpk := cobraext.StripFlagset(args, f)
if err := f.Parse(stripForRpk); err != nil {
return nil, err
}
// Since we are manually parsing the flags, we need to force build the
// logger again.
zap.ReplaceGlobals(p.BuildLogger())
// We need to add back the Help and Version flags manually since we strip
// them for rpk.
if cobraext.LongFlagValue(args, f, "help", "h") == "true" && !slices.Contains(keepForPlugin, "--help") {
keepForPlugin = append(keepForPlugin, "--help")
}
// In rpk --verbose has a shorthand -v, in connect -v is used for version.
// This is _only_ valid for the 'connect' command:
isSubCommand := slices.ContainsFunc(keepForPlugin, func(s string) bool { return !strings.HasPrefix(s, "-") })
if cmd.Name() == "connect" && !isSubCommand {
if cobraext.LongFlagValue(args, f, "verbose", "v") == "true" && !slices.Contains(keepForPlugin, "--version") && !slices.Contains(keepForPlugin, "-v") {
keepForPlugin = append(keepForPlugin, "--version")
}
}
return keepForPlugin, nil
}
Loading
Loading