Skip to content

Commit

Permalink
to pull
Browse files Browse the repository at this point in the history
  • Loading branch information
gauron99 committed Jun 19, 2023
1 parent 97584e6 commit d647b9a
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 64 deletions.
9 changes: 5 additions & 4 deletions apis/keda/v1alpha1/scaledobject_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,22 +109,23 @@ type AdvancedConfig struct {
// and gRPC server for external calculations
type ComplexScalingLogic struct {
// +optional
ExternalCalculations []ExternalCalculation `json:"externalCalculator,omitempty"`
ExternalCalculations []ExternalCalculation `json:"externalCalculators,omitempty"`
// +optional
Formula string `json:"formula,omitempty"`
// +optional
Target string `json:"target,omitempty"`
}

// ExternalCalculation structure describes names and URLs of a gRPC server
// ExternalCalculation structure describes name and URL of a gRPC server
// that KEDA can connect to with collected metrics and modify them. Each server
// has a timeout and optional Fallback functionality.
// has a timeout and optional Fallback replica count. It is not a complete
// fallback functionality because the threshold should be always 1.
type ExternalCalculation struct {
Name string `json:"name"`
URL string `json:"url"`
Timeout string `json:"timeout"`
// +optional
Fallback Fallback `json:"fallback,omitempty"`
FallbackReplicas int32 `json:"fallbackReplicas,omitempty"`
}

// HorizontalPodAutoscalerConfig specifies horizontal scale config
Expand Down
8 changes: 7 additions & 1 deletion apis/keda/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 45 additions & 15 deletions pkg/externalscaling/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package externalscaling
import (
"context"
"fmt"
"strconv"
"time"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -56,14 +57,17 @@ func (c *GrpcClient) Calculate(ctx context.Context, list *cl.MetricsList, logger

// WaitForConnectionReady waits for gRPC connection to be ready
// returns true if the connection was successful, false if we hit a timeut from context
func (c *GrpcClient) WaitForConnectionReady(ctx context.Context, logger logr.Logger) bool {
func (c *GrpcClient) WaitForConnectionReady(ctx context.Context, url string, timeout time.Duration, logger logr.Logger) bool {
currentState := c.connection.GetState()
if currentState != connectivity.Ready {
logger.Info("Waiting for establishing a gRPC connection to server")
logger.Info(fmt.Sprintf("Waiting for establishing a gRPC connection to server for external calculator at %s", url))
timer := time.After(timeout)
for {
select {
case <-ctx.Done():
return false
case <-timer:
return false
default:
c.connection.Connect()
time.Sleep(500 * time.Millisecond)
Expand All @@ -79,34 +83,60 @@ func (c *GrpcClient) WaitForConnectionReady(ctx context.Context, logger logr.Log

// ConvertToGeneratedStruct converts K8s external metrics list to gRPC generated
// external metrics list
func ConvertToGeneratedStruct(inK8sList []external_metrics.ExternalMetricValue) (outExternal *cl.MetricsList) {
listStruct := cl.MetricsList{}
func ConvertToGeneratedStruct(inK8sList []external_metrics.ExternalMetricValue, l logr.Logger) *cl.MetricsList {
outExternal := cl.MetricsList{}
for _, val := range inK8sList {
// if value is 0, its empty in the list
metric := &cl.Metric{Name: val.MetricName, Value: float32(val.Value.Value())}
listStruct.MetricValues = append(listStruct.MetricValues, metric)
metric := cl.Metric{Name: val.MetricName, Value: float32(val.Value.Value())}
outExternal.MetricValues = append(outExternal.MetricValues, &metric)
}
return
return &outExternal
}

// ConvertFromGeneratedStruct converts gRPC generated external metrics list to
// K8s external_metrics list
func ConvertFromGeneratedStruct(inExternal *cl.MetricsList) (outK8sList []external_metrics.ExternalMetricValue) {
func ConvertFromGeneratedStruct(inExternal *cl.MetricsList) []external_metrics.ExternalMetricValue {
outK8sList := []external_metrics.ExternalMetricValue{}
for _, inValue := range inExternal.MetricValues {
outValue := external_metrics.ExternalMetricValue{}
outValue.MetricName = inValue.Name
outValue.Timestamp = v1.Now()
outValue.Value.SetMilli(int64(inValue.Value * 1000))
outK8sList = append(outK8sList, outValue)
}
return
return outK8sList
}

func Fallback(err bool, list *cl.MetricsList, ec v1alpha1.ExternalCalculation) (listOut *cl.MetricsList, errOut bool) {
if err {
// returned metrics
return
// Fallback function returns generated structure for metrics if its given in
// scaledObject. Returned structure has one metric value. Name of the metric is
// either name of already existing metric if its the only one, otherwise it will
// be named after current external calculator
func Fallback(err error, list *cl.MetricsList, ec v1alpha1.ExternalCalculation, targetValueString string, logger logr.Logger) (*cl.MetricsList, error) {
if err == nil {
// return unmodified list when no error exists
return list, nil
}

targetValue, errParse := strconv.ParseFloat(targetValueString, 64)
if errParse != nil {
return nil, errParse
}

return
listOut := cl.MetricsList{}
// if list contains only one metric, return the same one (by name) otherwise
// if multiple metrics are given, return just one with the name of ExternalCalculator
metricName := ""
if len(list.MetricValues) == 1 {
metricName = list.MetricValues[0].Name
} else {
metricName = ec.Name
}
// returned metrics
metricValue := int64((targetValue * 1000) * float64(ec.FallbackReplicas))
metric := cl.Metric{
Name: metricName,
Value: float32(metricValue),
}
listOut.MetricValues = append(listOut.MetricValues, &metric)
logger.Info(fmt.Sprintf("surpressing error for externalCalculator '%s' by activating its fallback", ec.Name))
return &listOut, nil
}
114 changes: 76 additions & 38 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package scaling
import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -443,6 +444,8 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN
cache.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
}

logger.V(0).Info(">>2: MATCHED-METRICS", "metricsArr", metricsArray, "metricsName", metricsName)

if len(metricsArray) == 0 {
err = fmt.Errorf("no metrics found getting metricsArray array")
logger.Error(err, "error metricsArray is empty")
Expand All @@ -454,14 +457,14 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN
continue
}

// Filter only the desired metric or if composite scaler is active, metricsArray
// is all external metrics
// Filter only the desired metric or if composite scaler is active,
// metricsArray contains all external metrics
if arrayContainsElement(spec.External.Metric.Name, metricsArray) {
// if compositeScaler is used, override with current metric, otherwise do nothing
metricName := spec.External.Metric.Name

// if compositeScaler is given, add pair to the list
if scaledObject.Spec.Advanced.ComplexScalingLogic.Target != "" {
// if ComplexScalingLogic custom formula is given, create metric-trigger pair list
if scaledObject.Spec.Advanced.ComplexScalingLogic.Formula != "" {
metricTriggerPairList, err = pairTriggersAndMetrics(metricTriggerPairList, metricName, scalerConfigs[scalerIndex].TriggerName)
if err != nil {
logger.Error(err, "error pairing triggers & metrics for compositeScaler")
Expand Down Expand Up @@ -524,59 +527,72 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN

logger.V(0).Info(">>3: MATCHED-METRICS", "metrics", matchingMetrics, "metricsName", metricsName)

grpcMetricList := externalscaling.ConvertToGeneratedStruct(matchingMetrics)
logger.V(0).Info(">>3.1 LIST", "list", grpcMetricList.MetricValues)
// convert k8s list to grpc generated list structure
grpcMetricList := externalscaling.ConvertToGeneratedStruct(matchingMetrics, logger)
// grpcMetricList := &externalscalingAPI.MetricsList{}
// for _, val := range matchingMetrics {
// metric := externalscalingAPI.Metric{Name: val.MetricName, Value: float32(val.Value.Value())}
// grpcMetricList.MetricValues = append(grpcMetricList.MetricValues, &metric)
// }

// Apply external calculations - call gRPC server on each url and return
// modified metric list in order
for i, ec := range scaledObject.Spec.Advanced.ComplexScalingLogic.ExternalCalculations {
hasError := false
timeout, err := strconv.ParseInt(ec.Timeout, 10, 64)
if err != nil {
// expect timeout in time format like 1m10s
parsedTime, err := time.ParseDuration(ec.Timeout)
if err != nil {
logger.Error(err, "error while converting type of timeout for external calculation")
break
}
timeout = int64(parsedTime.Seconds())
}

esGrpcClient, err := externalscaling.NewGrpcClient(ec.URL, logger)
if err != nil {
logger.Error(err, "error connecting external calculation grpc client to server")
hasError = true
logger.Error(err, "error creating new grpc client for external calculator")
} else {
if !esGrpcClient.WaitForConnectionReady(ctx, logger) {
if !esGrpcClient.WaitForConnectionReady(ctx, ec.URL, time.Duration(timeout), logger) {
err = fmt.Errorf("client didnt connect to server successfully")
logger.Error(err, "error in connection")
hasError = true
logger.Error(err, fmt.Sprintf("error in grpc connection for external calculator %s", ec.URL))
} else {
logger.Info(fmt.Sprintf("connected to gRPC server for external calculation %s", ec.Name))
}
logger.Info("connected to gRPC server")

grpcMetricList, err = esGrpcClient.Calculate(ctx, grpcMetricList, logger)
if err != nil {
logger.Error(err, "error calculating in grpc server at %s for external calculation", ec.URL)
hasError = true
}

// run Fallback if error was given
grpcMetricList, hasError = externalscaling.Fallback(hasError, grpcMetricList, ec)
if hasError {
// if fallback metrics not used, stop calculating
break
if grpcMetricList == nil {
err = fmt.Errorf("grpc method Calculate returned nil metric list for external calculator")
logger.Error(err, "error in external calculator after Calculate")
}
}
grpcMetricList, errFallback := externalscaling.Fallback(err, grpcMetricList, ec, scaledObject.Spec.Advanced.ComplexScalingLogic.Target, logger)
if errFallback != nil {
logger.Error(errFallback, "subsequent error occurred when trying to apply fallback metrics for external calculation")
break
}
logger.V(0).Info(fmt.Sprintf(">>3.5:%d CALCULATE END", i), "metrics", grpcMetricList)
}

// Convert from generated structure to k8s structure
matchingMetrics = externalscaling.ConvertFromGeneratedStruct(grpcMetricList)
// outK8sList := []external_metrics.ExternalMetricValue{}
// for _, inValue := range grpcMetricList.MetricValues {
// outValue := external_metrics.ExternalMetricValue{}
// outValue.MetricName = inValue.Name
// outValue.Timestamp = v1.Now()
// outValue.Value.SetMilli(int64(inValue.Value * 1000))
// outK8sList = append(outK8sList, outValue)
// }
// matchingMetrics = outK8sList

// apply formula
if scaledObject.Spec.Advanced.ComplexScalingLogic.Formula != "" {
// add last external calculation name as a possible trigger (user can
// manipulate with metrics in ExternalCalculation service and it is expected
// to be named as the ExternalCalculation[len()-1] value)
if len(scaledObject.Spec.Advanced.ComplexScalingLogic.ExternalCalculations) > 0 {
lastElemIndex := len(scaledObject.Spec.Advanced.ComplexScalingLogic.ExternalCalculations) - 1
lastElem := scaledObject.Spec.Advanced.ComplexScalingLogic.ExternalCalculations[lastElemIndex].Name
// expect last element of external calculation array via its name
metricTriggerPairList[lastElem] = lastElem
}
logger.V(0).Info(">>3.2 FORMULA", "pairlist", metricTriggerPairList)
logger.V(0).Info(">>3.2 FORMULA", "formula", scaledObject.Spec.Advanced.ComplexScalingLogic.Formula, "target", scaledObject.Spec.Advanced.ComplexScalingLogic.Target)
matchingMetrics, err = applyCustomScalerFormula(matchingMetrics, scaledObject.Spec.Advanced.ComplexScalingLogic.Formula, metricTriggerPairList, logger)
if err != nil {
logger.Error(err, "error applying custom compositeScaler formula")
}
matchingMetrics, err = applyComplexLogicFormula(scaledObject.Spec.Advanced.ComplexScalingLogic, matchingMetrics, metricTriggerPairList, logger)
if err != nil {
logger.Error(err, "error applying custom compositeScaler formula")
}

return &external_metrics.ExternalMetricValueList{
Expand Down Expand Up @@ -718,8 +734,28 @@ func arrayContainsElement(el string, arr []string) bool {
return false
}

// apply custom formula to metrics and return calculated and finalized metric
func applyCustomScalerFormula(list []external_metrics.ExternalMetricValue, formula string, pairList map[string]string, logger logr.Logger) ([]external_metrics.ExternalMetricValue, error) {
// if given right conditions, try to apply the given custom formula in SO
func applyComplexLogicFormula(csl kedav1alpha1.ComplexScalingLogic, metrics []external_metrics.ExternalMetricValue, pairList map[string]string, logger logr.Logger) ([]external_metrics.ExternalMetricValue, error) {
if csl.Formula != "" {
// add last external calculation name as a possible trigger (user can
// manipulate with metrics in ExternalCalculation service and it is expected
// to be named as the ExternalCalculation[len()-1] value)
if len(csl.ExternalCalculations) > 0 {
lastElemIndex := len(csl.ExternalCalculations) - 1
lastElem := csl.ExternalCalculations[lastElemIndex].Name
// expect last element of external calculation array via its name
pairList[lastElem] = lastElem
}
logger.V(0).Info(">>3.2 FORMULA", "pairlist", pairList)
logger.V(0).Info(">>3.2 FORMULA", "formula", csl.Formula, "target", csl.Target)
metrics, err := calculateComplexLogicFormula(metrics, csl.Formula, pairList, logger)
return metrics, err
}
return metrics, nil
}

// calculate custom formula to metrics and return calculated and finalized metric
func calculateComplexLogicFormula(list []external_metrics.ExternalMetricValue, formula string, pairList map[string]string, logger logr.Logger) ([]external_metrics.ExternalMetricValue, error) {
var ret external_metrics.ExternalMetricValue
var out float64
ret.MetricName = "composite-metric-name"
Expand Down Expand Up @@ -748,6 +784,8 @@ func applyCustomScalerFormula(list []external_metrics.ExternalMetricValue, formu
return []external_metrics.ExternalMetricValue{ret}, nil
}

// Pair trigger names and metric names for custom formula. Trigger name is used in
// formula itself (in SO) and metric name is used for its value
func pairTriggersAndMetrics(list map[string]string, metric string, trigger string) (map[string]string, error) {
if trigger == "" {
return list, fmt.Errorf("trigger name not given with compositeScaler for metric %s", metric)
Expand Down
Loading

0 comments on commit d647b9a

Please sign in to comment.