diff --git a/CHANGELOG.md b/CHANGELOG.md index 61711cff40d..6b3a11827ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ - Add Cassandra Scaler ([#2211](https://github.com/kedacore/keda/pull/2211)) - Improve Redis Scaler, upgrade library, add username and Sentinel support ([#2181](https://github.com/kedacore/keda/pull/2181)) - Add GCP identity authentication when using Pubsub Scaler ([#2225](https://github.com/kedacore/keda/pull/2225)) +- Add ScalersCache to reuse scalers unless they need changing ([#2187](https://github.com/kedacore/keda/pull/2187)) ### Improvements diff --git a/CREATE-NEW-SCALER.md b/CREATE-NEW-SCALER.md index 06526b15adf..b380aa4f270 100644 --- a/CREATE-NEW-SCALER.md +++ b/CREATE-NEW-SCALER.md @@ -93,7 +93,7 @@ The constructor should have the following parameters: ## Lifecycle of a scaler -The scaler is created and closed everytime KEDA or HPA wants to call `GetMetrics`, and everytime a new ScaledObject is created or updated that has a trigger for that scaler. Thus, a developer of a scaler should not assume that the scaler will maintain any state between these calls. +Scalers are created and cached until the ScaledObject is modified, or `.IsActive()`/`GetMetrics()` result in an error. The cached scaler is then invalidated and a new scaler is created. `Close()` is called on all scalers when disposed. ## Note The scaler code is embedded into the two separate binaries comprising KEDA, the operator and the custom metrics server component. The metrics server must be occasionally rebuilt published and deployed to k8s for it to have the same code as your operator. diff --git a/adapter/main.go b/adapter/main.go index 6ac8e7807cb..b3c3baf0e0c 100644 --- a/adapter/main.go +++ b/adapter/main.go @@ -17,6 +17,7 @@ limitations under the License. package main import ( + "context" "flag" "fmt" "os" @@ -24,9 +25,10 @@ import ( "strconv" "time" + "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/wait" + k8sruntime "k8s.io/apimachinery/pkg/runtime" openapinamer "k8s.io/apiserver/pkg/endpoints/openapi" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/client-go/kubernetes/scheme" @@ -36,12 +38,14 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/config" + "sigs.k8s.io/controller-runtime/pkg/controller" basecmd "sigs.k8s.io/custom-metrics-apiserver/pkg/cmd" "sigs.k8s.io/custom-metrics-apiserver/pkg/provider" generatedopenapi "github.com/kedacore/keda/v2/adapter/generated/openapi" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" + kedacontrollers "github.com/kedacore/keda/v2/controllers/keda" prommetrics "github.com/kedacore/keda/v2/pkg/metrics" kedaprovider "github.com/kedacore/keda/v2/pkg/provider" "github.com/kedacore/keda/v2/pkg/scaling" @@ -65,7 +69,7 @@ var ( adapterClientRequestBurst int ) -func (a *Adapter) makeProvider(globalHTTPTimeout time.Duration) (provider.MetricsProvider, error) { +func (a *Adapter) makeProvider(globalHTTPTimeout time.Duration) (provider.MetricsProvider, <-chan struct{}, error) { // Get a config to talk to the apiserver cfg, err := config.GetConfig() if cfg != nil { @@ -75,17 +79,17 @@ func (a *Adapter) makeProvider(globalHTTPTimeout time.Duration) (provider.Metric if err != nil { logger.Error(err, "failed to get the config") - return nil, fmt.Errorf("failed to get the config (%s)", err) + return nil, nil, fmt.Errorf("failed to get the config (%s)", err) } scheme := scheme.Scheme if err := appsv1.SchemeBuilder.AddToScheme(scheme); err != nil { logger.Error(err, "failed to add apps/v1 scheme to runtime scheme") - return nil, fmt.Errorf("failed to add apps/v1 scheme to runtime scheme (%s)", err) + return nil, nil, fmt.Errorf("failed to add apps/v1 scheme to runtime scheme (%s)", err) } if err := kedav1alpha1.SchemeBuilder.AddToScheme(scheme); err != nil { logger.Error(err, "failed to add keda scheme to runtime scheme") - return nil, fmt.Errorf("failed to add keda scheme to runtime scheme (%s)", err) + return nil, nil, fmt.Errorf("failed to add keda scheme to runtime scheme (%s)", err) } kubeclient, err := client.New(cfg, client.Options{ @@ -93,7 +97,7 @@ func (a *Adapter) makeProvider(globalHTTPTimeout time.Duration) (provider.Metric }) if err != nil { logger.Error(err, "unable to construct new client") - return nil, fmt.Errorf("unable to construct new client (%s)", err) + return nil, nil, fmt.Errorf("unable to construct new client (%s)", err) } broadcaster := record.NewBroadcaster() @@ -103,13 +107,43 @@ func (a *Adapter) makeProvider(globalHTTPTimeout time.Duration) (provider.Metric namespace, err := getWatchNamespace() if err != nil { logger.Error(err, "failed to get watch namespace") - return nil, fmt.Errorf("failed to get watch namespace (%s)", err) + return nil, nil, fmt.Errorf("failed to get watch namespace (%s)", err) } prometheusServer := &prommetrics.PrometheusMetricServer{} go func() { prometheusServer.NewServer(fmt.Sprintf(":%v", prometheusMetricsPort), prometheusMetricsPath) }() + stopCh := make(chan struct{}) + if err := runScaledObjectController(scheme, namespace, handler, logger, stopCh); err != nil { + return nil, nil, err + } + + return kedaprovider.NewProvider(logger, handler, kubeclient, namespace), stopCh, nil +} + +func runScaledObjectController(scheme *k8sruntime.Scheme, namespace string, scaleHandler scaling.ScaleHandler, logger logr.Logger, stopCh chan<- struct{}) error { + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ + Scheme: scheme, + Namespace: namespace, + }) + if err != nil { + return err + } + + if err := (&kedacontrollers.MetricsScaledObjectReconciler{ + ScaleHandler: scaleHandler, + }).SetupWithManager(mgr, controller.Options{}); err != nil { + return err + } + + go func() { + if err := mgr.Start(context.Background()); err != nil { + logger.Error(err, "controller-runtime encountered an error") + stopCh <- struct{}{} + close(stopCh) + } + }() - return kedaprovider.NewProvider(logger, handler, kubeclient, namespace), nil + return nil } func printVersion() { @@ -171,7 +205,7 @@ func main() { return } - kedaProvider, err := cmd.makeProvider(time.Duration(globalHTTPTimeoutMS) * time.Millisecond) + kedaProvider, stopCh, err := cmd.makeProvider(time.Duration(globalHTTPTimeoutMS) * time.Millisecond) if err != nil { logger.Error(err, "making provider") return @@ -179,7 +213,7 @@ func main() { cmd.WithExternalMetrics(kedaProvider) logger.Info(cmd.Message) - if err = cmd.Run(wait.NeverStop); err != nil { + if err = cmd.Run(stopCh); err != nil { return } } diff --git a/apis/keda/v1alpha1/zz_generated.deepcopy.go b/apis/keda/v1alpha1/zz_generated.deepcopy.go index d6e286bf9e8..53d00a01491 100644 --- a/apis/keda/v1alpha1/zz_generated.deepcopy.go +++ b/apis/keda/v1alpha1/zz_generated.deepcopy.go @@ -1,4 +1,3 @@ -//go:build !ignore_autogenerated // +build !ignore_autogenerated /* diff --git a/config/crd/bases/keda.sh_clustertriggerauthentications.yaml b/config/crd/bases/keda.sh_clustertriggerauthentications.yaml index a33408d758d..c871e853d2c 100644 --- a/config/crd/bases/keda.sh_clustertriggerauthentications.yaml +++ b/config/crd/bases/keda.sh_clustertriggerauthentications.yaml @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.4.1 + controller-gen.kubebuilder.io/version: v0.6.1 creationTimestamp: null name: clustertriggerauthentications.keda.sh spec: @@ -90,6 +90,8 @@ spec: type: object mount: type: string + namespace: + type: string role: type: string secrets: diff --git a/config/crd/bases/keda.sh_scaledobjects.yaml b/config/crd/bases/keda.sh_scaledobjects.yaml index 0781a3e86d0..1ff6f64fd0c 100644 --- a/config/crd/bases/keda.sh_scaledobjects.yaml +++ b/config/crd/bases/keda.sh_scaledobjects.yaml @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.4.1 + controller-gen.kubebuilder.io/version: v0.6.1 creationTimestamp: null name: scaledobjects.keda.sh spec: diff --git a/controllers/keda/hpa.go b/controllers/keda/hpa.go index 302098a5674..310090f9ed6 100644 --- a/controllers/keda/hpa.go +++ b/controllers/keda/hpa.go @@ -160,35 +160,32 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(ctx context.Context, var externalMetricNames []string var resourceMetricNames []string - scalers, err := r.scaleHandler.GetScalers(ctx, scaledObject) + cache, err := r.scaleHandler.GetScalersCache(ctx, scaledObject) if err != nil { logger.Error(err, "Error getting scalers") return nil, err } - for _, scaler := range scalers { - metricSpecs := scaler.GetMetricSpecForScaling(ctx) + metricSpecs := cache.GetMetricSpecForScaling(ctx) - for _, metricSpec := range metricSpecs { - if metricSpec.Resource != nil { - resourceMetricNames = append(resourceMetricNames, string(metricSpec.Resource.Name)) - } - - if metricSpec.External != nil { - externalMetricName := metricSpec.External.Metric.Name - if kedacontrollerutil.Contains(externalMetricNames, externalMetricName) { - return nil, fmt.Errorf("metricName %s defined multiple times in ScaledObject %s, please refer the documentation how to define metricName manually", externalMetricName, scaledObject.Name) - } + for _, metricSpec := range metricSpecs { + if metricSpec.Resource != nil { + resourceMetricNames = append(resourceMetricNames, string(metricSpec.Resource.Name)) + } - // add the scaledobject.keda.sh/name label. This is how the MetricsAdapter will know which scaledobject a metric is for when the HPA queries it. - metricSpec.External.Metric.Selector = &metav1.LabelSelector{MatchLabels: make(map[string]string)} - metricSpec.External.Metric.Selector.MatchLabels["scaledobject.keda.sh/name"] = scaledObject.Name - externalMetricNames = append(externalMetricNames, externalMetricName) + if metricSpec.External != nil { + externalMetricName := metricSpec.External.Metric.Name + if kedacontrollerutil.Contains(externalMetricNames, externalMetricName) { + return nil, fmt.Errorf("metricName %s defined multiple times in ScaledObject %s, please refer the documentation how to define metricName manually", externalMetricName, scaledObject.Name) } + + // add the scaledobject.keda.sh/name label. This is how the MetricsAdapter will know which scaledobject a metric is for when the HPA queries it. + metricSpec.External.Metric.Selector = &metav1.LabelSelector{MatchLabels: make(map[string]string)} + metricSpec.External.Metric.Selector.MatchLabels["scaledobject.keda.sh/name"] = scaledObject.Name + externalMetricNames = append(externalMetricNames, externalMetricName) } - scaledObjectMetricSpecs = append(scaledObjectMetricSpecs, metricSpecs...) - scaler.Close(ctx) } + scaledObjectMetricSpecs = append(scaledObjectMetricSpecs, metricSpecs...) // sort metrics in ScaledObject, this way we always check the same resource in Reconcile loop and we can prevent unnecessary HPA updates, // see https://github.com/kedacore/keda/issues/1531 for details diff --git a/controllers/keda/hpa_test.go b/controllers/keda/hpa_test.go index 51a092f4863..6b8c18f39ad 100644 --- a/controllers/keda/hpa_test.go +++ b/controllers/keda/hpa_test.go @@ -30,7 +30,8 @@ import ( "github.com/kedacore/keda/v2/pkg/mock/mock_client" mock_scalers "github.com/kedacore/keda/v2/pkg/mock/mock_scaler" "github.com/kedacore/keda/v2/pkg/mock/mock_scaling" - kedascalers "github.com/kedacore/keda/v2/pkg/scalers" + "github.com/kedacore/keda/v2/pkg/scalers" + "github.com/kedacore/keda/v2/pkg/scaling/cache" ) var _ = Describe("hpa", func() { @@ -129,7 +130,16 @@ func setupTest(health map[string]v1alpha1.HealthStatus, scaler *mock_scalers.Moc }, } - scalers := []kedascalers.Scaler{scaler} + scalersCache := cache.ScalersCache{ + Scalers: []cache.ScalerBuilder{{ + Scaler: scaler, + Factory: func() (scalers.Scaler, error) { + return scaler, nil + }, + }}, + Logger: nil, + Recorder: nil, + } metricSpec := v2beta2.MetricSpec{ External: &v2beta2.ExternalMetricSource{ Metric: v2beta2.MetricIdentifier{ @@ -140,8 +150,7 @@ func setupTest(health map[string]v1alpha1.HealthStatus, scaler *mock_scalers.Moc metricSpecs := []v2beta2.MetricSpec{metricSpec} ctx := context.Background() scaler.EXPECT().GetMetricSpecForScaling(ctx).Return(metricSpecs) - scaler.EXPECT().Close(ctx) - scaleHandler.EXPECT().GetScalers(context.Background(), gomock.Eq(scaledObject)).Return(scalers, nil) + scaleHandler.EXPECT().GetScalersCache(context.Background(), gomock.Eq(scaledObject)).Return(&scalersCache, nil) return scaledObject } diff --git a/controllers/keda/metrics_adapter_controller.go b/controllers/keda/metrics_adapter_controller.go new file mode 100644 index 00000000000..4b79b2c29ba --- /dev/null +++ b/controllers/keda/metrics_adapter_controller.go @@ -0,0 +1,45 @@ +/* +Copyright 2021 The KEDA Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package keda + +import ( + "context" + + kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" + "github.com/kedacore/keda/v2/pkg/scaling" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +type MetricsScaledObjectReconciler struct { + ScaleHandler scaling.ScaleHandler +} + +func (r *MetricsScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + r.ScaleHandler.ClearScalersCache(ctx, req.Name, req.Namespace) + return ctrl.Result{}, nil +} + +func (r *MetricsScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error { + return ctrl.NewControllerManagedBy(mgr). + For(&kedav1alpha1.ScaledObject{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). + Owns(&kedav1alpha1.ScaledObject{}). + WithOptions(options). + Complete(r) +} diff --git a/controllers/keda/scaledjob_controller.go b/controllers/keda/scaledjob_controller.go index 05c7fb6866e..e14fc90bdff 100644 --- a/controllers/keda/scaledjob_controller.go +++ b/controllers/keda/scaledjob_controller.go @@ -140,7 +140,7 @@ func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger log } // Check ScaledJob is Ready or not - _, err = r.scaleHandler.GetScalers(ctx, scaledJob) + _, err = r.scaleHandler.GetScalersCache(ctx, scaledJob) if err != nil { logger.Error(err, "Error getting scalers") return "Failed to ensure ScaledJob is correctly created", err diff --git a/controllers/keda/scaledobject_controller_test.go b/controllers/keda/scaledobject_controller_test.go index 6b18aec6744..a8052bcbe4d 100644 --- a/controllers/keda/scaledobject_controller_test.go +++ b/controllers/keda/scaledobject_controller_test.go @@ -35,6 +35,7 @@ import ( "github.com/kedacore/keda/v2/pkg/mock/mock_client" "github.com/kedacore/keda/v2/pkg/mock/mock_scaling" "github.com/kedacore/keda/v2/pkg/scalers" + "github.com/kedacore/keda/v2/pkg/scaling/cache" ) type GinkgoTestReporter struct{} @@ -82,7 +83,7 @@ var _ = Describe("ScaledObjectController", func() { It("should pass metric name validation", func() { // Generate test data - testScalers := make([]scalers.Scaler, 0) + testScalers := make([]cache.ScalerBuilder, 0) expectedExternalMetricNames := make([]string, 0) for i, tm := range triggerMeta { @@ -99,7 +100,12 @@ var _ = Describe("ScaledObjectController", func() { Fail(err.Error()) } - testScalers = append(testScalers, s) + testScalers = append(testScalers, cache.ScalerBuilder{ + Scaler: s, + Factory: func() (scalers.Scaler, error) { + return scalers.NewPrometheusScaler(config) + }, + }) for _, metricSpec := range s.GetMetricSpecForScaling(context.Background()) { if metricSpec.External != nil { expectedExternalMetricNames = append(expectedExternalMetricNames, metricSpec.External.Metric.Name) @@ -108,7 +114,10 @@ var _ = Describe("ScaledObjectController", func() { } // Set up expectations - mockScaleHandler.EXPECT().GetScalers(context.Background(), uniquelyNamedScaledObject).Return(testScalers, nil) + scalerCache := cache.ScalersCache{ + Scalers: testScalers, + } + mockScaleHandler.EXPECT().GetScalersCache(context.Background(), uniquelyNamedScaledObject).Return(&scalerCache, nil) mockClient.EXPECT().Status().Return(mockStatusWriter) mockStatusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()) @@ -121,6 +130,7 @@ var _ = Describe("ScaledObjectController", func() { // Test returned values Ω(len(metricSpecs)).Should(Equal(len(testScalers))) Ω(err).Should(BeNil()) + scalerCache.Close(ctx) }) It("should pass metric name validation with single value", func() { @@ -145,8 +155,16 @@ var _ = Describe("ScaledObjectController", func() { } } + scalersCache := cache.ScalersCache{ + Scalers: []cache.ScalerBuilder{{ + Scaler: s, + Factory: func() (scalers.Scaler, error) { + return s, nil + }, + }}, + } // Set up expectations - mockScaleHandler.EXPECT().GetScalers(context.Background(), uniquelyNamedScaledObject).Return([]scalers.Scaler{s}, nil) + mockScaleHandler.EXPECT().GetScalersCache(context.Background(), uniquelyNamedScaledObject).Return(&scalersCache, nil) mockClient.EXPECT().Status().Return(mockStatusWriter) mockStatusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()) @@ -167,7 +185,7 @@ var _ = Describe("ScaledObjectController", func() { It("should pass metric name validation", func() { // Generate test data - testScalers := make([]scalers.Scaler, 0) + testScalers := make([]cache.ScalerBuilder, 0) for i := 0; i < 4; i++ { config := &scalers.ScalerConfig{ Name: fmt.Sprintf("test.%d", i), @@ -182,11 +200,19 @@ var _ = Describe("ScaledObjectController", func() { Fail(err.Error()) } - testScalers = append(testScalers, s) + testScalers = append(testScalers, cache.ScalerBuilder{ + Scaler: s, + Factory: func() (scalers.Scaler, error) { + return s, nil + }, + }) + } + scalersCache := cache.ScalersCache{ + Scalers: testScalers, } // Set up expectations - mockScaleHandler.EXPECT().GetScalers(context.Background(), duplicateNamedScaledObject).Return(testScalers, nil) + mockScaleHandler.EXPECT().GetScalersCache(context.Background(), duplicateNamedScaledObject).Return(&scalersCache, nil) // Call function tobe tested metricSpecs, err := metricNameTestReconciler.getScaledObjectMetricSpecs(context.Background(), testLogger, duplicateNamedScaledObject) diff --git a/pkg/mock/mock_scale/mock_interfaces.go b/pkg/mock/mock_scale/mock_interfaces.go index 4cb4a636375..48c6afb71bd 100644 --- a/pkg/mock/mock_scale/mock_interfaces.go +++ b/pkg/mock/mock_scale/mock_interfaces.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: /home/ecomaz/go/pkg/mod/k8s.io/client-go@v0.22.2/scale/interfaces.go +// Source: /home/ahmed/go/pkg/mod/k8s.io/client-go@v0.22.2/scale/interfaces.go // Package mock_scale is a generated GoMock package. package mock_scale diff --git a/pkg/mock/mock_scaling/mock_interface.go b/pkg/mock/mock_scaling/mock_interface.go index f398a17fc12..6ac7c5e7ee1 100644 --- a/pkg/mock/mock_scaling/mock_interface.go +++ b/pkg/mock/mock_scaling/mock_interface.go @@ -9,7 +9,7 @@ import ( reflect "reflect" gomock "github.com/golang/mock/gomock" - scalers "github.com/kedacore/keda/v2/pkg/scalers" + cache "github.com/kedacore/keda/v2/pkg/scaling/cache" ) // MockScaleHandler is a mock of ScaleHandler interface. @@ -35,6 +35,18 @@ func (m *MockScaleHandler) EXPECT() *MockScaleHandlerMockRecorder { return m.recorder } +// ClearScalersCache mocks base method. +func (m *MockScaleHandler) ClearScalersCache(ctx context.Context, name, namespace string) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ClearScalersCache", ctx, name, namespace) +} + +// ClearScalersCache indicates an expected call of ClearScalersCache. +func (mr *MockScaleHandlerMockRecorder) ClearScalersCache(ctx, name, namespace interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClearScalersCache", reflect.TypeOf((*MockScaleHandler)(nil).ClearScalersCache), ctx, name, namespace) +} + // DeleteScalableObject mocks base method. func (m *MockScaleHandler) DeleteScalableObject(scalableObject interface{}) error { m.ctrl.T.Helper() @@ -49,19 +61,19 @@ func (mr *MockScaleHandlerMockRecorder) DeleteScalableObject(scalableObject inte return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteScalableObject", reflect.TypeOf((*MockScaleHandler)(nil).DeleteScalableObject), scalableObject) } -// GetScalers mocks base method. -func (m *MockScaleHandler) GetScalers(ctx context.Context, scalableObject interface{}) ([]scalers.Scaler, error) { +// GetScalersCache mocks base method. +func (m *MockScaleHandler) GetScalersCache(ctx context.Context, scalableObject interface{}) (*cache.ScalersCache, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetScalers", ctx, scalableObject) - ret0, _ := ret[0].([]scalers.Scaler) + ret := m.ctrl.Call(m, "GetScalersCache", ctx, scalableObject) + ret0, _ := ret[0].(*cache.ScalersCache) ret1, _ := ret[1].(error) return ret0, ret1 } -// GetScalers indicates an expected call of GetScalers. -func (mr *MockScaleHandlerMockRecorder) GetScalers(ctx, scalableObject interface{}) *gomock.Call { +// GetScalersCache indicates an expected call of GetScalersCache. +func (mr *MockScaleHandlerMockRecorder) GetScalersCache(ctx, scalableObject interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetScalers", reflect.TypeOf((*MockScaleHandler)(nil).GetScalers), ctx, scalableObject) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetScalersCache", reflect.TypeOf((*MockScaleHandler)(nil).GetScalersCache), ctx, scalableObject) } // HandleScalableObject mocks base method. diff --git a/pkg/provider/fallback.go b/pkg/provider/fallback.go index 8e186d685ed..164bf56cfc5 100644 --- a/pkg/provider/fallback.go +++ b/pkg/provider/fallback.go @@ -23,26 +23,23 @@ import ( "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/metrics/pkg/apis/external_metrics" runtimeclient "sigs.k8s.io/controller-runtime/pkg/client" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" - "github.com/kedacore/keda/v2/pkg/scalers" ) func isFallbackEnabled(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2beta2.MetricSpec) bool { return scaledObject.Spec.Fallback != nil && metricSpec.External.Target.Type == v2beta2.AverageValueMetricType } -func (p *KedaProvider) getMetricsWithFallback(ctx context.Context, scaler scalers.Scaler, metricName string, metricSelector labels.Selector, scaledObject *kedav1alpha1.ScaledObject, metricSpec v2beta2.MetricSpec) ([]external_metrics.ExternalMetricValue, error) { +func (p *KedaProvider) getMetricsWithFallback(ctx context.Context, metrics []external_metrics.ExternalMetricValue, suppressedError error, metricName string, scaledObject *kedav1alpha1.ScaledObject, metricSpec v2beta2.MetricSpec) ([]external_metrics.ExternalMetricValue, error) { status := scaledObject.Status.DeepCopy() initHealthStatus(status) - metrics, err := scaler.GetMetrics(ctx, metricName, metricSelector) healthStatus := getHealthStatus(status, metricName) - if err == nil { + if suppressedError == nil { zero := int32(0) healthStatus.NumberOfFailures = &zero healthStatus.Status = kedav1alpha1.HealthStatusHappy @@ -60,14 +57,14 @@ func (p *KedaProvider) getMetricsWithFallback(ctx context.Context, scaler scaler switch { case !isFallbackEnabled(scaledObject, metricSpec): - return nil, err + return nil, suppressedError case !validateFallback(scaledObject): logger.Info("Failed to validate ScaledObject Spec. Please check that parameters are positive integers") - return nil, err + return nil, suppressedError case *healthStatus.NumberOfFailures > scaledObject.Spec.Fallback.FailureThreshold: - return doFallback(scaledObject, metricSpec, metricName, err), nil + return doFallback(scaledObject, metricSpec, metricName, suppressedError), nil default: - return nil, err + return nil, suppressedError } } diff --git a/pkg/provider/fallback_test.go b/pkg/provider/fallback_test.go index 8646ff598ca..a25aeca75db 100644 --- a/pkg/provider/fallback_test.go +++ b/pkg/provider/fallback_test.go @@ -87,7 +87,8 @@ var _ = Describe("fallback", func() { metricSpec := createMetricSpec(3) expectStatusPatch(ctrl, client) - metrics, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec) + metrics, err := scaler.GetMetrics(context.Background(), metricName, nil) + metrics, err = providerUnderTest.getMetricsWithFallback(context.Background(), metrics, err, metricName, so, metricSpec) Expect(err).ToNot(HaveOccurred()) value, _ := metrics[0].Value.AsInt64() @@ -117,7 +118,8 @@ var _ = Describe("fallback", func() { metricSpec := createMetricSpec(3) expectStatusPatch(ctrl, client) - metrics, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec) + metrics, err := scaler.GetMetrics(context.Background(), metricName, nil) + metrics, err = providerUnderTest.getMetricsWithFallback(context.Background(), metrics, err, metricName, so, metricSpec) Expect(err).ToNot(HaveOccurred()) value, _ := metrics[0].Value.AsInt64() @@ -132,7 +134,8 @@ var _ = Describe("fallback", func() { metricSpec := createMetricSpec(3) expectStatusPatch(ctrl, client) - _, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec) + metrics, err := scaler.GetMetrics(context.Background(), metricName, nil) + _, err = providerUnderTest.getMetricsWithFallback(context.Background(), metrics, err, metricName, so, metricSpec) Expect(err).ShouldNot(BeNil()) Expect(err.Error()).Should(Equal("Some error")) @@ -160,7 +163,8 @@ var _ = Describe("fallback", func() { metricSpec := createMetricSpec(10) expectStatusPatch(ctrl, client) - _, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec) + metrics, err := scaler.GetMetrics(context.Background(), metricName, nil) + _, err = providerUnderTest.getMetricsWithFallback(context.Background(), metrics, err, metricName, so, metricSpec) Expect(err).ShouldNot(BeNil()) Expect(err.Error()).Should(Equal("Some error")) @@ -189,7 +193,8 @@ var _ = Describe("fallback", func() { metricSpec := createMetricSpec(10) expectStatusPatch(ctrl, client) - metrics, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec) + metrics, err := scaler.GetMetrics(context.Background(), metricName, nil) + metrics, err = providerUnderTest.getMetricsWithFallback(context.Background(), metrics, err, metricName, so, metricSpec) Expect(err).ToNot(HaveOccurred()) value, _ := metrics[0].Value.AsInt64() @@ -244,7 +249,8 @@ var _ = Describe("fallback", func() { statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("Some error")) client.EXPECT().Status().Return(statusWriter) - metrics, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec) + metrics, err := scaler.GetMetrics(context.Background(), metricName, nil) + metrics, err = providerUnderTest.getMetricsWithFallback(context.Background(), metrics, err, metricName, so, metricSpec) Expect(err).ToNot(HaveOccurred()) value, _ := metrics[0].Value.AsInt64() @@ -273,7 +279,8 @@ var _ = Describe("fallback", func() { metricSpec := createMetricSpec(10) expectStatusPatch(ctrl, client) - _, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec) + metrics, err := scaler.GetMetrics(context.Background(), metricName, nil) + _, err = providerUnderTest.getMetricsWithFallback(context.Background(), metrics, err, metricName, so, metricSpec) Expect(err).ShouldNot(BeNil()) Expect(err.Error()).Should(Equal("Some error")) @@ -306,7 +313,8 @@ var _ = Describe("fallback", func() { metricSpec := createMetricSpec(10) expectStatusPatch(ctrl, client) - _, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec) + metrics, err := scaler.GetMetrics(context.Background(), metricName, nil) + _, err = providerUnderTest.getMetricsWithFallback(context.Background(), metrics, err, metricName, so, metricSpec) Expect(err).ToNot(HaveOccurred()) condition := so.Status.Conditions.GetFallbackCondition() Expect(condition.IsTrue()).Should(BeTrue()) @@ -339,7 +347,8 @@ var _ = Describe("fallback", func() { metricSpec := createMetricSpec(10) expectStatusPatch(ctrl, client) - _, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec) + metrics, err := scaler.GetMetrics(context.Background(), metricName, nil) + _, err = providerUnderTest.getMetricsWithFallback(context.Background(), metrics, err, metricName, so, metricSpec) Expect(err).ShouldNot(BeNil()) Expect(err.Error()).Should(Equal("Some error")) condition := so.Status.Conditions.GetFallbackCondition() diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go index b306e32ce3f..2c46057a8fa 100644 --- a/pkg/provider/provider.go +++ b/pkg/provider/provider.go @@ -92,14 +92,18 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string, } scaledObject := &scaledObjects.Items[0] - matchingMetrics := []external_metrics.ExternalMetricValue{} - scalers, err := p.scaleHandler.GetScalers(ctx, scaledObject) + var matchingMetrics []external_metrics.ExternalMetricValue + cache, err := p.scaleHandler.GetScalersCache(ctx, scaledObject) + if err != nil { + return nil, err + } + metricsServer.RecordScalerObjectError(scaledObject.Namespace, scaledObject.Name, err) if err != nil { return nil, fmt.Errorf("error when getting scalers %s", err) } - for scalerIndex, scaler := range scalers { + for scalerIndex, scaler := range cache.GetScalers() { metricSpecs := scaler.GetMetricSpecForScaling(ctx) scalerName := strings.Replace(fmt.Sprintf("%T", scaler), "*scalers.", "", 1) @@ -110,7 +114,8 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string, } // Filter only the desired metric if strings.EqualFold(metricSpec.External.Metric.Name, info.Metric) { - metrics, err := p.getMetricsWithFallback(ctx, scaler, info.Metric, metricSelector, scaledObject, metricSpec) + metrics, err := cache.GetMetricsForScaler(ctx, scalerIndex, info.Metric, metricSelector) + metrics, err = p.getMetricsWithFallback(ctx, metrics, err, info.Metric, scaledObject, metricSpec) if err != nil { logger.Error(err, "error getting metric for scaler", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name, "scaler", scaler) @@ -124,7 +129,6 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string, metricsServer.RecordHPAScalerError(namespace, scaledObject.Name, scalerName, scalerIndex, info.Metric, err) } } - scaler.Close(ctx) } if len(matchingMetrics) == 0 { diff --git a/pkg/scalers/scaler.go b/pkg/scalers/scaler.go index 5a75110daf9..32583336ede 100644 --- a/pkg/scalers/scaler.go +++ b/pkg/scalers/scaler.go @@ -21,7 +21,7 @@ import ( "fmt" "time" - v2beta2 "k8s.io/api/autoscaling/v2beta2" + "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/labels" "k8s.io/metrics/pkg/apis/external_metrics" diff --git a/pkg/scaling/cache/scalers_cache.go b/pkg/scaling/cache/scalers_cache.go new file mode 100644 index 00000000000..163b797421f --- /dev/null +++ b/pkg/scaling/cache/scalers_cache.go @@ -0,0 +1,341 @@ +/* +Copyright 2021 The KEDA Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "context" + "fmt" + + "github.com/go-logr/logr" + kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" + "github.com/kedacore/keda/v2/pkg/eventreason" + "github.com/kedacore/keda/v2/pkg/scalers" + "k8s.io/api/autoscaling/v2beta2" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/record" + "k8s.io/metrics/pkg/apis/external_metrics" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +type ScalersCache struct { + Generation int64 + Scalers []ScalerBuilder + Logger logr.Logger + Recorder record.EventRecorder +} + +type ScalerBuilder struct { + Scaler scalers.Scaler + Factory func() (scalers.Scaler, error) +} + +func (c *ScalersCache) GetScalers() []scalers.Scaler { + result := make([]scalers.Scaler, 0, len(c.Scalers)) + for _, s := range c.Scalers { + result = append(result, s.Scaler) + } + return result +} + +func (c *ScalersCache) GetPushScalers() []scalers.PushScaler { + var result []scalers.PushScaler + for _, s := range c.Scalers { + if ps, ok := s.Scaler.(scalers.PushScaler); ok { + result = append(result, ps) + } + } + return result +} + +func (c *ScalersCache) GetMetricsForScaler(ctx context.Context, id int, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + if id < 0 || id >= len(c.Scalers) { + return nil, fmt.Errorf("scaler with id %d not found. Len = %d", id, len(c.Scalers)) + } + m, err := c.Scalers[id].Scaler.GetMetrics(ctx, metricName, metricSelector) + if err == nil { + return m, nil + } + + ns, err := c.refreshScaler(ctx, id) + if err != nil { + return nil, err + } + + return ns.GetMetrics(ctx, metricName, metricSelector) +} + +func (c *ScalersCache) IsScaledObjectActive(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject) (bool, bool, []external_metrics.ExternalMetricValue) { + isActive := false + isError := false + for i, s := range c.Scalers { + isTriggerActive, err := s.Scaler.IsActive(ctx) + if err != nil { + var ns scalers.Scaler + ns, err = c.refreshScaler(ctx, i) + if err == nil { + isTriggerActive, err = ns.IsActive(ctx) + } + } + + if err != nil { + c.Logger.V(1).Info("Error getting scale decision", "Error", err) + isError = true + c.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) + } else if isTriggerActive { + isActive = true + if externalMetricsSpec := s.Scaler.GetMetricSpecForScaling(ctx)[0].External; externalMetricsSpec != nil { + c.Logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", externalMetricsSpec.Metric.Name) + } + if resourceMetricsSpec := s.Scaler.GetMetricSpecForScaling(ctx)[0].Resource; resourceMetricsSpec != nil { + c.Logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", resourceMetricsSpec.Name) + } + break + } + } + + return isActive, isError, []external_metrics.ExternalMetricValue{} +} + +func (c *ScalersCache) IsScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) { + var queueLength int64 + var maxValue int64 + isActive := false + + logger := logf.Log.WithName("scalemetrics") + scalersMetrics := c.getScaledJobMetrics(ctx, scaledJob) + switch scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation { + case "min": + for _, metrics := range scalersMetrics { + if (queueLength == 0 || metrics.queueLength < queueLength) && metrics.isActive { + queueLength = metrics.queueLength + maxValue = metrics.maxValue + isActive = metrics.isActive + } + } + case "avg": + queueLengthSum := int64(0) + maxValueSum := int64(0) + length := 0 + for _, metrics := range scalersMetrics { + if metrics.isActive { + queueLengthSum += metrics.queueLength + maxValueSum += metrics.maxValue + isActive = metrics.isActive + length++ + } + } + if length != 0 { + queueLength = divideWithCeil(queueLengthSum, int64(length)) + maxValue = divideWithCeil(maxValueSum, int64(length)) + } + case "sum": + for _, metrics := range scalersMetrics { + if metrics.isActive { + queueLength += metrics.queueLength + maxValue += metrics.maxValue + isActive = metrics.isActive + } + } + default: // max + for _, metrics := range scalersMetrics { + if metrics.queueLength > queueLength && metrics.isActive { + queueLength = metrics.queueLength + maxValue = metrics.maxValue + isActive = metrics.isActive + } + } + } + maxValue = min(scaledJob.MaxReplicaCount(), maxValue) + logger.V(1).WithValues("ScaledJob", scaledJob.Name).Info("Checking if ScaleJob Scalers are active", "isActive", isActive, "maxValue", maxValue, "MultipleScalersCalculation", scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation) + + return isActive, queueLength, maxValue +} + +func (c *ScalersCache) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + var metrics []external_metrics.ExternalMetricValue + for i, s := range c.Scalers { + m, err := s.Scaler.GetMetrics(ctx, metricName, metricSelector) + if err != nil { + ns, err := c.refreshScaler(ctx, i) + if err != nil { + return metrics, err + } + m, err = ns.GetMetrics(ctx, metricName, metricSelector) + if err != nil { + return metrics, err + } + } + metrics = append(metrics, m...) + } + + return metrics, nil +} + +func (c *ScalersCache) refreshScaler(ctx context.Context, id int) (scalers.Scaler, error) { + if id < 0 || id >= len(c.Scalers) { + return nil, fmt.Errorf("scaler with id %d not found. Len = %d", id, len(c.Scalers)) + } + + sb := c.Scalers[id] + ns, err := sb.Factory() + if err != nil { + return nil, err + } + + c.Scalers[id] = ScalerBuilder{ + Scaler: ns, + Factory: sb.Factory, + } + sb.Scaler.Close(ctx) + + return ns, nil +} + +func (c *ScalersCache) GetMetricSpecForScaling(ctx context.Context) []v2beta2.MetricSpec { + var spec []v2beta2.MetricSpec + for _, s := range c.Scalers { + spec = append(spec, s.Scaler.GetMetricSpecForScaling(ctx)...) + } + return spec +} + +func (c *ScalersCache) Close(ctx context.Context) { + scalers := c.Scalers + c.Scalers = nil + for _, s := range scalers { + err := s.Scaler.Close(ctx) + if err != nil { + c.Logger.Error(err, "error closing scaler", "scaler", s) + } + } +} + +type scalerMetrics struct { + queueLength int64 + maxValue int64 + isActive bool +} + +func (c *ScalersCache) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scalerMetrics { + var scalersMetrics []scalerMetrics + for i, s := range c.Scalers { + var queueLength int64 + var targetAverageValue int64 + isActive := false + maxValue := int64(0) + scalerType := fmt.Sprintf("%T:", s) + + scalerLogger := c.Logger.WithValues("ScaledJob", scaledJob.Name, "Scaler", scalerType) + + metricSpecs := s.Scaler.GetMetricSpecForScaling(ctx) + + // skip scaler that doesn't return any metric specs (usually External scaler with incorrect metadata) + // or skip cpu/memory resource scaler + if len(metricSpecs) < 1 || metricSpecs[0].External == nil { + continue + } + + isTriggerActive, err := s.Scaler.IsActive(ctx) + if err != nil { + var ns scalers.Scaler + ns, err = c.refreshScaler(ctx, i) + if err == nil { + isTriggerActive, err = ns.IsActive(ctx) + } + } + + if err != nil { + scalerLogger.V(1).Info("Error getting scaler.IsActive, but continue", "Error", err) + c.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) + continue + } + + targetAverageValue = getTargetAverageValue(metricSpecs) + + metrics, err := s.Scaler.GetMetrics(ctx, "queueLength", nil) + if err != nil { + scalerLogger.V(1).Info("Error getting scaler metrics, but continue", "Error", err) + c.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) + continue + } + + var metricValue int64 + + for _, m := range metrics { + if m.MetricName == "queueLength" { + metricValue, _ = m.Value.AsInt64() + queueLength += metricValue + } + } + scalerLogger.V(1).Info("Scaler Metric value", "isTriggerActive", isTriggerActive, "queueLength", queueLength, "targetAverageValue", targetAverageValue) + + if isTriggerActive { + isActive = true + } + + if targetAverageValue != 0 { + maxValue = min(scaledJob.MaxReplicaCount(), divideWithCeil(queueLength, targetAverageValue)) + } + scalersMetrics = append(scalersMetrics, scalerMetrics{ + queueLength: queueLength, + maxValue: maxValue, + isActive: isActive, + }) + } + return scalersMetrics +} + +func getTargetAverageValue(metricSpecs []v2beta2.MetricSpec) int64 { + var targetAverageValue int64 + var metricValue int64 + var flag bool + for _, metric := range metricSpecs { + if metric.External.Target.AverageValue == nil { + metricValue = 0 + } else { + metricValue, flag = metric.External.Target.AverageValue.AsInt64() + if !flag { + metricValue = 0 + } + } + + targetAverageValue += metricValue + } + count := int64(len(metricSpecs)) + if count != 0 { + return targetAverageValue / count + } + return 0 +} + +func divideWithCeil(x, y int64) int64 { + ans := x / y + remainder := x % y + if remainder != 0 { + return ans + 1 + } + return ans +} + +// Min function for int64 +func min(x, y int64) int64 { + if x > y { + return y + } + return x +} diff --git a/pkg/scaling/scaledjob/scale_metrics_test.go b/pkg/scaling/cache/scalers_cache_test.go similarity index 71% rename from pkg/scaling/scaledjob/scale_metrics_test.go rename to pkg/scaling/cache/scalers_cache_test.go index 64bbecd556b..6f7995dc0a1 100644 --- a/pkg/scaling/scaledjob/scale_metrics_test.go +++ b/pkg/scaling/cache/scalers_cache_test.go @@ -1,12 +1,13 @@ -package scaledjob +package cache import ( "context" "fmt" "testing" - "github.com/go-playground/assert/v2" + "github.com/go-logr/logr" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/client-go/tools/record" @@ -66,24 +67,44 @@ func TestIsScaledJobActive(t *testing.T) { // Keep the current behavior // Assme 1 trigger only scaledJobSingle := createScaledObject(100, "") // testing default = max - scalerSingle := []scalers.Scaler{ - createScaler(ctrl, int64(20), int32(2), true), + scalerSingle := []ScalerBuilder{{ + Scaler: createScaler(ctrl, int64(20), int32(2), true), + Factory: func() (scalers.Scaler, error) { + return createScaler(ctrl, int64(20), int32(2), true), nil + }, + }} + + cache := ScalersCache{ + Scalers: scalerSingle, + Logger: logr.DiscardLogger{}, + Recorder: recorder, } - isActive, queueLength, maxValue := GetScaleMetrics(context.TODO(), scalerSingle, scaledJobSingle, recorder) + isActive, queueLength, maxValue := cache.IsScaledJobActive(context.TODO(), scaledJobSingle) assert.Equal(t, true, isActive) assert.Equal(t, int64(20), queueLength) assert.Equal(t, int64(10), maxValue) + cache.Close(context.Background()) // Non-Active trigger only - scalerSingle = []scalers.Scaler{ - createScaler(ctrl, int64(0), int32(2), false), + scalerSingle = []ScalerBuilder{{ + Scaler: createScaler(ctrl, int64(0), int32(2), false), + Factory: func() (scalers.Scaler, error) { + return createScaler(ctrl, int64(0), int32(2), false), nil + }, + }} + + cache = ScalersCache{ + Scalers: scalerSingle, + Logger: logr.DiscardLogger{}, + Recorder: recorder, } - isActive, queueLength, maxValue = GetScaleMetrics(context.TODO(), scalerSingle, scaledJobSingle, recorder) + isActive, queueLength, maxValue = cache.IsScaledJobActive(context.TODO(), scaledJobSingle) assert.Equal(t, false, isActive) assert.Equal(t, int64(0), queueLength) assert.Equal(t, int64(0), maxValue) + cache.Close(context.Background()) // Test the valiation scalerTestDatam := []scalerTestData{ @@ -96,18 +117,40 @@ func TestIsScaledJobActive(t *testing.T) { for index, scalerTestData := range scalerTestDatam { scaledJob := createScaledObject(scalerTestData.MaxReplicaCount, scalerTestData.MultipleScalersCalculation) - scalers := []scalers.Scaler{ - createScaler(ctrl, scalerTestData.Scaler1QueueLength, scalerTestData.Scaler1AverageValue, scalerTestData.Scaler1IsActive), - createScaler(ctrl, scalerTestData.Scaler2QueueLength, scalerTestData.Scaler2AverageValue, scalerTestData.Scaler2IsActive), - createScaler(ctrl, scalerTestData.Scaler3QueueLength, scalerTestData.Scaler3AverageValue, scalerTestData.Scaler3IsActive), - createScaler(ctrl, scalerTestData.Scaler4QueueLength, scalerTestData.Scaler4AverageValue, scalerTestData.Scaler4IsActive), + scalersToTest := []ScalerBuilder{{ + Scaler: createScaler(ctrl, scalerTestData.Scaler1QueueLength, scalerTestData.Scaler1AverageValue, scalerTestData.Scaler1IsActive), + Factory: func() (scalers.Scaler, error) { + return createScaler(ctrl, scalerTestData.Scaler1QueueLength, scalerTestData.Scaler1AverageValue, scalerTestData.Scaler1IsActive), nil + }, + }, { + Scaler: createScaler(ctrl, scalerTestData.Scaler2QueueLength, scalerTestData.Scaler2AverageValue, scalerTestData.Scaler2IsActive), + Factory: func() (scalers.Scaler, error) { + return createScaler(ctrl, scalerTestData.Scaler2QueueLength, scalerTestData.Scaler2AverageValue, scalerTestData.Scaler2IsActive), nil + }, + }, { + Scaler: createScaler(ctrl, scalerTestData.Scaler3QueueLength, scalerTestData.Scaler3AverageValue, scalerTestData.Scaler3IsActive), + Factory: func() (scalers.Scaler, error) { + return createScaler(ctrl, scalerTestData.Scaler3QueueLength, scalerTestData.Scaler3AverageValue, scalerTestData.Scaler3IsActive), nil + }, + }, { + Scaler: createScaler(ctrl, scalerTestData.Scaler4QueueLength, scalerTestData.Scaler4AverageValue, scalerTestData.Scaler4IsActive), + Factory: func() (scalers.Scaler, error) { + return createScaler(ctrl, scalerTestData.Scaler4QueueLength, scalerTestData.Scaler4AverageValue, scalerTestData.Scaler4IsActive), nil + }, + }} + + cache = ScalersCache{ + Scalers: scalersToTest, + Logger: logr.DiscardLogger{}, + Recorder: recorder, } fmt.Printf("index: %d", index) - isActive, queueLength, maxValue = GetScaleMetrics(context.TODO(), scalers, scaledJob, recorder) + isActive, queueLength, maxValue = cache.IsScaledJobActive(context.TODO(), scaledJob) // assert.Equal(t, 5, index) assert.Equal(t, scalerTestData.ResultIsActive, isActive) assert.Equal(t, scalerTestData.ResultQueueLength, queueLength) assert.Equal(t, scalerTestData.ResultMaxValue, maxValue) + cache.Close(context.Background()) } } diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index c2bb8638939..8a0e7e6fb18 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -19,6 +19,7 @@ package scaling import ( "context" "fmt" + "strings" "sync" "time" @@ -34,9 +35,9 @@ import ( kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" "github.com/kedacore/keda/v2/pkg/eventreason" "github.com/kedacore/keda/v2/pkg/scalers" + "github.com/kedacore/keda/v2/pkg/scaling/cache" "github.com/kedacore/keda/v2/pkg/scaling/executor" "github.com/kedacore/keda/v2/pkg/scaling/resolver" - "github.com/kedacore/keda/v2/pkg/scaling/scaledjob" ) // ScaleHandler encapsulates the logic of calling the right scalers for @@ -44,7 +45,8 @@ import ( type ScaleHandler interface { HandleScalableObject(scalableObject interface{}) error DeleteScalableObject(scalableObject interface{}) error - GetScalers(ctx context.Context, scalableObject interface{}) ([]scalers.Scaler, error) + GetScalersCache(ctx context.Context, scalableObject interface{}) (*cache.ScalersCache, error) + ClearScalersCache(ctx context.Context, name, namespace string) } type scaleHandler struct { @@ -54,6 +56,8 @@ type scaleHandler struct { scaleExecutor executor.ScaleExecutor globalHTTPTimeout time.Duration recorder record.EventRecorder + scalerCaches map[string]*cache.ScalersCache + lock *sync.RWMutex } // NewScaleHandler creates a ScaleHandler object @@ -65,23 +69,11 @@ func NewScaleHandler(client client.Client, scaleClient scale.ScalesGetter, recon scaleExecutor: executor.NewScaleExecutor(client, scaleClient, reconcilerScheme, recorder), globalHTTPTimeout: globalHTTPTimeout, recorder: recorder, + scalerCaches: map[string]*cache.ScalersCache{}, + lock: &sync.RWMutex{}, } } -func (h *scaleHandler) GetScalers(ctx context.Context, scalableObject interface{}) ([]scalers.Scaler, error) { - withTriggers, err := asDuckWithTriggers(scalableObject) - if err != nil { - return nil, err - } - - podTemplateSpec, containerName, err := resolver.ResolveScaleTargetPodSpec(h.client, h.logger, scalableObject) - if err != nil { - return nil, err - } - - return h.buildScalers(ctx, withTriggers, podTemplateSpec, containerName) -} - func (h *scaleHandler) HandleScalableObject(scalableObject interface{}) error { withTriggers, err := asDuckWithTriggers(scalableObject) if err != nil { @@ -146,46 +138,89 @@ func (h *scaleHandler) DeleteScalableObject(scalableObject interface{}) error { func (h *scaleHandler) startScaleLoop(ctx context.Context, withTriggers *kedav1alpha1.WithTriggers, scalableObject interface{}, scalingMutex sync.Locker) { logger := h.logger.WithValues("type", withTriggers.Kind, "namespace", withTriggers.Namespace, "name", withTriggers.Name) - // kick off one check to the scalers now - h.checkScalers(ctx, scalableObject, scalingMutex) - pollingInterval := withTriggers.GetPollingInterval() logger.V(1).Info("Watching with pollingInterval", "PollingInterval", pollingInterval) for { tmr := time.NewTimer(pollingInterval) + h.checkScalers(ctx, scalableObject, scalingMutex) select { case <-tmr.C: - h.checkScalers(ctx, scalableObject, scalingMutex) tmr.Stop() case <-ctx.Done(): logger.V(1).Info("Context canceled") + h.ClearScalersCache(ctx, withTriggers.Name, withTriggers.Namespace) tmr.Stop() return } } } +func (h *scaleHandler) GetScalersCache(ctx context.Context, scalableObject interface{}) (*cache.ScalersCache, error) { + withTriggers, err := asDuckWithTriggers(scalableObject) + if err != nil { + return nil, err + } + + key := strings.ToLower(fmt.Sprintf("%s.%s.%s", withTriggers.Kind, withTriggers.Name, withTriggers.Namespace)) + + h.lock.RLock() + if cache, ok := h.scalerCaches[key]; ok && cache.Generation == withTriggers.Generation { + h.lock.RUnlock() + return cache, nil + } + h.lock.RUnlock() + + h.lock.Lock() + defer h.lock.Unlock() + if cache, ok := h.scalerCaches[key]; ok && cache.Generation == withTriggers.Generation { + return cache, nil + } else if ok { + cache.Close(ctx) + } + + podTemplateSpec, containerName, err := resolver.ResolveScaleTargetPodSpec(h.client, h.logger, scalableObject) + if err != nil { + return nil, err + } + + scalers := h.buildScalers(ctx, withTriggers, podTemplateSpec, containerName) + + h.scalerCaches[key] = &cache.ScalersCache{ + Generation: withTriggers.Generation, + Scalers: scalers, + Logger: h.logger, + Recorder: h.recorder, + } + + return h.scalerCaches[key], nil +} + +func (h *scaleHandler) ClearScalersCache(ctx context.Context, name, namespace string) { + h.lock.Lock() + defer h.lock.Unlock() + + key := strings.ToLower(fmt.Sprintf("%s.%s", name, namespace)) + if cache, ok := h.scalerCaches[key]; ok { + cache.Close(ctx) + delete(h.scalerCaches, key) + } +} + func (h *scaleHandler) startPushScalers(ctx context.Context, withTriggers *kedav1alpha1.WithTriggers, scalableObject interface{}, scalingMutex sync.Locker) { logger := h.logger.WithValues("type", withTriggers.Kind, "namespace", withTriggers.Namespace, "name", withTriggers.Name) - ss, err := h.GetScalers(ctx, scalableObject) + cache, err := h.GetScalersCache(ctx, scalableObject) if err != nil { logger.Error(err, "Error getting scalers", "object", scalableObject) return } - for _, s := range ss { - scaler, ok := s.(scalers.PushScaler) - if !ok { - s.Close(ctx) - continue - } - - go func() { + for _, ps := range cache.GetPushScalers() { + go func(s scalers.PushScaler) { activeCh := make(chan bool) - go scaler.Run(ctx, activeCh) - defer scaler.Close(ctx) + go s.Run(ctx, activeCh) + defer s.Close(ctx) for { select { case <-ctx.Done(): @@ -201,14 +236,14 @@ func (h *scaleHandler) startPushScalers(ctx context.Context, withTriggers *kedav scalingMutex.Unlock() } } - }() + }(ps) } } // checkScalers contains the main logic for the ScaleHandler scaling logic. // It'll check each trigger active status then call RequestScale func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interface{}, scalingMutex sync.Locker) { - scalers, err := h.GetScalers(ctx, scalableObject) + cache, err := h.GetScalersCache(ctx, scalableObject) if err != nil { h.logger.Error(err, "Error getting scalers", "object", scalableObject) return @@ -223,7 +258,7 @@ func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interfac h.logger.Error(err, "Error getting scaledObject", "object", scalableObject) return } - isActive, isError := h.isScaledObjectActive(ctx, scalers, obj) + isActive, isError, _ := cache.IsScaledObjectActive(ctx, obj) h.scaleExecutor.RequestScale(ctx, obj, isActive, isError) case *kedav1alpha1.ScaledJob: err = h.client.Get(ctx, types.NamespacedName{Name: obj.Name, Namespace: obj.Namespace}, obj) @@ -231,83 +266,62 @@ func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interfac h.logger.Error(err, "Error getting scaledJob", "object", scalableObject) return } - isActive, scaleTo, maxScale := h.isScaledJobActive(ctx, scalers, obj) + isActive, scaleTo, maxScale := cache.IsScaledJobActive(ctx, obj) h.scaleExecutor.RequestJobScale(ctx, obj, isActive, scaleTo, maxScale) } } -func (h *scaleHandler) isScaledObjectActive(ctx context.Context, scalers []scalers.Scaler, scaledObject *kedav1alpha1.ScaledObject) (bool, bool) { - isActive := false - isError := false - for i, scaler := range scalers { - isTriggerActive, err := scaler.IsActive(ctx) - scaler.Close(ctx) - - if err != nil { - h.logger.V(1).Info("Error getting scale decision", "Error", err) - isError = true - h.recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) - continue - } else if isTriggerActive { - isActive = true - if externalMetricsSpec := scaler.GetMetricSpecForScaling(ctx)[0].External; externalMetricsSpec != nil { - h.logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", externalMetricsSpec.Metric.Name) - } - if resourceMetricsSpec := scaler.GetMetricSpecForScaling(ctx)[0].Resource; resourceMetricsSpec != nil { - h.logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", resourceMetricsSpec.Name) - } - closeScalers(ctx, scalers[i+1:]) - break - } - } - return isActive, isError -} - -func (h *scaleHandler) isScaledJobActive(ctx context.Context, scalers []scalers.Scaler, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) { - return scaledjob.GetScaleMetrics(ctx, scalers, scaledJob, h.recorder) -} - // buildScalers returns list of Scalers for the specified triggers -func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers *kedav1alpha1.WithTriggers, podTemplateSpec *corev1.PodTemplateSpec, containerName string) ([]scalers.Scaler, error) { +func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers *kedav1alpha1.WithTriggers, podTemplateSpec *corev1.PodTemplateSpec, containerName string) []cache.ScalerBuilder { logger := h.logger.WithValues("type", withTriggers.Kind, "namespace", withTriggers.Namespace, "name", withTriggers.Name) - var scalersRes []scalers.Scaler var err error resolvedEnv := make(map[string]string) - if podTemplateSpec != nil { - resolvedEnv, err = resolver.ResolveContainerEnv(h.client, logger, &podTemplateSpec.Spec, containerName, withTriggers.Namespace) - if err != nil { - return scalersRes, fmt.Errorf("error resolving secrets for ScaleTarget: %s", err) - } - } + result := make([]cache.ScalerBuilder, 0, len(withTriggers.Spec.Triggers)) + + for scalerIndex, t := range withTriggers.Spec.Triggers { + triggerName, trigger := scalerIndex, t + factory := func() (scalers.Scaler, error) { + if podTemplateSpec != nil { + resolvedEnv, err = resolver.ResolveContainerEnv(h.client, logger, &podTemplateSpec.Spec, containerName, withTriggers.Namespace) + if err != nil { + return nil, fmt.Errorf("error resolving secrets for ScaleTarget: %s", err) + } + } + config := &scalers.ScalerConfig{ + Name: withTriggers.Name, + Namespace: withTriggers.Namespace, + TriggerMetadata: trigger.Metadata, + ResolvedEnv: resolvedEnv, + AuthParams: make(map[string]string), + GlobalHTTPTimeout: h.globalHTTPTimeout, + ScalerIndex: scalerIndex, + } - for scalerIndex, trigger := range withTriggers.Spec.Triggers { - config := &scalers.ScalerConfig{ - Name: withTriggers.Name, - Namespace: withTriggers.Namespace, - TriggerMetadata: trigger.Metadata, - ResolvedEnv: resolvedEnv, - AuthParams: make(map[string]string), - GlobalHTTPTimeout: h.globalHTTPTimeout, - ScalerIndex: scalerIndex, - } + config.AuthParams, config.PodIdentity, err = resolver.ResolveAuthRefAndPodIdentity(h.client, logger, trigger.AuthenticationRef, podTemplateSpec, withTriggers.Namespace) + if err != nil { + return nil, err + } - config.AuthParams, config.PodIdentity, err = resolver.ResolveAuthRefAndPodIdentity(h.client, logger, trigger.AuthenticationRef, podTemplateSpec, withTriggers.Namespace) - if err != nil { - closeScalers(ctx, scalersRes) - return []scalers.Scaler{}, err + return buildScaler(ctx, h.client, trigger.Type, config) } - scaler, err := buildScaler(ctx, h.client, trigger.Type, config) + scaler, err := factory() if err != nil { - closeScalers(ctx, scalersRes) h.recorder.Event(withTriggers, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) - return []scalers.Scaler{}, fmt.Errorf("error getting scaler for trigger #%d: %s", scalerIndex, err) + h.logger.Error(err, "error resolving auth params", "scalerIndex", scalerIndex, "object", withTriggers, "trigger", triggerName) + if scaler != nil { + scaler.Close(ctx) + } + continue } - scalersRes = append(scalersRes, scaler) + result = append(result, cache.ScalerBuilder{ + Scaler: scaler, + Factory: factory, + }) } - return scalersRes, nil + return result } func buildScaler(ctx context.Context, client client.Client, triggerType string, config *scalers.ScalerConfig) (scalers.Scaler, error) { @@ -430,9 +444,3 @@ func asDuckWithTriggers(scalableObject interface{}) (*kedav1alpha1.WithTriggers, return nil, fmt.Errorf("unknown scalable object type %v", scalableObject) } } - -func closeScalers(ctx context.Context, scalers []scalers.Scaler) { - for _, scaler := range scalers { - defer scaler.Close(ctx) - } -} diff --git a/pkg/scaling/scale_handler_test.go b/pkg/scaling/scale_handler_test.go index eaf1cbc1c8c..4aee0d3f607 100644 --- a/pkg/scaling/scale_handler_test.go +++ b/pkg/scaling/scale_handler_test.go @@ -19,45 +19,53 @@ package scaling import ( "context" "errors" - "sync" "testing" - "time" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" logf "sigs.k8s.io/controller-runtime/pkg/log" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" - "github.com/kedacore/keda/v2/pkg/mock/mock_client" mock_scalers "github.com/kedacore/keda/v2/pkg/mock/mock_scaler" "github.com/kedacore/keda/v2/pkg/scalers" - "github.com/kedacore/keda/v2/pkg/scaling/executor" + "github.com/kedacore/keda/v2/pkg/scaling/cache" ) func TestCheckScaledObjectScalersWithError(t *testing.T) { ctrl := gomock.NewController(t) - client := mock_client.NewMockClient(ctrl) recorder := record.NewFakeRecorder(1) - scaleHandler := &scaleHandler{ - client: client, - logger: logf.Log.WithName("scalehandler"), - scaleLoopContexts: &sync.Map{}, - scaleExecutor: executor.NewScaleExecutor(client, nil, nil, recorder), - globalHTTPTimeout: 5 * time.Second, - recorder: recorder, + factory := func() (scalers.Scaler, error) { + scaler := mock_scalers.NewMockScaler(ctrl) + scaler.EXPECT().IsActive(gomock.Any()).Return(false, errors.New("some error")) + scaler.EXPECT().Close(gomock.Any()) + return scaler, nil } - scaler := mock_scalers.NewMockScaler(ctrl) - scalers := []scalers.Scaler{scaler} - scaledObject := &kedav1alpha1.ScaledObject{} + scaler, err := factory() + assert.Nil(t, err) - scaler.EXPECT().IsActive(gomock.Any()).Return(false, errors.New("Some error")) - scaler.EXPECT().Close(gomock.Any()) + scaledObject := kedav1alpha1.ScaledObject{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + } + + cache := cache.ScalersCache{ + Scalers: []cache.ScalerBuilder{{ + Scaler: scaler, + Factory: factory, + }}, + Logger: logf.Log.WithName("scalehandler"), + Recorder: recorder, + } - isActive, isError := scaleHandler.isScaledObjectActive(context.TODO(), scalers, scaledObject) + isActive, isError, _ := cache.IsScaledObjectActive(context.TODO(), &scaledObject) + cache.Close(context.Background()) assert.Equal(t, false, isActive) assert.Equal(t, true, isError) @@ -65,22 +73,15 @@ func TestCheckScaledObjectScalersWithError(t *testing.T) { func TestCheckScaledObjectFindFirstActiveIgnoringOthers(t *testing.T) { ctrl := gomock.NewController(t) - client := mock_client.NewMockClient(ctrl) recorder := record.NewFakeRecorder(1) - - scaleHandler := &scaleHandler{ - client: client, - logger: logf.Log.WithName("scalehandler"), - scaleLoopContexts: &sync.Map{}, - scaleExecutor: executor.NewScaleExecutor(client, nil, nil, recorder), - globalHTTPTimeout: 5 * time.Second, - recorder: recorder, - } - activeScaler := mock_scalers.NewMockScaler(ctrl) failingScaler := mock_scalers.NewMockScaler(ctrl) - scalers := []scalers.Scaler{activeScaler, failingScaler} - scaledObject := &kedav1alpha1.ScaledObject{} + scaledObject := &kedav1alpha1.ScaledObject{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + } metricsSpecs := []v2beta2.MetricSpec{createMetricSpec(1)} @@ -89,7 +90,25 @@ func TestCheckScaledObjectFindFirstActiveIgnoringOthers(t *testing.T) { activeScaler.EXPECT().Close(gomock.Any()) failingScaler.EXPECT().Close(gomock.Any()) - isActive, isError := scaleHandler.isScaledObjectActive(context.TODO(), scalers, scaledObject) + factory := func() (scalers.Scaler, error) { + return mock_scalers.NewMockScaler(ctrl), nil + } + scalers := []cache.ScalerBuilder{{ + Scaler: activeScaler, + Factory: factory, + }, { + Scaler: failingScaler, + Factory: factory, + }} + + scalersCache := cache.ScalersCache{ + Scalers: scalers, + Logger: logf.Log.WithName("scalercache"), + Recorder: recorder, + } + + isActive, isError, _ := scalersCache.IsScaledObjectActive(context.TODO(), scaledObject) + scalersCache.Close(context.Background()) assert.Equal(t, true, isActive) assert.Equal(t, false, isError) diff --git a/pkg/scaling/scaledjob/scale_metrics.go b/pkg/scaling/scaledjob/scale_metrics.go deleted file mode 100644 index 1c702bd5042..00000000000 --- a/pkg/scaling/scaledjob/scale_metrics.go +++ /dev/null @@ -1,184 +0,0 @@ -package scaledjob - -import ( - "context" - "fmt" - - "github.com/go-logr/logr" - "k8s.io/api/autoscaling/v2beta2" - corev1 "k8s.io/api/core/v1" - "k8s.io/client-go/tools/record" - logf "sigs.k8s.io/controller-runtime/pkg/log" - - kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" - "github.com/kedacore/keda/v2/pkg/eventreason" - "github.com/kedacore/keda/v2/pkg/scalers" -) - -type scalerMetrics struct { - queueLength int64 - maxValue int64 - isActive bool -} - -// GetScaleMetrics gets the metrics for decision making of scaling. -func GetScaleMetrics(ctx context.Context, scalers []scalers.Scaler, scaledJob *kedav1alpha1.ScaledJob, recorder record.EventRecorder) (bool, int64, int64) { - var queueLength int64 - var maxValue int64 - isActive := false - - logger := logf.Log.WithName("scalemetrics") - scalersMetrics := getScalersMetrics(ctx, scalers, scaledJob, logger, recorder) - switch scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation { - case "min": - for _, metrics := range scalersMetrics { - if (queueLength == 0 || metrics.queueLength < queueLength) && metrics.isActive { - queueLength = metrics.queueLength - maxValue = metrics.maxValue - isActive = metrics.isActive - } - } - case "avg": - queueLengthSum := int64(0) - maxValueSum := int64(0) - length := 0 - for _, metrics := range scalersMetrics { - if metrics.isActive { - queueLengthSum += metrics.queueLength - maxValueSum += metrics.maxValue - isActive = metrics.isActive - length++ - } - } - if length != 0 { - queueLength = divideWithCeil(queueLengthSum, int64(length)) - maxValue = divideWithCeil(maxValueSum, int64(length)) - } - case "sum": - for _, metrics := range scalersMetrics { - if metrics.isActive { - queueLength += metrics.queueLength - maxValue += metrics.maxValue - isActive = metrics.isActive - } - } - default: // max - for _, metrics := range scalersMetrics { - if metrics.queueLength > queueLength && metrics.isActive { - queueLength = metrics.queueLength - maxValue = metrics.maxValue - isActive = metrics.isActive - } - } - } - maxValue = min(scaledJob.MaxReplicaCount(), maxValue) - logger.V(1).WithValues("ScaledJob", scaledJob.Name).Info("Checking if ScaleJob scalers are active", "isActive", isActive, "maxValue", maxValue, "MultipleScalersCalculation", scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation) - - return isActive, queueLength, maxValue -} - -func getScalersMetrics(ctx context.Context, scalers []scalers.Scaler, scaledJob *kedav1alpha1.ScaledJob, logger logr.Logger, recorder record.EventRecorder) []scalerMetrics { - scalersMetrics := []scalerMetrics{} - - for _, scaler := range scalers { - var queueLength int64 - var targetAverageValue int64 - isActive := false - maxValue := int64(0) - scalerType := fmt.Sprintf("%T:", scaler) - - scalerLogger := logger.WithValues("ScaledJob", scaledJob.Name, "Scaler", scalerType) - - metricSpecs := scaler.GetMetricSpecForScaling(ctx) - - // skip scaler that doesn't return any metric specs (usually External scaler with incorrect metadata) - // or skip cpu/memory resource scaler - if len(metricSpecs) < 1 || metricSpecs[0].External == nil { - continue - } - - isTriggerActive, err := scaler.IsActive(ctx) - if err != nil { - scalerLogger.V(1).Info("Error getting scaler.IsActive, but continue", "Error", err) - recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) - scaler.Close(ctx) - continue - } - - targetAverageValue = getTargetAverageValue(metricSpecs) - - metrics, err := scaler.GetMetrics(ctx, "queueLength", nil) - if err != nil { - scalerLogger.V(1).Info("Error getting scaler metrics, but continue", "Error", err) - recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) - scaler.Close(ctx) - continue - } - - var metricValue int64 - - for _, m := range metrics { - if m.MetricName == "queueLength" { - metricValue, _ = m.Value.AsInt64() - queueLength += metricValue - } - } - scalerLogger.V(1).Info("Scaler Metric value", "isTriggerActive", isTriggerActive, "queueLength", queueLength, "targetAverageValue", targetAverageValue) - - scaler.Close(ctx) - - if isTriggerActive { - isActive = true - } - - if targetAverageValue != 0 { - maxValue = min(scaledJob.MaxReplicaCount(), divideWithCeil(queueLength, targetAverageValue)) - } - scalersMetrics = append(scalersMetrics, scalerMetrics{ - queueLength: queueLength, - maxValue: maxValue, - isActive: isActive, - }) - } - return scalersMetrics -} - -func getTargetAverageValue(metricSpecs []v2beta2.MetricSpec) int64 { - var targetAverageValue int64 - var metricValue int64 - var flag bool - for _, metric := range metricSpecs { - if metric.External.Target.AverageValue == nil { - metricValue = 0 - } else { - metricValue, flag = metric.External.Target.AverageValue.AsInt64() - if !flag { - metricValue = 0 - } - } - - targetAverageValue += metricValue - } - count := int64(len(metricSpecs)) - if count != 0 { - return targetAverageValue / count - } - return 0 -} - -func divideWithCeil(x, y int64) int64 { - ans := x / y - reminder := x % y - if reminder != 0 { - return ans + 1 - } - return ans -} - -// Min function for int64 -func min(x, y int64) int64 { - if x > y { - return y - } - return x -} diff --git a/tests/scalers/mongodb.test.ts b/tests/scalers/mongodb.test.ts index 4ccb12a8a02..1ee0c80f854 100644 --- a/tests/scalers/mongodb.test.ts +++ b/tests/scalers/mongodb.test.ts @@ -175,6 +175,24 @@ spec: const deployYaml = ` apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: mongodb-trigger +spec: + secretTargetRef: + - parameter: connectionString + name: mongodb-secret + key: connect +--- +apiVersion: v1 +kind: Secret +metadata: + name: mongodb-secret +type: Opaque +data: + connect: {{MONGODB_CONNECTION_STRING_BASE64}} +--- +apiVersion: keda.sh/v1alpha1 kind: ScaledJob metadata: name: {{MONGODB_JOB_NAME}} @@ -205,21 +223,4 @@ spec: authenticationRef: name: mongodb-trigger --- -apiVersion: keda.sh/v1alpha1 -kind: TriggerAuthentication -metadata: - name: mongodb-trigger -spec: - secretTargetRef: - - parameter: connectionString - name: mongodb-secret - key: connect ---- -apiVersion: v1 -kind: Secret -metadata: - name: mongodb-secret -type: Opaque -data: - connect: {{MONGODB_CONNECTION_STRING_BASE64}} `