Skip to content

Commit

Permalink
k8s: add broker tls config to pp/sr client
Browse files Browse the repository at this point in the history
  • Loading branch information
alenkacz committed Dec 19, 2022
1 parent 3f699ef commit 06d17f8
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 5 deletions.
7 changes: 7 additions & 0 deletions src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,13 @@ type TLSConfig struct {

// Kafka API

// IsNodeCertificateSelfSigned whether we use generated self-signed cert
//
//nolint:gocritic // TODO KafkaAPI is now 81 bytes, consider a pointer
func (k KafkaAPI) IsNodeCertificateSelfSigned() bool {
return k.TLS.Enabled && k.TLS.IssuerRef == nil && k.TLS.NodeSecretRef == nil
}

// GetPort returns API port
//
//nolint:gocritic // TODO KafkaAPI is now 81 bytes, consider a pointer
Expand Down
13 changes: 13 additions & 0 deletions src/go/k8s/controllers/redpanda/cluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ var _ = Describe("RedPandaCluster controller", func() {
corev1.VolumeMount{Name: "tlscert", MountPath: "/etc/tls/certs"},
corev1.VolumeMount{Name: "tlsca", MountPath: "/etc/tls/certs/ca"},
))
fmt.Printf("%v", sts.Spec.Template.Spec.Volumes)
Expect(sts.Spec.Template.Spec.Volumes).Should(
ContainElements(
corev1.Volume{
Expand All @@ -338,6 +339,10 @@ var _ = Describe("RedPandaCluster controller", func() {
Key: "tls.crt",
Path: "tls.crt",
},
{
Key: "ca.crt",
Path: "ca.crt",
},
},
DefaultMode: &defaultMode,
},
Expand All @@ -353,6 +358,14 @@ var _ = Describe("RedPandaCluster controller", func() {
Key: "ca.crt",
Path: "ca.crt",
},
{
Key: "tls.key",
Path: "tls.key",
},
{
Key: "tls.crt",
Path: "tls.crt",
},
},
DefaultMode: &defaultMode,
},
Expand Down
47 changes: 42 additions & 5 deletions src/go/k8s/pkg/resources/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func (r *ConfigMapResource) CreateConfiguration(

r.preparePandaproxy(&cfg.NodeConfiguration)
r.preparePandaproxyTLS(&cfg.NodeConfiguration, mountPoints)
err := r.preparePandaproxyClient(ctx, cfg)
err := r.preparePandaproxyClient(ctx, cfg, mountPoints)
if err != nil {
return nil, err
}
Expand All @@ -362,7 +362,7 @@ func (r *ConfigMapResource) CreateConfiguration(
}
}
r.prepareSchemaRegistryTLS(&cfg.NodeConfiguration, mountPoints)
err = r.prepareSchemaRegistryClient(ctx, cfg)
err = r.prepareSchemaRegistryClient(ctx, cfg, mountPoints)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -490,11 +490,16 @@ func (r *ConfigMapResource) preparePandaproxy(cfgRpk *config.Config) {
}

func (r *ConfigMapResource) preparePandaproxyClient(
ctx context.Context, cfg *configuration.GlobalConfiguration,
ctx context.Context, cfg *configuration.GlobalConfiguration, mountPoints *resourcetypes.TLSMountPoints,
) error {
if internal := r.pandaCluster.PandaproxyAPIInternal(); internal == nil {
return nil
}
kafkaInternal := r.pandaCluster.InternalListener()
if kafkaInternal == nil {
r.logger.Error(errors.New("pandaproxy is missing internal kafka listener. This state is forbidden by the webhook"), "") //nolint:goerr113 // no need for static error
return nil
}

replicas := r.pandaCluster.GetCurrentReplicas()
cfg.NodeConfiguration.PandaproxyClient = &config.KafkaClient{}
Expand All @@ -505,6 +510,11 @@ func (r *ConfigMapResource) preparePandaproxyClient(
})
}

clientBrokerTLS := prepareClientBrokerTLS(kafkaInternal, mountPoints)
if clientBrokerTLS != nil {
cfg.NodeConfiguration.PandaproxyClient.BrokerTLS = *clientBrokerTLS
}

if !r.pandaCluster.IsSASLOnInternalEnabled() {
return nil
}
Expand All @@ -523,17 +533,39 @@ func (r *ConfigMapResource) preparePandaproxyClient(
cfg.NodeConfiguration.PandaproxyClient.SCRAMUsername = &username
cfg.NodeConfiguration.PandaproxyClient.SCRAMPassword = &password
cfg.NodeConfiguration.PandaproxyClient.SASLMechanism = &mechanism

// Add username as superuser
return cfg.AppendToAdditionalRedpandaProperty(superusersConfigurationKey, username)
}

func prepareClientBrokerTLS(kafkaListener *redpandav1alpha1.KafkaAPI, mountPoints *resourcetypes.TLSMountPoints) *config.ServerTLS {
if !kafkaListener.TLS.Enabled {
return nil
}
result := config.ServerTLS{
Enabled: true,
}
if kafkaListener.TLS.RequireClientAuth {
result.KeyFile = fmt.Sprintf("%s/%s", mountPoints.KafkaAPI.ClientCAMountDir, corev1.TLSPrivateKeyKey)
result.CertFile = fmt.Sprintf("%s/%s", mountPoints.AdminAPI.ClientCAMountDir, corev1.TLSCertKey)
}
if kafkaListener.IsNodeCertificateSelfSigned() {
// we need to also include the node ca since the node cert is self-signed
result.TruststoreFile = fmt.Sprintf("%s/%s", mountPoints.KafkaAPI.NodeCertMountDir, cmetav1.TLSCAKey)
}
return &result
}

func (r *ConfigMapResource) prepareSchemaRegistryClient(
ctx context.Context, cfg *configuration.GlobalConfiguration,
ctx context.Context, cfg *configuration.GlobalConfiguration, mountPoints *resourcetypes.TLSMountPoints,
) error {
if r.pandaCluster.Spec.Configuration.SchemaRegistry == nil {
return nil
}
kafkaInternal := r.pandaCluster.InternalListener()
if kafkaInternal == nil {
r.logger.Error(errors.New("pandaproxy is missing internal kafka listener. This state is forbidden by the webhook"), "") //nolint:goerr113 // no need for static error
return nil
}

replicas := r.pandaCluster.GetCurrentReplicas()
cfg.NodeConfiguration.SchemaRegistryClient = &config.KafkaClient{}
Expand All @@ -544,6 +576,11 @@ func (r *ConfigMapResource) prepareSchemaRegistryClient(
})
}

clientBrokerTLS := prepareClientBrokerTLS(kafkaInternal, mountPoints)
if clientBrokerTLS != nil {
cfg.NodeConfiguration.SchemaRegistryClient.BrokerTLS = *clientBrokerTLS
}

if !r.pandaCluster.IsSASLOnInternalEnabled() {
return nil
}
Expand Down
48 changes: 48 additions & 0 deletions src/go/k8s/pkg/resources/configmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/go-logr/logr"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -289,3 +290,50 @@ func TestConfigMapResource_prepareSeedServerList(t *testing.T) {
})
}
}

func TestConfigmap_BrokerTLSClients(t *testing.T) {
panda := pandaCluster().DeepCopy()
panda.Spec.Configuration.KafkaAPI[0].TLS = redpandav1alpha1.KafkaAPITLS{
Enabled: true,
RequireClientAuth: true,
}
panda.Spec.Configuration.SchemaRegistry = &redpandav1alpha1.SchemaRegistryAPI{
Port: 8081,
}
panda.Spec.Configuration.PandaproxyAPI = []redpandav1alpha1.PandaproxyAPI{
{Port: 8082},
}
c := fake.NewClientBuilder().Build()
secret := v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "archival",
Namespace: "default",
},
Data: map[string][]byte{
"archival": []byte("XXX"),
},
}
require.NoError(t, c.Create(context.TODO(), &secret))
cfgRes := resources.NewConfigMap(
c,
panda,
scheme.Scheme,
"cluster.local",
types.NamespacedName{Name: "test", Namespace: "test"},
types.NamespacedName{Name: "test", Namespace: "test"},
ctrl.Log.WithName("test"))
require.NoError(t, cfgRes.Ensure(context.TODO()))

actual := &v1.ConfigMap{}
err := c.Get(context.Background(), cfgRes.Key(), actual)
require.NoError(t, err)
data := actual.Data["redpanda.yaml"]
cfg := &config.Config{}
require.NoError(t, yaml.Unmarshal([]byte(data), cfg))
require.Equal(t, "/etc/tls/certs/ca/tls.key", cfg.PandaproxyClient.BrokerTLS.KeyFile)
require.Equal(t, "/etc/tls/certs/ca/tls.crt", cfg.PandaproxyClient.BrokerTLS.CertFile)
require.Equal(t, "/etc/tls/certs/ca.crt", cfg.PandaproxyClient.BrokerTLS.TruststoreFile)
require.Equal(t, "/etc/tls/certs/ca/tls.key", cfg.SchemaRegistryClient.BrokerTLS.KeyFile)
require.Equal(t, "/etc/tls/certs/ca/tls.crt", cfg.SchemaRegistryClient.BrokerTLS.CertFile)
require.Equal(t, "/etc/tls/certs/ca.crt", cfg.SchemaRegistryClient.BrokerTLS.TruststoreFile)
}

0 comments on commit 06d17f8

Please sign in to comment.