Skip to content

Commit

Permalink
Cache metric names provided by KEDA Metrics Server (#2279)
Browse files Browse the repository at this point in the history
Signed-off-by: Zbynek Roubalik <zroubali@redhat.com>
  • Loading branch information
zroubalik authored Nov 23, 2021
1 parent 2b43d40 commit 988329b
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 47 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- Improve Redis Scaler, upgrade library, add username and Sentinel support ([#2181](https://github.com/kedacore/keda/pull/2181))
- Add GCP identity authentication when using Pubsub Scaler ([#2225](https://github.com/kedacore/keda/pull/2225))
- Add ScalersCache to reuse scalers unless they need changing ([#2187](https://github.com/kedacore/keda/pull/2187))
- Cache metric names provided by KEDA Metrics Server ([#2279](https://github.com/kedacore/keda/pull/2279))

### Improvements

Expand Down
14 changes: 10 additions & 4 deletions adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os"
"runtime"
"strconv"
"sync"
"time"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -103,6 +104,8 @@ func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Durat
broadcaster := record.NewBroadcaster()
recorder := broadcaster.NewRecorder(scheme, corev1.EventSource{Component: "keda-metrics-adapter"})
handler := scaling.NewScaleHandler(kubeclient, nil, scheme, globalHTTPTimeout, recorder)
externalMetricsInfo := &[]provider.ExternalMetricInfo{}
externalMetricsInfoLock := &sync.RWMutex{}

namespace, err := getWatchNamespace()
if err != nil {
Expand All @@ -113,14 +116,14 @@ func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Durat
prometheusServer := &prommetrics.PrometheusMetricServer{}
go func() { prometheusServer.NewServer(fmt.Sprintf(":%v", prometheusMetricsPort), prometheusMetricsPath) }()
stopCh := make(chan struct{})
if err := runScaledObjectController(ctx, scheme, namespace, handler, logger, stopCh); err != nil {
if err := runScaledObjectController(ctx, scheme, namespace, handler, logger, externalMetricsInfo, externalMetricsInfoLock, stopCh); err != nil {
return nil, nil, err
}

return kedaprovider.NewProvider(ctx, logger, handler, kubeclient, namespace), stopCh, nil
return kedaprovider.NewProvider(ctx, logger, handler, kubeclient, namespace, externalMetricsInfo, externalMetricsInfoLock), stopCh, nil
}

func runScaledObjectController(ctx context.Context, scheme *k8sruntime.Scheme, namespace string, scaleHandler scaling.ScaleHandler, logger logr.Logger, stopCh chan<- struct{}) error {
func runScaledObjectController(ctx context.Context, scheme *k8sruntime.Scheme, namespace string, scaleHandler scaling.ScaleHandler, logger logr.Logger, externalMetricsInfo *[]provider.ExternalMetricInfo, externalMetricsInfoLock *sync.RWMutex, stopCh chan<- struct{}) error {
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Namespace: namespace,
Expand All @@ -130,7 +133,10 @@ func runScaledObjectController(ctx context.Context, scheme *k8sruntime.Scheme, n
}

if err := (&kedacontrollers.MetricsScaledObjectReconciler{
ScaleHandler: scaleHandler,
Client: mgr.GetClient(),
ScaleHandler: scaleHandler,
ExternalMetricsInfo: externalMetricsInfo,
ExternalMetricsInfoLock: externalMetricsInfoLock,
}).SetupWithManager(mgr, controller.Options{}); err != nil {
return err
}
Expand Down
99 changes: 96 additions & 3 deletions controllers/keda/metrics_adapter_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,69 @@ package keda

import (
"context"
"sync"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/scaling"
"k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/custom-metrics-apiserver/pkg/provider"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/scaling"
)

type MetricsScaledObjectReconciler struct {
ScaleHandler scaling.ScaleHandler
Client client.Client
ScaleHandler scaling.ScaleHandler
ExternalMetricsInfo *[]provider.ExternalMetricInfo
ExternalMetricsInfoLock *sync.RWMutex
}

var (
scaledObjectsMetrics = map[string][]string{}
scaledObjectsMetricsLock = &sync.Mutex{}
)

func (r *MetricsScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
reqLogger := log.FromContext(ctx)

// Fetch the ScaledObject instance
scaledObject := &kedav1alpha1.ScaledObject{}
err := r.Client.Get(ctx, req.NamespacedName, scaledObject)
if err != nil {
if errors.IsNotFound(err) {
// Request object not found, could have been deleted after reconcile request.
// Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
// Return and don't requeue

r.removeFromCache(req.NamespacedName.String())
return ctrl.Result{}, nil
}
// Error reading the object - requeue the request.
reqLogger.Error(err, "Failed to get ScaledObject")
return ctrl.Result{}, err
}

// Check if the ScaledObject instance is marked to be deleted, which is
// indicated by the deletion timestamp being set.
// This depends on the preexisting finalizer setup in ScaledObjectController.
if scaledObject.GetDeletionTimestamp() != nil {
r.removeFromCache(req.NamespacedName.String())
return ctrl.Result{}, nil
}

reqLogger.V(1).Info("Reconciling ScaledObject", "externalMetricNames", scaledObject.Status.ExternalMetricNames)

// The ScaledObject hasn't yet been properly initialized and ExternalMetricsNames list popoluted => requeue
if scaledObject.Status.ExternalMetricNames == nil || len(scaledObject.Status.ExternalMetricNames) < 1 {
return ctrl.Result{Requeue: true}, nil
}

r.addToMetricsCache(req.NamespacedName.String(), scaledObject.Status.ExternalMetricNames)
r.ScaleHandler.ClearScalersCache(ctx, req.Name, req.Namespace)
return ctrl.Result{}, nil
}
Expand All @@ -43,3 +92,47 @@ func (r *MetricsScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager, optio
WithOptions(options).
Complete(r)
}

func (r *MetricsScaledObjectReconciler) addToMetricsCache(namespacedName string, metrics []string) {
scaledObjectsMetricsLock.Lock()
defer scaledObjectsMetricsLock.Unlock()
scaledObjectsMetrics[namespacedName] = metrics
extMetrics := populateExternalMetrics(scaledObjectsMetrics)

r.ExternalMetricsInfoLock.Lock()
defer r.ExternalMetricsInfoLock.Unlock()
(*r.ExternalMetricsInfo) = extMetrics
}

func (r *MetricsScaledObjectReconciler) removeFromCache(namespacedName string) {
scaledObjectsMetricsLock.Lock()
defer scaledObjectsMetricsLock.Unlock()
delete(scaledObjectsMetrics, namespacedName)
extMetrics := populateExternalMetrics(scaledObjectsMetrics)

// the metric could have been already removed by the previous call
// in this case we don't have to rewrite r.ExternalMetricsInfo
changed := false
r.ExternalMetricsInfoLock.RLock()
if len(*r.ExternalMetricsInfo) != len(extMetrics) {
changed = true
}
r.ExternalMetricsInfoLock.RUnlock()

if changed {
r.ExternalMetricsInfoLock.Lock()
defer r.ExternalMetricsInfoLock.Unlock()
(*r.ExternalMetricsInfo) = extMetrics
}
}

func populateExternalMetrics(scaledObjectsMetrics map[string][]string) []provider.ExternalMetricInfo {
externalMetrics := []provider.ExternalMetricInfo{}
for _, metrics := range scaledObjectsMetrics {
for _, m := range metrics {
externalMetrics = append(externalMetrics, provider.ExternalMetricInfo{Metric: m})
}
}

return externalMetrics
}
3 changes: 0 additions & 3 deletions pkg/provider/fallback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/external_metrics"
"sigs.k8s.io/controller-runtime/pkg/envtest/printer"
"sigs.k8s.io/custom-metrics-apiserver/pkg/provider"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/mock/mock_client"
Expand Down Expand Up @@ -64,8 +63,6 @@ var _ = Describe("fallback", func() {
scaleHandler = mock_scaling.NewMockScaleHandler(ctrl)
client = mock_client.NewMockClient(ctrl)
providerUnderTest = &KedaProvider{
values: make(map[provider.CustomMetricInfo]int64),
externalMetrics: make([]externalMetric, 2, 10),
client: client,
scaleHandler: scaleHandler,
watchedNamespace: "",
Expand Down
62 changes: 25 additions & 37 deletions pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"strings"
"sync"

"github.com/go-logr/logr"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -37,28 +38,28 @@ import (

// KedaProvider implements External Metrics Provider
type KedaProvider struct {
client client.Client
values map[provider.CustomMetricInfo]int64
externalMetrics []externalMetric
scaleHandler scaling.ScaleHandler
watchedNamespace string
ctx context.Context
client client.Client
scaleHandler scaling.ScaleHandler
watchedNamespace string
ctx context.Context
externalMetricsInfo *[]provider.ExternalMetricInfo
externalMetricsInfoLock *sync.RWMutex
}

type externalMetric struct{}

var logger logr.Logger
var metricsServer prommetrics.PrometheusMetricServer
var (
logger logr.Logger
metricsServer prommetrics.PrometheusMetricServer
)

// NewProvider returns an instance of KedaProvider
func NewProvider(ctx context.Context, adapterLogger logr.Logger, scaleHandler scaling.ScaleHandler, client client.Client, watchedNamespace string) provider.MetricsProvider {
func NewProvider(ctx context.Context, adapterLogger logr.Logger, scaleHandler scaling.ScaleHandler, client client.Client, watchedNamespace string, externalMetricsInfo *[]provider.ExternalMetricInfo, externalMetricsInfoLock *sync.RWMutex) provider.MetricsProvider {
provider := &KedaProvider{
values: make(map[provider.CustomMetricInfo]int64),
externalMetrics: make([]externalMetric, 2, 10),
client: client,
scaleHandler: scaleHandler,
watchedNamespace: watchedNamespace,
ctx: ctx,
client: client,
scaleHandler: scaleHandler,
watchedNamespace: watchedNamespace,
ctx: ctx,
externalMetricsInfo: externalMetricsInfo,
externalMetricsInfoLock: externalMetricsInfoLock,
}
logger = adapterLogger.WithName("provider")
logger.Info("starting")
Expand All @@ -71,9 +72,9 @@ func NewProvider(ctx context.Context, adapterLogger logr.Logger, scaleHandler sc
// Namespace can be used by the implementation for metric identification, access control or ignored.
func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string, metricSelector labels.Selector, info provider.ExternalMetricInfo) (*external_metrics.ExternalMetricValueList, error) {
// Note:
// metric name and namespace is used to lookup for the CRD which contains configuration to call azure
// metric name and namespace is used to lookup for the CRD which contains configuration
// if not found then ignored and label selector is parsed for all the metrics
logger.V(1).Info("KEDA provider received request for external metrics", "namespace", namespace, "metric name", info.Metric, "metricSelector", metricSelector.String())
logger.V(1).Info("KEDA Metrics Server received request for external metrics", "namespace", namespace, "metric name", info.Metric, "metricSelector", metricSelector.String())
selector, err := labels.ConvertSelectorToLabelsMap(metricSelector.String())
if err != nil {
logger.Error(err, "Error converting Selector to Labels Map")
Expand All @@ -90,7 +91,7 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string,
if err != nil {
return nil, err
} else if len(scaledObjects.Items) != 1 {
return nil, fmt.Errorf("exactly one scaled object should match label %s", metricSelector.String())
return nil, fmt.Errorf("exactly one ScaledObject should match label %s", metricSelector.String())
}

scaledObject := &scaledObjects.Items[0]
Expand Down Expand Up @@ -144,25 +145,12 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string,

// ListAllExternalMetrics returns the supported external metrics for this provider
func (p *KedaProvider) ListAllExternalMetrics() []provider.ExternalMetricInfo {
externalMetricsInfo := []provider.ExternalMetricInfo{}
logger.V(1).Info("KEDA Metrics Server received request for list of all provided external metrics names")

// get all ScaledObjects in namespace(s) watched by the operator
scaledObjects := &kedav1alpha1.ScaledObjectList{}
opts := []client.ListOption{
client.InNamespace(p.watchedNamespace),
}
err := p.client.List(p.ctx, scaledObjects, opts...)
if err != nil {
logger.Error(err, "Cannot get list of ScaledObjects", "WatchedNamespace", p.watchedNamespace)
return nil
}
p.externalMetricsInfoLock.RLock()
defer p.externalMetricsInfoLock.RUnlock()
externalMetricsInfo := *p.externalMetricsInfo

// get metrics from all watched ScaledObjects
for _, scaledObject := range scaledObjects.Items {
for _, metric := range scaledObject.Status.ExternalMetricNames {
externalMetricsInfo = append(externalMetricsInfo, provider.ExternalMetricInfo{Metric: metric})
}
}
return externalMetricsInfo
}

Expand Down

0 comments on commit 988329b

Please sign in to comment.