Skip to content

Commit

Permalink
Merge pull request #21311 from r-vasquez/rpk-quotas-import
Browse files Browse the repository at this point in the history
rpk: add `rpk cluster quotas import`
  • Loading branch information
r-vasquez authored Jul 12, 2024
2 parents 2662a25 + 6dd45a2 commit 4a06aa5
Show file tree
Hide file tree
Showing 7 changed files with 483 additions and 7 deletions.
12 changes: 6 additions & 6 deletions src/go/rpk/pkg/cli/cluster/quotas/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ import (
"github.com/twmb/franz-go/pkg/kadm"
)

type quotaValues struct {
type quotaValue struct {
Key string `json:"key" yaml:"key"`
Value string `json:"values" yaml:"values"`
Value string `json:"value" yaml:"value"`
}
type describedQuota struct {
Entity []entityData `json:"entity" yaml:"entity"`
Values []quotaValues `json:"values" yaml:"values"`
Entity []entityData `json:"entity" yaml:"entity"`
Values []quotaValue `json:"values" yaml:"values"`

entityStr string
}
Expand Down Expand Up @@ -137,9 +137,9 @@ func printDescribedQuotas(f config.OutFormatter, quotas []kadm.DescribedClientQu
var described []describedQuota
for _, q := range quotas {
entity, entityStr := parseEntityData(q.Entity)
var qv []quotaValues
var qv []quotaValue
for _, v := range q.Values {
qv = append(qv, quotaValues{
qv = append(qv, quotaValue{
Key: v.Key,
Value: strconv.FormatFloat(v.Value, 'f', -1, 64),
})
Expand Down
304 changes: 304 additions & 0 deletions src/go/rpk/pkg/cli/cluster/quotas/import.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,304 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package quotas

import (
"encoding/json"
"fmt"
"os"
"strconv"
"strings"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/kafka"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/types"
"gopkg.in/yaml.v3"
)

// quotasDiff represents a delta in the quotas after importing a quota.
type quotasDiff struct {
EntityStr string `json:"entity,omitempty" yaml:"entity,omitempty"`
QuotaType string `json:"quota-type,omitempty" yaml:"quota-type,omitempty"`
OldValue string `json:"old-value,omitempty" yaml:"old-value,omitempty"`
NewValue string `json:"new-value,omitempty" yaml:"new-value,omitempty"`
}

func importCommand(fs afero.Fs, p *config.Params) *cobra.Command {
var (
from string
noConfirm bool
)
cmd := &cobra.Command{
Use: "import",
Short: "Import client quotas",
Long: `Import client quotas.
Use this command to import client quotas in the format produced by
'rpk cluster quotas describe --format json/yaml'.
The schema of the import string matches the schema from
'rpk cluster quotas describe --format help':
{
quotas: []{
entity: []{
name: string
type: string
}
values: []{
key: string
values: string
}
}
}
Use the '--no-confirm' flag if you wish to avoid the confirmation prompt.
`,
Example: `
Import client quotas from a file:
rpk cluster quotas import --from /path/to/file
Import client quotas from a string:
rpk cluster quotas import --from '{"quotas":...}'
`,
Run: func(cmd *cobra.Command, args []string) {
f := p.Formatter
if h, ok := f.Help([]quotasDiff{}); ok {
out.Exit(h)
}

p, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "rpk unable to load config: %v", err)

adm, err := kafka.NewAdmin(fs, p)
out.MaybeDie(err, "unable to initialize kafka client: %v", err)
defer adm.Close()

var quotas describeResponse
var source []byte
// --from flag accepts either a file, or a string, we try first
// to read as a file as is the most expected usage.
file, err := afero.ReadFile(fs, from)
if err == nil {
source = file
} else {
if os.IsNotExist(err) || strings.Contains(err.Error(), "file name too long") {
source = []byte(from)
} else {
out.Exit("unable to read file: %v", err)
}
}
if err = json.Unmarshal(source, &quotas); err != nil {
yamlErr := yaml.Unmarshal(source, &quotas)
out.MaybeDie(yamlErr, "unable to parse quotas from %q: %v: %v", from, err, yamlErr)
}

importedQuotas, err := responseToDescribed(quotas)
out.MaybeDie(err, "unable to parse quotas: %v", err)

// Describe all quotas.
currentQuotas, err := adm.DescribeClientQuotas(cmd.Context(), false, []kadm.DescribeClientQuotaComponent{})
out.MaybeDie(err, "unable to describe client quotas: %v", err)

diff := calculateQuotasDiff(currentQuotas, importedQuotas)
if len(diff) == 0 {
out.Exit("No changes detected from import")
}
printDiff(f, diff)

if !noConfirm {
ok, err := out.Confirm("Confirm client quotas import above?")
out.MaybeDie(err, "unable to confirm deletion: %v", err)
if !ok {
out.Exit("Import canceled.")
}
}
_, err = adm.AlterClientQuotas(cmd.Context(), describedToAlterEntry(currentQuotas, importedQuotas))
out.MaybeDie(err, "unable to alter quotas: %v", err)

if f.Kind == "text" {
fmt.Println("Successfully imported the client quotas")
}
},
}

cmd.Flags().StringVar(&from, "from", "", "Either the quotas or a path to a file containing the quotas to import; check help text for more information")
cmd.Flags().BoolVar(&noConfirm, "no-confirm", false, "Disable confirmation prompt")

cmd.MarkFlagRequired("from")
return cmd
}

// responseToDescribed converts the describeResponse quota (imported source) to
// a []kadm.DescribedClientQuota.
func responseToDescribed(quotas describeResponse) ([]kadm.DescribedClientQuota, error) {
var resp []kadm.DescribedClientQuota
for _, q := range quotas.DescribedQuotas {
var (
entity []kadm.ClientQuotaEntityComponent
values []kadm.ClientQuotaValue
)
for _, e := range q.Entity {
e := e
entity = append(entity, kadm.ClientQuotaEntityComponent{
Type: e.Type,
Name: &e.Name,
})
}
for _, v := range q.Values {
floatVal, err := strconv.ParseFloat(v.Value, 64)
if err != nil {
return nil, fmt.Errorf("unable to parse client quota value %q: %v", v.Value, err)
}
values = append(values, kadm.ClientQuotaValue{
Key: v.Key,
Value: floatVal,
})
}
resp = append(resp, kadm.DescribedClientQuota{
Entity: entity,
Values: values,
})
}
return resp, nil
}

// describedToAlterEntry creates a []kadm.AlterClientQuotaEntry based on the
// toDelete and toAdd described client quotas. The entry can be used to issue
// an alter client quota request.
func describedToAlterEntry(toDelete, toAdd []kadm.DescribedClientQuota) []kadm.AlterClientQuotaEntry {
var entries []kadm.AlterClientQuotaEntry
addEntries := func(described []kadm.DescribedClientQuota, delete bool) {
for _, d := range described {
var (
entity []kadm.ClientQuotaEntityComponent
operations []kadm.AlterClientQuotaOp
)
for _, e := range d.Entity {
entity = append(entity, kadm.ClientQuotaEntityComponent{
Type: e.Type,
Name: e.Name,
})
}
for _, o := range d.Values {
if delete {
operations = append(operations, kadm.AlterClientQuotaOp{
Key: o.Key,
Remove: true,
})
} else {
operations = append(operations, kadm.AlterClientQuotaOp{
Key: o.Key,
Value: o.Value,
})
}
}
entries = append(entries, kadm.AlterClientQuotaEntry{
Entity: entity,
Ops: operations,
})
}
}
addEntries(toDelete, true)
addEntries(toAdd, false)
return entries
}

// calculateQuotasDiff calculates the diff between 'before' and 'after', any
// value that is not present will be marked as '-' in the result string.
func calculateQuotasDiff(before, after []kadm.DescribedClientQuota) []quotasDiff {
type delta struct {
oldValue string
newValue string
}
type quotaTypeMap map[string]delta

entityMap := make(map[string]quotaTypeMap)

// Fill the map with old values.
for _, q := range before {
e := q.Entity
types.Sort(e)
_, entityStr := parseEntityData(e)
qMap := entityMap[entityStr]
if qMap == nil {
qMap = quotaTypeMap{}
entityMap[entityStr] = qMap
}
for _, v := range q.Values {
qMap[v.Key] = delta{
oldValue: strconv.FormatFloat(v.Value, 'f', -1, 64),
}
}
}

// Update map with new values and track differences.
for _, q := range after {
e := q.Entity
types.Sort(e)
_, entityStr := parseEntityData(e)
qMap := entityMap[entityStr]
if qMap == nil {
qMap = quotaTypeMap{}
entityMap[entityStr] = qMap
}
for _, v := range q.Values {
newVal := strconv.FormatFloat(v.Value, 'f', -1, 64)
if d, exists := qMap[v.Key]; exists {
// If the value changed, we add it to the map.
if newVal != d.oldValue {
qMap[v.Key] = delta{oldValue: d.oldValue, newValue: newVal}
} else {
// If not, we remove it, we do nothing if the values are
// the same.
delete(qMap, v.Key)
}
} else {
qMap[v.Key] = delta{oldValue: "-", newValue: newVal}
}
}
}

// Prepare the result
var diffResult []quotasDiff
for entityStr, qMap := range entityMap {
for key, d := range qMap {
newValue := d.newValue
if newValue == "" {
newValue = "-"
}
diffResult = append(diffResult, quotasDiff{
EntityStr: entityStr,
QuotaType: key,
OldValue: d.oldValue,
NewValue: newValue,
})
}
}
return diffResult
}

func printDiff(f config.OutFormatter, diff []quotasDiff) {
if isText, _, formatted, err := f.Format(diff); !isText {
out.MaybeDie(err, "unable to print in the required format %q: %v", f.Kind, err)
fmt.Println(formatted)
return
}
tw := out.NewTable("ENTITY", "QUOTA-TYPE", "OLD-VALUE", "NEW-VALUE")
defer tw.Flush()

for _, d := range diff {
tw.PrintStructFields(d)
}
}
1 change: 1 addition & 0 deletions src/go/rpk/pkg/cli/cluster/quotas/quotas.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func NewCommand(fs afero.Fs, p *config.Params) *cobra.Command {
cmd.AddCommand(
alterCommand(fs, p),
describeCommand(fs, p),
importCommand(fs, p),
)
p.InstallKafkaFlags(cmd)
p.InstallFormatFlag(cmd)
Expand Down
13 changes: 13 additions & 0 deletions src/go/rpk/pkg/out/in.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,16 @@ func ParsePartitionString(ntp string) (ns, topic string, partitions []int, rerr
}
return ns, match[2], partitions, nil
}

// ParseFileOrStringFlag parses a flag string, if it starts with '@' it
// will treat it as a filepath and will attempt to read the file.
func ParseFileOrStringFlag(fs afero.Fs, flag string) ([]byte, error) {
if strings.HasPrefix(flag, "@") {
file, err := afero.ReadFile(fs, strings.TrimPrefix(flag, "@"))
if err != nil {
return nil, fmt.Errorf("unable to read file: %v", err)
}
return file, nil
}
return []byte(flag), nil
}
4 changes: 4 additions & 0 deletions tests/rptest/clients/rpk.py
Original file line number Diff line number Diff line change
Expand Up @@ -1910,6 +1910,10 @@ def alter_cluster_quotas(self,
output_format=output_format,
node=node)

def import_cluster_quota(self, source, output_format="json"):
cmd = ["import", "--no-confirm", "--from", source]
return self._run_cluster_quotas(cmd, output_format=output_format)

def _run_cluster_quotas(self,
cmd,
output_format="json",
Expand Down
2 changes: 1 addition & 1 deletion tests/rptest/tests/quota_management_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def controller_mutation_rate(value):

@classmethod
def from_dict(cls, d: dict):
return cls(key=QuotaValueType(d['key']), values=d['values'])
return cls(key=QuotaValueType(d['key']), values=d['value'])


class Quota(NamedTuple):
Expand Down
Loading

0 comments on commit 4a06aa5

Please sign in to comment.