Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v23.1.x] k8s: parse configuration arrays correctly #11012

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"
"time"

"github.com/moby/moby/pkg/namesgenerator"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand All @@ -27,8 +29,6 @@ import (
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin"

"github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
"github.com/redpanda-data/redpanda/src/go/k8s/internal/testutils"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/configuration"
Expand Down Expand Up @@ -243,6 +243,33 @@ var _ = Describe("RedPandaCluster configuration controller", func() {
Consistently(annotationGetter(key, &appsv1.StatefulSet{}, configMapHashKey), timeoutShort, intervalShort).Should(Equal(hash))
Consistently(annotationGetter(key, &appsv1.StatefulSet{}, centralizedConfigurationHashKey), timeoutShort, intervalShort).Should(BeEmpty())

By("Adding configuration of array type")
Expect(k8sClient.Get(context.Background(), key, &cluster)).To(Succeed())

testAdminAPI.RegisterPropertySchema("kafka_nodelete_topics", admin.ConfigPropertyMetadata{NeedsRestart: false, Type: "array", Items: admin.ConfigPropertyItems{Type: "string"}})
cluster.Spec.AdditionalConfiguration["redpanda.kafka_nodelete_topics"] = "[_internal_connectors_configs _internal_connectors_offsets _internal_connectors_status _audit __consumer_offsets _redpanda_e2e_probe _schemas]"
Expect(k8sClient.Update(context.Background(), &cluster)).To(Succeed())

Eventually(testAdminAPI.PropertyGetter("kafka_nodelete_topics")).Should(Equal([]string{
"__consumer_offsets",
"_audit",
"_internal_connectors_configs",
"_internal_connectors_offsets",
"_internal_connectors_status",
"_redpanda_e2e_probe",
"_schemas",
}))

patches = testAdminAPI.PatchesGetter()()
Expect(patches).NotTo(BeEmpty())
expectedPatch := patches[len(patches)-1]
Expect(expectedPatch).NotTo(BeNil())
arrayInterface := expectedPatch.Upsert["kafka_nodelete_topics"]
Expect(arrayInterface).NotTo(BeNil())
Expect(reflect.TypeOf(arrayInterface).String()).To(Equal("[]interface {}"))
Expect(len(arrayInterface.([]interface{})) == 7).To(BeTrue())
Eventually(clusterConfiguredConditionStatusGetter(key), timeout, interval).Should(BeTrue())

By("Deleting the cluster")
Expect(k8sClient.Delete(context.Background(), redpandaCluster, deleteOptions)).Should(Succeed())
testutils.DeleteAllInNamespace(testEnv, k8sClient, namespace)
Expand Down
52 changes: 51 additions & 1 deletion src/go/k8s/pkg/resources/configuration/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@ import (
"encoding/json"
"fmt"
"math"
"reflect"
"sort"
"strconv"
"strings"

"github.com/go-logr/logr"
"gopkg.in/yaml.v3"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin"
)

Expand Down Expand Up @@ -76,7 +79,8 @@ func ThreeWayMerge(
}
for k, v := range apply {
if oldValue, ok := current[k]; !ok || !PropertiesEqual(log, v, oldValue, schema[k]) {
patch.Upsert[k] = v
metadata := schema[k]
patch.Upsert[k] = parseConfigValueBeforeUpsert(log, v, &metadata)
}
}
invalidSet := make(map[string]bool, len(invalidProperties))
Expand All @@ -95,6 +99,44 @@ func ThreeWayMerge(
return patch
}

func parseConfigValueBeforeUpsert(log logr.Logger, value interface{}, metadata *admin.ConfigPropertyMetadata) interface{} {
tempValue := fmt.Sprintf("%v", value)

//nolint:gocritic // no need to be a switch case
if metadata.Nullable && tempValue == "null" {
return nil
} else if metadata.Type != "string" && (tempValue == "") {
log.Info(fmt.Sprintf("Non-string types that receive an empty string: %v", value))
return value
} else if metadata.Type == "array" { //nolint:goconst // we wish to be explicit here
a, err := convertStringToStringArray(tempValue)
if err != nil {
log.Info(fmt.Sprintf("could not parse: invalid list syntax: %v", value))
return value
}
return a
}

return value
}

func convertStringToStringArray(value string) ([]string, error) {
a := make([]string, 0)
err := yaml.Unmarshal([]byte(value), &a)

if len(a) == 1 {
// it is possible this was not comma separated, so let's make it so and retry unmarshalling
b := make([]string, 0)
errB := yaml.Unmarshal([]byte(strings.ReplaceAll(value, " ", ",")), &b)
if errB == nil && len(b) > len(a) {
sort.Strings(b)
return b, errB
}
}
sort.Strings(a)
return a, err
}

// PropertiesEqual tries to compare two property values using metadata information about the schema,
// falling back to loose comparison in case of missing data (e.g. it happens with unknown properties).
//
Expand All @@ -113,6 +155,14 @@ func PropertiesEqual(
return i1 == i2
}
log.Info(schemaMismatchInfoLog, "type", metadata.Type, "v1", v1, "v2", v2)
case "array":
v1Parsed, errV1 := convertStringToStringArray(fmt.Sprintf("%v", v1))
v2Parsed, errV2 := convertStringToStringArray(fmt.Sprintf("%v", v2))
if errV1 == nil && errV2 == nil {
// must be sorted the same way otherwise the return will be false even though they contain the same items
return reflect.DeepEqual(v1Parsed, v2Parsed)
}
log.Info(fmt.Sprintf("error occurred trying to parse configurations: %s, %s", errV1, errV2), "type", metadata.Type, "v1", v1, "v2", v2)
}
// Other cases are correctly managed by LooseEqual
return LooseEqual(v1, v2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ spec:
redpanda.internal_topic_replication_factor: "3" # this is a change
pandaproxy_client.retries: "11"
schema_registry.schema_registry_api: "[{'name':'external','address':'0.0.0.0','port':8081}]"
redpanda.kafka_nodelete_topics: "[_internal_connectors_configs _internal_connectors_offsets _internal_connectors_status _audit __consumer_offsets _redpanda_e2e_probe _schemas]"