Skip to content

Commit

Permalink
new scale_handler test, add certificate for grpc clients, comment cle…
Browse files Browse the repository at this point in the history
…anup
  • Loading branch information
gauron99 committed Jul 17, 2023
1 parent 322ad90 commit 8ffd497
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 22 deletions.
5 changes: 4 additions & 1 deletion apis/keda/v1alpha1/scaledobject_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,14 @@ type ComplexScalingLogic struct {

// ExternalCalculation structure describes name and URL of a gRPC server
// that KEDA can connect to with collected metrics and modify them. Each server
// has a timeout and optional Fallback functionality.
// has a timeout and optional Fallback functionality and tls certification. If
// certDir is left empty, it will connect with insecure.NewCredentials()
type ExternalCalculation struct {
Name string `json:"name"`
URL string `json:"url"`
Timeout string `json:"timeout"`
// +optional
CertificateDirectory string `json:"certDir"`
}

// HorizontalPodAutoscalerConfig specifies horizontal scale config
Expand Down
56 changes: 54 additions & 2 deletions pkg/externalscaling/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@ package externalscaling

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"path"
"time"

"github.com/go-logr/logr"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/external_metrics"
Expand All @@ -20,7 +25,7 @@ type GrpcClient struct {
Connection *grpc.ClientConn
}

func NewGrpcClient(url string) (*GrpcClient, error) {
func NewGrpcClient(url string, certDir string) (*GrpcClient, error) {
retryPolicy := `{
"methodConfig": [{
"timeout": "3s",
Expand All @@ -37,9 +42,22 @@ func NewGrpcClient(url string) (*GrpcClient, error) {
grpc.WithDefaultServiceConfig(retryPolicy),
grpc.WithTransportCredentials(insecure.NewCredentials()),
}

// if certDir is not empty, load certificates
if certDir != "" {
creds, err := loadCertificates(certDir)
if err != nil {
return nil, fmt.Errorf("externalCalculator error while creating new client: %w", err)
}
opts = []grpc.DialOption{
grpc.WithDefaultServiceConfig(retryPolicy),
grpc.WithTransportCredentials(creds),
}
}

conn, err := grpc.Dial(url, opts...)
if err != nil {
return nil, fmt.Errorf("error in grpc.Dial: %w", err)
return nil, fmt.Errorf("externalCalculator error while creating new client: %w", err)
}

return &GrpcClient{Client: cl.NewExternalCalculationClient(conn), Connection: conn}, nil
Expand Down Expand Up @@ -116,3 +134,37 @@ func (c *GrpcClient) CloseConnection() error {
}
return nil
}

// load certificates taken from a directory given as an argument
// expects ca.crt, tls.crt and tls.key to be present in the directory
func loadCertificates(certDir string) (credentials.TransportCredentials, error) {
// Load certificate of the CA who signed client's certificate
pemClientCA, err := os.ReadFile(path.Join(certDir, "ca.crt"))
if err != nil {
return nil, err
}

// Get the SystemCertPool, continue with an empty pool on error
certPool, _ := x509.SystemCertPool()
if certPool == nil {
certPool = x509.NewCertPool()
}
if !certPool.AppendCertsFromPEM(pemClientCA) {
return nil, fmt.Errorf("failed to add client CA's certificate")
}

// Load certificate and private key
cert, err := tls.LoadX509KeyPair(path.Join(certDir, "tls.crt"), path.Join(certDir, "tls.key"))
if err != nil {
return nil, err
}

// Create the credentials and return it
config := &tls.Config{
MinVersion: tls.VersionTLS13,
Certificates: []tls.Certificate{cert},
}
config.RootCAs = certPool

return credentials.NewTLS(config), nil
}
7 changes: 0 additions & 7 deletions pkg/fallback/fallback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,13 +350,6 @@ var _ = Describe("fallback", func() {
// fallback for ComplexScalingLogic ExternalCalculators
// ---------------------------------------------------------------------------

// It("should return error when ec-fallback is enabled but ExternalCalculator returns error", func() {
// todo
// })

// It("should return error when ec-fallback is enabled but ExternalCalculator returns empty list", func() {
// todo
// })
// --- set condition to false ---
// invalid FailureThreshold eg. < 0
It("should set the ec-fallback condition to false if the Fallback FailureThreshold is invalid", func() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/scaling/cache/scalers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (c *ScalersCache) RefreshExternalCalcClientsCache(ctx context.Context, so *
}
timeout = int64(parsedTime.Seconds())
}
ecClient, err := externalscaling.NewGrpcClient(ec.URL)
ecClient, err := externalscaling.NewGrpcClient(ec.URL, ec.CertificateDirectory)
var connected bool
if err != nil {
log.Error(err, fmt.Sprintf("error creating new grpc client for external calculator at %s", ec.URL))
Expand Down
2 changes: 1 addition & 1 deletion pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func (h *scaleHandler) performGetScalersCache(ctx context.Context, key string, s
}
timeout = int64(parsedTime.Seconds())
}
ecClient, err := externalscaling.NewGrpcClient(ec.URL)
ecClient, err := externalscaling.NewGrpcClient(ec.URL, ec.CertificateDirectory)

var connected bool
if err != nil {
Expand Down
156 changes: 146 additions & 10 deletions pkg/scaling/scale_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,15 +623,19 @@ func createScaler(ctrl *gomock.Controller, queueLength int64, averageValue int64
return scaler
}

// createMetricSpec creates MetricSpec for given metric name and target value.
// -----------------------------------------------------------------------------
// test for complexScalingLogic formula & external calculators
// -----------------------------------------------------------------------------

const triggerName1 = "trigger_one"
const triggerName2 = "trigger_two"
const metricName1 = "metric_one"
const metricName2 = "metric_two"

func TestComplexScalingFormula(t *testing.T) {
scaledObjectName := testNameGlobal
scaledObjectNamespace := testNamespaceGlobal
compositeMetricName := compositeMetricNameGlobal
metricName1 := "metric_one"
metricName2 := "metric_two"
triggerName1 := "trigger_one"
triggerName2 := "trigger_two"

ctrl := gomock.NewController(t)
recorder := record.NewFakeRecorder(1)
Expand Down Expand Up @@ -668,7 +672,7 @@ func TestComplexScalingFormula(t *testing.T) {
Advanced: &kedav1alpha1.AdvancedConfig{
ComplexScalingLogic: kedav1alpha1.ComplexScalingLogic{
Target: "2",
Formula: "trigger_one + trigger_two",
Formula: fmt.Sprintf("%s + %s", triggerName1, triggerName2),
},
},
Triggers: []kedav1alpha1.ScaleTriggers{
Expand Down Expand Up @@ -738,10 +742,6 @@ func TestComplexScalingExternalCalculator(t *testing.T) {
scaledObjectName := testNameGlobal
scaledObjectNamespace := testNamespaceGlobal
compositeMetricName := compositeMetricNameGlobal
metricName1 := "metric_one"
metricName2 := "metric_two"
triggerName1 := "trigger_one"
triggerName2 := "trigger_two"

ctrl := gomock.NewController(t)
recorder := record.NewFakeRecorder(1)
Expand Down Expand Up @@ -860,6 +860,142 @@ func TestComplexScalingExternalCalculator(t *testing.T) {
assert.Equal(t, float64(5), metrics.Items[0].Value.AsApproximateFloat64())
}

// test external calculator fallback logic in GetScaledObjectMetrics
func TestComplexScalingExternalCalculatorFallback(t *testing.T) {
scaledObjectName := testNameGlobal
scaledObjectNamespace := testNamespaceGlobal
compositeMetricName := compositeMetricNameGlobal

ctrl := gomock.NewController(t)
recorder := record.NewFakeRecorder(1)
mockClient := mock_client.NewMockClient(ctrl)
mockExecutor := mock_executor.NewMockScaleExecutor(ctrl)
mockStatusWriter := mock_client.NewMockStatusWriter(ctrl)

metricsSpecs1 := []v2.MetricSpec{createMetricSpec(2, metricName1)}
metricsSpecs2 := []v2.MetricSpec{createMetricSpec(8, metricName2)}
metricValue1 := scalers.GenerateMetricInMili(metricName1, float64(2))
metricValue2 := scalers.GenerateMetricInMili(metricName2, float64(8))

scaler1 := mock_scalers.NewMockScaler(ctrl)
scaler2 := mock_scalers.NewMockScaler(ctrl)
// dont use cached metrics
scalerConfig1 := scalers.ScalerConfig{TriggerUseCachedMetrics: false, TriggerName: triggerName1, ScalerIndex: 0}
scalerConfig2 := scalers.ScalerConfig{TriggerUseCachedMetrics: false, TriggerName: triggerName2, ScalerIndex: 1}
factory1 := func() (scalers.Scaler, *scalers.ScalerConfig, error) {
return scaler1, &scalerConfig1, nil
}
factory2 := func() (scalers.Scaler, *scalers.ScalerConfig, error) {
return scaler2, &scalerConfig2, nil
}

ecClient := mock_ec.NewMockExternalCalculationClient(ctrl)
numOfFailures := int32(5)

scaledObject := kedav1alpha1.ScaledObject{
ObjectMeta: metav1.ObjectMeta{
Name: scaledObjectName,
Namespace: scaledObjectNamespace,
},
Spec: kedav1alpha1.ScaledObjectSpec{
ScaleTargetRef: &kedav1alpha1.ScaleTarget{
Name: "test",
},
Advanced: &kedav1alpha1.AdvancedConfig{
ComplexScalingLogic: kedav1alpha1.ComplexScalingLogic{
Target: "2",
ExternalCalculations: []kedav1alpha1.ExternalCalculation{
{Name: "fake_calc", URL: "fake_url", Timeout: "10s"},
},
Fallback: &kedav1alpha1.Fallback{
FailureThreshold: 3,
Replicas: int32(6),
},
},
},
Triggers: []kedav1alpha1.ScaleTriggers{
{Name: triggerName1, Type: "fake_trig1"},
{Name: triggerName2, Type: "fake_trig2"},
},
},
Status: kedav1alpha1.ScaledObjectStatus{
ScaleTargetGVKR: &kedav1alpha1.GroupVersionKindResource{
Group: "apps",
Kind: "Deployment",
},
ExternalMetricNames: []string{metricName1, metricName2},
ExternalCalculationHealth: map[string]kedav1alpha1.HealthStatus{
"fake_calc": {
NumberOfFailures: &numOfFailures,
Status: kedav1alpha1.HealthStatusFailing,
},
},
},
}

scalerCache := cache.ScalersCache{
ScaledObject: &scaledObject,
Scalers: []cache.ScalerBuilder{{
Scaler: scaler1,
ScalerConfig: scalerConfig1,
Factory: factory1,
},
{
Scaler: scaler2,
ScalerConfig: scalerConfig2,
Factory: factory2,
},
},
Recorder: recorder,
ExternalCalculationGrpcClients: []cache.ExternalCalculationClient{
{Name: "fake_calc", Client: &externalscaling.GrpcClient{Client: ecClient}, Connected: true},
},
}

caches := map[string]*cache.ScalersCache{}
caches[scaledObject.GenerateIdentifier()] = &scalerCache

sh := scaleHandler{
client: mockClient,
scaleLoopContexts: &sync.Map{},
scaleExecutor: mockExecutor,
globalHTTPTimeout: time.Duration(1000),
recorder: recorder,
scalerCaches: caches,
scalerCachesLock: &sync.RWMutex{},
scaledObjectsMetricCache: metricscache.NewMetricsCache(),
}

mockClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
scaler1.EXPECT().GetMetricSpecForScaling(gomock.Any()).Return(metricsSpecs1)
scaler2.EXPECT().GetMetricSpecForScaling(gomock.Any()).Return(metricsSpecs2)
scaler1.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Any()).Return([]external_metrics.ExternalMetricValue{metricValue1, metricValue2}, true, nil)
scaler2.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Any()).Return([]external_metrics.ExternalMetricValue{metricValue1, metricValue2}, true, nil)
mockExecutor.EXPECT().RequestScale(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any())
sh.checkScalers(context.TODO(), &scaledObject, &sync.RWMutex{})

mockClient.EXPECT().Status().Return(mockStatusWriter).Times(3)
mockStatusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(3)
scaler1.EXPECT().GetMetricSpecForScaling(gomock.Any()).Return(metricsSpecs1)
scaler2.EXPECT().GetMetricSpecForScaling(gomock.Any()).Return(metricsSpecs2)
scaler1.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Any()).Return([]external_metrics.ExternalMetricValue{metricValue1, metricValue2}, true, nil)
scaler2.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Any()).Return([]external_metrics.ExternalMetricValue{metricValue1, metricValue2}, true, nil)

// return error in calculate for externalCalculator
ecClient.EXPECT().Calculate(context.TODO(), gomock.Any()).Return(&ec.Response{List: nil, Error: "error in calculate"}, fmt.Errorf("error in calculate")).Times(2)

ecRes, err := ecClient.Calculate(context.Background(), &ec.MetricsList{MetricValues: []*ec.Metric{{Name: "one", Value: 2}, {Name: "two", Value: 8}}})
assert.NotNil(t, err)
assert.Equal(t, fmt.Errorf("error in calculate"), err)
assert.Equal(t, "error in calculate", ecRes.Error)

metrics, err := sh.GetScaledObjectMetrics(context.TODO(), scaledObjectName, scaledObjectNamespace, compositeMetricName)
assert.Nil(t, err)
// fallback set to 6 replicas, target is 2 -> value should be 12
assert.Equal(t, float64(12), metrics.Items[0].Value.AsApproximateFloat64())
}

// createMetricSpec creates MetricSpec for given metric name and target value.
func createMetricSpec(averageValue int64, metricName string) v2.MetricSpec {
qty := resource.NewQuantity(averageValue, resource.DecimalSI)
return v2.MetricSpec{
Expand Down

0 comments on commit 8ffd497

Please sign in to comment.