Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add complex scaling logic for custom formula & external scaling via grpc server #4583

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
### Improvements

- **General:**: Add ScaledObject/ScaledJob names to output of `kubectl get triggerauthentication/clustertriggerauthentication` ([#796](https://github.com/kedacore/keda/issues/796))

- **General**: Add ComplexScalingLogic structure to SO for advanced scaling options ([#3567](https://github.com/kedacore/keda/issues/3567)) and ([#2440](https://github.com/kedacore/keda/issues/2440))
### Fixes

- **General**: Metrics server exposes Prometheus metrics ([#4776](https://github.com/kedacore/keda/issues/4776))
Expand Down
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,16 @@ proto-gen: protoc-gen ## Generate Liiklus, ExternalScaler and MetricsService pro
PATH="$(LOCALBIN):$(PATH)" protoc -I vendor --proto_path=pkg/metricsservice/api metrics.proto --go_out=pkg/metricsservice/api --go-grpc_out=pkg/metricsservice/api

.PHONY: mockgen-gen
mockgen-gen: mockgen pkg/mock/mock_scaling/mock_interface.go pkg/mock/mock_scaling/mock_executor/mock_interface.go pkg/mock/mock_scaler/mock_scaler.go pkg/mock/mock_scale/mock_interfaces.go pkg/mock/mock_client/mock_interfaces.go pkg/scalers/liiklus/mocks/mock_liiklus.go pkg/mock/mock_secretlister/mock_interfaces.go
mockgen-gen: mockgen pkg/mock/mock_scaling/mock_interface.go pkg/mock/mock_scaling/mock_executor/mock_interface.go pkg/mock/mock_scaler/mock_scaler.go pkg/mock/mock_scale/mock_interfaces.go pkg/mock/mock_client/mock_interfaces.go pkg/scalers/liiklus/mocks/mock_liiklus.go pkg/mock/mock_secretlister/mock_interfaces.go pkg/mock/mock_externalscaling/mock_externalscaling.go

pkg/mock/mock_scaling/mock_interface.go: pkg/scaling/scale_handler.go
$(MOCKGEN) -destination=$@ -package=mock_scaling -source=$^
pkg/mock/mock_scaling/mock_executor/mock_interface.go: pkg/scaling/executor/scale_executor.go
$(MOCKGEN) -destination=$@ -package=mock_executor -source=$^
pkg/mock/mock_scaler/mock_scaler.go: pkg/scalers/scaler.go
$(MOCKGEN) -destination=$@ -package=mock_scalers -source=$^
pkg/mock/mock_externalscaling/mock_externalscaling.go: pkg/externalscaling/api/externalCalculation_grpc.pb.go
$(MOCKGEN) -destination=$@ -package=mock_externalscaling -source=$^
pkg/mock/mock_secretlister/mock_interfaces.go: vendor/k8s.io/client-go/listers/core/v1/secret.go
mkdir -p pkg/mock/mock_secretlister
$(MOCKGEN) k8s.io/client-go/listers/core/v1 SecretLister,SecretNamespaceLister > $@
Expand Down
28 changes: 26 additions & 2 deletions apis/keda/v1alpha1/condition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const (
ConditionActive ConditionType = "Active"
// ConditionFallback specifies that the resource has a fallback active.
ConditionFallback ConditionType = "Fallback"
// ConditionExternalFallback specifies that the resource has external fallback active.
ConditionExternalFallback ConditionType = "ExternalFallback"
// ConditionPaused specifies that the resource is paused.
ConditionPaused ConditionType = "Paused"
)
Expand Down Expand Up @@ -88,6 +90,8 @@ func (c *Conditions) AreInitialized() bool {
foundActive := false
foundFallback := false
foundPaused := false
foundExternalFallback := false

if *c != nil {
for _, condition := range *c {
if condition.Type == ConditionReady {
Expand All @@ -113,14 +117,19 @@ func (c *Conditions) AreInitialized() bool {
break
}
}
for _, condition := range *c {
if condition.Type == ConditionExternalFallback {
foundExternalFallback = true
}
}
}

return foundReady && foundActive && foundFallback && foundPaused
return foundReady && foundActive && foundFallback && foundPaused && foundExternalFallback
}

// GetInitializedConditions returns Conditions initialized to the default -> Status: Unknown
func GetInitializedConditions() *Conditions {
return &Conditions{{Type: ConditionReady, Status: metav1.ConditionUnknown}, {Type: ConditionActive, Status: metav1.ConditionUnknown}, {Type: ConditionFallback, Status: metav1.ConditionUnknown}, {Type: ConditionPaused, Status: metav1.ConditionUnknown}}
return &Conditions{{Type: ConditionReady, Status: metav1.ConditionUnknown}, {Type: ConditionActive, Status: metav1.ConditionUnknown}, {Type: ConditionFallback, Status: metav1.ConditionUnknown}, {Type: ConditionPaused, Status: metav1.ConditionUnknown}, {Type: ConditionExternalFallback, Status: metav1.ConditionUnknown}}
}

// IsTrue is true if the condition is True
Expand Down Expand Up @@ -171,6 +180,14 @@ func (c *Conditions) SetFallbackCondition(status metav1.ConditionStatus, reason
c.setCondition(ConditionFallback, status, reason, message)
}

// SetExternalFallbackCondition modifies ExternalFallback Condition according to input parameters (for ExternalCalculators)
func (c *Conditions) SetExternalFallbackCondition(status metav1.ConditionStatus, reason string, message string) {
if *c == nil {
c = GetInitializedConditions()
}
c.setCondition(ConditionExternalFallback, status, reason, message)
}

// SetPausedCondition modifies Paused Condition according to input parameters
func (c *Conditions) SetPausedCondition(status metav1.ConditionStatus, reason string, message string) {
if *c == nil {
Expand Down Expand Up @@ -211,6 +228,13 @@ func (c *Conditions) GetPausedCondition() Condition {
return c.getCondition(ConditionPaused)
}

func (c *Conditions) GetExternalFallbackCondition() Condition {
if *c == nil {
c = GetInitializedConditions()
}
return c.getCondition(ConditionExternalFallback)
}

func (c Conditions) getCondition(conditionType ConditionType) Condition {
for i := range c {
if c[i].Type == conditionType {
Expand Down
29 changes: 29 additions & 0 deletions apis/keda/v1alpha1/scaledobject_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,31 @@ type AdvancedConfig struct {
HorizontalPodAutoscalerConfig *HorizontalPodAutoscalerConfig `json:"horizontalPodAutoscalerConfig,omitempty"`
// +optional
RestoreToOriginalReplicaCount bool `json:"restoreToOriginalReplicaCount,omitempty"`
// +optional
ComplexScalingLogic ComplexScalingLogic `json:"complexScalingLogic,omitempty"`
}

// ComplexScalingLogic describes advanced scaling logic options like formula
// and gRPC server for external calculations
type ComplexScalingLogic struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was a point from @JorTurFer kedacore/keda-docs#1189 (comment) that this structure could use a better name - something like modifiers in yaml file. In such case i think it'd be good to change this one as well

// +optional
ExternalCalculations []ExternalCalculation `json:"externalCalculators,omitempty"`
// +optional
Formula string `json:"formula,omitempty"`
// +optional
Target string `json:"target,omitempty"`
}

// ExternalCalculation structure describes name and URL of a gRPC server
// that KEDA can connect to with collected metrics and modify them. Each server
// has a timeout and tls certification. If certDir is left empty, it will
// connect with insecure.NewCredentials()
type ExternalCalculation struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this struct as well

Name string `json:"name"`
URL string `json:"url"`
Timeout string `json:"timeout"`
// +optional
CertificateDirectory string `json:"certDir"`
}

// HorizontalPodAutoscalerConfig specifies horizontal scale config
Expand Down Expand Up @@ -141,6 +166,10 @@ type ScaledObjectStatus struct {
// +optional
ResourceMetricNames []string `json:"resourceMetricNames,omitempty"`
// +optional
CompositeScalerName string `json:"compositeScalerName,omitempty"`
// +optional
ExternalCalculationHealth map[string]HealthStatus `json:"externalCalculationHealth,omitempty"`
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i created a new structure for grpc servers to have their own health status. Option number 2 could be to add them to already existing health status and prefix them with something like external-calculator or modifier depending on name of struct (see above). Or not prefix it at all and just keep it how it is but all with one health struct

// +optional
Conditions Conditions `json:"conditions,omitempty"`
// +optional
Health map[string]HealthStatus `json:"health,omitempty"`
Expand Down
120 changes: 120 additions & 0 deletions apis/keda/v1alpha1/scaledobject_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ package v1alpha1
import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
"strconv"
"time"

appsv1 "k8s.io/api/apps/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2"
Expand Down Expand Up @@ -213,6 +217,16 @@ func verifyScaledObjects(incomingSo *ScaledObject, action string) error {
}
}

// verify ComplexScalingLogic structure if defined in ScaledObject
if incomingSo.Spec.Advanced != nil && !reflect.DeepEqual(incomingSo.Spec.Advanced.ComplexScalingLogic, ComplexScalingLogic{}) {
_, _, err = ValidateComplexScalingLogic(incomingSo, []autoscalingv2.MetricSpec{})
if err != nil {
scaledobjectlog.Error(err, "error validating ComplexScalingLogic")
prommetrics.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "complex-scaling-logic")

return err
}
}
return nil
}

Expand Down Expand Up @@ -297,3 +311,109 @@ func verifyCPUMemoryScalers(incomingSo *ScaledObject, action string) error {
}
return nil
}

// ValidateComplexScalingLogic validates all combinations of given arguments
// and their values
func ValidateComplexScalingLogic(so *ScaledObject, specs []autoscalingv2.MetricSpec) (float64, autoscalingv2.MetricTargetType, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

im not so sure these validation funcs should be in apis/keda/v1alpha/scaledobject_webhook.go but wasnt sure where to put them. there is dependency cycle if its in hpa.go (where the other call to this function is) because it imports kedav1alpha1

csl := so.Spec.Advanced.ComplexScalingLogic

// if Formula AND ExternalCalculations is empty, return an error
if csl.Formula == "" && len(csl.ExternalCalculations) < 1 {
return -1, autoscalingv2.MetricTargetType(""), fmt.Errorf("error atleast one ComplexScalingLogic function needs to be specified (formula or externalCalculation)")
}

var num float64
var metricType autoscalingv2.MetricTargetType

// validate formula if not empty
if err := validateCSLformula(so); err != nil {
err := errors.Join(fmt.Errorf("error validating formula in ComplexScalingLogic"), err)
return -1, autoscalingv2.MetricTargetType(""), err
}
// validate externalCalculators if not empty
if err := validateCSLexternalCalculations(csl); err != nil {
err := errors.Join(fmt.Errorf("error validating externalCalculator in ComplexScalingLogic"), err)
return -1, autoscalingv2.MetricTargetType(""), err
}
// validate target if not empty
num, metricType, err := validateCSLtarget(csl, specs)
if err != nil {
err := errors.Join(fmt.Errorf("error validating target in ComplexScalingLogic"), err)
return -1, autoscalingv2.MetricTargetType(""), err
}
return num, metricType, nil
}

func validateCSLformula(so *ScaledObject) error {
csl := so.Spec.Advanced.ComplexScalingLogic

// if formula is empty, nothing to validate
if csl.Formula == "" {
return nil
}
// formula needs target because it's always transformed to Composite scaler
if csl.Target == "" {
return fmt.Errorf("formula is given but target is empty")
}

// possible TODO: this could be more soffisticated - only check for names that
// are used in the formula itself. This would require parsing the formula.
for _, trig := range so.Spec.Triggers {
if trig.Name == "" {
return fmt.Errorf("trigger of type '%s' has empty name but csl.Formula is defined", trig.Type)
}
}
if len(csl.ExternalCalculations) > 0 {
if csl.ExternalCalculations[len(csl.ExternalCalculations)-1].Name == "" {
return fmt.Errorf("last externalCalculator has empty name but csl.Formula is defined")
}
}
return nil
}

func validateCSLexternalCalculations(cls ComplexScalingLogic) error {
// timeout check
for _, ec := range cls.ExternalCalculations {
_, err := strconv.ParseInt(ec.Timeout, 10, 64)
if err != nil {
// expect timeout in time format like 1m10s
_, err = time.ParseDuration(ec.Timeout)
if err != nil {
return fmt.Errorf("%s: error while converting type of timeout for external calculator", err)
}
}
if ec.URL == "" {
return fmt.Errorf("URL is empty for externalCalculator '%s'", ec.Name)
}
}

return nil
}

func validateCSLtarget(csl ComplexScalingLogic, specs []autoscalingv2.MetricSpec) (float64, autoscalingv2.MetricTargetType, error) {
if csl.Target == "" {
return -1, "", nil
}
// convert string to float
num, err := strconv.ParseFloat(csl.Target, 64)
if err != nil || num <= 0.0 {
return -1, "", fmt.Errorf("error converting target for complex logic (string->float) to valid target: %w", err)
}

var metricType autoscalingv2.MetricTargetType
// if target is given, composite scaler for metric collection will be
// passed to HPA config -> all types need to be the same
// make sure all scalers have the same metricTargetType
for _, metric := range specs {
if metric.External == nil {
continue
}
if metricType == "" {
metricType = metric.External.Target.Type
} else if metric.External.Target.Type != metricType {
err := fmt.Errorf("error metric target type not the same for composite scaler: %s & %s", metricType, metric.External.Target.Type)
return -1, "", err
}
}
return num, metricType, nil
}
Loading
Loading