From 1b38a7fe1fc7c05535c96955fd043a99e9fb22f1 Mon Sep 17 00:00:00 2001 From: Daniel Imberman Date: Wed, 22 Jan 2020 16:03:00 -0800 Subject: [PATCH] postgres scaler (#553) --- .../crds/keda.k8s.io_scaledobjects_crd.yaml | 48 +++-- go.mod | 1 + go.sum | 2 + pkg/handler/scale_handler.go | 2 + pkg/scalers/postgres_scaler.go | 201 ++++++++++++++++++ 5 files changed, 238 insertions(+), 16 deletions(-) create mode 100644 pkg/scalers/postgres_scaler.go diff --git a/deploy/crds/keda.k8s.io_scaledobjects_crd.yaml b/deploy/crds/keda.k8s.io_scaledobjects_crd.yaml index 22c77e10ade..a254ce9965b 100644 --- a/deploy/crds/keda.k8s.io_scaledobjects_crd.yaml +++ b/deploy/crds/keda.k8s.io_scaledobjects_crd.yaml @@ -1087,12 +1087,13 @@ spec: type: string port: anyOf: - - type: string - type: integer + - type: string description: Name or number of the port to access on the container. Number must be in the range 1 to 65535. Name must be an IANA_SVC_NAME. + x-kubernetes-int-or-string: true scheme: description: Scheme to use for connecting to the host. Defaults to HTTP. @@ -1112,12 +1113,13 @@ spec: type: string port: anyOf: - - type: string - type: integer + - type: string description: Number or name of the port to access on the container. Number must be in the range 1 to 65535. Name must be an IANA_SVC_NAME. + x-kubernetes-int-or-string: true required: - port type: object @@ -1192,12 +1194,13 @@ spec: type: string port: anyOf: - - type: string - type: integer + - type: string description: Name or number of the port to access on the container. Number must be in the range 1 to 65535. Name must be an IANA_SVC_NAME. + x-kubernetes-int-or-string: true scheme: description: Scheme to use for connecting to the host. Defaults to HTTP. @@ -1217,12 +1220,13 @@ spec: type: string port: anyOf: - - type: string - type: integer + - type: string description: Number or name of the port to access on the container. Number must be in the range 1 to 65535. Name must be an IANA_SVC_NAME. + x-kubernetes-int-or-string: true required: - port type: object @@ -1291,12 +1295,13 @@ spec: type: string port: anyOf: - - type: string - type: integer + - type: string description: Name or number of the port to access on the container. Number must be in the range 1 to 65535. Name must be an IANA_SVC_NAME. + x-kubernetes-int-or-string: true scheme: description: Scheme to use for connecting to the host. Defaults to HTTP. @@ -1334,12 +1339,13 @@ spec: type: string port: anyOf: - - type: string - type: integer + - type: string description: Number or name of the port to access on the container. Number must be in the range 1 to 65535. Name must be an IANA_SVC_NAME. + x-kubernetes-int-or-string: true required: - port type: object @@ -1464,12 +1470,13 @@ spec: type: string port: anyOf: - - type: string - type: integer + - type: string description: Name or number of the port to access on the container. Number must be in the range 1 to 65535. Name must be an IANA_SVC_NAME. + x-kubernetes-int-or-string: true scheme: description: Scheme to use for connecting to the host. Defaults to HTTP. @@ -1507,12 +1514,13 @@ spec: type: string port: anyOf: - - type: string - type: integer + - type: string description: Number or name of the port to access on the container. Number must be in the range 1 to 65535. Name must be an IANA_SVC_NAME. + x-kubernetes-int-or-string: true required: - port type: object @@ -2173,12 +2181,13 @@ spec: type: string port: anyOf: - - type: string - type: integer + - type: string description: Name or number of the port to access on the container. Number must be in the range 1 to 65535. Name must be an IANA_SVC_NAME. + x-kubernetes-int-or-string: true scheme: description: Scheme to use for connecting to the host. Defaults to HTTP. @@ -2198,12 +2207,13 @@ spec: type: string port: anyOf: - - type: string - type: integer + - type: string description: Number or name of the port to access on the container. Number must be in the range 1 to 65535. Name must be an IANA_SVC_NAME. + x-kubernetes-int-or-string: true required: - port type: object @@ -2278,12 +2288,13 @@ spec: type: string port: anyOf: - - type: string - type: integer + - type: string description: Name or number of the port to access on the container. Number must be in the range 1 to 65535. Name must be an IANA_SVC_NAME. + x-kubernetes-int-or-string: true scheme: description: Scheme to use for connecting to the host. Defaults to HTTP. @@ -2303,12 +2314,13 @@ spec: type: string port: anyOf: - - type: string - type: integer + - type: string description: Number or name of the port to access on the container. Number must be in the range 1 to 65535. Name must be an IANA_SVC_NAME. + x-kubernetes-int-or-string: true required: - port type: object @@ -2377,12 +2389,13 @@ spec: type: string port: anyOf: - - type: string - type: integer + - type: string description: Name or number of the port to access on the container. Number must be in the range 1 to 65535. Name must be an IANA_SVC_NAME. + x-kubernetes-int-or-string: true scheme: description: Scheme to use for connecting to the host. Defaults to HTTP. @@ -2420,12 +2433,13 @@ spec: type: string port: anyOf: - - type: string - type: integer + - type: string description: Number or name of the port to access on the container. Number must be in the range 1 to 65535. Name must be an IANA_SVC_NAME. + x-kubernetes-int-or-string: true required: - port type: object @@ -2550,12 +2564,13 @@ spec: type: string port: anyOf: - - type: string - type: integer + - type: string description: Name or number of the port to access on the container. Number must be in the range 1 to 65535. Name must be an IANA_SVC_NAME. + x-kubernetes-int-or-string: true scheme: description: Scheme to use for connecting to the host. Defaults to HTTP. @@ -2593,12 +2608,13 @@ spec: type: string port: anyOf: - - type: string - type: integer + - type: string description: Number or name of the port to access on the container. Number must be in the range 1 to 65535. Name must be an IANA_SVC_NAME. + x-kubernetes-int-or-string: true required: - port type: object diff --git a/go.mod b/go.mod index 875d84fffe3..7d06608b36e 100644 --- a/go.mod +++ b/go.mod @@ -53,6 +53,7 @@ require ( github.com/golang/protobuf v1.3.2 github.com/imdario/mergo v0.3.8 github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20190918110929-3d9be26a50eb + github.com/lib/pq v1.3.0 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/operator-framework/operator-sdk v0.0.0-00010101000000-000000000000 github.com/pkg/errors v0.8.1 diff --git a/go.sum b/go.sum index 36b3f36c9a9..b5a0528f869 100644 --- a/go.sum +++ b/go.sum @@ -403,6 +403,8 @@ github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LE github.com/leanovate/gopter v0.2.4/go.mod h1:gNcbPWNEWRe4lm+bycKqxUYoH5uoVje5SkOJ3uoLer8= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.3.0 h1:/qkRGz8zljWiDcFvgpwUpwIAPu3r07TDvs3Rws+o/pU= +github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lightstep/lightstep-tracer-go v0.15.6/go.mod h1:6AMpwZpsyCFwSovxzM78e+AsYxE8sGwiM6C3TytaWeI= github.com/lovoo/gcloud-opentracing v0.3.0/go.mod h1:ZFqk2y38kMDDikZPAK7ynTTGuyt17nSPdS3K5e+ZTBY= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= diff --git a/pkg/handler/scale_handler.go b/pkg/handler/scale_handler.go index af030378819..cbcc0702f85 100644 --- a/pkg/handler/scale_handler.go +++ b/pkg/handler/scale_handler.go @@ -296,6 +296,8 @@ func (h *ScaleHandler) getScaler(name, namespace, triggerType string, resolvedEn return scalers.NewHuaweiCloudeyeScaler(triggerMetadata, authParams) case "azure-blob": return scalers.NewAzureBlobScaler(resolvedEnv, triggerMetadata, authParams, podIdentity) + case "postgres": + return scalers.NewPostgresScaler(resolvedEnv, triggerMetadata, authParams) default: return nil, fmt.Errorf("no scaler found for type: %s", triggerType) } diff --git a/pkg/scalers/postgres_scaler.go b/pkg/scalers/postgres_scaler.go new file mode 100644 index 00000000000..65c840a69de --- /dev/null +++ b/pkg/scalers/postgres_scaler.go @@ -0,0 +1,201 @@ +package scalers + +import ( + "context" + "database/sql" + "fmt" + _ "github.com/lib/pq" + "k8s.io/api/autoscaling/v2beta1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +const ( + pgMetricName = "num" + defaultPostgresPassword = "" +) + +type postGRESScaler struct { + metadata *postGRESMetadata + connection *sql.DB +} + +type postGRESMetadata struct { + connStr string + userName string + password string + host string + port string + query string + dbName string + sslmode string +} + +var postgresLog = logf.Log.WithName("postgres_scaler") + +// NewPostgresScaler creates a new postgres scaler +func NewPostgresScaler(resolvedEnv, metadata, authParams map[string]string) (Scaler, error) { + meta, err := parsePostgresMetadata(resolvedEnv, metadata, authParams) + if err != nil { + return nil, fmt.Errorf("error parsing postgres metadata: %s", err) + } + + conn, err := getConnection(meta) + if err != nil { + return nil, fmt.Errorf("error establishing postgres connection: %s", err) + } + return &postGRESScaler{ + metadata: meta, + connection: conn, + }, nil +} + +func parsePostgresMetadata(resolvedEnv, metadata, authParams map[string]string) (*postGRESMetadata, error) { + meta := postGRESMetadata{} + + if val, ok := metadata["query"]; ok { + meta.query = val + } else { + return nil, fmt.Errorf("no query given") + } + + if val, ok := authParams["connStr"]; ok { + meta.connStr = val + "?sslmode=disable" + } else if val, ok := metadata["connStr"]; ok { + hostSetting := val + + if val, ok := resolvedEnv[hostSetting]; ok { + meta.connStr = val + "?sslmode=disable" + } + } else { + meta.connStr = "" + if val, ok := metadata["host"]; ok { + meta.host = val + } else { + return nil, fmt.Errorf("no host given") + } + if val, ok := metadata["port"]; ok { + meta.port = val + } else { + return nil, fmt.Errorf("no port given") + } + + if val, ok := metadata["userName"]; ok { + meta.userName = val + } else { + return nil, fmt.Errorf("no username given") + } + if val, ok := metadata["dbName"]; ok { + meta.dbName = val + } else { + return nil, fmt.Errorf("no dbname given") + } + if val, ok := metadata["sslmode"]; ok { + meta.sslmode = val + } else { + return nil, fmt.Errorf("no sslmode name given") + } + meta.password = defaultPostgresPassword + if val, ok := authParams["password"]; ok { + meta.password = val + } else if val, ok := metadata["password"]; ok && val != "" { + if passd, ok := resolvedEnv[val]; ok { + meta.password = passd + } + } + } + + return &meta, nil +} + +func getConnection(meta *postGRESMetadata) (*sql.DB, error) { + var connStr string + if meta.connStr != "" { + connStr = meta.connStr + } else { + connStr = fmt.Sprintf( + "host=%s port=%s user=%s dbname=%s sslmode=%s password=%s", + meta.host, + meta.port, + meta.userName, + meta.dbName, + meta.sslmode, + meta.password, + ) + } + db, err := sql.Open("postgres", connStr) + if err != nil { + postgresLog.Error(err, fmt.Sprintf("Found error opening: %s", err)) + return nil, err + } + err = db.Ping() + if err != nil { + postgresLog.Error(err, fmt.Sprintf("Found error pinging: %s", err)) + return nil, err + } + return db, nil +} + +// Close disposes of postgres connections +func (s *postGRESScaler) Close() error { + err := s.connection.Close() + if err != nil { + postgresLog.Error(err, "Error closing postgres connection") + return err + } + return nil +} + +// IsActive returns true if there are pending messages to be processed +func (s *postGRESScaler) IsActive(ctx context.Context) (bool, error) { + messages, err := s.getActiveNumber() + if err != nil { + return false, fmt.Errorf("error inspecting postgres: %s", err) + } + + return messages > 0, nil +} + +func (s *postGRESScaler) getActiveNumber() (int, error) { + var id int + postgresLog.Info(fmt.Sprintf("Inspecting with query: %s", s.metadata.query)) + err := s.connection.QueryRow(s.metadata.query).Scan(&id) + if err != nil { + postgresLog.Error(err, fmt.Sprintf("could not query PG: %s", err)) + return 0, fmt.Errorf("could not query PG: %s", err) + } + postgresLog.Info(fmt.Sprintf("Num expected: %d", id)) + return id, nil +} + +// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler +func (s *postGRESScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec { + targetListLengthQty := resource.NewQuantity(1, resource.DecimalSI) + externalMetric := &v2beta1.ExternalMetricSource{ + MetricName: pgMetricName, + TargetAverageValue: targetListLengthQty, + } + metricSpec := v2beta1.MetricSpec{ + External: externalMetric, Type: externalMetricType, + } + return []v2beta1.MetricSpec{metricSpec} +} + +// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric +func (s *postGRESScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + num, err := s.getActiveNumber() + if err != nil { + return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting postgres: %s", err) + } + + metric := external_metrics.ExternalMetricValue{ + MetricName: pgMetricName, + Value: *resource.NewQuantity(int64(num), resource.DecimalSI), + Timestamp: metav1.Now(), + } + + return append([]external_metrics.ExternalMetricValue{}, metric), nil +}